ironflow_engine/context.rs
1//! [`WorkflowContext`] — execution context for dynamic workflows.
2//!
3//! Provides step execution methods that automatically persist results to the
4//! store. Each call to [`shell`](WorkflowContext::shell),
5//! [`http`](WorkflowContext::http), [`agent`](WorkflowContext::agent), or
6//! [`workflow`](WorkflowContext::workflow) creates a step record, executes the
7//! operation, captures the output, and returns a [`StepOutput`] that the next
8//! step can reference.
9//!
10//! # Examples
11//!
12//! ```no_run
13//! use ironflow_engine::context::WorkflowContext;
14//! use ironflow_engine::config::{ShellConfig, AgentStepConfig};
15//! use ironflow_engine::error::EngineError;
16//!
17//! # async fn example(ctx: &mut WorkflowContext) -> Result<(), EngineError> {
18//! let build = ctx.shell("build", ShellConfig::new("cargo build")).await?;
19//! let review = ctx.agent("review", AgentStepConfig::new(
20//! &format!("Build output:\n{}", build.output["stdout"])
21//! )).await?;
22//! # Ok(())
23//! # }
24//! ```
25
26use std::collections::HashMap;
27use std::fmt;
28use std::sync::Arc;
29use std::time::Instant;
30
31use chrono::{DateTime, Utc};
32use rust_decimal::Decimal;
33use serde_json::Value;
34use tokio::task::JoinSet;
35use tracing::{error, info};
36use uuid::Uuid;
37
38use ironflow_core::error::{AgentError, OperationError};
39use ironflow_core::provider::AgentProvider;
40use ironflow_store::models::{
41 NewRun, NewStep, NewStepDependency, RunStatus, RunUpdate, Step, StepKind, StepStatus,
42 StepUpdate, TriggerKind,
43};
44use ironflow_store::store::Store;
45
46use crate::config::{
47 AgentStepConfig, ApprovalConfig, HttpConfig, ShellConfig, StepConfig, WorkflowStepConfig,
48};
49use crate::error::EngineError;
50use crate::executor::{ParallelStepResult, StepOutput, execute_step_config};
51use crate::handler::WorkflowHandler;
52use crate::log_sender::{LogSender, StepLogSender};
53use crate::operation::Operation;
54
/// Callback type for resolving workflow handlers by name.
///
/// Called with a workflow name; returns the handler registered under that
/// name, or `None` when there is none. The callback is reference-counted and
/// must be `Send + Sync` so it can be cloned into child contexts.
pub(crate) type HandlerResolver =
    Arc<dyn Fn(&str) -> Option<Arc<dyn WorkflowHandler>> + Send + Sync>;
58
/// Execution context for a single workflow run.
///
/// Tracks the current step position and provides convenience methods
/// for executing operations with automatic persistence.
///
/// # Examples
///
/// ```no_run
/// use ironflow_engine::context::WorkflowContext;
/// use ironflow_engine::config::ShellConfig;
/// use ironflow_engine::error::EngineError;
///
/// # async fn example(ctx: &mut WorkflowContext) -> Result<(), EngineError> {
/// let result = ctx.shell("greet", ShellConfig::new("echo hello")).await?;
/// assert!(result.output["stdout"].as_str().unwrap().contains("hello"));
/// # Ok(())
/// # }
/// ```
pub struct WorkflowContext {
    /// ID of the run this context executes; stamped on every step record it creates.
    run_id: Uuid,
    /// Persistence backend used for runs, steps and step dependencies.
    store: Arc<dyn Store>,
    /// Agent provider handed to the step executor.
    provider: Arc<dyn AgentProvider>,
    /// Resolver used by sub-workflow steps; `None` when built via `new`.
    handler_resolver: Option<HandlerResolver>,
    /// Position assigned to the next step. Incremented once per step call;
    /// all steps of one parallel batch share a single position.
    position: u32,
    /// IDs of the last executed step(s) -- used to record DAG dependencies.
    last_step_ids: Vec<Uuid>,
    /// Accumulated cost across all steps in this run.
    total_cost_usd: Decimal,
    /// Accumulated duration across all steps.
    total_duration_ms: u64,
    /// Steps from a previous execution, keyed by position.
    /// Used when resuming after approval to replay completed steps.
    replay_steps: HashMap<u32, Step>,
    /// Optional sender for real-time log streaming.
    log_sender: Option<LogSender>,
}
95
96impl WorkflowContext {
97 /// Create a new context for a run.
98 ///
99 /// Not typically called directly — the [`Engine`](crate::engine::Engine)
100 /// creates this when executing a [`WorkflowHandler`].
101 pub fn new(run_id: Uuid, store: Arc<dyn Store>, provider: Arc<dyn AgentProvider>) -> Self {
102 Self {
103 run_id,
104 store,
105 provider,
106 handler_resolver: None,
107 position: 0,
108 last_step_ids: Vec::new(),
109 total_cost_usd: Decimal::ZERO,
110 total_duration_ms: 0,
111 replay_steps: HashMap::new(),
112 log_sender: None,
113 }
114 }
115
116 /// Create a new context with a handler resolver for sub-workflow support.
117 ///
118 /// The resolver is called when [`workflow`](Self::workflow) is invoked to
119 /// look up registered handlers by name.
120 pub(crate) fn with_handler_resolver(
121 run_id: Uuid,
122 store: Arc<dyn Store>,
123 provider: Arc<dyn AgentProvider>,
124 resolver: HandlerResolver,
125 ) -> Self {
126 Self {
127 run_id,
128 store,
129 provider,
130 handler_resolver: Some(resolver),
131 position: 0,
132 last_step_ids: Vec::new(),
133 total_cost_usd: Decimal::ZERO,
134 total_duration_ms: 0,
135 replay_steps: HashMap::new(),
136 log_sender: None,
137 }
138 }
139
/// Attach a log sender for real-time step output streaming.
///
/// Replaces any sender set earlier. Steps started after this call get a
/// per-step sender derived from it; child workflow contexts receive a clone.
pub fn set_log_sender(&mut self, sender: LogSender) {
    self.log_sender = Some(sender);
}
144
145 /// Load existing steps from the store for replay after approval.
146 ///
147 /// Called by the engine when resuming a run. All completed steps
148 /// and the approved approval step are indexed by position so that
149 /// `execute_step` and `approval` can skip them.
150 pub(crate) async fn load_replay_steps(&mut self) -> Result<(), EngineError> {
151 let steps = self.store.list_steps(self.run_id).await?;
152 for step in steps {
153 let dominated = matches!(
154 step.status.state,
155 StepStatus::Completed | StepStatus::Running | StepStatus::AwaitingApproval
156 );
157 if dominated {
158 self.replay_steps.insert(step.position, step);
159 }
160 }
161 Ok(())
162 }
163
/// The run ID this context is executing for.
///
/// Every step record created through this context carries this ID.
pub fn run_id(&self) -> Uuid {
    self.run_id
}
168
/// Accumulated cost across all executed steps so far.
///
/// Includes the cost of replayed steps and any partial usage reported by
/// failed steps.
pub fn total_cost_usd(&self) -> Decimal {
    self.total_cost_usd
}
173
/// Accumulated duration across all executed steps so far.
///
/// This is the sum of per-step durations in milliseconds, not wall-clock
/// time: steps of a parallel batch each contribute their own duration.
pub fn total_duration_ms(&self) -> u64 {
    self.total_duration_ms
}
178
179 /// Execute multiple steps concurrently (wait-all model).
180 ///
181 /// All steps in the batch execute in parallel via `tokio::JoinSet`.
182 /// Each step is recorded with the same `position` (execution wave).
183 /// Dependencies on previous steps are recorded automatically.
184 ///
185 /// When `fail_fast` is true, remaining steps are aborted on the first
186 /// failure. When false, all steps run to completion and the first
187 /// error is returned.
188 ///
189 /// # Errors
190 ///
191 /// Returns [`EngineError`] if any step fails.
192 ///
193 /// # Examples
194 ///
195 /// ```no_run
196 /// use ironflow_engine::context::WorkflowContext;
197 /// use ironflow_engine::config::{StepConfig, ShellConfig};
198 /// use ironflow_engine::error::EngineError;
199 ///
200 /// # async fn example(ctx: &mut WorkflowContext) -> Result<(), EngineError> {
201 /// let results = ctx.parallel(
202 /// vec![
203 /// ("test-unit", StepConfig::Shell(ShellConfig::new("cargo test --lib"))),
204 /// ("lint", StepConfig::Shell(ShellConfig::new("cargo clippy"))),
205 /// ],
206 /// true,
207 /// ).await?;
208 ///
209 /// for r in &results {
210 /// println!("{}: {:?}", r.name, r.output.output);
211 /// }
212 /// # Ok(())
213 /// # }
214 /// ```
215 pub async fn parallel(
216 &mut self,
217 steps: Vec<(&str, StepConfig)>,
218 fail_fast: bool,
219 ) -> Result<Vec<ParallelStepResult>, EngineError> {
220 if steps.is_empty() {
221 return Ok(Vec::new());
222 }
223
224 let wave_position = self.position;
225 self.position += 1;
226
227 let now = Utc::now();
228 let mut step_records: Vec<(Uuid, String, StepConfig)> = Vec::with_capacity(steps.len());
229
230 for (name, config) in &steps {
231 let kind = config.kind();
232 let step = self
233 .store
234 .create_step(NewStep {
235 run_id: self.run_id,
236 name: name.to_string(),
237 kind,
238 position: wave_position,
239 input: Some(serde_json::to_value(config)?),
240 })
241 .await?;
242
243 self.start_step(step.id, now).await?;
244
245 step_records.push((step.id, name.to_string(), config.clone()));
246 }
247
248 let mut join_set = JoinSet::new();
249 for (idx, (step_id, step_name, config)) in step_records.iter().enumerate() {
250 let provider = self.provider.clone();
251 let config = config.clone();
252 let step_log_sender = self
253 .log_sender
254 .as_ref()
255 .map(|s| StepLogSender::new(s.clone(), self.run_id, *step_id, step_name.clone()));
256 join_set.spawn(async move {
257 (
258 idx,
259 execute_step_config(&config, &provider, step_log_sender).await,
260 )
261 });
262 }
263
264 // JoinSet returns in completion order; indexed_results restores input order.
265 let mut indexed_results: Vec<Option<Result<StepOutput, String>>> =
266 vec![None; step_records.len()];
267 let mut first_error: Option<EngineError> = None;
268
269 while let Some(join_result) = join_set.join_next().await {
270 let (idx, step_result) = match join_result {
271 Ok(r) => r,
272 Err(e) => {
273 if first_error.is_none() {
274 first_error = Some(EngineError::StepConfig(format!("join error: {e}")));
275 }
276 if fail_fast {
277 join_set.abort_all();
278 }
279 continue;
280 }
281 };
282
283 let (step_id, step_name, _) = &step_records[idx];
284 let completed_at = Utc::now();
285
286 match step_result {
287 Ok(output) => {
288 self.total_cost_usd += output.cost_usd;
289 self.total_duration_ms += output.duration_ms;
290
291 let debug_messages_json = output.debug_messages_json();
292
293 self.store
294 .update_step(
295 *step_id,
296 StepUpdate {
297 status: Some(StepStatus::Completed),
298 output: Some(output.output.clone()),
299 duration_ms: Some(output.duration_ms),
300 cost_usd: Some(output.cost_usd),
301 input_tokens: output.input_tokens,
302 output_tokens: output.output_tokens,
303 completed_at: Some(completed_at),
304 debug_messages: debug_messages_json,
305 ..StepUpdate::default()
306 },
307 )
308 .await?;
309
310 info!(
311 run_id = %self.run_id,
312 step = %step_name,
313 duration_ms = output.duration_ms,
314 "parallel step completed"
315 );
316
317 indexed_results[idx] = Some(Ok(output));
318 }
319 Err(err) => {
320 let err_msg = err.to_string();
321 let debug_messages_json = extract_debug_messages_from_error(&err);
322 let partial = extract_partial_usage_from_error(&err);
323 let raw_response_output = extract_raw_response_from_error(&err);
324
325 if let Some(ref usage) = partial {
326 if let Some(cost) = usage.cost_usd {
327 self.total_cost_usd += cost;
328 }
329 if let Some(dur) = usage.duration_ms {
330 self.total_duration_ms += dur;
331 }
332 }
333
334 if let Err(store_err) = self
335 .store
336 .update_step(
337 *step_id,
338 StepUpdate {
339 status: Some(StepStatus::Failed),
340 error: Some(err_msg.clone()),
341 output: raw_response_output,
342 completed_at: Some(completed_at),
343 debug_messages: debug_messages_json,
344 duration_ms: partial.as_ref().and_then(|p| p.duration_ms),
345 cost_usd: partial.as_ref().and_then(|p| p.cost_usd),
346 input_tokens: partial.as_ref().and_then(|p| p.input_tokens),
347 output_tokens: partial.as_ref().and_then(|p| p.output_tokens),
348 ..StepUpdate::default()
349 },
350 )
351 .await
352 {
353 tracing::error!(
354 step_id = %step_id,
355 error = %store_err,
356 "failed to persist parallel step failure"
357 );
358 }
359
360 indexed_results[idx] = Some(Err(err_msg.clone()));
361
362 if first_error.is_none() {
363 first_error = Some(err);
364 }
365
366 if fail_fast {
367 join_set.abort_all();
368 }
369 }
370 }
371 }
372
373 if let Some(err) = first_error {
374 return Err(err);
375 }
376
377 self.last_step_ids = step_records.iter().map(|(id, _, _)| *id).collect();
378
379 // Build results in original order.
380 let results: Vec<ParallelStepResult> = step_records
381 .iter()
382 .enumerate()
383 .map(|(idx, (step_id, name, _))| {
384 let output = match indexed_results[idx].take() {
385 Some(Ok(o)) => o,
386 _ => unreachable!("all steps succeeded if no error returned"),
387 };
388 ParallelStepResult {
389 name: name.clone(),
390 output,
391 step_id: *step_id,
392 }
393 })
394 .collect();
395
396 Ok(results)
397 }
398
399 /// Execute a shell step.
400 ///
401 /// Creates the step record, runs the command, persists the result,
402 /// and returns the output for use in subsequent steps.
403 ///
404 /// # Errors
405 ///
406 /// Returns [`EngineError`] if the command fails or the store errors.
407 ///
408 /// # Examples
409 ///
410 /// ```no_run
411 /// use ironflow_engine::context::WorkflowContext;
412 /// use ironflow_engine::config::ShellConfig;
413 /// use ironflow_engine::error::EngineError;
414 ///
415 /// # async fn example(ctx: &mut WorkflowContext) -> Result<(), EngineError> {
416 /// let files = ctx.shell("list", ShellConfig::new("ls -la")).await?;
417 /// println!("stdout: {}", files.output["stdout"]);
418 /// # Ok(())
419 /// # }
420 /// ```
421 pub async fn shell(
422 &mut self,
423 name: &str,
424 config: ShellConfig,
425 ) -> Result<StepOutput, EngineError> {
426 self.execute_step(name, StepKind::Shell, StepConfig::Shell(config))
427 .await
428 }
429
430 /// Execute an HTTP step.
431 ///
432 /// # Errors
433 ///
434 /// Returns [`EngineError`] if the request fails or the store errors.
435 ///
436 /// # Examples
437 ///
438 /// ```no_run
439 /// use ironflow_engine::context::WorkflowContext;
440 /// use ironflow_engine::config::HttpConfig;
441 /// use ironflow_engine::error::EngineError;
442 ///
443 /// # async fn example(ctx: &mut WorkflowContext) -> Result<(), EngineError> {
444 /// let resp = ctx.http("health", HttpConfig::get("https://api.example.com/health")).await?;
445 /// println!("status: {}", resp.output["status"]);
446 /// # Ok(())
447 /// # }
448 /// ```
449 pub async fn http(
450 &mut self,
451 name: &str,
452 config: HttpConfig,
453 ) -> Result<StepOutput, EngineError> {
454 self.execute_step(name, StepKind::Http, StepConfig::Http(config))
455 .await
456 }
457
458 /// Execute an agent step.
459 ///
460 /// # Errors
461 ///
462 /// Returns [`EngineError`] if the agent invocation fails or the store errors.
463 ///
464 /// # Examples
465 ///
466 /// ```no_run
467 /// use ironflow_engine::context::WorkflowContext;
468 /// use ironflow_engine::config::AgentStepConfig;
469 /// use ironflow_engine::error::EngineError;
470 ///
471 /// # async fn example(ctx: &mut WorkflowContext) -> Result<(), EngineError> {
472 /// let review = ctx.agent("review", AgentStepConfig::new("Review the code")).await?;
473 /// println!("review: {}", review.output);
474 /// # Ok(())
475 /// # }
476 /// ```
477 pub async fn agent(
478 &mut self,
479 name: &str,
480 config: impl Into<AgentStepConfig>,
481 ) -> Result<StepOutput, EngineError> {
482 self.execute_step(name, StepKind::Agent, StepConfig::Agent(config.into()))
483 .await
484 }
485
486 /// Create a human approval gate.
487 ///
488 /// On first execution, records an approval step and returns
489 /// [`EngineError::ApprovalRequired`] to suspend the run. The engine
490 /// transitions the run to `AwaitingApproval`.
491 ///
492 /// On resume (after a human approved via the API), the approval step
493 /// is replayed: it is marked as `Completed` and execution continues
494 /// past it. Multiple approval gates in the same handler work -- each
495 /// one pauses and resumes independently.
496 ///
497 /// # Errors
498 ///
499 /// Returns [`EngineError::ApprovalRequired`] to pause the run on
500 /// first execution. Returns other [`EngineError`] variants on store
501 /// failures.
502 ///
503 /// # Examples
504 ///
505 /// ```no_run
506 /// use ironflow_engine::context::WorkflowContext;
507 /// use ironflow_engine::config::ApprovalConfig;
508 /// use ironflow_engine::error::EngineError;
509 ///
510 /// # async fn example(ctx: &mut WorkflowContext) -> Result<(), EngineError> {
511 /// ctx.approval("deploy-gate", ApprovalConfig::new("Approve deployment?")).await?;
512 /// // Execution continues here after approval
513 /// # Ok(())
514 /// # }
515 /// ```
516 pub async fn approval(
517 &mut self,
518 name: &str,
519 config: ApprovalConfig,
520 ) -> Result<(), EngineError> {
521 let position = self.position;
522 self.position += 1;
523
524 // Replay: if this approval step exists from a prior execution,
525 // the run was approved -- mark it completed (if not already) and continue.
526 if let Some(existing) = self.replay_steps.get(&position)
527 && existing.kind == StepKind::Approval
528 {
529 if existing.status.state == StepStatus::AwaitingApproval {
530 self.store
531 .update_step(
532 existing.id,
533 StepUpdate {
534 status: Some(StepStatus::Completed),
535 completed_at: Some(Utc::now()),
536 ..StepUpdate::default()
537 },
538 )
539 .await?;
540 }
541
542 self.last_step_ids = vec![existing.id];
543 info!(
544 run_id = %self.run_id,
545 step = %name,
546 position,
547 "approval step replayed (approved)"
548 );
549 return Ok(());
550 }
551
552 // First execution: create the approval step and suspend.
553 let step = self
554 .store
555 .create_step(NewStep {
556 run_id: self.run_id,
557 name: name.to_string(),
558 kind: StepKind::Approval,
559 position,
560 input: Some(serde_json::to_value(&config)?),
561 })
562 .await?;
563
564 self.start_step(step.id, Utc::now()).await?;
565
566 // Transition the step to AwaitingApproval so it reflects
567 // the suspended state on the dashboard.
568 self.store
569 .update_step(
570 step.id,
571 StepUpdate {
572 status: Some(StepStatus::AwaitingApproval),
573 ..StepUpdate::default()
574 },
575 )
576 .await?;
577
578 self.last_step_ids = vec![step.id];
579
580 Err(EngineError::ApprovalRequired {
581 run_id: self.run_id,
582 step_id: step.id,
583 message: config.message().to_string(),
584 })
585 }
586
587 /// Record a step as explicitly skipped.
588 ///
589 /// Use this inside an `if`/`else` branch when a step should not execute
590 /// but must still appear in the DAG and timeline with its reason.
591 ///
592 /// The step is created directly in [`StepStatus::Skipped`] state and the
593 /// reason is stored in the output as `{"reason": "..."}`.
594 ///
595 /// # Errors
596 ///
597 /// Returns [`EngineError`] if the store fails.
598 ///
599 /// # Examples
600 ///
601 /// ```no_run
602 /// use ironflow_engine::context::WorkflowContext;
603 /// use ironflow_engine::error::EngineError;
604 ///
605 /// # async fn example(ctx: &mut WorkflowContext) -> Result<(), EngineError> {
606 /// let tests_passed = false;
607 /// if tests_passed {
608 /// // ctx.shell("deploy", ...).await?;
609 /// } else {
610 /// ctx.skip("deploy", "tests failed").await?;
611 /// }
612 /// # Ok(())
613 /// # }
614 /// ```
615 pub async fn skip(&mut self, name: &str, reason: &str) -> Result<(), EngineError> {
616 let position = self.position;
617 self.position += 1;
618
619 let step = self
620 .store
621 .create_step(NewStep {
622 run_id: self.run_id,
623 name: name.to_string(),
624 kind: StepKind::Custom("skip".to_string()),
625 position,
626 input: None,
627 })
628 .await?;
629
630 if !self.last_step_ids.is_empty() {
631 let deps: Vec<NewStepDependency> = self
632 .last_step_ids
633 .iter()
634 .map(|&depends_on| NewStepDependency {
635 step_id: step.id,
636 depends_on,
637 })
638 .collect();
639 self.store.create_step_dependencies(deps).await?;
640 }
641
642 let now = Utc::now();
643 self.store
644 .update_step(
645 step.id,
646 StepUpdate {
647 status: Some(StepStatus::Skipped),
648 output: Some(serde_json::json!({"reason": reason})),
649 completed_at: Some(now),
650 ..StepUpdate::default()
651 },
652 )
653 .await?;
654
655 self.last_step_ids = vec![step.id];
656
657 info!(
658 run_id = %self.run_id,
659 step = %name,
660 reason,
661 "step skipped"
662 );
663
664 Ok(())
665 }
666
667 /// Execute a custom operation step.
668 ///
669 /// Runs a user-defined [`Operation`] with full step lifecycle management:
670 /// creates the step record, transitions to Running, executes the operation,
671 /// persists the output and duration, and marks the step Completed or Failed.
672 ///
673 /// The operation's [`kind()`](Operation::kind) is stored as
674 /// [`StepKind::Custom`].
675 ///
676 /// # Errors
677 ///
678 /// Returns [`EngineError`] if the operation fails or the store errors.
679 ///
680 /// # Examples
681 ///
682 /// ```no_run
683 /// use ironflow_engine::context::WorkflowContext;
684 /// use ironflow_engine::operation::Operation;
685 /// use ironflow_engine::error::EngineError;
686 /// use serde_json::{Value, json};
687 /// use std::pin::Pin;
688 /// use std::future::Future;
689 ///
690 /// struct MyOp;
691 /// impl Operation for MyOp {
692 /// fn kind(&self) -> &str { "my-service" }
693 /// fn execute(&self) -> Pin<Box<dyn Future<Output = Result<Value, EngineError>> + Send + '_>> {
694 /// Box::pin(async { Ok(json!({"ok": true})) })
695 /// }
696 /// }
697 ///
698 /// # async fn example(ctx: &mut WorkflowContext) -> Result<(), EngineError> {
699 /// let result = ctx.operation("call-service", &MyOp).await?;
700 /// println!("output: {}", result.output);
701 /// # Ok(())
702 /// # }
703 /// ```
704 pub async fn operation(
705 &mut self,
706 name: &str,
707 op: &dyn Operation,
708 ) -> Result<StepOutput, EngineError> {
709 let kind = StepKind::Custom(op.kind().to_string());
710 let position = self.position;
711 self.position += 1;
712
713 let step = self
714 .store
715 .create_step(NewStep {
716 run_id: self.run_id,
717 name: name.to_string(),
718 kind,
719 position,
720 input: op.input(),
721 })
722 .await?;
723
724 self.start_step(step.id, Utc::now()).await?;
725
726 let start = Instant::now();
727
728 match op.execute().await {
729 Ok(output_value) => {
730 let duration_ms = start.elapsed().as_millis() as u64;
731 self.total_duration_ms += duration_ms;
732
733 let completed_at = Utc::now();
734 self.store
735 .update_step(
736 step.id,
737 StepUpdate {
738 status: Some(StepStatus::Completed),
739 output: Some(output_value.clone()),
740 duration_ms: Some(duration_ms),
741 cost_usd: Some(Decimal::ZERO),
742 completed_at: Some(completed_at),
743 ..StepUpdate::default()
744 },
745 )
746 .await?;
747
748 info!(
749 run_id = %self.run_id,
750 step = %name,
751 kind = op.kind(),
752 duration_ms,
753 "operation step completed"
754 );
755
756 self.last_step_ids = vec![step.id];
757
758 Ok(StepOutput {
759 output: output_value,
760 duration_ms,
761 cost_usd: Decimal::ZERO,
762 input_tokens: None,
763 output_tokens: None,
764 model: None,
765 debug_messages: None,
766 })
767 }
768 Err(err) => {
769 let completed_at = Utc::now();
770 if let Err(store_err) = self
771 .store
772 .update_step(
773 step.id,
774 StepUpdate {
775 status: Some(StepStatus::Failed),
776 error: Some(err.to_string()),
777 completed_at: Some(completed_at),
778 ..StepUpdate::default()
779 },
780 )
781 .await
782 {
783 error!(step_id = %step.id, error = %store_err, "failed to persist step failure");
784 }
785
786 Err(err)
787 }
788 }
789 }
790
791 /// Execute a sub-workflow step.
792 ///
793 /// Creates a child run for the named workflow handler, executes it with
794 /// its own steps and lifecycle, and returns a [`StepOutput`] containing
795 /// the child run ID and aggregated metrics.
796 ///
797 /// Requires the context to be created with
798 /// `with_handler_resolver`.
799 ///
800 /// # Errors
801 ///
802 /// Returns [`EngineError::InvalidWorkflow`] if no handler is registered
803 /// with the given name, or if no handler resolver is available.
804 ///
805 /// # Examples
806 ///
807 /// ```no_run
808 /// use ironflow_engine::context::WorkflowContext;
809 /// use ironflow_engine::error::EngineError;
810 /// use serde_json::json;
811 ///
812 /// # async fn example(ctx: &mut WorkflowContext) -> Result<(), EngineError> {
813 /// // let result = ctx.workflow(&MySubWorkflow, json!({})).await?;
814 /// # Ok(())
815 /// # }
816 /// ```
817 pub async fn workflow(
818 &mut self,
819 handler: &dyn WorkflowHandler,
820 payload: Value,
821 ) -> Result<StepOutput, EngineError> {
822 let config = WorkflowStepConfig::new(handler.name(), payload);
823 let position = self.position;
824 self.position += 1;
825
826 let step = self
827 .store
828 .create_step(NewStep {
829 run_id: self.run_id,
830 name: config.workflow_name.clone(),
831 kind: StepKind::Workflow,
832 position,
833 input: Some(serde_json::to_value(&config)?),
834 })
835 .await?;
836
837 self.start_step(step.id, Utc::now()).await?;
838
839 match self.execute_child_workflow(&config).await {
840 Ok(output) => {
841 self.total_cost_usd += output.cost_usd;
842 self.total_duration_ms += output.duration_ms;
843
844 let completed_at = Utc::now();
845 self.store
846 .update_step(
847 step.id,
848 StepUpdate {
849 status: Some(StepStatus::Completed),
850 output: Some(output.output.clone()),
851 duration_ms: Some(output.duration_ms),
852 cost_usd: Some(output.cost_usd),
853 completed_at: Some(completed_at),
854 ..StepUpdate::default()
855 },
856 )
857 .await?;
858
859 info!(
860 run_id = %self.run_id,
861 child_workflow = %config.workflow_name,
862 duration_ms = output.duration_ms,
863 "workflow step completed"
864 );
865
866 self.last_step_ids = vec![step.id];
867
868 Ok(output)
869 }
870 Err(err) => {
871 let completed_at = Utc::now();
872 if let Err(store_err) = self
873 .store
874 .update_step(
875 step.id,
876 StepUpdate {
877 status: Some(StepStatus::Failed),
878 error: Some(err.to_string()),
879 completed_at: Some(completed_at),
880 ..StepUpdate::default()
881 },
882 )
883 .await
884 {
885 error!(step_id = %step.id, error = %store_err, "failed to persist step failure");
886 }
887
888 Err(err)
889 }
890 }
891 }
892
893 /// Execute a child workflow and return aggregated output.
894 async fn execute_child_workflow(
895 &self,
896 config: &WorkflowStepConfig,
897 ) -> Result<StepOutput, EngineError> {
898 let resolver = self.handler_resolver.as_ref().ok_or_else(|| {
899 EngineError::InvalidWorkflow(
900 "sub-workflow requires a handler resolver (use Engine to execute)".to_string(),
901 )
902 })?;
903
904 let handler = resolver(&config.workflow_name).ok_or_else(|| {
905 EngineError::InvalidWorkflow(format!("no handler registered: {}", config.workflow_name))
906 })?;
907
908 let parent_labels = self
909 .store
910 .get_run(self.run_id)
911 .await?
912 .map(|r| r.labels)
913 .unwrap_or_default();
914
915 let child_run = self
916 .store
917 .create_run(NewRun {
918 workflow_name: config.workflow_name.clone(),
919 trigger: TriggerKind::Workflow,
920 payload: config.payload.clone(),
921 max_retries: 0,
922 handler_version: None,
923 labels: parent_labels,
924 scheduled_at: None,
925 })
926 .await?;
927
928 let child_run_id = child_run.id;
929 info!(
930 parent_run_id = %self.run_id,
931 child_run_id = %child_run_id,
932 workflow = %config.workflow_name,
933 "child run created"
934 );
935
936 self.store
937 .update_run_status(child_run_id, RunStatus::Running)
938 .await?;
939
940 let run_start = Instant::now();
941 let mut child_ctx = WorkflowContext {
942 run_id: child_run_id,
943 store: self.store.clone(),
944 provider: self.provider.clone(),
945 handler_resolver: self.handler_resolver.clone(),
946 position: 0,
947 last_step_ids: Vec::new(),
948 total_cost_usd: Decimal::ZERO,
949 total_duration_ms: 0,
950 replay_steps: HashMap::new(),
951 log_sender: self.log_sender.clone(),
952 };
953
954 let result = handler.execute(&mut child_ctx).await;
955 let total_duration = run_start.elapsed().as_millis() as u64;
956 let completed_at = Utc::now();
957
958 match result {
959 Ok(()) => {
960 self.store
961 .update_run(
962 child_run_id,
963 RunUpdate {
964 status: Some(RunStatus::Completed),
965 cost_usd: Some(child_ctx.total_cost_usd),
966 duration_ms: Some(total_duration),
967 completed_at: Some(completed_at),
968 ..RunUpdate::default()
969 },
970 )
971 .await?;
972
973 Ok(StepOutput {
974 output: serde_json::json!({
975 "run_id": child_run_id,
976 "workflow_name": config.workflow_name,
977 "status": RunStatus::Completed,
978 "cost_usd": child_ctx.total_cost_usd,
979 "duration_ms": total_duration,
980 }),
981 duration_ms: total_duration,
982 cost_usd: child_ctx.total_cost_usd,
983 input_tokens: None,
984 output_tokens: None,
985 model: None,
986 debug_messages: None,
987 })
988 }
989 Err(err) => {
990 if let Err(store_err) = self
991 .store
992 .update_run(
993 child_run_id,
994 RunUpdate {
995 status: Some(RunStatus::Failed),
996 error: Some(err.to_string()),
997 cost_usd: Some(child_ctx.total_cost_usd),
998 duration_ms: Some(total_duration),
999 completed_at: Some(completed_at),
1000 ..RunUpdate::default()
1001 },
1002 )
1003 .await
1004 {
1005 error!(
1006 child_run_id = %child_run_id,
1007 store_error = %store_err,
1008 "failed to persist child run failure"
1009 );
1010 }
1011
1012 Err(err)
1013 }
1014 }
1015 }
1016
1017 /// Try to replay a completed step from a previous execution.
1018 ///
1019 /// Returns `Some(StepOutput)` if a completed step exists at the given
1020 /// position, `None` otherwise.
1021 fn try_replay_step(&mut self, position: u32) -> Option<StepOutput> {
1022 let step = self.replay_steps.get(&position)?;
1023 if step.status.state != StepStatus::Completed {
1024 return None;
1025 }
1026 let output = StepOutput {
1027 output: step.output.clone().unwrap_or(Value::Null),
1028 duration_ms: step.duration_ms,
1029 cost_usd: step.cost_usd,
1030 input_tokens: step.input_tokens,
1031 output_tokens: step.output_tokens,
1032 model: None,
1033 debug_messages: None,
1034 };
1035 self.total_cost_usd += output.cost_usd;
1036 self.total_duration_ms += output.duration_ms;
1037 self.last_step_ids = vec![step.id];
1038 info!(
1039 run_id = %self.run_id,
1040 step = %step.name,
1041 position,
1042 "step replayed from previous execution"
1043 );
1044 Some(output)
1045 }
1046
1047 /// Internal: execute a step with full persistence lifecycle.
1048 async fn execute_step(
1049 &mut self,
1050 name: &str,
1051 kind: StepKind,
1052 config: StepConfig,
1053 ) -> Result<StepOutput, EngineError> {
1054 let position = self.position;
1055 self.position += 1;
1056
1057 // Replay: if this step already completed in a prior execution, return cached output.
1058 if let Some(output) = self.try_replay_step(position) {
1059 return Ok(output);
1060 }
1061
1062 // Create step record in Pending.
1063 let step = self
1064 .store
1065 .create_step(NewStep {
1066 run_id: self.run_id,
1067 name: name.to_string(),
1068 kind,
1069 position,
1070 input: Some(serde_json::to_value(&config)?),
1071 })
1072 .await?;
1073
1074 self.start_step(step.id, Utc::now()).await?;
1075
1076 let step_log_sender = self
1077 .log_sender
1078 .as_ref()
1079 .map(|s| StepLogSender::new(s.clone(), self.run_id, step.id, name.to_string()));
1080
1081 match execute_step_config(&config, &self.provider, step_log_sender).await {
1082 Ok(output) => {
1083 self.total_cost_usd += output.cost_usd;
1084 self.total_duration_ms += output.duration_ms;
1085
1086 let debug_messages_json = output.debug_messages_json();
1087
1088 let completed_at = Utc::now();
1089 self.store
1090 .update_step(
1091 step.id,
1092 StepUpdate {
1093 status: Some(StepStatus::Completed),
1094 output: Some(output.output.clone()),
1095 duration_ms: Some(output.duration_ms),
1096 cost_usd: Some(output.cost_usd),
1097 input_tokens: output.input_tokens,
1098 output_tokens: output.output_tokens,
1099 completed_at: Some(completed_at),
1100 debug_messages: debug_messages_json,
1101 ..StepUpdate::default()
1102 },
1103 )
1104 .await?;
1105
1106 info!(
1107 run_id = %self.run_id,
1108 step = %name,
1109 duration_ms = output.duration_ms,
1110 "step completed"
1111 );
1112
1113 self.last_step_ids = vec![step.id];
1114
1115 Ok(output)
1116 }
1117 Err(err) => {
1118 let completed_at = Utc::now();
1119 let debug_messages_json = extract_debug_messages_from_error(&err);
1120 let partial = extract_partial_usage_from_error(&err);
1121 let raw_response_output = extract_raw_response_from_error(&err);
1122
1123 if let Some(ref usage) = partial {
1124 if let Some(cost) = usage.cost_usd {
1125 self.total_cost_usd += cost;
1126 }
1127 if let Some(dur) = usage.duration_ms {
1128 self.total_duration_ms += dur;
1129 }
1130 }
1131
1132 if let Err(store_err) = self
1133 .store
1134 .update_step(
1135 step.id,
1136 StepUpdate {
1137 status: Some(StepStatus::Failed),
1138 error: Some(err.to_string()),
1139 output: raw_response_output,
1140 completed_at: Some(completed_at),
1141 debug_messages: debug_messages_json,
1142 duration_ms: partial.as_ref().and_then(|p| p.duration_ms),
1143 cost_usd: partial.as_ref().and_then(|p| p.cost_usd),
1144 input_tokens: partial.as_ref().and_then(|p| p.input_tokens),
1145 output_tokens: partial.as_ref().and_then(|p| p.output_tokens),
1146 ..StepUpdate::default()
1147 },
1148 )
1149 .await
1150 {
1151 tracing::error!(step_id = %step.id, error = %store_err, "failed to persist step failure");
1152 }
1153
1154 Err(err)
1155 }
1156 }
1157 }
1158
1159 /// Record dependency edges and transition a step to Running.
1160 ///
1161 /// Records edges from `step_id` to all `last_step_ids`, then
1162 /// transitions the step to `Running` with the given timestamp.
1163 async fn start_step(&self, step_id: Uuid, now: DateTime<Utc>) -> Result<(), EngineError> {
1164 if !self.last_step_ids.is_empty() {
1165 let deps: Vec<NewStepDependency> = self
1166 .last_step_ids
1167 .iter()
1168 .map(|&depends_on| NewStepDependency {
1169 step_id,
1170 depends_on,
1171 })
1172 .collect();
1173 self.store.create_step_dependencies(deps).await?;
1174 }
1175
1176 self.store
1177 .update_step(
1178 step_id,
1179 StepUpdate {
1180 status: Some(StepStatus::Running),
1181 started_at: Some(now),
1182 ..StepUpdate::default()
1183 },
1184 )
1185 .await?;
1186
1187 Ok(())
1188 }
1189
1190 /// Access the store directly (advanced usage).
1191 pub fn store(&self) -> &Arc<dyn Store> {
1192 &self.store
1193 }
1194
1195 /// Access the payload that triggered this run.
1196 ///
1197 /// Fetches the run from the store and returns its payload.
1198 ///
1199 /// # Errors
1200 ///
1201 /// Returns [`EngineError::Store`] if the run is not found.
1202 pub async fn payload(&self) -> Result<Value, EngineError> {
1203 let run = self
1204 .store
1205 .get_run(self.run_id)
1206 .await?
1207 .ok_or(EngineError::Store(
1208 ironflow_store::error::StoreError::RunNotFound(self.run_id),
1209 ))?;
1210 Ok(run.payload)
1211 }
1212}
1213
1214impl fmt::Debug for WorkflowContext {
1215 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1216 f.debug_struct("WorkflowContext")
1217 .field("run_id", &self.run_id)
1218 .field("position", &self.position)
1219 .field("total_cost_usd", &self.total_cost_usd)
1220 .finish_non_exhaustive()
1221 }
1222}
1223
1224/// Extract debug messages from an engine error, if it wraps a schema validation
1225/// failure that carries a verbose conversation trace.
1226fn extract_debug_messages_from_error(err: &EngineError) -> Option<Value> {
1227 if let EngineError::Operation(OperationError::Agent(AgentError::SchemaValidation {
1228 debug_messages,
1229 ..
1230 })) = err
1231 && !debug_messages.is_empty()
1232 {
1233 return serde_json::to_value(debug_messages).ok();
1234 }
1235 None
1236}
1237
1238/// Partial usage with `Decimal` cost, converted from the `f64` in [`PartialUsage`].
1239///
1240/// Exists only because `ironflow-store` uses [`Decimal`] for monetary values
1241/// while `ironflow-core` uses `f64` (the CLI's native type). The conversion
1242/// happens here, at the engine/store boundary.
1243struct StepPartialUsage {
1244 cost_usd: Option<Decimal>,
1245 duration_ms: Option<u64>,
1246 input_tokens: Option<u64>,
1247 output_tokens: Option<u64>,
1248}
1249
1250/// Extract the raw response text from a schema validation error.
1251///
1252/// When the agent produced text but structured output extraction failed,
1253/// this returns the truncated raw text so it can be persisted as the
1254/// step output for dashboard visibility.
1255fn extract_raw_response_from_error(err: &EngineError) -> Option<Value> {
1256 if let EngineError::Operation(OperationError::Agent(AgentError::SchemaValidation {
1257 raw_response: Some(text),
1258 ..
1259 })) = err
1260 {
1261 return Some(Value::String(text.clone()));
1262 }
1263 None
1264}
1265
1266fn extract_partial_usage_from_error(err: &EngineError) -> Option<StepPartialUsage> {
1267 if let EngineError::Operation(OperationError::Agent(AgentError::SchemaValidation {
1268 partial_usage,
1269 ..
1270 })) = err
1271 && (partial_usage.cost_usd.is_some() || partial_usage.duration_ms.is_some())
1272 {
1273 return Some(StepPartialUsage {
1274 cost_usd: partial_usage
1275 .cost_usd
1276 .and_then(|c| Decimal::try_from(c).ok()),
1277 duration_ms: partial_usage.duration_ms,
1278 input_tokens: partial_usage.input_tokens,
1279 output_tokens: partial_usage.output_tokens,
1280 });
1281 }
1282 None
1283}