1pub mod ant_quic_adapter;
28
29use crate::validation::{Validate, ValidationContext, validate_message_size, validate_peer_id};
34use crate::{NetworkAddress, P2PError, PeerId, Result};
35use async_trait::async_trait;
36use serde::{Deserialize, Serialize};
37use std::collections::HashMap;
38use std::fmt;
39use std::sync::Arc;
40use std::time::{Duration, Instant};
41use tokio::sync::{Mutex, RwLock};
42use tracing::{debug, info, warn};
43
/// Identifies the kind of transport a connection runs over.
///
/// QUIC is the only variant defined here. The `Display` impl in this file
/// renders it as `"quic"`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub enum TransportType {
    /// QUIC transport.
    QUIC,
}
50
/// Strategy used by `TransportManager` to choose a transport for outgoing
/// connections.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize, Default)]
pub enum TransportSelection {
    /// Always use the QUIC transport. This is the default selection.
    #[default]
    QUIC,
}
58
/// Quality figures for a single connection, as returned by
/// `Connection::measure_quality`.
#[derive(Debug, Clone)]
pub struct ConnectionQuality {
    /// Latency of the connection.
    pub latency: Duration,
    /// Throughput; the field name indicates megabits per second.
    pub throughput_mbps: f64,
    /// Packet loss.
    /// NOTE(review): whether this is a fraction or a percentage is not
    /// established in this file — confirm with implementations.
    pub packet_loss: f64,
    /// Jitter of the connection.
    pub jitter: Duration,
    /// Presumably the time it took to establish the connection — confirm.
    pub connect_time: Duration,
}
73
/// Descriptive snapshot of a connection, as returned by `Connection::info`.
#[derive(Debug, Clone)]
pub struct ConnectionInfo {
    /// Transport the connection runs over.
    pub transport_type: TransportType,
    /// Local endpoint address.
    pub local_addr: NetworkAddress,
    /// Remote endpoint address.
    pub remote_addr: NetworkAddress,
    /// Whether the connection is encrypted.
    pub is_encrypted: bool,
    /// Name of the cipher suite in use (free-form string).
    pub cipher_suite: String,
    /// Whether 0-RTT was used for this connection.
    pub used_0rtt: bool,
    /// Instant recorded as the connection's establishment time.
    pub established_at: Instant,
    /// Instant recorded as the connection's most recent activity.
    pub last_activity: Instant,
}
94
/// Summary of one peer's connection pool, as produced by
/// `TransportManager::get_connection_pool_info`.
#[derive(Debug, Clone)]
pub struct ConnectionPoolInfo {
    /// Number of connections currently held in the pool.
    pub active_connections: usize,
    /// Number of entries in the pool's per-connection message statistics.
    pub total_connections: usize,
    /// Sum of the pool's per-connection byte counters.
    pub bytes_sent: u64,
    /// Bytes received. `TransportManager::get_connection_pool_info` currently
    /// always reports `0` here.
    pub bytes_received: u64,
}
107
/// Per-connection statistics of a connection pool.
///
/// All three maps are keyed by the connection id strings that
/// `ConnectionPool` generates (of the form `conn_<index>`).
#[derive(Debug, Clone)]
pub struct ConnectionPoolStats {
    /// Message counter per connection id.
    pub messages_per_connection: HashMap<String, usize>,
    /// Byte counter per connection id.
    pub bytes_per_connection: HashMap<String, u64>,
    /// Latency value per connection id.
    pub latency_per_connection: HashMap<String, Duration>,
}
118
/// A message as seen at the transport layer.
///
/// Validated through its `Validate` implementation (sender id, payload size
/// and protocol identifier length).
#[derive(Debug, Clone)]
pub struct TransportMessage {
    /// Peer that sent the message.
    pub sender: PeerId,
    /// Raw message payload.
    pub data: Vec<u8>,
    /// Protocol identifier; validation requires 1 to 64 bytes.
    pub protocol: String,
    /// Instant associated with reception of the message.
    pub received_at: Instant,
}
131
132impl Validate for TransportMessage {
133 fn validate(&self, ctx: &ValidationContext) -> Result<()> {
134 validate_peer_id(&self.sender)?;
136
137 validate_message_size(self.data.len(), ctx.max_message_size)?;
139
140 if self.protocol.is_empty() || self.protocol.len() > 64 {
142 return Err(P2PError::validation("Invalid protocol identifier"));
143 }
144
145 Ok(())
146 }
147}
148
/// Abstraction over a network transport that can listen for, accept and open
/// connections.
///
/// Implementations are stored as `Arc<dyn Transport>` by `TransportManager`.
#[allow(dead_code)]
#[async_trait]
pub trait Transport: Send + Sync {
    /// Starts listening on `addr`.
    ///
    /// Returns a `NetworkAddress` on success — presumably the address that was
    /// actually bound; confirm against implementations.
    async fn listen(&self, addr: NetworkAddress) -> Result<NetworkAddress>;

    /// Accepts an inbound connection.
    async fn accept(&self) -> Result<Box<dyn Connection>>;

    /// Opens an outbound connection to `addr`.
    async fn connect(&self, addr: NetworkAddress) -> Result<Box<dyn Connection>>;

    /// Opens an outbound connection to `addr` using the supplied `options`.
    async fn connect_with_options(
        &self,
        addr: NetworkAddress,
        options: TransportOptions,
    ) -> Result<Box<dyn Connection>>;

    /// Reports whether the transport supports IPv6.
    fn supports_ipv6(&self) -> bool;

    /// Returns the kind of this transport; `TransportManager` uses it as the
    /// registration key.
    fn transport_type(&self) -> TransportType;

    /// Reports whether the transport can handle `addr`.
    fn supports_address(&self, addr: &NetworkAddress) -> bool;
}
178
/// A single established connection produced by a `Transport`.
#[allow(dead_code)]
#[async_trait]
pub trait Connection: Send + Sync {
    /// Sends `data` over the connection.
    async fn send(&mut self, data: &[u8]) -> Result<()>;

    /// Receives data from the connection.
    async fn receive(&mut self) -> Result<Vec<u8>>;

    /// Returns descriptive information about the connection.
    async fn info(&self) -> ConnectionInfo;

    /// Closes the connection.
    async fn close(&mut self) -> Result<()>;

    /// Reports whether the connection is still usable.
    async fn is_alive(&self) -> bool;

    /// Measures and returns quality figures for the connection.
    async fn measure_quality(&self) -> Result<ConnectionQuality>;

    /// Local endpoint address of the connection.
    fn local_addr(&self) -> NetworkAddress;

    /// Remote endpoint address of the connection.
    fn remote_addr(&self) -> NetworkAddress;
}
207
/// Options handed to `Transport::connect_with_options`.
///
/// The `Default` impl in this file enables 0-RTT and encryption and uses a
/// 30 s connect timeout, a 60 s keep-alive and a 64 MiB message size limit.
#[derive(Debug, Clone)]
pub struct TransportOptions {
    /// Whether 0-RTT should be enabled.
    pub enable_0rtt: bool,
    /// Whether encryption is required.
    pub require_encryption: bool,
    /// Timeout for establishing a connection.
    pub connect_timeout: Duration,
    /// Keep-alive duration.
    pub keep_alive: Duration,
    /// Maximum message size in bytes.
    pub max_message_size: usize,
}
222
/// Owns the registered transports and the per-peer connection pools.
#[allow(dead_code)]
pub struct TransportManager {
    /// Registered transports, keyed by the type they report.
    transports: HashMap<TransportType, Arc<dyn Transport>>,
    /// One connection pool per peer id, shared behind async locks.
    connections: Arc<RwLock<HashMap<PeerId, Arc<Mutex<ConnectionPool>>>>>,
    /// Strategy consulted by `select_transport`.
    selection: TransportSelection,
    /// Options cloned into every `connect_with_options` call.
    options: TransportOptions,
}
235
/// Bounded set of connections to one peer, handed out in round-robin order.
struct ConnectionPool {
    /// Pooled connections; the position in this vector is what the
    /// `conn_<index>` statistics keys refer to.
    connections: Vec<Arc<Mutex<Box<dyn Connection>>>>,
    /// Initialised empty and not otherwise used in this file.
    _info_cache: HashMap<String, ConnectionInfo>,
    /// Per-connection statistics.
    stats: ConnectionPoolStats,
    /// Capacity; adding beyond it evicts the oldest (first) connection.
    max_connections: usize,
    /// Monotonic cursor used (modulo the pool size) to pick the next connection.
    round_robin_index: usize,
}
249
250impl TransportManager {
251 pub fn new(selection: TransportSelection, options: TransportOptions) -> Self {
253 Self {
254 transports: HashMap::new(),
255 connections: Arc::new(RwLock::new(HashMap::new())),
256 selection,
257 options,
258 }
259 }
260
261 pub fn register_transport(&mut self, transport: Arc<dyn Transport>) {
263 let transport_type = transport.transport_type();
264 self.transports.insert(transport_type, transport);
265 info!("Registered transport: {:?}", transport_type);
266 }
267
268 pub async fn connect(&self, addr: NetworkAddress) -> Result<PeerId> {
270 let transport_type = self.select_transport(&addr).await?;
271 let transport = self.transports.get(&transport_type).ok_or_else(|| {
272 P2PError::Transport(crate::error::TransportError::SetupFailed(
273 format!("Transport {transport_type:?} not available").into(),
274 ))
275 })?;
276
277 debug!("Connecting to {} using {:?}", addr, transport_type);
278
279 let connection = transport
280 .connect_with_options(addr.clone(), self.options.clone())
281 .await?;
282 let peer_id = format!("peer_from_{}_{}", addr.ip(), addr.port()); self.add_connection(peer_id.clone(), connection).await?;
286
287 info!("Connected to peer {} via {:?}", peer_id, transport_type);
288 Ok(peer_id)
289 }
290
291 pub async fn connect_with_transport(
293 &self,
294 addr: NetworkAddress,
295 transport_type: TransportType,
296 ) -> Result<PeerId> {
297 let transport = self.transports.get(&transport_type).ok_or_else(|| {
298 P2PError::Transport(crate::error::TransportError::SetupFailed(
299 format!("Transport {transport_type:?} not available").into(),
300 ))
301 })?;
302
303 let connection = transport
304 .connect_with_options(addr.clone(), self.options.clone())
305 .await?;
306 let peer_id = format!("peer_from_{}_{}", addr.ip(), addr.port());
307
308 self.add_connection(peer_id.clone(), connection).await?;
309 Ok(peer_id)
310 }
311
312 pub async fn send_message(&self, peer_id: &PeerId, data: Vec<u8>) -> Result<()> {
314 let connections = self.connections.read().await;
315 let pool = connections.get(peer_id).ok_or_else(|| {
316 P2PError::Network(crate::error::NetworkError::PeerNotFound(
317 peer_id.to_string().into(),
318 ))
319 })?;
320
321 let mut pool_guard = pool.lock().await;
322 let connection = pool_guard.get_connection()?;
323
324 let mut conn_guard = connection.lock().await;
325 conn_guard.send(&data).await?;
326
327 debug!("Sent {} bytes to peer {}", data.len(), peer_id);
328 Ok(())
329 }
330
331 pub async fn get_connection_info(&self, peer_id: &PeerId) -> Result<ConnectionInfo> {
333 let connections = self.connections.read().await;
334 let pool = connections.get(peer_id).ok_or_else(|| {
335 P2PError::Network(crate::error::NetworkError::PeerNotFound(
336 peer_id.to_string().into(),
337 ))
338 })?;
339
340 let mut pool_guard = pool.lock().await;
341 let connection = pool_guard.get_connection()?;
342 let conn_guard = connection.lock().await;
343
344 Ok(conn_guard.info().await)
345 }
346
347 pub async fn get_connection_pool_info(&self, peer_id: &PeerId) -> Result<ConnectionPoolInfo> {
349 let connections = self.connections.read().await;
350 let pool = connections.get(peer_id).ok_or_else(|| {
351 P2PError::Network(crate::error::NetworkError::PeerNotFound(
352 peer_id.to_string().into(),
353 ))
354 })?;
355
356 let pool_guard = pool.lock().await;
357 Ok(ConnectionPoolInfo {
358 active_connections: pool_guard.connections.len(),
359 total_connections: pool_guard.stats.messages_per_connection.len(),
360 bytes_sent: pool_guard.stats.bytes_per_connection.values().sum(),
361 bytes_received: 0, })
363 }
364
365 pub async fn get_connection_pool_stats(&self, peer_id: &PeerId) -> Result<ConnectionPoolStats> {
367 let connections = self.connections.read().await;
368 let pool = connections.get(peer_id).ok_or_else(|| {
369 P2PError::Network(crate::error::NetworkError::PeerNotFound(
370 peer_id.to_string().into(),
371 ))
372 })?;
373
374 let pool_guard = pool.lock().await;
375 Ok(pool_guard.stats.clone())
376 }
377
378 pub async fn measure_connection_quality(&self, peer_id: &PeerId) -> Result<ConnectionQuality> {
380 let connections = self.connections.read().await;
381 let pool = connections.get(peer_id).ok_or_else(|| {
382 P2PError::Network(crate::error::NetworkError::PeerNotFound(
383 peer_id.to_string().into(),
384 ))
385 })?;
386
387 let mut pool_guard = pool.lock().await;
388 let connection = pool_guard.get_connection()?;
389 let conn_guard = connection.lock().await;
390
391 conn_guard.measure_quality().await
392 }
393
394 pub async fn switch_transport(
396 &self,
397 peer_id: &PeerId,
398 _new_transport: TransportType,
399 ) -> Result<()> {
400 warn!(
405 "Transport switching not yet fully implemented for peer {}",
406 peer_id
407 );
408 Ok(())
409 }
410
411 async fn select_transport(&self, _addr: &NetworkAddress) -> Result<TransportType> {
413 match &self.selection {
414 TransportSelection::QUIC => {
415 if self.transports.contains_key(&TransportType::QUIC) {
416 Ok(TransportType::QUIC)
417 } else {
418 Err(P2PError::Transport(
419 crate::error::TransportError::SetupFailed(
420 "QUIC transport not available".into(),
421 ),
422 ))
423 }
424 }
425 }
426 }
427
428 #[allow(dead_code)]
430 async fn auto_select_transport(&self, addr: &NetworkAddress) -> Result<TransportType> {
431 if self.transports.contains_key(&TransportType::QUIC)
433 && let Some(transport) = self.transports.get(&TransportType::QUIC)
434 && transport.supports_address(addr)
435 {
436 debug!(
437 "Using QUIC transport for {} (only available transport)",
438 addr
439 );
440 return Ok(TransportType::QUIC);
441 }
442
443 Err(P2PError::Transport(
444 crate::error::TransportError::SetupFailed(
445 "QUIC transport not available or address not supported"
446 .to_string()
447 .into(),
448 ),
449 ))
450 }
451
452 async fn add_connection(&self, peer_id: PeerId, connection: Box<dyn Connection>) -> Result<()> {
454 let mut connections = self.connections.write().await;
455
456 let pool = connections.entry(peer_id.clone()).or_insert_with(|| {
457 Arc::new(Mutex::new(ConnectionPool::new(3))) });
459
460 let mut pool_guard = pool.lock().await;
461 pool_guard.add_connection(connection).await?;
462
463 Ok(())
464 }
465}
466
467impl ConnectionPool {
468 fn new(max_connections: usize) -> Self {
470 Self {
471 connections: Vec::new(),
472 _info_cache: HashMap::new(),
473 stats: ConnectionPoolStats {
474 messages_per_connection: HashMap::new(),
475 bytes_per_connection: HashMap::new(),
476 latency_per_connection: HashMap::new(),
477 },
478 max_connections,
479 round_robin_index: 0,
480 }
481 }
482
483 async fn add_connection(&mut self, connection: Box<dyn Connection>) -> Result<()> {
485 if self.connections.len() >= self.max_connections {
486 self.connections.remove(0);
488 }
489
490 let conn_id = format!("conn_{}", self.connections.len());
491 self.stats
492 .messages_per_connection
493 .insert(conn_id.clone(), 0);
494 self.stats.bytes_per_connection.insert(conn_id.clone(), 0);
495 self.stats
496 .latency_per_connection
497 .insert(conn_id, Duration::from_millis(0));
498
499 self.connections.push(Arc::new(Mutex::new(connection)));
500 Ok(())
501 }
502
503 fn get_connection(&mut self) -> Result<Arc<Mutex<Box<dyn Connection>>>> {
505 if self.connections.is_empty() {
506 return Err(P2PError::Network(
507 crate::error::NetworkError::ProtocolError(
508 "No connections available".to_string().into(),
509 ),
510 ));
511 }
512
513 let connection = self.connections[self.round_robin_index % self.connections.len()].clone();
514 self.round_robin_index += 1;
515
516 let conn_id = format!("conn_{}", self.round_robin_index % self.connections.len());
518 if let Some(count) = self.stats.messages_per_connection.get_mut(&conn_id) {
519 *count += 1;
520 }
521
522 Ok(connection)
523 }
524}
525
526impl fmt::Display for TransportType {
527 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
528 match self {
529 TransportType::QUIC => write!(f, "quic"),
530 }
531 }
532}
533
534impl Default for TransportOptions {
535 fn default() -> Self {
536 Self {
537 enable_0rtt: true,
538 require_encryption: true,
539 connect_timeout: Duration::from_secs(30),
540 keep_alive: Duration::from_secs(60),
541 max_message_size: 64 * 1024 * 1024, }
543 }
544}
545
546impl Default for ConnectionQuality {
547 fn default() -> Self {
548 Self {
549 latency: Duration::from_millis(50),
550 throughput_mbps: 100.0,
551 packet_loss: 0.0,
552 jitter: Duration::from_millis(5),
553 connect_time: Duration::from_millis(100),
554 }
555 }
556}
/// Re-exports `TransportType` under the `transport_types` path.
pub mod transport_types {
    pub use super::TransportType;
}
561
562#[cfg(test)]
567mod tests {
568 use super::*;
569 use crate::error::NetworkError;
570 use async_trait::async_trait;
571 use std::sync::atomic::{AtomicUsize, Ordering};
572 use tokio::time::Duration;
573
574 fn parse_addr(addr: &str) -> Result<NetworkAddress> {
576 addr.parse::<NetworkAddress>().map_err(|e| {
577 P2PError::Network(crate::error::NetworkError::InvalidAddress(
578 e.to_string().into(),
579 ))
580 })
581 }
582
    /// Test double for `Transport`.
    struct MockTransport {
        /// Type reported by `transport_type()`.
        transport_type: TransportType,
        /// When set, `listen`, `connect` and `accept` return an error.
        should_fail: bool,
        /// Cleared by `with_limited_support`.
        /// NOTE(review): not read by `supports_address` in this file.
        supports_all: bool,
    }
589
590 impl MockTransport {
591 fn new(transport_type: TransportType) -> Self {
592 Self {
593 transport_type,
594 should_fail: false,
595 supports_all: true,
596 }
597 }
598
599 fn with_failure(mut self) -> Self {
600 self.should_fail = true;
601 self
602 }
603
604 fn with_limited_support(mut self) -> Self {
605 self.supports_all = false;
606 self
607 }
608 }
609
610 #[async_trait]
611 impl Transport for MockTransport {
612 async fn listen(&self, addr: NetworkAddress) -> Result<NetworkAddress> {
613 if self.should_fail {
614 return Err(P2PError::Transport(
615 crate::error::TransportError::SetupFailed("Listen failed".to_string().into()),
616 ));
617 }
618 Ok(addr)
619 }
620
621 async fn connect(&self, addr: NetworkAddress) -> Result<Box<dyn Connection>> {
622 if self.should_fail {
623 return Err(P2PError::Transport(
624 crate::error::TransportError::SetupFailed(
625 "Connection failed".to_string().into(),
626 ),
627 ));
628 }
629 Ok(Box::new(MockConnection::new(addr)))
630 }
631
632 async fn connect_with_options(
633 &self,
634 addr: NetworkAddress,
635 _options: TransportOptions,
636 ) -> Result<Box<dyn Connection>> {
637 self.connect(addr).await
638 }
639
640 async fn accept(&self) -> Result<Box<dyn Connection>> {
641 if self.should_fail {
642 return Err(P2PError::Transport(
643 crate::error::TransportError::SetupFailed("Accept failed".into()),
644 ));
645 }
646 Ok(Box::new(MockConnection::new(
647 "127.0.0.1:9000".parse::<NetworkAddress>().map_err(|e| {
648 crate::error::TransportError::SetupFailed(
649 format!("Invalid mock address: {}", e).into(),
650 )
651 })?,
652 )))
653 }
654
655 fn supports_ipv6(&self) -> bool {
656 true }
658
659 fn transport_type(&self) -> TransportType {
660 self.transport_type
661 }
662
663 fn supports_address(&self, addr: &NetworkAddress) -> bool {
664 addr.is_ipv4() || addr.is_ipv6()
666 }
667 }
668
    /// Test double for `Connection` that records byte counts instead of doing
    /// any I/O.
    struct MockConnection {
        /// Address returned by `remote_addr()`.
        remote_addr: NetworkAddress,
        /// Cleared by `close()`; `send`/`receive` fail once it is false.
        is_alive: bool,
        /// Total number of bytes passed to `send`.
        bytes_sent: AtomicUsize,
        /// Total number of bytes returned from `receive`.
        bytes_received: AtomicUsize,
    }
676
677 impl MockConnection {
678 fn new(remote_addr: NetworkAddress) -> Self {
679 Self {
680 remote_addr,
681 is_alive: true,
682 bytes_sent: AtomicUsize::new(0),
683 bytes_received: AtomicUsize::new(0),
684 }
685 }
686 }
687
688 #[async_trait]
689 impl Connection for MockConnection {
690 async fn send(&mut self, data: &[u8]) -> Result<()> {
691 if !self.is_alive {
692 return Err(P2PError::Network(
693 crate::error::NetworkError::PeerDisconnected {
694 peer: "unknown".to_string(),
695 reason: "Connection closed".to_string(),
696 },
697 ));
698 }
699 self.bytes_sent.fetch_add(data.len(), Ordering::Relaxed);
700 Ok(())
701 }
702
703 async fn receive(&mut self) -> Result<Vec<u8>> {
704 if !self.is_alive {
705 return Err(P2PError::Network(
706 crate::error::NetworkError::PeerDisconnected {
707 peer: "unknown".to_string(),
708 reason: "Connection closed".to_string(),
709 },
710 ));
711 }
712 let data = b"mock_response".to_vec();
713 self.bytes_received.fetch_add(data.len(), Ordering::Relaxed);
714 Ok(data)
715 }
716
717 async fn info(&self) -> ConnectionInfo {
718 ConnectionInfo {
719 transport_type: TransportType::QUIC,
720 local_addr: "127.0.0.1:9000"
721 .parse::<NetworkAddress>()
722 .expect("Test address should be valid"),
723 remote_addr: self.remote_addr.clone(),
724 is_encrypted: true,
725 cipher_suite: "TLS_AES_256_GCM_SHA384".to_string(),
726 used_0rtt: false,
727 established_at: Instant::now(),
728 last_activity: Instant::now(),
729 }
730 }
731
732 async fn close(&mut self) -> Result<()> {
733 self.is_alive = false;
734 Ok(())
735 }
736
737 async fn is_alive(&self) -> bool {
738 self.is_alive
739 }
740
741 async fn measure_quality(&self) -> Result<ConnectionQuality> {
742 Ok(ConnectionQuality {
743 latency: Duration::from_millis(10),
744 throughput_mbps: 1000.0,
745 packet_loss: 0.1,
746 jitter: Duration::from_millis(2),
747 connect_time: Duration::from_millis(50),
748 })
749 }
750
751 fn local_addr(&self) -> NetworkAddress {
752 "127.0.0.1:9000"
753 .parse::<NetworkAddress>()
754 .expect("Test address should be valid")
755 }
756
757 fn remote_addr(&self) -> NetworkAddress {
758 self.remote_addr.clone()
759 }
760 }
761
762 fn create_test_transport_manager() -> TransportManager {
763 let options = TransportOptions::default();
764 TransportManager::new(TransportSelection::QUIC, options)
765 }
766
767 #[test]
768 fn test_transport_type_display() {
769 assert_eq!(format!("{}", TransportType::QUIC), "quic");
770 }
771
772 #[test]
773 fn test_transport_type_serialization() {
774 let quic_type = TransportType::QUIC;
775
776 assert_eq!(quic_type, TransportType::QUIC);
777 }
778
779 #[test]
780 fn test_transport_selection_variants() {
781 let quic_selection = TransportSelection::QUIC;
782
783 assert!(matches!(quic_selection, TransportSelection::QUIC));
784 }
785
786 #[test]
787 fn test_transport_selection_default() {
788 let default = TransportSelection::default();
789 assert!(matches!(default, TransportSelection::QUIC));
790 }
791
792 #[test]
793 fn test_transport_options_default() {
794 let options = TransportOptions::default();
795
796 assert!(options.enable_0rtt);
797 assert!(options.require_encryption);
798 assert_eq!(options.connect_timeout, Duration::from_secs(30));
799 assert_eq!(options.keep_alive, Duration::from_secs(60));
800 assert_eq!(options.max_message_size, 64 * 1024 * 1024);
801 }
802
803 #[test]
804 fn test_connection_quality_default() {
805 let quality = ConnectionQuality::default();
806
807 assert_eq!(quality.latency, Duration::from_millis(50));
808 assert_eq!(quality.throughput_mbps, 100.0);
809 assert_eq!(quality.packet_loss, 0.0);
810 assert_eq!(quality.jitter, Duration::from_millis(5));
811 assert_eq!(quality.connect_time, Duration::from_millis(100));
812 }
813
814 #[tokio::test]
815 async fn test_transport_manager_creation() {
816 let manager = create_test_transport_manager();
817 assert!(manager.transports.is_empty());
818 }
819
820 #[tokio::test]
821 async fn test_transport_registration() {
822 let mut manager = create_test_transport_manager();
823 let quic_transport = Arc::new(MockTransport::new(TransportType::QUIC));
824
825 manager.register_transport(quic_transport.clone());
826
827 assert_eq!(manager.transports.len(), 1);
828 assert!(manager.transports.contains_key(&TransportType::QUIC));
829 }
830
831 #[tokio::test]
832 async fn test_connection_establishment() -> Result<()> {
833 let mut manager = create_test_transport_manager();
834 let transport = Arc::new(MockTransport::new(TransportType::QUIC));
835 manager.register_transport(transport);
836
837 let peer_id = manager
838 .connect("127.0.0.1:9001".parse::<NetworkAddress>().map_err(|e| {
839 P2PError::Network(NetworkError::InvalidAddress(format!("{}", e).into()))
840 })?)
841 .await?;
842 assert_eq!(peer_id, "peer_from_127.0.0.1_9001");
843
844 let connections = manager.connections.read().await;
845 assert!(connections.contains_key(&peer_id));
846
847 Ok(())
848 }
849
850 #[tokio::test]
851 async fn test_connection_with_specific_transport() -> Result<()> {
852 let mut manager = create_test_transport_manager();
853 let transport = Arc::new(MockTransport::new(TransportType::QUIC));
854 manager.register_transport(transport);
855
856 let peer_id = manager
857 .connect_with_transport(
858 "127.0.0.1:9002".parse::<NetworkAddress>().map_err(|e| {
859 P2PError::Network(NetworkError::InvalidAddress(format!("{}", e).into()))
860 })?,
861 TransportType::QUIC,
862 )
863 .await?;
864
865 assert_eq!(peer_id, "peer_from_127.0.0.1_9002");
866 Ok(())
867 }
868
869 #[tokio::test]
870 async fn test_connection_failure_handling() -> Result<()> {
871 let mut manager = create_test_transport_manager();
872 let failing_transport = Arc::new(MockTransport::new(TransportType::QUIC).with_failure());
873 manager.register_transport(failing_transport);
874
875 let result = manager
876 .connect("127.0.0.1:9003".parse::<NetworkAddress>().map_err(|e| {
877 P2PError::Network(NetworkError::InvalidAddress(format!("{}", e).into()))
878 })?)
879 .await;
880 assert!(result.is_err());
881 assert!(
882 result
883 .unwrap_err()
884 .to_string()
885 .contains("Connection failed")
886 );
887 Ok(())
888 }
889
890 #[tokio::test]
891 async fn test_message_sending() -> Result<()> {
892 let mut manager = create_test_transport_manager();
893 let transport = Arc::new(MockTransport::new(TransportType::QUIC));
894 manager.register_transport(transport);
895
896 let peer_id = manager
897 .connect("127.0.0.1:9004".parse::<NetworkAddress>().map_err(|e| {
898 P2PError::Network(NetworkError::InvalidAddress(format!("{}", e).into()))
899 })?)
900 .await?;
901 let message = b"Hello, transport!".to_vec();
902
903 manager.send_message(&peer_id, message.clone()).await?;
904
905 let pool_info = manager.get_connection_pool_info(&peer_id).await?;
907 assert_eq!(pool_info.active_connections, 1);
908
909 Ok(())
910 }
911
912 #[tokio::test]
913 async fn test_message_sending_no_connection() {
914 let manager = create_test_transport_manager();
915 let result = manager
916 .send_message(&"nonexistent_peer".to_string(), vec![1, 2, 3])
917 .await;
918
919 assert!(result.is_err());
920 assert!(
921 result
922 .unwrap_err()
923 .to_string()
924 .contains("No connection to peer")
925 );
926 }
927
928 #[tokio::test]
929 async fn test_connection_info_retrieval() -> Result<()> {
930 let mut manager = create_test_transport_manager();
931 let transport = Arc::new(MockTransport::new(TransportType::QUIC));
932 manager.register_transport(transport);
933
934 let peer_id = manager
935 .connect("127.0.0.1:9005".parse::<NetworkAddress>().map_err(|e| {
936 P2PError::Network(NetworkError::InvalidAddress(format!("{}", e).into()))
937 })?)
938 .await?;
939 let info = manager.get_connection_info(&peer_id).await?;
940
941 assert_eq!(info.transport_type, TransportType::QUIC);
942 assert_eq!(
943 info.remote_addr,
944 "127.0.0.1:9005"
945 .parse::<NetworkAddress>()
946 .map_err(|e| P2PError::Network(NetworkError::InvalidAddress(
947 format!("{}", e).into()
948 )))?
949 );
950 assert!(info.is_encrypted);
951 assert_eq!(info.cipher_suite, "TLS_AES_256_GCM_SHA384");
952
953 Ok(())
954 }
955
956 #[tokio::test]
957 async fn test_connection_pool_info() -> Result<()> {
958 let mut manager = create_test_transport_manager();
959 let transport = Arc::new(MockTransport::new(TransportType::QUIC));
960 manager.register_transport(transport);
961
962 let peer_id = manager
963 .connect("127.0.0.1:9006".parse::<NetworkAddress>().map_err(|e| {
964 P2PError::Network(NetworkError::InvalidAddress(format!("{}", e).into()))
965 })?)
966 .await?;
967 let pool_info = manager.get_connection_pool_info(&peer_id).await?;
968
969 assert_eq!(pool_info.active_connections, 1);
970 assert_eq!(pool_info.total_connections, 1);
971 assert_eq!(pool_info.bytes_sent, 0);
972
973 Ok(())
974 }
975
976 #[tokio::test]
977 async fn test_connection_pool_stats() -> Result<()> {
978 let mut manager = create_test_transport_manager();
979 let transport = Arc::new(MockTransport::new(TransportType::QUIC));
980 manager.register_transport(transport);
981
982 let peer_id = manager
983 .connect("127.0.0.1:9007".parse::<NetworkAddress>().map_err(|e| {
984 P2PError::Network(NetworkError::InvalidAddress(format!("{}", e).into()))
985 })?)
986 .await?;
987 let stats = manager.get_connection_pool_stats(&peer_id).await?;
988
989 assert_eq!(stats.messages_per_connection.len(), 1);
990 assert_eq!(stats.bytes_per_connection.len(), 1);
991 assert_eq!(stats.latency_per_connection.len(), 1);
992
993 Ok(())
994 }
995
996 #[tokio::test]
997 async fn test_connection_quality_measurement() -> Result<()> {
998 let mut manager = create_test_transport_manager();
999 let transport = Arc::new(MockTransport::new(TransportType::QUIC));
1000 manager.register_transport(transport);
1001
1002 let peer_id = manager
1003 .connect("127.0.0.1:9008".parse::<NetworkAddress>().map_err(|e| {
1004 P2PError::Network(NetworkError::InvalidAddress(format!("{}", e).into()))
1005 })?)
1006 .await?;
1007 let quality = manager.measure_connection_quality(&peer_id).await?;
1008
1009 assert_eq!(quality.latency, Duration::from_millis(10));
1010 assert_eq!(quality.throughput_mbps, 1000.0);
1011 assert_eq!(quality.packet_loss, 0.1);
1012 assert_eq!(quality.jitter, Duration::from_millis(2));
1013
1014 Ok(())
1015 }
1016
1017 #[tokio::test]
1018 async fn test_transport_switching() -> Result<()> {
1019 let mut manager = create_test_transport_manager();
1020 let transport = Arc::new(MockTransport::new(TransportType::QUIC));
1021 manager.register_transport(transport);
1022
1023 let peer_id = manager
1024 .connect("127.0.0.1:9009".parse::<NetworkAddress>().map_err(|e| {
1025 P2PError::Network(NetworkError::InvalidAddress(format!("{}", e).into()))
1026 })?)
1027 .await?;
1028
1029 let result = manager
1031 .switch_transport(&peer_id, TransportType::QUIC)
1032 .await;
1033 assert!(result.is_ok());
1034
1035 Ok(())
1036 }
1037
1038 #[tokio::test]
1039 async fn test_auto_transport_selection_quic() -> Result<()> {
1040 let mut manager = create_test_transport_manager();
1041 let quic_transport = Arc::new(MockTransport::new(TransportType::QUIC));
1042
1043 manager.register_transport(quic_transport);
1044
1045 let addr = "127.0.0.1:9010".parse::<NetworkAddress>().map_err(|e| {
1046 P2PError::Network(NetworkError::InvalidAddress(format!("{}", e).into()))
1047 })?;
1048 let selected = manager.auto_select_transport(&addr).await?;
1049
1050 assert_eq!(selected, TransportType::QUIC);
1052
1053 Ok(())
1054 }
1055
1056 #[tokio::test]
1057 async fn test_transport_selection_no_quic() -> Result<()> {
1058 let manager = create_test_transport_manager();
1059 let addr = "127.0.0.1:9011".parse::<NetworkAddress>().map_err(|e| {
1062 P2PError::Network(NetworkError::InvalidAddress(format!("{}", e).into()))
1063 })?;
1064 let selected = manager.auto_select_transport(&addr).await;
1065
1066 assert!(selected.is_err());
1068
1069 Ok(())
1070 }
1071
1072 #[tokio::test]
1073 async fn test_transport_selection_no_suitable_transport() -> Result<()> {
1074 let manager = create_test_transport_manager();
1075 let addr = "127.0.0.1:9012".parse::<NetworkAddress>().map_err(|e| {
1076 P2PError::Network(NetworkError::InvalidAddress(format!("{}", e).into()))
1077 })?;
1078
1079 let result = manager.auto_select_transport(&addr).await;
1080 assert!(result.is_err());
1081 assert!(
1082 result
1083 .unwrap_err()
1084 .to_string()
1085 .contains("QUIC transport not available")
1086 );
1087 Ok(())
1088 }
1089
1090 #[tokio::test]
1091 async fn test_quic_transport_selection() -> Result<()> {
1092 let mut manager =
1093 TransportManager::new(TransportSelection::QUIC, TransportOptions::default());
1094 let quic_transport = Arc::new(MockTransport::new(TransportType::QUIC));
1095
1096 manager.register_transport(quic_transport);
1097
1098 let addr = "127.0.0.1:9013".parse::<NetworkAddress>().map_err(|e| {
1099 P2PError::Network(NetworkError::InvalidAddress(format!("{}", e).into()))
1100 })?;
1101 let selected = manager.select_transport(&addr).await?;
1102
1103 assert_eq!(selected, TransportType::QUIC);
1104
1105 Ok(())
1106 }
1107
1108 #[tokio::test]
1109 async fn test_quic_transport_unavailable() -> Result<()> {
1110 let manager = TransportManager::new(TransportSelection::QUIC, TransportOptions::default());
1111
1112 let addr = "127.0.0.1:9014".parse::<NetworkAddress>().map_err(|e| {
1113 P2PError::Network(NetworkError::InvalidAddress(format!("{}", e).into()))
1114 })?;
1115 let result = manager.select_transport(&addr).await;
1116
1117 assert!(result.is_err());
1118 assert!(
1119 result
1120 .unwrap_err()
1121 .to_string()
1122 .contains("QUIC transport not available")
1123 );
1124 Ok(())
1125 }
1126
1127 #[tokio::test]
1128 async fn test_quic_transport_with_registration() -> Result<()> {
1129 let mut manager =
1130 TransportManager::new(TransportSelection::QUIC, TransportOptions::default());
1131 let quic_transport = Arc::new(MockTransport::new(TransportType::QUIC));
1132
1133 manager.register_transport(quic_transport);
1134
1135 let addr = "127.0.0.1:9015".parse::<NetworkAddress>().map_err(|e| {
1136 P2PError::Network(NetworkError::InvalidAddress(format!("{}", e).into()))
1137 })?;
1138 let selected = manager.select_transport(&addr).await?;
1139
1140 assert_eq!(selected, TransportType::QUIC);
1142
1143 Ok(())
1144 }
1145
1146 #[tokio::test]
1147 async fn test_mock_connection_lifecycle() -> Result<()> {
1148 let mut conn =
1149 MockConnection::new("127.0.0.1:9016".parse::<NetworkAddress>().map_err(|e| {
1150 P2PError::Network(NetworkError::InvalidAddress(format!("{}", e).into()))
1151 })?);
1152
1153 assert!(conn.is_alive().await);
1154
1155 conn.send(b"test message").await?;
1157 assert_eq!(conn.bytes_sent.load(Ordering::Relaxed), 12);
1158
1159 let received = conn.receive().await?;
1161 assert_eq!(received, b"mock_response");
1162 assert_eq!(conn.bytes_received.load(Ordering::Relaxed), 13);
1163
1164 let info = conn.info().await;
1166 assert_eq!(info.transport_type, TransportType::QUIC);
1167 assert!(info.is_encrypted);
1168
1169 let quality = conn.measure_quality().await?;
1171 assert_eq!(quality.latency, Duration::from_millis(10));
1172
1173 conn.close().await?;
1175 assert!(!conn.is_alive().await);
1176
1177 let result = conn.send(b"test").await;
1179 assert!(result.is_err());
1180
1181 Ok(())
1182 }
1183
1184 #[tokio::test]
1185 async fn test_connection_pool_max_connections() -> Result<()> {
1186 let mut pool = ConnectionPool::new(2); let conn1 = Box::new(MockConnection::new(
1190 "127.0.0.1:9017".parse::<NetworkAddress>().map_err(|e| {
1191 P2PError::Network(NetworkError::InvalidAddress(format!("{}", e).into()))
1192 })?,
1193 ));
1194 pool.add_connection(conn1).await?;
1195 assert_eq!(pool.connections.len(), 1);
1196
1197 let conn2 = Box::new(MockConnection::new(
1199 "127.0.0.1:9018".parse::<NetworkAddress>().map_err(|e| {
1200 P2PError::Network(NetworkError::InvalidAddress(format!("{}", e).into()))
1201 })?,
1202 ));
1203 pool.add_connection(conn2).await?;
1204 assert_eq!(pool.connections.len(), 2);
1205
1206 let conn3 = Box::new(MockConnection::new(
1208 "127.0.0.1:9019".parse::<NetworkAddress>().map_err(|e| {
1209 P2PError::Network(NetworkError::InvalidAddress(format!("{}", e).into()))
1210 })?,
1211 ));
1212 pool.add_connection(conn3).await?;
1213 assert_eq!(pool.connections.len(), 2);
1214
1215 Ok(())
1216 }
1217
1218 #[tokio::test]
1219 async fn test_connection_pool_round_robin() -> Result<()> {
1220 let mut pool = ConnectionPool::new(3);
1221
1222 for i in 0..3 {
1224 let conn = Box::new(MockConnection::new(
1225 format!("127.0.0.1:{}", 9020 + i)
1226 .parse()
1227 .expect("Test address should be valid"),
1228 ));
1229 pool.add_connection(conn).await?;
1230 }
1231
1232 let conn1 = pool.get_connection()?;
1234 let conn2 = pool.get_connection()?;
1235 let conn3 = pool.get_connection()?;
1236 let conn4 = pool.get_connection()?; assert_ne!(Arc::as_ptr(&conn1), Arc::as_ptr(&conn2));
1240 assert_ne!(Arc::as_ptr(&conn2), Arc::as_ptr(&conn3));
1241 assert_eq!(Arc::as_ptr(&conn1), Arc::as_ptr(&conn4));
1243
1244 Ok(())
1245 }
1246
1247 #[tokio::test]
1248 async fn test_connection_pool_empty() {
1249 let mut pool = ConnectionPool::new(3);
1250 let result = pool.get_connection();
1251
1252 assert!(result.is_err());
1253 if let Err(e) = result {
1254 assert!(e.to_string().contains("No connections available"));
1255 }
1256 }
1257
1258 #[tokio::test]
1259 async fn test_transport_message_structure() {
1260 let message = TransportMessage {
1261 sender: "test_peer".to_string(),
1262 data: vec![1, 2, 3, 4],
1263 protocol: "/p2p/test/1.0.0".to_string(),
1264 received_at: Instant::now(),
1265 };
1266
1267 assert_eq!(message.sender, "test_peer");
1268 assert_eq!(message.data, vec![1, 2, 3, 4]);
1269 assert_eq!(message.protocol, "/p2p/test/1.0.0");
1270 }
1271
1272 #[tokio::test]
1273 async fn test_mock_transport_address_support() -> Result<()> {
1274 let transport = MockTransport::new(TransportType::QUIC);
1275
1276 let addr1 = "127.0.0.1:9000".parse::<NetworkAddress>().map_err(|e| {
1277 P2PError::Network(NetworkError::InvalidAddress(format!("{}", e).into()))
1278 })?;
1279 let addr2 = "[::1]:9000".parse::<NetworkAddress>().map_err(|e| {
1280 P2PError::Network(NetworkError::InvalidAddress(format!("{}", e).into()))
1281 })?;
1282
1283 assert!(transport.supports_address(&addr1)); assert!(transport.supports_address(&addr2)); assert!(transport.supports_address(&addr1)); let limited_transport = MockTransport::new(TransportType::QUIC).with_limited_support();
1288 assert!(limited_transport.supports_address(&addr1)); assert!(limited_transport.supports_address(&addr2)); Ok(())
1291 }
1292
1293 #[tokio::test]
1294 async fn test_mock_transport_supported_addresses() -> Result<()> {
1295 let transport = MockTransport::new(TransportType::QUIC);
1296 let supports_ipv6 = transport.supports_ipv6();
1297
1298 assert!(supports_ipv6);
1300
1301 let limited_transport = MockTransport::new(TransportType::QUIC).with_limited_support();
1302 let limited_supports_ipv6 = limited_transport.supports_ipv6();
1303
1304 assert!(limited_supports_ipv6);
1306 Ok(())
1307 }
1308
1309 #[tokio::test]
1310 async fn test_transport_options_configuration() -> Result<()> {
1311 let options = TransportOptions {
1312 enable_0rtt: false,
1313 require_encryption: false,
1314 connect_timeout: Duration::from_secs(10),
1315 keep_alive: Duration::from_secs(30),
1316 max_message_size: 1024,
1317 };
1318
1319 assert!(!options.enable_0rtt);
1320 assert!(!options.require_encryption);
1321 assert_eq!(options.connect_timeout, Duration::from_secs(10));
1322 assert_eq!(options.keep_alive, Duration::from_secs(30));
1323 assert_eq!(options.max_message_size, 1024);
1324 Ok(())
1325 }
1326}