fips_core/node/handlers/rx_loop.rs
1//! RX event loop and packet dispatch.
2
3use crate::control::queries;
4use crate::control::{ControlSocket, commands};
5use crate::node::decrypt_worker::{DecryptFailureReport, DecryptFallback, DecryptWorkerEvent};
6use crate::node::wire::{
7 COMMON_PREFIX_SIZE, CommonPrefix, FLAG_CE, FLAG_SP, FMP_VERSION, PHASE_ESTABLISHED, PHASE_MSG1,
8 PHASE_MSG2,
9};
10use crate::node::{Node, NodeError};
11use crate::transport::ReceivedPacket;
12use crate::transport::TransportHandle;
13use std::time::Duration;
14use tokio::sync::mpsc::UnboundedReceiver;
15use tracing::{debug, info, trace, warn};
16
/// Upper bound on how many queued items a single rx-loop drain handles
/// before control goes back to `select!`, so no one input can monopolise
/// the loop for an unbounded stretch.
const PACKET_DRAIN_BUDGET: usize = 256;

/// Interleave cadence: after every this-many raw inbound packets, the
/// packet drain pauses to process a slice of decrypt-worker fallbacks.
/// This keeps TCP ACK, heartbeat and handshake progress steady while a
/// sustained inbound burst is being drained.
const FALLBACK_INTERLEAVE_EVERY: usize = 32;

/// Maximum number of fallbacks handled per interleave slice. Bounding it
/// stops a hot fallback spike from starving the outer raw-packet drain
/// in the opposite direction.
const FALLBACK_INTERLEAVE_BUDGET: usize = 64;

/// Wall-clock cap on one maintenance pass. When the pass overruns, it is
/// abandoned and the loop resumes packet processing.
const RX_LOOP_MAINTENANCE_TIMEOUT: Duration = Duration::from_millis(500);
26
27impl Node {
28 /// Run the receive event loop.
29 ///
30 /// Processes packets from all transports, dispatching based on
31 /// the phase field in the 4-byte common prefix:
32 /// - Phase 0x0: Encrypted frame (session data)
33 /// - Phase 0x1: Handshake message 1 (initiator -> responder)
34 /// - Phase 0x2: Handshake message 2 (responder -> initiator)
35 ///
36 /// Also processes outbound IPv6 packets from the TUN reader for session
37 /// encapsulation and routing through the mesh.
38 ///
39 /// Also processes DNS-resolved identities for identity cache population.
40 ///
41 /// Also runs a periodic tick (1s) to clean up stale handshake connections
42 /// that never received a response. This prevents resource leaks when peers
43 /// are unreachable.
44 ///
45 /// This method takes ownership of the packet_rx channel and runs
46 /// until the channel is closed (typically when stop() is called).
47 pub async fn run_rx_loop(&mut self) -> Result<(), NodeError> {
48 let mut packet_rx = self.packet_rx.take().ok_or(NodeError::NotStarted)?;
49
50 // Take the TUN outbound receiver, or create a dummy channel that never
51 // produces messages (when TUN is disabled). Holding the sender prevents
52 // the channel from closing.
53 let (mut tun_outbound_rx, _tun_guard) = match self.tun_outbound_rx.take() {
54 Some(rx) => (rx, None),
55 None => {
56 let (tx, rx) = tokio::sync::mpsc::channel(1);
57 (rx, Some(tx))
58 }
59 };
60
61 // Take the DNS identity receiver, or create a dummy channel (when DNS
62 // is disabled). Same pattern as TUN outbound.
63 let (mut dns_identity_rx, _dns_guard) = match self.dns_identity_rx.take() {
64 Some(rx) => (rx, None),
65 None => {
66 let (tx, rx) = tokio::sync::mpsc::channel(1);
67 (rx, Some(tx))
68 }
69 };
70
71 // Take the endpoint-data command receiver, or create a dummy channel
72 // when the embedded endpoint API is not in use.
73 let (mut endpoint_command_rx, _endpoint_command_guard) =
74 match self.endpoint_command_rx.take() {
75 Some(rx) => (rx, None),
76 None => {
77 let (tx, rx) = tokio::sync::mpsc::channel(1);
78 (rx, Some(tx))
79 }
80 };
81
82 // Take the decrypt worker fallback receiver if a worker pool
83 // is in use. The worker pushes non-fast-path packets (anything
84 // that's not bulk EndpointData) here for the legacy dispatch.
85 let (mut decrypt_fallback_rx, _decrypt_fallback_guard) =
86 match self.decrypt_fallback_rx.take() {
87 Some(rx) => (rx, None),
88 None => {
89 let (tx, rx) = tokio::sync::mpsc::unbounded_channel();
90 (rx, Some(tx))
91 }
92 };
93
94 let mut tick =
95 tokio::time::interval(Duration::from_secs(self.config.node.tick_interval_secs));
96
97 // Set up control socket channel
98 let (control_tx, mut control_rx) =
99 tokio::sync::mpsc::channel::<crate::control::ControlMessage>(32);
100
101 if self.config.node.control.enabled {
102 let config = self.config.node.control.clone();
103 let tx = control_tx.clone();
104 tokio::spawn(async move {
105 match ControlSocket::bind(&config) {
106 Ok(socket) => {
107 socket.accept_loop(tx).await;
108 }
109 Err(e) => {
110 warn!(error = %e, "Failed to bind control socket");
111 }
112 }
113 });
114 }
115 // Drop unused sender to avoid keeping channel open if control is disabled
116 drop(control_tx);
117
118 info!("RX event loop started");
119 // Optional perf profiler (FIPS_PERF=1). No-op otherwise.
120 crate::perf_profile::maybe_spawn_reporter();
121
122 loop {
123 tokio::select! {
124 biased;
125 // Decrypt-worker fallback drains FIRST. The previous
126 // ordering put `packet_rx` first, which under sustained
127 // inbound bursts let the raw-packet drain (up to 256
128 // packets + flush) starve fallback work for tens of
129 // milliseconds. UDP throughput tolerates that; TCP
130 // doesn't — late FMP plaintexts mean late ACKs,
131 // dup-ACK fast retransmits, and cwnd collapse.
132 // Reproduced on native macOS / Wi-Fi where TCP fell to
133 // ~10 Mb/s while UDP cleared ~100 Mb/s on the same
134 // tunnel. Promoting fallback gives the kernel-side
135 // TCP machinery a fair chance to see its ACKs and
136 // keep cwnd growing.
137 Some(event) = decrypt_fallback_rx.recv() => {
138 self.process_decrypt_worker_event(event).await;
139 self.drain_decrypt_fallback(&mut decrypt_fallback_rx, 255).await;
140 self.flush_pending_sends().await;
141 }
142 packet = packet_rx.recv() => {
143 match packet {
144 Some(p) => {
145 self.drain_packet_rx(
146 &mut packet_rx,
147 &mut decrypt_fallback_rx,
148 Some(p),
149 PACKET_DRAIN_BUDGET,
150 ).await;
151 }
152 None => break, // channel closed
153 }
154 }
155 _ = tick.tick() => {
156 let drained = self.drain_packet_rx(
157 &mut packet_rx,
158 &mut decrypt_fallback_rx,
159 None,
160 PACKET_DRAIN_BUDGET,
161 ).await;
162 if drained > 0 {
163 debug!(
164 drained,
165 "Drained queued packets before rx-loop maintenance"
166 );
167 }
168 if tokio::time::timeout(
169 RX_LOOP_MAINTENANCE_TIMEOUT,
170 self.run_rx_loop_maintenance_tick(),
171 ).await.is_err() {
172 warn!(
173 timeout_ms = RX_LOOP_MAINTENANCE_TIMEOUT.as_millis() as u64,
174 "RX loop maintenance timed out; continuing packet processing"
175 );
176 }
177 }
178 Some(ipv6_packet) = tun_outbound_rx.recv() => {
179 self.handle_tun_outbound(ipv6_packet).await;
180 let mut drained = 0;
181 while drained < 256 {
182 match tun_outbound_rx.try_recv() {
183 Ok(p) => {
184 self.handle_tun_outbound(p).await;
185 drained += 1;
186 }
187 Err(_) => break,
188 }
189 }
190 // Flush any trailing batched sends so the last
191 // packets of a burst don't sit in the per-transport
192 // sendmmsg buffer waiting for the threshold.
193 self.flush_pending_sends().await;
194 }
195 Some(identity) = dns_identity_rx.recv() => {
196 debug!(
197 node_addr = %identity.node_addr,
198 "Registering identity from DNS resolution"
199 );
200 self.register_identity(identity.node_addr, identity.pubkey);
201 }
202 Some(command) = endpoint_command_rx.recv() => {
203 self.handle_endpoint_data_command(command).await;
204 // Same drain pattern: when the application is shoving
205 // tunnel data in via send_oneway, several Send commands
206 // typically queue up between scheduler hops.
207 let mut drained = 0;
208 while drained < 256 {
209 match endpoint_command_rx.try_recv() {
210 Ok(c) => {
211 self.handle_endpoint_data_command(c).await;
212 drained += 1;
213 }
214 Err(_) => break,
215 }
216 }
217 // Flush any trailing batched sends from the
218 // per-transport sendmmsg buffer.
219 self.flush_pending_sends().await;
220 }
221 Some((request, response_tx)) = control_rx.recv() => {
222 let response = if request.command.starts_with("show_") {
223 queries::dispatch(self, &request.command, request.params.as_ref())
224 } else {
225 commands::dispatch(
226 self,
227 &request.command,
228 request.params.as_ref(),
229 ).await
230 };
231 let _ = response_tx.send(response);
232 }
233 }
234 }
235
236 info!("RX event loop stopped (channel closed)");
237 Ok(())
238 }
239
240 async fn drain_packet_rx(
241 &mut self,
242 packet_rx: &mut UnboundedReceiver<ReceivedPacket>,
243 decrypt_fallback_rx: &mut UnboundedReceiver<DecryptWorkerEvent>,
244 first_packet: Option<ReceivedPacket>,
245 budget: usize,
246 ) -> usize {
247 let mut drained = 0usize;
248 if let Some(packet) = first_packet {
249 self.process_packet(packet).await;
250 drained = 1;
251 }
252
253 // Drain remaining ready inbound packets in a tight loop before
254 // yielding back to select! Every yield is a scheduler hop, and at
255 // line rate transports typically have several packets available per
256 // wake. Caps at a batch boundary so other branches eventually get a
257 // turn even under sustained load.
258 while drained < budget {
259 if drained > 0 && drained.is_multiple_of(FALLBACK_INTERLEAVE_EVERY) {
260 self.drain_decrypt_fallback(decrypt_fallback_rx, FALLBACK_INTERLEAVE_BUDGET)
261 .await;
262 }
263 match packet_rx.try_recv() {
264 Ok(packet) => {
265 self.process_packet(packet).await;
266 drained += 1;
267 }
268 Err(_) => break,
269 }
270 }
271
272 if drained > 0 {
273 // One trailing fallback drain so the last bounced packets of the
274 // burst aren't held up by the post-burst send flush.
275 self.drain_decrypt_fallback(decrypt_fallback_rx, PACKET_DRAIN_BUDGET)
276 .await;
277 // Flush any batched sends triggered by inbound packets (e.g.
278 // forwarded SessionDatagrams, MMP reports, tree announces).
279 self.flush_pending_sends().await;
280 }
281 drained
282 }
283
284 async fn run_rx_loop_maintenance_tick(&mut self) {
285 self.check_timeouts();
286 let now_ms = Self::now_ms();
287 self.reload_peer_acl();
288 self.poll_pending_connects().await;
289 self.poll_nostr_discovery().await;
290 self.poll_lan_discovery().await;
291 self.resend_pending_handshakes(now_ms).await;
292 self.resend_pending_rekeys(now_ms).await;
293 self.resend_pending_session_handshakes(now_ms).await;
294 self.purge_idle_sessions(now_ms);
295 self.purge_learned_routes(now_ms);
296 self.process_pending_retries(now_ms).await;
297 self.check_tree_state().await;
298 self.check_bloom_state().await;
299 self.compute_mesh_size();
300 self.record_stats_history();
301 self.check_mmp_reports().await;
302 self.check_session_mmp_reports().await;
303 self.check_link_heartbeats().await;
304 self.check_rekey().await;
305 self.check_session_rekey().await;
306 self.check_pending_lookups(now_ms).await;
307 self.poll_transport_discovery().await;
308 self.sample_transport_congestion();
309 self.activate_connected_udp_sessions().await;
310 }
311
312 /// Hand a decrypt-worker fallback to the canonical post-FMP-decrypt
313 /// processor. Reconstructs `ce_flag` / `sp_flag` from the FMP header
314 /// flag byte the worker captured into `DecryptFallback::fmp_flags`
315 /// (without this both ECN CE propagation and spin-bit RTT
316 /// observation are dropped on the worker path) and slices the
317 /// plaintext out of the original wire buffer with zero allocation.
318 async fn process_decrypt_worker_event(&mut self, event: DecryptWorkerEvent) {
319 match event {
320 DecryptWorkerEvent::Plaintext(fallback) => {
321 self.process_decrypt_fallback(fallback).await;
322 }
323 DecryptWorkerEvent::DecryptFailure(report) => {
324 self.process_decrypt_failure_report(report).await;
325 }
326 }
327 }
328
329 async fn process_decrypt_fallback(&mut self, fallback: DecryptFallback) {
330 let ce_flag = fallback.fmp_flags & FLAG_CE != 0;
331 let sp_flag = fallback.fmp_flags & FLAG_SP != 0;
332 let plaintext = &fallback.packet_data[fallback.fmp_plaintext_offset
333 ..fallback.fmp_plaintext_offset + fallback.fmp_plaintext_len];
334 self.process_authentic_fmp_plaintext(
335 &fallback.source_node_addr,
336 fallback.transport_id,
337 &fallback.remote_addr,
338 fallback.timestamp_ms,
339 fallback.packet_len,
340 fallback.fmp_counter,
341 ce_flag,
342 sp_flag,
343 plaintext,
344 )
345 .await;
346 }
347
348 async fn process_decrypt_failure_report(&mut self, report: DecryptFailureReport) {
349 debug!(
350 peer = %self.peer_display_name(&report.source_node_addr),
351 counter = report.fmp_counter,
352 replay_highest = report.fmp_replay_highest,
353 "Worker FMP AEAD decryption failed"
354 );
355 self.handle_decrypt_failure_report(&report).await;
356 }
357
358 /// Drain up to `budget` queued fallbacks without yielding back to
359 /// `select!`. Returns the number processed. Called both from the
360 /// promoted-fallback select arm (after the head item) and
361 /// interleaved inside the packet_rx drain loop so bounced FMP
362 /// plaintexts can't accumulate behind a 256-packet inbound burst.
363 async fn drain_decrypt_fallback(
364 &mut self,
365 rx: &mut UnboundedReceiver<DecryptWorkerEvent>,
366 budget: usize,
367 ) -> usize {
368 let mut drained = 0;
369 while drained < budget {
370 match rx.try_recv() {
371 Ok(event) => {
372 self.process_decrypt_worker_event(event).await;
373 drained += 1;
374 }
375 Err(_) => break,
376 }
377 }
378 drained
379 }
380
381 /// Flush any pending batched sends across all transports. Today
382 /// every transport's `flush_pending_send` is a no-op — the UDP
383 /// transport's per-transport `pending_send` buffer was removed
384 /// when the bulk data path moved into `encrypt_worker` (which
385 /// does its own target-grouped `sendmmsg(2)` directly). The
386 /// call sites are retained so any future batched transport can
387 /// opt in by overriding `flush_pending_send` without touching
388 /// the rx_loop.
389 async fn flush_pending_sends(&self) {
390 for transport in self.transports.values() {
391 if matches!(transport, TransportHandle::Udp(_)) {
392 transport.flush_pending_send().await;
393 }
394 }
395 }
396
397 /// Process a single received packet.
398 ///
399 /// Dispatches based on the phase field in the 4-byte common prefix.
400 async fn process_packet(&mut self, packet: ReceivedPacket) {
401 let _t_total = crate::perf_profile::Timer::start(crate::perf_profile::Stage::ProcessPacket);
402 crate::perf_profile::record_since(
403 crate::perf_profile::Stage::TransportQueueWait,
404 packet.trace_enqueued_at,
405 );
406 if packet.data.len() < COMMON_PREFIX_SIZE {
407 return; // Drop packets too short for common prefix
408 }
409
410 let prefix = match CommonPrefix::parse(&packet.data) {
411 Some(p) => p,
412 None => return, // Malformed prefix
413 };
414 if matches!(prefix.phase, PHASE_MSG1 | PHASE_MSG2) {
415 debug!(
416 transport_id = %packet.transport_id,
417 remote_addr = %packet.remote_addr,
418 bytes = packet.data.len(),
419 phase = prefix.phase,
420 version = prefix.version,
421 "FMP handshake packet dispatch"
422 );
423 } else {
424 trace!(
425 transport_id = %packet.transport_id,
426 remote_addr = %packet.remote_addr,
427 bytes = packet.data.len(),
428 phase = prefix.phase,
429 version = prefix.version,
430 "FMP packet dispatch"
431 );
432 }
433
434 if prefix.version != FMP_VERSION {
435 debug!(
436 version = prefix.version,
437 transport_id = %packet.transport_id,
438 "Unknown FMP version, dropping"
439 );
440
441 // If the packet arrived on an adopted Nostr-NAT bootstrap
442 // transport, the originating peer is necessarily on a
443 // different FMP-protocol version than us — the discovery
444 // sweep would otherwise re-traverse them every cycle even
445 // though no msg1/msg2 exchange can ever succeed. Bump the
446 // discovery-layer cooldown to the long protocol-mismatch
447 // window and emit a single WARN per fresh observation.
448 if self.bootstrap_transports.contains(&packet.transport_id)
449 && let Some(npub) = self
450 .bootstrap_transport_npubs
451 .get(&packet.transport_id)
452 .cloned()
453 && let Some(handle) = self.nostr_discovery_handle()
454 {
455 let now_ms = Self::now_ms();
456 let cooldown_secs = handle.protocol_mismatch_cooldown_secs();
457 if handle.record_protocol_mismatch(&npub, now_ms) {
458 warn!(
459 peer_npub = %npub,
460 transport_id = %packet.transport_id,
461 peer_version = prefix.version,
462 our_version = FMP_VERSION,
463 cooldown_secs,
464 "Nostr-discovered peer speaks a different FMP version; suppressing retraversal"
465 );
466 }
467 }
468 return;
469 }
470
471 match prefix.phase {
472 PHASE_ESTABLISHED => {
473 self.handle_encrypted_frame(packet).await;
474 }
475 PHASE_MSG1 => {
476 self.handle_msg1(packet).await;
477 }
478 PHASE_MSG2 => {
479 self.handle_msg2(packet).await;
480 }
481 _ => {
482 debug!(
483 phase = prefix.phase,
484 transport_id = %packet.transport_id,
485 "Unknown FMP phase, dropping"
486 );
487 }
488 }
489 }
490}