1use super::schema::{
2 AutomationStep, ProviderStep, WorkflowDocument, WorkflowOnError, WorkflowSchemaError,
3 WorkflowStep,
4};
5use super::steps::automation::{AutomationCommandProvenance, resolve_automation_invocation};
6use crate::provider::registry::ProviderRegistry;
7use agent_runtime_core::schema::{ExecuteRequest, ProviderError};
8use clap::{Args, ValueEnum};
9use serde::Serialize;
10use std::fs;
11use std::io::{self, Read};
12use std::path::{Path, PathBuf};
13use std::process::{Command, Stdio};
14use std::thread;
15use std::time::{Duration, Instant};
16
/// Exit code returned by `run` when the report rendered and no executed step failed.
const EXIT_OK: i32 = 0;
/// Exit code for a failed step, a failed render, or a command that could not be run.
const EXIT_RUNTIME_ERROR: i32 = 1;
/// Exit code used when the workflow file cannot be loaded and when provider selection fails.
const EXIT_USAGE: i32 = 64;
/// Exit code recorded for an automation command that was killed after exceeding its timeout.
const TIMEOUT_EXIT_CODE: i32 = 124;
/// Value written to the `schema_version` field of every run report.
const WORKFLOW_RUN_SCHEMA_VERSION: &str = "agentctl.workflow.run.v1";
/// Directory name, below the agents output directory, that holds per-step artifacts.
const WORKFLOW_ARTIFACT_NAMESPACE: &str = "agentctl-workflow";
23
24#[derive(Debug, Args)]
25pub struct RunArgs {
26 #[arg(long)]
28 pub file: PathBuf,
29
30 #[arg(long, value_enum, default_value_t = RunOutputFormat::Json)]
32 pub format: RunOutputFormat,
33}
34
35#[derive(Debug, Clone, Copy, PartialEq, Eq, ValueEnum, Default)]
36pub enum RunOutputFormat {
37 Text,
38 #[default]
39 Json,
40}
41
42#[derive(Debug, Clone, Serialize)]
43pub struct WorkflowRunReport {
44 pub schema_version: &'static str,
45 pub command: &'static str,
46 #[serde(skip_serializing_if = "Option::is_none")]
47 pub workflow_name: Option<String>,
48 pub on_error: WorkflowOnError,
49 pub summary: WorkflowRunSummary,
50 pub ledger: Vec<StepLedgerEntry>,
51}
52
53#[derive(Debug, Clone, Serialize)]
54pub struct WorkflowRunSummary {
55 pub total_steps: usize,
56 pub executed_steps: usize,
57 pub succeeded_steps: usize,
58 pub failed_steps: usize,
59 pub skipped_steps: usize,
60 pub elapsed_ms: u64,
61 pub success: bool,
62}
63
64#[derive(Debug, Clone, Serialize)]
65pub struct StepLedgerEntry {
66 pub step_id: String,
67 pub step_type: &'static str,
68 #[serde(skip_serializing_if = "Option::is_none")]
69 pub provider: Option<String>,
70 #[serde(skip_serializing_if = "Option::is_none")]
71 pub automation_tool: Option<String>,
72 #[serde(skip_serializing_if = "Option::is_none")]
73 pub command: Option<AutomationCommandProvenance>,
74 #[serde(default, skip_serializing_if = "Vec::is_empty")]
75 pub artifact_paths: Vec<String>,
76 pub attempts: u32,
77 pub status: StepStatus,
78 pub exit_code: i32,
79 pub stdout: String,
80 pub stderr: String,
81 pub elapsed_ms: u64,
82}
83
84#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize)]
85#[serde(rename_all = "kebab-case")]
86pub enum StepStatus {
87 Succeeded,
88 Failed,
89}
90
91impl StepStatus {
92 fn is_failed(self) -> bool {
93 matches!(self, Self::Failed)
94 }
95}
96
97#[derive(Debug)]
98pub enum WorkflowLoadError {
99 Read {
100 path: PathBuf,
101 source: io::Error,
102 },
103 Parse {
104 path: PathBuf,
105 source: serde_json::Error,
106 },
107 Schema(WorkflowSchemaError),
108}
109
110impl std::fmt::Display for WorkflowLoadError {
111 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
112 match self {
113 Self::Read { path, source } => {
114 write!(
115 f,
116 "failed to read workflow file '{}': {}",
117 path.display(),
118 source
119 )
120 }
121 Self::Parse { path, source } => write!(
122 f,
123 "failed to parse workflow file '{}' as JSON: {}",
124 path.display(),
125 source
126 ),
127 Self::Schema(error) => write!(f, "invalid workflow schema: {error}"),
128 }
129 }
130}
131
132impl std::error::Error for WorkflowLoadError {}
133
134pub fn run(args: RunArgs) -> i32 {
135 let workflow = match load_workflow_file(&args.file) {
136 Ok(workflow) => workflow,
137 Err(error) => {
138 eprintln!("agentctl workflow run: {error}");
139 return EXIT_USAGE;
140 }
141 };
142
143 let report = execute_workflow_document(&workflow);
144 let render_exit = match args.format {
145 RunOutputFormat::Json => emit_json(&report),
146 RunOutputFormat::Text => emit_text(&report),
147 };
148 if render_exit != EXIT_OK {
149 return EXIT_RUNTIME_ERROR;
150 }
151
152 if report.summary.success {
153 EXIT_OK
154 } else {
155 EXIT_RUNTIME_ERROR
156 }
157}
158
159pub fn load_workflow_file(path: &Path) -> Result<WorkflowDocument, WorkflowLoadError> {
160 let raw = fs::read_to_string(path).map_err(|source| WorkflowLoadError::Read {
161 path: path.to_path_buf(),
162 source,
163 })?;
164 let workflow: WorkflowDocument =
165 serde_json::from_str(raw.as_str()).map_err(|source| WorkflowLoadError::Parse {
166 path: path.to_path_buf(),
167 source,
168 })?;
169 workflow.validate().map_err(WorkflowLoadError::Schema)?;
170 Ok(workflow)
171}
172
173pub fn execute_workflow_document(workflow: &WorkflowDocument) -> WorkflowRunReport {
174 let started = Instant::now();
175 let registry = ProviderRegistry::with_builtins();
176 let mut ledger = Vec::with_capacity(workflow.steps.len());
177
178 for step in &workflow.steps {
179 let entry = execute_step(step, ®istry);
180 let failed = entry.status.is_failed();
181 ledger.push(entry);
182
183 if failed && workflow.on_error == WorkflowOnError::FailFast {
184 break;
185 }
186 }
187
188 let succeeded_steps = ledger
189 .iter()
190 .filter(|entry| entry.status == StepStatus::Succeeded)
191 .count();
192 let failed_steps = ledger.len().saturating_sub(succeeded_steps);
193 let total_steps = workflow.steps.len();
194 let executed_steps = ledger.len();
195 let skipped_steps = total_steps.saturating_sub(executed_steps);
196
197 WorkflowRunReport {
198 schema_version: WORKFLOW_RUN_SCHEMA_VERSION,
199 command: "workflow-run",
200 workflow_name: workflow.name.clone(),
201 on_error: workflow.on_error,
202 summary: WorkflowRunSummary {
203 total_steps,
204 executed_steps,
205 succeeded_steps,
206 failed_steps,
207 skipped_steps,
208 elapsed_ms: as_millis(started.elapsed()),
209 success: failed_steps == 0,
210 },
211 ledger,
212 }
213}
214
215fn execute_step(step: &WorkflowStep, registry: &ProviderRegistry) -> StepLedgerEntry {
216 let retry = step.retry();
217 let max_attempts = retry.normalized_max_attempts();
218 let started = Instant::now();
219
220 let mut attempts = 0;
221 let mut last = AttemptOutcome::failed(
222 EXIT_RUNTIME_ERROR,
223 String::new(),
224 "workflow step did not execute".to_string(),
225 );
226 for attempt in 1..=max_attempts {
227 attempts = attempt;
228 last = match step {
229 WorkflowStep::Provider(provider_step) => execute_provider_step(provider_step, registry),
230 WorkflowStep::Automation(automation_step) => {
231 execute_automation_step(step.id(), attempt, automation_step)
232 }
233 };
234 if last.status == StepStatus::Succeeded {
235 break;
236 }
237
238 if attempt < max_attempts && retry.backoff_ms > 0 {
239 thread::sleep(Duration::from_millis(retry.backoff_ms));
240 }
241 }
242
243 let (step_type, provider, automation_tool) = match step {
244 WorkflowStep::Provider(_) => ("provider", last.provider.clone(), None),
245 WorkflowStep::Automation(_) => ("automation", None, last.automation_tool.clone()),
246 };
247
248 StepLedgerEntry {
249 step_id: step.id().to_string(),
250 step_type,
251 provider,
252 automation_tool,
253 command: last.command,
254 artifact_paths: last.artifact_paths,
255 attempts,
256 status: last.status,
257 exit_code: last.exit_code,
258 stdout: last.stdout,
259 stderr: last.stderr,
260 elapsed_ms: as_millis(started.elapsed()),
261 }
262}
263
264fn execute_provider_step(step: &ProviderStep, registry: &ProviderRegistry) -> AttemptOutcome {
265 let selection = match registry.resolve_selection(step.provider.as_deref()) {
266 Ok(selection) => selection,
267 Err(error) => {
268 return AttemptOutcome {
269 status: StepStatus::Failed,
270 exit_code: EXIT_USAGE,
271 stdout: String::new(),
272 stderr: error.to_string(),
273 provider: step.provider.clone(),
274 automation_tool: None,
275 command: None,
276 artifact_paths: Vec::new(),
277 };
278 }
279 };
280
281 let provider_id = selection.provider_id;
282 let Some(adapter) = registry.get(provider_id.as_str()) else {
283 return AttemptOutcome {
284 status: StepStatus::Failed,
285 exit_code: EXIT_RUNTIME_ERROR,
286 stdout: String::new(),
287 stderr: format!("selected provider '{}' is not registered", provider_id),
288 provider: Some(provider_id),
289 automation_tool: None,
290 command: None,
291 artifact_paths: Vec::new(),
292 };
293 };
294
295 let request = ExecuteRequest {
296 task: step.task.clone(),
297 input: step.input.clone(),
298 timeout_ms: step.timeout_ms,
299 };
300 match adapter.execute(request) {
301 Ok(response) => AttemptOutcome {
302 status: if response.exit_code == 0 {
303 StepStatus::Succeeded
304 } else {
305 StepStatus::Failed
306 },
307 exit_code: response.exit_code,
308 stdout: response.stdout,
309 stderr: response.stderr,
310 provider: Some(provider_id),
311 automation_tool: None,
312 command: None,
313 artifact_paths: Vec::new(),
314 },
315 Err(error) => provider_error_outcome(provider_id, error.as_ref()),
316 }
317}
318
319fn provider_error_outcome(provider_id: String, error: &ProviderError) -> AttemptOutcome {
320 let mut stderr = error.message.clone();
321 if let Some(extra_stderr) = error
322 .details
323 .as_ref()
324 .and_then(|details| details.get("stderr"))
325 .and_then(|value| value.as_str())
326 .map(str::trim)
327 .filter(|value| !value.is_empty())
328 {
329 stderr.push('\n');
330 stderr.push_str(extra_stderr);
331 }
332
333 let stdout = error
334 .details
335 .as_ref()
336 .and_then(|details| details.get("stdout"))
337 .and_then(|value| value.as_str())
338 .map(ToOwned::to_owned)
339 .unwrap_or_default();
340
341 let exit_code = error
342 .details
343 .as_ref()
344 .and_then(|details| details.get("exit_code"))
345 .and_then(|value| value.as_i64())
346 .and_then(|value| i32::try_from(value).ok())
347 .unwrap_or(EXIT_RUNTIME_ERROR);
348
349 AttemptOutcome {
350 status: StepStatus::Failed,
351 exit_code,
352 stdout,
353 stderr,
354 provider: Some(provider_id),
355 automation_tool: None,
356 command: None,
357 artifact_paths: Vec::new(),
358 }
359}
360
361fn execute_automation_step(step_id: &str, attempt: u32, step: &AutomationStep) -> AttemptOutcome {
362 let invocation = resolve_automation_invocation(step);
363 let mut outcome = match run_command(
364 invocation.command.as_str(),
365 invocation.args.as_slice(),
366 invocation.env.as_slice(),
367 step.timeout_ms,
368 ) {
369 Ok(output) => AttemptOutcome {
370 status: if output.exit_code == 0 {
371 StepStatus::Succeeded
372 } else {
373 StepStatus::Failed
374 },
375 exit_code: output.exit_code,
376 stdout: output.stdout,
377 stderr: output.stderr,
378 provider: None,
379 automation_tool: Some(step.tool.as_id().to_string()),
380 command: Some(invocation.provenance),
381 artifact_paths: Vec::new(),
382 },
383 Err(error) => AttemptOutcome {
384 status: StepStatus::Failed,
385 exit_code: EXIT_RUNTIME_ERROR,
386 stdout: String::new(),
387 stderr: format!(
388 "failed to run automation tool '{}': {}",
389 step.tool.as_id(),
390 error
391 ),
392 provider: None,
393 automation_tool: Some(step.tool.as_id().to_string()),
394 command: Some(invocation.provenance),
395 artifact_paths: Vec::new(),
396 },
397 };
398
399 match persist_automation_artifacts(step_id, attempt, &outcome.stdout, &outcome.stderr) {
400 Ok(artifact_paths) => {
401 outcome.artifact_paths = artifact_paths;
402 }
403 Err(error) => {
404 if !outcome.stderr.is_empty() && !outcome.stderr.ends_with('\n') {
405 outcome.stderr.push('\n');
406 }
407 outcome
408 .stderr
409 .push_str(format!("failed to persist step artifacts: {error}").as_str());
410 }
411 }
412
413 outcome
414}
415
416struct AttemptOutcome {
417 status: StepStatus,
418 exit_code: i32,
419 stdout: String,
420 stderr: String,
421 provider: Option<String>,
422 automation_tool: Option<String>,
423 command: Option<AutomationCommandProvenance>,
424 artifact_paths: Vec<String>,
425}
426
427impl AttemptOutcome {
428 fn failed(exit_code: i32, stdout: String, stderr: String) -> Self {
429 Self {
430 status: StepStatus::Failed,
431 exit_code,
432 stdout,
433 stderr,
434 provider: None,
435 automation_tool: None,
436 command: None,
437 artifact_paths: Vec::new(),
438 }
439 }
440}
441
/// Captured result of running an external command to completion.
struct CommandOutput {
    /// Exit code of the process, or a substitute code chosen by `run_command`.
    exit_code: i32,
    /// Standard output, decoded lossily as UTF-8.
    stdout: String,
    /// Standard error, decoded lossily as UTF-8.
    stderr: String,
}
447
448fn run_command(
449 command: &str,
450 args: &[String],
451 env: &[(String, String)],
452 timeout_ms: Option<u64>,
453) -> io::Result<CommandOutput> {
454 let mut cmd = Command::new(command);
455 cmd.args(args);
456 for (key, value) in env {
457 cmd.env(key, value);
458 }
459
460 let mut child = cmd
461 .stdin(Stdio::null())
462 .stdout(Stdio::piped())
463 .stderr(Stdio::piped())
464 .spawn()?;
465
466 let stdout_reader = child.stdout.take().map(spawn_pipe_reader);
467 let stderr_reader = child.stderr.take().map(spawn_pipe_reader);
468
469 let started = Instant::now();
470 let timeout = timeout_ms.map(Duration::from_millis);
471 let mut timed_out = false;
472
473 let status = loop {
474 if let Some(status) = child.try_wait()? {
475 break status;
476 }
477
478 if let Some(timeout) = timeout
479 && started.elapsed() >= timeout
480 {
481 timed_out = true;
482 let _ = child.kill();
483 break child.wait()?;
484 }
485
486 thread::sleep(Duration::from_millis(5));
487 };
488
489 let stdout = pipe_reader_output(stdout_reader);
490 let stderr = pipe_reader_output(stderr_reader);
491 let mut stderr_text = String::from_utf8_lossy(stderr.as_slice()).to_string();
492
493 let exit_code = if timed_out {
494 if !stderr_text.is_empty() && !stderr_text.ends_with('\n') {
495 stderr_text.push('\n');
496 }
497 stderr_text.push_str("step timed out");
498 if let Some(timeout_ms) = timeout_ms {
499 stderr_text.push_str(format!(" after {}ms", timeout_ms).as_str());
500 }
501 TIMEOUT_EXIT_CODE
502 } else {
503 status.code().unwrap_or(EXIT_RUNTIME_ERROR)
504 };
505
506 Ok(CommandOutput {
507 exit_code,
508 stdout: String::from_utf8_lossy(stdout.as_slice()).to_string(),
509 stderr: stderr_text,
510 })
511}
512
513fn persist_automation_artifacts(
514 step_id: &str,
515 attempt: u32,
516 stdout: &str,
517 stderr: &str,
518) -> io::Result<Vec<String>> {
519 let artifact_dir = workflow_artifact_dir(step_id, attempt);
520 fs::create_dir_all(&artifact_dir)?;
521
522 let stdout_path = artifact_dir.join("stdout.log");
523 let stderr_path = artifact_dir.join("stderr.log");
524 fs::write(&stdout_path, stdout.as_bytes())?;
525 fs::write(&stderr_path, stderr.as_bytes())?;
526
527 Ok(vec![
528 path_to_string(stdout_path.as_path()),
529 path_to_string(stderr_path.as_path()),
530 ])
531}
532
533fn workflow_artifact_dir(step_id: &str, attempt: u32) -> PathBuf {
534 agents_out_dir()
535 .join(WORKFLOW_ARTIFACT_NAMESPACE)
536 .join(sanitize_component(step_id))
537 .join(format!("attempt-{attempt}"))
538}
539
/// Turns an arbitrary string into a single safe path component.
///
/// Every character other than an ASCII letter, ASCII digit, `-`, or `_` is
/// replaced with `-`; leading and trailing `-` are then removed. If nothing
/// remains, the literal `"step"` is returned.
fn sanitize_component(value: &str) -> String {
    let replaced: String = value
        .chars()
        .map(|ch| {
            if ch.is_ascii_alphanumeric() || matches!(ch, '-' | '_') {
                ch
            } else {
                '-'
            }
        })
        .collect();

    match replaced.trim_matches('-') {
        "" => "step".to_string(),
        kept => kept.to_string(),
    }
}
557
/// Renders `path` as an owned string, replacing invalid Unicode lossily.
fn path_to_string(path: &Path) -> String {
    path.to_string_lossy().into_owned()
}
561
/// Resolves the base output directory for agent artifacts.
///
/// Resolution order:
/// 1. `$AGENT_HOME/out` when `AGENT_HOME` is set and non-empty;
/// 2. `$HOME/.agents/out` when `HOME` is set and non-empty;
/// 3. the relative path `.agents/out` otherwise.
///
/// Both variables are read with `var_os`, so a value that is not valid Unicode
/// is still honoured. An empty value is treated as unset; otherwise an empty
/// `AGENT_HOME` would resolve to the relative path `out`.
fn agents_out_dir() -> PathBuf {
    let non_empty = |name: &str| std::env::var_os(name).filter(|value| !value.is_empty());

    if let Some(agent_home) = non_empty("AGENT_HOME") {
        return PathBuf::from(agent_home).join("out");
    }
    if let Some(home) = non_empty("HOME") {
        return PathBuf::from(home).join(".agents").join("out");
    }
    PathBuf::from(".agents").join("out")
}
571
/// Spawns a thread that reads `reader` to the end and yields the collected bytes.
///
/// Reading is best effort: if a read error occurs, the bytes gathered up to
/// that point are still returned.
fn spawn_pipe_reader<R>(mut reader: R) -> thread::JoinHandle<Vec<u8>>
where
    R: Read + Send + 'static,
{
    thread::spawn(move || {
        let mut collected = Vec::new();
        reader.read_to_end(&mut collected).ok();
        collected
    })
}
582
/// Joins a pipe-reader thread and returns the bytes it collected.
///
/// Yields an empty buffer when there is no handle or when the reader thread
/// panicked.
fn pipe_reader_output(handle: Option<thread::JoinHandle<Vec<u8>>>) -> Vec<u8> {
    handle
        .and_then(|reader| reader.join().ok())
        .unwrap_or_default()
}
589
590fn emit_json<T: Serialize>(value: &T) -> i32 {
591 match serde_json::to_string_pretty(value) {
592 Ok(encoded) => {
593 println!("{encoded}");
594 EXIT_OK
595 }
596 Err(error) => {
597 eprintln!("agentctl workflow run: failed to render json output: {error}");
598 EXIT_RUNTIME_ERROR
599 }
600 }
601}
602
603fn emit_text(report: &WorkflowRunReport) -> i32 {
604 println!("schema_version: {}", report.schema_version);
605 println!("command: {}", report.command);
606 if let Some(workflow_name) = report.workflow_name.as_deref() {
607 println!("workflow_name: {workflow_name}");
608 }
609 println!(
610 "on_error: {}",
611 match report.on_error {
612 WorkflowOnError::FailFast => "fail-fast",
613 WorkflowOnError::ContinueOnError => "continue-on-error",
614 }
615 );
616 println!(
617 "summary: total={} executed={} succeeded={} failed={} skipped={} elapsed_ms={} success={}",
618 report.summary.total_steps,
619 report.summary.executed_steps,
620 report.summary.succeeded_steps,
621 report.summary.failed_steps,
622 report.summary.skipped_steps,
623 report.summary.elapsed_ms,
624 report.summary.success
625 );
626 println!("ledger:");
627 for step in &report.ledger {
628 println!(
629 "- {} [{}] status={} attempts={} exit_code={} elapsed_ms={}",
630 step.step_id,
631 step.step_type,
632 match step.status {
633 StepStatus::Succeeded => "succeeded",
634 StepStatus::Failed => "failed",
635 },
636 step.attempts,
637 step.exit_code,
638 step.elapsed_ms
639 );
640 }
641 EXIT_OK
642}
643
/// Converts `duration` to whole milliseconds, saturating at `u64::MAX`.
fn as_millis(duration: Duration) -> u64 {
    match u64::try_from(duration.as_millis()) {
        Ok(millis) => millis,
        Err(_) => u64::MAX,
    }
}