Skip to main content

ironflow_engine/
context.rs

1//! [`WorkflowContext`] — execution context for dynamic workflows.
2//!
3//! Provides step execution methods that automatically persist results to the
4//! store. Each call to [`shell`](WorkflowContext::shell),
5//! [`http`](WorkflowContext::http), [`agent`](WorkflowContext::agent), or
6//! [`workflow`](WorkflowContext::workflow) creates a step record, executes the
7//! operation, captures the output, and returns a [`StepOutput`] that the next
8//! step can reference.
9//!
10//! # Examples
11//!
12//! ```no_run
13//! use ironflow_engine::context::WorkflowContext;
14//! use ironflow_engine::config::{ShellConfig, AgentStepConfig};
15//! use ironflow_engine::error::EngineError;
16//!
17//! # async fn example(ctx: &mut WorkflowContext) -> Result<(), EngineError> {
18//! let build = ctx.shell("build", ShellConfig::new("cargo build")).await?;
19//! let review = ctx.agent("review", AgentStepConfig::new(
20//!     &format!("Build output:\n{}", build.output["stdout"])
21//! )).await?;
22//! # Ok(())
23//! # }
24//! ```
25
26use std::sync::Arc;
27use std::time::Instant;
28
29use chrono::{DateTime, Utc};
30use rust_decimal::Decimal;
31use serde_json::Value;
32use tracing::{error, info};
33use uuid::Uuid;
34
35use ironflow_core::provider::AgentProvider;
36use ironflow_store::models::{
37    NewRun, NewStep, NewStepDependency, RunStatus, RunUpdate, StepKind, StepStatus, StepUpdate,
38    TriggerKind,
39};
40use ironflow_store::store::RunStore;
41
42use crate::config::{AgentStepConfig, HttpConfig, ShellConfig, StepConfig, WorkflowStepConfig};
43use crate::error::EngineError;
44use crate::executor::{ParallelStepResult, StepOutput, execute_step_config};
45use crate::handler::WorkflowHandler;
46use crate::operation::Operation;
47
48/// Callback type for resolving workflow handlers by name.
49pub(crate) type HandlerResolver =
50    Arc<dyn Fn(&str) -> Option<Arc<dyn WorkflowHandler>> + Send + Sync>;
51
52/// Execution context for a single workflow run.
53///
54/// Tracks the current step position and provides convenience methods
55/// for executing operations with automatic persistence.
56///
57/// # Examples
58///
59/// ```no_run
60/// use ironflow_engine::context::WorkflowContext;
61/// use ironflow_engine::config::ShellConfig;
62/// use ironflow_engine::error::EngineError;
63///
64/// # async fn example(ctx: &mut WorkflowContext) -> Result<(), EngineError> {
65/// let result = ctx.shell("greet", ShellConfig::new("echo hello")).await?;
66/// assert!(result.output["stdout"].as_str().unwrap().contains("hello"));
67/// # Ok(())
68/// # }
69/// ```
70pub struct WorkflowContext {
71    run_id: Uuid,
72    store: Arc<dyn RunStore>,
73    provider: Arc<dyn AgentProvider>,
74    handler_resolver: Option<HandlerResolver>,
75    position: u32,
76    /// IDs of the last executed step(s) -- used to record DAG dependencies.
77    last_step_ids: Vec<Uuid>,
78    /// Accumulated cost across all steps in this run.
79    total_cost_usd: Decimal,
80    /// Accumulated duration across all steps.
81    total_duration_ms: u64,
82}
83
84impl WorkflowContext {
85    /// Create a new context for a run.
86    ///
87    /// Not typically called directly — the [`Engine`](crate::engine::Engine)
88    /// creates this when executing a [`WorkflowHandler`].
89    pub fn new(run_id: Uuid, store: Arc<dyn RunStore>, provider: Arc<dyn AgentProvider>) -> Self {
90        Self {
91            run_id,
92            store,
93            provider,
94            handler_resolver: None,
95            position: 0,
96            last_step_ids: Vec::new(),
97            total_cost_usd: Decimal::ZERO,
98            total_duration_ms: 0,
99        }
100    }
101
102    /// Create a new context with a handler resolver for sub-workflow support.
103    ///
104    /// The resolver is called when [`workflow`](Self::workflow) is invoked to
105    /// look up registered handlers by name.
106    pub(crate) fn with_handler_resolver(
107        run_id: Uuid,
108        store: Arc<dyn RunStore>,
109        provider: Arc<dyn AgentProvider>,
110        resolver: HandlerResolver,
111    ) -> Self {
112        Self {
113            run_id,
114            store,
115            provider,
116            handler_resolver: Some(resolver),
117            position: 0,
118            last_step_ids: Vec::new(),
119            total_cost_usd: Decimal::ZERO,
120            total_duration_ms: 0,
121        }
122    }
123
124    /// The run ID this context is executing for.
125    pub fn run_id(&self) -> Uuid {
126        self.run_id
127    }
128
129    /// Accumulated cost across all executed steps so far.
130    pub fn total_cost_usd(&self) -> Decimal {
131        self.total_cost_usd
132    }
133
134    /// Accumulated duration across all executed steps so far.
135    pub fn total_duration_ms(&self) -> u64 {
136        self.total_duration_ms
137    }
138
139    /// Execute multiple steps concurrently (wait-all model).
140    ///
141    /// All steps in the batch execute in parallel via `tokio::JoinSet`.
142    /// Each step is recorded with the same `position` (execution wave).
143    /// Dependencies on previous steps are recorded automatically.
144    ///
145    /// When `fail_fast` is true, remaining steps are aborted on the first
146    /// failure. When false, all steps run to completion and the first
147    /// error is returned.
148    ///
149    /// # Errors
150    ///
151    /// Returns [`EngineError`] if any step fails.
152    ///
153    /// # Examples
154    ///
155    /// ```no_run
156    /// use ironflow_engine::context::WorkflowContext;
157    /// use ironflow_engine::config::{StepConfig, ShellConfig};
158    /// use ironflow_engine::error::EngineError;
159    ///
160    /// # async fn example(ctx: &mut WorkflowContext) -> Result<(), EngineError> {
161    /// let results = ctx.parallel(
162    ///     vec![
163    ///         ("test-unit", StepConfig::Shell(ShellConfig::new("cargo test --lib"))),
164    ///         ("lint", StepConfig::Shell(ShellConfig::new("cargo clippy"))),
165    ///     ],
166    ///     true,
167    /// ).await?;
168    ///
169    /// for r in &results {
170    ///     println!("{}: {:?}", r.name, r.output.output);
171    /// }
172    /// # Ok(())
173    /// # }
174    /// ```
175    pub async fn parallel(
176        &mut self,
177        steps: Vec<(&str, StepConfig)>,
178        fail_fast: bool,
179    ) -> Result<Vec<ParallelStepResult>, EngineError> {
180        if steps.is_empty() {
181            return Ok(Vec::new());
182        }
183
184        let wave_position = self.position;
185        self.position += 1;
186
187        let now = Utc::now();
188        let mut step_records: Vec<(Uuid, String, StepConfig)> = Vec::with_capacity(steps.len());
189
190        for (name, config) in &steps {
191            let kind = config.kind();
192            let step = self
193                .store
194                .create_step(NewStep {
195                    run_id: self.run_id,
196                    name: name.to_string(),
197                    kind,
198                    position: wave_position,
199                    input: Some(serde_json::to_value(config)?),
200                })
201                .await?;
202
203            self.start_step(step.id, now).await?;
204
205            step_records.push((step.id, name.to_string(), config.clone()));
206        }
207
208        let mut join_set = tokio::task::JoinSet::new();
209        for (idx, (_id, _name, config)) in step_records.iter().enumerate() {
210            let provider = self.provider.clone();
211            let config = config.clone();
212            join_set.spawn(async move { (idx, execute_step_config(&config, &provider).await) });
213        }
214
215        // JoinSet returns in completion order; indexed_results restores input order.
216        let mut indexed_results: Vec<Option<Result<StepOutput, String>>> =
217            vec![None; step_records.len()];
218        let mut first_error: Option<EngineError> = None;
219
220        while let Some(join_result) = join_set.join_next().await {
221            let (idx, step_result) = match join_result {
222                Ok(r) => r,
223                Err(e) => {
224                    if first_error.is_none() {
225                        first_error = Some(EngineError::StepConfig(format!("join error: {e}")));
226                    }
227                    if fail_fast {
228                        join_set.abort_all();
229                    }
230                    continue;
231                }
232            };
233
234            let (step_id, step_name, _) = &step_records[idx];
235            let completed_at = Utc::now();
236
237            match step_result {
238                Ok(output) => {
239                    self.total_cost_usd += output.cost_usd;
240                    self.total_duration_ms += output.duration_ms;
241
242                    self.store
243                        .update_step(
244                            *step_id,
245                            StepUpdate {
246                                status: Some(StepStatus::Completed),
247                                output: Some(output.output.clone()),
248                                duration_ms: Some(output.duration_ms),
249                                cost_usd: Some(output.cost_usd),
250                                input_tokens: output.input_tokens,
251                                output_tokens: output.output_tokens,
252                                completed_at: Some(completed_at),
253                                ..StepUpdate::default()
254                            },
255                        )
256                        .await?;
257
258                    info!(
259                        run_id = %self.run_id,
260                        step = %step_name,
261                        duration_ms = output.duration_ms,
262                        "parallel step completed"
263                    );
264
265                    indexed_results[idx] = Some(Ok(output));
266                }
267                Err(err) => {
268                    let err_msg = err.to_string();
269
270                    if let Err(store_err) = self
271                        .store
272                        .update_step(
273                            *step_id,
274                            StepUpdate {
275                                status: Some(StepStatus::Failed),
276                                error: Some(err_msg.clone()),
277                                completed_at: Some(completed_at),
278                                ..StepUpdate::default()
279                            },
280                        )
281                        .await
282                    {
283                        tracing::error!(
284                            step_id = %step_id,
285                            error = %store_err,
286                            "failed to persist parallel step failure"
287                        );
288                    }
289
290                    indexed_results[idx] = Some(Err(err_msg.clone()));
291
292                    if first_error.is_none() {
293                        first_error = Some(err);
294                    }
295
296                    if fail_fast {
297                        join_set.abort_all();
298                    }
299                }
300            }
301        }
302
303        if let Some(err) = first_error {
304            return Err(err);
305        }
306
307        self.last_step_ids = step_records.iter().map(|(id, _, _)| *id).collect();
308
309        // Build results in original order.
310        let results: Vec<ParallelStepResult> = step_records
311            .iter()
312            .enumerate()
313            .map(|(idx, (step_id, name, _))| {
314                let output = match indexed_results[idx].take() {
315                    Some(Ok(o)) => o,
316                    _ => unreachable!("all steps succeeded if no error returned"),
317                };
318                ParallelStepResult {
319                    name: name.clone(),
320                    output,
321                    step_id: *step_id,
322                }
323            })
324            .collect();
325
326        Ok(results)
327    }
328
329    /// Execute a shell step.
330    ///
331    /// Creates the step record, runs the command, persists the result,
332    /// and returns the output for use in subsequent steps.
333    ///
334    /// # Errors
335    ///
336    /// Returns [`EngineError`] if the command fails or the store errors.
337    ///
338    /// # Examples
339    ///
340    /// ```no_run
341    /// use ironflow_engine::context::WorkflowContext;
342    /// use ironflow_engine::config::ShellConfig;
343    /// use ironflow_engine::error::EngineError;
344    ///
345    /// # async fn example(ctx: &mut WorkflowContext) -> Result<(), EngineError> {
346    /// let files = ctx.shell("list", ShellConfig::new("ls -la")).await?;
347    /// println!("stdout: {}", files.output["stdout"]);
348    /// # Ok(())
349    /// # }
350    /// ```
351    pub async fn shell(
352        &mut self,
353        name: &str,
354        config: ShellConfig,
355    ) -> Result<StepOutput, EngineError> {
356        self.execute_step(name, StepKind::Shell, StepConfig::Shell(config))
357            .await
358    }
359
360    /// Execute an HTTP step.
361    ///
362    /// # Errors
363    ///
364    /// Returns [`EngineError`] if the request fails or the store errors.
365    ///
366    /// # Examples
367    ///
368    /// ```no_run
369    /// use ironflow_engine::context::WorkflowContext;
370    /// use ironflow_engine::config::HttpConfig;
371    /// use ironflow_engine::error::EngineError;
372    ///
373    /// # async fn example(ctx: &mut WorkflowContext) -> Result<(), EngineError> {
374    /// let resp = ctx.http("health", HttpConfig::get("https://api.example.com/health")).await?;
375    /// println!("status: {}", resp.output["status"]);
376    /// # Ok(())
377    /// # }
378    /// ```
379    pub async fn http(
380        &mut self,
381        name: &str,
382        config: HttpConfig,
383    ) -> Result<StepOutput, EngineError> {
384        self.execute_step(name, StepKind::Http, StepConfig::Http(config))
385            .await
386    }
387
388    /// Execute an agent step.
389    ///
390    /// # Errors
391    ///
392    /// Returns [`EngineError`] if the agent invocation fails or the store errors.
393    ///
394    /// # Examples
395    ///
396    /// ```no_run
397    /// use ironflow_engine::context::WorkflowContext;
398    /// use ironflow_engine::config::AgentStepConfig;
399    /// use ironflow_engine::error::EngineError;
400    ///
401    /// # async fn example(ctx: &mut WorkflowContext) -> Result<(), EngineError> {
402    /// let review = ctx.agent("review", AgentStepConfig::new("Review the code")).await?;
403    /// println!("review: {}", review.output["value"]);
404    /// # Ok(())
405    /// # }
406    /// ```
407    pub async fn agent(
408        &mut self,
409        name: &str,
410        config: AgentStepConfig,
411    ) -> Result<StepOutput, EngineError> {
412        self.execute_step(name, StepKind::Agent, StepConfig::Agent(config))
413            .await
414    }
415
416    /// Execute a custom operation step.
417    ///
418    /// Runs a user-defined [`Operation`] with full step lifecycle management:
419    /// creates the step record, transitions to Running, executes the operation,
420    /// persists the output and duration, and marks the step Completed or Failed.
421    ///
422    /// The operation's [`kind()`](Operation::kind) is stored as
423    /// [`StepKind::Custom`].
424    ///
425    /// # Errors
426    ///
427    /// Returns [`EngineError`] if the operation fails or the store errors.
428    ///
429    /// # Examples
430    ///
431    /// ```no_run
432    /// use ironflow_engine::context::WorkflowContext;
433    /// use ironflow_engine::operation::Operation;
434    /// use ironflow_engine::error::EngineError;
435    /// use serde_json::{Value, json};
436    /// use std::pin::Pin;
437    /// use std::future::Future;
438    ///
439    /// struct MyOp;
440    /// impl Operation for MyOp {
441    ///     fn kind(&self) -> &str { "my-service" }
442    ///     fn execute(&self) -> Pin<Box<dyn Future<Output = Result<Value, EngineError>> + Send + '_>> {
443    ///         Box::pin(async { Ok(json!({"ok": true})) })
444    ///     }
445    /// }
446    ///
447    /// # async fn example(ctx: &mut WorkflowContext) -> Result<(), EngineError> {
448    /// let result = ctx.operation("call-service", &MyOp).await?;
449    /// println!("output: {}", result.output);
450    /// # Ok(())
451    /// # }
452    /// ```
453    pub async fn operation(
454        &mut self,
455        name: &str,
456        op: &dyn Operation,
457    ) -> Result<StepOutput, EngineError> {
458        let kind = StepKind::Custom(op.kind().to_string());
459        let position = self.position;
460        self.position += 1;
461
462        let step = self
463            .store
464            .create_step(NewStep {
465                run_id: self.run_id,
466                name: name.to_string(),
467                kind,
468                position,
469                input: op.input(),
470            })
471            .await?;
472
473        self.start_step(step.id, Utc::now()).await?;
474
475        let start = std::time::Instant::now();
476
477        match op.execute().await {
478            Ok(output_value) => {
479                let duration_ms = start.elapsed().as_millis() as u64;
480                self.total_duration_ms += duration_ms;
481
482                let completed_at = Utc::now();
483                self.store
484                    .update_step(
485                        step.id,
486                        StepUpdate {
487                            status: Some(StepStatus::Completed),
488                            output: Some(output_value.clone()),
489                            duration_ms: Some(duration_ms),
490                            cost_usd: Some(Decimal::ZERO),
491                            completed_at: Some(completed_at),
492                            ..StepUpdate::default()
493                        },
494                    )
495                    .await?;
496
497                info!(
498                    run_id = %self.run_id,
499                    step = %name,
500                    kind = op.kind(),
501                    duration_ms,
502                    "operation step completed"
503                );
504
505                self.last_step_ids = vec![step.id];
506
507                Ok(StepOutput {
508                    output: output_value,
509                    duration_ms,
510                    cost_usd: Decimal::ZERO,
511                    input_tokens: None,
512                    output_tokens: None,
513                })
514            }
515            Err(err) => {
516                let completed_at = Utc::now();
517                if let Err(store_err) = self
518                    .store
519                    .update_step(
520                        step.id,
521                        StepUpdate {
522                            status: Some(StepStatus::Failed),
523                            error: Some(err.to_string()),
524                            completed_at: Some(completed_at),
525                            ..StepUpdate::default()
526                        },
527                    )
528                    .await
529                {
530                    error!(step_id = %step.id, error = %store_err, "failed to persist step failure");
531                }
532
533                Err(err)
534            }
535        }
536    }
537
538    /// Execute a sub-workflow step.
539    ///
540    /// Creates a child run for the named workflow handler, executes it with
541    /// its own steps and lifecycle, and returns a [`StepOutput`] containing
542    /// the child run ID and aggregated metrics.
543    ///
544    /// Requires the context to be created with
545    /// `with_handler_resolver`.
546    ///
547    /// # Errors
548    ///
549    /// Returns [`EngineError::InvalidWorkflow`] if no handler is registered
550    /// with the given name, or if no handler resolver is available.
551    ///
552    /// # Examples
553    ///
554    /// ```no_run
555    /// use ironflow_engine::context::WorkflowContext;
556    /// use ironflow_engine::error::EngineError;
557    /// use serde_json::json;
558    ///
559    /// # async fn example(ctx: &mut WorkflowContext) -> Result<(), EngineError> {
560    /// // let result = ctx.workflow(&MySubWorkflow, json!({})).await?;
561    /// # Ok(())
562    /// # }
563    /// ```
564    pub async fn workflow(
565        &mut self,
566        handler: &dyn WorkflowHandler,
567        payload: Value,
568    ) -> Result<StepOutput, EngineError> {
569        let config = WorkflowStepConfig::new(handler.name(), payload);
570        let position = self.position;
571        self.position += 1;
572
573        let step = self
574            .store
575            .create_step(NewStep {
576                run_id: self.run_id,
577                name: config.workflow_name.clone(),
578                kind: StepKind::Workflow,
579                position,
580                input: Some(serde_json::to_value(&config)?),
581            })
582            .await?;
583
584        self.start_step(step.id, Utc::now()).await?;
585
586        match self.execute_child_workflow(&config).await {
587            Ok(output) => {
588                self.total_cost_usd += output.cost_usd;
589                self.total_duration_ms += output.duration_ms;
590
591                let completed_at = Utc::now();
592                self.store
593                    .update_step(
594                        step.id,
595                        StepUpdate {
596                            status: Some(StepStatus::Completed),
597                            output: Some(output.output.clone()),
598                            duration_ms: Some(output.duration_ms),
599                            cost_usd: Some(output.cost_usd),
600                            completed_at: Some(completed_at),
601                            ..StepUpdate::default()
602                        },
603                    )
604                    .await?;
605
606                info!(
607                    run_id = %self.run_id,
608                    child_workflow = %config.workflow_name,
609                    duration_ms = output.duration_ms,
610                    "workflow step completed"
611                );
612
613                self.last_step_ids = vec![step.id];
614
615                Ok(output)
616            }
617            Err(err) => {
618                let completed_at = Utc::now();
619                if let Err(store_err) = self
620                    .store
621                    .update_step(
622                        step.id,
623                        StepUpdate {
624                            status: Some(StepStatus::Failed),
625                            error: Some(err.to_string()),
626                            completed_at: Some(completed_at),
627                            ..StepUpdate::default()
628                        },
629                    )
630                    .await
631                {
632                    error!(step_id = %step.id, error = %store_err, "failed to persist step failure");
633                }
634
635                Err(err)
636            }
637        }
638    }
639
640    /// Execute a child workflow and return aggregated output.
641    async fn execute_child_workflow(
642        &self,
643        config: &WorkflowStepConfig,
644    ) -> Result<StepOutput, EngineError> {
645        let resolver = self.handler_resolver.as_ref().ok_or_else(|| {
646            EngineError::InvalidWorkflow(
647                "sub-workflow requires a handler resolver (use Engine to execute)".to_string(),
648            )
649        })?;
650
651        let handler = resolver(&config.workflow_name).ok_or_else(|| {
652            EngineError::InvalidWorkflow(format!("no handler registered: {}", config.workflow_name))
653        })?;
654
655        let child_run = self
656            .store
657            .create_run(NewRun {
658                workflow_name: config.workflow_name.clone(),
659                trigger: TriggerKind::Workflow,
660                payload: config.payload.clone(),
661                max_retries: 0,
662            })
663            .await?;
664
665        let child_run_id = child_run.id;
666        info!(
667            parent_run_id = %self.run_id,
668            child_run_id = %child_run_id,
669            workflow = %config.workflow_name,
670            "child run created"
671        );
672
673        self.store
674            .update_run_status(child_run_id, RunStatus::Running)
675            .await?;
676
677        let run_start = Instant::now();
678        let mut child_ctx = WorkflowContext {
679            run_id: child_run_id,
680            store: self.store.clone(),
681            provider: self.provider.clone(),
682            handler_resolver: self.handler_resolver.clone(),
683            position: 0,
684            last_step_ids: Vec::new(),
685            total_cost_usd: Decimal::ZERO,
686            total_duration_ms: 0,
687        };
688
689        let result = handler.execute(&mut child_ctx).await;
690        let total_duration = run_start.elapsed().as_millis() as u64;
691        let completed_at = Utc::now();
692
693        match result {
694            Ok(()) => {
695                self.store
696                    .update_run(
697                        child_run_id,
698                        RunUpdate {
699                            status: Some(RunStatus::Completed),
700                            cost_usd: Some(child_ctx.total_cost_usd),
701                            duration_ms: Some(total_duration),
702                            completed_at: Some(completed_at),
703                            ..RunUpdate::default()
704                        },
705                    )
706                    .await?;
707
708                Ok(StepOutput {
709                    output: serde_json::json!({
710                        "run_id": child_run_id,
711                        "workflow_name": config.workflow_name,
712                        "status": RunStatus::Completed,
713                        "cost_usd": child_ctx.total_cost_usd,
714                        "duration_ms": total_duration,
715                    }),
716                    duration_ms: total_duration,
717                    cost_usd: child_ctx.total_cost_usd,
718                    input_tokens: None,
719                    output_tokens: None,
720                })
721            }
722            Err(err) => {
723                if let Err(store_err) = self
724                    .store
725                    .update_run(
726                        child_run_id,
727                        RunUpdate {
728                            status: Some(RunStatus::Failed),
729                            error: Some(err.to_string()),
730                            cost_usd: Some(child_ctx.total_cost_usd),
731                            duration_ms: Some(total_duration),
732                            completed_at: Some(completed_at),
733                            ..RunUpdate::default()
734                        },
735                    )
736                    .await
737                {
738                    error!(
739                        child_run_id = %child_run_id,
740                        store_error = %store_err,
741                        "failed to persist child run failure"
742                    );
743                }
744
745                Err(err)
746            }
747        }
748    }
749
750    /// Internal: execute a step with full persistence lifecycle.
751    async fn execute_step(
752        &mut self,
753        name: &str,
754        kind: StepKind,
755        config: StepConfig,
756    ) -> Result<StepOutput, EngineError> {
757        let position = self.position;
758        self.position += 1;
759
760        // Create step record in Pending.
761        let step = self
762            .store
763            .create_step(NewStep {
764                run_id: self.run_id,
765                name: name.to_string(),
766                kind,
767                position,
768                input: Some(serde_json::to_value(&config)?),
769            })
770            .await?;
771
772        self.start_step(step.id, Utc::now()).await?;
773
774        match execute_step_config(&config, &self.provider).await {
775            Ok(output) => {
776                self.total_cost_usd += output.cost_usd;
777                self.total_duration_ms += output.duration_ms;
778
779                let completed_at = Utc::now();
780                self.store
781                    .update_step(
782                        step.id,
783                        StepUpdate {
784                            status: Some(StepStatus::Completed),
785                            output: Some(output.output.clone()),
786                            duration_ms: Some(output.duration_ms),
787                            cost_usd: Some(output.cost_usd),
788                            input_tokens: output.input_tokens,
789                            output_tokens: output.output_tokens,
790                            completed_at: Some(completed_at),
791                            ..StepUpdate::default()
792                        },
793                    )
794                    .await?;
795
796                info!(
797                    run_id = %self.run_id,
798                    step = %name,
799                    duration_ms = output.duration_ms,
800                    "step completed"
801                );
802
803                self.last_step_ids = vec![step.id];
804
805                Ok(output)
806            }
807            Err(err) => {
808                let completed_at = Utc::now();
809                if let Err(store_err) = self
810                    .store
811                    .update_step(
812                        step.id,
813                        StepUpdate {
814                            status: Some(StepStatus::Failed),
815                            error: Some(err.to_string()),
816                            completed_at: Some(completed_at),
817                            ..StepUpdate::default()
818                        },
819                    )
820                    .await
821                {
822                    tracing::error!(step_id = %step.id, error = %store_err, "failed to persist step failure");
823                }
824
825                Err(err)
826            }
827        }
828    }
829
830    /// Record dependency edges and transition a step to Running.
831    ///
832    /// Records edges from `step_id` to all `last_step_ids`, then
833    /// transitions the step to `Running` with the given timestamp.
834    async fn start_step(&self, step_id: Uuid, now: DateTime<Utc>) -> Result<(), EngineError> {
835        if !self.last_step_ids.is_empty() {
836            let deps: Vec<NewStepDependency> = self
837                .last_step_ids
838                .iter()
839                .map(|&depends_on| NewStepDependency {
840                    step_id,
841                    depends_on,
842                })
843                .collect();
844            self.store.create_step_dependencies(deps).await?;
845        }
846
847        self.store
848            .update_step(
849                step_id,
850                StepUpdate {
851                    status: Some(StepStatus::Running),
852                    started_at: Some(now),
853                    ..StepUpdate::default()
854                },
855            )
856            .await?;
857
858        Ok(())
859    }
860
861    /// Access the store directly (advanced usage).
862    pub fn store(&self) -> &Arc<dyn RunStore> {
863        &self.store
864    }
865
866    /// Access the payload that triggered this run.
867    ///
868    /// Fetches the run from the store and returns its payload.
869    ///
870    /// # Errors
871    ///
872    /// Returns [`EngineError::Store`] if the run is not found.
873    pub async fn payload(&self) -> Result<Value, EngineError> {
874        let run = self
875            .store
876            .get_run(self.run_id)
877            .await?
878            .ok_or(EngineError::Store(
879                ironflow_store::error::StoreError::RunNotFound(self.run_id),
880            ))?;
881        Ok(run.payload)
882    }
883}
884
885impl std::fmt::Debug for WorkflowContext {
886    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
887        f.debug_struct("WorkflowContext")
888            .field("run_id", &self.run_id)
889            .field("position", &self.position)
890            .field("total_cost_usd", &self.total_cost_usd)
891            .finish_non_exhaustive()
892    }
893}