use crate::{
clients::RedisClient,
error::{RedisError, RedisErrorKind},
interfaces::ClientLike,
types::{ConnectHandle, PerformanceConfig, ReconnectPolicy, RedisConfig},
utils,
};
use futures::future::{join_all, try_join_all};
use std::{
fmt,
ops::Deref,
sync::{atomic::AtomicUsize, Arc},
};
#[cfg(feature = "dns")]
use crate::types::Resolve;
/// Shared state behind a [`RedisPool`]: the pooled clients plus the counter used to choose
/// among them.
#[derive(Clone)]
pub(crate) struct RedisPoolInner {
/// The clients that make up the pool, in the order they were created.
clients: Vec<RedisClient>,
/// Selection counter. `RedisPool::next` passes it to `utils::incr_atomic` and
/// `RedisPool::last` passes it to `utils::read_atomic`; in both cases the returned value is
/// reduced modulo the number of clients to obtain an index.
last: Arc<AtomicUsize>,
}
/// A fixed-size group of [`RedisClient`]s exposed through a single handle.
///
/// The handle only holds an `Arc` to the pool state, so cloning a `RedisPool` yields another
/// handle to the same clients and the same selection counter.
#[derive(Clone)]
pub struct RedisPool {
/// Reference-counted pool state shared by every clone of this handle.
inner: Arc<RedisPoolInner>,
}
impl fmt::Debug for RedisPool {
    /// Format the pool as `RedisPool { size: N }`, where `N` is the number of pooled clients.
    ///
    /// The clients themselves are not printed.
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        let size = self.inner.clients.len();
        let mut builder = f.debug_struct("RedisPool");
        builder.field("size", &size);
        builder.finish()
    }
}
impl Deref for RedisPool {
type Target = RedisClient;
fn deref(&self) -> &Self::Target {
self.next()
}
}
impl<'a> From<&'a RedisPool> for &'a RedisClient {
    /// Borrow a client from the pool, chosen by [`RedisPool::next`].
    ///
    /// The returned reference lives as long as the borrow of the pool.
    fn from(p: &'a RedisPool) -> &'a RedisClient {
        let chosen: &'a RedisClient = p.next();
        chosen
    }
}
impl<'a> From<&'a RedisPool> for RedisClient {
    /// Produce an owned client by cloning the one chosen by [`RedisPool::next`].
    fn from(p: &'a RedisPool) -> RedisClient {
        let chosen = p.next();
        chosen.clone()
    }
}
impl RedisPool {
    /// Build a pool containing `size` clients.
    ///
    /// Each client is created with `RedisClient::new` from its own clone of `config`, `perf`
    /// and `policy`. The selection counter starts at zero.
    ///
    /// # Errors
    ///
    /// Returns a [`RedisErrorKind::Config`] error when `size` is zero.
    pub fn new(
        config: RedisConfig,
        perf: Option<PerformanceConfig>,
        policy: Option<ReconnectPolicy>,
        size: usize,
    ) -> Result<Self, RedisError> {
        // Reject the empty pool up front: `next` and `last` index modulo the pool size.
        if size == 0 {
            return Err(RedisError::new(RedisErrorKind::Config, "Pool cannot be empty."));
        }

        let clients: Vec<RedisClient> = (0 .. size)
            .map(|_| RedisClient::new(config.clone(), perf.clone(), policy.clone()))
            .collect();
        let state = RedisPoolInner {
            clients,
            last: Arc::new(AtomicUsize::new(0)),
        };

        Ok(RedisPool {
            inner: Arc::new(state),
        })
    }

    /// Borrow every client in the pool as a slice.
    pub fn clients(&self) -> &[RedisClient] {
        self.inner.clients.as_slice()
    }

    /// Call `connect` on every client and return the resulting handles, one per client, in pool
    /// order.
    pub fn connect(&self) -> Vec<ConnectHandle> {
        let mut handles = Vec::with_capacity(self.inner.clients.len());
        for client in self.inner.clients.iter() {
            handles.push(client.connect());
        }
        handles
    }

    /// Await `wait_for_connect` on all clients together.
    ///
    /// # Errors
    ///
    /// Returns the first error produced by any client's `wait_for_connect` future, as reported
    /// by `try_join_all`.
    pub async fn wait_for_connect(&self) -> Result<(), RedisError> {
        let pending = self.inner.clients.iter().map(|client| client.wait_for_connect());
        try_join_all(pending).await?;
        Ok(())
    }

    /// Install `resolver` on every client in the pool.
    ///
    /// Clients are updated one after another; each receives its own `Arc` handle to the same
    /// resolver.
    #[cfg(feature = "dns")]
    #[cfg_attr(docsrs, doc(cfg(feature = "dns")))]
    pub async fn set_resolver(&self, resolver: Arc<dyn Resolve>) {
        for client in &self.inner.clients {
            client.set_resolver(Arc::clone(&resolver)).await;
        }
    }

    /// Number of clients in the pool.
    pub fn size(&self) -> usize {
        self.clients().len()
    }

    /// Select the client that should run the next command, preferring connected clients.
    ///
    /// The starting index is the value returned by `utils::incr_atomic` for the shared counter,
    /// modulo the pool size. Starting there, clients are inspected in order (wrapping around)
    /// and the first one whose `is_connected()` is true is returned. When none is connected the
    /// client at the starting index is returned.
    ///
    /// The index of a client chosen by skipping ahead is not written back to the counter.
    #[cfg(feature = "pool-prefer-active")]
    pub fn next(&self) -> &RedisClient {
        let clients = &self.inner.clients;
        let total = clients.len();
        let start = utils::incr_atomic(&self.inner.last) % total;

        (0 .. total)
            .map(|offset| &clients[(start + offset) % total])
            .find(|client| client.is_connected())
            .unwrap_or(&clients[start])
    }

    /// Select the client that should run the next command.
    ///
    /// The index is the value returned by `utils::incr_atomic` for the shared counter, modulo
    /// the pool size.
    #[cfg(not(feature = "pool-prefer-active"))]
    pub fn next(&self) -> &RedisClient {
        let clients = &self.inner.clients;
        let idx = utils::incr_atomic(&self.inner.last) % clients.len();
        &clients[idx]
    }

    /// Return the client at the index given by `utils::read_atomic` for the shared counter,
    /// modulo the pool size.
    ///
    /// NOTE(review): with the `pool-prefer-active` feature, `next` can return a client other
    /// than the one at the counter-derived index without recording that choice, so this may not
    /// be the client `next` most recently returned.
    pub fn last(&self) -> &RedisClient {
        let clients = &self.inner.clients;
        let idx = utils::read_atomic(&self.inner.last) % clients.len();
        &clients[idx]
    }

    /// Call `quit` on every client and wait for all of the resulting futures to finish.
    ///
    /// The individual results are discarded, so a failure on one client is not reported.
    pub async fn quit_pool(&self) {
        let shutdowns = self.inner.clients.iter().map(|client| client.quit());
        // Best-effort: the per-client results are intentionally ignored.
        let _ = join_all(shutdowns).await;
    }
}