Skip to main content

ironflow_engine/
context.rs

1//! [`WorkflowContext`] — execution context for dynamic workflows.
2//!
3//! Provides step execution methods that automatically persist results to the
4//! store. Each call to [`shell`](WorkflowContext::shell),
5//! [`http`](WorkflowContext::http), [`agent`](WorkflowContext::agent), or
6//! [`workflow`](WorkflowContext::workflow) creates a step record, executes the
7//! operation, captures the output, and returns a [`StepOutput`] that the next
8//! step can reference.
9//!
10//! # Examples
11//!
12//! ```no_run
13//! use ironflow_engine::context::WorkflowContext;
14//! use ironflow_engine::config::{ShellConfig, AgentStepConfig};
15//! use ironflow_engine::error::EngineError;
16//!
17//! # async fn example(ctx: &mut WorkflowContext) -> Result<(), EngineError> {
18//! let build = ctx.shell("build", ShellConfig::new("cargo build")).await?;
19//! let review = ctx.agent("review", AgentStepConfig::new(
20//!     &format!("Build output:\n{}", build.output["stdout"])
21//! )).await?;
22//! # Ok(())
23//! # }
24//! ```
25
26use std::sync::Arc;
27use std::time::Instant;
28
29use chrono::Utc;
30use rust_decimal::Decimal;
31use serde_json::Value;
32use tracing::{error, info};
33use uuid::Uuid;
34
35use ironflow_core::provider::AgentProvider;
36use ironflow_store::models::{
37    NewRun, NewStep, RunStatus, RunUpdate, StepKind, StepStatus, StepUpdate, TriggerKind,
38};
39use ironflow_store::store::RunStore;
40
41use crate::config::{AgentStepConfig, HttpConfig, ShellConfig, StepConfig, WorkflowStepConfig};
42use crate::error::EngineError;
43use crate::executor::{StepOutput, execute_step_config};
44use crate::handler::WorkflowHandler;
45use crate::operation::Operation;
46
47/// Callback type for resolving workflow handlers by name.
48pub(crate) type HandlerResolver =
49    Arc<dyn Fn(&str) -> Option<Arc<dyn WorkflowHandler>> + Send + Sync>;
50
51/// Execution context for a single workflow run.
52///
53/// Tracks the current step position and provides convenience methods
54/// for executing operations with automatic persistence.
55///
56/// # Examples
57///
58/// ```no_run
59/// use ironflow_engine::context::WorkflowContext;
60/// use ironflow_engine::config::ShellConfig;
61/// use ironflow_engine::error::EngineError;
62///
63/// # async fn example(ctx: &mut WorkflowContext) -> Result<(), EngineError> {
64/// let result = ctx.shell("greet", ShellConfig::new("echo hello")).await?;
65/// assert!(result.output["stdout"].as_str().unwrap().contains("hello"));
66/// # Ok(())
67/// # }
68/// ```
69pub struct WorkflowContext {
70    run_id: Uuid,
71    store: Arc<dyn RunStore>,
72    provider: Arc<dyn AgentProvider>,
73    handler_resolver: Option<HandlerResolver>,
74    position: u32,
75    /// Accumulated cost across all steps in this run.
76    total_cost_usd: Decimal,
77    /// Accumulated duration across all steps.
78    total_duration_ms: u64,
79}
80
81impl WorkflowContext {
82    /// Create a new context for a run.
83    ///
84    /// Not typically called directly — the [`Engine`](crate::engine::Engine)
85    /// creates this when executing a [`WorkflowHandler`](crate::handler::WorkflowHandler).
86    pub fn new(run_id: Uuid, store: Arc<dyn RunStore>, provider: Arc<dyn AgentProvider>) -> Self {
87        Self {
88            run_id,
89            store,
90            provider,
91            handler_resolver: None,
92            position: 0,
93            total_cost_usd: Decimal::ZERO,
94            total_duration_ms: 0,
95        }
96    }
97
98    /// Create a new context with a handler resolver for sub-workflow support.
99    ///
100    /// The resolver is called when [`workflow`](Self::workflow) is invoked to
101    /// look up registered handlers by name.
102    pub(crate) fn with_handler_resolver(
103        run_id: Uuid,
104        store: Arc<dyn RunStore>,
105        provider: Arc<dyn AgentProvider>,
106        resolver: HandlerResolver,
107    ) -> Self {
108        Self {
109            run_id,
110            store,
111            provider,
112            handler_resolver: Some(resolver),
113            position: 0,
114            total_cost_usd: Decimal::ZERO,
115            total_duration_ms: 0,
116        }
117    }
118
119    /// The run ID this context is executing for.
120    pub fn run_id(&self) -> Uuid {
121        self.run_id
122    }
123
124    /// Accumulated cost across all executed steps so far.
125    pub fn total_cost_usd(&self) -> Decimal {
126        self.total_cost_usd
127    }
128
129    /// Accumulated duration across all executed steps so far.
130    pub fn total_duration_ms(&self) -> u64 {
131        self.total_duration_ms
132    }
133
134    /// Execute a shell step.
135    ///
136    /// Creates the step record, runs the command, persists the result,
137    /// and returns the output for use in subsequent steps.
138    ///
139    /// # Errors
140    ///
141    /// Returns [`EngineError`] if the command fails or the store errors.
142    ///
143    /// # Examples
144    ///
145    /// ```no_run
146    /// use ironflow_engine::context::WorkflowContext;
147    /// use ironflow_engine::config::ShellConfig;
148    /// use ironflow_engine::error::EngineError;
149    ///
150    /// # async fn example(ctx: &mut WorkflowContext) -> Result<(), EngineError> {
151    /// let files = ctx.shell("list", ShellConfig::new("ls -la")).await?;
152    /// println!("stdout: {}", files.output["stdout"]);
153    /// # Ok(())
154    /// # }
155    /// ```
156    pub async fn shell(
157        &mut self,
158        name: &str,
159        config: ShellConfig,
160    ) -> Result<StepOutput, EngineError> {
161        self.execute_step(name, StepKind::Shell, StepConfig::Shell(config))
162            .await
163    }
164
165    /// Execute an HTTP step.
166    ///
167    /// # Errors
168    ///
169    /// Returns [`EngineError`] if the request fails or the store errors.
170    ///
171    /// # Examples
172    ///
173    /// ```no_run
174    /// use ironflow_engine::context::WorkflowContext;
175    /// use ironflow_engine::config::HttpConfig;
176    /// use ironflow_engine::error::EngineError;
177    ///
178    /// # async fn example(ctx: &mut WorkflowContext) -> Result<(), EngineError> {
179    /// let resp = ctx.http("health", HttpConfig::get("https://api.example.com/health")).await?;
180    /// println!("status: {}", resp.output["status"]);
181    /// # Ok(())
182    /// # }
183    /// ```
184    pub async fn http(
185        &mut self,
186        name: &str,
187        config: HttpConfig,
188    ) -> Result<StepOutput, EngineError> {
189        self.execute_step(name, StepKind::Http, StepConfig::Http(config))
190            .await
191    }
192
193    /// Execute an agent step.
194    ///
195    /// # Errors
196    ///
197    /// Returns [`EngineError`] if the agent invocation fails or the store errors.
198    ///
199    /// # Examples
200    ///
201    /// ```no_run
202    /// use ironflow_engine::context::WorkflowContext;
203    /// use ironflow_engine::config::AgentStepConfig;
204    /// use ironflow_engine::error::EngineError;
205    ///
206    /// # async fn example(ctx: &mut WorkflowContext) -> Result<(), EngineError> {
207    /// let review = ctx.agent("review", AgentStepConfig::new("Review the code")).await?;
208    /// println!("review: {}", review.output["value"]);
209    /// # Ok(())
210    /// # }
211    /// ```
212    pub async fn agent(
213        &mut self,
214        name: &str,
215        config: AgentStepConfig,
216    ) -> Result<StepOutput, EngineError> {
217        self.execute_step(name, StepKind::Agent, StepConfig::Agent(config))
218            .await
219    }
220
221    /// Execute a custom operation step.
222    ///
223    /// Runs a user-defined [`Operation`] with full step lifecycle management:
224    /// creates the step record, transitions to Running, executes the operation,
225    /// persists the output and duration, and marks the step Completed or Failed.
226    ///
227    /// The operation's [`kind()`](Operation::kind) is stored as
228    /// [`StepKind::Custom`](ironflow_store::models::StepKind::Custom).
229    ///
230    /// # Errors
231    ///
232    /// Returns [`EngineError`] if the operation fails or the store errors.
233    ///
234    /// # Examples
235    ///
236    /// ```no_run
237    /// use ironflow_engine::context::WorkflowContext;
238    /// use ironflow_engine::operation::Operation;
239    /// use ironflow_engine::error::EngineError;
240    /// use serde_json::{Value, json};
241    /// use std::pin::Pin;
242    /// use std::future::Future;
243    ///
244    /// struct MyOp;
245    /// impl Operation for MyOp {
246    ///     fn kind(&self) -> &str { "my-service" }
247    ///     fn execute(&self) -> Pin<Box<dyn Future<Output = Result<Value, EngineError>> + Send + '_>> {
248    ///         Box::pin(async { Ok(json!({"ok": true})) })
249    ///     }
250    /// }
251    ///
252    /// # async fn example(ctx: &mut WorkflowContext) -> Result<(), EngineError> {
253    /// let result = ctx.operation("call-service", &MyOp).await?;
254    /// println!("output: {}", result.output);
255    /// # Ok(())
256    /// # }
257    /// ```
258    pub async fn operation(
259        &mut self,
260        name: &str,
261        op: &dyn Operation,
262    ) -> Result<StepOutput, EngineError> {
263        let kind = StepKind::Custom(op.kind().to_string());
264        let position = self.position;
265        self.position += 1;
266
267        let step = self
268            .store
269            .create_step(NewStep {
270                run_id: self.run_id,
271                name: name.to_string(),
272                kind,
273                position,
274                input: op.input(),
275            })
276            .await?;
277
278        let now = Utc::now();
279        self.store
280            .update_step(
281                step.id,
282                StepUpdate {
283                    status: Some(StepStatus::Running),
284                    started_at: Some(now),
285                    ..StepUpdate::default()
286                },
287            )
288            .await?;
289
290        let start = std::time::Instant::now();
291
292        match op.execute().await {
293            Ok(output_value) => {
294                let duration_ms = start.elapsed().as_millis() as u64;
295                self.total_duration_ms += duration_ms;
296
297                let completed_at = Utc::now();
298                self.store
299                    .update_step(
300                        step.id,
301                        StepUpdate {
302                            status: Some(StepStatus::Completed),
303                            output: Some(output_value.clone()),
304                            duration_ms: Some(duration_ms),
305                            cost_usd: Some(Decimal::ZERO),
306                            completed_at: Some(completed_at),
307                            ..StepUpdate::default()
308                        },
309                    )
310                    .await?;
311
312                info!(
313                    run_id = %self.run_id,
314                    step = %name,
315                    kind = op.kind(),
316                    duration_ms,
317                    "operation step completed"
318                );
319
320                Ok(StepOutput {
321                    output: output_value,
322                    duration_ms,
323                    cost_usd: Decimal::ZERO,
324                    input_tokens: None,
325                    output_tokens: None,
326                })
327            }
328            Err(err) => {
329                let completed_at = Utc::now();
330                if let Err(store_err) = self
331                    .store
332                    .update_step(
333                        step.id,
334                        StepUpdate {
335                            status: Some(StepStatus::Failed),
336                            error: Some(err.to_string()),
337                            completed_at: Some(completed_at),
338                            ..StepUpdate::default()
339                        },
340                    )
341                    .await
342                {
343                    error!(step_id = %step.id, error = %store_err, "failed to persist step failure");
344                }
345
346                Err(err)
347            }
348        }
349    }
350
351    /// Execute a sub-workflow step.
352    ///
353    /// Creates a child run for the named workflow handler, executes it with
354    /// its own steps and lifecycle, and returns a [`StepOutput`] containing
355    /// the child run ID and aggregated metrics.
356    ///
357    /// Requires the context to be created with
358    /// [`with_handler_resolver`](Self::with_handler_resolver).
359    ///
360    /// # Errors
361    ///
362    /// Returns [`EngineError::InvalidWorkflow`] if no handler is registered
363    /// with the given name, or if no handler resolver is available.
364    ///
365    /// # Examples
366    ///
367    /// ```no_run
368    /// use ironflow_engine::context::WorkflowContext;
369    /// use ironflow_engine::error::EngineError;
370    /// use serde_json::json;
371    ///
372    /// # async fn example(ctx: &mut WorkflowContext) -> Result<(), EngineError> {
373    /// // let result = ctx.workflow(&MySubWorkflow, json!({})).await?;
374    /// # Ok(())
375    /// # }
376    /// ```
377    pub async fn workflow(
378        &mut self,
379        handler: &dyn WorkflowHandler,
380        payload: Value,
381    ) -> Result<StepOutput, EngineError> {
382        let config = WorkflowStepConfig::new(handler.name(), payload);
383        let position = self.position;
384        self.position += 1;
385
386        let step = self
387            .store
388            .create_step(NewStep {
389                run_id: self.run_id,
390                name: config.workflow_name.clone(),
391                kind: StepKind::Workflow,
392                position,
393                input: Some(serde_json::to_value(&config)?),
394            })
395            .await?;
396
397        let now = Utc::now();
398        self.store
399            .update_step(
400                step.id,
401                StepUpdate {
402                    status: Some(StepStatus::Running),
403                    started_at: Some(now),
404                    ..StepUpdate::default()
405                },
406            )
407            .await?;
408
409        match self.execute_child_workflow(&config).await {
410            Ok(output) => {
411                self.total_cost_usd += output.cost_usd;
412                self.total_duration_ms += output.duration_ms;
413
414                let completed_at = Utc::now();
415                self.store
416                    .update_step(
417                        step.id,
418                        StepUpdate {
419                            status: Some(StepStatus::Completed),
420                            output: Some(output.output.clone()),
421                            duration_ms: Some(output.duration_ms),
422                            cost_usd: Some(output.cost_usd),
423                            completed_at: Some(completed_at),
424                            ..StepUpdate::default()
425                        },
426                    )
427                    .await?;
428
429                info!(
430                    run_id = %self.run_id,
431                    child_workflow = %config.workflow_name,
432                    duration_ms = output.duration_ms,
433                    "workflow step completed"
434                );
435
436                Ok(output)
437            }
438            Err(err) => {
439                let completed_at = Utc::now();
440                if let Err(store_err) = self
441                    .store
442                    .update_step(
443                        step.id,
444                        StepUpdate {
445                            status: Some(StepStatus::Failed),
446                            error: Some(err.to_string()),
447                            completed_at: Some(completed_at),
448                            ..StepUpdate::default()
449                        },
450                    )
451                    .await
452                {
453                    error!(step_id = %step.id, error = %store_err, "failed to persist step failure");
454                }
455
456                Err(err)
457            }
458        }
459    }
460
461    /// Execute a child workflow and return aggregated output.
462    async fn execute_child_workflow(
463        &self,
464        config: &WorkflowStepConfig,
465    ) -> Result<StepOutput, EngineError> {
466        let resolver = self.handler_resolver.as_ref().ok_or_else(|| {
467            EngineError::InvalidWorkflow(
468                "sub-workflow requires a handler resolver (use Engine to execute)".to_string(),
469            )
470        })?;
471
472        let handler = resolver(&config.workflow_name).ok_or_else(|| {
473            EngineError::InvalidWorkflow(format!("no handler registered: {}", config.workflow_name))
474        })?;
475
476        let child_run = self
477            .store
478            .create_run(NewRun {
479                workflow_name: config.workflow_name.clone(),
480                trigger: TriggerKind::Workflow,
481                payload: config.payload.clone(),
482                max_retries: 0,
483            })
484            .await?;
485
486        let child_run_id = child_run.id;
487        info!(
488            parent_run_id = %self.run_id,
489            child_run_id = %child_run_id,
490            workflow = %config.workflow_name,
491            "child run created"
492        );
493
494        self.store
495            .update_run_status(child_run_id, RunStatus::Running)
496            .await?;
497
498        let run_start = Instant::now();
499        let mut child_ctx = WorkflowContext {
500            run_id: child_run_id,
501            store: self.store.clone(),
502            provider: self.provider.clone(),
503            handler_resolver: self.handler_resolver.clone(),
504            position: 0,
505            total_cost_usd: Decimal::ZERO,
506            total_duration_ms: 0,
507        };
508
509        let result = handler.execute(&mut child_ctx).await;
510        let total_duration = run_start.elapsed().as_millis() as u64;
511        let completed_at = Utc::now();
512
513        match result {
514            Ok(()) => {
515                self.store
516                    .update_run(
517                        child_run_id,
518                        RunUpdate {
519                            status: Some(RunStatus::Completed),
520                            cost_usd: Some(child_ctx.total_cost_usd),
521                            duration_ms: Some(total_duration),
522                            completed_at: Some(completed_at),
523                            ..RunUpdate::default()
524                        },
525                    )
526                    .await?;
527
528                Ok(StepOutput {
529                    output: serde_json::json!({
530                        "run_id": child_run_id,
531                        "workflow_name": config.workflow_name,
532                        "status": RunStatus::Completed,
533                        "cost_usd": child_ctx.total_cost_usd,
534                        "duration_ms": total_duration,
535                    }),
536                    duration_ms: total_duration,
537                    cost_usd: child_ctx.total_cost_usd,
538                    input_tokens: None,
539                    output_tokens: None,
540                })
541            }
542            Err(err) => {
543                if let Err(store_err) = self
544                    .store
545                    .update_run(
546                        child_run_id,
547                        RunUpdate {
548                            status: Some(RunStatus::Failed),
549                            error: Some(err.to_string()),
550                            cost_usd: Some(child_ctx.total_cost_usd),
551                            duration_ms: Some(total_duration),
552                            completed_at: Some(completed_at),
553                            ..RunUpdate::default()
554                        },
555                    )
556                    .await
557                {
558                    error!(
559                        child_run_id = %child_run_id,
560                        store_error = %store_err,
561                        "failed to persist child run failure"
562                    );
563                }
564
565                Err(err)
566            }
567        }
568    }
569
570    /// Internal: execute a step with full persistence lifecycle.
571    async fn execute_step(
572        &mut self,
573        name: &str,
574        kind: StepKind,
575        config: StepConfig,
576    ) -> Result<StepOutput, EngineError> {
577        let position = self.position;
578        self.position += 1;
579
580        // Create step record in Pending.
581        let step = self
582            .store
583            .create_step(NewStep {
584                run_id: self.run_id,
585                name: name.to_string(),
586                kind,
587                position,
588                input: Some(serde_json::to_value(&config)?),
589            })
590            .await?;
591
592        // Transition to Running.
593        let now = Utc::now();
594        self.store
595            .update_step(
596                step.id,
597                StepUpdate {
598                    status: Some(StepStatus::Running),
599                    started_at: Some(now),
600                    ..StepUpdate::default()
601                },
602            )
603            .await?;
604
605        // Execute the operation.
606        match execute_step_config(&config, &self.provider).await {
607            Ok(output) => {
608                self.total_cost_usd += output.cost_usd;
609                self.total_duration_ms += output.duration_ms;
610
611                let completed_at = Utc::now();
612                self.store
613                    .update_step(
614                        step.id,
615                        StepUpdate {
616                            status: Some(StepStatus::Completed),
617                            output: Some(output.output.clone()),
618                            duration_ms: Some(output.duration_ms),
619                            cost_usd: Some(output.cost_usd),
620                            input_tokens: output.input_tokens,
621                            output_tokens: output.output_tokens,
622                            completed_at: Some(completed_at),
623                            ..StepUpdate::default()
624                        },
625                    )
626                    .await?;
627
628                info!(
629                    run_id = %self.run_id,
630                    step = %name,
631                    duration_ms = output.duration_ms,
632                    "step completed"
633                );
634
635                Ok(output)
636            }
637            Err(err) => {
638                let completed_at = Utc::now();
639                if let Err(store_err) = self
640                    .store
641                    .update_step(
642                        step.id,
643                        StepUpdate {
644                            status: Some(StepStatus::Failed),
645                            error: Some(err.to_string()),
646                            completed_at: Some(completed_at),
647                            ..StepUpdate::default()
648                        },
649                    )
650                    .await
651                {
652                    tracing::error!(step_id = %step.id, error = %store_err, "failed to persist step failure");
653                }
654
655                Err(err)
656            }
657        }
658    }
659
660    /// Access the store directly (advanced usage).
661    pub fn store(&self) -> &Arc<dyn RunStore> {
662        &self.store
663    }
664
665    /// Access the payload that triggered this run.
666    ///
667    /// Fetches the run from the store and returns its payload.
668    ///
669    /// # Errors
670    ///
671    /// Returns [`EngineError::Store`] if the run is not found.
672    pub async fn payload(&self) -> Result<Value, EngineError> {
673        let run = self
674            .store
675            .get_run(self.run_id)
676            .await?
677            .ok_or(EngineError::Store(
678                ironflow_store::error::StoreError::RunNotFound(self.run_id),
679            ))?;
680        Ok(run.payload)
681    }
682}
683
684impl std::fmt::Debug for WorkflowContext {
685    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
686        f.debug_struct("WorkflowContext")
687            .field("run_id", &self.run_id)
688            .field("position", &self.position)
689            .field("total_cost_usd", &self.total_cost_usd)
690            .finish_non_exhaustive()
691    }
692}