ironflow_engine/context.rs
1//! [`WorkflowContext`] — execution context for dynamic workflows.
2//!
3//! Provides step execution methods that automatically persist results to the
4//! store. Each call to [`shell`](WorkflowContext::shell),
5//! [`http`](WorkflowContext::http), [`agent`](WorkflowContext::agent), or
6//! [`workflow`](WorkflowContext::workflow) creates a step record, executes the
7//! operation, captures the output, and returns a [`StepOutput`] that the next
8//! step can reference.
9//!
10//! # Examples
11//!
12//! ```no_run
13//! use ironflow_engine::context::WorkflowContext;
14//! use ironflow_engine::config::{ShellConfig, AgentStepConfig};
15//! use ironflow_engine::error::EngineError;
16//!
17//! # async fn example(ctx: &mut WorkflowContext) -> Result<(), EngineError> {
18//! let build = ctx.shell("build", ShellConfig::new("cargo build")).await?;
19//! let review = ctx.agent("review", AgentStepConfig::new(
20//! &format!("Build output:\n{}", build.output["stdout"])
21//! )).await?;
22//! # Ok(())
23//! # }
24//! ```
25
26use std::collections::HashMap;
27use std::fmt;
28use std::sync::Arc;
29use std::time::Instant;
30
31use chrono::{DateTime, Utc};
32use rust_decimal::Decimal;
33use serde_json::Value;
34use tokio::task::JoinSet;
35use tracing::{error, info};
36use uuid::Uuid;
37
38use ironflow_core::error::{AgentError, OperationError};
39use ironflow_core::provider::AgentProvider;
40use ironflow_store::models::{
41 NewRun, NewStep, NewStepDependency, RunStatus, RunUpdate, Step, StepKind, StepStatus,
42 StepUpdate, TriggerKind,
43};
44use ironflow_store::store::Store;
45
46use crate::config::{
47 AgentStepConfig, ApprovalConfig, HttpConfig, ShellConfig, StepConfig, WorkflowStepConfig,
48};
49use crate::error::EngineError;
50use crate::executor::{ParallelStepResult, StepOutput, execute_step_config};
51use crate::handler::WorkflowHandler;
52use crate::log_sender::{LogSender, StepLogSender};
53use crate::operation::Operation;
54
55/// Callback type for resolving workflow handlers by name.
56pub(crate) type HandlerResolver =
57 Arc<dyn Fn(&str) -> Option<Arc<dyn WorkflowHandler>> + Send + Sync>;
58
59/// Execution context for a single workflow run.
60///
61/// Tracks the current step position and provides convenience methods
62/// for executing operations with automatic persistence.
63///
64/// # Examples
65///
66/// ```no_run
67/// use ironflow_engine::context::WorkflowContext;
68/// use ironflow_engine::config::ShellConfig;
69/// use ironflow_engine::error::EngineError;
70///
71/// # async fn example(ctx: &mut WorkflowContext) -> Result<(), EngineError> {
72/// let result = ctx.shell("greet", ShellConfig::new("echo hello")).await?;
73/// assert!(result.output["stdout"].as_str().unwrap().contains("hello"));
74/// # Ok(())
75/// # }
76/// ```
77pub struct WorkflowContext {
78 run_id: Uuid,
79 store: Arc<dyn Store>,
80 provider: Arc<dyn AgentProvider>,
81 handler_resolver: Option<HandlerResolver>,
82 position: u32,
83 /// IDs of the last executed step(s) -- used to record DAG dependencies.
84 last_step_ids: Vec<Uuid>,
85 /// Accumulated cost across all steps in this run.
86 total_cost_usd: Decimal,
87 /// Accumulated duration across all steps.
88 total_duration_ms: u64,
89 /// Steps from a previous execution, keyed by position.
90 /// Used when resuming after approval to replay completed steps.
91 replay_steps: HashMap<u32, Step>,
92 /// Optional sender for real-time log streaming.
93 log_sender: Option<LogSender>,
94}
95
96impl WorkflowContext {
97 /// Create a new context for a run.
98 ///
99 /// Not typically called directly — the [`Engine`](crate::engine::Engine)
100 /// creates this when executing a [`WorkflowHandler`].
101 pub fn new(run_id: Uuid, store: Arc<dyn Store>, provider: Arc<dyn AgentProvider>) -> Self {
102 Self {
103 run_id,
104 store,
105 provider,
106 handler_resolver: None,
107 position: 0,
108 last_step_ids: Vec::new(),
109 total_cost_usd: Decimal::ZERO,
110 total_duration_ms: 0,
111 replay_steps: HashMap::new(),
112 log_sender: None,
113 }
114 }
115
116 /// Create a new context with a handler resolver for sub-workflow support.
117 ///
118 /// The resolver is called when [`workflow`](Self::workflow) is invoked to
119 /// look up registered handlers by name.
120 pub(crate) fn with_handler_resolver(
121 run_id: Uuid,
122 store: Arc<dyn Store>,
123 provider: Arc<dyn AgentProvider>,
124 resolver: HandlerResolver,
125 ) -> Self {
126 Self {
127 run_id,
128 store,
129 provider,
130 handler_resolver: Some(resolver),
131 position: 0,
132 last_step_ids: Vec::new(),
133 total_cost_usd: Decimal::ZERO,
134 total_duration_ms: 0,
135 replay_steps: HashMap::new(),
136 log_sender: None,
137 }
138 }
139
140 /// Attach a log sender for real-time step output streaming.
141 pub fn set_log_sender(&mut self, sender: LogSender) {
142 self.log_sender = Some(sender);
143 }
144
145 /// Load existing steps from the store for replay after approval.
146 ///
147 /// Called by the engine when resuming a run. All completed steps
148 /// and the approved approval step are indexed by position so that
149 /// `execute_step` and `approval` can skip them.
150 pub(crate) async fn load_replay_steps(&mut self) -> Result<(), EngineError> {
151 let steps = self.store.list_steps(self.run_id).await?;
152 for step in steps {
153 let dominated = matches!(
154 step.status.state,
155 StepStatus::Completed | StepStatus::Running | StepStatus::AwaitingApproval
156 );
157 if dominated {
158 self.replay_steps.insert(step.position, step);
159 }
160 }
161 Ok(())
162 }
163
164 /// The run ID this context is executing for.
165 pub fn run_id(&self) -> Uuid {
166 self.run_id
167 }
168
169 /// Accumulated cost across all executed steps so far.
170 pub fn total_cost_usd(&self) -> Decimal {
171 self.total_cost_usd
172 }
173
174 /// Accumulated duration across all executed steps so far.
175 pub fn total_duration_ms(&self) -> u64 {
176 self.total_duration_ms
177 }
178
179 /// Execute multiple steps concurrently (wait-all model).
180 ///
181 /// All steps in the batch execute in parallel via `tokio::JoinSet`.
182 /// Each step is recorded with the same `position` (execution wave).
183 /// Dependencies on previous steps are recorded automatically.
184 ///
185 /// When `fail_fast` is true, remaining steps are aborted on the first
186 /// failure. When false, all steps run to completion and the first
187 /// error is returned.
188 ///
189 /// # Errors
190 ///
191 /// Returns [`EngineError`] if any step fails.
192 ///
193 /// # Examples
194 ///
195 /// ```no_run
196 /// use ironflow_engine::context::WorkflowContext;
197 /// use ironflow_engine::config::{StepConfig, ShellConfig};
198 /// use ironflow_engine::error::EngineError;
199 ///
200 /// # async fn example(ctx: &mut WorkflowContext) -> Result<(), EngineError> {
201 /// let results = ctx.parallel(
202 /// vec![
203 /// ("test-unit", StepConfig::Shell(ShellConfig::new("cargo test --lib"))),
204 /// ("lint", StepConfig::Shell(ShellConfig::new("cargo clippy"))),
205 /// ],
206 /// true,
207 /// ).await?;
208 ///
209 /// for r in &results {
210 /// println!("{}: {:?}", r.name, r.output.output);
211 /// }
212 /// # Ok(())
213 /// # }
214 /// ```
215 pub async fn parallel(
216 &mut self,
217 steps: Vec<(&str, StepConfig)>,
218 fail_fast: bool,
219 ) -> Result<Vec<ParallelStepResult>, EngineError> {
220 if steps.is_empty() {
221 return Ok(Vec::new());
222 }
223
224 let wave_position = self.position;
225 self.position += 1;
226
227 let now = Utc::now();
228 let mut step_records: Vec<(Uuid, String, StepConfig)> = Vec::with_capacity(steps.len());
229
230 for (name, config) in &steps {
231 let kind = config.kind();
232 let step = self
233 .store
234 .create_step(NewStep {
235 run_id: self.run_id,
236 name: name.to_string(),
237 kind,
238 position: wave_position,
239 input: Some(serde_json::to_value(config)?),
240 })
241 .await?;
242
243 self.start_step(step.id, now).await?;
244
245 step_records.push((step.id, name.to_string(), config.clone()));
246 }
247
248 let mut join_set = JoinSet::new();
249 for (idx, (step_id, step_name, config)) in step_records.iter().enumerate() {
250 let provider = self.provider.clone();
251 let config = config.clone();
252 let step_log_sender = self
253 .log_sender
254 .as_ref()
255 .map(|s| StepLogSender::new(s.clone(), self.run_id, *step_id, step_name.clone()));
256 join_set.spawn(async move {
257 (
258 idx,
259 execute_step_config(&config, &provider, step_log_sender).await,
260 )
261 });
262 }
263
264 // JoinSet returns in completion order; indexed_results restores input order.
265 let mut indexed_results: Vec<Option<Result<StepOutput, String>>> =
266 vec![None; step_records.len()];
267 let mut first_error: Option<EngineError> = None;
268
269 while let Some(join_result) = join_set.join_next().await {
270 let (idx, step_result) = match join_result {
271 Ok(r) => r,
272 Err(e) => {
273 if first_error.is_none() {
274 first_error = Some(EngineError::StepConfig(format!("join error: {e}")));
275 }
276 if fail_fast {
277 join_set.abort_all();
278 }
279 continue;
280 }
281 };
282
283 let (step_id, step_name, _) = &step_records[idx];
284 let completed_at = Utc::now();
285
286 match step_result {
287 Ok(output) => {
288 self.total_cost_usd += output.cost_usd;
289 self.total_duration_ms += output.duration_ms;
290
291 let debug_messages_json = output.debug_messages_json();
292
293 self.store
294 .update_step(
295 *step_id,
296 StepUpdate {
297 status: Some(StepStatus::Completed),
298 output: Some(output.output.clone()),
299 duration_ms: Some(output.duration_ms),
300 cost_usd: Some(output.cost_usd),
301 input_tokens: output.input_tokens,
302 output_tokens: output.output_tokens,
303 completed_at: Some(completed_at),
304 debug_messages: debug_messages_json,
305 ..StepUpdate::default()
306 },
307 )
308 .await?;
309
310 info!(
311 run_id = %self.run_id,
312 step = %step_name,
313 duration_ms = output.duration_ms,
314 "parallel step completed"
315 );
316
317 indexed_results[idx] = Some(Ok(output));
318 }
319 Err(err) => {
320 let err_msg = err.to_string();
321 let debug_messages_json = extract_debug_messages_from_error(&err);
322 let partial = extract_partial_usage_from_error(&err);
323
324 if let Some(ref usage) = partial {
325 if let Some(cost) = usage.cost_usd {
326 self.total_cost_usd += cost;
327 }
328 if let Some(dur) = usage.duration_ms {
329 self.total_duration_ms += dur;
330 }
331 }
332
333 if let Err(store_err) = self
334 .store
335 .update_step(
336 *step_id,
337 StepUpdate {
338 status: Some(StepStatus::Failed),
339 error: Some(err_msg.clone()),
340 completed_at: Some(completed_at),
341 debug_messages: debug_messages_json,
342 duration_ms: partial.as_ref().and_then(|p| p.duration_ms),
343 cost_usd: partial.as_ref().and_then(|p| p.cost_usd),
344 input_tokens: partial.as_ref().and_then(|p| p.input_tokens),
345 output_tokens: partial.as_ref().and_then(|p| p.output_tokens),
346 ..StepUpdate::default()
347 },
348 )
349 .await
350 {
351 tracing::error!(
352 step_id = %step_id,
353 error = %store_err,
354 "failed to persist parallel step failure"
355 );
356 }
357
358 indexed_results[idx] = Some(Err(err_msg.clone()));
359
360 if first_error.is_none() {
361 first_error = Some(err);
362 }
363
364 if fail_fast {
365 join_set.abort_all();
366 }
367 }
368 }
369 }
370
371 if let Some(err) = first_error {
372 return Err(err);
373 }
374
375 self.last_step_ids = step_records.iter().map(|(id, _, _)| *id).collect();
376
377 // Build results in original order.
378 let results: Vec<ParallelStepResult> = step_records
379 .iter()
380 .enumerate()
381 .map(|(idx, (step_id, name, _))| {
382 let output = match indexed_results[idx].take() {
383 Some(Ok(o)) => o,
384 _ => unreachable!("all steps succeeded if no error returned"),
385 };
386 ParallelStepResult {
387 name: name.clone(),
388 output,
389 step_id: *step_id,
390 }
391 })
392 .collect();
393
394 Ok(results)
395 }
396
397 /// Execute a shell step.
398 ///
399 /// Creates the step record, runs the command, persists the result,
400 /// and returns the output for use in subsequent steps.
401 ///
402 /// # Errors
403 ///
404 /// Returns [`EngineError`] if the command fails or the store errors.
405 ///
406 /// # Examples
407 ///
408 /// ```no_run
409 /// use ironflow_engine::context::WorkflowContext;
410 /// use ironflow_engine::config::ShellConfig;
411 /// use ironflow_engine::error::EngineError;
412 ///
413 /// # async fn example(ctx: &mut WorkflowContext) -> Result<(), EngineError> {
414 /// let files = ctx.shell("list", ShellConfig::new("ls -la")).await?;
415 /// println!("stdout: {}", files.output["stdout"]);
416 /// # Ok(())
417 /// # }
418 /// ```
419 pub async fn shell(
420 &mut self,
421 name: &str,
422 config: ShellConfig,
423 ) -> Result<StepOutput, EngineError> {
424 self.execute_step(name, StepKind::Shell, StepConfig::Shell(config))
425 .await
426 }
427
428 /// Execute an HTTP step.
429 ///
430 /// # Errors
431 ///
432 /// Returns [`EngineError`] if the request fails or the store errors.
433 ///
434 /// # Examples
435 ///
436 /// ```no_run
437 /// use ironflow_engine::context::WorkflowContext;
438 /// use ironflow_engine::config::HttpConfig;
439 /// use ironflow_engine::error::EngineError;
440 ///
441 /// # async fn example(ctx: &mut WorkflowContext) -> Result<(), EngineError> {
442 /// let resp = ctx.http("health", HttpConfig::get("https://api.example.com/health")).await?;
443 /// println!("status: {}", resp.output["status"]);
444 /// # Ok(())
445 /// # }
446 /// ```
447 pub async fn http(
448 &mut self,
449 name: &str,
450 config: HttpConfig,
451 ) -> Result<StepOutput, EngineError> {
452 self.execute_step(name, StepKind::Http, StepConfig::Http(config))
453 .await
454 }
455
456 /// Execute an agent step.
457 ///
458 /// # Errors
459 ///
460 /// Returns [`EngineError`] if the agent invocation fails or the store errors.
461 ///
462 /// # Examples
463 ///
464 /// ```no_run
465 /// use ironflow_engine::context::WorkflowContext;
466 /// use ironflow_engine::config::AgentStepConfig;
467 /// use ironflow_engine::error::EngineError;
468 ///
469 /// # async fn example(ctx: &mut WorkflowContext) -> Result<(), EngineError> {
470 /// let review = ctx.agent("review", AgentStepConfig::new("Review the code")).await?;
471 /// println!("review: {}", review.output);
472 /// # Ok(())
473 /// # }
474 /// ```
475 pub async fn agent(
476 &mut self,
477 name: &str,
478 config: impl Into<AgentStepConfig>,
479 ) -> Result<StepOutput, EngineError> {
480 self.execute_step(name, StepKind::Agent, StepConfig::Agent(config.into()))
481 .await
482 }
483
484 /// Create a human approval gate.
485 ///
486 /// On first execution, records an approval step and returns
487 /// [`EngineError::ApprovalRequired`] to suspend the run. The engine
488 /// transitions the run to `AwaitingApproval`.
489 ///
490 /// On resume (after a human approved via the API), the approval step
491 /// is replayed: it is marked as `Completed` and execution continues
492 /// past it. Multiple approval gates in the same handler work -- each
493 /// one pauses and resumes independently.
494 ///
495 /// # Errors
496 ///
497 /// Returns [`EngineError::ApprovalRequired`] to pause the run on
498 /// first execution. Returns other [`EngineError`] variants on store
499 /// failures.
500 ///
501 /// # Examples
502 ///
503 /// ```no_run
504 /// use ironflow_engine::context::WorkflowContext;
505 /// use ironflow_engine::config::ApprovalConfig;
506 /// use ironflow_engine::error::EngineError;
507 ///
508 /// # async fn example(ctx: &mut WorkflowContext) -> Result<(), EngineError> {
509 /// ctx.approval("deploy-gate", ApprovalConfig::new("Approve deployment?")).await?;
510 /// // Execution continues here after approval
511 /// # Ok(())
512 /// # }
513 /// ```
514 pub async fn approval(
515 &mut self,
516 name: &str,
517 config: ApprovalConfig,
518 ) -> Result<(), EngineError> {
519 let position = self.position;
520 self.position += 1;
521
522 // Replay: if this approval step exists from a prior execution,
523 // the run was approved -- mark it completed (if not already) and continue.
524 if let Some(existing) = self.replay_steps.get(&position)
525 && existing.kind == StepKind::Approval
526 {
527 if existing.status.state == StepStatus::AwaitingApproval {
528 self.store
529 .update_step(
530 existing.id,
531 StepUpdate {
532 status: Some(StepStatus::Completed),
533 completed_at: Some(Utc::now()),
534 ..StepUpdate::default()
535 },
536 )
537 .await?;
538 }
539
540 self.last_step_ids = vec![existing.id];
541 info!(
542 run_id = %self.run_id,
543 step = %name,
544 position,
545 "approval step replayed (approved)"
546 );
547 return Ok(());
548 }
549
550 // First execution: create the approval step and suspend.
551 let step = self
552 .store
553 .create_step(NewStep {
554 run_id: self.run_id,
555 name: name.to_string(),
556 kind: StepKind::Approval,
557 position,
558 input: Some(serde_json::to_value(&config)?),
559 })
560 .await?;
561
562 self.start_step(step.id, Utc::now()).await?;
563
564 // Transition the step to AwaitingApproval so it reflects
565 // the suspended state on the dashboard.
566 self.store
567 .update_step(
568 step.id,
569 StepUpdate {
570 status: Some(StepStatus::AwaitingApproval),
571 ..StepUpdate::default()
572 },
573 )
574 .await?;
575
576 self.last_step_ids = vec![step.id];
577
578 Err(EngineError::ApprovalRequired {
579 run_id: self.run_id,
580 step_id: step.id,
581 message: config.message().to_string(),
582 })
583 }
584
585 /// Record a step as explicitly skipped.
586 ///
587 /// Use this inside an `if`/`else` branch when a step should not execute
588 /// but must still appear in the DAG and timeline with its reason.
589 ///
590 /// The step is created directly in [`StepStatus::Skipped`] state and the
591 /// reason is stored in the output as `{"reason": "..."}`.
592 ///
593 /// # Errors
594 ///
595 /// Returns [`EngineError`] if the store fails.
596 ///
597 /// # Examples
598 ///
599 /// ```no_run
600 /// use ironflow_engine::context::WorkflowContext;
601 /// use ironflow_engine::error::EngineError;
602 ///
603 /// # async fn example(ctx: &mut WorkflowContext) -> Result<(), EngineError> {
604 /// let tests_passed = false;
605 /// if tests_passed {
606 /// // ctx.shell("deploy", ...).await?;
607 /// } else {
608 /// ctx.skip("deploy", "tests failed").await?;
609 /// }
610 /// # Ok(())
611 /// # }
612 /// ```
613 pub async fn skip(&mut self, name: &str, reason: &str) -> Result<(), EngineError> {
614 let position = self.position;
615 self.position += 1;
616
617 let step = self
618 .store
619 .create_step(NewStep {
620 run_id: self.run_id,
621 name: name.to_string(),
622 kind: StepKind::Custom("skip".to_string()),
623 position,
624 input: None,
625 })
626 .await?;
627
628 if !self.last_step_ids.is_empty() {
629 let deps: Vec<NewStepDependency> = self
630 .last_step_ids
631 .iter()
632 .map(|&depends_on| NewStepDependency {
633 step_id: step.id,
634 depends_on,
635 })
636 .collect();
637 self.store.create_step_dependencies(deps).await?;
638 }
639
640 let now = Utc::now();
641 self.store
642 .update_step(
643 step.id,
644 StepUpdate {
645 status: Some(StepStatus::Skipped),
646 output: Some(serde_json::json!({"reason": reason})),
647 completed_at: Some(now),
648 ..StepUpdate::default()
649 },
650 )
651 .await?;
652
653 self.last_step_ids = vec![step.id];
654
655 info!(
656 run_id = %self.run_id,
657 step = %name,
658 reason,
659 "step skipped"
660 );
661
662 Ok(())
663 }
664
665 /// Execute a custom operation step.
666 ///
667 /// Runs a user-defined [`Operation`] with full step lifecycle management:
668 /// creates the step record, transitions to Running, executes the operation,
669 /// persists the output and duration, and marks the step Completed or Failed.
670 ///
671 /// The operation's [`kind()`](Operation::kind) is stored as
672 /// [`StepKind::Custom`].
673 ///
674 /// # Errors
675 ///
676 /// Returns [`EngineError`] if the operation fails or the store errors.
677 ///
678 /// # Examples
679 ///
680 /// ```no_run
681 /// use ironflow_engine::context::WorkflowContext;
682 /// use ironflow_engine::operation::Operation;
683 /// use ironflow_engine::error::EngineError;
684 /// use serde_json::{Value, json};
685 /// use std::pin::Pin;
686 /// use std::future::Future;
687 ///
688 /// struct MyOp;
689 /// impl Operation for MyOp {
690 /// fn kind(&self) -> &str { "my-service" }
691 /// fn execute(&self) -> Pin<Box<dyn Future<Output = Result<Value, EngineError>> + Send + '_>> {
692 /// Box::pin(async { Ok(json!({"ok": true})) })
693 /// }
694 /// }
695 ///
696 /// # async fn example(ctx: &mut WorkflowContext) -> Result<(), EngineError> {
697 /// let result = ctx.operation("call-service", &MyOp).await?;
698 /// println!("output: {}", result.output);
699 /// # Ok(())
700 /// # }
701 /// ```
702 pub async fn operation(
703 &mut self,
704 name: &str,
705 op: &dyn Operation,
706 ) -> Result<StepOutput, EngineError> {
707 let kind = StepKind::Custom(op.kind().to_string());
708 let position = self.position;
709 self.position += 1;
710
711 let step = self
712 .store
713 .create_step(NewStep {
714 run_id: self.run_id,
715 name: name.to_string(),
716 kind,
717 position,
718 input: op.input(),
719 })
720 .await?;
721
722 self.start_step(step.id, Utc::now()).await?;
723
724 let start = Instant::now();
725
726 match op.execute().await {
727 Ok(output_value) => {
728 let duration_ms = start.elapsed().as_millis() as u64;
729 self.total_duration_ms += duration_ms;
730
731 let completed_at = Utc::now();
732 self.store
733 .update_step(
734 step.id,
735 StepUpdate {
736 status: Some(StepStatus::Completed),
737 output: Some(output_value.clone()),
738 duration_ms: Some(duration_ms),
739 cost_usd: Some(Decimal::ZERO),
740 completed_at: Some(completed_at),
741 ..StepUpdate::default()
742 },
743 )
744 .await?;
745
746 info!(
747 run_id = %self.run_id,
748 step = %name,
749 kind = op.kind(),
750 duration_ms,
751 "operation step completed"
752 );
753
754 self.last_step_ids = vec![step.id];
755
756 Ok(StepOutput {
757 output: output_value,
758 duration_ms,
759 cost_usd: Decimal::ZERO,
760 input_tokens: None,
761 output_tokens: None,
762 model: None,
763 debug_messages: None,
764 })
765 }
766 Err(err) => {
767 let completed_at = Utc::now();
768 if let Err(store_err) = self
769 .store
770 .update_step(
771 step.id,
772 StepUpdate {
773 status: Some(StepStatus::Failed),
774 error: Some(err.to_string()),
775 completed_at: Some(completed_at),
776 ..StepUpdate::default()
777 },
778 )
779 .await
780 {
781 error!(step_id = %step.id, error = %store_err, "failed to persist step failure");
782 }
783
784 Err(err)
785 }
786 }
787 }
788
789 /// Execute a sub-workflow step.
790 ///
791 /// Creates a child run for the named workflow handler, executes it with
792 /// its own steps and lifecycle, and returns a [`StepOutput`] containing
793 /// the child run ID and aggregated metrics.
794 ///
795 /// Requires the context to be created with
796 /// `with_handler_resolver`.
797 ///
798 /// # Errors
799 ///
800 /// Returns [`EngineError::InvalidWorkflow`] if no handler is registered
801 /// with the given name, or if no handler resolver is available.
802 ///
803 /// # Examples
804 ///
805 /// ```no_run
806 /// use ironflow_engine::context::WorkflowContext;
807 /// use ironflow_engine::error::EngineError;
808 /// use serde_json::json;
809 ///
810 /// # async fn example(ctx: &mut WorkflowContext) -> Result<(), EngineError> {
811 /// // let result = ctx.workflow(&MySubWorkflow, json!({})).await?;
812 /// # Ok(())
813 /// # }
814 /// ```
815 pub async fn workflow(
816 &mut self,
817 handler: &dyn WorkflowHandler,
818 payload: Value,
819 ) -> Result<StepOutput, EngineError> {
820 let config = WorkflowStepConfig::new(handler.name(), payload);
821 let position = self.position;
822 self.position += 1;
823
824 let step = self
825 .store
826 .create_step(NewStep {
827 run_id: self.run_id,
828 name: config.workflow_name.clone(),
829 kind: StepKind::Workflow,
830 position,
831 input: Some(serde_json::to_value(&config)?),
832 })
833 .await?;
834
835 self.start_step(step.id, Utc::now()).await?;
836
837 match self.execute_child_workflow(&config).await {
838 Ok(output) => {
839 self.total_cost_usd += output.cost_usd;
840 self.total_duration_ms += output.duration_ms;
841
842 let completed_at = Utc::now();
843 self.store
844 .update_step(
845 step.id,
846 StepUpdate {
847 status: Some(StepStatus::Completed),
848 output: Some(output.output.clone()),
849 duration_ms: Some(output.duration_ms),
850 cost_usd: Some(output.cost_usd),
851 completed_at: Some(completed_at),
852 ..StepUpdate::default()
853 },
854 )
855 .await?;
856
857 info!(
858 run_id = %self.run_id,
859 child_workflow = %config.workflow_name,
860 duration_ms = output.duration_ms,
861 "workflow step completed"
862 );
863
864 self.last_step_ids = vec![step.id];
865
866 Ok(output)
867 }
868 Err(err) => {
869 let completed_at = Utc::now();
870 if let Err(store_err) = self
871 .store
872 .update_step(
873 step.id,
874 StepUpdate {
875 status: Some(StepStatus::Failed),
876 error: Some(err.to_string()),
877 completed_at: Some(completed_at),
878 ..StepUpdate::default()
879 },
880 )
881 .await
882 {
883 error!(step_id = %step.id, error = %store_err, "failed to persist step failure");
884 }
885
886 Err(err)
887 }
888 }
889 }
890
891 /// Execute a child workflow and return aggregated output.
892 async fn execute_child_workflow(
893 &self,
894 config: &WorkflowStepConfig,
895 ) -> Result<StepOutput, EngineError> {
896 let resolver = self.handler_resolver.as_ref().ok_or_else(|| {
897 EngineError::InvalidWorkflow(
898 "sub-workflow requires a handler resolver (use Engine to execute)".to_string(),
899 )
900 })?;
901
902 let handler = resolver(&config.workflow_name).ok_or_else(|| {
903 EngineError::InvalidWorkflow(format!("no handler registered: {}", config.workflow_name))
904 })?;
905
906 let parent_labels = self
907 .store
908 .get_run(self.run_id)
909 .await?
910 .map(|r| r.labels)
911 .unwrap_or_default();
912
913 let child_run = self
914 .store
915 .create_run(NewRun {
916 workflow_name: config.workflow_name.clone(),
917 trigger: TriggerKind::Workflow,
918 payload: config.payload.clone(),
919 max_retries: 0,
920 handler_version: None,
921 labels: parent_labels,
922 scheduled_at: None,
923 })
924 .await?;
925
926 let child_run_id = child_run.id;
927 info!(
928 parent_run_id = %self.run_id,
929 child_run_id = %child_run_id,
930 workflow = %config.workflow_name,
931 "child run created"
932 );
933
934 self.store
935 .update_run_status(child_run_id, RunStatus::Running)
936 .await?;
937
938 let run_start = Instant::now();
939 let mut child_ctx = WorkflowContext {
940 run_id: child_run_id,
941 store: self.store.clone(),
942 provider: self.provider.clone(),
943 handler_resolver: self.handler_resolver.clone(),
944 position: 0,
945 last_step_ids: Vec::new(),
946 total_cost_usd: Decimal::ZERO,
947 total_duration_ms: 0,
948 replay_steps: HashMap::new(),
949 log_sender: self.log_sender.clone(),
950 };
951
952 let result = handler.execute(&mut child_ctx).await;
953 let total_duration = run_start.elapsed().as_millis() as u64;
954 let completed_at = Utc::now();
955
956 match result {
957 Ok(()) => {
958 self.store
959 .update_run(
960 child_run_id,
961 RunUpdate {
962 status: Some(RunStatus::Completed),
963 cost_usd: Some(child_ctx.total_cost_usd),
964 duration_ms: Some(total_duration),
965 completed_at: Some(completed_at),
966 ..RunUpdate::default()
967 },
968 )
969 .await?;
970
971 Ok(StepOutput {
972 output: serde_json::json!({
973 "run_id": child_run_id,
974 "workflow_name": config.workflow_name,
975 "status": RunStatus::Completed,
976 "cost_usd": child_ctx.total_cost_usd,
977 "duration_ms": total_duration,
978 }),
979 duration_ms: total_duration,
980 cost_usd: child_ctx.total_cost_usd,
981 input_tokens: None,
982 output_tokens: None,
983 model: None,
984 debug_messages: None,
985 })
986 }
987 Err(err) => {
988 if let Err(store_err) = self
989 .store
990 .update_run(
991 child_run_id,
992 RunUpdate {
993 status: Some(RunStatus::Failed),
994 error: Some(err.to_string()),
995 cost_usd: Some(child_ctx.total_cost_usd),
996 duration_ms: Some(total_duration),
997 completed_at: Some(completed_at),
998 ..RunUpdate::default()
999 },
1000 )
1001 .await
1002 {
1003 error!(
1004 child_run_id = %child_run_id,
1005 store_error = %store_err,
1006 "failed to persist child run failure"
1007 );
1008 }
1009
1010 Err(err)
1011 }
1012 }
1013 }
1014
1015 /// Try to replay a completed step from a previous execution.
1016 ///
1017 /// Returns `Some(StepOutput)` if a completed step exists at the given
1018 /// position, `None` otherwise.
1019 fn try_replay_step(&mut self, position: u32) -> Option<StepOutput> {
1020 let step = self.replay_steps.get(&position)?;
1021 if step.status.state != StepStatus::Completed {
1022 return None;
1023 }
1024 let output = StepOutput {
1025 output: step.output.clone().unwrap_or(Value::Null),
1026 duration_ms: step.duration_ms,
1027 cost_usd: step.cost_usd,
1028 input_tokens: step.input_tokens,
1029 output_tokens: step.output_tokens,
1030 model: None,
1031 debug_messages: None,
1032 };
1033 self.total_cost_usd += output.cost_usd;
1034 self.total_duration_ms += output.duration_ms;
1035 self.last_step_ids = vec![step.id];
1036 info!(
1037 run_id = %self.run_id,
1038 step = %step.name,
1039 position,
1040 "step replayed from previous execution"
1041 );
1042 Some(output)
1043 }
1044
1045 /// Internal: execute a step with full persistence lifecycle.
1046 async fn execute_step(
1047 &mut self,
1048 name: &str,
1049 kind: StepKind,
1050 config: StepConfig,
1051 ) -> Result<StepOutput, EngineError> {
1052 let position = self.position;
1053 self.position += 1;
1054
1055 // Replay: if this step already completed in a prior execution, return cached output.
1056 if let Some(output) = self.try_replay_step(position) {
1057 return Ok(output);
1058 }
1059
1060 // Create step record in Pending.
1061 let step = self
1062 .store
1063 .create_step(NewStep {
1064 run_id: self.run_id,
1065 name: name.to_string(),
1066 kind,
1067 position,
1068 input: Some(serde_json::to_value(&config)?),
1069 })
1070 .await?;
1071
1072 self.start_step(step.id, Utc::now()).await?;
1073
1074 let step_log_sender = self
1075 .log_sender
1076 .as_ref()
1077 .map(|s| StepLogSender::new(s.clone(), self.run_id, step.id, name.to_string()));
1078
1079 match execute_step_config(&config, &self.provider, step_log_sender).await {
1080 Ok(output) => {
1081 self.total_cost_usd += output.cost_usd;
1082 self.total_duration_ms += output.duration_ms;
1083
1084 let debug_messages_json = output.debug_messages_json();
1085
1086 let completed_at = Utc::now();
1087 self.store
1088 .update_step(
1089 step.id,
1090 StepUpdate {
1091 status: Some(StepStatus::Completed),
1092 output: Some(output.output.clone()),
1093 duration_ms: Some(output.duration_ms),
1094 cost_usd: Some(output.cost_usd),
1095 input_tokens: output.input_tokens,
1096 output_tokens: output.output_tokens,
1097 completed_at: Some(completed_at),
1098 debug_messages: debug_messages_json,
1099 ..StepUpdate::default()
1100 },
1101 )
1102 .await?;
1103
1104 info!(
1105 run_id = %self.run_id,
1106 step = %name,
1107 duration_ms = output.duration_ms,
1108 "step completed"
1109 );
1110
1111 self.last_step_ids = vec![step.id];
1112
1113 Ok(output)
1114 }
1115 Err(err) => {
1116 let completed_at = Utc::now();
1117 let debug_messages_json = extract_debug_messages_from_error(&err);
1118 let partial = extract_partial_usage_from_error(&err);
1119
1120 if let Some(ref usage) = partial {
1121 if let Some(cost) = usage.cost_usd {
1122 self.total_cost_usd += cost;
1123 }
1124 if let Some(dur) = usage.duration_ms {
1125 self.total_duration_ms += dur;
1126 }
1127 }
1128
1129 if let Err(store_err) = self
1130 .store
1131 .update_step(
1132 step.id,
1133 StepUpdate {
1134 status: Some(StepStatus::Failed),
1135 error: Some(err.to_string()),
1136 completed_at: Some(completed_at),
1137 debug_messages: debug_messages_json,
1138 duration_ms: partial.as_ref().and_then(|p| p.duration_ms),
1139 cost_usd: partial.as_ref().and_then(|p| p.cost_usd),
1140 input_tokens: partial.as_ref().and_then(|p| p.input_tokens),
1141 output_tokens: partial.as_ref().and_then(|p| p.output_tokens),
1142 ..StepUpdate::default()
1143 },
1144 )
1145 .await
1146 {
1147 tracing::error!(step_id = %step.id, error = %store_err, "failed to persist step failure");
1148 }
1149
1150 Err(err)
1151 }
1152 }
1153 }
1154
1155 /// Record dependency edges and transition a step to Running.
1156 ///
1157 /// Records edges from `step_id` to all `last_step_ids`, then
1158 /// transitions the step to `Running` with the given timestamp.
1159 async fn start_step(&self, step_id: Uuid, now: DateTime<Utc>) -> Result<(), EngineError> {
1160 if !self.last_step_ids.is_empty() {
1161 let deps: Vec<NewStepDependency> = self
1162 .last_step_ids
1163 .iter()
1164 .map(|&depends_on| NewStepDependency {
1165 step_id,
1166 depends_on,
1167 })
1168 .collect();
1169 self.store.create_step_dependencies(deps).await?;
1170 }
1171
1172 self.store
1173 .update_step(
1174 step_id,
1175 StepUpdate {
1176 status: Some(StepStatus::Running),
1177 started_at: Some(now),
1178 ..StepUpdate::default()
1179 },
1180 )
1181 .await?;
1182
1183 Ok(())
1184 }
1185
1186 /// Access the store directly (advanced usage).
1187 pub fn store(&self) -> &Arc<dyn Store> {
1188 &self.store
1189 }
1190
1191 /// Access the payload that triggered this run.
1192 ///
1193 /// Fetches the run from the store and returns its payload.
1194 ///
1195 /// # Errors
1196 ///
1197 /// Returns [`EngineError::Store`] if the run is not found.
1198 pub async fn payload(&self) -> Result<Value, EngineError> {
1199 let run = self
1200 .store
1201 .get_run(self.run_id)
1202 .await?
1203 .ok_or(EngineError::Store(
1204 ironflow_store::error::StoreError::RunNotFound(self.run_id),
1205 ))?;
1206 Ok(run.payload)
1207 }
1208}
1209
1210impl fmt::Debug for WorkflowContext {
1211 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1212 f.debug_struct("WorkflowContext")
1213 .field("run_id", &self.run_id)
1214 .field("position", &self.position)
1215 .field("total_cost_usd", &self.total_cost_usd)
1216 .finish_non_exhaustive()
1217 }
1218}
1219
1220/// Extract debug messages from an engine error, if it wraps a schema validation
1221/// failure that carries a verbose conversation trace.
1222fn extract_debug_messages_from_error(err: &EngineError) -> Option<Value> {
1223 if let EngineError::Operation(OperationError::Agent(AgentError::SchemaValidation {
1224 debug_messages,
1225 ..
1226 })) = err
1227 && !debug_messages.is_empty()
1228 {
1229 return serde_json::to_value(debug_messages).ok();
1230 }
1231 None
1232}
1233
1234/// Partial usage with `Decimal` cost, converted from the `f64` in [`PartialUsage`].
1235///
1236/// Exists only because `ironflow-store` uses [`Decimal`] for monetary values
1237/// while `ironflow-core` uses `f64` (the CLI's native type). The conversion
1238/// happens here, at the engine/store boundary.
1239struct StepPartialUsage {
1240 cost_usd: Option<Decimal>,
1241 duration_ms: Option<u64>,
1242 input_tokens: Option<u64>,
1243 output_tokens: Option<u64>,
1244}
1245
1246fn extract_partial_usage_from_error(err: &EngineError) -> Option<StepPartialUsage> {
1247 if let EngineError::Operation(OperationError::Agent(AgentError::SchemaValidation {
1248 partial_usage,
1249 ..
1250 })) = err
1251 && (partial_usage.cost_usd.is_some() || partial_usage.duration_ms.is_some())
1252 {
1253 return Some(StepPartialUsage {
1254 cost_usd: partial_usage
1255 .cost_usd
1256 .and_then(|c| Decimal::try_from(c).ok()),
1257 duration_ms: partial_usage.duration_ms,
1258 input_tokens: partial_usage.input_tokens,
1259 output_tokens: partial_usage.output_tokens,
1260 });
1261 }
1262 None
1263}