//! assay_workflow/store/sqlite.rs — SQLite-backed implementation of the
//! workflow store (`WorkflowStore`).

1use anyhow::Result;
2use sqlx::SqlitePool;
3
4use crate::store::{ApiKeyRecord, NamespaceRecord, NamespaceStats, QueueStats, WorkflowStore};
5use crate::types::*;
6
/// Baseline schema, applied on every startup by `SqliteStore::migrate`.
///
/// All DDL here is idempotent (`CREATE ... IF NOT EXISTS`, `INSERT OR
/// IGNORE`), so re-running the whole script is safe.
///
/// NOTE(review): `migrate` executes this script by splitting on `;`, so no
/// statement may ever embed a semicolon inside a string literal or trigger
/// body — keep that invariant when editing.
const SCHEMA: &str = r#"
CREATE TABLE IF NOT EXISTS namespaces (
    name            TEXT PRIMARY KEY,
    created_at      REAL NOT NULL
);

INSERT OR IGNORE INTO namespaces (name, created_at)
    VALUES ('main', strftime('%s', 'now'));

CREATE TABLE IF NOT EXISTS workflows (
    id              TEXT PRIMARY KEY,
    namespace       TEXT NOT NULL DEFAULT 'main',
    run_id          TEXT NOT NULL,
    workflow_type   TEXT NOT NULL,
    task_queue      TEXT NOT NULL DEFAULT 'main',
    status          TEXT NOT NULL DEFAULT 'PENDING',
    input           TEXT,
    result          TEXT,
    error           TEXT,
    parent_id       TEXT,
    claimed_by      TEXT,
    search_attributes TEXT,
    archived_at     REAL,
    archive_uri     TEXT,
    -- Workflow-task dispatch (Phase 9): a workflow is "dispatchable" when
    -- it has new events a worker needs to replay against. Set true on
    -- start, on activity completion, on timer fire, on signal arrival.
    -- Cleared when a worker claims the dispatch lease.
    needs_dispatch  INTEGER NOT NULL DEFAULT 0,
    dispatch_claimed_by    TEXT,
    dispatch_last_heartbeat REAL,
    created_at      REAL NOT NULL,
    updated_at      REAL NOT NULL,
    completed_at    REAL
);
CREATE INDEX IF NOT EXISTS idx_wf_status_queue ON workflows(status, task_queue);
CREATE INDEX IF NOT EXISTS idx_wf_namespace ON workflows(namespace);
CREATE INDEX IF NOT EXISTS idx_wf_dispatch ON workflows(task_queue, needs_dispatch, dispatch_claimed_by);

CREATE TABLE IF NOT EXISTS workflow_events (
    id              INTEGER PRIMARY KEY AUTOINCREMENT,
    workflow_id     TEXT NOT NULL REFERENCES workflows(id),
    seq             INTEGER NOT NULL,
    event_type      TEXT NOT NULL,
    payload         TEXT,
    timestamp       REAL NOT NULL
);
CREATE INDEX IF NOT EXISTS idx_wf_events_lookup ON workflow_events(workflow_id, seq);

CREATE TABLE IF NOT EXISTS workflow_activities (
    id              INTEGER PRIMARY KEY AUTOINCREMENT,
    workflow_id     TEXT NOT NULL REFERENCES workflows(id),
    seq             INTEGER NOT NULL,
    name            TEXT NOT NULL,
    task_queue      TEXT NOT NULL DEFAULT 'main',
    input           TEXT,
    status          TEXT NOT NULL DEFAULT 'PENDING',
    result          TEXT,
    error           TEXT,
    attempt         INTEGER NOT NULL DEFAULT 1,
    max_attempts    INTEGER NOT NULL DEFAULT 3,
    initial_interval_secs   REAL NOT NULL DEFAULT 1,
    backoff_coefficient     REAL NOT NULL DEFAULT 2,
    start_to_close_secs     REAL NOT NULL DEFAULT 300,
    heartbeat_timeout_secs  REAL,
    claimed_by      TEXT,
    scheduled_at    REAL NOT NULL,
    started_at      REAL,
    completed_at    REAL,
    last_heartbeat  REAL,
    UNIQUE (workflow_id, seq)
);
CREATE INDEX IF NOT EXISTS idx_wf_act_pending ON workflow_activities(task_queue, status, scheduled_at);

CREATE TABLE IF NOT EXISTS workflow_timers (
    id              INTEGER PRIMARY KEY AUTOINCREMENT,
    workflow_id     TEXT NOT NULL REFERENCES workflows(id),
    seq             INTEGER NOT NULL,
    fire_at         REAL NOT NULL,
    fired           INTEGER NOT NULL DEFAULT 0,
    UNIQUE (workflow_id, seq)
);
CREATE INDEX IF NOT EXISTS idx_wf_timers_due ON workflow_timers(fire_at);

CREATE TABLE IF NOT EXISTS workflow_signals (
    id              INTEGER PRIMARY KEY AUTOINCREMENT,
    workflow_id     TEXT NOT NULL REFERENCES workflows(id),
    name            TEXT NOT NULL,
    payload         TEXT,
    consumed        INTEGER NOT NULL DEFAULT 0,
    received_at     REAL NOT NULL
);
CREATE INDEX IF NOT EXISTS idx_wf_signals_lookup ON workflow_signals(workflow_id, name, consumed);

CREATE TABLE IF NOT EXISTS workflow_schedules (
    name            TEXT NOT NULL,
    namespace       TEXT NOT NULL DEFAULT 'main',
    workflow_type   TEXT NOT NULL,
    cron_expr       TEXT NOT NULL,
    timezone        TEXT NOT NULL DEFAULT 'UTC',
    input           TEXT,
    task_queue      TEXT NOT NULL DEFAULT 'main',
    overlap_policy  TEXT NOT NULL DEFAULT 'skip',
    paused          INTEGER NOT NULL DEFAULT 0,
    last_run_at     REAL,
    next_run_at     REAL,
    last_workflow_id TEXT,
    created_at      REAL NOT NULL,
    PRIMARY KEY (namespace, name)
);

CREATE TABLE IF NOT EXISTS workflow_workers (
    id              TEXT PRIMARY KEY,
    namespace       TEXT NOT NULL DEFAULT 'main',
    identity        TEXT NOT NULL,
    task_queue      TEXT NOT NULL,
    workflows       TEXT,
    activities      TEXT,
    max_concurrent_workflows  INTEGER NOT NULL DEFAULT 10,
    max_concurrent_activities INTEGER NOT NULL DEFAULT 10,
    active_tasks    INTEGER NOT NULL DEFAULT 0,
    last_heartbeat  REAL NOT NULL,
    registered_at   REAL NOT NULL
);

CREATE TABLE IF NOT EXISTS workflow_snapshots (
    workflow_id     TEXT NOT NULL REFERENCES workflows(id),
    event_seq       INTEGER NOT NULL,
    state_json      TEXT NOT NULL,
    created_at      REAL NOT NULL,
    PRIMARY KEY (workflow_id, event_seq)
);

CREATE TABLE IF NOT EXISTS api_keys (
    key_hash        TEXT PRIMARY KEY,
    prefix          TEXT NOT NULL,
    label           TEXT,
    created_at      REAL NOT NULL
);
CREATE INDEX IF NOT EXISTS idx_api_keys_prefix ON api_keys(prefix);

CREATE TABLE IF NOT EXISTS engine_lock (
    id              INTEGER PRIMARY KEY CHECK (id = 1),
    instance_id     TEXT NOT NULL,
    started_at      REAL NOT NULL,
    last_heartbeat  REAL NOT NULL
);
"#;
155
/// Stale lock timeout — if the lock holder hasn't heartbeated in this
/// many seconds, assume it's dead and allow takeover.
const LOCK_STALE_SECS: f64 = 60.0;
/// How often to refresh the lock heartbeat.
const LOCK_HEARTBEAT_SECS: u64 = 15;

/// SQLite-backed implementation of `WorkflowStore`.
///
/// Holds the shared connection pool plus a per-process `instance_id` used
/// to own the single-instance engine lock (see `engine_lock` table).
pub struct SqliteStore {
    // Shared connection pool; every query in this file goes through it.
    pool: SqlitePool,
    // Unique-per-process id; owns the engine lock while this process runs.
    instance_id: String,
}
166
167impl SqliteStore {
168    pub async fn new(url: &str) -> Result<Self> {
169        let pool = SqlitePool::connect(url).await?;
170        let instance_id = format!("assay-{:016x}", {
171            use std::collections::hash_map::DefaultHasher;
172            use std::hash::{Hash, Hasher};
173            let mut h = DefaultHasher::new();
174            std::time::SystemTime::now().hash(&mut h);
175            std::process::id().hash(&mut h);
176            h.finish()
177        });
178        let store = Self { pool, instance_id };
179        store.migrate().await?;
180        Ok(store)
181    }
182
183    /// Acquire the single-instance engine lock.
184    /// Returns an error if another instance is already running.
185    pub async fn acquire_engine_lock(&self) -> Result<()> {
186        let now = timestamp_now();
187
188        // Try to insert the lock
189        let result = sqlx::query(
190            "INSERT INTO engine_lock (id, instance_id, started_at, last_heartbeat) VALUES (1, ?, ?, ?)",
191        )
192        .bind(&self.instance_id)
193        .bind(now)
194        .bind(now)
195        .execute(&self.pool)
196        .await;
197
198        match result {
199            Ok(_) => Ok(()),
200            Err(_) => {
201                // Lock exists — check if it's stale
202                let row: Option<(String, f64)> = sqlx::query_as(
203                    "SELECT instance_id, last_heartbeat FROM engine_lock WHERE id = 1",
204                )
205                .fetch_optional(&self.pool)
206                .await?;
207
208                if let Some((existing_id, last_hb)) = row {
209                    if now - last_hb > LOCK_STALE_SECS {
210                        // Stale lock — take over
211                        sqlx::query(
212                            "UPDATE engine_lock SET instance_id = ?, started_at = ?, last_heartbeat = ? WHERE id = 1",
213                        )
214                        .bind(&self.instance_id)
215                        .bind(now)
216                        .bind(now)
217                        .execute(&self.pool)
218                        .await?;
219                        tracing::warn!(
220                            "Took over stale engine lock from {existing_id} (last heartbeat {:.0}s ago)",
221                            now - last_hb
222                        );
223                        Ok(())
224                    } else {
225                        let age = now - last_hb;
226                        anyhow::bail!(
227                            "Another assay engine instance is already running (id: {existing_id}, \
228                             last heartbeat {age:.0}s ago).\n\n\
229                             SQLite only supports a single engine instance. For multi-instance \
230                             deployment (Kubernetes, Docker Swarm), use PostgreSQL:\n\n\
231                             \x20 assay serve --backend postgres://user:pass@host:5432/dbname"
232                        );
233                    }
234                } else {
235                    anyhow::bail!("Unexpected engine lock state");
236                }
237            }
238        }
239    }
240
241    /// Refresh the engine lock heartbeat. Called periodically by the engine.
242    pub async fn refresh_engine_lock(&self) -> Result<()> {
243        sqlx::query("UPDATE engine_lock SET last_heartbeat = ? WHERE id = 1 AND instance_id = ?")
244            .bind(timestamp_now())
245            .bind(&self.instance_id)
246            .execute(&self.pool)
247            .await?;
248        Ok(())
249    }
250
251    /// Release the engine lock on shutdown.
252    pub async fn release_engine_lock(&self) -> Result<()> {
253        sqlx::query("DELETE FROM engine_lock WHERE id = 1 AND instance_id = ?")
254            .bind(&self.instance_id)
255            .execute(&self.pool)
256            .await?;
257        Ok(())
258    }
259
260    /// Start background task to keep the lock alive.
261    pub fn spawn_lock_heartbeat(self: &std::sync::Arc<Self>) {
262        let store = std::sync::Arc::clone(self);
263        tokio::spawn(async move {
264            let mut tick = tokio::time::interval(std::time::Duration::from_secs(LOCK_HEARTBEAT_SECS));
265            loop {
266                tick.tick().await;
267                if let Err(e) = store.refresh_engine_lock().await {
268                    tracing::error!("Engine lock heartbeat failed: {e}");
269                }
270            }
271        });
272    }
273
274    /// Apply the baseline schema.
275    ///
276    /// Fresh installs get the current `CREATE TABLE IF NOT EXISTS` statements
277    /// in one pass. The engine is pre-1.0 and no v0.11.x release has been
278    /// deployed against a real workload yet, so we don't carry
279    /// `ALTER TABLE ADD COLUMN` statements for historical columns — the
280    /// baseline is the source of truth.
281    ///
282    /// For **future** additive migrations (post-v0.11.3), call
283    /// `Self::add_column_if_missing(&self.pool, "<table>", "<column>",
284    /// "<type_def>").await?` here before returning. Kept around so adding a
285    /// column later is a one-liner.
286    async fn migrate(&self) -> Result<()> {
287        for statement in SCHEMA.split(';') {
288            let trimmed = statement.trim();
289            if !trimmed.is_empty() {
290                sqlx::query(trimmed).execute(&self.pool).await?;
291            }
292        }
293        // Future additive migrations go here; see doc-comment above.
294        Ok(())
295    }
296
297    /// Add a column to an existing table if it's not already there.
298    ///
299    /// SQLite (unlike Postgres) doesn't support `ADD COLUMN IF NOT EXISTS`,
300    /// so we check via `pragma_table_info` before issuing the ALTER. Each
301    /// call is idempotent across startups.
302    ///
303    /// Currently unused — kept as the documented pattern for the first
304    /// additive migration after v0.11.3. Remove `#[allow(dead_code)]` when
305    /// a caller is added.
306    #[allow(dead_code)]
307    async fn add_column_if_missing(
308        pool: &SqlitePool,
309        table: &str,
310        column: &str,
311        type_def: &str,
312    ) -> Result<()> {
313        let exists: Option<(String,)> =
314            sqlx::query_as("SELECT name FROM pragma_table_info(?) WHERE name = ?")
315                .bind(table)
316                .bind(column)
317                .fetch_optional(pool)
318                .await?;
319        if exists.is_none() {
320            let sql = format!("ALTER TABLE {table} ADD COLUMN {column} {type_def}");
321            sqlx::query(&sql).execute(pool).await?;
322        }
323        Ok(())
324    }
325}
326
327impl WorkflowStore for SqliteStore {
328    // ── Namespaces ─────────────────────────────────────────
329
330    async fn create_namespace(&self, name: &str) -> Result<()> {
331        sqlx::query("INSERT INTO namespaces (name, created_at) VALUES (?, ?)")
332            .bind(name)
333            .bind(timestamp_now())
334            .execute(&self.pool)
335            .await?;
336        Ok(())
337    }
338
339    async fn list_namespaces(&self) -> Result<Vec<NamespaceRecord>> {
340        let rows = sqlx::query_as::<_, (String, f64)>(
341            "SELECT name, created_at FROM namespaces ORDER BY name",
342        )
343        .fetch_all(&self.pool)
344        .await?;
345        Ok(rows
346            .into_iter()
347            .map(|(name, created_at)| NamespaceRecord { name, created_at })
348            .collect())
349    }
350
351    async fn delete_namespace(&self, name: &str) -> Result<bool> {
352        let res = sqlx::query("DELETE FROM namespaces WHERE name = ?")
353            .bind(name)
354            .execute(&self.pool)
355            .await?;
356        Ok(res.rows_affected() > 0)
357    }
358
359    async fn get_namespace_stats(&self, namespace: &str) -> Result<NamespaceStats> {
360        let total: (i64,) =
361            sqlx::query_as("SELECT COUNT(*) FROM workflows WHERE namespace = ?")
362                .bind(namespace)
363                .fetch_one(&self.pool)
364                .await?;
365        let running: (i64,) = sqlx::query_as(
366            "SELECT COUNT(*) FROM workflows WHERE namespace = ? AND status = 'RUNNING'",
367        )
368        .bind(namespace)
369        .fetch_one(&self.pool)
370        .await?;
371        let pending: (i64,) = sqlx::query_as(
372            "SELECT COUNT(*) FROM workflows WHERE namespace = ? AND status = 'PENDING'",
373        )
374        .bind(namespace)
375        .fetch_one(&self.pool)
376        .await?;
377        let completed: (i64,) = sqlx::query_as(
378            "SELECT COUNT(*) FROM workflows WHERE namespace = ? AND status = 'COMPLETED'",
379        )
380        .bind(namespace)
381        .fetch_one(&self.pool)
382        .await?;
383        let failed: (i64,) = sqlx::query_as(
384            "SELECT COUNT(*) FROM workflows WHERE namespace = ? AND status = 'FAILED'",
385        )
386        .bind(namespace)
387        .fetch_one(&self.pool)
388        .await?;
389        let schedules: (i64,) =
390            sqlx::query_as("SELECT COUNT(*) FROM workflow_schedules WHERE namespace = ?")
391                .bind(namespace)
392                .fetch_one(&self.pool)
393                .await?;
394        let workers: (i64,) =
395            sqlx::query_as("SELECT COUNT(*) FROM workflow_workers WHERE namespace = ?")
396                .bind(namespace)
397                .fetch_one(&self.pool)
398                .await?;
399
400        Ok(NamespaceStats {
401            namespace: namespace.to_string(),
402            total_workflows: total.0,
403            running: running.0,
404            pending: pending.0,
405            completed: completed.0,
406            failed: failed.0,
407            schedules: schedules.0,
408            workers: workers.0,
409        })
410    }
411
412    // ── Workflows ──────────────────────────────────────────
413
    /// Insert a new workflow row.
    ///
    /// Dispatch bookkeeping columns (`needs_dispatch`, `dispatch_claimed_by`,
    /// `dispatch_last_heartbeat`) are not listed here and so take their
    /// schema defaults.
    async fn create_workflow(&self, wf: &WorkflowRecord) -> Result<()> {
        sqlx::query(
            "INSERT INTO workflows (id, namespace, run_id, workflow_type, task_queue, status, input, result, error, parent_id, claimed_by, search_attributes, archived_at, archive_uri, created_at, updated_at, completed_at)
             VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)",
        )
        // Bind order must match the column list above exactly (17 binds).
        .bind(&wf.id)
        .bind(&wf.namespace)
        .bind(&wf.run_id)
        .bind(&wf.workflow_type)
        .bind(&wf.task_queue)
        .bind(&wf.status)
        .bind(&wf.input)
        .bind(&wf.result)
        .bind(&wf.error)
        .bind(&wf.parent_id)
        .bind(&wf.claimed_by)
        .bind(&wf.search_attributes)
        .bind(wf.archived_at)
        .bind(&wf.archive_uri)
        .bind(wf.created_at)
        .bind(wf.updated_at)
        .bind(wf.completed_at)
        .execute(&self.pool)
        .await?;
        Ok(())
    }
440
441    async fn get_workflow(&self, id: &str) -> Result<Option<WorkflowRecord>> {
442        let row = sqlx::query_as::<_, SqliteWorkflowRow>(
443            "SELECT id, namespace, run_id, workflow_type, task_queue, status, input, result, error, parent_id, claimed_by, search_attributes, archived_at, archive_uri, created_at, updated_at, completed_at FROM workflows WHERE id = ?",
444        )
445        .bind(id)
446        .fetch_optional(&self.pool)
447        .await?;
448        Ok(row.map(Into::into))
449    }
450
    /// Page through workflows in a namespace with optional status, type and
    /// search-attribute filters, newest first.
    ///
    /// The `(? IS NULL OR col = ?)` pattern means each optional filter binds
    /// twice: once for the NULL check, once for the comparison. Search
    /// attribute filters are appended dynamically, one predicate per key,
    /// so the bind sequence below must stay in exactly this order:
    /// namespace, status×2, type×2, then (key, value) per filter pair, then
    /// limit and offset.
    async fn list_workflows(
        &self,
        namespace: &str,
        status: Option<WorkflowStatus>,
        workflow_type: Option<&str>,
        search_attrs_filter: Option<&str>,
        limit: i64,
        offset: i64,
    ) -> Result<Vec<WorkflowRecord>> {
        let status_str = status.map(|s| s.to_string());

        // Parse search filter into (key, value) pairs. Each pair adds a
        // `json_extract(search_attributes, '$.key') = value` predicate so
        // matches require every filter key to be present in the stored
        // attributes. Invalid/empty JSON → no filter (all pass).
        let filter_pairs: Vec<(String, serde_json::Value)> = search_attrs_filter
            .and_then(|s| serde_json::from_str::<serde_json::Value>(s).ok())
            .and_then(|v| v.as_object().cloned())
            .map(|m| m.into_iter().collect())
            .unwrap_or_default();

        let mut sql = String::from(
            "SELECT id, namespace, run_id, workflow_type, task_queue, status, input, result, error, parent_id, claimed_by, search_attributes, archived_at, archive_uri, created_at, updated_at, completed_at
             FROM workflows
             WHERE namespace = ?
               AND (? IS NULL OR status = ?)
               AND (? IS NULL OR workflow_type = ?)",
        );
        // One placeholder pair per filter key; the key itself is bound (not
        // interpolated), so filter keys cannot inject SQL.
        for _ in &filter_pairs {
            sql.push_str(" AND json_extract(search_attributes, '$.' || ?) = ?");
        }
        sql.push_str(" ORDER BY created_at DESC LIMIT ? OFFSET ?");

        let mut q = sqlx::query_as::<_, SqliteWorkflowRow>(&sql)
            .bind(namespace)
            .bind(&status_str)
            .bind(&status_str)
            .bind(workflow_type)
            .bind(workflow_type);
        for (key, value) in &filter_pairs {
            q = q.bind(key.clone());
            // Bind the JSON value as its string/number representation.
            // json_extract on a stored JSON string returns its "natural"
            // SQLite type (text for strings, numeric for numbers), so we
            // match by the same type.
            match value {
                serde_json::Value::String(s) => q = q.bind(s.clone()),
                serde_json::Value::Number(n) => {
                    if let Some(i) = n.as_i64() {
                        q = q.bind(i);
                    } else if let Some(f) = n.as_f64() {
                        q = q.bind(f);
                    } else {
                        // Number that fits neither i64 nor f64 (e.g. huge
                        // integer) — fall back to its text form.
                        q = q.bind(n.to_string());
                    }
                }
                // JSON booleans extract as 0/1 integers in SQLite.
                serde_json::Value::Bool(b) => q = q.bind(*b as i64),
                // Null/Array/Object: compare against the serialized text.
                _ => q = q.bind(value.to_string()),
            }
        }
        let rows = q
            .bind(limit)
            .bind(offset)
            .fetch_all(&self.pool)
        .await?;
        Ok(rows.into_iter().map(Into::into).collect())
    }
518
519    async fn update_workflow_status(
520        &self,
521        id: &str,
522        status: WorkflowStatus,
523        result: Option<&str>,
524        error: Option<&str>,
525    ) -> Result<()> {
526        let now = timestamp_now();
527        let completed_at = if status.is_terminal() { Some(now) } else { None };
528        sqlx::query(
529            "UPDATE workflows SET status = ?, result = COALESCE(?, result), error = COALESCE(?, error), updated_at = ?, completed_at = COALESCE(?, completed_at) WHERE id = ?",
530        )
531        .bind(status.to_string())
532        .bind(result)
533        .bind(error)
534        .bind(now)
535        .bind(completed_at)
536        .bind(id)
537        .execute(&self.pool)
538        .await?;
539        Ok(())
540    }
541
542    async fn claim_workflow(&self, id: &str, worker_id: &str) -> Result<bool> {
543        let res = sqlx::query(
544            "UPDATE workflows SET claimed_by = ?, status = 'RUNNING', updated_at = ? WHERE id = ? AND claimed_by IS NULL",
545        )
546        .bind(worker_id)
547        .bind(timestamp_now())
548        .bind(id)
549        .execute(&self.pool)
550        .await?;
551        Ok(res.rows_affected() > 0)
552    }
553
554    async fn mark_workflow_dispatchable(&self, workflow_id: &str) -> Result<()> {
555        sqlx::query("UPDATE workflows SET needs_dispatch = 1 WHERE id = ?")
556            .bind(workflow_id)
557            .execute(&self.pool)
558            .await?;
559        Ok(())
560    }
561
    /// Atomically claim the oldest dispatchable workflow task on a queue.
    ///
    /// Single UPDATE...RETURNING statement: selects the oldest workflow on
    /// `task_queue` that needs dispatch, is unclaimed, and is not terminal;
    /// stamps the dispatch lease (holder + heartbeat) and clears
    /// `needs_dispatch` in the same statement, so two workers cannot claim
    /// the same task. Returns None when nothing is claimable.
    async fn claim_workflow_task(
        &self,
        task_queue: &str,
        worker_id: &str,
    ) -> Result<Option<WorkflowRecord>> {
        let now = timestamp_now();
        // Atomic: pick the oldest dispatchable + unclaimed workflow on the queue
        let row = sqlx::query_as::<_, SqliteWorkflowRow>(
            "UPDATE workflows
             SET dispatch_claimed_by = ?, dispatch_last_heartbeat = ?, needs_dispatch = 0
             WHERE id = (
                SELECT id FROM workflows
                WHERE task_queue = ?
                  AND needs_dispatch = 1
                  AND dispatch_claimed_by IS NULL
                  AND status NOT IN ('COMPLETED', 'FAILED', 'CANCELLED', 'TIMED_OUT')
                ORDER BY updated_at ASC
                LIMIT 1
             )
             RETURNING id, namespace, run_id, workflow_type, task_queue, status, input, result, error, parent_id, claimed_by, search_attributes, archived_at, archive_uri, created_at, updated_at, completed_at",
        )
        // Bind order: SET placeholders first (worker, heartbeat), then the
        // subquery's task_queue.
        .bind(worker_id)
        .bind(now)
        .bind(task_queue)
        .fetch_optional(&self.pool)
        .await?;
        Ok(row.map(Into::into))
    }
590
591    async fn release_workflow_task(&self, workflow_id: &str, worker_id: &str) -> Result<()> {
592        sqlx::query(
593            "UPDATE workflows
594             SET dispatch_claimed_by = NULL, dispatch_last_heartbeat = NULL
595             WHERE id = ? AND dispatch_claimed_by = ?",
596        )
597        .bind(workflow_id)
598        .bind(worker_id)
599        .execute(&self.pool)
600        .await?;
601        Ok(())
602    }
603
604    async fn release_stale_dispatch_leases(
605        &self,
606        now: f64,
607        timeout_secs: f64,
608    ) -> Result<u64> {
609        // Re-arm needs_dispatch so the work goes back into the pool. Don't
610        // touch workflows that have reached a terminal state — those should
611        // never be re-dispatched.
612        let res = sqlx::query(
613            "UPDATE workflows
614             SET dispatch_claimed_by = NULL,
615                 dispatch_last_heartbeat = NULL,
616                 needs_dispatch = 1
617             WHERE dispatch_claimed_by IS NOT NULL
618               AND (? - dispatch_last_heartbeat) > ?
619               AND status NOT IN ('COMPLETED', 'FAILED', 'CANCELLED', 'TIMED_OUT')",
620        )
621        .bind(now)
622        .bind(timeout_secs)
623        .execute(&self.pool)
624        .await?;
625        Ok(res.rows_affected())
626    }
627
628    // ── Events ─────────────────────────────────────────────
629
630    async fn append_event(&self, ev: &WorkflowEvent) -> Result<i64> {
631        let res = sqlx::query(
632            "INSERT INTO workflow_events (workflow_id, seq, event_type, payload, timestamp) VALUES (?, ?, ?, ?, ?)",
633        )
634        .bind(&ev.workflow_id)
635        .bind(ev.seq)
636        .bind(&ev.event_type)
637        .bind(&ev.payload)
638        .bind(ev.timestamp)
639        .execute(&self.pool)
640        .await?;
641        Ok(res.last_insert_rowid())
642    }
643
644    async fn list_events(&self, workflow_id: &str) -> Result<Vec<WorkflowEvent>> {
645        let rows = sqlx::query_as::<_, SqliteEventRow>(
646            "SELECT id, workflow_id, seq, event_type, payload, timestamp FROM workflow_events WHERE workflow_id = ? ORDER BY seq ASC",
647        )
648        .bind(workflow_id)
649        .fetch_all(&self.pool)
650        .await?;
651        Ok(rows.into_iter().map(Into::into).collect())
652    }
653
654    async fn get_event_count(&self, workflow_id: &str) -> Result<i64> {
655        let row: (i64,) =
656            sqlx::query_as("SELECT COUNT(*) FROM workflow_events WHERE workflow_id = ?")
657                .bind(workflow_id)
658                .fetch_one(&self.pool)
659                .await?;
660        Ok(row.0)
661    }
662
663    // ── Activities ──────────────────────────────────────────
664
    /// Insert a newly scheduled activity. Returns the new rowid.
    ///
    /// Result/error/claim/heartbeat columns are left at their schema
    /// defaults; they are filled in by claim/complete/heartbeat later.
    async fn create_activity(&self, act: &WorkflowActivity) -> Result<i64> {
        let res = sqlx::query(
            "INSERT INTO workflow_activities (workflow_id, seq, name, task_queue, input, status, attempt, max_attempts, initial_interval_secs, backoff_coefficient, start_to_close_secs, heartbeat_timeout_secs, scheduled_at)
             VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)",
        )
        // Bind order must match the column list above exactly (13 binds).
        .bind(&act.workflow_id)
        .bind(act.seq)
        .bind(&act.name)
        .bind(&act.task_queue)
        .bind(&act.input)
        .bind(&act.status)
        .bind(act.attempt)
        .bind(act.max_attempts)
        .bind(act.initial_interval_secs)
        .bind(act.backoff_coefficient)
        .bind(act.start_to_close_secs)
        .bind(act.heartbeat_timeout_secs)
        .bind(act.scheduled_at)
        .execute(&self.pool)
        .await?;
        Ok(res.last_insert_rowid())
    }
687
688    async fn get_activity(&self, id: i64) -> Result<Option<WorkflowActivity>> {
689        let row = sqlx::query_as::<_, SqliteActivityRow>(
690            "SELECT id, workflow_id, seq, name, task_queue, input, status, result, error, attempt, max_attempts, initial_interval_secs, backoff_coefficient, start_to_close_secs, heartbeat_timeout_secs, claimed_by, scheduled_at, started_at, completed_at, last_heartbeat
691             FROM workflow_activities WHERE id = ?",
692        )
693        .bind(id)
694        .fetch_optional(&self.pool)
695        .await?;
696        Ok(row.map(Into::into))
697    }
698
699    async fn get_activity_by_workflow_seq(
700        &self,
701        workflow_id: &str,
702        seq: i32,
703    ) -> Result<Option<WorkflowActivity>> {
704        let row = sqlx::query_as::<_, SqliteActivityRow>(
705            "SELECT id, workflow_id, seq, name, task_queue, input, status, result, error, attempt, max_attempts, initial_interval_secs, backoff_coefficient, start_to_close_secs, heartbeat_timeout_secs, claimed_by, scheduled_at, started_at, completed_at, last_heartbeat
706             FROM workflow_activities WHERE workflow_id = ? AND seq = ?",
707        )
708        .bind(workflow_id)
709        .bind(seq)
710        .fetch_optional(&self.pool)
711        .await?;
712        Ok(row.map(Into::into))
713    }
714
    /// Atomically claim the oldest PENDING activity on a queue for
    /// `worker_id`, moving it to RUNNING and stamping `started_at`.
    ///
    /// Single UPDATE...RETURNING statement so two workers cannot claim the
    /// same activity. Returns None when the queue has no pending work.
    async fn claim_activity(
        &self,
        task_queue: &str,
        worker_id: &str,
    ) -> Result<Option<WorkflowActivity>> {
        let now = timestamp_now();
        let row = sqlx::query_as::<_, SqliteActivityRow>(
            "UPDATE workflow_activities SET status = 'RUNNING', claimed_by = ?, started_at = ?
             WHERE id = (
                SELECT id FROM workflow_activities
                WHERE task_queue = ? AND status = 'PENDING'
                ORDER BY scheduled_at ASC
                LIMIT 1
             )
             RETURNING id, workflow_id, seq, name, task_queue, input, status, result, error, attempt, max_attempts, initial_interval_secs, backoff_coefficient, start_to_close_secs, heartbeat_timeout_secs, claimed_by, scheduled_at, started_at, completed_at, last_heartbeat",
        )
        // Bind order: SET placeholders first (worker, started_at), then the
        // subquery's task_queue.
        .bind(worker_id)
        .bind(now)
        .bind(task_queue)
        .fetch_optional(&self.pool)
        .await?;
        Ok(row.map(Into::into))
    }
738
739    async fn requeue_activity_for_retry(
740        &self,
741        id: i64,
742        next_attempt: i32,
743        next_scheduled_at: f64,
744    ) -> Result<()> {
745        sqlx::query(
746            "UPDATE workflow_activities
747             SET status = 'PENDING', attempt = ?, scheduled_at = ?,
748                 claimed_by = NULL, started_at = NULL, last_heartbeat = NULL,
749                 error = NULL
750             WHERE id = ?",
751        )
752        .bind(next_attempt)
753        .bind(next_scheduled_at)
754        .bind(id)
755        .execute(&self.pool)
756        .await?;
757        Ok(())
758    }
759
760    async fn complete_activity(
761        &self,
762        id: i64,
763        result: Option<&str>,
764        error: Option<&str>,
765        failed: bool,
766    ) -> Result<()> {
767        let status = if failed { "FAILED" } else { "COMPLETED" };
768        sqlx::query(
769            "UPDATE workflow_activities SET status = ?, result = ?, error = ?, completed_at = ? WHERE id = ?",
770        )
771        .bind(status)
772        .bind(result)
773        .bind(error)
774        .bind(timestamp_now())
775        .bind(id)
776        .execute(&self.pool)
777        .await?;
778        Ok(())
779    }
780
781    async fn heartbeat_activity(&self, id: i64, _details: Option<&str>) -> Result<()> {
782        sqlx::query("UPDATE workflow_activities SET last_heartbeat = ? WHERE id = ?")
783            .bind(timestamp_now())
784            .bind(id)
785            .execute(&self.pool)
786            .await?;
787        Ok(())
788    }
789
790    async fn get_timed_out_activities(&self, now: f64) -> Result<Vec<WorkflowActivity>> {
791        let rows = sqlx::query_as::<_, SqliteActivityRow>(
792            "SELECT id, workflow_id, seq, name, task_queue, input, status, result, error, attempt, max_attempts, initial_interval_secs, backoff_coefficient, start_to_close_secs, heartbeat_timeout_secs, claimed_by, scheduled_at, started_at, completed_at, last_heartbeat
793             FROM workflow_activities
794             WHERE status = 'RUNNING'
795               AND heartbeat_timeout_secs IS NOT NULL
796               AND last_heartbeat IS NOT NULL
797               AND (? - last_heartbeat) > heartbeat_timeout_secs",
798        )
799        .bind(now)
800        .fetch_all(&self.pool)
801        .await?;
802        Ok(rows.into_iter().map(Into::into).collect())
803    }
804
805    // ── Timers ──────────────────────────────────────────────
806
    /// Insert a new, unfired timer row and return its rowid.
    ///
    /// The `fired` column is always initialised to 0; any value in
    /// `timer.fired` is intentionally not persisted here.
    async fn create_timer(&self, timer: &WorkflowTimer) -> Result<i64> {
        let res = sqlx::query(
            "INSERT INTO workflow_timers (workflow_id, seq, fire_at, fired) VALUES (?, ?, ?, 0)",
        )
        .bind(&timer.workflow_id)
        .bind(timer.seq)
        .bind(timer.fire_at)
        .execute(&self.pool)
        .await?;
        Ok(res.last_insert_rowid())
    }
818
    /// Cancel every still-PENDING activity of a workflow, stamping
    /// `completed_at` with the current time. RUNNING activities are left
    /// untouched. Returns the number of rows cancelled.
    async fn cancel_pending_activities(&self, workflow_id: &str) -> Result<u64> {
        let res = sqlx::query(
            "UPDATE workflow_activities SET status = 'CANCELLED', completed_at = ?
             WHERE workflow_id = ? AND status = 'PENDING'",
        )
        .bind(timestamp_now())
        .bind(workflow_id)
        .execute(&self.pool)
        .await?;
        Ok(res.rows_affected())
    }
830
    /// Mark every unfired timer of a workflow as fired so it will never be
    /// delivered by `fire_due_timers`. Returns the number of timers affected.
    async fn cancel_pending_timers(&self, workflow_id: &str) -> Result<u64> {
        let res = sqlx::query(
            "UPDATE workflow_timers SET fired = 1
             WHERE workflow_id = ? AND fired = 0",
        )
        .bind(workflow_id)
        .execute(&self.pool)
        .await?;
        Ok(res.rows_affected())
    }
841
    /// Look up a single timer by its (workflow_id, seq) pair; `None` when no
    /// such timer exists.
    async fn get_timer_by_workflow_seq(
        &self,
        workflow_id: &str,
        seq: i32,
    ) -> Result<Option<WorkflowTimer>> {
        let row = sqlx::query_as::<_, SqliteTimerRow>(
            "SELECT id, workflow_id, seq, fire_at, fired
             FROM workflow_timers WHERE workflow_id = ? AND seq = ?",
        )
        .bind(workflow_id)
        .bind(seq)
        .fetch_optional(&self.pool)
        .await?;
        Ok(row.map(Into::into))
    }
857
858    async fn fire_due_timers(&self, now: f64) -> Result<Vec<WorkflowTimer>> {
859        let rows = sqlx::query_as::<_, SqliteTimerRow>(
860            "UPDATE workflow_timers SET fired = 1
861             WHERE fired = 0 AND fire_at <= ?
862             RETURNING id, workflow_id, seq, fire_at, fired",
863        )
864        .bind(now)
865        .fetch_all(&self.pool)
866        .await?;
867        Ok(rows.into_iter().map(Into::into).collect())
868    }
869
870    // ── Signals ─────────────────────────────────────────────
871
    /// Append an unconsumed signal for a workflow; returns the new rowid.
    /// `consumed` is always initialised to 0 regardless of `sig.consumed`.
    async fn send_signal(&self, sig: &WorkflowSignal) -> Result<i64> {
        let res = sqlx::query(
            "INSERT INTO workflow_signals (workflow_id, name, payload, consumed, received_at) VALUES (?, ?, ?, 0, ?)",
        )
        .bind(&sig.workflow_id)
        .bind(&sig.name)
        .bind(&sig.payload)
        .bind(sig.received_at)
        .execute(&self.pool)
        .await?;
        Ok(res.last_insert_rowid())
    }
884
    /// Atomically consume all pending signals with the given name for a
    /// workflow: flips `consumed` and returns the affected rows in one
    /// statement (UPDATE ... RETURNING, which requires SQLite >= 3.35).
    async fn consume_signals(
        &self,
        workflow_id: &str,
        name: &str,
    ) -> Result<Vec<WorkflowSignal>> {
        let rows = sqlx::query_as::<_, SqliteSignalRow>(
            "UPDATE workflow_signals SET consumed = 1
             WHERE workflow_id = ? AND name = ? AND consumed = 0
             RETURNING id, workflow_id, name, payload, consumed, received_at",
        )
        .bind(workflow_id)
        .bind(name)
        .fetch_all(&self.pool)
        .await?;
        Ok(rows.into_iter().map(Into::into).collect())
    }
901
902    // ── Schedules ───────────────────────────────────────────
903
    /// Insert a new cron schedule row.
    ///
    /// All thirteen columns are bound positionally — the bind order below
    /// must stay in lockstep with the column list in the INSERT.
    async fn create_schedule(&self, sched: &WorkflowSchedule) -> Result<()> {
        sqlx::query(
            "INSERT INTO workflow_schedules (name, namespace, workflow_type, cron_expr, timezone, input, task_queue, overlap_policy, paused, last_run_at, next_run_at, last_workflow_id, created_at)
             VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)",
        )
        .bind(&sched.name)
        .bind(&sched.namespace)
        .bind(&sched.workflow_type)
        .bind(&sched.cron_expr)
        .bind(&sched.timezone)
        .bind(&sched.input)
        .bind(&sched.task_queue)
        .bind(&sched.overlap_policy)
        .bind(sched.paused)
        .bind(sched.last_run_at)
        .bind(sched.next_run_at)
        .bind(&sched.last_workflow_id)
        .bind(sched.created_at)
        .execute(&self.pool)
        .await?;
        Ok(())
    }
926
    /// Fetch one schedule by (namespace, name); `None` when it doesn't exist.
    async fn get_schedule(&self, namespace: &str, name: &str) -> Result<Option<WorkflowSchedule>> {
        let row = sqlx::query_as::<_, SqliteScheduleRow>(
            "SELECT name, namespace, workflow_type, cron_expr, timezone, input, task_queue, overlap_policy, paused, last_run_at, next_run_at, last_workflow_id, created_at
             FROM workflow_schedules WHERE namespace = ? AND name = ?",
        )
        .bind(namespace)
        .bind(name)
        .fetch_optional(&self.pool)
        .await?;
        Ok(row.map(Into::into))
    }
938
    /// List all schedules in a namespace, ordered alphabetically by name.
    async fn list_schedules(&self, namespace: &str) -> Result<Vec<WorkflowSchedule>> {
        let rows = sqlx::query_as::<_, SqliteScheduleRow>(
            "SELECT name, namespace, workflow_type, cron_expr, timezone, input, task_queue, overlap_policy, paused, last_run_at, next_run_at, last_workflow_id, created_at
             FROM workflow_schedules WHERE namespace = ? ORDER BY name",
        )
        .bind(namespace)
        .fetch_all(&self.pool)
        .await?;
        Ok(rows.into_iter().map(Into::into).collect())
    }
949
    /// Record that a schedule just fired: stamp the last/next run times and
    /// remember the workflow id that was started. Internal bookkeeping —
    /// user-facing edits go through `update_schedule` instead.
    async fn update_schedule_last_run(
        &self,
        namespace: &str,
        name: &str,
        last_run_at: f64,
        next_run_at: f64,
        workflow_id: &str,
    ) -> Result<()> {
        sqlx::query(
            "UPDATE workflow_schedules SET last_run_at = ?, next_run_at = ?, last_workflow_id = ? WHERE namespace = ? AND name = ?",
        )
        .bind(last_run_at)
        .bind(next_run_at)
        .bind(workflow_id)
        .bind(namespace)
        .bind(name)
        .execute(&self.pool)
        .await?;
        Ok(())
    }
970
971    async fn delete_schedule(&self, namespace: &str, name: &str) -> Result<bool> {
972        let res =
973            sqlx::query("DELETE FROM workflow_schedules WHERE namespace = ? AND name = ?")
974                .bind(namespace)
975                .bind(name)
976                .execute(&self.pool)
977                .await?;
978        Ok(res.rows_affected() > 0)
979    }
980
    /// Find workflows eligible for archival: terminal status, completed
    /// before `cutoff` (epoch seconds), and not yet archived. Returned
    /// oldest-first, at most `limit` rows.
    async fn list_archivable_workflows(
        &self,
        cutoff: f64,
        limit: i64,
    ) -> Result<Vec<WorkflowRecord>> {
        let rows = sqlx::query_as::<_, SqliteWorkflowRow>(
            "SELECT id, namespace, run_id, workflow_type, task_queue, status, input, result, error, parent_id, claimed_by, search_attributes, archived_at, archive_uri, created_at, updated_at, completed_at
             FROM workflows
             WHERE status IN ('COMPLETED', 'FAILED', 'CANCELLED', 'TIMED_OUT')
               AND completed_at IS NOT NULL
               AND completed_at < ?
               AND archived_at IS NULL
             ORDER BY completed_at ASC
             LIMIT ?",
        )
        .bind(cutoff)
        .bind(limit)
        .fetch_all(&self.pool)
        .await?;
        Ok(rows.into_iter().map(Into::into).collect())
    }
1002
    /// Archive a workflow: delete all of its history (events, activities,
    /// timers, signals, snapshots) and stamp the workflow row with the
    /// archive location/time — all inside a single transaction so a crash
    /// can never leave a half-purged workflow.
    ///
    /// The workflows row itself is kept (with `archived_at`/`archive_uri`
    /// set) so listings can still surface the archived record.
    async fn mark_archived_and_purge(
        &self,
        workflow_id: &str,
        archive_uri: &str,
        archived_at: f64,
    ) -> Result<()> {
        let mut tx = self.pool.begin().await?;
        sqlx::query("DELETE FROM workflow_events WHERE workflow_id = ?")
            .bind(workflow_id)
            .execute(&mut *tx)
            .await?;
        sqlx::query("DELETE FROM workflow_activities WHERE workflow_id = ?")
            .bind(workflow_id)
            .execute(&mut *tx)
            .await?;
        sqlx::query("DELETE FROM workflow_timers WHERE workflow_id = ?")
            .bind(workflow_id)
            .execute(&mut *tx)
            .await?;
        sqlx::query("DELETE FROM workflow_signals WHERE workflow_id = ?")
            .bind(workflow_id)
            .execute(&mut *tx)
            .await?;
        sqlx::query("DELETE FROM workflow_snapshots WHERE workflow_id = ?")
            .bind(workflow_id)
            .execute(&mut *tx)
            .await?;
        sqlx::query(
            "UPDATE workflows SET archived_at = ?, archive_uri = ? WHERE id = ?",
        )
        .bind(archived_at)
        .bind(archive_uri)
        .bind(workflow_id)
        .execute(&mut *tx)
        .await?;
        tx.commit().await?;
        Ok(())
    }
1041
1042    async fn upsert_search_attributes(
1043        &self,
1044        workflow_id: &str,
1045        patch_json: &str,
1046    ) -> Result<()> {
1047        // Merge at the application layer so we don't depend on SQLite's
1048        // `json_patch`, which is only available with the json1 extension.
1049        let current: Option<(Option<String>,)> =
1050            sqlx::query_as("SELECT search_attributes FROM workflows WHERE id = ?")
1051                .bind(workflow_id)
1052                .fetch_optional(&self.pool)
1053                .await?;
1054        let merged = merge_search_attrs(
1055            current.and_then(|(s,)| s).as_deref(),
1056            patch_json,
1057        )?;
1058        sqlx::query("UPDATE workflows SET search_attributes = ? WHERE id = ?")
1059            .bind(merged)
1060            .bind(workflow_id)
1061            .execute(&self.pool)
1062            .await?;
1063        Ok(())
1064    }
1065
    /// Apply a partial edit to a schedule and return the updated row.
    ///
    /// Returns `Ok(None)` when the schedule does not exist; an empty patch
    /// is a no-op that simply re-reads the current row.
    async fn update_schedule(
        &self,
        namespace: &str,
        name: &str,
        patch: &SchedulePatch,
    ) -> Result<Option<WorkflowSchedule>> {
        // Build the UPDATE dynamically so unchanged fields aren't touched
        // and NULL from `serde_json::Value::Null` round-trips cleanly.
        let mut sets: Vec<&'static str> = Vec::new();
        if patch.cron_expr.is_some() {
            sets.push("cron_expr = ?");
        }
        if patch.timezone.is_some() {
            sets.push("timezone = ?");
        }
        if patch.input.is_some() {
            sets.push("input = ?");
        }
        if patch.task_queue.is_some() {
            sets.push("task_queue = ?");
        }
        if patch.overlap_policy.is_some() {
            sets.push("overlap_policy = ?");
        }
        // Updating last_run_at/next_run_at is internal only (update_schedule_last_run).
        if sets.is_empty() {
            return self.get_schedule(namespace, name).await;
        }

        let sql = format!(
            "UPDATE workflow_schedules SET {} WHERE namespace = ? AND name = ?",
            sets.join(", ")
        );
        // IMPORTANT: the bind sequence below must mirror the push order of
        // `sets` above exactly — binds are positional.
        let mut q = sqlx::query(&sql);
        if let Some(ref v) = patch.cron_expr {
            q = q.bind(v);
        }
        if let Some(ref v) = patch.timezone {
            q = q.bind(v);
        }
        if let Some(ref v) = patch.input {
            q = q.bind(v.to_string());
        }
        if let Some(ref v) = patch.task_queue {
            q = q.bind(v);
        }
        if let Some(ref v) = patch.overlap_policy {
            q = q.bind(v);
        }
        let res = q
            .bind(namespace)
            .bind(name)
            .execute(&self.pool)
            .await?;
        if res.rows_affected() == 0 {
            return Ok(None);
        }
        self.get_schedule(namespace, name).await
    }
1125
    /// Pause or resume a schedule; returns the updated row, or `Ok(None)`
    /// when no schedule with that (namespace, name) exists.
    async fn set_schedule_paused(
        &self,
        namespace: &str,
        name: &str,
        paused: bool,
    ) -> Result<Option<WorkflowSchedule>> {
        let res = sqlx::query(
            "UPDATE workflow_schedules SET paused = ? WHERE namespace = ? AND name = ?",
        )
        .bind(paused)
        .bind(namespace)
        .bind(name)
        .execute(&self.pool)
        .await?;
        if res.rows_affected() == 0 {
            return Ok(None);
        }
        self.get_schedule(namespace, name).await
    }
1145
1146    // ── Workers ─────────────────────────────────────────────
1147
    /// Register (or re-register) a worker. `INSERT OR REPLACE` makes this an
    /// upsert keyed on the worker id, so repeated registrations refresh the
    /// existing row instead of erroring.
    async fn register_worker(&self, w: &WorkflowWorker) -> Result<()> {
        sqlx::query(
            "INSERT OR REPLACE INTO workflow_workers (id, namespace, identity, task_queue, workflows, activities, max_concurrent_workflows, max_concurrent_activities, active_tasks, last_heartbeat, registered_at)
             VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)",
        )
        .bind(&w.id)
        .bind(&w.namespace)
        .bind(&w.identity)
        .bind(&w.task_queue)
        .bind(&w.workflows)
        .bind(&w.activities)
        .bind(w.max_concurrent_workflows)
        .bind(w.max_concurrent_activities)
        .bind(w.active_tasks)
        .bind(w.last_heartbeat)
        .bind(w.registered_at)
        .execute(&self.pool)
        .await?;
        Ok(())
    }
1168
1169    async fn heartbeat_worker(&self, id: &str, now: f64) -> Result<()> {
1170        sqlx::query("UPDATE workflow_workers SET last_heartbeat = ? WHERE id = ?")
1171            .bind(now)
1172            .bind(id)
1173            .execute(&self.pool)
1174            .await?;
1175        Ok(())
1176    }
1177
1178    async fn list_workers(&self, namespace: &str) -> Result<Vec<WorkflowWorker>> {
1179        let rows = sqlx::query_as::<_, SqliteWorkerRow>(
1180            "SELECT id, namespace, identity, task_queue, workflows, activities, max_concurrent_workflows, max_concurrent_activities, active_tasks, last_heartbeat, registered_at
1181             FROM workflow_workers WHERE namespace = ? ORDER BY registered_at",
1182        )
1183        .bind(namespace)
1184        .fetch_all(&self.pool)
1185        .await?;
1186        Ok(rows.into_iter().map(Into::into).collect())
1187    }
1188
1189    async fn remove_dead_workers(&self, cutoff: f64) -> Result<Vec<String>> {
1190        let rows: Vec<(String,)> =
1191            sqlx::query_as("SELECT id FROM workflow_workers WHERE last_heartbeat < ?")
1192                .bind(cutoff)
1193                .fetch_all(&self.pool)
1194                .await?;
1195        let ids: Vec<String> = rows.into_iter().map(|r| r.0).collect();
1196        if !ids.is_empty() {
1197            sqlx::query("DELETE FROM workflow_workers WHERE last_heartbeat < ?")
1198                .bind(cutoff)
1199                .execute(&self.pool)
1200                .await?;
1201        }
1202        Ok(ids)
1203    }
1204
1205    // ── API Keys ────────────────────────────────────────────
1206
    /// Store a new API key record. Only the hash is persisted alongside the
    /// display `prefix` and optional human-readable `label`; the plaintext
    /// key never reaches this layer.
    async fn create_api_key(
        &self,
        key_hash: &str,
        prefix: &str,
        label: Option<&str>,
        created_at: f64,
    ) -> Result<()> {
        sqlx::query(
            "INSERT INTO api_keys (key_hash, prefix, label, created_at) VALUES (?, ?, ?, ?)",
        )
        .bind(key_hash)
        .bind(prefix)
        .bind(label)
        .bind(created_at)
        .execute(&self.pool)
        .await?;
        Ok(())
    }
1225
1226    async fn validate_api_key(&self, key_hash: &str) -> Result<bool> {
1227        let row: Option<(i64,)> =
1228            sqlx::query_as("SELECT 1 FROM api_keys WHERE key_hash = ?")
1229                .bind(key_hash)
1230                .fetch_optional(&self.pool)
1231                .await?;
1232        Ok(row.is_some())
1233    }
1234
1235    async fn list_api_keys(&self) -> Result<Vec<ApiKeyRecord>> {
1236        let rows = sqlx::query_as::<_, (String, Option<String>, f64)>(
1237            "SELECT prefix, label, created_at FROM api_keys ORDER BY created_at DESC",
1238        )
1239        .fetch_all(&self.pool)
1240        .await?;
1241        Ok(rows
1242            .into_iter()
1243            .map(|(prefix, label, created_at)| ApiKeyRecord {
1244                prefix,
1245                label,
1246                created_at,
1247            })
1248            .collect())
1249    }
1250
1251    async fn revoke_api_key(&self, prefix: &str) -> Result<bool> {
1252        let res = sqlx::query("DELETE FROM api_keys WHERE prefix = ?")
1253            .bind(prefix)
1254            .execute(&self.pool)
1255            .await?;
1256        Ok(res.rows_affected() > 0)
1257    }
1258
1259    // ── Child Workflows ─────────────────────────────────────
1260
    /// List the direct children of a workflow (rows whose `parent_id`
    /// matches), ordered by creation time ascending.
    async fn list_child_workflows(&self, parent_id: &str) -> Result<Vec<WorkflowRecord>> {
        let rows = sqlx::query_as::<_, SqliteWorkflowRow>(
            "SELECT id, namespace, run_id, workflow_type, task_queue, status, input, result, error, parent_id, claimed_by, search_attributes, archived_at, archive_uri, created_at, updated_at, completed_at
             FROM workflows WHERE parent_id = ? ORDER BY created_at ASC",
        )
        .bind(parent_id)
        .fetch_all(&self.pool)
        .await?;
        Ok(rows.into_iter().map(Into::into).collect())
    }
1271
1272    // ── Snapshots ───────────────────────────────────────────
1273
    /// Persist a replay snapshot of workflow state taken at `event_seq`.
    ///
    /// `INSERT OR REPLACE` makes re-snapshotting overwrite the prior row —
    /// presumably (workflow_id, event_seq) is the table's unique key; the
    /// snapshot table schema is not visible in this file section.
    async fn create_snapshot(
        &self,
        workflow_id: &str,
        event_seq: i32,
        state_json: &str,
    ) -> Result<()> {
        sqlx::query(
            "INSERT OR REPLACE INTO workflow_snapshots (workflow_id, event_seq, state_json, created_at)
             VALUES (?, ?, ?, ?)",
        )
        .bind(workflow_id)
        .bind(event_seq)
        .bind(state_json)
        .bind(timestamp_now())
        .execute(&self.pool)
        .await?;
        Ok(())
    }
1292
1293    async fn get_latest_snapshot(
1294        &self,
1295        workflow_id: &str,
1296    ) -> Result<Option<WorkflowSnapshot>> {
1297        let row = sqlx::query_as::<_, (String, i32, String, f64)>(
1298            "SELECT workflow_id, event_seq, state_json, created_at
1299             FROM workflow_snapshots WHERE workflow_id = ?
1300             ORDER BY event_seq DESC LIMIT 1",
1301        )
1302        .bind(workflow_id)
1303        .fetch_optional(&self.pool)
1304        .await?;
1305
1306        Ok(row.map(|(workflow_id, event_seq, state_json, created_at)| WorkflowSnapshot {
1307            workflow_id,
1308            event_seq,
1309            state_json,
1310            created_at,
1311        }))
1312    }
1313
1314    // ── Queue Stats ─────────────────────────────────────────
1315
1316    async fn get_queue_stats(&self, namespace: &str) -> Result<Vec<QueueStats>> {
1317        // Gather activity stats per queue for workflows in this namespace
1318        let rows = sqlx::query_as::<_, (String, i64, i64)>(
1319            "SELECT a.task_queue,
1320                    SUM(CASE WHEN a.status = 'PENDING' THEN 1 ELSE 0 END),
1321                    SUM(CASE WHEN a.status = 'RUNNING' THEN 1 ELSE 0 END)
1322             FROM workflow_activities a
1323             INNER JOIN workflows w ON w.id = a.workflow_id
1324             WHERE w.namespace = ?
1325             GROUP BY a.task_queue",
1326        )
1327        .bind(namespace)
1328        .fetch_all(&self.pool)
1329        .await?;
1330
1331        let mut stats: Vec<QueueStats> = rows
1332            .into_iter()
1333            .map(|(queue, pending, running)| QueueStats {
1334                queue,
1335                pending_activities: pending,
1336                running_activities: running,
1337                workers: 0,
1338            })
1339            .collect();
1340
1341        // Gather worker counts per queue in this namespace
1342        let worker_rows = sqlx::query_as::<_, (String, i64)>(
1343            "SELECT task_queue, COUNT(*) FROM workflow_workers WHERE namespace = ? GROUP BY task_queue",
1344        )
1345        .bind(namespace)
1346        .fetch_all(&self.pool)
1347        .await?;
1348
1349        for (queue, count) in worker_rows {
1350            if let Some(s) = stats.iter_mut().find(|s| s.queue == queue) {
1351                s.workers = count;
1352            } else {
1353                stats.push(QueueStats {
1354                    queue,
1355                    pending_activities: 0,
1356                    running_activities: 0,
1357                    workers: count,
1358                });
1359            }
1360        }
1361
1362        stats.sort_by(|a, b| a.queue.cmp(&b.queue));
1363        Ok(stats)
1364    }
1365
1366    // ── Leader Election ─────────────────────────────────────
1367
    /// Scheduler leader election. The SQLite backend is single-instance, so
    /// it always wins; as a side effect each call best-effort refreshes the
    /// engine lock heartbeat (failures are deliberately swallowed via
    /// `.ok()` so a heartbeat hiccup never blocks scheduling).
    async fn try_acquire_scheduler_lock(&self) -> Result<bool> {
        // SQLite is single-instance — always the leader.
        // Also refresh the engine lock heartbeat on each scheduler tick.
        self.refresh_engine_lock().await.ok();
        Ok(true)
    }
1374}
1375
/// Current wall-clock time as fractional seconds since the Unix epoch.
///
/// Panics only if the system clock reads earlier than the epoch, which is
/// treated as an unrecoverable host misconfiguration.
fn timestamp_now() -> f64 {
    std::time::UNIX_EPOCH
        .elapsed()
        .expect("system clock is before the Unix epoch")
        .as_secs_f64()
}
1382
1383/// Merge a JSON-object patch into a (possibly-null) current JSON object,
1384/// returning the serialised result. Shared by SQLite and Postgres stores.
1385pub(crate) fn merge_search_attrs(current: Option<&str>, patch_json: &str) -> Result<String> {
1386    let mut current_map: serde_json::Map<String, serde_json::Value> = current
1387        .and_then(|s| serde_json::from_str::<serde_json::Value>(s).ok())
1388        .and_then(|v| v.as_object().cloned())
1389        .unwrap_or_default();
1390    let patch: serde_json::Value = serde_json::from_str(patch_json)
1391        .map_err(|e| anyhow::anyhow!("invalid search_attributes patch: {e}"))?;
1392    let patch_obj = patch
1393        .as_object()
1394        .ok_or_else(|| anyhow::anyhow!("search_attributes patch must be a JSON object"))?;
1395    for (k, v) in patch_obj {
1396        current_map.insert(k.clone(), v.clone());
1397    }
1398    Ok(serde_json::Value::Object(current_map).to_string())
1399}
1400
1401// ── SQLite row types (sqlx::FromRow) ────────────────────────
1402
/// Raw `workflows` row as selected by this store's queries; mirrors the
/// column list used in every `SELECT ... FROM workflows` above.
#[derive(sqlx::FromRow)]
struct SqliteWorkflowRow {
    id: String,
    namespace: String,
    run_id: String,
    workflow_type: String,
    task_queue: String,
    status: String,
    input: Option<String>,
    result: Option<String>,
    error: Option<String>,
    parent_id: Option<String>,
    claimed_by: Option<String>,
    search_attributes: Option<String>,
    archived_at: Option<f64>,
    archive_uri: Option<String>,
    created_at: f64,
    updated_at: f64,
    completed_at: Option<f64>,
}

/// Field-for-field conversion into the store-level record.
impl From<SqliteWorkflowRow> for WorkflowRecord {
    fn from(r: SqliteWorkflowRow) -> Self {
        Self {
            id: r.id,
            namespace: r.namespace,
            run_id: r.run_id,
            workflow_type: r.workflow_type,
            task_queue: r.task_queue,
            status: r.status,
            input: r.input,
            result: r.result,
            error: r.error,
            parent_id: r.parent_id,
            claimed_by: r.claimed_by,
            search_attributes: r.search_attributes,
            archived_at: r.archived_at,
            archive_uri: r.archive_uri,
            created_at: r.created_at,
            updated_at: r.updated_at,
            completed_at: r.completed_at,
        }
    }
}
1447
/// Raw `workflow_events` row as selected by this store's queries.
#[derive(sqlx::FromRow)]
struct SqliteEventRow {
    id: i64,
    workflow_id: String,
    seq: i32,
    event_type: String,
    payload: Option<String>,
    timestamp: f64,
}

/// Conversion into the store-level event; the DB rowid becomes `Some(id)`.
impl From<SqliteEventRow> for WorkflowEvent {
    fn from(r: SqliteEventRow) -> Self {
        Self {
            id: Some(r.id),
            workflow_id: r.workflow_id,
            seq: r.seq,
            event_type: r.event_type,
            payload: r.payload,
            timestamp: r.timestamp,
        }
    }
}
1470
/// Raw `workflow_activities` row; mirrors the 20-column SELECT list used by
/// the activity queries above.
#[derive(sqlx::FromRow)]
struct SqliteActivityRow {
    id: i64,
    workflow_id: String,
    seq: i32,
    name: String,
    task_queue: String,
    input: Option<String>,
    status: String,
    result: Option<String>,
    error: Option<String>,
    attempt: i32,
    max_attempts: i32,
    initial_interval_secs: f64,
    backoff_coefficient: f64,
    start_to_close_secs: f64,
    heartbeat_timeout_secs: Option<f64>,
    claimed_by: Option<String>,
    scheduled_at: f64,
    started_at: Option<f64>,
    completed_at: Option<f64>,
    last_heartbeat: Option<f64>,
}

/// Field-for-field conversion; the DB rowid becomes `Some(id)`.
impl From<SqliteActivityRow> for WorkflowActivity {
    fn from(r: SqliteActivityRow) -> Self {
        Self {
            id: Some(r.id),
            workflow_id: r.workflow_id,
            seq: r.seq,
            name: r.name,
            task_queue: r.task_queue,
            input: r.input,
            status: r.status,
            result: r.result,
            error: r.error,
            attempt: r.attempt,
            max_attempts: r.max_attempts,
            initial_interval_secs: r.initial_interval_secs,
            backoff_coefficient: r.backoff_coefficient,
            start_to_close_secs: r.start_to_close_secs,
            heartbeat_timeout_secs: r.heartbeat_timeout_secs,
            claimed_by: r.claimed_by,
            scheduled_at: r.scheduled_at,
            started_at: r.started_at,
            completed_at: r.completed_at,
            last_heartbeat: r.last_heartbeat,
        }
    }
}
1521
/// Raw `workflow_timers` row as selected/returned by the timer queries.
#[derive(sqlx::FromRow)]
struct SqliteTimerRow {
    id: i64,
    workflow_id: String,
    seq: i32,
    fire_at: f64,
    fired: bool,
}

/// Conversion into the store-level timer; the DB rowid becomes `Some(id)`.
impl From<SqliteTimerRow> for WorkflowTimer {
    fn from(r: SqliteTimerRow) -> Self {
        Self {
            id: Some(r.id),
            workflow_id: r.workflow_id,
            seq: r.seq,
            fire_at: r.fire_at,
            fired: r.fired,
        }
    }
}
1542
/// Raw `workflow_signals` row as selected/returned by the signal queries.
#[derive(sqlx::FromRow)]
struct SqliteSignalRow {
    id: i64,
    workflow_id: String,
    name: String,
    payload: Option<String>,
    consumed: bool,
    received_at: f64,
}

/// Conversion into the store-level signal; the DB rowid becomes `Some(id)`.
impl From<SqliteSignalRow> for WorkflowSignal {
    fn from(r: SqliteSignalRow) -> Self {
        Self {
            id: Some(r.id),
            workflow_id: r.workflow_id,
            name: r.name,
            payload: r.payload,
            consumed: r.consumed,
            received_at: r.received_at,
        }
    }
}
1565
/// Raw `workflow_schedules` row; schedules are keyed by (namespace, name)
/// in the queries above, so no rowid column is selected.
#[derive(sqlx::FromRow)]
struct SqliteScheduleRow {
    name: String,
    namespace: String,
    workflow_type: String,
    cron_expr: String,
    timezone: String,
    input: Option<String>,
    task_queue: String,
    overlap_policy: String,
    paused: bool,
    last_run_at: Option<f64>,
    next_run_at: Option<f64>,
    last_workflow_id: Option<String>,
    created_at: f64,
}

/// Field-for-field conversion into the store-level schedule.
impl From<SqliteScheduleRow> for WorkflowSchedule {
    fn from(r: SqliteScheduleRow) -> Self {
        Self {
            name: r.name,
            namespace: r.namespace,
            workflow_type: r.workflow_type,
            cron_expr: r.cron_expr,
            timezone: r.timezone,
            input: r.input,
            task_queue: r.task_queue,
            overlap_policy: r.overlap_policy,
            paused: r.paused,
            last_run_at: r.last_run_at,
            next_run_at: r.next_run_at,
            last_workflow_id: r.last_workflow_id,
            created_at: r.created_at,
        }
    }
}
1602
/// Raw `workflow_workers` row; mirrors the column list used by
/// `register_worker` and `list_workers`.
#[derive(sqlx::FromRow)]
struct SqliteWorkerRow {
    id: String,
    namespace: String,
    identity: String,
    task_queue: String,
    workflows: Option<String>,
    activities: Option<String>,
    max_concurrent_workflows: i32,
    max_concurrent_activities: i32,
    active_tasks: i32,
    last_heartbeat: f64,
    registered_at: f64,
}

/// Field-for-field conversion into the store-level worker record.
impl From<SqliteWorkerRow> for WorkflowWorker {
    fn from(r: SqliteWorkerRow) -> Self {
        Self {
            id: r.id,
            namespace: r.namespace,
            identity: r.identity,
            task_queue: r.task_queue,
            workflows: r.workflows,
            activities: r.activities,
            max_concurrent_workflows: r.max_concurrent_workflows,
            max_concurrent_activities: r.max_concurrent_activities,
            active_tasks: r.active_tasks,
            last_heartbeat: r.last_heartbeat,
            registered_at: r.registered_at,
        }
    }
}