Skip to main content

ironflow_engine/
context.rs

1//! [`WorkflowContext`] — execution context for dynamic workflows.
2//!
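//! Provides step execution methods that automatically persist results to the
//! store. Each call to [`shell`](WorkflowContext::shell),
//! [`http`](WorkflowContext::http), [`agent`](WorkflowContext::agent),
//! [`operation`](WorkflowContext::operation), or
//! [`workflow`](WorkflowContext::workflow) creates a step record, executes the
//! operation, captures the output, and returns a [`StepOutput`] that the next
//! step can reference. [`parallel`](WorkflowContext::parallel) does the same
//! for a batch of steps executed concurrently, and
//! [`approval`](WorkflowContext::approval) records a human approval gate.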
9//!
10//! # Examples
11//!
12//! ```no_run
13//! use ironflow_engine::context::WorkflowContext;
14//! use ironflow_engine::config::{ShellConfig, AgentStepConfig};
15//! use ironflow_engine::error::EngineError;
16//!
17//! # async fn example(ctx: &mut WorkflowContext) -> Result<(), EngineError> {
18//! let build = ctx.shell("build", ShellConfig::new("cargo build")).await?;
19//! let review = ctx.agent("review", AgentStepConfig::new(
20//!     &format!("Build output:\n{}", build.output["stdout"])
21//! )).await?;
22//! # Ok(())
23//! # }
24//! ```
25
26use std::fmt;
27use std::sync::Arc;
28use std::time::Instant;
29
30use chrono::{DateTime, Utc};
31use rust_decimal::Decimal;
32use serde_json::Value;
33use tokio::task::JoinSet;
34use tracing::{error, info};
35use uuid::Uuid;
36
37use ironflow_core::provider::AgentProvider;
38use ironflow_store::models::{
39    NewRun, NewStep, NewStepDependency, RunStatus, RunUpdate, Step, StepKind, StepStatus,
40    StepUpdate, TriggerKind,
41};
42use ironflow_store::store::RunStore;
43
44use crate::config::{
45    AgentStepConfig, ApprovalConfig, HttpConfig, ShellConfig, StepConfig, WorkflowStepConfig,
46};
47use crate::error::EngineError;
48use crate::executor::{ParallelStepResult, StepOutput, execute_step_config};
49use crate::handler::WorkflowHandler;
50use crate::operation::Operation;
51
52/// Callback type for resolving workflow handlers by name.
53pub(crate) type HandlerResolver =
54    Arc<dyn Fn(&str) -> Option<Arc<dyn WorkflowHandler>> + Send + Sync>;
55
56/// Execution context for a single workflow run.
57///
58/// Tracks the current step position and provides convenience methods
59/// for executing operations with automatic persistence.
60///
61/// # Examples
62///
63/// ```no_run
64/// use ironflow_engine::context::WorkflowContext;
65/// use ironflow_engine::config::ShellConfig;
66/// use ironflow_engine::error::EngineError;
67///
68/// # async fn example(ctx: &mut WorkflowContext) -> Result<(), EngineError> {
69/// let result = ctx.shell("greet", ShellConfig::new("echo hello")).await?;
70/// assert!(result.output["stdout"].as_str().unwrap().contains("hello"));
71/// # Ok(())
72/// # }
73/// ```
74pub struct WorkflowContext {
75    run_id: Uuid,
76    store: Arc<dyn RunStore>,
77    provider: Arc<dyn AgentProvider>,
78    handler_resolver: Option<HandlerResolver>,
79    position: u32,
80    /// IDs of the last executed step(s) -- used to record DAG dependencies.
81    last_step_ids: Vec<Uuid>,
82    /// Accumulated cost across all steps in this run.
83    total_cost_usd: Decimal,
84    /// Accumulated duration across all steps.
85    total_duration_ms: u64,
86    /// Steps from a previous execution, keyed by position.
87    /// Used when resuming after approval to replay completed steps.
88    replay_steps: std::collections::HashMap<u32, Step>,
89}
90
91impl WorkflowContext {
92    /// Create a new context for a run.
93    ///
94    /// Not typically called directly — the [`Engine`](crate::engine::Engine)
95    /// creates this when executing a [`WorkflowHandler`].
96    pub fn new(run_id: Uuid, store: Arc<dyn RunStore>, provider: Arc<dyn AgentProvider>) -> Self {
97        Self {
98            run_id,
99            store,
100            provider,
101            handler_resolver: None,
102            position: 0,
103            last_step_ids: Vec::new(),
104            total_cost_usd: Decimal::ZERO,
105            total_duration_ms: 0,
106            replay_steps: std::collections::HashMap::new(),
107        }
108    }
109
110    /// Create a new context with a handler resolver for sub-workflow support.
111    ///
112    /// The resolver is called when [`workflow`](Self::workflow) is invoked to
113    /// look up registered handlers by name.
114    pub(crate) fn with_handler_resolver(
115        run_id: Uuid,
116        store: Arc<dyn RunStore>,
117        provider: Arc<dyn AgentProvider>,
118        resolver: HandlerResolver,
119    ) -> Self {
120        Self {
121            run_id,
122            store,
123            provider,
124            handler_resolver: Some(resolver),
125            position: 0,
126            last_step_ids: Vec::new(),
127            total_cost_usd: Decimal::ZERO,
128            total_duration_ms: 0,
129            replay_steps: std::collections::HashMap::new(),
130        }
131    }
132
133    /// Load existing steps from the store for replay after approval.
134    ///
135    /// Called by the engine when resuming a run. All completed steps
136    /// and the approved approval step are indexed by position so that
137    /// `execute_step` and `approval` can skip them.
138    pub(crate) async fn load_replay_steps(&mut self) -> Result<(), EngineError> {
139        let steps = self.store.list_steps(self.run_id).await?;
140        for step in steps {
141            let dominated = matches!(
142                step.status.state,
143                StepStatus::Completed | StepStatus::Running | StepStatus::AwaitingApproval
144            );
145            if dominated {
146                self.replay_steps.insert(step.position, step);
147            }
148        }
149        Ok(())
150    }
151
152    /// The run ID this context is executing for.
153    pub fn run_id(&self) -> Uuid {
154        self.run_id
155    }
156
157    /// Accumulated cost across all executed steps so far.
158    pub fn total_cost_usd(&self) -> Decimal {
159        self.total_cost_usd
160    }
161
162    /// Accumulated duration across all executed steps so far.
163    pub fn total_duration_ms(&self) -> u64 {
164        self.total_duration_ms
165    }
166
167    /// Execute multiple steps concurrently (wait-all model).
168    ///
169    /// All steps in the batch execute in parallel via `tokio::JoinSet`.
170    /// Each step is recorded with the same `position` (execution wave).
171    /// Dependencies on previous steps are recorded automatically.
172    ///
173    /// When `fail_fast` is true, remaining steps are aborted on the first
174    /// failure. When false, all steps run to completion and the first
175    /// error is returned.
176    ///
177    /// # Errors
178    ///
179    /// Returns [`EngineError`] if any step fails.
180    ///
181    /// # Examples
182    ///
183    /// ```no_run
184    /// use ironflow_engine::context::WorkflowContext;
185    /// use ironflow_engine::config::{StepConfig, ShellConfig};
186    /// use ironflow_engine::error::EngineError;
187    ///
188    /// # async fn example(ctx: &mut WorkflowContext) -> Result<(), EngineError> {
189    /// let results = ctx.parallel(
190    ///     vec![
191    ///         ("test-unit", StepConfig::Shell(ShellConfig::new("cargo test --lib"))),
192    ///         ("lint", StepConfig::Shell(ShellConfig::new("cargo clippy"))),
193    ///     ],
194    ///     true,
195    /// ).await?;
196    ///
197    /// for r in &results {
198    ///     println!("{}: {:?}", r.name, r.output.output);
199    /// }
200    /// # Ok(())
201    /// # }
202    /// ```
203    pub async fn parallel(
204        &mut self,
205        steps: Vec<(&str, StepConfig)>,
206        fail_fast: bool,
207    ) -> Result<Vec<ParallelStepResult>, EngineError> {
208        if steps.is_empty() {
209            return Ok(Vec::new());
210        }
211
212        let wave_position = self.position;
213        self.position += 1;
214
215        let now = Utc::now();
216        let mut step_records: Vec<(Uuid, String, StepConfig)> = Vec::with_capacity(steps.len());
217
218        for (name, config) in &steps {
219            let kind = config.kind();
220            let step = self
221                .store
222                .create_step(NewStep {
223                    run_id: self.run_id,
224                    name: name.to_string(),
225                    kind,
226                    position: wave_position,
227                    input: Some(serde_json::to_value(config)?),
228                })
229                .await?;
230
231            self.start_step(step.id, now).await?;
232
233            step_records.push((step.id, name.to_string(), config.clone()));
234        }
235
236        let mut join_set = JoinSet::new();
237        for (idx, (_id, _name, config)) in step_records.iter().enumerate() {
238            let provider = self.provider.clone();
239            let config = config.clone();
240            join_set.spawn(async move { (idx, execute_step_config(&config, &provider).await) });
241        }
242
243        // JoinSet returns in completion order; indexed_results restores input order.
244        let mut indexed_results: Vec<Option<Result<StepOutput, String>>> =
245            vec![None; step_records.len()];
246        let mut first_error: Option<EngineError> = None;
247
248        while let Some(join_result) = join_set.join_next().await {
249            let (idx, step_result) = match join_result {
250                Ok(r) => r,
251                Err(e) => {
252                    if first_error.is_none() {
253                        first_error = Some(EngineError::StepConfig(format!("join error: {e}")));
254                    }
255                    if fail_fast {
256                        join_set.abort_all();
257                    }
258                    continue;
259                }
260            };
261
262            let (step_id, step_name, _) = &step_records[idx];
263            let completed_at = Utc::now();
264
265            match step_result {
266                Ok(output) => {
267                    self.total_cost_usd += output.cost_usd;
268                    self.total_duration_ms += output.duration_ms;
269
270                    self.store
271                        .update_step(
272                            *step_id,
273                            StepUpdate {
274                                status: Some(StepStatus::Completed),
275                                output: Some(output.output.clone()),
276                                duration_ms: Some(output.duration_ms),
277                                cost_usd: Some(output.cost_usd),
278                                input_tokens: output.input_tokens,
279                                output_tokens: output.output_tokens,
280                                completed_at: Some(completed_at),
281                                ..StepUpdate::default()
282                            },
283                        )
284                        .await?;
285
286                    info!(
287                        run_id = %self.run_id,
288                        step = %step_name,
289                        duration_ms = output.duration_ms,
290                        "parallel step completed"
291                    );
292
293                    indexed_results[idx] = Some(Ok(output));
294                }
295                Err(err) => {
296                    let err_msg = err.to_string();
297
298                    if let Err(store_err) = self
299                        .store
300                        .update_step(
301                            *step_id,
302                            StepUpdate {
303                                status: Some(StepStatus::Failed),
304                                error: Some(err_msg.clone()),
305                                completed_at: Some(completed_at),
306                                ..StepUpdate::default()
307                            },
308                        )
309                        .await
310                    {
311                        tracing::error!(
312                            step_id = %step_id,
313                            error = %store_err,
314                            "failed to persist parallel step failure"
315                        );
316                    }
317
318                    indexed_results[idx] = Some(Err(err_msg.clone()));
319
320                    if first_error.is_none() {
321                        first_error = Some(err);
322                    }
323
324                    if fail_fast {
325                        join_set.abort_all();
326                    }
327                }
328            }
329        }
330
331        if let Some(err) = first_error {
332            return Err(err);
333        }
334
335        self.last_step_ids = step_records.iter().map(|(id, _, _)| *id).collect();
336
337        // Build results in original order.
338        let results: Vec<ParallelStepResult> = step_records
339            .iter()
340            .enumerate()
341            .map(|(idx, (step_id, name, _))| {
342                let output = match indexed_results[idx].take() {
343                    Some(Ok(o)) => o,
344                    _ => unreachable!("all steps succeeded if no error returned"),
345                };
346                ParallelStepResult {
347                    name: name.clone(),
348                    output,
349                    step_id: *step_id,
350                }
351            })
352            .collect();
353
354        Ok(results)
355    }
356
357    /// Execute a shell step.
358    ///
359    /// Creates the step record, runs the command, persists the result,
360    /// and returns the output for use in subsequent steps.
361    ///
362    /// # Errors
363    ///
364    /// Returns [`EngineError`] if the command fails or the store errors.
365    ///
366    /// # Examples
367    ///
368    /// ```no_run
369    /// use ironflow_engine::context::WorkflowContext;
370    /// use ironflow_engine::config::ShellConfig;
371    /// use ironflow_engine::error::EngineError;
372    ///
373    /// # async fn example(ctx: &mut WorkflowContext) -> Result<(), EngineError> {
374    /// let files = ctx.shell("list", ShellConfig::new("ls -la")).await?;
375    /// println!("stdout: {}", files.output["stdout"]);
376    /// # Ok(())
377    /// # }
378    /// ```
379    pub async fn shell(
380        &mut self,
381        name: &str,
382        config: ShellConfig,
383    ) -> Result<StepOutput, EngineError> {
384        self.execute_step(name, StepKind::Shell, StepConfig::Shell(config))
385            .await
386    }
387
388    /// Execute an HTTP step.
389    ///
390    /// # Errors
391    ///
392    /// Returns [`EngineError`] if the request fails or the store errors.
393    ///
394    /// # Examples
395    ///
396    /// ```no_run
397    /// use ironflow_engine::context::WorkflowContext;
398    /// use ironflow_engine::config::HttpConfig;
399    /// use ironflow_engine::error::EngineError;
400    ///
401    /// # async fn example(ctx: &mut WorkflowContext) -> Result<(), EngineError> {
402    /// let resp = ctx.http("health", HttpConfig::get("https://api.example.com/health")).await?;
403    /// println!("status: {}", resp.output["status"]);
404    /// # Ok(())
405    /// # }
406    /// ```
407    pub async fn http(
408        &mut self,
409        name: &str,
410        config: HttpConfig,
411    ) -> Result<StepOutput, EngineError> {
412        self.execute_step(name, StepKind::Http, StepConfig::Http(config))
413            .await
414    }
415
416    /// Execute an agent step.
417    ///
418    /// # Errors
419    ///
420    /// Returns [`EngineError`] if the agent invocation fails or the store errors.
421    ///
422    /// # Examples
423    ///
424    /// ```no_run
425    /// use ironflow_engine::context::WorkflowContext;
426    /// use ironflow_engine::config::AgentStepConfig;
427    /// use ironflow_engine::error::EngineError;
428    ///
429    /// # async fn example(ctx: &mut WorkflowContext) -> Result<(), EngineError> {
430    /// let review = ctx.agent("review", AgentStepConfig::new("Review the code")).await?;
431    /// println!("review: {}", review.output["value"]);
432    /// # Ok(())
433    /// # }
434    /// ```
435    pub async fn agent(
436        &mut self,
437        name: &str,
438        config: AgentStepConfig,
439    ) -> Result<StepOutput, EngineError> {
440        self.execute_step(name, StepKind::Agent, StepConfig::Agent(config))
441            .await
442    }
443
444    /// Create a human approval gate.
445    ///
446    /// On first execution, records an approval step and returns
447    /// [`EngineError::ApprovalRequired`] to suspend the run. The engine
448    /// transitions the run to `AwaitingApproval`.
449    ///
450    /// On resume (after a human approved via the API), the approval step
451    /// is replayed: it is marked as `Completed` and execution continues
452    /// past it. Multiple approval gates in the same handler work -- each
453    /// one pauses and resumes independently.
454    ///
455    /// # Errors
456    ///
457    /// Returns [`EngineError::ApprovalRequired`] to pause the run on
458    /// first execution. Returns other [`EngineError`] variants on store
459    /// failures.
460    ///
461    /// # Examples
462    ///
463    /// ```no_run
464    /// use ironflow_engine::context::WorkflowContext;
465    /// use ironflow_engine::config::ApprovalConfig;
466    /// use ironflow_engine::error::EngineError;
467    ///
468    /// # async fn example(ctx: &mut WorkflowContext) -> Result<(), EngineError> {
469    /// ctx.approval("deploy-gate", ApprovalConfig::new("Approve deployment?")).await?;
470    /// // Execution continues here after approval
471    /// # Ok(())
472    /// # }
473    /// ```
474    pub async fn approval(
475        &mut self,
476        name: &str,
477        config: ApprovalConfig,
478    ) -> Result<(), EngineError> {
479        let position = self.position;
480        self.position += 1;
481
482        // Replay: if this approval step exists from a prior execution,
483        // the run was approved -- mark it completed (if not already) and continue.
484        if let Some(existing) = self.replay_steps.get(&position)
485            && existing.kind == StepKind::Approval
486        {
487            if existing.status.state == StepStatus::AwaitingApproval {
488                self.store
489                    .update_step(
490                        existing.id,
491                        StepUpdate {
492                            status: Some(StepStatus::Completed),
493                            completed_at: Some(Utc::now()),
494                            ..StepUpdate::default()
495                        },
496                    )
497                    .await?;
498            }
499
500            self.last_step_ids = vec![existing.id];
501            info!(
502                run_id = %self.run_id,
503                step = %name,
504                position,
505                "approval step replayed (approved)"
506            );
507            return Ok(());
508        }
509
510        // First execution: create the approval step and suspend.
511        let step = self
512            .store
513            .create_step(NewStep {
514                run_id: self.run_id,
515                name: name.to_string(),
516                kind: StepKind::Approval,
517                position,
518                input: Some(serde_json::to_value(&config)?),
519            })
520            .await?;
521
522        self.start_step(step.id, Utc::now()).await?;
523
524        // Transition the step to AwaitingApproval so it reflects
525        // the suspended state on the dashboard.
526        self.store
527            .update_step(
528                step.id,
529                StepUpdate {
530                    status: Some(StepStatus::AwaitingApproval),
531                    ..StepUpdate::default()
532                },
533            )
534            .await?;
535
536        self.last_step_ids = vec![step.id];
537
538        Err(EngineError::ApprovalRequired {
539            run_id: self.run_id,
540            step_id: step.id,
541            message: config.message().to_string(),
542        })
543    }
544
545    /// Execute a custom operation step.
546    ///
547    /// Runs a user-defined [`Operation`] with full step lifecycle management:
548    /// creates the step record, transitions to Running, executes the operation,
549    /// persists the output and duration, and marks the step Completed or Failed.
550    ///
551    /// The operation's [`kind()`](Operation::kind) is stored as
552    /// [`StepKind::Custom`].
553    ///
554    /// # Errors
555    ///
556    /// Returns [`EngineError`] if the operation fails or the store errors.
557    ///
558    /// # Examples
559    ///
560    /// ```no_run
561    /// use ironflow_engine::context::WorkflowContext;
562    /// use ironflow_engine::operation::Operation;
563    /// use ironflow_engine::error::EngineError;
564    /// use serde_json::{Value, json};
565    /// use std::pin::Pin;
566    /// use std::future::Future;
567    ///
568    /// struct MyOp;
569    /// impl Operation for MyOp {
570    ///     fn kind(&self) -> &str { "my-service" }
571    ///     fn execute(&self) -> Pin<Box<dyn Future<Output = Result<Value, EngineError>> + Send + '_>> {
572    ///         Box::pin(async { Ok(json!({"ok": true})) })
573    ///     }
574    /// }
575    ///
576    /// # async fn example(ctx: &mut WorkflowContext) -> Result<(), EngineError> {
577    /// let result = ctx.operation("call-service", &MyOp).await?;
578    /// println!("output: {}", result.output);
579    /// # Ok(())
580    /// # }
581    /// ```
582    pub async fn operation(
583        &mut self,
584        name: &str,
585        op: &dyn Operation,
586    ) -> Result<StepOutput, EngineError> {
587        let kind = StepKind::Custom(op.kind().to_string());
588        let position = self.position;
589        self.position += 1;
590
591        let step = self
592            .store
593            .create_step(NewStep {
594                run_id: self.run_id,
595                name: name.to_string(),
596                kind,
597                position,
598                input: op.input(),
599            })
600            .await?;
601
602        self.start_step(step.id, Utc::now()).await?;
603
604        let start = Instant::now();
605
606        match op.execute().await {
607            Ok(output_value) => {
608                let duration_ms = start.elapsed().as_millis() as u64;
609                self.total_duration_ms += duration_ms;
610
611                let completed_at = Utc::now();
612                self.store
613                    .update_step(
614                        step.id,
615                        StepUpdate {
616                            status: Some(StepStatus::Completed),
617                            output: Some(output_value.clone()),
618                            duration_ms: Some(duration_ms),
619                            cost_usd: Some(Decimal::ZERO),
620                            completed_at: Some(completed_at),
621                            ..StepUpdate::default()
622                        },
623                    )
624                    .await?;
625
626                info!(
627                    run_id = %self.run_id,
628                    step = %name,
629                    kind = op.kind(),
630                    duration_ms,
631                    "operation step completed"
632                );
633
634                self.last_step_ids = vec![step.id];
635
636                Ok(StepOutput {
637                    output: output_value,
638                    duration_ms,
639                    cost_usd: Decimal::ZERO,
640                    input_tokens: None,
641                    output_tokens: None,
642                })
643            }
644            Err(err) => {
645                let completed_at = Utc::now();
646                if let Err(store_err) = self
647                    .store
648                    .update_step(
649                        step.id,
650                        StepUpdate {
651                            status: Some(StepStatus::Failed),
652                            error: Some(err.to_string()),
653                            completed_at: Some(completed_at),
654                            ..StepUpdate::default()
655                        },
656                    )
657                    .await
658                {
659                    error!(step_id = %step.id, error = %store_err, "failed to persist step failure");
660                }
661
662                Err(err)
663            }
664        }
665    }
666
667    /// Execute a sub-workflow step.
668    ///
669    /// Creates a child run for the named workflow handler, executes it with
670    /// its own steps and lifecycle, and returns a [`StepOutput`] containing
671    /// the child run ID and aggregated metrics.
672    ///
673    /// Requires the context to be created with
674    /// `with_handler_resolver`.
675    ///
676    /// # Errors
677    ///
678    /// Returns [`EngineError::InvalidWorkflow`] if no handler is registered
679    /// with the given name, or if no handler resolver is available.
680    ///
681    /// # Examples
682    ///
683    /// ```no_run
684    /// use ironflow_engine::context::WorkflowContext;
685    /// use ironflow_engine::error::EngineError;
686    /// use serde_json::json;
687    ///
688    /// # async fn example(ctx: &mut WorkflowContext) -> Result<(), EngineError> {
689    /// // let result = ctx.workflow(&MySubWorkflow, json!({})).await?;
690    /// # Ok(())
691    /// # }
692    /// ```
693    pub async fn workflow(
694        &mut self,
695        handler: &dyn WorkflowHandler,
696        payload: Value,
697    ) -> Result<StepOutput, EngineError> {
698        let config = WorkflowStepConfig::new(handler.name(), payload);
699        let position = self.position;
700        self.position += 1;
701
702        let step = self
703            .store
704            .create_step(NewStep {
705                run_id: self.run_id,
706                name: config.workflow_name.clone(),
707                kind: StepKind::Workflow,
708                position,
709                input: Some(serde_json::to_value(&config)?),
710            })
711            .await?;
712
713        self.start_step(step.id, Utc::now()).await?;
714
715        match self.execute_child_workflow(&config).await {
716            Ok(output) => {
717                self.total_cost_usd += output.cost_usd;
718                self.total_duration_ms += output.duration_ms;
719
720                let completed_at = Utc::now();
721                self.store
722                    .update_step(
723                        step.id,
724                        StepUpdate {
725                            status: Some(StepStatus::Completed),
726                            output: Some(output.output.clone()),
727                            duration_ms: Some(output.duration_ms),
728                            cost_usd: Some(output.cost_usd),
729                            completed_at: Some(completed_at),
730                            ..StepUpdate::default()
731                        },
732                    )
733                    .await?;
734
735                info!(
736                    run_id = %self.run_id,
737                    child_workflow = %config.workflow_name,
738                    duration_ms = output.duration_ms,
739                    "workflow step completed"
740                );
741
742                self.last_step_ids = vec![step.id];
743
744                Ok(output)
745            }
746            Err(err) => {
747                let completed_at = Utc::now();
748                if let Err(store_err) = self
749                    .store
750                    .update_step(
751                        step.id,
752                        StepUpdate {
753                            status: Some(StepStatus::Failed),
754                            error: Some(err.to_string()),
755                            completed_at: Some(completed_at),
756                            ..StepUpdate::default()
757                        },
758                    )
759                    .await
760                {
761                    error!(step_id = %step.id, error = %store_err, "failed to persist step failure");
762                }
763
764                Err(err)
765            }
766        }
767    }
768
769    /// Execute a child workflow and return aggregated output.
770    async fn execute_child_workflow(
771        &self,
772        config: &WorkflowStepConfig,
773    ) -> Result<StepOutput, EngineError> {
774        let resolver = self.handler_resolver.as_ref().ok_or_else(|| {
775            EngineError::InvalidWorkflow(
776                "sub-workflow requires a handler resolver (use Engine to execute)".to_string(),
777            )
778        })?;
779
780        let handler = resolver(&config.workflow_name).ok_or_else(|| {
781            EngineError::InvalidWorkflow(format!("no handler registered: {}", config.workflow_name))
782        })?;
783
784        let child_run = self
785            .store
786            .create_run(NewRun {
787                workflow_name: config.workflow_name.clone(),
788                trigger: TriggerKind::Workflow,
789                payload: config.payload.clone(),
790                max_retries: 0,
791            })
792            .await?;
793
794        let child_run_id = child_run.id;
795        info!(
796            parent_run_id = %self.run_id,
797            child_run_id = %child_run_id,
798            workflow = %config.workflow_name,
799            "child run created"
800        );
801
802        self.store
803            .update_run_status(child_run_id, RunStatus::Running)
804            .await?;
805
806        let run_start = Instant::now();
807        let mut child_ctx = WorkflowContext {
808            run_id: child_run_id,
809            store: self.store.clone(),
810            provider: self.provider.clone(),
811            handler_resolver: self.handler_resolver.clone(),
812            position: 0,
813            last_step_ids: Vec::new(),
814            total_cost_usd: Decimal::ZERO,
815            total_duration_ms: 0,
816            replay_steps: std::collections::HashMap::new(),
817        };
818
819        let result = handler.execute(&mut child_ctx).await;
820        let total_duration = run_start.elapsed().as_millis() as u64;
821        let completed_at = Utc::now();
822
823        match result {
824            Ok(()) => {
825                self.store
826                    .update_run(
827                        child_run_id,
828                        RunUpdate {
829                            status: Some(RunStatus::Completed),
830                            cost_usd: Some(child_ctx.total_cost_usd),
831                            duration_ms: Some(total_duration),
832                            completed_at: Some(completed_at),
833                            ..RunUpdate::default()
834                        },
835                    )
836                    .await?;
837
838                Ok(StepOutput {
839                    output: serde_json::json!({
840                        "run_id": child_run_id,
841                        "workflow_name": config.workflow_name,
842                        "status": RunStatus::Completed,
843                        "cost_usd": child_ctx.total_cost_usd,
844                        "duration_ms": total_duration,
845                    }),
846                    duration_ms: total_duration,
847                    cost_usd: child_ctx.total_cost_usd,
848                    input_tokens: None,
849                    output_tokens: None,
850                })
851            }
852            Err(err) => {
853                if let Err(store_err) = self
854                    .store
855                    .update_run(
856                        child_run_id,
857                        RunUpdate {
858                            status: Some(RunStatus::Failed),
859                            error: Some(err.to_string()),
860                            cost_usd: Some(child_ctx.total_cost_usd),
861                            duration_ms: Some(total_duration),
862                            completed_at: Some(completed_at),
863                            ..RunUpdate::default()
864                        },
865                    )
866                    .await
867                {
868                    error!(
869                        child_run_id = %child_run_id,
870                        store_error = %store_err,
871                        "failed to persist child run failure"
872                    );
873                }
874
875                Err(err)
876            }
877        }
878    }
879
880    /// Try to replay a completed step from a previous execution.
881    ///
882    /// Returns `Some(StepOutput)` if a completed step exists at the given
883    /// position, `None` otherwise.
884    fn try_replay_step(&mut self, position: u32) -> Option<StepOutput> {
885        let step = self.replay_steps.get(&position)?;
886        if step.status.state != StepStatus::Completed {
887            return None;
888        }
889        let output = StepOutput {
890            output: step.output.clone().unwrap_or(Value::Null),
891            duration_ms: step.duration_ms,
892            cost_usd: step.cost_usd,
893            input_tokens: step.input_tokens,
894            output_tokens: step.output_tokens,
895        };
896        self.total_cost_usd += output.cost_usd;
897        self.total_duration_ms += output.duration_ms;
898        self.last_step_ids = vec![step.id];
899        info!(
900            run_id = %self.run_id,
901            step = %step.name,
902            position,
903            "step replayed from previous execution"
904        );
905        Some(output)
906    }
907
908    /// Internal: execute a step with full persistence lifecycle.
909    async fn execute_step(
910        &mut self,
911        name: &str,
912        kind: StepKind,
913        config: StepConfig,
914    ) -> Result<StepOutput, EngineError> {
915        let position = self.position;
916        self.position += 1;
917
918        // Replay: if this step already completed in a prior execution, return cached output.
919        if let Some(output) = self.try_replay_step(position) {
920            return Ok(output);
921        }
922
923        // Create step record in Pending.
924        let step = self
925            .store
926            .create_step(NewStep {
927                run_id: self.run_id,
928                name: name.to_string(),
929                kind,
930                position,
931                input: Some(serde_json::to_value(&config)?),
932            })
933            .await?;
934
935        self.start_step(step.id, Utc::now()).await?;
936
937        match execute_step_config(&config, &self.provider).await {
938            Ok(output) => {
939                self.total_cost_usd += output.cost_usd;
940                self.total_duration_ms += output.duration_ms;
941
942                let completed_at = Utc::now();
943                self.store
944                    .update_step(
945                        step.id,
946                        StepUpdate {
947                            status: Some(StepStatus::Completed),
948                            output: Some(output.output.clone()),
949                            duration_ms: Some(output.duration_ms),
950                            cost_usd: Some(output.cost_usd),
951                            input_tokens: output.input_tokens,
952                            output_tokens: output.output_tokens,
953                            completed_at: Some(completed_at),
954                            ..StepUpdate::default()
955                        },
956                    )
957                    .await?;
958
959                info!(
960                    run_id = %self.run_id,
961                    step = %name,
962                    duration_ms = output.duration_ms,
963                    "step completed"
964                );
965
966                self.last_step_ids = vec![step.id];
967
968                Ok(output)
969            }
970            Err(err) => {
971                let completed_at = Utc::now();
972                if let Err(store_err) = self
973                    .store
974                    .update_step(
975                        step.id,
976                        StepUpdate {
977                            status: Some(StepStatus::Failed),
978                            error: Some(err.to_string()),
979                            completed_at: Some(completed_at),
980                            ..StepUpdate::default()
981                        },
982                    )
983                    .await
984                {
985                    tracing::error!(step_id = %step.id, error = %store_err, "failed to persist step failure");
986                }
987
988                Err(err)
989            }
990        }
991    }
992
993    /// Record dependency edges and transition a step to Running.
994    ///
995    /// Records edges from `step_id` to all `last_step_ids`, then
996    /// transitions the step to `Running` with the given timestamp.
997    async fn start_step(&self, step_id: Uuid, now: DateTime<Utc>) -> Result<(), EngineError> {
998        if !self.last_step_ids.is_empty() {
999            let deps: Vec<NewStepDependency> = self
1000                .last_step_ids
1001                .iter()
1002                .map(|&depends_on| NewStepDependency {
1003                    step_id,
1004                    depends_on,
1005                })
1006                .collect();
1007            self.store.create_step_dependencies(deps).await?;
1008        }
1009
1010        self.store
1011            .update_step(
1012                step_id,
1013                StepUpdate {
1014                    status: Some(StepStatus::Running),
1015                    started_at: Some(now),
1016                    ..StepUpdate::default()
1017                },
1018            )
1019            .await?;
1020
1021        Ok(())
1022    }
1023
1024    /// Access the store directly (advanced usage).
1025    pub fn store(&self) -> &Arc<dyn RunStore> {
1026        &self.store
1027    }
1028
1029    /// Access the payload that triggered this run.
1030    ///
1031    /// Fetches the run from the store and returns its payload.
1032    ///
1033    /// # Errors
1034    ///
1035    /// Returns [`EngineError::Store`] if the run is not found.
1036    pub async fn payload(&self) -> Result<Value, EngineError> {
1037        let run = self
1038            .store
1039            .get_run(self.run_id)
1040            .await?
1041            .ok_or(EngineError::Store(
1042                ironflow_store::error::StoreError::RunNotFound(self.run_id),
1043            ))?;
1044        Ok(run.payload)
1045    }
1046}
1047
1048impl fmt::Debug for WorkflowContext {
1049    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1050        f.debug_struct("WorkflowContext")
1051            .field("run_id", &self.run_id)
1052            .field("position", &self.position)
1053            .field("total_cost_usd", &self.total_cost_usd)
1054            .finish_non_exhaustive()
1055    }
1056}