Skip to main content

ironflow_store/
store.rs

1//! The [`RunStore`] trait — async storage abstraction for runs and steps.
2//!
3//! Implement this trait to plug in any backing store. Built-in implementations:
4//!
5//! - [`InMemoryStore`](crate::memory::InMemoryStore) — development and testing.
6//! - `PostgresStore` — production (behind the `store-postgres` feature).
7
8use std::future::Future;
9use std::pin::Pin;
10
11use uuid::Uuid;
12
13use crate::api_key_store::ApiKeyStore;
14use crate::audit_log_store::AuditLogStore;
15use crate::entities::{
16    NewRun, NewStep, NewStepDependency, Page, Run, RunFilter, RunStats, RunStatus, RunUpdate, Step,
17    StepDependency, StepUpdate,
18};
19use crate::error::StoreError;
20use crate::secret_store::SecretStore;
21use crate::user_store::UserStore;
22
23/// Boxed future for [`RunStore`] methods — ensures object safety for `dyn RunStore`.
24pub type StoreFuture<'a, T> = Pin<Box<dyn Future<Output = Result<T, StoreError>> + Send + 'a>>;
25
26/// Async storage abstraction for workflow runs and steps.
27///
28/// All methods return a [`StoreFuture`] (boxed future) to maintain object safety,
29/// allowing the store to be used as `Arc<dyn RunStore>`.
30///
31/// # Examples
32///
33/// ```no_run
34/// use std::collections::HashMap;
35/// use ironflow_store::prelude::*;
36/// use serde_json::json;
37/// use uuid::Uuid;
38///
39/// # async fn example() -> Result<(), ironflow_store::error::StoreError> {
40/// let store = InMemoryStore::new();
41///
42/// let run = store.create_run(NewRun {
43///     workflow_name: "deploy".to_string(),
44///     trigger: TriggerKind::Manual,
45///     payload: json!({}),
46///     max_retries: 3,
47///     handler_version: None,
48///     labels: HashMap::new(),
49///     scheduled_at: None,
50/// }).await?;
51///
52/// let fetched = store.get_run(run.id).await?;
53/// assert!(fetched.is_some());
54/// # Ok(())
55/// # }
56/// ```
57pub trait RunStore: Send + Sync {
58    /// Create a new run in `Pending` status.
59    fn create_run(&self, req: NewRun) -> StoreFuture<'_, Run>;
60
61    /// Get a run by ID. Returns `None` if not found.
62    fn get_run(&self, id: Uuid) -> StoreFuture<'_, Option<Run>>;
63
64    /// List runs matching the given filter, with pagination.
65    ///
66    /// Results are ordered by `created_at` descending (newest first).
67    fn list_runs(&self, filter: RunFilter, page: u32, per_page: u32) -> StoreFuture<'_, Page<Run>>;
68
69    /// Update a run's status with FSM validation.
70    ///
71    /// # Errors
72    ///
73    /// Returns [`StoreError::InvalidTransition`] if the transition is not allowed.
74    /// Returns [`StoreError::RunNotFound`] if the run does not exist.
75    fn update_run_status(&self, id: Uuid, new_status: RunStatus) -> StoreFuture<'_, ()>;
76
77    /// Apply a partial update to a run.
78    ///
79    /// # Errors
80    ///
81    /// Returns [`StoreError::RunNotFound`] if the run does not exist.
82    fn update_run(&self, id: Uuid, update: RunUpdate) -> StoreFuture<'_, ()>;
83
84    /// Atomically pick the oldest pending run and transition it to `Running`.
85    ///
86    /// In PostgreSQL, this uses `SELECT FOR UPDATE SKIP LOCKED` for safe
87    /// multi-worker concurrency. The in-memory implementation uses a write lock.
88    ///
89    /// Returns `None` if no pending runs are available.
90    fn pick_next_pending(&self) -> StoreFuture<'_, Option<Run>>;
91
92    /// Create a new step for a run.
93    ///
94    /// # Errors
95    ///
96    /// Returns [`StoreError::RunNotFound`] if the parent run does not exist.
97    fn create_step(&self, step: NewStep) -> StoreFuture<'_, Step>;
98
99    /// Apply a partial update to a step after execution.
100    ///
101    /// # Errors
102    ///
103    /// Returns [`StoreError::StepNotFound`] if the step does not exist.
104    fn update_step(&self, id: Uuid, update: StepUpdate) -> StoreFuture<'_, ()>;
105
106    /// Get a single step by ID. Returns `None` if not found.
107    fn get_step(&self, id: Uuid) -> StoreFuture<'_, Option<Step>>;
108
109    /// List all steps for a run, ordered by position ascending.
110    fn list_steps(&self, run_id: Uuid) -> StoreFuture<'_, Vec<Step>>;
111
112    /// Get aggregated statistics across runs matching the filter.
113    ///
114    /// Returns counts of runs by terminal state, counts of active runs,
115    /// and totals for cost and duration. Computed efficiently by the store
116    /// implementation (single SQL query in PostgreSQL).
117    ///
118    /// Pass [`RunFilter::default()`] to get stats across all runs.
119    fn get_stats(&self, filter: RunFilter) -> StoreFuture<'_, RunStats>;
120
121    /// Create step dependency edges in batch.
122    ///
123    /// Each entry records that `step_id` depends on `depends_on`.
124    /// Duplicate edges are silently ignored.
125    ///
126    /// # Errors
127    ///
128    /// Returns [`StoreError`] if a referenced step does not exist.
129    fn create_step_dependencies(&self, deps: Vec<NewStepDependency>) -> StoreFuture<'_, ()>;
130
131    /// List all step dependencies for a given run.
132    ///
133    /// Returns every edge where either `step_id` or `depends_on` belongs
134    /// to the run. Ordered by `created_at` ascending.
135    fn list_step_dependencies(&self, run_id: Uuid) -> StoreFuture<'_, Vec<StepDependency>>;
136
137    /// Apply a partial update to a run and return the updated run.
138    ///
139    /// Combines [`update_run`](Self::update_run) and [`get_run`](Self::get_run) in
140    /// a single operation to avoid an extra round-trip. Store implementations
141    /// may override this for efficiency (e.g. reading within the same transaction).
142    ///
143    /// The default implementation calls `update_run` followed by `get_run`.
144    ///
145    /// # Errors
146    ///
147    /// Returns [`StoreError::RunNotFound`] if the run does not exist.
148    /// Returns [`StoreError::InvalidTransition`] if the status transition is not allowed.
149    fn update_run_returning(&self, id: Uuid, update: RunUpdate) -> StoreFuture<'_, Run> {
150        Box::pin(async move {
151            self.update_run(id, update).await?;
152            self.get_run(id).await?.ok_or(StoreError::RunNotFound(id))
153        })
154    }
155}
156
157/// Unified storage abstraction combining all store capabilities.
158///
159/// Implementors provide runs, steps, users, API keys, and secrets
160/// through a single type. Pick one backend (in-memory or PostgreSQL)
161/// and it handles everything.
162///
163/// Both [`InMemoryStore`](crate::memory::InMemoryStore) and
164/// [`PostgresStore`](crate::postgres::PostgresStore) implement this trait.
165///
166/// # Examples
167///
168/// ```no_run
169/// use std::collections::HashMap;
170/// use std::sync::Arc;
171/// use ironflow_store::prelude::*;
172///
173/// # async fn example() -> Result<(), ironflow_store::error::StoreError> {
174/// let store: Arc<dyn Store> = Arc::new(InMemoryStore::new());
175///
176/// // All capabilities through one reference
177/// let _run = store.create_run(NewRun {
178///     workflow_name: "deploy".to_string(),
179///     trigger: TriggerKind::Manual,
180///     payload: serde_json::json!({}),
181///     max_retries: 3,
182///     handler_version: None,
183///     labels: HashMap::new(),
184///     scheduled_at: None,
185/// }).await?;
186/// let _users = store.count_users().await?;
187/// # Ok(())
188/// # }
189/// ```
190pub trait Store: RunStore + UserStore + ApiKeyStore + SecretStore + AuditLogStore {}
191
192impl<T: RunStore + UserStore + ApiKeyStore + SecretStore + AuditLogStore> Store for T {}