freenet 0.2.56

Freenet core software
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
1001
1002
1003
1004
1005
1006
1007
1008
1009
1010
1011
1012
1013
1014
1015
1016
1017
1018
1019
1020
1021
1022
1023
1024
1025
1026
1027
1028
1029
1030
1031
1032
1033
1034
1035
1036
1037
1038
1039
1040
1041
//! Freenet Transport protocol implementation.
//!
//! Please see `docs/architecture/transport.md` for more information.
//!
//! Provides the low-level network transport abstraction (e.g., UDP).
//! Handles the raw sending and receiving of byte packets over the network.
//! See `architecture.md`.

use std::sync::Mutex;
use std::sync::atomic::{AtomicBool, AtomicU64, AtomicUsize, Ordering};
use std::{borrow::Cow, io, net::SocketAddr};

use futures::Future;
use tokio::net::UdpSocket;

// =============================================================================
// Auto-update version mismatch detection
// =============================================================================
//
// When a peer detects a version mismatch with another peer (typically the gateway),
// it sets this flag. The node's run loop checks this periodically and triggers
// a GitHub check to verify if a newer version is actually available.
//
// This is temporary alpha-testing infrastructure to reduce the burden of
// frequent updates during rapid development.

/// Global flag set by transport layer when a version mismatch is detected.
static VERSION_MISMATCH_DETECTED: AtomicBool = AtomicBool::new(false);

/// Generation counter incremented on each `signal_version_mismatch()` call.
/// The update check task uses this to detect fresh mismatches and reset backoff.
static VERSION_MISMATCH_GENERATION: AtomicU64 = AtomicU64::new(0);

/// Signal that a version mismatch was detected with another peer.
/// Called from the transport layer when a connection fails due to
/// protocol version incompatibility.
///
/// Sets the pending-mismatch flag and bumps the generation counter so the
/// update check task can tell a fresh mismatch apart from one it has
/// already handled.
pub fn signal_version_mismatch() {
    VERSION_MISMATCH_DETECTED.store(true, Ordering::SeqCst);
    VERSION_MISMATCH_GENERATION.fetch_add(1, Ordering::SeqCst);
}

/// Get the current mismatch generation counter.
/// Each call to `signal_version_mismatch()` increments this.
pub fn version_mismatch_generation() -> u64 {
    VERSION_MISMATCH_GENERATION.load(Ordering::SeqCst)
}

/// Check if there's a pending version mismatch that should trigger an update check.
pub fn has_version_mismatch() -> bool {
    VERSION_MISMATCH_DETECTED.load(Ordering::SeqCst)
}

/// Clear the version mismatch flag (called after checking for updates).
/// Note: only the boolean flag is cleared; the generation counter is never
/// reset, so generation comparisons remain valid across clears.
pub fn clear_version_mismatch() {
    VERSION_MISMATCH_DETECTED.store(false, Ordering::SeqCst);
}

// =============================================================================
// Decentralized version discovery
// =============================================================================
//
// Peers learn about newer versions from successful handshakes with neighbors.
// HIGHEST_SEEN_VERSION tracks the highest version seen in the network.
// URGENT_UPDATE_NEEDED is set when a remote's min_compatible > our version
// (meaning we MUST update to remain connected).

/// Highest version observed from any peer during handshake, with the number
/// of handshakes that reported it.
static HIGHEST_SEEN_VERSION: Mutex<Option<HighestSeenVersion>> = Mutex::new(None);

/// Set when a remote peer's min_compatible version exceeds our version,
/// meaning we cannot connect to that peer without updating.
static URGENT_UPDATE_NEEDED: AtomicBool = AtomicBool::new(false);

/// Minimum number of handshakes that must report a version before we
/// trust it for the stagger timer. This is a lightweight defense against
/// a single transient bad report — it does not track peer identity, so a
/// persistent attacker reconnecting can bypass it. The real protection is
/// that GitHub verification is always required before exit-42.
const MIN_VERSION_REPORTERS: usize = 2;

/// Highest peer-reported version paired with how many handshakes reported it.
#[derive(Clone, Copy)]
struct HighestSeenVersion {
    /// Version tuple from a peer handshake; comparisons use lexicographic
    /// tuple ordering (presumably (major, minor, patch) — TODO confirm
    /// against the handshake encoding).
    version: (u8, u8, u16),
    /// Number of handshakes that reported exactly `version`; incremented
    /// saturatingly in `report_peer_version`.
    reporter_count: usize,
}

/// Called on every successful handshake to track the highest version seen.
///
/// Lightweight defense: at least `MIN_VERSION_REPORTERS` handshakes must
/// report the same version before `get_highest_seen_version()` returns it.
/// This catches single transient bad reports. A persistent attacker can
/// bypass this by reconnecting, but the real protection is that the update
/// check task always verifies against GitHub before triggering exit-42.
pub fn report_peer_version(version: (u8, u8, u16)) {
    // Recover from mutex poisoning — version tracking is best-effort and
    // must not permanently break if another thread panicked while holding it.
    let mut guard = HIGHEST_SEEN_VERSION
        .lock()
        .unwrap_or_else(|e| e.into_inner());

    match guard.as_mut() {
        // Strictly newer than anything recorded so far: restart the reporter
        // count, since the old count applied to a different version.
        Some(current) if version > current.version => {
            *current = HighestSeenVersion {
                version,
                reporter_count: 1,
            };
        }
        // Another report of the current highest: bump the count (saturating,
        // so long-lived nodes can never overflow it).
        Some(current) if version == current.version => {
            current.reporter_count = current.reporter_count.saturating_add(1);
        }
        // Older than the current highest: nothing to record.
        Some(_) => {}
        // First report ever seen.
        None => {
            *guard = Some(HighestSeenVersion {
                version,
                reporter_count: 1,
            });
        }
    }
}

/// Get the highest version seen from peers, if enough peers have reported it.
///
/// Returns `None` until at least `MIN_VERSION_REPORTERS` peers have reported
/// the same highest version. This prevents a single malicious peer from
/// triggering update checks by advertising a fake high version.
pub fn get_highest_seen_version() -> Option<(u8, u8, u16)> {
    // Recover from poisoning, same as `report_peer_version`: this state is
    // best-effort and must stay readable after a panic elsewhere.
    let guard = HIGHEST_SEEN_VERSION
        .lock()
        .unwrap_or_else(|e| e.into_inner());
    match *guard {
        Some(hsv) if hsv.reporter_count >= MIN_VERSION_REPORTERS => Some(hsv.version),
        _ => None,
    }
}

/// Reset version discovery state (test only).
#[cfg(test)]
fn reset_version_discovery() {
    // Recover from poisoning so one panicked test cannot wedge later tests.
    let mut guard = match HIGHEST_SEEN_VERSION.lock() {
        Ok(guard) => guard,
        Err(poisoned) => poisoned.into_inner(),
    };
    *guard = None;
}

/// Called when a version mismatch indicates we are too old
/// (remote's min_compatible > our version). This is a breaking change
/// that requires immediate update.
pub fn signal_urgent_update() {
    URGENT_UPDATE_NEEDED.store(true, Ordering::SeqCst);
    // Also trigger the legacy mismatch path
    signal_version_mismatch();
}

/// Check if an urgent (breaking) update is needed.
pub fn is_urgent_update() -> bool {
    URGENT_UPDATE_NEEDED.load(Ordering::SeqCst)
}

/// Clear the urgent update flag.
/// Note: this does not clear the legacy mismatch flag that
/// `signal_urgent_update` also set; use `clear_version_mismatch` for that.
pub fn clear_urgent_update() {
    URGENT_UPDATE_NEEDED.store(false, Ordering::SeqCst);
}

/// Global counter of open ring connections, updated by `connection_maintenance`.
/// The update check task reads this to decide whether to trust a gateway
/// version mismatch signal when the GitHub check keeps failing.
static OPEN_CONNECTION_COUNT: AtomicUsize = AtomicUsize::new(0);

/// Update the global open connection count (called from `connection_maintenance`).
/// Overwrites the previous value wholesale; there is no increment/decrement API.
pub fn set_open_connection_count(count: usize) {
    OPEN_CONNECTION_COUNT.store(count, Ordering::SeqCst);
}

/// Get the current open connection count.
pub fn get_open_connection_count() -> usize {
    OPEN_CONNECTION_COUNT.load(Ordering::SeqCst)
}

pub mod connection_handler;
mod crypto;
pub mod in_memory_socket;

/// Mock transport infrastructure for testing and benchmarking.
/// Provides MockSocket and helper functions to create mock peer connections
/// without real network I/O.
#[cfg(any(test, feature = "bench"))]
pub use connection_handler::mock_transport;
mod packet_data;
pub mod peer_connection;
// todo: optimize trackers
mod received_packet_tracker;

pub(crate) mod bbr;
pub mod congestion_control;
pub(crate) mod fixed_rate;
pub mod global_bandwidth;
pub(crate) mod ledbat;
pub mod metrics;
mod sent_packet_tracker;
mod symmetric_message;
pub(crate) mod token_bucket;

// Re-export LEDBAT stats for telemetry
pub use ledbat::LedbatStats;
// Re-export congestion control interface
pub use congestion_control::{
    AlgorithmConfig, CongestionControl, CongestionControlAlgorithm, CongestionControlConfig,
    CongestionControlStats, CongestionController,
};
// Re-export transport metrics for periodic telemetry snapshots
pub use metrics::{TRANSPORT_METRICS, TransportMetrics, TransportSnapshot};
// Re-export reset functions for deterministic simulation testing
pub use packet_data::reset_nonce_counter;
pub use peer_connection::StreamId;

use std::time::Duration;

/// Metrics gathered over the lifetime of a single completed stream transfer.
///
/// Includes congestion control diagnostics, and is used to emit
/// TransferEvent telemetry when streams complete.
#[derive(Debug, Clone)]
pub struct TransferStats {
    /// Unique stream identifier.
    pub stream_id: u64,
    /// Address of the remote peer.
    pub remote_addr: SocketAddr,
    /// Total number of bytes transferred.
    pub bytes_transferred: u64,
    /// Wall-clock time from start until transmission completed.
    /// This marks the end of SENDING, not the arrival of all ACKs.
    pub elapsed: Duration,
    /// Largest congestion window observed during the transfer (bytes).
    pub peak_cwnd_bytes: u32,
    /// Congestion window at the moment of completion (bytes).
    pub final_cwnd_bytes: u32,
    /// How many LEDBAT slowdowns fired during the transfer.
    pub slowdowns_triggered: u32,
    /// Minimum observed RTT (base delay) at completion.
    pub base_delay: Duration,
    /// Slow start threshold at completion (bytes). A key death-spiral
    /// diagnostic: if ssthresh collapses to min_cwnd, slow start cannot
    /// recover useful throughput.
    pub final_ssthresh_bytes: u32,
    /// Effective minimum ssthresh floor (bytes), which keeps ssthresh from
    /// collapsing too far during timeouts.
    pub min_ssthresh_floor_bytes: u32,
    /// Count of retransmission timeouts (RTO events) during the transfer;
    /// high values point to severe congestion or path problems.
    pub total_timeouts: u32,
    /// Bytes sent but not yet ACKed when transmission finished. Large values
    /// relative to `bytes_transferred` indicate ACK lag, and help estimate
    /// the gap between transmission end and the final ACK.
    pub final_flightsize: u32,
    /// Configured send rate (bytes/sec) for the fixed-rate controller;
    /// zero when an adaptive algorithm (BBR, LEDBAT) is in use.
    pub configured_rate: u32,
}

impl TransferStats {
    /// Average throughput in bytes per second over the whole transfer.
    ///
    /// Returns 0 when `elapsed` is zero, avoiding a division by zero.
    pub fn avg_throughput_bps(&self) -> u64 {
        let secs = self.elapsed.as_secs_f64();
        if secs > 0.0 {
            (self.bytes_transferred as f64 / secs) as u64
        } else {
            0
        }
    }
}

/// Raw message payload bytes handed to/from the transport layer.
type MessagePayload = bytes::Bytes;

/// Numeric identifier assigned to an individual packet.
type PacketId = u32;

/// Transport-layer-observed remote address — the "ground truth" for NAT
/// scenarios.
///
/// Wraps the `SocketAddr` actually seen on the wire, as opposed to whatever
/// address a peer advertises or claims in protocol messages. The newtype
/// makes the address semantics explicit and keeps the two kinds of address
/// from being mixed up accidentally.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct ObservedAddr(SocketAddr);

impl ObservedAddr {
    /// Returns the wrapped socket address.
    pub fn socket_addr(&self) -> SocketAddr {
        self.0
    }
}

impl From<SocketAddr> for ObservedAddr {
    fn from(addr: SocketAddr) -> Self {
        ObservedAddr(addr)
    }
}

impl std::fmt::Display for ObservedAddr {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Delegate straight to the inner address's Display.
        std::fmt::Display::fmt(&self.0, f)
    }
}

pub(crate) use self::connection_handler::ExpectedInboundTracker;
pub use self::connection_handler::create_connection_handler;
pub use self::crypto::{TransportKeypair, TransportPublicKey};
pub use self::{
    connection_handler::{InboundConnectionHandler, OutboundConnectionHandler},
    peer_connection::PeerConnection,
};

// Streaming infrastructure (Phase 1)
pub use self::peer_connection::{
    streaming::{StreamError, StreamHandle, StreamRegistry, StreamingInboundStream},
    streaming_buffer::{InsertError, LockFreeStreamBuffer},
};

/// Errors surfaced by the transport layer.
#[derive(Debug, thiserror::Error)]
#[cfg_attr(feature = "bench", allow(dead_code))]
pub enum TransportError {
    /// The transport handler's internal channel closed; the socket is most
    /// likely gone.
    #[error("transport handler channel closed, socket likely closed")]
    ChannelClosed,
    /// The connection to the given remote address was closed.
    #[error("connection to remote closed")]
    ConnectionClosed(SocketAddr),
    /// Connection establishment failed for the stated reason.
    #[error("failed while establishing connection, reason: {cause}")]
    ConnectionEstablishmentFailure { cause: Cow<'static, str> },
    /// Our protocol version is incompatible with the gateway's; the message
    /// carries user-facing upgrade instructions.
    #[error(
        "Version incompatibility with gateway\n  Your client version: {actual}\n  Gateway version: {expected}\n  \n  To fix this, update your Freenet client:\n    cargo install --force freenet --version {expected}\n  \n  Or if building from source:\n    git pull && cargo install --path crates/core"
    )]
    ProtocolVersionMismatch { expected: String, actual: String },
    /// A UDP send to the given address failed with the given I/O error kind.
    /// Treated as transient — see `is_transient_send_failure`.
    #[error("send to {0} failed: {1}")]
    SendFailed(SocketAddr, std::io::ErrorKind),
    /// Any other I/O error.
    #[error(transparent)]
    IO(#[from] std::io::Error),
    /// Catch-all for other error types.
    #[error(transparent)]
    Other(#[from] anyhow::Error),
    /// Public-key decryption failure from the crypto layer.
    #[error(transparent)]
    PubKeyDecryptionError(#[from] crypto::DecryptionError),
    /// Bincode (de)serialization failure.
    #[error(transparent)]
    Serialization(#[from] bincode::Error),
}

impl TransportError {
    /// Returns true if this error is a transient UDP send failure that should
    /// not kill the connection. The idle timeout is the sole authority on
    /// connection liveness.
    pub fn is_transient_send_failure(&self) -> bool {
        matches!(self, TransportError::SendFailed(..))
    }
}

/// Socket trait for abstracting UDP communication.
///
/// This trait allows the transport layer to work with both real UDP sockets
/// and mock sockets for testing.
pub trait Socket: Sized + Send + Sync + 'static {
    /// Bind a new socket to `addr`.
    fn bind(addr: SocketAddr) -> impl Future<Output = io::Result<Self>> + Send;
    /// Receive a single datagram into `buf`, returning the number of bytes
    /// read and the sender's address.
    fn recv_from(
        &self,
        buf: &mut [u8],
    ) -> impl Future<Output = io::Result<(usize, SocketAddr)>> + Send;
    /// Send `buf` as a single datagram to `target`, returning the number of
    /// bytes written.
    fn send_to(
        &self,
        buf: &[u8],
        target: SocketAddr,
    ) -> impl Future<Output = io::Result<usize>> + Send;

    /// Synchronous send for use in blocking contexts (e.g., spawn_blocking).
    /// For UDP, this typically succeeds immediately since there's no flow control.
    /// Used by MockSocket implementations in tests.
    #[allow(dead_code)]
    fn send_to_blocking(&self, buf: &[u8], target: SocketAddr) -> io::Result<usize>;
}

/// Normalize an IPv4-mapped IPv6 address (`::ffff:x.x.x.x`) to plain IPv4.
///
/// Dual-stack sockets report IPv4 peers as `::ffff:x.x.x.x`. Without
/// normalization, the same peer gets different ring locations, connection map
/// entries, and backoff tracking depending on whether the local socket is
/// IPv4-only or dual-stack. Normalizing at the transport boundary ensures a
/// single canonical representation throughout the system.
pub fn normalize_mapped_addr(addr: SocketAddr) -> SocketAddr {
    match addr {
        // A V6 address that is really a mapped V4 peer: unwrap to plain V4,
        // keeping the original port.
        SocketAddr::V6(v6) => match v6.ip().to_ipv4_mapped() {
            Some(v4) => SocketAddr::new(std::net::IpAddr::V4(v4), v6.port()),
            None => addr,
        },
        // Already plain IPv4: nothing to do.
        SocketAddr::V4(_) => addr,
    }
}

/// Convert a plain IPv4 address to IPv4-mapped IPv6 for sending on a dual-stack socket.
///
/// An AF_INET6 socket cannot `sendto()` a plain AF_INET address — the kernel
/// rejects the address family mismatch with EINVAL. This function converts
/// `x.x.x.x` to `::ffff:x.x.x.x` when the local socket is IPv6.
fn map_addr_for_send(local_is_ipv6: bool, target: SocketAddr) -> SocketAddr {
    match target {
        // IPv4 target on an IPv6 socket: re-express as ::ffff:x.x.x.x.
        SocketAddr::V4(v4) if local_is_ipv6 => {
            SocketAddr::new(std::net::IpAddr::V6(v4.ip().to_ipv6_mapped()), v4.port())
        }
        // IPv6 target, or an IPv4 socket that can address IPv4 directly.
        other => other,
    }
}

impl Socket for UdpSocket {
    /// Bind a UDP socket at `addr`, configuring IPv6 sockets as dual-stack
    /// (accepting IPv4 peers as `::ffff:x.x.x.x`) before binding.
    async fn bind(addr: SocketAddr) -> io::Result<Self> {
        // Use socket2 to configure dual-stack before binding, then convert
        // to a tokio UdpSocket. This ensures IPv6 sockets accept IPv4 too.
        let is_ipv6 = addr.is_ipv6();
        let domain = if is_ipv6 {
            socket2::Domain::IPV6
        } else {
            socket2::Domain::IPV4
        };
        let sock = socket2::Socket::new(domain, socket2::Type::DGRAM, Some(socket2::Protocol::UDP))
            .map_err(|e| io::Error::new(e.kind(), format!("Failed to create UDP socket: {e}")))?;
        if is_ipv6 {
            // Dual-stack: accept both IPv4 (mapped as ::ffff:x.x.x.x) and IPv6.
            // Set before bind — toggling IPV6_V6ONLY after bind is not
            // reliable across platforms.
            sock.set_only_v6(false)?;
        }
        // tokio requires the underlying fd to be non-blocking.
        sock.set_nonblocking(true)?;
        sock.bind(&addr.into()).map_err(|e| {
            io::Error::new(
                e.kind(),
                format!("Failed to bind UDP socket to {addr}: {e}"),
            )
        })?;
        let std_socket: std::net::UdpSocket = sock.into();
        Self::from_std(std_socket)
    }

    async fn recv_from(&self, buf: &mut [u8]) -> io::Result<(usize, SocketAddr)> {
        // Inherent methods take precedence in method resolution, so this
        // calls tokio's `UdpSocket::recv_from`, not this trait method.
        let (len, addr) = self.recv_from(buf).await?;
        // Normalize ::ffff:x.x.x.x → plain IPv4 so the rest of the system
        // sees a consistent address regardless of socket type.
        let addr = normalize_mapped_addr(addr);
        // Receive-side dashboard metering happens post-authentication — see
        // `peer_connection::PeerConnection::recv` after `try_decrypt_sym`
        // succeeds. Doing it here would let an attacker spoofing UDP source
        // addresses inflate the dashboard counters and crowd legitimate
        // peers out of the bounded per-peer table (#3999).
        Ok((len, addr))
    }

    async fn send_to(&self, buf: &[u8], target: SocketAddr) -> io::Result<usize> {
        // Convert plain IPv4 targets to mapped form when sending on an IPv6 socket,
        // since AF_INET6 sockets reject AF_INET addresses with EINVAL.
        let local_is_ipv6 = self.local_addr().map(|a| a.is_ipv6()).unwrap_or(false);
        let mapped_target = map_addr_for_send(local_is_ipv6, target);
        // Calls tokio's inherent `send_to` (inherent methods take precedence).
        let result = self.send_to(buf, mapped_target).await;
        if let Ok(bytes) = result {
            // Record against the un-mapped target so per-peer keys match the
            // normalized form used everywhere else in the system.
            metrics::TRANSPORT_METRICS.record_packet_sent(target, bytes as u64);
        }
        result
    }

    fn send_to_blocking(&self, buf: &[u8], target: SocketAddr) -> io::Result<usize> {
        // try_send_to is synchronous and for UDP typically succeeds immediately.
        // However, under high load the kernel buffer might be full, returning WouldBlock.
        // In that case, we retry with exponential backoff since we're in a blocking context.
        let local_is_ipv6 = self.local_addr().map(|a| a.is_ipv6()).unwrap_or(false);
        let mapped_target = map_addr_for_send(local_is_ipv6, target);
        let mut backoff_us = 1u64; // Start at 1μs
        const MAX_BACKOFF_US: u64 = 1000; // Cap at 1ms

        // NOTE(review): this loop retries WouldBlock indefinitely — there is
        // no overall deadline. Likely acceptable in a blocking context, but
        // worth confirming callers can tolerate an unbounded (if rare) wait.
        loop {
            match self.try_send_to(buf, mapped_target) {
                Ok(n) => {
                    metrics::TRANSPORT_METRICS.record_packet_sent(target, n as u64);
                    return Ok(n);
                }
                Err(e) if e.kind() == io::ErrorKind::WouldBlock => {
                    // Kernel buffer full - exponential backoff
                    std::thread::sleep(std::time::Duration::from_micros(backoff_us));
                    backoff_us = (backoff_us * 2).min(MAX_BACKOFF_US);
                }
                Err(e) => return Err(e),
            }
        }
    }
}

// =============================================================================
// Type-erased peer connection trait
// =============================================================================
//
// This trait abstracts over `PeerConnection<S>` to allow the event loop to work
// with connections without being generic over the socket type. This enables
// using the same event loop code for both production (UdpSocket) and testing
// (InMemorySocket) without propagating generics through the entire codebase.

use crate::message::NetMessage;

/// Type-erased interface for peer connections.
///
/// This trait provides the minimal interface needed by `peer_connection_listener`
/// and related event loop code to communicate with a peer. By boxing connections
/// as `Box<dyn PeerConnectionApi>`, we avoid making the event loop generic over
/// the socket type.
///
/// The async methods return boxed futures (`Pin<Box<dyn Future … + '_>>`)
/// rather than `async fn` because the trait must be object-safe. The `'_`
/// lifetime ties each returned future to the `&mut self` borrow, so at most
/// one send/recv operation can be in flight per connection at a time.
pub(crate) trait PeerConnectionApi: Send {
    /// Returns the remote peer's socket address.
    fn remote_addr(&self) -> SocketAddr;

    /// Sends a network message to the remote peer.
    ///
    /// The message is serialized and sent over the transport connection.
    fn send_message(
        &mut self,
        msg: NetMessage,
    ) -> std::pin::Pin<Box<dyn Future<Output = Result<(), TransportError>> + Send + '_>>;

    /// Receives raw bytes from the remote peer.
    ///
    /// Returns the deserialized message bytes. The caller is responsible for
    /// deserializing into the appropriate message type.
    fn recv(
        &mut self,
    ) -> std::pin::Pin<Box<dyn Future<Output = Result<Vec<u8>, TransportError>> + Send + '_>>;

    /// Sets the orphan stream registry for handling race conditions between
    /// stream fragments and metadata messages (RequestStreaming/ResponseStreaming).
    ///
    /// This should be called by the node layer after connection establishment,
    /// before any messages are processed.
    fn set_orphan_stream_registry(
        &mut self,
        registry: std::sync::Arc<crate::operations::orphan_streams::OrphanStreamRegistry>,
    );

    /// Sends raw stream data to the remote peer using the given operations-level StreamId.
    ///
    /// The data is fragmented and sent via the transport's outbound stream mechanism,
    /// using the provided `stream_id` (which should have the operations marker bit set)
    /// so that the receiver routes fragments through the orphan registry instead of
    /// the legacy InboundStream decode path.
    ///
    /// If `completion_tx` is provided, it will be signaled when the stream transfer
    /// completes (success or failure). Used by the broadcast queue to track when the
    /// actual data transfer finishes, not just when the send is enqueued.
    fn send_stream_data(
        &mut self,
        stream_id: crate::transport::peer_connection::StreamId,
        data: bytes::Bytes,
        metadata: Option<bytes::Bytes>,
        completion_tx: Option<tokio::sync::oneshot::Sender<()>>,
    ) -> std::pin::Pin<Box<dyn Future<Output = Result<(), TransportError>> + Send + '_>>;

    /// Pipes an inbound stream to the remote peer, forwarding fragments as they arrive.
    ///
    /// Unlike `send_stream_data` which takes complete data and fragments it, this reads
    /// from an existing `StreamHandle` and forwards each fragment incrementally without
    /// waiting for full reassembly. This enables low-latency forwarding at intermediate nodes.
    ///
    /// The `outbound_stream_id` should be created via `StreamId::next_operations()` for
    /// operations-level routing at the receiver.
    fn pipe_stream_data(
        &mut self,
        outbound_stream_id: crate::transport::peer_connection::StreamId,
        inbound_handle: crate::transport::peer_connection::streaming::StreamHandle,
        metadata: Option<bytes::Bytes>,
    ) -> std::pin::Pin<Box<dyn Future<Output = Result<(), TransportError>> + Send + '_>>;
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::transport::received_packet_tracker::ReportResult;
    use crate::transport::sent_packet_tracker::ResendAction;

    /// End-to-end exercise of the packet bookkeeping: packets 1..=5 are
    /// "sent", only 1/3/5 are "received" and receipted, and once the
    /// effective RTO elapses the unacknowledged 2 and 4 must come back
    /// as resend candidates, in order.
    #[test]
    fn test_packet_send_receive_acknowledge_flow() {
        let mut sender = sent_packet_tracker::tests::mock_sent_packet_tracker();
        let mut receiver = received_packet_tracker::tests::mock_received_packet_tracker();

        // Snapshot the effective RTO up front — outstanding packets use
        // this value (not MESSAGE_CONFIRMATION_TIMEOUT) for their timeout.
        let rto = sender.effective_rto();

        // "Send" five packets with a one-byte payload each.
        (1..=5).for_each(|id| sender.report_sent_packet(id, vec![id as u8].into()));

        // Only the odd-numbered packets make it across.
        for id in [1u32, 3, 5] {
            assert_eq!(receiver.report_received_packet(id), ReportResult::Ok);
        }

        // The receiver produces receipts for exactly what it saw, and the
        // sender consumes them to clear those packets.
        let receipts = receiver.get_receipts();
        assert_eq!(receipts, vec![1u32, 3, 5]);
        let _ = sender.report_received_receipts(&receipts);

        // Advance past the RTO: the two lost packets must be offered for
        // resend, lowest id first.
        sender.time_source.advance(rto);
        for id in [2, 4] {
            match sender.get_resend() {
                ResendAction::Resend(packet_id, packet) => {
                    assert_eq!(packet_id, id);
                    // Re-register the resent packet as in flight again.
                    sender.report_sent_packet(id, packet);
                }
                ResendAction::WaitUntil(_) | ResendAction::TlpProbe(..) => {
                    panic!("Expected resend action for packet {id}")
                }
            }
        }
    }
}

#[cfg(test)]
mod version_discovery_tests {
    use super::*;

    // These tests mutate global statics and must run sequentially.
    // Use `cargo test -- --test-threads=1 version_discovery` if running standalone.

    /// A version seen from only one reporter must not be trusted yet.
    #[test]
    fn single_reporter_not_trusted() {
        reset_version_discovery();
        report_peer_version((0, 1, 153));
        assert_eq!(get_highest_seen_version(), None);
    }

    /// Two independent reports of the same version cross the trust threshold.
    #[test]
    fn two_reporters_trusted() {
        reset_version_discovery();
        for _ in 0..2 {
            report_peer_version((0, 1, 153));
        }
        assert_eq!(get_highest_seen_version(), Some((0, 1, 153)));
    }

    /// Seeing a strictly higher version restarts the reporter count: the
    /// new version needs two reports of its own before it is trusted.
    #[test]
    fn higher_version_resets_count() {
        reset_version_discovery();
        for _ in 0..2 {
            report_peer_version((0, 1, 153));
        }
        assert_eq!(get_highest_seen_version(), Some((0, 1, 153)));

        // First report of the higher version — count back to 1, untrusted.
        report_peer_version((0, 1, 154));
        assert_eq!(get_highest_seen_version(), None);

        // Second report — trusted again, at the new version.
        report_peer_version((0, 1, 154));
        assert_eq!(get_highest_seen_version(), Some((0, 1, 154)));
    }

    /// Major/minor bumps go through the same two-reporter rule as patch bumps.
    #[test]
    fn major_minor_bumps_accepted() {
        reset_version_discovery();
        for _ in 0..2 {
            report_peer_version((1, 0, 0));
        }
        assert_eq!(get_highest_seen_version(), Some((1, 0, 0)));
    }
}

#[cfg(test)]
mod dual_stack_tests {
    //! These tests use real UdpSocket (not SimulationSocket) because they validate
    //! OS-level dual-stack behavior: IPv4-mapped address normalization, kernel
    //! address-family mapping in sendto(), and IPV6_V6ONLY socket option effects.
    //! SimulationSocket cannot exercise any of these kernel-level behaviors.
    //!
    //! NOTE(review): several tests below read the process-global
    //! TRANSPORT_METRICS counters, which other concurrently running tests may
    //! also move — hence the `>=` assertions rather than exact equality.

    use super::*;
    use std::net::{Ipv6Addr, SocketAddr};

    /// Verify that binding to [::]:0 creates a dual-stack UDP socket that
    /// can receive IPv4 traffic (via IPv4-mapped addresses).
    #[tokio::test]
    async fn udp_dual_stack_accepts_ipv4() {
        // Port 0 lets the OS pick any free port; read it back below.
        let dual_addr = SocketAddr::new(std::net::IpAddr::V6(Ipv6Addr::UNSPECIFIED), 0);
        let dual_sock = <UdpSocket as Socket>::bind(dual_addr)
            .await
            .expect("bind to [::]:0 should succeed");
        let bound_port = dual_sock.local_addr().unwrap().port();

        // Send a packet from an IPv4 socket to the dual-stack socket
        let v4_addr = SocketAddr::new(std::net::IpAddr::V4(std::net::Ipv4Addr::LOCALHOST), 0);
        let v4_sender = <UdpSocket as Socket>::bind(v4_addr).await.unwrap();
        let target = SocketAddr::new(
            std::net::IpAddr::V4(std::net::Ipv4Addr::LOCALHOST),
            bound_port,
        );
        <UdpSocket as Socket>::send_to(&v4_sender, b"hello", target)
            .await
            .expect("send from IPv4 should succeed");

        let mut buf = [0u8; 16];
        let (len, src) = <UdpSocket as Socket>::recv_from(&dual_sock, &mut buf)
            .await
            .unwrap();
        assert_eq!(&buf[..len], b"hello");
        // recv_from must normalize ::ffff:127.0.0.1 → 127.0.0.1
        assert!(
            src.is_ipv4(),
            "IPv4 source should be normalized to plain IPv4, got {src}"
        );
    }

    /// Verify that binding to [::]:0 also accepts IPv6 traffic.
    #[tokio::test]
    async fn udp_dual_stack_accepts_ipv6() {
        let dual_addr = SocketAddr::new(std::net::IpAddr::V6(Ipv6Addr::UNSPECIFIED), 0);
        let dual_sock = <UdpSocket as Socket>::bind(dual_addr)
            .await
            .expect("bind to [::]:0 should succeed");
        let bound_port = dual_sock.local_addr().unwrap().port();

        // Native IPv6 loopback sender — no address mapping involved here.
        let v6_addr = SocketAddr::new(std::net::IpAddr::V6(Ipv6Addr::LOCALHOST), 0);
        let v6_sender = <UdpSocket as Socket>::bind(v6_addr).await.unwrap();
        let target = SocketAddr::new(std::net::IpAddr::V6(Ipv6Addr::LOCALHOST), bound_port);
        <UdpSocket as Socket>::send_to(&v6_sender, b"world", target)
            .await
            .expect("send from IPv6 should succeed");

        let mut buf = [0u8; 16];
        let (len, _src) = <UdpSocket as Socket>::recv_from(&dual_sock, &mut buf)
            .await
            .unwrap();
        assert_eq!(&buf[..len], b"world");
    }

    /// An IPv4-mapped IPv6 address collapses to its plain IPv4 form.
    #[test]
    fn normalize_mapped_addr_converts_ipv4_mapped() {
        // ::ffff:127.0.0.1 → 127.0.0.1
        let mapped: SocketAddr = "[::ffff:127.0.0.1]:1234".parse().unwrap();
        let normalized = normalize_mapped_addr(mapped);
        assert_eq!(normalized, "127.0.0.1:1234".parse::<SocketAddr>().unwrap());
    }

    /// A plain IPv4 address passes through normalization unchanged.
    #[test]
    fn normalize_mapped_addr_preserves_native_ipv4() {
        let v4: SocketAddr = "1.2.3.4:5678".parse().unwrap();
        assert_eq!(normalize_mapped_addr(v4), v4);
    }

    /// A native (non-mapped) IPv6 address passes through unchanged.
    #[test]
    fn normalize_mapped_addr_preserves_native_ipv6() {
        let v6: SocketAddr = "[2001:db8::1]:9999".parse().unwrap();
        assert_eq!(normalize_mapped_addr(v6), v6);
    }

    /// Sending to an IPv4 target from an IPv6 (dual-stack) socket requires
    /// the ::ffff: mapped form; port must be preserved.
    #[test]
    fn map_addr_for_send_maps_ipv4_on_ipv6_socket() {
        let v4: SocketAddr = "1.2.3.4:5678".parse().unwrap();
        let mapped = map_addr_for_send(true, v4);
        assert!(mapped.is_ipv6());
        if let SocketAddr::V6(v6) = mapped {
            assert_eq!(v6.ip().to_ipv4_mapped(), Some("1.2.3.4".parse().unwrap()));
            assert_eq!(v6.port(), 5678);
        }
    }

    /// On an IPv4 local socket no mapping is needed — identity transform.
    #[test]
    fn map_addr_for_send_noop_on_ipv4_socket() {
        let v4: SocketAddr = "1.2.3.4:5678".parse().unwrap();
        assert_eq!(map_addr_for_send(false, v4), v4);
    }

    /// Verify round-trip: dual-stack socket can send back to an IPv4 peer
    /// whose address was learned via recv_from (normalized to plain IPv4).
    #[tokio::test]
    async fn udp_dual_stack_roundtrip_ipv4() {
        let dual_addr = SocketAddr::new(std::net::IpAddr::V6(Ipv6Addr::UNSPECIFIED), 0);
        let dual_sock = <UdpSocket as Socket>::bind(dual_addr)
            .await
            .expect("bind to [::]:0 should succeed");
        let dual_port = dual_sock.local_addr().unwrap().port();

        let v4_addr = SocketAddr::new(std::net::IpAddr::V4(std::net::Ipv4Addr::LOCALHOST), 0);
        let v4_sock = <UdpSocket as Socket>::bind(v4_addr).await.unwrap();
        let v4_port = v4_sock.local_addr().unwrap().port();

        // IPv4 → dual-stack
        let target = SocketAddr::new(
            std::net::IpAddr::V4(std::net::Ipv4Addr::LOCALHOST),
            dual_port,
        );
        <UdpSocket as Socket>::send_to(&v4_sock, b"ping", target)
            .await
            .unwrap();

        let mut buf = [0u8; 16];
        let (len, src) = <UdpSocket as Socket>::recv_from(&dual_sock, &mut buf)
            .await
            .unwrap();
        assert_eq!(&buf[..len], b"ping");
        assert!(src.is_ipv4(), "source should be normalized to IPv4");

        // dual-stack → IPv4 (using the normalized address from recv_from)
        let reply_target = SocketAddr::new(src.ip(), v4_port);
        <UdpSocket as Socket>::send_to(&dual_sock, b"pong", reply_target)
            .await
            .expect("sending to normalized IPv4 addr from IPv6 socket should work");

        let (len, _) = <UdpSocket as Socket>::recv_from(&v4_sock, &mut buf)
            .await
            .unwrap();
        assert_eq!(&buf[..len], b"pong");
    }

    /// Regression: every successful send_to on the production `UdpSocket`
    /// must update the cumulative + per-peer counters used by the local
    /// dashboard. Before #3996, those counters only moved on stream-transfer
    /// completion, so a node connected for hours could legitimately show "—"
    /// for SENT despite real keep-alive traffic flowing.
    ///
    /// Receive-side accounting is owned by the post-authentication hook in
    /// `peer_connection.rs` (see #3999), so this test only asserts send
    /// behavior — `recv_from` deliberately does NOT bump any counter.
    #[tokio::test]
    async fn udp_socket_records_packet_metrics() {
        use crate::transport::metrics::TRANSPORT_METRICS;

        let v4_addr = SocketAddr::new(std::net::IpAddr::V4(std::net::Ipv4Addr::LOCALHOST), 0);
        let sender = <UdpSocket as Socket>::bind(v4_addr).await.unwrap();
        let receiver = <UdpSocket as Socket>::bind(v4_addr).await.unwrap();
        let receiver_addr = receiver.local_addr().unwrap();
        let sender_addr = sender.local_addr().unwrap();

        // Baseline before the send — asserted against with >= below.
        let cumulative_sent_before = TRANSPORT_METRICS.cumulative_bytes_sent();

        let payload = b"keep-alive-sized-control-packet";
        <UdpSocket as Socket>::send_to(&sender, payload, receiver_addr)
            .await
            .unwrap();

        // Drain so the datagram doesn't linger in the kernel buffer.
        let mut buf = [0u8; 64];
        let (len, _) = <UdpSocket as Socket>::recv_from(&receiver, &mut buf)
            .await
            .unwrap();
        assert_eq!(len, payload.len());

        // Send-side cumulative counter must move at least by the payload.
        // Other tests running in parallel may push it higher, hence the >=.
        assert!(
            TRANSPORT_METRICS.cumulative_bytes_sent()
                >= cumulative_sent_before + payload.len() as u64,
            "cumulative_bytes_sent must include the payload"
        );

        // Per-peer entry for the send target must exist with bytes_sent
        // covering the payload. The reverse-direction entry (keyed by the
        // sender's address) must NOT be created by `recv_from` alone —
        // recv-side accounting moved to the post-auth hook in #3999, so
        // an unauthenticated packet contributes nothing to per-peer state.
        let peers = TRANSPORT_METRICS.per_peer_snapshot();
        let receiver_entry = peers
            .iter()
            .find(|(a, _, _)| *a == receiver_addr)
            .expect("per-peer entry for send target must exist");
        assert!(
            receiver_entry.1 >= payload.len() as u64,
            "per-peer bytes_sent must include the payload (got {})",
            receiver_entry.1
        );
        assert!(
            peers.iter().all(|(a, _, _)| *a != sender_addr),
            "recv_from must not create a per-peer entry pre-authentication"
        );
    }

    /// Regression: failed sends must not bump the metrics. Without this,
    /// a future refactor could move the increment before the result check
    /// and silently inflate the counters on every error.
    #[tokio::test]
    async fn udp_socket_failed_send_does_not_record_metrics() {
        use crate::transport::metrics::TRANSPORT_METRICS;

        let v4_addr = SocketAddr::new(std::net::IpAddr::V4(std::net::Ipv4Addr::LOCALHOST), 0);
        let sender = <UdpSocket as Socket>::bind(v4_addr).await.unwrap();

        // Connect the socket to a known-bad address so subsequent sends
        // synchronously fail with ECONNREFUSED on Linux. This is the
        // simplest way to force a Send error from `send_to`.
        // `.ok()` is deliberate: connect itself may or may not error
        // depending on the OS, and either outcome is fine for this test.
        sender
            .connect("127.0.0.1:1") // port 1 is reserved; nothing listens
            .await
            .ok();

        let unbound: SocketAddr = "127.0.0.1:1".parse().unwrap();
        let cumulative_sent_before = TRANSPORT_METRICS.cumulative_bytes_sent();

        // Capture per-peer state before — the exact behavior depends on the
        // OS (some kernels still accept the sendto), so we tolerate either
        // outcome but assert that *if* the send fails, no metric moves.
        let result = <UdpSocket as Socket>::send_to(&sender, b"x", unbound).await;
        if result.is_err() {
            assert_eq!(
                TRANSPORT_METRICS.cumulative_bytes_sent(),
                cumulative_sent_before,
                "failed send must not bump cumulative_bytes_sent"
            );
            // Per-peer entry for the unbound address should also not exist
            // due to this send. (Other tests may have created it; this is
            // why we don't assert absence outright.)
        }
    }

    /// Verify that `send_to_blocking` (the synchronous code path used in
    /// `spawn_blocking` contexts) also records metrics on success. Same
    /// dashboard-correctness guarantee as the async path. Runs inside a
    /// `spawn_blocking` so the call is in a real blocking context with the
    /// tokio reactor still active for `try_send_to` polling.
    #[tokio::test]
    async fn udp_socket_blocking_send_records_packet_metrics() {
        use crate::transport::metrics::TRANSPORT_METRICS;
        use std::sync::Arc;

        let v4_addr = SocketAddr::new(std::net::IpAddr::V4(std::net::Ipv4Addr::LOCALHOST), 0);
        // Arc so the socket can move into the spawn_blocking closure while
        // the test keeps its own handle.
        let sender = Arc::new(<UdpSocket as Socket>::bind(v4_addr).await.unwrap());
        let receiver = <UdpSocket as Socket>::bind(v4_addr).await.unwrap();
        let receiver_addr = receiver.local_addr().unwrap();

        let cumulative_sent_before = TRANSPORT_METRICS.cumulative_bytes_sent();
        let payload: &'static [u8] = b"blocking-send-payload";
        let sender_clone = sender.clone();

        tokio::task::spawn_blocking(move || {
            <UdpSocket as Socket>::send_to_blocking(&sender_clone, payload, receiver_addr)
                .expect("blocking send should succeed on localhost UDP");
        })
        .await
        .unwrap();

        // Drain the receiver so the kernel buffer doesn't leak across tests.
        let mut buf = [0u8; 64];
        let _drain = <UdpSocket as Socket>::recv_from(&receiver, &mut buf).await;

        assert!(
            TRANSPORT_METRICS.cumulative_bytes_sent()
                >= cumulative_sent_before + payload.len() as u64,
            "send_to_blocking must record cumulative_bytes_sent"
        );
        let peers = TRANSPORT_METRICS.per_peer_snapshot();
        let entry = peers
            .iter()
            .find(|(a, _, _)| *a == receiver_addr)
            .expect("per-peer entry must exist after blocking send");
        assert!(
            entry.1 >= payload.len() as u64,
            "send_to_blocking must record per-peer bytes_sent"
        );
    }

    /// On a dual-stack IPv6 socket sending to an IPv4 target, the kernel
    /// receives the AF_INET6-mapped form, but the per-peer entry MUST be
    /// keyed by the canonical (un-mapped) IPv4 form. Otherwise the dashboard
    /// would split a single peer across two map entries when the local
    /// socket happens to be IPv6.
    #[tokio::test]
    async fn udp_socket_dual_stack_send_keys_per_peer_by_unmapped_addr() {
        use crate::transport::metrics::TRANSPORT_METRICS;

        let dual_addr = SocketAddr::new(std::net::IpAddr::V6(Ipv6Addr::UNSPECIFIED), 0);
        let dual_sock = <UdpSocket as Socket>::bind(dual_addr).await.unwrap();
        let v4_addr = SocketAddr::new(std::net::IpAddr::V4(std::net::Ipv4Addr::LOCALHOST), 0);
        let v4_recv = <UdpSocket as Socket>::bind(v4_addr).await.unwrap();
        let v4_recv_port = v4_recv.local_addr().unwrap().port();

        let target = SocketAddr::new(
            std::net::IpAddr::V4(std::net::Ipv4Addr::LOCALHOST),
            v4_recv_port,
        );
        assert!(target.is_ipv4(), "test precondition");

        <UdpSocket as Socket>::send_to(&dual_sock, b"dual-stack", target)
            .await
            .unwrap();

        // The metrics key must be the caller-supplied (un-mapped) target,
        // not the ::ffff: form actually handed to the kernel.
        let peers = TRANSPORT_METRICS.per_peer_snapshot();
        let entry = peers
            .iter()
            .find(|(a, _, _)| *a == target)
            .expect("per-peer entry must be keyed by un-mapped IPv4 target");
        assert!(
            entry.0.is_ipv4(),
            "per-peer key must be plain IPv4, not ::ffff:x.x.x.x"
        );
        // Same target in mapped form must NOT have its own entry.
        let mapped: SocketAddr = format!("[::ffff:127.0.0.1]:{v4_recv_port}")
            .parse()
            .unwrap();
        assert!(
            peers.iter().all(|(a, _, _)| *a != mapped),
            "mapped form must not produce a duplicate per-peer entry"
        );
    }

    /// Regression for #3999: an attacker that fabricates UDP packets from
    /// many spoofed source IPs MUST NOT be able to inflate the dashboard
    /// cumulative_bytes_received counter or fill the bounded per-peer
    /// table. `recv_from` is the kernel handoff point that sees every
    /// packet regardless of authentication, so the fix is "don't meter
    /// here" — receive-side metering moved to the post-`try_decrypt_sym`
    /// hook in `peer_connection::PeerConnection::recv`.
    #[tokio::test]
    async fn udp_socket_recv_from_does_not_record_pre_authentication() {
        use crate::transport::metrics::TRANSPORT_METRICS;

        let v4_addr = SocketAddr::new(std::net::IpAddr::V4(std::net::Ipv4Addr::LOCALHOST), 0);
        let attacker = <UdpSocket as Socket>::bind(v4_addr).await.unwrap();
        let victim = <UdpSocket as Socket>::bind(v4_addr).await.unwrap();
        let victim_addr = victim.local_addr().unwrap();
        let attacker_addr = attacker.local_addr().unwrap();

        // Simulate a forged UDP packet arriving at the listening port: the
        // "attacker" socket stands in for any unauthenticated sender.
        let payload = vec![0xAB; 1024];
        attacker.send_to(&payload, victim_addr).await.unwrap();

        let mut buf = [0u8; 2048];
        let (len, src) = <UdpSocket as Socket>::recv_from(&victim, &mut buf)
            .await
            .unwrap();
        assert_eq!(len, payload.len());
        assert_eq!(src, attacker_addr);

        // The packet never reached `try_decrypt_sym`, so it must not have
        // created a per-peer entry. (Cumulative is process-global and may
        // be moved by concurrent authenticated paths; only per-peer
        // absence is reliably observable from this test.)
        let peers = TRANSPORT_METRICS.per_peer_snapshot();
        assert!(
            peers.iter().all(|(a, _, _)| *a != attacker_addr),
            "spoofed (un-authenticated) source MUST NOT create a per-peer entry"
        );
    }
}