use serde::{Deserialize, Serialize};
/// Describes a single task exposed by a packaged workflow.
///
/// All fields are required when deserializing (none carry a serde default).
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TaskMetadataEntry {
    /// Numeric index of the task (presumably its position in the package's
    /// task list — confirm against the producer).
    pub index: u32,
    /// Short task identifier, e.g. `"extract_data"`.
    pub id: String,
    /// Template for the fully namespaced task id; may contain placeholders such
    /// as `{tenant}` and `{pkg}` (e.g. `"{tenant}::{pkg}::pipeline::extract_data"`).
    pub namespaced_id_template: String,
    /// Tasks this task depends on (assumed to be task ids — verify with callers).
    pub dependencies: Vec<String>,
    /// Human-readable description of the task.
    pub description: String,
    /// Where the task is defined, e.g. `"src/lib.rs"`.
    pub source_location: String,
}
/// Package-level metadata for a workflow together with its task list.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PackageTasksMetadata {
    /// Name of the workflow provided by the package.
    pub workflow_name: String,
    /// Name of the package.
    pub package_name: String,
    /// Optional free-form package description.
    pub package_description: Option<String>,
    /// Optional package author.
    pub package_author: Option<String>,
    /// Optional workflow fingerprint (e.g. `"sha256:abc123"`).
    pub workflow_fingerprint: Option<String>,
    /// Optional graph data, carried as a JSON-encoded string.
    pub graph_data_json: Option<String>,
    /// Metadata for each task in the workflow.
    pub tasks: Vec<TaskMetadataEntry>,
    /// Trigger identifiers attached to the package (presumably trigger names —
    /// confirm). Deserializes to an empty list when the field is absent.
    #[serde(default)]
    pub triggers: Vec<String>,
}
/// Request to execute a single named task.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TaskExecutionRequest {
    /// Name of the task to run.
    pub task_name: String,
    /// Execution context, encoded as a JSON string.
    pub context_json: String,
}
/// Outcome of executing a single task.
///
/// Nothing here enforces consistency between the fields; callers typically set
/// `context_json` on success and `error` on failure.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TaskExecutionResult {
    /// Whether the task completed successfully.
    pub success: bool,
    /// Resulting context as a JSON string, if any.
    pub context_json: Option<String>,
    /// Error message, if any.
    pub error: Option<String>,
}
/// Package-level metadata describing a computation graph.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct GraphPackageMetadata {
    /// Name of the computation graph.
    pub graph_name: String,
    /// Name of the package that provides the graph.
    pub package_name: String,
    /// Reaction mode as a string (e.g. `"when_any"`).
    pub reaction_mode: String,
    /// Input strategy as a string; deserializes to `"latest"` when absent.
    #[serde(default = "default_input_strategy")]
    pub input_strategy: String,
    /// Accumulators declared by the graph.
    pub accumulators: Vec<AccumulatorDeclarationEntry>,
    /// Optional trigger reactor name; deserializes to `None` when absent.
    #[serde(default)]
    pub trigger_reactor: Option<String>,
}
/// Serde default for `GraphPackageMetadata::input_strategy`: the string `"latest"`.
fn default_input_strategy() -> String {
    String::from("latest")
}
/// A single accumulator declared by a computation graph or reactor.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AccumulatorDeclarationEntry {
    /// Accumulator name.
    pub name: String,
    /// Accumulator kind as a string (e.g. `"stream"`, `"passthrough"`).
    pub accumulator_type: String,
    /// String key/value configuration; deserializes to an empty map when absent.
    #[serde(default)]
    pub config: std::collections::HashMap<String, String>,
}
/// Request to execute a computation graph.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct GraphExecutionRequest {
    /// Named string inputs for the graph (values appear to be JSON-encoded —
    /// e.g. `"alpha" -> "{\"value\": 42.0}"`; confirm with the producer).
    pub cache: std::collections::HashMap<String, String>,
}
/// Package-level metadata describing a reactor.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ReactorPackageMetadata {
    /// Reactor name.
    pub name: String,
    /// Name of the package that provides the reactor.
    pub package_name: String,
    /// Reaction mode as a string (e.g. `"when_any"`).
    pub reaction_mode: String,
    /// Accumulators declared by the reactor.
    pub accumulators: Vec<AccumulatorDeclarationEntry>,
}
/// Metadata for a graph that is invoked directly rather than by a trigger
/// (inferred from the type name — confirm).
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TriggerlessGraphMetadataEntry {
    /// Graph name.
    pub name: String,
    /// Name of the package that provides the graph.
    pub package_name: String,
    /// Names of the graph's terminal nodes.
    pub terminal_node_names: Vec<String>,
}
/// Request to invoke a triggerless graph by name.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TriggerlessGraphInvokeRequest {
    /// Name of the graph to invoke.
    pub graph_name: String,
    /// Invocation context, encoded as a JSON string.
    pub context_json: String,
}
/// Outcome of invoking a triggerless graph.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TriggerlessGraphInvokeResult {
    /// Whether the invocation succeeded.
    pub success: bool,
    /// Terminal outputs as a single JSON string, if any.
    pub terminal_outputs_json: Option<String>,
    /// Error message, if any.
    pub error: Option<String>,
}
/// Request to invoke (poll) a trigger by name.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TriggerInvokeRequest {
    /// Name of the trigger to invoke.
    pub trigger_name: String,
}
/// Outcome of invoking a trigger.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TriggerInvokeResult {
    /// Whether the trigger decided to fire.
    pub fire: bool,
    /// Context to pass on when firing, as a JSON string, if any.
    pub context_json: Option<String>,
    /// Error message, if any.
    pub error: Option<String>,
}
/// Package-level metadata describing a trigger.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TriggerPackageMetadata {
    /// Trigger name.
    pub name: String,
    /// Name of the package that provides the trigger.
    pub package_name: String,
    /// Poll interval as a string (e.g. `"5s"`); the format is not validated here.
    pub poll_interval: String,
    /// Optional cron expression; deserializes to `None` when absent.
    #[serde(default)]
    pub cron_expression: Option<String>,
    /// Whether concurrent runs are allowed; deserializes to `false` when absent.
    #[serde(default)]
    pub allow_concurrent: bool,
}
/// Outcome of executing a computation graph.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct GraphExecutionResult {
    /// Whether execution succeeded.
    pub success: bool,
    /// Terminal outputs, one JSON string per output, if any.
    pub terminal_outputs_json: Option<Vec<String>>,
    /// Error message, if any.
    pub error: Option<String>,
}
/// Top-level package metadata as declared by the package author.
///
/// Only `language` is required; every other field is optional or defaulted.
/// Unknown keys are rejected during deserialization (`deny_unknown_fields`),
/// so older fields such as `package_type` or `[[triggers]]` entries produce an
/// error that names the offending key.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(deny_unknown_fields)]
pub struct CloacinaMetadata {
    /// Workflow name, if the package provides a workflow.
    #[serde(default)]
    pub workflow_name: Option<String>,
    /// Computation graph name, if the package provides a graph.
    #[serde(default)]
    pub graph_name: Option<String>,
    /// Implementation language (e.g. `"rust"`, `"python"`). Required.
    pub language: String,
    /// Optional package description.
    #[serde(default)]
    pub description: Option<String>,
    /// Optional package author.
    #[serde(default)]
    pub author: Option<String>,
    /// Python version requirement (e.g. `">=3.11"`), if any.
    #[serde(default)]
    pub requires_python: Option<String>,
    /// Entry module (e.g. `"workflow.tasks"`), if any.
    #[serde(default)]
    pub entry_module: Option<String>,
    /// Graph reaction mode (e.g. `"when_any"`), if any.
    #[serde(default)]
    pub reaction_mode: Option<String>,
    /// Graph input strategy (e.g. `"latest"`), if any. No default is applied here.
    #[serde(default)]
    pub input_strategy: Option<String>,
    /// Declared accumulators; empty when absent.
    #[serde(default)]
    pub accumulators: Vec<AccumulatorConfig>,
}
/// Accumulator declaration as written in package metadata.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AccumulatorConfig {
    /// Accumulator name. Required.
    pub name: String,
    /// Accumulator kind; deserializes to `"passthrough"` when absent.
    #[serde(default = "default_accumulator_type")]
    pub accumulator_type: String,
    /// String key/value configuration; empty when absent.
    #[serde(default)]
    pub config: std::collections::HashMap<String, String>,
}
/// Serde default for `AccumulatorConfig::accumulator_type`: the string `"passthrough"`.
fn default_accumulator_type() -> String {
    String::from("passthrough")
}
impl CloacinaMetadata {
    /// Returns `true` if this metadata should be treated as describing a workflow.
    ///
    /// This holds whenever `workflow_name` is set. It also holds when
    /// `graph_name` is absent, so metadata that names neither is still
    /// classified as a workflow. NOTE(review): this is presumably a fallback for
    /// packages that predate graphs; confirm intent. Metadata that names only a
    /// graph returns `false`.
    pub fn has_workflow(&self) -> bool {
        if self.workflow_name.is_some() {
            return true;
        }
        // No explicit workflow name: it is a workflow unless a graph is declared.
        !self.has_computation_graph()
    }

    /// Returns `true` when a `graph_name` is present.
    pub fn has_computation_graph(&self) -> bool {
        self.graph_name.is_some()
    }

    /// Borrows the declared workflow name, if any.
    ///
    /// This can be `None` even when [`Self::has_workflow`] returns `true`,
    /// namely when neither a workflow nor a graph name was given.
    pub fn effective_workflow_name(&self) -> Option<&str> {
        self.workflow_name.as_deref()
    }
}
#[cfg(test)]
mod tests {
    //! Unit tests for the metadata / FFI data types: JSON and TOML round trips,
    //! serde defaults, legacy-field rejection and workflow/graph classification.
    use super::*;

    #[test]
    fn test_task_metadata_serde_round_trip() {
        let entry = TaskMetadataEntry {
            index: 0,
            id: "extract_data".to_string(),
            namespaced_id_template: "{tenant}::{pkg}::pipeline::extract_data".to_string(),
            dependencies: vec![],
            description: "Extract data from sources".to_string(),
            source_location: "src/lib.rs".to_string(),
        };
        let json = serde_json::to_string(&entry).unwrap();
        let roundtrip: TaskMetadataEntry = serde_json::from_str(&json).unwrap();
        assert_eq!(roundtrip.id, "extract_data");
        assert_eq!(roundtrip.index, 0);
        assert_eq!(
            roundtrip.namespaced_id_template,
            "{tenant}::{pkg}::pipeline::extract_data"
        );
        assert_eq!(roundtrip.source_location, "src/lib.rs");
    }

    #[test]
    fn test_package_tasks_metadata_serde_round_trip() {
        let metadata = PackageTasksMetadata {
            workflow_name: "analytics_pipeline".to_string(),
            package_name: "analytics-pkg".to_string(),
            package_description: Some("Analytics workflow".to_string()),
            package_author: Some("Team".to_string()),
            workflow_fingerprint: Some("sha256:abc123".to_string()),
            graph_data_json: None,
            tasks: vec![TaskMetadataEntry {
                index: 0,
                id: "step_one".to_string(),
                namespaced_id_template: "{tenant}::{pkg}::analytics::step_one".to_string(),
                dependencies: vec![],
                description: "First step".to_string(),
                source_location: "src/lib.rs".to_string(),
            }],
            triggers: Vec::new(),
        };
        let json = serde_json::to_string(&metadata).unwrap();
        let roundtrip: PackageTasksMetadata = serde_json::from_str(&json).unwrap();
        assert_eq!(roundtrip.workflow_name, "analytics_pipeline");
        assert_eq!(roundtrip.tasks.len(), 1);
    }

    #[test]
    fn test_package_tasks_metadata_missing_triggers_defaults_to_empty() {
        // `triggers` carries #[serde(default)], so older payloads without it still parse.
        let json = r#"{"workflow_name":"w","package_name":"p","package_description":null,"package_author":null,"workflow_fingerprint":null,"graph_data_json":null,"tasks":[]}"#;
        let metadata: PackageTasksMetadata = serde_json::from_str(json).unwrap();
        assert!(metadata.triggers.is_empty());
        assert!(metadata.tasks.is_empty());
    }

    #[test]
    fn test_task_execution_request_round_trip() {
        let request = TaskExecutionRequest {
            task_name: "extract_data".to_string(),
            context_json: r#"{"key": "value"}"#.to_string(),
        };
        let json = serde_json::to_string(&request).unwrap();
        let roundtrip: TaskExecutionRequest = serde_json::from_str(&json).unwrap();
        assert_eq!(roundtrip.task_name, "extract_data");
    }

    #[test]
    fn test_task_execution_result_success() {
        let result = TaskExecutionResult {
            success: true,
            context_json: Some(r#"{"updated": true}"#.to_string()),
            error: None,
        };
        let json = serde_json::to_string(&result).unwrap();
        let roundtrip: TaskExecutionResult = serde_json::from_str(&json).unwrap();
        assert!(roundtrip.success);
        assert!(roundtrip.context_json.is_some());
        assert!(roundtrip.error.is_none());
    }

    #[test]
    fn test_task_execution_result_failure() {
        let result = TaskExecutionResult {
            success: false,
            context_json: None,
            error: Some("Task panicked".to_string()),
        };
        let json = serde_json::to_string(&result).unwrap();
        let roundtrip: TaskExecutionResult = serde_json::from_str(&json).unwrap();
        assert!(!roundtrip.success);
        assert!(roundtrip.error.is_some());
    }

    #[test]
    fn test_cloacina_metadata_rust_from_toml() {
        let toml_str = r#"
workflow_name = "analytics_pipeline"
language = "rust"
description = "Data analytics workflow"
author = "Analytics Team"
"#;
        let metadata: CloacinaMetadata = toml::from_str(toml_str).unwrap();
        assert_eq!(
            metadata.workflow_name.as_deref(),
            Some("analytics_pipeline")
        );
        assert_eq!(metadata.language, "rust");
        assert_eq!(
            metadata.description.as_deref(),
            Some("Data analytics workflow")
        );
        assert!(metadata.requires_python.is_none());
        assert!(metadata.entry_module.is_none());
    }

    #[test]
    fn test_cloacina_metadata_python_from_toml() {
        let toml_str = r#"
workflow_name = "etl_pipeline"
language = "python"
description = "Python ETL workflow"
requires_python = ">=3.11"
entry_module = "workflow.tasks"
"#;
        let metadata: CloacinaMetadata = toml::from_str(toml_str).unwrap();
        assert_eq!(metadata.workflow_name.as_deref(), Some("etl_pipeline"));
        assert_eq!(metadata.language, "python");
        assert_eq!(metadata.requires_python.as_deref(), Some(">=3.11"));
        assert_eq!(metadata.entry_module.as_deref(), Some("workflow.tasks"));
    }

    #[test]
    fn test_cloacina_metadata_minimal_rust() {
        let toml_str = r#"
workflow_name = "simple_workflow"
language = "rust"
"#;
        let metadata: CloacinaMetadata = toml::from_str(toml_str).unwrap();
        assert_eq!(metadata.workflow_name.as_deref(), Some("simple_workflow"));
        assert_eq!(metadata.language, "rust");
        assert!(metadata.description.is_none());
    }

    #[test]
    fn test_cloacina_metadata_missing_language_fails() {
        let toml_str = r#"
workflow_name = "no_language"
"#;
        let result = toml::from_str::<CloacinaMetadata>(toml_str);
        assert!(result.is_err(), "Missing language field should fail");
    }

    #[test]
    fn test_cloacina_metadata_workflow_classification() {
        let toml_str = r#"
workflow_name = "legacy_workflow"
language = "rust"
"#;
        let metadata: CloacinaMetadata = toml::from_str(toml_str).unwrap();
        assert!(metadata.has_workflow());
        assert!(!metadata.has_computation_graph());
        assert_eq!(metadata.effective_workflow_name(), Some("legacy_workflow"));
    }

    #[test]
    fn test_cloacina_metadata_computation_graph_from_toml() {
        let toml_str = r#"
graph_name = "market_maker"
language = "rust"
reaction_mode = "when_any"
input_strategy = "latest"
"#;
        let metadata: CloacinaMetadata = toml::from_str(toml_str).unwrap();
        assert!(metadata.has_computation_graph());
        assert_eq!(metadata.graph_name.as_deref(), Some("market_maker"));
        assert_eq!(metadata.reaction_mode.as_deref(), Some("when_any"));
        assert!(metadata.workflow_name.is_none());
    }

    #[test]
    fn test_cloacina_metadata_graph_only_is_not_workflow() {
        // A graph-only package must not be classified as a workflow.
        let toml_str = r#"
graph_name = "market_maker"
language = "rust"
"#;
        let metadata: CloacinaMetadata = toml::from_str(toml_str).unwrap();
        assert!(metadata.has_computation_graph());
        assert!(!metadata.has_workflow());
        assert!(metadata.effective_workflow_name().is_none());
    }

    #[test]
    fn test_cloacina_metadata_workflow_and_graph() {
        // A package may declare both a workflow and a graph.
        let toml_str = r#"
workflow_name = "pipeline"
graph_name = "market_maker"
language = "rust"
"#;
        let metadata: CloacinaMetadata = toml::from_str(toml_str).unwrap();
        assert!(metadata.has_workflow());
        assert!(metadata.has_computation_graph());
        assert_eq!(metadata.effective_workflow_name(), Some("pipeline"));
    }

    #[test]
    fn test_cloacina_metadata_accumulator_defaults() {
        // `accumulator_type` falls back to "passthrough" and `config` to an empty map.
        let toml_str = r#"
graph_name = "g"
language = "rust"

[[accumulators]]
name = "pricing"

[[accumulators]]
name = "orderbook"
accumulator_type = "stream"

[accumulators.config]
topic = "market.orderbook"
"#;
        let metadata: CloacinaMetadata = toml::from_str(toml_str).unwrap();
        assert_eq!(metadata.accumulators.len(), 2);
        assert_eq!(metadata.accumulators[0].name, "pricing");
        assert_eq!(metadata.accumulators[0].accumulator_type, "passthrough");
        assert!(metadata.accumulators[0].config.is_empty());
        assert_eq!(metadata.accumulators[1].accumulator_type, "stream");
        assert_eq!(
            metadata.accumulators[1]
                .config
                .get("topic")
                .map(String::as_str),
            Some("market.orderbook")
        );
    }

    #[test]
    fn test_cloacina_metadata_legacy_package_type_rejected() {
        let toml_str = r#"
package_type = ["computation_graph"]
workflow_name = "x"
language = "rust"
"#;
        let err = toml::from_str::<CloacinaMetadata>(toml_str).unwrap_err();
        assert!(
            err.to_string().contains("package_type"),
            "expected error to name `package_type`, got: {}",
            err
        );
    }

    #[test]
    fn test_cloacina_metadata_legacy_triggers_rejected() {
        let toml_str = r#"
workflow_name = "x"
language = "rust"
[[triggers]]
name = "t"
workflow = "x"
poll_interval = "5s"
allow_concurrent = false
"#;
        let err = toml::from_str::<CloacinaMetadata>(toml_str).unwrap_err();
        assert!(
            err.to_string().contains("triggers"),
            "expected error to name `triggers`, got: {}",
            err
        );
    }

    #[test]
    fn test_graph_package_metadata_round_trip() {
        let metadata = GraphPackageMetadata {
            graph_name: "market_maker".to_string(),
            package_name: "mm-pkg".to_string(),
            reaction_mode: "when_any".to_string(),
            input_strategy: "latest".to_string(),
            accumulators: vec![
                AccumulatorDeclarationEntry {
                    name: "orderbook".to_string(),
                    accumulator_type: "stream".to_string(),
                    config: [("topic".to_string(), "market.orderbook".to_string())]
                        .into_iter()
                        .collect(),
                },
                AccumulatorDeclarationEntry {
                    name: "pricing".to_string(),
                    accumulator_type: "passthrough".to_string(),
                    config: std::collections::HashMap::new(),
                },
            ],
            trigger_reactor: None,
        };
        let json = serde_json::to_string(&metadata).unwrap();
        let roundtrip: GraphPackageMetadata = serde_json::from_str(&json).unwrap();
        assert_eq!(roundtrip.graph_name, "market_maker");
        assert_eq!(roundtrip.accumulators.len(), 2);
        assert_eq!(roundtrip.accumulators[0].accumulator_type, "stream");
        assert_eq!(
            roundtrip.accumulators[0].config.get("topic").unwrap(),
            "market.orderbook"
        );
    }

    #[test]
    fn test_graph_package_metadata_defaults() {
        // `input_strategy` defaults to "latest" and `trigger_reactor` to None.
        let json = r#"{"graph_name":"g","package_name":"p","reaction_mode":"when_any","accumulators":[]}"#;
        let metadata: GraphPackageMetadata = serde_json::from_str(json).unwrap();
        assert_eq!(metadata.input_strategy, "latest");
        assert!(metadata.trigger_reactor.is_none());
        assert!(metadata.accumulators.is_empty());
    }

    #[test]
    fn test_trigger_package_metadata_defaults() {
        // Optional trigger settings fall back to None / false when omitted.
        let json = r#"{"name":"t","package_name":"p","poll_interval":"5s"}"#;
        let metadata: TriggerPackageMetadata = serde_json::from_str(json).unwrap();
        assert_eq!(metadata.poll_interval, "5s");
        assert!(metadata.cron_expression.is_none());
        assert!(!metadata.allow_concurrent);
    }

    #[test]
    fn test_graph_execution_request_round_trip() {
        let request = GraphExecutionRequest {
            cache: [("alpha".to_string(), r#"{"value": 42.0}"#.to_string())]
                .into_iter()
                .collect(),
        };
        let json = serde_json::to_string(&request).unwrap();
        let roundtrip: GraphExecutionRequest = serde_json::from_str(&json).unwrap();
        assert!(roundtrip.cache.contains_key("alpha"));
    }

    #[test]
    fn test_graph_execution_result_round_trip() {
        let result = GraphExecutionResult {
            success: true,
            terminal_outputs_json: Some(vec![r#"{"published": true}"#.to_string()]),
            error: None,
        };
        let json = serde_json::to_string(&result).unwrap();
        let roundtrip: GraphExecutionResult = serde_json::from_str(&json).unwrap();
        assert!(roundtrip.success);
        assert_eq!(roundtrip.terminal_outputs_json.unwrap().len(), 1);
    }
}