saorsa_core/
transport.rs

1// Copyright 2024 Saorsa Labs Limited
2//
3// This software is dual-licensed under:
4// - GNU Affero General Public License v3.0 or later (AGPL-3.0-or-later)
5// - Commercial License
6//
7// For AGPL-3.0 license, see LICENSE-AGPL-3.0
8// For commercial licensing, contact: saorsalabs@gmail.com
9//
10// Unless required by applicable law or agreed to in writing, software
11// distributed under these licenses is distributed on an "AS IS" BASIS,
12// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
14//! Transport Layer
15//!
16//! This module provides native ant-quic integration for the P2P Foundation.
17//!
18//! ## Migration Notice
19//! The old Transport/Connection trait abstractions have been deprecated
20//! in favor of direct ant-quic integration via P2PNetworkNode.
21//!
22//! Use `ant_quic_adapter::P2PNetworkNode` directly for all networking needs.
23
24// ant-quic is used directly via ant_quic_adapter module
25
26// Native ant-quic integration with advanced NAT traversal and PQC support
27pub mod ant_quic_adapter;
28
29// Tests for old QuicTransport - removed during ant-quic migration
30// #[cfg(test)]
31// mod quic_error_tests;
32
33use crate::validation::{Validate, ValidationContext, validate_message_size, validate_peer_id};
34use crate::{NetworkAddress, P2PError, PeerId, Result};
35use async_trait::async_trait;
36use serde::{Deserialize, Serialize};
37use std::collections::HashMap;
38use std::fmt;
39use std::sync::Arc;
40use std::time::{Duration, Instant};
41use tokio::sync::{Mutex, RwLock};
42use tracing::{debug, info, warn};
43
44/// Transport protocol types
45#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
46pub enum TransportType {
47    /// QUIC transport protocol with NAT traversal
48    QUIC,
49}
50
51/// Transport selection strategy (simplified for QUIC-only)
52#[derive(Debug, Clone, PartialEq, Serialize, Deserialize, Default)]
53pub enum TransportSelection {
54    /// Use QUIC transport (default and only option)
55    #[default]
56    QUIC,
57}
58
59/// Connection quality metrics
60#[derive(Debug, Clone)]
61pub struct ConnectionQuality {
62    /// Round-trip latency
63    pub latency: Duration,
64    /// Throughput in Mbps
65    pub throughput_mbps: f64,
66    /// Packet loss percentage
67    pub packet_loss: f64,
68    /// Jitter (latency variation)
69    pub jitter: Duration,
70    /// Connection establishment time
71    pub connect_time: Duration,
72}
73
74/// Connection information
75#[derive(Debug, Clone)]
76pub struct ConnectionInfo {
77    /// Transport type being used
78    pub transport_type: TransportType,
79    /// Local address
80    pub local_addr: NetworkAddress,
81    /// Remote address
82    pub remote_addr: NetworkAddress,
83    /// Whether connection is encrypted
84    pub is_encrypted: bool,
85    /// Cipher suite being used
86    pub cipher_suite: String,
87    /// Whether 0-RTT was used
88    pub used_0rtt: bool,
89    /// Connection establishment time
90    pub established_at: Instant,
91    /// Last activity timestamp
92    pub last_activity: Instant,
93}
94
95/// Connection pool information
96#[derive(Debug, Clone)]
97pub struct ConnectionPoolInfo {
98    /// Number of active connections
99    pub active_connections: usize,
100    /// Total connections ever created
101    pub total_connections: usize,
102    /// Bytes sent through pool
103    pub bytes_sent: u64,
104    /// Bytes received through pool
105    pub bytes_received: u64,
106}
107
108/// Connection pool statistics
109#[derive(Debug, Clone)]
110pub struct ConnectionPoolStats {
111    /// Messages sent per connection
112    pub messages_per_connection: HashMap<String, usize>,
113    /// Bytes per connection
114    pub bytes_per_connection: HashMap<String, u64>,
115    /// Average latency per connection
116    pub latency_per_connection: HashMap<String, Duration>,
117}
118
119/// Message received from transport
120#[derive(Debug, Clone)]
121pub struct TransportMessage {
122    /// Sender peer ID
123    pub sender: PeerId,
124    /// Message data
125    pub data: Vec<u8>,
126    /// Protocol identifier
127    pub protocol: String,
128    /// Timestamp when received
129    pub received_at: Instant,
130}
131
132impl Validate for TransportMessage {
133    fn validate(&self, ctx: &ValidationContext) -> Result<()> {
134        // Validate sender peer ID
135        validate_peer_id(&self.sender)?;
136
137        // Validate message size
138        validate_message_size(self.data.len(), ctx.max_message_size)?;
139
140        // Validate protocol identifier
141        if self.protocol.is_empty() || self.protocol.len() > 64 {
142            return Err(P2PError::validation("Invalid protocol identifier"));
143        }
144
145        Ok(())
146    }
147}
148
149/// Transport trait for protocol implementations
150#[allow(dead_code)] // Deprecated during ant-quic migration
151#[async_trait]
152pub trait Transport: Send + Sync {
153    /// Start listening on the given address
154    async fn listen(&self, addr: NetworkAddress) -> Result<NetworkAddress>;
155
156    /// Accept incoming connections (for server-side)
157    async fn accept(&self) -> Result<Box<dyn Connection>>;
158
159    /// Connect to a remote peer
160    async fn connect(&self, addr: NetworkAddress) -> Result<Box<dyn Connection>>;
161
162    /// Connect with specific transport options
163    async fn connect_with_options(
164        &self,
165        addr: NetworkAddress,
166        options: TransportOptions,
167    ) -> Result<Box<dyn Connection>>;
168
169    /// Check if this transport supports IPv6 (deprecated - IPv4-only focus)
170    fn supports_ipv6(&self) -> bool;
171
172    /// Get transport type
173    fn transport_type(&self) -> TransportType;
174
175    /// Check if address is supported
176    fn supports_address(&self, addr: &NetworkAddress) -> bool;
177}
178
179/// Connection trait for active connections
180#[allow(dead_code)] // Deprecated during ant-quic migration
181#[async_trait]
182pub trait Connection: Send + Sync {
183    /// Send data over the connection
184    async fn send(&mut self, data: &[u8]) -> Result<()>;
185
186    /// Receive data from the connection
187    async fn receive(&mut self) -> Result<Vec<u8>>;
188
189    /// Get connection info
190    async fn info(&self) -> ConnectionInfo;
191
192    /// Close the connection
193    async fn close(&mut self) -> Result<()>;
194
195    /// Check if connection is alive
196    async fn is_alive(&self) -> bool;
197
198    /// Measure connection quality
199    async fn measure_quality(&self) -> Result<ConnectionQuality>;
200
201    /// Get local address
202    fn local_addr(&self) -> NetworkAddress;
203
204    /// Get remote address
205    fn remote_addr(&self) -> NetworkAddress;
206}
207
208/// Transport configuration options
209#[derive(Debug, Clone)]
210pub struct TransportOptions {
211    /// Enable 0-RTT for QUIC
212    pub enable_0rtt: bool,
213    /// Force encryption
214    pub require_encryption: bool,
215    /// Connection timeout
216    pub connect_timeout: Duration,
217    /// Keep-alive interval
218    pub keep_alive: Duration,
219    /// Maximum message size
220    pub max_message_size: usize,
221}
222
223/// Transport manager coordinates different transport protocols
224#[allow(dead_code)] // Deprecated during ant-quic migration
225pub struct TransportManager {
226    /// Available transports
227    transports: HashMap<TransportType, Arc<dyn Transport>>,
228    /// Active connections
229    connections: Arc<RwLock<HashMap<PeerId, Arc<Mutex<ConnectionPool>>>>>,
230    /// Transport selection strategy
231    selection: TransportSelection,
232    /// Configuration options
233    options: TransportOptions,
234}
235
236/// Connection pool for a specific peer
237struct ConnectionPool {
238    /// Active connections
239    connections: Vec<Arc<Mutex<Box<dyn Connection>>>>,
240    /// Connection info cache (reserved for future use)
241    _info_cache: HashMap<String, ConnectionInfo>,
242    /// Pool statistics
243    stats: ConnectionPoolStats,
244    /// Pool configuration
245    max_connections: usize,
246    /// Round-robin index for load balancing
247    round_robin_index: usize,
248}
249
250impl TransportManager {
251    /// Create a new transport manager
252    pub fn new(selection: TransportSelection, options: TransportOptions) -> Self {
253        Self {
254            transports: HashMap::new(),
255            connections: Arc::new(RwLock::new(HashMap::new())),
256            selection,
257            options,
258        }
259    }
260
261    /// Register a transport implementation
262    pub fn register_transport(&mut self, transport: Arc<dyn Transport>) {
263        let transport_type = transport.transport_type();
264        self.transports.insert(transport_type, transport);
265        info!("Registered transport: {:?}", transport_type);
266    }
267
268    /// Connect to a peer using the best available transport
269    pub async fn connect(&self, addr: NetworkAddress) -> Result<PeerId> {
270        let transport_type = self.select_transport(&addr).await?;
271        let transport = self.transports.get(&transport_type).ok_or_else(|| {
272            P2PError::Transport(crate::error::TransportError::SetupFailed(
273                format!("Transport {transport_type:?} not available").into(),
274            ))
275        })?;
276
277        debug!("Connecting to {} using {:?}", addr, transport_type);
278
279        let connection = transport
280            .connect_with_options(addr.clone(), self.options.clone())
281            .await?;
282        let peer_id = format!("peer_from_{}_{}", addr.ip(), addr.port()); // Simplified peer ID
283
284        // Add to connection pool
285        self.add_connection(peer_id.clone(), connection).await?;
286
287        info!("Connected to peer {} via {:?}", peer_id, transport_type);
288        Ok(peer_id)
289    }
290
291    /// Connect with specific transport
292    pub async fn connect_with_transport(
293        &self,
294        addr: NetworkAddress,
295        transport_type: TransportType,
296    ) -> Result<PeerId> {
297        let transport = self.transports.get(&transport_type).ok_or_else(|| {
298            P2PError::Transport(crate::error::TransportError::SetupFailed(
299                format!("Transport {transport_type:?} not available").into(),
300            ))
301        })?;
302
303        let connection = transport
304            .connect_with_options(addr.clone(), self.options.clone())
305            .await?;
306        let peer_id = format!("peer_from_{}_{}", addr.ip(), addr.port());
307
308        self.add_connection(peer_id.clone(), connection).await?;
309        Ok(peer_id)
310    }
311
312    /// Send message to a peer
313    pub async fn send_message(&self, peer_id: &PeerId, data: Vec<u8>) -> Result<()> {
314        let connections = self.connections.read().await;
315        let pool = connections.get(peer_id).ok_or_else(|| {
316            P2PError::Network(crate::error::NetworkError::PeerNotFound(
317                peer_id.to_string().into(),
318            ))
319        })?;
320
321        let mut pool_guard = pool.lock().await;
322        let connection = pool_guard.get_connection()?;
323
324        let mut conn_guard = connection.lock().await;
325        conn_guard.send(&data).await?;
326
327        debug!("Sent {} bytes to peer {}", data.len(), peer_id);
328        Ok(())
329    }
330
331    /// Get connection info for a peer
332    pub async fn get_connection_info(&self, peer_id: &PeerId) -> Result<ConnectionInfo> {
333        let connections = self.connections.read().await;
334        let pool = connections.get(peer_id).ok_or_else(|| {
335            P2PError::Network(crate::error::NetworkError::PeerNotFound(
336                peer_id.to_string().into(),
337            ))
338        })?;
339
340        let mut pool_guard = pool.lock().await;
341        let connection = pool_guard.get_connection()?;
342        let conn_guard = connection.lock().await;
343
344        Ok(conn_guard.info().await)
345    }
346
347    /// Get connection pool info
348    pub async fn get_connection_pool_info(&self, peer_id: &PeerId) -> Result<ConnectionPoolInfo> {
349        let connections = self.connections.read().await;
350        let pool = connections.get(peer_id).ok_or_else(|| {
351            P2PError::Network(crate::error::NetworkError::PeerNotFound(
352                peer_id.to_string().into(),
353            ))
354        })?;
355
356        let pool_guard = pool.lock().await;
357        Ok(ConnectionPoolInfo {
358            active_connections: pool_guard.connections.len(),
359            total_connections: pool_guard.stats.messages_per_connection.len(),
360            bytes_sent: pool_guard.stats.bytes_per_connection.values().sum(),
361            bytes_received: 0, // TODO: Track separately
362        })
363    }
364
365    /// Get connection pool statistics
366    pub async fn get_connection_pool_stats(&self, peer_id: &PeerId) -> Result<ConnectionPoolStats> {
367        let connections = self.connections.read().await;
368        let pool = connections.get(peer_id).ok_or_else(|| {
369            P2PError::Network(crate::error::NetworkError::PeerNotFound(
370                peer_id.to_string().into(),
371            ))
372        })?;
373
374        let pool_guard = pool.lock().await;
375        Ok(pool_guard.stats.clone())
376    }
377
378    /// Measure connection quality
379    pub async fn measure_connection_quality(&self, peer_id: &PeerId) -> Result<ConnectionQuality> {
380        let connections = self.connections.read().await;
381        let pool = connections.get(peer_id).ok_or_else(|| {
382            P2PError::Network(crate::error::NetworkError::PeerNotFound(
383                peer_id.to_string().into(),
384            ))
385        })?;
386
387        let mut pool_guard = pool.lock().await;
388        let connection = pool_guard.get_connection()?;
389        let conn_guard = connection.lock().await;
390
391        conn_guard.measure_quality().await
392    }
393
394    /// Switch transport for a peer
395    pub async fn switch_transport(
396        &self,
397        peer_id: &PeerId,
398        _new_transport: TransportType,
399    ) -> Result<()> {
400        // This is a placeholder implementation
401        // In reality, this would establish a new connection with the new transport
402        // and gracefully migrate the existing connection
403
404        warn!(
405            "Transport switching not yet fully implemented for peer {}",
406            peer_id
407        );
408        Ok(())
409    }
410
411    /// Select transport for an address (always QUIC)
412    async fn select_transport(&self, _addr: &NetworkAddress) -> Result<TransportType> {
413        match &self.selection {
414            TransportSelection::QUIC => {
415                if self.transports.contains_key(&TransportType::QUIC) {
416                    Ok(TransportType::QUIC)
417                } else {
418                    Err(P2PError::Transport(
419                        crate::error::TransportError::SetupFailed(
420                            "QUIC transport not available".into(),
421                        ),
422                    ))
423                }
424            }
425        }
426    }
427
428    /// Auto-select transport (always QUIC in this implementation)
429    #[allow(dead_code)]
430    async fn auto_select_transport(&self, addr: &NetworkAddress) -> Result<TransportType> {
431        // Always use QUIC as it's the only transport protocol
432        if self.transports.contains_key(&TransportType::QUIC)
433            && let Some(transport) = self.transports.get(&TransportType::QUIC)
434            && transport.supports_address(addr)
435        {
436            debug!(
437                "Using QUIC transport for {} (only available transport)",
438                addr
439            );
440            return Ok(TransportType::QUIC);
441        }
442
443        Err(P2PError::Transport(
444            crate::error::TransportError::SetupFailed(
445                "QUIC transport not available or address not supported"
446                    .to_string()
447                    .into(),
448            ),
449        ))
450    }
451
452    /// Add connection to pool
453    async fn add_connection(&self, peer_id: PeerId, connection: Box<dyn Connection>) -> Result<()> {
454        let mut connections = self.connections.write().await;
455
456        let pool = connections.entry(peer_id.clone()).or_insert_with(|| {
457            Arc::new(Mutex::new(ConnectionPool::new(3))) // Default max 3 connections per peer
458        });
459
460        let mut pool_guard = pool.lock().await;
461        pool_guard.add_connection(connection).await?;
462
463        Ok(())
464    }
465}
466
467impl ConnectionPool {
468    /// Create a new connection pool
469    fn new(max_connections: usize) -> Self {
470        Self {
471            connections: Vec::new(),
472            _info_cache: HashMap::new(),
473            stats: ConnectionPoolStats {
474                messages_per_connection: HashMap::new(),
475                bytes_per_connection: HashMap::new(),
476                latency_per_connection: HashMap::new(),
477            },
478            max_connections,
479            round_robin_index: 0,
480        }
481    }
482
483    /// Add a connection to the pool
484    async fn add_connection(&mut self, connection: Box<dyn Connection>) -> Result<()> {
485        if self.connections.len() >= self.max_connections {
486            // Remove oldest connection
487            self.connections.remove(0);
488        }
489
490        let conn_id = format!("conn_{}", self.connections.len());
491        self.stats
492            .messages_per_connection
493            .insert(conn_id.clone(), 0);
494        self.stats.bytes_per_connection.insert(conn_id.clone(), 0);
495        self.stats
496            .latency_per_connection
497            .insert(conn_id, Duration::from_millis(0));
498
499        self.connections.push(Arc::new(Mutex::new(connection)));
500        Ok(())
501    }
502
503    /// Get a connection using round-robin load balancing
504    fn get_connection(&mut self) -> Result<Arc<Mutex<Box<dyn Connection>>>> {
505        if self.connections.is_empty() {
506            return Err(P2PError::Network(
507                crate::error::NetworkError::ProtocolError(
508                    "No connections available".to_string().into(),
509                ),
510            ));
511        }
512
513        let connection = self.connections[self.round_robin_index % self.connections.len()].clone();
514        self.round_robin_index += 1;
515
516        // Update stats
517        let conn_id = format!("conn_{}", self.round_robin_index % self.connections.len());
518        if let Some(count) = self.stats.messages_per_connection.get_mut(&conn_id) {
519            *count += 1;
520        }
521
522        Ok(connection)
523    }
524}
525
526impl fmt::Display for TransportType {
527    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
528        match self {
529            TransportType::QUIC => write!(f, "quic"),
530        }
531    }
532}
533
534impl Default for TransportOptions {
535    fn default() -> Self {
536        Self {
537            enable_0rtt: true,
538            require_encryption: true,
539            connect_timeout: Duration::from_secs(30),
540            keep_alive: Duration::from_secs(60),
541            max_message_size: 64 * 1024 * 1024, // 64MB
542        }
543    }
544}
545
546impl Default for ConnectionQuality {
547    fn default() -> Self {
548        Self {
549            latency: Duration::from_millis(50),
550            throughput_mbps: 100.0,
551            packet_loss: 0.0,
552            jitter: Duration::from_millis(5),
553            connect_time: Duration::from_millis(100),
554        }
555    }
556}
557/// Legacy transport types module for backward compatibility
558pub mod transport_types {
559    pub use super::TransportType;
560}
561
562// Re-export transport implementation
563// pub use quic::QuicTransport; // Disabled during ant-quic migration
564// pub use ant_quic_adapter::AntQuicAdapter; // Available but not needed for now
565
566#[cfg(test)]
567mod tests {
568    use super::*;
569    use crate::error::NetworkError;
570    use async_trait::async_trait;
571    use std::sync::atomic::{AtomicUsize, Ordering};
572    use tokio::time::Duration;
573
574    /// Helper function to parse addresses in tests
575    fn parse_addr(addr: &str) -> Result<NetworkAddress> {
576        addr.parse::<NetworkAddress>().map_err(|e| {
577            P2PError::Network(crate::error::NetworkError::InvalidAddress(
578                e.to_string().into(),
579            ))
580        })
581    }
582
583    /// Mock transport implementation for testing
584    struct MockTransport {
585        transport_type: TransportType,
586        should_fail: bool,
587        supports_all: bool,
588    }
589
590    impl MockTransport {
591        fn new(transport_type: TransportType) -> Self {
592            Self {
593                transport_type,
594                should_fail: false,
595                supports_all: true,
596            }
597        }
598
599        fn with_failure(mut self) -> Self {
600            self.should_fail = true;
601            self
602        }
603
604        fn with_limited_support(mut self) -> Self {
605            self.supports_all = false;
606            self
607        }
608    }
609
610    #[async_trait]
611    impl Transport for MockTransport {
612        async fn listen(&self, addr: NetworkAddress) -> Result<NetworkAddress> {
613            if self.should_fail {
614                return Err(P2PError::Transport(
615                    crate::error::TransportError::SetupFailed("Listen failed".to_string().into()),
616                ));
617            }
618            Ok(addr)
619        }
620
621        async fn connect(&self, addr: NetworkAddress) -> Result<Box<dyn Connection>> {
622            if self.should_fail {
623                return Err(P2PError::Transport(
624                    crate::error::TransportError::SetupFailed(
625                        "Connection failed".to_string().into(),
626                    ),
627                ));
628            }
629            Ok(Box::new(MockConnection::new(addr)))
630        }
631
632        async fn connect_with_options(
633            &self,
634            addr: NetworkAddress,
635            _options: TransportOptions,
636        ) -> Result<Box<dyn Connection>> {
637            self.connect(addr).await
638        }
639
640        async fn accept(&self) -> Result<Box<dyn Connection>> {
641            if self.should_fail {
642                return Err(P2PError::Transport(
643                    crate::error::TransportError::SetupFailed("Accept failed".into()),
644                ));
645            }
646            Ok(Box::new(MockConnection::new(
647                "127.0.0.1:9000".parse::<NetworkAddress>().map_err(|e| {
648                    crate::error::TransportError::SetupFailed(
649                        format!("Invalid mock address: {}", e).into(),
650                    )
651                })?,
652            )))
653        }
654
655        fn supports_ipv6(&self) -> bool {
656            true // Dual-stack support enabled
657        }
658
659        fn transport_type(&self) -> TransportType {
660            self.transport_type
661        }
662
663        fn supports_address(&self, addr: &NetworkAddress) -> bool {
664            // Support both IPv4 and IPv6
665            addr.is_ipv4() || addr.is_ipv6()
666        }
667    }
668
669    /// Mock connection implementation for testing
670    struct MockConnection {
671        remote_addr: NetworkAddress,
672        is_alive: bool,
673        bytes_sent: AtomicUsize,
674        bytes_received: AtomicUsize,
675    }
676
677    impl MockConnection {
678        fn new(remote_addr: NetworkAddress) -> Self {
679            Self {
680                remote_addr,
681                is_alive: true,
682                bytes_sent: AtomicUsize::new(0),
683                bytes_received: AtomicUsize::new(0),
684            }
685        }
686    }
687
688    #[async_trait]
689    impl Connection for MockConnection {
690        async fn send(&mut self, data: &[u8]) -> Result<()> {
691            if !self.is_alive {
692                return Err(P2PError::Network(
693                    crate::error::NetworkError::PeerDisconnected {
694                        peer: "unknown".to_string(),
695                        reason: "Connection closed".to_string(),
696                    },
697                ));
698            }
699            self.bytes_sent.fetch_add(data.len(), Ordering::Relaxed);
700            Ok(())
701        }
702
703        async fn receive(&mut self) -> Result<Vec<u8>> {
704            if !self.is_alive {
705                return Err(P2PError::Network(
706                    crate::error::NetworkError::PeerDisconnected {
707                        peer: "unknown".to_string(),
708                        reason: "Connection closed".to_string(),
709                    },
710                ));
711            }
712            let data = b"mock_response".to_vec();
713            self.bytes_received.fetch_add(data.len(), Ordering::Relaxed);
714            Ok(data)
715        }
716
717        async fn info(&self) -> ConnectionInfo {
718            ConnectionInfo {
719                transport_type: TransportType::QUIC,
720                local_addr: "127.0.0.1:9000"
721                    .parse::<NetworkAddress>()
722                    .expect("Test address should be valid"),
723                remote_addr: self.remote_addr.clone(),
724                is_encrypted: true,
725                cipher_suite: "TLS_AES_256_GCM_SHA384".to_string(),
726                used_0rtt: false,
727                established_at: Instant::now(),
728                last_activity: Instant::now(),
729            }
730        }
731
732        async fn close(&mut self) -> Result<()> {
733            self.is_alive = false;
734            Ok(())
735        }
736
737        async fn is_alive(&self) -> bool {
738            self.is_alive
739        }
740
741        async fn measure_quality(&self) -> Result<ConnectionQuality> {
742            Ok(ConnectionQuality {
743                latency: Duration::from_millis(10),
744                throughput_mbps: 1000.0,
745                packet_loss: 0.1,
746                jitter: Duration::from_millis(2),
747                connect_time: Duration::from_millis(50),
748            })
749        }
750
751        fn local_addr(&self) -> NetworkAddress {
752            "127.0.0.1:9000"
753                .parse::<NetworkAddress>()
754                .expect("Test address should be valid")
755        }
756
757        fn remote_addr(&self) -> NetworkAddress {
758            self.remote_addr.clone()
759        }
760    }
761
762    fn create_test_transport_manager() -> TransportManager {
763        let options = TransportOptions::default();
764        TransportManager::new(TransportSelection::QUIC, options)
765    }
766
767    #[test]
768    fn test_transport_type_display() {
769        assert_eq!(format!("{}", TransportType::QUIC), "quic");
770    }
771
772    #[test]
773    fn test_transport_type_serialization() {
774        let quic_type = TransportType::QUIC;
775
776        assert_eq!(quic_type, TransportType::QUIC);
777    }
778
779    #[test]
780    fn test_transport_selection_variants() {
781        let quic_selection = TransportSelection::QUIC;
782
783        assert!(matches!(quic_selection, TransportSelection::QUIC));
784    }
785
786    #[test]
787    fn test_transport_selection_default() {
788        let default = TransportSelection::default();
789        assert!(matches!(default, TransportSelection::QUIC));
790    }
791
792    #[test]
793    fn test_transport_options_default() {
794        let options = TransportOptions::default();
795
796        assert!(options.enable_0rtt);
797        assert!(options.require_encryption);
798        assert_eq!(options.connect_timeout, Duration::from_secs(30));
799        assert_eq!(options.keep_alive, Duration::from_secs(60));
800        assert_eq!(options.max_message_size, 64 * 1024 * 1024);
801    }
802
803    #[test]
804    fn test_connection_quality_default() {
805        let quality = ConnectionQuality::default();
806
807        assert_eq!(quality.latency, Duration::from_millis(50));
808        assert_eq!(quality.throughput_mbps, 100.0);
809        assert_eq!(quality.packet_loss, 0.0);
810        assert_eq!(quality.jitter, Duration::from_millis(5));
811        assert_eq!(quality.connect_time, Duration::from_millis(100));
812    }
813
814    #[tokio::test]
815    async fn test_transport_manager_creation() {
816        let manager = create_test_transport_manager();
817        assert!(manager.transports.is_empty());
818    }
819
820    #[tokio::test]
821    async fn test_transport_registration() {
822        let mut manager = create_test_transport_manager();
823        let quic_transport = Arc::new(MockTransport::new(TransportType::QUIC));
824
825        manager.register_transport(quic_transport.clone());
826
827        assert_eq!(manager.transports.len(), 1);
828        assert!(manager.transports.contains_key(&TransportType::QUIC));
829    }
830
831    #[tokio::test]
832    async fn test_connection_establishment() -> Result<()> {
833        let mut manager = create_test_transport_manager();
834        let transport = Arc::new(MockTransport::new(TransportType::QUIC));
835        manager.register_transport(transport);
836
837        let peer_id = manager
838            .connect("127.0.0.1:9001".parse::<NetworkAddress>().map_err(|e| {
839                P2PError::Network(NetworkError::InvalidAddress(format!("{}", e).into()))
840            })?)
841            .await?;
842        assert_eq!(peer_id, "peer_from_127.0.0.1_9001");
843
844        let connections = manager.connections.read().await;
845        assert!(connections.contains_key(&peer_id));
846
847        Ok(())
848    }
849
850    #[tokio::test]
851    async fn test_connection_with_specific_transport() -> Result<()> {
852        let mut manager = create_test_transport_manager();
853        let transport = Arc::new(MockTransport::new(TransportType::QUIC));
854        manager.register_transport(transport);
855
856        let peer_id = manager
857            .connect_with_transport(
858                "127.0.0.1:9002".parse::<NetworkAddress>().map_err(|e| {
859                    P2PError::Network(NetworkError::InvalidAddress(format!("{}", e).into()))
860                })?,
861                TransportType::QUIC,
862            )
863            .await?;
864
865        assert_eq!(peer_id, "peer_from_127.0.0.1_9002");
866        Ok(())
867    }
868
869    #[tokio::test]
870    async fn test_connection_failure_handling() -> Result<()> {
871        let mut manager = create_test_transport_manager();
872        let failing_transport = Arc::new(MockTransport::new(TransportType::QUIC).with_failure());
873        manager.register_transport(failing_transport);
874
875        let result = manager
876            .connect("127.0.0.1:9003".parse::<NetworkAddress>().map_err(|e| {
877                P2PError::Network(NetworkError::InvalidAddress(format!("{}", e).into()))
878            })?)
879            .await;
880        assert!(result.is_err());
881        assert!(
882            result
883                .unwrap_err()
884                .to_string()
885                .contains("Connection failed")
886        );
887        Ok(())
888    }
889
890    #[tokio::test]
891    async fn test_message_sending() -> Result<()> {
892        let mut manager = create_test_transport_manager();
893        let transport = Arc::new(MockTransport::new(TransportType::QUIC));
894        manager.register_transport(transport);
895
896        let peer_id = manager
897            .connect("127.0.0.1:9004".parse::<NetworkAddress>().map_err(|e| {
898                P2PError::Network(NetworkError::InvalidAddress(format!("{}", e).into()))
899            })?)
900            .await?;
901        let message = b"Hello, transport!".to_vec();
902
903        manager.send_message(&peer_id, message.clone()).await?;
904
905        // Verify message was processed
906        let pool_info = manager.get_connection_pool_info(&peer_id).await?;
907        assert_eq!(pool_info.active_connections, 1);
908
909        Ok(())
910    }
911
912    #[tokio::test]
913    async fn test_message_sending_no_connection() {
914        let manager = create_test_transport_manager();
915        let result = manager
916            .send_message(&"nonexistent_peer".to_string(), vec![1, 2, 3])
917            .await;
918
919        assert!(result.is_err());
920        assert!(
921            result
922                .unwrap_err()
923                .to_string()
924                .contains("No connection to peer")
925        );
926    }
927
928    #[tokio::test]
929    async fn test_connection_info_retrieval() -> Result<()> {
930        let mut manager = create_test_transport_manager();
931        let transport = Arc::new(MockTransport::new(TransportType::QUIC));
932        manager.register_transport(transport);
933
934        let peer_id = manager
935            .connect("127.0.0.1:9005".parse::<NetworkAddress>().map_err(|e| {
936                P2PError::Network(NetworkError::InvalidAddress(format!("{}", e).into()))
937            })?)
938            .await?;
939        let info = manager.get_connection_info(&peer_id).await?;
940
941        assert_eq!(info.transport_type, TransportType::QUIC);
942        assert_eq!(
943            info.remote_addr,
944            "127.0.0.1:9005"
945                .parse::<NetworkAddress>()
946                .map_err(|e| P2PError::Network(NetworkError::InvalidAddress(
947                    format!("{}", e).into()
948                )))?
949        );
950        assert!(info.is_encrypted);
951        assert_eq!(info.cipher_suite, "TLS_AES_256_GCM_SHA384");
952
953        Ok(())
954    }
955
956    #[tokio::test]
957    async fn test_connection_pool_info() -> Result<()> {
958        let mut manager = create_test_transport_manager();
959        let transport = Arc::new(MockTransport::new(TransportType::QUIC));
960        manager.register_transport(transport);
961
962        let peer_id = manager
963            .connect("127.0.0.1:9006".parse::<NetworkAddress>().map_err(|e| {
964                P2PError::Network(NetworkError::InvalidAddress(format!("{}", e).into()))
965            })?)
966            .await?;
967        let pool_info = manager.get_connection_pool_info(&peer_id).await?;
968
969        assert_eq!(pool_info.active_connections, 1);
970        assert_eq!(pool_info.total_connections, 1);
971        assert_eq!(pool_info.bytes_sent, 0);
972
973        Ok(())
974    }
975
976    #[tokio::test]
977    async fn test_connection_pool_stats() -> Result<()> {
978        let mut manager = create_test_transport_manager();
979        let transport = Arc::new(MockTransport::new(TransportType::QUIC));
980        manager.register_transport(transport);
981
982        let peer_id = manager
983            .connect("127.0.0.1:9007".parse::<NetworkAddress>().map_err(|e| {
984                P2PError::Network(NetworkError::InvalidAddress(format!("{}", e).into()))
985            })?)
986            .await?;
987        let stats = manager.get_connection_pool_stats(&peer_id).await?;
988
989        assert_eq!(stats.messages_per_connection.len(), 1);
990        assert_eq!(stats.bytes_per_connection.len(), 1);
991        assert_eq!(stats.latency_per_connection.len(), 1);
992
993        Ok(())
994    }
995
996    #[tokio::test]
997    async fn test_connection_quality_measurement() -> Result<()> {
998        let mut manager = create_test_transport_manager();
999        let transport = Arc::new(MockTransport::new(TransportType::QUIC));
1000        manager.register_transport(transport);
1001
1002        let peer_id = manager
1003            .connect("127.0.0.1:9008".parse::<NetworkAddress>().map_err(|e| {
1004                P2PError::Network(NetworkError::InvalidAddress(format!("{}", e).into()))
1005            })?)
1006            .await?;
1007        let quality = manager.measure_connection_quality(&peer_id).await?;
1008
1009        assert_eq!(quality.latency, Duration::from_millis(10));
1010        assert_eq!(quality.throughput_mbps, 1000.0);
1011        assert_eq!(quality.packet_loss, 0.1);
1012        assert_eq!(quality.jitter, Duration::from_millis(2));
1013
1014        Ok(())
1015    }
1016
1017    #[tokio::test]
1018    async fn test_transport_switching() -> Result<()> {
1019        let mut manager = create_test_transport_manager();
1020        let transport = Arc::new(MockTransport::new(TransportType::QUIC));
1021        manager.register_transport(transport);
1022
1023        let peer_id = manager
1024            .connect("127.0.0.1:9009".parse::<NetworkAddress>().map_err(|e| {
1025                P2PError::Network(NetworkError::InvalidAddress(format!("{}", e).into()))
1026            })?)
1027            .await?;
1028
1029        // Transport switching is not fully implemented, but should not error
1030        let result = manager
1031            .switch_transport(&peer_id, TransportType::QUIC)
1032            .await;
1033        assert!(result.is_ok());
1034
1035        Ok(())
1036    }
1037
1038    #[tokio::test]
1039    async fn test_auto_transport_selection_quic() -> Result<()> {
1040        let mut manager = create_test_transport_manager();
1041        let quic_transport = Arc::new(MockTransport::new(TransportType::QUIC));
1042
1043        manager.register_transport(quic_transport);
1044
1045        let addr = "127.0.0.1:9010".parse::<NetworkAddress>().map_err(|e| {
1046            P2PError::Network(NetworkError::InvalidAddress(format!("{}", e).into()))
1047        })?;
1048        let selected = manager.auto_select_transport(&addr).await?;
1049
1050        // Should use QUIC when available
1051        assert_eq!(selected, TransportType::QUIC);
1052
1053        Ok(())
1054    }
1055
1056    #[tokio::test]
1057    async fn test_transport_selection_no_quic() -> Result<()> {
1058        let manager = create_test_transport_manager();
1059        // No transports registered
1060
1061        let addr = "127.0.0.1:9011".parse::<NetworkAddress>().map_err(|e| {
1062            P2PError::Network(NetworkError::InvalidAddress(format!("{}", e).into()))
1063        })?;
1064        let selected = manager.auto_select_transport(&addr).await;
1065
1066        // Should fail when QUIC not available
1067        assert!(selected.is_err());
1068
1069        Ok(())
1070    }
1071
1072    #[tokio::test]
1073    async fn test_transport_selection_no_suitable_transport() -> Result<()> {
1074        let manager = create_test_transport_manager();
1075        let addr = "127.0.0.1:9012".parse::<NetworkAddress>().map_err(|e| {
1076            P2PError::Network(NetworkError::InvalidAddress(format!("{}", e).into()))
1077        })?;
1078
1079        let result = manager.auto_select_transport(&addr).await;
1080        assert!(result.is_err());
1081        assert!(
1082            result
1083                .unwrap_err()
1084                .to_string()
1085                .contains("QUIC transport not available")
1086        );
1087        Ok(())
1088    }
1089
1090    #[tokio::test]
1091    async fn test_quic_transport_selection() -> Result<()> {
1092        let mut manager =
1093            TransportManager::new(TransportSelection::QUIC, TransportOptions::default());
1094        let quic_transport = Arc::new(MockTransport::new(TransportType::QUIC));
1095
1096        manager.register_transport(quic_transport);
1097
1098        let addr = "127.0.0.1:9013".parse::<NetworkAddress>().map_err(|e| {
1099            P2PError::Network(NetworkError::InvalidAddress(format!("{}", e).into()))
1100        })?;
1101        let selected = manager.select_transport(&addr).await?;
1102
1103        assert_eq!(selected, TransportType::QUIC);
1104
1105        Ok(())
1106    }
1107
1108    #[tokio::test]
1109    async fn test_quic_transport_unavailable() -> Result<()> {
1110        let manager = TransportManager::new(TransportSelection::QUIC, TransportOptions::default());
1111
1112        let addr = "127.0.0.1:9014".parse::<NetworkAddress>().map_err(|e| {
1113            P2PError::Network(NetworkError::InvalidAddress(format!("{}", e).into()))
1114        })?;
1115        let result = manager.select_transport(&addr).await;
1116
1117        assert!(result.is_err());
1118        assert!(
1119            result
1120                .unwrap_err()
1121                .to_string()
1122                .contains("QUIC transport not available")
1123        );
1124        Ok(())
1125    }
1126
1127    #[tokio::test]
1128    async fn test_quic_transport_with_registration() -> Result<()> {
1129        let mut manager =
1130            TransportManager::new(TransportSelection::QUIC, TransportOptions::default());
1131        let quic_transport = Arc::new(MockTransport::new(TransportType::QUIC));
1132
1133        manager.register_transport(quic_transport);
1134
1135        let addr = "127.0.0.1:9015".parse::<NetworkAddress>().map_err(|e| {
1136            P2PError::Network(NetworkError::InvalidAddress(format!("{}", e).into()))
1137        })?;
1138        let selected = manager.select_transport(&addr).await?;
1139
1140        // Should use QUIC when available
1141        assert_eq!(selected, TransportType::QUIC);
1142
1143        Ok(())
1144    }
1145
1146    #[tokio::test]
1147    async fn test_mock_connection_lifecycle() -> Result<()> {
1148        let mut conn =
1149            MockConnection::new("127.0.0.1:9016".parse::<NetworkAddress>().map_err(|e| {
1150                P2PError::Network(NetworkError::InvalidAddress(format!("{}", e).into()))
1151            })?);
1152
1153        assert!(conn.is_alive().await);
1154
1155        // Test sending
1156        conn.send(b"test message").await?;
1157        assert_eq!(conn.bytes_sent.load(Ordering::Relaxed), 12);
1158
1159        // Test receiving
1160        let received = conn.receive().await?;
1161        assert_eq!(received, b"mock_response");
1162        assert_eq!(conn.bytes_received.load(Ordering::Relaxed), 13);
1163
1164        // Test connection info
1165        let info = conn.info().await;
1166        assert_eq!(info.transport_type, TransportType::QUIC);
1167        assert!(info.is_encrypted);
1168
1169        // Test quality measurement
1170        let quality = conn.measure_quality().await?;
1171        assert_eq!(quality.latency, Duration::from_millis(10));
1172
1173        // Test close
1174        conn.close().await?;
1175        assert!(!conn.is_alive().await);
1176
1177        // Operations should fail after close
1178        let result = conn.send(b"test").await;
1179        assert!(result.is_err());
1180
1181        Ok(())
1182    }
1183
1184    #[tokio::test]
1185    async fn test_connection_pool_max_connections() -> Result<()> {
1186        let mut pool = ConnectionPool::new(2); // Max 2 connections
1187
1188        // Add first connection
1189        let conn1 = Box::new(MockConnection::new(
1190            "127.0.0.1:9017".parse::<NetworkAddress>().map_err(|e| {
1191                P2PError::Network(NetworkError::InvalidAddress(format!("{}", e).into()))
1192            })?,
1193        ));
1194        pool.add_connection(conn1).await?;
1195        assert_eq!(pool.connections.len(), 1);
1196
1197        // Add second connection
1198        let conn2 = Box::new(MockConnection::new(
1199            "127.0.0.1:9018".parse::<NetworkAddress>().map_err(|e| {
1200                P2PError::Network(NetworkError::InvalidAddress(format!("{}", e).into()))
1201            })?,
1202        ));
1203        pool.add_connection(conn2).await?;
1204        assert_eq!(pool.connections.len(), 2);
1205
1206        // Add third connection (should remove first)
1207        let conn3 = Box::new(MockConnection::new(
1208            "127.0.0.1:9019".parse::<NetworkAddress>().map_err(|e| {
1209                P2PError::Network(NetworkError::InvalidAddress(format!("{}", e).into()))
1210            })?,
1211        ));
1212        pool.add_connection(conn3).await?;
1213        assert_eq!(pool.connections.len(), 2);
1214
1215        Ok(())
1216    }
1217
1218    #[tokio::test]
1219    async fn test_connection_pool_round_robin() -> Result<()> {
1220        let mut pool = ConnectionPool::new(3);
1221
1222        // Add connections
1223        for i in 0..3 {
1224            let conn = Box::new(MockConnection::new(
1225                format!("127.0.0.1:{}", 9020 + i)
1226                    .parse()
1227                    .expect("Test address should be valid"),
1228            ));
1229            pool.add_connection(conn).await?;
1230        }
1231
1232        // Test round-robin selection
1233        let conn1 = pool.get_connection()?;
1234        let conn2 = pool.get_connection()?;
1235        let conn3 = pool.get_connection()?;
1236        let conn4 = pool.get_connection()?; // Should wrap around
1237
1238        // All connections should be different (until wraparound)
1239        assert_ne!(Arc::as_ptr(&conn1), Arc::as_ptr(&conn2));
1240        assert_ne!(Arc::as_ptr(&conn2), Arc::as_ptr(&conn3));
1241        // Fourth should be same as first (round-robin)
1242        assert_eq!(Arc::as_ptr(&conn1), Arc::as_ptr(&conn4));
1243
1244        Ok(())
1245    }
1246
1247    #[tokio::test]
1248    async fn test_connection_pool_empty() {
1249        let mut pool = ConnectionPool::new(3);
1250        let result = pool.get_connection();
1251
1252        assert!(result.is_err());
1253        if let Err(e) = result {
1254            assert!(e.to_string().contains("No connections available"));
1255        }
1256    }
1257
1258    #[tokio::test]
1259    async fn test_transport_message_structure() {
1260        let message = TransportMessage {
1261            sender: "test_peer".to_string(),
1262            data: vec![1, 2, 3, 4],
1263            protocol: "/p2p/test/1.0.0".to_string(),
1264            received_at: Instant::now(),
1265        };
1266
1267        assert_eq!(message.sender, "test_peer");
1268        assert_eq!(message.data, vec![1, 2, 3, 4]);
1269        assert_eq!(message.protocol, "/p2p/test/1.0.0");
1270    }
1271
1272    #[tokio::test]
1273    async fn test_mock_transport_address_support() -> Result<()> {
1274        let transport = MockTransport::new(TransportType::QUIC);
1275
1276        let addr1 = "127.0.0.1:9000".parse::<NetworkAddress>().map_err(|e| {
1277            P2PError::Network(NetworkError::InvalidAddress(format!("{}", e).into()))
1278        })?;
1279        let addr2 = "[::1]:9000".parse::<NetworkAddress>().map_err(|e| {
1280            P2PError::Network(NetworkError::InvalidAddress(format!("{}", e).into()))
1281        })?;
1282
1283        assert!(transport.supports_address(&addr1)); // IPv4 supported
1284        assert!(transport.supports_address(&addr2)); // IPv6 supported
1285        assert!(transport.supports_address(&addr1)); // IPv4 supported
1286
1287        let limited_transport = MockTransport::new(TransportType::QUIC).with_limited_support();
1288        assert!(limited_transport.supports_address(&addr1)); // IPv4 supported
1289        assert!(limited_transport.supports_address(&addr2)); // IPv6 supported
1290        Ok(())
1291    }
1292
1293    #[tokio::test]
1294    async fn test_mock_transport_supported_addresses() -> Result<()> {
1295        let transport = MockTransport::new(TransportType::QUIC);
1296        let supports_ipv6 = transport.supports_ipv6();
1297
1298        // Dual-stack supported now
1299        assert!(supports_ipv6);
1300
1301        let limited_transport = MockTransport::new(TransportType::QUIC).with_limited_support();
1302        let limited_supports_ipv6 = limited_transport.supports_ipv6();
1303
1304        // All transports support IPv6 now
1305        assert!(limited_supports_ipv6);
1306        Ok(())
1307    }
1308
1309    #[tokio::test]
1310    async fn test_transport_options_configuration() -> Result<()> {
1311        let options = TransportOptions {
1312            enable_0rtt: false,
1313            require_encryption: false,
1314            connect_timeout: Duration::from_secs(10),
1315            keep_alive: Duration::from_secs(30),
1316            max_message_size: 1024,
1317        };
1318
1319        assert!(!options.enable_0rtt);
1320        assert!(!options.require_encryption);
1321        assert_eq!(options.connect_timeout, Duration::from_secs(10));
1322        assert_eq!(options.keep_alive, Duration::from_secs(30));
1323        assert_eq!(options.max_message_size, 1024);
1324        Ok(())
1325    }
1326}