#![allow(missing_docs)]
pub mod api;
pub mod chat;
pub mod connector_config;
pub mod coordinator;
#[cfg(feature = "federation")]
pub mod federation;
#[cfg(feature = "federation")]
pub mod federation_routing;
#[cfg(feature = "k8s")]
pub mod ha;
pub mod health;
#[cfg(feature = "k8s")]
pub mod k8s_watcher;
pub mod metrics;
pub mod migration;
pub mod model_registry;
pub mod nats_coordinator;
pub mod nats_transport;
pub mod nats_worker;
pub mod pipeline_group;
#[cfg(feature = "raft")]
pub mod raft;
pub mod rate_limit;
pub mod rbac;
pub mod routing;
pub mod worker;
pub use api::{cluster_routes, shared_coordinator, SharedCoordinator};
pub use connector_config::ClusterConnector;
pub use coordinator::{
Coordinator, HaRole, InjectEventRequest, InjectResponse, ScalingAction, ScalingPolicy,
ScalingRecommendation,
};
#[cfg(feature = "federation")]
pub use federation::{
CatalogEntry, FederationConfig, FederationCoordinator, FederationStatus, RegionConfig,
RegionState, RegionStatus, RegionSummary,
};
#[cfg(feature = "federation")]
pub use federation_routing::{CrossRegionRoute, FederationRoutingTable};
pub use health::{
DEFAULT_HEARTBEAT_INTERVAL, DEFAULT_HEARTBEAT_TIMEOUT, HEARTBEAT_INTERVAL, HEARTBEAT_TIMEOUT,
};
pub use metrics::ClusterPrometheusMetrics;
pub use migration::{MigrationReason, MigrationStatus, MigrationTask};
pub use pipeline_group::{
CrossRegionRouteSpec, DeployedPipelineGroup, GroupStatus, InterPipelineRoute,
PartitionStrategy, PipelineDeployment, PipelineDeploymentStatus, PipelineGroupInfo,
PipelineGroupSpec, PipelinePlacement, ReplicaGroup,
};
pub use rbac::{RbacConfig, Role};
pub use routing::{event_type_matches, find_target_pipeline, RoutingTable};
pub use worker::{
ConnectorHealth, HeartbeatRequest, HeartbeatResponse, PipelineMetrics, RegisterWorkerRequest,
RegisterWorkerResponse, WorkerCapacity, WorkerId, WorkerInfo, WorkerNode, WorkerStatus,
};
/// Errors returned by cluster coordination operations: worker registry
/// lookups, pipeline-group deployment, event routing, connector management,
/// migration, and leader forwarding.
#[derive(Debug, thiserror::Error)]
pub enum ClusterError {
    /// No worker is registered under the given id.
    #[error("Worker not found: {0}")]
    WorkerNotFound(String),
    /// No pipeline group is deployed under the given name.
    #[error("Pipeline group not found: {0}")]
    GroupNotFound(String),
    /// The cluster has no workers to place a pipeline on.
    #[error("No workers available for deployment")]
    NoWorkersAvailable,
    /// Deploying a pipeline to a worker failed; payload describes the cause.
    #[error("Pipeline deployment failed: {0}")]
    DeployFailed(String),
    /// An event could not be routed; payload describes the cause.
    #[error("Event routing failed: {0}")]
    RoutingFailed(String),
    /// No connector is configured under the given name.
    #[error("Connector not found: {0}")]
    ConnectorNotFound(String),
    /// A connector configuration failed validation; payload gives the reason.
    #[error("Connector validation failed: {0}")]
    ConnectorValidation(String),
    /// A pipeline migration failed; payload describes the cause.
    #[error("Migration failed: {0}")]
    MigrationFailed(String),
    /// The named worker is draining and must not receive new work.
    #[error("Worker is draining: {0}")]
    WorkerDraining(String),
    /// This node is not the leader coordinator; payload is the address the
    /// caller should forward the request to.
    #[error("Not the leader coordinator; forward to: {0}")]
    NotLeader(String),
}
/// Strategy for choosing which worker a pipeline is placed on.
///
/// Bound `Send + Sync` so one strategy instance can be shared across
/// coordinator tasks.
pub trait PlacementStrategy: Send + Sync {
    /// Pick a worker from `workers` for `pipeline`, or `None` when the
    /// candidate list is empty.
    fn place(&self, pipeline: &PipelinePlacement, workers: &[&WorkerNode]) -> Option<WorkerId>;
}
/// Placement strategy that cycles through the worker list in order.
#[derive(Debug)]
pub struct RoundRobinPlacement {
    // Monotonically increasing ticket; the chosen index is
    // ticket % workers.len(), so the counter wraps naturally.
    counter: std::sync::atomic::AtomicUsize,
}
impl RoundRobinPlacement {
    /// Create a round-robin strategy whose rotation starts at the first worker.
    pub fn new() -> Self {
        let counter = std::sync::atomic::AtomicUsize::new(0);
        Self { counter }
    }
}
impl Default for RoundRobinPlacement {
fn default() -> Self {
Self::new()
}
}
impl PlacementStrategy for RoundRobinPlacement {
    /// Rotate through `workers`, ignoring the pipeline spec entirely.
    fn place(&self, _pipeline: &PipelinePlacement, workers: &[&WorkerNode]) -> Option<WorkerId> {
        use std::sync::atomic::Ordering;
        let total = workers.len();
        if total == 0 {
            return None;
        }
        // Relaxed is sufficient: the counter is an independent ticket with no
        // ordering relationship to other memory.
        let ticket = self.counter.fetch_add(1, Ordering::Relaxed);
        workers.get(ticket % total).map(|w| w.id.clone())
    }
}
/// Placement strategy that picks the worker with the lowest
/// pipelines-per-CPU-core ratio, breaking ties by absolute pipeline count.
#[derive(Debug)]
pub struct LeastLoadedPlacement;
impl PlacementStrategy for LeastLoadedPlacement {
    /// Choose the least-loaded worker; the pipeline spec is not consulted.
    fn place(&self, _pipeline: &PipelinePlacement, workers: &[&WorkerNode]) -> Option<WorkerId> {
        use std::cmp::Ordering;
        // Load = pipelines per CPU core (cores clamped to at least 1 to avoid
        // division by zero on unreported capacity).
        let load_of = |w: &WorkerNode| {
            let cores = w.capacity.cpu_cores.max(1) as f64;
            w.capacity.pipelines_running as f64 / cores
        };
        // Manual first-minimum scan: a candidate replaces the incumbent only
        // when strictly better, so ties resolve to the earliest worker.
        let mut best: Option<&WorkerNode> = None;
        for candidate in workers.iter().copied() {
            let winner = match best {
                None => candidate,
                Some(current) => {
                    let verdict = load_of(candidate)
                        .partial_cmp(&load_of(current))
                        .unwrap_or(Ordering::Equal)
                        .then_with(|| {
                            candidate
                                .capacity
                                .pipelines_running
                                .cmp(&current.capacity.pipelines_running)
                        });
                    if verdict == Ordering::Less {
                        candidate
                    } else {
                        current
                    }
                }
            };
            best = Some(winner);
        }
        best.map(|w| w.id.clone())
    }
}
/// Register this worker with the coordinator and heartbeat forever, using a
/// freshly constructed HTTP client.
///
/// Thin wrapper over [`worker_registration_loop_with_client`]; see that
/// function for the retry and heartbeat behavior.
pub async fn worker_registration_loop(
    coordinator_url: String,
    worker_id: String,
    worker_address: String,
    api_key: String,
    tenant_manager: Option<varpulis_runtime::SharedTenantManager>,
) {
    let http_client = reqwest::Client::new();
    worker_registration_loop_with_client(
        coordinator_url,
        worker_id,
        worker_address,
        api_key,
        tenant_manager,
        http_client,
    )
    .await
}
/// Register this worker with the coordinator over REST, then heartbeat forever.
///
/// Registration is retried with exponential backoff (1s doubling up to 30s)
/// until the coordinator returns a success status. A heartbeat interval
/// advertised in the registration response overrides the local
/// `HEARTBEAT_INTERVAL` default. `client` is injected so callers and tests can
/// supply a preconfigured `reqwest::Client`.
///
/// Fix: the original contained mojibake — `®ister_url` / `®ister_body`
/// (an `&reg` → `®` HTML-entity corruption of `&register_url` /
/// `&register_body`), which does not compile. Restored the intended `&`
/// borrows.
pub async fn worker_registration_loop_with_client(
    coordinator_url: String,
    worker_id: String,
    worker_address: String,
    api_key: String,
    tenant_manager: Option<varpulis_runtime::SharedTenantManager>,
    client: reqwest::Client,
) {
    use tracing::{info, warn};
    let register_url = format!("{}/api/v1/cluster/workers/register", coordinator_url);
    let heartbeat_url = format!(
        "{}/api/v1/cluster/workers/{}/heartbeat",
        coordinator_url, worker_id
    );
    // Exponential backoff bounds for the registration retry loop.
    let mut backoff = std::time::Duration::from_secs(1);
    let max_backoff = std::time::Duration::from_secs(30);
    let mut coordinator_heartbeat_interval = None;
    loop {
        let body = RegisterWorkerRequest {
            worker_id: worker_id.clone(),
            address: worker_address.clone(),
            api_key: api_key.clone(),
            capacity: WorkerCapacity::default(),
        };
        match client
            .post(&register_url)
            .header("x-api-key", &api_key)
            .json(&body)
            .send()
            .await
        {
            Ok(resp) if resp.status().is_success() => {
                // Adopt the coordinator-advertised interval when present; a
                // malformed body is tolerated and the local default is used.
                if let Ok(reg_resp) = resp.json::<RegisterWorkerResponse>().await {
                    coordinator_heartbeat_interval = reg_resp.heartbeat_interval_secs;
                }
                info!(
                    "Registered with coordinator as '{}' at {}",
                    worker_id, coordinator_url
                );
                break;
            }
            Ok(resp) => {
                warn!(
                    "Registration failed (HTTP {}), retrying in {:?}",
                    resp.status(),
                    backoff
                );
            }
            Err(e) => {
                warn!(
                    "Cannot reach coordinator at {}: {}, retrying in {:?}",
                    coordinator_url, e, backoff
                );
            }
        }
        tokio::time::sleep(backoff).await;
        backoff = (backoff * 2).min(max_backoff);
    }
    let interval = coordinator_heartbeat_interval
        .map(std::time::Duration::from_secs)
        .unwrap_or(HEARTBEAT_INTERVAL);
    // Kept around so the heartbeat loop can re-register after a coordinator
    // restart (heartbeat returning 404).
    let register_body = RegisterWorkerRequest {
        worker_id: worker_id.clone(),
        address: worker_address.clone(),
        api_key: api_key.clone(),
        capacity: WorkerCapacity::default(),
    };
    rest_heartbeat_loop(
        &client,
        &heartbeat_url,
        &register_url,
        &register_body,
        interval,
        &tenant_manager,
        &api_key,
    )
    .await;
}
async fn collect_worker_metrics(
tenant_manager: &Option<varpulis_runtime::SharedTenantManager>,
) -> (usize, Vec<PipelineMetrics>) {
if let Some(ref tm) = tenant_manager {
let mgr = tm.read().await;
let tenant_count = mgr.tenant_count();
let metrics = mgr.collect_pipeline_metrics().await;
let count = metrics.len();
if count > 0 || tenant_count > 0 {
tracing::debug!(
tenant_count,
pipeline_count = count,
total_events = metrics.iter().map(|(_, e, _)| *e).sum::<u64>(),
"Heartbeat: collected worker metrics"
);
}
let health_data = mgr.collect_connector_health();
let pm: Vec<PipelineMetrics> = metrics
.into_iter()
.map(|(name, events_in, events_out)| {
let connector_health: Vec<ConnectorHealth> = health_data
.iter()
.filter(|(pname, _, _, _)| pname == &name)
.map(|(_, cname, ctype, report)| ConnectorHealth {
connector_name: cname.clone(),
connector_type: ctype.clone(),
connected: report.connected,
last_error: report.last_error.clone(),
messages_received: report.messages_received,
seconds_since_last_message: report.seconds_since_last_message,
})
.collect();
PipelineMetrics {
pipeline_name: name,
events_in,
events_out,
connector_health,
}
})
.collect();
(count, pm)
} else {
(0, Vec::new())
}
}
/// Assemble a heartbeat request from collected metrics.
///
/// `events_processed` is the sum of `events_in` over every pipeline.
fn build_heartbeat(
    pipelines_running: usize,
    pipeline_metrics: Vec<PipelineMetrics>,
) -> HeartbeatRequest {
    let mut events_processed: u64 = 0;
    for metric in &pipeline_metrics {
        events_processed += metric.events_in;
    }
    HeartbeatRequest {
        events_processed,
        pipelines_running,
        pipeline_metrics,
    }
}
/// POST heartbeats to the coordinator forever at `interval`.
///
/// Each tick collects current pipeline metrics and sends them to
/// `heartbeat_url`. A 404 response triggers re-registration at `register_url`
/// (the coordinator no longer knows this worker, e.g. after a restart); other
/// HTTP rejections and transport errors are only logged. The loop never
/// returns — callers run it as the worker's background task.
async fn rest_heartbeat_loop(
    client: &reqwest::Client,
    heartbeat_url: &str,
    register_url: &str,
    register_body: &RegisterWorkerRequest,
    interval: std::time::Duration,
    tenant_manager: &Option<varpulis_runtime::SharedTenantManager>,
    api_key: &str,
) {
    use tracing::{debug, error, info, warn};
    loop {
        tokio::time::sleep(interval).await;
        let (pipelines_running, pipeline_metrics) = collect_worker_metrics(tenant_manager).await;
        let hb = build_heartbeat(pipelines_running, pipeline_metrics);
        debug!(
            pipelines_running = hb.pipelines_running,
            events_processed = hb.events_processed,
            pipeline_metrics_count = hb.pipeline_metrics.len(),
            "Sending heartbeat"
        );
        match client
            .post(heartbeat_url)
            .header("x-api-key", api_key)
            .json(&hb)
            .send()
            .await
        {
            // Accepted: nothing to do until the next tick.
            Ok(resp) if resp.status().is_success() => {
            }
            // Coordinator forgot this worker: re-register with the saved body.
            Ok(resp) if resp.status() == reqwest::StatusCode::NOT_FOUND => {
                warn!("Heartbeat returned 404, re-registering with coordinator");
                match client
                    .post(register_url)
                    .header("x-api-key", api_key)
                    .json(register_body)
                    .send()
                    .await
                {
                    Ok(r) if r.status().is_success() => {
                        info!("Re-registered with coordinator successfully");
                    }
                    Ok(r) => {
                        warn!("Re-registration failed (HTTP {})", r.status());
                    }
                    Err(e) => {
                        error!("Re-registration failed: {}", e);
                    }
                }
            }
            Ok(resp) => {
                warn!("Heartbeat rejected (HTTP {})", resp.status());
            }
            Err(e) => {
                error!("Heartbeat failed: {}", e);
            }
        }
    }
}
/// NATS-based worker registration and heartbeat loop (feature
/// `nats-transport`).
///
/// Connects to NATS at `nats_url` (returning early if the connection fails),
/// registers via request/reply with exponential backoff (1s doubling up to
/// 30s), spawns the worker command handler when a tenant manager is present,
/// then publishes heartbeats forever on this worker's heartbeat subject.
#[cfg(feature = "nats-transport")]
pub async fn worker_nats_registration_loop(
    nats_url: &str,
    worker_id: &str,
    api_key: &str,
    tenant_manager: Option<varpulis_runtime::SharedTenantManager>,
) {
    use tracing::{info, warn};
    let client = match nats_transport::connect_nats(nats_url).await {
        Ok(c) => c,
        Err(e) => {
            tracing::error!("Failed to connect to NATS at {}: {}", nats_url, e);
            return;
        }
    };
    info!("Connected to NATS at {}", nats_url);
    let mut backoff = std::time::Duration::from_secs(1);
    let max_backoff = std::time::Duration::from_secs(30);
    let timeout = std::time::Duration::from_secs(10);
    // Assigned exactly once, when registration succeeds and the loop breaks.
    let coordinator_heartbeat_interval: Option<u64>;
    loop {
        let body = RegisterWorkerRequest {
            worker_id: worker_id.to_string(),
            // No HTTP address over NATS transport; registered empty.
            address: String::new(), api_key: api_key.to_string(),
            capacity: WorkerCapacity::default(),
        };
        match nats_transport::nats_request::<_, RegisterWorkerResponse>(
            &client,
            &nats_transport::subject_register(),
            &body,
            timeout,
        )
        .await
        {
            Ok(resp) => {
                coordinator_heartbeat_interval = resp.heartbeat_interval_secs;
                info!("Registered with coordinator via NATS as '{}'", worker_id);
                break;
            }
            Err(e) => {
                warn!("NATS registration failed: {}, retrying in {:?}", e, backoff);
            }
        }
        tokio::time::sleep(backoff).await;
        backoff = (backoff * 2).min(max_backoff);
    }
    // Handle coordinator commands in a background task (only possible when a
    // tenant manager is available to act on them).
    if let Some(ref tm) = tenant_manager {
        let cmd_client = client.clone();
        let wid = worker_id.to_string();
        let key = api_key.to_string();
        let tm_clone = tm.clone();
        tokio::spawn(async move {
            nats_worker::run_worker_nats_handler(cmd_client, &wid, &key, tm_clone).await;
        });
    }
    // Coordinator-advertised interval wins; otherwise the local default.
    let interval = coordinator_heartbeat_interval
        .map(std::time::Duration::from_secs)
        .unwrap_or(HEARTBEAT_INTERVAL);
    let heartbeat_subject = nats_transport::subject_heartbeat(worker_id);
    let mut ticker = tokio::time::interval(interval);
    loop {
        ticker.tick().await;
        let (pipelines_running, pipeline_metrics) = collect_worker_metrics(&tenant_manager).await;
        let hb = build_heartbeat(pipelines_running, pipeline_metrics);
        // Fire-and-forget publish: a failed heartbeat is logged, not fatal.
        if let Err(e) = nats_transport::nats_publish(&client, &heartbeat_subject, &hb).await {
            warn!("Failed to publish heartbeat: {}", e);
        }
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Build a worker node with the shared test API key "key".
    fn make_worker(id: &str, address: &str) -> WorkerNode {
        WorkerNode::new(WorkerId(id.into()), address.into(), "key".into())
    }

    /// Build a minimal single-replica placement spec named "p1".
    fn make_placement() -> PipelinePlacement {
        PipelinePlacement {
            name: "p1".into(),
            source: "".into(),
            worker_affinity: None,
            replicas: 1,
            partition_key: None,
        }
    }

    #[test]
    fn test_round_robin_placement() {
        let placement = RoundRobinPlacement::new();
        let w1 = make_worker("w1", "http://localhost:9000");
        let w2 = make_worker("w2", "http://localhost:9001");
        let workers = vec![&w1, &w2];
        let pipeline = make_placement();
        let first = placement.place(&pipeline, &workers).unwrap();
        let second = placement.place(&pipeline, &workers).unwrap();
        let third = placement.place(&pipeline, &workers).unwrap();
        assert_eq!(first, WorkerId("w1".into()));
        assert_eq!(second, WorkerId("w2".into()));
        assert_eq!(third, WorkerId("w1".into()));
    }

    #[test]
    fn test_least_loaded_placement() {
        let placement = LeastLoadedPlacement;
        let mut w1 = make_worker("w1", "http://localhost:9000");
        w1.capacity.pipelines_running = 3;
        let mut w2 = make_worker("w2", "http://localhost:9001");
        w2.capacity.pipelines_running = 1;
        let workers = vec![&w1, &w2];
        let selected = placement.place(&make_placement(), &workers).unwrap();
        assert_eq!(selected, WorkerId("w2".into()));
    }

    #[test]
    fn test_placement_empty_workers() {
        let placement = RoundRobinPlacement::new();
        let workers: Vec<&WorkerNode> = vec![];
        assert!(placement.place(&make_placement(), &workers).is_none());
    }

    #[test]
    fn test_round_robin_single_worker() {
        let placement = RoundRobinPlacement::new();
        let w1 = make_worker("w1", "http://localhost:9000");
        let workers = vec![&w1];
        let pipeline = make_placement();
        for _ in 0..10 {
            assert_eq!(
                placement.place(&pipeline, &workers),
                Some(WorkerId("w1".into()))
            );
        }
    }

    #[test]
    fn test_round_robin_wraps_around() {
        let placement = RoundRobinPlacement::new();
        let w1 = make_worker("w1", "http://localhost:9000");
        let w2 = make_worker("w2", "http://localhost:9001");
        let w3 = make_worker("w3", "http://localhost:9002");
        let workers = vec![&w1, &w2, &w3];
        let pipeline = make_placement();
        // Three full cycles; every slot must follow the w1, w2, w3 rotation.
        // (The original collected 9 results but only asserted the first 7.)
        for i in 0..9 {
            let expected = ["w1", "w2", "w3"][i % 3];
            assert_eq!(
                placement.place(&pipeline, &workers).unwrap(),
                WorkerId(expected.into())
            );
        }
    }

    #[test]
    fn test_least_loaded_empty_workers() {
        let placement = LeastLoadedPlacement;
        let workers: Vec<&WorkerNode> = vec![];
        assert!(placement.place(&make_placement(), &workers).is_none());
    }

    #[test]
    fn test_least_loaded_tied_workers() {
        let placement = LeastLoadedPlacement;
        let mut w1 = make_worker("w1", "http://localhost:9000");
        w1.capacity.pipelines_running = 2;
        let mut w2 = make_worker("w2", "http://localhost:9001");
        w2.capacity.pipelines_running = 2;
        let workers = vec![&w1, &w2];
        // On an exact tie either worker is acceptable.
        let result = placement.place(&make_placement(), &workers).unwrap();
        assert!(result == WorkerId("w1".into()) || result == WorkerId("w2".into()));
    }

    #[test]
    fn test_least_loaded_picks_zero_load() {
        let placement = LeastLoadedPlacement;
        let mut w1 = make_worker("w1", "http://localhost:9000");
        w1.capacity.pipelines_running = 5;
        let w2 = make_worker("w2", "http://localhost:9001");
        let workers = vec![&w1, &w2];
        assert_eq!(
            placement.place(&make_placement(), &workers),
            Some(WorkerId("w2".into()))
        );
    }

    #[test]
    fn test_cluster_error_display() {
        let e = ClusterError::WorkerNotFound("w42".into());
        assert_eq!(e.to_string(), "Worker not found: w42");
        let e = ClusterError::GroupNotFound("g99".into());
        assert_eq!(e.to_string(), "Pipeline group not found: g99");
        let e = ClusterError::NoWorkersAvailable;
        assert_eq!(e.to_string(), "No workers available for deployment");
        let e = ClusterError::DeployFailed("connection refused".into());
        assert_eq!(
            e.to_string(),
            "Pipeline deployment failed: connection refused"
        );
        let e = ClusterError::RoutingFailed("no target".into());
        assert_eq!(e.to_string(), "Event routing failed: no target");
    }

    #[test]
    fn test_round_robin_default() {
        let placement = RoundRobinPlacement::default();
        let w = make_worker("w1", "http://localhost:9000");
        let workers = vec![&w];
        assert_eq!(
            placement.place(&make_placement(), &workers),
            Some(WorkerId("w1".into()))
        );
    }

    #[test]
    fn test_least_loaded_prefers_more_cores() {
        let placement = LeastLoadedPlacement;
        let mut w1 = make_worker("w1", "http://localhost:9000");
        w1.capacity.pipelines_running = 2;
        w1.capacity.cpu_cores = 8;
        let mut w2 = make_worker("w2", "http://localhost:9001");
        w2.capacity.pipelines_running = 2;
        w2.capacity.cpu_cores = 2;
        let workers = vec![&w1, &w2];
        // 2/8 < 2/2, so the 8-core worker wins despite equal pipeline counts.
        assert_eq!(
            placement.place(&make_placement(), &workers),
            Some(WorkerId("w1".into()))
        );
    }

    #[test]
    fn test_least_loaded_same_ratio_picks_fewer_pipelines() {
        let placement = LeastLoadedPlacement;
        let mut w1 = make_worker("w1", "http://localhost:9000");
        w1.capacity.pipelines_running = 4;
        w1.capacity.cpu_cores = 4;
        let mut w2 = make_worker("w2", "http://localhost:9001");
        w2.capacity.pipelines_running = 2;
        w2.capacity.cpu_cores = 2;
        let workers = vec![&w1, &w2];
        // Both ratios are 1.0; the absolute pipeline count breaks the tie.
        assert_eq!(
            placement.place(&make_placement(), &workers),
            Some(WorkerId("w2".into()))
        );
    }

    #[tokio::test]
    async fn test_collect_worker_metrics_with_shared_tenant_manager() {
        let tm = varpulis_runtime::shared_tenant_manager();
        // Create a tenant and deploy one pipeline on it.
        {
            let mut mgr = tm.write().await;
            let id = mgr
                .create_tenant(
                    "default".into(),
                    "test-key".into(),
                    varpulis_runtime::TenantQuota::enterprise(),
                )
                .unwrap();
            mgr.deploy_pipeline_on_tenant(
                &id,
                "test-pipeline".into(),
                "stream A = SensorReading .where(temperature > 100)".into(),
            )
            .await
            .unwrap();
        }
        let tenant_manager = Some(tm.clone());
        let (count, metrics) = collect_worker_metrics(&tenant_manager).await;
        assert_eq!(count, 1, "Should find 1 pipeline");
        assert_eq!(metrics[0].pipeline_name, "test-pipeline");
        assert_eq!(metrics[0].events_in, 0);
        // Push 10 matching events through the pipeline.
        {
            let mut mgr = tm.write().await;
            let tenant_id = mgr.get_tenant_by_api_key("test-key").unwrap().clone();
            let pid = {
                let tenant = mgr.get_tenant(&tenant_id).unwrap();
                tenant.pipelines.keys().next().unwrap().clone()
            };
            for _ in 0..10 {
                let event =
                    varpulis_runtime::Event::new("SensorReading").with_field("temperature", 150.0);
                mgr.process_event_with_backpressure(&tenant_id, &pid, event)
                    .await
                    .unwrap();
            }
        }
        let (count, metrics) = collect_worker_metrics(&tenant_manager).await;
        assert_eq!(count, 1);
        assert_eq!(
            metrics[0].events_in, 10,
            "Heartbeat should report 10 events processed"
        );
        let hb = build_heartbeat(count, metrics);
        assert_eq!(hb.pipelines_running, 1);
        assert_eq!(hb.events_processed, 10);
        assert_eq!(hb.pipeline_metrics.len(), 1);
    }

    #[tokio::test]
    async fn test_collect_worker_metrics_none_tenant_manager() {
        let (count, metrics) = collect_worker_metrics(&None).await;
        assert_eq!(count, 0);
        assert!(metrics.is_empty());
    }

    #[tokio::test]
    async fn test_heartbeat_reflects_pipeline_deployment() {
        let tm = varpulis_runtime::shared_tenant_manager();
        {
            let mut mgr = tm.write().await;
            mgr.create_tenant(
                "default".into(),
                "key".into(),
                varpulis_runtime::TenantQuota::enterprise(),
            )
            .unwrap();
        }
        let tenant_manager = Some(tm.clone());
        let (count, _) = collect_worker_metrics(&tenant_manager).await;
        assert_eq!(count, 0, "No pipelines before deployment");
        {
            let mut mgr = tm.write().await;
            let tid = mgr.get_tenant_by_api_key("key").unwrap().clone();
            mgr.deploy_pipeline_on_tenant(
                &tid,
                "my-pipeline".into(),
                "stream A = SensorReading .where(x > 1)".into(),
            )
            .await
            .unwrap();
        }
        let (count, metrics) = collect_worker_metrics(&tenant_manager).await;
        assert_eq!(count, 1, "Should see pipeline after deployment");
        assert_eq!(metrics[0].pipeline_name, "my-pipeline");
    }
}