Skip to main content

fips_core/node/handlers/
rx_loop.rs

1//! RX event loop and packet dispatch.
2
3use crate::control::queries;
4use crate::control::{ControlSocket, commands};
5use crate::discovery::is_punch_packet;
6use crate::node::decrypt_worker::{DecryptFailureReport, DecryptFallback, DecryptWorkerEvent};
7use crate::node::wire::{
8    COMMON_PREFIX_SIZE, CommonPrefix, FLAG_CE, FLAG_SP, FMP_VERSION, PHASE_ESTABLISHED, PHASE_MSG1,
9    PHASE_MSG2,
10};
11use crate::node::{Node, NodeError};
12use crate::transport::ReceivedPacket;
13use crate::transport::TransportHandle;
14use std::time::Duration;
15use tokio::sync::mpsc::UnboundedReceiver;
16use tracing::{debug, info, trace, warn};
17
18/// How often the raw-packet drain loop yields a slice of work to the
19/// decrypt-fallback drain. Keeps TCP ACK / heartbeat / handshake
20/// progress steady under sustained inbound bursts.
// Unit: raw packets processed between two interleaved fallback drains.
21const FALLBACK_INTERLEAVE_EVERY: usize = 32;
22/// Cap on the per-interleave fallback drain so a hot inbound spike
23/// can't starve the outer raw-packet drain in the opposite direction.
24const FALLBACK_INTERLEAVE_BUDGET: usize = 64;
/// Maximum number of raw packets one `drain_packet_rx` call handles before
/// control returns to the `select!` loop. The same value caps that call's
/// trailing decrypt-fallback drain.
25const PACKET_DRAIN_BUDGET: usize = 256;
/// Deadline for one pass of `run_rx_loop_slow_maintenance_tick`. When it
/// elapses the rx loop records the timeout, logs a warning and carries on
/// with packet processing.
26const RX_LOOP_SLOW_MAINTENANCE_TIMEOUT: Duration = Duration::from_millis(100);
27
28impl Node {
29    /// Run the receive event loop.
30    ///
31    /// Processes packets from all transports, dispatching based on
32    /// the phase field in the 4-byte common prefix:
33    /// - Phase 0x0: Encrypted frame (session data)
34    /// - Phase 0x1: Handshake message 1 (initiator -> responder)
35    /// - Phase 0x2: Handshake message 2 (responder -> initiator)
36    ///
37    /// Also processes outbound IPv6 packets from the TUN reader for session
38    /// encapsulation and routing through the mesh.
39    ///
40    /// Also processes DNS-resolved identities for identity cache population.
41    ///
42    /// Also runs a periodic tick (1s) to clean up stale handshake connections
43    /// that never received a response. This prevents resource leaks when peers
44    /// are unreachable.
45    ///
46    /// This method takes ownership of the packet_rx channel and runs
47    /// until the channel is closed (typically when stop() is called).
48    pub async fn run_rx_loop(&mut self) -> Result<(), NodeError> {
49        let mut packet_rx = self.packet_rx.take().ok_or(NodeError::NotStarted)?;
50
51        // Take the TUN outbound receiver, or create a dummy channel that never
52        // produces messages (when TUN is disabled). Holding the sender prevents
53        // the channel from closing.
54        let (mut tun_outbound_rx, _tun_guard) = match self.tun_outbound_rx.take() {
55            Some(rx) => (rx, None),
56            None => {
57                let (tx, rx) = tokio::sync::mpsc::channel(1);
58                (rx, Some(tx))
59            }
60        };
61
62        // Take the DNS identity receiver, or create a dummy channel (when DNS
63        // is disabled). Same pattern as TUN outbound.
64        let (mut dns_identity_rx, _dns_guard) = match self.dns_identity_rx.take() {
65            Some(rx) => (rx, None),
66            None => {
67                let (tx, rx) = tokio::sync::mpsc::channel(1);
68                (rx, Some(tx))
69            }
70        };
71
72        // Take the endpoint-data command receiver, or create a dummy channel
73        // when the embedded endpoint API is not in use.
74        let (mut endpoint_command_rx, _endpoint_command_guard) =
75            match self.endpoint_command_rx.take() {
76                Some(rx) => (rx, None),
77                None => {
78                    let (tx, rx) = tokio::sync::mpsc::channel(1);
79                    (rx, Some(tx))
80                }
81            };
82
83        // Take the decrypt worker fallback receiver if a worker pool
84        // is in use. The worker pushes non-fast-path packets (anything
85        // that's not bulk EndpointData) here for the legacy dispatch.
86        let (mut decrypt_fallback_rx, _decrypt_fallback_guard) =
87            match self.decrypt_fallback_rx.take() {
88                Some(rx) => (rx, None),
89                None => {
90                    let (tx, rx) = tokio::sync::mpsc::unbounded_channel();
91                    (rx, Some(tx))
92                }
93            };
94
95        let mut tick =
96            tokio::time::interval(Duration::from_secs(self.config.node.tick_interval_secs));
97
98        // Set up control socket channel
99        let (control_tx, mut control_rx) =
100            tokio::sync::mpsc::channel::<crate::control::ControlMessage>(32);
101
102        if self.config.node.control.enabled {
103            let config = self.config.node.control.clone();
104            let tx = control_tx.clone();
105            tokio::spawn(async move {
106                match ControlSocket::bind(&config) {
107                    Ok(socket) => {
108                        socket.accept_loop(tx).await;
109                    }
110                    Err(e) => {
111                        warn!(error = %e, "Failed to bind control socket");
112                    }
113                }
114            });
115        }
116        // Drop unused sender to avoid keeping channel open if control is disabled
117        drop(control_tx);
118
119        info!("RX event loop started");
120        // Optional perf profiler (FIPS_PERF=1). No-op otherwise.
121        crate::perf_profile::maybe_spawn_reporter();
122
123        loop {
124            tokio::select! {
125                biased;
126                // Decrypt-worker fallback drains FIRST. The previous
127                // ordering put `packet_rx` first, which under sustained
128                // inbound bursts let the raw-packet drain (up to 256
129                // packets + flush) starve fallback work for tens of
130                // milliseconds. UDP throughput tolerates that; TCP
131                // doesn't — late FMP plaintexts mean late ACKs,
132                // dup-ACK fast retransmits, and cwnd collapse.
133                // Reproduced on native macOS / Wi-Fi where TCP fell to
134                // ~10 Mb/s while UDP cleared ~100 Mb/s on the same
135                // tunnel. Promoting fallback gives the kernel-side
136                // TCP machinery a fair chance to see its ACKs and
137                // keep cwnd growing.
138                Some(event) = decrypt_fallback_rx.recv() => {
139                    self.process_decrypt_worker_event(event).await;
140                    self.drain_decrypt_fallback(&mut decrypt_fallback_rx, 255).await;
141                    self.flush_pending_sends().await;
142                }
143                packet = packet_rx.recv() => {
144                    match packet {
145                        Some(p) => {
146                            self.drain_packet_rx(
147                                &mut packet_rx,
148                                &mut decrypt_fallback_rx,
149                                Some(p),
150                                PACKET_DRAIN_BUDGET,
151                            ).await;
152                        }
153                        None => break, // channel closed
154                    }
155                }
156                _ = tick.tick() => {
157                    let drained = self.drain_packet_rx(
158                        &mut packet_rx,
159                        &mut decrypt_fallback_rx,
160                        None,
161                        PACKET_DRAIN_BUDGET,
162                    ).await;
163                    if drained > 0 {
164                        debug!(
165                            drained,
166                            "Drained queued packets before rx-loop maintenance"
167                        );
168                    }
169                    self.run_rx_loop_maintenance_tick().await;
170                }
171                Some(ipv6_packet) = tun_outbound_rx.recv() => {
172                    self.handle_tun_outbound(ipv6_packet).await;
173                    let mut drained = 0;
174                    while drained < 256 {
175                        match tun_outbound_rx.try_recv() {
176                            Ok(p) => {
177                                self.handle_tun_outbound(p).await;
178                                drained += 1;
179                            }
180                            Err(_) => break,
181                        }
182                    }
183                    // Flush any trailing batched sends so the last
184                    // packets of a burst don't sit in the per-transport
185                    // sendmmsg buffer waiting for the threshold.
186                    self.flush_pending_sends().await;
187                }
188                Some(identity) = dns_identity_rx.recv() => {
189                    debug!(
190                        node_addr = %identity.node_addr,
191                        "Registering identity from DNS resolution"
192                    );
193                    self.register_identity(identity.node_addr, identity.pubkey);
194                }
195                Some(command) = endpoint_command_rx.recv() => {
196                    self.handle_endpoint_data_command(command).await;
197                    // Same drain pattern: when the application is shoving
198                    // tunnel data in via send_oneway, several Send commands
199                    // typically queue up between scheduler hops.
200                    let mut drained = 0;
201                    while drained < 256 {
202                        match endpoint_command_rx.try_recv() {
203                            Ok(c) => {
204                                self.handle_endpoint_data_command(c).await;
205                                drained += 1;
206                            }
207                            Err(_) => break,
208                        }
209                    }
210                    // Flush any trailing batched sends from the
211                    // per-transport sendmmsg buffer.
212                    self.flush_pending_sends().await;
213                }
214                Some((request, response_tx)) = control_rx.recv() => {
215                    let response = if request.command.starts_with("show_") {
216                        queries::dispatch(self, &request.command, request.params.as_ref())
217                    } else {
218                        commands::dispatch(
219                            self,
220                            &request.command,
221                            request.params.as_ref(),
222                        ).await
223                    };
224                    let _ = response_tx.send(response);
225                }
226            }
227        }
228
229        info!("RX event loop stopped (channel closed)");
230        Ok(())
231    }
232
233    async fn drain_packet_rx(
234        &mut self,
235        packet_rx: &mut UnboundedReceiver<ReceivedPacket>,
236        decrypt_fallback_rx: &mut UnboundedReceiver<DecryptWorkerEvent>,
237        first_packet: Option<ReceivedPacket>,
238        budget: usize,
239    ) -> usize {
240        let mut drained = 0usize;
241        if let Some(packet) = first_packet {
242            self.process_packet(packet).await;
243            drained = 1;
244        }
245
246        // Drain remaining ready inbound packets in a tight loop before
247        // yielding back to select! Every yield is a scheduler hop, and at
248        // line rate transports typically have several packets available per
249        // wake. Caps at a batch boundary so other branches eventually get a
250        // turn even under sustained load.
251        while drained < budget {
252            if drained > 0 && drained.is_multiple_of(FALLBACK_INTERLEAVE_EVERY) {
253                self.drain_decrypt_fallback(decrypt_fallback_rx, FALLBACK_INTERLEAVE_BUDGET)
254                    .await;
255            }
256            match packet_rx.try_recv() {
257                Ok(packet) => {
258                    self.process_packet(packet).await;
259                    drained += 1;
260                }
261                Err(_) => break,
262            }
263        }
264
265        if drained > 0 {
266            // One trailing fallback drain so the last bounced packets of the
267            // burst aren't held up by the post-burst send flush.
268            self.drain_decrypt_fallback(decrypt_fallback_rx, PACKET_DRAIN_BUDGET)
269                .await;
270            // Flush any batched sends triggered by inbound packets (e.g.
271            // forwarded SessionDatagrams, MMP reports, tree announces).
272            self.flush_pending_sends().await;
273        }
274        drained
275    }
276
277    async fn run_rx_loop_maintenance_tick(&mut self) {
278        self.check_timeouts();
279        let now_ms = Self::now_ms();
280        // Link/session liveness must run before slower retry/discovery work:
281        // under bulk send pressure a late heartbeat or MMP report is
282        // indistinguishable from a dead direct path on the remote peer.
283        self.check_link_heartbeats().await;
284        self.reload_peer_acl();
285        self.resend_pending_handshakes(now_ms).await;
286        self.resend_pending_rekeys(now_ms).await;
287        self.resend_pending_session_handshakes(now_ms).await;
288        self.resend_pending_session_msg3(now_ms).await;
289        self.purge_idle_sessions(now_ms);
290        self.purge_learned_routes(now_ms);
291        self.check_mmp_reports().await;
292        self.check_session_mmp_reports().await;
293        self.check_rekey().await;
294        self.check_session_rekey().await;
295        self.check_pending_lookups(now_ms).await;
296        self.poll_pending_connects().await;
297        self.process_pending_retries(now_ms).await;
298        self.poll_transport_discovery().await;
299        self.activate_connected_udp_sessions().await;
300        self.sample_transport_congestion();
301
302        if tokio::time::timeout(
303            RX_LOOP_SLOW_MAINTENANCE_TIMEOUT,
304            self.run_rx_loop_slow_maintenance_tick(),
305        )
306        .await
307        .is_err()
308        {
309            self.mark_rx_loop_maintenance_timeout();
310            warn!(
311                timeout_ms = RX_LOOP_SLOW_MAINTENANCE_TIMEOUT.as_millis() as u64,
312                "RX loop slow maintenance timed out; continuing packet processing"
313            );
314        }
315    }
316
    /// Slower half of the maintenance tick: discovery polling, tree and
    /// bloom state checks, mesh-size computation and stats history.
    ///
    /// Called from `run_rx_loop_maintenance_tick` wrapped in a
    /// `RX_LOOP_SLOW_MAINTENANCE_TIMEOUT` deadline, so a pass is not
    /// guaranteed to run to completion.
    // NOTE(review): because the caller may give up on this future when the
    // deadline passes, each awaited step should tolerate being abandoned
    // part-way — confirm for the discovery/tree/bloom helpers.
317    async fn run_rx_loop_slow_maintenance_tick(&mut self) {
318        // Discovery and graph/stat maintenance can involve relay work or
319        // larger scans. Keep it bounded after direct-path liveness and session
320        // upkeep so a slow Nostr/LAN tick degrades discovery freshness, not
321        // packet flow.
322        self.poll_nostr_discovery().await;
323        self.poll_lan_discovery().await;
324        self.poll_local_instance_discovery().await;
325        self.check_tree_state().await;
326        self.check_bloom_state().await;
327        self.compute_mesh_size();
328        self.record_stats_history();
329    }
330
331    /// Hand a decrypt-worker fallback to the canonical post-FMP-decrypt
332    /// processor. Reconstructs `ce_flag` / `sp_flag` from the FMP header
333    /// flag byte the worker captured into `DecryptFallback::fmp_flags`
334    /// (without this both ECN CE propagation and spin-bit RTT
335    /// observation are dropped on the worker path) and slices the
336    /// plaintext out of the original wire buffer with zero allocation.
337    async fn process_decrypt_worker_event(&mut self, event: DecryptWorkerEvent) {
338        match event {
339            DecryptWorkerEvent::Plaintext(fallback) => {
340                self.process_decrypt_fallback(fallback).await;
341            }
342            DecryptWorkerEvent::DecryptFailure(report) => {
343                self.process_decrypt_failure_report(report).await;
344            }
345        }
346    }
347
348    async fn process_decrypt_fallback(&mut self, fallback: DecryptFallback) {
349        let ce_flag = fallback.fmp_flags & FLAG_CE != 0;
350        let sp_flag = fallback.fmp_flags & FLAG_SP != 0;
351        let plaintext = &fallback.packet_data[fallback.fmp_plaintext_offset
352            ..fallback.fmp_plaintext_offset + fallback.fmp_plaintext_len];
353        self.process_authentic_fmp_plaintext(
354            &fallback.source_node_addr,
355            fallback.transport_id,
356            &fallback.remote_addr,
357            fallback.timestamp_ms,
358            fallback.packet_len,
359            fallback.fmp_counter,
360            ce_flag,
361            sp_flag,
362            plaintext,
363        )
364        .await;
365    }
366
    /// Handle a decrypt-failure report sent by a decrypt worker.
    ///
    /// Logs the peer's display name, the reported `fmp_counter` and the
    /// reported `fmp_replay_highest` at debug level, then forwards the
    /// report to `handle_decrypt_failure_report`.
367    async fn process_decrypt_failure_report(&mut self, report: DecryptFailureReport) {
        // The field expressions sit inside `debug!` so the display-name
        // lookup is only paid for when debug logging is enabled.
368        debug!(
369            peer = %self.peer_display_name(&report.source_node_addr),
370            counter = report.fmp_counter,
371            replay_highest = report.fmp_replay_highest,
372            "Worker FMP AEAD decryption failed"
373        );
374        self.handle_decrypt_failure_report(&report).await;
375    }
376
377    /// Drain up to `budget` queued fallbacks without yielding back to
378    /// `select!`. Returns the number processed. Called both from the
379    /// promoted-fallback select arm (after the head item) and
380    /// interleaved inside the packet_rx drain loop so bounced FMP
381    /// plaintexts can't accumulate behind a 256-packet inbound burst.
382    async fn drain_decrypt_fallback(
383        &mut self,
384        rx: &mut UnboundedReceiver<DecryptWorkerEvent>,
385        budget: usize,
386    ) -> usize {
387        let mut drained = 0;
388        while drained < budget {
389            match rx.try_recv() {
390                Ok(event) => {
391                    self.process_decrypt_worker_event(event).await;
392                    drained += 1;
393                }
394                Err(_) => break,
395            }
396        }
397        drained
398    }
399
400    /// Flush any pending batched sends across all transports. Today
401    /// every transport's `flush_pending_send` is a no-op — the UDP
402    /// transport's per-transport `pending_send` buffer was removed
403    /// when the bulk data path moved into `encrypt_worker` (which
404    /// does its own target-grouped `sendmmsg(2)` directly). The
405    /// call sites are retained so any future batched transport can
406    /// opt in by overriding `flush_pending_send` without touching
407    /// the rx_loop.
408    async fn flush_pending_sends(&self) {
409        for transport in self.transports.values() {
410            if matches!(transport, TransportHandle::Udp(_)) {
411                transport.flush_pending_send().await;
412            }
413        }
414    }
415
416    /// Process a single received packet.
417    ///
418    /// Dispatches based on the phase field in the 4-byte common prefix.
419    pub(in crate::node) async fn process_packet(&mut self, packet: ReceivedPacket) {
420        let _t_total = crate::perf_profile::Timer::start(crate::perf_profile::Stage::ProcessPacket);
421        crate::perf_profile::record_since(
422            crate::perf_profile::Stage::TransportQueueWait,
423            packet.trace_enqueued_at,
424        );
425        if is_punch_packet(&packet.data) {
426            trace!(
427                transport_id = %packet.transport_id,
428                remote_addr = %packet.remote_addr,
429                bytes = packet.data.len(),
430                "Dropping stray punch probe/ack in FMP rx loop"
431            );
432            return;
433        }
434        if packet.data.len() < COMMON_PREFIX_SIZE {
435            return; // Drop packets too short for common prefix
436        }
437
438        let prefix = match CommonPrefix::parse(&packet.data) {
439            Some(p) => p,
440            None => return, // Malformed prefix
441        };
442        if matches!(prefix.phase, PHASE_MSG1 | PHASE_MSG2) {
443            debug!(
444                transport_id = %packet.transport_id,
445                remote_addr = %packet.remote_addr,
446                bytes = packet.data.len(),
447                phase = prefix.phase,
448                version = prefix.version,
449                "FMP handshake packet dispatch"
450            );
451        } else {
452            trace!(
453                transport_id = %packet.transport_id,
454                remote_addr = %packet.remote_addr,
455                bytes = packet.data.len(),
456                phase = prefix.phase,
457                version = prefix.version,
458                "FMP packet dispatch"
459            );
460        }
461
462        if prefix.version != FMP_VERSION {
463            debug!(
464                version = prefix.version,
465                transport_id = %packet.transport_id,
466                "Unknown FMP version, dropping"
467            );
468
469            // If the packet arrived on an adopted Nostr-NAT bootstrap
470            // transport, the originating peer is necessarily on a
471            // different FMP-protocol version than us — the discovery
472            // sweep would otherwise re-traverse them every cycle even
473            // though no msg1/msg2 exchange can ever succeed. Bump the
474            // discovery-layer cooldown to the long protocol-mismatch
475            // window and emit a single WARN per fresh observation.
476            let looks_like_fmp_phase =
477                matches!(prefix.phase, PHASE_ESTABLISHED | PHASE_MSG1 | PHASE_MSG2);
478            if looks_like_fmp_phase
479                && self.bootstrap_transports.contains(&packet.transport_id)
480                && let Some(npub) = self
481                    .bootstrap_transport_npubs
482                    .get(&packet.transport_id)
483                    .cloned()
484                && let Some(handle) = self.nostr_discovery_handle()
485            {
486                let now_ms = Self::now_ms();
487                let cooldown_secs = handle.protocol_mismatch_cooldown_secs();
488                if handle.record_protocol_mismatch(&npub, now_ms) {
489                    warn!(
490                        peer_npub = %npub,
491                        transport_id = %packet.transport_id,
492                        peer_version = prefix.version,
493                        our_version = FMP_VERSION,
494                        cooldown_secs,
495                        "Nostr-discovered peer speaks a different FMP version; suppressing retraversal"
496                    );
497                }
498            }
499            return;
500        }
501
502        match prefix.phase {
503            PHASE_ESTABLISHED => {
504                self.handle_encrypted_frame(packet).await;
505            }
506            PHASE_MSG1 => {
507                self.handle_msg1(packet).await;
508            }
509            PHASE_MSG2 => {
510                self.handle_msg2(packet).await;
511            }
512            _ => {
513                debug!(
514                    phase = prefix.phase,
515                    transport_id = %packet.transport_id,
516                    "Unknown FMP phase, dropping"
517                );
518            }
519        }
520    }
521}