Skip to main content

assay_workflow/store/
postgres.rs

1use anyhow::Result;
2use sqlx::PgPool;
3
4use crate::store::WorkflowStore;
5use crate::types::*;
6
/// Baseline Postgres schema, applied statement-by-statement at startup.
///
/// Every statement is idempotent (`IF NOT EXISTS` / `ON CONFLICT DO NOTHING`)
/// so the whole text can be re-run on each boot.
///
/// The startup migration splits this text on the `;` character. SQL comments
/// inside the string therefore must not contain `;` — a semicolon inside a
/// `--` comment would cut the comment in two and send the remainder to the
/// server as a (syntactically invalid) statement.
const SCHEMA: &str = r#"
CREATE TABLE IF NOT EXISTS namespaces (
    name            TEXT PRIMARY KEY,
    created_at      DOUBLE PRECISION NOT NULL
);
INSERT INTO namespaces (name, created_at)
    VALUES ('main', EXTRACT(EPOCH FROM NOW()))
    ON CONFLICT DO NOTHING;

CREATE TABLE IF NOT EXISTS workflows (
    id              TEXT PRIMARY KEY,
    namespace       TEXT NOT NULL DEFAULT 'main',
    run_id          TEXT NOT NULL,
    workflow_type   TEXT NOT NULL,
    task_queue      TEXT NOT NULL DEFAULT 'main',
    status          TEXT NOT NULL DEFAULT 'PENDING',
    input           TEXT,
    result          TEXT,
    error           TEXT,
    parent_id       TEXT,
    claimed_by      TEXT,
    search_attributes TEXT,
    archived_at     DOUBLE PRECISION,
    archive_uri     TEXT,
    -- Workflow-task dispatch (Phase 9): see sqlite.rs for the full comment.
    needs_dispatch  BOOLEAN NOT NULL DEFAULT FALSE,
    dispatch_claimed_by    TEXT,
    dispatch_last_heartbeat DOUBLE PRECISION,
    created_at      DOUBLE PRECISION NOT NULL,
    updated_at      DOUBLE PRECISION NOT NULL,
    completed_at    DOUBLE PRECISION
);
CREATE INDEX IF NOT EXISTS idx_wf_status_queue ON workflows(status, task_queue);
CREATE INDEX IF NOT EXISTS idx_wf_namespace ON workflows(namespace);
CREATE INDEX IF NOT EXISTS idx_wf_dispatch ON workflows(task_queue, needs_dispatch, dispatch_claimed_by);

CREATE TABLE IF NOT EXISTS workflow_events (
    id              BIGSERIAL PRIMARY KEY,
    workflow_id     TEXT NOT NULL REFERENCES workflows(id),
    seq             INTEGER NOT NULL,
    event_type      TEXT NOT NULL,
    payload         TEXT,
    timestamp       DOUBLE PRECISION NOT NULL
);
CREATE INDEX IF NOT EXISTS idx_wf_events_lookup ON workflow_events(workflow_id, seq);

CREATE TABLE IF NOT EXISTS workflow_activities (
    id              BIGSERIAL PRIMARY KEY,
    workflow_id     TEXT NOT NULL REFERENCES workflows(id),
    seq             INTEGER NOT NULL,
    name            TEXT NOT NULL,
    task_queue      TEXT NOT NULL DEFAULT 'main',
    input           TEXT,
    status          TEXT NOT NULL DEFAULT 'PENDING',
    result          TEXT,
    error           TEXT,
    attempt         INTEGER NOT NULL DEFAULT 1,
    max_attempts    INTEGER NOT NULL DEFAULT 3,
    initial_interval_secs   DOUBLE PRECISION NOT NULL DEFAULT 1,
    backoff_coefficient     DOUBLE PRECISION NOT NULL DEFAULT 2,
    start_to_close_secs     DOUBLE PRECISION NOT NULL DEFAULT 300,
    heartbeat_timeout_secs  DOUBLE PRECISION,
    claimed_by      TEXT,
    scheduled_at    DOUBLE PRECISION NOT NULL,
    started_at      DOUBLE PRECISION,
    completed_at    DOUBLE PRECISION,
    last_heartbeat  DOUBLE PRECISION,
    UNIQUE (workflow_id, seq)
);
CREATE INDEX IF NOT EXISTS idx_wf_act_pending ON workflow_activities(task_queue, status, scheduled_at);

CREATE TABLE IF NOT EXISTS workflow_timers (
    id              BIGSERIAL PRIMARY KEY,
    workflow_id     TEXT NOT NULL REFERENCES workflows(id),
    seq             INTEGER NOT NULL,
    fire_at         DOUBLE PRECISION NOT NULL,
    fired           BOOLEAN NOT NULL DEFAULT FALSE,
    UNIQUE (workflow_id, seq)
);
CREATE INDEX IF NOT EXISTS idx_wf_timers_due ON workflow_timers(fire_at) WHERE fired = FALSE;

CREATE TABLE IF NOT EXISTS workflow_signals (
    id              BIGSERIAL PRIMARY KEY,
    workflow_id     TEXT NOT NULL REFERENCES workflows(id),
    name            TEXT NOT NULL,
    payload         TEXT,
    consumed        BOOLEAN NOT NULL DEFAULT FALSE,
    received_at     DOUBLE PRECISION NOT NULL
);
CREATE INDEX IF NOT EXISTS idx_wf_signals_lookup ON workflow_signals(workflow_id, name, consumed);

CREATE TABLE IF NOT EXISTS workflow_schedules (
    namespace       TEXT NOT NULL DEFAULT 'main',
    name            TEXT NOT NULL,
    workflow_type   TEXT NOT NULL,
    cron_expr       TEXT NOT NULL,
    timezone        TEXT NOT NULL DEFAULT 'UTC',
    input           TEXT,
    task_queue      TEXT NOT NULL DEFAULT 'main',
    overlap_policy  TEXT NOT NULL DEFAULT 'skip',
    paused          BOOLEAN NOT NULL DEFAULT FALSE,
    last_run_at     DOUBLE PRECISION,
    next_run_at     DOUBLE PRECISION,
    last_workflow_id TEXT,
    created_at      DOUBLE PRECISION NOT NULL,
    PRIMARY KEY (namespace, name)
);

CREATE TABLE IF NOT EXISTS workflow_workers (
    id              TEXT PRIMARY KEY,
    namespace       TEXT NOT NULL DEFAULT 'main',
    identity        TEXT NOT NULL,
    task_queue      TEXT NOT NULL,
    workflows       TEXT,
    activities      TEXT,
    max_concurrent_workflows  INTEGER NOT NULL DEFAULT 10,
    max_concurrent_activities INTEGER NOT NULL DEFAULT 10,
    active_tasks    INTEGER NOT NULL DEFAULT 0,
    last_heartbeat  DOUBLE PRECISION NOT NULL,
    registered_at   DOUBLE PRECISION NOT NULL
);

CREATE TABLE IF NOT EXISTS workflow_snapshots (
    workflow_id     TEXT NOT NULL REFERENCES workflows(id),
    event_seq       INTEGER NOT NULL,
    state_json      TEXT NOT NULL,
    created_at      DOUBLE PRECISION NOT NULL,
    PRIMARY KEY (workflow_id, event_seq)
);

CREATE TABLE IF NOT EXISTS api_keys (
    key_hash        TEXT PRIMARY KEY,
    prefix          TEXT NOT NULL,
    label           TEXT,
    created_at      DOUBLE PRECISION NOT NULL
);
CREATE INDEX IF NOT EXISTS idx_api_keys_prefix ON api_keys(prefix);

-- Future additive migrations go below this line. Postgres supports
-- `ADD COLUMN IF NOT EXISTS` natively, so the pattern is simply:
--
--   ALTER TABLE workflows ADD COLUMN IF NOT EXISTS some_new_field TEXT
--
-- (terminated as usual when written as a real statement). Idempotent
-- across startups: fresh installs pick the column up from the
-- CREATE TABLE above so the ADD is a no-op. Currently no pending
-- migrations — baseline schema in CREATE TABLE statements above is the
-- source of truth through v0.11.3.
--
-- NOTE: never put a statement terminator character inside these SQL
-- comments. The startup migration splits this text on that character,
-- so one inside a comment would cut the comment into a bogus statement.
"#;
155
156pub struct PostgresStore {
157    pool: PgPool,
158}
159
160impl PostgresStore {
161    pub async fn new(url: &str) -> Result<Self> {
162        let pool = PgPool::connect(url).await?;
163        let store = Self { pool };
164        store.migrate().await?;
165        Ok(store)
166    }
167
168    async fn migrate(&self) -> Result<()> {
169        // Execute each statement separately — Postgres handles CREATE IF NOT EXISTS
170        for statement in SCHEMA.split(';') {
171            let trimmed = statement.trim();
172            if !trimmed.is_empty() {
173                sqlx::query(trimmed).execute(&self.pool).await?;
174            }
175        }
176        Ok(())
177    }
178
179    /// Try to acquire pg_advisory_lock for leader election.
180    /// Returns true if this instance is the leader (scheduler should run).
181    pub async fn try_acquire_leader_lock(&self) -> Result<bool> {
182        let row: (bool,) =
183            sqlx::query_as("SELECT pg_try_advisory_lock(1)")
184                .fetch_one(&self.pool)
185                .await?;
186        Ok(row.0)
187    }
188}
189
190impl WorkflowStore for PostgresStore {
191    // ── Namespaces ─────────────────────────────────────────
192
193    async fn create_namespace(&self, name: &str) -> Result<()> {
194        sqlx::query("INSERT INTO namespaces (name, created_at) VALUES ($1, EXTRACT(EPOCH FROM NOW()))")
195            .bind(name)
196            .execute(&self.pool)
197            .await?;
198        Ok(())
199    }
200
201    async fn list_namespaces(&self) -> Result<Vec<crate::store::NamespaceRecord>> {
202        let rows = sqlx::query_as::<_, (String, f64)>(
203            "SELECT name, created_at FROM namespaces ORDER BY name",
204        )
205        .fetch_all(&self.pool)
206        .await?;
207        Ok(rows
208            .into_iter()
209            .map(|(name, created_at)| crate::store::NamespaceRecord { name, created_at })
210            .collect())
211    }
212
213    async fn delete_namespace(&self, name: &str) -> Result<bool> {
214        let res = sqlx::query("DELETE FROM namespaces WHERE name = $1 AND name != 'main'")
215            .bind(name)
216            .execute(&self.pool)
217            .await?;
218        Ok(res.rows_affected() > 0)
219    }
220
221    async fn get_namespace_stats(&self, namespace: &str) -> Result<crate::store::NamespaceStats> {
222        let total: (i64,) = sqlx::query_as("SELECT COUNT(*) FROM workflows WHERE namespace = $1")
223            .bind(namespace)
224            .fetch_one(&self.pool)
225            .await?;
226        let running: (i64,) = sqlx::query_as(
227            "SELECT COUNT(*) FROM workflows WHERE namespace = $1 AND status = 'RUNNING'",
228        )
229        .bind(namespace)
230        .fetch_one(&self.pool)
231        .await?;
232        let pending: (i64,) = sqlx::query_as(
233            "SELECT COUNT(*) FROM workflows WHERE namespace = $1 AND status = 'PENDING'",
234        )
235        .bind(namespace)
236        .fetch_one(&self.pool)
237        .await?;
238        let completed: (i64,) = sqlx::query_as(
239            "SELECT COUNT(*) FROM workflows WHERE namespace = $1 AND status = 'COMPLETED'",
240        )
241        .bind(namespace)
242        .fetch_one(&self.pool)
243        .await?;
244        let failed: (i64,) = sqlx::query_as(
245            "SELECT COUNT(*) FROM workflows WHERE namespace = $1 AND status = 'FAILED'",
246        )
247        .bind(namespace)
248        .fetch_one(&self.pool)
249        .await?;
250        let schedules: (i64,) =
251            sqlx::query_as("SELECT COUNT(*) FROM workflow_schedules WHERE namespace = $1")
252                .bind(namespace)
253                .fetch_one(&self.pool)
254                .await?;
255        let workers: (i64,) =
256            sqlx::query_as("SELECT COUNT(*) FROM workflow_workers WHERE namespace = $1")
257                .bind(namespace)
258                .fetch_one(&self.pool)
259                .await?;
260
261        Ok(crate::store::NamespaceStats {
262            namespace: namespace.to_string(),
263            total_workflows: total.0,
264            running: running.0,
265            pending: pending.0,
266            completed: completed.0,
267            failed: failed.0,
268            schedules: schedules.0,
269            workers: workers.0,
270        })
271    }
272
273    // ── Workflows ──────────────────────────────────────────
274
275    async fn create_workflow(&self, wf: &WorkflowRecord) -> Result<()> {
276        sqlx::query(
277            "INSERT INTO workflows (id, namespace, run_id, workflow_type, task_queue, status, input, result, error, parent_id, claimed_by, search_attributes, archived_at, archive_uri, created_at, updated_at, completed_at)
278             VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16, $17)",
279        )
280        .bind(&wf.id)
281        .bind(&wf.namespace)
282        .bind(&wf.run_id)
283        .bind(&wf.workflow_type)
284        .bind(&wf.task_queue)
285        .bind(&wf.status)
286        .bind(&wf.input)
287        .bind(&wf.result)
288        .bind(&wf.error)
289        .bind(&wf.parent_id)
290        .bind(&wf.claimed_by)
291        .bind(&wf.search_attributes)
292        .bind(wf.archived_at)
293        .bind(&wf.archive_uri)
294        .bind(wf.created_at)
295        .bind(wf.updated_at)
296        .bind(wf.completed_at)
297        .execute(&self.pool)
298        .await?;
299        Ok(())
300    }
301
302    async fn get_workflow(&self, id: &str) -> Result<Option<WorkflowRecord>> {
303        let row = sqlx::query_as::<_, PgWorkflowRow>(
304            "SELECT id, namespace, run_id, workflow_type, task_queue, status, input, result, error, parent_id, claimed_by, search_attributes, archived_at, archive_uri, created_at, updated_at, completed_at FROM workflows WHERE id = $1",
305        )
306        .bind(id)
307        .fetch_optional(&self.pool)
308        .await?;
309        Ok(row.map(Into::into))
310    }
311
312    async fn list_workflows(
313        &self,
314        namespace: &str,
315        status: Option<WorkflowStatus>,
316        workflow_type: Option<&str>,
317        search_attrs_filter: Option<&str>,
318        limit: i64,
319        offset: i64,
320    ) -> Result<Vec<WorkflowRecord>> {
321        let status_str = status.map(|s| s.to_string());
322
323        let filter_pairs: Vec<(String, serde_json::Value)> = search_attrs_filter
324            .and_then(|s| serde_json::from_str::<serde_json::Value>(s).ok())
325            .and_then(|v| v.as_object().cloned())
326            .map(|m| m.into_iter().collect())
327            .unwrap_or_default();
328
329        let mut sql = String::from(
330            "SELECT id, namespace, run_id, workflow_type, task_queue, status, input, result, error, parent_id, claimed_by, search_attributes, archived_at, archive_uri, created_at, updated_at, completed_at
331             FROM workflows
332             WHERE namespace = $1
333               AND ($2::TEXT IS NULL OR status = $2)
334               AND ($3::TEXT IS NULL OR workflow_type = $3)",
335        );
336        // Bind placeholders for the filter follow $3; next index is 4.
337        let mut idx = 4usize;
338        for _ in &filter_pairs {
339            sql.push_str(&format!(
340                " AND (search_attributes::jsonb)->>${} = ${}",
341                idx,
342                idx + 1
343            ));
344            idx += 2;
345        }
346        sql.push_str(&format!(" ORDER BY created_at DESC LIMIT ${} OFFSET ${}", idx, idx + 1));
347
348        let mut q = sqlx::query_as::<_, PgWorkflowRow>(&sql)
349            .bind(namespace)
350            .bind(&status_str)
351            .bind(workflow_type);
352        for (key, value) in &filter_pairs {
353            q = q.bind(key.clone());
354            // JSONB ->> always returns TEXT; compare by stringified value.
355            let as_text = match value {
356                serde_json::Value::String(s) => s.clone(),
357                other => other.to_string(),
358            };
359            q = q.bind(as_text);
360        }
361        let rows = q.bind(limit).bind(offset).fetch_all(&self.pool).await?;
362        Ok(rows.into_iter().map(Into::into).collect())
363    }
364
365    async fn update_workflow_status(
366        &self,
367        id: &str,
368        status: WorkflowStatus,
369        result: Option<&str>,
370        error: Option<&str>,
371    ) -> Result<()> {
372        let now = timestamp_now();
373        let completed_at = if status.is_terminal() { Some(now) } else { None };
374        sqlx::query(
375            "UPDATE workflows SET status = $1, result = COALESCE($2, result), error = COALESCE($3, error), updated_at = $4, completed_at = COALESCE($5, completed_at) WHERE id = $6",
376        )
377        .bind(status.to_string())
378        .bind(result)
379        .bind(error)
380        .bind(now)
381        .bind(completed_at)
382        .bind(id)
383        .execute(&self.pool)
384        .await?;
385        Ok(())
386    }
387
388    async fn claim_workflow(&self, id: &str, worker_id: &str) -> Result<bool> {
389        let res = sqlx::query(
390            "UPDATE workflows SET claimed_by = $1, status = 'RUNNING', updated_at = $2 WHERE id = $3 AND claimed_by IS NULL",
391        )
392        .bind(worker_id)
393        .bind(timestamp_now())
394        .bind(id)
395        .execute(&self.pool)
396        .await?;
397        Ok(res.rows_affected() > 0)
398    }
399
400    async fn mark_workflow_dispatchable(&self, workflow_id: &str) -> Result<()> {
401        sqlx::query("UPDATE workflows SET needs_dispatch = TRUE WHERE id = $1")
402            .bind(workflow_id)
403            .execute(&self.pool)
404            .await?;
405        Ok(())
406    }
407
408    async fn claim_workflow_task(
409        &self,
410        task_queue: &str,
411        worker_id: &str,
412    ) -> Result<Option<WorkflowRecord>> {
413        let now = timestamp_now();
414        // Atomic claim with FOR UPDATE SKIP LOCKED so multiple engine
415        // replicas don't fight over the same workflow task.
416        let row = sqlx::query_as::<_, PgWorkflowRow>(
417            "UPDATE workflows
418             SET dispatch_claimed_by = $1, dispatch_last_heartbeat = $2, needs_dispatch = FALSE
419             WHERE id = (
420                SELECT id FROM workflows
421                WHERE task_queue = $3
422                  AND needs_dispatch = TRUE
423                  AND dispatch_claimed_by IS NULL
424                  AND status NOT IN ('COMPLETED', 'FAILED', 'CANCELLED', 'TIMED_OUT')
425                ORDER BY updated_at ASC
426                FOR UPDATE SKIP LOCKED
427                LIMIT 1
428             )
429             RETURNING id, namespace, run_id, workflow_type, task_queue, status, input, result, error, parent_id, claimed_by, search_attributes, archived_at, archive_uri, created_at, updated_at, completed_at",
430        )
431        .bind(worker_id)
432        .bind(now)
433        .bind(task_queue)
434        .fetch_optional(&self.pool)
435        .await?;
436        Ok(row.map(Into::into))
437    }
438
439    async fn release_workflow_task(&self, workflow_id: &str, worker_id: &str) -> Result<()> {
440        sqlx::query(
441            "UPDATE workflows
442             SET dispatch_claimed_by = NULL, dispatch_last_heartbeat = NULL
443             WHERE id = $1 AND dispatch_claimed_by = $2",
444        )
445        .bind(workflow_id)
446        .bind(worker_id)
447        .execute(&self.pool)
448        .await?;
449        Ok(())
450    }
451
452    async fn release_stale_dispatch_leases(
453        &self,
454        now: f64,
455        timeout_secs: f64,
456    ) -> Result<u64> {
457        let res = sqlx::query(
458            "UPDATE workflows
459             SET dispatch_claimed_by = NULL,
460                 dispatch_last_heartbeat = NULL,
461                 needs_dispatch = TRUE
462             WHERE dispatch_claimed_by IS NOT NULL
463               AND ($1 - dispatch_last_heartbeat) > $2
464               AND status NOT IN ('COMPLETED', 'FAILED', 'CANCELLED', 'TIMED_OUT')",
465        )
466        .bind(now)
467        .bind(timeout_secs)
468        .execute(&self.pool)
469        .await?;
470        Ok(res.rows_affected())
471    }
472
473    // ── Events ─────────────────────────────────────────────
474
475    async fn append_event(&self, ev: &WorkflowEvent) -> Result<i64> {
476        let row: (i64,) = sqlx::query_as(
477            "INSERT INTO workflow_events (workflow_id, seq, event_type, payload, timestamp) VALUES ($1, $2, $3, $4, $5) RETURNING id",
478        )
479        .bind(&ev.workflow_id)
480        .bind(ev.seq)
481        .bind(&ev.event_type)
482        .bind(&ev.payload)
483        .bind(ev.timestamp)
484        .fetch_one(&self.pool)
485        .await?;
486        Ok(row.0)
487    }
488
489    async fn list_events(&self, workflow_id: &str) -> Result<Vec<WorkflowEvent>> {
490        let rows = sqlx::query_as::<_, PgEventRow>(
491            "SELECT id, workflow_id, seq, event_type, payload, timestamp FROM workflow_events WHERE workflow_id = $1 ORDER BY seq ASC",
492        )
493        .bind(workflow_id)
494        .fetch_all(&self.pool)
495        .await?;
496        Ok(rows.into_iter().map(Into::into).collect())
497    }
498
499    async fn get_event_count(&self, workflow_id: &str) -> Result<i64> {
500        let row: (i64,) =
501            sqlx::query_as("SELECT COUNT(*) FROM workflow_events WHERE workflow_id = $1")
502                .bind(workflow_id)
503                .fetch_one(&self.pool)
504                .await?;
505        Ok(row.0)
506    }
507
508    // ── Activities ──────────────────────────────────────────
509
510    async fn create_activity(&self, act: &WorkflowActivity) -> Result<i64> {
511        let row: (i64,) = sqlx::query_as(
512            "INSERT INTO workflow_activities (workflow_id, seq, name, task_queue, input, status, attempt, max_attempts, initial_interval_secs, backoff_coefficient, start_to_close_secs, heartbeat_timeout_secs, scheduled_at)
513             VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13) RETURNING id",
514        )
515        .bind(&act.workflow_id)
516        .bind(act.seq)
517        .bind(&act.name)
518        .bind(&act.task_queue)
519        .bind(&act.input)
520        .bind(&act.status)
521        .bind(act.attempt)
522        .bind(act.max_attempts)
523        .bind(act.initial_interval_secs)
524        .bind(act.backoff_coefficient)
525        .bind(act.start_to_close_secs)
526        .bind(act.heartbeat_timeout_secs)
527        .bind(act.scheduled_at)
528        .fetch_one(&self.pool)
529        .await?;
530        Ok(row.0)
531    }
532
533    async fn get_activity(&self, id: i64) -> Result<Option<WorkflowActivity>> {
534        let row = sqlx::query_as::<_, PgActivityRow>(
535            "SELECT id, workflow_id, seq, name, task_queue, input, status, result, error, attempt, max_attempts, initial_interval_secs, backoff_coefficient, start_to_close_secs, heartbeat_timeout_secs, claimed_by, scheduled_at, started_at, completed_at, last_heartbeat
536             FROM workflow_activities WHERE id = $1",
537        )
538        .bind(id)
539        .fetch_optional(&self.pool)
540        .await?;
541        Ok(row.map(Into::into))
542    }
543
544    async fn get_activity_by_workflow_seq(
545        &self,
546        workflow_id: &str,
547        seq: i32,
548    ) -> Result<Option<WorkflowActivity>> {
549        let row = sqlx::query_as::<_, PgActivityRow>(
550            "SELECT id, workflow_id, seq, name, task_queue, input, status, result, error, attempt, max_attempts, initial_interval_secs, backoff_coefficient, start_to_close_secs, heartbeat_timeout_secs, claimed_by, scheduled_at, started_at, completed_at, last_heartbeat
551             FROM workflow_activities WHERE workflow_id = $1 AND seq = $2",
552        )
553        .bind(workflow_id)
554        .bind(seq)
555        .fetch_optional(&self.pool)
556        .await?;
557        Ok(row.map(Into::into))
558    }
559
560    async fn claim_activity(
561        &self,
562        task_queue: &str,
563        worker_id: &str,
564    ) -> Result<Option<WorkflowActivity>> {
565        let now = timestamp_now();
566        // Atomic claim using FOR UPDATE SKIP LOCKED — prevents contention
567        // between multiple assay serve instances claiming the same activity
568        let row = sqlx::query_as::<_, PgActivityRow>(
569            "UPDATE workflow_activities SET status = 'RUNNING', claimed_by = $1, started_at = $2
570             WHERE id = (
571                SELECT id FROM workflow_activities
572                WHERE task_queue = $3 AND status = 'PENDING'
573                ORDER BY scheduled_at ASC
574                FOR UPDATE SKIP LOCKED
575                LIMIT 1
576             )
577             RETURNING id, workflow_id, seq, name, task_queue, input, status, result, error, attempt, max_attempts, initial_interval_secs, backoff_coefficient, start_to_close_secs, heartbeat_timeout_secs, claimed_by, scheduled_at, started_at, completed_at, last_heartbeat",
578        )
579        .bind(worker_id)
580        .bind(now)
581        .bind(task_queue)
582        .fetch_optional(&self.pool)
583        .await?;
584        Ok(row.map(Into::into))
585    }
586
587    async fn requeue_activity_for_retry(
588        &self,
589        id: i64,
590        next_attempt: i32,
591        next_scheduled_at: f64,
592    ) -> Result<()> {
593        sqlx::query(
594            "UPDATE workflow_activities
595             SET status = 'PENDING', attempt = $1, scheduled_at = $2,
596                 claimed_by = NULL, started_at = NULL, last_heartbeat = NULL,
597                 error = NULL
598             WHERE id = $3",
599        )
600        .bind(next_attempt)
601        .bind(next_scheduled_at)
602        .bind(id)
603        .execute(&self.pool)
604        .await?;
605        Ok(())
606    }
607
608    async fn complete_activity(
609        &self,
610        id: i64,
611        result: Option<&str>,
612        error: Option<&str>,
613        failed: bool,
614    ) -> Result<()> {
615        let status = if failed { "FAILED" } else { "COMPLETED" };
616        sqlx::query(
617            "UPDATE workflow_activities SET status = $1, result = $2, error = $3, completed_at = $4 WHERE id = $5",
618        )
619        .bind(status)
620        .bind(result)
621        .bind(error)
622        .bind(timestamp_now())
623        .bind(id)
624        .execute(&self.pool)
625        .await?;
626        Ok(())
627    }
628
629    async fn heartbeat_activity(&self, id: i64, _details: Option<&str>) -> Result<()> {
630        sqlx::query("UPDATE workflow_activities SET last_heartbeat = $1 WHERE id = $2")
631            .bind(timestamp_now())
632            .bind(id)
633            .execute(&self.pool)
634            .await?;
635        Ok(())
636    }
637
638    async fn get_timed_out_activities(&self, now: f64) -> Result<Vec<WorkflowActivity>> {
639        let rows = sqlx::query_as::<_, PgActivityRow>(
640            "SELECT id, workflow_id, seq, name, task_queue, input, status, result, error, attempt, max_attempts, initial_interval_secs, backoff_coefficient, start_to_close_secs, heartbeat_timeout_secs, claimed_by, scheduled_at, started_at, completed_at, last_heartbeat
641             FROM workflow_activities
642             WHERE status = 'RUNNING'
643               AND heartbeat_timeout_secs IS NOT NULL
644               AND last_heartbeat IS NOT NULL
645               AND ($1 - last_heartbeat) > heartbeat_timeout_secs",
646        )
647        .bind(now)
648        .fetch_all(&self.pool)
649        .await?;
650        Ok(rows.into_iter().map(Into::into).collect())
651    }
652
653    // ── Timers ──────────────────────────────────────────────
654
655    async fn create_timer(&self, timer: &WorkflowTimer) -> Result<i64> {
656        let row: (i64,) = sqlx::query_as(
657            "INSERT INTO workflow_timers (workflow_id, seq, fire_at, fired) VALUES ($1, $2, $3, FALSE) RETURNING id",
658        )
659        .bind(&timer.workflow_id)
660        .bind(timer.seq)
661        .bind(timer.fire_at)
662        .fetch_one(&self.pool)
663        .await?;
664        Ok(row.0)
665    }
666
667    async fn cancel_pending_activities(&self, workflow_id: &str) -> Result<u64> {
668        let res = sqlx::query(
669            "UPDATE workflow_activities SET status = 'CANCELLED', completed_at = $1
670             WHERE workflow_id = $2 AND status = 'PENDING'",
671        )
672        .bind(timestamp_now())
673        .bind(workflow_id)
674        .execute(&self.pool)
675        .await?;
676        Ok(res.rows_affected())
677    }
678
679    async fn cancel_pending_timers(&self, workflow_id: &str) -> Result<u64> {
680        let res = sqlx::query(
681            "UPDATE workflow_timers SET fired = TRUE
682             WHERE workflow_id = $1 AND fired = FALSE",
683        )
684        .bind(workflow_id)
685        .execute(&self.pool)
686        .await?;
687        Ok(res.rows_affected())
688    }
689
690    async fn get_timer_by_workflow_seq(
691        &self,
692        workflow_id: &str,
693        seq: i32,
694    ) -> Result<Option<WorkflowTimer>> {
695        let row = sqlx::query_as::<_, PgTimerRow>(
696            "SELECT id, workflow_id, seq, fire_at, fired
697             FROM workflow_timers WHERE workflow_id = $1 AND seq = $2",
698        )
699        .bind(workflow_id)
700        .bind(seq)
701        .fetch_optional(&self.pool)
702        .await?;
703        Ok(row.map(Into::into))
704    }
705
706    async fn fire_due_timers(&self, now: f64) -> Result<Vec<WorkflowTimer>> {
707        let rows = sqlx::query_as::<_, PgTimerRow>(
708            "UPDATE workflow_timers SET fired = TRUE
709             WHERE fired = FALSE AND fire_at <= $1
710             RETURNING id, workflow_id, seq, fire_at, fired",
711        )
712        .bind(now)
713        .fetch_all(&self.pool)
714        .await?;
715        Ok(rows.into_iter().map(Into::into).collect())
716    }
717
718    // ── Signals ─────────────────────────────────────────────
719
720    async fn send_signal(&self, sig: &WorkflowSignal) -> Result<i64> {
721        let row: (i64,) = sqlx::query_as(
722            "INSERT INTO workflow_signals (workflow_id, name, payload, consumed, received_at) VALUES ($1, $2, $3, FALSE, $4) RETURNING id",
723        )
724        .bind(&sig.workflow_id)
725        .bind(&sig.name)
726        .bind(&sig.payload)
727        .bind(sig.received_at)
728        .fetch_one(&self.pool)
729        .await?;
730        Ok(row.0)
731    }
732
733    async fn consume_signals(
734        &self,
735        workflow_id: &str,
736        name: &str,
737    ) -> Result<Vec<WorkflowSignal>> {
738        let rows = sqlx::query_as::<_, PgSignalRow>(
739            "UPDATE workflow_signals SET consumed = TRUE
740             WHERE workflow_id = $1 AND name = $2 AND consumed = FALSE
741             RETURNING id, workflow_id, name, payload, consumed, received_at",
742        )
743        .bind(workflow_id)
744        .bind(name)
745        .fetch_all(&self.pool)
746        .await?;
747        Ok(rows.into_iter().map(Into::into).collect())
748    }
749
750    // ── Schedules ───────────────────────────────────────────
751
752    async fn create_schedule(&self, sched: &WorkflowSchedule) -> Result<()> {
753        sqlx::query(
754            "INSERT INTO workflow_schedules (namespace, name, workflow_type, cron_expr, timezone, input, task_queue, overlap_policy, paused, last_run_at, next_run_at, last_workflow_id, created_at)
755             VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13)",
756        )
757        .bind(&sched.namespace)
758        .bind(&sched.name)
759        .bind(&sched.workflow_type)
760        .bind(&sched.cron_expr)
761        .bind(&sched.timezone)
762        .bind(&sched.input)
763        .bind(&sched.task_queue)
764        .bind(&sched.overlap_policy)
765        .bind(sched.paused)
766        .bind(sched.last_run_at)
767        .bind(sched.next_run_at)
768        .bind(&sched.last_workflow_id)
769        .bind(sched.created_at)
770        .execute(&self.pool)
771        .await?;
772        Ok(())
773    }
774
775    async fn get_schedule(&self, namespace: &str, name: &str) -> Result<Option<WorkflowSchedule>> {
776        let row = sqlx::query_as::<_, PgScheduleRow>(
777            "SELECT namespace, name, workflow_type, cron_expr, timezone, input, task_queue, overlap_policy, paused, last_run_at, next_run_at, last_workflow_id, created_at FROM workflow_schedules WHERE namespace = $1 AND name = $2",
778        )
779        .bind(namespace)
780        .bind(name)
781        .fetch_optional(&self.pool)
782        .await?;
783        Ok(row.map(Into::into))
784    }
785
786    async fn list_schedules(&self, namespace: &str) -> Result<Vec<WorkflowSchedule>> {
787        let rows = sqlx::query_as::<_, PgScheduleRow>(
788            "SELECT namespace, name, workflow_type, cron_expr, timezone, input, task_queue, overlap_policy, paused, last_run_at, next_run_at, last_workflow_id, created_at FROM workflow_schedules WHERE namespace = $1 ORDER BY name",
789        )
790        .bind(namespace)
791        .fetch_all(&self.pool)
792        .await?;
793        Ok(rows.into_iter().map(Into::into).collect())
794    }
795
796    async fn update_schedule_last_run(
797        &self,
798        namespace: &str,
799        name: &str,
800        last_run_at: f64,
801        next_run_at: f64,
802        workflow_id: &str,
803    ) -> Result<()> {
804        sqlx::query(
805            "UPDATE workflow_schedules SET last_run_at = $1, next_run_at = $2, last_workflow_id = $3 WHERE namespace = $4 AND name = $5",
806        )
807        .bind(last_run_at)
808        .bind(next_run_at)
809        .bind(workflow_id)
810        .bind(namespace)
811        .bind(name)
812        .execute(&self.pool)
813        .await?;
814        Ok(())
815    }
816
817    async fn delete_schedule(&self, namespace: &str, name: &str) -> Result<bool> {
818        let res = sqlx::query("DELETE FROM workflow_schedules WHERE namespace = $1 AND name = $2")
819            .bind(namespace)
820            .bind(name)
821            .execute(&self.pool)
822            .await?;
823        Ok(res.rows_affected() > 0)
824    }
825
826    async fn list_archivable_workflows(
827        &self,
828        cutoff: f64,
829        limit: i64,
830    ) -> Result<Vec<WorkflowRecord>> {
831        let rows = sqlx::query_as::<_, PgWorkflowRow>(
832            "SELECT id, namespace, run_id, workflow_type, task_queue, status, input, result, error, parent_id, claimed_by, search_attributes, archived_at, archive_uri, created_at, updated_at, completed_at
833             FROM workflows
834             WHERE status IN ('COMPLETED', 'FAILED', 'CANCELLED', 'TIMED_OUT')
835               AND completed_at IS NOT NULL
836               AND completed_at < $1
837               AND archived_at IS NULL
838             ORDER BY completed_at ASC
839             LIMIT $2",
840        )
841        .bind(cutoff)
842        .bind(limit)
843        .fetch_all(&self.pool)
844        .await?;
845        Ok(rows.into_iter().map(Into::into).collect())
846    }
847
848    async fn mark_archived_and_purge(
849        &self,
850        workflow_id: &str,
851        archive_uri: &str,
852        archived_at: f64,
853    ) -> Result<()> {
854        let mut tx = self.pool.begin().await?;
855        sqlx::query("DELETE FROM workflow_events WHERE workflow_id = $1")
856            .bind(workflow_id)
857            .execute(&mut *tx)
858            .await?;
859        sqlx::query("DELETE FROM workflow_activities WHERE workflow_id = $1")
860            .bind(workflow_id)
861            .execute(&mut *tx)
862            .await?;
863        sqlx::query("DELETE FROM workflow_timers WHERE workflow_id = $1")
864            .bind(workflow_id)
865            .execute(&mut *tx)
866            .await?;
867        sqlx::query("DELETE FROM workflow_signals WHERE workflow_id = $1")
868            .bind(workflow_id)
869            .execute(&mut *tx)
870            .await?;
871        sqlx::query("DELETE FROM workflow_snapshots WHERE workflow_id = $1")
872            .bind(workflow_id)
873            .execute(&mut *tx)
874            .await?;
875        sqlx::query(
876            "UPDATE workflows SET archived_at = $1, archive_uri = $2 WHERE id = $3",
877        )
878        .bind(archived_at)
879        .bind(archive_uri)
880        .bind(workflow_id)
881        .execute(&mut *tx)
882        .await?;
883        tx.commit().await?;
884        Ok(())
885    }
886
887    async fn upsert_search_attributes(
888        &self,
889        workflow_id: &str,
890        patch_json: &str,
891    ) -> Result<()> {
892        let current: Option<(Option<String>,)> =
893            sqlx::query_as("SELECT search_attributes FROM workflows WHERE id = $1")
894                .bind(workflow_id)
895                .fetch_optional(&self.pool)
896                .await?;
897        let merged = crate::store::sqlite::merge_search_attrs(
898            current.and_then(|(s,)| s).as_deref(),
899            patch_json,
900        )?;
901        sqlx::query("UPDATE workflows SET search_attributes = $1 WHERE id = $2")
902            .bind(merged)
903            .bind(workflow_id)
904            .execute(&self.pool)
905            .await?;
906        Ok(())
907    }
908
909    async fn update_schedule(
910        &self,
911        namespace: &str,
912        name: &str,
913        patch: &SchedulePatch,
914    ) -> Result<Option<WorkflowSchedule>> {
915        let mut sets: Vec<String> = Vec::new();
916        let mut idx = 1usize;
917        if patch.cron_expr.is_some() {
918            sets.push(format!("cron_expr = ${idx}"));
919            idx += 1;
920        }
921        if patch.timezone.is_some() {
922            sets.push(format!("timezone = ${idx}"));
923            idx += 1;
924        }
925        if patch.input.is_some() {
926            sets.push(format!("input = ${idx}"));
927            idx += 1;
928        }
929        if patch.task_queue.is_some() {
930            sets.push(format!("task_queue = ${idx}"));
931            idx += 1;
932        }
933        if patch.overlap_policy.is_some() {
934            sets.push(format!("overlap_policy = ${idx}"));
935            idx += 1;
936        }
937        if sets.is_empty() {
938            return self.get_schedule(namespace, name).await;
939        }
940        let sql = format!(
941            "UPDATE workflow_schedules SET {} WHERE namespace = ${} AND name = ${}",
942            sets.join(", "),
943            idx,
944            idx + 1
945        );
946        let mut q = sqlx::query(&sql);
947        if let Some(ref v) = patch.cron_expr {
948            q = q.bind(v);
949        }
950        if let Some(ref v) = patch.timezone {
951            q = q.bind(v);
952        }
953        if let Some(ref v) = patch.input {
954            q = q.bind(v.to_string());
955        }
956        if let Some(ref v) = patch.task_queue {
957            q = q.bind(v);
958        }
959        if let Some(ref v) = patch.overlap_policy {
960            q = q.bind(v);
961        }
962        let res = q
963            .bind(namespace)
964            .bind(name)
965            .execute(&self.pool)
966            .await?;
967        if res.rows_affected() == 0 {
968            return Ok(None);
969        }
970        self.get_schedule(namespace, name).await
971    }
972
973    async fn set_schedule_paused(
974        &self,
975        namespace: &str,
976        name: &str,
977        paused: bool,
978    ) -> Result<Option<WorkflowSchedule>> {
979        let res = sqlx::query(
980            "UPDATE workflow_schedules SET paused = $1 WHERE namespace = $2 AND name = $3",
981        )
982        .bind(paused)
983        .bind(namespace)
984        .bind(name)
985        .execute(&self.pool)
986        .await?;
987        if res.rows_affected() == 0 {
988            return Ok(None);
989        }
990        self.get_schedule(namespace, name).await
991    }
992
993    // ── Workers ─────────────────────────────────────────────
994
995    async fn register_worker(&self, w: &WorkflowWorker) -> Result<()> {
996        sqlx::query(
997            "INSERT INTO workflow_workers (id, namespace, identity, task_queue, workflows, activities, max_concurrent_workflows, max_concurrent_activities, active_tasks, last_heartbeat, registered_at)
998             VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11)
999             ON CONFLICT (id) DO UPDATE SET last_heartbeat = EXCLUDED.last_heartbeat, identity = EXCLUDED.identity",
1000        )
1001        .bind(&w.id)
1002        .bind(&w.namespace)
1003        .bind(&w.identity)
1004        .bind(&w.task_queue)
1005        .bind(&w.workflows)
1006        .bind(&w.activities)
1007        .bind(w.max_concurrent_workflows)
1008        .bind(w.max_concurrent_activities)
1009        .bind(w.active_tasks)
1010        .bind(w.last_heartbeat)
1011        .bind(w.registered_at)
1012        .execute(&self.pool)
1013        .await?;
1014        Ok(())
1015    }
1016
1017    async fn heartbeat_worker(&self, id: &str, now: f64) -> Result<()> {
1018        sqlx::query("UPDATE workflow_workers SET last_heartbeat = $1 WHERE id = $2")
1019            .bind(now)
1020            .bind(id)
1021            .execute(&self.pool)
1022            .await?;
1023        Ok(())
1024    }
1025
1026    async fn list_workers(&self, namespace: &str) -> Result<Vec<WorkflowWorker>> {
1027        let rows = sqlx::query_as::<_, PgWorkerRow>(
1028            "SELECT id, namespace, identity, task_queue, workflows, activities, max_concurrent_workflows, max_concurrent_activities, active_tasks, last_heartbeat, registered_at FROM workflow_workers WHERE namespace = $1 ORDER BY registered_at",
1029        )
1030        .bind(namespace)
1031        .fetch_all(&self.pool)
1032        .await?;
1033        Ok(rows.into_iter().map(Into::into).collect())
1034    }
1035
1036    async fn remove_dead_workers(&self, cutoff: f64) -> Result<Vec<String>> {
1037        let rows: Vec<(String,)> =
1038            sqlx::query_as("SELECT id FROM workflow_workers WHERE last_heartbeat < $1")
1039                .bind(cutoff)
1040                .fetch_all(&self.pool)
1041                .await?;
1042        let ids: Vec<String> = rows.into_iter().map(|r| r.0).collect();
1043        if !ids.is_empty() {
1044            sqlx::query("DELETE FROM workflow_workers WHERE last_heartbeat < $1")
1045                .bind(cutoff)
1046                .execute(&self.pool)
1047                .await?;
1048        }
1049        Ok(ids)
1050    }
1051
1052    // ── API Keys ────────────────────────────────────────────
1053
1054    async fn create_api_key(
1055        &self,
1056        key_hash: &str,
1057        prefix: &str,
1058        label: Option<&str>,
1059        created_at: f64,
1060    ) -> Result<()> {
1061        sqlx::query("INSERT INTO api_keys (key_hash, prefix, label, created_at) VALUES ($1, $2, $3, $4)")
1062            .bind(key_hash)
1063            .bind(prefix)
1064            .bind(label)
1065            .bind(created_at)
1066            .execute(&self.pool)
1067            .await?;
1068        Ok(())
1069    }
1070
1071    async fn validate_api_key(&self, key_hash: &str) -> Result<bool> {
1072        let row: Option<(i64,)> =
1073            sqlx::query_as("SELECT 1::BIGINT FROM api_keys WHERE key_hash = $1")
1074                .bind(key_hash)
1075                .fetch_optional(&self.pool)
1076                .await?;
1077        Ok(row.is_some())
1078    }
1079
1080    async fn list_api_keys(&self) -> Result<Vec<crate::store::ApiKeyRecord>> {
1081        let rows = sqlx::query_as::<_, (String, Option<String>, f64)>(
1082            "SELECT prefix, label, created_at FROM api_keys ORDER BY created_at DESC",
1083        )
1084        .fetch_all(&self.pool)
1085        .await?;
1086        Ok(rows
1087            .into_iter()
1088            .map(|(prefix, label, created_at)| crate::store::ApiKeyRecord {
1089                prefix,
1090                label,
1091                created_at,
1092            })
1093            .collect())
1094    }
1095
1096    async fn revoke_api_key(&self, prefix: &str) -> Result<bool> {
1097        let res = sqlx::query("DELETE FROM api_keys WHERE prefix = $1")
1098            .bind(prefix)
1099            .execute(&self.pool)
1100            .await?;
1101        Ok(res.rows_affected() > 0)
1102    }
1103
1104    // ── Child Workflows ─────────────────────────────────────
1105
1106    async fn list_child_workflows(&self, parent_id: &str) -> Result<Vec<WorkflowRecord>> {
1107        let rows = sqlx::query_as::<_, PgWorkflowRow>(
1108            "SELECT id, namespace, run_id, workflow_type, task_queue, status, input, result, error, parent_id, claimed_by, search_attributes, archived_at, archive_uri, created_at, updated_at, completed_at
1109             FROM workflows WHERE parent_id = $1 ORDER BY created_at ASC",
1110        )
1111        .bind(parent_id)
1112        .fetch_all(&self.pool)
1113        .await?;
1114        Ok(rows.into_iter().map(Into::into).collect())
1115    }
1116
1117    // ── Snapshots ───────────────────────────────────────────
1118
1119    async fn create_snapshot(
1120        &self,
1121        workflow_id: &str,
1122        event_seq: i32,
1123        state_json: &str,
1124    ) -> Result<()> {
1125        sqlx::query(
1126            "INSERT INTO workflow_snapshots (workflow_id, event_seq, state_json, created_at)
1127             VALUES ($1, $2, $3, $4)
1128             ON CONFLICT (workflow_id, event_seq) DO UPDATE SET state_json = EXCLUDED.state_json, created_at = EXCLUDED.created_at",
1129        )
1130        .bind(workflow_id)
1131        .bind(event_seq)
1132        .bind(state_json)
1133        .bind(timestamp_now())
1134        .execute(&self.pool)
1135        .await?;
1136        Ok(())
1137    }
1138
1139    async fn get_latest_snapshot(
1140        &self,
1141        workflow_id: &str,
1142    ) -> Result<Option<WorkflowSnapshot>> {
1143        let row = sqlx::query_as::<_, (String, i32, String, f64)>(
1144            "SELECT workflow_id, event_seq, state_json, created_at
1145             FROM workflow_snapshots WHERE workflow_id = $1
1146             ORDER BY event_seq DESC LIMIT 1",
1147        )
1148        .bind(workflow_id)
1149        .fetch_optional(&self.pool)
1150        .await?;
1151
1152        Ok(row.map(|(workflow_id, event_seq, state_json, created_at)| WorkflowSnapshot {
1153            workflow_id,
1154            event_seq,
1155            state_json,
1156            created_at,
1157        }))
1158    }
1159
1160    // ── Queue Stats ─────────────────────────────────────────
1161
1162    async fn get_queue_stats(&self, namespace: &str) -> Result<Vec<crate::store::QueueStats>> {
1163        let rows = sqlx::query_as::<_, (String, i64, i64, i64)>(
1164            "SELECT
1165                a.task_queue AS queue,
1166                SUM(CASE WHEN a.status = 'PENDING' THEN 1 ELSE 0 END) AS pending,
1167                SUM(CASE WHEN a.status = 'RUNNING' THEN 1 ELSE 0 END) AS running,
1168                (SELECT COUNT(*) FROM workflow_workers w WHERE w.task_queue = a.task_queue AND w.namespace = $1) AS workers
1169             FROM workflow_activities a
1170             JOIN workflows wf ON a.workflow_id = wf.id AND wf.namespace = $1
1171             GROUP BY a.task_queue",
1172        )
1173        .bind(namespace)
1174        .fetch_all(&self.pool)
1175        .await?;
1176
1177        Ok(rows
1178            .into_iter()
1179            .map(|(queue, pending, running, workers)| crate::store::QueueStats {
1180                queue,
1181                pending_activities: pending,
1182                running_activities: running,
1183                workers,
1184            })
1185            .collect())
1186    }
1187
1188    // ── Leader Election ─────────────────────────────────────
1189
1190    async fn try_acquire_scheduler_lock(&self) -> Result<bool> {
1191        // pg_try_advisory_lock is session-scoped — only one connection
1192        // in the pool will hold the lock. In a multi-replica Kubernetes
1193        // deployment, only one pod's connection wins.
1194        let row: (bool,) =
1195            sqlx::query_as("SELECT pg_try_advisory_lock(42)")
1196                .fetch_one(&self.pool)
1197                .await?;
1198        Ok(row.0)
1199    }
1200}
1201
/// Returns the current wall-clock time as fractional seconds since the Unix
/// epoch.
///
/// If the system clock is set to a moment before the epoch, the result is the
/// (negative) distance to the epoch rather than a panic, so a misconfigured
/// clock cannot bring down the store.
fn timestamp_now() -> f64 {
    match std::time::SystemTime::now().duration_since(std::time::UNIX_EPOCH) {
        Ok(since_epoch) => since_epoch.as_secs_f64(),
        // `SystemTimeError::duration` is how far the clock sits before the epoch.
        Err(before_epoch) => -before_epoch.duration().as_secs_f64(),
    }
}
1208
1209// ── Postgres row types (sqlx::FromRow) ──────────────────────
1210
1211#[derive(sqlx::FromRow)]
1212struct PgWorkflowRow {
1213    id: String,
1214    namespace: String,
1215    run_id: String,
1216    workflow_type: String,
1217    task_queue: String,
1218    status: String,
1219    input: Option<String>,
1220    result: Option<String>,
1221    error: Option<String>,
1222    parent_id: Option<String>,
1223    claimed_by: Option<String>,
1224    search_attributes: Option<String>,
1225    archived_at: Option<f64>,
1226    archive_uri: Option<String>,
1227    created_at: f64,
1228    updated_at: f64,
1229    completed_at: Option<f64>,
1230}
1231
1232impl From<PgWorkflowRow> for WorkflowRecord {
1233    fn from(r: PgWorkflowRow) -> Self {
1234        Self {
1235            id: r.id,
1236            namespace: r.namespace,
1237            run_id: r.run_id,
1238            workflow_type: r.workflow_type,
1239            task_queue: r.task_queue,
1240            status: r.status,
1241            input: r.input,
1242            result: r.result,
1243            error: r.error,
1244            parent_id: r.parent_id,
1245            claimed_by: r.claimed_by,
1246            search_attributes: r.search_attributes,
1247            archived_at: r.archived_at,
1248            archive_uri: r.archive_uri,
1249            created_at: r.created_at,
1250            updated_at: r.updated_at,
1251            completed_at: r.completed_at,
1252        }
1253    }
1254}
1255
1256#[derive(sqlx::FromRow)]
1257struct PgEventRow {
1258    id: i64,
1259    workflow_id: String,
1260    seq: i32,
1261    event_type: String,
1262    payload: Option<String>,
1263    timestamp: f64,
1264}
1265
1266impl From<PgEventRow> for WorkflowEvent {
1267    fn from(r: PgEventRow) -> Self {
1268        Self {
1269            id: Some(r.id),
1270            workflow_id: r.workflow_id,
1271            seq: r.seq,
1272            event_type: r.event_type,
1273            payload: r.payload,
1274            timestamp: r.timestamp,
1275        }
1276    }
1277}
1278
1279#[derive(sqlx::FromRow)]
1280struct PgActivityRow {
1281    id: i64,
1282    workflow_id: String,
1283    seq: i32,
1284    name: String,
1285    task_queue: String,
1286    input: Option<String>,
1287    status: String,
1288    result: Option<String>,
1289    error: Option<String>,
1290    attempt: i32,
1291    max_attempts: i32,
1292    initial_interval_secs: f64,
1293    backoff_coefficient: f64,
1294    start_to_close_secs: f64,
1295    heartbeat_timeout_secs: Option<f64>,
1296    claimed_by: Option<String>,
1297    scheduled_at: f64,
1298    started_at: Option<f64>,
1299    completed_at: Option<f64>,
1300    last_heartbeat: Option<f64>,
1301}
1302
1303impl From<PgActivityRow> for WorkflowActivity {
1304    fn from(r: PgActivityRow) -> Self {
1305        Self {
1306            id: Some(r.id),
1307            workflow_id: r.workflow_id,
1308            seq: r.seq,
1309            name: r.name,
1310            task_queue: r.task_queue,
1311            input: r.input,
1312            status: r.status,
1313            result: r.result,
1314            error: r.error,
1315            attempt: r.attempt,
1316            max_attempts: r.max_attempts,
1317            initial_interval_secs: r.initial_interval_secs,
1318            backoff_coefficient: r.backoff_coefficient,
1319            start_to_close_secs: r.start_to_close_secs,
1320            heartbeat_timeout_secs: r.heartbeat_timeout_secs,
1321            claimed_by: r.claimed_by,
1322            scheduled_at: r.scheduled_at,
1323            started_at: r.started_at,
1324            completed_at: r.completed_at,
1325            last_heartbeat: r.last_heartbeat,
1326        }
1327    }
1328}
1329
1330#[derive(sqlx::FromRow)]
1331struct PgTimerRow {
1332    id: i64,
1333    workflow_id: String,
1334    seq: i32,
1335    fire_at: f64,
1336    fired: bool,
1337}
1338
1339impl From<PgTimerRow> for WorkflowTimer {
1340    fn from(r: PgTimerRow) -> Self {
1341        Self {
1342            id: Some(r.id),
1343            workflow_id: r.workflow_id,
1344            seq: r.seq,
1345            fire_at: r.fire_at,
1346            fired: r.fired,
1347        }
1348    }
1349}
1350
1351#[derive(sqlx::FromRow)]
1352struct PgSignalRow {
1353    id: i64,
1354    workflow_id: String,
1355    name: String,
1356    payload: Option<String>,
1357    consumed: bool,
1358    received_at: f64,
1359}
1360
1361impl From<PgSignalRow> for WorkflowSignal {
1362    fn from(r: PgSignalRow) -> Self {
1363        Self {
1364            id: Some(r.id),
1365            workflow_id: r.workflow_id,
1366            name: r.name,
1367            payload: r.payload,
1368            consumed: r.consumed,
1369            received_at: r.received_at,
1370        }
1371    }
1372}
1373
1374#[derive(sqlx::FromRow)]
1375struct PgScheduleRow {
1376    namespace: String,
1377    name: String,
1378    workflow_type: String,
1379    cron_expr: String,
1380    timezone: String,
1381    input: Option<String>,
1382    task_queue: String,
1383    overlap_policy: String,
1384    paused: bool,
1385    last_run_at: Option<f64>,
1386    next_run_at: Option<f64>,
1387    last_workflow_id: Option<String>,
1388    created_at: f64,
1389}
1390
1391impl From<PgScheduleRow> for WorkflowSchedule {
1392    fn from(r: PgScheduleRow) -> Self {
1393        Self {
1394            namespace: r.namespace,
1395            name: r.name,
1396            workflow_type: r.workflow_type,
1397            cron_expr: r.cron_expr,
1398            timezone: r.timezone,
1399            input: r.input,
1400            task_queue: r.task_queue,
1401            overlap_policy: r.overlap_policy,
1402            paused: r.paused,
1403            last_run_at: r.last_run_at,
1404            next_run_at: r.next_run_at,
1405            last_workflow_id: r.last_workflow_id,
1406            created_at: r.created_at,
1407        }
1408    }
1409}
1410
1411#[derive(sqlx::FromRow)]
1412struct PgWorkerRow {
1413    id: String,
1414    namespace: String,
1415    identity: String,
1416    task_queue: String,
1417    workflows: Option<String>,
1418    activities: Option<String>,
1419    max_concurrent_workflows: i32,
1420    max_concurrent_activities: i32,
1421    active_tasks: i32,
1422    last_heartbeat: f64,
1423    registered_at: f64,
1424}
1425
1426impl From<PgWorkerRow> for WorkflowWorker {
1427    fn from(r: PgWorkerRow) -> Self {
1428        Self {
1429            id: r.id,
1430            namespace: r.namespace,
1431            identity: r.identity,
1432            task_queue: r.task_queue,
1433            workflows: r.workflows,
1434            activities: r.activities,
1435            max_concurrent_workflows: r.max_concurrent_workflows,
1436            max_concurrent_activities: r.max_concurrent_activities,
1437            active_tasks: r.active_tasks,
1438            last_heartbeat: r.last_heartbeat,
1439            registered_at: r.registered_at,
1440        }
1441    }
1442}