ironflow_engine/context.rs
1//! [`WorkflowContext`] — execution context for dynamic workflows.
2//!
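//! Provides step execution methods that automatically persist results to the
//! store. Each call to [`shell`](WorkflowContext::shell),
//! [`http`](WorkflowContext::http), [`agent`](WorkflowContext::agent),
//! [`operation`](WorkflowContext::operation), or
//! [`workflow`](WorkflowContext::workflow) creates a step record, executes the
//! operation, captures the output, and returns a [`StepOutput`] that the next
//! step can reference. [`parallel`](WorkflowContext::parallel) runs several
//! steps concurrently as one wave, and [`approval`](WorkflowContext::approval)
//! suspends the run until a human approves it.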
9//!
10//! # Examples
11//!
12//! ```no_run
13//! use ironflow_engine::context::WorkflowContext;
14//! use ironflow_engine::config::{ShellConfig, AgentStepConfig};
15//! use ironflow_engine::error::EngineError;
16//!
17//! # async fn example(ctx: &mut WorkflowContext) -> Result<(), EngineError> {
18//! let build = ctx.shell("build", ShellConfig::new("cargo build")).await?;
19//! let review = ctx.agent("review", AgentStepConfig::new(
20//! &format!("Build output:\n{}", build.output["stdout"])
21//! )).await?;
22//! # Ok(())
23//! # }
24//! ```
25
26use std::fmt;
27use std::sync::Arc;
28use std::time::Instant;
29
30use chrono::{DateTime, Utc};
31use rust_decimal::Decimal;
32use serde_json::Value;
33use tokio::task::JoinSet;
34use tracing::{error, info};
35use uuid::Uuid;
36
37use ironflow_core::provider::AgentProvider;
38use ironflow_store::models::{
39 NewRun, NewStep, NewStepDependency, RunStatus, RunUpdate, Step, StepKind, StepStatus,
40 StepUpdate, TriggerKind,
41};
42use ironflow_store::store::RunStore;
43
44use crate::config::{
45 AgentStepConfig, ApprovalConfig, HttpConfig, ShellConfig, StepConfig, WorkflowStepConfig,
46};
47use crate::error::EngineError;
48use crate::executor::{ParallelStepResult, StepOutput, execute_step_config};
49use crate::handler::WorkflowHandler;
50use crate::operation::Operation;
51
52/// Callback type for resolving workflow handlers by name.
53pub(crate) type HandlerResolver =
54 Arc<dyn Fn(&str) -> Option<Arc<dyn WorkflowHandler>> + Send + Sync>;
55
56/// Execution context for a single workflow run.
57///
58/// Tracks the current step position and provides convenience methods
59/// for executing operations with automatic persistence.
60///
61/// # Examples
62///
63/// ```no_run
64/// use ironflow_engine::context::WorkflowContext;
65/// use ironflow_engine::config::ShellConfig;
66/// use ironflow_engine::error::EngineError;
67///
68/// # async fn example(ctx: &mut WorkflowContext) -> Result<(), EngineError> {
69/// let result = ctx.shell("greet", ShellConfig::new("echo hello")).await?;
70/// assert!(result.output["stdout"].as_str().unwrap().contains("hello"));
71/// # Ok(())
72/// # }
73/// ```
74pub struct WorkflowContext {
75 run_id: Uuid,
76 store: Arc<dyn RunStore>,
77 provider: Arc<dyn AgentProvider>,
78 handler_resolver: Option<HandlerResolver>,
79 position: u32,
80 /// IDs of the last executed step(s) -- used to record DAG dependencies.
81 last_step_ids: Vec<Uuid>,
82 /// Accumulated cost across all steps in this run.
83 total_cost_usd: Decimal,
84 /// Accumulated duration across all steps.
85 total_duration_ms: u64,
86 /// Steps from a previous execution, keyed by position.
87 /// Used when resuming after approval to replay completed steps.
88 replay_steps: std::collections::HashMap<u32, Step>,
89}
90
91impl WorkflowContext {
92 /// Create a new context for a run.
93 ///
94 /// Not typically called directly — the [`Engine`](crate::engine::Engine)
95 /// creates this when executing a [`WorkflowHandler`].
96 pub fn new(run_id: Uuid, store: Arc<dyn RunStore>, provider: Arc<dyn AgentProvider>) -> Self {
97 Self {
98 run_id,
99 store,
100 provider,
101 handler_resolver: None,
102 position: 0,
103 last_step_ids: Vec::new(),
104 total_cost_usd: Decimal::ZERO,
105 total_duration_ms: 0,
106 replay_steps: std::collections::HashMap::new(),
107 }
108 }
109
110 /// Create a new context with a handler resolver for sub-workflow support.
111 ///
112 /// The resolver is called when [`workflow`](Self::workflow) is invoked to
113 /// look up registered handlers by name.
114 pub(crate) fn with_handler_resolver(
115 run_id: Uuid,
116 store: Arc<dyn RunStore>,
117 provider: Arc<dyn AgentProvider>,
118 resolver: HandlerResolver,
119 ) -> Self {
120 Self {
121 run_id,
122 store,
123 provider,
124 handler_resolver: Some(resolver),
125 position: 0,
126 last_step_ids: Vec::new(),
127 total_cost_usd: Decimal::ZERO,
128 total_duration_ms: 0,
129 replay_steps: std::collections::HashMap::new(),
130 }
131 }
132
133 /// Load existing steps from the store for replay after approval.
134 ///
135 /// Called by the engine when resuming a run. All completed steps
136 /// and the approved approval step are indexed by position so that
137 /// `execute_step` and `approval` can skip them.
138 pub(crate) async fn load_replay_steps(&mut self) -> Result<(), EngineError> {
139 let steps = self.store.list_steps(self.run_id).await?;
140 for step in steps {
141 let dominated = matches!(
142 step.status.state,
143 StepStatus::Completed | StepStatus::Running | StepStatus::AwaitingApproval
144 );
145 if dominated {
146 self.replay_steps.insert(step.position, step);
147 }
148 }
149 Ok(())
150 }
151
152 /// The run ID this context is executing for.
153 pub fn run_id(&self) -> Uuid {
154 self.run_id
155 }
156
157 /// Accumulated cost across all executed steps so far.
158 pub fn total_cost_usd(&self) -> Decimal {
159 self.total_cost_usd
160 }
161
162 /// Accumulated duration across all executed steps so far.
163 pub fn total_duration_ms(&self) -> u64 {
164 self.total_duration_ms
165 }
166
167 /// Execute multiple steps concurrently (wait-all model).
168 ///
169 /// All steps in the batch execute in parallel via `tokio::JoinSet`.
170 /// Each step is recorded with the same `position` (execution wave).
171 /// Dependencies on previous steps are recorded automatically.
172 ///
173 /// When `fail_fast` is true, remaining steps are aborted on the first
174 /// failure. When false, all steps run to completion and the first
175 /// error is returned.
176 ///
177 /// # Errors
178 ///
179 /// Returns [`EngineError`] if any step fails.
180 ///
181 /// # Examples
182 ///
183 /// ```no_run
184 /// use ironflow_engine::context::WorkflowContext;
185 /// use ironflow_engine::config::{StepConfig, ShellConfig};
186 /// use ironflow_engine::error::EngineError;
187 ///
188 /// # async fn example(ctx: &mut WorkflowContext) -> Result<(), EngineError> {
189 /// let results = ctx.parallel(
190 /// vec![
191 /// ("test-unit", StepConfig::Shell(ShellConfig::new("cargo test --lib"))),
192 /// ("lint", StepConfig::Shell(ShellConfig::new("cargo clippy"))),
193 /// ],
194 /// true,
195 /// ).await?;
196 ///
197 /// for r in &results {
198 /// println!("{}: {:?}", r.name, r.output.output);
199 /// }
200 /// # Ok(())
201 /// # }
202 /// ```
203 pub async fn parallel(
204 &mut self,
205 steps: Vec<(&str, StepConfig)>,
206 fail_fast: bool,
207 ) -> Result<Vec<ParallelStepResult>, EngineError> {
208 if steps.is_empty() {
209 return Ok(Vec::new());
210 }
211
212 let wave_position = self.position;
213 self.position += 1;
214
215 let now = Utc::now();
216 let mut step_records: Vec<(Uuid, String, StepConfig)> = Vec::with_capacity(steps.len());
217
218 for (name, config) in &steps {
219 let kind = config.kind();
220 let step = self
221 .store
222 .create_step(NewStep {
223 run_id: self.run_id,
224 name: name.to_string(),
225 kind,
226 position: wave_position,
227 input: Some(serde_json::to_value(config)?),
228 })
229 .await?;
230
231 self.start_step(step.id, now).await?;
232
233 step_records.push((step.id, name.to_string(), config.clone()));
234 }
235
236 let mut join_set = JoinSet::new();
237 for (idx, (_id, _name, config)) in step_records.iter().enumerate() {
238 let provider = self.provider.clone();
239 let config = config.clone();
240 join_set.spawn(async move { (idx, execute_step_config(&config, &provider).await) });
241 }
242
243 // JoinSet returns in completion order; indexed_results restores input order.
244 let mut indexed_results: Vec<Option<Result<StepOutput, String>>> =
245 vec![None; step_records.len()];
246 let mut first_error: Option<EngineError> = None;
247
248 while let Some(join_result) = join_set.join_next().await {
249 let (idx, step_result) = match join_result {
250 Ok(r) => r,
251 Err(e) => {
252 if first_error.is_none() {
253 first_error = Some(EngineError::StepConfig(format!("join error: {e}")));
254 }
255 if fail_fast {
256 join_set.abort_all();
257 }
258 continue;
259 }
260 };
261
262 let (step_id, step_name, _) = &step_records[idx];
263 let completed_at = Utc::now();
264
265 match step_result {
266 Ok(output) => {
267 self.total_cost_usd += output.cost_usd;
268 self.total_duration_ms += output.duration_ms;
269
270 self.store
271 .update_step(
272 *step_id,
273 StepUpdate {
274 status: Some(StepStatus::Completed),
275 output: Some(output.output.clone()),
276 duration_ms: Some(output.duration_ms),
277 cost_usd: Some(output.cost_usd),
278 input_tokens: output.input_tokens,
279 output_tokens: output.output_tokens,
280 completed_at: Some(completed_at),
281 ..StepUpdate::default()
282 },
283 )
284 .await?;
285
286 info!(
287 run_id = %self.run_id,
288 step = %step_name,
289 duration_ms = output.duration_ms,
290 "parallel step completed"
291 );
292
293 indexed_results[idx] = Some(Ok(output));
294 }
295 Err(err) => {
296 let err_msg = err.to_string();
297
298 if let Err(store_err) = self
299 .store
300 .update_step(
301 *step_id,
302 StepUpdate {
303 status: Some(StepStatus::Failed),
304 error: Some(err_msg.clone()),
305 completed_at: Some(completed_at),
306 ..StepUpdate::default()
307 },
308 )
309 .await
310 {
311 tracing::error!(
312 step_id = %step_id,
313 error = %store_err,
314 "failed to persist parallel step failure"
315 );
316 }
317
318 indexed_results[idx] = Some(Err(err_msg.clone()));
319
320 if first_error.is_none() {
321 first_error = Some(err);
322 }
323
324 if fail_fast {
325 join_set.abort_all();
326 }
327 }
328 }
329 }
330
331 if let Some(err) = first_error {
332 return Err(err);
333 }
334
335 self.last_step_ids = step_records.iter().map(|(id, _, _)| *id).collect();
336
337 // Build results in original order.
338 let results: Vec<ParallelStepResult> = step_records
339 .iter()
340 .enumerate()
341 .map(|(idx, (step_id, name, _))| {
342 let output = match indexed_results[idx].take() {
343 Some(Ok(o)) => o,
344 _ => unreachable!("all steps succeeded if no error returned"),
345 };
346 ParallelStepResult {
347 name: name.clone(),
348 output,
349 step_id: *step_id,
350 }
351 })
352 .collect();
353
354 Ok(results)
355 }
356
357 /// Execute a shell step.
358 ///
359 /// Creates the step record, runs the command, persists the result,
360 /// and returns the output for use in subsequent steps.
361 ///
362 /// # Errors
363 ///
364 /// Returns [`EngineError`] if the command fails or the store errors.
365 ///
366 /// # Examples
367 ///
368 /// ```no_run
369 /// use ironflow_engine::context::WorkflowContext;
370 /// use ironflow_engine::config::ShellConfig;
371 /// use ironflow_engine::error::EngineError;
372 ///
373 /// # async fn example(ctx: &mut WorkflowContext) -> Result<(), EngineError> {
374 /// let files = ctx.shell("list", ShellConfig::new("ls -la")).await?;
375 /// println!("stdout: {}", files.output["stdout"]);
376 /// # Ok(())
377 /// # }
378 /// ```
379 pub async fn shell(
380 &mut self,
381 name: &str,
382 config: ShellConfig,
383 ) -> Result<StepOutput, EngineError> {
384 self.execute_step(name, StepKind::Shell, StepConfig::Shell(config))
385 .await
386 }
387
388 /// Execute an HTTP step.
389 ///
390 /// # Errors
391 ///
392 /// Returns [`EngineError`] if the request fails or the store errors.
393 ///
394 /// # Examples
395 ///
396 /// ```no_run
397 /// use ironflow_engine::context::WorkflowContext;
398 /// use ironflow_engine::config::HttpConfig;
399 /// use ironflow_engine::error::EngineError;
400 ///
401 /// # async fn example(ctx: &mut WorkflowContext) -> Result<(), EngineError> {
402 /// let resp = ctx.http("health", HttpConfig::get("https://api.example.com/health")).await?;
403 /// println!("status: {}", resp.output["status"]);
404 /// # Ok(())
405 /// # }
406 /// ```
407 pub async fn http(
408 &mut self,
409 name: &str,
410 config: HttpConfig,
411 ) -> Result<StepOutput, EngineError> {
412 self.execute_step(name, StepKind::Http, StepConfig::Http(config))
413 .await
414 }
415
416 /// Execute an agent step.
417 ///
418 /// # Errors
419 ///
420 /// Returns [`EngineError`] if the agent invocation fails or the store errors.
421 ///
422 /// # Examples
423 ///
424 /// ```no_run
425 /// use ironflow_engine::context::WorkflowContext;
426 /// use ironflow_engine::config::AgentStepConfig;
427 /// use ironflow_engine::error::EngineError;
428 ///
429 /// # async fn example(ctx: &mut WorkflowContext) -> Result<(), EngineError> {
430 /// let review = ctx.agent("review", AgentStepConfig::new("Review the code")).await?;
431 /// println!("review: {}", review.output["value"]);
432 /// # Ok(())
433 /// # }
434 /// ```
435 pub async fn agent(
436 &mut self,
437 name: &str,
438 config: AgentStepConfig,
439 ) -> Result<StepOutput, EngineError> {
440 self.execute_step(name, StepKind::Agent, StepConfig::Agent(config))
441 .await
442 }
443
444 /// Create a human approval gate.
445 ///
446 /// On first execution, records an approval step and returns
447 /// [`EngineError::ApprovalRequired`] to suspend the run. The engine
448 /// transitions the run to `AwaitingApproval`.
449 ///
450 /// On resume (after a human approved via the API), the approval step
451 /// is replayed: it is marked as `Completed` and execution continues
452 /// past it. Multiple approval gates in the same handler work -- each
453 /// one pauses and resumes independently.
454 ///
455 /// # Errors
456 ///
457 /// Returns [`EngineError::ApprovalRequired`] to pause the run on
458 /// first execution. Returns other [`EngineError`] variants on store
459 /// failures.
460 ///
461 /// # Examples
462 ///
463 /// ```no_run
464 /// use ironflow_engine::context::WorkflowContext;
465 /// use ironflow_engine::config::ApprovalConfig;
466 /// use ironflow_engine::error::EngineError;
467 ///
468 /// # async fn example(ctx: &mut WorkflowContext) -> Result<(), EngineError> {
469 /// ctx.approval("deploy-gate", ApprovalConfig::new("Approve deployment?")).await?;
470 /// // Execution continues here after approval
471 /// # Ok(())
472 /// # }
473 /// ```
474 pub async fn approval(
475 &mut self,
476 name: &str,
477 config: ApprovalConfig,
478 ) -> Result<(), EngineError> {
479 let position = self.position;
480 self.position += 1;
481
482 // Replay: if this approval step exists from a prior execution,
483 // the run was approved -- mark it completed (if not already) and continue.
484 if let Some(existing) = self.replay_steps.get(&position)
485 && existing.kind == StepKind::Approval
486 {
487 if existing.status.state == StepStatus::AwaitingApproval {
488 self.store
489 .update_step(
490 existing.id,
491 StepUpdate {
492 status: Some(StepStatus::Completed),
493 completed_at: Some(Utc::now()),
494 ..StepUpdate::default()
495 },
496 )
497 .await?;
498 }
499
500 self.last_step_ids = vec![existing.id];
501 info!(
502 run_id = %self.run_id,
503 step = %name,
504 position,
505 "approval step replayed (approved)"
506 );
507 return Ok(());
508 }
509
510 // First execution: create the approval step and suspend.
511 let step = self
512 .store
513 .create_step(NewStep {
514 run_id: self.run_id,
515 name: name.to_string(),
516 kind: StepKind::Approval,
517 position,
518 input: Some(serde_json::to_value(&config)?),
519 })
520 .await?;
521
522 self.start_step(step.id, Utc::now()).await?;
523
524 // Transition the step to AwaitingApproval so it reflects
525 // the suspended state on the dashboard.
526 self.store
527 .update_step(
528 step.id,
529 StepUpdate {
530 status: Some(StepStatus::AwaitingApproval),
531 ..StepUpdate::default()
532 },
533 )
534 .await?;
535
536 self.last_step_ids = vec![step.id];
537
538 Err(EngineError::ApprovalRequired {
539 run_id: self.run_id,
540 step_id: step.id,
541 message: config.message().to_string(),
542 })
543 }
544
545 /// Execute a custom operation step.
546 ///
547 /// Runs a user-defined [`Operation`] with full step lifecycle management:
548 /// creates the step record, transitions to Running, executes the operation,
549 /// persists the output and duration, and marks the step Completed or Failed.
550 ///
551 /// The operation's [`kind()`](Operation::kind) is stored as
552 /// [`StepKind::Custom`].
553 ///
554 /// # Errors
555 ///
556 /// Returns [`EngineError`] if the operation fails or the store errors.
557 ///
558 /// # Examples
559 ///
560 /// ```no_run
561 /// use ironflow_engine::context::WorkflowContext;
562 /// use ironflow_engine::operation::Operation;
563 /// use ironflow_engine::error::EngineError;
564 /// use serde_json::{Value, json};
565 /// use std::pin::Pin;
566 /// use std::future::Future;
567 ///
568 /// struct MyOp;
569 /// impl Operation for MyOp {
570 /// fn kind(&self) -> &str { "my-service" }
571 /// fn execute(&self) -> Pin<Box<dyn Future<Output = Result<Value, EngineError>> + Send + '_>> {
572 /// Box::pin(async { Ok(json!({"ok": true})) })
573 /// }
574 /// }
575 ///
576 /// # async fn example(ctx: &mut WorkflowContext) -> Result<(), EngineError> {
577 /// let result = ctx.operation("call-service", &MyOp).await?;
578 /// println!("output: {}", result.output);
579 /// # Ok(())
580 /// # }
581 /// ```
582 pub async fn operation(
583 &mut self,
584 name: &str,
585 op: &dyn Operation,
586 ) -> Result<StepOutput, EngineError> {
587 let kind = StepKind::Custom(op.kind().to_string());
588 let position = self.position;
589 self.position += 1;
590
591 let step = self
592 .store
593 .create_step(NewStep {
594 run_id: self.run_id,
595 name: name.to_string(),
596 kind,
597 position,
598 input: op.input(),
599 })
600 .await?;
601
602 self.start_step(step.id, Utc::now()).await?;
603
604 let start = Instant::now();
605
606 match op.execute().await {
607 Ok(output_value) => {
608 let duration_ms = start.elapsed().as_millis() as u64;
609 self.total_duration_ms += duration_ms;
610
611 let completed_at = Utc::now();
612 self.store
613 .update_step(
614 step.id,
615 StepUpdate {
616 status: Some(StepStatus::Completed),
617 output: Some(output_value.clone()),
618 duration_ms: Some(duration_ms),
619 cost_usd: Some(Decimal::ZERO),
620 completed_at: Some(completed_at),
621 ..StepUpdate::default()
622 },
623 )
624 .await?;
625
626 info!(
627 run_id = %self.run_id,
628 step = %name,
629 kind = op.kind(),
630 duration_ms,
631 "operation step completed"
632 );
633
634 self.last_step_ids = vec![step.id];
635
636 Ok(StepOutput {
637 output: output_value,
638 duration_ms,
639 cost_usd: Decimal::ZERO,
640 input_tokens: None,
641 output_tokens: None,
642 debug_messages: None,
643 })
644 }
645 Err(err) => {
646 let completed_at = Utc::now();
647 if let Err(store_err) = self
648 .store
649 .update_step(
650 step.id,
651 StepUpdate {
652 status: Some(StepStatus::Failed),
653 error: Some(err.to_string()),
654 completed_at: Some(completed_at),
655 ..StepUpdate::default()
656 },
657 )
658 .await
659 {
660 error!(step_id = %step.id, error = %store_err, "failed to persist step failure");
661 }
662
663 Err(err)
664 }
665 }
666 }
667
668 /// Execute a sub-workflow step.
669 ///
670 /// Creates a child run for the named workflow handler, executes it with
671 /// its own steps and lifecycle, and returns a [`StepOutput`] containing
672 /// the child run ID and aggregated metrics.
673 ///
674 /// Requires the context to be created with
675 /// `with_handler_resolver`.
676 ///
677 /// # Errors
678 ///
679 /// Returns [`EngineError::InvalidWorkflow`] if no handler is registered
680 /// with the given name, or if no handler resolver is available.
681 ///
682 /// # Examples
683 ///
684 /// ```no_run
685 /// use ironflow_engine::context::WorkflowContext;
686 /// use ironflow_engine::error::EngineError;
687 /// use serde_json::json;
688 ///
689 /// # async fn example(ctx: &mut WorkflowContext) -> Result<(), EngineError> {
690 /// // let result = ctx.workflow(&MySubWorkflow, json!({})).await?;
691 /// # Ok(())
692 /// # }
693 /// ```
694 pub async fn workflow(
695 &mut self,
696 handler: &dyn WorkflowHandler,
697 payload: Value,
698 ) -> Result<StepOutput, EngineError> {
699 let config = WorkflowStepConfig::new(handler.name(), payload);
700 let position = self.position;
701 self.position += 1;
702
703 let step = self
704 .store
705 .create_step(NewStep {
706 run_id: self.run_id,
707 name: config.workflow_name.clone(),
708 kind: StepKind::Workflow,
709 position,
710 input: Some(serde_json::to_value(&config)?),
711 })
712 .await?;
713
714 self.start_step(step.id, Utc::now()).await?;
715
716 match self.execute_child_workflow(&config).await {
717 Ok(output) => {
718 self.total_cost_usd += output.cost_usd;
719 self.total_duration_ms += output.duration_ms;
720
721 let completed_at = Utc::now();
722 self.store
723 .update_step(
724 step.id,
725 StepUpdate {
726 status: Some(StepStatus::Completed),
727 output: Some(output.output.clone()),
728 duration_ms: Some(output.duration_ms),
729 cost_usd: Some(output.cost_usd),
730 completed_at: Some(completed_at),
731 ..StepUpdate::default()
732 },
733 )
734 .await?;
735
736 info!(
737 run_id = %self.run_id,
738 child_workflow = %config.workflow_name,
739 duration_ms = output.duration_ms,
740 "workflow step completed"
741 );
742
743 self.last_step_ids = vec![step.id];
744
745 Ok(output)
746 }
747 Err(err) => {
748 let completed_at = Utc::now();
749 if let Err(store_err) = self
750 .store
751 .update_step(
752 step.id,
753 StepUpdate {
754 status: Some(StepStatus::Failed),
755 error: Some(err.to_string()),
756 completed_at: Some(completed_at),
757 ..StepUpdate::default()
758 },
759 )
760 .await
761 {
762 error!(step_id = %step.id, error = %store_err, "failed to persist step failure");
763 }
764
765 Err(err)
766 }
767 }
768 }
769
770 /// Execute a child workflow and return aggregated output.
771 async fn execute_child_workflow(
772 &self,
773 config: &WorkflowStepConfig,
774 ) -> Result<StepOutput, EngineError> {
775 let resolver = self.handler_resolver.as_ref().ok_or_else(|| {
776 EngineError::InvalidWorkflow(
777 "sub-workflow requires a handler resolver (use Engine to execute)".to_string(),
778 )
779 })?;
780
781 let handler = resolver(&config.workflow_name).ok_or_else(|| {
782 EngineError::InvalidWorkflow(format!("no handler registered: {}", config.workflow_name))
783 })?;
784
785 let child_run = self
786 .store
787 .create_run(NewRun {
788 workflow_name: config.workflow_name.clone(),
789 trigger: TriggerKind::Workflow,
790 payload: config.payload.clone(),
791 max_retries: 0,
792 })
793 .await?;
794
795 let child_run_id = child_run.id;
796 info!(
797 parent_run_id = %self.run_id,
798 child_run_id = %child_run_id,
799 workflow = %config.workflow_name,
800 "child run created"
801 );
802
803 self.store
804 .update_run_status(child_run_id, RunStatus::Running)
805 .await?;
806
807 let run_start = Instant::now();
808 let mut child_ctx = WorkflowContext {
809 run_id: child_run_id,
810 store: self.store.clone(),
811 provider: self.provider.clone(),
812 handler_resolver: self.handler_resolver.clone(),
813 position: 0,
814 last_step_ids: Vec::new(),
815 total_cost_usd: Decimal::ZERO,
816 total_duration_ms: 0,
817 replay_steps: std::collections::HashMap::new(),
818 };
819
820 let result = handler.execute(&mut child_ctx).await;
821 let total_duration = run_start.elapsed().as_millis() as u64;
822 let completed_at = Utc::now();
823
824 match result {
825 Ok(()) => {
826 self.store
827 .update_run(
828 child_run_id,
829 RunUpdate {
830 status: Some(RunStatus::Completed),
831 cost_usd: Some(child_ctx.total_cost_usd),
832 duration_ms: Some(total_duration),
833 completed_at: Some(completed_at),
834 ..RunUpdate::default()
835 },
836 )
837 .await?;
838
839 Ok(StepOutput {
840 output: serde_json::json!({
841 "run_id": child_run_id,
842 "workflow_name": config.workflow_name,
843 "status": RunStatus::Completed,
844 "cost_usd": child_ctx.total_cost_usd,
845 "duration_ms": total_duration,
846 }),
847 duration_ms: total_duration,
848 cost_usd: child_ctx.total_cost_usd,
849 input_tokens: None,
850 output_tokens: None,
851 debug_messages: None,
852 })
853 }
854 Err(err) => {
855 if let Err(store_err) = self
856 .store
857 .update_run(
858 child_run_id,
859 RunUpdate {
860 status: Some(RunStatus::Failed),
861 error: Some(err.to_string()),
862 cost_usd: Some(child_ctx.total_cost_usd),
863 duration_ms: Some(total_duration),
864 completed_at: Some(completed_at),
865 ..RunUpdate::default()
866 },
867 )
868 .await
869 {
870 error!(
871 child_run_id = %child_run_id,
872 store_error = %store_err,
873 "failed to persist child run failure"
874 );
875 }
876
877 Err(err)
878 }
879 }
880 }
881
882 /// Try to replay a completed step from a previous execution.
883 ///
884 /// Returns `Some(StepOutput)` if a completed step exists at the given
885 /// position, `None` otherwise.
886 fn try_replay_step(&mut self, position: u32) -> Option<StepOutput> {
887 let step = self.replay_steps.get(&position)?;
888 if step.status.state != StepStatus::Completed {
889 return None;
890 }
891 let output = StepOutput {
892 output: step.output.clone().unwrap_or(Value::Null),
893 duration_ms: step.duration_ms,
894 cost_usd: step.cost_usd,
895 input_tokens: step.input_tokens,
896 output_tokens: step.output_tokens,
897 debug_messages: None,
898 };
899 self.total_cost_usd += output.cost_usd;
900 self.total_duration_ms += output.duration_ms;
901 self.last_step_ids = vec![step.id];
902 info!(
903 run_id = %self.run_id,
904 step = %step.name,
905 position,
906 "step replayed from previous execution"
907 );
908 Some(output)
909 }
910
911 /// Internal: execute a step with full persistence lifecycle.
912 async fn execute_step(
913 &mut self,
914 name: &str,
915 kind: StepKind,
916 config: StepConfig,
917 ) -> Result<StepOutput, EngineError> {
918 let position = self.position;
919 self.position += 1;
920
921 // Replay: if this step already completed in a prior execution, return cached output.
922 if let Some(output) = self.try_replay_step(position) {
923 return Ok(output);
924 }
925
926 // Create step record in Pending.
927 let step = self
928 .store
929 .create_step(NewStep {
930 run_id: self.run_id,
931 name: name.to_string(),
932 kind,
933 position,
934 input: Some(serde_json::to_value(&config)?),
935 })
936 .await?;
937
938 self.start_step(step.id, Utc::now()).await?;
939
940 match execute_step_config(&config, &self.provider).await {
941 Ok(output) => {
942 self.total_cost_usd += output.cost_usd;
943 self.total_duration_ms += output.duration_ms;
944
945 let completed_at = Utc::now();
946 self.store
947 .update_step(
948 step.id,
949 StepUpdate {
950 status: Some(StepStatus::Completed),
951 output: Some(output.output.clone()),
952 duration_ms: Some(output.duration_ms),
953 cost_usd: Some(output.cost_usd),
954 input_tokens: output.input_tokens,
955 output_tokens: output.output_tokens,
956 completed_at: Some(completed_at),
957 ..StepUpdate::default()
958 },
959 )
960 .await?;
961
962 info!(
963 run_id = %self.run_id,
964 step = %name,
965 duration_ms = output.duration_ms,
966 "step completed"
967 );
968
969 self.last_step_ids = vec![step.id];
970
971 Ok(output)
972 }
973 Err(err) => {
974 let completed_at = Utc::now();
975 if let Err(store_err) = self
976 .store
977 .update_step(
978 step.id,
979 StepUpdate {
980 status: Some(StepStatus::Failed),
981 error: Some(err.to_string()),
982 completed_at: Some(completed_at),
983 ..StepUpdate::default()
984 },
985 )
986 .await
987 {
988 tracing::error!(step_id = %step.id, error = %store_err, "failed to persist step failure");
989 }
990
991 Err(err)
992 }
993 }
994 }
995
996 /// Record dependency edges and transition a step to Running.
997 ///
998 /// Records edges from `step_id` to all `last_step_ids`, then
999 /// transitions the step to `Running` with the given timestamp.
1000 async fn start_step(&self, step_id: Uuid, now: DateTime<Utc>) -> Result<(), EngineError> {
1001 if !self.last_step_ids.is_empty() {
1002 let deps: Vec<NewStepDependency> = self
1003 .last_step_ids
1004 .iter()
1005 .map(|&depends_on| NewStepDependency {
1006 step_id,
1007 depends_on,
1008 })
1009 .collect();
1010 self.store.create_step_dependencies(deps).await?;
1011 }
1012
1013 self.store
1014 .update_step(
1015 step_id,
1016 StepUpdate {
1017 status: Some(StepStatus::Running),
1018 started_at: Some(now),
1019 ..StepUpdate::default()
1020 },
1021 )
1022 .await?;
1023
1024 Ok(())
1025 }
1026
1027 /// Access the store directly (advanced usage).
1028 pub fn store(&self) -> &Arc<dyn RunStore> {
1029 &self.store
1030 }
1031
1032 /// Access the payload that triggered this run.
1033 ///
1034 /// Fetches the run from the store and returns its payload.
1035 ///
1036 /// # Errors
1037 ///
1038 /// Returns [`EngineError::Store`] if the run is not found.
1039 pub async fn payload(&self) -> Result<Value, EngineError> {
1040 let run = self
1041 .store
1042 .get_run(self.run_id)
1043 .await?
1044 .ok_or(EngineError::Store(
1045 ironflow_store::error::StoreError::RunNotFound(self.run_id),
1046 ))?;
1047 Ok(run.payload)
1048 }
1049}
1050
1051impl fmt::Debug for WorkflowContext {
1052 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1053 f.debug_struct("WorkflowContext")
1054 .field("run_id", &self.run_id)
1055 .field("position", &self.position)
1056 .field("total_cost_usd", &self.total_cost_usd)
1057 .finish_non_exhaustive()
1058 }
1059}