ironflow_engine/context.rs
//! [`WorkflowContext`] — execution context for dynamic workflows.
//!
//! Provides step execution methods that automatically persist results to the
//! store. Each call to [`shell`](WorkflowContext::shell),
//! [`http`](WorkflowContext::http), [`agent`](WorkflowContext::agent),
//! [`operation`](WorkflowContext::operation), or
//! [`workflow`](WorkflowContext::workflow) creates a step record, executes the
//! operation, captures the output, and returns a [`StepOutput`] that the next
//! step can reference.
//!
//! # Examples
//!
//! ```no_run
//! use ironflow_engine::context::WorkflowContext;
//! use ironflow_engine::config::{ShellConfig, AgentStepConfig};
//! use ironflow_engine::error::EngineError;
//!
//! # async fn example(ctx: &mut WorkflowContext) -> Result<(), EngineError> {
//! let build = ctx.shell("build", ShellConfig::new("cargo build")).await?;
//! let review = ctx.agent("review", AgentStepConfig::new(
//!     &format!("Build output:\n{}", build.output["stdout"])
//! )).await?;
//! # Ok(())
//! # }
//! ```
25
26use std::sync::Arc;
27use std::time::Instant;
28
29use chrono::Utc;
30use rust_decimal::Decimal;
31use serde_json::Value;
32use tracing::{error, info};
33use uuid::Uuid;
34
35use ironflow_core::provider::AgentProvider;
36use ironflow_store::models::{
37 NewRun, NewStep, RunStatus, RunUpdate, StepKind, StepStatus, StepUpdate, TriggerKind,
38};
39use ironflow_store::store::RunStore;
40
41use crate::config::{AgentStepConfig, HttpConfig, ShellConfig, StepConfig, WorkflowStepConfig};
42use crate::error::EngineError;
43use crate::executor::{StepOutput, execute_step_config};
44use crate::handler::WorkflowHandler;
45use crate::operation::Operation;
46
47/// Callback type for resolving workflow handlers by name.
48pub(crate) type HandlerResolver =
49 Arc<dyn Fn(&str) -> Option<Arc<dyn WorkflowHandler>> + Send + Sync>;
50
51/// Execution context for a single workflow run.
52///
53/// Tracks the current step position and provides convenience methods
54/// for executing operations with automatic persistence.
55///
56/// # Examples
57///
58/// ```no_run
59/// use ironflow_engine::context::WorkflowContext;
60/// use ironflow_engine::config::ShellConfig;
61/// use ironflow_engine::error::EngineError;
62///
63/// # async fn example(ctx: &mut WorkflowContext) -> Result<(), EngineError> {
64/// let result = ctx.shell("greet", ShellConfig::new("echo hello")).await?;
65/// assert!(result.output["stdout"].as_str().unwrap().contains("hello"));
66/// # Ok(())
67/// # }
68/// ```
69pub struct WorkflowContext {
70 run_id: Uuid,
71 store: Arc<dyn RunStore>,
72 provider: Arc<dyn AgentProvider>,
73 handler_resolver: Option<HandlerResolver>,
74 position: u32,
75 /// Accumulated cost across all steps in this run.
76 total_cost_usd: Decimal,
77 /// Accumulated duration across all steps.
78 total_duration_ms: u64,
79}
80
81impl WorkflowContext {
82 /// Create a new context for a run.
83 ///
84 /// Not typically called directly — the [`Engine`](crate::engine::Engine)
85 /// creates this when executing a [`WorkflowHandler`](crate::handler::WorkflowHandler).
86 pub fn new(run_id: Uuid, store: Arc<dyn RunStore>, provider: Arc<dyn AgentProvider>) -> Self {
87 Self {
88 run_id,
89 store,
90 provider,
91 handler_resolver: None,
92 position: 0,
93 total_cost_usd: Decimal::ZERO,
94 total_duration_ms: 0,
95 }
96 }
97
98 /// Create a new context with a handler resolver for sub-workflow support.
99 ///
100 /// The resolver is called when [`workflow`](Self::workflow) is invoked to
101 /// look up registered handlers by name.
102 pub(crate) fn with_handler_resolver(
103 run_id: Uuid,
104 store: Arc<dyn RunStore>,
105 provider: Arc<dyn AgentProvider>,
106 resolver: HandlerResolver,
107 ) -> Self {
108 Self {
109 run_id,
110 store,
111 provider,
112 handler_resolver: Some(resolver),
113 position: 0,
114 total_cost_usd: Decimal::ZERO,
115 total_duration_ms: 0,
116 }
117 }
118
119 /// The run ID this context is executing for.
120 pub fn run_id(&self) -> Uuid {
121 self.run_id
122 }
123
124 /// Accumulated cost across all executed steps so far.
125 pub fn total_cost_usd(&self) -> Decimal {
126 self.total_cost_usd
127 }
128
129 /// Accumulated duration across all executed steps so far.
130 pub fn total_duration_ms(&self) -> u64 {
131 self.total_duration_ms
132 }
133
134 /// Execute a shell step.
135 ///
136 /// Creates the step record, runs the command, persists the result,
137 /// and returns the output for use in subsequent steps.
138 ///
139 /// # Errors
140 ///
141 /// Returns [`EngineError`] if the command fails or the store errors.
142 ///
143 /// # Examples
144 ///
145 /// ```no_run
146 /// use ironflow_engine::context::WorkflowContext;
147 /// use ironflow_engine::config::ShellConfig;
148 /// use ironflow_engine::error::EngineError;
149 ///
150 /// # async fn example(ctx: &mut WorkflowContext) -> Result<(), EngineError> {
151 /// let files = ctx.shell("list", ShellConfig::new("ls -la")).await?;
152 /// println!("stdout: {}", files.output["stdout"]);
153 /// # Ok(())
154 /// # }
155 /// ```
156 pub async fn shell(
157 &mut self,
158 name: &str,
159 config: ShellConfig,
160 ) -> Result<StepOutput, EngineError> {
161 self.execute_step(name, StepKind::Shell, StepConfig::Shell(config))
162 .await
163 }
164
165 /// Execute an HTTP step.
166 ///
167 /// # Errors
168 ///
169 /// Returns [`EngineError`] if the request fails or the store errors.
170 ///
171 /// # Examples
172 ///
173 /// ```no_run
174 /// use ironflow_engine::context::WorkflowContext;
175 /// use ironflow_engine::config::HttpConfig;
176 /// use ironflow_engine::error::EngineError;
177 ///
178 /// # async fn example(ctx: &mut WorkflowContext) -> Result<(), EngineError> {
179 /// let resp = ctx.http("health", HttpConfig::get("https://api.example.com/health")).await?;
180 /// println!("status: {}", resp.output["status"]);
181 /// # Ok(())
182 /// # }
183 /// ```
184 pub async fn http(
185 &mut self,
186 name: &str,
187 config: HttpConfig,
188 ) -> Result<StepOutput, EngineError> {
189 self.execute_step(name, StepKind::Http, StepConfig::Http(config))
190 .await
191 }
192
193 /// Execute an agent step.
194 ///
195 /// # Errors
196 ///
197 /// Returns [`EngineError`] if the agent invocation fails or the store errors.
198 ///
199 /// # Examples
200 ///
201 /// ```no_run
202 /// use ironflow_engine::context::WorkflowContext;
203 /// use ironflow_engine::config::AgentStepConfig;
204 /// use ironflow_engine::error::EngineError;
205 ///
206 /// # async fn example(ctx: &mut WorkflowContext) -> Result<(), EngineError> {
207 /// let review = ctx.agent("review", AgentStepConfig::new("Review the code")).await?;
208 /// println!("review: {}", review.output["value"]);
209 /// # Ok(())
210 /// # }
211 /// ```
212 pub async fn agent(
213 &mut self,
214 name: &str,
215 config: AgentStepConfig,
216 ) -> Result<StepOutput, EngineError> {
217 self.execute_step(name, StepKind::Agent, StepConfig::Agent(config))
218 .await
219 }
220
221 /// Execute a custom operation step.
222 ///
223 /// Runs a user-defined [`Operation`] with full step lifecycle management:
224 /// creates the step record, transitions to Running, executes the operation,
225 /// persists the output and duration, and marks the step Completed or Failed.
226 ///
227 /// The operation's [`kind()`](Operation::kind) is stored as
228 /// [`StepKind::Custom`](ironflow_store::models::StepKind::Custom).
229 ///
230 /// # Errors
231 ///
232 /// Returns [`EngineError`] if the operation fails or the store errors.
233 ///
234 /// # Examples
235 ///
236 /// ```no_run
237 /// use ironflow_engine::context::WorkflowContext;
238 /// use ironflow_engine::operation::Operation;
239 /// use ironflow_engine::error::EngineError;
240 /// use serde_json::{Value, json};
241 /// use std::pin::Pin;
242 /// use std::future::Future;
243 ///
244 /// struct MyOp;
245 /// impl Operation for MyOp {
246 /// fn kind(&self) -> &str { "my-service" }
247 /// fn execute(&self) -> Pin<Box<dyn Future<Output = Result<Value, EngineError>> + Send + '_>> {
248 /// Box::pin(async { Ok(json!({"ok": true})) })
249 /// }
250 /// }
251 ///
252 /// # async fn example(ctx: &mut WorkflowContext) -> Result<(), EngineError> {
253 /// let result = ctx.operation("call-service", &MyOp).await?;
254 /// println!("output: {}", result.output);
255 /// # Ok(())
256 /// # }
257 /// ```
258 pub async fn operation(
259 &mut self,
260 name: &str,
261 op: &dyn Operation,
262 ) -> Result<StepOutput, EngineError> {
263 let kind = StepKind::Custom(op.kind().to_string());
264 let position = self.position;
265 self.position += 1;
266
267 let step = self
268 .store
269 .create_step(NewStep {
270 run_id: self.run_id,
271 name: name.to_string(),
272 kind,
273 position,
274 input: op.input(),
275 })
276 .await?;
277
278 let now = Utc::now();
279 self.store
280 .update_step(
281 step.id,
282 StepUpdate {
283 status: Some(StepStatus::Running),
284 started_at: Some(now),
285 ..StepUpdate::default()
286 },
287 )
288 .await?;
289
290 let start = std::time::Instant::now();
291
292 match op.execute().await {
293 Ok(output_value) => {
294 let duration_ms = start.elapsed().as_millis() as u64;
295 self.total_duration_ms += duration_ms;
296
297 let completed_at = Utc::now();
298 self.store
299 .update_step(
300 step.id,
301 StepUpdate {
302 status: Some(StepStatus::Completed),
303 output: Some(output_value.clone()),
304 duration_ms: Some(duration_ms),
305 cost_usd: Some(Decimal::ZERO),
306 completed_at: Some(completed_at),
307 ..StepUpdate::default()
308 },
309 )
310 .await?;
311
312 info!(
313 run_id = %self.run_id,
314 step = %name,
315 kind = op.kind(),
316 duration_ms,
317 "operation step completed"
318 );
319
320 Ok(StepOutput {
321 output: output_value,
322 duration_ms,
323 cost_usd: Decimal::ZERO,
324 input_tokens: None,
325 output_tokens: None,
326 })
327 }
328 Err(err) => {
329 let completed_at = Utc::now();
330 if let Err(store_err) = self
331 .store
332 .update_step(
333 step.id,
334 StepUpdate {
335 status: Some(StepStatus::Failed),
336 error: Some(err.to_string()),
337 completed_at: Some(completed_at),
338 ..StepUpdate::default()
339 },
340 )
341 .await
342 {
343 error!(step_id = %step.id, error = %store_err, "failed to persist step failure");
344 }
345
346 Err(err)
347 }
348 }
349 }
350
351 /// Execute a sub-workflow step.
352 ///
353 /// Creates a child run for the named workflow handler, executes it with
354 /// its own steps and lifecycle, and returns a [`StepOutput`] containing
355 /// the child run ID and aggregated metrics.
356 ///
357 /// Requires the context to be created with
358 /// [`with_handler_resolver`](Self::with_handler_resolver).
359 ///
360 /// # Errors
361 ///
362 /// Returns [`EngineError::InvalidWorkflow`] if no handler is registered
363 /// with the given name, or if no handler resolver is available.
364 ///
365 /// # Examples
366 ///
367 /// ```no_run
368 /// use ironflow_engine::context::WorkflowContext;
369 /// use ironflow_engine::error::EngineError;
370 /// use serde_json::json;
371 ///
372 /// # async fn example(ctx: &mut WorkflowContext) -> Result<(), EngineError> {
373 /// // let result = ctx.workflow(&MySubWorkflow, json!({})).await?;
374 /// # Ok(())
375 /// # }
376 /// ```
377 pub async fn workflow(
378 &mut self,
379 handler: &dyn WorkflowHandler,
380 payload: Value,
381 ) -> Result<StepOutput, EngineError> {
382 let config = WorkflowStepConfig::new(handler.name(), payload);
383 let position = self.position;
384 self.position += 1;
385
386 let step = self
387 .store
388 .create_step(NewStep {
389 run_id: self.run_id,
390 name: config.workflow_name.clone(),
391 kind: StepKind::Workflow,
392 position,
393 input: Some(serde_json::to_value(&config)?),
394 })
395 .await?;
396
397 let now = Utc::now();
398 self.store
399 .update_step(
400 step.id,
401 StepUpdate {
402 status: Some(StepStatus::Running),
403 started_at: Some(now),
404 ..StepUpdate::default()
405 },
406 )
407 .await?;
408
409 match self.execute_child_workflow(&config).await {
410 Ok(output) => {
411 self.total_cost_usd += output.cost_usd;
412 self.total_duration_ms += output.duration_ms;
413
414 let completed_at = Utc::now();
415 self.store
416 .update_step(
417 step.id,
418 StepUpdate {
419 status: Some(StepStatus::Completed),
420 output: Some(output.output.clone()),
421 duration_ms: Some(output.duration_ms),
422 cost_usd: Some(output.cost_usd),
423 completed_at: Some(completed_at),
424 ..StepUpdate::default()
425 },
426 )
427 .await?;
428
429 info!(
430 run_id = %self.run_id,
431 child_workflow = %config.workflow_name,
432 duration_ms = output.duration_ms,
433 "workflow step completed"
434 );
435
436 Ok(output)
437 }
438 Err(err) => {
439 let completed_at = Utc::now();
440 if let Err(store_err) = self
441 .store
442 .update_step(
443 step.id,
444 StepUpdate {
445 status: Some(StepStatus::Failed),
446 error: Some(err.to_string()),
447 completed_at: Some(completed_at),
448 ..StepUpdate::default()
449 },
450 )
451 .await
452 {
453 error!(step_id = %step.id, error = %store_err, "failed to persist step failure");
454 }
455
456 Err(err)
457 }
458 }
459 }
460
461 /// Execute a child workflow and return aggregated output.
462 async fn execute_child_workflow(
463 &self,
464 config: &WorkflowStepConfig,
465 ) -> Result<StepOutput, EngineError> {
466 let resolver = self.handler_resolver.as_ref().ok_or_else(|| {
467 EngineError::InvalidWorkflow(
468 "sub-workflow requires a handler resolver (use Engine to execute)".to_string(),
469 )
470 })?;
471
472 let handler = resolver(&config.workflow_name).ok_or_else(|| {
473 EngineError::InvalidWorkflow(format!("no handler registered: {}", config.workflow_name))
474 })?;
475
476 let child_run = self
477 .store
478 .create_run(NewRun {
479 workflow_name: config.workflow_name.clone(),
480 trigger: TriggerKind::Workflow,
481 payload: config.payload.clone(),
482 max_retries: 0,
483 })
484 .await?;
485
486 let child_run_id = child_run.id;
487 info!(
488 parent_run_id = %self.run_id,
489 child_run_id = %child_run_id,
490 workflow = %config.workflow_name,
491 "child run created"
492 );
493
494 self.store
495 .update_run_status(child_run_id, RunStatus::Running)
496 .await?;
497
498 let run_start = Instant::now();
499 let mut child_ctx = WorkflowContext {
500 run_id: child_run_id,
501 store: self.store.clone(),
502 provider: self.provider.clone(),
503 handler_resolver: self.handler_resolver.clone(),
504 position: 0,
505 total_cost_usd: Decimal::ZERO,
506 total_duration_ms: 0,
507 };
508
509 let result = handler.execute(&mut child_ctx).await;
510 let total_duration = run_start.elapsed().as_millis() as u64;
511 let completed_at = Utc::now();
512
513 match result {
514 Ok(()) => {
515 self.store
516 .update_run(
517 child_run_id,
518 RunUpdate {
519 status: Some(RunStatus::Completed),
520 cost_usd: Some(child_ctx.total_cost_usd),
521 duration_ms: Some(total_duration),
522 completed_at: Some(completed_at),
523 ..RunUpdate::default()
524 },
525 )
526 .await?;
527
528 Ok(StepOutput {
529 output: serde_json::json!({
530 "run_id": child_run_id,
531 "workflow_name": config.workflow_name,
532 "status": RunStatus::Completed,
533 "cost_usd": child_ctx.total_cost_usd,
534 "duration_ms": total_duration,
535 }),
536 duration_ms: total_duration,
537 cost_usd: child_ctx.total_cost_usd,
538 input_tokens: None,
539 output_tokens: None,
540 })
541 }
542 Err(err) => {
543 if let Err(store_err) = self
544 .store
545 .update_run(
546 child_run_id,
547 RunUpdate {
548 status: Some(RunStatus::Failed),
549 error: Some(err.to_string()),
550 cost_usd: Some(child_ctx.total_cost_usd),
551 duration_ms: Some(total_duration),
552 completed_at: Some(completed_at),
553 ..RunUpdate::default()
554 },
555 )
556 .await
557 {
558 error!(
559 child_run_id = %child_run_id,
560 store_error = %store_err,
561 "failed to persist child run failure"
562 );
563 }
564
565 Err(err)
566 }
567 }
568 }
569
570 /// Internal: execute a step with full persistence lifecycle.
571 async fn execute_step(
572 &mut self,
573 name: &str,
574 kind: StepKind,
575 config: StepConfig,
576 ) -> Result<StepOutput, EngineError> {
577 let position = self.position;
578 self.position += 1;
579
580 // Create step record in Pending.
581 let step = self
582 .store
583 .create_step(NewStep {
584 run_id: self.run_id,
585 name: name.to_string(),
586 kind,
587 position,
588 input: Some(serde_json::to_value(&config)?),
589 })
590 .await?;
591
592 // Transition to Running.
593 let now = Utc::now();
594 self.store
595 .update_step(
596 step.id,
597 StepUpdate {
598 status: Some(StepStatus::Running),
599 started_at: Some(now),
600 ..StepUpdate::default()
601 },
602 )
603 .await?;
604
605 // Execute the operation.
606 match execute_step_config(&config, &self.provider).await {
607 Ok(output) => {
608 self.total_cost_usd += output.cost_usd;
609 self.total_duration_ms += output.duration_ms;
610
611 let completed_at = Utc::now();
612 self.store
613 .update_step(
614 step.id,
615 StepUpdate {
616 status: Some(StepStatus::Completed),
617 output: Some(output.output.clone()),
618 duration_ms: Some(output.duration_ms),
619 cost_usd: Some(output.cost_usd),
620 input_tokens: output.input_tokens,
621 output_tokens: output.output_tokens,
622 completed_at: Some(completed_at),
623 ..StepUpdate::default()
624 },
625 )
626 .await?;
627
628 info!(
629 run_id = %self.run_id,
630 step = %name,
631 duration_ms = output.duration_ms,
632 "step completed"
633 );
634
635 Ok(output)
636 }
637 Err(err) => {
638 let completed_at = Utc::now();
639 if let Err(store_err) = self
640 .store
641 .update_step(
642 step.id,
643 StepUpdate {
644 status: Some(StepStatus::Failed),
645 error: Some(err.to_string()),
646 completed_at: Some(completed_at),
647 ..StepUpdate::default()
648 },
649 )
650 .await
651 {
652 tracing::error!(step_id = %step.id, error = %store_err, "failed to persist step failure");
653 }
654
655 Err(err)
656 }
657 }
658 }
659
660 /// Access the store directly (advanced usage).
661 pub fn store(&self) -> &Arc<dyn RunStore> {
662 &self.store
663 }
664
665 /// Access the payload that triggered this run.
666 ///
667 /// Fetches the run from the store and returns its payload.
668 ///
669 /// # Errors
670 ///
671 /// Returns [`EngineError::Store`] if the run is not found.
672 pub async fn payload(&self) -> Result<Value, EngineError> {
673 let run = self
674 .store
675 .get_run(self.run_id)
676 .await?
677 .ok_or(EngineError::Store(
678 ironflow_store::error::StoreError::RunNotFound(self.run_id),
679 ))?;
680 Ok(run.payload)
681 }
682}
683
684impl std::fmt::Debug for WorkflowContext {
685 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
686 f.debug_struct("WorkflowContext")
687 .field("run_id", &self.run_id)
688 .field("position", &self.position)
689 .field("total_cost_usd", &self.total_cost_usd)
690 .finish_non_exhaustive()
691 }
692}