Skip to main content

ironflow_store/
store.rs

1//! The [`RunStore`] trait — async storage abstraction for runs and steps.
2//!
3//! Implement this trait to plug in any backing store. Built-in implementations:
4//!
5//! - [`InMemoryStore`](crate::memory::InMemoryStore) — development and testing.
6//! - `PostgresStore` — production (behind the `store-postgres` feature).
7
8use std::future::Future;
9use std::pin::Pin;
10
11use uuid::Uuid;
12
13use crate::entities::{
14    NewRun, NewStep, NewStepDependency, Page, Run, RunFilter, RunStats, RunStatus, RunUpdate, Step,
15    StepDependency, StepUpdate,
16};
17use crate::error::StoreError;
18
/// Boxed future for [`RunStore`] methods — ensures object safety for `dyn RunStore`.
///
/// The future resolves to `Result<T, StoreError>`: `T` is the success value of
/// the individual store operation and [`StoreError`] is the shared error type.
/// It is pinned and heap-allocated, bounded by `Send`, and may borrow data for
/// the lifetime `'a` (the trait methods use `'_`, i.e. the borrow of `&self`).
pub type StoreFuture<'a, T> = Pin<Box<dyn Future<Output = Result<T, StoreError>> + Send + 'a>>;
21
22/// Async storage abstraction for workflow runs and steps.
23///
24/// All methods return a [`StoreFuture`] (boxed future) to maintain object safety,
25/// allowing the store to be used as `Arc<dyn RunStore>`.
26///
27/// # Examples
28///
29/// ```no_run
30/// use ironflow_store::prelude::*;
31/// use serde_json::json;
32/// use uuid::Uuid;
33///
34/// # async fn example() -> Result<(), ironflow_store::error::StoreError> {
35/// let store = InMemoryStore::new();
36///
37/// let run = store.create_run(NewRun {
38///     workflow_name: "deploy".to_string(),
39///     trigger: TriggerKind::Manual,
40///     payload: json!({}),
41///     max_retries: 3,
42/// }).await?;
43///
44/// let fetched = store.get_run(run.id).await?;
45/// assert!(fetched.is_some());
46/// # Ok(())
47/// # }
48/// ```
49pub trait RunStore: Send + Sync {
50    /// Create a new run in `Pending` status.
51    fn create_run(&self, req: NewRun) -> StoreFuture<'_, Run>;
52
53    /// Get a run by ID. Returns `None` if not found.
54    fn get_run(&self, id: Uuid) -> StoreFuture<'_, Option<Run>>;
55
56    /// List runs matching the given filter, with pagination.
57    ///
58    /// Results are ordered by `created_at` descending (newest first).
59    fn list_runs(&self, filter: RunFilter, page: u32, per_page: u32) -> StoreFuture<'_, Page<Run>>;
60
61    /// Update a run's status with FSM validation.
62    ///
63    /// # Errors
64    ///
65    /// Returns [`StoreError::InvalidTransition`] if the transition is not allowed.
66    /// Returns [`StoreError::RunNotFound`] if the run does not exist.
67    fn update_run_status(&self, id: Uuid, new_status: RunStatus) -> StoreFuture<'_, ()>;
68
69    /// Apply a partial update to a run.
70    ///
71    /// # Errors
72    ///
73    /// Returns [`StoreError::RunNotFound`] if the run does not exist.
74    fn update_run(&self, id: Uuid, update: RunUpdate) -> StoreFuture<'_, ()>;
75
76    /// Atomically pick the oldest pending run and transition it to `Running`.
77    ///
78    /// In PostgreSQL, this uses `SELECT FOR UPDATE SKIP LOCKED` for safe
79    /// multi-worker concurrency. The in-memory implementation uses a write lock.
80    ///
81    /// Returns `None` if no pending runs are available.
82    fn pick_next_pending(&self) -> StoreFuture<'_, Option<Run>>;
83
84    /// Create a new step for a run.
85    ///
86    /// # Errors
87    ///
88    /// Returns [`StoreError::RunNotFound`] if the parent run does not exist.
89    fn create_step(&self, step: NewStep) -> StoreFuture<'_, Step>;
90
91    /// Apply a partial update to a step after execution.
92    ///
93    /// # Errors
94    ///
95    /// Returns [`StoreError::StepNotFound`] if the step does not exist.
96    fn update_step(&self, id: Uuid, update: StepUpdate) -> StoreFuture<'_, ()>;
97
98    /// List all steps for a run, ordered by position ascending.
99    fn list_steps(&self, run_id: Uuid) -> StoreFuture<'_, Vec<Step>>;
100
101    /// Get aggregated statistics across all runs.
102    ///
103    /// Returns counts of runs by terminal state, counts of active runs,
104    /// and totals for cost and duration. Computed efficiently by the store
105    /// implementation (single SQL query in PostgreSQL).
106    fn get_stats(&self) -> StoreFuture<'_, RunStats>;
107
108    /// Create step dependency edges in batch.
109    ///
110    /// Each entry records that `step_id` depends on `depends_on`.
111    /// Duplicate edges are silently ignored.
112    ///
113    /// # Errors
114    ///
115    /// Returns [`StoreError`] if a referenced step does not exist.
116    fn create_step_dependencies(&self, deps: Vec<NewStepDependency>) -> StoreFuture<'_, ()>;
117
118    /// List all step dependencies for a given run.
119    ///
120    /// Returns every edge where either `step_id` or `depends_on` belongs
121    /// to the run. Ordered by `created_at` ascending.
122    fn list_step_dependencies(&self, run_id: Uuid) -> StoreFuture<'_, Vec<StepDependency>>;
123
124    /// Apply a partial update to a run and return the updated run.
125    ///
126    /// Combines [`update_run`](Self::update_run) and [`get_run`](Self::get_run) in
127    /// a single operation to avoid an extra round-trip. Store implementations
128    /// may override this for efficiency (e.g. reading within the same transaction).
129    ///
130    /// The default implementation calls `update_run` followed by `get_run`.
131    ///
132    /// # Errors
133    ///
134    /// Returns [`StoreError::RunNotFound`] if the run does not exist.
135    /// Returns [`StoreError::InvalidTransition`] if the status transition is not allowed.
136    fn update_run_returning(&self, id: Uuid, update: RunUpdate) -> StoreFuture<'_, Run> {
137        Box::pin(async move {
138            self.update_run(id, update).await?;
139            self.get_run(id).await?.ok_or(StoreError::RunNotFound(id))
140        })
141    }
142}