use std::sync::Arc;
use std::time::{Duration, Instant};
use async_trait::async_trait;
use tokio::sync::watch;
use tracing::warn;
use crate::decommission::observer::DecommissionObserver;
use super::super::context::BootstrapCtx;
use super::super::errors::{BootstrapError, ShutdownError};
use super::super::health::SubsystemHealth;
use super::super::r#trait::{ClusterSubsystem, SubsystemHandle};
pub struct DecommissionSubsystem {
local_node_id: u64,
poll_interval: Duration,
}
impl DecommissionSubsystem {
pub fn new(local_node_id: u64, poll_interval: Duration) -> Self {
Self {
local_node_id,
poll_interval,
}
}
}
#[async_trait]
impl ClusterSubsystem for DecommissionSubsystem {
fn name(&self) -> &'static str {
"decommission"
}
fn dependencies(&self) -> &'static [&'static str] {
&["swim"]
}
async fn start(&self, ctx: &BootstrapCtx) -> Result<SubsystemHandle, BootstrapError> {
let (observer, decommission_rx) = DecommissionObserver::new(
Arc::clone(&ctx.topology),
self.local_node_id,
self.poll_interval,
);
let (shutdown_tx, shutdown_rx) = watch::channel(false);
let mut decommission_rx_clone = decommission_rx.clone();
let shutdown_tx_clone = shutdown_tx.clone();
tokio::spawn(async move {
while decommission_rx_clone.changed().await.is_ok() {
if *decommission_rx_clone.borrow() {
warn!("decommission observer fired: local node is leaving cluster");
let _ = shutdown_tx_clone.send(true);
return;
}
}
});
let task = tokio::spawn(async move { observer.run(shutdown_rx).await });
Ok(SubsystemHandle::new("decommission", task, shutdown_tx))
}
async fn shutdown(&self, _deadline: Instant) -> Result<(), ShutdownError> {
Ok(())
}
fn health(&self) -> SubsystemHealth {
SubsystemHealth::Running
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn decommission_name_and_deps() {
let s = DecommissionSubsystem::new(1, Duration::from_secs(5));
assert_eq!(s.name(), "decommission");
assert_eq!(s.dependencies(), &["swim"]);
}
}