Skip to main content

ironflow_engine/
context.rs

1//! [`WorkflowContext`] — execution context for dynamic workflows.
2//!
3//! Provides step execution methods that automatically persist results to the
4//! store. Each call to [`shell`](WorkflowContext::shell),
5//! [`http`](WorkflowContext::http), [`agent`](WorkflowContext::agent), or
6//! [`workflow`](WorkflowContext::workflow) creates a step record, executes the
7//! operation, captures the output, and returns a [`StepOutput`] that the next
8//! step can reference.
9//!
10//! # Examples
11//!
12//! ```no_run
13//! use ironflow_engine::context::WorkflowContext;
14//! use ironflow_engine::config::{ShellConfig, AgentStepConfig};
15//! use ironflow_engine::error::EngineError;
16//!
17//! # async fn example(ctx: &mut WorkflowContext) -> Result<(), EngineError> {
18//! let build = ctx.shell("build", ShellConfig::new("cargo build")).await?;
19//! let review = ctx.agent("review", AgentStepConfig::new(
20//!     &format!("Build output:\n{}", build.output["stdout"])
21//! )).await?;
22//! # Ok(())
23//! # }
24//! ```
25
26use std::collections::HashMap;
27use std::fmt;
28use std::sync::Arc;
29use std::time::Instant;
30
31use chrono::{DateTime, Utc};
32use rust_decimal::Decimal;
33use serde_json::Value;
34use tokio::task::JoinSet;
35use tracing::{error, info};
36use uuid::Uuid;
37
38use ironflow_core::error::{AgentError, OperationError};
39use ironflow_core::provider::AgentProvider;
40use ironflow_store::models::{
41    NewRun, NewStep, NewStepDependency, RunStatus, RunUpdate, Step, StepKind, StepStatus,
42    StepUpdate, TriggerKind,
43};
44use ironflow_store::store::RunStore;
45
46use crate::config::{
47    AgentStepConfig, ApprovalConfig, HttpConfig, ShellConfig, StepConfig, WorkflowStepConfig,
48};
49use crate::error::EngineError;
50use crate::executor::{ParallelStepResult, StepOutput, execute_step_config};
51use crate::handler::WorkflowHandler;
52use crate::operation::Operation;
53
54/// Callback type for resolving workflow handlers by name.
55pub(crate) type HandlerResolver =
56    Arc<dyn Fn(&str) -> Option<Arc<dyn WorkflowHandler>> + Send + Sync>;
57
58/// Execution context for a single workflow run.
59///
60/// Tracks the current step position and provides convenience methods
61/// for executing operations with automatic persistence.
62///
63/// # Examples
64///
65/// ```no_run
66/// use ironflow_engine::context::WorkflowContext;
67/// use ironflow_engine::config::ShellConfig;
68/// use ironflow_engine::error::EngineError;
69///
70/// # async fn example(ctx: &mut WorkflowContext) -> Result<(), EngineError> {
71/// let result = ctx.shell("greet", ShellConfig::new("echo hello")).await?;
72/// assert!(result.output["stdout"].as_str().unwrap().contains("hello"));
73/// # Ok(())
74/// # }
75/// ```
76pub struct WorkflowContext {
77    run_id: Uuid,
78    store: Arc<dyn RunStore>,
79    provider: Arc<dyn AgentProvider>,
80    handler_resolver: Option<HandlerResolver>,
81    position: u32,
82    /// IDs of the last executed step(s) -- used to record DAG dependencies.
83    last_step_ids: Vec<Uuid>,
84    /// Accumulated cost across all steps in this run.
85    total_cost_usd: Decimal,
86    /// Accumulated duration across all steps.
87    total_duration_ms: u64,
88    /// Steps from a previous execution, keyed by position.
89    /// Used when resuming after approval to replay completed steps.
90    replay_steps: HashMap<u32, Step>,
91}
92
93impl WorkflowContext {
94    /// Create a new context for a run.
95    ///
96    /// Not typically called directly — the [`Engine`](crate::engine::Engine)
97    /// creates this when executing a [`WorkflowHandler`].
98    pub fn new(run_id: Uuid, store: Arc<dyn RunStore>, provider: Arc<dyn AgentProvider>) -> Self {
99        Self {
100            run_id,
101            store,
102            provider,
103            handler_resolver: None,
104            position: 0,
105            last_step_ids: Vec::new(),
106            total_cost_usd: Decimal::ZERO,
107            total_duration_ms: 0,
108            replay_steps: HashMap::new(),
109        }
110    }
111
112    /// Create a new context with a handler resolver for sub-workflow support.
113    ///
114    /// The resolver is called when [`workflow`](Self::workflow) is invoked to
115    /// look up registered handlers by name.
116    pub(crate) fn with_handler_resolver(
117        run_id: Uuid,
118        store: Arc<dyn RunStore>,
119        provider: Arc<dyn AgentProvider>,
120        resolver: HandlerResolver,
121    ) -> Self {
122        Self {
123            run_id,
124            store,
125            provider,
126            handler_resolver: Some(resolver),
127            position: 0,
128            last_step_ids: Vec::new(),
129            total_cost_usd: Decimal::ZERO,
130            total_duration_ms: 0,
131            replay_steps: HashMap::new(),
132        }
133    }
134
135    /// Load existing steps from the store for replay after approval.
136    ///
137    /// Called by the engine when resuming a run. All completed steps
138    /// and the approved approval step are indexed by position so that
139    /// `execute_step` and `approval` can skip them.
140    pub(crate) async fn load_replay_steps(&mut self) -> Result<(), EngineError> {
141        let steps = self.store.list_steps(self.run_id).await?;
142        for step in steps {
143            let dominated = matches!(
144                step.status.state,
145                StepStatus::Completed | StepStatus::Running | StepStatus::AwaitingApproval
146            );
147            if dominated {
148                self.replay_steps.insert(step.position, step);
149            }
150        }
151        Ok(())
152    }
153
154    /// The run ID this context is executing for.
155    pub fn run_id(&self) -> Uuid {
156        self.run_id
157    }
158
159    /// Accumulated cost across all executed steps so far.
160    pub fn total_cost_usd(&self) -> Decimal {
161        self.total_cost_usd
162    }
163
164    /// Accumulated duration across all executed steps so far.
165    pub fn total_duration_ms(&self) -> u64 {
166        self.total_duration_ms
167    }
168
169    /// Execute multiple steps concurrently (wait-all model).
170    ///
171    /// All steps in the batch execute in parallel via `tokio::JoinSet`.
172    /// Each step is recorded with the same `position` (execution wave).
173    /// Dependencies on previous steps are recorded automatically.
174    ///
175    /// When `fail_fast` is true, remaining steps are aborted on the first
176    /// failure. When false, all steps run to completion and the first
177    /// error is returned.
178    ///
179    /// # Errors
180    ///
181    /// Returns [`EngineError`] if any step fails.
182    ///
183    /// # Examples
184    ///
185    /// ```no_run
186    /// use ironflow_engine::context::WorkflowContext;
187    /// use ironflow_engine::config::{StepConfig, ShellConfig};
188    /// use ironflow_engine::error::EngineError;
189    ///
190    /// # async fn example(ctx: &mut WorkflowContext) -> Result<(), EngineError> {
191    /// let results = ctx.parallel(
192    ///     vec![
193    ///         ("test-unit", StepConfig::Shell(ShellConfig::new("cargo test --lib"))),
194    ///         ("lint", StepConfig::Shell(ShellConfig::new("cargo clippy"))),
195    ///     ],
196    ///     true,
197    /// ).await?;
198    ///
199    /// for r in &results {
200    ///     println!("{}: {:?}", r.name, r.output.output);
201    /// }
202    /// # Ok(())
203    /// # }
204    /// ```
205    pub async fn parallel(
206        &mut self,
207        steps: Vec<(&str, StepConfig)>,
208        fail_fast: bool,
209    ) -> Result<Vec<ParallelStepResult>, EngineError> {
210        if steps.is_empty() {
211            return Ok(Vec::new());
212        }
213
214        let wave_position = self.position;
215        self.position += 1;
216
217        let now = Utc::now();
218        let mut step_records: Vec<(Uuid, String, StepConfig)> = Vec::with_capacity(steps.len());
219
220        for (name, config) in &steps {
221            let kind = config.kind();
222            let step = self
223                .store
224                .create_step(NewStep {
225                    run_id: self.run_id,
226                    name: name.to_string(),
227                    kind,
228                    position: wave_position,
229                    input: Some(serde_json::to_value(config)?),
230                })
231                .await?;
232
233            self.start_step(step.id, now).await?;
234
235            step_records.push((step.id, name.to_string(), config.clone()));
236        }
237
238        let mut join_set = JoinSet::new();
239        for (idx, (_id, _name, config)) in step_records.iter().enumerate() {
240            let provider = self.provider.clone();
241            let config = config.clone();
242            join_set.spawn(async move { (idx, execute_step_config(&config, &provider).await) });
243        }
244
245        // JoinSet returns in completion order; indexed_results restores input order.
246        let mut indexed_results: Vec<Option<Result<StepOutput, String>>> =
247            vec![None; step_records.len()];
248        let mut first_error: Option<EngineError> = None;
249
250        while let Some(join_result) = join_set.join_next().await {
251            let (idx, step_result) = match join_result {
252                Ok(r) => r,
253                Err(e) => {
254                    if first_error.is_none() {
255                        first_error = Some(EngineError::StepConfig(format!("join error: {e}")));
256                    }
257                    if fail_fast {
258                        join_set.abort_all();
259                    }
260                    continue;
261                }
262            };
263
264            let (step_id, step_name, _) = &step_records[idx];
265            let completed_at = Utc::now();
266
267            match step_result {
268                Ok(output) => {
269                    self.total_cost_usd += output.cost_usd;
270                    self.total_duration_ms += output.duration_ms;
271
272                    let debug_messages_json = output.debug_messages_json();
273
274                    self.store
275                        .update_step(
276                            *step_id,
277                            StepUpdate {
278                                status: Some(StepStatus::Completed),
279                                output: Some(output.output.clone()),
280                                duration_ms: Some(output.duration_ms),
281                                cost_usd: Some(output.cost_usd),
282                                input_tokens: output.input_tokens,
283                                output_tokens: output.output_tokens,
284                                completed_at: Some(completed_at),
285                                debug_messages: debug_messages_json,
286                                ..StepUpdate::default()
287                            },
288                        )
289                        .await?;
290
291                    info!(
292                        run_id = %self.run_id,
293                        step = %step_name,
294                        duration_ms = output.duration_ms,
295                        "parallel step completed"
296                    );
297
298                    indexed_results[idx] = Some(Ok(output));
299                }
300                Err(err) => {
301                    let err_msg = err.to_string();
302                    let debug_messages_json = extract_debug_messages_from_error(&err);
303
304                    if let Err(store_err) = self
305                        .store
306                        .update_step(
307                            *step_id,
308                            StepUpdate {
309                                status: Some(StepStatus::Failed),
310                                error: Some(err_msg.clone()),
311                                completed_at: Some(completed_at),
312                                debug_messages: debug_messages_json,
313                                ..StepUpdate::default()
314                            },
315                        )
316                        .await
317                    {
318                        tracing::error!(
319                            step_id = %step_id,
320                            error = %store_err,
321                            "failed to persist parallel step failure"
322                        );
323                    }
324
325                    indexed_results[idx] = Some(Err(err_msg.clone()));
326
327                    if first_error.is_none() {
328                        first_error = Some(err);
329                    }
330
331                    if fail_fast {
332                        join_set.abort_all();
333                    }
334                }
335            }
336        }
337
338        if let Some(err) = first_error {
339            return Err(err);
340        }
341
342        self.last_step_ids = step_records.iter().map(|(id, _, _)| *id).collect();
343
344        // Build results in original order.
345        let results: Vec<ParallelStepResult> = step_records
346            .iter()
347            .enumerate()
348            .map(|(idx, (step_id, name, _))| {
349                let output = match indexed_results[idx].take() {
350                    Some(Ok(o)) => o,
351                    _ => unreachable!("all steps succeeded if no error returned"),
352                };
353                ParallelStepResult {
354                    name: name.clone(),
355                    output,
356                    step_id: *step_id,
357                }
358            })
359            .collect();
360
361        Ok(results)
362    }
363
364    /// Execute a shell step.
365    ///
366    /// Creates the step record, runs the command, persists the result,
367    /// and returns the output for use in subsequent steps.
368    ///
369    /// # Errors
370    ///
371    /// Returns [`EngineError`] if the command fails or the store errors.
372    ///
373    /// # Examples
374    ///
375    /// ```no_run
376    /// use ironflow_engine::context::WorkflowContext;
377    /// use ironflow_engine::config::ShellConfig;
378    /// use ironflow_engine::error::EngineError;
379    ///
380    /// # async fn example(ctx: &mut WorkflowContext) -> Result<(), EngineError> {
381    /// let files = ctx.shell("list", ShellConfig::new("ls -la")).await?;
382    /// println!("stdout: {}", files.output["stdout"]);
383    /// # Ok(())
384    /// # }
385    /// ```
386    pub async fn shell(
387        &mut self,
388        name: &str,
389        config: ShellConfig,
390    ) -> Result<StepOutput, EngineError> {
391        self.execute_step(name, StepKind::Shell, StepConfig::Shell(config))
392            .await
393    }
394
395    /// Execute an HTTP step.
396    ///
397    /// # Errors
398    ///
399    /// Returns [`EngineError`] if the request fails or the store errors.
400    ///
401    /// # Examples
402    ///
403    /// ```no_run
404    /// use ironflow_engine::context::WorkflowContext;
405    /// use ironflow_engine::config::HttpConfig;
406    /// use ironflow_engine::error::EngineError;
407    ///
408    /// # async fn example(ctx: &mut WorkflowContext) -> Result<(), EngineError> {
409    /// let resp = ctx.http("health", HttpConfig::get("https://api.example.com/health")).await?;
410    /// println!("status: {}", resp.output["status"]);
411    /// # Ok(())
412    /// # }
413    /// ```
414    pub async fn http(
415        &mut self,
416        name: &str,
417        config: HttpConfig,
418    ) -> Result<StepOutput, EngineError> {
419        self.execute_step(name, StepKind::Http, StepConfig::Http(config))
420            .await
421    }
422
423    /// Execute an agent step.
424    ///
425    /// # Errors
426    ///
427    /// Returns [`EngineError`] if the agent invocation fails or the store errors.
428    ///
429    /// # Examples
430    ///
431    /// ```no_run
432    /// use ironflow_engine::context::WorkflowContext;
433    /// use ironflow_engine::config::AgentStepConfig;
434    /// use ironflow_engine::error::EngineError;
435    ///
436    /// # async fn example(ctx: &mut WorkflowContext) -> Result<(), EngineError> {
437    /// let review = ctx.agent("review", AgentStepConfig::new("Review the code")).await?;
438    /// println!("review: {}", review.output["value"]);
439    /// # Ok(())
440    /// # }
441    /// ```
442    pub async fn agent(
443        &mut self,
444        name: &str,
445        config: impl Into<AgentStepConfig>,
446    ) -> Result<StepOutput, EngineError> {
447        self.execute_step(name, StepKind::Agent, StepConfig::Agent(config.into()))
448            .await
449    }
450
451    /// Create a human approval gate.
452    ///
453    /// On first execution, records an approval step and returns
454    /// [`EngineError::ApprovalRequired`] to suspend the run. The engine
455    /// transitions the run to `AwaitingApproval`.
456    ///
457    /// On resume (after a human approved via the API), the approval step
458    /// is replayed: it is marked as `Completed` and execution continues
459    /// past it. Multiple approval gates in the same handler work -- each
460    /// one pauses and resumes independently.
461    ///
462    /// # Errors
463    ///
464    /// Returns [`EngineError::ApprovalRequired`] to pause the run on
465    /// first execution. Returns other [`EngineError`] variants on store
466    /// failures.
467    ///
468    /// # Examples
469    ///
470    /// ```no_run
471    /// use ironflow_engine::context::WorkflowContext;
472    /// use ironflow_engine::config::ApprovalConfig;
473    /// use ironflow_engine::error::EngineError;
474    ///
475    /// # async fn example(ctx: &mut WorkflowContext) -> Result<(), EngineError> {
476    /// ctx.approval("deploy-gate", ApprovalConfig::new("Approve deployment?")).await?;
477    /// // Execution continues here after approval
478    /// # Ok(())
479    /// # }
480    /// ```
481    pub async fn approval(
482        &mut self,
483        name: &str,
484        config: ApprovalConfig,
485    ) -> Result<(), EngineError> {
486        let position = self.position;
487        self.position += 1;
488
489        // Replay: if this approval step exists from a prior execution,
490        // the run was approved -- mark it completed (if not already) and continue.
491        if let Some(existing) = self.replay_steps.get(&position)
492            && existing.kind == StepKind::Approval
493        {
494            if existing.status.state == StepStatus::AwaitingApproval {
495                self.store
496                    .update_step(
497                        existing.id,
498                        StepUpdate {
499                            status: Some(StepStatus::Completed),
500                            completed_at: Some(Utc::now()),
501                            ..StepUpdate::default()
502                        },
503                    )
504                    .await?;
505            }
506
507            self.last_step_ids = vec![existing.id];
508            info!(
509                run_id = %self.run_id,
510                step = %name,
511                position,
512                "approval step replayed (approved)"
513            );
514            return Ok(());
515        }
516
517        // First execution: create the approval step and suspend.
518        let step = self
519            .store
520            .create_step(NewStep {
521                run_id: self.run_id,
522                name: name.to_string(),
523                kind: StepKind::Approval,
524                position,
525                input: Some(serde_json::to_value(&config)?),
526            })
527            .await?;
528
529        self.start_step(step.id, Utc::now()).await?;
530
531        // Transition the step to AwaitingApproval so it reflects
532        // the suspended state on the dashboard.
533        self.store
534            .update_step(
535                step.id,
536                StepUpdate {
537                    status: Some(StepStatus::AwaitingApproval),
538                    ..StepUpdate::default()
539                },
540            )
541            .await?;
542
543        self.last_step_ids = vec![step.id];
544
545        Err(EngineError::ApprovalRequired {
546            run_id: self.run_id,
547            step_id: step.id,
548            message: config.message().to_string(),
549        })
550    }
551
552    /// Execute a custom operation step.
553    ///
554    /// Runs a user-defined [`Operation`] with full step lifecycle management:
555    /// creates the step record, transitions to Running, executes the operation,
556    /// persists the output and duration, and marks the step Completed or Failed.
557    ///
558    /// The operation's [`kind()`](Operation::kind) is stored as
559    /// [`StepKind::Custom`].
560    ///
561    /// # Errors
562    ///
563    /// Returns [`EngineError`] if the operation fails or the store errors.
564    ///
565    /// # Examples
566    ///
567    /// ```no_run
568    /// use ironflow_engine::context::WorkflowContext;
569    /// use ironflow_engine::operation::Operation;
570    /// use ironflow_engine::error::EngineError;
571    /// use serde_json::{Value, json};
572    /// use std::pin::Pin;
573    /// use std::future::Future;
574    ///
575    /// struct MyOp;
576    /// impl Operation for MyOp {
577    ///     fn kind(&self) -> &str { "my-service" }
578    ///     fn execute(&self) -> Pin<Box<dyn Future<Output = Result<Value, EngineError>> + Send + '_>> {
579    ///         Box::pin(async { Ok(json!({"ok": true})) })
580    ///     }
581    /// }
582    ///
583    /// # async fn example(ctx: &mut WorkflowContext) -> Result<(), EngineError> {
584    /// let result = ctx.operation("call-service", &MyOp).await?;
585    /// println!("output: {}", result.output);
586    /// # Ok(())
587    /// # }
588    /// ```
589    pub async fn operation(
590        &mut self,
591        name: &str,
592        op: &dyn Operation,
593    ) -> Result<StepOutput, EngineError> {
594        let kind = StepKind::Custom(op.kind().to_string());
595        let position = self.position;
596        self.position += 1;
597
598        let step = self
599            .store
600            .create_step(NewStep {
601                run_id: self.run_id,
602                name: name.to_string(),
603                kind,
604                position,
605                input: op.input(),
606            })
607            .await?;
608
609        self.start_step(step.id, Utc::now()).await?;
610
611        let start = Instant::now();
612
613        match op.execute().await {
614            Ok(output_value) => {
615                let duration_ms = start.elapsed().as_millis() as u64;
616                self.total_duration_ms += duration_ms;
617
618                let completed_at = Utc::now();
619                self.store
620                    .update_step(
621                        step.id,
622                        StepUpdate {
623                            status: Some(StepStatus::Completed),
624                            output: Some(output_value.clone()),
625                            duration_ms: Some(duration_ms),
626                            cost_usd: Some(Decimal::ZERO),
627                            completed_at: Some(completed_at),
628                            ..StepUpdate::default()
629                        },
630                    )
631                    .await?;
632
633                info!(
634                    run_id = %self.run_id,
635                    step = %name,
636                    kind = op.kind(),
637                    duration_ms,
638                    "operation step completed"
639                );
640
641                self.last_step_ids = vec![step.id];
642
643                Ok(StepOutput {
644                    output: output_value,
645                    duration_ms,
646                    cost_usd: Decimal::ZERO,
647                    input_tokens: None,
648                    output_tokens: None,
649                    debug_messages: None,
650                })
651            }
652            Err(err) => {
653                let completed_at = Utc::now();
654                if let Err(store_err) = self
655                    .store
656                    .update_step(
657                        step.id,
658                        StepUpdate {
659                            status: Some(StepStatus::Failed),
660                            error: Some(err.to_string()),
661                            completed_at: Some(completed_at),
662                            ..StepUpdate::default()
663                        },
664                    )
665                    .await
666                {
667                    error!(step_id = %step.id, error = %store_err, "failed to persist step failure");
668                }
669
670                Err(err)
671            }
672        }
673    }
674
675    /// Execute a sub-workflow step.
676    ///
677    /// Creates a child run for the named workflow handler, executes it with
678    /// its own steps and lifecycle, and returns a [`StepOutput`] containing
679    /// the child run ID and aggregated metrics.
680    ///
681    /// Requires the context to be created with
682    /// `with_handler_resolver`.
683    ///
684    /// # Errors
685    ///
686    /// Returns [`EngineError::InvalidWorkflow`] if no handler is registered
687    /// with the given name, or if no handler resolver is available.
688    ///
689    /// # Examples
690    ///
691    /// ```no_run
692    /// use ironflow_engine::context::WorkflowContext;
693    /// use ironflow_engine::error::EngineError;
694    /// use serde_json::json;
695    ///
696    /// # async fn example(ctx: &mut WorkflowContext) -> Result<(), EngineError> {
697    /// // let result = ctx.workflow(&MySubWorkflow, json!({})).await?;
698    /// # Ok(())
699    /// # }
700    /// ```
701    pub async fn workflow(
702        &mut self,
703        handler: &dyn WorkflowHandler,
704        payload: Value,
705    ) -> Result<StepOutput, EngineError> {
706        let config = WorkflowStepConfig::new(handler.name(), payload);
707        let position = self.position;
708        self.position += 1;
709
710        let step = self
711            .store
712            .create_step(NewStep {
713                run_id: self.run_id,
714                name: config.workflow_name.clone(),
715                kind: StepKind::Workflow,
716                position,
717                input: Some(serde_json::to_value(&config)?),
718            })
719            .await?;
720
721        self.start_step(step.id, Utc::now()).await?;
722
723        match self.execute_child_workflow(&config).await {
724            Ok(output) => {
725                self.total_cost_usd += output.cost_usd;
726                self.total_duration_ms += output.duration_ms;
727
728                let completed_at = Utc::now();
729                self.store
730                    .update_step(
731                        step.id,
732                        StepUpdate {
733                            status: Some(StepStatus::Completed),
734                            output: Some(output.output.clone()),
735                            duration_ms: Some(output.duration_ms),
736                            cost_usd: Some(output.cost_usd),
737                            completed_at: Some(completed_at),
738                            ..StepUpdate::default()
739                        },
740                    )
741                    .await?;
742
743                info!(
744                    run_id = %self.run_id,
745                    child_workflow = %config.workflow_name,
746                    duration_ms = output.duration_ms,
747                    "workflow step completed"
748                );
749
750                self.last_step_ids = vec![step.id];
751
752                Ok(output)
753            }
754            Err(err) => {
755                let completed_at = Utc::now();
756                if let Err(store_err) = self
757                    .store
758                    .update_step(
759                        step.id,
760                        StepUpdate {
761                            status: Some(StepStatus::Failed),
762                            error: Some(err.to_string()),
763                            completed_at: Some(completed_at),
764                            ..StepUpdate::default()
765                        },
766                    )
767                    .await
768                {
769                    error!(step_id = %step.id, error = %store_err, "failed to persist step failure");
770                }
771
772                Err(err)
773            }
774        }
775    }
776
777    /// Execute a child workflow and return aggregated output.
778    async fn execute_child_workflow(
779        &self,
780        config: &WorkflowStepConfig,
781    ) -> Result<StepOutput, EngineError> {
782        let resolver = self.handler_resolver.as_ref().ok_or_else(|| {
783            EngineError::InvalidWorkflow(
784                "sub-workflow requires a handler resolver (use Engine to execute)".to_string(),
785            )
786        })?;
787
788        let handler = resolver(&config.workflow_name).ok_or_else(|| {
789            EngineError::InvalidWorkflow(format!("no handler registered: {}", config.workflow_name))
790        })?;
791
792        let child_run = self
793            .store
794            .create_run(NewRun {
795                workflow_name: config.workflow_name.clone(),
796                trigger: TriggerKind::Workflow,
797                payload: config.payload.clone(),
798                max_retries: 0,
799            })
800            .await?;
801
802        let child_run_id = child_run.id;
803        info!(
804            parent_run_id = %self.run_id,
805            child_run_id = %child_run_id,
806            workflow = %config.workflow_name,
807            "child run created"
808        );
809
810        self.store
811            .update_run_status(child_run_id, RunStatus::Running)
812            .await?;
813
814        let run_start = Instant::now();
815        let mut child_ctx = WorkflowContext {
816            run_id: child_run_id,
817            store: self.store.clone(),
818            provider: self.provider.clone(),
819            handler_resolver: self.handler_resolver.clone(),
820            position: 0,
821            last_step_ids: Vec::new(),
822            total_cost_usd: Decimal::ZERO,
823            total_duration_ms: 0,
824            replay_steps: HashMap::new(),
825        };
826
827        let result = handler.execute(&mut child_ctx).await;
828        let total_duration = run_start.elapsed().as_millis() as u64;
829        let completed_at = Utc::now();
830
831        match result {
832            Ok(()) => {
833                self.store
834                    .update_run(
835                        child_run_id,
836                        RunUpdate {
837                            status: Some(RunStatus::Completed),
838                            cost_usd: Some(child_ctx.total_cost_usd),
839                            duration_ms: Some(total_duration),
840                            completed_at: Some(completed_at),
841                            ..RunUpdate::default()
842                        },
843                    )
844                    .await?;
845
846                Ok(StepOutput {
847                    output: serde_json::json!({
848                        "run_id": child_run_id,
849                        "workflow_name": config.workflow_name,
850                        "status": RunStatus::Completed,
851                        "cost_usd": child_ctx.total_cost_usd,
852                        "duration_ms": total_duration,
853                    }),
854                    duration_ms: total_duration,
855                    cost_usd: child_ctx.total_cost_usd,
856                    input_tokens: None,
857                    output_tokens: None,
858                    debug_messages: None,
859                })
860            }
861            Err(err) => {
862                if let Err(store_err) = self
863                    .store
864                    .update_run(
865                        child_run_id,
866                        RunUpdate {
867                            status: Some(RunStatus::Failed),
868                            error: Some(err.to_string()),
869                            cost_usd: Some(child_ctx.total_cost_usd),
870                            duration_ms: Some(total_duration),
871                            completed_at: Some(completed_at),
872                            ..RunUpdate::default()
873                        },
874                    )
875                    .await
876                {
877                    error!(
878                        child_run_id = %child_run_id,
879                        store_error = %store_err,
880                        "failed to persist child run failure"
881                    );
882                }
883
884                Err(err)
885            }
886        }
887    }
888
889    /// Try to replay a completed step from a previous execution.
890    ///
891    /// Returns `Some(StepOutput)` if a completed step exists at the given
892    /// position, `None` otherwise.
893    fn try_replay_step(&mut self, position: u32) -> Option<StepOutput> {
894        let step = self.replay_steps.get(&position)?;
895        if step.status.state != StepStatus::Completed {
896            return None;
897        }
898        let output = StepOutput {
899            output: step.output.clone().unwrap_or(Value::Null),
900            duration_ms: step.duration_ms,
901            cost_usd: step.cost_usd,
902            input_tokens: step.input_tokens,
903            output_tokens: step.output_tokens,
904            debug_messages: None,
905        };
906        self.total_cost_usd += output.cost_usd;
907        self.total_duration_ms += output.duration_ms;
908        self.last_step_ids = vec![step.id];
909        info!(
910            run_id = %self.run_id,
911            step = %step.name,
912            position,
913            "step replayed from previous execution"
914        );
915        Some(output)
916    }
917
918    /// Internal: execute a step with full persistence lifecycle.
919    async fn execute_step(
920        &mut self,
921        name: &str,
922        kind: StepKind,
923        config: StepConfig,
924    ) -> Result<StepOutput, EngineError> {
925        let position = self.position;
926        self.position += 1;
927
928        // Replay: if this step already completed in a prior execution, return cached output.
929        if let Some(output) = self.try_replay_step(position) {
930            return Ok(output);
931        }
932
933        // Create step record in Pending.
934        let step = self
935            .store
936            .create_step(NewStep {
937                run_id: self.run_id,
938                name: name.to_string(),
939                kind,
940                position,
941                input: Some(serde_json::to_value(&config)?),
942            })
943            .await?;
944
945        self.start_step(step.id, Utc::now()).await?;
946
947        match execute_step_config(&config, &self.provider).await {
948            Ok(output) => {
949                self.total_cost_usd += output.cost_usd;
950                self.total_duration_ms += output.duration_ms;
951
952                let debug_messages_json = output.debug_messages_json();
953
954                let completed_at = Utc::now();
955                self.store
956                    .update_step(
957                        step.id,
958                        StepUpdate {
959                            status: Some(StepStatus::Completed),
960                            output: Some(output.output.clone()),
961                            duration_ms: Some(output.duration_ms),
962                            cost_usd: Some(output.cost_usd),
963                            input_tokens: output.input_tokens,
964                            output_tokens: output.output_tokens,
965                            completed_at: Some(completed_at),
966                            debug_messages: debug_messages_json,
967                            ..StepUpdate::default()
968                        },
969                    )
970                    .await?;
971
972                info!(
973                    run_id = %self.run_id,
974                    step = %name,
975                    duration_ms = output.duration_ms,
976                    "step completed"
977                );
978
979                self.last_step_ids = vec![step.id];
980
981                Ok(output)
982            }
983            Err(err) => {
984                let completed_at = Utc::now();
985                let debug_messages_json = extract_debug_messages_from_error(&err);
986
987                if let Err(store_err) = self
988                    .store
989                    .update_step(
990                        step.id,
991                        StepUpdate {
992                            status: Some(StepStatus::Failed),
993                            error: Some(err.to_string()),
994                            completed_at: Some(completed_at),
995                            debug_messages: debug_messages_json,
996                            ..StepUpdate::default()
997                        },
998                    )
999                    .await
1000                {
1001                    tracing::error!(step_id = %step.id, error = %store_err, "failed to persist step failure");
1002                }
1003
1004                Err(err)
1005            }
1006        }
1007    }
1008
1009    /// Record dependency edges and transition a step to Running.
1010    ///
1011    /// Records edges from `step_id` to all `last_step_ids`, then
1012    /// transitions the step to `Running` with the given timestamp.
1013    async fn start_step(&self, step_id: Uuid, now: DateTime<Utc>) -> Result<(), EngineError> {
1014        if !self.last_step_ids.is_empty() {
1015            let deps: Vec<NewStepDependency> = self
1016                .last_step_ids
1017                .iter()
1018                .map(|&depends_on| NewStepDependency {
1019                    step_id,
1020                    depends_on,
1021                })
1022                .collect();
1023            self.store.create_step_dependencies(deps).await?;
1024        }
1025
1026        self.store
1027            .update_step(
1028                step_id,
1029                StepUpdate {
1030                    status: Some(StepStatus::Running),
1031                    started_at: Some(now),
1032                    ..StepUpdate::default()
1033                },
1034            )
1035            .await?;
1036
1037        Ok(())
1038    }
1039
1040    /// Access the store directly (advanced usage).
1041    pub fn store(&self) -> &Arc<dyn RunStore> {
1042        &self.store
1043    }
1044
1045    /// Access the payload that triggered this run.
1046    ///
1047    /// Fetches the run from the store and returns its payload.
1048    ///
1049    /// # Errors
1050    ///
1051    /// Returns [`EngineError::Store`] if the run is not found.
1052    pub async fn payload(&self) -> Result<Value, EngineError> {
1053        let run = self
1054            .store
1055            .get_run(self.run_id)
1056            .await?
1057            .ok_or(EngineError::Store(
1058                ironflow_store::error::StoreError::RunNotFound(self.run_id),
1059            ))?;
1060        Ok(run.payload)
1061    }
1062}
1063
1064impl fmt::Debug for WorkflowContext {
1065    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1066        f.debug_struct("WorkflowContext")
1067            .field("run_id", &self.run_id)
1068            .field("position", &self.position)
1069            .field("total_cost_usd", &self.total_cost_usd)
1070            .finish_non_exhaustive()
1071    }
1072}
1073
1074/// Extract debug messages from an engine error, if it wraps a schema validation
1075/// failure that carries a verbose conversation trace.
1076fn extract_debug_messages_from_error(err: &EngineError) -> Option<Value> {
1077    if let EngineError::Operation(OperationError::Agent(AgentError::SchemaValidation {
1078        debug_messages,
1079        ..
1080    })) = err
1081        && !debug_messages.is_empty()
1082    {
1083        return serde_json::to_value(debug_messages).ok();
1084    }
1085    None
1086}