Skip to main content

fips_core/node/handlers/
rx_loop.rs

1//! RX event loop and packet dispatch.
2
3use crate::control::queries;
4use crate::control::{ControlSocket, commands};
5use crate::discovery::is_punch_packet;
6use crate::node::decrypt_worker::{DecryptFailureReport, DecryptFallback, DecryptWorkerEvent};
7use crate::node::wire::{
8    COMMON_PREFIX_SIZE, CommonPrefix, FLAG_CE, FLAG_SP, FMP_VERSION, PHASE_ESTABLISHED, PHASE_MSG1,
9    PHASE_MSG2,
10};
11use crate::node::{Node, NodeEndpointCommand, NodeError};
12use crate::transport::ReceivedPacket;
13use crate::transport::TransportHandle;
14use crate::upper::tun::TunOutboundRx;
15use std::time::{Duration, Instant};
16use tokio::sync::mpsc::{Receiver, UnboundedReceiver};
17use tracing::{debug, info, trace, warn};
18
/// Number of raw packets the packet drain processes between two
/// interleaved decrypt-fallback drains. Keeps TCP ACK / heartbeat /
/// handshake progress steady under sustained inbound bursts.
const FALLBACK_INTERLEAVE_EVERY: usize = 32;
/// Upper bound on how many fallback events one interleaved drain may
/// process, so a hot fallback spike cannot starve the outer raw-packet
/// drain in the opposite direction.
const FALLBACK_INTERLEAVE_BUDGET: usize = 64;
/// Maximum number of items a single drain pass takes from one queue
/// before control returns to the event loop's `select!`.
const PACKET_DRAIN_BUDGET: usize = 256;
/// Time allowed for the slow maintenance pass when the loop is not under
/// data pressure.
const RX_LOOP_SLOW_MAINTENANCE_IDLE_TIMEOUT: Duration = Duration::from_millis(100);
/// Tighter time allowance for the slow maintenance pass while the loop is
/// under data pressure.
const RX_LOOP_SLOW_MAINTENANCE_BUSY_TIMEOUT: Duration = Duration::from_millis(10);
/// How long after the most recent data activity the loop still counts
/// itself as being under data pressure.
const RX_LOOP_RECENT_DATA_ACTIVITY_WINDOW: Duration = Duration::from_secs(2);
/// Largest artificial delay, in milliseconds, that fault injection may add
/// to the slow maintenance pass.
const RX_LOOP_FAULT_MAX_DELAY_MS: u64 = 5_000;

/// Read the optional fault-injection delay for the slow maintenance pass.
///
/// The delay comes from the `FIPS_FAULT_INJECT_RX_LOOP_SLOW_MAINTENANCE_MS`
/// environment variable, interpreted as a whole number of milliseconds
/// (surrounding whitespace is ignored) and capped at
/// `RX_LOOP_FAULT_MAX_DELAY_MS`.
///
/// Returns `None` when the variable is unset, is not a valid `u64`, or
/// evaluates to zero. The variable is re-read on every call.
fn rx_loop_slow_maintenance_fault_delay() -> Option<Duration> {
    let requested_ms = std::env::var("FIPS_FAULT_INJECT_RX_LOOP_SLOW_MAINTENANCE_MS")
        .ok()
        .and_then(|raw| raw.trim().parse::<u64>().ok())?;

    match requested_ms.min(RX_LOOP_FAULT_MAX_DELAY_MS) {
        0 => None,
        capped_ms => Some(Duration::from_millis(capped_ms)),
    }
}
41
42impl Node {
43    /// Run the receive event loop.
44    ///
45    /// Processes packets from all transports, dispatching based on
46    /// the phase field in the 4-byte common prefix:
47    /// - Phase 0x0: Encrypted frame (session data)
48    /// - Phase 0x1: Handshake message 1 (initiator -> responder)
49    /// - Phase 0x2: Handshake message 2 (responder -> initiator)
50    ///
51    /// Also processes outbound IPv6 packets from the TUN reader for session
52    /// encapsulation and routing through the mesh.
53    ///
54    /// Also processes DNS-resolved identities for identity cache population.
55    ///
56    /// Also runs a periodic tick (1s) to clean up stale handshake connections
57    /// that never received a response. This prevents resource leaks when peers
58    /// are unreachable.
59    ///
60    /// This method takes ownership of the packet_rx channel and runs
61    /// until the channel is closed (typically when stop() is called).
62    pub async fn run_rx_loop(&mut self) -> Result<(), NodeError> {
63        let mut packet_rx = self.packet_rx.take().ok_or(NodeError::NotStarted)?;
64
65        // Take the TUN outbound receiver, or create a dummy channel that never
66        // produces messages (when TUN is disabled). Holding the sender prevents
67        // the channel from closing.
68        let (mut tun_outbound_rx, _tun_guard) = match self.tun_outbound_rx.take() {
69            Some(rx) => (rx, None),
70            None => {
71                let (tx, rx) = tokio::sync::mpsc::channel(1);
72                (rx, Some(tx))
73            }
74        };
75
76        // Take the DNS identity receiver, or create a dummy channel (when DNS
77        // is disabled). Same pattern as TUN outbound.
78        let (mut dns_identity_rx, _dns_guard) = match self.dns_identity_rx.take() {
79            Some(rx) => (rx, None),
80            None => {
81                let (tx, rx) = tokio::sync::mpsc::channel(1);
82                (rx, Some(tx))
83            }
84        };
85
86        // Take the endpoint-data command receiver, or create a dummy channel
87        // when the embedded endpoint API is not in use.
88        let (mut endpoint_command_rx, _endpoint_command_guard) =
89            match self.endpoint_command_rx.take() {
90                Some(rx) => (rx, None),
91                None => {
92                    let (tx, rx) = tokio::sync::mpsc::channel(1);
93                    (rx, Some(tx))
94                }
95            };
96
97        // Take the decrypt worker fallback receiver if a worker pool
98        // is in use. The worker pushes non-fast-path packets (anything
99        // that's not bulk EndpointData) here for the legacy dispatch.
100        let (mut decrypt_fallback_rx, _decrypt_fallback_guard) =
101            match self.decrypt_fallback_rx.take() {
102                Some(rx) => (rx, None),
103                None => {
104                    let (tx, rx) = tokio::sync::mpsc::unbounded_channel();
105                    (rx, Some(tx))
106                }
107            };
108
109        let mut tick =
110            tokio::time::interval(Duration::from_secs(self.config.node.tick_interval_secs));
111        tick.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
112        let mut last_data_activity = None::<Instant>;
113        let mut slow_maintenance_timed_out_under_data = false;
114
115        // Set up control socket channel
116        let (control_tx, mut control_rx) =
117            tokio::sync::mpsc::channel::<crate::control::ControlMessage>(32);
118
119        if self.config.node.control.enabled {
120            let config = self.config.node.control.clone();
121            let tx = control_tx.clone();
122            tokio::spawn(async move {
123                match ControlSocket::bind(&config) {
124                    Ok(socket) => {
125                        socket.accept_loop(tx).await;
126                    }
127                    Err(e) => {
128                        warn!(error = %e, "Failed to bind control socket");
129                    }
130                }
131            });
132        }
133        // Drop unused sender to avoid keeping channel open if control is disabled
134        drop(control_tx);
135
136        info!("RX event loop started");
137        // Optional perf profiler (FIPS_PERF=1). No-op otherwise.
138        crate::perf_profile::maybe_spawn_reporter();
139
140        loop {
141            tokio::select! {
142                biased;
143                // Decrypt-worker fallback drains FIRST. The previous
144                // ordering put `packet_rx` first, which under sustained
145                // inbound bursts let the raw-packet drain (up to 256
146                // packets + flush) starve fallback work for tens of
147                // milliseconds. UDP throughput tolerates that; TCP
148                // doesn't — late FMP plaintexts mean late ACKs,
149                // dup-ACK fast retransmits, and cwnd collapse.
150                // Reproduced on native macOS / Wi-Fi where TCP fell to
151                // ~10 Mb/s while UDP cleared ~100 Mb/s on the same
152                // tunnel. Promoting fallback gives the kernel-side
153                // TCP machinery a fair chance to see its ACKs and
154                // keep cwnd growing.
155                Some(event) = decrypt_fallback_rx.recv() => {
156                    self.process_decrypt_worker_event(event).await;
157                    self.drain_decrypt_fallback(&mut decrypt_fallback_rx, 255).await;
158                    last_data_activity = Some(Instant::now());
159                    self.flush_pending_sends().await;
160                }
161                packet = packet_rx.recv() => {
162                    match packet {
163                        Some(p) => {
164                            let drained = self.drain_packet_rx(
165                                &mut packet_rx,
166                                &mut decrypt_fallback_rx,
167                                Some(p),
168                                PACKET_DRAIN_BUDGET,
169                            ).await;
170                            if drained > 0 {
171                                last_data_activity = Some(Instant::now());
172                            }
173                        }
174                        None => break, // channel closed
175                    }
176                }
177                _ = tick.tick() => {
178                    let (drained_packets, drained_tun, drained_endpoint) = self.drain_rx_loop_data_queues(
179                        &mut packet_rx,
180                        &mut decrypt_fallback_rx,
181                        &mut tun_outbound_rx,
182                        &mut endpoint_command_rx,
183                        PACKET_DRAIN_BUDGET,
184                    ).await;
185                    let drained = drained_packets + drained_tun + drained_endpoint;
186                    if drained > 0 {
187                        last_data_activity = Some(Instant::now());
188                        debug!(
189                            drained,
190                            drained_packets,
191                            drained_tun,
192                            drained_endpoint,
193                            "Drained queued packets before rx-loop maintenance"
194                        );
195                    }
196                    let recent_data_activity = last_data_activity
197                        .is_some_and(|t| t.elapsed() <= RX_LOOP_RECENT_DATA_ACTIVITY_WINDOW);
198                    let data_pressure = drained > 0 || recent_data_activity;
199                    if !data_pressure {
200                        slow_maintenance_timed_out_under_data = false;
201                    }
202
203                    let slow_timed_out = self.run_rx_loop_maintenance_tick(
204                        data_pressure,
205                        data_pressure && slow_maintenance_timed_out_under_data,
206                    ).await;
207                    if slow_timed_out && data_pressure {
208                        slow_maintenance_timed_out_under_data = true;
209                    }
210
211                    let (post_drained_packets, post_drained_tun, post_drained_endpoint) = self.drain_rx_loop_data_queues(
212                        &mut packet_rx,
213                        &mut decrypt_fallback_rx,
214                        &mut tun_outbound_rx,
215                        &mut endpoint_command_rx,
216                        PACKET_DRAIN_BUDGET,
217                    ).await;
218                    let post_drained = post_drained_packets + post_drained_tun + post_drained_endpoint;
219                    if post_drained > 0 {
220                        last_data_activity = Some(Instant::now());
221                        debug!(
222                            drained = post_drained,
223                            drained_packets = post_drained_packets,
224                            drained_tun = post_drained_tun,
225                            drained_endpoint = post_drained_endpoint,
226                            "Drained queued packets after rx-loop maintenance"
227                        );
228                    }
229                }
230                Some(ipv6_packet) = tun_outbound_rx.recv() => {
231                    let drained = self.drain_tun_outbound(
232                        &mut tun_outbound_rx,
233                        Some(ipv6_packet),
234                        PACKET_DRAIN_BUDGET,
235                    ).await;
236                    if drained > 0 {
237                        last_data_activity = Some(Instant::now());
238                    }
239                }
240                Some(identity) = dns_identity_rx.recv() => {
241                    debug!(
242                        node_addr = %identity.node_addr,
243                        "Registering identity from DNS resolution"
244                    );
245                    self.register_identity(identity.node_addr, identity.pubkey);
246                }
247                Some(command) = endpoint_command_rx.recv() => {
248                    let drained = self.drain_endpoint_commands(
249                        &mut endpoint_command_rx,
250                        Some(command),
251                        PACKET_DRAIN_BUDGET,
252                    ).await;
253                    if drained > 0 {
254                        last_data_activity = Some(Instant::now());
255                    }
256                }
257                Some((request, response_tx)) = control_rx.recv() => {
258                    let response = if request.command.starts_with("show_") {
259                        queries::dispatch(self, &request.command, request.params.as_ref())
260                    } else {
261                        commands::dispatch(
262                            self,
263                            &request.command,
264                            request.params.as_ref(),
265                        ).await
266                    };
267                    let _ = response_tx.send(response);
268                }
269            }
270        }
271
272        info!("RX event loop stopped (channel closed)");
273        Ok(())
274    }
275
276    async fn drain_rx_loop_data_queues(
277        &mut self,
278        packet_rx: &mut UnboundedReceiver<ReceivedPacket>,
279        decrypt_fallback_rx: &mut UnboundedReceiver<DecryptWorkerEvent>,
280        tun_outbound_rx: &mut TunOutboundRx,
281        endpoint_command_rx: &mut Receiver<NodeEndpointCommand>,
282        budget: usize,
283    ) -> (usize, usize, usize) {
284        let drained_packets = self
285            .drain_packet_rx(packet_rx, decrypt_fallback_rx, None, budget)
286            .await;
287        let drained_tun = self.drain_tun_outbound(tun_outbound_rx, None, budget).await;
288        let drained_endpoint = self
289            .drain_endpoint_commands(endpoint_command_rx, None, budget)
290            .await;
291        (drained_packets, drained_tun, drained_endpoint)
292    }
293
294    async fn drain_packet_rx(
295        &mut self,
296        packet_rx: &mut UnboundedReceiver<ReceivedPacket>,
297        decrypt_fallback_rx: &mut UnboundedReceiver<DecryptWorkerEvent>,
298        first_packet: Option<ReceivedPacket>,
299        budget: usize,
300    ) -> usize {
301        let mut drained = 0usize;
302        if let Some(packet) = first_packet {
303            self.process_packet(packet).await;
304            drained = 1;
305        }
306
307        // Drain remaining ready inbound packets in a tight loop before
308        // yielding back to select! Every yield is a scheduler hop, and at
309        // line rate transports typically have several packets available per
310        // wake. Caps at a batch boundary so other branches eventually get a
311        // turn even under sustained load.
312        while drained < budget {
313            if drained > 0 && drained.is_multiple_of(FALLBACK_INTERLEAVE_EVERY) {
314                self.drain_decrypt_fallback(decrypt_fallback_rx, FALLBACK_INTERLEAVE_BUDGET)
315                    .await;
316            }
317            match packet_rx.try_recv() {
318                Ok(packet) => {
319                    self.process_packet(packet).await;
320                    drained += 1;
321                }
322                Err(_) => break,
323            }
324        }
325
326        if drained > 0 {
327            // One trailing fallback drain so the last bounced packets of the
328            // burst aren't held up by the post-burst send flush.
329            self.drain_decrypt_fallback(decrypt_fallback_rx, PACKET_DRAIN_BUDGET)
330                .await;
331            // Flush any batched sends triggered by inbound packets (e.g.
332            // forwarded SessionDatagrams, MMP reports, tree announces).
333            self.flush_pending_sends().await;
334        }
335        drained
336    }
337
338    async fn drain_tun_outbound(
339        &mut self,
340        tun_outbound_rx: &mut TunOutboundRx,
341        first_packet: Option<Vec<u8>>,
342        budget: usize,
343    ) -> usize {
344        let mut drained = 0usize;
345        if let Some(packet) = first_packet {
346            self.handle_tun_outbound(packet).await;
347            drained = 1;
348        }
349
350        while drained < budget {
351            match tun_outbound_rx.try_recv() {
352                Ok(packet) => {
353                    self.handle_tun_outbound(packet).await;
354                    drained += 1;
355                }
356                Err(_) => break,
357            }
358        }
359
360        if drained > 0 {
361            self.flush_pending_sends().await;
362        }
363        drained
364    }
365
366    async fn drain_endpoint_commands(
367        &mut self,
368        endpoint_command_rx: &mut Receiver<NodeEndpointCommand>,
369        first_command: Option<NodeEndpointCommand>,
370        budget: usize,
371    ) -> usize {
372        let mut drained = 0usize;
373        if let Some(command) = first_command {
374            self.handle_endpoint_data_command(command).await;
375            drained = 1;
376        }
377
378        while drained < budget {
379            match endpoint_command_rx.try_recv() {
380                Ok(command) => {
381                    self.handle_endpoint_data_command(command).await;
382                    drained += 1;
383                }
384                Err(_) => break,
385            }
386        }
387
388        if drained > 0 {
389            self.flush_pending_sends().await;
390        }
391        drained
392    }
393
394    async fn run_rx_loop_maintenance_tick(
395        &mut self,
396        data_pressure: bool,
397        skip_slow_maintenance: bool,
398    ) -> bool {
399        self.check_timeouts();
400        let now_ms = Self::now_ms();
401        // Link/session liveness must run before slower retry/discovery work:
402        // under bulk send pressure a late heartbeat or MMP report is
403        // indistinguishable from a dead direct path on the remote peer.
404        self.check_link_heartbeats().await;
405        self.reload_peer_acl();
406        self.resend_pending_handshakes(now_ms).await;
407        self.resend_pending_rekeys(now_ms).await;
408        self.resend_pending_session_handshakes(now_ms).await;
409        self.resend_pending_session_msg3(now_ms).await;
410        self.purge_idle_sessions(now_ms);
411        self.purge_learned_routes(now_ms);
412        self.check_mmp_reports().await;
413        self.check_session_mmp_reports().await;
414        self.check_rekey().await;
415        self.check_session_rekey().await;
416        self.check_pending_lookups(now_ms).await;
417        self.poll_pending_connects().await;
418        self.process_pending_retries(now_ms).await;
419        self.poll_transport_discovery().await;
420        self.activate_connected_udp_sessions().await;
421        self.sample_transport_congestion();
422
423        if skip_slow_maintenance {
424            return false;
425        }
426
427        let slow_timeout = if data_pressure {
428            RX_LOOP_SLOW_MAINTENANCE_BUSY_TIMEOUT
429        } else {
430            RX_LOOP_SLOW_MAINTENANCE_IDLE_TIMEOUT
431        };
432
433        if tokio::time::timeout(slow_timeout, self.run_rx_loop_slow_maintenance_tick())
434            .await
435            .is_err()
436        {
437            self.mark_rx_loop_maintenance_timeout();
438            warn!(
439                timeout_ms = slow_timeout.as_millis() as u64,
440                data_pressure, "RX loop slow maintenance timed out; continuing packet processing"
441            );
442            return true;
443        }
444        false
445    }
446
447    async fn run_rx_loop_slow_maintenance_tick(&mut self) {
448        if let Some(delay) = rx_loop_slow_maintenance_fault_delay() {
449            tokio::time::sleep(delay).await;
450        }
451
452        // Discovery and graph/stat maintenance can involve relay work or
453        // larger scans. Keep it bounded after direct-path liveness and session
454        // upkeep so a slow Nostr/LAN tick degrades discovery freshness, not
455        // packet flow.
456        self.poll_nostr_discovery().await;
457        self.poll_lan_discovery().await;
458        self.poll_local_instance_discovery().await;
459        self.check_tree_state().await;
460        self.check_bloom_state().await;
461        self.compute_mesh_size();
462        self.record_stats_history();
463    }
464
465    /// Hand a decrypt-worker fallback to the canonical post-FMP-decrypt
466    /// processor. Reconstructs `ce_flag` / `sp_flag` from the FMP header
467    /// flag byte the worker captured into `DecryptFallback::fmp_flags`
468    /// (without this both ECN CE propagation and spin-bit RTT
469    /// observation are dropped on the worker path) and slices the
470    /// plaintext out of the original wire buffer with zero allocation.
471    async fn process_decrypt_worker_event(&mut self, event: DecryptWorkerEvent) {
472        match event {
473            DecryptWorkerEvent::Plaintext(fallback) => {
474                self.process_decrypt_fallback(fallback).await;
475            }
476            DecryptWorkerEvent::DecryptFailure(report) => {
477                self.process_decrypt_failure_report(report).await;
478            }
479        }
480    }
481
482    async fn process_decrypt_fallback(&mut self, fallback: DecryptFallback) {
483        let ce_flag = fallback.fmp_flags & FLAG_CE != 0;
484        let sp_flag = fallback.fmp_flags & FLAG_SP != 0;
485        let plaintext = &fallback.packet_data[fallback.fmp_plaintext_offset
486            ..fallback.fmp_plaintext_offset + fallback.fmp_plaintext_len];
487        self.process_authentic_fmp_plaintext(
488            &fallback.source_node_addr,
489            fallback.transport_id,
490            &fallback.remote_addr,
491            fallback.timestamp_ms,
492            fallback.packet_len,
493            fallback.fmp_counter,
494            ce_flag,
495            sp_flag,
496            plaintext,
497        )
498        .await;
499    }
500
501    async fn process_decrypt_failure_report(&mut self, report: DecryptFailureReport) {
502        debug!(
503            peer = %self.peer_display_name(&report.source_node_addr),
504            counter = report.fmp_counter,
505            replay_highest = report.fmp_replay_highest,
506            "Worker FMP AEAD decryption failed"
507        );
508        self.handle_decrypt_failure_report(&report).await;
509    }
510
511    /// Drain up to `budget` queued fallbacks without yielding back to
512    /// `select!`. Returns the number processed. Called both from the
513    /// promoted-fallback select arm (after the head item) and
514    /// interleaved inside the packet_rx drain loop so bounced FMP
515    /// plaintexts can't accumulate behind a 256-packet inbound burst.
516    async fn drain_decrypt_fallback(
517        &mut self,
518        rx: &mut UnboundedReceiver<DecryptWorkerEvent>,
519        budget: usize,
520    ) -> usize {
521        let mut drained = 0;
522        while drained < budget {
523            match rx.try_recv() {
524                Ok(event) => {
525                    self.process_decrypt_worker_event(event).await;
526                    drained += 1;
527                }
528                Err(_) => break,
529            }
530        }
531        drained
532    }
533
534    /// Flush any pending batched sends across all transports. Today
535    /// every transport's `flush_pending_send` is a no-op — the UDP
536    /// transport's per-transport `pending_send` buffer was removed
537    /// when the bulk data path moved into `encrypt_worker` (which
538    /// does its own target-grouped `sendmmsg(2)` directly). The
539    /// call sites are retained so any future batched transport can
540    /// opt in by overriding `flush_pending_send` without touching
541    /// the rx_loop.
542    async fn flush_pending_sends(&self) {
543        for transport in self.transports.values() {
544            if matches!(transport, TransportHandle::Udp(_)) {
545                transport.flush_pending_send().await;
546            }
547        }
548    }
549
550    /// Process a single received packet.
551    ///
552    /// Dispatches based on the phase field in the 4-byte common prefix.
553    pub(in crate::node) async fn process_packet(&mut self, packet: ReceivedPacket) {
554        let _t_total = crate::perf_profile::Timer::start(crate::perf_profile::Stage::ProcessPacket);
555        crate::perf_profile::record_since(
556            crate::perf_profile::Stage::TransportQueueWait,
557            packet.trace_enqueued_at,
558        );
559        if is_punch_packet(&packet.data) {
560            trace!(
561                transport_id = %packet.transport_id,
562                remote_addr = %packet.remote_addr,
563                bytes = packet.data.len(),
564                "Dropping stray punch probe/ack in FMP rx loop"
565            );
566            return;
567        }
568        if packet.data.len() < COMMON_PREFIX_SIZE {
569            return; // Drop packets too short for common prefix
570        }
571
572        let prefix = match CommonPrefix::parse(&packet.data) {
573            Some(p) => p,
574            None => return, // Malformed prefix
575        };
576        if matches!(prefix.phase, PHASE_MSG1 | PHASE_MSG2) {
577            debug!(
578                transport_id = %packet.transport_id,
579                remote_addr = %packet.remote_addr,
580                bytes = packet.data.len(),
581                phase = prefix.phase,
582                version = prefix.version,
583                "FMP handshake packet dispatch"
584            );
585        } else {
586            trace!(
587                transport_id = %packet.transport_id,
588                remote_addr = %packet.remote_addr,
589                bytes = packet.data.len(),
590                phase = prefix.phase,
591                version = prefix.version,
592                "FMP packet dispatch"
593            );
594        }
595
596        if prefix.version != FMP_VERSION {
597            debug!(
598                version = prefix.version,
599                transport_id = %packet.transport_id,
600                "Unknown FMP version, dropping"
601            );
602
603            // If the packet arrived on an adopted Nostr-NAT bootstrap
604            // transport, the originating peer is necessarily on a
605            // different FMP-protocol version than us — the discovery
606            // sweep would otherwise re-traverse them every cycle even
607            // though no msg1/msg2 exchange can ever succeed. Bump the
608            // discovery-layer cooldown to the long protocol-mismatch
609            // window and emit a single WARN per fresh observation.
610            let looks_like_fmp_phase =
611                matches!(prefix.phase, PHASE_ESTABLISHED | PHASE_MSG1 | PHASE_MSG2);
612            if looks_like_fmp_phase
613                && self.bootstrap_transports.contains(&packet.transport_id)
614                && let Some(npub) = self
615                    .bootstrap_transport_npubs
616                    .get(&packet.transport_id)
617                    .cloned()
618                && let Some(handle) = self.nostr_discovery_handle()
619            {
620                let now_ms = Self::now_ms();
621                let cooldown_secs = handle.protocol_mismatch_cooldown_secs();
622                if handle.record_protocol_mismatch(&npub, now_ms) {
623                    warn!(
624                        peer_npub = %npub,
625                        transport_id = %packet.transport_id,
626                        peer_version = prefix.version,
627                        our_version = FMP_VERSION,
628                        cooldown_secs,
629                        "Nostr-discovered peer speaks a different FMP version; suppressing retraversal"
630                    );
631                }
632            }
633            return;
634        }
635
636        match prefix.phase {
637            PHASE_ESTABLISHED => {
638                self.handle_encrypted_frame(packet).await;
639            }
640            PHASE_MSG1 => {
641                self.handle_msg1(packet).await;
642            }
643            PHASE_MSG2 => {
644                self.handle_msg2(packet).await;
645            }
646            _ => {
647                debug!(
648                    phase = prefix.phase,
649                    transport_id = %packet.transport_id,
650                    "Unknown FMP phase, dropping"
651                );
652            }
653        }
654    }
655}