mod deployment;
mod rebalance;
use std::collections::HashMap;
#[cfg(feature = "raft")]
use std::sync::Arc;
use std::time::{Duration, Instant};
use serde::{Deserialize, Serialize};
use tracing::info;
use crate::connector_config::{self, ClusterConnector};
use crate::health::{self, HealthSweepResult};
use crate::metrics::ClusterPrometheusMetrics;
use crate::migration::{MigrationReason, MigrationTask};
use crate::pipeline_group::{DeployedPipelineGroup, PipelineDeployment, PipelineGroupSpec};
use crate::worker::{HeartbeatRequest, PipelineMetrics, WorkerId, WorkerNode, WorkerStatus};
use crate::{ClusterError, PlacementStrategy, RoundRobinPlacement};
/// Aggregated per-pipeline metrics for the whole cluster, as produced by
/// [`Coordinator::get_cluster_metrics`].
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct ClusterMetrics {
    /// One row per (pipeline, worker) deployment that is still actively placed.
    pub pipelines: Vec<PipelineWorkerMetrics>,
}
/// Metrics snapshot for a single pipeline running on a single worker.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PipelineWorkerMetrics {
    pub pipeline_name: String,
    /// Stringified id of the worker hosting this pipeline.
    pub worker_id: String,
    /// Events consumed by the pipeline so far.
    pub events_in: u64,
    /// Events emitted by the pipeline so far.
    pub events_out: u64,
    /// Per-connector health reported by the worker; empty when absent from
    /// the serialized payload.
    #[serde(default)]
    pub connector_health: Vec<crate::worker::ConnectorHealth>,
}
/// Operator-supplied autoscaling policy, evaluated by
/// [`Coordinator::evaluate_scaling`].
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ScalingPolicy {
    /// Lower bound on healthy workers; falling below forces a scale-up.
    pub min_workers: usize,
    /// Upper bound on workers; scale-up targets are capped here.
    pub max_workers: usize,
    /// Average pipelines-per-worker above which scale-up is recommended.
    pub scale_up_threshold: f64,
    /// Average pipelines-per-worker below which scale-down is recommended.
    pub scale_down_threshold: f64,
    // Cooldown between scaling notifications/actions, in seconds —
    // enforcement happens outside this block; TODO confirm exact semantics.
    pub cooldown_secs: u64,
    /// Optional webhook URL notified of scaling recommendations.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub webhook_url: Option<String>,
}
/// Output of [`Coordinator::evaluate_scaling`]: the suggested action, the
/// fleet sizes involved, and a human-readable justification.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ScalingRecommendation {
    pub action: ScalingAction,
    /// Number of `Ready` workers at evaluation time.
    pub current_workers: usize,
    /// Recommended fleet size after acting on the recommendation.
    pub target_workers: usize,
    /// Human-readable explanation of the decision.
    pub reason: String,
    pub avg_pipelines_per_worker: f64,
    pub total_pipelines: usize,
    /// RFC 3339 timestamp of when the evaluation ran.
    pub timestamp: String,
}
/// Direction of a scaling recommendation. Serialized in snake_case
/// (`scale_up`, `scale_down`, `stable`).
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum ScalingAction {
    ScaleUp,
    ScaleDown,
    Stable,
}
/// Handle to the embedded Raft node and the state it replicates.
/// Only available with the `raft` feature.
#[cfg(feature = "raft")]
pub struct RaftHandle {
    /// Running Raft instance used for replication and leadership queries.
    pub raft: Arc<crate::raft::VarpulisRaft>,
    /// Shared, Raft-replicated coordinator state machine contents.
    pub store_state: crate::raft::store::SharedCoordinatorState,
    /// Node id -> address map, used to locate the current leader.
    pub peer_addrs: std::collections::BTreeMap<u64, String>,
    // Optional admin API key — presumably used when forwarding requests to
    // the leader; TODO confirm against callers.
    pub admin_key: Option<String>,
}
#[cfg(feature = "raft")]
impl std::fmt::Debug for RaftHandle {
    // Manual Debug: only `peer_addrs` is printed; the Raft instance, shared
    // state and admin key are intentionally omitted
    // (`finish_non_exhaustive` marks the elision with `..`).
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("RaftHandle")
            .field("peer_addrs", &self.peer_addrs)
            .finish_non_exhaustive()
    }
}
/// One unit of deployment work: a single pipeline replica to be pushed to a
/// specific worker.
#[derive(Debug, Clone)]
pub struct DeployTask {
    /// Name of this replica (may differ from `pipeline_name` when several
    /// replicas are deployed — TODO confirm naming scheme).
    pub replica_name: String,
    pub pipeline_name: String,
    pub worker_id: WorkerId,
    /// Address and API key of the worker receiving the deployment.
    pub worker_address: String,
    pub worker_api_key: String,
    /// Pipeline source text to deploy.
    pub source: String,
    /// Total number of replicas requested for this pipeline.
    pub replica_count: usize,
}
/// Outcome of a single [`DeployTask`].
#[derive(Debug)]
pub struct DeployTaskResult {
    pub replica_name: String,
    pub pipeline_name: String,
    pub worker_id: WorkerId,
    pub worker_address: String,
    pub worker_api_key: String,
    pub replica_count: usize,
    /// `Ok` with the worker's deploy response on success, `Err(message)`
    /// describing the failure otherwise.
    pub outcome: Result<DeployResponse, String>,
}
/// Plan for deploying a pipeline group: the spec plus one task per replica.
#[derive(Debug)]
pub struct DeployGroupPlan {
    pub group_id: String,
    pub spec: PipelineGroupSpec,
    /// Per-replica deployment work items.
    pub tasks: Vec<DeployTask>,
    // Instant the deployment began — presumably used for duration metrics
    // by the executor of this plan; TODO confirm.
    pub deploy_start: Instant,
}
/// Fully-resolved plan for moving one pipeline from a source worker to a
/// target worker.
#[derive(Debug, Clone)]
pub struct MigratePipelinePlan {
    pub migration_id: String,
    pub pipeline_name: String,
    pub group_id: String,
    pub source_worker_id: WorkerId,
    pub target_worker_id: WorkerId,
    /// Address and API key of the worker receiving the pipeline.
    pub target_address: String,
    pub target_api_key: String,
    /// Deployment record being moved.
    pub deployment: PipelineDeployment,
    /// Pipeline source text to deploy on the target.
    pub vpl_source: String,
    /// Why the migration was initiated (failover, rebalance, ...).
    pub reason: MigrationReason,
    // Instant the migration began — presumably for duration tracking;
    // TODO confirm.
    pub migrate_start: Instant,
}
/// Central cluster state holder: tracks workers, pipeline-group
/// deployments, connectors, migrations and scaling, and owns the HTTP
/// client used to talk to workers.
pub struct Coordinator {
    /// Registered workers keyed by id.
    pub workers: HashMap<WorkerId, WorkerNode>,
    /// Deployed pipeline groups keyed by group id.
    pub pipeline_groups: HashMap<String, DeployedPipelineGroup>,
    /// Cluster-level connector configurations keyed by connector name.
    pub connectors: HashMap<String, ClusterConnector>,
    /// Latest per-pipeline metrics snapshot reported by each worker.
    pub worker_metrics: HashMap<WorkerId, Vec<PipelineMetrics>>,
    /// In-flight (and recently finished) migrations keyed by migration id.
    pub active_migrations: HashMap<String, MigrationTask>,
    /// Set when topology changed (e.g. a worker joined while groups exist)
    /// so a rebalance should run.
    pub pending_rebalance: bool,
    /// Result of the most recent health sweep, if any.
    pub last_health_sweep: Option<HealthSweepResult>,
    /// Optional autoscaling policy; `None` disables scaling evaluation.
    pub scaling_policy: Option<ScalingPolicy>,
    /// Most recent recommendation produced by `evaluate_scaling`.
    pub last_scaling_recommendation: Option<ScalingRecommendation>,
    // When the last scaling webhook fired — presumably paired with
    // `ScalingPolicy::cooldown_secs`; TODO confirm.
    pub(crate) last_scaling_webhook: Option<Instant>,
    /// Expected heartbeat cadence from workers.
    pub heartbeat_interval: Duration,
    /// Silence period after which a worker is considered unhealthy.
    pub heartbeat_timeout: Duration,
    // Pluggable placement strategy (round-robin by default, see `new`).
    pub(crate) placement: Box<dyn PlacementStrategy>,
    // Shared HTTP client for worker-facing requests (10s timeout).
    pub(crate) http_client: reqwest::Client,
    #[cfg(feature = "nats-transport")]
    pub nats_client: Option<async_nats::Client>,
    /// High-availability role of this coordinator process.
    pub ha_role: HaRole,
    #[cfg(feature = "raft")]
    pub raft_handle: Option<RaftHandle>,
    /// Prometheus gauges/counters describing cluster state.
    pub cluster_metrics: ClusterPrometheusMetrics,
    /// Registered model entries keyed by name.
    pub model_registry: HashMap<String, crate::model_registry::ModelRegistryEntry>,
    /// Optional LLM configuration for the chat subsystem.
    pub llm_config: Option<crate::chat::LlmConfig>,
    #[cfg(feature = "federation")]
    pub federation: Option<crate::federation::FederationCoordinator>,
}
impl std::fmt::Debug for Coordinator {
    // Manual Debug: prints the data-bearing fields; the placement strategy,
    // HTTP client and the feature-gated handles (NATS, Raft, federation)
    // are omitted and the elision is marked via `finish_non_exhaustive`.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("Coordinator")
            .field("workers", &self.workers)
            .field("pipeline_groups", &self.pipeline_groups)
            .field("connectors", &self.connectors)
            .field("worker_metrics", &self.worker_metrics)
            .field("active_migrations", &self.active_migrations)
            .field("pending_rebalance", &self.pending_rebalance)
            .field("last_health_sweep", &self.last_health_sweep)
            .field("scaling_policy", &self.scaling_policy)
            .field(
                "last_scaling_recommendation",
                &self.last_scaling_recommendation,
            )
            .field("last_scaling_webhook", &self.last_scaling_webhook)
            .field("heartbeat_interval", &self.heartbeat_interval)
            .field("heartbeat_timeout", &self.heartbeat_timeout)
            .field("ha_role", &self.ha_role)
            .field("cluster_metrics", &self.cluster_metrics)
            .field("model_registry", &self.model_registry)
            .field("llm_config", &self.llm_config)
            .finish_non_exhaustive()
    }
}
/// High-availability role of this coordinator process.
#[derive(Debug, Clone, Default, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum HaRole {
    /// Single-node deployment; always allowed to accept writes.
    #[default]
    Standalone,
    /// Elected leader of an HA cluster; allowed to accept writes.
    Leader,
    /// Follower that must redirect writes to `leader_id`.
    Follower {
        leader_id: String,
    },
}
impl HaRole {
    /// True when this node may accept mutating requests: a standalone
    /// coordinator or the elected leader. Followers must redirect writes.
    pub fn is_writer(&self) -> bool {
        match self {
            HaRole::Standalone | HaRole::Leader => true,
            HaRole::Follower { .. } => false,
        }
    }
}
impl Coordinator {
    /// Creates an empty coordinator with default heartbeat timing,
    /// round-robin placement, and a 10-second-timeout HTTP client for
    /// worker calls.
    ///
    /// # Panics
    /// Panics if the reqwest HTTP client cannot be constructed.
    pub fn new() -> Self {
        Self {
            workers: HashMap::new(),
            pipeline_groups: HashMap::new(),
            connectors: HashMap::new(),
            worker_metrics: HashMap::new(),
            active_migrations: HashMap::new(),
            pending_rebalance: false,
            last_health_sweep: None,
            scaling_policy: None,
            last_scaling_recommendation: None,
            last_scaling_webhook: None,
            heartbeat_interval: health::DEFAULT_HEARTBEAT_INTERVAL,
            heartbeat_timeout: health::DEFAULT_HEARTBEAT_TIMEOUT,
            placement: Box::new(RoundRobinPlacement::new()),
            http_client: reqwest::Client::builder()
                .timeout(Duration::from_secs(10))
                .build()
                .expect("Failed to build HTTP client"),
            #[cfg(feature = "nats-transport")]
            nats_client: None,
            ha_role: HaRole::default(),
            #[cfg(feature = "raft")]
            raft_handle: None,
            cluster_metrics: ClusterPrometheusMetrics::new(),
            model_registry: HashMap::new(),
            llm_config: None,
            #[cfg(feature = "federation")]
            federation: None,
        }
    }
#[cfg(feature = "raft")]
pub fn with_raft(
raft: Arc<crate::raft::VarpulisRaft>,
store_state: crate::raft::store::SharedCoordinatorState,
peer_addrs: std::collections::BTreeMap<u64, String>,
admin_key: Option<String>,
) -> Self {
let mut coord = Self::new();
coord.raft_handle = Some(RaftHandle {
raft,
store_state,
peer_addrs,
admin_key,
});
coord
}
#[cfg(feature = "raft")]
#[tracing::instrument(skip(self))]
pub async fn raft_replicate(
&self,
cmd: crate::raft::ClusterCommand,
) -> Result<(), ClusterError> {
if let Some(ref handle) = self.raft_handle {
handle.raft.client_write(cmd).await.map_err(|e| {
let leader_info = format!("{}", e);
ClusterError::NotLeader(leader_info)
})?;
}
Ok(())
}
#[cfg(feature = "raft")]
pub fn is_raft_leader(&self) -> bool {
match &self.raft_handle {
None => true, Some(handle) => {
let metrics = handle.raft.metrics().borrow().clone();
metrics.current_leader == Some(metrics.id)
}
}
}
#[cfg(feature = "raft")]
pub fn raft_leader_addr(&self) -> Option<String> {
let handle = self.raft_handle.as_ref()?;
let metrics = handle.raft.metrics().borrow().clone();
let leader_id = metrics.current_leader?;
if leader_id == metrics.id {
return None; }
handle.peer_addrs.get(&leader_id).cloned()
}
#[cfg(feature = "raft")]
pub fn sync_from_raft(&mut self) {
let Some(ref handle) = self.raft_handle else {
return;
};
let raft_state = handle.store_state.read().unwrap_or_else(|e| e.into_inner());
let raft_worker_ids: std::collections::HashSet<WorkerId> = raft_state
.workers
.keys()
.map(|k| WorkerId(k.clone()))
.collect();
for (id, entry) in &raft_state.workers {
let wid = WorkerId(id.clone());
let raft_status = match entry.status.as_str() {
"ready" => WorkerStatus::Ready,
"unhealthy" => WorkerStatus::Unhealthy,
"draining" => WorkerStatus::Draining,
"registering" => WorkerStatus::Registering,
_ => WorkerStatus::Ready,
};
if let Some(local) = self.workers.get_mut(&wid) {
local.assigned_pipelines = entry.assigned_pipelines.clone();
local.capacity.cpu_cores = entry.cpu_cores;
local.capacity.max_pipelines = entry.max_pipelines;
if entry.events_processed > local.events_processed {
local.events_processed = entry.events_processed;
}
local.capacity.pipelines_running = entry.pipelines_running;
match raft_status {
WorkerStatus::Unhealthy | WorkerStatus::Draining => {
local.status = raft_status;
}
WorkerStatus::Ready => {
local.last_heartbeat = Instant::now();
}
_ => {}
}
} else {
let worker = WorkerNode {
id: wid.clone(),
address: entry.address.clone(),
api_key: varpulis_core::security::SecretString::new(entry.api_key.clone()),
status: raft_status,
capacity: crate::worker::WorkerCapacity {
cpu_cores: entry.cpu_cores,
pipelines_running: entry.pipelines_running,
max_pipelines: entry.max_pipelines,
},
last_heartbeat: Instant::now(),
assigned_pipelines: entry.assigned_pipelines.clone(),
events_processed: entry.events_processed,
};
self.workers.insert(wid, worker);
}
}
self.workers.retain(|id, _| raft_worker_ids.contains(id));
self.pipeline_groups = raft_state
.pipeline_groups
.iter()
.filter_map(|(name, val)| {
serde_json::from_value(val.clone())
.ok()
.map(|group| (name.clone(), group))
})
.collect();
self.connectors = raft_state.connectors.clone();
self.scaling_policy = raft_state
.scaling_policy
.as_ref()
.and_then(|v| serde_json::from_value(v.clone()).ok());
for (worker_id_str, metrics) in &raft_state.worker_pipeline_metrics {
let wid = WorkerId(worker_id_str.clone());
if !metrics.is_empty() {
self.worker_metrics.insert(wid, metrics.clone());
}
}
tracing::debug!(
"Synced from Raft state: {} workers, {} groups, {} connectors",
self.workers.len(),
self.pipeline_groups.len(),
self.connectors.len()
);
}
#[cfg(feature = "raft")]
pub fn update_raft_role(&mut self) {
let Some(ref handle) = self.raft_handle else {
return;
};
let metrics = handle.raft.metrics().borrow().clone();
let is_leader = metrics.current_leader == Some(metrics.id);
self.ha_role = if is_leader {
HaRole::Leader
} else {
let leader_id = metrics
.current_leader
.map(|id| id.to_string())
.unwrap_or_else(|| "unknown".to_string());
HaRole::Follower { leader_id }
};
}
    /// Shared HTTP client for worker-facing requests (10s timeout, built in
    /// [`Coordinator::new`]).
    pub fn http_client(&self) -> &reqwest::Client {
        &self.http_client
    }
pub fn require_writer(&self) -> Result<(), crate::ClusterError> {
if self.ha_role.is_writer() {
Ok(())
} else {
let leader = match &self.ha_role {
HaRole::Follower { leader_id } => leader_id.clone(),
_ => "unknown".to_string(),
};
Err(crate::ClusterError::NotLeader(leader))
}
}
    /// Registers (or re-registers) a worker, forcing its status to `Ready`.
    ///
    /// Insertion is keyed by id, so re-registering with the same id replaces
    /// the previous entry (address and API key included). If pipeline groups
    /// already exist, `pending_rebalance` is set so placements can use the
    /// new capacity.
    pub fn register_worker(&mut self, mut node: WorkerNode) -> WorkerId {
        let id = node.id.clone();
        node.status = WorkerStatus::Ready;
        info!(worker_id = %id, address = %node.address, "Worker registered");
        self.workers.insert(id.clone(), node);
        // Keep the Prometheus worker-status gauges in sync.
        self.update_metrics_counts();
        if !self.pipeline_groups.is_empty() {
            self.pending_rebalance = true;
        }
        id
    }
    /// Records a heartbeat: refreshes the worker's liveness timestamp,
    /// updates its running-pipeline and event counters, and promotes an
    /// `Unhealthy` worker back to `Ready`.
    ///
    /// # Errors
    /// Returns [`ClusterError::WorkerNotFound`] when the worker is not
    /// registered.
    pub fn heartbeat(
        &mut self,
        worker_id: &WorkerId,
        hb: &HeartbeatRequest,
    ) -> Result<(), ClusterError> {
        let worker = self
            .workers
            .get_mut(worker_id)
            .ok_or_else(|| ClusterError::WorkerNotFound(worker_id.0.clone()))?;
        worker.last_heartbeat = std::time::Instant::now();
        worker.capacity.pipelines_running = hb.pipelines_running;
        worker.events_processed = hb.events_processed;
        if worker.status == WorkerStatus::Unhealthy {
            info!("Worker {} recovered (heartbeat received)", worker_id);
            worker.status = WorkerStatus::Ready;
        }
        // NOTE(review): an empty metrics list leaves the previous snapshot in
        // place instead of clearing it — presumably intentional; confirm.
        if !hb.pipeline_metrics.is_empty() {
            self.update_worker_metrics(worker_id, hb.pipeline_metrics.clone());
        }
        Ok(())
    }
pub fn deregister_worker(&mut self, worker_id: &WorkerId) -> Result<(), ClusterError> {
self.workers
.remove(worker_id)
.ok_or_else(|| ClusterError::WorkerNotFound(worker_id.0.clone()))?;
self.worker_metrics.remove(worker_id);
info!(worker_id = %worker_id, "Worker deregistered");
Ok(())
}
    /// Runs a liveness sweep over all workers via [`health::health_sweep`],
    /// marking workers whose last heartbeat exceeds `heartbeat_timeout`.
    ///
    /// Caches the result in `last_health_sweep`, records sweep duration in
    /// Prometheus, and refreshes status gauges when anything changed.
    pub fn health_sweep(&mut self) -> HealthSweepResult {
        let start = Instant::now();
        let timeout = self.heartbeat_timeout;
        let result = health::health_sweep(&mut self.workers, timeout);
        self.last_health_sweep = Some(HealthSweepResult {
            workers_checked: result.workers_checked,
            workers_marked_unhealthy: result.workers_marked_unhealthy.clone(),
        });
        self.cluster_metrics
            .record_health_sweep(result.workers_checked, start.elapsed().as_secs_f64());
        // Only rewrite the gauges when a worker actually changed status.
        if !result.workers_marked_unhealthy.is_empty() {
            self.update_metrics_counts();
        }
        result
    }
pub fn list_connectors(&self) -> Vec<&ClusterConnector> {
self.connectors.values().collect()
}
pub fn get_connector(&self, name: &str) -> Result<&ClusterConnector, ClusterError> {
self.connectors
.get(name)
.ok_or_else(|| ClusterError::ConnectorNotFound(name.to_string()))
}
pub fn create_connector(
&mut self,
connector: ClusterConnector,
) -> Result<&ClusterConnector, ClusterError> {
if self.connectors.contains_key(&connector.name) {
return Err(ClusterError::ConnectorValidation(format!(
"Connector '{}' already exists",
connector.name
)));
}
connector_config::validate_connector(&connector)?;
let name = connector.name.clone();
self.connectors.insert(name.clone(), connector);
info!("Connector created: {}", name);
Ok(&self.connectors[&name])
}
pub fn update_connector(
&mut self,
name: &str,
connector: ClusterConnector,
) -> Result<&ClusterConnector, ClusterError> {
if !self.connectors.contains_key(name) {
return Err(ClusterError::ConnectorNotFound(name.to_string()));
}
connector_config::validate_connector(&connector)?;
self.connectors.insert(name.to_string(), connector);
info!("Connector updated: {}", name);
Ok(&self.connectors[name])
}
pub fn delete_connector(&mut self, name: &str) -> Result<(), ClusterError> {
self.connectors
.remove(name)
.ok_or_else(|| ClusterError::ConnectorNotFound(name.to_string()))?;
info!("Connector deleted: {}", name);
Ok(())
}
    /// Replaces the stored per-pipeline metrics snapshot for `worker_id`.
    pub fn update_worker_metrics(&mut self, worker_id: &WorkerId, metrics: Vec<PipelineMetrics>) {
        self.worker_metrics.insert(worker_id.clone(), metrics);
    }
pub fn get_cluster_metrics(&self) -> ClusterMetrics {
let mut active_placements = std::collections::HashSet::new();
for group in self.pipeline_groups.values() {
for (pipeline_name, placement) in &group.placements {
active_placements.insert((placement.worker_id.0.clone(), pipeline_name.clone()));
}
}
let mut pipelines = Vec::new();
for (worker_id, metrics) in &self.worker_metrics {
for m in metrics {
if active_placements.contains(&(worker_id.0.clone(), m.pipeline_name.clone())) {
pipelines.push(PipelineWorkerMetrics {
pipeline_name: m.pipeline_name.clone(),
worker_id: worker_id.0.clone(),
events_in: m.events_in,
events_out: m.events_out,
connector_health: m.connector_health.clone(),
});
}
}
}
ClusterMetrics { pipelines }
}
pub fn check_connector_health(&self) -> Vec<(String, WorkerId, String)> {
let mut unhealthy = Vec::new();
for (worker_id, metrics) in &self.worker_metrics {
for m in metrics {
for ch in &m.connector_health {
if !ch.connected && ch.seconds_since_last_message > 60 {
unhealthy.push((
m.pipeline_name.clone(),
worker_id.clone(),
ch.connector_name.clone(),
));
}
}
}
}
unhealthy
}
    /// Evaluates the configured [`ScalingPolicy`] against current load and
    /// returns a recommendation; `None` when no policy is set.
    ///
    /// Load is the total number of placed pipelines divided by the number
    /// of `Ready` workers. The recommendation is also cached in
    /// `last_scaling_recommendation`.
    pub fn evaluate_scaling(&mut self) -> Option<ScalingRecommendation> {
        let policy = self.scaling_policy.as_ref()?;
        let healthy_workers = self
            .workers
            .values()
            .filter(|w| w.status == WorkerStatus::Ready)
            .count();
        let total_pipelines: usize = self
            .pipeline_groups
            .values()
            .map(|g| g.placements.len())
            .sum();
        // Guard against division by zero when no worker is healthy.
        let avg_load = if healthy_workers > 0 {
            total_pipelines as f64 / healthy_workers as f64
        } else {
            0.0
        };
        // Priority order: enforce the floor first, then react to overload,
        // then consider shrinking, otherwise report stable.
        let (action, target, reason) = if healthy_workers < policy.min_workers {
            (
                ScalingAction::ScaleUp,
                policy.min_workers,
                format!(
                    "Below minimum workers: {} < {}",
                    healthy_workers, policy.min_workers
                ),
            )
        } else if avg_load > policy.scale_up_threshold && healthy_workers < policy.max_workers {
            // Size the fleet so average load drops to the up-threshold,
            // capped at max_workers.
            let needed = (total_pipelines as f64 / policy.scale_up_threshold).ceil() as usize;
            let target = needed.min(policy.max_workers);
            (
                ScalingAction::ScaleUp,
                target,
                format!(
                    "Load {:.1} exceeds threshold {:.1}",
                    avg_load, policy.scale_up_threshold
                ),
            )
        } else if avg_load < policy.scale_down_threshold && healthy_workers > policy.min_workers {
            // The shrink target is deliberately sized against the *up*
            // threshold so the smaller fleet cannot immediately re-trigger
            // a scale-up — hysteresis by construction.
            let needed = if total_pipelines > 0 {
                (total_pipelines as f64 / policy.scale_up_threshold).ceil() as usize
            } else {
                policy.min_workers
            };
            let target = needed.max(policy.min_workers);
            (
                ScalingAction::ScaleDown,
                target,
                format!(
                    "Load {:.1} below threshold {:.1}",
                    avg_load, policy.scale_down_threshold
                ),
            )
        } else {
            (
                ScalingAction::Stable,
                healthy_workers,
                "Load within thresholds".to_string(),
            )
        };
        let recommendation = ScalingRecommendation {
            action,
            current_workers: healthy_workers,
            target_workers: target,
            reason,
            avg_pipelines_per_worker: avg_load,
            total_pipelines,
            timestamp: chrono::Utc::now().to_rfc3339(),
        };
        self.last_scaling_recommendation = Some(recommendation.clone());
        Some(recommendation)
    }
pub(crate) fn update_metrics_counts(&self) {
let (mut ready, mut unhealthy, mut draining) = (0usize, 0usize, 0usize);
for w in self.workers.values() {
match w.status {
WorkerStatus::Ready => ready += 1,
WorkerStatus::Unhealthy => unhealthy += 1,
WorkerStatus::Draining => draining += 1,
_ => {}
}
}
self.cluster_metrics
.set_worker_counts(ready, unhealthy, draining);
let total_deployments: usize = self
.pipeline_groups
.values()
.map(|g| g.placements.len())
.sum();
self.cluster_metrics
.set_deployment_counts(self.pipeline_groups.len(), total_deployments);
}
}
impl Default for Coordinator {
    /// Same as [`Coordinator::new`].
    fn default() -> Self {
        Self::new()
    }
}
/// Worker's response body for a pipeline deployment request.
#[derive(Debug, Deserialize)]
pub struct DeployResponse {
    pub id: String,
    pub name: String,
    pub status: String,
}
/// Deserialized body of a worker's checkpoint endpoint, used during
/// pipeline migration.
#[derive(Debug, Deserialize)]
pub(crate) struct CheckpointResponsePayload {
    pub(crate) pipeline_id: String,
    /// Engine state captured by the worker.
    pub(crate) checkpoint: varpulis_runtime::persistence::EngineCheckpoint,
    pub(crate) events_processed: u64,
}
/// Request to inject a single synthetic event into a pipeline.
#[derive(Debug, Serialize, Deserialize)]
pub struct InjectEventRequest {
    pub event_type: String,
    /// Arbitrary event payload fields; defaults to an empty map.
    #[serde(default)]
    pub fields: serde_json::Map<String, serde_json::Value>,
}
/// Plan for tearing down a pipeline group: each (pipeline name, deployment)
/// pair that must be stopped.
#[derive(Debug, Clone)]
pub struct TeardownPlan {
    pub group_id: String,
    pub tasks: Vec<(String, PipelineDeployment)>,
}
/// Resolved destination for an event injection: the worker endpoint plus
/// the name of the target being routed to.
#[derive(Debug, Clone)]
pub struct InjectTarget {
    pub url: String,
    pub api_key: String,
    pub target_name: String,
    pub worker_id: String,
}
/// Coordinator-level response for a single-event injection: where the event
/// went and the worker's raw reply.
#[derive(Debug, Serialize, Deserialize)]
pub struct InjectResponse {
    pub routed_to: String,
    pub worker_id: String,
    /// Raw JSON response returned by the worker.
    pub worker_response: serde_json::Value,
}
/// Request to inject a batch of events supplied as raw text.
#[derive(Debug, Serialize, Deserialize)]
pub struct InjectBatchRequest {
    // Textual event batch — format is parsed downstream; TODO confirm
    // expected encoding (one event per line vs. structured).
    pub events_text: String,
}
/// Result of a batch injection: success/failure counts, any produced output
/// events, per-event errors, and total processing time.
#[derive(Debug, Serialize, Deserialize)]
pub struct InjectBatchResponse {
    pub events_sent: usize,
    pub events_failed: usize,
    /// Output events produced by the pipeline(s) while processing the batch.
    pub output_events: Vec<serde_json::Value>,
    pub errors: Vec<String>,
    /// Wall-clock processing time in microseconds.
    pub processing_time_us: u64,
}
#[cfg(test)]
mod tests {
use super::*;
use crate::migration::MigrationStatus;
use crate::pipeline_group::{DeployedPipelineGroup, PipelineGroupSpec, PipelinePlacement};
use crate::worker::WorkerNode;
#[test]
fn test_coordinator_register_worker() {
let mut coord = Coordinator::new();
let node = WorkerNode::new(
WorkerId("w1".into()),
"http://localhost:9000".into(),
"key".into(),
);
let id = coord.register_worker(node);
assert_eq!(id, WorkerId("w1".into()));
assert_eq!(coord.workers.len(), 1);
assert_eq!(coord.workers[&id].status, WorkerStatus::Ready);
}
#[test]
fn test_coordinator_deregister_worker() {
let mut coord = Coordinator::new();
let node = WorkerNode::new(
WorkerId("w1".into()),
"http://localhost:9000".into(),
"key".into(),
);
coord.register_worker(node);
assert!(coord.deregister_worker(&WorkerId("w1".into())).is_ok());
assert!(coord.workers.is_empty());
}
#[test]
fn test_coordinator_deregister_unknown() {
let mut coord = Coordinator::new();
assert!(coord
.deregister_worker(&WorkerId("unknown".into()))
.is_err());
}
#[test]
fn test_coordinator_heartbeat() {
let mut coord = Coordinator::new();
let node = WorkerNode::new(
WorkerId("w1".into()),
"http://localhost:9000".into(),
"key".into(),
);
coord.register_worker(node);
let hb = HeartbeatRequest {
events_processed: 100,
pipelines_running: 2,
pipeline_metrics: vec![],
};
assert!(coord.heartbeat(&WorkerId("w1".into()), &hb).is_ok());
assert_eq!(
coord.workers[&WorkerId("w1".into())]
.capacity
.pipelines_running,
2
);
}
#[test]
fn test_coordinator_health_sweep() {
let mut coord = Coordinator::new();
let mut node = WorkerNode::new(
WorkerId("w1".into()),
"http://localhost:9000".into(),
"key".into(),
);
node.status = WorkerStatus::Ready;
node.last_heartbeat = std::time::Instant::now() - std::time::Duration::from_secs(20);
coord.workers.insert(node.id.clone(), node);
let result = coord.health_sweep();
assert_eq!(result.workers_marked_unhealthy.len(), 1);
}
#[test]
fn test_coordinator_heartbeat_unknown_worker() {
let mut coord = Coordinator::new();
let hb = HeartbeatRequest {
events_processed: 0,
pipelines_running: 0,
pipeline_metrics: vec![],
};
let result = coord.heartbeat(&WorkerId("nonexistent".into()), &hb);
assert!(result.is_err());
match result.unwrap_err() {
crate::ClusterError::WorkerNotFound(id) => assert_eq!(id, "nonexistent"),
other => panic!("Expected WorkerNotFound, got: {:?}", other),
}
}
#[test]
fn test_coordinator_heartbeat_recovers_unhealthy() {
let mut coord = Coordinator::new();
let mut node = WorkerNode::new(
WorkerId("w1".into()),
"http://localhost:9000".into(),
"key".into(),
);
node.status = WorkerStatus::Ready;
coord.workers.insert(node.id.clone(), node);
coord
.workers
.get_mut(&WorkerId("w1".into()))
.unwrap()
.status = WorkerStatus::Unhealthy;
assert_eq!(
coord.workers[&WorkerId("w1".into())].status,
WorkerStatus::Unhealthy
);
let hb = HeartbeatRequest {
events_processed: 50,
pipelines_running: 1,
pipeline_metrics: vec![],
};
assert!(coord.heartbeat(&WorkerId("w1".into()), &hb).is_ok());
assert_eq!(
coord.workers[&WorkerId("w1".into())].status,
WorkerStatus::Ready
);
}
#[test]
fn test_coordinator_re_register_same_worker() {
let mut coord = Coordinator::new();
let node1 = WorkerNode::new(
WorkerId("w1".into()),
"http://localhost:9000".into(),
"key1".into(),
);
coord.register_worker(node1);
assert_eq!(
coord.workers[&WorkerId("w1".into())].api_key.expose(),
"key1"
);
let node2 = WorkerNode::new(
WorkerId("w1".into()),
"http://localhost:9999".into(),
"key2".into(),
);
coord.register_worker(node2);
assert_eq!(coord.workers.len(), 1);
assert_eq!(
coord.workers[&WorkerId("w1".into())].address,
"http://localhost:9999"
);
assert_eq!(
coord.workers[&WorkerId("w1".into())].api_key.expose(),
"key2"
);
}
#[test]
fn test_coordinator_multiple_workers() {
let mut coord = Coordinator::new();
for i in 0..5 {
let node = WorkerNode::new(
WorkerId(format!("w{}", i)),
format!("http://localhost:900{}", i),
"key".into(),
);
coord.register_worker(node);
}
assert_eq!(coord.workers.len(), 5);
for i in 0..5 {
assert!(coord.workers.contains_key(&WorkerId(format!("w{}", i))));
assert_eq!(
coord.workers[&WorkerId(format!("w{}", i))].status,
WorkerStatus::Ready
);
}
}
#[test]
fn test_coordinator_deregister_all() {
let mut coord = Coordinator::new();
for i in 0..3 {
let node = WorkerNode::new(
WorkerId(format!("w{}", i)),
format!("http://localhost:900{}", i),
"key".into(),
);
coord.register_worker(node);
}
assert_eq!(coord.workers.len(), 3);
for i in 0..3 {
assert!(coord
.deregister_worker(&WorkerId(format!("w{}", i)))
.is_ok());
}
assert!(coord.workers.is_empty());
}
#[test]
fn test_coordinator_heartbeat_updates_pipelines_running() {
let mut coord = Coordinator::new();
let node = WorkerNode::new(
WorkerId("w1".into()),
"http://localhost:9000".into(),
"key".into(),
);
coord.register_worker(node);
assert_eq!(
coord.workers[&WorkerId("w1".into())]
.capacity
.pipelines_running,
0
);
let hb = HeartbeatRequest {
events_processed: 1000,
pipelines_running: 5,
pipeline_metrics: vec![],
};
coord.heartbeat(&WorkerId("w1".into()), &hb).unwrap();
assert_eq!(
coord.workers[&WorkerId("w1".into())]
.capacity
.pipelines_running,
5
);
}
#[test]
fn test_coordinator_default() {
let coord = Coordinator::default();
assert!(coord.workers.is_empty());
assert!(coord.pipeline_groups.is_empty());
}
#[test]
fn test_cleanup_completed_migrations_removes_old() {
let mut coord = Coordinator::new();
let old_instant = Instant::now()
.checked_sub(Duration::from_secs(7200))
.unwrap_or(Instant::now());
if old_instant.elapsed() < Duration::from_secs(3600) {
return;
}
let mut task = MigrationTask {
id: "m1".into(),
pipeline_name: "p1".into(),
group_id: "g1".into(),
source_worker: WorkerId("w1".into()),
target_worker: WorkerId("w2".into()),
status: MigrationStatus::Completed,
started_at: old_instant,
checkpoint: None,
reason: MigrationReason::Failover,
};
coord.active_migrations.insert("m1".into(), task.clone());
task.id = "m2".into();
task.started_at = Instant::now(); coord.active_migrations.insert("m2".into(), task.clone());
task.id = "m3".into();
task.status = MigrationStatus::Failed("error".into());
task.started_at = old_instant;
coord.active_migrations.insert("m3".into(), task.clone());
task.id = "m4".into();
task.status = MigrationStatus::Deploying;
task.started_at = old_instant;
coord.active_migrations.insert("m4".into(), task);
assert_eq!(coord.active_migrations.len(), 4);
coord.cleanup_completed_migrations(Duration::from_secs(3600));
assert_eq!(coord.active_migrations.len(), 2);
assert!(coord.active_migrations.contains_key("m2"));
assert!(coord.active_migrations.contains_key("m4"));
}
#[test]
fn test_cleanup_completed_migrations_noop_when_empty() {
let mut coord = Coordinator::new();
coord.cleanup_completed_migrations(Duration::from_secs(3600));
assert!(coord.active_migrations.is_empty());
}
#[test]
fn test_cleanup_completed_migrations_keeps_recent() {
let mut coord = Coordinator::new();
let task = MigrationTask {
id: "m1".into(),
pipeline_name: "p1".into(),
group_id: "g1".into(),
source_worker: WorkerId("w1".into()),
target_worker: WorkerId("w2".into()),
status: MigrationStatus::Completed,
started_at: Instant::now(), checkpoint: None,
reason: MigrationReason::Rebalance,
};
coord.active_migrations.insert("m1".into(), task);
coord.cleanup_completed_migrations(Duration::from_secs(3600));
assert_eq!(coord.active_migrations.len(), 1);
}
#[tokio::test]
async fn test_drain_worker_idempotent() {
let mut coord = Coordinator::new();
let mut node = WorkerNode::new(
WorkerId("w1".into()),
"http://localhost:9000".into(),
"key".into(),
);
node.status = WorkerStatus::Draining;
coord.workers.insert(node.id.clone(), node);
let result = coord.drain_worker(&WorkerId("w1".into()), None).await;
assert!(result.is_ok());
assert!(result.unwrap().is_empty());
}
#[tokio::test]
async fn test_drain_worker_not_found() {
let mut coord = Coordinator::new();
let result = coord
.drain_worker(&WorkerId("nonexistent".into()), None)
.await;
assert!(result.is_err());
match result.unwrap_err() {
ClusterError::WorkerNotFound(id) => assert_eq!(id, "nonexistent"),
other => panic!("Expected WorkerNotFound, got: {:?}", other),
}
}
#[tokio::test]
async fn test_drain_worker_marks_draining() {
let mut coord = Coordinator::new();
let node = WorkerNode::new(
WorkerId("w1".into()),
"http://localhost:9000".into(),
"key".into(),
);
coord.register_worker(node);
assert_eq!(
coord.workers[&WorkerId("w1".into())].status,
WorkerStatus::Ready
);
let result = coord.drain_worker(&WorkerId("w1".into()), None).await;
assert!(result.is_ok());
assert!(!coord.workers.contains_key(&WorkerId("w1".into())));
}
#[test]
fn test_register_worker_triggers_pending_rebalance() {
let mut coord = Coordinator::new();
assert!(!coord.pending_rebalance);
let node1 = WorkerNode::new(
WorkerId("w1".into()),
"http://localhost:9000".into(),
"key".into(),
);
coord.register_worker(node1);
assert!(!coord.pending_rebalance);
let spec = PipelineGroupSpec {
name: "test".into(),
pipelines: vec![PipelinePlacement {
name: "p1".into(),
source: "stream A = X".into(),
worker_affinity: None,
replicas: 1,
partition_key: None,
}],
routes: vec![],
region_affinity: None,
cross_region_routes: vec![],
};
let group = DeployedPipelineGroup::new("g1".into(), "test".into(), spec);
coord.pipeline_groups.insert("g1".into(), group);
let node2 = WorkerNode::new(
WorkerId("w2".into()),
"http://localhost:9001".into(),
"key".into(),
);
coord.register_worker(node2);
assert!(coord.pending_rebalance);
}
#[tokio::test]
async fn test_rebalance_needs_two_workers() {
let mut coord = Coordinator::new();
let node = WorkerNode::new(
WorkerId("w1".into()),
"http://localhost:9000".into(),
"key".into(),
);
coord.register_worker(node);
let result = coord.rebalance().await;
assert!(result.is_ok());
assert!(result.unwrap().is_empty());
}
#[tokio::test]
async fn test_rebalance_no_pipelines() {
let mut coord = Coordinator::new();
for i in 0..3 {
let node = WorkerNode::new(
WorkerId(format!("w{}", i)),
format!("http://localhost:900{}", i),
"key".into(),
);
coord.register_worker(node);
}
let result = coord.rebalance().await;
assert!(result.is_ok());
assert!(result.unwrap().is_empty());
}
#[tokio::test]
async fn test_handle_worker_failure_no_pipelines() {
let mut coord = Coordinator::new();
let node = WorkerNode::new(
WorkerId("w1".into()),
"http://localhost:9000".into(),
"key".into(),
);
coord.register_worker(node);
let results = coord.handle_worker_failure(&WorkerId("w1".into())).await;
assert!(results.is_empty());
}
#[test]
fn test_heartbeat_stores_events_processed() {
let mut coord = Coordinator::new();
let node = WorkerNode::new(
WorkerId("w1".into()),
"http://localhost:9000".into(),
"key".into(),
);
coord.register_worker(node);
assert_eq!(coord.workers[&WorkerId("w1".into())].events_processed, 0);
let hb = HeartbeatRequest {
events_processed: 42000,
pipelines_running: 3,
pipeline_metrics: vec![],
};
coord.heartbeat(&WorkerId("w1".into()), &hb).unwrap();
assert_eq!(
coord.workers[&WorkerId("w1".into())].events_processed,
42000
);
}
#[test]
fn test_health_sweep_stores_last_result() {
let mut coord = Coordinator::new();
assert!(coord.last_health_sweep.is_none());
let node = WorkerNode::new(
WorkerId("w1".into()),
"http://localhost:9000".into(),
"key".into(),
);
coord.register_worker(node);
let result = coord.health_sweep();
assert_eq!(result.workers_checked, 1);
assert!(result.workers_marked_unhealthy.is_empty());
let stored = coord.last_health_sweep.as_ref().unwrap();
assert_eq!(stored.workers_checked, 1);
assert!(stored.workers_marked_unhealthy.is_empty());
}
#[test]
fn test_health_sweep_stores_unhealthy_workers() {
let mut coord = Coordinator::new();
let mut node = WorkerNode::new(
WorkerId("w1".into()),
"http://localhost:9000".into(),
"key".into(),
);
node.status = WorkerStatus::Ready;
node.last_heartbeat = std::time::Instant::now() - std::time::Duration::from_secs(20);
coord.workers.insert(node.id.clone(), node);
let result = coord.health_sweep();
assert_eq!(result.workers_marked_unhealthy.len(), 1);
let stored = coord.last_health_sweep.as_ref().unwrap();
assert_eq!(stored.workers_marked_unhealthy.len(), 1);
}
fn make_scaling_policy() -> ScalingPolicy {
ScalingPolicy {
min_workers: 1,
max_workers: 10,
scale_up_threshold: 5.0,
scale_down_threshold: 1.0,
cooldown_secs: 60,
webhook_url: None,
}
}
#[test]
fn test_evaluate_scaling_no_policy() {
let mut coord = Coordinator::new();
assert!(coord.evaluate_scaling().is_none());
}
#[test]
fn test_evaluate_scaling_stable() {
    // Two workers carrying one running pipeline each (1.0 pipelines/worker)
    // sits between the scale-down (1.0) and scale-up (5.0) thresholds, so
    // the recommendation must be Stable.
    let mut coord = Coordinator::new();
    coord.scaling_policy = Some(make_scaling_policy());

    for idx in 0..2 {
        coord.register_worker(WorkerNode::new(
            WorkerId(format!("w{}", idx)),
            format!("http://localhost:900{}", idx),
            "key".into(),
        ));
    }

    // Small factory to avoid repeating the placement literal per pipeline.
    let placement = |name: &str| PipelinePlacement {
        name: name.into(),
        source: "x".into(),
        worker_affinity: None,
        replicas: 1,
        partition_key: None,
    };
    let spec = PipelineGroupSpec {
        name: "test".into(),
        pipelines: vec![placement("p1"), placement("p2")],
        routes: vec![],
        region_affinity: None,
        cross_region_routes: vec![],
    };

    let mut group = DeployedPipelineGroup::new("g1".into(), "test".into(), spec);
    for &(pipeline, worker) in [("p1", "w0"), ("p2", "w1")].iter() {
        group.placements.insert(
            pipeline.into(),
            crate::pipeline_group::PipelineDeployment {
                worker_id: WorkerId(worker.into()),
                worker_address: String::new(),
                worker_api_key: String::new(),
                pipeline_id: String::new(),
                status: crate::pipeline_group::PipelineDeploymentStatus::Running,
                epoch: 0,
            },
        );
    }
    coord.pipeline_groups.insert("g1".into(), group);

    let rec = coord.evaluate_scaling().unwrap();
    assert_eq!(rec.action, ScalingAction::Stable);
    assert_eq!(rec.current_workers, 2);
}
#[test]
fn test_evaluate_scaling_scale_up() {
    // Six running pipelines on one worker (6.0 pipelines/worker) exceeds the
    // 5.0 scale-up threshold, so the coordinator must recommend more workers.
    let mut coord = Coordinator::new();
    coord.scaling_policy = Some(make_scaling_policy());
    coord.register_worker(WorkerNode::new(
        WorkerId("w0".into()),
        "http://localhost:9000".into(),
        "key".into(),
    ));

    let spec = PipelineGroupSpec {
        name: "test".into(),
        pipelines: vec![],
        routes: vec![],
        region_affinity: None,
        cross_region_routes: vec![],
    };
    let mut group = DeployedPipelineGroup::new("g1".into(), "test".into(), spec);
    for idx in 0..6 {
        group.placements.insert(
            format!("p{}", idx),
            crate::pipeline_group::PipelineDeployment {
                worker_id: WorkerId("w0".into()),
                worker_address: String::new(),
                worker_api_key: String::new(),
                pipeline_id: String::new(),
                status: crate::pipeline_group::PipelineDeploymentStatus::Running,
                epoch: 0,
            },
        );
    }
    coord.pipeline_groups.insert("g1".into(), group);

    let rec = coord.evaluate_scaling().unwrap();
    assert_eq!(rec.action, ScalingAction::ScaleUp);
    assert!(rec.target_workers > 1);
}
#[test]
fn test_evaluate_scaling_scale_down() {
    // Two running pipelines across five workers (0.4 pipelines/worker) is
    // below the 1.0 scale-down threshold, so the coordinator must recommend
    // fewer workers — without dropping under min_workers.
    let mut coord = Coordinator::new();
    // The previous inline ScalingPolicy literal duplicated make_scaling_policy()
    // field-for-field; use the shared helper so the tests stay in sync.
    coord.scaling_policy = Some(make_scaling_policy());

    for i in 0..5 {
        let node = WorkerNode::new(
            WorkerId(format!("w{}", i)),
            format!("http://localhost:900{}", i),
            "key".into(),
        );
        coord.register_worker(node);
    }

    let spec = PipelineGroupSpec {
        name: "test".into(),
        pipelines: vec![],
        routes: vec![],
        region_affinity: None,
        cross_region_routes: vec![],
    };
    let mut group = DeployedPipelineGroup::new("g1".into(), "test".into(), spec);
    for i in 0..2 {
        group.placements.insert(
            format!("p{}", i),
            crate::pipeline_group::PipelineDeployment {
                worker_id: WorkerId("w0".into()),
                worker_address: String::new(),
                worker_api_key: String::new(),
                pipeline_id: String::new(),
                status: crate::pipeline_group::PipelineDeploymentStatus::Running,
                epoch: 0,
            },
        );
    }
    coord.pipeline_groups.insert("g1".into(), group);

    let rec = coord.evaluate_scaling().unwrap();
    assert_eq!(rec.action, ScalingAction::ScaleDown);
    assert!(rec.target_workers < 5);
    assert!(rec.target_workers >= 1);
}
#[test]
fn test_evaluate_scaling_below_min_workers() {
    // With only one registered worker and min_workers = 3, the coordinator
    // must recommend scaling up to exactly the minimum.
    let mut coord = Coordinator::new();
    // Only min_workers differs from the shared default policy; struct-update
    // syntax keeps the remaining fields in sync with make_scaling_policy().
    coord.scaling_policy = Some(ScalingPolicy {
        min_workers: 3,
        ..make_scaling_policy()
    });

    let node = WorkerNode::new(
        WorkerId("w0".into()),
        "http://localhost:9000".into(),
        "key".into(),
    );
    coord.register_worker(node);

    let rec = coord.evaluate_scaling().unwrap();
    assert_eq!(rec.action, ScalingAction::ScaleUp);
    assert_eq!(rec.target_workers, 3);
}
#[test]
fn test_evaluate_scaling_respects_max_workers() {
    // 20 pipelines on 2 workers (10.0 pipelines/worker) demands a large
    // scale-up, but the recommendation must be capped at max_workers = 3.
    let mut coord = Coordinator::new();
    // Only the limits/thresholds differ from the shared default policy;
    // struct-update syntax keeps the rest in sync with make_scaling_policy().
    coord.scaling_policy = Some(ScalingPolicy {
        max_workers: 3,
        scale_up_threshold: 2.0,
        scale_down_threshold: 0.5,
        ..make_scaling_policy()
    });

    for i in 0..2 {
        let node = WorkerNode::new(
            WorkerId(format!("w{}", i)),
            format!("http://localhost:900{}", i),
            "key".into(),
        );
        coord.register_worker(node);
    }

    let spec = PipelineGroupSpec {
        name: "test".into(),
        pipelines: vec![],
        routes: vec![],
        region_affinity: None,
        cross_region_routes: vec![],
    };
    let mut group = DeployedPipelineGroup::new("g1".into(), "test".into(), spec);
    for i in 0..20 {
        group.placements.insert(
            format!("p{}", i),
            crate::pipeline_group::PipelineDeployment {
                worker_id: WorkerId("w0".into()),
                worker_address: String::new(),
                worker_api_key: String::new(),
                pipeline_id: String::new(),
                status: crate::pipeline_group::PipelineDeploymentStatus::Running,
                epoch: 0,
            },
        );
    }
    coord.pipeline_groups.insert("g1".into(), group);

    let rec = coord.evaluate_scaling().unwrap();
    assert_eq!(rec.action, ScalingAction::ScaleUp);
    assert!(rec.target_workers <= 3);
}
#[test]
fn test_scaling_recommendation_serde() {
    // A recommendation must survive a JSON round-trip with its key fields
    // intact (action uses snake_case via the serde rename on ScalingAction).
    let original = ScalingRecommendation {
        action: ScalingAction::ScaleUp,
        current_workers: 2,
        target_workers: 4,
        reason: "Load exceeded".into(),
        avg_pipelines_per_worker: 6.0,
        total_pipelines: 12,
        timestamp: "2026-02-12T00:00:00Z".into(),
    };

    let encoded = serde_json::to_string(&original).unwrap();
    let decoded: ScalingRecommendation = serde_json::from_str(&encoded).unwrap();

    assert_eq!(decoded.action, ScalingAction::ScaleUp);
    assert_eq!(decoded.current_workers, 2);
    assert_eq!(decoded.target_workers, 4);
}
#[test]
fn test_check_connector_health() {
    // A disconnected connector reported in worker metrics must surface in
    // the unhealthy list as a (pipeline, worker, connector) entry.
    let mut coord = Coordinator::new();
    coord.register_worker(WorkerNode::new(
        WorkerId("w1".into()),
        "http://localhost:9000".into(),
        "key".into(),
    ));

    let failing_connector = crate::worker::ConnectorHealth {
        connector_name: "mqtt_in".into(),
        connector_type: "mqtt".into(),
        connected: false,
        last_error: Some("connection refused".into()),
        messages_received: 0,
        seconds_since_last_message: 120,
    };
    coord.worker_metrics.insert(
        WorkerId("w1".into()),
        vec![PipelineMetrics {
            pipeline_name: "p1".into(),
            events_in: 100,
            events_out: 50,
            connector_health: vec![failing_connector],
        }],
    );

    let unhealthy = coord.check_connector_health();
    assert_eq!(unhealthy.len(), 1);
    assert_eq!(unhealthy[0].0, "p1");
    assert_eq!(unhealthy[0].2, "mqtt_in");
}
#[test]
fn test_check_connector_health_healthy() {
    // A connected, recently-active connector must produce no unhealthy entries.
    let mut coord = Coordinator::new();
    coord.register_worker(WorkerNode::new(
        WorkerId("w1".into()),
        "http://localhost:9000".into(),
        "key".into(),
    ));

    let healthy_connector = crate::worker::ConnectorHealth {
        connector_name: "mqtt_in".into(),
        connector_type: "mqtt".into(),
        connected: true,
        last_error: None,
        messages_received: 42,
        seconds_since_last_message: 2,
    };
    coord.worker_metrics.insert(
        WorkerId("w1".into()),
        vec![PipelineMetrics {
            pipeline_name: "p1".into(),
            events_in: 100,
            events_out: 50,
            connector_health: vec![healthy_connector],
        }],
    );

    let unhealthy = coord.check_connector_health();
    assert!(unhealthy.is_empty());
}
#[test]
fn test_heartbeat_updates_worker_metrics() {
    // A heartbeat must update the worker's counters and store its
    // per-pipeline metrics under the worker's id.
    let mut coord = Coordinator::new();
    coord.register_worker(WorkerNode::new(
        WorkerId("w1".into()),
        "http://w1:9000".into(),
        "key".into(),
    ));

    // Freshly registered workers start with zeroed counters.
    {
        let worker = coord.workers.get(&WorkerId("w1".into())).unwrap();
        assert_eq!(worker.events_processed, 0);
        assert_eq!(worker.capacity.pipelines_running, 0);
    }

    let request = crate::worker::HeartbeatRequest {
        events_processed: 500,
        pipelines_running: 2,
        pipeline_metrics: vec![crate::worker::PipelineMetrics {
            pipeline_name: "financial-cep".into(),
            events_in: 500,
            events_out: 30,
            connector_health: vec![],
        }],
    };
    coord.heartbeat(&WorkerId("w1".into()), &request).unwrap();

    let worker = coord.workers.get(&WorkerId("w1".into())).unwrap();
    assert_eq!(
        worker.events_processed, 500,
        "heartbeat should update events_processed"
    );
    assert_eq!(
        worker.capacity.pipelines_running, 2,
        "heartbeat should update pipelines_running"
    );

    let metrics = coord.worker_metrics.get(&WorkerId("w1".into())).unwrap();
    assert_eq!(metrics.len(), 1);
    assert_eq!(metrics[0].pipeline_name, "financial-cep");
    assert_eq!(metrics[0].events_in, 500);
}
#[test]
fn test_heartbeat_overwrites_previous_metrics() {
    // Heartbeat counters are absolute snapshots, not deltas: a later
    // heartbeat replaces the earlier values instead of adding to them.
    let mut coord = Coordinator::new();
    coord.register_worker(WorkerNode::new(
        WorkerId("w1".into()),
        "http://w1:9000".into(),
        "key".into(),
    ));

    let first = crate::worker::HeartbeatRequest {
        events_processed: 100,
        pipelines_running: 1,
        pipeline_metrics: vec![],
    };
    coord.heartbeat(&WorkerId("w1".into()), &first).unwrap();

    let second = crate::worker::HeartbeatRequest {
        events_processed: 500,
        pipelines_running: 2,
        pipeline_metrics: vec![],
    };
    coord.heartbeat(&WorkerId("w1".into()), &second).unwrap();

    let worker = coord.workers.get(&WorkerId("w1".into())).unwrap();
    assert_eq!(
        worker.events_processed, 500,
        "second heartbeat should overwrite, not accumulate"
    );
    assert_eq!(worker.capacity.pipelines_running, 2);
}
}