Skip to main content

ironflow_engine/
context.rs

1//! [`WorkflowContext`] — execution context for dynamic workflows.
2//!
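//! Provides step execution methods that automatically persist results to the
//! store. Each call to [`shell`](WorkflowContext::shell),
//! [`http`](WorkflowContext::http), [`agent`](WorkflowContext::agent),
//! [`operation`](WorkflowContext::operation), or
//! [`workflow`](WorkflowContext::workflow) creates a step record, executes the
//! operation, captures the output, and returns a [`StepOutput`] that the next
//! step can reference. [`parallel`](WorkflowContext::parallel) does the same for
//! a batch of steps run concurrently, while
//! [`approval`](WorkflowContext::approval) and [`skip`](WorkflowContext::skip)
//! record approval gates and explicitly skipped steps.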
9//!
10//! # Examples
11//!
12//! ```no_run
13//! use ironflow_engine::context::WorkflowContext;
14//! use ironflow_engine::config::{ShellConfig, AgentStepConfig};
15//! use ironflow_engine::error::EngineError;
16//!
17//! # async fn example(ctx: &mut WorkflowContext) -> Result<(), EngineError> {
18//! let build = ctx.shell("build", ShellConfig::new("cargo build")).await?;
19//! let review = ctx.agent("review", AgentStepConfig::new(
20//!     &format!("Build output:\n{}", build.output["stdout"])
21//! )).await?;
22//! # Ok(())
23//! # }
24//! ```
25
26use std::collections::HashMap;
27use std::fmt;
28use std::sync::Arc;
29use std::time::Instant;
30
31use chrono::{DateTime, Utc};
32use rust_decimal::Decimal;
33use serde_json::Value;
34use tokio::task::JoinSet;
35use tracing::{error, info};
36use uuid::Uuid;
37
38use ironflow_core::error::{AgentError, OperationError};
39use ironflow_core::provider::AgentProvider;
40use ironflow_store::models::{
41    NewRun, NewStep, NewStepDependency, RunStatus, RunUpdate, Step, StepKind, StepStatus,
42    StepUpdate, TriggerKind,
43};
44use ironflow_store::store::Store;
45
46use crate::config::{
47    AgentStepConfig, ApprovalConfig, HttpConfig, ShellConfig, StepConfig, WorkflowStepConfig,
48};
49use crate::error::EngineError;
50use crate::executor::{ParallelStepResult, StepOutput, execute_step_config};
51use crate::handler::WorkflowHandler;
52use crate::log_sender::{LogSender, StepLogSender};
53use crate::operation::Operation;
54
55/// Callback type for resolving workflow handlers by name.
56pub(crate) type HandlerResolver =
57    Arc<dyn Fn(&str) -> Option<Arc<dyn WorkflowHandler>> + Send + Sync>;
58
59/// Execution context for a single workflow run.
60///
61/// Tracks the current step position and provides convenience methods
62/// for executing operations with automatic persistence.
63///
64/// # Examples
65///
66/// ```no_run
67/// use ironflow_engine::context::WorkflowContext;
68/// use ironflow_engine::config::ShellConfig;
69/// use ironflow_engine::error::EngineError;
70///
71/// # async fn example(ctx: &mut WorkflowContext) -> Result<(), EngineError> {
72/// let result = ctx.shell("greet", ShellConfig::new("echo hello")).await?;
73/// assert!(result.output["stdout"].as_str().unwrap().contains("hello"));
74/// # Ok(())
75/// # }
76/// ```
77pub struct WorkflowContext {
78    run_id: Uuid,
79    store: Arc<dyn Store>,
80    provider: Arc<dyn AgentProvider>,
81    handler_resolver: Option<HandlerResolver>,
82    position: u32,
83    /// IDs of the last executed step(s) -- used to record DAG dependencies.
84    last_step_ids: Vec<Uuid>,
85    /// Accumulated cost across all steps in this run.
86    total_cost_usd: Decimal,
87    /// Accumulated duration across all steps.
88    total_duration_ms: u64,
89    /// Steps from a previous execution, keyed by position.
90    /// Used when resuming after approval to replay completed steps.
91    replay_steps: HashMap<u32, Step>,
92    /// Optional sender for real-time log streaming.
93    log_sender: Option<LogSender>,
94}
95
96impl WorkflowContext {
97    /// Create a new context for a run.
98    ///
99    /// Not typically called directly — the [`Engine`](crate::engine::Engine)
100    /// creates this when executing a [`WorkflowHandler`].
101    pub fn new(run_id: Uuid, store: Arc<dyn Store>, provider: Arc<dyn AgentProvider>) -> Self {
102        Self {
103            run_id,
104            store,
105            provider,
106            handler_resolver: None,
107            position: 0,
108            last_step_ids: Vec::new(),
109            total_cost_usd: Decimal::ZERO,
110            total_duration_ms: 0,
111            replay_steps: HashMap::new(),
112            log_sender: None,
113        }
114    }
115
116    /// Create a new context with a handler resolver for sub-workflow support.
117    ///
118    /// The resolver is called when [`workflow`](Self::workflow) is invoked to
119    /// look up registered handlers by name.
120    pub(crate) fn with_handler_resolver(
121        run_id: Uuid,
122        store: Arc<dyn Store>,
123        provider: Arc<dyn AgentProvider>,
124        resolver: HandlerResolver,
125    ) -> Self {
126        Self {
127            run_id,
128            store,
129            provider,
130            handler_resolver: Some(resolver),
131            position: 0,
132            last_step_ids: Vec::new(),
133            total_cost_usd: Decimal::ZERO,
134            total_duration_ms: 0,
135            replay_steps: HashMap::new(),
136            log_sender: None,
137        }
138    }
139
140    /// Attach a log sender for real-time step output streaming.
141    pub fn set_log_sender(&mut self, sender: LogSender) {
142        self.log_sender = Some(sender);
143    }
144
145    /// Load existing steps from the store for replay after approval.
146    ///
147    /// Called by the engine when resuming a run. All completed steps
148    /// and the approved approval step are indexed by position so that
149    /// `execute_step` and `approval` can skip them.
150    pub(crate) async fn load_replay_steps(&mut self) -> Result<(), EngineError> {
151        let steps = self.store.list_steps(self.run_id).await?;
152        for step in steps {
153            let dominated = matches!(
154                step.status.state,
155                StepStatus::Completed | StepStatus::Running | StepStatus::AwaitingApproval
156            );
157            if dominated {
158                self.replay_steps.insert(step.position, step);
159            }
160        }
161        Ok(())
162    }
163
164    /// The run ID this context is executing for.
165    pub fn run_id(&self) -> Uuid {
166        self.run_id
167    }
168
169    /// Accumulated cost across all executed steps so far.
170    pub fn total_cost_usd(&self) -> Decimal {
171        self.total_cost_usd
172    }
173
174    /// Accumulated duration across all executed steps so far.
175    pub fn total_duration_ms(&self) -> u64 {
176        self.total_duration_ms
177    }
178
179    /// Execute multiple steps concurrently (wait-all model).
180    ///
181    /// All steps in the batch execute in parallel via `tokio::JoinSet`.
182    /// Each step is recorded with the same `position` (execution wave).
183    /// Dependencies on previous steps are recorded automatically.
184    ///
185    /// When `fail_fast` is true, remaining steps are aborted on the first
186    /// failure. When false, all steps run to completion and the first
187    /// error is returned.
188    ///
189    /// # Errors
190    ///
191    /// Returns [`EngineError`] if any step fails.
192    ///
193    /// # Examples
194    ///
195    /// ```no_run
196    /// use ironflow_engine::context::WorkflowContext;
197    /// use ironflow_engine::config::{StepConfig, ShellConfig};
198    /// use ironflow_engine::error::EngineError;
199    ///
200    /// # async fn example(ctx: &mut WorkflowContext) -> Result<(), EngineError> {
201    /// let results = ctx.parallel(
202    ///     vec![
203    ///         ("test-unit", StepConfig::Shell(ShellConfig::new("cargo test --lib"))),
204    ///         ("lint", StepConfig::Shell(ShellConfig::new("cargo clippy"))),
205    ///     ],
206    ///     true,
207    /// ).await?;
208    ///
209    /// for r in &results {
210    ///     println!("{}: {:?}", r.name, r.output.output);
211    /// }
212    /// # Ok(())
213    /// # }
214    /// ```
215    pub async fn parallel(
216        &mut self,
217        steps: Vec<(&str, StepConfig)>,
218        fail_fast: bool,
219    ) -> Result<Vec<ParallelStepResult>, EngineError> {
220        if steps.is_empty() {
221            return Ok(Vec::new());
222        }
223
224        let wave_position = self.position;
225        self.position += 1;
226
227        let now = Utc::now();
228        let mut step_records: Vec<(Uuid, String, StepConfig)> = Vec::with_capacity(steps.len());
229
230        for (name, config) in &steps {
231            let kind = config.kind();
232            let step = self
233                .store
234                .create_step(NewStep {
235                    run_id: self.run_id,
236                    name: name.to_string(),
237                    kind,
238                    position: wave_position,
239                    input: Some(serde_json::to_value(config)?),
240                })
241                .await?;
242
243            self.start_step(step.id, now).await?;
244
245            step_records.push((step.id, name.to_string(), config.clone()));
246        }
247
248        let mut join_set = JoinSet::new();
249        for (idx, (step_id, step_name, config)) in step_records.iter().enumerate() {
250            let provider = self.provider.clone();
251            let config = config.clone();
252            let step_log_sender = self
253                .log_sender
254                .as_ref()
255                .map(|s| StepLogSender::new(s.clone(), self.run_id, *step_id, step_name.clone()));
256            join_set.spawn(async move {
257                (
258                    idx,
259                    execute_step_config(&config, &provider, step_log_sender).await,
260                )
261            });
262        }
263
264        // JoinSet returns in completion order; indexed_results restores input order.
265        let mut indexed_results: Vec<Option<Result<StepOutput, String>>> =
266            vec![None; step_records.len()];
267        let mut first_error: Option<EngineError> = None;
268
269        while let Some(join_result) = join_set.join_next().await {
270            let (idx, step_result) = match join_result {
271                Ok(r) => r,
272                Err(e) => {
273                    if first_error.is_none() {
274                        first_error = Some(EngineError::StepConfig(format!("join error: {e}")));
275                    }
276                    if fail_fast {
277                        join_set.abort_all();
278                    }
279                    continue;
280                }
281            };
282
283            let (step_id, step_name, _) = &step_records[idx];
284            let completed_at = Utc::now();
285
286            match step_result {
287                Ok(output) => {
288                    self.total_cost_usd += output.cost_usd;
289                    self.total_duration_ms += output.duration_ms;
290
291                    let debug_messages_json = output.debug_messages_json();
292
293                    self.store
294                        .update_step(
295                            *step_id,
296                            StepUpdate {
297                                status: Some(StepStatus::Completed),
298                                output: Some(output.output.clone()),
299                                duration_ms: Some(output.duration_ms),
300                                cost_usd: Some(output.cost_usd),
301                                input_tokens: output.input_tokens,
302                                output_tokens: output.output_tokens,
303                                completed_at: Some(completed_at),
304                                debug_messages: debug_messages_json,
305                                ..StepUpdate::default()
306                            },
307                        )
308                        .await?;
309
310                    info!(
311                        run_id = %self.run_id,
312                        step = %step_name,
313                        duration_ms = output.duration_ms,
314                        "parallel step completed"
315                    );
316
317                    indexed_results[idx] = Some(Ok(output));
318                }
319                Err(err) => {
320                    let err_msg = err.to_string();
321                    let debug_messages_json = extract_debug_messages_from_error(&err);
322                    let partial = extract_partial_usage_from_error(&err);
323
324                    if let Some(ref usage) = partial {
325                        if let Some(cost) = usage.cost_usd {
326                            self.total_cost_usd += cost;
327                        }
328                        if let Some(dur) = usage.duration_ms {
329                            self.total_duration_ms += dur;
330                        }
331                    }
332
333                    if let Err(store_err) = self
334                        .store
335                        .update_step(
336                            *step_id,
337                            StepUpdate {
338                                status: Some(StepStatus::Failed),
339                                error: Some(err_msg.clone()),
340                                completed_at: Some(completed_at),
341                                debug_messages: debug_messages_json,
342                                duration_ms: partial.as_ref().and_then(|p| p.duration_ms),
343                                cost_usd: partial.as_ref().and_then(|p| p.cost_usd),
344                                input_tokens: partial.as_ref().and_then(|p| p.input_tokens),
345                                output_tokens: partial.as_ref().and_then(|p| p.output_tokens),
346                                ..StepUpdate::default()
347                            },
348                        )
349                        .await
350                    {
351                        tracing::error!(
352                            step_id = %step_id,
353                            error = %store_err,
354                            "failed to persist parallel step failure"
355                        );
356                    }
357
358                    indexed_results[idx] = Some(Err(err_msg.clone()));
359
360                    if first_error.is_none() {
361                        first_error = Some(err);
362                    }
363
364                    if fail_fast {
365                        join_set.abort_all();
366                    }
367                }
368            }
369        }
370
371        if let Some(err) = first_error {
372            return Err(err);
373        }
374
375        self.last_step_ids = step_records.iter().map(|(id, _, _)| *id).collect();
376
377        // Build results in original order.
378        let results: Vec<ParallelStepResult> = step_records
379            .iter()
380            .enumerate()
381            .map(|(idx, (step_id, name, _))| {
382                let output = match indexed_results[idx].take() {
383                    Some(Ok(o)) => o,
384                    _ => unreachable!("all steps succeeded if no error returned"),
385                };
386                ParallelStepResult {
387                    name: name.clone(),
388                    output,
389                    step_id: *step_id,
390                }
391            })
392            .collect();
393
394        Ok(results)
395    }
396
397    /// Execute a shell step.
398    ///
399    /// Creates the step record, runs the command, persists the result,
400    /// and returns the output for use in subsequent steps.
401    ///
402    /// # Errors
403    ///
404    /// Returns [`EngineError`] if the command fails or the store errors.
405    ///
406    /// # Examples
407    ///
408    /// ```no_run
409    /// use ironflow_engine::context::WorkflowContext;
410    /// use ironflow_engine::config::ShellConfig;
411    /// use ironflow_engine::error::EngineError;
412    ///
413    /// # async fn example(ctx: &mut WorkflowContext) -> Result<(), EngineError> {
414    /// let files = ctx.shell("list", ShellConfig::new("ls -la")).await?;
415    /// println!("stdout: {}", files.output["stdout"]);
416    /// # Ok(())
417    /// # }
418    /// ```
419    pub async fn shell(
420        &mut self,
421        name: &str,
422        config: ShellConfig,
423    ) -> Result<StepOutput, EngineError> {
424        self.execute_step(name, StepKind::Shell, StepConfig::Shell(config))
425            .await
426    }
427
428    /// Execute an HTTP step.
429    ///
430    /// # Errors
431    ///
432    /// Returns [`EngineError`] if the request fails or the store errors.
433    ///
434    /// # Examples
435    ///
436    /// ```no_run
437    /// use ironflow_engine::context::WorkflowContext;
438    /// use ironflow_engine::config::HttpConfig;
439    /// use ironflow_engine::error::EngineError;
440    ///
441    /// # async fn example(ctx: &mut WorkflowContext) -> Result<(), EngineError> {
442    /// let resp = ctx.http("health", HttpConfig::get("https://api.example.com/health")).await?;
443    /// println!("status: {}", resp.output["status"]);
444    /// # Ok(())
445    /// # }
446    /// ```
447    pub async fn http(
448        &mut self,
449        name: &str,
450        config: HttpConfig,
451    ) -> Result<StepOutput, EngineError> {
452        self.execute_step(name, StepKind::Http, StepConfig::Http(config))
453            .await
454    }
455
456    /// Execute an agent step.
457    ///
458    /// # Errors
459    ///
460    /// Returns [`EngineError`] if the agent invocation fails or the store errors.
461    ///
462    /// # Examples
463    ///
464    /// ```no_run
465    /// use ironflow_engine::context::WorkflowContext;
466    /// use ironflow_engine::config::AgentStepConfig;
467    /// use ironflow_engine::error::EngineError;
468    ///
469    /// # async fn example(ctx: &mut WorkflowContext) -> Result<(), EngineError> {
470    /// let review = ctx.agent("review", AgentStepConfig::new("Review the code")).await?;
471    /// println!("review: {}", review.output["value"]);
472    /// # Ok(())
473    /// # }
474    /// ```
475    pub async fn agent(
476        &mut self,
477        name: &str,
478        config: impl Into<AgentStepConfig>,
479    ) -> Result<StepOutput, EngineError> {
480        self.execute_step(name, StepKind::Agent, StepConfig::Agent(config.into()))
481            .await
482    }
483
484    /// Create a human approval gate.
485    ///
486    /// On first execution, records an approval step and returns
487    /// [`EngineError::ApprovalRequired`] to suspend the run. The engine
488    /// transitions the run to `AwaitingApproval`.
489    ///
490    /// On resume (after a human approved via the API), the approval step
491    /// is replayed: it is marked as `Completed` and execution continues
492    /// past it. Multiple approval gates in the same handler work -- each
493    /// one pauses and resumes independently.
494    ///
495    /// # Errors
496    ///
497    /// Returns [`EngineError::ApprovalRequired`] to pause the run on
498    /// first execution. Returns other [`EngineError`] variants on store
499    /// failures.
500    ///
501    /// # Examples
502    ///
503    /// ```no_run
504    /// use ironflow_engine::context::WorkflowContext;
505    /// use ironflow_engine::config::ApprovalConfig;
506    /// use ironflow_engine::error::EngineError;
507    ///
508    /// # async fn example(ctx: &mut WorkflowContext) -> Result<(), EngineError> {
509    /// ctx.approval("deploy-gate", ApprovalConfig::new("Approve deployment?")).await?;
510    /// // Execution continues here after approval
511    /// # Ok(())
512    /// # }
513    /// ```
514    pub async fn approval(
515        &mut self,
516        name: &str,
517        config: ApprovalConfig,
518    ) -> Result<(), EngineError> {
519        let position = self.position;
520        self.position += 1;
521
522        // Replay: if this approval step exists from a prior execution,
523        // the run was approved -- mark it completed (if not already) and continue.
524        if let Some(existing) = self.replay_steps.get(&position)
525            && existing.kind == StepKind::Approval
526        {
527            if existing.status.state == StepStatus::AwaitingApproval {
528                self.store
529                    .update_step(
530                        existing.id,
531                        StepUpdate {
532                            status: Some(StepStatus::Completed),
533                            completed_at: Some(Utc::now()),
534                            ..StepUpdate::default()
535                        },
536                    )
537                    .await?;
538            }
539
540            self.last_step_ids = vec![existing.id];
541            info!(
542                run_id = %self.run_id,
543                step = %name,
544                position,
545                "approval step replayed (approved)"
546            );
547            return Ok(());
548        }
549
550        // First execution: create the approval step and suspend.
551        let step = self
552            .store
553            .create_step(NewStep {
554                run_id: self.run_id,
555                name: name.to_string(),
556                kind: StepKind::Approval,
557                position,
558                input: Some(serde_json::to_value(&config)?),
559            })
560            .await?;
561
562        self.start_step(step.id, Utc::now()).await?;
563
564        // Transition the step to AwaitingApproval so it reflects
565        // the suspended state on the dashboard.
566        self.store
567            .update_step(
568                step.id,
569                StepUpdate {
570                    status: Some(StepStatus::AwaitingApproval),
571                    ..StepUpdate::default()
572                },
573            )
574            .await?;
575
576        self.last_step_ids = vec![step.id];
577
578        Err(EngineError::ApprovalRequired {
579            run_id: self.run_id,
580            step_id: step.id,
581            message: config.message().to_string(),
582        })
583    }
584
585    /// Record a step as explicitly skipped.
586    ///
587    /// Use this inside an `if`/`else` branch when a step should not execute
588    /// but must still appear in the DAG and timeline with its reason.
589    ///
590    /// The step is created directly in [`StepStatus::Skipped`] state and the
591    /// reason is stored in the output as `{"reason": "..."}`.
592    ///
593    /// # Errors
594    ///
595    /// Returns [`EngineError`] if the store fails.
596    ///
597    /// # Examples
598    ///
599    /// ```no_run
600    /// use ironflow_engine::context::WorkflowContext;
601    /// use ironflow_engine::error::EngineError;
602    ///
603    /// # async fn example(ctx: &mut WorkflowContext) -> Result<(), EngineError> {
604    /// let tests_passed = false;
605    /// if tests_passed {
606    ///     // ctx.shell("deploy", ...).await?;
607    /// } else {
608    ///     ctx.skip("deploy", "tests failed").await?;
609    /// }
610    /// # Ok(())
611    /// # }
612    /// ```
613    pub async fn skip(&mut self, name: &str, reason: &str) -> Result<(), EngineError> {
614        let position = self.position;
615        self.position += 1;
616
617        let step = self
618            .store
619            .create_step(NewStep {
620                run_id: self.run_id,
621                name: name.to_string(),
622                kind: StepKind::Custom("skip".to_string()),
623                position,
624                input: None,
625            })
626            .await?;
627
628        if !self.last_step_ids.is_empty() {
629            let deps: Vec<NewStepDependency> = self
630                .last_step_ids
631                .iter()
632                .map(|&depends_on| NewStepDependency {
633                    step_id: step.id,
634                    depends_on,
635                })
636                .collect();
637            self.store.create_step_dependencies(deps).await?;
638        }
639
640        let now = Utc::now();
641        self.store
642            .update_step(
643                step.id,
644                StepUpdate {
645                    status: Some(StepStatus::Skipped),
646                    output: Some(serde_json::json!({"reason": reason})),
647                    completed_at: Some(now),
648                    ..StepUpdate::default()
649                },
650            )
651            .await?;
652
653        self.last_step_ids = vec![step.id];
654
655        info!(
656            run_id = %self.run_id,
657            step = %name,
658            reason,
659            "step skipped"
660        );
661
662        Ok(())
663    }
664
665    /// Execute a custom operation step.
666    ///
667    /// Runs a user-defined [`Operation`] with full step lifecycle management:
668    /// creates the step record, transitions to Running, executes the operation,
669    /// persists the output and duration, and marks the step Completed or Failed.
670    ///
671    /// The operation's [`kind()`](Operation::kind) is stored as
672    /// [`StepKind::Custom`].
673    ///
674    /// # Errors
675    ///
676    /// Returns [`EngineError`] if the operation fails or the store errors.
677    ///
678    /// # Examples
679    ///
680    /// ```no_run
681    /// use ironflow_engine::context::WorkflowContext;
682    /// use ironflow_engine::operation::Operation;
683    /// use ironflow_engine::error::EngineError;
684    /// use serde_json::{Value, json};
685    /// use std::pin::Pin;
686    /// use std::future::Future;
687    ///
688    /// struct MyOp;
689    /// impl Operation for MyOp {
690    ///     fn kind(&self) -> &str { "my-service" }
691    ///     fn execute(&self) -> Pin<Box<dyn Future<Output = Result<Value, EngineError>> + Send + '_>> {
692    ///         Box::pin(async { Ok(json!({"ok": true})) })
693    ///     }
694    /// }
695    ///
696    /// # async fn example(ctx: &mut WorkflowContext) -> Result<(), EngineError> {
697    /// let result = ctx.operation("call-service", &MyOp).await?;
698    /// println!("output: {}", result.output);
699    /// # Ok(())
700    /// # }
701    /// ```
702    pub async fn operation(
703        &mut self,
704        name: &str,
705        op: &dyn Operation,
706    ) -> Result<StepOutput, EngineError> {
707        let kind = StepKind::Custom(op.kind().to_string());
708        let position = self.position;
709        self.position += 1;
710
711        let step = self
712            .store
713            .create_step(NewStep {
714                run_id: self.run_id,
715                name: name.to_string(),
716                kind,
717                position,
718                input: op.input(),
719            })
720            .await?;
721
722        self.start_step(step.id, Utc::now()).await?;
723
724        let start = Instant::now();
725
726        match op.execute().await {
727            Ok(output_value) => {
728                let duration_ms = start.elapsed().as_millis() as u64;
729                self.total_duration_ms += duration_ms;
730
731                let completed_at = Utc::now();
732                self.store
733                    .update_step(
734                        step.id,
735                        StepUpdate {
736                            status: Some(StepStatus::Completed),
737                            output: Some(output_value.clone()),
738                            duration_ms: Some(duration_ms),
739                            cost_usd: Some(Decimal::ZERO),
740                            completed_at: Some(completed_at),
741                            ..StepUpdate::default()
742                        },
743                    )
744                    .await?;
745
746                info!(
747                    run_id = %self.run_id,
748                    step = %name,
749                    kind = op.kind(),
750                    duration_ms,
751                    "operation step completed"
752                );
753
754                self.last_step_ids = vec![step.id];
755
756                Ok(StepOutput {
757                    output: output_value,
758                    duration_ms,
759                    cost_usd: Decimal::ZERO,
760                    input_tokens: None,
761                    output_tokens: None,
762                    debug_messages: None,
763                })
764            }
765            Err(err) => {
766                let completed_at = Utc::now();
767                if let Err(store_err) = self
768                    .store
769                    .update_step(
770                        step.id,
771                        StepUpdate {
772                            status: Some(StepStatus::Failed),
773                            error: Some(err.to_string()),
774                            completed_at: Some(completed_at),
775                            ..StepUpdate::default()
776                        },
777                    )
778                    .await
779                {
780                    error!(step_id = %step.id, error = %store_err, "failed to persist step failure");
781                }
782
783                Err(err)
784            }
785        }
786    }
787
788    /// Execute a sub-workflow step.
789    ///
790    /// Creates a child run for the named workflow handler, executes it with
791    /// its own steps and lifecycle, and returns a [`StepOutput`] containing
792    /// the child run ID and aggregated metrics.
793    ///
794    /// Requires the context to be created with
795    /// `with_handler_resolver`.
796    ///
797    /// # Errors
798    ///
799    /// Returns [`EngineError::InvalidWorkflow`] if no handler is registered
800    /// with the given name, or if no handler resolver is available.
801    ///
802    /// # Examples
803    ///
804    /// ```no_run
805    /// use ironflow_engine::context::WorkflowContext;
806    /// use ironflow_engine::error::EngineError;
807    /// use serde_json::json;
808    ///
809    /// # async fn example(ctx: &mut WorkflowContext) -> Result<(), EngineError> {
810    /// // let result = ctx.workflow(&MySubWorkflow, json!({})).await?;
811    /// # Ok(())
812    /// # }
813    /// ```
814    pub async fn workflow(
815        &mut self,
816        handler: &dyn WorkflowHandler,
817        payload: Value,
818    ) -> Result<StepOutput, EngineError> {
819        let config = WorkflowStepConfig::new(handler.name(), payload);
820        let position = self.position;
821        self.position += 1;
822
823        let step = self
824            .store
825            .create_step(NewStep {
826                run_id: self.run_id,
827                name: config.workflow_name.clone(),
828                kind: StepKind::Workflow,
829                position,
830                input: Some(serde_json::to_value(&config)?),
831            })
832            .await?;
833
834        self.start_step(step.id, Utc::now()).await?;
835
836        match self.execute_child_workflow(&config).await {
837            Ok(output) => {
838                self.total_cost_usd += output.cost_usd;
839                self.total_duration_ms += output.duration_ms;
840
841                let completed_at = Utc::now();
842                self.store
843                    .update_step(
844                        step.id,
845                        StepUpdate {
846                            status: Some(StepStatus::Completed),
847                            output: Some(output.output.clone()),
848                            duration_ms: Some(output.duration_ms),
849                            cost_usd: Some(output.cost_usd),
850                            completed_at: Some(completed_at),
851                            ..StepUpdate::default()
852                        },
853                    )
854                    .await?;
855
856                info!(
857                    run_id = %self.run_id,
858                    child_workflow = %config.workflow_name,
859                    duration_ms = output.duration_ms,
860                    "workflow step completed"
861                );
862
863                self.last_step_ids = vec![step.id];
864
865                Ok(output)
866            }
867            Err(err) => {
868                let completed_at = Utc::now();
869                if let Err(store_err) = self
870                    .store
871                    .update_step(
872                        step.id,
873                        StepUpdate {
874                            status: Some(StepStatus::Failed),
875                            error: Some(err.to_string()),
876                            completed_at: Some(completed_at),
877                            ..StepUpdate::default()
878                        },
879                    )
880                    .await
881                {
882                    error!(step_id = %step.id, error = %store_err, "failed to persist step failure");
883                }
884
885                Err(err)
886            }
887        }
888    }
889
890    /// Execute a child workflow and return aggregated output.
891    async fn execute_child_workflow(
892        &self,
893        config: &WorkflowStepConfig,
894    ) -> Result<StepOutput, EngineError> {
895        let resolver = self.handler_resolver.as_ref().ok_or_else(|| {
896            EngineError::InvalidWorkflow(
897                "sub-workflow requires a handler resolver (use Engine to execute)".to_string(),
898            )
899        })?;
900
901        let handler = resolver(&config.workflow_name).ok_or_else(|| {
902            EngineError::InvalidWorkflow(format!("no handler registered: {}", config.workflow_name))
903        })?;
904
905        let parent_labels = self
906            .store
907            .get_run(self.run_id)
908            .await?
909            .map(|r| r.labels)
910            .unwrap_or_default();
911
912        let child_run = self
913            .store
914            .create_run(NewRun {
915                workflow_name: config.workflow_name.clone(),
916                trigger: TriggerKind::Workflow,
917                payload: config.payload.clone(),
918                max_retries: 0,
919                handler_version: None,
920                labels: parent_labels,
921                scheduled_at: None,
922            })
923            .await?;
924
925        let child_run_id = child_run.id;
926        info!(
927            parent_run_id = %self.run_id,
928            child_run_id = %child_run_id,
929            workflow = %config.workflow_name,
930            "child run created"
931        );
932
933        self.store
934            .update_run_status(child_run_id, RunStatus::Running)
935            .await?;
936
937        let run_start = Instant::now();
938        let mut child_ctx = WorkflowContext {
939            run_id: child_run_id,
940            store: self.store.clone(),
941            provider: self.provider.clone(),
942            handler_resolver: self.handler_resolver.clone(),
943            position: 0,
944            last_step_ids: Vec::new(),
945            total_cost_usd: Decimal::ZERO,
946            total_duration_ms: 0,
947            replay_steps: HashMap::new(),
948            log_sender: self.log_sender.clone(),
949        };
950
951        let result = handler.execute(&mut child_ctx).await;
952        let total_duration = run_start.elapsed().as_millis() as u64;
953        let completed_at = Utc::now();
954
955        match result {
956            Ok(()) => {
957                self.store
958                    .update_run(
959                        child_run_id,
960                        RunUpdate {
961                            status: Some(RunStatus::Completed),
962                            cost_usd: Some(child_ctx.total_cost_usd),
963                            duration_ms: Some(total_duration),
964                            completed_at: Some(completed_at),
965                            ..RunUpdate::default()
966                        },
967                    )
968                    .await?;
969
970                Ok(StepOutput {
971                    output: serde_json::json!({
972                        "run_id": child_run_id,
973                        "workflow_name": config.workflow_name,
974                        "status": RunStatus::Completed,
975                        "cost_usd": child_ctx.total_cost_usd,
976                        "duration_ms": total_duration,
977                    }),
978                    duration_ms: total_duration,
979                    cost_usd: child_ctx.total_cost_usd,
980                    input_tokens: None,
981                    output_tokens: None,
982                    debug_messages: None,
983                })
984            }
985            Err(err) => {
986                if let Err(store_err) = self
987                    .store
988                    .update_run(
989                        child_run_id,
990                        RunUpdate {
991                            status: Some(RunStatus::Failed),
992                            error: Some(err.to_string()),
993                            cost_usd: Some(child_ctx.total_cost_usd),
994                            duration_ms: Some(total_duration),
995                            completed_at: Some(completed_at),
996                            ..RunUpdate::default()
997                        },
998                    )
999                    .await
1000                {
1001                    error!(
1002                        child_run_id = %child_run_id,
1003                        store_error = %store_err,
1004                        "failed to persist child run failure"
1005                    );
1006                }
1007
1008                Err(err)
1009            }
1010        }
1011    }
1012
1013    /// Try to replay a completed step from a previous execution.
1014    ///
1015    /// Returns `Some(StepOutput)` if a completed step exists at the given
1016    /// position, `None` otherwise.
1017    fn try_replay_step(&mut self, position: u32) -> Option<StepOutput> {
1018        let step = self.replay_steps.get(&position)?;
1019        if step.status.state != StepStatus::Completed {
1020            return None;
1021        }
1022        let output = StepOutput {
1023            output: step.output.clone().unwrap_or(Value::Null),
1024            duration_ms: step.duration_ms,
1025            cost_usd: step.cost_usd,
1026            input_tokens: step.input_tokens,
1027            output_tokens: step.output_tokens,
1028            debug_messages: None,
1029        };
1030        self.total_cost_usd += output.cost_usd;
1031        self.total_duration_ms += output.duration_ms;
1032        self.last_step_ids = vec![step.id];
1033        info!(
1034            run_id = %self.run_id,
1035            step = %step.name,
1036            position,
1037            "step replayed from previous execution"
1038        );
1039        Some(output)
1040    }
1041
1042    /// Internal: execute a step with full persistence lifecycle.
1043    async fn execute_step(
1044        &mut self,
1045        name: &str,
1046        kind: StepKind,
1047        config: StepConfig,
1048    ) -> Result<StepOutput, EngineError> {
1049        let position = self.position;
1050        self.position += 1;
1051
1052        // Replay: if this step already completed in a prior execution, return cached output.
1053        if let Some(output) = self.try_replay_step(position) {
1054            return Ok(output);
1055        }
1056
1057        // Create step record in Pending.
1058        let step = self
1059            .store
1060            .create_step(NewStep {
1061                run_id: self.run_id,
1062                name: name.to_string(),
1063                kind,
1064                position,
1065                input: Some(serde_json::to_value(&config)?),
1066            })
1067            .await?;
1068
1069        self.start_step(step.id, Utc::now()).await?;
1070
1071        let step_log_sender = self
1072            .log_sender
1073            .as_ref()
1074            .map(|s| StepLogSender::new(s.clone(), self.run_id, step.id, name.to_string()));
1075
1076        match execute_step_config(&config, &self.provider, step_log_sender).await {
1077            Ok(output) => {
1078                self.total_cost_usd += output.cost_usd;
1079                self.total_duration_ms += output.duration_ms;
1080
1081                let debug_messages_json = output.debug_messages_json();
1082
1083                let completed_at = Utc::now();
1084                self.store
1085                    .update_step(
1086                        step.id,
1087                        StepUpdate {
1088                            status: Some(StepStatus::Completed),
1089                            output: Some(output.output.clone()),
1090                            duration_ms: Some(output.duration_ms),
1091                            cost_usd: Some(output.cost_usd),
1092                            input_tokens: output.input_tokens,
1093                            output_tokens: output.output_tokens,
1094                            completed_at: Some(completed_at),
1095                            debug_messages: debug_messages_json,
1096                            ..StepUpdate::default()
1097                        },
1098                    )
1099                    .await?;
1100
1101                info!(
1102                    run_id = %self.run_id,
1103                    step = %name,
1104                    duration_ms = output.duration_ms,
1105                    "step completed"
1106                );
1107
1108                self.last_step_ids = vec![step.id];
1109
1110                Ok(output)
1111            }
1112            Err(err) => {
1113                let completed_at = Utc::now();
1114                let debug_messages_json = extract_debug_messages_from_error(&err);
1115                let partial = extract_partial_usage_from_error(&err);
1116
1117                if let Some(ref usage) = partial {
1118                    if let Some(cost) = usage.cost_usd {
1119                        self.total_cost_usd += cost;
1120                    }
1121                    if let Some(dur) = usage.duration_ms {
1122                        self.total_duration_ms += dur;
1123                    }
1124                }
1125
1126                if let Err(store_err) = self
1127                    .store
1128                    .update_step(
1129                        step.id,
1130                        StepUpdate {
1131                            status: Some(StepStatus::Failed),
1132                            error: Some(err.to_string()),
1133                            completed_at: Some(completed_at),
1134                            debug_messages: debug_messages_json,
1135                            duration_ms: partial.as_ref().and_then(|p| p.duration_ms),
1136                            cost_usd: partial.as_ref().and_then(|p| p.cost_usd),
1137                            input_tokens: partial.as_ref().and_then(|p| p.input_tokens),
1138                            output_tokens: partial.as_ref().and_then(|p| p.output_tokens),
1139                            ..StepUpdate::default()
1140                        },
1141                    )
1142                    .await
1143                {
1144                    tracing::error!(step_id = %step.id, error = %store_err, "failed to persist step failure");
1145                }
1146
1147                Err(err)
1148            }
1149        }
1150    }
1151
1152    /// Record dependency edges and transition a step to Running.
1153    ///
1154    /// Records edges from `step_id` to all `last_step_ids`, then
1155    /// transitions the step to `Running` with the given timestamp.
1156    async fn start_step(&self, step_id: Uuid, now: DateTime<Utc>) -> Result<(), EngineError> {
1157        if !self.last_step_ids.is_empty() {
1158            let deps: Vec<NewStepDependency> = self
1159                .last_step_ids
1160                .iter()
1161                .map(|&depends_on| NewStepDependency {
1162                    step_id,
1163                    depends_on,
1164                })
1165                .collect();
1166            self.store.create_step_dependencies(deps).await?;
1167        }
1168
1169        self.store
1170            .update_step(
1171                step_id,
1172                StepUpdate {
1173                    status: Some(StepStatus::Running),
1174                    started_at: Some(now),
1175                    ..StepUpdate::default()
1176                },
1177            )
1178            .await?;
1179
1180        Ok(())
1181    }
1182
1183    /// Access the store directly (advanced usage).
1184    pub fn store(&self) -> &Arc<dyn Store> {
1185        &self.store
1186    }
1187
1188    /// Access the payload that triggered this run.
1189    ///
1190    /// Fetches the run from the store and returns its payload.
1191    ///
1192    /// # Errors
1193    ///
1194    /// Returns [`EngineError::Store`] if the run is not found.
1195    pub async fn payload(&self) -> Result<Value, EngineError> {
1196        let run = self
1197            .store
1198            .get_run(self.run_id)
1199            .await?
1200            .ok_or(EngineError::Store(
1201                ironflow_store::error::StoreError::RunNotFound(self.run_id),
1202            ))?;
1203        Ok(run.payload)
1204    }
1205}
1206
1207impl fmt::Debug for WorkflowContext {
1208    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1209        f.debug_struct("WorkflowContext")
1210            .field("run_id", &self.run_id)
1211            .field("position", &self.position)
1212            .field("total_cost_usd", &self.total_cost_usd)
1213            .finish_non_exhaustive()
1214    }
1215}
1216
1217/// Extract debug messages from an engine error, if it wraps a schema validation
1218/// failure that carries a verbose conversation trace.
1219fn extract_debug_messages_from_error(err: &EngineError) -> Option<Value> {
1220    if let EngineError::Operation(OperationError::Agent(AgentError::SchemaValidation {
1221        debug_messages,
1222        ..
1223    })) = err
1224        && !debug_messages.is_empty()
1225    {
1226        return serde_json::to_value(debug_messages).ok();
1227    }
1228    None
1229}
1230
1231/// Partial usage with `Decimal` cost, converted from the `f64` in [`PartialUsage`].
1232///
1233/// Exists only because `ironflow-store` uses [`Decimal`] for monetary values
1234/// while `ironflow-core` uses `f64` (the CLI's native type). The conversion
1235/// happens here, at the engine/store boundary.
1236struct StepPartialUsage {
1237    cost_usd: Option<Decimal>,
1238    duration_ms: Option<u64>,
1239    input_tokens: Option<u64>,
1240    output_tokens: Option<u64>,
1241}
1242
1243fn extract_partial_usage_from_error(err: &EngineError) -> Option<StepPartialUsage> {
1244    if let EngineError::Operation(OperationError::Agent(AgentError::SchemaValidation {
1245        partial_usage,
1246        ..
1247    })) = err
1248        && (partial_usage.cost_usd.is_some() || partial_usage.duration_ms.is_some())
1249    {
1250        return Some(StepPartialUsage {
1251            cost_usd: partial_usage
1252                .cost_usd
1253                .and_then(|c| Decimal::try_from(c).ok()),
1254            duration_ms: partial_usage.duration_ms,
1255            input_tokens: partial_usage.input_tokens,
1256            output_tokens: partial_usage.output_tokens,
1257        });
1258    }
1259    None
1260}