Skip to main content

assay_workflow/store/
mod.rs

1pub mod postgres;
2pub mod sqlite;
3
4use std::future::Future;
5
6use crate::types::*;
7
8/// Core storage trait for the workflow engine.
9///
10/// All database access goes through this trait. Methods that operate on
11/// namespace-scoped data take a `namespace` parameter. The "main"
12/// namespace is always available.
13///
14/// All methods return `Send` futures so they can be used from `tokio::spawn`.
15pub trait WorkflowStore: Send + Sync + 'static {
16    // ── Namespaces ─────────────────────────────────────────
17
18    fn create_namespace(
19        &self,
20        name: &str,
21    ) -> impl Future<Output = anyhow::Result<()>> + Send;
22
23    fn list_namespaces(
24        &self,
25    ) -> impl Future<Output = anyhow::Result<Vec<NamespaceRecord>>> + Send;
26
27    fn delete_namespace(
28        &self,
29        name: &str,
30    ) -> impl Future<Output = anyhow::Result<bool>> + Send;
31
32    fn get_namespace_stats(
33        &self,
34        namespace: &str,
35    ) -> impl Future<Output = anyhow::Result<NamespaceStats>> + Send;
36
37    // ── Workflows ──────────────────────────────────────────
38
39    fn create_workflow(
40        &self,
41        workflow: &WorkflowRecord,
42    ) -> impl Future<Output = anyhow::Result<()>> + Send;
43
44    fn get_workflow(
45        &self,
46        id: &str,
47    ) -> impl Future<Output = anyhow::Result<Option<WorkflowRecord>>> + Send;
48
49    fn list_workflows(
50        &self,
51        namespace: &str,
52        status: Option<WorkflowStatus>,
53        workflow_type: Option<&str>,
54        limit: i64,
55        offset: i64,
56    ) -> impl Future<Output = anyhow::Result<Vec<WorkflowRecord>>> + Send;
57
58    fn update_workflow_status(
59        &self,
60        id: &str,
61        status: WorkflowStatus,
62        result: Option<&str>,
63        error: Option<&str>,
64    ) -> impl Future<Output = anyhow::Result<()>> + Send;
65
66    fn claim_workflow(
67        &self,
68        id: &str,
69        worker_id: &str,
70    ) -> impl Future<Output = anyhow::Result<bool>> + Send;
71
72    // ── Workflow-task dispatch (Phase 9) ────────────────────
73
74    /// Mark a workflow as having new events that need a worker to replay it.
75    /// Idempotent — calling repeatedly is fine. Cleared by `claim_workflow_task`.
76    fn mark_workflow_dispatchable(
77        &self,
78        workflow_id: &str,
79    ) -> impl Future<Output = anyhow::Result<()>> + Send;
80
81    /// Atomically claim the oldest dispatchable workflow on a queue. Sets
82    /// `dispatch_claimed_by` and `dispatch_last_heartbeat`, clears
83    /// `needs_dispatch`. Returns the workflow record or None if nothing
84    /// is available.
85    fn claim_workflow_task(
86        &self,
87        task_queue: &str,
88        worker_id: &str,
89    ) -> impl Future<Output = anyhow::Result<Option<WorkflowRecord>>> + Send;
90
91    /// Release a workflow task's dispatch lease (called when the worker
92    /// submits its commands batch). Only succeeds if `dispatch_claimed_by`
93    /// matches the calling worker.
94    fn release_workflow_task(
95        &self,
96        workflow_id: &str,
97        worker_id: &str,
98    ) -> impl Future<Output = anyhow::Result<()>> + Send;
99
100    /// Forcibly release dispatch leases whose worker hasn't heartbeat'd
101    /// within `timeout_secs`. Used by the engine's background poller to
102    /// recover from worker crashes. Returns how many leases were released
103    /// (each becomes claimable again, with `needs_dispatch=true`).
104    fn release_stale_dispatch_leases(
105        &self,
106        now: f64,
107        timeout_secs: f64,
108    ) -> impl Future<Output = anyhow::Result<u64>> + Send;
109
110    // ── Events ─────────────────────────────────────────────
111
112    fn append_event(
113        &self,
114        event: &WorkflowEvent,
115    ) -> impl Future<Output = anyhow::Result<i64>> + Send;
116
117    fn list_events(
118        &self,
119        workflow_id: &str,
120    ) -> impl Future<Output = anyhow::Result<Vec<WorkflowEvent>>> + Send;
121
122    fn get_event_count(
123        &self,
124        workflow_id: &str,
125    ) -> impl Future<Output = anyhow::Result<i64>> + Send;
126
127    // ── Activities ──────────────────────────────────────────
128
129    fn create_activity(
130        &self,
131        activity: &WorkflowActivity,
132    ) -> impl Future<Output = anyhow::Result<i64>> + Send;
133
134    /// Look up an activity by its primary key.
135    fn get_activity(
136        &self,
137        id: i64,
138    ) -> impl Future<Output = anyhow::Result<Option<WorkflowActivity>>> + Send;
139
140    /// Look up an activity by its workflow-relative sequence number.
141    /// Used for idempotent scheduling: the engine checks if (workflow_id, seq)
142    /// already exists before creating a new row.
143    fn get_activity_by_workflow_seq(
144        &self,
145        workflow_id: &str,
146        seq: i32,
147    ) -> impl Future<Output = anyhow::Result<Option<WorkflowActivity>>> + Send;
148
149    fn claim_activity(
150        &self,
151        task_queue: &str,
152        worker_id: &str,
153    ) -> impl Future<Output = anyhow::Result<Option<WorkflowActivity>>> + Send;
154
155    /// Re-queue an activity for retry: clears the running state
156    /// (status→PENDING, claimed_by/started_at cleared), bumps `attempt`,
157    /// and sets `scheduled_at = now + backoff` so the next claim_activity
158    /// won't pick it up before the backoff elapses.
159    fn requeue_activity_for_retry(
160        &self,
161        id: i64,
162        next_attempt: i32,
163        next_scheduled_at: f64,
164    ) -> impl Future<Output = anyhow::Result<()>> + Send;
165
166    fn complete_activity(
167        &self,
168        id: i64,
169        result: Option<&str>,
170        error: Option<&str>,
171        failed: bool,
172    ) -> impl Future<Output = anyhow::Result<()>> + Send;
173
174    fn heartbeat_activity(
175        &self,
176        id: i64,
177        details: Option<&str>,
178    ) -> impl Future<Output = anyhow::Result<()>> + Send;
179
180    fn get_timed_out_activities(
181        &self,
182        now: f64,
183    ) -> impl Future<Output = anyhow::Result<Vec<WorkflowActivity>>> + Send;
184
185    // ── Timers ──────────────────────────────────────────────
186
187    /// Mark all PENDING activities of a workflow as CANCELLED so workers
188    /// that haven't claimed them yet won't pick them up. Returns the
189    /// number of rows affected. Does NOT touch RUNNING activities — those
190    /// will see the cancellation when they next heartbeat or complete.
191    fn cancel_pending_activities(
192        &self,
193        workflow_id: &str,
194    ) -> impl Future<Output = anyhow::Result<u64>> + Send;
195
196    /// Mark all unfired timers of a workflow as fired without firing
197    /// (effectively removing them from the timer poller). Returns the
198    /// number of rows affected.
199    fn cancel_pending_timers(
200        &self,
201        workflow_id: &str,
202    ) -> impl Future<Output = anyhow::Result<u64>> + Send;
203
204    fn create_timer(
205        &self,
206        timer: &WorkflowTimer,
207    ) -> impl Future<Output = anyhow::Result<i64>> + Send;
208
209    /// Look up an existing timer by its workflow-relative seq. Used by the
210    /// engine for idempotent ScheduleTimer (deterministic replay can call
211    /// schedule_timer for the same seq more than once on retries).
212    fn get_timer_by_workflow_seq(
213        &self,
214        workflow_id: &str,
215        seq: i32,
216    ) -> impl Future<Output = anyhow::Result<Option<WorkflowTimer>>> + Send;
217
218    fn fire_due_timers(
219        &self,
220        now: f64,
221    ) -> impl Future<Output = anyhow::Result<Vec<WorkflowTimer>>> + Send;
222
223    // ── Signals ─────────────────────────────────────────────
224
225    fn send_signal(
226        &self,
227        signal: &WorkflowSignal,
228    ) -> impl Future<Output = anyhow::Result<i64>> + Send;
229
230    fn consume_signals(
231        &self,
232        workflow_id: &str,
233        name: &str,
234    ) -> impl Future<Output = anyhow::Result<Vec<WorkflowSignal>>> + Send;
235
236    // ── Schedules ───────────────────────────────────────────
237
238    fn create_schedule(
239        &self,
240        schedule: &WorkflowSchedule,
241    ) -> impl Future<Output = anyhow::Result<()>> + Send;
242
243    fn get_schedule(
244        &self,
245        namespace: &str,
246        name: &str,
247    ) -> impl Future<Output = anyhow::Result<Option<WorkflowSchedule>>> + Send;
248
249    fn list_schedules(
250        &self,
251        namespace: &str,
252    ) -> impl Future<Output = anyhow::Result<Vec<WorkflowSchedule>>> + Send;
253
254    fn update_schedule_last_run(
255        &self,
256        namespace: &str,
257        name: &str,
258        last_run_at: f64,
259        next_run_at: f64,
260        workflow_id: &str,
261    ) -> impl Future<Output = anyhow::Result<()>> + Send;
262
263    fn delete_schedule(
264        &self,
265        namespace: &str,
266        name: &str,
267    ) -> impl Future<Output = anyhow::Result<bool>> + Send;
268
269    // ── Workers ─────────────────────────────────────────────
270
271    fn register_worker(
272        &self,
273        worker: &WorkflowWorker,
274    ) -> impl Future<Output = anyhow::Result<()>> + Send;
275
276    fn heartbeat_worker(
277        &self,
278        id: &str,
279        now: f64,
280    ) -> impl Future<Output = anyhow::Result<()>> + Send;
281
282    fn list_workers(
283        &self,
284        namespace: &str,
285    ) -> impl Future<Output = anyhow::Result<Vec<WorkflowWorker>>> + Send;
286
287    fn remove_dead_workers(
288        &self,
289        cutoff: f64,
290    ) -> impl Future<Output = anyhow::Result<Vec<String>>> + Send;
291
292    // ── API Keys ────────────────────────────────────────────
293
294    fn create_api_key(
295        &self,
296        key_hash: &str,
297        prefix: &str,
298        label: Option<&str>,
299        created_at: f64,
300    ) -> impl Future<Output = anyhow::Result<()>> + Send;
301
302    fn validate_api_key(
303        &self,
304        key_hash: &str,
305    ) -> impl Future<Output = anyhow::Result<bool>> + Send;
306
307    fn list_api_keys(
308        &self,
309    ) -> impl Future<Output = anyhow::Result<Vec<ApiKeyRecord>>> + Send;
310
311    fn revoke_api_key(
312        &self,
313        prefix: &str,
314    ) -> impl Future<Output = anyhow::Result<bool>> + Send;
315
316    // ── Child Workflows ─────────────────────────────────────
317
318    fn list_child_workflows(
319        &self,
320        parent_id: &str,
321    ) -> impl Future<Output = anyhow::Result<Vec<WorkflowRecord>>> + Send;
322
323    // ── Snapshots ───────────────────────────────────────────
324
325    fn create_snapshot(
326        &self,
327        workflow_id: &str,
328        event_seq: i32,
329        state_json: &str,
330    ) -> impl Future<Output = anyhow::Result<()>> + Send;
331
332    fn get_latest_snapshot(
333        &self,
334        workflow_id: &str,
335    ) -> impl Future<Output = anyhow::Result<Option<WorkflowSnapshot>>> + Send;
336
337    // ── Queue Stats ─────────────────────────────────────────
338
339    fn get_queue_stats(
340        &self,
341        namespace: &str,
342    ) -> impl Future<Output = anyhow::Result<Vec<QueueStats>>> + Send;
343
344    // ── Leader Election ─────────────────────────────────────
345
346    /// Try to acquire the scheduler lock for leader election.
347    /// Returns true if this instance should run the cron scheduler.
348    ///
349    /// - SQLite: always returns true (single-instance assumed)
350    /// - Postgres: uses pg_try_advisory_lock (only one instance wins)
351    fn try_acquire_scheduler_lock(
352        &self,
353    ) -> impl Future<Output = anyhow::Result<bool>> + Send;
354}
355
356/// API key metadata (hash is never exposed).
357#[derive(Clone, Debug, serde::Serialize)]
358pub struct ApiKeyRecord {
359    pub prefix: String,
360    pub label: Option<String>,
361    pub created_at: f64,
362}
363
364/// Namespace record.
365#[derive(Clone, Debug, serde::Serialize, serde::Deserialize, utoipa::ToSchema)]
366pub struct NamespaceRecord {
367    pub name: String,
368    pub created_at: f64,
369}
370
371/// Namespace-level statistics.
372#[derive(Clone, Debug, serde::Serialize, serde::Deserialize, utoipa::ToSchema)]
373pub struct NamespaceStats {
374    pub namespace: String,
375    pub total_workflows: i64,
376    pub running: i64,
377    pub pending: i64,
378    pub completed: i64,
379    pub failed: i64,
380    pub schedules: i64,
381    pub workers: i64,
382}
383
384/// Task queue statistics.
385#[derive(Clone, Debug, serde::Serialize, serde::Deserialize, utoipa::ToSchema)]
386pub struct QueueStats {
387    pub queue: String,
388    pub pending_activities: i64,
389    pub running_activities: i64,
390    pub workers: i64,
391}