// assay_workflow/store/postgres.rs
1use anyhow::Result;
2use sqlx::PgPool;
3
4use crate::store::WorkflowStore;
5use crate::types::*;
6
/// Baseline Postgres schema, applied statement-by-statement at startup by
/// `PostgresStore::migrate`. Every statement is idempotent (`IF NOT EXISTS`
/// / `ON CONFLICT DO NOTHING`), so re-running against an existing database
/// is a no-op. The script is split on `;` by [`sanitise_schema`], which
/// first strips pure-comment lines (see its doc comment for why).
const SCHEMA: &str = r#"
CREATE TABLE IF NOT EXISTS namespaces (
    name            TEXT PRIMARY KEY,
    created_at      DOUBLE PRECISION NOT NULL
);
INSERT INTO namespaces (name, created_at)
    VALUES ('main', EXTRACT(EPOCH FROM NOW()))
    ON CONFLICT DO NOTHING;

CREATE TABLE IF NOT EXISTS workflows (
    id              TEXT PRIMARY KEY,
    namespace       TEXT NOT NULL DEFAULT 'main',
    run_id          TEXT NOT NULL,
    workflow_type   TEXT NOT NULL,
    task_queue      TEXT NOT NULL DEFAULT 'main',
    status          TEXT NOT NULL DEFAULT 'PENDING',
    input           TEXT,
    result          TEXT,
    error           TEXT,
    parent_id       TEXT,
    claimed_by      TEXT,
    search_attributes TEXT,
    archived_at     DOUBLE PRECISION,
    archive_uri     TEXT,
    -- Workflow-task dispatch (Phase 9): see sqlite.rs for the full comment.
    needs_dispatch  BOOLEAN NOT NULL DEFAULT FALSE,
    dispatch_claimed_by    TEXT,
    dispatch_last_heartbeat DOUBLE PRECISION,
    created_at      DOUBLE PRECISION NOT NULL,
    updated_at      DOUBLE PRECISION NOT NULL,
    completed_at    DOUBLE PRECISION
);
CREATE INDEX IF NOT EXISTS idx_wf_status_queue ON workflows(status, task_queue);
CREATE INDEX IF NOT EXISTS idx_wf_namespace ON workflows(namespace);
CREATE INDEX IF NOT EXISTS idx_wf_dispatch ON workflows(task_queue, needs_dispatch, dispatch_claimed_by);

CREATE TABLE IF NOT EXISTS workflow_events (
    id              BIGSERIAL PRIMARY KEY,
    workflow_id     TEXT NOT NULL REFERENCES workflows(id),
    seq             INTEGER NOT NULL,
    event_type      TEXT NOT NULL,
    payload         TEXT,
    timestamp       DOUBLE PRECISION NOT NULL
);
CREATE INDEX IF NOT EXISTS idx_wf_events_lookup ON workflow_events(workflow_id, seq);

CREATE TABLE IF NOT EXISTS workflow_activities (
    id              BIGSERIAL PRIMARY KEY,
    workflow_id     TEXT NOT NULL REFERENCES workflows(id),
    seq             INTEGER NOT NULL,
    name            TEXT NOT NULL,
    task_queue      TEXT NOT NULL DEFAULT 'main',
    input           TEXT,
    status          TEXT NOT NULL DEFAULT 'PENDING',
    result          TEXT,
    error           TEXT,
    attempt         INTEGER NOT NULL DEFAULT 1,
    max_attempts    INTEGER NOT NULL DEFAULT 3,
    initial_interval_secs   DOUBLE PRECISION NOT NULL DEFAULT 1,
    backoff_coefficient     DOUBLE PRECISION NOT NULL DEFAULT 2,
    start_to_close_secs     DOUBLE PRECISION NOT NULL DEFAULT 300,
    heartbeat_timeout_secs  DOUBLE PRECISION,
    claimed_by      TEXT,
    scheduled_at    DOUBLE PRECISION NOT NULL,
    started_at      DOUBLE PRECISION,
    completed_at    DOUBLE PRECISION,
    last_heartbeat  DOUBLE PRECISION,
    UNIQUE (workflow_id, seq)
);
CREATE INDEX IF NOT EXISTS idx_wf_act_pending ON workflow_activities(task_queue, status, scheduled_at);

CREATE TABLE IF NOT EXISTS workflow_timers (
    id              BIGSERIAL PRIMARY KEY,
    workflow_id     TEXT NOT NULL REFERENCES workflows(id),
    seq             INTEGER NOT NULL,
    fire_at         DOUBLE PRECISION NOT NULL,
    fired           BOOLEAN NOT NULL DEFAULT FALSE,
    UNIQUE (workflow_id, seq)
);
CREATE INDEX IF NOT EXISTS idx_wf_timers_due ON workflow_timers(fire_at) WHERE fired = FALSE;

CREATE TABLE IF NOT EXISTS workflow_signals (
    id              BIGSERIAL PRIMARY KEY,
    workflow_id     TEXT NOT NULL REFERENCES workflows(id),
    name            TEXT NOT NULL,
    payload         TEXT,
    consumed        BOOLEAN NOT NULL DEFAULT FALSE,
    received_at     DOUBLE PRECISION NOT NULL
);
CREATE INDEX IF NOT EXISTS idx_wf_signals_lookup ON workflow_signals(workflow_id, name, consumed);

CREATE TABLE IF NOT EXISTS workflow_schedules (
    namespace       TEXT NOT NULL DEFAULT 'main',
    name            TEXT NOT NULL,
    workflow_type   TEXT NOT NULL,
    cron_expr       TEXT NOT NULL,
    timezone        TEXT NOT NULL DEFAULT 'UTC',
    input           TEXT,
    task_queue      TEXT NOT NULL DEFAULT 'main',
    overlap_policy  TEXT NOT NULL DEFAULT 'skip',
    paused          BOOLEAN NOT NULL DEFAULT FALSE,
    last_run_at     DOUBLE PRECISION,
    next_run_at     DOUBLE PRECISION,
    last_workflow_id TEXT,
    created_at      DOUBLE PRECISION NOT NULL,
    PRIMARY KEY (namespace, name)
);

CREATE TABLE IF NOT EXISTS workflow_workers (
    id              TEXT PRIMARY KEY,
    namespace       TEXT NOT NULL DEFAULT 'main',
    identity        TEXT NOT NULL,
    task_queue      TEXT NOT NULL,
    workflows       TEXT,
    activities      TEXT,
    max_concurrent_workflows  INTEGER NOT NULL DEFAULT 10,
    max_concurrent_activities INTEGER NOT NULL DEFAULT 10,
    active_tasks    INTEGER NOT NULL DEFAULT 0,
    last_heartbeat  DOUBLE PRECISION NOT NULL,
    registered_at   DOUBLE PRECISION NOT NULL
);

CREATE TABLE IF NOT EXISTS workflow_snapshots (
    workflow_id     TEXT NOT NULL REFERENCES workflows(id),
    event_seq       INTEGER NOT NULL,
    state_json      TEXT NOT NULL,
    created_at      DOUBLE PRECISION NOT NULL,
    PRIMARY KEY (workflow_id, event_seq)
);

CREATE TABLE IF NOT EXISTS api_keys (
    key_hash        TEXT PRIMARY KEY,
    prefix          TEXT NOT NULL,
    label           TEXT,
    created_at      DOUBLE PRECISION NOT NULL
);
CREATE INDEX IF NOT EXISTS idx_api_keys_prefix ON api_keys(prefix);

-- Future additive migrations go below this line. Postgres supports
-- `ADD COLUMN IF NOT EXISTS` natively, so the pattern is simply:
--
--   ALTER TABLE workflows ADD COLUMN IF NOT EXISTS some_new_field TEXT;
--
-- Idempotent across startups; fresh installs pick the column up from the
-- CREATE TABLE above so the ADD is a no-op. Currently no pending
-- migrations — baseline schema in CREATE TABLE statements above is the
-- source of truth through v0.11.3.
"#;
155
/// Split a Postgres DDL script into individual statements ready for `sqlx::query`.
///
/// Pure-comment lines (optional whitespace, then `--`) are dropped *before*
/// the script is split on `;`. Otherwise a semicolon inside a line comment
/// (e.g. `-- Idempotent across startups; fresh installs pick the column up`)
/// would cut the comment into fragments — one of which is bare prose that
/// Postgres tries to parse as SQL and rejects with
/// `syntax error at or near "<word>"`.
///
/// Only *pure-comment* lines are removed; a `--` that follows code on the
/// same line is left alone. That keeps string literals safe (they may
/// legally contain `--`) and stays correct if SCHEMA grows more prose.
fn sanitise_schema(schema: &str) -> Vec<String> {
    let code_only = schema
        .lines()
        .filter(|line| !line.trim_start().starts_with("--"))
        .collect::<Vec<_>>()
        .join("\n");

    code_only
        .split(';')
        .filter_map(|fragment| {
            let stmt = fragment.trim();
            if stmt.is_empty() {
                None
            } else {
                Some(stmt.to_string())
            }
        })
        .collect()
}
180
/// Postgres-backed workflow store. Holds a shared `sqlx` connection pool;
/// every method issues its queries through this pool.
pub struct PostgresStore {
    pool: PgPool,
}
184
185impl PostgresStore {
186    pub async fn new(url: &str) -> Result<Self> {
187        let pool = PgPool::connect(url).await?;
188        let store = Self { pool };
189        store.migrate().await?;
190        Ok(store)
191    }
192
193    async fn migrate(&self) -> Result<()> {
194        for statement in sanitise_schema(SCHEMA) {
195            sqlx::query(&statement).execute(&self.pool).await?;
196        }
197        Ok(())
198    }
199
200    /// Try to acquire pg_advisory_lock for leader election.
201    /// Returns true if this instance is the leader (scheduler should run).
202    pub async fn try_acquire_leader_lock(&self) -> Result<bool> {
203        let row: (bool,) =
204            sqlx::query_as("SELECT pg_try_advisory_lock(1)")
205                .fetch_one(&self.pool)
206                .await?;
207        Ok(row.0)
208    }
209}
210
211impl WorkflowStore for PostgresStore {
212    // ── Namespaces ─────────────────────────────────────────
213
214    async fn create_namespace(&self, name: &str) -> Result<()> {
215        sqlx::query("INSERT INTO namespaces (name, created_at) VALUES ($1, EXTRACT(EPOCH FROM NOW()))")
216            .bind(name)
217            .execute(&self.pool)
218            .await?;
219        Ok(())
220    }
221
222    async fn list_namespaces(&self) -> Result<Vec<crate::store::NamespaceRecord>> {
223        let rows = sqlx::query_as::<_, (String, f64)>(
224            "SELECT name, created_at FROM namespaces ORDER BY name",
225        )
226        .fetch_all(&self.pool)
227        .await?;
228        Ok(rows
229            .into_iter()
230            .map(|(name, created_at)| crate::store::NamespaceRecord { name, created_at })
231            .collect())
232    }
233
234    async fn delete_namespace(&self, name: &str) -> Result<bool> {
235        let res = sqlx::query("DELETE FROM namespaces WHERE name = $1 AND name != 'main'")
236            .bind(name)
237            .execute(&self.pool)
238            .await?;
239        Ok(res.rows_affected() > 0)
240    }
241
242    async fn get_namespace_stats(&self, namespace: &str) -> Result<crate::store::NamespaceStats> {
243        let total: (i64,) = sqlx::query_as("SELECT COUNT(*) FROM workflows WHERE namespace = $1")
244            .bind(namespace)
245            .fetch_one(&self.pool)
246            .await?;
247        let running: (i64,) = sqlx::query_as(
248            "SELECT COUNT(*) FROM workflows WHERE namespace = $1 AND status = 'RUNNING'",
249        )
250        .bind(namespace)
251        .fetch_one(&self.pool)
252        .await?;
253        let pending: (i64,) = sqlx::query_as(
254            "SELECT COUNT(*) FROM workflows WHERE namespace = $1 AND status = 'PENDING'",
255        )
256        .bind(namespace)
257        .fetch_one(&self.pool)
258        .await?;
259        let completed: (i64,) = sqlx::query_as(
260            "SELECT COUNT(*) FROM workflows WHERE namespace = $1 AND status = 'COMPLETED'",
261        )
262        .bind(namespace)
263        .fetch_one(&self.pool)
264        .await?;
265        let failed: (i64,) = sqlx::query_as(
266            "SELECT COUNT(*) FROM workflows WHERE namespace = $1 AND status = 'FAILED'",
267        )
268        .bind(namespace)
269        .fetch_one(&self.pool)
270        .await?;
271        let schedules: (i64,) =
272            sqlx::query_as("SELECT COUNT(*) FROM workflow_schedules WHERE namespace = $1")
273                .bind(namespace)
274                .fetch_one(&self.pool)
275                .await?;
276        let workers: (i64,) =
277            sqlx::query_as("SELECT COUNT(*) FROM workflow_workers WHERE namespace = $1")
278                .bind(namespace)
279                .fetch_one(&self.pool)
280                .await?;
281
282        Ok(crate::store::NamespaceStats {
283            namespace: namespace.to_string(),
284            total_workflows: total.0,
285            running: running.0,
286            pending: pending.0,
287            completed: completed.0,
288            failed: failed.0,
289            schedules: schedules.0,
290            workers: workers.0,
291        })
292    }
293
294    // ── Workflows ──────────────────────────────────────────
295
    /// Insert a brand-new workflow row.
    ///
    /// The dispatch bookkeeping columns (needs_dispatch, dispatch_claimed_by,
    /// dispatch_last_heartbeat) are not listed here, so they take their
    /// schema defaults (FALSE / NULL). Bind order must track the column list
    /// exactly — $1..$17 are positional.
    async fn create_workflow(&self, wf: &WorkflowRecord) -> Result<()> {
        sqlx::query(
            "INSERT INTO workflows (id, namespace, run_id, workflow_type, task_queue, status, input, result, error, parent_id, claimed_by, search_attributes, archived_at, archive_uri, created_at, updated_at, completed_at)
             VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16, $17)",
        )
        .bind(&wf.id)
        .bind(&wf.namespace)
        .bind(&wf.run_id)
        .bind(&wf.workflow_type)
        .bind(&wf.task_queue)
        .bind(&wf.status)
        .bind(&wf.input)
        .bind(&wf.result)
        .bind(&wf.error)
        .bind(&wf.parent_id)
        .bind(&wf.claimed_by)
        .bind(&wf.search_attributes)
        .bind(wf.archived_at)
        .bind(&wf.archive_uri)
        .bind(wf.created_at)
        .bind(wf.updated_at)
        .bind(wf.completed_at)
        .execute(&self.pool)
        .await?;
        Ok(())
    }
322
323    async fn get_workflow(&self, id: &str) -> Result<Option<WorkflowRecord>> {
324        let row = sqlx::query_as::<_, PgWorkflowRow>(
325            "SELECT id, namespace, run_id, workflow_type, task_queue, status, input, result, error, parent_id, claimed_by, search_attributes, archived_at, archive_uri, created_at, updated_at, completed_at FROM workflows WHERE id = $1",
326        )
327        .bind(id)
328        .fetch_optional(&self.pool)
329        .await?;
330        Ok(row.map(Into::into))
331    }
332
    /// List workflows in a namespace, newest first, with optional filters on
    /// status, workflow type, and search attributes.
    ///
    /// `search_attrs_filter` is a JSON-object string; each key/value pair
    /// becomes an equality predicate against `search_attributes` (stored as
    /// TEXT, cast to jsonb per query). A filter string that is not valid
    /// JSON, or not an object, is silently ignored rather than an error.
    async fn list_workflows(
        &self,
        namespace: &str,
        status: Option<WorkflowStatus>,
        workflow_type: Option<&str>,
        search_attrs_filter: Option<&str>,
        limit: i64,
        offset: i64,
    ) -> Result<Vec<WorkflowRecord>> {
        let status_str = status.map(|s| s.to_string());

        // Parse the filter into (key, value) pairs; bad input degrades to an
        // empty list, i.e. no extra predicates.
        let filter_pairs: Vec<(String, serde_json::Value)> = search_attrs_filter
            .and_then(|s| serde_json::from_str::<serde_json::Value>(s).ok())
            .and_then(|v| v.as_object().cloned())
            .map(|m| m.into_iter().collect())
            .unwrap_or_default();

        // Base query: the NULL-guarded $2/$3 predicates make status and
        // workflow_type optional without building separate SQL variants.
        let mut sql = String::from(
            "SELECT id, namespace, run_id, workflow_type, task_queue, status, input, result, error, parent_id, claimed_by, search_attributes, archived_at, archive_uri, created_at, updated_at, completed_at
             FROM workflows
             WHERE namespace = $1
               AND ($2::TEXT IS NULL OR status = $2)
               AND ($3::TEXT IS NULL OR workflow_type = $3)",
        );
        // Bind placeholders for the filter follow $3; next index is 4.
        // Placeholder numbers only — the keys/values themselves are bound
        // below, never interpolated, so this stays injection-safe.
        let mut idx = 4usize;
        for _ in &filter_pairs {
            sql.push_str(&format!(
                " AND (search_attributes::jsonb)->>${} = ${}",
                idx,
                idx + 1
            ));
            idx += 2;
        }
        sql.push_str(&format!(" ORDER BY created_at DESC LIMIT ${} OFFSET ${}", idx, idx + 1));

        // Bind in the same order the placeholders were numbered:
        // namespace, status, type, then each (key, value) pair, then
        // limit/offset last.
        let mut q = sqlx::query_as::<_, PgWorkflowRow>(&sql)
            .bind(namespace)
            .bind(&status_str)
            .bind(workflow_type);
        for (key, value) in &filter_pairs {
            q = q.bind(key.clone());
            // JSONB ->> always returns TEXT; compare by stringified value.
            let as_text = match value {
                serde_json::Value::String(s) => s.clone(),
                other => other.to_string(),
            };
            q = q.bind(as_text);
        }
        let rows = q.bind(limit).bind(offset).fetch_all(&self.pool).await?;
        Ok(rows.into_iter().map(Into::into).collect())
    }
385
386    async fn update_workflow_status(
387        &self,
388        id: &str,
389        status: WorkflowStatus,
390        result: Option<&str>,
391        error: Option<&str>,
392    ) -> Result<()> {
393        let now = timestamp_now();
394        let completed_at = if status.is_terminal() { Some(now) } else { None };
395        sqlx::query(
396            "UPDATE workflows SET status = $1, result = COALESCE($2, result), error = COALESCE($3, error), updated_at = $4, completed_at = COALESCE($5, completed_at) WHERE id = $6",
397        )
398        .bind(status.to_string())
399        .bind(result)
400        .bind(error)
401        .bind(now)
402        .bind(completed_at)
403        .bind(id)
404        .execute(&self.pool)
405        .await?;
406        Ok(())
407    }
408
409    async fn claim_workflow(&self, id: &str, worker_id: &str) -> Result<bool> {
410        let res = sqlx::query(
411            "UPDATE workflows SET claimed_by = $1, status = 'RUNNING', updated_at = $2 WHERE id = $3 AND claimed_by IS NULL",
412        )
413        .bind(worker_id)
414        .bind(timestamp_now())
415        .bind(id)
416        .execute(&self.pool)
417        .await?;
418        Ok(res.rows_affected() > 0)
419    }
420
421    async fn mark_workflow_dispatchable(&self, workflow_id: &str) -> Result<()> {
422        sqlx::query("UPDATE workflows SET needs_dispatch = TRUE WHERE id = $1")
423            .bind(workflow_id)
424            .execute(&self.pool)
425            .await?;
426        Ok(())
427    }
428
    /// Claim the next dispatchable workflow task on `task_queue` for
    /// `worker_id`, or `None` when nothing is ready.
    ///
    /// The inner SELECT picks the oldest (by updated_at) unclaimed,
    /// non-terminal, needs_dispatch row; FOR UPDATE SKIP LOCKED skips rows a
    /// concurrent replica is claiming. The outer UPDATE takes the lease
    /// (dispatch_claimed_by + heartbeat) and clears needs_dispatch in the
    /// same statement, so the whole claim is atomic.
    async fn claim_workflow_task(
        &self,
        task_queue: &str,
        worker_id: &str,
    ) -> Result<Option<WorkflowRecord>> {
        let now = timestamp_now();
        // Atomic claim with FOR UPDATE SKIP LOCKED so multiple engine
        // replicas don't fight over the same workflow task.
        let row = sqlx::query_as::<_, PgWorkflowRow>(
            "UPDATE workflows
             SET dispatch_claimed_by = $1, dispatch_last_heartbeat = $2, needs_dispatch = FALSE
             WHERE id = (
                SELECT id FROM workflows
                WHERE task_queue = $3
                  AND needs_dispatch = TRUE
                  AND dispatch_claimed_by IS NULL
                  AND status NOT IN ('COMPLETED', 'FAILED', 'CANCELLED', 'TIMED_OUT')
                ORDER BY updated_at ASC
                FOR UPDATE SKIP LOCKED
                LIMIT 1
             )
             RETURNING id, namespace, run_id, workflow_type, task_queue, status, input, result, error, parent_id, claimed_by, search_attributes, archived_at, archive_uri, created_at, updated_at, completed_at",
        )
        .bind(worker_id)
        .bind(now)
        .bind(task_queue)
        .fetch_optional(&self.pool)
        .await?;
        Ok(row.map(Into::into))
    }
459
460    async fn release_workflow_task(&self, workflow_id: &str, worker_id: &str) -> Result<()> {
461        sqlx::query(
462            "UPDATE workflows
463             SET dispatch_claimed_by = NULL, dispatch_last_heartbeat = NULL
464             WHERE id = $1 AND dispatch_claimed_by = $2",
465        )
466        .bind(workflow_id)
467        .bind(worker_id)
468        .execute(&self.pool)
469        .await?;
470        Ok(())
471    }
472
473    async fn release_stale_dispatch_leases(
474        &self,
475        now: f64,
476        timeout_secs: f64,
477    ) -> Result<u64> {
478        let res = sqlx::query(
479            "UPDATE workflows
480             SET dispatch_claimed_by = NULL,
481                 dispatch_last_heartbeat = NULL,
482                 needs_dispatch = TRUE
483             WHERE dispatch_claimed_by IS NOT NULL
484               AND ($1 - dispatch_last_heartbeat) > $2
485               AND status NOT IN ('COMPLETED', 'FAILED', 'CANCELLED', 'TIMED_OUT')",
486        )
487        .bind(now)
488        .bind(timeout_secs)
489        .execute(&self.pool)
490        .await?;
491        Ok(res.rows_affected())
492    }
493
494    // ── Events ─────────────────────────────────────────────
495
496    async fn append_event(&self, ev: &WorkflowEvent) -> Result<i64> {
497        let row: (i64,) = sqlx::query_as(
498            "INSERT INTO workflow_events (workflow_id, seq, event_type, payload, timestamp) VALUES ($1, $2, $3, $4, $5) RETURNING id",
499        )
500        .bind(&ev.workflow_id)
501        .bind(ev.seq)
502        .bind(&ev.event_type)
503        .bind(&ev.payload)
504        .bind(ev.timestamp)
505        .fetch_one(&self.pool)
506        .await?;
507        Ok(row.0)
508    }
509
510    async fn list_events(&self, workflow_id: &str) -> Result<Vec<WorkflowEvent>> {
511        let rows = sqlx::query_as::<_, PgEventRow>(
512            "SELECT id, workflow_id, seq, event_type, payload, timestamp FROM workflow_events WHERE workflow_id = $1 ORDER BY seq ASC",
513        )
514        .bind(workflow_id)
515        .fetch_all(&self.pool)
516        .await?;
517        Ok(rows.into_iter().map(Into::into).collect())
518    }
519
520    async fn get_event_count(&self, workflow_id: &str) -> Result<i64> {
521        let row: (i64,) =
522            sqlx::query_as("SELECT COUNT(*) FROM workflow_events WHERE workflow_id = $1")
523                .bind(workflow_id)
524                .fetch_one(&self.pool)
525                .await?;
526        Ok(row.0)
527    }
528
529    // ── Activities ──────────────────────────────────────────
530
    /// Insert a newly scheduled activity and return its BIGSERIAL id.
    ///
    /// result/error/claimed_by/started_at/completed_at/last_heartbeat are
    /// omitted from the column list and start NULL. Bind order must track
    /// the column list exactly — $1..$13 are positional.
    async fn create_activity(&self, act: &WorkflowActivity) -> Result<i64> {
        let row: (i64,) = sqlx::query_as(
            "INSERT INTO workflow_activities (workflow_id, seq, name, task_queue, input, status, attempt, max_attempts, initial_interval_secs, backoff_coefficient, start_to_close_secs, heartbeat_timeout_secs, scheduled_at)
             VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13) RETURNING id",
        )
        .bind(&act.workflow_id)
        .bind(act.seq)
        .bind(&act.name)
        .bind(&act.task_queue)
        .bind(&act.input)
        .bind(&act.status)
        .bind(act.attempt)
        .bind(act.max_attempts)
        .bind(act.initial_interval_secs)
        .bind(act.backoff_coefficient)
        .bind(act.start_to_close_secs)
        .bind(act.heartbeat_timeout_secs)
        .bind(act.scheduled_at)
        .fetch_one(&self.pool)
        .await?;
        Ok(row.0)
    }
553
554    async fn get_activity(&self, id: i64) -> Result<Option<WorkflowActivity>> {
555        let row = sqlx::query_as::<_, PgActivityRow>(
556            "SELECT id, workflow_id, seq, name, task_queue, input, status, result, error, attempt, max_attempts, initial_interval_secs, backoff_coefficient, start_to_close_secs, heartbeat_timeout_secs, claimed_by, scheduled_at, started_at, completed_at, last_heartbeat
557             FROM workflow_activities WHERE id = $1",
558        )
559        .bind(id)
560        .fetch_optional(&self.pool)
561        .await?;
562        Ok(row.map(Into::into))
563    }
564
565    async fn get_activity_by_workflow_seq(
566        &self,
567        workflow_id: &str,
568        seq: i32,
569    ) -> Result<Option<WorkflowActivity>> {
570        let row = sqlx::query_as::<_, PgActivityRow>(
571            "SELECT id, workflow_id, seq, name, task_queue, input, status, result, error, attempt, max_attempts, initial_interval_secs, backoff_coefficient, start_to_close_secs, heartbeat_timeout_secs, claimed_by, scheduled_at, started_at, completed_at, last_heartbeat
572             FROM workflow_activities WHERE workflow_id = $1 AND seq = $2",
573        )
574        .bind(workflow_id)
575        .bind(seq)
576        .fetch_optional(&self.pool)
577        .await?;
578        Ok(row.map(Into::into))
579    }
580
    /// Claim the oldest PENDING activity on `task_queue` for `worker_id`,
    /// marking it RUNNING, or return `None` if the queue is empty.
    ///
    /// The inner SELECT orders by scheduled_at; FOR UPDATE SKIP LOCKED skips
    /// rows being claimed by concurrent instances, and the outer UPDATE
    /// records the claim in the same atomic statement.
    async fn claim_activity(
        &self,
        task_queue: &str,
        worker_id: &str,
    ) -> Result<Option<WorkflowActivity>> {
        let now = timestamp_now();
        // Atomic claim using FOR UPDATE SKIP LOCKED — prevents contention
        // between multiple assay serve instances claiming the same activity
        let row = sqlx::query_as::<_, PgActivityRow>(
            "UPDATE workflow_activities SET status = 'RUNNING', claimed_by = $1, started_at = $2
             WHERE id = (
                SELECT id FROM workflow_activities
                WHERE task_queue = $3 AND status = 'PENDING'
                ORDER BY scheduled_at ASC
                FOR UPDATE SKIP LOCKED
                LIMIT 1
             )
             RETURNING id, workflow_id, seq, name, task_queue, input, status, result, error, attempt, max_attempts, initial_interval_secs, backoff_coefficient, start_to_close_secs, heartbeat_timeout_secs, claimed_by, scheduled_at, started_at, completed_at, last_heartbeat",
        )
        .bind(worker_id)
        .bind(now)
        .bind(task_queue)
        .fetch_optional(&self.pool)
        .await?;
        Ok(row.map(Into::into))
    }
607
608    async fn requeue_activity_for_retry(
609        &self,
610        id: i64,
611        next_attempt: i32,
612        next_scheduled_at: f64,
613    ) -> Result<()> {
614        sqlx::query(
615            "UPDATE workflow_activities
616             SET status = 'PENDING', attempt = $1, scheduled_at = $2,
617                 claimed_by = NULL, started_at = NULL, last_heartbeat = NULL,
618                 error = NULL
619             WHERE id = $3",
620        )
621        .bind(next_attempt)
622        .bind(next_scheduled_at)
623        .bind(id)
624        .execute(&self.pool)
625        .await?;
626        Ok(())
627    }
628
629    async fn complete_activity(
630        &self,
631        id: i64,
632        result: Option<&str>,
633        error: Option<&str>,
634        failed: bool,
635    ) -> Result<()> {
636        let status = if failed { "FAILED" } else { "COMPLETED" };
637        sqlx::query(
638            "UPDATE workflow_activities SET status = $1, result = $2, error = $3, completed_at = $4 WHERE id = $5",
639        )
640        .bind(status)
641        .bind(result)
642        .bind(error)
643        .bind(timestamp_now())
644        .bind(id)
645        .execute(&self.pool)
646        .await?;
647        Ok(())
648    }
649
650    async fn heartbeat_activity(&self, id: i64, _details: Option<&str>) -> Result<()> {
651        sqlx::query("UPDATE workflow_activities SET last_heartbeat = $1 WHERE id = $2")
652            .bind(timestamp_now())
653            .bind(id)
654            .execute(&self.pool)
655            .await?;
656        Ok(())
657    }
658
659    async fn get_timed_out_activities(&self, now: f64) -> Result<Vec<WorkflowActivity>> {
660        let rows = sqlx::query_as::<_, PgActivityRow>(
661            "SELECT id, workflow_id, seq, name, task_queue, input, status, result, error, attempt, max_attempts, initial_interval_secs, backoff_coefficient, start_to_close_secs, heartbeat_timeout_secs, claimed_by, scheduled_at, started_at, completed_at, last_heartbeat
662             FROM workflow_activities
663             WHERE status = 'RUNNING'
664               AND heartbeat_timeout_secs IS NOT NULL
665               AND last_heartbeat IS NOT NULL
666               AND ($1 - last_heartbeat) > heartbeat_timeout_secs",
667        )
668        .bind(now)
669        .fetch_all(&self.pool)
670        .await?;
671        Ok(rows.into_iter().map(Into::into).collect())
672    }
673
674    // ── Timers ──────────────────────────────────────────────
675
676    async fn create_timer(&self, timer: &WorkflowTimer) -> Result<i64> {
677        let row: (i64,) = sqlx::query_as(
678            "INSERT INTO workflow_timers (workflow_id, seq, fire_at, fired) VALUES ($1, $2, $3, FALSE) RETURNING id",
679        )
680        .bind(&timer.workflow_id)
681        .bind(timer.seq)
682        .bind(timer.fire_at)
683        .fetch_one(&self.pool)
684        .await?;
685        Ok(row.0)
686    }
687
688    async fn cancel_pending_activities(&self, workflow_id: &str) -> Result<u64> {
689        let res = sqlx::query(
690            "UPDATE workflow_activities SET status = 'CANCELLED', completed_at = $1
691             WHERE workflow_id = $2 AND status = 'PENDING'",
692        )
693        .bind(timestamp_now())
694        .bind(workflow_id)
695        .execute(&self.pool)
696        .await?;
697        Ok(res.rows_affected())
698    }
699
700    async fn cancel_pending_timers(&self, workflow_id: &str) -> Result<u64> {
701        let res = sqlx::query(
702            "UPDATE workflow_timers SET fired = TRUE
703             WHERE workflow_id = $1 AND fired = FALSE",
704        )
705        .bind(workflow_id)
706        .execute(&self.pool)
707        .await?;
708        Ok(res.rows_affected())
709    }
710
711    async fn get_timer_by_workflow_seq(
712        &self,
713        workflow_id: &str,
714        seq: i32,
715    ) -> Result<Option<WorkflowTimer>> {
716        let row = sqlx::query_as::<_, PgTimerRow>(
717            "SELECT id, workflow_id, seq, fire_at, fired
718             FROM workflow_timers WHERE workflow_id = $1 AND seq = $2",
719        )
720        .bind(workflow_id)
721        .bind(seq)
722        .fetch_optional(&self.pool)
723        .await?;
724        Ok(row.map(Into::into))
725    }
726
727    async fn fire_due_timers(&self, now: f64) -> Result<Vec<WorkflowTimer>> {
728        let rows = sqlx::query_as::<_, PgTimerRow>(
729            "UPDATE workflow_timers SET fired = TRUE
730             WHERE fired = FALSE AND fire_at <= $1
731             RETURNING id, workflow_id, seq, fire_at, fired",
732        )
733        .bind(now)
734        .fetch_all(&self.pool)
735        .await?;
736        Ok(rows.into_iter().map(Into::into).collect())
737    }
738
739    // ── Signals ─────────────────────────────────────────────
740
741    async fn send_signal(&self, sig: &WorkflowSignal) -> Result<i64> {
742        let row: (i64,) = sqlx::query_as(
743            "INSERT INTO workflow_signals (workflow_id, name, payload, consumed, received_at) VALUES ($1, $2, $3, FALSE, $4) RETURNING id",
744        )
745        .bind(&sig.workflow_id)
746        .bind(&sig.name)
747        .bind(&sig.payload)
748        .bind(sig.received_at)
749        .fetch_one(&self.pool)
750        .await?;
751        Ok(row.0)
752    }
753
754    async fn consume_signals(
755        &self,
756        workflow_id: &str,
757        name: &str,
758    ) -> Result<Vec<WorkflowSignal>> {
759        let rows = sqlx::query_as::<_, PgSignalRow>(
760            "UPDATE workflow_signals SET consumed = TRUE
761             WHERE workflow_id = $1 AND name = $2 AND consumed = FALSE
762             RETURNING id, workflow_id, name, payload, consumed, received_at",
763        )
764        .bind(workflow_id)
765        .bind(name)
766        .fetch_all(&self.pool)
767        .await?;
768        Ok(rows.into_iter().map(Into::into).collect())
769    }
770
771    // ── Schedules ───────────────────────────────────────────
772
    /// Insert a cron-style schedule row.
    ///
    /// Fails on a duplicate (namespace, name) primary key. Bind order must
    /// track the column list exactly — $1..$13 are positional.
    async fn create_schedule(&self, sched: &WorkflowSchedule) -> Result<()> {
        sqlx::query(
            "INSERT INTO workflow_schedules (namespace, name, workflow_type, cron_expr, timezone, input, task_queue, overlap_policy, paused, last_run_at, next_run_at, last_workflow_id, created_at)
             VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13)",
        )
        .bind(&sched.namespace)
        .bind(&sched.name)
        .bind(&sched.workflow_type)
        .bind(&sched.cron_expr)
        .bind(&sched.timezone)
        .bind(&sched.input)
        .bind(&sched.task_queue)
        .bind(&sched.overlap_policy)
        .bind(sched.paused)
        .bind(sched.last_run_at)
        .bind(sched.next_run_at)
        .bind(&sched.last_workflow_id)
        .bind(sched.created_at)
        .execute(&self.pool)
        .await?;
        Ok(())
    }
795
796    async fn get_schedule(&self, namespace: &str, name: &str) -> Result<Option<WorkflowSchedule>> {
797        let row = sqlx::query_as::<_, PgScheduleRow>(
798            "SELECT namespace, name, workflow_type, cron_expr, timezone, input, task_queue, overlap_policy, paused, last_run_at, next_run_at, last_workflow_id, created_at FROM workflow_schedules WHERE namespace = $1 AND name = $2",
799        )
800        .bind(namespace)
801        .bind(name)
802        .fetch_optional(&self.pool)
803        .await?;
804        Ok(row.map(Into::into))
805    }
806
807    async fn list_schedules(&self, namespace: &str) -> Result<Vec<WorkflowSchedule>> {
808        let rows = sqlx::query_as::<_, PgScheduleRow>(
809            "SELECT namespace, name, workflow_type, cron_expr, timezone, input, task_queue, overlap_policy, paused, last_run_at, next_run_at, last_workflow_id, created_at FROM workflow_schedules WHERE namespace = $1 ORDER BY name",
810        )
811        .bind(namespace)
812        .fetch_all(&self.pool)
813        .await?;
814        Ok(rows.into_iter().map(Into::into).collect())
815    }
816
817    async fn update_schedule_last_run(
818        &self,
819        namespace: &str,
820        name: &str,
821        last_run_at: f64,
822        next_run_at: f64,
823        workflow_id: &str,
824    ) -> Result<()> {
825        sqlx::query(
826            "UPDATE workflow_schedules SET last_run_at = $1, next_run_at = $2, last_workflow_id = $3 WHERE namespace = $4 AND name = $5",
827        )
828        .bind(last_run_at)
829        .bind(next_run_at)
830        .bind(workflow_id)
831        .bind(namespace)
832        .bind(name)
833        .execute(&self.pool)
834        .await?;
835        Ok(())
836    }
837
838    async fn delete_schedule(&self, namespace: &str, name: &str) -> Result<bool> {
839        let res = sqlx::query("DELETE FROM workflow_schedules WHERE namespace = $1 AND name = $2")
840            .bind(namespace)
841            .bind(name)
842            .execute(&self.pool)
843            .await?;
844        Ok(res.rows_affected() > 0)
845    }
846
847    async fn list_archivable_workflows(
848        &self,
849        cutoff: f64,
850        limit: i64,
851    ) -> Result<Vec<WorkflowRecord>> {
852        let rows = sqlx::query_as::<_, PgWorkflowRow>(
853            "SELECT id, namespace, run_id, workflow_type, task_queue, status, input, result, error, parent_id, claimed_by, search_attributes, archived_at, archive_uri, created_at, updated_at, completed_at
854             FROM workflows
855             WHERE status IN ('COMPLETED', 'FAILED', 'CANCELLED', 'TIMED_OUT')
856               AND completed_at IS NOT NULL
857               AND completed_at < $1
858               AND archived_at IS NULL
859             ORDER BY completed_at ASC
860             LIMIT $2",
861        )
862        .bind(cutoff)
863        .bind(limit)
864        .fetch_all(&self.pool)
865        .await?;
866        Ok(rows.into_iter().map(Into::into).collect())
867    }
868
869    async fn mark_archived_and_purge(
870        &self,
871        workflow_id: &str,
872        archive_uri: &str,
873        archived_at: f64,
874    ) -> Result<()> {
875        let mut tx = self.pool.begin().await?;
876        sqlx::query("DELETE FROM workflow_events WHERE workflow_id = $1")
877            .bind(workflow_id)
878            .execute(&mut *tx)
879            .await?;
880        sqlx::query("DELETE FROM workflow_activities WHERE workflow_id = $1")
881            .bind(workflow_id)
882            .execute(&mut *tx)
883            .await?;
884        sqlx::query("DELETE FROM workflow_timers WHERE workflow_id = $1")
885            .bind(workflow_id)
886            .execute(&mut *tx)
887            .await?;
888        sqlx::query("DELETE FROM workflow_signals WHERE workflow_id = $1")
889            .bind(workflow_id)
890            .execute(&mut *tx)
891            .await?;
892        sqlx::query("DELETE FROM workflow_snapshots WHERE workflow_id = $1")
893            .bind(workflow_id)
894            .execute(&mut *tx)
895            .await?;
896        sqlx::query(
897            "UPDATE workflows SET archived_at = $1, archive_uri = $2 WHERE id = $3",
898        )
899        .bind(archived_at)
900        .bind(archive_uri)
901        .bind(workflow_id)
902        .execute(&mut *tx)
903        .await?;
904        tx.commit().await?;
905        Ok(())
906    }
907
908    async fn upsert_search_attributes(
909        &self,
910        workflow_id: &str,
911        patch_json: &str,
912    ) -> Result<()> {
913        let current: Option<(Option<String>,)> =
914            sqlx::query_as("SELECT search_attributes FROM workflows WHERE id = $1")
915                .bind(workflow_id)
916                .fetch_optional(&self.pool)
917                .await?;
918        let merged = crate::store::sqlite::merge_search_attrs(
919            current.and_then(|(s,)| s).as_deref(),
920            patch_json,
921        )?;
922        sqlx::query("UPDATE workflows SET search_attributes = $1 WHERE id = $2")
923            .bind(merged)
924            .bind(workflow_id)
925            .execute(&self.pool)
926            .await?;
927        Ok(())
928    }
929
930    async fn update_schedule(
931        &self,
932        namespace: &str,
933        name: &str,
934        patch: &SchedulePatch,
935    ) -> Result<Option<WorkflowSchedule>> {
936        let mut sets: Vec<String> = Vec::new();
937        let mut idx = 1usize;
938        if patch.cron_expr.is_some() {
939            sets.push(format!("cron_expr = ${idx}"));
940            idx += 1;
941        }
942        if patch.timezone.is_some() {
943            sets.push(format!("timezone = ${idx}"));
944            idx += 1;
945        }
946        if patch.input.is_some() {
947            sets.push(format!("input = ${idx}"));
948            idx += 1;
949        }
950        if patch.task_queue.is_some() {
951            sets.push(format!("task_queue = ${idx}"));
952            idx += 1;
953        }
954        if patch.overlap_policy.is_some() {
955            sets.push(format!("overlap_policy = ${idx}"));
956            idx += 1;
957        }
958        if sets.is_empty() {
959            return self.get_schedule(namespace, name).await;
960        }
961        let sql = format!(
962            "UPDATE workflow_schedules SET {} WHERE namespace = ${} AND name = ${}",
963            sets.join(", "),
964            idx,
965            idx + 1
966        );
967        let mut q = sqlx::query(&sql);
968        if let Some(ref v) = patch.cron_expr {
969            q = q.bind(v);
970        }
971        if let Some(ref v) = patch.timezone {
972            q = q.bind(v);
973        }
974        if let Some(ref v) = patch.input {
975            q = q.bind(v.to_string());
976        }
977        if let Some(ref v) = patch.task_queue {
978            q = q.bind(v);
979        }
980        if let Some(ref v) = patch.overlap_policy {
981            q = q.bind(v);
982        }
983        let res = q
984            .bind(namespace)
985            .bind(name)
986            .execute(&self.pool)
987            .await?;
988        if res.rows_affected() == 0 {
989            return Ok(None);
990        }
991        self.get_schedule(namespace, name).await
992    }
993
994    async fn set_schedule_paused(
995        &self,
996        namespace: &str,
997        name: &str,
998        paused: bool,
999    ) -> Result<Option<WorkflowSchedule>> {
1000        let res = sqlx::query(
1001            "UPDATE workflow_schedules SET paused = $1 WHERE namespace = $2 AND name = $3",
1002        )
1003        .bind(paused)
1004        .bind(namespace)
1005        .bind(name)
1006        .execute(&self.pool)
1007        .await?;
1008        if res.rows_affected() == 0 {
1009            return Ok(None);
1010        }
1011        self.get_schedule(namespace, name).await
1012    }
1013
1014    // ── Workers ─────────────────────────────────────────────
1015
1016    async fn register_worker(&self, w: &WorkflowWorker) -> Result<()> {
1017        sqlx::query(
1018            "INSERT INTO workflow_workers (id, namespace, identity, task_queue, workflows, activities, max_concurrent_workflows, max_concurrent_activities, active_tasks, last_heartbeat, registered_at)
1019             VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11)
1020             ON CONFLICT (id) DO UPDATE SET last_heartbeat = EXCLUDED.last_heartbeat, identity = EXCLUDED.identity",
1021        )
1022        .bind(&w.id)
1023        .bind(&w.namespace)
1024        .bind(&w.identity)
1025        .bind(&w.task_queue)
1026        .bind(&w.workflows)
1027        .bind(&w.activities)
1028        .bind(w.max_concurrent_workflows)
1029        .bind(w.max_concurrent_activities)
1030        .bind(w.active_tasks)
1031        .bind(w.last_heartbeat)
1032        .bind(w.registered_at)
1033        .execute(&self.pool)
1034        .await?;
1035        Ok(())
1036    }
1037
1038    async fn heartbeat_worker(&self, id: &str, now: f64) -> Result<()> {
1039        sqlx::query("UPDATE workflow_workers SET last_heartbeat = $1 WHERE id = $2")
1040            .bind(now)
1041            .bind(id)
1042            .execute(&self.pool)
1043            .await?;
1044        Ok(())
1045    }
1046
1047    async fn list_workers(&self, namespace: &str) -> Result<Vec<WorkflowWorker>> {
1048        let rows = sqlx::query_as::<_, PgWorkerRow>(
1049            "SELECT id, namespace, identity, task_queue, workflows, activities, max_concurrent_workflows, max_concurrent_activities, active_tasks, last_heartbeat, registered_at FROM workflow_workers WHERE namespace = $1 ORDER BY registered_at",
1050        )
1051        .bind(namespace)
1052        .fetch_all(&self.pool)
1053        .await?;
1054        Ok(rows.into_iter().map(Into::into).collect())
1055    }
1056
1057    async fn remove_dead_workers(&self, cutoff: f64) -> Result<Vec<String>> {
1058        let rows: Vec<(String,)> =
1059            sqlx::query_as("SELECT id FROM workflow_workers WHERE last_heartbeat < $1")
1060                .bind(cutoff)
1061                .fetch_all(&self.pool)
1062                .await?;
1063        let ids: Vec<String> = rows.into_iter().map(|r| r.0).collect();
1064        if !ids.is_empty() {
1065            sqlx::query("DELETE FROM workflow_workers WHERE last_heartbeat < $1")
1066                .bind(cutoff)
1067                .execute(&self.pool)
1068                .await?;
1069        }
1070        Ok(ids)
1071    }
1072
1073    // ── API Keys ────────────────────────────────────────────
1074
1075    async fn create_api_key(
1076        &self,
1077        key_hash: &str,
1078        prefix: &str,
1079        label: Option<&str>,
1080        created_at: f64,
1081    ) -> Result<()> {
1082        sqlx::query("INSERT INTO api_keys (key_hash, prefix, label, created_at) VALUES ($1, $2, $3, $4)")
1083            .bind(key_hash)
1084            .bind(prefix)
1085            .bind(label)
1086            .bind(created_at)
1087            .execute(&self.pool)
1088            .await?;
1089        Ok(())
1090    }
1091
1092    async fn validate_api_key(&self, key_hash: &str) -> Result<bool> {
1093        let row: Option<(i64,)> =
1094            sqlx::query_as("SELECT 1::BIGINT FROM api_keys WHERE key_hash = $1")
1095                .bind(key_hash)
1096                .fetch_optional(&self.pool)
1097                .await?;
1098        Ok(row.is_some())
1099    }
1100
1101    async fn list_api_keys(&self) -> Result<Vec<crate::store::ApiKeyRecord>> {
1102        let rows = sqlx::query_as::<_, (String, Option<String>, f64)>(
1103            "SELECT prefix, label, created_at FROM api_keys ORDER BY created_at DESC",
1104        )
1105        .fetch_all(&self.pool)
1106        .await?;
1107        Ok(rows
1108            .into_iter()
1109            .map(|(prefix, label, created_at)| crate::store::ApiKeyRecord {
1110                prefix,
1111                label,
1112                created_at,
1113            })
1114            .collect())
1115    }
1116
1117    async fn revoke_api_key(&self, prefix: &str) -> Result<bool> {
1118        let res = sqlx::query("DELETE FROM api_keys WHERE prefix = $1")
1119            .bind(prefix)
1120            .execute(&self.pool)
1121            .await?;
1122        Ok(res.rows_affected() > 0)
1123    }
1124
1125    async fn api_keys_empty(&self) -> Result<bool> {
1126        let row: (i64,) = sqlx::query_as("SELECT COUNT(*) FROM api_keys")
1127            .fetch_one(&self.pool)
1128            .await?;
1129        Ok(row.0 == 0)
1130    }
1131
1132    async fn get_api_key_by_label(
1133        &self,
1134        label: &str,
1135    ) -> Result<Option<crate::store::ApiKeyRecord>> {
1136        let row: Option<(String, Option<String>, f64)> = sqlx::query_as(
1137            "SELECT prefix, label, created_at FROM api_keys WHERE label = $1 LIMIT 1",
1138        )
1139        .bind(label)
1140        .fetch_optional(&self.pool)
1141        .await?;
1142        Ok(row.map(|(prefix, label, created_at)| crate::store::ApiKeyRecord {
1143            prefix,
1144            label,
1145            created_at,
1146        }))
1147    }
1148
1149    // ── Child Workflows ─────────────────────────────────────
1150
1151    async fn list_child_workflows(&self, parent_id: &str) -> Result<Vec<WorkflowRecord>> {
1152        let rows = sqlx::query_as::<_, PgWorkflowRow>(
1153            "SELECT id, namespace, run_id, workflow_type, task_queue, status, input, result, error, parent_id, claimed_by, search_attributes, archived_at, archive_uri, created_at, updated_at, completed_at
1154             FROM workflows WHERE parent_id = $1 ORDER BY created_at ASC",
1155        )
1156        .bind(parent_id)
1157        .fetch_all(&self.pool)
1158        .await?;
1159        Ok(rows.into_iter().map(Into::into).collect())
1160    }
1161
1162    // ── Snapshots ───────────────────────────────────────────
1163
1164    async fn create_snapshot(
1165        &self,
1166        workflow_id: &str,
1167        event_seq: i32,
1168        state_json: &str,
1169    ) -> Result<()> {
1170        sqlx::query(
1171            "INSERT INTO workflow_snapshots (workflow_id, event_seq, state_json, created_at)
1172             VALUES ($1, $2, $3, $4)
1173             ON CONFLICT (workflow_id, event_seq) DO UPDATE SET state_json = EXCLUDED.state_json, created_at = EXCLUDED.created_at",
1174        )
1175        .bind(workflow_id)
1176        .bind(event_seq)
1177        .bind(state_json)
1178        .bind(timestamp_now())
1179        .execute(&self.pool)
1180        .await?;
1181        Ok(())
1182    }
1183
1184    async fn get_latest_snapshot(
1185        &self,
1186        workflow_id: &str,
1187    ) -> Result<Option<WorkflowSnapshot>> {
1188        let row = sqlx::query_as::<_, (String, i32, String, f64)>(
1189            "SELECT workflow_id, event_seq, state_json, created_at
1190             FROM workflow_snapshots WHERE workflow_id = $1
1191             ORDER BY event_seq DESC LIMIT 1",
1192        )
1193        .bind(workflow_id)
1194        .fetch_optional(&self.pool)
1195        .await?;
1196
1197        Ok(row.map(|(workflow_id, event_seq, state_json, created_at)| WorkflowSnapshot {
1198            workflow_id,
1199            event_seq,
1200            state_json,
1201            created_at,
1202        }))
1203    }
1204
1205    // ── Queue Stats ─────────────────────────────────────────
1206
1207    async fn get_queue_stats(&self, namespace: &str) -> Result<Vec<crate::store::QueueStats>> {
1208        let rows = sqlx::query_as::<_, (String, i64, i64, i64)>(
1209            "SELECT
1210                a.task_queue AS queue,
1211                SUM(CASE WHEN a.status = 'PENDING' THEN 1 ELSE 0 END) AS pending,
1212                SUM(CASE WHEN a.status = 'RUNNING' THEN 1 ELSE 0 END) AS running,
1213                (SELECT COUNT(*) FROM workflow_workers w WHERE w.task_queue = a.task_queue AND w.namespace = $1) AS workers
1214             FROM workflow_activities a
1215             JOIN workflows wf ON a.workflow_id = wf.id AND wf.namespace = $1
1216             GROUP BY a.task_queue",
1217        )
1218        .bind(namespace)
1219        .fetch_all(&self.pool)
1220        .await?;
1221
1222        Ok(rows
1223            .into_iter()
1224            .map(|(queue, pending, running, workers)| crate::store::QueueStats {
1225                queue,
1226                pending_activities: pending,
1227                running_activities: running,
1228                workers,
1229            })
1230            .collect())
1231    }
1232
1233    // ── Leader Election ─────────────────────────────────────
1234
1235    async fn try_acquire_scheduler_lock(&self) -> Result<bool> {
1236        // pg_try_advisory_lock is session-scoped — only one connection
1237        // in the pool will hold the lock. In a multi-replica Kubernetes
1238        // deployment, only one pod's connection wins.
1239        let row: (bool,) =
1240            sqlx::query_as("SELECT pg_try_advisory_lock(42)")
1241                .fetch_one(&self.pool)
1242                .await?;
1243        Ok(row.0)
1244    }
1245}
1246
1247fn timestamp_now() -> f64 {
1248    std::time::SystemTime::now()
1249        .duration_since(std::time::UNIX_EPOCH)
1250        .unwrap()
1251        .as_secs_f64()
1252}
1253
1254// ── Postgres row types (sqlx::FromRow) ──────────────────────
1255
1256#[derive(sqlx::FromRow)]
1257struct PgWorkflowRow {
1258    id: String,
1259    namespace: String,
1260    run_id: String,
1261    workflow_type: String,
1262    task_queue: String,
1263    status: String,
1264    input: Option<String>,
1265    result: Option<String>,
1266    error: Option<String>,
1267    parent_id: Option<String>,
1268    claimed_by: Option<String>,
1269    search_attributes: Option<String>,
1270    archived_at: Option<f64>,
1271    archive_uri: Option<String>,
1272    created_at: f64,
1273    updated_at: f64,
1274    completed_at: Option<f64>,
1275}
1276
1277impl From<PgWorkflowRow> for WorkflowRecord {
1278    fn from(r: PgWorkflowRow) -> Self {
1279        Self {
1280            id: r.id,
1281            namespace: r.namespace,
1282            run_id: r.run_id,
1283            workflow_type: r.workflow_type,
1284            task_queue: r.task_queue,
1285            status: r.status,
1286            input: r.input,
1287            result: r.result,
1288            error: r.error,
1289            parent_id: r.parent_id,
1290            claimed_by: r.claimed_by,
1291            search_attributes: r.search_attributes,
1292            archived_at: r.archived_at,
1293            archive_uri: r.archive_uri,
1294            created_at: r.created_at,
1295            updated_at: r.updated_at,
1296            completed_at: r.completed_at,
1297        }
1298    }
1299}
1300
1301#[derive(sqlx::FromRow)]
1302struct PgEventRow {
1303    id: i64,
1304    workflow_id: String,
1305    seq: i32,
1306    event_type: String,
1307    payload: Option<String>,
1308    timestamp: f64,
1309}
1310
1311impl From<PgEventRow> for WorkflowEvent {
1312    fn from(r: PgEventRow) -> Self {
1313        Self {
1314            id: Some(r.id),
1315            workflow_id: r.workflow_id,
1316            seq: r.seq,
1317            event_type: r.event_type,
1318            payload: r.payload,
1319            timestamp: r.timestamp,
1320        }
1321    }
1322}
1323
1324#[derive(sqlx::FromRow)]
1325struct PgActivityRow {
1326    id: i64,
1327    workflow_id: String,
1328    seq: i32,
1329    name: String,
1330    task_queue: String,
1331    input: Option<String>,
1332    status: String,
1333    result: Option<String>,
1334    error: Option<String>,
1335    attempt: i32,
1336    max_attempts: i32,
1337    initial_interval_secs: f64,
1338    backoff_coefficient: f64,
1339    start_to_close_secs: f64,
1340    heartbeat_timeout_secs: Option<f64>,
1341    claimed_by: Option<String>,
1342    scheduled_at: f64,
1343    started_at: Option<f64>,
1344    completed_at: Option<f64>,
1345    last_heartbeat: Option<f64>,
1346}
1347
1348impl From<PgActivityRow> for WorkflowActivity {
1349    fn from(r: PgActivityRow) -> Self {
1350        Self {
1351            id: Some(r.id),
1352            workflow_id: r.workflow_id,
1353            seq: r.seq,
1354            name: r.name,
1355            task_queue: r.task_queue,
1356            input: r.input,
1357            status: r.status,
1358            result: r.result,
1359            error: r.error,
1360            attempt: r.attempt,
1361            max_attempts: r.max_attempts,
1362            initial_interval_secs: r.initial_interval_secs,
1363            backoff_coefficient: r.backoff_coefficient,
1364            start_to_close_secs: r.start_to_close_secs,
1365            heartbeat_timeout_secs: r.heartbeat_timeout_secs,
1366            claimed_by: r.claimed_by,
1367            scheduled_at: r.scheduled_at,
1368            started_at: r.started_at,
1369            completed_at: r.completed_at,
1370            last_heartbeat: r.last_heartbeat,
1371        }
1372    }
1373}
1374
1375#[derive(sqlx::FromRow)]
1376struct PgTimerRow {
1377    id: i64,
1378    workflow_id: String,
1379    seq: i32,
1380    fire_at: f64,
1381    fired: bool,
1382}
1383
1384impl From<PgTimerRow> for WorkflowTimer {
1385    fn from(r: PgTimerRow) -> Self {
1386        Self {
1387            id: Some(r.id),
1388            workflow_id: r.workflow_id,
1389            seq: r.seq,
1390            fire_at: r.fire_at,
1391            fired: r.fired,
1392        }
1393    }
1394}
1395
1396#[derive(sqlx::FromRow)]
1397struct PgSignalRow {
1398    id: i64,
1399    workflow_id: String,
1400    name: String,
1401    payload: Option<String>,
1402    consumed: bool,
1403    received_at: f64,
1404}
1405
1406impl From<PgSignalRow> for WorkflowSignal {
1407    fn from(r: PgSignalRow) -> Self {
1408        Self {
1409            id: Some(r.id),
1410            workflow_id: r.workflow_id,
1411            name: r.name,
1412            payload: r.payload,
1413            consumed: r.consumed,
1414            received_at: r.received_at,
1415        }
1416    }
1417}
1418
1419#[derive(sqlx::FromRow)]
1420struct PgScheduleRow {
1421    namespace: String,
1422    name: String,
1423    workflow_type: String,
1424    cron_expr: String,
1425    timezone: String,
1426    input: Option<String>,
1427    task_queue: String,
1428    overlap_policy: String,
1429    paused: bool,
1430    last_run_at: Option<f64>,
1431    next_run_at: Option<f64>,
1432    last_workflow_id: Option<String>,
1433    created_at: f64,
1434}
1435
1436impl From<PgScheduleRow> for WorkflowSchedule {
1437    fn from(r: PgScheduleRow) -> Self {
1438        Self {
1439            namespace: r.namespace,
1440            name: r.name,
1441            workflow_type: r.workflow_type,
1442            cron_expr: r.cron_expr,
1443            timezone: r.timezone,
1444            input: r.input,
1445            task_queue: r.task_queue,
1446            overlap_policy: r.overlap_policy,
1447            paused: r.paused,
1448            last_run_at: r.last_run_at,
1449            next_run_at: r.next_run_at,
1450            last_workflow_id: r.last_workflow_id,
1451            created_at: r.created_at,
1452        }
1453    }
1454}
1455
1456#[derive(sqlx::FromRow)]
1457struct PgWorkerRow {
1458    id: String,
1459    namespace: String,
1460    identity: String,
1461    task_queue: String,
1462    workflows: Option<String>,
1463    activities: Option<String>,
1464    max_concurrent_workflows: i32,
1465    max_concurrent_activities: i32,
1466    active_tasks: i32,
1467    last_heartbeat: f64,
1468    registered_at: f64,
1469}
1470
1471impl From<PgWorkerRow> for WorkflowWorker {
1472    fn from(r: PgWorkerRow) -> Self {
1473        Self {
1474            id: r.id,
1475            namespace: r.namespace,
1476            identity: r.identity,
1477            task_queue: r.task_queue,
1478            workflows: r.workflows,
1479            activities: r.activities,
1480            max_concurrent_workflows: r.max_concurrent_workflows,
1481            max_concurrent_activities: r.max_concurrent_activities,
1482            active_tasks: r.active_tasks,
1483            last_heartbeat: r.last_heartbeat,
1484            registered_at: r.registered_at,
1485        }
1486    }
1487}
1488
1489#[cfg(test)]
1490mod tests {
1491    use super::*;
1492
1493    #[test]
1494    fn sanitise_schema_keeps_statements_intact() {
1495        let input = "CREATE TABLE foo (x INT);\nCREATE INDEX idx_foo ON foo(x);\n";
1496        let out = sanitise_schema(input);
1497        assert_eq!(out.len(), 2);
1498        assert!(out[0].starts_with("CREATE TABLE foo"));
1499        assert!(out[1].starts_with("CREATE INDEX idx_foo"));
1500    }
1501
1502    #[test]
1503    fn sanitise_schema_drops_pure_comment_lines() {
1504        let input = "-- header comment\nCREATE TABLE foo (x INT);\n-- trailing comment\n";
1505        let out = sanitise_schema(input);
1506        assert_eq!(out.len(), 1);
1507        assert!(out[0].starts_with("CREATE TABLE foo"));
1508    }
1509
1510    #[test]
1511    fn sanitise_schema_ignores_semicolons_inside_comment_prose() {
1512        // Regression: the exact shape that broke v0.11.3–v0.11.5 in production.
1513        // `-- foo; bar` used to split into "foo" and " bar" fragments, the second
1514        // of which was executed as SQL and rejected with `syntax error at or near "bar"`.
1515        let input = "\
1516CREATE TABLE foo (x INT);
1517-- Idempotent across startups; fresh installs pick the column up from the
1518-- CREATE TABLE above so the ADD is a no-op.
1519";
1520        let out = sanitise_schema(input);
1521        assert_eq!(
1522            out.len(),
1523            1,
1524            "expected 1 real statement, got {}: {:?}",
1525            out.len(),
1526            out
1527        );
1528        assert!(out[0].starts_with("CREATE TABLE foo"));
1529    }
1530
1531    #[test]
1532    fn sanitise_schema_drops_indented_comment_lines() {
1533        let input = "  -- indented comment\n\tCREATE TABLE foo (x INT);\n";
1534        let out = sanitise_schema(input);
1535        assert_eq!(out.len(), 1);
1536        assert!(out[0].contains("CREATE TABLE foo"));
1537    }
1538
1539    #[test]
1540    fn sanitise_schema_real_constant_produces_only_ddl() {
1541        // The real SCHEMA constant must not produce any statement whose first
1542        // token isn't a recognised SQL keyword. A prose fragment leaking in
1543        // (e.g. "fresh installs...") means the filter regressed.
1544        for stmt in sanitise_schema(SCHEMA) {
1545            let first_word = stmt
1546                .split_whitespace()
1547                .next()
1548                .expect("non-empty statement")
1549                .to_uppercase();
1550            assert!(
1551                matches!(
1552                    first_word.as_str(),
1553                    "CREATE" | "INSERT" | "UPDATE" | "DROP" | "ALTER" | "WITH"
1554                ),
1555                "SCHEMA produced non-DDL statement starting with {first_word:?}: {stmt:?}"
1556            );
1557        }
1558    }
1559}