//! SQLite implementation of `WorkflowStore` (assay_workflow/store/sqlite.rs).
1use anyhow::Result;
2use sqlx::SqlitePool;
3
4use crate::store::{ApiKeyRecord, NamespaceRecord, NamespaceStats, QueueStats, WorkflowStore};
5use crate::types::*;
6
/// Baseline database schema, applied on startup by `SqliteStore::migrate`.
///
/// `migrate` executes this script by splitting on `';'` — so no statement
/// here may contain a semicolon inside a string literal or a `--` comment,
/// or the naive splitter would cut the statement in half. Every DDL
/// statement uses `IF NOT EXISTS` (and the seed row `INSERT OR IGNORE`),
/// making the whole script idempotent across restarts.
const SCHEMA: &str = r#"
CREATE TABLE IF NOT EXISTS namespaces (
    name            TEXT PRIMARY KEY,
    created_at      REAL NOT NULL
);

INSERT OR IGNORE INTO namespaces (name, created_at)
    VALUES ('main', strftime('%s', 'now'));

CREATE TABLE IF NOT EXISTS workflows (
    id              TEXT PRIMARY KEY,
    namespace       TEXT NOT NULL DEFAULT 'main',
    run_id          TEXT NOT NULL,
    workflow_type   TEXT NOT NULL,
    task_queue      TEXT NOT NULL DEFAULT 'main',
    status          TEXT NOT NULL DEFAULT 'PENDING',
    input           TEXT,
    result          TEXT,
    error           TEXT,
    parent_id       TEXT,
    claimed_by      TEXT,
    search_attributes TEXT,
    archived_at     REAL,
    archive_uri     TEXT,
    -- Workflow-task dispatch (Phase 9): a workflow is "dispatchable" when
    -- it has new events a worker needs to replay against. Set true on
    -- start, on activity completion, on timer fire, on signal arrival.
    -- Cleared when a worker claims the dispatch lease.
    needs_dispatch  INTEGER NOT NULL DEFAULT 0,
    dispatch_claimed_by    TEXT,
    dispatch_last_heartbeat REAL,
    created_at      REAL NOT NULL,
    updated_at      REAL NOT NULL,
    completed_at    REAL
);
CREATE INDEX IF NOT EXISTS idx_wf_status_queue ON workflows(status, task_queue);
CREATE INDEX IF NOT EXISTS idx_wf_namespace ON workflows(namespace);
CREATE INDEX IF NOT EXISTS idx_wf_dispatch ON workflows(task_queue, needs_dispatch, dispatch_claimed_by);

CREATE TABLE IF NOT EXISTS workflow_events (
    id              INTEGER PRIMARY KEY AUTOINCREMENT,
    workflow_id     TEXT NOT NULL REFERENCES workflows(id),
    seq             INTEGER NOT NULL,
    event_type      TEXT NOT NULL,
    payload         TEXT,
    timestamp       REAL NOT NULL
);
CREATE INDEX IF NOT EXISTS idx_wf_events_lookup ON workflow_events(workflow_id, seq);

CREATE TABLE IF NOT EXISTS workflow_activities (
    id              INTEGER PRIMARY KEY AUTOINCREMENT,
    workflow_id     TEXT NOT NULL REFERENCES workflows(id),
    seq             INTEGER NOT NULL,
    name            TEXT NOT NULL,
    task_queue      TEXT NOT NULL DEFAULT 'main',
    input           TEXT,
    status          TEXT NOT NULL DEFAULT 'PENDING',
    result          TEXT,
    error           TEXT,
    attempt         INTEGER NOT NULL DEFAULT 1,
    max_attempts    INTEGER NOT NULL DEFAULT 3,
    initial_interval_secs   REAL NOT NULL DEFAULT 1,
    backoff_coefficient     REAL NOT NULL DEFAULT 2,
    start_to_close_secs     REAL NOT NULL DEFAULT 300,
    heartbeat_timeout_secs  REAL,
    claimed_by      TEXT,
    scheduled_at    REAL NOT NULL,
    started_at      REAL,
    completed_at    REAL,
    last_heartbeat  REAL,
    UNIQUE (workflow_id, seq)
);
CREATE INDEX IF NOT EXISTS idx_wf_act_pending ON workflow_activities(task_queue, status, scheduled_at);

CREATE TABLE IF NOT EXISTS workflow_timers (
    id              INTEGER PRIMARY KEY AUTOINCREMENT,
    workflow_id     TEXT NOT NULL REFERENCES workflows(id),
    seq             INTEGER NOT NULL,
    fire_at         REAL NOT NULL,
    fired           INTEGER NOT NULL DEFAULT 0,
    UNIQUE (workflow_id, seq)
);
CREATE INDEX IF NOT EXISTS idx_wf_timers_due ON workflow_timers(fire_at);

CREATE TABLE IF NOT EXISTS workflow_signals (
    id              INTEGER PRIMARY KEY AUTOINCREMENT,
    workflow_id     TEXT NOT NULL REFERENCES workflows(id),
    name            TEXT NOT NULL,
    payload         TEXT,
    consumed        INTEGER NOT NULL DEFAULT 0,
    received_at     REAL NOT NULL
);
CREATE INDEX IF NOT EXISTS idx_wf_signals_lookup ON workflow_signals(workflow_id, name, consumed);

CREATE TABLE IF NOT EXISTS workflow_schedules (
    name            TEXT NOT NULL,
    namespace       TEXT NOT NULL DEFAULT 'main',
    workflow_type   TEXT NOT NULL,
    cron_expr       TEXT NOT NULL,
    timezone        TEXT NOT NULL DEFAULT 'UTC',
    input           TEXT,
    task_queue      TEXT NOT NULL DEFAULT 'main',
    overlap_policy  TEXT NOT NULL DEFAULT 'skip',
    paused          INTEGER NOT NULL DEFAULT 0,
    last_run_at     REAL,
    next_run_at     REAL,
    last_workflow_id TEXT,
    created_at      REAL NOT NULL,
    PRIMARY KEY (namespace, name)
);

CREATE TABLE IF NOT EXISTS workflow_workers (
    id              TEXT PRIMARY KEY,
    namespace       TEXT NOT NULL DEFAULT 'main',
    identity        TEXT NOT NULL,
    task_queue      TEXT NOT NULL,
    workflows       TEXT,
    activities      TEXT,
    max_concurrent_workflows  INTEGER NOT NULL DEFAULT 10,
    max_concurrent_activities INTEGER NOT NULL DEFAULT 10,
    active_tasks    INTEGER NOT NULL DEFAULT 0,
    last_heartbeat  REAL NOT NULL,
    registered_at   REAL NOT NULL
);

CREATE TABLE IF NOT EXISTS workflow_snapshots (
    workflow_id     TEXT NOT NULL REFERENCES workflows(id),
    event_seq       INTEGER NOT NULL,
    state_json      TEXT NOT NULL,
    created_at      REAL NOT NULL,
    PRIMARY KEY (workflow_id, event_seq)
);

CREATE TABLE IF NOT EXISTS api_keys (
    key_hash        TEXT PRIMARY KEY,
    prefix          TEXT NOT NULL,
    label           TEXT,
    created_at      REAL NOT NULL
);
CREATE INDEX IF NOT EXISTS idx_api_keys_prefix ON api_keys(prefix);

CREATE TABLE IF NOT EXISTS engine_lock (
    id              INTEGER PRIMARY KEY CHECK (id = 1),
    instance_id     TEXT NOT NULL,
    started_at      REAL NOT NULL,
    last_heartbeat  REAL NOT NULL
);
"#;
155
/// Stale lock timeout — if the lock holder hasn't heartbeated in this
/// many seconds, assume it's dead and allow takeover.
const LOCK_STALE_SECS: f64 = 60.0;
/// How often to refresh the lock heartbeat. Must be well under
/// `LOCK_STALE_SECS` so a healthy holder is never mistaken for dead.
const LOCK_HEARTBEAT_SECS: u64 = 15;

/// SQLite-backed implementation of the workflow store.
///
/// Holds the connection pool plus a per-process identity string used by the
/// single-instance `engine_lock` table.
pub struct SqliteStore {
    // Shared sqlx connection pool.
    pool: SqlitePool,
    // Pseudo-unique id (time + pid hash) identifying this engine instance
    // in the engine lock; generated in `SqliteStore::new`.
    instance_id: String,
}
166
167impl SqliteStore {
168    pub async fn new(url: &str) -> Result<Self> {
169        let pool = SqlitePool::connect(url).await?;
170        let instance_id = format!("assay-{:016x}", {
171            use std::collections::hash_map::DefaultHasher;
172            use std::hash::{Hash, Hasher};
173            let mut h = DefaultHasher::new();
174            std::time::SystemTime::now().hash(&mut h);
175            std::process::id().hash(&mut h);
176            h.finish()
177        });
178        let store = Self { pool, instance_id };
179        store.migrate().await?;
180        Ok(store)
181    }
182
183    /// Acquire the single-instance engine lock.
184    /// Returns an error if another instance is already running.
185    pub async fn acquire_engine_lock(&self) -> Result<()> {
186        let now = timestamp_now();
187
188        // Try to insert the lock
189        let result = sqlx::query(
190            "INSERT INTO engine_lock (id, instance_id, started_at, last_heartbeat) VALUES (1, ?, ?, ?)",
191        )
192        .bind(&self.instance_id)
193        .bind(now)
194        .bind(now)
195        .execute(&self.pool)
196        .await;
197
198        match result {
199            Ok(_) => Ok(()),
200            Err(_) => {
201                // Lock exists — check if it's stale
202                let row: Option<(String, f64)> = sqlx::query_as(
203                    "SELECT instance_id, last_heartbeat FROM engine_lock WHERE id = 1",
204                )
205                .fetch_optional(&self.pool)
206                .await?;
207
208                if let Some((existing_id, last_hb)) = row {
209                    if now - last_hb > LOCK_STALE_SECS {
210                        // Stale lock — take over
211                        sqlx::query(
212                            "UPDATE engine_lock SET instance_id = ?, started_at = ?, last_heartbeat = ? WHERE id = 1",
213                        )
214                        .bind(&self.instance_id)
215                        .bind(now)
216                        .bind(now)
217                        .execute(&self.pool)
218                        .await?;
219                        tracing::warn!(
220                            "Took over stale engine lock from {existing_id} (last heartbeat {:.0}s ago)",
221                            now - last_hb
222                        );
223                        Ok(())
224                    } else {
225                        let age = now - last_hb;
226                        anyhow::bail!(
227                            "Another assay engine instance is already running (id: {existing_id}, \
228                             last heartbeat {age:.0}s ago).\n\n\
229                             SQLite only supports a single engine instance. For multi-instance \
230                             deployment (Kubernetes, Docker Swarm), use PostgreSQL:\n\n\
231                             \x20 assay serve --backend postgres://user:pass@host:5432/dbname"
232                        );
233                    }
234                } else {
235                    anyhow::bail!("Unexpected engine lock state");
236                }
237            }
238        }
239    }
240
241    /// Refresh the engine lock heartbeat. Called periodically by the engine.
242    pub async fn refresh_engine_lock(&self) -> Result<()> {
243        sqlx::query("UPDATE engine_lock SET last_heartbeat = ? WHERE id = 1 AND instance_id = ?")
244            .bind(timestamp_now())
245            .bind(&self.instance_id)
246            .execute(&self.pool)
247            .await?;
248        Ok(())
249    }
250
251    /// Release the engine lock on shutdown.
252    pub async fn release_engine_lock(&self) -> Result<()> {
253        sqlx::query("DELETE FROM engine_lock WHERE id = 1 AND instance_id = ?")
254            .bind(&self.instance_id)
255            .execute(&self.pool)
256            .await?;
257        Ok(())
258    }
259
260    /// Start background task to keep the lock alive.
261    pub fn spawn_lock_heartbeat(self: &std::sync::Arc<Self>) {
262        let store = std::sync::Arc::clone(self);
263        tokio::spawn(async move {
264            let mut tick = tokio::time::interval(std::time::Duration::from_secs(LOCK_HEARTBEAT_SECS));
265            loop {
266                tick.tick().await;
267                if let Err(e) = store.refresh_engine_lock().await {
268                    tracing::error!("Engine lock heartbeat failed: {e}");
269                }
270            }
271        });
272    }
273
274    /// Apply the baseline schema.
275    ///
276    /// Fresh installs get the current `CREATE TABLE IF NOT EXISTS` statements
277    /// in one pass. The engine is pre-1.0 and no v0.11.x release has been
278    /// deployed against a real workload yet, so we don't carry
279    /// `ALTER TABLE ADD COLUMN` statements for historical columns — the
280    /// baseline is the source of truth.
281    ///
282    /// For **future** additive migrations (post-v0.11.3), call
283    /// `Self::add_column_if_missing(&self.pool, "<table>", "<column>",
284    /// "<type_def>").await?` here before returning. Kept around so adding a
285    /// column later is a one-liner.
286    async fn migrate(&self) -> Result<()> {
287        for statement in SCHEMA.split(';') {
288            let trimmed = statement.trim();
289            if !trimmed.is_empty() {
290                sqlx::query(trimmed).execute(&self.pool).await?;
291            }
292        }
293        // Future additive migrations go here; see doc-comment above.
294        Ok(())
295    }
296
297    /// Add a column to an existing table if it's not already there.
298    ///
299    /// SQLite (unlike Postgres) doesn't support `ADD COLUMN IF NOT EXISTS`,
300    /// so we check via `pragma_table_info` before issuing the ALTER. Each
301    /// call is idempotent across startups.
302    ///
303    /// Currently unused — kept as the documented pattern for the first
304    /// additive migration after v0.11.3. Remove `#[allow(dead_code)]` when
305    /// a caller is added.
306    #[allow(dead_code)]
307    async fn add_column_if_missing(
308        pool: &SqlitePool,
309        table: &str,
310        column: &str,
311        type_def: &str,
312    ) -> Result<()> {
313        let exists: Option<(String,)> =
314            sqlx::query_as("SELECT name FROM pragma_table_info(?) WHERE name = ?")
315                .bind(table)
316                .bind(column)
317                .fetch_optional(pool)
318                .await?;
319        if exists.is_none() {
320            let sql = format!("ALTER TABLE {table} ADD COLUMN {column} {type_def}");
321            sqlx::query(&sql).execute(pool).await?;
322        }
323        Ok(())
324    }
325}
326
327impl WorkflowStore for SqliteStore {
328    // ── Namespaces ─────────────────────────────────────────
329
330    async fn create_namespace(&self, name: &str) -> Result<()> {
331        sqlx::query("INSERT INTO namespaces (name, created_at) VALUES (?, ?)")
332            .bind(name)
333            .bind(timestamp_now())
334            .execute(&self.pool)
335            .await?;
336        Ok(())
337    }
338
339    async fn list_namespaces(&self) -> Result<Vec<NamespaceRecord>> {
340        let rows = sqlx::query_as::<_, (String, f64)>(
341            "SELECT name, created_at FROM namespaces ORDER BY name",
342        )
343        .fetch_all(&self.pool)
344        .await?;
345        Ok(rows
346            .into_iter()
347            .map(|(name, created_at)| NamespaceRecord { name, created_at })
348            .collect())
349    }
350
351    async fn delete_namespace(&self, name: &str) -> Result<bool> {
352        let res = sqlx::query("DELETE FROM namespaces WHERE name = ?")
353            .bind(name)
354            .execute(&self.pool)
355            .await?;
356        Ok(res.rows_affected() > 0)
357    }
358
359    async fn get_namespace_stats(&self, namespace: &str) -> Result<NamespaceStats> {
360        let total: (i64,) =
361            sqlx::query_as("SELECT COUNT(*) FROM workflows WHERE namespace = ?")
362                .bind(namespace)
363                .fetch_one(&self.pool)
364                .await?;
365        let running: (i64,) = sqlx::query_as(
366            "SELECT COUNT(*) FROM workflows WHERE namespace = ? AND status = 'RUNNING'",
367        )
368        .bind(namespace)
369        .fetch_one(&self.pool)
370        .await?;
371        let pending: (i64,) = sqlx::query_as(
372            "SELECT COUNT(*) FROM workflows WHERE namespace = ? AND status = 'PENDING'",
373        )
374        .bind(namespace)
375        .fetch_one(&self.pool)
376        .await?;
377        let completed: (i64,) = sqlx::query_as(
378            "SELECT COUNT(*) FROM workflows WHERE namespace = ? AND status = 'COMPLETED'",
379        )
380        .bind(namespace)
381        .fetch_one(&self.pool)
382        .await?;
383        let failed: (i64,) = sqlx::query_as(
384            "SELECT COUNT(*) FROM workflows WHERE namespace = ? AND status = 'FAILED'",
385        )
386        .bind(namespace)
387        .fetch_one(&self.pool)
388        .await?;
389        let schedules: (i64,) =
390            sqlx::query_as("SELECT COUNT(*) FROM workflow_schedules WHERE namespace = ?")
391                .bind(namespace)
392                .fetch_one(&self.pool)
393                .await?;
394        let workers: (i64,) =
395            sqlx::query_as("SELECT COUNT(*) FROM workflow_workers WHERE namespace = ?")
396                .bind(namespace)
397                .fetch_one(&self.pool)
398                .await?;
399
400        Ok(NamespaceStats {
401            namespace: namespace.to_string(),
402            total_workflows: total.0,
403            running: running.0,
404            pending: pending.0,
405            completed: completed.0,
406            failed: failed.0,
407            schedules: schedules.0,
408            workers: workers.0,
409        })
410    }
411
412    // ── Workflows ──────────────────────────────────────────
413
    /// Insert a new workflow row from `wf`.
    ///
    /// All 17 columns are written explicitly; the bind order below must
    /// match the column list in the INSERT exactly. Fails with a constraint
    /// error if `wf.id` already exists.
    async fn create_workflow(&self, wf: &WorkflowRecord) -> Result<()> {
        sqlx::query(
            "INSERT INTO workflows (id, namespace, run_id, workflow_type, task_queue, status, input, result, error, parent_id, claimed_by, search_attributes, archived_at, archive_uri, created_at, updated_at, completed_at)
             VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)",
        )
        .bind(&wf.id)
        .bind(&wf.namespace)
        .bind(&wf.run_id)
        .bind(&wf.workflow_type)
        .bind(&wf.task_queue)
        .bind(&wf.status)
        .bind(&wf.input)
        .bind(&wf.result)
        .bind(&wf.error)
        .bind(&wf.parent_id)
        .bind(&wf.claimed_by)
        .bind(&wf.search_attributes)
        .bind(wf.archived_at)
        .bind(&wf.archive_uri)
        .bind(wf.created_at)
        .bind(wf.updated_at)
        .bind(wf.completed_at)
        .execute(&self.pool)
        .await?;
        Ok(())
    }
440
441    async fn get_workflow(&self, id: &str) -> Result<Option<WorkflowRecord>> {
442        let row = sqlx::query_as::<_, SqliteWorkflowRow>(
443            "SELECT id, namespace, run_id, workflow_type, task_queue, status, input, result, error, parent_id, claimed_by, search_attributes, archived_at, archive_uri, created_at, updated_at, completed_at FROM workflows WHERE id = ?",
444        )
445        .bind(id)
446        .fetch_optional(&self.pool)
447        .await?;
448        Ok(row.map(Into::into))
449    }
450
    /// List workflows in a namespace with optional status / type /
    /// search-attribute filters, newest first, paginated.
    ///
    /// The `(? IS NULL OR col = ?)` pattern binds each optional filter
    /// twice so a `None` disables the predicate. Search-attribute filters
    /// append one `json_extract` predicate per key, so the bind order is
    /// strict: namespace, status x2, type x2, then (key, value) per filter
    /// pair, then limit/offset — keep additions in that order.
    ///
    /// NOTE(review): a filter key containing `.` or quote characters would
    /// change the constructed JSON path — assumed keys are plain
    /// identifiers; confirm against callers.
    async fn list_workflows(
        &self,
        namespace: &str,
        status: Option<WorkflowStatus>,
        workflow_type: Option<&str>,
        search_attrs_filter: Option<&str>,
        limit: i64,
        offset: i64,
    ) -> Result<Vec<WorkflowRecord>> {
        let status_str = status.map(|s| s.to_string());

        // Parse search filter into (key, value) pairs. Each pair adds a
        // `json_extract(search_attributes, '$.key') = value` predicate so
        // matches require every filter key to be present in the stored
        // attributes. Invalid/empty JSON → no filter (all pass).
        let filter_pairs: Vec<(String, serde_json::Value)> = search_attrs_filter
            .and_then(|s| serde_json::from_str::<serde_json::Value>(s).ok())
            .and_then(|v| v.as_object().cloned())
            .map(|m| m.into_iter().collect())
            .unwrap_or_default();

        let mut sql = String::from(
            "SELECT id, namespace, run_id, workflow_type, task_queue, status, input, result, error, parent_id, claimed_by, search_attributes, archived_at, archive_uri, created_at, updated_at, completed_at
             FROM workflows
             WHERE namespace = ?
               AND (? IS NULL OR status = ?)
               AND (? IS NULL OR workflow_type = ?)",
        );
        for _ in &filter_pairs {
            sql.push_str(" AND json_extract(search_attributes, '$.' || ?) = ?");
        }
        sql.push_str(" ORDER BY created_at DESC LIMIT ? OFFSET ?");

        let mut q = sqlx::query_as::<_, SqliteWorkflowRow>(&sql)
            .bind(namespace)
            .bind(&status_str)
            .bind(&status_str)
            .bind(workflow_type)
            .bind(workflow_type);
        for (key, value) in &filter_pairs {
            q = q.bind(key.clone());
            // Bind the JSON value as its string/number representation.
            // json_extract on a stored JSON string returns its "natural"
            // SQLite type (text for strings, numeric for numbers), so we
            // match by the same type.
            match value {
                serde_json::Value::String(s) => q = q.bind(s.clone()),
                serde_json::Value::Number(n) => {
                    if let Some(i) = n.as_i64() {
                        q = q.bind(i);
                    } else if let Some(f) = n.as_f64() {
                        q = q.bind(f);
                    } else {
                        q = q.bind(n.to_string());
                    }
                }
                // JSON booleans are stored as SQLite 0/1 integers.
                serde_json::Value::Bool(b) => q = q.bind(*b as i64),
                // Arrays/objects/null: compare by serialized text form.
                _ => q = q.bind(value.to_string()),
            }
        }
        let rows = q
            .bind(limit)
            .bind(offset)
            .fetch_all(&self.pool)
        .await?;
        Ok(rows.into_iter().map(Into::into).collect())
    }
518
    /// Set a workflow's status, optionally recording a result or error.
    ///
    /// `COALESCE(?, col)` keeps the existing value when the caller passes
    /// `None`, so a status-only update never clobbers a previously stored
    /// result/error. `completed_at` is stamped only on the first transition
    /// into a terminal status (also via COALESCE).
    async fn update_workflow_status(
        &self,
        id: &str,
        status: WorkflowStatus,
        result: Option<&str>,
        error: Option<&str>,
    ) -> Result<()> {
        let now = timestamp_now();
        // Only terminal statuses get a completion timestamp.
        let completed_at = if status.is_terminal() { Some(now) } else { None };
        sqlx::query(
            "UPDATE workflows SET status = ?, result = COALESCE(?, result), error = COALESCE(?, error), updated_at = ?, completed_at = COALESCE(?, completed_at) WHERE id = ?",
        )
        .bind(status.to_string())
        .bind(result)
        .bind(error)
        .bind(now)
        .bind(completed_at)
        .bind(id)
        .execute(&self.pool)
        .await?;
        Ok(())
    }
541
542    async fn claim_workflow(&self, id: &str, worker_id: &str) -> Result<bool> {
543        let res = sqlx::query(
544            "UPDATE workflows SET claimed_by = ?, status = 'RUNNING', updated_at = ? WHERE id = ? AND claimed_by IS NULL",
545        )
546        .bind(worker_id)
547        .bind(timestamp_now())
548        .bind(id)
549        .execute(&self.pool)
550        .await?;
551        Ok(res.rows_affected() > 0)
552    }
553
554    async fn mark_workflow_dispatchable(&self, workflow_id: &str) -> Result<()> {
555        sqlx::query("UPDATE workflows SET needs_dispatch = 1 WHERE id = ?")
556            .bind(workflow_id)
557            .execute(&self.pool)
558            .await?;
559        Ok(())
560    }
561
    /// Atomically claim the next workflow task on `task_queue` for
    /// `worker_id`; `None` when nothing is dispatchable.
    ///
    /// A single `UPDATE … RETURNING` both selects and claims the oldest
    /// (by `updated_at`) dispatchable, unclaimed, non-terminal workflow and
    /// clears `needs_dispatch`, so two racing pollers can never claim the
    /// same workflow.
    async fn claim_workflow_task(
        &self,
        task_queue: &str,
        worker_id: &str,
    ) -> Result<Option<WorkflowRecord>> {
        let now = timestamp_now();
        // Atomic: pick the oldest dispatchable + unclaimed workflow on the queue
        let row = sqlx::query_as::<_, SqliteWorkflowRow>(
            "UPDATE workflows
             SET dispatch_claimed_by = ?, dispatch_last_heartbeat = ?, needs_dispatch = 0
             WHERE id = (
                SELECT id FROM workflows
                WHERE task_queue = ?
                  AND needs_dispatch = 1
                  AND dispatch_claimed_by IS NULL
                  AND status NOT IN ('COMPLETED', 'FAILED', 'CANCELLED', 'TIMED_OUT')
                ORDER BY updated_at ASC
                LIMIT 1
             )
             RETURNING id, namespace, run_id, workflow_type, task_queue, status, input, result, error, parent_id, claimed_by, search_attributes, archived_at, archive_uri, created_at, updated_at, completed_at",
        )
        .bind(worker_id)
        .bind(now)
        .bind(task_queue)
        .fetch_optional(&self.pool)
        .await?;
        Ok(row.map(Into::into))
    }
590
591    async fn release_workflow_task(&self, workflow_id: &str, worker_id: &str) -> Result<()> {
592        sqlx::query(
593            "UPDATE workflows
594             SET dispatch_claimed_by = NULL, dispatch_last_heartbeat = NULL
595             WHERE id = ? AND dispatch_claimed_by = ?",
596        )
597        .bind(workflow_id)
598        .bind(worker_id)
599        .execute(&self.pool)
600        .await?;
601        Ok(())
602    }
603
    /// Reclaim dispatch leases whose holder stopped heartbeating more than
    /// `timeout_secs` ago. Returns the number of leases released.
    async fn release_stale_dispatch_leases(
        &self,
        now: f64,
        timeout_secs: f64,
    ) -> Result<u64> {
        // Re-arm needs_dispatch so the work goes back into the pool. Don't
        // touch workflows that have reached a terminal state — those should
        // never be re-dispatched.
        let res = sqlx::query(
            "UPDATE workflows
             SET dispatch_claimed_by = NULL,
                 dispatch_last_heartbeat = NULL,
                 needs_dispatch = 1
             WHERE dispatch_claimed_by IS NOT NULL
               AND (? - dispatch_last_heartbeat) > ?
               AND status NOT IN ('COMPLETED', 'FAILED', 'CANCELLED', 'TIMED_OUT')",
        )
        .bind(now)
        .bind(timeout_secs)
        .execute(&self.pool)
        .await?;
        Ok(res.rows_affected())
    }
627
628    // ── Events ─────────────────────────────────────────────
629
630    async fn append_event(&self, ev: &WorkflowEvent) -> Result<i64> {
631        let res = sqlx::query(
632            "INSERT INTO workflow_events (workflow_id, seq, event_type, payload, timestamp) VALUES (?, ?, ?, ?, ?)",
633        )
634        .bind(&ev.workflow_id)
635        .bind(ev.seq)
636        .bind(&ev.event_type)
637        .bind(&ev.payload)
638        .bind(ev.timestamp)
639        .execute(&self.pool)
640        .await?;
641        Ok(res.last_insert_rowid())
642    }
643
644    async fn list_events(&self, workflow_id: &str) -> Result<Vec<WorkflowEvent>> {
645        let rows = sqlx::query_as::<_, SqliteEventRow>(
646            "SELECT id, workflow_id, seq, event_type, payload, timestamp FROM workflow_events WHERE workflow_id = ? ORDER BY seq ASC",
647        )
648        .bind(workflow_id)
649        .fetch_all(&self.pool)
650        .await?;
651        Ok(rows.into_iter().map(Into::into).collect())
652    }
653
654    async fn get_event_count(&self, workflow_id: &str) -> Result<i64> {
655        let row: (i64,) =
656            sqlx::query_as("SELECT COUNT(*) FROM workflow_events WHERE workflow_id = ?")
657                .bind(workflow_id)
658                .fetch_one(&self.pool)
659                .await?;
660        Ok(row.0)
661    }
662
663    // ── Activities ──────────────────────────────────────────
664
    /// Insert a new activity task; returns the generated row id.
    ///
    /// `result`, `error`, `claimed_by`, `started_at`, `completed_at` and
    /// `last_heartbeat` are intentionally left to their NULL defaults — a
    /// freshly created activity has not run yet. The bind order must match
    /// the 13-column list exactly. The `(workflow_id, seq)` UNIQUE
    /// constraint makes duplicate scheduling fail loudly.
    async fn create_activity(&self, act: &WorkflowActivity) -> Result<i64> {
        let res = sqlx::query(
            "INSERT INTO workflow_activities (workflow_id, seq, name, task_queue, input, status, attempt, max_attempts, initial_interval_secs, backoff_coefficient, start_to_close_secs, heartbeat_timeout_secs, scheduled_at)
             VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)",
        )
        .bind(&act.workflow_id)
        .bind(act.seq)
        .bind(&act.name)
        .bind(&act.task_queue)
        .bind(&act.input)
        .bind(&act.status)
        .bind(act.attempt)
        .bind(act.max_attempts)
        .bind(act.initial_interval_secs)
        .bind(act.backoff_coefficient)
        .bind(act.start_to_close_secs)
        .bind(act.heartbeat_timeout_secs)
        .bind(act.scheduled_at)
        .execute(&self.pool)
        .await?;
        Ok(res.last_insert_rowid())
    }
687
688    async fn get_activity(&self, id: i64) -> Result<Option<WorkflowActivity>> {
689        let row = sqlx::query_as::<_, SqliteActivityRow>(
690            "SELECT id, workflow_id, seq, name, task_queue, input, status, result, error, attempt, max_attempts, initial_interval_secs, backoff_coefficient, start_to_close_secs, heartbeat_timeout_secs, claimed_by, scheduled_at, started_at, completed_at, last_heartbeat
691             FROM workflow_activities WHERE id = ?",
692        )
693        .bind(id)
694        .fetch_optional(&self.pool)
695        .await?;
696        Ok(row.map(Into::into))
697    }
698
699    async fn get_activity_by_workflow_seq(
700        &self,
701        workflow_id: &str,
702        seq: i32,
703    ) -> Result<Option<WorkflowActivity>> {
704        let row = sqlx::query_as::<_, SqliteActivityRow>(
705            "SELECT id, workflow_id, seq, name, task_queue, input, status, result, error, attempt, max_attempts, initial_interval_secs, backoff_coefficient, start_to_close_secs, heartbeat_timeout_secs, claimed_by, scheduled_at, started_at, completed_at, last_heartbeat
706             FROM workflow_activities WHERE workflow_id = ? AND seq = ?",
707        )
708        .bind(workflow_id)
709        .bind(seq)
710        .fetch_optional(&self.pool)
711        .await?;
712        Ok(row.map(Into::into))
713    }
714
715    async fn claim_activity(
716        &self,
717        task_queue: &str,
718        worker_id: &str,
719    ) -> Result<Option<WorkflowActivity>> {
720        let now = timestamp_now();
721        let row = sqlx::query_as::<_, SqliteActivityRow>(
722            "UPDATE workflow_activities SET status = 'RUNNING', claimed_by = ?, started_at = ?
723             WHERE id = (
724                SELECT id FROM workflow_activities
725                WHERE task_queue = ? AND status = 'PENDING'
726                ORDER BY scheduled_at ASC
727                LIMIT 1
728             )
729             RETURNING id, workflow_id, seq, name, task_queue, input, status, result, error, attempt, max_attempts, initial_interval_secs, backoff_coefficient, start_to_close_secs, heartbeat_timeout_secs, claimed_by, scheduled_at, started_at, completed_at, last_heartbeat",
730        )
731        .bind(worker_id)
732        .bind(now)
733        .bind(task_queue)
734        .fetch_optional(&self.pool)
735        .await?;
736        Ok(row.map(Into::into))
737    }
738
    /// Put a failed activity back into the PENDING pool for another attempt.
    ///
    /// Resets the per-attempt execution state (`claimed_by`, `started_at`,
    /// `last_heartbeat`, `error`) so the next claim starts clean, bumps the
    /// attempt counter, and sets `scheduled_at` to the caller-computed
    /// backoff time (typically in the future).
    async fn requeue_activity_for_retry(
        &self,
        id: i64,
        next_attempt: i32,
        next_scheduled_at: f64,
    ) -> Result<()> {
        sqlx::query(
            "UPDATE workflow_activities
             SET status = 'PENDING', attempt = ?, scheduled_at = ?,
                 claimed_by = NULL, started_at = NULL, last_heartbeat = NULL,
                 error = NULL
             WHERE id = ?",
        )
        .bind(next_attempt)
        .bind(next_scheduled_at)
        .bind(id)
        .execute(&self.pool)
        .await?;
        Ok(())
    }
759
760    async fn complete_activity(
761        &self,
762        id: i64,
763        result: Option<&str>,
764        error: Option<&str>,
765        failed: bool,
766    ) -> Result<()> {
767        let status = if failed { "FAILED" } else { "COMPLETED" };
768        sqlx::query(
769            "UPDATE workflow_activities SET status = ?, result = ?, error = ?, completed_at = ? WHERE id = ?",
770        )
771        .bind(status)
772        .bind(result)
773        .bind(error)
774        .bind(timestamp_now())
775        .bind(id)
776        .execute(&self.pool)
777        .await?;
778        Ok(())
779    }
780
781    async fn heartbeat_activity(&self, id: i64, _details: Option<&str>) -> Result<()> {
782        sqlx::query("UPDATE workflow_activities SET last_heartbeat = ? WHERE id = ?")
783            .bind(timestamp_now())
784            .bind(id)
785            .execute(&self.pool)
786            .await?;
787        Ok(())
788    }
789
790    async fn get_timed_out_activities(&self, now: f64) -> Result<Vec<WorkflowActivity>> {
791        let rows = sqlx::query_as::<_, SqliteActivityRow>(
792            "SELECT id, workflow_id, seq, name, task_queue, input, status, result, error, attempt, max_attempts, initial_interval_secs, backoff_coefficient, start_to_close_secs, heartbeat_timeout_secs, claimed_by, scheduled_at, started_at, completed_at, last_heartbeat
793             FROM workflow_activities
794             WHERE status = 'RUNNING'
795               AND heartbeat_timeout_secs IS NOT NULL
796               AND last_heartbeat IS NOT NULL
797               AND (? - last_heartbeat) > heartbeat_timeout_secs",
798        )
799        .bind(now)
800        .fetch_all(&self.pool)
801        .await?;
802        Ok(rows.into_iter().map(Into::into).collect())
803    }
804
805    // ── Timers ──────────────────────────────────────────────
806
807    async fn create_timer(&self, timer: &WorkflowTimer) -> Result<i64> {
808        let res = sqlx::query(
809            "INSERT INTO workflow_timers (workflow_id, seq, fire_at, fired) VALUES (?, ?, ?, 0)",
810        )
811        .bind(&timer.workflow_id)
812        .bind(timer.seq)
813        .bind(timer.fire_at)
814        .execute(&self.pool)
815        .await?;
816        Ok(res.last_insert_rowid())
817    }
818
819    async fn cancel_pending_activities(&self, workflow_id: &str) -> Result<u64> {
820        let res = sqlx::query(
821            "UPDATE workflow_activities SET status = 'CANCELLED', completed_at = ?
822             WHERE workflow_id = ? AND status = 'PENDING'",
823        )
824        .bind(timestamp_now())
825        .bind(workflow_id)
826        .execute(&self.pool)
827        .await?;
828        Ok(res.rows_affected())
829    }
830
831    async fn cancel_pending_timers(&self, workflow_id: &str) -> Result<u64> {
832        let res = sqlx::query(
833            "UPDATE workflow_timers SET fired = 1
834             WHERE workflow_id = ? AND fired = 0",
835        )
836        .bind(workflow_id)
837        .execute(&self.pool)
838        .await?;
839        Ok(res.rows_affected())
840    }
841
842    async fn get_timer_by_workflow_seq(
843        &self,
844        workflow_id: &str,
845        seq: i32,
846    ) -> Result<Option<WorkflowTimer>> {
847        let row = sqlx::query_as::<_, SqliteTimerRow>(
848            "SELECT id, workflow_id, seq, fire_at, fired
849             FROM workflow_timers WHERE workflow_id = ? AND seq = ?",
850        )
851        .bind(workflow_id)
852        .bind(seq)
853        .fetch_optional(&self.pool)
854        .await?;
855        Ok(row.map(Into::into))
856    }
857
858    async fn fire_due_timers(&self, now: f64) -> Result<Vec<WorkflowTimer>> {
859        let rows = sqlx::query_as::<_, SqliteTimerRow>(
860            "UPDATE workflow_timers SET fired = 1
861             WHERE fired = 0 AND fire_at <= ?
862             RETURNING id, workflow_id, seq, fire_at, fired",
863        )
864        .bind(now)
865        .fetch_all(&self.pool)
866        .await?;
867        Ok(rows.into_iter().map(Into::into).collect())
868    }
869
870    // ── Signals ─────────────────────────────────────────────
871
872    async fn send_signal(&self, sig: &WorkflowSignal) -> Result<i64> {
873        let res = sqlx::query(
874            "INSERT INTO workflow_signals (workflow_id, name, payload, consumed, received_at) VALUES (?, ?, ?, 0, ?)",
875        )
876        .bind(&sig.workflow_id)
877        .bind(&sig.name)
878        .bind(&sig.payload)
879        .bind(sig.received_at)
880        .execute(&self.pool)
881        .await?;
882        Ok(res.last_insert_rowid())
883    }
884
885    async fn consume_signals(
886        &self,
887        workflow_id: &str,
888        name: &str,
889    ) -> Result<Vec<WorkflowSignal>> {
890        let rows = sqlx::query_as::<_, SqliteSignalRow>(
891            "UPDATE workflow_signals SET consumed = 1
892             WHERE workflow_id = ? AND name = ? AND consumed = 0
893             RETURNING id, workflow_id, name, payload, consumed, received_at",
894        )
895        .bind(workflow_id)
896        .bind(name)
897        .fetch_all(&self.pool)
898        .await?;
899        Ok(rows.into_iter().map(Into::into).collect())
900    }
901
902    // ── Schedules ───────────────────────────────────────────
903
    /// Insert a new schedule row from the given record.
    async fn create_schedule(&self, sched: &WorkflowSchedule) -> Result<()> {
        sqlx::query(
            "INSERT INTO workflow_schedules (name, namespace, workflow_type, cron_expr, timezone, input, task_queue, overlap_policy, paused, last_run_at, next_run_at, last_workflow_id, created_at)
             VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)",
        )
        // Bind order must mirror the column list above exactly — `?`
        // placeholders are positional.
        .bind(&sched.name)
        .bind(&sched.namespace)
        .bind(&sched.workflow_type)
        .bind(&sched.cron_expr)
        .bind(&sched.timezone)
        .bind(&sched.input)
        .bind(&sched.task_queue)
        .bind(&sched.overlap_policy)
        .bind(sched.paused)
        .bind(sched.last_run_at)
        .bind(sched.next_run_at)
        .bind(&sched.last_workflow_id)
        .bind(sched.created_at)
        .execute(&self.pool)
        .await?;
        Ok(())
    }
926
927    async fn get_schedule(&self, namespace: &str, name: &str) -> Result<Option<WorkflowSchedule>> {
928        let row = sqlx::query_as::<_, SqliteScheduleRow>(
929            "SELECT name, namespace, workflow_type, cron_expr, timezone, input, task_queue, overlap_policy, paused, last_run_at, next_run_at, last_workflow_id, created_at
930             FROM workflow_schedules WHERE namespace = ? AND name = ?",
931        )
932        .bind(namespace)
933        .bind(name)
934        .fetch_optional(&self.pool)
935        .await?;
936        Ok(row.map(Into::into))
937    }
938
939    async fn list_schedules(&self, namespace: &str) -> Result<Vec<WorkflowSchedule>> {
940        let rows = sqlx::query_as::<_, SqliteScheduleRow>(
941            "SELECT name, namespace, workflow_type, cron_expr, timezone, input, task_queue, overlap_policy, paused, last_run_at, next_run_at, last_workflow_id, created_at
942             FROM workflow_schedules WHERE namespace = ? ORDER BY name",
943        )
944        .bind(namespace)
945        .fetch_all(&self.pool)
946        .await?;
947        Ok(rows.into_iter().map(Into::into).collect())
948    }
949
950    async fn update_schedule_last_run(
951        &self,
952        namespace: &str,
953        name: &str,
954        last_run_at: f64,
955        next_run_at: f64,
956        workflow_id: &str,
957    ) -> Result<()> {
958        sqlx::query(
959            "UPDATE workflow_schedules SET last_run_at = ?, next_run_at = ?, last_workflow_id = ? WHERE namespace = ? AND name = ?",
960        )
961        .bind(last_run_at)
962        .bind(next_run_at)
963        .bind(workflow_id)
964        .bind(namespace)
965        .bind(name)
966        .execute(&self.pool)
967        .await?;
968        Ok(())
969    }
970
971    async fn delete_schedule(&self, namespace: &str, name: &str) -> Result<bool> {
972        let res =
973            sqlx::query("DELETE FROM workflow_schedules WHERE namespace = ? AND name = ?")
974                .bind(namespace)
975                .bind(name)
976                .execute(&self.pool)
977                .await?;
978        Ok(res.rows_affected() > 0)
979    }
980
981    async fn list_archivable_workflows(
982        &self,
983        cutoff: f64,
984        limit: i64,
985    ) -> Result<Vec<WorkflowRecord>> {
986        let rows = sqlx::query_as::<_, SqliteWorkflowRow>(
987            "SELECT id, namespace, run_id, workflow_type, task_queue, status, input, result, error, parent_id, claimed_by, search_attributes, archived_at, archive_uri, created_at, updated_at, completed_at
988             FROM workflows
989             WHERE status IN ('COMPLETED', 'FAILED', 'CANCELLED', 'TIMED_OUT')
990               AND completed_at IS NOT NULL
991               AND completed_at < ?
992               AND archived_at IS NULL
993             ORDER BY completed_at ASC
994             LIMIT ?",
995        )
996        .bind(cutoff)
997        .bind(limit)
998        .fetch_all(&self.pool)
999        .await?;
1000        Ok(rows.into_iter().map(Into::into).collect())
1001    }
1002
1003    async fn mark_archived_and_purge(
1004        &self,
1005        workflow_id: &str,
1006        archive_uri: &str,
1007        archived_at: f64,
1008    ) -> Result<()> {
1009        let mut tx = self.pool.begin().await?;
1010        sqlx::query("DELETE FROM workflow_events WHERE workflow_id = ?")
1011            .bind(workflow_id)
1012            .execute(&mut *tx)
1013            .await?;
1014        sqlx::query("DELETE FROM workflow_activities WHERE workflow_id = ?")
1015            .bind(workflow_id)
1016            .execute(&mut *tx)
1017            .await?;
1018        sqlx::query("DELETE FROM workflow_timers WHERE workflow_id = ?")
1019            .bind(workflow_id)
1020            .execute(&mut *tx)
1021            .await?;
1022        sqlx::query("DELETE FROM workflow_signals WHERE workflow_id = ?")
1023            .bind(workflow_id)
1024            .execute(&mut *tx)
1025            .await?;
1026        sqlx::query("DELETE FROM workflow_snapshots WHERE workflow_id = ?")
1027            .bind(workflow_id)
1028            .execute(&mut *tx)
1029            .await?;
1030        sqlx::query(
1031            "UPDATE workflows SET archived_at = ?, archive_uri = ? WHERE id = ?",
1032        )
1033        .bind(archived_at)
1034        .bind(archive_uri)
1035        .bind(workflow_id)
1036        .execute(&mut *tx)
1037        .await?;
1038        tx.commit().await?;
1039        Ok(())
1040    }
1041
    /// Shallow-merge a JSON-object patch into the workflow's stored
    /// `search_attributes` column.
    async fn upsert_search_attributes(
        &self,
        workflow_id: &str,
        patch_json: &str,
    ) -> Result<()> {
        // Merge at the application layer so we don't depend on SQLite's
        // `json_patch`, which is only available with the json1 extension.
        //
        // NOTE(review): this read-merge-write is not atomic — two concurrent
        // patches to the same workflow can lose one update. Confirm callers
        // serialize per-workflow, or consider wrapping in a transaction.
        //
        // Outer Option: does the row exist? Inner Option: column may be NULL.
        let current: Option<(Option<String>,)> =
            sqlx::query_as("SELECT search_attributes FROM workflows WHERE id = ?")
                .bind(workflow_id)
                .fetch_optional(&self.pool)
                .await?;
        let merged = merge_search_attrs(
            current.and_then(|(s,)| s).as_deref(),
            patch_json,
        )?;
        // If the workflow id does not exist this UPDATE silently affects 0 rows.
        sqlx::query("UPDATE workflows SET search_attributes = ? WHERE id = ?")
            .bind(merged)
            .bind(workflow_id)
            .execute(&self.pool)
            .await?;
        Ok(())
    }
1065
    /// Apply a partial update to a schedule; returns the refreshed row, or
    /// `None` when no schedule matched `(namespace, name)`.
    async fn update_schedule(
        &self,
        namespace: &str,
        name: &str,
        patch: &SchedulePatch,
    ) -> Result<Option<WorkflowSchedule>> {
        // Build the UPDATE dynamically so unchanged fields aren't touched
        // and NULL from `serde_json::Value::Null` round-trips cleanly.
        //
        // INVARIANT: the `bind` calls further down must run in exactly the
        // same order as the `sets.push(...)` calls here — `?` placeholders
        // are positional.
        let mut sets: Vec<&'static str> = Vec::new();
        if patch.cron_expr.is_some() {
            sets.push("cron_expr = ?");
        }
        if patch.timezone.is_some() {
            sets.push("timezone = ?");
        }
        if patch.input.is_some() {
            sets.push("input = ?");
        }
        if patch.task_queue.is_some() {
            sets.push("task_queue = ?");
        }
        if patch.overlap_policy.is_some() {
            sets.push("overlap_policy = ?");
        }
        // Updating last_run_at/next_run_at is internal only (update_schedule_last_run).
        if sets.is_empty() {
            // Empty patch: no-op, just return the current row (if any).
            return self.get_schedule(namespace, name).await;
        }

        let sql = format!(
            "UPDATE workflow_schedules SET {} WHERE namespace = ? AND name = ?",
            sets.join(", ")
        );
        let mut q = sqlx::query(&sql);
        if let Some(ref v) = patch.cron_expr {
            q = q.bind(v);
        }
        if let Some(ref v) = patch.timezone {
            q = q.bind(v);
        }
        if let Some(ref v) = patch.input {
            // Serialize the JSON value to text for the TEXT column.
            q = q.bind(v.to_string());
        }
        if let Some(ref v) = patch.task_queue {
            q = q.bind(v);
        }
        if let Some(ref v) = patch.overlap_policy {
            q = q.bind(v);
        }
        let res = q
            .bind(namespace)
            .bind(name)
            .execute(&self.pool)
            .await?;
        if res.rows_affected() == 0 {
            return Ok(None);
        }
        // Re-read so the caller sees the post-update row.
        self.get_schedule(namespace, name).await
    }
1125
1126    async fn set_schedule_paused(
1127        &self,
1128        namespace: &str,
1129        name: &str,
1130        paused: bool,
1131    ) -> Result<Option<WorkflowSchedule>> {
1132        let res = sqlx::query(
1133            "UPDATE workflow_schedules SET paused = ? WHERE namespace = ? AND name = ?",
1134        )
1135        .bind(paused)
1136        .bind(namespace)
1137        .bind(name)
1138        .execute(&self.pool)
1139        .await?;
1140        if res.rows_affected() == 0 {
1141            return Ok(None);
1142        }
1143        self.get_schedule(namespace, name).await
1144    }
1145
1146    // ── Workers ─────────────────────────────────────────────
1147
    /// Register (or re-register) a worker; INSERT OR REPLACE means a worker
    /// re-registering under the same id overwrites its previous row.
    async fn register_worker(&self, w: &WorkflowWorker) -> Result<()> {
        sqlx::query(
            "INSERT OR REPLACE INTO workflow_workers (id, namespace, identity, task_queue, workflows, activities, max_concurrent_workflows, max_concurrent_activities, active_tasks, last_heartbeat, registered_at)
             VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)",
        )
        // Bind order must mirror the column list above exactly — `?`
        // placeholders are positional.
        .bind(&w.id)
        .bind(&w.namespace)
        .bind(&w.identity)
        .bind(&w.task_queue)
        .bind(&w.workflows)
        .bind(&w.activities)
        .bind(w.max_concurrent_workflows)
        .bind(w.max_concurrent_activities)
        .bind(w.active_tasks)
        .bind(w.last_heartbeat)
        .bind(w.registered_at)
        .execute(&self.pool)
        .await?;
        Ok(())
    }
1168
1169    async fn heartbeat_worker(&self, id: &str, now: f64) -> Result<()> {
1170        sqlx::query("UPDATE workflow_workers SET last_heartbeat = ? WHERE id = ?")
1171            .bind(now)
1172            .bind(id)
1173            .execute(&self.pool)
1174            .await?;
1175        Ok(())
1176    }
1177
1178    async fn list_workers(&self, namespace: &str) -> Result<Vec<WorkflowWorker>> {
1179        let rows = sqlx::query_as::<_, SqliteWorkerRow>(
1180            "SELECT id, namespace, identity, task_queue, workflows, activities, max_concurrent_workflows, max_concurrent_activities, active_tasks, last_heartbeat, registered_at
1181             FROM workflow_workers WHERE namespace = ? ORDER BY registered_at",
1182        )
1183        .bind(namespace)
1184        .fetch_all(&self.pool)
1185        .await?;
1186        Ok(rows.into_iter().map(Into::into).collect())
1187    }
1188
1189    async fn remove_dead_workers(&self, cutoff: f64) -> Result<Vec<String>> {
1190        let rows: Vec<(String,)> =
1191            sqlx::query_as("SELECT id FROM workflow_workers WHERE last_heartbeat < ?")
1192                .bind(cutoff)
1193                .fetch_all(&self.pool)
1194                .await?;
1195        let ids: Vec<String> = rows.into_iter().map(|r| r.0).collect();
1196        if !ids.is_empty() {
1197            sqlx::query("DELETE FROM workflow_workers WHERE last_heartbeat < ?")
1198                .bind(cutoff)
1199                .execute(&self.pool)
1200                .await?;
1201        }
1202        Ok(ids)
1203    }
1204
1205    // ── API Keys ────────────────────────────────────────────
1206
1207    async fn create_api_key(
1208        &self,
1209        key_hash: &str,
1210        prefix: &str,
1211        label: Option<&str>,
1212        created_at: f64,
1213    ) -> Result<()> {
1214        sqlx::query(
1215            "INSERT INTO api_keys (key_hash, prefix, label, created_at) VALUES (?, ?, ?, ?)",
1216        )
1217        .bind(key_hash)
1218        .bind(prefix)
1219        .bind(label)
1220        .bind(created_at)
1221        .execute(&self.pool)
1222        .await?;
1223        Ok(())
1224    }
1225
1226    async fn validate_api_key(&self, key_hash: &str) -> Result<bool> {
1227        let row: Option<(i64,)> =
1228            sqlx::query_as("SELECT 1 FROM api_keys WHERE key_hash = ?")
1229                .bind(key_hash)
1230                .fetch_optional(&self.pool)
1231                .await?;
1232        Ok(row.is_some())
1233    }
1234
1235    async fn list_api_keys(&self) -> Result<Vec<ApiKeyRecord>> {
1236        let rows = sqlx::query_as::<_, (String, Option<String>, f64)>(
1237            "SELECT prefix, label, created_at FROM api_keys ORDER BY created_at DESC",
1238        )
1239        .fetch_all(&self.pool)
1240        .await?;
1241        Ok(rows
1242            .into_iter()
1243            .map(|(prefix, label, created_at)| ApiKeyRecord {
1244                prefix,
1245                label,
1246                created_at,
1247            })
1248            .collect())
1249    }
1250
1251    async fn revoke_api_key(&self, prefix: &str) -> Result<bool> {
1252        let res = sqlx::query("DELETE FROM api_keys WHERE prefix = ?")
1253            .bind(prefix)
1254            .execute(&self.pool)
1255            .await?;
1256        Ok(res.rows_affected() > 0)
1257    }
1258
1259    async fn api_keys_empty(&self) -> Result<bool> {
1260        let row: (i64,) = sqlx::query_as("SELECT COUNT(*) FROM api_keys")
1261            .fetch_one(&self.pool)
1262            .await?;
1263        Ok(row.0 == 0)
1264    }
1265
1266    async fn get_api_key_by_label(&self, label: &str) -> Result<Option<ApiKeyRecord>> {
1267        let row: Option<(String, Option<String>, f64)> = sqlx::query_as(
1268            "SELECT prefix, label, created_at FROM api_keys WHERE label = ? LIMIT 1",
1269        )
1270        .bind(label)
1271        .fetch_optional(&self.pool)
1272        .await?;
1273        Ok(row.map(|(prefix, label, created_at)| ApiKeyRecord {
1274            prefix,
1275            label,
1276            created_at,
1277        }))
1278    }
1279
1280    // ── Child Workflows ─────────────────────────────────────
1281
1282    async fn list_child_workflows(&self, parent_id: &str) -> Result<Vec<WorkflowRecord>> {
1283        let rows = sqlx::query_as::<_, SqliteWorkflowRow>(
1284            "SELECT id, namespace, run_id, workflow_type, task_queue, status, input, result, error, parent_id, claimed_by, search_attributes, archived_at, archive_uri, created_at, updated_at, completed_at
1285             FROM workflows WHERE parent_id = ? ORDER BY created_at ASC",
1286        )
1287        .bind(parent_id)
1288        .fetch_all(&self.pool)
1289        .await?;
1290        Ok(rows.into_iter().map(Into::into).collect())
1291    }
1292
1293    // ── Snapshots ───────────────────────────────────────────
1294
1295    async fn create_snapshot(
1296        &self,
1297        workflow_id: &str,
1298        event_seq: i32,
1299        state_json: &str,
1300    ) -> Result<()> {
1301        sqlx::query(
1302            "INSERT OR REPLACE INTO workflow_snapshots (workflow_id, event_seq, state_json, created_at)
1303             VALUES (?, ?, ?, ?)",
1304        )
1305        .bind(workflow_id)
1306        .bind(event_seq)
1307        .bind(state_json)
1308        .bind(timestamp_now())
1309        .execute(&self.pool)
1310        .await?;
1311        Ok(())
1312    }
1313
1314    async fn get_latest_snapshot(
1315        &self,
1316        workflow_id: &str,
1317    ) -> Result<Option<WorkflowSnapshot>> {
1318        let row = sqlx::query_as::<_, (String, i32, String, f64)>(
1319            "SELECT workflow_id, event_seq, state_json, created_at
1320             FROM workflow_snapshots WHERE workflow_id = ?
1321             ORDER BY event_seq DESC LIMIT 1",
1322        )
1323        .bind(workflow_id)
1324        .fetch_optional(&self.pool)
1325        .await?;
1326
1327        Ok(row.map(|(workflow_id, event_seq, state_json, created_at)| WorkflowSnapshot {
1328            workflow_id,
1329            event_seq,
1330            state_json,
1331            created_at,
1332        }))
1333    }
1334
1335    // ── Queue Stats ─────────────────────────────────────────
1336
1337    async fn get_queue_stats(&self, namespace: &str) -> Result<Vec<QueueStats>> {
1338        // Gather activity stats per queue for workflows in this namespace
1339        let rows = sqlx::query_as::<_, (String, i64, i64)>(
1340            "SELECT a.task_queue,
1341                    SUM(CASE WHEN a.status = 'PENDING' THEN 1 ELSE 0 END),
1342                    SUM(CASE WHEN a.status = 'RUNNING' THEN 1 ELSE 0 END)
1343             FROM workflow_activities a
1344             INNER JOIN workflows w ON w.id = a.workflow_id
1345             WHERE w.namespace = ?
1346             GROUP BY a.task_queue",
1347        )
1348        .bind(namespace)
1349        .fetch_all(&self.pool)
1350        .await?;
1351
1352        let mut stats: Vec<QueueStats> = rows
1353            .into_iter()
1354            .map(|(queue, pending, running)| QueueStats {
1355                queue,
1356                pending_activities: pending,
1357                running_activities: running,
1358                workers: 0,
1359            })
1360            .collect();
1361
1362        // Gather worker counts per queue in this namespace
1363        let worker_rows = sqlx::query_as::<_, (String, i64)>(
1364            "SELECT task_queue, COUNT(*) FROM workflow_workers WHERE namespace = ? GROUP BY task_queue",
1365        )
1366        .bind(namespace)
1367        .fetch_all(&self.pool)
1368        .await?;
1369
1370        for (queue, count) in worker_rows {
1371            if let Some(s) = stats.iter_mut().find(|s| s.queue == queue) {
1372                s.workers = count;
1373            } else {
1374                stats.push(QueueStats {
1375                    queue,
1376                    pending_activities: 0,
1377                    running_activities: 0,
1378                    workers: count,
1379                });
1380            }
1381        }
1382
1383        stats.sort_by(|a, b| a.queue.cmp(&b.queue));
1384        Ok(stats)
1385    }
1386
1387    // ── Leader Election ─────────────────────────────────────
1388
1389    async fn try_acquire_scheduler_lock(&self) -> Result<bool> {
1390        // SQLite is single-instance — always the leader.
1391        // Also refresh the engine lock heartbeat on each scheduler tick.
1392        self.refresh_engine_lock().await.ok();
1393        Ok(true)
1394    }
1395}
1396
fn timestamp_now() -> f64 {
    use std::time::{SystemTime, UNIX_EPOCH};
    // Seconds since the Unix epoch as an f64 — the timestamp representation
    // used throughout this store's REAL columns. duration_since only fails
    // if the system clock is set before the epoch.
    SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_secs_f64()
}
1403
1404/// Merge a JSON-object patch into a (possibly-null) current JSON object,
1405/// returning the serialised result. Shared by SQLite and Postgres stores.
1406pub(crate) fn merge_search_attrs(current: Option<&str>, patch_json: &str) -> Result<String> {
1407    let mut current_map: serde_json::Map<String, serde_json::Value> = current
1408        .and_then(|s| serde_json::from_str::<serde_json::Value>(s).ok())
1409        .and_then(|v| v.as_object().cloned())
1410        .unwrap_or_default();
1411    let patch: serde_json::Value = serde_json::from_str(patch_json)
1412        .map_err(|e| anyhow::anyhow!("invalid search_attributes patch: {e}"))?;
1413    let patch_obj = patch
1414        .as_object()
1415        .ok_or_else(|| anyhow::anyhow!("search_attributes patch must be a JSON object"))?;
1416    for (k, v) in patch_obj {
1417        current_map.insert(k.clone(), v.clone());
1418    }
1419    Ok(serde_json::Value::Object(current_map).to_string())
1420}
1421
1422// ── SQLite row types (sqlx::FromRow) ────────────────────────
1423
/// Row shape for SELECTs over `workflows`. Field names must match the
/// selected column names exactly — sqlx's derived `FromRow` maps by name.
#[derive(sqlx::FromRow)]
struct SqliteWorkflowRow {
    id: String,
    namespace: String,
    run_id: String,
    workflow_type: String,
    task_queue: String,
    status: String,
    input: Option<String>,
    result: Option<String>,
    error: Option<String>,
    parent_id: Option<String>,
    claimed_by: Option<String>,
    search_attributes: Option<String>,
    archived_at: Option<f64>,
    archive_uri: Option<String>,
    created_at: f64,
    updated_at: f64,
    completed_at: Option<f64>,
}

/// Straight field-for-field conversion into the store-agnostic record type.
impl From<SqliteWorkflowRow> for WorkflowRecord {
    fn from(r: SqliteWorkflowRow) -> Self {
        Self {
            id: r.id,
            namespace: r.namespace,
            run_id: r.run_id,
            workflow_type: r.workflow_type,
            task_queue: r.task_queue,
            status: r.status,
            input: r.input,
            result: r.result,
            error: r.error,
            parent_id: r.parent_id,
            claimed_by: r.claimed_by,
            search_attributes: r.search_attributes,
            archived_at: r.archived_at,
            archive_uri: r.archive_uri,
            created_at: r.created_at,
            updated_at: r.updated_at,
            completed_at: r.completed_at,
        }
    }
}
1468
/// Row shape for SELECTs over `workflow_events`; field names must match the
/// selected column names (sqlx `FromRow` maps by name).
#[derive(sqlx::FromRow)]
struct SqliteEventRow {
    id: i64,
    workflow_id: String,
    seq: i32,
    event_type: String,
    payload: Option<String>,
    timestamp: f64,
}

impl From<SqliteEventRow> for WorkflowEvent {
    fn from(r: SqliteEventRow) -> Self {
        Self {
            // Rows read back from the DB always carry an id, hence Some.
            id: Some(r.id),
            workflow_id: r.workflow_id,
            seq: r.seq,
            event_type: r.event_type,
            payload: r.payload,
            timestamp: r.timestamp,
        }
    }
}
1491
/// Row shape for SELECTs over `workflow_activities`; field names must match
/// the selected column names (sqlx `FromRow` maps by name).
#[derive(sqlx::FromRow)]
struct SqliteActivityRow {
    id: i64,
    workflow_id: String,
    seq: i32,
    name: String,
    task_queue: String,
    input: Option<String>,
    status: String,
    result: Option<String>,
    error: Option<String>,
    attempt: i32,
    max_attempts: i32,
    initial_interval_secs: f64,
    backoff_coefficient: f64,
    start_to_close_secs: f64,
    heartbeat_timeout_secs: Option<f64>,
    claimed_by: Option<String>,
    scheduled_at: f64,
    started_at: Option<f64>,
    completed_at: Option<f64>,
    last_heartbeat: Option<f64>,
}

impl From<SqliteActivityRow> for WorkflowActivity {
    fn from(r: SqliteActivityRow) -> Self {
        Self {
            // Rows read back from the DB always carry an id, hence Some.
            id: Some(r.id),
            workflow_id: r.workflow_id,
            seq: r.seq,
            name: r.name,
            task_queue: r.task_queue,
            input: r.input,
            status: r.status,
            result: r.result,
            error: r.error,
            attempt: r.attempt,
            max_attempts: r.max_attempts,
            initial_interval_secs: r.initial_interval_secs,
            backoff_coefficient: r.backoff_coefficient,
            start_to_close_secs: r.start_to_close_secs,
            heartbeat_timeout_secs: r.heartbeat_timeout_secs,
            claimed_by: r.claimed_by,
            scheduled_at: r.scheduled_at,
            started_at: r.started_at,
            completed_at: r.completed_at,
            last_heartbeat: r.last_heartbeat,
        }
    }
}
1542
/// Row shape for SELECTs over `workflow_timers`; field names must match the
/// selected column names (sqlx `FromRow` maps by name).
#[derive(sqlx::FromRow)]
struct SqliteTimerRow {
    id: i64,
    workflow_id: String,
    seq: i32,
    fire_at: f64,
    fired: bool,
}

impl From<SqliteTimerRow> for WorkflowTimer {
    fn from(r: SqliteTimerRow) -> Self {
        Self {
            // Rows read back from the DB always carry an id, hence Some.
            id: Some(r.id),
            workflow_id: r.workflow_id,
            seq: r.seq,
            fire_at: r.fire_at,
            fired: r.fired,
        }
    }
}
1563
/// Row shape for SELECTs over `workflow_signals`; field names must match the
/// selected column names (sqlx `FromRow` maps by name).
#[derive(sqlx::FromRow)]
struct SqliteSignalRow {
    id: i64,
    workflow_id: String,
    name: String,
    payload: Option<String>,
    consumed: bool,
    received_at: f64,
}

impl From<SqliteSignalRow> for WorkflowSignal {
    fn from(r: SqliteSignalRow) -> Self {
        Self {
            // Rows read back from the DB always carry an id, hence Some.
            id: Some(r.id),
            workflow_id: r.workflow_id,
            name: r.name,
            payload: r.payload,
            consumed: r.consumed,
            received_at: r.received_at,
        }
    }
}
1586
/// Row shape for SELECTs over `workflow_schedules`; field names must match
/// the selected column names (sqlx `FromRow` maps by name).
#[derive(sqlx::FromRow)]
struct SqliteScheduleRow {
    name: String,
    namespace: String,
    workflow_type: String,
    cron_expr: String,
    timezone: String,
    input: Option<String>,
    task_queue: String,
    overlap_policy: String,
    paused: bool,
    last_run_at: Option<f64>,
    next_run_at: Option<f64>,
    last_workflow_id: Option<String>,
    created_at: f64,
}

/// Straight field-for-field conversion into the store-agnostic schedule type.
impl From<SqliteScheduleRow> for WorkflowSchedule {
    fn from(r: SqliteScheduleRow) -> Self {
        Self {
            name: r.name,
            namespace: r.namespace,
            workflow_type: r.workflow_type,
            cron_expr: r.cron_expr,
            timezone: r.timezone,
            input: r.input,
            task_queue: r.task_queue,
            overlap_policy: r.overlap_policy,
            paused: r.paused,
            last_run_at: r.last_run_at,
            next_run_at: r.next_run_at,
            last_workflow_id: r.last_workflow_id,
            created_at: r.created_at,
        }
    }
}
1623
/// Row shape for SELECTs over `workflow_workers`; field names must match the
/// selected column names (sqlx `FromRow` maps by name).
#[derive(sqlx::FromRow)]
struct SqliteWorkerRow {
    id: String,
    namespace: String,
    identity: String,
    task_queue: String,
    workflows: Option<String>,
    activities: Option<String>,
    max_concurrent_workflows: i32,
    max_concurrent_activities: i32,
    active_tasks: i32,
    last_heartbeat: f64,
    registered_at: f64,
}

/// Straight field-for-field conversion into the store-agnostic worker type.
impl From<SqliteWorkerRow> for WorkflowWorker {
    fn from(r: SqliteWorkerRow) -> Self {
        Self {
            id: r.id,
            namespace: r.namespace,
            identity: r.identity,
            task_queue: r.task_queue,
            workflows: r.workflows,
            activities: r.activities,
            max_concurrent_workflows: r.max_concurrent_workflows,
            max_concurrent_activities: r.max_concurrent_activities,
            active_tasks: r.active_tasks,
            last_heartbeat: r.last_heartbeat,
            registered_at: r.registered_at,
        }
    }
}