chie_core/
quic_transport.rs

1//! QUIC Transport Integration
2//!
3//! This module provides QUIC (Quick UDP Internet Connections) transport support for
4//! the CHIE protocol, offering improved performance, reliability, and security compared
5//! to traditional TCP-based transports.
6//!
7//! # Features
8//!
9//! - **Modern Transport Protocol**: QUIC combines the best of TCP, TLS, and HTTP/2
10//! - **Zero-RTT Connection Establishment**: Reduced latency for repeat connections
11//! - **Multiplexing**: Multiple streams over a single connection without head-of-line blocking
12//! - **Connection Migration**: Seamless connection handover between networks
13//! - **Built-in Encryption**: TLS 1.3 encryption by default
14//! - **Congestion Control**: Improved algorithms for better throughput
15//! - **Stream Prioritization**: Efficient resource allocation
16//!
17//! # Architecture
18//!
19//! ```text
20//! ┌─────────────────────────────────────────────────────────┐
21//! │                    QuicEndpoint                         │
22//! │  ┌────────────────┐  ┌─────────────────────────┐       │
23//! │  │ Server Binding │  │  Client Configuration   │       │
24//! │  └────────────────┘  └─────────────────────────┘       │
25//! └──────────────────────┬──────────────────────────────────┘
26//!                        │
27//!         ┌──────────────┴──────────────┐
28//!         │                             │
29//!    ┌────▼─────┐                  ┌────▼─────┐
30//!    │Connection│                  │Connection│
31//!    │  Pool    │                  │ Manager  │
32//!    └────┬─────┘                  └────┬─────┘
33//!         │                             │
34//!    ┌────▼──────────────────────────────▼─────┐
35//!    │         Bidirectional Streams           │
36//!    │  ┌──────────┐      ┌──────────┐        │
37//!    │  │  Send    │      │ Receive  │        │
38//!    │  └──────────┘      └──────────┘        │
39//!    └────────────────────────────────────────┘
40//! ```
41//!
42//! # Usage Examples
43//!
44//! ## Server Setup
45//!
46//! ```rust
47//! use chie_core::quic_transport::{QuicConfig, QuicEndpoint};
48//!
49//! # async fn example() -> anyhow::Result<()> {
50//! // Configure QUIC server
51//! let config = QuicConfig::builder()
52//!     .with_max_concurrent_streams(100)
53//!     .with_max_idle_timeout(std::time::Duration::from_secs(30))
54//!     .with_keep_alive_interval(std::time::Duration::from_secs(5))
55//!     .build();
56//!
57//! // Create server endpoint
58//! let mut endpoint = QuicEndpoint::server("127.0.0.1:4433", config).await?;
59//!
60//! // Accept incoming connections
61//! while let Some(connecting) = endpoint.accept().await {
62//!     tokio::spawn(async move {
63//!         if let Ok(connection) = connecting.accept().await {
64//!             // Handle connection
65//!         }
66//!     });
67//! }
68//! # Ok(())
69//! # }
70//! ```
71//!
72//! ## Client Connection
73//!
74//! ```rust
75//! use chie_core::quic_transport::{QuicConfig, QuicEndpoint};
76//!
77//! # async fn example() -> anyhow::Result<()> {
78//! let config = QuicConfig::default();
79//! let endpoint = QuicEndpoint::client(config).await?;
80//!
81//! // Connect to server
82//! let connection = endpoint.connect("127.0.0.1:4433", "localhost").await?;
83//!
84//! // Open bidirectional stream
85//! let mut stream = connection.open_bidirectional_stream().await?;
86//! stream.send(b"Hello, QUIC!").await?;
87//! stream.finish().await?;
88//!
89//! let response = stream.receive_all().await?;
90//! # Ok(())
91//! # }
92//! ```
93//!
94//! ## Stream Communication
95//!
96//! ```rust
97//! # use chie_core::quic_transport::*;
98//! # async fn example(mut stream: QuicStream) -> anyhow::Result<()> {
99//! // Send data
100//! stream.send(b"request data").await?;
101//! stream.finish().await?;
102//!
103//! // Receive response
104//! let mut buffer = vec![0u8; 8192];
105//! let len = stream.receive(&mut buffer).await?;
106//! let response = &buffer[..len];
107//! # Ok(())
108//! # }
109//! ```
110
111use anyhow::{Context, Result};
112use quinn::{ClientConfig, Endpoint, ServerConfig};
113use rustls::pki_types::{CertificateDer, PrivateKeyDer};
114use std::net::SocketAddr;
115use std::sync::Arc;
116use std::sync::atomic::{AtomicU64, Ordering};
117use std::time::Duration;
118use tokio::sync::RwLock;
119
/// QUIC transport configuration
///
/// Plain-data settings for a QUIC endpoint: stream limits, timeouts, flow-control
/// windows and optional protocol features. Values can be compared with `==`, which
/// makes it straightforward to check a built configuration against an expected one.
#[derive(Debug, Clone, PartialEq, Eq)]
#[must_use]
pub struct QuicConfig {
    /// Maximum number of concurrent bidirectional streams per connection
    pub max_concurrent_bidi_streams: u64,
    /// Maximum number of concurrent unidirectional streams per connection
    pub max_concurrent_uni_streams: u64,
    /// Maximum idle timeout before connection is closed
    pub max_idle_timeout: Duration,
    /// Keep-alive interval to prevent idle timeout
    pub keep_alive_interval: Duration,
    /// Maximum UDP payload size
    pub max_udp_payload_size: u16,
    /// Initial maximum data (connection-level flow control window), in bytes
    pub initial_max_data: u64,
    /// Initial maximum stream data (per-stream flow control), in bytes.
    ///
    /// The field name mirrors the RFC 9000 transport parameter
    /// `initial_max_stream_data_bidi_local`.
    pub initial_max_stream_data_bidi_local: u64,
    /// Initial maximum stream data (per-stream flow control), in bytes.
    ///
    /// The field name mirrors the RFC 9000 transport parameter
    /// `initial_max_stream_data_bidi_remote`.
    pub initial_max_stream_data_bidi_remote: u64,
    /// Initial maximum stream data (per-stream flow control), in bytes.
    ///
    /// The field name mirrors the RFC 9000 transport parameter
    /// `initial_max_stream_data_uni`.
    pub initial_max_stream_data_uni: u64,
    /// Enable connection migration
    pub enable_migration: bool,
    /// Enable 0-RTT (zero round-trip time) for repeat connections
    pub enable_0rtt: bool,
}
148
149impl Default for QuicConfig {
150    #[inline]
151    fn default() -> Self {
152        Self {
153            max_concurrent_bidi_streams: 100,
154            max_concurrent_uni_streams: 100,
155            max_idle_timeout: Duration::from_secs(30),
156            keep_alive_interval: Duration::from_secs(5),
157            max_udp_payload_size: 1350,
158            initial_max_data: 10 * 1024 * 1024, // 10 MB
159            initial_max_stream_data_bidi_local: 1024 * 1024, // 1 MB
160            initial_max_stream_data_bidi_remote: 1024 * 1024, // 1 MB
161            initial_max_stream_data_uni: 1024 * 1024, // 1 MB
162            enable_migration: true,
163            enable_0rtt: false,
164        }
165    }
166}
167
168impl QuicConfig {
169    /// Create a new configuration builder
170    #[inline]
171    #[must_use]
172    pub fn builder() -> QuicConfigBuilder {
173        QuicConfigBuilder::default()
174    }
175}
176
177/// Builder for QUIC configuration
178#[derive(Debug, Default)]
179pub struct QuicConfigBuilder {
180    config: QuicConfig,
181}
182
183impl QuicConfigBuilder {
184    /// Set maximum concurrent bidirectional streams
185    #[inline]
186    #[must_use]
187    pub fn with_max_concurrent_streams(mut self, count: u64) -> Self {
188        self.config.max_concurrent_bidi_streams = count;
189        self.config.max_concurrent_uni_streams = count;
190        self
191    }
192
193    /// Set maximum idle timeout
194    #[inline]
195    #[must_use]
196    pub fn with_max_idle_timeout(mut self, timeout: Duration) -> Self {
197        self.config.max_idle_timeout = timeout;
198        self
199    }
200
201    /// Set keep-alive interval
202    #[inline]
203    #[must_use]
204    pub fn with_keep_alive_interval(mut self, interval: Duration) -> Self {
205        self.config.keep_alive_interval = interval;
206        self
207    }
208
209    /// Set maximum UDP payload size
210    #[inline]
211    #[must_use]
212    pub fn with_max_udp_payload_size(mut self, size: u16) -> Self {
213        self.config.max_udp_payload_size = size;
214        self
215    }
216
217    /// Set initial flow control window
218    #[inline]
219    #[must_use]
220    pub fn with_initial_max_data(mut self, size: u64) -> Self {
221        self.config.initial_max_data = size;
222        self
223    }
224
225    /// Enable connection migration
226    #[inline]
227    #[must_use]
228    pub fn with_migration(mut self, enable: bool) -> Self {
229        self.config.enable_migration = enable;
230        self
231    }
232
233    /// Enable 0-RTT connections
234    #[inline]
235    #[must_use]
236    pub fn with_0rtt(mut self, enable: bool) -> Self {
237        self.config.enable_0rtt = enable;
238        self
239    }
240
241    /// Build the configuration
242    #[inline]
243    pub fn build(self) -> QuicConfig {
244        self.config
245    }
246}
247
248/// QUIC endpoint for server or client connections
249///
250/// An endpoint manages the QUIC protocol state and can act as either a server
251/// (accepting incoming connections) or a client (initiating connections).
252pub struct QuicEndpoint {
253    endpoint: Endpoint,
254    stats: Arc<QuicStats>,
255    #[allow(dead_code)]
256    config: QuicConfig,
257}
258
259impl QuicEndpoint {
260    /// Create a server endpoint bound to the specified address
261    ///
262    /// # Arguments
263    ///
264    /// * `addr` - Socket address to bind to (e.g., "0.0.0.0:4433")
265    /// * `config` - QUIC configuration
266    ///
267    /// # Errors
268    ///
269    /// Returns an error if certificate generation fails or binding fails
270    pub async fn server(addr: &str, config: QuicConfig) -> Result<Self> {
271        let addr: SocketAddr = addr.parse().context("Invalid server address")?;
272
273        // Generate self-signed certificate for server
274        let (cert, key) = generate_self_signed_cert()?;
275
276        // Configure server
277        let mut server_config = ServerConfig::with_single_cert(vec![cert], key)
278            .context("Failed to create server config")?;
279
280        let mut transport_config = quinn::TransportConfig::default();
281        transport_config.max_concurrent_bidi_streams(
282            config
283                .max_concurrent_bidi_streams
284                .try_into()
285                .unwrap_or(100u32.into()),
286        );
287        transport_config.max_concurrent_uni_streams(
288            config
289                .max_concurrent_uni_streams
290                .try_into()
291                .unwrap_or(100u32.into()),
292        );
293        transport_config.max_idle_timeout(Some(config.max_idle_timeout.try_into()?));
294        transport_config.keep_alive_interval(Some(config.keep_alive_interval));
295
296        server_config.transport_config(Arc::new(transport_config));
297
298        let endpoint =
299            Endpoint::server(server_config, addr).context("Failed to create server endpoint")?;
300
301        Ok(Self {
302            endpoint,
303            stats: Arc::new(QuicStats::default()),
304            config,
305        })
306    }
307
308    /// Create a client endpoint
309    ///
310    /// # Arguments
311    ///
312    /// * `config` - QUIC configuration
313    ///
314    /// # Errors
315    ///
316    /// Returns an error if client configuration fails
317    pub async fn client(config: QuicConfig) -> Result<Self> {
318        let mut client_config = ClientConfig::try_with_platform_verifier()
319            .context("Failed to create client config with platform verifier")?;
320
321        let mut transport_config = quinn::TransportConfig::default();
322        transport_config.max_concurrent_bidi_streams(
323            config
324                .max_concurrent_bidi_streams
325                .try_into()
326                .unwrap_or(100u32.into()),
327        );
328        transport_config.max_concurrent_uni_streams(
329            config
330                .max_concurrent_uni_streams
331                .try_into()
332                .unwrap_or(100u32.into()),
333        );
334        transport_config.max_idle_timeout(Some(config.max_idle_timeout.try_into()?));
335        transport_config.keep_alive_interval(Some(config.keep_alive_interval));
336
337        client_config.transport_config(Arc::new(transport_config));
338
339        let mut endpoint = Endpoint::client("0.0.0.0:0".parse()?)?;
340        endpoint.set_default_client_config(client_config);
341
342        Ok(Self {
343            endpoint,
344            stats: Arc::new(QuicStats::default()),
345            config,
346        })
347    }
348
349    /// Accept an incoming connection (server-side)
350    ///
351    /// # Returns
352    ///
353    /// Returns `Some(IncomingConnection)` if a connection is available,
354    /// or `None` if the endpoint is closed
355    #[inline]
356    pub async fn accept(&mut self) -> Option<IncomingConnection> {
357        self.endpoint.accept().await.map(|incoming| {
358            self.stats
359                .connections_accepted
360                .fetch_add(1, Ordering::Relaxed);
361            IncomingConnection {
362                incoming,
363                stats: Arc::clone(&self.stats),
364            }
365        })
366    }
367
368    /// Connect to a remote server (client-side)
369    ///
370    /// # Arguments
371    ///
372    /// * `addr` - Server address (e.g., "example.com:4433")
373    /// * `server_name` - Server name for TLS verification
374    ///
375    /// # Errors
376    ///
377    /// Returns an error if connection fails
378    pub async fn connect(&self, addr: &str, server_name: &str) -> Result<QuicConnection> {
379        let addr: SocketAddr = addr.parse().context("Invalid server address")?;
380
381        let connecting = self
382            .endpoint
383            .connect(addr, server_name)
384            .context("Failed to initiate connection")?;
385
386        self.stats
387            .connections_initiated
388            .fetch_add(1, Ordering::Relaxed);
389
390        let connection = connecting.await.context("Failed to establish connection")?;
391
392        self.stats
393            .connections_established
394            .fetch_add(1, Ordering::Relaxed);
395
396        Ok(QuicConnection {
397            connection,
398            stats: Arc::clone(&self.stats),
399        })
400    }
401
402    /// Get endpoint statistics
403    #[inline]
404    #[must_use]
405    pub fn stats(&self) -> QuicStats {
406        (*self.stats).clone()
407    }
408
409    /// Get local socket address
410    #[inline]
411    #[must_use]
412    pub fn local_addr(&self) -> Option<SocketAddr> {
413        self.endpoint.local_addr().ok()
414    }
415
416    /// Close the endpoint
417    ///
418    /// # Arguments
419    ///
420    /// * `error_code` - Error code to send to peers
421    /// * `reason` - Human-readable reason for closure
422    pub fn close(&self, error_code: u32, reason: &[u8]) {
423        self.endpoint.close(error_code.into(), reason);
424    }
425}
426
427/// Incoming connection waiting to be accepted
428pub struct IncomingConnection {
429    incoming: quinn::Incoming,
430    stats: Arc<QuicStats>,
431}
432
433impl IncomingConnection {
434    /// Accept the incoming connection
435    ///
436    /// # Errors
437    ///
438    /// Returns an error if connection acceptance fails
439    pub async fn accept(self) -> Result<QuicConnection> {
440        let connection = self.incoming.await.context("Failed to accept connection")?;
441
442        self.stats
443            .connections_established
444            .fetch_add(1, Ordering::Relaxed);
445
446        Ok(QuicConnection {
447            connection,
448            stats: self.stats,
449        })
450    }
451
452    /// Get remote address of the connecting peer
453    #[inline]
454    #[must_use]
455    pub fn remote_address(&self) -> SocketAddr {
456        self.incoming.remote_address()
457    }
458}
459
460/// Established QUIC connection
461///
462/// Represents an active connection to a peer, supporting multiple
463/// concurrent streams for efficient data transfer.
464pub struct QuicConnection {
465    connection: quinn::Connection,
466    stats: Arc<QuicStats>,
467}
468
469impl QuicConnection {
470    /// Open a new bidirectional stream
471    ///
472    /// # Errors
473    ///
474    /// Returns an error if stream opening fails
475    pub async fn open_bidirectional_stream(&self) -> Result<QuicStream> {
476        let (send, recv) = self
477            .connection
478            .open_bi()
479            .await
480            .context("Failed to open bidirectional stream")?;
481
482        self.stats.streams_opened.fetch_add(1, Ordering::Relaxed);
483
484        Ok(QuicStream {
485            send: Some(send),
486            recv: Some(recv),
487            stats: Arc::clone(&self.stats),
488        })
489    }
490
491    /// Open a new unidirectional stream (send-only)
492    ///
493    /// # Errors
494    ///
495    /// Returns an error if stream opening fails
496    pub async fn open_unidirectional_stream(&self) -> Result<QuicSendStream> {
497        let send = self
498            .connection
499            .open_uni()
500            .await
501            .context("Failed to open unidirectional stream")?;
502
503        self.stats.streams_opened.fetch_add(1, Ordering::Relaxed);
504
505        Ok(QuicSendStream {
506            send,
507            stats: Arc::clone(&self.stats),
508        })
509    }
510
511    /// Accept an incoming bidirectional stream
512    ///
513    /// # Returns
514    ///
515    /// Returns `Some(QuicStream)` if a stream is available,
516    /// or `None` if the connection is closed
517    pub async fn accept_bidirectional_stream(&self) -> Option<QuicStream> {
518        self.connection.accept_bi().await.ok().map(|(send, recv)| {
519            self.stats.streams_accepted.fetch_add(1, Ordering::Relaxed);
520            QuicStream {
521                send: Some(send),
522                recv: Some(recv),
523                stats: Arc::clone(&self.stats),
524            }
525        })
526    }
527
528    /// Accept an incoming unidirectional stream
529    ///
530    /// # Returns
531    ///
532    /// Returns `Some(QuicRecvStream)` if a stream is available,
533    /// or `None` if the connection is closed
534    pub async fn accept_unidirectional_stream(&self) -> Option<QuicRecvStream> {
535        self.connection.accept_uni().await.ok().map(|recv| {
536            self.stats.streams_accepted.fetch_add(1, Ordering::Relaxed);
537            QuicRecvStream {
538                recv,
539                stats: Arc::clone(&self.stats),
540            }
541        })
542    }
543
544    /// Get remote address of the peer
545    #[inline]
546    #[must_use]
547    pub fn remote_address(&self) -> SocketAddr {
548        self.connection.remote_address()
549    }
550
551    /// Close the connection gracefully
552    ///
553    /// # Arguments
554    ///
555    /// * `error_code` - Error code to send to peer
556    /// * `reason` - Human-readable reason for closure
557    pub fn close(&self, error_code: u32, reason: &[u8]) {
558        self.connection.close(error_code.into(), reason);
559        self.stats
560            .connections_closed
561            .fetch_add(1, Ordering::Relaxed);
562    }
563
564    /// Get connection statistics
565    #[inline]
566    #[must_use]
567    pub fn stats(&self) -> QuicStats {
568        (*self.stats).clone()
569    }
570}
571
572/// Bidirectional QUIC stream
573///
574/// Supports both sending and receiving data on the same stream.
575pub struct QuicStream {
576    send: Option<quinn::SendStream>,
577    recv: Option<quinn::RecvStream>,
578    stats: Arc<QuicStats>,
579}
580
581impl QuicStream {
582    /// Send data on the stream
583    ///
584    /// # Errors
585    ///
586    /// Returns an error if sending fails
587    pub async fn send(&mut self, data: &[u8]) -> Result<()> {
588        let send = self.send.as_mut().context("Send stream already closed")?;
589
590        send.write_all(data).await.context("Failed to send data")?;
591
592        self.stats
593            .bytes_sent
594            .fetch_add(data.len() as u64, Ordering::Relaxed);
595
596        Ok(())
597    }
598
599    /// Finish sending (close send side of stream)
600    ///
601    /// # Errors
602    ///
603    /// Returns an error if finishing fails
604    pub async fn finish(&mut self) -> Result<()> {
605        if let Some(mut send) = self.send.take() {
606            send.finish().context("Failed to finish stream")?;
607        }
608        Ok(())
609    }
610
611    /// Receive data from the stream
612    ///
613    /// # Arguments
614    ///
615    /// * `buffer` - Buffer to receive data into
616    ///
617    /// # Returns
618    ///
619    /// Returns the number of bytes received, or 0 if the stream is finished
620    ///
621    /// # Errors
622    ///
623    /// Returns an error if receiving fails
624    pub async fn receive(&mut self, buffer: &mut [u8]) -> Result<usize> {
625        let recv = self
626            .recv
627            .as_mut()
628            .context("Receive stream already closed")?;
629
630        let len = recv
631            .read(buffer)
632            .await
633            .context("Failed to receive data")?
634            .unwrap_or(0);
635
636        self.stats
637            .bytes_received
638            .fetch_add(len as u64, Ordering::Relaxed);
639
640        Ok(len)
641    }
642
643    /// Receive all remaining data from the stream
644    ///
645    /// # Returns
646    ///
647    /// Returns all data received until the stream is finished
648    ///
649    /// # Errors
650    ///
651    /// Returns an error if receiving fails or data exceeds 10MB
652    pub async fn receive_all(&mut self) -> Result<Vec<u8>> {
653        let recv = self
654            .recv
655            .as_mut()
656            .context("Receive stream already closed")?;
657
658        let data = recv
659            .read_to_end(10 * 1024 * 1024) // 10 MB limit
660            .await
661            .context("Failed to receive all data")?;
662
663        self.stats
664            .bytes_received
665            .fetch_add(data.len() as u64, Ordering::Relaxed);
666
667        Ok(data)
668    }
669}
670
671impl Drop for QuicStream {
672    fn drop(&mut self) {
673        self.stats.streams_closed.fetch_add(1, Ordering::Relaxed);
674    }
675}
676
677/// Unidirectional send-only QUIC stream
678pub struct QuicSendStream {
679    send: quinn::SendStream,
680    stats: Arc<QuicStats>,
681}
682
683impl QuicSendStream {
684    /// Send data on the stream
685    ///
686    /// # Errors
687    ///
688    /// Returns an error if sending fails
689    pub async fn send(&mut self, data: &[u8]) -> Result<()> {
690        self.send
691            .write_all(data)
692            .await
693            .context("Failed to send data")?;
694
695        self.stats
696            .bytes_sent
697            .fetch_add(data.len() as u64, Ordering::Relaxed);
698
699        Ok(())
700    }
701
702    /// Finish sending (close stream)
703    ///
704    /// # Errors
705    ///
706    /// Returns an error if finishing fails
707    pub async fn finish(mut self) -> Result<()> {
708        self.send.finish().context("Failed to finish stream")?;
709        Ok(())
710    }
711}
712
713impl Drop for QuicSendStream {
714    fn drop(&mut self) {
715        self.stats.streams_closed.fetch_add(1, Ordering::Relaxed);
716    }
717}
718
719/// Unidirectional receive-only QUIC stream
720pub struct QuicRecvStream {
721    recv: quinn::RecvStream,
722    stats: Arc<QuicStats>,
723}
724
725impl QuicRecvStream {
726    /// Receive data from the stream
727    ///
728    /// # Arguments
729    ///
730    /// * `buffer` - Buffer to receive data into
731    ///
732    /// # Returns
733    ///
734    /// Returns the number of bytes received, or 0 if the stream is finished
735    ///
736    /// # Errors
737    ///
738    /// Returns an error if receiving fails
739    pub async fn receive(&mut self, buffer: &mut [u8]) -> Result<usize> {
740        let len = self
741            .recv
742            .read(buffer)
743            .await
744            .context("Failed to receive data")?
745            .unwrap_or(0);
746
747        self.stats
748            .bytes_received
749            .fetch_add(len as u64, Ordering::Relaxed);
750
751        Ok(len)
752    }
753
754    /// Receive all remaining data from the stream
755    ///
756    /// # Returns
757    ///
758    /// Returns all data received until the stream is finished
759    ///
760    /// # Errors
761    ///
762    /// Returns an error if receiving fails or data exceeds 10MB
763    pub async fn receive_all(mut self) -> Result<Vec<u8>> {
764        let data = self
765            .recv
766            .read_to_end(10 * 1024 * 1024) // 10 MB limit
767            .await
768            .context("Failed to receive all data")?;
769
770        self.stats
771            .bytes_received
772            .fetch_add(data.len() as u64, Ordering::Relaxed);
773
774        Ok(data)
775    }
776}
777
778impl Drop for QuicRecvStream {
779    fn drop(&mut self) {
780        self.stats.streams_closed.fetch_add(1, Ordering::Relaxed);
781    }
782}
783
/// QUIC transport statistics
///
/// Tracks various metrics about QUIC connections and streams. Every counter is an
/// atomic so a single instance can be updated through shared references; cloning
/// produces an independent snapshot of the current values.
#[derive(Debug, Default)]
pub struct QuicStats {
    /// Number of connections initiated (client-side)
    pub connections_initiated: AtomicU64,
    /// Number of connections accepted (server-side)
    pub connections_accepted: AtomicU64,
    /// Number of connections successfully established
    pub connections_established: AtomicU64,
    /// Number of connections closed
    pub connections_closed: AtomicU64,
    /// Number of streams opened
    pub streams_opened: AtomicU64,
    /// Number of streams accepted
    pub streams_accepted: AtomicU64,
    /// Number of streams closed
    pub streams_closed: AtomicU64,
    /// Total bytes sent
    pub bytes_sent: AtomicU64,
    /// Total bytes received
    pub bytes_received: AtomicU64,
}

impl Clone for QuicStats {
    /// Copy the current value of every counter into a fresh, independent instance.
    ///
    /// Counters are read one at a time, so the snapshot is not taken atomically
    /// across fields.
    fn clone(&self) -> Self {
        let snapshot = |counter: &AtomicU64| AtomicU64::new(counter.load(Ordering::Relaxed));

        Self {
            connections_initiated: snapshot(&self.connections_initiated),
            connections_accepted: snapshot(&self.connections_accepted),
            connections_established: snapshot(&self.connections_established),
            connections_closed: snapshot(&self.connections_closed),
            streams_opened: snapshot(&self.streams_opened),
            streams_accepted: snapshot(&self.streams_accepted),
            streams_closed: snapshot(&self.streams_closed),
            bytes_sent: snapshot(&self.bytes_sent),
            bytes_received: snapshot(&self.bytes_received),
        }
    }
}

impl QuicStats {
    /// Get number of active connections
    ///
    /// Computed as established minus closed, clamped at zero.
    #[inline]
    #[must_use]
    pub fn active_connections(&self) -> u64 {
        let established = self.connections_established.load(Ordering::Relaxed);
        let closed = self.connections_closed.load(Ordering::Relaxed);
        established.saturating_sub(closed)
    }

    /// Get number of active streams
    ///
    /// Computed as opened plus accepted minus closed. Both steps saturate, so the
    /// result stays within `0..=u64::MAX` instead of overflowing.
    #[inline]
    #[must_use]
    pub fn active_streams(&self) -> u64 {
        let opened = self.streams_opened.load(Ordering::Relaxed);
        let accepted = self.streams_accepted.load(Ordering::Relaxed);
        let closed = self.streams_closed.load(Ordering::Relaxed);
        opened.saturating_add(accepted).saturating_sub(closed)
    }

    /// Get total bytes transferred (sent + received)
    ///
    /// The sum saturates at `u64::MAX` instead of overflowing.
    #[inline]
    #[must_use]
    pub fn total_bytes(&self) -> u64 {
        let sent = self.bytes_sent.load(Ordering::Relaxed);
        let received = self.bytes_received.load(Ordering::Relaxed);
        sent.saturating_add(received)
    }
}
856
857/// Connection pool for managing multiple QUIC connections
858///
859/// Provides connection reuse and load balancing across multiple connections.
860pub struct QuicConnectionPool {
861    connections: Arc<RwLock<Vec<QuicConnection>>>,
862    endpoint: Arc<QuicEndpoint>,
863    server_addr: String,
864    server_name: String,
865    max_connections: usize,
866}
867
868impl QuicConnectionPool {
869    /// Create a new connection pool
870    ///
871    /// # Arguments
872    ///
873    /// * `endpoint` - QUIC endpoint to use for connections
874    /// * `server_addr` - Server address to connect to
875    /// * `server_name` - Server name for TLS verification
876    /// * `max_connections` - Maximum number of pooled connections
877    #[must_use]
878    pub fn new(
879        endpoint: QuicEndpoint,
880        server_addr: String,
881        server_name: String,
882        max_connections: usize,
883    ) -> Self {
884        Self {
885            connections: Arc::new(RwLock::new(Vec::new())),
886            endpoint: Arc::new(endpoint),
887            server_addr,
888            server_name,
889            max_connections,
890        }
891    }
892
893    /// Get a connection from the pool or create a new one
894    ///
895    /// # Errors
896    ///
897    /// Returns an error if connection fails
898    pub async fn get_connection(&self) -> Result<QuicConnection> {
899        // Try to reuse existing connection
900        {
901            let mut connections = self.connections.write().await;
902            if let Some(conn) = connections.pop() {
903                return Ok(conn);
904            }
905        }
906
907        // Create new connection
908        let connection = self
909            .endpoint
910            .connect(&self.server_addr, &self.server_name)
911            .await?;
912
913        Ok(connection)
914    }
915
916    /// Return a connection to the pool
917    ///
918    /// # Arguments
919    ///
920    /// * `connection` - Connection to return
921    pub async fn return_connection(&self, connection: QuicConnection) {
922        let mut connections = self.connections.write().await;
923        if connections.len() < self.max_connections {
924            connections.push(connection);
925        }
926        // Otherwise, connection is dropped
927    }
928
929    /// Get pool statistics
930    #[must_use]
931    pub async fn stats(&self) -> PoolStats {
932        let connections = self.connections.read().await;
933        PoolStats {
934            pooled_connections: connections.len(),
935            max_connections: self.max_connections,
936        }
937    }
938}
939
/// Connection pool statistics
///
/// A small value-type snapshot: it is `Copy` and comparable so callers can store
/// it and assert on it directly.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct PoolStats {
    /// Number of connections currently in the pool
    pub pooled_connections: usize,
    /// Maximum number of connections
    pub max_connections: usize,
}
948
949/// Generate a self-signed certificate for testing
950///
951/// # Errors
952///
953/// Returns an error if certificate generation fails
954fn generate_self_signed_cert() -> Result<(CertificateDer<'static>, PrivateKeyDer<'static>)> {
955    let certified_key = rcgen::generate_simple_self_signed(vec!["localhost".to_string()])
956        .context("Failed to generate certificate")?;
957
958    let key = PrivateKeyDer::Pkcs8(certified_key.signing_key.serialize_der().into());
959    let cert_der = CertificateDer::from(certified_key.cert);
960
961    Ok((cert_der, key))
962}
963
964#[cfg(test)]
965mod tests {
966    use super::*;
967    use rustls::client::danger::{ServerCertVerified, ServerCertVerifier};
968    use rustls::pki_types::{ServerName, UnixTime};
969
970    /// Test-only: Create a client that skips certificate verification
971    async fn create_insecure_client(config: QuicConfig) -> Result<QuicEndpoint> {
972        // Install default crypto provider if not already installed
973        let _ = rustls::crypto::ring::default_provider().install_default();
974
975        // Custom verifier that accepts all certificates
976        #[derive(Debug)]
977        struct SkipServerVerification;
978
979        impl ServerCertVerifier for SkipServerVerification {
980            fn verify_server_cert(
981                &self,
982                _end_entity: &CertificateDer<'_>,
983                _intermediates: &[CertificateDer<'_>],
984                _server_name: &ServerName<'_>,
985                _ocsp_response: &[u8],
986                _now: UnixTime,
987            ) -> Result<ServerCertVerified, rustls::Error> {
988                Ok(ServerCertVerified::assertion())
989            }
990
991            fn verify_tls12_signature(
992                &self,
993                _message: &[u8],
994                _cert: &CertificateDer<'_>,
995                _dss: &rustls::DigitallySignedStruct,
996            ) -> Result<rustls::client::danger::HandshakeSignatureValid, rustls::Error>
997            {
998                Ok(rustls::client::danger::HandshakeSignatureValid::assertion())
999            }
1000
1001            fn verify_tls13_signature(
1002                &self,
1003                _message: &[u8],
1004                _cert: &CertificateDer<'_>,
1005                _dss: &rustls::DigitallySignedStruct,
1006            ) -> Result<rustls::client::danger::HandshakeSignatureValid, rustls::Error>
1007            {
1008                Ok(rustls::client::danger::HandshakeSignatureValid::assertion())
1009            }
1010
1011            fn supported_verify_schemes(&self) -> Vec<rustls::SignatureScheme> {
1012                vec![
1013                    rustls::SignatureScheme::RSA_PKCS1_SHA256,
1014                    rustls::SignatureScheme::ECDSA_NISTP256_SHA256,
1015                    rustls::SignatureScheme::ED25519,
1016                ]
1017            }
1018        }
1019
1020        let crypto = rustls::ClientConfig::builder()
1021            .dangerous()
1022            .with_custom_certificate_verifier(Arc::new(SkipServerVerification))
1023            .with_no_client_auth();
1024
1025        let mut client_config = ClientConfig::new(Arc::new(
1026            quinn::crypto::rustls::QuicClientConfig::try_from(crypto)?,
1027        ));
1028
1029        let mut transport_config = quinn::TransportConfig::default();
1030        transport_config.max_concurrent_bidi_streams(
1031            config
1032                .max_concurrent_bidi_streams
1033                .try_into()
1034                .unwrap_or(100u32.into()),
1035        );
1036        transport_config.max_concurrent_uni_streams(
1037            config
1038                .max_concurrent_uni_streams
1039                .try_into()
1040                .unwrap_or(100u32.into()),
1041        );
1042        transport_config.max_idle_timeout(Some(config.max_idle_timeout.try_into()?));
1043        transport_config.keep_alive_interval(Some(config.keep_alive_interval));
1044
1045        client_config.transport_config(Arc::new(transport_config));
1046
1047        let mut endpoint = Endpoint::client("0.0.0.0:0".parse()?)?;
1048        endpoint.set_default_client_config(client_config);
1049
1050        Ok(QuicEndpoint {
1051            endpoint,
1052            stats: Arc::new(QuicStats::default()),
1053            config,
1054        })
1055    }
1056
1057    #[test]
1058    fn test_config_builder() {
1059        let config = QuicConfig::builder()
1060            .with_max_concurrent_streams(200)
1061            .with_max_idle_timeout(Duration::from_secs(60))
1062            .with_keep_alive_interval(Duration::from_secs(10))
1063            .with_migration(false)
1064            .with_0rtt(true)
1065            .build();
1066
1067        assert_eq!(config.max_concurrent_bidi_streams, 200);
1068        assert_eq!(config.max_concurrent_uni_streams, 200);
1069        assert_eq!(config.max_idle_timeout, Duration::from_secs(60));
1070        assert_eq!(config.keep_alive_interval, Duration::from_secs(10));
1071        assert!(!config.enable_migration);
1072        assert!(config.enable_0rtt);
1073    }
1074
1075    #[test]
1076    fn test_default_config() {
1077        let config = QuicConfig::default();
1078        assert_eq!(config.max_concurrent_bidi_streams, 100);
1079        assert_eq!(config.max_idle_timeout, Duration::from_secs(30));
1080        assert!(config.enable_migration);
1081        assert!(!config.enable_0rtt);
1082    }
1083
1084    #[test]
1085    fn test_stats_calculations() {
1086        let stats = QuicStats::default();
1087
1088        stats.connections_established.store(10, Ordering::Relaxed);
1089        stats.connections_closed.store(3, Ordering::Relaxed);
1090        assert_eq!(stats.active_connections(), 7);
1091
1092        stats.streams_opened.store(20, Ordering::Relaxed);
1093        stats.streams_accepted.store(15, Ordering::Relaxed);
1094        stats.streams_closed.store(10, Ordering::Relaxed);
1095        assert_eq!(stats.active_streams(), 25);
1096
1097        stats.bytes_sent.store(1000, Ordering::Relaxed);
1098        stats.bytes_received.store(2000, Ordering::Relaxed);
1099        assert_eq!(stats.total_bytes(), 3000);
1100    }
1101
1102    #[tokio::test]
1103    async fn test_server_creation() {
1104        let config = QuicConfig::default();
1105        let result = QuicEndpoint::server("127.0.0.1:0", config).await;
1106        assert!(result.is_ok());
1107
1108        let endpoint = result.unwrap();
1109        assert!(endpoint.local_addr().is_some());
1110    }
1111
1112    #[tokio::test]
1113    async fn test_client_creation() {
1114        let config = QuicConfig::default();
1115        let result = QuicEndpoint::client(config).await;
1116        assert!(result.is_ok());
1117    }
1118
1119    #[tokio::test]
1120    async fn test_connection_pool_creation() {
1121        let config = QuicConfig::default();
1122        let endpoint = QuicEndpoint::client(config).await.unwrap();
1123
1124        let pool = QuicConnectionPool::new(
1125            endpoint,
1126            "127.0.0.1:4433".to_string(),
1127            "localhost".to_string(),
1128            10,
1129        );
1130
1131        let stats = pool.stats().await;
1132        assert_eq!(stats.pooled_connections, 0);
1133        assert_eq!(stats.max_connections, 10);
1134    }
1135
1136    #[tokio::test]
1137    async fn test_server_client_communication() {
1138        // Create server
1139        let server_config = QuicConfig::default();
1140        let mut server = QuicEndpoint::server("127.0.0.1:0", server_config)
1141            .await
1142            .unwrap();
1143
1144        let server_addr = server.local_addr().unwrap();
1145
1146        // Spawn server task
1147        let server_task = tokio::spawn(async move {
1148            if let Some(incoming) = server.accept().await {
1149                let connection = incoming.accept().await.unwrap();
1150                if let Some(mut stream) = connection.accept_bidirectional_stream().await {
1151                    // Receive all data from client
1152                    let received_data = stream.receive_all().await.unwrap();
1153                    let received = String::from_utf8_lossy(&received_data);
1154
1155                    // Send response
1156                    stream.send(b"Hello, Client!").await.unwrap();
1157                    stream.finish().await.unwrap();
1158
1159                    // Keep connection alive a bit longer
1160                    tokio::time::sleep(Duration::from_millis(100)).await;
1161
1162                    received.to_string()
1163                } else {
1164                    String::new()
1165                }
1166            } else {
1167                String::new()
1168            }
1169        });
1170
1171        // Give server time to start
1172        tokio::time::sleep(Duration::from_millis(100)).await;
1173
1174        // Create client with insecure certificate verification for testing
1175        let client_config = QuicConfig::default();
1176        let client = create_insecure_client(client_config).await.unwrap();
1177
1178        // Connect to server
1179        let connection = client
1180            .connect(&server_addr.to_string(), "localhost")
1181            .await
1182            .unwrap();
1183
1184        // Open stream and send data
1185        let mut stream = connection.open_bidirectional_stream().await.unwrap();
1186        stream.send(b"Hello, Server!").await.unwrap();
1187        stream.finish().await.unwrap();
1188
1189        // Receive response
1190        let response = stream.receive_all().await.unwrap();
1191        assert_eq!(response, b"Hello, Client!");
1192
1193        // Wait for server
1194        let server_received = server_task.await.unwrap();
1195        assert_eq!(server_received, "Hello, Server!");
1196    }
1197
1198    #[test]
1199    fn test_certificate_generation() {
1200        let result = generate_self_signed_cert();
1201        assert!(result.is_ok());
1202    }
1203
1204    #[tokio::test]
1205    async fn test_stream_statistics() {
1206        let config = QuicConfig::default();
1207        let mut server = QuicEndpoint::server("127.0.0.1:0", config.clone())
1208            .await
1209            .unwrap();
1210
1211        let server_addr = server.local_addr().unwrap();
1212
1213        tokio::spawn(async move {
1214            if let Some(incoming) = server.accept().await {
1215                let _ = incoming.accept().await;
1216            }
1217        });
1218
1219        tokio::time::sleep(Duration::from_millis(50)).await;
1220
1221        let client = create_insecure_client(config).await.unwrap();
1222        let connection = client
1223            .connect(&server_addr.to_string(), "localhost")
1224            .await
1225            .unwrap();
1226
1227        let stats_before = connection.stats();
1228        let initial_streams = stats_before.streams_opened.load(Ordering::Relaxed);
1229
1230        let _stream = connection.open_bidirectional_stream().await.unwrap();
1231
1232        let stats_after = connection.stats();
1233        assert_eq!(
1234            stats_after.streams_opened.load(Ordering::Relaxed),
1235            initial_streams + 1
1236        );
1237    }
1238
1239    #[tokio::test]
1240    async fn test_multiple_streams() {
1241        let config = QuicConfig::default();
1242        let mut server = QuicEndpoint::server("127.0.0.1:0", config.clone())
1243            .await
1244            .unwrap();
1245
1246        let server_addr = server.local_addr().unwrap();
1247
1248        tokio::spawn(async move {
1249            if let Some(incoming) = server.accept().await {
1250                let connection = incoming.accept().await.unwrap();
1251                for _ in 0..3 {
1252                    if let Some(mut stream) = connection.accept_bidirectional_stream().await {
1253                        // Receive all data from client
1254                        let _ = stream.receive_all().await;
1255                        // Send response
1256                        let _ = stream.send(b"ACK").await;
1257                        let _ = stream.finish().await;
1258                    }
1259                }
1260                // Keep connection alive a bit longer
1261                tokio::time::sleep(Duration::from_millis(100)).await;
1262            }
1263        });
1264
1265        tokio::time::sleep(Duration::from_millis(50)).await;
1266
1267        let client = create_insecure_client(config).await.unwrap();
1268        let connection = client
1269            .connect(&server_addr.to_string(), "localhost")
1270            .await
1271            .unwrap();
1272
1273        // Open and use multiple streams
1274        for i in 0..3 {
1275            let mut stream = connection.open_bidirectional_stream().await.unwrap();
1276            stream
1277                .send(format!("Message {}", i).as_bytes())
1278                .await
1279                .unwrap();
1280            stream.finish().await.unwrap();
1281
1282            let response = stream.receive_all().await.unwrap();
1283            assert_eq!(response, b"ACK");
1284        }
1285
1286        let stats = connection.stats();
1287        assert!(stats.streams_opened.load(Ordering::Relaxed) >= 3);
1288    }
1289
1290    #[tokio::test]
1291    async fn test_connection_close() {
1292        let config = QuicConfig::default();
1293        let client = QuicEndpoint::client(config).await.unwrap();
1294        let initial_stats = client.stats();
1295
1296        client.close(0, b"test close");
1297
1298        // Stats should remain valid after close
1299        let final_stats = client.stats();
1300        assert_eq!(
1301            initial_stats.connections_initiated.load(Ordering::Relaxed),
1302            final_stats.connections_initiated.load(Ordering::Relaxed)
1303        );
1304    }
1305
1306    #[tokio::test]
1307    async fn test_unidirectional_streams() {
1308        let config = QuicConfig::default();
1309        let mut server = QuicEndpoint::server("127.0.0.1:0", config.clone())
1310            .await
1311            .unwrap();
1312
1313        let server_addr = server.local_addr().unwrap();
1314
1315        tokio::spawn(async move {
1316            if let Some(incoming) = server.accept().await {
1317                let connection = incoming.accept().await.unwrap();
1318                if let Some(stream) = connection.accept_unidirectional_stream().await {
1319                    let data = stream.receive_all().await.unwrap();
1320                    assert_eq!(data, b"Unidirectional message");
1321                }
1322            }
1323        });
1324
1325        tokio::time::sleep(Duration::from_millis(50)).await;
1326
1327        let client = create_insecure_client(config).await.unwrap();
1328        let connection = client
1329            .connect(&server_addr.to_string(), "localhost")
1330            .await
1331            .unwrap();
1332
1333        let mut stream = connection.open_unidirectional_stream().await.unwrap();
1334        stream.send(b"Unidirectional message").await.unwrap();
1335        stream.finish().await.unwrap();
1336
1337        tokio::time::sleep(Duration::from_millis(100)).await;
1338    }
1339}