// agentctl/workflow/run.rs
use super::schema::{
    AutomationStep, ProviderStep, WorkflowDocument, WorkflowOnError, WorkflowSchemaError,
    WorkflowStep,
};
use super::steps::automation::{AutomationCommandProvenance, resolve_automation_invocation};
use crate::provider::registry::ProviderRegistry;
use agent_runtime_core::schema::{ExecuteRequest, ProviderError};
use clap::{Args, ValueEnum};
use serde::Serialize;
use std::fs;
use std::io::{self, Read, Write};
use std::path::{Path, PathBuf};
use std::process::{Command, Stdio};
use std::thread;
use std::time::{Duration, Instant};
16
// --- Process / step exit codes -------------------------------------------------

/// Exit code for a run in which every executed step succeeded and output rendered.
const EXIT_OK: i32 = 0;
/// Generic failure code: used for the whole run when a step failed or rendering
/// failed, and as the fallback exit code recorded for failed step attempts.
const EXIT_RUNTIME_ERROR: i32 = 1;
/// Exit code when the workflow file cannot be loaded (same value as `EX_USAGE`
/// in BSD `sysexits.h`); also recorded when provider selection fails.
const EXIT_USAGE: i32 = 64;
/// Exit code recorded for an automation step that was killed after its timeout
/// (the same value GNU `timeout` uses).
const TIMEOUT_EXIT_CODE: i32 = 124;

// --- Identifiers ------------------------------------------------------------------

/// Value of `schema_version` in every emitted run report.
const WORKFLOW_RUN_SCHEMA_VERSION: &str = "agentctl.workflow.run.v1";
/// Directory name, under the agents `out` directory, that holds per-step artifacts.
const WORKFLOW_ARTIFACT_NAMESPACE: &str = "agentctl-workflow";
23
// Command-line arguments for `agentctl workflow run`.
//
// NOTE: clap's derive turns the `///` comments on these fields into `--help` text,
// so they are user-visible; maintainer notes belong in `//` comments like this one.
#[derive(Debug, Args)]
pub struct RunArgs {
    /// Path to workflow manifest JSON file
    // Exposed as `--file`; loaded and validated by `load_workflow_file`.
    #[arg(long)]
    pub file: PathBuf,

    /// Render format
    // Exposed as `--format`; defaults to JSON.
    #[arg(long, value_enum, default_value_t = RunOutputFormat::Json)]
    pub format: RunOutputFormat,
}
34
// Output format for the run report (`--format`).
//
// `Text` is rendered by `emit_text`, `Json` (the default) by `emit_json`. Variant
// notes use `//` because clap's `ValueEnum` derive would surface `///` comments as
// possible-value help text.
#[derive(Debug, Clone, Copy, PartialEq, Eq, ValueEnum, Default)]
pub enum RunOutputFormat {
    // Line-oriented, human-readable summary plus one line per ledger entry.
    Text,
    // Pretty-printed JSON serialization of `WorkflowRunReport`.
    #[default]
    Json,
}
41
/// Complete result of one `workflow run`, as rendered by the JSON/text emitters.
///
/// Field order here is the key order of the JSON output.
#[derive(Debug, Clone, Serialize)]
pub struct WorkflowRunReport {
    /// Always `WORKFLOW_RUN_SCHEMA_VERSION` (set by `execute_workflow_document`).
    pub schema_version: &'static str,
    /// Always `"workflow-run"` (set by `execute_workflow_document`).
    pub command: &'static str,
    /// Optional workflow name from the manifest; omitted from JSON when absent.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub workflow_name: Option<String>,
    /// Error policy the run was executed with, copied from the manifest.
    pub on_error: WorkflowOnError,
    /// Aggregate counters and overall success flag.
    pub summary: WorkflowRunSummary,
    /// One entry per executed step, in execution order.
    pub ledger: Vec<StepLedgerEntry>,
}
52
/// Aggregate counters for a workflow run.
#[derive(Debug, Clone, Serialize)]
pub struct WorkflowRunSummary {
    /// Number of steps declared in the manifest.
    pub total_steps: usize,
    /// Number of steps actually run (the ledger length).
    pub executed_steps: usize,
    /// Executed steps whose final status is succeeded.
    pub succeeded_steps: usize,
    /// Executed steps whose final status is failed (`executed - succeeded`).
    pub failed_steps: usize,
    /// Steps never run because a fail-fast policy stopped the run (`total - executed`).
    pub skipped_steps: usize,
    /// Wall-clock time of the whole run in milliseconds (saturating at `u64::MAX`).
    pub elapsed_ms: u64,
    /// `true` when no executed step failed (also `true` for an empty workflow).
    pub success: bool,
}
63
/// Ledger record for one executed step; outcome fields describe its final attempt.
#[derive(Debug, Clone, Serialize)]
pub struct StepLedgerEntry {
    /// Step identifier from the manifest.
    pub step_id: String,
    /// `"provider"` or `"automation"`, depending on the step kind.
    pub step_type: &'static str,
    /// Provider id used for provider steps; omitted for automation steps.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub provider: Option<String>,
    /// Automation tool id for automation steps; omitted for provider steps.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub automation_tool: Option<String>,
    /// Provenance of the resolved automation command; omitted when absent
    /// (always absent for provider steps).
    #[serde(skip_serializing_if = "Option::is_none")]
    pub command: Option<AutomationCommandProvenance>,
    /// Paths of the `stdout.log`/`stderr.log` files persisted for the final attempt
    /// of an automation step; omitted when empty (provider steps, or when persisting
    /// failed). `default` only affects deserialization, which this type doesn't derive.
    #[serde(default, skip_serializing_if = "Vec::is_empty")]
    pub artifact_paths: Vec<String>,
    /// Number of attempts made, including the final one.
    pub attempts: u32,
    /// Status of the final attempt.
    pub status: StepStatus,
    /// Exit code of the final attempt.
    pub exit_code: i32,
    /// Captured stdout of the final attempt.
    pub stdout: String,
    /// Captured stderr of the final attempt, plus any appended runner diagnostics.
    pub stderr: String,
    /// Time spent on the step across all attempts, including retry backoff sleeps.
    pub elapsed_ms: u64,
}
83
/// Outcome of a step attempt (and, in the ledger, of the step's final attempt).
///
/// Serialized in kebab-case: `"succeeded"` / `"failed"`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize)]
#[serde(rename_all = "kebab-case")]
pub enum StepStatus {
    /// The attempt finished with exit code 0.
    Succeeded,
    /// The attempt did not succeed: non-zero exit, spawn/provider error, or timeout.
    Failed,
}
90
91impl StepStatus {
92    fn is_failed(self) -> bool {
93        matches!(self, Self::Failed)
94    }
95}
96
/// Reasons a workflow manifest could not be turned into a validated [`WorkflowDocument`].
#[derive(Debug)]
pub enum WorkflowLoadError {
    /// The manifest file could not be read from disk.
    Read {
        path: PathBuf,
        source: io::Error,
    },
    /// The file was read but is not valid JSON for a `WorkflowDocument`.
    Parse {
        path: PathBuf,
        source: serde_json::Error,
    },
    /// The document parsed but `WorkflowDocument::validate` rejected it.
    Schema(WorkflowSchemaError),
}
109
110impl std::fmt::Display for WorkflowLoadError {
111    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
112        match self {
113            Self::Read { path, source } => {
114                write!(
115                    f,
116                    "failed to read workflow file '{}': {}",
117                    path.display(),
118                    source
119                )
120            }
121            Self::Parse { path, source } => write!(
122                f,
123                "failed to parse workflow file '{}' as JSON: {}",
124                path.display(),
125                source
126            ),
127            Self::Schema(error) => write!(f, "invalid workflow schema: {error}"),
128        }
129    }
130}
131
// `source()` is left at its default (`None`). The `Display` impl already embeds the
// underlying io/serde/schema error text, so also returning it from `source()` would
// make chain-printing error reporters show the cause twice.
impl std::error::Error for WorkflowLoadError {}
133
134pub fn run(args: RunArgs) -> i32 {
135    let workflow = match load_workflow_file(&args.file) {
136        Ok(workflow) => workflow,
137        Err(error) => {
138            eprintln!("agentctl workflow run: {error}");
139            return EXIT_USAGE;
140        }
141    };
142
143    let report = execute_workflow_document(&workflow);
144    let render_exit = match args.format {
145        RunOutputFormat::Json => emit_json(&report),
146        RunOutputFormat::Text => emit_text(&report),
147    };
148    if render_exit != EXIT_OK {
149        return EXIT_RUNTIME_ERROR;
150    }
151
152    if report.summary.success {
153        EXIT_OK
154    } else {
155        EXIT_RUNTIME_ERROR
156    }
157}
158
159pub fn load_workflow_file(path: &Path) -> Result<WorkflowDocument, WorkflowLoadError> {
160    let raw = fs::read_to_string(path).map_err(|source| WorkflowLoadError::Read {
161        path: path.to_path_buf(),
162        source,
163    })?;
164    let workflow: WorkflowDocument =
165        serde_json::from_str(raw.as_str()).map_err(|source| WorkflowLoadError::Parse {
166            path: path.to_path_buf(),
167            source,
168        })?;
169    workflow.validate().map_err(WorkflowLoadError::Schema)?;
170    Ok(workflow)
171}
172
173pub fn execute_workflow_document(workflow: &WorkflowDocument) -> WorkflowRunReport {
174    let started = Instant::now();
175    let registry = ProviderRegistry::with_builtins();
176    let mut ledger = Vec::with_capacity(workflow.steps.len());
177
178    for step in &workflow.steps {
179        let entry = execute_step(step, &registry);
180        let failed = entry.status.is_failed();
181        ledger.push(entry);
182
183        if failed && workflow.on_error == WorkflowOnError::FailFast {
184            break;
185        }
186    }
187
188    let succeeded_steps = ledger
189        .iter()
190        .filter(|entry| entry.status == StepStatus::Succeeded)
191        .count();
192    let failed_steps = ledger.len().saturating_sub(succeeded_steps);
193    let total_steps = workflow.steps.len();
194    let executed_steps = ledger.len();
195    let skipped_steps = total_steps.saturating_sub(executed_steps);
196
197    WorkflowRunReport {
198        schema_version: WORKFLOW_RUN_SCHEMA_VERSION,
199        command: "workflow-run",
200        workflow_name: workflow.name.clone(),
201        on_error: workflow.on_error,
202        summary: WorkflowRunSummary {
203            total_steps,
204            executed_steps,
205            succeeded_steps,
206            failed_steps,
207            skipped_steps,
208            elapsed_ms: as_millis(started.elapsed()),
209            success: failed_steps == 0,
210        },
211        ledger,
212    }
213}
214
215fn execute_step(step: &WorkflowStep, registry: &ProviderRegistry) -> StepLedgerEntry {
216    let retry = step.retry();
217    let max_attempts = retry.normalized_max_attempts();
218    let started = Instant::now();
219
220    let mut attempts = 0;
221    let mut last = AttemptOutcome::failed(
222        EXIT_RUNTIME_ERROR,
223        String::new(),
224        "workflow step did not execute".to_string(),
225    );
226    for attempt in 1..=max_attempts {
227        attempts = attempt;
228        last = match step {
229            WorkflowStep::Provider(provider_step) => execute_provider_step(provider_step, registry),
230            WorkflowStep::Automation(automation_step) => {
231                execute_automation_step(step.id(), attempt, automation_step)
232            }
233        };
234        if last.status == StepStatus::Succeeded {
235            break;
236        }
237
238        if attempt < max_attempts && retry.backoff_ms > 0 {
239            thread::sleep(Duration::from_millis(retry.backoff_ms));
240        }
241    }
242
243    let (step_type, provider, automation_tool) = match step {
244        WorkflowStep::Provider(_) => ("provider", last.provider.clone(), None),
245        WorkflowStep::Automation(_) => ("automation", None, last.automation_tool.clone()),
246    };
247
248    StepLedgerEntry {
249        step_id: step.id().to_string(),
250        step_type,
251        provider,
252        automation_tool,
253        command: last.command,
254        artifact_paths: last.artifact_paths,
255        attempts,
256        status: last.status,
257        exit_code: last.exit_code,
258        stdout: last.stdout,
259        stderr: last.stderr,
260        elapsed_ms: as_millis(started.elapsed()),
261    }
262}
263
264fn execute_provider_step(step: &ProviderStep, registry: &ProviderRegistry) -> AttemptOutcome {
265    let selection = match registry.resolve_selection(step.provider.as_deref()) {
266        Ok(selection) => selection,
267        Err(error) => {
268            return AttemptOutcome {
269                status: StepStatus::Failed,
270                exit_code: EXIT_USAGE,
271                stdout: String::new(),
272                stderr: error.to_string(),
273                provider: step.provider.clone(),
274                automation_tool: None,
275                command: None,
276                artifact_paths: Vec::new(),
277            };
278        }
279    };
280
281    let provider_id = selection.provider_id;
282    let Some(adapter) = registry.get(provider_id.as_str()) else {
283        return AttemptOutcome {
284            status: StepStatus::Failed,
285            exit_code: EXIT_RUNTIME_ERROR,
286            stdout: String::new(),
287            stderr: format!("selected provider '{}' is not registered", provider_id),
288            provider: Some(provider_id),
289            automation_tool: None,
290            command: None,
291            artifact_paths: Vec::new(),
292        };
293    };
294
295    let request = ExecuteRequest {
296        task: step.task.clone(),
297        input: step.input.clone(),
298        timeout_ms: step.timeout_ms,
299    };
300    match adapter.execute(request) {
301        Ok(response) => AttemptOutcome {
302            status: if response.exit_code == 0 {
303                StepStatus::Succeeded
304            } else {
305                StepStatus::Failed
306            },
307            exit_code: response.exit_code,
308            stdout: response.stdout,
309            stderr: response.stderr,
310            provider: Some(provider_id),
311            automation_tool: None,
312            command: None,
313            artifact_paths: Vec::new(),
314        },
315        Err(error) => provider_error_outcome(provider_id, error.as_ref()),
316    }
317}
318
319fn provider_error_outcome(provider_id: String, error: &ProviderError) -> AttemptOutcome {
320    let mut stderr = error.message.clone();
321    if let Some(extra_stderr) = error
322        .details
323        .as_ref()
324        .and_then(|details| details.get("stderr"))
325        .and_then(|value| value.as_str())
326        .map(str::trim)
327        .filter(|value| !value.is_empty())
328    {
329        stderr.push('\n');
330        stderr.push_str(extra_stderr);
331    }
332
333    let stdout = error
334        .details
335        .as_ref()
336        .and_then(|details| details.get("stdout"))
337        .and_then(|value| value.as_str())
338        .map(ToOwned::to_owned)
339        .unwrap_or_default();
340
341    let exit_code = error
342        .details
343        .as_ref()
344        .and_then(|details| details.get("exit_code"))
345        .and_then(|value| value.as_i64())
346        .and_then(|value| i32::try_from(value).ok())
347        .unwrap_or(EXIT_RUNTIME_ERROR);
348
349    AttemptOutcome {
350        status: StepStatus::Failed,
351        exit_code,
352        stdout,
353        stderr,
354        provider: Some(provider_id),
355        automation_tool: None,
356        command: None,
357        artifact_paths: Vec::new(),
358    }
359}
360
361fn execute_automation_step(step_id: &str, attempt: u32, step: &AutomationStep) -> AttemptOutcome {
362    let invocation = resolve_automation_invocation(step);
363    let mut outcome = match run_command(
364        invocation.command.as_str(),
365        invocation.args.as_slice(),
366        invocation.env.as_slice(),
367        step.timeout_ms,
368    ) {
369        Ok(output) => AttemptOutcome {
370            status: if output.exit_code == 0 {
371                StepStatus::Succeeded
372            } else {
373                StepStatus::Failed
374            },
375            exit_code: output.exit_code,
376            stdout: output.stdout,
377            stderr: output.stderr,
378            provider: None,
379            automation_tool: Some(step.tool.as_id().to_string()),
380            command: Some(invocation.provenance),
381            artifact_paths: Vec::new(),
382        },
383        Err(error) => AttemptOutcome {
384            status: StepStatus::Failed,
385            exit_code: EXIT_RUNTIME_ERROR,
386            stdout: String::new(),
387            stderr: format!(
388                "failed to run automation tool '{}': {}",
389                step.tool.as_id(),
390                error
391            ),
392            provider: None,
393            automation_tool: Some(step.tool.as_id().to_string()),
394            command: Some(invocation.provenance),
395            artifact_paths: Vec::new(),
396        },
397    };
398
399    match persist_automation_artifacts(step_id, attempt, &outcome.stdout, &outcome.stderr) {
400        Ok(artifact_paths) => {
401            outcome.artifact_paths = artifact_paths;
402        }
403        Err(error) => {
404            if !outcome.stderr.is_empty() && !outcome.stderr.ends_with('\n') {
405                outcome.stderr.push('\n');
406            }
407            outcome
408                .stderr
409                .push_str(format!("failed to persist step artifacts: {error}").as_str());
410        }
411    }
412
413    outcome
414}
415
/// Result of a single attempt at running a workflow step.
///
/// `execute_step` keeps the last attempt's outcome and copies it into the ledger.
struct AttemptOutcome {
    status: StepStatus,
    exit_code: i32,
    stdout: String,
    stderr: String,
    // Set by provider attempts only.
    provider: Option<String>,
    // Set by automation attempts only.
    automation_tool: Option<String>,
    // Resolved command provenance; set by automation attempts only.
    command: Option<AutomationCommandProvenance>,
    // Artifact file paths written for this attempt (automation attempts only).
    artifact_paths: Vec<String>,
}
426
427impl AttemptOutcome {
428    fn failed(exit_code: i32, stdout: String, stderr: String) -> Self {
429        Self {
430            status: StepStatus::Failed,
431            exit_code,
432            stdout,
433            stderr,
434            provider: None,
435            automation_tool: None,
436            command: None,
437            artifact_paths: Vec::new(),
438        }
439    }
440}
441
/// Captured result of a finished (or timed-out) child process, built by `run_command`.
struct CommandOutput {
    // Process exit code, `TIMEOUT_EXIT_CODE` on timeout, or `EXIT_RUNTIME_ERROR`
    // when the process reported no exit code.
    exit_code: i32,
    // Stdout decoded lossily as UTF-8.
    stdout: String,
    // Stderr decoded lossily as UTF-8, plus a timeout note when applicable.
    stderr: String,
}
447
448fn run_command(
449    command: &str,
450    args: &[String],
451    env: &[(String, String)],
452    timeout_ms: Option<u64>,
453) -> io::Result<CommandOutput> {
454    let mut cmd = Command::new(command);
455    cmd.args(args);
456    for (key, value) in env {
457        cmd.env(key, value);
458    }
459
460    let mut child = cmd
461        .stdin(Stdio::null())
462        .stdout(Stdio::piped())
463        .stderr(Stdio::piped())
464        .spawn()?;
465
466    let stdout_reader = child.stdout.take().map(spawn_pipe_reader);
467    let stderr_reader = child.stderr.take().map(spawn_pipe_reader);
468
469    let started = Instant::now();
470    let timeout = timeout_ms.map(Duration::from_millis);
471    let mut timed_out = false;
472
473    let status = loop {
474        if let Some(status) = child.try_wait()? {
475            break status;
476        }
477
478        if let Some(timeout) = timeout
479            && started.elapsed() >= timeout
480        {
481            timed_out = true;
482            let _ = child.kill();
483            break child.wait()?;
484        }
485
486        thread::sleep(Duration::from_millis(5));
487    };
488
489    let stdout = pipe_reader_output(stdout_reader);
490    let stderr = pipe_reader_output(stderr_reader);
491    let mut stderr_text = String::from_utf8_lossy(stderr.as_slice()).to_string();
492
493    let exit_code = if timed_out {
494        if !stderr_text.is_empty() && !stderr_text.ends_with('\n') {
495            stderr_text.push('\n');
496        }
497        stderr_text.push_str("step timed out");
498        if let Some(timeout_ms) = timeout_ms {
499            stderr_text.push_str(format!(" after {}ms", timeout_ms).as_str());
500        }
501        TIMEOUT_EXIT_CODE
502    } else {
503        status.code().unwrap_or(EXIT_RUNTIME_ERROR)
504    };
505
506    Ok(CommandOutput {
507        exit_code,
508        stdout: String::from_utf8_lossy(stdout.as_slice()).to_string(),
509        stderr: stderr_text,
510    })
511}
512
513fn persist_automation_artifacts(
514    step_id: &str,
515    attempt: u32,
516    stdout: &str,
517    stderr: &str,
518) -> io::Result<Vec<String>> {
519    let artifact_dir = workflow_artifact_dir(step_id, attempt);
520    fs::create_dir_all(&artifact_dir)?;
521
522    let stdout_path = artifact_dir.join("stdout.log");
523    let stderr_path = artifact_dir.join("stderr.log");
524    fs::write(&stdout_path, stdout.as_bytes())?;
525    fs::write(&stderr_path, stderr.as_bytes())?;
526
527    Ok(vec![
528        path_to_string(stdout_path.as_path()),
529        path_to_string(stderr_path.as_path()),
530    ])
531}
532
533fn workflow_artifact_dir(step_id: &str, attempt: u32) -> PathBuf {
534    agents_out_dir()
535        .join(WORKFLOW_ARTIFACT_NAMESPACE)
536        .join(sanitize_component(step_id))
537        .join(format!("attempt-{attempt}"))
538}
539
/// Makes `value` safe to use as a single path component.
///
/// Every character other than ASCII alphanumerics, `-` and `_` becomes `-`, and
/// leading/trailing `-` are trimmed. Inputs that end up empty (including `""`, `"."`
/// and `".."`) map to `"step"`, so the result can never climb out of its parent directory.
fn sanitize_component(value: &str) -> String {
    let replaced: String = value
        .chars()
        .map(|ch| {
            if ch.is_ascii_alphanumeric() || matches!(ch, '-' | '_') {
                ch
            } else {
                '-'
            }
        })
        .collect();

    match replaced.trim_matches('-') {
        "" => String::from("step"),
        kept => kept.to_owned(),
    }
}
557
/// Converts `path` to an owned `String`, replacing non-UTF-8 sequences with U+FFFD.
fn path_to_string(path: &Path) -> String {
    String::from(path.to_string_lossy())
}
561
/// Returns the root directory under which workflow artifacts are written.
///
/// Resolution order:
/// 1. `$AGENT_HOME/out` when `AGENT_HOME` is set to a non-empty value;
/// 2. `$HOME/.agents/out` when `HOME` is set to a non-empty value;
/// 3. the relative path `.agents/out` otherwise.
///
/// Both variables are read with `var_os`, so non-UTF-8 paths are honored instead of
/// being silently skipped, and an empty value counts as unset. Previously an empty
/// `AGENT_HOME` produced a bare relative `out` directory.
fn agents_out_dir() -> PathBuf {
    let non_empty = |name: &str| std::env::var_os(name).filter(|value| !value.is_empty());

    if let Some(agent_home) = non_empty("AGENT_HOME") {
        return PathBuf::from(agent_home).join("out");
    }
    match non_empty("HOME") {
        Some(home) => PathBuf::from(home).join(".agents").join("out"),
        None => PathBuf::from(".agents").join("out"),
    }
}
571
/// Reads `reader` to EOF on a new thread and returns a handle yielding the bytes.
///
/// Read errors are deliberately ignored: the handle yields whatever was read before
/// the error.
fn spawn_pipe_reader<R>(reader: R) -> thread::JoinHandle<Vec<u8>>
where
    R: Read + Send + 'static,
{
    thread::spawn(move || {
        let mut source = reader;
        let mut captured = Vec::new();
        // Best effort: a partial capture is more useful than none.
        source.read_to_end(&mut captured).ok();
        captured
    })
}
582
/// Collects the bytes from a pipe-reader thread started by `spawn_pipe_reader`.
///
/// A missing pipe (`None`) or a reader thread that panicked both yield an empty buffer.
fn pipe_reader_output(handle: Option<thread::JoinHandle<Vec<u8>>>) -> Vec<u8> {
    handle
        .and_then(|reader| reader.join().ok())
        .unwrap_or_default()
}
589
590fn emit_json<T: Serialize>(value: &T) -> i32 {
591    match serde_json::to_string_pretty(value) {
592        Ok(encoded) => {
593            println!("{encoded}");
594            EXIT_OK
595        }
596        Err(error) => {
597            eprintln!("agentctl workflow run: failed to render json output: {error}");
598            EXIT_RUNTIME_ERROR
599        }
600    }
601}
602
603fn emit_text(report: &WorkflowRunReport) -> i32 {
604    println!("schema_version: {}", report.schema_version);
605    println!("command: {}", report.command);
606    if let Some(workflow_name) = report.workflow_name.as_deref() {
607        println!("workflow_name: {workflow_name}");
608    }
609    println!(
610        "on_error: {}",
611        match report.on_error {
612            WorkflowOnError::FailFast => "fail-fast",
613            WorkflowOnError::ContinueOnError => "continue-on-error",
614        }
615    );
616    println!(
617        "summary: total={} executed={} succeeded={} failed={} skipped={} elapsed_ms={} success={}",
618        report.summary.total_steps,
619        report.summary.executed_steps,
620        report.summary.succeeded_steps,
621        report.summary.failed_steps,
622        report.summary.skipped_steps,
623        report.summary.elapsed_ms,
624        report.summary.success
625    );
626    println!("ledger:");
627    for step in &report.ledger {
628        println!(
629            "- {} [{}] status={} attempts={} exit_code={} elapsed_ms={}",
630            step.step_id,
631            step.step_type,
632            match step.status {
633                StepStatus::Succeeded => "succeeded",
634                StepStatus::Failed => "failed",
635            },
636            step.attempts,
637            step.exit_code,
638            step.elapsed_ms
639        );
640    }
641    EXIT_OK
642}
643
/// Whole milliseconds in `duration`, saturating at `u64::MAX`.
fn as_millis(duration: Duration) -> u64 {
    // Clamp first so the narrowing cast below is lossless.
    duration.as_millis().min(u128::from(u64::MAX)) as u64
}