fips_core/node/handlers/rx_loop.rs
1//! RX event loop and packet dispatch.
2
3use crate::control::queries;
4use crate::control::{ControlSocket, commands};
5use crate::node::decrypt_worker::DecryptFallback;
6use crate::node::wire::{
7 COMMON_PREFIX_SIZE, CommonPrefix, FLAG_CE, FLAG_SP, FMP_VERSION, PHASE_ESTABLISHED, PHASE_MSG1,
8 PHASE_MSG2,
9};
10use crate::node::{Node, NodeError};
11use crate::transport::ReceivedPacket;
12use crate::transport::TransportHandle;
13use std::time::Duration;
14use tokio::sync::mpsc::UnboundedReceiver;
15use tracing::{debug, info, warn};
16
/// Upper bound on the number of decrypt-fallback items handled in one
/// interleaved slice inside the raw-packet drain. Prevents a burst of
/// fallbacks from starving the raw-packet drain in the other direction.
const FALLBACK_INTERLEAVE_BUDGET: usize = 64;

/// Number of raw packets processed between interleaved slices of
/// decrypt-fallback work. Keeps TCP ACKs, heartbeats and handshakes
/// moving while a sustained inbound burst is being drained.
const FALLBACK_INTERLEAVE_EVERY: usize = 32;
24
25impl Node {
26 /// Run the receive event loop.
27 ///
28 /// Processes packets from all transports, dispatching based on
29 /// the phase field in the 4-byte common prefix:
30 /// - Phase 0x0: Encrypted frame (session data)
31 /// - Phase 0x1: Handshake message 1 (initiator -> responder)
32 /// - Phase 0x2: Handshake message 2 (responder -> initiator)
33 ///
34 /// Also processes outbound IPv6 packets from the TUN reader for session
35 /// encapsulation and routing through the mesh.
36 ///
37 /// Also processes DNS-resolved identities for identity cache population.
38 ///
39 /// Also runs a periodic tick (1s) to clean up stale handshake connections
40 /// that never received a response. This prevents resource leaks when peers
41 /// are unreachable.
42 ///
43 /// This method takes ownership of the packet_rx channel and runs
44 /// until the channel is closed (typically when stop() is called).
45 pub async fn run_rx_loop(&mut self) -> Result<(), NodeError> {
46 let mut packet_rx = self.packet_rx.take().ok_or(NodeError::NotStarted)?;
47
48 // Take the TUN outbound receiver, or create a dummy channel that never
49 // produces messages (when TUN is disabled). Holding the sender prevents
50 // the channel from closing.
51 let (mut tun_outbound_rx, _tun_guard) = match self.tun_outbound_rx.take() {
52 Some(rx) => (rx, None),
53 None => {
54 let (tx, rx) = tokio::sync::mpsc::channel(1);
55 (rx, Some(tx))
56 }
57 };
58
59 // Take the DNS identity receiver, or create a dummy channel (when DNS
60 // is disabled). Same pattern as TUN outbound.
61 let (mut dns_identity_rx, _dns_guard) = match self.dns_identity_rx.take() {
62 Some(rx) => (rx, None),
63 None => {
64 let (tx, rx) = tokio::sync::mpsc::channel(1);
65 (rx, Some(tx))
66 }
67 };
68
69 // Take the endpoint-data command receiver, or create a dummy channel
70 // when the embedded endpoint API is not in use.
71 let (mut endpoint_command_rx, _endpoint_command_guard) =
72 match self.endpoint_command_rx.take() {
73 Some(rx) => (rx, None),
74 None => {
75 let (tx, rx) = tokio::sync::mpsc::channel(1);
76 (rx, Some(tx))
77 }
78 };
79
80 // Take the decrypt worker fallback receiver if a worker pool
81 // is in use. The worker pushes non-fast-path packets (anything
82 // that's not bulk EndpointData) here for the legacy dispatch.
83 let (mut decrypt_fallback_rx, _decrypt_fallback_guard) =
84 match self.decrypt_fallback_rx.take() {
85 Some(rx) => (rx, None),
86 None => {
87 let (tx, rx) = tokio::sync::mpsc::unbounded_channel();
88 (rx, Some(tx))
89 }
90 };
91
92 let mut tick =
93 tokio::time::interval(Duration::from_secs(self.config.node.tick_interval_secs));
94
95 // Set up control socket channel
96 let (control_tx, mut control_rx) =
97 tokio::sync::mpsc::channel::<crate::control::ControlMessage>(32);
98
99 if self.config.node.control.enabled {
100 let config = self.config.node.control.clone();
101 let tx = control_tx.clone();
102 tokio::spawn(async move {
103 match ControlSocket::bind(&config) {
104 Ok(socket) => {
105 socket.accept_loop(tx).await;
106 }
107 Err(e) => {
108 warn!(error = %e, "Failed to bind control socket");
109 }
110 }
111 });
112 }
113 // Drop unused sender to avoid keeping channel open if control is disabled
114 drop(control_tx);
115
116 info!("RX event loop started");
117 // Optional perf profiler (FIPS_PERF=1). No-op otherwise.
118 crate::perf_profile::maybe_spawn_reporter();
119
120 loop {
121 tokio::select! {
122 biased;
123 // Decrypt-worker fallback drains FIRST. The previous
124 // ordering put `packet_rx` first, which under sustained
125 // inbound bursts let the raw-packet drain (up to 256
126 // packets + flush) starve fallback work for tens of
127 // milliseconds. UDP throughput tolerates that; TCP
128 // doesn't — late FMP plaintexts mean late ACKs,
129 // dup-ACK fast retransmits, and cwnd collapse.
130 // Reproduced on native macOS / Wi-Fi where TCP fell to
131 // ~10 Mb/s while UDP cleared ~100 Mb/s on the same
132 // tunnel. Promoting fallback gives the kernel-side
133 // TCP machinery a fair chance to see its ACKs and
134 // keep cwnd growing.
135 Some(fallback) = decrypt_fallback_rx.recv() => {
136 self.process_decrypt_fallback(fallback).await;
137 self.drain_decrypt_fallback(&mut decrypt_fallback_rx, 255).await;
138 self.flush_pending_sends().await;
139 }
140 // Keep maintenance above the hot packet drains in this
141 // biased select. Sustained UDP/TUN traffic can otherwise
142 // keep packet branches ready long enough to delay MMP
143 // reports, rekey retries, congestion samples, and connected
144 // UDP activation until after the dataplane already looks
145 // unhealthy.
146 _ = tick.tick() => {
147 self.run_rx_loop_maintenance_tick().await;
148 }
149 packet = packet_rx.recv() => {
150 match packet {
151 Some(p) => self.process_packet(p).await,
152 None => break, // channel closed
153 }
154 // Drain remaining ready inbound packets in a tight loop
155 // before yielding back to select! — every yield is a
156 // futex hop on tokio's multi-thread scheduler, and at
157 // line rate the kernel UDP queue typically has several
158 // datagrams available per wake. Caps at a batch
159 // boundary so other branches (tick, control) eventually
160 // get a turn even under sustained load.
161 let mut drained: usize = 1; // counts the packet processed above
162 while drained < 256 {
163 if drained.is_multiple_of(FALLBACK_INTERLEAVE_EVERY) {
164 // Interleave fallback drain so bounced FMP
165 // plaintexts (heartbeats, post-FSP-decrypt
166 // TCP ACKs, control frames) don't sit in
167 // the fallback queue for a full 256-packet
168 // burst. Without this, even the
169 // priority-first ordering above can't help
170 // once we're inside this inner loop.
171 self.drain_decrypt_fallback(
172 &mut decrypt_fallback_rx,
173 FALLBACK_INTERLEAVE_BUDGET,
174 )
175 .await;
176 }
177 match packet_rx.try_recv() {
178 Ok(p) => {
179 self.process_packet(p).await;
180 drained += 1;
181 }
182 Err(_) => break,
183 }
184 }
185 // One trailing fallback drain so the last bounced
186 // packets of the burst aren't held up by the
187 // post-burst send flush.
188 self.drain_decrypt_fallback(&mut decrypt_fallback_rx, 256)
189 .await;
190 // Flush any batched sends triggered by inbound
191 // packets (e.g. forwarded SessionDatagrams, MMP
192 // reports, tree announces).
193 self.flush_pending_sends().await;
194 }
195 Some(ipv6_packet) = tun_outbound_rx.recv() => {
196 self.handle_tun_outbound(ipv6_packet).await;
197 let mut drained = 0;
198 while drained < 256 {
199 match tun_outbound_rx.try_recv() {
200 Ok(p) => {
201 self.handle_tun_outbound(p).await;
202 drained += 1;
203 }
204 Err(_) => break,
205 }
206 }
207 // Flush any trailing batched sends so the last
208 // packets of a burst don't sit in the per-transport
209 // sendmmsg buffer waiting for the threshold.
210 self.flush_pending_sends().await;
211 }
212 Some(identity) = dns_identity_rx.recv() => {
213 debug!(
214 node_addr = %identity.node_addr,
215 "Registering identity from DNS resolution"
216 );
217 self.register_identity(identity.node_addr, identity.pubkey);
218 }
219 Some(command) = endpoint_command_rx.recv() => {
220 self.handle_endpoint_data_command(command).await;
221 // Same drain pattern: when the application is shoving
222 // tunnel data in via send_oneway, several Send commands
223 // typically queue up between scheduler hops.
224 let mut drained = 0;
225 while drained < 256 {
226 match endpoint_command_rx.try_recv() {
227 Ok(c) => {
228 self.handle_endpoint_data_command(c).await;
229 drained += 1;
230 }
231 Err(_) => break,
232 }
233 }
234 // Flush any trailing batched sends from the
235 // per-transport sendmmsg buffer.
236 self.flush_pending_sends().await;
237 }
238 Some((request, response_tx)) = control_rx.recv() => {
239 let response = if request.command.starts_with("show_") {
240 queries::dispatch(self, &request.command, request.params.as_ref())
241 } else {
242 commands::dispatch(
243 self,
244 &request.command,
245 request.params.as_ref(),
246 ).await
247 };
248 let _ = response_tx.send(response);
249 }
250 }
251 }
252
253 info!("RX event loop stopped (channel closed)");
254 Ok(())
255 }
256
257 async fn run_rx_loop_maintenance_tick(&mut self) {
258 self.check_timeouts();
259 let now_ms = Self::now_ms();
260 self.reload_peer_acl();
261 self.poll_pending_connects().await;
262 self.poll_nostr_discovery().await;
263 self.poll_lan_discovery().await;
264 self.resend_pending_handshakes(now_ms).await;
265 self.resend_pending_rekeys(now_ms).await;
266 self.resend_pending_session_handshakes(now_ms).await;
267 self.purge_idle_sessions(now_ms);
268 self.purge_learned_routes(now_ms);
269 self.process_pending_retries(now_ms).await;
270 self.check_tree_state().await;
271 self.check_bloom_state().await;
272 self.compute_mesh_size();
273 self.record_stats_history();
274 self.check_mmp_reports().await;
275 self.check_session_mmp_reports().await;
276 self.check_link_heartbeats().await;
277 self.check_rekey().await;
278 self.check_session_rekey().await;
279 self.check_pending_lookups(now_ms).await;
280 self.poll_transport_discovery().await;
281 self.sample_transport_congestion();
282 self.activate_connected_udp_sessions().await;
283 }
284
285 /// Hand a decrypt-worker fallback to the canonical post-FMP-decrypt
286 /// processor. Reconstructs `ce_flag` / `sp_flag` from the FMP header
287 /// flag byte the worker captured into `DecryptFallback::fmp_flags`
288 /// (without this both ECN CE propagation and spin-bit RTT
289 /// observation are dropped on the worker path) and slices the
290 /// plaintext out of the original wire buffer with zero allocation.
291 async fn process_decrypt_fallback(&mut self, fallback: DecryptFallback) {
292 let ce_flag = fallback.fmp_flags & FLAG_CE != 0;
293 let sp_flag = fallback.fmp_flags & FLAG_SP != 0;
294 let plaintext = &fallback.packet_data[fallback.fmp_plaintext_offset
295 ..fallback.fmp_plaintext_offset + fallback.fmp_plaintext_len];
296 self.process_authentic_fmp_plaintext(
297 &fallback.source_node_addr,
298 fallback.transport_id,
299 &fallback.remote_addr,
300 fallback.timestamp_ms,
301 fallback.packet_len,
302 fallback.fmp_counter,
303 ce_flag,
304 sp_flag,
305 plaintext,
306 )
307 .await;
308 }
309
310 /// Drain up to `budget` queued fallbacks without yielding back to
311 /// `select!`. Returns the number processed. Called both from the
312 /// promoted-fallback select arm (after the head item) and
313 /// interleaved inside the packet_rx drain loop so bounced FMP
314 /// plaintexts can't accumulate behind a 256-packet inbound burst.
315 async fn drain_decrypt_fallback(
316 &mut self,
317 rx: &mut UnboundedReceiver<DecryptFallback>,
318 budget: usize,
319 ) -> usize {
320 let mut drained = 0;
321 while drained < budget {
322 match rx.try_recv() {
323 Ok(fallback) => {
324 self.process_decrypt_fallback(fallback).await;
325 drained += 1;
326 }
327 Err(_) => break,
328 }
329 }
330 drained
331 }
332
333 /// Flush any pending batched sends across all transports. Today
334 /// every transport's `flush_pending_send` is a no-op — the UDP
335 /// transport's per-transport `pending_send` buffer was removed
336 /// when the bulk data path moved into `encrypt_worker` (which
337 /// does its own target-grouped `sendmmsg(2)` directly). The
338 /// call sites are retained so any future batched transport can
339 /// opt in by overriding `flush_pending_send` without touching
340 /// the rx_loop.
341 async fn flush_pending_sends(&self) {
342 for transport in self.transports.values() {
343 if matches!(transport, TransportHandle::Udp(_)) {
344 transport.flush_pending_send().await;
345 }
346 }
347 }
348
349 /// Process a single received packet.
350 ///
351 /// Dispatches based on the phase field in the 4-byte common prefix.
352 async fn process_packet(&mut self, packet: ReceivedPacket) {
353 let _t_total = crate::perf_profile::Timer::start(crate::perf_profile::Stage::ProcessPacket);
354 crate::perf_profile::record_since(
355 crate::perf_profile::Stage::TransportQueueWait,
356 packet.trace_enqueued_at,
357 );
358 if packet.data.len() < COMMON_PREFIX_SIZE {
359 return; // Drop packets too short for common prefix
360 }
361
362 let prefix = match CommonPrefix::parse(&packet.data) {
363 Some(p) => p,
364 None => return, // Malformed prefix
365 };
366
367 if prefix.version != FMP_VERSION {
368 debug!(
369 version = prefix.version,
370 transport_id = %packet.transport_id,
371 "Unknown FMP version, dropping"
372 );
373
374 // If the packet arrived on an adopted Nostr-NAT bootstrap
375 // transport, the originating peer is necessarily on a
376 // different FMP-protocol version than us — the discovery
377 // sweep would otherwise re-traverse them every cycle even
378 // though no msg1/msg2 exchange can ever succeed. Bump the
379 // discovery-layer cooldown to the long protocol-mismatch
380 // window and emit a single WARN per fresh observation.
381 if self.bootstrap_transports.contains(&packet.transport_id)
382 && let Some(npub) = self
383 .bootstrap_transport_npubs
384 .get(&packet.transport_id)
385 .cloned()
386 && let Some(handle) = self.nostr_discovery_handle()
387 {
388 let now_ms = Self::now_ms();
389 let cooldown_secs = handle.protocol_mismatch_cooldown_secs();
390 if handle.record_protocol_mismatch(&npub, now_ms) {
391 warn!(
392 peer_npub = %npub,
393 transport_id = %packet.transport_id,
394 peer_version = prefix.version,
395 our_version = FMP_VERSION,
396 cooldown_secs,
397 "Nostr-discovered peer speaks a different FMP version; suppressing retraversal"
398 );
399 }
400 }
401 return;
402 }
403
404 match prefix.phase {
405 PHASE_ESTABLISHED => {
406 self.handle_encrypted_frame(packet).await;
407 }
408 PHASE_MSG1 => {
409 self.handle_msg1(packet).await;
410 }
411 PHASE_MSG2 => {
412 self.handle_msg2(packet).await;
413 }
414 _ => {
415 debug!(
416 phase = prefix.phase,
417 transport_id = %packet.transport_id,
418 "Unknown FMP phase, dropping"
419 );
420 }
421 }
422 }
423}