fips_core/node/handlers/rx_loop.rs
1//! RX event loop and packet dispatch.
2
3use crate::control::queries;
4use crate::control::{ControlSocket, commands};
5use crate::discovery::is_punch_packet;
6use crate::node::decrypt_worker::{DecryptFailureReport, DecryptFallback, DecryptWorkerEvent};
7use crate::node::wire::{
8 COMMON_PREFIX_SIZE, CommonPrefix, FLAG_CE, FLAG_SP, FMP_VERSION, PHASE_ESTABLISHED, PHASE_MSG1,
9 PHASE_MSG2,
10};
11use crate::node::{Node, NodeError};
12use crate::transport::ReceivedPacket;
13use crate::transport::TransportHandle;
14use std::time::Duration;
15use tokio::sync::mpsc::UnboundedReceiver;
16use tracing::{debug, info, trace, warn};
17
/// Per-wake budget for the raw-packet drain in the RX loop; also caps the
/// trailing decrypt-fallback drain that runs once that packet batch is done.
const PACKET_DRAIN_BUDGET: usize = 256;

/// Cadence, counted in raw packets, at which the raw-packet drain loop hands a
/// slice of work to the decrypt-fallback drain. Keeps TCP ACK / heartbeat /
/// handshake progress steady under sustained inbound bursts.
const FALLBACK_INTERLEAVE_EVERY: usize = 32;

/// Upper bound on each interleaved fallback drain, so a hot inbound spike
/// can't starve the outer raw-packet drain in the opposite direction.
const FALLBACK_INTERLEAVE_BUDGET: usize = 64;

/// Watchdog applied to a single RX-loop maintenance tick. When it elapses, the
/// rest of that tick's maintenance work is abandoned and packet processing
/// continues.
const RX_LOOP_MAINTENANCE_TIMEOUT: Duration = Duration::from_millis(500);
27
28impl Node {
29 /// Run the receive event loop.
30 ///
31 /// Processes packets from all transports, dispatching based on
32 /// the phase field in the 4-byte common prefix:
33 /// - Phase 0x0: Encrypted frame (session data)
34 /// - Phase 0x1: Handshake message 1 (initiator -> responder)
35 /// - Phase 0x2: Handshake message 2 (responder -> initiator)
36 ///
37 /// Also processes outbound IPv6 packets from the TUN reader for session
38 /// encapsulation and routing through the mesh.
39 ///
40 /// Also processes DNS-resolved identities for identity cache population.
41 ///
42 /// Also runs a periodic tick (1s) to clean up stale handshake connections
43 /// that never received a response. This prevents resource leaks when peers
44 /// are unreachable.
45 ///
46 /// This method takes ownership of the packet_rx channel and runs
47 /// until the channel is closed (typically when stop() is called).
48 pub async fn run_rx_loop(&mut self) -> Result<(), NodeError> {
49 let mut packet_rx = self.packet_rx.take().ok_or(NodeError::NotStarted)?;
50
51 // Take the TUN outbound receiver, or create a dummy channel that never
52 // produces messages (when TUN is disabled). Holding the sender prevents
53 // the channel from closing.
54 let (mut tun_outbound_rx, _tun_guard) = match self.tun_outbound_rx.take() {
55 Some(rx) => (rx, None),
56 None => {
57 let (tx, rx) = tokio::sync::mpsc::channel(1);
58 (rx, Some(tx))
59 }
60 };
61
62 // Take the DNS identity receiver, or create a dummy channel (when DNS
63 // is disabled). Same pattern as TUN outbound.
64 let (mut dns_identity_rx, _dns_guard) = match self.dns_identity_rx.take() {
65 Some(rx) => (rx, None),
66 None => {
67 let (tx, rx) = tokio::sync::mpsc::channel(1);
68 (rx, Some(tx))
69 }
70 };
71
72 // Take the endpoint-data command receiver, or create a dummy channel
73 // when the embedded endpoint API is not in use.
74 let (mut endpoint_command_rx, _endpoint_command_guard) =
75 match self.endpoint_command_rx.take() {
76 Some(rx) => (rx, None),
77 None => {
78 let (tx, rx) = tokio::sync::mpsc::channel(1);
79 (rx, Some(tx))
80 }
81 };
82
83 // Take the decrypt worker fallback receiver if a worker pool
84 // is in use. The worker pushes non-fast-path packets (anything
85 // that's not bulk EndpointData) here for the legacy dispatch.
86 let (mut decrypt_fallback_rx, _decrypt_fallback_guard) =
87 match self.decrypt_fallback_rx.take() {
88 Some(rx) => (rx, None),
89 None => {
90 let (tx, rx) = tokio::sync::mpsc::unbounded_channel();
91 (rx, Some(tx))
92 }
93 };
94
95 let mut tick =
96 tokio::time::interval(Duration::from_secs(self.config.node.tick_interval_secs));
97
98 // Set up control socket channel
99 let (control_tx, mut control_rx) =
100 tokio::sync::mpsc::channel::<crate::control::ControlMessage>(32);
101
102 if self.config.node.control.enabled {
103 let config = self.config.node.control.clone();
104 let tx = control_tx.clone();
105 tokio::spawn(async move {
106 match ControlSocket::bind(&config) {
107 Ok(socket) => {
108 socket.accept_loop(tx).await;
109 }
110 Err(e) => {
111 warn!(error = %e, "Failed to bind control socket");
112 }
113 }
114 });
115 }
116 // Drop unused sender to avoid keeping channel open if control is disabled
117 drop(control_tx);
118
119 info!("RX event loop started");
120 // Optional perf profiler (FIPS_PERF=1). No-op otherwise.
121 crate::perf_profile::maybe_spawn_reporter();
122
123 loop {
124 tokio::select! {
125 biased;
126 // Decrypt-worker fallback drains FIRST. The previous
127 // ordering put `packet_rx` first, which under sustained
128 // inbound bursts let the raw-packet drain (up to 256
129 // packets + flush) starve fallback work for tens of
130 // milliseconds. UDP throughput tolerates that; TCP
131 // doesn't — late FMP plaintexts mean late ACKs,
132 // dup-ACK fast retransmits, and cwnd collapse.
133 // Reproduced on native macOS / Wi-Fi where TCP fell to
134 // ~10 Mb/s while UDP cleared ~100 Mb/s on the same
135 // tunnel. Promoting fallback gives the kernel-side
136 // TCP machinery a fair chance to see its ACKs and
137 // keep cwnd growing.
138 Some(event) = decrypt_fallback_rx.recv() => {
139 self.process_decrypt_worker_event(event).await;
140 self.drain_decrypt_fallback(&mut decrypt_fallback_rx, 255).await;
141 self.flush_pending_sends().await;
142 }
143 packet = packet_rx.recv() => {
144 match packet {
145 Some(p) => {
146 self.drain_packet_rx(
147 &mut packet_rx,
148 &mut decrypt_fallback_rx,
149 Some(p),
150 PACKET_DRAIN_BUDGET,
151 ).await;
152 }
153 None => break, // channel closed
154 }
155 }
156 _ = tick.tick() => {
157 let drained = self.drain_packet_rx(
158 &mut packet_rx,
159 &mut decrypt_fallback_rx,
160 None,
161 PACKET_DRAIN_BUDGET,
162 ).await;
163 if drained > 0 {
164 debug!(
165 drained,
166 "Drained queued packets before rx-loop maintenance"
167 );
168 }
169 if tokio::time::timeout(
170 RX_LOOP_MAINTENANCE_TIMEOUT,
171 self.run_rx_loop_maintenance_tick(),
172 ).await.is_err() {
173 self.mark_rx_loop_maintenance_timeout();
174 warn!(
175 timeout_ms = RX_LOOP_MAINTENANCE_TIMEOUT.as_millis() as u64,
176 "RX loop maintenance timed out; continuing packet processing"
177 );
178 }
179 }
180 Some(ipv6_packet) = tun_outbound_rx.recv() => {
181 self.handle_tun_outbound(ipv6_packet).await;
182 let mut drained = 0;
183 while drained < 256 {
184 match tun_outbound_rx.try_recv() {
185 Ok(p) => {
186 self.handle_tun_outbound(p).await;
187 drained += 1;
188 }
189 Err(_) => break,
190 }
191 }
192 // Flush any trailing batched sends so the last
193 // packets of a burst don't sit in the per-transport
194 // sendmmsg buffer waiting for the threshold.
195 self.flush_pending_sends().await;
196 }
197 Some(identity) = dns_identity_rx.recv() => {
198 debug!(
199 node_addr = %identity.node_addr,
200 "Registering identity from DNS resolution"
201 );
202 self.register_identity(identity.node_addr, identity.pubkey);
203 }
204 Some(command) = endpoint_command_rx.recv() => {
205 self.handle_endpoint_data_command(command).await;
206 // Same drain pattern: when the application is shoving
207 // tunnel data in via send_oneway, several Send commands
208 // typically queue up between scheduler hops.
209 let mut drained = 0;
210 while drained < 256 {
211 match endpoint_command_rx.try_recv() {
212 Ok(c) => {
213 self.handle_endpoint_data_command(c).await;
214 drained += 1;
215 }
216 Err(_) => break,
217 }
218 }
219 // Flush any trailing batched sends from the
220 // per-transport sendmmsg buffer.
221 self.flush_pending_sends().await;
222 }
223 Some((request, response_tx)) = control_rx.recv() => {
224 let response = if request.command.starts_with("show_") {
225 queries::dispatch(self, &request.command, request.params.as_ref())
226 } else {
227 commands::dispatch(
228 self,
229 &request.command,
230 request.params.as_ref(),
231 ).await
232 };
233 let _ = response_tx.send(response);
234 }
235 }
236 }
237
238 info!("RX event loop stopped (channel closed)");
239 Ok(())
240 }
241
242 async fn drain_packet_rx(
243 &mut self,
244 packet_rx: &mut UnboundedReceiver<ReceivedPacket>,
245 decrypt_fallback_rx: &mut UnboundedReceiver<DecryptWorkerEvent>,
246 first_packet: Option<ReceivedPacket>,
247 budget: usize,
248 ) -> usize {
249 let mut drained = 0usize;
250 if let Some(packet) = first_packet {
251 self.process_packet(packet).await;
252 drained = 1;
253 }
254
255 // Drain remaining ready inbound packets in a tight loop before
256 // yielding back to select! Every yield is a scheduler hop, and at
257 // line rate transports typically have several packets available per
258 // wake. Caps at a batch boundary so other branches eventually get a
259 // turn even under sustained load.
260 while drained < budget {
261 if drained > 0 && drained.is_multiple_of(FALLBACK_INTERLEAVE_EVERY) {
262 self.drain_decrypt_fallback(decrypt_fallback_rx, FALLBACK_INTERLEAVE_BUDGET)
263 .await;
264 }
265 match packet_rx.try_recv() {
266 Ok(packet) => {
267 self.process_packet(packet).await;
268 drained += 1;
269 }
270 Err(_) => break,
271 }
272 }
273
274 if drained > 0 {
275 // One trailing fallback drain so the last bounced packets of the
276 // burst aren't held up by the post-burst send flush.
277 self.drain_decrypt_fallback(decrypt_fallback_rx, PACKET_DRAIN_BUDGET)
278 .await;
279 // Flush any batched sends triggered by inbound packets (e.g.
280 // forwarded SessionDatagrams, MMP reports, tree announces).
281 self.flush_pending_sends().await;
282 }
283 drained
284 }
285
    /// Run one round of periodic node maintenance.
    ///
    /// Invoked from the `tick` arm of `run_rx_loop`, which wraps this future in
    /// `tokio::time::timeout(RX_LOOP_MAINTENANCE_TIMEOUT, ..)`. When that
    /// timeout elapses, the future is dropped at whichever `.await` it is
    /// parked on, and every later step is skipped for that tick. The
    /// statement order below is therefore a priority order, not an arbitrary
    /// one.
    async fn run_rx_loop_maintenance_tick(&mut self) {
        self.check_timeouts();
        // Sampled once and shared by every time-based step below. Because the
        // intervening steps await, later steps may see a timestamp that lags
        // real time within this tick. NOTE(review): confirm that lag is
        // acceptable for the resend/purge/retry deadlines.
        let now_ms = Self::now_ms();
        // Link liveness must run before slower retry/discovery/report work:
        // this whole maintenance future is guarded by a watchdog timeout, and
        // under bulk send pressure a late heartbeat is indistinguishable from a
        // dead direct path on the remote peer.
        self.check_link_heartbeats().await;
        self.reload_peer_acl();
        self.poll_pending_connects().await;
        self.poll_nostr_discovery().await;
        self.poll_lan_discovery().await;
        self.poll_local_instance_discovery().await;
        self.resend_pending_handshakes(now_ms).await;
        self.resend_pending_rekeys(now_ms).await;
        self.resend_pending_session_handshakes(now_ms).await;
        self.resend_pending_session_msg3(now_ms).await;
        self.purge_idle_sessions(now_ms);
        self.purge_learned_routes(now_ms);
        self.process_pending_retries(now_ms).await;
        self.check_tree_state().await;
        self.check_bloom_state().await;
        self.compute_mesh_size();
        self.record_stats_history();
        self.check_mmp_reports().await;
        self.check_session_mmp_reports().await;
        self.check_rekey().await;
        self.check_session_rekey().await;
        self.check_pending_lookups(now_ms).await;
        self.poll_transport_discovery().await;
        self.sample_transport_congestion();
        self.activate_connected_udp_sessions().await;
    }
319
320 /// Hand a decrypt-worker fallback to the canonical post-FMP-decrypt
321 /// processor. Reconstructs `ce_flag` / `sp_flag` from the FMP header
322 /// flag byte the worker captured into `DecryptFallback::fmp_flags`
323 /// (without this both ECN CE propagation and spin-bit RTT
324 /// observation are dropped on the worker path) and slices the
325 /// plaintext out of the original wire buffer with zero allocation.
326 async fn process_decrypt_worker_event(&mut self, event: DecryptWorkerEvent) {
327 match event {
328 DecryptWorkerEvent::Plaintext(fallback) => {
329 self.process_decrypt_fallback(fallback).await;
330 }
331 DecryptWorkerEvent::DecryptFailure(report) => {
332 self.process_decrypt_failure_report(report).await;
333 }
334 }
335 }
336
337 async fn process_decrypt_fallback(&mut self, fallback: DecryptFallback) {
338 let ce_flag = fallback.fmp_flags & FLAG_CE != 0;
339 let sp_flag = fallback.fmp_flags & FLAG_SP != 0;
340 let plaintext = &fallback.packet_data[fallback.fmp_plaintext_offset
341 ..fallback.fmp_plaintext_offset + fallback.fmp_plaintext_len];
342 self.process_authentic_fmp_plaintext(
343 &fallback.source_node_addr,
344 fallback.transport_id,
345 &fallback.remote_addr,
346 fallback.timestamp_ms,
347 fallback.packet_len,
348 fallback.fmp_counter,
349 ce_flag,
350 sp_flag,
351 plaintext,
352 )
353 .await;
354 }
355
356 async fn process_decrypt_failure_report(&mut self, report: DecryptFailureReport) {
357 debug!(
358 peer = %self.peer_display_name(&report.source_node_addr),
359 counter = report.fmp_counter,
360 replay_highest = report.fmp_replay_highest,
361 "Worker FMP AEAD decryption failed"
362 );
363 self.handle_decrypt_failure_report(&report).await;
364 }
365
366 /// Drain up to `budget` queued fallbacks without yielding back to
367 /// `select!`. Returns the number processed. Called both from the
368 /// promoted-fallback select arm (after the head item) and
369 /// interleaved inside the packet_rx drain loop so bounced FMP
370 /// plaintexts can't accumulate behind a 256-packet inbound burst.
371 async fn drain_decrypt_fallback(
372 &mut self,
373 rx: &mut UnboundedReceiver<DecryptWorkerEvent>,
374 budget: usize,
375 ) -> usize {
376 let mut drained = 0;
377 while drained < budget {
378 match rx.try_recv() {
379 Ok(event) => {
380 self.process_decrypt_worker_event(event).await;
381 drained += 1;
382 }
383 Err(_) => break,
384 }
385 }
386 drained
387 }
388
389 /// Flush any pending batched sends across all transports. Today
390 /// every transport's `flush_pending_send` is a no-op — the UDP
391 /// transport's per-transport `pending_send` buffer was removed
392 /// when the bulk data path moved into `encrypt_worker` (which
393 /// does its own target-grouped `sendmmsg(2)` directly). The
394 /// call sites are retained so any future batched transport can
395 /// opt in by overriding `flush_pending_send` without touching
396 /// the rx_loop.
397 async fn flush_pending_sends(&self) {
398 for transport in self.transports.values() {
399 if matches!(transport, TransportHandle::Udp(_)) {
400 transport.flush_pending_send().await;
401 }
402 }
403 }
404
405 /// Process a single received packet.
406 ///
407 /// Dispatches based on the phase field in the 4-byte common prefix.
408 pub(in crate::node) async fn process_packet(&mut self, packet: ReceivedPacket) {
409 let _t_total = crate::perf_profile::Timer::start(crate::perf_profile::Stage::ProcessPacket);
410 crate::perf_profile::record_since(
411 crate::perf_profile::Stage::TransportQueueWait,
412 packet.trace_enqueued_at,
413 );
414 if is_punch_packet(&packet.data) {
415 trace!(
416 transport_id = %packet.transport_id,
417 remote_addr = %packet.remote_addr,
418 bytes = packet.data.len(),
419 "Dropping stray punch probe/ack in FMP rx loop"
420 );
421 return;
422 }
423 if packet.data.len() < COMMON_PREFIX_SIZE {
424 return; // Drop packets too short for common prefix
425 }
426
427 let prefix = match CommonPrefix::parse(&packet.data) {
428 Some(p) => p,
429 None => return, // Malformed prefix
430 };
431 if matches!(prefix.phase, PHASE_MSG1 | PHASE_MSG2) {
432 debug!(
433 transport_id = %packet.transport_id,
434 remote_addr = %packet.remote_addr,
435 bytes = packet.data.len(),
436 phase = prefix.phase,
437 version = prefix.version,
438 "FMP handshake packet dispatch"
439 );
440 } else {
441 trace!(
442 transport_id = %packet.transport_id,
443 remote_addr = %packet.remote_addr,
444 bytes = packet.data.len(),
445 phase = prefix.phase,
446 version = prefix.version,
447 "FMP packet dispatch"
448 );
449 }
450
451 if prefix.version != FMP_VERSION {
452 debug!(
453 version = prefix.version,
454 transport_id = %packet.transport_id,
455 "Unknown FMP version, dropping"
456 );
457
458 // If the packet arrived on an adopted Nostr-NAT bootstrap
459 // transport, the originating peer is necessarily on a
460 // different FMP-protocol version than us — the discovery
461 // sweep would otherwise re-traverse them every cycle even
462 // though no msg1/msg2 exchange can ever succeed. Bump the
463 // discovery-layer cooldown to the long protocol-mismatch
464 // window and emit a single WARN per fresh observation.
465 let looks_like_fmp_phase =
466 matches!(prefix.phase, PHASE_ESTABLISHED | PHASE_MSG1 | PHASE_MSG2);
467 if looks_like_fmp_phase
468 && self.bootstrap_transports.contains(&packet.transport_id)
469 && let Some(npub) = self
470 .bootstrap_transport_npubs
471 .get(&packet.transport_id)
472 .cloned()
473 && let Some(handle) = self.nostr_discovery_handle()
474 {
475 let now_ms = Self::now_ms();
476 let cooldown_secs = handle.protocol_mismatch_cooldown_secs();
477 if handle.record_protocol_mismatch(&npub, now_ms) {
478 warn!(
479 peer_npub = %npub,
480 transport_id = %packet.transport_id,
481 peer_version = prefix.version,
482 our_version = FMP_VERSION,
483 cooldown_secs,
484 "Nostr-discovered peer speaks a different FMP version; suppressing retraversal"
485 );
486 }
487 }
488 return;
489 }
490
491 match prefix.phase {
492 PHASE_ESTABLISHED => {
493 self.handle_encrypted_frame(packet).await;
494 }
495 PHASE_MSG1 => {
496 self.handle_msg1(packet).await;
497 }
498 PHASE_MSG2 => {
499 self.handle_msg2(packet).await;
500 }
501 _ => {
502 debug!(
503 phase = prefix.phase,
504 transport_id = %packet.transport_id,
505 "Unknown FMP phase, dropping"
506 );
507 }
508 }
509 }
510}