saorsa_core/
transport.rs

1// Copyright 2024 Saorsa Labs Limited
2//
3// This software is dual-licensed under:
4// - GNU Affero General Public License v3.0 or later (AGPL-3.0-or-later)
5// - Commercial License
6//
7// For AGPL-3.0 license, see LICENSE-AGPL-3.0
8// For commercial licensing, contact: david@saorsalabs.com
9//
10// Unless required by applicable law or agreed to in writing, software
11// distributed under these licenses is distributed on an "AS IS" BASIS,
12// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
14//! Transport Layer
15//!
16//! This module provides native ant-quic integration for the P2P Foundation.
17//!
18//! ## Migration Notice
19//! The old Transport/Connection trait abstractions have been deprecated
20//! in favor of direct ant-quic integration via P2PNetworkNode.
21//!
22//! Use `ant_quic_adapter::P2PNetworkNode` directly for all networking needs.
23
24// ant-quic is used directly via ant_quic_adapter module
25
26// Native ant-quic integration with advanced NAT traversal and PQC support
27pub mod ant_quic_adapter;
28
29// Tests for old QuicTransport - removed during ant-quic migration
30// #[cfg(test)]
31// mod quic_error_tests;
32
33use crate::validation::{Validate, ValidationContext, validate_message_size, validate_peer_id};
34use crate::{NetworkAddress, P2PError, PeerId, Result};
35use async_trait::async_trait;
36use serde::{Deserialize, Serialize};
37use std::collections::HashMap;
38use std::fmt;
39use std::sync::Arc;
40use std::time::{Duration, Instant};
41use tokio::sync::{Mutex, RwLock};
42use tracing::{debug, info, warn};
43
44/// Transport protocol types
45#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
46pub enum TransportType {
47    /// QUIC transport protocol with NAT traversal
48    QUIC,
49}
50
51/// Transport selection strategy (simplified for QUIC-only)
52#[derive(Debug, Clone, PartialEq, Serialize, Deserialize, Default)]
53pub enum TransportSelection {
54    /// Use QUIC transport (default and only option)
55    #[default]
56    QUIC,
57}
58
59/// Connection quality metrics
60#[derive(Debug, Clone)]
61pub struct ConnectionQuality {
62    /// Round-trip latency
63    pub latency: Duration,
64    /// Throughput in Mbps
65    pub throughput_mbps: f64,
66    /// Packet loss percentage
67    pub packet_loss: f64,
68    /// Jitter (latency variation)
69    pub jitter: Duration,
70    /// Connection establishment time
71    pub connect_time: Duration,
72}
73
74/// Connection information
75#[derive(Debug, Clone)]
76pub struct ConnectionInfo {
77    /// Transport type being used
78    pub transport_type: TransportType,
79    /// Local address
80    pub local_addr: NetworkAddress,
81    /// Remote address
82    pub remote_addr: NetworkAddress,
83    /// Whether connection is encrypted
84    pub is_encrypted: bool,
85    /// Cipher suite being used
86    pub cipher_suite: String,
87    /// Whether 0-RTT was used
88    pub used_0rtt: bool,
89    /// Connection establishment time
90    pub established_at: Instant,
91    /// Last activity timestamp
92    pub last_activity: Instant,
93}
94
95/// Connection pool information
96#[derive(Debug, Clone)]
97pub struct ConnectionPoolInfo {
98    /// Number of active connections
99    pub active_connections: usize,
100    /// Total connections ever created
101    pub total_connections: usize,
102    /// Bytes sent through pool
103    pub bytes_sent: u64,
104    /// Bytes received through pool
105    pub bytes_received: u64,
106}
107
108/// Connection pool statistics
109#[derive(Debug, Clone)]
110pub struct ConnectionPoolStats {
111    /// Messages sent per connection
112    pub messages_per_connection: HashMap<String, usize>,
113    /// Bytes per connection
114    pub bytes_per_connection: HashMap<String, u64>,
115    /// Average latency per connection
116    pub latency_per_connection: HashMap<String, Duration>,
117}
118
119/// Message received from transport
120#[derive(Debug, Clone)]
121pub struct TransportMessage {
122    /// Sender peer ID
123    pub sender: PeerId,
124    /// Message data
125    pub data: Vec<u8>,
126    /// Protocol identifier
127    pub protocol: String,
128    /// Timestamp when received
129    pub received_at: Instant,
130}
131
132impl Validate for TransportMessage {
133    fn validate(&self, ctx: &ValidationContext) -> Result<()> {
134        // Validate sender peer ID
135        validate_peer_id(&self.sender)?;
136
137        // Validate message size
138        validate_message_size(self.data.len(), ctx.max_message_size)?;
139
140        // Validate protocol identifier
141        if self.protocol.is_empty() || self.protocol.len() > 64 {
142            return Err(P2PError::validation("Invalid protocol identifier"));
143        }
144
145        Ok(())
146    }
147}
148
149/// Transport trait for protocol implementations
150#[allow(dead_code)] // Deprecated during ant-quic migration
151#[async_trait]
152pub trait Transport: Send + Sync {
153    /// Start listening on the given address
154    async fn listen(&self, addr: NetworkAddress) -> Result<NetworkAddress>;
155
156    /// Accept incoming connections (for server-side)
157    async fn accept(&self) -> Result<Box<dyn Connection>>;
158
159    /// Connect to a remote peer
160    async fn connect(&self, addr: NetworkAddress) -> Result<Box<dyn Connection>>;
161
162    /// Connect with specific transport options
163    async fn connect_with_options(
164        &self,
165        addr: NetworkAddress,
166        options: TransportOptions,
167    ) -> Result<Box<dyn Connection>>;
168
169    /// Check if this transport supports IPv6 (deprecated - IPv4-only focus)
170    fn supports_ipv6(&self) -> bool;
171
172    /// Get transport type
173    fn transport_type(&self) -> TransportType;
174
175    /// Check if address is supported
176    fn supports_address(&self, addr: &NetworkAddress) -> bool;
177}
178
179/// Connection trait for active connections
180#[allow(dead_code)] // Deprecated during ant-quic migration
181#[async_trait]
182pub trait Connection: Send + Sync {
183    /// Send data over the connection
184    async fn send(&mut self, data: &[u8]) -> Result<()>;
185
186    /// Receive data from the connection
187    async fn receive(&mut self) -> Result<Vec<u8>>;
188
189    /// Get connection info
190    async fn info(&self) -> ConnectionInfo;
191
192    /// Close the connection
193    async fn close(&mut self) -> Result<()>;
194
195    /// Check if connection is alive
196    async fn is_alive(&self) -> bool;
197
198    /// Measure connection quality
199    async fn measure_quality(&self) -> Result<ConnectionQuality>;
200
201    /// Get local address
202    fn local_addr(&self) -> NetworkAddress;
203
204    /// Get remote address
205    fn remote_addr(&self) -> NetworkAddress;
206}
207
208/// Transport configuration options
209#[derive(Debug, Clone)]
210pub struct TransportOptions {
211    /// Enable 0-RTT for QUIC
212    pub enable_0rtt: bool,
213    /// Force encryption
214    pub require_encryption: bool,
215    /// Connection timeout
216    pub connect_timeout: Duration,
217    /// Keep-alive interval
218    pub keep_alive: Duration,
219    /// Maximum message size
220    pub max_message_size: usize,
221}
222
223/// Transport manager coordinates different transport protocols
224#[allow(dead_code)] // Deprecated during ant-quic migration
225pub struct TransportManager {
226    /// Available transports
227    transports: HashMap<TransportType, Arc<dyn Transport>>,
228    /// Active connections
229    connections: Arc<RwLock<HashMap<PeerId, Arc<Mutex<ConnectionPool>>>>>,
230    /// Transport selection strategy
231    selection: TransportSelection,
232    /// Configuration options
233    options: TransportOptions,
234}
235
236/// Connection pool for a specific peer
237struct ConnectionPool {
238    /// Active connections
239    connections: Vec<Arc<Mutex<Box<dyn Connection>>>>,
240    /// Connection info cache (reserved for future use)
241    _info_cache: HashMap<String, ConnectionInfo>,
242    /// Pool statistics
243    stats: ConnectionPoolStats,
244    /// Pool configuration
245    max_connections: usize,
246    /// Round-robin index for load balancing
247    round_robin_index: usize,
248}
249
250impl TransportManager {
251    /// Create a new transport manager
252    pub fn new(selection: TransportSelection, options: TransportOptions) -> Self {
253        Self {
254            transports: HashMap::new(),
255            connections: Arc::new(RwLock::new(HashMap::new())),
256            selection,
257            options,
258        }
259    }
260
261    /// Register a transport implementation
262    pub fn register_transport(&mut self, transport: Arc<dyn Transport>) {
263        let transport_type = transport.transport_type();
264        self.transports.insert(transport_type, transport);
265        info!("Registered transport: {:?}", transport_type);
266    }
267
268    /// Connect to a peer using the best available transport
269    pub async fn connect(&self, addr: NetworkAddress) -> Result<PeerId> {
270        let transport_type = self.select_transport(&addr).await?;
271        let transport = self.transports.get(&transport_type).ok_or_else(|| {
272            P2PError::Transport(crate::error::TransportError::SetupFailed(
273                format!("Transport {transport_type:?} not available").into(),
274            ))
275        })?;
276
277        debug!("Connecting to {} using {:?}", addr, transport_type);
278
279        let connection = transport
280            .connect_with_options(addr.clone(), self.options.clone())
281            .await?;
282        let peer_id = format!("peer_from_{}_{}", addr.ip(), addr.port()); // Simplified peer ID
283
284        // Add to connection pool
285        self.add_connection(peer_id.clone(), connection).await?;
286
287        info!("Connected to peer {} via {:?}", peer_id, transport_type);
288        Ok(peer_id)
289    }
290
291    /// Connect with specific transport
292    pub async fn connect_with_transport(
293        &self,
294        addr: NetworkAddress,
295        transport_type: TransportType,
296    ) -> Result<PeerId> {
297        let transport = self.transports.get(&transport_type).ok_or_else(|| {
298            P2PError::Transport(crate::error::TransportError::SetupFailed(
299                format!("Transport {transport_type:?} not available").into(),
300            ))
301        })?;
302
303        let connection = transport
304            .connect_with_options(addr.clone(), self.options.clone())
305            .await?;
306        let peer_id = format!("peer_from_{}_{}", addr.ip(), addr.port());
307
308        self.add_connection(peer_id.clone(), connection).await?;
309        Ok(peer_id)
310    }
311
312    /// Send message to a peer
313    pub async fn send_message(&self, peer_id: &PeerId, data: Vec<u8>) -> Result<()> {
314        let connections = self.connections.read().await;
315        let pool = connections.get(peer_id).ok_or_else(|| {
316            P2PError::Network(crate::error::NetworkError::PeerNotFound(
317                peer_id.to_string().into(),
318            ))
319        })?;
320
321        let mut pool_guard = pool.lock().await;
322        let connection = pool_guard.get_connection()?;
323
324        let mut conn_guard = connection.lock().await;
325        conn_guard.send(&data).await?;
326
327        debug!("Sent {} bytes to peer {}", data.len(), peer_id);
328        Ok(())
329    }
330
331    /// Get connection info for a peer
332    pub async fn get_connection_info(&self, peer_id: &PeerId) -> Result<ConnectionInfo> {
333        let connections = self.connections.read().await;
334        let pool = connections.get(peer_id).ok_or_else(|| {
335            P2PError::Network(crate::error::NetworkError::PeerNotFound(
336                peer_id.to_string().into(),
337            ))
338        })?;
339
340        let mut pool_guard = pool.lock().await;
341        let connection = pool_guard.get_connection()?;
342        let conn_guard = connection.lock().await;
343
344        Ok(conn_guard.info().await)
345    }
346
347    /// Get connection pool info
348    pub async fn get_connection_pool_info(&self, peer_id: &PeerId) -> Result<ConnectionPoolInfo> {
349        let connections = self.connections.read().await;
350        let pool = connections.get(peer_id).ok_or_else(|| {
351            P2PError::Network(crate::error::NetworkError::PeerNotFound(
352                peer_id.to_string().into(),
353            ))
354        })?;
355
356        let pool_guard = pool.lock().await;
357        Ok(ConnectionPoolInfo {
358            active_connections: pool_guard.connections.len(),
359            total_connections: pool_guard.stats.messages_per_connection.len(),
360            bytes_sent: pool_guard.stats.bytes_per_connection.values().sum(),
361            bytes_received: 0, // TODO: Track separately
362        })
363    }
364
365    /// Get connection pool statistics
366    pub async fn get_connection_pool_stats(&self, peer_id: &PeerId) -> Result<ConnectionPoolStats> {
367        let connections = self.connections.read().await;
368        let pool = connections.get(peer_id).ok_or_else(|| {
369            P2PError::Network(crate::error::NetworkError::PeerNotFound(
370                peer_id.to_string().into(),
371            ))
372        })?;
373
374        let pool_guard = pool.lock().await;
375        Ok(pool_guard.stats.clone())
376    }
377
378    /// Measure connection quality
379    pub async fn measure_connection_quality(&self, peer_id: &PeerId) -> Result<ConnectionQuality> {
380        let connections = self.connections.read().await;
381        let pool = connections.get(peer_id).ok_or_else(|| {
382            P2PError::Network(crate::error::NetworkError::PeerNotFound(
383                peer_id.to_string().into(),
384            ))
385        })?;
386
387        let mut pool_guard = pool.lock().await;
388        let connection = pool_guard.get_connection()?;
389        let conn_guard = connection.lock().await;
390
391        conn_guard.measure_quality().await
392    }
393
394    /// Switch transport for a peer
395    pub async fn switch_transport(
396        &self,
397        peer_id: &PeerId,
398        _new_transport: TransportType,
399    ) -> Result<()> {
400        // This is a placeholder implementation
401        // In reality, this would establish a new connection with the new transport
402        // and gracefully migrate the existing connection
403
404        warn!(
405            "Transport switching not yet fully implemented for peer {}",
406            peer_id
407        );
408        Ok(())
409    }
410
411    /// Select transport for an address (always QUIC)
412    async fn select_transport(&self, _addr: &NetworkAddress) -> Result<TransportType> {
413        match &self.selection {
414            TransportSelection::QUIC => {
415                if self.transports.contains_key(&TransportType::QUIC) {
416                    Ok(TransportType::QUIC)
417                } else {
418                    Err(P2PError::Transport(
419                        crate::error::TransportError::SetupFailed(
420                            "QUIC transport not available".into(),
421                        ),
422                    ))
423                }
424            }
425        }
426    }
427
428    /// Auto-select transport (always QUIC in this implementation)
429    #[allow(dead_code)]
430    async fn auto_select_transport(&self, addr: &NetworkAddress) -> Result<TransportType> {
431        // Always use QUIC as it's the only transport protocol
432        if self.transports.contains_key(&TransportType::QUIC)
433            && let Some(transport) = self.transports.get(&TransportType::QUIC)
434            && transport.supports_address(addr)
435        {
436            debug!(
437                "Using QUIC transport for {} (only available transport)",
438                addr
439            );
440            return Ok(TransportType::QUIC);
441        }
442
443        Err(P2PError::Transport(
444            crate::error::TransportError::SetupFailed(
445                "QUIC transport not available or address not supported"
446                    .to_string()
447                    .into(),
448            ),
449        ))
450    }
451
452    /// Add connection to pool
453    async fn add_connection(&self, peer_id: PeerId, connection: Box<dyn Connection>) -> Result<()> {
454        let mut connections = self.connections.write().await;
455
456        let pool = connections.entry(peer_id.clone()).or_insert_with(|| {
457            Arc::new(Mutex::new(ConnectionPool::new(3))) // Default max 3 connections per peer
458        });
459
460        let mut pool_guard = pool.lock().await;
461        pool_guard.add_connection(connection).await?;
462
463        Ok(())
464    }
465}
466
467impl ConnectionPool {
468    /// Create a new connection pool
469    fn new(max_connections: usize) -> Self {
470        Self {
471            connections: Vec::new(),
472            _info_cache: HashMap::new(),
473            stats: ConnectionPoolStats {
474                messages_per_connection: HashMap::new(),
475                bytes_per_connection: HashMap::new(),
476                latency_per_connection: HashMap::new(),
477            },
478            max_connections,
479            round_robin_index: 0,
480        }
481    }
482
483    /// Add a connection to the pool
484    async fn add_connection(&mut self, connection: Box<dyn Connection>) -> Result<()> {
485        if self.connections.len() >= self.max_connections {
486            // Remove oldest connection
487            self.connections.remove(0);
488        }
489
490        let conn_id = format!("conn_{}", self.connections.len());
491        self.stats
492            .messages_per_connection
493            .insert(conn_id.clone(), 0);
494        self.stats.bytes_per_connection.insert(conn_id.clone(), 0);
495        self.stats
496            .latency_per_connection
497            .insert(conn_id, Duration::from_millis(0));
498
499        self.connections.push(Arc::new(Mutex::new(connection)));
500        Ok(())
501    }
502
503    /// Get a connection using round-robin load balancing
504    fn get_connection(&mut self) -> Result<Arc<Mutex<Box<dyn Connection>>>> {
505        if self.connections.is_empty() {
506            return Err(P2PError::Network(
507                crate::error::NetworkError::ProtocolError(
508                    "No connections available".to_string().into(),
509                ),
510            ));
511        }
512
513        let connection = self.connections[self.round_robin_index % self.connections.len()].clone();
514        self.round_robin_index += 1;
515
516        // Update stats
517        let conn_id = format!("conn_{}", self.round_robin_index % self.connections.len());
518        if let Some(count) = self.stats.messages_per_connection.get_mut(&conn_id) {
519            *count += 1;
520        }
521
522        Ok(connection)
523    }
524}
525
526impl fmt::Display for TransportType {
527    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
528        match self {
529            TransportType::QUIC => write!(f, "quic"),
530        }
531    }
532}
533
534impl Default for TransportOptions {
535    fn default() -> Self {
536        Self {
537            enable_0rtt: true,
538            require_encryption: true,
539            connect_timeout: Duration::from_secs(30),
540            keep_alive: Duration::from_secs(60),
541            max_message_size: 64 * 1024 * 1024, // 64MB
542        }
543    }
544}
545
546impl Default for ConnectionQuality {
547    fn default() -> Self {
548        Self {
549            latency: Duration::from_millis(50),
550            throughput_mbps: 100.0,
551            packet_loss: 0.0,
552            jitter: Duration::from_millis(5),
553            connect_time: Duration::from_millis(100),
554        }
555    }
556}
557/// Legacy transport types module for backward compatibility
558pub mod transport_types {
559    pub use super::TransportType;
560}
561
562// Re-export transport implementation
563// pub use quic::QuicTransport; // Disabled during ant-quic migration
564// pub use ant_quic_adapter::AntQuicAdapter; // Available but not needed for now
565
566#[cfg(test)]
567mod tests {
568    use super::*;
569    use crate::error::NetworkError;
570    use async_trait::async_trait;
571    use std::sync::atomic::{AtomicUsize, Ordering};
572    use tokio::time::Duration;
573
574    /// Helper function to parse addresses in tests
575    #[allow(dead_code)]
576    fn parse_addr(addr: &str) -> Result<NetworkAddress> {
577        addr.parse::<NetworkAddress>().map_err(|e| {
578            P2PError::Network(crate::error::NetworkError::InvalidAddress(
579                e.to_string().into(),
580            ))
581        })
582    }
583
584    /// Mock transport implementation for testing
585    struct MockTransport {
586        transport_type: TransportType,
587        should_fail: bool,
588        supports_all: bool,
589    }
590
591    impl MockTransport {
592        fn new(transport_type: TransportType) -> Self {
593            Self {
594                transport_type,
595                should_fail: false,
596                supports_all: true,
597            }
598        }
599
600        fn with_failure(mut self) -> Self {
601            self.should_fail = true;
602            self
603        }
604
605        fn with_limited_support(mut self) -> Self {
606            self.supports_all = false;
607            self
608        }
609    }
610
611    #[async_trait]
612    impl Transport for MockTransport {
613        async fn listen(&self, addr: NetworkAddress) -> Result<NetworkAddress> {
614            if self.should_fail {
615                return Err(P2PError::Transport(
616                    crate::error::TransportError::SetupFailed("Listen failed".to_string().into()),
617                ));
618            }
619            Ok(addr)
620        }
621
622        async fn connect(&self, addr: NetworkAddress) -> Result<Box<dyn Connection>> {
623            if self.should_fail {
624                return Err(P2PError::Transport(
625                    crate::error::TransportError::SetupFailed(
626                        "Connection failed".to_string().into(),
627                    ),
628                ));
629            }
630            Ok(Box::new(MockConnection::new(addr)))
631        }
632
633        async fn connect_with_options(
634            &self,
635            addr: NetworkAddress,
636            _options: TransportOptions,
637        ) -> Result<Box<dyn Connection>> {
638            self.connect(addr).await
639        }
640
641        async fn accept(&self) -> Result<Box<dyn Connection>> {
642            if self.should_fail {
643                return Err(P2PError::Transport(
644                    crate::error::TransportError::SetupFailed("Accept failed".into()),
645                ));
646            }
647            Ok(Box::new(MockConnection::new(
648                "127.0.0.1:9000".parse::<NetworkAddress>().map_err(|e| {
649                    crate::error::TransportError::SetupFailed(
650                        format!("Invalid mock address: {}", e).into(),
651                    )
652                })?,
653            )))
654        }
655
656        fn supports_ipv6(&self) -> bool {
657            true // Dual-stack support enabled
658        }
659
660        fn transport_type(&self) -> TransportType {
661            self.transport_type
662        }
663
664        fn supports_address(&self, addr: &NetworkAddress) -> bool {
665            // Support both IPv4 and IPv6
666            addr.is_ipv4() || addr.is_ipv6()
667        }
668    }
669
670    /// Mock connection implementation for testing
671    struct MockConnection {
672        remote_addr: NetworkAddress,
673        is_alive: bool,
674        bytes_sent: AtomicUsize,
675        bytes_received: AtomicUsize,
676    }
677
678    impl MockConnection {
679        fn new(remote_addr: NetworkAddress) -> Self {
680            Self {
681                remote_addr,
682                is_alive: true,
683                bytes_sent: AtomicUsize::new(0),
684                bytes_received: AtomicUsize::new(0),
685            }
686        }
687    }
688
689    #[async_trait]
690    impl Connection for MockConnection {
691        async fn send(&mut self, data: &[u8]) -> Result<()> {
692            if !self.is_alive {
693                return Err(P2PError::Network(
694                    crate::error::NetworkError::PeerDisconnected {
695                        peer: "unknown".to_string(),
696                        reason: "Connection closed".to_string(),
697                    },
698                ));
699            }
700            self.bytes_sent.fetch_add(data.len(), Ordering::Relaxed);
701            Ok(())
702        }
703
704        async fn receive(&mut self) -> Result<Vec<u8>> {
705            if !self.is_alive {
706                return Err(P2PError::Network(
707                    crate::error::NetworkError::PeerDisconnected {
708                        peer: "unknown".to_string(),
709                        reason: "Connection closed".to_string(),
710                    },
711                ));
712            }
713            let data = b"mock_response".to_vec();
714            self.bytes_received.fetch_add(data.len(), Ordering::Relaxed);
715            Ok(data)
716        }
717
718        async fn info(&self) -> ConnectionInfo {
719            ConnectionInfo {
720                transport_type: TransportType::QUIC,
721                local_addr: "127.0.0.1:9000"
722                    .parse::<NetworkAddress>()
723                    .expect("Test address should be valid"),
724                remote_addr: self.remote_addr.clone(),
725                is_encrypted: true,
726                cipher_suite: "TLS_AES_256_GCM_SHA384".to_string(),
727                used_0rtt: false,
728                established_at: Instant::now(),
729                last_activity: Instant::now(),
730            }
731        }
732
733        async fn close(&mut self) -> Result<()> {
734            self.is_alive = false;
735            Ok(())
736        }
737
738        async fn is_alive(&self) -> bool {
739            self.is_alive
740        }
741
742        async fn measure_quality(&self) -> Result<ConnectionQuality> {
743            Ok(ConnectionQuality {
744                latency: Duration::from_millis(10),
745                throughput_mbps: 1000.0,
746                packet_loss: 0.1,
747                jitter: Duration::from_millis(2),
748                connect_time: Duration::from_millis(50),
749            })
750        }
751
752        fn local_addr(&self) -> NetworkAddress {
753            "127.0.0.1:9000"
754                .parse::<NetworkAddress>()
755                .expect("Test address should be valid")
756        }
757
758        fn remote_addr(&self) -> NetworkAddress {
759            self.remote_addr.clone()
760        }
761    }
762
763    fn create_test_transport_manager() -> TransportManager {
764        let options = TransportOptions::default();
765        TransportManager::new(TransportSelection::QUIC, options)
766    }
767
768    #[test]
769    fn test_transport_type_display() {
770        assert_eq!(format!("{}", TransportType::QUIC), "quic");
771    }
772
773    #[test]
774    fn test_transport_type_serialization() {
775        let quic_type = TransportType::QUIC;
776
777        assert_eq!(quic_type, TransportType::QUIC);
778    }
779
780    #[test]
781    fn test_transport_selection_variants() {
782        let quic_selection = TransportSelection::QUIC;
783
784        assert!(matches!(quic_selection, TransportSelection::QUIC));
785    }
786
787    #[test]
788    fn test_transport_selection_default() {
789        let default = TransportSelection::default();
790        assert!(matches!(default, TransportSelection::QUIC));
791    }
792
793    #[test]
794    fn test_transport_options_default() {
795        let options = TransportOptions::default();
796
797        assert!(options.enable_0rtt);
798        assert!(options.require_encryption);
799        assert_eq!(options.connect_timeout, Duration::from_secs(30));
800        assert_eq!(options.keep_alive, Duration::from_secs(60));
801        assert_eq!(options.max_message_size, 64 * 1024 * 1024);
802    }
803
804    #[test]
805    fn test_connection_quality_default() {
806        let quality = ConnectionQuality::default();
807
808        assert_eq!(quality.latency, Duration::from_millis(50));
809        assert_eq!(quality.throughput_mbps, 100.0);
810        assert_eq!(quality.packet_loss, 0.0);
811        assert_eq!(quality.jitter, Duration::from_millis(5));
812        assert_eq!(quality.connect_time, Duration::from_millis(100));
813    }
814
815    #[tokio::test]
816    async fn test_transport_manager_creation() {
817        let manager = create_test_transport_manager();
818        assert!(manager.transports.is_empty());
819    }
820
821    #[tokio::test]
822    async fn test_transport_registration() {
823        let mut manager = create_test_transport_manager();
824        let quic_transport = Arc::new(MockTransport::new(TransportType::QUIC));
825
826        manager.register_transport(quic_transport.clone());
827
828        assert_eq!(manager.transports.len(), 1);
829        assert!(manager.transports.contains_key(&TransportType::QUIC));
830    }
831
832    #[tokio::test]
833    async fn test_connection_establishment() -> Result<()> {
834        let mut manager = create_test_transport_manager();
835        let transport = Arc::new(MockTransport::new(TransportType::QUIC));
836        manager.register_transport(transport);
837
838        let peer_id = manager
839            .connect("127.0.0.1:9001".parse::<NetworkAddress>().map_err(|e| {
840                P2PError::Network(NetworkError::InvalidAddress(format!("{}", e).into()))
841            })?)
842            .await?;
843        assert_eq!(peer_id, "peer_from_127.0.0.1_9001");
844
845        let connections = manager.connections.read().await;
846        assert!(connections.contains_key(&peer_id));
847
848        Ok(())
849    }
850
851    #[tokio::test]
852    async fn test_connection_with_specific_transport() -> Result<()> {
853        let mut manager = create_test_transport_manager();
854        let transport = Arc::new(MockTransport::new(TransportType::QUIC));
855        manager.register_transport(transport);
856
857        let peer_id = manager
858            .connect_with_transport(
859                "127.0.0.1:9002".parse::<NetworkAddress>().map_err(|e| {
860                    P2PError::Network(NetworkError::InvalidAddress(format!("{}", e).into()))
861                })?,
862                TransportType::QUIC,
863            )
864            .await?;
865
866        assert_eq!(peer_id, "peer_from_127.0.0.1_9002");
867        Ok(())
868    }
869
870    #[tokio::test]
871    async fn test_connection_failure_handling() -> Result<()> {
872        let mut manager = create_test_transport_manager();
873        let failing_transport = Arc::new(MockTransport::new(TransportType::QUIC).with_failure());
874        manager.register_transport(failing_transport);
875
876        let result = manager
877            .connect("127.0.0.1:9003".parse::<NetworkAddress>().map_err(|e| {
878                P2PError::Network(NetworkError::InvalidAddress(format!("{}", e).into()))
879            })?)
880            .await;
881        assert!(result.is_err());
882        assert!(
883            result
884                .unwrap_err()
885                .to_string()
886                .contains("Connection failed")
887        );
888        Ok(())
889    }
890
891    #[tokio::test]
892    async fn test_message_sending() -> Result<()> {
893        let mut manager = create_test_transport_manager();
894        let transport = Arc::new(MockTransport::new(TransportType::QUIC));
895        manager.register_transport(transport);
896
897        let peer_id = manager
898            .connect("127.0.0.1:9004".parse::<NetworkAddress>().map_err(|e| {
899                P2PError::Network(NetworkError::InvalidAddress(format!("{}", e).into()))
900            })?)
901            .await?;
902        let message = b"Hello, transport!".to_vec();
903
904        manager.send_message(&peer_id, message.clone()).await?;
905
906        // Verify message was processed
907        let pool_info = manager.get_connection_pool_info(&peer_id).await?;
908        assert_eq!(pool_info.active_connections, 1);
909
910        Ok(())
911    }
912
913    #[tokio::test]
914    async fn test_message_sending_no_connection() {
915        let manager = create_test_transport_manager();
916        let result = manager
917            .send_message(&"nonexistent_peer".to_string(), vec![1, 2, 3])
918            .await;
919
920        assert!(result.is_err());
921        assert!(result.unwrap_err().to_string().contains("Peer not found"));
922    }
923
924    #[tokio::test]
925    async fn test_connection_info_retrieval() -> Result<()> {
926        let mut manager = create_test_transport_manager();
927        let transport = Arc::new(MockTransport::new(TransportType::QUIC));
928        manager.register_transport(transport);
929
930        let peer_id = manager
931            .connect("127.0.0.1:9005".parse::<NetworkAddress>().map_err(|e| {
932                P2PError::Network(NetworkError::InvalidAddress(format!("{}", e).into()))
933            })?)
934            .await?;
935        let info = manager.get_connection_info(&peer_id).await?;
936
937        assert_eq!(info.transport_type, TransportType::QUIC);
938        assert_eq!(
939            info.remote_addr,
940            "127.0.0.1:9005"
941                .parse::<NetworkAddress>()
942                .map_err(|e| P2PError::Network(NetworkError::InvalidAddress(
943                    format!("{}", e).into()
944                )))?
945        );
946        assert!(info.is_encrypted);
947        assert_eq!(info.cipher_suite, "TLS_AES_256_GCM_SHA384");
948
949        Ok(())
950    }
951
952    #[tokio::test]
953    async fn test_connection_pool_info() -> Result<()> {
954        let mut manager = create_test_transport_manager();
955        let transport = Arc::new(MockTransport::new(TransportType::QUIC));
956        manager.register_transport(transport);
957
958        let peer_id = manager
959            .connect("127.0.0.1:9006".parse::<NetworkAddress>().map_err(|e| {
960                P2PError::Network(NetworkError::InvalidAddress(format!("{}", e).into()))
961            })?)
962            .await?;
963        let pool_info = manager.get_connection_pool_info(&peer_id).await?;
964
965        assert_eq!(pool_info.active_connections, 1);
966        assert_eq!(pool_info.total_connections, 1);
967        assert_eq!(pool_info.bytes_sent, 0);
968
969        Ok(())
970    }
971
972    #[tokio::test]
973    async fn test_connection_pool_stats() -> Result<()> {
974        let mut manager = create_test_transport_manager();
975        let transport = Arc::new(MockTransport::new(TransportType::QUIC));
976        manager.register_transport(transport);
977
978        let peer_id = manager
979            .connect("127.0.0.1:9007".parse::<NetworkAddress>().map_err(|e| {
980                P2PError::Network(NetworkError::InvalidAddress(format!("{}", e).into()))
981            })?)
982            .await?;
983        let stats = manager.get_connection_pool_stats(&peer_id).await?;
984
985        assert_eq!(stats.messages_per_connection.len(), 1);
986        assert_eq!(stats.bytes_per_connection.len(), 1);
987        assert_eq!(stats.latency_per_connection.len(), 1);
988
989        Ok(())
990    }
991
992    #[tokio::test]
993    async fn test_connection_quality_measurement() -> Result<()> {
994        let mut manager = create_test_transport_manager();
995        let transport = Arc::new(MockTransport::new(TransportType::QUIC));
996        manager.register_transport(transport);
997
998        let peer_id = manager
999            .connect("127.0.0.1:9008".parse::<NetworkAddress>().map_err(|e| {
1000                P2PError::Network(NetworkError::InvalidAddress(format!("{}", e).into()))
1001            })?)
1002            .await?;
1003        let quality = manager.measure_connection_quality(&peer_id).await?;
1004
1005        assert_eq!(quality.latency, Duration::from_millis(10));
1006        assert_eq!(quality.throughput_mbps, 1000.0);
1007        assert_eq!(quality.packet_loss, 0.1);
1008        assert_eq!(quality.jitter, Duration::from_millis(2));
1009
1010        Ok(())
1011    }
1012
1013    #[tokio::test]
1014    async fn test_transport_switching() -> Result<()> {
1015        let mut manager = create_test_transport_manager();
1016        let transport = Arc::new(MockTransport::new(TransportType::QUIC));
1017        manager.register_transport(transport);
1018
1019        let peer_id = manager
1020            .connect("127.0.0.1:9009".parse::<NetworkAddress>().map_err(|e| {
1021                P2PError::Network(NetworkError::InvalidAddress(format!("{}", e).into()))
1022            })?)
1023            .await?;
1024
1025        // Transport switching is not fully implemented, but should not error
1026        let result = manager
1027            .switch_transport(&peer_id, TransportType::QUIC)
1028            .await;
1029        assert!(result.is_ok());
1030
1031        Ok(())
1032    }
1033
1034    #[tokio::test]
1035    async fn test_auto_transport_selection_quic() -> Result<()> {
1036        let mut manager = create_test_transport_manager();
1037        let quic_transport = Arc::new(MockTransport::new(TransportType::QUIC));
1038
1039        manager.register_transport(quic_transport);
1040
1041        let addr = "127.0.0.1:9010".parse::<NetworkAddress>().map_err(|e| {
1042            P2PError::Network(NetworkError::InvalidAddress(format!("{}", e).into()))
1043        })?;
1044        let selected = manager.auto_select_transport(&addr).await?;
1045
1046        // Should use QUIC when available
1047        assert_eq!(selected, TransportType::QUIC);
1048
1049        Ok(())
1050    }
1051
1052    #[tokio::test]
1053    async fn test_transport_selection_no_quic() -> Result<()> {
1054        let manager = create_test_transport_manager();
1055        // No transports registered
1056
1057        let addr = "127.0.0.1:9011".parse::<NetworkAddress>().map_err(|e| {
1058            P2PError::Network(NetworkError::InvalidAddress(format!("{}", e).into()))
1059        })?;
1060        let selected = manager.auto_select_transport(&addr).await;
1061
1062        // Should fail when QUIC not available
1063        assert!(selected.is_err());
1064
1065        Ok(())
1066    }
1067
1068    #[tokio::test]
1069    async fn test_transport_selection_no_suitable_transport() -> Result<()> {
1070        let manager = create_test_transport_manager();
1071        let addr = "127.0.0.1:9012".parse::<NetworkAddress>().map_err(|e| {
1072            P2PError::Network(NetworkError::InvalidAddress(format!("{}", e).into()))
1073        })?;
1074
1075        let result = manager.auto_select_transport(&addr).await;
1076        assert!(result.is_err());
1077        assert!(
1078            result
1079                .unwrap_err()
1080                .to_string()
1081                .contains("QUIC transport not available")
1082        );
1083        Ok(())
1084    }
1085
1086    #[tokio::test]
1087    async fn test_quic_transport_selection() -> Result<()> {
1088        let mut manager =
1089            TransportManager::new(TransportSelection::QUIC, TransportOptions::default());
1090        let quic_transport = Arc::new(MockTransport::new(TransportType::QUIC));
1091
1092        manager.register_transport(quic_transport);
1093
1094        let addr = "127.0.0.1:9013".parse::<NetworkAddress>().map_err(|e| {
1095            P2PError::Network(NetworkError::InvalidAddress(format!("{}", e).into()))
1096        })?;
1097        let selected = manager.select_transport(&addr).await?;
1098
1099        assert_eq!(selected, TransportType::QUIC);
1100
1101        Ok(())
1102    }
1103
1104    #[tokio::test]
1105    async fn test_quic_transport_unavailable() -> Result<()> {
1106        let manager = TransportManager::new(TransportSelection::QUIC, TransportOptions::default());
1107
1108        let addr = "127.0.0.1:9014".parse::<NetworkAddress>().map_err(|e| {
1109            P2PError::Network(NetworkError::InvalidAddress(format!("{}", e).into()))
1110        })?;
1111        let result = manager.select_transport(&addr).await;
1112
1113        assert!(result.is_err());
1114        assert!(
1115            result
1116                .unwrap_err()
1117                .to_string()
1118                .contains("QUIC transport not available")
1119        );
1120        Ok(())
1121    }
1122
1123    #[tokio::test]
1124    async fn test_quic_transport_with_registration() -> Result<()> {
1125        let mut manager =
1126            TransportManager::new(TransportSelection::QUIC, TransportOptions::default());
1127        let quic_transport = Arc::new(MockTransport::new(TransportType::QUIC));
1128
1129        manager.register_transport(quic_transport);
1130
1131        let addr = "127.0.0.1:9015".parse::<NetworkAddress>().map_err(|e| {
1132            P2PError::Network(NetworkError::InvalidAddress(format!("{}", e).into()))
1133        })?;
1134        let selected = manager.select_transport(&addr).await?;
1135
1136        // Should use QUIC when available
1137        assert_eq!(selected, TransportType::QUIC);
1138
1139        Ok(())
1140    }
1141
1142    #[tokio::test]
1143    async fn test_mock_connection_lifecycle() -> Result<()> {
1144        let mut conn =
1145            MockConnection::new("127.0.0.1:9016".parse::<NetworkAddress>().map_err(|e| {
1146                P2PError::Network(NetworkError::InvalidAddress(format!("{}", e).into()))
1147            })?);
1148
1149        assert!(conn.is_alive().await);
1150
1151        // Test sending
1152        conn.send(b"test message").await?;
1153        assert_eq!(conn.bytes_sent.load(Ordering::Relaxed), 12);
1154
1155        // Test receiving
1156        let received = conn.receive().await?;
1157        assert_eq!(received, b"mock_response");
1158        assert_eq!(conn.bytes_received.load(Ordering::Relaxed), 13);
1159
1160        // Test connection info
1161        let info = conn.info().await;
1162        assert_eq!(info.transport_type, TransportType::QUIC);
1163        assert!(info.is_encrypted);
1164
1165        // Test quality measurement
1166        let quality = conn.measure_quality().await?;
1167        assert_eq!(quality.latency, Duration::from_millis(10));
1168
1169        // Test close
1170        conn.close().await?;
1171        assert!(!conn.is_alive().await);
1172
1173        // Operations should fail after close
1174        let result = conn.send(b"test").await;
1175        assert!(result.is_err());
1176
1177        Ok(())
1178    }
1179
1180    #[tokio::test]
1181    async fn test_connection_pool_max_connections() -> Result<()> {
1182        let mut pool = ConnectionPool::new(2); // Max 2 connections
1183
1184        // Add first connection
1185        let conn1 = Box::new(MockConnection::new(
1186            "127.0.0.1:9017".parse::<NetworkAddress>().map_err(|e| {
1187                P2PError::Network(NetworkError::InvalidAddress(format!("{}", e).into()))
1188            })?,
1189        ));
1190        pool.add_connection(conn1).await?;
1191        assert_eq!(pool.connections.len(), 1);
1192
1193        // Add second connection
1194        let conn2 = Box::new(MockConnection::new(
1195            "127.0.0.1:9018".parse::<NetworkAddress>().map_err(|e| {
1196                P2PError::Network(NetworkError::InvalidAddress(format!("{}", e).into()))
1197            })?,
1198        ));
1199        pool.add_connection(conn2).await?;
1200        assert_eq!(pool.connections.len(), 2);
1201
1202        // Add third connection (should remove first)
1203        let conn3 = Box::new(MockConnection::new(
1204            "127.0.0.1:9019".parse::<NetworkAddress>().map_err(|e| {
1205                P2PError::Network(NetworkError::InvalidAddress(format!("{}", e).into()))
1206            })?,
1207        ));
1208        pool.add_connection(conn3).await?;
1209        assert_eq!(pool.connections.len(), 2);
1210
1211        Ok(())
1212    }
1213
1214    #[tokio::test]
1215    async fn test_connection_pool_round_robin() -> Result<()> {
1216        let mut pool = ConnectionPool::new(3);
1217
1218        // Add connections
1219        for i in 0..3 {
1220            let conn = Box::new(MockConnection::new(
1221                format!("127.0.0.1:{}", 9020 + i)
1222                    .parse()
1223                    .expect("Test address should be valid"),
1224            ));
1225            pool.add_connection(conn).await?;
1226        }
1227
1228        // Test round-robin selection
1229        let conn1 = pool.get_connection()?;
1230        let conn2 = pool.get_connection()?;
1231        let conn3 = pool.get_connection()?;
1232        let conn4 = pool.get_connection()?; // Should wrap around
1233
1234        // All connections should be different (until wraparound)
1235        assert_ne!(Arc::as_ptr(&conn1), Arc::as_ptr(&conn2));
1236        assert_ne!(Arc::as_ptr(&conn2), Arc::as_ptr(&conn3));
1237        // Fourth should be same as first (round-robin)
1238        assert_eq!(Arc::as_ptr(&conn1), Arc::as_ptr(&conn4));
1239
1240        Ok(())
1241    }
1242
1243    #[tokio::test]
1244    async fn test_connection_pool_empty() {
1245        let mut pool = ConnectionPool::new(3);
1246        let result = pool.get_connection();
1247
1248        assert!(result.is_err());
1249        if let Err(e) = result {
1250            assert!(e.to_string().contains("No connections available"));
1251        }
1252    }
1253
1254    #[tokio::test]
1255    async fn test_transport_message_structure() {
1256        let message = TransportMessage {
1257            sender: "test_peer".to_string(),
1258            data: vec![1, 2, 3, 4],
1259            protocol: "/p2p/test/1.0.0".to_string(),
1260            received_at: Instant::now(),
1261        };
1262
1263        assert_eq!(message.sender, "test_peer");
1264        assert_eq!(message.data, vec![1, 2, 3, 4]);
1265        assert_eq!(message.protocol, "/p2p/test/1.0.0");
1266    }
1267
1268    #[tokio::test]
1269    async fn test_mock_transport_address_support() -> Result<()> {
1270        let transport = MockTransport::new(TransportType::QUIC);
1271
1272        let addr1 = "127.0.0.1:9000".parse::<NetworkAddress>().map_err(|e| {
1273            P2PError::Network(NetworkError::InvalidAddress(format!("{}", e).into()))
1274        })?;
1275        let addr2 = "[::1]:9000".parse::<NetworkAddress>().map_err(|e| {
1276            P2PError::Network(NetworkError::InvalidAddress(format!("{}", e).into()))
1277        })?;
1278
1279        assert!(transport.supports_address(&addr1)); // IPv4 supported
1280        assert!(transport.supports_address(&addr2)); // IPv6 supported
1281        assert!(transport.supports_address(&addr1)); // IPv4 supported
1282
1283        let limited_transport = MockTransport::new(TransportType::QUIC).with_limited_support();
1284        assert!(limited_transport.supports_address(&addr1)); // IPv4 supported
1285        assert!(limited_transport.supports_address(&addr2)); // IPv6 supported
1286        Ok(())
1287    }
1288
1289    #[tokio::test]
1290    async fn test_mock_transport_supported_addresses() -> Result<()> {
1291        let transport = MockTransport::new(TransportType::QUIC);
1292        let supports_ipv6 = transport.supports_ipv6();
1293
1294        // Dual-stack supported now
1295        assert!(supports_ipv6);
1296
1297        let limited_transport = MockTransport::new(TransportType::QUIC).with_limited_support();
1298        let limited_supports_ipv6 = limited_transport.supports_ipv6();
1299
1300        // All transports support IPv6 now
1301        assert!(limited_supports_ipv6);
1302        Ok(())
1303    }
1304
1305    #[tokio::test]
1306    async fn test_transport_options_configuration() -> Result<()> {
1307        let options = TransportOptions {
1308            enable_0rtt: false,
1309            require_encryption: false,
1310            connect_timeout: Duration::from_secs(10),
1311            keep_alive: Duration::from_secs(30),
1312            max_message_size: 1024,
1313        };
1314
1315        assert!(!options.enable_0rtt);
1316        assert!(!options.require_encryption);
1317        assert_eq!(options.connect_timeout, Duration::from_secs(10));
1318        assert_eq!(options.keep_alive, Duration::from_secs(30));
1319        assert_eq!(options.max_message_size, 1024);
1320        Ok(())
1321    }
1322}