use std::collections::HashMap;
use std::hash::{Hash, Hasher};
use std::sync::atomic::AtomicBool;
use std::time::Instant;
use serde::{Deserialize, Serialize};
use tracing::warn;
use crate::worker::WorkerId;
/// Declarative specification for a group of pipelines that are deployed
/// and routed together as one unit.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PipelineGroupSpec {
    /// Human-readable group name.
    pub name: String,
    /// The pipelines to deploy, with per-pipeline placement options.
    pub pipelines: Vec<PipelinePlacement>,
    /// Event routes between pipelines inside the group (defaults to none).
    #[serde(default)]
    pub routes: Vec<InterPipelineRoute>,
    /// Preferred region for the whole group — presumably consulted by the
    /// placement/scheduling logic; not read in this file. TODO confirm.
    #[serde(default)]
    pub region_affinity: Option<String>,
    /// Routes that forward events to other regions (defaults to none).
    #[serde(default)]
    pub cross_region_routes: Vec<CrossRegionRouteSpec>,
}
/// Forwards selected event types from this group to another region.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CrossRegionRouteSpec {
    /// Identifier of the destination region.
    pub to_region: String,
    /// Event type patterns to forward; tests elsewhere in this file use
    /// trailing-`*` wildcards for route event types — presumably the same
    /// matching applies here. TODO confirm against the routing code.
    pub event_types: Vec<String>,
}
/// Placement request for a single pipeline within a group.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PipelinePlacement {
    /// Pipeline name, unique within the group (used as the placements key).
    pub name: String,
    /// Pipeline source program text (e.g. `"stream A = X"` in tests).
    pub source: String,
    /// Optional worker the pipeline should be pinned to.
    #[serde(default)]
    pub worker_affinity: Option<String>,
    /// Number of replicas to run; defaults to 1 when absent from JSON.
    #[serde(default = "default_replicas")]
    pub replicas: usize,
    /// Event field used to hash-partition events across replicas; `None`
    /// means no key-based partitioning was requested.
    #[serde(default)]
    pub partition_key: Option<String>,
}
/// Serde default for `PipelinePlacement::replicas`: a single replica.
fn default_replicas() -> usize {
    1
}
/// Route forwarding events from one pipeline in the group to another.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct InterPipelineRoute {
    /// Source pipeline name (tests use the sentinel `"_external"` for
    /// events originating outside the group — TODO confirm semantics).
    pub from_pipeline: String,
    /// Destination pipeline name.
    pub to_pipeline: String,
    /// Event type patterns carried by this route.
    pub event_types: Vec<String>,
    /// Optional explicit NATS subject override; `None` when omitted.
    #[serde(default)]
    pub nats_subject: Option<String>,
}
/// Aggregate lifecycle status of a deployed pipeline group.
/// Serialized in snake_case (matching the `Display` impl below).
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum GroupStatus {
    /// No placement has reached `Running` yet (also covers all-stopped).
    Deploying,
    /// Every placement is running.
    Running,
    /// At least one placement is running, but not all.
    PartiallyRunning,
    /// Every placement has failed.
    Failed,
    /// The group has been torn down.
    TornDown,
}
impl std::fmt::Display for GroupStatus {
    /// Renders the status in snake_case, mirroring its serde representation.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let label = match self {
            Self::Deploying => "deploying",
            Self::Running => "running",
            Self::PartiallyRunning => "partially_running",
            Self::Failed => "failed",
            Self::TornDown => "torn_down",
        };
        f.write_str(label)
    }
}
/// Lifecycle status of one pipeline deployment on a worker.
/// Serialized in snake_case.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum PipelineDeploymentStatus {
    Deploying,
    Running,
    Failed,
    Stopped,
}
/// A single pipeline instance placed on a specific worker.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PipelineDeployment {
    /// Worker hosting this pipeline.
    pub worker_id: WorkerId,
    /// Worker endpoint address (tests use `http://…` URLs).
    pub worker_address: String,
    /// API key used to talk to the worker.
    pub worker_api_key: String,
    /// Identifier of the pipeline instance on the worker.
    pub pipeline_id: String,
    /// Current deployment status.
    pub status: PipelineDeploymentStatus,
    /// Deployment epoch; defaults to 0 when absent from serialized data.
    /// Presumably incremented on redeploy — TODO confirm against caller.
    #[serde(default)]
    pub epoch: u64,
}
/// Runtime state of a deployed pipeline group: the spec it was created
/// from plus where each pipeline actually landed.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DeployedPipelineGroup {
    /// Unique group id.
    pub id: String,
    /// Group name (copied from the spec).
    pub name: String,
    /// The spec this group was deployed from.
    pub spec: PipelineGroupSpec,
    /// Per-pipeline deployments, keyed by pipeline (or replica) name.
    pub placements: HashMap<String, PipelineDeployment>,
    /// Replica fan-out state, keyed by logical pipeline name.
    pub replica_groups: HashMap<String, ReplicaGroup>,
    /// Creation time. Not serialized; deserialization resets it to "now",
    /// so ages computed from it are only valid within one process.
    #[serde(skip, default = "Instant::now")]
    pub created_at: Instant,
    /// Aggregate status; kept current via `update_status`.
    pub status: GroupStatus,
}
impl DeployedPipelineGroup {
    /// Creates a group in the `Deploying` state with no placements yet.
    pub fn new(id: String, name: String, spec: PipelineGroupSpec) -> Self {
        Self {
            id,
            name,
            spec,
            placements: HashMap::new(),
            replica_groups: HashMap::new(),
            created_at: Instant::now(),
            status: GroupStatus::Deploying,
        }
    }

    /// Recomputes the aggregate `status` from the per-placement statuses.
    ///
    /// With no placements the status is left untouched. All running maps to
    /// `Running`, all failed to `Failed`, a mix with at least one running to
    /// `PartiallyRunning`, and anything else (e.g. deploying or stopped) to
    /// `Deploying`.
    pub fn update_status(&mut self) {
        if self.placements.is_empty() {
            return;
        }
        let total = self.placements.len();
        let mut running = 0usize;
        let mut failed = 0usize;
        for deployment in self.placements.values() {
            match deployment.status {
                PipelineDeploymentStatus::Running => running += 1,
                PipelineDeploymentStatus::Failed => failed += 1,
                _ => {}
            }
        }
        self.status = if running == total {
            GroupStatus::Running
        } else if failed == total {
            GroupStatus::Failed
        } else if running > 0 {
            GroupStatus::PartiallyRunning
        } else {
            GroupStatus::Deploying
        };
    }
}
/// How events are spread across the replicas of one pipeline.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum PartitionStrategy {
    /// Cycle through replicas in order, one event at a time.
    RoundRobin,
    /// Hash the named event field so equal keys hit the same replica.
    HashKey(String),
}
/// Replica fan-out state for one logical pipeline.
///
/// `Clone` is implemented manually below because `AtomicBool` is not
/// `Clone`.
#[derive(Debug, Serialize, Deserialize)]
pub struct ReplicaGroup {
    /// Logical pipeline name; returned as-is when there are no replicas.
    pub pipeline_name: String,
    /// Names of the individual replicas (e.g. `"p1#0"` in tests).
    pub replica_names: Vec<String>,
    /// Strategy used by `select_replica`.
    pub strategy: PartitionStrategy,
    /// Round-robin position. Not serialized; deserialization resets it to
    /// 0 via `default_counter`. Shared via `Arc` so clones keep rotating
    /// the same sequence.
    #[serde(skip, default = "default_counter")]
    pub counter: std::sync::Arc<std::sync::atomic::AtomicUsize>,
    /// Set once the missing-partition-field warning has been logged, so it
    /// is emitted at most once per group instance. Not serialized.
    #[serde(skip)]
    missing_field_warned: AtomicBool,
}
/// Serde default for the skipped round-robin counter: a fresh counter at 0.
fn default_counter() -> std::sync::Arc<std::sync::atomic::AtomicUsize> {
    // `Arc::default()` wraps `AtomicUsize::default()`, which is 0.
    std::sync::Arc::default()
}
impl Clone for ReplicaGroup {
    /// Manual `Clone`: `AtomicBool` is not `Clone`, so the warn-once flag is
    /// copied by value, while the round-robin counter stays shared through
    /// the `Arc` so all clones advance the same sequence.
    fn clone(&self) -> Self {
        let warned = self
            .missing_field_warned
            .load(std::sync::atomic::Ordering::Relaxed);
        Self {
            pipeline_name: self.pipeline_name.clone(),
            replica_names: self.replica_names.clone(),
            strategy: self.strategy.clone(),
            counter: std::sync::Arc::clone(&self.counter),
            missing_field_warned: AtomicBool::new(warned),
        }
    }
}
impl ReplicaGroup {
    /// Builds a replica group with a zeroed round-robin counter and the
    /// missing-field warning not yet emitted.
    pub fn new(
        pipeline_name: String,
        replica_names: Vec<String>,
        strategy: PartitionStrategy,
    ) -> Self {
        Self {
            pipeline_name,
            replica_names,
            strategy,
            counter: std::sync::Arc::new(std::sync::atomic::AtomicUsize::new(0)),
            missing_field_warned: AtomicBool::new(false),
        }
    }

    /// Picks the replica that should receive an event with the given fields.
    ///
    /// With no replicas configured, the bare pipeline name is returned.
    /// `RoundRobin` cycles through the replicas via the shared counter;
    /// `HashKey` hashes the named event field so equal keys always land on
    /// the same replica. If the hash field is absent, a warning is logged
    /// once and all such events route to whichever replica the empty string
    /// hashes to (consistent, but unbalanced).
    pub fn select_replica(&self, fields: &serde_json::Map<String, serde_json::Value>) -> &str {
        let replica_count = self.replica_names.len();
        if replica_count == 0 {
            return &self.pipeline_name;
        }
        let idx = match &self.strategy {
            PartitionStrategy::RoundRobin => {
                let ticket = self
                    .counter
                    .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
                ticket % replica_count
            }
            PartitionStrategy::HashKey(field) => {
                // Note: `Value::to_string` is used for hashing, so string
                // values include their JSON quotes; that is fine because
                // only hash consistency matters here.
                let key = if let Some(value) = fields.get(field) {
                    value.to_string()
                } else {
                    // Warn only on the first miss; `swap` both sets the
                    // flag and reports its previous value atomically.
                    let already_warned = self
                        .missing_field_warned
                        .swap(true, std::sync::atomic::Ordering::Relaxed);
                    if !already_warned {
                        warn!(
                            "Partition field '{}' missing from event for pipeline '{}'; \
                             all such events will route to the same replica",
                            field, self.pipeline_name
                        );
                    }
                    String::new()
                };
                let mut hasher = std::collections::hash_map::DefaultHasher::new();
                key.hash(&mut hasher);
                (hasher.finish() as usize) % replica_count
            }
        };
        &self.replica_names[idx]
    }
}
/// API-facing summary of a deployed group (see the `From` impl below).
#[derive(Debug, Serialize, Deserialize)]
pub struct PipelineGroupInfo {
    pub id: String,
    pub name: String,
    /// Snake_case rendering of `GroupStatus` (via its `Display` impl).
    pub status: String,
    /// Number of pipelines in the spec (may exceed `placements.len()` while
    /// deployment is still in progress).
    pub pipeline_count: usize,
    pub placements: Vec<PipelinePlacementInfo>,
    /// Pipeline name -> source text, taken from the spec. Defaults to empty
    /// when absent from serialized data.
    #[serde(default)]
    pub sources: HashMap<String, String>,
}
/// API-facing view of a single pipeline placement.
#[derive(Debug, Serialize, Deserialize)]
pub struct PipelinePlacementInfo {
    pub pipeline_name: String,
    pub worker_id: String,
    pub worker_address: String,
    pub pipeline_id: String,
    /// Debug rendering of `PipelineDeploymentStatus` (e.g. `"Running"`);
    /// note this is CamelCase, unlike the snake_case group status.
    pub status: String,
}
impl From<&DeployedPipelineGroup> for PipelineGroupInfo {
fn from(group: &DeployedPipelineGroup) -> Self {
let placements = group
.placements
.iter()
.map(|(name, dep)| PipelinePlacementInfo {
pipeline_name: name.clone(),
worker_id: dep.worker_id.0.clone(),
worker_address: dep.worker_address.clone(),
pipeline_id: dep.pipeline_id.clone(),
status: format!("{:?}", dep.status),
})
.collect();
let sources = group
.spec
.pipelines
.iter()
.map(|p| (p.name.clone(), p.source.clone()))
.collect();
Self {
id: group.id.clone(),
name: group.name.clone(),
status: group.status.to_string(),
pipeline_count: group.spec.pipelines.len(),
placements,
sources,
}
}
}
#[cfg(test)]
mod tests {
    use super::*;

    // All-running -> Running; flipping one to Failed -> PartiallyRunning.
    #[test]
    fn test_group_status_tracking() {
        let spec = PipelineGroupSpec {
            name: "test".into(),
            pipelines: vec![
                PipelinePlacement {
                    name: "p1".into(),
                    source: "stream A = X".into(),
                    worker_affinity: None,
                    replicas: 1,
                    partition_key: None,
                },
                PipelinePlacement {
                    name: "p2".into(),
                    source: "stream B = Y".into(),
                    worker_affinity: None,
                    replicas: 1,
                    partition_key: None,
                },
            ],
            routes: vec![],
            region_affinity: None,
            cross_region_routes: vec![],
        };
        let mut group = DeployedPipelineGroup::new("g1".into(), "test".into(), spec);
        assert_eq!(group.status, GroupStatus::Deploying);
        group.placements.insert(
            "p1".into(),
            PipelineDeployment {
                worker_id: WorkerId("w1".into()),
                worker_address: "http://localhost:9000".into(),
                worker_api_key: "key".into(),
                pipeline_id: "pid1".into(),
                status: PipelineDeploymentStatus::Running,
                epoch: 0,
            },
        );
        group.placements.insert(
            "p2".into(),
            PipelineDeployment {
                worker_id: WorkerId("w2".into()),
                worker_address: "http://localhost:9001".into(),
                worker_api_key: "key".into(),
                pipeline_id: "pid2".into(),
                status: PipelineDeploymentStatus::Running,
                epoch: 0,
            },
        );
        group.update_status();
        assert_eq!(group.status, GroupStatus::Running);
        group.placements.get_mut("p2").unwrap().status = PipelineDeploymentStatus::Failed;
        group.update_status();
        assert_eq!(group.status, GroupStatus::PartiallyRunning);
    }

    // Spec deserializes from JSON; worker_affinity is optional per pipeline.
    #[test]
    fn test_pipeline_group_spec_serde() {
        let json = r#"{
"name": "test-group",
"pipelines": [
{"name": "p1", "source": "stream A = X", "worker_affinity": "worker-0"},
{"name": "p2", "source": "stream B = Y"}
],
"routes": [
{"from_pipeline": "_external", "to_pipeline": "p1", "event_types": ["Event*"]}
]
}"#;
        let spec: PipelineGroupSpec = serde_json::from_str(json).unwrap();
        assert_eq!(spec.name, "test-group");
        assert_eq!(spec.pipelines.len(), 2);
        assert_eq!(
            spec.pipelines[0].worker_affinity,
            Some("worker-0".to_string())
        );
        assert!(spec.pipelines[1].worker_affinity.is_none());
        assert_eq!(spec.routes.len(), 1);
    }

    // Every placement Failed -> group Failed.
    #[test]
    fn test_group_status_all_failed() {
        let spec = PipelineGroupSpec {
            name: "test".into(),
            pipelines: vec![
                PipelinePlacement {
                    name: "p1".into(),
                    source: "".into(),
                    worker_affinity: None,
                    replicas: 1,
                    partition_key: None,
                },
                PipelinePlacement {
                    name: "p2".into(),
                    source: "".into(),
                    worker_affinity: None,
                    replicas: 1,
                    partition_key: None,
                },
            ],
            routes: vec![],
            region_affinity: None,
            cross_region_routes: vec![],
        };
        let mut group = DeployedPipelineGroup::new("g1".into(), "test".into(), spec);
        group.placements.insert(
            "p1".into(),
            PipelineDeployment {
                worker_id: WorkerId("w1".into()),
                worker_address: "http://localhost:9000".into(),
                worker_api_key: "key".into(),
                pipeline_id: "".into(),
                status: PipelineDeploymentStatus::Failed,
                epoch: 0,
            },
        );
        group.placements.insert(
            "p2".into(),
            PipelineDeployment {
                worker_id: WorkerId("w2".into()),
                worker_address: "http://localhost:9001".into(),
                worker_api_key: "key".into(),
                pipeline_id: "".into(),
                status: PipelineDeploymentStatus::Failed,
                epoch: 0,
            },
        );
        group.update_status();
        assert_eq!(group.status, GroupStatus::Failed);
    }

    // update_status with no placements leaves the status untouched.
    #[test]
    fn test_group_status_empty_placements() {
        let spec = PipelineGroupSpec {
            name: "test".into(),
            pipelines: vec![],
            routes: vec![],
            region_affinity: None,
            cross_region_routes: vec![],
        };
        let mut group = DeployedPipelineGroup::new("g1".into(), "test".into(), spec);
        assert_eq!(group.status, GroupStatus::Deploying);
        group.update_status();
        assert_eq!(group.status, GroupStatus::Deploying);
    }

    // One Running plus one Deploying -> PartiallyRunning.
    #[test]
    fn test_group_status_mixed_deploying_and_running() {
        let spec = PipelineGroupSpec {
            name: "test".into(),
            pipelines: vec![
                PipelinePlacement {
                    name: "p1".into(),
                    source: "".into(),
                    worker_affinity: None,
                    replicas: 1,
                    partition_key: None,
                },
                PipelinePlacement {
                    name: "p2".into(),
                    source: "".into(),
                    worker_affinity: None,
                    replicas: 1,
                    partition_key: None,
                },
            ],
            routes: vec![],
            region_affinity: None,
            cross_region_routes: vec![],
        };
        let mut group = DeployedPipelineGroup::new("g1".into(), "test".into(), spec);
        group.placements.insert(
            "p1".into(),
            PipelineDeployment {
                worker_id: WorkerId("w1".into()),
                worker_address: "http://localhost:9000".into(),
                worker_api_key: "key".into(),
                pipeline_id: "pid1".into(),
                status: PipelineDeploymentStatus::Running,
                epoch: 0,
            },
        );
        group.placements.insert(
            "p2".into(),
            PipelineDeployment {
                worker_id: WorkerId("w2".into()),
                worker_address: "http://localhost:9001".into(),
                worker_api_key: "key".into(),
                pipeline_id: "pid2".into(),
                status: PipelineDeploymentStatus::Deploying,
                epoch: 0,
            },
        );
        group.update_status();
        assert_eq!(group.status, GroupStatus::PartiallyRunning);
    }

    // All Stopped hits neither the running nor failed branch -> Deploying.
    #[test]
    fn test_group_status_all_stopped() {
        let spec = PipelineGroupSpec {
            name: "test".into(),
            pipelines: vec![PipelinePlacement {
                name: "p1".into(),
                source: "".into(),
                worker_affinity: None,
                replicas: 1,
                partition_key: None,
            }],
            routes: vec![],
            region_affinity: None,
            cross_region_routes: vec![],
        };
        let mut group = DeployedPipelineGroup::new("g1".into(), "test".into(), spec);
        group.placements.insert(
            "p1".into(),
            PipelineDeployment {
                worker_id: WorkerId("w1".into()),
                worker_address: "http://localhost:9000".into(),
                worker_api_key: "key".into(),
                pipeline_id: "pid1".into(),
                status: PipelineDeploymentStatus::Stopped,
                epoch: 0,
            },
        );
        group.update_status();
        assert_eq!(group.status, GroupStatus::Deploying);
    }

    // Display output is snake_case for every variant.
    #[test]
    fn test_group_status_display() {
        assert_eq!(GroupStatus::Deploying.to_string(), "deploying");
        assert_eq!(GroupStatus::Running.to_string(), "running");
        assert_eq!(
            GroupStatus::PartiallyRunning.to_string(),
            "partially_running"
        );
        assert_eq!(GroupStatus::Failed.to_string(), "failed");
        assert_eq!(GroupStatus::TornDown.to_string(), "torn_down");
    }

    // Every GroupStatus variant round-trips through serde JSON.
    #[test]
    fn test_group_status_serde() {
        for status in [
            GroupStatus::Deploying,
            GroupStatus::Running,
            GroupStatus::PartiallyRunning,
            GroupStatus::Failed,
            GroupStatus::TornDown,
        ] {
            let json = serde_json::to_string(&status).unwrap();
            let parsed: GroupStatus = serde_json::from_str(&json).unwrap();
            assert_eq!(parsed, status);
        }
    }

    // Info conversion: placements reflect deployments, sources reflect the
    // full spec even for pipelines not yet placed.
    #[test]
    fn test_pipeline_group_info_from_deployed() {
        let spec = PipelineGroupSpec {
            name: "test-group".into(),
            pipelines: vec![
                PipelinePlacement {
                    name: "p1".into(),
                    source: "stream A = X".into(),
                    worker_affinity: None,
                    replicas: 1,
                    partition_key: None,
                },
                PipelinePlacement {
                    name: "p2".into(),
                    source: "stream B = Y".into(),
                    worker_affinity: None,
                    replicas: 1,
                    partition_key: None,
                },
            ],
            routes: vec![],
            region_affinity: None,
            cross_region_routes: vec![],
        };
        let mut group = DeployedPipelineGroup::new("g1".into(), "test-group".into(), spec);
        group.status = GroupStatus::Running;
        group.placements.insert(
            "p1".into(),
            PipelineDeployment {
                worker_id: WorkerId("w1".into()),
                worker_address: "http://localhost:9000".into(),
                worker_api_key: "key".into(),
                pipeline_id: "pid-abc".into(),
                status: PipelineDeploymentStatus::Running,
                epoch: 0,
            },
        );
        let info = PipelineGroupInfo::from(&group);
        assert_eq!(info.id, "g1");
        assert_eq!(info.name, "test-group");
        assert_eq!(info.status, "running");
        assert_eq!(info.pipeline_count, 2);
        assert_eq!(info.placements.len(), 1);
        assert_eq!(info.sources.len(), 2);
        assert_eq!(info.sources["p1"], "stream A = X");
        assert_eq!(info.sources["p2"], "stream B = Y");
        let p = &info.placements[0];
        assert_eq!(p.pipeline_name, "p1");
        assert_eq!(p.worker_id, "w1");
        assert_eq!(p.pipeline_id, "pid-abc");
    }

    // PipelineGroupInfo round-trips through serde JSON.
    #[test]
    fn test_pipeline_group_info_serde() {
        let info = PipelineGroupInfo {
            id: "g1".into(),
            name: "test".into(),
            status: "running".into(),
            pipeline_count: 2,
            placements: vec![PipelinePlacementInfo {
                pipeline_name: "p1".into(),
                worker_id: "w1".into(),
                worker_address: "http://localhost:9000".into(),
                pipeline_id: "pid1".into(),
                status: "Running".into(),
            }],
            sources: HashMap::new(),
        };
        let json = serde_json::to_string(&info).unwrap();
        let parsed: PipelineGroupInfo = serde_json::from_str(&json).unwrap();
        assert_eq!(parsed.id, "g1");
        assert_eq!(parsed.placements.len(), 1);
    }

    // Omitted "routes" falls back to the serde default (empty vec).
    #[test]
    fn test_pipeline_group_spec_no_routes() {
        let json = r#"{
"name": "simple",
"pipelines": [
{"name": "p1", "source": "stream A = X"}
]
}"#;
        let spec: PipelineGroupSpec = serde_json::from_str(json).unwrap();
        assert_eq!(spec.name, "simple");
        assert!(spec.routes.is_empty());
    }

    // Route with an explicit NATS subject round-trips through serde JSON.
    #[test]
    fn test_inter_pipeline_route_serde() {
        let route = InterPipelineRoute {
            from_pipeline: "a".into(),
            to_pipeline: "b".into(),
            event_types: vec!["TypeA*".into(), "TypeB".into()],
            nats_subject: Some("custom/topic".into()),
        };
        let json = serde_json::to_string(&route).unwrap();
        let parsed: InterPipelineRoute = serde_json::from_str(&json).unwrap();
        assert_eq!(parsed.from_pipeline, "a");
        assert_eq!(parsed.to_pipeline, "b");
        assert_eq!(parsed.event_types.len(), 2);
        assert_eq!(parsed.nats_subject, Some("custom/topic".into()));
    }

    // Omitted "nats_subject" deserializes as None.
    #[test]
    fn test_inter_pipeline_route_no_nats_subject() {
        let json = r#"{
"from_pipeline": "a",
"to_pipeline": "b",
"event_types": ["*"]
}"#;
        let parsed: InterPipelineRoute = serde_json::from_str(json).unwrap();
        assert!(parsed.nats_subject.is_none());
    }

    // A single running placement is enough for the Running status.
    #[test]
    fn test_single_pipeline_group() {
        let spec = PipelineGroupSpec {
            name: "singleton".into(),
            pipelines: vec![PipelinePlacement {
                name: "only".into(),
                source: "stream A = X".into(),
                worker_affinity: None,
                replicas: 1,
                partition_key: None,
            }],
            routes: vec![],
            region_affinity: None,
            cross_region_routes: vec![],
        };
        let mut group = DeployedPipelineGroup::new("g1".into(), "singleton".into(), spec);
        group.placements.insert(
            "only".into(),
            PipelineDeployment {
                worker_id: WorkerId("w1".into()),
                worker_address: "http://localhost:9000".into(),
                worker_api_key: "key".into(),
                pipeline_id: "pid1".into(),
                status: PipelineDeploymentStatus::Running,
                epoch: 0,
            },
        );
        group.update_status();
        assert_eq!(group.status, GroupStatus::Running);
    }

    // RoundRobin cycles through replicas in order and wraps around.
    #[test]
    fn test_replica_group_round_robin() {
        let group = ReplicaGroup::new(
            "p1".into(),
            vec!["p1#0".into(), "p1#1".into(), "p1#2".into()],
            PartitionStrategy::RoundRobin,
        );
        let fields = serde_json::Map::new();
        assert_eq!(group.select_replica(&fields), "p1#0");
        assert_eq!(group.select_replica(&fields), "p1#1");
        assert_eq!(group.select_replica(&fields), "p1#2");
        assert_eq!(group.select_replica(&fields), "p1#0");
    }

    // Equal key values always select the same replica.
    #[test]
    fn test_replica_group_hash_key_deterministic() {
        let group = ReplicaGroup::new(
            "p1".into(),
            vec!["p1#0".into(), "p1#1".into()],
            PartitionStrategy::HashKey("source".into()),
        );
        let mut fields1 = serde_json::Map::new();
        fields1.insert(
            "source".into(),
            serde_json::Value::String("server-A".into()),
        );
        let mut fields2 = serde_json::Map::new();
        fields2.insert(
            "source".into(),
            serde_json::Value::String("server-A".into()),
        );
        let r1 = group.select_replica(&fields1);
        let r2 = group.select_replica(&fields2);
        assert_eq!(r1, r2);
    }

    // Missing hash field: consistent routing plus the warned flag is set.
    #[test]
    fn test_replica_group_hash_key_missing_field_routes_consistently() {
        let group = ReplicaGroup::new(
            "p1".into(),
            vec!["p1#0".into(), "p1#1".into()],
            PartitionStrategy::HashKey("missing_field".into()),
        );
        let fields = serde_json::Map::new();
        let r1 = group.select_replica(&fields);
        let r2 = group.select_replica(&fields);
        assert_eq!(r1, r2);
        assert!(group
            .missing_field_warned
            .load(std::sync::atomic::Ordering::Relaxed));
    }

    // The warned flag flips once and then stays set on repeat misses.
    #[test]
    fn test_replica_group_hash_key_missing_field_warns_once() {
        let group = ReplicaGroup::new(
            "p1".into(),
            vec!["p1#0".into(), "p1#1".into()],
            PartitionStrategy::HashKey("nonexistent".into()),
        );
        let fields = serde_json::Map::new();
        assert!(!group
            .missing_field_warned
            .load(std::sync::atomic::Ordering::Relaxed));
        group.select_replica(&fields);
        assert!(group
            .missing_field_warned
            .load(std::sync::atomic::Ordering::Relaxed));
        group.select_replica(&fields);
        assert!(group
            .missing_field_warned
            .load(std::sync::atomic::Ordering::Relaxed));
    }

    // No replicas configured -> the bare pipeline name is returned.
    #[test]
    fn test_replica_group_empty_replicas_returns_pipeline_name() {
        let group = ReplicaGroup::new("p1".into(), vec![], PartitionStrategy::RoundRobin);
        let fields = serde_json::Map::new();
        assert_eq!(group.select_replica(&fields), "p1");
    }

    // Clone preserves the warned flag and shares the counter via the Arc.
    #[test]
    fn test_replica_group_clone() {
        let group = ReplicaGroup::new(
            "p1".into(),
            vec!["p1#0".into(), "p1#1".into()],
            PartitionStrategy::HashKey("source".into()),
        );
        group
            .counter
            .fetch_add(5, std::sync::atomic::Ordering::Relaxed);
        group
            .missing_field_warned
            .store(true, std::sync::atomic::Ordering::Relaxed);
        let cloned = group;
        assert_eq!(cloned.pipeline_name, "p1");
        assert_eq!(cloned.replica_names.len(), 2);
        assert!(cloned
            .missing_field_warned
            .load(std::sync::atomic::Ordering::Relaxed));
        assert_eq!(cloned.counter.load(std::sync::atomic::Ordering::Relaxed), 5);
    }

    // 100 distinct keys across 4 replicas should hit at least 2 of them
    // (a probabilistic sanity check on the hash distribution).
    #[test]
    fn test_replica_group_hash_different_keys_may_differ() {
        let group = ReplicaGroup::new(
            "p1".into(),
            vec!["p1#0".into(), "p1#1".into(), "p1#2".into(), "p1#3".into()],
            PartitionStrategy::HashKey("id".into()),
        );
        let mut replicas_seen = std::collections::HashSet::new();
        for i in 0..100 {
            let mut fields = serde_json::Map::new();
            fields.insert("id".into(), serde_json::Value::String(format!("key-{}", i)));
            replicas_seen.insert(group.select_replica(&fields).to_string());
        }
        assert!(
            replicas_seen.len() >= 2,
            "Expected multiple replicas hit, got: {:?}",
            replicas_seen
        );
    }

    // Omitted replicas/partition_key fall back to serde defaults (1 / None).
    #[test]
    fn test_pipeline_placement_replicas_default() {
        let json = r#"{
"name": "p1",
"source": "stream A = X"
}"#;
        let spec: PipelinePlacement = serde_json::from_str(json).unwrap();
        assert_eq!(spec.replicas, 1);
        assert!(spec.partition_key.is_none());
    }

    // Explicit replicas/partition_key are honored on deserialize.
    #[test]
    fn test_pipeline_placement_replicas_explicit() {
        let json = r#"{
"name": "p1",
"source": "stream A = X",
"replicas": 3,
"partition_key": "source_ip"
}"#;
        let spec: PipelinePlacement = serde_json::from_str(json).unwrap();
        assert_eq!(spec.replicas, 3);
        assert_eq!(spec.partition_key, Some("source_ip".into()));
    }

    // Both PartitionStrategy variants round-trip through serde JSON.
    #[test]
    fn test_partition_strategy_serde() {
        for strategy in [
            PartitionStrategy::RoundRobin,
            PartitionStrategy::HashKey("source".into()),
        ] {
            let json = serde_json::to_string(&strategy).unwrap();
            let parsed: PartitionStrategy = serde_json::from_str(&json).unwrap();
            match (&strategy, &parsed) {
                (PartitionStrategy::RoundRobin, PartitionStrategy::RoundRobin) => {}
                (PartitionStrategy::HashKey(a), PartitionStrategy::HashKey(b)) => {
                    assert_eq!(a, b);
                }
                _ => panic!("Serde round-trip mismatch"),
            }
        }
    }
}