ironflow_engine/context.rs
1//! [`WorkflowContext`] — execution context for dynamic workflows.
2//!
3//! Provides step execution methods that automatically persist results to the
4//! store. Each call to [`shell`](WorkflowContext::shell),
5//! [`http`](WorkflowContext::http), [`agent`](WorkflowContext::agent), or
6//! [`workflow`](WorkflowContext::workflow) creates a step record, executes the
7//! operation, captures the output, and returns a [`StepOutput`] that the next
8//! step can reference.
9//!
10//! # Examples
11//!
12//! ```no_run
13//! use ironflow_engine::context::WorkflowContext;
14//! use ironflow_engine::config::{ShellConfig, AgentStepConfig};
15//! use ironflow_engine::error::EngineError;
16//!
17//! # async fn example(ctx: &mut WorkflowContext) -> Result<(), EngineError> {
18//! let build = ctx.shell("build", ShellConfig::new("cargo build")).await?;
19//! let review = ctx.agent("review", AgentStepConfig::new(
20//! &format!("Build output:\n{}", build.output["stdout"])
21//! )).await?;
22//! # Ok(())
23//! # }
24//! ```
25
26use std::sync::Arc;
27use std::time::Instant;
28
29use chrono::Utc;
30use rust_decimal::Decimal;
31use serde_json::Value;
32use tracing::{error, info};
33use uuid::Uuid;
34
35use ironflow_core::provider::AgentProvider;
36use ironflow_store::models::{
37 NewRun, NewStep, RunStatus, RunUpdate, StepKind, StepStatus, StepUpdate, TriggerKind,
38};
39use ironflow_store::store::RunStore;
40
41use crate::config::{AgentStepConfig, HttpConfig, ShellConfig, StepConfig, WorkflowStepConfig};
42use crate::error::EngineError;
43use crate::executor::{StepOutput, execute_step_config};
44use crate::handler::WorkflowHandler;
45
/// Callback type for resolving workflow handlers by name.
///
/// Given a workflow name, the callback returns the matching handler, or
/// `None` when it has no handler for that name (callers in this module turn
/// `None` into [`EngineError::InvalidWorkflow`]). The callback is held in an
/// [`Arc`] so it can be cloned into the contexts created for child runs.
pub(crate) type HandlerResolver =
    Arc<dyn Fn(&str) -> Option<Arc<dyn WorkflowHandler>> + Send + Sync>;
49
/// Execution context for a single workflow run.
///
/// Tracks the current step position and provides convenience methods
/// for executing operations with automatic persistence.
///
/// # Examples
///
/// ```no_run
/// use ironflow_engine::context::WorkflowContext;
/// use ironflow_engine::config::ShellConfig;
/// use ironflow_engine::error::EngineError;
///
/// # async fn example(ctx: &mut WorkflowContext) -> Result<(), EngineError> {
/// let result = ctx.shell("greet", ShellConfig::new("echo hello")).await?;
/// assert!(result.output["stdout"].as_str().unwrap().contains("hello"));
/// # Ok(())
/// # }
/// ```
pub struct WorkflowContext {
    /// Identifier of the run that every step created here is attached to.
    run_id: Uuid,
    /// Store used to persist step and run records.
    store: Arc<dyn RunStore>,
    /// Agent provider handed to the step executor.
    provider: Arc<dyn AgentProvider>,
    /// Resolver for sub-workflow handlers; `None` when the context was built
    /// with [`WorkflowContext::new`].
    handler_resolver: Option<HandlerResolver>,
    /// Position assigned to the next step; starts at 0 and grows by one for
    /// every step that is started.
    position: u32,
    /// Accumulated cost of the steps that completed successfully in this run.
    total_cost_usd: Decimal,
    /// Accumulated duration of the steps that completed successfully.
    total_duration_ms: u64,
}
79
80impl WorkflowContext {
81 /// Create a new context for a run.
82 ///
83 /// Not typically called directly — the [`Engine`](crate::engine::Engine)
84 /// creates this when executing a [`WorkflowHandler`](crate::handler::WorkflowHandler).
85 pub fn new(run_id: Uuid, store: Arc<dyn RunStore>, provider: Arc<dyn AgentProvider>) -> Self {
86 Self {
87 run_id,
88 store,
89 provider,
90 handler_resolver: None,
91 position: 0,
92 total_cost_usd: Decimal::ZERO,
93 total_duration_ms: 0,
94 }
95 }
96
97 /// Create a new context with a handler resolver for sub-workflow support.
98 ///
99 /// The resolver is called when [`workflow`](Self::workflow) is invoked to
100 /// look up registered handlers by name.
101 pub(crate) fn with_handler_resolver(
102 run_id: Uuid,
103 store: Arc<dyn RunStore>,
104 provider: Arc<dyn AgentProvider>,
105 resolver: HandlerResolver,
106 ) -> Self {
107 Self {
108 run_id,
109 store,
110 provider,
111 handler_resolver: Some(resolver),
112 position: 0,
113 total_cost_usd: Decimal::ZERO,
114 total_duration_ms: 0,
115 }
116 }
117
    /// The run ID this context is executing for.
    ///
    /// Every step record created through this context is attached to this
    /// run. The ID is returned by value.
    pub fn run_id(&self) -> Uuid {
        self.run_id
    }
122
    /// Accumulated cost across all executed steps so far.
    ///
    /// Only steps that completed successfully contribute: the cost reported
    /// in a step's output is added when the step succeeds, and nothing is
    /// added when a step fails.
    pub fn total_cost_usd(&self) -> Decimal {
        self.total_cost_usd
    }
127
    /// Accumulated duration across all executed steps so far.
    ///
    /// This is the sum, in milliseconds, of the durations reported by steps
    /// that completed successfully. It is not the wall-clock time elapsed
    /// since the context was created.
    pub fn total_duration_ms(&self) -> u64 {
        self.total_duration_ms
    }
132
133 /// Execute a shell step.
134 ///
135 /// Creates the step record, runs the command, persists the result,
136 /// and returns the output for use in subsequent steps.
137 ///
138 /// # Errors
139 ///
140 /// Returns [`EngineError`] if the command fails or the store errors.
141 ///
142 /// # Examples
143 ///
144 /// ```no_run
145 /// use ironflow_engine::context::WorkflowContext;
146 /// use ironflow_engine::config::ShellConfig;
147 /// use ironflow_engine::error::EngineError;
148 ///
149 /// # async fn example(ctx: &mut WorkflowContext) -> Result<(), EngineError> {
150 /// let files = ctx.shell("list", ShellConfig::new("ls -la")).await?;
151 /// println!("stdout: {}", files.output["stdout"]);
152 /// # Ok(())
153 /// # }
154 /// ```
155 pub async fn shell(
156 &mut self,
157 name: &str,
158 config: ShellConfig,
159 ) -> Result<StepOutput, EngineError> {
160 self.execute_step(name, StepKind::Shell, StepConfig::Shell(config))
161 .await
162 }
163
164 /// Execute an HTTP step.
165 ///
166 /// # Errors
167 ///
168 /// Returns [`EngineError`] if the request fails or the store errors.
169 ///
170 /// # Examples
171 ///
172 /// ```no_run
173 /// use ironflow_engine::context::WorkflowContext;
174 /// use ironflow_engine::config::HttpConfig;
175 /// use ironflow_engine::error::EngineError;
176 ///
177 /// # async fn example(ctx: &mut WorkflowContext) -> Result<(), EngineError> {
178 /// let resp = ctx.http("health", HttpConfig::get("https://api.example.com/health")).await?;
179 /// println!("status: {}", resp.output["status"]);
180 /// # Ok(())
181 /// # }
182 /// ```
183 pub async fn http(
184 &mut self,
185 name: &str,
186 config: HttpConfig,
187 ) -> Result<StepOutput, EngineError> {
188 self.execute_step(name, StepKind::Http, StepConfig::Http(config))
189 .await
190 }
191
192 /// Execute an agent step.
193 ///
194 /// # Errors
195 ///
196 /// Returns [`EngineError`] if the agent invocation fails or the store errors.
197 ///
198 /// # Examples
199 ///
200 /// ```no_run
201 /// use ironflow_engine::context::WorkflowContext;
202 /// use ironflow_engine::config::AgentStepConfig;
203 /// use ironflow_engine::error::EngineError;
204 ///
205 /// # async fn example(ctx: &mut WorkflowContext) -> Result<(), EngineError> {
206 /// let review = ctx.agent("review", AgentStepConfig::new("Review the code")).await?;
207 /// println!("review: {}", review.output["value"]);
208 /// # Ok(())
209 /// # }
210 /// ```
211 pub async fn agent(
212 &mut self,
213 name: &str,
214 config: AgentStepConfig,
215 ) -> Result<StepOutput, EngineError> {
216 self.execute_step(name, StepKind::Agent, StepConfig::Agent(config))
217 .await
218 }
219
220 /// Execute a sub-workflow step.
221 ///
222 /// Creates a child run for the named workflow handler, executes it with
223 /// its own steps and lifecycle, and returns a [`StepOutput`] containing
224 /// the child run ID and aggregated metrics.
225 ///
226 /// Requires the context to be created with
227 /// [`with_handler_resolver`](Self::with_handler_resolver).
228 ///
229 /// # Errors
230 ///
231 /// Returns [`EngineError::InvalidWorkflow`] if no handler is registered
232 /// with the given name, or if no handler resolver is available.
233 ///
234 /// # Examples
235 ///
236 /// ```no_run
237 /// use ironflow_engine::context::WorkflowContext;
238 /// use ironflow_engine::error::EngineError;
239 /// use serde_json::json;
240 ///
241 /// # async fn example(ctx: &mut WorkflowContext) -> Result<(), EngineError> {
242 /// // let result = ctx.workflow(&MySubWorkflow, json!({})).await?;
243 /// # Ok(())
244 /// # }
245 /// ```
246 pub async fn workflow(
247 &mut self,
248 handler: &dyn WorkflowHandler,
249 payload: Value,
250 ) -> Result<StepOutput, EngineError> {
251 let config = WorkflowStepConfig::new(handler.name(), payload);
252 let position = self.position;
253 self.position += 1;
254
255 let step = self
256 .store
257 .create_step(NewStep {
258 run_id: self.run_id,
259 name: config.workflow_name.clone(),
260 kind: StepKind::Workflow,
261 position,
262 input: Some(serde_json::to_value(&config)?),
263 })
264 .await?;
265
266 let now = Utc::now();
267 self.store
268 .update_step(
269 step.id,
270 StepUpdate {
271 status: Some(StepStatus::Running),
272 started_at: Some(now),
273 ..StepUpdate::default()
274 },
275 )
276 .await?;
277
278 match self.execute_child_workflow(&config).await {
279 Ok(output) => {
280 self.total_cost_usd += output.cost_usd;
281 self.total_duration_ms += output.duration_ms;
282
283 let completed_at = Utc::now();
284 self.store
285 .update_step(
286 step.id,
287 StepUpdate {
288 status: Some(StepStatus::Completed),
289 output: Some(output.output.clone()),
290 duration_ms: Some(output.duration_ms),
291 cost_usd: Some(output.cost_usd),
292 completed_at: Some(completed_at),
293 ..StepUpdate::default()
294 },
295 )
296 .await?;
297
298 info!(
299 run_id = %self.run_id,
300 child_workflow = %config.workflow_name,
301 duration_ms = output.duration_ms,
302 "workflow step completed"
303 );
304
305 Ok(output)
306 }
307 Err(err) => {
308 let completed_at = Utc::now();
309 if let Err(store_err) = self
310 .store
311 .update_step(
312 step.id,
313 StepUpdate {
314 status: Some(StepStatus::Failed),
315 error: Some(err.to_string()),
316 completed_at: Some(completed_at),
317 ..StepUpdate::default()
318 },
319 )
320 .await
321 {
322 error!(step_id = %step.id, error = %store_err, "failed to persist step failure");
323 }
324
325 Err(err)
326 }
327 }
328 }
329
330 /// Execute a child workflow and return aggregated output.
331 async fn execute_child_workflow(
332 &self,
333 config: &WorkflowStepConfig,
334 ) -> Result<StepOutput, EngineError> {
335 let resolver = self.handler_resolver.as_ref().ok_or_else(|| {
336 EngineError::InvalidWorkflow(
337 "sub-workflow requires a handler resolver (use Engine to execute)".to_string(),
338 )
339 })?;
340
341 let handler = resolver(&config.workflow_name).ok_or_else(|| {
342 EngineError::InvalidWorkflow(format!("no handler registered: {}", config.workflow_name))
343 })?;
344
345 let child_run = self
346 .store
347 .create_run(NewRun {
348 workflow_name: config.workflow_name.clone(),
349 trigger: TriggerKind::Workflow,
350 payload: config.payload.clone(),
351 max_retries: 0,
352 })
353 .await?;
354
355 let child_run_id = child_run.id;
356 info!(
357 parent_run_id = %self.run_id,
358 child_run_id = %child_run_id,
359 workflow = %config.workflow_name,
360 "child run created"
361 );
362
363 self.store
364 .update_run_status(child_run_id, RunStatus::Running)
365 .await?;
366
367 let run_start = Instant::now();
368 let mut child_ctx = WorkflowContext {
369 run_id: child_run_id,
370 store: self.store.clone(),
371 provider: self.provider.clone(),
372 handler_resolver: self.handler_resolver.clone(),
373 position: 0,
374 total_cost_usd: Decimal::ZERO,
375 total_duration_ms: 0,
376 };
377
378 let result = handler.execute(&mut child_ctx).await;
379 let total_duration = run_start.elapsed().as_millis() as u64;
380 let completed_at = Utc::now();
381
382 match result {
383 Ok(()) => {
384 self.store
385 .update_run(
386 child_run_id,
387 RunUpdate {
388 status: Some(RunStatus::Completed),
389 cost_usd: Some(child_ctx.total_cost_usd),
390 duration_ms: Some(total_duration),
391 completed_at: Some(completed_at),
392 ..RunUpdate::default()
393 },
394 )
395 .await?;
396
397 Ok(StepOutput {
398 output: serde_json::json!({
399 "run_id": child_run_id,
400 "workflow_name": config.workflow_name,
401 "status": RunStatus::Completed,
402 "cost_usd": child_ctx.total_cost_usd,
403 "duration_ms": total_duration,
404 }),
405 duration_ms: total_duration,
406 cost_usd: child_ctx.total_cost_usd,
407 input_tokens: None,
408 output_tokens: None,
409 })
410 }
411 Err(err) => {
412 if let Err(store_err) = self
413 .store
414 .update_run(
415 child_run_id,
416 RunUpdate {
417 status: Some(RunStatus::Failed),
418 error: Some(err.to_string()),
419 cost_usd: Some(child_ctx.total_cost_usd),
420 duration_ms: Some(total_duration),
421 completed_at: Some(completed_at),
422 ..RunUpdate::default()
423 },
424 )
425 .await
426 {
427 error!(
428 child_run_id = %child_run_id,
429 store_error = %store_err,
430 "failed to persist child run failure"
431 );
432 }
433
434 Err(err)
435 }
436 }
437 }
438
439 /// Internal: execute a step with full persistence lifecycle.
440 async fn execute_step(
441 &mut self,
442 name: &str,
443 kind: StepKind,
444 config: StepConfig,
445 ) -> Result<StepOutput, EngineError> {
446 let position = self.position;
447 self.position += 1;
448
449 // Create step record in Pending.
450 let step = self
451 .store
452 .create_step(NewStep {
453 run_id: self.run_id,
454 name: name.to_string(),
455 kind,
456 position,
457 input: Some(serde_json::to_value(&config)?),
458 })
459 .await?;
460
461 // Transition to Running.
462 let now = Utc::now();
463 self.store
464 .update_step(
465 step.id,
466 StepUpdate {
467 status: Some(StepStatus::Running),
468 started_at: Some(now),
469 ..StepUpdate::default()
470 },
471 )
472 .await?;
473
474 // Execute the operation.
475 match execute_step_config(&config, &self.provider).await {
476 Ok(output) => {
477 self.total_cost_usd += output.cost_usd;
478 self.total_duration_ms += output.duration_ms;
479
480 let completed_at = Utc::now();
481 self.store
482 .update_step(
483 step.id,
484 StepUpdate {
485 status: Some(StepStatus::Completed),
486 output: Some(output.output.clone()),
487 duration_ms: Some(output.duration_ms),
488 cost_usd: Some(output.cost_usd),
489 input_tokens: output.input_tokens,
490 output_tokens: output.output_tokens,
491 completed_at: Some(completed_at),
492 ..StepUpdate::default()
493 },
494 )
495 .await?;
496
497 info!(
498 run_id = %self.run_id,
499 step = %name,
500 duration_ms = output.duration_ms,
501 "step completed"
502 );
503
504 Ok(output)
505 }
506 Err(err) => {
507 let completed_at = Utc::now();
508 if let Err(store_err) = self
509 .store
510 .update_step(
511 step.id,
512 StepUpdate {
513 status: Some(StepStatus::Failed),
514 error: Some(err.to_string()),
515 completed_at: Some(completed_at),
516 ..StepUpdate::default()
517 },
518 )
519 .await
520 {
521 tracing::error!(step_id = %step.id, error = %store_err, "failed to persist step failure");
522 }
523
524 Err(err)
525 }
526 }
527 }
528
    /// Access the store directly (advanced usage).
    ///
    /// Returns a reference to the shared handle that this context uses for
    /// its own step and run records; clone the [`Arc`] to keep an owned
    /// handle.
    pub fn store(&self) -> &Arc<dyn RunStore> {
        &self.store
    }
533
534 /// Access the payload that triggered this run.
535 ///
536 /// Fetches the run from the store and returns its payload.
537 ///
538 /// # Errors
539 ///
540 /// Returns [`EngineError::Store`] if the run is not found.
541 pub async fn payload(&self) -> Result<Value, EngineError> {
542 let run = self
543 .store
544 .get_run(self.run_id)
545 .await?
546 .ok_or(EngineError::Store(
547 ironflow_store::error::StoreError::RunNotFound(self.run_id),
548 ))?;
549 Ok(run.payload)
550 }
551}
552
553impl std::fmt::Debug for WorkflowContext {
554 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
555 f.debug_struct("WorkflowContext")
556 .field("run_id", &self.run_id)
557 .field("position", &self.position)
558 .field("total_cost_usd", &self.total_cost_usd)
559 .finish_non_exhaustive()
560 }
561}