fips_core/node/handlers/rx_loop.rs
1//! RX event loop and packet dispatch.
2
3use crate::control::queries;
4use crate::control::{ControlSocket, commands};
5use crate::node::decrypt_worker::{DecryptFailureReport, DecryptFallback, DecryptWorkerEvent};
6use crate::node::wire::{
7 COMMON_PREFIX_SIZE, CommonPrefix, FLAG_CE, FLAG_SP, FMP_VERSION, PHASE_ESTABLISHED, PHASE_MSG1,
8 PHASE_MSG2,
9};
10use crate::node::{Node, NodeError};
11use crate::transport::ReceivedPacket;
12use crate::transport::TransportHandle;
13use std::time::Duration;
14use tokio::sync::mpsc::UnboundedReceiver;
15use tracing::{debug, info, trace, warn};
16
/// Number of raw packets the drain loop processes between slices of
/// decrypt-fallback work. Interleaving keeps TCP ACK / heartbeat /
/// handshake progress steady under sustained inbound bursts.
const FALLBACK_INTERLEAVE_EVERY: usize = 32;
/// Upper bound on fallback events handled per interleave slice, so the
/// fallback queue can't starve the outer raw-packet drain in the
/// opposite direction.
const FALLBACK_INTERLEAVE_BUDGET: usize = 64;
/// Upper bound on raw packets handled by one drain pass before control
/// returns to the event loop. Also used as the budget of the trailing
/// fallback drain that follows a packet burst.
const PACKET_DRAIN_BUDGET: usize = 256;
/// Wall-clock cap on one maintenance pass run from the tick branch; when
/// it elapses the pass is abandoned and packet processing continues.
const RX_LOOP_MAINTENANCE_TIMEOUT: Duration = Duration::from_millis(500);
26
27impl Node {
28 /// Run the receive event loop.
29 ///
30 /// Processes packets from all transports, dispatching based on
31 /// the phase field in the 4-byte common prefix:
32 /// - Phase 0x0: Encrypted frame (session data)
33 /// - Phase 0x1: Handshake message 1 (initiator -> responder)
34 /// - Phase 0x2: Handshake message 2 (responder -> initiator)
35 ///
36 /// Also processes outbound IPv6 packets from the TUN reader for session
37 /// encapsulation and routing through the mesh.
38 ///
39 /// Also processes DNS-resolved identities for identity cache population.
40 ///
41 /// Also runs a periodic tick (1s) to clean up stale handshake connections
42 /// that never received a response. This prevents resource leaks when peers
43 /// are unreachable.
44 ///
45 /// This method takes ownership of the packet_rx channel and runs
46 /// until the channel is closed (typically when stop() is called).
47 pub async fn run_rx_loop(&mut self) -> Result<(), NodeError> {
48 let mut packet_rx = self.packet_rx.take().ok_or(NodeError::NotStarted)?;
49
50 // Take the TUN outbound receiver, or create a dummy channel that never
51 // produces messages (when TUN is disabled). Holding the sender prevents
52 // the channel from closing.
53 let (mut tun_outbound_rx, _tun_guard) = match self.tun_outbound_rx.take() {
54 Some(rx) => (rx, None),
55 None => {
56 let (tx, rx) = tokio::sync::mpsc::channel(1);
57 (rx, Some(tx))
58 }
59 };
60
61 // Take the DNS identity receiver, or create a dummy channel (when DNS
62 // is disabled). Same pattern as TUN outbound.
63 let (mut dns_identity_rx, _dns_guard) = match self.dns_identity_rx.take() {
64 Some(rx) => (rx, None),
65 None => {
66 let (tx, rx) = tokio::sync::mpsc::channel(1);
67 (rx, Some(tx))
68 }
69 };
70
71 // Take the endpoint-data command receiver, or create a dummy channel
72 // when the embedded endpoint API is not in use.
73 let (mut endpoint_command_rx, _endpoint_command_guard) =
74 match self.endpoint_command_rx.take() {
75 Some(rx) => (rx, None),
76 None => {
77 let (tx, rx) = tokio::sync::mpsc::channel(1);
78 (rx, Some(tx))
79 }
80 };
81
82 // Take the decrypt worker fallback receiver if a worker pool
83 // is in use. The worker pushes non-fast-path packets (anything
84 // that's not bulk EndpointData) here for the legacy dispatch.
85 let (mut decrypt_fallback_rx, _decrypt_fallback_guard) =
86 match self.decrypt_fallback_rx.take() {
87 Some(rx) => (rx, None),
88 None => {
89 let (tx, rx) = tokio::sync::mpsc::unbounded_channel();
90 (rx, Some(tx))
91 }
92 };
93
94 let mut tick =
95 tokio::time::interval(Duration::from_secs(self.config.node.tick_interval_secs));
96
97 // Set up control socket channel
98 let (control_tx, mut control_rx) =
99 tokio::sync::mpsc::channel::<crate::control::ControlMessage>(32);
100
101 if self.config.node.control.enabled {
102 let config = self.config.node.control.clone();
103 let tx = control_tx.clone();
104 tokio::spawn(async move {
105 match ControlSocket::bind(&config) {
106 Ok(socket) => {
107 socket.accept_loop(tx).await;
108 }
109 Err(e) => {
110 warn!(error = %e, "Failed to bind control socket");
111 }
112 }
113 });
114 }
115 // Drop unused sender to avoid keeping channel open if control is disabled
116 drop(control_tx);
117
118 info!("RX event loop started");
119 // Optional perf profiler (FIPS_PERF=1). No-op otherwise.
120 crate::perf_profile::maybe_spawn_reporter();
121
122 loop {
123 tokio::select! {
124 biased;
125 // Decrypt-worker fallback drains FIRST. The previous
126 // ordering put `packet_rx` first, which under sustained
127 // inbound bursts let the raw-packet drain (up to 256
128 // packets + flush) starve fallback work for tens of
129 // milliseconds. UDP throughput tolerates that; TCP
130 // doesn't — late FMP plaintexts mean late ACKs,
131 // dup-ACK fast retransmits, and cwnd collapse.
132 // Reproduced on native macOS / Wi-Fi where TCP fell to
133 // ~10 Mb/s while UDP cleared ~100 Mb/s on the same
134 // tunnel. Promoting fallback gives the kernel-side
135 // TCP machinery a fair chance to see its ACKs and
136 // keep cwnd growing.
137 Some(event) = decrypt_fallback_rx.recv() => {
138 self.process_decrypt_worker_event(event).await;
139 self.drain_decrypt_fallback(&mut decrypt_fallback_rx, 255).await;
140 self.flush_pending_sends().await;
141 }
142 packet = packet_rx.recv() => {
143 match packet {
144 Some(p) => {
145 self.drain_packet_rx(
146 &mut packet_rx,
147 &mut decrypt_fallback_rx,
148 Some(p),
149 PACKET_DRAIN_BUDGET,
150 ).await;
151 }
152 None => break, // channel closed
153 }
154 }
155 _ = tick.tick() => {
156 let drained = self.drain_packet_rx(
157 &mut packet_rx,
158 &mut decrypt_fallback_rx,
159 None,
160 PACKET_DRAIN_BUDGET,
161 ).await;
162 if drained > 0 {
163 debug!(
164 drained,
165 "Drained queued packets before rx-loop maintenance"
166 );
167 }
168 if tokio::time::timeout(
169 RX_LOOP_MAINTENANCE_TIMEOUT,
170 self.run_rx_loop_maintenance_tick(),
171 ).await.is_err() {
172 warn!(
173 timeout_ms = RX_LOOP_MAINTENANCE_TIMEOUT.as_millis() as u64,
174 "RX loop maintenance timed out; continuing packet processing"
175 );
176 }
177 }
178 Some(ipv6_packet) = tun_outbound_rx.recv() => {
179 self.handle_tun_outbound(ipv6_packet).await;
180 let mut drained = 0;
181 while drained < 256 {
182 match tun_outbound_rx.try_recv() {
183 Ok(p) => {
184 self.handle_tun_outbound(p).await;
185 drained += 1;
186 }
187 Err(_) => break,
188 }
189 }
190 // Flush any trailing batched sends so the last
191 // packets of a burst don't sit in the per-transport
192 // sendmmsg buffer waiting for the threshold.
193 self.flush_pending_sends().await;
194 }
195 Some(identity) = dns_identity_rx.recv() => {
196 debug!(
197 node_addr = %identity.node_addr,
198 "Registering identity from DNS resolution"
199 );
200 self.register_identity(identity.node_addr, identity.pubkey);
201 }
202 Some(command) = endpoint_command_rx.recv() => {
203 self.handle_endpoint_data_command(command).await;
204 // Same drain pattern: when the application is shoving
205 // tunnel data in via send_oneway, several Send commands
206 // typically queue up between scheduler hops.
207 let mut drained = 0;
208 while drained < 256 {
209 match endpoint_command_rx.try_recv() {
210 Ok(c) => {
211 self.handle_endpoint_data_command(c).await;
212 drained += 1;
213 }
214 Err(_) => break,
215 }
216 }
217 // Flush any trailing batched sends from the
218 // per-transport sendmmsg buffer.
219 self.flush_pending_sends().await;
220 }
221 Some((request, response_tx)) = control_rx.recv() => {
222 let response = if request.command.starts_with("show_") {
223 queries::dispatch(self, &request.command, request.params.as_ref())
224 } else {
225 commands::dispatch(
226 self,
227 &request.command,
228 request.params.as_ref(),
229 ).await
230 };
231 let _ = response_tx.send(response);
232 }
233 }
234 }
235
236 info!("RX event loop stopped (channel closed)");
237 Ok(())
238 }
239
240 async fn drain_packet_rx(
241 &mut self,
242 packet_rx: &mut UnboundedReceiver<ReceivedPacket>,
243 decrypt_fallback_rx: &mut UnboundedReceiver<DecryptWorkerEvent>,
244 first_packet: Option<ReceivedPacket>,
245 budget: usize,
246 ) -> usize {
247 let mut drained = 0usize;
248 if let Some(packet) = first_packet {
249 self.process_packet(packet).await;
250 drained = 1;
251 }
252
253 // Drain remaining ready inbound packets in a tight loop before
254 // yielding back to select! Every yield is a scheduler hop, and at
255 // line rate transports typically have several packets available per
256 // wake. Caps at a batch boundary so other branches eventually get a
257 // turn even under sustained load.
258 while drained < budget {
259 if drained > 0 && drained.is_multiple_of(FALLBACK_INTERLEAVE_EVERY) {
260 self.drain_decrypt_fallback(decrypt_fallback_rx, FALLBACK_INTERLEAVE_BUDGET)
261 .await;
262 }
263 match packet_rx.try_recv() {
264 Ok(packet) => {
265 self.process_packet(packet).await;
266 drained += 1;
267 }
268 Err(_) => break,
269 }
270 }
271
272 if drained > 0 {
273 // One trailing fallback drain so the last bounced packets of the
274 // burst aren't held up by the post-burst send flush.
275 self.drain_decrypt_fallback(decrypt_fallback_rx, PACKET_DRAIN_BUDGET)
276 .await;
277 // Flush any batched sends triggered by inbound packets (e.g.
278 // forwarded SessionDatagrams, MMP reports, tree announces).
279 self.flush_pending_sends().await;
280 }
281 drained
282 }
283
284 async fn run_rx_loop_maintenance_tick(&mut self) {
285 self.check_timeouts();
286 let now_ms = Self::now_ms();
287 self.reload_peer_acl();
288 self.poll_pending_connects().await;
289 self.poll_nostr_discovery().await;
290 self.poll_lan_discovery().await;
291 self.poll_local_instance_discovery().await;
292 self.resend_pending_handshakes(now_ms).await;
293 self.resend_pending_rekeys(now_ms).await;
294 self.resend_pending_session_handshakes(now_ms).await;
295 self.resend_pending_session_msg3(now_ms).await;
296 self.purge_idle_sessions(now_ms);
297 self.purge_learned_routes(now_ms);
298 self.process_pending_retries(now_ms).await;
299 self.check_tree_state().await;
300 self.check_bloom_state().await;
301 self.compute_mesh_size();
302 self.record_stats_history();
303 self.check_mmp_reports().await;
304 self.check_session_mmp_reports().await;
305 self.check_link_heartbeats().await;
306 self.check_rekey().await;
307 self.check_session_rekey().await;
308 self.check_pending_lookups(now_ms).await;
309 self.poll_transport_discovery().await;
310 self.sample_transport_congestion();
311 self.activate_connected_udp_sessions().await;
312 }
313
314 /// Hand a decrypt-worker fallback to the canonical post-FMP-decrypt
315 /// processor. Reconstructs `ce_flag` / `sp_flag` from the FMP header
316 /// flag byte the worker captured into `DecryptFallback::fmp_flags`
317 /// (without this both ECN CE propagation and spin-bit RTT
318 /// observation are dropped on the worker path) and slices the
319 /// plaintext out of the original wire buffer with zero allocation.
320 async fn process_decrypt_worker_event(&mut self, event: DecryptWorkerEvent) {
321 match event {
322 DecryptWorkerEvent::Plaintext(fallback) => {
323 self.process_decrypt_fallback(fallback).await;
324 }
325 DecryptWorkerEvent::DecryptFailure(report) => {
326 self.process_decrypt_failure_report(report).await;
327 }
328 }
329 }
330
331 async fn process_decrypt_fallback(&mut self, fallback: DecryptFallback) {
332 let ce_flag = fallback.fmp_flags & FLAG_CE != 0;
333 let sp_flag = fallback.fmp_flags & FLAG_SP != 0;
334 let plaintext = &fallback.packet_data[fallback.fmp_plaintext_offset
335 ..fallback.fmp_plaintext_offset + fallback.fmp_plaintext_len];
336 self.process_authentic_fmp_plaintext(
337 &fallback.source_node_addr,
338 fallback.transport_id,
339 &fallback.remote_addr,
340 fallback.timestamp_ms,
341 fallback.packet_len,
342 fallback.fmp_counter,
343 ce_flag,
344 sp_flag,
345 plaintext,
346 )
347 .await;
348 }
349
350 async fn process_decrypt_failure_report(&mut self, report: DecryptFailureReport) {
351 debug!(
352 peer = %self.peer_display_name(&report.source_node_addr),
353 counter = report.fmp_counter,
354 replay_highest = report.fmp_replay_highest,
355 "Worker FMP AEAD decryption failed"
356 );
357 self.handle_decrypt_failure_report(&report).await;
358 }
359
360 /// Drain up to `budget` queued fallbacks without yielding back to
361 /// `select!`. Returns the number processed. Called both from the
362 /// promoted-fallback select arm (after the head item) and
363 /// interleaved inside the packet_rx drain loop so bounced FMP
364 /// plaintexts can't accumulate behind a 256-packet inbound burst.
365 async fn drain_decrypt_fallback(
366 &mut self,
367 rx: &mut UnboundedReceiver<DecryptWorkerEvent>,
368 budget: usize,
369 ) -> usize {
370 let mut drained = 0;
371 while drained < budget {
372 match rx.try_recv() {
373 Ok(event) => {
374 self.process_decrypt_worker_event(event).await;
375 drained += 1;
376 }
377 Err(_) => break,
378 }
379 }
380 drained
381 }
382
383 /// Flush any pending batched sends across all transports. Today
384 /// every transport's `flush_pending_send` is a no-op — the UDP
385 /// transport's per-transport `pending_send` buffer was removed
386 /// when the bulk data path moved into `encrypt_worker` (which
387 /// does its own target-grouped `sendmmsg(2)` directly). The
388 /// call sites are retained so any future batched transport can
389 /// opt in by overriding `flush_pending_send` without touching
390 /// the rx_loop.
391 async fn flush_pending_sends(&self) {
392 for transport in self.transports.values() {
393 if matches!(transport, TransportHandle::Udp(_)) {
394 transport.flush_pending_send().await;
395 }
396 }
397 }
398
399 /// Process a single received packet.
400 ///
401 /// Dispatches based on the phase field in the 4-byte common prefix.
402 async fn process_packet(&mut self, packet: ReceivedPacket) {
403 let _t_total = crate::perf_profile::Timer::start(crate::perf_profile::Stage::ProcessPacket);
404 crate::perf_profile::record_since(
405 crate::perf_profile::Stage::TransportQueueWait,
406 packet.trace_enqueued_at,
407 );
408 if packet.data.len() < COMMON_PREFIX_SIZE {
409 return; // Drop packets too short for common prefix
410 }
411
412 let prefix = match CommonPrefix::parse(&packet.data) {
413 Some(p) => p,
414 None => return, // Malformed prefix
415 };
416 if matches!(prefix.phase, PHASE_MSG1 | PHASE_MSG2) {
417 debug!(
418 transport_id = %packet.transport_id,
419 remote_addr = %packet.remote_addr,
420 bytes = packet.data.len(),
421 phase = prefix.phase,
422 version = prefix.version,
423 "FMP handshake packet dispatch"
424 );
425 } else {
426 trace!(
427 transport_id = %packet.transport_id,
428 remote_addr = %packet.remote_addr,
429 bytes = packet.data.len(),
430 phase = prefix.phase,
431 version = prefix.version,
432 "FMP packet dispatch"
433 );
434 }
435
436 if prefix.version != FMP_VERSION {
437 debug!(
438 version = prefix.version,
439 transport_id = %packet.transport_id,
440 "Unknown FMP version, dropping"
441 );
442
443 // If the packet arrived on an adopted Nostr-NAT bootstrap
444 // transport, the originating peer is necessarily on a
445 // different FMP-protocol version than us — the discovery
446 // sweep would otherwise re-traverse them every cycle even
447 // though no msg1/msg2 exchange can ever succeed. Bump the
448 // discovery-layer cooldown to the long protocol-mismatch
449 // window and emit a single WARN per fresh observation.
450 if self.bootstrap_transports.contains(&packet.transport_id)
451 && let Some(npub) = self
452 .bootstrap_transport_npubs
453 .get(&packet.transport_id)
454 .cloned()
455 && let Some(handle) = self.nostr_discovery_handle()
456 {
457 let now_ms = Self::now_ms();
458 let cooldown_secs = handle.protocol_mismatch_cooldown_secs();
459 if handle.record_protocol_mismatch(&npub, now_ms) {
460 warn!(
461 peer_npub = %npub,
462 transport_id = %packet.transport_id,
463 peer_version = prefix.version,
464 our_version = FMP_VERSION,
465 cooldown_secs,
466 "Nostr-discovered peer speaks a different FMP version; suppressing retraversal"
467 );
468 }
469 }
470 return;
471 }
472
473 match prefix.phase {
474 PHASE_ESTABLISHED => {
475 self.handle_encrypted_frame(packet).await;
476 }
477 PHASE_MSG1 => {
478 self.handle_msg1(packet).await;
479 }
480 PHASE_MSG2 => {
481 self.handle_msg2(packet).await;
482 }
483 _ => {
484 debug!(
485 phase = prefix.phase,
486 transport_id = %packet.transport_id,
487 "Unknown FMP phase, dropping"
488 );
489 }
490 }
491 }
492}