use crate::connection::RedisConnection;
use crate::core::{
config::{ConnectionConfig, PoolStrategy},
error::{RedisError, RedisResult},
value::RespValue,
};
use std::sync::Arc;
use tokio::sync::{mpsc, Mutex, RwLock, Semaphore};
use tracing::{debug, warn};
struct CommandRequest {
command: String,
args: Vec<RespValue>,
response_tx: tokio::sync::oneshot::Sender<RedisResult<RespValue>>,
}
pub struct MultiplexedPool {
command_tx: mpsc::UnboundedSender<CommandRequest>,
}
impl MultiplexedPool {
pub async fn new(config: ConnectionConfig, host: String, port: u16) -> RedisResult<Self> {
let (command_tx, mut command_rx) = mpsc::unbounded_channel::<CommandRequest>();
tokio::spawn(async move {
let mut conn = match RedisConnection::connect(&host, port, config.clone()).await {
Ok(conn) => conn,
Err(e) => {
warn!("Failed to create multiplexed connection: {:?}", e);
return;
}
};
while let Some(req) = command_rx.recv().await {
let result = conn.execute_command(&req.command, &req.args).await;
let _ = req.response_tx.send(result);
}
debug!("Multiplexed connection handler stopped");
});
Ok(Self { command_tx })
}
pub async fn execute_command(
&self,
command: String,
args: Vec<RespValue>,
) -> RedisResult<RespValue> {
let (response_tx, response_rx) = tokio::sync::oneshot::channel();
self.command_tx
.send(CommandRequest {
command,
args,
response_tx,
})
.map_err(|_| RedisError::Connection("Multiplexed connection closed".to_string()))?;
response_rx
.await
.map_err(|_| RedisError::Connection("Response channel closed".to_string()))?
}
}
pub struct ConnectionPool {
connections: Arc<RwLock<Vec<Arc<Mutex<RedisConnection>>>>>,
semaphore: Arc<Semaphore>,
config: ConnectionConfig,
host: String,
port: u16,
}
impl ConnectionPool {
pub async fn new(
config: ConnectionConfig,
host: String,
port: u16,
max_size: usize,
) -> RedisResult<Self> {
let mut connections = Vec::new();
let initial_size = config.pool.min_idle.min(max_size).max(1);
for _ in 0..initial_size {
let conn = RedisConnection::connect(&host, port, config.clone()).await?;
connections.push(Arc::new(Mutex::new(conn)));
}
Ok(Self {
connections: Arc::new(RwLock::new(connections)),
semaphore: Arc::new(Semaphore::new(max_size)),
config,
host,
port,
})
}
async fn get_connection(&self) -> RedisResult<Arc<Mutex<RedisConnection>>> {
let _permit = self
.semaphore
.acquire()
.await
.map_err(|_| RedisError::Pool("Failed to acquire permit".to_string()))?;
{
let mut connections = self.connections.write().await;
if let Some(conn) = connections.pop() {
return Ok(conn);
}
}
let conn = RedisConnection::connect(&self.host, self.port, self.config.clone()).await?;
Ok(Arc::new(Mutex::new(conn)))
}
async fn return_connection(&self, conn: Arc<Mutex<RedisConnection>>) {
let mut connections = self.connections.write().await;
connections.push(conn);
}
pub async fn execute_command(
&self,
command: String,
args: Vec<RespValue>,
) -> RedisResult<RespValue> {
let conn = self.get_connection().await?;
let result = {
let mut conn_guard = conn.lock().await;
conn_guard.execute_command(&command, &args).await
};
self.return_connection(conn).await;
result
}
}
pub enum Pool {
Multiplexed(MultiplexedPool),
Pool(Box<ConnectionPool>),
}
impl Pool {
pub async fn new(config: ConnectionConfig, host: String, port: u16) -> RedisResult<Self> {
match config.pool.strategy {
PoolStrategy::Multiplexed => {
let pool = MultiplexedPool::new(config, host, port).await?;
Ok(Self::Multiplexed(pool))
}
PoolStrategy::Pool => {
let pool =
ConnectionPool::new(config.clone(), host, port, config.pool.max_size).await?;
Ok(Self::Pool(Box::new(pool)))
}
}
}
pub async fn execute_command(
&self,
command: String,
args: Vec<RespValue>,
) -> RedisResult<RespValue> {
match self {
Self::Multiplexed(pool) => pool.execute_command(command, args).await,
Self::Pool(pool) => pool.execute_command(command, args).await,
}
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::core::config::PoolConfig;
#[test]
fn test_pool_config() {
let config = ConnectionConfig::new("redis://localhost:6379");
assert_eq!(config.pool.strategy, PoolStrategy::Multiplexed);
}
#[test]
fn test_custom_pool_config() {
let mut config = ConnectionConfig::new("redis://localhost:6379");
config.pool = PoolConfig {
strategy: PoolStrategy::Pool,
max_size: 20,
min_idle: 5,
..Default::default()
};
assert_eq!(config.pool.strategy, PoolStrategy::Pool);
assert_eq!(config.pool.max_size, 20);
}
}