//! PostgreSQL-backed implementation of the workflow store
//! (`assay_workflow/store/postgres.rs`).
1use anyhow::Result;
2use sqlx::PgPool;
3
4use crate::store::WorkflowStore;
5use crate::types::*;
6
/// Idempotent DDL for the workflow store, executed statement-by-statement by
/// [`PostgresStore::migrate`] (the string is split on ';', so no statement may
/// embed a semicolon inside a literal). All timestamps are stored as Unix
/// epoch seconds in DOUBLE PRECISION columns, matching the `f64` fields on
/// the store's record types.
const SCHEMA: &str = r#"
CREATE TABLE IF NOT EXISTS namespaces (
    name            TEXT PRIMARY KEY,
    created_at      DOUBLE PRECISION NOT NULL
);
INSERT INTO namespaces (name, created_at)
    VALUES ('main', EXTRACT(EPOCH FROM NOW()))
    ON CONFLICT DO NOTHING;

CREATE TABLE IF NOT EXISTS workflows (
    id              TEXT PRIMARY KEY,
    namespace       TEXT NOT NULL DEFAULT 'main',
    run_id          TEXT NOT NULL,
    workflow_type   TEXT NOT NULL,
    task_queue      TEXT NOT NULL DEFAULT 'main',
    status          TEXT NOT NULL DEFAULT 'PENDING',
    input           TEXT,
    result          TEXT,
    error           TEXT,
    parent_id       TEXT,
    claimed_by      TEXT,
    -- Workflow-task dispatch (Phase 9): see sqlite.rs for the full comment.
    needs_dispatch  BOOLEAN NOT NULL DEFAULT FALSE,
    dispatch_claimed_by    TEXT,
    dispatch_last_heartbeat DOUBLE PRECISION,
    created_at      DOUBLE PRECISION NOT NULL,
    updated_at      DOUBLE PRECISION NOT NULL,
    completed_at    DOUBLE PRECISION
);
CREATE INDEX IF NOT EXISTS idx_wf_status_queue ON workflows(status, task_queue);
CREATE INDEX IF NOT EXISTS idx_wf_namespace ON workflows(namespace);
CREATE INDEX IF NOT EXISTS idx_wf_dispatch ON workflows(task_queue, needs_dispatch, dispatch_claimed_by);

CREATE TABLE IF NOT EXISTS workflow_events (
    id              BIGSERIAL PRIMARY KEY,
    workflow_id     TEXT NOT NULL REFERENCES workflows(id),
    seq             INTEGER NOT NULL,
    event_type      TEXT NOT NULL,
    payload         TEXT,
    timestamp       DOUBLE PRECISION NOT NULL
);
CREATE INDEX IF NOT EXISTS idx_wf_events_lookup ON workflow_events(workflow_id, seq);

CREATE TABLE IF NOT EXISTS workflow_activities (
    id              BIGSERIAL PRIMARY KEY,
    workflow_id     TEXT NOT NULL REFERENCES workflows(id),
    seq             INTEGER NOT NULL,
    name            TEXT NOT NULL,
    task_queue      TEXT NOT NULL DEFAULT 'main',
    input           TEXT,
    status          TEXT NOT NULL DEFAULT 'PENDING',
    result          TEXT,
    error           TEXT,
    attempt         INTEGER NOT NULL DEFAULT 1,
    max_attempts    INTEGER NOT NULL DEFAULT 3,
    initial_interval_secs   DOUBLE PRECISION NOT NULL DEFAULT 1,
    backoff_coefficient     DOUBLE PRECISION NOT NULL DEFAULT 2,
    start_to_close_secs     DOUBLE PRECISION NOT NULL DEFAULT 300,
    heartbeat_timeout_secs  DOUBLE PRECISION,
    claimed_by      TEXT,
    scheduled_at    DOUBLE PRECISION NOT NULL,
    started_at      DOUBLE PRECISION,
    completed_at    DOUBLE PRECISION,
    last_heartbeat  DOUBLE PRECISION,
    UNIQUE (workflow_id, seq)
);
CREATE INDEX IF NOT EXISTS idx_wf_act_pending ON workflow_activities(task_queue, status, scheduled_at);

CREATE TABLE IF NOT EXISTS workflow_timers (
    id              BIGSERIAL PRIMARY KEY,
    workflow_id     TEXT NOT NULL REFERENCES workflows(id),
    seq             INTEGER NOT NULL,
    fire_at         DOUBLE PRECISION NOT NULL,
    fired           BOOLEAN NOT NULL DEFAULT FALSE,
    UNIQUE (workflow_id, seq)
);
CREATE INDEX IF NOT EXISTS idx_wf_timers_due ON workflow_timers(fire_at) WHERE fired = FALSE;

CREATE TABLE IF NOT EXISTS workflow_signals (
    id              BIGSERIAL PRIMARY KEY,
    workflow_id     TEXT NOT NULL REFERENCES workflows(id),
    name            TEXT NOT NULL,
    payload         TEXT,
    consumed        BOOLEAN NOT NULL DEFAULT FALSE,
    received_at     DOUBLE PRECISION NOT NULL
);
CREATE INDEX IF NOT EXISTS idx_wf_signals_lookup ON workflow_signals(workflow_id, name, consumed);

CREATE TABLE IF NOT EXISTS workflow_schedules (
    namespace       TEXT NOT NULL DEFAULT 'main',
    name            TEXT NOT NULL,
    workflow_type   TEXT NOT NULL,
    cron_expr       TEXT NOT NULL,
    input           TEXT,
    task_queue      TEXT NOT NULL DEFAULT 'main',
    overlap_policy  TEXT NOT NULL DEFAULT 'skip',
    paused          BOOLEAN NOT NULL DEFAULT FALSE,
    last_run_at     DOUBLE PRECISION,
    next_run_at     DOUBLE PRECISION,
    last_workflow_id TEXT,
    created_at      DOUBLE PRECISION NOT NULL,
    PRIMARY KEY (namespace, name)
);

CREATE TABLE IF NOT EXISTS workflow_workers (
    id              TEXT PRIMARY KEY,
    namespace       TEXT NOT NULL DEFAULT 'main',
    identity        TEXT NOT NULL,
    task_queue      TEXT NOT NULL,
    workflows       TEXT,
    activities      TEXT,
    max_concurrent_workflows  INTEGER NOT NULL DEFAULT 10,
    max_concurrent_activities INTEGER NOT NULL DEFAULT 10,
    active_tasks    INTEGER NOT NULL DEFAULT 0,
    last_heartbeat  DOUBLE PRECISION NOT NULL,
    registered_at   DOUBLE PRECISION NOT NULL
);

CREATE TABLE IF NOT EXISTS workflow_snapshots (
    workflow_id     TEXT NOT NULL REFERENCES workflows(id),
    event_seq       INTEGER NOT NULL,
    state_json      TEXT NOT NULL,
    created_at      DOUBLE PRECISION NOT NULL,
    PRIMARY KEY (workflow_id, event_seq)
);

CREATE TABLE IF NOT EXISTS api_keys (
    key_hash        TEXT PRIMARY KEY,
    prefix          TEXT NOT NULL,
    label           TEXT,
    created_at      DOUBLE PRECISION NOT NULL
);
CREATE INDEX IF NOT EXISTS idx_api_keys_prefix ON api_keys(prefix);
"#;
141
/// PostgreSQL-backed [`WorkflowStore`] implementation.
///
/// All methods issue parameterized queries through a shared `sqlx` connection
/// pool against the schema installed by `migrate()`.
pub struct PostgresStore {
    // Connection pool used by every query in this store.
    pool: PgPool,
}
145
impl PostgresStore {
    /// Connect to Postgres at `url` and run the schema migration before
    /// returning the store, so callers always see a fully-initialized schema.
    pub async fn new(url: &str) -> Result<Self> {
        let pool = PgPool::connect(url).await?;
        let store = Self { pool };
        store.migrate().await?;
        Ok(store)
    }

    /// Apply the idempotent DDL in [`SCHEMA`].
    ///
    /// NOTE(review): splitting on ';' is only safe while no statement in
    /// SCHEMA embeds a semicolon inside a string literal or function body —
    /// true today, but re-check whenever SCHEMA grows (e.g. triggers or
    /// DO blocks would break this).
    async fn migrate(&self) -> Result<()> {
        // Execute each statement separately — Postgres handles CREATE IF NOT EXISTS
        for statement in SCHEMA.split(';') {
            let trimmed = statement.trim();
            if !trimmed.is_empty() {
                sqlx::query(trimmed).execute(&self.pool).await?;
            }
        }
        Ok(())
    }

    /// Try to acquire pg_advisory_lock for leader election.
    /// Returns true if this instance is the leader (scheduler should run).
    ///
    /// NOTE(review): `pg_try_advisory_lock` is session-scoped and this call
    /// runs on an arbitrary pooled connection that is returned to the pool
    /// afterwards — the lock's lifetime is tied to that connection, not to
    /// this store instance. Confirm callers account for pool recycling
    /// (a closed/recycled connection silently drops leadership).
    pub async fn try_acquire_leader_lock(&self) -> Result<bool> {
        let row: (bool,) =
            sqlx::query_as("SELECT pg_try_advisory_lock(1)")
                .fetch_one(&self.pool)
                .await?;
        Ok(row.0)
    }
}
175
176impl WorkflowStore for PostgresStore {
177    // ── Namespaces ─────────────────────────────────────────
178
179    async fn create_namespace(&self, name: &str) -> Result<()> {
180        sqlx::query("INSERT INTO namespaces (name, created_at) VALUES ($1, EXTRACT(EPOCH FROM NOW()))")
181            .bind(name)
182            .execute(&self.pool)
183            .await?;
184        Ok(())
185    }
186
187    async fn list_namespaces(&self) -> Result<Vec<crate::store::NamespaceRecord>> {
188        let rows = sqlx::query_as::<_, (String, f64)>(
189            "SELECT name, created_at FROM namespaces ORDER BY name",
190        )
191        .fetch_all(&self.pool)
192        .await?;
193        Ok(rows
194            .into_iter()
195            .map(|(name, created_at)| crate::store::NamespaceRecord { name, created_at })
196            .collect())
197    }
198
    /// Delete `name`, returning `true` if a row was actually removed.
    ///
    /// The built-in 'main' namespace is protected by the `name != 'main'`
    /// predicate and can never be deleted through this method.
    async fn delete_namespace(&self, name: &str) -> Result<bool> {
        let res = sqlx::query("DELETE FROM namespaces WHERE name = $1 AND name != 'main'")
            .bind(name)
            .execute(&self.pool)
            .await?;
        Ok(res.rows_affected() > 0)
    }
206
207    async fn get_namespace_stats(&self, namespace: &str) -> Result<crate::store::NamespaceStats> {
208        let total: (i64,) = sqlx::query_as("SELECT COUNT(*) FROM workflows WHERE namespace = $1")
209            .bind(namespace)
210            .fetch_one(&self.pool)
211            .await?;
212        let running: (i64,) = sqlx::query_as(
213            "SELECT COUNT(*) FROM workflows WHERE namespace = $1 AND status = 'RUNNING'",
214        )
215        .bind(namespace)
216        .fetch_one(&self.pool)
217        .await?;
218        let pending: (i64,) = sqlx::query_as(
219            "SELECT COUNT(*) FROM workflows WHERE namespace = $1 AND status = 'PENDING'",
220        )
221        .bind(namespace)
222        .fetch_one(&self.pool)
223        .await?;
224        let completed: (i64,) = sqlx::query_as(
225            "SELECT COUNT(*) FROM workflows WHERE namespace = $1 AND status = 'COMPLETED'",
226        )
227        .bind(namespace)
228        .fetch_one(&self.pool)
229        .await?;
230        let failed: (i64,) = sqlx::query_as(
231            "SELECT COUNT(*) FROM workflows WHERE namespace = $1 AND status = 'FAILED'",
232        )
233        .bind(namespace)
234        .fetch_one(&self.pool)
235        .await?;
236        let schedules: (i64,) =
237            sqlx::query_as("SELECT COUNT(*) FROM workflow_schedules WHERE namespace = $1")
238                .bind(namespace)
239                .fetch_one(&self.pool)
240                .await?;
241        let workers: (i64,) =
242            sqlx::query_as("SELECT COUNT(*) FROM workflow_workers WHERE namespace = $1")
243                .bind(namespace)
244                .fetch_one(&self.pool)
245                .await?;
246
247        Ok(crate::store::NamespaceStats {
248            namespace: namespace.to_string(),
249            total_workflows: total.0,
250            running: running.0,
251            pending: pending.0,
252            completed: completed.0,
253            failed: failed.0,
254            schedules: schedules.0,
255            workers: workers.0,
256        })
257    }
258
259    // ── Workflows ──────────────────────────────────────────
260
    /// Insert a brand-new workflow row.
    ///
    /// Dispatch bookkeeping columns (`needs_dispatch`, `dispatch_claimed_by`,
    /// `dispatch_last_heartbeat`) are intentionally omitted and take their
    /// schema defaults. The bind order below must match the column list
    /// exactly — keep them in sync when adding fields.
    async fn create_workflow(&self, wf: &WorkflowRecord) -> Result<()> {
        sqlx::query(
            "INSERT INTO workflows (id, namespace, run_id, workflow_type, task_queue, status, input, result, error, parent_id, claimed_by, created_at, updated_at, completed_at)
             VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14)",
        )
        .bind(&wf.id)
        .bind(&wf.namespace)
        .bind(&wf.run_id)
        .bind(&wf.workflow_type)
        .bind(&wf.task_queue)
        .bind(&wf.status)
        .bind(&wf.input)
        .bind(&wf.result)
        .bind(&wf.error)
        .bind(&wf.parent_id)
        .bind(&wf.claimed_by)
        .bind(wf.created_at)
        .bind(wf.updated_at)
        .bind(wf.completed_at)
        .execute(&self.pool)
        .await?;
        Ok(())
    }
284
    /// Fetch a single workflow by primary key, or `None` if it doesn't exist.
    async fn get_workflow(&self, id: &str) -> Result<Option<WorkflowRecord>> {
        let row = sqlx::query_as::<_, PgWorkflowRow>(
            "SELECT id, namespace, run_id, workflow_type, task_queue, status, input, result, error, parent_id, claimed_by, created_at, updated_at, completed_at FROM workflows WHERE id = $1",
        )
        .bind(id)
        .fetch_optional(&self.pool)
        .await?;
        Ok(row.map(Into::into))
    }
294
    /// Page through workflows in `namespace`, newest first.
    ///
    /// `status` and `workflow_type` are optional filters: the
    /// `($n::TEXT IS NULL OR col = $n)` pattern makes a NULL bind mean
    /// "match everything", so one prepared statement covers all four filter
    /// combinations. The explicit `::TEXT` casts let Postgres infer the
    /// parameter types when NULL is bound.
    async fn list_workflows(
        &self,
        namespace: &str,
        status: Option<WorkflowStatus>,
        workflow_type: Option<&str>,
        limit: i64,
        offset: i64,
    ) -> Result<Vec<WorkflowRecord>> {
        let status_str = status.map(|s| s.to_string());
        let rows = sqlx::query_as::<_, PgWorkflowRow>(
            "SELECT id, namespace, run_id, workflow_type, task_queue, status, input, result, error, parent_id, claimed_by, created_at, updated_at, completed_at
             FROM workflows
             WHERE namespace = $1
               AND ($2::TEXT IS NULL OR status = $2)
               AND ($3::TEXT IS NULL OR workflow_type = $3)
             ORDER BY created_at DESC
             LIMIT $4 OFFSET $5",
        )
        .bind(namespace)
        .bind(&status_str)
        .bind(workflow_type)
        .bind(limit)
        .bind(offset)
        .fetch_all(&self.pool)
        .await?;
        Ok(rows.into_iter().map(Into::into).collect())
    }
322
    /// Transition a workflow to `status`, optionally recording result/error.
    ///
    /// COALESCE semantics: a `None` bind for `result`/`error` preserves the
    /// existing column value — callers can set these fields through this
    /// method but never clear them. `completed_at` is stamped with `now`
    /// only when `status` is terminal; on a non-terminal update the existing
    /// value (usually NULL) is kept.
    async fn update_workflow_status(
        &self,
        id: &str,
        status: WorkflowStatus,
        result: Option<&str>,
        error: Option<&str>,
    ) -> Result<()> {
        let now = timestamp_now();
        let completed_at = if status.is_terminal() { Some(now) } else { None };
        sqlx::query(
            "UPDATE workflows SET status = $1, result = COALESCE($2, result), error = COALESCE($3, error), updated_at = $4, completed_at = COALESCE($5, completed_at) WHERE id = $6",
        )
        .bind(status.to_string())
        .bind(result)
        .bind(error)
        .bind(now)
        .bind(completed_at)
        .bind(id)
        .execute(&self.pool)
        .await?;
        Ok(())
    }
345
    /// Attempt to claim an unclaimed workflow for `worker_id`.
    ///
    /// The `claimed_by IS NULL` guard inside a single UPDATE makes the claim
    /// atomic: if two workers race, exactly one sees `rows_affected() > 0`.
    async fn claim_workflow(&self, id: &str, worker_id: &str) -> Result<bool> {
        let res = sqlx::query(
            "UPDATE workflows SET claimed_by = $1, status = 'RUNNING', updated_at = $2 WHERE id = $3 AND claimed_by IS NULL",
        )
        .bind(worker_id)
        .bind(timestamp_now())
        .bind(id)
        .execute(&self.pool)
        .await?;
        Ok(res.rows_affected() > 0)
    }
357
358    async fn mark_workflow_dispatchable(&self, workflow_id: &str) -> Result<()> {
359        sqlx::query("UPDATE workflows SET needs_dispatch = TRUE WHERE id = $1")
360            .bind(workflow_id)
361            .execute(&self.pool)
362            .await?;
363        Ok(())
364    }
365
    /// Atomically claim the oldest dispatchable workflow task on `task_queue`.
    ///
    /// Returns the claimed record, or `None` when nothing is dispatchable.
    /// Terminal workflows are excluded even if their `needs_dispatch` flag is
    /// still set, so a late flag can never resurrect a finished workflow.
    async fn claim_workflow_task(
        &self,
        task_queue: &str,
        worker_id: &str,
    ) -> Result<Option<WorkflowRecord>> {
        let now = timestamp_now();
        // Atomic claim with FOR UPDATE SKIP LOCKED so multiple engine
        // replicas don't fight over the same workflow task.
        let row = sqlx::query_as::<_, PgWorkflowRow>(
            "UPDATE workflows
             SET dispatch_claimed_by = $1, dispatch_last_heartbeat = $2, needs_dispatch = FALSE
             WHERE id = (
                SELECT id FROM workflows
                WHERE task_queue = $3
                  AND needs_dispatch = TRUE
                  AND dispatch_claimed_by IS NULL
                  AND status NOT IN ('COMPLETED', 'FAILED', 'CANCELLED', 'TIMED_OUT')
                ORDER BY updated_at ASC
                FOR UPDATE SKIP LOCKED
                LIMIT 1
             )
             RETURNING id, namespace, run_id, workflow_type, task_queue, status, input, result, error, parent_id, claimed_by, created_at, updated_at, completed_at",
        )
        .bind(worker_id)
        .bind(now)
        .bind(task_queue)
        .fetch_optional(&self.pool)
        .await?;
        Ok(row.map(Into::into))
    }
396
397    async fn release_workflow_task(&self, workflow_id: &str, worker_id: &str) -> Result<()> {
398        sqlx::query(
399            "UPDATE workflows
400             SET dispatch_claimed_by = NULL, dispatch_last_heartbeat = NULL
401             WHERE id = $1 AND dispatch_claimed_by = $2",
402        )
403        .bind(workflow_id)
404        .bind(worker_id)
405        .execute(&self.pool)
406        .await?;
407        Ok(())
408    }
409
    /// Reclaim dispatch leases whose holder stopped heartbeating.
    ///
    /// Any lease older than `timeout_secs` (relative to `now`) on a
    /// non-terminal workflow is cleared and the workflow is re-flagged for
    /// dispatch, so another replica can pick it up. Returns the number of
    /// leases released.
    async fn release_stale_dispatch_leases(
        &self,
        now: f64,
        timeout_secs: f64,
    ) -> Result<u64> {
        let res = sqlx::query(
            "UPDATE workflows
             SET dispatch_claimed_by = NULL,
                 dispatch_last_heartbeat = NULL,
                 needs_dispatch = TRUE
             WHERE dispatch_claimed_by IS NOT NULL
               AND ($1 - dispatch_last_heartbeat) > $2
               AND status NOT IN ('COMPLETED', 'FAILED', 'CANCELLED', 'TIMED_OUT')",
        )
        .bind(now)
        .bind(timeout_secs)
        .execute(&self.pool)
        .await?;
        Ok(res.rows_affected())
    }
430
431    // ── Events ─────────────────────────────────────────────
432
    /// Append an event to a workflow's history, returning the generated
    /// BIGSERIAL row id (the caller-supplied `seq` is the per-workflow
    /// ordering key; the row id is global).
    async fn append_event(&self, ev: &WorkflowEvent) -> Result<i64> {
        let row: (i64,) = sqlx::query_as(
            "INSERT INTO workflow_events (workflow_id, seq, event_type, payload, timestamp) VALUES ($1, $2, $3, $4, $5) RETURNING id",
        )
        .bind(&ev.workflow_id)
        .bind(ev.seq)
        .bind(&ev.event_type)
        .bind(&ev.payload)
        .bind(ev.timestamp)
        .fetch_one(&self.pool)
        .await?;
        Ok(row.0)
    }
446
447    async fn list_events(&self, workflow_id: &str) -> Result<Vec<WorkflowEvent>> {
448        let rows = sqlx::query_as::<_, PgEventRow>(
449            "SELECT id, workflow_id, seq, event_type, payload, timestamp FROM workflow_events WHERE workflow_id = $1 ORDER BY seq ASC",
450        )
451        .bind(workflow_id)
452        .fetch_all(&self.pool)
453        .await?;
454        Ok(rows.into_iter().map(Into::into).collect())
455    }
456
457    async fn get_event_count(&self, workflow_id: &str) -> Result<i64> {
458        let row: (i64,) =
459            sqlx::query_as("SELECT COUNT(*) FROM workflow_events WHERE workflow_id = $1")
460                .bind(workflow_id)
461                .fetch_one(&self.pool)
462                .await?;
463        Ok(row.0)
464    }
465
466    // ── Activities ──────────────────────────────────────────
467
    /// Insert a new activity task, returning its generated row id.
    ///
    /// Lifecycle timestamps other than `scheduled_at` (started/completed/
    /// heartbeat) start as NULL and are filled in by the claim/complete/
    /// heartbeat methods. The 13 binds below must stay aligned with the
    /// column list.
    async fn create_activity(&self, act: &WorkflowActivity) -> Result<i64> {
        let row: (i64,) = sqlx::query_as(
            "INSERT INTO workflow_activities (workflow_id, seq, name, task_queue, input, status, attempt, max_attempts, initial_interval_secs, backoff_coefficient, start_to_close_secs, heartbeat_timeout_secs, scheduled_at)
             VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13) RETURNING id",
        )
        .bind(&act.workflow_id)
        .bind(act.seq)
        .bind(&act.name)
        .bind(&act.task_queue)
        .bind(&act.input)
        .bind(&act.status)
        .bind(act.attempt)
        .bind(act.max_attempts)
        .bind(act.initial_interval_secs)
        .bind(act.backoff_coefficient)
        .bind(act.start_to_close_secs)
        .bind(act.heartbeat_timeout_secs)
        .bind(act.scheduled_at)
        .fetch_one(&self.pool)
        .await?;
        Ok(row.0)
    }
490
    /// Fetch one activity by its row id, or `None` if it doesn't exist.
    async fn get_activity(&self, id: i64) -> Result<Option<WorkflowActivity>> {
        let row = sqlx::query_as::<_, PgActivityRow>(
            "SELECT id, workflow_id, seq, name, task_queue, input, status, result, error, attempt, max_attempts, initial_interval_secs, backoff_coefficient, start_to_close_secs, heartbeat_timeout_secs, claimed_by, scheduled_at, started_at, completed_at, last_heartbeat
             FROM workflow_activities WHERE id = $1",
        )
        .bind(id)
        .fetch_optional(&self.pool)
        .await?;
        Ok(row.map(Into::into))
    }
501
    /// Fetch an activity by its (workflow, seq) coordinates.
    ///
    /// `(workflow_id, seq)` is UNIQUE in the schema, so this returns at most
    /// one row.
    async fn get_activity_by_workflow_seq(
        &self,
        workflow_id: &str,
        seq: i32,
    ) -> Result<Option<WorkflowActivity>> {
        let row = sqlx::query_as::<_, PgActivityRow>(
            "SELECT id, workflow_id, seq, name, task_queue, input, status, result, error, attempt, max_attempts, initial_interval_secs, backoff_coefficient, start_to_close_secs, heartbeat_timeout_secs, claimed_by, scheduled_at, started_at, completed_at, last_heartbeat
             FROM workflow_activities WHERE workflow_id = $1 AND seq = $2",
        )
        .bind(workflow_id)
        .bind(seq)
        .fetch_optional(&self.pool)
        .await?;
        Ok(row.map(Into::into))
    }
517
    /// Atomically claim the oldest PENDING activity on `task_queue`.
    ///
    /// On success the activity flips to RUNNING with `claimed_by`/`started_at`
    /// set, and the full post-claim row is returned. Returns `None` when the
    /// queue has no pending work.
    async fn claim_activity(
        &self,
        task_queue: &str,
        worker_id: &str,
    ) -> Result<Option<WorkflowActivity>> {
        let now = timestamp_now();
        // Atomic claim using FOR UPDATE SKIP LOCKED — prevents contention
        // between multiple assay serve instances claiming the same activity
        let row = sqlx::query_as::<_, PgActivityRow>(
            "UPDATE workflow_activities SET status = 'RUNNING', claimed_by = $1, started_at = $2
             WHERE id = (
                SELECT id FROM workflow_activities
                WHERE task_queue = $3 AND status = 'PENDING'
                ORDER BY scheduled_at ASC
                FOR UPDATE SKIP LOCKED
                LIMIT 1
             )
             RETURNING id, workflow_id, seq, name, task_queue, input, status, result, error, attempt, max_attempts, initial_interval_secs, backoff_coefficient, start_to_close_secs, heartbeat_timeout_secs, claimed_by, scheduled_at, started_at, completed_at, last_heartbeat",
        )
        .bind(worker_id)
        .bind(now)
        .bind(task_queue)
        .fetch_optional(&self.pool)
        .await?;
        Ok(row.map(Into::into))
    }
544
    /// Return a failed activity to the queue for another attempt.
    ///
    /// Resets it to PENDING with the caller-computed attempt number and
    /// backoff deadline, and clears all per-execution state (claim, start
    /// time, heartbeat, error) so the next claim starts from a clean slate.
    async fn requeue_activity_for_retry(
        &self,
        id: i64,
        next_attempt: i32,
        next_scheduled_at: f64,
    ) -> Result<()> {
        sqlx::query(
            "UPDATE workflow_activities
             SET status = 'PENDING', attempt = $1, scheduled_at = $2,
                 claimed_by = NULL, started_at = NULL, last_heartbeat = NULL,
                 error = NULL
             WHERE id = $3",
        )
        .bind(next_attempt)
        .bind(next_scheduled_at)
        .bind(id)
        .execute(&self.pool)
        .await?;
        Ok(())
    }
565
    /// Record an activity's terminal outcome.
    ///
    /// `failed` selects the terminal status; `result`/`error` are written
    /// as given (including `None`, which overwrites with NULL, unlike the
    /// COALESCE behavior in `update_workflow_status`).
    async fn complete_activity(
        &self,
        id: i64,
        result: Option<&str>,
        error: Option<&str>,
        failed: bool,
    ) -> Result<()> {
        let status = if failed { "FAILED" } else { "COMPLETED" };
        sqlx::query(
            "UPDATE workflow_activities SET status = $1, result = $2, error = $3, completed_at = $4 WHERE id = $5",
        )
        .bind(status)
        .bind(result)
        .bind(error)
        .bind(timestamp_now())
        .bind(id)
        .execute(&self.pool)
        .await?;
        Ok(())
    }
586
587    async fn heartbeat_activity(&self, id: i64, _details: Option<&str>) -> Result<()> {
588        sqlx::query("UPDATE workflow_activities SET last_heartbeat = $1 WHERE id = $2")
589            .bind(timestamp_now())
590            .bind(id)
591            .execute(&self.pool)
592            .await?;
593        Ok(())
594    }
595
596    async fn get_timed_out_activities(&self, now: f64) -> Result<Vec<WorkflowActivity>> {
597        let rows = sqlx::query_as::<_, PgActivityRow>(
598            "SELECT id, workflow_id, seq, name, task_queue, input, status, result, error, attempt, max_attempts, initial_interval_secs, backoff_coefficient, start_to_close_secs, heartbeat_timeout_secs, claimed_by, scheduled_at, started_at, completed_at, last_heartbeat
599             FROM workflow_activities
600             WHERE status = 'RUNNING'
601               AND heartbeat_timeout_secs IS NOT NULL
602               AND last_heartbeat IS NOT NULL
603               AND ($1 - last_heartbeat) > heartbeat_timeout_secs",
604        )
605        .bind(now)
606        .fetch_all(&self.pool)
607        .await?;
608        Ok(rows.into_iter().map(Into::into).collect())
609    }
610
611    // ── Timers ──────────────────────────────────────────────
612
    /// Insert a durable timer (initially unfired), returning its row id.
    ///
    /// The `timer.fired` field is ignored — new timers always start with
    /// `fired = FALSE`.
    async fn create_timer(&self, timer: &WorkflowTimer) -> Result<i64> {
        let row: (i64,) = sqlx::query_as(
            "INSERT INTO workflow_timers (workflow_id, seq, fire_at, fired) VALUES ($1, $2, $3, FALSE) RETURNING id",
        )
        .bind(&timer.workflow_id)
        .bind(timer.seq)
        .bind(timer.fire_at)
        .fetch_one(&self.pool)
        .await?;
        Ok(row.0)
    }
624
625    async fn cancel_pending_activities(&self, workflow_id: &str) -> Result<u64> {
626        let res = sqlx::query(
627            "UPDATE workflow_activities SET status = 'CANCELLED', completed_at = $1
628             WHERE workflow_id = $2 AND status = 'PENDING'",
629        )
630        .bind(timestamp_now())
631        .bind(workflow_id)
632        .execute(&self.pool)
633        .await?;
634        Ok(res.rows_affected())
635    }
636
637    async fn cancel_pending_timers(&self, workflow_id: &str) -> Result<u64> {
638        let res = sqlx::query(
639            "UPDATE workflow_timers SET fired = TRUE
640             WHERE workflow_id = $1 AND fired = FALSE",
641        )
642        .bind(workflow_id)
643        .execute(&self.pool)
644        .await?;
645        Ok(res.rows_affected())
646    }
647
    /// Fetch a timer by its (workflow, seq) coordinates.
    ///
    /// `(workflow_id, seq)` is UNIQUE in the schema, so at most one row
    /// matches.
    async fn get_timer_by_workflow_seq(
        &self,
        workflow_id: &str,
        seq: i32,
    ) -> Result<Option<WorkflowTimer>> {
        let row = sqlx::query_as::<_, PgTimerRow>(
            "SELECT id, workflow_id, seq, fire_at, fired
             FROM workflow_timers WHERE workflow_id = $1 AND seq = $2",
        )
        .bind(workflow_id)
        .bind(seq)
        .fetch_optional(&self.pool)
        .await?;
        Ok(row.map(Into::into))
    }
663
    /// Mark all due timers as fired and return them.
    ///
    /// The single UPDATE..RETURNING makes mark-and-fetch atomic, so each
    /// timer is delivered at most once even if several scheduler instances
    /// poll concurrently.
    async fn fire_due_timers(&self, now: f64) -> Result<Vec<WorkflowTimer>> {
        let rows = sqlx::query_as::<_, PgTimerRow>(
            "UPDATE workflow_timers SET fired = TRUE
             WHERE fired = FALSE AND fire_at <= $1
             RETURNING id, workflow_id, seq, fire_at, fired",
        )
        .bind(now)
        .fetch_all(&self.pool)
        .await?;
        Ok(rows.into_iter().map(Into::into).collect())
    }
675
676    // ── Signals ─────────────────────────────────────────────
677
    /// Persist an incoming signal (initially unconsumed), returning its row
    /// id. The `sig.consumed` field is ignored — new signals always start
    /// with `consumed = FALSE`.
    async fn send_signal(&self, sig: &WorkflowSignal) -> Result<i64> {
        let row: (i64,) = sqlx::query_as(
            "INSERT INTO workflow_signals (workflow_id, name, payload, consumed, received_at) VALUES ($1, $2, $3, FALSE, $4) RETURNING id",
        )
        .bind(&sig.workflow_id)
        .bind(&sig.name)
        .bind(&sig.payload)
        .bind(sig.received_at)
        .fetch_one(&self.pool)
        .await?;
        Ok(row.0)
    }
690
    /// Atomically consume all unconsumed signals named `name` for a workflow.
    ///
    /// UPDATE..RETURNING marks and fetches in one statement, so each signal
    /// is handed out at most once across concurrent consumers.
    async fn consume_signals(
        &self,
        workflow_id: &str,
        name: &str,
    ) -> Result<Vec<WorkflowSignal>> {
        let rows = sqlx::query_as::<_, PgSignalRow>(
            "UPDATE workflow_signals SET consumed = TRUE
             WHERE workflow_id = $1 AND name = $2 AND consumed = FALSE
             RETURNING id, workflow_id, name, payload, consumed, received_at",
        )
        .bind(workflow_id)
        .bind(name)
        .fetch_all(&self.pool)
        .await?;
        Ok(rows.into_iter().map(Into::into).collect())
    }
707
708    // ── Schedules ───────────────────────────────────────────
709
    /// Insert a new cron schedule.
    ///
    /// `(namespace, name)` is the primary key, so inserting a duplicate
    /// schedule surfaces as a database error to the caller. Bind order must
    /// match the column list.
    async fn create_schedule(&self, sched: &WorkflowSchedule) -> Result<()> {
        sqlx::query(
            "INSERT INTO workflow_schedules (namespace, name, workflow_type, cron_expr, input, task_queue, overlap_policy, paused, last_run_at, next_run_at, last_workflow_id, created_at)
             VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12)",
        )
        .bind(&sched.namespace)
        .bind(&sched.name)
        .bind(&sched.workflow_type)
        .bind(&sched.cron_expr)
        .bind(&sched.input)
        .bind(&sched.task_queue)
        .bind(&sched.overlap_policy)
        .bind(sched.paused)
        .bind(sched.last_run_at)
        .bind(sched.next_run_at)
        .bind(&sched.last_workflow_id)
        .bind(sched.created_at)
        .execute(&self.pool)
        .await?;
        Ok(())
    }
731
    /// Fetch one schedule by its (namespace, name) primary key.
    async fn get_schedule(&self, namespace: &str, name: &str) -> Result<Option<WorkflowSchedule>> {
        let row = sqlx::query_as::<_, PgScheduleRow>(
            "SELECT namespace, name, workflow_type, cron_expr, input, task_queue, overlap_policy, paused, last_run_at, next_run_at, last_workflow_id, created_at FROM workflow_schedules WHERE namespace = $1 AND name = $2",
        )
        .bind(namespace)
        .bind(name)
        .fetch_optional(&self.pool)
        .await?;
        Ok(row.map(Into::into))
    }
742
    /// List every schedule in `namespace`, sorted by name.
    async fn list_schedules(&self, namespace: &str) -> Result<Vec<WorkflowSchedule>> {
        let rows = sqlx::query_as::<_, PgScheduleRow>(
            "SELECT namespace, name, workflow_type, cron_expr, input, task_queue, overlap_policy, paused, last_run_at, next_run_at, last_workflow_id, created_at FROM workflow_schedules WHERE namespace = $1 ORDER BY name",
        )
        .bind(namespace)
        .fetch_all(&self.pool)
        .await?;
        Ok(rows.into_iter().map(Into::into).collect())
    }
752
    /// Record the outcome of a scheduled trigger: the run that just happened,
    /// the next computed fire time, and the id of the workflow it started.
    async fn update_schedule_last_run(
        &self,
        namespace: &str,
        name: &str,
        last_run_at: f64,
        next_run_at: f64,
        workflow_id: &str,
    ) -> Result<()> {
        sqlx::query(
            "UPDATE workflow_schedules SET last_run_at = $1, next_run_at = $2, last_workflow_id = $3 WHERE namespace = $4 AND name = $5",
        )
        .bind(last_run_at)
        .bind(next_run_at)
        .bind(workflow_id)
        .bind(namespace)
        .bind(name)
        .execute(&self.pool)
        .await?;
        Ok(())
    }
773
774    async fn delete_schedule(&self, namespace: &str, name: &str) -> Result<bool> {
775        let res = sqlx::query("DELETE FROM workflow_schedules WHERE namespace = $1 AND name = $2")
776            .bind(namespace)
777            .bind(name)
778            .execute(&self.pool)
779            .await?;
780        Ok(res.rows_affected() > 0)
781    }
782
783    // ── Workers ─────────────────────────────────────────────
784
    /// Upsert a worker registration keyed by worker id.
    ///
    /// NOTE(review): the conflict branch refreshes only `last_heartbeat` and
    /// `identity`. A worker that re-registers with a new task_queue,
    /// capability lists, or concurrency limits keeps its old values —
    /// confirm this is intentional (workers may be expected to get a fresh
    /// id on restart).
    async fn register_worker(&self, w: &WorkflowWorker) -> Result<()> {
        sqlx::query(
            "INSERT INTO workflow_workers (id, namespace, identity, task_queue, workflows, activities, max_concurrent_workflows, max_concurrent_activities, active_tasks, last_heartbeat, registered_at)
             VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11)
             ON CONFLICT (id) DO UPDATE SET last_heartbeat = EXCLUDED.last_heartbeat, identity = EXCLUDED.identity",
        )
        .bind(&w.id)
        .bind(&w.namespace)
        .bind(&w.identity)
        .bind(&w.task_queue)
        .bind(&w.workflows)
        .bind(&w.activities)
        .bind(w.max_concurrent_workflows)
        .bind(w.max_concurrent_activities)
        .bind(w.active_tasks)
        .bind(w.last_heartbeat)
        .bind(w.registered_at)
        .execute(&self.pool)
        .await?;
        Ok(())
    }
806
807    async fn heartbeat_worker(&self, id: &str, now: f64) -> Result<()> {
808        sqlx::query("UPDATE workflow_workers SET last_heartbeat = $1 WHERE id = $2")
809            .bind(now)
810            .bind(id)
811            .execute(&self.pool)
812            .await?;
813        Ok(())
814    }
815
    /// List every registered worker in `namespace`, oldest registration first.
    async fn list_workers(&self, namespace: &str) -> Result<Vec<WorkflowWorker>> {
        let rows = sqlx::query_as::<_, PgWorkerRow>(
            "SELECT id, namespace, identity, task_queue, workflows, activities, max_concurrent_workflows, max_concurrent_activities, active_tasks, last_heartbeat, registered_at FROM workflow_workers WHERE namespace = $1 ORDER BY registered_at",
        )
        .bind(namespace)
        .fetch_all(&self.pool)
        .await?;
        Ok(rows.into_iter().map(Into::into).collect())
    }
825
826    async fn remove_dead_workers(&self, cutoff: f64) -> Result<Vec<String>> {
827        let rows: Vec<(String,)> =
828            sqlx::query_as("SELECT id FROM workflow_workers WHERE last_heartbeat < $1")
829                .bind(cutoff)
830                .fetch_all(&self.pool)
831                .await?;
832        let ids: Vec<String> = rows.into_iter().map(|r| r.0).collect();
833        if !ids.is_empty() {
834            sqlx::query("DELETE FROM workflow_workers WHERE last_heartbeat < $1")
835                .bind(cutoff)
836                .execute(&self.pool)
837                .await?;
838        }
839        Ok(ids)
840    }
841
842    // ── API Keys ────────────────────────────────────────────
843
844    async fn create_api_key(
845        &self,
846        key_hash: &str,
847        prefix: &str,
848        label: Option<&str>,
849        created_at: f64,
850    ) -> Result<()> {
851        sqlx::query("INSERT INTO api_keys (key_hash, prefix, label, created_at) VALUES ($1, $2, $3, $4)")
852            .bind(key_hash)
853            .bind(prefix)
854            .bind(label)
855            .bind(created_at)
856            .execute(&self.pool)
857            .await?;
858        Ok(())
859    }
860
861    async fn validate_api_key(&self, key_hash: &str) -> Result<bool> {
862        let row: Option<(i64,)> =
863            sqlx::query_as("SELECT 1::BIGINT FROM api_keys WHERE key_hash = $1")
864                .bind(key_hash)
865                .fetch_optional(&self.pool)
866                .await?;
867        Ok(row.is_some())
868    }
869
870    async fn list_api_keys(&self) -> Result<Vec<crate::store::ApiKeyRecord>> {
871        let rows = sqlx::query_as::<_, (String, Option<String>, f64)>(
872            "SELECT prefix, label, created_at FROM api_keys ORDER BY created_at DESC",
873        )
874        .fetch_all(&self.pool)
875        .await?;
876        Ok(rows
877            .into_iter()
878            .map(|(prefix, label, created_at)| crate::store::ApiKeyRecord {
879                prefix,
880                label,
881                created_at,
882            })
883            .collect())
884    }
885
886    async fn revoke_api_key(&self, prefix: &str) -> Result<bool> {
887        let res = sqlx::query("DELETE FROM api_keys WHERE prefix = $1")
888            .bind(prefix)
889            .execute(&self.pool)
890            .await?;
891        Ok(res.rows_affected() > 0)
892    }
893
894    // ── Child Workflows ─────────────────────────────────────
895
896    async fn list_child_workflows(&self, parent_id: &str) -> Result<Vec<WorkflowRecord>> {
897        let rows = sqlx::query_as::<_, PgWorkflowRow>(
898            "SELECT id, namespace, run_id, workflow_type, task_queue, status, input, result, error, parent_id, claimed_by, created_at, updated_at, completed_at
899             FROM workflows WHERE parent_id = $1 ORDER BY created_at ASC",
900        )
901        .bind(parent_id)
902        .fetch_all(&self.pool)
903        .await?;
904        Ok(rows.into_iter().map(Into::into).collect())
905    }
906
907    // ── Snapshots ───────────────────────────────────────────
908
    /// Persist a state snapshot for `workflow_id` taken at event
    /// `event_seq`.
    ///
    /// Idempotent: re-snapshotting the same (workflow_id, event_seq) pair
    /// overwrites `state_json` and refreshes `created_at` via the
    /// ON CONFLICT clause. `created_at` is stamped with the current wall
    /// clock (epoch seconds) from `timestamp_now()`.
    async fn create_snapshot(
        &self,
        workflow_id: &str,
        event_seq: i32,
        state_json: &str,
    ) -> Result<()> {
        sqlx::query(
            "INSERT INTO workflow_snapshots (workflow_id, event_seq, state_json, created_at)
             VALUES ($1, $2, $3, $4)
             ON CONFLICT (workflow_id, event_seq) DO UPDATE SET state_json = EXCLUDED.state_json, created_at = EXCLUDED.created_at",
        )
        .bind(workflow_id)
        .bind(event_seq)
        .bind(state_json)
        .bind(timestamp_now())
        .execute(&self.pool)
        .await?;
        Ok(())
    }
928
929    async fn get_latest_snapshot(
930        &self,
931        workflow_id: &str,
932    ) -> Result<Option<WorkflowSnapshot>> {
933        let row = sqlx::query_as::<_, (String, i32, String, f64)>(
934            "SELECT workflow_id, event_seq, state_json, created_at
935             FROM workflow_snapshots WHERE workflow_id = $1
936             ORDER BY event_seq DESC LIMIT 1",
937        )
938        .bind(workflow_id)
939        .fetch_optional(&self.pool)
940        .await?;
941
942        Ok(row.map(|(workflow_id, event_seq, state_json, created_at)| WorkflowSnapshot {
943            workflow_id,
944            event_seq,
945            state_json,
946            created_at,
947        }))
948    }
949
950    // ── Queue Stats ─────────────────────────────────────────
951
    /// Per-task-queue activity statistics for one namespace.
    ///
    /// `pending`/`running` are conditional sums over `workflow_activities`,
    /// joined to the parent workflow row (which carries the namespace);
    /// `workers` is a correlated count of registered workers on the same
    /// queue and namespace.
    ///
    /// NOTE(review): a queue that has registered workers but no activity
    /// rows at all produces no output row here — confirm callers expect
    /// idle queues to be absent from the result.
    async fn get_queue_stats(&self, namespace: &str) -> Result<Vec<crate::store::QueueStats>> {
        let rows = sqlx::query_as::<_, (String, i64, i64, i64)>(
            "SELECT
                a.task_queue AS queue,
                SUM(CASE WHEN a.status = 'PENDING' THEN 1 ELSE 0 END) AS pending,
                SUM(CASE WHEN a.status = 'RUNNING' THEN 1 ELSE 0 END) AS running,
                (SELECT COUNT(*) FROM workflow_workers w WHERE w.task_queue = a.task_queue AND w.namespace = $1) AS workers
             FROM workflow_activities a
             JOIN workflows wf ON a.workflow_id = wf.id AND wf.namespace = $1
             GROUP BY a.task_queue",
        )
        .bind(namespace)
        .fetch_all(&self.pool)
        .await?;

        Ok(rows
            .into_iter()
            .map(|(queue, pending, running, workers)| crate::store::QueueStats {
                queue,
                pending_activities: pending,
                running_activities: running,
                workers,
            })
            .collect())
    }
977
978    // ── Leader Election ─────────────────────────────────────
979
    /// Best-effort leadership claim via Postgres advisory lock 42.
    ///
    /// Returns `true` when the lock was acquired on the connection that ran
    /// the statement.
    async fn try_acquire_scheduler_lock(&self) -> Result<bool> {
        // pg_try_advisory_lock is session-scoped — only one connection
        // in the pool will hold the lock. In a multi-replica Kubernetes
        // deployment, only one pod's connection wins.
        //
        // NOTE(review): the statement runs on an *arbitrary* pooled
        // connection, and the advisory lock lives and dies with that
        // connection. If the pool recycles or closes it (idle timeout,
        // error), the lock is silently released and another replica can
        // win while this process still believes it is leader. Advisory
        // locks are also re-entrant per session, so a later call landing
        // on the same pooled connection returns `true` again. Confirm the
        // pool keeps this connection alive, or hold a dedicated connection
        // for the lock's lifetime.
        let row: (bool,) =
            sqlx::query_as("SELECT pg_try_advisory_lock(42)")
                .fetch_one(&self.pool)
                .await?;
        Ok(row.0)
    }
990}
991
/// Current wall-clock time as fractional seconds since the Unix epoch.
fn timestamp_now() -> f64 {
    use std::time::{SystemTime, UNIX_EPOCH};
    // unwrap(): only fails if the host clock is set before 1970.
    SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_secs_f64()
}
998
999// ── Postgres row types (sqlx::FromRow) ──────────────────────
1000
/// sqlx row mapper for SELECTs over `workflows`.
///
/// Mirrors the column list used by the workflow queries in this file; the
/// dispatch bookkeeping columns (`needs_dispatch`, `dispatch_claimed_by`,
/// `dispatch_last_heartbeat`) are not selected and so are not present here.
/// All `*_at` fields are epoch seconds (DOUBLE PRECISION in the schema).
#[derive(sqlx::FromRow)]
struct PgWorkflowRow {
    id: String,
    namespace: String,
    run_id: String,
    workflow_type: String,
    task_queue: String,
    status: String,
    input: Option<String>,
    result: Option<String>,
    error: Option<String>,
    parent_id: Option<String>,
    claimed_by: Option<String>,
    created_at: f64,
    updated_at: f64,
    completed_at: Option<f64>,
}
1018
1019impl From<PgWorkflowRow> for WorkflowRecord {
1020    fn from(r: PgWorkflowRow) -> Self {
1021        Self {
1022            id: r.id,
1023            namespace: r.namespace,
1024            run_id: r.run_id,
1025            workflow_type: r.workflow_type,
1026            task_queue: r.task_queue,
1027            status: r.status,
1028            input: r.input,
1029            result: r.result,
1030            error: r.error,
1031            parent_id: r.parent_id,
1032            claimed_by: r.claimed_by,
1033            created_at: r.created_at,
1034            updated_at: r.updated_at,
1035            completed_at: r.completed_at,
1036        }
1037    }
1038}
1039
/// sqlx row mapper for `workflow_events`.
///
/// `id` is the BIGSERIAL primary key, `seq` the per-workflow event index,
/// and `timestamp` epoch seconds (DOUBLE PRECISION in the schema).
#[derive(sqlx::FromRow)]
struct PgEventRow {
    id: i64,
    workflow_id: String,
    seq: i32,
    event_type: String,
    payload: Option<String>,
    timestamp: f64,
}
1049
1050impl From<PgEventRow> for WorkflowEvent {
1051    fn from(r: PgEventRow) -> Self {
1052        Self {
1053            id: Some(r.id),
1054            workflow_id: r.workflow_id,
1055            seq: r.seq,
1056            event_type: r.event_type,
1057            payload: r.payload,
1058            timestamp: r.timestamp,
1059        }
1060    }
1061}
1062
/// sqlx row mapper for `workflow_activities`, including the retry-policy
/// columns (`attempt`, `max_attempts`, backoff settings) and heartbeat
/// bookkeeping. All `*_at` / `last_heartbeat` fields are epoch seconds.
#[derive(sqlx::FromRow)]
struct PgActivityRow {
    id: i64,
    workflow_id: String,
    seq: i32,
    name: String,
    task_queue: String,
    input: Option<String>,
    status: String,
    result: Option<String>,
    error: Option<String>,
    attempt: i32,
    max_attempts: i32,
    initial_interval_secs: f64,
    backoff_coefficient: f64,
    start_to_close_secs: f64,
    heartbeat_timeout_secs: Option<f64>,
    claimed_by: Option<String>,
    scheduled_at: f64,
    started_at: Option<f64>,
    completed_at: Option<f64>,
    last_heartbeat: Option<f64>,
}
1086
1087impl From<PgActivityRow> for WorkflowActivity {
1088    fn from(r: PgActivityRow) -> Self {
1089        Self {
1090            id: Some(r.id),
1091            workflow_id: r.workflow_id,
1092            seq: r.seq,
1093            name: r.name,
1094            task_queue: r.task_queue,
1095            input: r.input,
1096            status: r.status,
1097            result: r.result,
1098            error: r.error,
1099            attempt: r.attempt,
1100            max_attempts: r.max_attempts,
1101            initial_interval_secs: r.initial_interval_secs,
1102            backoff_coefficient: r.backoff_coefficient,
1103            start_to_close_secs: r.start_to_close_secs,
1104            heartbeat_timeout_secs: r.heartbeat_timeout_secs,
1105            claimed_by: r.claimed_by,
1106            scheduled_at: r.scheduled_at,
1107            started_at: r.started_at,
1108            completed_at: r.completed_at,
1109            last_heartbeat: r.last_heartbeat,
1110        }
1111    }
1112}
1113
/// sqlx row mapper for workflow timers: `fire_at` is the epoch-seconds
/// deadline, `fired` whether the timer has already been delivered.
#[derive(sqlx::FromRow)]
struct PgTimerRow {
    id: i64,
    workflow_id: String,
    seq: i32,
    fire_at: f64,
    fired: bool,
}
1122
1123impl From<PgTimerRow> for WorkflowTimer {
1124    fn from(r: PgTimerRow) -> Self {
1125        Self {
1126            id: Some(r.id),
1127            workflow_id: r.workflow_id,
1128            seq: r.seq,
1129            fire_at: r.fire_at,
1130            fired: r.fired,
1131        }
1132    }
1133}
1134
/// sqlx row mapper for workflow signals: `consumed` marks whether the
/// signal has been taken by the workflow; `received_at` is epoch seconds.
#[derive(sqlx::FromRow)]
struct PgSignalRow {
    id: i64,
    workflow_id: String,
    name: String,
    payload: Option<String>,
    consumed: bool,
    received_at: f64,
}
1144
1145impl From<PgSignalRow> for WorkflowSignal {
1146    fn from(r: PgSignalRow) -> Self {
1147        Self {
1148            id: Some(r.id),
1149            workflow_id: r.workflow_id,
1150            name: r.name,
1151            payload: r.payload,
1152            consumed: r.consumed,
1153            received_at: r.received_at,
1154        }
1155    }
1156}
1157
/// sqlx row mapper for `workflow_schedules`, keyed by (namespace, name).
///
/// `last_run_at` / `next_run_at` are epoch seconds and are `None` until the
/// schedule has run / been planned.
#[derive(sqlx::FromRow)]
struct PgScheduleRow {
    namespace: String,
    name: String,
    workflow_type: String,
    cron_expr: String,
    input: Option<String>,
    task_queue: String,
    overlap_policy: String,
    paused: bool,
    last_run_at: Option<f64>,
    next_run_at: Option<f64>,
    last_workflow_id: Option<String>,
    created_at: f64,
}
1173
1174impl From<PgScheduleRow> for WorkflowSchedule {
1175    fn from(r: PgScheduleRow) -> Self {
1176        Self {
1177            namespace: r.namespace,
1178            name: r.name,
1179            workflow_type: r.workflow_type,
1180            cron_expr: r.cron_expr,
1181            input: r.input,
1182            task_queue: r.task_queue,
1183            overlap_policy: r.overlap_policy,
1184            paused: r.paused,
1185            last_run_at: r.last_run_at,
1186            next_run_at: r.next_run_at,
1187            last_workflow_id: r.last_workflow_id,
1188            created_at: r.created_at,
1189        }
1190    }
1191}
1192
/// sqlx row mapper for `workflow_workers`.
///
/// `workflows` / `activities` are stored as optional text — presumably
/// serialized capability lists; confirm the format with the writers of
/// `register_worker`. Heartbeat and registration times are epoch seconds.
#[derive(sqlx::FromRow)]
struct PgWorkerRow {
    id: String,
    namespace: String,
    identity: String,
    task_queue: String,
    workflows: Option<String>,
    activities: Option<String>,
    max_concurrent_workflows: i32,
    max_concurrent_activities: i32,
    active_tasks: i32,
    last_heartbeat: f64,
    registered_at: f64,
}
1207
1208impl From<PgWorkerRow> for WorkflowWorker {
1209    fn from(r: PgWorkerRow) -> Self {
1210        Self {
1211            id: r.id,
1212            namespace: r.namespace,
1213            identity: r.identity,
1214            task_queue: r.task_queue,
1215            workflows: r.workflows,
1216            activities: r.activities,
1217            max_concurrent_workflows: r.max_concurrent_workflows,
1218            max_concurrent_activities: r.max_concurrent_activities,
1219            active_tasks: r.active_tasks,
1220            last_heartbeat: r.last_heartbeat,
1221            registered_at: r.registered_at,
1222        }
1223    }
1224}