// fips_core/node/handlers/rx_loop.rs
1//! RX event loop and packet dispatch.
2
3use crate::control::queries;
4use crate::control::{ControlSocket, commands};
5use crate::node::decrypt_worker::{DecryptFailureReport, DecryptFallback, DecryptWorkerEvent};
6use crate::node::wire::{
7    COMMON_PREFIX_SIZE, CommonPrefix, FLAG_CE, FLAG_SP, FMP_VERSION, PHASE_ESTABLISHED, PHASE_MSG1,
8    PHASE_MSG2,
9};
10use crate::node::{Node, NodeError};
11use crate::transport::ReceivedPacket;
12use crate::transport::TransportHandle;
13use std::time::Duration;
14use tokio::sync::mpsc::UnboundedReceiver;
15use tracing::{debug, info, warn};
16
/// How often the raw-packet drain loop yields a slice of work to the
/// decrypt-fallback drain. Keeps TCP ACK / heartbeat / handshake
/// progress steady under sustained inbound bursts. Checked with
/// `drained.is_multiple_of(..)` inside `run_rx_loop`'s 256-packet
/// drain, so this must stay <= 256 or the interleave never fires.
const FALLBACK_INTERLEAVE_EVERY: usize = 32;
/// Cap on the per-interleave fallback drain so a hot inbound spike
/// can't starve the outer raw-packet drain in the opposite direction.
const FALLBACK_INTERLEAVE_BUDGET: usize = 64;
24
impl Node {
    /// Run the receive event loop.
    ///
    /// Processes packets from all transports, dispatching based on
    /// the phase field in the 4-byte common prefix:
    /// - Phase 0x0: Encrypted frame (session data)
    /// - Phase 0x1: Handshake message 1 (initiator -> responder)
    /// - Phase 0x2: Handshake message 2 (responder -> initiator)
    ///
    /// Also processes outbound IPv6 packets from the TUN reader for session
    /// encapsulation and routing through the mesh.
    ///
    /// Also processes DNS-resolved identities for identity cache population.
    ///
    /// Also runs a periodic maintenance tick (interval configured via
    /// `config.node.tick_interval_secs`) to clean up stale handshake
    /// connections that never received a response. This prevents resource
    /// leaks when peers are unreachable.
    ///
    /// This method takes ownership of the packet_rx channel and runs
    /// until the channel is closed (typically when stop() is called).
    pub async fn run_rx_loop(&mut self) -> Result<(), NodeError> {
        let mut packet_rx = self.packet_rx.take().ok_or(NodeError::NotStarted)?;

        // Take the TUN outbound receiver, or create a dummy channel that never
        // produces messages (when TUN is disabled). Holding the sender prevents
        // the channel from closing, so the select! arm simply stays pending
        // forever instead of firing with `None`.
        let (mut tun_outbound_rx, _tun_guard) = match self.tun_outbound_rx.take() {
            Some(rx) => (rx, None),
            None => {
                let (tx, rx) = tokio::sync::mpsc::channel(1);
                (rx, Some(tx))
            }
        };

        // Take the DNS identity receiver, or create a dummy channel (when DNS
        // is disabled). Same pattern as TUN outbound.
        let (mut dns_identity_rx, _dns_guard) = match self.dns_identity_rx.take() {
            Some(rx) => (rx, None),
            None => {
                let (tx, rx) = tokio::sync::mpsc::channel(1);
                (rx, Some(tx))
            }
        };

        // Take the endpoint-data command receiver, or create a dummy channel
        // when the embedded endpoint API is not in use.
        let (mut endpoint_command_rx, _endpoint_command_guard) =
            match self.endpoint_command_rx.take() {
                Some(rx) => (rx, None),
                None => {
                    let (tx, rx) = tokio::sync::mpsc::channel(1);
                    (rx, Some(tx))
                }
            };

        // Take the decrypt worker fallback receiver if a worker pool
        // is in use. The worker pushes non-fast-path packets (anything
        // that's not bulk EndpointData) here for the legacy dispatch.
        // NOTE: this channel is unbounded, unlike the bounded dummies above.
        let (mut decrypt_fallback_rx, _decrypt_fallback_guard) =
            match self.decrypt_fallback_rx.take() {
                Some(rx) => (rx, None),
                None => {
                    let (tx, rx) = tokio::sync::mpsc::unbounded_channel();
                    (rx, Some(tx))
                }
            };

        let mut tick =
            tokio::time::interval(Duration::from_secs(self.config.node.tick_interval_secs));

        // Set up control socket channel
        let (control_tx, mut control_rx) =
            tokio::sync::mpsc::channel::<crate::control::ControlMessage>(32);

        if self.config.node.control.enabled {
            let config = self.config.node.control.clone();
            let tx = control_tx.clone();
            tokio::spawn(async move {
                match ControlSocket::bind(&config) {
                    Ok(socket) => {
                        socket.accept_loop(tx).await;
                    }
                    Err(e) => {
                        // Non-fatal: the node keeps running without a
                        // control socket; only the admin API is lost.
                        warn!(error = %e, "Failed to bind control socket");
                    }
                }
            });
        }
        // Drop unused sender to avoid keeping channel open if control is disabled
        drop(control_tx);

        info!("RX event loop started");
        // Optional perf profiler (FIPS_PERF=1). No-op otherwise.
        crate::perf_profile::maybe_spawn_reporter();

        loop {
            tokio::select! {
                // `biased` makes arm order the polling-priority order
                // (no random fairness shuffle) — the ordering below is
                // load-bearing; see the per-arm notes.
                biased;
                // Decrypt-worker fallback drains FIRST. The previous
                // ordering put `packet_rx` first, which under sustained
                // inbound bursts let the raw-packet drain (up to 256
                // packets + flush) starve fallback work for tens of
                // milliseconds. UDP throughput tolerates that; TCP
                // doesn't — late FMP plaintexts mean late ACKs,
                // dup-ACK fast retransmits, and cwnd collapse.
                // Reproduced on native macOS / Wi-Fi where TCP fell to
                // ~10 Mb/s while UDP cleared ~100 Mb/s on the same
                // tunnel. Promoting fallback gives the kernel-side
                // TCP machinery a fair chance to see its ACKs and
                // keep cwnd growing.
                Some(event) = decrypt_fallback_rx.recv() => {
                    self.process_decrypt_worker_event(event).await;
                    // 255 budget + the head event above = one full
                    // 256-event batch per wakeup.
                    self.drain_decrypt_fallback(&mut decrypt_fallback_rx, 255).await;
                    self.flush_pending_sends().await;
                }
                // Keep maintenance above the hot packet drains in this
                // biased select. Sustained UDP/TUN traffic can otherwise
                // keep packet branches ready long enough to delay MMP
                // reports, rekey retries, congestion samples, and connected
                // UDP activation until after the dataplane already looks
                // unhealthy.
                _ = tick.tick() => {
                    self.run_rx_loop_maintenance_tick().await;
                }
                packet = packet_rx.recv() => {
                    match packet {
                        Some(p) => self.process_packet(p).await,
                        None => break, // channel closed
                    }
                    // Drain remaining ready inbound packets in a tight loop
                    // before yielding back to select! — every yield is a
                    // futex hop on tokio's multi-thread scheduler, and at
                    // line rate the kernel UDP queue typically has several
                    // datagrams available per wake. Caps at a batch
                    // boundary so other branches (tick, control) eventually
                    // get a turn even under sustained load.
                    let mut drained: usize = 1; // counts the packet processed above
                    while drained < 256 {
                        if drained.is_multiple_of(FALLBACK_INTERLEAVE_EVERY) {
                            // Interleave fallback drain so bounced FMP
                            // plaintexts (heartbeats, post-FSP-decrypt
                            // TCP ACKs, control frames) don't sit in
                            // the fallback queue for a full 256-packet
                            // burst. Without this, even the
                            // priority-first ordering above can't help
                            // once we're inside this inner loop.
                            self.drain_decrypt_fallback(
                                &mut decrypt_fallback_rx,
                                FALLBACK_INTERLEAVE_BUDGET,
                            )
                            .await;
                        }
                        // try_recv never awaits: Err covers both Empty and
                        // Disconnected; a disconnect surfaces as None on
                        // the outer recv() next iteration.
                        match packet_rx.try_recv() {
                            Ok(p) => {
                                self.process_packet(p).await;
                                drained += 1;
                            }
                            Err(_) => break,
                        }
                    }
                    // One trailing fallback drain so the last bounced
                    // packets of the burst aren't held up by the
                    // post-burst send flush.
                    self.drain_decrypt_fallback(&mut decrypt_fallback_rx, 256)
                        .await;
                    // Flush any batched sends triggered by inbound
                    // packets (e.g. forwarded SessionDatagrams, MMP
                    // reports, tree announces).
                    self.flush_pending_sends().await;
                }
                Some(ipv6_packet) = tun_outbound_rx.recv() => {
                    self.handle_tun_outbound(ipv6_packet).await;
                    // NOTE(review): unlike the packet_rx arm, `drained`
                    // starts at 0 here (head packet uncounted), so this
                    // arm can process up to 257 per wake — presumably
                    // harmless, but confirm the asymmetry is intended.
                    let mut drained = 0;
                    while drained < 256 {
                        match tun_outbound_rx.try_recv() {
                            Ok(p) => {
                                self.handle_tun_outbound(p).await;
                                drained += 1;
                            }
                            Err(_) => break,
                        }
                    }
                    // Flush any trailing batched sends so the last
                    // packets of a burst don't sit in the per-transport
                    // sendmmsg buffer waiting for the threshold.
                    self.flush_pending_sends().await;
                }
                Some(identity) = dns_identity_rx.recv() => {
                    debug!(
                        node_addr = %identity.node_addr,
                        "Registering identity from DNS resolution"
                    );
                    self.register_identity(identity.node_addr, identity.pubkey);
                }
                Some(command) = endpoint_command_rx.recv() => {
                    self.handle_endpoint_data_command(command).await;
                    // Same drain pattern: when the application is shoving
                    // tunnel data in via send_oneway, several Send commands
                    // typically queue up between scheduler hops.
                    let mut drained = 0;
                    while drained < 256 {
                        match endpoint_command_rx.try_recv() {
                            Ok(c) => {
                                self.handle_endpoint_data_command(c).await;
                                drained += 1;
                            }
                            Err(_) => break,
                        }
                    }
                    // Flush any trailing batched sends from the
                    // per-transport sendmmsg buffer.
                    self.flush_pending_sends().await;
                }
                Some((request, response_tx)) = control_rx.recv() => {
                    // Read-only "show_*" requests go to the query
                    // dispatcher (sync); everything else is a mutating
                    // command (async). Send errors are ignored: the
                    // client may have hung up while we worked.
                    let response = if request.command.starts_with("show_") {
                        queries::dispatch(self, &request.command, request.params.as_ref())
                    } else {
                        commands::dispatch(
                            self,
                            &request.command,
                            request.params.as_ref(),
                        ).await
                    };
                    let _ = response_tx.send(response);
                }
            }
        }

        info!("RX event loop stopped (channel closed)");
        Ok(())
    }

    /// One pass of all periodic housekeeping, run from the rx-loop tick
    /// arm: timeout/ACL checks, discovery polls (Nostr, LAN, transport),
    /// handshake/rekey/session-handshake resends, idle-session and
    /// learned-route purging, retries, tree/bloom state, mesh-size and
    /// stats sampling, MMP reports, link heartbeats, rekey checks,
    /// pending lookups, congestion sampling, and connected-UDP
    /// activation. Order is as written; `now_ms` is sampled once after
    /// `check_timeouts` and shared by the time-based steps.
    async fn run_rx_loop_maintenance_tick(&mut self) {
        self.check_timeouts();
        let now_ms = Self::now_ms();
        self.reload_peer_acl();
        self.poll_pending_connects().await;
        self.poll_nostr_discovery().await;
        self.poll_lan_discovery().await;
        self.resend_pending_handshakes(now_ms).await;
        self.resend_pending_rekeys(now_ms).await;
        self.resend_pending_session_handshakes(now_ms).await;
        self.purge_idle_sessions(now_ms);
        self.purge_learned_routes(now_ms);
        self.process_pending_retries(now_ms).await;
        self.check_tree_state().await;
        self.check_bloom_state().await;
        self.compute_mesh_size();
        self.record_stats_history();
        self.check_mmp_reports().await;
        self.check_session_mmp_reports().await;
        self.check_link_heartbeats().await;
        self.check_rekey().await;
        self.check_session_rekey().await;
        self.check_pending_lookups(now_ms).await;
        self.poll_transport_discovery().await;
        self.sample_transport_congestion();
        self.activate_connected_udp_sessions().await;
    }

    /// Hand a decrypt-worker fallback to the canonical post-FMP-decrypt
    /// processor. Reconstructs `ce_flag` / `sp_flag` from the FMP header
    /// flag byte the worker captured into `DecryptFallback::fmp_flags`
    /// (without this both ECN CE propagation and spin-bit RTT
    /// observation are dropped on the worker path) and slices the
    /// plaintext out of the original wire buffer with zero allocation.
    async fn process_decrypt_worker_event(&mut self, event: DecryptWorkerEvent) {
        match event {
            DecryptWorkerEvent::Plaintext(fallback) => {
                self.process_decrypt_fallback(fallback).await;
            }
            DecryptWorkerEvent::DecryptFailure(report) => {
                self.process_decrypt_failure_report(report).await;
            }
        }
    }

    /// Slice the decrypted FMP plaintext out of the worker's original
    /// wire buffer (zero-copy) and feed it to the shared
    /// `process_authentic_fmp_plaintext` path, recovering the ECN-CE
    /// and spin-bit flags from the captured FMP flag byte.
    /// NOTE(review): the offset/len slice below will panic if the
    /// worker ever hands back out-of-range bounds — relies on the
    /// worker having validated them.
    async fn process_decrypt_fallback(&mut self, fallback: DecryptFallback) {
        let ce_flag = fallback.fmp_flags & FLAG_CE != 0;
        let sp_flag = fallback.fmp_flags & FLAG_SP != 0;
        let plaintext = &fallback.packet_data[fallback.fmp_plaintext_offset
            ..fallback.fmp_plaintext_offset + fallback.fmp_plaintext_len];
        self.process_authentic_fmp_plaintext(
            &fallback.source_node_addr,
            fallback.transport_id,
            &fallback.remote_addr,
            fallback.timestamp_ms,
            fallback.packet_len,
            fallback.fmp_counter,
            ce_flag,
            sp_flag,
            plaintext,
        )
        .await;
    }

    /// Log a worker-side FMP AEAD decryption failure at debug level,
    /// then forward the report to the shared failure handler.
    async fn process_decrypt_failure_report(&mut self, report: DecryptFailureReport) {
        debug!(
            peer = %self.peer_display_name(&report.source_node_addr),
            counter = report.fmp_counter,
            replay_highest = report.fmp_replay_highest,
            "Worker FMP AEAD decryption failed"
        );
        self.handle_decrypt_failure_report(&report).await;
    }

    /// Drain up to `budget` queued fallbacks without yielding back to
    /// `select!`. Returns the number processed. Called both from the
    /// promoted-fallback select arm (after the head item) and
    /// interleaved inside the packet_rx drain loop so bounced FMP
    /// plaintexts can't accumulate behind a 256-packet inbound burst.
    async fn drain_decrypt_fallback(
        &mut self,
        rx: &mut UnboundedReceiver<DecryptWorkerEvent>,
        budget: usize,
    ) -> usize {
        let mut drained = 0;
        while drained < budget {
            // Err covers Empty and Disconnected alike — either way the
            // drain is over for this wakeup.
            match rx.try_recv() {
                Ok(event) => {
                    self.process_decrypt_worker_event(event).await;
                    drained += 1;
                }
                Err(_) => break,
            }
        }
        drained
    }

    /// Flush any pending batched sends across all transports. Today
    /// every transport's `flush_pending_send` is a no-op — the UDP
    /// transport's per-transport `pending_send` buffer was removed
    /// when the bulk data path moved into `encrypt_worker` (which
    /// does its own target-grouped `sendmmsg(2)` directly). The
    /// call sites are retained so any future batched transport can
    /// opt in by overriding `flush_pending_send` without touching
    /// the rx_loop. Only UDP handles are polled — see the matches!
    /// guard below.
    async fn flush_pending_sends(&self) {
        for transport in self.transports.values() {
            if matches!(transport, TransportHandle::Udp(_)) {
                transport.flush_pending_send().await;
            }
        }
    }

    /// Process a single received packet.
    ///
    /// Dispatches based on the phase field in the 4-byte common prefix.
    /// Short or malformed packets and unknown versions/phases are
    /// silently dropped (debug-logged where useful).
    async fn process_packet(&mut self, packet: ReceivedPacket) {
        let _t_total = crate::perf_profile::Timer::start(crate::perf_profile::Stage::ProcessPacket);
        // Record how long this packet sat in the transport queue.
        crate::perf_profile::record_since(
            crate::perf_profile::Stage::TransportQueueWait,
            packet.trace_enqueued_at,
        );
        if packet.data.len() < COMMON_PREFIX_SIZE {
            return; // Drop packets too short for common prefix
        }

        let prefix = match CommonPrefix::parse(&packet.data) {
            Some(p) => p,
            None => return, // Malformed prefix
        };

        if prefix.version != FMP_VERSION {
            debug!(
                version = prefix.version,
                transport_id = %packet.transport_id,
                "Unknown FMP version, dropping"
            );

            // If the packet arrived on an adopted Nostr-NAT bootstrap
            // transport, the originating peer is necessarily on a
            // different FMP-protocol version than us — the discovery
            // sweep would otherwise re-traverse them every cycle even
            // though no msg1/msg2 exchange can ever succeed. Bump the
            // discovery-layer cooldown to the long protocol-mismatch
            // window and emit a single WARN per fresh observation.
            if self.bootstrap_transports.contains(&packet.transport_id)
                && let Some(npub) = self
                    .bootstrap_transport_npubs
                    .get(&packet.transport_id)
                    .cloned()
                && let Some(handle) = self.nostr_discovery_handle()
            {
                let now_ms = Self::now_ms();
                let cooldown_secs = handle.protocol_mismatch_cooldown_secs();
                if handle.record_protocol_mismatch(&npub, now_ms) {
                    warn!(
                        peer_npub = %npub,
                        transport_id = %packet.transport_id,
                        peer_version = prefix.version,
                        our_version = FMP_VERSION,
                        cooldown_secs,
                        "Nostr-discovered peer speaks a different FMP version; suppressing retraversal"
                    );
                }
            }
            return;
        }

        match prefix.phase {
            PHASE_ESTABLISHED => {
                self.handle_encrypted_frame(packet).await;
            }
            PHASE_MSG1 => {
                self.handle_msg1(packet).await;
            }
            PHASE_MSG2 => {
                self.handle_msg2(packet).await;
            }
            _ => {
                debug!(
                    phase = prefix.phase,
                    transport_id = %packet.transport_id,
                    "Unknown FMP phase, dropping"
                );
            }
        }
    }
}
444}