ipfrs_transport/
quic.rs

1//! QUIC transport for efficient block exchange
2//!
3//! Implements QUIC-based transport using the quinn crate:
4//! - 0-RTT connection establishment
5//! - Connection pooling and reuse
6//! - Stream multiplexing
7//! - Congestion control tuning for bulk transfer
8//! - Zero-copy block forwarding with bytes::Bytes
9
10use bytes::Bytes;
11use ipfrs_core::error::{Error, Result};
12use quinn::{
13    ClientConfig, Connection, Endpoint, RecvStream, SendStream, ServerConfig, TransportConfig,
14};
15use std::collections::HashMap;
16use std::net::SocketAddr;
17use std::sync::Arc;
18use std::time::{Duration, Instant};
19use tokio::sync::RwLock;
20
/// QUIC transport configuration
#[derive(Debug, Clone)]
pub struct QuicConfig {
    /// Address to bind the endpoint to
    pub bind_addr: SocketAddr,
    /// Maximum idle timeout for connections
    pub idle_timeout: Duration,
    /// Maximum concurrent streams per connection
    pub max_streams: u32,
    /// Enable 0-RTT early data
    pub enable_0rtt: bool,
    /// Connection pool size per peer
    pub pool_size: usize,
    /// Idle connection timeout before eviction
    pub pool_idle_timeout: Duration,
    /// Maximum message size accepted on a single stream (bytes)
    pub max_message_size: usize,
    /// Initial congestion window (bytes)
    pub initial_window: u32,
    /// Maximum congestion window (bytes)
    pub max_window: u32,
}

impl Default for QuicConfig {
    /// Defaults: bind to any interface on an ephemeral port, 30 s idle
    /// timeout, 256 streams, 16 MB messages, 4-connection pools.
    fn default() -> Self {
        Self {
            // Infallible construction instead of parsing "0.0.0.0:0" at
            // runtime (the old `.parse().unwrap()` could not actually fail,
            // but this removes the panic path entirely).
            bind_addr: SocketAddr::from(([0, 0, 0, 0], 0)),
            idle_timeout: Duration::from_secs(30),
            max_streams: 256,
            enable_0rtt: true,
            pool_size: 4,
            pool_idle_timeout: Duration::from_secs(60),
            max_message_size: 16 * 1024 * 1024, // 16 MB
            initial_window: 10 * 1024 * 1024,   // 10 MB
            max_window: 100 * 1024 * 1024,      // 100 MB
        }
    }
}
59
/// Connection pool entry
///
/// Wraps a live QUIC connection with the bookkeeping `PeerPool` uses for
/// health checks, idle eviction, and least-loaded selection.
struct PooledConnection {
    /// Handle to the underlying QUIC connection (cloned when handed out)
    connection: Connection,
    /// When connection was created - reserved for connection age metrics
    #[allow(dead_code)]
    created_at: Instant,
    /// Last time this entry was checked out of the pool (see `touch`)
    last_used: Instant,
    /// Number of streams believed active on this connection.
    /// NOTE(review): nothing in this file ever increments this counter, so
    /// it stays 0 — confirm whether stream accounting was meant to be wired
    /// up at the call sites that open streams.
    active_streams: u32,
}
69
70impl PooledConnection {
71    fn new(connection: Connection) -> Self {
72        let now = Instant::now();
73        Self {
74            connection,
75            created_at: now,
76            last_used: now,
77            active_streams: 0,
78        }
79    }
80
81    fn is_healthy(&self) -> bool {
82        self.connection.close_reason().is_none()
83    }
84
85    fn is_idle(&self, timeout: Duration) -> bool {
86        self.last_used.elapsed() > timeout && self.active_streams == 0
87    }
88
89    fn touch(&mut self) {
90        self.last_used = Instant::now();
91    }
92}
93
/// Connection pool for a single peer
struct PeerPool {
    /// Live (and possibly idle) connections to this peer
    connections: Vec<PooledConnection>,
    /// Maximum number of connections kept for this peer
    max_size: usize,
    /// How long a streamless connection may sit unused before eviction
    idle_timeout: Duration,
}
100
101impl PeerPool {
102    fn new(max_size: usize, idle_timeout: Duration) -> Self {
103        Self {
104            connections: Vec::with_capacity(max_size),
105            max_size,
106            idle_timeout,
107        }
108    }
109
110    /// Get an available connection from the pool
111    fn get(&mut self) -> Option<&mut PooledConnection> {
112        // Clean up closed connections
113        self.connections.retain(|c| c.is_healthy());
114
115        // Remove idle connections
116        self.connections.retain(|c| !c.is_idle(self.idle_timeout));
117
118        // Find connection with lowest active streams
119        self.connections
120            .iter_mut()
121            .filter(|c| c.is_healthy())
122            .min_by_key(|c| c.active_streams)
123    }
124
125    /// Add a connection to the pool
126    fn add(&mut self, connection: Connection) -> bool {
127        if self.connections.len() >= self.max_size {
128            // Remove oldest idle connection
129            if let Some(pos) = self
130                .connections
131                .iter()
132                .position(|c| c.is_idle(Duration::ZERO))
133            {
134                self.connections.remove(pos);
135            } else {
136                return false;
137            }
138        }
139
140        self.connections.push(PooledConnection::new(connection));
141        true
142    }
143
144    fn connection_count(&self) -> usize {
145        self.connections.len()
146    }
147}
148
/// QUIC transport for block exchange
///
/// Owns a single quinn endpoint used for both inbound (`accept`) and
/// outbound (`connect`) connections, plus a per-peer connection pool.
pub struct QuicTransport {
    /// QUIC endpoint (serves and dials on the same socket)
    endpoint: Endpoint,
    /// Connection pools per peer address, shared across clones/tasks
    pools: Arc<RwLock<HashMap<SocketAddr, PeerPool>>>,
    /// Configuration captured at construction time
    config: QuicConfig,
    /// Client configuration for outbound connections
    client_config: ClientConfig,
}
160
impl QuicTransport {
    /// Create a new QUIC transport bound to `config.bind_addr`.
    ///
    /// Sets up a server endpoint with a freshly generated self-signed
    /// certificate, and a client configuration that skips server
    /// certificate verification. Both are development-only shortcuts;
    /// production deployments need real certificate handling.
    ///
    /// # Errors
    /// Returns `Error::Internal` if certificate generation, crypto config
    /// construction, or endpoint binding fails.
    pub async fn new(config: QuicConfig) -> Result<Self> {
        // Create self-signed certificate for development
        let (cert, key) = Self::generate_self_signed_cert()?;

        // Server config with its own transport config
        let server_transport = Self::create_transport_config(&config);
        let mut server_config = ServerConfig::with_single_cert(vec![cert.clone()], key.clone_key())
            .map_err(|e| Error::Internal(format!("Failed to create server config: {}", e)))?;
        server_config.transport_config(Arc::new(server_transport));

        // Client config with its own transport config (skip verification for development)
        let client_transport = Self::create_transport_config(&config);
        let client_crypto = rustls::ClientConfig::builder()
            .dangerous()
            .with_custom_certificate_verifier(Arc::new(SkipServerVerification))
            .with_no_client_auth();
        let mut client_config = ClientConfig::new(Arc::new(
            quinn::crypto::rustls::QuicClientConfig::try_from(client_crypto).map_err(|e| {
                Error::Internal(format!("Failed to create QUIC client config: {}", e))
            })?,
        ));
        client_config.transport_config(Arc::new(client_transport));

        // Create endpoint; the same endpoint accepts inbound connections
        // and initiates outbound ones.
        let endpoint = Endpoint::server(server_config, config.bind_addr)
            .map_err(|e| Error::Internal(format!("Failed to create QUIC endpoint: {}", e)))?;

        Ok(Self {
            endpoint,
            pools: Arc::new(RwLock::new(HashMap::new())),
            config,
            client_config,
        })
    }

    /// Create transport configuration optimized for bulk transfer
    fn create_transport_config(config: &QuicConfig) -> TransportConfig {
        let mut transport = TransportConfig::default();
        // NOTE(review): if `idle_timeout` overflows QUIC's timeout encoding
        // the conversion fails and this silently substitutes the type's
        // default — confirm that fallback is intended.
        transport.max_idle_timeout(Some(config.idle_timeout.try_into().unwrap_or_default()));
        transport.max_concurrent_bidi_streams(config.max_streams.into());
        transport.max_concurrent_uni_streams(config.max_streams.into());
        // Conservative 1200-byte initial MTU (QUIC's baseline datagram size).
        transport.initial_mtu(1200);
        // Note: Congestion window settings would be configured via
        // custom congestion controller implementation
        // (config.initial_window / config.max_window are currently unused here).
        transport
    }

    /// Generate a self-signed certificate for development
    ///
    /// The certificate covers only the name "localhost", matching the
    /// server name used in `connect`.
    fn generate_self_signed_cert() -> Result<(
        rustls::pki_types::CertificateDer<'static>,
        rustls::pki_types::PrivateKeyDer<'static>,
    )> {
        let rcgen_cert = rcgen::generate_simple_self_signed(vec!["localhost".to_string()])
            .map_err(|e| Error::Internal(format!("Failed to generate certificate: {}", e)))?;

        let cert_der = rustls::pki_types::CertificateDer::from(rcgen_cert.cert.der().to_vec());
        let key_der =
            rustls::pki_types::PrivateKeyDer::try_from(rcgen_cert.signing_key.serialize_der())
                .map_err(|e| Error::Internal(format!("Failed to serialize key: {}", e)))?;

        Ok((cert_der, key_der))
    }

    /// Get local address the endpoint is actually bound to
    /// (useful when `bind_addr` requested an ephemeral port).
    pub fn local_addr(&self) -> Result<SocketAddr> {
        self.endpoint
            .local_addr()
            .map_err(|e| Error::Internal(format!("Failed to get local address: {}", e)))
    }

    /// Connect to a peer, reusing a pooled connection when available.
    ///
    /// The TLS server name presented is "localhost", matching the
    /// self-signed development certificate.
    ///
    /// NOTE(review): `active_streams` is never incremented anywhere in this
    /// file, so the pool's least-loaded selection currently treats every
    /// connection as equally loaded.
    pub async fn connect(&self, addr: SocketAddr) -> Result<Connection> {
        // Check pool first
        {
            let mut pools = self.pools.write().await;
            if let Some(pool) = pools.get_mut(&addr) {
                if let Some(conn) = pool.get() {
                    conn.touch();
                    return Ok(conn.connection.clone());
                }
            }
        }

        // Establish new connection
        let connection = self
            .endpoint
            .connect_with(self.client_config.clone(), addr, "localhost")
            .map_err(|e| Error::Internal(format!("Failed to initiate connection: {}", e)))?
            .await
            .map_err(|e| Error::Internal(format!("Failed to connect: {}", e)))?;

        // Add to pool so later calls can reuse it. If the pool is full of
        // busy connections the connection is still returned, just unpooled.
        {
            let mut pools = self.pools.write().await;
            let pool = pools.entry(addr).or_insert_with(|| {
                PeerPool::new(self.config.pool_size, self.config.pool_idle_timeout)
            });
            pool.add(connection.clone());
        }

        Ok(connection)
    }

    /// Accept an incoming connection
    ///
    /// Returns `Ok(None)` when the endpoint has been closed and no further
    /// connections will arrive.
    pub async fn accept(&self) -> Result<Option<Connection>> {
        if let Some(incoming) = self.endpoint.accept().await {
            let connection = incoming
                .await
                .map_err(|e| Error::Internal(format!("Failed to accept connection: {}", e)))?;
            Ok(Some(connection))
        } else {
            Ok(None)
        }
    }

    /// Open a bidirectional stream on a connection
    pub async fn open_stream(&self, connection: &Connection) -> Result<(SendStream, RecvStream)> {
        connection
            .open_bi()
            .await
            .map_err(|e| Error::Internal(format!("Failed to open stream: {}", e)))
    }

    /// Send data on a stream, then close the send side so the peer's
    /// `read_to_end` observes EOF.
    pub async fn send(&self, stream: &mut SendStream, data: &[u8]) -> Result<()> {
        stream
            .write_all(data)
            .await
            .map_err(|e| Error::Internal(format!("Failed to send data: {}", e)))?;
        stream
            .finish()
            .map_err(|e| Error::Internal(format!("Failed to finish stream: {}", e)))?;
        Ok(())
    }

    /// Receive data from a stream, reading until EOF.
    /// Fails if the peer sends more than `config.max_message_size` bytes.
    pub async fn receive(&self, stream: &mut RecvStream) -> Result<Vec<u8>> {
        let data = stream
            .read_to_end(self.config.max_message_size)
            .await
            .map_err(|e| Error::Internal(format!("Failed to receive data: {}", e)))?;
        Ok(data)
    }

    /// Send data using zero-copy with Bytes
    ///
    /// NOTE(review): `write_all(&data)` copies out of the `Bytes` buffer;
    /// for true zero-copy submission quinn's `write_chunk` would be needed
    /// — confirm whether the "zero-copy" in the name refers only to the
    /// caller-side `Bytes` handoff.
    pub async fn send_zero_copy(&self, stream: &mut SendStream, data: Bytes) -> Result<()> {
        stream
            .write_all(&data)
            .await
            .map_err(|e| Error::Internal(format!("Failed to send data: {}", e)))?;
        stream
            .finish()
            .map_err(|e| Error::Internal(format!("Failed to finish stream: {}", e)))?;
        Ok(())
    }

    /// Receive data from a stream as `Bytes`
    ///
    /// The payload is read into a `Vec` and converted; `Bytes::from(Vec)`
    /// takes ownership without a further copy.
    pub async fn receive_zero_copy(&self, stream: &mut RecvStream) -> Result<Bytes> {
        let data = stream
            .read_to_end(self.config.max_message_size)
            .await
            .map_err(|e| Error::Internal(format!("Failed to receive data: {}", e)))?;
        Ok(Bytes::from(data))
    }

    /// Forward block data between streams through a reusable 16 KB buffer.
    ///
    /// Reads from `recv_stream` until EOF, relaying each chunk to
    /// `send_stream`, then finishes the send side. Returns the total number
    /// of bytes forwarded. Note that data is staged through a local buffer,
    /// so this is chunked relaying rather than strictly zero-copy.
    pub async fn forward_block(
        &self,
        recv_stream: &mut RecvStream,
        send_stream: &mut SendStream,
    ) -> Result<usize> {
        let mut total_bytes = 0;
        let mut buffer = vec![0u8; 16384]; // 16 KB chunks

        loop {
            // `Ok(None)` signals EOF on the receive side.
            let n = match recv_stream.read(&mut buffer).await {
                Ok(Some(n)) => n,
                Ok(None) => break,
                Err(e) => return Err(Error::Internal(format!("Failed to read: {}", e))),
            };

            send_stream
                .write_all(&buffer[..n])
                .await
                .map_err(|e| Error::Internal(format!("Failed to write: {}", e)))?;

            total_bytes += n;
        }

        send_stream
            .finish()
            .map_err(|e| Error::Internal(format!("Failed to finish stream: {}", e)))?;

        Ok(total_bytes)
    }

    /// Send data to a peer address (opens or reuses a connection).
    /// The receive half of the opened stream is discarded.
    pub async fn send_to(&self, addr: SocketAddr, data: &[u8]) -> Result<()> {
        let connection = self.connect(addr).await?;
        let (mut send, _recv) = self.open_stream(&connection).await?;
        self.send(&mut send, data).await
    }

    /// Get connection pool statistics (peer count and total pooled
    /// connections, including unhealthy ones not yet swept).
    pub async fn pool_stats(&self) -> QuicPoolStats {
        let pools = self.pools.read().await;
        let total_connections: usize = pools.values().map(|p| p.connection_count()).sum();
        let peer_count = pools.len();

        QuicPoolStats {
            peer_count,
            total_connections,
        }
    }

    /// Clean up idle connections
    ///
    /// Sweeps every peer pool, dropping closed or idle connections, then
    /// removes pools that became empty. Intended to be called periodically.
    pub async fn cleanup_idle(&self) {
        let mut pools = self.pools.write().await;
        for pool in pools.values_mut() {
            pool.connections
                .retain(|c| c.is_healthy() && !c.is_idle(pool.idle_timeout));
        }
        // Remove empty pools
        pools.retain(|_, p| !p.connections.is_empty());
    }

    /// Close the transport
    ///
    /// Closes the endpoint with error code 0 and reason "shutdown";
    /// in-flight connections are terminated.
    pub fn close(&self) {
        self.endpoint.close(0u32.into(), b"shutdown");
    }
}
394
/// QUIC connection pool statistics
///
/// A point-in-time snapshot produced by `QuicTransport::pool_stats`.
#[derive(Debug, Clone)]
pub struct QuicPoolStats {
    /// Number of peers with pooled connections
    pub peer_count: usize,
    /// Total pooled connections across all peers
    pub total_connections: usize,
}
403
/// Skip server certificate verification (for development only)
///
/// WARNING: this verifier accepts any certificate from any server and
/// provides no authentication whatsoever, enabling trivial
/// man-in-the-middle attacks. Never use it outside local development.
#[derive(Debug)]
struct SkipServerVerification;
407
impl rustls::client::danger::ServerCertVerifier for SkipServerVerification {
    // Unconditionally vouch for the certificate — no chain, name, expiry,
    // or revocation checks are performed. Development only.
    fn verify_server_cert(
        &self,
        _end_entity: &rustls::pki_types::CertificateDer<'_>,
        _intermediates: &[rustls::pki_types::CertificateDer<'_>],
        _server_name: &rustls::pki_types::ServerName<'_>,
        _ocsp_response: &[u8],
        _now: rustls::pki_types::UnixTime,
    ) -> std::result::Result<rustls::client::danger::ServerCertVerified, rustls::Error> {
        Ok(rustls::client::danger::ServerCertVerified::assertion())
    }

    // Accept any TLS 1.2 handshake signature without verifying it.
    fn verify_tls12_signature(
        &self,
        _message: &[u8],
        _cert: &rustls::pki_types::CertificateDer<'_>,
        _dss: &rustls::DigitallySignedStruct,
    ) -> std::result::Result<rustls::client::danger::HandshakeSignatureValid, rustls::Error> {
        Ok(rustls::client::danger::HandshakeSignatureValid::assertion())
    }

    // Accept any TLS 1.3 handshake signature without verifying it.
    fn verify_tls13_signature(
        &self,
        _message: &[u8],
        _cert: &rustls::pki_types::CertificateDer<'_>,
        _dss: &rustls::DigitallySignedStruct,
    ) -> std::result::Result<rustls::client::danger::HandshakeSignatureValid, rustls::Error> {
        Ok(rustls::client::danger::HandshakeSignatureValid::assertion())
    }

    // Advertise a broad scheme list so handshakes with any common server
    // key type proceed to the (no-op) verification above.
    fn supported_verify_schemes(&self) -> Vec<rustls::SignatureScheme> {
        vec![
            rustls::SignatureScheme::RSA_PKCS1_SHA256,
            rustls::SignatureScheme::ECDSA_NISTP256_SHA256,
            rustls::SignatureScheme::RSA_PKCS1_SHA384,
            rustls::SignatureScheme::ECDSA_NISTP384_SHA384,
            rustls::SignatureScheme::RSA_PKCS1_SHA512,
            rustls::SignatureScheme::ECDSA_NISTP521_SHA512,
            rustls::SignatureScheme::RSA_PSS_SHA256,
            rustls::SignatureScheme::RSA_PSS_SHA384,
            rustls::SignatureScheme::RSA_PSS_SHA512,
            rustls::SignatureScheme::ED25519,
        ]
    }
}
453
/// Stream handle for parallel block requests
///
/// Pairs the send and receive halves of one bidirectional QUIC stream so a
/// single request/response exchange can be driven through one object.
pub struct BlockStream {
    /// Outgoing half — the request is written here, then finished
    send: SendStream,
    /// Incoming half — the response is read from here
    recv: RecvStream,
}
459
460impl BlockStream {
461    /// Create a new block stream
462    pub fn new(send: SendStream, recv: RecvStream) -> Self {
463        Self { send, recv }
464    }
465
466    /// Send a block request
467    pub async fn send_request(&mut self, data: &[u8]) -> Result<()> {
468        self.send
469            .write_all(data)
470            .await
471            .map_err(|e| Error::Internal(format!("Failed to send request: {}", e)))?;
472        self.send
473            .finish()
474            .map_err(|e| Error::Internal(format!("Failed to finish stream: {}", e)))?;
475        Ok(())
476    }
477
478    /// Receive a block response
479    pub async fn receive_response(&mut self, max_size: usize) -> Result<Vec<u8>> {
480        self.recv
481            .read_to_end(max_size)
482            .await
483            .map_err(|e| Error::Internal(format!("Failed to receive response: {}", e)))
484    }
485
486    /// Send a block request using zero-copy Bytes
487    pub async fn send_request_zero_copy(&mut self, data: Bytes) -> Result<()> {
488        self.send
489            .write_all(&data)
490            .await
491            .map_err(|e| Error::Internal(format!("Failed to send request: {}", e)))?;
492        self.send
493            .finish()
494            .map_err(|e| Error::Internal(format!("Failed to finish stream: {}", e)))?;
495        Ok(())
496    }
497
498    /// Receive a block response as zero-copy Bytes
499    pub async fn receive_response_zero_copy(&mut self, max_size: usize) -> Result<Bytes> {
500        let data = self
501            .recv
502            .read_to_end(max_size)
503            .await
504            .map_err(|e| Error::Internal(format!("Failed to receive response: {}", e)))?;
505        Ok(Bytes::from(data))
506    }
507}
508
/// Parallel block request manager
///
/// Fans multiple request closures out over per-request streams on a single
/// QUIC connection, bounded by `max_concurrent`.
pub struct ParallelRequester {
    /// Connection all request streams are opened on
    connection: Connection,
    /// Upper bound on simultaneously executing requests
    max_concurrent: usize,
    /// Maximum message size - reserved for size validation
    #[allow(dead_code)]
    max_message_size: usize,
}
517
518impl ParallelRequester {
519    /// Create a new parallel requester
520    pub fn new(connection: Connection, max_concurrent: usize, max_message_size: usize) -> Self {
521        Self {
522            connection,
523            max_concurrent,
524            max_message_size,
525        }
526    }
527
528    /// Open a new stream for a request
529    pub async fn open_stream(&self) -> Result<BlockStream> {
530        let (send, recv) = self
531            .connection
532            .open_bi()
533            .await
534            .map_err(|e| Error::Internal(format!("Failed to open stream: {}", e)))?;
535        Ok(BlockStream::new(send, recv))
536    }
537
538    /// Execute multiple requests in parallel
539    pub async fn execute_parallel<F, Fut, T>(&self, requests: Vec<F>) -> Vec<Result<T>>
540    where
541        F: FnOnce(BlockStream) -> Fut,
542        Fut: std::future::Future<Output = Result<T>> + Send,
543        T: Send,
544    {
545        use futures::stream::{self, StreamExt};
546
547        let max_concurrent = self.max_concurrent;
548
549        stream::iter(requests)
550            .map(|request| async move {
551                let stream = self.open_stream().await?;
552                request(stream).await
553            })
554            .buffer_unordered(max_concurrent)
555            .collect()
556            .await
557    }
558
559    /// Get maximum concurrent streams
560    pub fn max_concurrent(&self) -> usize {
561        self.max_concurrent
562    }
563}
564
/// Adaptive batch size tuner
///
/// Dynamically adjusts batch sizes based on network performance and peer
/// capacity: grow the batch when measured throughput falls short of the
/// target, shrink it when throughput overshoots.
pub struct AdaptiveBatchTuner {
    /// Current batch size
    current_batch_size: usize,
    /// Minimum batch size
    min_batch_size: usize,
    /// Maximum batch size
    max_batch_size: usize,
    /// Recent completion times (in milliseconds), bounded by `window_size`
    completion_times: Vec<u64>,
    /// Window size for averaging
    window_size: usize,
    /// Target throughput (blocks/sec)
    target_throughput: f64,
    /// Last adjustment time
    last_adjustment: Instant,
    /// Adjustment cooldown
    adjustment_interval: Duration,
}

impl AdaptiveBatchTuner {
    /// Create a new adaptive batch tuner.
    ///
    /// `initial_batch_size` should lie within
    /// `[min_batch_size, max_batch_size]`; subsequent adjustments are
    /// clamped to that range. `target_throughput` is in blocks per second.
    pub fn new(
        initial_batch_size: usize,
        min_batch_size: usize,
        max_batch_size: usize,
        target_throughput: f64,
    ) -> Self {
        Self {
            current_batch_size: initial_batch_size,
            min_batch_size,
            max_batch_size,
            completion_times: Vec::new(),
            window_size: 10,
            target_throughput,
            last_adjustment: Instant::now(),
            adjustment_interval: Duration::from_secs(1),
        }
    }

    /// Record a batch completion time in milliseconds.
    ///
    /// Keeps only the most recent `window_size` samples.
    pub fn record_completion(&mut self, duration_ms: u64) {
        self.completion_times.push(duration_ms);
        if self.completion_times.len() > self.window_size {
            // Window is tiny (10), so O(n) front removal is fine.
            self.completion_times.remove(0);
        }
    }

    /// Get current batch size.
    pub fn current_batch_size(&self) -> usize {
        self.current_batch_size
    }

    /// Adjust batch size based on recent performance.
    ///
    /// A no-op (returns the current size) while inside the cooldown
    /// interval or with fewer than 3 samples. Otherwise compares measured
    /// throughput against the target: below 80% of target grows the batch
    /// ~20%, above 120% shrinks it ~20%, clamped to `[min, max]`. Samples
    /// are cleared after each adjustment so the next decision uses fresh
    /// data.
    pub fn adjust_batch_size(&mut self) -> usize {
        // Only adjust if enough time has passed
        if self.last_adjustment.elapsed() < self.adjustment_interval {
            return self.current_batch_size;
        }

        // Need at least a few samples to make a decision
        if self.completion_times.len() < 3 {
            return self.current_batch_size;
        }

        // Calculate average completion time
        let avg_time =
            self.completion_times.iter().sum::<u64>() as f64 / self.completion_times.len() as f64;

        // Calculate current throughput (blocks per second). If avg_time is
        // 0 (all samples sub-millisecond) this is +inf, which safely takes
        // the "too fast" branch below.
        let current_throughput = (self.current_batch_size as f64 / avg_time) * 1000.0;

        // Adjust batch size based on throughput
        let new_batch_size = if current_throughput < self.target_throughput * 0.8 {
            // Too slow, increase batch size. Guarantee progress of at least
            // +1: for sizes < 5 the 1.2x multiply truncates back to the
            // same value and growth would otherwise stall forever.
            ((self.current_batch_size as f64 * 1.2) as usize).max(self.current_batch_size + 1)
        } else if current_throughput > self.target_throughput * 1.2 {
            // Too fast, decrease batch size to reduce memory pressure
            (self.current_batch_size as f64 * 0.8) as usize
        } else {
            // Within acceptable range
            self.current_batch_size
        };

        // Clamp to min/max
        self.current_batch_size = new_batch_size.clamp(self.min_batch_size, self.max_batch_size);
        self.last_adjustment = Instant::now();
        self.completion_times.clear();

        self.current_batch_size
    }

    /// Reset tuner state (samples and cooldown timer).
    pub fn reset(&mut self) {
        self.completion_times.clear();
        self.last_adjustment = Instant::now();
    }
}

impl Default for AdaptiveBatchTuner {
    /// Defaults: batch size 32 in `[8, 128]`, targeting 100 blocks/sec.
    fn default() -> Self {
        Self::new(32, 8, 128, 100.0)
    }
}
671
/// Pipeline configuration
#[derive(Debug, Clone)]
pub struct PipelineConfig {
    /// Number of blocks to prefetch ahead of the current read position
    pub prefetch_depth: usize,
    /// Upper bound on simultaneously pipelined requests
    pub max_pipeline_size: usize,
    /// Whether speculative prefetching of upcoming blocks is enabled
    pub enable_speculation: bool,
}

impl Default for PipelineConfig {
    /// Defaults: prefetch 4 blocks ahead, cap the pipeline at 16 entries,
    /// speculation on.
    fn default() -> Self {
        PipelineConfig {
            enable_speculation: true,
            max_pipeline_size: 16,
            prefetch_depth: 4,
        }
    }
}
692
/// Pipelined block fetcher for sequential access
///
/// Implements request pipelining to reduce round-trip latency for
/// sequential block access: responses for upcoming indices are requested
/// speculatively in background tasks while the current one is consumed.
pub struct SequentialPipeline {
    /// QUIC connection all request streams are opened on
    connection: Connection,
    /// Pipeline configuration
    config: PipelineConfig,
    /// Maximum message size accepted per response
    max_message_size: usize,
    /// Active in-flight requests, keyed by block index; each value is the
    /// join handle of a spawned background request task
    in_flight: Arc<RwLock<HashMap<u64, tokio::task::JoinHandle<Result<Bytes>>>>>,
    /// Next block index to request (monotonically increasing)
    next_index: Arc<RwLock<u64>>,
}
708
impl SequentialPipeline {
    /// Create a new sequential pipeline over an established connection,
    /// starting at block index 0.
    pub fn new(connection: Connection, config: PipelineConfig, max_message_size: usize) -> Self {
        Self {
            connection,
            config,
            max_message_size,
            in_flight: Arc::new(RwLock::new(HashMap::new())),
            next_index: Arc::new(RwLock::new(0)),
        }
    }

    /// Start a pipelined request for a block index
    ///
    /// Spawns a background task that opens a fresh bidirectional stream,
    /// writes `request_data`, and reads the full response (bounded by
    /// `max_message_size`). The task handle is stored in `in_flight` keyed
    /// by `index` so `fetch_next` can await it later.
    ///
    /// NOTE(review): if an entry for `index` already exists, `insert`
    /// replaces it and the previous task keeps running detached — callers
    /// are expected to check `in_flight` first; confirm that invariant.
    async fn start_request(&self, index: u64, request_data: Bytes) -> Result<()> {
        let connection = self.connection.clone();
        let max_size = self.max_message_size;

        let handle = tokio::spawn(async move {
            let (mut send, mut recv) = connection
                .open_bi()
                .await
                .map_err(|e| Error::Internal(format!("Failed to open stream: {}", e)))?;

            // Send request
            send.write_all(&request_data)
                .await
                .map_err(|e| Error::Internal(format!("Failed to send: {}", e)))?;
            send.finish()
                .map_err(|e| Error::Internal(format!("Failed to finish: {}", e)))?;

            // Receive response
            let data = recv
                .read_to_end(max_size)
                .await
                .map_err(|e| Error::Internal(format!("Failed to receive: {}", e)))?;

            Ok(Bytes::from(data))
        });

        let mut in_flight = self.in_flight.write().await;
        in_flight.insert(index, handle);

        Ok(())
    }

    /// Fetch the next block in sequence
    ///
    /// Atomically claims the next index, optionally kicks off speculative
    /// prefetches for the following `prefetch_depth` indices, then awaits
    /// the response for the claimed index.
    ///
    /// NOTE(review): speculative prefetches reuse the caller's
    /// `request_data` verbatim for every future index (see inline comment)
    /// — presumably a placeholder until per-index request encoding exists;
    /// verify before relying on prefetched results.
    pub async fn fetch_next(&self, request_data: Bytes) -> Result<Bytes> {
        // Claim the next sequential index under the write lock.
        let current_index = {
            let mut next = self.next_index.write().await;
            let current = *next;
            *next += 1;
            current
        };

        // Start prefetch requests for upcoming blocks
        if self.config.enable_speculation {
            for i in 1..=self.config.prefetch_depth {
                let prefetch_index = current_index + i as u64;

                // Check if already in flight
                let in_flight = self.in_flight.read().await;
                if !in_flight.contains_key(&prefetch_index) {
                    // Release the read lock before start_request takes the
                    // write lock on the same map.
                    drop(in_flight);

                    // Start speculative request (with same data for now)
                    let _ = self
                        .start_request(prefetch_index, request_data.clone())
                        .await;
                }
            }
        }

        // Wait for current block
        let handle = {
            let mut in_flight = self.in_flight.write().await;

            // If not already started, start now
            if !in_flight.contains_key(&current_index) {
                // Drop and reacquire: start_request needs the write lock too.
                drop(in_flight);
                self.start_request(current_index, request_data).await?;
                let mut in_flight = self.in_flight.write().await;
                in_flight.remove(&current_index)
            } else {
                in_flight.remove(&current_index)
            }
        };

        if let Some(handle) = handle {
            // Outer error: the spawned task panicked or was aborted.
            // Inner Result: outcome of the request itself.
            handle
                .await
                .map_err(|e| Error::Internal(format!("Task failed: {}", e)))?
        } else {
            Err(Error::Internal("Request handle not found".to_string()))
        }
    }

    /// Clear all in-flight requests
    ///
    /// Aborts every outstanding background task and empties the map.
    pub async fn clear(&self) {
        let mut in_flight = self.in_flight.write().await;
        for (_, handle) in in_flight.drain() {
            handle.abort();
        }
    }

    /// Get number of in-flight requests
    pub async fn in_flight_count(&self) -> usize {
        self.in_flight.read().await.len()
    }
}
818
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_quic_config_defaults() {
        let config = QuicConfig::default();
        assert_eq!(config.max_streams, 256);
        assert!(config.enable_0rtt);
        assert_eq!(config.pool_size, 4);
    }

    #[test]
    fn test_peer_pool() {
        // Note: Full integration tests would require actual QUIC connections
        let pool = PeerPool::new(4, Duration::from_secs(60));
        assert_eq!(pool.connection_count(), 0);
    }

    #[test]
    fn test_pipeline_config_defaults() {
        let config = PipelineConfig::default();
        assert_eq!(config.prefetch_depth, 4);
        assert_eq!(config.max_pipeline_size, 16);
        assert!(config.enable_speculation);
    }

    #[test]
    fn test_adaptive_batch_tuner_defaults() {
        let tuner = AdaptiveBatchTuner::default();
        assert_eq!(tuner.current_batch_size(), 32);
    }

    #[test]
    fn test_adaptive_batch_tuner_cooldown() {
        let mut tuner = AdaptiveBatchTuner::new(16, 4, 64, 50.0);
        tuner.record_completion(100);
        tuner.record_completion(120);
        tuner.record_completion(80);
        // Adjustment is rate-limited; within the cooldown it must be a no-op.
        assert_eq!(tuner.adjust_batch_size(), 16);
        tuner.reset();
        assert_eq!(tuner.current_batch_size(), 16);
    }
}