car-workflow 0.26.0

Declarative multi-stage workflow orchestration for Common Agent Runtime
//! Builder for the external-item automation recipe.
//!
//! Packages the poll → dedup → per-item agent → deliver loop that a scheduled
//! automation needs: pull items from a source, skip ones already handled in a
//! prior run (persistent content-hash [`dedup`](crate::dedup)), fan an ephemeral
//! agent out over each new item, then optionally deliver the results.
//!
//! [`AutomationSpec::build`] lowers the spec to a plain [`Workflow`] of existing
//! stage steps, so the result runs, verifies, checkpoints, and compensates like
//! any hand-written workflow — there is no separate execution path.

use std::collections::HashMap;

use car_ir::ActionProposal;
use serde::{Deserialize, Serialize};

use crate::types::*;

/// State key the dedup stage writes its unseen subset to, and the per-item
/// fan-out reads from. Internal to the lowered workflow.
///
/// [`AutomationSpec::build`] uses it both as the dedup step's `into` and as
/// the for-each step's `items_from`, which is what chains the two stages
/// together. It is a private constant, so a spec cannot override it.
const UNSEEN_KEY: &str = "unseen_items";

/// Declarative spec for an external-item automation.
///
/// A spec is plain data: it is (de)serializable with serde and is turned into
/// a runnable [`Workflow`] by [`AutomationSpec::build`], which lowers it to a
/// `poll` → `dedup` → `process` chain with an optional trailing `deliver`
/// stage.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AutomationSpec {
    /// Workflow id. Copied verbatim into the lowered workflow's `id`.
    pub id: String,
    /// Human-readable name. Copied verbatim into the lowered workflow's `name`.
    pub name: String,
    /// Proposal that fetches items from the source and writes a JSON array to
    /// the state key named by `items_key` (e.g. a tool call that shells out to
    /// `gh pr list --json ...`). The proposal action must declare that key in
    /// its `expected_effects` for static verification to see it produced.
    ///
    /// Becomes the `poll` stage, which is also the workflow's start stage.
    pub poll: ActionProposal,
    /// State key the `poll` proposal writes the fetched array to.
    ///
    /// The dedup stage reads its input from this key. Defaults to `"items"`
    /// when the field is omitted from the serialized spec.
    #[serde(default = "default_items_key")]
    pub items_key: String,
    /// Persistent dedup namespace — the seen-set shared across runs of this
    /// automation, isolated from other automations.
    pub dedup_store: String,
    /// Optional identity fields for the content hash (see
    /// [`DedupStep::hash_fields`]). E.g. `["number"]` for a PR so re-polling an
    /// edited PR doesn't reprocess it.
    ///
    /// Defaults to empty when omitted, and an empty list is left out of the
    /// serialized form.
    #[serde(default, skip_serializing_if = "Vec::is_empty")]
    pub hash_fields: Vec<String>,
    /// TTL for dedup seen-set entries, in seconds (see [`DedupStep::ttl_secs`]).
    /// Defaults to 90 days so the packaged automation's seen-set stays bounded
    /// without configuration; pass an explicit `null` to dedup forever (an
    /// unbounded file). An item that aged past the TTL and reappears in the
    /// source is reprocessed.
    ///
    /// Note the asymmetry: an *absent* key deserializes to the 90-day default,
    /// while an explicit `null` deserializes to `None`.
    #[serde(default = "default_dedup_ttl_secs")]
    pub dedup_ttl_secs: Option<u64>,
    /// The pattern + agent(s) that process a single item. The item is templated
    /// into the task via `{{item}}` / `{{index}}` by the fan-out.
    ///
    /// `build` wraps this as the body of the `process` for-each stage.
    pub worker: PatternStep,
    /// Max items processed concurrently (`0`/`1` = sequential).
    ///
    /// Defaults to `0` when omitted; passed through unchanged to the for-each
    /// step.
    #[serde(default)]
    pub max_concurrent: usize,
    /// Optional proposal run after the per-item fan-out (e.g. post a digest to
    /// Slack/Telegram, or comment back on the source). It can read each item's
    /// result from `foreach.process.<i>.answer`.
    ///
    /// When present, `build` appends a `deliver` stage and a
    /// `process` → `deliver` edge; when absent, the workflow ends at `process`
    /// and the field is left out of the serialized form.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub deliver: Option<ActionProposal>,
}

/// Serde default for [`AutomationSpec::items_key`]: the state key the poll
/// proposal is assumed to write to when the spec does not name one.
fn default_items_key() -> String {
    String::from("items")
}

/// Serde default for [`AutomationSpec::dedup_ttl_secs`]: 90 days, in seconds.
///
/// The window is long enough that, under normal churn, a source item will not
/// resurface after its entry expired, yet short enough that the seen-set file
/// cannot grow without bound. Only used when the key is absent — an explicit
/// `null` in the spec still means "never expire".
fn default_dedup_ttl_secs() -> Option<u64> {
    const SECS_PER_DAY: u64 = 24 * 60 * 60;
    const RETENTION_DAYS: u64 = 90;

    Some(RETENTION_DAYS * SECS_PER_DAY)
}

impl AutomationSpec {
    /// Lower this spec to a runnable [`Workflow`].
    ///
    /// The result is a linear chain of ordinary stages joined by unconditional
    /// edges:
    ///
    /// 1. `poll` — runs the [`poll`](Self::poll) proposal;
    /// 2. `dedup` — reads [`items_key`](Self::items_key) and writes the
    ///    not-yet-seen subset to the internal unseen-items key;
    /// 3. `process` — fans [`worker`](Self::worker) out over that subset;
    /// 4. `deliver` — only when [`deliver`](Self::deliver) is set.
    ///
    /// The spec is borrowed, not consumed: every piece that ends up in the
    /// workflow is cloned. The workflow starts at `poll`, has no goal, no
    /// metadata, and an iteration cap of 100.
    pub fn build(&self) -> Workflow {
        let poll_stage = simple_stage(
            "poll",
            "Poll source",
            StageStep::Proposal(ProposalStep {
                proposal: self.poll.clone(),
            }),
        );

        // Dedup consumes the polled array and publishes the unseen subset.
        let dedup_step = DedupStep {
            items_from: self.items_key.clone(),
            into: UNSEEN_KEY.to_string(),
            store: self.dedup_store.clone(),
            hash_fields: self.hash_fields.clone(),
            ttl_secs: self.dedup_ttl_secs,
        };
        let dedup_stage = simple_stage("dedup", "Deduplicate", StageStep::Dedup(dedup_step));

        // The fan-out iterates exactly what dedup published.
        let fan_out = ForEachStep {
            items_from: UNSEEN_KEY.to_string(),
            body: Box::new(StageStep::Pattern(self.worker.clone())),
            max_concurrent: self.max_concurrent,
        };
        let process_stage = simple_stage("process", "Process items", StageStep::ForEach(fan_out));

        // Delivery is optional; when configured it hangs off the end of the chain.
        let deliver_stage = self.deliver.as_ref().map(|proposal| {
            simple_stage(
                "deliver",
                "Deliver results",
                StageStep::Proposal(ProposalStep {
                    proposal: proposal.clone(),
                }),
            )
        });

        let mut edges = vec![
            unconditional_edge("poll", "dedup", "polled"),
            unconditional_edge("dedup", "process", "deduped"),
        ];
        if deliver_stage.is_some() {
            edges.push(unconditional_edge("process", "deliver", "processed"));
        }

        let mut stages = vec![poll_stage, dedup_stage, process_stage];
        stages.extend(deliver_stage);

        Workflow {
            id: self.id.clone(),
            name: self.name.clone(),
            start: "poll".to_string(),
            goal: None,
            stages,
            edges,
            max_iterations: 100,
            metadata: HashMap::new(),
        }
    }
}

/// Wrap `step` in a bare [`Stage`] carrying only an id and a display name.
///
/// Everything optional is left unset: no compensation, no timeout, and an
/// empty metadata map.
fn simple_stage(id: &str, name: &str, step: StageStep) -> Stage {
    let id = String::from(id);
    let name = String::from(name);

    Stage {
        id,
        name,
        step,
        compensation: None,
        timeout_ms: None,
        metadata: HashMap::new(),
    }
}

/// Build an [`Edge`] from stage `from` to stage `to` with an empty condition
/// list, tagged with `label`.
fn unconditional_edge(from: &str, to: &str, label: &str) -> Edge {
    let (from, to, label) = (String::from(from), String::from(to), String::from(label));

    Edge {
        from,
        to,
        conditions: Vec::new(),
        label,
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use car_multi::AgentSpec;

    /// Action-free proposal fixture. The tests reuse it for both the poll and
    /// the deliver slot, since only the shape of the lowered workflow is
    /// inspected.
    fn poll_proposal() -> ActionProposal {
        ActionProposal {
            id: "poll".into(),
            source: "test".into(),
            actions: Vec::new(),
            timestamp: chrono::Utc::now(),
            context: HashMap::new(),
        }
    }

    /// Single-agent swarm worker whose task templates the current item.
    fn worker() -> PatternStep {
        let agent = AgentSpec::new("worker", "Process the given item.");
        PatternStep {
            pattern: PatternKind::SwarmParallel,
            task: "Handle item {{item}}".into(),
            agents: vec![agent],
            config: HashMap::new(),
        }
    }

    /// Minimal spec shared by the build tests: no hash fields, no TTL,
    /// sequential processing, no delivery. Tests override what they need via
    /// struct-update syntax.
    fn base_spec() -> AutomationSpec {
        AutomationSpec {
            id: "auto".into(),
            name: "Auto".into(),
            poll: poll_proposal(),
            items_key: "items".into(),
            dedup_store: "gh-prs".into(),
            hash_fields: Vec::new(),
            dedup_ttl_secs: None,
            worker: worker(),
            max_concurrent: 0,
            deliver: None,
        }
    }

    /// Fetch a lowered stage by id, panicking if the workflow lacks it.
    fn stage_by_id<'a>(wf: &'a Workflow, id: &str) -> &'a Stage {
        wf.stages
            .iter()
            .find(|stage| stage.id == id)
            .expect("lowered workflow is missing an expected stage")
    }

    #[test]
    fn builds_without_deliver() {
        let wf = AutomationSpec {
            hash_fields: vec!["number".into()],
            dedup_ttl_secs: Some(3600),
            max_concurrent: 4,
            ..base_spec()
        }
        .build();

        assert_eq!(wf.start, "poll");
        assert_eq!(wf.stages.len(), 3);
        assert_eq!(wf.edges.len(), 2);

        // The configured TTL is threaded into the dedup step.
        if let StageStep::Dedup(d) = &stage_by_id(&wf, "dedup").step {
            assert_eq!(d.ttl_secs, Some(3600));
        } else {
            panic!("expected dedup step");
        }

        // The lowered workflow passes static verification.
        let report = crate::verify_workflow(&wf);
        assert!(report.valid, "issues: {:?}", report.issues);
    }

    #[test]
    fn ttl_defaults_to_90_days_when_absent_and_disables_on_null() {
        let json = r#"{
            "id": "a", "name": "A",
            "poll": {"id":"p","source":"t","actions":[],"timestamp":"2026-01-01T00:00:00Z","context":{}},
            "dedup_store": "s",
            "worker": {"pattern":"swarm_parallel","task":"x {{item}}","agents":[{"name":"w","system_prompt":"p","tools":[],"max_turns":5,"metadata":{}}],"config":{}}
        }"#;

        // Absent key → bounded-by-default 90-day TTL.
        let defaulted: AutomationSpec = serde_json::from_str(json).unwrap();
        assert_eq!(defaulted.dedup_ttl_secs, Some(90 * 24 * 60 * 60));

        // Explicit null → dedup forever. The null is spliced in right after
        // the `dedup_store` entry of the same document.
        let json_with_null = json.replace(
            r#""dedup_store": "s","#,
            r#""dedup_store": "s", "dedup_ttl_secs": null,"#,
        );
        let unbounded: AutomationSpec = serde_json::from_str(&json_with_null).unwrap();
        assert_eq!(unbounded.dedup_ttl_secs, None);
    }

    #[test]
    fn builds_with_deliver() {
        let wf = AutomationSpec {
            deliver: Some(poll_proposal()),
            ..base_spec()
        }
        .build();

        assert_eq!(wf.stages.len(), 4);
        assert_eq!(wf.edges.len(), 3);
        assert!(wf.stages.iter().any(|s| s.id == "deliver"));

        // dedup feeds the fan-out via the internal unseen key.
        if let StageStep::Dedup(d) = &stage_by_id(&wf, "dedup").step {
            assert_eq!(d.into, UNSEEN_KEY);
        } else {
            panic!("expected dedup step");
        }
        if let StageStep::ForEach(fe) = &stage_by_id(&wf, "process").step {
            assert_eq!(fe.items_from, UNSEEN_KEY);
        } else {
            panic!("expected foreach step");
        }
    }
}