ironflow_engine/context.rs
1//! [`WorkflowContext`] — execution context for dynamic workflows.
2//!
3//! Provides step execution methods that automatically persist results to the
4//! store. Each call to [`shell`](WorkflowContext::shell),
5//! [`http`](WorkflowContext::http), [`agent`](WorkflowContext::agent), or
6//! [`workflow`](WorkflowContext::workflow) creates a step record, executes the
7//! operation, captures the output, and returns a [`StepOutput`] that the next
8//! step can reference.
9//!
10//! # Examples
11//!
12//! ```no_run
13//! use ironflow_engine::context::WorkflowContext;
14//! use ironflow_engine::config::{ShellConfig, AgentStepConfig};
15//! use ironflow_engine::error::EngineError;
16//!
17//! # async fn example(ctx: &mut WorkflowContext) -> Result<(), EngineError> {
18//! let build = ctx.shell("build", ShellConfig::new("cargo build")).await?;
19//! let review = ctx.agent("review", AgentStepConfig::new(
20//! &format!("Build output:\n{}", build.output["stdout"])
21//! )).await?;
22//! # Ok(())
23//! # }
24//! ```
25
26use std::sync::Arc;
27use std::time::Instant;
28
29use chrono::{DateTime, Utc};
30use rust_decimal::Decimal;
31use serde_json::Value;
32use tracing::{error, info};
33use uuid::Uuid;
34
35use ironflow_core::provider::AgentProvider;
36use ironflow_store::models::{
37 NewRun, NewStep, NewStepDependency, RunStatus, RunUpdate, StepKind, StepStatus, StepUpdate,
38 TriggerKind,
39};
40use ironflow_store::store::RunStore;
41
42use crate::config::{AgentStepConfig, HttpConfig, ShellConfig, StepConfig, WorkflowStepConfig};
43use crate::error::EngineError;
44use crate::executor::{ParallelStepResult, StepOutput, execute_step_config};
45use crate::handler::WorkflowHandler;
46use crate::operation::Operation;
47
48/// Callback type for resolving workflow handlers by name.
49pub(crate) type HandlerResolver =
50 Arc<dyn Fn(&str) -> Option<Arc<dyn WorkflowHandler>> + Send + Sync>;
51
52/// Execution context for a single workflow run.
53///
54/// Tracks the current step position and provides convenience methods
55/// for executing operations with automatic persistence.
56///
57/// # Examples
58///
59/// ```no_run
60/// use ironflow_engine::context::WorkflowContext;
61/// use ironflow_engine::config::ShellConfig;
62/// use ironflow_engine::error::EngineError;
63///
64/// # async fn example(ctx: &mut WorkflowContext) -> Result<(), EngineError> {
65/// let result = ctx.shell("greet", ShellConfig::new("echo hello")).await?;
66/// assert!(result.output["stdout"].as_str().unwrap().contains("hello"));
67/// # Ok(())
68/// # }
69/// ```
70pub struct WorkflowContext {
71 run_id: Uuid,
72 store: Arc<dyn RunStore>,
73 provider: Arc<dyn AgentProvider>,
74 handler_resolver: Option<HandlerResolver>,
75 position: u32,
76 /// IDs of the last executed step(s) -- used to record DAG dependencies.
77 last_step_ids: Vec<Uuid>,
78 /// Accumulated cost across all steps in this run.
79 total_cost_usd: Decimal,
80 /// Accumulated duration across all steps.
81 total_duration_ms: u64,
82}
83
84impl WorkflowContext {
85 /// Create a new context for a run.
86 ///
87 /// Not typically called directly — the [`Engine`](crate::engine::Engine)
88 /// creates this when executing a [`WorkflowHandler`].
89 pub fn new(run_id: Uuid, store: Arc<dyn RunStore>, provider: Arc<dyn AgentProvider>) -> Self {
90 Self {
91 run_id,
92 store,
93 provider,
94 handler_resolver: None,
95 position: 0,
96 last_step_ids: Vec::new(),
97 total_cost_usd: Decimal::ZERO,
98 total_duration_ms: 0,
99 }
100 }
101
102 /// Create a new context with a handler resolver for sub-workflow support.
103 ///
104 /// The resolver is called when [`workflow`](Self::workflow) is invoked to
105 /// look up registered handlers by name.
106 pub(crate) fn with_handler_resolver(
107 run_id: Uuid,
108 store: Arc<dyn RunStore>,
109 provider: Arc<dyn AgentProvider>,
110 resolver: HandlerResolver,
111 ) -> Self {
112 Self {
113 run_id,
114 store,
115 provider,
116 handler_resolver: Some(resolver),
117 position: 0,
118 last_step_ids: Vec::new(),
119 total_cost_usd: Decimal::ZERO,
120 total_duration_ms: 0,
121 }
122 }
123
124 /// The run ID this context is executing for.
125 pub fn run_id(&self) -> Uuid {
126 self.run_id
127 }
128
129 /// Accumulated cost across all executed steps so far.
130 pub fn total_cost_usd(&self) -> Decimal {
131 self.total_cost_usd
132 }
133
134 /// Accumulated duration across all executed steps so far.
135 pub fn total_duration_ms(&self) -> u64 {
136 self.total_duration_ms
137 }
138
139 /// Execute multiple steps concurrently (wait-all model).
140 ///
141 /// All steps in the batch execute in parallel via `tokio::JoinSet`.
142 /// Each step is recorded with the same `position` (execution wave).
143 /// Dependencies on previous steps are recorded automatically.
144 ///
145 /// When `fail_fast` is true, remaining steps are aborted on the first
146 /// failure. When false, all steps run to completion and the first
147 /// error is returned.
148 ///
149 /// # Errors
150 ///
151 /// Returns [`EngineError`] if any step fails.
152 ///
153 /// # Examples
154 ///
155 /// ```no_run
156 /// use ironflow_engine::context::WorkflowContext;
157 /// use ironflow_engine::config::{StepConfig, ShellConfig};
158 /// use ironflow_engine::error::EngineError;
159 ///
160 /// # async fn example(ctx: &mut WorkflowContext) -> Result<(), EngineError> {
161 /// let results = ctx.parallel(
162 /// vec![
163 /// ("test-unit", StepConfig::Shell(ShellConfig::new("cargo test --lib"))),
164 /// ("lint", StepConfig::Shell(ShellConfig::new("cargo clippy"))),
165 /// ],
166 /// true,
167 /// ).await?;
168 ///
169 /// for r in &results {
170 /// println!("{}: {:?}", r.name, r.output.output);
171 /// }
172 /// # Ok(())
173 /// # }
174 /// ```
175 pub async fn parallel(
176 &mut self,
177 steps: Vec<(&str, StepConfig)>,
178 fail_fast: bool,
179 ) -> Result<Vec<ParallelStepResult>, EngineError> {
180 if steps.is_empty() {
181 return Ok(Vec::new());
182 }
183
184 let wave_position = self.position;
185 self.position += 1;
186
187 let now = Utc::now();
188 let mut step_records: Vec<(Uuid, String, StepConfig)> = Vec::with_capacity(steps.len());
189
190 for (name, config) in &steps {
191 let kind = config.kind();
192 let step = self
193 .store
194 .create_step(NewStep {
195 run_id: self.run_id,
196 name: name.to_string(),
197 kind,
198 position: wave_position,
199 input: Some(serde_json::to_value(config)?),
200 })
201 .await?;
202
203 self.start_step(step.id, now).await?;
204
205 step_records.push((step.id, name.to_string(), config.clone()));
206 }
207
208 let mut join_set = tokio::task::JoinSet::new();
209 for (idx, (_id, _name, config)) in step_records.iter().enumerate() {
210 let provider = self.provider.clone();
211 let config = config.clone();
212 join_set.spawn(async move { (idx, execute_step_config(&config, &provider).await) });
213 }
214
215 // JoinSet returns in completion order; indexed_results restores input order.
216 let mut indexed_results: Vec<Option<Result<StepOutput, String>>> =
217 vec![None; step_records.len()];
218 let mut first_error: Option<EngineError> = None;
219
220 while let Some(join_result) = join_set.join_next().await {
221 let (idx, step_result) = match join_result {
222 Ok(r) => r,
223 Err(e) => {
224 if first_error.is_none() {
225 first_error = Some(EngineError::StepConfig(format!("join error: {e}")));
226 }
227 if fail_fast {
228 join_set.abort_all();
229 }
230 continue;
231 }
232 };
233
234 let (step_id, step_name, _) = &step_records[idx];
235 let completed_at = Utc::now();
236
237 match step_result {
238 Ok(output) => {
239 self.total_cost_usd += output.cost_usd;
240 self.total_duration_ms += output.duration_ms;
241
242 self.store
243 .update_step(
244 *step_id,
245 StepUpdate {
246 status: Some(StepStatus::Completed),
247 output: Some(output.output.clone()),
248 duration_ms: Some(output.duration_ms),
249 cost_usd: Some(output.cost_usd),
250 input_tokens: output.input_tokens,
251 output_tokens: output.output_tokens,
252 completed_at: Some(completed_at),
253 ..StepUpdate::default()
254 },
255 )
256 .await?;
257
258 info!(
259 run_id = %self.run_id,
260 step = %step_name,
261 duration_ms = output.duration_ms,
262 "parallel step completed"
263 );
264
265 indexed_results[idx] = Some(Ok(output));
266 }
267 Err(err) => {
268 let err_msg = err.to_string();
269
270 if let Err(store_err) = self
271 .store
272 .update_step(
273 *step_id,
274 StepUpdate {
275 status: Some(StepStatus::Failed),
276 error: Some(err_msg.clone()),
277 completed_at: Some(completed_at),
278 ..StepUpdate::default()
279 },
280 )
281 .await
282 {
283 tracing::error!(
284 step_id = %step_id,
285 error = %store_err,
286 "failed to persist parallel step failure"
287 );
288 }
289
290 indexed_results[idx] = Some(Err(err_msg.clone()));
291
292 if first_error.is_none() {
293 first_error = Some(err);
294 }
295
296 if fail_fast {
297 join_set.abort_all();
298 }
299 }
300 }
301 }
302
303 if let Some(err) = first_error {
304 return Err(err);
305 }
306
307 self.last_step_ids = step_records.iter().map(|(id, _, _)| *id).collect();
308
309 // Build results in original order.
310 let results: Vec<ParallelStepResult> = step_records
311 .iter()
312 .enumerate()
313 .map(|(idx, (step_id, name, _))| {
314 let output = match indexed_results[idx].take() {
315 Some(Ok(o)) => o,
316 _ => unreachable!("all steps succeeded if no error returned"),
317 };
318 ParallelStepResult {
319 name: name.clone(),
320 output,
321 step_id: *step_id,
322 }
323 })
324 .collect();
325
326 Ok(results)
327 }
328
329 /// Execute a shell step.
330 ///
331 /// Creates the step record, runs the command, persists the result,
332 /// and returns the output for use in subsequent steps.
333 ///
334 /// # Errors
335 ///
336 /// Returns [`EngineError`] if the command fails or the store errors.
337 ///
338 /// # Examples
339 ///
340 /// ```no_run
341 /// use ironflow_engine::context::WorkflowContext;
342 /// use ironflow_engine::config::ShellConfig;
343 /// use ironflow_engine::error::EngineError;
344 ///
345 /// # async fn example(ctx: &mut WorkflowContext) -> Result<(), EngineError> {
346 /// let files = ctx.shell("list", ShellConfig::new("ls -la")).await?;
347 /// println!("stdout: {}", files.output["stdout"]);
348 /// # Ok(())
349 /// # }
350 /// ```
351 pub async fn shell(
352 &mut self,
353 name: &str,
354 config: ShellConfig,
355 ) -> Result<StepOutput, EngineError> {
356 self.execute_step(name, StepKind::Shell, StepConfig::Shell(config))
357 .await
358 }
359
360 /// Execute an HTTP step.
361 ///
362 /// # Errors
363 ///
364 /// Returns [`EngineError`] if the request fails or the store errors.
365 ///
366 /// # Examples
367 ///
368 /// ```no_run
369 /// use ironflow_engine::context::WorkflowContext;
370 /// use ironflow_engine::config::HttpConfig;
371 /// use ironflow_engine::error::EngineError;
372 ///
373 /// # async fn example(ctx: &mut WorkflowContext) -> Result<(), EngineError> {
374 /// let resp = ctx.http("health", HttpConfig::get("https://api.example.com/health")).await?;
375 /// println!("status: {}", resp.output["status"]);
376 /// # Ok(())
377 /// # }
378 /// ```
379 pub async fn http(
380 &mut self,
381 name: &str,
382 config: HttpConfig,
383 ) -> Result<StepOutput, EngineError> {
384 self.execute_step(name, StepKind::Http, StepConfig::Http(config))
385 .await
386 }
387
388 /// Execute an agent step.
389 ///
390 /// # Errors
391 ///
392 /// Returns [`EngineError`] if the agent invocation fails or the store errors.
393 ///
394 /// # Examples
395 ///
396 /// ```no_run
397 /// use ironflow_engine::context::WorkflowContext;
398 /// use ironflow_engine::config::AgentStepConfig;
399 /// use ironflow_engine::error::EngineError;
400 ///
401 /// # async fn example(ctx: &mut WorkflowContext) -> Result<(), EngineError> {
402 /// let review = ctx.agent("review", AgentStepConfig::new("Review the code")).await?;
403 /// println!("review: {}", review.output["value"]);
404 /// # Ok(())
405 /// # }
406 /// ```
407 pub async fn agent(
408 &mut self,
409 name: &str,
410 config: AgentStepConfig,
411 ) -> Result<StepOutput, EngineError> {
412 self.execute_step(name, StepKind::Agent, StepConfig::Agent(config))
413 .await
414 }
415
416 /// Execute a custom operation step.
417 ///
418 /// Runs a user-defined [`Operation`] with full step lifecycle management:
419 /// creates the step record, transitions to Running, executes the operation,
420 /// persists the output and duration, and marks the step Completed or Failed.
421 ///
422 /// The operation's [`kind()`](Operation::kind) is stored as
423 /// [`StepKind::Custom`].
424 ///
425 /// # Errors
426 ///
427 /// Returns [`EngineError`] if the operation fails or the store errors.
428 ///
429 /// # Examples
430 ///
431 /// ```no_run
432 /// use ironflow_engine::context::WorkflowContext;
433 /// use ironflow_engine::operation::Operation;
434 /// use ironflow_engine::error::EngineError;
435 /// use serde_json::{Value, json};
436 /// use std::pin::Pin;
437 /// use std::future::Future;
438 ///
439 /// struct MyOp;
440 /// impl Operation for MyOp {
441 /// fn kind(&self) -> &str { "my-service" }
442 /// fn execute(&self) -> Pin<Box<dyn Future<Output = Result<Value, EngineError>> + Send + '_>> {
443 /// Box::pin(async { Ok(json!({"ok": true})) })
444 /// }
445 /// }
446 ///
447 /// # async fn example(ctx: &mut WorkflowContext) -> Result<(), EngineError> {
448 /// let result = ctx.operation("call-service", &MyOp).await?;
449 /// println!("output: {}", result.output);
450 /// # Ok(())
451 /// # }
452 /// ```
453 pub async fn operation(
454 &mut self,
455 name: &str,
456 op: &dyn Operation,
457 ) -> Result<StepOutput, EngineError> {
458 let kind = StepKind::Custom(op.kind().to_string());
459 let position = self.position;
460 self.position += 1;
461
462 let step = self
463 .store
464 .create_step(NewStep {
465 run_id: self.run_id,
466 name: name.to_string(),
467 kind,
468 position,
469 input: op.input(),
470 })
471 .await?;
472
473 self.start_step(step.id, Utc::now()).await?;
474
475 let start = std::time::Instant::now();
476
477 match op.execute().await {
478 Ok(output_value) => {
479 let duration_ms = start.elapsed().as_millis() as u64;
480 self.total_duration_ms += duration_ms;
481
482 let completed_at = Utc::now();
483 self.store
484 .update_step(
485 step.id,
486 StepUpdate {
487 status: Some(StepStatus::Completed),
488 output: Some(output_value.clone()),
489 duration_ms: Some(duration_ms),
490 cost_usd: Some(Decimal::ZERO),
491 completed_at: Some(completed_at),
492 ..StepUpdate::default()
493 },
494 )
495 .await?;
496
497 info!(
498 run_id = %self.run_id,
499 step = %name,
500 kind = op.kind(),
501 duration_ms,
502 "operation step completed"
503 );
504
505 self.last_step_ids = vec![step.id];
506
507 Ok(StepOutput {
508 output: output_value,
509 duration_ms,
510 cost_usd: Decimal::ZERO,
511 input_tokens: None,
512 output_tokens: None,
513 })
514 }
515 Err(err) => {
516 let completed_at = Utc::now();
517 if let Err(store_err) = self
518 .store
519 .update_step(
520 step.id,
521 StepUpdate {
522 status: Some(StepStatus::Failed),
523 error: Some(err.to_string()),
524 completed_at: Some(completed_at),
525 ..StepUpdate::default()
526 },
527 )
528 .await
529 {
530 error!(step_id = %step.id, error = %store_err, "failed to persist step failure");
531 }
532
533 Err(err)
534 }
535 }
536 }
537
538 /// Execute a sub-workflow step.
539 ///
540 /// Creates a child run for the named workflow handler, executes it with
541 /// its own steps and lifecycle, and returns a [`StepOutput`] containing
542 /// the child run ID and aggregated metrics.
543 ///
544 /// Requires the context to be created with
545 /// `with_handler_resolver`.
546 ///
547 /// # Errors
548 ///
549 /// Returns [`EngineError::InvalidWorkflow`] if no handler is registered
550 /// with the given name, or if no handler resolver is available.
551 ///
552 /// # Examples
553 ///
554 /// ```no_run
555 /// use ironflow_engine::context::WorkflowContext;
556 /// use ironflow_engine::error::EngineError;
557 /// use serde_json::json;
558 ///
559 /// # async fn example(ctx: &mut WorkflowContext) -> Result<(), EngineError> {
560 /// // let result = ctx.workflow(&MySubWorkflow, json!({})).await?;
561 /// # Ok(())
562 /// # }
563 /// ```
564 pub async fn workflow(
565 &mut self,
566 handler: &dyn WorkflowHandler,
567 payload: Value,
568 ) -> Result<StepOutput, EngineError> {
569 let config = WorkflowStepConfig::new(handler.name(), payload);
570 let position = self.position;
571 self.position += 1;
572
573 let step = self
574 .store
575 .create_step(NewStep {
576 run_id: self.run_id,
577 name: config.workflow_name.clone(),
578 kind: StepKind::Workflow,
579 position,
580 input: Some(serde_json::to_value(&config)?),
581 })
582 .await?;
583
584 self.start_step(step.id, Utc::now()).await?;
585
586 match self.execute_child_workflow(&config).await {
587 Ok(output) => {
588 self.total_cost_usd += output.cost_usd;
589 self.total_duration_ms += output.duration_ms;
590
591 let completed_at = Utc::now();
592 self.store
593 .update_step(
594 step.id,
595 StepUpdate {
596 status: Some(StepStatus::Completed),
597 output: Some(output.output.clone()),
598 duration_ms: Some(output.duration_ms),
599 cost_usd: Some(output.cost_usd),
600 completed_at: Some(completed_at),
601 ..StepUpdate::default()
602 },
603 )
604 .await?;
605
606 info!(
607 run_id = %self.run_id,
608 child_workflow = %config.workflow_name,
609 duration_ms = output.duration_ms,
610 "workflow step completed"
611 );
612
613 self.last_step_ids = vec![step.id];
614
615 Ok(output)
616 }
617 Err(err) => {
618 let completed_at = Utc::now();
619 if let Err(store_err) = self
620 .store
621 .update_step(
622 step.id,
623 StepUpdate {
624 status: Some(StepStatus::Failed),
625 error: Some(err.to_string()),
626 completed_at: Some(completed_at),
627 ..StepUpdate::default()
628 },
629 )
630 .await
631 {
632 error!(step_id = %step.id, error = %store_err, "failed to persist step failure");
633 }
634
635 Err(err)
636 }
637 }
638 }
639
640 /// Execute a child workflow and return aggregated output.
641 async fn execute_child_workflow(
642 &self,
643 config: &WorkflowStepConfig,
644 ) -> Result<StepOutput, EngineError> {
645 let resolver = self.handler_resolver.as_ref().ok_or_else(|| {
646 EngineError::InvalidWorkflow(
647 "sub-workflow requires a handler resolver (use Engine to execute)".to_string(),
648 )
649 })?;
650
651 let handler = resolver(&config.workflow_name).ok_or_else(|| {
652 EngineError::InvalidWorkflow(format!("no handler registered: {}", config.workflow_name))
653 })?;
654
655 let child_run = self
656 .store
657 .create_run(NewRun {
658 workflow_name: config.workflow_name.clone(),
659 trigger: TriggerKind::Workflow,
660 payload: config.payload.clone(),
661 max_retries: 0,
662 })
663 .await?;
664
665 let child_run_id = child_run.id;
666 info!(
667 parent_run_id = %self.run_id,
668 child_run_id = %child_run_id,
669 workflow = %config.workflow_name,
670 "child run created"
671 );
672
673 self.store
674 .update_run_status(child_run_id, RunStatus::Running)
675 .await?;
676
677 let run_start = Instant::now();
678 let mut child_ctx = WorkflowContext {
679 run_id: child_run_id,
680 store: self.store.clone(),
681 provider: self.provider.clone(),
682 handler_resolver: self.handler_resolver.clone(),
683 position: 0,
684 last_step_ids: Vec::new(),
685 total_cost_usd: Decimal::ZERO,
686 total_duration_ms: 0,
687 };
688
689 let result = handler.execute(&mut child_ctx).await;
690 let total_duration = run_start.elapsed().as_millis() as u64;
691 let completed_at = Utc::now();
692
693 match result {
694 Ok(()) => {
695 self.store
696 .update_run(
697 child_run_id,
698 RunUpdate {
699 status: Some(RunStatus::Completed),
700 cost_usd: Some(child_ctx.total_cost_usd),
701 duration_ms: Some(total_duration),
702 completed_at: Some(completed_at),
703 ..RunUpdate::default()
704 },
705 )
706 .await?;
707
708 Ok(StepOutput {
709 output: serde_json::json!({
710 "run_id": child_run_id,
711 "workflow_name": config.workflow_name,
712 "status": RunStatus::Completed,
713 "cost_usd": child_ctx.total_cost_usd,
714 "duration_ms": total_duration,
715 }),
716 duration_ms: total_duration,
717 cost_usd: child_ctx.total_cost_usd,
718 input_tokens: None,
719 output_tokens: None,
720 })
721 }
722 Err(err) => {
723 if let Err(store_err) = self
724 .store
725 .update_run(
726 child_run_id,
727 RunUpdate {
728 status: Some(RunStatus::Failed),
729 error: Some(err.to_string()),
730 cost_usd: Some(child_ctx.total_cost_usd),
731 duration_ms: Some(total_duration),
732 completed_at: Some(completed_at),
733 ..RunUpdate::default()
734 },
735 )
736 .await
737 {
738 error!(
739 child_run_id = %child_run_id,
740 store_error = %store_err,
741 "failed to persist child run failure"
742 );
743 }
744
745 Err(err)
746 }
747 }
748 }
749
750 /// Internal: execute a step with full persistence lifecycle.
751 async fn execute_step(
752 &mut self,
753 name: &str,
754 kind: StepKind,
755 config: StepConfig,
756 ) -> Result<StepOutput, EngineError> {
757 let position = self.position;
758 self.position += 1;
759
760 // Create step record in Pending.
761 let step = self
762 .store
763 .create_step(NewStep {
764 run_id: self.run_id,
765 name: name.to_string(),
766 kind,
767 position,
768 input: Some(serde_json::to_value(&config)?),
769 })
770 .await?;
771
772 self.start_step(step.id, Utc::now()).await?;
773
774 match execute_step_config(&config, &self.provider).await {
775 Ok(output) => {
776 self.total_cost_usd += output.cost_usd;
777 self.total_duration_ms += output.duration_ms;
778
779 let completed_at = Utc::now();
780 self.store
781 .update_step(
782 step.id,
783 StepUpdate {
784 status: Some(StepStatus::Completed),
785 output: Some(output.output.clone()),
786 duration_ms: Some(output.duration_ms),
787 cost_usd: Some(output.cost_usd),
788 input_tokens: output.input_tokens,
789 output_tokens: output.output_tokens,
790 completed_at: Some(completed_at),
791 ..StepUpdate::default()
792 },
793 )
794 .await?;
795
796 info!(
797 run_id = %self.run_id,
798 step = %name,
799 duration_ms = output.duration_ms,
800 "step completed"
801 );
802
803 self.last_step_ids = vec![step.id];
804
805 Ok(output)
806 }
807 Err(err) => {
808 let completed_at = Utc::now();
809 if let Err(store_err) = self
810 .store
811 .update_step(
812 step.id,
813 StepUpdate {
814 status: Some(StepStatus::Failed),
815 error: Some(err.to_string()),
816 completed_at: Some(completed_at),
817 ..StepUpdate::default()
818 },
819 )
820 .await
821 {
822 tracing::error!(step_id = %step.id, error = %store_err, "failed to persist step failure");
823 }
824
825 Err(err)
826 }
827 }
828 }
829
830 /// Record dependency edges and transition a step to Running.
831 ///
832 /// Records edges from `step_id` to all `last_step_ids`, then
833 /// transitions the step to `Running` with the given timestamp.
834 async fn start_step(&self, step_id: Uuid, now: DateTime<Utc>) -> Result<(), EngineError> {
835 if !self.last_step_ids.is_empty() {
836 let deps: Vec<NewStepDependency> = self
837 .last_step_ids
838 .iter()
839 .map(|&depends_on| NewStepDependency {
840 step_id,
841 depends_on,
842 })
843 .collect();
844 self.store.create_step_dependencies(deps).await?;
845 }
846
847 self.store
848 .update_step(
849 step_id,
850 StepUpdate {
851 status: Some(StepStatus::Running),
852 started_at: Some(now),
853 ..StepUpdate::default()
854 },
855 )
856 .await?;
857
858 Ok(())
859 }
860
861 /// Access the store directly (advanced usage).
862 pub fn store(&self) -> &Arc<dyn RunStore> {
863 &self.store
864 }
865
866 /// Access the payload that triggered this run.
867 ///
868 /// Fetches the run from the store and returns its payload.
869 ///
870 /// # Errors
871 ///
872 /// Returns [`EngineError::Store`] if the run is not found.
873 pub async fn payload(&self) -> Result<Value, EngineError> {
874 let run = self
875 .store
876 .get_run(self.run_id)
877 .await?
878 .ok_or(EngineError::Store(
879 ironflow_store::error::StoreError::RunNotFound(self.run_id),
880 ))?;
881 Ok(run.payload)
882 }
883}
884
885impl std::fmt::Debug for WorkflowContext {
886 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
887 f.debug_struct("WorkflowContext")
888 .field("run_id", &self.run_id)
889 .field("position", &self.position)
890 .field("total_cost_usd", &self.total_cost_usd)
891 .finish_non_exhaustive()
892 }
893}