// ironflow_engine/context.rs

//! [`WorkflowContext`] — execution context for dynamic workflows.
//!
//! Provides step execution methods that automatically persist their results
//! to the store. Each call to [`shell`](WorkflowContext::shell),
//! [`http`](WorkflowContext::http), [`agent`](WorkflowContext::agent), or
//! [`workflow`](WorkflowContext::workflow) creates a step record, executes the
//! operation, captures the output, and returns a [`StepOutput`] that the next
//! step can reference.
//!
//! # Examples
//!
//! ```no_run
//! use ironflow_engine::context::WorkflowContext;
//! use ironflow_engine::config::{ShellConfig, AgentStepConfig};
//! use ironflow_engine::error::EngineError;
//!
//! # async fn example(ctx: &mut WorkflowContext) -> Result<(), EngineError> {
//! let build = ctx.shell("build", ShellConfig::new("cargo build")).await?;
//! let review = ctx.agent("review", AgentStepConfig::new(
//!     &format!("Build output:\n{}", build.output["stdout"])
//! )).await?;
//! # Ok(())
//! # }
//! ```
25
26use std::sync::Arc;
27use std::time::Instant;
28
29use chrono::Utc;
30use rust_decimal::Decimal;
31use serde_json::Value;
32use tracing::{error, info};
33use uuid::Uuid;
34
35use ironflow_core::provider::AgentProvider;
36use ironflow_store::models::{
37    NewRun, NewStep, RunStatus, RunUpdate, StepKind, StepStatus, StepUpdate, TriggerKind,
38};
39use ironflow_store::store::RunStore;
40
41use crate::config::{AgentStepConfig, HttpConfig, ShellConfig, StepConfig, WorkflowStepConfig};
42use crate::error::EngineError;
43use crate::executor::{StepOutput, execute_step_config};
44use crate::handler::WorkflowHandler;
45
46/// Callback type for resolving workflow handlers by name.
47pub(crate) type HandlerResolver =
48    Arc<dyn Fn(&str) -> Option<Arc<dyn WorkflowHandler>> + Send + Sync>;
49
50/// Execution context for a single workflow run.
51///
52/// Tracks the current step position and provides convenience methods
53/// for executing operations with automatic persistence.
54///
55/// # Examples
56///
57/// ```no_run
58/// use ironflow_engine::context::WorkflowContext;
59/// use ironflow_engine::config::ShellConfig;
60/// use ironflow_engine::error::EngineError;
61///
62/// # async fn example(ctx: &mut WorkflowContext) -> Result<(), EngineError> {
63/// let result = ctx.shell("greet", ShellConfig::new("echo hello")).await?;
64/// assert!(result.output["stdout"].as_str().unwrap().contains("hello"));
65/// # Ok(())
66/// # }
67/// ```
68pub struct WorkflowContext {
69    run_id: Uuid,
70    store: Arc<dyn RunStore>,
71    provider: Arc<dyn AgentProvider>,
72    handler_resolver: Option<HandlerResolver>,
73    position: u32,
74    /// Accumulated cost across all steps in this run.
75    total_cost_usd: Decimal,
76    /// Accumulated duration across all steps.
77    total_duration_ms: u64,
78}
79
80impl WorkflowContext {
81    /// Create a new context for a run.
82    ///
83    /// Not typically called directly — the [`Engine`](crate::engine::Engine)
84    /// creates this when executing a [`WorkflowHandler`](crate::handler::WorkflowHandler).
85    pub fn new(run_id: Uuid, store: Arc<dyn RunStore>, provider: Arc<dyn AgentProvider>) -> Self {
86        Self {
87            run_id,
88            store,
89            provider,
90            handler_resolver: None,
91            position: 0,
92            total_cost_usd: Decimal::ZERO,
93            total_duration_ms: 0,
94        }
95    }
96
97    /// Create a new context with a handler resolver for sub-workflow support.
98    ///
99    /// The resolver is called when [`workflow`](Self::workflow) is invoked to
100    /// look up registered handlers by name.
101    pub(crate) fn with_handler_resolver(
102        run_id: Uuid,
103        store: Arc<dyn RunStore>,
104        provider: Arc<dyn AgentProvider>,
105        resolver: HandlerResolver,
106    ) -> Self {
107        Self {
108            run_id,
109            store,
110            provider,
111            handler_resolver: Some(resolver),
112            position: 0,
113            total_cost_usd: Decimal::ZERO,
114            total_duration_ms: 0,
115        }
116    }
117
118    /// The run ID this context is executing for.
119    pub fn run_id(&self) -> Uuid {
120        self.run_id
121    }
122
123    /// Accumulated cost across all executed steps so far.
124    pub fn total_cost_usd(&self) -> Decimal {
125        self.total_cost_usd
126    }
127
128    /// Accumulated duration across all executed steps so far.
129    pub fn total_duration_ms(&self) -> u64 {
130        self.total_duration_ms
131    }
132
133    /// Execute a shell step.
134    ///
135    /// Creates the step record, runs the command, persists the result,
136    /// and returns the output for use in subsequent steps.
137    ///
138    /// # Errors
139    ///
140    /// Returns [`EngineError`] if the command fails or the store errors.
141    ///
142    /// # Examples
143    ///
144    /// ```no_run
145    /// use ironflow_engine::context::WorkflowContext;
146    /// use ironflow_engine::config::ShellConfig;
147    /// use ironflow_engine::error::EngineError;
148    ///
149    /// # async fn example(ctx: &mut WorkflowContext) -> Result<(), EngineError> {
150    /// let files = ctx.shell("list", ShellConfig::new("ls -la")).await?;
151    /// println!("stdout: {}", files.output["stdout"]);
152    /// # Ok(())
153    /// # }
154    /// ```
155    pub async fn shell(
156        &mut self,
157        name: &str,
158        config: ShellConfig,
159    ) -> Result<StepOutput, EngineError> {
160        self.execute_step(name, StepKind::Shell, StepConfig::Shell(config))
161            .await
162    }
163
164    /// Execute an HTTP step.
165    ///
166    /// # Errors
167    ///
168    /// Returns [`EngineError`] if the request fails or the store errors.
169    ///
170    /// # Examples
171    ///
172    /// ```no_run
173    /// use ironflow_engine::context::WorkflowContext;
174    /// use ironflow_engine::config::HttpConfig;
175    /// use ironflow_engine::error::EngineError;
176    ///
177    /// # async fn example(ctx: &mut WorkflowContext) -> Result<(), EngineError> {
178    /// let resp = ctx.http("health", HttpConfig::get("https://api.example.com/health")).await?;
179    /// println!("status: {}", resp.output["status"]);
180    /// # Ok(())
181    /// # }
182    /// ```
183    pub async fn http(
184        &mut self,
185        name: &str,
186        config: HttpConfig,
187    ) -> Result<StepOutput, EngineError> {
188        self.execute_step(name, StepKind::Http, StepConfig::Http(config))
189            .await
190    }
191
192    /// Execute an agent step.
193    ///
194    /// # Errors
195    ///
196    /// Returns [`EngineError`] if the agent invocation fails or the store errors.
197    ///
198    /// # Examples
199    ///
200    /// ```no_run
201    /// use ironflow_engine::context::WorkflowContext;
202    /// use ironflow_engine::config::AgentStepConfig;
203    /// use ironflow_engine::error::EngineError;
204    ///
205    /// # async fn example(ctx: &mut WorkflowContext) -> Result<(), EngineError> {
206    /// let review = ctx.agent("review", AgentStepConfig::new("Review the code")).await?;
207    /// println!("review: {}", review.output["value"]);
208    /// # Ok(())
209    /// # }
210    /// ```
211    pub async fn agent(
212        &mut self,
213        name: &str,
214        config: AgentStepConfig,
215    ) -> Result<StepOutput, EngineError> {
216        self.execute_step(name, StepKind::Agent, StepConfig::Agent(config))
217            .await
218    }
219
220    /// Execute a sub-workflow step.
221    ///
222    /// Creates a child run for the named workflow handler, executes it with
223    /// its own steps and lifecycle, and returns a [`StepOutput`] containing
224    /// the child run ID and aggregated metrics.
225    ///
226    /// Requires the context to be created with
227    /// [`with_handler_resolver`](Self::with_handler_resolver).
228    ///
229    /// # Errors
230    ///
231    /// Returns [`EngineError::InvalidWorkflow`] if no handler is registered
232    /// with the given name, or if no handler resolver is available.
233    ///
234    /// # Examples
235    ///
236    /// ```no_run
237    /// use ironflow_engine::context::WorkflowContext;
238    /// use ironflow_engine::error::EngineError;
239    /// use serde_json::json;
240    ///
241    /// # async fn example(ctx: &mut WorkflowContext) -> Result<(), EngineError> {
242    /// // let result = ctx.workflow(&MySubWorkflow, json!({})).await?;
243    /// # Ok(())
244    /// # }
245    /// ```
246    pub async fn workflow(
247        &mut self,
248        handler: &dyn WorkflowHandler,
249        payload: Value,
250    ) -> Result<StepOutput, EngineError> {
251        let config = WorkflowStepConfig::new(handler.name(), payload);
252        let position = self.position;
253        self.position += 1;
254
255        let step = self
256            .store
257            .create_step(NewStep {
258                run_id: self.run_id,
259                name: config.workflow_name.clone(),
260                kind: StepKind::Workflow,
261                position,
262                input: Some(serde_json::to_value(&config)?),
263            })
264            .await?;
265
266        let now = Utc::now();
267        self.store
268            .update_step(
269                step.id,
270                StepUpdate {
271                    status: Some(StepStatus::Running),
272                    started_at: Some(now),
273                    ..StepUpdate::default()
274                },
275            )
276            .await?;
277
278        match self.execute_child_workflow(&config).await {
279            Ok(output) => {
280                self.total_cost_usd += output.cost_usd;
281                self.total_duration_ms += output.duration_ms;
282
283                let completed_at = Utc::now();
284                self.store
285                    .update_step(
286                        step.id,
287                        StepUpdate {
288                            status: Some(StepStatus::Completed),
289                            output: Some(output.output.clone()),
290                            duration_ms: Some(output.duration_ms),
291                            cost_usd: Some(output.cost_usd),
292                            completed_at: Some(completed_at),
293                            ..StepUpdate::default()
294                        },
295                    )
296                    .await?;
297
298                info!(
299                    run_id = %self.run_id,
300                    child_workflow = %config.workflow_name,
301                    duration_ms = output.duration_ms,
302                    "workflow step completed"
303                );
304
305                Ok(output)
306            }
307            Err(err) => {
308                let completed_at = Utc::now();
309                if let Err(store_err) = self
310                    .store
311                    .update_step(
312                        step.id,
313                        StepUpdate {
314                            status: Some(StepStatus::Failed),
315                            error: Some(err.to_string()),
316                            completed_at: Some(completed_at),
317                            ..StepUpdate::default()
318                        },
319                    )
320                    .await
321                {
322                    error!(step_id = %step.id, error = %store_err, "failed to persist step failure");
323                }
324
325                Err(err)
326            }
327        }
328    }
329
330    /// Execute a child workflow and return aggregated output.
331    async fn execute_child_workflow(
332        &self,
333        config: &WorkflowStepConfig,
334    ) -> Result<StepOutput, EngineError> {
335        let resolver = self.handler_resolver.as_ref().ok_or_else(|| {
336            EngineError::InvalidWorkflow(
337                "sub-workflow requires a handler resolver (use Engine to execute)".to_string(),
338            )
339        })?;
340
341        let handler = resolver(&config.workflow_name).ok_or_else(|| {
342            EngineError::InvalidWorkflow(format!("no handler registered: {}", config.workflow_name))
343        })?;
344
345        let child_run = self
346            .store
347            .create_run(NewRun {
348                workflow_name: config.workflow_name.clone(),
349                trigger: TriggerKind::Workflow,
350                payload: config.payload.clone(),
351                max_retries: 0,
352            })
353            .await?;
354
355        let child_run_id = child_run.id;
356        info!(
357            parent_run_id = %self.run_id,
358            child_run_id = %child_run_id,
359            workflow = %config.workflow_name,
360            "child run created"
361        );
362
363        self.store
364            .update_run_status(child_run_id, RunStatus::Running)
365            .await?;
366
367        let run_start = Instant::now();
368        let mut child_ctx = WorkflowContext {
369            run_id: child_run_id,
370            store: self.store.clone(),
371            provider: self.provider.clone(),
372            handler_resolver: self.handler_resolver.clone(),
373            position: 0,
374            total_cost_usd: Decimal::ZERO,
375            total_duration_ms: 0,
376        };
377
378        let result = handler.execute(&mut child_ctx).await;
379        let total_duration = run_start.elapsed().as_millis() as u64;
380        let completed_at = Utc::now();
381
382        match result {
383            Ok(()) => {
384                self.store
385                    .update_run(
386                        child_run_id,
387                        RunUpdate {
388                            status: Some(RunStatus::Completed),
389                            cost_usd: Some(child_ctx.total_cost_usd),
390                            duration_ms: Some(total_duration),
391                            completed_at: Some(completed_at),
392                            ..RunUpdate::default()
393                        },
394                    )
395                    .await?;
396
397                Ok(StepOutput {
398                    output: serde_json::json!({
399                        "run_id": child_run_id,
400                        "workflow_name": config.workflow_name,
401                        "status": RunStatus::Completed,
402                        "cost_usd": child_ctx.total_cost_usd,
403                        "duration_ms": total_duration,
404                    }),
405                    duration_ms: total_duration,
406                    cost_usd: child_ctx.total_cost_usd,
407                    input_tokens: None,
408                    output_tokens: None,
409                })
410            }
411            Err(err) => {
412                if let Err(store_err) = self
413                    .store
414                    .update_run(
415                        child_run_id,
416                        RunUpdate {
417                            status: Some(RunStatus::Failed),
418                            error: Some(err.to_string()),
419                            cost_usd: Some(child_ctx.total_cost_usd),
420                            duration_ms: Some(total_duration),
421                            completed_at: Some(completed_at),
422                            ..RunUpdate::default()
423                        },
424                    )
425                    .await
426                {
427                    error!(
428                        child_run_id = %child_run_id,
429                        store_error = %store_err,
430                        "failed to persist child run failure"
431                    );
432                }
433
434                Err(err)
435            }
436        }
437    }
438
439    /// Internal: execute a step with full persistence lifecycle.
440    async fn execute_step(
441        &mut self,
442        name: &str,
443        kind: StepKind,
444        config: StepConfig,
445    ) -> Result<StepOutput, EngineError> {
446        let position = self.position;
447        self.position += 1;
448
449        // Create step record in Pending.
450        let step = self
451            .store
452            .create_step(NewStep {
453                run_id: self.run_id,
454                name: name.to_string(),
455                kind,
456                position,
457                input: Some(serde_json::to_value(&config)?),
458            })
459            .await?;
460
461        // Transition to Running.
462        let now = Utc::now();
463        self.store
464            .update_step(
465                step.id,
466                StepUpdate {
467                    status: Some(StepStatus::Running),
468                    started_at: Some(now),
469                    ..StepUpdate::default()
470                },
471            )
472            .await?;
473
474        // Execute the operation.
475        match execute_step_config(&config, &self.provider).await {
476            Ok(output) => {
477                self.total_cost_usd += output.cost_usd;
478                self.total_duration_ms += output.duration_ms;
479
480                let completed_at = Utc::now();
481                self.store
482                    .update_step(
483                        step.id,
484                        StepUpdate {
485                            status: Some(StepStatus::Completed),
486                            output: Some(output.output.clone()),
487                            duration_ms: Some(output.duration_ms),
488                            cost_usd: Some(output.cost_usd),
489                            input_tokens: output.input_tokens,
490                            output_tokens: output.output_tokens,
491                            completed_at: Some(completed_at),
492                            ..StepUpdate::default()
493                        },
494                    )
495                    .await?;
496
497                info!(
498                    run_id = %self.run_id,
499                    step = %name,
500                    duration_ms = output.duration_ms,
501                    "step completed"
502                );
503
504                Ok(output)
505            }
506            Err(err) => {
507                let completed_at = Utc::now();
508                if let Err(store_err) = self
509                    .store
510                    .update_step(
511                        step.id,
512                        StepUpdate {
513                            status: Some(StepStatus::Failed),
514                            error: Some(err.to_string()),
515                            completed_at: Some(completed_at),
516                            ..StepUpdate::default()
517                        },
518                    )
519                    .await
520                {
521                    tracing::error!(step_id = %step.id, error = %store_err, "failed to persist step failure");
522                }
523
524                Err(err)
525            }
526        }
527    }
528
529    /// Access the store directly (advanced usage).
530    pub fn store(&self) -> &Arc<dyn RunStore> {
531        &self.store
532    }
533
534    /// Access the payload that triggered this run.
535    ///
536    /// Fetches the run from the store and returns its payload.
537    ///
538    /// # Errors
539    ///
540    /// Returns [`EngineError::Store`] if the run is not found.
541    pub async fn payload(&self) -> Result<Value, EngineError> {
542        let run = self
543            .store
544            .get_run(self.run_id)
545            .await?
546            .ok_or(EngineError::Store(
547                ironflow_store::error::StoreError::RunNotFound(self.run_id),
548            ))?;
549        Ok(run.payload)
550    }
551}
552
553impl std::fmt::Debug for WorkflowContext {
554    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
555        f.debug_struct("WorkflowContext")
556            .field("run_id", &self.run_id)
557            .field("position", &self.position)
558            .field("total_cost_usd", &self.total_cost_usd)
559            .finish_non_exhaustive()
560    }
561}