Skip to main content

net/adapter/net/
mod.rs

1//! Net L0 Transport Protocol (Net) adapter.
2//!
3//! Net is a high-performance UDP-based transport protocol designed for
4//! GPU-to-GPU encrypted streaming over UDP. It provides:
5//!
6//! - Zero-copy, zero-allocation hot path
7//! - XChaCha20-Poly1305 encryption per packet
8//! - Noise protocol handshake (NKpsk0)
9//! - Optional per-stream reliability with selective NACKs
10//! - 40-60M events/sec target throughput
11//!
12//! # Usage
13//!
14//! ```rust,ignore
15//! use net::adapter::net::{NetAdapter, NetAdapterConfig, StaticKeypair};
16//!
17//! // Generate keypair for responder
18//! let keypair = StaticKeypair::generate();
19//!
20//! // Create initiator config
21//! let config = NetAdapterConfig::initiator(
22//!     "127.0.0.1:9000".parse()?,
23//!     "127.0.0.1:9001".parse()?,
24//!     psk,
25//!     keypair.public,
26//! );
27//!
28//! // Create adapter
29//! let mut adapter = NetAdapter::new(config)?;
30//! adapter.init().await?;
31//! ```
32
33mod batch;
34pub mod behavior;
35// SDK-level cancel-token registry consumed by the cortex `mesh_rpc`
36// call shapes. Always-built (no cortex feature gate) — the registry
37// is type-pure and small; gating it would mean two parallel
38// definitions, and the `cortex` build is the only consumer today
39// regardless. The `#[allow(dead_code)]` on cortex-off builds keeps
40// the dead-code lint silent without a per-item annotation fan-out.
41#[cfg_attr(not(feature = "cortex"), allow(dead_code))]
42mod cancel_registry;
43pub mod channel;
44pub mod compute;
45mod config;
46pub mod contested;
47pub mod continuity;
48#[cfg(feature = "cortex")]
49pub mod cortex;
50mod crypto;
51mod failure;
52pub mod identity;
53mod mesh;
54// nRPC glue + metrics depend on the cortex fold types (RpcServerFold,
55// RpcClientPending, etc.) and the per-channel-hash inbound dispatcher
56// hook the cortex layer wires up. They're meaningless without
57// `cortex` enabled, and unconditionally exposing them broke `--features
58// net` builds (mesh_rpc.rs references `super::cortex::*`). Gating both
59// keeps the bare-net build clean.
60#[cfg(feature = "dataforts")]
61pub mod dataforts;
62#[cfg(feature = "cortex")]
63pub mod mesh_rpc;
64#[cfg(feature = "cortex")]
65pub mod mesh_rpc_metrics;
66#[cfg(feature = "netdb")]
67pub mod netdb;
68mod pool;
69mod protocol;
70mod proxy;
71#[cfg(feature = "redex")]
72pub mod redex;
73mod reliability;
74mod reroute;
75mod route;
76mod router;
77mod session;
78pub mod state;
79mod stream;
80pub mod subnet;
81pub mod subprotocol;
82mod swarm;
83mod transport;
84#[cfg(feature = "nat-traversal")]
85pub mod traversal;
86
87#[cfg(target_os = "linux")]
88mod linux;
89
90pub use batch::AdaptiveBatcher;
91pub use channel::{
92    AckReason, AuthGuard, AuthVerdict, ChannelConfig, ChannelConfigRegistry, ChannelError,
93    ChannelHash, ChannelId, ChannelName, ChannelPublisher, ChannelRegistry, MembershipMsg,
94    OnFailure, PublishConfig, PublishReport, SubscriberRoster, Visibility,
95    SUBPROTOCOL_CHANNEL_MEMBERSHIP,
96};
97pub use compute::{
98    DaemonError, DaemonFactoryRegistry, DaemonHost, DaemonHostConfig, DaemonRegistry, DaemonStats,
99    FactoryEntry, MeshDaemon, MigrationError, MigrationMessage, MigrationOrchestrator,
100    MigrationPhase, MigrationSourceHandler, MigrationState, MigrationTargetHandler,
101    PlacementDecision, Scheduler, SchedulerError, SUBPROTOCOL_MIGRATION,
102};
103pub use config::{ConnectionRole, NetAdapterConfig, ReliabilityConfig};
104pub use contested::{
105    CorrelatedFailureConfig, CorrelatedFailureDetector, CorrelationVerdict, FailureCause,
106    PartitionDetector, PartitionPhase, PartitionRecord, ReconcileOutcome, Side,
107    SUBPROTOCOL_PARTITION,
108};
109pub use continuity::{
110    assess_continuity, CausalCone, Causality, ContinuityProof, ContinuityStatus, Discontinuity,
111    DiscontinuityReason, ForkRecord, HorizonDivergence, ObservationWindow, ProofError,
112    PropagationModel, SuperpositionPhase, SuperpositionState, SUBPROTOCOL_CONTINUITY,
113};
114#[cfg(feature = "cortex")]
115pub use cortex::{
116    CortexAdapter, CortexAdapterConfig, CortexAdapterError, EventEnvelope, EventMeta,
117    FoldErrorPolicy, IntoRedexPayload, StartPosition, EVENT_META_SIZE,
118};
119pub use crypto::{CryptoError, SessionKeys, StaticKeypair};
120pub use failure::{
121    CircuitBreaker, CircuitState, FailureDetector, FailureDetectorConfig, FailureStats,
122    LossSimulator, NodeStatus, RecoveryAction, RecoveryManager, RecoveryStats,
123};
124pub use identity::{
125    EntityError, EntityId, EntityKeypair, OriginStamp, PermissionToken, TokenCache, TokenError,
126    TokenScope,
127};
128pub use mesh::{MeshNode, MeshNodeConfig, PartitionFilter};
129#[cfg(feature = "netdb")]
130pub use netdb::{MemoriesFilter, NetDb, NetDbBuilder, NetDbError, NetDbSnapshot, TasksFilter};
131// `SharedPacketPool` is intentionally not re-exported — see
132// `pool.rs` for the cross-pool nonce-reuse rationale.
133// `PacketPool` itself stays exposed because tests reference it;
134// only the `Arc<PacketPool>` wrapper alias and its constructor
135// are absent.
136pub use pool::{PacketBuilder, PacketPool, SharedLocalPool, ThreadLocalPool};
137pub use protocol::{
138    EventFrame, NackPayload, NetHeader, PacketFlags, HEADER_SIZE, NONCE_SIZE, TAG_SIZE,
139};
140pub use proxy::{
141    ForwardResult, HopStats, MultiHopPacketBuilder, NetProxy, ProxyConfig, ProxyError, ProxyStats,
142};
143#[cfg(feature = "redex")]
144pub use redex::{
145    FsyncPolicy, IndexOp, IndexStart, OrderedAppender, Redex, RedexEntry, RedexError, RedexEvent,
146    RedexFile, RedexFileConfig, RedexFlags, RedexFold, RedexIndex, TypedRedexFile,
147};
148pub use reliability::{FireAndForget, ReliabilityMode, ReliableStream, RetransmitDescriptor};
149pub use reroute::ReroutePolicy;
150pub use route::{
151    AggregateStats, RouteEntry, RouteFlags, RoutingHeader, RoutingTable, SchedulerStreamStats,
152    ROUTING_HEADER_SIZE,
153};
154pub use router::{FairScheduler, NetRouter, RouteAction, RouterConfig, RouterError, RouterStats};
155// Send-loop drain instrument (NRPC_SEND_LOOP_BATCHING_PLAN). Re-exported only
156// so the in-repo integration tests (which compile as external crates) can
157// drive it; `#[doc(hidden)]` keeps it out of the crate's documented surface.
158#[doc(hidden)]
159pub use router::{
160    arm_send_drain_histo, send_batch_stats, send_drain_histo_snapshot, send_drain_max,
161};
162pub use session::{NetSession, SessionManager, StreamState, TxAdmit, TxSlotGuard};
163pub use state::{
164    CausalChainBuilder, CausalEvent, CausalLink, ChainError, EntityLog, HorizonEncoder, LogError,
165    LogIndex, ObservedHorizon, SnapshotStore, StateSnapshot, CAUSAL_LINK_SIZE, SUBPROTOCOL_CAUSAL,
166    SUBPROTOCOL_SNAPSHOT,
167};
168pub use stream::{
169    CloseBehavior, Reliability, Stream, StreamConfig, StreamError, StreamStats,
170    DEFAULT_STREAM_WINDOW_BYTES,
171};
172pub use subnet::{DropReason, ForwardDecision, SubnetGateway, SubnetId, SubnetPolicy, SubnetRule};
173pub use subprotocol::{
174    negotiate, MigrationSubprotocolHandler, NegotiatedSet, OutboundMigrationMessage,
175    SubprotocolDescriptor, SubprotocolManifest, SubprotocolRegistry, SubprotocolVersion,
176    SUBPROTOCOL_NEGOTIATION,
177};
178pub use swarm::{
179    Capabilities, CapabilityAd, EdgeInfo, GraphStats, LocalGraph, NodeInfo, Pingwave,
180    MAX_GRAPH_NODES, MAX_SEEN_PINGWAVES, PINGWAVE_SIZE,
181};
182pub use transport::{NetSocket, PacketReceiver, PacketSender, ParsedPacket, SocketBufferConfig};
183// Recv-loop batching instrument (NRPC_RECV_LOOP_BATCHING_PLAN), symmetric to
184// the send-side drain instrument. Compiled only under the `batched-ingress`
185// build feature (it measures that path). Re-exported only so the in-repo
186// integration tests can drive it; `#[doc(hidden)]` keeps it off the
187// documented surface.
188#[cfg(feature = "batched-ingress")]
189#[doc(hidden)]
190pub use transport::{
191    arm_recv_drain_histo, recv_batch_stats, recv_drain_histo_snapshot, recv_drain_max,
192    RECV_DRAIN_BUCKETS,
193};
194
195use async_trait::async_trait;
196use bytes::Bytes;
197use crossbeam_queue::SegQueue;
198use dashmap::DashMap;
199use std::sync::atomic::{AtomicBool, Ordering};
200use std::sync::Arc;
201use tokio::sync::Mutex as TokioMutex;
202use tokio::sync::Notify;
203use tokio::task::JoinHandle;
204
205use crate::adapter::{Adapter, ShardPollResult};
206use crate::error::AdapterError;
207use crate::event::{Batch, StoredEvent};
208
209use crypto::NoiseHandshake;
210use session::SessionManager as SessionMgr;
211use transport::NetSocket as Socket;
212
213// Re-export xxh3 utilities for stream routing
214pub use routing::{route_to_shard, stream_id_from_bytes, stream_id_from_key};
215
216/// Threshold below which the cached coarse-clock reading is reused
217/// instead of re-reading the OS wall clock. 1 ms is well below the
218/// session-timeout / heartbeat / NACK cadence the consumers care
219/// about (those tick on the seconds scale), and well above the
220/// `Instant::now` cost (~10 ns) we still pay per call to gate the
221/// cache. Per PERF_AUDIT §2.7.
222const COARSE_CLOCK_REFRESH_NS: u64 = 1_000_000; // 1 ms
223
224/// Current timestamp in nanoseconds since the Unix epoch.
225///
226/// Shared utility — avoids duplicating this across `causal.rs`, `snapshot.rs`,
227/// `observation.rs`, `migration.rs`, `session.rs`, and `token.rs`.
228///
229/// Saturates via `try_from` so future-dated clocks land at
230/// `u64::MAX` instead of wrapping near 0. A bare `as u64` would
231/// silently truncate the `u128` returned by
232/// `Duration::as_nanos()`. Practical wraparound from monotonic
233/// flow doesn't happen until ~year 2554, but a system whose clock
234/// was misconfigured to a far-future date would produce a tiny
235/// truncated timestamp — immediately tripping `is_timed_out`
236/// everywhere. `unwrap_or_default()` returning `Duration::ZERO`
237/// for a pre-epoch clock would also produce identical timestamps
238/// that break ordering.
239///
240/// Coarse-clock cached per thread at [`COARSE_CLOCK_REFRESH_NS`]
241/// granularity (PERF_AUDIT §2.7) — readings may be up to 1 ms
242/// stale, and two threads may disagree by up to that much.
243/// Consumers doing timeout arithmetic MUST use `saturating_sub`
244/// (they all do today) so a reader with a staler cache than the
245/// toucher can't wrap and false-expire.
246#[inline]
247pub(crate) fn current_timestamp() -> u64 {
248    // **PERF_AUDIT §2.7.** Per-packet RX/TX paths each call
249    // `current_timestamp()` twice (one stream `touch` + one session
250    // `touch`). On Windows `SystemTime::now()` is
251    // `GetSystemTimePreciseAsFileTime` (~600 ns); on Linux it's
252    // `clock_gettime(CLOCK_REALTIME)` (~120 ns). At sustained packet
253    // rates the four wall-clock reads per ping eat measurable CPU.
254    //
255    // Coarse-clock cache: a `thread_local!` Cell holds the last
256    // `(Instant, u64-ns)` pair. Each call asks `Instant::now()`
257    // (~10 ns — TSC-backed on both Linux and Windows) whether 1 ms
258    // has elapsed; if not, the cached `u64` is reused. Repeated
259    // calls within the same millisecond from the same thread pay
260    // one Instant comparison instead of one OS wall-clock syscall.
261    //
262    // Consumers (`session.is_timed_out` against multi-second
263    // timeouts, `last_activity_ns` for diagnostics) are insensitive
264    // to ≤ 1 ms drift; the wire envelopes that need absolute epoch
265    // ns (capability announcements, snapshots) call
266    // `current_timestamp_micros` or stamp `SystemTime::now()`
267    // directly — neither hits this path.
268    thread_local! {
269        static COARSE_CLOCK: std::cell::Cell<Option<(std::time::Instant, u64)>>
270            = const { std::cell::Cell::new(None) };
271    }
272    COARSE_CLOCK.with(|cell| {
273        let now_inst = std::time::Instant::now();
274        let (store, ns) = coarse_clock_advance(cell.get(), now_inst, || {
275            let elapsed = std::time::SystemTime::now()
276                .duration_since(std::time::UNIX_EPOCH)
277                .unwrap_or_default();
278            u64::try_from(elapsed.as_nanos()).unwrap_or(u64::MAX)
279        });
280        if let Some(pair) = store {
281            cell.set(Some(pair));
282        }
283        ns
284    })
285}
286
287/// Pure core of the §2.7 coarse clock: given the cached
288/// `(instant, ns)` pair and the current `Instant`, decide whether
289/// to reuse the cached reading (younger than
290/// [`COARSE_CLOCK_REFRESH_NS`]) or call `read_wall` for a fresh
291/// wall-clock value. Returns `(cache update, value)` — `None`
292/// means a cache hit (nothing to store, keeping the hit path
293/// store-free); `Some(pair)` rebases the refresh window on the
294/// read instant.
295///
296/// Extracted from [`current_timestamp`] so the reuse/refresh
297/// decision is testable with synthetic instants. The previous
298/// test drove the real thread-local with a 100-read burst and
299/// asserted every value matched — correct on an idle machine, but
300/// a > 1 ms OS preemption mid-burst legitimately rolls the window,
301/// so the assertion was probabilistic under CI load even with
302/// retries.
303#[inline]
304fn coarse_clock_advance(
305    cached: Option<(std::time::Instant, u64)>,
306    now_inst: std::time::Instant,
307    read_wall: impl FnOnce() -> u64,
308) -> (Option<(std::time::Instant, u64)>, u64) {
309    if let Some((last_inst, last_ns)) = cached {
310        if now_inst.duration_since(last_inst).as_nanos() < COARSE_CLOCK_REFRESH_NS as u128 {
311            return (None, last_ns);
312        }
313    }
314    let ns = read_wall();
315    (Some((now_inst, ns)), ns)
316}
317
318/// Current timestamp in microseconds since the Unix epoch.
319/// Saturates at `0` on pre-epoch clocks (the wire envelopes that
320/// consume this — fold announcements, snapshots — treat micros
321/// purely as diagnostics, never as ordering, so a saturated
322/// reading is benign).
323#[inline]
324pub(crate) fn current_timestamp_micros() -> u64 {
325    std::time::SystemTime::now()
326        .duration_since(std::time::UNIX_EPOCH)
327        .map(|d| d.as_micros() as u64)
328        .unwrap_or(0)
329}
330
331/// Fast xxh3-based routing utilities for Net streams.
332///
333/// Uses xxh3 (~50GB/s) for deterministic, high-performance stream routing.
334mod routing {
335    use xxhash_rust::xxh3::xxh3_64;
336
337    /// Generate a stream ID from arbitrary data.
338    ///
339    /// Uses xxh3 for fast, deterministic hashing (~50GB/s on modern CPUs).
340    #[inline]
341    pub fn stream_id_from_bytes(data: &[u8]) -> u64 {
342        xxh3_64(data)
343    }
344
345    /// Generate a stream ID from a string key.
346    ///
347    /// Convenience wrapper for `stream_id_from_bytes`.
348    #[inline]
349    pub fn stream_id_from_key(key: &str) -> u64 {
350        xxh3_64(key.as_bytes())
351    }
352
353    /// Route data to a shard based on its content hash.
354    ///
355    /// Returns a shard ID in the range `[0, num_shards)`.
356    ///
357    /// # Panics
358    ///
359    /// Panics if `num_shards` is 0.
360    #[inline]
361    pub fn route_to_shard(data: &[u8], num_shards: u16) -> u16 {
362        assert!(num_shards > 0, "num_shards must be > 0");
363        (xxh3_64(data) % num_shards as u64) as u16
364    }
365
366    #[cfg(test)]
367    mod tests {
368        use super::*;
369
370        #[test]
371        fn test_stream_id_deterministic() {
372            let data = b"test event data";
373            let id1 = stream_id_from_bytes(data);
374            let id2 = stream_id_from_bytes(data);
375            assert_eq!(id1, id2);
376        }
377
378        #[test]
379        fn test_stream_id_different_for_different_data() {
380            let id1 = stream_id_from_bytes(b"event1");
381            let id2 = stream_id_from_bytes(b"event2");
382            assert_ne!(id1, id2);
383        }
384
385        #[test]
386        fn test_stream_id_from_key() {
387            let id = stream_id_from_key("user:12345");
388            assert_ne!(id, 0);
389        }
390
391        #[test]
392        fn test_route_to_shard_range() {
393            let num_shards = 16u16;
394            for i in 0..1000 {
395                let data = format!("event_{}", i);
396                let shard = route_to_shard(data.as_bytes(), num_shards);
397                assert!(shard < num_shards);
398            }
399        }
400
401        #[test]
402        #[should_panic(expected = "num_shards must be > 0")]
403        fn test_route_to_shard_zero_shards_panics() {
404            // Regression: route_to_shard(_, 0) caused a divide-by-zero panic
405            // with no helpful message. Now it asserts with a clear message.
406            route_to_shard(b"test", 0);
407        }
408
409        #[test]
410        fn test_route_to_shard_distribution() {
411            let num_shards = 8u16;
412            let mut counts = [0u32; 8];
413
414            for i in 0..8000 {
415                let data = format!("event_{}", i);
416                let shard = route_to_shard(data.as_bytes(), num_shards);
417                counts[shard as usize] += 1;
418            }
419
420            // Check that distribution is reasonably uniform (within 50% of expected)
421            let expected = 1000;
422            for count in counts {
423                assert!(count > expected / 2, "shard count {} too low", count);
424                assert!(count < expected * 2, "shard count {} too high", count);
425            }
426        }
427    }
428}
429
430/// Shared inbound queue type
431type InboundQueues = Arc<DashMap<u16, SegQueue<StoredEvent>>>;
432
433/// Per-source rate limiter for the handshake responder loop.
434///
435/// The responder used to accept whichever source emitted msg1
436/// first, with no per-source pacing — an attacker who knows the PSK
437/// (PSKs are typically multi-tenant) could race the legitimate
438/// initiator's msg1; even without the PSK an attacker could flood
439/// handshake-flagged datagrams to monopolize the recv loop.
440///
441/// `HandshakePacer` keeps a rolling count of recent attempts per
442/// source and rejects sources that exceed the budget within the
443/// window. Expired entries are garbage-collected on a periodic
444/// schedule rather than on every check, so a sustained flood from
445/// many distinct sources doesn't pay an O(n) sweep per packet.
446pub(crate) struct HandshakePacer {
447    /// Per-source `(count_in_window, window_start)`.
448    entries: std::collections::HashMap<std::net::SocketAddr, (u32, std::time::Instant)>,
449    /// Maximum attempts per source within `window`.
450    max_per_window: u32,
451    /// Window length.
452    window: std::time::Duration,
453    /// Last time we ran the GC pass.
454    last_gc: std::time::Instant,
455    /// Soft cap on `entries` size before forcing a GC pass even
456    /// before the periodic deadline. Keeps memory bounded against
457    /// an attacker fanning across many spoofed source addresses.
458    gc_size_threshold: usize,
459}
460
461impl HandshakePacer {
462    pub(crate) fn new(max_per_window: u32, window: std::time::Duration) -> Self {
463        Self {
464            entries: std::collections::HashMap::new(),
465            max_per_window,
466            window,
467            last_gc: std::time::Instant::now(),
468            // 4096 entries × ~40 bytes each ≈ 160 KiB — comfortable
469            // ceiling that still triggers GC well before any
470            // realistic memory issue.
471            gc_size_threshold: 4096,
472        }
473    }
474
475    /// Record an attempt from `source`. Returns `true` if the source
476    /// is within budget (caller may proceed); `false` if it has
477    /// exceeded the rate limit (caller must drop the packet).
478    pub(crate) fn check_and_record(&mut self, source: std::net::SocketAddr) -> bool {
479        let now = std::time::Instant::now();
480        // Amortized GC: only run the O(n) `retain` sweep when one
481        // of two thresholds trips:
482        //   1. We haven't GC'd in `window` (entries are valid for
483        //      at most `2 * window` so a once-per-`window` cadence
484        //      is sufficient to keep the map proportional to the
485        //      active source set).
486        //   2. The map exceeds `gc_size_threshold`, indicating a
487        //      flood attempt across many spoofed sources.
488        if now.duration_since(self.last_gc) >= self.window
489            || self.entries.len() >= self.gc_size_threshold
490        {
491            let cutoff = self.window.saturating_mul(2);
492            self.entries
493                .retain(|_, (_, start)| now.duration_since(*start) < cutoff);
494            self.last_gc = now;
495        }
496
497        let entry = self.entries.entry(source).or_insert((0, now));
498        if now.duration_since(entry.1) > self.window {
499            // Window expired; reset the counter.
500            entry.0 = 0;
501            entry.1 = now;
502        }
503        entry.0 = entry.0.saturating_add(1);
504        entry.0 <= self.max_per_window
505    }
506}
507
508/// Net adapter for high-performance UDP transport.
509pub struct NetAdapter {
510    /// Configuration
511    config: NetAdapterConfig,
512    /// UDP socket
513    socket: Option<Arc<Socket>>,
514    /// Session (stored separately for init)
515    session: Option<Arc<NetSession>>,
516    /// Session manager
517    session_manager: SessionMgr,
518    /// Inbound events per shard (for poll_shard)
519    inbound: InboundQueues,
520    /// Background tasks
521    tasks: TokioMutex<Vec<JoinHandle<()>>>,
522    /// Shutdown signal (flag for polling, Notify for waking blocked tasks)
523    shutdown: Arc<AtomicBool>,
524    /// Notify to wake tasks blocked on I/O during shutdown
525    shutdown_notify: Arc<Notify>,
526    /// Initialization state
527    initialized: AtomicBool,
528    /// Per-source rate limiter for the handshake responder loop.
529    /// Without this, an attacker can flood handshake-flagged
530    /// datagrams to monopolize the recv path or race a legitimate
531    /// initiator's msg1.
532    handshake_pacer: parking_lot::Mutex<HandshakePacer>,
533}
534
535impl NetAdapter {
536    /// Create a new Net adapter.
537    pub fn new(config: NetAdapterConfig) -> Result<Self, AdapterError> {
538        config
539            .validate()
540            .map_err(|e| AdapterError::Fatal(format!("invalid config: {}", e)))?;
541
542        Ok(Self {
543            session_manager: SessionMgr::new(config.session_timeout),
544            config,
545            socket: None,
546            session: None,
547            inbound: Arc::new(DashMap::new()),
548            tasks: TokioMutex::new(Vec::new()),
549            shutdown: Arc::new(AtomicBool::new(false)),
550            shutdown_notify: Arc::new(Notify::new()),
551            initialized: AtomicBool::new(false),
552            // 5 attempts per second per source, plenty for any
553            // legitimate initiator (RTT-limited) and tight enough
554            // to throttle a flooder on consumer-grade hardware.
555            handshake_pacer: parking_lot::Mutex::new(HandshakePacer::new(
556                5,
557                std::time::Duration::from_secs(1),
558            )),
559        })
560    }
561
562    /// Perform Noise handshake with peer.
563    /// Returns session keys and the actual peer address (from the wire, not config).
564    async fn perform_handshake(
565        &self,
566        socket: &Socket,
567    ) -> Result<(SessionKeys, std::net::SocketAddr), AdapterError> {
568        let mut attempt = 0;
569        let max_attempts = self.config.handshake_retries;
570
571        // Cap per-attempt sleep so a misconfigured `handshake_retries`
572        // near `MAX_HANDSHAKE_RETRIES` (1024) cannot pin `init()` for
573        // hours. Pre-fix `100 * attempt` grew linearly and unbounded:
574        // attempt 1024 slept ~102s, with cumulative wait across all
575        // attempts approaching 14 hours. Capping at 5s gives bounded
576        // worst-case `max_attempts × 5s` (~85 minutes at the cap),
577        // which is still long but not unbounded.
578        const HANDSHAKE_RETRY_SLEEP_CAP_MS: u64 = 5_000;
579
580        loop {
581            attempt += 1;
582            match self.try_handshake(socket).await {
583                Ok(result) => return Ok(result),
584                Err(e) if attempt < max_attempts => {
585                    tracing::warn!(
586                        attempt = attempt,
587                        max = max_attempts,
588                        error = %e,
589                        "handshake failed, retrying"
590                    );
591                    let backoff_ms =
592                        (100u64.saturating_mul(attempt as u64)).min(HANDSHAKE_RETRY_SLEEP_CAP_MS);
593                    tokio::time::sleep(std::time::Duration::from_millis(backoff_ms)).await;
594                }
595                Err(e) => return Err(e),
596            }
597        }
598    }
599
600    /// Single handshake attempt.
601    /// Returns session keys and the actual peer address.
602    async fn try_handshake(
603        &self,
604        socket: &Socket,
605    ) -> Result<(SessionKeys, std::net::SocketAddr), AdapterError> {
606        let timeout = self.config.handshake_timeout;
607        let socket_arc = socket.socket_arc();
608
609        if self.config.is_initiator() {
610            // Initiator flow
611            let peer_pubkey = self
612                .config
613                .peer_static_pubkey
614                .as_ref()
615                .ok_or_else(|| AdapterError::Fatal("missing peer public key".into()))?;
616
617            let mut handshake = NoiseHandshake::initiator(&self.config.psk, peer_pubkey)
618                .map_err(|e| AdapterError::Fatal(format!("handshake init failed: {}", e)))?;
619
620            // Send first message
621            let msg1 = handshake
622                .write_message(&[])
623                .map_err(|e| AdapterError::Connection(format!("write_message failed: {}", e)))?;
624
625            let mut builder = PacketBuilder::new(&[0u8; 32], 0);
626            let packet = builder.build_handshake(&msg1);
627
628            socket
629                .send_to(&packet, self.config.peer_addr)
630                .await
631                .map_err(|e| AdapterError::Connection(format!("send failed: {}", e)))?;
632
633            // Receive response, discarding datagrams that are not handshake
634            // packets from the expected peer. This prevents stray traffic on
635            // the shared socket from consuming the handshake slot.
636            let (parsed, _source) = tokio::time::timeout(timeout, async {
637                // Stack buffer reused across loop iterations.
638                // `MAX_PACKET_SIZE` is 8192 bytes — small enough to
639                // live on the async stack without spilling, and the
640                // reuse drops the per-iteration `BytesMut::with_capacity`
641                // alloc on the stray-traffic path. Pre-fix every
642                // discarded datagram (an off-peer packet, an invalid
643                // handshake) allocated a fresh 8 KiB and freed it at
644                // loop end — under stray UDP traffic on the same
645                // bind port this churned the allocator. Only the
646                // success path now allocates (a `Bytes::copy_from_slice`
647                // sized to the actual payload, since `ParsedPacket`
648                // owns its `Bytes`).
649                let mut recv_buf = [0u8; protocol::MAX_PACKET_SIZE];
650                loop {
651                    let (n, source) = socket_arc
652                        .recv_from(&mut recv_buf)
653                        .await
654                        .map_err(|e| AdapterError::Connection(format!("recv failed: {}", e)))?;
655
656                    // Only accept packets from the peer we initiated with
657                    if source != self.config.peer_addr {
658                        continue;
659                    }
660
661                    let data = bytes::Bytes::copy_from_slice(&recv_buf[..n]);
662
663                    if let Some(p) = ParsedPacket::parse(data, source) {
664                        if p.header.flags.is_handshake() {
665                            return Ok::<_, AdapterError>((p, source));
666                        }
667                    }
668                    // Not a valid handshake packet from our peer — keep waiting
669                }
670            })
671            .await
672            .map_err(|_| AdapterError::Connection("handshake timeout".into()))??;
673
674            // Process response
675            handshake
676                .read_message(&parsed.payload)
677                .map_err(|e| AdapterError::Connection(format!("read_message failed: {}", e)))?;
678
679            // Extract session keys
680            let keys = handshake
681                .into_session_keys()
682                .map_err(|e| AdapterError::Fatal(format!("key extraction failed: {}", e)))?;
683            Ok((keys, self.config.peer_addr))
684        } else {
685            // Responder flow
686            let keypair = self
687                .config
688                .static_keypair
689                .as_ref()
690                .ok_or_else(|| AdapterError::Fatal("missing static keypair".into()))?;
691
692            // Wait for an initiator handshake message, discarding any
693            // non-handshake datagrams that arrive on the shared
694            // socket. Per-source pacing throttles flooders so the
695            // legitimate initiator's msg1 can land — without it,
696            // an attacker could blast handshake-flagged datagrams
697            // and monopolize this recv loop.
698            let (parsed, source) = tokio::time::timeout(timeout, async {
699                loop {
700                    let mut recv_buf = bytes::BytesMut::with_capacity(protocol::MAX_PACKET_SIZE);
701                    recv_buf.resize(protocol::MAX_PACKET_SIZE, 0);
702
703                    let (n, source) = socket_arc
704                        .recv_from(&mut recv_buf)
705                        .await
706                        .map_err(|e| AdapterError::Connection(format!("recv failed: {}", e)))?;
707
708                    recv_buf.truncate(n);
709                    let data = recv_buf.freeze();
710
711                    if let Some(p) = ParsedPacket::parse(data, source) {
712                        if p.header.flags.is_handshake() {
713                            // Per-source pacing: drop packets from
714                            // sources that exceed the budget.
715                            let allowed = self.handshake_pacer.lock().check_and_record(source);
716                            if !allowed {
717                                tracing::debug!(
718                                    %source,
719                                    "handshake responder: dropping packet from \
720                                     rate-limited source"
721                                );
722                                continue;
723                            }
724                            return Ok::<_, AdapterError>((p, source));
725                        }
726                    }
727                    // Not a valid handshake packet — keep waiting
728                }
729            })
730            .await
731            .map_err(|_| AdapterError::Connection("handshake timeout".into()))??;
732
733            let mut handshake = NoiseHandshake::responder(&self.config.psk, keypair)
734                .map_err(|e| AdapterError::Fatal(format!("handshake init failed: {}", e)))?;
735
736            // Process initiator message
737            handshake
738                .read_message(&parsed.payload)
739                .map_err(|e| AdapterError::Connection(format!("read_message failed: {}", e)))?;
740
741            // Send response
742            let msg2 = handshake
743                .write_message(&[])
744                .map_err(|e| AdapterError::Connection(format!("write_message failed: {}", e)))?;
745
746            let mut builder = PacketBuilder::new(&[0u8; 32], 0);
747            let packet = builder.build_handshake(&msg2);
748
749            // Reply to the actual source address (not the configured peer_addr),
750            // so the handshake completes even behind NAT or when the config is stale.
751            socket
752                .send_to(&packet, source)
753                .await
754                .map_err(|e| AdapterError::Connection(format!("send failed: {}", e)))?;
755
756            // Extract session keys and use the actual source address as peer
757            let keys = handshake
758                .into_session_keys()
759                .map_err(|e| AdapterError::Fatal(format!("key extraction failed: {}", e)))?;
760            Ok((keys, source))
761        }
762    }
763
764    /// Process a single received packet: parse, decrypt, and queue events.
765    fn process_packet(
766        data: Bytes,
767        source: std::net::SocketAddr,
768        session: &NetSession,
769        inbound: &InboundQueues,
770        num_shards: u16,
771    ) {
772        // Parse packet
773        let mut parsed = match ParsedPacket::parse(data, source) {
774            Some(p) => p,
775            None => return,
776        };
777
778        // Reject packets whose actual payload size doesn't match the declared
779        // length. This catches truncated or oversized packets before they
780        // reach the decrypt path.
781        if !parsed.header.flags.is_handshake()
782            && !parsed.header.flags.is_heartbeat()
783            && !parsed.is_valid_length()
784        {
785            return;
786        }
787
788        // Skip handshake packets in the data path (handled during init)
789        if parsed.header.flags.is_handshake() {
790            return;
791        }
792
793        // Validate session before any state mutation (including touch)
794        if parsed.header.session_id != session.session_id() {
795            return;
796        }
797
798        // Heartbeats are AEAD-tagged: the empty payload encrypts to
799        // a 16-byte Poly1305 tag, and the receiver verifies the
800        // tag here. Without this check, an off-path attacker who
801        // observed or guessed the session_id could spoof
802        // heartbeats and keep a session alive (the source-address
803        // check on UDP is itself spoofable, and session_id is in
804        // cleartext on every prior packet).
805        //
806        // We still require `source == peer_addr` as a cheap
807        // first-line filter so an unauthenticated flood doesn't
808        // get to do the AEAD verify.
809        //
810        // The verify+touch sequence lives inside
811        // `NetSession::verify_and_touch_heartbeat` so callers can't
812        // touch a session whose heartbeat failed verify, and can't
813        // forget to touch on success.
814        if parsed.header.flags.is_heartbeat() {
815            if source == session.peer_addr() {
816                session.verify_and_touch_heartbeat(&parsed);
817            }
818            return;
819        }
820
821        // Decrypt payload. Per crypto-session perf #128, route
822        // through `decrypt_to_bytes` so the common case
823        // (refcount-1 inbound buffer) decrypts in place instead
824        // of allocating a fresh `Vec<u8>` plaintext per packet.
825        let aad = parsed.header.aad();
826        let counter = u64::from_le_bytes(parsed.header.nonce[4..12].try_into().unwrap_or([0u8; 8]));
827        let rx_cipher = session.rx_cipher();
828        let payload = std::mem::take(&mut parsed.payload);
829        // Per crypto-session perf #132: collapsed the pre-decrypt
830        // `is_valid_rx_counter` + post-decrypt `update_rx_counter`
831        // two-step into one `try_admit_rx_counter` call. See
832        // `mesh.rs::process_local_packet` for the full rationale —
833        // the contract is identical (replays still rejected, TOCTOU
834        // still closed), and the redundant Mutex lock on every
835        // non-replay packet is gone.
836        let decrypted = match rx_cipher.decrypt_to_bytes(counter, &aad, payload) {
837            Ok(d) => {
838                if !rx_cipher.try_admit_rx_counter(counter) {
839                    return;
840                }
841                d
842            }
843            Err(_) => return,
844        };
845
846        // Parse events
847        let events = EventFrame::read_events(decrypted, parsed.header.event_count);
848
849        // Update stream state
850        let stream_id = parsed.header.stream_id;
851        let shard_id = if num_shards > 0 {
852            (stream_id % num_shards as u64) as u16
853        } else {
854            0
855        };
856
857        // Previously the boolean result of `r.on_receive(seq)` was
858        // discarded — a duplicate (NACK retransmit, rebroadcast,
859        // etc.) returned `false` but the events were still queued for
860        // poll_shard, breaking exactly-once delivery on reliable
861        // streams. The cipher's replay window doesn't catch this
862        // because each retransmit is re-encrypted with a fresh outer
863        // counter.
864        //
865        // Now: if `on_receive` reports a duplicate, we still call
866        // `session.touch()` (the peer is alive) but skip the queue
867        // step entirely — the original delivery already queued the
868        // events.
869        let is_fresh = {
870            let stream = session.get_or_create_stream(stream_id);
871            // `with_reliability` always invokes the closure (it
872            // locks an internal `Mutex<Box<dyn ReliabilityMode>>`).
873            // For streams without a meaningful reliability mode the
874            // implementation returns `true` for every `on_receive`,
875            // matching the historical "always queue" behavior.
876            let fresh = stream.with_reliability(|r| r.on_receive(parsed.header.sequence));
877            stream.update_rx_seq(parsed.header.sequence);
878            fresh
879        };
880
881        if is_fresh {
882            // Queue events for poll_shard
883            let queue = inbound.entry(shard_id).or_default();
884            let seq = parsed.header.sequence;
885            for (i, event_data) in events.into_iter().enumerate() {
886                use std::fmt::Write;
887                let mut event_id = String::with_capacity(24);
888                let _ = write!(event_id, "{}:{}", seq, i);
889                queue.push(StoredEvent::new(event_id, event_data, seq, shard_id));
890            }
891        } else {
892            tracing::debug!(
893                seq = parsed.header.sequence,
894                stream_id,
895                "Dropping duplicate packet"
896            );
897        }
898
899        session.touch();
900    }
901
902    /// Spawn receiver task.
903    ///
904    /// On Linux, uses a dedicated OS thread with batched recvmmsg for up to
905    /// 64 packets per syscall. On other platforms, uses standard async recv.
906    ///
907    /// Note: `BatchedPacketReceiver`'s channel carries a whole recvmmsg batch
908    /// per message (depth measured in batches, not packets). That is a shared
909    /// property of the type — see its docstring — and applies here regardless
910    /// of the `batched-ingress` feature, which only gates the mesh opt-in and
911    /// the measurement instrument, not this always-on adapter path.
912    #[cfg(target_os = "linux")]
913    fn spawn_receiver(
914        shutdown: Arc<AtomicBool>,
915        shutdown_notify: Arc<Notify>,
916        socket: Arc<Socket>,
917        session: Arc<NetSession>,
918        inbound: InboundQueues,
919        num_shards: u16,
920    ) -> JoinHandle<()> {
921        let mut receiver = transport::BatchedPacketReceiver::new(socket.socket_arc());
922
923        tokio::spawn(async move {
924            while !shutdown.load(Ordering::Acquire) {
925                tokio::select! {
926                    result = receiver.recv() => {
927                        match result {
928                            Ok((data, source)) => {
929                                Self::process_packet(data, source, &session, &inbound, num_shards);
930                            }
931                            Err(e) if e.kind() == std::io::ErrorKind::ConnectionReset => {
932                                tracing::warn!("batch receiver thread exited, stopping receiver");
933                                break;
934                            }
935                            Err(e) => {
936                                if !shutdown.load(Ordering::Acquire) {
937                                    tracing::warn!(error = %e, "receive error");
938                                }
939                            }
940                        }
941                    }
942                    _ = shutdown_notify.notified() => {
943                        break;
944                    }
945                }
946            }
947        })
948    }
949
950    /// Spawn receiver task (non-Linux fallback).
951    #[cfg(not(target_os = "linux"))]
952    fn spawn_receiver(
953        shutdown: Arc<AtomicBool>,
954        shutdown_notify: Arc<Notify>,
955        socket: Arc<Socket>,
956        session: Arc<NetSession>,
957        inbound: InboundQueues,
958        num_shards: u16,
959    ) -> JoinHandle<()> {
960        tokio::spawn(async move {
961            let mut receiver = PacketReceiver::new(socket.socket_arc());
962
963            while !shutdown.load(Ordering::Acquire) {
964                // Race recv against shutdown notification so the task
965                // can exit promptly instead of blocking on recv_from
966                // until a packet arrives.
967                tokio::select! {
968                    result = receiver.recv() => {
969                        match result {
970                            Ok((data, source)) => {
971                                Self::process_packet(data, source, &session, &inbound, num_shards);
972                            }
973                            Err(e) => {
974                                if !shutdown.load(Ordering::Acquire) {
975                                    tracing::warn!(error = %e, "receive error");
976                                }
977                            }
978                        }
979                    }
980                    _ = shutdown_notify.notified() => {
981                        break;
982                    }
983                }
984            }
985        })
986    }
987
988    /// Spawn heartbeat task.
989    fn spawn_heartbeat(
990        shutdown: Arc<AtomicBool>,
991        shutdown_notify: Arc<Notify>,
992        socket: Arc<Socket>,
993        session: Arc<NetSession>,
994        interval: std::time::Duration,
995        peer_addr: std::net::SocketAddr,
996    ) -> JoinHandle<()> {
997        tokio::spawn(async move {
998            let mut ticker = tokio::time::interval(interval);
999
1000            loop {
1001                tokio::select! {
1002                    _ = ticker.tick() => {
1003                        if shutdown.load(Ordering::Acquire) || !session.is_active() {
1004                            break;
1005                        }
1006
1007                        // `Session::build_heartbeat` routes through
1008                        // `thread_local_pool` (same pool the data
1009                        // path uses) so heartbeats share a single
1010                        // TX counter with data and interleave
1011                        // correctly on the wire. Constructing a
1012                        // bespoke `PacketBuilder::new(&[0u8; 32],
1013                        // session.session_id())` per tick would
1014                        // (a) use the wrong key so the receiver's
1015                        // AEAD verify would reject every heartbeat,
1016                        // and (b) reuse counter=0 across successive
1017                        // heartbeats so the receiver's replay
1018                        // window would reject every heartbeat
1019                        // after the first.
1020                        let packet = session.build_heartbeat();
1021
1022                        if let Err(e) = socket.send_to(&packet, peer_addr).await {
1023                            tracing::warn!(error = %e, "heartbeat send failed");
1024                        }
1025                    }
1026                    _ = shutdown_notify.notified() => {
1027                        break;
1028                    }
1029                }
1030            }
1031        })
1032    }
1033}
1034
1035#[async_trait]
1036impl Adapter for NetAdapter {
1037    async fn init(&mut self) -> Result<(), AdapterError> {
1038        if self.initialized.load(Ordering::Acquire) {
1039            return Ok(());
1040        }
1041
1042        // Create socket with configured buffer sizes
1043        let socket_config = match (
1044            self.config.socket_recv_buffer,
1045            self.config.socket_send_buffer,
1046        ) {
1047            (Some(recv), Some(send)) => transport::SocketBufferConfig {
1048                recv_buffer_size: recv,
1049                send_buffer_size: send,
1050            },
1051            _ => transport::SocketBufferConfig::default(),
1052        };
1053        let socket = Socket::with_config(self.config.bind_addr, socket_config)
1054            .await
1055            .map_err(|e| AdapterError::Connection(format!("socket creation failed: {}", e)))?;
1056
1057        let socket = Arc::new(socket);
1058        self.socket = Some(socket.clone());
1059
1060        // Perform handshake — actual_peer is the real address from the wire
1061        let (keys, actual_peer) = self.perform_handshake(&socket).await?;
1062
1063        // Create packet pool with TX key
1064        // Create session with the actual peer address (not the configured one,
1065        // which may be stale or pre-NAT)
1066        let session = Arc::new(NetSession::new(
1067            keys,
1068            actual_peer,
1069            self.config.packet_pool_size,
1070            self.config.default_reliability.is_reliable(),
1071        ));
1072        self.session = Some(session.clone());
1073
1074        // Store in session manager for health checks (same Arc as the active session)
1075        self.session_manager.set_session_arc(session.clone());
1076
1077        // Spawn background tasks
1078        let recv_task = Self::spawn_receiver(
1079            self.shutdown.clone(),
1080            self.shutdown_notify.clone(),
1081            socket.clone(),
1082            session.clone(),
1083            self.inbound.clone(),
1084            self.config.num_shards,
1085        );
1086
1087        let heartbeat_task = Self::spawn_heartbeat(
1088            self.shutdown.clone(),
1089            self.shutdown_notify.clone(),
1090            socket,
1091            session,
1092            self.config.heartbeat_interval,
1093            actual_peer,
1094        );
1095
1096        {
1097            let mut tasks = self.tasks.lock().await;
1098            tasks.push(recv_task);
1099            tasks.push(heartbeat_task);
1100        }
1101
1102        self.initialized.store(true, Ordering::Release);
1103
1104        tracing::info!(
1105            bind_addr = %self.config.bind_addr,
1106            peer_addr = %self.config.peer_addr,
1107            role = ?self.config.role,
1108            "Net adapter initialized"
1109        );
1110
1111        Ok(())
1112    }
1113
1114    async fn on_batch(&self, batch: std::sync::Arc<Batch>) -> Result<(), AdapterError> {
1115        let session = self
1116            .session
1117            .as_ref()
1118            .ok_or_else(|| AdapterError::Connection("not connected".into()))?;
1119
1120        let socket = self
1121            .socket
1122            .as_ref()
1123            .ok_or_else(|| AdapterError::Connection("socket not initialized".into()))?;
1124
1125        let stream_id = batch.shard_id as u64;
1126        let peer_addr = session.peer_addr();
1127
1128        // Read stream config under the lock, then drop it immediately.
1129        // Holding the DashMap RefMut across .await would deadlock against
1130        // the receiver task which also calls get_or_create_stream().
1131        let reliable = {
1132            let stream = session.get_or_create_stream(stream_id);
1133            stream.with_reliability(|r| r.needs_ack())
1134            // RefMut dropped here
1135        };
1136
1137        // Convert events to bytes and batch them
1138        let mut current_batch: Vec<Bytes> = Vec::with_capacity(64);
1139        let mut current_size = 0usize;
1140
1141        // Thread-local pool with counter-based nonces — zero contention
1142        let pool = session.thread_local_pool();
1143        let mut builder = pool.get();
1144
1145        for event in &batch.events {
1146            let event_bytes = event.raw.clone();
1147            let frame_size = EventFrame::LEN_SIZE + event_bytes.len();
1148
1149            // Check if adding this event would exceed packet size
1150            if current_size + frame_size > protocol::MAX_PAYLOAD_SIZE && !current_batch.is_empty() {
1151                // Acquire stream lock briefly for seq + reliability tracking
1152                let seq;
1153                {
1154                    let stream = session.get_or_create_stream(stream_id);
1155                    seq = stream.next_tx_seq();
1156                }
1157
1158                let flags = if reliable {
1159                    PacketFlags::RELIABLE
1160                } else {
1161                    PacketFlags::NONE
1162                };
1163
1164                let packet = builder.build(stream_id, seq, &current_batch, flags);
1165
1166                // No DashMap lock held during this .await
1167                socket
1168                    .send_to(&packet, peer_addr)
1169                    .await
1170                    .map_err(|e| AdapterError::Connection(format!("send failed: {}", e)))?;
1171
1172                // Track for reliability with PRE-encryption inputs.
1173                // Stashing the encrypted bytes was unsound: the
1174                // receiver's replay window rejects retransmits that
1175                // carry a stale wire counter. The descriptor lets
1176                // the retransmit driver call `builder.build` again
1177                // with a fresh counter.
1178                if reliable {
1179                    // Per perf #133 — Arc-wrap the descriptor before
1180                    // handing it to the reliability mode. The
1181                    // retransmit window then shares the inner
1182                    // `Vec<Bytes>` rather than holding an owned copy.
1183                    let descriptor = std::sync::Arc::new(reliability::RetransmitDescriptor {
1184                        seq,
1185                        stream_id,
1186                        events: current_batch.clone(),
1187                        flags,
1188                    });
1189                    let stream = session.get_or_create_stream(stream_id);
1190                    stream.with_reliability(|r| r.on_send(descriptor));
1191                }
1192
1193                current_batch.clear();
1194                current_size = 0;
1195            }
1196
1197            current_batch.push(event_bytes);
1198            current_size += frame_size;
1199        }
1200
1201        // Send remaining events
1202        if !current_batch.is_empty() {
1203            let seq;
1204            {
1205                let stream = session.get_or_create_stream(stream_id);
1206                seq = stream.next_tx_seq();
1207            }
1208
1209            let flags = if reliable {
1210                PacketFlags::RELIABLE
1211            } else {
1212                PacketFlags::NONE
1213            };
1214
1215            let packet = builder.build(stream_id, seq, &current_batch, flags);
1216
1217            socket
1218                .send_to(&packet, peer_addr)
1219                .await
1220                .map_err(|e| AdapterError::Connection(format!("send failed: {}", e)))?;
1221
1222            if reliable {
1223                // Per perf #133 — see the matching call site above.
1224                let descriptor = std::sync::Arc::new(reliability::RetransmitDescriptor {
1225                    seq,
1226                    stream_id,
1227                    events: current_batch.clone(),
1228                    flags,
1229                });
1230                let stream = session.get_or_create_stream(stream_id);
1231                stream.with_reliability(|r| r.on_send(descriptor));
1232            }
1233        }
1234
1235        session.touch();
1236
1237        Ok(())
1238    }
1239
1240    async fn poll_shard(
1241        &self,
1242        shard_id: u16,
1243        from_id: Option<&str>,
1244        limit: usize,
1245    ) -> Result<ShardPollResult, AdapterError> {
1246        let mut events = Vec::with_capacity(limit);
1247
1248        if let Some(queue) = self.inbound.get(&shard_id) {
1249            while events.len() < limit {
1250                if let Some(event) = queue.pop() {
1251                    if from_id.is_none() || event_id_gt(&event.id, from_id.unwrap_or("")) {
1252                        events.push(event);
1253                    }
1254                    // Events at or before the cursor have already been
1255                    // consumed — drop them instead of requeuing. Requeuing
1256                    // caused unbounded memory growth because these events
1257                    // can never pass an advancing cursor.
1258                } else {
1259                    break;
1260                }
1261            }
1262        }
1263
1264        let has_more = self
1265            .inbound
1266            .get(&shard_id)
1267            .map(|q| !q.is_empty())
1268            .unwrap_or(false);
1269        let next_id = events.last().map(|e| e.id.clone());
1270
1271        Ok(ShardPollResult {
1272            events,
1273            next_id,
1274            has_more,
1275        })
1276    }
1277
1278    async fn flush(&self) -> Result<(), AdapterError> {
1279        // For reliable streams, wait for all pending ACKs
1280        // Currently a no-op since we're fire-and-forget by default
1281        Ok(())
1282    }
1283
1284    async fn shutdown(&self) -> Result<(), AdapterError> {
1285        self.shutdown.store(true, Ordering::Release);
1286
1287        // Wake all tasks blocked on I/O so they can observe the shutdown flag.
1288        // notify_waiters wakes all current waiters (receiver + heartbeat).
1289        self.shutdown_notify.notify_waiters();
1290
1291        // Clear session
1292        self.session_manager.clear_session();
1293
1294        // Wait for tasks to complete
1295        let mut tasks = self.tasks.lock().await;
1296        for task in tasks.drain(..) {
1297            let _ = task.await;
1298        }
1299
1300        self.initialized.store(false, Ordering::Release);
1301
1302        tracing::info!("Net adapter shutdown complete");
1303
1304        Ok(())
1305    }
1306
1307    fn name(&self) -> &'static str {
1308        "net"
1309    }
1310
1311    async fn is_healthy(&self) -> bool {
1312        self.initialized.load(Ordering::Acquire) && self.session_manager.check_session()
1313    }
1314}
1315
1316impl std::fmt::Debug for NetAdapter {
1317    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1318        f.debug_struct("NetAdapter")
1319            .field("config", &self.config)
1320            .field("initialized", &self.initialized.load(Ordering::Relaxed))
1321            .finish()
1322    }
1323}
1324
1325/// Compare two event IDs numerically.
1326///
1327/// IDs are formatted as `"seq:idx"`. Lexicographic comparison is wrong for
1328/// numeric values (e.g. `"9:0" > "10:0"` lexicographically). This function
1329/// parses the components and compares numerically, falling back to string
1330/// comparison only if parsing fails.
1331fn event_id_gt(a: &str, b: &str) -> bool {
1332    fn parse_id(id: &str) -> Option<(u64, u64)> {
1333        let (seq, idx) = id.split_once(':')?;
1334        Some((seq.parse().ok()?, idx.parse().ok()?))
1335    }
1336
1337    match (parse_id(a), parse_id(b)) {
1338        (Some(a), Some(b)) => a > b,
1339        _ => a > b, // fallback to lexicographic
1340    }
1341}
1342
1343#[cfg(test)]
1344mod tests {
1345    use super::*;
1346
1347    #[test]
1348    fn test_adapter_creation() {
1349        let psk = [0x42u8; 32];
1350        let peer_pubkey = [0x24u8; 32];
1351
1352        let config = NetAdapterConfig::initiator(
1353            "127.0.0.1:0".parse().unwrap(),
1354            "127.0.0.1:9999".parse().unwrap(),
1355            psk,
1356            peer_pubkey,
1357        );
1358
1359        let adapter = NetAdapter::new(config).unwrap();
1360        assert_eq!(adapter.name(), "net");
1361    }
1362
1363    /// PERF_AUDIT §2.7 — cache hit: a read younger than the
1364    /// refresh window must return the cached value WITHOUT
1365    /// touching the wall clock and without rewriting the cache.
1366    /// Deterministic — drives `coarse_clock_advance` with
1367    /// synthetic instants instead of racing a real 1 ms window
1368    /// against OS scheduling.
1369    #[test]
1370    fn coarse_clock_reuses_cache_within_refresh_window() {
1371        let t0 = std::time::Instant::now();
1372        let within = t0 + std::time::Duration::from_nanos(COARSE_CLOCK_REFRESH_NS - 1);
1373        let (store, ns) = coarse_clock_advance(Some((t0, 42)), within, || {
1374            panic!("cache hit must not read the wall clock")
1375        });
1376        assert_eq!(ns, 42, "hit must return the cached reading");
1377        assert!(store.is_none(), "hit must keep the hit path store-free");
1378    }
1379
1380    /// PERF_AUDIT §2.7 — refresh: a read at (boundary is
1381    /// exclusive) or past the window must take a fresh wall-clock
1382    /// reading and rebase the window on the read instant.
1383    #[test]
1384    fn coarse_clock_refreshes_at_and_past_the_window() {
1385        let t0 = std::time::Instant::now();
1386        let at_boundary = t0 + std::time::Duration::from_nanos(COARSE_CLOCK_REFRESH_NS);
1387        let (store, ns) = coarse_clock_advance(Some((t0, 42)), at_boundary, || 100);
1388        assert_eq!(ns, 100, "boundary read must refresh");
1389        assert_eq!(
1390            store,
1391            Some((at_boundary, 100)),
1392            "refresh must rebase the window on the read instant"
1393        );
1394    }
1395
1396    /// PERF_AUDIT §2.7 — cold start: no cached pair → wall-clock
1397    /// read, cached against the read instant.
1398    #[test]
1399    fn coarse_clock_cold_start_reads_wall_clock() {
1400        let t0 = std::time::Instant::now();
1401        let (store, ns) = coarse_clock_advance(None, t0, || 7);
1402        assert_eq!(ns, 7);
1403        assert_eq!(store, Some((t0, 7)));
1404    }
1405
1406    #[test]
1407    fn current_timestamp_advances_after_refresh_interval() {
1408        // After sleeping past the refresh interval, the next read
1409        // must return a larger value — pins that the cache
1410        // actually refreshes rather than getting stuck on the
1411        // initial reading.
1412        let first = current_timestamp();
1413        std::thread::sleep(std::time::Duration::from_millis(5));
1414        let later = current_timestamp();
1415        assert!(
1416            later > first,
1417            "post-refresh reading must advance: first={}, later={}",
1418            first,
1419            later
1420        );
1421    }
1422
1423    #[test]
1424    fn test_shard_id_from_stream_id_uses_modulo() {
1425        // Regression: shard_id was computed as `stream_id as u16` (truncation),
1426        // which collides for stream IDs that differ only in upper bits.
1427        // The fix uses `stream_id % num_shards`.
1428        let num_shards: u16 = 8;
1429
1430        // Two stream IDs that are identical in their low 16 bits
1431        // but different overall must map to the same shard via modulo,
1432        // while truncation would also give the same result here.
1433        // More importantly, a large stream_id must stay within [0, num_shards).
1434        let stream_a: u64 = 0xDEAD_BEEF_0000_0003;
1435        let stream_b: u64 = 0xCAFE_BABE_0000_0003;
1436
1437        let shard_a = (stream_a % num_shards as u64) as u16;
1438        let shard_b = (stream_b % num_shards as u64) as u16;
1439
1440        assert!(
1441            shard_a < num_shards,
1442            "shard must be in range [0, num_shards)"
1443        );
1444        assert!(
1445            shard_b < num_shards,
1446            "shard must be in range [0, num_shards)"
1447        );
1448
1449        // Large stream IDs that would overflow u16 must still be valid shard IDs
1450        let big_stream: u64 = 0xFFFF_FFFF_FFFF_FFFF;
1451        let shard_big = (big_stream % num_shards as u64) as u16;
1452        assert!(shard_big < num_shards);
1453
1454        // Truncation would give 0xFFFF = 65535, which is >= num_shards.
1455        // Modulo gives a valid shard.
1456        assert_ne!(
1457            big_stream as u16, shard_big,
1458            "modulo must differ from truncation for large stream IDs"
1459        );
1460    }
1461
1462    #[test]
1463    fn test_invalid_config() {
1464        let psk = [0x42u8; 32];
1465        let peer_pubkey = [0x24u8; 32];
1466
1467        let mut config = NetAdapterConfig::initiator(
1468            "127.0.0.1:0".parse().unwrap(),
1469            "127.0.0.1:9999".parse().unwrap(),
1470            psk,
1471            peer_pubkey,
1472        );
1473        config.peer_static_pubkey = None;
1474
1475        let result = NetAdapter::new(config);
1476        assert!(result.is_err());
1477    }
1478
1479    // Regression: event_id_gt used lexicographic comparison, so "9:0" > "10:0"
1480    // was true (wrong). Now uses numeric comparison (BUGS_4 #2).
1481    #[test]
1482    fn test_event_id_gt_numeric_ordering() {
1483        // Basic ordering
1484        assert!(event_id_gt("2:0", "1:0"));
1485        assert!(!event_id_gt("1:0", "2:0"));
1486        assert!(!event_id_gt("1:0", "1:0"));
1487
1488        // The critical case: double-digit seq must compare correctly
1489        assert!(event_id_gt("10:0", "9:0"));
1490        assert!(event_id_gt("100:0", "99:0"));
1491        assert!(!event_id_gt("9:0", "10:0"));
1492
1493        // Index comparison within same sequence
1494        assert!(event_id_gt("5:2", "5:1"));
1495        assert!(!event_id_gt("5:1", "5:2"));
1496
1497        // Large sequences
1498        assert!(event_id_gt("1000000:0", "999999:0"));
1499    }
1500
1501    // Regression: poll_shard used to destructively pop events that didn't
1502    // pass the cursor filter, causing permanent data loss (BUGS_4 #1).
1503    // This is tested indirectly via event_id_gt since poll_shard requires
1504    // a full adapter setup, but the non-destructive requeue logic is
1505    // verified by the SegQueue re-push in the implementation.
1506    #[test]
1507    fn test_event_id_gt_edge_cases() {
1508        // Empty strings
1509        assert!(event_id_gt("1:0", ""));
1510        // Malformed IDs fall back to string comparison
1511        assert!(event_id_gt("b", "a"));
1512        assert!(!event_id_gt("a", "b"));
1513    }
1514
1515    /// Regression: packets built by PacketBuilder must survive process_packet.
1516    /// This test bypasses the network and directly verifies the encrypt→decrypt
1517    /// data path, catching AAD mismatches, nonce construction bugs, and key
1518    /// derivation errors.
1519    #[test]
1520    fn test_build_then_process_packet_roundtrip() {
1521        use crate::adapter::net::crypto::{NoiseHandshake, StaticKeypair};
1522        use dashmap::DashMap;
1523        use std::sync::Arc;
1524
1525        // Perform a real handshake to get matching keys
1526        let psk = [0x42u8; 32];
1527        let responder_kp = StaticKeypair::generate();
1528
1529        let mut initiator = NoiseHandshake::initiator(&psk, &responder_kp.public).unwrap();
1530        let mut responder = NoiseHandshake::responder(&psk, &responder_kp).unwrap();
1531
1532        let msg1 = initiator.write_message(&[]).unwrap();
1533        responder.read_message(&msg1).unwrap();
1534        let msg2 = responder.write_message(&[]).unwrap();
1535        initiator.read_message(&msg2).unwrap();
1536
1537        let init_keys = initiator.into_session_keys().unwrap();
1538        let resp_keys = responder.into_session_keys().unwrap();
1539
1540        // Initiator builds a packet
1541        let mut builder = PacketBuilder::new(&init_keys.tx_key, init_keys.session_id);
1542        let events = vec![
1543            Bytes::from(r#"{"token":"hello"}"#),
1544            Bytes::from(r#"{"token":"world"}"#),
1545        ];
1546        let packet = builder.build(0, 0, &events, PacketFlags::NONE);
1547
1548        // Responder processes the packet
1549        let resp_session = Arc::new(NetSession::new(
1550            resp_keys,
1551            "127.0.0.1:5000".parse().unwrap(),
1552            4,
1553            false,
1554        ));
1555        let inbound: InboundQueues = Arc::new(DashMap::new());
1556        let source: std::net::SocketAddr = "127.0.0.1:5000".parse().unwrap();
1557
1558        NetAdapter::process_packet(packet, source, &resp_session, &inbound, 1);
1559
1560        // Events should be queued in shard 0
1561        let queue = inbound.get(&0).expect("shard 0 should have events");
1562        assert_eq!(queue.len(), 2, "expected 2 events, got {}", queue.len());
1563
1564        let e1 = queue.pop().unwrap();
1565        assert_eq!(&e1.raw[..], br#"{"token":"hello"}"#);
1566
1567        let e2 = queue.pop().unwrap();
1568        assert_eq!(&e2.raw[..], br#"{"token":"world"}"#);
1569    }
1570
1571    /// Helper: perform a Noise handshake and return matched key pairs.
1572    fn make_session_keys() -> (SessionKeys, SessionKeys) {
1573        use crate::adapter::net::crypto::{NoiseHandshake, StaticKeypair};
1574
1575        let psk = [0x42u8; 32];
1576        let responder_kp = StaticKeypair::generate();
1577
1578        let mut initiator = NoiseHandshake::initiator(&psk, &responder_kp.public).unwrap();
1579        let mut responder = NoiseHandshake::responder(&psk, &responder_kp).unwrap();
1580
1581        let msg1 = initiator.write_message(&[]).unwrap();
1582        responder.read_message(&msg1).unwrap();
1583        let msg2 = responder.write_message(&[]).unwrap();
1584        initiator.read_message(&msg2).unwrap();
1585
1586        (
1587            initiator.into_session_keys().unwrap(),
1588            responder.into_session_keys().unwrap(),
1589        )
1590    }
1591
1592    #[test]
1593    fn test_process_packet_rejects_truncated_packet() {
1594        use dashmap::DashMap;
1595        use std::sync::Arc;
1596
1597        let (init_keys, resp_keys) = make_session_keys();
1598
1599        // Build a valid packet, then truncate it
1600        let mut builder = PacketBuilder::new(&init_keys.tx_key, init_keys.session_id);
1601        let packet = builder.build(0, 0, &[Bytes::from_static(b"hello")], PacketFlags::NONE);
1602
1603        let resp_session = Arc::new(NetSession::new(
1604            resp_keys,
1605            "127.0.0.1:5000".parse().unwrap(),
1606            4,
1607            false,
1608        ));
1609        let inbound: InboundQueues = Arc::new(DashMap::new());
1610        let source: std::net::SocketAddr = "127.0.0.1:5000".parse().unwrap();
1611
1612        // Truncate: remove last 10 bytes (partial auth tag)
1613        let truncated = packet.slice(..packet.len() - 10);
1614        NetAdapter::process_packet(truncated, source, &resp_session, &inbound, 1);
1615        assert!(
1616            inbound.get(&0).is_none() || inbound.get(&0).unwrap().is_empty(),
1617            "truncated packet must be silently dropped"
1618        );
1619    }
1620
1621    #[test]
1622    fn test_process_packet_rejects_tampered_payload() {
1623        use dashmap::DashMap;
1624        use std::sync::Arc;
1625
1626        let (init_keys, resp_keys) = make_session_keys();
1627
1628        let mut builder = PacketBuilder::new(&init_keys.tx_key, init_keys.session_id);
1629        let packet = builder.build(0, 0, &[Bytes::from_static(b"hello")], PacketFlags::NONE);
1630
1631        let resp_session = Arc::new(NetSession::new(
1632            resp_keys,
1633            "127.0.0.1:5000".parse().unwrap(),
1634            4,
1635            false,
1636        ));
1637        let inbound: InboundQueues = Arc::new(DashMap::new());
1638        let source: std::net::SocketAddr = "127.0.0.1:5000".parse().unwrap();
1639
1640        // Tamper: flip a byte in the encrypted payload
1641        let mut tampered = bytes::BytesMut::from(&packet[..]);
1642        tampered[super::protocol::HEADER_SIZE + 2] ^= 0xFF;
1643        NetAdapter::process_packet(tampered.freeze(), source, &resp_session, &inbound, 1);
1644
1645        assert!(
1646            inbound.get(&0).is_none() || inbound.get(&0).unwrap().is_empty(),
1647            "tampered packet must be rejected by AEAD"
1648        );
1649    }
1650
1651    #[test]
1652    fn test_process_packet_rejects_wrong_session_id() {
1653        use dashmap::DashMap;
1654        use std::sync::Arc;
1655
1656        let (init_keys, resp_keys) = make_session_keys();
1657
1658        let mut builder = PacketBuilder::new(&init_keys.tx_key, init_keys.session_id);
1659        let packet = builder.build(0, 0, &[Bytes::from_static(b"hello")], PacketFlags::NONE);
1660
1661        // Create session with a DIFFERENT session_id
1662        let mut wrong_keys = resp_keys;
1663        wrong_keys.session_id = 0xDEAD;
1664        let resp_session = Arc::new(NetSession::new(
1665            wrong_keys,
1666            "127.0.0.1:5000".parse().unwrap(),
1667            4,
1668            false,
1669        ));
1670        let inbound: InboundQueues = Arc::new(DashMap::new());
1671        let source: std::net::SocketAddr = "127.0.0.1:5000".parse().unwrap();
1672
1673        NetAdapter::process_packet(packet, source, &resp_session, &inbound, 1);
1674
1675        assert!(
1676            inbound.get(&0).is_none() || inbound.get(&0).unwrap().is_empty(),
1677            "packet with wrong session_id must be dropped"
1678        );
1679    }
1680
1681    #[test]
1682    fn test_process_packet_multi_packet_batch_all_events_arrive() {
1683        use dashmap::DashMap;
1684        use std::sync::Arc;
1685
1686        let (init_keys, resp_keys) = make_session_keys();
1687
1688        let resp_session = Arc::new(NetSession::new(
1689            resp_keys,
1690            "127.0.0.1:5000".parse().unwrap(),
1691            4,
1692            false,
1693        ));
1694        let inbound: InboundQueues = Arc::new(DashMap::new());
1695        let source: std::net::SocketAddr = "127.0.0.1:5000".parse().unwrap();
1696
1697        // Build events large enough to span multiple packets.
1698        // Each event is ~200 bytes, MAX_PAYLOAD_SIZE is ~8112, so ~40 per packet.
1699        // 200 events → ~5 packets.
1700        let mut builder = PacketBuilder::new(&init_keys.tx_key, init_keys.session_id);
1701        let total_events = 200;
1702        let mut seq = 0u64;
1703
1704        // Simulate on_batch splitting into multiple packets
1705        let mut current_batch: Vec<Bytes> = Vec::new();
1706        let mut current_size = 0;
1707
1708        for i in 0..total_events {
1709            let data = format!("{{\"i\":{},\"pad\":\"{}\"}}", i, "x".repeat(150));
1710            let event_bytes = Bytes::from(data);
1711            let frame_size = EventFrame::LEN_SIZE + event_bytes.len();
1712
1713            if current_size + frame_size > protocol::MAX_PAYLOAD_SIZE && !current_batch.is_empty() {
1714                let packet = builder.build(0, seq, &current_batch, PacketFlags::NONE);
1715                NetAdapter::process_packet(packet, source, &resp_session, &inbound, 1);
1716                seq += 1;
1717                current_batch.clear();
1718                current_size = 0;
1719            }
1720
1721            current_batch.push(event_bytes);
1722            current_size += frame_size;
1723        }
1724
1725        if !current_batch.is_empty() {
1726            let packet = builder.build(0, seq, &current_batch, PacketFlags::NONE);
1727            NetAdapter::process_packet(packet, source, &resp_session, &inbound, 1);
1728        }
1729
1730        // All events must arrive
1731        let queue = inbound.get(&0).expect("shard 0 should have events");
1732        assert_eq!(
1733            queue.len(),
1734            total_events,
1735            "all {} events must arrive across multiple packets",
1736            total_events
1737        );
1738    }
1739
1740    #[test]
1741    fn test_build_then_process_packet_both_directions() {
1742        use dashmap::DashMap;
1743        use std::sync::Arc;
1744
1745        let (init_keys, resp_keys) = make_session_keys();
1746        let source: std::net::SocketAddr = "127.0.0.1:5000".parse().unwrap();
1747
1748        // Direction 1: initiator → responder
1749        {
1750            let mut builder = PacketBuilder::new(&init_keys.tx_key, init_keys.session_id);
1751            let packet = builder.build(0, 0, &[Bytes::from_static(b"i2r")], PacketFlags::NONE);
1752
1753            let session = Arc::new(NetSession::new(resp_keys.clone(), source, 4, false));
1754            let inbound: InboundQueues = Arc::new(DashMap::new());
1755            NetAdapter::process_packet(packet, source, &session, &inbound, 1);
1756
1757            let queue = inbound.get(&0).expect("i2r: shard 0 should have events");
1758            assert_eq!(queue.len(), 1, "i2r: expected 1 event");
1759            assert_eq!(&queue.pop().unwrap().raw[..], b"i2r");
1760        }
1761
1762        // Direction 2: responder → initiator
1763        {
1764            let mut builder = PacketBuilder::new(&resp_keys.tx_key, resp_keys.session_id);
1765            let packet = builder.build(0, 0, &[Bytes::from_static(b"r2i")], PacketFlags::NONE);
1766
1767            let session = Arc::new(NetSession::new(init_keys.clone(), source, 4, false));
1768            let inbound: InboundQueues = Arc::new(DashMap::new());
1769            NetAdapter::process_packet(packet, source, &session, &inbound, 1);
1770
1771            let queue = inbound.get(&0).expect("r2i: shard 0 should have events");
1772            assert_eq!(queue.len(), 1, "r2i: expected 1 event");
1773            assert_eq!(&queue.pop().unwrap().raw[..], b"r2i");
1774        }
1775    }
1776
1777    #[test]
1778    fn test_poll_shard_cursor_drops_consumed_events() {
1779        // Verify that poll_shard with a cursor drops events at or before
1780        // the cursor (they've already been consumed) and returns only
1781        // events after the cursor. The queue should be empty afterward —
1782        // no unbounded requeue growth.
1783        use std::sync::Arc;
1784
1785        let (init_keys, resp_keys) = make_session_keys();
1786
1787        let resp_session = Arc::new(NetSession::new(
1788            resp_keys,
1789            "127.0.0.1:5000".parse().unwrap(),
1790            4,
1791            false,
1792        ));
1793        let inbound: InboundQueues = Arc::new(DashMap::new());
1794        let source: std::net::SocketAddr = "127.0.0.1:5000".parse().unwrap();
1795
1796        // Send 3 packets (sequences 0, 1, 2), each with 1 event
1797        let mut builder = PacketBuilder::new(&init_keys.tx_key, init_keys.session_id);
1798        for seq in 0..3u64 {
1799            let events = vec![Bytes::from(format!("event-{}", seq))];
1800            let packet = builder.build(0, seq, &events, PacketFlags::NONE);
1801            NetAdapter::process_packet(packet, source, &resp_session, &inbound, 1);
1802        }
1803
1804        let queue = inbound.get(&0u16).unwrap();
1805        assert_eq!(queue.len(), 3);
1806
1807        // Simulate poll_shard with cursor "0:0" — drops event 0:0,
1808        // returns events 1:0 and 2:0
1809        let from_id = "0:0";
1810        let mut events = Vec::new();
1811        while events.len() < 10 {
1812            if let Some(event) = queue.pop() {
1813                if event_id_gt(&event.id, from_id) {
1814                    events.push(event);
1815                }
1816                // Events at/before cursor are dropped (not requeued)
1817            } else {
1818                break;
1819            }
1820        }
1821
1822        assert_eq!(events.len(), 2, "should get 2 events after cursor 0:0");
1823        assert_eq!(events[0].id, "1:0");
1824        assert_eq!(events[1].id, "2:0");
1825
1826        // Queue should be empty — consumed events are dropped, not requeued
1827        assert_eq!(queue.len(), 0, "queue should be empty after poll drains it");
1828    }
1829
1830    #[test]
1831    fn test_process_packet_old_counter_rejected() {
1832        // Verify that a packet with a counter below the replay window
1833        // is rejected after the window has advanced.
1834        use std::sync::Arc;
1835
1836        let (init_keys, resp_keys) = make_session_keys();
1837        let resp_session = Arc::new(NetSession::new(
1838            resp_keys,
1839            "127.0.0.1:5000".parse().unwrap(),
1840            4,
1841            false,
1842        ));
1843        let inbound: InboundQueues = Arc::new(DashMap::new());
1844        let source: std::net::SocketAddr = "127.0.0.1:5000".parse().unwrap();
1845
1846        // Send 1100 packets to advance the rx_counter past the replay window (1024)
1847        let mut builder = PacketBuilder::new(&init_keys.tx_key, init_keys.session_id);
1848        for seq in 0..1100u64 {
1849            let packet = builder.build(0, seq, &[Bytes::from_static(b"x")], PacketFlags::NONE);
1850            NetAdapter::process_packet(packet, source, &resp_session, &inbound, 1);
1851        }
1852        assert_eq!(inbound.get(&0).unwrap().len(), 1100);
1853
1854        // Build a packet with a fresh builder whose counter starts at 0.
1855        // The rx_counter is now at ~1100, so counter 0 is outside the
1856        // 1024-wide replay window and must be rejected.
1857        let mut stale_builder = PacketBuilder::new(&init_keys.tx_key, init_keys.session_id);
1858        let stale_packet =
1859            stale_builder.build(0, 9999, &[Bytes::from_static(b"stale")], PacketFlags::NONE);
1860        NetAdapter::process_packet(stale_packet, source, &resp_session, &inbound, 1);
1861
1862        // Should still be 1100 — stale packet rejected
1863        assert_eq!(
1864            inbound.get(&0).unwrap().len(),
1865            1100,
1866            "packet with stale counter must be rejected"
1867        );
1868    }
1869
1870    #[test]
1871    fn test_process_packet_far_future_counter_rejected() {
1872        // Verify that a packet with a counter far beyond MAX_FORWARD is
1873        // rejected, preventing an attacker from advancing the rx_counter
1874        // and denying subsequent legitimate packets.
1875        use std::sync::Arc;
1876
1877        let (_init_keys, resp_keys) = make_session_keys();
1878
1879        // Build a valid packet, then manually tamper the nonce counter
1880        // to a huge value. The AEAD will fail because the nonce doesn't
1881        // match, but we're testing that is_valid_rx_counter rejects it
1882        // before even attempting decryption.
1883        let resp_session = Arc::new(NetSession::new(
1884            resp_keys,
1885            "127.0.0.1:5000".parse().unwrap(),
1886            4,
1887            false,
1888        ));
1889
1890        // Directly test the cipher's counter validation
1891        let rx_cipher = resp_session.rx_cipher();
1892        assert!(
1893            !rx_cipher.is_valid_rx_counter(u64::MAX),
1894            "counter at u64::MAX must be rejected (far beyond MAX_FORWARD)"
1895        );
1896        assert!(
1897            rx_cipher.is_valid_rx_counter(0),
1898            "counter 0 should be valid initially"
1899        );
1900    }
1901
1902    /// Regression: BUG_REPORT.md #5 — `process_packet` previously
1903    /// discarded the bool returned by `r.on_receive(seq)` on the
1904    /// reliability layer, queueing events even for duplicates.
1905    /// Each retransmit re-encrypts with a fresh outer counter, so
1906    /// the cipher's replay window does not catch this; without
1907    /// honoring `on_receive`, the inbound queue accumulates
1908    /// duplicates and breaks exactly-once delivery on reliable
1909    /// streams.
1910    ///
1911    /// We construct the duplicate-detection scenario by building
1912    /// two distinct packets that share the same stream sequence.
1913    /// On a reliable session the second one's `on_receive` returns
1914    /// `false`, so `process_packet` must not enqueue its events.
1915    /// (The cipher's outer counter is fresh on both packets, so
1916    /// the replay window can't filter them — only the reliability
1917    /// layer's check stops the duplicate.)
1918    #[test]
1919    fn process_packet_drops_duplicates_per_reliability_decision() {
1920        use dashmap::DashMap;
1921        use std::sync::Arc;
1922
1923        let (init_keys, resp_keys) = make_session_keys();
1924
1925        // Reliable session — its streams use `ReliableStream`,
1926        // whose `on_receive` returns `false` for `seq <
1927        // next_expected` (duplicates).
1928        let resp_session = Arc::new(NetSession::new(
1929            resp_keys,
1930            "127.0.0.1:5000".parse().unwrap(),
1931            4,
1932            true, // default_reliable
1933        ));
1934        let inbound: InboundQueues = Arc::new(DashMap::new());
1935        let source: std::net::SocketAddr = "127.0.0.1:5000".parse().unwrap();
1936
1937        // Two packets on stream 7. First carries sequences 0..1,
1938        // second is a duplicate (same seq=0) that should be
1939        // filtered. We deliver seq=0 then seq=1 first to advance
1940        // `next_expected` past 0, then a packet with seq=0 — that
1941        // last one is the duplicate.
1942        let mut builder = PacketBuilder::new(&init_keys.tx_key, init_keys.session_id);
1943        let packet0 = builder.build(7, 0, &[Bytes::from(r#"{"first":0}"#)], PacketFlags::NONE);
1944        let packet1 = builder.build(7, 1, &[Bytes::from(r#"{"first":1}"#)], PacketFlags::NONE);
1945        // Re-encrypted retransmit of seq=0 — same stream, same seq,
1946        // different payload. This is the scenario the bug allowed
1947        // through.
1948        let packet0_dup = builder.build(
1949            7,
1950            0,
1951            &[Bytes::from(r#"{"dup":"should_not_appear"}"#)],
1952            PacketFlags::NONE,
1953        );
1954
1955        NetAdapter::process_packet(packet0, source, &resp_session, &inbound, 1);
1956        NetAdapter::process_packet(packet1, source, &resp_session, &inbound, 1);
1957        NetAdapter::process_packet(packet0_dup, source, &resp_session, &inbound, 1);
1958
1959        let queue = inbound.get(&0).expect("shard 0 should exist");
1960        assert_eq!(
1961            queue.len(),
1962            2,
1963            "duplicate packet must NOT enqueue (BUG_REPORT.md #5); \
1964             got {} events, expected exactly 2 (seq=0 and seq=1, no dup)",
1965            queue.len()
1966        );
1967
1968        // Drain in FIFO order and assert no `should_not_appear`
1969        // event sneaked through.
1970        let e0 = queue.pop().unwrap();
1971        assert_eq!(&e0.raw[..], br#"{"first":0}"#);
1972        let e1 = queue.pop().unwrap();
1973        assert_eq!(&e1.raw[..], br#"{"first":1}"#);
1974        assert!(queue.is_empty());
1975    }
1976
1977    /// Regression: heartbeats must be AEAD-authenticated so an
1978    /// off-path attacker who knows or observes the session_id
1979    /// cannot spoof them. Pre-fix the receiver only checked
1980    /// `source == peer_addr` (UDP source — spoofable) and
1981    /// `session_id` match (in cleartext on every packet); now the
1982    /// 16-byte Poly1305 tag binds the heartbeat to the session
1983    /// key.
1984    #[test]
1985    fn heartbeat_is_aead_authenticated() {
1986        use crate::adapter::net::pool::PacketBuilder;
1987        use dashmap::DashMap;
1988        use std::sync::Arc;
1989
1990        let (init_keys, resp_keys) = make_session_keys();
1991
1992        let resp_session = Arc::new(NetSession::new(
1993            resp_keys,
1994            "127.0.0.1:5000".parse().unwrap(),
1995            4,
1996            false,
1997        ));
1998        let inbound: InboundQueues = Arc::new(DashMap::new());
1999        let source: std::net::SocketAddr = "127.0.0.1:5000".parse().unwrap();
2000
2001        // Build a legitimate heartbeat with the initiator's
2002        // session key and tag it.
2003        let mut builder = PacketBuilder::new(&init_keys.tx_key, init_keys.session_id);
2004        let heartbeat = builder.build_heartbeat();
2005        let last_activity_before = resp_session.last_activity_ns();
2006        std::thread::sleep(std::time::Duration::from_millis(2));
2007
2008        // Process: this must succeed and call session.touch().
2009        NetAdapter::process_packet(heartbeat, source, &resp_session, &inbound, 1);
2010        let last_activity_after = resp_session.last_activity_ns();
2011        assert!(
2012            last_activity_after > last_activity_before,
2013            "legitimate AEAD-tagged heartbeat must call session.touch()"
2014        );
2015
2016        // Forge an unauthenticated heartbeat: header-only, no tag.
2017        // Pre-fix this would have passed; post-fix it must be
2018        // rejected.
2019        let mut forged = bytes::BytesMut::new();
2020        let header = NetHeader::heartbeat(resp_session.session_id());
2021        forged.extend_from_slice(&header.to_bytes());
2022        let forged = forged.freeze();
2023        let last_activity_before = resp_session.last_activity_ns();
2024        std::thread::sleep(std::time::Duration::from_millis(2));
2025        NetAdapter::process_packet(forged, source, &resp_session, &inbound, 1);
2026        let last_activity_after = resp_session.last_activity_ns();
2027        assert_eq!(
2028            last_activity_before, last_activity_after,
2029            "unauthenticated heartbeat (no AEAD tag) must NOT touch the session"
2030        );
2031
2032        // Forge a heartbeat with the right session_id but a
2033        // garbage 16-byte "tag". Tag verification fails.
2034        let mut forged_tag = bytes::BytesMut::new();
2035        let mut header_bytes = NetHeader::heartbeat(resp_session.session_id()).to_bytes();
2036        // Stamp a plausible nonce so the receiver gets to the
2037        // decrypt step (otherwise it bails earlier on counter).
2038        header_bytes[12..16].copy_from_slice(&[0u8; 4]);
2039        header_bytes[16..24].copy_from_slice(&1u64.to_le_bytes());
2040        forged_tag.extend_from_slice(&header_bytes);
2041        forged_tag.extend_from_slice(&[0xAAu8; 16]); // garbage tag
2042        let forged_tag = forged_tag.freeze();
2043        let last_activity_before = resp_session.last_activity_ns();
2044        std::thread::sleep(std::time::Duration::from_millis(2));
2045        NetAdapter::process_packet(forged_tag, source, &resp_session, &inbound, 1);
2046        let last_activity_after = resp_session.last_activity_ns();
2047        assert_eq!(
2048            last_activity_before, last_activity_after,
2049            "heartbeat with garbage AEAD tag must NOT touch the session"
2050        );
2051    }
2052
2053    /// Regression: the handshake responder must rate-limit per
2054    /// source so a flooder can't monopolize the recv loop.
2055    /// `HandshakePacer` is the building block: it tracks
2056    /// `(count, window_start)` per source and rejects after
2057    /// `max_per_window` attempts within `window`.
2058    #[test]
2059    fn handshake_pacer_rejects_floods_per_source() {
2060        use std::time::Duration;
2061        let mut pacer = HandshakePacer::new(3, Duration::from_millis(50));
2062
2063        let attacker: std::net::SocketAddr = "10.0.0.1:9000".parse().unwrap();
2064        let legit: std::net::SocketAddr = "10.0.0.2:9000".parse().unwrap();
2065
2066        // Attacker fires 3 attempts — all allowed (within budget).
2067        for _ in 0..3 {
2068            assert!(pacer.check_and_record(attacker));
2069        }
2070        // Fourth and beyond — rejected.
2071        for _ in 0..10 {
2072            assert!(
2073                !pacer.check_and_record(attacker),
2074                "attacker exceeding budget must be dropped"
2075            );
2076        }
2077
2078        // The legitimate initiator (different source) is unaffected
2079        // by the attacker's burst — the budget is per-source.
2080        assert!(
2081            pacer.check_and_record(legit),
2082            "legitimate source must still get through despite attacker flood"
2083        );
2084
2085        // After the window expires the attacker's budget refills.
2086        std::thread::sleep(Duration::from_millis(55));
2087        assert!(
2088            pacer.check_and_record(attacker),
2089            "attacker budget must refill after window"
2090        );
2091    }
2092}