Skip to main content

fips_core/node/handlers/
rx_loop.rs

1//! RX event loop and packet dispatch.
2
3use crate::control::queries;
4use crate::control::{ControlSocket, commands};
5use crate::discovery::is_punch_packet;
6use crate::node::decrypt_worker::{DecryptFailureReport, DecryptFallback, DecryptWorkerEvent};
7use crate::node::wire::{
8    COMMON_PREFIX_SIZE, CommonPrefix, FLAG_CE, FLAG_SP, FMP_VERSION, PHASE_ESTABLISHED, PHASE_MSG1,
9    PHASE_MSG2,
10};
11use crate::node::{Node, NodeEndpointCommand, NodeError};
12use crate::transport::ReceivedPacket;
13use crate::transport::TransportHandle;
14use crate::upper::tun::TunOutboundRx;
15use std::time::{Duration, Instant};
16use tokio::sync::mpsc::{Receiver, UnboundedReceiver};
17use tracing::{debug, info, trace, warn};
18
/// How often the raw-packet drain loop yields a slice of work to the
/// decrypt-fallback drain. Keeps TCP ACK / heartbeat / handshake
/// progress steady under sustained inbound bursts.
const FALLBACK_INTERLEAVE_EVERY: usize = 32;
/// Cap on the per-interleave fallback drain so a hot inbound spike
/// can't starve the outer raw-packet drain in the opposite direction.
const FALLBACK_INTERLEAVE_BUDGET: usize = 64;
/// Maximum number of items a single drain pass (raw packets, TUN
/// outbound packets, or endpoint commands) handles before returning to
/// the caller. Also used as the budget of the trailing fallback drain
/// at the end of a raw-packet drain pass.
const PACKET_DRAIN_BUDGET: usize = 256;
/// Deadline applied to the slow maintenance pass when the loop is not
/// under data pressure.
const RX_LOOP_SLOW_MAINTENANCE_IDLE_TIMEOUT: Duration = Duration::from_millis(100);
/// Tighter deadline applied to the slow maintenance pass while the loop
/// is under data pressure.
const RX_LOOP_SLOW_MAINTENANCE_BUSY_TIMEOUT: Duration = Duration::from_millis(10);
/// How long after the last observed data activity a tick still counts
/// as being under data pressure.
const RX_LOOP_RECENT_DATA_ACTIVITY_WINDOW: Duration = Duration::from_secs(2);
/// Upper bound, in milliseconds, on the injected slow-maintenance delay.
const RX_LOOP_FAULT_MAX_DELAY_MS: u64 = 5_000;

/// Read the optional fault-injection delay for the slow maintenance pass.
///
/// The delay comes from the `FIPS_FAULT_INJECT_RX_LOOP_SLOW_MAINTENANCE_MS`
/// environment variable, interpreted as a whole number of milliseconds
/// (surrounding whitespace is ignored) and capped at
/// [`RX_LOOP_FAULT_MAX_DELAY_MS`].
///
/// Returns `None` when the variable is unset, is not valid Unicode, does
/// not parse as a `u64`, or parses to zero.
fn rx_loop_slow_maintenance_fault_delay() -> Option<Duration> {
    let configured = std::env::var("FIPS_FAULT_INJECT_RX_LOOP_SLOW_MAINTENANCE_MS").ok()?;
    match configured.trim().parse::<u64>() {
        // Zero means "no delay"; garbage means "not configured".
        Ok(0) | Err(_) => None,
        Ok(requested_ms) => Some(Duration::from_millis(
            requested_ms.min(RX_LOOP_FAULT_MAX_DELAY_MS),
        )),
    }
}
41
42impl Node {
43    /// Run the receive event loop.
44    ///
45    /// Processes packets from all transports, dispatching based on
46    /// the phase field in the 4-byte common prefix:
47    /// - Phase 0x0: Encrypted frame (session data)
48    /// - Phase 0x1: Handshake message 1 (initiator -> responder)
49    /// - Phase 0x2: Handshake message 2 (responder -> initiator)
50    ///
51    /// Also processes outbound IPv6 packets from the TUN reader for session
52    /// encapsulation and routing through the mesh.
53    ///
54    /// Also processes DNS-resolved identities for identity cache population.
55    ///
56    /// Also runs a periodic tick (1s) to clean up stale handshake connections
57    /// that never received a response. This prevents resource leaks when peers
58    /// are unreachable.
59    ///
60    /// This method takes ownership of the packet_rx channel and runs
61    /// until the channel is closed (typically when stop() is called).
62    pub async fn run_rx_loop(&mut self) -> Result<(), NodeError> {
63        let mut packet_rx = self.packet_rx.take().ok_or(NodeError::NotStarted)?;
64
65        // Take the TUN outbound receiver, or create a dummy channel that never
66        // produces messages (when TUN is disabled). Holding the sender prevents
67        // the channel from closing.
68        let (mut tun_outbound_rx, _tun_guard) = match self.tun_outbound_rx.take() {
69            Some(rx) => (rx, None),
70            None => {
71                let (tx, rx) = tokio::sync::mpsc::channel(1);
72                (rx, Some(tx))
73            }
74        };
75
76        // Take the DNS identity receiver, or create a dummy channel (when DNS
77        // is disabled). Same pattern as TUN outbound.
78        let (mut dns_identity_rx, _dns_guard) = match self.dns_identity_rx.take() {
79            Some(rx) => (rx, None),
80            None => {
81                let (tx, rx) = tokio::sync::mpsc::channel(1);
82                (rx, Some(tx))
83            }
84        };
85
86        // Take the endpoint-data command receiver, or create a dummy channel
87        // when the embedded endpoint API is not in use.
88        let (mut endpoint_priority_command_rx, _endpoint_priority_command_guard) =
89            match self.endpoint_priority_command_rx.take() {
90                Some(rx) => (rx, None),
91                None => {
92                    let (tx, rx) = tokio::sync::mpsc::channel(1);
93                    (rx, Some(tx))
94                }
95            };
96        let (mut endpoint_command_rx, _endpoint_command_guard) =
97            match self.endpoint_command_rx.take() {
98                Some(rx) => (rx, None),
99                None => {
100                    let (tx, rx) = tokio::sync::mpsc::channel(1);
101                    (rx, Some(tx))
102                }
103            };
104
105        // Take the decrypt worker fallback receiver if a worker pool
106        // is in use. The worker pushes non-fast-path packets (anything
107        // that's not bulk EndpointData) here for the legacy dispatch.
108        let (mut decrypt_fallback_rx, _decrypt_fallback_guard) =
109            match self.decrypt_fallback_rx.take() {
110                Some(rx) => (rx, None),
111                None => {
112                    let (tx, rx) = tokio::sync::mpsc::unbounded_channel();
113                    (rx, Some(tx))
114                }
115            };
116
117        let mut tick =
118            tokio::time::interval(Duration::from_secs(self.config.node.tick_interval_secs));
119        tick.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
120        let mut last_data_activity = None::<Instant>;
121        let mut slow_maintenance_timed_out_under_data = false;
122
123        // Set up control socket channel
124        let (control_tx, mut control_rx) =
125            tokio::sync::mpsc::channel::<crate::control::ControlMessage>(32);
126
127        if self.config.node.control.enabled {
128            let config = self.config.node.control.clone();
129            let tx = control_tx.clone();
130            tokio::spawn(async move {
131                match ControlSocket::bind(&config) {
132                    Ok(socket) => {
133                        socket.accept_loop(tx).await;
134                    }
135                    Err(e) => {
136                        warn!(error = %e, "Failed to bind control socket");
137                    }
138                }
139            });
140        }
141        // Drop unused sender to avoid keeping channel open if control is disabled
142        drop(control_tx);
143
144        info!("RX event loop started");
145        // Optional perf profiler (FIPS_PERF=1). No-op otherwise.
146        crate::perf_profile::maybe_spawn_reporter();
147
148        loop {
149            tokio::select! {
150                biased;
151                // Decrypt-worker fallback drains FIRST. The previous
152                // ordering put `packet_rx` first, which under sustained
153                // inbound bursts let the raw-packet drain (up to 256
154                // packets + flush) starve fallback work for tens of
155                // milliseconds. UDP throughput tolerates that; TCP
156                // doesn't — late FMP plaintexts mean late ACKs,
157                // dup-ACK fast retransmits, and cwnd collapse.
158                // Reproduced on native macOS / Wi-Fi where TCP fell to
159                // ~10 Mb/s while UDP cleared ~100 Mb/s on the same
160                // tunnel. Promoting fallback gives the kernel-side
161                // TCP machinery a fair chance to see its ACKs and
162                // keep cwnd growing.
163                Some(event) = decrypt_fallback_rx.recv() => {
164                    self.process_decrypt_worker_event(event).await;
165                    self.drain_decrypt_fallback(&mut decrypt_fallback_rx, 255).await;
166                    last_data_activity = Some(Instant::now());
167                    self.flush_pending_sends().await;
168                }
169                packet = packet_rx.recv() => {
170                    match packet {
171                        Some(p) => {
172                            let drained = self.drain_packet_rx(
173                                &mut packet_rx,
174                                &mut decrypt_fallback_rx,
175                                Some(p),
176                                PACKET_DRAIN_BUDGET,
177                            ).await;
178                            if drained > 0 {
179                                last_data_activity = Some(Instant::now());
180                            }
181                        }
182                        None => break, // channel closed
183                    }
184                }
185                _ = tick.tick() => {
186                    let (drained_packets, drained_tun, drained_endpoint) = self.drain_rx_loop_data_queues(
187                        &mut packet_rx,
188                        &mut decrypt_fallback_rx,
189                        &mut tun_outbound_rx,
190                        &mut endpoint_priority_command_rx,
191                        &mut endpoint_command_rx,
192                        PACKET_DRAIN_BUDGET,
193                    ).await;
194                    let drained = drained_packets + drained_tun + drained_endpoint;
195                    if drained > 0 {
196                        last_data_activity = Some(Instant::now());
197                        debug!(
198                            drained,
199                            drained_packets,
200                            drained_tun,
201                            drained_endpoint,
202                            "Drained queued packets before rx-loop maintenance"
203                        );
204                    }
205                    let recent_data_activity = last_data_activity
206                        .is_some_and(|t| t.elapsed() <= RX_LOOP_RECENT_DATA_ACTIVITY_WINDOW);
207                    let data_pressure = drained > 0 || recent_data_activity;
208                    if !data_pressure {
209                        slow_maintenance_timed_out_under_data = false;
210                    }
211
212                    let slow_timed_out = self.run_rx_loop_maintenance_tick(
213                        data_pressure,
214                        data_pressure && slow_maintenance_timed_out_under_data,
215                    ).await;
216                    if slow_timed_out && data_pressure {
217                        slow_maintenance_timed_out_under_data = true;
218                    }
219
220                    let (post_drained_packets, post_drained_tun, post_drained_endpoint) = self.drain_rx_loop_data_queues(
221                        &mut packet_rx,
222                        &mut decrypt_fallback_rx,
223                        &mut tun_outbound_rx,
224                        &mut endpoint_priority_command_rx,
225                        &mut endpoint_command_rx,
226                        PACKET_DRAIN_BUDGET,
227                    ).await;
228                    let post_drained = post_drained_packets + post_drained_tun + post_drained_endpoint;
229                    if post_drained > 0 {
230                        last_data_activity = Some(Instant::now());
231                        debug!(
232                            drained = post_drained,
233                            drained_packets = post_drained_packets,
234                            drained_tun = post_drained_tun,
235                            drained_endpoint = post_drained_endpoint,
236                            "Drained queued packets after rx-loop maintenance"
237                        );
238                    }
239                }
240                Some(ipv6_packet) = tun_outbound_rx.recv() => {
241                    let drained = self.drain_tun_outbound(
242                        &mut tun_outbound_rx,
243                        Some(ipv6_packet),
244                        PACKET_DRAIN_BUDGET,
245                    ).await;
246                    if drained > 0 {
247                        last_data_activity = Some(Instant::now());
248                    }
249                }
250                Some(identity) = dns_identity_rx.recv() => {
251                    debug!(
252                        node_addr = %identity.node_addr,
253                        "Registering identity from DNS resolution"
254                    );
255                    self.register_identity(identity.node_addr, identity.pubkey);
256                }
257                Some(command) = endpoint_priority_command_rx.recv() => {
258                    let drained = self.drain_endpoint_commands(
259                        &mut endpoint_priority_command_rx,
260                        &mut endpoint_command_rx,
261                        Some(command),
262                        None,
263                        PACKET_DRAIN_BUDGET,
264                    ).await;
265                    if drained > 0 {
266                        last_data_activity = Some(Instant::now());
267                    }
268                }
269                Some(command) = endpoint_command_rx.recv() => {
270                    let drained = self.drain_endpoint_commands(
271                        &mut endpoint_priority_command_rx,
272                        &mut endpoint_command_rx,
273                        None,
274                        Some(command),
275                        PACKET_DRAIN_BUDGET,
276                    ).await;
277                    if drained > 0 {
278                        last_data_activity = Some(Instant::now());
279                    }
280                }
281                Some((request, response_tx)) = control_rx.recv() => {
282                    let response = if request.command.starts_with("show_") {
283                        queries::dispatch(self, &request.command, request.params.as_ref())
284                    } else {
285                        commands::dispatch(
286                            self,
287                            &request.command,
288                            request.params.as_ref(),
289                        ).await
290                    };
291                    let _ = response_tx.send(response);
292                }
293            }
294        }
295
296        info!("RX event loop stopped (channel closed)");
297        Ok(())
298    }
299
300    async fn drain_rx_loop_data_queues(
301        &mut self,
302        packet_rx: &mut UnboundedReceiver<ReceivedPacket>,
303        decrypt_fallback_rx: &mut UnboundedReceiver<DecryptWorkerEvent>,
304        tun_outbound_rx: &mut TunOutboundRx,
305        endpoint_priority_command_rx: &mut Receiver<NodeEndpointCommand>,
306        endpoint_command_rx: &mut Receiver<NodeEndpointCommand>,
307        budget: usize,
308    ) -> (usize, usize, usize) {
309        let drained_packets = self
310            .drain_packet_rx(packet_rx, decrypt_fallback_rx, None, budget)
311            .await;
312        let drained_tun = self.drain_tun_outbound(tun_outbound_rx, None, budget).await;
313        let drained_endpoint = self
314            .drain_endpoint_commands(
315                endpoint_priority_command_rx,
316                endpoint_command_rx,
317                None,
318                None,
319                budget,
320            )
321            .await;
322        (drained_packets, drained_tun, drained_endpoint)
323    }
324
325    async fn drain_packet_rx(
326        &mut self,
327        packet_rx: &mut UnboundedReceiver<ReceivedPacket>,
328        decrypt_fallback_rx: &mut UnboundedReceiver<DecryptWorkerEvent>,
329        first_packet: Option<ReceivedPacket>,
330        budget: usize,
331    ) -> usize {
332        let mut drained = 0usize;
333        if let Some(packet) = first_packet {
334            self.process_packet(packet).await;
335            drained = 1;
336        }
337
338        // Drain remaining ready inbound packets in a tight loop before
339        // yielding back to select! Every yield is a scheduler hop, and at
340        // line rate transports typically have several packets available per
341        // wake. Caps at a batch boundary so other branches eventually get a
342        // turn even under sustained load.
343        while drained < budget {
344            if drained > 0 && drained.is_multiple_of(FALLBACK_INTERLEAVE_EVERY) {
345                self.drain_decrypt_fallback(decrypt_fallback_rx, FALLBACK_INTERLEAVE_BUDGET)
346                    .await;
347            }
348            match packet_rx.try_recv() {
349                Ok(packet) => {
350                    self.process_packet(packet).await;
351                    drained += 1;
352                }
353                Err(_) => break,
354            }
355        }
356
357        if drained > 0 {
358            // One trailing fallback drain so the last bounced packets of the
359            // burst aren't held up by the post-burst send flush.
360            self.drain_decrypt_fallback(decrypt_fallback_rx, PACKET_DRAIN_BUDGET)
361                .await;
362            // Flush any batched sends triggered by inbound packets (e.g.
363            // forwarded SessionDatagrams, MMP reports, tree announces).
364            self.flush_pending_sends().await;
365        }
366        drained
367    }
368
369    async fn drain_tun_outbound(
370        &mut self,
371        tun_outbound_rx: &mut TunOutboundRx,
372        first_packet: Option<Vec<u8>>,
373        budget: usize,
374    ) -> usize {
375        let mut drained = 0usize;
376        if let Some(packet) = first_packet {
377            self.handle_tun_outbound(packet).await;
378            drained = 1;
379        }
380
381        while drained < budget {
382            match tun_outbound_rx.try_recv() {
383                Ok(packet) => {
384                    self.handle_tun_outbound(packet).await;
385                    drained += 1;
386                }
387                Err(_) => break,
388            }
389        }
390
391        if drained > 0 {
392            self.flush_pending_sends().await;
393        }
394        drained
395    }
396
397    async fn drain_endpoint_commands(
398        &mut self,
399        endpoint_priority_command_rx: &mut Receiver<NodeEndpointCommand>,
400        endpoint_command_rx: &mut Receiver<NodeEndpointCommand>,
401        first_priority_command: Option<NodeEndpointCommand>,
402        first_bulk_command: Option<NodeEndpointCommand>,
403        budget: usize,
404    ) -> usize {
405        let mut first_bulk_command = first_bulk_command;
406        let mut drained = 0usize;
407        if let Some(command) = first_priority_command {
408            self.handle_endpoint_data_command(command).await;
409            drained = 1;
410        }
411
412        while drained < budget {
413            let Some(command) = try_recv_endpoint_command(
414                endpoint_priority_command_rx,
415                endpoint_command_rx,
416                &mut first_bulk_command,
417            ) else {
418                break;
419            };
420            self.handle_endpoint_data_command(command).await;
421            drained += 1;
422        }
423
424        if drained > 0 {
425            self.flush_pending_sends().await;
426        }
427        drained
428    }
429
430    async fn run_rx_loop_maintenance_tick(
431        &mut self,
432        data_pressure: bool,
433        skip_slow_maintenance: bool,
434    ) -> bool {
435        self.check_timeouts();
436        let now_ms = Self::now_ms();
437        // Link/session liveness must run before slower retry/discovery work:
438        // under bulk send pressure a late heartbeat or MMP report is
439        // indistinguishable from a dead direct path on the remote peer.
440        self.check_link_heartbeats().await;
441        self.reload_peer_acl();
442        self.resend_pending_handshakes(now_ms).await;
443        self.resend_pending_rekeys(now_ms).await;
444        self.resend_pending_session_handshakes(now_ms).await;
445        self.resend_pending_session_msg3(now_ms).await;
446        self.purge_idle_sessions(now_ms);
447        self.purge_learned_routes(now_ms);
448        self.check_mmp_reports().await;
449        self.check_session_mmp_reports().await;
450        self.check_rekey().await;
451        self.check_session_rekey().await;
452        self.check_pending_lookups(now_ms).await;
453        self.poll_pending_connects().await;
454        self.process_pending_retries(now_ms).await;
455        self.poll_transport_discovery().await;
456        self.activate_connected_udp_sessions().await;
457        self.sample_transport_congestion();
458
459        if skip_slow_maintenance {
460            return false;
461        }
462
463        let slow_timeout = if data_pressure {
464            RX_LOOP_SLOW_MAINTENANCE_BUSY_TIMEOUT
465        } else {
466            RX_LOOP_SLOW_MAINTENANCE_IDLE_TIMEOUT
467        };
468
469        if tokio::time::timeout(slow_timeout, self.run_rx_loop_slow_maintenance_tick())
470            .await
471            .is_err()
472        {
473            self.mark_rx_loop_maintenance_timeout();
474            warn!(
475                timeout_ms = slow_timeout.as_millis() as u64,
476                data_pressure, "RX loop slow maintenance timed out; continuing packet processing"
477            );
478            return true;
479        }
480        false
481    }
482
483    async fn run_rx_loop_slow_maintenance_tick(&mut self) {
484        if let Some(delay) = rx_loop_slow_maintenance_fault_delay() {
485            tokio::time::sleep(delay).await;
486        }
487
488        // Discovery and graph/stat maintenance can involve relay work or
489        // larger scans. Keep it bounded after direct-path liveness and session
490        // upkeep so a slow Nostr/LAN tick degrades discovery freshness, not
491        // packet flow.
492        self.poll_nostr_discovery().await;
493        self.poll_lan_discovery().await;
494        self.poll_local_instance_discovery().await;
495        self.check_tree_state().await;
496        self.check_bloom_state().await;
497        self.compute_mesh_size();
498        self.record_stats_history();
499    }
500
501    /// Hand a decrypt-worker fallback to the canonical post-FMP-decrypt
502    /// processor. Reconstructs `ce_flag` / `sp_flag` from the FMP header
503    /// flag byte the worker captured into `DecryptFallback::fmp_flags`
504    /// (without this both ECN CE propagation and spin-bit RTT
505    /// observation are dropped on the worker path) and slices the
506    /// plaintext out of the original wire buffer with zero allocation.
507    async fn process_decrypt_worker_event(&mut self, event: DecryptWorkerEvent) {
508        match event {
509            DecryptWorkerEvent::Plaintext(fallback) => {
510                self.process_decrypt_fallback(fallback).await;
511            }
512            DecryptWorkerEvent::DecryptFailure(report) => {
513                self.process_decrypt_failure_report(report).await;
514            }
515        }
516    }
517
518    async fn process_decrypt_fallback(&mut self, fallback: DecryptFallback) {
519        let ce_flag = fallback.fmp_flags & FLAG_CE != 0;
520        let sp_flag = fallback.fmp_flags & FLAG_SP != 0;
521        let plaintext = &fallback.packet_data[fallback.fmp_plaintext_offset
522            ..fallback.fmp_plaintext_offset + fallback.fmp_plaintext_len];
523        self.process_authentic_fmp_plaintext(
524            &fallback.source_node_addr,
525            fallback.transport_id,
526            &fallback.remote_addr,
527            fallback.timestamp_ms,
528            fallback.packet_len,
529            fallback.fmp_counter,
530            ce_flag,
531            sp_flag,
532            plaintext,
533        )
534        .await;
535    }
536
    /// Handle a decrypt failure reported by the decrypt worker.
    ///
    /// Logs the peer's display name, the FMP counter and the highest replay
    /// counter from the report at debug level, then forwards the report to
    /// `handle_decrypt_failure_report`.
    async fn process_decrypt_failure_report(&mut self, report: DecryptFailureReport) {
        debug!(
            peer = %self.peer_display_name(&report.source_node_addr),
            counter = report.fmp_counter,
            replay_highest = report.fmp_replay_highest,
            "Worker FMP AEAD decryption failed"
        );
        self.handle_decrypt_failure_report(&report).await;
    }
546
547    /// Drain up to `budget` queued fallbacks without yielding back to
548    /// `select!`. Returns the number processed. Called both from the
549    /// promoted-fallback select arm (after the head item) and
550    /// interleaved inside the packet_rx drain loop so bounced FMP
551    /// plaintexts can't accumulate behind a 256-packet inbound burst.
552    async fn drain_decrypt_fallback(
553        &mut self,
554        rx: &mut UnboundedReceiver<DecryptWorkerEvent>,
555        budget: usize,
556    ) -> usize {
557        let mut drained = 0;
558        while drained < budget {
559            match rx.try_recv() {
560                Ok(event) => {
561                    self.process_decrypt_worker_event(event).await;
562                    drained += 1;
563                }
564                Err(_) => break,
565            }
566        }
567        drained
568    }
569
570    /// Flush any pending batched sends across all transports. Today
571    /// every transport's `flush_pending_send` is a no-op — the UDP
572    /// transport's per-transport `pending_send` buffer was removed
573    /// when the bulk data path moved into `encrypt_worker` (which
574    /// does its own target-grouped `sendmmsg(2)` directly). The
575    /// call sites are retained so any future batched transport can
576    /// opt in by overriding `flush_pending_send` without touching
577    /// the rx_loop.
578    async fn flush_pending_sends(&self) {
579        for transport in self.transports.values() {
580            if matches!(transport, TransportHandle::Udp(_)) {
581                transport.flush_pending_send().await;
582            }
583        }
584    }
585
586    /// Process a single received packet.
587    ///
588    /// Dispatches based on the phase field in the 4-byte common prefix.
589    pub(in crate::node) async fn process_packet(&mut self, packet: ReceivedPacket) {
590        let _t_total = crate::perf_profile::Timer::start(crate::perf_profile::Stage::ProcessPacket);
591        crate::perf_profile::record_since(
592            crate::perf_profile::Stage::TransportQueueWait,
593            packet.trace_enqueued_at,
594        );
595        if is_punch_packet(&packet.data) {
596            trace!(
597                transport_id = %packet.transport_id,
598                remote_addr = %packet.remote_addr,
599                bytes = packet.data.len(),
600                "Dropping stray punch probe/ack in FMP rx loop"
601            );
602            return;
603        }
604        if packet.data.len() < COMMON_PREFIX_SIZE {
605            return; // Drop packets too short for common prefix
606        }
607
608        let prefix = match CommonPrefix::parse(&packet.data) {
609            Some(p) => p,
610            None => return, // Malformed prefix
611        };
612        if matches!(prefix.phase, PHASE_MSG1 | PHASE_MSG2) {
613            debug!(
614                transport_id = %packet.transport_id,
615                remote_addr = %packet.remote_addr,
616                bytes = packet.data.len(),
617                phase = prefix.phase,
618                version = prefix.version,
619                "FMP handshake packet dispatch"
620            );
621        } else {
622            trace!(
623                transport_id = %packet.transport_id,
624                remote_addr = %packet.remote_addr,
625                bytes = packet.data.len(),
626                phase = prefix.phase,
627                version = prefix.version,
628                "FMP packet dispatch"
629            );
630        }
631
632        if prefix.version != FMP_VERSION {
633            debug!(
634                version = prefix.version,
635                transport_id = %packet.transport_id,
636                "Unknown FMP version, dropping"
637            );
638
639            // If the packet arrived on an adopted Nostr-NAT bootstrap
640            // transport, the originating peer is necessarily on a
641            // different FMP-protocol version than us — the discovery
642            // sweep would otherwise re-traverse them every cycle even
643            // though no msg1/msg2 exchange can ever succeed. Bump the
644            // discovery-layer cooldown to the long protocol-mismatch
645            // window and emit a single WARN per fresh observation.
646            let looks_like_fmp_phase =
647                matches!(prefix.phase, PHASE_ESTABLISHED | PHASE_MSG1 | PHASE_MSG2);
648            if looks_like_fmp_phase
649                && self.bootstrap_transports.contains(&packet.transport_id)
650                && let Some(npub) = self
651                    .bootstrap_transport_npubs
652                    .get(&packet.transport_id)
653                    .cloned()
654                && let Some(handle) = self.nostr_discovery_handle()
655            {
656                let now_ms = Self::now_ms();
657                let cooldown_secs = handle.protocol_mismatch_cooldown_secs();
658                if handle.record_protocol_mismatch(&npub, now_ms) {
659                    warn!(
660                        peer_npub = %npub,
661                        transport_id = %packet.transport_id,
662                        peer_version = prefix.version,
663                        our_version = FMP_VERSION,
664                        cooldown_secs,
665                        "Nostr-discovered peer speaks a different FMP version; suppressing retraversal"
666                    );
667                }
668            }
669            return;
670        }
671
672        match prefix.phase {
673            PHASE_ESTABLISHED => {
674                self.handle_encrypted_frame(packet).await;
675            }
676            PHASE_MSG1 => {
677                self.handle_msg1(packet).await;
678            }
679            PHASE_MSG2 => {
680                self.handle_msg2(packet).await;
681            }
682            _ => {
683                debug!(
684                    phase = prefix.phase,
685                    transport_id = %packet.transport_id,
686                    "Unknown FMP phase, dropping"
687                );
688            }
689        }
690    }
691}
692
693fn try_recv_endpoint_command<T>(
694    priority_rx: &mut Receiver<T>,
695    bulk_rx: &mut Receiver<T>,
696    first_bulk: &mut Option<T>,
697) -> Option<T> {
698    priority_rx
699        .try_recv()
700        .ok()
701        .or_else(|| first_bulk.take())
702        .or_else(|| bulk_rx.try_recv().ok())
703}
704
#[cfg(test)]
mod tests {
    use super::try_recv_endpoint_command;

    /// A ready priority command must be returned before the bulk command
    /// that was already selected, which in turn comes before bulk commands
    /// still sitting in the queue.
    #[tokio::test]
    async fn endpoint_command_drain_prefers_ready_priority_over_selected_bulk() {
        let (priority_tx, mut priority_rx) = tokio::sync::mpsc::channel(4);
        let (bulk_tx, mut bulk_rx) = tokio::sync::mpsc::channel(4);

        priority_tx.send("priority").await.unwrap();
        bulk_tx.send("bulk-queued").await.unwrap();
        let mut selected_bulk = Some("bulk-selected");

        // The final `None` confirms that all three sources are exhausted.
        let expected_order = [
            Some("priority"),
            Some("bulk-selected"),
            Some("bulk-queued"),
            None,
        ];
        for expected in expected_order {
            assert_eq!(
                try_recv_endpoint_command(&mut priority_rx, &mut bulk_rx, &mut selected_bulk),
                expected
            );
        }
    }
}