Skip to main content

fips_core/node/handlers/
rx_loop.rs

1//! RX event loop and packet dispatch.
2
3use crate::control::queries;
4use crate::control::{ControlSocket, commands};
5use crate::discovery::is_punch_packet;
6use crate::node::decrypt_worker::{DecryptFailureReport, DecryptFallback, DecryptWorkerEvent};
7use crate::node::wire::{
8    COMMON_PREFIX_SIZE, CommonPrefix, FLAG_CE, FLAG_SP, FMP_VERSION, PHASE_ESTABLISHED, PHASE_MSG1,
9    PHASE_MSG2,
10};
11use crate::node::{Node, NodeError};
12use crate::transport::ReceivedPacket;
13use crate::transport::TransportHandle;
14use std::time::Duration;
15use tokio::sync::mpsc::UnboundedReceiver;
16use tracing::{debug, info, trace, warn};
17
/// How often the raw-packet drain loop yields a slice of work to the
/// decrypt-fallback drain, counted in raw packets processed. Keeps
/// TCP ACK / heartbeat / handshake progress steady under sustained
/// inbound bursts.
const FALLBACK_INTERLEAVE_EVERY: usize = 32;
/// Cap on the per-interleave fallback drain so a hot inbound spike
/// can't starve the outer raw-packet drain in the opposite direction.
const FALLBACK_INTERLEAVE_BUDGET: usize = 64;
/// Maximum number of raw inbound packets handled by one
/// `drain_packet_rx` call before control returns to `select!`. Also
/// reused as the budget of that function's trailing fallback drain.
const PACKET_DRAIN_BUDGET: usize = 256;
/// Upper bound on a single maintenance pass in the RX loop. When it
/// elapses the pass is abandoned, a warning is logged, and packet
/// processing continues.
const RX_LOOP_MAINTENANCE_TIMEOUT: Duration = Duration::from_millis(500);
27
28impl Node {
29    /// Run the receive event loop.
30    ///
31    /// Processes packets from all transports, dispatching based on
32    /// the phase field in the 4-byte common prefix:
33    /// - Phase 0x0: Encrypted frame (session data)
34    /// - Phase 0x1: Handshake message 1 (initiator -> responder)
35    /// - Phase 0x2: Handshake message 2 (responder -> initiator)
36    ///
37    /// Also processes outbound IPv6 packets from the TUN reader for session
38    /// encapsulation and routing through the mesh.
39    ///
40    /// Also processes DNS-resolved identities for identity cache population.
41    ///
42    /// Also runs a periodic tick (1s) to clean up stale handshake connections
43    /// that never received a response. This prevents resource leaks when peers
44    /// are unreachable.
45    ///
46    /// This method takes ownership of the packet_rx channel and runs
47    /// until the channel is closed (typically when stop() is called).
48    pub async fn run_rx_loop(&mut self) -> Result<(), NodeError> {
49        let mut packet_rx = self.packet_rx.take().ok_or(NodeError::NotStarted)?;
50
51        // Take the TUN outbound receiver, or create a dummy channel that never
52        // produces messages (when TUN is disabled). Holding the sender prevents
53        // the channel from closing.
54        let (mut tun_outbound_rx, _tun_guard) = match self.tun_outbound_rx.take() {
55            Some(rx) => (rx, None),
56            None => {
57                let (tx, rx) = tokio::sync::mpsc::channel(1);
58                (rx, Some(tx))
59            }
60        };
61
62        // Take the DNS identity receiver, or create a dummy channel (when DNS
63        // is disabled). Same pattern as TUN outbound.
64        let (mut dns_identity_rx, _dns_guard) = match self.dns_identity_rx.take() {
65            Some(rx) => (rx, None),
66            None => {
67                let (tx, rx) = tokio::sync::mpsc::channel(1);
68                (rx, Some(tx))
69            }
70        };
71
72        // Take the endpoint-data command receiver, or create a dummy channel
73        // when the embedded endpoint API is not in use.
74        let (mut endpoint_command_rx, _endpoint_command_guard) =
75            match self.endpoint_command_rx.take() {
76                Some(rx) => (rx, None),
77                None => {
78                    let (tx, rx) = tokio::sync::mpsc::channel(1);
79                    (rx, Some(tx))
80                }
81            };
82
83        // Take the decrypt worker fallback receiver if a worker pool
84        // is in use. The worker pushes non-fast-path packets (anything
85        // that's not bulk EndpointData) here for the legacy dispatch.
86        let (mut decrypt_fallback_rx, _decrypt_fallback_guard) =
87            match self.decrypt_fallback_rx.take() {
88                Some(rx) => (rx, None),
89                None => {
90                    let (tx, rx) = tokio::sync::mpsc::unbounded_channel();
91                    (rx, Some(tx))
92                }
93            };
94
95        let mut tick =
96            tokio::time::interval(Duration::from_secs(self.config.node.tick_interval_secs));
97
98        // Set up control socket channel
99        let (control_tx, mut control_rx) =
100            tokio::sync::mpsc::channel::<crate::control::ControlMessage>(32);
101
102        if self.config.node.control.enabled {
103            let config = self.config.node.control.clone();
104            let tx = control_tx.clone();
105            tokio::spawn(async move {
106                match ControlSocket::bind(&config) {
107                    Ok(socket) => {
108                        socket.accept_loop(tx).await;
109                    }
110                    Err(e) => {
111                        warn!(error = %e, "Failed to bind control socket");
112                    }
113                }
114            });
115        }
116        // Drop unused sender to avoid keeping channel open if control is disabled
117        drop(control_tx);
118
119        info!("RX event loop started");
120        // Optional perf profiler (FIPS_PERF=1). No-op otherwise.
121        crate::perf_profile::maybe_spawn_reporter();
122
123        loop {
124            tokio::select! {
125                biased;
126                // Decrypt-worker fallback drains FIRST. The previous
127                // ordering put `packet_rx` first, which under sustained
128                // inbound bursts let the raw-packet drain (up to 256
129                // packets + flush) starve fallback work for tens of
130                // milliseconds. UDP throughput tolerates that; TCP
131                // doesn't — late FMP plaintexts mean late ACKs,
132                // dup-ACK fast retransmits, and cwnd collapse.
133                // Reproduced on native macOS / Wi-Fi where TCP fell to
134                // ~10 Mb/s while UDP cleared ~100 Mb/s on the same
135                // tunnel. Promoting fallback gives the kernel-side
136                // TCP machinery a fair chance to see its ACKs and
137                // keep cwnd growing.
138                Some(event) = decrypt_fallback_rx.recv() => {
139                    self.process_decrypt_worker_event(event).await;
140                    self.drain_decrypt_fallback(&mut decrypt_fallback_rx, 255).await;
141                    self.flush_pending_sends().await;
142                }
143                packet = packet_rx.recv() => {
144                    match packet {
145                        Some(p) => {
146                            self.drain_packet_rx(
147                                &mut packet_rx,
148                                &mut decrypt_fallback_rx,
149                                Some(p),
150                                PACKET_DRAIN_BUDGET,
151                            ).await;
152                        }
153                        None => break, // channel closed
154                    }
155                }
156                _ = tick.tick() => {
157                    let drained = self.drain_packet_rx(
158                        &mut packet_rx,
159                        &mut decrypt_fallback_rx,
160                        None,
161                        PACKET_DRAIN_BUDGET,
162                    ).await;
163                    if drained > 0 {
164                        debug!(
165                            drained,
166                            "Drained queued packets before rx-loop maintenance"
167                        );
168                    }
169                    if tokio::time::timeout(
170                        RX_LOOP_MAINTENANCE_TIMEOUT,
171                        self.run_rx_loop_maintenance_tick(),
172                    ).await.is_err() {
173                        warn!(
174                            timeout_ms = RX_LOOP_MAINTENANCE_TIMEOUT.as_millis() as u64,
175                            "RX loop maintenance timed out; continuing packet processing"
176                        );
177                    }
178                }
179                Some(ipv6_packet) = tun_outbound_rx.recv() => {
180                    self.handle_tun_outbound(ipv6_packet).await;
181                    let mut drained = 0;
182                    while drained < 256 {
183                        match tun_outbound_rx.try_recv() {
184                            Ok(p) => {
185                                self.handle_tun_outbound(p).await;
186                                drained += 1;
187                            }
188                            Err(_) => break,
189                        }
190                    }
191                    // Flush any trailing batched sends so the last
192                    // packets of a burst don't sit in the per-transport
193                    // sendmmsg buffer waiting for the threshold.
194                    self.flush_pending_sends().await;
195                }
196                Some(identity) = dns_identity_rx.recv() => {
197                    debug!(
198                        node_addr = %identity.node_addr,
199                        "Registering identity from DNS resolution"
200                    );
201                    self.register_identity(identity.node_addr, identity.pubkey);
202                }
203                Some(command) = endpoint_command_rx.recv() => {
204                    self.handle_endpoint_data_command(command).await;
205                    // Same drain pattern: when the application is shoving
206                    // tunnel data in via send_oneway, several Send commands
207                    // typically queue up between scheduler hops.
208                    let mut drained = 0;
209                    while drained < 256 {
210                        match endpoint_command_rx.try_recv() {
211                            Ok(c) => {
212                                self.handle_endpoint_data_command(c).await;
213                                drained += 1;
214                            }
215                            Err(_) => break,
216                        }
217                    }
218                    // Flush any trailing batched sends from the
219                    // per-transport sendmmsg buffer.
220                    self.flush_pending_sends().await;
221                }
222                Some((request, response_tx)) = control_rx.recv() => {
223                    let response = if request.command.starts_with("show_") {
224                        queries::dispatch(self, &request.command, request.params.as_ref())
225                    } else {
226                        commands::dispatch(
227                            self,
228                            &request.command,
229                            request.params.as_ref(),
230                        ).await
231                    };
232                    let _ = response_tx.send(response);
233                }
234            }
235        }
236
237        info!("RX event loop stopped (channel closed)");
238        Ok(())
239    }
240
241    async fn drain_packet_rx(
242        &mut self,
243        packet_rx: &mut UnboundedReceiver<ReceivedPacket>,
244        decrypt_fallback_rx: &mut UnboundedReceiver<DecryptWorkerEvent>,
245        first_packet: Option<ReceivedPacket>,
246        budget: usize,
247    ) -> usize {
248        let mut drained = 0usize;
249        if let Some(packet) = first_packet {
250            self.process_packet(packet).await;
251            drained = 1;
252        }
253
254        // Drain remaining ready inbound packets in a tight loop before
255        // yielding back to select! Every yield is a scheduler hop, and at
256        // line rate transports typically have several packets available per
257        // wake. Caps at a batch boundary so other branches eventually get a
258        // turn even under sustained load.
259        while drained < budget {
260            if drained > 0 && drained.is_multiple_of(FALLBACK_INTERLEAVE_EVERY) {
261                self.drain_decrypt_fallback(decrypt_fallback_rx, FALLBACK_INTERLEAVE_BUDGET)
262                    .await;
263            }
264            match packet_rx.try_recv() {
265                Ok(packet) => {
266                    self.process_packet(packet).await;
267                    drained += 1;
268                }
269                Err(_) => break,
270            }
271        }
272
273        if drained > 0 {
274            // One trailing fallback drain so the last bounced packets of the
275            // burst aren't held up by the post-burst send flush.
276            self.drain_decrypt_fallback(decrypt_fallback_rx, PACKET_DRAIN_BUDGET)
277                .await;
278            // Flush any batched sends triggered by inbound packets (e.g.
279            // forwarded SessionDatagrams, MMP reports, tree announces).
280            self.flush_pending_sends().await;
281        }
282        drained
283    }
284
285    async fn run_rx_loop_maintenance_tick(&mut self) {
286        self.check_timeouts();
287        let now_ms = Self::now_ms();
288        self.reload_peer_acl();
289        self.poll_pending_connects().await;
290        self.poll_nostr_discovery().await;
291        self.poll_lan_discovery().await;
292        self.poll_local_instance_discovery().await;
293        self.resend_pending_handshakes(now_ms).await;
294        self.resend_pending_rekeys(now_ms).await;
295        self.resend_pending_session_handshakes(now_ms).await;
296        self.resend_pending_session_msg3(now_ms).await;
297        self.purge_idle_sessions(now_ms);
298        self.purge_learned_routes(now_ms);
299        self.process_pending_retries(now_ms).await;
300        self.check_tree_state().await;
301        self.check_bloom_state().await;
302        self.compute_mesh_size();
303        self.record_stats_history();
304        self.check_mmp_reports().await;
305        self.check_session_mmp_reports().await;
306        self.check_link_heartbeats().await;
307        self.check_rekey().await;
308        self.check_session_rekey().await;
309        self.check_pending_lookups(now_ms).await;
310        self.poll_transport_discovery().await;
311        self.sample_transport_congestion();
312        self.activate_connected_udp_sessions().await;
313    }
314
315    /// Hand a decrypt-worker fallback to the canonical post-FMP-decrypt
316    /// processor. Reconstructs `ce_flag` / `sp_flag` from the FMP header
317    /// flag byte the worker captured into `DecryptFallback::fmp_flags`
318    /// (without this both ECN CE propagation and spin-bit RTT
319    /// observation are dropped on the worker path) and slices the
320    /// plaintext out of the original wire buffer with zero allocation.
321    async fn process_decrypt_worker_event(&mut self, event: DecryptWorkerEvent) {
322        match event {
323            DecryptWorkerEvent::Plaintext(fallback) => {
324                self.process_decrypt_fallback(fallback).await;
325            }
326            DecryptWorkerEvent::DecryptFailure(report) => {
327                self.process_decrypt_failure_report(report).await;
328            }
329        }
330    }
331
332    async fn process_decrypt_fallback(&mut self, fallback: DecryptFallback) {
333        let ce_flag = fallback.fmp_flags & FLAG_CE != 0;
334        let sp_flag = fallback.fmp_flags & FLAG_SP != 0;
335        let plaintext = &fallback.packet_data[fallback.fmp_plaintext_offset
336            ..fallback.fmp_plaintext_offset + fallback.fmp_plaintext_len];
337        self.process_authentic_fmp_plaintext(
338            &fallback.source_node_addr,
339            fallback.transport_id,
340            &fallback.remote_addr,
341            fallback.timestamp_ms,
342            fallback.packet_len,
343            fallback.fmp_counter,
344            ce_flag,
345            sp_flag,
346            plaintext,
347        )
348        .await;
349    }
350
351    async fn process_decrypt_failure_report(&mut self, report: DecryptFailureReport) {
352        debug!(
353            peer = %self.peer_display_name(&report.source_node_addr),
354            counter = report.fmp_counter,
355            replay_highest = report.fmp_replay_highest,
356            "Worker FMP AEAD decryption failed"
357        );
358        self.handle_decrypt_failure_report(&report).await;
359    }
360
361    /// Drain up to `budget` queued fallbacks without yielding back to
362    /// `select!`. Returns the number processed. Called both from the
363    /// promoted-fallback select arm (after the head item) and
364    /// interleaved inside the packet_rx drain loop so bounced FMP
365    /// plaintexts can't accumulate behind a 256-packet inbound burst.
366    async fn drain_decrypt_fallback(
367        &mut self,
368        rx: &mut UnboundedReceiver<DecryptWorkerEvent>,
369        budget: usize,
370    ) -> usize {
371        let mut drained = 0;
372        while drained < budget {
373            match rx.try_recv() {
374                Ok(event) => {
375                    self.process_decrypt_worker_event(event).await;
376                    drained += 1;
377                }
378                Err(_) => break,
379            }
380        }
381        drained
382    }
383
384    /// Flush any pending batched sends across all transports. Today
385    /// every transport's `flush_pending_send` is a no-op — the UDP
386    /// transport's per-transport `pending_send` buffer was removed
387    /// when the bulk data path moved into `encrypt_worker` (which
388    /// does its own target-grouped `sendmmsg(2)` directly). The
389    /// call sites are retained so any future batched transport can
390    /// opt in by overriding `flush_pending_send` without touching
391    /// the rx_loop.
392    async fn flush_pending_sends(&self) {
393        for transport in self.transports.values() {
394            if matches!(transport, TransportHandle::Udp(_)) {
395                transport.flush_pending_send().await;
396            }
397        }
398    }
399
400    /// Process a single received packet.
401    ///
402    /// Dispatches based on the phase field in the 4-byte common prefix.
403    pub(in crate::node) async fn process_packet(&mut self, packet: ReceivedPacket) {
404        let _t_total = crate::perf_profile::Timer::start(crate::perf_profile::Stage::ProcessPacket);
405        crate::perf_profile::record_since(
406            crate::perf_profile::Stage::TransportQueueWait,
407            packet.trace_enqueued_at,
408        );
409        if is_punch_packet(&packet.data) {
410            trace!(
411                transport_id = %packet.transport_id,
412                remote_addr = %packet.remote_addr,
413                bytes = packet.data.len(),
414                "Dropping stray punch probe/ack in FMP rx loop"
415            );
416            return;
417        }
418        if packet.data.len() < COMMON_PREFIX_SIZE {
419            return; // Drop packets too short for common prefix
420        }
421
422        let prefix = match CommonPrefix::parse(&packet.data) {
423            Some(p) => p,
424            None => return, // Malformed prefix
425        };
426        if matches!(prefix.phase, PHASE_MSG1 | PHASE_MSG2) {
427            debug!(
428                transport_id = %packet.transport_id,
429                remote_addr = %packet.remote_addr,
430                bytes = packet.data.len(),
431                phase = prefix.phase,
432                version = prefix.version,
433                "FMP handshake packet dispatch"
434            );
435        } else {
436            trace!(
437                transport_id = %packet.transport_id,
438                remote_addr = %packet.remote_addr,
439                bytes = packet.data.len(),
440                phase = prefix.phase,
441                version = prefix.version,
442                "FMP packet dispatch"
443            );
444        }
445
446        if prefix.version != FMP_VERSION {
447            debug!(
448                version = prefix.version,
449                transport_id = %packet.transport_id,
450                "Unknown FMP version, dropping"
451            );
452
453            // If the packet arrived on an adopted Nostr-NAT bootstrap
454            // transport, the originating peer is necessarily on a
455            // different FMP-protocol version than us — the discovery
456            // sweep would otherwise re-traverse them every cycle even
457            // though no msg1/msg2 exchange can ever succeed. Bump the
458            // discovery-layer cooldown to the long protocol-mismatch
459            // window and emit a single WARN per fresh observation.
460            let looks_like_fmp_phase =
461                matches!(prefix.phase, PHASE_ESTABLISHED | PHASE_MSG1 | PHASE_MSG2);
462            if looks_like_fmp_phase
463                && self.bootstrap_transports.contains(&packet.transport_id)
464                && let Some(npub) = self
465                    .bootstrap_transport_npubs
466                    .get(&packet.transport_id)
467                    .cloned()
468                && let Some(handle) = self.nostr_discovery_handle()
469            {
470                let now_ms = Self::now_ms();
471                let cooldown_secs = handle.protocol_mismatch_cooldown_secs();
472                if handle.record_protocol_mismatch(&npub, now_ms) {
473                    warn!(
474                        peer_npub = %npub,
475                        transport_id = %packet.transport_id,
476                        peer_version = prefix.version,
477                        our_version = FMP_VERSION,
478                        cooldown_secs,
479                        "Nostr-discovered peer speaks a different FMP version; suppressing retraversal"
480                    );
481                }
482            }
483            return;
484        }
485
486        match prefix.phase {
487            PHASE_ESTABLISHED => {
488                self.handle_encrypted_frame(packet).await;
489            }
490            PHASE_MSG1 => {
491                self.handle_msg1(packet).await;
492            }
493            PHASE_MSG2 => {
494                self.handle_msg2(packet).await;
495            }
496            _ => {
497                debug!(
498                    phase = prefix.phase,
499                    transport_id = %packet.transport_id,
500                    "Unknown FMP phase, dropping"
501                );
502            }
503        }
504    }
505}