ironflow_engine/context.rs
1//! [`WorkflowContext`] — execution context for dynamic workflows.
2//!
3//! Provides step execution methods that automatically persist results to the
4//! store. Each call to [`shell`](WorkflowContext::shell),
5//! [`http`](WorkflowContext::http), [`agent`](WorkflowContext::agent), or
6//! [`workflow`](WorkflowContext::workflow) creates a step record, executes the
7//! operation, captures the output, and returns a [`StepOutput`] that the next
8//! step can reference.
9//!
10//! # Examples
11//!
12//! ```no_run
13//! use ironflow_engine::context::WorkflowContext;
14//! use ironflow_engine::config::{ShellConfig, AgentStepConfig};
15//! use ironflow_engine::error::EngineError;
16//!
17//! # async fn example(ctx: &mut WorkflowContext) -> Result<(), EngineError> {
18//! let build = ctx.shell("build", ShellConfig::new("cargo build")).await?;
19//! let review = ctx.agent("review", AgentStepConfig::new(
20//! &format!("Build output:\n{}", build.output["stdout"])
21//! )).await?;
22//! # Ok(())
23//! # }
24//! ```
25
26use std::collections::HashMap;
27use std::fmt;
28use std::sync::Arc;
29use std::time::Instant;
30
31use chrono::{DateTime, Utc};
32use rust_decimal::Decimal;
33use serde_json::Value;
34use tokio::task::JoinSet;
35use tracing::{error, info};
36use uuid::Uuid;
37
38use ironflow_core::error::{AgentError, OperationError};
39use ironflow_core::provider::AgentProvider;
40use ironflow_store::models::{
41 NewRun, NewStep, NewStepDependency, RunStatus, RunUpdate, Step, StepKind, StepStatus,
42 StepUpdate, TriggerKind,
43};
44use ironflow_store::store::Store;
45
46use crate::config::{
47 AgentStepConfig, ApprovalConfig, HttpConfig, ShellConfig, StepConfig, WorkflowStepConfig,
48};
49use crate::error::EngineError;
50use crate::executor::{ParallelStepResult, StepOutput, execute_step_config};
51use crate::handler::WorkflowHandler;
52use crate::log_sender::{LogSender, StepLogSender};
53use crate::operation::Operation;
54
/// Callback type for resolving workflow handlers by name.
///
/// The callback receives a workflow name and returns the matching handler,
/// or `None` when it has no handler for that name. It is stored behind an
/// `Arc` and is `Send + Sync`, so a context can clone it into the child
/// context it builds for a sub-workflow.
pub(crate) type HandlerResolver =
    Arc<dyn Fn(&str) -> Option<Arc<dyn WorkflowHandler>> + Send + Sync>;
58
/// Execution context for a single workflow run.
///
/// Tracks the current step position and provides convenience methods
/// for executing operations with automatic persistence.
///
/// # Examples
///
/// ```no_run
/// use ironflow_engine::context::WorkflowContext;
/// use ironflow_engine::config::ShellConfig;
/// use ironflow_engine::error::EngineError;
///
/// # async fn example(ctx: &mut WorkflowContext) -> Result<(), EngineError> {
/// let result = ctx.shell("greet", ShellConfig::new("echo hello")).await?;
/// assert!(result.output["stdout"].as_str().unwrap().contains("hello"));
/// # Ok(())
/// # }
/// ```
pub struct WorkflowContext {
    /// ID of the run whose steps this context creates and updates.
    run_id: Uuid,
    /// Store used to persist runs, steps and step dependencies.
    store: Arc<dyn Store>,
    /// Agent provider handed to the step executor.
    provider: Arc<dyn AgentProvider>,
    /// Resolver used to look up sub-workflow handlers by name.
    /// `None` when the context was built without one.
    handler_resolver: Option<HandlerResolver>,
    /// Position given to the next step; advanced by one for every step
    /// (a parallel wave counts as a single position).
    position: u32,
    /// IDs of the last executed step(s) -- used to record DAG dependencies.
    last_step_ids: Vec<Uuid>,
    /// Accumulated cost across all steps in this run.
    total_cost_usd: Decimal,
    /// Accumulated duration across all steps.
    total_duration_ms: u64,
    /// Steps from a previous execution, keyed by position.
    /// Used when resuming after approval to replay completed steps.
    replay_steps: HashMap<u32, Step>,
    /// Optional sender for real-time log streaming.
    log_sender: Option<LogSender>,
}
95
96impl WorkflowContext {
97 /// Create a new context for a run.
98 ///
99 /// Not typically called directly — the [`Engine`](crate::engine::Engine)
100 /// creates this when executing a [`WorkflowHandler`].
101 pub fn new(run_id: Uuid, store: Arc<dyn Store>, provider: Arc<dyn AgentProvider>) -> Self {
102 Self {
103 run_id,
104 store,
105 provider,
106 handler_resolver: None,
107 position: 0,
108 last_step_ids: Vec::new(),
109 total_cost_usd: Decimal::ZERO,
110 total_duration_ms: 0,
111 replay_steps: HashMap::new(),
112 log_sender: None,
113 }
114 }
115
116 /// Create a new context with a handler resolver for sub-workflow support.
117 ///
118 /// The resolver is called when [`workflow`](Self::workflow) is invoked to
119 /// look up registered handlers by name.
120 pub(crate) fn with_handler_resolver(
121 run_id: Uuid,
122 store: Arc<dyn Store>,
123 provider: Arc<dyn AgentProvider>,
124 resolver: HandlerResolver,
125 ) -> Self {
126 Self {
127 run_id,
128 store,
129 provider,
130 handler_resolver: Some(resolver),
131 position: 0,
132 last_step_ids: Vec::new(),
133 total_cost_usd: Decimal::ZERO,
134 total_duration_ms: 0,
135 replay_steps: HashMap::new(),
136 log_sender: None,
137 }
138 }
139
140 /// Attach a log sender for real-time step output streaming.
141 pub fn set_log_sender(&mut self, sender: LogSender) {
142 self.log_sender = Some(sender);
143 }
144
145 /// Load existing steps from the store for replay after approval.
146 ///
147 /// Called by the engine when resuming a run. All completed steps
148 /// and the approved approval step are indexed by position so that
149 /// `execute_step` and `approval` can skip them.
150 pub(crate) async fn load_replay_steps(&mut self) -> Result<(), EngineError> {
151 let steps = self.store.list_steps(self.run_id).await?;
152 for step in steps {
153 let dominated = matches!(
154 step.status.state,
155 StepStatus::Completed | StepStatus::Running | StepStatus::AwaitingApproval
156 );
157 if dominated {
158 self.replay_steps.insert(step.position, step);
159 }
160 }
161 Ok(())
162 }
163
164 /// The run ID this context is executing for.
165 pub fn run_id(&self) -> Uuid {
166 self.run_id
167 }
168
169 /// Accumulated cost across all executed steps so far.
170 pub fn total_cost_usd(&self) -> Decimal {
171 self.total_cost_usd
172 }
173
174 /// Accumulated duration across all executed steps so far.
175 pub fn total_duration_ms(&self) -> u64 {
176 self.total_duration_ms
177 }
178
179 /// Execute multiple steps concurrently (wait-all model).
180 ///
181 /// All steps in the batch execute in parallel via `tokio::JoinSet`.
182 /// Each step is recorded with the same `position` (execution wave).
183 /// Dependencies on previous steps are recorded automatically.
184 ///
185 /// When `fail_fast` is true, remaining steps are aborted on the first
186 /// failure. When false, all steps run to completion and the first
187 /// error is returned.
188 ///
189 /// # Errors
190 ///
191 /// Returns [`EngineError`] if any step fails.
192 ///
193 /// # Examples
194 ///
195 /// ```no_run
196 /// use ironflow_engine::context::WorkflowContext;
197 /// use ironflow_engine::config::{StepConfig, ShellConfig};
198 /// use ironflow_engine::error::EngineError;
199 ///
200 /// # async fn example(ctx: &mut WorkflowContext) -> Result<(), EngineError> {
201 /// let results = ctx.parallel(
202 /// vec![
203 /// ("test-unit", StepConfig::Shell(ShellConfig::new("cargo test --lib"))),
204 /// ("lint", StepConfig::Shell(ShellConfig::new("cargo clippy"))),
205 /// ],
206 /// true,
207 /// ).await?;
208 ///
209 /// for r in &results {
210 /// println!("{}: {:?}", r.name, r.output.output);
211 /// }
212 /// # Ok(())
213 /// # }
214 /// ```
215 pub async fn parallel(
216 &mut self,
217 steps: Vec<(&str, StepConfig)>,
218 fail_fast: bool,
219 ) -> Result<Vec<ParallelStepResult>, EngineError> {
220 if steps.is_empty() {
221 return Ok(Vec::new());
222 }
223
224 let wave_position = self.position;
225 self.position += 1;
226
227 let now = Utc::now();
228 let mut step_records: Vec<(Uuid, String, StepConfig)> = Vec::with_capacity(steps.len());
229
230 for (name, config) in &steps {
231 let kind = config.kind();
232 let step = self
233 .store
234 .create_step(NewStep {
235 run_id: self.run_id,
236 name: name.to_string(),
237 kind,
238 position: wave_position,
239 input: Some(serde_json::to_value(config)?),
240 })
241 .await?;
242
243 self.start_step(step.id, now).await?;
244
245 step_records.push((step.id, name.to_string(), config.clone()));
246 }
247
248 let mut join_set = JoinSet::new();
249 for (idx, (step_id, step_name, config)) in step_records.iter().enumerate() {
250 let provider = self.provider.clone();
251 let config = config.clone();
252 let step_log_sender = self
253 .log_sender
254 .as_ref()
255 .map(|s| StepLogSender::new(s.clone(), self.run_id, *step_id, step_name.clone()));
256 join_set.spawn(async move {
257 (
258 idx,
259 execute_step_config(&config, &provider, step_log_sender).await,
260 )
261 });
262 }
263
264 // JoinSet returns in completion order; indexed_results restores input order.
265 let mut indexed_results: Vec<Option<Result<StepOutput, String>>> =
266 vec![None; step_records.len()];
267 let mut first_error: Option<EngineError> = None;
268
269 while let Some(join_result) = join_set.join_next().await {
270 let (idx, step_result) = match join_result {
271 Ok(r) => r,
272 Err(e) => {
273 if first_error.is_none() {
274 first_error = Some(EngineError::StepConfig(format!("join error: {e}")));
275 }
276 if fail_fast {
277 join_set.abort_all();
278 }
279 continue;
280 }
281 };
282
283 let (step_id, step_name, _) = &step_records[idx];
284 let completed_at = Utc::now();
285
286 match step_result {
287 Ok(output) => {
288 self.total_cost_usd += output.cost_usd;
289 self.total_duration_ms += output.duration_ms;
290
291 let debug_messages_json = output.debug_messages_json();
292
293 self.store
294 .update_step(
295 *step_id,
296 StepUpdate {
297 status: Some(StepStatus::Completed),
298 output: Some(output.output.clone()),
299 duration_ms: Some(output.duration_ms),
300 cost_usd: Some(output.cost_usd),
301 input_tokens: output.input_tokens,
302 output_tokens: output.output_tokens,
303 completed_at: Some(completed_at),
304 debug_messages: debug_messages_json,
305 ..StepUpdate::default()
306 },
307 )
308 .await?;
309
310 info!(
311 run_id = %self.run_id,
312 step = %step_name,
313 duration_ms = output.duration_ms,
314 "parallel step completed"
315 );
316
317 indexed_results[idx] = Some(Ok(output));
318 }
319 Err(err) => {
320 let err_msg = err.to_string();
321 let debug_messages_json = extract_debug_messages_from_error(&err);
322 let partial = extract_partial_usage_from_error(&err);
323
324 if let Some(ref usage) = partial {
325 if let Some(cost) = usage.cost_usd {
326 self.total_cost_usd += cost;
327 }
328 if let Some(dur) = usage.duration_ms {
329 self.total_duration_ms += dur;
330 }
331 }
332
333 if let Err(store_err) = self
334 .store
335 .update_step(
336 *step_id,
337 StepUpdate {
338 status: Some(StepStatus::Failed),
339 error: Some(err_msg.clone()),
340 completed_at: Some(completed_at),
341 debug_messages: debug_messages_json,
342 duration_ms: partial.as_ref().and_then(|p| p.duration_ms),
343 cost_usd: partial.as_ref().and_then(|p| p.cost_usd),
344 input_tokens: partial.as_ref().and_then(|p| p.input_tokens),
345 output_tokens: partial.as_ref().and_then(|p| p.output_tokens),
346 ..StepUpdate::default()
347 },
348 )
349 .await
350 {
351 tracing::error!(
352 step_id = %step_id,
353 error = %store_err,
354 "failed to persist parallel step failure"
355 );
356 }
357
358 indexed_results[idx] = Some(Err(err_msg.clone()));
359
360 if first_error.is_none() {
361 first_error = Some(err);
362 }
363
364 if fail_fast {
365 join_set.abort_all();
366 }
367 }
368 }
369 }
370
371 if let Some(err) = first_error {
372 return Err(err);
373 }
374
375 self.last_step_ids = step_records.iter().map(|(id, _, _)| *id).collect();
376
377 // Build results in original order.
378 let results: Vec<ParallelStepResult> = step_records
379 .iter()
380 .enumerate()
381 .map(|(idx, (step_id, name, _))| {
382 let output = match indexed_results[idx].take() {
383 Some(Ok(o)) => o,
384 _ => unreachable!("all steps succeeded if no error returned"),
385 };
386 ParallelStepResult {
387 name: name.clone(),
388 output,
389 step_id: *step_id,
390 }
391 })
392 .collect();
393
394 Ok(results)
395 }
396
397 /// Execute a shell step.
398 ///
399 /// Creates the step record, runs the command, persists the result,
400 /// and returns the output for use in subsequent steps.
401 ///
402 /// # Errors
403 ///
404 /// Returns [`EngineError`] if the command fails or the store errors.
405 ///
406 /// # Examples
407 ///
408 /// ```no_run
409 /// use ironflow_engine::context::WorkflowContext;
410 /// use ironflow_engine::config::ShellConfig;
411 /// use ironflow_engine::error::EngineError;
412 ///
413 /// # async fn example(ctx: &mut WorkflowContext) -> Result<(), EngineError> {
414 /// let files = ctx.shell("list", ShellConfig::new("ls -la")).await?;
415 /// println!("stdout: {}", files.output["stdout"]);
416 /// # Ok(())
417 /// # }
418 /// ```
419 pub async fn shell(
420 &mut self,
421 name: &str,
422 config: ShellConfig,
423 ) -> Result<StepOutput, EngineError> {
424 self.execute_step(name, StepKind::Shell, StepConfig::Shell(config))
425 .await
426 }
427
428 /// Execute an HTTP step.
429 ///
430 /// # Errors
431 ///
432 /// Returns [`EngineError`] if the request fails or the store errors.
433 ///
434 /// # Examples
435 ///
436 /// ```no_run
437 /// use ironflow_engine::context::WorkflowContext;
438 /// use ironflow_engine::config::HttpConfig;
439 /// use ironflow_engine::error::EngineError;
440 ///
441 /// # async fn example(ctx: &mut WorkflowContext) -> Result<(), EngineError> {
442 /// let resp = ctx.http("health", HttpConfig::get("https://api.example.com/health")).await?;
443 /// println!("status: {}", resp.output["status"]);
444 /// # Ok(())
445 /// # }
446 /// ```
447 pub async fn http(
448 &mut self,
449 name: &str,
450 config: HttpConfig,
451 ) -> Result<StepOutput, EngineError> {
452 self.execute_step(name, StepKind::Http, StepConfig::Http(config))
453 .await
454 }
455
456 /// Execute an agent step.
457 ///
458 /// # Errors
459 ///
460 /// Returns [`EngineError`] if the agent invocation fails or the store errors.
461 ///
462 /// # Examples
463 ///
464 /// ```no_run
465 /// use ironflow_engine::context::WorkflowContext;
466 /// use ironflow_engine::config::AgentStepConfig;
467 /// use ironflow_engine::error::EngineError;
468 ///
469 /// # async fn example(ctx: &mut WorkflowContext) -> Result<(), EngineError> {
470 /// let review = ctx.agent("review", AgentStepConfig::new("Review the code")).await?;
471 /// println!("review: {}", review.output["value"]);
472 /// # Ok(())
473 /// # }
474 /// ```
475 pub async fn agent(
476 &mut self,
477 name: &str,
478 config: impl Into<AgentStepConfig>,
479 ) -> Result<StepOutput, EngineError> {
480 self.execute_step(name, StepKind::Agent, StepConfig::Agent(config.into()))
481 .await
482 }
483
484 /// Create a human approval gate.
485 ///
486 /// On first execution, records an approval step and returns
487 /// [`EngineError::ApprovalRequired`] to suspend the run. The engine
488 /// transitions the run to `AwaitingApproval`.
489 ///
490 /// On resume (after a human approved via the API), the approval step
491 /// is replayed: it is marked as `Completed` and execution continues
492 /// past it. Multiple approval gates in the same handler work -- each
493 /// one pauses and resumes independently.
494 ///
495 /// # Errors
496 ///
497 /// Returns [`EngineError::ApprovalRequired`] to pause the run on
498 /// first execution. Returns other [`EngineError`] variants on store
499 /// failures.
500 ///
501 /// # Examples
502 ///
503 /// ```no_run
504 /// use ironflow_engine::context::WorkflowContext;
505 /// use ironflow_engine::config::ApprovalConfig;
506 /// use ironflow_engine::error::EngineError;
507 ///
508 /// # async fn example(ctx: &mut WorkflowContext) -> Result<(), EngineError> {
509 /// ctx.approval("deploy-gate", ApprovalConfig::new("Approve deployment?")).await?;
510 /// // Execution continues here after approval
511 /// # Ok(())
512 /// # }
513 /// ```
514 pub async fn approval(
515 &mut self,
516 name: &str,
517 config: ApprovalConfig,
518 ) -> Result<(), EngineError> {
519 let position = self.position;
520 self.position += 1;
521
522 // Replay: if this approval step exists from a prior execution,
523 // the run was approved -- mark it completed (if not already) and continue.
524 if let Some(existing) = self.replay_steps.get(&position)
525 && existing.kind == StepKind::Approval
526 {
527 if existing.status.state == StepStatus::AwaitingApproval {
528 self.store
529 .update_step(
530 existing.id,
531 StepUpdate {
532 status: Some(StepStatus::Completed),
533 completed_at: Some(Utc::now()),
534 ..StepUpdate::default()
535 },
536 )
537 .await?;
538 }
539
540 self.last_step_ids = vec![existing.id];
541 info!(
542 run_id = %self.run_id,
543 step = %name,
544 position,
545 "approval step replayed (approved)"
546 );
547 return Ok(());
548 }
549
550 // First execution: create the approval step and suspend.
551 let step = self
552 .store
553 .create_step(NewStep {
554 run_id: self.run_id,
555 name: name.to_string(),
556 kind: StepKind::Approval,
557 position,
558 input: Some(serde_json::to_value(&config)?),
559 })
560 .await?;
561
562 self.start_step(step.id, Utc::now()).await?;
563
564 // Transition the step to AwaitingApproval so it reflects
565 // the suspended state on the dashboard.
566 self.store
567 .update_step(
568 step.id,
569 StepUpdate {
570 status: Some(StepStatus::AwaitingApproval),
571 ..StepUpdate::default()
572 },
573 )
574 .await?;
575
576 self.last_step_ids = vec![step.id];
577
578 Err(EngineError::ApprovalRequired {
579 run_id: self.run_id,
580 step_id: step.id,
581 message: config.message().to_string(),
582 })
583 }
584
585 /// Record a step as explicitly skipped.
586 ///
587 /// Use this inside an `if`/`else` branch when a step should not execute
588 /// but must still appear in the DAG and timeline with its reason.
589 ///
590 /// The step is created directly in [`StepStatus::Skipped`] state and the
591 /// reason is stored in the output as `{"reason": "..."}`.
592 ///
593 /// # Errors
594 ///
595 /// Returns [`EngineError`] if the store fails.
596 ///
597 /// # Examples
598 ///
599 /// ```no_run
600 /// use ironflow_engine::context::WorkflowContext;
601 /// use ironflow_engine::error::EngineError;
602 ///
603 /// # async fn example(ctx: &mut WorkflowContext) -> Result<(), EngineError> {
604 /// let tests_passed = false;
605 /// if tests_passed {
606 /// // ctx.shell("deploy", ...).await?;
607 /// } else {
608 /// ctx.skip("deploy", "tests failed").await?;
609 /// }
610 /// # Ok(())
611 /// # }
612 /// ```
613 pub async fn skip(&mut self, name: &str, reason: &str) -> Result<(), EngineError> {
614 let position = self.position;
615 self.position += 1;
616
617 let step = self
618 .store
619 .create_step(NewStep {
620 run_id: self.run_id,
621 name: name.to_string(),
622 kind: StepKind::Custom("skip".to_string()),
623 position,
624 input: None,
625 })
626 .await?;
627
628 if !self.last_step_ids.is_empty() {
629 let deps: Vec<NewStepDependency> = self
630 .last_step_ids
631 .iter()
632 .map(|&depends_on| NewStepDependency {
633 step_id: step.id,
634 depends_on,
635 })
636 .collect();
637 self.store.create_step_dependencies(deps).await?;
638 }
639
640 let now = Utc::now();
641 self.store
642 .update_step(
643 step.id,
644 StepUpdate {
645 status: Some(StepStatus::Skipped),
646 output: Some(serde_json::json!({"reason": reason})),
647 completed_at: Some(now),
648 ..StepUpdate::default()
649 },
650 )
651 .await?;
652
653 self.last_step_ids = vec![step.id];
654
655 info!(
656 run_id = %self.run_id,
657 step = %name,
658 reason,
659 "step skipped"
660 );
661
662 Ok(())
663 }
664
665 /// Execute a custom operation step.
666 ///
667 /// Runs a user-defined [`Operation`] with full step lifecycle management:
668 /// creates the step record, transitions to Running, executes the operation,
669 /// persists the output and duration, and marks the step Completed or Failed.
670 ///
671 /// The operation's [`kind()`](Operation::kind) is stored as
672 /// [`StepKind::Custom`].
673 ///
674 /// # Errors
675 ///
676 /// Returns [`EngineError`] if the operation fails or the store errors.
677 ///
678 /// # Examples
679 ///
680 /// ```no_run
681 /// use ironflow_engine::context::WorkflowContext;
682 /// use ironflow_engine::operation::Operation;
683 /// use ironflow_engine::error::EngineError;
684 /// use serde_json::{Value, json};
685 /// use std::pin::Pin;
686 /// use std::future::Future;
687 ///
688 /// struct MyOp;
689 /// impl Operation for MyOp {
690 /// fn kind(&self) -> &str { "my-service" }
691 /// fn execute(&self) -> Pin<Box<dyn Future<Output = Result<Value, EngineError>> + Send + '_>> {
692 /// Box::pin(async { Ok(json!({"ok": true})) })
693 /// }
694 /// }
695 ///
696 /// # async fn example(ctx: &mut WorkflowContext) -> Result<(), EngineError> {
697 /// let result = ctx.operation("call-service", &MyOp).await?;
698 /// println!("output: {}", result.output);
699 /// # Ok(())
700 /// # }
701 /// ```
702 pub async fn operation(
703 &mut self,
704 name: &str,
705 op: &dyn Operation,
706 ) -> Result<StepOutput, EngineError> {
707 let kind = StepKind::Custom(op.kind().to_string());
708 let position = self.position;
709 self.position += 1;
710
711 let step = self
712 .store
713 .create_step(NewStep {
714 run_id: self.run_id,
715 name: name.to_string(),
716 kind,
717 position,
718 input: op.input(),
719 })
720 .await?;
721
722 self.start_step(step.id, Utc::now()).await?;
723
724 let start = Instant::now();
725
726 match op.execute().await {
727 Ok(output_value) => {
728 let duration_ms = start.elapsed().as_millis() as u64;
729 self.total_duration_ms += duration_ms;
730
731 let completed_at = Utc::now();
732 self.store
733 .update_step(
734 step.id,
735 StepUpdate {
736 status: Some(StepStatus::Completed),
737 output: Some(output_value.clone()),
738 duration_ms: Some(duration_ms),
739 cost_usd: Some(Decimal::ZERO),
740 completed_at: Some(completed_at),
741 ..StepUpdate::default()
742 },
743 )
744 .await?;
745
746 info!(
747 run_id = %self.run_id,
748 step = %name,
749 kind = op.kind(),
750 duration_ms,
751 "operation step completed"
752 );
753
754 self.last_step_ids = vec![step.id];
755
756 Ok(StepOutput {
757 output: output_value,
758 duration_ms,
759 cost_usd: Decimal::ZERO,
760 input_tokens: None,
761 output_tokens: None,
762 debug_messages: None,
763 })
764 }
765 Err(err) => {
766 let completed_at = Utc::now();
767 if let Err(store_err) = self
768 .store
769 .update_step(
770 step.id,
771 StepUpdate {
772 status: Some(StepStatus::Failed),
773 error: Some(err.to_string()),
774 completed_at: Some(completed_at),
775 ..StepUpdate::default()
776 },
777 )
778 .await
779 {
780 error!(step_id = %step.id, error = %store_err, "failed to persist step failure");
781 }
782
783 Err(err)
784 }
785 }
786 }
787
788 /// Execute a sub-workflow step.
789 ///
790 /// Creates a child run for the named workflow handler, executes it with
791 /// its own steps and lifecycle, and returns a [`StepOutput`] containing
792 /// the child run ID and aggregated metrics.
793 ///
794 /// Requires the context to be created with
795 /// `with_handler_resolver`.
796 ///
797 /// # Errors
798 ///
799 /// Returns [`EngineError::InvalidWorkflow`] if no handler is registered
800 /// with the given name, or if no handler resolver is available.
801 ///
802 /// # Examples
803 ///
804 /// ```no_run
805 /// use ironflow_engine::context::WorkflowContext;
806 /// use ironflow_engine::error::EngineError;
807 /// use serde_json::json;
808 ///
809 /// # async fn example(ctx: &mut WorkflowContext) -> Result<(), EngineError> {
810 /// // let result = ctx.workflow(&MySubWorkflow, json!({})).await?;
811 /// # Ok(())
812 /// # }
813 /// ```
814 pub async fn workflow(
815 &mut self,
816 handler: &dyn WorkflowHandler,
817 payload: Value,
818 ) -> Result<StepOutput, EngineError> {
819 let config = WorkflowStepConfig::new(handler.name(), payload);
820 let position = self.position;
821 self.position += 1;
822
823 let step = self
824 .store
825 .create_step(NewStep {
826 run_id: self.run_id,
827 name: config.workflow_name.clone(),
828 kind: StepKind::Workflow,
829 position,
830 input: Some(serde_json::to_value(&config)?),
831 })
832 .await?;
833
834 self.start_step(step.id, Utc::now()).await?;
835
836 match self.execute_child_workflow(&config).await {
837 Ok(output) => {
838 self.total_cost_usd += output.cost_usd;
839 self.total_duration_ms += output.duration_ms;
840
841 let completed_at = Utc::now();
842 self.store
843 .update_step(
844 step.id,
845 StepUpdate {
846 status: Some(StepStatus::Completed),
847 output: Some(output.output.clone()),
848 duration_ms: Some(output.duration_ms),
849 cost_usd: Some(output.cost_usd),
850 completed_at: Some(completed_at),
851 ..StepUpdate::default()
852 },
853 )
854 .await?;
855
856 info!(
857 run_id = %self.run_id,
858 child_workflow = %config.workflow_name,
859 duration_ms = output.duration_ms,
860 "workflow step completed"
861 );
862
863 self.last_step_ids = vec![step.id];
864
865 Ok(output)
866 }
867 Err(err) => {
868 let completed_at = Utc::now();
869 if let Err(store_err) = self
870 .store
871 .update_step(
872 step.id,
873 StepUpdate {
874 status: Some(StepStatus::Failed),
875 error: Some(err.to_string()),
876 completed_at: Some(completed_at),
877 ..StepUpdate::default()
878 },
879 )
880 .await
881 {
882 error!(step_id = %step.id, error = %store_err, "failed to persist step failure");
883 }
884
885 Err(err)
886 }
887 }
888 }
889
890 /// Execute a child workflow and return aggregated output.
891 async fn execute_child_workflow(
892 &self,
893 config: &WorkflowStepConfig,
894 ) -> Result<StepOutput, EngineError> {
895 let resolver = self.handler_resolver.as_ref().ok_or_else(|| {
896 EngineError::InvalidWorkflow(
897 "sub-workflow requires a handler resolver (use Engine to execute)".to_string(),
898 )
899 })?;
900
901 let handler = resolver(&config.workflow_name).ok_or_else(|| {
902 EngineError::InvalidWorkflow(format!("no handler registered: {}", config.workflow_name))
903 })?;
904
905 let parent_labels = self
906 .store
907 .get_run(self.run_id)
908 .await?
909 .map(|r| r.labels)
910 .unwrap_or_default();
911
912 let child_run = self
913 .store
914 .create_run(NewRun {
915 workflow_name: config.workflow_name.clone(),
916 trigger: TriggerKind::Workflow,
917 payload: config.payload.clone(),
918 max_retries: 0,
919 handler_version: None,
920 labels: parent_labels,
921 scheduled_at: None,
922 })
923 .await?;
924
925 let child_run_id = child_run.id;
926 info!(
927 parent_run_id = %self.run_id,
928 child_run_id = %child_run_id,
929 workflow = %config.workflow_name,
930 "child run created"
931 );
932
933 self.store
934 .update_run_status(child_run_id, RunStatus::Running)
935 .await?;
936
937 let run_start = Instant::now();
938 let mut child_ctx = WorkflowContext {
939 run_id: child_run_id,
940 store: self.store.clone(),
941 provider: self.provider.clone(),
942 handler_resolver: self.handler_resolver.clone(),
943 position: 0,
944 last_step_ids: Vec::new(),
945 total_cost_usd: Decimal::ZERO,
946 total_duration_ms: 0,
947 replay_steps: HashMap::new(),
948 log_sender: self.log_sender.clone(),
949 };
950
951 let result = handler.execute(&mut child_ctx).await;
952 let total_duration = run_start.elapsed().as_millis() as u64;
953 let completed_at = Utc::now();
954
955 match result {
956 Ok(()) => {
957 self.store
958 .update_run(
959 child_run_id,
960 RunUpdate {
961 status: Some(RunStatus::Completed),
962 cost_usd: Some(child_ctx.total_cost_usd),
963 duration_ms: Some(total_duration),
964 completed_at: Some(completed_at),
965 ..RunUpdate::default()
966 },
967 )
968 .await?;
969
970 Ok(StepOutput {
971 output: serde_json::json!({
972 "run_id": child_run_id,
973 "workflow_name": config.workflow_name,
974 "status": RunStatus::Completed,
975 "cost_usd": child_ctx.total_cost_usd,
976 "duration_ms": total_duration,
977 }),
978 duration_ms: total_duration,
979 cost_usd: child_ctx.total_cost_usd,
980 input_tokens: None,
981 output_tokens: None,
982 debug_messages: None,
983 })
984 }
985 Err(err) => {
986 if let Err(store_err) = self
987 .store
988 .update_run(
989 child_run_id,
990 RunUpdate {
991 status: Some(RunStatus::Failed),
992 error: Some(err.to_string()),
993 cost_usd: Some(child_ctx.total_cost_usd),
994 duration_ms: Some(total_duration),
995 completed_at: Some(completed_at),
996 ..RunUpdate::default()
997 },
998 )
999 .await
1000 {
1001 error!(
1002 child_run_id = %child_run_id,
1003 store_error = %store_err,
1004 "failed to persist child run failure"
1005 );
1006 }
1007
1008 Err(err)
1009 }
1010 }
1011 }
1012
1013 /// Try to replay a completed step from a previous execution.
1014 ///
1015 /// Returns `Some(StepOutput)` if a completed step exists at the given
1016 /// position, `None` otherwise.
1017 fn try_replay_step(&mut self, position: u32) -> Option<StepOutput> {
1018 let step = self.replay_steps.get(&position)?;
1019 if step.status.state != StepStatus::Completed {
1020 return None;
1021 }
1022 let output = StepOutput {
1023 output: step.output.clone().unwrap_or(Value::Null),
1024 duration_ms: step.duration_ms,
1025 cost_usd: step.cost_usd,
1026 input_tokens: step.input_tokens,
1027 output_tokens: step.output_tokens,
1028 debug_messages: None,
1029 };
1030 self.total_cost_usd += output.cost_usd;
1031 self.total_duration_ms += output.duration_ms;
1032 self.last_step_ids = vec![step.id];
1033 info!(
1034 run_id = %self.run_id,
1035 step = %step.name,
1036 position,
1037 "step replayed from previous execution"
1038 );
1039 Some(output)
1040 }
1041
1042 /// Internal: execute a step with full persistence lifecycle.
1043 async fn execute_step(
1044 &mut self,
1045 name: &str,
1046 kind: StepKind,
1047 config: StepConfig,
1048 ) -> Result<StepOutput, EngineError> {
1049 let position = self.position;
1050 self.position += 1;
1051
1052 // Replay: if this step already completed in a prior execution, return cached output.
1053 if let Some(output) = self.try_replay_step(position) {
1054 return Ok(output);
1055 }
1056
1057 // Create step record in Pending.
1058 let step = self
1059 .store
1060 .create_step(NewStep {
1061 run_id: self.run_id,
1062 name: name.to_string(),
1063 kind,
1064 position,
1065 input: Some(serde_json::to_value(&config)?),
1066 })
1067 .await?;
1068
1069 self.start_step(step.id, Utc::now()).await?;
1070
1071 let step_log_sender = self
1072 .log_sender
1073 .as_ref()
1074 .map(|s| StepLogSender::new(s.clone(), self.run_id, step.id, name.to_string()));
1075
1076 match execute_step_config(&config, &self.provider, step_log_sender).await {
1077 Ok(output) => {
1078 self.total_cost_usd += output.cost_usd;
1079 self.total_duration_ms += output.duration_ms;
1080
1081 let debug_messages_json = output.debug_messages_json();
1082
1083 let completed_at = Utc::now();
1084 self.store
1085 .update_step(
1086 step.id,
1087 StepUpdate {
1088 status: Some(StepStatus::Completed),
1089 output: Some(output.output.clone()),
1090 duration_ms: Some(output.duration_ms),
1091 cost_usd: Some(output.cost_usd),
1092 input_tokens: output.input_tokens,
1093 output_tokens: output.output_tokens,
1094 completed_at: Some(completed_at),
1095 debug_messages: debug_messages_json,
1096 ..StepUpdate::default()
1097 },
1098 )
1099 .await?;
1100
1101 info!(
1102 run_id = %self.run_id,
1103 step = %name,
1104 duration_ms = output.duration_ms,
1105 "step completed"
1106 );
1107
1108 self.last_step_ids = vec![step.id];
1109
1110 Ok(output)
1111 }
1112 Err(err) => {
1113 let completed_at = Utc::now();
1114 let debug_messages_json = extract_debug_messages_from_error(&err);
1115 let partial = extract_partial_usage_from_error(&err);
1116
1117 if let Some(ref usage) = partial {
1118 if let Some(cost) = usage.cost_usd {
1119 self.total_cost_usd += cost;
1120 }
1121 if let Some(dur) = usage.duration_ms {
1122 self.total_duration_ms += dur;
1123 }
1124 }
1125
1126 if let Err(store_err) = self
1127 .store
1128 .update_step(
1129 step.id,
1130 StepUpdate {
1131 status: Some(StepStatus::Failed),
1132 error: Some(err.to_string()),
1133 completed_at: Some(completed_at),
1134 debug_messages: debug_messages_json,
1135 duration_ms: partial.as_ref().and_then(|p| p.duration_ms),
1136 cost_usd: partial.as_ref().and_then(|p| p.cost_usd),
1137 input_tokens: partial.as_ref().and_then(|p| p.input_tokens),
1138 output_tokens: partial.as_ref().and_then(|p| p.output_tokens),
1139 ..StepUpdate::default()
1140 },
1141 )
1142 .await
1143 {
1144 tracing::error!(step_id = %step.id, error = %store_err, "failed to persist step failure");
1145 }
1146
1147 Err(err)
1148 }
1149 }
1150 }
1151
1152 /// Record dependency edges and transition a step to Running.
1153 ///
1154 /// Records edges from `step_id` to all `last_step_ids`, then
1155 /// transitions the step to `Running` with the given timestamp.
1156 async fn start_step(&self, step_id: Uuid, now: DateTime<Utc>) -> Result<(), EngineError> {
1157 if !self.last_step_ids.is_empty() {
1158 let deps: Vec<NewStepDependency> = self
1159 .last_step_ids
1160 .iter()
1161 .map(|&depends_on| NewStepDependency {
1162 step_id,
1163 depends_on,
1164 })
1165 .collect();
1166 self.store.create_step_dependencies(deps).await?;
1167 }
1168
1169 self.store
1170 .update_step(
1171 step_id,
1172 StepUpdate {
1173 status: Some(StepStatus::Running),
1174 started_at: Some(now),
1175 ..StepUpdate::default()
1176 },
1177 )
1178 .await?;
1179
1180 Ok(())
1181 }
1182
1183 /// Access the store directly (advanced usage).
1184 pub fn store(&self) -> &Arc<dyn Store> {
1185 &self.store
1186 }
1187
1188 /// Access the payload that triggered this run.
1189 ///
1190 /// Fetches the run from the store and returns its payload.
1191 ///
1192 /// # Errors
1193 ///
1194 /// Returns [`EngineError::Store`] if the run is not found.
1195 pub async fn payload(&self) -> Result<Value, EngineError> {
1196 let run = self
1197 .store
1198 .get_run(self.run_id)
1199 .await?
1200 .ok_or(EngineError::Store(
1201 ironflow_store::error::StoreError::RunNotFound(self.run_id),
1202 ))?;
1203 Ok(run.payload)
1204 }
1205}
1206
1207impl fmt::Debug for WorkflowContext {
1208 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1209 f.debug_struct("WorkflowContext")
1210 .field("run_id", &self.run_id)
1211 .field("position", &self.position)
1212 .field("total_cost_usd", &self.total_cost_usd)
1213 .finish_non_exhaustive()
1214 }
1215}
1216
1217/// Extract debug messages from an engine error, if it wraps a schema validation
1218/// failure that carries a verbose conversation trace.
1219fn extract_debug_messages_from_error(err: &EngineError) -> Option<Value> {
1220 if let EngineError::Operation(OperationError::Agent(AgentError::SchemaValidation {
1221 debug_messages,
1222 ..
1223 })) = err
1224 && !debug_messages.is_empty()
1225 {
1226 return serde_json::to_value(debug_messages).ok();
1227 }
1228 None
1229}
1230
/// Partial usage with `Decimal` cost, converted from the `f64` in [`PartialUsage`].
///
/// Exists only because `ironflow-store` uses [`Decimal`] for monetary values
/// while `ironflow-core` uses `f64` (the CLI's native type). The conversion
/// happens here, at the engine/store boundary.
struct StepPartialUsage {
    /// Cost in USD reported before the failure, if any.
    cost_usd: Option<Decimal>,
    /// Duration in milliseconds reported before the failure, if any.
    duration_ms: Option<u64>,
    /// Input token count reported before the failure, if any.
    input_tokens: Option<u64>,
    /// Output token count reported before the failure, if any.
    output_tokens: Option<u64>,
}
1242
1243fn extract_partial_usage_from_error(err: &EngineError) -> Option<StepPartialUsage> {
1244 if let EngineError::Operation(OperationError::Agent(AgentError::SchemaValidation {
1245 partial_usage,
1246 ..
1247 })) = err
1248 && (partial_usage.cost_usd.is_some() || partial_usage.duration_ms.is_some())
1249 {
1250 return Some(StepPartialUsage {
1251 cost_usd: partial_usage
1252 .cost_usd
1253 .and_then(|c| Decimal::try_from(c).ok()),
1254 duration_ms: partial_usage.duration_ms,
1255 input_tokens: partial_usage.input_tokens,
1256 output_tokens: partial_usage.output_tokens,
1257 });
1258 }
1259 None
1260}