fips_core/node/handlers/rx_loop.rs
//! RX event loop and packet dispatch.

use crate::control::queries;
use crate::control::{ControlSocket, commands};
use crate::node::decrypt_worker::{DecryptFailureReport, DecryptFallback, DecryptWorkerEvent};
use crate::node::wire::{
    COMMON_PREFIX_SIZE, CommonPrefix, FLAG_CE, FLAG_SP, FMP_VERSION, PHASE_ESTABLISHED, PHASE_MSG1,
    PHASE_MSG2,
};
use crate::node::{Node, NodeError};
use crate::transport::ReceivedPacket;
use crate::transport::TransportHandle;
use std::time::Duration;
use tokio::sync::mpsc::UnboundedReceiver;
use tracing::{debug, info, warn};

/// How often the raw-packet drain loop yields a slice of work to the
/// decrypt-fallback drain. Keeps TCP ACK / heartbeat / handshake
/// progress steady under sustained inbound bursts.
const FALLBACK_INTERLEAVE_EVERY: usize = 32;
/// Cap on the per-interleave fallback drain so a hot inbound spike
/// can't starve the outer raw-packet drain in the opposite direction.
const FALLBACK_INTERLEAVE_BUDGET: usize = 64;
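// Worked example of the cadence these constants produce (illustrative
// arithmetic, not a measured bound): during a full 256-packet inner
// drain, the fallback pass fires at each multiple of 32, i.e. up to 7
// times, each clearing at most 64 events, so as many as 448 bounced
// plaintexts can be processed mid-burst before the trailing drain runs.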

impl Node {
    /// Run the receive event loop.
    ///
    /// Processes packets from all transports, dispatching based on
    /// the phase field in the 4-byte common prefix:
    /// - Phase 0x0: Encrypted frame (session data)
    /// - Phase 0x1: Handshake message 1 (initiator -> responder)
    /// - Phase 0x2: Handshake message 2 (responder -> initiator)
    ///
    /// Also processes outbound IPv6 packets from the TUN reader for session
    /// encapsulation and routing through the mesh.
    ///
    /// Also processes DNS-resolved identities to populate the identity cache.
    ///
    /// Also runs a periodic maintenance tick (1s by default, configured by
    /// `tick_interval_secs`) that, among other upkeep, cleans up stale
    /// handshake connections that never received a response. This prevents
    /// resource leaks when peers are unreachable.
    ///
    /// This method takes ownership of the packet_rx channel and runs
    /// until the channel is closed (typically when stop() is called).
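    ///
    /// A minimal driving sketch (illustrative only: the construction and
    /// startup calls below are assumptions for the example, not this
    /// module's API):
    ///
    /// ```ignore
    /// let mut node = Node::new(config)?;   // hypothetical constructor
    /// node.start().await?;                 // hypothetical startup wiring
    /// // Elsewhere, stop() closes packet_rx, which unblocks the loop.
    /// node.run_rx_loop().await?;
    /// ```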
    pub async fn run_rx_loop(&mut self) -> Result<(), NodeError> {
        let mut packet_rx = self.packet_rx.take().ok_or(NodeError::NotStarted)?;

        // Take the TUN outbound receiver, or create a dummy channel that never
        // produces messages (when TUN is disabled). Holding the sender prevents
        // the channel from closing.
        let (mut tun_outbound_rx, _tun_guard) = match self.tun_outbound_rx.take() {
            Some(rx) => (rx, None),
            None => {
                let (tx, rx) = tokio::sync::mpsc::channel(1);
                (rx, Some(tx))
            }
        };

        // Take the DNS identity receiver, or create a dummy channel (when DNS
        // is disabled). Same pattern as TUN outbound.
        let (mut dns_identity_rx, _dns_guard) = match self.dns_identity_rx.take() {
            Some(rx) => (rx, None),
            None => {
                let (tx, rx) = tokio::sync::mpsc::channel(1);
                (rx, Some(tx))
            }
        };

        // Take the endpoint-data command receiver, or create a dummy channel
        // when the embedded endpoint API is not in use.
        let (mut endpoint_command_rx, _endpoint_command_guard) =
            match self.endpoint_command_rx.take() {
                Some(rx) => (rx, None),
                None => {
                    let (tx, rx) = tokio::sync::mpsc::channel(1);
                    (rx, Some(tx))
                }
            };

        // Take the decrypt-worker fallback receiver if a worker pool is in
        // use. The worker pushes non-fast-path packets (anything that's not
        // bulk EndpointData) here for the legacy dispatch.
        let (mut decrypt_fallback_rx, _decrypt_fallback_guard) =
            match self.decrypt_fallback_rx.take() {
                Some(rx) => (rx, None),
                None => {
                    let (tx, rx) = tokio::sync::mpsc::unbounded_channel();
                    (rx, Some(tx))
                }
            };

        let mut tick =
            tokio::time::interval(Duration::from_secs(self.config.node.tick_interval_secs));

        // Set up the control socket channel.
        let (control_tx, mut control_rx) =
            tokio::sync::mpsc::channel::<crate::control::ControlMessage>(32);

        if self.config.node.control.enabled {
            let config = self.config.node.control.clone();
            let tx = control_tx.clone();
            tokio::spawn(async move {
                match ControlSocket::bind(&config) {
                    Ok(socket) => {
                        socket.accept_loop(tx).await;
                    }
                    Err(e) => {
                        warn!(error = %e, "Failed to bind control socket");
                    }
                }
            });
        }
        // Drop the unused sender to avoid keeping the channel open when
        // control is disabled.
        drop(control_tx);

        info!("RX event loop started");
        // Optional perf profiler (FIPS_PERF=1). No-op otherwise.
        crate::perf_profile::maybe_spawn_reporter();

        loop {
            tokio::select! {
                biased;
                // Decrypt-worker fallback drains FIRST. The previous ordering
                // put `packet_rx` first, which under sustained inbound bursts
                // let the raw-packet drain (up to 256 packets + flush) starve
                // fallback work for tens of milliseconds. UDP throughput
                // tolerates that; TCP doesn't: late FMP plaintexts mean late
                // ACKs, dup-ACK fast retransmits, and cwnd collapse.
                // Reproduced on native macOS / Wi-Fi, where TCP fell to
                // ~10 Mb/s while UDP cleared ~100 Mb/s on the same tunnel.
                // Promoting fallback gives the kernel-side TCP machinery a
                // fair chance to see its ACKs and keep cwnd growing.
                Some(event) = decrypt_fallback_rx.recv() => {
                    self.process_decrypt_worker_event(event).await;
                    self.drain_decrypt_fallback(&mut decrypt_fallback_rx, 255).await;
                    self.flush_pending_sends().await;
                }
                // Keep maintenance above the hot packet drains in this biased
                // select. Sustained UDP/TUN traffic can otherwise keep the
                // packet branches ready long enough to delay MMP reports,
                // rekey retries, congestion samples, and connected-UDP
                // activation until after the dataplane already looks
                // unhealthy.
                _ = tick.tick() => {
                    self.run_rx_loop_maintenance_tick().await;
                }
                packet = packet_rx.recv() => {
                    match packet {
                        Some(p) => self.process_packet(p).await,
                        None => break, // channel closed
                    }
                    // Drain remaining ready inbound packets in a tight loop
                    // before yielding back to select!; every yield is a futex
                    // hop on tokio's multi-thread scheduler, and at line rate
                    // the kernel UDP queue typically has several datagrams
                    // available per wake. Caps at a batch boundary so other
                    // branches (tick, control) eventually get a turn even
                    // under sustained load.
                    let mut drained: usize = 1; // counts the packet processed above
                    while drained < 256 {
                        if drained.is_multiple_of(FALLBACK_INTERLEAVE_EVERY) {
                            // Interleave a fallback drain so bounced FMP
                            // plaintexts (heartbeats, post-FSP-decrypt TCP
                            // ACKs, control frames) don't sit in the fallback
                            // queue for a full 256-packet burst. Without this,
                            // even the priority-first ordering above can't
                            // help once we're inside this inner loop.
                            self.drain_decrypt_fallback(
                                &mut decrypt_fallback_rx,
                                FALLBACK_INTERLEAVE_BUDGET,
                            )
                            .await;
                        }
                        match packet_rx.try_recv() {
                            Ok(p) => {
                                self.process_packet(p).await;
                                drained += 1;
                            }
                            Err(_) => break,
                        }
                    }
                    // One trailing fallback drain so the last bounced packets
                    // of the burst aren't held up by the post-burst send
                    // flush.
                    self.drain_decrypt_fallback(&mut decrypt_fallback_rx, 256)
                        .await;
                    // Flush any batched sends triggered by inbound packets
                    // (e.g. forwarded SessionDatagrams, MMP reports, tree
                    // announces).
                    self.flush_pending_sends().await;
                }
                Some(ipv6_packet) = tun_outbound_rx.recv() => {
                    self.handle_tun_outbound(ipv6_packet).await;
                    let mut drained = 0;
                    while drained < 256 {
                        match tun_outbound_rx.try_recv() {
                            Ok(p) => {
                                self.handle_tun_outbound(p).await;
                                drained += 1;
                            }
                            Err(_) => break,
                        }
                    }
                    // Flush any trailing batched sends so the last packets of
                    // a burst don't sit in the per-transport sendmmsg buffer
                    // waiting for the threshold.
                    self.flush_pending_sends().await;
                }
                Some(identity) = dns_identity_rx.recv() => {
                    debug!(
                        node_addr = %identity.node_addr,
                        "Registering identity from DNS resolution"
                    );
                    self.register_identity(identity.node_addr, identity.pubkey);
                }
                Some(command) = endpoint_command_rx.recv() => {
                    self.handle_endpoint_data_command(command).await;
                    // Same drain pattern: when the application is shoving
                    // tunnel data in via send_oneway, several Send commands
                    // typically queue up between scheduler hops.
                    let mut drained = 0;
                    while drained < 256 {
                        match endpoint_command_rx.try_recv() {
                            Ok(c) => {
                                self.handle_endpoint_data_command(c).await;
                                drained += 1;
                            }
                            Err(_) => break,
                        }
                    }
                    // Flush any trailing batched sends from the per-transport
                    // sendmmsg buffer.
                    self.flush_pending_sends().await;
                }
                Some((request, response_tx)) = control_rx.recv() => {
                    let response = if request.command.starts_with("show_") {
                        queries::dispatch(self, &request.command, request.params.as_ref())
                    } else {
                        commands::dispatch(
                            self,
                            &request.command,
                            request.params.as_ref(),
                        ).await
                    };
                    let _ = response_tx.send(response);
                }
            }
        }

        info!("RX event loop stopped (channel closed)");
        Ok(())
    }

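    /// One pass of timer-driven maintenance: handshake / rekey / session
    /// resends, discovery polls, idle-session and learned-route purges,
    /// stats capture, MMP and heartbeat checks, congestion sampling, and
    /// connected-UDP activation. Driven by the tick arm of the biased
    /// `select!` in `run_rx_loop`.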
    async fn run_rx_loop_maintenance_tick(&mut self) {
        self.check_timeouts();
        let now_ms = Self::now_ms();
        self.reload_peer_acl();
        self.poll_pending_connects().await;
        self.poll_nostr_discovery().await;
        self.poll_lan_discovery().await;
        self.resend_pending_handshakes(now_ms).await;
        self.resend_pending_rekeys(now_ms).await;
        self.resend_pending_session_handshakes(now_ms).await;
        self.purge_idle_sessions(now_ms);
        self.purge_learned_routes(now_ms);
        self.process_pending_retries(now_ms).await;
        self.check_tree_state().await;
        self.check_bloom_state().await;
        self.compute_mesh_size();
        self.record_stats_history();
        self.check_mmp_reports().await;
        self.check_session_mmp_reports().await;
        self.check_link_heartbeats().await;
        self.check_rekey().await;
        self.check_session_rekey().await;
        self.check_pending_lookups(now_ms).await;
        self.poll_transport_discovery().await;
        self.sample_transport_congestion();
        self.activate_connected_udp_sessions().await;
    }

    /// Dispatch one event from the decrypt-worker pool: decrypted
    /// plaintexts go to the canonical post-FMP-decrypt processor,
    /// AEAD failures to the failure-report handler.
    async fn process_decrypt_worker_event(&mut self, event: DecryptWorkerEvent) {
        match event {
            DecryptWorkerEvent::Plaintext(fallback) => {
                self.process_decrypt_fallback(fallback).await;
            }
            DecryptWorkerEvent::DecryptFailure(report) => {
                self.process_decrypt_failure_report(report).await;
            }
        }
    }

    /// Hand a decrypt-worker fallback to the canonical post-FMP-decrypt
    /// processor. Reconstructs `ce_flag` / `sp_flag` from the FMP header
    /// flag byte the worker captured into `DecryptFallback::fmp_flags`
    /// (without this, both ECN CE propagation and spin-bit RTT
    /// observation would be dropped on the worker path) and slices the
    /// plaintext out of the original wire buffer with zero allocation.
    async fn process_decrypt_fallback(&mut self, fallback: DecryptFallback) {
        let ce_flag = fallback.fmp_flags & FLAG_CE != 0;
        let sp_flag = fallback.fmp_flags & FLAG_SP != 0;
        let plaintext = &fallback.packet_data[fallback.fmp_plaintext_offset
            ..fallback.fmp_plaintext_offset + fallback.fmp_plaintext_len];
        self.process_authentic_fmp_plaintext(
            &fallback.source_node_addr,
            fallback.transport_id,
            &fallback.remote_addr,
            fallback.timestamp_ms,
            fallback.packet_len,
            fallback.fmp_counter,
            ce_flag,
            sp_flag,
            plaintext,
        )
        .await;
    }

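    /// Log a worker-reported FMP AEAD decryption failure at debug level,
    /// then hand the report to the shared decrypt-failure handler.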
    async fn process_decrypt_failure_report(&mut self, report: DecryptFailureReport) {
        debug!(
            peer = %self.peer_display_name(&report.source_node_addr),
            counter = report.fmp_counter,
            replay_highest = report.fmp_replay_highest,
            "Worker FMP AEAD decryption failed"
        );
        self.handle_decrypt_failure_report(&report).await;
    }

    /// Drain up to `budget` queued fallbacks without yielding back to
    /// `select!`. Returns the number processed. Called both from the
    /// promoted-fallback select arm (after the head item) and
    /// interleaved inside the packet_rx drain loop so bounced FMP
    /// plaintexts can't accumulate behind a 256-packet inbound burst.
    async fn drain_decrypt_fallback(
        &mut self,
        rx: &mut UnboundedReceiver<DecryptWorkerEvent>,
        budget: usize,
    ) -> usize {
        let mut drained = 0;
        while drained < budget {
            match rx.try_recv() {
                Ok(event) => {
                    self.process_decrypt_worker_event(event).await;
                    drained += 1;
                }
                Err(_) => break,
            }
        }
        drained
    }

    /// Flush any pending batched sends across all transports. Today every
    /// transport's `flush_pending_send` is a no-op; the UDP transport's
    /// per-transport `pending_send` buffer was removed when the bulk data
    /// path moved into `encrypt_worker`, which does its own target-grouped
    /// `sendmmsg(2)` directly. The call sites are retained so a future
    /// batched transport can opt in by overriding `flush_pending_send`
    /// without touching the rx_loop (note that the loop below currently
    /// polls only UDP handles, so a non-UDP batched transport would also
    /// need this filter widened).
    async fn flush_pending_sends(&self) {
        for transport in self.transports.values() {
            if matches!(transport, TransportHandle::Udp(_)) {
                transport.flush_pending_send().await;
            }
        }
    }
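
    // Illustrative sketch only: a future batched transport could opt in
    // roughly like the following (the `pending` buffer and `send_batch`
    // helper here are hypothetical, not part of the current transport
    // API):
    //
    //     async fn flush_pending_send(&self) {
    //         // Swap out the accumulated batch, then send it in one
    //         // target-grouped pass.
    //         let batch = std::mem::take(&mut *self.pending.lock().await);
    //         if !batch.is_empty() {
    //             self.send_batch(&batch).await;
    //         }
    //     }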

    /// Process a single received packet.
    ///
    /// Dispatches based on the phase field in the 4-byte common prefix.
    async fn process_packet(&mut self, packet: ReceivedPacket) {
        let _t_total = crate::perf_profile::Timer::start(crate::perf_profile::Stage::ProcessPacket);
        crate::perf_profile::record_since(
            crate::perf_profile::Stage::TransportQueueWait,
            packet.trace_enqueued_at,
        );
        if packet.data.len() < COMMON_PREFIX_SIZE {
            return; // Drop packets too short for the common prefix
        }

        let prefix = match CommonPrefix::parse(&packet.data) {
            Some(p) => p,
            None => return, // Malformed prefix
        };

        if prefix.version != FMP_VERSION {
            debug!(
                version = prefix.version,
                transport_id = %packet.transport_id,
                "Unknown FMP version, dropping"
            );

            // If the packet arrived on an adopted Nostr-NAT bootstrap
            // transport, the originating peer is necessarily on a different
            // FMP protocol version than us; the discovery sweep would
            // otherwise re-traverse them every cycle even though no
            // msg1/msg2 exchange can ever succeed. Bump the discovery-layer
            // cooldown to the long protocol-mismatch window and emit a
            // single WARN per fresh observation.
            if self.bootstrap_transports.contains(&packet.transport_id)
                && let Some(npub) = self
                    .bootstrap_transport_npubs
                    .get(&packet.transport_id)
                    .cloned()
                && let Some(handle) = self.nostr_discovery_handle()
            {
                let now_ms = Self::now_ms();
                let cooldown_secs = handle.protocol_mismatch_cooldown_secs();
                if handle.record_protocol_mismatch(&npub, now_ms) {
                    warn!(
                        peer_npub = %npub,
                        transport_id = %packet.transport_id,
                        peer_version = prefix.version,
                        our_version = FMP_VERSION,
                        cooldown_secs,
                        "Nostr-discovered peer speaks a different FMP version; suppressing retraversal"
                    );
                }
            }
            return;
        }

        match prefix.phase {
            PHASE_ESTABLISHED => {
                self.handle_encrypted_frame(packet).await;
            }
            PHASE_MSG1 => {
                self.handle_msg1(packet).await;
            }
            PHASE_MSG2 => {
                self.handle_msg2(packet).await;
            }
            _ => {
                debug!(
                    phase = prefix.phase,
                    transport_id = %packet.transport_id,
                    "Unknown FMP phase, dropping"
                );
            }
        }
    }
}