Skip to main content

fips_core/node/handlers/
rx_loop.rs

1//! RX event loop and packet dispatch.
2
3use crate::control::queries;
4use crate::control::{ControlSocket, commands};
5use crate::node::decrypt_worker::{DecryptFailureReport, DecryptFallback, DecryptWorkerEvent};
6use crate::node::wire::{
7    COMMON_PREFIX_SIZE, CommonPrefix, FLAG_CE, FLAG_SP, FMP_VERSION, PHASE_ESTABLISHED, PHASE_MSG1,
8    PHASE_MSG2,
9};
10use crate::node::{Node, NodeError};
11use crate::transport::ReceivedPacket;
12use crate::transport::TransportHandle;
13use std::time::Duration;
14use tokio::sync::mpsc::UnboundedReceiver;
15use tracing::{debug, info, trace, warn};
16
/// Maximum number of inbound packets handled per `packet_rx` drain
/// (the packet that woke the loop counts towards it) before control goes
/// back to `select!`. The trailing decrypt-fallback drain that follows a
/// non-empty inbound drain uses the same cap.
const PACKET_DRAIN_BUDGET: usize = 256;
/// Inside the inbound drain, every this-many processed packets a bounded
/// slice of decrypt-fallback work is interleaved. That keeps TCP ACK,
/// heartbeat and handshake progress steady while an inbound burst is
/// being drained.
const FALLBACK_INTERLEAVE_EVERY: usize = 32;
/// Maximum fallback events handled per interleave slice. This stops a hot
/// fallback queue from starving the outer raw-packet drain in turn.
const FALLBACK_INTERLEAVE_BUDGET: usize = 64;
/// Upper bound on one maintenance pass. When it elapses, the pass is
/// abandoned and the loop resumes packet processing.
const RX_LOOP_MAINTENANCE_TIMEOUT: Duration = Duration::from_millis(500);
26
27impl Node {
28    /// Run the receive event loop.
29    ///
30    /// Processes packets from all transports, dispatching based on
31    /// the phase field in the 4-byte common prefix:
32    /// - Phase 0x0: Encrypted frame (session data)
33    /// - Phase 0x1: Handshake message 1 (initiator -> responder)
34    /// - Phase 0x2: Handshake message 2 (responder -> initiator)
35    ///
36    /// Also processes outbound IPv6 packets from the TUN reader for session
37    /// encapsulation and routing through the mesh.
38    ///
39    /// Also processes DNS-resolved identities for identity cache population.
40    ///
41    /// Also runs a periodic tick (1s) to clean up stale handshake connections
42    /// that never received a response. This prevents resource leaks when peers
43    /// are unreachable.
44    ///
45    /// This method takes ownership of the packet_rx channel and runs
46    /// until the channel is closed (typically when stop() is called).
47    pub async fn run_rx_loop(&mut self) -> Result<(), NodeError> {
48        let mut packet_rx = self.packet_rx.take().ok_or(NodeError::NotStarted)?;
49
50        // Take the TUN outbound receiver, or create a dummy channel that never
51        // produces messages (when TUN is disabled). Holding the sender prevents
52        // the channel from closing.
53        let (mut tun_outbound_rx, _tun_guard) = match self.tun_outbound_rx.take() {
54            Some(rx) => (rx, None),
55            None => {
56                let (tx, rx) = tokio::sync::mpsc::channel(1);
57                (rx, Some(tx))
58            }
59        };
60
61        // Take the DNS identity receiver, or create a dummy channel (when DNS
62        // is disabled). Same pattern as TUN outbound.
63        let (mut dns_identity_rx, _dns_guard) = match self.dns_identity_rx.take() {
64            Some(rx) => (rx, None),
65            None => {
66                let (tx, rx) = tokio::sync::mpsc::channel(1);
67                (rx, Some(tx))
68            }
69        };
70
71        // Take the endpoint-data command receiver, or create a dummy channel
72        // when the embedded endpoint API is not in use.
73        let (mut endpoint_command_rx, _endpoint_command_guard) =
74            match self.endpoint_command_rx.take() {
75                Some(rx) => (rx, None),
76                None => {
77                    let (tx, rx) = tokio::sync::mpsc::channel(1);
78                    (rx, Some(tx))
79                }
80            };
81
82        // Take the decrypt worker fallback receiver if a worker pool
83        // is in use. The worker pushes non-fast-path packets (anything
84        // that's not bulk EndpointData) here for the legacy dispatch.
85        let (mut decrypt_fallback_rx, _decrypt_fallback_guard) =
86            match self.decrypt_fallback_rx.take() {
87                Some(rx) => (rx, None),
88                None => {
89                    let (tx, rx) = tokio::sync::mpsc::unbounded_channel();
90                    (rx, Some(tx))
91                }
92            };
93
94        let mut tick =
95            tokio::time::interval(Duration::from_secs(self.config.node.tick_interval_secs));
96
97        // Set up control socket channel
98        let (control_tx, mut control_rx) =
99            tokio::sync::mpsc::channel::<crate::control::ControlMessage>(32);
100
101        if self.config.node.control.enabled {
102            let config = self.config.node.control.clone();
103            let tx = control_tx.clone();
104            tokio::spawn(async move {
105                match ControlSocket::bind(&config) {
106                    Ok(socket) => {
107                        socket.accept_loop(tx).await;
108                    }
109                    Err(e) => {
110                        warn!(error = %e, "Failed to bind control socket");
111                    }
112                }
113            });
114        }
115        // Drop unused sender to avoid keeping channel open if control is disabled
116        drop(control_tx);
117
118        info!("RX event loop started");
119        // Optional perf profiler (FIPS_PERF=1). No-op otherwise.
120        crate::perf_profile::maybe_spawn_reporter();
121
122        loop {
123            tokio::select! {
124                biased;
125                // Decrypt-worker fallback drains FIRST. The previous
126                // ordering put `packet_rx` first, which under sustained
127                // inbound bursts let the raw-packet drain (up to 256
128                // packets + flush) starve fallback work for tens of
129                // milliseconds. UDP throughput tolerates that; TCP
130                // doesn't — late FMP plaintexts mean late ACKs,
131                // dup-ACK fast retransmits, and cwnd collapse.
132                // Reproduced on native macOS / Wi-Fi where TCP fell to
133                // ~10 Mb/s while UDP cleared ~100 Mb/s on the same
134                // tunnel. Promoting fallback gives the kernel-side
135                // TCP machinery a fair chance to see its ACKs and
136                // keep cwnd growing.
137                Some(event) = decrypt_fallback_rx.recv() => {
138                    self.process_decrypt_worker_event(event).await;
139                    self.drain_decrypt_fallback(&mut decrypt_fallback_rx, 255).await;
140                    self.flush_pending_sends().await;
141                }
142                packet = packet_rx.recv() => {
143                    match packet {
144                        Some(p) => {
145                            self.drain_packet_rx(
146                                &mut packet_rx,
147                                &mut decrypt_fallback_rx,
148                                Some(p),
149                                PACKET_DRAIN_BUDGET,
150                            ).await;
151                        }
152                        None => break, // channel closed
153                    }
154                }
155                _ = tick.tick() => {
156                    let drained = self.drain_packet_rx(
157                        &mut packet_rx,
158                        &mut decrypt_fallback_rx,
159                        None,
160                        PACKET_DRAIN_BUDGET,
161                    ).await;
162                    if drained > 0 {
163                        debug!(
164                            drained,
165                            "Drained queued packets before rx-loop maintenance"
166                        );
167                    }
168                    if tokio::time::timeout(
169                        RX_LOOP_MAINTENANCE_TIMEOUT,
170                        self.run_rx_loop_maintenance_tick(),
171                    ).await.is_err() {
172                        warn!(
173                            timeout_ms = RX_LOOP_MAINTENANCE_TIMEOUT.as_millis() as u64,
174                            "RX loop maintenance timed out; continuing packet processing"
175                        );
176                    }
177                }
178                Some(ipv6_packet) = tun_outbound_rx.recv() => {
179                    self.handle_tun_outbound(ipv6_packet).await;
180                    let mut drained = 0;
181                    while drained < 256 {
182                        match tun_outbound_rx.try_recv() {
183                            Ok(p) => {
184                                self.handle_tun_outbound(p).await;
185                                drained += 1;
186                            }
187                            Err(_) => break,
188                        }
189                    }
190                    // Flush any trailing batched sends so the last
191                    // packets of a burst don't sit in the per-transport
192                    // sendmmsg buffer waiting for the threshold.
193                    self.flush_pending_sends().await;
194                }
195                Some(identity) = dns_identity_rx.recv() => {
196                    debug!(
197                        node_addr = %identity.node_addr,
198                        "Registering identity from DNS resolution"
199                    );
200                    self.register_identity(identity.node_addr, identity.pubkey);
201                }
202                Some(command) = endpoint_command_rx.recv() => {
203                    self.handle_endpoint_data_command(command).await;
204                    // Same drain pattern: when the application is shoving
205                    // tunnel data in via send_oneway, several Send commands
206                    // typically queue up between scheduler hops.
207                    let mut drained = 0;
208                    while drained < 256 {
209                        match endpoint_command_rx.try_recv() {
210                            Ok(c) => {
211                                self.handle_endpoint_data_command(c).await;
212                                drained += 1;
213                            }
214                            Err(_) => break,
215                        }
216                    }
217                    // Flush any trailing batched sends from the
218                    // per-transport sendmmsg buffer.
219                    self.flush_pending_sends().await;
220                }
221                Some((request, response_tx)) = control_rx.recv() => {
222                    let response = if request.command.starts_with("show_") {
223                        queries::dispatch(self, &request.command, request.params.as_ref())
224                    } else {
225                        commands::dispatch(
226                            self,
227                            &request.command,
228                            request.params.as_ref(),
229                        ).await
230                    };
231                    let _ = response_tx.send(response);
232                }
233            }
234        }
235
236        info!("RX event loop stopped (channel closed)");
237        Ok(())
238    }
239
240    async fn drain_packet_rx(
241        &mut self,
242        packet_rx: &mut UnboundedReceiver<ReceivedPacket>,
243        decrypt_fallback_rx: &mut UnboundedReceiver<DecryptWorkerEvent>,
244        first_packet: Option<ReceivedPacket>,
245        budget: usize,
246    ) -> usize {
247        let mut drained = 0usize;
248        if let Some(packet) = first_packet {
249            self.process_packet(packet).await;
250            drained = 1;
251        }
252
253        // Drain remaining ready inbound packets in a tight loop before
254        // yielding back to select! Every yield is a scheduler hop, and at
255        // line rate transports typically have several packets available per
256        // wake. Caps at a batch boundary so other branches eventually get a
257        // turn even under sustained load.
258        while drained < budget {
259            if drained > 0 && drained.is_multiple_of(FALLBACK_INTERLEAVE_EVERY) {
260                self.drain_decrypt_fallback(decrypt_fallback_rx, FALLBACK_INTERLEAVE_BUDGET)
261                    .await;
262            }
263            match packet_rx.try_recv() {
264                Ok(packet) => {
265                    self.process_packet(packet).await;
266                    drained += 1;
267                }
268                Err(_) => break,
269            }
270        }
271
272        if drained > 0 {
273            // One trailing fallback drain so the last bounced packets of the
274            // burst aren't held up by the post-burst send flush.
275            self.drain_decrypt_fallback(decrypt_fallback_rx, PACKET_DRAIN_BUDGET)
276                .await;
277            // Flush any batched sends triggered by inbound packets (e.g.
278            // forwarded SessionDatagrams, MMP reports, tree announces).
279            self.flush_pending_sends().await;
280        }
281        drained
282    }
283
284    async fn run_rx_loop_maintenance_tick(&mut self) {
285        self.check_timeouts();
286        let now_ms = Self::now_ms();
287        self.reload_peer_acl();
288        self.poll_pending_connects().await;
289        self.poll_nostr_discovery().await;
290        self.poll_lan_discovery().await;
291        self.poll_local_instance_discovery().await;
292        self.resend_pending_handshakes(now_ms).await;
293        self.resend_pending_rekeys(now_ms).await;
294        self.resend_pending_session_handshakes(now_ms).await;
295        self.resend_pending_session_msg3(now_ms).await;
296        self.purge_idle_sessions(now_ms);
297        self.purge_learned_routes(now_ms);
298        self.process_pending_retries(now_ms).await;
299        self.check_tree_state().await;
300        self.check_bloom_state().await;
301        self.compute_mesh_size();
302        self.record_stats_history();
303        self.check_mmp_reports().await;
304        self.check_session_mmp_reports().await;
305        self.check_link_heartbeats().await;
306        self.check_rekey().await;
307        self.check_session_rekey().await;
308        self.check_pending_lookups(now_ms).await;
309        self.poll_transport_discovery().await;
310        self.sample_transport_congestion();
311        self.activate_connected_udp_sessions().await;
312    }
313
314    /// Hand a decrypt-worker fallback to the canonical post-FMP-decrypt
315    /// processor. Reconstructs `ce_flag` / `sp_flag` from the FMP header
316    /// flag byte the worker captured into `DecryptFallback::fmp_flags`
317    /// (without this both ECN CE propagation and spin-bit RTT
318    /// observation are dropped on the worker path) and slices the
319    /// plaintext out of the original wire buffer with zero allocation.
320    async fn process_decrypt_worker_event(&mut self, event: DecryptWorkerEvent) {
321        match event {
322            DecryptWorkerEvent::Plaintext(fallback) => {
323                self.process_decrypt_fallback(fallback).await;
324            }
325            DecryptWorkerEvent::DecryptFailure(report) => {
326                self.process_decrypt_failure_report(report).await;
327            }
328        }
329    }
330
331    async fn process_decrypt_fallback(&mut self, fallback: DecryptFallback) {
332        let ce_flag = fallback.fmp_flags & FLAG_CE != 0;
333        let sp_flag = fallback.fmp_flags & FLAG_SP != 0;
334        let plaintext = &fallback.packet_data[fallback.fmp_plaintext_offset
335            ..fallback.fmp_plaintext_offset + fallback.fmp_plaintext_len];
336        self.process_authentic_fmp_plaintext(
337            &fallback.source_node_addr,
338            fallback.transport_id,
339            &fallback.remote_addr,
340            fallback.timestamp_ms,
341            fallback.packet_len,
342            fallback.fmp_counter,
343            ce_flag,
344            sp_flag,
345            plaintext,
346        )
347        .await;
348    }
349
350    async fn process_decrypt_failure_report(&mut self, report: DecryptFailureReport) {
351        debug!(
352            peer = %self.peer_display_name(&report.source_node_addr),
353            counter = report.fmp_counter,
354            replay_highest = report.fmp_replay_highest,
355            "Worker FMP AEAD decryption failed"
356        );
357        self.handle_decrypt_failure_report(&report).await;
358    }
359
360    /// Drain up to `budget` queued fallbacks without yielding back to
361    /// `select!`. Returns the number processed. Called both from the
362    /// promoted-fallback select arm (after the head item) and
363    /// interleaved inside the packet_rx drain loop so bounced FMP
364    /// plaintexts can't accumulate behind a 256-packet inbound burst.
365    async fn drain_decrypt_fallback(
366        &mut self,
367        rx: &mut UnboundedReceiver<DecryptWorkerEvent>,
368        budget: usize,
369    ) -> usize {
370        let mut drained = 0;
371        while drained < budget {
372            match rx.try_recv() {
373                Ok(event) => {
374                    self.process_decrypt_worker_event(event).await;
375                    drained += 1;
376                }
377                Err(_) => break,
378            }
379        }
380        drained
381    }
382
383    /// Flush any pending batched sends across all transports. Today
384    /// every transport's `flush_pending_send` is a no-op — the UDP
385    /// transport's per-transport `pending_send` buffer was removed
386    /// when the bulk data path moved into `encrypt_worker` (which
387    /// does its own target-grouped `sendmmsg(2)` directly). The
388    /// call sites are retained so any future batched transport can
389    /// opt in by overriding `flush_pending_send` without touching
390    /// the rx_loop.
391    async fn flush_pending_sends(&self) {
392        for transport in self.transports.values() {
393            if matches!(transport, TransportHandle::Udp(_)) {
394                transport.flush_pending_send().await;
395            }
396        }
397    }
398
399    /// Process a single received packet.
400    ///
401    /// Dispatches based on the phase field in the 4-byte common prefix.
402    async fn process_packet(&mut self, packet: ReceivedPacket) {
403        let _t_total = crate::perf_profile::Timer::start(crate::perf_profile::Stage::ProcessPacket);
404        crate::perf_profile::record_since(
405            crate::perf_profile::Stage::TransportQueueWait,
406            packet.trace_enqueued_at,
407        );
408        if packet.data.len() < COMMON_PREFIX_SIZE {
409            return; // Drop packets too short for common prefix
410        }
411
412        let prefix = match CommonPrefix::parse(&packet.data) {
413            Some(p) => p,
414            None => return, // Malformed prefix
415        };
416        if matches!(prefix.phase, PHASE_MSG1 | PHASE_MSG2) {
417            debug!(
418                transport_id = %packet.transport_id,
419                remote_addr = %packet.remote_addr,
420                bytes = packet.data.len(),
421                phase = prefix.phase,
422                version = prefix.version,
423                "FMP handshake packet dispatch"
424            );
425        } else {
426            trace!(
427                transport_id = %packet.transport_id,
428                remote_addr = %packet.remote_addr,
429                bytes = packet.data.len(),
430                phase = prefix.phase,
431                version = prefix.version,
432                "FMP packet dispatch"
433            );
434        }
435
436        if prefix.version != FMP_VERSION {
437            debug!(
438                version = prefix.version,
439                transport_id = %packet.transport_id,
440                "Unknown FMP version, dropping"
441            );
442
443            // If the packet arrived on an adopted Nostr-NAT bootstrap
444            // transport, the originating peer is necessarily on a
445            // different FMP-protocol version than us — the discovery
446            // sweep would otherwise re-traverse them every cycle even
447            // though no msg1/msg2 exchange can ever succeed. Bump the
448            // discovery-layer cooldown to the long protocol-mismatch
449            // window and emit a single WARN per fresh observation.
450            if self.bootstrap_transports.contains(&packet.transport_id)
451                && let Some(npub) = self
452                    .bootstrap_transport_npubs
453                    .get(&packet.transport_id)
454                    .cloned()
455                && let Some(handle) = self.nostr_discovery_handle()
456            {
457                let now_ms = Self::now_ms();
458                let cooldown_secs = handle.protocol_mismatch_cooldown_secs();
459                if handle.record_protocol_mismatch(&npub, now_ms) {
460                    warn!(
461                        peer_npub = %npub,
462                        transport_id = %packet.transport_id,
463                        peer_version = prefix.version,
464                        our_version = FMP_VERSION,
465                        cooldown_secs,
466                        "Nostr-discovered peer speaks a different FMP version; suppressing retraversal"
467                    );
468                }
469            }
470            return;
471        }
472
473        match prefix.phase {
474            PHASE_ESTABLISHED => {
475                self.handle_encrypted_frame(packet).await;
476            }
477            PHASE_MSG1 => {
478                self.handle_msg1(packet).await;
479            }
480            PHASE_MSG2 => {
481                self.handle_msg2(packet).await;
482            }
483            _ => {
484                debug!(
485                    phase = prefix.phase,
486                    transport_id = %packet.transport_id,
487                    "Unknown FMP phase, dropping"
488                );
489            }
490        }
491    }
492}