Skip to main content

fips_core/node/handlers/
rx_loop.rs

1//! RX event loop and packet dispatch.
2
3use crate::control::queries;
4use crate::control::{ControlSocket, commands};
5use crate::discovery::is_punch_packet;
6use crate::node::decrypt_worker::{DecryptFailureReport, DecryptFallback, DecryptWorkerEvent};
7use crate::node::wire::{
8    COMMON_PREFIX_SIZE, CommonPrefix, FLAG_CE, FLAG_SP, FMP_VERSION, PHASE_ESTABLISHED, PHASE_MSG1,
9    PHASE_MSG2,
10};
11use crate::node::{Node, NodeError};
12use crate::transport::ReceivedPacket;
13use crate::transport::TransportHandle;
14use std::time::Duration;
15use tokio::sync::mpsc::UnboundedReceiver;
16use tracing::{debug, info, trace, warn};
17
/// Per-wake-up drain cap used by the RX loop: the raw-packet drain handles
/// at most this many packets before yielding back to `select!`, and the
/// trailing decrypt-fallback drain after a burst uses the same cap.
const PACKET_DRAIN_BUDGET: usize = 256;
/// Every this many raw packets, the raw-packet drain pauses to give the
/// decrypt-fallback queue a slice of work. This keeps TCP ACK, heartbeat
/// and handshake progress steady under sustained inbound bursts.
const FALLBACK_INTERLEAVE_EVERY: usize = 32;
/// Maximum number of fallbacks handled in one interleave slice. This stops
/// a hot inbound spike on the fallback queue from starving the outer
/// raw-packet drain in the opposite direction.
const FALLBACK_INTERLEAVE_BUDGET: usize = 64;
/// Watchdog applied to each RX-loop maintenance tick. On expiry the tick
/// is abandoned and the loop goes back to processing packets.
const RX_LOOP_MAINTENANCE_TIMEOUT: Duration = Duration::from_millis(500);
27
28impl Node {
29    /// Run the receive event loop.
30    ///
31    /// Processes packets from all transports, dispatching based on
32    /// the phase field in the 4-byte common prefix:
33    /// - Phase 0x0: Encrypted frame (session data)
34    /// - Phase 0x1: Handshake message 1 (initiator -> responder)
35    /// - Phase 0x2: Handshake message 2 (responder -> initiator)
36    ///
37    /// Also processes outbound IPv6 packets from the TUN reader for session
38    /// encapsulation and routing through the mesh.
39    ///
40    /// Also processes DNS-resolved identities for identity cache population.
41    ///
42    /// Also runs a periodic tick (1s) to clean up stale handshake connections
43    /// that never received a response. This prevents resource leaks when peers
44    /// are unreachable.
45    ///
46    /// This method takes ownership of the packet_rx channel and runs
47    /// until the channel is closed (typically when stop() is called).
48    pub async fn run_rx_loop(&mut self) -> Result<(), NodeError> {
49        let mut packet_rx = self.packet_rx.take().ok_or(NodeError::NotStarted)?;
50
51        // Take the TUN outbound receiver, or create a dummy channel that never
52        // produces messages (when TUN is disabled). Holding the sender prevents
53        // the channel from closing.
54        let (mut tun_outbound_rx, _tun_guard) = match self.tun_outbound_rx.take() {
55            Some(rx) => (rx, None),
56            None => {
57                let (tx, rx) = tokio::sync::mpsc::channel(1);
58                (rx, Some(tx))
59            }
60        };
61
62        // Take the DNS identity receiver, or create a dummy channel (when DNS
63        // is disabled). Same pattern as TUN outbound.
64        let (mut dns_identity_rx, _dns_guard) = match self.dns_identity_rx.take() {
65            Some(rx) => (rx, None),
66            None => {
67                let (tx, rx) = tokio::sync::mpsc::channel(1);
68                (rx, Some(tx))
69            }
70        };
71
72        // Take the endpoint-data command receiver, or create a dummy channel
73        // when the embedded endpoint API is not in use.
74        let (mut endpoint_command_rx, _endpoint_command_guard) =
75            match self.endpoint_command_rx.take() {
76                Some(rx) => (rx, None),
77                None => {
78                    let (tx, rx) = tokio::sync::mpsc::channel(1);
79                    (rx, Some(tx))
80                }
81            };
82
83        // Take the decrypt worker fallback receiver if a worker pool
84        // is in use. The worker pushes non-fast-path packets (anything
85        // that's not bulk EndpointData) here for the legacy dispatch.
86        let (mut decrypt_fallback_rx, _decrypt_fallback_guard) =
87            match self.decrypt_fallback_rx.take() {
88                Some(rx) => (rx, None),
89                None => {
90                    let (tx, rx) = tokio::sync::mpsc::unbounded_channel();
91                    (rx, Some(tx))
92                }
93            };
94
95        let mut tick =
96            tokio::time::interval(Duration::from_secs(self.config.node.tick_interval_secs));
97
98        // Set up control socket channel
99        let (control_tx, mut control_rx) =
100            tokio::sync::mpsc::channel::<crate::control::ControlMessage>(32);
101
102        if self.config.node.control.enabled {
103            let config = self.config.node.control.clone();
104            let tx = control_tx.clone();
105            tokio::spawn(async move {
106                match ControlSocket::bind(&config) {
107                    Ok(socket) => {
108                        socket.accept_loop(tx).await;
109                    }
110                    Err(e) => {
111                        warn!(error = %e, "Failed to bind control socket");
112                    }
113                }
114            });
115        }
116        // Drop unused sender to avoid keeping channel open if control is disabled
117        drop(control_tx);
118
119        info!("RX event loop started");
120        // Optional perf profiler (FIPS_PERF=1). No-op otherwise.
121        crate::perf_profile::maybe_spawn_reporter();
122
123        loop {
124            tokio::select! {
125                biased;
126                // Decrypt-worker fallback drains FIRST. The previous
127                // ordering put `packet_rx` first, which under sustained
128                // inbound bursts let the raw-packet drain (up to 256
129                // packets + flush) starve fallback work for tens of
130                // milliseconds. UDP throughput tolerates that; TCP
131                // doesn't — late FMP plaintexts mean late ACKs,
132                // dup-ACK fast retransmits, and cwnd collapse.
133                // Reproduced on native macOS / Wi-Fi where TCP fell to
134                // ~10 Mb/s while UDP cleared ~100 Mb/s on the same
135                // tunnel. Promoting fallback gives the kernel-side
136                // TCP machinery a fair chance to see its ACKs and
137                // keep cwnd growing.
138                Some(event) = decrypt_fallback_rx.recv() => {
139                    self.process_decrypt_worker_event(event).await;
140                    self.drain_decrypt_fallback(&mut decrypt_fallback_rx, 255).await;
141                    self.flush_pending_sends().await;
142                }
143                packet = packet_rx.recv() => {
144                    match packet {
145                        Some(p) => {
146                            self.drain_packet_rx(
147                                &mut packet_rx,
148                                &mut decrypt_fallback_rx,
149                                Some(p),
150                                PACKET_DRAIN_BUDGET,
151                            ).await;
152                        }
153                        None => break, // channel closed
154                    }
155                }
156                _ = tick.tick() => {
157                    let drained = self.drain_packet_rx(
158                        &mut packet_rx,
159                        &mut decrypt_fallback_rx,
160                        None,
161                        PACKET_DRAIN_BUDGET,
162                    ).await;
163                    if drained > 0 {
164                        debug!(
165                            drained,
166                            "Drained queued packets before rx-loop maintenance"
167                        );
168                    }
169                    if tokio::time::timeout(
170                        RX_LOOP_MAINTENANCE_TIMEOUT,
171                        self.run_rx_loop_maintenance_tick(),
172                    ).await.is_err() {
173                        self.mark_rx_loop_maintenance_timeout();
174                        warn!(
175                            timeout_ms = RX_LOOP_MAINTENANCE_TIMEOUT.as_millis() as u64,
176                            "RX loop maintenance timed out; continuing packet processing"
177                        );
178                    }
179                }
180                Some(ipv6_packet) = tun_outbound_rx.recv() => {
181                    self.handle_tun_outbound(ipv6_packet).await;
182                    let mut drained = 0;
183                    while drained < 256 {
184                        match tun_outbound_rx.try_recv() {
185                            Ok(p) => {
186                                self.handle_tun_outbound(p).await;
187                                drained += 1;
188                            }
189                            Err(_) => break,
190                        }
191                    }
192                    // Flush any trailing batched sends so the last
193                    // packets of a burst don't sit in the per-transport
194                    // sendmmsg buffer waiting for the threshold.
195                    self.flush_pending_sends().await;
196                }
197                Some(identity) = dns_identity_rx.recv() => {
198                    debug!(
199                        node_addr = %identity.node_addr,
200                        "Registering identity from DNS resolution"
201                    );
202                    self.register_identity(identity.node_addr, identity.pubkey);
203                }
204                Some(command) = endpoint_command_rx.recv() => {
205                    self.handle_endpoint_data_command(command).await;
206                    // Same drain pattern: when the application is shoving
207                    // tunnel data in via send_oneway, several Send commands
208                    // typically queue up between scheduler hops.
209                    let mut drained = 0;
210                    while drained < 256 {
211                        match endpoint_command_rx.try_recv() {
212                            Ok(c) => {
213                                self.handle_endpoint_data_command(c).await;
214                                drained += 1;
215                            }
216                            Err(_) => break,
217                        }
218                    }
219                    // Flush any trailing batched sends from the
220                    // per-transport sendmmsg buffer.
221                    self.flush_pending_sends().await;
222                }
223                Some((request, response_tx)) = control_rx.recv() => {
224                    let response = if request.command.starts_with("show_") {
225                        queries::dispatch(self, &request.command, request.params.as_ref())
226                    } else {
227                        commands::dispatch(
228                            self,
229                            &request.command,
230                            request.params.as_ref(),
231                        ).await
232                    };
233                    let _ = response_tx.send(response);
234                }
235            }
236        }
237
238        info!("RX event loop stopped (channel closed)");
239        Ok(())
240    }
241
242    async fn drain_packet_rx(
243        &mut self,
244        packet_rx: &mut UnboundedReceiver<ReceivedPacket>,
245        decrypt_fallback_rx: &mut UnboundedReceiver<DecryptWorkerEvent>,
246        first_packet: Option<ReceivedPacket>,
247        budget: usize,
248    ) -> usize {
249        let mut drained = 0usize;
250        if let Some(packet) = first_packet {
251            self.process_packet(packet).await;
252            drained = 1;
253        }
254
255        // Drain remaining ready inbound packets in a tight loop before
256        // yielding back to select! Every yield is a scheduler hop, and at
257        // line rate transports typically have several packets available per
258        // wake. Caps at a batch boundary so other branches eventually get a
259        // turn even under sustained load.
260        while drained < budget {
261            if drained > 0 && drained.is_multiple_of(FALLBACK_INTERLEAVE_EVERY) {
262                self.drain_decrypt_fallback(decrypt_fallback_rx, FALLBACK_INTERLEAVE_BUDGET)
263                    .await;
264            }
265            match packet_rx.try_recv() {
266                Ok(packet) => {
267                    self.process_packet(packet).await;
268                    drained += 1;
269                }
270                Err(_) => break,
271            }
272        }
273
274        if drained > 0 {
275            // One trailing fallback drain so the last bounced packets of the
276            // burst aren't held up by the post-burst send flush.
277            self.drain_decrypt_fallback(decrypt_fallback_rx, PACKET_DRAIN_BUDGET)
278                .await;
279            // Flush any batched sends triggered by inbound packets (e.g.
280            // forwarded SessionDatagrams, MMP reports, tree announces).
281            self.flush_pending_sends().await;
282        }
283        drained
284    }
285
286    async fn run_rx_loop_maintenance_tick(&mut self) {
287        self.check_timeouts();
288        let now_ms = Self::now_ms();
289        // Link liveness must run before slower retry/discovery/report work:
290        // this whole maintenance future is guarded by a watchdog timeout, and
291        // under bulk send pressure a late heartbeat is indistinguishable from a
292        // dead direct path on the remote peer.
293        self.check_link_heartbeats().await;
294        self.reload_peer_acl();
295        self.poll_pending_connects().await;
296        self.poll_nostr_discovery().await;
297        self.poll_lan_discovery().await;
298        self.poll_local_instance_discovery().await;
299        self.resend_pending_handshakes(now_ms).await;
300        self.resend_pending_rekeys(now_ms).await;
301        self.resend_pending_session_handshakes(now_ms).await;
302        self.resend_pending_session_msg3(now_ms).await;
303        self.purge_idle_sessions(now_ms);
304        self.purge_learned_routes(now_ms);
305        self.process_pending_retries(now_ms).await;
306        self.check_tree_state().await;
307        self.check_bloom_state().await;
308        self.compute_mesh_size();
309        self.record_stats_history();
310        self.check_mmp_reports().await;
311        self.check_session_mmp_reports().await;
312        self.check_rekey().await;
313        self.check_session_rekey().await;
314        self.check_pending_lookups(now_ms).await;
315        self.poll_transport_discovery().await;
316        self.sample_transport_congestion();
317        self.activate_connected_udp_sessions().await;
318    }
319
320    /// Hand a decrypt-worker fallback to the canonical post-FMP-decrypt
321    /// processor. Reconstructs `ce_flag` / `sp_flag` from the FMP header
322    /// flag byte the worker captured into `DecryptFallback::fmp_flags`
323    /// (without this both ECN CE propagation and spin-bit RTT
324    /// observation are dropped on the worker path) and slices the
325    /// plaintext out of the original wire buffer with zero allocation.
326    async fn process_decrypt_worker_event(&mut self, event: DecryptWorkerEvent) {
327        match event {
328            DecryptWorkerEvent::Plaintext(fallback) => {
329                self.process_decrypt_fallback(fallback).await;
330            }
331            DecryptWorkerEvent::DecryptFailure(report) => {
332                self.process_decrypt_failure_report(report).await;
333            }
334        }
335    }
336
337    async fn process_decrypt_fallback(&mut self, fallback: DecryptFallback) {
338        let ce_flag = fallback.fmp_flags & FLAG_CE != 0;
339        let sp_flag = fallback.fmp_flags & FLAG_SP != 0;
340        let plaintext = &fallback.packet_data[fallback.fmp_plaintext_offset
341            ..fallback.fmp_plaintext_offset + fallback.fmp_plaintext_len];
342        self.process_authentic_fmp_plaintext(
343            &fallback.source_node_addr,
344            fallback.transport_id,
345            &fallback.remote_addr,
346            fallback.timestamp_ms,
347            fallback.packet_len,
348            fallback.fmp_counter,
349            ce_flag,
350            sp_flag,
351            plaintext,
352        )
353        .await;
354    }
355
356    async fn process_decrypt_failure_report(&mut self, report: DecryptFailureReport) {
357        debug!(
358            peer = %self.peer_display_name(&report.source_node_addr),
359            counter = report.fmp_counter,
360            replay_highest = report.fmp_replay_highest,
361            "Worker FMP AEAD decryption failed"
362        );
363        self.handle_decrypt_failure_report(&report).await;
364    }
365
366    /// Drain up to `budget` queued fallbacks without yielding back to
367    /// `select!`. Returns the number processed. Called both from the
368    /// promoted-fallback select arm (after the head item) and
369    /// interleaved inside the packet_rx drain loop so bounced FMP
370    /// plaintexts can't accumulate behind a 256-packet inbound burst.
371    async fn drain_decrypt_fallback(
372        &mut self,
373        rx: &mut UnboundedReceiver<DecryptWorkerEvent>,
374        budget: usize,
375    ) -> usize {
376        let mut drained = 0;
377        while drained < budget {
378            match rx.try_recv() {
379                Ok(event) => {
380                    self.process_decrypt_worker_event(event).await;
381                    drained += 1;
382                }
383                Err(_) => break,
384            }
385        }
386        drained
387    }
388
389    /// Flush any pending batched sends across all transports. Today
390    /// every transport's `flush_pending_send` is a no-op — the UDP
391    /// transport's per-transport `pending_send` buffer was removed
392    /// when the bulk data path moved into `encrypt_worker` (which
393    /// does its own target-grouped `sendmmsg(2)` directly). The
394    /// call sites are retained so any future batched transport can
395    /// opt in by overriding `flush_pending_send` without touching
396    /// the rx_loop.
397    async fn flush_pending_sends(&self) {
398        for transport in self.transports.values() {
399            if matches!(transport, TransportHandle::Udp(_)) {
400                transport.flush_pending_send().await;
401            }
402        }
403    }
404
405    /// Process a single received packet.
406    ///
407    /// Dispatches based on the phase field in the 4-byte common prefix.
408    pub(in crate::node) async fn process_packet(&mut self, packet: ReceivedPacket) {
409        let _t_total = crate::perf_profile::Timer::start(crate::perf_profile::Stage::ProcessPacket);
410        crate::perf_profile::record_since(
411            crate::perf_profile::Stage::TransportQueueWait,
412            packet.trace_enqueued_at,
413        );
414        if is_punch_packet(&packet.data) {
415            trace!(
416                transport_id = %packet.transport_id,
417                remote_addr = %packet.remote_addr,
418                bytes = packet.data.len(),
419                "Dropping stray punch probe/ack in FMP rx loop"
420            );
421            return;
422        }
423        if packet.data.len() < COMMON_PREFIX_SIZE {
424            return; // Drop packets too short for common prefix
425        }
426
427        let prefix = match CommonPrefix::parse(&packet.data) {
428            Some(p) => p,
429            None => return, // Malformed prefix
430        };
431        if matches!(prefix.phase, PHASE_MSG1 | PHASE_MSG2) {
432            debug!(
433                transport_id = %packet.transport_id,
434                remote_addr = %packet.remote_addr,
435                bytes = packet.data.len(),
436                phase = prefix.phase,
437                version = prefix.version,
438                "FMP handshake packet dispatch"
439            );
440        } else {
441            trace!(
442                transport_id = %packet.transport_id,
443                remote_addr = %packet.remote_addr,
444                bytes = packet.data.len(),
445                phase = prefix.phase,
446                version = prefix.version,
447                "FMP packet dispatch"
448            );
449        }
450
451        if prefix.version != FMP_VERSION {
452            debug!(
453                version = prefix.version,
454                transport_id = %packet.transport_id,
455                "Unknown FMP version, dropping"
456            );
457
458            // If the packet arrived on an adopted Nostr-NAT bootstrap
459            // transport, the originating peer is necessarily on a
460            // different FMP-protocol version than us — the discovery
461            // sweep would otherwise re-traverse them every cycle even
462            // though no msg1/msg2 exchange can ever succeed. Bump the
463            // discovery-layer cooldown to the long protocol-mismatch
464            // window and emit a single WARN per fresh observation.
465            let looks_like_fmp_phase =
466                matches!(prefix.phase, PHASE_ESTABLISHED | PHASE_MSG1 | PHASE_MSG2);
467            if looks_like_fmp_phase
468                && self.bootstrap_transports.contains(&packet.transport_id)
469                && let Some(npub) = self
470                    .bootstrap_transport_npubs
471                    .get(&packet.transport_id)
472                    .cloned()
473                && let Some(handle) = self.nostr_discovery_handle()
474            {
475                let now_ms = Self::now_ms();
476                let cooldown_secs = handle.protocol_mismatch_cooldown_secs();
477                if handle.record_protocol_mismatch(&npub, now_ms) {
478                    warn!(
479                        peer_npub = %npub,
480                        transport_id = %packet.transport_id,
481                        peer_version = prefix.version,
482                        our_version = FMP_VERSION,
483                        cooldown_secs,
484                        "Nostr-discovered peer speaks a different FMP version; suppressing retraversal"
485                    );
486                }
487            }
488            return;
489        }
490
491        match prefix.phase {
492            PHASE_ESTABLISHED => {
493                self.handle_encrypted_frame(packet).await;
494            }
495            PHASE_MSG1 => {
496                self.handle_msg1(packet).await;
497            }
498            PHASE_MSG2 => {
499                self.handle_msg2(packet).await;
500            }
501            _ => {
502                debug!(
503                    phase = prefix.phase,
504                    transport_id = %packet.transport_id,
505                    "Unknown FMP phase, dropping"
506                );
507            }
508        }
509    }
510}