ironflow_engine/context.rs
1//! [`WorkflowContext`] — execution context for dynamic workflows.
2//!
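//! Provides step execution methods that automatically persist results to the
//! store. Each call to [`shell`](WorkflowContext::shell),
//! [`http`](WorkflowContext::http), [`agent`](WorkflowContext::agent),
//! [`operation`](WorkflowContext::operation), or
//! [`workflow`](WorkflowContext::workflow) creates a step record, executes the
//! operation, captures the output, and returns a [`StepOutput`] that the next
//! step can reference. [`parallel`](WorkflowContext::parallel) runs a batch of
//! steps concurrently as one execution wave, and
//! [`approval`](WorkflowContext::approval) suspends the run until a human
//! approves it.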
9//!
10//! # Examples
11//!
12//! ```no_run
13//! use ironflow_engine::context::WorkflowContext;
14//! use ironflow_engine::config::{ShellConfig, AgentStepConfig};
15//! use ironflow_engine::error::EngineError;
16//!
17//! # async fn example(ctx: &mut WorkflowContext) -> Result<(), EngineError> {
18//! let build = ctx.shell("build", ShellConfig::new("cargo build")).await?;
19//! let review = ctx.agent("review", AgentStepConfig::new(
20//! &format!("Build output:\n{}", build.output["stdout"])
21//! )).await?;
22//! # Ok(())
23//! # }
24//! ```
25
26use std::fmt;
27use std::sync::Arc;
28use std::time::Instant;
29
30use chrono::{DateTime, Utc};
31use rust_decimal::Decimal;
32use serde_json::Value;
33use tokio::task::JoinSet;
34use tracing::{error, info};
35use uuid::Uuid;
36
37use ironflow_core::provider::AgentProvider;
38use ironflow_store::models::{
39 NewRun, NewStep, NewStepDependency, RunStatus, RunUpdate, Step, StepKind, StepStatus,
40 StepUpdate, TriggerKind,
41};
42use ironflow_store::store::RunStore;
43
44use crate::config::{
45 AgentStepConfig, ApprovalConfig, HttpConfig, ShellConfig, StepConfig, WorkflowStepConfig,
46};
47use crate::error::EngineError;
48use crate::executor::{ParallelStepResult, StepOutput, execute_step_config};
49use crate::handler::WorkflowHandler;
50use crate::operation::Operation;
51
/// Callback type for resolving workflow handlers by name.
///
/// Returns `None` when no handler is registered under the given name; a
/// sub-workflow step then fails with [`EngineError::InvalidWorkflow`].
/// The resolver is cloned into child contexts so nested sub-workflows can
/// resolve handlers too.
pub(crate) type HandlerResolver =
    Arc<dyn Fn(&str) -> Option<Arc<dyn WorkflowHandler>> + Send + Sync>;
55
/// Execution context for a single workflow run.
///
/// Tracks the current step position and provides convenience methods
/// for executing operations with automatic persistence.
///
/// Each step method claims the next position (a [`parallel`](Self::parallel)
/// batch claims a single position for the whole wave) and records dependency
/// edges on the previously executed step(s). Steps that report cost or
/// duration add them to the run totals.
///
/// # Examples
///
/// ```no_run
/// use ironflow_engine::context::WorkflowContext;
/// use ironflow_engine::config::ShellConfig;
/// use ironflow_engine::error::EngineError;
///
/// # async fn example(ctx: &mut WorkflowContext) -> Result<(), EngineError> {
/// let result = ctx.shell("greet", ShellConfig::new("echo hello")).await?;
/// assert!(result.output["stdout"].as_str().unwrap().contains("hello"));
/// # Ok(())
/// # }
/// ```
pub struct WorkflowContext {
    /// Run whose steps this context records.
    run_id: Uuid,
    /// Persistence backend for steps, dependency edges, and child runs.
    store: Arc<dyn RunStore>,
    /// Agent provider passed to every step executor.
    provider: Arc<dyn AgentProvider>,
    /// Resolves sub-workflow handlers by name; `None` makes
    /// [`workflow`](Self::workflow) steps fail with `InvalidWorkflow`.
    handler_resolver: Option<HandlerResolver>,
    /// Position assigned to the next step (incremented once per step or
    /// parallel wave).
    position: u32,
    /// IDs of the last executed step(s) -- used to record DAG dependencies.
    last_step_ids: Vec<Uuid>,
    /// Accumulated cost across all steps in this run.
    total_cost_usd: Decimal,
    /// Accumulated duration across all steps.
    total_duration_ms: u64,
    /// Steps from a previous execution, keyed by position.
    /// Used when resuming after approval to replay completed steps.
    replay_steps: std::collections::HashMap<u32, Step>,
}
90
91impl WorkflowContext {
92 /// Create a new context for a run.
93 ///
94 /// Not typically called directly — the [`Engine`](crate::engine::Engine)
95 /// creates this when executing a [`WorkflowHandler`].
96 pub fn new(run_id: Uuid, store: Arc<dyn RunStore>, provider: Arc<dyn AgentProvider>) -> Self {
97 Self {
98 run_id,
99 store,
100 provider,
101 handler_resolver: None,
102 position: 0,
103 last_step_ids: Vec::new(),
104 total_cost_usd: Decimal::ZERO,
105 total_duration_ms: 0,
106 replay_steps: std::collections::HashMap::new(),
107 }
108 }
109
110 /// Create a new context with a handler resolver for sub-workflow support.
111 ///
112 /// The resolver is called when [`workflow`](Self::workflow) is invoked to
113 /// look up registered handlers by name.
114 pub(crate) fn with_handler_resolver(
115 run_id: Uuid,
116 store: Arc<dyn RunStore>,
117 provider: Arc<dyn AgentProvider>,
118 resolver: HandlerResolver,
119 ) -> Self {
120 Self {
121 run_id,
122 store,
123 provider,
124 handler_resolver: Some(resolver),
125 position: 0,
126 last_step_ids: Vec::new(),
127 total_cost_usd: Decimal::ZERO,
128 total_duration_ms: 0,
129 replay_steps: std::collections::HashMap::new(),
130 }
131 }
132
133 /// Load existing steps from the store for replay after approval.
134 ///
135 /// Called by the engine when resuming a run. All completed steps
136 /// and the approved approval step are indexed by position so that
137 /// `execute_step` and `approval` can skip them.
138 pub(crate) async fn load_replay_steps(&mut self) -> Result<(), EngineError> {
139 let steps = self.store.list_steps(self.run_id).await?;
140 for step in steps {
141 let dominated = matches!(
142 step.status.state,
143 StepStatus::Completed | StepStatus::Running
144 );
145 if dominated {
146 self.replay_steps.insert(step.position, step);
147 }
148 }
149 Ok(())
150 }
151
152 /// The run ID this context is executing for.
153 pub fn run_id(&self) -> Uuid {
154 self.run_id
155 }
156
157 /// Accumulated cost across all executed steps so far.
158 pub fn total_cost_usd(&self) -> Decimal {
159 self.total_cost_usd
160 }
161
162 /// Accumulated duration across all executed steps so far.
163 pub fn total_duration_ms(&self) -> u64 {
164 self.total_duration_ms
165 }
166
167 /// Execute multiple steps concurrently (wait-all model).
168 ///
169 /// All steps in the batch execute in parallel via `tokio::JoinSet`.
170 /// Each step is recorded with the same `position` (execution wave).
171 /// Dependencies on previous steps are recorded automatically.
172 ///
173 /// When `fail_fast` is true, remaining steps are aborted on the first
174 /// failure. When false, all steps run to completion and the first
175 /// error is returned.
176 ///
177 /// # Errors
178 ///
179 /// Returns [`EngineError`] if any step fails.
180 ///
181 /// # Examples
182 ///
183 /// ```no_run
184 /// use ironflow_engine::context::WorkflowContext;
185 /// use ironflow_engine::config::{StepConfig, ShellConfig};
186 /// use ironflow_engine::error::EngineError;
187 ///
188 /// # async fn example(ctx: &mut WorkflowContext) -> Result<(), EngineError> {
189 /// let results = ctx.parallel(
190 /// vec![
191 /// ("test-unit", StepConfig::Shell(ShellConfig::new("cargo test --lib"))),
192 /// ("lint", StepConfig::Shell(ShellConfig::new("cargo clippy"))),
193 /// ],
194 /// true,
195 /// ).await?;
196 ///
197 /// for r in &results {
198 /// println!("{}: {:?}", r.name, r.output.output);
199 /// }
200 /// # Ok(())
201 /// # }
202 /// ```
203 pub async fn parallel(
204 &mut self,
205 steps: Vec<(&str, StepConfig)>,
206 fail_fast: bool,
207 ) -> Result<Vec<ParallelStepResult>, EngineError> {
208 if steps.is_empty() {
209 return Ok(Vec::new());
210 }
211
212 let wave_position = self.position;
213 self.position += 1;
214
215 let now = Utc::now();
216 let mut step_records: Vec<(Uuid, String, StepConfig)> = Vec::with_capacity(steps.len());
217
218 for (name, config) in &steps {
219 let kind = config.kind();
220 let step = self
221 .store
222 .create_step(NewStep {
223 run_id: self.run_id,
224 name: name.to_string(),
225 kind,
226 position: wave_position,
227 input: Some(serde_json::to_value(config)?),
228 })
229 .await?;
230
231 self.start_step(step.id, now).await?;
232
233 step_records.push((step.id, name.to_string(), config.clone()));
234 }
235
236 let mut join_set = JoinSet::new();
237 for (idx, (_id, _name, config)) in step_records.iter().enumerate() {
238 let provider = self.provider.clone();
239 let config = config.clone();
240 join_set.spawn(async move { (idx, execute_step_config(&config, &provider).await) });
241 }
242
243 // JoinSet returns in completion order; indexed_results restores input order.
244 let mut indexed_results: Vec<Option<Result<StepOutput, String>>> =
245 vec![None; step_records.len()];
246 let mut first_error: Option<EngineError> = None;
247
248 while let Some(join_result) = join_set.join_next().await {
249 let (idx, step_result) = match join_result {
250 Ok(r) => r,
251 Err(e) => {
252 if first_error.is_none() {
253 first_error = Some(EngineError::StepConfig(format!("join error: {e}")));
254 }
255 if fail_fast {
256 join_set.abort_all();
257 }
258 continue;
259 }
260 };
261
262 let (step_id, step_name, _) = &step_records[idx];
263 let completed_at = Utc::now();
264
265 match step_result {
266 Ok(output) => {
267 self.total_cost_usd += output.cost_usd;
268 self.total_duration_ms += output.duration_ms;
269
270 self.store
271 .update_step(
272 *step_id,
273 StepUpdate {
274 status: Some(StepStatus::Completed),
275 output: Some(output.output.clone()),
276 duration_ms: Some(output.duration_ms),
277 cost_usd: Some(output.cost_usd),
278 input_tokens: output.input_tokens,
279 output_tokens: output.output_tokens,
280 completed_at: Some(completed_at),
281 ..StepUpdate::default()
282 },
283 )
284 .await?;
285
286 info!(
287 run_id = %self.run_id,
288 step = %step_name,
289 duration_ms = output.duration_ms,
290 "parallel step completed"
291 );
292
293 indexed_results[idx] = Some(Ok(output));
294 }
295 Err(err) => {
296 let err_msg = err.to_string();
297
298 if let Err(store_err) = self
299 .store
300 .update_step(
301 *step_id,
302 StepUpdate {
303 status: Some(StepStatus::Failed),
304 error: Some(err_msg.clone()),
305 completed_at: Some(completed_at),
306 ..StepUpdate::default()
307 },
308 )
309 .await
310 {
311 tracing::error!(
312 step_id = %step_id,
313 error = %store_err,
314 "failed to persist parallel step failure"
315 );
316 }
317
318 indexed_results[idx] = Some(Err(err_msg.clone()));
319
320 if first_error.is_none() {
321 first_error = Some(err);
322 }
323
324 if fail_fast {
325 join_set.abort_all();
326 }
327 }
328 }
329 }
330
331 if let Some(err) = first_error {
332 return Err(err);
333 }
334
335 self.last_step_ids = step_records.iter().map(|(id, _, _)| *id).collect();
336
337 // Build results in original order.
338 let results: Vec<ParallelStepResult> = step_records
339 .iter()
340 .enumerate()
341 .map(|(idx, (step_id, name, _))| {
342 let output = match indexed_results[idx].take() {
343 Some(Ok(o)) => o,
344 _ => unreachable!("all steps succeeded if no error returned"),
345 };
346 ParallelStepResult {
347 name: name.clone(),
348 output,
349 step_id: *step_id,
350 }
351 })
352 .collect();
353
354 Ok(results)
355 }
356
357 /// Execute a shell step.
358 ///
359 /// Creates the step record, runs the command, persists the result,
360 /// and returns the output for use in subsequent steps.
361 ///
362 /// # Errors
363 ///
364 /// Returns [`EngineError`] if the command fails or the store errors.
365 ///
366 /// # Examples
367 ///
368 /// ```no_run
369 /// use ironflow_engine::context::WorkflowContext;
370 /// use ironflow_engine::config::ShellConfig;
371 /// use ironflow_engine::error::EngineError;
372 ///
373 /// # async fn example(ctx: &mut WorkflowContext) -> Result<(), EngineError> {
374 /// let files = ctx.shell("list", ShellConfig::new("ls -la")).await?;
375 /// println!("stdout: {}", files.output["stdout"]);
376 /// # Ok(())
377 /// # }
378 /// ```
379 pub async fn shell(
380 &mut self,
381 name: &str,
382 config: ShellConfig,
383 ) -> Result<StepOutput, EngineError> {
384 self.execute_step(name, StepKind::Shell, StepConfig::Shell(config))
385 .await
386 }
387
388 /// Execute an HTTP step.
389 ///
390 /// # Errors
391 ///
392 /// Returns [`EngineError`] if the request fails or the store errors.
393 ///
394 /// # Examples
395 ///
396 /// ```no_run
397 /// use ironflow_engine::context::WorkflowContext;
398 /// use ironflow_engine::config::HttpConfig;
399 /// use ironflow_engine::error::EngineError;
400 ///
401 /// # async fn example(ctx: &mut WorkflowContext) -> Result<(), EngineError> {
402 /// let resp = ctx.http("health", HttpConfig::get("https://api.example.com/health")).await?;
403 /// println!("status: {}", resp.output["status"]);
404 /// # Ok(())
405 /// # }
406 /// ```
407 pub async fn http(
408 &mut self,
409 name: &str,
410 config: HttpConfig,
411 ) -> Result<StepOutput, EngineError> {
412 self.execute_step(name, StepKind::Http, StepConfig::Http(config))
413 .await
414 }
415
416 /// Execute an agent step.
417 ///
418 /// # Errors
419 ///
420 /// Returns [`EngineError`] if the agent invocation fails or the store errors.
421 ///
422 /// # Examples
423 ///
424 /// ```no_run
425 /// use ironflow_engine::context::WorkflowContext;
426 /// use ironflow_engine::config::AgentStepConfig;
427 /// use ironflow_engine::error::EngineError;
428 ///
429 /// # async fn example(ctx: &mut WorkflowContext) -> Result<(), EngineError> {
430 /// let review = ctx.agent("review", AgentStepConfig::new("Review the code")).await?;
431 /// println!("review: {}", review.output["value"]);
432 /// # Ok(())
433 /// # }
434 /// ```
435 pub async fn agent(
436 &mut self,
437 name: &str,
438 config: AgentStepConfig,
439 ) -> Result<StepOutput, EngineError> {
440 self.execute_step(name, StepKind::Agent, StepConfig::Agent(config))
441 .await
442 }
443
444 /// Create a human approval gate.
445 ///
446 /// On first execution, records an approval step and returns
447 /// [`EngineError::ApprovalRequired`] to suspend the run. The engine
448 /// transitions the run to `AwaitingApproval`.
449 ///
450 /// On resume (after a human approved via the API), the approval step
451 /// is replayed: it is marked as `Completed` and execution continues
452 /// past it. Multiple approval gates in the same handler work -- each
453 /// one pauses and resumes independently.
454 ///
455 /// # Errors
456 ///
457 /// Returns [`EngineError::ApprovalRequired`] to pause the run on
458 /// first execution. Returns other [`EngineError`] variants on store
459 /// failures.
460 ///
461 /// # Examples
462 ///
463 /// ```no_run
464 /// use ironflow_engine::context::WorkflowContext;
465 /// use ironflow_engine::config::ApprovalConfig;
466 /// use ironflow_engine::error::EngineError;
467 ///
468 /// # async fn example(ctx: &mut WorkflowContext) -> Result<(), EngineError> {
469 /// ctx.approval("deploy-gate", ApprovalConfig::new("Approve deployment?")).await?;
470 /// // Execution continues here after approval
471 /// # Ok(())
472 /// # }
473 /// ```
474 pub async fn approval(
475 &mut self,
476 name: &str,
477 config: ApprovalConfig,
478 ) -> Result<(), EngineError> {
479 let position = self.position;
480 self.position += 1;
481
482 // Replay: if this approval step exists from a prior execution,
483 // the run was approved -- mark it completed (if not already) and continue.
484 if let Some(existing) = self.replay_steps.get(&position)
485 && existing.kind == StepKind::Approval
486 {
487 if existing.status.state == StepStatus::Running {
488 self.store
489 .update_step(
490 existing.id,
491 StepUpdate {
492 status: Some(StepStatus::Completed),
493 completed_at: Some(Utc::now()),
494 ..StepUpdate::default()
495 },
496 )
497 .await?;
498 }
499
500 self.last_step_ids = vec![existing.id];
501 info!(
502 run_id = %self.run_id,
503 step = %name,
504 position,
505 "approval step replayed (approved)"
506 );
507 return Ok(());
508 }
509
510 // First execution: create the approval step and suspend.
511 let step = self
512 .store
513 .create_step(NewStep {
514 run_id: self.run_id,
515 name: name.to_string(),
516 kind: StepKind::Approval,
517 position,
518 input: Some(serde_json::to_value(&config)?),
519 })
520 .await?;
521
522 self.start_step(step.id, Utc::now()).await?;
523
524 self.last_step_ids = vec![step.id];
525
526 Err(EngineError::ApprovalRequired {
527 run_id: self.run_id,
528 step_id: step.id,
529 message: config.message().to_string(),
530 })
531 }
532
533 /// Execute a custom operation step.
534 ///
535 /// Runs a user-defined [`Operation`] with full step lifecycle management:
536 /// creates the step record, transitions to Running, executes the operation,
537 /// persists the output and duration, and marks the step Completed or Failed.
538 ///
539 /// The operation's [`kind()`](Operation::kind) is stored as
540 /// [`StepKind::Custom`].
541 ///
542 /// # Errors
543 ///
544 /// Returns [`EngineError`] if the operation fails or the store errors.
545 ///
546 /// # Examples
547 ///
548 /// ```no_run
549 /// use ironflow_engine::context::WorkflowContext;
550 /// use ironflow_engine::operation::Operation;
551 /// use ironflow_engine::error::EngineError;
552 /// use serde_json::{Value, json};
553 /// use std::pin::Pin;
554 /// use std::future::Future;
555 ///
556 /// struct MyOp;
557 /// impl Operation for MyOp {
558 /// fn kind(&self) -> &str { "my-service" }
559 /// fn execute(&self) -> Pin<Box<dyn Future<Output = Result<Value, EngineError>> + Send + '_>> {
560 /// Box::pin(async { Ok(json!({"ok": true})) })
561 /// }
562 /// }
563 ///
564 /// # async fn example(ctx: &mut WorkflowContext) -> Result<(), EngineError> {
565 /// let result = ctx.operation("call-service", &MyOp).await?;
566 /// println!("output: {}", result.output);
567 /// # Ok(())
568 /// # }
569 /// ```
570 pub async fn operation(
571 &mut self,
572 name: &str,
573 op: &dyn Operation,
574 ) -> Result<StepOutput, EngineError> {
575 let kind = StepKind::Custom(op.kind().to_string());
576 let position = self.position;
577 self.position += 1;
578
579 let step = self
580 .store
581 .create_step(NewStep {
582 run_id: self.run_id,
583 name: name.to_string(),
584 kind,
585 position,
586 input: op.input(),
587 })
588 .await?;
589
590 self.start_step(step.id, Utc::now()).await?;
591
592 let start = Instant::now();
593
594 match op.execute().await {
595 Ok(output_value) => {
596 let duration_ms = start.elapsed().as_millis() as u64;
597 self.total_duration_ms += duration_ms;
598
599 let completed_at = Utc::now();
600 self.store
601 .update_step(
602 step.id,
603 StepUpdate {
604 status: Some(StepStatus::Completed),
605 output: Some(output_value.clone()),
606 duration_ms: Some(duration_ms),
607 cost_usd: Some(Decimal::ZERO),
608 completed_at: Some(completed_at),
609 ..StepUpdate::default()
610 },
611 )
612 .await?;
613
614 info!(
615 run_id = %self.run_id,
616 step = %name,
617 kind = op.kind(),
618 duration_ms,
619 "operation step completed"
620 );
621
622 self.last_step_ids = vec![step.id];
623
624 Ok(StepOutput {
625 output: output_value,
626 duration_ms,
627 cost_usd: Decimal::ZERO,
628 input_tokens: None,
629 output_tokens: None,
630 })
631 }
632 Err(err) => {
633 let completed_at = Utc::now();
634 if let Err(store_err) = self
635 .store
636 .update_step(
637 step.id,
638 StepUpdate {
639 status: Some(StepStatus::Failed),
640 error: Some(err.to_string()),
641 completed_at: Some(completed_at),
642 ..StepUpdate::default()
643 },
644 )
645 .await
646 {
647 error!(step_id = %step.id, error = %store_err, "failed to persist step failure");
648 }
649
650 Err(err)
651 }
652 }
653 }
654
655 /// Execute a sub-workflow step.
656 ///
657 /// Creates a child run for the named workflow handler, executes it with
658 /// its own steps and lifecycle, and returns a [`StepOutput`] containing
659 /// the child run ID and aggregated metrics.
660 ///
661 /// Requires the context to be created with
662 /// `with_handler_resolver`.
663 ///
664 /// # Errors
665 ///
666 /// Returns [`EngineError::InvalidWorkflow`] if no handler is registered
667 /// with the given name, or if no handler resolver is available.
668 ///
669 /// # Examples
670 ///
671 /// ```no_run
672 /// use ironflow_engine::context::WorkflowContext;
673 /// use ironflow_engine::error::EngineError;
674 /// use serde_json::json;
675 ///
676 /// # async fn example(ctx: &mut WorkflowContext) -> Result<(), EngineError> {
677 /// // let result = ctx.workflow(&MySubWorkflow, json!({})).await?;
678 /// # Ok(())
679 /// # }
680 /// ```
681 pub async fn workflow(
682 &mut self,
683 handler: &dyn WorkflowHandler,
684 payload: Value,
685 ) -> Result<StepOutput, EngineError> {
686 let config = WorkflowStepConfig::new(handler.name(), payload);
687 let position = self.position;
688 self.position += 1;
689
690 let step = self
691 .store
692 .create_step(NewStep {
693 run_id: self.run_id,
694 name: config.workflow_name.clone(),
695 kind: StepKind::Workflow,
696 position,
697 input: Some(serde_json::to_value(&config)?),
698 })
699 .await?;
700
701 self.start_step(step.id, Utc::now()).await?;
702
703 match self.execute_child_workflow(&config).await {
704 Ok(output) => {
705 self.total_cost_usd += output.cost_usd;
706 self.total_duration_ms += output.duration_ms;
707
708 let completed_at = Utc::now();
709 self.store
710 .update_step(
711 step.id,
712 StepUpdate {
713 status: Some(StepStatus::Completed),
714 output: Some(output.output.clone()),
715 duration_ms: Some(output.duration_ms),
716 cost_usd: Some(output.cost_usd),
717 completed_at: Some(completed_at),
718 ..StepUpdate::default()
719 },
720 )
721 .await?;
722
723 info!(
724 run_id = %self.run_id,
725 child_workflow = %config.workflow_name,
726 duration_ms = output.duration_ms,
727 "workflow step completed"
728 );
729
730 self.last_step_ids = vec![step.id];
731
732 Ok(output)
733 }
734 Err(err) => {
735 let completed_at = Utc::now();
736 if let Err(store_err) = self
737 .store
738 .update_step(
739 step.id,
740 StepUpdate {
741 status: Some(StepStatus::Failed),
742 error: Some(err.to_string()),
743 completed_at: Some(completed_at),
744 ..StepUpdate::default()
745 },
746 )
747 .await
748 {
749 error!(step_id = %step.id, error = %store_err, "failed to persist step failure");
750 }
751
752 Err(err)
753 }
754 }
755 }
756
757 /// Execute a child workflow and return aggregated output.
758 async fn execute_child_workflow(
759 &self,
760 config: &WorkflowStepConfig,
761 ) -> Result<StepOutput, EngineError> {
762 let resolver = self.handler_resolver.as_ref().ok_or_else(|| {
763 EngineError::InvalidWorkflow(
764 "sub-workflow requires a handler resolver (use Engine to execute)".to_string(),
765 )
766 })?;
767
768 let handler = resolver(&config.workflow_name).ok_or_else(|| {
769 EngineError::InvalidWorkflow(format!("no handler registered: {}", config.workflow_name))
770 })?;
771
772 let child_run = self
773 .store
774 .create_run(NewRun {
775 workflow_name: config.workflow_name.clone(),
776 trigger: TriggerKind::Workflow,
777 payload: config.payload.clone(),
778 max_retries: 0,
779 })
780 .await?;
781
782 let child_run_id = child_run.id;
783 info!(
784 parent_run_id = %self.run_id,
785 child_run_id = %child_run_id,
786 workflow = %config.workflow_name,
787 "child run created"
788 );
789
790 self.store
791 .update_run_status(child_run_id, RunStatus::Running)
792 .await?;
793
794 let run_start = Instant::now();
795 let mut child_ctx = WorkflowContext {
796 run_id: child_run_id,
797 store: self.store.clone(),
798 provider: self.provider.clone(),
799 handler_resolver: self.handler_resolver.clone(),
800 position: 0,
801 last_step_ids: Vec::new(),
802 total_cost_usd: Decimal::ZERO,
803 total_duration_ms: 0,
804 replay_steps: std::collections::HashMap::new(),
805 };
806
807 let result = handler.execute(&mut child_ctx).await;
808 let total_duration = run_start.elapsed().as_millis() as u64;
809 let completed_at = Utc::now();
810
811 match result {
812 Ok(()) => {
813 self.store
814 .update_run(
815 child_run_id,
816 RunUpdate {
817 status: Some(RunStatus::Completed),
818 cost_usd: Some(child_ctx.total_cost_usd),
819 duration_ms: Some(total_duration),
820 completed_at: Some(completed_at),
821 ..RunUpdate::default()
822 },
823 )
824 .await?;
825
826 Ok(StepOutput {
827 output: serde_json::json!({
828 "run_id": child_run_id,
829 "workflow_name": config.workflow_name,
830 "status": RunStatus::Completed,
831 "cost_usd": child_ctx.total_cost_usd,
832 "duration_ms": total_duration,
833 }),
834 duration_ms: total_duration,
835 cost_usd: child_ctx.total_cost_usd,
836 input_tokens: None,
837 output_tokens: None,
838 })
839 }
840 Err(err) => {
841 if let Err(store_err) = self
842 .store
843 .update_run(
844 child_run_id,
845 RunUpdate {
846 status: Some(RunStatus::Failed),
847 error: Some(err.to_string()),
848 cost_usd: Some(child_ctx.total_cost_usd),
849 duration_ms: Some(total_duration),
850 completed_at: Some(completed_at),
851 ..RunUpdate::default()
852 },
853 )
854 .await
855 {
856 error!(
857 child_run_id = %child_run_id,
858 store_error = %store_err,
859 "failed to persist child run failure"
860 );
861 }
862
863 Err(err)
864 }
865 }
866 }
867
868 /// Try to replay a completed step from a previous execution.
869 ///
870 /// Returns `Some(StepOutput)` if a completed step exists at the given
871 /// position, `None` otherwise.
872 fn try_replay_step(&mut self, position: u32) -> Option<StepOutput> {
873 let step = self.replay_steps.get(&position)?;
874 if step.status.state != StepStatus::Completed {
875 return None;
876 }
877 let output = StepOutput {
878 output: step.output.clone().unwrap_or(Value::Null),
879 duration_ms: step.duration_ms,
880 cost_usd: step.cost_usd,
881 input_tokens: step.input_tokens,
882 output_tokens: step.output_tokens,
883 };
884 self.total_cost_usd += output.cost_usd;
885 self.total_duration_ms += output.duration_ms;
886 self.last_step_ids = vec![step.id];
887 info!(
888 run_id = %self.run_id,
889 step = %step.name,
890 position,
891 "step replayed from previous execution"
892 );
893 Some(output)
894 }
895
896 /// Internal: execute a step with full persistence lifecycle.
897 async fn execute_step(
898 &mut self,
899 name: &str,
900 kind: StepKind,
901 config: StepConfig,
902 ) -> Result<StepOutput, EngineError> {
903 let position = self.position;
904 self.position += 1;
905
906 // Replay: if this step already completed in a prior execution, return cached output.
907 if let Some(output) = self.try_replay_step(position) {
908 return Ok(output);
909 }
910
911 // Create step record in Pending.
912 let step = self
913 .store
914 .create_step(NewStep {
915 run_id: self.run_id,
916 name: name.to_string(),
917 kind,
918 position,
919 input: Some(serde_json::to_value(&config)?),
920 })
921 .await?;
922
923 self.start_step(step.id, Utc::now()).await?;
924
925 match execute_step_config(&config, &self.provider).await {
926 Ok(output) => {
927 self.total_cost_usd += output.cost_usd;
928 self.total_duration_ms += output.duration_ms;
929
930 let completed_at = Utc::now();
931 self.store
932 .update_step(
933 step.id,
934 StepUpdate {
935 status: Some(StepStatus::Completed),
936 output: Some(output.output.clone()),
937 duration_ms: Some(output.duration_ms),
938 cost_usd: Some(output.cost_usd),
939 input_tokens: output.input_tokens,
940 output_tokens: output.output_tokens,
941 completed_at: Some(completed_at),
942 ..StepUpdate::default()
943 },
944 )
945 .await?;
946
947 info!(
948 run_id = %self.run_id,
949 step = %name,
950 duration_ms = output.duration_ms,
951 "step completed"
952 );
953
954 self.last_step_ids = vec![step.id];
955
956 Ok(output)
957 }
958 Err(err) => {
959 let completed_at = Utc::now();
960 if let Err(store_err) = self
961 .store
962 .update_step(
963 step.id,
964 StepUpdate {
965 status: Some(StepStatus::Failed),
966 error: Some(err.to_string()),
967 completed_at: Some(completed_at),
968 ..StepUpdate::default()
969 },
970 )
971 .await
972 {
973 tracing::error!(step_id = %step.id, error = %store_err, "failed to persist step failure");
974 }
975
976 Err(err)
977 }
978 }
979 }
980
981 /// Record dependency edges and transition a step to Running.
982 ///
983 /// Records edges from `step_id` to all `last_step_ids`, then
984 /// transitions the step to `Running` with the given timestamp.
985 async fn start_step(&self, step_id: Uuid, now: DateTime<Utc>) -> Result<(), EngineError> {
986 if !self.last_step_ids.is_empty() {
987 let deps: Vec<NewStepDependency> = self
988 .last_step_ids
989 .iter()
990 .map(|&depends_on| NewStepDependency {
991 step_id,
992 depends_on,
993 })
994 .collect();
995 self.store.create_step_dependencies(deps).await?;
996 }
997
998 self.store
999 .update_step(
1000 step_id,
1001 StepUpdate {
1002 status: Some(StepStatus::Running),
1003 started_at: Some(now),
1004 ..StepUpdate::default()
1005 },
1006 )
1007 .await?;
1008
1009 Ok(())
1010 }
1011
1012 /// Access the store directly (advanced usage).
1013 pub fn store(&self) -> &Arc<dyn RunStore> {
1014 &self.store
1015 }
1016
1017 /// Access the payload that triggered this run.
1018 ///
1019 /// Fetches the run from the store and returns its payload.
1020 ///
1021 /// # Errors
1022 ///
1023 /// Returns [`EngineError::Store`] if the run is not found.
1024 pub async fn payload(&self) -> Result<Value, EngineError> {
1025 let run = self
1026 .store
1027 .get_run(self.run_id)
1028 .await?
1029 .ok_or(EngineError::Store(
1030 ironflow_store::error::StoreError::RunNotFound(self.run_id),
1031 ))?;
1032 Ok(run.payload)
1033 }
1034}
1035
1036impl fmt::Debug for WorkflowContext {
1037 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1038 f.debug_struct("WorkflowContext")
1039 .field("run_id", &self.run_id)
1040 .field("position", &self.position)
1041 .field("total_cost_usd", &self.total_cost_usd)
1042 .finish_non_exhaustive()
1043 }
1044}