ironflow_store/store.rs
1//! The [`RunStore`] trait — async storage abstraction for runs and steps.
2//!
3//! Implement this trait to plug in any backing store. Built-in implementations:
4//!
5//! - [`InMemoryStore`](crate::memory::InMemoryStore) — development and testing.
6//! - `PostgresStore` — production (behind the `store-postgres` feature).
7
8use std::future::Future;
9use std::pin::Pin;
10
11use uuid::Uuid;
12
13use crate::api_key_store::ApiKeyStore;
14use crate::entities::{
15 NewRun, NewStep, NewStepDependency, Page, Run, RunFilter, RunStats, RunStatus, RunUpdate, Step,
16 StepDependency, StepUpdate,
17};
18use crate::error::StoreError;
19use crate::secret_store::SecretStore;
20use crate::user_store::UserStore;
21
22/// Boxed future for [`RunStore`] methods — ensures object safety for `dyn RunStore`.
23pub type StoreFuture<'a, T> = Pin<Box<dyn Future<Output = Result<T, StoreError>> + Send + 'a>>;
24
25/// Async storage abstraction for workflow runs and steps.
26///
27/// All methods return a [`StoreFuture`] (boxed future) to maintain object safety,
28/// allowing the store to be used as `Arc<dyn RunStore>`.
29///
30/// # Examples
31///
32/// ```no_run
33/// use ironflow_store::prelude::*;
34/// use serde_json::json;
35/// use uuid::Uuid;
36///
37/// # async fn example() -> Result<(), ironflow_store::error::StoreError> {
38/// let store = InMemoryStore::new();
39///
40/// let run = store.create_run(NewRun {
41/// workflow_name: "deploy".to_string(),
42/// trigger: TriggerKind::Manual,
43/// payload: json!({}),
44/// max_retries: 3,
45/// handler_version: None,
46/// }).await?;
47///
48/// let fetched = store.get_run(run.id).await?;
49/// assert!(fetched.is_some());
50/// # Ok(())
51/// # }
52/// ```
53pub trait RunStore: Send + Sync {
54 /// Create a new run in `Pending` status.
55 fn create_run(&self, req: NewRun) -> StoreFuture<'_, Run>;
56
57 /// Get a run by ID. Returns `None` if not found.
58 fn get_run(&self, id: Uuid) -> StoreFuture<'_, Option<Run>>;
59
60 /// List runs matching the given filter, with pagination.
61 ///
62 /// Results are ordered by `created_at` descending (newest first).
63 fn list_runs(&self, filter: RunFilter, page: u32, per_page: u32) -> StoreFuture<'_, Page<Run>>;
64
65 /// Update a run's status with FSM validation.
66 ///
67 /// # Errors
68 ///
69 /// Returns [`StoreError::InvalidTransition`] if the transition is not allowed.
70 /// Returns [`StoreError::RunNotFound`] if the run does not exist.
71 fn update_run_status(&self, id: Uuid, new_status: RunStatus) -> StoreFuture<'_, ()>;
72
73 /// Apply a partial update to a run.
74 ///
75 /// # Errors
76 ///
77 /// Returns [`StoreError::RunNotFound`] if the run does not exist.
78 fn update_run(&self, id: Uuid, update: RunUpdate) -> StoreFuture<'_, ()>;
79
80 /// Atomically pick the oldest pending run and transition it to `Running`.
81 ///
82 /// In PostgreSQL, this uses `SELECT FOR UPDATE SKIP LOCKED` for safe
83 /// multi-worker concurrency. The in-memory implementation uses a write lock.
84 ///
85 /// Returns `None` if no pending runs are available.
86 fn pick_next_pending(&self) -> StoreFuture<'_, Option<Run>>;
87
88 /// Create a new step for a run.
89 ///
90 /// # Errors
91 ///
92 /// Returns [`StoreError::RunNotFound`] if the parent run does not exist.
93 fn create_step(&self, step: NewStep) -> StoreFuture<'_, Step>;
94
95 /// Apply a partial update to a step after execution.
96 ///
97 /// # Errors
98 ///
99 /// Returns [`StoreError::StepNotFound`] if the step does not exist.
100 fn update_step(&self, id: Uuid, update: StepUpdate) -> StoreFuture<'_, ()>;
101
102 /// Get a single step by ID. Returns `None` if not found.
103 fn get_step(&self, id: Uuid) -> StoreFuture<'_, Option<Step>>;
104
105 /// List all steps for a run, ordered by position ascending.
106 fn list_steps(&self, run_id: Uuid) -> StoreFuture<'_, Vec<Step>>;
107
108 /// Get aggregated statistics across runs matching the filter.
109 ///
110 /// Returns counts of runs by terminal state, counts of active runs,
111 /// and totals for cost and duration. Computed efficiently by the store
112 /// implementation (single SQL query in PostgreSQL).
113 ///
114 /// Pass [`RunFilter::default()`] to get stats across all runs.
115 fn get_stats(&self, filter: RunFilter) -> StoreFuture<'_, RunStats>;
116
117 /// Create step dependency edges in batch.
118 ///
119 /// Each entry records that `step_id` depends on `depends_on`.
120 /// Duplicate edges are silently ignored.
121 ///
122 /// # Errors
123 ///
124 /// Returns [`StoreError`] if a referenced step does not exist.
125 fn create_step_dependencies(&self, deps: Vec<NewStepDependency>) -> StoreFuture<'_, ()>;
126
127 /// List all step dependencies for a given run.
128 ///
129 /// Returns every edge where either `step_id` or `depends_on` belongs
130 /// to the run. Ordered by `created_at` ascending.
131 fn list_step_dependencies(&self, run_id: Uuid) -> StoreFuture<'_, Vec<StepDependency>>;
132
133 /// Apply a partial update to a run and return the updated run.
134 ///
135 /// Combines [`update_run`](Self::update_run) and [`get_run`](Self::get_run) in
136 /// a single operation to avoid an extra round-trip. Store implementations
137 /// may override this for efficiency (e.g. reading within the same transaction).
138 ///
139 /// The default implementation calls `update_run` followed by `get_run`.
140 ///
141 /// # Errors
142 ///
143 /// Returns [`StoreError::RunNotFound`] if the run does not exist.
144 /// Returns [`StoreError::InvalidTransition`] if the status transition is not allowed.
145 fn update_run_returning(&self, id: Uuid, update: RunUpdate) -> StoreFuture<'_, Run> {
146 Box::pin(async move {
147 self.update_run(id, update).await?;
148 self.get_run(id).await?.ok_or(StoreError::RunNotFound(id))
149 })
150 }
151}
152
153/// Unified storage abstraction combining all store capabilities.
154///
155/// Implementors provide runs, steps, users, API keys, and secrets
156/// through a single type. Pick one backend (in-memory or PostgreSQL)
157/// and it handles everything.
158///
159/// Both [`InMemoryStore`](crate::memory::InMemoryStore) and
160/// [`PostgresStore`](crate::postgres::PostgresStore) implement this trait.
161///
162/// # Examples
163///
164/// ```no_run
165/// use std::sync::Arc;
166/// use ironflow_store::prelude::*;
167///
168/// # async fn example() -> Result<(), ironflow_store::error::StoreError> {
169/// let store: Arc<dyn Store> = Arc::new(InMemoryStore::new());
170///
171/// // All capabilities through one reference
172/// let _run = store.create_run(NewRun {
173/// workflow_name: "deploy".to_string(),
174/// trigger: TriggerKind::Manual,
175/// payload: serde_json::json!({}),
176/// max_retries: 3,
177/// handler_version: None,
178/// }).await?;
179/// let _users = store.count_users().await?;
180/// # Ok(())
181/// # }
182/// ```
183pub trait Store: RunStore + UserStore + ApiKeyStore + SecretStore {}
184
185impl<T: RunStore + UserStore + ApiKeyStore + SecretStore> Store for T {}