// assay_workflow/store/sqlite.rs — SQLite-backed implementation of WorkflowStore.

1use anyhow::Result;
2use sqlx::SqlitePool;
3
4use crate::store::{ApiKeyRecord, NamespaceRecord, NamespaceStats, QueueStats, WorkflowStore};
5use crate::types::*;
6
/// Full database schema, applied idempotently on startup by `migrate()`.
///
/// NOTE: `migrate()` naively splits this blob on `';'`, so no statement here
/// may embed a semicolon inside a string literal or SQL comment.
const SCHEMA: &str = r#"
CREATE TABLE IF NOT EXISTS namespaces (
    name            TEXT PRIMARY KEY,
    created_at      REAL NOT NULL
);

INSERT OR IGNORE INTO namespaces (name, created_at)
    VALUES ('main', strftime('%s', 'now'));

CREATE TABLE IF NOT EXISTS workflows (
    id              TEXT PRIMARY KEY,
    namespace       TEXT NOT NULL DEFAULT 'main',
    run_id          TEXT NOT NULL,
    workflow_type   TEXT NOT NULL,
    task_queue      TEXT NOT NULL DEFAULT 'main',
    status          TEXT NOT NULL DEFAULT 'PENDING',
    input           TEXT,
    result          TEXT,
    error           TEXT,
    parent_id       TEXT,
    claimed_by      TEXT,
    -- Workflow-task dispatch (Phase 9): a workflow is "dispatchable" when
    -- it has new events a worker needs to replay against. Set true on
    -- start, on activity completion, on timer fire, on signal arrival.
    -- Cleared when a worker claims the dispatch lease.
    needs_dispatch  INTEGER NOT NULL DEFAULT 0,
    dispatch_claimed_by    TEXT,
    dispatch_last_heartbeat REAL,
    created_at      REAL NOT NULL,
    updated_at      REAL NOT NULL,
    completed_at    REAL
);
CREATE INDEX IF NOT EXISTS idx_wf_status_queue ON workflows(status, task_queue);
CREATE INDEX IF NOT EXISTS idx_wf_namespace ON workflows(namespace);
CREATE INDEX IF NOT EXISTS idx_wf_dispatch ON workflows(task_queue, needs_dispatch, dispatch_claimed_by);

CREATE TABLE IF NOT EXISTS workflow_events (
    id              INTEGER PRIMARY KEY AUTOINCREMENT,
    workflow_id     TEXT NOT NULL REFERENCES workflows(id),
    seq             INTEGER NOT NULL,
    event_type      TEXT NOT NULL,
    payload         TEXT,
    timestamp       REAL NOT NULL
);
CREATE INDEX IF NOT EXISTS idx_wf_events_lookup ON workflow_events(workflow_id, seq);

CREATE TABLE IF NOT EXISTS workflow_activities (
    id              INTEGER PRIMARY KEY AUTOINCREMENT,
    workflow_id     TEXT NOT NULL REFERENCES workflows(id),
    seq             INTEGER NOT NULL,
    name            TEXT NOT NULL,
    task_queue      TEXT NOT NULL DEFAULT 'main',
    input           TEXT,
    status          TEXT NOT NULL DEFAULT 'PENDING',
    result          TEXT,
    error           TEXT,
    attempt         INTEGER NOT NULL DEFAULT 1,
    max_attempts    INTEGER NOT NULL DEFAULT 3,
    initial_interval_secs   REAL NOT NULL DEFAULT 1,
    backoff_coefficient     REAL NOT NULL DEFAULT 2,
    start_to_close_secs     REAL NOT NULL DEFAULT 300,
    heartbeat_timeout_secs  REAL,
    claimed_by      TEXT,
    scheduled_at    REAL NOT NULL,
    started_at      REAL,
    completed_at    REAL,
    last_heartbeat  REAL,
    UNIQUE (workflow_id, seq)
);
CREATE INDEX IF NOT EXISTS idx_wf_act_pending ON workflow_activities(task_queue, status, scheduled_at);

CREATE TABLE IF NOT EXISTS workflow_timers (
    id              INTEGER PRIMARY KEY AUTOINCREMENT,
    workflow_id     TEXT NOT NULL REFERENCES workflows(id),
    seq             INTEGER NOT NULL,
    fire_at         REAL NOT NULL,
    fired           INTEGER NOT NULL DEFAULT 0,
    UNIQUE (workflow_id, seq)
);
CREATE INDEX IF NOT EXISTS idx_wf_timers_due ON workflow_timers(fire_at);

CREATE TABLE IF NOT EXISTS workflow_signals (
    id              INTEGER PRIMARY KEY AUTOINCREMENT,
    workflow_id     TEXT NOT NULL REFERENCES workflows(id),
    name            TEXT NOT NULL,
    payload         TEXT,
    consumed        INTEGER NOT NULL DEFAULT 0,
    received_at     REAL NOT NULL
);
CREATE INDEX IF NOT EXISTS idx_wf_signals_lookup ON workflow_signals(workflow_id, name, consumed);

CREATE TABLE IF NOT EXISTS workflow_schedules (
    name            TEXT NOT NULL,
    namespace       TEXT NOT NULL DEFAULT 'main',
    workflow_type   TEXT NOT NULL,
    cron_expr       TEXT NOT NULL,
    input           TEXT,
    task_queue      TEXT NOT NULL DEFAULT 'main',
    overlap_policy  TEXT NOT NULL DEFAULT 'skip',
    paused          INTEGER NOT NULL DEFAULT 0,
    last_run_at     REAL,
    next_run_at     REAL,
    last_workflow_id TEXT,
    created_at      REAL NOT NULL,
    PRIMARY KEY (namespace, name)
);

CREATE TABLE IF NOT EXISTS workflow_workers (
    id              TEXT PRIMARY KEY,
    namespace       TEXT NOT NULL DEFAULT 'main',
    identity        TEXT NOT NULL,
    task_queue      TEXT NOT NULL,
    workflows       TEXT,
    activities      TEXT,
    max_concurrent_workflows  INTEGER NOT NULL DEFAULT 10,
    max_concurrent_activities INTEGER NOT NULL DEFAULT 10,
    active_tasks    INTEGER NOT NULL DEFAULT 0,
    last_heartbeat  REAL NOT NULL,
    registered_at   REAL NOT NULL
);

CREATE TABLE IF NOT EXISTS workflow_snapshots (
    workflow_id     TEXT NOT NULL REFERENCES workflows(id),
    event_seq       INTEGER NOT NULL,
    state_json      TEXT NOT NULL,
    created_at      REAL NOT NULL,
    PRIMARY KEY (workflow_id, event_seq)
);

CREATE TABLE IF NOT EXISTS api_keys (
    key_hash        TEXT PRIMARY KEY,
    prefix          TEXT NOT NULL,
    label           TEXT,
    created_at      REAL NOT NULL
);
CREATE INDEX IF NOT EXISTS idx_api_keys_prefix ON api_keys(prefix);

CREATE TABLE IF NOT EXISTS engine_lock (
    id              INTEGER PRIMARY KEY CHECK (id = 1),
    instance_id     TEXT NOT NULL,
    started_at      REAL NOT NULL,
    last_heartbeat  REAL NOT NULL
);
"#;
151
/// Stale lock timeout — if the lock holder hasn't heartbeated in this
/// many seconds, assume it's dead and allow takeover. Must comfortably
/// exceed `LOCK_HEARTBEAT_SECS` so a few missed beats don't trigger takeover.
const LOCK_STALE_SECS: f64 = 60.0;
/// How often to refresh the lock heartbeat (seconds).
const LOCK_HEARTBEAT_SECS: u64 = 15;
157
/// SQLite-backed implementation of the workflow store.
pub struct SqliteStore {
    pool: SqlitePool,      // shared connection pool for all queries
    instance_id: String,   // unique per-process id, used for the engine lock
}
162
163impl SqliteStore {
164    pub async fn new(url: &str) -> Result<Self> {
165        let pool = SqlitePool::connect(url).await?;
166        let instance_id = format!("assay-{:016x}", {
167            use std::collections::hash_map::DefaultHasher;
168            use std::hash::{Hash, Hasher};
169            let mut h = DefaultHasher::new();
170            std::time::SystemTime::now().hash(&mut h);
171            std::process::id().hash(&mut h);
172            h.finish()
173        });
174        let store = Self { pool, instance_id };
175        store.migrate().await?;
176        Ok(store)
177    }
178
179    /// Acquire the single-instance engine lock.
180    /// Returns an error if another instance is already running.
181    pub async fn acquire_engine_lock(&self) -> Result<()> {
182        let now = timestamp_now();
183
184        // Try to insert the lock
185        let result = sqlx::query(
186            "INSERT INTO engine_lock (id, instance_id, started_at, last_heartbeat) VALUES (1, ?, ?, ?)",
187        )
188        .bind(&self.instance_id)
189        .bind(now)
190        .bind(now)
191        .execute(&self.pool)
192        .await;
193
194        match result {
195            Ok(_) => Ok(()),
196            Err(_) => {
197                // Lock exists — check if it's stale
198                let row: Option<(String, f64)> = sqlx::query_as(
199                    "SELECT instance_id, last_heartbeat FROM engine_lock WHERE id = 1",
200                )
201                .fetch_optional(&self.pool)
202                .await?;
203
204                if let Some((existing_id, last_hb)) = row {
205                    if now - last_hb > LOCK_STALE_SECS {
206                        // Stale lock — take over
207                        sqlx::query(
208                            "UPDATE engine_lock SET instance_id = ?, started_at = ?, last_heartbeat = ? WHERE id = 1",
209                        )
210                        .bind(&self.instance_id)
211                        .bind(now)
212                        .bind(now)
213                        .execute(&self.pool)
214                        .await?;
215                        tracing::warn!(
216                            "Took over stale engine lock from {existing_id} (last heartbeat {:.0}s ago)",
217                            now - last_hb
218                        );
219                        Ok(())
220                    } else {
221                        let age = now - last_hb;
222                        anyhow::bail!(
223                            "Another assay engine instance is already running (id: {existing_id}, \
224                             last heartbeat {age:.0}s ago).\n\n\
225                             SQLite only supports a single engine instance. For multi-instance \
226                             deployment (Kubernetes, Docker Swarm), use PostgreSQL:\n\n\
227                             \x20 assay serve --backend postgres://user:pass@host:5432/dbname"
228                        );
229                    }
230                } else {
231                    anyhow::bail!("Unexpected engine lock state");
232                }
233            }
234        }
235    }
236
237    /// Refresh the engine lock heartbeat. Called periodically by the engine.
238    pub async fn refresh_engine_lock(&self) -> Result<()> {
239        sqlx::query("UPDATE engine_lock SET last_heartbeat = ? WHERE id = 1 AND instance_id = ?")
240            .bind(timestamp_now())
241            .bind(&self.instance_id)
242            .execute(&self.pool)
243            .await?;
244        Ok(())
245    }
246
247    /// Release the engine lock on shutdown.
248    pub async fn release_engine_lock(&self) -> Result<()> {
249        sqlx::query("DELETE FROM engine_lock WHERE id = 1 AND instance_id = ?")
250            .bind(&self.instance_id)
251            .execute(&self.pool)
252            .await?;
253        Ok(())
254    }
255
256    /// Start background task to keep the lock alive.
257    pub fn spawn_lock_heartbeat(self: &std::sync::Arc<Self>) {
258        let store = std::sync::Arc::clone(self);
259        tokio::spawn(async move {
260            let mut tick = tokio::time::interval(std::time::Duration::from_secs(LOCK_HEARTBEAT_SECS));
261            loop {
262                tick.tick().await;
263                if let Err(e) = store.refresh_engine_lock().await {
264                    tracing::error!("Engine lock heartbeat failed: {e}");
265                }
266            }
267        });
268    }
269
270    async fn migrate(&self) -> Result<()> {
271        for statement in SCHEMA.split(';') {
272            let trimmed = statement.trim();
273            if !trimmed.is_empty() {
274                sqlx::query(trimmed).execute(&self.pool).await?;
275            }
276        }
277        Ok(())
278    }
279}
280
281impl WorkflowStore for SqliteStore {
282    // ── Namespaces ─────────────────────────────────────────
283
284    async fn create_namespace(&self, name: &str) -> Result<()> {
285        sqlx::query("INSERT INTO namespaces (name, created_at) VALUES (?, ?)")
286            .bind(name)
287            .bind(timestamp_now())
288            .execute(&self.pool)
289            .await?;
290        Ok(())
291    }
292
293    async fn list_namespaces(&self) -> Result<Vec<NamespaceRecord>> {
294        let rows = sqlx::query_as::<_, (String, f64)>(
295            "SELECT name, created_at FROM namespaces ORDER BY name",
296        )
297        .fetch_all(&self.pool)
298        .await?;
299        Ok(rows
300            .into_iter()
301            .map(|(name, created_at)| NamespaceRecord { name, created_at })
302            .collect())
303    }
304
305    async fn delete_namespace(&self, name: &str) -> Result<bool> {
306        let res = sqlx::query("DELETE FROM namespaces WHERE name = ?")
307            .bind(name)
308            .execute(&self.pool)
309            .await?;
310        Ok(res.rows_affected() > 0)
311    }
312
313    async fn get_namespace_stats(&self, namespace: &str) -> Result<NamespaceStats> {
314        let total: (i64,) =
315            sqlx::query_as("SELECT COUNT(*) FROM workflows WHERE namespace = ?")
316                .bind(namespace)
317                .fetch_one(&self.pool)
318                .await?;
319        let running: (i64,) = sqlx::query_as(
320            "SELECT COUNT(*) FROM workflows WHERE namespace = ? AND status = 'RUNNING'",
321        )
322        .bind(namespace)
323        .fetch_one(&self.pool)
324        .await?;
325        let pending: (i64,) = sqlx::query_as(
326            "SELECT COUNT(*) FROM workflows WHERE namespace = ? AND status = 'PENDING'",
327        )
328        .bind(namespace)
329        .fetch_one(&self.pool)
330        .await?;
331        let completed: (i64,) = sqlx::query_as(
332            "SELECT COUNT(*) FROM workflows WHERE namespace = ? AND status = 'COMPLETED'",
333        )
334        .bind(namespace)
335        .fetch_one(&self.pool)
336        .await?;
337        let failed: (i64,) = sqlx::query_as(
338            "SELECT COUNT(*) FROM workflows WHERE namespace = ? AND status = 'FAILED'",
339        )
340        .bind(namespace)
341        .fetch_one(&self.pool)
342        .await?;
343        let schedules: (i64,) =
344            sqlx::query_as("SELECT COUNT(*) FROM workflow_schedules WHERE namespace = ?")
345                .bind(namespace)
346                .fetch_one(&self.pool)
347                .await?;
348        let workers: (i64,) =
349            sqlx::query_as("SELECT COUNT(*) FROM workflow_workers WHERE namespace = ?")
350                .bind(namespace)
351                .fetch_one(&self.pool)
352                .await?;
353
354        Ok(NamespaceStats {
355            namespace: namespace.to_string(),
356            total_workflows: total.0,
357            running: running.0,
358            pending: pending.0,
359            completed: completed.0,
360            failed: failed.0,
361            schedules: schedules.0,
362            workers: workers.0,
363        })
364    }
365
366    // ── Workflows ──────────────────────────────────────────
367
    /// Persist a brand-new workflow row.
    ///
    /// Dispatch bookkeeping columns (`needs_dispatch`, `dispatch_claimed_by`,
    /// `dispatch_last_heartbeat`) are left at their schema defaults.
    /// NOTE: the bind order below must match the column list exactly.
    async fn create_workflow(&self, wf: &WorkflowRecord) -> Result<()> {
        sqlx::query(
            "INSERT INTO workflows (id, namespace, run_id, workflow_type, task_queue, status, input, result, error, parent_id, claimed_by, created_at, updated_at, completed_at)
             VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)",
        )
        .bind(&wf.id)
        .bind(&wf.namespace)
        .bind(&wf.run_id)
        .bind(&wf.workflow_type)
        .bind(&wf.task_queue)
        .bind(&wf.status)
        .bind(&wf.input)
        .bind(&wf.result)
        .bind(&wf.error)
        .bind(&wf.parent_id)
        .bind(&wf.claimed_by)
        .bind(wf.created_at)
        .bind(wf.updated_at)
        .bind(wf.completed_at)
        .execute(&self.pool)
        .await?;
        Ok(())
    }
391
392    async fn get_workflow(&self, id: &str) -> Result<Option<WorkflowRecord>> {
393        let row = sqlx::query_as::<_, SqliteWorkflowRow>(
394            "SELECT id, namespace, run_id, workflow_type, task_queue, status, input, result, error, parent_id, claimed_by, created_at, updated_at, completed_at FROM workflows WHERE id = ?",
395        )
396        .bind(id)
397        .fetch_optional(&self.pool)
398        .await?;
399        Ok(row.map(Into::into))
400    }
401
    /// List workflows in `namespace`, newest first, with optional filters.
    ///
    /// The `(? IS NULL OR col = ?)` pattern binds each optional filter twice:
    /// when the caller passes `None` the predicate collapses to TRUE, so the
    /// same statement serves both the filtered and unfiltered cases.
    async fn list_workflows(
        &self,
        namespace: &str,
        status: Option<WorkflowStatus>,
        workflow_type: Option<&str>,
        limit: i64,
        offset: i64,
    ) -> Result<Vec<WorkflowRecord>> {
        // Render the status enum to its TEXT form once; None stays NULL.
        let status_str = status.map(|s| s.to_string());
        let rows = sqlx::query_as::<_, SqliteWorkflowRow>(
            "SELECT id, namespace, run_id, workflow_type, task_queue, status, input, result, error, parent_id, claimed_by, created_at, updated_at, completed_at
             FROM workflows
             WHERE namespace = ?
               AND (? IS NULL OR status = ?)
               AND (? IS NULL OR workflow_type = ?)
             ORDER BY created_at DESC
             LIMIT ? OFFSET ?",
        )
        .bind(namespace)
        .bind(&status_str)
        .bind(&status_str)
        .bind(workflow_type)
        .bind(workflow_type)
        .bind(limit)
        .bind(offset)
        .fetch_all(&self.pool)
        .await?;
        Ok(rows.into_iter().map(Into::into).collect())
    }
431
432    async fn update_workflow_status(
433        &self,
434        id: &str,
435        status: WorkflowStatus,
436        result: Option<&str>,
437        error: Option<&str>,
438    ) -> Result<()> {
439        let now = timestamp_now();
440        let completed_at = if status.is_terminal() { Some(now) } else { None };
441        sqlx::query(
442            "UPDATE workflows SET status = ?, result = COALESCE(?, result), error = COALESCE(?, error), updated_at = ?, completed_at = COALESCE(?, completed_at) WHERE id = ?",
443        )
444        .bind(status.to_string())
445        .bind(result)
446        .bind(error)
447        .bind(now)
448        .bind(completed_at)
449        .bind(id)
450        .execute(&self.pool)
451        .await?;
452        Ok(())
453    }
454
455    async fn claim_workflow(&self, id: &str, worker_id: &str) -> Result<bool> {
456        let res = sqlx::query(
457            "UPDATE workflows SET claimed_by = ?, status = 'RUNNING', updated_at = ? WHERE id = ? AND claimed_by IS NULL",
458        )
459        .bind(worker_id)
460        .bind(timestamp_now())
461        .bind(id)
462        .execute(&self.pool)
463        .await?;
464        Ok(res.rows_affected() > 0)
465    }
466
467    async fn mark_workflow_dispatchable(&self, workflow_id: &str) -> Result<()> {
468        sqlx::query("UPDATE workflows SET needs_dispatch = 1 WHERE id = ?")
469            .bind(workflow_id)
470            .execute(&self.pool)
471            .await?;
472        Ok(())
473    }
474
    /// Atomically claim the next dispatchable workflow task on `task_queue`.
    ///
    /// A single UPDATE … RETURNING statement picks the oldest workflow (by
    /// `updated_at`) that needs dispatch, is unclaimed, and is not terminal,
    /// and stamps the lease in the same statement, so no two workers can
    /// claim the same workflow. `None` means no dispatchable work right now.
    async fn claim_workflow_task(
        &self,
        task_queue: &str,
        worker_id: &str,
    ) -> Result<Option<WorkflowRecord>> {
        let now = timestamp_now();
        // Atomic: pick the oldest dispatchable + unclaimed workflow on the queue
        let row = sqlx::query_as::<_, SqliteWorkflowRow>(
            "UPDATE workflows
             SET dispatch_claimed_by = ?, dispatch_last_heartbeat = ?, needs_dispatch = 0
             WHERE id = (
                SELECT id FROM workflows
                WHERE task_queue = ?
                  AND needs_dispatch = 1
                  AND dispatch_claimed_by IS NULL
                  AND status NOT IN ('COMPLETED', 'FAILED', 'CANCELLED', 'TIMED_OUT')
                ORDER BY updated_at ASC
                LIMIT 1
             )
             RETURNING id, namespace, run_id, workflow_type, task_queue, status, input, result, error, parent_id, claimed_by, created_at, updated_at, completed_at",
        )
        .bind(worker_id)
        .bind(now)
        .bind(task_queue)
        .fetch_optional(&self.pool)
        .await?;
        Ok(row.map(Into::into))
    }
503
504    async fn release_workflow_task(&self, workflow_id: &str, worker_id: &str) -> Result<()> {
505        sqlx::query(
506            "UPDATE workflows
507             SET dispatch_claimed_by = NULL, dispatch_last_heartbeat = NULL
508             WHERE id = ? AND dispatch_claimed_by = ?",
509        )
510        .bind(workflow_id)
511        .bind(worker_id)
512        .execute(&self.pool)
513        .await?;
514        Ok(())
515    }
516
    /// Reclaim workflow-task leases whose holder stopped heartbeating for
    /// more than `timeout_secs`. Returns the number of leases released.
    async fn release_stale_dispatch_leases(
        &self,
        now: f64,
        timeout_secs: f64,
    ) -> Result<u64> {
        // Re-arm needs_dispatch so the work goes back into the pool. Don't
        // touch workflows that have reached a terminal state — those should
        // never be re-dispatched.
        let res = sqlx::query(
            "UPDATE workflows
             SET dispatch_claimed_by = NULL,
                 dispatch_last_heartbeat = NULL,
                 needs_dispatch = 1
             WHERE dispatch_claimed_by IS NOT NULL
               AND (? - dispatch_last_heartbeat) > ?
               AND status NOT IN ('COMPLETED', 'FAILED', 'CANCELLED', 'TIMED_OUT')",
        )
        .bind(now)
        .bind(timeout_secs)
        .execute(&self.pool)
        .await?;
        Ok(res.rows_affected())
    }
540
541    // ── Events ─────────────────────────────────────────────
542
543    async fn append_event(&self, ev: &WorkflowEvent) -> Result<i64> {
544        let res = sqlx::query(
545            "INSERT INTO workflow_events (workflow_id, seq, event_type, payload, timestamp) VALUES (?, ?, ?, ?, ?)",
546        )
547        .bind(&ev.workflow_id)
548        .bind(ev.seq)
549        .bind(&ev.event_type)
550        .bind(&ev.payload)
551        .bind(ev.timestamp)
552        .execute(&self.pool)
553        .await?;
554        Ok(res.last_insert_rowid())
555    }
556
557    async fn list_events(&self, workflow_id: &str) -> Result<Vec<WorkflowEvent>> {
558        let rows = sqlx::query_as::<_, SqliteEventRow>(
559            "SELECT id, workflow_id, seq, event_type, payload, timestamp FROM workflow_events WHERE workflow_id = ? ORDER BY seq ASC",
560        )
561        .bind(workflow_id)
562        .fetch_all(&self.pool)
563        .await?;
564        Ok(rows.into_iter().map(Into::into).collect())
565    }
566
567    async fn get_event_count(&self, workflow_id: &str) -> Result<i64> {
568        let row: (i64,) =
569            sqlx::query_as("SELECT COUNT(*) FROM workflow_events WHERE workflow_id = ?")
570                .bind(workflow_id)
571                .fetch_one(&self.pool)
572                .await?;
573        Ok(row.0)
574    }
575
576    // ── Activities ──────────────────────────────────────────
577
    /// Insert a newly scheduled activity and return its rowid.
    ///
    /// Runtime columns (`result`, `error`, `claimed_by`, `started_at`,
    /// `completed_at`, `last_heartbeat`) start at their NULL defaults.
    /// NOTE: the bind order below must match the column list exactly.
    async fn create_activity(&self, act: &WorkflowActivity) -> Result<i64> {
        let res = sqlx::query(
            "INSERT INTO workflow_activities (workflow_id, seq, name, task_queue, input, status, attempt, max_attempts, initial_interval_secs, backoff_coefficient, start_to_close_secs, heartbeat_timeout_secs, scheduled_at)
             VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)",
        )
        .bind(&act.workflow_id)
        .bind(act.seq)
        .bind(&act.name)
        .bind(&act.task_queue)
        .bind(&act.input)
        .bind(&act.status)
        .bind(act.attempt)
        .bind(act.max_attempts)
        .bind(act.initial_interval_secs)
        .bind(act.backoff_coefficient)
        .bind(act.start_to_close_secs)
        .bind(act.heartbeat_timeout_secs)
        .bind(act.scheduled_at)
        .execute(&self.pool)
        .await?;
        Ok(res.last_insert_rowid())
    }
600
601    async fn get_activity(&self, id: i64) -> Result<Option<WorkflowActivity>> {
602        let row = sqlx::query_as::<_, SqliteActivityRow>(
603            "SELECT id, workflow_id, seq, name, task_queue, input, status, result, error, attempt, max_attempts, initial_interval_secs, backoff_coefficient, start_to_close_secs, heartbeat_timeout_secs, claimed_by, scheduled_at, started_at, completed_at, last_heartbeat
604             FROM workflow_activities WHERE id = ?",
605        )
606        .bind(id)
607        .fetch_optional(&self.pool)
608        .await?;
609        Ok(row.map(Into::into))
610    }
611
612    async fn get_activity_by_workflow_seq(
613        &self,
614        workflow_id: &str,
615        seq: i32,
616    ) -> Result<Option<WorkflowActivity>> {
617        let row = sqlx::query_as::<_, SqliteActivityRow>(
618            "SELECT id, workflow_id, seq, name, task_queue, input, status, result, error, attempt, max_attempts, initial_interval_secs, backoff_coefficient, start_to_close_secs, heartbeat_timeout_secs, claimed_by, scheduled_at, started_at, completed_at, last_heartbeat
619             FROM workflow_activities WHERE workflow_id = ? AND seq = ?",
620        )
621        .bind(workflow_id)
622        .bind(seq)
623        .fetch_optional(&self.pool)
624        .await?;
625        Ok(row.map(Into::into))
626    }
627
    /// Atomically claim the oldest PENDING activity on `task_queue`.
    ///
    /// The single UPDATE … RETURNING statement selects and leases in one
    /// step, so concurrent workers cannot claim the same row. `None` means
    /// no pending activity on this queue.
    async fn claim_activity(
        &self,
        task_queue: &str,
        worker_id: &str,
    ) -> Result<Option<WorkflowActivity>> {
        let now = timestamp_now();
        let row = sqlx::query_as::<_, SqliteActivityRow>(
            "UPDATE workflow_activities SET status = 'RUNNING', claimed_by = ?, started_at = ?
             WHERE id = (
                SELECT id FROM workflow_activities
                WHERE task_queue = ? AND status = 'PENDING'
                ORDER BY scheduled_at ASC
                LIMIT 1
             )
             RETURNING id, workflow_id, seq, name, task_queue, input, status, result, error, attempt, max_attempts, initial_interval_secs, backoff_coefficient, start_to_close_secs, heartbeat_timeout_secs, claimed_by, scheduled_at, started_at, completed_at, last_heartbeat",
        )
        .bind(worker_id)
        .bind(now)
        .bind(task_queue)
        .fetch_optional(&self.pool)
        .await?;
        Ok(row.map(Into::into))
    }
651
652    async fn requeue_activity_for_retry(
653        &self,
654        id: i64,
655        next_attempt: i32,
656        next_scheduled_at: f64,
657    ) -> Result<()> {
658        sqlx::query(
659            "UPDATE workflow_activities
660             SET status = 'PENDING', attempt = ?, scheduled_at = ?,
661                 claimed_by = NULL, started_at = NULL, last_heartbeat = NULL,
662                 error = NULL
663             WHERE id = ?",
664        )
665        .bind(next_attempt)
666        .bind(next_scheduled_at)
667        .bind(id)
668        .execute(&self.pool)
669        .await?;
670        Ok(())
671    }
672
673    async fn complete_activity(
674        &self,
675        id: i64,
676        result: Option<&str>,
677        error: Option<&str>,
678        failed: bool,
679    ) -> Result<()> {
680        let status = if failed { "FAILED" } else { "COMPLETED" };
681        sqlx::query(
682            "UPDATE workflow_activities SET status = ?, result = ?, error = ?, completed_at = ? WHERE id = ?",
683        )
684        .bind(status)
685        .bind(result)
686        .bind(error)
687        .bind(timestamp_now())
688        .bind(id)
689        .execute(&self.pool)
690        .await?;
691        Ok(())
692    }
693
694    async fn heartbeat_activity(&self, id: i64, _details: Option<&str>) -> Result<()> {
695        sqlx::query("UPDATE workflow_activities SET last_heartbeat = ? WHERE id = ?")
696            .bind(timestamp_now())
697            .bind(id)
698            .execute(&self.pool)
699            .await?;
700        Ok(())
701    }
702
703    async fn get_timed_out_activities(&self, now: f64) -> Result<Vec<WorkflowActivity>> {
704        let rows = sqlx::query_as::<_, SqliteActivityRow>(
705            "SELECT id, workflow_id, seq, name, task_queue, input, status, result, error, attempt, max_attempts, initial_interval_secs, backoff_coefficient, start_to_close_secs, heartbeat_timeout_secs, claimed_by, scheduled_at, started_at, completed_at, last_heartbeat
706             FROM workflow_activities
707             WHERE status = 'RUNNING'
708               AND heartbeat_timeout_secs IS NOT NULL
709               AND last_heartbeat IS NOT NULL
710               AND (? - last_heartbeat) > heartbeat_timeout_secs",
711        )
712        .bind(now)
713        .fetch_all(&self.pool)
714        .await?;
715        Ok(rows.into_iter().map(Into::into).collect())
716    }
717
718    // ── Timers ──────────────────────────────────────────────
719
720    async fn create_timer(&self, timer: &WorkflowTimer) -> Result<i64> {
721        let res = sqlx::query(
722            "INSERT INTO workflow_timers (workflow_id, seq, fire_at, fired) VALUES (?, ?, ?, 0)",
723        )
724        .bind(&timer.workflow_id)
725        .bind(timer.seq)
726        .bind(timer.fire_at)
727        .execute(&self.pool)
728        .await?;
729        Ok(res.last_insert_rowid())
730    }
731
732    async fn cancel_pending_activities(&self, workflow_id: &str) -> Result<u64> {
733        let res = sqlx::query(
734            "UPDATE workflow_activities SET status = 'CANCELLED', completed_at = ?
735             WHERE workflow_id = ? AND status = 'PENDING'",
736        )
737        .bind(timestamp_now())
738        .bind(workflow_id)
739        .execute(&self.pool)
740        .await?;
741        Ok(res.rows_affected())
742    }
743
744    async fn cancel_pending_timers(&self, workflow_id: &str) -> Result<u64> {
745        let res = sqlx::query(
746            "UPDATE workflow_timers SET fired = 1
747             WHERE workflow_id = ? AND fired = 0",
748        )
749        .bind(workflow_id)
750        .execute(&self.pool)
751        .await?;
752        Ok(res.rows_affected())
753    }
754
755    async fn get_timer_by_workflow_seq(
756        &self,
757        workflow_id: &str,
758        seq: i32,
759    ) -> Result<Option<WorkflowTimer>> {
760        let row = sqlx::query_as::<_, SqliteTimerRow>(
761            "SELECT id, workflow_id, seq, fire_at, fired
762             FROM workflow_timers WHERE workflow_id = ? AND seq = ?",
763        )
764        .bind(workflow_id)
765        .bind(seq)
766        .fetch_optional(&self.pool)
767        .await?;
768        Ok(row.map(Into::into))
769    }
770
771    async fn fire_due_timers(&self, now: f64) -> Result<Vec<WorkflowTimer>> {
772        let rows = sqlx::query_as::<_, SqliteTimerRow>(
773            "UPDATE workflow_timers SET fired = 1
774             WHERE fired = 0 AND fire_at <= ?
775             RETURNING id, workflow_id, seq, fire_at, fired",
776        )
777        .bind(now)
778        .fetch_all(&self.pool)
779        .await?;
780        Ok(rows.into_iter().map(Into::into).collect())
781    }
782
783    // ── Signals ─────────────────────────────────────────────
784
785    async fn send_signal(&self, sig: &WorkflowSignal) -> Result<i64> {
786        let res = sqlx::query(
787            "INSERT INTO workflow_signals (workflow_id, name, payload, consumed, received_at) VALUES (?, ?, ?, 0, ?)",
788        )
789        .bind(&sig.workflow_id)
790        .bind(&sig.name)
791        .bind(&sig.payload)
792        .bind(sig.received_at)
793        .execute(&self.pool)
794        .await?;
795        Ok(res.last_insert_rowid())
796    }
797
798    async fn consume_signals(
799        &self,
800        workflow_id: &str,
801        name: &str,
802    ) -> Result<Vec<WorkflowSignal>> {
803        let rows = sqlx::query_as::<_, SqliteSignalRow>(
804            "UPDATE workflow_signals SET consumed = 1
805             WHERE workflow_id = ? AND name = ? AND consumed = 0
806             RETURNING id, workflow_id, name, payload, consumed, received_at",
807        )
808        .bind(workflow_id)
809        .bind(name)
810        .fetch_all(&self.pool)
811        .await?;
812        Ok(rows.into_iter().map(Into::into).collect())
813    }
814
815    // ── Schedules ───────────────────────────────────────────
816
    /// Insert a new cron schedule; errors if the (namespace, name) primary
    /// key already exists.
    /// NOTE: the bind order below must match the column list exactly.
    async fn create_schedule(&self, sched: &WorkflowSchedule) -> Result<()> {
        sqlx::query(
            "INSERT INTO workflow_schedules (name, namespace, workflow_type, cron_expr, input, task_queue, overlap_policy, paused, last_run_at, next_run_at, last_workflow_id, created_at)
             VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)",
        )
        .bind(&sched.name)
        .bind(&sched.namespace)
        .bind(&sched.workflow_type)
        .bind(&sched.cron_expr)
        .bind(&sched.input)
        .bind(&sched.task_queue)
        .bind(&sched.overlap_policy)
        .bind(sched.paused)
        .bind(sched.last_run_at)
        .bind(sched.next_run_at)
        .bind(&sched.last_workflow_id)
        .bind(sched.created_at)
        .execute(&self.pool)
        .await?;
        Ok(())
    }
838
839    async fn get_schedule(&self, namespace: &str, name: &str) -> Result<Option<WorkflowSchedule>> {
840        let row = sqlx::query_as::<_, SqliteScheduleRow>(
841            "SELECT name, namespace, workflow_type, cron_expr, input, task_queue, overlap_policy, paused, last_run_at, next_run_at, last_workflow_id, created_at
842             FROM workflow_schedules WHERE namespace = ? AND name = ?",
843        )
844        .bind(namespace)
845        .bind(name)
846        .fetch_optional(&self.pool)
847        .await?;
848        Ok(row.map(Into::into))
849    }
850
851    async fn list_schedules(&self, namespace: &str) -> Result<Vec<WorkflowSchedule>> {
852        let rows = sqlx::query_as::<_, SqliteScheduleRow>(
853            "SELECT name, namespace, workflow_type, cron_expr, input, task_queue, overlap_policy, paused, last_run_at, next_run_at, last_workflow_id, created_at
854             FROM workflow_schedules WHERE namespace = ? ORDER BY name",
855        )
856        .bind(namespace)
857        .fetch_all(&self.pool)
858        .await?;
859        Ok(rows.into_iter().map(Into::into).collect())
860    }
861
862    async fn update_schedule_last_run(
863        &self,
864        namespace: &str,
865        name: &str,
866        last_run_at: f64,
867        next_run_at: f64,
868        workflow_id: &str,
869    ) -> Result<()> {
870        sqlx::query(
871            "UPDATE workflow_schedules SET last_run_at = ?, next_run_at = ?, last_workflow_id = ? WHERE namespace = ? AND name = ?",
872        )
873        .bind(last_run_at)
874        .bind(next_run_at)
875        .bind(workflow_id)
876        .bind(namespace)
877        .bind(name)
878        .execute(&self.pool)
879        .await?;
880        Ok(())
881    }
882
883    async fn delete_schedule(&self, namespace: &str, name: &str) -> Result<bool> {
884        let res =
885            sqlx::query("DELETE FROM workflow_schedules WHERE namespace = ? AND name = ?")
886                .bind(namespace)
887                .bind(name)
888                .execute(&self.pool)
889                .await?;
890        Ok(res.rows_affected() > 0)
891    }
892
893    // ── Workers ─────────────────────────────────────────────
894
895    async fn register_worker(&self, w: &WorkflowWorker) -> Result<()> {
896        sqlx::query(
897            "INSERT OR REPLACE INTO workflow_workers (id, namespace, identity, task_queue, workflows, activities, max_concurrent_workflows, max_concurrent_activities, active_tasks, last_heartbeat, registered_at)
898             VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)",
899        )
900        .bind(&w.id)
901        .bind(&w.namespace)
902        .bind(&w.identity)
903        .bind(&w.task_queue)
904        .bind(&w.workflows)
905        .bind(&w.activities)
906        .bind(w.max_concurrent_workflows)
907        .bind(w.max_concurrent_activities)
908        .bind(w.active_tasks)
909        .bind(w.last_heartbeat)
910        .bind(w.registered_at)
911        .execute(&self.pool)
912        .await?;
913        Ok(())
914    }
915
916    async fn heartbeat_worker(&self, id: &str, now: f64) -> Result<()> {
917        sqlx::query("UPDATE workflow_workers SET last_heartbeat = ? WHERE id = ?")
918            .bind(now)
919            .bind(id)
920            .execute(&self.pool)
921            .await?;
922        Ok(())
923    }
924
925    async fn list_workers(&self, namespace: &str) -> Result<Vec<WorkflowWorker>> {
926        let rows = sqlx::query_as::<_, SqliteWorkerRow>(
927            "SELECT id, namespace, identity, task_queue, workflows, activities, max_concurrent_workflows, max_concurrent_activities, active_tasks, last_heartbeat, registered_at
928             FROM workflow_workers WHERE namespace = ? ORDER BY registered_at",
929        )
930        .bind(namespace)
931        .fetch_all(&self.pool)
932        .await?;
933        Ok(rows.into_iter().map(Into::into).collect())
934    }
935
936    async fn remove_dead_workers(&self, cutoff: f64) -> Result<Vec<String>> {
937        let rows: Vec<(String,)> =
938            sqlx::query_as("SELECT id FROM workflow_workers WHERE last_heartbeat < ?")
939                .bind(cutoff)
940                .fetch_all(&self.pool)
941                .await?;
942        let ids: Vec<String> = rows.into_iter().map(|r| r.0).collect();
943        if !ids.is_empty() {
944            sqlx::query("DELETE FROM workflow_workers WHERE last_heartbeat < ?")
945                .bind(cutoff)
946                .execute(&self.pool)
947                .await?;
948        }
949        Ok(ids)
950    }
951
952    // ── API Keys ────────────────────────────────────────────
953
954    async fn create_api_key(
955        &self,
956        key_hash: &str,
957        prefix: &str,
958        label: Option<&str>,
959        created_at: f64,
960    ) -> Result<()> {
961        sqlx::query(
962            "INSERT INTO api_keys (key_hash, prefix, label, created_at) VALUES (?, ?, ?, ?)",
963        )
964        .bind(key_hash)
965        .bind(prefix)
966        .bind(label)
967        .bind(created_at)
968        .execute(&self.pool)
969        .await?;
970        Ok(())
971    }
972
973    async fn validate_api_key(&self, key_hash: &str) -> Result<bool> {
974        let row: Option<(i64,)> =
975            sqlx::query_as("SELECT 1 FROM api_keys WHERE key_hash = ?")
976                .bind(key_hash)
977                .fetch_optional(&self.pool)
978                .await?;
979        Ok(row.is_some())
980    }
981
982    async fn list_api_keys(&self) -> Result<Vec<ApiKeyRecord>> {
983        let rows = sqlx::query_as::<_, (String, Option<String>, f64)>(
984            "SELECT prefix, label, created_at FROM api_keys ORDER BY created_at DESC",
985        )
986        .fetch_all(&self.pool)
987        .await?;
988        Ok(rows
989            .into_iter()
990            .map(|(prefix, label, created_at)| ApiKeyRecord {
991                prefix,
992                label,
993                created_at,
994            })
995            .collect())
996    }
997
998    async fn revoke_api_key(&self, prefix: &str) -> Result<bool> {
999        let res = sqlx::query("DELETE FROM api_keys WHERE prefix = ?")
1000            .bind(prefix)
1001            .execute(&self.pool)
1002            .await?;
1003        Ok(res.rows_affected() > 0)
1004    }
1005
1006    // ── Child Workflows ─────────────────────────────────────
1007
1008    async fn list_child_workflows(&self, parent_id: &str) -> Result<Vec<WorkflowRecord>> {
1009        let rows = sqlx::query_as::<_, SqliteWorkflowRow>(
1010            "SELECT id, namespace, run_id, workflow_type, task_queue, status, input, result, error, parent_id, claimed_by, created_at, updated_at, completed_at
1011             FROM workflows WHERE parent_id = ? ORDER BY created_at ASC",
1012        )
1013        .bind(parent_id)
1014        .fetch_all(&self.pool)
1015        .await?;
1016        Ok(rows.into_iter().map(Into::into).collect())
1017    }
1018
1019    // ── Snapshots ───────────────────────────────────────────
1020
1021    async fn create_snapshot(
1022        &self,
1023        workflow_id: &str,
1024        event_seq: i32,
1025        state_json: &str,
1026    ) -> Result<()> {
1027        sqlx::query(
1028            "INSERT OR REPLACE INTO workflow_snapshots (workflow_id, event_seq, state_json, created_at)
1029             VALUES (?, ?, ?, ?)",
1030        )
1031        .bind(workflow_id)
1032        .bind(event_seq)
1033        .bind(state_json)
1034        .bind(timestamp_now())
1035        .execute(&self.pool)
1036        .await?;
1037        Ok(())
1038    }
1039
1040    async fn get_latest_snapshot(
1041        &self,
1042        workflow_id: &str,
1043    ) -> Result<Option<WorkflowSnapshot>> {
1044        let row = sqlx::query_as::<_, (String, i32, String, f64)>(
1045            "SELECT workflow_id, event_seq, state_json, created_at
1046             FROM workflow_snapshots WHERE workflow_id = ?
1047             ORDER BY event_seq DESC LIMIT 1",
1048        )
1049        .bind(workflow_id)
1050        .fetch_optional(&self.pool)
1051        .await?;
1052
1053        Ok(row.map(|(workflow_id, event_seq, state_json, created_at)| WorkflowSnapshot {
1054            workflow_id,
1055            event_seq,
1056            state_json,
1057            created_at,
1058        }))
1059    }
1060
1061    // ── Queue Stats ─────────────────────────────────────────
1062
1063    async fn get_queue_stats(&self, namespace: &str) -> Result<Vec<QueueStats>> {
1064        // Gather activity stats per queue for workflows in this namespace
1065        let rows = sqlx::query_as::<_, (String, i64, i64)>(
1066            "SELECT a.task_queue,
1067                    SUM(CASE WHEN a.status = 'PENDING' THEN 1 ELSE 0 END),
1068                    SUM(CASE WHEN a.status = 'RUNNING' THEN 1 ELSE 0 END)
1069             FROM workflow_activities a
1070             INNER JOIN workflows w ON w.id = a.workflow_id
1071             WHERE w.namespace = ?
1072             GROUP BY a.task_queue",
1073        )
1074        .bind(namespace)
1075        .fetch_all(&self.pool)
1076        .await?;
1077
1078        let mut stats: Vec<QueueStats> = rows
1079            .into_iter()
1080            .map(|(queue, pending, running)| QueueStats {
1081                queue,
1082                pending_activities: pending,
1083                running_activities: running,
1084                workers: 0,
1085            })
1086            .collect();
1087
1088        // Gather worker counts per queue in this namespace
1089        let worker_rows = sqlx::query_as::<_, (String, i64)>(
1090            "SELECT task_queue, COUNT(*) FROM workflow_workers WHERE namespace = ? GROUP BY task_queue",
1091        )
1092        .bind(namespace)
1093        .fetch_all(&self.pool)
1094        .await?;
1095
1096        for (queue, count) in worker_rows {
1097            if let Some(s) = stats.iter_mut().find(|s| s.queue == queue) {
1098                s.workers = count;
1099            } else {
1100                stats.push(QueueStats {
1101                    queue,
1102                    pending_activities: 0,
1103                    running_activities: 0,
1104                    workers: count,
1105                });
1106            }
1107        }
1108
1109        stats.sort_by(|a, b| a.queue.cmp(&b.queue));
1110        Ok(stats)
1111    }
1112
1113    // ── Leader Election ─────────────────────────────────────
1114
1115    async fn try_acquire_scheduler_lock(&self) -> Result<bool> {
1116        // SQLite is single-instance — always the leader.
1117        // Also refresh the engine lock heartbeat on each scheduler tick.
1118        self.refresh_engine_lock().await.ok();
1119        Ok(true)
1120    }
1121}
1122
/// Current wall-clock time as fractional seconds since the UNIX epoch,
/// matching the REAL timestamp columns used throughout the schema.
fn timestamp_now() -> f64 {
    std::time::SystemTime::now()
        .duration_since(std::time::UNIX_EPOCH)
        // The only failure mode is a system clock set before 1970; treat
        // that as a configuration bug with a named invariant, not a bare
        // unwrap.
        .expect("system clock is set before the UNIX epoch")
        .as_secs_f64()
}
1129
1130// ── SQLite row types (sqlx::FromRow) ────────────────────────
1131
/// Row projection for `workflows` SELECTs in this store; converted to
/// `WorkflowRecord` via `From`. The dispatch-lease columns
/// (`needs_dispatch`, `dispatch_claimed_by`, …) are not part of this
/// projection.
#[derive(sqlx::FromRow)]
struct SqliteWorkflowRow {
    id: String,
    namespace: String,
    run_id: String,
    workflow_type: String,
    task_queue: String,
    status: String,
    input: Option<String>,
    result: Option<String>,
    error: Option<String>,
    parent_id: Option<String>,
    claimed_by: Option<String>,
    created_at: f64,
    updated_at: f64,
    completed_at: Option<f64>,
}
1149
1150impl From<SqliteWorkflowRow> for WorkflowRecord {
1151    fn from(r: SqliteWorkflowRow) -> Self {
1152        Self {
1153            id: r.id,
1154            namespace: r.namespace,
1155            run_id: r.run_id,
1156            workflow_type: r.workflow_type,
1157            task_queue: r.task_queue,
1158            status: r.status,
1159            input: r.input,
1160            result: r.result,
1161            error: r.error,
1162            parent_id: r.parent_id,
1163            claimed_by: r.claimed_by,
1164            created_at: r.created_at,
1165            updated_at: r.updated_at,
1166            completed_at: r.completed_at,
1167        }
1168    }
1169}
1170
/// Row projection for `workflow_events` queries; converted to
/// `WorkflowEvent` via `From`.
#[derive(sqlx::FromRow)]
struct SqliteEventRow {
    id: i64,
    workflow_id: String,
    seq: i32,
    event_type: String,
    payload: Option<String>,
    timestamp: f64,
}
1180
1181impl From<SqliteEventRow> for WorkflowEvent {
1182    fn from(r: SqliteEventRow) -> Self {
1183        Self {
1184            id: Some(r.id),
1185            workflow_id: r.workflow_id,
1186            seq: r.seq,
1187            event_type: r.event_type,
1188            payload: r.payload,
1189            timestamp: r.timestamp,
1190        }
1191    }
1192}
1193
/// Row projection for `workflow_activities` queries; converted to
/// `WorkflowActivity` via `From`. Carries the retry-policy columns
/// (attempt, max_attempts, backoff) alongside the execution timestamps.
#[derive(sqlx::FromRow)]
struct SqliteActivityRow {
    id: i64,
    workflow_id: String,
    seq: i32,
    name: String,
    task_queue: String,
    input: Option<String>,
    status: String,
    result: Option<String>,
    error: Option<String>,
    attempt: i32,
    max_attempts: i32,
    initial_interval_secs: f64,
    backoff_coefficient: f64,
    start_to_close_secs: f64,
    heartbeat_timeout_secs: Option<f64>,
    claimed_by: Option<String>,
    scheduled_at: f64,
    started_at: Option<f64>,
    completed_at: Option<f64>,
    last_heartbeat: Option<f64>,
}
1217
1218impl From<SqliteActivityRow> for WorkflowActivity {
1219    fn from(r: SqliteActivityRow) -> Self {
1220        Self {
1221            id: Some(r.id),
1222            workflow_id: r.workflow_id,
1223            seq: r.seq,
1224            name: r.name,
1225            task_queue: r.task_queue,
1226            input: r.input,
1227            status: r.status,
1228            result: r.result,
1229            error: r.error,
1230            attempt: r.attempt,
1231            max_attempts: r.max_attempts,
1232            initial_interval_secs: r.initial_interval_secs,
1233            backoff_coefficient: r.backoff_coefficient,
1234            start_to_close_secs: r.start_to_close_secs,
1235            heartbeat_timeout_secs: r.heartbeat_timeout_secs,
1236            claimed_by: r.claimed_by,
1237            scheduled_at: r.scheduled_at,
1238            started_at: r.started_at,
1239            completed_at: r.completed_at,
1240            last_heartbeat: r.last_heartbeat,
1241        }
1242    }
1243}
1244
/// Row projection for `workflow_timers` queries; converted to
/// `WorkflowTimer` via `From`.
#[derive(sqlx::FromRow)]
struct SqliteTimerRow {
    id: i64,
    workflow_id: String,
    seq: i32,
    fire_at: f64,
    fired: bool,
}
1253
1254impl From<SqliteTimerRow> for WorkflowTimer {
1255    fn from(r: SqliteTimerRow) -> Self {
1256        Self {
1257            id: Some(r.id),
1258            workflow_id: r.workflow_id,
1259            seq: r.seq,
1260            fire_at: r.fire_at,
1261            fired: r.fired,
1262        }
1263    }
1264}
1265
/// Row projection for `workflow_signals` queries; converted to
/// `WorkflowSignal` via `From`.
#[derive(sqlx::FromRow)]
struct SqliteSignalRow {
    id: i64,
    workflow_id: String,
    name: String,
    payload: Option<String>,
    consumed: bool,
    received_at: f64,
}
1275
1276impl From<SqliteSignalRow> for WorkflowSignal {
1277    fn from(r: SqliteSignalRow) -> Self {
1278        Self {
1279            id: Some(r.id),
1280            workflow_id: r.workflow_id,
1281            name: r.name,
1282            payload: r.payload,
1283            consumed: r.consumed,
1284            received_at: r.received_at,
1285        }
1286    }
1287}
1288
/// Row projection for `workflow_schedules` queries; converted to
/// `WorkflowSchedule` via `From`. Keyed in the table by
/// (namespace, name).
#[derive(sqlx::FromRow)]
struct SqliteScheduleRow {
    name: String,
    namespace: String,
    workflow_type: String,
    cron_expr: String,
    input: Option<String>,
    task_queue: String,
    overlap_policy: String,
    paused: bool,
    last_run_at: Option<f64>,
    next_run_at: Option<f64>,
    last_workflow_id: Option<String>,
    created_at: f64,
}
1304
1305impl From<SqliteScheduleRow> for WorkflowSchedule {
1306    fn from(r: SqliteScheduleRow) -> Self {
1307        Self {
1308            name: r.name,
1309            namespace: r.namespace,
1310            workflow_type: r.workflow_type,
1311            cron_expr: r.cron_expr,
1312            input: r.input,
1313            task_queue: r.task_queue,
1314            overlap_policy: r.overlap_policy,
1315            paused: r.paused,
1316            last_run_at: r.last_run_at,
1317            next_run_at: r.next_run_at,
1318            last_workflow_id: r.last_workflow_id,
1319            created_at: r.created_at,
1320        }
1321    }
1322}
1323
/// Row projection for `workflow_workers` queries; converted to
/// `WorkflowWorker` via `From`.
#[derive(sqlx::FromRow)]
struct SqliteWorkerRow {
    id: String,
    namespace: String,
    identity: String,
    task_queue: String,
    workflows: Option<String>,
    activities: Option<String>,
    max_concurrent_workflows: i32,
    max_concurrent_activities: i32,
    active_tasks: i32,
    last_heartbeat: f64,
    registered_at: f64,
}
1338
1339impl From<SqliteWorkerRow> for WorkflowWorker {
1340    fn from(r: SqliteWorkerRow) -> Self {
1341        Self {
1342            id: r.id,
1343            namespace: r.namespace,
1344            identity: r.identity,
1345            task_queue: r.task_queue,
1346            workflows: r.workflows,
1347            activities: r.activities,
1348            max_concurrent_workflows: r.max_concurrent_workflows,
1349            max_concurrent_activities: r.max_concurrent_activities,
1350            active_tasks: r.active_tasks,
1351            last_heartbeat: r.last_heartbeat,
1352            registered_at: r.registered_at,
1353        }
1354    }
1355}