fips_core/node/handlers/rx_loop.rs
1//! RX event loop and packet dispatch.
2
3use crate::control::queries;
4use crate::control::{ControlSocket, commands};
5use crate::discovery::is_punch_packet;
6use crate::node::decrypt_worker::{DecryptFailureReport, DecryptFallback, DecryptWorkerEvent};
7use crate::node::wire::{
8 COMMON_PREFIX_SIZE, CommonPrefix, FLAG_CE, FLAG_SP, FMP_VERSION, PHASE_ESTABLISHED, PHASE_MSG1,
9 PHASE_MSG2,
10};
11use crate::node::{Node, NodeError};
12use crate::transport::ReceivedPacket;
13use crate::transport::TransportHandle;
14use std::time::Duration;
15use tokio::sync::mpsc::UnboundedReceiver;
16use tracing::{debug, info, trace, warn};
17
/// Number of raw packets the packet drain loop handles between two
/// interleaved decrypt-fallback drains. Interleaving keeps TCP ACK /
/// heartbeat / handshake progress steady under sustained inbound bursts.
const FALLBACK_INTERLEAVE_EVERY: usize = 32;

/// Upper bound on events handled by one interleaved fallback drain, so a
/// hot fallback queue cannot in turn starve the outer raw-packet drain.
const FALLBACK_INTERLEAVE_BUDGET: usize = 64;

/// Budget handed to `drain_packet_rx` by the RX loop: the most raw packets
/// one drain pass will process before control returns to `select!`. The
/// trailing fallback drain at the end of a packet burst reuses this value
/// as its own cap.
const PACKET_DRAIN_BUDGET: usize = 256;

/// Wall-clock limit applied to one pass of the slow maintenance tick.
/// When it elapses the pass is abandoned and packet processing resumes.
const RX_LOOP_SLOW_MAINTENANCE_TIMEOUT: Duration = Duration::from_millis(100);
27
28impl Node {
29 /// Run the receive event loop.
30 ///
31 /// Processes packets from all transports, dispatching based on
32 /// the phase field in the 4-byte common prefix:
33 /// - Phase 0x0: Encrypted frame (session data)
34 /// - Phase 0x1: Handshake message 1 (initiator -> responder)
35 /// - Phase 0x2: Handshake message 2 (responder -> initiator)
36 ///
37 /// Also processes outbound IPv6 packets from the TUN reader for session
38 /// encapsulation and routing through the mesh.
39 ///
40 /// Also processes DNS-resolved identities for identity cache population.
41 ///
42 /// Also runs a periodic tick (1s) to clean up stale handshake connections
43 /// that never received a response. This prevents resource leaks when peers
44 /// are unreachable.
45 ///
46 /// This method takes ownership of the packet_rx channel and runs
47 /// until the channel is closed (typically when stop() is called).
48 pub async fn run_rx_loop(&mut self) -> Result<(), NodeError> {
49 let mut packet_rx = self.packet_rx.take().ok_or(NodeError::NotStarted)?;
50
51 // Take the TUN outbound receiver, or create a dummy channel that never
52 // produces messages (when TUN is disabled). Holding the sender prevents
53 // the channel from closing.
54 let (mut tun_outbound_rx, _tun_guard) = match self.tun_outbound_rx.take() {
55 Some(rx) => (rx, None),
56 None => {
57 let (tx, rx) = tokio::sync::mpsc::channel(1);
58 (rx, Some(tx))
59 }
60 };
61
62 // Take the DNS identity receiver, or create a dummy channel (when DNS
63 // is disabled). Same pattern as TUN outbound.
64 let (mut dns_identity_rx, _dns_guard) = match self.dns_identity_rx.take() {
65 Some(rx) => (rx, None),
66 None => {
67 let (tx, rx) = tokio::sync::mpsc::channel(1);
68 (rx, Some(tx))
69 }
70 };
71
72 // Take the endpoint-data command receiver, or create a dummy channel
73 // when the embedded endpoint API is not in use.
74 let (mut endpoint_command_rx, _endpoint_command_guard) =
75 match self.endpoint_command_rx.take() {
76 Some(rx) => (rx, None),
77 None => {
78 let (tx, rx) = tokio::sync::mpsc::channel(1);
79 (rx, Some(tx))
80 }
81 };
82
83 // Take the decrypt worker fallback receiver if a worker pool
84 // is in use. The worker pushes non-fast-path packets (anything
85 // that's not bulk EndpointData) here for the legacy dispatch.
86 let (mut decrypt_fallback_rx, _decrypt_fallback_guard) =
87 match self.decrypt_fallback_rx.take() {
88 Some(rx) => (rx, None),
89 None => {
90 let (tx, rx) = tokio::sync::mpsc::unbounded_channel();
91 (rx, Some(tx))
92 }
93 };
94
95 let mut tick =
96 tokio::time::interval(Duration::from_secs(self.config.node.tick_interval_secs));
97
98 // Set up control socket channel
99 let (control_tx, mut control_rx) =
100 tokio::sync::mpsc::channel::<crate::control::ControlMessage>(32);
101
102 if self.config.node.control.enabled {
103 let config = self.config.node.control.clone();
104 let tx = control_tx.clone();
105 tokio::spawn(async move {
106 match ControlSocket::bind(&config) {
107 Ok(socket) => {
108 socket.accept_loop(tx).await;
109 }
110 Err(e) => {
111 warn!(error = %e, "Failed to bind control socket");
112 }
113 }
114 });
115 }
116 // Drop unused sender to avoid keeping channel open if control is disabled
117 drop(control_tx);
118
119 info!("RX event loop started");
120 // Optional perf profiler (FIPS_PERF=1). No-op otherwise.
121 crate::perf_profile::maybe_spawn_reporter();
122
123 loop {
124 tokio::select! {
125 biased;
126 // Decrypt-worker fallback drains FIRST. The previous
127 // ordering put `packet_rx` first, which under sustained
128 // inbound bursts let the raw-packet drain (up to 256
129 // packets + flush) starve fallback work for tens of
130 // milliseconds. UDP throughput tolerates that; TCP
131 // doesn't — late FMP plaintexts mean late ACKs,
132 // dup-ACK fast retransmits, and cwnd collapse.
133 // Reproduced on native macOS / Wi-Fi where TCP fell to
134 // ~10 Mb/s while UDP cleared ~100 Mb/s on the same
135 // tunnel. Promoting fallback gives the kernel-side
136 // TCP machinery a fair chance to see its ACKs and
137 // keep cwnd growing.
138 Some(event) = decrypt_fallback_rx.recv() => {
139 self.process_decrypt_worker_event(event).await;
140 self.drain_decrypt_fallback(&mut decrypt_fallback_rx, 255).await;
141 self.flush_pending_sends().await;
142 }
143 packet = packet_rx.recv() => {
144 match packet {
145 Some(p) => {
146 self.drain_packet_rx(
147 &mut packet_rx,
148 &mut decrypt_fallback_rx,
149 Some(p),
150 PACKET_DRAIN_BUDGET,
151 ).await;
152 }
153 None => break, // channel closed
154 }
155 }
156 _ = tick.tick() => {
157 let drained = self.drain_packet_rx(
158 &mut packet_rx,
159 &mut decrypt_fallback_rx,
160 None,
161 PACKET_DRAIN_BUDGET,
162 ).await;
163 if drained > 0 {
164 debug!(
165 drained,
166 "Drained queued packets before rx-loop maintenance"
167 );
168 }
169 self.run_rx_loop_maintenance_tick().await;
170 }
171 Some(ipv6_packet) = tun_outbound_rx.recv() => {
172 self.handle_tun_outbound(ipv6_packet).await;
173 let mut drained = 0;
174 while drained < 256 {
175 match tun_outbound_rx.try_recv() {
176 Ok(p) => {
177 self.handle_tun_outbound(p).await;
178 drained += 1;
179 }
180 Err(_) => break,
181 }
182 }
183 // Flush any trailing batched sends so the last
184 // packets of a burst don't sit in the per-transport
185 // sendmmsg buffer waiting for the threshold.
186 self.flush_pending_sends().await;
187 }
188 Some(identity) = dns_identity_rx.recv() => {
189 debug!(
190 node_addr = %identity.node_addr,
191 "Registering identity from DNS resolution"
192 );
193 self.register_identity(identity.node_addr, identity.pubkey);
194 }
195 Some(command) = endpoint_command_rx.recv() => {
196 self.handle_endpoint_data_command(command).await;
197 // Same drain pattern: when the application is shoving
198 // tunnel data in via send_oneway, several Send commands
199 // typically queue up between scheduler hops.
200 let mut drained = 0;
201 while drained < 256 {
202 match endpoint_command_rx.try_recv() {
203 Ok(c) => {
204 self.handle_endpoint_data_command(c).await;
205 drained += 1;
206 }
207 Err(_) => break,
208 }
209 }
210 // Flush any trailing batched sends from the
211 // per-transport sendmmsg buffer.
212 self.flush_pending_sends().await;
213 }
214 Some((request, response_tx)) = control_rx.recv() => {
215 let response = if request.command.starts_with("show_") {
216 queries::dispatch(self, &request.command, request.params.as_ref())
217 } else {
218 commands::dispatch(
219 self,
220 &request.command,
221 request.params.as_ref(),
222 ).await
223 };
224 let _ = response_tx.send(response);
225 }
226 }
227 }
228
229 info!("RX event loop stopped (channel closed)");
230 Ok(())
231 }
232
233 async fn drain_packet_rx(
234 &mut self,
235 packet_rx: &mut UnboundedReceiver<ReceivedPacket>,
236 decrypt_fallback_rx: &mut UnboundedReceiver<DecryptWorkerEvent>,
237 first_packet: Option<ReceivedPacket>,
238 budget: usize,
239 ) -> usize {
240 let mut drained = 0usize;
241 if let Some(packet) = first_packet {
242 self.process_packet(packet).await;
243 drained = 1;
244 }
245
246 // Drain remaining ready inbound packets in a tight loop before
247 // yielding back to select! Every yield is a scheduler hop, and at
248 // line rate transports typically have several packets available per
249 // wake. Caps at a batch boundary so other branches eventually get a
250 // turn even under sustained load.
251 while drained < budget {
252 if drained > 0 && drained.is_multiple_of(FALLBACK_INTERLEAVE_EVERY) {
253 self.drain_decrypt_fallback(decrypt_fallback_rx, FALLBACK_INTERLEAVE_BUDGET)
254 .await;
255 }
256 match packet_rx.try_recv() {
257 Ok(packet) => {
258 self.process_packet(packet).await;
259 drained += 1;
260 }
261 Err(_) => break,
262 }
263 }
264
265 if drained > 0 {
266 // One trailing fallback drain so the last bounced packets of the
267 // burst aren't held up by the post-burst send flush.
268 self.drain_decrypt_fallback(decrypt_fallback_rx, PACKET_DRAIN_BUDGET)
269 .await;
270 // Flush any batched sends triggered by inbound packets (e.g.
271 // forwarded SessionDatagrams, MMP reports, tree announces).
272 self.flush_pending_sends().await;
273 }
274 drained
275 }
276
277 async fn run_rx_loop_maintenance_tick(&mut self) {
278 self.check_timeouts();
279 let now_ms = Self::now_ms();
280 // Link/session liveness must run before slower retry/discovery work:
281 // under bulk send pressure a late heartbeat or MMP report is
282 // indistinguishable from a dead direct path on the remote peer.
283 self.check_link_heartbeats().await;
284 self.reload_peer_acl();
285 self.resend_pending_handshakes(now_ms).await;
286 self.resend_pending_rekeys(now_ms).await;
287 self.resend_pending_session_handshakes(now_ms).await;
288 self.resend_pending_session_msg3(now_ms).await;
289 self.purge_idle_sessions(now_ms);
290 self.purge_learned_routes(now_ms);
291 self.check_mmp_reports().await;
292 self.check_session_mmp_reports().await;
293 self.check_rekey().await;
294 self.check_session_rekey().await;
295 self.check_pending_lookups(now_ms).await;
296 self.poll_pending_connects().await;
297 self.process_pending_retries(now_ms).await;
298 self.poll_transport_discovery().await;
299 self.activate_connected_udp_sessions().await;
300 self.sample_transport_congestion();
301
302 if tokio::time::timeout(
303 RX_LOOP_SLOW_MAINTENANCE_TIMEOUT,
304 self.run_rx_loop_slow_maintenance_tick(),
305 )
306 .await
307 .is_err()
308 {
309 self.mark_rx_loop_maintenance_timeout();
310 warn!(
311 timeout_ms = RX_LOOP_SLOW_MAINTENANCE_TIMEOUT.as_millis() as u64,
312 "RX loop slow maintenance timed out; continuing packet processing"
313 );
314 }
315 }
316
317 async fn run_rx_loop_slow_maintenance_tick(&mut self) {
318 // Discovery and graph/stat maintenance can involve relay work or
319 // larger scans. Keep it bounded after direct-path liveness and session
320 // upkeep so a slow Nostr/LAN tick degrades discovery freshness, not
321 // packet flow.
322 self.poll_nostr_discovery().await;
323 self.poll_lan_discovery().await;
324 self.poll_local_instance_discovery().await;
325 self.check_tree_state().await;
326 self.check_bloom_state().await;
327 self.compute_mesh_size();
328 self.record_stats_history();
329 }
330
331 /// Hand a decrypt-worker fallback to the canonical post-FMP-decrypt
332 /// processor. Reconstructs `ce_flag` / `sp_flag` from the FMP header
333 /// flag byte the worker captured into `DecryptFallback::fmp_flags`
334 /// (without this both ECN CE propagation and spin-bit RTT
335 /// observation are dropped on the worker path) and slices the
336 /// plaintext out of the original wire buffer with zero allocation.
337 async fn process_decrypt_worker_event(&mut self, event: DecryptWorkerEvent) {
338 match event {
339 DecryptWorkerEvent::Plaintext(fallback) => {
340 self.process_decrypt_fallback(fallback).await;
341 }
342 DecryptWorkerEvent::DecryptFailure(report) => {
343 self.process_decrypt_failure_report(report).await;
344 }
345 }
346 }
347
348 async fn process_decrypt_fallback(&mut self, fallback: DecryptFallback) {
349 let ce_flag = fallback.fmp_flags & FLAG_CE != 0;
350 let sp_flag = fallback.fmp_flags & FLAG_SP != 0;
351 let plaintext = &fallback.packet_data[fallback.fmp_plaintext_offset
352 ..fallback.fmp_plaintext_offset + fallback.fmp_plaintext_len];
353 self.process_authentic_fmp_plaintext(
354 &fallback.source_node_addr,
355 fallback.transport_id,
356 &fallback.remote_addr,
357 fallback.timestamp_ms,
358 fallback.packet_len,
359 fallback.fmp_counter,
360 ce_flag,
361 sp_flag,
362 plaintext,
363 )
364 .await;
365 }
366
367 async fn process_decrypt_failure_report(&mut self, report: DecryptFailureReport) {
368 debug!(
369 peer = %self.peer_display_name(&report.source_node_addr),
370 counter = report.fmp_counter,
371 replay_highest = report.fmp_replay_highest,
372 "Worker FMP AEAD decryption failed"
373 );
374 self.handle_decrypt_failure_report(&report).await;
375 }
376
377 /// Drain up to `budget` queued fallbacks without yielding back to
378 /// `select!`. Returns the number processed. Called both from the
379 /// promoted-fallback select arm (after the head item) and
380 /// interleaved inside the packet_rx drain loop so bounced FMP
381 /// plaintexts can't accumulate behind a 256-packet inbound burst.
382 async fn drain_decrypt_fallback(
383 &mut self,
384 rx: &mut UnboundedReceiver<DecryptWorkerEvent>,
385 budget: usize,
386 ) -> usize {
387 let mut drained = 0;
388 while drained < budget {
389 match rx.try_recv() {
390 Ok(event) => {
391 self.process_decrypt_worker_event(event).await;
392 drained += 1;
393 }
394 Err(_) => break,
395 }
396 }
397 drained
398 }
399
400 /// Flush any pending batched sends across all transports. Today
401 /// every transport's `flush_pending_send` is a no-op — the UDP
402 /// transport's per-transport `pending_send` buffer was removed
403 /// when the bulk data path moved into `encrypt_worker` (which
404 /// does its own target-grouped `sendmmsg(2)` directly). The
405 /// call sites are retained so any future batched transport can
406 /// opt in by overriding `flush_pending_send` without touching
407 /// the rx_loop.
408 async fn flush_pending_sends(&self) {
409 for transport in self.transports.values() {
410 if matches!(transport, TransportHandle::Udp(_)) {
411 transport.flush_pending_send().await;
412 }
413 }
414 }
415
416 /// Process a single received packet.
417 ///
418 /// Dispatches based on the phase field in the 4-byte common prefix.
419 pub(in crate::node) async fn process_packet(&mut self, packet: ReceivedPacket) {
420 let _t_total = crate::perf_profile::Timer::start(crate::perf_profile::Stage::ProcessPacket);
421 crate::perf_profile::record_since(
422 crate::perf_profile::Stage::TransportQueueWait,
423 packet.trace_enqueued_at,
424 );
425 if is_punch_packet(&packet.data) {
426 trace!(
427 transport_id = %packet.transport_id,
428 remote_addr = %packet.remote_addr,
429 bytes = packet.data.len(),
430 "Dropping stray punch probe/ack in FMP rx loop"
431 );
432 return;
433 }
434 if packet.data.len() < COMMON_PREFIX_SIZE {
435 return; // Drop packets too short for common prefix
436 }
437
438 let prefix = match CommonPrefix::parse(&packet.data) {
439 Some(p) => p,
440 None => return, // Malformed prefix
441 };
442 if matches!(prefix.phase, PHASE_MSG1 | PHASE_MSG2) {
443 debug!(
444 transport_id = %packet.transport_id,
445 remote_addr = %packet.remote_addr,
446 bytes = packet.data.len(),
447 phase = prefix.phase,
448 version = prefix.version,
449 "FMP handshake packet dispatch"
450 );
451 } else {
452 trace!(
453 transport_id = %packet.transport_id,
454 remote_addr = %packet.remote_addr,
455 bytes = packet.data.len(),
456 phase = prefix.phase,
457 version = prefix.version,
458 "FMP packet dispatch"
459 );
460 }
461
462 if prefix.version != FMP_VERSION {
463 debug!(
464 version = prefix.version,
465 transport_id = %packet.transport_id,
466 "Unknown FMP version, dropping"
467 );
468
469 // If the packet arrived on an adopted Nostr-NAT bootstrap
470 // transport, the originating peer is necessarily on a
471 // different FMP-protocol version than us — the discovery
472 // sweep would otherwise re-traverse them every cycle even
473 // though no msg1/msg2 exchange can ever succeed. Bump the
474 // discovery-layer cooldown to the long protocol-mismatch
475 // window and emit a single WARN per fresh observation.
476 let looks_like_fmp_phase =
477 matches!(prefix.phase, PHASE_ESTABLISHED | PHASE_MSG1 | PHASE_MSG2);
478 if looks_like_fmp_phase
479 && self.bootstrap_transports.contains(&packet.transport_id)
480 && let Some(npub) = self
481 .bootstrap_transport_npubs
482 .get(&packet.transport_id)
483 .cloned()
484 && let Some(handle) = self.nostr_discovery_handle()
485 {
486 let now_ms = Self::now_ms();
487 let cooldown_secs = handle.protocol_mismatch_cooldown_secs();
488 if handle.record_protocol_mismatch(&npub, now_ms) {
489 warn!(
490 peer_npub = %npub,
491 transport_id = %packet.transport_id,
492 peer_version = prefix.version,
493 our_version = FMP_VERSION,
494 cooldown_secs,
495 "Nostr-discovered peer speaks a different FMP version; suppressing retraversal"
496 );
497 }
498 }
499 return;
500 }
501
502 match prefix.phase {
503 PHASE_ESTABLISHED => {
504 self.handle_encrypted_frame(packet).await;
505 }
506 PHASE_MSG1 => {
507 self.handle_msg1(packet).await;
508 }
509 PHASE_MSG2 => {
510 self.handle_msg2(packet).await;
511 }
512 _ => {
513 debug!(
514 phase = prefix.phase,
515 transport_id = %packet.transport_id,
516 "Unknown FMP phase, dropping"
517 );
518 }
519 }
520 }
521}