Skip to main content

ironflow_engine/
context.rs

1//! [`WorkflowContext`] — execution context for dynamic workflows.
2//!
3//! Provides step execution methods that automatically persist results to the
4//! store. Each call to [`shell`](WorkflowContext::shell),
5//! [`http`](WorkflowContext::http), [`agent`](WorkflowContext::agent), or
6//! [`workflow`](WorkflowContext::workflow) creates a step record, executes the
7//! operation, captures the output, and returns a [`StepOutput`] that the next
8//! step can reference.
9//!
10//! # Examples
11//!
12//! ```no_run
13//! use ironflow_engine::context::WorkflowContext;
14//! use ironflow_engine::config::{ShellConfig, AgentStepConfig};
15//! use ironflow_engine::error::EngineError;
16//!
17//! # async fn example(ctx: &mut WorkflowContext) -> Result<(), EngineError> {
18//! let build = ctx.shell("build", ShellConfig::new("cargo build")).await?;
19//! let review = ctx.agent("review", AgentStepConfig::new(
20//!     &format!("Build output:\n{}", build.output["stdout"])
21//! )).await?;
22//! # Ok(())
23//! # }
24//! ```
25
26use std::fmt;
27use std::sync::Arc;
28use std::time::Instant;
29
30use chrono::{DateTime, Utc};
31use rust_decimal::Decimal;
32use serde_json::Value;
33use tokio::task::JoinSet;
34use tracing::{error, info};
35use uuid::Uuid;
36
37use ironflow_core::provider::AgentProvider;
38use ironflow_store::models::{
39    NewRun, NewStep, NewStepDependency, RunStatus, RunUpdate, Step, StepKind, StepStatus,
40    StepUpdate, TriggerKind,
41};
42use ironflow_store::store::RunStore;
43
44use crate::config::{
45    AgentStepConfig, ApprovalConfig, HttpConfig, ShellConfig, StepConfig, WorkflowStepConfig,
46};
47use crate::error::EngineError;
48use crate::executor::{ParallelStepResult, StepOutput, execute_step_config};
49use crate::handler::WorkflowHandler;
50use crate::operation::Operation;
51
52/// Callback type for resolving workflow handlers by name.
53pub(crate) type HandlerResolver =
54    Arc<dyn Fn(&str) -> Option<Arc<dyn WorkflowHandler>> + Send + Sync>;
55
56/// Execution context for a single workflow run.
57///
58/// Tracks the current step position and provides convenience methods
59/// for executing operations with automatic persistence.
60///
61/// # Examples
62///
63/// ```no_run
64/// use ironflow_engine::context::WorkflowContext;
65/// use ironflow_engine::config::ShellConfig;
66/// use ironflow_engine::error::EngineError;
67///
68/// # async fn example(ctx: &mut WorkflowContext) -> Result<(), EngineError> {
69/// let result = ctx.shell("greet", ShellConfig::new("echo hello")).await?;
70/// assert!(result.output["stdout"].as_str().unwrap().contains("hello"));
71/// # Ok(())
72/// # }
73/// ```
74pub struct WorkflowContext {
75    run_id: Uuid,
76    store: Arc<dyn RunStore>,
77    provider: Arc<dyn AgentProvider>,
78    handler_resolver: Option<HandlerResolver>,
79    position: u32,
80    /// IDs of the last executed step(s) -- used to record DAG dependencies.
81    last_step_ids: Vec<Uuid>,
82    /// Accumulated cost across all steps in this run.
83    total_cost_usd: Decimal,
84    /// Accumulated duration across all steps.
85    total_duration_ms: u64,
86    /// Steps from a previous execution, keyed by position.
87    /// Used when resuming after approval to replay completed steps.
88    replay_steps: std::collections::HashMap<u32, Step>,
89}
90
91impl WorkflowContext {
92    /// Create a new context for a run.
93    ///
94    /// Not typically called directly — the [`Engine`](crate::engine::Engine)
95    /// creates this when executing a [`WorkflowHandler`].
96    pub fn new(run_id: Uuid, store: Arc<dyn RunStore>, provider: Arc<dyn AgentProvider>) -> Self {
97        Self {
98            run_id,
99            store,
100            provider,
101            handler_resolver: None,
102            position: 0,
103            last_step_ids: Vec::new(),
104            total_cost_usd: Decimal::ZERO,
105            total_duration_ms: 0,
106            replay_steps: std::collections::HashMap::new(),
107        }
108    }
109
110    /// Create a new context with a handler resolver for sub-workflow support.
111    ///
112    /// The resolver is called when [`workflow`](Self::workflow) is invoked to
113    /// look up registered handlers by name.
114    pub(crate) fn with_handler_resolver(
115        run_id: Uuid,
116        store: Arc<dyn RunStore>,
117        provider: Arc<dyn AgentProvider>,
118        resolver: HandlerResolver,
119    ) -> Self {
120        Self {
121            run_id,
122            store,
123            provider,
124            handler_resolver: Some(resolver),
125            position: 0,
126            last_step_ids: Vec::new(),
127            total_cost_usd: Decimal::ZERO,
128            total_duration_ms: 0,
129            replay_steps: std::collections::HashMap::new(),
130        }
131    }
132
133    /// Load existing steps from the store for replay after approval.
134    ///
135    /// Called by the engine when resuming a run. All completed steps
136    /// and the approved approval step are indexed by position so that
137    /// `execute_step` and `approval` can skip them.
138    pub(crate) async fn load_replay_steps(&mut self) -> Result<(), EngineError> {
139        let steps = self.store.list_steps(self.run_id).await?;
140        for step in steps {
141            let dominated = matches!(
142                step.status.state,
143                StepStatus::Completed | StepStatus::Running | StepStatus::AwaitingApproval
144            );
145            if dominated {
146                self.replay_steps.insert(step.position, step);
147            }
148        }
149        Ok(())
150    }
151
152    /// The run ID this context is executing for.
153    pub fn run_id(&self) -> Uuid {
154        self.run_id
155    }
156
157    /// Accumulated cost across all executed steps so far.
158    pub fn total_cost_usd(&self) -> Decimal {
159        self.total_cost_usd
160    }
161
162    /// Accumulated duration across all executed steps so far.
163    pub fn total_duration_ms(&self) -> u64 {
164        self.total_duration_ms
165    }
166
167    /// Execute multiple steps concurrently (wait-all model).
168    ///
169    /// All steps in the batch execute in parallel via `tokio::JoinSet`.
170    /// Each step is recorded with the same `position` (execution wave).
171    /// Dependencies on previous steps are recorded automatically.
172    ///
173    /// When `fail_fast` is true, remaining steps are aborted on the first
174    /// failure. When false, all steps run to completion and the first
175    /// error is returned.
176    ///
177    /// # Errors
178    ///
179    /// Returns [`EngineError`] if any step fails.
180    ///
181    /// # Examples
182    ///
183    /// ```no_run
184    /// use ironflow_engine::context::WorkflowContext;
185    /// use ironflow_engine::config::{StepConfig, ShellConfig};
186    /// use ironflow_engine::error::EngineError;
187    ///
188    /// # async fn example(ctx: &mut WorkflowContext) -> Result<(), EngineError> {
189    /// let results = ctx.parallel(
190    ///     vec![
191    ///         ("test-unit", StepConfig::Shell(ShellConfig::new("cargo test --lib"))),
192    ///         ("lint", StepConfig::Shell(ShellConfig::new("cargo clippy"))),
193    ///     ],
194    ///     true,
195    /// ).await?;
196    ///
197    /// for r in &results {
198    ///     println!("{}: {:?}", r.name, r.output.output);
199    /// }
200    /// # Ok(())
201    /// # }
202    /// ```
203    pub async fn parallel(
204        &mut self,
205        steps: Vec<(&str, StepConfig)>,
206        fail_fast: bool,
207    ) -> Result<Vec<ParallelStepResult>, EngineError> {
208        if steps.is_empty() {
209            return Ok(Vec::new());
210        }
211
212        let wave_position = self.position;
213        self.position += 1;
214
215        let now = Utc::now();
216        let mut step_records: Vec<(Uuid, String, StepConfig)> = Vec::with_capacity(steps.len());
217
218        for (name, config) in &steps {
219            let kind = config.kind();
220            let step = self
221                .store
222                .create_step(NewStep {
223                    run_id: self.run_id,
224                    name: name.to_string(),
225                    kind,
226                    position: wave_position,
227                    input: Some(serde_json::to_value(config)?),
228                })
229                .await?;
230
231            self.start_step(step.id, now).await?;
232
233            step_records.push((step.id, name.to_string(), config.clone()));
234        }
235
236        let mut join_set = JoinSet::new();
237        for (idx, (_id, _name, config)) in step_records.iter().enumerate() {
238            let provider = self.provider.clone();
239            let config = config.clone();
240            join_set.spawn(async move { (idx, execute_step_config(&config, &provider).await) });
241        }
242
243        // JoinSet returns in completion order; indexed_results restores input order.
244        let mut indexed_results: Vec<Option<Result<StepOutput, String>>> =
245            vec![None; step_records.len()];
246        let mut first_error: Option<EngineError> = None;
247
248        while let Some(join_result) = join_set.join_next().await {
249            let (idx, step_result) = match join_result {
250                Ok(r) => r,
251                Err(e) => {
252                    if first_error.is_none() {
253                        first_error = Some(EngineError::StepConfig(format!("join error: {e}")));
254                    }
255                    if fail_fast {
256                        join_set.abort_all();
257                    }
258                    continue;
259                }
260            };
261
262            let (step_id, step_name, _) = &step_records[idx];
263            let completed_at = Utc::now();
264
265            match step_result {
266                Ok(output) => {
267                    self.total_cost_usd += output.cost_usd;
268                    self.total_duration_ms += output.duration_ms;
269
270                    self.store
271                        .update_step(
272                            *step_id,
273                            StepUpdate {
274                                status: Some(StepStatus::Completed),
275                                output: Some(output.output.clone()),
276                                duration_ms: Some(output.duration_ms),
277                                cost_usd: Some(output.cost_usd),
278                                input_tokens: output.input_tokens,
279                                output_tokens: output.output_tokens,
280                                completed_at: Some(completed_at),
281                                ..StepUpdate::default()
282                            },
283                        )
284                        .await?;
285
286                    info!(
287                        run_id = %self.run_id,
288                        step = %step_name,
289                        duration_ms = output.duration_ms,
290                        "parallel step completed"
291                    );
292
293                    indexed_results[idx] = Some(Ok(output));
294                }
295                Err(err) => {
296                    let err_msg = err.to_string();
297
298                    if let Err(store_err) = self
299                        .store
300                        .update_step(
301                            *step_id,
302                            StepUpdate {
303                                status: Some(StepStatus::Failed),
304                                error: Some(err_msg.clone()),
305                                completed_at: Some(completed_at),
306                                ..StepUpdate::default()
307                            },
308                        )
309                        .await
310                    {
311                        tracing::error!(
312                            step_id = %step_id,
313                            error = %store_err,
314                            "failed to persist parallel step failure"
315                        );
316                    }
317
318                    indexed_results[idx] = Some(Err(err_msg.clone()));
319
320                    if first_error.is_none() {
321                        first_error = Some(err);
322                    }
323
324                    if fail_fast {
325                        join_set.abort_all();
326                    }
327                }
328            }
329        }
330
331        if let Some(err) = first_error {
332            return Err(err);
333        }
334
335        self.last_step_ids = step_records.iter().map(|(id, _, _)| *id).collect();
336
337        // Build results in original order.
338        let results: Vec<ParallelStepResult> = step_records
339            .iter()
340            .enumerate()
341            .map(|(idx, (step_id, name, _))| {
342                let output = match indexed_results[idx].take() {
343                    Some(Ok(o)) => o,
344                    _ => unreachable!("all steps succeeded if no error returned"),
345                };
346                ParallelStepResult {
347                    name: name.clone(),
348                    output,
349                    step_id: *step_id,
350                }
351            })
352            .collect();
353
354        Ok(results)
355    }
356
357    /// Execute a shell step.
358    ///
359    /// Creates the step record, runs the command, persists the result,
360    /// and returns the output for use in subsequent steps.
361    ///
362    /// # Errors
363    ///
364    /// Returns [`EngineError`] if the command fails or the store errors.
365    ///
366    /// # Examples
367    ///
368    /// ```no_run
369    /// use ironflow_engine::context::WorkflowContext;
370    /// use ironflow_engine::config::ShellConfig;
371    /// use ironflow_engine::error::EngineError;
372    ///
373    /// # async fn example(ctx: &mut WorkflowContext) -> Result<(), EngineError> {
374    /// let files = ctx.shell("list", ShellConfig::new("ls -la")).await?;
375    /// println!("stdout: {}", files.output["stdout"]);
376    /// # Ok(())
377    /// # }
378    /// ```
379    pub async fn shell(
380        &mut self,
381        name: &str,
382        config: ShellConfig,
383    ) -> Result<StepOutput, EngineError> {
384        self.execute_step(name, StepKind::Shell, StepConfig::Shell(config))
385            .await
386    }
387
388    /// Execute an HTTP step.
389    ///
390    /// # Errors
391    ///
392    /// Returns [`EngineError`] if the request fails or the store errors.
393    ///
394    /// # Examples
395    ///
396    /// ```no_run
397    /// use ironflow_engine::context::WorkflowContext;
398    /// use ironflow_engine::config::HttpConfig;
399    /// use ironflow_engine::error::EngineError;
400    ///
401    /// # async fn example(ctx: &mut WorkflowContext) -> Result<(), EngineError> {
402    /// let resp = ctx.http("health", HttpConfig::get("https://api.example.com/health")).await?;
403    /// println!("status: {}", resp.output["status"]);
404    /// # Ok(())
405    /// # }
406    /// ```
407    pub async fn http(
408        &mut self,
409        name: &str,
410        config: HttpConfig,
411    ) -> Result<StepOutput, EngineError> {
412        self.execute_step(name, StepKind::Http, StepConfig::Http(config))
413            .await
414    }
415
416    /// Execute an agent step.
417    ///
418    /// # Errors
419    ///
420    /// Returns [`EngineError`] if the agent invocation fails or the store errors.
421    ///
422    /// # Examples
423    ///
424    /// ```no_run
425    /// use ironflow_engine::context::WorkflowContext;
426    /// use ironflow_engine::config::AgentStepConfig;
427    /// use ironflow_engine::error::EngineError;
428    ///
429    /// # async fn example(ctx: &mut WorkflowContext) -> Result<(), EngineError> {
430    /// let review = ctx.agent("review", AgentStepConfig::new("Review the code")).await?;
431    /// println!("review: {}", review.output["value"]);
432    /// # Ok(())
433    /// # }
434    /// ```
435    pub async fn agent(
436        &mut self,
437        name: &str,
438        config: AgentStepConfig,
439    ) -> Result<StepOutput, EngineError> {
440        self.execute_step(name, StepKind::Agent, StepConfig::Agent(config))
441            .await
442    }
443
444    /// Create a human approval gate.
445    ///
446    /// On first execution, records an approval step and returns
447    /// [`EngineError::ApprovalRequired`] to suspend the run. The engine
448    /// transitions the run to `AwaitingApproval`.
449    ///
450    /// On resume (after a human approved via the API), the approval step
451    /// is replayed: it is marked as `Completed` and execution continues
452    /// past it. Multiple approval gates in the same handler work -- each
453    /// one pauses and resumes independently.
454    ///
455    /// # Errors
456    ///
457    /// Returns [`EngineError::ApprovalRequired`] to pause the run on
458    /// first execution. Returns other [`EngineError`] variants on store
459    /// failures.
460    ///
461    /// # Examples
462    ///
463    /// ```no_run
464    /// use ironflow_engine::context::WorkflowContext;
465    /// use ironflow_engine::config::ApprovalConfig;
466    /// use ironflow_engine::error::EngineError;
467    ///
468    /// # async fn example(ctx: &mut WorkflowContext) -> Result<(), EngineError> {
469    /// ctx.approval("deploy-gate", ApprovalConfig::new("Approve deployment?")).await?;
470    /// // Execution continues here after approval
471    /// # Ok(())
472    /// # }
473    /// ```
474    pub async fn approval(
475        &mut self,
476        name: &str,
477        config: ApprovalConfig,
478    ) -> Result<(), EngineError> {
479        let position = self.position;
480        self.position += 1;
481
482        // Replay: if this approval step exists from a prior execution,
483        // the run was approved -- mark it completed (if not already) and continue.
484        if let Some(existing) = self.replay_steps.get(&position)
485            && existing.kind == StepKind::Approval
486        {
487            if existing.status.state == StepStatus::AwaitingApproval {
488                self.store
489                    .update_step(
490                        existing.id,
491                        StepUpdate {
492                            status: Some(StepStatus::Completed),
493                            completed_at: Some(Utc::now()),
494                            ..StepUpdate::default()
495                        },
496                    )
497                    .await?;
498            }
499
500            self.last_step_ids = vec![existing.id];
501            info!(
502                run_id = %self.run_id,
503                step = %name,
504                position,
505                "approval step replayed (approved)"
506            );
507            return Ok(());
508        }
509
510        // First execution: create the approval step and suspend.
511        let step = self
512            .store
513            .create_step(NewStep {
514                run_id: self.run_id,
515                name: name.to_string(),
516                kind: StepKind::Approval,
517                position,
518                input: Some(serde_json::to_value(&config)?),
519            })
520            .await?;
521
522        self.start_step(step.id, Utc::now()).await?;
523
524        // Transition the step to AwaitingApproval so it reflects
525        // the suspended state on the dashboard.
526        self.store
527            .update_step(
528                step.id,
529                StepUpdate {
530                    status: Some(StepStatus::AwaitingApproval),
531                    ..StepUpdate::default()
532                },
533            )
534            .await?;
535
536        self.last_step_ids = vec![step.id];
537
538        Err(EngineError::ApprovalRequired {
539            run_id: self.run_id,
540            step_id: step.id,
541            message: config.message().to_string(),
542        })
543    }
544
545    /// Execute a custom operation step.
546    ///
547    /// Runs a user-defined [`Operation`] with full step lifecycle management:
548    /// creates the step record, transitions to Running, executes the operation,
549    /// persists the output and duration, and marks the step Completed or Failed.
550    ///
551    /// The operation's [`kind()`](Operation::kind) is stored as
552    /// [`StepKind::Custom`].
553    ///
554    /// # Errors
555    ///
556    /// Returns [`EngineError`] if the operation fails or the store errors.
557    ///
558    /// # Examples
559    ///
560    /// ```no_run
561    /// use ironflow_engine::context::WorkflowContext;
562    /// use ironflow_engine::operation::Operation;
563    /// use ironflow_engine::error::EngineError;
564    /// use serde_json::{Value, json};
565    /// use std::pin::Pin;
566    /// use std::future::Future;
567    ///
568    /// struct MyOp;
569    /// impl Operation for MyOp {
570    ///     fn kind(&self) -> &str { "my-service" }
571    ///     fn execute(&self) -> Pin<Box<dyn Future<Output = Result<Value, EngineError>> + Send + '_>> {
572    ///         Box::pin(async { Ok(json!({"ok": true})) })
573    ///     }
574    /// }
575    ///
576    /// # async fn example(ctx: &mut WorkflowContext) -> Result<(), EngineError> {
577    /// let result = ctx.operation("call-service", &MyOp).await?;
578    /// println!("output: {}", result.output);
579    /// # Ok(())
580    /// # }
581    /// ```
582    pub async fn operation(
583        &mut self,
584        name: &str,
585        op: &dyn Operation,
586    ) -> Result<StepOutput, EngineError> {
587        let kind = StepKind::Custom(op.kind().to_string());
588        let position = self.position;
589        self.position += 1;
590
591        let step = self
592            .store
593            .create_step(NewStep {
594                run_id: self.run_id,
595                name: name.to_string(),
596                kind,
597                position,
598                input: op.input(),
599            })
600            .await?;
601
602        self.start_step(step.id, Utc::now()).await?;
603
604        let start = Instant::now();
605
606        match op.execute().await {
607            Ok(output_value) => {
608                let duration_ms = start.elapsed().as_millis() as u64;
609                self.total_duration_ms += duration_ms;
610
611                let completed_at = Utc::now();
612                self.store
613                    .update_step(
614                        step.id,
615                        StepUpdate {
616                            status: Some(StepStatus::Completed),
617                            output: Some(output_value.clone()),
618                            duration_ms: Some(duration_ms),
619                            cost_usd: Some(Decimal::ZERO),
620                            completed_at: Some(completed_at),
621                            ..StepUpdate::default()
622                        },
623                    )
624                    .await?;
625
626                info!(
627                    run_id = %self.run_id,
628                    step = %name,
629                    kind = op.kind(),
630                    duration_ms,
631                    "operation step completed"
632                );
633
634                self.last_step_ids = vec![step.id];
635
636                Ok(StepOutput {
637                    output: output_value,
638                    duration_ms,
639                    cost_usd: Decimal::ZERO,
640                    input_tokens: None,
641                    output_tokens: None,
642                    debug_messages: None,
643                })
644            }
645            Err(err) => {
646                let completed_at = Utc::now();
647                if let Err(store_err) = self
648                    .store
649                    .update_step(
650                        step.id,
651                        StepUpdate {
652                            status: Some(StepStatus::Failed),
653                            error: Some(err.to_string()),
654                            completed_at: Some(completed_at),
655                            ..StepUpdate::default()
656                        },
657                    )
658                    .await
659                {
660                    error!(step_id = %step.id, error = %store_err, "failed to persist step failure");
661                }
662
663                Err(err)
664            }
665        }
666    }
667
668    /// Execute a sub-workflow step.
669    ///
670    /// Creates a child run for the named workflow handler, executes it with
671    /// its own steps and lifecycle, and returns a [`StepOutput`] containing
672    /// the child run ID and aggregated metrics.
673    ///
674    /// Requires the context to be created with
675    /// `with_handler_resolver`.
676    ///
677    /// # Errors
678    ///
679    /// Returns [`EngineError::InvalidWorkflow`] if no handler is registered
680    /// with the given name, or if no handler resolver is available.
681    ///
682    /// # Examples
683    ///
684    /// ```no_run
685    /// use ironflow_engine::context::WorkflowContext;
686    /// use ironflow_engine::error::EngineError;
687    /// use serde_json::json;
688    ///
689    /// # async fn example(ctx: &mut WorkflowContext) -> Result<(), EngineError> {
690    /// // let result = ctx.workflow(&MySubWorkflow, json!({})).await?;
691    /// # Ok(())
692    /// # }
693    /// ```
694    pub async fn workflow(
695        &mut self,
696        handler: &dyn WorkflowHandler,
697        payload: Value,
698    ) -> Result<StepOutput, EngineError> {
699        let config = WorkflowStepConfig::new(handler.name(), payload);
700        let position = self.position;
701        self.position += 1;
702
703        let step = self
704            .store
705            .create_step(NewStep {
706                run_id: self.run_id,
707                name: config.workflow_name.clone(),
708                kind: StepKind::Workflow,
709                position,
710                input: Some(serde_json::to_value(&config)?),
711            })
712            .await?;
713
714        self.start_step(step.id, Utc::now()).await?;
715
716        match self.execute_child_workflow(&config).await {
717            Ok(output) => {
718                self.total_cost_usd += output.cost_usd;
719                self.total_duration_ms += output.duration_ms;
720
721                let completed_at = Utc::now();
722                self.store
723                    .update_step(
724                        step.id,
725                        StepUpdate {
726                            status: Some(StepStatus::Completed),
727                            output: Some(output.output.clone()),
728                            duration_ms: Some(output.duration_ms),
729                            cost_usd: Some(output.cost_usd),
730                            completed_at: Some(completed_at),
731                            ..StepUpdate::default()
732                        },
733                    )
734                    .await?;
735
736                info!(
737                    run_id = %self.run_id,
738                    child_workflow = %config.workflow_name,
739                    duration_ms = output.duration_ms,
740                    "workflow step completed"
741                );
742
743                self.last_step_ids = vec![step.id];
744
745                Ok(output)
746            }
747            Err(err) => {
748                let completed_at = Utc::now();
749                if let Err(store_err) = self
750                    .store
751                    .update_step(
752                        step.id,
753                        StepUpdate {
754                            status: Some(StepStatus::Failed),
755                            error: Some(err.to_string()),
756                            completed_at: Some(completed_at),
757                            ..StepUpdate::default()
758                        },
759                    )
760                    .await
761                {
762                    error!(step_id = %step.id, error = %store_err, "failed to persist step failure");
763                }
764
765                Err(err)
766            }
767        }
768    }
769
770    /// Execute a child workflow and return aggregated output.
771    async fn execute_child_workflow(
772        &self,
773        config: &WorkflowStepConfig,
774    ) -> Result<StepOutput, EngineError> {
775        let resolver = self.handler_resolver.as_ref().ok_or_else(|| {
776            EngineError::InvalidWorkflow(
777                "sub-workflow requires a handler resolver (use Engine to execute)".to_string(),
778            )
779        })?;
780
781        let handler = resolver(&config.workflow_name).ok_or_else(|| {
782            EngineError::InvalidWorkflow(format!("no handler registered: {}", config.workflow_name))
783        })?;
784
785        let child_run = self
786            .store
787            .create_run(NewRun {
788                workflow_name: config.workflow_name.clone(),
789                trigger: TriggerKind::Workflow,
790                payload: config.payload.clone(),
791                max_retries: 0,
792            })
793            .await?;
794
795        let child_run_id = child_run.id;
796        info!(
797            parent_run_id = %self.run_id,
798            child_run_id = %child_run_id,
799            workflow = %config.workflow_name,
800            "child run created"
801        );
802
803        self.store
804            .update_run_status(child_run_id, RunStatus::Running)
805            .await?;
806
807        let run_start = Instant::now();
808        let mut child_ctx = WorkflowContext {
809            run_id: child_run_id,
810            store: self.store.clone(),
811            provider: self.provider.clone(),
812            handler_resolver: self.handler_resolver.clone(),
813            position: 0,
814            last_step_ids: Vec::new(),
815            total_cost_usd: Decimal::ZERO,
816            total_duration_ms: 0,
817            replay_steps: std::collections::HashMap::new(),
818        };
819
820        let result = handler.execute(&mut child_ctx).await;
821        let total_duration = run_start.elapsed().as_millis() as u64;
822        let completed_at = Utc::now();
823
824        match result {
825            Ok(()) => {
826                self.store
827                    .update_run(
828                        child_run_id,
829                        RunUpdate {
830                            status: Some(RunStatus::Completed),
831                            cost_usd: Some(child_ctx.total_cost_usd),
832                            duration_ms: Some(total_duration),
833                            completed_at: Some(completed_at),
834                            ..RunUpdate::default()
835                        },
836                    )
837                    .await?;
838
839                Ok(StepOutput {
840                    output: serde_json::json!({
841                        "run_id": child_run_id,
842                        "workflow_name": config.workflow_name,
843                        "status": RunStatus::Completed,
844                        "cost_usd": child_ctx.total_cost_usd,
845                        "duration_ms": total_duration,
846                    }),
847                    duration_ms: total_duration,
848                    cost_usd: child_ctx.total_cost_usd,
849                    input_tokens: None,
850                    output_tokens: None,
851                    debug_messages: None,
852                })
853            }
854            Err(err) => {
855                if let Err(store_err) = self
856                    .store
857                    .update_run(
858                        child_run_id,
859                        RunUpdate {
860                            status: Some(RunStatus::Failed),
861                            error: Some(err.to_string()),
862                            cost_usd: Some(child_ctx.total_cost_usd),
863                            duration_ms: Some(total_duration),
864                            completed_at: Some(completed_at),
865                            ..RunUpdate::default()
866                        },
867                    )
868                    .await
869                {
870                    error!(
871                        child_run_id = %child_run_id,
872                        store_error = %store_err,
873                        "failed to persist child run failure"
874                    );
875                }
876
877                Err(err)
878            }
879        }
880    }
881
882    /// Try to replay a completed step from a previous execution.
883    ///
884    /// Returns `Some(StepOutput)` if a completed step exists at the given
885    /// position, `None` otherwise.
886    fn try_replay_step(&mut self, position: u32) -> Option<StepOutput> {
887        let step = self.replay_steps.get(&position)?;
888        if step.status.state != StepStatus::Completed {
889            return None;
890        }
891        let output = StepOutput {
892            output: step.output.clone().unwrap_or(Value::Null),
893            duration_ms: step.duration_ms,
894            cost_usd: step.cost_usd,
895            input_tokens: step.input_tokens,
896            output_tokens: step.output_tokens,
897            debug_messages: None,
898        };
899        self.total_cost_usd += output.cost_usd;
900        self.total_duration_ms += output.duration_ms;
901        self.last_step_ids = vec![step.id];
902        info!(
903            run_id = %self.run_id,
904            step = %step.name,
905            position,
906            "step replayed from previous execution"
907        );
908        Some(output)
909    }
910
911    /// Internal: execute a step with full persistence lifecycle.
912    async fn execute_step(
913        &mut self,
914        name: &str,
915        kind: StepKind,
916        config: StepConfig,
917    ) -> Result<StepOutput, EngineError> {
918        let position = self.position;
919        self.position += 1;
920
921        // Replay: if this step already completed in a prior execution, return cached output.
922        if let Some(output) = self.try_replay_step(position) {
923            return Ok(output);
924        }
925
926        // Create step record in Pending.
927        let step = self
928            .store
929            .create_step(NewStep {
930                run_id: self.run_id,
931                name: name.to_string(),
932                kind,
933                position,
934                input: Some(serde_json::to_value(&config)?),
935            })
936            .await?;
937
938        self.start_step(step.id, Utc::now()).await?;
939
940        match execute_step_config(&config, &self.provider).await {
941            Ok(output) => {
942                self.total_cost_usd += output.cost_usd;
943                self.total_duration_ms += output.duration_ms;
944
945                let completed_at = Utc::now();
946                self.store
947                    .update_step(
948                        step.id,
949                        StepUpdate {
950                            status: Some(StepStatus::Completed),
951                            output: Some(output.output.clone()),
952                            duration_ms: Some(output.duration_ms),
953                            cost_usd: Some(output.cost_usd),
954                            input_tokens: output.input_tokens,
955                            output_tokens: output.output_tokens,
956                            completed_at: Some(completed_at),
957                            ..StepUpdate::default()
958                        },
959                    )
960                    .await?;
961
962                info!(
963                    run_id = %self.run_id,
964                    step = %name,
965                    duration_ms = output.duration_ms,
966                    "step completed"
967                );
968
969                self.last_step_ids = vec![step.id];
970
971                Ok(output)
972            }
973            Err(err) => {
974                let completed_at = Utc::now();
975                if let Err(store_err) = self
976                    .store
977                    .update_step(
978                        step.id,
979                        StepUpdate {
980                            status: Some(StepStatus::Failed),
981                            error: Some(err.to_string()),
982                            completed_at: Some(completed_at),
983                            ..StepUpdate::default()
984                        },
985                    )
986                    .await
987                {
988                    tracing::error!(step_id = %step.id, error = %store_err, "failed to persist step failure");
989                }
990
991                Err(err)
992            }
993        }
994    }
995
996    /// Record dependency edges and transition a step to Running.
997    ///
998    /// Records edges from `step_id` to all `last_step_ids`, then
999    /// transitions the step to `Running` with the given timestamp.
1000    async fn start_step(&self, step_id: Uuid, now: DateTime<Utc>) -> Result<(), EngineError> {
1001        if !self.last_step_ids.is_empty() {
1002            let deps: Vec<NewStepDependency> = self
1003                .last_step_ids
1004                .iter()
1005                .map(|&depends_on| NewStepDependency {
1006                    step_id,
1007                    depends_on,
1008                })
1009                .collect();
1010            self.store.create_step_dependencies(deps).await?;
1011        }
1012
1013        self.store
1014            .update_step(
1015                step_id,
1016                StepUpdate {
1017                    status: Some(StepStatus::Running),
1018                    started_at: Some(now),
1019                    ..StepUpdate::default()
1020                },
1021            )
1022            .await?;
1023
1024        Ok(())
1025    }
1026
1027    /// Access the store directly (advanced usage).
1028    pub fn store(&self) -> &Arc<dyn RunStore> {
1029        &self.store
1030    }
1031
1032    /// Access the payload that triggered this run.
1033    ///
1034    /// Fetches the run from the store and returns its payload.
1035    ///
1036    /// # Errors
1037    ///
1038    /// Returns [`EngineError::Store`] if the run is not found.
1039    pub async fn payload(&self) -> Result<Value, EngineError> {
1040        let run = self
1041            .store
1042            .get_run(self.run_id)
1043            .await?
1044            .ok_or(EngineError::Store(
1045                ironflow_store::error::StoreError::RunNotFound(self.run_id),
1046            ))?;
1047        Ok(run.payload)
1048    }
1049}
1050
1051impl fmt::Debug for WorkflowContext {
1052    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1053        f.debug_struct("WorkflowContext")
1054            .field("run_id", &self.run_id)
1055            .field("position", &self.position)
1056            .field("total_cost_usd", &self.total_cost_usd)
1057            .finish_non_exhaustive()
1058    }
1059}