Skip to main content

ironflow_engine/
context.rs

1//! [`WorkflowContext`] — execution context for dynamic workflows.
2//!
3//! Provides step execution methods that automatically persist results to the
4//! store. Each call to [`shell`](WorkflowContext::shell),
5//! [`http`](WorkflowContext::http), [`agent`](WorkflowContext::agent), or
6//! [`workflow`](WorkflowContext::workflow) creates a step record, executes the
7//! operation, captures the output, and returns a [`StepOutput`] that the next
8//! step can reference.
9//!
10//! # Examples
11//!
12//! ```no_run
13//! use ironflow_engine::context::WorkflowContext;
14//! use ironflow_engine::config::{ShellConfig, AgentStepConfig};
15//! use ironflow_engine::error::EngineError;
16//!
17//! # async fn example(ctx: &mut WorkflowContext) -> Result<(), EngineError> {
18//! let build = ctx.shell("build", ShellConfig::new("cargo build")).await?;
19//! let review = ctx.agent("review", AgentStepConfig::new(
20//!     &format!("Build output:\n{}", build.output["stdout"])
21//! )).await?;
22//! # Ok(())
23//! # }
24//! ```
25
26use std::collections::HashMap;
27use std::fmt;
28use std::sync::Arc;
29use std::time::Instant;
30
31use chrono::{DateTime, Utc};
32use rust_decimal::Decimal;
33use serde_json::Value;
34use tokio::task::JoinSet;
35use tracing::{error, info};
36use uuid::Uuid;
37
38use ironflow_core::error::{AgentError, OperationError};
39use ironflow_core::provider::AgentProvider;
40use ironflow_store::models::{
41    NewRun, NewStep, NewStepDependency, RunStatus, RunUpdate, Step, StepKind, StepStatus,
42    StepUpdate, TriggerKind,
43};
44use ironflow_store::store::Store;
45
46use crate::config::{
47    AgentStepConfig, ApprovalConfig, HttpConfig, ShellConfig, StepConfig, WorkflowStepConfig,
48};
49use crate::error::EngineError;
50use crate::executor::{ParallelStepResult, StepOutput, execute_step_config};
51use crate::handler::WorkflowHandler;
52use crate::log_sender::{LogSender, StepLogSender};
53use crate::operation::Operation;
54
/// Callback type for resolving workflow handlers by name.
///
/// The callback receives a workflow name and returns the matching handler,
/// or `None` when it has no handler for that name. It is `Send + Sync` and
/// held behind an [`Arc`], so one resolver can be shared between contexts.
pub(crate) type HandlerResolver =
    Arc<dyn Fn(&str) -> Option<Arc<dyn WorkflowHandler>> + Send + Sync>;
58
/// Execution context for a single workflow run.
///
/// Tracks the current step position and provides convenience methods
/// for executing operations with automatic persistence.
///
/// # Examples
///
/// ```no_run
/// use ironflow_engine::context::WorkflowContext;
/// use ironflow_engine::config::ShellConfig;
/// use ironflow_engine::error::EngineError;
///
/// # async fn example(ctx: &mut WorkflowContext) -> Result<(), EngineError> {
/// let result = ctx.shell("greet", ShellConfig::new("echo hello")).await?;
/// assert!(result.output["stdout"].as_str().unwrap().contains("hello"));
/// # Ok(())
/// # }
/// ```
pub struct WorkflowContext {
    /// ID of the run whose steps this context records.
    run_id: Uuid,
    /// Store through which step records are created and updated.
    store: Arc<dyn Store>,
    /// Agent provider handed to step execution.
    provider: Arc<dyn AgentProvider>,
    /// Resolver for sub-workflow handlers; `None` when the context was built
    /// without one, in which case sub-workflow steps cannot be resolved.
    handler_resolver: Option<HandlerResolver>,
    /// Position assigned to the next step (or parallel wave); incremented
    /// each time a step or wave is recorded.
    position: u32,
    /// IDs of the last executed step(s) -- used to record DAG dependencies.
    last_step_ids: Vec<Uuid>,
    /// Accumulated cost across all steps in this run.
    total_cost_usd: Decimal,
    /// Accumulated duration across all steps.
    total_duration_ms: u64,
    /// Steps from a previous execution, keyed by position.
    /// Used when resuming after approval to replay completed steps.
    replay_steps: HashMap<u32, Step>,
    /// Optional sender for real-time log streaming.
    log_sender: Option<LogSender>,
}
95
96impl WorkflowContext {
97    /// Create a new context for a run.
98    ///
99    /// Not typically called directly — the [`Engine`](crate::engine::Engine)
100    /// creates this when executing a [`WorkflowHandler`].
101    pub fn new(run_id: Uuid, store: Arc<dyn Store>, provider: Arc<dyn AgentProvider>) -> Self {
102        Self {
103            run_id,
104            store,
105            provider,
106            handler_resolver: None,
107            position: 0,
108            last_step_ids: Vec::new(),
109            total_cost_usd: Decimal::ZERO,
110            total_duration_ms: 0,
111            replay_steps: HashMap::new(),
112            log_sender: None,
113        }
114    }
115
116    /// Create a new context with a handler resolver for sub-workflow support.
117    ///
118    /// The resolver is called when [`workflow`](Self::workflow) is invoked to
119    /// look up registered handlers by name.
120    pub(crate) fn with_handler_resolver(
121        run_id: Uuid,
122        store: Arc<dyn Store>,
123        provider: Arc<dyn AgentProvider>,
124        resolver: HandlerResolver,
125    ) -> Self {
126        Self {
127            run_id,
128            store,
129            provider,
130            handler_resolver: Some(resolver),
131            position: 0,
132            last_step_ids: Vec::new(),
133            total_cost_usd: Decimal::ZERO,
134            total_duration_ms: 0,
135            replay_steps: HashMap::new(),
136            log_sender: None,
137        }
138    }
139
140    /// Attach a log sender for real-time step output streaming.
141    pub fn set_log_sender(&mut self, sender: LogSender) {
142        self.log_sender = Some(sender);
143    }
144
145    /// Load existing steps from the store for replay after approval.
146    ///
147    /// Called by the engine when resuming a run. All completed steps
148    /// and the approved approval step are indexed by position so that
149    /// `execute_step` and `approval` can skip them.
150    pub(crate) async fn load_replay_steps(&mut self) -> Result<(), EngineError> {
151        let steps = self.store.list_steps(self.run_id).await?;
152        for step in steps {
153            let dominated = matches!(
154                step.status.state,
155                StepStatus::Completed | StepStatus::Running | StepStatus::AwaitingApproval
156            );
157            if dominated {
158                self.replay_steps.insert(step.position, step);
159            }
160        }
161        Ok(())
162    }
163
    /// The run ID this context is executing for.
    ///
    /// Returns a copy of the ID stored on the context.
    pub fn run_id(&self) -> Uuid {
        self.run_id
    }
168
    /// Accumulated cost across all executed steps so far.
    ///
    /// Starts at zero and grows as steps report a cost.
    pub fn total_cost_usd(&self) -> Decimal {
        self.total_cost_usd
    }
173
    /// Accumulated duration across all executed steps so far.
    ///
    /// Expressed in milliseconds; starts at zero and grows as steps report
    /// their duration.
    pub fn total_duration_ms(&self) -> u64 {
        self.total_duration_ms
    }
178
179    /// Execute multiple steps concurrently (wait-all model).
180    ///
181    /// All steps in the batch execute in parallel via `tokio::JoinSet`.
182    /// Each step is recorded with the same `position` (execution wave).
183    /// Dependencies on previous steps are recorded automatically.
184    ///
185    /// When `fail_fast` is true, remaining steps are aborted on the first
186    /// failure. When false, all steps run to completion and the first
187    /// error is returned.
188    ///
189    /// # Errors
190    ///
191    /// Returns [`EngineError`] if any step fails.
192    ///
193    /// # Examples
194    ///
195    /// ```no_run
196    /// use ironflow_engine::context::WorkflowContext;
197    /// use ironflow_engine::config::{StepConfig, ShellConfig};
198    /// use ironflow_engine::error::EngineError;
199    ///
200    /// # async fn example(ctx: &mut WorkflowContext) -> Result<(), EngineError> {
201    /// let results = ctx.parallel(
202    ///     vec![
203    ///         ("test-unit", StepConfig::Shell(ShellConfig::new("cargo test --lib"))),
204    ///         ("lint", StepConfig::Shell(ShellConfig::new("cargo clippy"))),
205    ///     ],
206    ///     true,
207    /// ).await?;
208    ///
209    /// for r in &results {
210    ///     println!("{}: {:?}", r.name, r.output.output);
211    /// }
212    /// # Ok(())
213    /// # }
214    /// ```
215    pub async fn parallel(
216        &mut self,
217        steps: Vec<(&str, StepConfig)>,
218        fail_fast: bool,
219    ) -> Result<Vec<ParallelStepResult>, EngineError> {
220        if steps.is_empty() {
221            return Ok(Vec::new());
222        }
223
224        let wave_position = self.position;
225        self.position += 1;
226
227        let now = Utc::now();
228        let mut step_records: Vec<(Uuid, String, StepConfig)> = Vec::with_capacity(steps.len());
229
230        for (name, config) in &steps {
231            let kind = config.kind();
232            let step = self
233                .store
234                .create_step(NewStep {
235                    run_id: self.run_id,
236                    name: name.to_string(),
237                    kind,
238                    position: wave_position,
239                    input: Some(serde_json::to_value(config)?),
240                })
241                .await?;
242
243            self.start_step(step.id, now).await?;
244
245            step_records.push((step.id, name.to_string(), config.clone()));
246        }
247
248        let mut join_set = JoinSet::new();
249        for (idx, (step_id, step_name, config)) in step_records.iter().enumerate() {
250            let provider = self.provider.clone();
251            let config = config.clone();
252            let step_log_sender = self
253                .log_sender
254                .as_ref()
255                .map(|s| StepLogSender::new(s.clone(), self.run_id, *step_id, step_name.clone()));
256            join_set.spawn(async move {
257                (
258                    idx,
259                    execute_step_config(&config, &provider, step_log_sender).await,
260                )
261            });
262        }
263
264        // JoinSet returns in completion order; indexed_results restores input order.
265        let mut indexed_results: Vec<Option<Result<StepOutput, String>>> =
266            vec![None; step_records.len()];
267        let mut first_error: Option<EngineError> = None;
268
269        while let Some(join_result) = join_set.join_next().await {
270            let (idx, step_result) = match join_result {
271                Ok(r) => r,
272                Err(e) => {
273                    if first_error.is_none() {
274                        first_error = Some(EngineError::StepConfig(format!("join error: {e}")));
275                    }
276                    if fail_fast {
277                        join_set.abort_all();
278                    }
279                    continue;
280                }
281            };
282
283            let (step_id, step_name, _) = &step_records[idx];
284            let completed_at = Utc::now();
285
286            match step_result {
287                Ok(output) => {
288                    self.total_cost_usd += output.cost_usd;
289                    self.total_duration_ms += output.duration_ms;
290
291                    let debug_messages_json = output.debug_messages_json();
292
293                    self.store
294                        .update_step(
295                            *step_id,
296                            StepUpdate {
297                                status: Some(StepStatus::Completed),
298                                output: Some(output.output.clone()),
299                                duration_ms: Some(output.duration_ms),
300                                cost_usd: Some(output.cost_usd),
301                                input_tokens: output.input_tokens,
302                                output_tokens: output.output_tokens,
303                                completed_at: Some(completed_at),
304                                debug_messages: debug_messages_json,
305                                ..StepUpdate::default()
306                            },
307                        )
308                        .await?;
309
310                    info!(
311                        run_id = %self.run_id,
312                        step = %step_name,
313                        duration_ms = output.duration_ms,
314                        "parallel step completed"
315                    );
316
317                    indexed_results[idx] = Some(Ok(output));
318                }
319                Err(err) => {
320                    let err_msg = err.to_string();
321                    let debug_messages_json = extract_debug_messages_from_error(&err);
322                    let partial = extract_partial_usage_from_error(&err);
323
324                    if let Some(ref usage) = partial {
325                        if let Some(cost) = usage.cost_usd {
326                            self.total_cost_usd += cost;
327                        }
328                        if let Some(dur) = usage.duration_ms {
329                            self.total_duration_ms += dur;
330                        }
331                    }
332
333                    if let Err(store_err) = self
334                        .store
335                        .update_step(
336                            *step_id,
337                            StepUpdate {
338                                status: Some(StepStatus::Failed),
339                                error: Some(err_msg.clone()),
340                                completed_at: Some(completed_at),
341                                debug_messages: debug_messages_json,
342                                duration_ms: partial.as_ref().and_then(|p| p.duration_ms),
343                                cost_usd: partial.as_ref().and_then(|p| p.cost_usd),
344                                input_tokens: partial.as_ref().and_then(|p| p.input_tokens),
345                                output_tokens: partial.as_ref().and_then(|p| p.output_tokens),
346                                ..StepUpdate::default()
347                            },
348                        )
349                        .await
350                    {
351                        tracing::error!(
352                            step_id = %step_id,
353                            error = %store_err,
354                            "failed to persist parallel step failure"
355                        );
356                    }
357
358                    indexed_results[idx] = Some(Err(err_msg.clone()));
359
360                    if first_error.is_none() {
361                        first_error = Some(err);
362                    }
363
364                    if fail_fast {
365                        join_set.abort_all();
366                    }
367                }
368            }
369        }
370
371        if let Some(err) = first_error {
372            return Err(err);
373        }
374
375        self.last_step_ids = step_records.iter().map(|(id, _, _)| *id).collect();
376
377        // Build results in original order.
378        let results: Vec<ParallelStepResult> = step_records
379            .iter()
380            .enumerate()
381            .map(|(idx, (step_id, name, _))| {
382                let output = match indexed_results[idx].take() {
383                    Some(Ok(o)) => o,
384                    _ => unreachable!("all steps succeeded if no error returned"),
385                };
386                ParallelStepResult {
387                    name: name.clone(),
388                    output,
389                    step_id: *step_id,
390                }
391            })
392            .collect();
393
394        Ok(results)
395    }
396
397    /// Execute a shell step.
398    ///
399    /// Creates the step record, runs the command, persists the result,
400    /// and returns the output for use in subsequent steps.
401    ///
402    /// # Errors
403    ///
404    /// Returns [`EngineError`] if the command fails or the store errors.
405    ///
406    /// # Examples
407    ///
408    /// ```no_run
409    /// use ironflow_engine::context::WorkflowContext;
410    /// use ironflow_engine::config::ShellConfig;
411    /// use ironflow_engine::error::EngineError;
412    ///
413    /// # async fn example(ctx: &mut WorkflowContext) -> Result<(), EngineError> {
414    /// let files = ctx.shell("list", ShellConfig::new("ls -la")).await?;
415    /// println!("stdout: {}", files.output["stdout"]);
416    /// # Ok(())
417    /// # }
418    /// ```
419    pub async fn shell(
420        &mut self,
421        name: &str,
422        config: ShellConfig,
423    ) -> Result<StepOutput, EngineError> {
424        self.execute_step(name, StepKind::Shell, StepConfig::Shell(config))
425            .await
426    }
427
428    /// Execute an HTTP step.
429    ///
430    /// # Errors
431    ///
432    /// Returns [`EngineError`] if the request fails or the store errors.
433    ///
434    /// # Examples
435    ///
436    /// ```no_run
437    /// use ironflow_engine::context::WorkflowContext;
438    /// use ironflow_engine::config::HttpConfig;
439    /// use ironflow_engine::error::EngineError;
440    ///
441    /// # async fn example(ctx: &mut WorkflowContext) -> Result<(), EngineError> {
442    /// let resp = ctx.http("health", HttpConfig::get("https://api.example.com/health")).await?;
443    /// println!("status: {}", resp.output["status"]);
444    /// # Ok(())
445    /// # }
446    /// ```
447    pub async fn http(
448        &mut self,
449        name: &str,
450        config: HttpConfig,
451    ) -> Result<StepOutput, EngineError> {
452        self.execute_step(name, StepKind::Http, StepConfig::Http(config))
453            .await
454    }
455
456    /// Execute an agent step.
457    ///
458    /// # Errors
459    ///
460    /// Returns [`EngineError`] if the agent invocation fails or the store errors.
461    ///
462    /// # Examples
463    ///
464    /// ```no_run
465    /// use ironflow_engine::context::WorkflowContext;
466    /// use ironflow_engine::config::AgentStepConfig;
467    /// use ironflow_engine::error::EngineError;
468    ///
469    /// # async fn example(ctx: &mut WorkflowContext) -> Result<(), EngineError> {
470    /// let review = ctx.agent("review", AgentStepConfig::new("Review the code")).await?;
471    /// println!("review: {}", review.output);
472    /// # Ok(())
473    /// # }
474    /// ```
475    pub async fn agent(
476        &mut self,
477        name: &str,
478        config: impl Into<AgentStepConfig>,
479    ) -> Result<StepOutput, EngineError> {
480        self.execute_step(name, StepKind::Agent, StepConfig::Agent(config.into()))
481            .await
482    }
483
484    /// Create a human approval gate.
485    ///
486    /// On first execution, records an approval step and returns
487    /// [`EngineError::ApprovalRequired`] to suspend the run. The engine
488    /// transitions the run to `AwaitingApproval`.
489    ///
490    /// On resume (after a human approved via the API), the approval step
491    /// is replayed: it is marked as `Completed` and execution continues
492    /// past it. Multiple approval gates in the same handler work -- each
493    /// one pauses and resumes independently.
494    ///
495    /// # Errors
496    ///
497    /// Returns [`EngineError::ApprovalRequired`] to pause the run on
498    /// first execution. Returns other [`EngineError`] variants on store
499    /// failures.
500    ///
501    /// # Examples
502    ///
503    /// ```no_run
504    /// use ironflow_engine::context::WorkflowContext;
505    /// use ironflow_engine::config::ApprovalConfig;
506    /// use ironflow_engine::error::EngineError;
507    ///
508    /// # async fn example(ctx: &mut WorkflowContext) -> Result<(), EngineError> {
509    /// ctx.approval("deploy-gate", ApprovalConfig::new("Approve deployment?")).await?;
510    /// // Execution continues here after approval
511    /// # Ok(())
512    /// # }
513    /// ```
514    pub async fn approval(
515        &mut self,
516        name: &str,
517        config: ApprovalConfig,
518    ) -> Result<(), EngineError> {
519        let position = self.position;
520        self.position += 1;
521
522        // Replay: if this approval step exists from a prior execution,
523        // the run was approved -- mark it completed (if not already) and continue.
524        if let Some(existing) = self.replay_steps.get(&position)
525            && existing.kind == StepKind::Approval
526        {
527            if existing.status.state == StepStatus::AwaitingApproval {
528                self.store
529                    .update_step(
530                        existing.id,
531                        StepUpdate {
532                            status: Some(StepStatus::Completed),
533                            completed_at: Some(Utc::now()),
534                            ..StepUpdate::default()
535                        },
536                    )
537                    .await?;
538            }
539
540            self.last_step_ids = vec![existing.id];
541            info!(
542                run_id = %self.run_id,
543                step = %name,
544                position,
545                "approval step replayed (approved)"
546            );
547            return Ok(());
548        }
549
550        // First execution: create the approval step and suspend.
551        let step = self
552            .store
553            .create_step(NewStep {
554                run_id: self.run_id,
555                name: name.to_string(),
556                kind: StepKind::Approval,
557                position,
558                input: Some(serde_json::to_value(&config)?),
559            })
560            .await?;
561
562        self.start_step(step.id, Utc::now()).await?;
563
564        // Transition the step to AwaitingApproval so it reflects
565        // the suspended state on the dashboard.
566        self.store
567            .update_step(
568                step.id,
569                StepUpdate {
570                    status: Some(StepStatus::AwaitingApproval),
571                    ..StepUpdate::default()
572                },
573            )
574            .await?;
575
576        self.last_step_ids = vec![step.id];
577
578        Err(EngineError::ApprovalRequired {
579            run_id: self.run_id,
580            step_id: step.id,
581            message: config.message().to_string(),
582        })
583    }
584
585    /// Record a step as explicitly skipped.
586    ///
587    /// Use this inside an `if`/`else` branch when a step should not execute
588    /// but must still appear in the DAG and timeline with its reason.
589    ///
590    /// The step is created directly in [`StepStatus::Skipped`] state and the
591    /// reason is stored in the output as `{"reason": "..."}`.
592    ///
593    /// # Errors
594    ///
595    /// Returns [`EngineError`] if the store fails.
596    ///
597    /// # Examples
598    ///
599    /// ```no_run
600    /// use ironflow_engine::context::WorkflowContext;
601    /// use ironflow_engine::error::EngineError;
602    ///
603    /// # async fn example(ctx: &mut WorkflowContext) -> Result<(), EngineError> {
604    /// let tests_passed = false;
605    /// if tests_passed {
606    ///     // ctx.shell("deploy", ...).await?;
607    /// } else {
608    ///     ctx.skip("deploy", "tests failed").await?;
609    /// }
610    /// # Ok(())
611    /// # }
612    /// ```
613    pub async fn skip(&mut self, name: &str, reason: &str) -> Result<(), EngineError> {
614        let position = self.position;
615        self.position += 1;
616
617        let step = self
618            .store
619            .create_step(NewStep {
620                run_id: self.run_id,
621                name: name.to_string(),
622                kind: StepKind::Custom("skip".to_string()),
623                position,
624                input: None,
625            })
626            .await?;
627
628        if !self.last_step_ids.is_empty() {
629            let deps: Vec<NewStepDependency> = self
630                .last_step_ids
631                .iter()
632                .map(|&depends_on| NewStepDependency {
633                    step_id: step.id,
634                    depends_on,
635                })
636                .collect();
637            self.store.create_step_dependencies(deps).await?;
638        }
639
640        let now = Utc::now();
641        self.store
642            .update_step(
643                step.id,
644                StepUpdate {
645                    status: Some(StepStatus::Skipped),
646                    output: Some(serde_json::json!({"reason": reason})),
647                    completed_at: Some(now),
648                    ..StepUpdate::default()
649                },
650            )
651            .await?;
652
653        self.last_step_ids = vec![step.id];
654
655        info!(
656            run_id = %self.run_id,
657            step = %name,
658            reason,
659            "step skipped"
660        );
661
662        Ok(())
663    }
664
665    /// Execute a custom operation step.
666    ///
667    /// Runs a user-defined [`Operation`] with full step lifecycle management:
668    /// creates the step record, transitions to Running, executes the operation,
669    /// persists the output and duration, and marks the step Completed or Failed.
670    ///
671    /// The operation's [`kind()`](Operation::kind) is stored as
672    /// [`StepKind::Custom`].
673    ///
674    /// # Errors
675    ///
676    /// Returns [`EngineError`] if the operation fails or the store errors.
677    ///
678    /// # Examples
679    ///
680    /// ```no_run
681    /// use ironflow_engine::context::WorkflowContext;
682    /// use ironflow_engine::operation::Operation;
683    /// use ironflow_engine::error::EngineError;
684    /// use serde_json::{Value, json};
685    /// use std::pin::Pin;
686    /// use std::future::Future;
687    ///
688    /// struct MyOp;
689    /// impl Operation for MyOp {
690    ///     fn kind(&self) -> &str { "my-service" }
691    ///     fn execute(&self) -> Pin<Box<dyn Future<Output = Result<Value, EngineError>> + Send + '_>> {
692    ///         Box::pin(async { Ok(json!({"ok": true})) })
693    ///     }
694    /// }
695    ///
696    /// # async fn example(ctx: &mut WorkflowContext) -> Result<(), EngineError> {
697    /// let result = ctx.operation("call-service", &MyOp).await?;
698    /// println!("output: {}", result.output);
699    /// # Ok(())
700    /// # }
701    /// ```
702    pub async fn operation(
703        &mut self,
704        name: &str,
705        op: &dyn Operation,
706    ) -> Result<StepOutput, EngineError> {
707        let kind = StepKind::Custom(op.kind().to_string());
708        let position = self.position;
709        self.position += 1;
710
711        let step = self
712            .store
713            .create_step(NewStep {
714                run_id: self.run_id,
715                name: name.to_string(),
716                kind,
717                position,
718                input: op.input(),
719            })
720            .await?;
721
722        self.start_step(step.id, Utc::now()).await?;
723
724        let start = Instant::now();
725
726        match op.execute().await {
727            Ok(output_value) => {
728                let duration_ms = start.elapsed().as_millis() as u64;
729                self.total_duration_ms += duration_ms;
730
731                let completed_at = Utc::now();
732                self.store
733                    .update_step(
734                        step.id,
735                        StepUpdate {
736                            status: Some(StepStatus::Completed),
737                            output: Some(output_value.clone()),
738                            duration_ms: Some(duration_ms),
739                            cost_usd: Some(Decimal::ZERO),
740                            completed_at: Some(completed_at),
741                            ..StepUpdate::default()
742                        },
743                    )
744                    .await?;
745
746                info!(
747                    run_id = %self.run_id,
748                    step = %name,
749                    kind = op.kind(),
750                    duration_ms,
751                    "operation step completed"
752                );
753
754                self.last_step_ids = vec![step.id];
755
756                Ok(StepOutput {
757                    output: output_value,
758                    duration_ms,
759                    cost_usd: Decimal::ZERO,
760                    input_tokens: None,
761                    output_tokens: None,
762                    model: None,
763                    debug_messages: None,
764                })
765            }
766            Err(err) => {
767                let completed_at = Utc::now();
768                if let Err(store_err) = self
769                    .store
770                    .update_step(
771                        step.id,
772                        StepUpdate {
773                            status: Some(StepStatus::Failed),
774                            error: Some(err.to_string()),
775                            completed_at: Some(completed_at),
776                            ..StepUpdate::default()
777                        },
778                    )
779                    .await
780                {
781                    error!(step_id = %step.id, error = %store_err, "failed to persist step failure");
782                }
783
784                Err(err)
785            }
786        }
787    }
788
789    /// Execute a sub-workflow step.
790    ///
791    /// Creates a child run for the named workflow handler, executes it with
792    /// its own steps and lifecycle, and returns a [`StepOutput`] containing
793    /// the child run ID and aggregated metrics.
794    ///
795    /// Requires the context to be created with
796    /// `with_handler_resolver`.
797    ///
798    /// # Errors
799    ///
800    /// Returns [`EngineError::InvalidWorkflow`] if no handler is registered
801    /// with the given name, or if no handler resolver is available.
802    ///
803    /// # Examples
804    ///
805    /// ```no_run
806    /// use ironflow_engine::context::WorkflowContext;
807    /// use ironflow_engine::error::EngineError;
808    /// use serde_json::json;
809    ///
810    /// # async fn example(ctx: &mut WorkflowContext) -> Result<(), EngineError> {
811    /// // let result = ctx.workflow(&MySubWorkflow, json!({})).await?;
812    /// # Ok(())
813    /// # }
814    /// ```
815    pub async fn workflow(
816        &mut self,
817        handler: &dyn WorkflowHandler,
818        payload: Value,
819    ) -> Result<StepOutput, EngineError> {
820        let config = WorkflowStepConfig::new(handler.name(), payload);
821        let position = self.position;
822        self.position += 1;
823
824        let step = self
825            .store
826            .create_step(NewStep {
827                run_id: self.run_id,
828                name: config.workflow_name.clone(),
829                kind: StepKind::Workflow,
830                position,
831                input: Some(serde_json::to_value(&config)?),
832            })
833            .await?;
834
835        self.start_step(step.id, Utc::now()).await?;
836
837        match self.execute_child_workflow(&config).await {
838            Ok(output) => {
839                self.total_cost_usd += output.cost_usd;
840                self.total_duration_ms += output.duration_ms;
841
842                let completed_at = Utc::now();
843                self.store
844                    .update_step(
845                        step.id,
846                        StepUpdate {
847                            status: Some(StepStatus::Completed),
848                            output: Some(output.output.clone()),
849                            duration_ms: Some(output.duration_ms),
850                            cost_usd: Some(output.cost_usd),
851                            completed_at: Some(completed_at),
852                            ..StepUpdate::default()
853                        },
854                    )
855                    .await?;
856
857                info!(
858                    run_id = %self.run_id,
859                    child_workflow = %config.workflow_name,
860                    duration_ms = output.duration_ms,
861                    "workflow step completed"
862                );
863
864                self.last_step_ids = vec![step.id];
865
866                Ok(output)
867            }
868            Err(err) => {
869                let completed_at = Utc::now();
870                if let Err(store_err) = self
871                    .store
872                    .update_step(
873                        step.id,
874                        StepUpdate {
875                            status: Some(StepStatus::Failed),
876                            error: Some(err.to_string()),
877                            completed_at: Some(completed_at),
878                            ..StepUpdate::default()
879                        },
880                    )
881                    .await
882                {
883                    error!(step_id = %step.id, error = %store_err, "failed to persist step failure");
884                }
885
886                Err(err)
887            }
888        }
889    }
890
891    /// Execute a child workflow and return aggregated output.
892    async fn execute_child_workflow(
893        &self,
894        config: &WorkflowStepConfig,
895    ) -> Result<StepOutput, EngineError> {
896        let resolver = self.handler_resolver.as_ref().ok_or_else(|| {
897            EngineError::InvalidWorkflow(
898                "sub-workflow requires a handler resolver (use Engine to execute)".to_string(),
899            )
900        })?;
901
902        let handler = resolver(&config.workflow_name).ok_or_else(|| {
903            EngineError::InvalidWorkflow(format!("no handler registered: {}", config.workflow_name))
904        })?;
905
906        let parent_labels = self
907            .store
908            .get_run(self.run_id)
909            .await?
910            .map(|r| r.labels)
911            .unwrap_or_default();
912
913        let child_run = self
914            .store
915            .create_run(NewRun {
916                workflow_name: config.workflow_name.clone(),
917                trigger: TriggerKind::Workflow,
918                payload: config.payload.clone(),
919                max_retries: 0,
920                handler_version: None,
921                labels: parent_labels,
922                scheduled_at: None,
923            })
924            .await?;
925
926        let child_run_id = child_run.id;
927        info!(
928            parent_run_id = %self.run_id,
929            child_run_id = %child_run_id,
930            workflow = %config.workflow_name,
931            "child run created"
932        );
933
934        self.store
935            .update_run_status(child_run_id, RunStatus::Running)
936            .await?;
937
938        let run_start = Instant::now();
939        let mut child_ctx = WorkflowContext {
940            run_id: child_run_id,
941            store: self.store.clone(),
942            provider: self.provider.clone(),
943            handler_resolver: self.handler_resolver.clone(),
944            position: 0,
945            last_step_ids: Vec::new(),
946            total_cost_usd: Decimal::ZERO,
947            total_duration_ms: 0,
948            replay_steps: HashMap::new(),
949            log_sender: self.log_sender.clone(),
950        };
951
952        let result = handler.execute(&mut child_ctx).await;
953        let total_duration = run_start.elapsed().as_millis() as u64;
954        let completed_at = Utc::now();
955
956        match result {
957            Ok(()) => {
958                self.store
959                    .update_run(
960                        child_run_id,
961                        RunUpdate {
962                            status: Some(RunStatus::Completed),
963                            cost_usd: Some(child_ctx.total_cost_usd),
964                            duration_ms: Some(total_duration),
965                            completed_at: Some(completed_at),
966                            ..RunUpdate::default()
967                        },
968                    )
969                    .await?;
970
971                Ok(StepOutput {
972                    output: serde_json::json!({
973                        "run_id": child_run_id,
974                        "workflow_name": config.workflow_name,
975                        "status": RunStatus::Completed,
976                        "cost_usd": child_ctx.total_cost_usd,
977                        "duration_ms": total_duration,
978                    }),
979                    duration_ms: total_duration,
980                    cost_usd: child_ctx.total_cost_usd,
981                    input_tokens: None,
982                    output_tokens: None,
983                    model: None,
984                    debug_messages: None,
985                })
986            }
987            Err(err) => {
988                if let Err(store_err) = self
989                    .store
990                    .update_run(
991                        child_run_id,
992                        RunUpdate {
993                            status: Some(RunStatus::Failed),
994                            error: Some(err.to_string()),
995                            cost_usd: Some(child_ctx.total_cost_usd),
996                            duration_ms: Some(total_duration),
997                            completed_at: Some(completed_at),
998                            ..RunUpdate::default()
999                        },
1000                    )
1001                    .await
1002                {
1003                    error!(
1004                        child_run_id = %child_run_id,
1005                        store_error = %store_err,
1006                        "failed to persist child run failure"
1007                    );
1008                }
1009
1010                Err(err)
1011            }
1012        }
1013    }
1014
1015    /// Try to replay a completed step from a previous execution.
1016    ///
1017    /// Returns `Some(StepOutput)` if a completed step exists at the given
1018    /// position, `None` otherwise.
1019    fn try_replay_step(&mut self, position: u32) -> Option<StepOutput> {
1020        let step = self.replay_steps.get(&position)?;
1021        if step.status.state != StepStatus::Completed {
1022            return None;
1023        }
1024        let output = StepOutput {
1025            output: step.output.clone().unwrap_or(Value::Null),
1026            duration_ms: step.duration_ms,
1027            cost_usd: step.cost_usd,
1028            input_tokens: step.input_tokens,
1029            output_tokens: step.output_tokens,
1030            model: None,
1031            debug_messages: None,
1032        };
1033        self.total_cost_usd += output.cost_usd;
1034        self.total_duration_ms += output.duration_ms;
1035        self.last_step_ids = vec![step.id];
1036        info!(
1037            run_id = %self.run_id,
1038            step = %step.name,
1039            position,
1040            "step replayed from previous execution"
1041        );
1042        Some(output)
1043    }
1044
1045    /// Internal: execute a step with full persistence lifecycle.
1046    async fn execute_step(
1047        &mut self,
1048        name: &str,
1049        kind: StepKind,
1050        config: StepConfig,
1051    ) -> Result<StepOutput, EngineError> {
1052        let position = self.position;
1053        self.position += 1;
1054
1055        // Replay: if this step already completed in a prior execution, return cached output.
1056        if let Some(output) = self.try_replay_step(position) {
1057            return Ok(output);
1058        }
1059
1060        // Create step record in Pending.
1061        let step = self
1062            .store
1063            .create_step(NewStep {
1064                run_id: self.run_id,
1065                name: name.to_string(),
1066                kind,
1067                position,
1068                input: Some(serde_json::to_value(&config)?),
1069            })
1070            .await?;
1071
1072        self.start_step(step.id, Utc::now()).await?;
1073
1074        let step_log_sender = self
1075            .log_sender
1076            .as_ref()
1077            .map(|s| StepLogSender::new(s.clone(), self.run_id, step.id, name.to_string()));
1078
1079        match execute_step_config(&config, &self.provider, step_log_sender).await {
1080            Ok(output) => {
1081                self.total_cost_usd += output.cost_usd;
1082                self.total_duration_ms += output.duration_ms;
1083
1084                let debug_messages_json = output.debug_messages_json();
1085
1086                let completed_at = Utc::now();
1087                self.store
1088                    .update_step(
1089                        step.id,
1090                        StepUpdate {
1091                            status: Some(StepStatus::Completed),
1092                            output: Some(output.output.clone()),
1093                            duration_ms: Some(output.duration_ms),
1094                            cost_usd: Some(output.cost_usd),
1095                            input_tokens: output.input_tokens,
1096                            output_tokens: output.output_tokens,
1097                            completed_at: Some(completed_at),
1098                            debug_messages: debug_messages_json,
1099                            ..StepUpdate::default()
1100                        },
1101                    )
1102                    .await?;
1103
1104                info!(
1105                    run_id = %self.run_id,
1106                    step = %name,
1107                    duration_ms = output.duration_ms,
1108                    "step completed"
1109                );
1110
1111                self.last_step_ids = vec![step.id];
1112
1113                Ok(output)
1114            }
1115            Err(err) => {
1116                let completed_at = Utc::now();
1117                let debug_messages_json = extract_debug_messages_from_error(&err);
1118                let partial = extract_partial_usage_from_error(&err);
1119
1120                if let Some(ref usage) = partial {
1121                    if let Some(cost) = usage.cost_usd {
1122                        self.total_cost_usd += cost;
1123                    }
1124                    if let Some(dur) = usage.duration_ms {
1125                        self.total_duration_ms += dur;
1126                    }
1127                }
1128
1129                if let Err(store_err) = self
1130                    .store
1131                    .update_step(
1132                        step.id,
1133                        StepUpdate {
1134                            status: Some(StepStatus::Failed),
1135                            error: Some(err.to_string()),
1136                            completed_at: Some(completed_at),
1137                            debug_messages: debug_messages_json,
1138                            duration_ms: partial.as_ref().and_then(|p| p.duration_ms),
1139                            cost_usd: partial.as_ref().and_then(|p| p.cost_usd),
1140                            input_tokens: partial.as_ref().and_then(|p| p.input_tokens),
1141                            output_tokens: partial.as_ref().and_then(|p| p.output_tokens),
1142                            ..StepUpdate::default()
1143                        },
1144                    )
1145                    .await
1146                {
1147                    tracing::error!(step_id = %step.id, error = %store_err, "failed to persist step failure");
1148                }
1149
1150                Err(err)
1151            }
1152        }
1153    }
1154
1155    /// Record dependency edges and transition a step to Running.
1156    ///
1157    /// Records edges from `step_id` to all `last_step_ids`, then
1158    /// transitions the step to `Running` with the given timestamp.
1159    async fn start_step(&self, step_id: Uuid, now: DateTime<Utc>) -> Result<(), EngineError> {
1160        if !self.last_step_ids.is_empty() {
1161            let deps: Vec<NewStepDependency> = self
1162                .last_step_ids
1163                .iter()
1164                .map(|&depends_on| NewStepDependency {
1165                    step_id,
1166                    depends_on,
1167                })
1168                .collect();
1169            self.store.create_step_dependencies(deps).await?;
1170        }
1171
1172        self.store
1173            .update_step(
1174                step_id,
1175                StepUpdate {
1176                    status: Some(StepStatus::Running),
1177                    started_at: Some(now),
1178                    ..StepUpdate::default()
1179                },
1180            )
1181            .await?;
1182
1183        Ok(())
1184    }
1185
1186    /// Access the store directly (advanced usage).
1187    pub fn store(&self) -> &Arc<dyn Store> {
1188        &self.store
1189    }
1190
1191    /// Access the payload that triggered this run.
1192    ///
1193    /// Fetches the run from the store and returns its payload.
1194    ///
1195    /// # Errors
1196    ///
1197    /// Returns [`EngineError::Store`] if the run is not found.
1198    pub async fn payload(&self) -> Result<Value, EngineError> {
1199        let run = self
1200            .store
1201            .get_run(self.run_id)
1202            .await?
1203            .ok_or(EngineError::Store(
1204                ironflow_store::error::StoreError::RunNotFound(self.run_id),
1205            ))?;
1206        Ok(run.payload)
1207    }
1208}
1209
1210impl fmt::Debug for WorkflowContext {
1211    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1212        f.debug_struct("WorkflowContext")
1213            .field("run_id", &self.run_id)
1214            .field("position", &self.position)
1215            .field("total_cost_usd", &self.total_cost_usd)
1216            .finish_non_exhaustive()
1217    }
1218}
1219
1220/// Extract debug messages from an engine error, if it wraps a schema validation
1221/// failure that carries a verbose conversation trace.
1222fn extract_debug_messages_from_error(err: &EngineError) -> Option<Value> {
1223    if let EngineError::Operation(OperationError::Agent(AgentError::SchemaValidation {
1224        debug_messages,
1225        ..
1226    })) = err
1227        && !debug_messages.is_empty()
1228    {
1229        return serde_json::to_value(debug_messages).ok();
1230    }
1231    None
1232}
1233
1234/// Partial usage with `Decimal` cost, converted from the `f64` in [`PartialUsage`].
1235///
1236/// Exists only because `ironflow-store` uses [`Decimal`] for monetary values
1237/// while `ironflow-core` uses `f64` (the CLI's native type). The conversion
1238/// happens here, at the engine/store boundary.
1239struct StepPartialUsage {
1240    cost_usd: Option<Decimal>,
1241    duration_ms: Option<u64>,
1242    input_tokens: Option<u64>,
1243    output_tokens: Option<u64>,
1244}
1245
1246fn extract_partial_usage_from_error(err: &EngineError) -> Option<StepPartialUsage> {
1247    if let EngineError::Operation(OperationError::Agent(AgentError::SchemaValidation {
1248        partial_usage,
1249        ..
1250    })) = err
1251        && (partial_usage.cost_usd.is_some() || partial_usage.duration_ms.is_some())
1252    {
1253        return Some(StepPartialUsage {
1254            cost_usd: partial_usage
1255                .cost_usd
1256                .and_then(|c| Decimal::try_from(c).ok()),
1257            duration_ms: partial_usage.duration_ms,
1258            input_tokens: partial_usage.input_tokens,
1259            output_tokens: partial_usage.output_tokens,
1260        });
1261    }
1262    None
1263}