#![allow(unused_variables)]
#![allow(dead_code)]
#![allow(missing_docs)]
use crate::connection::RedisConnection;
use crate::core::{
config::{ConnectionConfig, PoolStrategy},
error::{RedisError, RedisResult},
value::RespValue,
};
use std::sync::{
atomic::{AtomicBool, AtomicUsize, Ordering},
Arc,
};
use tokio::sync::{mpsc, Mutex, RwLock, Semaphore};
use tokio::time::{Duration, Instant};
use tracing::{debug, error, info, warn};
#[derive(Debug)]
struct OptimizedCommandRequest {
command: String,
args: Vec<RespValue>,
response_tx: tokio::sync::oneshot::Sender<RedisResult<RespValue>>,
timestamp: Instant,
}
#[derive(Debug, Clone)]
pub struct PoolStats {
pub active_connections: usize,
pub pending_requests: usize,
pub total_requests: u64,
pub failed_requests: u64,
pub average_response_time_ms: f64,
pub worker_count: usize,
}
pub struct OptimizedMultiplexedPool {
command_tx: mpsc::UnboundedSender<OptimizedCommandRequest>,
worker_count: Arc<AtomicUsize>,
stats: Arc<RwLock<PoolStats>>,
shutdown: Arc<AtomicBool>,
config: ConnectionConfig,
host: String,
port: u16,
}
impl OptimizedMultiplexedPool {
pub async fn new(config: ConnectionConfig, host: String, port: u16) -> RedisResult<Self> {
let (command_tx, command_rx) = mpsc::unbounded_channel::<OptimizedCommandRequest>();
let command_rx = Arc::new(Mutex::new(command_rx));
let worker_count = Arc::new(AtomicUsize::new(0));
let stats = Arc::new(RwLock::new(PoolStats {
active_connections: 0,
pending_requests: 0,
total_requests: 0,
failed_requests: 0,
average_response_time_ms: 0.0,
worker_count: 0,
}));
let shutdown = Arc::new(AtomicBool::new(false));
let pool = Self {
command_tx,
worker_count: worker_count.clone(),
stats: stats.clone(),
shutdown: shutdown.clone(),
config: config.clone(),
host: host.clone(),
port,
};
pool.spawn_worker(command_rx.clone()).await?;
pool.start_monitoring_task().await;
Ok(pool)
}
async fn spawn_worker(
&self,
command_rx: Arc<Mutex<mpsc::UnboundedReceiver<OptimizedCommandRequest>>>,
) -> RedisResult<()> {
let config = self.config.clone();
let host = self.host.clone();
let port = self.port;
let worker_count = self.worker_count.clone();
let stats = self.stats.clone();
let shutdown = self.shutdown.clone();
tokio::spawn(async move {
let worker_id = worker_count.fetch_add(1, Ordering::SeqCst);
debug!("Starting optimized worker {}", worker_id);
let mut conn = match Self::create_connection_with_retry(&host, port, &config).await {
Ok(conn) => conn,
Err(e) => {
error!(
"Failed to create connection for worker {}: {:?}",
worker_id, e
);
worker_count.fetch_sub(1, Ordering::SeqCst);
return;
}
};
{
let mut stats_guard = stats.write().await;
stats_guard.active_connections += 1;
stats_guard.worker_count = worker_count.load(Ordering::SeqCst);
}
let mut last_health_check = Instant::now();
let health_check_interval = Duration::from_secs(30);
while !shutdown.load(Ordering::SeqCst) {
if last_health_check.elapsed() > health_check_interval {
if let Err(e) = Self::health_check(&mut conn).await {
warn!("Health check failed for worker {}: {:?}", worker_id, e);
match Self::create_connection_with_retry(&host, port, &config).await {
Ok(new_conn) => {
conn = new_conn;
info!("Reconnected worker {}", worker_id);
}
Err(e) => {
error!("Failed to reconnect worker {}: {:?}", worker_id, e);
break;
}
}
}
last_health_check = Instant::now();
}
let request = {
let mut rx = command_rx.lock().await;
tokio::time::timeout(Duration::from_millis(100), rx.recv()).await
};
match request {
Ok(Some(req)) => {
let start_time = Instant::now();
let result = conn.execute_command(&req.command, &req.args).await;
let response_time = start_time.elapsed();
{
let mut stats_guard = stats.write().await;
stats_guard.total_requests += 1;
if result.is_err() {
stats_guard.failed_requests += 1;
}
let current_avg = stats_guard.average_response_time_ms;
let new_time = response_time.as_millis() as f64;
stats_guard.average_response_time_ms =
(current_avg * 0.9) + (new_time * 0.1);
}
let _ = req.response_tx.send(result);
}
Ok(None) => {
break;
}
Err(_) => {
}
}
}
{
let mut stats_guard = stats.write().await;
stats_guard.active_connections = stats_guard.active_connections.saturating_sub(1);
stats_guard.worker_count = worker_count
.fetch_sub(1, Ordering::SeqCst)
.saturating_sub(1);
}
debug!("Optimized worker {} stopped", worker_id);
});
Ok(())
}
async fn create_connection_with_retry(
host: &str,
port: u16,
config: &ConnectionConfig,
) -> RedisResult<RedisConnection> {
let mut attempts = 0;
let max_attempts = 3;
let mut delay = Duration::from_millis(100);
loop {
match RedisConnection::connect(host, port, config.clone()).await {
Ok(conn) => return Ok(conn),
Err(e) => {
attempts += 1;
if attempts >= max_attempts {
return Err(e);
}
warn!(
"Connection attempt {} failed: {:?}, retrying in {:?}",
attempts, e, delay
);
tokio::time::sleep(delay).await;
delay *= 2; }
}
}
}
async fn health_check(conn: &mut RedisConnection) -> RedisResult<()> {
conn.execute_command("PING", &[]).await.map(|_| ())
}
async fn start_monitoring_task(&self) {
let command_tx = self.command_tx.clone();
let worker_count = self.worker_count.clone();
let stats = self.stats.clone();
let shutdown = self.shutdown.clone();
let config = self.config.clone();
let host = self.host.clone();
let port = self.port;
tokio::spawn(async move {
let mut check_interval = tokio::time::interval(Duration::from_secs(10));
let command_rx = Arc::new(Mutex::new(
mpsc::unbounded_channel::<OptimizedCommandRequest>().1,
));
while !shutdown.load(Ordering::SeqCst) {
check_interval.tick().await;
let stats_snapshot = {
let stats_guard = stats.read().await;
stats_guard.clone()
};
let current_workers = worker_count.load(Ordering::SeqCst);
let pending_requests = 0; let avg_response_time = stats_snapshot.average_response_time_ms;
if (pending_requests > 10 || avg_response_time > 100.0) && current_workers < 8 {
info!(
"Scaling up: adding worker (current: {}, pending: {}, avg_time: {:.2}ms)",
current_workers, pending_requests, avg_response_time
);
}
else if pending_requests < 2 && avg_response_time < 50.0 && current_workers > 1 {
info!(
"Could scale down: current workers: {}, pending: {}, avg_time: {:.2}ms",
current_workers, pending_requests, avg_response_time
);
}
}
});
}
pub async fn execute_command(
&self,
command: String,
args: Vec<RespValue>,
) -> RedisResult<RespValue> {
let (response_tx, response_rx) = tokio::sync::oneshot::channel();
let request = OptimizedCommandRequest {
command,
args,
response_tx,
timestamp: Instant::now(),
};
self.command_tx.send(request).map_err(|_| {
RedisError::Connection("Optimized multiplexed connection closed".to_string())
})?;
{
let mut stats_guard = self.stats.write().await;
stats_guard.pending_requests += 1;
}
let result = response_rx
.await
.map_err(|_| RedisError::Connection("Response channel closed".to_string()))?;
{
let mut stats_guard = self.stats.write().await;
stats_guard.pending_requests = stats_guard.pending_requests.saturating_sub(1);
}
result
}
pub async fn stats(&self) -> PoolStats {
self.stats.read().await.clone()
}
pub async fn shutdown(&self) {
self.shutdown.store(true, Ordering::SeqCst);
info!("Optimized multiplexed pool shutdown initiated");
}
}
pub struct LockFreeConnectionPool {
connections: Arc<RwLock<Vec<Arc<Mutex<RedisConnection>>>>>,
available_count: Arc<AtomicUsize>,
total_count: Arc<AtomicUsize>,
semaphore: Arc<Semaphore>,
config: ConnectionConfig,
host: String,
port: u16,
stats: Arc<RwLock<PoolStats>>,
}
impl LockFreeConnectionPool {
pub async fn new(
config: ConnectionConfig,
host: String,
port: u16,
max_size: usize,
) -> RedisResult<Self> {
let mut connections = Vec::new();
let initial_size = config.pool.min_idle.min(max_size).max(1);
for _ in 0..initial_size {
let conn = RedisConnection::connect(&host, port, config.clone()).await?;
connections.push(Arc::new(Mutex::new(conn)));
}
let stats = Arc::new(RwLock::new(PoolStats {
active_connections: initial_size,
pending_requests: 0,
total_requests: 0,
failed_requests: 0,
average_response_time_ms: 0.0,
worker_count: 0,
}));
Ok(Self {
connections: Arc::new(RwLock::new(connections)),
available_count: Arc::new(AtomicUsize::new(initial_size)),
total_count: Arc::new(AtomicUsize::new(initial_size)),
semaphore: Arc::new(Semaphore::new(max_size)),
config,
host,
port,
stats,
})
}
async fn get_validated_connection(&self) -> RedisResult<Arc<Mutex<RedisConnection>>> {
let _permit = self
.semaphore
.acquire()
.await
.map_err(|_| RedisError::Pool("Failed to acquire permit".to_string()))?;
let conn = {
let mut connections = self.connections.write().await;
if let Some(conn) = connections.pop() {
self.available_count.fetch_sub(1, Ordering::SeqCst);
Some(conn)
} else {
None
}
};
let conn = match conn {
Some(conn) => conn,
None => {
let new_conn =
RedisConnection::connect(&self.host, self.port, self.config.clone()).await?;
self.total_count.fetch_add(1, Ordering::SeqCst);
Arc::new(Mutex::new(new_conn))
}
};
{
let mut conn_guard = conn.lock().await;
if let Err(_) = conn_guard.execute_command("PING", &[]).await {
let new_conn =
RedisConnection::connect(&self.host, self.port, self.config.clone()).await?;
*conn_guard = new_conn;
}
}
Ok(conn)
}
async fn return_connection(&self, conn: Arc<Mutex<RedisConnection>>) {
let mut connections = self.connections.write().await;
connections.push(conn);
self.available_count.fetch_add(1, Ordering::SeqCst);
}
pub async fn execute_command(
&self,
command: String,
args: Vec<RespValue>,
) -> RedisResult<RespValue> {
let start_time = Instant::now();
let conn = self.get_validated_connection().await?;
let result = {
let mut conn_guard = conn.lock().await;
conn_guard.execute_command(&command, &args).await
};
self.return_connection(conn).await;
let response_time = start_time.elapsed();
{
let mut stats_guard = self.stats.write().await;
stats_guard.total_requests += 1;
if result.is_err() {
stats_guard.failed_requests += 1;
}
let current_avg = stats_guard.average_response_time_ms;
let new_time = response_time.as_millis() as f64;
stats_guard.average_response_time_ms = (current_avg * 0.9) + (new_time * 0.1);
}
result
}
pub async fn stats(&self) -> PoolStats {
let mut stats = self.stats.read().await.clone();
stats.active_connections = self.total_count.load(Ordering::SeqCst);
stats
}
}
pub enum OptimizedPool {
Multiplexed(OptimizedMultiplexedPool),
Pool(Box<LockFreeConnectionPool>),
}
impl OptimizedPool {
pub async fn new(config: ConnectionConfig, host: String, port: u16) -> RedisResult<Self> {
match config.pool.strategy {
PoolStrategy::Multiplexed => {
let pool = OptimizedMultiplexedPool::new(config, host, port).await?;
Ok(Self::Multiplexed(pool))
}
PoolStrategy::Pool => {
let pool =
LockFreeConnectionPool::new(config.clone(), host, port, config.pool.max_size)
.await?;
Ok(Self::Pool(Box::new(pool)))
}
}
}
pub async fn execute_command(
&self,
command: String,
args: Vec<RespValue>,
) -> RedisResult<RespValue> {
match self {
Self::Multiplexed(pool) => pool.execute_command(command, args).await,
Self::Pool(pool) => pool.execute_command(command, args).await,
}
}
pub async fn stats(&self) -> PoolStats {
match self {
Self::Multiplexed(pool) => pool.stats().await,
Self::Pool(pool) => pool.stats().await,
}
}
pub async fn shutdown(&self) {
match self {
Self::Multiplexed(pool) => pool.shutdown().await,
Self::Pool(_) => {
info!("Lock-free pool shutdown");
}
}
}
}
#[cfg(test)]
mod tests {
use super::*;
#[tokio::test]
async fn test_optimized_pool_stats() {
let config = ConnectionConfig::default();
let stats = PoolStats {
active_connections: 2,
pending_requests: 0,
total_requests: 100,
failed_requests: 5,
average_response_time_ms: 25.5,
worker_count: 2,
};
assert_eq!(stats.active_connections, 2);
assert_eq!(stats.total_requests, 100);
assert_eq!(stats.failed_requests, 5);
assert!((stats.average_response_time_ms - 25.5).abs() < f64::EPSILON);
}
}