Skip to main content

ironflow_engine/
context.rs

1//! [`WorkflowContext`] — execution context for dynamic workflows.
2//!
3//! Provides step execution methods that automatically persist results to the
4//! store. Each call to [`shell`](WorkflowContext::shell),
5//! [`http`](WorkflowContext::http), [`agent`](WorkflowContext::agent), or
6//! [`workflow`](WorkflowContext::workflow) creates a step record, executes the
7//! operation, captures the output, and returns a [`StepOutput`] that the next
8//! step can reference.
9//!
10//! # Examples
11//!
12//! ```no_run
13//! use ironflow_engine::context::WorkflowContext;
14//! use ironflow_engine::config::{ShellConfig, AgentStepConfig};
15//! use ironflow_engine::error::EngineError;
16//!
17//! # async fn example(ctx: &mut WorkflowContext) -> Result<(), EngineError> {
18//! let build = ctx.shell("build", ShellConfig::new("cargo build")).await?;
19//! let review = ctx.agent("review", AgentStepConfig::new(
20//!     &format!("Build output:\n{}", build.output["stdout"])
21//! )).await?;
22//! # Ok(())
23//! # }
24//! ```
25
26use std::collections::HashMap;
27use std::fmt;
28use std::sync::Arc;
29use std::time::Instant;
30
31use chrono::{DateTime, Utc};
32use rust_decimal::Decimal;
33use serde_json::Value;
34use tokio::task::JoinSet;
35use tracing::{error, info};
36use uuid::Uuid;
37
38use ironflow_core::error::{AgentError, OperationError};
39use ironflow_core::provider::AgentProvider;
40use ironflow_store::models::{
41    NewRun, NewStep, NewStepDependency, RunStatus, RunUpdate, Step, StepKind, StepStatus,
42    StepUpdate, TriggerKind,
43};
44use ironflow_store::store::Store;
45
46use crate::config::{
47    AgentStepConfig, ApprovalConfig, HttpConfig, ShellConfig, StepConfig, WorkflowStepConfig,
48};
49use crate::error::EngineError;
50use crate::executor::{ParallelStepResult, StepOutput, execute_step_config};
51use crate::handler::WorkflowHandler;
52use crate::operation::Operation;
53
54/// Callback type for resolving workflow handlers by name.
55pub(crate) type HandlerResolver =
56    Arc<dyn Fn(&str) -> Option<Arc<dyn WorkflowHandler>> + Send + Sync>;
57
/// Execution context for a single workflow run.
///
/// Tracks the current step position and provides convenience methods
/// for executing operations with automatic persistence. Every step method
/// consumes one position (a parallel wave consumes one position for the whole
/// batch) and records the executed step IDs so the next step can depend on them.
///
/// # Examples
///
/// ```no_run
/// use ironflow_engine::context::WorkflowContext;
/// use ironflow_engine::config::ShellConfig;
/// use ironflow_engine::error::EngineError;
///
/// # async fn example(ctx: &mut WorkflowContext) -> Result<(), EngineError> {
/// let result = ctx.shell("greet", ShellConfig::new("echo hello")).await?;
/// assert!(result.output["stdout"].as_str().unwrap().contains("hello"));
/// # Ok(())
/// # }
/// ```
pub struct WorkflowContext {
    /// Run whose step records this context creates and updates.
    run_id: Uuid,
    /// Persistence backend for run and step records.
    store: Arc<dyn Store>,
    /// Agent provider passed to step execution.
    provider: Arc<dyn AgentProvider>,
    /// Looks up sub-workflow handlers by name; when `None`, `workflow` fails
    /// with `EngineError::InvalidWorkflow`.
    handler_resolver: Option<HandlerResolver>,
    /// Position assigned to the next step (or parallel wave).
    position: u32,
    /// IDs of the last executed step(s) -- used to record DAG dependencies.
    last_step_ids: Vec<Uuid>,
    /// Accumulated cost across all steps in this run.
    total_cost_usd: Decimal,
    /// Accumulated duration across all steps.
    total_duration_ms: u64,
    /// Steps from a previous execution, keyed by position.
    /// Used when resuming after approval to replay completed steps.
    replay_steps: HashMap<u32, Step>,
}
92
93impl WorkflowContext {
94    /// Create a new context for a run.
95    ///
96    /// Not typically called directly — the [`Engine`](crate::engine::Engine)
97    /// creates this when executing a [`WorkflowHandler`].
98    pub fn new(run_id: Uuid, store: Arc<dyn Store>, provider: Arc<dyn AgentProvider>) -> Self {
99        Self {
100            run_id,
101            store,
102            provider,
103            handler_resolver: None,
104            position: 0,
105            last_step_ids: Vec::new(),
106            total_cost_usd: Decimal::ZERO,
107            total_duration_ms: 0,
108            replay_steps: HashMap::new(),
109        }
110    }
111
112    /// Create a new context with a handler resolver for sub-workflow support.
113    ///
114    /// The resolver is called when [`workflow`](Self::workflow) is invoked to
115    /// look up registered handlers by name.
116    pub(crate) fn with_handler_resolver(
117        run_id: Uuid,
118        store: Arc<dyn Store>,
119        provider: Arc<dyn AgentProvider>,
120        resolver: HandlerResolver,
121    ) -> Self {
122        Self {
123            run_id,
124            store,
125            provider,
126            handler_resolver: Some(resolver),
127            position: 0,
128            last_step_ids: Vec::new(),
129            total_cost_usd: Decimal::ZERO,
130            total_duration_ms: 0,
131            replay_steps: HashMap::new(),
132        }
133    }
134
135    /// Load existing steps from the store for replay after approval.
136    ///
137    /// Called by the engine when resuming a run. All completed steps
138    /// and the approved approval step are indexed by position so that
139    /// `execute_step` and `approval` can skip them.
140    pub(crate) async fn load_replay_steps(&mut self) -> Result<(), EngineError> {
141        let steps = self.store.list_steps(self.run_id).await?;
142        for step in steps {
143            let dominated = matches!(
144                step.status.state,
145                StepStatus::Completed | StepStatus::Running | StepStatus::AwaitingApproval
146            );
147            if dominated {
148                self.replay_steps.insert(step.position, step);
149            }
150        }
151        Ok(())
152    }
153
154    /// The run ID this context is executing for.
155    pub fn run_id(&self) -> Uuid {
156        self.run_id
157    }
158
159    /// Accumulated cost across all executed steps so far.
160    pub fn total_cost_usd(&self) -> Decimal {
161        self.total_cost_usd
162    }
163
164    /// Accumulated duration across all executed steps so far.
165    pub fn total_duration_ms(&self) -> u64 {
166        self.total_duration_ms
167    }
168
169    /// Execute multiple steps concurrently (wait-all model).
170    ///
171    /// All steps in the batch execute in parallel via `tokio::JoinSet`.
172    /// Each step is recorded with the same `position` (execution wave).
173    /// Dependencies on previous steps are recorded automatically.
174    ///
175    /// When `fail_fast` is true, remaining steps are aborted on the first
176    /// failure. When false, all steps run to completion and the first
177    /// error is returned.
178    ///
179    /// # Errors
180    ///
181    /// Returns [`EngineError`] if any step fails.
182    ///
183    /// # Examples
184    ///
185    /// ```no_run
186    /// use ironflow_engine::context::WorkflowContext;
187    /// use ironflow_engine::config::{StepConfig, ShellConfig};
188    /// use ironflow_engine::error::EngineError;
189    ///
190    /// # async fn example(ctx: &mut WorkflowContext) -> Result<(), EngineError> {
191    /// let results = ctx.parallel(
192    ///     vec![
193    ///         ("test-unit", StepConfig::Shell(ShellConfig::new("cargo test --lib"))),
194    ///         ("lint", StepConfig::Shell(ShellConfig::new("cargo clippy"))),
195    ///     ],
196    ///     true,
197    /// ).await?;
198    ///
199    /// for r in &results {
200    ///     println!("{}: {:?}", r.name, r.output.output);
201    /// }
202    /// # Ok(())
203    /// # }
204    /// ```
205    pub async fn parallel(
206        &mut self,
207        steps: Vec<(&str, StepConfig)>,
208        fail_fast: bool,
209    ) -> Result<Vec<ParallelStepResult>, EngineError> {
210        if steps.is_empty() {
211            return Ok(Vec::new());
212        }
213
214        let wave_position = self.position;
215        self.position += 1;
216
217        let now = Utc::now();
218        let mut step_records: Vec<(Uuid, String, StepConfig)> = Vec::with_capacity(steps.len());
219
220        for (name, config) in &steps {
221            let kind = config.kind();
222            let step = self
223                .store
224                .create_step(NewStep {
225                    run_id: self.run_id,
226                    name: name.to_string(),
227                    kind,
228                    position: wave_position,
229                    input: Some(serde_json::to_value(config)?),
230                })
231                .await?;
232
233            self.start_step(step.id, now).await?;
234
235            step_records.push((step.id, name.to_string(), config.clone()));
236        }
237
238        let mut join_set = JoinSet::new();
239        for (idx, (_id, _name, config)) in step_records.iter().enumerate() {
240            let provider = self.provider.clone();
241            let config = config.clone();
242            join_set.spawn(async move { (idx, execute_step_config(&config, &provider).await) });
243        }
244
245        // JoinSet returns in completion order; indexed_results restores input order.
246        let mut indexed_results: Vec<Option<Result<StepOutput, String>>> =
247            vec![None; step_records.len()];
248        let mut first_error: Option<EngineError> = None;
249
250        while let Some(join_result) = join_set.join_next().await {
251            let (idx, step_result) = match join_result {
252                Ok(r) => r,
253                Err(e) => {
254                    if first_error.is_none() {
255                        first_error = Some(EngineError::StepConfig(format!("join error: {e}")));
256                    }
257                    if fail_fast {
258                        join_set.abort_all();
259                    }
260                    continue;
261                }
262            };
263
264            let (step_id, step_name, _) = &step_records[idx];
265            let completed_at = Utc::now();
266
267            match step_result {
268                Ok(output) => {
269                    self.total_cost_usd += output.cost_usd;
270                    self.total_duration_ms += output.duration_ms;
271
272                    let debug_messages_json = output.debug_messages_json();
273
274                    self.store
275                        .update_step(
276                            *step_id,
277                            StepUpdate {
278                                status: Some(StepStatus::Completed),
279                                output: Some(output.output.clone()),
280                                duration_ms: Some(output.duration_ms),
281                                cost_usd: Some(output.cost_usd),
282                                input_tokens: output.input_tokens,
283                                output_tokens: output.output_tokens,
284                                completed_at: Some(completed_at),
285                                debug_messages: debug_messages_json,
286                                ..StepUpdate::default()
287                            },
288                        )
289                        .await?;
290
291                    info!(
292                        run_id = %self.run_id,
293                        step = %step_name,
294                        duration_ms = output.duration_ms,
295                        "parallel step completed"
296                    );
297
298                    indexed_results[idx] = Some(Ok(output));
299                }
300                Err(err) => {
301                    let err_msg = err.to_string();
302                    let debug_messages_json = extract_debug_messages_from_error(&err);
303                    let partial = extract_partial_usage_from_error(&err);
304
305                    if let Some(ref usage) = partial {
306                        if let Some(cost) = usage.cost_usd {
307                            self.total_cost_usd += cost;
308                        }
309                        if let Some(dur) = usage.duration_ms {
310                            self.total_duration_ms += dur;
311                        }
312                    }
313
314                    if let Err(store_err) = self
315                        .store
316                        .update_step(
317                            *step_id,
318                            StepUpdate {
319                                status: Some(StepStatus::Failed),
320                                error: Some(err_msg.clone()),
321                                completed_at: Some(completed_at),
322                                debug_messages: debug_messages_json,
323                                duration_ms: partial.as_ref().and_then(|p| p.duration_ms),
324                                cost_usd: partial.as_ref().and_then(|p| p.cost_usd),
325                                input_tokens: partial.as_ref().and_then(|p| p.input_tokens),
326                                output_tokens: partial.as_ref().and_then(|p| p.output_tokens),
327                                ..StepUpdate::default()
328                            },
329                        )
330                        .await
331                    {
332                        tracing::error!(
333                            step_id = %step_id,
334                            error = %store_err,
335                            "failed to persist parallel step failure"
336                        );
337                    }
338
339                    indexed_results[idx] = Some(Err(err_msg.clone()));
340
341                    if first_error.is_none() {
342                        first_error = Some(err);
343                    }
344
345                    if fail_fast {
346                        join_set.abort_all();
347                    }
348                }
349            }
350        }
351
352        if let Some(err) = first_error {
353            return Err(err);
354        }
355
356        self.last_step_ids = step_records.iter().map(|(id, _, _)| *id).collect();
357
358        // Build results in original order.
359        let results: Vec<ParallelStepResult> = step_records
360            .iter()
361            .enumerate()
362            .map(|(idx, (step_id, name, _))| {
363                let output = match indexed_results[idx].take() {
364                    Some(Ok(o)) => o,
365                    _ => unreachable!("all steps succeeded if no error returned"),
366                };
367                ParallelStepResult {
368                    name: name.clone(),
369                    output,
370                    step_id: *step_id,
371                }
372            })
373            .collect();
374
375        Ok(results)
376    }
377
378    /// Execute a shell step.
379    ///
380    /// Creates the step record, runs the command, persists the result,
381    /// and returns the output for use in subsequent steps.
382    ///
383    /// # Errors
384    ///
385    /// Returns [`EngineError`] if the command fails or the store errors.
386    ///
387    /// # Examples
388    ///
389    /// ```no_run
390    /// use ironflow_engine::context::WorkflowContext;
391    /// use ironflow_engine::config::ShellConfig;
392    /// use ironflow_engine::error::EngineError;
393    ///
394    /// # async fn example(ctx: &mut WorkflowContext) -> Result<(), EngineError> {
395    /// let files = ctx.shell("list", ShellConfig::new("ls -la")).await?;
396    /// println!("stdout: {}", files.output["stdout"]);
397    /// # Ok(())
398    /// # }
399    /// ```
400    pub async fn shell(
401        &mut self,
402        name: &str,
403        config: ShellConfig,
404    ) -> Result<StepOutput, EngineError> {
405        self.execute_step(name, StepKind::Shell, StepConfig::Shell(config))
406            .await
407    }
408
409    /// Execute an HTTP step.
410    ///
411    /// # Errors
412    ///
413    /// Returns [`EngineError`] if the request fails or the store errors.
414    ///
415    /// # Examples
416    ///
417    /// ```no_run
418    /// use ironflow_engine::context::WorkflowContext;
419    /// use ironflow_engine::config::HttpConfig;
420    /// use ironflow_engine::error::EngineError;
421    ///
422    /// # async fn example(ctx: &mut WorkflowContext) -> Result<(), EngineError> {
423    /// let resp = ctx.http("health", HttpConfig::get("https://api.example.com/health")).await?;
424    /// println!("status: {}", resp.output["status"]);
425    /// # Ok(())
426    /// # }
427    /// ```
428    pub async fn http(
429        &mut self,
430        name: &str,
431        config: HttpConfig,
432    ) -> Result<StepOutput, EngineError> {
433        self.execute_step(name, StepKind::Http, StepConfig::Http(config))
434            .await
435    }
436
437    /// Execute an agent step.
438    ///
439    /// # Errors
440    ///
441    /// Returns [`EngineError`] if the agent invocation fails or the store errors.
442    ///
443    /// # Examples
444    ///
445    /// ```no_run
446    /// use ironflow_engine::context::WorkflowContext;
447    /// use ironflow_engine::config::AgentStepConfig;
448    /// use ironflow_engine::error::EngineError;
449    ///
450    /// # async fn example(ctx: &mut WorkflowContext) -> Result<(), EngineError> {
451    /// let review = ctx.agent("review", AgentStepConfig::new("Review the code")).await?;
452    /// println!("review: {}", review.output["value"]);
453    /// # Ok(())
454    /// # }
455    /// ```
456    pub async fn agent(
457        &mut self,
458        name: &str,
459        config: impl Into<AgentStepConfig>,
460    ) -> Result<StepOutput, EngineError> {
461        self.execute_step(name, StepKind::Agent, StepConfig::Agent(config.into()))
462            .await
463    }
464
465    /// Create a human approval gate.
466    ///
467    /// On first execution, records an approval step and returns
468    /// [`EngineError::ApprovalRequired`] to suspend the run. The engine
469    /// transitions the run to `AwaitingApproval`.
470    ///
471    /// On resume (after a human approved via the API), the approval step
472    /// is replayed: it is marked as `Completed` and execution continues
473    /// past it. Multiple approval gates in the same handler work -- each
474    /// one pauses and resumes independently.
475    ///
476    /// # Errors
477    ///
478    /// Returns [`EngineError::ApprovalRequired`] to pause the run on
479    /// first execution. Returns other [`EngineError`] variants on store
480    /// failures.
481    ///
482    /// # Examples
483    ///
484    /// ```no_run
485    /// use ironflow_engine::context::WorkflowContext;
486    /// use ironflow_engine::config::ApprovalConfig;
487    /// use ironflow_engine::error::EngineError;
488    ///
489    /// # async fn example(ctx: &mut WorkflowContext) -> Result<(), EngineError> {
490    /// ctx.approval("deploy-gate", ApprovalConfig::new("Approve deployment?")).await?;
491    /// // Execution continues here after approval
492    /// # Ok(())
493    /// # }
494    /// ```
495    pub async fn approval(
496        &mut self,
497        name: &str,
498        config: ApprovalConfig,
499    ) -> Result<(), EngineError> {
500        let position = self.position;
501        self.position += 1;
502
503        // Replay: if this approval step exists from a prior execution,
504        // the run was approved -- mark it completed (if not already) and continue.
505        if let Some(existing) = self.replay_steps.get(&position)
506            && existing.kind == StepKind::Approval
507        {
508            if existing.status.state == StepStatus::AwaitingApproval {
509                self.store
510                    .update_step(
511                        existing.id,
512                        StepUpdate {
513                            status: Some(StepStatus::Completed),
514                            completed_at: Some(Utc::now()),
515                            ..StepUpdate::default()
516                        },
517                    )
518                    .await?;
519            }
520
521            self.last_step_ids = vec![existing.id];
522            info!(
523                run_id = %self.run_id,
524                step = %name,
525                position,
526                "approval step replayed (approved)"
527            );
528            return Ok(());
529        }
530
531        // First execution: create the approval step and suspend.
532        let step = self
533            .store
534            .create_step(NewStep {
535                run_id: self.run_id,
536                name: name.to_string(),
537                kind: StepKind::Approval,
538                position,
539                input: Some(serde_json::to_value(&config)?),
540            })
541            .await?;
542
543        self.start_step(step.id, Utc::now()).await?;
544
545        // Transition the step to AwaitingApproval so it reflects
546        // the suspended state on the dashboard.
547        self.store
548            .update_step(
549                step.id,
550                StepUpdate {
551                    status: Some(StepStatus::AwaitingApproval),
552                    ..StepUpdate::default()
553                },
554            )
555            .await?;
556
557        self.last_step_ids = vec![step.id];
558
559        Err(EngineError::ApprovalRequired {
560            run_id: self.run_id,
561            step_id: step.id,
562            message: config.message().to_string(),
563        })
564    }
565
566    /// Record a step as explicitly skipped.
567    ///
568    /// Use this inside an `if`/`else` branch when a step should not execute
569    /// but must still appear in the DAG and timeline with its reason.
570    ///
571    /// The step is created directly in [`StepStatus::Skipped`] state and the
572    /// reason is stored in the output as `{"reason": "..."}`.
573    ///
574    /// # Errors
575    ///
576    /// Returns [`EngineError`] if the store fails.
577    ///
578    /// # Examples
579    ///
580    /// ```no_run
581    /// use ironflow_engine::context::WorkflowContext;
582    /// use ironflow_engine::error::EngineError;
583    ///
584    /// # async fn example(ctx: &mut WorkflowContext) -> Result<(), EngineError> {
585    /// let tests_passed = false;
586    /// if tests_passed {
587    ///     // ctx.shell("deploy", ...).await?;
588    /// } else {
589    ///     ctx.skip("deploy", "tests failed").await?;
590    /// }
591    /// # Ok(())
592    /// # }
593    /// ```
594    pub async fn skip(&mut self, name: &str, reason: &str) -> Result<(), EngineError> {
595        let position = self.position;
596        self.position += 1;
597
598        let step = self
599            .store
600            .create_step(NewStep {
601                run_id: self.run_id,
602                name: name.to_string(),
603                kind: StepKind::Custom("skip".to_string()),
604                position,
605                input: None,
606            })
607            .await?;
608
609        if !self.last_step_ids.is_empty() {
610            let deps: Vec<NewStepDependency> = self
611                .last_step_ids
612                .iter()
613                .map(|&depends_on| NewStepDependency {
614                    step_id: step.id,
615                    depends_on,
616                })
617                .collect();
618            self.store.create_step_dependencies(deps).await?;
619        }
620
621        let now = Utc::now();
622        self.store
623            .update_step(
624                step.id,
625                StepUpdate {
626                    status: Some(StepStatus::Skipped),
627                    output: Some(serde_json::json!({"reason": reason})),
628                    completed_at: Some(now),
629                    ..StepUpdate::default()
630                },
631            )
632            .await?;
633
634        self.last_step_ids = vec![step.id];
635
636        info!(
637            run_id = %self.run_id,
638            step = %name,
639            reason,
640            "step skipped"
641        );
642
643        Ok(())
644    }
645
646    /// Execute a custom operation step.
647    ///
648    /// Runs a user-defined [`Operation`] with full step lifecycle management:
649    /// creates the step record, transitions to Running, executes the operation,
650    /// persists the output and duration, and marks the step Completed or Failed.
651    ///
652    /// The operation's [`kind()`](Operation::kind) is stored as
653    /// [`StepKind::Custom`].
654    ///
655    /// # Errors
656    ///
657    /// Returns [`EngineError`] if the operation fails or the store errors.
658    ///
659    /// # Examples
660    ///
661    /// ```no_run
662    /// use ironflow_engine::context::WorkflowContext;
663    /// use ironflow_engine::operation::Operation;
664    /// use ironflow_engine::error::EngineError;
665    /// use serde_json::{Value, json};
666    /// use std::pin::Pin;
667    /// use std::future::Future;
668    ///
669    /// struct MyOp;
670    /// impl Operation for MyOp {
671    ///     fn kind(&self) -> &str { "my-service" }
672    ///     fn execute(&self) -> Pin<Box<dyn Future<Output = Result<Value, EngineError>> + Send + '_>> {
673    ///         Box::pin(async { Ok(json!({"ok": true})) })
674    ///     }
675    /// }
676    ///
677    /// # async fn example(ctx: &mut WorkflowContext) -> Result<(), EngineError> {
678    /// let result = ctx.operation("call-service", &MyOp).await?;
679    /// println!("output: {}", result.output);
680    /// # Ok(())
681    /// # }
682    /// ```
683    pub async fn operation(
684        &mut self,
685        name: &str,
686        op: &dyn Operation,
687    ) -> Result<StepOutput, EngineError> {
688        let kind = StepKind::Custom(op.kind().to_string());
689        let position = self.position;
690        self.position += 1;
691
692        let step = self
693            .store
694            .create_step(NewStep {
695                run_id: self.run_id,
696                name: name.to_string(),
697                kind,
698                position,
699                input: op.input(),
700            })
701            .await?;
702
703        self.start_step(step.id, Utc::now()).await?;
704
705        let start = Instant::now();
706
707        match op.execute().await {
708            Ok(output_value) => {
709                let duration_ms = start.elapsed().as_millis() as u64;
710                self.total_duration_ms += duration_ms;
711
712                let completed_at = Utc::now();
713                self.store
714                    .update_step(
715                        step.id,
716                        StepUpdate {
717                            status: Some(StepStatus::Completed),
718                            output: Some(output_value.clone()),
719                            duration_ms: Some(duration_ms),
720                            cost_usd: Some(Decimal::ZERO),
721                            completed_at: Some(completed_at),
722                            ..StepUpdate::default()
723                        },
724                    )
725                    .await?;
726
727                info!(
728                    run_id = %self.run_id,
729                    step = %name,
730                    kind = op.kind(),
731                    duration_ms,
732                    "operation step completed"
733                );
734
735                self.last_step_ids = vec![step.id];
736
737                Ok(StepOutput {
738                    output: output_value,
739                    duration_ms,
740                    cost_usd: Decimal::ZERO,
741                    input_tokens: None,
742                    output_tokens: None,
743                    debug_messages: None,
744                })
745            }
746            Err(err) => {
747                let completed_at = Utc::now();
748                if let Err(store_err) = self
749                    .store
750                    .update_step(
751                        step.id,
752                        StepUpdate {
753                            status: Some(StepStatus::Failed),
754                            error: Some(err.to_string()),
755                            completed_at: Some(completed_at),
756                            ..StepUpdate::default()
757                        },
758                    )
759                    .await
760                {
761                    error!(step_id = %step.id, error = %store_err, "failed to persist step failure");
762                }
763
764                Err(err)
765            }
766        }
767    }
768
769    /// Execute a sub-workflow step.
770    ///
771    /// Creates a child run for the named workflow handler, executes it with
772    /// its own steps and lifecycle, and returns a [`StepOutput`] containing
773    /// the child run ID and aggregated metrics.
774    ///
775    /// Requires the context to be created with
776    /// `with_handler_resolver`.
777    ///
778    /// # Errors
779    ///
780    /// Returns [`EngineError::InvalidWorkflow`] if no handler is registered
781    /// with the given name, or if no handler resolver is available.
782    ///
783    /// # Examples
784    ///
785    /// ```no_run
786    /// use ironflow_engine::context::WorkflowContext;
787    /// use ironflow_engine::error::EngineError;
788    /// use serde_json::json;
789    ///
790    /// # async fn example(ctx: &mut WorkflowContext) -> Result<(), EngineError> {
791    /// // let result = ctx.workflow(&MySubWorkflow, json!({})).await?;
792    /// # Ok(())
793    /// # }
794    /// ```
795    pub async fn workflow(
796        &mut self,
797        handler: &dyn WorkflowHandler,
798        payload: Value,
799    ) -> Result<StepOutput, EngineError> {
800        let config = WorkflowStepConfig::new(handler.name(), payload);
801        let position = self.position;
802        self.position += 1;
803
804        let step = self
805            .store
806            .create_step(NewStep {
807                run_id: self.run_id,
808                name: config.workflow_name.clone(),
809                kind: StepKind::Workflow,
810                position,
811                input: Some(serde_json::to_value(&config)?),
812            })
813            .await?;
814
815        self.start_step(step.id, Utc::now()).await?;
816
817        match self.execute_child_workflow(&config).await {
818            Ok(output) => {
819                self.total_cost_usd += output.cost_usd;
820                self.total_duration_ms += output.duration_ms;
821
822                let completed_at = Utc::now();
823                self.store
824                    .update_step(
825                        step.id,
826                        StepUpdate {
827                            status: Some(StepStatus::Completed),
828                            output: Some(output.output.clone()),
829                            duration_ms: Some(output.duration_ms),
830                            cost_usd: Some(output.cost_usd),
831                            completed_at: Some(completed_at),
832                            ..StepUpdate::default()
833                        },
834                    )
835                    .await?;
836
837                info!(
838                    run_id = %self.run_id,
839                    child_workflow = %config.workflow_name,
840                    duration_ms = output.duration_ms,
841                    "workflow step completed"
842                );
843
844                self.last_step_ids = vec![step.id];
845
846                Ok(output)
847            }
848            Err(err) => {
849                let completed_at = Utc::now();
850                if let Err(store_err) = self
851                    .store
852                    .update_step(
853                        step.id,
854                        StepUpdate {
855                            status: Some(StepStatus::Failed),
856                            error: Some(err.to_string()),
857                            completed_at: Some(completed_at),
858                            ..StepUpdate::default()
859                        },
860                    )
861                    .await
862                {
863                    error!(step_id = %step.id, error = %store_err, "failed to persist step failure");
864                }
865
866                Err(err)
867            }
868        }
869    }
870
871    /// Execute a child workflow and return aggregated output.
872    async fn execute_child_workflow(
873        &self,
874        config: &WorkflowStepConfig,
875    ) -> Result<StepOutput, EngineError> {
876        let resolver = self.handler_resolver.as_ref().ok_or_else(|| {
877            EngineError::InvalidWorkflow(
878                "sub-workflow requires a handler resolver (use Engine to execute)".to_string(),
879            )
880        })?;
881
882        let handler = resolver(&config.workflow_name).ok_or_else(|| {
883            EngineError::InvalidWorkflow(format!("no handler registered: {}", config.workflow_name))
884        })?;
885
886        let parent_labels = self
887            .store
888            .get_run(self.run_id)
889            .await?
890            .map(|r| r.labels)
891            .unwrap_or_default();
892
893        let child_run = self
894            .store
895            .create_run(NewRun {
896                workflow_name: config.workflow_name.clone(),
897                trigger: TriggerKind::Workflow,
898                payload: config.payload.clone(),
899                max_retries: 0,
900                handler_version: None,
901                labels: parent_labels,
902                scheduled_at: None,
903            })
904            .await?;
905
906        let child_run_id = child_run.id;
907        info!(
908            parent_run_id = %self.run_id,
909            child_run_id = %child_run_id,
910            workflow = %config.workflow_name,
911            "child run created"
912        );
913
914        self.store
915            .update_run_status(child_run_id, RunStatus::Running)
916            .await?;
917
918        let run_start = Instant::now();
919        let mut child_ctx = WorkflowContext {
920            run_id: child_run_id,
921            store: self.store.clone(),
922            provider: self.provider.clone(),
923            handler_resolver: self.handler_resolver.clone(),
924            position: 0,
925            last_step_ids: Vec::new(),
926            total_cost_usd: Decimal::ZERO,
927            total_duration_ms: 0,
928            replay_steps: HashMap::new(),
929        };
930
931        let result = handler.execute(&mut child_ctx).await;
932        let total_duration = run_start.elapsed().as_millis() as u64;
933        let completed_at = Utc::now();
934
935        match result {
936            Ok(()) => {
937                self.store
938                    .update_run(
939                        child_run_id,
940                        RunUpdate {
941                            status: Some(RunStatus::Completed),
942                            cost_usd: Some(child_ctx.total_cost_usd),
943                            duration_ms: Some(total_duration),
944                            completed_at: Some(completed_at),
945                            ..RunUpdate::default()
946                        },
947                    )
948                    .await?;
949
950                Ok(StepOutput {
951                    output: serde_json::json!({
952                        "run_id": child_run_id,
953                        "workflow_name": config.workflow_name,
954                        "status": RunStatus::Completed,
955                        "cost_usd": child_ctx.total_cost_usd,
956                        "duration_ms": total_duration,
957                    }),
958                    duration_ms: total_duration,
959                    cost_usd: child_ctx.total_cost_usd,
960                    input_tokens: None,
961                    output_tokens: None,
962                    debug_messages: None,
963                })
964            }
965            Err(err) => {
966                if let Err(store_err) = self
967                    .store
968                    .update_run(
969                        child_run_id,
970                        RunUpdate {
971                            status: Some(RunStatus::Failed),
972                            error: Some(err.to_string()),
973                            cost_usd: Some(child_ctx.total_cost_usd),
974                            duration_ms: Some(total_duration),
975                            completed_at: Some(completed_at),
976                            ..RunUpdate::default()
977                        },
978                    )
979                    .await
980                {
981                    error!(
982                        child_run_id = %child_run_id,
983                        store_error = %store_err,
984                        "failed to persist child run failure"
985                    );
986                }
987
988                Err(err)
989            }
990        }
991    }
992
993    /// Try to replay a completed step from a previous execution.
994    ///
995    /// Returns `Some(StepOutput)` if a completed step exists at the given
996    /// position, `None` otherwise.
997    fn try_replay_step(&mut self, position: u32) -> Option<StepOutput> {
998        let step = self.replay_steps.get(&position)?;
999        if step.status.state != StepStatus::Completed {
1000            return None;
1001        }
1002        let output = StepOutput {
1003            output: step.output.clone().unwrap_or(Value::Null),
1004            duration_ms: step.duration_ms,
1005            cost_usd: step.cost_usd,
1006            input_tokens: step.input_tokens,
1007            output_tokens: step.output_tokens,
1008            debug_messages: None,
1009        };
1010        self.total_cost_usd += output.cost_usd;
1011        self.total_duration_ms += output.duration_ms;
1012        self.last_step_ids = vec![step.id];
1013        info!(
1014            run_id = %self.run_id,
1015            step = %step.name,
1016            position,
1017            "step replayed from previous execution"
1018        );
1019        Some(output)
1020    }
1021
1022    /// Internal: execute a step with full persistence lifecycle.
1023    async fn execute_step(
1024        &mut self,
1025        name: &str,
1026        kind: StepKind,
1027        config: StepConfig,
1028    ) -> Result<StepOutput, EngineError> {
1029        let position = self.position;
1030        self.position += 1;
1031
1032        // Replay: if this step already completed in a prior execution, return cached output.
1033        if let Some(output) = self.try_replay_step(position) {
1034            return Ok(output);
1035        }
1036
1037        // Create step record in Pending.
1038        let step = self
1039            .store
1040            .create_step(NewStep {
1041                run_id: self.run_id,
1042                name: name.to_string(),
1043                kind,
1044                position,
1045                input: Some(serde_json::to_value(&config)?),
1046            })
1047            .await?;
1048
1049        self.start_step(step.id, Utc::now()).await?;
1050
1051        match execute_step_config(&config, &self.provider).await {
1052            Ok(output) => {
1053                self.total_cost_usd += output.cost_usd;
1054                self.total_duration_ms += output.duration_ms;
1055
1056                let debug_messages_json = output.debug_messages_json();
1057
1058                let completed_at = Utc::now();
1059                self.store
1060                    .update_step(
1061                        step.id,
1062                        StepUpdate {
1063                            status: Some(StepStatus::Completed),
1064                            output: Some(output.output.clone()),
1065                            duration_ms: Some(output.duration_ms),
1066                            cost_usd: Some(output.cost_usd),
1067                            input_tokens: output.input_tokens,
1068                            output_tokens: output.output_tokens,
1069                            completed_at: Some(completed_at),
1070                            debug_messages: debug_messages_json,
1071                            ..StepUpdate::default()
1072                        },
1073                    )
1074                    .await?;
1075
1076                info!(
1077                    run_id = %self.run_id,
1078                    step = %name,
1079                    duration_ms = output.duration_ms,
1080                    "step completed"
1081                );
1082
1083                self.last_step_ids = vec![step.id];
1084
1085                Ok(output)
1086            }
1087            Err(err) => {
1088                let completed_at = Utc::now();
1089                let debug_messages_json = extract_debug_messages_from_error(&err);
1090                let partial = extract_partial_usage_from_error(&err);
1091
1092                if let Some(ref usage) = partial {
1093                    if let Some(cost) = usage.cost_usd {
1094                        self.total_cost_usd += cost;
1095                    }
1096                    if let Some(dur) = usage.duration_ms {
1097                        self.total_duration_ms += dur;
1098                    }
1099                }
1100
1101                if let Err(store_err) = self
1102                    .store
1103                    .update_step(
1104                        step.id,
1105                        StepUpdate {
1106                            status: Some(StepStatus::Failed),
1107                            error: Some(err.to_string()),
1108                            completed_at: Some(completed_at),
1109                            debug_messages: debug_messages_json,
1110                            duration_ms: partial.as_ref().and_then(|p| p.duration_ms),
1111                            cost_usd: partial.as_ref().and_then(|p| p.cost_usd),
1112                            input_tokens: partial.as_ref().and_then(|p| p.input_tokens),
1113                            output_tokens: partial.as_ref().and_then(|p| p.output_tokens),
1114                            ..StepUpdate::default()
1115                        },
1116                    )
1117                    .await
1118                {
1119                    tracing::error!(step_id = %step.id, error = %store_err, "failed to persist step failure");
1120                }
1121
1122                Err(err)
1123            }
1124        }
1125    }
1126
1127    /// Record dependency edges and transition a step to Running.
1128    ///
1129    /// Records edges from `step_id` to all `last_step_ids`, then
1130    /// transitions the step to `Running` with the given timestamp.
1131    async fn start_step(&self, step_id: Uuid, now: DateTime<Utc>) -> Result<(), EngineError> {
1132        if !self.last_step_ids.is_empty() {
1133            let deps: Vec<NewStepDependency> = self
1134                .last_step_ids
1135                .iter()
1136                .map(|&depends_on| NewStepDependency {
1137                    step_id,
1138                    depends_on,
1139                })
1140                .collect();
1141            self.store.create_step_dependencies(deps).await?;
1142        }
1143
1144        self.store
1145            .update_step(
1146                step_id,
1147                StepUpdate {
1148                    status: Some(StepStatus::Running),
1149                    started_at: Some(now),
1150                    ..StepUpdate::default()
1151                },
1152            )
1153            .await?;
1154
1155        Ok(())
1156    }
1157
1158    /// Access the store directly (advanced usage).
1159    pub fn store(&self) -> &Arc<dyn Store> {
1160        &self.store
1161    }
1162
1163    /// Access the payload that triggered this run.
1164    ///
1165    /// Fetches the run from the store and returns its payload.
1166    ///
1167    /// # Errors
1168    ///
1169    /// Returns [`EngineError::Store`] if the run is not found.
1170    pub async fn payload(&self) -> Result<Value, EngineError> {
1171        let run = self
1172            .store
1173            .get_run(self.run_id)
1174            .await?
1175            .ok_or(EngineError::Store(
1176                ironflow_store::error::StoreError::RunNotFound(self.run_id),
1177            ))?;
1178        Ok(run.payload)
1179    }
1180}
1181
1182impl fmt::Debug for WorkflowContext {
1183    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1184        f.debug_struct("WorkflowContext")
1185            .field("run_id", &self.run_id)
1186            .field("position", &self.position)
1187            .field("total_cost_usd", &self.total_cost_usd)
1188            .finish_non_exhaustive()
1189    }
1190}
1191
1192/// Extract debug messages from an engine error, if it wraps a schema validation
1193/// failure that carries a verbose conversation trace.
1194fn extract_debug_messages_from_error(err: &EngineError) -> Option<Value> {
1195    if let EngineError::Operation(OperationError::Agent(AgentError::SchemaValidation {
1196        debug_messages,
1197        ..
1198    })) = err
1199        && !debug_messages.is_empty()
1200    {
1201        return serde_json::to_value(debug_messages).ok();
1202    }
1203    None
1204}
1205
1206/// Partial usage with `Decimal` cost, converted from the `f64` in [`PartialUsage`].
1207///
1208/// Exists only because `ironflow-store` uses [`Decimal`] for monetary values
1209/// while `ironflow-core` uses `f64` (the CLI's native type). The conversion
1210/// happens here, at the engine/store boundary.
1211struct StepPartialUsage {
1212    cost_usd: Option<Decimal>,
1213    duration_ms: Option<u64>,
1214    input_tokens: Option<u64>,
1215    output_tokens: Option<u64>,
1216}
1217
1218fn extract_partial_usage_from_error(err: &EngineError) -> Option<StepPartialUsage> {
1219    if let EngineError::Operation(OperationError::Agent(AgentError::SchemaValidation {
1220        partial_usage,
1221        ..
1222    })) = err
1223        && (partial_usage.cost_usd.is_some() || partial_usage.duration_ms.is_some())
1224    {
1225        return Some(StepPartialUsage {
1226            cost_usd: partial_usage
1227                .cost_usd
1228                .and_then(|c| Decimal::try_from(c).ok()),
1229            duration_ms: partial_usage.duration_ms,
1230            input_tokens: partial_usage.input_tokens,
1231            output_tokens: partial_usage.output_tokens,
1232        });
1233    }
1234    None
1235}