Skip to main content

ironflow_engine/
context.rs

1//! [`WorkflowContext`] — execution context for dynamic workflows.
2//!
3//! Provides step execution methods that automatically persist results to the
4//! store. Each call to [`shell`](WorkflowContext::shell),
5//! [`http`](WorkflowContext::http), [`agent`](WorkflowContext::agent), or
6//! [`workflow`](WorkflowContext::workflow) creates a step record, executes the
7//! operation, captures the output, and returns a [`StepOutput`] that the next
8//! step can reference.
9//!
10//! # Examples
11//!
12//! ```no_run
13//! use ironflow_engine::context::WorkflowContext;
14//! use ironflow_engine::config::{ShellConfig, AgentStepConfig};
15//! use ironflow_engine::error::EngineError;
16//!
17//! # async fn example(ctx: &mut WorkflowContext) -> Result<(), EngineError> {
18//! let build = ctx.shell("build", ShellConfig::new("cargo build")).await?;
19//! let review = ctx.agent("review", AgentStepConfig::new(
20//!     &format!("Build output:\n{}", build.output["stdout"])
21//! )).await?;
22//! # Ok(())
23//! # }
24//! ```
25
26use std::fmt;
27use std::sync::Arc;
28use std::time::Instant;
29
30use chrono::{DateTime, Utc};
31use rust_decimal::Decimal;
32use serde_json::Value;
33use tokio::task::JoinSet;
34use tracing::{error, info};
35use uuid::Uuid;
36
37use ironflow_core::error::{AgentError, OperationError};
38use ironflow_core::provider::AgentProvider;
39use ironflow_store::models::{
40    NewRun, NewStep, NewStepDependency, RunStatus, RunUpdate, Step, StepKind, StepStatus,
41    StepUpdate, TriggerKind,
42};
43use ironflow_store::store::RunStore;
44
45use crate::config::{
46    AgentStepConfig, ApprovalConfig, HttpConfig, ShellConfig, StepConfig, WorkflowStepConfig,
47};
48use crate::error::EngineError;
49use crate::executor::{ParallelStepResult, StepOutput, execute_step_config};
50use crate::handler::WorkflowHandler;
51use crate::operation::Operation;
52
/// Callback type for resolving workflow handlers by name.
///
/// Given a workflow name, the callback returns the matching
/// [`WorkflowHandler`], or `None` when no handler is known under that name.
/// The callback is reference-counted and `Send + Sync`, so a context can
/// clone it into the child contexts it creates for sub-workflows.
pub(crate) type HandlerResolver =
    Arc<dyn Fn(&str) -> Option<Arc<dyn WorkflowHandler>> + Send + Sync>;
56
/// Execution context for a single workflow run.
///
/// Tracks the current step position and provides convenience methods
/// for executing operations with automatic persistence. It also keeps
/// running totals of cost and duration for the steps executed through it.
///
/// # Examples
///
/// ```no_run
/// use ironflow_engine::context::WorkflowContext;
/// use ironflow_engine::config::ShellConfig;
/// use ironflow_engine::error::EngineError;
///
/// # async fn example(ctx: &mut WorkflowContext) -> Result<(), EngineError> {
/// let result = ctx.shell("greet", ShellConfig::new("echo hello")).await?;
/// assert!(result.output["stdout"].as_str().unwrap().contains("hello"));
/// # Ok(())
/// # }
/// ```
pub struct WorkflowContext {
    /// ID of the run whose steps this context creates and updates.
    run_id: Uuid,
    /// Store used to persist runs and steps.
    store: Arc<dyn RunStore>,
    /// Agent provider handed to step execution.
    provider: Arc<dyn AgentProvider>,
    /// Resolver used to look up sub-workflow handlers by name; `None` when
    /// the context was built without one.
    handler_resolver: Option<HandlerResolver>,
    /// Position assigned to the next step. Incremented once per step, or
    /// once per parallel wave (all steps of a wave share one position).
    position: u32,
    /// IDs of the last executed step(s) -- used to record DAG dependencies.
    last_step_ids: Vec<Uuid>,
    /// Accumulated cost across all steps in this run.
    total_cost_usd: Decimal,
    /// Accumulated duration across all steps.
    total_duration_ms: u64,
    /// Steps from a previous execution, keyed by position.
    /// Used when resuming after approval to replay completed steps.
    replay_steps: std::collections::HashMap<u32, Step>,
}
91
92impl WorkflowContext {
93    /// Create a new context for a run.
94    ///
95    /// Not typically called directly — the [`Engine`](crate::engine::Engine)
96    /// creates this when executing a [`WorkflowHandler`].
97    pub fn new(run_id: Uuid, store: Arc<dyn RunStore>, provider: Arc<dyn AgentProvider>) -> Self {
98        Self {
99            run_id,
100            store,
101            provider,
102            handler_resolver: None,
103            position: 0,
104            last_step_ids: Vec::new(),
105            total_cost_usd: Decimal::ZERO,
106            total_duration_ms: 0,
107            replay_steps: std::collections::HashMap::new(),
108        }
109    }
110
111    /// Create a new context with a handler resolver for sub-workflow support.
112    ///
113    /// The resolver is called when [`workflow`](Self::workflow) is invoked to
114    /// look up registered handlers by name.
115    pub(crate) fn with_handler_resolver(
116        run_id: Uuid,
117        store: Arc<dyn RunStore>,
118        provider: Arc<dyn AgentProvider>,
119        resolver: HandlerResolver,
120    ) -> Self {
121        Self {
122            run_id,
123            store,
124            provider,
125            handler_resolver: Some(resolver),
126            position: 0,
127            last_step_ids: Vec::new(),
128            total_cost_usd: Decimal::ZERO,
129            total_duration_ms: 0,
130            replay_steps: std::collections::HashMap::new(),
131        }
132    }
133
134    /// Load existing steps from the store for replay after approval.
135    ///
136    /// Called by the engine when resuming a run. All completed steps
137    /// and the approved approval step are indexed by position so that
138    /// `execute_step` and `approval` can skip them.
139    pub(crate) async fn load_replay_steps(&mut self) -> Result<(), EngineError> {
140        let steps = self.store.list_steps(self.run_id).await?;
141        for step in steps {
142            let dominated = matches!(
143                step.status.state,
144                StepStatus::Completed | StepStatus::Running | StepStatus::AwaitingApproval
145            );
146            if dominated {
147                self.replay_steps.insert(step.position, step);
148            }
149        }
150        Ok(())
151    }
152
    /// The run ID this context is executing for.
    ///
    /// Every step created through this context is recorded under this run.
    pub fn run_id(&self) -> Uuid {
        self.run_id
    }
157
    /// Accumulated cost across all executed steps so far.
    ///
    /// The total grows by the reported `cost_usd` of each successful
    /// parallel or sub-workflow step. Custom operation steps are recorded
    /// with zero cost and leave it unchanged.
    pub fn total_cost_usd(&self) -> Decimal {
        self.total_cost_usd
    }
162
    /// Accumulated duration across all executed steps so far.
    ///
    /// This is a sum of per-step durations in milliseconds, not wall-clock
    /// time: each successful step of a parallel wave adds its own duration.
    pub fn total_duration_ms(&self) -> u64 {
        self.total_duration_ms
    }
167
168    /// Execute multiple steps concurrently (wait-all model).
169    ///
170    /// All steps in the batch execute in parallel via `tokio::JoinSet`.
171    /// Each step is recorded with the same `position` (execution wave).
172    /// Dependencies on previous steps are recorded automatically.
173    ///
174    /// When `fail_fast` is true, remaining steps are aborted on the first
175    /// failure. When false, all steps run to completion and the first
176    /// error is returned.
177    ///
178    /// # Errors
179    ///
180    /// Returns [`EngineError`] if any step fails.
181    ///
182    /// # Examples
183    ///
184    /// ```no_run
185    /// use ironflow_engine::context::WorkflowContext;
186    /// use ironflow_engine::config::{StepConfig, ShellConfig};
187    /// use ironflow_engine::error::EngineError;
188    ///
189    /// # async fn example(ctx: &mut WorkflowContext) -> Result<(), EngineError> {
190    /// let results = ctx.parallel(
191    ///     vec![
192    ///         ("test-unit", StepConfig::Shell(ShellConfig::new("cargo test --lib"))),
193    ///         ("lint", StepConfig::Shell(ShellConfig::new("cargo clippy"))),
194    ///     ],
195    ///     true,
196    /// ).await?;
197    ///
198    /// for r in &results {
199    ///     println!("{}: {:?}", r.name, r.output.output);
200    /// }
201    /// # Ok(())
202    /// # }
203    /// ```
204    pub async fn parallel(
205        &mut self,
206        steps: Vec<(&str, StepConfig)>,
207        fail_fast: bool,
208    ) -> Result<Vec<ParallelStepResult>, EngineError> {
209        if steps.is_empty() {
210            return Ok(Vec::new());
211        }
212
213        let wave_position = self.position;
214        self.position += 1;
215
216        let now = Utc::now();
217        let mut step_records: Vec<(Uuid, String, StepConfig)> = Vec::with_capacity(steps.len());
218
219        for (name, config) in &steps {
220            let kind = config.kind();
221            let step = self
222                .store
223                .create_step(NewStep {
224                    run_id: self.run_id,
225                    name: name.to_string(),
226                    kind,
227                    position: wave_position,
228                    input: Some(serde_json::to_value(config)?),
229                })
230                .await?;
231
232            self.start_step(step.id, now).await?;
233
234            step_records.push((step.id, name.to_string(), config.clone()));
235        }
236
237        let mut join_set = JoinSet::new();
238        for (idx, (_id, _name, config)) in step_records.iter().enumerate() {
239            let provider = self.provider.clone();
240            let config = config.clone();
241            join_set.spawn(async move { (idx, execute_step_config(&config, &provider).await) });
242        }
243
244        // JoinSet returns in completion order; indexed_results restores input order.
245        let mut indexed_results: Vec<Option<Result<StepOutput, String>>> =
246            vec![None; step_records.len()];
247        let mut first_error: Option<EngineError> = None;
248
249        while let Some(join_result) = join_set.join_next().await {
250            let (idx, step_result) = match join_result {
251                Ok(r) => r,
252                Err(e) => {
253                    if first_error.is_none() {
254                        first_error = Some(EngineError::StepConfig(format!("join error: {e}")));
255                    }
256                    if fail_fast {
257                        join_set.abort_all();
258                    }
259                    continue;
260                }
261            };
262
263            let (step_id, step_name, _) = &step_records[idx];
264            let completed_at = Utc::now();
265
266            match step_result {
267                Ok(output) => {
268                    self.total_cost_usd += output.cost_usd;
269                    self.total_duration_ms += output.duration_ms;
270
271                    let debug_messages_json = output.debug_messages_json();
272
273                    self.store
274                        .update_step(
275                            *step_id,
276                            StepUpdate {
277                                status: Some(StepStatus::Completed),
278                                output: Some(output.output.clone()),
279                                duration_ms: Some(output.duration_ms),
280                                cost_usd: Some(output.cost_usd),
281                                input_tokens: output.input_tokens,
282                                output_tokens: output.output_tokens,
283                                completed_at: Some(completed_at),
284                                debug_messages: debug_messages_json,
285                                ..StepUpdate::default()
286                            },
287                        )
288                        .await?;
289
290                    info!(
291                        run_id = %self.run_id,
292                        step = %step_name,
293                        duration_ms = output.duration_ms,
294                        "parallel step completed"
295                    );
296
297                    indexed_results[idx] = Some(Ok(output));
298                }
299                Err(err) => {
300                    let err_msg = err.to_string();
301                    let debug_messages_json = extract_debug_messages_from_error(&err);
302
303                    if let Err(store_err) = self
304                        .store
305                        .update_step(
306                            *step_id,
307                            StepUpdate {
308                                status: Some(StepStatus::Failed),
309                                error: Some(err_msg.clone()),
310                                completed_at: Some(completed_at),
311                                debug_messages: debug_messages_json,
312                                ..StepUpdate::default()
313                            },
314                        )
315                        .await
316                    {
317                        tracing::error!(
318                            step_id = %step_id,
319                            error = %store_err,
320                            "failed to persist parallel step failure"
321                        );
322                    }
323
324                    indexed_results[idx] = Some(Err(err_msg.clone()));
325
326                    if first_error.is_none() {
327                        first_error = Some(err);
328                    }
329
330                    if fail_fast {
331                        join_set.abort_all();
332                    }
333                }
334            }
335        }
336
337        if let Some(err) = first_error {
338            return Err(err);
339        }
340
341        self.last_step_ids = step_records.iter().map(|(id, _, _)| *id).collect();
342
343        // Build results in original order.
344        let results: Vec<ParallelStepResult> = step_records
345            .iter()
346            .enumerate()
347            .map(|(idx, (step_id, name, _))| {
348                let output = match indexed_results[idx].take() {
349                    Some(Ok(o)) => o,
350                    _ => unreachable!("all steps succeeded if no error returned"),
351                };
352                ParallelStepResult {
353                    name: name.clone(),
354                    output,
355                    step_id: *step_id,
356                }
357            })
358            .collect();
359
360        Ok(results)
361    }
362
363    /// Execute a shell step.
364    ///
365    /// Creates the step record, runs the command, persists the result,
366    /// and returns the output for use in subsequent steps.
367    ///
368    /// # Errors
369    ///
370    /// Returns [`EngineError`] if the command fails or the store errors.
371    ///
372    /// # Examples
373    ///
374    /// ```no_run
375    /// use ironflow_engine::context::WorkflowContext;
376    /// use ironflow_engine::config::ShellConfig;
377    /// use ironflow_engine::error::EngineError;
378    ///
379    /// # async fn example(ctx: &mut WorkflowContext) -> Result<(), EngineError> {
380    /// let files = ctx.shell("list", ShellConfig::new("ls -la")).await?;
381    /// println!("stdout: {}", files.output["stdout"]);
382    /// # Ok(())
383    /// # }
384    /// ```
385    pub async fn shell(
386        &mut self,
387        name: &str,
388        config: ShellConfig,
389    ) -> Result<StepOutput, EngineError> {
390        self.execute_step(name, StepKind::Shell, StepConfig::Shell(config))
391            .await
392    }
393
394    /// Execute an HTTP step.
395    ///
396    /// # Errors
397    ///
398    /// Returns [`EngineError`] if the request fails or the store errors.
399    ///
400    /// # Examples
401    ///
402    /// ```no_run
403    /// use ironflow_engine::context::WorkflowContext;
404    /// use ironflow_engine::config::HttpConfig;
405    /// use ironflow_engine::error::EngineError;
406    ///
407    /// # async fn example(ctx: &mut WorkflowContext) -> Result<(), EngineError> {
408    /// let resp = ctx.http("health", HttpConfig::get("https://api.example.com/health")).await?;
409    /// println!("status: {}", resp.output["status"]);
410    /// # Ok(())
411    /// # }
412    /// ```
413    pub async fn http(
414        &mut self,
415        name: &str,
416        config: HttpConfig,
417    ) -> Result<StepOutput, EngineError> {
418        self.execute_step(name, StepKind::Http, StepConfig::Http(config))
419            .await
420    }
421
422    /// Execute an agent step.
423    ///
424    /// # Errors
425    ///
426    /// Returns [`EngineError`] if the agent invocation fails or the store errors.
427    ///
428    /// # Examples
429    ///
430    /// ```no_run
431    /// use ironflow_engine::context::WorkflowContext;
432    /// use ironflow_engine::config::AgentStepConfig;
433    /// use ironflow_engine::error::EngineError;
434    ///
435    /// # async fn example(ctx: &mut WorkflowContext) -> Result<(), EngineError> {
436    /// let review = ctx.agent("review", AgentStepConfig::new("Review the code")).await?;
437    /// println!("review: {}", review.output["value"]);
438    /// # Ok(())
439    /// # }
440    /// ```
441    pub async fn agent(
442        &mut self,
443        name: &str,
444        config: impl Into<AgentStepConfig>,
445    ) -> Result<StepOutput, EngineError> {
446        self.execute_step(name, StepKind::Agent, StepConfig::Agent(config.into()))
447            .await
448    }
449
450    /// Create a human approval gate.
451    ///
452    /// On first execution, records an approval step and returns
453    /// [`EngineError::ApprovalRequired`] to suspend the run. The engine
454    /// transitions the run to `AwaitingApproval`.
455    ///
456    /// On resume (after a human approved via the API), the approval step
457    /// is replayed: it is marked as `Completed` and execution continues
458    /// past it. Multiple approval gates in the same handler work -- each
459    /// one pauses and resumes independently.
460    ///
461    /// # Errors
462    ///
463    /// Returns [`EngineError::ApprovalRequired`] to pause the run on
464    /// first execution. Returns other [`EngineError`] variants on store
465    /// failures.
466    ///
467    /// # Examples
468    ///
469    /// ```no_run
470    /// use ironflow_engine::context::WorkflowContext;
471    /// use ironflow_engine::config::ApprovalConfig;
472    /// use ironflow_engine::error::EngineError;
473    ///
474    /// # async fn example(ctx: &mut WorkflowContext) -> Result<(), EngineError> {
475    /// ctx.approval("deploy-gate", ApprovalConfig::new("Approve deployment?")).await?;
476    /// // Execution continues here after approval
477    /// # Ok(())
478    /// # }
479    /// ```
480    pub async fn approval(
481        &mut self,
482        name: &str,
483        config: ApprovalConfig,
484    ) -> Result<(), EngineError> {
485        let position = self.position;
486        self.position += 1;
487
488        // Replay: if this approval step exists from a prior execution,
489        // the run was approved -- mark it completed (if not already) and continue.
490        if let Some(existing) = self.replay_steps.get(&position)
491            && existing.kind == StepKind::Approval
492        {
493            if existing.status.state == StepStatus::AwaitingApproval {
494                self.store
495                    .update_step(
496                        existing.id,
497                        StepUpdate {
498                            status: Some(StepStatus::Completed),
499                            completed_at: Some(Utc::now()),
500                            ..StepUpdate::default()
501                        },
502                    )
503                    .await?;
504            }
505
506            self.last_step_ids = vec![existing.id];
507            info!(
508                run_id = %self.run_id,
509                step = %name,
510                position,
511                "approval step replayed (approved)"
512            );
513            return Ok(());
514        }
515
516        // First execution: create the approval step and suspend.
517        let step = self
518            .store
519            .create_step(NewStep {
520                run_id: self.run_id,
521                name: name.to_string(),
522                kind: StepKind::Approval,
523                position,
524                input: Some(serde_json::to_value(&config)?),
525            })
526            .await?;
527
528        self.start_step(step.id, Utc::now()).await?;
529
530        // Transition the step to AwaitingApproval so it reflects
531        // the suspended state on the dashboard.
532        self.store
533            .update_step(
534                step.id,
535                StepUpdate {
536                    status: Some(StepStatus::AwaitingApproval),
537                    ..StepUpdate::default()
538                },
539            )
540            .await?;
541
542        self.last_step_ids = vec![step.id];
543
544        Err(EngineError::ApprovalRequired {
545            run_id: self.run_id,
546            step_id: step.id,
547            message: config.message().to_string(),
548        })
549    }
550
551    /// Execute a custom operation step.
552    ///
553    /// Runs a user-defined [`Operation`] with full step lifecycle management:
554    /// creates the step record, transitions to Running, executes the operation,
555    /// persists the output and duration, and marks the step Completed or Failed.
556    ///
557    /// The operation's [`kind()`](Operation::kind) is stored as
558    /// [`StepKind::Custom`].
559    ///
560    /// # Errors
561    ///
562    /// Returns [`EngineError`] if the operation fails or the store errors.
563    ///
564    /// # Examples
565    ///
566    /// ```no_run
567    /// use ironflow_engine::context::WorkflowContext;
568    /// use ironflow_engine::operation::Operation;
569    /// use ironflow_engine::error::EngineError;
570    /// use serde_json::{Value, json};
571    /// use std::pin::Pin;
572    /// use std::future::Future;
573    ///
574    /// struct MyOp;
575    /// impl Operation for MyOp {
576    ///     fn kind(&self) -> &str { "my-service" }
577    ///     fn execute(&self) -> Pin<Box<dyn Future<Output = Result<Value, EngineError>> + Send + '_>> {
578    ///         Box::pin(async { Ok(json!({"ok": true})) })
579    ///     }
580    /// }
581    ///
582    /// # async fn example(ctx: &mut WorkflowContext) -> Result<(), EngineError> {
583    /// let result = ctx.operation("call-service", &MyOp).await?;
584    /// println!("output: {}", result.output);
585    /// # Ok(())
586    /// # }
587    /// ```
588    pub async fn operation(
589        &mut self,
590        name: &str,
591        op: &dyn Operation,
592    ) -> Result<StepOutput, EngineError> {
593        let kind = StepKind::Custom(op.kind().to_string());
594        let position = self.position;
595        self.position += 1;
596
597        let step = self
598            .store
599            .create_step(NewStep {
600                run_id: self.run_id,
601                name: name.to_string(),
602                kind,
603                position,
604                input: op.input(),
605            })
606            .await?;
607
608        self.start_step(step.id, Utc::now()).await?;
609
610        let start = Instant::now();
611
612        match op.execute().await {
613            Ok(output_value) => {
614                let duration_ms = start.elapsed().as_millis() as u64;
615                self.total_duration_ms += duration_ms;
616
617                let completed_at = Utc::now();
618                self.store
619                    .update_step(
620                        step.id,
621                        StepUpdate {
622                            status: Some(StepStatus::Completed),
623                            output: Some(output_value.clone()),
624                            duration_ms: Some(duration_ms),
625                            cost_usd: Some(Decimal::ZERO),
626                            completed_at: Some(completed_at),
627                            ..StepUpdate::default()
628                        },
629                    )
630                    .await?;
631
632                info!(
633                    run_id = %self.run_id,
634                    step = %name,
635                    kind = op.kind(),
636                    duration_ms,
637                    "operation step completed"
638                );
639
640                self.last_step_ids = vec![step.id];
641
642                Ok(StepOutput {
643                    output: output_value,
644                    duration_ms,
645                    cost_usd: Decimal::ZERO,
646                    input_tokens: None,
647                    output_tokens: None,
648                    debug_messages: None,
649                })
650            }
651            Err(err) => {
652                let completed_at = Utc::now();
653                if let Err(store_err) = self
654                    .store
655                    .update_step(
656                        step.id,
657                        StepUpdate {
658                            status: Some(StepStatus::Failed),
659                            error: Some(err.to_string()),
660                            completed_at: Some(completed_at),
661                            ..StepUpdate::default()
662                        },
663                    )
664                    .await
665                {
666                    error!(step_id = %step.id, error = %store_err, "failed to persist step failure");
667                }
668
669                Err(err)
670            }
671        }
672    }
673
674    /// Execute a sub-workflow step.
675    ///
676    /// Creates a child run for the named workflow handler, executes it with
677    /// its own steps and lifecycle, and returns a [`StepOutput`] containing
678    /// the child run ID and aggregated metrics.
679    ///
680    /// Requires the context to be created with
681    /// `with_handler_resolver`.
682    ///
683    /// # Errors
684    ///
685    /// Returns [`EngineError::InvalidWorkflow`] if no handler is registered
686    /// with the given name, or if no handler resolver is available.
687    ///
688    /// # Examples
689    ///
690    /// ```no_run
691    /// use ironflow_engine::context::WorkflowContext;
692    /// use ironflow_engine::error::EngineError;
693    /// use serde_json::json;
694    ///
695    /// # async fn example(ctx: &mut WorkflowContext) -> Result<(), EngineError> {
696    /// // let result = ctx.workflow(&MySubWorkflow, json!({})).await?;
697    /// # Ok(())
698    /// # }
699    /// ```
700    pub async fn workflow(
701        &mut self,
702        handler: &dyn WorkflowHandler,
703        payload: Value,
704    ) -> Result<StepOutput, EngineError> {
705        let config = WorkflowStepConfig::new(handler.name(), payload);
706        let position = self.position;
707        self.position += 1;
708
709        let step = self
710            .store
711            .create_step(NewStep {
712                run_id: self.run_id,
713                name: config.workflow_name.clone(),
714                kind: StepKind::Workflow,
715                position,
716                input: Some(serde_json::to_value(&config)?),
717            })
718            .await?;
719
720        self.start_step(step.id, Utc::now()).await?;
721
722        match self.execute_child_workflow(&config).await {
723            Ok(output) => {
724                self.total_cost_usd += output.cost_usd;
725                self.total_duration_ms += output.duration_ms;
726
727                let completed_at = Utc::now();
728                self.store
729                    .update_step(
730                        step.id,
731                        StepUpdate {
732                            status: Some(StepStatus::Completed),
733                            output: Some(output.output.clone()),
734                            duration_ms: Some(output.duration_ms),
735                            cost_usd: Some(output.cost_usd),
736                            completed_at: Some(completed_at),
737                            ..StepUpdate::default()
738                        },
739                    )
740                    .await?;
741
742                info!(
743                    run_id = %self.run_id,
744                    child_workflow = %config.workflow_name,
745                    duration_ms = output.duration_ms,
746                    "workflow step completed"
747                );
748
749                self.last_step_ids = vec![step.id];
750
751                Ok(output)
752            }
753            Err(err) => {
754                let completed_at = Utc::now();
755                if let Err(store_err) = self
756                    .store
757                    .update_step(
758                        step.id,
759                        StepUpdate {
760                            status: Some(StepStatus::Failed),
761                            error: Some(err.to_string()),
762                            completed_at: Some(completed_at),
763                            ..StepUpdate::default()
764                        },
765                    )
766                    .await
767                {
768                    error!(step_id = %step.id, error = %store_err, "failed to persist step failure");
769                }
770
771                Err(err)
772            }
773        }
774    }
775
776    /// Execute a child workflow and return aggregated output.
777    async fn execute_child_workflow(
778        &self,
779        config: &WorkflowStepConfig,
780    ) -> Result<StepOutput, EngineError> {
781        let resolver = self.handler_resolver.as_ref().ok_or_else(|| {
782            EngineError::InvalidWorkflow(
783                "sub-workflow requires a handler resolver (use Engine to execute)".to_string(),
784            )
785        })?;
786
787        let handler = resolver(&config.workflow_name).ok_or_else(|| {
788            EngineError::InvalidWorkflow(format!("no handler registered: {}", config.workflow_name))
789        })?;
790
791        let child_run = self
792            .store
793            .create_run(NewRun {
794                workflow_name: config.workflow_name.clone(),
795                trigger: TriggerKind::Workflow,
796                payload: config.payload.clone(),
797                max_retries: 0,
798            })
799            .await?;
800
801        let child_run_id = child_run.id;
802        info!(
803            parent_run_id = %self.run_id,
804            child_run_id = %child_run_id,
805            workflow = %config.workflow_name,
806            "child run created"
807        );
808
809        self.store
810            .update_run_status(child_run_id, RunStatus::Running)
811            .await?;
812
813        let run_start = Instant::now();
814        let mut child_ctx = WorkflowContext {
815            run_id: child_run_id,
816            store: self.store.clone(),
817            provider: self.provider.clone(),
818            handler_resolver: self.handler_resolver.clone(),
819            position: 0,
820            last_step_ids: Vec::new(),
821            total_cost_usd: Decimal::ZERO,
822            total_duration_ms: 0,
823            replay_steps: std::collections::HashMap::new(),
824        };
825
826        let result = handler.execute(&mut child_ctx).await;
827        let total_duration = run_start.elapsed().as_millis() as u64;
828        let completed_at = Utc::now();
829
830        match result {
831            Ok(()) => {
832                self.store
833                    .update_run(
834                        child_run_id,
835                        RunUpdate {
836                            status: Some(RunStatus::Completed),
837                            cost_usd: Some(child_ctx.total_cost_usd),
838                            duration_ms: Some(total_duration),
839                            completed_at: Some(completed_at),
840                            ..RunUpdate::default()
841                        },
842                    )
843                    .await?;
844
845                Ok(StepOutput {
846                    output: serde_json::json!({
847                        "run_id": child_run_id,
848                        "workflow_name": config.workflow_name,
849                        "status": RunStatus::Completed,
850                        "cost_usd": child_ctx.total_cost_usd,
851                        "duration_ms": total_duration,
852                    }),
853                    duration_ms: total_duration,
854                    cost_usd: child_ctx.total_cost_usd,
855                    input_tokens: None,
856                    output_tokens: None,
857                    debug_messages: None,
858                })
859            }
860            Err(err) => {
861                if let Err(store_err) = self
862                    .store
863                    .update_run(
864                        child_run_id,
865                        RunUpdate {
866                            status: Some(RunStatus::Failed),
867                            error: Some(err.to_string()),
868                            cost_usd: Some(child_ctx.total_cost_usd),
869                            duration_ms: Some(total_duration),
870                            completed_at: Some(completed_at),
871                            ..RunUpdate::default()
872                        },
873                    )
874                    .await
875                {
876                    error!(
877                        child_run_id = %child_run_id,
878                        store_error = %store_err,
879                        "failed to persist child run failure"
880                    );
881                }
882
883                Err(err)
884            }
885        }
886    }
887
888    /// Try to replay a completed step from a previous execution.
889    ///
890    /// Returns `Some(StepOutput)` if a completed step exists at the given
891    /// position, `None` otherwise.
892    fn try_replay_step(&mut self, position: u32) -> Option<StepOutput> {
893        let step = self.replay_steps.get(&position)?;
894        if step.status.state != StepStatus::Completed {
895            return None;
896        }
897        let output = StepOutput {
898            output: step.output.clone().unwrap_or(Value::Null),
899            duration_ms: step.duration_ms,
900            cost_usd: step.cost_usd,
901            input_tokens: step.input_tokens,
902            output_tokens: step.output_tokens,
903            debug_messages: None,
904        };
905        self.total_cost_usd += output.cost_usd;
906        self.total_duration_ms += output.duration_ms;
907        self.last_step_ids = vec![step.id];
908        info!(
909            run_id = %self.run_id,
910            step = %step.name,
911            position,
912            "step replayed from previous execution"
913        );
914        Some(output)
915    }
916
917    /// Internal: execute a step with full persistence lifecycle.
918    async fn execute_step(
919        &mut self,
920        name: &str,
921        kind: StepKind,
922        config: StepConfig,
923    ) -> Result<StepOutput, EngineError> {
924        let position = self.position;
925        self.position += 1;
926
927        // Replay: if this step already completed in a prior execution, return cached output.
928        if let Some(output) = self.try_replay_step(position) {
929            return Ok(output);
930        }
931
932        // Create step record in Pending.
933        let step = self
934            .store
935            .create_step(NewStep {
936                run_id: self.run_id,
937                name: name.to_string(),
938                kind,
939                position,
940                input: Some(serde_json::to_value(&config)?),
941            })
942            .await?;
943
944        self.start_step(step.id, Utc::now()).await?;
945
946        match execute_step_config(&config, &self.provider).await {
947            Ok(output) => {
948                self.total_cost_usd += output.cost_usd;
949                self.total_duration_ms += output.duration_ms;
950
951                let debug_messages_json = output.debug_messages_json();
952
953                let completed_at = Utc::now();
954                self.store
955                    .update_step(
956                        step.id,
957                        StepUpdate {
958                            status: Some(StepStatus::Completed),
959                            output: Some(output.output.clone()),
960                            duration_ms: Some(output.duration_ms),
961                            cost_usd: Some(output.cost_usd),
962                            input_tokens: output.input_tokens,
963                            output_tokens: output.output_tokens,
964                            completed_at: Some(completed_at),
965                            debug_messages: debug_messages_json,
966                            ..StepUpdate::default()
967                        },
968                    )
969                    .await?;
970
971                info!(
972                    run_id = %self.run_id,
973                    step = %name,
974                    duration_ms = output.duration_ms,
975                    "step completed"
976                );
977
978                self.last_step_ids = vec![step.id];
979
980                Ok(output)
981            }
982            Err(err) => {
983                let completed_at = Utc::now();
984                let debug_messages_json = extract_debug_messages_from_error(&err);
985
986                if let Err(store_err) = self
987                    .store
988                    .update_step(
989                        step.id,
990                        StepUpdate {
991                            status: Some(StepStatus::Failed),
992                            error: Some(err.to_string()),
993                            completed_at: Some(completed_at),
994                            debug_messages: debug_messages_json,
995                            ..StepUpdate::default()
996                        },
997                    )
998                    .await
999                {
1000                    tracing::error!(step_id = %step.id, error = %store_err, "failed to persist step failure");
1001                }
1002
1003                Err(err)
1004            }
1005        }
1006    }
1007
1008    /// Record dependency edges and transition a step to Running.
1009    ///
1010    /// Records edges from `step_id` to all `last_step_ids`, then
1011    /// transitions the step to `Running` with the given timestamp.
1012    async fn start_step(&self, step_id: Uuid, now: DateTime<Utc>) -> Result<(), EngineError> {
1013        if !self.last_step_ids.is_empty() {
1014            let deps: Vec<NewStepDependency> = self
1015                .last_step_ids
1016                .iter()
1017                .map(|&depends_on| NewStepDependency {
1018                    step_id,
1019                    depends_on,
1020                })
1021                .collect();
1022            self.store.create_step_dependencies(deps).await?;
1023        }
1024
1025        self.store
1026            .update_step(
1027                step_id,
1028                StepUpdate {
1029                    status: Some(StepStatus::Running),
1030                    started_at: Some(now),
1031                    ..StepUpdate::default()
1032                },
1033            )
1034            .await?;
1035
1036        Ok(())
1037    }
1038
1039    /// Access the store directly (advanced usage).
1040    pub fn store(&self) -> &Arc<dyn RunStore> {
1041        &self.store
1042    }
1043
1044    /// Access the payload that triggered this run.
1045    ///
1046    /// Fetches the run from the store and returns its payload.
1047    ///
1048    /// # Errors
1049    ///
1050    /// Returns [`EngineError::Store`] if the run is not found.
1051    pub async fn payload(&self) -> Result<Value, EngineError> {
1052        let run = self
1053            .store
1054            .get_run(self.run_id)
1055            .await?
1056            .ok_or(EngineError::Store(
1057                ironflow_store::error::StoreError::RunNotFound(self.run_id),
1058            ))?;
1059        Ok(run.payload)
1060    }
1061}
1062
1063impl fmt::Debug for WorkflowContext {
1064    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1065        f.debug_struct("WorkflowContext")
1066            .field("run_id", &self.run_id)
1067            .field("position", &self.position)
1068            .field("total_cost_usd", &self.total_cost_usd)
1069            .finish_non_exhaustive()
1070    }
1071}
1072
1073/// Extract debug messages from an engine error, if it wraps a schema validation
1074/// failure that carries a verbose conversation trace.
1075fn extract_debug_messages_from_error(err: &EngineError) -> Option<Value> {
1076    if let EngineError::Operation(OperationError::Agent(AgentError::SchemaValidation {
1077        debug_messages,
1078        ..
1079    })) = err
1080        && !debug_messages.is_empty()
1081    {
1082        return serde_json::to_value(debug_messages).ok();
1083    }
1084    None
1085}