use std::sync::Arc;
use std::time::Instant;
use async_trait::async_trait;
use tokio::sync::watch;
use crate::migration_executor::MigrationExecutor;
use crate::rebalancer::driver::{RebalancerLoop, RebalancerLoopConfig};
use super::super::context::BootstrapCtx;
use super::super::errors::{BootstrapError, ShutdownError};
use super::super::health::SubsystemHealth;
use super::super::r#trait::{ClusterSubsystem, SubsystemHandle};
use super::adapters::{ExecutorDispatcher, MultiRaftElectionGate, NexarTransportMetricsProvider};
pub struct RebalancerSubsystem {
cfg: RebalancerLoopConfig,
executor: Arc<MigrationExecutor>,
}
impl RebalancerSubsystem {
pub fn new(cfg: RebalancerLoopConfig, executor: Arc<MigrationExecutor>) -> Self {
Self { cfg, executor }
}
}
#[async_trait]
impl ClusterSubsystem for RebalancerSubsystem {
fn name(&self) -> &'static str {
"rebalancer"
}
fn dependencies(&self) -> &'static [&'static str] {
&["swim", "reachability"]
}
async fn start(&self, ctx: &BootstrapCtx) -> Result<SubsystemHandle, BootstrapError> {
let metrics = Arc::new(NexarTransportMetricsProvider::new(Arc::clone(
&ctx.transport,
)));
let dispatcher = Arc::new(ExecutorDispatcher::new(Arc::clone(&self.executor)));
let gate = Arc::new(MultiRaftElectionGate::new(&ctx.multi_raft));
let routing = Arc::clone(&ctx.routing);
let topology = Arc::clone(&ctx.topology);
let rloop = Arc::new(RebalancerLoop::new(
self.cfg.clone(),
metrics,
dispatcher,
gate,
routing,
topology,
));
let (shutdown_tx, shutdown_rx) = watch::channel(false);
let task = tokio::spawn(async move { rloop.run(shutdown_rx).await });
Ok(SubsystemHandle::new("rebalancer", task, shutdown_tx))
}
async fn shutdown(&self, _deadline: Instant) -> Result<(), ShutdownError> {
Ok(())
}
fn health(&self) -> SubsystemHealth {
SubsystemHealth::Running
}
}
#[cfg(test)]
mod tests {
use super::*;
fn _assert_name_is_rebalancer(s: &RebalancerSubsystem) {
assert_eq!(s.name(), "rebalancer");
}
fn _assert_deps_correct(s: &RebalancerSubsystem) {
assert_eq!(s.dependencies(), &["swim", "reachability"]);
}
#[test]
fn dependency_slice_has_two_entries() {
const EXPECTED: &[&str] = &["swim", "reachability"];
assert_eq!(EXPECTED.len(), 2);
assert_eq!(EXPECTED[0], "swim");
assert_eq!(EXPECTED[1], "reachability");
}
#[test]
fn rebalancer_loop_config_default_is_sensible() {
let cfg = RebalancerLoopConfig::default();
assert!(cfg.interval.as_secs() > 0);
assert!(cfg.backpressure_cpu_threshold > 0.0);
}
}