use std::sync::Arc;
use std::time::Duration;
use futures_util::StreamExt;
use redis::Msg;
use tokio::sync::Notify;
use tokio::task::JoinHandle;
use tracing::{debug, info, warn};
use crate::manager::SentinelConnectionManager;
const SWITCH_MASTER_CHANNEL: &str = "+switch-master";
pub(crate) fn spawn_watcher(
manager: SentinelConnectionManager,
shutdown: Arc<Notify>,
) -> JoinHandle<()> {
tokio::spawn(async move {
let service_name = manager.config.service_name.clone();
let backoff = manager.config.watcher_reconnect_backoff;
info!(%service_name, "sentinel watcher started");
loop {
tokio::select! {
_ = shutdown.notified() => {
info!("sentinel watcher received shutdown signal");
break;
}
res = run_once(&manager, &shutdown) => {
if let Err(err) = res {
warn!(?err, "watcher loop errored, will retry");
}
if wait_or_shutdown(&shutdown, backoff).await {
info!("sentinel watcher exiting after error");
break;
}
}
}
}
info!("sentinel watcher stopped");
})
}
async fn wait_or_shutdown(shutdown: &Arc<Notify>, dur: Duration) -> bool {
if dur.is_zero() {
return false;
}
tokio::select! {
_ = shutdown.notified() => true,
_ = tokio::time::sleep(dur) => false,
}
}
async fn run_once(
manager: &SentinelConnectionManager,
shutdown: &Arc<Notify>,
) -> Result<(), redis::RedisError> {
let target_service = manager.config.service_name.clone();
let sentinel_client = {
let mut client = manager.shared_client().lock_owned().await;
client.async_get_sentinel_client().await?
};
let mut pubsub = sentinel_client.get_async_pubsub().await?;
pubsub.subscribe(SWITCH_MASTER_CHANNEL).await?;
debug!(channel = SWITCH_MASTER_CHANNEL, "subscribed");
let mut stream = pubsub.on_message();
loop {
tokio::select! {
_ = shutdown.notified() => {
debug!("shutdown received inside pubsub loop");
return Ok(());
}
msg = stream.next() => {
match msg {
Some(m) => handle_message(m, &target_service, manager),
None => {
return Err(redis::RedisError::from((
redis::ErrorKind::Io,
"sentinel pubsub stream closed",
)));
}
}
}
}
}
}
fn handle_message(msg: Msg, target_service: &str, manager: &SentinelConnectionManager) {
let payload: String = match msg.get_payload() {
Ok(p) => p,
Err(err) => {
warn!(?err, "failed to decode +switch-master payload");
return;
}
};
let mut parts = payload.split_whitespace();
let name = parts.next().unwrap_or_default();
if name == target_service {
info!(payload = %payload, "+switch-master matched, bumping epoch");
manager.bump_epoch();
} else {
debug!(payload = %payload, "+switch-master for another service, ignored");
}
}