1use crate::control::queries;
4use crate::control::{ControlSocket, commands};
5use crate::discovery::is_punch_packet;
6use crate::node::decrypt_worker::{DecryptFailureReport, DecryptFallback, DecryptWorkerEvent};
7use crate::node::wire::{
8 COMMON_PREFIX_SIZE, CommonPrefix, FLAG_CE, FLAG_SP, FMP_VERSION, PHASE_ESTABLISHED, PHASE_MSG1,
9 PHASE_MSG2,
10};
11use crate::node::{Node, NodeEndpointCommand, NodeError};
12use crate::transport::ReceivedPacket;
13use crate::transport::TransportHandle;
14use crate::upper::tun::TunOutboundRx;
15use std::time::{Duration, Instant};
16use tokio::sync::mpsc::{Receiver, UnboundedReceiver};
17use tracing::{debug, info, trace, warn};
18
/// While draining inbound packets, decrypt-worker events are interleaved every
/// time this many packets have been processed in the current batch.
const FALLBACK_INTERLEAVE_EVERY: usize = 32;
/// Maximum number of decrypt-worker events handled at each interleave point.
const FALLBACK_INTERLEAVE_BUDGET: usize = 64;
/// Maximum number of items a single drain batch processes before control
/// returns to the select loop.
const PACKET_DRAIN_BUDGET: usize = 256;
/// Timeout for the slow maintenance phase when the tick sees no data pressure.
const RX_LOOP_SLOW_MAINTENANCE_IDLE_TIMEOUT: Duration = Duration::from_millis(100);
/// Timeout for the slow maintenance phase when the tick sees data pressure.
const RX_LOOP_SLOW_MAINTENANCE_BUSY_TIMEOUT: Duration = Duration::from_millis(10);
/// Data-plane activity within this window still counts as data pressure.
const RX_LOOP_RECENT_DATA_ACTIVITY_WINDOW: Duration = Duration::from_secs(2);
/// Upper bound, in milliseconds, on the fault-injected slow-maintenance delay.
const RX_LOOP_FAULT_MAX_DELAY_MS: u64 = 5_000;

/// Fault-injection hook: an artificial delay to apply before slow maintenance.
///
/// The environment variable `FIPS_FAULT_INJECT_RX_LOOP_SLOW_MAINTENANCE_MS` is
/// re-read on every call. Returns `None` in three cases: the variable is unset,
/// its trimmed value is not a valid `u64`, or the value is zero. Otherwise the
/// requested delay is capped at [`RX_LOOP_FAULT_MAX_DELAY_MS`] milliseconds.
fn rx_loop_slow_maintenance_fault_delay() -> Option<Duration> {
    let setting = std::env::var("FIPS_FAULT_INJECT_RX_LOOP_SLOW_MAINTENANCE_MS").ok()?;
    let requested_ms: u64 = setting.trim().parse().ok()?;
    Some(requested_ms.min(RX_LOOP_FAULT_MAX_DELAY_MS))
        .filter(|&capped_ms| capped_ms != 0)
        .map(Duration::from_millis)
}
41
42impl Node {
43 pub async fn run_rx_loop(&mut self) -> Result<(), NodeError> {
63 let mut packet_rx = self.packet_rx.take().ok_or(NodeError::NotStarted)?;
64
65 let (mut tun_outbound_rx, _tun_guard) = match self.tun_outbound_rx.take() {
69 Some(rx) => (rx, None),
70 None => {
71 let (tx, rx) = tokio::sync::mpsc::channel(1);
72 (rx, Some(tx))
73 }
74 };
75
76 let (mut dns_identity_rx, _dns_guard) = match self.dns_identity_rx.take() {
79 Some(rx) => (rx, None),
80 None => {
81 let (tx, rx) = tokio::sync::mpsc::channel(1);
82 (rx, Some(tx))
83 }
84 };
85
86 let (mut endpoint_command_rx, _endpoint_command_guard) =
89 match self.endpoint_command_rx.take() {
90 Some(rx) => (rx, None),
91 None => {
92 let (tx, rx) = tokio::sync::mpsc::channel(1);
93 (rx, Some(tx))
94 }
95 };
96
97 let (mut decrypt_fallback_rx, _decrypt_fallback_guard) =
101 match self.decrypt_fallback_rx.take() {
102 Some(rx) => (rx, None),
103 None => {
104 let (tx, rx) = tokio::sync::mpsc::unbounded_channel();
105 (rx, Some(tx))
106 }
107 };
108
109 let mut tick =
110 tokio::time::interval(Duration::from_secs(self.config.node.tick_interval_secs));
111 tick.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
112 let mut last_data_activity = None::<Instant>;
113 let mut slow_maintenance_timed_out_under_data = false;
114
115 let (control_tx, mut control_rx) =
117 tokio::sync::mpsc::channel::<crate::control::ControlMessage>(32);
118
119 if self.config.node.control.enabled {
120 let config = self.config.node.control.clone();
121 let tx = control_tx.clone();
122 tokio::spawn(async move {
123 match ControlSocket::bind(&config) {
124 Ok(socket) => {
125 socket.accept_loop(tx).await;
126 }
127 Err(e) => {
128 warn!(error = %e, "Failed to bind control socket");
129 }
130 }
131 });
132 }
133 drop(control_tx);
135
136 info!("RX event loop started");
137 crate::perf_profile::maybe_spawn_reporter();
139
140 loop {
141 tokio::select! {
142 biased;
143 Some(event) = decrypt_fallback_rx.recv() => {
156 self.process_decrypt_worker_event(event).await;
157 self.drain_decrypt_fallback(&mut decrypt_fallback_rx, 255).await;
158 last_data_activity = Some(Instant::now());
159 self.flush_pending_sends().await;
160 }
161 packet = packet_rx.recv() => {
162 match packet {
163 Some(p) => {
164 let drained = self.drain_packet_rx(
165 &mut packet_rx,
166 &mut decrypt_fallback_rx,
167 Some(p),
168 PACKET_DRAIN_BUDGET,
169 ).await;
170 if drained > 0 {
171 last_data_activity = Some(Instant::now());
172 }
173 }
174 None => break, }
176 }
177 _ = tick.tick() => {
178 let (drained_packets, drained_tun, drained_endpoint) = self.drain_rx_loop_data_queues(
179 &mut packet_rx,
180 &mut decrypt_fallback_rx,
181 &mut tun_outbound_rx,
182 &mut endpoint_command_rx,
183 PACKET_DRAIN_BUDGET,
184 ).await;
185 let drained = drained_packets + drained_tun + drained_endpoint;
186 if drained > 0 {
187 last_data_activity = Some(Instant::now());
188 debug!(
189 drained,
190 drained_packets,
191 drained_tun,
192 drained_endpoint,
193 "Drained queued packets before rx-loop maintenance"
194 );
195 }
196 let recent_data_activity = last_data_activity
197 .is_some_and(|t| t.elapsed() <= RX_LOOP_RECENT_DATA_ACTIVITY_WINDOW);
198 let data_pressure = drained > 0 || recent_data_activity;
199 if !data_pressure {
200 slow_maintenance_timed_out_under_data = false;
201 }
202
203 let slow_timed_out = self.run_rx_loop_maintenance_tick(
204 data_pressure,
205 data_pressure && slow_maintenance_timed_out_under_data,
206 ).await;
207 if slow_timed_out && data_pressure {
208 slow_maintenance_timed_out_under_data = true;
209 }
210
211 let (post_drained_packets, post_drained_tun, post_drained_endpoint) = self.drain_rx_loop_data_queues(
212 &mut packet_rx,
213 &mut decrypt_fallback_rx,
214 &mut tun_outbound_rx,
215 &mut endpoint_command_rx,
216 PACKET_DRAIN_BUDGET,
217 ).await;
218 let post_drained = post_drained_packets + post_drained_tun + post_drained_endpoint;
219 if post_drained > 0 {
220 last_data_activity = Some(Instant::now());
221 debug!(
222 drained = post_drained,
223 drained_packets = post_drained_packets,
224 drained_tun = post_drained_tun,
225 drained_endpoint = post_drained_endpoint,
226 "Drained queued packets after rx-loop maintenance"
227 );
228 }
229 }
230 Some(ipv6_packet) = tun_outbound_rx.recv() => {
231 let drained = self.drain_tun_outbound(
232 &mut tun_outbound_rx,
233 Some(ipv6_packet),
234 PACKET_DRAIN_BUDGET,
235 ).await;
236 if drained > 0 {
237 last_data_activity = Some(Instant::now());
238 }
239 }
240 Some(identity) = dns_identity_rx.recv() => {
241 debug!(
242 node_addr = %identity.node_addr,
243 "Registering identity from DNS resolution"
244 );
245 self.register_identity(identity.node_addr, identity.pubkey);
246 }
247 Some(command) = endpoint_command_rx.recv() => {
248 let drained = self.drain_endpoint_commands(
249 &mut endpoint_command_rx,
250 Some(command),
251 PACKET_DRAIN_BUDGET,
252 ).await;
253 if drained > 0 {
254 last_data_activity = Some(Instant::now());
255 }
256 }
257 Some((request, response_tx)) = control_rx.recv() => {
258 let response = if request.command.starts_with("show_") {
259 queries::dispatch(self, &request.command, request.params.as_ref())
260 } else {
261 commands::dispatch(
262 self,
263 &request.command,
264 request.params.as_ref(),
265 ).await
266 };
267 let _ = response_tx.send(response);
268 }
269 }
270 }
271
272 info!("RX event loop stopped (channel closed)");
273 Ok(())
274 }
275
276 async fn drain_rx_loop_data_queues(
277 &mut self,
278 packet_rx: &mut UnboundedReceiver<ReceivedPacket>,
279 decrypt_fallback_rx: &mut UnboundedReceiver<DecryptWorkerEvent>,
280 tun_outbound_rx: &mut TunOutboundRx,
281 endpoint_command_rx: &mut Receiver<NodeEndpointCommand>,
282 budget: usize,
283 ) -> (usize, usize, usize) {
284 let drained_packets = self
285 .drain_packet_rx(packet_rx, decrypt_fallback_rx, None, budget)
286 .await;
287 let drained_tun = self.drain_tun_outbound(tun_outbound_rx, None, budget).await;
288 let drained_endpoint = self
289 .drain_endpoint_commands(endpoint_command_rx, None, budget)
290 .await;
291 (drained_packets, drained_tun, drained_endpoint)
292 }
293
294 async fn drain_packet_rx(
295 &mut self,
296 packet_rx: &mut UnboundedReceiver<ReceivedPacket>,
297 decrypt_fallback_rx: &mut UnboundedReceiver<DecryptWorkerEvent>,
298 first_packet: Option<ReceivedPacket>,
299 budget: usize,
300 ) -> usize {
301 let mut drained = 0usize;
302 if let Some(packet) = first_packet {
303 self.process_packet(packet).await;
304 drained = 1;
305 }
306
307 while drained < budget {
313 if drained > 0 && drained.is_multiple_of(FALLBACK_INTERLEAVE_EVERY) {
314 self.drain_decrypt_fallback(decrypt_fallback_rx, FALLBACK_INTERLEAVE_BUDGET)
315 .await;
316 }
317 match packet_rx.try_recv() {
318 Ok(packet) => {
319 self.process_packet(packet).await;
320 drained += 1;
321 }
322 Err(_) => break,
323 }
324 }
325
326 if drained > 0 {
327 self.drain_decrypt_fallback(decrypt_fallback_rx, PACKET_DRAIN_BUDGET)
330 .await;
331 self.flush_pending_sends().await;
334 }
335 drained
336 }
337
338 async fn drain_tun_outbound(
339 &mut self,
340 tun_outbound_rx: &mut TunOutboundRx,
341 first_packet: Option<Vec<u8>>,
342 budget: usize,
343 ) -> usize {
344 let mut drained = 0usize;
345 if let Some(packet) = first_packet {
346 self.handle_tun_outbound(packet).await;
347 drained = 1;
348 }
349
350 while drained < budget {
351 match tun_outbound_rx.try_recv() {
352 Ok(packet) => {
353 self.handle_tun_outbound(packet).await;
354 drained += 1;
355 }
356 Err(_) => break,
357 }
358 }
359
360 if drained > 0 {
361 self.flush_pending_sends().await;
362 }
363 drained
364 }
365
366 async fn drain_endpoint_commands(
367 &mut self,
368 endpoint_command_rx: &mut Receiver<NodeEndpointCommand>,
369 first_command: Option<NodeEndpointCommand>,
370 budget: usize,
371 ) -> usize {
372 let mut drained = 0usize;
373 if let Some(command) = first_command {
374 self.handle_endpoint_data_command(command).await;
375 drained = 1;
376 }
377
378 while drained < budget {
379 match endpoint_command_rx.try_recv() {
380 Ok(command) => {
381 self.handle_endpoint_data_command(command).await;
382 drained += 1;
383 }
384 Err(_) => break,
385 }
386 }
387
388 if drained > 0 {
389 self.flush_pending_sends().await;
390 }
391 drained
392 }
393
394 async fn run_rx_loop_maintenance_tick(
395 &mut self,
396 data_pressure: bool,
397 skip_slow_maintenance: bool,
398 ) -> bool {
399 self.check_timeouts();
400 let now_ms = Self::now_ms();
401 self.check_link_heartbeats().await;
405 self.reload_peer_acl();
406 self.resend_pending_handshakes(now_ms).await;
407 self.resend_pending_rekeys(now_ms).await;
408 self.resend_pending_session_handshakes(now_ms).await;
409 self.resend_pending_session_msg3(now_ms).await;
410 self.purge_idle_sessions(now_ms);
411 self.purge_learned_routes(now_ms);
412 self.check_mmp_reports().await;
413 self.check_session_mmp_reports().await;
414 self.check_rekey().await;
415 self.check_session_rekey().await;
416 self.check_pending_lookups(now_ms).await;
417 self.poll_pending_connects().await;
418 self.process_pending_retries(now_ms).await;
419 self.poll_transport_discovery().await;
420 self.activate_connected_udp_sessions().await;
421 self.sample_transport_congestion();
422
423 if skip_slow_maintenance {
424 return false;
425 }
426
427 let slow_timeout = if data_pressure {
428 RX_LOOP_SLOW_MAINTENANCE_BUSY_TIMEOUT
429 } else {
430 RX_LOOP_SLOW_MAINTENANCE_IDLE_TIMEOUT
431 };
432
433 if tokio::time::timeout(slow_timeout, self.run_rx_loop_slow_maintenance_tick())
434 .await
435 .is_err()
436 {
437 self.mark_rx_loop_maintenance_timeout();
438 warn!(
439 timeout_ms = slow_timeout.as_millis() as u64,
440 data_pressure, "RX loop slow maintenance timed out; continuing packet processing"
441 );
442 return true;
443 }
444 false
445 }
446
447 async fn run_rx_loop_slow_maintenance_tick(&mut self) {
448 if let Some(delay) = rx_loop_slow_maintenance_fault_delay() {
449 tokio::time::sleep(delay).await;
450 }
451
452 self.poll_nostr_discovery().await;
457 self.poll_lan_discovery().await;
458 self.poll_local_instance_discovery().await;
459 self.check_tree_state().await;
460 self.check_bloom_state().await;
461 self.compute_mesh_size();
462 self.record_stats_history();
463 }
464
465 async fn process_decrypt_worker_event(&mut self, event: DecryptWorkerEvent) {
472 match event {
473 DecryptWorkerEvent::Plaintext(fallback) => {
474 self.process_decrypt_fallback(fallback).await;
475 }
476 DecryptWorkerEvent::DecryptFailure(report) => {
477 self.process_decrypt_failure_report(report).await;
478 }
479 }
480 }
481
482 async fn process_decrypt_fallback(&mut self, fallback: DecryptFallback) {
483 let ce_flag = fallback.fmp_flags & FLAG_CE != 0;
484 let sp_flag = fallback.fmp_flags & FLAG_SP != 0;
485 let plaintext = &fallback.packet_data[fallback.fmp_plaintext_offset
486 ..fallback.fmp_plaintext_offset + fallback.fmp_plaintext_len];
487 self.process_authentic_fmp_plaintext(
488 &fallback.source_node_addr,
489 fallback.transport_id,
490 &fallback.remote_addr,
491 fallback.timestamp_ms,
492 fallback.packet_len,
493 fallback.fmp_counter,
494 ce_flag,
495 sp_flag,
496 plaintext,
497 )
498 .await;
499 }
500
501 async fn process_decrypt_failure_report(&mut self, report: DecryptFailureReport) {
502 debug!(
503 peer = %self.peer_display_name(&report.source_node_addr),
504 counter = report.fmp_counter,
505 replay_highest = report.fmp_replay_highest,
506 "Worker FMP AEAD decryption failed"
507 );
508 self.handle_decrypt_failure_report(&report).await;
509 }
510
511 async fn drain_decrypt_fallback(
517 &mut self,
518 rx: &mut UnboundedReceiver<DecryptWorkerEvent>,
519 budget: usize,
520 ) -> usize {
521 let mut drained = 0;
522 while drained < budget {
523 match rx.try_recv() {
524 Ok(event) => {
525 self.process_decrypt_worker_event(event).await;
526 drained += 1;
527 }
528 Err(_) => break,
529 }
530 }
531 drained
532 }
533
534 async fn flush_pending_sends(&self) {
543 for transport in self.transports.values() {
544 if matches!(transport, TransportHandle::Udp(_)) {
545 transport.flush_pending_send().await;
546 }
547 }
548 }
549
550 pub(in crate::node) async fn process_packet(&mut self, packet: ReceivedPacket) {
554 let _t_total = crate::perf_profile::Timer::start(crate::perf_profile::Stage::ProcessPacket);
555 crate::perf_profile::record_since(
556 crate::perf_profile::Stage::TransportQueueWait,
557 packet.trace_enqueued_at,
558 );
559 if is_punch_packet(&packet.data) {
560 trace!(
561 transport_id = %packet.transport_id,
562 remote_addr = %packet.remote_addr,
563 bytes = packet.data.len(),
564 "Dropping stray punch probe/ack in FMP rx loop"
565 );
566 return;
567 }
568 if packet.data.len() < COMMON_PREFIX_SIZE {
569 return; }
571
572 let prefix = match CommonPrefix::parse(&packet.data) {
573 Some(p) => p,
574 None => return, };
576 if matches!(prefix.phase, PHASE_MSG1 | PHASE_MSG2) {
577 debug!(
578 transport_id = %packet.transport_id,
579 remote_addr = %packet.remote_addr,
580 bytes = packet.data.len(),
581 phase = prefix.phase,
582 version = prefix.version,
583 "FMP handshake packet dispatch"
584 );
585 } else {
586 trace!(
587 transport_id = %packet.transport_id,
588 remote_addr = %packet.remote_addr,
589 bytes = packet.data.len(),
590 phase = prefix.phase,
591 version = prefix.version,
592 "FMP packet dispatch"
593 );
594 }
595
596 if prefix.version != FMP_VERSION {
597 debug!(
598 version = prefix.version,
599 transport_id = %packet.transport_id,
600 "Unknown FMP version, dropping"
601 );
602
603 let looks_like_fmp_phase =
611 matches!(prefix.phase, PHASE_ESTABLISHED | PHASE_MSG1 | PHASE_MSG2);
612 if looks_like_fmp_phase
613 && self.bootstrap_transports.contains(&packet.transport_id)
614 && let Some(npub) = self
615 .bootstrap_transport_npubs
616 .get(&packet.transport_id)
617 .cloned()
618 && let Some(handle) = self.nostr_discovery_handle()
619 {
620 let now_ms = Self::now_ms();
621 let cooldown_secs = handle.protocol_mismatch_cooldown_secs();
622 if handle.record_protocol_mismatch(&npub, now_ms) {
623 warn!(
624 peer_npub = %npub,
625 transport_id = %packet.transport_id,
626 peer_version = prefix.version,
627 our_version = FMP_VERSION,
628 cooldown_secs,
629 "Nostr-discovered peer speaks a different FMP version; suppressing retraversal"
630 );
631 }
632 }
633 return;
634 }
635
636 match prefix.phase {
637 PHASE_ESTABLISHED => {
638 self.handle_encrypted_frame(packet).await;
639 }
640 PHASE_MSG1 => {
641 self.handle_msg1(packet).await;
642 }
643 PHASE_MSG2 => {
644 self.handle_msg2(packet).await;
645 }
646 _ => {
647 debug!(
648 phase = prefix.phase,
649 transport_id = %packet.transport_id,
650 "Unknown FMP phase, dropping"
651 );
652 }
653 }
654 }
655}