// assay_workflow/store/postgres.rs

use anyhow::Result;
use sqlx::PgPool;

use crate::store::WorkflowStore;
use crate::types::*;

const SCHEMA: &str = r#"
CREATE TABLE IF NOT EXISTS namespaces (
    name            TEXT PRIMARY KEY,
    created_at      DOUBLE PRECISION NOT NULL
);
INSERT INTO namespaces (name, created_at)
    VALUES ('main', EXTRACT(EPOCH FROM NOW()))
    ON CONFLICT DO NOTHING;

CREATE TABLE IF NOT EXISTS workflows (
    id              TEXT PRIMARY KEY,
    namespace       TEXT NOT NULL DEFAULT 'main',
    run_id          TEXT NOT NULL,
    workflow_type   TEXT NOT NULL,
    task_queue      TEXT NOT NULL DEFAULT 'main',
    status          TEXT NOT NULL DEFAULT 'PENDING',
    input           TEXT,
    result          TEXT,
    error           TEXT,
    parent_id       TEXT,
    claimed_by      TEXT,
    search_attributes TEXT,
    archived_at     DOUBLE PRECISION,
    archive_uri     TEXT,
    -- Workflow-task dispatch (Phase 9): see sqlite.rs for the full comment.
    needs_dispatch  BOOLEAN NOT NULL DEFAULT FALSE,
    dispatch_claimed_by    TEXT,
    dispatch_last_heartbeat DOUBLE PRECISION,
    created_at      DOUBLE PRECISION NOT NULL,
    updated_at      DOUBLE PRECISION NOT NULL,
    completed_at    DOUBLE PRECISION
);
CREATE INDEX IF NOT EXISTS idx_wf_status_queue ON workflows(status, task_queue);
CREATE INDEX IF NOT EXISTS idx_wf_namespace ON workflows(namespace);
CREATE INDEX IF NOT EXISTS idx_wf_dispatch ON workflows(task_queue, needs_dispatch, dispatch_claimed_by);

CREATE TABLE IF NOT EXISTS workflow_events (
    id              BIGSERIAL PRIMARY KEY,
    workflow_id     TEXT NOT NULL REFERENCES workflows(id),
    seq             INTEGER NOT NULL,
    event_type      TEXT NOT NULL,
    payload         TEXT,
    timestamp       DOUBLE PRECISION NOT NULL
);
CREATE INDEX IF NOT EXISTS idx_wf_events_lookup ON workflow_events(workflow_id, seq);

CREATE TABLE IF NOT EXISTS workflow_activities (
    id              BIGSERIAL PRIMARY KEY,
    workflow_id     TEXT NOT NULL REFERENCES workflows(id),
    seq             INTEGER NOT NULL,
    name            TEXT NOT NULL,
    task_queue      TEXT NOT NULL DEFAULT 'main',
    input           TEXT,
    status          TEXT NOT NULL DEFAULT 'PENDING',
    result          TEXT,
    error           TEXT,
    attempt         INTEGER NOT NULL DEFAULT 1,
    max_attempts    INTEGER NOT NULL DEFAULT 3,
    initial_interval_secs   DOUBLE PRECISION NOT NULL DEFAULT 1,
    backoff_coefficient     DOUBLE PRECISION NOT NULL DEFAULT 2,
    start_to_close_secs     DOUBLE PRECISION NOT NULL DEFAULT 300,
    heartbeat_timeout_secs  DOUBLE PRECISION,
    claimed_by      TEXT,
    scheduled_at    DOUBLE PRECISION NOT NULL,
    started_at      DOUBLE PRECISION,
    completed_at    DOUBLE PRECISION,
    last_heartbeat  DOUBLE PRECISION,
    UNIQUE (workflow_id, seq)
);
CREATE INDEX IF NOT EXISTS idx_wf_act_pending ON workflow_activities(task_queue, status, scheduled_at);

CREATE TABLE IF NOT EXISTS workflow_timers (
    id              BIGSERIAL PRIMARY KEY,
    workflow_id     TEXT NOT NULL REFERENCES workflows(id),
    seq             INTEGER NOT NULL,
    fire_at         DOUBLE PRECISION NOT NULL,
    fired           BOOLEAN NOT NULL DEFAULT FALSE,
    UNIQUE (workflow_id, seq)
);
CREATE INDEX IF NOT EXISTS idx_wf_timers_due ON workflow_timers(fire_at) WHERE fired = FALSE;

CREATE TABLE IF NOT EXISTS workflow_signals (
    id              BIGSERIAL PRIMARY KEY,
    workflow_id     TEXT NOT NULL REFERENCES workflows(id),
    name            TEXT NOT NULL,
    payload         TEXT,
    consumed        BOOLEAN NOT NULL DEFAULT FALSE,
    received_at     DOUBLE PRECISION NOT NULL
);
CREATE INDEX IF NOT EXISTS idx_wf_signals_lookup ON workflow_signals(workflow_id, name, consumed);

CREATE TABLE IF NOT EXISTS workflow_schedules (
    namespace       TEXT NOT NULL DEFAULT 'main',
    name            TEXT NOT NULL,
    workflow_type   TEXT NOT NULL,
    cron_expr       TEXT NOT NULL,
    timezone        TEXT NOT NULL DEFAULT 'UTC',
    input           TEXT,
    task_queue      TEXT NOT NULL DEFAULT 'main',
    overlap_policy  TEXT NOT NULL DEFAULT 'skip',
    paused          BOOLEAN NOT NULL DEFAULT FALSE,
    last_run_at     DOUBLE PRECISION,
    next_run_at     DOUBLE PRECISION,
    last_workflow_id TEXT,
    created_at      DOUBLE PRECISION NOT NULL,
    PRIMARY KEY (namespace, name)
);

CREATE TABLE IF NOT EXISTS workflow_workers (
    id              TEXT PRIMARY KEY,
    namespace       TEXT NOT NULL DEFAULT 'main',
    identity        TEXT NOT NULL,
    task_queue      TEXT NOT NULL,
    workflows       TEXT,
    activities      TEXT,
    max_concurrent_workflows  INTEGER NOT NULL DEFAULT 10,
    max_concurrent_activities INTEGER NOT NULL DEFAULT 10,
    active_tasks    INTEGER NOT NULL DEFAULT 0,
    last_heartbeat  DOUBLE PRECISION NOT NULL,
    registered_at   DOUBLE PRECISION NOT NULL
);

CREATE TABLE IF NOT EXISTS workflow_snapshots (
    workflow_id     TEXT NOT NULL REFERENCES workflows(id),
    event_seq       INTEGER NOT NULL,
    state_json      TEXT NOT NULL,
    created_at      DOUBLE PRECISION NOT NULL,
    PRIMARY KEY (workflow_id, event_seq)
);

CREATE TABLE IF NOT EXISTS api_keys (
    key_hash        TEXT PRIMARY KEY,
    prefix          TEXT NOT NULL,
    label           TEXT,
    created_at      DOUBLE PRECISION NOT NULL
);
CREATE INDEX IF NOT EXISTS idx_api_keys_prefix ON api_keys(prefix);

-- Future additive migrations go below this line. Postgres supports
-- `ADD COLUMN IF NOT EXISTS` natively, so the pattern is simply:
--
--   ALTER TABLE workflows ADD COLUMN IF NOT EXISTS some_new_field TEXT;
--
-- Idempotent across startups; fresh installs pick the column up from the
-- CREATE TABLE above so the ADD is a no-op. Currently no pending
-- migrations — baseline schema in CREATE TABLE statements above is the
-- source of truth through v0.11.3.
"#;

/// Split a Postgres DDL script into individual statements ready for `sqlx::query`.
///
/// Drops pure-comment lines (those starting with `--` after optional whitespace)
/// *before* splitting on `;`. Without this step, a semicolon inside a line comment
/// (e.g. `-- Idempotent across startups; fresh installs pick the column up`) would
/// split the surrounding comment into fragments — one of which is naked prose that
/// Postgres tries to parse as SQL and rejects with `syntax error at or near "<word>"`.
///
/// The filter only drops *pure-comment* lines (leading whitespace then `--`), leaving
/// `--`-after-code untouched. That keeps string literals safe (could legally contain
/// `--`) and is conservative enough to remain correct if the SCHEMA grows more prose.
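///
/// A minimal sketch of the behaviour (illustrative input; the fence is `ignore`
/// since this is a private helper):
///
/// ```ignore
/// let stmts = sanitise_schema("-- note; stray semicolon\nCREATE TABLE t (x INT);");
/// assert_eq!(stmts, vec!["CREATE TABLE t (x INT)".to_string()]);
/// ```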
fn sanitise_schema(schema: &str) -> Vec<String> {
    let without_comments: String = schema
        .lines()
        .filter(|line| !line.trim_start().starts_with("--"))
        .collect::<Vec<_>>()
        .join("\n");

    without_comments
        .split(';')
        .map(|s| s.trim().to_string())
        .filter(|s| !s.is_empty())
        .collect()
}

pub struct PostgresStore {
    pool: PgPool,
}

impl PostgresStore {
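    /// Connect to Postgres and run the idempotent startup migrations.
    ///
    /// A minimal usage sketch (the connection URL is illustrative):
    ///
    /// ```ignore
    /// let store = PostgresStore::new("postgres://user:pass@localhost/assay").await?;
    /// ```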
    pub async fn new(url: &str) -> Result<Self> {
        let pool = PgPool::connect(url).await?;
        let store = Self { pool };
        store.migrate().await?;
        Ok(store)
    }

    async fn migrate(&self) -> Result<()> {
        for statement in sanitise_schema(SCHEMA) {
            sqlx::query(&statement).execute(&self.pool).await?;
        }
        Ok(())
    }

    /// Try to acquire the Postgres advisory lock (`pg_try_advisory_lock`) used for leader election.
    /// Returns `true` if this instance is the leader and the scheduler should run.
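    ///
    /// A minimal usage sketch:
    ///
    /// ```ignore
    /// if store.try_acquire_leader_lock().await? {
    ///     // This replica won the election: run the scheduler loop.
    /// }
    /// ```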
    pub async fn try_acquire_leader_lock(&self) -> Result<bool> {
        let row: (bool,) =
            sqlx::query_as("SELECT pg_try_advisory_lock(1)")
                .fetch_one(&self.pool)
                .await?;
        Ok(row.0)
    }
}

impl WorkflowStore for PostgresStore {
    // ── Namespaces ─────────────────────────────────────────

    async fn create_namespace(&self, name: &str) -> Result<()> {
        sqlx::query("INSERT INTO namespaces (name, created_at) VALUES ($1, EXTRACT(EPOCH FROM NOW()))")
            .bind(name)
            .execute(&self.pool)
            .await?;
        Ok(())
    }

    async fn list_namespaces(&self) -> Result<Vec<crate::store::NamespaceRecord>> {
        let rows = sqlx::query_as::<_, (String, f64)>(
            "SELECT name, created_at FROM namespaces ORDER BY name",
        )
        .fetch_all(&self.pool)
        .await?;
        Ok(rows
            .into_iter()
            .map(|(name, created_at)| crate::store::NamespaceRecord { name, created_at })
            .collect())
    }

    async fn delete_namespace(&self, name: &str) -> Result<bool> {
        let res = sqlx::query("DELETE FROM namespaces WHERE name = $1 AND name != 'main'")
            .bind(name)
            .execute(&self.pool)
            .await?;
        Ok(res.rows_affected() > 0)
    }

    async fn get_namespace_stats(&self, namespace: &str) -> Result<crate::store::NamespaceStats> {
        let total: (i64,) = sqlx::query_as("SELECT COUNT(*) FROM workflows WHERE namespace = $1")
            .bind(namespace)
            .fetch_one(&self.pool)
            .await?;
        let running: (i64,) = sqlx::query_as(
            "SELECT COUNT(*) FROM workflows WHERE namespace = $1 AND status = 'RUNNING'",
        )
        .bind(namespace)
        .fetch_one(&self.pool)
        .await?;
        let pending: (i64,) = sqlx::query_as(
            "SELECT COUNT(*) FROM workflows WHERE namespace = $1 AND status = 'PENDING'",
        )
        .bind(namespace)
        .fetch_one(&self.pool)
        .await?;
        let completed: (i64,) = sqlx::query_as(
            "SELECT COUNT(*) FROM workflows WHERE namespace = $1 AND status = 'COMPLETED'",
        )
        .bind(namespace)
        .fetch_one(&self.pool)
        .await?;
        let failed: (i64,) = sqlx::query_as(
            "SELECT COUNT(*) FROM workflows WHERE namespace = $1 AND status = 'FAILED'",
        )
        .bind(namespace)
        .fetch_one(&self.pool)
        .await?;
        let schedules: (i64,) =
            sqlx::query_as("SELECT COUNT(*) FROM workflow_schedules WHERE namespace = $1")
                .bind(namespace)
                .fetch_one(&self.pool)
                .await?;
        let workers: (i64,) =
            sqlx::query_as("SELECT COUNT(*) FROM workflow_workers WHERE namespace = $1")
                .bind(namespace)
                .fetch_one(&self.pool)
                .await?;

        Ok(crate::store::NamespaceStats {
            namespace: namespace.to_string(),
            total_workflows: total.0,
            running: running.0,
            pending: pending.0,
            completed: completed.0,
            failed: failed.0,
            schedules: schedules.0,
            workers: workers.0,
        })
    }

    // ── Workflows ──────────────────────────────────────────

    async fn create_workflow(&self, wf: &WorkflowRecord) -> Result<()> {
        sqlx::query(
            "INSERT INTO workflows (id, namespace, run_id, workflow_type, task_queue, status, input, result, error, parent_id, claimed_by, search_attributes, archived_at, archive_uri, created_at, updated_at, completed_at)
             VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16, $17)",
        )
        .bind(&wf.id)
        .bind(&wf.namespace)
        .bind(&wf.run_id)
        .bind(&wf.workflow_type)
        .bind(&wf.task_queue)
        .bind(&wf.status)
        .bind(&wf.input)
        .bind(&wf.result)
        .bind(&wf.error)
        .bind(&wf.parent_id)
        .bind(&wf.claimed_by)
        .bind(&wf.search_attributes)
        .bind(wf.archived_at)
        .bind(&wf.archive_uri)
        .bind(wf.created_at)
        .bind(wf.updated_at)
        .bind(wf.completed_at)
        .execute(&self.pool)
        .await?;
        Ok(())
    }

    async fn get_workflow(&self, id: &str) -> Result<Option<WorkflowRecord>> {
        let row = sqlx::query_as::<_, PgWorkflowRow>(
            "SELECT id, namespace, run_id, workflow_type, task_queue, status, input, result, error, parent_id, claimed_by, search_attributes, archived_at, archive_uri, created_at, updated_at, completed_at FROM workflows WHERE id = $1",
        )
        .bind(id)
        .fetch_optional(&self.pool)
        .await?;
        Ok(row.map(Into::into))
    }

    async fn list_workflows(
        &self,
        namespace: &str,
        status: Option<WorkflowStatus>,
        workflow_type: Option<&str>,
        search_attrs_filter: Option<&str>,
        limit: i64,
        offset: i64,
    ) -> Result<Vec<WorkflowRecord>> {
        let status_str = status.map(|s| s.to_string());

        let filter_pairs: Vec<(String, serde_json::Value)> = search_attrs_filter
            .and_then(|s| serde_json::from_str::<serde_json::Value>(s).ok())
            .and_then(|v| v.as_object().cloned())
            .map(|m| m.into_iter().collect())
            .unwrap_or_default();

        let mut sql = String::from(
            "SELECT id, namespace, run_id, workflow_type, task_queue, status, input, result, error, parent_id, claimed_by, search_attributes, archived_at, archive_uri, created_at, updated_at, completed_at
             FROM workflows
             WHERE namespace = $1
               AND ($2::TEXT IS NULL OR status = $2)
               AND ($3::TEXT IS NULL OR workflow_type = $3)",
        );
        // Filter placeholders continue after $3, so the next bind index is 4.
        let mut idx = 4usize;
        for _ in &filter_pairs {
            sql.push_str(&format!(
                " AND (search_attributes::jsonb)->>${} = ${}",
                idx,
                idx + 1
            ));
            idx += 2;
        }
        sql.push_str(&format!(" ORDER BY created_at DESC LIMIT ${} OFFSET ${}", idx, idx + 1));
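        // With two filter pairs, for example, the appended tail reads:
        //   AND (search_attributes::jsonb)->>$4 = $5
        //   AND (search_attributes::jsonb)->>$6 = $7
        //   ORDER BY created_at DESC LIMIT $8 OFFSET $9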

        let mut q = sqlx::query_as::<_, PgWorkflowRow>(&sql)
            .bind(namespace)
            .bind(&status_str)
            .bind(workflow_type);
        for (key, value) in &filter_pairs {
            q = q.bind(key.clone());
            // JSONB ->> always returns TEXT; compare by stringified value.
            let as_text = match value {
                serde_json::Value::String(s) => s.clone(),
                other => other.to_string(),
            };
            q = q.bind(as_text);
        }
        let rows = q.bind(limit).bind(offset).fetch_all(&self.pool).await?;
        Ok(rows.into_iter().map(Into::into).collect())
    }

    async fn update_workflow_status(
        &self,
        id: &str,
        status: WorkflowStatus,
        result: Option<&str>,
        error: Option<&str>,
    ) -> Result<()> {
        let now = timestamp_now();
        let completed_at = if status.is_terminal() { Some(now) } else { None };
        sqlx::query(
            "UPDATE workflows SET status = $1, result = COALESCE($2, result), error = COALESCE($3, error), updated_at = $4, completed_at = COALESCE($5, completed_at) WHERE id = $6",
        )
        .bind(status.to_string())
        .bind(result)
        .bind(error)
        .bind(now)
        .bind(completed_at)
        .bind(id)
        .execute(&self.pool)
        .await?;
        Ok(())
    }

    async fn claim_workflow(&self, id: &str, worker_id: &str) -> Result<bool> {
        let res = sqlx::query(
            "UPDATE workflows SET claimed_by = $1, status = 'RUNNING', updated_at = $2 WHERE id = $3 AND claimed_by IS NULL",
        )
        .bind(worker_id)
        .bind(timestamp_now())
        .bind(id)
        .execute(&self.pool)
        .await?;
        Ok(res.rows_affected() > 0)
    }

    async fn mark_workflow_dispatchable(&self, workflow_id: &str) -> Result<()> {
        sqlx::query("UPDATE workflows SET needs_dispatch = TRUE WHERE id = $1")
            .bind(workflow_id)
            .execute(&self.pool)
            .await?;
        Ok(())
    }

    async fn claim_workflow_task(
        &self,
        task_queue: &str,
        worker_id: &str,
    ) -> Result<Option<WorkflowRecord>> {
        let now = timestamp_now();
        // Atomic claim with FOR UPDATE SKIP LOCKED so multiple engine
        // replicas don't fight over the same workflow task.
        let row = sqlx::query_as::<_, PgWorkflowRow>(
            "UPDATE workflows
             SET dispatch_claimed_by = $1, dispatch_last_heartbeat = $2, needs_dispatch = FALSE
             WHERE id = (
                SELECT id FROM workflows
                WHERE task_queue = $3
                  AND needs_dispatch = TRUE
                  AND dispatch_claimed_by IS NULL
                  AND status NOT IN ('COMPLETED', 'FAILED', 'CANCELLED', 'TIMED_OUT')
                ORDER BY updated_at ASC
                FOR UPDATE SKIP LOCKED
                LIMIT 1
             )
             RETURNING id, namespace, run_id, workflow_type, task_queue, status, input, result, error, parent_id, claimed_by, search_attributes, archived_at, archive_uri, created_at, updated_at, completed_at",
        )
        .bind(worker_id)
        .bind(now)
        .bind(task_queue)
        .fetch_optional(&self.pool)
        .await?;
        Ok(row.map(Into::into))
    }

    async fn release_workflow_task(&self, workflow_id: &str, worker_id: &str) -> Result<()> {
        sqlx::query(
            "UPDATE workflows
             SET dispatch_claimed_by = NULL, dispatch_last_heartbeat = NULL
             WHERE id = $1 AND dispatch_claimed_by = $2",
        )
        .bind(workflow_id)
        .bind(worker_id)
        .execute(&self.pool)
        .await?;
        Ok(())
    }

    async fn release_stale_dispatch_leases(
        &self,
        now: f64,
        timeout_secs: f64,
    ) -> Result<u64> {
        let res = sqlx::query(
            "UPDATE workflows
             SET dispatch_claimed_by = NULL,
                 dispatch_last_heartbeat = NULL,
                 needs_dispatch = TRUE
             WHERE dispatch_claimed_by IS NOT NULL
               AND ($1 - dispatch_last_heartbeat) > $2
               AND status NOT IN ('COMPLETED', 'FAILED', 'CANCELLED', 'TIMED_OUT')",
        )
        .bind(now)
        .bind(timeout_secs)
        .execute(&self.pool)
        .await?;
        Ok(res.rows_affected())
    }

    // ── Events ─────────────────────────────────────────────

    async fn append_event(&self, ev: &WorkflowEvent) -> Result<i64> {
        let row: (i64,) = sqlx::query_as(
            "INSERT INTO workflow_events (workflow_id, seq, event_type, payload, timestamp) VALUES ($1, $2, $3, $4, $5) RETURNING id",
        )
        .bind(&ev.workflow_id)
        .bind(ev.seq)
        .bind(&ev.event_type)
        .bind(&ev.payload)
        .bind(ev.timestamp)
        .fetch_one(&self.pool)
        .await?;
        Ok(row.0)
    }

    async fn list_events(&self, workflow_id: &str) -> Result<Vec<WorkflowEvent>> {
        let rows = sqlx::query_as::<_, PgEventRow>(
            "SELECT id, workflow_id, seq, event_type, payload, timestamp FROM workflow_events WHERE workflow_id = $1 ORDER BY seq ASC",
        )
        .bind(workflow_id)
        .fetch_all(&self.pool)
        .await?;
        Ok(rows.into_iter().map(Into::into).collect())
    }

    async fn get_event_count(&self, workflow_id: &str) -> Result<i64> {
        let row: (i64,) =
            sqlx::query_as("SELECT COUNT(*) FROM workflow_events WHERE workflow_id = $1")
                .bind(workflow_id)
                .fetch_one(&self.pool)
                .await?;
        Ok(row.0)
    }

    // ── Activities ──────────────────────────────────────────

    async fn create_activity(&self, act: &WorkflowActivity) -> Result<i64> {
        let row: (i64,) = sqlx::query_as(
            "INSERT INTO workflow_activities (workflow_id, seq, name, task_queue, input, status, attempt, max_attempts, initial_interval_secs, backoff_coefficient, start_to_close_secs, heartbeat_timeout_secs, scheduled_at)
             VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13) RETURNING id",
        )
        .bind(&act.workflow_id)
        .bind(act.seq)
        .bind(&act.name)
        .bind(&act.task_queue)
        .bind(&act.input)
        .bind(&act.status)
        .bind(act.attempt)
        .bind(act.max_attempts)
        .bind(act.initial_interval_secs)
        .bind(act.backoff_coefficient)
        .bind(act.start_to_close_secs)
        .bind(act.heartbeat_timeout_secs)
        .bind(act.scheduled_at)
        .fetch_one(&self.pool)
        .await?;
        Ok(row.0)
    }

    async fn get_activity(&self, id: i64) -> Result<Option<WorkflowActivity>> {
        let row = sqlx::query_as::<_, PgActivityRow>(
            "SELECT id, workflow_id, seq, name, task_queue, input, status, result, error, attempt, max_attempts, initial_interval_secs, backoff_coefficient, start_to_close_secs, heartbeat_timeout_secs, claimed_by, scheduled_at, started_at, completed_at, last_heartbeat
             FROM workflow_activities WHERE id = $1",
        )
        .bind(id)
        .fetch_optional(&self.pool)
        .await?;
        Ok(row.map(Into::into))
    }

    async fn get_activity_by_workflow_seq(
        &self,
        workflow_id: &str,
        seq: i32,
    ) -> Result<Option<WorkflowActivity>> {
        let row = sqlx::query_as::<_, PgActivityRow>(
            "SELECT id, workflow_id, seq, name, task_queue, input, status, result, error, attempt, max_attempts, initial_interval_secs, backoff_coefficient, start_to_close_secs, heartbeat_timeout_secs, claimed_by, scheduled_at, started_at, completed_at, last_heartbeat
             FROM workflow_activities WHERE workflow_id = $1 AND seq = $2",
        )
        .bind(workflow_id)
        .bind(seq)
        .fetch_optional(&self.pool)
        .await?;
        Ok(row.map(Into::into))
    }

    async fn claim_activity(
        &self,
        task_queue: &str,
        worker_id: &str,
    ) -> Result<Option<WorkflowActivity>> {
        let now = timestamp_now();
        // Atomic claim using FOR UPDATE SKIP LOCKED — prevents contention
        // between multiple `assay serve` instances claiming the same activity.
        let row = sqlx::query_as::<_, PgActivityRow>(
            "UPDATE workflow_activities SET status = 'RUNNING', claimed_by = $1, started_at = $2
             WHERE id = (
                SELECT id FROM workflow_activities
                WHERE task_queue = $3 AND status = 'PENDING'
                ORDER BY scheduled_at ASC
                FOR UPDATE SKIP LOCKED
                LIMIT 1
             )
             RETURNING id, workflow_id, seq, name, task_queue, input, status, result, error, attempt, max_attempts, initial_interval_secs, backoff_coefficient, start_to_close_secs, heartbeat_timeout_secs, claimed_by, scheduled_at, started_at, completed_at, last_heartbeat",
        )
        .bind(worker_id)
        .bind(now)
        .bind(task_queue)
        .fetch_optional(&self.pool)
        .await?;
        Ok(row.map(Into::into))
    }

    async fn requeue_activity_for_retry(
        &self,
        id: i64,
        next_attempt: i32,
        next_scheduled_at: f64,
    ) -> Result<()> {
        sqlx::query(
            "UPDATE workflow_activities
             SET status = 'PENDING', attempt = $1, scheduled_at = $2,
                 claimed_by = NULL, started_at = NULL, last_heartbeat = NULL,
                 error = NULL
             WHERE id = $3",
        )
        .bind(next_attempt)
        .bind(next_scheduled_at)
        .bind(id)
        .execute(&self.pool)
        .await?;
        Ok(())
    }

    async fn complete_activity(
        &self,
        id: i64,
        result: Option<&str>,
        error: Option<&str>,
        failed: bool,
    ) -> Result<()> {
        let status = if failed { "FAILED" } else { "COMPLETED" };
        sqlx::query(
            "UPDATE workflow_activities SET status = $1, result = $2, error = $3, completed_at = $4 WHERE id = $5",
        )
        .bind(status)
        .bind(result)
        .bind(error)
        .bind(timestamp_now())
        .bind(id)
        .execute(&self.pool)
        .await?;
        Ok(())
    }

    async fn heartbeat_activity(&self, id: i64, _details: Option<&str>) -> Result<()> {
        sqlx::query("UPDATE workflow_activities SET last_heartbeat = $1 WHERE id = $2")
            .bind(timestamp_now())
            .bind(id)
            .execute(&self.pool)
            .await?;
        Ok(())
    }

    async fn get_timed_out_activities(&self, now: f64) -> Result<Vec<WorkflowActivity>> {
        let rows = sqlx::query_as::<_, PgActivityRow>(
            "SELECT id, workflow_id, seq, name, task_queue, input, status, result, error, attempt, max_attempts, initial_interval_secs, backoff_coefficient, start_to_close_secs, heartbeat_timeout_secs, claimed_by, scheduled_at, started_at, completed_at, last_heartbeat
             FROM workflow_activities
             WHERE status = 'RUNNING'
               AND heartbeat_timeout_secs IS NOT NULL
               AND last_heartbeat IS NOT NULL
               AND ($1 - last_heartbeat) > heartbeat_timeout_secs",
        )
        .bind(now)
        .fetch_all(&self.pool)
        .await?;
        Ok(rows.into_iter().map(Into::into).collect())
    }

    // ── Timers ──────────────────────────────────────────────

    async fn create_timer(&self, timer: &WorkflowTimer) -> Result<i64> {
        let row: (i64,) = sqlx::query_as(
            "INSERT INTO workflow_timers (workflow_id, seq, fire_at, fired) VALUES ($1, $2, $3, FALSE) RETURNING id",
        )
        .bind(&timer.workflow_id)
        .bind(timer.seq)
        .bind(timer.fire_at)
        .fetch_one(&self.pool)
        .await?;
        Ok(row.0)
    }

    async fn cancel_pending_activities(&self, workflow_id: &str) -> Result<u64> {
        let res = sqlx::query(
            "UPDATE workflow_activities SET status = 'CANCELLED', completed_at = $1
             WHERE workflow_id = $2 AND status = 'PENDING'",
        )
        .bind(timestamp_now())
        .bind(workflow_id)
        .execute(&self.pool)
        .await?;
        Ok(res.rows_affected())
    }

    async fn cancel_pending_timers(&self, workflow_id: &str) -> Result<u64> {
        let res = sqlx::query(
            "UPDATE workflow_timers SET fired = TRUE
             WHERE workflow_id = $1 AND fired = FALSE",
        )
        .bind(workflow_id)
        .execute(&self.pool)
        .await?;
        Ok(res.rows_affected())
    }

    async fn get_timer_by_workflow_seq(
        &self,
        workflow_id: &str,
        seq: i32,
    ) -> Result<Option<WorkflowTimer>> {
        let row = sqlx::query_as::<_, PgTimerRow>(
            "SELECT id, workflow_id, seq, fire_at, fired
             FROM workflow_timers WHERE workflow_id = $1 AND seq = $2",
        )
        .bind(workflow_id)
        .bind(seq)
        .fetch_optional(&self.pool)
        .await?;
        Ok(row.map(Into::into))
    }

    async fn fire_due_timers(&self, now: f64) -> Result<Vec<WorkflowTimer>> {
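        // Mark-and-collect in one statement: under the default READ COMMITTED
        // level a concurrent poller blocks on the row lock, re-checks
        // `fired = FALSE` once it clears, and skips the row, so each timer is
        // returned at most once.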
        let rows = sqlx::query_as::<_, PgTimerRow>(
            "UPDATE workflow_timers SET fired = TRUE
             WHERE fired = FALSE AND fire_at <= $1
             RETURNING id, workflow_id, seq, fire_at, fired",
        )
        .bind(now)
        .fetch_all(&self.pool)
        .await?;
        Ok(rows.into_iter().map(Into::into).collect())
    }

    // ── Signals ─────────────────────────────────────────────

    async fn send_signal(&self, sig: &WorkflowSignal) -> Result<i64> {
        let row: (i64,) = sqlx::query_as(
            "INSERT INTO workflow_signals (workflow_id, name, payload, consumed, received_at) VALUES ($1, $2, $3, FALSE, $4) RETURNING id",
        )
        .bind(&sig.workflow_id)
        .bind(&sig.name)
        .bind(&sig.payload)
        .bind(sig.received_at)
        .fetch_one(&self.pool)
        .await?;
        Ok(row.0)
    }

    async fn consume_signals(
        &self,
        workflow_id: &str,
        name: &str,
    ) -> Result<Vec<WorkflowSignal>> {
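        // Same mark-and-return pattern as `fire_due_timers` above: each
        // pending signal is handed to at most one consumer.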
        let rows = sqlx::query_as::<_, PgSignalRow>(
            "UPDATE workflow_signals SET consumed = TRUE
             WHERE workflow_id = $1 AND name = $2 AND consumed = FALSE
             RETURNING id, workflow_id, name, payload, consumed, received_at",
        )
        .bind(workflow_id)
        .bind(name)
        .fetch_all(&self.pool)
        .await?;
        Ok(rows.into_iter().map(Into::into).collect())
    }

    // ── Schedules ───────────────────────────────────────────

    async fn create_schedule(&self, sched: &WorkflowSchedule) -> Result<()> {
        sqlx::query(
            "INSERT INTO workflow_schedules (namespace, name, workflow_type, cron_expr, timezone, input, task_queue, overlap_policy, paused, last_run_at, next_run_at, last_workflow_id, created_at)
             VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13)",
        )
        .bind(&sched.namespace)
        .bind(&sched.name)
        .bind(&sched.workflow_type)
        .bind(&sched.cron_expr)
        .bind(&sched.timezone)
        .bind(&sched.input)
        .bind(&sched.task_queue)
        .bind(&sched.overlap_policy)
        .bind(sched.paused)
        .bind(sched.last_run_at)
        .bind(sched.next_run_at)
        .bind(&sched.last_workflow_id)
        .bind(sched.created_at)
        .execute(&self.pool)
        .await?;
        Ok(())
    }

    async fn get_schedule(&self, namespace: &str, name: &str) -> Result<Option<WorkflowSchedule>> {
        let row = sqlx::query_as::<_, PgScheduleRow>(
            "SELECT namespace, name, workflow_type, cron_expr, timezone, input, task_queue, overlap_policy, paused, last_run_at, next_run_at, last_workflow_id, created_at FROM workflow_schedules WHERE namespace = $1 AND name = $2",
        )
        .bind(namespace)
        .bind(name)
        .fetch_optional(&self.pool)
        .await?;
        Ok(row.map(Into::into))
    }

    async fn list_schedules(&self, namespace: &str) -> Result<Vec<WorkflowSchedule>> {
        let rows = sqlx::query_as::<_, PgScheduleRow>(
            "SELECT namespace, name, workflow_type, cron_expr, timezone, input, task_queue, overlap_policy, paused, last_run_at, next_run_at, last_workflow_id, created_at FROM workflow_schedules WHERE namespace = $1 ORDER BY name",
        )
        .bind(namespace)
        .fetch_all(&self.pool)
        .await?;
        Ok(rows.into_iter().map(Into::into).collect())
    }

    async fn update_schedule_last_run(
        &self,
        namespace: &str,
        name: &str,
        last_run_at: f64,
        next_run_at: f64,
        workflow_id: &str,
    ) -> Result<()> {
        sqlx::query(
            "UPDATE workflow_schedules SET last_run_at = $1, next_run_at = $2, last_workflow_id = $3 WHERE namespace = $4 AND name = $5",
        )
        .bind(last_run_at)
        .bind(next_run_at)
        .bind(workflow_id)
        .bind(namespace)
        .bind(name)
        .execute(&self.pool)
        .await?;
        Ok(())
    }

    async fn delete_schedule(&self, namespace: &str, name: &str) -> Result<bool> {
        let res = sqlx::query("DELETE FROM workflow_schedules WHERE namespace = $1 AND name = $2")
            .bind(namespace)
            .bind(name)
            .execute(&self.pool)
            .await?;
        Ok(res.rows_affected() > 0)
    }

    async fn list_archivable_workflows(
        &self,
        cutoff: f64,
        limit: i64,
    ) -> Result<Vec<WorkflowRecord>> {
        let rows = sqlx::query_as::<_, PgWorkflowRow>(
            "SELECT id, namespace, run_id, workflow_type, task_queue, status, input, result, error, parent_id, claimed_by, search_attributes, archived_at, archive_uri, created_at, updated_at, completed_at
             FROM workflows
             WHERE status IN ('COMPLETED', 'FAILED', 'CANCELLED', 'TIMED_OUT')
               AND completed_at IS NOT NULL
               AND completed_at < $1
               AND archived_at IS NULL
             ORDER BY completed_at ASC
             LIMIT $2",
        )
        .bind(cutoff)
        .bind(limit)
        .fetch_all(&self.pool)
        .await?;
        Ok(rows.into_iter().map(Into::into).collect())
    }

    async fn mark_archived_and_purge(
        &self,
        workflow_id: &str,
        archive_uri: &str,
        archived_at: f64,
    ) -> Result<()> {
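        // All-or-nothing: history rows are purged and the archive pointer
        // stamped in one transaction, so a crash mid-purge rolls back rather
        // than leaving a half-archived workflow.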
        let mut tx = self.pool.begin().await?;
        sqlx::query("DELETE FROM workflow_events WHERE workflow_id = $1")
            .bind(workflow_id)
            .execute(&mut *tx)
            .await?;
        sqlx::query("DELETE FROM workflow_activities WHERE workflow_id = $1")
            .bind(workflow_id)
            .execute(&mut *tx)
            .await?;
        sqlx::query("DELETE FROM workflow_timers WHERE workflow_id = $1")
            .bind(workflow_id)
            .execute(&mut *tx)
            .await?;
        sqlx::query("DELETE FROM workflow_signals WHERE workflow_id = $1")
            .bind(workflow_id)
            .execute(&mut *tx)
            .await?;
        sqlx::query("DELETE FROM workflow_snapshots WHERE workflow_id = $1")
            .bind(workflow_id)
            .execute(&mut *tx)
            .await?;
        sqlx::query(
            "UPDATE workflows SET archived_at = $1, archive_uri = $2 WHERE id = $3",
        )
        .bind(archived_at)
        .bind(archive_uri)
        .bind(workflow_id)
        .execute(&mut *tx)
        .await?;
        tx.commit().await?;
        Ok(())
    }

    async fn upsert_search_attributes(
        &self,
        workflow_id: &str,
        patch_json: &str,
    ) -> Result<()> {
        let current: Option<(Option<String>,)> =
            sqlx::query_as("SELECT search_attributes FROM workflows WHERE id = $1")
                .bind(workflow_id)
                .fetch_optional(&self.pool)
                .await?;
        let merged = crate::store::sqlite::merge_search_attrs(
            current.and_then(|(s,)| s).as_deref(),
            patch_json,
        )?;
        sqlx::query("UPDATE workflows SET search_attributes = $1 WHERE id = $2")
            .bind(merged)
            .bind(workflow_id)
            .execute(&self.pool)
            .await?;
        Ok(())
    }

    async fn update_schedule(
        &self,
        namespace: &str,
        name: &str,
        patch: &SchedulePatch,
    ) -> Result<Option<WorkflowSchedule>> {
        let mut sets: Vec<String> = Vec::new();
        let mut idx = 1usize;
        if patch.cron_expr.is_some() {
            sets.push(format!("cron_expr = ${idx}"));
            idx += 1;
        }
        if patch.timezone.is_some() {
            sets.push(format!("timezone = ${idx}"));
            idx += 1;
        }
        if patch.input.is_some() {
            sets.push(format!("input = ${idx}"));
            idx += 1;
        }
        if patch.task_queue.is_some() {
            sets.push(format!("task_queue = ${idx}"));
            idx += 1;
        }
        if patch.overlap_policy.is_some() {
            sets.push(format!("overlap_policy = ${idx}"));
            idx += 1;
        }
        if sets.is_empty() {
            return self.get_schedule(namespace, name).await;
        }
        let sql = format!(
            "UPDATE workflow_schedules SET {} WHERE namespace = ${} AND name = ${}",
            sets.join(", "),
            idx,
            idx + 1
        );
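        // e.g. patching cron_expr and timezone yields:
        //   UPDATE workflow_schedules SET cron_expr = $1, timezone = $2
        //   WHERE namespace = $3 AND name = $4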
        let mut q = sqlx::query(&sql);
        if let Some(ref v) = patch.cron_expr {
            q = q.bind(v);
        }
        if let Some(ref v) = patch.timezone {
            q = q.bind(v);
        }
        if let Some(ref v) = patch.input {
            q = q.bind(v.to_string());
        }
        if let Some(ref v) = patch.task_queue {
            q = q.bind(v);
        }
        if let Some(ref v) = patch.overlap_policy {
            q = q.bind(v);
        }
        let res = q
            .bind(namespace)
            .bind(name)
            .execute(&self.pool)
            .await?;
        if res.rows_affected() == 0 {
            return Ok(None);
        }
        self.get_schedule(namespace, name).await
    }

    async fn set_schedule_paused(
        &self,
        namespace: &str,
        name: &str,
        paused: bool,
    ) -> Result<Option<WorkflowSchedule>> {
        let res = sqlx::query(
            "UPDATE workflow_schedules SET paused = $1 WHERE namespace = $2 AND name = $3",
        )
        .bind(paused)
        .bind(namespace)
        .bind(name)
        .execute(&self.pool)
        .await?;
        if res.rows_affected() == 0 {
            return Ok(None);
        }
        self.get_schedule(namespace, name).await
    }

    // ── Workers ─────────────────────────────────────────────

    async fn register_worker(&self, w: &WorkflowWorker) -> Result<()> {
        sqlx::query(
            "INSERT INTO workflow_workers (id, namespace, identity, task_queue, workflows, activities, max_concurrent_workflows, max_concurrent_activities, active_tasks, last_heartbeat, registered_at)
             VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11)
             ON CONFLICT (id) DO UPDATE SET last_heartbeat = EXCLUDED.last_heartbeat, identity = EXCLUDED.identity",
        )
        .bind(&w.id)
        .bind(&w.namespace)
        .bind(&w.identity)
        .bind(&w.task_queue)
        .bind(&w.workflows)
        .bind(&w.activities)
        .bind(w.max_concurrent_workflows)
        .bind(w.max_concurrent_activities)
        .bind(w.active_tasks)
        .bind(w.last_heartbeat)
        .bind(w.registered_at)
        .execute(&self.pool)
        .await?;
        Ok(())
    }

    async fn heartbeat_worker(&self, id: &str, now: f64) -> Result<()> {
        sqlx::query("UPDATE workflow_workers SET last_heartbeat = $1 WHERE id = $2")
            .bind(now)
            .bind(id)
            .execute(&self.pool)
            .await?;
        Ok(())
    }

    async fn list_workers(&self, namespace: &str) -> Result<Vec<WorkflowWorker>> {
        let rows = sqlx::query_as::<_, PgWorkerRow>(
            "SELECT id, namespace, identity, task_queue, workflows, activities, max_concurrent_workflows, max_concurrent_activities, active_tasks, last_heartbeat, registered_at FROM workflow_workers WHERE namespace = $1 ORDER BY registered_at",
        )
        .bind(namespace)
        .fetch_all(&self.pool)
        .await?;
        Ok(rows.into_iter().map(Into::into).collect())
    }

    async fn remove_dead_workers(&self, cutoff: f64) -> Result<Vec<String>> {
        let rows: Vec<(String,)> =
            sqlx::query_as("SELECT id FROM workflow_workers WHERE last_heartbeat < $1")
                .bind(cutoff)
                .fetch_all(&self.pool)
                .await?;
        let ids: Vec<String> = rows.into_iter().map(|r| r.0).collect();
        if !ids.is_empty() {
            sqlx::query("DELETE FROM workflow_workers WHERE last_heartbeat < $1")
                .bind(cutoff)
                .execute(&self.pool)
                .await?;
        }
        Ok(ids)
    }

    // ── API Keys ────────────────────────────────────────────

    async fn create_api_key(
        &self,
        key_hash: &str,
        prefix: &str,
        label: Option<&str>,
        created_at: f64,
    ) -> Result<()> {
        sqlx::query("INSERT INTO api_keys (key_hash, prefix, label, created_at) VALUES ($1, $2, $3, $4)")
            .bind(key_hash)
            .bind(prefix)
            .bind(label)
            .bind(created_at)
            .execute(&self.pool)
            .await?;
        Ok(())
    }

    async fn validate_api_key(&self, key_hash: &str) -> Result<bool> {
        let row: Option<(i64,)> =
            sqlx::query_as("SELECT 1::BIGINT FROM api_keys WHERE key_hash = $1")
                .bind(key_hash)
                .fetch_optional(&self.pool)
                .await?;
        Ok(row.is_some())
    }

    async fn list_api_keys(&self) -> Result<Vec<crate::store::ApiKeyRecord>> {
        let rows = sqlx::query_as::<_, (String, Option<String>, f64)>(
            "SELECT prefix, label, created_at FROM api_keys ORDER BY created_at DESC",
        )
        .fetch_all(&self.pool)
        .await?;
        Ok(rows
            .into_iter()
            .map(|(prefix, label, created_at)| crate::store::ApiKeyRecord {
                prefix,
                label,
                created_at,
            })
            .collect())
    }

    async fn revoke_api_key(&self, prefix: &str) -> Result<bool> {
        let res = sqlx::query("DELETE FROM api_keys WHERE prefix = $1")
            .bind(prefix)
            .execute(&self.pool)
            .await?;
        Ok(res.rows_affected() > 0)
    }

    // ── Child Workflows ─────────────────────────────────────

    async fn list_child_workflows(&self, parent_id: &str) -> Result<Vec<WorkflowRecord>> {
        let rows = sqlx::query_as::<_, PgWorkflowRow>(
            "SELECT id, namespace, run_id, workflow_type, task_queue, status, input, result, error, parent_id, claimed_by, search_attributes, archived_at, archive_uri, created_at, updated_at, completed_at
             FROM workflows WHERE parent_id = $1 ORDER BY created_at ASC",
        )
        .bind(parent_id)
        .fetch_all(&self.pool)
        .await?;
        Ok(rows.into_iter().map(Into::into).collect())
    }

    // ── Snapshots ───────────────────────────────────────────

    async fn create_snapshot(
        &self,
        workflow_id: &str,
        event_seq: i32,
        state_json: &str,
    ) -> Result<()> {
        sqlx::query(
            "INSERT INTO workflow_snapshots (workflow_id, event_seq, state_json, created_at)
             VALUES ($1, $2, $3, $4)
             ON CONFLICT (workflow_id, event_seq) DO UPDATE SET state_json = EXCLUDED.state_json, created_at = EXCLUDED.created_at",
        )
        .bind(workflow_id)
        .bind(event_seq)
        .bind(state_json)
        .bind(timestamp_now())
        .execute(&self.pool)
        .await?;
        Ok(())
    }

    async fn get_latest_snapshot(
        &self,
        workflow_id: &str,
    ) -> Result<Option<WorkflowSnapshot>> {
        let row = sqlx::query_as::<_, (String, i32, String, f64)>(
            "SELECT workflow_id, event_seq, state_json, created_at
             FROM workflow_snapshots WHERE workflow_id = $1
             ORDER BY event_seq DESC LIMIT 1",
        )
        .bind(workflow_id)
        .fetch_optional(&self.pool)
        .await?;

        Ok(row.map(|(workflow_id, event_seq, state_json, created_at)| WorkflowSnapshot {
            workflow_id,
            event_seq,
            state_json,
            created_at,
        }))
    }

    // ── Queue Stats ─────────────────────────────────────────

    async fn get_queue_stats(&self, namespace: &str) -> Result<Vec<crate::store::QueueStats>> {
        let rows = sqlx::query_as::<_, (String, i64, i64, i64)>(
            "SELECT
                a.task_queue AS queue,
                SUM(CASE WHEN a.status = 'PENDING' THEN 1 ELSE 0 END) AS pending,
                SUM(CASE WHEN a.status = 'RUNNING' THEN 1 ELSE 0 END) AS running,
                (SELECT COUNT(*) FROM workflow_workers w WHERE w.task_queue = a.task_queue AND w.namespace = $1) AS workers
             FROM workflow_activities a
             JOIN workflows wf ON a.workflow_id = wf.id AND wf.namespace = $1
             GROUP BY a.task_queue",
        )
        .bind(namespace)
        .fetch_all(&self.pool)
        .await?;

        Ok(rows
            .into_iter()
            .map(|(queue, pending, running, workers)| crate::store::QueueStats {
                queue,
                pending_activities: pending,
                running_activities: running,
                workers,
            })
            .collect())
    }

    // ── Leader Election ─────────────────────────────────────

    async fn try_acquire_scheduler_lock(&self) -> Result<bool> {
        // pg_try_advisory_lock is session-scoped — only one connection
        // in the pool will hold the lock. In a multi-replica Kubernetes
        // deployment, only one pod's connection wins.
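        // The lock is tied to the session that acquired it, so if that pooled
        // connection closes the lock is released and another replica can win
        // on its next poll.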
        let row: (bool,) =
            sqlx::query_as("SELECT pg_try_advisory_lock(42)")
                .fetch_one(&self.pool)
                .await?;
        Ok(row.0)
    }
}

fn timestamp_now() -> f64 {
    std::time::SystemTime::now()
        .duration_since(std::time::UNIX_EPOCH)
        .unwrap()
        .as_secs_f64()
}

// ── Postgres row types (sqlx::FromRow) ──────────────────────

#[derive(sqlx::FromRow)]
struct PgWorkflowRow {
    id: String,
    namespace: String,
    run_id: String,
    workflow_type: String,
    task_queue: String,
    status: String,
    input: Option<String>,
    result: Option<String>,
    error: Option<String>,
    parent_id: Option<String>,
    claimed_by: Option<String>,
    search_attributes: Option<String>,
    archived_at: Option<f64>,
    archive_uri: Option<String>,
    created_at: f64,
    updated_at: f64,
    completed_at: Option<f64>,
}

impl From<PgWorkflowRow> for WorkflowRecord {
    fn from(r: PgWorkflowRow) -> Self {
        Self {
            id: r.id,
            namespace: r.namespace,
            run_id: r.run_id,
            workflow_type: r.workflow_type,
            task_queue: r.task_queue,
            status: r.status,
            input: r.input,
            result: r.result,
            error: r.error,
            parent_id: r.parent_id,
            claimed_by: r.claimed_by,
            search_attributes: r.search_attributes,
            archived_at: r.archived_at,
            archive_uri: r.archive_uri,
            created_at: r.created_at,
            updated_at: r.updated_at,
            completed_at: r.completed_at,
        }
    }
}

#[derive(sqlx::FromRow)]
struct PgEventRow {
    id: i64,
    workflow_id: String,
    seq: i32,
    event_type: String,
    payload: Option<String>,
    timestamp: f64,
}

impl From<PgEventRow> for WorkflowEvent {
    fn from(r: PgEventRow) -> Self {
        Self {
            id: Some(r.id),
            workflow_id: r.workflow_id,
            seq: r.seq,
            event_type: r.event_type,
            payload: r.payload,
            timestamp: r.timestamp,
        }
    }
}

#[derive(sqlx::FromRow)]
struct PgActivityRow {
    id: i64,
    workflow_id: String,
    seq: i32,
    name: String,
    task_queue: String,
    input: Option<String>,
    status: String,
    result: Option<String>,
    error: Option<String>,
    attempt: i32,
    max_attempts: i32,
    initial_interval_secs: f64,
    backoff_coefficient: f64,
    start_to_close_secs: f64,
    heartbeat_timeout_secs: Option<f64>,
    claimed_by: Option<String>,
    scheduled_at: f64,
    started_at: Option<f64>,
    completed_at: Option<f64>,
    last_heartbeat: Option<f64>,
}

impl From<PgActivityRow> for WorkflowActivity {
    fn from(r: PgActivityRow) -> Self {
        Self {
            id: Some(r.id),
            workflow_id: r.workflow_id,
            seq: r.seq,
            name: r.name,
            task_queue: r.task_queue,
            input: r.input,
            status: r.status,
            result: r.result,
            error: r.error,
            attempt: r.attempt,
            max_attempts: r.max_attempts,
            initial_interval_secs: r.initial_interval_secs,
            backoff_coefficient: r.backoff_coefficient,
            start_to_close_secs: r.start_to_close_secs,
            heartbeat_timeout_secs: r.heartbeat_timeout_secs,
            claimed_by: r.claimed_by,
            scheduled_at: r.scheduled_at,
            started_at: r.started_at,
            completed_at: r.completed_at,
            last_heartbeat: r.last_heartbeat,
        }
    }
}

#[derive(sqlx::FromRow)]
struct PgTimerRow {
    id: i64,
    workflow_id: String,
    seq: i32,
    fire_at: f64,
    fired: bool,
}

impl From<PgTimerRow> for WorkflowTimer {
    fn from(r: PgTimerRow) -> Self {
        Self {
            id: Some(r.id),
            workflow_id: r.workflow_id,
            seq: r.seq,
            fire_at: r.fire_at,
            fired: r.fired,
        }
    }
}

#[derive(sqlx::FromRow)]
struct PgSignalRow {
    id: i64,
    workflow_id: String,
    name: String,
    payload: Option<String>,
    consumed: bool,
    received_at: f64,
}

impl From<PgSignalRow> for WorkflowSignal {
    fn from(r: PgSignalRow) -> Self {
        Self {
            id: Some(r.id),
            workflow_id: r.workflow_id,
            name: r.name,
            payload: r.payload,
            consumed: r.consumed,
            received_at: r.received_at,
        }
    }
}

#[derive(sqlx::FromRow)]
struct PgScheduleRow {
    namespace: String,
    name: String,
    workflow_type: String,
    cron_expr: String,
    timezone: String,
    input: Option<String>,
    task_queue: String,
    overlap_policy: String,
    paused: bool,
    last_run_at: Option<f64>,
    next_run_at: Option<f64>,
    last_workflow_id: Option<String>,
    created_at: f64,
}

impl From<PgScheduleRow> for WorkflowSchedule {
    fn from(r: PgScheduleRow) -> Self {
        Self {
            namespace: r.namespace,
            name: r.name,
            workflow_type: r.workflow_type,
            cron_expr: r.cron_expr,
            timezone: r.timezone,
            input: r.input,
            task_queue: r.task_queue,
            overlap_policy: r.overlap_policy,
            paused: r.paused,
            last_run_at: r.last_run_at,
            next_run_at: r.next_run_at,
            last_workflow_id: r.last_workflow_id,
            created_at: r.created_at,
        }
    }
}

#[derive(sqlx::FromRow)]
struct PgWorkerRow {
    id: String,
    namespace: String,
    identity: String,
    task_queue: String,
    workflows: Option<String>,
    activities: Option<String>,
    max_concurrent_workflows: i32,
    max_concurrent_activities: i32,
    active_tasks: i32,
    last_heartbeat: f64,
    registered_at: f64,
}

impl From<PgWorkerRow> for WorkflowWorker {
    fn from(r: PgWorkerRow) -> Self {
        Self {
            id: r.id,
            namespace: r.namespace,
            identity: r.identity,
            task_queue: r.task_queue,
            workflows: r.workflows,
            activities: r.activities,
            max_concurrent_workflows: r.max_concurrent_workflows,
            max_concurrent_activities: r.max_concurrent_activities,
            active_tasks: r.active_tasks,
            last_heartbeat: r.last_heartbeat,
            registered_at: r.registered_at,
        }
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn sanitise_schema_keeps_statements_intact() {
        let input = "CREATE TABLE foo (x INT);\nCREATE INDEX idx_foo ON foo(x);\n";
        let out = sanitise_schema(input);
        assert_eq!(out.len(), 2);
        assert!(out[0].starts_with("CREATE TABLE foo"));
        assert!(out[1].starts_with("CREATE INDEX idx_foo"));
    }

    #[test]
    fn sanitise_schema_drops_pure_comment_lines() {
        let input = "-- header comment\nCREATE TABLE foo (x INT);\n-- trailing comment\n";
        let out = sanitise_schema(input);
        assert_eq!(out.len(), 1);
        assert!(out[0].starts_with("CREATE TABLE foo"));
    }

    #[test]
    fn sanitise_schema_ignores_semicolons_inside_comment_prose() {
        // Regression: the exact shape that broke v0.11.3–v0.11.5 in production.
        // `-- foo; bar` used to split into "foo" and " bar" fragments, the second
        // of which was executed as SQL and rejected with `syntax error at or near "bar"`.
        let input = "\
CREATE TABLE foo (x INT);
-- Idempotent across startups; fresh installs pick the column up from the
-- CREATE TABLE above so the ADD is a no-op.
";
        let out = sanitise_schema(input);
        assert_eq!(
            out.len(),
            1,
            "expected 1 real statement, got {}: {:?}",
            out.len(),
            out
        );
        assert!(out[0].starts_with("CREATE TABLE foo"));
    }

    #[test]
    fn sanitise_schema_drops_indented_comment_lines() {
        let input = "  -- indented comment\n\tCREATE TABLE foo (x INT);\n";
        let out = sanitise_schema(input);
        assert_eq!(out.len(), 1);
        assert!(out[0].contains("CREATE TABLE foo"));
    }

    #[test]
    fn sanitise_schema_real_constant_produces_only_ddl() {
        // The real SCHEMA constant must not produce any statement whose first
        // token isn't a recognised SQL keyword. A prose fragment leaking in
        // (e.g. "fresh installs...") means the filter regressed.
        for stmt in sanitise_schema(SCHEMA) {
            let first_word = stmt
                .split_whitespace()
                .next()
                .expect("non-empty statement")
                .to_uppercase();
            assert!(
                matches!(
                    first_word.as_str(),
                    "CREATE" | "INSERT" | "UPDATE" | "DROP" | "ALTER" | "WITH"
                ),
                "SCHEMA produced non-DDL statement starting with {first_word:?}: {stmt:?}"
            );
        }
    }
}
1535}