ironflow_engine/context.rs
1//! [`WorkflowContext`] — execution context for dynamic workflows.
2//!
3//! Provides step execution methods that automatically persist results to the
4//! store. Each call to [`shell`](WorkflowContext::shell),
5//! [`http`](WorkflowContext::http), [`agent`](WorkflowContext::agent), or
6//! [`workflow`](WorkflowContext::workflow) creates a step record, executes the
7//! operation, captures the output, and returns a [`StepOutput`] that the next
8//! step can reference.
9//!
10//! # Examples
11//!
12//! ```no_run
13//! use ironflow_engine::context::WorkflowContext;
14//! use ironflow_engine::config::{ShellConfig, AgentStepConfig};
15//! use ironflow_engine::error::EngineError;
16//!
17//! # async fn example(ctx: &mut WorkflowContext) -> Result<(), EngineError> {
18//! let build = ctx.shell("build", ShellConfig::new("cargo build")).await?;
19//! let review = ctx.agent("review", AgentStepConfig::new(
20//! &format!("Build output:\n{}", build.output["stdout"])
21//! )).await?;
22//! # Ok(())
23//! # }
24//! ```
25
26use std::fmt;
27use std::sync::Arc;
28use std::time::Instant;
29
30use chrono::{DateTime, Utc};
31use rust_decimal::Decimal;
32use serde_json::Value;
33use tokio::task::JoinSet;
34use tracing::{error, info};
35use uuid::Uuid;
36
37use ironflow_core::provider::AgentProvider;
38use ironflow_store::models::{
39 NewRun, NewStep, NewStepDependency, RunStatus, RunUpdate, Step, StepKind, StepStatus,
40 StepUpdate, TriggerKind,
41};
42use ironflow_store::store::RunStore;
43
44use crate::config::{
45 AgentStepConfig, ApprovalConfig, HttpConfig, ShellConfig, StepConfig, WorkflowStepConfig,
46};
47use crate::error::EngineError;
48use crate::executor::{ParallelStepResult, StepOutput, execute_step_config};
49use crate::handler::WorkflowHandler;
50use crate::operation::Operation;
51
52/// Callback type for resolving workflow handlers by name.
53pub(crate) type HandlerResolver =
54 Arc<dyn Fn(&str) -> Option<Arc<dyn WorkflowHandler>> + Send + Sync>;
55
56/// Execution context for a single workflow run.
57///
58/// Tracks the current step position and provides convenience methods
59/// for executing operations with automatic persistence.
60///
61/// # Examples
62///
63/// ```no_run
64/// use ironflow_engine::context::WorkflowContext;
65/// use ironflow_engine::config::ShellConfig;
66/// use ironflow_engine::error::EngineError;
67///
68/// # async fn example(ctx: &mut WorkflowContext) -> Result<(), EngineError> {
69/// let result = ctx.shell("greet", ShellConfig::new("echo hello")).await?;
70/// assert!(result.output["stdout"].as_str().unwrap().contains("hello"));
71/// # Ok(())
72/// # }
73/// ```
74pub struct WorkflowContext {
75 run_id: Uuid,
76 store: Arc<dyn RunStore>,
77 provider: Arc<dyn AgentProvider>,
78 handler_resolver: Option<HandlerResolver>,
79 position: u32,
80 /// IDs of the last executed step(s) -- used to record DAG dependencies.
81 last_step_ids: Vec<Uuid>,
82 /// Accumulated cost across all steps in this run.
83 total_cost_usd: Decimal,
84 /// Accumulated duration across all steps.
85 total_duration_ms: u64,
86 /// Steps from a previous execution, keyed by position.
87 /// Used when resuming after approval to replay completed steps.
88 replay_steps: std::collections::HashMap<u32, Step>,
89}
90
91impl WorkflowContext {
92 /// Create a new context for a run.
93 ///
94 /// Not typically called directly — the [`Engine`](crate::engine::Engine)
95 /// creates this when executing a [`WorkflowHandler`].
96 pub fn new(run_id: Uuid, store: Arc<dyn RunStore>, provider: Arc<dyn AgentProvider>) -> Self {
97 Self {
98 run_id,
99 store,
100 provider,
101 handler_resolver: None,
102 position: 0,
103 last_step_ids: Vec::new(),
104 total_cost_usd: Decimal::ZERO,
105 total_duration_ms: 0,
106 replay_steps: std::collections::HashMap::new(),
107 }
108 }
109
110 /// Create a new context with a handler resolver for sub-workflow support.
111 ///
112 /// The resolver is called when [`workflow`](Self::workflow) is invoked to
113 /// look up registered handlers by name.
114 pub(crate) fn with_handler_resolver(
115 run_id: Uuid,
116 store: Arc<dyn RunStore>,
117 provider: Arc<dyn AgentProvider>,
118 resolver: HandlerResolver,
119 ) -> Self {
120 Self {
121 run_id,
122 store,
123 provider,
124 handler_resolver: Some(resolver),
125 position: 0,
126 last_step_ids: Vec::new(),
127 total_cost_usd: Decimal::ZERO,
128 total_duration_ms: 0,
129 replay_steps: std::collections::HashMap::new(),
130 }
131 }
132
133 /// Load existing steps from the store for replay after approval.
134 ///
135 /// Called by the engine when resuming a run. All completed steps
136 /// and the approved approval step are indexed by position so that
137 /// `execute_step` and `approval` can skip them.
138 pub(crate) async fn load_replay_steps(&mut self) -> Result<(), EngineError> {
139 let steps = self.store.list_steps(self.run_id).await?;
140 for step in steps {
141 let dominated = matches!(
142 step.status.state,
143 StepStatus::Completed | StepStatus::Running | StepStatus::AwaitingApproval
144 );
145 if dominated {
146 self.replay_steps.insert(step.position, step);
147 }
148 }
149 Ok(())
150 }
151
152 /// The run ID this context is executing for.
153 pub fn run_id(&self) -> Uuid {
154 self.run_id
155 }
156
157 /// Accumulated cost across all executed steps so far.
158 pub fn total_cost_usd(&self) -> Decimal {
159 self.total_cost_usd
160 }
161
162 /// Accumulated duration across all executed steps so far.
163 pub fn total_duration_ms(&self) -> u64 {
164 self.total_duration_ms
165 }
166
167 /// Execute multiple steps concurrently (wait-all model).
168 ///
169 /// All steps in the batch execute in parallel via `tokio::JoinSet`.
170 /// Each step is recorded with the same `position` (execution wave).
171 /// Dependencies on previous steps are recorded automatically.
172 ///
173 /// When `fail_fast` is true, remaining steps are aborted on the first
174 /// failure. When false, all steps run to completion and the first
175 /// error is returned.
176 ///
177 /// # Errors
178 ///
179 /// Returns [`EngineError`] if any step fails.
180 ///
181 /// # Examples
182 ///
183 /// ```no_run
184 /// use ironflow_engine::context::WorkflowContext;
185 /// use ironflow_engine::config::{StepConfig, ShellConfig};
186 /// use ironflow_engine::error::EngineError;
187 ///
188 /// # async fn example(ctx: &mut WorkflowContext) -> Result<(), EngineError> {
189 /// let results = ctx.parallel(
190 /// vec![
191 /// ("test-unit", StepConfig::Shell(ShellConfig::new("cargo test --lib"))),
192 /// ("lint", StepConfig::Shell(ShellConfig::new("cargo clippy"))),
193 /// ],
194 /// true,
195 /// ).await?;
196 ///
197 /// for r in &results {
198 /// println!("{}: {:?}", r.name, r.output.output);
199 /// }
200 /// # Ok(())
201 /// # }
202 /// ```
203 pub async fn parallel(
204 &mut self,
205 steps: Vec<(&str, StepConfig)>,
206 fail_fast: bool,
207 ) -> Result<Vec<ParallelStepResult>, EngineError> {
208 if steps.is_empty() {
209 return Ok(Vec::new());
210 }
211
212 let wave_position = self.position;
213 self.position += 1;
214
215 let now = Utc::now();
216 let mut step_records: Vec<(Uuid, String, StepConfig)> = Vec::with_capacity(steps.len());
217
218 for (name, config) in &steps {
219 let kind = config.kind();
220 let step = self
221 .store
222 .create_step(NewStep {
223 run_id: self.run_id,
224 name: name.to_string(),
225 kind,
226 position: wave_position,
227 input: Some(serde_json::to_value(config)?),
228 })
229 .await?;
230
231 self.start_step(step.id, now).await?;
232
233 step_records.push((step.id, name.to_string(), config.clone()));
234 }
235
236 let mut join_set = JoinSet::new();
237 for (idx, (_id, _name, config)) in step_records.iter().enumerate() {
238 let provider = self.provider.clone();
239 let config = config.clone();
240 join_set.spawn(async move { (idx, execute_step_config(&config, &provider).await) });
241 }
242
243 // JoinSet returns in completion order; indexed_results restores input order.
244 let mut indexed_results: Vec<Option<Result<StepOutput, String>>> =
245 vec![None; step_records.len()];
246 let mut first_error: Option<EngineError> = None;
247
248 while let Some(join_result) = join_set.join_next().await {
249 let (idx, step_result) = match join_result {
250 Ok(r) => r,
251 Err(e) => {
252 if first_error.is_none() {
253 first_error = Some(EngineError::StepConfig(format!("join error: {e}")));
254 }
255 if fail_fast {
256 join_set.abort_all();
257 }
258 continue;
259 }
260 };
261
262 let (step_id, step_name, _) = &step_records[idx];
263 let completed_at = Utc::now();
264
265 match step_result {
266 Ok(output) => {
267 self.total_cost_usd += output.cost_usd;
268 self.total_duration_ms += output.duration_ms;
269
270 self.store
271 .update_step(
272 *step_id,
273 StepUpdate {
274 status: Some(StepStatus::Completed),
275 output: Some(output.output.clone()),
276 duration_ms: Some(output.duration_ms),
277 cost_usd: Some(output.cost_usd),
278 input_tokens: output.input_tokens,
279 output_tokens: output.output_tokens,
280 completed_at: Some(completed_at),
281 ..StepUpdate::default()
282 },
283 )
284 .await?;
285
286 info!(
287 run_id = %self.run_id,
288 step = %step_name,
289 duration_ms = output.duration_ms,
290 "parallel step completed"
291 );
292
293 indexed_results[idx] = Some(Ok(output));
294 }
295 Err(err) => {
296 let err_msg = err.to_string();
297
298 if let Err(store_err) = self
299 .store
300 .update_step(
301 *step_id,
302 StepUpdate {
303 status: Some(StepStatus::Failed),
304 error: Some(err_msg.clone()),
305 completed_at: Some(completed_at),
306 ..StepUpdate::default()
307 },
308 )
309 .await
310 {
311 tracing::error!(
312 step_id = %step_id,
313 error = %store_err,
314 "failed to persist parallel step failure"
315 );
316 }
317
318 indexed_results[idx] = Some(Err(err_msg.clone()));
319
320 if first_error.is_none() {
321 first_error = Some(err);
322 }
323
324 if fail_fast {
325 join_set.abort_all();
326 }
327 }
328 }
329 }
330
331 if let Some(err) = first_error {
332 return Err(err);
333 }
334
335 self.last_step_ids = step_records.iter().map(|(id, _, _)| *id).collect();
336
337 // Build results in original order.
338 let results: Vec<ParallelStepResult> = step_records
339 .iter()
340 .enumerate()
341 .map(|(idx, (step_id, name, _))| {
342 let output = match indexed_results[idx].take() {
343 Some(Ok(o)) => o,
344 _ => unreachable!("all steps succeeded if no error returned"),
345 };
346 ParallelStepResult {
347 name: name.clone(),
348 output,
349 step_id: *step_id,
350 }
351 })
352 .collect();
353
354 Ok(results)
355 }
356
357 /// Execute a shell step.
358 ///
359 /// Creates the step record, runs the command, persists the result,
360 /// and returns the output for use in subsequent steps.
361 ///
362 /// # Errors
363 ///
364 /// Returns [`EngineError`] if the command fails or the store errors.
365 ///
366 /// # Examples
367 ///
368 /// ```no_run
369 /// use ironflow_engine::context::WorkflowContext;
370 /// use ironflow_engine::config::ShellConfig;
371 /// use ironflow_engine::error::EngineError;
372 ///
373 /// # async fn example(ctx: &mut WorkflowContext) -> Result<(), EngineError> {
374 /// let files = ctx.shell("list", ShellConfig::new("ls -la")).await?;
375 /// println!("stdout: {}", files.output["stdout"]);
376 /// # Ok(())
377 /// # }
378 /// ```
379 pub async fn shell(
380 &mut self,
381 name: &str,
382 config: ShellConfig,
383 ) -> Result<StepOutput, EngineError> {
384 self.execute_step(name, StepKind::Shell, StepConfig::Shell(config))
385 .await
386 }
387
388 /// Execute an HTTP step.
389 ///
390 /// # Errors
391 ///
392 /// Returns [`EngineError`] if the request fails or the store errors.
393 ///
394 /// # Examples
395 ///
396 /// ```no_run
397 /// use ironflow_engine::context::WorkflowContext;
398 /// use ironflow_engine::config::HttpConfig;
399 /// use ironflow_engine::error::EngineError;
400 ///
401 /// # async fn example(ctx: &mut WorkflowContext) -> Result<(), EngineError> {
402 /// let resp = ctx.http("health", HttpConfig::get("https://api.example.com/health")).await?;
403 /// println!("status: {}", resp.output["status"]);
404 /// # Ok(())
405 /// # }
406 /// ```
407 pub async fn http(
408 &mut self,
409 name: &str,
410 config: HttpConfig,
411 ) -> Result<StepOutput, EngineError> {
412 self.execute_step(name, StepKind::Http, StepConfig::Http(config))
413 .await
414 }
415
416 /// Execute an agent step.
417 ///
418 /// # Errors
419 ///
420 /// Returns [`EngineError`] if the agent invocation fails or the store errors.
421 ///
422 /// # Examples
423 ///
424 /// ```no_run
425 /// use ironflow_engine::context::WorkflowContext;
426 /// use ironflow_engine::config::AgentStepConfig;
427 /// use ironflow_engine::error::EngineError;
428 ///
429 /// # async fn example(ctx: &mut WorkflowContext) -> Result<(), EngineError> {
430 /// let review = ctx.agent("review", AgentStepConfig::new("Review the code")).await?;
431 /// println!("review: {}", review.output["value"]);
432 /// # Ok(())
433 /// # }
434 /// ```
435 pub async fn agent(
436 &mut self,
437 name: &str,
438 config: AgentStepConfig,
439 ) -> Result<StepOutput, EngineError> {
440 self.execute_step(name, StepKind::Agent, StepConfig::Agent(config))
441 .await
442 }
443
444 /// Create a human approval gate.
445 ///
446 /// On first execution, records an approval step and returns
447 /// [`EngineError::ApprovalRequired`] to suspend the run. The engine
448 /// transitions the run to `AwaitingApproval`.
449 ///
450 /// On resume (after a human approved via the API), the approval step
451 /// is replayed: it is marked as `Completed` and execution continues
452 /// past it. Multiple approval gates in the same handler work -- each
453 /// one pauses and resumes independently.
454 ///
455 /// # Errors
456 ///
457 /// Returns [`EngineError::ApprovalRequired`] to pause the run on
458 /// first execution. Returns other [`EngineError`] variants on store
459 /// failures.
460 ///
461 /// # Examples
462 ///
463 /// ```no_run
464 /// use ironflow_engine::context::WorkflowContext;
465 /// use ironflow_engine::config::ApprovalConfig;
466 /// use ironflow_engine::error::EngineError;
467 ///
468 /// # async fn example(ctx: &mut WorkflowContext) -> Result<(), EngineError> {
469 /// ctx.approval("deploy-gate", ApprovalConfig::new("Approve deployment?")).await?;
470 /// // Execution continues here after approval
471 /// # Ok(())
472 /// # }
473 /// ```
474 pub async fn approval(
475 &mut self,
476 name: &str,
477 config: ApprovalConfig,
478 ) -> Result<(), EngineError> {
479 let position = self.position;
480 self.position += 1;
481
482 // Replay: if this approval step exists from a prior execution,
483 // the run was approved -- mark it completed (if not already) and continue.
484 if let Some(existing) = self.replay_steps.get(&position)
485 && existing.kind == StepKind::Approval
486 {
487 if existing.status.state == StepStatus::AwaitingApproval {
488 self.store
489 .update_step(
490 existing.id,
491 StepUpdate {
492 status: Some(StepStatus::Completed),
493 completed_at: Some(Utc::now()),
494 ..StepUpdate::default()
495 },
496 )
497 .await?;
498 }
499
500 self.last_step_ids = vec![existing.id];
501 info!(
502 run_id = %self.run_id,
503 step = %name,
504 position,
505 "approval step replayed (approved)"
506 );
507 return Ok(());
508 }
509
510 // First execution: create the approval step and suspend.
511 let step = self
512 .store
513 .create_step(NewStep {
514 run_id: self.run_id,
515 name: name.to_string(),
516 kind: StepKind::Approval,
517 position,
518 input: Some(serde_json::to_value(&config)?),
519 })
520 .await?;
521
522 self.start_step(step.id, Utc::now()).await?;
523
524 // Transition the step to AwaitingApproval so it reflects
525 // the suspended state on the dashboard.
526 self.store
527 .update_step(
528 step.id,
529 StepUpdate {
530 status: Some(StepStatus::AwaitingApproval),
531 ..StepUpdate::default()
532 },
533 )
534 .await?;
535
536 self.last_step_ids = vec![step.id];
537
538 Err(EngineError::ApprovalRequired {
539 run_id: self.run_id,
540 step_id: step.id,
541 message: config.message().to_string(),
542 })
543 }
544
545 /// Execute a custom operation step.
546 ///
547 /// Runs a user-defined [`Operation`] with full step lifecycle management:
548 /// creates the step record, transitions to Running, executes the operation,
549 /// persists the output and duration, and marks the step Completed or Failed.
550 ///
551 /// The operation's [`kind()`](Operation::kind) is stored as
552 /// [`StepKind::Custom`].
553 ///
554 /// # Errors
555 ///
556 /// Returns [`EngineError`] if the operation fails or the store errors.
557 ///
558 /// # Examples
559 ///
560 /// ```no_run
561 /// use ironflow_engine::context::WorkflowContext;
562 /// use ironflow_engine::operation::Operation;
563 /// use ironflow_engine::error::EngineError;
564 /// use serde_json::{Value, json};
565 /// use std::pin::Pin;
566 /// use std::future::Future;
567 ///
568 /// struct MyOp;
569 /// impl Operation for MyOp {
570 /// fn kind(&self) -> &str { "my-service" }
571 /// fn execute(&self) -> Pin<Box<dyn Future<Output = Result<Value, EngineError>> + Send + '_>> {
572 /// Box::pin(async { Ok(json!({"ok": true})) })
573 /// }
574 /// }
575 ///
576 /// # async fn example(ctx: &mut WorkflowContext) -> Result<(), EngineError> {
577 /// let result = ctx.operation("call-service", &MyOp).await?;
578 /// println!("output: {}", result.output);
579 /// # Ok(())
580 /// # }
581 /// ```
582 pub async fn operation(
583 &mut self,
584 name: &str,
585 op: &dyn Operation,
586 ) -> Result<StepOutput, EngineError> {
587 let kind = StepKind::Custom(op.kind().to_string());
588 let position = self.position;
589 self.position += 1;
590
591 let step = self
592 .store
593 .create_step(NewStep {
594 run_id: self.run_id,
595 name: name.to_string(),
596 kind,
597 position,
598 input: op.input(),
599 })
600 .await?;
601
602 self.start_step(step.id, Utc::now()).await?;
603
604 let start = Instant::now();
605
606 match op.execute().await {
607 Ok(output_value) => {
608 let duration_ms = start.elapsed().as_millis() as u64;
609 self.total_duration_ms += duration_ms;
610
611 let completed_at = Utc::now();
612 self.store
613 .update_step(
614 step.id,
615 StepUpdate {
616 status: Some(StepStatus::Completed),
617 output: Some(output_value.clone()),
618 duration_ms: Some(duration_ms),
619 cost_usd: Some(Decimal::ZERO),
620 completed_at: Some(completed_at),
621 ..StepUpdate::default()
622 },
623 )
624 .await?;
625
626 info!(
627 run_id = %self.run_id,
628 step = %name,
629 kind = op.kind(),
630 duration_ms,
631 "operation step completed"
632 );
633
634 self.last_step_ids = vec![step.id];
635
636 Ok(StepOutput {
637 output: output_value,
638 duration_ms,
639 cost_usd: Decimal::ZERO,
640 input_tokens: None,
641 output_tokens: None,
642 })
643 }
644 Err(err) => {
645 let completed_at = Utc::now();
646 if let Err(store_err) = self
647 .store
648 .update_step(
649 step.id,
650 StepUpdate {
651 status: Some(StepStatus::Failed),
652 error: Some(err.to_string()),
653 completed_at: Some(completed_at),
654 ..StepUpdate::default()
655 },
656 )
657 .await
658 {
659 error!(step_id = %step.id, error = %store_err, "failed to persist step failure");
660 }
661
662 Err(err)
663 }
664 }
665 }
666
667 /// Execute a sub-workflow step.
668 ///
669 /// Creates a child run for the named workflow handler, executes it with
670 /// its own steps and lifecycle, and returns a [`StepOutput`] containing
671 /// the child run ID and aggregated metrics.
672 ///
673 /// Requires the context to be created with
674 /// `with_handler_resolver`.
675 ///
676 /// # Errors
677 ///
678 /// Returns [`EngineError::InvalidWorkflow`] if no handler is registered
679 /// with the given name, or if no handler resolver is available.
680 ///
681 /// # Examples
682 ///
683 /// ```no_run
684 /// use ironflow_engine::context::WorkflowContext;
685 /// use ironflow_engine::error::EngineError;
686 /// use serde_json::json;
687 ///
688 /// # async fn example(ctx: &mut WorkflowContext) -> Result<(), EngineError> {
689 /// // let result = ctx.workflow(&MySubWorkflow, json!({})).await?;
690 /// # Ok(())
691 /// # }
692 /// ```
693 pub async fn workflow(
694 &mut self,
695 handler: &dyn WorkflowHandler,
696 payload: Value,
697 ) -> Result<StepOutput, EngineError> {
698 let config = WorkflowStepConfig::new(handler.name(), payload);
699 let position = self.position;
700 self.position += 1;
701
702 let step = self
703 .store
704 .create_step(NewStep {
705 run_id: self.run_id,
706 name: config.workflow_name.clone(),
707 kind: StepKind::Workflow,
708 position,
709 input: Some(serde_json::to_value(&config)?),
710 })
711 .await?;
712
713 self.start_step(step.id, Utc::now()).await?;
714
715 match self.execute_child_workflow(&config).await {
716 Ok(output) => {
717 self.total_cost_usd += output.cost_usd;
718 self.total_duration_ms += output.duration_ms;
719
720 let completed_at = Utc::now();
721 self.store
722 .update_step(
723 step.id,
724 StepUpdate {
725 status: Some(StepStatus::Completed),
726 output: Some(output.output.clone()),
727 duration_ms: Some(output.duration_ms),
728 cost_usd: Some(output.cost_usd),
729 completed_at: Some(completed_at),
730 ..StepUpdate::default()
731 },
732 )
733 .await?;
734
735 info!(
736 run_id = %self.run_id,
737 child_workflow = %config.workflow_name,
738 duration_ms = output.duration_ms,
739 "workflow step completed"
740 );
741
742 self.last_step_ids = vec![step.id];
743
744 Ok(output)
745 }
746 Err(err) => {
747 let completed_at = Utc::now();
748 if let Err(store_err) = self
749 .store
750 .update_step(
751 step.id,
752 StepUpdate {
753 status: Some(StepStatus::Failed),
754 error: Some(err.to_string()),
755 completed_at: Some(completed_at),
756 ..StepUpdate::default()
757 },
758 )
759 .await
760 {
761 error!(step_id = %step.id, error = %store_err, "failed to persist step failure");
762 }
763
764 Err(err)
765 }
766 }
767 }
768
769 /// Execute a child workflow and return aggregated output.
770 async fn execute_child_workflow(
771 &self,
772 config: &WorkflowStepConfig,
773 ) -> Result<StepOutput, EngineError> {
774 let resolver = self.handler_resolver.as_ref().ok_or_else(|| {
775 EngineError::InvalidWorkflow(
776 "sub-workflow requires a handler resolver (use Engine to execute)".to_string(),
777 )
778 })?;
779
780 let handler = resolver(&config.workflow_name).ok_or_else(|| {
781 EngineError::InvalidWorkflow(format!("no handler registered: {}", config.workflow_name))
782 })?;
783
784 let child_run = self
785 .store
786 .create_run(NewRun {
787 workflow_name: config.workflow_name.clone(),
788 trigger: TriggerKind::Workflow,
789 payload: config.payload.clone(),
790 max_retries: 0,
791 })
792 .await?;
793
794 let child_run_id = child_run.id;
795 info!(
796 parent_run_id = %self.run_id,
797 child_run_id = %child_run_id,
798 workflow = %config.workflow_name,
799 "child run created"
800 );
801
802 self.store
803 .update_run_status(child_run_id, RunStatus::Running)
804 .await?;
805
806 let run_start = Instant::now();
807 let mut child_ctx = WorkflowContext {
808 run_id: child_run_id,
809 store: self.store.clone(),
810 provider: self.provider.clone(),
811 handler_resolver: self.handler_resolver.clone(),
812 position: 0,
813 last_step_ids: Vec::new(),
814 total_cost_usd: Decimal::ZERO,
815 total_duration_ms: 0,
816 replay_steps: std::collections::HashMap::new(),
817 };
818
819 let result = handler.execute(&mut child_ctx).await;
820 let total_duration = run_start.elapsed().as_millis() as u64;
821 let completed_at = Utc::now();
822
823 match result {
824 Ok(()) => {
825 self.store
826 .update_run(
827 child_run_id,
828 RunUpdate {
829 status: Some(RunStatus::Completed),
830 cost_usd: Some(child_ctx.total_cost_usd),
831 duration_ms: Some(total_duration),
832 completed_at: Some(completed_at),
833 ..RunUpdate::default()
834 },
835 )
836 .await?;
837
838 Ok(StepOutput {
839 output: serde_json::json!({
840 "run_id": child_run_id,
841 "workflow_name": config.workflow_name,
842 "status": RunStatus::Completed,
843 "cost_usd": child_ctx.total_cost_usd,
844 "duration_ms": total_duration,
845 }),
846 duration_ms: total_duration,
847 cost_usd: child_ctx.total_cost_usd,
848 input_tokens: None,
849 output_tokens: None,
850 })
851 }
852 Err(err) => {
853 if let Err(store_err) = self
854 .store
855 .update_run(
856 child_run_id,
857 RunUpdate {
858 status: Some(RunStatus::Failed),
859 error: Some(err.to_string()),
860 cost_usd: Some(child_ctx.total_cost_usd),
861 duration_ms: Some(total_duration),
862 completed_at: Some(completed_at),
863 ..RunUpdate::default()
864 },
865 )
866 .await
867 {
868 error!(
869 child_run_id = %child_run_id,
870 store_error = %store_err,
871 "failed to persist child run failure"
872 );
873 }
874
875 Err(err)
876 }
877 }
878 }
879
880 /// Try to replay a completed step from a previous execution.
881 ///
882 /// Returns `Some(StepOutput)` if a completed step exists at the given
883 /// position, `None` otherwise.
884 fn try_replay_step(&mut self, position: u32) -> Option<StepOutput> {
885 let step = self.replay_steps.get(&position)?;
886 if step.status.state != StepStatus::Completed {
887 return None;
888 }
889 let output = StepOutput {
890 output: step.output.clone().unwrap_or(Value::Null),
891 duration_ms: step.duration_ms,
892 cost_usd: step.cost_usd,
893 input_tokens: step.input_tokens,
894 output_tokens: step.output_tokens,
895 };
896 self.total_cost_usd += output.cost_usd;
897 self.total_duration_ms += output.duration_ms;
898 self.last_step_ids = vec![step.id];
899 info!(
900 run_id = %self.run_id,
901 step = %step.name,
902 position,
903 "step replayed from previous execution"
904 );
905 Some(output)
906 }
907
908 /// Internal: execute a step with full persistence lifecycle.
909 async fn execute_step(
910 &mut self,
911 name: &str,
912 kind: StepKind,
913 config: StepConfig,
914 ) -> Result<StepOutput, EngineError> {
915 let position = self.position;
916 self.position += 1;
917
918 // Replay: if this step already completed in a prior execution, return cached output.
919 if let Some(output) = self.try_replay_step(position) {
920 return Ok(output);
921 }
922
923 // Create step record in Pending.
924 let step = self
925 .store
926 .create_step(NewStep {
927 run_id: self.run_id,
928 name: name.to_string(),
929 kind,
930 position,
931 input: Some(serde_json::to_value(&config)?),
932 })
933 .await?;
934
935 self.start_step(step.id, Utc::now()).await?;
936
937 match execute_step_config(&config, &self.provider).await {
938 Ok(output) => {
939 self.total_cost_usd += output.cost_usd;
940 self.total_duration_ms += output.duration_ms;
941
942 let completed_at = Utc::now();
943 self.store
944 .update_step(
945 step.id,
946 StepUpdate {
947 status: Some(StepStatus::Completed),
948 output: Some(output.output.clone()),
949 duration_ms: Some(output.duration_ms),
950 cost_usd: Some(output.cost_usd),
951 input_tokens: output.input_tokens,
952 output_tokens: output.output_tokens,
953 completed_at: Some(completed_at),
954 ..StepUpdate::default()
955 },
956 )
957 .await?;
958
959 info!(
960 run_id = %self.run_id,
961 step = %name,
962 duration_ms = output.duration_ms,
963 "step completed"
964 );
965
966 self.last_step_ids = vec![step.id];
967
968 Ok(output)
969 }
970 Err(err) => {
971 let completed_at = Utc::now();
972 if let Err(store_err) = self
973 .store
974 .update_step(
975 step.id,
976 StepUpdate {
977 status: Some(StepStatus::Failed),
978 error: Some(err.to_string()),
979 completed_at: Some(completed_at),
980 ..StepUpdate::default()
981 },
982 )
983 .await
984 {
985 tracing::error!(step_id = %step.id, error = %store_err, "failed to persist step failure");
986 }
987
988 Err(err)
989 }
990 }
991 }
992
993 /// Record dependency edges and transition a step to Running.
994 ///
995 /// Records edges from `step_id` to all `last_step_ids`, then
996 /// transitions the step to `Running` with the given timestamp.
997 async fn start_step(&self, step_id: Uuid, now: DateTime<Utc>) -> Result<(), EngineError> {
998 if !self.last_step_ids.is_empty() {
999 let deps: Vec<NewStepDependency> = self
1000 .last_step_ids
1001 .iter()
1002 .map(|&depends_on| NewStepDependency {
1003 step_id,
1004 depends_on,
1005 })
1006 .collect();
1007 self.store.create_step_dependencies(deps).await?;
1008 }
1009
1010 self.store
1011 .update_step(
1012 step_id,
1013 StepUpdate {
1014 status: Some(StepStatus::Running),
1015 started_at: Some(now),
1016 ..StepUpdate::default()
1017 },
1018 )
1019 .await?;
1020
1021 Ok(())
1022 }
1023
1024 /// Access the store directly (advanced usage).
1025 pub fn store(&self) -> &Arc<dyn RunStore> {
1026 &self.store
1027 }
1028
1029 /// Access the payload that triggered this run.
1030 ///
1031 /// Fetches the run from the store and returns its payload.
1032 ///
1033 /// # Errors
1034 ///
1035 /// Returns [`EngineError::Store`] if the run is not found.
1036 pub async fn payload(&self) -> Result<Value, EngineError> {
1037 let run = self
1038 .store
1039 .get_run(self.run_id)
1040 .await?
1041 .ok_or(EngineError::Store(
1042 ironflow_store::error::StoreError::RunNotFound(self.run_id),
1043 ))?;
1044 Ok(run.payload)
1045 }
1046}
1047
1048impl fmt::Debug for WorkflowContext {
1049 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1050 f.debug_struct("WorkflowContext")
1051 .field("run_id", &self.run_id)
1052 .field("position", &self.position)
1053 .field("total_cost_usd", &self.total_cost_usd)
1054 .finish_non_exhaustive()
1055 }
1056}