ironflow_engine/context.rs
1//! [`WorkflowContext`] — execution context for dynamic workflows.
2//!
3//! Provides step execution methods that automatically persist results to the
4//! store. Each call to [`shell`](WorkflowContext::shell),
5//! [`http`](WorkflowContext::http), [`agent`](WorkflowContext::agent), or
6//! [`workflow`](WorkflowContext::workflow) creates a step record, executes the
7//! operation, captures the output, and returns a [`StepOutput`] that the next
8//! step can reference.
9//!
10//! # Examples
11//!
12//! ```no_run
13//! use ironflow_engine::context::WorkflowContext;
14//! use ironflow_engine::config::{ShellConfig, AgentStepConfig};
15//! use ironflow_engine::error::EngineError;
16//!
17//! # async fn example(ctx: &mut WorkflowContext) -> Result<(), EngineError> {
18//! let build = ctx.shell("build", ShellConfig::new("cargo build")).await?;
19//! let review = ctx.agent("review", AgentStepConfig::new(
20//! &format!("Build output:\n{}", build.output["stdout"])
21//! )).await?;
22//! # Ok(())
23//! # }
24//! ```
25
26use std::collections::HashMap;
27use std::fmt;
28use std::sync::Arc;
29use std::time::Instant;
30
31use chrono::{DateTime, Utc};
32use rust_decimal::Decimal;
33use serde_json::Value;
34use tokio::task::JoinSet;
35use tracing::{error, info};
36use uuid::Uuid;
37
38use ironflow_core::error::{AgentError, OperationError};
39use ironflow_core::provider::AgentProvider;
40use ironflow_store::models::{
41 NewRun, NewStep, NewStepDependency, RunStatus, RunUpdate, Step, StepKind, StepStatus,
42 StepUpdate, TriggerKind,
43};
44use ironflow_store::store::Store;
45
46use crate::config::{
47 AgentStepConfig, ApprovalConfig, HttpConfig, ShellConfig, StepConfig, WorkflowStepConfig,
48};
49use crate::error::EngineError;
50use crate::executor::{ParallelStepResult, StepOutput, execute_step_config};
51use crate::handler::WorkflowHandler;
52use crate::operation::Operation;
53
54/// Callback type for resolving workflow handlers by name.
55pub(crate) type HandlerResolver =
56 Arc<dyn Fn(&str) -> Option<Arc<dyn WorkflowHandler>> + Send + Sync>;
57
58/// Execution context for a single workflow run.
59///
60/// Tracks the current step position and provides convenience methods
61/// for executing operations with automatic persistence.
62///
63/// # Examples
64///
65/// ```no_run
66/// use ironflow_engine::context::WorkflowContext;
67/// use ironflow_engine::config::ShellConfig;
68/// use ironflow_engine::error::EngineError;
69///
70/// # async fn example(ctx: &mut WorkflowContext) -> Result<(), EngineError> {
71/// let result = ctx.shell("greet", ShellConfig::new("echo hello")).await?;
72/// assert!(result.output["stdout"].as_str().unwrap().contains("hello"));
73/// # Ok(())
74/// # }
75/// ```
76pub struct WorkflowContext {
77 run_id: Uuid,
78 store: Arc<dyn Store>,
79 provider: Arc<dyn AgentProvider>,
80 handler_resolver: Option<HandlerResolver>,
81 position: u32,
82 /// IDs of the last executed step(s) -- used to record DAG dependencies.
83 last_step_ids: Vec<Uuid>,
84 /// Accumulated cost across all steps in this run.
85 total_cost_usd: Decimal,
86 /// Accumulated duration across all steps.
87 total_duration_ms: u64,
88 /// Steps from a previous execution, keyed by position.
89 /// Used when resuming after approval to replay completed steps.
90 replay_steps: HashMap<u32, Step>,
91}
92
93impl WorkflowContext {
94 /// Create a new context for a run.
95 ///
96 /// Not typically called directly — the [`Engine`](crate::engine::Engine)
97 /// creates this when executing a [`WorkflowHandler`].
98 pub fn new(run_id: Uuid, store: Arc<dyn Store>, provider: Arc<dyn AgentProvider>) -> Self {
99 Self {
100 run_id,
101 store,
102 provider,
103 handler_resolver: None,
104 position: 0,
105 last_step_ids: Vec::new(),
106 total_cost_usd: Decimal::ZERO,
107 total_duration_ms: 0,
108 replay_steps: HashMap::new(),
109 }
110 }
111
112 /// Create a new context with a handler resolver for sub-workflow support.
113 ///
114 /// The resolver is called when [`workflow`](Self::workflow) is invoked to
115 /// look up registered handlers by name.
116 pub(crate) fn with_handler_resolver(
117 run_id: Uuid,
118 store: Arc<dyn Store>,
119 provider: Arc<dyn AgentProvider>,
120 resolver: HandlerResolver,
121 ) -> Self {
122 Self {
123 run_id,
124 store,
125 provider,
126 handler_resolver: Some(resolver),
127 position: 0,
128 last_step_ids: Vec::new(),
129 total_cost_usd: Decimal::ZERO,
130 total_duration_ms: 0,
131 replay_steps: HashMap::new(),
132 }
133 }
134
135 /// Load existing steps from the store for replay after approval.
136 ///
137 /// Called by the engine when resuming a run. All completed steps
138 /// and the approved approval step are indexed by position so that
139 /// `execute_step` and `approval` can skip them.
140 pub(crate) async fn load_replay_steps(&mut self) -> Result<(), EngineError> {
141 let steps = self.store.list_steps(self.run_id).await?;
142 for step in steps {
143 let dominated = matches!(
144 step.status.state,
145 StepStatus::Completed | StepStatus::Running | StepStatus::AwaitingApproval
146 );
147 if dominated {
148 self.replay_steps.insert(step.position, step);
149 }
150 }
151 Ok(())
152 }
153
154 /// The run ID this context is executing for.
155 pub fn run_id(&self) -> Uuid {
156 self.run_id
157 }
158
159 /// Accumulated cost across all executed steps so far.
160 pub fn total_cost_usd(&self) -> Decimal {
161 self.total_cost_usd
162 }
163
164 /// Accumulated duration across all executed steps so far.
165 pub fn total_duration_ms(&self) -> u64 {
166 self.total_duration_ms
167 }
168
169 /// Execute multiple steps concurrently (wait-all model).
170 ///
171 /// All steps in the batch execute in parallel via `tokio::JoinSet`.
172 /// Each step is recorded with the same `position` (execution wave).
173 /// Dependencies on previous steps are recorded automatically.
174 ///
175 /// When `fail_fast` is true, remaining steps are aborted on the first
176 /// failure. When false, all steps run to completion and the first
177 /// error is returned.
178 ///
179 /// # Errors
180 ///
181 /// Returns [`EngineError`] if any step fails.
182 ///
183 /// # Examples
184 ///
185 /// ```no_run
186 /// use ironflow_engine::context::WorkflowContext;
187 /// use ironflow_engine::config::{StepConfig, ShellConfig};
188 /// use ironflow_engine::error::EngineError;
189 ///
190 /// # async fn example(ctx: &mut WorkflowContext) -> Result<(), EngineError> {
191 /// let results = ctx.parallel(
192 /// vec![
193 /// ("test-unit", StepConfig::Shell(ShellConfig::new("cargo test --lib"))),
194 /// ("lint", StepConfig::Shell(ShellConfig::new("cargo clippy"))),
195 /// ],
196 /// true,
197 /// ).await?;
198 ///
199 /// for r in &results {
200 /// println!("{}: {:?}", r.name, r.output.output);
201 /// }
202 /// # Ok(())
203 /// # }
204 /// ```
205 pub async fn parallel(
206 &mut self,
207 steps: Vec<(&str, StepConfig)>,
208 fail_fast: bool,
209 ) -> Result<Vec<ParallelStepResult>, EngineError> {
210 if steps.is_empty() {
211 return Ok(Vec::new());
212 }
213
214 let wave_position = self.position;
215 self.position += 1;
216
217 let now = Utc::now();
218 let mut step_records: Vec<(Uuid, String, StepConfig)> = Vec::with_capacity(steps.len());
219
220 for (name, config) in &steps {
221 let kind = config.kind();
222 let step = self
223 .store
224 .create_step(NewStep {
225 run_id: self.run_id,
226 name: name.to_string(),
227 kind,
228 position: wave_position,
229 input: Some(serde_json::to_value(config)?),
230 })
231 .await?;
232
233 self.start_step(step.id, now).await?;
234
235 step_records.push((step.id, name.to_string(), config.clone()));
236 }
237
238 let mut join_set = JoinSet::new();
239 for (idx, (_id, _name, config)) in step_records.iter().enumerate() {
240 let provider = self.provider.clone();
241 let config = config.clone();
242 join_set.spawn(async move { (idx, execute_step_config(&config, &provider).await) });
243 }
244
245 // JoinSet returns in completion order; indexed_results restores input order.
246 let mut indexed_results: Vec<Option<Result<StepOutput, String>>> =
247 vec![None; step_records.len()];
248 let mut first_error: Option<EngineError> = None;
249
250 while let Some(join_result) = join_set.join_next().await {
251 let (idx, step_result) = match join_result {
252 Ok(r) => r,
253 Err(e) => {
254 if first_error.is_none() {
255 first_error = Some(EngineError::StepConfig(format!("join error: {e}")));
256 }
257 if fail_fast {
258 join_set.abort_all();
259 }
260 continue;
261 }
262 };
263
264 let (step_id, step_name, _) = &step_records[idx];
265 let completed_at = Utc::now();
266
267 match step_result {
268 Ok(output) => {
269 self.total_cost_usd += output.cost_usd;
270 self.total_duration_ms += output.duration_ms;
271
272 let debug_messages_json = output.debug_messages_json();
273
274 self.store
275 .update_step(
276 *step_id,
277 StepUpdate {
278 status: Some(StepStatus::Completed),
279 output: Some(output.output.clone()),
280 duration_ms: Some(output.duration_ms),
281 cost_usd: Some(output.cost_usd),
282 input_tokens: output.input_tokens,
283 output_tokens: output.output_tokens,
284 completed_at: Some(completed_at),
285 debug_messages: debug_messages_json,
286 ..StepUpdate::default()
287 },
288 )
289 .await?;
290
291 info!(
292 run_id = %self.run_id,
293 step = %step_name,
294 duration_ms = output.duration_ms,
295 "parallel step completed"
296 );
297
298 indexed_results[idx] = Some(Ok(output));
299 }
300 Err(err) => {
301 let err_msg = err.to_string();
302 let debug_messages_json = extract_debug_messages_from_error(&err);
303 let partial = extract_partial_usage_from_error(&err);
304
305 if let Some(ref usage) = partial {
306 if let Some(cost) = usage.cost_usd {
307 self.total_cost_usd += cost;
308 }
309 if let Some(dur) = usage.duration_ms {
310 self.total_duration_ms += dur;
311 }
312 }
313
314 if let Err(store_err) = self
315 .store
316 .update_step(
317 *step_id,
318 StepUpdate {
319 status: Some(StepStatus::Failed),
320 error: Some(err_msg.clone()),
321 completed_at: Some(completed_at),
322 debug_messages: debug_messages_json,
323 duration_ms: partial.as_ref().and_then(|p| p.duration_ms),
324 cost_usd: partial.as_ref().and_then(|p| p.cost_usd),
325 input_tokens: partial.as_ref().and_then(|p| p.input_tokens),
326 output_tokens: partial.as_ref().and_then(|p| p.output_tokens),
327 ..StepUpdate::default()
328 },
329 )
330 .await
331 {
332 tracing::error!(
333 step_id = %step_id,
334 error = %store_err,
335 "failed to persist parallel step failure"
336 );
337 }
338
339 indexed_results[idx] = Some(Err(err_msg.clone()));
340
341 if first_error.is_none() {
342 first_error = Some(err);
343 }
344
345 if fail_fast {
346 join_set.abort_all();
347 }
348 }
349 }
350 }
351
352 if let Some(err) = first_error {
353 return Err(err);
354 }
355
356 self.last_step_ids = step_records.iter().map(|(id, _, _)| *id).collect();
357
358 // Build results in original order.
359 let results: Vec<ParallelStepResult> = step_records
360 .iter()
361 .enumerate()
362 .map(|(idx, (step_id, name, _))| {
363 let output = match indexed_results[idx].take() {
364 Some(Ok(o)) => o,
365 _ => unreachable!("all steps succeeded if no error returned"),
366 };
367 ParallelStepResult {
368 name: name.clone(),
369 output,
370 step_id: *step_id,
371 }
372 })
373 .collect();
374
375 Ok(results)
376 }
377
378 /// Execute a shell step.
379 ///
380 /// Creates the step record, runs the command, persists the result,
381 /// and returns the output for use in subsequent steps.
382 ///
383 /// # Errors
384 ///
385 /// Returns [`EngineError`] if the command fails or the store errors.
386 ///
387 /// # Examples
388 ///
389 /// ```no_run
390 /// use ironflow_engine::context::WorkflowContext;
391 /// use ironflow_engine::config::ShellConfig;
392 /// use ironflow_engine::error::EngineError;
393 ///
394 /// # async fn example(ctx: &mut WorkflowContext) -> Result<(), EngineError> {
395 /// let files = ctx.shell("list", ShellConfig::new("ls -la")).await?;
396 /// println!("stdout: {}", files.output["stdout"]);
397 /// # Ok(())
398 /// # }
399 /// ```
400 pub async fn shell(
401 &mut self,
402 name: &str,
403 config: ShellConfig,
404 ) -> Result<StepOutput, EngineError> {
405 self.execute_step(name, StepKind::Shell, StepConfig::Shell(config))
406 .await
407 }
408
409 /// Execute an HTTP step.
410 ///
411 /// # Errors
412 ///
413 /// Returns [`EngineError`] if the request fails or the store errors.
414 ///
415 /// # Examples
416 ///
417 /// ```no_run
418 /// use ironflow_engine::context::WorkflowContext;
419 /// use ironflow_engine::config::HttpConfig;
420 /// use ironflow_engine::error::EngineError;
421 ///
422 /// # async fn example(ctx: &mut WorkflowContext) -> Result<(), EngineError> {
423 /// let resp = ctx.http("health", HttpConfig::get("https://api.example.com/health")).await?;
424 /// println!("status: {}", resp.output["status"]);
425 /// # Ok(())
426 /// # }
427 /// ```
428 pub async fn http(
429 &mut self,
430 name: &str,
431 config: HttpConfig,
432 ) -> Result<StepOutput, EngineError> {
433 self.execute_step(name, StepKind::Http, StepConfig::Http(config))
434 .await
435 }
436
437 /// Execute an agent step.
438 ///
439 /// # Errors
440 ///
441 /// Returns [`EngineError`] if the agent invocation fails or the store errors.
442 ///
443 /// # Examples
444 ///
445 /// ```no_run
446 /// use ironflow_engine::context::WorkflowContext;
447 /// use ironflow_engine::config::AgentStepConfig;
448 /// use ironflow_engine::error::EngineError;
449 ///
450 /// # async fn example(ctx: &mut WorkflowContext) -> Result<(), EngineError> {
451 /// let review = ctx.agent("review", AgentStepConfig::new("Review the code")).await?;
452 /// println!("review: {}", review.output["value"]);
453 /// # Ok(())
454 /// # }
455 /// ```
456 pub async fn agent(
457 &mut self,
458 name: &str,
459 config: impl Into<AgentStepConfig>,
460 ) -> Result<StepOutput, EngineError> {
461 self.execute_step(name, StepKind::Agent, StepConfig::Agent(config.into()))
462 .await
463 }
464
465 /// Create a human approval gate.
466 ///
467 /// On first execution, records an approval step and returns
468 /// [`EngineError::ApprovalRequired`] to suspend the run. The engine
469 /// transitions the run to `AwaitingApproval`.
470 ///
471 /// On resume (after a human approved via the API), the approval step
472 /// is replayed: it is marked as `Completed` and execution continues
473 /// past it. Multiple approval gates in the same handler work -- each
474 /// one pauses and resumes independently.
475 ///
476 /// # Errors
477 ///
478 /// Returns [`EngineError::ApprovalRequired`] to pause the run on
479 /// first execution. Returns other [`EngineError`] variants on store
480 /// failures.
481 ///
482 /// # Examples
483 ///
484 /// ```no_run
485 /// use ironflow_engine::context::WorkflowContext;
486 /// use ironflow_engine::config::ApprovalConfig;
487 /// use ironflow_engine::error::EngineError;
488 ///
489 /// # async fn example(ctx: &mut WorkflowContext) -> Result<(), EngineError> {
490 /// ctx.approval("deploy-gate", ApprovalConfig::new("Approve deployment?")).await?;
491 /// // Execution continues here after approval
492 /// # Ok(())
493 /// # }
494 /// ```
495 pub async fn approval(
496 &mut self,
497 name: &str,
498 config: ApprovalConfig,
499 ) -> Result<(), EngineError> {
500 let position = self.position;
501 self.position += 1;
502
503 // Replay: if this approval step exists from a prior execution,
504 // the run was approved -- mark it completed (if not already) and continue.
505 if let Some(existing) = self.replay_steps.get(&position)
506 && existing.kind == StepKind::Approval
507 {
508 if existing.status.state == StepStatus::AwaitingApproval {
509 self.store
510 .update_step(
511 existing.id,
512 StepUpdate {
513 status: Some(StepStatus::Completed),
514 completed_at: Some(Utc::now()),
515 ..StepUpdate::default()
516 },
517 )
518 .await?;
519 }
520
521 self.last_step_ids = vec![existing.id];
522 info!(
523 run_id = %self.run_id,
524 step = %name,
525 position,
526 "approval step replayed (approved)"
527 );
528 return Ok(());
529 }
530
531 // First execution: create the approval step and suspend.
532 let step = self
533 .store
534 .create_step(NewStep {
535 run_id: self.run_id,
536 name: name.to_string(),
537 kind: StepKind::Approval,
538 position,
539 input: Some(serde_json::to_value(&config)?),
540 })
541 .await?;
542
543 self.start_step(step.id, Utc::now()).await?;
544
545 // Transition the step to AwaitingApproval so it reflects
546 // the suspended state on the dashboard.
547 self.store
548 .update_step(
549 step.id,
550 StepUpdate {
551 status: Some(StepStatus::AwaitingApproval),
552 ..StepUpdate::default()
553 },
554 )
555 .await?;
556
557 self.last_step_ids = vec![step.id];
558
559 Err(EngineError::ApprovalRequired {
560 run_id: self.run_id,
561 step_id: step.id,
562 message: config.message().to_string(),
563 })
564 }
565
566 /// Record a step as explicitly skipped.
567 ///
568 /// Use this inside an `if`/`else` branch when a step should not execute
569 /// but must still appear in the DAG and timeline with its reason.
570 ///
571 /// The step is created directly in [`StepStatus::Skipped`] state and the
572 /// reason is stored in the output as `{"reason": "..."}`.
573 ///
574 /// # Errors
575 ///
576 /// Returns [`EngineError`] if the store fails.
577 ///
578 /// # Examples
579 ///
580 /// ```no_run
581 /// use ironflow_engine::context::WorkflowContext;
582 /// use ironflow_engine::error::EngineError;
583 ///
584 /// # async fn example(ctx: &mut WorkflowContext) -> Result<(), EngineError> {
585 /// let tests_passed = false;
586 /// if tests_passed {
587 /// // ctx.shell("deploy", ...).await?;
588 /// } else {
589 /// ctx.skip("deploy", "tests failed").await?;
590 /// }
591 /// # Ok(())
592 /// # }
593 /// ```
594 pub async fn skip(&mut self, name: &str, reason: &str) -> Result<(), EngineError> {
595 let position = self.position;
596 self.position += 1;
597
598 let step = self
599 .store
600 .create_step(NewStep {
601 run_id: self.run_id,
602 name: name.to_string(),
603 kind: StepKind::Custom("skip".to_string()),
604 position,
605 input: None,
606 })
607 .await?;
608
609 if !self.last_step_ids.is_empty() {
610 let deps: Vec<NewStepDependency> = self
611 .last_step_ids
612 .iter()
613 .map(|&depends_on| NewStepDependency {
614 step_id: step.id,
615 depends_on,
616 })
617 .collect();
618 self.store.create_step_dependencies(deps).await?;
619 }
620
621 let now = Utc::now();
622 self.store
623 .update_step(
624 step.id,
625 StepUpdate {
626 status: Some(StepStatus::Skipped),
627 output: Some(serde_json::json!({"reason": reason})),
628 completed_at: Some(now),
629 ..StepUpdate::default()
630 },
631 )
632 .await?;
633
634 self.last_step_ids = vec![step.id];
635
636 info!(
637 run_id = %self.run_id,
638 step = %name,
639 reason,
640 "step skipped"
641 );
642
643 Ok(())
644 }
645
646 /// Execute a custom operation step.
647 ///
648 /// Runs a user-defined [`Operation`] with full step lifecycle management:
649 /// creates the step record, transitions to Running, executes the operation,
650 /// persists the output and duration, and marks the step Completed or Failed.
651 ///
652 /// The operation's [`kind()`](Operation::kind) is stored as
653 /// [`StepKind::Custom`].
654 ///
655 /// # Errors
656 ///
657 /// Returns [`EngineError`] if the operation fails or the store errors.
658 ///
659 /// # Examples
660 ///
661 /// ```no_run
662 /// use ironflow_engine::context::WorkflowContext;
663 /// use ironflow_engine::operation::Operation;
664 /// use ironflow_engine::error::EngineError;
665 /// use serde_json::{Value, json};
666 /// use std::pin::Pin;
667 /// use std::future::Future;
668 ///
669 /// struct MyOp;
670 /// impl Operation for MyOp {
671 /// fn kind(&self) -> &str { "my-service" }
672 /// fn execute(&self) -> Pin<Box<dyn Future<Output = Result<Value, EngineError>> + Send + '_>> {
673 /// Box::pin(async { Ok(json!({"ok": true})) })
674 /// }
675 /// }
676 ///
677 /// # async fn example(ctx: &mut WorkflowContext) -> Result<(), EngineError> {
678 /// let result = ctx.operation("call-service", &MyOp).await?;
679 /// println!("output: {}", result.output);
680 /// # Ok(())
681 /// # }
682 /// ```
683 pub async fn operation(
684 &mut self,
685 name: &str,
686 op: &dyn Operation,
687 ) -> Result<StepOutput, EngineError> {
688 let kind = StepKind::Custom(op.kind().to_string());
689 let position = self.position;
690 self.position += 1;
691
692 let step = self
693 .store
694 .create_step(NewStep {
695 run_id: self.run_id,
696 name: name.to_string(),
697 kind,
698 position,
699 input: op.input(),
700 })
701 .await?;
702
703 self.start_step(step.id, Utc::now()).await?;
704
705 let start = Instant::now();
706
707 match op.execute().await {
708 Ok(output_value) => {
709 let duration_ms = start.elapsed().as_millis() as u64;
710 self.total_duration_ms += duration_ms;
711
712 let completed_at = Utc::now();
713 self.store
714 .update_step(
715 step.id,
716 StepUpdate {
717 status: Some(StepStatus::Completed),
718 output: Some(output_value.clone()),
719 duration_ms: Some(duration_ms),
720 cost_usd: Some(Decimal::ZERO),
721 completed_at: Some(completed_at),
722 ..StepUpdate::default()
723 },
724 )
725 .await?;
726
727 info!(
728 run_id = %self.run_id,
729 step = %name,
730 kind = op.kind(),
731 duration_ms,
732 "operation step completed"
733 );
734
735 self.last_step_ids = vec![step.id];
736
737 Ok(StepOutput {
738 output: output_value,
739 duration_ms,
740 cost_usd: Decimal::ZERO,
741 input_tokens: None,
742 output_tokens: None,
743 debug_messages: None,
744 })
745 }
746 Err(err) => {
747 let completed_at = Utc::now();
748 if let Err(store_err) = self
749 .store
750 .update_step(
751 step.id,
752 StepUpdate {
753 status: Some(StepStatus::Failed),
754 error: Some(err.to_string()),
755 completed_at: Some(completed_at),
756 ..StepUpdate::default()
757 },
758 )
759 .await
760 {
761 error!(step_id = %step.id, error = %store_err, "failed to persist step failure");
762 }
763
764 Err(err)
765 }
766 }
767 }
768
769 /// Execute a sub-workflow step.
770 ///
771 /// Creates a child run for the named workflow handler, executes it with
772 /// its own steps and lifecycle, and returns a [`StepOutput`] containing
773 /// the child run ID and aggregated metrics.
774 ///
775 /// Requires the context to be created with
776 /// `with_handler_resolver`.
777 ///
778 /// # Errors
779 ///
780 /// Returns [`EngineError::InvalidWorkflow`] if no handler is registered
781 /// with the given name, or if no handler resolver is available.
782 ///
783 /// # Examples
784 ///
785 /// ```no_run
786 /// use ironflow_engine::context::WorkflowContext;
787 /// use ironflow_engine::error::EngineError;
788 /// use serde_json::json;
789 ///
790 /// # async fn example(ctx: &mut WorkflowContext) -> Result<(), EngineError> {
791 /// // let result = ctx.workflow(&MySubWorkflow, json!({})).await?;
792 /// # Ok(())
793 /// # }
794 /// ```
795 pub async fn workflow(
796 &mut self,
797 handler: &dyn WorkflowHandler,
798 payload: Value,
799 ) -> Result<StepOutput, EngineError> {
800 let config = WorkflowStepConfig::new(handler.name(), payload);
801 let position = self.position;
802 self.position += 1;
803
804 let step = self
805 .store
806 .create_step(NewStep {
807 run_id: self.run_id,
808 name: config.workflow_name.clone(),
809 kind: StepKind::Workflow,
810 position,
811 input: Some(serde_json::to_value(&config)?),
812 })
813 .await?;
814
815 self.start_step(step.id, Utc::now()).await?;
816
817 match self.execute_child_workflow(&config).await {
818 Ok(output) => {
819 self.total_cost_usd += output.cost_usd;
820 self.total_duration_ms += output.duration_ms;
821
822 let completed_at = Utc::now();
823 self.store
824 .update_step(
825 step.id,
826 StepUpdate {
827 status: Some(StepStatus::Completed),
828 output: Some(output.output.clone()),
829 duration_ms: Some(output.duration_ms),
830 cost_usd: Some(output.cost_usd),
831 completed_at: Some(completed_at),
832 ..StepUpdate::default()
833 },
834 )
835 .await?;
836
837 info!(
838 run_id = %self.run_id,
839 child_workflow = %config.workflow_name,
840 duration_ms = output.duration_ms,
841 "workflow step completed"
842 );
843
844 self.last_step_ids = vec![step.id];
845
846 Ok(output)
847 }
848 Err(err) => {
849 let completed_at = Utc::now();
850 if let Err(store_err) = self
851 .store
852 .update_step(
853 step.id,
854 StepUpdate {
855 status: Some(StepStatus::Failed),
856 error: Some(err.to_string()),
857 completed_at: Some(completed_at),
858 ..StepUpdate::default()
859 },
860 )
861 .await
862 {
863 error!(step_id = %step.id, error = %store_err, "failed to persist step failure");
864 }
865
866 Err(err)
867 }
868 }
869 }
870
871 /// Execute a child workflow and return aggregated output.
872 async fn execute_child_workflow(
873 &self,
874 config: &WorkflowStepConfig,
875 ) -> Result<StepOutput, EngineError> {
876 let resolver = self.handler_resolver.as_ref().ok_or_else(|| {
877 EngineError::InvalidWorkflow(
878 "sub-workflow requires a handler resolver (use Engine to execute)".to_string(),
879 )
880 })?;
881
882 let handler = resolver(&config.workflow_name).ok_or_else(|| {
883 EngineError::InvalidWorkflow(format!("no handler registered: {}", config.workflow_name))
884 })?;
885
886 let parent_labels = self
887 .store
888 .get_run(self.run_id)
889 .await?
890 .map(|r| r.labels)
891 .unwrap_or_default();
892
893 let child_run = self
894 .store
895 .create_run(NewRun {
896 workflow_name: config.workflow_name.clone(),
897 trigger: TriggerKind::Workflow,
898 payload: config.payload.clone(),
899 max_retries: 0,
900 handler_version: None,
901 labels: parent_labels,
902 scheduled_at: None,
903 })
904 .await?;
905
906 let child_run_id = child_run.id;
907 info!(
908 parent_run_id = %self.run_id,
909 child_run_id = %child_run_id,
910 workflow = %config.workflow_name,
911 "child run created"
912 );
913
914 self.store
915 .update_run_status(child_run_id, RunStatus::Running)
916 .await?;
917
918 let run_start = Instant::now();
919 let mut child_ctx = WorkflowContext {
920 run_id: child_run_id,
921 store: self.store.clone(),
922 provider: self.provider.clone(),
923 handler_resolver: self.handler_resolver.clone(),
924 position: 0,
925 last_step_ids: Vec::new(),
926 total_cost_usd: Decimal::ZERO,
927 total_duration_ms: 0,
928 replay_steps: HashMap::new(),
929 };
930
931 let result = handler.execute(&mut child_ctx).await;
932 let total_duration = run_start.elapsed().as_millis() as u64;
933 let completed_at = Utc::now();
934
935 match result {
936 Ok(()) => {
937 self.store
938 .update_run(
939 child_run_id,
940 RunUpdate {
941 status: Some(RunStatus::Completed),
942 cost_usd: Some(child_ctx.total_cost_usd),
943 duration_ms: Some(total_duration),
944 completed_at: Some(completed_at),
945 ..RunUpdate::default()
946 },
947 )
948 .await?;
949
950 Ok(StepOutput {
951 output: serde_json::json!({
952 "run_id": child_run_id,
953 "workflow_name": config.workflow_name,
954 "status": RunStatus::Completed,
955 "cost_usd": child_ctx.total_cost_usd,
956 "duration_ms": total_duration,
957 }),
958 duration_ms: total_duration,
959 cost_usd: child_ctx.total_cost_usd,
960 input_tokens: None,
961 output_tokens: None,
962 debug_messages: None,
963 })
964 }
965 Err(err) => {
966 if let Err(store_err) = self
967 .store
968 .update_run(
969 child_run_id,
970 RunUpdate {
971 status: Some(RunStatus::Failed),
972 error: Some(err.to_string()),
973 cost_usd: Some(child_ctx.total_cost_usd),
974 duration_ms: Some(total_duration),
975 completed_at: Some(completed_at),
976 ..RunUpdate::default()
977 },
978 )
979 .await
980 {
981 error!(
982 child_run_id = %child_run_id,
983 store_error = %store_err,
984 "failed to persist child run failure"
985 );
986 }
987
988 Err(err)
989 }
990 }
991 }
992
993 /// Try to replay a completed step from a previous execution.
994 ///
995 /// Returns `Some(StepOutput)` if a completed step exists at the given
996 /// position, `None` otherwise.
997 fn try_replay_step(&mut self, position: u32) -> Option<StepOutput> {
998 let step = self.replay_steps.get(&position)?;
999 if step.status.state != StepStatus::Completed {
1000 return None;
1001 }
1002 let output = StepOutput {
1003 output: step.output.clone().unwrap_or(Value::Null),
1004 duration_ms: step.duration_ms,
1005 cost_usd: step.cost_usd,
1006 input_tokens: step.input_tokens,
1007 output_tokens: step.output_tokens,
1008 debug_messages: None,
1009 };
1010 self.total_cost_usd += output.cost_usd;
1011 self.total_duration_ms += output.duration_ms;
1012 self.last_step_ids = vec![step.id];
1013 info!(
1014 run_id = %self.run_id,
1015 step = %step.name,
1016 position,
1017 "step replayed from previous execution"
1018 );
1019 Some(output)
1020 }
1021
1022 /// Internal: execute a step with full persistence lifecycle.
1023 async fn execute_step(
1024 &mut self,
1025 name: &str,
1026 kind: StepKind,
1027 config: StepConfig,
1028 ) -> Result<StepOutput, EngineError> {
1029 let position = self.position;
1030 self.position += 1;
1031
1032 // Replay: if this step already completed in a prior execution, return cached output.
1033 if let Some(output) = self.try_replay_step(position) {
1034 return Ok(output);
1035 }
1036
1037 // Create step record in Pending.
1038 let step = self
1039 .store
1040 .create_step(NewStep {
1041 run_id: self.run_id,
1042 name: name.to_string(),
1043 kind,
1044 position,
1045 input: Some(serde_json::to_value(&config)?),
1046 })
1047 .await?;
1048
1049 self.start_step(step.id, Utc::now()).await?;
1050
1051 match execute_step_config(&config, &self.provider).await {
1052 Ok(output) => {
1053 self.total_cost_usd += output.cost_usd;
1054 self.total_duration_ms += output.duration_ms;
1055
1056 let debug_messages_json = output.debug_messages_json();
1057
1058 let completed_at = Utc::now();
1059 self.store
1060 .update_step(
1061 step.id,
1062 StepUpdate {
1063 status: Some(StepStatus::Completed),
1064 output: Some(output.output.clone()),
1065 duration_ms: Some(output.duration_ms),
1066 cost_usd: Some(output.cost_usd),
1067 input_tokens: output.input_tokens,
1068 output_tokens: output.output_tokens,
1069 completed_at: Some(completed_at),
1070 debug_messages: debug_messages_json,
1071 ..StepUpdate::default()
1072 },
1073 )
1074 .await?;
1075
1076 info!(
1077 run_id = %self.run_id,
1078 step = %name,
1079 duration_ms = output.duration_ms,
1080 "step completed"
1081 );
1082
1083 self.last_step_ids = vec![step.id];
1084
1085 Ok(output)
1086 }
1087 Err(err) => {
1088 let completed_at = Utc::now();
1089 let debug_messages_json = extract_debug_messages_from_error(&err);
1090 let partial = extract_partial_usage_from_error(&err);
1091
1092 if let Some(ref usage) = partial {
1093 if let Some(cost) = usage.cost_usd {
1094 self.total_cost_usd += cost;
1095 }
1096 if let Some(dur) = usage.duration_ms {
1097 self.total_duration_ms += dur;
1098 }
1099 }
1100
1101 if let Err(store_err) = self
1102 .store
1103 .update_step(
1104 step.id,
1105 StepUpdate {
1106 status: Some(StepStatus::Failed),
1107 error: Some(err.to_string()),
1108 completed_at: Some(completed_at),
1109 debug_messages: debug_messages_json,
1110 duration_ms: partial.as_ref().and_then(|p| p.duration_ms),
1111 cost_usd: partial.as_ref().and_then(|p| p.cost_usd),
1112 input_tokens: partial.as_ref().and_then(|p| p.input_tokens),
1113 output_tokens: partial.as_ref().and_then(|p| p.output_tokens),
1114 ..StepUpdate::default()
1115 },
1116 )
1117 .await
1118 {
1119 tracing::error!(step_id = %step.id, error = %store_err, "failed to persist step failure");
1120 }
1121
1122 Err(err)
1123 }
1124 }
1125 }
1126
1127 /// Record dependency edges and transition a step to Running.
1128 ///
1129 /// Records edges from `step_id` to all `last_step_ids`, then
1130 /// transitions the step to `Running` with the given timestamp.
1131 async fn start_step(&self, step_id: Uuid, now: DateTime<Utc>) -> Result<(), EngineError> {
1132 if !self.last_step_ids.is_empty() {
1133 let deps: Vec<NewStepDependency> = self
1134 .last_step_ids
1135 .iter()
1136 .map(|&depends_on| NewStepDependency {
1137 step_id,
1138 depends_on,
1139 })
1140 .collect();
1141 self.store.create_step_dependencies(deps).await?;
1142 }
1143
1144 self.store
1145 .update_step(
1146 step_id,
1147 StepUpdate {
1148 status: Some(StepStatus::Running),
1149 started_at: Some(now),
1150 ..StepUpdate::default()
1151 },
1152 )
1153 .await?;
1154
1155 Ok(())
1156 }
1157
1158 /// Access the store directly (advanced usage).
1159 pub fn store(&self) -> &Arc<dyn Store> {
1160 &self.store
1161 }
1162
1163 /// Access the payload that triggered this run.
1164 ///
1165 /// Fetches the run from the store and returns its payload.
1166 ///
1167 /// # Errors
1168 ///
1169 /// Returns [`EngineError::Store`] if the run is not found.
1170 pub async fn payload(&self) -> Result<Value, EngineError> {
1171 let run = self
1172 .store
1173 .get_run(self.run_id)
1174 .await?
1175 .ok_or(EngineError::Store(
1176 ironflow_store::error::StoreError::RunNotFound(self.run_id),
1177 ))?;
1178 Ok(run.payload)
1179 }
1180}
1181
1182impl fmt::Debug for WorkflowContext {
1183 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1184 f.debug_struct("WorkflowContext")
1185 .field("run_id", &self.run_id)
1186 .field("position", &self.position)
1187 .field("total_cost_usd", &self.total_cost_usd)
1188 .finish_non_exhaustive()
1189 }
1190}
1191
1192/// Extract debug messages from an engine error, if it wraps a schema validation
1193/// failure that carries a verbose conversation trace.
1194fn extract_debug_messages_from_error(err: &EngineError) -> Option<Value> {
1195 if let EngineError::Operation(OperationError::Agent(AgentError::SchemaValidation {
1196 debug_messages,
1197 ..
1198 })) = err
1199 && !debug_messages.is_empty()
1200 {
1201 return serde_json::to_value(debug_messages).ok();
1202 }
1203 None
1204}
1205
1206/// Partial usage with `Decimal` cost, converted from the `f64` in [`PartialUsage`].
1207///
1208/// Exists only because `ironflow-store` uses [`Decimal`] for monetary values
1209/// while `ironflow-core` uses `f64` (the CLI's native type). The conversion
1210/// happens here, at the engine/store boundary.
1211struct StepPartialUsage {
1212 cost_usd: Option<Decimal>,
1213 duration_ms: Option<u64>,
1214 input_tokens: Option<u64>,
1215 output_tokens: Option<u64>,
1216}
1217
1218fn extract_partial_usage_from_error(err: &EngineError) -> Option<StepPartialUsage> {
1219 if let EngineError::Operation(OperationError::Agent(AgentError::SchemaValidation {
1220 partial_usage,
1221 ..
1222 })) = err
1223 && (partial_usage.cost_usd.is_some() || partial_usage.duration_ms.is_some())
1224 {
1225 return Some(StepPartialUsage {
1226 cost_usd: partial_usage
1227 .cost_usd
1228 .and_then(|c| Decimal::try_from(c).ok()),
1229 duration_ms: partial_usage.duration_ms,
1230 input_tokens: partial_usage.input_tokens,
1231 output_tokens: partial_usage.output_tokens,
1232 });
1233 }
1234 None
1235}