1use anyhow::{Context, Result};
112use quinn::{ClientConfig, Endpoint, ServerConfig};
113use rustls::pki_types::{CertificateDer, PrivateKeyDer};
114use std::net::SocketAddr;
115use std::sync::Arc;
116use std::sync::atomic::{AtomicU64, Ordering};
117use std::time::Duration;
118use tokio::sync::RwLock;
119
/// Tunable settings for a QUIC endpoint.
///
/// Obtain the stock values through [`Default`] or customise them with
/// [`QuicConfig::builder`].
#[derive(Debug, Clone)]
#[must_use]
pub struct QuicConfig {
    /// Cap on concurrently open bidirectional streams.
    pub max_concurrent_bidi_streams: u64,
    /// Cap on concurrently open unidirectional streams.
    pub max_concurrent_uni_streams: u64,
    /// Idle period after which a connection is timed out.
    pub max_idle_timeout: Duration,
    /// Interval between keep-alive packets.
    pub keep_alive_interval: Duration,
    /// Maximum UDP payload size (presumably in bytes).
    pub max_udp_payload_size: u16,
    /// Initial connection-wide flow-control allowance (presumably in bytes).
    pub initial_max_data: u64,
    /// Initial per-stream allowance for locally opened bidirectional streams
    /// (presumably in bytes).
    pub initial_max_stream_data_bidi_local: u64,
    /// Initial per-stream allowance for remotely opened bidirectional streams
    /// (presumably in bytes).
    pub initial_max_stream_data_bidi_remote: u64,
    /// Initial per-stream allowance for unidirectional streams
    /// (presumably in bytes).
    pub initial_max_stream_data_uni: u64,
    /// Whether connection migration is wanted.
    pub enable_migration: bool,
    /// Whether 0-RTT is wanted.
    pub enable_0rtt: bool,
}

impl Default for QuicConfig {
    /// Stock settings: 100 streams per direction, 30 s idle timeout,
    /// 5 s keep-alive, 1350-byte payloads, 10 MiB / 1 MiB flow-control
    /// values, migration on and 0-RTT off.
    #[inline]
    fn default() -> Self {
        // One mebibyte; the flow-control defaults are multiples of it.
        const MIB: u64 = 1024 * 1024;

        Self {
            max_concurrent_bidi_streams: 100,
            max_concurrent_uni_streams: 100,
            max_idle_timeout: Duration::from_secs(30),
            keep_alive_interval: Duration::from_secs(5),
            max_udp_payload_size: 1350,
            initial_max_data: 10 * MIB,
            initial_max_stream_data_bidi_local: MIB,
            initial_max_stream_data_bidi_remote: MIB,
            initial_max_stream_data_uni: MIB,
            enable_migration: true,
            enable_0rtt: false,
        }
    }
}

impl QuicConfig {
    /// Starts a builder pre-populated with the default settings.
    #[inline]
    #[must_use]
    pub fn builder() -> QuicConfigBuilder {
        QuicConfigBuilder {
            config: QuicConfig::default(),
        }
    }
}

/// Fluent builder for [`QuicConfig`]; every setter consumes and returns the builder.
#[derive(Debug, Default)]
pub struct QuicConfigBuilder {
    /// Configuration accumulated so far.
    config: QuicConfig,
}

impl QuicConfigBuilder {
    /// Sets the bidirectional and unidirectional stream caps to the same `count`.
    #[inline]
    #[must_use]
    pub fn with_max_concurrent_streams(self, count: u64) -> Self {
        let config = QuicConfig {
            max_concurrent_bidi_streams: count,
            max_concurrent_uni_streams: count,
            ..self.config
        };
        Self { config }
    }

    /// Sets the idle timeout.
    #[inline]
    #[must_use]
    pub fn with_max_idle_timeout(self, timeout: Duration) -> Self {
        let config = QuicConfig {
            max_idle_timeout: timeout,
            ..self.config
        };
        Self { config }
    }

    /// Sets the keep-alive interval.
    #[inline]
    #[must_use]
    pub fn with_keep_alive_interval(self, interval: Duration) -> Self {
        let config = QuicConfig {
            keep_alive_interval: interval,
            ..self.config
        };
        Self { config }
    }

    /// Sets the maximum UDP payload size.
    #[inline]
    #[must_use]
    pub fn with_max_udp_payload_size(self, size: u16) -> Self {
        let config = QuicConfig {
            max_udp_payload_size: size,
            ..self.config
        };
        Self { config }
    }

    /// Sets the initial connection-wide flow-control allowance.
    #[inline]
    #[must_use]
    pub fn with_initial_max_data(self, size: u64) -> Self {
        let config = QuicConfig {
            initial_max_data: size,
            ..self.config
        };
        Self { config }
    }

    /// Turns the connection-migration flag on or off.
    #[inline]
    #[must_use]
    pub fn with_migration(self, enable: bool) -> Self {
        let config = QuicConfig {
            enable_migration: enable,
            ..self.config
        };
        Self { config }
    }

    /// Turns the 0-RTT flag on or off.
    #[inline]
    #[must_use]
    pub fn with_0rtt(self, enable: bool) -> Self {
        let config = QuicConfig {
            enable_0rtt: enable,
            ..self.config
        };
        Self { config }
    }

    /// Finishes the builder and hands back the assembled configuration.
    #[inline]
    pub fn build(self) -> QuicConfig {
        let Self { config } = self;
        config
    }
}
247
/// A QUIC endpoint: a [`quinn::Endpoint`] bundled with shared statistics
/// and the configuration it was created from.
pub struct QuicEndpoint {
    /// Underlying quinn endpoint used to accept and initiate connections.
    endpoint: Endpoint,
    /// Counters shared (via `Arc`) with the connections and streams created
    /// through this endpoint.
    stats: Arc<QuicStats>,
    /// Configuration supplied at construction time; kept on the endpoint
    /// but not read back through this field.
    #[allow(dead_code)]
    config: QuicConfig,
}
258
259impl QuicEndpoint {
260 pub async fn server(addr: &str, config: QuicConfig) -> Result<Self> {
271 let addr: SocketAddr = addr.parse().context("Invalid server address")?;
272
273 let (cert, key) = generate_self_signed_cert()?;
275
276 let mut server_config = ServerConfig::with_single_cert(vec![cert], key)
278 .context("Failed to create server config")?;
279
280 let mut transport_config = quinn::TransportConfig::default();
281 transport_config.max_concurrent_bidi_streams(
282 config
283 .max_concurrent_bidi_streams
284 .try_into()
285 .unwrap_or(100u32.into()),
286 );
287 transport_config.max_concurrent_uni_streams(
288 config
289 .max_concurrent_uni_streams
290 .try_into()
291 .unwrap_or(100u32.into()),
292 );
293 transport_config.max_idle_timeout(Some(config.max_idle_timeout.try_into()?));
294 transport_config.keep_alive_interval(Some(config.keep_alive_interval));
295
296 server_config.transport_config(Arc::new(transport_config));
297
298 let endpoint =
299 Endpoint::server(server_config, addr).context("Failed to create server endpoint")?;
300
301 Ok(Self {
302 endpoint,
303 stats: Arc::new(QuicStats::default()),
304 config,
305 })
306 }
307
308 pub async fn client(config: QuicConfig) -> Result<Self> {
318 let mut client_config = ClientConfig::try_with_platform_verifier()
319 .context("Failed to create client config with platform verifier")?;
320
321 let mut transport_config = quinn::TransportConfig::default();
322 transport_config.max_concurrent_bidi_streams(
323 config
324 .max_concurrent_bidi_streams
325 .try_into()
326 .unwrap_or(100u32.into()),
327 );
328 transport_config.max_concurrent_uni_streams(
329 config
330 .max_concurrent_uni_streams
331 .try_into()
332 .unwrap_or(100u32.into()),
333 );
334 transport_config.max_idle_timeout(Some(config.max_idle_timeout.try_into()?));
335 transport_config.keep_alive_interval(Some(config.keep_alive_interval));
336
337 client_config.transport_config(Arc::new(transport_config));
338
339 let mut endpoint = Endpoint::client("0.0.0.0:0".parse()?)?;
340 endpoint.set_default_client_config(client_config);
341
342 Ok(Self {
343 endpoint,
344 stats: Arc::new(QuicStats::default()),
345 config,
346 })
347 }
348
349 #[inline]
356 pub async fn accept(&mut self) -> Option<IncomingConnection> {
357 self.endpoint.accept().await.map(|incoming| {
358 self.stats
359 .connections_accepted
360 .fetch_add(1, Ordering::Relaxed);
361 IncomingConnection {
362 incoming,
363 stats: Arc::clone(&self.stats),
364 }
365 })
366 }
367
368 pub async fn connect(&self, addr: &str, server_name: &str) -> Result<QuicConnection> {
379 let addr: SocketAddr = addr.parse().context("Invalid server address")?;
380
381 let connecting = self
382 .endpoint
383 .connect(addr, server_name)
384 .context("Failed to initiate connection")?;
385
386 self.stats
387 .connections_initiated
388 .fetch_add(1, Ordering::Relaxed);
389
390 let connection = connecting.await.context("Failed to establish connection")?;
391
392 self.stats
393 .connections_established
394 .fetch_add(1, Ordering::Relaxed);
395
396 Ok(QuicConnection {
397 connection,
398 stats: Arc::clone(&self.stats),
399 })
400 }
401
402 #[inline]
404 #[must_use]
405 pub fn stats(&self) -> QuicStats {
406 (*self.stats).clone()
407 }
408
409 #[inline]
411 #[must_use]
412 pub fn local_addr(&self) -> Option<SocketAddr> {
413 self.endpoint.local_addr().ok()
414 }
415
416 pub fn close(&self, error_code: u32, reason: &[u8]) {
423 self.endpoint.close(error_code.into(), reason);
424 }
425}
426
/// A connection attempt received by a server endpoint whose handshake has
/// not yet been awaited.
pub struct IncomingConnection {
    /// The pending quinn connection attempt.
    incoming: quinn::Incoming,
    /// Counters shared with the endpoint that produced this attempt.
    stats: Arc<QuicStats>,
}
432
433impl IncomingConnection {
434 pub async fn accept(self) -> Result<QuicConnection> {
440 let connection = self.incoming.await.context("Failed to accept connection")?;
441
442 self.stats
443 .connections_established
444 .fetch_add(1, Ordering::Relaxed);
445
446 Ok(QuicConnection {
447 connection,
448 stats: self.stats,
449 })
450 }
451
452 #[inline]
454 #[must_use]
455 pub fn remote_address(&self) -> SocketAddr {
456 self.incoming.remote_address()
457 }
458}
459
/// An established QUIC connection together with the shared statistics it updates.
pub struct QuicConnection {
    /// Underlying quinn connection handle.
    connection: quinn::Connection,
    /// Counters shared with the endpoint this connection came from.
    stats: Arc<QuicStats>,
}
468
469impl QuicConnection {
470 pub async fn open_bidirectional_stream(&self) -> Result<QuicStream> {
476 let (send, recv) = self
477 .connection
478 .open_bi()
479 .await
480 .context("Failed to open bidirectional stream")?;
481
482 self.stats.streams_opened.fetch_add(1, Ordering::Relaxed);
483
484 Ok(QuicStream {
485 send: Some(send),
486 recv: Some(recv),
487 stats: Arc::clone(&self.stats),
488 })
489 }
490
491 pub async fn open_unidirectional_stream(&self) -> Result<QuicSendStream> {
497 let send = self
498 .connection
499 .open_uni()
500 .await
501 .context("Failed to open unidirectional stream")?;
502
503 self.stats.streams_opened.fetch_add(1, Ordering::Relaxed);
504
505 Ok(QuicSendStream {
506 send,
507 stats: Arc::clone(&self.stats),
508 })
509 }
510
511 pub async fn accept_bidirectional_stream(&self) -> Option<QuicStream> {
518 self.connection.accept_bi().await.ok().map(|(send, recv)| {
519 self.stats.streams_accepted.fetch_add(1, Ordering::Relaxed);
520 QuicStream {
521 send: Some(send),
522 recv: Some(recv),
523 stats: Arc::clone(&self.stats),
524 }
525 })
526 }
527
528 pub async fn accept_unidirectional_stream(&self) -> Option<QuicRecvStream> {
535 self.connection.accept_uni().await.ok().map(|recv| {
536 self.stats.streams_accepted.fetch_add(1, Ordering::Relaxed);
537 QuicRecvStream {
538 recv,
539 stats: Arc::clone(&self.stats),
540 }
541 })
542 }
543
544 #[inline]
546 #[must_use]
547 pub fn remote_address(&self) -> SocketAddr {
548 self.connection.remote_address()
549 }
550
551 pub fn close(&self, error_code: u32, reason: &[u8]) {
558 self.connection.close(error_code.into(), reason);
559 self.stats
560 .connections_closed
561 .fetch_add(1, Ordering::Relaxed);
562 }
563
564 #[inline]
566 #[must_use]
567 pub fn stats(&self) -> QuicStats {
568 (*self.stats).clone()
569 }
570}
571
/// A bidirectional QUIC stream made of an optional send half and an
/// optional receive half.
pub struct QuicStream {
    /// Send half; set to `None` once `finish` has taken it.
    send: Option<quinn::SendStream>,
    /// Receive half; methods report an error when it is `None`.
    recv: Option<quinn::RecvStream>,
    /// Shared counters updated as bytes are transferred and when the stream is dropped.
    stats: Arc<QuicStats>,
}
580
581impl QuicStream {
582 pub async fn send(&mut self, data: &[u8]) -> Result<()> {
588 let send = self.send.as_mut().context("Send stream already closed")?;
589
590 send.write_all(data).await.context("Failed to send data")?;
591
592 self.stats
593 .bytes_sent
594 .fetch_add(data.len() as u64, Ordering::Relaxed);
595
596 Ok(())
597 }
598
599 pub async fn finish(&mut self) -> Result<()> {
605 if let Some(mut send) = self.send.take() {
606 send.finish().context("Failed to finish stream")?;
607 }
608 Ok(())
609 }
610
611 pub async fn receive(&mut self, buffer: &mut [u8]) -> Result<usize> {
625 let recv = self
626 .recv
627 .as_mut()
628 .context("Receive stream already closed")?;
629
630 let len = recv
631 .read(buffer)
632 .await
633 .context("Failed to receive data")?
634 .unwrap_or(0);
635
636 self.stats
637 .bytes_received
638 .fetch_add(len as u64, Ordering::Relaxed);
639
640 Ok(len)
641 }
642
643 pub async fn receive_all(&mut self) -> Result<Vec<u8>> {
653 let recv = self
654 .recv
655 .as_mut()
656 .context("Receive stream already closed")?;
657
658 let data = recv
659 .read_to_end(10 * 1024 * 1024) .await
661 .context("Failed to receive all data")?;
662
663 self.stats
664 .bytes_received
665 .fetch_add(data.len() as u64, Ordering::Relaxed);
666
667 Ok(data)
668 }
669}
670
671impl Drop for QuicStream {
672 fn drop(&mut self) {
673 self.stats.streams_closed.fetch_add(1, Ordering::Relaxed);
674 }
675}
676
/// The send-only side of a unidirectional QUIC stream.
pub struct QuicSendStream {
    /// Underlying quinn send stream.
    send: quinn::SendStream,
    /// Shared counters updated as bytes are sent and when the stream is dropped.
    stats: Arc<QuicStats>,
}
682
683impl QuicSendStream {
684 pub async fn send(&mut self, data: &[u8]) -> Result<()> {
690 self.send
691 .write_all(data)
692 .await
693 .context("Failed to send data")?;
694
695 self.stats
696 .bytes_sent
697 .fetch_add(data.len() as u64, Ordering::Relaxed);
698
699 Ok(())
700 }
701
702 pub async fn finish(mut self) -> Result<()> {
708 self.send.finish().context("Failed to finish stream")?;
709 Ok(())
710 }
711}
712
713impl Drop for QuicSendStream {
714 fn drop(&mut self) {
715 self.stats.streams_closed.fetch_add(1, Ordering::Relaxed);
716 }
717}
718
/// The receive-only side of a unidirectional QUIC stream.
pub struct QuicRecvStream {
    /// Underlying quinn receive stream.
    recv: quinn::RecvStream,
    /// Shared counters updated as bytes are received and when the stream is dropped.
    stats: Arc<QuicStats>,
}
724
725impl QuicRecvStream {
726 pub async fn receive(&mut self, buffer: &mut [u8]) -> Result<usize> {
740 let len = self
741 .recv
742 .read(buffer)
743 .await
744 .context("Failed to receive data")?
745 .unwrap_or(0);
746
747 self.stats
748 .bytes_received
749 .fetch_add(len as u64, Ordering::Relaxed);
750
751 Ok(len)
752 }
753
754 pub async fn receive_all(mut self) -> Result<Vec<u8>> {
764 let data = self
765 .recv
766 .read_to_end(10 * 1024 * 1024) .await
768 .context("Failed to receive all data")?;
769
770 self.stats
771 .bytes_received
772 .fetch_add(data.len() as u64, Ordering::Relaxed);
773
774 Ok(data)
775 }
776}
777
778impl Drop for QuicRecvStream {
779 fn drop(&mut self) {
780 self.stats.streams_closed.fetch_add(1, Ordering::Relaxed);
781 }
782}
783
/// Atomic counters describing connection, stream and byte activity.
///
/// All fields are updated and read with `Ordering::Relaxed`, so values
/// read together are not guaranteed to form a consistent snapshot.
#[derive(Debug, Default)]
pub struct QuicStats {
    /// Outgoing connection attempts started.
    pub connections_initiated: AtomicU64,
    /// Incoming connection attempts yielded by the endpoint.
    pub connections_accepted: AtomicU64,
    /// Connections whose handshake completed (either direction).
    pub connections_established: AtomicU64,
    /// Explicit connection closes.
    pub connections_closed: AtomicU64,
    /// Streams opened locally.
    pub streams_opened: AtomicU64,
    /// Streams accepted from the peer.
    pub streams_accepted: AtomicU64,
    /// Stream wrappers that have been dropped.
    pub streams_closed: AtomicU64,
    /// Total bytes written to streams.
    pub bytes_sent: AtomicU64,
    /// Total bytes read from streams.
    pub bytes_received: AtomicU64,
}

impl Clone for QuicStats {
    /// Copies the current value of every counter into fresh, independent atomics.
    fn clone(&self) -> Self {
        let snapshot = |counter: &AtomicU64| AtomicU64::new(counter.load(Ordering::Relaxed));

        Self {
            connections_initiated: snapshot(&self.connections_initiated),
            connections_accepted: snapshot(&self.connections_accepted),
            connections_established: snapshot(&self.connections_established),
            connections_closed: snapshot(&self.connections_closed),
            streams_opened: snapshot(&self.streams_opened),
            streams_accepted: snapshot(&self.streams_accepted),
            streams_closed: snapshot(&self.streams_closed),
            bytes_sent: snapshot(&self.bytes_sent),
            bytes_received: snapshot(&self.bytes_received),
        }
    }
}

impl QuicStats {
    /// Established connections minus closed ones, floored at zero.
    #[inline]
    #[must_use]
    pub fn active_connections(&self) -> u64 {
        let established = self.connections_established.load(Ordering::Relaxed);
        let closed = self.connections_closed.load(Ordering::Relaxed);
        established.saturating_sub(closed)
    }

    /// Opened plus accepted streams minus closed ones, clamped to `0..=u64::MAX`.
    #[inline]
    #[must_use]
    pub fn active_streams(&self) -> u64 {
        let opened = self.streams_opened.load(Ordering::Relaxed);
        let accepted = self.streams_accepted.load(Ordering::Relaxed);
        let closed = self.streams_closed.load(Ordering::Relaxed);
        // Saturate so huge counters cannot panic (debug) or wrap (release).
        opened.saturating_add(accepted).saturating_sub(closed)
    }

    /// Bytes sent plus bytes received, saturating at `u64::MAX`.
    #[inline]
    #[must_use]
    pub fn total_bytes(&self) -> u64 {
        let sent = self.bytes_sent.load(Ordering::Relaxed);
        let received = self.bytes_received.load(Ordering::Relaxed);
        sent.saturating_add(received)
    }
}
856
/// A pool of reusable client connections to a single server.
pub struct QuicConnectionPool {
    /// Idle connections waiting to be handed out again.
    connections: Arc<RwLock<Vec<QuicConnection>>>,
    /// Endpoint used to open new connections when the pool is empty.
    endpoint: Arc<QuicEndpoint>,
    /// Server socket address, kept as a string and parsed on each connect.
    server_addr: String,
    /// Server name passed to the endpoint when connecting.
    server_name: String,
    /// Upper bound on the number of idle connections kept in the pool.
    max_connections: usize,
}
867
868impl QuicConnectionPool {
869 #[must_use]
878 pub fn new(
879 endpoint: QuicEndpoint,
880 server_addr: String,
881 server_name: String,
882 max_connections: usize,
883 ) -> Self {
884 Self {
885 connections: Arc::new(RwLock::new(Vec::new())),
886 endpoint: Arc::new(endpoint),
887 server_addr,
888 server_name,
889 max_connections,
890 }
891 }
892
893 pub async fn get_connection(&self) -> Result<QuicConnection> {
899 {
901 let mut connections = self.connections.write().await;
902 if let Some(conn) = connections.pop() {
903 return Ok(conn);
904 }
905 }
906
907 let connection = self
909 .endpoint
910 .connect(&self.server_addr, &self.server_name)
911 .await?;
912
913 Ok(connection)
914 }
915
916 pub async fn return_connection(&self, connection: QuicConnection) {
922 let mut connections = self.connections.write().await;
923 if connections.len() < self.max_connections {
924 connections.push(connection);
925 }
926 }
928
929 #[must_use]
931 pub async fn stats(&self) -> PoolStats {
932 let connections = self.connections.read().await;
933 PoolStats {
934 pooled_connections: connections.len(),
935 max_connections: self.max_connections,
936 }
937 }
938}
939
/// Point-in-time view of a connection pool's occupancy.
#[derive(Debug, Clone)]
pub struct PoolStats {
    /// Number of idle connections held by the pool when sampled.
    pub pooled_connections: usize,
    /// Maximum number of idle connections the pool retains.
    pub max_connections: usize,
}
948
949fn generate_self_signed_cert() -> Result<(CertificateDer<'static>, PrivateKeyDer<'static>)> {
955 let certified_key = rcgen::generate_simple_self_signed(vec!["localhost".to_string()])
956 .context("Failed to generate certificate")?;
957
958 let key = PrivateKeyDer::Pkcs8(certified_key.signing_key.serialize_der().into());
959 let cert_der = CertificateDer::from(certified_key.cert);
960
961 Ok((cert_der, key))
962}
963
#[cfg(test)]
mod tests {
    use super::*;
    use rustls::client::danger::{ServerCertVerified, ServerCertVerifier};
    use rustls::pki_types::{ServerName, UnixTime};

    /// Builds a client endpoint that accepts any server certificate.
    ///
    /// Test-only: needed because the server uses a self-signed certificate.
    async fn create_insecure_client(config: QuicConfig) -> Result<QuicEndpoint> {
        // Ignore the result: the provider may already have been installed.
        let _ = rustls::crypto::ring::default_provider().install_default();

        /// Certificate verifier that approves everything it is shown.
        #[derive(Debug)]
        struct SkipServerVerification;

        impl ServerCertVerifier for SkipServerVerification {
            fn verify_server_cert(
                &self,
                _end_entity: &CertificateDer<'_>,
                _intermediates: &[CertificateDer<'_>],
                _server_name: &ServerName<'_>,
                _ocsp_response: &[u8],
                _now: UnixTime,
            ) -> Result<ServerCertVerified, rustls::Error> {
                Ok(ServerCertVerified::assertion())
            }

            fn verify_tls12_signature(
                &self,
                _message: &[u8],
                _cert: &CertificateDer<'_>,
                _dss: &rustls::DigitallySignedStruct,
            ) -> Result<rustls::client::danger::HandshakeSignatureValid, rustls::Error>
            {
                Ok(rustls::client::danger::HandshakeSignatureValid::assertion())
            }

            fn verify_tls13_signature(
                &self,
                _message: &[u8],
                _cert: &CertificateDer<'_>,
                _dss: &rustls::DigitallySignedStruct,
            ) -> Result<rustls::client::danger::HandshakeSignatureValid, rustls::Error>
            {
                Ok(rustls::client::danger::HandshakeSignatureValid::assertion())
            }

            fn supported_verify_schemes(&self) -> Vec<rustls::SignatureScheme> {
                vec![
                    rustls::SignatureScheme::RSA_PKCS1_SHA256,
                    rustls::SignatureScheme::ECDSA_NISTP256_SHA256,
                    rustls::SignatureScheme::ED25519,
                ]
            }
        }

        let crypto = rustls::ClientConfig::builder()
            .dangerous()
            .with_custom_certificate_verifier(Arc::new(SkipServerVerification))
            .with_no_client_auth();

        let mut client_config = ClientConfig::new(Arc::new(
            quinn::crypto::rustls::QuicClientConfig::try_from(crypto)?,
        ));

        let mut transport_config = quinn::TransportConfig::default();
        transport_config.max_concurrent_bidi_streams(
            config
                .max_concurrent_bidi_streams
                .try_into()
                .unwrap_or(100u32.into()),
        );
        transport_config.max_concurrent_uni_streams(
            config
                .max_concurrent_uni_streams
                .try_into()
                .unwrap_or(100u32.into()),
        );
        transport_config.max_idle_timeout(Some(config.max_idle_timeout.try_into()?));
        transport_config.keep_alive_interval(Some(config.keep_alive_interval));

        client_config.transport_config(Arc::new(transport_config));

        let mut endpoint = Endpoint::client("0.0.0.0:0".parse()?)?;
        endpoint.set_default_client_config(client_config);

        Ok(QuicEndpoint {
            endpoint,
            stats: Arc::new(QuicStats::default()),
            config,
        })
    }

    /// Every builder setter used here ends up in the built configuration.
    #[test]
    fn test_config_builder() {
        let config = QuicConfig::builder()
            .with_max_concurrent_streams(200)
            .with_max_idle_timeout(Duration::from_secs(60))
            .with_keep_alive_interval(Duration::from_secs(10))
            .with_migration(false)
            .with_0rtt(true)
            .build();

        assert_eq!(config.max_concurrent_bidi_streams, 200);
        assert_eq!(config.max_concurrent_uni_streams, 200);
        assert_eq!(config.max_idle_timeout, Duration::from_secs(60));
        assert_eq!(config.keep_alive_interval, Duration::from_secs(10));
        assert!(!config.enable_migration);
        assert!(config.enable_0rtt);
    }

    /// Spot-checks the default configuration values.
    #[test]
    fn test_default_config() {
        let config = QuicConfig::default();
        assert_eq!(config.max_concurrent_bidi_streams, 100);
        assert_eq!(config.max_idle_timeout, Duration::from_secs(30));
        assert!(config.enable_migration);
        assert!(!config.enable_0rtt);
    }

    /// Derived statistics are computed from the raw counters.
    #[test]
    fn test_stats_calculations() {
        let stats = QuicStats::default();

        stats.connections_established.store(10, Ordering::Relaxed);
        stats.connections_closed.store(3, Ordering::Relaxed);
        assert_eq!(stats.active_connections(), 7);

        stats.streams_opened.store(20, Ordering::Relaxed);
        stats.streams_accepted.store(15, Ordering::Relaxed);
        stats.streams_closed.store(10, Ordering::Relaxed);
        assert_eq!(stats.active_streams(), 25);

        stats.bytes_sent.store(1000, Ordering::Relaxed);
        stats.bytes_received.store(2000, Ordering::Relaxed);
        assert_eq!(stats.total_bytes(), 3000);
    }

    /// A server endpoint can be bound to an ephemeral local port.
    #[tokio::test]
    async fn test_server_creation() {
        let config = QuicConfig::default();
        let result = QuicEndpoint::server("127.0.0.1:0", config).await;
        assert!(result.is_ok());

        let endpoint = result.unwrap();
        assert!(endpoint.local_addr().is_some());
    }

    /// A client endpoint can be created with the default configuration.
    #[tokio::test]
    async fn test_client_creation() {
        let config = QuicConfig::default();
        let result = QuicEndpoint::client(config).await;
        assert!(result.is_ok());
    }

    /// A new pool starts empty and reports its configured maximum.
    #[tokio::test]
    async fn test_connection_pool_creation() {
        let config = QuicConfig::default();
        let endpoint = QuicEndpoint::client(config).await.unwrap();

        let pool = QuicConnectionPool::new(
            endpoint,
            "127.0.0.1:4433".to_string(),
            "localhost".to_string(),
            10,
        );

        let stats = pool.stats().await;
        assert_eq!(stats.pooled_connections, 0);
        assert_eq!(stats.max_connections, 10);
    }

    /// Round trip over one bidirectional stream: the client sends a
    /// greeting, the server replies, and both sides check what they received.
    #[tokio::test]
    async fn test_server_client_communication() {
        let server_config = QuicConfig::default();
        let mut server = QuicEndpoint::server("127.0.0.1:0", server_config)
            .await
            .unwrap();

        let server_addr = server.local_addr().unwrap();

        let server_task = tokio::spawn(async move {
            if let Some(incoming) = server.accept().await {
                let connection = incoming.accept().await.unwrap();
                if let Some(mut stream) = connection.accept_bidirectional_stream().await {
                    let received_data = stream.receive_all().await.unwrap();
                    let received = String::from_utf8_lossy(&received_data);

                    stream.send(b"Hello, Client!").await.unwrap();
                    stream.finish().await.unwrap();

                    // Keep the connection alive long enough for the reply to be delivered.
                    tokio::time::sleep(Duration::from_millis(100)).await;

                    received.to_string()
                } else {
                    String::new()
                }
            } else {
                String::new()
            }
        });

        tokio::time::sleep(Duration::from_millis(100)).await;

        let client_config = QuicConfig::default();
        let client = create_insecure_client(client_config).await.unwrap();

        let connection = client
            .connect(&server_addr.to_string(), "localhost")
            .await
            .unwrap();

        let mut stream = connection.open_bidirectional_stream().await.unwrap();
        stream.send(b"Hello, Server!").await.unwrap();
        stream.finish().await.unwrap();

        let response = stream.receive_all().await.unwrap();
        assert_eq!(response, b"Hello, Client!");

        let server_received = server_task.await.unwrap();
        assert_eq!(server_received, "Hello, Server!");
    }

    /// The self-signed certificate helper succeeds.
    #[test]
    fn test_certificate_generation() {
        let result = generate_self_signed_cert();
        assert!(result.is_ok());
    }

    /// Opening a stream increments `streams_opened` by exactly one.
    #[tokio::test]
    async fn test_stream_statistics() {
        let config = QuicConfig::default();
        let mut server = QuicEndpoint::server("127.0.0.1:0", config.clone())
            .await
            .unwrap();

        let server_addr = server.local_addr().unwrap();

        tokio::spawn(async move {
            if let Some(incoming) = server.accept().await {
                let _ = incoming.accept().await;
            }
        });

        tokio::time::sleep(Duration::from_millis(50)).await;

        let client = create_insecure_client(config).await.unwrap();
        let connection = client
            .connect(&server_addr.to_string(), "localhost")
            .await
            .unwrap();

        let stats_before = connection.stats();
        let initial_streams = stats_before.streams_opened.load(Ordering::Relaxed);

        let _stream = connection.open_bidirectional_stream().await.unwrap();

        let stats_after = connection.stats();
        assert_eq!(
            stats_after.streams_opened.load(Ordering::Relaxed),
            initial_streams + 1
        );
    }

    /// Three sequential bidirectional streams on one connection are each acknowledged.
    #[tokio::test]
    async fn test_multiple_streams() {
        let config = QuicConfig::default();
        let mut server = QuicEndpoint::server("127.0.0.1:0", config.clone())
            .await
            .unwrap();

        let server_addr = server.local_addr().unwrap();

        tokio::spawn(async move {
            if let Some(incoming) = server.accept().await {
                let connection = incoming.accept().await.unwrap();
                for _ in 0..3 {
                    if let Some(mut stream) = connection.accept_bidirectional_stream().await {
                        let _ = stream.receive_all().await;
                        let _ = stream.send(b"ACK").await;
                        let _ = stream.finish().await;
                    }
                }
                // Keep the connection alive long enough for the last ACK to be delivered.
                tokio::time::sleep(Duration::from_millis(100)).await;
            }
        });

        tokio::time::sleep(Duration::from_millis(50)).await;

        let client = create_insecure_client(config).await.unwrap();
        let connection = client
            .connect(&server_addr.to_string(), "localhost")
            .await
            .unwrap();

        for i in 0..3 {
            let mut stream = connection.open_bidirectional_stream().await.unwrap();
            stream
                .send(format!("Message {}", i).as_bytes())
                .await
                .unwrap();
            stream.finish().await.unwrap();

            let response = stream.receive_all().await.unwrap();
            assert_eq!(response, b"ACK");
        }

        let stats = connection.stats();
        assert!(stats.streams_opened.load(Ordering::Relaxed) >= 3);
    }

    /// Closing an endpoint leaves `connections_initiated` unchanged.
    #[tokio::test]
    async fn test_connection_close() {
        let config = QuicConfig::default();
        let client = QuicEndpoint::client(config).await.unwrap();
        let initial_stats = client.stats();

        client.close(0, b"test close");

        let final_stats = client.stats();
        assert_eq!(
            initial_stats.connections_initiated.load(Ordering::Relaxed),
            final_stats.connections_initiated.load(Ordering::Relaxed)
        );
    }

    /// A message sent over a unidirectional stream arrives intact at the server.
    #[tokio::test]
    async fn test_unidirectional_streams() {
        let config = QuicConfig::default();
        let mut server = QuicEndpoint::server("127.0.0.1:0", config.clone())
            .await
            .unwrap();

        let server_addr = server.local_addr().unwrap();

        // Keep the handle: the assertion below runs inside the task and is
        // only observed if the task is awaited.
        let server_task = tokio::spawn(async move {
            let incoming = server
                .accept()
                .await
                .expect("server endpoint closed before a connection arrived");
            let connection = incoming.accept().await.unwrap();
            let stream = connection
                .accept_unidirectional_stream()
                .await
                .expect("no unidirectional stream was received");
            let data = stream.receive_all().await.unwrap();
            assert_eq!(data, b"Unidirectional message");
        });

        tokio::time::sleep(Duration::from_millis(50)).await;

        let client = create_insecure_client(config).await.unwrap();
        let connection = client
            .connect(&server_addr.to_string(), "localhost")
            .await
            .unwrap();

        let mut stream = connection.open_unidirectional_stream().await.unwrap();
        stream.send(b"Unidirectional message").await.unwrap();
        stream.finish().await.unwrap();

        // Waiting on the task keeps `connection` alive until the server has
        // read the message and surfaces any server-side panic.
        tokio::time::timeout(Duration::from_secs(5), server_task)
            .await
            .expect("server task did not finish in time")
            .unwrap();
    }
}