Skip to main content

ironflow_engine/
context.rs

1//! [`WorkflowContext`] — execution context for dynamic workflows.
2//!
3//! Provides step execution methods that automatically persist results to the
4//! store. Each call to [`shell`](WorkflowContext::shell),
5//! [`http`](WorkflowContext::http), [`agent`](WorkflowContext::agent), or
6//! [`workflow`](WorkflowContext::workflow) creates a step record, executes the
7//! operation, captures the output, and returns a [`StepOutput`] that the next
8//! step can reference.
9//!
10//! # Examples
11//!
12//! ```no_run
13//! use ironflow_engine::context::WorkflowContext;
14//! use ironflow_engine::config::{ShellConfig, AgentStepConfig};
15//! use ironflow_engine::error::EngineError;
16//!
17//! # async fn example(ctx: &mut WorkflowContext) -> Result<(), EngineError> {
18//! let build = ctx.shell("build", ShellConfig::new("cargo build")).await?;
19//! let review = ctx.agent("review", AgentStepConfig::new(
20//!     &format!("Build output:\n{}", build.output["stdout"])
21//! )).await?;
22//! # Ok(())
23//! # }
24//! ```
25
26use std::fmt;
27use std::sync::Arc;
28use std::time::Instant;
29
30use chrono::{DateTime, Utc};
31use rust_decimal::Decimal;
32use serde_json::Value;
33use tokio::task::JoinSet;
34use tracing::{error, info};
35use uuid::Uuid;
36
37use ironflow_core::provider::AgentProvider;
38use ironflow_store::models::{
39    NewRun, NewStep, NewStepDependency, RunStatus, RunUpdate, Step, StepKind, StepStatus,
40    StepUpdate, TriggerKind,
41};
42use ironflow_store::store::RunStore;
43
44use crate::config::{
45    AgentStepConfig, ApprovalConfig, HttpConfig, ShellConfig, StepConfig, WorkflowStepConfig,
46};
47use crate::error::EngineError;
48use crate::executor::{ParallelStepResult, StepOutput, execute_step_config};
49use crate::handler::WorkflowHandler;
50use crate::operation::Operation;
51
/// Callback type for resolving workflow handlers by name.
///
/// Given a workflow name, the callback returns the matching handler, or
/// `None` when no handler is known under that name. The callback is shared
/// behind an [`Arc`] and must be `Send + Sync`.
pub(crate) type HandlerResolver =
    Arc<dyn Fn(&str) -> Option<Arc<dyn WorkflowHandler>> + Send + Sync>;
55
/// Execution context for a single workflow run.
///
/// Tracks the current step position and provides convenience methods
/// for executing operations with automatic persistence. It also keeps
/// running totals of cost and duration, and the IDs of the most recently
/// executed step(s).
///
/// # Examples
///
/// ```no_run
/// use ironflow_engine::context::WorkflowContext;
/// use ironflow_engine::config::ShellConfig;
/// use ironflow_engine::error::EngineError;
///
/// # async fn example(ctx: &mut WorkflowContext) -> Result<(), EngineError> {
/// let result = ctx.shell("greet", ShellConfig::new("echo hello")).await?;
/// assert!(result.output["stdout"].as_str().unwrap().contains("hello"));
/// # Ok(())
/// # }
/// ```
pub struct WorkflowContext {
    /// ID of the run whose steps this context creates and updates.
    run_id: Uuid,
    /// Store used to persist runs and steps.
    store: Arc<dyn RunStore>,
    /// Agent provider handed to step execution.
    provider: Arc<dyn AgentProvider>,
    /// Resolver for sub-workflow handlers; `None` when built via `new`.
    handler_resolver: Option<HandlerResolver>,
    /// Position given to the next step; bumped once per step or parallel wave.
    position: u32,
    /// IDs of the last executed step(s) -- used to record DAG dependencies.
    last_step_ids: Vec<Uuid>,
    /// Accumulated cost across all steps in this run.
    total_cost_usd: Decimal,
    /// Accumulated duration across all steps.
    total_duration_ms: u64,
    /// Steps from a previous execution, keyed by position.
    /// Used when resuming after approval to replay completed steps.
    replay_steps: std::collections::HashMap<u32, Step>,
}
90
91impl WorkflowContext {
92    /// Create a new context for a run.
93    ///
94    /// Not typically called directly — the [`Engine`](crate::engine::Engine)
95    /// creates this when executing a [`WorkflowHandler`].
96    pub fn new(run_id: Uuid, store: Arc<dyn RunStore>, provider: Arc<dyn AgentProvider>) -> Self {
97        Self {
98            run_id,
99            store,
100            provider,
101            handler_resolver: None,
102            position: 0,
103            last_step_ids: Vec::new(),
104            total_cost_usd: Decimal::ZERO,
105            total_duration_ms: 0,
106            replay_steps: std::collections::HashMap::new(),
107        }
108    }
109
110    /// Create a new context with a handler resolver for sub-workflow support.
111    ///
112    /// The resolver is called when [`workflow`](Self::workflow) is invoked to
113    /// look up registered handlers by name.
114    pub(crate) fn with_handler_resolver(
115        run_id: Uuid,
116        store: Arc<dyn RunStore>,
117        provider: Arc<dyn AgentProvider>,
118        resolver: HandlerResolver,
119    ) -> Self {
120        Self {
121            run_id,
122            store,
123            provider,
124            handler_resolver: Some(resolver),
125            position: 0,
126            last_step_ids: Vec::new(),
127            total_cost_usd: Decimal::ZERO,
128            total_duration_ms: 0,
129            replay_steps: std::collections::HashMap::new(),
130        }
131    }
132
133    /// Load existing steps from the store for replay after approval.
134    ///
135    /// Called by the engine when resuming a run. All completed steps
136    /// and the approved approval step are indexed by position so that
137    /// `execute_step` and `approval` can skip them.
138    pub(crate) async fn load_replay_steps(&mut self) -> Result<(), EngineError> {
139        let steps = self.store.list_steps(self.run_id).await?;
140        for step in steps {
141            let dominated = matches!(
142                step.status.state,
143                StepStatus::Completed | StepStatus::Running
144            );
145            if dominated {
146                self.replay_steps.insert(step.position, step);
147            }
148        }
149        Ok(())
150    }
151
152    /// The run ID this context is executing for.
153    pub fn run_id(&self) -> Uuid {
154        self.run_id
155    }
156
157    /// Accumulated cost across all executed steps so far.
158    pub fn total_cost_usd(&self) -> Decimal {
159        self.total_cost_usd
160    }
161
162    /// Accumulated duration across all executed steps so far.
163    pub fn total_duration_ms(&self) -> u64 {
164        self.total_duration_ms
165    }
166
167    /// Execute multiple steps concurrently (wait-all model).
168    ///
169    /// All steps in the batch execute in parallel via `tokio::JoinSet`.
170    /// Each step is recorded with the same `position` (execution wave).
171    /// Dependencies on previous steps are recorded automatically.
172    ///
173    /// When `fail_fast` is true, remaining steps are aborted on the first
174    /// failure. When false, all steps run to completion and the first
175    /// error is returned.
176    ///
177    /// # Errors
178    ///
179    /// Returns [`EngineError`] if any step fails.
180    ///
181    /// # Examples
182    ///
183    /// ```no_run
184    /// use ironflow_engine::context::WorkflowContext;
185    /// use ironflow_engine::config::{StepConfig, ShellConfig};
186    /// use ironflow_engine::error::EngineError;
187    ///
188    /// # async fn example(ctx: &mut WorkflowContext) -> Result<(), EngineError> {
189    /// let results = ctx.parallel(
190    ///     vec![
191    ///         ("test-unit", StepConfig::Shell(ShellConfig::new("cargo test --lib"))),
192    ///         ("lint", StepConfig::Shell(ShellConfig::new("cargo clippy"))),
193    ///     ],
194    ///     true,
195    /// ).await?;
196    ///
197    /// for r in &results {
198    ///     println!("{}: {:?}", r.name, r.output.output);
199    /// }
200    /// # Ok(())
201    /// # }
202    /// ```
203    pub async fn parallel(
204        &mut self,
205        steps: Vec<(&str, StepConfig)>,
206        fail_fast: bool,
207    ) -> Result<Vec<ParallelStepResult>, EngineError> {
208        if steps.is_empty() {
209            return Ok(Vec::new());
210        }
211
212        let wave_position = self.position;
213        self.position += 1;
214
215        let now = Utc::now();
216        let mut step_records: Vec<(Uuid, String, StepConfig)> = Vec::with_capacity(steps.len());
217
218        for (name, config) in &steps {
219            let kind = config.kind();
220            let step = self
221                .store
222                .create_step(NewStep {
223                    run_id: self.run_id,
224                    name: name.to_string(),
225                    kind,
226                    position: wave_position,
227                    input: Some(serde_json::to_value(config)?),
228                })
229                .await?;
230
231            self.start_step(step.id, now).await?;
232
233            step_records.push((step.id, name.to_string(), config.clone()));
234        }
235
236        let mut join_set = JoinSet::new();
237        for (idx, (_id, _name, config)) in step_records.iter().enumerate() {
238            let provider = self.provider.clone();
239            let config = config.clone();
240            join_set.spawn(async move { (idx, execute_step_config(&config, &provider).await) });
241        }
242
243        // JoinSet returns in completion order; indexed_results restores input order.
244        let mut indexed_results: Vec<Option<Result<StepOutput, String>>> =
245            vec![None; step_records.len()];
246        let mut first_error: Option<EngineError> = None;
247
248        while let Some(join_result) = join_set.join_next().await {
249            let (idx, step_result) = match join_result {
250                Ok(r) => r,
251                Err(e) => {
252                    if first_error.is_none() {
253                        first_error = Some(EngineError::StepConfig(format!("join error: {e}")));
254                    }
255                    if fail_fast {
256                        join_set.abort_all();
257                    }
258                    continue;
259                }
260            };
261
262            let (step_id, step_name, _) = &step_records[idx];
263            let completed_at = Utc::now();
264
265            match step_result {
266                Ok(output) => {
267                    self.total_cost_usd += output.cost_usd;
268                    self.total_duration_ms += output.duration_ms;
269
270                    self.store
271                        .update_step(
272                            *step_id,
273                            StepUpdate {
274                                status: Some(StepStatus::Completed),
275                                output: Some(output.output.clone()),
276                                duration_ms: Some(output.duration_ms),
277                                cost_usd: Some(output.cost_usd),
278                                input_tokens: output.input_tokens,
279                                output_tokens: output.output_tokens,
280                                completed_at: Some(completed_at),
281                                ..StepUpdate::default()
282                            },
283                        )
284                        .await?;
285
286                    info!(
287                        run_id = %self.run_id,
288                        step = %step_name,
289                        duration_ms = output.duration_ms,
290                        "parallel step completed"
291                    );
292
293                    indexed_results[idx] = Some(Ok(output));
294                }
295                Err(err) => {
296                    let err_msg = err.to_string();
297
298                    if let Err(store_err) = self
299                        .store
300                        .update_step(
301                            *step_id,
302                            StepUpdate {
303                                status: Some(StepStatus::Failed),
304                                error: Some(err_msg.clone()),
305                                completed_at: Some(completed_at),
306                                ..StepUpdate::default()
307                            },
308                        )
309                        .await
310                    {
311                        tracing::error!(
312                            step_id = %step_id,
313                            error = %store_err,
314                            "failed to persist parallel step failure"
315                        );
316                    }
317
318                    indexed_results[idx] = Some(Err(err_msg.clone()));
319
320                    if first_error.is_none() {
321                        first_error = Some(err);
322                    }
323
324                    if fail_fast {
325                        join_set.abort_all();
326                    }
327                }
328            }
329        }
330
331        if let Some(err) = first_error {
332            return Err(err);
333        }
334
335        self.last_step_ids = step_records.iter().map(|(id, _, _)| *id).collect();
336
337        // Build results in original order.
338        let results: Vec<ParallelStepResult> = step_records
339            .iter()
340            .enumerate()
341            .map(|(idx, (step_id, name, _))| {
342                let output = match indexed_results[idx].take() {
343                    Some(Ok(o)) => o,
344                    _ => unreachable!("all steps succeeded if no error returned"),
345                };
346                ParallelStepResult {
347                    name: name.clone(),
348                    output,
349                    step_id: *step_id,
350                }
351            })
352            .collect();
353
354        Ok(results)
355    }
356
357    /// Execute a shell step.
358    ///
359    /// Creates the step record, runs the command, persists the result,
360    /// and returns the output for use in subsequent steps.
361    ///
362    /// # Errors
363    ///
364    /// Returns [`EngineError`] if the command fails or the store errors.
365    ///
366    /// # Examples
367    ///
368    /// ```no_run
369    /// use ironflow_engine::context::WorkflowContext;
370    /// use ironflow_engine::config::ShellConfig;
371    /// use ironflow_engine::error::EngineError;
372    ///
373    /// # async fn example(ctx: &mut WorkflowContext) -> Result<(), EngineError> {
374    /// let files = ctx.shell("list", ShellConfig::new("ls -la")).await?;
375    /// println!("stdout: {}", files.output["stdout"]);
376    /// # Ok(())
377    /// # }
378    /// ```
379    pub async fn shell(
380        &mut self,
381        name: &str,
382        config: ShellConfig,
383    ) -> Result<StepOutput, EngineError> {
384        self.execute_step(name, StepKind::Shell, StepConfig::Shell(config))
385            .await
386    }
387
388    /// Execute an HTTP step.
389    ///
390    /// # Errors
391    ///
392    /// Returns [`EngineError`] if the request fails or the store errors.
393    ///
394    /// # Examples
395    ///
396    /// ```no_run
397    /// use ironflow_engine::context::WorkflowContext;
398    /// use ironflow_engine::config::HttpConfig;
399    /// use ironflow_engine::error::EngineError;
400    ///
401    /// # async fn example(ctx: &mut WorkflowContext) -> Result<(), EngineError> {
402    /// let resp = ctx.http("health", HttpConfig::get("https://api.example.com/health")).await?;
403    /// println!("status: {}", resp.output["status"]);
404    /// # Ok(())
405    /// # }
406    /// ```
407    pub async fn http(
408        &mut self,
409        name: &str,
410        config: HttpConfig,
411    ) -> Result<StepOutput, EngineError> {
412        self.execute_step(name, StepKind::Http, StepConfig::Http(config))
413            .await
414    }
415
416    /// Execute an agent step.
417    ///
418    /// # Errors
419    ///
420    /// Returns [`EngineError`] if the agent invocation fails or the store errors.
421    ///
422    /// # Examples
423    ///
424    /// ```no_run
425    /// use ironflow_engine::context::WorkflowContext;
426    /// use ironflow_engine::config::AgentStepConfig;
427    /// use ironflow_engine::error::EngineError;
428    ///
429    /// # async fn example(ctx: &mut WorkflowContext) -> Result<(), EngineError> {
430    /// let review = ctx.agent("review", AgentStepConfig::new("Review the code")).await?;
431    /// println!("review: {}", review.output["value"]);
432    /// # Ok(())
433    /// # }
434    /// ```
435    pub async fn agent(
436        &mut self,
437        name: &str,
438        config: AgentStepConfig,
439    ) -> Result<StepOutput, EngineError> {
440        self.execute_step(name, StepKind::Agent, StepConfig::Agent(config))
441            .await
442    }
443
444    /// Create a human approval gate.
445    ///
446    /// On first execution, records an approval step and returns
447    /// [`EngineError::ApprovalRequired`] to suspend the run. The engine
448    /// transitions the run to `AwaitingApproval`.
449    ///
450    /// On resume (after a human approved via the API), the approval step
451    /// is replayed: it is marked as `Completed` and execution continues
452    /// past it. Multiple approval gates in the same handler work -- each
453    /// one pauses and resumes independently.
454    ///
455    /// # Errors
456    ///
457    /// Returns [`EngineError::ApprovalRequired`] to pause the run on
458    /// first execution. Returns other [`EngineError`] variants on store
459    /// failures.
460    ///
461    /// # Examples
462    ///
463    /// ```no_run
464    /// use ironflow_engine::context::WorkflowContext;
465    /// use ironflow_engine::config::ApprovalConfig;
466    /// use ironflow_engine::error::EngineError;
467    ///
468    /// # async fn example(ctx: &mut WorkflowContext) -> Result<(), EngineError> {
469    /// ctx.approval("deploy-gate", ApprovalConfig::new("Approve deployment?")).await?;
470    /// // Execution continues here after approval
471    /// # Ok(())
472    /// # }
473    /// ```
474    pub async fn approval(
475        &mut self,
476        name: &str,
477        config: ApprovalConfig,
478    ) -> Result<(), EngineError> {
479        let position = self.position;
480        self.position += 1;
481
482        // Replay: if this approval step exists from a prior execution,
483        // the run was approved -- mark it completed (if not already) and continue.
484        if let Some(existing) = self.replay_steps.get(&position)
485            && existing.kind == StepKind::Approval
486        {
487            if existing.status.state == StepStatus::Running {
488                self.store
489                    .update_step(
490                        existing.id,
491                        StepUpdate {
492                            status: Some(StepStatus::Completed),
493                            completed_at: Some(Utc::now()),
494                            ..StepUpdate::default()
495                        },
496                    )
497                    .await?;
498            }
499
500            self.last_step_ids = vec![existing.id];
501            info!(
502                run_id = %self.run_id,
503                step = %name,
504                position,
505                "approval step replayed (approved)"
506            );
507            return Ok(());
508        }
509
510        // First execution: create the approval step and suspend.
511        let step = self
512            .store
513            .create_step(NewStep {
514                run_id: self.run_id,
515                name: name.to_string(),
516                kind: StepKind::Approval,
517                position,
518                input: Some(serde_json::to_value(&config)?),
519            })
520            .await?;
521
522        self.start_step(step.id, Utc::now()).await?;
523
524        self.last_step_ids = vec![step.id];
525
526        Err(EngineError::ApprovalRequired {
527            run_id: self.run_id,
528            step_id: step.id,
529            message: config.message().to_string(),
530        })
531    }
532
533    /// Execute a custom operation step.
534    ///
535    /// Runs a user-defined [`Operation`] with full step lifecycle management:
536    /// creates the step record, transitions to Running, executes the operation,
537    /// persists the output and duration, and marks the step Completed or Failed.
538    ///
539    /// The operation's [`kind()`](Operation::kind) is stored as
540    /// [`StepKind::Custom`].
541    ///
542    /// # Errors
543    ///
544    /// Returns [`EngineError`] if the operation fails or the store errors.
545    ///
546    /// # Examples
547    ///
548    /// ```no_run
549    /// use ironflow_engine::context::WorkflowContext;
550    /// use ironflow_engine::operation::Operation;
551    /// use ironflow_engine::error::EngineError;
552    /// use serde_json::{Value, json};
553    /// use std::pin::Pin;
554    /// use std::future::Future;
555    ///
556    /// struct MyOp;
557    /// impl Operation for MyOp {
558    ///     fn kind(&self) -> &str { "my-service" }
559    ///     fn execute(&self) -> Pin<Box<dyn Future<Output = Result<Value, EngineError>> + Send + '_>> {
560    ///         Box::pin(async { Ok(json!({"ok": true})) })
561    ///     }
562    /// }
563    ///
564    /// # async fn example(ctx: &mut WorkflowContext) -> Result<(), EngineError> {
565    /// let result = ctx.operation("call-service", &MyOp).await?;
566    /// println!("output: {}", result.output);
567    /// # Ok(())
568    /// # }
569    /// ```
570    pub async fn operation(
571        &mut self,
572        name: &str,
573        op: &dyn Operation,
574    ) -> Result<StepOutput, EngineError> {
575        let kind = StepKind::Custom(op.kind().to_string());
576        let position = self.position;
577        self.position += 1;
578
579        let step = self
580            .store
581            .create_step(NewStep {
582                run_id: self.run_id,
583                name: name.to_string(),
584                kind,
585                position,
586                input: op.input(),
587            })
588            .await?;
589
590        self.start_step(step.id, Utc::now()).await?;
591
592        let start = Instant::now();
593
594        match op.execute().await {
595            Ok(output_value) => {
596                let duration_ms = start.elapsed().as_millis() as u64;
597                self.total_duration_ms += duration_ms;
598
599                let completed_at = Utc::now();
600                self.store
601                    .update_step(
602                        step.id,
603                        StepUpdate {
604                            status: Some(StepStatus::Completed),
605                            output: Some(output_value.clone()),
606                            duration_ms: Some(duration_ms),
607                            cost_usd: Some(Decimal::ZERO),
608                            completed_at: Some(completed_at),
609                            ..StepUpdate::default()
610                        },
611                    )
612                    .await?;
613
614                info!(
615                    run_id = %self.run_id,
616                    step = %name,
617                    kind = op.kind(),
618                    duration_ms,
619                    "operation step completed"
620                );
621
622                self.last_step_ids = vec![step.id];
623
624                Ok(StepOutput {
625                    output: output_value,
626                    duration_ms,
627                    cost_usd: Decimal::ZERO,
628                    input_tokens: None,
629                    output_tokens: None,
630                })
631            }
632            Err(err) => {
633                let completed_at = Utc::now();
634                if let Err(store_err) = self
635                    .store
636                    .update_step(
637                        step.id,
638                        StepUpdate {
639                            status: Some(StepStatus::Failed),
640                            error: Some(err.to_string()),
641                            completed_at: Some(completed_at),
642                            ..StepUpdate::default()
643                        },
644                    )
645                    .await
646                {
647                    error!(step_id = %step.id, error = %store_err, "failed to persist step failure");
648                }
649
650                Err(err)
651            }
652        }
653    }
654
655    /// Execute a sub-workflow step.
656    ///
657    /// Creates a child run for the named workflow handler, executes it with
658    /// its own steps and lifecycle, and returns a [`StepOutput`] containing
659    /// the child run ID and aggregated metrics.
660    ///
661    /// Requires the context to be created with
662    /// `with_handler_resolver`.
663    ///
664    /// # Errors
665    ///
666    /// Returns [`EngineError::InvalidWorkflow`] if no handler is registered
667    /// with the given name, or if no handler resolver is available.
668    ///
669    /// # Examples
670    ///
671    /// ```no_run
672    /// use ironflow_engine::context::WorkflowContext;
673    /// use ironflow_engine::error::EngineError;
674    /// use serde_json::json;
675    ///
676    /// # async fn example(ctx: &mut WorkflowContext) -> Result<(), EngineError> {
677    /// // let result = ctx.workflow(&MySubWorkflow, json!({})).await?;
678    /// # Ok(())
679    /// # }
680    /// ```
681    pub async fn workflow(
682        &mut self,
683        handler: &dyn WorkflowHandler,
684        payload: Value,
685    ) -> Result<StepOutput, EngineError> {
686        let config = WorkflowStepConfig::new(handler.name(), payload);
687        let position = self.position;
688        self.position += 1;
689
690        let step = self
691            .store
692            .create_step(NewStep {
693                run_id: self.run_id,
694                name: config.workflow_name.clone(),
695                kind: StepKind::Workflow,
696                position,
697                input: Some(serde_json::to_value(&config)?),
698            })
699            .await?;
700
701        self.start_step(step.id, Utc::now()).await?;
702
703        match self.execute_child_workflow(&config).await {
704            Ok(output) => {
705                self.total_cost_usd += output.cost_usd;
706                self.total_duration_ms += output.duration_ms;
707
708                let completed_at = Utc::now();
709                self.store
710                    .update_step(
711                        step.id,
712                        StepUpdate {
713                            status: Some(StepStatus::Completed),
714                            output: Some(output.output.clone()),
715                            duration_ms: Some(output.duration_ms),
716                            cost_usd: Some(output.cost_usd),
717                            completed_at: Some(completed_at),
718                            ..StepUpdate::default()
719                        },
720                    )
721                    .await?;
722
723                info!(
724                    run_id = %self.run_id,
725                    child_workflow = %config.workflow_name,
726                    duration_ms = output.duration_ms,
727                    "workflow step completed"
728                );
729
730                self.last_step_ids = vec![step.id];
731
732                Ok(output)
733            }
734            Err(err) => {
735                let completed_at = Utc::now();
736                if let Err(store_err) = self
737                    .store
738                    .update_step(
739                        step.id,
740                        StepUpdate {
741                            status: Some(StepStatus::Failed),
742                            error: Some(err.to_string()),
743                            completed_at: Some(completed_at),
744                            ..StepUpdate::default()
745                        },
746                    )
747                    .await
748                {
749                    error!(step_id = %step.id, error = %store_err, "failed to persist step failure");
750                }
751
752                Err(err)
753            }
754        }
755    }
756
757    /// Execute a child workflow and return aggregated output.
758    async fn execute_child_workflow(
759        &self,
760        config: &WorkflowStepConfig,
761    ) -> Result<StepOutput, EngineError> {
762        let resolver = self.handler_resolver.as_ref().ok_or_else(|| {
763            EngineError::InvalidWorkflow(
764                "sub-workflow requires a handler resolver (use Engine to execute)".to_string(),
765            )
766        })?;
767
768        let handler = resolver(&config.workflow_name).ok_or_else(|| {
769            EngineError::InvalidWorkflow(format!("no handler registered: {}", config.workflow_name))
770        })?;
771
772        let child_run = self
773            .store
774            .create_run(NewRun {
775                workflow_name: config.workflow_name.clone(),
776                trigger: TriggerKind::Workflow,
777                payload: config.payload.clone(),
778                max_retries: 0,
779            })
780            .await?;
781
782        let child_run_id = child_run.id;
783        info!(
784            parent_run_id = %self.run_id,
785            child_run_id = %child_run_id,
786            workflow = %config.workflow_name,
787            "child run created"
788        );
789
790        self.store
791            .update_run_status(child_run_id, RunStatus::Running)
792            .await?;
793
794        let run_start = Instant::now();
795        let mut child_ctx = WorkflowContext {
796            run_id: child_run_id,
797            store: self.store.clone(),
798            provider: self.provider.clone(),
799            handler_resolver: self.handler_resolver.clone(),
800            position: 0,
801            last_step_ids: Vec::new(),
802            total_cost_usd: Decimal::ZERO,
803            total_duration_ms: 0,
804            replay_steps: std::collections::HashMap::new(),
805        };
806
807        let result = handler.execute(&mut child_ctx).await;
808        let total_duration = run_start.elapsed().as_millis() as u64;
809        let completed_at = Utc::now();
810
811        match result {
812            Ok(()) => {
813                self.store
814                    .update_run(
815                        child_run_id,
816                        RunUpdate {
817                            status: Some(RunStatus::Completed),
818                            cost_usd: Some(child_ctx.total_cost_usd),
819                            duration_ms: Some(total_duration),
820                            completed_at: Some(completed_at),
821                            ..RunUpdate::default()
822                        },
823                    )
824                    .await?;
825
826                Ok(StepOutput {
827                    output: serde_json::json!({
828                        "run_id": child_run_id,
829                        "workflow_name": config.workflow_name,
830                        "status": RunStatus::Completed,
831                        "cost_usd": child_ctx.total_cost_usd,
832                        "duration_ms": total_duration,
833                    }),
834                    duration_ms: total_duration,
835                    cost_usd: child_ctx.total_cost_usd,
836                    input_tokens: None,
837                    output_tokens: None,
838                })
839            }
840            Err(err) => {
841                if let Err(store_err) = self
842                    .store
843                    .update_run(
844                        child_run_id,
845                        RunUpdate {
846                            status: Some(RunStatus::Failed),
847                            error: Some(err.to_string()),
848                            cost_usd: Some(child_ctx.total_cost_usd),
849                            duration_ms: Some(total_duration),
850                            completed_at: Some(completed_at),
851                            ..RunUpdate::default()
852                        },
853                    )
854                    .await
855                {
856                    error!(
857                        child_run_id = %child_run_id,
858                        store_error = %store_err,
859                        "failed to persist child run failure"
860                    );
861                }
862
863                Err(err)
864            }
865        }
866    }
867
868    /// Try to replay a completed step from a previous execution.
869    ///
870    /// Returns `Some(StepOutput)` if a completed step exists at the given
871    /// position, `None` otherwise.
872    fn try_replay_step(&mut self, position: u32) -> Option<StepOutput> {
873        let step = self.replay_steps.get(&position)?;
874        if step.status.state != StepStatus::Completed {
875            return None;
876        }
877        let output = StepOutput {
878            output: step.output.clone().unwrap_or(Value::Null),
879            duration_ms: step.duration_ms,
880            cost_usd: step.cost_usd,
881            input_tokens: step.input_tokens,
882            output_tokens: step.output_tokens,
883        };
884        self.total_cost_usd += output.cost_usd;
885        self.total_duration_ms += output.duration_ms;
886        self.last_step_ids = vec![step.id];
887        info!(
888            run_id = %self.run_id,
889            step = %step.name,
890            position,
891            "step replayed from previous execution"
892        );
893        Some(output)
894    }
895
896    /// Internal: execute a step with full persistence lifecycle.
897    async fn execute_step(
898        &mut self,
899        name: &str,
900        kind: StepKind,
901        config: StepConfig,
902    ) -> Result<StepOutput, EngineError> {
903        let position = self.position;
904        self.position += 1;
905
906        // Replay: if this step already completed in a prior execution, return cached output.
907        if let Some(output) = self.try_replay_step(position) {
908            return Ok(output);
909        }
910
911        // Create step record in Pending.
912        let step = self
913            .store
914            .create_step(NewStep {
915                run_id: self.run_id,
916                name: name.to_string(),
917                kind,
918                position,
919                input: Some(serde_json::to_value(&config)?),
920            })
921            .await?;
922
923        self.start_step(step.id, Utc::now()).await?;
924
925        match execute_step_config(&config, &self.provider).await {
926            Ok(output) => {
927                self.total_cost_usd += output.cost_usd;
928                self.total_duration_ms += output.duration_ms;
929
930                let completed_at = Utc::now();
931                self.store
932                    .update_step(
933                        step.id,
934                        StepUpdate {
935                            status: Some(StepStatus::Completed),
936                            output: Some(output.output.clone()),
937                            duration_ms: Some(output.duration_ms),
938                            cost_usd: Some(output.cost_usd),
939                            input_tokens: output.input_tokens,
940                            output_tokens: output.output_tokens,
941                            completed_at: Some(completed_at),
942                            ..StepUpdate::default()
943                        },
944                    )
945                    .await?;
946
947                info!(
948                    run_id = %self.run_id,
949                    step = %name,
950                    duration_ms = output.duration_ms,
951                    "step completed"
952                );
953
954                self.last_step_ids = vec![step.id];
955
956                Ok(output)
957            }
958            Err(err) => {
959                let completed_at = Utc::now();
960                if let Err(store_err) = self
961                    .store
962                    .update_step(
963                        step.id,
964                        StepUpdate {
965                            status: Some(StepStatus::Failed),
966                            error: Some(err.to_string()),
967                            completed_at: Some(completed_at),
968                            ..StepUpdate::default()
969                        },
970                    )
971                    .await
972                {
973                    tracing::error!(step_id = %step.id, error = %store_err, "failed to persist step failure");
974                }
975
976                Err(err)
977            }
978        }
979    }
980
981    /// Record dependency edges and transition a step to Running.
982    ///
983    /// Records edges from `step_id` to all `last_step_ids`, then
984    /// transitions the step to `Running` with the given timestamp.
985    async fn start_step(&self, step_id: Uuid, now: DateTime<Utc>) -> Result<(), EngineError> {
986        if !self.last_step_ids.is_empty() {
987            let deps: Vec<NewStepDependency> = self
988                .last_step_ids
989                .iter()
990                .map(|&depends_on| NewStepDependency {
991                    step_id,
992                    depends_on,
993                })
994                .collect();
995            self.store.create_step_dependencies(deps).await?;
996        }
997
998        self.store
999            .update_step(
1000                step_id,
1001                StepUpdate {
1002                    status: Some(StepStatus::Running),
1003                    started_at: Some(now),
1004                    ..StepUpdate::default()
1005                },
1006            )
1007            .await?;
1008
1009        Ok(())
1010    }
1011
1012    /// Access the store directly (advanced usage).
1013    pub fn store(&self) -> &Arc<dyn RunStore> {
1014        &self.store
1015    }
1016
1017    /// Access the payload that triggered this run.
1018    ///
1019    /// Fetches the run from the store and returns its payload.
1020    ///
1021    /// # Errors
1022    ///
1023    /// Returns [`EngineError::Store`] if the run is not found.
1024    pub async fn payload(&self) -> Result<Value, EngineError> {
1025        let run = self
1026            .store
1027            .get_run(self.run_id)
1028            .await?
1029            .ok_or(EngineError::Store(
1030                ironflow_store::error::StoreError::RunNotFound(self.run_id),
1031            ))?;
1032        Ok(run.payload)
1033    }
1034}
1035
1036impl fmt::Debug for WorkflowContext {
1037    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1038        f.debug_struct("WorkflowContext")
1039            .field("run_id", &self.run_id)
1040            .field("position", &self.position)
1041            .field("total_cost_usd", &self.total_cost_usd)
1042            .finish_non_exhaustive()
1043    }
1044}