use crate::{Command, Connection, Result};
use futures::lock::{Mutex, MutexGuard};
use std::sync::Arc;
#[derive(Clone)]
pub struct ConnectionPool {
connections: Vec<Arc<Mutex<Connection>>>,
address: Arc<String>,
}
impl ConnectionPool {
pub async fn create(
address: String,
password: Option<&str>,
connection_count: usize,
) -> Result<Self> {
let connections = Vec::new();
let mut out = Self {
connections,
address: Arc::new(address),
};
for i in 0..connection_count {
let mut conn = Connection::connect(out.address.as_ref(), password).await?;
let client_name = format!("darkredis-{}", i + 1);
conn.run_command(Command::new("CLIENT").arg(b"SETNAME").arg(&client_name))
.await?;
out.connections.push(Arc::new(Mutex::new(conn)));
}
Ok(out)
}
pub async fn get(&self) -> MutexGuard<'_, Connection> {
for conn in self.connections.iter() {
if let Some(lock) = conn.try_lock() {
return lock;
}
}
let lockers = self.connections.iter().map(|l| l.lock());
futures::future::select_all(lockers).await.0
}
}
#[cfg(test)]
mod test {
use super::*;
use crate::Value;
#[runtime::test]
async fn pooling() {
let connections = 4; let pool = ConnectionPool::create(crate::test::TEST_ADDRESS.into(), None, connections)
.await
.unwrap();
let mut locks = Vec::with_capacity(connections);
for i in 0..connections - 1 {
let mut conn = pool.get().await;
let command = Command::new("CLIENT").arg(b"GETNAME");
assert_eq!(
conn.run_command(command).await.unwrap(),
Value::String(format!("darkredis-{}", i + 1).into_bytes())
);
locks.push(conn);
}
}
}