Skip to main content

net/adapter/net/
mod.rs

1//! Net L0 Transport Protocol (Net) adapter.
2//!
//! Net is a high-performance UDP-based transport protocol designed for
//! encrypted GPU-to-GPU streaming. It provides:
5//!
6//! - Zero-copy, zero-allocation hot path
7//! - XChaCha20-Poly1305 encryption per packet
8//! - Noise protocol handshake (NKpsk0)
9//! - Optional per-stream reliability with selective NACKs
10//! - 40-60M events/sec target throughput
11//!
12//! # Usage
13//!
14//! ```rust,ignore
15//! use net::adapter::net::{NetAdapter, NetAdapterConfig, StaticKeypair};
16//!
17//! // Generate keypair for responder
18//! let keypair = StaticKeypair::generate();
19//!
20//! // Create initiator config
21//! let config = NetAdapterConfig::initiator(
22//!     "127.0.0.1:9000".parse()?,
23//!     "127.0.0.1:9001".parse()?,
24//!     psk,
25//!     keypair.public,
26//! );
27//!
28//! // Create adapter
29//! let mut adapter = NetAdapter::new(config)?;
30//! adapter.init().await?;
31//! ```
32
33mod batch;
34pub mod behavior;
35// SDK-level cancel-token registry consumed by the cortex `mesh_rpc`
36// call shapes. Always-built (no cortex feature gate) — the registry
37// is type-pure and small; gating it would mean two parallel
38// definitions, and the `cortex` build is the only consumer today
39// regardless.
40mod cancel_registry;
41pub mod channel;
42pub mod compute;
43mod config;
44pub mod contested;
45pub mod continuity;
46#[cfg(feature = "cortex")]
47pub mod cortex;
48mod crypto;
49mod failure;
50pub mod identity;
51mod mesh;
// The nRPC glue + metrics modules (`mesh_rpc`, `mesh_rpc_metrics` below)
// depend on the cortex fold types (RpcServerFold, RpcClientPending, etc.)
// and on the per-channel-hash inbound dispatcher hook the cortex layer wires
// up. They're meaningless without `cortex` enabled, and exposing them
// unconditionally broke `--features net` builds (mesh_rpc.rs references
// `super::cortex::*`). Gating both keeps the bare-net build clean.
58#[cfg(feature = "dataforts")]
59pub mod dataforts;
60#[cfg(feature = "cortex")]
61pub mod mesh_rpc;
62#[cfg(feature = "cortex")]
63pub mod mesh_rpc_metrics;
64#[cfg(feature = "netdb")]
65pub mod netdb;
66mod pool;
67mod protocol;
68mod proxy;
69#[cfg(feature = "redex")]
70pub mod redex;
71mod reliability;
72mod reroute;
73mod route;
74mod router;
75mod session;
76pub mod state;
77mod stream;
78pub mod subnet;
79pub mod subprotocol;
80mod swarm;
81mod transport;
82#[cfg(feature = "nat-traversal")]
83pub mod traversal;
84
85#[cfg(target_os = "linux")]
86mod linux;
87
88pub use batch::AdaptiveBatcher;
89pub use channel::{
90    AckReason, AuthGuard, AuthVerdict, ChannelConfig, ChannelConfigRegistry, ChannelError,
91    ChannelHash, ChannelId, ChannelName, ChannelPublisher, ChannelRegistry, MembershipMsg,
92    OnFailure, PublishConfig, PublishReport, SubscriberRoster, Visibility,
93    SUBPROTOCOL_CHANNEL_MEMBERSHIP,
94};
95pub use compute::{
96    DaemonError, DaemonFactoryRegistry, DaemonHost, DaemonHostConfig, DaemonRegistry, DaemonStats,
97    FactoryEntry, MeshDaemon, MigrationError, MigrationMessage, MigrationOrchestrator,
98    MigrationPhase, MigrationSourceHandler, MigrationState, MigrationTargetHandler,
99    PlacementDecision, Scheduler, SchedulerError, SUBPROTOCOL_MIGRATION,
100};
101pub use config::{ConnectionRole, NetAdapterConfig, ReliabilityConfig};
102pub use contested::{
103    CorrelatedFailureConfig, CorrelatedFailureDetector, CorrelationVerdict, FailureCause,
104    PartitionDetector, PartitionPhase, PartitionRecord, ReconcileOutcome, Side,
105    SUBPROTOCOL_PARTITION,
106};
107pub use continuity::{
108    assess_continuity, CausalCone, Causality, ContinuityProof, ContinuityStatus, Discontinuity,
109    DiscontinuityReason, ForkRecord, HorizonDivergence, ObservationWindow, ProofError,
110    PropagationModel, SuperpositionPhase, SuperpositionState, SUBPROTOCOL_CONTINUITY,
111};
112#[cfg(feature = "cortex")]
113pub use cortex::{
114    CortexAdapter, CortexAdapterConfig, CortexAdapterError, EventEnvelope, EventMeta,
115    FoldErrorPolicy, IntoRedexPayload, StartPosition, EVENT_META_SIZE,
116};
117pub use crypto::{CryptoError, SessionKeys, StaticKeypair};
118pub use failure::{
119    CircuitBreaker, CircuitState, FailureDetector, FailureDetectorConfig, FailureStats,
120    LossSimulator, NodeStatus, RecoveryAction, RecoveryManager, RecoveryStats,
121};
122pub use identity::{
123    EntityError, EntityId, EntityKeypair, OriginStamp, PermissionToken, TokenCache, TokenError,
124    TokenScope,
125};
126pub use mesh::{MeshNode, MeshNodeConfig, PartitionFilter};
127#[cfg(feature = "netdb")]
128pub use netdb::{MemoriesFilter, NetDb, NetDbBuilder, NetDbError, NetDbSnapshot, TasksFilter};
129// `SharedPacketPool` is intentionally not re-exported — see
130// `pool.rs` for the cross-pool nonce-reuse rationale.
131// `PacketPool` itself stays exposed because tests reference it;
132// only the `Arc<PacketPool>` wrapper alias and its constructor
133// are absent.
134pub use pool::{PacketBuilder, PacketPool, SharedLocalPool, ThreadLocalPool};
135pub use protocol::{
136    EventFrame, NackPayload, NetHeader, PacketFlags, HEADER_SIZE, NONCE_SIZE, TAG_SIZE,
137};
138pub use proxy::{
139    ForwardResult, HopStats, MultiHopPacketBuilder, NetProxy, ProxyConfig, ProxyError, ProxyStats,
140};
141#[cfg(feature = "redex")]
142pub use redex::{
143    FsyncPolicy, IndexOp, IndexStart, OrderedAppender, Redex, RedexEntry, RedexError, RedexEvent,
144    RedexFile, RedexFileConfig, RedexFlags, RedexFold, RedexIndex, TypedRedexFile,
145};
146pub use reliability::{FireAndForget, ReliabilityMode, ReliableStream, RetransmitDescriptor};
147pub use reroute::ReroutePolicy;
148pub use route::{
149    AggregateStats, RouteEntry, RouteFlags, RoutingHeader, RoutingTable, SchedulerStreamStats,
150    ROUTING_HEADER_SIZE,
151};
152pub use router::{FairScheduler, NetRouter, RouteAction, RouterConfig, RouterError, RouterStats};
153pub use session::{NetSession, SessionManager, StreamState, TxAdmit, TxSlotGuard};
154pub use state::{
155    CausalChainBuilder, CausalEvent, CausalLink, ChainError, EntityLog, HorizonEncoder, LogError,
156    LogIndex, ObservedHorizon, SnapshotStore, StateSnapshot, CAUSAL_LINK_SIZE, SUBPROTOCOL_CAUSAL,
157    SUBPROTOCOL_SNAPSHOT,
158};
159pub use stream::{
160    CloseBehavior, Reliability, Stream, StreamConfig, StreamError, StreamStats,
161    DEFAULT_STREAM_WINDOW_BYTES,
162};
163pub use subnet::{DropReason, ForwardDecision, SubnetGateway, SubnetId, SubnetPolicy, SubnetRule};
164pub use subprotocol::{
165    negotiate, MigrationSubprotocolHandler, NegotiatedSet, OutboundMigrationMessage,
166    SubprotocolDescriptor, SubprotocolManifest, SubprotocolRegistry, SubprotocolVersion,
167    SUBPROTOCOL_NEGOTIATION,
168};
169pub use swarm::{
170    Capabilities, CapabilityAd, EdgeInfo, GraphStats, LocalGraph, NodeInfo, Pingwave,
171    MAX_GRAPH_NODES, MAX_SEEN_PINGWAVES, PINGWAVE_SIZE,
172};
173pub use transport::{NetSocket, PacketReceiver, PacketSender, ParsedPacket, SocketBufferConfig};
174
175use async_trait::async_trait;
176use bytes::Bytes;
177use crossbeam_queue::SegQueue;
178use dashmap::DashMap;
179use std::sync::atomic::{AtomicBool, Ordering};
180use std::sync::Arc;
181use tokio::sync::Mutex as TokioMutex;
182use tokio::sync::Notify;
183use tokio::task::JoinHandle;
184
185use crate::adapter::{Adapter, ShardPollResult};
186use crate::error::AdapterError;
187use crate::event::{Batch, StoredEvent};
188
189use crypto::NoiseHandshake;
190use session::SessionManager as SessionMgr;
191use transport::NetSocket as Socket;
192
193// Re-export xxh3 utilities for stream routing
194pub use routing::{route_to_shard, stream_id_from_bytes, stream_id_from_key};
195
/// Current wall-clock time in nanoseconds since the Unix epoch.
///
/// Shared utility — avoids duplicating this across `causal.rs`, `snapshot.rs`,
/// `observation.rs`, `migration.rs`, `session.rs`, and `token.rs`.
///
/// This reads `SystemTime`, which is wall-clock time, not a monotonic clock.
///
/// Edge cases:
///
/// - **Far-future clock.** `Duration::as_nanos()` returns a `u128`. The
///   conversion saturates at `u64::MAX` via `u64::try_from`, instead of the
///   silent truncation a bare `as u64` would perform. `u64` nanoseconds run
///   out around the year 2554, so this only matters for a clock misconfigured
///   far into the future. There, truncation would produce a tiny timestamp
///   that immediately trips `is_timed_out`-style checks.
/// - **Pre-epoch clock.** `duration_since` fails and the reading collapses
///   to `0`. Every call then returns the same value, so callers ordering by
///   this timestamp see ties rather than a panic. Ordering is only meaningful
///   while the system clock is sane.
#[inline]
pub(crate) fn current_timestamp() -> u64 {
    match std::time::SystemTime::now().duration_since(std::time::UNIX_EPOCH) {
        Ok(elapsed) => u64::try_from(elapsed.as_nanos()).unwrap_or(u64::MAX),
        // Clock set before 1970: there is no meaningful positive offset.
        Err(_) => 0,
    }
}
218
/// Current wall-clock time in microseconds since the Unix epoch.
///
/// Returns `0` on a pre-epoch clock. Otherwise it saturates at `u64::MAX`
/// rather than silently truncating the `u128` from `Duration::as_micros()`
/// with a bare `as u64`, matching the overflow handling of
/// `current_timestamp`.
///
/// The wire envelopes that consume this value (fold announcements,
/// snapshots) treat micros purely as diagnostics, never as ordering, so a
/// saturated reading is benign.
#[inline]
pub(crate) fn current_timestamp_micros() -> u64 {
    std::time::SystemTime::now()
        .duration_since(std::time::UNIX_EPOCH)
        .map_or(0, |elapsed| {
            u64::try_from(elapsed.as_micros()).unwrap_or(u64::MAX)
        })
}
231
232/// Fast xxh3-based routing utilities for Net streams.
233///
234/// Uses xxh3 (~50GB/s) for deterministic, high-performance stream routing.
235mod routing {
236    use xxhash_rust::xxh3::xxh3_64;
237
238    /// Generate a stream ID from arbitrary data.
239    ///
240    /// Uses xxh3 for fast, deterministic hashing (~50GB/s on modern CPUs).
241    #[inline]
242    pub fn stream_id_from_bytes(data: &[u8]) -> u64 {
243        xxh3_64(data)
244    }
245
246    /// Generate a stream ID from a string key.
247    ///
248    /// Convenience wrapper for `stream_id_from_bytes`.
249    #[inline]
250    pub fn stream_id_from_key(key: &str) -> u64 {
251        xxh3_64(key.as_bytes())
252    }
253
254    /// Route data to a shard based on its content hash.
255    ///
256    /// Returns a shard ID in the range `[0, num_shards)`.
257    ///
258    /// # Panics
259    ///
260    /// Panics if `num_shards` is 0.
261    #[inline]
262    pub fn route_to_shard(data: &[u8], num_shards: u16) -> u16 {
263        assert!(num_shards > 0, "num_shards must be > 0");
264        (xxh3_64(data) % num_shards as u64) as u16
265    }
266
267    #[cfg(test)]
268    mod tests {
269        use super::*;
270
271        #[test]
272        fn test_stream_id_deterministic() {
273            let data = b"test event data";
274            let id1 = stream_id_from_bytes(data);
275            let id2 = stream_id_from_bytes(data);
276            assert_eq!(id1, id2);
277        }
278
279        #[test]
280        fn test_stream_id_different_for_different_data() {
281            let id1 = stream_id_from_bytes(b"event1");
282            let id2 = stream_id_from_bytes(b"event2");
283            assert_ne!(id1, id2);
284        }
285
286        #[test]
287        fn test_stream_id_from_key() {
288            let id = stream_id_from_key("user:12345");
289            assert_ne!(id, 0);
290        }
291
292        #[test]
293        fn test_route_to_shard_range() {
294            let num_shards = 16u16;
295            for i in 0..1000 {
296                let data = format!("event_{}", i);
297                let shard = route_to_shard(data.as_bytes(), num_shards);
298                assert!(shard < num_shards);
299            }
300        }
301
302        #[test]
303        #[should_panic(expected = "num_shards must be > 0")]
304        fn test_route_to_shard_zero_shards_panics() {
305            // Regression: route_to_shard(_, 0) caused a divide-by-zero panic
306            // with no helpful message. Now it asserts with a clear message.
307            route_to_shard(b"test", 0);
308        }
309
310        #[test]
311        fn test_route_to_shard_distribution() {
312            let num_shards = 8u16;
313            let mut counts = [0u32; 8];
314
315            for i in 0..8000 {
316                let data = format!("event_{}", i);
317                let shard = route_to_shard(data.as_bytes(), num_shards);
318                counts[shard as usize] += 1;
319            }
320
321            // Check that distribution is reasonably uniform (within 50% of expected)
322            let expected = 1000;
323            for count in counts {
324                assert!(count > expected / 2, "shard count {} too low", count);
325                assert!(count < expected * 2, "shard count {} too high", count);
326            }
327        }
328    }
329}
330
331/// Shared inbound queue type
332type InboundQueues = Arc<DashMap<u16, SegQueue<StoredEvent>>>;
333
/// Per-source rate limiter for the handshake responder loop.
///
/// Without pacing, the responder would accept whichever source emitted msg1
/// first. An attacker who knows the PSK (PSKs are typically multi-tenant)
/// could race the legitimate initiator's msg1. Even without the PSK, an
/// attacker could flood handshake-flagged datagrams to monopolize the recv
/// loop.
///
/// `HandshakePacer` counts attempts per source within a fixed `window` and
/// rejects sources that exceed `max_per_window`. Expired entries are
/// garbage-collected in amortized sweeps rather than on every check. A sweep
/// runs at most once per `window` on a timer, plus whenever the map reaches
/// `gc_size_threshold`.
///
/// After every sweep the size trigger is re-armed at twice the surviving
/// entry count, and never below `MIN_GC_SIZE_THRESHOLD`. This matters under
/// a flood of distinct (e.g. spoofed) sources. Every entry is still inside
/// its window, so a sweep evicts nothing, and a fixed trigger would re-run
/// the O(n) sweep on every later packet.
///
/// Memory is proportional to the number of distinct sources seen within
/// roughly the last one to two windows; it is not hard-capped.
pub(crate) struct HandshakePacer {
    /// Per-source `(count_in_window, window_start)`.
    entries: std::collections::HashMap<std::net::SocketAddr, (u32, std::time::Instant)>,
    /// Maximum attempts per source within `window`.
    max_per_window: u32,
    /// Window length.
    window: std::time::Duration,
    /// Last time a GC sweep ran.
    last_gc: std::time::Instant,
    /// Map size at which the next size-triggered GC sweep runs. Re-armed
    /// after every sweep so sweeps stay amortized under a flood.
    gc_size_threshold: usize,
}

impl HandshakePacer {
    /// Floor for `gc_size_threshold`. A few thousand
    /// `(SocketAddr, (u32, Instant))` entries is on the order of a few
    /// hundred KiB, which is a comfortable resting size.
    const MIN_GC_SIZE_THRESHOLD: usize = 4096;

    /// Create a pacer allowing `max_per_window` attempts per source per
    /// `window`.
    pub(crate) fn new(max_per_window: u32, window: std::time::Duration) -> Self {
        Self {
            entries: std::collections::HashMap::new(),
            max_per_window,
            window,
            last_gc: std::time::Instant::now(),
            gc_size_threshold: Self::MIN_GC_SIZE_THRESHOLD,
        }
    }

    /// Record an attempt from `source`.
    ///
    /// Returns `true` if the source is within budget (caller may proceed),
    /// or `false` if it has exceeded the rate limit (caller must drop the
    /// packet).
    pub(crate) fn check_and_record(&mut self, source: std::net::SocketAddr) -> bool {
        let now = std::time::Instant::now();
        if now.duration_since(self.last_gc) >= self.window
            || self.entries.len() >= self.gc_size_threshold
        {
            self.sweep_expired(now);
        }

        let (count, window_start) = self.entries.entry(source).or_insert((0, now));
        if now.duration_since(*window_start) > self.window {
            // Window expired; start a fresh one.
            *count = 0;
            *window_start = now;
        }
        *count = count.saturating_add(1);
        *count <= self.max_per_window
    }

    /// Drop entries whose window has expired, then re-arm both GC triggers.
    fn sweep_expired(&mut self, now: std::time::Instant) {
        let window = self.window;
        // Dropping an entry older than `window` is unobservable.
        // `check_and_record` would reset it to `(0, now)` on its next access,
        // which is exactly what a fresh insert produces.
        self.entries
            .retain(|_, (_, start)| now.duration_since(*start) <= window);
        self.last_gc = now;
        // Re-arm at twice the survivors. If the map is full of still-fresh
        // sources, the next size-triggered sweep waits until the map doubles,
        // which keeps sweeping amortized O(1) per packet.
        self.gc_size_threshold = self
            .entries
            .len()
            .saturating_mul(2)
            .max(Self::MIN_GC_SIZE_THRESHOLD);
    }
}
408
/// Net adapter for high-performance UDP transport.
///
/// `NetAdapter::new` builds it in an unconnected state: `socket` and
/// `session` start out as `None` (presumably populated by `init()`, which
/// the module-level example calls next — confirm). Decrypted inbound events
/// are queued per shard in `inbound` for `poll_shard` to drain.
pub struct NetAdapter {
    /// Adapter configuration, validated by `new` (role, addresses, keys,
    /// timeouts, handshake retry budget).
    config: NetAdapterConfig,
    /// UDP socket; `None` as constructed by `new`.
    socket: Option<Arc<Socket>>,
    /// The established session, stored separately for init; `None` as
    /// constructed by `new`.
    session: Option<Arc<NetSession>>,
    /// Session manager, created with `config.session_timeout`.
    session_manager: SessionMgr,
    /// Inbound events per shard (drained by `poll_shard`).
    inbound: InboundQueues,
    /// Handles of background tasks.
    tasks: TokioMutex<Vec<JoinHandle<()>>>,
    /// Shutdown flag polled by background tasks. Paired with
    /// `shutdown_notify`, which wakes tasks blocked on I/O.
    shutdown: Arc<AtomicBool>,
    /// Notify used to wake tasks blocked on I/O during shutdown.
    shutdown_notify: Arc<Notify>,
    /// Initialization state; `false` as constructed by `new`.
    initialized: AtomicBool,
    /// Per-source rate limiter consulted by the handshake responder loop.
    /// Without it, an attacker can flood handshake-flagged datagrams to
    /// monopolize the recv path or race a legitimate initiator's msg1.
    handshake_pacer: parking_lot::Mutex<HandshakePacer>,
}
435
436impl NetAdapter {
437    /// Create a new Net adapter.
438    pub fn new(config: NetAdapterConfig) -> Result<Self, AdapterError> {
439        config
440            .validate()
441            .map_err(|e| AdapterError::Fatal(format!("invalid config: {}", e)))?;
442
443        Ok(Self {
444            session_manager: SessionMgr::new(config.session_timeout),
445            config,
446            socket: None,
447            session: None,
448            inbound: Arc::new(DashMap::new()),
449            tasks: TokioMutex::new(Vec::new()),
450            shutdown: Arc::new(AtomicBool::new(false)),
451            shutdown_notify: Arc::new(Notify::new()),
452            initialized: AtomicBool::new(false),
453            // 5 attempts per second per source, plenty for any
454            // legitimate initiator (RTT-limited) and tight enough
455            // to throttle a flooder on consumer-grade hardware.
456            handshake_pacer: parking_lot::Mutex::new(HandshakePacer::new(
457                5,
458                std::time::Duration::from_secs(1),
459            )),
460        })
461    }
462
463    /// Perform Noise handshake with peer.
464    /// Returns session keys and the actual peer address (from the wire, not config).
465    async fn perform_handshake(
466        &self,
467        socket: &Socket,
468    ) -> Result<(SessionKeys, std::net::SocketAddr), AdapterError> {
469        let mut attempt = 0;
470        let max_attempts = self.config.handshake_retries;
471
472        // Cap per-attempt sleep so a misconfigured `handshake_retries`
473        // near `MAX_HANDSHAKE_RETRIES` (1024) cannot pin `init()` for
474        // hours. Pre-fix `100 * attempt` grew linearly and unbounded:
475        // attempt 1024 slept ~102s, with cumulative wait across all
476        // attempts approaching 14 hours. Capping at 5s gives bounded
477        // worst-case `max_attempts × 5s` (~85 minutes at the cap),
478        // which is still long but not unbounded.
479        const HANDSHAKE_RETRY_SLEEP_CAP_MS: u64 = 5_000;
480
481        loop {
482            attempt += 1;
483            match self.try_handshake(socket).await {
484                Ok(result) => return Ok(result),
485                Err(e) if attempt < max_attempts => {
486                    tracing::warn!(
487                        attempt = attempt,
488                        max = max_attempts,
489                        error = %e,
490                        "handshake failed, retrying"
491                    );
492                    let backoff_ms =
493                        (100u64.saturating_mul(attempt as u64)).min(HANDSHAKE_RETRY_SLEEP_CAP_MS);
494                    tokio::time::sleep(std::time::Duration::from_millis(backoff_ms)).await;
495                }
496                Err(e) => return Err(e),
497            }
498        }
499    }
500
501    /// Single handshake attempt.
502    /// Returns session keys and the actual peer address.
503    async fn try_handshake(
504        &self,
505        socket: &Socket,
506    ) -> Result<(SessionKeys, std::net::SocketAddr), AdapterError> {
507        let timeout = self.config.handshake_timeout;
508        let socket_arc = socket.socket_arc();
509
510        if self.config.is_initiator() {
511            // Initiator flow
512            let peer_pubkey = self
513                .config
514                .peer_static_pubkey
515                .as_ref()
516                .ok_or_else(|| AdapterError::Fatal("missing peer public key".into()))?;
517
518            let mut handshake = NoiseHandshake::initiator(&self.config.psk, peer_pubkey)
519                .map_err(|e| AdapterError::Fatal(format!("handshake init failed: {}", e)))?;
520
521            // Send first message
522            let msg1 = handshake
523                .write_message(&[])
524                .map_err(|e| AdapterError::Connection(format!("write_message failed: {}", e)))?;
525
526            let mut builder = PacketBuilder::new(&[0u8; 32], 0);
527            let packet = builder.build_handshake(&msg1);
528
529            socket
530                .send_to(&packet, self.config.peer_addr)
531                .await
532                .map_err(|e| AdapterError::Connection(format!("send failed: {}", e)))?;
533
534            // Receive response, discarding datagrams that are not handshake
535            // packets from the expected peer. This prevents stray traffic on
536            // the shared socket from consuming the handshake slot.
537            let (parsed, _source) = tokio::time::timeout(timeout, async {
538                // Stack buffer reused across loop iterations.
539                // `MAX_PACKET_SIZE` is 8192 bytes — small enough to
540                // live on the async stack without spilling, and the
541                // reuse drops the per-iteration `BytesMut::with_capacity`
542                // alloc on the stray-traffic path. Pre-fix every
543                // discarded datagram (an off-peer packet, an invalid
544                // handshake) allocated a fresh 8 KiB and freed it at
545                // loop end — under stray UDP traffic on the same
546                // bind port this churned the allocator. Only the
547                // success path now allocates (a `Bytes::copy_from_slice`
548                // sized to the actual payload, since `ParsedPacket`
549                // owns its `Bytes`).
550                let mut recv_buf = [0u8; protocol::MAX_PACKET_SIZE];
551                loop {
552                    let (n, source) = socket_arc
553                        .recv_from(&mut recv_buf)
554                        .await
555                        .map_err(|e| AdapterError::Connection(format!("recv failed: {}", e)))?;
556
557                    // Only accept packets from the peer we initiated with
558                    if source != self.config.peer_addr {
559                        continue;
560                    }
561
562                    let data = bytes::Bytes::copy_from_slice(&recv_buf[..n]);
563
564                    if let Some(p) = ParsedPacket::parse(data, source) {
565                        if p.header.flags.is_handshake() {
566                            return Ok::<_, AdapterError>((p, source));
567                        }
568                    }
569                    // Not a valid handshake packet from our peer — keep waiting
570                }
571            })
572            .await
573            .map_err(|_| AdapterError::Connection("handshake timeout".into()))??;
574
575            // Process response
576            handshake
577                .read_message(&parsed.payload)
578                .map_err(|e| AdapterError::Connection(format!("read_message failed: {}", e)))?;
579
580            // Extract session keys
581            let keys = handshake
582                .into_session_keys()
583                .map_err(|e| AdapterError::Fatal(format!("key extraction failed: {}", e)))?;
584            Ok((keys, self.config.peer_addr))
585        } else {
586            // Responder flow
587            let keypair = self
588                .config
589                .static_keypair
590                .as_ref()
591                .ok_or_else(|| AdapterError::Fatal("missing static keypair".into()))?;
592
593            // Wait for an initiator handshake message, discarding any
594            // non-handshake datagrams that arrive on the shared
595            // socket. Per-source pacing throttles flooders so the
596            // legitimate initiator's msg1 can land — without it,
597            // an attacker could blast handshake-flagged datagrams
598            // and monopolize this recv loop.
599            let (parsed, source) = tokio::time::timeout(timeout, async {
600                loop {
601                    let mut recv_buf = bytes::BytesMut::with_capacity(protocol::MAX_PACKET_SIZE);
602                    recv_buf.resize(protocol::MAX_PACKET_SIZE, 0);
603
604                    let (n, source) = socket_arc
605                        .recv_from(&mut recv_buf)
606                        .await
607                        .map_err(|e| AdapterError::Connection(format!("recv failed: {}", e)))?;
608
609                    recv_buf.truncate(n);
610                    let data = recv_buf.freeze();
611
612                    if let Some(p) = ParsedPacket::parse(data, source) {
613                        if p.header.flags.is_handshake() {
614                            // Per-source pacing: drop packets from
615                            // sources that exceed the budget.
616                            let allowed = self.handshake_pacer.lock().check_and_record(source);
617                            if !allowed {
618                                tracing::debug!(
619                                    %source,
620                                    "handshake responder: dropping packet from \
621                                     rate-limited source"
622                                );
623                                continue;
624                            }
625                            return Ok::<_, AdapterError>((p, source));
626                        }
627                    }
628                    // Not a valid handshake packet — keep waiting
629                }
630            })
631            .await
632            .map_err(|_| AdapterError::Connection("handshake timeout".into()))??;
633
634            let mut handshake = NoiseHandshake::responder(&self.config.psk, keypair)
635                .map_err(|e| AdapterError::Fatal(format!("handshake init failed: {}", e)))?;
636
637            // Process initiator message
638            handshake
639                .read_message(&parsed.payload)
640                .map_err(|e| AdapterError::Connection(format!("read_message failed: {}", e)))?;
641
642            // Send response
643            let msg2 = handshake
644                .write_message(&[])
645                .map_err(|e| AdapterError::Connection(format!("write_message failed: {}", e)))?;
646
647            let mut builder = PacketBuilder::new(&[0u8; 32], 0);
648            let packet = builder.build_handshake(&msg2);
649
650            // Reply to the actual source address (not the configured peer_addr),
651            // so the handshake completes even behind NAT or when the config is stale.
652            socket
653                .send_to(&packet, source)
654                .await
655                .map_err(|e| AdapterError::Connection(format!("send failed: {}", e)))?;
656
657            // Extract session keys and use the actual source address as peer
658            let keys = handshake
659                .into_session_keys()
660                .map_err(|e| AdapterError::Fatal(format!("key extraction failed: {}", e)))?;
661            Ok((keys, source))
662        }
663    }
664
665    /// Process a single received packet: parse, decrypt, and queue events.
666    fn process_packet(
667        data: Bytes,
668        source: std::net::SocketAddr,
669        session: &NetSession,
670        inbound: &InboundQueues,
671        num_shards: u16,
672    ) {
673        // Parse packet
674        let mut parsed = match ParsedPacket::parse(data, source) {
675            Some(p) => p,
676            None => return,
677        };
678
679        // Reject packets whose actual payload size doesn't match the declared
680        // length. This catches truncated or oversized packets before they
681        // reach the decrypt path.
682        if !parsed.header.flags.is_handshake()
683            && !parsed.header.flags.is_heartbeat()
684            && !parsed.is_valid_length()
685        {
686            return;
687        }
688
689        // Skip handshake packets in the data path (handled during init)
690        if parsed.header.flags.is_handshake() {
691            return;
692        }
693
694        // Validate session before any state mutation (including touch)
695        if parsed.header.session_id != session.session_id() {
696            return;
697        }
698
699        // Heartbeats are AEAD-tagged: the empty payload encrypts to
700        // a 16-byte Poly1305 tag, and the receiver verifies the
701        // tag here. Without this check, an off-path attacker who
702        // observed or guessed the session_id could spoof
703        // heartbeats and keep a session alive (the source-address
704        // check on UDP is itself spoofable, and session_id is in
705        // cleartext on every prior packet).
706        //
707        // We still require `source == peer_addr` as a cheap
708        // first-line filter so an unauthenticated flood doesn't
709        // get to do the AEAD verify.
710        //
711        // The verify+touch sequence lives inside
712        // `NetSession::verify_and_touch_heartbeat` so callers can't
713        // touch a session whose heartbeat failed verify, and can't
714        // forget to touch on success.
715        if parsed.header.flags.is_heartbeat() {
716            if source == session.peer_addr() {
717                session.verify_and_touch_heartbeat(&parsed);
718            }
719            return;
720        }
721
722        // Decrypt payload. Per crypto-session perf #128, route
723        // through `decrypt_to_bytes` so the common case
724        // (refcount-1 inbound buffer) decrypts in place instead
725        // of allocating a fresh `Vec<u8>` plaintext per packet.
726        let aad = parsed.header.aad();
727        let counter = u64::from_le_bytes(parsed.header.nonce[4..12].try_into().unwrap_or([0u8; 8]));
728        let rx_cipher = session.rx_cipher();
729        let payload = std::mem::take(&mut parsed.payload);
730        // Per crypto-session perf #132: collapsed the pre-decrypt
731        // `is_valid_rx_counter` + post-decrypt `update_rx_counter`
732        // two-step into one `try_admit_rx_counter` call. See
733        // `mesh.rs::process_local_packet` for the full rationale —
734        // the contract is identical (replays still rejected, TOCTOU
735        // still closed), and the redundant Mutex lock on every
736        // non-replay packet is gone.
737        let decrypted = match rx_cipher.decrypt_to_bytes(counter, &aad, payload) {
738            Ok(d) => {
739                if !rx_cipher.try_admit_rx_counter(counter) {
740                    return;
741                }
742                d
743            }
744            Err(_) => return,
745        };
746
747        // Parse events
748        let events = EventFrame::read_events(decrypted, parsed.header.event_count);
749
750        // Update stream state
751        let stream_id = parsed.header.stream_id;
752        let shard_id = if num_shards > 0 {
753            (stream_id % num_shards as u64) as u16
754        } else {
755            0
756        };
757
758        // Previously the boolean result of `r.on_receive(seq)` was
759        // discarded — a duplicate (NACK retransmit, rebroadcast,
760        // etc.) returned `false` but the events were still queued for
761        // poll_shard, breaking exactly-once delivery on reliable
762        // streams. The cipher's replay window doesn't catch this
763        // because each retransmit is re-encrypted with a fresh outer
764        // counter.
765        //
766        // Now: if `on_receive` reports a duplicate, we still call
767        // `session.touch()` (the peer is alive) but skip the queue
768        // step entirely — the original delivery already queued the
769        // events.
770        let is_fresh = {
771            let stream = session.get_or_create_stream(stream_id);
772            // `with_reliability` always invokes the closure (it
773            // locks an internal `Mutex<Box<dyn ReliabilityMode>>`).
774            // For streams without a meaningful reliability mode the
775            // implementation returns `true` for every `on_receive`,
776            // matching the historical "always queue" behavior.
777            let fresh = stream.with_reliability(|r| r.on_receive(parsed.header.sequence));
778            stream.update_rx_seq(parsed.header.sequence);
779            fresh
780        };
781
782        if is_fresh {
783            // Queue events for poll_shard
784            let queue = inbound.entry(shard_id).or_default();
785            let seq = parsed.header.sequence;
786            for (i, event_data) in events.into_iter().enumerate() {
787                use std::fmt::Write;
788                let mut event_id = String::with_capacity(24);
789                let _ = write!(event_id, "{}:{}", seq, i);
790                queue.push(StoredEvent::new(event_id, event_data, seq, shard_id));
791            }
792        } else {
793            tracing::debug!(
794                seq = parsed.header.sequence,
795                stream_id,
796                "Dropping duplicate packet"
797            );
798        }
799
800        session.touch();
801    }
802
803    /// Spawn receiver task.
804    ///
805    /// On Linux, uses a dedicated OS thread with batched recvmmsg for up to
806    /// 64 packets per syscall. On other platforms, uses standard async recv.
807    #[cfg(target_os = "linux")]
808    fn spawn_receiver(
809        shutdown: Arc<AtomicBool>,
810        shutdown_notify: Arc<Notify>,
811        socket: Arc<Socket>,
812        session: Arc<NetSession>,
813        inbound: InboundQueues,
814        num_shards: u16,
815    ) -> JoinHandle<()> {
816        let mut receiver = transport::BatchedPacketReceiver::new(socket.socket_arc());
817
818        tokio::spawn(async move {
819            while !shutdown.load(Ordering::Acquire) {
820                tokio::select! {
821                    result = receiver.recv() => {
822                        match result {
823                            Ok((data, source)) => {
824                                Self::process_packet(data, source, &session, &inbound, num_shards);
825                            }
826                            Err(e) if e.kind() == std::io::ErrorKind::ConnectionReset => {
827                                tracing::warn!("batch receiver thread exited, stopping receiver");
828                                break;
829                            }
830                            Err(e) => {
831                                if !shutdown.load(Ordering::Acquire) {
832                                    tracing::warn!(error = %e, "receive error");
833                                }
834                            }
835                        }
836                    }
837                    _ = shutdown_notify.notified() => {
838                        break;
839                    }
840                }
841            }
842        })
843    }
844
845    /// Spawn receiver task (non-Linux fallback).
846    #[cfg(not(target_os = "linux"))]
847    fn spawn_receiver(
848        shutdown: Arc<AtomicBool>,
849        shutdown_notify: Arc<Notify>,
850        socket: Arc<Socket>,
851        session: Arc<NetSession>,
852        inbound: InboundQueues,
853        num_shards: u16,
854    ) -> JoinHandle<()> {
855        tokio::spawn(async move {
856            let mut receiver = PacketReceiver::new(socket.socket_arc());
857
858            while !shutdown.load(Ordering::Acquire) {
859                // Race recv against shutdown notification so the task
860                // can exit promptly instead of blocking on recv_from
861                // until a packet arrives.
862                tokio::select! {
863                    result = receiver.recv() => {
864                        match result {
865                            Ok((data, source)) => {
866                                Self::process_packet(data, source, &session, &inbound, num_shards);
867                            }
868                            Err(e) => {
869                                if !shutdown.load(Ordering::Acquire) {
870                                    tracing::warn!(error = %e, "receive error");
871                                }
872                            }
873                        }
874                    }
875                    _ = shutdown_notify.notified() => {
876                        break;
877                    }
878                }
879            }
880        })
881    }
882
883    /// Spawn heartbeat task.
884    fn spawn_heartbeat(
885        shutdown: Arc<AtomicBool>,
886        shutdown_notify: Arc<Notify>,
887        socket: Arc<Socket>,
888        session: Arc<NetSession>,
889        interval: std::time::Duration,
890        peer_addr: std::net::SocketAddr,
891    ) -> JoinHandle<()> {
892        tokio::spawn(async move {
893            let mut ticker = tokio::time::interval(interval);
894
895            loop {
896                tokio::select! {
897                    _ = ticker.tick() => {
898                        if shutdown.load(Ordering::Acquire) || !session.is_active() {
899                            break;
900                        }
901
902                        // `Session::build_heartbeat` routes through
903                        // `thread_local_pool` (same pool the data
904                        // path uses) so heartbeats share a single
905                        // TX counter with data and interleave
906                        // correctly on the wire. Constructing a
907                        // bespoke `PacketBuilder::new(&[0u8; 32],
908                        // session.session_id())` per tick would
909                        // (a) use the wrong key so the receiver's
910                        // AEAD verify would reject every heartbeat,
911                        // and (b) reuse counter=0 across successive
912                        // heartbeats so the receiver's replay
913                        // window would reject every heartbeat
914                        // after the first.
915                        let packet = session.build_heartbeat();
916
917                        if let Err(e) = socket.send_to(&packet, peer_addr).await {
918                            tracing::warn!(error = %e, "heartbeat send failed");
919                        }
920                    }
921                    _ = shutdown_notify.notified() => {
922                        break;
923                    }
924                }
925            }
926        })
927    }
928}
929
930#[async_trait]
931impl Adapter for NetAdapter {
932    async fn init(&mut self) -> Result<(), AdapterError> {
933        if self.initialized.load(Ordering::Acquire) {
934            return Ok(());
935        }
936
937        // Create socket with configured buffer sizes
938        let socket_config = match (
939            self.config.socket_recv_buffer,
940            self.config.socket_send_buffer,
941        ) {
942            (Some(recv), Some(send)) => transport::SocketBufferConfig {
943                recv_buffer_size: recv,
944                send_buffer_size: send,
945            },
946            _ => transport::SocketBufferConfig::default(),
947        };
948        let socket = Socket::with_config(self.config.bind_addr, socket_config)
949            .await
950            .map_err(|e| AdapterError::Connection(format!("socket creation failed: {}", e)))?;
951
952        let socket = Arc::new(socket);
953        self.socket = Some(socket.clone());
954
955        // Perform handshake — actual_peer is the real address from the wire
956        let (keys, actual_peer) = self.perform_handshake(&socket).await?;
957
958        // Create packet pool with TX key
959        // Create session with the actual peer address (not the configured one,
960        // which may be stale or pre-NAT)
961        let session = Arc::new(NetSession::new(
962            keys,
963            actual_peer,
964            self.config.packet_pool_size,
965            self.config.default_reliability.is_reliable(),
966        ));
967        self.session = Some(session.clone());
968
969        // Store in session manager for health checks (same Arc as the active session)
970        self.session_manager.set_session_arc(session.clone());
971
972        // Spawn background tasks
973        let recv_task = Self::spawn_receiver(
974            self.shutdown.clone(),
975            self.shutdown_notify.clone(),
976            socket.clone(),
977            session.clone(),
978            self.inbound.clone(),
979            self.config.num_shards,
980        );
981
982        let heartbeat_task = Self::spawn_heartbeat(
983            self.shutdown.clone(),
984            self.shutdown_notify.clone(),
985            socket,
986            session,
987            self.config.heartbeat_interval,
988            actual_peer,
989        );
990
991        {
992            let mut tasks = self.tasks.lock().await;
993            tasks.push(recv_task);
994            tasks.push(heartbeat_task);
995        }
996
997        self.initialized.store(true, Ordering::Release);
998
999        tracing::info!(
1000            bind_addr = %self.config.bind_addr,
1001            peer_addr = %self.config.peer_addr,
1002            role = ?self.config.role,
1003            "Net adapter initialized"
1004        );
1005
1006        Ok(())
1007    }
1008
1009    async fn on_batch(&self, batch: std::sync::Arc<Batch>) -> Result<(), AdapterError> {
1010        let session = self
1011            .session
1012            .as_ref()
1013            .ok_or_else(|| AdapterError::Connection("not connected".into()))?;
1014
1015        let socket = self
1016            .socket
1017            .as_ref()
1018            .ok_or_else(|| AdapterError::Connection("socket not initialized".into()))?;
1019
1020        let stream_id = batch.shard_id as u64;
1021        let peer_addr = session.peer_addr();
1022
1023        // Read stream config under the lock, then drop it immediately.
1024        // Holding the DashMap RefMut across .await would deadlock against
1025        // the receiver task which also calls get_or_create_stream().
1026        let reliable = {
1027            let stream = session.get_or_create_stream(stream_id);
1028            stream.with_reliability(|r| r.needs_ack())
1029            // RefMut dropped here
1030        };
1031
1032        // Convert events to bytes and batch them
1033        let mut current_batch: Vec<Bytes> = Vec::with_capacity(64);
1034        let mut current_size = 0usize;
1035
1036        // Thread-local pool with counter-based nonces — zero contention
1037        let pool = session.thread_local_pool();
1038        let mut builder = pool.get();
1039
1040        for event in &batch.events {
1041            let event_bytes = event.raw.clone();
1042            let frame_size = EventFrame::LEN_SIZE + event_bytes.len();
1043
1044            // Check if adding this event would exceed packet size
1045            if current_size + frame_size > protocol::MAX_PAYLOAD_SIZE && !current_batch.is_empty() {
1046                // Acquire stream lock briefly for seq + reliability tracking
1047                let seq;
1048                {
1049                    let stream = session.get_or_create_stream(stream_id);
1050                    seq = stream.next_tx_seq();
1051                }
1052
1053                let flags = if reliable {
1054                    PacketFlags::RELIABLE
1055                } else {
1056                    PacketFlags::NONE
1057                };
1058
1059                let packet = builder.build(stream_id, seq, &current_batch, flags);
1060
1061                // No DashMap lock held during this .await
1062                socket
1063                    .send_to(&packet, peer_addr)
1064                    .await
1065                    .map_err(|e| AdapterError::Connection(format!("send failed: {}", e)))?;
1066
1067                // Track for reliability with PRE-encryption inputs.
1068                // Stashing the encrypted bytes was unsound: the
1069                // receiver's replay window rejects retransmits that
1070                // carry a stale wire counter. The descriptor lets
1071                // the retransmit driver call `builder.build` again
1072                // with a fresh counter.
1073                if reliable {
1074                    // Per perf #133 — Arc-wrap the descriptor before
1075                    // handing it to the reliability mode. The
1076                    // retransmit window then shares the inner
1077                    // `Vec<Bytes>` rather than holding an owned copy.
1078                    let descriptor = std::sync::Arc::new(reliability::RetransmitDescriptor {
1079                        seq,
1080                        stream_id,
1081                        events: current_batch.clone(),
1082                        flags,
1083                    });
1084                    let stream = session.get_or_create_stream(stream_id);
1085                    stream.with_reliability(|r| r.on_send(descriptor));
1086                }
1087
1088                current_batch.clear();
1089                current_size = 0;
1090            }
1091
1092            current_batch.push(event_bytes);
1093            current_size += frame_size;
1094        }
1095
1096        // Send remaining events
1097        if !current_batch.is_empty() {
1098            let seq;
1099            {
1100                let stream = session.get_or_create_stream(stream_id);
1101                seq = stream.next_tx_seq();
1102            }
1103
1104            let flags = if reliable {
1105                PacketFlags::RELIABLE
1106            } else {
1107                PacketFlags::NONE
1108            };
1109
1110            let packet = builder.build(stream_id, seq, &current_batch, flags);
1111
1112            socket
1113                .send_to(&packet, peer_addr)
1114                .await
1115                .map_err(|e| AdapterError::Connection(format!("send failed: {}", e)))?;
1116
1117            if reliable {
1118                // Per perf #133 — see the matching call site above.
1119                let descriptor = std::sync::Arc::new(reliability::RetransmitDescriptor {
1120                    seq,
1121                    stream_id,
1122                    events: current_batch.clone(),
1123                    flags,
1124                });
1125                let stream = session.get_or_create_stream(stream_id);
1126                stream.with_reliability(|r| r.on_send(descriptor));
1127            }
1128        }
1129
1130        session.touch();
1131
1132        Ok(())
1133    }
1134
1135    async fn poll_shard(
1136        &self,
1137        shard_id: u16,
1138        from_id: Option<&str>,
1139        limit: usize,
1140    ) -> Result<ShardPollResult, AdapterError> {
1141        let mut events = Vec::with_capacity(limit);
1142
1143        if let Some(queue) = self.inbound.get(&shard_id) {
1144            while events.len() < limit {
1145                if let Some(event) = queue.pop() {
1146                    if from_id.is_none() || event_id_gt(&event.id, from_id.unwrap_or("")) {
1147                        events.push(event);
1148                    }
1149                    // Events at or before the cursor have already been
1150                    // consumed — drop them instead of requeuing. Requeuing
1151                    // caused unbounded memory growth because these events
1152                    // can never pass an advancing cursor.
1153                } else {
1154                    break;
1155                }
1156            }
1157        }
1158
1159        let has_more = self
1160            .inbound
1161            .get(&shard_id)
1162            .map(|q| !q.is_empty())
1163            .unwrap_or(false);
1164        let next_id = events.last().map(|e| e.id.clone());
1165
1166        Ok(ShardPollResult {
1167            events,
1168            next_id,
1169            has_more,
1170        })
1171    }
1172
1173    async fn flush(&self) -> Result<(), AdapterError> {
1174        // For reliable streams, wait for all pending ACKs
1175        // Currently a no-op since we're fire-and-forget by default
1176        Ok(())
1177    }
1178
1179    async fn shutdown(&self) -> Result<(), AdapterError> {
1180        self.shutdown.store(true, Ordering::Release);
1181
1182        // Wake all tasks blocked on I/O so they can observe the shutdown flag.
1183        // notify_waiters wakes all current waiters (receiver + heartbeat).
1184        self.shutdown_notify.notify_waiters();
1185
1186        // Clear session
1187        self.session_manager.clear_session();
1188
1189        // Wait for tasks to complete
1190        let mut tasks = self.tasks.lock().await;
1191        for task in tasks.drain(..) {
1192            let _ = task.await;
1193        }
1194
1195        self.initialized.store(false, Ordering::Release);
1196
1197        tracing::info!("Net adapter shutdown complete");
1198
1199        Ok(())
1200    }
1201
1202    fn name(&self) -> &'static str {
1203        "net"
1204    }
1205
1206    async fn is_healthy(&self) -> bool {
1207        self.initialized.load(Ordering::Acquire) && self.session_manager.check_session()
1208    }
1209}
1210
1211impl std::fmt::Debug for NetAdapter {
1212    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1213        f.debug_struct("NetAdapter")
1214            .field("config", &self.config)
1215            .field("initialized", &self.initialized.load(Ordering::Relaxed))
1216            .finish()
1217    }
1218}
1219
/// Return `true` when event ID `a` sorts strictly after event ID `b`.
///
/// Event IDs have the shape `"seq:idx"`. Plain string comparison gets
/// multi-digit values wrong (`"9:0"` sorts after `"10:0"` byte-wise). Both
/// IDs are therefore split on the first `:` and compared as `(seq, idx)`
/// integer tuples. If either ID does not parse that way, the two raw strings
/// are compared lexicographically instead.
fn event_id_gt(a: &str, b: &str) -> bool {
    let as_numeric = |id: &str| -> Option<(u64, u64)> {
        let (seq_part, idx_part) = id.split_once(':')?;
        let seq: u64 = seq_part.parse().ok()?;
        let idx: u64 = idx_part.parse().ok()?;
        Some((seq, idx))
    };

    match as_numeric(a).zip(as_numeric(b)) {
        Some((lhs, rhs)) => lhs > rhs,
        // At least one side is malformed: fall back to byte-wise ordering.
        None => a > b,
    }
}
1237
1238#[cfg(test)]
1239mod tests {
1240    use super::*;
1241
1242    #[test]
1243    fn test_adapter_creation() {
1244        let psk = [0x42u8; 32];
1245        let peer_pubkey = [0x24u8; 32];
1246
1247        let config = NetAdapterConfig::initiator(
1248            "127.0.0.1:0".parse().unwrap(),
1249            "127.0.0.1:9999".parse().unwrap(),
1250            psk,
1251            peer_pubkey,
1252        );
1253
1254        let adapter = NetAdapter::new(config).unwrap();
1255        assert_eq!(adapter.name(), "net");
1256    }
1257
1258    #[test]
1259    fn test_shard_id_from_stream_id_uses_modulo() {
1260        // Regression: shard_id was computed as `stream_id as u16` (truncation),
1261        // which collides for stream IDs that differ only in upper bits.
1262        // The fix uses `stream_id % num_shards`.
1263        let num_shards: u16 = 8;
1264
1265        // Two stream IDs that are identical in their low 16 bits
1266        // but different overall must map to the same shard via modulo,
1267        // while truncation would also give the same result here.
1268        // More importantly, a large stream_id must stay within [0, num_shards).
1269        let stream_a: u64 = 0xDEAD_BEEF_0000_0003;
1270        let stream_b: u64 = 0xCAFE_BABE_0000_0003;
1271
1272        let shard_a = (stream_a % num_shards as u64) as u16;
1273        let shard_b = (stream_b % num_shards as u64) as u16;
1274
1275        assert!(
1276            shard_a < num_shards,
1277            "shard must be in range [0, num_shards)"
1278        );
1279        assert!(
1280            shard_b < num_shards,
1281            "shard must be in range [0, num_shards)"
1282        );
1283
1284        // Large stream IDs that would overflow u16 must still be valid shard IDs
1285        let big_stream: u64 = 0xFFFF_FFFF_FFFF_FFFF;
1286        let shard_big = (big_stream % num_shards as u64) as u16;
1287        assert!(shard_big < num_shards);
1288
1289        // Truncation would give 0xFFFF = 65535, which is >= num_shards.
1290        // Modulo gives a valid shard.
1291        assert_ne!(
1292            big_stream as u16, shard_big,
1293            "modulo must differ from truncation for large stream IDs"
1294        );
1295    }
1296
1297    #[test]
1298    fn test_invalid_config() {
1299        let psk = [0x42u8; 32];
1300        let peer_pubkey = [0x24u8; 32];
1301
1302        let mut config = NetAdapterConfig::initiator(
1303            "127.0.0.1:0".parse().unwrap(),
1304            "127.0.0.1:9999".parse().unwrap(),
1305            psk,
1306            peer_pubkey,
1307        );
1308        config.peer_static_pubkey = None;
1309
1310        let result = NetAdapter::new(config);
1311        assert!(result.is_err());
1312    }
1313
1314    // Regression: event_id_gt used lexicographic comparison, so "9:0" > "10:0"
1315    // was true (wrong). Now uses numeric comparison (BUGS_4 #2).
1316    #[test]
1317    fn test_event_id_gt_numeric_ordering() {
1318        // Basic ordering
1319        assert!(event_id_gt("2:0", "1:0"));
1320        assert!(!event_id_gt("1:0", "2:0"));
1321        assert!(!event_id_gt("1:0", "1:0"));
1322
1323        // The critical case: double-digit seq must compare correctly
1324        assert!(event_id_gt("10:0", "9:0"));
1325        assert!(event_id_gt("100:0", "99:0"));
1326        assert!(!event_id_gt("9:0", "10:0"));
1327
1328        // Index comparison within same sequence
1329        assert!(event_id_gt("5:2", "5:1"));
1330        assert!(!event_id_gt("5:1", "5:2"));
1331
1332        // Large sequences
1333        assert!(event_id_gt("1000000:0", "999999:0"));
1334    }
1335
    // Edge cases for event_id_gt, which poll_shard uses as its cursor filter.
    // poll_shard itself needs a full adapter setup, so it is covered only
    // indirectly here. (BUGS_4 #1 concerned poll_shard losing events that
    // failed the cursor filter.) Note that poll_shard now deliberately drops
    // popped events at or before the cursor instead of requeuing them: those
    // events have already been consumed, and requeuing caused unbounded
    // memory growth.
1341    #[test]
1342    fn test_event_id_gt_edge_cases() {
1343        // Empty strings
1344        assert!(event_id_gt("1:0", ""));
1345        // Malformed IDs fall back to string comparison
1346        assert!(event_id_gt("b", "a"));
1347        assert!(!event_id_gt("a", "b"));
1348    }
1349
1350    /// Regression: packets built by PacketBuilder must survive process_packet.
1351    /// This test bypasses the network and directly verifies the encrypt→decrypt
1352    /// data path, catching AAD mismatches, nonce construction bugs, and key
1353    /// derivation errors.
1354    #[test]
1355    fn test_build_then_process_packet_roundtrip() {
1356        use crate::adapter::net::crypto::{NoiseHandshake, StaticKeypair};
1357        use dashmap::DashMap;
1358        use std::sync::Arc;
1359
1360        // Perform a real handshake to get matching keys
1361        let psk = [0x42u8; 32];
1362        let responder_kp = StaticKeypair::generate();
1363
1364        let mut initiator = NoiseHandshake::initiator(&psk, &responder_kp.public).unwrap();
1365        let mut responder = NoiseHandshake::responder(&psk, &responder_kp).unwrap();
1366
1367        let msg1 = initiator.write_message(&[]).unwrap();
1368        responder.read_message(&msg1).unwrap();
1369        let msg2 = responder.write_message(&[]).unwrap();
1370        initiator.read_message(&msg2).unwrap();
1371
1372        let init_keys = initiator.into_session_keys().unwrap();
1373        let resp_keys = responder.into_session_keys().unwrap();
1374
1375        // Initiator builds a packet
1376        let mut builder = PacketBuilder::new(&init_keys.tx_key, init_keys.session_id);
1377        let events = vec![
1378            Bytes::from(r#"{"token":"hello"}"#),
1379            Bytes::from(r#"{"token":"world"}"#),
1380        ];
1381        let packet = builder.build(0, 0, &events, PacketFlags::NONE);
1382
1383        // Responder processes the packet
1384        let resp_session = Arc::new(NetSession::new(
1385            resp_keys,
1386            "127.0.0.1:5000".parse().unwrap(),
1387            4,
1388            false,
1389        ));
1390        let inbound: InboundQueues = Arc::new(DashMap::new());
1391        let source: std::net::SocketAddr = "127.0.0.1:5000".parse().unwrap();
1392
1393        NetAdapter::process_packet(packet, source, &resp_session, &inbound, 1);
1394
1395        // Events should be queued in shard 0
1396        let queue = inbound.get(&0).expect("shard 0 should have events");
1397        assert_eq!(queue.len(), 2, "expected 2 events, got {}", queue.len());
1398
1399        let e1 = queue.pop().unwrap();
1400        assert_eq!(&e1.raw[..], br#"{"token":"hello"}"#);
1401
1402        let e2 = queue.pop().unwrap();
1403        assert_eq!(&e2.raw[..], br#"{"token":"world"}"#);
1404    }
1405
1406    /// Helper: perform a Noise handshake and return matched key pairs.
1407    fn make_session_keys() -> (SessionKeys, SessionKeys) {
1408        use crate::adapter::net::crypto::{NoiseHandshake, StaticKeypair};
1409
1410        let psk = [0x42u8; 32];
1411        let responder_kp = StaticKeypair::generate();
1412
1413        let mut initiator = NoiseHandshake::initiator(&psk, &responder_kp.public).unwrap();
1414        let mut responder = NoiseHandshake::responder(&psk, &responder_kp).unwrap();
1415
1416        let msg1 = initiator.write_message(&[]).unwrap();
1417        responder.read_message(&msg1).unwrap();
1418        let msg2 = responder.write_message(&[]).unwrap();
1419        initiator.read_message(&msg2).unwrap();
1420
1421        (
1422            initiator.into_session_keys().unwrap(),
1423            responder.into_session_keys().unwrap(),
1424        )
1425    }
1426
1427    #[test]
1428    fn test_process_packet_rejects_truncated_packet() {
1429        use dashmap::DashMap;
1430        use std::sync::Arc;
1431
1432        let (init_keys, resp_keys) = make_session_keys();
1433
1434        // Build a valid packet, then truncate it
1435        let mut builder = PacketBuilder::new(&init_keys.tx_key, init_keys.session_id);
1436        let packet = builder.build(0, 0, &[Bytes::from_static(b"hello")], PacketFlags::NONE);
1437
1438        let resp_session = Arc::new(NetSession::new(
1439            resp_keys,
1440            "127.0.0.1:5000".parse().unwrap(),
1441            4,
1442            false,
1443        ));
1444        let inbound: InboundQueues = Arc::new(DashMap::new());
1445        let source: std::net::SocketAddr = "127.0.0.1:5000".parse().unwrap();
1446
1447        // Truncate: remove last 10 bytes (partial auth tag)
1448        let truncated = packet.slice(..packet.len() - 10);
1449        NetAdapter::process_packet(truncated, source, &resp_session, &inbound, 1);
1450        assert!(
1451            inbound.get(&0).is_none() || inbound.get(&0).unwrap().is_empty(),
1452            "truncated packet must be silently dropped"
1453        );
1454    }
1455
1456    #[test]
1457    fn test_process_packet_rejects_tampered_payload() {
1458        use dashmap::DashMap;
1459        use std::sync::Arc;
1460
1461        let (init_keys, resp_keys) = make_session_keys();
1462
1463        let mut builder = PacketBuilder::new(&init_keys.tx_key, init_keys.session_id);
1464        let packet = builder.build(0, 0, &[Bytes::from_static(b"hello")], PacketFlags::NONE);
1465
1466        let resp_session = Arc::new(NetSession::new(
1467            resp_keys,
1468            "127.0.0.1:5000".parse().unwrap(),
1469            4,
1470            false,
1471        ));
1472        let inbound: InboundQueues = Arc::new(DashMap::new());
1473        let source: std::net::SocketAddr = "127.0.0.1:5000".parse().unwrap();
1474
1475        // Tamper: flip a byte in the encrypted payload
1476        let mut tampered = bytes::BytesMut::from(&packet[..]);
1477        tampered[super::protocol::HEADER_SIZE + 2] ^= 0xFF;
1478        NetAdapter::process_packet(tampered.freeze(), source, &resp_session, &inbound, 1);
1479
1480        assert!(
1481            inbound.get(&0).is_none() || inbound.get(&0).unwrap().is_empty(),
1482            "tampered packet must be rejected by AEAD"
1483        );
1484    }
1485
1486    #[test]
1487    fn test_process_packet_rejects_wrong_session_id() {
1488        use dashmap::DashMap;
1489        use std::sync::Arc;
1490
1491        let (init_keys, resp_keys) = make_session_keys();
1492
1493        let mut builder = PacketBuilder::new(&init_keys.tx_key, init_keys.session_id);
1494        let packet = builder.build(0, 0, &[Bytes::from_static(b"hello")], PacketFlags::NONE);
1495
1496        // Create session with a DIFFERENT session_id
1497        let mut wrong_keys = resp_keys;
1498        wrong_keys.session_id = 0xDEAD;
1499        let resp_session = Arc::new(NetSession::new(
1500            wrong_keys,
1501            "127.0.0.1:5000".parse().unwrap(),
1502            4,
1503            false,
1504        ));
1505        let inbound: InboundQueues = Arc::new(DashMap::new());
1506        let source: std::net::SocketAddr = "127.0.0.1:5000".parse().unwrap();
1507
1508        NetAdapter::process_packet(packet, source, &resp_session, &inbound, 1);
1509
1510        assert!(
1511            inbound.get(&0).is_none() || inbound.get(&0).unwrap().is_empty(),
1512            "packet with wrong session_id must be dropped"
1513        );
1514    }
1515
1516    #[test]
1517    fn test_process_packet_multi_packet_batch_all_events_arrive() {
1518        use dashmap::DashMap;
1519        use std::sync::Arc;
1520
1521        let (init_keys, resp_keys) = make_session_keys();
1522
1523        let resp_session = Arc::new(NetSession::new(
1524            resp_keys,
1525            "127.0.0.1:5000".parse().unwrap(),
1526            4,
1527            false,
1528        ));
1529        let inbound: InboundQueues = Arc::new(DashMap::new());
1530        let source: std::net::SocketAddr = "127.0.0.1:5000".parse().unwrap();
1531
1532        // Build events large enough to span multiple packets.
1533        // Each event is ~200 bytes, MAX_PAYLOAD_SIZE is ~8112, so ~40 per packet.
1534        // 200 events → ~5 packets.
1535        let mut builder = PacketBuilder::new(&init_keys.tx_key, init_keys.session_id);
1536        let total_events = 200;
1537        let mut seq = 0u64;
1538
1539        // Simulate on_batch splitting into multiple packets
1540        let mut current_batch: Vec<Bytes> = Vec::new();
1541        let mut current_size = 0;
1542
1543        for i in 0..total_events {
1544            let data = format!("{{\"i\":{},\"pad\":\"{}\"}}", i, "x".repeat(150));
1545            let event_bytes = Bytes::from(data);
1546            let frame_size = EventFrame::LEN_SIZE + event_bytes.len();
1547
1548            if current_size + frame_size > protocol::MAX_PAYLOAD_SIZE && !current_batch.is_empty() {
1549                let packet = builder.build(0, seq, &current_batch, PacketFlags::NONE);
1550                NetAdapter::process_packet(packet, source, &resp_session, &inbound, 1);
1551                seq += 1;
1552                current_batch.clear();
1553                current_size = 0;
1554            }
1555
1556            current_batch.push(event_bytes);
1557            current_size += frame_size;
1558        }
1559
1560        if !current_batch.is_empty() {
1561            let packet = builder.build(0, seq, &current_batch, PacketFlags::NONE);
1562            NetAdapter::process_packet(packet, source, &resp_session, &inbound, 1);
1563        }
1564
1565        // All events must arrive
1566        let queue = inbound.get(&0).expect("shard 0 should have events");
1567        assert_eq!(
1568            queue.len(),
1569            total_events,
1570            "all {} events must arrive across multiple packets",
1571            total_events
1572        );
1573    }
1574
1575    #[test]
1576    fn test_build_then_process_packet_both_directions() {
1577        use dashmap::DashMap;
1578        use std::sync::Arc;
1579
1580        let (init_keys, resp_keys) = make_session_keys();
1581        let source: std::net::SocketAddr = "127.0.0.1:5000".parse().unwrap();
1582
1583        // Direction 1: initiator → responder
1584        {
1585            let mut builder = PacketBuilder::new(&init_keys.tx_key, init_keys.session_id);
1586            let packet = builder.build(0, 0, &[Bytes::from_static(b"i2r")], PacketFlags::NONE);
1587
1588            let session = Arc::new(NetSession::new(resp_keys.clone(), source, 4, false));
1589            let inbound: InboundQueues = Arc::new(DashMap::new());
1590            NetAdapter::process_packet(packet, source, &session, &inbound, 1);
1591
1592            let queue = inbound.get(&0).expect("i2r: shard 0 should have events");
1593            assert_eq!(queue.len(), 1, "i2r: expected 1 event");
1594            assert_eq!(&queue.pop().unwrap().raw[..], b"i2r");
1595        }
1596
1597        // Direction 2: responder → initiator
1598        {
1599            let mut builder = PacketBuilder::new(&resp_keys.tx_key, resp_keys.session_id);
1600            let packet = builder.build(0, 0, &[Bytes::from_static(b"r2i")], PacketFlags::NONE);
1601
1602            let session = Arc::new(NetSession::new(init_keys.clone(), source, 4, false));
1603            let inbound: InboundQueues = Arc::new(DashMap::new());
1604            NetAdapter::process_packet(packet, source, &session, &inbound, 1);
1605
1606            let queue = inbound.get(&0).expect("r2i: shard 0 should have events");
1607            assert_eq!(queue.len(), 1, "r2i: expected 1 event");
1608            assert_eq!(&queue.pop().unwrap().raw[..], b"r2i");
1609        }
1610    }
1611
1612    #[test]
1613    fn test_poll_shard_cursor_drops_consumed_events() {
1614        // Verify that poll_shard with a cursor drops events at or before
1615        // the cursor (they've already been consumed) and returns only
1616        // events after the cursor. The queue should be empty afterward —
1617        // no unbounded requeue growth.
1618        use std::sync::Arc;
1619
1620        let (init_keys, resp_keys) = make_session_keys();
1621
1622        let resp_session = Arc::new(NetSession::new(
1623            resp_keys,
1624            "127.0.0.1:5000".parse().unwrap(),
1625            4,
1626            false,
1627        ));
1628        let inbound: InboundQueues = Arc::new(DashMap::new());
1629        let source: std::net::SocketAddr = "127.0.0.1:5000".parse().unwrap();
1630
1631        // Send 3 packets (sequences 0, 1, 2), each with 1 event
1632        let mut builder = PacketBuilder::new(&init_keys.tx_key, init_keys.session_id);
1633        for seq in 0..3u64 {
1634            let events = vec![Bytes::from(format!("event-{}", seq))];
1635            let packet = builder.build(0, seq, &events, PacketFlags::NONE);
1636            NetAdapter::process_packet(packet, source, &resp_session, &inbound, 1);
1637        }
1638
1639        let queue = inbound.get(&0u16).unwrap();
1640        assert_eq!(queue.len(), 3);
1641
1642        // Simulate poll_shard with cursor "0:0" — drops event 0:0,
1643        // returns events 1:0 and 2:0
1644        let from_id = "0:0";
1645        let mut events = Vec::new();
1646        while events.len() < 10 {
1647            if let Some(event) = queue.pop() {
1648                if event_id_gt(&event.id, from_id) {
1649                    events.push(event);
1650                }
1651                // Events at/before cursor are dropped (not requeued)
1652            } else {
1653                break;
1654            }
1655        }
1656
1657        assert_eq!(events.len(), 2, "should get 2 events after cursor 0:0");
1658        assert_eq!(events[0].id, "1:0");
1659        assert_eq!(events[1].id, "2:0");
1660
1661        // Queue should be empty — consumed events are dropped, not requeued
1662        assert_eq!(queue.len(), 0, "queue should be empty after poll drains it");
1663    }
1664
1665    #[test]
1666    fn test_process_packet_old_counter_rejected() {
1667        // Verify that a packet with a counter below the replay window
1668        // is rejected after the window has advanced.
1669        use std::sync::Arc;
1670
1671        let (init_keys, resp_keys) = make_session_keys();
1672        let resp_session = Arc::new(NetSession::new(
1673            resp_keys,
1674            "127.0.0.1:5000".parse().unwrap(),
1675            4,
1676            false,
1677        ));
1678        let inbound: InboundQueues = Arc::new(DashMap::new());
1679        let source: std::net::SocketAddr = "127.0.0.1:5000".parse().unwrap();
1680
1681        // Send 1100 packets to advance the rx_counter past the replay window (1024)
1682        let mut builder = PacketBuilder::new(&init_keys.tx_key, init_keys.session_id);
1683        for seq in 0..1100u64 {
1684            let packet = builder.build(0, seq, &[Bytes::from_static(b"x")], PacketFlags::NONE);
1685            NetAdapter::process_packet(packet, source, &resp_session, &inbound, 1);
1686        }
1687        assert_eq!(inbound.get(&0).unwrap().len(), 1100);
1688
1689        // Build a packet with a fresh builder whose counter starts at 0.
1690        // The rx_counter is now at ~1100, so counter 0 is outside the
1691        // 1024-wide replay window and must be rejected.
1692        let mut stale_builder = PacketBuilder::new(&init_keys.tx_key, init_keys.session_id);
1693        let stale_packet =
1694            stale_builder.build(0, 9999, &[Bytes::from_static(b"stale")], PacketFlags::NONE);
1695        NetAdapter::process_packet(stale_packet, source, &resp_session, &inbound, 1);
1696
1697        // Should still be 1100 — stale packet rejected
1698        assert_eq!(
1699            inbound.get(&0).unwrap().len(),
1700            1100,
1701            "packet with stale counter must be rejected"
1702        );
1703    }
1704
1705    #[test]
1706    fn test_process_packet_far_future_counter_rejected() {
1707        // Verify that a packet with a counter far beyond MAX_FORWARD is
1708        // rejected, preventing an attacker from advancing the rx_counter
1709        // and denying subsequent legitimate packets.
1710        use std::sync::Arc;
1711
1712        let (_init_keys, resp_keys) = make_session_keys();
1713
1714        // Build a valid packet, then manually tamper the nonce counter
1715        // to a huge value. The AEAD will fail because the nonce doesn't
1716        // match, but we're testing that is_valid_rx_counter rejects it
1717        // before even attempting decryption.
1718        let resp_session = Arc::new(NetSession::new(
1719            resp_keys,
1720            "127.0.0.1:5000".parse().unwrap(),
1721            4,
1722            false,
1723        ));
1724
1725        // Directly test the cipher's counter validation
1726        let rx_cipher = resp_session.rx_cipher();
1727        assert!(
1728            !rx_cipher.is_valid_rx_counter(u64::MAX),
1729            "counter at u64::MAX must be rejected (far beyond MAX_FORWARD)"
1730        );
1731        assert!(
1732            rx_cipher.is_valid_rx_counter(0),
1733            "counter 0 should be valid initially"
1734        );
1735    }
1736
1737    /// Regression: BUG_REPORT.md #5 — `process_packet` previously
1738    /// discarded the bool returned by `r.on_receive(seq)` on the
1739    /// reliability layer, queueing events even for duplicates.
1740    /// Each retransmit re-encrypts with a fresh outer counter, so
1741    /// the cipher's replay window does not catch this; without
1742    /// honoring `on_receive`, the inbound queue accumulates
1743    /// duplicates and breaks exactly-once delivery on reliable
1744    /// streams.
1745    ///
1746    /// We construct the duplicate-detection scenario by building
1747    /// two distinct packets that share the same stream sequence.
1748    /// On a reliable session the second one's `on_receive` returns
1749    /// `false`, so `process_packet` must not enqueue its events.
1750    /// (The cipher's outer counter is fresh on both packets, so
1751    /// the replay window can't filter them — only the reliability
1752    /// layer's check stops the duplicate.)
1753    #[test]
1754    fn process_packet_drops_duplicates_per_reliability_decision() {
1755        use dashmap::DashMap;
1756        use std::sync::Arc;
1757
1758        let (init_keys, resp_keys) = make_session_keys();
1759
1760        // Reliable session — its streams use `ReliableStream`,
1761        // whose `on_receive` returns `false` for `seq <
1762        // next_expected` (duplicates).
1763        let resp_session = Arc::new(NetSession::new(
1764            resp_keys,
1765            "127.0.0.1:5000".parse().unwrap(),
1766            4,
1767            true, // default_reliable
1768        ));
1769        let inbound: InboundQueues = Arc::new(DashMap::new());
1770        let source: std::net::SocketAddr = "127.0.0.1:5000".parse().unwrap();
1771
1772        // Two packets on stream 7. First carries sequences 0..1,
1773        // second is a duplicate (same seq=0) that should be
1774        // filtered. We deliver seq=0 then seq=1 first to advance
1775        // `next_expected` past 0, then a packet with seq=0 — that
1776        // last one is the duplicate.
1777        let mut builder = PacketBuilder::new(&init_keys.tx_key, init_keys.session_id);
1778        let packet0 = builder.build(7, 0, &[Bytes::from(r#"{"first":0}"#)], PacketFlags::NONE);
1779        let packet1 = builder.build(7, 1, &[Bytes::from(r#"{"first":1}"#)], PacketFlags::NONE);
1780        // Re-encrypted retransmit of seq=0 — same stream, same seq,
1781        // different payload. This is the scenario the bug allowed
1782        // through.
1783        let packet0_dup = builder.build(
1784            7,
1785            0,
1786            &[Bytes::from(r#"{"dup":"should_not_appear"}"#)],
1787            PacketFlags::NONE,
1788        );
1789
1790        NetAdapter::process_packet(packet0, source, &resp_session, &inbound, 1);
1791        NetAdapter::process_packet(packet1, source, &resp_session, &inbound, 1);
1792        NetAdapter::process_packet(packet0_dup, source, &resp_session, &inbound, 1);
1793
1794        let queue = inbound.get(&0).expect("shard 0 should exist");
1795        assert_eq!(
1796            queue.len(),
1797            2,
1798            "duplicate packet must NOT enqueue (BUG_REPORT.md #5); \
1799             got {} events, expected exactly 2 (seq=0 and seq=1, no dup)",
1800            queue.len()
1801        );
1802
1803        // Drain in FIFO order and assert no `should_not_appear`
1804        // event sneaked through.
1805        let e0 = queue.pop().unwrap();
1806        assert_eq!(&e0.raw[..], br#"{"first":0}"#);
1807        let e1 = queue.pop().unwrap();
1808        assert_eq!(&e1.raw[..], br#"{"first":1}"#);
1809        assert!(queue.is_empty());
1810    }
1811
1812    /// Regression: heartbeats must be AEAD-authenticated so an
1813    /// off-path attacker who knows or observes the session_id
1814    /// cannot spoof them. Pre-fix the receiver only checked
1815    /// `source == peer_addr` (UDP source — spoofable) and
1816    /// `session_id` match (in cleartext on every packet); now the
1817    /// 16-byte Poly1305 tag binds the heartbeat to the session
1818    /// key.
1819    #[test]
1820    fn heartbeat_is_aead_authenticated() {
1821        use crate::adapter::net::pool::PacketBuilder;
1822        use dashmap::DashMap;
1823        use std::sync::Arc;
1824
1825        let (init_keys, resp_keys) = make_session_keys();
1826
1827        let resp_session = Arc::new(NetSession::new(
1828            resp_keys,
1829            "127.0.0.1:5000".parse().unwrap(),
1830            4,
1831            false,
1832        ));
1833        let inbound: InboundQueues = Arc::new(DashMap::new());
1834        let source: std::net::SocketAddr = "127.0.0.1:5000".parse().unwrap();
1835
1836        // Build a legitimate heartbeat with the initiator's
1837        // session key and tag it.
1838        let mut builder = PacketBuilder::new(&init_keys.tx_key, init_keys.session_id);
1839        let heartbeat = builder.build_heartbeat();
1840        let last_activity_before = resp_session.last_activity_ns();
1841        std::thread::sleep(std::time::Duration::from_millis(2));
1842
1843        // Process: this must succeed and call session.touch().
1844        NetAdapter::process_packet(heartbeat, source, &resp_session, &inbound, 1);
1845        let last_activity_after = resp_session.last_activity_ns();
1846        assert!(
1847            last_activity_after > last_activity_before,
1848            "legitimate AEAD-tagged heartbeat must call session.touch()"
1849        );
1850
1851        // Forge an unauthenticated heartbeat: header-only, no tag.
1852        // Pre-fix this would have passed; post-fix it must be
1853        // rejected.
1854        let mut forged = bytes::BytesMut::new();
1855        let header = NetHeader::heartbeat(resp_session.session_id());
1856        forged.extend_from_slice(&header.to_bytes());
1857        let forged = forged.freeze();
1858        let last_activity_before = resp_session.last_activity_ns();
1859        std::thread::sleep(std::time::Duration::from_millis(2));
1860        NetAdapter::process_packet(forged, source, &resp_session, &inbound, 1);
1861        let last_activity_after = resp_session.last_activity_ns();
1862        assert_eq!(
1863            last_activity_before, last_activity_after,
1864            "unauthenticated heartbeat (no AEAD tag) must NOT touch the session"
1865        );
1866
1867        // Forge a heartbeat with the right session_id but a
1868        // garbage 16-byte "tag". Tag verification fails.
1869        let mut forged_tag = bytes::BytesMut::new();
1870        let mut header_bytes = NetHeader::heartbeat(resp_session.session_id()).to_bytes();
1871        // Stamp a plausible nonce so the receiver gets to the
1872        // decrypt step (otherwise it bails earlier on counter).
1873        header_bytes[12..16].copy_from_slice(&[0u8; 4]);
1874        header_bytes[16..24].copy_from_slice(&1u64.to_le_bytes());
1875        forged_tag.extend_from_slice(&header_bytes);
1876        forged_tag.extend_from_slice(&[0xAAu8; 16]); // garbage tag
1877        let forged_tag = forged_tag.freeze();
1878        let last_activity_before = resp_session.last_activity_ns();
1879        std::thread::sleep(std::time::Duration::from_millis(2));
1880        NetAdapter::process_packet(forged_tag, source, &resp_session, &inbound, 1);
1881        let last_activity_after = resp_session.last_activity_ns();
1882        assert_eq!(
1883            last_activity_before, last_activity_after,
1884            "heartbeat with garbage AEAD tag must NOT touch the session"
1885        );
1886    }
1887
1888    /// Regression: the handshake responder must rate-limit per
1889    /// source so a flooder can't monopolize the recv loop.
1890    /// `HandshakePacer` is the building block: it tracks
1891    /// `(count, window_start)` per source and rejects after
1892    /// `max_per_window` attempts within `window`.
1893    #[test]
1894    fn handshake_pacer_rejects_floods_per_source() {
1895        use std::time::Duration;
1896        let mut pacer = HandshakePacer::new(3, Duration::from_millis(50));
1897
1898        let attacker: std::net::SocketAddr = "10.0.0.1:9000".parse().unwrap();
1899        let legit: std::net::SocketAddr = "10.0.0.2:9000".parse().unwrap();
1900
1901        // Attacker fires 3 attempts — all allowed (within budget).
1902        for _ in 0..3 {
1903            assert!(pacer.check_and_record(attacker));
1904        }
1905        // Fourth and beyond — rejected.
1906        for _ in 0..10 {
1907            assert!(
1908                !pacer.check_and_record(attacker),
1909                "attacker exceeding budget must be dropped"
1910            );
1911        }
1912
1913        // The legitimate initiator (different source) is unaffected
1914        // by the attacker's burst — the budget is per-source.
1915        assert!(
1916            pacer.check_and_record(legit),
1917            "legitimate source must still get through despite attacker flood"
1918        );
1919
1920        // After the window expires the attacker's budget refills.
1921        std::thread::sleep(Duration::from_millis(55));
1922        assert!(
1923            pacer.check_and_record(attacker),
1924            "attacker budget must refill after window"
1925        );
1926    }
1927}