ironflow_engine/context.rs
1//! [`WorkflowContext`] — execution context for dynamic workflows.
2//!
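//! Provides step execution methods that automatically persist results to the
//! store. Each call to [`shell`](WorkflowContext::shell),
//! [`http`](WorkflowContext::http), [`agent`](WorkflowContext::agent),
//! [`operation`](WorkflowContext::operation), or
//! [`workflow`](WorkflowContext::workflow) creates a step record, executes the
//! operation, captures the output, and returns a [`StepOutput`] that later
//! steps can reference. [`parallel`](WorkflowContext::parallel) runs a batch
//! of steps concurrently, and [`approval`](WorkflowContext::approval) suspends
//! the run until a human approves it.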
9//!
10//! # Examples
11//!
12//! ```no_run
13//! use ironflow_engine::context::WorkflowContext;
14//! use ironflow_engine::config::{ShellConfig, AgentStepConfig};
15//! use ironflow_engine::error::EngineError;
16//!
17//! # async fn example(ctx: &mut WorkflowContext) -> Result<(), EngineError> {
18//! let build = ctx.shell("build", ShellConfig::new("cargo build")).await?;
19//! let review = ctx.agent("review", AgentStepConfig::new(
20//! &format!("Build output:\n{}", build.output["stdout"])
21//! )).await?;
22//! # Ok(())
23//! # }
24//! ```
25
26use std::fmt;
27use std::sync::Arc;
28use std::time::Instant;
29
30use chrono::{DateTime, Utc};
31use rust_decimal::Decimal;
32use serde_json::Value;
33use tokio::task::JoinSet;
34use tracing::{error, info};
35use uuid::Uuid;
36
37use ironflow_core::error::{AgentError, OperationError};
38use ironflow_core::provider::AgentProvider;
39use ironflow_store::models::{
40 NewRun, NewStep, NewStepDependency, RunStatus, RunUpdate, Step, StepKind, StepStatus,
41 StepUpdate, TriggerKind,
42};
43use ironflow_store::store::RunStore;
44
45use crate::config::{
46 AgentStepConfig, ApprovalConfig, HttpConfig, ShellConfig, StepConfig, WorkflowStepConfig,
47};
48use crate::error::EngineError;
49use crate::executor::{ParallelStepResult, StepOutput, execute_step_config};
50use crate::handler::WorkflowHandler;
51use crate::operation::Operation;
52
53/// Callback type for resolving workflow handlers by name.
54pub(crate) type HandlerResolver =
55 Arc<dyn Fn(&str) -> Option<Arc<dyn WorkflowHandler>> + Send + Sync>;
56
57/// Execution context for a single workflow run.
58///
59/// Tracks the current step position and provides convenience methods
60/// for executing operations with automatic persistence.
61///
62/// # Examples
63///
64/// ```no_run
65/// use ironflow_engine::context::WorkflowContext;
66/// use ironflow_engine::config::ShellConfig;
67/// use ironflow_engine::error::EngineError;
68///
69/// # async fn example(ctx: &mut WorkflowContext) -> Result<(), EngineError> {
70/// let result = ctx.shell("greet", ShellConfig::new("echo hello")).await?;
71/// assert!(result.output["stdout"].as_str().unwrap().contains("hello"));
72/// # Ok(())
73/// # }
74/// ```
75pub struct WorkflowContext {
76 run_id: Uuid,
77 store: Arc<dyn RunStore>,
78 provider: Arc<dyn AgentProvider>,
79 handler_resolver: Option<HandlerResolver>,
80 position: u32,
81 /// IDs of the last executed step(s) -- used to record DAG dependencies.
82 last_step_ids: Vec<Uuid>,
83 /// Accumulated cost across all steps in this run.
84 total_cost_usd: Decimal,
85 /// Accumulated duration across all steps.
86 total_duration_ms: u64,
87 /// Steps from a previous execution, keyed by position.
88 /// Used when resuming after approval to replay completed steps.
89 replay_steps: std::collections::HashMap<u32, Step>,
90}
91
92impl WorkflowContext {
93 /// Create a new context for a run.
94 ///
95 /// Not typically called directly — the [`Engine`](crate::engine::Engine)
96 /// creates this when executing a [`WorkflowHandler`].
97 pub fn new(run_id: Uuid, store: Arc<dyn RunStore>, provider: Arc<dyn AgentProvider>) -> Self {
98 Self {
99 run_id,
100 store,
101 provider,
102 handler_resolver: None,
103 position: 0,
104 last_step_ids: Vec::new(),
105 total_cost_usd: Decimal::ZERO,
106 total_duration_ms: 0,
107 replay_steps: std::collections::HashMap::new(),
108 }
109 }
110
111 /// Create a new context with a handler resolver for sub-workflow support.
112 ///
113 /// The resolver is called when [`workflow`](Self::workflow) is invoked to
114 /// look up registered handlers by name.
115 pub(crate) fn with_handler_resolver(
116 run_id: Uuid,
117 store: Arc<dyn RunStore>,
118 provider: Arc<dyn AgentProvider>,
119 resolver: HandlerResolver,
120 ) -> Self {
121 Self {
122 run_id,
123 store,
124 provider,
125 handler_resolver: Some(resolver),
126 position: 0,
127 last_step_ids: Vec::new(),
128 total_cost_usd: Decimal::ZERO,
129 total_duration_ms: 0,
130 replay_steps: std::collections::HashMap::new(),
131 }
132 }
133
134 /// Load existing steps from the store for replay after approval.
135 ///
136 /// Called by the engine when resuming a run. All completed steps
137 /// and the approved approval step are indexed by position so that
138 /// `execute_step` and `approval` can skip them.
139 pub(crate) async fn load_replay_steps(&mut self) -> Result<(), EngineError> {
140 let steps = self.store.list_steps(self.run_id).await?;
141 for step in steps {
142 let dominated = matches!(
143 step.status.state,
144 StepStatus::Completed | StepStatus::Running | StepStatus::AwaitingApproval
145 );
146 if dominated {
147 self.replay_steps.insert(step.position, step);
148 }
149 }
150 Ok(())
151 }
152
153 /// The run ID this context is executing for.
154 pub fn run_id(&self) -> Uuid {
155 self.run_id
156 }
157
158 /// Accumulated cost across all executed steps so far.
159 pub fn total_cost_usd(&self) -> Decimal {
160 self.total_cost_usd
161 }
162
163 /// Accumulated duration across all executed steps so far.
164 pub fn total_duration_ms(&self) -> u64 {
165 self.total_duration_ms
166 }
167
168 /// Execute multiple steps concurrently (wait-all model).
169 ///
170 /// All steps in the batch execute in parallel via `tokio::JoinSet`.
171 /// Each step is recorded with the same `position` (execution wave).
172 /// Dependencies on previous steps are recorded automatically.
173 ///
174 /// When `fail_fast` is true, remaining steps are aborted on the first
175 /// failure. When false, all steps run to completion and the first
176 /// error is returned.
177 ///
178 /// # Errors
179 ///
180 /// Returns [`EngineError`] if any step fails.
181 ///
182 /// # Examples
183 ///
184 /// ```no_run
185 /// use ironflow_engine::context::WorkflowContext;
186 /// use ironflow_engine::config::{StepConfig, ShellConfig};
187 /// use ironflow_engine::error::EngineError;
188 ///
189 /// # async fn example(ctx: &mut WorkflowContext) -> Result<(), EngineError> {
190 /// let results = ctx.parallel(
191 /// vec![
192 /// ("test-unit", StepConfig::Shell(ShellConfig::new("cargo test --lib"))),
193 /// ("lint", StepConfig::Shell(ShellConfig::new("cargo clippy"))),
194 /// ],
195 /// true,
196 /// ).await?;
197 ///
198 /// for r in &results {
199 /// println!("{}: {:?}", r.name, r.output.output);
200 /// }
201 /// # Ok(())
202 /// # }
203 /// ```
204 pub async fn parallel(
205 &mut self,
206 steps: Vec<(&str, StepConfig)>,
207 fail_fast: bool,
208 ) -> Result<Vec<ParallelStepResult>, EngineError> {
209 if steps.is_empty() {
210 return Ok(Vec::new());
211 }
212
213 let wave_position = self.position;
214 self.position += 1;
215
216 let now = Utc::now();
217 let mut step_records: Vec<(Uuid, String, StepConfig)> = Vec::with_capacity(steps.len());
218
219 for (name, config) in &steps {
220 let kind = config.kind();
221 let step = self
222 .store
223 .create_step(NewStep {
224 run_id: self.run_id,
225 name: name.to_string(),
226 kind,
227 position: wave_position,
228 input: Some(serde_json::to_value(config)?),
229 })
230 .await?;
231
232 self.start_step(step.id, now).await?;
233
234 step_records.push((step.id, name.to_string(), config.clone()));
235 }
236
237 let mut join_set = JoinSet::new();
238 for (idx, (_id, _name, config)) in step_records.iter().enumerate() {
239 let provider = self.provider.clone();
240 let config = config.clone();
241 join_set.spawn(async move { (idx, execute_step_config(&config, &provider).await) });
242 }
243
244 // JoinSet returns in completion order; indexed_results restores input order.
245 let mut indexed_results: Vec<Option<Result<StepOutput, String>>> =
246 vec![None; step_records.len()];
247 let mut first_error: Option<EngineError> = None;
248
249 while let Some(join_result) = join_set.join_next().await {
250 let (idx, step_result) = match join_result {
251 Ok(r) => r,
252 Err(e) => {
253 if first_error.is_none() {
254 first_error = Some(EngineError::StepConfig(format!("join error: {e}")));
255 }
256 if fail_fast {
257 join_set.abort_all();
258 }
259 continue;
260 }
261 };
262
263 let (step_id, step_name, _) = &step_records[idx];
264 let completed_at = Utc::now();
265
266 match step_result {
267 Ok(output) => {
268 self.total_cost_usd += output.cost_usd;
269 self.total_duration_ms += output.duration_ms;
270
271 let debug_messages_json = output.debug_messages_json();
272
273 self.store
274 .update_step(
275 *step_id,
276 StepUpdate {
277 status: Some(StepStatus::Completed),
278 output: Some(output.output.clone()),
279 duration_ms: Some(output.duration_ms),
280 cost_usd: Some(output.cost_usd),
281 input_tokens: output.input_tokens,
282 output_tokens: output.output_tokens,
283 completed_at: Some(completed_at),
284 debug_messages: debug_messages_json,
285 ..StepUpdate::default()
286 },
287 )
288 .await?;
289
290 info!(
291 run_id = %self.run_id,
292 step = %step_name,
293 duration_ms = output.duration_ms,
294 "parallel step completed"
295 );
296
297 indexed_results[idx] = Some(Ok(output));
298 }
299 Err(err) => {
300 let err_msg = err.to_string();
301 let debug_messages_json = extract_debug_messages_from_error(&err);
302
303 if let Err(store_err) = self
304 .store
305 .update_step(
306 *step_id,
307 StepUpdate {
308 status: Some(StepStatus::Failed),
309 error: Some(err_msg.clone()),
310 completed_at: Some(completed_at),
311 debug_messages: debug_messages_json,
312 ..StepUpdate::default()
313 },
314 )
315 .await
316 {
317 tracing::error!(
318 step_id = %step_id,
319 error = %store_err,
320 "failed to persist parallel step failure"
321 );
322 }
323
324 indexed_results[idx] = Some(Err(err_msg.clone()));
325
326 if first_error.is_none() {
327 first_error = Some(err);
328 }
329
330 if fail_fast {
331 join_set.abort_all();
332 }
333 }
334 }
335 }
336
337 if let Some(err) = first_error {
338 return Err(err);
339 }
340
341 self.last_step_ids = step_records.iter().map(|(id, _, _)| *id).collect();
342
343 // Build results in original order.
344 let results: Vec<ParallelStepResult> = step_records
345 .iter()
346 .enumerate()
347 .map(|(idx, (step_id, name, _))| {
348 let output = match indexed_results[idx].take() {
349 Some(Ok(o)) => o,
350 _ => unreachable!("all steps succeeded if no error returned"),
351 };
352 ParallelStepResult {
353 name: name.clone(),
354 output,
355 step_id: *step_id,
356 }
357 })
358 .collect();
359
360 Ok(results)
361 }
362
363 /// Execute a shell step.
364 ///
365 /// Creates the step record, runs the command, persists the result,
366 /// and returns the output for use in subsequent steps.
367 ///
368 /// # Errors
369 ///
370 /// Returns [`EngineError`] if the command fails or the store errors.
371 ///
372 /// # Examples
373 ///
374 /// ```no_run
375 /// use ironflow_engine::context::WorkflowContext;
376 /// use ironflow_engine::config::ShellConfig;
377 /// use ironflow_engine::error::EngineError;
378 ///
379 /// # async fn example(ctx: &mut WorkflowContext) -> Result<(), EngineError> {
380 /// let files = ctx.shell("list", ShellConfig::new("ls -la")).await?;
381 /// println!("stdout: {}", files.output["stdout"]);
382 /// # Ok(())
383 /// # }
384 /// ```
385 pub async fn shell(
386 &mut self,
387 name: &str,
388 config: ShellConfig,
389 ) -> Result<StepOutput, EngineError> {
390 self.execute_step(name, StepKind::Shell, StepConfig::Shell(config))
391 .await
392 }
393
394 /// Execute an HTTP step.
395 ///
396 /// # Errors
397 ///
398 /// Returns [`EngineError`] if the request fails or the store errors.
399 ///
400 /// # Examples
401 ///
402 /// ```no_run
403 /// use ironflow_engine::context::WorkflowContext;
404 /// use ironflow_engine::config::HttpConfig;
405 /// use ironflow_engine::error::EngineError;
406 ///
407 /// # async fn example(ctx: &mut WorkflowContext) -> Result<(), EngineError> {
408 /// let resp = ctx.http("health", HttpConfig::get("https://api.example.com/health")).await?;
409 /// println!("status: {}", resp.output["status"]);
410 /// # Ok(())
411 /// # }
412 /// ```
413 pub async fn http(
414 &mut self,
415 name: &str,
416 config: HttpConfig,
417 ) -> Result<StepOutput, EngineError> {
418 self.execute_step(name, StepKind::Http, StepConfig::Http(config))
419 .await
420 }
421
422 /// Execute an agent step.
423 ///
424 /// # Errors
425 ///
426 /// Returns [`EngineError`] if the agent invocation fails or the store errors.
427 ///
428 /// # Examples
429 ///
430 /// ```no_run
431 /// use ironflow_engine::context::WorkflowContext;
432 /// use ironflow_engine::config::AgentStepConfig;
433 /// use ironflow_engine::error::EngineError;
434 ///
435 /// # async fn example(ctx: &mut WorkflowContext) -> Result<(), EngineError> {
436 /// let review = ctx.agent("review", AgentStepConfig::new("Review the code")).await?;
437 /// println!("review: {}", review.output["value"]);
438 /// # Ok(())
439 /// # }
440 /// ```
441 pub async fn agent(
442 &mut self,
443 name: &str,
444 config: impl Into<AgentStepConfig>,
445 ) -> Result<StepOutput, EngineError> {
446 self.execute_step(name, StepKind::Agent, StepConfig::Agent(config.into()))
447 .await
448 }
449
450 /// Create a human approval gate.
451 ///
452 /// On first execution, records an approval step and returns
453 /// [`EngineError::ApprovalRequired`] to suspend the run. The engine
454 /// transitions the run to `AwaitingApproval`.
455 ///
456 /// On resume (after a human approved via the API), the approval step
457 /// is replayed: it is marked as `Completed` and execution continues
458 /// past it. Multiple approval gates in the same handler work -- each
459 /// one pauses and resumes independently.
460 ///
461 /// # Errors
462 ///
463 /// Returns [`EngineError::ApprovalRequired`] to pause the run on
464 /// first execution. Returns other [`EngineError`] variants on store
465 /// failures.
466 ///
467 /// # Examples
468 ///
469 /// ```no_run
470 /// use ironflow_engine::context::WorkflowContext;
471 /// use ironflow_engine::config::ApprovalConfig;
472 /// use ironflow_engine::error::EngineError;
473 ///
474 /// # async fn example(ctx: &mut WorkflowContext) -> Result<(), EngineError> {
475 /// ctx.approval("deploy-gate", ApprovalConfig::new("Approve deployment?")).await?;
476 /// // Execution continues here after approval
477 /// # Ok(())
478 /// # }
479 /// ```
480 pub async fn approval(
481 &mut self,
482 name: &str,
483 config: ApprovalConfig,
484 ) -> Result<(), EngineError> {
485 let position = self.position;
486 self.position += 1;
487
488 // Replay: if this approval step exists from a prior execution,
489 // the run was approved -- mark it completed (if not already) and continue.
490 if let Some(existing) = self.replay_steps.get(&position)
491 && existing.kind == StepKind::Approval
492 {
493 if existing.status.state == StepStatus::AwaitingApproval {
494 self.store
495 .update_step(
496 existing.id,
497 StepUpdate {
498 status: Some(StepStatus::Completed),
499 completed_at: Some(Utc::now()),
500 ..StepUpdate::default()
501 },
502 )
503 .await?;
504 }
505
506 self.last_step_ids = vec![existing.id];
507 info!(
508 run_id = %self.run_id,
509 step = %name,
510 position,
511 "approval step replayed (approved)"
512 );
513 return Ok(());
514 }
515
516 // First execution: create the approval step and suspend.
517 let step = self
518 .store
519 .create_step(NewStep {
520 run_id: self.run_id,
521 name: name.to_string(),
522 kind: StepKind::Approval,
523 position,
524 input: Some(serde_json::to_value(&config)?),
525 })
526 .await?;
527
528 self.start_step(step.id, Utc::now()).await?;
529
530 // Transition the step to AwaitingApproval so it reflects
531 // the suspended state on the dashboard.
532 self.store
533 .update_step(
534 step.id,
535 StepUpdate {
536 status: Some(StepStatus::AwaitingApproval),
537 ..StepUpdate::default()
538 },
539 )
540 .await?;
541
542 self.last_step_ids = vec![step.id];
543
544 Err(EngineError::ApprovalRequired {
545 run_id: self.run_id,
546 step_id: step.id,
547 message: config.message().to_string(),
548 })
549 }
550
551 /// Execute a custom operation step.
552 ///
553 /// Runs a user-defined [`Operation`] with full step lifecycle management:
554 /// creates the step record, transitions to Running, executes the operation,
555 /// persists the output and duration, and marks the step Completed or Failed.
556 ///
557 /// The operation's [`kind()`](Operation::kind) is stored as
558 /// [`StepKind::Custom`].
559 ///
560 /// # Errors
561 ///
562 /// Returns [`EngineError`] if the operation fails or the store errors.
563 ///
564 /// # Examples
565 ///
566 /// ```no_run
567 /// use ironflow_engine::context::WorkflowContext;
568 /// use ironflow_engine::operation::Operation;
569 /// use ironflow_engine::error::EngineError;
570 /// use serde_json::{Value, json};
571 /// use std::pin::Pin;
572 /// use std::future::Future;
573 ///
574 /// struct MyOp;
575 /// impl Operation for MyOp {
576 /// fn kind(&self) -> &str { "my-service" }
577 /// fn execute(&self) -> Pin<Box<dyn Future<Output = Result<Value, EngineError>> + Send + '_>> {
578 /// Box::pin(async { Ok(json!({"ok": true})) })
579 /// }
580 /// }
581 ///
582 /// # async fn example(ctx: &mut WorkflowContext) -> Result<(), EngineError> {
583 /// let result = ctx.operation("call-service", &MyOp).await?;
584 /// println!("output: {}", result.output);
585 /// # Ok(())
586 /// # }
587 /// ```
588 pub async fn operation(
589 &mut self,
590 name: &str,
591 op: &dyn Operation,
592 ) -> Result<StepOutput, EngineError> {
593 let kind = StepKind::Custom(op.kind().to_string());
594 let position = self.position;
595 self.position += 1;
596
597 let step = self
598 .store
599 .create_step(NewStep {
600 run_id: self.run_id,
601 name: name.to_string(),
602 kind,
603 position,
604 input: op.input(),
605 })
606 .await?;
607
608 self.start_step(step.id, Utc::now()).await?;
609
610 let start = Instant::now();
611
612 match op.execute().await {
613 Ok(output_value) => {
614 let duration_ms = start.elapsed().as_millis() as u64;
615 self.total_duration_ms += duration_ms;
616
617 let completed_at = Utc::now();
618 self.store
619 .update_step(
620 step.id,
621 StepUpdate {
622 status: Some(StepStatus::Completed),
623 output: Some(output_value.clone()),
624 duration_ms: Some(duration_ms),
625 cost_usd: Some(Decimal::ZERO),
626 completed_at: Some(completed_at),
627 ..StepUpdate::default()
628 },
629 )
630 .await?;
631
632 info!(
633 run_id = %self.run_id,
634 step = %name,
635 kind = op.kind(),
636 duration_ms,
637 "operation step completed"
638 );
639
640 self.last_step_ids = vec![step.id];
641
642 Ok(StepOutput {
643 output: output_value,
644 duration_ms,
645 cost_usd: Decimal::ZERO,
646 input_tokens: None,
647 output_tokens: None,
648 debug_messages: None,
649 })
650 }
651 Err(err) => {
652 let completed_at = Utc::now();
653 if let Err(store_err) = self
654 .store
655 .update_step(
656 step.id,
657 StepUpdate {
658 status: Some(StepStatus::Failed),
659 error: Some(err.to_string()),
660 completed_at: Some(completed_at),
661 ..StepUpdate::default()
662 },
663 )
664 .await
665 {
666 error!(step_id = %step.id, error = %store_err, "failed to persist step failure");
667 }
668
669 Err(err)
670 }
671 }
672 }
673
674 /// Execute a sub-workflow step.
675 ///
676 /// Creates a child run for the named workflow handler, executes it with
677 /// its own steps and lifecycle, and returns a [`StepOutput`] containing
678 /// the child run ID and aggregated metrics.
679 ///
680 /// Requires the context to be created with
681 /// `with_handler_resolver`.
682 ///
683 /// # Errors
684 ///
685 /// Returns [`EngineError::InvalidWorkflow`] if no handler is registered
686 /// with the given name, or if no handler resolver is available.
687 ///
688 /// # Examples
689 ///
690 /// ```no_run
691 /// use ironflow_engine::context::WorkflowContext;
692 /// use ironflow_engine::error::EngineError;
693 /// use serde_json::json;
694 ///
695 /// # async fn example(ctx: &mut WorkflowContext) -> Result<(), EngineError> {
696 /// // let result = ctx.workflow(&MySubWorkflow, json!({})).await?;
697 /// # Ok(())
698 /// # }
699 /// ```
700 pub async fn workflow(
701 &mut self,
702 handler: &dyn WorkflowHandler,
703 payload: Value,
704 ) -> Result<StepOutput, EngineError> {
705 let config = WorkflowStepConfig::new(handler.name(), payload);
706 let position = self.position;
707 self.position += 1;
708
709 let step = self
710 .store
711 .create_step(NewStep {
712 run_id: self.run_id,
713 name: config.workflow_name.clone(),
714 kind: StepKind::Workflow,
715 position,
716 input: Some(serde_json::to_value(&config)?),
717 })
718 .await?;
719
720 self.start_step(step.id, Utc::now()).await?;
721
722 match self.execute_child_workflow(&config).await {
723 Ok(output) => {
724 self.total_cost_usd += output.cost_usd;
725 self.total_duration_ms += output.duration_ms;
726
727 let completed_at = Utc::now();
728 self.store
729 .update_step(
730 step.id,
731 StepUpdate {
732 status: Some(StepStatus::Completed),
733 output: Some(output.output.clone()),
734 duration_ms: Some(output.duration_ms),
735 cost_usd: Some(output.cost_usd),
736 completed_at: Some(completed_at),
737 ..StepUpdate::default()
738 },
739 )
740 .await?;
741
742 info!(
743 run_id = %self.run_id,
744 child_workflow = %config.workflow_name,
745 duration_ms = output.duration_ms,
746 "workflow step completed"
747 );
748
749 self.last_step_ids = vec![step.id];
750
751 Ok(output)
752 }
753 Err(err) => {
754 let completed_at = Utc::now();
755 if let Err(store_err) = self
756 .store
757 .update_step(
758 step.id,
759 StepUpdate {
760 status: Some(StepStatus::Failed),
761 error: Some(err.to_string()),
762 completed_at: Some(completed_at),
763 ..StepUpdate::default()
764 },
765 )
766 .await
767 {
768 error!(step_id = %step.id, error = %store_err, "failed to persist step failure");
769 }
770
771 Err(err)
772 }
773 }
774 }
775
776 /// Execute a child workflow and return aggregated output.
777 async fn execute_child_workflow(
778 &self,
779 config: &WorkflowStepConfig,
780 ) -> Result<StepOutput, EngineError> {
781 let resolver = self.handler_resolver.as_ref().ok_or_else(|| {
782 EngineError::InvalidWorkflow(
783 "sub-workflow requires a handler resolver (use Engine to execute)".to_string(),
784 )
785 })?;
786
787 let handler = resolver(&config.workflow_name).ok_or_else(|| {
788 EngineError::InvalidWorkflow(format!("no handler registered: {}", config.workflow_name))
789 })?;
790
791 let child_run = self
792 .store
793 .create_run(NewRun {
794 workflow_name: config.workflow_name.clone(),
795 trigger: TriggerKind::Workflow,
796 payload: config.payload.clone(),
797 max_retries: 0,
798 })
799 .await?;
800
801 let child_run_id = child_run.id;
802 info!(
803 parent_run_id = %self.run_id,
804 child_run_id = %child_run_id,
805 workflow = %config.workflow_name,
806 "child run created"
807 );
808
809 self.store
810 .update_run_status(child_run_id, RunStatus::Running)
811 .await?;
812
813 let run_start = Instant::now();
814 let mut child_ctx = WorkflowContext {
815 run_id: child_run_id,
816 store: self.store.clone(),
817 provider: self.provider.clone(),
818 handler_resolver: self.handler_resolver.clone(),
819 position: 0,
820 last_step_ids: Vec::new(),
821 total_cost_usd: Decimal::ZERO,
822 total_duration_ms: 0,
823 replay_steps: std::collections::HashMap::new(),
824 };
825
826 let result = handler.execute(&mut child_ctx).await;
827 let total_duration = run_start.elapsed().as_millis() as u64;
828 let completed_at = Utc::now();
829
830 match result {
831 Ok(()) => {
832 self.store
833 .update_run(
834 child_run_id,
835 RunUpdate {
836 status: Some(RunStatus::Completed),
837 cost_usd: Some(child_ctx.total_cost_usd),
838 duration_ms: Some(total_duration),
839 completed_at: Some(completed_at),
840 ..RunUpdate::default()
841 },
842 )
843 .await?;
844
845 Ok(StepOutput {
846 output: serde_json::json!({
847 "run_id": child_run_id,
848 "workflow_name": config.workflow_name,
849 "status": RunStatus::Completed,
850 "cost_usd": child_ctx.total_cost_usd,
851 "duration_ms": total_duration,
852 }),
853 duration_ms: total_duration,
854 cost_usd: child_ctx.total_cost_usd,
855 input_tokens: None,
856 output_tokens: None,
857 debug_messages: None,
858 })
859 }
860 Err(err) => {
861 if let Err(store_err) = self
862 .store
863 .update_run(
864 child_run_id,
865 RunUpdate {
866 status: Some(RunStatus::Failed),
867 error: Some(err.to_string()),
868 cost_usd: Some(child_ctx.total_cost_usd),
869 duration_ms: Some(total_duration),
870 completed_at: Some(completed_at),
871 ..RunUpdate::default()
872 },
873 )
874 .await
875 {
876 error!(
877 child_run_id = %child_run_id,
878 store_error = %store_err,
879 "failed to persist child run failure"
880 );
881 }
882
883 Err(err)
884 }
885 }
886 }
887
888 /// Try to replay a completed step from a previous execution.
889 ///
890 /// Returns `Some(StepOutput)` if a completed step exists at the given
891 /// position, `None` otherwise.
892 fn try_replay_step(&mut self, position: u32) -> Option<StepOutput> {
893 let step = self.replay_steps.get(&position)?;
894 if step.status.state != StepStatus::Completed {
895 return None;
896 }
897 let output = StepOutput {
898 output: step.output.clone().unwrap_or(Value::Null),
899 duration_ms: step.duration_ms,
900 cost_usd: step.cost_usd,
901 input_tokens: step.input_tokens,
902 output_tokens: step.output_tokens,
903 debug_messages: None,
904 };
905 self.total_cost_usd += output.cost_usd;
906 self.total_duration_ms += output.duration_ms;
907 self.last_step_ids = vec![step.id];
908 info!(
909 run_id = %self.run_id,
910 step = %step.name,
911 position,
912 "step replayed from previous execution"
913 );
914 Some(output)
915 }
916
917 /// Internal: execute a step with full persistence lifecycle.
918 async fn execute_step(
919 &mut self,
920 name: &str,
921 kind: StepKind,
922 config: StepConfig,
923 ) -> Result<StepOutput, EngineError> {
924 let position = self.position;
925 self.position += 1;
926
927 // Replay: if this step already completed in a prior execution, return cached output.
928 if let Some(output) = self.try_replay_step(position) {
929 return Ok(output);
930 }
931
932 // Create step record in Pending.
933 let step = self
934 .store
935 .create_step(NewStep {
936 run_id: self.run_id,
937 name: name.to_string(),
938 kind,
939 position,
940 input: Some(serde_json::to_value(&config)?),
941 })
942 .await?;
943
944 self.start_step(step.id, Utc::now()).await?;
945
946 match execute_step_config(&config, &self.provider).await {
947 Ok(output) => {
948 self.total_cost_usd += output.cost_usd;
949 self.total_duration_ms += output.duration_ms;
950
951 let debug_messages_json = output.debug_messages_json();
952
953 let completed_at = Utc::now();
954 self.store
955 .update_step(
956 step.id,
957 StepUpdate {
958 status: Some(StepStatus::Completed),
959 output: Some(output.output.clone()),
960 duration_ms: Some(output.duration_ms),
961 cost_usd: Some(output.cost_usd),
962 input_tokens: output.input_tokens,
963 output_tokens: output.output_tokens,
964 completed_at: Some(completed_at),
965 debug_messages: debug_messages_json,
966 ..StepUpdate::default()
967 },
968 )
969 .await?;
970
971 info!(
972 run_id = %self.run_id,
973 step = %name,
974 duration_ms = output.duration_ms,
975 "step completed"
976 );
977
978 self.last_step_ids = vec![step.id];
979
980 Ok(output)
981 }
982 Err(err) => {
983 let completed_at = Utc::now();
984 let debug_messages_json = extract_debug_messages_from_error(&err);
985
986 if let Err(store_err) = self
987 .store
988 .update_step(
989 step.id,
990 StepUpdate {
991 status: Some(StepStatus::Failed),
992 error: Some(err.to_string()),
993 completed_at: Some(completed_at),
994 debug_messages: debug_messages_json,
995 ..StepUpdate::default()
996 },
997 )
998 .await
999 {
1000 tracing::error!(step_id = %step.id, error = %store_err, "failed to persist step failure");
1001 }
1002
1003 Err(err)
1004 }
1005 }
1006 }
1007
1008 /// Record dependency edges and transition a step to Running.
1009 ///
1010 /// Records edges from `step_id` to all `last_step_ids`, then
1011 /// transitions the step to `Running` with the given timestamp.
1012 async fn start_step(&self, step_id: Uuid, now: DateTime<Utc>) -> Result<(), EngineError> {
1013 if !self.last_step_ids.is_empty() {
1014 let deps: Vec<NewStepDependency> = self
1015 .last_step_ids
1016 .iter()
1017 .map(|&depends_on| NewStepDependency {
1018 step_id,
1019 depends_on,
1020 })
1021 .collect();
1022 self.store.create_step_dependencies(deps).await?;
1023 }
1024
1025 self.store
1026 .update_step(
1027 step_id,
1028 StepUpdate {
1029 status: Some(StepStatus::Running),
1030 started_at: Some(now),
1031 ..StepUpdate::default()
1032 },
1033 )
1034 .await?;
1035
1036 Ok(())
1037 }
1038
1039 /// Access the store directly (advanced usage).
1040 pub fn store(&self) -> &Arc<dyn RunStore> {
1041 &self.store
1042 }
1043
1044 /// Access the payload that triggered this run.
1045 ///
1046 /// Fetches the run from the store and returns its payload.
1047 ///
1048 /// # Errors
1049 ///
1050 /// Returns [`EngineError::Store`] if the run is not found.
1051 pub async fn payload(&self) -> Result<Value, EngineError> {
1052 let run = self
1053 .store
1054 .get_run(self.run_id)
1055 .await?
1056 .ok_or(EngineError::Store(
1057 ironflow_store::error::StoreError::RunNotFound(self.run_id),
1058 ))?;
1059 Ok(run.payload)
1060 }
1061}
1062
1063impl fmt::Debug for WorkflowContext {
1064 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1065 f.debug_struct("WorkflowContext")
1066 .field("run_id", &self.run_id)
1067 .field("position", &self.position)
1068 .field("total_cost_usd", &self.total_cost_usd)
1069 .finish_non_exhaustive()
1070 }
1071}
1072
1073/// Extract debug messages from an engine error, if it wraps a schema validation
1074/// failure that carries a verbose conversation trace.
1075fn extract_debug_messages_from_error(err: &EngineError) -> Option<Value> {
1076 if let EngineError::Operation(OperationError::Agent(AgentError::SchemaValidation {
1077 debug_messages,
1078 ..
1079 })) = err
1080 && !debug_messages.is_empty()
1081 {
1082 return serde_json::to_value(debug_messages).ok();
1083 }
1084 None
1085}