redis_oxide/
pool_optimized.rs

1//! Optimized connection pooling implementations
2//!
3//! This module provides optimized connection pooling strategies with:
4//! - Multiple worker tasks for multiplexed pools
5//! - Lock-free connection management
6//! - Connection health monitoring
7//! - Adaptive pool sizing
8
9#![allow(unused_variables)]
10#![allow(dead_code)]
11#![allow(missing_docs)]
12
13use crate::connection::RedisConnection;
14use crate::core::{
15    config::{ConnectionConfig, PoolStrategy},
16    error::{RedisError, RedisResult},
17    value::RespValue,
18};
19use std::sync::{
20    atomic::{AtomicBool, AtomicUsize, Ordering},
21    Arc,
22};
23use tokio::sync::{mpsc, Mutex, RwLock, Semaphore};
24use tokio::time::{Duration, Instant};
25use tracing::{debug, error, info, warn};
26
27/// Request to execute a command through the optimized multiplexed connection
28#[derive(Debug)]
29struct OptimizedCommandRequest {
30    command: String,
31    args: Vec<RespValue>,
32    response_tx: tokio::sync::oneshot::Sender<RedisResult<RespValue>>,
33    timestamp: Instant,
34}
35
36/// Statistics for monitoring pool performance
37#[derive(Debug, Clone)]
38pub struct PoolStats {
39    pub active_connections: usize,
40    pub pending_requests: usize,
41    pub total_requests: u64,
42    pub failed_requests: u64,
43    pub average_response_time_ms: f64,
44    pub worker_count: usize,
45}
46
47/// Optimized multiplexed connection pool with multiple workers
48pub struct OptimizedMultiplexedPool {
49    command_tx: mpsc::UnboundedSender<OptimizedCommandRequest>,
50    worker_count: Arc<AtomicUsize>,
51    stats: Arc<RwLock<PoolStats>>,
52    shutdown: Arc<AtomicBool>,
53    config: ConnectionConfig,
54    host: String,
55    port: u16,
56}
57
58impl OptimizedMultiplexedPool {
59    /// Create a new optimized multiplexed pool
60    pub async fn new(config: ConnectionConfig, host: String, port: u16) -> RedisResult<Self> {
61        let (command_tx, command_rx) = mpsc::unbounded_channel::<OptimizedCommandRequest>();
62        let command_rx = Arc::new(Mutex::new(command_rx));
63
64        let worker_count = Arc::new(AtomicUsize::new(0));
65        let stats = Arc::new(RwLock::new(PoolStats {
66            active_connections: 0,
67            pending_requests: 0,
68            total_requests: 0,
69            failed_requests: 0,
70            average_response_time_ms: 0.0,
71            worker_count: 0,
72        }));
73        let shutdown = Arc::new(AtomicBool::new(false));
74
75        let pool = Self {
76            command_tx,
77            worker_count: worker_count.clone(),
78            stats: stats.clone(),
79            shutdown: shutdown.clone(),
80            config: config.clone(),
81            host: host.clone(),
82            port,
83        };
84
85        // Start initial worker
86        pool.spawn_worker(command_rx.clone()).await?;
87
88        // Start monitoring task for adaptive scaling
89        pool.start_monitoring_task().await;
90
91        Ok(pool)
92    }
93
94    /// Spawn a new worker task
95    async fn spawn_worker(
96        &self,
97        command_rx: Arc<Mutex<mpsc::UnboundedReceiver<OptimizedCommandRequest>>>,
98    ) -> RedisResult<()> {
99        let config = self.config.clone();
100        let host = self.host.clone();
101        let port = self.port;
102        let worker_count = self.worker_count.clone();
103        let stats = self.stats.clone();
104        let shutdown = self.shutdown.clone();
105
106        tokio::spawn(async move {
107            let worker_id = worker_count.fetch_add(1, Ordering::SeqCst);
108            debug!("Starting optimized worker {}", worker_id);
109
110            // Create connection with retry logic
111            let mut conn = match Self::create_connection_with_retry(&host, port, &config).await {
112                Ok(conn) => conn,
113                Err(e) => {
114                    error!(
115                        "Failed to create connection for worker {}: {:?}",
116                        worker_id, e
117                    );
118                    worker_count.fetch_sub(1, Ordering::SeqCst);
119                    return;
120                }
121            };
122
123            // Update stats
124            {
125                let mut stats_guard = stats.write().await;
126                stats_guard.active_connections += 1;
127                stats_guard.worker_count = worker_count.load(Ordering::SeqCst);
128            }
129
130            let mut last_health_check = Instant::now();
131            let health_check_interval = Duration::from_secs(30);
132
133            while !shutdown.load(Ordering::SeqCst) {
134                // Health check
135                if last_health_check.elapsed() > health_check_interval {
136                    if let Err(e) = Self::health_check(&mut conn).await {
137                        warn!("Health check failed for worker {}: {:?}", worker_id, e);
138                        // Try to reconnect
139                        match Self::create_connection_with_retry(&host, port, &config).await {
140                            Ok(new_conn) => {
141                                conn = new_conn;
142                                info!("Reconnected worker {}", worker_id);
143                            }
144                            Err(e) => {
145                                error!("Failed to reconnect worker {}: {:?}", worker_id, e);
146                                break;
147                            }
148                        }
149                    }
150                    last_health_check = Instant::now();
151                }
152
153                // Process requests with timeout
154                let request = {
155                    let mut rx = command_rx.lock().await;
156                    tokio::time::timeout(Duration::from_millis(100), rx.recv()).await
157                };
158
159                match request {
160                    Ok(Some(req)) => {
161                        let start_time = Instant::now();
162                        let result = conn.execute_command(&req.command, &req.args).await;
163                        let response_time = start_time.elapsed();
164
165                        // Update stats
166                        {
167                            let mut stats_guard = stats.write().await;
168                            stats_guard.total_requests += 1;
169                            if result.is_err() {
170                                stats_guard.failed_requests += 1;
171                            }
172                            // Update average response time (simple moving average)
173                            let current_avg = stats_guard.average_response_time_ms;
174                            let new_time = response_time.as_millis() as f64;
175                            stats_guard.average_response_time_ms =
176                                (current_avg * 0.9) + (new_time * 0.1);
177                        }
178
179                        // Send response (ignore send errors - client may have dropped)
180                        let _ = req.response_tx.send(result);
181                    }
182                    Ok(None) => {
183                        // Channel closed
184                        break;
185                    }
186                    Err(_) => {
187                        // Timeout - continue to next iteration
188                    }
189                }
190            }
191
192            // Update stats on worker exit
193            {
194                let mut stats_guard = stats.write().await;
195                stats_guard.active_connections = stats_guard.active_connections.saturating_sub(1);
196                stats_guard.worker_count = worker_count
197                    .fetch_sub(1, Ordering::SeqCst)
198                    .saturating_sub(1);
199            }
200
201            debug!("Optimized worker {} stopped", worker_id);
202        });
203
204        Ok(())
205    }
206
207    /// Create connection with retry logic
208    async fn create_connection_with_retry(
209        host: &str,
210        port: u16,
211        config: &ConnectionConfig,
212    ) -> RedisResult<RedisConnection> {
213        let mut attempts = 0;
214        let max_attempts = 3;
215        let mut delay = Duration::from_millis(100);
216
217        loop {
218            match RedisConnection::connect(host, port, config.clone()).await {
219                Ok(conn) => return Ok(conn),
220                Err(e) => {
221                    attempts += 1;
222                    if attempts >= max_attempts {
223                        return Err(e);
224                    }
225                    warn!(
226                        "Connection attempt {} failed: {:?}, retrying in {:?}",
227                        attempts, e, delay
228                    );
229                    tokio::time::sleep(delay).await;
230                    delay *= 2; // Exponential backoff
231                }
232            }
233        }
234    }
235
236    /// Perform health check on connection
237    async fn health_check(conn: &mut RedisConnection) -> RedisResult<()> {
238        // Simple PING command
239        conn.execute_command("PING", &[]).await.map(|_| ())
240    }
241
242    /// Start monitoring task for adaptive scaling
243    async fn start_monitoring_task(&self) {
244        let command_tx = self.command_tx.clone();
245        let worker_count = self.worker_count.clone();
246        let stats = self.stats.clone();
247        let shutdown = self.shutdown.clone();
248        let config = self.config.clone();
249        let host = self.host.clone();
250        let port = self.port;
251
252        tokio::spawn(async move {
253            let mut check_interval = tokio::time::interval(Duration::from_secs(10));
254            let command_rx = Arc::new(Mutex::new(
255                // This is a dummy receiver since we can't clone the original
256                mpsc::unbounded_channel::<OptimizedCommandRequest>().1,
257            ));
258
259            while !shutdown.load(Ordering::SeqCst) {
260                check_interval.tick().await;
261
262                let stats_snapshot = {
263                    let stats_guard = stats.read().await;
264                    stats_guard.clone()
265                };
266
267                // Adaptive scaling logic
268                let current_workers = worker_count.load(Ordering::SeqCst);
269                // UnboundedSender doesn't have capacity(), so we'll use a different metric
270                let pending_requests = 0; // This would need to be tracked separately
271                let avg_response_time = stats_snapshot.average_response_time_ms;
272
273                // Scale up if:
274                // - High pending requests
275                // - High response time
276                // - Not too many workers already
277                if (pending_requests > 10 || avg_response_time > 100.0) && current_workers < 8 {
278                    info!(
279                        "Scaling up: adding worker (current: {}, pending: {}, avg_time: {:.2}ms)",
280                        current_workers, pending_requests, avg_response_time
281                    );
282
283                    // This is simplified - in real implementation we'd need proper worker spawning
284                    // For now, just log the scaling decision
285                }
286                // Scale down if:
287                // - Low pending requests
288                // - Low response time
289                // - More than 1 worker
290                else if pending_requests < 2 && avg_response_time < 50.0 && current_workers > 1 {
291                    info!(
292                        "Could scale down: current workers: {}, pending: {}, avg_time: {:.2}ms",
293                        current_workers, pending_requests, avg_response_time
294                    );
295                }
296            }
297        });
298    }
299
300    /// Execute a command through the optimized multiplexed pool
301    pub async fn execute_command(
302        &self,
303        command: String,
304        args: Vec<RespValue>,
305    ) -> RedisResult<RespValue> {
306        let (response_tx, response_rx) = tokio::sync::oneshot::channel();
307
308        let request = OptimizedCommandRequest {
309            command,
310            args,
311            response_tx,
312            timestamp: Instant::now(),
313        };
314
315        self.command_tx.send(request).map_err(|_| {
316            RedisError::Connection("Optimized multiplexed connection closed".to_string())
317        })?;
318
319        // Update pending requests count
320        {
321            let mut stats_guard = self.stats.write().await;
322            stats_guard.pending_requests += 1;
323        }
324
325        let result = response_rx
326            .await
327            .map_err(|_| RedisError::Connection("Response channel closed".to_string()))?;
328
329        // Update pending requests count
330        {
331            let mut stats_guard = self.stats.write().await;
332            stats_guard.pending_requests = stats_guard.pending_requests.saturating_sub(1);
333        }
334
335        result
336    }
337
338    /// Get current pool statistics
339    pub async fn stats(&self) -> PoolStats {
340        self.stats.read().await.clone()
341    }
342
343    /// Shutdown the pool gracefully
344    pub async fn shutdown(&self) {
345        self.shutdown.store(true, Ordering::SeqCst);
346        info!("Optimized multiplexed pool shutdown initiated");
347    }
348}
349
350/// Lock-free connection pool with atomic operations
351pub struct LockFreeConnectionPool {
352    connections: Arc<RwLock<Vec<Arc<Mutex<RedisConnection>>>>>,
353    available_count: Arc<AtomicUsize>,
354    total_count: Arc<AtomicUsize>,
355    semaphore: Arc<Semaphore>,
356    config: ConnectionConfig,
357    host: String,
358    port: u16,
359    stats: Arc<RwLock<PoolStats>>,
360}
361
362impl LockFreeConnectionPool {
363    /// Create a new lock-free connection pool
364    pub async fn new(
365        config: ConnectionConfig,
366        host: String,
367        port: u16,
368        max_size: usize,
369    ) -> RedisResult<Self> {
370        let mut connections = Vec::new();
371        let initial_size = config.pool.min_idle.min(max_size).max(1);
372
373        // Create initial connections
374        for _ in 0..initial_size {
375            let conn = RedisConnection::connect(&host, port, config.clone()).await?;
376            connections.push(Arc::new(Mutex::new(conn)));
377        }
378
379        let stats = Arc::new(RwLock::new(PoolStats {
380            active_connections: initial_size,
381            pending_requests: 0,
382            total_requests: 0,
383            failed_requests: 0,
384            average_response_time_ms: 0.0,
385            worker_count: 0,
386        }));
387
388        Ok(Self {
389            connections: Arc::new(RwLock::new(connections)),
390            available_count: Arc::new(AtomicUsize::new(initial_size)),
391            total_count: Arc::new(AtomicUsize::new(initial_size)),
392            semaphore: Arc::new(Semaphore::new(max_size)),
393            config,
394            host,
395            port,
396            stats,
397        })
398    }
399
400    /// Get a connection from the pool with validation
401    async fn get_validated_connection(&self) -> RedisResult<Arc<Mutex<RedisConnection>>> {
402        // Acquire semaphore permit
403        let _permit = self
404            .semaphore
405            .acquire()
406            .await
407            .map_err(|_| RedisError::Pool("Failed to acquire permit".to_string()))?;
408
409        // Try to get an existing connection
410        let conn = {
411            let mut connections = self.connections.write().await;
412            if let Some(conn) = connections.pop() {
413                self.available_count.fetch_sub(1, Ordering::SeqCst);
414                Some(conn)
415            } else {
416                None
417            }
418        };
419
420        let conn = match conn {
421            Some(conn) => conn,
422            None => {
423                // Create a new connection
424                let new_conn =
425                    RedisConnection::connect(&self.host, self.port, self.config.clone()).await?;
426                self.total_count.fetch_add(1, Ordering::SeqCst);
427                Arc::new(Mutex::new(new_conn))
428            }
429        };
430
431        // Validate connection
432        {
433            let mut conn_guard = conn.lock().await;
434            if let Err(_) = conn_guard.execute_command("PING", &[]).await {
435                // Connection is stale, create a new one
436                let new_conn =
437                    RedisConnection::connect(&self.host, self.port, self.config.clone()).await?;
438                *conn_guard = new_conn;
439            }
440        }
441
442        Ok(conn)
443    }
444
445    /// Return a connection to the pool
446    async fn return_connection(&self, conn: Arc<Mutex<RedisConnection>>) {
447        let mut connections = self.connections.write().await;
448        connections.push(conn);
449        self.available_count.fetch_add(1, Ordering::SeqCst);
450    }
451
452    /// Execute a command using a connection from the pool
453    pub async fn execute_command(
454        &self,
455        command: String,
456        args: Vec<RespValue>,
457    ) -> RedisResult<RespValue> {
458        let start_time = Instant::now();
459        let conn = self.get_validated_connection().await?;
460
461        let result = {
462            let mut conn_guard = conn.lock().await;
463            conn_guard.execute_command(&command, &args).await
464        };
465
466        // Return connection to pool
467        self.return_connection(conn).await;
468
469        // Update stats
470        let response_time = start_time.elapsed();
471        {
472            let mut stats_guard = self.stats.write().await;
473            stats_guard.total_requests += 1;
474            if result.is_err() {
475                stats_guard.failed_requests += 1;
476            }
477            let current_avg = stats_guard.average_response_time_ms;
478            let new_time = response_time.as_millis() as f64;
479            stats_guard.average_response_time_ms = (current_avg * 0.9) + (new_time * 0.1);
480        }
481
482        result
483    }
484
485    /// Get current pool statistics
486    pub async fn stats(&self) -> PoolStats {
487        let mut stats = self.stats.read().await.clone();
488        stats.active_connections = self.total_count.load(Ordering::SeqCst);
489        stats
490    }
491}
492
493/// Unified optimized pool abstraction
494pub enum OptimizedPool {
495    /// Optimized multiplexed connection
496    Multiplexed(OptimizedMultiplexedPool),
497    /// Lock-free connection pool
498    Pool(Box<LockFreeConnectionPool>),
499}
500
501impl OptimizedPool {
502    /// Create a new optimized pool based on the configuration
503    pub async fn new(config: ConnectionConfig, host: String, port: u16) -> RedisResult<Self> {
504        match config.pool.strategy {
505            PoolStrategy::Multiplexed => {
506                let pool = OptimizedMultiplexedPool::new(config, host, port).await?;
507                Ok(Self::Multiplexed(pool))
508            }
509            PoolStrategy::Pool => {
510                let pool =
511                    LockFreeConnectionPool::new(config.clone(), host, port, config.pool.max_size)
512                        .await?;
513                Ok(Self::Pool(Box::new(pool)))
514            }
515        }
516    }
517
518    /// Execute a command through the optimized pool
519    pub async fn execute_command(
520        &self,
521        command: String,
522        args: Vec<RespValue>,
523    ) -> RedisResult<RespValue> {
524        match self {
525            Self::Multiplexed(pool) => pool.execute_command(command, args).await,
526            Self::Pool(pool) => pool.execute_command(command, args).await,
527        }
528    }
529
530    /// Get current pool statistics
531    pub async fn stats(&self) -> PoolStats {
532        match self {
533            Self::Multiplexed(pool) => pool.stats().await,
534            Self::Pool(pool) => pool.stats().await,
535        }
536    }
537
538    /// Shutdown the pool gracefully
539    pub async fn shutdown(&self) {
540        match self {
541            Self::Multiplexed(pool) => pool.shutdown().await,
542            Self::Pool(_) => {
543                // Lock-free pool doesn't need explicit shutdown
544                info!("Lock-free pool shutdown");
545            }
546        }
547    }
548}
549
550#[cfg(test)]
551mod tests {
552    use super::*;
553
554    #[tokio::test]
555    async fn test_optimized_pool_stats() {
556        let config = ConnectionConfig::default();
557
558        // This test would require a real Redis connection
559        // For now, just test the stats structure
560        let stats = PoolStats {
561            active_connections: 2,
562            pending_requests: 0,
563            total_requests: 100,
564            failed_requests: 5,
565            average_response_time_ms: 25.5,
566            worker_count: 2,
567        };
568
569        assert_eq!(stats.active_connections, 2);
570        assert_eq!(stats.total_requests, 100);
571        assert_eq!(stats.failed_requests, 5);
572        assert!((stats.average_response_time_ms - 25.5).abs() < f64::EPSILON);
573    }
574}