1pub mod ant_quic_adapter;
28
29use crate::validation::{Validate, ValidationContext, validate_message_size, validate_peer_id};
34use crate::{NetworkAddress, P2PError, PeerId, Result};
35use async_trait::async_trait;
36use serde::{Deserialize, Serialize};
37use std::collections::HashMap;
38use std::fmt;
39use std::sync::Arc;
40use std::time::{Duration, Instant};
41use tokio::sync::{Mutex, RwLock};
42use tracing::{debug, info, warn};
43
44#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
46pub enum TransportType {
47 QUIC,
49}
50
51#[derive(Debug, Clone, PartialEq, Serialize, Deserialize, Default)]
53pub enum TransportSelection {
54 #[default]
56 QUIC,
57}
58
/// Snapshot of measured quality metrics for a single connection.
#[derive(Clone, Debug)]
pub struct ConnectionQuality {
    /// Measured latency of the connection.
    pub latency: Duration,
    /// Throughput, in megabits per second as the field name indicates.
    pub throughput_mbps: f64,
    /// Packet loss measurement.
    // NOTE(review): the scale (fraction vs. percentage) is not fixed by this
    // definition; confirm against the implementations that populate it.
    pub packet_loss: f64,
    /// Measured jitter of the connection.
    pub jitter: Duration,
    /// Time taken to establish the connection.
    pub connect_time: Duration,
}
73
74#[derive(Debug, Clone)]
76pub struct ConnectionInfo {
77 pub transport_type: TransportType,
79 pub local_addr: NetworkAddress,
81 pub remote_addr: NetworkAddress,
83 pub is_encrypted: bool,
85 pub cipher_suite: String,
87 pub used_0rtt: bool,
89 pub established_at: Instant,
91 pub last_activity: Instant,
93}
94
/// Aggregate counters describing one peer's connection pool.
#[derive(Clone, Debug)]
pub struct ConnectionPoolInfo {
    /// Number of connections currently held by the pool.
    pub active_connections: usize,
    /// Number of connections the pool is tracking statistics for.
    pub total_connections: usize,
    /// Total bytes sent across the pool.
    pub bytes_sent: u64,
    /// Total bytes received across the pool.
    pub bytes_received: u64,
}
107
/// Per-connection statistics for a pool, each map keyed by a connection
/// identifier string.
#[derive(Clone, Debug)]
pub struct ConnectionPoolStats {
    /// Message count per connection identifier.
    pub messages_per_connection: HashMap<String, usize>,
    /// Byte count per connection identifier.
    pub bytes_per_connection: HashMap<String, u64>,
    /// Latency per connection identifier.
    pub latency_per_connection: HashMap<String, Duration>,
}
118
119#[derive(Debug, Clone)]
121pub struct TransportMessage {
122 pub sender: PeerId,
124 pub data: Vec<u8>,
126 pub protocol: String,
128 pub received_at: Instant,
130}
131
132impl Validate for TransportMessage {
133 fn validate(&self, ctx: &ValidationContext) -> Result<()> {
134 validate_peer_id(&self.sender)?;
136
137 validate_message_size(self.data.len(), ctx.max_message_size)?;
139
140 if self.protocol.is_empty() || self.protocol.len() > 64 {
142 return Err(P2PError::validation("Invalid protocol identifier"));
143 }
144
145 Ok(())
146 }
147}
148
149#[allow(dead_code)] #[async_trait]
152pub trait Transport: Send + Sync {
153 async fn listen(&self, addr: NetworkAddress) -> Result<NetworkAddress>;
155
156 async fn accept(&self) -> Result<Box<dyn Connection>>;
158
159 async fn connect(&self, addr: NetworkAddress) -> Result<Box<dyn Connection>>;
161
162 async fn connect_with_options(
164 &self,
165 addr: NetworkAddress,
166 options: TransportOptions,
167 ) -> Result<Box<dyn Connection>>;
168
169 fn supports_ipv6(&self) -> bool;
171
172 fn transport_type(&self) -> TransportType;
174
175 fn supports_address(&self, addr: &NetworkAddress) -> bool;
177}
178
179#[allow(dead_code)] #[async_trait]
182pub trait Connection: Send + Sync {
183 async fn send(&mut self, data: &[u8]) -> Result<()>;
185
186 async fn receive(&mut self) -> Result<Vec<u8>>;
188
189 async fn info(&self) -> ConnectionInfo;
191
192 async fn close(&mut self) -> Result<()>;
194
195 async fn is_alive(&self) -> bool;
197
198 async fn measure_quality(&self) -> Result<ConnectionQuality>;
200
201 fn local_addr(&self) -> NetworkAddress;
203
204 fn remote_addr(&self) -> NetworkAddress;
206}
207
/// Tunable options passed to a transport when opening a connection.
#[derive(Clone, Debug)]
pub struct TransportOptions {
    /// Whether 0-RTT is enabled.
    pub enable_0rtt: bool,
    /// Whether encryption is required.
    pub require_encryption: bool,
    /// Timeout applied to connection attempts.
    pub connect_timeout: Duration,
    /// Keep-alive duration.
    pub keep_alive: Duration,
    /// Maximum message size, in bytes.
    pub max_message_size: usize,
}
222
223#[allow(dead_code)] pub struct TransportManager {
226 transports: HashMap<TransportType, Arc<dyn Transport>>,
228 connections: Arc<RwLock<HashMap<PeerId, Arc<Mutex<ConnectionPool>>>>>,
230 selection: TransportSelection,
232 options: TransportOptions,
234}
235
236struct ConnectionPool {
238 connections: Vec<Arc<Mutex<Box<dyn Connection>>>>,
240 _info_cache: HashMap<String, ConnectionInfo>,
242 stats: ConnectionPoolStats,
244 max_connections: usize,
246 round_robin_index: usize,
248}
249
250impl TransportManager {
251 pub fn new(selection: TransportSelection, options: TransportOptions) -> Self {
253 Self {
254 transports: HashMap::new(),
255 connections: Arc::new(RwLock::new(HashMap::new())),
256 selection,
257 options,
258 }
259 }
260
261 pub fn register_transport(&mut self, transport: Arc<dyn Transport>) {
263 let transport_type = transport.transport_type();
264 self.transports.insert(transport_type, transport);
265 info!("Registered transport: {:?}", transport_type);
266 }
267
268 pub async fn connect(&self, addr: NetworkAddress) -> Result<PeerId> {
270 let transport_type = self.select_transport(&addr).await?;
271 let transport = self.transports.get(&transport_type).ok_or_else(|| {
272 P2PError::Transport(crate::error::TransportError::SetupFailed(
273 format!("Transport {transport_type:?} not available").into(),
274 ))
275 })?;
276
277 debug!("Connecting to {} using {:?}", addr, transport_type);
278
279 let connection = transport
280 .connect_with_options(addr.clone(), self.options.clone())
281 .await?;
282 let peer_id = format!("peer_from_{}_{}", addr.ip(), addr.port()); self.add_connection(peer_id.clone(), connection).await?;
286
287 info!("Connected to peer {} via {:?}", peer_id, transport_type);
288 Ok(peer_id)
289 }
290
291 pub async fn connect_with_transport(
293 &self,
294 addr: NetworkAddress,
295 transport_type: TransportType,
296 ) -> Result<PeerId> {
297 let transport = self.transports.get(&transport_type).ok_or_else(|| {
298 P2PError::Transport(crate::error::TransportError::SetupFailed(
299 format!("Transport {transport_type:?} not available").into(),
300 ))
301 })?;
302
303 let connection = transport
304 .connect_with_options(addr.clone(), self.options.clone())
305 .await?;
306 let peer_id = format!("peer_from_{}_{}", addr.ip(), addr.port());
307
308 self.add_connection(peer_id.clone(), connection).await?;
309 Ok(peer_id)
310 }
311
312 pub async fn send_message(&self, peer_id: &PeerId, data: Vec<u8>) -> Result<()> {
314 let connections = self.connections.read().await;
315 let pool = connections.get(peer_id).ok_or_else(|| {
316 P2PError::Network(crate::error::NetworkError::PeerNotFound(
317 peer_id.to_string().into(),
318 ))
319 })?;
320
321 let mut pool_guard = pool.lock().await;
322 let connection = pool_guard.get_connection()?;
323
324 let mut conn_guard = connection.lock().await;
325 conn_guard.send(&data).await?;
326
327 debug!("Sent {} bytes to peer {}", data.len(), peer_id);
328 Ok(())
329 }
330
331 pub async fn get_connection_info(&self, peer_id: &PeerId) -> Result<ConnectionInfo> {
333 let connections = self.connections.read().await;
334 let pool = connections.get(peer_id).ok_or_else(|| {
335 P2PError::Network(crate::error::NetworkError::PeerNotFound(
336 peer_id.to_string().into(),
337 ))
338 })?;
339
340 let mut pool_guard = pool.lock().await;
341 let connection = pool_guard.get_connection()?;
342 let conn_guard = connection.lock().await;
343
344 Ok(conn_guard.info().await)
345 }
346
347 pub async fn get_connection_pool_info(&self, peer_id: &PeerId) -> Result<ConnectionPoolInfo> {
349 let connections = self.connections.read().await;
350 let pool = connections.get(peer_id).ok_or_else(|| {
351 P2PError::Network(crate::error::NetworkError::PeerNotFound(
352 peer_id.to_string().into(),
353 ))
354 })?;
355
356 let pool_guard = pool.lock().await;
357 Ok(ConnectionPoolInfo {
358 active_connections: pool_guard.connections.len(),
359 total_connections: pool_guard.stats.messages_per_connection.len(),
360 bytes_sent: pool_guard.stats.bytes_per_connection.values().sum(),
361 bytes_received: 0, })
363 }
364
365 pub async fn get_connection_pool_stats(&self, peer_id: &PeerId) -> Result<ConnectionPoolStats> {
367 let connections = self.connections.read().await;
368 let pool = connections.get(peer_id).ok_or_else(|| {
369 P2PError::Network(crate::error::NetworkError::PeerNotFound(
370 peer_id.to_string().into(),
371 ))
372 })?;
373
374 let pool_guard = pool.lock().await;
375 Ok(pool_guard.stats.clone())
376 }
377
378 pub async fn measure_connection_quality(&self, peer_id: &PeerId) -> Result<ConnectionQuality> {
380 let connections = self.connections.read().await;
381 let pool = connections.get(peer_id).ok_or_else(|| {
382 P2PError::Network(crate::error::NetworkError::PeerNotFound(
383 peer_id.to_string().into(),
384 ))
385 })?;
386
387 let mut pool_guard = pool.lock().await;
388 let connection = pool_guard.get_connection()?;
389 let conn_guard = connection.lock().await;
390
391 conn_guard.measure_quality().await
392 }
393
394 pub async fn switch_transport(
396 &self,
397 peer_id: &PeerId,
398 _new_transport: TransportType,
399 ) -> Result<()> {
400 warn!(
405 "Transport switching not yet fully implemented for peer {}",
406 peer_id
407 );
408 Ok(())
409 }
410
411 async fn select_transport(&self, _addr: &NetworkAddress) -> Result<TransportType> {
413 match &self.selection {
414 TransportSelection::QUIC => {
415 if self.transports.contains_key(&TransportType::QUIC) {
416 Ok(TransportType::QUIC)
417 } else {
418 Err(P2PError::Transport(
419 crate::error::TransportError::SetupFailed(
420 "QUIC transport not available".into(),
421 ),
422 ))
423 }
424 }
425 }
426 }
427
428 #[allow(dead_code)]
430 async fn auto_select_transport(&self, addr: &NetworkAddress) -> Result<TransportType> {
431 if self.transports.contains_key(&TransportType::QUIC)
433 && let Some(transport) = self.transports.get(&TransportType::QUIC)
434 && transport.supports_address(addr)
435 {
436 debug!(
437 "Using QUIC transport for {} (only available transport)",
438 addr
439 );
440 return Ok(TransportType::QUIC);
441 }
442
443 Err(P2PError::Transport(
444 crate::error::TransportError::SetupFailed(
445 "QUIC transport not available or address not supported"
446 .to_string()
447 .into(),
448 ),
449 ))
450 }
451
452 async fn add_connection(&self, peer_id: PeerId, connection: Box<dyn Connection>) -> Result<()> {
454 let mut connections = self.connections.write().await;
455
456 let pool = connections.entry(peer_id.clone()).or_insert_with(|| {
457 Arc::new(Mutex::new(ConnectionPool::new(3))) });
459
460 let mut pool_guard = pool.lock().await;
461 pool_guard.add_connection(connection).await?;
462
463 Ok(())
464 }
465}
466
467impl ConnectionPool {
468 fn new(max_connections: usize) -> Self {
470 Self {
471 connections: Vec::new(),
472 _info_cache: HashMap::new(),
473 stats: ConnectionPoolStats {
474 messages_per_connection: HashMap::new(),
475 bytes_per_connection: HashMap::new(),
476 latency_per_connection: HashMap::new(),
477 },
478 max_connections,
479 round_robin_index: 0,
480 }
481 }
482
483 async fn add_connection(&mut self, connection: Box<dyn Connection>) -> Result<()> {
485 if self.connections.len() >= self.max_connections {
486 self.connections.remove(0);
488 }
489
490 let conn_id = format!("conn_{}", self.connections.len());
491 self.stats
492 .messages_per_connection
493 .insert(conn_id.clone(), 0);
494 self.stats.bytes_per_connection.insert(conn_id.clone(), 0);
495 self.stats
496 .latency_per_connection
497 .insert(conn_id, Duration::from_millis(0));
498
499 self.connections.push(Arc::new(Mutex::new(connection)));
500 Ok(())
501 }
502
503 fn get_connection(&mut self) -> Result<Arc<Mutex<Box<dyn Connection>>>> {
505 if self.connections.is_empty() {
506 return Err(P2PError::Network(
507 crate::error::NetworkError::ProtocolError(
508 "No connections available".to_string().into(),
509 ),
510 ));
511 }
512
513 let connection = self.connections[self.round_robin_index % self.connections.len()].clone();
514 self.round_robin_index += 1;
515
516 let conn_id = format!("conn_{}", self.round_robin_index % self.connections.len());
518 if let Some(count) = self.stats.messages_per_connection.get_mut(&conn_id) {
519 *count += 1;
520 }
521
522 Ok(connection)
523 }
524}
525
526impl fmt::Display for TransportType {
527 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
528 match self {
529 TransportType::QUIC => write!(f, "quic"),
530 }
531 }
532}
533
534impl Default for TransportOptions {
535 fn default() -> Self {
536 Self {
537 enable_0rtt: true,
538 require_encryption: true,
539 connect_timeout: Duration::from_secs(30),
540 keep_alive: Duration::from_secs(60),
541 max_message_size: 64 * 1024 * 1024, }
543 }
544}
545
546impl Default for ConnectionQuality {
547 fn default() -> Self {
548 Self {
549 latency: Duration::from_millis(50),
550 throughput_mbps: 100.0,
551 packet_loss: 0.0,
552 jitter: Duration::from_millis(5),
553 connect_time: Duration::from_millis(100),
554 }
555 }
556}
557pub mod transport_types {
559 pub use super::TransportType;
560}
561
#[cfg(test)]
mod tests {
    use super::*;
    use crate::error::NetworkError;
    use async_trait::async_trait;
    use std::sync::atomic::{AtomicUsize, Ordering};
    use tokio::time::Duration;

    /// Parses an address literal, mapping a parse failure to
    /// `NetworkError::InvalidAddress`.
    fn parse_addr(addr: &str) -> Result<NetworkAddress> {
        addr.parse::<NetworkAddress>()
            .map_err(|e| P2PError::Network(NetworkError::InvalidAddress(e.to_string().into())))
    }

    /// Transport double whose operations either succeed trivially or fail
    /// on demand.
    struct MockTransport {
        transport_type: TransportType,
        should_fail: bool,
        supports_all: bool,
    }

    impl MockTransport {
        fn new(transport_type: TransportType) -> Self {
            Self {
                transport_type,
                should_fail: false,
                supports_all: true,
            }
        }

        /// Makes `listen`, `connect` and `accept` return errors.
        fn with_failure(mut self) -> Self {
            self.should_fail = true;
            self
        }

        /// Clears the `supports_all` flag. The mock's address checks do not
        /// consult the flag, so this does not change what it reports.
        fn with_limited_support(mut self) -> Self {
            self.supports_all = false;
            self
        }
    }

    #[async_trait]
    impl Transport for MockTransport {
        async fn listen(&self, addr: NetworkAddress) -> Result<NetworkAddress> {
            if self.should_fail {
                return Err(P2PError::Transport(
                    crate::error::TransportError::SetupFailed("Listen failed".to_string().into()),
                ));
            }
            Ok(addr)
        }

        async fn connect(&self, addr: NetworkAddress) -> Result<Box<dyn Connection>> {
            if self.should_fail {
                return Err(P2PError::Transport(
                    crate::error::TransportError::SetupFailed(
                        "Connection failed".to_string().into(),
                    ),
                ));
            }
            Ok(Box::new(MockConnection::new(addr)))
        }

        async fn connect_with_options(
            &self,
            addr: NetworkAddress,
            _options: TransportOptions,
        ) -> Result<Box<dyn Connection>> {
            self.connect(addr).await
        }

        async fn accept(&self) -> Result<Box<dyn Connection>> {
            if self.should_fail {
                return Err(P2PError::Transport(
                    crate::error::TransportError::SetupFailed("Accept failed".into()),
                ));
            }
            Ok(Box::new(MockConnection::new(
                "127.0.0.1:9000".parse::<NetworkAddress>().map_err(|e| {
                    crate::error::TransportError::SetupFailed(
                        format!("Invalid mock address: {}", e).into(),
                    )
                })?,
            )))
        }

        fn supports_ipv6(&self) -> bool {
            true
        }

        fn transport_type(&self) -> TransportType {
            self.transport_type
        }

        fn supports_address(&self, addr: &NetworkAddress) -> bool {
            addr.is_ipv4() || addr.is_ipv6()
        }
    }

    /// Connection double that counts bytes and can be closed.
    struct MockConnection {
        remote_addr: NetworkAddress,
        is_alive: bool,
        bytes_sent: AtomicUsize,
        bytes_received: AtomicUsize,
    }

    impl MockConnection {
        fn new(remote_addr: NetworkAddress) -> Self {
            Self {
                remote_addr,
                is_alive: true,
                bytes_sent: AtomicUsize::new(0),
                bytes_received: AtomicUsize::new(0),
            }
        }
    }

    #[async_trait]
    impl Connection for MockConnection {
        async fn send(&mut self, data: &[u8]) -> Result<()> {
            if !self.is_alive {
                return Err(P2PError::Network(
                    crate::error::NetworkError::PeerDisconnected {
                        peer: "unknown".to_string(),
                        reason: "Connection closed".to_string(),
                    },
                ));
            }
            self.bytes_sent.fetch_add(data.len(), Ordering::Relaxed);
            Ok(())
        }

        async fn receive(&mut self) -> Result<Vec<u8>> {
            if !self.is_alive {
                return Err(P2PError::Network(
                    crate::error::NetworkError::PeerDisconnected {
                        peer: "unknown".to_string(),
                        reason: "Connection closed".to_string(),
                    },
                ));
            }
            let data = b"mock_response".to_vec();
            self.bytes_received.fetch_add(data.len(), Ordering::Relaxed);
            Ok(data)
        }

        async fn info(&self) -> ConnectionInfo {
            ConnectionInfo {
                transport_type: TransportType::QUIC,
                local_addr: "127.0.0.1:9000"
                    .parse::<NetworkAddress>()
                    .expect("Test address should be valid"),
                remote_addr: self.remote_addr.clone(),
                is_encrypted: true,
                cipher_suite: "TLS_AES_256_GCM_SHA384".to_string(),
                used_0rtt: false,
                established_at: Instant::now(),
                last_activity: Instant::now(),
            }
        }

        async fn close(&mut self) -> Result<()> {
            self.is_alive = false;
            Ok(())
        }

        async fn is_alive(&self) -> bool {
            self.is_alive
        }

        async fn measure_quality(&self) -> Result<ConnectionQuality> {
            Ok(ConnectionQuality {
                latency: Duration::from_millis(10),
                throughput_mbps: 1000.0,
                packet_loss: 0.1,
                jitter: Duration::from_millis(2),
                connect_time: Duration::from_millis(50),
            })
        }

        fn local_addr(&self) -> NetworkAddress {
            "127.0.0.1:9000"
                .parse::<NetworkAddress>()
                .expect("Test address should be valid")
        }

        fn remote_addr(&self) -> NetworkAddress {
            self.remote_addr.clone()
        }
    }

    /// Manager with the QUIC selection policy and no transports registered.
    fn create_test_transport_manager() -> TransportManager {
        let options = TransportOptions::default();
        TransportManager::new(TransportSelection::QUIC, options)
    }

    /// Manager with a working mock QUIC transport already registered.
    fn create_quic_manager() -> TransportManager {
        let mut manager = create_test_transport_manager();
        manager.register_transport(Arc::new(MockTransport::new(TransportType::QUIC)));
        manager
    }

    #[test]
    fn test_transport_type_display() {
        assert_eq!(format!("{}", TransportType::QUIC), "quic");
    }

    #[test]
    fn test_transport_type_serialization() {
        let quic_type = TransportType::QUIC;

        assert_eq!(quic_type, TransportType::QUIC);
    }

    #[test]
    fn test_transport_selection_variants() {
        let quic_selection = TransportSelection::QUIC;

        assert!(matches!(quic_selection, TransportSelection::QUIC));
    }

    #[test]
    fn test_transport_selection_default() {
        let default = TransportSelection::default();
        assert!(matches!(default, TransportSelection::QUIC));
    }

    #[test]
    fn test_transport_options_default() {
        let options = TransportOptions::default();

        assert!(options.enable_0rtt);
        assert!(options.require_encryption);
        assert_eq!(options.connect_timeout, Duration::from_secs(30));
        assert_eq!(options.keep_alive, Duration::from_secs(60));
        assert_eq!(options.max_message_size, 64 * 1024 * 1024);
    }

    #[test]
    fn test_connection_quality_default() {
        let quality = ConnectionQuality::default();

        assert_eq!(quality.latency, Duration::from_millis(50));
        assert_eq!(quality.throughput_mbps, 100.0);
        assert_eq!(quality.packet_loss, 0.0);
        assert_eq!(quality.jitter, Duration::from_millis(5));
        assert_eq!(quality.connect_time, Duration::from_millis(100));
    }

    #[tokio::test]
    async fn test_transport_manager_creation() {
        let manager = create_test_transport_manager();
        assert!(manager.transports.is_empty());
    }

    #[tokio::test]
    async fn test_transport_registration() {
        let mut manager = create_test_transport_manager();
        let quic_transport = Arc::new(MockTransport::new(TransportType::QUIC));

        manager.register_transport(quic_transport);

        assert_eq!(manager.transports.len(), 1);
        assert!(manager.transports.contains_key(&TransportType::QUIC));
    }

    #[tokio::test]
    async fn test_connection_establishment() -> Result<()> {
        let manager = create_quic_manager();

        let peer_id = manager.connect(parse_addr("127.0.0.1:9001")?).await?;
        assert_eq!(peer_id, "peer_from_127.0.0.1_9001");

        let connections = manager.connections.read().await;
        assert!(connections.contains_key(&peer_id));

        Ok(())
    }

    #[tokio::test]
    async fn test_connection_with_specific_transport() -> Result<()> {
        let manager = create_quic_manager();

        let peer_id = manager
            .connect_with_transport(parse_addr("127.0.0.1:9002")?, TransportType::QUIC)
            .await?;

        assert_eq!(peer_id, "peer_from_127.0.0.1_9002");
        Ok(())
    }

    #[tokio::test]
    async fn test_connection_failure_handling() -> Result<()> {
        let mut manager = create_test_transport_manager();
        let failing_transport = Arc::new(MockTransport::new(TransportType::QUIC).with_failure());
        manager.register_transport(failing_transport);

        let result = manager.connect(parse_addr("127.0.0.1:9003")?).await;
        assert!(result.is_err());
        assert!(
            result
                .unwrap_err()
                .to_string()
                .contains("Connection failed")
        );
        Ok(())
    }

    #[tokio::test]
    async fn test_message_sending() -> Result<()> {
        let manager = create_quic_manager();

        let peer_id = manager.connect(parse_addr("127.0.0.1:9004")?).await?;
        let message = b"Hello, transport!".to_vec();

        manager.send_message(&peer_id, message.clone()).await?;

        let pool_info = manager.get_connection_pool_info(&peer_id).await?;
        assert_eq!(pool_info.active_connections, 1);

        Ok(())
    }

    #[tokio::test]
    async fn test_message_sending_no_connection() {
        let manager = create_test_transport_manager();
        let result = manager
            .send_message(&"nonexistent_peer".to_string(), vec![1, 2, 3])
            .await;

        assert!(result.is_err());
        assert!(result.unwrap_err().to_string().contains("Peer not found"));
    }

    #[tokio::test]
    async fn test_connection_info_retrieval() -> Result<()> {
        let manager = create_quic_manager();

        let peer_id = manager.connect(parse_addr("127.0.0.1:9005")?).await?;
        let info = manager.get_connection_info(&peer_id).await?;

        assert_eq!(info.transport_type, TransportType::QUIC);
        assert_eq!(info.remote_addr, parse_addr("127.0.0.1:9005")?);
        assert!(info.is_encrypted);
        assert_eq!(info.cipher_suite, "TLS_AES_256_GCM_SHA384");

        Ok(())
    }

    #[tokio::test]
    async fn test_connection_pool_info() -> Result<()> {
        let manager = create_quic_manager();

        let peer_id = manager.connect(parse_addr("127.0.0.1:9006")?).await?;
        let pool_info = manager.get_connection_pool_info(&peer_id).await?;

        assert_eq!(pool_info.active_connections, 1);
        assert_eq!(pool_info.total_connections, 1);
        assert_eq!(pool_info.bytes_sent, 0);

        Ok(())
    }

    #[tokio::test]
    async fn test_connection_pool_stats() -> Result<()> {
        let manager = create_quic_manager();

        let peer_id = manager.connect(parse_addr("127.0.0.1:9007")?).await?;
        let stats = manager.get_connection_pool_stats(&peer_id).await?;

        assert_eq!(stats.messages_per_connection.len(), 1);
        assert_eq!(stats.bytes_per_connection.len(), 1);
        assert_eq!(stats.latency_per_connection.len(), 1);

        Ok(())
    }

    #[tokio::test]
    async fn test_connection_quality_measurement() -> Result<()> {
        let manager = create_quic_manager();

        let peer_id = manager.connect(parse_addr("127.0.0.1:9008")?).await?;
        let quality = manager.measure_connection_quality(&peer_id).await?;

        assert_eq!(quality.latency, Duration::from_millis(10));
        assert_eq!(quality.throughput_mbps, 1000.0);
        assert_eq!(quality.packet_loss, 0.1);
        assert_eq!(quality.jitter, Duration::from_millis(2));

        Ok(())
    }

    #[tokio::test]
    async fn test_transport_switching() -> Result<()> {
        let manager = create_quic_manager();

        let peer_id = manager.connect(parse_addr("127.0.0.1:9009")?).await?;

        let result = manager
            .switch_transport(&peer_id, TransportType::QUIC)
            .await;
        assert!(result.is_ok());

        Ok(())
    }

    #[tokio::test]
    async fn test_auto_transport_selection_quic() -> Result<()> {
        let manager = create_quic_manager();

        let addr = parse_addr("127.0.0.1:9010")?;
        let selected = manager.auto_select_transport(&addr).await?;

        assert_eq!(selected, TransportType::QUIC);

        Ok(())
    }

    #[tokio::test]
    async fn test_transport_selection_no_quic() -> Result<()> {
        let manager = create_test_transport_manager();
        let addr = parse_addr("127.0.0.1:9011")?;
        let selected = manager.auto_select_transport(&addr).await;

        assert!(selected.is_err());

        Ok(())
    }

    #[tokio::test]
    async fn test_transport_selection_no_suitable_transport() -> Result<()> {
        let manager = create_test_transport_manager();
        let addr = parse_addr("127.0.0.1:9012")?;

        let result = manager.auto_select_transport(&addr).await;
        assert!(result.is_err());
        assert!(
            result
                .unwrap_err()
                .to_string()
                .contains("QUIC transport not available")
        );
        Ok(())
    }

    #[tokio::test]
    async fn test_quic_transport_selection() -> Result<()> {
        let mut manager =
            TransportManager::new(TransportSelection::QUIC, TransportOptions::default());
        let quic_transport = Arc::new(MockTransport::new(TransportType::QUIC));

        manager.register_transport(quic_transport);

        let addr = parse_addr("127.0.0.1:9013")?;
        let selected = manager.select_transport(&addr).await?;

        assert_eq!(selected, TransportType::QUIC);

        Ok(())
    }

    #[tokio::test]
    async fn test_quic_transport_unavailable() -> Result<()> {
        let manager = TransportManager::new(TransportSelection::QUIC, TransportOptions::default());

        let addr = parse_addr("127.0.0.1:9014")?;
        let result = manager.select_transport(&addr).await;

        assert!(result.is_err());
        assert!(
            result
                .unwrap_err()
                .to_string()
                .contains("QUIC transport not available")
        );
        Ok(())
    }

    #[tokio::test]
    async fn test_quic_transport_with_registration() -> Result<()> {
        let mut manager =
            TransportManager::new(TransportSelection::QUIC, TransportOptions::default());
        let quic_transport = Arc::new(MockTransport::new(TransportType::QUIC));

        manager.register_transport(quic_transport);

        let addr = parse_addr("127.0.0.1:9015")?;
        let selected = manager.select_transport(&addr).await?;

        assert_eq!(selected, TransportType::QUIC);

        Ok(())
    }

    #[tokio::test]
    async fn test_mock_connection_lifecycle() -> Result<()> {
        let mut conn = MockConnection::new(parse_addr("127.0.0.1:9016")?);

        assert!(conn.is_alive().await);

        conn.send(b"test message").await?;
        assert_eq!(conn.bytes_sent.load(Ordering::Relaxed), 12);

        let received = conn.receive().await?;
        assert_eq!(received, b"mock_response");
        assert_eq!(conn.bytes_received.load(Ordering::Relaxed), 13);

        let info = conn.info().await;
        assert_eq!(info.transport_type, TransportType::QUIC);
        assert!(info.is_encrypted);

        let quality = conn.measure_quality().await?;
        assert_eq!(quality.latency, Duration::from_millis(10));

        conn.close().await?;
        assert!(!conn.is_alive().await);

        // Sending on a closed connection must fail.
        let result = conn.send(b"test").await;
        assert!(result.is_err());

        Ok(())
    }

    #[tokio::test]
    async fn test_connection_pool_max_connections() -> Result<()> {
        let mut pool = ConnectionPool::new(2);

        let conn1 = Box::new(MockConnection::new(parse_addr("127.0.0.1:9017")?));
        pool.add_connection(conn1).await?;
        assert_eq!(pool.connections.len(), 1);

        let conn2 = Box::new(MockConnection::new(parse_addr("127.0.0.1:9018")?));
        pool.add_connection(conn2).await?;
        assert_eq!(pool.connections.len(), 2);

        // A third connection must not grow the pool past its capacity.
        let conn3 = Box::new(MockConnection::new(parse_addr("127.0.0.1:9019")?));
        pool.add_connection(conn3).await?;
        assert_eq!(pool.connections.len(), 2);

        Ok(())
    }

    #[tokio::test]
    async fn test_connection_pool_round_robin() -> Result<()> {
        let mut pool = ConnectionPool::new(3);

        for i in 0..3 {
            let addr = parse_addr(&format!("127.0.0.1:{}", 9020 + i))?;
            pool.add_connection(Box::new(MockConnection::new(addr)))
                .await?;
        }

        let conn1 = pool.get_connection()?;
        let conn2 = pool.get_connection()?;
        let conn3 = pool.get_connection()?;
        // The fourth pick wraps around to the first connection.
        let conn4 = pool.get_connection()?;

        assert_ne!(Arc::as_ptr(&conn1), Arc::as_ptr(&conn2));
        assert_ne!(Arc::as_ptr(&conn2), Arc::as_ptr(&conn3));
        assert_eq!(Arc::as_ptr(&conn1), Arc::as_ptr(&conn4));

        Ok(())
    }

    #[tokio::test]
    async fn test_connection_pool_empty() {
        let mut pool = ConnectionPool::new(3);
        let result = pool.get_connection();

        assert!(result.is_err());
        if let Err(e) = result {
            assert!(e.to_string().contains("No connections available"));
        }
    }

    #[tokio::test]
    async fn test_transport_message_structure() {
        let message = TransportMessage {
            sender: "test_peer".to_string(),
            data: vec![1, 2, 3, 4],
            protocol: "/p2p/test/1.0.0".to_string(),
            received_at: Instant::now(),
        };

        assert_eq!(message.sender, "test_peer");
        assert_eq!(message.data, vec![1, 2, 3, 4]);
        assert_eq!(message.protocol, "/p2p/test/1.0.0");
    }

    #[tokio::test]
    async fn test_mock_transport_address_support() -> Result<()> {
        let transport = MockTransport::new(TransportType::QUIC);

        let addr1 = parse_addr("127.0.0.1:9000")?;
        let addr2 = parse_addr("[::1]:9000")?;

        assert!(transport.supports_address(&addr1));
        assert!(transport.supports_address(&addr2));

        // The mock ignores the limited-support flag for address checks.
        let limited_transport = MockTransport::new(TransportType::QUIC).with_limited_support();
        assert!(limited_transport.supports_address(&addr1));
        assert!(limited_transport.supports_address(&addr2));

        Ok(())
    }

    #[tokio::test]
    async fn test_mock_transport_supported_addresses() -> Result<()> {
        let transport = MockTransport::new(TransportType::QUIC);
        let supports_ipv6 = transport.supports_ipv6();

        assert!(supports_ipv6);

        let limited_transport = MockTransport::new(TransportType::QUIC).with_limited_support();
        let limited_supports_ipv6 = limited_transport.supports_ipv6();

        assert!(limited_supports_ipv6);
        Ok(())
    }

    #[tokio::test]
    async fn test_transport_options_configuration() -> Result<()> {
        let options = TransportOptions {
            enable_0rtt: false,
            require_encryption: false,
            connect_timeout: Duration::from_secs(10),
            keep_alive: Duration::from_secs(30),
            max_message_size: 1024,
        };

        assert!(!options.enable_0rtt);
        assert!(!options.require_encryption);
        assert_eq!(options.connect_timeout, Duration::from_secs(10));
        assert_eq!(options.keep_alive, Duration::from_secs(30));
        assert_eq!(options.max_message_size, 1024);
        Ok(())
    }
}