use car_multi::{AgentRunner, SharedInfra};
use car_workflow::{CheckpointStore, PausedWorkflow, Workflow, WorkflowEngine};
use std::collections::HashMap;
use std::sync::Arc;
pub fn list_paused_workflows(runs_dir: &str) -> Result<String, String> {
let store = CheckpointStore::open(runs_dir).map_err(|e| e.to_string())?;
let summaries = store.list_summaries().map_err(|e| e.to_string())?;
serde_json::to_string(&summaries).map_err(|e| e.to_string())
}
pub async fn run_workflow(
workflow_json: &str,
initial_state: Option<HashMap<String, serde_json::Value>>,
runner: Arc<dyn AgentRunner>,
) -> Result<String, String> {
let workflow: Workflow =
serde_json::from_str(workflow_json).map_err(|e| format!("invalid workflow JSON: {}", e))?;
let infra = SharedInfra::new();
let engine = WorkflowEngine::new(runner, infra);
let result = engine
.run_with_state(&workflow, initial_state.unwrap_or_default())
.await
.map_err(|e| format!("workflow error: {}", e))?;
serde_json::to_string(&result).map_err(|e| e.to_string())
}
pub async fn chain_workflows(
workflows_json: &str,
initial_state: Option<HashMap<String, serde_json::Value>>,
runner: Arc<dyn AgentRunner>,
) -> Result<String, String> {
let workflows: Vec<Workflow> = serde_json::from_str(workflows_json)
.map_err(|e| format!("invalid workflows JSON (expected an array): {}", e))?;
if workflows.is_empty() {
return Err("workflow chain requires at least one workflow".to_string());
}
for (i, workflow) in workflows.iter().enumerate() {
let report = car_workflow::verify_workflow(workflow);
if !report.valid {
let errors: Vec<String> = report
.issues
.iter()
.filter(|issue| issue.severity == "error")
.map(|issue| issue.message.clone())
.collect();
return Err(format!(
"workflow at index {i} ('{}') failed static verification: {}",
workflow.id,
errors.join("; ")
));
}
}
let base = initial_state.unwrap_or_default();
let mut carry = base.clone();
let mut results: Vec<car_workflow::WorkflowResult> = Vec::with_capacity(workflows.len());
let mut status = car_workflow::WorkflowStatus::Completed;
let mut paused_at_index: Option<usize> = None;
let mut chain_error: Option<(usize, String)> = None;
let infra = SharedInfra::new();
let engine = WorkflowEngine::new(runner, infra);
for (i, workflow) in workflows.iter().enumerate() {
let result = match engine.run_with_state(workflow, carry.clone()).await {
Ok(r) => r,
Err(e) => {
status = car_workflow::WorkflowStatus::Failed;
chain_error = Some((i, format!("workflow '{}': {}", workflow.id, e)));
break;
}
};
let done = result.status == car_workflow::WorkflowStatus::Completed;
if !done {
status = result.status.clone();
if result.is_paused() {
paused_at_index = Some(i);
}
results.push(result);
break;
}
carry = base.clone();
for (k, v) in &result.final_state {
carry.insert(k.clone(), v.clone());
}
results.push(result);
}
let mut out = serde_json::json!({
"results": results,
"status": status,
});
if let Some(idx) = paused_at_index {
out["paused_at_index"] = serde_json::json!(idx);
}
if let Some((idx, error)) = chain_error {
out["failed_at_index"] = serde_json::json!(idx);
out["error"] = serde_json::json!(error);
}
serde_json::to_string(&out).map_err(|e| e.to_string())
}
pub async fn resume_workflow(
paused_json: &str,
input_json: &str,
runner: Arc<dyn AgentRunner>,
) -> Result<String, String> {
let paused: PausedWorkflow = serde_json::from_str(paused_json)
.map_err(|e| format!("invalid paused checkpoint JSON: {}", e))?;
let input: HashMap<String, serde_json::Value> = serde_json::from_str(input_json)
.map_err(|e| format!("invalid approval input JSON: {}", e))?;
let infra = SharedInfra::new();
let engine = WorkflowEngine::new(runner, infra);
let result = engine
.resume(paused, input)
.await
.map_err(|e| format!("workflow resume error: {}", e))?;
serde_json::to_string(&result).map_err(|e| e.to_string())
}
pub fn build_automation_workflow(spec_json: &str) -> Result<String, String> {
let spec: car_workflow::AutomationSpec = serde_json::from_str(spec_json)
.map_err(|e| format!("invalid automation spec JSON: {}", e))?;
serde_json::to_string(&spec.build()).map_err(|e| e.to_string())
}
pub fn verify_workflow(workflow_json: &str) -> Result<String, String> {
let workflow: Workflow =
serde_json::from_str(workflow_json).map_err(|e| format!("invalid workflow JSON: {}", e))?;
let report = car_workflow::verify_workflow(&workflow);
let json = serde_json::json!({
"valid": report.valid,
"has_cycles": report.has_cycles,
"reachable_stages": report.reachable_stages,
"unreachable_stages": report.unreachable_stages,
"issues": report.issues.iter().map(|i| format!("{:?}", i)).collect::<Vec<_>>(),
"semantic": car_workflow::semantic_issues(&workflow),
});
Ok(json.to_string())
}
#[cfg(test)]
mod tests {
use super::*;
use serde_json::{json, Value};
struct NoopRunner;
#[async_trait::async_trait]
impl AgentRunner for NoopRunner {
async fn run(
&self,
_spec: &car_multi::AgentSpec,
_task: &str,
_runtime: &car_engine::Runtime,
_mailbox: &car_multi::Mailbox,
) -> Result<car_multi::AgentOutput, car_multi::MultiError> {
Err(car_multi::MultiError::NoOutput)
}
}
fn runner() -> Arc<dyn AgentRunner> {
Arc::new(NoopRunner)
}
fn writer_workflow(id: &str, key: &str, value: Value) -> Value {
json!({
"id": id,
"name": id,
"start": "write",
"stages": [{
"id": "write",
"name": "write",
"step": {
"type": "proposal",
"proposal": {
"id": format!("p-{id}"),
"source": "test",
"actions": [{
"id": "w1",
"type": "state_write",
"parameters": { "key": key, "value": value }
}],
"timestamp": "2026-01-01T00:00:00Z",
"context": {}
}
}
}],
"edges": []
})
}
fn branching_workflow() -> Value {
let noop_stage = |id: &str| {
json!({
"id": id,
"name": id,
"step": {
"type": "proposal",
"proposal": {
"id": format!("p-{id}"),
"source": "test",
"actions": [],
"timestamp": "2026-01-01T00:00:00Z",
"context": {}
}
}
})
};
json!({
"id": "wf-b",
"name": "WF B",
"start": "check",
"stages": [noop_stage("check"), noop_stage("yes"), noop_stage("no")],
"edges": [
{
"from": "check",
"to": "yes",
"conditions": [{
"key": "handoff",
"operator": "eq",
"value": "from-a",
"description": ""
}]
},
{ "from": "check", "to": "no" }
]
})
}
fn ran_stages(result: &Value) -> Vec<String> {
result["stages"]
.as_array()
.unwrap()
.iter()
.map(|s| s["stage_id"].as_str().unwrap().to_string())
.collect()
}
#[tokio::test]
async fn run_workflow_seeds_initial_state() {
let wf = branching_workflow();
let mut seed = HashMap::new();
seed.insert("handoff".to_string(), json!("from-a"));
let out = run_workflow(&wf.to_string(), Some(seed), runner())
.await
.unwrap();
let result: Value = serde_json::from_str(&out).unwrap();
assert_eq!(result["status"], "completed");
assert!(ran_stages(&result).contains(&"yes".to_string()));
let out = run_workflow(&wf.to_string(), None, runner()).await.unwrap();
let result: Value = serde_json::from_str(&out).unwrap();
assert!(ran_stages(&result).contains(&"no".to_string()));
}
#[tokio::test]
async fn chain_threads_final_state_between_workflows() {
let workflows = json!([
writer_workflow("wf-a", "handoff", json!("from-a")),
branching_workflow(),
]);
let out = chain_workflows(&workflows.to_string(), None, runner())
.await
.unwrap();
let chained: Value = serde_json::from_str(&out).unwrap();
assert_eq!(chained["status"], "completed");
assert!(chained.get("paused_at_index").is_none());
let results = chained["results"].as_array().unwrap();
assert_eq!(results.len(), 2);
assert!(
ran_stages(&results[1]).contains(&"yes".to_string()),
"A's final_state routed B down the yes branch"
);
}
#[tokio::test]
async fn chain_stops_at_paused_intermediate() {
let gate = json!({
"id": "wf-gate",
"name": "gated",
"start": "gate",
"stages": [{
"id": "gate",
"name": "gate",
"step": { "type": "approval", "prompt": "ok?", "output_key": "approval" }
}],
"edges": []
});
let workflows = json!([gate, writer_workflow("wf-a", "x", json!(1))]);
let out = chain_workflows(&workflows.to_string(), None, runner())
.await
.unwrap();
let chained: Value = serde_json::from_str(&out).unwrap();
assert_eq!(chained["status"], "paused");
assert_eq!(chained["paused_at_index"], 0);
let results = chained["results"].as_array().unwrap();
assert_eq!(results.len(), 1, "chain stops at the paused workflow");
assert!(results[0]["paused"].is_object(), "checkpoint is carried");
}
fn deliver_workflow(id: &str) -> Value {
json!({
"id": id,
"name": id,
"start": "deliver",
"stages": [{
"id": "deliver",
"name": "deliver",
"step": {
"type": "deliver",
"sinks": [{
"id": "sink-ok",
"source": "test",
"actions": [{
"id": "d1",
"type": "state_write",
"parameters": { "key": "notify", "value": "sent" }
}],
"timestamp": "2026-01-01T00:00:00Z",
"context": {}
}]
}
}],
"edges": []
})
}
fn cycle_limited_workflow() -> Value {
let noop_stage = |id: &str| {
json!({
"id": id,
"name": id,
"step": {
"type": "proposal",
"proposal": {
"id": format!("p-{id}"),
"source": "test",
"actions": [],
"timestamp": "2026-01-01T00:00:00Z",
"context": {}
}
}
})
};
json!({
"id": "wf-cycle",
"name": "cyclic",
"start": "a",
"stages": [noop_stage("a"), noop_stage("b")],
"edges": [
{ "from": "a", "to": "b" },
{ "from": "b", "to": "a" }
],
"max_iterations": 2
})
}
#[tokio::test]
async fn chain_preserves_results_on_mid_chain_runtime_error() {
let workflows = json!([deliver_workflow("wf-a"), cycle_limited_workflow()]);
let out = chain_workflows(&workflows.to_string(), None, runner())
.await
.expect("mid-chain runtime error must not become a call-level Err");
let chained: Value = serde_json::from_str(&out).unwrap();
assert_eq!(chained["status"], "failed");
assert_eq!(chained["failed_at_index"], 1);
let err = chained["error"].as_str().unwrap();
assert!(err.contains("wf-cycle"), "error names the workflow: {err}");
assert!(
err.to_lowercase().contains("cycle") || err.contains("iteration"),
"error carries the engine cause: {err}"
);
let results = chained["results"].as_array().unwrap();
assert_eq!(results.len(), 1, "only the workflows that ran are recorded");
assert_eq!(results[0]["status"], "completed");
let output = &results[0]["stages"][0]["output"];
assert_eq!(output["type"], "deliver");
assert_eq!(output["results"][0]["sink_id"], "sink-ok");
assert_eq!(output["results"][0]["ok"], true);
}
#[tokio::test]
async fn chain_pre_validates_every_workflow_before_running_any() {
let broken = json!({
"id": "wf-broken",
"name": "broken",
"start": "missing",
"stages": [],
"edges": []
});
let workflows = json!([deliver_workflow("wf-a"), broken]);
let err = chain_workflows(&workflows.to_string(), None, runner())
.await
.expect_err("structurally invalid workflow must reject the chain up front");
assert!(err.contains("index 1"), "err: {err}");
assert!(err.contains("static verification"), "err: {err}");
assert!(err.contains("wf-broken"), "err: {err}");
}
}