// ironflow_engine/context.rs

//! [`WorkflowContext`] — execution context for dynamic workflows.
//!
//! Provides step execution methods that automatically persist results to the
//! store. Each call to [`shell`](WorkflowContext::shell),
//! [`http`](WorkflowContext::http), [`agent`](WorkflowContext::agent), or
//! [`workflow`](WorkflowContext::workflow) creates a step record, executes the
//! operation, captures the output, and returns a [`StepOutput`] that the next
//! step can reference.
//!
//! # Examples
//!
//! ```no_run
//! use ironflow_engine::context::WorkflowContext;
//! use ironflow_engine::config::{ShellConfig, AgentStepConfig};
//! use ironflow_engine::error::EngineError;
//!
//! # async fn example(ctx: &mut WorkflowContext) -> Result<(), EngineError> {
//! let build = ctx.shell("build", ShellConfig::new("cargo build")).await?;
//! let review = ctx.agent("review", AgentStepConfig::new(
//!     &format!("Build output:\n{}", build.output["stdout"])
//! )).await?;
//! # Ok(())
//! # }
//! ```
25
26use std::collections::HashMap;
27use std::fmt;
28use std::sync::Arc;
29use std::time::Instant;
30
31use chrono::{DateTime, Utc};
32use rust_decimal::Decimal;
33use serde_json::Value;
34use tokio::task::JoinSet;
35use tracing::{error, info};
36use uuid::Uuid;
37
38use ironflow_core::error::{AgentError, OperationError};
39use ironflow_core::provider::AgentProvider;
40use ironflow_store::models::{
41    NewRun, NewStep, NewStepDependency, RunStatus, RunUpdate, Step, StepKind, StepStatus,
42    StepUpdate, TriggerKind,
43};
44use ironflow_store::store::Store;
45
46use crate::config::{
47    AgentStepConfig, ApprovalConfig, HttpConfig, ShellConfig, StepConfig, WorkflowStepConfig,
48};
49use crate::error::EngineError;
50use crate::executor::{ParallelStepResult, StepOutput, execute_step_config};
51use crate::handler::WorkflowHandler;
52use crate::log_sender::{LogSender, StepLogSender};
53use crate::operation::Operation;
54
/// Callback type for resolving workflow handlers by name.
///
/// Given a workflow name, the callback returns the matching handler, or
/// `None` when no handler is known under that name. The callback is shared
/// through an [`Arc`] and must be `Send + Sync`.
pub(crate) type HandlerResolver =
    Arc<dyn Fn(&str) -> Option<Arc<dyn WorkflowHandler>> + Send + Sync>;
58
/// Execution context for a single workflow run.
///
/// Tracks the current step position and provides convenience methods
/// for executing operations with automatic persistence.
///
/// # Examples
///
/// ```no_run
/// use ironflow_engine::context::WorkflowContext;
/// use ironflow_engine::config::ShellConfig;
/// use ironflow_engine::error::EngineError;
///
/// # async fn example(ctx: &mut WorkflowContext) -> Result<(), EngineError> {
/// let result = ctx.shell("greet", ShellConfig::new("echo hello")).await?;
/// assert!(result.output["stdout"].as_str().unwrap().contains("hello"));
/// # Ok(())
/// # }
/// ```
pub struct WorkflowContext {
    /// Identifier of the run whose steps this context records.
    run_id: Uuid,
    /// Store used to create and update step records.
    store: Arc<dyn Store>,
    /// Agent provider handed to step execution.
    provider: Arc<dyn AgentProvider>,
    /// Resolver for sub-workflow handlers; `None` unless the context was
    /// built with `with_handler_resolver`.
    handler_resolver: Option<HandlerResolver>,
    /// Position assigned to the next step (or parallel wave); advanced by
    /// one each time a step or wave is recorded.
    position: u32,
    /// IDs of the last executed step(s) -- used to record DAG dependencies.
    last_step_ids: Vec<Uuid>,
    /// Accumulated cost across all steps in this run.
    total_cost_usd: Decimal,
    /// Accumulated duration across all steps.
    total_duration_ms: u64,
    /// Steps from a previous execution, keyed by position.
    /// Used when resuming after approval to replay completed steps.
    replay_steps: HashMap<u32, Step>,
    /// Optional sender for real-time log streaming.
    log_sender: Option<LogSender>,
}
95
96impl WorkflowContext {
97    /// Create a new context for a run.
98    ///
99    /// Not typically called directly — the [`Engine`](crate::engine::Engine)
100    /// creates this when executing a [`WorkflowHandler`].
101    pub fn new(run_id: Uuid, store: Arc<dyn Store>, provider: Arc<dyn AgentProvider>) -> Self {
102        Self {
103            run_id,
104            store,
105            provider,
106            handler_resolver: None,
107            position: 0,
108            last_step_ids: Vec::new(),
109            total_cost_usd: Decimal::ZERO,
110            total_duration_ms: 0,
111            replay_steps: HashMap::new(),
112            log_sender: None,
113        }
114    }
115
116    /// Create a new context with a handler resolver for sub-workflow support.
117    ///
118    /// The resolver is called when [`workflow`](Self::workflow) is invoked to
119    /// look up registered handlers by name.
120    pub(crate) fn with_handler_resolver(
121        run_id: Uuid,
122        store: Arc<dyn Store>,
123        provider: Arc<dyn AgentProvider>,
124        resolver: HandlerResolver,
125    ) -> Self {
126        Self {
127            run_id,
128            store,
129            provider,
130            handler_resolver: Some(resolver),
131            position: 0,
132            last_step_ids: Vec::new(),
133            total_cost_usd: Decimal::ZERO,
134            total_duration_ms: 0,
135            replay_steps: HashMap::new(),
136            log_sender: None,
137        }
138    }
139
140    /// Attach a log sender for real-time step output streaming.
141    pub fn set_log_sender(&mut self, sender: LogSender) {
142        self.log_sender = Some(sender);
143    }
144
145    /// Load existing steps from the store for replay after approval.
146    ///
147    /// Called by the engine when resuming a run. All completed steps
148    /// and the approved approval step are indexed by position so that
149    /// `execute_step` and `approval` can skip them.
150    pub(crate) async fn load_replay_steps(&mut self) -> Result<(), EngineError> {
151        let steps = self.store.list_steps(self.run_id).await?;
152        for step in steps {
153            let dominated = matches!(
154                step.status.state,
155                StepStatus::Completed | StepStatus::Running | StepStatus::AwaitingApproval
156            );
157            if dominated {
158                self.replay_steps.insert(step.position, step);
159            }
160        }
161        Ok(())
162    }
163
    /// The run ID this context is executing for.
    ///
    /// This is the ID passed in when the context was constructed.
    pub fn run_id(&self) -> Uuid {
        self.run_id
    }
168
    /// Accumulated cost across all executed steps so far.
    ///
    /// Starts at zero and grows as steps report their cost.
    pub fn total_cost_usd(&self) -> Decimal {
        self.total_cost_usd
    }
173
    /// Accumulated duration across all executed steps so far.
    ///
    /// Expressed in milliseconds; starts at zero and grows as steps report
    /// their duration.
    pub fn total_duration_ms(&self) -> u64 {
        self.total_duration_ms
    }
178
179    /// Execute multiple steps concurrently (wait-all model).
180    ///
181    /// All steps in the batch execute in parallel via `tokio::JoinSet`.
182    /// Each step is recorded with the same `position` (execution wave).
183    /// Dependencies on previous steps are recorded automatically.
184    ///
185    /// When `fail_fast` is true, remaining steps are aborted on the first
186    /// failure. When false, all steps run to completion and the first
187    /// error is returned.
188    ///
189    /// # Errors
190    ///
191    /// Returns [`EngineError`] if any step fails.
192    ///
193    /// # Examples
194    ///
195    /// ```no_run
196    /// use ironflow_engine::context::WorkflowContext;
197    /// use ironflow_engine::config::{StepConfig, ShellConfig};
198    /// use ironflow_engine::error::EngineError;
199    ///
200    /// # async fn example(ctx: &mut WorkflowContext) -> Result<(), EngineError> {
201    /// let results = ctx.parallel(
202    ///     vec![
203    ///         ("test-unit", StepConfig::Shell(ShellConfig::new("cargo test --lib"))),
204    ///         ("lint", StepConfig::Shell(ShellConfig::new("cargo clippy"))),
205    ///     ],
206    ///     true,
207    /// ).await?;
208    ///
209    /// for r in &results {
210    ///     println!("{}: {:?}", r.name, r.output.output);
211    /// }
212    /// # Ok(())
213    /// # }
214    /// ```
215    pub async fn parallel(
216        &mut self,
217        steps: Vec<(&str, StepConfig)>,
218        fail_fast: bool,
219    ) -> Result<Vec<ParallelStepResult>, EngineError> {
220        if steps.is_empty() {
221            return Ok(Vec::new());
222        }
223
224        let wave_position = self.position;
225        self.position += 1;
226
227        let now = Utc::now();
228        let mut step_records: Vec<(Uuid, String, StepConfig)> = Vec::with_capacity(steps.len());
229
230        for (name, config) in &steps {
231            let kind = config.kind();
232            let step = self
233                .store
234                .create_step(NewStep {
235                    run_id: self.run_id,
236                    name: name.to_string(),
237                    kind,
238                    position: wave_position,
239                    input: Some(serde_json::to_value(config)?),
240                })
241                .await?;
242
243            self.start_step(step.id, now).await?;
244
245            step_records.push((step.id, name.to_string(), config.clone()));
246        }
247
248        let mut join_set = JoinSet::new();
249        for (idx, (step_id, step_name, config)) in step_records.iter().enumerate() {
250            let provider = self.provider.clone();
251            let config = config.clone();
252            let step_log_sender = self
253                .log_sender
254                .as_ref()
255                .map(|s| StepLogSender::new(s.clone(), self.run_id, *step_id, step_name.clone()));
256            join_set.spawn(async move {
257                (
258                    idx,
259                    execute_step_config(&config, &provider, step_log_sender).await,
260                )
261            });
262        }
263
264        // JoinSet returns in completion order; indexed_results restores input order.
265        let mut indexed_results: Vec<Option<Result<StepOutput, String>>> =
266            vec![None; step_records.len()];
267        let mut first_error: Option<EngineError> = None;
268
269        while let Some(join_result) = join_set.join_next().await {
270            let (idx, step_result) = match join_result {
271                Ok(r) => r,
272                Err(e) => {
273                    if first_error.is_none() {
274                        first_error = Some(EngineError::StepConfig(format!("join error: {e}")));
275                    }
276                    if fail_fast {
277                        join_set.abort_all();
278                    }
279                    continue;
280                }
281            };
282
283            let (step_id, step_name, _) = &step_records[idx];
284            let completed_at = Utc::now();
285
286            match step_result {
287                Ok(output) => {
288                    self.total_cost_usd += output.cost_usd;
289                    self.total_duration_ms += output.duration_ms;
290
291                    let debug_messages_json = output.debug_messages_json();
292
293                    self.store
294                        .update_step(
295                            *step_id,
296                            StepUpdate {
297                                status: Some(StepStatus::Completed),
298                                output: Some(output.output.clone()),
299                                duration_ms: Some(output.duration_ms),
300                                cost_usd: Some(output.cost_usd),
301                                input_tokens: output.input_tokens,
302                                output_tokens: output.output_tokens,
303                                completed_at: Some(completed_at),
304                                debug_messages: debug_messages_json,
305                                ..StepUpdate::default()
306                            },
307                        )
308                        .await?;
309
310                    info!(
311                        run_id = %self.run_id,
312                        step = %step_name,
313                        duration_ms = output.duration_ms,
314                        "parallel step completed"
315                    );
316
317                    indexed_results[idx] = Some(Ok(output));
318                }
319                Err(err) => {
320                    let err_msg = err.to_string();
321                    let debug_messages_json = extract_debug_messages_from_error(&err);
322                    let partial = extract_partial_usage_from_error(&err);
323                    let raw_response_output = extract_raw_response_from_error(&err);
324
325                    if let Some(ref usage) = partial {
326                        if let Some(cost) = usage.cost_usd {
327                            self.total_cost_usd += cost;
328                        }
329                        if let Some(dur) = usage.duration_ms {
330                            self.total_duration_ms += dur;
331                        }
332                    }
333
334                    if let Err(store_err) = self
335                        .store
336                        .update_step(
337                            *step_id,
338                            StepUpdate {
339                                status: Some(StepStatus::Failed),
340                                error: Some(err_msg.clone()),
341                                output: raw_response_output,
342                                completed_at: Some(completed_at),
343                                debug_messages: debug_messages_json,
344                                duration_ms: partial.as_ref().and_then(|p| p.duration_ms),
345                                cost_usd: partial.as_ref().and_then(|p| p.cost_usd),
346                                input_tokens: partial.as_ref().and_then(|p| p.input_tokens),
347                                output_tokens: partial.as_ref().and_then(|p| p.output_tokens),
348                                ..StepUpdate::default()
349                            },
350                        )
351                        .await
352                    {
353                        tracing::error!(
354                            step_id = %step_id,
355                            error = %store_err,
356                            "failed to persist parallel step failure"
357                        );
358                    }
359
360                    indexed_results[idx] = Some(Err(err_msg.clone()));
361
362                    if first_error.is_none() {
363                        first_error = Some(err);
364                    }
365
366                    if fail_fast {
367                        join_set.abort_all();
368                    }
369                }
370            }
371        }
372
373        if let Some(err) = first_error {
374            return Err(err);
375        }
376
377        self.last_step_ids = step_records.iter().map(|(id, _, _)| *id).collect();
378
379        // Build results in original order.
380        let results: Vec<ParallelStepResult> = step_records
381            .iter()
382            .enumerate()
383            .map(|(idx, (step_id, name, _))| {
384                let output = match indexed_results[idx].take() {
385                    Some(Ok(o)) => o,
386                    _ => unreachable!("all steps succeeded if no error returned"),
387                };
388                ParallelStepResult {
389                    name: name.clone(),
390                    output,
391                    step_id: *step_id,
392                }
393            })
394            .collect();
395
396        Ok(results)
397    }
398
399    /// Execute a shell step.
400    ///
401    /// Creates the step record, runs the command, persists the result,
402    /// and returns the output for use in subsequent steps.
403    ///
404    /// # Errors
405    ///
406    /// Returns [`EngineError`] if the command fails or the store errors.
407    ///
408    /// # Examples
409    ///
410    /// ```no_run
411    /// use ironflow_engine::context::WorkflowContext;
412    /// use ironflow_engine::config::ShellConfig;
413    /// use ironflow_engine::error::EngineError;
414    ///
415    /// # async fn example(ctx: &mut WorkflowContext) -> Result<(), EngineError> {
416    /// let files = ctx.shell("list", ShellConfig::new("ls -la")).await?;
417    /// println!("stdout: {}", files.output["stdout"]);
418    /// # Ok(())
419    /// # }
420    /// ```
421    pub async fn shell(
422        &mut self,
423        name: &str,
424        config: ShellConfig,
425    ) -> Result<StepOutput, EngineError> {
426        self.execute_step(name, StepKind::Shell, StepConfig::Shell(config))
427            .await
428    }
429
430    /// Execute an HTTP step.
431    ///
432    /// # Errors
433    ///
434    /// Returns [`EngineError`] if the request fails or the store errors.
435    ///
436    /// # Examples
437    ///
438    /// ```no_run
439    /// use ironflow_engine::context::WorkflowContext;
440    /// use ironflow_engine::config::HttpConfig;
441    /// use ironflow_engine::error::EngineError;
442    ///
443    /// # async fn example(ctx: &mut WorkflowContext) -> Result<(), EngineError> {
444    /// let resp = ctx.http("health", HttpConfig::get("https://api.example.com/health")).await?;
445    /// println!("status: {}", resp.output["status"]);
446    /// # Ok(())
447    /// # }
448    /// ```
449    pub async fn http(
450        &mut self,
451        name: &str,
452        config: HttpConfig,
453    ) -> Result<StepOutput, EngineError> {
454        self.execute_step(name, StepKind::Http, StepConfig::Http(config))
455            .await
456    }
457
458    /// Execute an agent step.
459    ///
460    /// # Errors
461    ///
462    /// Returns [`EngineError`] if the agent invocation fails or the store errors.
463    ///
464    /// # Examples
465    ///
466    /// ```no_run
467    /// use ironflow_engine::context::WorkflowContext;
468    /// use ironflow_engine::config::AgentStepConfig;
469    /// use ironflow_engine::error::EngineError;
470    ///
471    /// # async fn example(ctx: &mut WorkflowContext) -> Result<(), EngineError> {
472    /// let review = ctx.agent("review", AgentStepConfig::new("Review the code")).await?;
473    /// println!("review: {}", review.output);
474    /// # Ok(())
475    /// # }
476    /// ```
477    pub async fn agent(
478        &mut self,
479        name: &str,
480        config: impl Into<AgentStepConfig>,
481    ) -> Result<StepOutput, EngineError> {
482        self.execute_step(name, StepKind::Agent, StepConfig::Agent(config.into()))
483            .await
484    }
485
486    /// Create a human approval gate.
487    ///
488    /// On first execution, records an approval step and returns
489    /// [`EngineError::ApprovalRequired`] to suspend the run. The engine
490    /// transitions the run to `AwaitingApproval`.
491    ///
492    /// On resume (after a human approved via the API), the approval step
493    /// is replayed: it is marked as `Completed` and execution continues
494    /// past it. Multiple approval gates in the same handler work -- each
495    /// one pauses and resumes independently.
496    ///
497    /// # Errors
498    ///
499    /// Returns [`EngineError::ApprovalRequired`] to pause the run on
500    /// first execution. Returns other [`EngineError`] variants on store
501    /// failures.
502    ///
503    /// # Examples
504    ///
505    /// ```no_run
506    /// use ironflow_engine::context::WorkflowContext;
507    /// use ironflow_engine::config::ApprovalConfig;
508    /// use ironflow_engine::error::EngineError;
509    ///
510    /// # async fn example(ctx: &mut WorkflowContext) -> Result<(), EngineError> {
511    /// ctx.approval("deploy-gate", ApprovalConfig::new("Approve deployment?")).await?;
512    /// // Execution continues here after approval
513    /// # Ok(())
514    /// # }
515    /// ```
516    pub async fn approval(
517        &mut self,
518        name: &str,
519        config: ApprovalConfig,
520    ) -> Result<(), EngineError> {
521        let position = self.position;
522        self.position += 1;
523
524        // Replay: if this approval step exists from a prior execution,
525        // the run was approved -- mark it completed (if not already) and continue.
526        if let Some(existing) = self.replay_steps.get(&position)
527            && existing.kind == StepKind::Approval
528        {
529            if existing.status.state == StepStatus::AwaitingApproval {
530                self.store
531                    .update_step(
532                        existing.id,
533                        StepUpdate {
534                            status: Some(StepStatus::Completed),
535                            completed_at: Some(Utc::now()),
536                            ..StepUpdate::default()
537                        },
538                    )
539                    .await?;
540            }
541
542            self.last_step_ids = vec![existing.id];
543            info!(
544                run_id = %self.run_id,
545                step = %name,
546                position,
547                "approval step replayed (approved)"
548            );
549            return Ok(());
550        }
551
552        // First execution: create the approval step and suspend.
553        let step = self
554            .store
555            .create_step(NewStep {
556                run_id: self.run_id,
557                name: name.to_string(),
558                kind: StepKind::Approval,
559                position,
560                input: Some(serde_json::to_value(&config)?),
561            })
562            .await?;
563
564        self.start_step(step.id, Utc::now()).await?;
565
566        // Transition the step to AwaitingApproval so it reflects
567        // the suspended state on the dashboard.
568        self.store
569            .update_step(
570                step.id,
571                StepUpdate {
572                    status: Some(StepStatus::AwaitingApproval),
573                    ..StepUpdate::default()
574                },
575            )
576            .await?;
577
578        self.last_step_ids = vec![step.id];
579
580        Err(EngineError::ApprovalRequired {
581            run_id: self.run_id,
582            step_id: step.id,
583            message: config.message().to_string(),
584        })
585    }
586
587    /// Record a step as explicitly skipped.
588    ///
589    /// Use this inside an `if`/`else` branch when a step should not execute
590    /// but must still appear in the DAG and timeline with its reason.
591    ///
592    /// The step is created directly in [`StepStatus::Skipped`] state and the
593    /// reason is stored in the output as `{"reason": "..."}`.
594    ///
595    /// # Errors
596    ///
597    /// Returns [`EngineError`] if the store fails.
598    ///
599    /// # Examples
600    ///
601    /// ```no_run
602    /// use ironflow_engine::context::WorkflowContext;
603    /// use ironflow_engine::error::EngineError;
604    ///
605    /// # async fn example(ctx: &mut WorkflowContext) -> Result<(), EngineError> {
606    /// let tests_passed = false;
607    /// if tests_passed {
608    ///     // ctx.shell("deploy", ...).await?;
609    /// } else {
610    ///     ctx.skip("deploy", "tests failed").await?;
611    /// }
612    /// # Ok(())
613    /// # }
614    /// ```
615    pub async fn skip(&mut self, name: &str, reason: &str) -> Result<(), EngineError> {
616        let position = self.position;
617        self.position += 1;
618
619        let step = self
620            .store
621            .create_step(NewStep {
622                run_id: self.run_id,
623                name: name.to_string(),
624                kind: StepKind::Custom("skip".to_string()),
625                position,
626                input: None,
627            })
628            .await?;
629
630        if !self.last_step_ids.is_empty() {
631            let deps: Vec<NewStepDependency> = self
632                .last_step_ids
633                .iter()
634                .map(|&depends_on| NewStepDependency {
635                    step_id: step.id,
636                    depends_on,
637                })
638                .collect();
639            self.store.create_step_dependencies(deps).await?;
640        }
641
642        let now = Utc::now();
643        self.store
644            .update_step(
645                step.id,
646                StepUpdate {
647                    status: Some(StepStatus::Skipped),
648                    output: Some(serde_json::json!({"reason": reason})),
649                    completed_at: Some(now),
650                    ..StepUpdate::default()
651                },
652            )
653            .await?;
654
655        self.last_step_ids = vec![step.id];
656
657        info!(
658            run_id = %self.run_id,
659            step = %name,
660            reason,
661            "step skipped"
662        );
663
664        Ok(())
665    }
666
667    /// Execute a custom operation step.
668    ///
669    /// Runs a user-defined [`Operation`] with full step lifecycle management:
670    /// creates the step record, transitions to Running, executes the operation,
671    /// persists the output and duration, and marks the step Completed or Failed.
672    ///
673    /// The operation's [`kind()`](Operation::kind) is stored as
674    /// [`StepKind::Custom`].
675    ///
676    /// # Errors
677    ///
678    /// Returns [`EngineError`] if the operation fails or the store errors.
679    ///
680    /// # Examples
681    ///
682    /// ```no_run
683    /// use ironflow_engine::context::WorkflowContext;
684    /// use ironflow_engine::operation::Operation;
685    /// use ironflow_engine::error::EngineError;
686    /// use serde_json::{Value, json};
687    /// use std::pin::Pin;
688    /// use std::future::Future;
689    ///
690    /// struct MyOp;
691    /// impl Operation for MyOp {
692    ///     fn kind(&self) -> &str { "my-service" }
693    ///     fn execute(&self) -> Pin<Box<dyn Future<Output = Result<Value, EngineError>> + Send + '_>> {
694    ///         Box::pin(async { Ok(json!({"ok": true})) })
695    ///     }
696    /// }
697    ///
698    /// # async fn example(ctx: &mut WorkflowContext) -> Result<(), EngineError> {
699    /// let result = ctx.operation("call-service", &MyOp).await?;
700    /// println!("output: {}", result.output);
701    /// # Ok(())
702    /// # }
703    /// ```
704    pub async fn operation(
705        &mut self,
706        name: &str,
707        op: &dyn Operation,
708    ) -> Result<StepOutput, EngineError> {
709        let kind = StepKind::Custom(op.kind().to_string());
710        let position = self.position;
711        self.position += 1;
712
713        let step = self
714            .store
715            .create_step(NewStep {
716                run_id: self.run_id,
717                name: name.to_string(),
718                kind,
719                position,
720                input: op.input(),
721            })
722            .await?;
723
724        self.start_step(step.id, Utc::now()).await?;
725
726        let start = Instant::now();
727
728        match op.execute().await {
729            Ok(output_value) => {
730                let duration_ms = start.elapsed().as_millis() as u64;
731                self.total_duration_ms += duration_ms;
732
733                let completed_at = Utc::now();
734                self.store
735                    .update_step(
736                        step.id,
737                        StepUpdate {
738                            status: Some(StepStatus::Completed),
739                            output: Some(output_value.clone()),
740                            duration_ms: Some(duration_ms),
741                            cost_usd: Some(Decimal::ZERO),
742                            completed_at: Some(completed_at),
743                            ..StepUpdate::default()
744                        },
745                    )
746                    .await?;
747
748                info!(
749                    run_id = %self.run_id,
750                    step = %name,
751                    kind = op.kind(),
752                    duration_ms,
753                    "operation step completed"
754                );
755
756                self.last_step_ids = vec![step.id];
757
758                Ok(StepOutput {
759                    output: output_value,
760                    duration_ms,
761                    cost_usd: Decimal::ZERO,
762                    input_tokens: None,
763                    output_tokens: None,
764                    model: None,
765                    debug_messages: None,
766                })
767            }
768            Err(err) => {
769                let completed_at = Utc::now();
770                if let Err(store_err) = self
771                    .store
772                    .update_step(
773                        step.id,
774                        StepUpdate {
775                            status: Some(StepStatus::Failed),
776                            error: Some(err.to_string()),
777                            completed_at: Some(completed_at),
778                            ..StepUpdate::default()
779                        },
780                    )
781                    .await
782                {
783                    error!(step_id = %step.id, error = %store_err, "failed to persist step failure");
784                }
785
786                Err(err)
787            }
788        }
789    }
790
791    /// Execute a sub-workflow step.
792    ///
793    /// Creates a child run for the named workflow handler, executes it with
794    /// its own steps and lifecycle, and returns a [`StepOutput`] containing
795    /// the child run ID and aggregated metrics.
796    ///
797    /// Requires the context to be created with
798    /// `with_handler_resolver`.
799    ///
800    /// # Errors
801    ///
802    /// Returns [`EngineError::InvalidWorkflow`] if no handler is registered
803    /// with the given name, or if no handler resolver is available.
804    ///
805    /// # Examples
806    ///
807    /// ```no_run
808    /// use ironflow_engine::context::WorkflowContext;
809    /// use ironflow_engine::error::EngineError;
810    /// use serde_json::json;
811    ///
812    /// # async fn example(ctx: &mut WorkflowContext) -> Result<(), EngineError> {
813    /// // let result = ctx.workflow(&MySubWorkflow, json!({})).await?;
814    /// # Ok(())
815    /// # }
816    /// ```
817    pub async fn workflow(
818        &mut self,
819        handler: &dyn WorkflowHandler,
820        payload: Value,
821    ) -> Result<StepOutput, EngineError> {
822        let config = WorkflowStepConfig::new(handler.name(), payload);
823        let position = self.position;
824        self.position += 1;
825
826        let step = self
827            .store
828            .create_step(NewStep {
829                run_id: self.run_id,
830                name: config.workflow_name.clone(),
831                kind: StepKind::Workflow,
832                position,
833                input: Some(serde_json::to_value(&config)?),
834            })
835            .await?;
836
837        self.start_step(step.id, Utc::now()).await?;
838
839        match self.execute_child_workflow(&config).await {
840            Ok(output) => {
841                self.total_cost_usd += output.cost_usd;
842                self.total_duration_ms += output.duration_ms;
843
844                let completed_at = Utc::now();
845                self.store
846                    .update_step(
847                        step.id,
848                        StepUpdate {
849                            status: Some(StepStatus::Completed),
850                            output: Some(output.output.clone()),
851                            duration_ms: Some(output.duration_ms),
852                            cost_usd: Some(output.cost_usd),
853                            completed_at: Some(completed_at),
854                            ..StepUpdate::default()
855                        },
856                    )
857                    .await?;
858
859                info!(
860                    run_id = %self.run_id,
861                    child_workflow = %config.workflow_name,
862                    duration_ms = output.duration_ms,
863                    "workflow step completed"
864                );
865
866                self.last_step_ids = vec![step.id];
867
868                Ok(output)
869            }
870            Err(err) => {
871                let completed_at = Utc::now();
872                if let Err(store_err) = self
873                    .store
874                    .update_step(
875                        step.id,
876                        StepUpdate {
877                            status: Some(StepStatus::Failed),
878                            error: Some(err.to_string()),
879                            completed_at: Some(completed_at),
880                            ..StepUpdate::default()
881                        },
882                    )
883                    .await
884                {
885                    error!(step_id = %step.id, error = %store_err, "failed to persist step failure");
886                }
887
888                Err(err)
889            }
890        }
891    }
892
893    /// Execute a child workflow and return aggregated output.
894    async fn execute_child_workflow(
895        &self,
896        config: &WorkflowStepConfig,
897    ) -> Result<StepOutput, EngineError> {
898        let resolver = self.handler_resolver.as_ref().ok_or_else(|| {
899            EngineError::InvalidWorkflow(
900                "sub-workflow requires a handler resolver (use Engine to execute)".to_string(),
901            )
902        })?;
903
904        let handler = resolver(&config.workflow_name).ok_or_else(|| {
905            EngineError::InvalidWorkflow(format!("no handler registered: {}", config.workflow_name))
906        })?;
907
908        let parent_labels = self
909            .store
910            .get_run(self.run_id)
911            .await?
912            .map(|r| r.labels)
913            .unwrap_or_default();
914
915        let child_run = self
916            .store
917            .create_run(NewRun {
918                workflow_name: config.workflow_name.clone(),
919                trigger: TriggerKind::Workflow,
920                payload: config.payload.clone(),
921                max_retries: 0,
922                handler_version: None,
923                labels: parent_labels,
924                scheduled_at: None,
925            })
926            .await?;
927
928        let child_run_id = child_run.id;
929        info!(
930            parent_run_id = %self.run_id,
931            child_run_id = %child_run_id,
932            workflow = %config.workflow_name,
933            "child run created"
934        );
935
936        self.store
937            .update_run_status(child_run_id, RunStatus::Running)
938            .await?;
939
940        let run_start = Instant::now();
941        let mut child_ctx = WorkflowContext {
942            run_id: child_run_id,
943            store: self.store.clone(),
944            provider: self.provider.clone(),
945            handler_resolver: self.handler_resolver.clone(),
946            position: 0,
947            last_step_ids: Vec::new(),
948            total_cost_usd: Decimal::ZERO,
949            total_duration_ms: 0,
950            replay_steps: HashMap::new(),
951            log_sender: self.log_sender.clone(),
952        };
953
954        let result = handler.execute(&mut child_ctx).await;
955        let total_duration = run_start.elapsed().as_millis() as u64;
956        let completed_at = Utc::now();
957
958        match result {
959            Ok(()) => {
960                self.store
961                    .update_run(
962                        child_run_id,
963                        RunUpdate {
964                            status: Some(RunStatus::Completed),
965                            cost_usd: Some(child_ctx.total_cost_usd),
966                            duration_ms: Some(total_duration),
967                            completed_at: Some(completed_at),
968                            ..RunUpdate::default()
969                        },
970                    )
971                    .await?;
972
973                Ok(StepOutput {
974                    output: serde_json::json!({
975                        "run_id": child_run_id,
976                        "workflow_name": config.workflow_name,
977                        "status": RunStatus::Completed,
978                        "cost_usd": child_ctx.total_cost_usd,
979                        "duration_ms": total_duration,
980                    }),
981                    duration_ms: total_duration,
982                    cost_usd: child_ctx.total_cost_usd,
983                    input_tokens: None,
984                    output_tokens: None,
985                    model: None,
986                    debug_messages: None,
987                })
988            }
989            Err(err) => {
990                if let Err(store_err) = self
991                    .store
992                    .update_run(
993                        child_run_id,
994                        RunUpdate {
995                            status: Some(RunStatus::Failed),
996                            error: Some(err.to_string()),
997                            cost_usd: Some(child_ctx.total_cost_usd),
998                            duration_ms: Some(total_duration),
999                            completed_at: Some(completed_at),
1000                            ..RunUpdate::default()
1001                        },
1002                    )
1003                    .await
1004                {
1005                    error!(
1006                        child_run_id = %child_run_id,
1007                        store_error = %store_err,
1008                        "failed to persist child run failure"
1009                    );
1010                }
1011
1012                Err(err)
1013            }
1014        }
1015    }
1016
1017    /// Try to replay a completed step from a previous execution.
1018    ///
1019    /// Returns `Some(StepOutput)` if a completed step exists at the given
1020    /// position, `None` otherwise.
1021    fn try_replay_step(&mut self, position: u32) -> Option<StepOutput> {
1022        let step = self.replay_steps.get(&position)?;
1023        if step.status.state != StepStatus::Completed {
1024            return None;
1025        }
1026        let output = StepOutput {
1027            output: step.output.clone().unwrap_or(Value::Null),
1028            duration_ms: step.duration_ms,
1029            cost_usd: step.cost_usd,
1030            input_tokens: step.input_tokens,
1031            output_tokens: step.output_tokens,
1032            model: None,
1033            debug_messages: None,
1034        };
1035        self.total_cost_usd += output.cost_usd;
1036        self.total_duration_ms += output.duration_ms;
1037        self.last_step_ids = vec![step.id];
1038        info!(
1039            run_id = %self.run_id,
1040            step = %step.name,
1041            position,
1042            "step replayed from previous execution"
1043        );
1044        Some(output)
1045    }
1046
1047    /// Internal: execute a step with full persistence lifecycle.
1048    async fn execute_step(
1049        &mut self,
1050        name: &str,
1051        kind: StepKind,
1052        config: StepConfig,
1053    ) -> Result<StepOutput, EngineError> {
1054        let position = self.position;
1055        self.position += 1;
1056
1057        // Replay: if this step already completed in a prior execution, return cached output.
1058        if let Some(output) = self.try_replay_step(position) {
1059            return Ok(output);
1060        }
1061
1062        // Create step record in Pending.
1063        let step = self
1064            .store
1065            .create_step(NewStep {
1066                run_id: self.run_id,
1067                name: name.to_string(),
1068                kind,
1069                position,
1070                input: Some(serde_json::to_value(&config)?),
1071            })
1072            .await?;
1073
1074        self.start_step(step.id, Utc::now()).await?;
1075
1076        let step_log_sender = self
1077            .log_sender
1078            .as_ref()
1079            .map(|s| StepLogSender::new(s.clone(), self.run_id, step.id, name.to_string()));
1080
1081        match execute_step_config(&config, &self.provider, step_log_sender).await {
1082            Ok(output) => {
1083                self.total_cost_usd += output.cost_usd;
1084                self.total_duration_ms += output.duration_ms;
1085
1086                let debug_messages_json = output.debug_messages_json();
1087
1088                let completed_at = Utc::now();
1089                self.store
1090                    .update_step(
1091                        step.id,
1092                        StepUpdate {
1093                            status: Some(StepStatus::Completed),
1094                            output: Some(output.output.clone()),
1095                            duration_ms: Some(output.duration_ms),
1096                            cost_usd: Some(output.cost_usd),
1097                            input_tokens: output.input_tokens,
1098                            output_tokens: output.output_tokens,
1099                            completed_at: Some(completed_at),
1100                            debug_messages: debug_messages_json,
1101                            ..StepUpdate::default()
1102                        },
1103                    )
1104                    .await?;
1105
1106                info!(
1107                    run_id = %self.run_id,
1108                    step = %name,
1109                    duration_ms = output.duration_ms,
1110                    "step completed"
1111                );
1112
1113                self.last_step_ids = vec![step.id];
1114
1115                Ok(output)
1116            }
1117            Err(err) => {
1118                let completed_at = Utc::now();
1119                let debug_messages_json = extract_debug_messages_from_error(&err);
1120                let partial = extract_partial_usage_from_error(&err);
1121                let raw_response_output = extract_raw_response_from_error(&err);
1122
1123                if let Some(ref usage) = partial {
1124                    if let Some(cost) = usage.cost_usd {
1125                        self.total_cost_usd += cost;
1126                    }
1127                    if let Some(dur) = usage.duration_ms {
1128                        self.total_duration_ms += dur;
1129                    }
1130                }
1131
1132                if let Err(store_err) = self
1133                    .store
1134                    .update_step(
1135                        step.id,
1136                        StepUpdate {
1137                            status: Some(StepStatus::Failed),
1138                            error: Some(err.to_string()),
1139                            output: raw_response_output,
1140                            completed_at: Some(completed_at),
1141                            debug_messages: debug_messages_json,
1142                            duration_ms: partial.as_ref().and_then(|p| p.duration_ms),
1143                            cost_usd: partial.as_ref().and_then(|p| p.cost_usd),
1144                            input_tokens: partial.as_ref().and_then(|p| p.input_tokens),
1145                            output_tokens: partial.as_ref().and_then(|p| p.output_tokens),
1146                            ..StepUpdate::default()
1147                        },
1148                    )
1149                    .await
1150                {
1151                    tracing::error!(step_id = %step.id, error = %store_err, "failed to persist step failure");
1152                }
1153
1154                Err(err)
1155            }
1156        }
1157    }
1158
1159    /// Record dependency edges and transition a step to Running.
1160    ///
1161    /// Records edges from `step_id` to all `last_step_ids`, then
1162    /// transitions the step to `Running` with the given timestamp.
1163    async fn start_step(&self, step_id: Uuid, now: DateTime<Utc>) -> Result<(), EngineError> {
1164        if !self.last_step_ids.is_empty() {
1165            let deps: Vec<NewStepDependency> = self
1166                .last_step_ids
1167                .iter()
1168                .map(|&depends_on| NewStepDependency {
1169                    step_id,
1170                    depends_on,
1171                })
1172                .collect();
1173            self.store.create_step_dependencies(deps).await?;
1174        }
1175
1176        self.store
1177            .update_step(
1178                step_id,
1179                StepUpdate {
1180                    status: Some(StepStatus::Running),
1181                    started_at: Some(now),
1182                    ..StepUpdate::default()
1183                },
1184            )
1185            .await?;
1186
1187        Ok(())
1188    }
1189
1190    /// Access the store directly (advanced usage).
1191    pub fn store(&self) -> &Arc<dyn Store> {
1192        &self.store
1193    }
1194
1195    /// Access the payload that triggered this run.
1196    ///
1197    /// Fetches the run from the store and returns its payload.
1198    ///
1199    /// # Errors
1200    ///
1201    /// Returns [`EngineError::Store`] if the run is not found.
1202    pub async fn payload(&self) -> Result<Value, EngineError> {
1203        let run = self
1204            .store
1205            .get_run(self.run_id)
1206            .await?
1207            .ok_or(EngineError::Store(
1208                ironflow_store::error::StoreError::RunNotFound(self.run_id),
1209            ))?;
1210        Ok(run.payload)
1211    }
1212}
1213
1214impl fmt::Debug for WorkflowContext {
1215    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1216        f.debug_struct("WorkflowContext")
1217            .field("run_id", &self.run_id)
1218            .field("position", &self.position)
1219            .field("total_cost_usd", &self.total_cost_usd)
1220            .finish_non_exhaustive()
1221    }
1222}
1223
1224/// Extract debug messages from an engine error, if it wraps a schema validation
1225/// failure that carries a verbose conversation trace.
1226fn extract_debug_messages_from_error(err: &EngineError) -> Option<Value> {
1227    if let EngineError::Operation(OperationError::Agent(AgentError::SchemaValidation {
1228        debug_messages,
1229        ..
1230    })) = err
1231        && !debug_messages.is_empty()
1232    {
1233        return serde_json::to_value(debug_messages).ok();
1234    }
1235    None
1236}
1237
1238/// Partial usage with `Decimal` cost, converted from the `f64` in [`PartialUsage`].
1239///
1240/// Exists only because `ironflow-store` uses [`Decimal`] for monetary values
1241/// while `ironflow-core` uses `f64` (the CLI's native type). The conversion
1242/// happens here, at the engine/store boundary.
1243struct StepPartialUsage {
1244    cost_usd: Option<Decimal>,
1245    duration_ms: Option<u64>,
1246    input_tokens: Option<u64>,
1247    output_tokens: Option<u64>,
1248}
1249
1250/// Extract the raw response text from a schema validation error.
1251///
1252/// When the agent produced text but structured output extraction failed,
1253/// this returns the truncated raw text so it can be persisted as the
1254/// step output for dashboard visibility.
1255fn extract_raw_response_from_error(err: &EngineError) -> Option<Value> {
1256    if let EngineError::Operation(OperationError::Agent(AgentError::SchemaValidation {
1257        raw_response: Some(text),
1258        ..
1259    })) = err
1260    {
1261        return Some(Value::String(text.clone()));
1262    }
1263    None
1264}
1265
1266fn extract_partial_usage_from_error(err: &EngineError) -> Option<StepPartialUsage> {
1267    if let EngineError::Operation(OperationError::Agent(AgentError::SchemaValidation {
1268        partial_usage,
1269        ..
1270    })) = err
1271        && (partial_usage.cost_usd.is_some() || partial_usage.duration_ms.is_some())
1272    {
1273        return Some(StepPartialUsage {
1274            cost_usd: partial_usage
1275                .cost_usd
1276                .and_then(|c| Decimal::try_from(c).ok()),
1277            duration_ms: partial_usage.duration_ms,
1278            input_tokens: partial_usage.input_tokens,
1279            output_tokens: partial_usage.output_tokens,
1280        });
1281    }
1282    None
1283}