use serde::{Deserialize, Serialize};
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TaskMetadataEntry {
pub index: u32,
pub id: String,
pub namespaced_id_template: String,
pub dependencies: Vec<String>,
pub description: String,
pub source_location: String,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PackageTasksMetadata {
pub workflow_name: String,
pub package_name: String,
pub package_description: Option<String>,
pub package_author: Option<String>,
pub workflow_fingerprint: Option<String>,
pub graph_data_json: Option<String>,
pub tasks: Vec<TaskMetadataEntry>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TaskExecutionRequest {
pub task_name: String,
pub context_json: String,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TaskExecutionResult {
pub success: bool,
pub context_json: Option<String>,
pub error: Option<String>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CloacinaMetadata {
pub workflow_name: String,
pub language: String,
#[serde(default)]
pub description: Option<String>,
#[serde(default)]
pub author: Option<String>,
#[serde(default)]
pub requires_python: Option<String>,
#[serde(default)]
pub entry_module: Option<String>,
#[serde(default)]
pub triggers: Vec<TriggerDefinition>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TriggerDefinition {
pub name: String,
pub workflow: String,
pub poll_interval: String,
#[serde(default)]
pub cron_expression: Option<String>,
#[serde(default)]
pub allow_concurrent: bool,
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_task_metadata_serde_round_trip() {
let entry = TaskMetadataEntry {
index: 0,
id: "extract_data".to_string(),
namespaced_id_template: "{tenant}::{pkg}::pipeline::extract_data".to_string(),
dependencies: vec![],
description: "Extract data from sources".to_string(),
source_location: "src/lib.rs".to_string(),
};
let json = serde_json::to_string(&entry).unwrap();
let roundtrip: TaskMetadataEntry = serde_json::from_str(&json).unwrap();
assert_eq!(roundtrip.id, "extract_data");
assert_eq!(roundtrip.index, 0);
}
#[test]
fn test_package_tasks_metadata_serde_round_trip() {
let metadata = PackageTasksMetadata {
workflow_name: "analytics_pipeline".to_string(),
package_name: "analytics-pkg".to_string(),
package_description: Some("Analytics workflow".to_string()),
package_author: Some("Team".to_string()),
workflow_fingerprint: Some("sha256:abc123".to_string()),
graph_data_json: None,
tasks: vec![TaskMetadataEntry {
index: 0,
id: "step_one".to_string(),
namespaced_id_template: "{tenant}::{pkg}::analytics::step_one".to_string(),
dependencies: vec![],
description: "First step".to_string(),
source_location: "src/lib.rs".to_string(),
}],
};
let json = serde_json::to_string(&metadata).unwrap();
let roundtrip: PackageTasksMetadata = serde_json::from_str(&json).unwrap();
assert_eq!(roundtrip.workflow_name, "analytics_pipeline");
assert_eq!(roundtrip.tasks.len(), 1);
}
#[test]
fn test_task_execution_request_round_trip() {
let request = TaskExecutionRequest {
task_name: "extract_data".to_string(),
context_json: r#"{"key": "value"}"#.to_string(),
};
let json = serde_json::to_string(&request).unwrap();
let roundtrip: TaskExecutionRequest = serde_json::from_str(&json).unwrap();
assert_eq!(roundtrip.task_name, "extract_data");
}
#[test]
fn test_task_execution_result_success() {
let result = TaskExecutionResult {
success: true,
context_json: Some(r#"{"updated": true}"#.to_string()),
error: None,
};
let json = serde_json::to_string(&result).unwrap();
let roundtrip: TaskExecutionResult = serde_json::from_str(&json).unwrap();
assert!(roundtrip.success);
assert!(roundtrip.context_json.is_some());
assert!(roundtrip.error.is_none());
}
#[test]
fn test_task_execution_result_failure() {
let result = TaskExecutionResult {
success: false,
context_json: None,
error: Some("Task panicked".to_string()),
};
let json = serde_json::to_string(&result).unwrap();
let roundtrip: TaskExecutionResult = serde_json::from_str(&json).unwrap();
assert!(!roundtrip.success);
assert!(roundtrip.error.is_some());
}
#[test]
fn test_cloacina_metadata_rust_from_toml() {
let toml_str = r#"
workflow_name = "analytics_pipeline"
language = "rust"
description = "Data analytics workflow"
author = "Analytics Team"
[[triggers]]
name = "file_watcher"
workflow = "analytics_pipeline"
poll_interval = "5s"
allow_concurrent = false
"#;
let metadata: CloacinaMetadata = toml::from_str(toml_str).unwrap();
assert_eq!(metadata.workflow_name, "analytics_pipeline");
assert_eq!(metadata.language, "rust");
assert_eq!(
metadata.description.as_deref(),
Some("Data analytics workflow")
);
assert!(metadata.requires_python.is_none());
assert!(metadata.entry_module.is_none());
assert_eq!(metadata.triggers.len(), 1);
assert_eq!(metadata.triggers[0].name, "file_watcher");
assert!(!metadata.triggers[0].allow_concurrent);
}
#[test]
fn test_cloacina_metadata_python_from_toml() {
let toml_str = r#"
workflow_name = "etl_pipeline"
language = "python"
description = "Python ETL workflow"
requires_python = ">=3.11"
entry_module = "workflow.tasks"
"#;
let metadata: CloacinaMetadata = toml::from_str(toml_str).unwrap();
assert_eq!(metadata.workflow_name, "etl_pipeline");
assert_eq!(metadata.language, "python");
assert_eq!(metadata.requires_python.as_deref(), Some(">=3.11"));
assert_eq!(metadata.entry_module.as_deref(), Some("workflow.tasks"));
assert!(metadata.triggers.is_empty());
}
#[test]
fn test_cloacina_metadata_minimal_rust() {
let toml_str = r#"
workflow_name = "simple_workflow"
language = "rust"
"#;
let metadata: CloacinaMetadata = toml::from_str(toml_str).unwrap();
assert_eq!(metadata.workflow_name, "simple_workflow");
assert_eq!(metadata.language, "rust");
assert!(metadata.description.is_none());
assert!(metadata.triggers.is_empty());
}
#[test]
fn test_cloacina_metadata_missing_language_fails() {
let toml_str = r#"
workflow_name = "no_language"
"#;
let result = toml::from_str::<CloacinaMetadata>(toml_str);
assert!(result.is_err(), "Missing language field should fail");
}
}