ironflow_engine/context.rs
1//! [`WorkflowContext`] — execution context for dynamic workflows.
2//!
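//! Provides step execution methods that automatically persist results to the
//! store. Each call to [`shell`](WorkflowContext::shell),
//! [`http`](WorkflowContext::http), [`agent`](WorkflowContext::agent),
//! [`operation`](WorkflowContext::operation), or
//! [`workflow`](WorkflowContext::workflow) creates a step record, executes the
//! operation, captures the output, and returns a [`StepOutput`] that the next
//! step can reference. [`parallel`](WorkflowContext::parallel) runs several
//! steps concurrently, and [`approval`](WorkflowContext::approval) suspends the
//! run until a human approves it.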
9//!
10//! # Examples
11//!
12//! ```no_run
13//! use ironflow_engine::context::WorkflowContext;
14//! use ironflow_engine::config::{ShellConfig, AgentStepConfig};
15//! use ironflow_engine::error::EngineError;
16//!
17//! # async fn example(ctx: &mut WorkflowContext) -> Result<(), EngineError> {
18//! let build = ctx.shell("build", ShellConfig::new("cargo build")).await?;
19//! let review = ctx.agent("review", AgentStepConfig::new(
20//! &format!("Build output:\n{}", build.output["stdout"])
21//! )).await?;
22//! # Ok(())
23//! # }
24//! ```
25
26use std::collections::HashMap;
27use std::fmt;
28use std::sync::Arc;
29use std::time::Instant;
30
31use chrono::{DateTime, Utc};
32use rust_decimal::Decimal;
33use serde_json::Value;
34use tokio::task::JoinSet;
35use tracing::{error, info};
36use uuid::Uuid;
37
38use ironflow_core::error::{AgentError, OperationError};
39use ironflow_core::provider::AgentProvider;
40use ironflow_store::models::{
41 NewRun, NewStep, NewStepDependency, RunStatus, RunUpdate, Step, StepKind, StepStatus,
42 StepUpdate, TriggerKind,
43};
44use ironflow_store::store::RunStore;
45
46use crate::config::{
47 AgentStepConfig, ApprovalConfig, HttpConfig, ShellConfig, StepConfig, WorkflowStepConfig,
48};
49use crate::error::EngineError;
50use crate::executor::{ParallelStepResult, StepOutput, execute_step_config};
51use crate::handler::WorkflowHandler;
52use crate::operation::Operation;
53
/// Callback type for resolving workflow handlers by name.
///
/// Given a workflow name, returns the registered [`WorkflowHandler`], or
/// `None` when no handler is registered under that name. It is wrapped in an
/// `Arc` so a parent context can hand the same resolver to the child contexts
/// it creates for sub-workflows.
pub(crate) type HandlerResolver =
    Arc<dyn Fn(&str) -> Option<Arc<dyn WorkflowHandler>> + Send + Sync>;
57
/// Execution context for a single workflow run.
///
/// Tracks the current step position and provides convenience methods
/// for executing operations with automatic persistence.
///
/// Every step-producing call claims the next `position`. When a run is
/// resumed, the position is what matches a call to the step record left by
/// the previous execution.
///
/// # Examples
///
/// ```no_run
/// use ironflow_engine::context::WorkflowContext;
/// use ironflow_engine::config::ShellConfig;
/// use ironflow_engine::error::EngineError;
///
/// # async fn example(ctx: &mut WorkflowContext) -> Result<(), EngineError> {
/// let result = ctx.shell("greet", ShellConfig::new("echo hello")).await?;
/// assert!(result.output["stdout"].as_str().unwrap().contains("hello"));
/// # Ok(())
/// # }
/// ```
pub struct WorkflowContext {
    /// The run whose steps this context records.
    run_id: Uuid,
    /// Persistence backend for runs, steps and step dependencies.
    store: Arc<dyn RunStore>,
    /// Agent provider passed to every executed step configuration.
    provider: Arc<dyn AgentProvider>,
    /// Looks up handlers by name for [`workflow`](Self::workflow) steps;
    /// `None` when the context was built with [`new`](Self::new).
    handler_resolver: Option<HandlerResolver>,
    /// Position that the next step (or parallel wave) will be assigned.
    position: u32,
    /// IDs of the last executed step(s) -- every new step records a dependency
    /// edge on each of them, forming the run's DAG.
    last_step_ids: Vec<Uuid>,
    /// Accumulated cost across all steps in this run.
    total_cost_usd: Decimal,
    /// Accumulated duration across all steps.
    total_duration_ms: u64,
    /// Steps from a previous execution, keyed by position.
    /// Used when resuming after approval to replay completed steps.
    replay_steps: HashMap<u32, Step>,
}
92
93impl WorkflowContext {
94 /// Create a new context for a run.
95 ///
96 /// Not typically called directly — the [`Engine`](crate::engine::Engine)
97 /// creates this when executing a [`WorkflowHandler`].
98 pub fn new(run_id: Uuid, store: Arc<dyn RunStore>, provider: Arc<dyn AgentProvider>) -> Self {
99 Self {
100 run_id,
101 store,
102 provider,
103 handler_resolver: None,
104 position: 0,
105 last_step_ids: Vec::new(),
106 total_cost_usd: Decimal::ZERO,
107 total_duration_ms: 0,
108 replay_steps: HashMap::new(),
109 }
110 }
111
112 /// Create a new context with a handler resolver for sub-workflow support.
113 ///
114 /// The resolver is called when [`workflow`](Self::workflow) is invoked to
115 /// look up registered handlers by name.
116 pub(crate) fn with_handler_resolver(
117 run_id: Uuid,
118 store: Arc<dyn RunStore>,
119 provider: Arc<dyn AgentProvider>,
120 resolver: HandlerResolver,
121 ) -> Self {
122 Self {
123 run_id,
124 store,
125 provider,
126 handler_resolver: Some(resolver),
127 position: 0,
128 last_step_ids: Vec::new(),
129 total_cost_usd: Decimal::ZERO,
130 total_duration_ms: 0,
131 replay_steps: HashMap::new(),
132 }
133 }
134
135 /// Load existing steps from the store for replay after approval.
136 ///
137 /// Called by the engine when resuming a run. All completed steps
138 /// and the approved approval step are indexed by position so that
139 /// `execute_step` and `approval` can skip them.
140 pub(crate) async fn load_replay_steps(&mut self) -> Result<(), EngineError> {
141 let steps = self.store.list_steps(self.run_id).await?;
142 for step in steps {
143 let dominated = matches!(
144 step.status.state,
145 StepStatus::Completed | StepStatus::Running | StepStatus::AwaitingApproval
146 );
147 if dominated {
148 self.replay_steps.insert(step.position, step);
149 }
150 }
151 Ok(())
152 }
153
154 /// The run ID this context is executing for.
155 pub fn run_id(&self) -> Uuid {
156 self.run_id
157 }
158
159 /// Accumulated cost across all executed steps so far.
160 pub fn total_cost_usd(&self) -> Decimal {
161 self.total_cost_usd
162 }
163
164 /// Accumulated duration across all executed steps so far.
165 pub fn total_duration_ms(&self) -> u64 {
166 self.total_duration_ms
167 }
168
169 /// Execute multiple steps concurrently (wait-all model).
170 ///
171 /// All steps in the batch execute in parallel via `tokio::JoinSet`.
172 /// Each step is recorded with the same `position` (execution wave).
173 /// Dependencies on previous steps are recorded automatically.
174 ///
175 /// When `fail_fast` is true, remaining steps are aborted on the first
176 /// failure. When false, all steps run to completion and the first
177 /// error is returned.
178 ///
179 /// # Errors
180 ///
181 /// Returns [`EngineError`] if any step fails.
182 ///
183 /// # Examples
184 ///
185 /// ```no_run
186 /// use ironflow_engine::context::WorkflowContext;
187 /// use ironflow_engine::config::{StepConfig, ShellConfig};
188 /// use ironflow_engine::error::EngineError;
189 ///
190 /// # async fn example(ctx: &mut WorkflowContext) -> Result<(), EngineError> {
191 /// let results = ctx.parallel(
192 /// vec![
193 /// ("test-unit", StepConfig::Shell(ShellConfig::new("cargo test --lib"))),
194 /// ("lint", StepConfig::Shell(ShellConfig::new("cargo clippy"))),
195 /// ],
196 /// true,
197 /// ).await?;
198 ///
199 /// for r in &results {
200 /// println!("{}: {:?}", r.name, r.output.output);
201 /// }
202 /// # Ok(())
203 /// # }
204 /// ```
205 pub async fn parallel(
206 &mut self,
207 steps: Vec<(&str, StepConfig)>,
208 fail_fast: bool,
209 ) -> Result<Vec<ParallelStepResult>, EngineError> {
210 if steps.is_empty() {
211 return Ok(Vec::new());
212 }
213
214 let wave_position = self.position;
215 self.position += 1;
216
217 let now = Utc::now();
218 let mut step_records: Vec<(Uuid, String, StepConfig)> = Vec::with_capacity(steps.len());
219
220 for (name, config) in &steps {
221 let kind = config.kind();
222 let step = self
223 .store
224 .create_step(NewStep {
225 run_id: self.run_id,
226 name: name.to_string(),
227 kind,
228 position: wave_position,
229 input: Some(serde_json::to_value(config)?),
230 })
231 .await?;
232
233 self.start_step(step.id, now).await?;
234
235 step_records.push((step.id, name.to_string(), config.clone()));
236 }
237
238 let mut join_set = JoinSet::new();
239 for (idx, (_id, _name, config)) in step_records.iter().enumerate() {
240 let provider = self.provider.clone();
241 let config = config.clone();
242 join_set.spawn(async move { (idx, execute_step_config(&config, &provider).await) });
243 }
244
245 // JoinSet returns in completion order; indexed_results restores input order.
246 let mut indexed_results: Vec<Option<Result<StepOutput, String>>> =
247 vec![None; step_records.len()];
248 let mut first_error: Option<EngineError> = None;
249
250 while let Some(join_result) = join_set.join_next().await {
251 let (idx, step_result) = match join_result {
252 Ok(r) => r,
253 Err(e) => {
254 if first_error.is_none() {
255 first_error = Some(EngineError::StepConfig(format!("join error: {e}")));
256 }
257 if fail_fast {
258 join_set.abort_all();
259 }
260 continue;
261 }
262 };
263
264 let (step_id, step_name, _) = &step_records[idx];
265 let completed_at = Utc::now();
266
267 match step_result {
268 Ok(output) => {
269 self.total_cost_usd += output.cost_usd;
270 self.total_duration_ms += output.duration_ms;
271
272 let debug_messages_json = output.debug_messages_json();
273
274 self.store
275 .update_step(
276 *step_id,
277 StepUpdate {
278 status: Some(StepStatus::Completed),
279 output: Some(output.output.clone()),
280 duration_ms: Some(output.duration_ms),
281 cost_usd: Some(output.cost_usd),
282 input_tokens: output.input_tokens,
283 output_tokens: output.output_tokens,
284 completed_at: Some(completed_at),
285 debug_messages: debug_messages_json,
286 ..StepUpdate::default()
287 },
288 )
289 .await?;
290
291 info!(
292 run_id = %self.run_id,
293 step = %step_name,
294 duration_ms = output.duration_ms,
295 "parallel step completed"
296 );
297
298 indexed_results[idx] = Some(Ok(output));
299 }
300 Err(err) => {
301 let err_msg = err.to_string();
302 let debug_messages_json = extract_debug_messages_from_error(&err);
303
304 if let Err(store_err) = self
305 .store
306 .update_step(
307 *step_id,
308 StepUpdate {
309 status: Some(StepStatus::Failed),
310 error: Some(err_msg.clone()),
311 completed_at: Some(completed_at),
312 debug_messages: debug_messages_json,
313 ..StepUpdate::default()
314 },
315 )
316 .await
317 {
318 tracing::error!(
319 step_id = %step_id,
320 error = %store_err,
321 "failed to persist parallel step failure"
322 );
323 }
324
325 indexed_results[idx] = Some(Err(err_msg.clone()));
326
327 if first_error.is_none() {
328 first_error = Some(err);
329 }
330
331 if fail_fast {
332 join_set.abort_all();
333 }
334 }
335 }
336 }
337
338 if let Some(err) = first_error {
339 return Err(err);
340 }
341
342 self.last_step_ids = step_records.iter().map(|(id, _, _)| *id).collect();
343
344 // Build results in original order.
345 let results: Vec<ParallelStepResult> = step_records
346 .iter()
347 .enumerate()
348 .map(|(idx, (step_id, name, _))| {
349 let output = match indexed_results[idx].take() {
350 Some(Ok(o)) => o,
351 _ => unreachable!("all steps succeeded if no error returned"),
352 };
353 ParallelStepResult {
354 name: name.clone(),
355 output,
356 step_id: *step_id,
357 }
358 })
359 .collect();
360
361 Ok(results)
362 }
363
364 /// Execute a shell step.
365 ///
366 /// Creates the step record, runs the command, persists the result,
367 /// and returns the output for use in subsequent steps.
368 ///
369 /// # Errors
370 ///
371 /// Returns [`EngineError`] if the command fails or the store errors.
372 ///
373 /// # Examples
374 ///
375 /// ```no_run
376 /// use ironflow_engine::context::WorkflowContext;
377 /// use ironflow_engine::config::ShellConfig;
378 /// use ironflow_engine::error::EngineError;
379 ///
380 /// # async fn example(ctx: &mut WorkflowContext) -> Result<(), EngineError> {
381 /// let files = ctx.shell("list", ShellConfig::new("ls -la")).await?;
382 /// println!("stdout: {}", files.output["stdout"]);
383 /// # Ok(())
384 /// # }
385 /// ```
386 pub async fn shell(
387 &mut self,
388 name: &str,
389 config: ShellConfig,
390 ) -> Result<StepOutput, EngineError> {
391 self.execute_step(name, StepKind::Shell, StepConfig::Shell(config))
392 .await
393 }
394
395 /// Execute an HTTP step.
396 ///
397 /// # Errors
398 ///
399 /// Returns [`EngineError`] if the request fails or the store errors.
400 ///
401 /// # Examples
402 ///
403 /// ```no_run
404 /// use ironflow_engine::context::WorkflowContext;
405 /// use ironflow_engine::config::HttpConfig;
406 /// use ironflow_engine::error::EngineError;
407 ///
408 /// # async fn example(ctx: &mut WorkflowContext) -> Result<(), EngineError> {
409 /// let resp = ctx.http("health", HttpConfig::get("https://api.example.com/health")).await?;
410 /// println!("status: {}", resp.output["status"]);
411 /// # Ok(())
412 /// # }
413 /// ```
414 pub async fn http(
415 &mut self,
416 name: &str,
417 config: HttpConfig,
418 ) -> Result<StepOutput, EngineError> {
419 self.execute_step(name, StepKind::Http, StepConfig::Http(config))
420 .await
421 }
422
423 /// Execute an agent step.
424 ///
425 /// # Errors
426 ///
427 /// Returns [`EngineError`] if the agent invocation fails or the store errors.
428 ///
429 /// # Examples
430 ///
431 /// ```no_run
432 /// use ironflow_engine::context::WorkflowContext;
433 /// use ironflow_engine::config::AgentStepConfig;
434 /// use ironflow_engine::error::EngineError;
435 ///
436 /// # async fn example(ctx: &mut WorkflowContext) -> Result<(), EngineError> {
437 /// let review = ctx.agent("review", AgentStepConfig::new("Review the code")).await?;
438 /// println!("review: {}", review.output["value"]);
439 /// # Ok(())
440 /// # }
441 /// ```
442 pub async fn agent(
443 &mut self,
444 name: &str,
445 config: impl Into<AgentStepConfig>,
446 ) -> Result<StepOutput, EngineError> {
447 self.execute_step(name, StepKind::Agent, StepConfig::Agent(config.into()))
448 .await
449 }
450
451 /// Create a human approval gate.
452 ///
453 /// On first execution, records an approval step and returns
454 /// [`EngineError::ApprovalRequired`] to suspend the run. The engine
455 /// transitions the run to `AwaitingApproval`.
456 ///
457 /// On resume (after a human approved via the API), the approval step
458 /// is replayed: it is marked as `Completed` and execution continues
459 /// past it. Multiple approval gates in the same handler work -- each
460 /// one pauses and resumes independently.
461 ///
462 /// # Errors
463 ///
464 /// Returns [`EngineError::ApprovalRequired`] to pause the run on
465 /// first execution. Returns other [`EngineError`] variants on store
466 /// failures.
467 ///
468 /// # Examples
469 ///
470 /// ```no_run
471 /// use ironflow_engine::context::WorkflowContext;
472 /// use ironflow_engine::config::ApprovalConfig;
473 /// use ironflow_engine::error::EngineError;
474 ///
475 /// # async fn example(ctx: &mut WorkflowContext) -> Result<(), EngineError> {
476 /// ctx.approval("deploy-gate", ApprovalConfig::new("Approve deployment?")).await?;
477 /// // Execution continues here after approval
478 /// # Ok(())
479 /// # }
480 /// ```
481 pub async fn approval(
482 &mut self,
483 name: &str,
484 config: ApprovalConfig,
485 ) -> Result<(), EngineError> {
486 let position = self.position;
487 self.position += 1;
488
489 // Replay: if this approval step exists from a prior execution,
490 // the run was approved -- mark it completed (if not already) and continue.
491 if let Some(existing) = self.replay_steps.get(&position)
492 && existing.kind == StepKind::Approval
493 {
494 if existing.status.state == StepStatus::AwaitingApproval {
495 self.store
496 .update_step(
497 existing.id,
498 StepUpdate {
499 status: Some(StepStatus::Completed),
500 completed_at: Some(Utc::now()),
501 ..StepUpdate::default()
502 },
503 )
504 .await?;
505 }
506
507 self.last_step_ids = vec![existing.id];
508 info!(
509 run_id = %self.run_id,
510 step = %name,
511 position,
512 "approval step replayed (approved)"
513 );
514 return Ok(());
515 }
516
517 // First execution: create the approval step and suspend.
518 let step = self
519 .store
520 .create_step(NewStep {
521 run_id: self.run_id,
522 name: name.to_string(),
523 kind: StepKind::Approval,
524 position,
525 input: Some(serde_json::to_value(&config)?),
526 })
527 .await?;
528
529 self.start_step(step.id, Utc::now()).await?;
530
531 // Transition the step to AwaitingApproval so it reflects
532 // the suspended state on the dashboard.
533 self.store
534 .update_step(
535 step.id,
536 StepUpdate {
537 status: Some(StepStatus::AwaitingApproval),
538 ..StepUpdate::default()
539 },
540 )
541 .await?;
542
543 self.last_step_ids = vec![step.id];
544
545 Err(EngineError::ApprovalRequired {
546 run_id: self.run_id,
547 step_id: step.id,
548 message: config.message().to_string(),
549 })
550 }
551
552 /// Execute a custom operation step.
553 ///
554 /// Runs a user-defined [`Operation`] with full step lifecycle management:
555 /// creates the step record, transitions to Running, executes the operation,
556 /// persists the output and duration, and marks the step Completed or Failed.
557 ///
558 /// The operation's [`kind()`](Operation::kind) is stored as
559 /// [`StepKind::Custom`].
560 ///
561 /// # Errors
562 ///
563 /// Returns [`EngineError`] if the operation fails or the store errors.
564 ///
565 /// # Examples
566 ///
567 /// ```no_run
568 /// use ironflow_engine::context::WorkflowContext;
569 /// use ironflow_engine::operation::Operation;
570 /// use ironflow_engine::error::EngineError;
571 /// use serde_json::{Value, json};
572 /// use std::pin::Pin;
573 /// use std::future::Future;
574 ///
575 /// struct MyOp;
576 /// impl Operation for MyOp {
577 /// fn kind(&self) -> &str { "my-service" }
578 /// fn execute(&self) -> Pin<Box<dyn Future<Output = Result<Value, EngineError>> + Send + '_>> {
579 /// Box::pin(async { Ok(json!({"ok": true})) })
580 /// }
581 /// }
582 ///
583 /// # async fn example(ctx: &mut WorkflowContext) -> Result<(), EngineError> {
584 /// let result = ctx.operation("call-service", &MyOp).await?;
585 /// println!("output: {}", result.output);
586 /// # Ok(())
587 /// # }
588 /// ```
589 pub async fn operation(
590 &mut self,
591 name: &str,
592 op: &dyn Operation,
593 ) -> Result<StepOutput, EngineError> {
594 let kind = StepKind::Custom(op.kind().to_string());
595 let position = self.position;
596 self.position += 1;
597
598 let step = self
599 .store
600 .create_step(NewStep {
601 run_id: self.run_id,
602 name: name.to_string(),
603 kind,
604 position,
605 input: op.input(),
606 })
607 .await?;
608
609 self.start_step(step.id, Utc::now()).await?;
610
611 let start = Instant::now();
612
613 match op.execute().await {
614 Ok(output_value) => {
615 let duration_ms = start.elapsed().as_millis() as u64;
616 self.total_duration_ms += duration_ms;
617
618 let completed_at = Utc::now();
619 self.store
620 .update_step(
621 step.id,
622 StepUpdate {
623 status: Some(StepStatus::Completed),
624 output: Some(output_value.clone()),
625 duration_ms: Some(duration_ms),
626 cost_usd: Some(Decimal::ZERO),
627 completed_at: Some(completed_at),
628 ..StepUpdate::default()
629 },
630 )
631 .await?;
632
633 info!(
634 run_id = %self.run_id,
635 step = %name,
636 kind = op.kind(),
637 duration_ms,
638 "operation step completed"
639 );
640
641 self.last_step_ids = vec![step.id];
642
643 Ok(StepOutput {
644 output: output_value,
645 duration_ms,
646 cost_usd: Decimal::ZERO,
647 input_tokens: None,
648 output_tokens: None,
649 debug_messages: None,
650 })
651 }
652 Err(err) => {
653 let completed_at = Utc::now();
654 if let Err(store_err) = self
655 .store
656 .update_step(
657 step.id,
658 StepUpdate {
659 status: Some(StepStatus::Failed),
660 error: Some(err.to_string()),
661 completed_at: Some(completed_at),
662 ..StepUpdate::default()
663 },
664 )
665 .await
666 {
667 error!(step_id = %step.id, error = %store_err, "failed to persist step failure");
668 }
669
670 Err(err)
671 }
672 }
673 }
674
675 /// Execute a sub-workflow step.
676 ///
677 /// Creates a child run for the named workflow handler, executes it with
678 /// its own steps and lifecycle, and returns a [`StepOutput`] containing
679 /// the child run ID and aggregated metrics.
680 ///
681 /// Requires the context to be created with
682 /// `with_handler_resolver`.
683 ///
684 /// # Errors
685 ///
686 /// Returns [`EngineError::InvalidWorkflow`] if no handler is registered
687 /// with the given name, or if no handler resolver is available.
688 ///
689 /// # Examples
690 ///
691 /// ```no_run
692 /// use ironflow_engine::context::WorkflowContext;
693 /// use ironflow_engine::error::EngineError;
694 /// use serde_json::json;
695 ///
696 /// # async fn example(ctx: &mut WorkflowContext) -> Result<(), EngineError> {
697 /// // let result = ctx.workflow(&MySubWorkflow, json!({})).await?;
698 /// # Ok(())
699 /// # }
700 /// ```
701 pub async fn workflow(
702 &mut self,
703 handler: &dyn WorkflowHandler,
704 payload: Value,
705 ) -> Result<StepOutput, EngineError> {
706 let config = WorkflowStepConfig::new(handler.name(), payload);
707 let position = self.position;
708 self.position += 1;
709
710 let step = self
711 .store
712 .create_step(NewStep {
713 run_id: self.run_id,
714 name: config.workflow_name.clone(),
715 kind: StepKind::Workflow,
716 position,
717 input: Some(serde_json::to_value(&config)?),
718 })
719 .await?;
720
721 self.start_step(step.id, Utc::now()).await?;
722
723 match self.execute_child_workflow(&config).await {
724 Ok(output) => {
725 self.total_cost_usd += output.cost_usd;
726 self.total_duration_ms += output.duration_ms;
727
728 let completed_at = Utc::now();
729 self.store
730 .update_step(
731 step.id,
732 StepUpdate {
733 status: Some(StepStatus::Completed),
734 output: Some(output.output.clone()),
735 duration_ms: Some(output.duration_ms),
736 cost_usd: Some(output.cost_usd),
737 completed_at: Some(completed_at),
738 ..StepUpdate::default()
739 },
740 )
741 .await?;
742
743 info!(
744 run_id = %self.run_id,
745 child_workflow = %config.workflow_name,
746 duration_ms = output.duration_ms,
747 "workflow step completed"
748 );
749
750 self.last_step_ids = vec![step.id];
751
752 Ok(output)
753 }
754 Err(err) => {
755 let completed_at = Utc::now();
756 if let Err(store_err) = self
757 .store
758 .update_step(
759 step.id,
760 StepUpdate {
761 status: Some(StepStatus::Failed),
762 error: Some(err.to_string()),
763 completed_at: Some(completed_at),
764 ..StepUpdate::default()
765 },
766 )
767 .await
768 {
769 error!(step_id = %step.id, error = %store_err, "failed to persist step failure");
770 }
771
772 Err(err)
773 }
774 }
775 }
776
777 /// Execute a child workflow and return aggregated output.
778 async fn execute_child_workflow(
779 &self,
780 config: &WorkflowStepConfig,
781 ) -> Result<StepOutput, EngineError> {
782 let resolver = self.handler_resolver.as_ref().ok_or_else(|| {
783 EngineError::InvalidWorkflow(
784 "sub-workflow requires a handler resolver (use Engine to execute)".to_string(),
785 )
786 })?;
787
788 let handler = resolver(&config.workflow_name).ok_or_else(|| {
789 EngineError::InvalidWorkflow(format!("no handler registered: {}", config.workflow_name))
790 })?;
791
792 let child_run = self
793 .store
794 .create_run(NewRun {
795 workflow_name: config.workflow_name.clone(),
796 trigger: TriggerKind::Workflow,
797 payload: config.payload.clone(),
798 max_retries: 0,
799 })
800 .await?;
801
802 let child_run_id = child_run.id;
803 info!(
804 parent_run_id = %self.run_id,
805 child_run_id = %child_run_id,
806 workflow = %config.workflow_name,
807 "child run created"
808 );
809
810 self.store
811 .update_run_status(child_run_id, RunStatus::Running)
812 .await?;
813
814 let run_start = Instant::now();
815 let mut child_ctx = WorkflowContext {
816 run_id: child_run_id,
817 store: self.store.clone(),
818 provider: self.provider.clone(),
819 handler_resolver: self.handler_resolver.clone(),
820 position: 0,
821 last_step_ids: Vec::new(),
822 total_cost_usd: Decimal::ZERO,
823 total_duration_ms: 0,
824 replay_steps: HashMap::new(),
825 };
826
827 let result = handler.execute(&mut child_ctx).await;
828 let total_duration = run_start.elapsed().as_millis() as u64;
829 let completed_at = Utc::now();
830
831 match result {
832 Ok(()) => {
833 self.store
834 .update_run(
835 child_run_id,
836 RunUpdate {
837 status: Some(RunStatus::Completed),
838 cost_usd: Some(child_ctx.total_cost_usd),
839 duration_ms: Some(total_duration),
840 completed_at: Some(completed_at),
841 ..RunUpdate::default()
842 },
843 )
844 .await?;
845
846 Ok(StepOutput {
847 output: serde_json::json!({
848 "run_id": child_run_id,
849 "workflow_name": config.workflow_name,
850 "status": RunStatus::Completed,
851 "cost_usd": child_ctx.total_cost_usd,
852 "duration_ms": total_duration,
853 }),
854 duration_ms: total_duration,
855 cost_usd: child_ctx.total_cost_usd,
856 input_tokens: None,
857 output_tokens: None,
858 debug_messages: None,
859 })
860 }
861 Err(err) => {
862 if let Err(store_err) = self
863 .store
864 .update_run(
865 child_run_id,
866 RunUpdate {
867 status: Some(RunStatus::Failed),
868 error: Some(err.to_string()),
869 cost_usd: Some(child_ctx.total_cost_usd),
870 duration_ms: Some(total_duration),
871 completed_at: Some(completed_at),
872 ..RunUpdate::default()
873 },
874 )
875 .await
876 {
877 error!(
878 child_run_id = %child_run_id,
879 store_error = %store_err,
880 "failed to persist child run failure"
881 );
882 }
883
884 Err(err)
885 }
886 }
887 }
888
889 /// Try to replay a completed step from a previous execution.
890 ///
891 /// Returns `Some(StepOutput)` if a completed step exists at the given
892 /// position, `None` otherwise.
893 fn try_replay_step(&mut self, position: u32) -> Option<StepOutput> {
894 let step = self.replay_steps.get(&position)?;
895 if step.status.state != StepStatus::Completed {
896 return None;
897 }
898 let output = StepOutput {
899 output: step.output.clone().unwrap_or(Value::Null),
900 duration_ms: step.duration_ms,
901 cost_usd: step.cost_usd,
902 input_tokens: step.input_tokens,
903 output_tokens: step.output_tokens,
904 debug_messages: None,
905 };
906 self.total_cost_usd += output.cost_usd;
907 self.total_duration_ms += output.duration_ms;
908 self.last_step_ids = vec![step.id];
909 info!(
910 run_id = %self.run_id,
911 step = %step.name,
912 position,
913 "step replayed from previous execution"
914 );
915 Some(output)
916 }
917
918 /// Internal: execute a step with full persistence lifecycle.
919 async fn execute_step(
920 &mut self,
921 name: &str,
922 kind: StepKind,
923 config: StepConfig,
924 ) -> Result<StepOutput, EngineError> {
925 let position = self.position;
926 self.position += 1;
927
928 // Replay: if this step already completed in a prior execution, return cached output.
929 if let Some(output) = self.try_replay_step(position) {
930 return Ok(output);
931 }
932
933 // Create step record in Pending.
934 let step = self
935 .store
936 .create_step(NewStep {
937 run_id: self.run_id,
938 name: name.to_string(),
939 kind,
940 position,
941 input: Some(serde_json::to_value(&config)?),
942 })
943 .await?;
944
945 self.start_step(step.id, Utc::now()).await?;
946
947 match execute_step_config(&config, &self.provider).await {
948 Ok(output) => {
949 self.total_cost_usd += output.cost_usd;
950 self.total_duration_ms += output.duration_ms;
951
952 let debug_messages_json = output.debug_messages_json();
953
954 let completed_at = Utc::now();
955 self.store
956 .update_step(
957 step.id,
958 StepUpdate {
959 status: Some(StepStatus::Completed),
960 output: Some(output.output.clone()),
961 duration_ms: Some(output.duration_ms),
962 cost_usd: Some(output.cost_usd),
963 input_tokens: output.input_tokens,
964 output_tokens: output.output_tokens,
965 completed_at: Some(completed_at),
966 debug_messages: debug_messages_json,
967 ..StepUpdate::default()
968 },
969 )
970 .await?;
971
972 info!(
973 run_id = %self.run_id,
974 step = %name,
975 duration_ms = output.duration_ms,
976 "step completed"
977 );
978
979 self.last_step_ids = vec![step.id];
980
981 Ok(output)
982 }
983 Err(err) => {
984 let completed_at = Utc::now();
985 let debug_messages_json = extract_debug_messages_from_error(&err);
986
987 if let Err(store_err) = self
988 .store
989 .update_step(
990 step.id,
991 StepUpdate {
992 status: Some(StepStatus::Failed),
993 error: Some(err.to_string()),
994 completed_at: Some(completed_at),
995 debug_messages: debug_messages_json,
996 ..StepUpdate::default()
997 },
998 )
999 .await
1000 {
1001 tracing::error!(step_id = %step.id, error = %store_err, "failed to persist step failure");
1002 }
1003
1004 Err(err)
1005 }
1006 }
1007 }
1008
1009 /// Record dependency edges and transition a step to Running.
1010 ///
1011 /// Records edges from `step_id` to all `last_step_ids`, then
1012 /// transitions the step to `Running` with the given timestamp.
1013 async fn start_step(&self, step_id: Uuid, now: DateTime<Utc>) -> Result<(), EngineError> {
1014 if !self.last_step_ids.is_empty() {
1015 let deps: Vec<NewStepDependency> = self
1016 .last_step_ids
1017 .iter()
1018 .map(|&depends_on| NewStepDependency {
1019 step_id,
1020 depends_on,
1021 })
1022 .collect();
1023 self.store.create_step_dependencies(deps).await?;
1024 }
1025
1026 self.store
1027 .update_step(
1028 step_id,
1029 StepUpdate {
1030 status: Some(StepStatus::Running),
1031 started_at: Some(now),
1032 ..StepUpdate::default()
1033 },
1034 )
1035 .await?;
1036
1037 Ok(())
1038 }
1039
1040 /// Access the store directly (advanced usage).
1041 pub fn store(&self) -> &Arc<dyn RunStore> {
1042 &self.store
1043 }
1044
1045 /// Access the payload that triggered this run.
1046 ///
1047 /// Fetches the run from the store and returns its payload.
1048 ///
1049 /// # Errors
1050 ///
1051 /// Returns [`EngineError::Store`] if the run is not found.
1052 pub async fn payload(&self) -> Result<Value, EngineError> {
1053 let run = self
1054 .store
1055 .get_run(self.run_id)
1056 .await?
1057 .ok_or(EngineError::Store(
1058 ironflow_store::error::StoreError::RunNotFound(self.run_id),
1059 ))?;
1060 Ok(run.payload)
1061 }
1062}
1063
1064impl fmt::Debug for WorkflowContext {
1065 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1066 f.debug_struct("WorkflowContext")
1067 .field("run_id", &self.run_id)
1068 .field("position", &self.position)
1069 .field("total_cost_usd", &self.total_cost_usd)
1070 .finish_non_exhaustive()
1071 }
1072}
1073
1074/// Extract debug messages from an engine error, if it wraps a schema validation
1075/// failure that carries a verbose conversation trace.
1076fn extract_debug_messages_from_error(err: &EngineError) -> Option<Value> {
1077 if let EngineError::Operation(OperationError::Agent(AgentError::SchemaValidation {
1078 debug_messages,
1079 ..
1080 })) = err
1081 && !debug_messages.is_empty()
1082 {
1083 return serde_json::to_value(debug_messages).ok();
1084 }
1085 None
1086}