use std::sync::Arc;
use std::time::Duration;
use bb8::{Pool, PooledConnection as Bb8Pooled};
use redis::aio::MultiplexedConnection;
use tokio::sync::Notify;
use tokio::task::JoinHandle;
use tracing::{debug, warn};
use crate::config::SentinelPoolConfig;
use crate::error::{Result, SentinelPoolError};
use crate::manager::{PooledConnection, SentinelConnectionManager};
use crate::watcher::spawn_watcher;
#[derive(Clone)]
pub struct SentinelPool {
inner: Arc<Inner>,
}
struct Inner {
pool: Pool<SentinelConnectionManager>,
manager: SentinelConnectionManager,
config: SentinelPoolConfig,
watcher_handle: parking_lot::Mutex<Option<JoinHandle<()>>>,
shutdown: Arc<Notify>,
}
impl SentinelPool {
pub async fn new(config: SentinelPoolConfig) -> Result<Self> {
let manager = SentinelConnectionManager::new(config.clone())?;
let mut builder = Pool::builder()
.max_size(config.max_size)
.connection_timeout(config.connection_timeout);
if let Some(min_idle) = config.min_idle {
builder = builder.min_idle(Some(min_idle));
}
builder = builder.idle_timeout(config.idle_timeout);
builder = builder.max_lifetime(config.max_lifetime);
let pool = builder
.build(manager.clone())
.await
.map_err(|e| SentinelPoolError::Pool(format!("failed to build pool: {e}")))?;
let shutdown = Arc::new(Notify::new());
let watcher_handle = if config.enable_watcher {
Some(spawn_watcher(manager.clone(), Arc::clone(&shutdown)))
} else {
None
};
Ok(Self {
inner: Arc::new(Inner {
pool,
manager,
config,
watcher_handle: parking_lot::Mutex::new(watcher_handle),
shutdown,
}),
})
}
pub fn config(&self) -> &SentinelPoolConfig {
&self.inner.config
}
pub fn inner_pool(&self) -> &Pool<SentinelConnectionManager> {
&self.inner.pool
}
pub fn epoch(&self) -> u64 {
self.inner.manager.current_epoch()
}
pub fn force_refresh(&self) -> u64 {
self.inner.manager.bump_epoch()
}
pub async fn get(&self) -> Result<ConnectionLease<'_>> {
let inner = self.inner.pool.get().await.map_err(map_bb8_run_error)?;
Ok(ConnectionLease { inner })
}
pub async fn execute<F, Fut, T>(&self, mut op: F) -> Result<T>
where
F: FnMut(MultiplexedConnection) -> Fut,
Fut: std::future::Future<Output = std::result::Result<T, redis::RedisError>>,
{
let max_retries = self.inner.config.max_retries.max(1);
let backoff = self.inner.config.retry_backoff;
let mut last_err: Option<redis::RedisError> = None;
for attempt in 1..=max_retries {
let mut lease = match self.get().await {
Ok(l) => l,
Err(SentinelPoolError::Redis(e)) => {
last_err = Some(e);
sleep_for(backoff * attempt).await;
continue;
}
Err(other) => return Err(other),
};
let conn_clone = lease.connection().clone();
match op(conn_clone).await {
Ok(value) => return Ok(value),
Err(err) => {
if is_retryable(&err) {
warn!(?err, attempt, "redis op failed, will retry after refresh");
self.force_refresh();
drop(lease);
last_err = Some(err);
sleep_for(backoff * attempt).await;
continue;
} else {
return Err(err.into());
}
}
}
}
Err(SentinelPoolError::RetryExhausted {
attempts: max_retries,
source: last_err.unwrap_or_else(|| {
redis::RedisError::from((redis::ErrorKind::Io, "unknown failure"))
}),
})
}
pub async fn shutdown(self) {
self.inner.shutdown.notify_waiters();
let handle = self.inner.watcher_handle.lock().take();
if let Some(h) = handle {
if let Err(e) = h.await {
debug!(?e, "watcher join error");
}
}
}
}
pub struct ConnectionLease<'a> {
inner: Bb8Pooled<'a, SentinelConnectionManager>,
}
impl<'a> ConnectionLease<'a> {
pub fn connection(&mut self) -> &mut MultiplexedConnection {
self.inner.as_mut()
}
pub fn pooled(&mut self) -> &mut PooledConnection {
&mut self.inner
}
}
impl<'a> std::ops::Deref for ConnectionLease<'a> {
type Target = MultiplexedConnection;
fn deref(&self) -> &Self::Target {
self.inner.as_ref()
}
}
impl<'a> std::ops::DerefMut for ConnectionLease<'a> {
fn deref_mut(&mut self) -> &mut Self::Target {
self.inner.as_mut()
}
}
fn map_bb8_run_error(err: bb8::RunError<SentinelPoolError>) -> SentinelPoolError {
match err {
bb8::RunError::User(e) => e,
bb8::RunError::TimedOut => SentinelPoolError::Pool("checkout timed out".into()),
}
}
fn is_retryable(err: &redis::RedisError) -> bool {
if err.is_io_error() || err.is_connection_dropped() || err.is_connection_refusal() {
return true;
}
matches!(
err.retry_method(),
redis::RetryMethod::Reconnect
| redis::RetryMethod::ReconnectFromInitialConnections
| redis::RetryMethod::RetryImmediately
| redis::RetryMethod::WaitAndRetry
)
}
async fn sleep_for(d: Duration) {
if d.is_zero() {
tokio::task::yield_now().await;
} else {
tokio::time::sleep(d).await;
}
}