chie_core/
connection_multiplexing.rs

1//! Connection multiplexing for coordinator communication.
2//!
3//! This module provides connection pooling and multiplexing capabilities for
4//! efficient HTTP communication with the coordinator. It reduces connection
5//! overhead by reusing existing connections and queuing requests.
6//!
7//! # Features
8//!
9//! - Connection pooling with configurable size limits
10//! - Automatic connection health monitoring
11//! - Request queuing and fair scheduling
12//! - Exponential backoff retry logic
13//! - Connection keep-alive management
//! - Internal retry handling for failed connections (no circuit breaker integration)
15//!
16//! # Example
17//!
18//! ```rust
19//! use chie_core::connection_multiplexing::{ConnectionPool, PoolConfig};
20//! use std::time::Duration;
21//!
22//! # async fn example() -> Result<(), Box<dyn std::error::Error>> {
23//! // Configure connection pool
24//! let config = PoolConfig::default()
25//!     .with_max_connections(10)
26//!     .with_idle_timeout(Duration::from_secs(60));
27//!
28//! // Create pool
29//! let pool = ConnectionPool::new("https://coordinator.example.com", config);
30//!
31//! // Make requests through the pool
32//! let response = pool.request("POST", "/api/proofs", b"proof_data").await?;
33//! # Ok(())
34//! # }
35//! ```
36
37use std::collections::VecDeque;
38use std::sync::Arc;
39use std::time::{Duration, Instant};
40use tokio::sync::{Mutex, RwLock, Semaphore};
41use tokio::time::sleep;
42
43// Circuit breaker integration removed for simplicity
44// Connection pool handles retries and failures internally
45
/// Configuration for the connection pool.
///
/// Start from [`PoolConfig::default`] (or [`PoolConfig::new`]) and refine the
/// values with the chainable `with_*` setters. Every field can be set through
/// a setter and read back through a getter of the same name.
#[derive(Debug, Clone)]
pub struct PoolConfig {
    /// Maximum number of concurrent connections.
    max_connections: usize,
    /// Idle timeout before closing unused connections.
    idle_timeout: Duration,
    /// Connection timeout for establishing new connections.
    connect_timeout: Duration,
    /// Request timeout for individual requests.
    request_timeout: Duration,
    /// Maximum number of retries for failed requests.
    max_retries: usize,
    /// Base delay for exponential backoff.
    retry_base_delay: Duration,
    /// Maximum delay for exponential backoff.
    retry_max_delay: Duration,
    /// Enable TCP keep-alive.
    tcp_keepalive: bool,
    /// TCP keep-alive interval (only relevant when `tcp_keepalive` is enabled).
    tcp_keepalive_interval: Duration,
}

impl Default for PoolConfig {
    /// Returns the default configuration: 10 connections, 60 s idle timeout,
    /// 10 s connect timeout, 30 s request timeout, 3 retries with a backoff
    /// between 100 ms and 30 s, and TCP keep-alive enabled at 60 s.
    #[inline]
    fn default() -> Self {
        Self {
            max_connections: 10,
            idle_timeout: Duration::from_secs(60),
            connect_timeout: Duration::from_secs(10),
            request_timeout: Duration::from_secs(30),
            max_retries: 3,
            retry_base_delay: Duration::from_millis(100),
            retry_max_delay: Duration::from_secs(30),
            tcp_keepalive: true,
            tcp_keepalive_interval: Duration::from_secs(60),
        }
    }
}

impl PoolConfig {
    /// Creates a new pool configuration with default values.
    #[must_use]
    #[inline]
    pub fn new() -> Self {
        Self::default()
    }

    /// Sets the maximum number of concurrent connections.
    #[must_use]
    #[inline]
    pub fn with_max_connections(mut self, max: usize) -> Self {
        self.max_connections = max;
        self
    }

    /// Sets the idle timeout for connections.
    #[must_use]
    #[inline]
    pub fn with_idle_timeout(mut self, timeout: Duration) -> Self {
        self.idle_timeout = timeout;
        self
    }

    /// Sets the connection timeout.
    #[must_use]
    #[inline]
    pub fn with_connect_timeout(mut self, timeout: Duration) -> Self {
        self.connect_timeout = timeout;
        self
    }

    /// Sets the request timeout.
    #[must_use]
    #[inline]
    pub fn with_request_timeout(mut self, timeout: Duration) -> Self {
        self.request_timeout = timeout;
        self
    }

    /// Sets the maximum number of retries.
    #[must_use]
    #[inline]
    pub fn with_max_retries(mut self, retries: usize) -> Self {
        self.max_retries = retries;
        self
    }

    /// Sets the retry base delay.
    #[must_use]
    #[inline]
    pub fn with_retry_base_delay(mut self, delay: Duration) -> Self {
        self.retry_base_delay = delay;
        self
    }

    /// Sets the upper bound applied to the exponential backoff delay.
    #[must_use]
    #[inline]
    pub fn with_retry_max_delay(mut self, delay: Duration) -> Self {
        self.retry_max_delay = delay;
        self
    }

    /// Enables or disables TCP keep-alive.
    #[must_use]
    #[inline]
    pub fn with_tcp_keepalive(mut self, enabled: bool) -> Self {
        self.tcp_keepalive = enabled;
        self
    }

    /// Sets the TCP keep-alive interval.
    ///
    /// The interval is stored regardless of whether keep-alive is enabled.
    #[must_use]
    #[inline]
    pub fn with_tcp_keepalive_interval(mut self, interval: Duration) -> Self {
        self.tcp_keepalive_interval = interval;
        self
    }

    /// Gets the maximum number of connections.
    #[must_use]
    #[inline]
    pub const fn max_connections(&self) -> usize {
        self.max_connections
    }

    /// Gets the idle timeout.
    #[must_use]
    #[inline]
    pub const fn idle_timeout(&self) -> Duration {
        self.idle_timeout
    }

    /// Gets the connect timeout.
    #[must_use]
    #[inline]
    pub const fn connect_timeout(&self) -> Duration {
        self.connect_timeout
    }

    /// Gets the request timeout.
    #[must_use]
    #[inline]
    pub const fn request_timeout(&self) -> Duration {
        self.request_timeout
    }

    /// Gets the maximum retries.
    #[must_use]
    #[inline]
    pub const fn max_retries(&self) -> usize {
        self.max_retries
    }

    /// Gets the base delay used for exponential backoff.
    #[must_use]
    #[inline]
    pub const fn retry_base_delay(&self) -> Duration {
        self.retry_base_delay
    }

    /// Gets the maximum delay used for exponential backoff.
    #[must_use]
    #[inline]
    pub const fn retry_max_delay(&self) -> Duration {
        self.retry_max_delay
    }

    /// Returns whether TCP keep-alive is enabled.
    #[must_use]
    #[inline]
    pub const fn tcp_keepalive(&self) -> bool {
        self.tcp_keepalive
    }

    /// Gets the TCP keep-alive interval.
    #[must_use]
    #[inline]
    pub const fn tcp_keepalive_interval(&self) -> Duration {
        self.tcp_keepalive_interval
    }
}
185
186/// Represents a pooled HTTP connection.
187#[derive(Debug)]
188struct PooledConnection {
189    /// Connection ID for tracking.
190    #[allow(dead_code)]
191    id: usize,
192    /// Last time this connection was used.
193    last_used: Instant,
194    /// Number of requests served by this connection.
195    requests_served: u64,
196    /// Whether the connection is currently in use.
197    in_use: bool,
198    /// HTTP client for this connection (reqwest reuses connections internally).
199    client: reqwest::Client,
200}
201
202impl PooledConnection {
203    /// Creates a new pooled connection.
204    #[must_use]
205    fn new(id: usize, config: &PoolConfig) -> Self {
206        let mut builder = reqwest::Client::builder()
207            .timeout(config.request_timeout)
208            .connect_timeout(config.connect_timeout)
209            .pool_max_idle_per_host(1)
210            .pool_idle_timeout(config.idle_timeout);
211
212        if config.tcp_keepalive {
213            builder = builder.tcp_keepalive(Some(config.tcp_keepalive_interval));
214        }
215
216        let client = builder.build().unwrap_or_else(|_| reqwest::Client::new());
217
218        Self {
219            id,
220            last_used: Instant::now(),
221            requests_served: 0,
222            in_use: false,
223            client,
224        }
225    }
226
227    /// Marks the connection as used.
228    #[inline]
229    fn mark_used(&mut self) {
230        self.last_used = Instant::now();
231        self.requests_served += 1;
232        self.in_use = true;
233    }
234
235    /// Marks the connection as released.
236    #[inline]
237    fn release(&mut self) {
238        self.in_use = false;
239        self.last_used = Instant::now();
240    }
241
242    /// Checks if the connection is idle for longer than the timeout.
243    #[must_use]
244    #[inline]
245    fn is_idle(&self, idle_timeout: Duration) -> bool {
246        !self.in_use && self.last_used.elapsed() > idle_timeout
247    }
248
249    /// Gets the connection ID.
250    #[must_use]
251    #[inline]
252    #[allow(dead_code)]
253    const fn id(&self) -> usize {
254        self.id
255    }
256
257    /// Gets the number of requests served.
258    #[must_use]
259    #[inline]
260    #[allow(dead_code)]
261    const fn requests_served(&self) -> u64 {
262        self.requests_served
263    }
264}
265
/// Counters describing the activity of a connection pool.
///
/// All values start at zero (see the derived `Default`).
#[derive(Debug, Clone, Default)]
pub struct PoolStats {
    /// Requests submitted to the pool so far.
    pub total_requests: u64,
    /// Requests that completed successfully.
    pub successful_requests: u64,
    /// Requests that ultimately failed.
    pub failed_requests: u64,
    /// Retry attempts performed.
    pub retried_requests: u64,
    /// Connections currently active.
    pub active_connections: usize,
    /// Connections currently idle.
    pub idle_connections: usize,
    /// Connections created over the lifetime of the pool.
    pub total_connections_created: u64,
    /// Connections closed over the lifetime of the pool.
    pub total_connections_closed: u64,
    /// Mean number of requests per created connection.
    pub avg_requests_per_connection: f64,
}
288
289/// HTTP connection pool with multiplexing support.
290pub struct ConnectionPool {
291    /// Base URL for the coordinator.
292    base_url: String,
293    /// Pool configuration.
294    config: PoolConfig,
295    /// Available connections.
296    connections: Arc<RwLock<Vec<PooledConnection>>>,
297    /// Request queue for when all connections are busy.
298    #[allow(dead_code)]
299    request_queue: Arc<Mutex<VecDeque<PendingRequest>>>,
300    /// Semaphore to limit concurrent connections.
301    connection_semaphore: Arc<Semaphore>,
302    /// Pool statistics.
303    stats: Arc<RwLock<PoolStats>>,
304    /// Next connection ID.
305    next_connection_id: Arc<Mutex<usize>>,
306}
307
308/// A pending request in the queue.
309#[derive(Debug)]
310#[allow(dead_code)]
311struct PendingRequest {
312    method: String,
313    path: String,
314    body: Vec<u8>,
315    response_tx: tokio::sync::oneshot::Sender<Result<Vec<u8>, ConnectionError>>,
316}
317
318/// Errors that can occur during connection pool operations.
319#[derive(Debug, Clone, thiserror::Error)]
320pub enum ConnectionError {
321    /// Request timeout.
322    #[error("Request timed out")]
323    Timeout,
324    /// Connection failed.
325    #[error("Connection failed: {0}")]
326    ConnectionFailed(String),
327    /// Request failed after all retries.
328    #[error("Request failed after {0} retries")]
329    RetriesExhausted(usize),
330    /// Invalid URL or configuration.
331    #[error("Invalid configuration: {0}")]
332    InvalidConfig(String),
333    /// HTTP error.
334    #[error("HTTP error: {status}")]
335    HttpError { status: u16 },
336    /// Response channel closed.
337    #[error("Response channel closed")]
338    ChannelClosed,
339}
340
341impl ConnectionPool {
342    /// Creates a new connection pool.
343    #[must_use]
344    pub fn new(base_url: impl Into<String>, config: PoolConfig) -> Self {
345        let max_connections = config.max_connections;
346
347        Self {
348            base_url: base_url.into(),
349            config,
350            connections: Arc::new(RwLock::new(Vec::new())),
351            request_queue: Arc::new(Mutex::new(VecDeque::new())),
352            connection_semaphore: Arc::new(Semaphore::new(max_connections)),
353            stats: Arc::new(RwLock::new(PoolStats::default())),
354            next_connection_id: Arc::new(Mutex::new(0)),
355        }
356    }
357
358    /// Makes an HTTP request through the connection pool.
359    pub async fn request(
360        &self,
361        method: &str,
362        path: &str,
363        body: &[u8],
364    ) -> Result<Vec<u8>, ConnectionError> {
365        // Update stats
366        {
367            let mut stats = self.stats.write().await;
368            stats.total_requests += 1;
369        }
370
371        // Try to execute the request with retries
372        let mut attempts = 0;
373        let mut last_error = None;
374
375        while attempts <= self.config.max_retries {
376            if attempts > 0 {
377                // Update retry stats
378                let mut stats = self.stats.write().await;
379                stats.retried_requests += 1;
380
381                // Calculate exponential backoff delay
382                let delay = self.calculate_backoff_delay(attempts);
383                sleep(delay).await;
384            }
385
386            match self.execute_request(method, path, body).await {
387                Ok(response) => {
388                    // Success - record it
389                    let mut stats = self.stats.write().await;
390                    stats.successful_requests += 1;
391
392                    return Ok(response);
393                }
394                Err(e) => {
395                    last_error = Some(e.clone());
396
397                    // Check if we should retry
398                    if !self.should_retry(&e) {
399                        break;
400                    }
401
402                    attempts += 1;
403                }
404            }
405        }
406
407        // All retries exhausted
408        let mut stats = self.stats.write().await;
409        stats.failed_requests += 1;
410
411        Err(last_error.unwrap_or(ConnectionError::RetriesExhausted(attempts)))
412    }
413
414    /// Executes a single request attempt.
415    async fn execute_request(
416        &self,
417        method: &str,
418        path: &str,
419        body: &[u8],
420    ) -> Result<Vec<u8>, ConnectionError> {
421        // Acquire a connection from the pool or create a new one
422        let connection = self.acquire_connection().await?;
423
424        // Build the full URL
425        let url = format!("{}{}", self.base_url, path);
426
427        // Execute the request
428        let result = tokio::time::timeout(
429            self.config.request_timeout,
430            connection
431                .client
432                .request(method.parse().unwrap_or(reqwest::Method::GET), &url)
433                .body(body.to_vec())
434                .send(),
435        )
436        .await;
437
438        // Release the connection back to the pool
439        self.release_connection(connection).await;
440
441        // Process the result
442        match result {
443            Ok(Ok(response)) => {
444                let status = response.status();
445                if status.is_success() {
446                    response
447                        .bytes()
448                        .await
449                        .map(|b| b.to_vec())
450                        .map_err(|e| ConnectionError::ConnectionFailed(e.to_string()))
451                } else {
452                    Err(ConnectionError::HttpError {
453                        status: status.as_u16(),
454                    })
455                }
456            }
457            Ok(Err(e)) => Err(ConnectionError::ConnectionFailed(e.to_string())),
458            Err(_) => Err(ConnectionError::Timeout),
459        }
460    }
461
462    /// Acquires a connection from the pool.
463    async fn acquire_connection(&self) -> Result<PooledConnection, ConnectionError> {
464        // Try to get an existing idle connection
465        {
466            let mut connections = self.connections.write().await;
467            if let Some(pos) = connections.iter().position(|c| !c.in_use) {
468                connections[pos].mark_used();
469                return Ok(connections.remove(pos));
470            }
471        }
472
473        // No idle connection available, try to create a new one
474        if let Ok(_permit) = self.connection_semaphore.try_acquire() {
475            let mut next_id = self.next_connection_id.lock().await;
476            let id = *next_id;
477            *next_id += 1;
478
479            let connection = PooledConnection::new(id, &self.config);
480
481            let mut stats = self.stats.write().await;
482            stats.total_connections_created += 1;
483            stats.active_connections += 1;
484
485            return Ok(connection);
486        }
487
488        // Pool is full, wait for a connection to become available
489        let _permit = self
490            .connection_semaphore
491            .acquire()
492            .await
493            .map_err(|_| ConnectionError::InvalidConfig("Semaphore closed".to_string()))?;
494
495        let mut next_id = self.next_connection_id.lock().await;
496        let id = *next_id;
497        *next_id += 1;
498
499        let connection = PooledConnection::new(id, &self.config);
500
501        let mut stats = self.stats.write().await;
502        stats.total_connections_created += 1;
503        stats.active_connections += 1;
504
505        Ok(connection)
506    }
507
508    /// Releases a connection back to the pool.
509    async fn release_connection(&self, mut connection: PooledConnection) {
510        connection.release();
511
512        // Check if the connection should be closed due to idle timeout
513        if connection.is_idle(self.config.idle_timeout) {
514            let mut stats = self.stats.write().await;
515            stats.total_connections_closed += 1;
516            stats.active_connections = stats.active_connections.saturating_sub(1);
517            return;
518        }
519
520        // Add back to the pool
521        let mut connections = self.connections.write().await;
522        connections.push(connection);
523    }
524
525    /// Calculates the backoff delay for a retry attempt.
526    #[must_use]
527    #[inline]
528    fn calculate_backoff_delay(&self, attempt: usize) -> Duration {
529        let delay_ms = self.config.retry_base_delay.as_millis() as u64 * 2u64.pow(attempt as u32);
530        let delay = Duration::from_millis(delay_ms);
531        delay.min(self.config.retry_max_delay)
532    }
533
534    /// Checks if an error should be retried.
535    #[must_use]
536    #[inline]
537    fn should_retry(&self, error: &ConnectionError) -> bool {
538        matches!(
539            error,
540            ConnectionError::Timeout | ConnectionError::ConnectionFailed(_)
541        )
542    }
543
544    /// Gets the current pool statistics.
545    pub async fn stats(&self) -> PoolStats {
546        let mut stats = self.stats.read().await.clone();
547
548        // Update active/idle connection counts
549        let connections = self.connections.read().await;
550        stats.active_connections = connections.iter().filter(|c| c.in_use).count();
551        stats.idle_connections = connections.iter().filter(|c| !c.in_use).count();
552
553        // Calculate average requests per connection
554        if stats.total_connections_created > 0 {
555            stats.avg_requests_per_connection =
556                stats.total_requests as f64 / stats.total_connections_created as f64;
557        }
558
559        stats
560    }
561
562    /// Closes all idle connections.
563    pub async fn close_idle_connections(&self) {
564        let mut connections = self.connections.write().await;
565        let idle_timeout = self.config.idle_timeout;
566
567        let closed_count = connections
568            .iter()
569            .filter(|conn| conn.is_idle(idle_timeout))
570            .count();
571
572        connections.retain(|conn| !conn.is_idle(idle_timeout));
573
574        // Update stats
575        if closed_count > 0 {
576            let mut stats = self.stats.write().await;
577            stats.total_connections_closed += closed_count as u64;
578            stats.active_connections = stats.active_connections.saturating_sub(closed_count);
579        }
580    }
581
582    /// Gets the pool configuration.
583    #[must_use]
584    #[inline]
585    pub fn config(&self) -> &PoolConfig {
586        &self.config
587    }
588
589    /// Gets the base URL.
590    #[must_use]
591    #[inline]
592    pub fn base_url(&self) -> &str {
593        &self.base_url
594    }
595}
596
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a pool with default settings pointing at a dummy local URL.
    fn default_pool() -> ConnectionPool {
        ConnectionPool::new("http://localhost:8080", PoolConfig::default())
    }

    #[test]
    fn test_pool_config_default() {
        let config = PoolConfig::default();
        assert_eq!(config.max_connections(), 10);
        assert_eq!(config.idle_timeout(), Duration::from_secs(60));
        assert_eq!(config.connect_timeout(), Duration::from_secs(10));
        assert_eq!(config.request_timeout(), Duration::from_secs(30));
        assert_eq!(config.max_retries(), 3);
    }

    #[test]
    fn test_pool_config_builder() {
        let config = PoolConfig::new()
            .with_max_connections(20)
            .with_idle_timeout(Duration::from_secs(120))
            .with_connect_timeout(Duration::from_secs(5))
            .with_request_timeout(Duration::from_secs(60))
            .with_max_retries(5)
            .with_retry_base_delay(Duration::from_millis(50))
            .with_tcp_keepalive(false);

        assert_eq!(config.max_connections(), 20);
        assert_eq!(config.idle_timeout(), Duration::from_secs(120));
        assert_eq!(config.connect_timeout(), Duration::from_secs(5));
        assert_eq!(config.request_timeout(), Duration::from_secs(60));
        assert_eq!(config.max_retries(), 5);
        // These two settings have no assertion-friendly getter in every
        // revision of the config, so the private fields are checked directly.
        assert_eq!(config.retry_base_delay, Duration::from_millis(50));
        assert!(!config.tcp_keepalive);
    }

    #[test]
    fn test_pooled_connection_creation() {
        let config = PoolConfig::default();
        let conn = PooledConnection::new(0, &config);
        assert_eq!(conn.id(), 0);
        assert_eq!(conn.requests_served(), 0);
        assert!(!conn.in_use);
    }

    #[test]
    fn test_pooled_connection_mark_used() {
        let config = PoolConfig::default();
        let mut conn = PooledConnection::new(0, &config);
        conn.mark_used();
        assert!(conn.in_use);
        assert_eq!(conn.requests_served(), 1);
    }

    #[test]
    fn test_pooled_connection_release() {
        let config = PoolConfig::default();
        let mut conn = PooledConnection::new(0, &config);
        conn.mark_used();
        conn.release();
        assert!(!conn.in_use);
        assert_eq!(conn.requests_served(), 1);
    }

    #[test]
    fn test_pooled_connection_idle() {
        let config = PoolConfig::default();
        let mut conn = PooledConnection::new(0, &config);

        // A generous window keeps this independent of scheduler hiccups.
        assert!(!conn.is_idle(Duration::from_secs(60)));

        // A connection that is in use is never idle, whatever the timeout.
        conn.mark_used();
        assert!(!conn.is_idle(Duration::from_secs(0)));
    }

    #[test]
    fn test_calculate_backoff_delay() {
        let pool = default_pool();

        assert_eq!(pool.calculate_backoff_delay(0), Duration::from_millis(100)); // 100 * 2^0
        assert_eq!(pool.calculate_backoff_delay(1), Duration::from_millis(200)); // 100 * 2^1
        assert_eq!(pool.calculate_backoff_delay(2), Duration::from_millis(400)); // 100 * 2^2
    }

    #[test]
    fn test_calculate_backoff_delay_is_capped() {
        let pool = default_pool();

        // 100 ms * 2^20 is far beyond the 30 s default cap.
        assert_eq!(pool.calculate_backoff_delay(20), Duration::from_secs(30));
    }

    #[test]
    fn test_should_retry() {
        let pool = default_pool();

        assert!(pool.should_retry(&ConnectionError::Timeout));
        assert!(pool.should_retry(&ConnectionError::ConnectionFailed("test".to_string())));

        assert!(!pool.should_retry(&ConnectionError::HttpError { status: 400 }));
        assert!(!pool.should_retry(&ConnectionError::RetriesExhausted(3)));
        assert!(!pool.should_retry(&ConnectionError::InvalidConfig("test".to_string())));
        assert!(!pool.should_retry(&ConnectionError::ChannelClosed));
    }

    #[test]
    fn test_pool_creation() {
        let pool = default_pool();
        assert_eq!(pool.base_url(), "http://localhost:8080");
        assert_eq!(pool.config().max_connections(), 10);
    }

    #[tokio::test]
    async fn test_pool_stats_initial() {
        let pool = default_pool();
        let stats = pool.stats().await;
        assert_eq!(stats.total_requests, 0);
        assert_eq!(stats.successful_requests, 0);
        assert_eq!(stats.failed_requests, 0);
        assert_eq!(stats.active_connections, 0);
        assert_eq!(stats.idle_connections, 0);
    }

    #[tokio::test]
    async fn test_pool_close_idle_connections() {
        let config = PoolConfig::default().with_idle_timeout(Duration::from_millis(10));
        let pool = ConnectionPool::new("http://localhost:8080", config);

        // Initially no connections
        let stats = pool.stats().await;
        assert_eq!(stats.idle_connections, 0);

        // Closing idle connections on an empty pool must be a no-op.
        pool.close_idle_connections().await;

        let stats = pool.stats().await;
        assert_eq!(stats.idle_connections, 0);
        assert_eq!(stats.total_connections_closed, 0);
    }
}