Skip to main content

assay_workflow/store/
mod.rs

1pub mod postgres;
2pub mod sqlite;
3
4use std::future::Future;
5
6use crate::types::*;
7
8/// Core storage trait for the workflow engine.
9///
10/// All database access goes through this trait. Methods that operate on
11/// namespace-scoped data take a `namespace` parameter. The "main"
12/// namespace is always available.
13///
14/// All methods return `Send` futures so they can be used from `tokio::spawn`.
15pub trait WorkflowStore: Send + Sync + 'static {
16    // ── Namespaces ─────────────────────────────────────────
17
18    fn create_namespace(
19        &self,
20        name: &str,
21    ) -> impl Future<Output = anyhow::Result<()>> + Send;
22
23    fn list_namespaces(
24        &self,
25    ) -> impl Future<Output = anyhow::Result<Vec<NamespaceRecord>>> + Send;
26
27    fn delete_namespace(
28        &self,
29        name: &str,
30    ) -> impl Future<Output = anyhow::Result<bool>> + Send;
31
32    fn get_namespace_stats(
33        &self,
34        namespace: &str,
35    ) -> impl Future<Output = anyhow::Result<NamespaceStats>> + Send;
36
37    // ── Workflows ──────────────────────────────────────────
38
39    fn create_workflow(
40        &self,
41        workflow: &WorkflowRecord,
42    ) -> impl Future<Output = anyhow::Result<()>> + Send;
43
44    fn get_workflow(
45        &self,
46        id: &str,
47    ) -> impl Future<Output = anyhow::Result<Option<WorkflowRecord>>> + Send;
48
49    fn list_workflows(
50        &self,
51        namespace: &str,
52        status: Option<WorkflowStatus>,
53        workflow_type: Option<&str>,
54        search_attrs_filter: Option<&str>,
55        limit: i64,
56        offset: i64,
57    ) -> impl Future<Output = anyhow::Result<Vec<WorkflowRecord>>> + Send;
58
59    /// List workflows in terminal states whose `completed_at` is older than
60    /// `cutoff` and which haven't been archived yet. Used by the optional
61    /// S3 archival background task to batch candidates.
62    fn list_archivable_workflows(
63        &self,
64        cutoff: f64,
65        limit: i64,
66    ) -> impl Future<Output = anyhow::Result<Vec<WorkflowRecord>>> + Send;
67
68    /// Mark a workflow as archived (records `archived_at` + `archive_uri`)
69    /// and purge its events, activities, timers, signals, and snapshots.
70    /// The workflow record itself is preserved so `GET /workflows/{id}`
71    /// still resolves with an archive pointer.
72    fn mark_archived_and_purge(
73        &self,
74        workflow_id: &str,
75        archive_uri: &str,
76        archived_at: f64,
77    ) -> impl Future<Output = anyhow::Result<()>> + Send;
78
79    /// Merge a JSON object patch into the workflow's `search_attributes`.
80    /// Keys in the patch overwrite existing keys; keys already present but
81    /// not in the patch are preserved. If the current column is NULL, the
82    /// patch becomes the new value.
83    fn upsert_search_attributes(
84        &self,
85        workflow_id: &str,
86        patch_json: &str,
87    ) -> impl Future<Output = anyhow::Result<()>> + Send;
88
89    fn update_workflow_status(
90        &self,
91        id: &str,
92        status: WorkflowStatus,
93        result: Option<&str>,
94        error: Option<&str>,
95    ) -> impl Future<Output = anyhow::Result<()>> + Send;
96
97    fn claim_workflow(
98        &self,
99        id: &str,
100        worker_id: &str,
101    ) -> impl Future<Output = anyhow::Result<bool>> + Send;
102
103    // ── Workflow-task dispatch (Phase 9) ────────────────────
104
105    /// Mark a workflow as having new events that need a worker to replay it.
106    /// Idempotent — calling repeatedly is fine. Cleared by `claim_workflow_task`.
107    fn mark_workflow_dispatchable(
108        &self,
109        workflow_id: &str,
110    ) -> impl Future<Output = anyhow::Result<()>> + Send;
111
112    /// Atomically claim the oldest dispatchable workflow on a queue. Sets
113    /// `dispatch_claimed_by` and `dispatch_last_heartbeat`, clears
114    /// `needs_dispatch`. Returns the workflow record or None if nothing
115    /// is available.
116    fn claim_workflow_task(
117        &self,
118        task_queue: &str,
119        worker_id: &str,
120    ) -> impl Future<Output = anyhow::Result<Option<WorkflowRecord>>> + Send;
121
122    /// Release a workflow task's dispatch lease (called when the worker
123    /// submits its commands batch). Only succeeds if `dispatch_claimed_by`
124    /// matches the calling worker.
125    fn release_workflow_task(
126        &self,
127        workflow_id: &str,
128        worker_id: &str,
129    ) -> impl Future<Output = anyhow::Result<()>> + Send;
130
131    /// Forcibly release dispatch leases whose worker hasn't heartbeat'd
132    /// within `timeout_secs`. Used by the engine's background poller to
133    /// recover from worker crashes. Returns how many leases were released
134    /// (each becomes claimable again, with `needs_dispatch=true`).
135    fn release_stale_dispatch_leases(
136        &self,
137        now: f64,
138        timeout_secs: f64,
139    ) -> impl Future<Output = anyhow::Result<u64>> + Send;
140
141    // ── Events ─────────────────────────────────────────────
142
143    fn append_event(
144        &self,
145        event: &WorkflowEvent,
146    ) -> impl Future<Output = anyhow::Result<i64>> + Send;
147
148    fn list_events(
149        &self,
150        workflow_id: &str,
151    ) -> impl Future<Output = anyhow::Result<Vec<WorkflowEvent>>> + Send;
152
153    fn get_event_count(
154        &self,
155        workflow_id: &str,
156    ) -> impl Future<Output = anyhow::Result<i64>> + Send;
157
158    // ── Activities ──────────────────────────────────────────
159
160    fn create_activity(
161        &self,
162        activity: &WorkflowActivity,
163    ) -> impl Future<Output = anyhow::Result<i64>> + Send;
164
165    /// Look up an activity by its primary key.
166    fn get_activity(
167        &self,
168        id: i64,
169    ) -> impl Future<Output = anyhow::Result<Option<WorkflowActivity>>> + Send;
170
171    /// Look up an activity by its workflow-relative sequence number.
172    /// Used for idempotent scheduling: the engine checks if (workflow_id, seq)
173    /// already exists before creating a new row.
174    fn get_activity_by_workflow_seq(
175        &self,
176        workflow_id: &str,
177        seq: i32,
178    ) -> impl Future<Output = anyhow::Result<Option<WorkflowActivity>>> + Send;
179
180    fn claim_activity(
181        &self,
182        task_queue: &str,
183        worker_id: &str,
184    ) -> impl Future<Output = anyhow::Result<Option<WorkflowActivity>>> + Send;
185
186    /// Re-queue an activity for retry: clears the running state
187    /// (status→PENDING, claimed_by/started_at cleared), bumps `attempt`,
188    /// and sets `scheduled_at = now + backoff` so the next claim_activity
189    /// won't pick it up before the backoff elapses.
190    fn requeue_activity_for_retry(
191        &self,
192        id: i64,
193        next_attempt: i32,
194        next_scheduled_at: f64,
195    ) -> impl Future<Output = anyhow::Result<()>> + Send;
196
197    fn complete_activity(
198        &self,
199        id: i64,
200        result: Option<&str>,
201        error: Option<&str>,
202        failed: bool,
203    ) -> impl Future<Output = anyhow::Result<()>> + Send;
204
205    fn heartbeat_activity(
206        &self,
207        id: i64,
208        details: Option<&str>,
209    ) -> impl Future<Output = anyhow::Result<()>> + Send;
210
211    fn get_timed_out_activities(
212        &self,
213        now: f64,
214    ) -> impl Future<Output = anyhow::Result<Vec<WorkflowActivity>>> + Send;
215
216    // ── Timers ──────────────────────────────────────────────
217
218    /// Mark all PENDING activities of a workflow as CANCELLED so workers
219    /// that haven't claimed them yet won't pick them up. Returns the
220    /// number of rows affected. Does NOT touch RUNNING activities — those
221    /// will see the cancellation when they next heartbeat or complete.
222    fn cancel_pending_activities(
223        &self,
224        workflow_id: &str,
225    ) -> impl Future<Output = anyhow::Result<u64>> + Send;
226
227    /// Mark all unfired timers of a workflow as fired without firing
228    /// (effectively removing them from the timer poller). Returns the
229    /// number of rows affected.
230    fn cancel_pending_timers(
231        &self,
232        workflow_id: &str,
233    ) -> impl Future<Output = anyhow::Result<u64>> + Send;
234
235    fn create_timer(
236        &self,
237        timer: &WorkflowTimer,
238    ) -> impl Future<Output = anyhow::Result<i64>> + Send;
239
240    /// Look up an existing timer by its workflow-relative seq. Used by the
241    /// engine for idempotent ScheduleTimer (deterministic replay can call
242    /// schedule_timer for the same seq more than once on retries).
243    fn get_timer_by_workflow_seq(
244        &self,
245        workflow_id: &str,
246        seq: i32,
247    ) -> impl Future<Output = anyhow::Result<Option<WorkflowTimer>>> + Send;
248
249    fn fire_due_timers(
250        &self,
251        now: f64,
252    ) -> impl Future<Output = anyhow::Result<Vec<WorkflowTimer>>> + Send;
253
254    // ── Signals ─────────────────────────────────────────────
255
256    fn send_signal(
257        &self,
258        signal: &WorkflowSignal,
259    ) -> impl Future<Output = anyhow::Result<i64>> + Send;
260
261    fn consume_signals(
262        &self,
263        workflow_id: &str,
264        name: &str,
265    ) -> impl Future<Output = anyhow::Result<Vec<WorkflowSignal>>> + Send;
266
267    // ── Schedules ───────────────────────────────────────────
268
269    fn create_schedule(
270        &self,
271        schedule: &WorkflowSchedule,
272    ) -> impl Future<Output = anyhow::Result<()>> + Send;
273
274    fn get_schedule(
275        &self,
276        namespace: &str,
277        name: &str,
278    ) -> impl Future<Output = anyhow::Result<Option<WorkflowSchedule>>> + Send;
279
280    fn list_schedules(
281        &self,
282        namespace: &str,
283    ) -> impl Future<Output = anyhow::Result<Vec<WorkflowSchedule>>> + Send;
284
285    fn update_schedule_last_run(
286        &self,
287        namespace: &str,
288        name: &str,
289        last_run_at: f64,
290        next_run_at: f64,
291        workflow_id: &str,
292    ) -> impl Future<Output = anyhow::Result<()>> + Send;
293
294    fn delete_schedule(
295        &self,
296        namespace: &str,
297        name: &str,
298    ) -> impl Future<Output = anyhow::Result<bool>> + Send;
299
300    /// Apply an in-place patch to a schedule. Only fields present on
301    /// `patch` are updated; the rest keep their current values. Returns
302    /// the updated record, or `None` if the schedule doesn't exist.
303    ///
304    /// The scheduler's `next_run_at` is recomputed from the new
305    /// `cron_expr` + `timezone` on the next evaluation tick, so a PATCH
306    /// takes effect within the scheduler's poll interval.
307    fn update_schedule(
308        &self,
309        namespace: &str,
310        name: &str,
311        patch: &SchedulePatch,
312    ) -> impl Future<Output = anyhow::Result<Option<WorkflowSchedule>>> + Send;
313
314    /// Flip a schedule's `paused` flag. Returns the updated record, or
315    /// `None` if the schedule doesn't exist.
316    ///
317    /// A paused schedule is skipped by the scheduler; resuming it
318    /// doesn't backfill missed fires — the next fire is whatever the
319    /// cron expression says, starting from now.
320    fn set_schedule_paused(
321        &self,
322        namespace: &str,
323        name: &str,
324        paused: bool,
325    ) -> impl Future<Output = anyhow::Result<Option<WorkflowSchedule>>> + Send;
326
327    // ── Workers ─────────────────────────────────────────────
328
329    fn register_worker(
330        &self,
331        worker: &WorkflowWorker,
332    ) -> impl Future<Output = anyhow::Result<()>> + Send;
333
334    fn heartbeat_worker(
335        &self,
336        id: &str,
337        now: f64,
338    ) -> impl Future<Output = anyhow::Result<()>> + Send;
339
340    fn list_workers(
341        &self,
342        namespace: &str,
343    ) -> impl Future<Output = anyhow::Result<Vec<WorkflowWorker>>> + Send;
344
345    fn remove_dead_workers(
346        &self,
347        cutoff: f64,
348    ) -> impl Future<Output = anyhow::Result<Vec<String>>> + Send;
349
350    // ── API Keys ────────────────────────────────────────────
351
352    fn create_api_key(
353        &self,
354        key_hash: &str,
355        prefix: &str,
356        label: Option<&str>,
357        created_at: f64,
358    ) -> impl Future<Output = anyhow::Result<()>> + Send;
359
360    fn validate_api_key(
361        &self,
362        key_hash: &str,
363    ) -> impl Future<Output = anyhow::Result<bool>> + Send;
364
365    fn list_api_keys(
366        &self,
367    ) -> impl Future<Output = anyhow::Result<Vec<ApiKeyRecord>>> + Send;
368
369    fn revoke_api_key(
370        &self,
371        prefix: &str,
372    ) -> impl Future<Output = anyhow::Result<bool>> + Send;
373
374    /// Return true iff the `api_keys` table has no rows. Used by the HTTP layer
375    /// to identify the first-ever key-creation window (where the `POST
376    /// /api/v1/api-keys` endpoint is callable without authentication).
377    fn api_keys_empty(&self) -> impl Future<Output = anyhow::Result<bool>> + Send;
378
379    /// Find an existing API key by its label. Returns None if no key has this
380    /// label (or `label` is NULL). Used to implement idempotent key creation:
381    /// a second `POST /api/v1/api-keys { label, idempotent: true }` call hits
382    /// this method and returns the existing record's metadata (without a
383    /// plaintext, which is only ever retrievable at generation time).
384    fn get_api_key_by_label(
385        &self,
386        label: &str,
387    ) -> impl Future<Output = anyhow::Result<Option<ApiKeyRecord>>> + Send;
388
389    // ── Child Workflows ─────────────────────────────────────
390
391    fn list_child_workflows(
392        &self,
393        parent_id: &str,
394    ) -> impl Future<Output = anyhow::Result<Vec<WorkflowRecord>>> + Send;
395
396    // ── Snapshots ───────────────────────────────────────────
397
398    fn create_snapshot(
399        &self,
400        workflow_id: &str,
401        event_seq: i32,
402        state_json: &str,
403    ) -> impl Future<Output = anyhow::Result<()>> + Send;
404
405    fn get_latest_snapshot(
406        &self,
407        workflow_id: &str,
408    ) -> impl Future<Output = anyhow::Result<Option<WorkflowSnapshot>>> + Send;
409
410    // ── Queue Stats ─────────────────────────────────────────
411
412    fn get_queue_stats(
413        &self,
414        namespace: &str,
415    ) -> impl Future<Output = anyhow::Result<Vec<QueueStats>>> + Send;
416
417    // ── Leader Election ─────────────────────────────────────
418
419    /// Try to acquire the scheduler lock for leader election.
420    /// Returns true if this instance should run the cron scheduler.
421    ///
422    /// - SQLite: always returns true (single-instance assumed)
423    /// - Postgres: uses pg_try_advisory_lock (only one instance wins)
424    fn try_acquire_scheduler_lock(
425        &self,
426    ) -> impl Future<Output = anyhow::Result<bool>> + Send;
427}
428
429/// API key metadata (hash is never exposed).
430#[derive(Clone, Debug, serde::Serialize, utoipa::ToSchema)]
431pub struct ApiKeyRecord {
432    pub prefix: String,
433    pub label: Option<String>,
434    pub created_at: f64,
435}
436
437/// Namespace record.
438#[derive(Clone, Debug, serde::Serialize, serde::Deserialize, utoipa::ToSchema)]
439pub struct NamespaceRecord {
440    pub name: String,
441    pub created_at: f64,
442}
443
444/// Namespace-level statistics.
445#[derive(Clone, Debug, serde::Serialize, serde::Deserialize, utoipa::ToSchema)]
446pub struct NamespaceStats {
447    pub namespace: String,
448    pub total_workflows: i64,
449    pub running: i64,
450    pub pending: i64,
451    pub completed: i64,
452    pub failed: i64,
453    pub schedules: i64,
454    pub workers: i64,
455}
456
457/// Task queue statistics.
458#[derive(Clone, Debug, serde::Serialize, serde::Deserialize, utoipa::ToSchema)]
459pub struct QueueStats {
460    pub queue: String,
461    pub pending_activities: i64,
462    pub running_activities: i64,
463    pub workers: i64,
464}