Skip to main content

mur_core/workflow/
runner.rs

1//! Workflow execution engine — runs workflow steps with constitution checks,
2//! audit logging, checkpoints, and failure handling.
3
4use crate::audit::AuditStore;
5use crate::constitution::Constitution;
6use crate::model::router::ModelRouter;
7use crate::types::{
8    Action, ActionDecision, ActionType, ExecutionResult, FailureAction, Step, StepResult, StepType,
9    Workflow,
10};
11use crate::workflow::autofix::AutoFixEngine;
12use crate::workflow::checkpoint::{save_checkpoint, Checkpoint};
13use crate::workflow::parser::resolve_variables;
14use anyhow::{Context, Result};
15use chrono::Utc;
16use std::collections::HashMap;
17use std::sync::Arc;
18use std::time::Instant;
19use tokio::process::Command;
20use uuid::Uuid;
21
/// Workflow execution engine.
///
/// Bundles the shared services a workflow run needs. All fields are
/// reference-counted so they can be shared with other components.
pub struct WorkflowRunner {
    /// Policy consulted via `check_action` before a step is run.
    pub constitution: Arc<Constitution>,
    /// Audit store that receives a `log_action` record for blocked and
    /// executed steps.
    pub audit: Arc<AuditStore>,
    /// Router for AI-powered steps and auto-fix. `None` after `new`;
    /// set it with `with_model_router`.
    pub model_router: Option<Arc<ModelRouter>>,
}
28
29impl WorkflowRunner {
30    pub fn new(constitution: Arc<Constitution>, audit: Arc<AuditStore>) -> Self {
31        Self {
32            constitution,
33            audit,
34            model_router: None,
35        }
36    }
37
38    /// Create a runner with a model router for AI-powered steps.
39    pub fn with_model_router(mut self, router: Arc<ModelRouter>) -> Self {
40        self.model_router = Some(router);
41        self
42    }
43
44    /// Execute a workflow, optionally in shadow (dry-run) mode.
45    pub async fn execute(
46        &self,
47        workflow: &Workflow,
48        user_vars: HashMap<String, String>,
49        shadow: bool,
50    ) -> Result<ExecutionResult> {
51        let execution_id = Uuid::new_v4();
52        let started_at = Utc::now();
53        let timer = Instant::now();
54
55        // Merge user variables with workflow defaults
56        let mut variables = workflow.variables.clone();
57        variables.extend(user_vars);
58
59        let mut step_results = Vec::new();
60        let mut total_cost = 0.0;
61        let mut success = true;
62        let mut error_msg: Option<String> = None;
63
64        tracing::info!(
65            "Starting workflow '{}' (execution={}, shadow={})",
66            workflow.name,
67            execution_id,
68            shadow
69        );
70
71        for (idx, step) in workflow.steps.iter().enumerate() {
72            // Save checkpoint before each step
73            let checkpoint = Checkpoint {
74                execution_id,
75                step_index: idx,
76                variables: variables.clone(),
77                completed_steps: step_results.clone(),
78                created_at: Utc::now(),
79            };
80            if let Err(e) = save_checkpoint(&checkpoint).await {
81                tracing::warn!("Failed to save checkpoint: {}", e);
82            }
83
84            // Resolve variables in action
85            let resolved_action = resolve_variables(&step.action, &variables);
86
87            // Constitution check
88            let action = step_to_action(step, &resolved_action);
89            let decision = self.constitution.check_action(&action);
90
91            match decision {
92                ActionDecision::Blocked { reason } => {
93                    tracing::warn!("Step '{}' blocked: {}", step.name, reason);
94                    self.audit.log_action(&action, "blocked", &reason);
95                    step_results.push(StepResult {
96                        step_name: step.name.clone(),
97                        success: false,
98                        output: String::new(),
99                        duration_ms: 0,
100                        cost: 0.0,
101                        model_used: None,
102                        error: Some(format!("Blocked by constitution: {}", reason)),
103                    });
104                    success = false;
105                    error_msg = Some(format!("Step '{}' blocked: {}", step.name, reason));
106                    break;
107                }
108                ActionDecision::NeedsApproval { prompt } => {
109                    tracing::info!("Step '{}' needs approval: {}", step.name, prompt);
110                    // For now, treat as breakpoint — log and skip
111                    step_results.push(StepResult {
112                        step_name: step.name.clone(),
113                        success: false,
114                        output: String::new(),
115                        duration_ms: 0,
116                        cost: 0.0,
117                        model_used: None,
118                        error: Some(format!("Needs approval: {}", prompt)),
119                    });
120                    success = false;
121                    error_msg = Some(format!("Step '{}' needs approval", step.name));
122                    break;
123                }
124                ActionDecision::Allowed => {}
125            }
126
127            // Breakpoint check
128            if step.breakpoint {
129                let msg = step
130                    .breakpoint_message
131                    .as_deref()
132                    .unwrap_or("Breakpoint reached");
133                tracing::info!("⏸ Breakpoint at '{}': {}", step.name, msg);
134                if shadow {
135                    tracing::info!("  [shadow] Would pause here for confirmation");
136                }
137                // In non-interactive mode, we log and continue
138                // Interactive breakpoints will be added in Phase 3
139            }
140
141            // Execute step
142            let step_result = if shadow {
143                execute_step_shadow(step, &resolved_action)
144            } else {
145                execute_step(step, &resolved_action, self.model_router.as_ref(), &step_results.iter().map(|r: &StepResult| (r.step_name.clone(), r.output.clone())).collect::<Vec<_>>()).await
146            };
147
148            match step_result {
149                Ok(mut result) => {
150                    total_cost += result.cost;
151                    self.audit
152                        .log_action(&action, "executed", &result.output);
153
154                    if !result.success {
155                        match &step.on_failure {
156                            FailureAction::Abort => {
157                                error_msg = Some(format!(
158                                    "Step '{}' failed: {}",
159                                    step.name,
160                                    result.error.as_deref().unwrap_or("unknown")
161                                ));
162                                success = false;
163                                step_results.push(result);
164                                break;
165                            }
166                            FailureAction::Skip => {
167                                tracing::info!("Skipping failed step '{}'", step.name);
168                                result.output =
169                                    format!("[skipped] {}", result.error.as_deref().unwrap_or(""));
170                                step_results.push(result);
171                            }
172                            FailureAction::Retry { max } => {
173                                let mut retried = false;
174                                for attempt in 1..=*max {
175                                    tracing::info!(
176                                        "Retrying step '{}' (attempt {}/{})",
177                                        step.name,
178                                        attempt,
179                                        max
180                                    );
181                                    let retry_result = if shadow {
182                                        execute_step_shadow(step, &resolved_action)
183                                    } else {
184                                        execute_step(step, &resolved_action, self.model_router.as_ref(), &step_results.iter().map(|r: &StepResult| (r.step_name.clone(), r.output.clone())).collect::<Vec<_>>()).await
185                                    };
186                                    match retry_result {
187                                        Ok(r) if r.success => {
188                                            step_results.push(r);
189                                            retried = true;
190                                            break;
191                                        }
192                                        Ok(r) => result = r,
193                                        Err(e) => {
194                                            result.error = Some(e.to_string());
195                                        }
196                                    }
197                                }
198                                if !retried {
199                                    error_msg = Some(format!(
200                                        "Step '{}' failed after {} retries",
201                                        step.name, max
202                                    ));
203                                    success = false;
204                                    step_results.push(result);
205                                    break;
206                                }
207                            }
208                            FailureAction::AutoFix => {
209                                if let Some(router) = &self.model_router {
210                                    let autofix = AutoFixEngine::new(router.clone());
211                                    let error_output = result
212                                        .error
213                                        .as_deref()
214                                        .unwrap_or(&result.output);
215                                    let max_fix_attempts = 3u32;
216
217                                    tracing::info!(
218                                        "AutoFix: analyzing failure for step '{}'",
219                                        step.name
220                                    );
221
222                                    eprintln!("🔧 Auto-fix: analyzing failure...");
223                                    match autofix
224                                        .analyze_and_fix(
225                                            &step.name,
226                                            &resolved_action,
227                                            error_output,
228                                            max_fix_attempts,
229                                        )
230                                        .await
231                                    {
232                                        Ok(fix_result) if fix_result.success => {
233                                            eprintln!("🔧 Auto-fix found: {}", fix_result.fix_command);
234                                            tracing::info!(
235                                                "AutoFix: got fix command after {} attempts: {}",
236                                                fix_result.attempts,
237                                                fix_result.fix_command
238                                            );
239
240                                            // Log the fix attempt to audit
241                                            self.audit.log_action(
242                                                &action,
243                                                "autofix_attempt",
244                                                &format!(
245                                                    "Fix: {}\nAnalysis: {}",
246                                                    fix_result.fix_command,
247                                                    fix_result.analysis
248                                                ),
249                                            );
250
251                                            // Execute the fix command
252                                            let fix_step = Step {
253                                                name: format!("{}-autofix", step.name),
254                                                step_type: StepType::Execute,
255                                                action: fix_result.fix_command.clone(),
256                                                on_failure: FailureAction::Abort,
257                                                breakpoint: false,
258                                                breakpoint_message: None,
259                                            };
260                                            match execute_step(
261                                                &fix_step,
262                                                &fix_result.fix_command,
263                                                self.model_router.as_ref(), &step_results.iter().map(|r: &StepResult| (r.step_name.clone(), r.output.clone())).collect::<Vec<_>>(),
264                                            )
265                                            .await
266                                            {
267                                                Ok(fix_exec) if fix_exec.success => {
268                                                    eprintln!("🔧 Auto-fix succeeded! Using fix result.");
269                                                    total_cost += fix_exec.cost;
270
271                                                    // Use the fix result directly (the fix command
272                                                    // replaces the broken original)
273                                                    let mut fixed_result = fix_exec;
274                                                    fixed_result.step_name = step.name.clone();
275                                                    step_results.push(fixed_result);
276                                                }
277                                                Ok(_fix_exec) => {
278                                                    // Fix command itself failed
279                                                    success = false;
280                                                    error_msg = Some(format!(
281                                                        "Step '{}' auto-fix command failed",
282                                                        step.name
283                                                    ));
284                                                    step_results.push(result);
285                                                    break;
286                                                }
287                                                Err(e) => {
288                                                    success = false;
289                                                    error_msg = Some(format!(
290                                                        "Step '{}' fix execution error: {}",
291                                                        step.name, e
292                                                    ));
293                                                    step_results.push(result);
294                                                    break;
295                                                }
296                                            }
297                                        }
298                                        Ok(_) => {
299                                            // AutoFix couldn't generate a fix
300                                            success = false;
301                                            error_msg = Some(format!(
302                                                "Step '{}' failed, auto-fix could not find a solution",
303                                                step.name
304                                            ));
305                                            step_results.push(result);
306                                            break;
307                                        }
308                                        Err(e) => {
309                                            // AutoFix errored out (no API key, network, etc.)
310                                            tracing::warn!(
311                                                "AutoFix error for step '{}': {}",
312                                                step.name,
313                                                e
314                                            );
315                                            success = false;
316                                            error_msg = Some(format!(
317                                                "Step '{}' failed, auto-fix error: {}",
318                                                step.name, e
319                                            ));
320                                            step_results.push(result);
321                                            break;
322                                        }
323                                    }
324                                } else {
325                                    // No model router — can't auto-fix
326                                    tracing::warn!(
327                                        "AutoFix requested but no model router configured, aborting at '{}'",
328                                        step.name
329                                    );
330                                    success = false;
331                                    error_msg = Some(format!(
332                                        "Step '{}' failed, auto-fix requires AI models (run murc init)",
333                                        step.name
334                                    ));
335                                    step_results.push(result);
336                                    break;
337                                }
338                            }
339                        }
340                    } else {
341                        step_results.push(result);
342                    }
343                }
344                Err(e) => {
345                    success = false;
346                    error_msg = Some(format!("Step '{}' error: {}", step.name, e));
347                    step_results.push(StepResult {
348                        step_name: step.name.clone(),
349                        success: false,
350                        output: String::new(),
351                        duration_ms: 0,
352                        cost: 0.0,
353                        model_used: None,
354                        error: Some(e.to_string()),
355                    });
356                    break;
357                }
358            }
359        }
360
361        let finished_at = Utc::now();
362        let duration_ms = timer.elapsed().as_millis() as u64;
363
364        Ok(ExecutionResult {
365            execution_id,
366            workflow_id: workflow.id.clone(),
367            steps_completed: step_results.iter().filter(|r| r.success).count(),
368            steps_total: workflow.steps.len(),
369            success,
370            duration_ms,
371            total_cost,
372            step_results,
373            shadow,
374            error: error_msg,
375            started_at,
376            finished_at,
377        })
378    }
379}
380
381/// Execute a step — shell for `execute` type, AI model for analyze/plan/debug/etc.
382async fn execute_step(
383    step: &Step,
384    resolved_action: &str,
385    model_router: Option<&Arc<ModelRouter>>,
386    prior_outputs: &[(String, String)],
387) -> Result<StepResult> {
388    let timer = Instant::now();
389
390    let is_ai_step = matches!(
391        step.step_type,
392        StepType::Analyze
393            | StepType::Plan
394            | StepType::Debug
395            | StepType::Summarize
396            | StepType::Code
397            | StepType::Refactor
398            | StepType::Fix
399            | StepType::Search
400            | StepType::Classify
401            | StepType::SecurityCheck
402    );
403
404    // Use AI model if available and step type requires it
405    if is_ai_step {
406        if let Some(router) = model_router {
407            tracing::info!(
408                "Step '{}' using AI model (type={:?})",
409                step.name,
410                step.step_type
411            );
412            // Build context from previous steps for AI
413            let context_prompt = if !prior_outputs.is_empty() {
414                let ctx: String = prior_outputs
415                    .iter()
416                    .map(|(name, output)| format!("## Output from step '{}':\n{}\n", name, output))
417                    .collect();
418                format!("{}\n\n## Task:\n{}", ctx, resolved_action)
419            } else {
420                resolved_action.to_string()
421            };
422            match router.complete_for_step(&step.step_type, &context_prompt).await {
423                Ok(response) => {
424                    return Ok(StepResult {
425                        step_name: step.name.clone(),
426                        success: true,
427                        output: response.content,
428                        duration_ms: timer.elapsed().as_millis() as u64,
429                        cost: response.cost,
430                        model_used: Some(response.model),
431                        error: None,
432                    });
433                }
434                Err(e) => {
435                    let err_msg = format!("AI model error: {}", e);
436                    tracing::warn!(
437                        "AI model failed for step '{}': {}, falling back to shell",
438                        step.name,
439                        e
440                    );
441                    // For AI steps, return the error instead of falling back to shell
442                    // (shell execution of a prompt makes no sense)
443                    return Ok(StepResult {
444                        step_name: step.name.clone(),
445                        success: false,
446                        output: err_msg.clone(),
447                        duration_ms: timer.elapsed().as_millis() as u64,
448                        cost: 0.0,
449                        model_used: None,
450                        error: Some(err_msg),
451                    });
452                }
453            }
454        } else {
455            tracing::debug!(
456                "Step '{}' is AI-powered but no model router configured, using shell",
457                step.name
458            );
459        }
460    }
461
462    // Shell execution (default for execute type, fallback for AI steps)
463    let output = Command::new("sh")
464        .arg("-c")
465        .arg(resolved_action)
466        .output()
467        .await
468        .with_context(|| format!("Executing step '{}'", step.name))?;
469
470    let stdout = String::from_utf8_lossy(&output.stdout).to_string();
471    let stderr = String::from_utf8_lossy(&output.stderr).to_string();
472    let combined = if stderr.is_empty() {
473        stdout.clone()
474    } else {
475        format!("{}\n{}", stdout, stderr)
476    };
477
478    Ok(StepResult {
479        step_name: step.name.clone(),
480        success: output.status.success(),
481        output: combined,
482        duration_ms: timer.elapsed().as_millis() as u64,
483        cost: 0.0,
484        model_used: None,
485        error: if output.status.success() {
486            None
487        } else {
488            Some(format!(
489                "Exit code: {}. {}",
490                output.status.code().unwrap_or(-1),
491                stderr.trim()
492            ))
493        },
494    })
495}
496
497/// Shadow mode: log what would happen without executing.
498fn execute_step_shadow(step: &Step, resolved_action: &str) -> Result<StepResult> {
499    let output = format!(
500        "[shadow] Would execute: {}\n  Type: {:?}\n  Failure policy: {:?}{}",
501        resolved_action,
502        step.step_type,
503        step.on_failure,
504        if step.breakpoint {
505            format!(
506                "\n  ⏸ Breakpoint: {}",
507                step.breakpoint_message.as_deref().unwrap_or("(none)")
508            )
509        } else {
510            String::new()
511        }
512    );
513
514    Ok(StepResult {
515        step_name: step.name.clone(),
516        success: true,
517        output,
518        duration_ms: 0,
519        cost: 0.0,
520        model_used: None,
521        error: None,
522    })
523}
524
525/// Convert a Step to an Action for constitution checking.
526fn step_to_action(step: &Step, resolved_action: &str) -> Action {
527    let action_type = match step.step_type {
528        StepType::Execute => ActionType::Execute,
529        StepType::Code | StepType::Refactor | StepType::Fix => ActionType::Write,
530        StepType::Analyze | StepType::Plan | StepType::Debug | StepType::Summarize => {
531            ActionType::ModelInvoke
532        }
533        StepType::Search | StepType::Classify => ActionType::Read,
534        StepType::SecurityCheck => ActionType::Read,
535        StepType::Other => ActionType::Other,
536    };
537
538    Action {
539        id: Uuid::new_v4(),
540        action_type,
541        description: format!("Workflow step: {}", step.name),
542        command: resolved_action.to_string(),
543        working_dir: None,
544        created_at: Utc::now(),
545    }
546}
547
#[cfg(test)]
mod tests {
    use super::*;
    use crate::audit::AuditStore;
    use crate::constitution::Constitution;
    use crate::workflow::parser::parse_workflow_str;

    /// Permissive constitution used by every test in this module.
    const CONSTITUTION_TOML: &str = r#"
[identity]
version = "1.0.0"

[boundaries]
forbidden = ["rm -rf /"]
requires_approval = []
auto_allowed = ["echo", "cat", "curl", "docker", "workflow step"]

[resource_limits]
max_api_cost_per_run = 10.0
max_api_cost_per_day = 50.0
max_execution_time = 3600
max_concurrent_workflows = 5
max_file_write_size = "10MB"
allowed_directories = ["/tmp", "/home"]
blocked_directories = ["/etc", "/sys"]

[model_permissions.thinking_model]
can_execute = false
can_read = true

[model_permissions.coding_model]
can_execute = true
can_read = true

[model_permissions.task_model]
can_execute = true
can_read = true
"#;

    /// Parse the shared test constitution.
    fn test_constitution() -> Constitution {
        Constitution::from_toml(CONSTITUTION_TOML).unwrap()
    }

    /// Runner backed by the test constitution and an in-memory audit store.
    fn in_memory_runner() -> WorkflowRunner {
        let policy = Arc::new(test_constitution());
        let audit_log = Arc::new(AuditStore::new_memory());
        WorkflowRunner::new(policy, audit_log)
    }

    /// Shadow mode reports every step as successful and describes it,
    /// including the breakpoint of the second step.
    #[tokio::test]
    async fn test_shadow_execution() {
        let definition = r#"
id: test-shadow
name: Shadow Test
description: Test shadow mode
variables:
  name: world
steps:
  - name: greet
    step_type: execute
    action: "echo Hello {{name}}"
  - name: deploy
    step_type: execute
    action: "docker compose up -d"
    breakpoint: true
    breakpoint_message: "Deploy now?"
"#;
        let workflow = parse_workflow_str(definition).unwrap();
        let runner = in_memory_runner();

        let outcome = runner
            .execute(&workflow, HashMap::new(), true)
            .await
            .unwrap();

        assert!(outcome.shadow);
        assert!(outcome.success);
        assert_eq!(outcome.step_results.len(), 2);
        let greet = &outcome.step_results[0];
        let deploy = &outcome.step_results[1];
        assert!(greet.output.contains("[shadow]"));
        assert!(deploy.output.contains("Breakpoint"));
    }

    /// A real run executes the shell command and captures its stdout.
    #[tokio::test]
    async fn test_real_execution() {
        let definition = r#"
id: test-real
name: Real Test
description: Test real execution
variables: {}
steps:
  - name: echo-test
    step_type: execute
    action: "echo hello-from-runner"
"#;
        let workflow = parse_workflow_str(definition).unwrap();
        let runner = in_memory_runner();

        let outcome = runner
            .execute(&workflow, HashMap::new(), false)
            .await
            .unwrap();

        assert!(!outcome.shadow);
        assert!(outcome.success);
        let echoed = &outcome.step_results[0].output;
        assert!(echoed.contains("hello-from-runner"));
    }
}
652}