1use crate::audit::AuditStore;
5use crate::constitution::Constitution;
6use crate::model::router::ModelRouter;
7use crate::types::{
8 Action, ActionDecision, ActionType, ExecutionResult, FailureAction, Step, StepResult, StepType,
9 Workflow,
10};
11use crate::workflow::autofix::AutoFixEngine;
12use crate::workflow::checkpoint::{save_checkpoint, Checkpoint};
13use crate::workflow::parser::resolve_variables;
14use anyhow::{Context, Result};
15use chrono::Utc;
16use std::collections::HashMap;
17use std::sync::Arc;
18use std::time::Instant;
19use tokio::process::Command;
20use uuid::Uuid;
21
22pub struct WorkflowRunner {
24 pub constitution: Arc<Constitution>,
25 pub audit: Arc<AuditStore>,
26 pub model_router: Option<Arc<ModelRouter>>,
27}
28
29impl WorkflowRunner {
30 pub fn new(constitution: Arc<Constitution>, audit: Arc<AuditStore>) -> Self {
31 Self {
32 constitution,
33 audit,
34 model_router: None,
35 }
36 }
37
38 pub fn with_model_router(mut self, router: Arc<ModelRouter>) -> Self {
40 self.model_router = Some(router);
41 self
42 }
43
44 pub async fn execute(
46 &self,
47 workflow: &Workflow,
48 user_vars: HashMap<String, String>,
49 shadow: bool,
50 ) -> Result<ExecutionResult> {
51 let execution_id = Uuid::new_v4();
52 let started_at = Utc::now();
53 let timer = Instant::now();
54
55 let mut variables = workflow.variables.clone();
57 variables.extend(user_vars);
58
59 let mut step_results = Vec::new();
60 let mut total_cost = 0.0;
61 let mut success = true;
62 let mut error_msg: Option<String> = None;
63
64 tracing::info!(
65 "Starting workflow '{}' (execution={}, shadow={})",
66 workflow.name,
67 execution_id,
68 shadow
69 );
70
71 for (idx, step) in workflow.steps.iter().enumerate() {
72 let checkpoint = Checkpoint {
74 execution_id,
75 step_index: idx,
76 variables: variables.clone(),
77 completed_steps: step_results.clone(),
78 created_at: Utc::now(),
79 };
80 if let Err(e) = save_checkpoint(&checkpoint).await {
81 tracing::warn!("Failed to save checkpoint: {}", e);
82 }
83
84 let resolved_action = resolve_variables(&step.action, &variables);
86
87 let action = step_to_action(step, &resolved_action);
89 let decision = self.constitution.check_action(&action);
90
91 match decision {
92 ActionDecision::Blocked { reason } => {
93 tracing::warn!("Step '{}' blocked: {}", step.name, reason);
94 self.audit.log_action(&action, "blocked", &reason);
95 step_results.push(StepResult {
96 step_name: step.name.clone(),
97 success: false,
98 output: String::new(),
99 duration_ms: 0,
100 cost: 0.0,
101 model_used: None,
102 error: Some(format!("Blocked by constitution: {}", reason)),
103 });
104 success = false;
105 error_msg = Some(format!("Step '{}' blocked: {}", step.name, reason));
106 break;
107 }
108 ActionDecision::NeedsApproval { prompt } => {
109 tracing::info!("Step '{}' needs approval: {}", step.name, prompt);
110 step_results.push(StepResult {
112 step_name: step.name.clone(),
113 success: false,
114 output: String::new(),
115 duration_ms: 0,
116 cost: 0.0,
117 model_used: None,
118 error: Some(format!("Needs approval: {}", prompt)),
119 });
120 success = false;
121 error_msg = Some(format!("Step '{}' needs approval", step.name));
122 break;
123 }
124 ActionDecision::Allowed => {}
125 }
126
127 if step.breakpoint {
129 let msg = step
130 .breakpoint_message
131 .as_deref()
132 .unwrap_or("Breakpoint reached");
133 tracing::info!("⏸ Breakpoint at '{}': {}", step.name, msg);
134 if shadow {
135 tracing::info!(" [shadow] Would pause here for confirmation");
136 }
137 }
140
141 let step_result = if shadow {
143 execute_step_shadow(step, &resolved_action)
144 } else {
145 execute_step(step, &resolved_action, self.model_router.as_ref(), &step_results.iter().map(|r: &StepResult| (r.step_name.clone(), r.output.clone())).collect::<Vec<_>>()).await
146 };
147
148 match step_result {
149 Ok(mut result) => {
150 total_cost += result.cost;
151 self.audit
152 .log_action(&action, "executed", &result.output);
153
154 if !result.success {
155 match &step.on_failure {
156 FailureAction::Abort => {
157 error_msg = Some(format!(
158 "Step '{}' failed: {}",
159 step.name,
160 result.error.as_deref().unwrap_or("unknown")
161 ));
162 success = false;
163 step_results.push(result);
164 break;
165 }
166 FailureAction::Skip => {
167 tracing::info!("Skipping failed step '{}'", step.name);
168 result.output =
169 format!("[skipped] {}", result.error.as_deref().unwrap_or(""));
170 step_results.push(result);
171 }
172 FailureAction::Retry { max } => {
173 let mut retried = false;
174 for attempt in 1..=*max {
175 tracing::info!(
176 "Retrying step '{}' (attempt {}/{})",
177 step.name,
178 attempt,
179 max
180 );
181 let retry_result = if shadow {
182 execute_step_shadow(step, &resolved_action)
183 } else {
184 execute_step(step, &resolved_action, self.model_router.as_ref(), &step_results.iter().map(|r: &StepResult| (r.step_name.clone(), r.output.clone())).collect::<Vec<_>>()).await
185 };
186 match retry_result {
187 Ok(r) if r.success => {
188 step_results.push(r);
189 retried = true;
190 break;
191 }
192 Ok(r) => result = r,
193 Err(e) => {
194 result.error = Some(e.to_string());
195 }
196 }
197 }
198 if !retried {
199 error_msg = Some(format!(
200 "Step '{}' failed after {} retries",
201 step.name, max
202 ));
203 success = false;
204 step_results.push(result);
205 break;
206 }
207 }
208 FailureAction::AutoFix => {
209 if let Some(router) = &self.model_router {
210 let autofix = AutoFixEngine::new(router.clone());
211 let error_output = result
212 .error
213 .as_deref()
214 .unwrap_or(&result.output);
215 let max_fix_attempts = 3u32;
216
217 tracing::info!(
218 "AutoFix: analyzing failure for step '{}'",
219 step.name
220 );
221
222 eprintln!("🔧 Auto-fix: analyzing failure...");
223 match autofix
224 .analyze_and_fix(
225 &step.name,
226 &resolved_action,
227 error_output,
228 max_fix_attempts,
229 )
230 .await
231 {
232 Ok(fix_result) if fix_result.success => {
233 eprintln!("🔧 Auto-fix found: {}", fix_result.fix_command);
234 tracing::info!(
235 "AutoFix: got fix command after {} attempts: {}",
236 fix_result.attempts,
237 fix_result.fix_command
238 );
239
240 self.audit.log_action(
242 &action,
243 "autofix_attempt",
244 &format!(
245 "Fix: {}\nAnalysis: {}",
246 fix_result.fix_command,
247 fix_result.analysis
248 ),
249 );
250
251 let fix_step = Step {
253 name: format!("{}-autofix", step.name),
254 step_type: StepType::Execute,
255 action: fix_result.fix_command.clone(),
256 on_failure: FailureAction::Abort,
257 breakpoint: false,
258 breakpoint_message: None,
259 };
260 match execute_step(
261 &fix_step,
262 &fix_result.fix_command,
263 self.model_router.as_ref(), &step_results.iter().map(|r: &StepResult| (r.step_name.clone(), r.output.clone())).collect::<Vec<_>>(),
264 )
265 .await
266 {
267 Ok(fix_exec) if fix_exec.success => {
268 eprintln!("🔧 Auto-fix succeeded! Using fix result.");
269 total_cost += fix_exec.cost;
270
271 let mut fixed_result = fix_exec;
274 fixed_result.step_name = step.name.clone();
275 step_results.push(fixed_result);
276 }
277 Ok(_fix_exec) => {
278 success = false;
280 error_msg = Some(format!(
281 "Step '{}' auto-fix command failed",
282 step.name
283 ));
284 step_results.push(result);
285 break;
286 }
287 Err(e) => {
288 success = false;
289 error_msg = Some(format!(
290 "Step '{}' fix execution error: {}",
291 step.name, e
292 ));
293 step_results.push(result);
294 break;
295 }
296 }
297 }
298 Ok(_) => {
299 success = false;
301 error_msg = Some(format!(
302 "Step '{}' failed, auto-fix could not find a solution",
303 step.name
304 ));
305 step_results.push(result);
306 break;
307 }
308 Err(e) => {
309 tracing::warn!(
311 "AutoFix error for step '{}': {}",
312 step.name,
313 e
314 );
315 success = false;
316 error_msg = Some(format!(
317 "Step '{}' failed, auto-fix error: {}",
318 step.name, e
319 ));
320 step_results.push(result);
321 break;
322 }
323 }
324 } else {
325 tracing::warn!(
327 "AutoFix requested but no model router configured, aborting at '{}'",
328 step.name
329 );
330 success = false;
331 error_msg = Some(format!(
332 "Step '{}' failed, auto-fix requires AI models (run murc init)",
333 step.name
334 ));
335 step_results.push(result);
336 break;
337 }
338 }
339 }
340 } else {
341 step_results.push(result);
342 }
343 }
344 Err(e) => {
345 success = false;
346 error_msg = Some(format!("Step '{}' error: {}", step.name, e));
347 step_results.push(StepResult {
348 step_name: step.name.clone(),
349 success: false,
350 output: String::new(),
351 duration_ms: 0,
352 cost: 0.0,
353 model_used: None,
354 error: Some(e.to_string()),
355 });
356 break;
357 }
358 }
359 }
360
361 let finished_at = Utc::now();
362 let duration_ms = timer.elapsed().as_millis() as u64;
363
364 Ok(ExecutionResult {
365 execution_id,
366 workflow_id: workflow.id.clone(),
367 steps_completed: step_results.iter().filter(|r| r.success).count(),
368 steps_total: workflow.steps.len(),
369 success,
370 duration_ms,
371 total_cost,
372 step_results,
373 shadow,
374 error: error_msg,
375 started_at,
376 finished_at,
377 })
378 }
379}
380
381async fn execute_step(
383 step: &Step,
384 resolved_action: &str,
385 model_router: Option<&Arc<ModelRouter>>,
386 prior_outputs: &[(String, String)],
387) -> Result<StepResult> {
388 let timer = Instant::now();
389
390 let is_ai_step = matches!(
391 step.step_type,
392 StepType::Analyze
393 | StepType::Plan
394 | StepType::Debug
395 | StepType::Summarize
396 | StepType::Code
397 | StepType::Refactor
398 | StepType::Fix
399 | StepType::Search
400 | StepType::Classify
401 | StepType::SecurityCheck
402 );
403
404 if is_ai_step {
406 if let Some(router) = model_router {
407 tracing::info!(
408 "Step '{}' using AI model (type={:?})",
409 step.name,
410 step.step_type
411 );
412 let context_prompt = if !prior_outputs.is_empty() {
414 let ctx: String = prior_outputs
415 .iter()
416 .map(|(name, output)| format!("## Output from step '{}':\n{}\n", name, output))
417 .collect();
418 format!("{}\n\n## Task:\n{}", ctx, resolved_action)
419 } else {
420 resolved_action.to_string()
421 };
422 match router.complete_for_step(&step.step_type, &context_prompt).await {
423 Ok(response) => {
424 return Ok(StepResult {
425 step_name: step.name.clone(),
426 success: true,
427 output: response.content,
428 duration_ms: timer.elapsed().as_millis() as u64,
429 cost: response.cost,
430 model_used: Some(response.model),
431 error: None,
432 });
433 }
434 Err(e) => {
435 let err_msg = format!("AI model error: {}", e);
436 tracing::warn!(
437 "AI model failed for step '{}': {}, falling back to shell",
438 step.name,
439 e
440 );
441 return Ok(StepResult {
444 step_name: step.name.clone(),
445 success: false,
446 output: err_msg.clone(),
447 duration_ms: timer.elapsed().as_millis() as u64,
448 cost: 0.0,
449 model_used: None,
450 error: Some(err_msg),
451 });
452 }
453 }
454 } else {
455 tracing::debug!(
456 "Step '{}' is AI-powered but no model router configured, using shell",
457 step.name
458 );
459 }
460 }
461
462 let output = Command::new("sh")
464 .arg("-c")
465 .arg(resolved_action)
466 .output()
467 .await
468 .with_context(|| format!("Executing step '{}'", step.name))?;
469
470 let stdout = String::from_utf8_lossy(&output.stdout).to_string();
471 let stderr = String::from_utf8_lossy(&output.stderr).to_string();
472 let combined = if stderr.is_empty() {
473 stdout.clone()
474 } else {
475 format!("{}\n{}", stdout, stderr)
476 };
477
478 Ok(StepResult {
479 step_name: step.name.clone(),
480 success: output.status.success(),
481 output: combined,
482 duration_ms: timer.elapsed().as_millis() as u64,
483 cost: 0.0,
484 model_used: None,
485 error: if output.status.success() {
486 None
487 } else {
488 Some(format!(
489 "Exit code: {}. {}",
490 output.status.code().unwrap_or(-1),
491 stderr.trim()
492 ))
493 },
494 })
495}
496
497fn execute_step_shadow(step: &Step, resolved_action: &str) -> Result<StepResult> {
499 let output = format!(
500 "[shadow] Would execute: {}\n Type: {:?}\n Failure policy: {:?}{}",
501 resolved_action,
502 step.step_type,
503 step.on_failure,
504 if step.breakpoint {
505 format!(
506 "\n ⏸ Breakpoint: {}",
507 step.breakpoint_message.as_deref().unwrap_or("(none)")
508 )
509 } else {
510 String::new()
511 }
512 );
513
514 Ok(StepResult {
515 step_name: step.name.clone(),
516 success: true,
517 output,
518 duration_ms: 0,
519 cost: 0.0,
520 model_used: None,
521 error: None,
522 })
523}
524
525fn step_to_action(step: &Step, resolved_action: &str) -> Action {
527 let action_type = match step.step_type {
528 StepType::Execute => ActionType::Execute,
529 StepType::Code | StepType::Refactor | StepType::Fix => ActionType::Write,
530 StepType::Analyze | StepType::Plan | StepType::Debug | StepType::Summarize => {
531 ActionType::ModelInvoke
532 }
533 StepType::Search | StepType::Classify => ActionType::Read,
534 StepType::SecurityCheck => ActionType::Read,
535 StepType::Other => ActionType::Other,
536 };
537
538 Action {
539 id: Uuid::new_v4(),
540 action_type,
541 description: format!("Workflow step: {}", step.name),
542 command: resolved_action.to_string(),
543 working_dir: None,
544 created_at: Utc::now(),
545 }
546}
547
548#[cfg(test)]
549mod tests {
550 use super::*;
551 use crate::audit::AuditStore;
552 use crate::constitution::Constitution;
553 use crate::workflow::parser::parse_workflow_str;
554
555 fn test_constitution() -> Constitution {
556 let toml = r#"
557[identity]
558version = "1.0.0"
559
560[boundaries]
561forbidden = ["rm -rf /"]
562requires_approval = []
563auto_allowed = ["echo", "cat", "curl", "docker", "workflow step"]
564
565[resource_limits]
566max_api_cost_per_run = 10.0
567max_api_cost_per_day = 50.0
568max_execution_time = 3600
569max_concurrent_workflows = 5
570max_file_write_size = "10MB"
571allowed_directories = ["/tmp", "/home"]
572blocked_directories = ["/etc", "/sys"]
573
574[model_permissions.thinking_model]
575can_execute = false
576can_read = true
577
578[model_permissions.coding_model]
579can_execute = true
580can_read = true
581
582[model_permissions.task_model]
583can_execute = true
584can_read = true
585"#;
586 Constitution::from_toml(toml).unwrap()
587 }
588
589 #[tokio::test]
590 async fn test_shadow_execution() {
591 let yaml = r#"
592id: test-shadow
593name: Shadow Test
594description: Test shadow mode
595variables:
596 name: world
597steps:
598 - name: greet
599 step_type: execute
600 action: "echo Hello {{name}}"
601 - name: deploy
602 step_type: execute
603 action: "docker compose up -d"
604 breakpoint: true
605 breakpoint_message: "Deploy now?"
606"#;
607 let wf = parse_workflow_str(yaml).unwrap();
608 let constitution = Arc::new(test_constitution());
609 let audit = Arc::new(AuditStore::new_memory());
610 let runner = WorkflowRunner::new(constitution, audit);
611
612 let result = runner
613 .execute(&wf, HashMap::new(), true)
614 .await
615 .unwrap();
616
617 assert!(result.shadow);
618 assert!(result.success);
619 assert_eq!(result.step_results.len(), 2);
620 assert!(result.step_results[0].output.contains("[shadow]"));
621 assert!(result.step_results[1].output.contains("Breakpoint"));
622 }
623
624 #[tokio::test]
625 async fn test_real_execution() {
626 let yaml = r#"
627id: test-real
628name: Real Test
629description: Test real execution
630variables: {}
631steps:
632 - name: echo-test
633 step_type: execute
634 action: "echo hello-from-runner"
635"#;
636 let wf = parse_workflow_str(yaml).unwrap();
637 let constitution = Arc::new(test_constitution());
638 let audit = Arc::new(AuditStore::new_memory());
639 let runner = WorkflowRunner::new(constitution, audit);
640
641 let result = runner
642 .execute(&wf, HashMap::new(), false)
643 .await
644 .unwrap();
645
646 assert!(!result.shadow);
647 assert!(result.success);
648 assert!(result.step_results[0]
649 .output
650 .contains("hello-from-runner"));
651 }
652}