1mod batch;
34pub mod behavior;
35#[cfg_attr(not(feature = "cortex"), allow(dead_code))]
42mod cancel_registry;
43pub mod channel;
44pub mod compute;
45mod config;
46pub mod contested;
47pub mod continuity;
48#[cfg(feature = "cortex")]
49pub mod cortex;
50mod crypto;
51mod failure;
52pub mod identity;
53mod mesh;
54#[cfg(feature = "dataforts")]
61pub mod dataforts;
62#[cfg(feature = "cortex")]
63pub mod mesh_rpc;
64#[cfg(feature = "cortex")]
65pub mod mesh_rpc_metrics;
66#[cfg(feature = "netdb")]
67pub mod netdb;
68mod pool;
69mod protocol;
70mod proxy;
71#[cfg(feature = "redex")]
72pub mod redex;
73mod reliability;
74mod reroute;
75mod route;
76mod router;
77mod session;
78pub mod state;
79mod stream;
80pub mod subnet;
81pub mod subprotocol;
82mod swarm;
83mod transport;
84#[cfg(feature = "nat-traversal")]
85pub mod traversal;
86
87#[cfg(target_os = "linux")]
88mod linux;
89
90pub use batch::AdaptiveBatcher;
91pub use channel::{
92 AckReason, AuthGuard, AuthVerdict, ChannelConfig, ChannelConfigRegistry, ChannelError,
93 ChannelHash, ChannelId, ChannelName, ChannelPublisher, ChannelRegistry, MembershipMsg,
94 OnFailure, PublishConfig, PublishReport, SubscriberRoster, Visibility,
95 SUBPROTOCOL_CHANNEL_MEMBERSHIP,
96};
97pub use compute::{
98 DaemonError, DaemonFactoryRegistry, DaemonHost, DaemonHostConfig, DaemonRegistry, DaemonStats,
99 FactoryEntry, MeshDaemon, MigrationError, MigrationMessage, MigrationOrchestrator,
100 MigrationPhase, MigrationSourceHandler, MigrationState, MigrationTargetHandler,
101 PlacementDecision, Scheduler, SchedulerError, SUBPROTOCOL_MIGRATION,
102};
103pub use config::{ConnectionRole, NetAdapterConfig, ReliabilityConfig};
104pub use contested::{
105 CorrelatedFailureConfig, CorrelatedFailureDetector, CorrelationVerdict, FailureCause,
106 PartitionDetector, PartitionPhase, PartitionRecord, ReconcileOutcome, Side,
107 SUBPROTOCOL_PARTITION,
108};
109pub use continuity::{
110 assess_continuity, CausalCone, Causality, ContinuityProof, ContinuityStatus, Discontinuity,
111 DiscontinuityReason, ForkRecord, HorizonDivergence, ObservationWindow, ProofError,
112 PropagationModel, SuperpositionPhase, SuperpositionState, SUBPROTOCOL_CONTINUITY,
113};
114#[cfg(feature = "cortex")]
115pub use cortex::{
116 CortexAdapter, CortexAdapterConfig, CortexAdapterError, EventEnvelope, EventMeta,
117 FoldErrorPolicy, IntoRedexPayload, StartPosition, EVENT_META_SIZE,
118};
119pub use crypto::{CryptoError, SessionKeys, StaticKeypair};
120pub use failure::{
121 CircuitBreaker, CircuitState, FailureDetector, FailureDetectorConfig, FailureStats,
122 LossSimulator, NodeStatus, RecoveryAction, RecoveryManager, RecoveryStats,
123};
124pub use identity::{
125 EntityError, EntityId, EntityKeypair, OriginStamp, PermissionToken, TokenCache, TokenError,
126 TokenScope,
127};
128pub use mesh::{MeshNode, MeshNodeConfig, PartitionFilter};
129#[cfg(feature = "netdb")]
130pub use netdb::{MemoriesFilter, NetDb, NetDbBuilder, NetDbError, NetDbSnapshot, TasksFilter};
131pub use pool::{PacketBuilder, PacketPool, SharedLocalPool, ThreadLocalPool};
137pub use protocol::{
138 EventFrame, NackPayload, NetHeader, PacketFlags, HEADER_SIZE, NONCE_SIZE, TAG_SIZE,
139};
140pub use proxy::{
141 ForwardResult, HopStats, MultiHopPacketBuilder, NetProxy, ProxyConfig, ProxyError, ProxyStats,
142};
143#[cfg(feature = "redex")]
144pub use redex::{
145 FsyncPolicy, IndexOp, IndexStart, OrderedAppender, Redex, RedexEntry, RedexError, RedexEvent,
146 RedexFile, RedexFileConfig, RedexFlags, RedexFold, RedexIndex, TypedRedexFile,
147};
148pub use reliability::{FireAndForget, ReliabilityMode, ReliableStream, RetransmitDescriptor};
149pub use reroute::ReroutePolicy;
150pub use route::{
151 AggregateStats, RouteEntry, RouteFlags, RoutingHeader, RoutingTable, SchedulerStreamStats,
152 ROUTING_HEADER_SIZE,
153};
154pub use router::{FairScheduler, NetRouter, RouteAction, RouterConfig, RouterError, RouterStats};
155#[doc(hidden)]
159pub use router::{
160 arm_send_drain_histo, send_batch_stats, send_drain_histo_snapshot, send_drain_max,
161};
162pub use session::{NetSession, SessionManager, StreamState, TxAdmit, TxSlotGuard};
163pub use state::{
164 CausalChainBuilder, CausalEvent, CausalLink, ChainError, EntityLog, HorizonEncoder, LogError,
165 LogIndex, ObservedHorizon, SnapshotStore, StateSnapshot, CAUSAL_LINK_SIZE, SUBPROTOCOL_CAUSAL,
166 SUBPROTOCOL_SNAPSHOT,
167};
168pub use stream::{
169 CloseBehavior, Reliability, Stream, StreamConfig, StreamError, StreamStats,
170 DEFAULT_STREAM_WINDOW_BYTES,
171};
172pub use subnet::{DropReason, ForwardDecision, SubnetGateway, SubnetId, SubnetPolicy, SubnetRule};
173pub use subprotocol::{
174 negotiate, MigrationSubprotocolHandler, NegotiatedSet, OutboundMigrationMessage,
175 SubprotocolDescriptor, SubprotocolManifest, SubprotocolRegistry, SubprotocolVersion,
176 SUBPROTOCOL_NEGOTIATION,
177};
178pub use swarm::{
179 Capabilities, CapabilityAd, EdgeInfo, GraphStats, LocalGraph, NodeInfo, Pingwave,
180 MAX_GRAPH_NODES, MAX_SEEN_PINGWAVES, PINGWAVE_SIZE,
181};
182pub use transport::{NetSocket, PacketReceiver, PacketSender, ParsedPacket, SocketBufferConfig};
183#[cfg(feature = "batched-ingress")]
189#[doc(hidden)]
190pub use transport::{
191 arm_recv_drain_histo, recv_batch_stats, recv_drain_histo_snapshot, recv_drain_max,
192 RECV_DRAIN_BUCKETS,
193};
194
195use async_trait::async_trait;
196use bytes::Bytes;
197use crossbeam_queue::SegQueue;
198use dashmap::DashMap;
199use std::sync::atomic::{AtomicBool, Ordering};
200use std::sync::Arc;
201use tokio::sync::Mutex as TokioMutex;
202use tokio::sync::Notify;
203use tokio::task::JoinHandle;
204
205use crate::adapter::{Adapter, ShardPollResult};
206use crate::error::AdapterError;
207use crate::event::{Batch, StoredEvent};
208
209use crypto::NoiseHandshake;
210use session::SessionManager as SessionMgr;
211use transport::NetSocket as Socket;
212
213pub use routing::{route_to_shard, stream_id_from_bytes, stream_id_from_key};
215
/// Maximum age, in nanoseconds, of a cached wall-clock reading before
/// [`current_timestamp`] reads the system clock again.
const COARSE_CLOCK_REFRESH_NS: u64 = 1_000_000;

/// Returns a coarse wall-clock timestamp in nanoseconds since the Unix epoch.
///
/// Each thread keeps its own cached reading; the system clock is consulted
/// only when that reading is at least [`COARSE_CLOCK_REFRESH_NS`] old, as
/// measured by the monotonic clock. A system clock set before the epoch reads
/// as `0`, and a value that does not fit in `u64` saturates to `u64::MAX`.
#[inline]
pub(crate) fn current_timestamp() -> u64 {
    thread_local! {
        static COARSE_CLOCK: std::cell::Cell<Option<(std::time::Instant, u64)>>
            = const { std::cell::Cell::new(None) };
    }

    // Wall-clock read used only on a cache miss.
    fn wall_clock_ns() -> u64 {
        let since_epoch = std::time::SystemTime::now()
            .duration_since(std::time::UNIX_EPOCH)
            .unwrap_or_default();
        u64::try_from(since_epoch.as_nanos()).unwrap_or(u64::MAX)
    }

    COARSE_CLOCK.with(|slot| {
        let observed_at = std::time::Instant::now();
        let (refreshed, timestamp) = coarse_clock_advance(slot.get(), observed_at, wall_clock_ns);
        // Only write the cell on a refresh; a cache hit leaves it untouched.
        if refreshed.is_some() {
            slot.set(refreshed);
        }
        timestamp
    })
}

/// Pure decision step behind [`current_timestamp`].
///
/// Given the cached `(instant, reading)` pair (if any) and the current
/// monotonic instant, returns `(new_cache_entry, timestamp)`:
///
/// * cache hit (cached entry younger than the refresh window): `(None, cached reading)`
///   and `read_wall` is never invoked;
/// * otherwise: `read_wall` is invoked once and the result is returned both as
///   the timestamp and inside `Some((now_inst, reading))` for the caller to store.
#[inline]
fn coarse_clock_advance(
    cached: Option<(std::time::Instant, u64)>,
    now_inst: std::time::Instant,
    read_wall: impl FnOnce() -> u64,
) -> (Option<(std::time::Instant, u64)>, u64) {
    let refresh_window = u128::from(COARSE_CLOCK_REFRESH_NS);
    match cached {
        Some((stamped_at, reading))
            if now_inst.duration_since(stamped_at).as_nanos() < refresh_window =>
        {
            (None, reading)
        }
        _ => {
            let fresh = read_wall();
            (Some((now_inst, fresh)), fresh)
        }
    }
}
317
/// Returns the current wall-clock time in microseconds since the Unix epoch.
///
/// Reads the system clock on every call. Yields `0` when the system clock is
/// set before the epoch.
#[inline]
pub(crate) fn current_timestamp_micros() -> u64 {
    match std::time::SystemTime::now().duration_since(std::time::UNIX_EPOCH) {
        Ok(since_epoch) => since_epoch.as_micros() as u64,
        Err(_) => 0,
    }
}
330
331mod routing {
335 use xxhash_rust::xxh3::xxh3_64;
336
337 #[inline]
341 pub fn stream_id_from_bytes(data: &[u8]) -> u64 {
342 xxh3_64(data)
343 }
344
345 #[inline]
349 pub fn stream_id_from_key(key: &str) -> u64 {
350 xxh3_64(key.as_bytes())
351 }
352
353 #[inline]
361 pub fn route_to_shard(data: &[u8], num_shards: u16) -> u16 {
362 assert!(num_shards > 0, "num_shards must be > 0");
363 (xxh3_64(data) % num_shards as u64) as u16
364 }
365
366 #[cfg(test)]
367 mod tests {
368 use super::*;
369
370 #[test]
371 fn test_stream_id_deterministic() {
372 let data = b"test event data";
373 let id1 = stream_id_from_bytes(data);
374 let id2 = stream_id_from_bytes(data);
375 assert_eq!(id1, id2);
376 }
377
378 #[test]
379 fn test_stream_id_different_for_different_data() {
380 let id1 = stream_id_from_bytes(b"event1");
381 let id2 = stream_id_from_bytes(b"event2");
382 assert_ne!(id1, id2);
383 }
384
385 #[test]
386 fn test_stream_id_from_key() {
387 let id = stream_id_from_key("user:12345");
388 assert_ne!(id, 0);
389 }
390
391 #[test]
392 fn test_route_to_shard_range() {
393 let num_shards = 16u16;
394 for i in 0..1000 {
395 let data = format!("event_{}", i);
396 let shard = route_to_shard(data.as_bytes(), num_shards);
397 assert!(shard < num_shards);
398 }
399 }
400
401 #[test]
402 #[should_panic(expected = "num_shards must be > 0")]
403 fn test_route_to_shard_zero_shards_panics() {
404 route_to_shard(b"test", 0);
407 }
408
409 #[test]
410 fn test_route_to_shard_distribution() {
411 let num_shards = 8u16;
412 let mut counts = [0u32; 8];
413
414 for i in 0..8000 {
415 let data = format!("event_{}", i);
416 let shard = route_to_shard(data.as_bytes(), num_shards);
417 counts[shard as usize] += 1;
418 }
419
420 let expected = 1000;
422 for count in counts {
423 assert!(count > expected / 2, "shard count {} too low", count);
424 assert!(count < expected * 2, "shard count {} too high", count);
425 }
426 }
427 }
428}
429
/// Shared map from shard id to the concurrent queue of events received for
/// that shard. Filled by `NetAdapter::process_packet`, drained by `poll_shard`.
type InboundQueues = Arc<DashMap<u16, SegQueue<StoredEvent>>>;
432
/// Per-source-address counter that limits how many handshake packets are
/// accepted from one address within a time window.
pub(crate) struct HandshakePacer {
    /// Source address -> (packets counted in the current window, window start).
    entries: std::collections::HashMap<std::net::SocketAddr, (u32, std::time::Instant)>,
    /// Number of packets a single source may record per window.
    max_per_window: u32,
    /// Length of one counting window.
    window: std::time::Duration,
    /// When stale entries were last swept out of `entries`.
    last_gc: std::time::Instant,
    /// Map size at which a sweep is attempted regardless of elapsed time.
    gc_size_threshold: usize,
}

impl HandshakePacer {
    /// Creates an empty pacer allowing `max_per_window` packets per source in
    /// each `window`.
    pub(crate) fn new(max_per_window: u32, window: std::time::Duration) -> Self {
        let created_at = std::time::Instant::now();
        Self {
            entries: std::collections::HashMap::new(),
            max_per_window,
            window,
            last_gc: created_at,
            gc_size_threshold: 4096,
        }
    }

    /// Records one packet from `source` and reports whether it is still within
    /// that source's budget for the current window.
    ///
    /// The packet is counted even when the answer is `false`.
    pub(crate) fn check_and_record(&mut self, source: std::net::SocketAddr) -> bool {
        let now = std::time::Instant::now();

        // Sweep when a full window has passed since the last sweep, or when the
        // table has grown to the size threshold.
        let table_full = self.entries.len() >= self.gc_size_threshold;
        let sweep_due = now.duration_since(self.last_gc) >= self.window || table_full;
        if sweep_due {
            // Keep entries whose window started less than two windows ago.
            let horizon = self.window.saturating_mul(2);
            self.entries
                .retain(|_, slot| now.duration_since(slot.1) < horizon);
            self.last_gc = now;
        }

        let window = self.window;
        let (count, window_start) = self.entries.entry(source).or_insert((0, now));
        if now.duration_since(*window_start) > window {
            // The previous window has expired: start counting afresh.
            *count = 0;
            *window_start = now;
        }
        *count = count.saturating_add(1);
        *count <= self.max_per_window
    }
}
507
/// Adapter that exchanges event batches with a single peer over an encrypted
/// datagram session.
pub struct NetAdapter {
    /// Validated adapter configuration.
    config: NetAdapterConfig,
    /// Bound socket; `None` until `init` has created it.
    socket: Option<Arc<Socket>>,
    /// Established session; `None` until the handshake in `init` succeeds.
    session: Option<Arc<NetSession>>,
    /// Holds the active session for health checks; cleared on shutdown.
    session_manager: SessionMgr,
    /// Received events, queued per shard until `poll_shard` drains them.
    inbound: InboundQueues,
    /// Handles of the background receiver and heartbeat tasks.
    tasks: TokioMutex<Vec<JoinHandle<()>>>,
    /// Set to `true` by `shutdown`; polled by the background tasks.
    shutdown: Arc<AtomicBool>,
    /// Wakes background tasks that are waiting inside `select!` on shutdown.
    shutdown_notify: Arc<Notify>,
    /// `true` between a successful `init` and the end of `shutdown`.
    initialized: AtomicBool,
    /// Rate limiter applied to handshake packets on the responder path.
    handshake_pacer: parking_lot::Mutex<HandshakePacer>,
}
534
535impl NetAdapter {
536 pub fn new(config: NetAdapterConfig) -> Result<Self, AdapterError> {
538 config
539 .validate()
540 .map_err(|e| AdapterError::Fatal(format!("invalid config: {}", e)))?;
541
542 Ok(Self {
543 session_manager: SessionMgr::new(config.session_timeout),
544 config,
545 socket: None,
546 session: None,
547 inbound: Arc::new(DashMap::new()),
548 tasks: TokioMutex::new(Vec::new()),
549 shutdown: Arc::new(AtomicBool::new(false)),
550 shutdown_notify: Arc::new(Notify::new()),
551 initialized: AtomicBool::new(false),
552 handshake_pacer: parking_lot::Mutex::new(HandshakePacer::new(
556 5,
557 std::time::Duration::from_secs(1),
558 )),
559 })
560 }
561
562 async fn perform_handshake(
565 &self,
566 socket: &Socket,
567 ) -> Result<(SessionKeys, std::net::SocketAddr), AdapterError> {
568 let mut attempt = 0;
569 let max_attempts = self.config.handshake_retries;
570
571 const HANDSHAKE_RETRY_SLEEP_CAP_MS: u64 = 5_000;
579
580 loop {
581 attempt += 1;
582 match self.try_handshake(socket).await {
583 Ok(result) => return Ok(result),
584 Err(e) if attempt < max_attempts => {
585 tracing::warn!(
586 attempt = attempt,
587 max = max_attempts,
588 error = %e,
589 "handshake failed, retrying"
590 );
591 let backoff_ms =
592 (100u64.saturating_mul(attempt as u64)).min(HANDSHAKE_RETRY_SLEEP_CAP_MS);
593 tokio::time::sleep(std::time::Duration::from_millis(backoff_ms)).await;
594 }
595 Err(e) => return Err(e),
596 }
597 }
598 }
599
    /// Performs one handshake attempt over `socket`, acting as initiator or
    /// responder according to `self.config.is_initiator()`.
    ///
    /// On success returns the derived session keys and the peer address: the
    /// configured `peer_addr` for the initiator, or the source address of the
    /// accepted handshake packet for the responder.
    ///
    /// # Errors
    /// * `AdapterError::Fatal` when the required key material is missing from
    ///   the config, or handshake construction / key extraction fails.
    /// * `AdapterError::Connection` on send/receive failure, on a handshake
    ///   message that cannot be written or read, or when no handshake packet
    ///   arrives within `config.handshake_timeout`.
    async fn try_handshake(
        &self,
        socket: &Socket,
    ) -> Result<(SessionKeys, std::net::SocketAddr), AdapterError> {
        let timeout = self.config.handshake_timeout;
        let socket_arc = socket.socket_arc();

        if self.config.is_initiator() {
            // Initiator: send message 1, then wait for the peer's reply.
            let peer_pubkey = self
                .config
                .peer_static_pubkey
                .as_ref()
                .ok_or_else(|| AdapterError::Fatal("missing peer public key".into()))?;

            let mut handshake = NoiseHandshake::initiator(&self.config.psk, peer_pubkey)
                .map_err(|e| AdapterError::Fatal(format!("handshake init failed: {}", e)))?;

            let msg1 = handshake
                .write_message(&[])
                .map_err(|e| AdapterError::Connection(format!("write_message failed: {}", e)))?;

            // Handshake packets are built with an all-zero key and session id 0
            // (presumably because no session keys exist yet; TODO confirm).
            let mut builder = PacketBuilder::new(&[0u8; 32], 0);
            let packet = builder.build_handshake(&msg1);

            socket
                .send_to(&packet, self.config.peer_addr)
                .await
                .map_err(|e| AdapterError::Connection(format!("send failed: {}", e)))?;

            let (parsed, _source) = tokio::time::timeout(timeout, async {
                let mut recv_buf = [0u8; protocol::MAX_PACKET_SIZE];
                loop {
                    let (n, source) = socket_arc
                        .recv_from(&mut recv_buf)
                        .await
                        .map_err(|e| AdapterError::Connection(format!("recv failed: {}", e)))?;

                    // Ignore datagrams from anyone but the configured peer.
                    if source != self.config.peer_addr {
                        continue;
                    }

                    let data = bytes::Bytes::copy_from_slice(&recv_buf[..n]);

                    // Skip unparseable and non-handshake packets; keep waiting
                    // until the timeout fires.
                    if let Some(p) = ParsedPacket::parse(data, source) {
                        if p.header.flags.is_handshake() {
                            return Ok::<_, AdapterError>((p, source));
                        }
                    }
                }
            })
            .await
            .map_err(|_| AdapterError::Connection("handshake timeout".into()))??;

            handshake
                .read_message(&parsed.payload)
                .map_err(|e| AdapterError::Connection(format!("read_message failed: {}", e)))?;

            let keys = handshake
                .into_session_keys()
                .map_err(|e| AdapterError::Fatal(format!("key extraction failed: {}", e)))?;
            Ok((keys, self.config.peer_addr))
        } else {
            // Responder: wait for message 1 from any source, then reply.
            let keypair = self
                .config
                .static_keypair
                .as_ref()
                .ok_or_else(|| AdapterError::Fatal("missing static keypair".into()))?;

            let (parsed, source) = tokio::time::timeout(timeout, async {
                loop {
                    // A fresh buffer per datagram: it is frozen into `Bytes`
                    // and handed to the parser below.
                    let mut recv_buf = bytes::BytesMut::with_capacity(protocol::MAX_PACKET_SIZE);
                    recv_buf.resize(protocol::MAX_PACKET_SIZE, 0);

                    let (n, source) = socket_arc
                        .recv_from(&mut recv_buf)
                        .await
                        .map_err(|e| AdapterError::Connection(format!("recv failed: {}", e)))?;

                    recv_buf.truncate(n);
                    let data = recv_buf.freeze();

                    if let Some(p) = ParsedPacket::parse(data, source) {
                        if p.header.flags.is_handshake() {
                            // Per-source rate limit; over-budget packets are
                            // dropped and the wait continues.
                            let allowed = self.handshake_pacer.lock().check_and_record(source);
                            if !allowed {
                                tracing::debug!(
                                    %source,
                                    "handshake responder: dropping packet from \
                                     rate-limited source"
                                );
                                continue;
                            }
                            return Ok::<_, AdapterError>((p, source));
                        }
                    }
                }
            })
            .await
            .map_err(|_| AdapterError::Connection("handshake timeout".into()))??;

            let mut handshake = NoiseHandshake::responder(&self.config.psk, keypair)
                .map_err(|e| AdapterError::Fatal(format!("handshake init failed: {}", e)))?;

            handshake
                .read_message(&parsed.payload)
                .map_err(|e| AdapterError::Connection(format!("read_message failed: {}", e)))?;

            let msg2 = handshake
                .write_message(&[])
                .map_err(|e| AdapterError::Connection(format!("write_message failed: {}", e)))?;

            let mut builder = PacketBuilder::new(&[0u8; 32], 0);
            let packet = builder.build_handshake(&msg2);

            // Reply to the address the accepted handshake packet came from.
            socket
                .send_to(&packet, source)
                .await
                .map_err(|e| AdapterError::Connection(format!("send failed: {}", e)))?;

            let keys = handshake
                .into_session_keys()
                .map_err(|e| AdapterError::Fatal(format!("key extraction failed: {}", e)))?;
            Ok((keys, source))
        }
    }
763
    /// Handles one received datagram for `session`.
    ///
    /// Data packets that parse, belong to this session, decrypt successfully
    /// and are reported fresh by the stream's reliability layer have their
    /// events pushed onto the `inbound` queue of the shard derived from the
    /// stream id. Heartbeats from the session peer are passed to
    /// `verify_and_touch_heartbeat`. Everything else is dropped silently,
    /// apart from a debug log line for duplicates.
    fn process_packet(
        data: Bytes,
        source: std::net::SocketAddr,
        session: &NetSession,
        inbound: &InboundQueues,
        num_shards: u16,
    ) {
        let mut parsed = match ParsedPacket::parse(data, source) {
            Some(p) => p,
            None => return,
        };

        // The length check applies to data packets only.
        if !parsed.header.flags.is_handshake()
            && !parsed.header.flags.is_heartbeat()
            && !parsed.is_valid_length()
        {
            return;
        }

        // Handshake packets are not handled once a session is running.
        if parsed.header.flags.is_handshake() {
            return;
        }

        if parsed.header.session_id != session.session_id() {
            return;
        }

        if parsed.header.flags.is_heartbeat() {
            // Only heartbeats from the session's peer address are considered.
            if source == session.peer_addr() {
                session.verify_and_touch_heartbeat(&parsed);
            }
            return;
        }

        let aad = parsed.header.aad();
        // The counter is read from nonce bytes 4..12, little-endian.
        let counter = u64::from_le_bytes(parsed.header.nonce[4..12].try_into().unwrap_or([0u8; 8]));
        let rx_cipher = session.rx_cipher();
        // Move the payload out of `parsed` so it can be decrypted by value.
        let payload = std::mem::take(&mut parsed.payload);
        let decrypted = match rx_cipher.decrypt_to_bytes(counter, &aad, payload) {
            Ok(d) => {
                // The counter is admitted only after decryption succeeded
                // (presumably a replay check; TODO confirm).
                if !rx_cipher.try_admit_rx_counter(counter) {
                    return;
                }
                d
            }
            Err(_) => return,
        };

        let events = EventFrame::read_events(decrypted, parsed.header.event_count);

        let stream_id = parsed.header.stream_id;
        // Shard = stream id modulo shard count; shard 0 when no shards are configured.
        let shard_id = if num_shards > 0 {
            (stream_id % num_shards as u64) as u16
        } else {
            0
        };

        let is_fresh = {
            let stream = session.get_or_create_stream(stream_id);
            let fresh = stream.with_reliability(|r| r.on_receive(parsed.header.sequence));
            stream.update_rx_seq(parsed.header.sequence);
            fresh
        };

        if is_fresh {
            let queue = inbound.entry(shard_id).or_default();
            let seq = parsed.header.sequence;
            for (i, event_data) in events.into_iter().enumerate() {
                use std::fmt::Write;
                // Event ids have the form "<sequence>:<index within packet>".
                let mut event_id = String::with_capacity(24);
                let _ = write!(event_id, "{}:{}", seq, i);
                queue.push(StoredEvent::new(event_id, event_data, seq, shard_id));
            }
        } else {
            tracing::debug!(
                seq = parsed.header.sequence,
                stream_id,
                "Dropping duplicate packet"
            );
        }

        session.touch();
    }
901
902 #[cfg(target_os = "linux")]
913 fn spawn_receiver(
914 shutdown: Arc<AtomicBool>,
915 shutdown_notify: Arc<Notify>,
916 socket: Arc<Socket>,
917 session: Arc<NetSession>,
918 inbound: InboundQueues,
919 num_shards: u16,
920 ) -> JoinHandle<()> {
921 let mut receiver = transport::BatchedPacketReceiver::new(socket.socket_arc());
922
923 tokio::spawn(async move {
924 while !shutdown.load(Ordering::Acquire) {
925 tokio::select! {
926 result = receiver.recv() => {
927 match result {
928 Ok((data, source)) => {
929 Self::process_packet(data, source, &session, &inbound, num_shards);
930 }
931 Err(e) if e.kind() == std::io::ErrorKind::ConnectionReset => {
932 tracing::warn!("batch receiver thread exited, stopping receiver");
933 break;
934 }
935 Err(e) => {
936 if !shutdown.load(Ordering::Acquire) {
937 tracing::warn!(error = %e, "receive error");
938 }
939 }
940 }
941 }
942 _ = shutdown_notify.notified() => {
943 break;
944 }
945 }
946 }
947 })
948 }
949
950 #[cfg(not(target_os = "linux"))]
952 fn spawn_receiver(
953 shutdown: Arc<AtomicBool>,
954 shutdown_notify: Arc<Notify>,
955 socket: Arc<Socket>,
956 session: Arc<NetSession>,
957 inbound: InboundQueues,
958 num_shards: u16,
959 ) -> JoinHandle<()> {
960 tokio::spawn(async move {
961 let mut receiver = PacketReceiver::new(socket.socket_arc());
962
963 while !shutdown.load(Ordering::Acquire) {
964 tokio::select! {
968 result = receiver.recv() => {
969 match result {
970 Ok((data, source)) => {
971 Self::process_packet(data, source, &session, &inbound, num_shards);
972 }
973 Err(e) => {
974 if !shutdown.load(Ordering::Acquire) {
975 tracing::warn!(error = %e, "receive error");
976 }
977 }
978 }
979 }
980 _ = shutdown_notify.notified() => {
981 break;
982 }
983 }
984 }
985 })
986 }
987
988 fn spawn_heartbeat(
990 shutdown: Arc<AtomicBool>,
991 shutdown_notify: Arc<Notify>,
992 socket: Arc<Socket>,
993 session: Arc<NetSession>,
994 interval: std::time::Duration,
995 peer_addr: std::net::SocketAddr,
996 ) -> JoinHandle<()> {
997 tokio::spawn(async move {
998 let mut ticker = tokio::time::interval(interval);
999
1000 loop {
1001 tokio::select! {
1002 _ = ticker.tick() => {
1003 if shutdown.load(Ordering::Acquire) || !session.is_active() {
1004 break;
1005 }
1006
1007 let packet = session.build_heartbeat();
1021
1022 if let Err(e) = socket.send_to(&packet, peer_addr).await {
1023 tracing::warn!(error = %e, "heartbeat send failed");
1024 }
1025 }
1026 _ = shutdown_notify.notified() => {
1027 break;
1028 }
1029 }
1030 }
1031 })
1032 }
1033}
1034
1035#[async_trait]
1036impl Adapter for NetAdapter {
1037 async fn init(&mut self) -> Result<(), AdapterError> {
1038 if self.initialized.load(Ordering::Acquire) {
1039 return Ok(());
1040 }
1041
1042 let socket_config = match (
1044 self.config.socket_recv_buffer,
1045 self.config.socket_send_buffer,
1046 ) {
1047 (Some(recv), Some(send)) => transport::SocketBufferConfig {
1048 recv_buffer_size: recv,
1049 send_buffer_size: send,
1050 },
1051 _ => transport::SocketBufferConfig::default(),
1052 };
1053 let socket = Socket::with_config(self.config.bind_addr, socket_config)
1054 .await
1055 .map_err(|e| AdapterError::Connection(format!("socket creation failed: {}", e)))?;
1056
1057 let socket = Arc::new(socket);
1058 self.socket = Some(socket.clone());
1059
1060 let (keys, actual_peer) = self.perform_handshake(&socket).await?;
1062
1063 let session = Arc::new(NetSession::new(
1067 keys,
1068 actual_peer,
1069 self.config.packet_pool_size,
1070 self.config.default_reliability.is_reliable(),
1071 ));
1072 self.session = Some(session.clone());
1073
1074 self.session_manager.set_session_arc(session.clone());
1076
1077 let recv_task = Self::spawn_receiver(
1079 self.shutdown.clone(),
1080 self.shutdown_notify.clone(),
1081 socket.clone(),
1082 session.clone(),
1083 self.inbound.clone(),
1084 self.config.num_shards,
1085 );
1086
1087 let heartbeat_task = Self::spawn_heartbeat(
1088 self.shutdown.clone(),
1089 self.shutdown_notify.clone(),
1090 socket,
1091 session,
1092 self.config.heartbeat_interval,
1093 actual_peer,
1094 );
1095
1096 {
1097 let mut tasks = self.tasks.lock().await;
1098 tasks.push(recv_task);
1099 tasks.push(heartbeat_task);
1100 }
1101
1102 self.initialized.store(true, Ordering::Release);
1103
1104 tracing::info!(
1105 bind_addr = %self.config.bind_addr,
1106 peer_addr = %self.config.peer_addr,
1107 role = ?self.config.role,
1108 "Net adapter initialized"
1109 );
1110
1111 Ok(())
1112 }
1113
1114 async fn on_batch(&self, batch: std::sync::Arc<Batch>) -> Result<(), AdapterError> {
1115 let session = self
1116 .session
1117 .as_ref()
1118 .ok_or_else(|| AdapterError::Connection("not connected".into()))?;
1119
1120 let socket = self
1121 .socket
1122 .as_ref()
1123 .ok_or_else(|| AdapterError::Connection("socket not initialized".into()))?;
1124
1125 let stream_id = batch.shard_id as u64;
1126 let peer_addr = session.peer_addr();
1127
1128 let reliable = {
1132 let stream = session.get_or_create_stream(stream_id);
1133 stream.with_reliability(|r| r.needs_ack())
1134 };
1136
1137 let mut current_batch: Vec<Bytes> = Vec::with_capacity(64);
1139 let mut current_size = 0usize;
1140
1141 let pool = session.thread_local_pool();
1143 let mut builder = pool.get();
1144
1145 for event in &batch.events {
1146 let event_bytes = event.raw.clone();
1147 let frame_size = EventFrame::LEN_SIZE + event_bytes.len();
1148
1149 if current_size + frame_size > protocol::MAX_PAYLOAD_SIZE && !current_batch.is_empty() {
1151 let seq;
1153 {
1154 let stream = session.get_or_create_stream(stream_id);
1155 seq = stream.next_tx_seq();
1156 }
1157
1158 let flags = if reliable {
1159 PacketFlags::RELIABLE
1160 } else {
1161 PacketFlags::NONE
1162 };
1163
1164 let packet = builder.build(stream_id, seq, ¤t_batch, flags);
1165
1166 socket
1168 .send_to(&packet, peer_addr)
1169 .await
1170 .map_err(|e| AdapterError::Connection(format!("send failed: {}", e)))?;
1171
1172 if reliable {
1179 let descriptor = std::sync::Arc::new(reliability::RetransmitDescriptor {
1184 seq,
1185 stream_id,
1186 events: current_batch.clone(),
1187 flags,
1188 });
1189 let stream = session.get_or_create_stream(stream_id);
1190 stream.with_reliability(|r| r.on_send(descriptor));
1191 }
1192
1193 current_batch.clear();
1194 current_size = 0;
1195 }
1196
1197 current_batch.push(event_bytes);
1198 current_size += frame_size;
1199 }
1200
1201 if !current_batch.is_empty() {
1203 let seq;
1204 {
1205 let stream = session.get_or_create_stream(stream_id);
1206 seq = stream.next_tx_seq();
1207 }
1208
1209 let flags = if reliable {
1210 PacketFlags::RELIABLE
1211 } else {
1212 PacketFlags::NONE
1213 };
1214
1215 let packet = builder.build(stream_id, seq, ¤t_batch, flags);
1216
1217 socket
1218 .send_to(&packet, peer_addr)
1219 .await
1220 .map_err(|e| AdapterError::Connection(format!("send failed: {}", e)))?;
1221
1222 if reliable {
1223 let descriptor = std::sync::Arc::new(reliability::RetransmitDescriptor {
1225 seq,
1226 stream_id,
1227 events: current_batch.clone(),
1228 flags,
1229 });
1230 let stream = session.get_or_create_stream(stream_id);
1231 stream.with_reliability(|r| r.on_send(descriptor));
1232 }
1233 }
1234
1235 session.touch();
1236
1237 Ok(())
1238 }
1239
1240 async fn poll_shard(
1241 &self,
1242 shard_id: u16,
1243 from_id: Option<&str>,
1244 limit: usize,
1245 ) -> Result<ShardPollResult, AdapterError> {
1246 let mut events = Vec::with_capacity(limit);
1247
1248 if let Some(queue) = self.inbound.get(&shard_id) {
1249 while events.len() < limit {
1250 if let Some(event) = queue.pop() {
1251 if from_id.is_none() || event_id_gt(&event.id, from_id.unwrap_or("")) {
1252 events.push(event);
1253 }
1254 } else {
1259 break;
1260 }
1261 }
1262 }
1263
1264 let has_more = self
1265 .inbound
1266 .get(&shard_id)
1267 .map(|q| !q.is_empty())
1268 .unwrap_or(false);
1269 let next_id = events.last().map(|e| e.id.clone());
1270
1271 Ok(ShardPollResult {
1272 events,
1273 next_id,
1274 has_more,
1275 })
1276 }
1277
    /// No-op: always returns `Ok(())`.
    async fn flush(&self) -> Result<(), AdapterError> {
        Ok(())
    }
1283
1284 async fn shutdown(&self) -> Result<(), AdapterError> {
1285 self.shutdown.store(true, Ordering::Release);
1286
1287 self.shutdown_notify.notify_waiters();
1290
1291 self.session_manager.clear_session();
1293
1294 let mut tasks = self.tasks.lock().await;
1296 for task in tasks.drain(..) {
1297 let _ = task.await;
1298 }
1299
1300 self.initialized.store(false, Ordering::Release);
1301
1302 tracing::info!("Net adapter shutdown complete");
1303
1304 Ok(())
1305 }
1306
    /// Returns the adapter's name, `"net"`.
    fn name(&self) -> &'static str {
        "net"
    }
1310
1311 async fn is_healthy(&self) -> bool {
1312 self.initialized.load(Ordering::Acquire) && self.session_manager.check_session()
1313 }
1314}
1315
1316impl std::fmt::Debug for NetAdapter {
1317 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1318 f.debug_struct("NetAdapter")
1319 .field("config", &self.config)
1320 .field("initialized", &self.initialized.load(Ordering::Relaxed))
1321 .finish()
1322 }
1323}
1324
/// Returns `true` when event id `a` orders strictly after event id `b`.
///
/// Ids of the form `"<seq>:<idx>"` with both parts parseable as `u64` are
/// compared numerically as `(seq, idx)` pairs. If either id does not have that
/// form, the two strings are compared lexicographically instead.
fn event_id_gt(a: &str, b: &str) -> bool {
    let numeric = |id: &str| -> Option<(u64, u64)> {
        let mut parts = id.splitn(2, ':');
        let seq = parts.next()?.parse::<u64>().ok()?;
        let idx = parts.next()?.parse::<u64>().ok()?;
        Some((seq, idx))
    };

    if let (Some(lhs), Some(rhs)) = (numeric(a), numeric(b)) {
        lhs > rhs
    } else {
        a > b
    }
}
1342
1343#[cfg(test)]
1344mod tests {
1345 use super::*;
1346
    /// A valid initiator config yields an adapter whose name is "net".
    #[test]
    fn test_adapter_creation() {
        let psk = [0x42u8; 32];
        let peer_pubkey = [0x24u8; 32];

        let config = NetAdapterConfig::initiator(
            "127.0.0.1:0".parse().unwrap(),
            "127.0.0.1:9999".parse().unwrap(),
            psk,
            peer_pubkey,
        );

        let adapter = NetAdapter::new(config).unwrap();
        assert_eq!(adapter.name(), "net");
    }
1362
    /// One nanosecond inside the refresh window: the cached reading is
    /// returned, the wall clock is not read, and nothing is stored.
    #[test]
    fn coarse_clock_reuses_cache_within_refresh_window() {
        let t0 = std::time::Instant::now();
        let within = t0 + std::time::Duration::from_nanos(COARSE_CLOCK_REFRESH_NS - 1);
        let (store, ns) = coarse_clock_advance(Some((t0, 42)), within, || {
            panic!("cache hit must not read the wall clock")
        });
        assert_eq!(ns, 42, "hit must return the cached reading");
        assert!(store.is_none(), "hit must keep the hit path store-free");
    }
1379
    /// Exactly at the refresh boundary the wall clock is re-read and the cache
    /// entry is rebased on the instant of the read.
    #[test]
    fn coarse_clock_refreshes_at_and_past_the_window() {
        let t0 = std::time::Instant::now();
        let at_boundary = t0 + std::time::Duration::from_nanos(COARSE_CLOCK_REFRESH_NS);
        let (store, ns) = coarse_clock_advance(Some((t0, 42)), at_boundary, || 100);
        assert_eq!(ns, 100, "boundary read must refresh");
        assert_eq!(
            store,
            Some((at_boundary, 100)),
            "refresh must rebase the window on the read instant"
        );
    }
1395
    /// With no cached entry the wall clock is read and the result is stored.
    #[test]
    fn coarse_clock_cold_start_reads_wall_clock() {
        let t0 = std::time::Instant::now();
        let (store, ns) = coarse_clock_advance(None, t0, || 7);
        assert_eq!(ns, 7);
        assert_eq!(store, Some((t0, 7)));
    }
1405
    /// After sleeping 5 ms (longer than the 1 ms refresh window) a second
    /// reading must be strictly greater than the first.
    #[test]
    fn current_timestamp_advances_after_refresh_interval() {
        let first = current_timestamp();
        std::thread::sleep(std::time::Duration::from_millis(5));
        let later = current_timestamp();
        assert!(
            later > first,
            "post-refresh reading must advance: first={}, later={}",
            first,
            later
        );
    }
1422
    /// Checks the `stream_id % num_shards` arithmetic used for shard selection:
    /// results stay in range, and for a large stream id the modulo differs from
    /// a plain truncation to `u16`.
    ///
    /// NOTE(review): this test repeats the arithmetic inline and does not call
    /// `process_packet`.
    #[test]
    fn test_shard_id_from_stream_id_uses_modulo() {
        let num_shards: u16 = 8;

        let stream_a: u64 = 0xDEAD_BEEF_0000_0003;
        let stream_b: u64 = 0xCAFE_BABE_0000_0003;

        let shard_a = (stream_a % num_shards as u64) as u16;
        let shard_b = (stream_b % num_shards as u64) as u16;

        assert!(
            shard_a < num_shards,
            "shard must be in range [0, num_shards)"
        );
        assert!(
            shard_b < num_shards,
            "shard must be in range [0, num_shards)"
        );

        let big_stream: u64 = 0xFFFF_FFFF_FFFF_FFFF;
        let shard_big = (big_stream % num_shards as u64) as u16;
        assert!(shard_big < num_shards);

        assert_ne!(
            big_stream as u16, shard_big,
            "modulo must differ from truncation for large stream IDs"
        );
    }
1461
    /// An initiator config without the peer's static public key is rejected by
    /// `NetAdapter::new`.
    #[test]
    fn test_invalid_config() {
        let psk = [0x42u8; 32];
        let peer_pubkey = [0x24u8; 32];

        let mut config = NetAdapterConfig::initiator(
            "127.0.0.1:0".parse().unwrap(),
            "127.0.0.1:9999".parse().unwrap(),
            psk,
            peer_pubkey,
        );
        config.peer_static_pubkey = None;

        let result = NetAdapter::new(config);
        assert!(result.is_err());
    }
1478
    /// Well-formed "seq:idx" ids compare numerically, not lexicographically.
    #[test]
    fn test_event_id_gt_numeric_ordering() {
        // Basic ordering and strictness.
        assert!(event_id_gt("2:0", "1:0"));
        assert!(!event_id_gt("1:0", "2:0"));
        assert!(!event_id_gt("1:0", "1:0"));

        // Different digit counts: a string compare would get these wrong.
        assert!(event_id_gt("10:0", "9:0"));
        assert!(event_id_gt("100:0", "99:0"));
        assert!(!event_id_gt("9:0", "10:0"));

        // Same sequence: the index breaks the tie.
        assert!(event_id_gt("5:2", "5:1"));
        assert!(!event_id_gt("5:1", "5:2"));

        assert!(event_id_gt("1000000:0", "999999:0"));
    }
1500
    /// Ids that are not of the "seq:idx" form fall back to string comparison.
    #[test]
    fn test_event_id_gt_edge_cases() {
        assert!(event_id_gt("1:0", ""));
        assert!(event_id_gt("b", "a"));
        assert!(!event_id_gt("a", "b"));
    }
1514
    /// End-to-end check: a packet built with the initiator's keys is accepted
    /// by `process_packet` on a session holding the responder's keys, and both
    /// events arrive in order on shard 0.
    #[test]
    fn test_build_then_process_packet_roundtrip() {
        use crate::adapter::net::crypto::{NoiseHandshake, StaticKeypair};
        use dashmap::DashMap;
        use std::sync::Arc;

        let psk = [0x42u8; 32];
        let responder_kp = StaticKeypair::generate();

        let mut initiator = NoiseHandshake::initiator(&psk, &responder_kp.public).unwrap();
        let mut responder = NoiseHandshake::responder(&psk, &responder_kp).unwrap();

        // Run the two-message handshake in memory.
        let msg1 = initiator.write_message(&[]).unwrap();
        responder.read_message(&msg1).unwrap();
        let msg2 = responder.write_message(&[]).unwrap();
        initiator.read_message(&msg2).unwrap();

        let init_keys = initiator.into_session_keys().unwrap();
        let resp_keys = responder.into_session_keys().unwrap();

        // Sender side: build a packet on stream 0, sequence 0.
        let mut builder = PacketBuilder::new(&init_keys.tx_key, init_keys.session_id);
        let events = vec![
            Bytes::from(r#"{"token":"hello"}"#),
            Bytes::from(r#"{"token":"world"}"#),
        ];
        let packet = builder.build(0, 0, &events, PacketFlags::NONE);

        // Receiver side: a session with the responder's keys.
        let resp_session = Arc::new(NetSession::new(
            resp_keys,
            "127.0.0.1:5000".parse().unwrap(),
            4,
            false,
        ));
        let inbound: InboundQueues = Arc::new(DashMap::new());
        let source: std::net::SocketAddr = "127.0.0.1:5000".parse().unwrap();

        NetAdapter::process_packet(packet, source, &resp_session, &inbound, 1);

        let queue = inbound.get(&0).expect("shard 0 should have events");
        assert_eq!(queue.len(), 2, "expected 2 events, got {}", queue.len());

        let e1 = queue.pop().unwrap();
        assert_eq!(&e1.raw[..], br#"{"token":"hello"}"#);

        let e2 = queue.pop().unwrap();
        assert_eq!(&e2.raw[..], br#"{"token":"world"}"#);
    }
1570
1571 fn make_session_keys() -> (SessionKeys, SessionKeys) {
1573 use crate::adapter::net::crypto::{NoiseHandshake, StaticKeypair};
1574
1575 let psk = [0x42u8; 32];
1576 let responder_kp = StaticKeypair::generate();
1577
1578 let mut initiator = NoiseHandshake::initiator(&psk, &responder_kp.public).unwrap();
1579 let mut responder = NoiseHandshake::responder(&psk, &responder_kp).unwrap();
1580
1581 let msg1 = initiator.write_message(&[]).unwrap();
1582 responder.read_message(&msg1).unwrap();
1583 let msg2 = responder.write_message(&[]).unwrap();
1584 initiator.read_message(&msg2).unwrap();
1585
1586 (
1587 initiator.into_session_keys().unwrap(),
1588 responder.into_session_keys().unwrap(),
1589 )
1590 }
1591
/// A valid packet with its last 10 bytes cut off must not enqueue any event
/// on shard 0.
#[test]
fn test_process_packet_rejects_truncated_packet() {
    use dashmap::DashMap;
    use std::sync::Arc;

    let (init_keys, resp_keys) = make_session_keys();

    let mut builder = PacketBuilder::new(&init_keys.tx_key, init_keys.session_id);
    let intact = builder.build(0, 0, &[Bytes::from_static(b"hello")], PacketFlags::NONE);

    let peer: std::net::SocketAddr = "127.0.0.1:5000".parse().unwrap();
    let resp_session = Arc::new(NetSession::new(resp_keys, peer, 4, false));
    let inbound: InboundQueues = Arc::new(DashMap::new());

    // Drop the trailing 10 bytes of the packet.
    let cut_at = intact.len() - 10;
    NetAdapter::process_packet(intact.slice(..cut_at), peer, &resp_session, &inbound, 1);

    // Either the shard was never created or it holds nothing.
    let shard_is_empty = inbound.get(&0).map_or(true, |queue| queue.is_empty());
    assert!(shard_is_empty, "truncated packet must be silently dropped");
}
1620
/// A packet with one byte flipped after the header must not enqueue any
/// event on shard 0.
#[test]
fn test_process_packet_rejects_tampered_payload() {
    use dashmap::DashMap;
    use std::sync::Arc;

    let (init_keys, resp_keys) = make_session_keys();

    let mut builder = PacketBuilder::new(&init_keys.tx_key, init_keys.session_id);
    let genuine = builder.build(0, 0, &[Bytes::from_static(b"hello")], PacketFlags::NONE);

    let peer: std::net::SocketAddr = "127.0.0.1:5000".parse().unwrap();
    let resp_session = Arc::new(NetSession::new(resp_keys, peer, 4, false));
    let inbound: InboundQueues = Arc::new(DashMap::new());

    // Copy the packet and invert every bit of the byte at HEADER_SIZE + 2.
    let mut corrupted = genuine.to_vec();
    corrupted[super::protocol::HEADER_SIZE + 2] ^= 0xFF;
    NetAdapter::process_packet(Bytes::from(corrupted), peer, &resp_session, &inbound, 1);

    let shard_is_empty = inbound.get(&0).map_or(true, |queue| queue.is_empty());
    assert!(shard_is_empty, "tampered packet must be rejected by AEAD");
}
1650
/// A packet built for the negotiated session id must not be enqueued by a
/// receiving session whose `session_id` was overwritten with another value.
#[test]
fn test_process_packet_rejects_wrong_session_id() {
    use dashmap::DashMap;
    use std::sync::Arc;

    let (init_keys, mut resp_keys) = make_session_keys();

    let mut builder = PacketBuilder::new(&init_keys.tx_key, init_keys.session_id);
    let packet = builder.build(0, 0, &[Bytes::from_static(b"hello")], PacketFlags::NONE);

    // Give the receiver a session id that differs from the one in the packet.
    resp_keys.session_id = 0xDEAD;

    let peer: std::net::SocketAddr = "127.0.0.1:5000".parse().unwrap();
    let resp_session = Arc::new(NetSession::new(resp_keys, peer, 4, false));
    let inbound: InboundQueues = Arc::new(DashMap::new());

    NetAdapter::process_packet(packet, peer, &resp_session, &inbound, 1);

    let shard_is_empty = inbound.get(&0).map_or(true, |queue| queue.is_empty());
    assert!(shard_is_empty, "packet with wrong session_id must be dropped");
}
1680
/// Splits 200 padded events across as many packets as needed to respect
/// `protocol::MAX_PAYLOAD_SIZE`, processes each packet, and checks that
/// every event ends up in shard 0's inbound queue.
#[test]
fn test_process_packet_multi_packet_batch_all_events_arrive() {
    use dashmap::DashMap;
    use std::sync::Arc;

    let (init_keys, resp_keys) = make_session_keys();

    let resp_session = Arc::new(NetSession::new(
        resp_keys,
        "127.0.0.1:5000".parse().unwrap(),
        4,
        false,
    ));
    let inbound: InboundQueues = Arc::new(DashMap::new());
    let source: std::net::SocketAddr = "127.0.0.1:5000".parse().unwrap();

    let mut builder = PacketBuilder::new(&init_keys.tx_key, init_keys.session_id);
    let total_events = 200;
    let mut seq = 0u64;

    // Events accumulated for the packet currently being filled, and the
    // number of payload bytes they occupy (length prefix + body per event).
    let mut current_batch: Vec<Bytes> = Vec::new();
    let mut current_size = 0;

    // The padding is identical for every event; build it once.
    let pad = "x".repeat(150);

    for i in 0..total_events {
        let data = format!("{{\"i\":{},\"pad\":\"{}\"}}", i, pad);
        let event_bytes = Bytes::from(data);
        let frame_size = EventFrame::LEN_SIZE + event_bytes.len();

        // Flush the batch before it would overflow the payload limit.
        if current_size + frame_size > protocol::MAX_PAYLOAD_SIZE && !current_batch.is_empty() {
            let packet = builder.build(0, seq, &current_batch, PacketFlags::NONE);
            NetAdapter::process_packet(packet, source, &resp_session, &inbound, 1);
            seq += 1;
            current_batch.clear();
            current_size = 0;
        }

        current_batch.push(event_bytes);
        current_size += frame_size;
    }

    // Send whatever is left over after the loop.
    if !current_batch.is_empty() {
        let packet = builder.build(0, seq, &current_batch, PacketFlags::NONE);
        NetAdapter::process_packet(packet, source, &resp_session, &inbound, 1);
    }

    let queue = inbound.get(&0).expect("shard 0 should have events");
    assert_eq!(
        queue.len(),
        total_events,
        "all {} events must arrive across multiple packets",
        total_events
    );
}
1739
/// Verifies that one handshake yields keys usable in both directions: a
/// packet built with the initiator's keys is accepted by a session holding
/// the responder's keys, and vice versa.
#[test]
fn test_build_then_process_packet_both_directions() {
    use dashmap::DashMap;
    use std::sync::Arc;

    let (init_keys, resp_keys) = make_session_keys();
    let source: std::net::SocketAddr = "127.0.0.1:5000".parse().unwrap();

    // Builds one single-event packet with `sender`'s keys, processes it on a
    // fresh session holding `receiver`'s keys, and checks that exactly that
    // event arrives on shard 0. `label` prefixes the failure messages.
    let deliver_one = |sender: &SessionKeys,
                       receiver: &SessionKeys,
                       label: &str,
                       payload: &'static [u8]| {
        let mut builder = PacketBuilder::new(&sender.tx_key, sender.session_id);
        let packet = builder.build(0, 0, &[Bytes::from_static(payload)], PacketFlags::NONE);

        let session = Arc::new(NetSession::new(receiver.clone(), source, 4, false));
        let inbound: InboundQueues = Arc::new(DashMap::new());
        NetAdapter::process_packet(packet, source, &session, &inbound, 1);

        let queue = inbound
            .get(&0)
            .unwrap_or_else(|| panic!("{}: shard 0 should have events", label));
        assert_eq!(queue.len(), 1, "{}: expected 1 event", label);
        assert_eq!(&queue.pop().unwrap().raw[..], payload);
    };

    // Initiator -> responder.
    deliver_one(&init_keys, &resp_keys, "i2r", b"i2r");
    // Responder -> initiator.
    deliver_one(&resp_keys, &init_keys, "r2i", b"r2i");
}
1776
/// Enqueues three single-event packets (seq 0, 1, 2), then drains the queue
/// the way a cursor-based poll would: every event is popped, but only those
/// with an id greater than the cursor `"0:0"` are kept. Two events must be
/// returned and the queue must end up empty.
#[test]
fn test_poll_shard_cursor_drops_consumed_events() {
    use std::sync::Arc;

    let (init_keys, resp_keys) = make_session_keys();

    let peer: std::net::SocketAddr = "127.0.0.1:5000".parse().unwrap();
    let resp_session = Arc::new(NetSession::new(resp_keys, peer, 4, false));
    let inbound: InboundQueues = Arc::new(DashMap::new());

    let mut builder = PacketBuilder::new(&init_keys.tx_key, init_keys.session_id);
    for seq in 0..3u64 {
        let payload = vec![Bytes::from(format!("event-{}", seq))];
        let packet = builder.build(0, seq, &payload, PacketFlags::NONE);
        NetAdapter::process_packet(packet, peer, &resp_session, &inbound, 1);
    }

    let queue = inbound.get(&0u16).unwrap();
    assert_eq!(queue.len(), 3);

    let cursor = "0:0";
    let limit = 10;
    let mut delivered = Vec::new();
    while delivered.len() < limit {
        match queue.pop() {
            // Past the cursor: hand it to the caller.
            Some(event) if event_id_gt(&event.id, cursor) => delivered.push(event),
            // At or before the cursor: popped and discarded.
            Some(_) => {}
            // Queue exhausted.
            None => break,
        }
    }

    assert_eq!(delivered.len(), 2, "should get 2 events after cursor 0:0");
    assert_eq!(delivered[0].id, "1:0");
    assert_eq!(delivered[1].id, "2:0");

    assert_eq!(queue.len(), 0, "queue should be empty after poll drains it");
}
1829
/// After 1100 packets from one builder have been accepted, a packet from a
/// brand-new builder using the same keys must be rejected, leaving the
/// queue length unchanged.
#[test]
fn test_process_packet_old_counter_rejected() {
    use std::sync::Arc;

    let (init_keys, resp_keys) = make_session_keys();
    let peer: std::net::SocketAddr = "127.0.0.1:5000".parse().unwrap();
    let resp_session = Arc::new(NetSession::new(resp_keys, peer, 4, false));
    let inbound: InboundQueues = Arc::new(DashMap::new());

    // Number of events currently queued on shard 0.
    let queued = || inbound.get(&0).unwrap().len();

    let accepted: usize = 1100;
    let mut builder = PacketBuilder::new(&init_keys.tx_key, init_keys.session_id);
    for seq in 0..accepted as u64 {
        let packet = builder.build(0, seq, &[Bytes::from_static(b"x")], PacketFlags::NONE);
        NetAdapter::process_packet(packet, peer, &resp_session, &inbound, 1);
    }
    assert_eq!(queued(), accepted);

    // NOTE(review): presumably a fresh builder restarts its internal packet
    // counter, which is what makes this packet "stale" — confirm against
    // PacketBuilder.
    let mut stale_builder = PacketBuilder::new(&init_keys.tx_key, init_keys.session_id);
    let stale_packet =
        stale_builder.build(0, 9999, &[Bytes::from_static(b"stale")], PacketFlags::NONE);
    NetAdapter::process_packet(stale_packet, peer, &resp_session, &inbound, 1);

    assert_eq!(
        queued(),
        accepted,
        "packet with stale counter must be rejected"
    );
}
1869
/// On a fresh session, the receive cipher must reject the counter
/// `u64::MAX` and accept counter `0`.
#[test]
fn test_process_packet_far_future_counter_rejected() {
    use std::sync::Arc;

    let (_init_keys, resp_keys) = make_session_keys();

    let peer_addr = "127.0.0.1:5000".parse().unwrap();
    let resp_session = Arc::new(NetSession::new(resp_keys, peer_addr, 4, false));
    let rx_cipher = resp_session.rx_cipher();

    let max_counter_ok = rx_cipher.is_valid_rx_counter(u64::MAX);
    assert!(
        !max_counter_ok,
        "counter at u64::MAX must be rejected (far beyond MAX_FORWARD)"
    );

    let zero_counter_ok = rx_cipher.is_valid_rx_counter(0);
    assert!(zero_counter_ok, "counter 0 should be valid initially");
}
1901
/// Sends seq 0, seq 1, then a second packet with seq 0 on stream 7 and
/// checks that only the first two are enqueued, in order.
#[test]
fn process_packet_drops_duplicates_per_reliability_decision() {
    use dashmap::DashMap;
    use std::sync::Arc;

    let (init_keys, resp_keys) = make_session_keys();

    let peer: std::net::SocketAddr = "127.0.0.1:5000".parse().unwrap();
    // NOTE(review): unlike the other tests, the last argument is `true` here;
    // presumably it enables the reliability layer — confirm against
    // NetSession::new.
    let resp_session = Arc::new(NetSession::new(resp_keys, peer, 4, true));
    let inbound: InboundQueues = Arc::new(DashMap::new());

    let mut builder = PacketBuilder::new(&init_keys.tx_key, init_keys.session_id);
    let stream_id = 7;
    let first = builder.build(
        stream_id,
        0,
        &[Bytes::from(r#"{"first":0}"#)],
        PacketFlags::NONE,
    );
    let second = builder.build(
        stream_id,
        1,
        &[Bytes::from(r#"{"first":1}"#)],
        PacketFlags::NONE,
    );
    // Same stream and sequence number as `first`, different payload.
    let replay_of_first = builder.build(
        stream_id,
        0,
        &[Bytes::from(r#"{"dup":"should_not_appear"}"#)],
        PacketFlags::NONE,
    );

    for packet in vec![first, second, replay_of_first] {
        NetAdapter::process_packet(packet, peer, &resp_session, &inbound, 1);
    }

    let queue = inbound.get(&0).expect("shard 0 should exist");
    assert_eq!(
        queue.len(),
        2,
        "duplicate packet must NOT enqueue (BUG_REPORT.md #5); \
         got {} events, expected exactly 2 (seq=0 and seq=1, no dup)",
        queue.len()
    );

    let head = queue.pop().unwrap();
    assert_eq!(&head.raw[..], br#"{"first":0}"#);
    let next = queue.pop().unwrap();
    assert_eq!(&next.raw[..], br#"{"first":1}"#);
    assert!(queue.is_empty());
}
1976
/// Checks that only a heartbeat produced by `PacketBuilder::build_heartbeat`
/// advances the session's `last_activity_ns`; a bare heartbeat header and a
/// header followed by a 16-byte garbage tag must both leave it unchanged.
#[test]
fn heartbeat_is_aead_authenticated() {
    use crate::adapter::net::pool::PacketBuilder;
    use dashmap::DashMap;
    use std::sync::Arc;

    let (init_keys, resp_keys) = make_session_keys();

    let source: std::net::SocketAddr = "127.0.0.1:5000".parse().unwrap();
    let resp_session = Arc::new(NetSession::new(resp_keys, source, 4, false));
    let inbound: InboundQueues = Arc::new(DashMap::new());

    // Records the session's last-activity timestamp, waits 2 ms so a touch
    // would be observable, processes `packet`, and returns the timestamps
    // read before and after processing.
    let activity_around = |packet: Bytes| {
        let before = resp_session.last_activity_ns();
        std::thread::sleep(std::time::Duration::from_millis(2));
        NetAdapter::process_packet(packet, source, &resp_session, &inbound, 1);
        (before, resp_session.last_activity_ns())
    };

    // Case 1: genuine heartbeat from the peer's builder.
    let mut builder = PacketBuilder::new(&init_keys.tx_key, init_keys.session_id);
    let (before, after) = activity_around(builder.build_heartbeat());
    assert!(
        after > before,
        "legitimate AEAD-tagged heartbeat must call session.touch()"
    );

    // Case 2: heartbeat header only, nothing after it.
    let bare_header = NetHeader::heartbeat(resp_session.session_id());
    let mut header_only = bytes::BytesMut::new();
    header_only.extend_from_slice(&bare_header.to_bytes());
    let (before, after) = activity_around(header_only.freeze());
    assert_eq!(
        before, after,
        "unauthenticated heartbeat (no AEAD tag) must NOT touch the session"
    );

    // Case 3: header with two fields overwritten, followed by 16 bytes of 0xAA.
    // NOTE(review): bytes 12..16 and 16..24 are presumably the nonce/counter
    // fields of the header — confirm against the NetHeader layout.
    let mut header_bytes = NetHeader::heartbeat(resp_session.session_id()).to_bytes();
    header_bytes[12..16].copy_from_slice(&[0u8; 4]);
    header_bytes[16..24].copy_from_slice(&1u64.to_le_bytes());
    let mut garbage_tagged = bytes::BytesMut::new();
    garbage_tagged.extend_from_slice(&header_bytes);
    garbage_tagged.extend_from_slice(&[0xAAu8; 16]);
    let (before, after) = activity_around(garbage_tagged.freeze());
    assert_eq!(
        before, after,
        "heartbeat with garbage AEAD tag must NOT touch the session"
    );
}
2052
/// Exercises `HandshakePacer::new(3, 50ms)`: one source is admitted three
/// times and then refused, a different source is still admitted, and the
/// first source is admitted again after sleeping past the window.
#[test]
fn handshake_pacer_rejects_floods_per_source() {
    use std::net::SocketAddr;
    use std::time::Duration;

    let window = Duration::from_millis(50);
    let mut pacer = HandshakePacer::new(3, window);

    let attacker: SocketAddr = "10.0.0.1:9000".parse().unwrap();
    let legit: SocketAddr = "10.0.0.2:9000".parse().unwrap();

    // The first three attempts fit within the budget.
    assert!((0..3).all(|_| pacer.check_and_record(attacker)));

    // The next ten attempts must all be refused.
    assert!(
        !(0..10).any(|_| pacer.check_and_record(attacker)),
        "attacker exceeding budget must be dropped"
    );

    // Another source is unaffected by the attacker's exhausted budget.
    assert!(
        pacer.check_and_record(legit),
        "legitimate source must still get through despite attacker flood"
    );

    // Wait slightly longer than the 50 ms window.
    std::thread::sleep(Duration::from_millis(55));
    assert!(
        pacer.check_and_record(attacker),
        "attacker budget must refill after window"
    );
}
2092}