use std::collections::HashMap;
use car_ir::ActionProposal;
use serde::{Deserialize, Serialize};
use crate::types::*;
/// Name shared by the dedup step's `into` field and the for-each step's
/// `items_from` field in [`AutomationSpec::build`], so the "process" stage
/// reads whatever the "dedup" stage wrote.
const UNSEEN_KEY: &str = "unseen_items";
/// Declarative description of a poll → dedup → process (→ deliver) automation.
///
/// [`AutomationSpec::build`] expands a spec into a concrete `Workflow`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AutomationSpec {
    /// Copied verbatim into the built workflow's `id`.
    pub id: String,
    /// Copied verbatim into the built workflow's `name`.
    pub name: String,
    /// Proposal run by the "poll" stage, which is the workflow's start stage.
    pub poll: ActionProposal,
    /// Passed to the dedup step as `items_from`.
    ///
    /// Falls back to `"items"` when the field is absent from the input.
    #[serde(default = "default_items_key")]
    pub items_key: String,
    /// Passed to the dedup step as `store`.
    pub dedup_store: String,
    /// Passed to the dedup step as `hash_fields`.
    ///
    /// Defaults to an empty list when absent and is left out of serialized
    /// output when empty. NOTE(review): how the dedup step treats an empty
    /// list is decided elsewhere — confirm against `DedupStep`.
    #[serde(default, skip_serializing_if = "Vec::is_empty")]
    pub hash_fields: Vec<String>,
    /// Passed to the dedup step as `ttl_secs`.
    ///
    /// An absent field deserializes to 90 days (in seconds); an explicit
    /// `null` deserializes to `None`.
    #[serde(default = "default_dedup_ttl_secs")]
    pub dedup_ttl_secs: Option<u64>,
    /// Pattern wrapped in `StageStep::Pattern` and used as the body of the
    /// "process" stage's for-each step.
    pub worker: PatternStep,
    /// Passed to the for-each step as `max_concurrent`; `0` when absent.
    ///
    /// NOTE(review): the meaning of `0` is defined by `ForEachStep`, not
    /// here — confirm before relying on it.
    #[serde(default)]
    pub max_concurrent: usize,
    /// Optional proposal; when present, a "deliver" stage running it is
    /// appended after "process". Left out of serialized output when `None`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub deliver: Option<ActionProposal>,
}
/// Serde fallback for `AutomationSpec::items_key` when the field is absent.
///
/// Returns the owned string `"items"`.
fn default_items_key() -> String {
    String::from("items")
}
/// Serde fallback for `AutomationSpec::dedup_ttl_secs` when the field is absent.
///
/// Returns `Some` of 90 days expressed in seconds.
fn default_dedup_ttl_secs() -> Option<u64> {
    const SECS_PER_DAY: u64 = 24 * 60 * 60;
    const DEFAULT_TTL_DAYS: u64 = 90;
    Some(DEFAULT_TTL_DAYS * SECS_PER_DAY)
}
impl AutomationSpec {
    /// Expands this spec into a linear `Workflow`.
    ///
    /// The result always contains the stages `poll` → `dedup` → `process`,
    /// joined by unconditional edges labelled `polled` and `deduped`. When
    /// `deliver` is set, a fourth `deliver` stage is appended and connected
    /// from `process` by an edge labelled `processed`.
    ///
    /// The workflow starts at `poll`, has no goal, carries empty metadata and
    /// a fixed `max_iterations` of 100. The spec itself is only borrowed;
    /// every value placed in the workflow is a clone.
    pub fn build(&self) -> Workflow {
        // Assemble the step payloads first so the stage list below reads as
        // a plain pipeline description.
        let poll_step = StageStep::Proposal(ProposalStep {
            proposal: self.poll.clone(),
        });
        let dedup_step = StageStep::Dedup(DedupStep {
            items_from: self.items_key.clone(),
            // The dedup output name must match what the for-each step reads.
            into: UNSEEN_KEY.to_owned(),
            store: self.dedup_store.clone(),
            hash_fields: self.hash_fields.clone(),
            ttl_secs: self.dedup_ttl_secs,
        });
        let process_step = StageStep::ForEach(ForEachStep {
            items_from: UNSEEN_KEY.to_owned(),
            body: Box::new(StageStep::Pattern(self.worker.clone())),
            max_concurrent: self.max_concurrent,
        });
        let deliver_step = self.deliver.as_ref().map(|proposal| {
            StageStep::Proposal(ProposalStep {
                proposal: proposal.clone(),
            })
        });

        let mut stages = vec![
            simple_stage("poll", "Poll source", poll_step),
            simple_stage("dedup", "Deduplicate", dedup_step),
            simple_stage("process", "Process items", process_step),
        ];
        let mut edges = vec![
            unconditional_edge("poll", "dedup", "polled"),
            unconditional_edge("dedup", "process", "deduped"),
        ];

        // Delivery is optional: its stage and edge are added together or not at all.
        if let Some(step) = deliver_step {
            stages.push(simple_stage("deliver", "Deliver results", step));
            edges.push(unconditional_edge("process", "deliver", "processed"));
        }

        Workflow {
            id: self.id.clone(),
            name: self.name.clone(),
            start: String::from("poll"),
            goal: None,
            stages,
            edges,
            max_iterations: 100,
            metadata: HashMap::new(),
        }
    }
}
/// Builds a `Stage` that carries only an id, a display name and a step.
///
/// `compensation` and `timeout_ms` are left as `None` and `metadata` is an
/// empty map.
fn simple_stage(id: &str, name: &str, step: StageStep) -> Stage {
    let id = id.to_owned();
    let name = name.to_owned();
    Stage {
        id,
        name,
        step,
        compensation: None,
        timeout_ms: None,
        metadata: HashMap::new(),
    }
}
/// Builds an `Edge` from `from` to `to` with the given `label` and an empty
/// `conditions` list.
fn unconditional_edge(from: &str, to: &str, label: &str) -> Edge {
    let (from, to, label) = (from.to_owned(), to.to_owned(), label.to_owned());
    Edge {
        from,
        to,
        conditions: Vec::new(),
        label,
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use car_multi::AgentSpec;

    /// Proposal with no actions; reused for both the poll and deliver slots.
    fn poll_proposal() -> ActionProposal {
        ActionProposal {
            id: "poll".into(),
            source: "test".into(),
            actions: vec![],
            timestamp: chrono::Utc::now(),
            context: HashMap::new(),
        }
    }

    /// Single-agent parallel swarm used as the per-item worker.
    fn worker() -> PatternStep {
        PatternStep {
            pattern: PatternKind::SwarmParallel,
            task: "Handle item {{item}}".into(),
            agents: vec![AgentSpec::new("worker", "Process the given item.")],
            config: HashMap::new(),
        }
    }

    /// Baseline spec: no hash fields, no TTL, zero concurrency, no delivery.
    /// Individual tests override only the fields they care about.
    fn base_spec() -> AutomationSpec {
        AutomationSpec {
            id: String::from("auto"),
            name: String::from("Auto"),
            poll: poll_proposal(),
            items_key: String::from("items"),
            dedup_store: String::from("gh-prs"),
            hash_fields: Vec::new(),
            dedup_ttl_secs: None,
            worker: worker(),
            max_concurrent: 0,
            deliver: None,
        }
    }

    /// Returns the step of the stage with the given id, panicking if the
    /// workflow has no such stage.
    fn step_of<'a>(wf: &'a Workflow, stage_id: &str) -> &'a StageStep {
        &wf.stages.iter().find(|s| s.id == stage_id).unwrap().step
    }

    #[test]
    fn builds_without_deliver() {
        let spec = AutomationSpec {
            hash_fields: vec![String::from("number")],
            dedup_ttl_secs: Some(3600),
            max_concurrent: 4,
            ..base_spec()
        };

        let wf = spec.build();

        assert_eq!(wf.start, "poll");
        assert_eq!(wf.stages.len(), 3);
        assert_eq!(wf.edges.len(), 2);
        // The spec's TTL must be forwarded to the dedup step unchanged.
        match step_of(&wf, "dedup") {
            StageStep::Dedup(d) => assert_eq!(d.ttl_secs, Some(3600)),
            _ => panic!("expected dedup step"),
        }
        let report = crate::verify_workflow(&wf);
        assert!(report.valid, "issues: {:?}", report.issues);
    }

    #[test]
    fn ttl_defaults_to_90_days_when_absent_and_disables_on_null() {
        let json = r#"{
"id": "a", "name": "A",
"poll": {"id":"p","source":"t","actions":[],"timestamp":"2026-01-01T00:00:00Z","context":{}},
"dedup_store": "s",
"worker": {"pattern":"swarm_parallel","task":"x {{item}}","agents":[{"name":"w","system_prompt":"p","tools":[],"max_turns":5,"metadata":{}}],"config":{}}
}"#;

        // Field absent: the serde default applies.
        let spec: AutomationSpec = serde_json::from_str(json).unwrap();
        assert_eq!(spec.dedup_ttl_secs, Some(90 * 24 * 60 * 60));

        // Field present but null: deserializes to `None` instead of the default.
        let json_with_null = json.replace(
            r#""dedup_store": "s","#,
            r#""dedup_store": "s", "dedup_ttl_secs": null,"#,
        );
        let spec2: AutomationSpec = serde_json::from_str(&json_with_null).unwrap();
        assert_eq!(spec2.dedup_ttl_secs, None);
    }

    #[test]
    fn builds_with_deliver() {
        let spec = AutomationSpec {
            deliver: Some(poll_proposal()),
            ..base_spec()
        };

        let wf = spec.build();

        assert_eq!(wf.stages.len(), 4);
        assert_eq!(wf.edges.len(), 3);
        assert!(wf.stages.iter().any(|s| s.id == "deliver"));
        // Dedup output and for-each input must use the same key.
        match step_of(&wf, "dedup") {
            StageStep::Dedup(d) => assert_eq!(d.into, UNSEEN_KEY),
            _ => panic!("expected dedup step"),
        }
        match step_of(&wf, "process") {
            StageStep::ForEach(fe) => assert_eq!(fe.items_from, UNSEEN_KEY),
            _ => panic!("expected foreach step"),
        }
    }
}