1use crate::control::queries;
4use crate::control::{ControlSocket, commands};
5use crate::discovery::is_punch_packet;
6use crate::node::decrypt_worker::{DecryptFailureReport, DecryptFallback, DecryptWorkerEvent};
7use crate::node::wire::{
8 COMMON_PREFIX_SIZE, CommonPrefix, FLAG_CE, FLAG_SP, FMP_VERSION, PHASE_ESTABLISHED, PHASE_MSG1,
9 PHASE_MSG2,
10};
11use crate::node::{Node, NodeError};
12use crate::transport::ReceivedPacket;
13use crate::transport::TransportHandle;
14use std::time::Duration;
15use tokio::sync::mpsc::UnboundedReceiver;
16use tracing::{debug, info, trace, warn};
17
/// While draining transport packets, the decrypt-worker queue is serviced each
/// time the running packet count reaches a non-zero multiple of this value.
const FALLBACK_INTERLEAVE_EVERY: usize = 32;

/// Upper bound on decrypt-worker events handled during one interleaved drain.
const FALLBACK_INTERLEAVE_BUDGET: usize = 64;

/// Upper bound on transport packets handled per `drain_packet_rx` call; also
/// used as the bound for the decrypt-worker drain that follows such a batch.
const PACKET_DRAIN_BUDGET: usize = 256;

/// Maximum time one maintenance tick may run before the RX loop abandons it
/// and goes back to packet processing.
const RX_LOOP_MAINTENANCE_TIMEOUT: Duration = Duration::from_millis(500);
27
28impl Node {
29 pub async fn run_rx_loop(&mut self) -> Result<(), NodeError> {
49 let mut packet_rx = self.packet_rx.take().ok_or(NodeError::NotStarted)?;
50
51 let (mut tun_outbound_rx, _tun_guard) = match self.tun_outbound_rx.take() {
55 Some(rx) => (rx, None),
56 None => {
57 let (tx, rx) = tokio::sync::mpsc::channel(1);
58 (rx, Some(tx))
59 }
60 };
61
62 let (mut dns_identity_rx, _dns_guard) = match self.dns_identity_rx.take() {
65 Some(rx) => (rx, None),
66 None => {
67 let (tx, rx) = tokio::sync::mpsc::channel(1);
68 (rx, Some(tx))
69 }
70 };
71
72 let (mut endpoint_command_rx, _endpoint_command_guard) =
75 match self.endpoint_command_rx.take() {
76 Some(rx) => (rx, None),
77 None => {
78 let (tx, rx) = tokio::sync::mpsc::channel(1);
79 (rx, Some(tx))
80 }
81 };
82
83 let (mut decrypt_fallback_rx, _decrypt_fallback_guard) =
87 match self.decrypt_fallback_rx.take() {
88 Some(rx) => (rx, None),
89 None => {
90 let (tx, rx) = tokio::sync::mpsc::unbounded_channel();
91 (rx, Some(tx))
92 }
93 };
94
95 let mut tick =
96 tokio::time::interval(Duration::from_secs(self.config.node.tick_interval_secs));
97
98 let (control_tx, mut control_rx) =
100 tokio::sync::mpsc::channel::<crate::control::ControlMessage>(32);
101
102 if self.config.node.control.enabled {
103 let config = self.config.node.control.clone();
104 let tx = control_tx.clone();
105 tokio::spawn(async move {
106 match ControlSocket::bind(&config) {
107 Ok(socket) => {
108 socket.accept_loop(tx).await;
109 }
110 Err(e) => {
111 warn!(error = %e, "Failed to bind control socket");
112 }
113 }
114 });
115 }
116 drop(control_tx);
118
119 info!("RX event loop started");
120 crate::perf_profile::maybe_spawn_reporter();
122
123 loop {
124 tokio::select! {
125 biased;
126 Some(event) = decrypt_fallback_rx.recv() => {
139 self.process_decrypt_worker_event(event).await;
140 self.drain_decrypt_fallback(&mut decrypt_fallback_rx, 255).await;
141 self.flush_pending_sends().await;
142 }
143 packet = packet_rx.recv() => {
144 match packet {
145 Some(p) => {
146 self.drain_packet_rx(
147 &mut packet_rx,
148 &mut decrypt_fallback_rx,
149 Some(p),
150 PACKET_DRAIN_BUDGET,
151 ).await;
152 }
153 None => break, }
155 }
156 _ = tick.tick() => {
157 let drained = self.drain_packet_rx(
158 &mut packet_rx,
159 &mut decrypt_fallback_rx,
160 None,
161 PACKET_DRAIN_BUDGET,
162 ).await;
163 if drained > 0 {
164 debug!(
165 drained,
166 "Drained queued packets before rx-loop maintenance"
167 );
168 }
169 if tokio::time::timeout(
170 RX_LOOP_MAINTENANCE_TIMEOUT,
171 self.run_rx_loop_maintenance_tick(),
172 ).await.is_err() {
173 warn!(
174 timeout_ms = RX_LOOP_MAINTENANCE_TIMEOUT.as_millis() as u64,
175 "RX loop maintenance timed out; continuing packet processing"
176 );
177 }
178 }
179 Some(ipv6_packet) = tun_outbound_rx.recv() => {
180 self.handle_tun_outbound(ipv6_packet).await;
181 let mut drained = 0;
182 while drained < 256 {
183 match tun_outbound_rx.try_recv() {
184 Ok(p) => {
185 self.handle_tun_outbound(p).await;
186 drained += 1;
187 }
188 Err(_) => break,
189 }
190 }
191 self.flush_pending_sends().await;
195 }
196 Some(identity) = dns_identity_rx.recv() => {
197 debug!(
198 node_addr = %identity.node_addr,
199 "Registering identity from DNS resolution"
200 );
201 self.register_identity(identity.node_addr, identity.pubkey);
202 }
203 Some(command) = endpoint_command_rx.recv() => {
204 self.handle_endpoint_data_command(command).await;
205 let mut drained = 0;
209 while drained < 256 {
210 match endpoint_command_rx.try_recv() {
211 Ok(c) => {
212 self.handle_endpoint_data_command(c).await;
213 drained += 1;
214 }
215 Err(_) => break,
216 }
217 }
218 self.flush_pending_sends().await;
221 }
222 Some((request, response_tx)) = control_rx.recv() => {
223 let response = if request.command.starts_with("show_") {
224 queries::dispatch(self, &request.command, request.params.as_ref())
225 } else {
226 commands::dispatch(
227 self,
228 &request.command,
229 request.params.as_ref(),
230 ).await
231 };
232 let _ = response_tx.send(response);
233 }
234 }
235 }
236
237 info!("RX event loop stopped (channel closed)");
238 Ok(())
239 }
240
241 async fn drain_packet_rx(
242 &mut self,
243 packet_rx: &mut UnboundedReceiver<ReceivedPacket>,
244 decrypt_fallback_rx: &mut UnboundedReceiver<DecryptWorkerEvent>,
245 first_packet: Option<ReceivedPacket>,
246 budget: usize,
247 ) -> usize {
248 let mut drained = 0usize;
249 if let Some(packet) = first_packet {
250 self.process_packet(packet).await;
251 drained = 1;
252 }
253
254 while drained < budget {
260 if drained > 0 && drained.is_multiple_of(FALLBACK_INTERLEAVE_EVERY) {
261 self.drain_decrypt_fallback(decrypt_fallback_rx, FALLBACK_INTERLEAVE_BUDGET)
262 .await;
263 }
264 match packet_rx.try_recv() {
265 Ok(packet) => {
266 self.process_packet(packet).await;
267 drained += 1;
268 }
269 Err(_) => break,
270 }
271 }
272
273 if drained > 0 {
274 self.drain_decrypt_fallback(decrypt_fallback_rx, PACKET_DRAIN_BUDGET)
277 .await;
278 self.flush_pending_sends().await;
281 }
282 drained
283 }
284
285 async fn run_rx_loop_maintenance_tick(&mut self) {
286 self.check_timeouts();
287 let now_ms = Self::now_ms();
288 self.reload_peer_acl();
289 self.poll_pending_connects().await;
290 self.poll_nostr_discovery().await;
291 self.poll_lan_discovery().await;
292 self.poll_local_instance_discovery().await;
293 self.resend_pending_handshakes(now_ms).await;
294 self.resend_pending_rekeys(now_ms).await;
295 self.resend_pending_session_handshakes(now_ms).await;
296 self.resend_pending_session_msg3(now_ms).await;
297 self.purge_idle_sessions(now_ms);
298 self.purge_learned_routes(now_ms);
299 self.process_pending_retries(now_ms).await;
300 self.check_tree_state().await;
301 self.check_bloom_state().await;
302 self.compute_mesh_size();
303 self.record_stats_history();
304 self.check_mmp_reports().await;
305 self.check_session_mmp_reports().await;
306 self.check_link_heartbeats().await;
307 self.check_rekey().await;
308 self.check_session_rekey().await;
309 self.check_pending_lookups(now_ms).await;
310 self.poll_transport_discovery().await;
311 self.sample_transport_congestion();
312 self.activate_connected_udp_sessions().await;
313 }
314
315 async fn process_decrypt_worker_event(&mut self, event: DecryptWorkerEvent) {
322 match event {
323 DecryptWorkerEvent::Plaintext(fallback) => {
324 self.process_decrypt_fallback(fallback).await;
325 }
326 DecryptWorkerEvent::DecryptFailure(report) => {
327 self.process_decrypt_failure_report(report).await;
328 }
329 }
330 }
331
332 async fn process_decrypt_fallback(&mut self, fallback: DecryptFallback) {
333 let ce_flag = fallback.fmp_flags & FLAG_CE != 0;
334 let sp_flag = fallback.fmp_flags & FLAG_SP != 0;
335 let plaintext = &fallback.packet_data[fallback.fmp_plaintext_offset
336 ..fallback.fmp_plaintext_offset + fallback.fmp_plaintext_len];
337 self.process_authentic_fmp_plaintext(
338 &fallback.source_node_addr,
339 fallback.transport_id,
340 &fallback.remote_addr,
341 fallback.timestamp_ms,
342 fallback.packet_len,
343 fallback.fmp_counter,
344 ce_flag,
345 sp_flag,
346 plaintext,
347 )
348 .await;
349 }
350
351 async fn process_decrypt_failure_report(&mut self, report: DecryptFailureReport) {
352 debug!(
353 peer = %self.peer_display_name(&report.source_node_addr),
354 counter = report.fmp_counter,
355 replay_highest = report.fmp_replay_highest,
356 "Worker FMP AEAD decryption failed"
357 );
358 self.handle_decrypt_failure_report(&report).await;
359 }
360
361 async fn drain_decrypt_fallback(
367 &mut self,
368 rx: &mut UnboundedReceiver<DecryptWorkerEvent>,
369 budget: usize,
370 ) -> usize {
371 let mut drained = 0;
372 while drained < budget {
373 match rx.try_recv() {
374 Ok(event) => {
375 self.process_decrypt_worker_event(event).await;
376 drained += 1;
377 }
378 Err(_) => break,
379 }
380 }
381 drained
382 }
383
384 async fn flush_pending_sends(&self) {
393 for transport in self.transports.values() {
394 if matches!(transport, TransportHandle::Udp(_)) {
395 transport.flush_pending_send().await;
396 }
397 }
398 }
399
400 pub(in crate::node) async fn process_packet(&mut self, packet: ReceivedPacket) {
404 let _t_total = crate::perf_profile::Timer::start(crate::perf_profile::Stage::ProcessPacket);
405 crate::perf_profile::record_since(
406 crate::perf_profile::Stage::TransportQueueWait,
407 packet.trace_enqueued_at,
408 );
409 if is_punch_packet(&packet.data) {
410 trace!(
411 transport_id = %packet.transport_id,
412 remote_addr = %packet.remote_addr,
413 bytes = packet.data.len(),
414 "Dropping stray punch probe/ack in FMP rx loop"
415 );
416 return;
417 }
418 if packet.data.len() < COMMON_PREFIX_SIZE {
419 return; }
421
422 let prefix = match CommonPrefix::parse(&packet.data) {
423 Some(p) => p,
424 None => return, };
426 if matches!(prefix.phase, PHASE_MSG1 | PHASE_MSG2) {
427 debug!(
428 transport_id = %packet.transport_id,
429 remote_addr = %packet.remote_addr,
430 bytes = packet.data.len(),
431 phase = prefix.phase,
432 version = prefix.version,
433 "FMP handshake packet dispatch"
434 );
435 } else {
436 trace!(
437 transport_id = %packet.transport_id,
438 remote_addr = %packet.remote_addr,
439 bytes = packet.data.len(),
440 phase = prefix.phase,
441 version = prefix.version,
442 "FMP packet dispatch"
443 );
444 }
445
446 if prefix.version != FMP_VERSION {
447 debug!(
448 version = prefix.version,
449 transport_id = %packet.transport_id,
450 "Unknown FMP version, dropping"
451 );
452
453 let looks_like_fmp_phase =
461 matches!(prefix.phase, PHASE_ESTABLISHED | PHASE_MSG1 | PHASE_MSG2);
462 if looks_like_fmp_phase
463 && self.bootstrap_transports.contains(&packet.transport_id)
464 && let Some(npub) = self
465 .bootstrap_transport_npubs
466 .get(&packet.transport_id)
467 .cloned()
468 && let Some(handle) = self.nostr_discovery_handle()
469 {
470 let now_ms = Self::now_ms();
471 let cooldown_secs = handle.protocol_mismatch_cooldown_secs();
472 if handle.record_protocol_mismatch(&npub, now_ms) {
473 warn!(
474 peer_npub = %npub,
475 transport_id = %packet.transport_id,
476 peer_version = prefix.version,
477 our_version = FMP_VERSION,
478 cooldown_secs,
479 "Nostr-discovered peer speaks a different FMP version; suppressing retraversal"
480 );
481 }
482 }
483 return;
484 }
485
486 match prefix.phase {
487 PHASE_ESTABLISHED => {
488 self.handle_encrypted_frame(packet).await;
489 }
490 PHASE_MSG1 => {
491 self.handle_msg1(packet).await;
492 }
493 PHASE_MSG2 => {
494 self.handle_msg2(packet).await;
495 }
496 _ => {
497 debug!(
498 phase = prefix.phase,
499 transport_id = %packet.transport_id,
500 "Unknown FMP phase, dropping"
501 );
502 }
503 }
504 }
505}