1use crate::control::queries;
4use crate::control::{ControlSocket, commands};
5use crate::discovery::is_punch_packet;
6use crate::node::decrypt_worker::{DecryptFailureReport, DecryptFallback, DecryptWorkerEvent};
7use crate::node::wire::{
8 COMMON_PREFIX_SIZE, CommonPrefix, FLAG_CE, FLAG_SP, FMP_VERSION, PHASE_ESTABLISHED, PHASE_MSG1,
9 PHASE_MSG2,
10};
11use crate::node::{Node, NodeEndpointCommand, NodeError};
12use crate::transport::ReceivedPacket;
13use crate::transport::TransportHandle;
14use crate::upper::tun::TunOutboundRx;
15use std::time::{Duration, Instant};
16use tokio::sync::mpsc::{Receiver, UnboundedReceiver};
17use tracing::{debug, info, trace, warn};
18
/// Number of transport packets `drain_packet_rx` handles between interleaved
/// drains of the decrypt-worker event queue.
const FALLBACK_INTERLEAVE_EVERY: usize = 32;
/// Maximum number of decrypt-worker events handled by one interleaved drain
/// inside `drain_packet_rx`.
const FALLBACK_INTERLEAVE_BUDGET: usize = 64;
/// Maximum number of items a single drain call handles before it returns
/// control to the event loop.
const PACKET_DRAIN_BUDGET: usize = 256;
/// Timeout applied to the slow maintenance step when the loop is not under
/// data pressure.
const RX_LOOP_SLOW_MAINTENANCE_IDLE_TIMEOUT: Duration = Duration::from_millis(100);
/// Timeout applied to the slow maintenance step while the loop is under data
/// pressure.
const RX_LOOP_SLOW_MAINTENANCE_BUSY_TIMEOUT: Duration = Duration::from_millis(10);
/// How long after the last handled data item the loop still counts as having
/// recent data activity.
const RX_LOOP_RECENT_DATA_ACTIVITY_WINDOW: Duration = Duration::from_secs(2);
/// Upper clamp, in milliseconds, for the fault-injection delay read by
/// `rx_loop_slow_maintenance_fault_delay`.
const RX_LOOP_FAULT_MAX_DELAY_MS: u64 = 5_000;
31
32fn rx_loop_slow_maintenance_fault_delay() -> Option<Duration> {
33 let raw = std::env::var("FIPS_FAULT_INJECT_RX_LOOP_SLOW_MAINTENANCE_MS").ok()?;
34 let ms = raw
35 .trim()
36 .parse::<u64>()
37 .ok()?
38 .min(RX_LOOP_FAULT_MAX_DELAY_MS);
39 (ms > 0).then(|| Duration::from_millis(ms))
40}
41
42impl Node {
43 pub async fn run_rx_loop(&mut self) -> Result<(), NodeError> {
63 let mut packet_rx = self.packet_rx.take().ok_or(NodeError::NotStarted)?;
64
65 let (mut tun_outbound_rx, _tun_guard) = match self.tun_outbound_rx.take() {
69 Some(rx) => (rx, None),
70 None => {
71 let (tx, rx) = tokio::sync::mpsc::channel(1);
72 (rx, Some(tx))
73 }
74 };
75
76 let (mut dns_identity_rx, _dns_guard) = match self.dns_identity_rx.take() {
79 Some(rx) => (rx, None),
80 None => {
81 let (tx, rx) = tokio::sync::mpsc::channel(1);
82 (rx, Some(tx))
83 }
84 };
85
86 let (mut endpoint_priority_command_rx, _endpoint_priority_command_guard) =
89 match self.endpoint_priority_command_rx.take() {
90 Some(rx) => (rx, None),
91 None => {
92 let (tx, rx) = tokio::sync::mpsc::channel(1);
93 (rx, Some(tx))
94 }
95 };
96 let (mut endpoint_command_rx, _endpoint_command_guard) =
97 match self.endpoint_command_rx.take() {
98 Some(rx) => (rx, None),
99 None => {
100 let (tx, rx) = tokio::sync::mpsc::channel(1);
101 (rx, Some(tx))
102 }
103 };
104
105 let (mut decrypt_fallback_rx, _decrypt_fallback_guard) =
109 match self.decrypt_fallback_rx.take() {
110 Some(rx) => (rx, None),
111 None => {
112 let (tx, rx) = tokio::sync::mpsc::unbounded_channel();
113 (rx, Some(tx))
114 }
115 };
116
117 let mut tick =
118 tokio::time::interval(Duration::from_secs(self.config.node.tick_interval_secs));
119 tick.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
120 let mut last_data_activity = None::<Instant>;
121 let mut slow_maintenance_timed_out_under_data = false;
122
123 let (control_tx, mut control_rx) =
125 tokio::sync::mpsc::channel::<crate::control::ControlMessage>(32);
126
127 if self.config.node.control.enabled {
128 let config = self.config.node.control.clone();
129 let tx = control_tx.clone();
130 tokio::spawn(async move {
131 match ControlSocket::bind(&config) {
132 Ok(socket) => {
133 socket.accept_loop(tx).await;
134 }
135 Err(e) => {
136 warn!(error = %e, "Failed to bind control socket");
137 }
138 }
139 });
140 }
141 drop(control_tx);
143
144 info!("RX event loop started");
145 crate::perf_profile::maybe_spawn_reporter();
147
148 loop {
149 tokio::select! {
150 biased;
151 Some(event) = decrypt_fallback_rx.recv() => {
164 self.process_decrypt_worker_event(event).await;
165 self.drain_decrypt_fallback(&mut decrypt_fallback_rx, 255).await;
166 last_data_activity = Some(Instant::now());
167 self.flush_pending_sends().await;
168 }
169 packet = packet_rx.recv() => {
170 match packet {
171 Some(p) => {
172 let drained = self.drain_packet_rx(
173 &mut packet_rx,
174 &mut decrypt_fallback_rx,
175 Some(p),
176 PACKET_DRAIN_BUDGET,
177 ).await;
178 if drained > 0 {
179 last_data_activity = Some(Instant::now());
180 }
181 }
182 None => break, }
184 }
185 _ = tick.tick() => {
186 let (drained_packets, drained_tun, drained_endpoint) = self.drain_rx_loop_data_queues(
187 &mut packet_rx,
188 &mut decrypt_fallback_rx,
189 &mut tun_outbound_rx,
190 &mut endpoint_priority_command_rx,
191 &mut endpoint_command_rx,
192 PACKET_DRAIN_BUDGET,
193 ).await;
194 let drained = drained_packets + drained_tun + drained_endpoint;
195 if drained > 0 {
196 last_data_activity = Some(Instant::now());
197 debug!(
198 drained,
199 drained_packets,
200 drained_tun,
201 drained_endpoint,
202 "Drained queued packets before rx-loop maintenance"
203 );
204 }
205 let recent_data_activity = last_data_activity
206 .is_some_and(|t| t.elapsed() <= RX_LOOP_RECENT_DATA_ACTIVITY_WINDOW);
207 let data_pressure = drained > 0 || recent_data_activity;
208 if !data_pressure {
209 slow_maintenance_timed_out_under_data = false;
210 }
211
212 let slow_timed_out = self.run_rx_loop_maintenance_tick(
213 data_pressure,
214 data_pressure && slow_maintenance_timed_out_under_data,
215 ).await;
216 if slow_timed_out && data_pressure {
217 slow_maintenance_timed_out_under_data = true;
218 }
219
220 let (post_drained_packets, post_drained_tun, post_drained_endpoint) = self.drain_rx_loop_data_queues(
221 &mut packet_rx,
222 &mut decrypt_fallback_rx,
223 &mut tun_outbound_rx,
224 &mut endpoint_priority_command_rx,
225 &mut endpoint_command_rx,
226 PACKET_DRAIN_BUDGET,
227 ).await;
228 let post_drained = post_drained_packets + post_drained_tun + post_drained_endpoint;
229 if post_drained > 0 {
230 last_data_activity = Some(Instant::now());
231 debug!(
232 drained = post_drained,
233 drained_packets = post_drained_packets,
234 drained_tun = post_drained_tun,
235 drained_endpoint = post_drained_endpoint,
236 "Drained queued packets after rx-loop maintenance"
237 );
238 }
239 }
240 Some(ipv6_packet) = tun_outbound_rx.recv() => {
241 let drained = self.drain_tun_outbound(
242 &mut tun_outbound_rx,
243 Some(ipv6_packet),
244 PACKET_DRAIN_BUDGET,
245 ).await;
246 if drained > 0 {
247 last_data_activity = Some(Instant::now());
248 }
249 }
250 Some(identity) = dns_identity_rx.recv() => {
251 debug!(
252 node_addr = %identity.node_addr,
253 "Registering identity from DNS resolution"
254 );
255 self.register_identity(identity.node_addr, identity.pubkey);
256 }
257 Some(command) = endpoint_priority_command_rx.recv() => {
258 let drained = self.drain_endpoint_commands(
259 &mut endpoint_priority_command_rx,
260 &mut endpoint_command_rx,
261 Some(command),
262 None,
263 PACKET_DRAIN_BUDGET,
264 ).await;
265 if drained > 0 {
266 last_data_activity = Some(Instant::now());
267 }
268 }
269 Some(command) = endpoint_command_rx.recv() => {
270 let drained = self.drain_endpoint_commands(
271 &mut endpoint_priority_command_rx,
272 &mut endpoint_command_rx,
273 None,
274 Some(command),
275 PACKET_DRAIN_BUDGET,
276 ).await;
277 if drained > 0 {
278 last_data_activity = Some(Instant::now());
279 }
280 }
281 Some((request, response_tx)) = control_rx.recv() => {
282 let response = if request.command.starts_with("show_") {
283 queries::dispatch(self, &request.command, request.params.as_ref())
284 } else {
285 commands::dispatch(
286 self,
287 &request.command,
288 request.params.as_ref(),
289 ).await
290 };
291 let _ = response_tx.send(response);
292 }
293 }
294 }
295
296 info!("RX event loop stopped (channel closed)");
297 Ok(())
298 }
299
300 async fn drain_rx_loop_data_queues(
301 &mut self,
302 packet_rx: &mut UnboundedReceiver<ReceivedPacket>,
303 decrypt_fallback_rx: &mut UnboundedReceiver<DecryptWorkerEvent>,
304 tun_outbound_rx: &mut TunOutboundRx,
305 endpoint_priority_command_rx: &mut Receiver<NodeEndpointCommand>,
306 endpoint_command_rx: &mut Receiver<NodeEndpointCommand>,
307 budget: usize,
308 ) -> (usize, usize, usize) {
309 let drained_packets = self
310 .drain_packet_rx(packet_rx, decrypt_fallback_rx, None, budget)
311 .await;
312 let drained_tun = self.drain_tun_outbound(tun_outbound_rx, None, budget).await;
313 let drained_endpoint = self
314 .drain_endpoint_commands(
315 endpoint_priority_command_rx,
316 endpoint_command_rx,
317 None,
318 None,
319 budget,
320 )
321 .await;
322 (drained_packets, drained_tun, drained_endpoint)
323 }
324
325 async fn drain_packet_rx(
326 &mut self,
327 packet_rx: &mut UnboundedReceiver<ReceivedPacket>,
328 decrypt_fallback_rx: &mut UnboundedReceiver<DecryptWorkerEvent>,
329 first_packet: Option<ReceivedPacket>,
330 budget: usize,
331 ) -> usize {
332 let mut drained = 0usize;
333 if let Some(packet) = first_packet {
334 self.process_packet(packet).await;
335 drained = 1;
336 }
337
338 while drained < budget {
344 if drained > 0 && drained.is_multiple_of(FALLBACK_INTERLEAVE_EVERY) {
345 self.drain_decrypt_fallback(decrypt_fallback_rx, FALLBACK_INTERLEAVE_BUDGET)
346 .await;
347 }
348 match packet_rx.try_recv() {
349 Ok(packet) => {
350 self.process_packet(packet).await;
351 drained += 1;
352 }
353 Err(_) => break,
354 }
355 }
356
357 if drained > 0 {
358 self.drain_decrypt_fallback(decrypt_fallback_rx, PACKET_DRAIN_BUDGET)
361 .await;
362 self.flush_pending_sends().await;
365 }
366 drained
367 }
368
369 async fn drain_tun_outbound(
370 &mut self,
371 tun_outbound_rx: &mut TunOutboundRx,
372 first_packet: Option<Vec<u8>>,
373 budget: usize,
374 ) -> usize {
375 let mut drained = 0usize;
376 if let Some(packet) = first_packet {
377 self.handle_tun_outbound(packet).await;
378 drained = 1;
379 }
380
381 while drained < budget {
382 match tun_outbound_rx.try_recv() {
383 Ok(packet) => {
384 self.handle_tun_outbound(packet).await;
385 drained += 1;
386 }
387 Err(_) => break,
388 }
389 }
390
391 if drained > 0 {
392 self.flush_pending_sends().await;
393 }
394 drained
395 }
396
397 async fn drain_endpoint_commands(
398 &mut self,
399 endpoint_priority_command_rx: &mut Receiver<NodeEndpointCommand>,
400 endpoint_command_rx: &mut Receiver<NodeEndpointCommand>,
401 first_priority_command: Option<NodeEndpointCommand>,
402 first_bulk_command: Option<NodeEndpointCommand>,
403 budget: usize,
404 ) -> usize {
405 let mut first_bulk_command = first_bulk_command;
406 let mut drained = 0usize;
407 if let Some(command) = first_priority_command {
408 self.handle_endpoint_data_command(command).await;
409 drained = 1;
410 }
411
412 while drained < budget {
413 let Some(command) = try_recv_endpoint_command(
414 endpoint_priority_command_rx,
415 endpoint_command_rx,
416 &mut first_bulk_command,
417 ) else {
418 break;
419 };
420 self.handle_endpoint_data_command(command).await;
421 drained += 1;
422 }
423
424 if drained > 0 {
425 self.flush_pending_sends().await;
426 }
427 drained
428 }
429
430 async fn run_rx_loop_maintenance_tick(
431 &mut self,
432 data_pressure: bool,
433 skip_slow_maintenance: bool,
434 ) -> bool {
435 self.check_timeouts();
436 let now_ms = Self::now_ms();
437 self.check_link_heartbeats().await;
441 self.reload_peer_acl();
442 self.resend_pending_handshakes(now_ms).await;
443 self.resend_pending_rekeys(now_ms).await;
444 self.resend_pending_session_handshakes(now_ms).await;
445 self.resend_pending_session_msg3(now_ms).await;
446 self.purge_idle_sessions(now_ms);
447 self.purge_learned_routes(now_ms);
448 self.check_mmp_reports().await;
449 self.check_session_mmp_reports().await;
450 self.check_rekey().await;
451 self.check_session_rekey().await;
452 self.check_pending_lookups(now_ms).await;
453 self.poll_pending_connects().await;
454 self.process_pending_retries(now_ms).await;
455 self.poll_transport_discovery().await;
456 self.activate_connected_udp_sessions().await;
457 self.sample_transport_congestion();
458
459 if skip_slow_maintenance {
460 return false;
461 }
462
463 let slow_timeout = if data_pressure {
464 RX_LOOP_SLOW_MAINTENANCE_BUSY_TIMEOUT
465 } else {
466 RX_LOOP_SLOW_MAINTENANCE_IDLE_TIMEOUT
467 };
468
469 if tokio::time::timeout(slow_timeout, self.run_rx_loop_slow_maintenance_tick())
470 .await
471 .is_err()
472 {
473 self.mark_rx_loop_maintenance_timeout();
474 warn!(
475 timeout_ms = slow_timeout.as_millis() as u64,
476 data_pressure, "RX loop slow maintenance timed out; continuing packet processing"
477 );
478 return true;
479 }
480 false
481 }
482
    /// Runs the maintenance steps that `run_rx_loop_maintenance_tick` executes
    /// under a timeout.
    ///
    /// The caller wraps this future in `tokio::time::timeout`, so it can be
    /// dropped at any `.await` below when that timeout fires.
    async fn run_rx_loop_slow_maintenance_tick(&mut self) {
        // Fault injection: sleep first when
        // `rx_loop_slow_maintenance_fault_delay` yields a delay.
        if let Some(delay) = rx_loop_slow_maintenance_fault_delay() {
            tokio::time::sleep(delay).await;
        }

        // Discovery polling, in this fixed order.
        self.poll_nostr_discovery().await;
        self.poll_lan_discovery().await;
        self.poll_local_instance_discovery().await;
        // State checks, then the two synchronous bookkeeping calls.
        self.check_tree_state().await;
        self.check_bloom_state().await;
        self.compute_mesh_size();
        self.record_stats_history();
    }
500
501 async fn process_decrypt_worker_event(&mut self, event: DecryptWorkerEvent) {
508 match event {
509 DecryptWorkerEvent::Plaintext(fallback) => {
510 self.process_decrypt_fallback(fallback).await;
511 }
512 DecryptWorkerEvent::DecryptFailure(report) => {
513 self.process_decrypt_failure_report(report).await;
514 }
515 }
516 }
517
518 async fn process_decrypt_fallback(&mut self, fallback: DecryptFallback) {
519 let ce_flag = fallback.fmp_flags & FLAG_CE != 0;
520 let sp_flag = fallback.fmp_flags & FLAG_SP != 0;
521 let plaintext = &fallback.packet_data[fallback.fmp_plaintext_offset
522 ..fallback.fmp_plaintext_offset + fallback.fmp_plaintext_len];
523 self.process_authentic_fmp_plaintext(
524 &fallback.source_node_addr,
525 fallback.transport_id,
526 &fallback.remote_addr,
527 fallback.timestamp_ms,
528 fallback.packet_len,
529 fallback.fmp_counter,
530 ce_flag,
531 sp_flag,
532 plaintext,
533 )
534 .await;
535 }
536
    /// Logs a decrypt failure reported by the decrypt worker at debug level,
    /// then forwards the report to `handle_decrypt_failure_report`.
    async fn process_decrypt_failure_report(&mut self, report: DecryptFailureReport) {
        // The log carries the peer's display name, the packet counter and the
        // `fmp_replay_highest` value from the report.
        debug!(
            peer = %self.peer_display_name(&report.source_node_addr),
            counter = report.fmp_counter,
            replay_highest = report.fmp_replay_highest,
            "Worker FMP AEAD decryption failed"
        );
        self.handle_decrypt_failure_report(&report).await;
    }
546
547 async fn drain_decrypt_fallback(
553 &mut self,
554 rx: &mut UnboundedReceiver<DecryptWorkerEvent>,
555 budget: usize,
556 ) -> usize {
557 let mut drained = 0;
558 while drained < budget {
559 match rx.try_recv() {
560 Ok(event) => {
561 self.process_decrypt_worker_event(event).await;
562 drained += 1;
563 }
564 Err(_) => break,
565 }
566 }
567 drained
568 }
569
570 async fn flush_pending_sends(&self) {
579 for transport in self.transports.values() {
580 if matches!(transport, TransportHandle::Udp(_)) {
581 transport.flush_pending_send().await;
582 }
583 }
584 }
585
586 pub(in crate::node) async fn process_packet(&mut self, packet: ReceivedPacket) {
590 let _t_total = crate::perf_profile::Timer::start(crate::perf_profile::Stage::ProcessPacket);
591 crate::perf_profile::record_since(
592 crate::perf_profile::Stage::TransportQueueWait,
593 packet.trace_enqueued_at,
594 );
595 if is_punch_packet(&packet.data) {
596 trace!(
597 transport_id = %packet.transport_id,
598 remote_addr = %packet.remote_addr,
599 bytes = packet.data.len(),
600 "Dropping stray punch probe/ack in FMP rx loop"
601 );
602 return;
603 }
604 if packet.data.len() < COMMON_PREFIX_SIZE {
605 return; }
607
608 let prefix = match CommonPrefix::parse(&packet.data) {
609 Some(p) => p,
610 None => return, };
612 if matches!(prefix.phase, PHASE_MSG1 | PHASE_MSG2) {
613 debug!(
614 transport_id = %packet.transport_id,
615 remote_addr = %packet.remote_addr,
616 bytes = packet.data.len(),
617 phase = prefix.phase,
618 version = prefix.version,
619 "FMP handshake packet dispatch"
620 );
621 } else {
622 trace!(
623 transport_id = %packet.transport_id,
624 remote_addr = %packet.remote_addr,
625 bytes = packet.data.len(),
626 phase = prefix.phase,
627 version = prefix.version,
628 "FMP packet dispatch"
629 );
630 }
631
632 if prefix.version != FMP_VERSION {
633 debug!(
634 version = prefix.version,
635 transport_id = %packet.transport_id,
636 "Unknown FMP version, dropping"
637 );
638
639 let looks_like_fmp_phase =
647 matches!(prefix.phase, PHASE_ESTABLISHED | PHASE_MSG1 | PHASE_MSG2);
648 if looks_like_fmp_phase
649 && self.bootstrap_transports.contains(&packet.transport_id)
650 && let Some(npub) = self
651 .bootstrap_transport_npubs
652 .get(&packet.transport_id)
653 .cloned()
654 && let Some(handle) = self.nostr_discovery_handle()
655 {
656 let now_ms = Self::now_ms();
657 let cooldown_secs = handle.protocol_mismatch_cooldown_secs();
658 if handle.record_protocol_mismatch(&npub, now_ms) {
659 warn!(
660 peer_npub = %npub,
661 transport_id = %packet.transport_id,
662 peer_version = prefix.version,
663 our_version = FMP_VERSION,
664 cooldown_secs,
665 "Nostr-discovered peer speaks a different FMP version; suppressing retraversal"
666 );
667 }
668 }
669 return;
670 }
671
672 match prefix.phase {
673 PHASE_ESTABLISHED => {
674 self.handle_encrypted_frame(packet).await;
675 }
676 PHASE_MSG1 => {
677 self.handle_msg1(packet).await;
678 }
679 PHASE_MSG2 => {
680 self.handle_msg2(packet).await;
681 }
682 _ => {
683 debug!(
684 phase = prefix.phase,
685 transport_id = %packet.transport_id,
686 "Unknown FMP phase, dropping"
687 );
688 }
689 }
690 }
691}
692
693fn try_recv_endpoint_command<T>(
694 priority_rx: &mut Receiver<T>,
695 bulk_rx: &mut Receiver<T>,
696 first_bulk: &mut Option<T>,
697) -> Option<T> {
698 priority_rx
699 .try_recv()
700 .ok()
701 .or_else(|| first_bulk.take())
702 .or_else(|| bulk_rx.try_recv().ok())
703}
704
#[cfg(test)]
mod tests {
    use super::try_recv_endpoint_command;

    /// A ready priority command is returned before the bulk command that was
    /// already selected, which in turn is returned before bulk commands still
    /// queued on the channel. Once all three are consumed the result is `None`.
    #[tokio::test]
    async fn endpoint_command_drain_prefers_ready_priority_over_selected_bulk() {
        let (priority_tx, mut priority_rx) = tokio::sync::mpsc::channel(4);
        let (bulk_tx, mut bulk_rx) = tokio::sync::mpsc::channel(4);
        priority_tx.send("priority").await.unwrap();
        bulk_tx.send("bulk-queued").await.unwrap();
        let mut selected_bulk = Some("bulk-selected");

        let expected_order = [
            Some("priority"),
            Some("bulk-selected"),
            Some("bulk-queued"),
            None,
        ];
        for expected in expected_order {
            let received =
                try_recv_endpoint_command(&mut priority_rx, &mut bulk_rx, &mut selected_bulk);
            assert_eq!(received, expected);
        }
    }
}