1use anyhow::Result;
2use sqlx::SqlitePool;
3
4use crate::store::{ApiKeyRecord, NamespaceRecord, NamespaceStats, QueueStats, WorkflowStore};
5use crate::types::*;
6
7const SCHEMA: &str = r#"
8CREATE TABLE IF NOT EXISTS namespaces (
9 name TEXT PRIMARY KEY,
10 created_at REAL NOT NULL
11);
12
13INSERT OR IGNORE INTO namespaces (name, created_at)
14 VALUES ('main', strftime('%s', 'now'));
15
16CREATE TABLE IF NOT EXISTS workflows (
17 id TEXT PRIMARY KEY,
18 namespace TEXT NOT NULL DEFAULT 'main',
19 run_id TEXT NOT NULL,
20 workflow_type TEXT NOT NULL,
21 task_queue TEXT NOT NULL DEFAULT 'main',
22 status TEXT NOT NULL DEFAULT 'PENDING',
23 input TEXT,
24 result TEXT,
25 error TEXT,
26 parent_id TEXT,
27 claimed_by TEXT,
28 search_attributes TEXT,
29 archived_at REAL,
30 archive_uri TEXT,
31 -- Workflow-task dispatch (Phase 9): a workflow is "dispatchable" when
32 -- it has new events a worker needs to replay against. Set true on
33 -- start, on activity completion, on timer fire, on signal arrival.
34 -- Cleared when a worker claims the dispatch lease.
35 needs_dispatch INTEGER NOT NULL DEFAULT 0,
36 dispatch_claimed_by TEXT,
37 dispatch_last_heartbeat REAL,
38 created_at REAL NOT NULL,
39 updated_at REAL NOT NULL,
40 completed_at REAL
41);
42CREATE INDEX IF NOT EXISTS idx_wf_status_queue ON workflows(status, task_queue);
43CREATE INDEX IF NOT EXISTS idx_wf_namespace ON workflows(namespace);
44CREATE INDEX IF NOT EXISTS idx_wf_dispatch ON workflows(task_queue, needs_dispatch, dispatch_claimed_by);
45
46CREATE TABLE IF NOT EXISTS workflow_events (
47 id INTEGER PRIMARY KEY AUTOINCREMENT,
48 workflow_id TEXT NOT NULL REFERENCES workflows(id),
49 seq INTEGER NOT NULL,
50 event_type TEXT NOT NULL,
51 payload TEXT,
52 timestamp REAL NOT NULL
53);
54CREATE INDEX IF NOT EXISTS idx_wf_events_lookup ON workflow_events(workflow_id, seq);
55
56CREATE TABLE IF NOT EXISTS workflow_activities (
57 id INTEGER PRIMARY KEY AUTOINCREMENT,
58 workflow_id TEXT NOT NULL REFERENCES workflows(id),
59 seq INTEGER NOT NULL,
60 name TEXT NOT NULL,
61 task_queue TEXT NOT NULL DEFAULT 'main',
62 input TEXT,
63 status TEXT NOT NULL DEFAULT 'PENDING',
64 result TEXT,
65 error TEXT,
66 attempt INTEGER NOT NULL DEFAULT 1,
67 max_attempts INTEGER NOT NULL DEFAULT 3,
68 initial_interval_secs REAL NOT NULL DEFAULT 1,
69 backoff_coefficient REAL NOT NULL DEFAULT 2,
70 start_to_close_secs REAL NOT NULL DEFAULT 300,
71 heartbeat_timeout_secs REAL,
72 claimed_by TEXT,
73 scheduled_at REAL NOT NULL,
74 started_at REAL,
75 completed_at REAL,
76 last_heartbeat REAL,
77 UNIQUE (workflow_id, seq)
78);
79CREATE INDEX IF NOT EXISTS idx_wf_act_pending ON workflow_activities(task_queue, status, scheduled_at);
80
81CREATE TABLE IF NOT EXISTS workflow_timers (
82 id INTEGER PRIMARY KEY AUTOINCREMENT,
83 workflow_id TEXT NOT NULL REFERENCES workflows(id),
84 seq INTEGER NOT NULL,
85 fire_at REAL NOT NULL,
86 fired INTEGER NOT NULL DEFAULT 0,
87 UNIQUE (workflow_id, seq)
88);
89CREATE INDEX IF NOT EXISTS idx_wf_timers_due ON workflow_timers(fire_at);
90
91CREATE TABLE IF NOT EXISTS workflow_signals (
92 id INTEGER PRIMARY KEY AUTOINCREMENT,
93 workflow_id TEXT NOT NULL REFERENCES workflows(id),
94 name TEXT NOT NULL,
95 payload TEXT,
96 consumed INTEGER NOT NULL DEFAULT 0,
97 received_at REAL NOT NULL
98);
99CREATE INDEX IF NOT EXISTS idx_wf_signals_lookup ON workflow_signals(workflow_id, name, consumed);
100
101CREATE TABLE IF NOT EXISTS workflow_schedules (
102 name TEXT NOT NULL,
103 namespace TEXT NOT NULL DEFAULT 'main',
104 workflow_type TEXT NOT NULL,
105 cron_expr TEXT NOT NULL,
106 timezone TEXT NOT NULL DEFAULT 'UTC',
107 input TEXT,
108 task_queue TEXT NOT NULL DEFAULT 'main',
109 overlap_policy TEXT NOT NULL DEFAULT 'skip',
110 paused INTEGER NOT NULL DEFAULT 0,
111 last_run_at REAL,
112 next_run_at REAL,
113 last_workflow_id TEXT,
114 created_at REAL NOT NULL,
115 PRIMARY KEY (namespace, name)
116);
117
118CREATE TABLE IF NOT EXISTS workflow_workers (
119 id TEXT PRIMARY KEY,
120 namespace TEXT NOT NULL DEFAULT 'main',
121 identity TEXT NOT NULL,
122 task_queue TEXT NOT NULL,
123 workflows TEXT,
124 activities TEXT,
125 max_concurrent_workflows INTEGER NOT NULL DEFAULT 10,
126 max_concurrent_activities INTEGER NOT NULL DEFAULT 10,
127 active_tasks INTEGER NOT NULL DEFAULT 0,
128 last_heartbeat REAL NOT NULL,
129 registered_at REAL NOT NULL
130);
131
132CREATE TABLE IF NOT EXISTS workflow_snapshots (
133 workflow_id TEXT NOT NULL REFERENCES workflows(id),
134 event_seq INTEGER NOT NULL,
135 state_json TEXT NOT NULL,
136 created_at REAL NOT NULL,
137 PRIMARY KEY (workflow_id, event_seq)
138);
139
140CREATE TABLE IF NOT EXISTS api_keys (
141 key_hash TEXT PRIMARY KEY,
142 prefix TEXT NOT NULL,
143 label TEXT,
144 created_at REAL NOT NULL
145);
146CREATE INDEX IF NOT EXISTS idx_api_keys_prefix ON api_keys(prefix);
147
148CREATE TABLE IF NOT EXISTS engine_lock (
149 id INTEGER PRIMARY KEY CHECK (id = 1),
150 instance_id TEXT NOT NULL,
151 started_at REAL NOT NULL,
152 last_heartbeat REAL NOT NULL
153);
154"#;
155
156const LOCK_STALE_SECS: f64 = 60.0;
159const LOCK_HEARTBEAT_SECS: u64 = 15;
161
162pub struct SqliteStore {
163 pool: SqlitePool,
164 instance_id: String,
165}
166
167impl SqliteStore {
168 pub async fn new(url: &str) -> Result<Self> {
169 let pool = SqlitePool::connect(url).await?;
170 let instance_id = format!("assay-{:016x}", {
171 use std::collections::hash_map::DefaultHasher;
172 use std::hash::{Hash, Hasher};
173 let mut h = DefaultHasher::new();
174 std::time::SystemTime::now().hash(&mut h);
175 std::process::id().hash(&mut h);
176 h.finish()
177 });
178 let store = Self { pool, instance_id };
179 store.migrate().await?;
180 Ok(store)
181 }
182
183 pub async fn acquire_engine_lock(&self) -> Result<()> {
186 let now = timestamp_now();
187
188 let result = sqlx::query(
190 "INSERT INTO engine_lock (id, instance_id, started_at, last_heartbeat) VALUES (1, ?, ?, ?)",
191 )
192 .bind(&self.instance_id)
193 .bind(now)
194 .bind(now)
195 .execute(&self.pool)
196 .await;
197
198 match result {
199 Ok(_) => Ok(()),
200 Err(_) => {
201 let row: Option<(String, f64)> = sqlx::query_as(
203 "SELECT instance_id, last_heartbeat FROM engine_lock WHERE id = 1",
204 )
205 .fetch_optional(&self.pool)
206 .await?;
207
208 if let Some((existing_id, last_hb)) = row {
209 if now - last_hb > LOCK_STALE_SECS {
210 sqlx::query(
212 "UPDATE engine_lock SET instance_id = ?, started_at = ?, last_heartbeat = ? WHERE id = 1",
213 )
214 .bind(&self.instance_id)
215 .bind(now)
216 .bind(now)
217 .execute(&self.pool)
218 .await?;
219 tracing::warn!(
220 "Took over stale engine lock from {existing_id} (last heartbeat {:.0}s ago)",
221 now - last_hb
222 );
223 Ok(())
224 } else {
225 let age = now - last_hb;
226 anyhow::bail!(
227 "Another assay engine instance is already running (id: {existing_id}, \
228 last heartbeat {age:.0}s ago).\n\n\
229 SQLite only supports a single engine instance. For multi-instance \
230 deployment (Kubernetes, Docker Swarm), use PostgreSQL:\n\n\
231 \x20 assay serve --backend postgres://user:pass@host:5432/dbname"
232 );
233 }
234 } else {
235 anyhow::bail!("Unexpected engine lock state");
236 }
237 }
238 }
239 }
240
241 pub async fn refresh_engine_lock(&self) -> Result<()> {
243 sqlx::query("UPDATE engine_lock SET last_heartbeat = ? WHERE id = 1 AND instance_id = ?")
244 .bind(timestamp_now())
245 .bind(&self.instance_id)
246 .execute(&self.pool)
247 .await?;
248 Ok(())
249 }
250
251 pub async fn release_engine_lock(&self) -> Result<()> {
253 sqlx::query("DELETE FROM engine_lock WHERE id = 1 AND instance_id = ?")
254 .bind(&self.instance_id)
255 .execute(&self.pool)
256 .await?;
257 Ok(())
258 }
259
260 pub fn spawn_lock_heartbeat(self: &std::sync::Arc<Self>) {
262 let store = std::sync::Arc::clone(self);
263 tokio::spawn(async move {
264 let mut tick = tokio::time::interval(std::time::Duration::from_secs(LOCK_HEARTBEAT_SECS));
265 loop {
266 tick.tick().await;
267 if let Err(e) = store.refresh_engine_lock().await {
268 tracing::error!("Engine lock heartbeat failed: {e}");
269 }
270 }
271 });
272 }
273
274 async fn migrate(&self) -> Result<()> {
287 for statement in SCHEMA.split(';') {
288 let trimmed = statement.trim();
289 if !trimmed.is_empty() {
290 sqlx::query(trimmed).execute(&self.pool).await?;
291 }
292 }
293 Ok(())
295 }
296
297 #[allow(dead_code)]
307 async fn add_column_if_missing(
308 pool: &SqlitePool,
309 table: &str,
310 column: &str,
311 type_def: &str,
312 ) -> Result<()> {
313 let exists: Option<(String,)> =
314 sqlx::query_as("SELECT name FROM pragma_table_info(?) WHERE name = ?")
315 .bind(table)
316 .bind(column)
317 .fetch_optional(pool)
318 .await?;
319 if exists.is_none() {
320 let sql = format!("ALTER TABLE {table} ADD COLUMN {column} {type_def}");
321 sqlx::query(&sql).execute(pool).await?;
322 }
323 Ok(())
324 }
325}
326
327impl WorkflowStore for SqliteStore {
328 async fn create_namespace(&self, name: &str) -> Result<()> {
331 sqlx::query("INSERT INTO namespaces (name, created_at) VALUES (?, ?)")
332 .bind(name)
333 .bind(timestamp_now())
334 .execute(&self.pool)
335 .await?;
336 Ok(())
337 }
338
339 async fn list_namespaces(&self) -> Result<Vec<NamespaceRecord>> {
340 let rows = sqlx::query_as::<_, (String, f64)>(
341 "SELECT name, created_at FROM namespaces ORDER BY name",
342 )
343 .fetch_all(&self.pool)
344 .await?;
345 Ok(rows
346 .into_iter()
347 .map(|(name, created_at)| NamespaceRecord { name, created_at })
348 .collect())
349 }
350
351 async fn delete_namespace(&self, name: &str) -> Result<bool> {
352 let res = sqlx::query("DELETE FROM namespaces WHERE name = ?")
353 .bind(name)
354 .execute(&self.pool)
355 .await?;
356 Ok(res.rows_affected() > 0)
357 }
358
359 async fn get_namespace_stats(&self, namespace: &str) -> Result<NamespaceStats> {
360 let total: (i64,) =
361 sqlx::query_as("SELECT COUNT(*) FROM workflows WHERE namespace = ?")
362 .bind(namespace)
363 .fetch_one(&self.pool)
364 .await?;
365 let running: (i64,) = sqlx::query_as(
366 "SELECT COUNT(*) FROM workflows WHERE namespace = ? AND status = 'RUNNING'",
367 )
368 .bind(namespace)
369 .fetch_one(&self.pool)
370 .await?;
371 let pending: (i64,) = sqlx::query_as(
372 "SELECT COUNT(*) FROM workflows WHERE namespace = ? AND status = 'PENDING'",
373 )
374 .bind(namespace)
375 .fetch_one(&self.pool)
376 .await?;
377 let completed: (i64,) = sqlx::query_as(
378 "SELECT COUNT(*) FROM workflows WHERE namespace = ? AND status = 'COMPLETED'",
379 )
380 .bind(namespace)
381 .fetch_one(&self.pool)
382 .await?;
383 let failed: (i64,) = sqlx::query_as(
384 "SELECT COUNT(*) FROM workflows WHERE namespace = ? AND status = 'FAILED'",
385 )
386 .bind(namespace)
387 .fetch_one(&self.pool)
388 .await?;
389 let schedules: (i64,) =
390 sqlx::query_as("SELECT COUNT(*) FROM workflow_schedules WHERE namespace = ?")
391 .bind(namespace)
392 .fetch_one(&self.pool)
393 .await?;
394 let workers: (i64,) =
395 sqlx::query_as("SELECT COUNT(*) FROM workflow_workers WHERE namespace = ?")
396 .bind(namespace)
397 .fetch_one(&self.pool)
398 .await?;
399
400 Ok(NamespaceStats {
401 namespace: namespace.to_string(),
402 total_workflows: total.0,
403 running: running.0,
404 pending: pending.0,
405 completed: completed.0,
406 failed: failed.0,
407 schedules: schedules.0,
408 workers: workers.0,
409 })
410 }
411
412 async fn create_workflow(&self, wf: &WorkflowRecord) -> Result<()> {
415 sqlx::query(
416 "INSERT INTO workflows (id, namespace, run_id, workflow_type, task_queue, status, input, result, error, parent_id, claimed_by, search_attributes, archived_at, archive_uri, created_at, updated_at, completed_at)
417 VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)",
418 )
419 .bind(&wf.id)
420 .bind(&wf.namespace)
421 .bind(&wf.run_id)
422 .bind(&wf.workflow_type)
423 .bind(&wf.task_queue)
424 .bind(&wf.status)
425 .bind(&wf.input)
426 .bind(&wf.result)
427 .bind(&wf.error)
428 .bind(&wf.parent_id)
429 .bind(&wf.claimed_by)
430 .bind(&wf.search_attributes)
431 .bind(wf.archived_at)
432 .bind(&wf.archive_uri)
433 .bind(wf.created_at)
434 .bind(wf.updated_at)
435 .bind(wf.completed_at)
436 .execute(&self.pool)
437 .await?;
438 Ok(())
439 }
440
441 async fn get_workflow(&self, id: &str) -> Result<Option<WorkflowRecord>> {
442 let row = sqlx::query_as::<_, SqliteWorkflowRow>(
443 "SELECT id, namespace, run_id, workflow_type, task_queue, status, input, result, error, parent_id, claimed_by, search_attributes, archived_at, archive_uri, created_at, updated_at, completed_at FROM workflows WHERE id = ?",
444 )
445 .bind(id)
446 .fetch_optional(&self.pool)
447 .await?;
448 Ok(row.map(Into::into))
449 }
450
451 async fn list_workflows(
452 &self,
453 namespace: &str,
454 status: Option<WorkflowStatus>,
455 workflow_type: Option<&str>,
456 search_attrs_filter: Option<&str>,
457 limit: i64,
458 offset: i64,
459 ) -> Result<Vec<WorkflowRecord>> {
460 let status_str = status.map(|s| s.to_string());
461
462 let filter_pairs: Vec<(String, serde_json::Value)> = search_attrs_filter
467 .and_then(|s| serde_json::from_str::<serde_json::Value>(s).ok())
468 .and_then(|v| v.as_object().cloned())
469 .map(|m| m.into_iter().collect())
470 .unwrap_or_default();
471
472 let mut sql = String::from(
473 "SELECT id, namespace, run_id, workflow_type, task_queue, status, input, result, error, parent_id, claimed_by, search_attributes, archived_at, archive_uri, created_at, updated_at, completed_at
474 FROM workflows
475 WHERE namespace = ?
476 AND (? IS NULL OR status = ?)
477 AND (? IS NULL OR workflow_type = ?)",
478 );
479 for _ in &filter_pairs {
480 sql.push_str(" AND json_extract(search_attributes, '$.' || ?) = ?");
481 }
482 sql.push_str(" ORDER BY created_at DESC LIMIT ? OFFSET ?");
483
484 let mut q = sqlx::query_as::<_, SqliteWorkflowRow>(&sql)
485 .bind(namespace)
486 .bind(&status_str)
487 .bind(&status_str)
488 .bind(workflow_type)
489 .bind(workflow_type);
490 for (key, value) in &filter_pairs {
491 q = q.bind(key.clone());
492 match value {
497 serde_json::Value::String(s) => q = q.bind(s.clone()),
498 serde_json::Value::Number(n) => {
499 if let Some(i) = n.as_i64() {
500 q = q.bind(i);
501 } else if let Some(f) = n.as_f64() {
502 q = q.bind(f);
503 } else {
504 q = q.bind(n.to_string());
505 }
506 }
507 serde_json::Value::Bool(b) => q = q.bind(*b as i64),
508 _ => q = q.bind(value.to_string()),
509 }
510 }
511 let rows = q
512 .bind(limit)
513 .bind(offset)
514 .fetch_all(&self.pool)
515 .await?;
516 Ok(rows.into_iter().map(Into::into).collect())
517 }
518
519 async fn update_workflow_status(
520 &self,
521 id: &str,
522 status: WorkflowStatus,
523 result: Option<&str>,
524 error: Option<&str>,
525 ) -> Result<()> {
526 let now = timestamp_now();
527 let completed_at = if status.is_terminal() { Some(now) } else { None };
528 sqlx::query(
529 "UPDATE workflows SET status = ?, result = COALESCE(?, result), error = COALESCE(?, error), updated_at = ?, completed_at = COALESCE(?, completed_at) WHERE id = ?",
530 )
531 .bind(status.to_string())
532 .bind(result)
533 .bind(error)
534 .bind(now)
535 .bind(completed_at)
536 .bind(id)
537 .execute(&self.pool)
538 .await?;
539 Ok(())
540 }
541
542 async fn claim_workflow(&self, id: &str, worker_id: &str) -> Result<bool> {
543 let res = sqlx::query(
544 "UPDATE workflows SET claimed_by = ?, status = 'RUNNING', updated_at = ? WHERE id = ? AND claimed_by IS NULL",
545 )
546 .bind(worker_id)
547 .bind(timestamp_now())
548 .bind(id)
549 .execute(&self.pool)
550 .await?;
551 Ok(res.rows_affected() > 0)
552 }
553
554 async fn mark_workflow_dispatchable(&self, workflow_id: &str) -> Result<()> {
555 sqlx::query("UPDATE workflows SET needs_dispatch = 1 WHERE id = ?")
556 .bind(workflow_id)
557 .execute(&self.pool)
558 .await?;
559 Ok(())
560 }
561
562 async fn claim_workflow_task(
563 &self,
564 task_queue: &str,
565 worker_id: &str,
566 ) -> Result<Option<WorkflowRecord>> {
567 let now = timestamp_now();
568 let row = sqlx::query_as::<_, SqliteWorkflowRow>(
570 "UPDATE workflows
571 SET dispatch_claimed_by = ?, dispatch_last_heartbeat = ?, needs_dispatch = 0
572 WHERE id = (
573 SELECT id FROM workflows
574 WHERE task_queue = ?
575 AND needs_dispatch = 1
576 AND dispatch_claimed_by IS NULL
577 AND status NOT IN ('COMPLETED', 'FAILED', 'CANCELLED', 'TIMED_OUT')
578 ORDER BY updated_at ASC
579 LIMIT 1
580 )
581 RETURNING id, namespace, run_id, workflow_type, task_queue, status, input, result, error, parent_id, claimed_by, search_attributes, archived_at, archive_uri, created_at, updated_at, completed_at",
582 )
583 .bind(worker_id)
584 .bind(now)
585 .bind(task_queue)
586 .fetch_optional(&self.pool)
587 .await?;
588 Ok(row.map(Into::into))
589 }
590
591 async fn release_workflow_task(&self, workflow_id: &str, worker_id: &str) -> Result<()> {
592 sqlx::query(
593 "UPDATE workflows
594 SET dispatch_claimed_by = NULL, dispatch_last_heartbeat = NULL
595 WHERE id = ? AND dispatch_claimed_by = ?",
596 )
597 .bind(workflow_id)
598 .bind(worker_id)
599 .execute(&self.pool)
600 .await?;
601 Ok(())
602 }
603
604 async fn release_stale_dispatch_leases(
605 &self,
606 now: f64,
607 timeout_secs: f64,
608 ) -> Result<u64> {
609 let res = sqlx::query(
613 "UPDATE workflows
614 SET dispatch_claimed_by = NULL,
615 dispatch_last_heartbeat = NULL,
616 needs_dispatch = 1
617 WHERE dispatch_claimed_by IS NOT NULL
618 AND (? - dispatch_last_heartbeat) > ?
619 AND status NOT IN ('COMPLETED', 'FAILED', 'CANCELLED', 'TIMED_OUT')",
620 )
621 .bind(now)
622 .bind(timeout_secs)
623 .execute(&self.pool)
624 .await?;
625 Ok(res.rows_affected())
626 }
627
628 async fn append_event(&self, ev: &WorkflowEvent) -> Result<i64> {
631 let res = sqlx::query(
632 "INSERT INTO workflow_events (workflow_id, seq, event_type, payload, timestamp) VALUES (?, ?, ?, ?, ?)",
633 )
634 .bind(&ev.workflow_id)
635 .bind(ev.seq)
636 .bind(&ev.event_type)
637 .bind(&ev.payload)
638 .bind(ev.timestamp)
639 .execute(&self.pool)
640 .await?;
641 Ok(res.last_insert_rowid())
642 }
643
644 async fn list_events(&self, workflow_id: &str) -> Result<Vec<WorkflowEvent>> {
645 let rows = sqlx::query_as::<_, SqliteEventRow>(
646 "SELECT id, workflow_id, seq, event_type, payload, timestamp FROM workflow_events WHERE workflow_id = ? ORDER BY seq ASC",
647 )
648 .bind(workflow_id)
649 .fetch_all(&self.pool)
650 .await?;
651 Ok(rows.into_iter().map(Into::into).collect())
652 }
653
654 async fn get_event_count(&self, workflow_id: &str) -> Result<i64> {
655 let row: (i64,) =
656 sqlx::query_as("SELECT COUNT(*) FROM workflow_events WHERE workflow_id = ?")
657 .bind(workflow_id)
658 .fetch_one(&self.pool)
659 .await?;
660 Ok(row.0)
661 }
662
663 async fn create_activity(&self, act: &WorkflowActivity) -> Result<i64> {
666 let res = sqlx::query(
667 "INSERT INTO workflow_activities (workflow_id, seq, name, task_queue, input, status, attempt, max_attempts, initial_interval_secs, backoff_coefficient, start_to_close_secs, heartbeat_timeout_secs, scheduled_at)
668 VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)",
669 )
670 .bind(&act.workflow_id)
671 .bind(act.seq)
672 .bind(&act.name)
673 .bind(&act.task_queue)
674 .bind(&act.input)
675 .bind(&act.status)
676 .bind(act.attempt)
677 .bind(act.max_attempts)
678 .bind(act.initial_interval_secs)
679 .bind(act.backoff_coefficient)
680 .bind(act.start_to_close_secs)
681 .bind(act.heartbeat_timeout_secs)
682 .bind(act.scheduled_at)
683 .execute(&self.pool)
684 .await?;
685 Ok(res.last_insert_rowid())
686 }
687
688 async fn get_activity(&self, id: i64) -> Result<Option<WorkflowActivity>> {
689 let row = sqlx::query_as::<_, SqliteActivityRow>(
690 "SELECT id, workflow_id, seq, name, task_queue, input, status, result, error, attempt, max_attempts, initial_interval_secs, backoff_coefficient, start_to_close_secs, heartbeat_timeout_secs, claimed_by, scheduled_at, started_at, completed_at, last_heartbeat
691 FROM workflow_activities WHERE id = ?",
692 )
693 .bind(id)
694 .fetch_optional(&self.pool)
695 .await?;
696 Ok(row.map(Into::into))
697 }
698
699 async fn get_activity_by_workflow_seq(
700 &self,
701 workflow_id: &str,
702 seq: i32,
703 ) -> Result<Option<WorkflowActivity>> {
704 let row = sqlx::query_as::<_, SqliteActivityRow>(
705 "SELECT id, workflow_id, seq, name, task_queue, input, status, result, error, attempt, max_attempts, initial_interval_secs, backoff_coefficient, start_to_close_secs, heartbeat_timeout_secs, claimed_by, scheduled_at, started_at, completed_at, last_heartbeat
706 FROM workflow_activities WHERE workflow_id = ? AND seq = ?",
707 )
708 .bind(workflow_id)
709 .bind(seq)
710 .fetch_optional(&self.pool)
711 .await?;
712 Ok(row.map(Into::into))
713 }
714
715 async fn claim_activity(
716 &self,
717 task_queue: &str,
718 worker_id: &str,
719 ) -> Result<Option<WorkflowActivity>> {
720 let now = timestamp_now();
721 let row = sqlx::query_as::<_, SqliteActivityRow>(
722 "UPDATE workflow_activities SET status = 'RUNNING', claimed_by = ?, started_at = ?
723 WHERE id = (
724 SELECT id FROM workflow_activities
725 WHERE task_queue = ? AND status = 'PENDING'
726 ORDER BY scheduled_at ASC
727 LIMIT 1
728 )
729 RETURNING id, workflow_id, seq, name, task_queue, input, status, result, error, attempt, max_attempts, initial_interval_secs, backoff_coefficient, start_to_close_secs, heartbeat_timeout_secs, claimed_by, scheduled_at, started_at, completed_at, last_heartbeat",
730 )
731 .bind(worker_id)
732 .bind(now)
733 .bind(task_queue)
734 .fetch_optional(&self.pool)
735 .await?;
736 Ok(row.map(Into::into))
737 }
738
739 async fn requeue_activity_for_retry(
740 &self,
741 id: i64,
742 next_attempt: i32,
743 next_scheduled_at: f64,
744 ) -> Result<()> {
745 sqlx::query(
746 "UPDATE workflow_activities
747 SET status = 'PENDING', attempt = ?, scheduled_at = ?,
748 claimed_by = NULL, started_at = NULL, last_heartbeat = NULL,
749 error = NULL
750 WHERE id = ?",
751 )
752 .bind(next_attempt)
753 .bind(next_scheduled_at)
754 .bind(id)
755 .execute(&self.pool)
756 .await?;
757 Ok(())
758 }
759
760 async fn complete_activity(
761 &self,
762 id: i64,
763 result: Option<&str>,
764 error: Option<&str>,
765 failed: bool,
766 ) -> Result<()> {
767 let status = if failed { "FAILED" } else { "COMPLETED" };
768 sqlx::query(
769 "UPDATE workflow_activities SET status = ?, result = ?, error = ?, completed_at = ? WHERE id = ?",
770 )
771 .bind(status)
772 .bind(result)
773 .bind(error)
774 .bind(timestamp_now())
775 .bind(id)
776 .execute(&self.pool)
777 .await?;
778 Ok(())
779 }
780
781 async fn heartbeat_activity(&self, id: i64, _details: Option<&str>) -> Result<()> {
782 sqlx::query("UPDATE workflow_activities SET last_heartbeat = ? WHERE id = ?")
783 .bind(timestamp_now())
784 .bind(id)
785 .execute(&self.pool)
786 .await?;
787 Ok(())
788 }
789
790 async fn get_timed_out_activities(&self, now: f64) -> Result<Vec<WorkflowActivity>> {
791 let rows = sqlx::query_as::<_, SqliteActivityRow>(
792 "SELECT id, workflow_id, seq, name, task_queue, input, status, result, error, attempt, max_attempts, initial_interval_secs, backoff_coefficient, start_to_close_secs, heartbeat_timeout_secs, claimed_by, scheduled_at, started_at, completed_at, last_heartbeat
793 FROM workflow_activities
794 WHERE status = 'RUNNING'
795 AND heartbeat_timeout_secs IS NOT NULL
796 AND last_heartbeat IS NOT NULL
797 AND (? - last_heartbeat) > heartbeat_timeout_secs",
798 )
799 .bind(now)
800 .fetch_all(&self.pool)
801 .await?;
802 Ok(rows.into_iter().map(Into::into).collect())
803 }
804
805 async fn create_timer(&self, timer: &WorkflowTimer) -> Result<i64> {
808 let res = sqlx::query(
809 "INSERT INTO workflow_timers (workflow_id, seq, fire_at, fired) VALUES (?, ?, ?, 0)",
810 )
811 .bind(&timer.workflow_id)
812 .bind(timer.seq)
813 .bind(timer.fire_at)
814 .execute(&self.pool)
815 .await?;
816 Ok(res.last_insert_rowid())
817 }
818
819 async fn cancel_pending_activities(&self, workflow_id: &str) -> Result<u64> {
820 let res = sqlx::query(
821 "UPDATE workflow_activities SET status = 'CANCELLED', completed_at = ?
822 WHERE workflow_id = ? AND status = 'PENDING'",
823 )
824 .bind(timestamp_now())
825 .bind(workflow_id)
826 .execute(&self.pool)
827 .await?;
828 Ok(res.rows_affected())
829 }
830
831 async fn cancel_pending_timers(&self, workflow_id: &str) -> Result<u64> {
832 let res = sqlx::query(
833 "UPDATE workflow_timers SET fired = 1
834 WHERE workflow_id = ? AND fired = 0",
835 )
836 .bind(workflow_id)
837 .execute(&self.pool)
838 .await?;
839 Ok(res.rows_affected())
840 }
841
842 async fn get_timer_by_workflow_seq(
843 &self,
844 workflow_id: &str,
845 seq: i32,
846 ) -> Result<Option<WorkflowTimer>> {
847 let row = sqlx::query_as::<_, SqliteTimerRow>(
848 "SELECT id, workflow_id, seq, fire_at, fired
849 FROM workflow_timers WHERE workflow_id = ? AND seq = ?",
850 )
851 .bind(workflow_id)
852 .bind(seq)
853 .fetch_optional(&self.pool)
854 .await?;
855 Ok(row.map(Into::into))
856 }
857
858 async fn fire_due_timers(&self, now: f64) -> Result<Vec<WorkflowTimer>> {
859 let rows = sqlx::query_as::<_, SqliteTimerRow>(
860 "UPDATE workflow_timers SET fired = 1
861 WHERE fired = 0 AND fire_at <= ?
862 RETURNING id, workflow_id, seq, fire_at, fired",
863 )
864 .bind(now)
865 .fetch_all(&self.pool)
866 .await?;
867 Ok(rows.into_iter().map(Into::into).collect())
868 }
869
870 async fn send_signal(&self, sig: &WorkflowSignal) -> Result<i64> {
873 let res = sqlx::query(
874 "INSERT INTO workflow_signals (workflow_id, name, payload, consumed, received_at) VALUES (?, ?, ?, 0, ?)",
875 )
876 .bind(&sig.workflow_id)
877 .bind(&sig.name)
878 .bind(&sig.payload)
879 .bind(sig.received_at)
880 .execute(&self.pool)
881 .await?;
882 Ok(res.last_insert_rowid())
883 }
884
885 async fn consume_signals(
886 &self,
887 workflow_id: &str,
888 name: &str,
889 ) -> Result<Vec<WorkflowSignal>> {
890 let rows = sqlx::query_as::<_, SqliteSignalRow>(
891 "UPDATE workflow_signals SET consumed = 1
892 WHERE workflow_id = ? AND name = ? AND consumed = 0
893 RETURNING id, workflow_id, name, payload, consumed, received_at",
894 )
895 .bind(workflow_id)
896 .bind(name)
897 .fetch_all(&self.pool)
898 .await?;
899 Ok(rows.into_iter().map(Into::into).collect())
900 }
901
902 async fn create_schedule(&self, sched: &WorkflowSchedule) -> Result<()> {
905 sqlx::query(
906 "INSERT INTO workflow_schedules (name, namespace, workflow_type, cron_expr, timezone, input, task_queue, overlap_policy, paused, last_run_at, next_run_at, last_workflow_id, created_at)
907 VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)",
908 )
909 .bind(&sched.name)
910 .bind(&sched.namespace)
911 .bind(&sched.workflow_type)
912 .bind(&sched.cron_expr)
913 .bind(&sched.timezone)
914 .bind(&sched.input)
915 .bind(&sched.task_queue)
916 .bind(&sched.overlap_policy)
917 .bind(sched.paused)
918 .bind(sched.last_run_at)
919 .bind(sched.next_run_at)
920 .bind(&sched.last_workflow_id)
921 .bind(sched.created_at)
922 .execute(&self.pool)
923 .await?;
924 Ok(())
925 }
926
927 async fn get_schedule(&self, namespace: &str, name: &str) -> Result<Option<WorkflowSchedule>> {
928 let row = sqlx::query_as::<_, SqliteScheduleRow>(
929 "SELECT name, namespace, workflow_type, cron_expr, timezone, input, task_queue, overlap_policy, paused, last_run_at, next_run_at, last_workflow_id, created_at
930 FROM workflow_schedules WHERE namespace = ? AND name = ?",
931 )
932 .bind(namespace)
933 .bind(name)
934 .fetch_optional(&self.pool)
935 .await?;
936 Ok(row.map(Into::into))
937 }
938
939 async fn list_schedules(&self, namespace: &str) -> Result<Vec<WorkflowSchedule>> {
940 let rows = sqlx::query_as::<_, SqliteScheduleRow>(
941 "SELECT name, namespace, workflow_type, cron_expr, timezone, input, task_queue, overlap_policy, paused, last_run_at, next_run_at, last_workflow_id, created_at
942 FROM workflow_schedules WHERE namespace = ? ORDER BY name",
943 )
944 .bind(namespace)
945 .fetch_all(&self.pool)
946 .await?;
947 Ok(rows.into_iter().map(Into::into).collect())
948 }
949
950 async fn update_schedule_last_run(
951 &self,
952 namespace: &str,
953 name: &str,
954 last_run_at: f64,
955 next_run_at: f64,
956 workflow_id: &str,
957 ) -> Result<()> {
958 sqlx::query(
959 "UPDATE workflow_schedules SET last_run_at = ?, next_run_at = ?, last_workflow_id = ? WHERE namespace = ? AND name = ?",
960 )
961 .bind(last_run_at)
962 .bind(next_run_at)
963 .bind(workflow_id)
964 .bind(namespace)
965 .bind(name)
966 .execute(&self.pool)
967 .await?;
968 Ok(())
969 }
970
971 async fn delete_schedule(&self, namespace: &str, name: &str) -> Result<bool> {
972 let res =
973 sqlx::query("DELETE FROM workflow_schedules WHERE namespace = ? AND name = ?")
974 .bind(namespace)
975 .bind(name)
976 .execute(&self.pool)
977 .await?;
978 Ok(res.rows_affected() > 0)
979 }
980
981 async fn list_archivable_workflows(
982 &self,
983 cutoff: f64,
984 limit: i64,
985 ) -> Result<Vec<WorkflowRecord>> {
986 let rows = sqlx::query_as::<_, SqliteWorkflowRow>(
987 "SELECT id, namespace, run_id, workflow_type, task_queue, status, input, result, error, parent_id, claimed_by, search_attributes, archived_at, archive_uri, created_at, updated_at, completed_at
988 FROM workflows
989 WHERE status IN ('COMPLETED', 'FAILED', 'CANCELLED', 'TIMED_OUT')
990 AND completed_at IS NOT NULL
991 AND completed_at < ?
992 AND archived_at IS NULL
993 ORDER BY completed_at ASC
994 LIMIT ?",
995 )
996 .bind(cutoff)
997 .bind(limit)
998 .fetch_all(&self.pool)
999 .await?;
1000 Ok(rows.into_iter().map(Into::into).collect())
1001 }
1002
1003 async fn mark_archived_and_purge(
1004 &self,
1005 workflow_id: &str,
1006 archive_uri: &str,
1007 archived_at: f64,
1008 ) -> Result<()> {
1009 let mut tx = self.pool.begin().await?;
1010 sqlx::query("DELETE FROM workflow_events WHERE workflow_id = ?")
1011 .bind(workflow_id)
1012 .execute(&mut *tx)
1013 .await?;
1014 sqlx::query("DELETE FROM workflow_activities WHERE workflow_id = ?")
1015 .bind(workflow_id)
1016 .execute(&mut *tx)
1017 .await?;
1018 sqlx::query("DELETE FROM workflow_timers WHERE workflow_id = ?")
1019 .bind(workflow_id)
1020 .execute(&mut *tx)
1021 .await?;
1022 sqlx::query("DELETE FROM workflow_signals WHERE workflow_id = ?")
1023 .bind(workflow_id)
1024 .execute(&mut *tx)
1025 .await?;
1026 sqlx::query("DELETE FROM workflow_snapshots WHERE workflow_id = ?")
1027 .bind(workflow_id)
1028 .execute(&mut *tx)
1029 .await?;
1030 sqlx::query(
1031 "UPDATE workflows SET archived_at = ?, archive_uri = ? WHERE id = ?",
1032 )
1033 .bind(archived_at)
1034 .bind(archive_uri)
1035 .bind(workflow_id)
1036 .execute(&mut *tx)
1037 .await?;
1038 tx.commit().await?;
1039 Ok(())
1040 }
1041
1042 async fn upsert_search_attributes(
1043 &self,
1044 workflow_id: &str,
1045 patch_json: &str,
1046 ) -> Result<()> {
1047 let current: Option<(Option<String>,)> =
1050 sqlx::query_as("SELECT search_attributes FROM workflows WHERE id = ?")
1051 .bind(workflow_id)
1052 .fetch_optional(&self.pool)
1053 .await?;
1054 let merged = merge_search_attrs(
1055 current.and_then(|(s,)| s).as_deref(),
1056 patch_json,
1057 )?;
1058 sqlx::query("UPDATE workflows SET search_attributes = ? WHERE id = ?")
1059 .bind(merged)
1060 .bind(workflow_id)
1061 .execute(&self.pool)
1062 .await?;
1063 Ok(())
1064 }
1065
1066 async fn update_schedule(
1067 &self,
1068 namespace: &str,
1069 name: &str,
1070 patch: &SchedulePatch,
1071 ) -> Result<Option<WorkflowSchedule>> {
1072 let mut sets: Vec<&'static str> = Vec::new();
1075 if patch.cron_expr.is_some() {
1076 sets.push("cron_expr = ?");
1077 }
1078 if patch.timezone.is_some() {
1079 sets.push("timezone = ?");
1080 }
1081 if patch.input.is_some() {
1082 sets.push("input = ?");
1083 }
1084 if patch.task_queue.is_some() {
1085 sets.push("task_queue = ?");
1086 }
1087 if patch.overlap_policy.is_some() {
1088 sets.push("overlap_policy = ?");
1089 }
1090 if sets.is_empty() {
1092 return self.get_schedule(namespace, name).await;
1093 }
1094
1095 let sql = format!(
1096 "UPDATE workflow_schedules SET {} WHERE namespace = ? AND name = ?",
1097 sets.join(", ")
1098 );
1099 let mut q = sqlx::query(&sql);
1100 if let Some(ref v) = patch.cron_expr {
1101 q = q.bind(v);
1102 }
1103 if let Some(ref v) = patch.timezone {
1104 q = q.bind(v);
1105 }
1106 if let Some(ref v) = patch.input {
1107 q = q.bind(v.to_string());
1108 }
1109 if let Some(ref v) = patch.task_queue {
1110 q = q.bind(v);
1111 }
1112 if let Some(ref v) = patch.overlap_policy {
1113 q = q.bind(v);
1114 }
1115 let res = q
1116 .bind(namespace)
1117 .bind(name)
1118 .execute(&self.pool)
1119 .await?;
1120 if res.rows_affected() == 0 {
1121 return Ok(None);
1122 }
1123 self.get_schedule(namespace, name).await
1124 }
1125
1126 async fn set_schedule_paused(
1127 &self,
1128 namespace: &str,
1129 name: &str,
1130 paused: bool,
1131 ) -> Result<Option<WorkflowSchedule>> {
1132 let res = sqlx::query(
1133 "UPDATE workflow_schedules SET paused = ? WHERE namespace = ? AND name = ?",
1134 )
1135 .bind(paused)
1136 .bind(namespace)
1137 .bind(name)
1138 .execute(&self.pool)
1139 .await?;
1140 if res.rows_affected() == 0 {
1141 return Ok(None);
1142 }
1143 self.get_schedule(namespace, name).await
1144 }
1145
1146 async fn register_worker(&self, w: &WorkflowWorker) -> Result<()> {
1149 sqlx::query(
1150 "INSERT OR REPLACE INTO workflow_workers (id, namespace, identity, task_queue, workflows, activities, max_concurrent_workflows, max_concurrent_activities, active_tasks, last_heartbeat, registered_at)
1151 VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)",
1152 )
1153 .bind(&w.id)
1154 .bind(&w.namespace)
1155 .bind(&w.identity)
1156 .bind(&w.task_queue)
1157 .bind(&w.workflows)
1158 .bind(&w.activities)
1159 .bind(w.max_concurrent_workflows)
1160 .bind(w.max_concurrent_activities)
1161 .bind(w.active_tasks)
1162 .bind(w.last_heartbeat)
1163 .bind(w.registered_at)
1164 .execute(&self.pool)
1165 .await?;
1166 Ok(())
1167 }
1168
1169 async fn heartbeat_worker(&self, id: &str, now: f64) -> Result<()> {
1170 sqlx::query("UPDATE workflow_workers SET last_heartbeat = ? WHERE id = ?")
1171 .bind(now)
1172 .bind(id)
1173 .execute(&self.pool)
1174 .await?;
1175 Ok(())
1176 }
1177
1178 async fn list_workers(&self, namespace: &str) -> Result<Vec<WorkflowWorker>> {
1179 let rows = sqlx::query_as::<_, SqliteWorkerRow>(
1180 "SELECT id, namespace, identity, task_queue, workflows, activities, max_concurrent_workflows, max_concurrent_activities, active_tasks, last_heartbeat, registered_at
1181 FROM workflow_workers WHERE namespace = ? ORDER BY registered_at",
1182 )
1183 .bind(namespace)
1184 .fetch_all(&self.pool)
1185 .await?;
1186 Ok(rows.into_iter().map(Into::into).collect())
1187 }
1188
1189 async fn remove_dead_workers(&self, cutoff: f64) -> Result<Vec<String>> {
1190 let rows: Vec<(String,)> =
1191 sqlx::query_as("SELECT id FROM workflow_workers WHERE last_heartbeat < ?")
1192 .bind(cutoff)
1193 .fetch_all(&self.pool)
1194 .await?;
1195 let ids: Vec<String> = rows.into_iter().map(|r| r.0).collect();
1196 if !ids.is_empty() {
1197 sqlx::query("DELETE FROM workflow_workers WHERE last_heartbeat < ?")
1198 .bind(cutoff)
1199 .execute(&self.pool)
1200 .await?;
1201 }
1202 Ok(ids)
1203 }
1204
1205 async fn create_api_key(
1208 &self,
1209 key_hash: &str,
1210 prefix: &str,
1211 label: Option<&str>,
1212 created_at: f64,
1213 ) -> Result<()> {
1214 sqlx::query(
1215 "INSERT INTO api_keys (key_hash, prefix, label, created_at) VALUES (?, ?, ?, ?)",
1216 )
1217 .bind(key_hash)
1218 .bind(prefix)
1219 .bind(label)
1220 .bind(created_at)
1221 .execute(&self.pool)
1222 .await?;
1223 Ok(())
1224 }
1225
1226 async fn validate_api_key(&self, key_hash: &str) -> Result<bool> {
1227 let row: Option<(i64,)> =
1228 sqlx::query_as("SELECT 1 FROM api_keys WHERE key_hash = ?")
1229 .bind(key_hash)
1230 .fetch_optional(&self.pool)
1231 .await?;
1232 Ok(row.is_some())
1233 }
1234
1235 async fn list_api_keys(&self) -> Result<Vec<ApiKeyRecord>> {
1236 let rows = sqlx::query_as::<_, (String, Option<String>, f64)>(
1237 "SELECT prefix, label, created_at FROM api_keys ORDER BY created_at DESC",
1238 )
1239 .fetch_all(&self.pool)
1240 .await?;
1241 Ok(rows
1242 .into_iter()
1243 .map(|(prefix, label, created_at)| ApiKeyRecord {
1244 prefix,
1245 label,
1246 created_at,
1247 })
1248 .collect())
1249 }
1250
1251 async fn revoke_api_key(&self, prefix: &str) -> Result<bool> {
1252 let res = sqlx::query("DELETE FROM api_keys WHERE prefix = ?")
1253 .bind(prefix)
1254 .execute(&self.pool)
1255 .await?;
1256 Ok(res.rows_affected() > 0)
1257 }
1258
1259 async fn list_child_workflows(&self, parent_id: &str) -> Result<Vec<WorkflowRecord>> {
1262 let rows = sqlx::query_as::<_, SqliteWorkflowRow>(
1263 "SELECT id, namespace, run_id, workflow_type, task_queue, status, input, result, error, parent_id, claimed_by, search_attributes, archived_at, archive_uri, created_at, updated_at, completed_at
1264 FROM workflows WHERE parent_id = ? ORDER BY created_at ASC",
1265 )
1266 .bind(parent_id)
1267 .fetch_all(&self.pool)
1268 .await?;
1269 Ok(rows.into_iter().map(Into::into).collect())
1270 }
1271
1272 async fn create_snapshot(
1275 &self,
1276 workflow_id: &str,
1277 event_seq: i32,
1278 state_json: &str,
1279 ) -> Result<()> {
1280 sqlx::query(
1281 "INSERT OR REPLACE INTO workflow_snapshots (workflow_id, event_seq, state_json, created_at)
1282 VALUES (?, ?, ?, ?)",
1283 )
1284 .bind(workflow_id)
1285 .bind(event_seq)
1286 .bind(state_json)
1287 .bind(timestamp_now())
1288 .execute(&self.pool)
1289 .await?;
1290 Ok(())
1291 }
1292
1293 async fn get_latest_snapshot(
1294 &self,
1295 workflow_id: &str,
1296 ) -> Result<Option<WorkflowSnapshot>> {
1297 let row = sqlx::query_as::<_, (String, i32, String, f64)>(
1298 "SELECT workflow_id, event_seq, state_json, created_at
1299 FROM workflow_snapshots WHERE workflow_id = ?
1300 ORDER BY event_seq DESC LIMIT 1",
1301 )
1302 .bind(workflow_id)
1303 .fetch_optional(&self.pool)
1304 .await?;
1305
1306 Ok(row.map(|(workflow_id, event_seq, state_json, created_at)| WorkflowSnapshot {
1307 workflow_id,
1308 event_seq,
1309 state_json,
1310 created_at,
1311 }))
1312 }
1313
1314 async fn get_queue_stats(&self, namespace: &str) -> Result<Vec<QueueStats>> {
1317 let rows = sqlx::query_as::<_, (String, i64, i64)>(
1319 "SELECT a.task_queue,
1320 SUM(CASE WHEN a.status = 'PENDING' THEN 1 ELSE 0 END),
1321 SUM(CASE WHEN a.status = 'RUNNING' THEN 1 ELSE 0 END)
1322 FROM workflow_activities a
1323 INNER JOIN workflows w ON w.id = a.workflow_id
1324 WHERE w.namespace = ?
1325 GROUP BY a.task_queue",
1326 )
1327 .bind(namespace)
1328 .fetch_all(&self.pool)
1329 .await?;
1330
1331 let mut stats: Vec<QueueStats> = rows
1332 .into_iter()
1333 .map(|(queue, pending, running)| QueueStats {
1334 queue,
1335 pending_activities: pending,
1336 running_activities: running,
1337 workers: 0,
1338 })
1339 .collect();
1340
1341 let worker_rows = sqlx::query_as::<_, (String, i64)>(
1343 "SELECT task_queue, COUNT(*) FROM workflow_workers WHERE namespace = ? GROUP BY task_queue",
1344 )
1345 .bind(namespace)
1346 .fetch_all(&self.pool)
1347 .await?;
1348
1349 for (queue, count) in worker_rows {
1350 if let Some(s) = stats.iter_mut().find(|s| s.queue == queue) {
1351 s.workers = count;
1352 } else {
1353 stats.push(QueueStats {
1354 queue,
1355 pending_activities: 0,
1356 running_activities: 0,
1357 workers: count,
1358 });
1359 }
1360 }
1361
1362 stats.sort_by(|a, b| a.queue.cmp(&b.queue));
1363 Ok(stats)
1364 }
1365
1366 async fn try_acquire_scheduler_lock(&self) -> Result<bool> {
1369 self.refresh_engine_lock().await.ok();
1372 Ok(true)
1373 }
1374}
1375
1376fn timestamp_now() -> f64 {
1377 std::time::SystemTime::now()
1378 .duration_since(std::time::UNIX_EPOCH)
1379 .unwrap()
1380 .as_secs_f64()
1381}
1382
1383pub(crate) fn merge_search_attrs(current: Option<&str>, patch_json: &str) -> Result<String> {
1386 let mut current_map: serde_json::Map<String, serde_json::Value> = current
1387 .and_then(|s| serde_json::from_str::<serde_json::Value>(s).ok())
1388 .and_then(|v| v.as_object().cloned())
1389 .unwrap_or_default();
1390 let patch: serde_json::Value = serde_json::from_str(patch_json)
1391 .map_err(|e| anyhow::anyhow!("invalid search_attributes patch: {e}"))?;
1392 let patch_obj = patch
1393 .as_object()
1394 .ok_or_else(|| anyhow::anyhow!("search_attributes patch must be a JSON object"))?;
1395 for (k, v) in patch_obj {
1396 current_map.insert(k.clone(), v.clone());
1397 }
1398 Ok(serde_json::Value::Object(current_map).to_string())
1399}
1400
1401#[derive(sqlx::FromRow)]
1404struct SqliteWorkflowRow {
1405 id: String,
1406 namespace: String,
1407 run_id: String,
1408 workflow_type: String,
1409 task_queue: String,
1410 status: String,
1411 input: Option<String>,
1412 result: Option<String>,
1413 error: Option<String>,
1414 parent_id: Option<String>,
1415 claimed_by: Option<String>,
1416 search_attributes: Option<String>,
1417 archived_at: Option<f64>,
1418 archive_uri: Option<String>,
1419 created_at: f64,
1420 updated_at: f64,
1421 completed_at: Option<f64>,
1422}
1423
1424impl From<SqliteWorkflowRow> for WorkflowRecord {
1425 fn from(r: SqliteWorkflowRow) -> Self {
1426 Self {
1427 id: r.id,
1428 namespace: r.namespace,
1429 run_id: r.run_id,
1430 workflow_type: r.workflow_type,
1431 task_queue: r.task_queue,
1432 status: r.status,
1433 input: r.input,
1434 result: r.result,
1435 error: r.error,
1436 parent_id: r.parent_id,
1437 claimed_by: r.claimed_by,
1438 search_attributes: r.search_attributes,
1439 archived_at: r.archived_at,
1440 archive_uri: r.archive_uri,
1441 created_at: r.created_at,
1442 updated_at: r.updated_at,
1443 completed_at: r.completed_at,
1444 }
1445 }
1446}
1447
1448#[derive(sqlx::FromRow)]
1449struct SqliteEventRow {
1450 id: i64,
1451 workflow_id: String,
1452 seq: i32,
1453 event_type: String,
1454 payload: Option<String>,
1455 timestamp: f64,
1456}
1457
1458impl From<SqliteEventRow> for WorkflowEvent {
1459 fn from(r: SqliteEventRow) -> Self {
1460 Self {
1461 id: Some(r.id),
1462 workflow_id: r.workflow_id,
1463 seq: r.seq,
1464 event_type: r.event_type,
1465 payload: r.payload,
1466 timestamp: r.timestamp,
1467 }
1468 }
1469}
1470
1471#[derive(sqlx::FromRow)]
1472struct SqliteActivityRow {
1473 id: i64,
1474 workflow_id: String,
1475 seq: i32,
1476 name: String,
1477 task_queue: String,
1478 input: Option<String>,
1479 status: String,
1480 result: Option<String>,
1481 error: Option<String>,
1482 attempt: i32,
1483 max_attempts: i32,
1484 initial_interval_secs: f64,
1485 backoff_coefficient: f64,
1486 start_to_close_secs: f64,
1487 heartbeat_timeout_secs: Option<f64>,
1488 claimed_by: Option<String>,
1489 scheduled_at: f64,
1490 started_at: Option<f64>,
1491 completed_at: Option<f64>,
1492 last_heartbeat: Option<f64>,
1493}
1494
1495impl From<SqliteActivityRow> for WorkflowActivity {
1496 fn from(r: SqliteActivityRow) -> Self {
1497 Self {
1498 id: Some(r.id),
1499 workflow_id: r.workflow_id,
1500 seq: r.seq,
1501 name: r.name,
1502 task_queue: r.task_queue,
1503 input: r.input,
1504 status: r.status,
1505 result: r.result,
1506 error: r.error,
1507 attempt: r.attempt,
1508 max_attempts: r.max_attempts,
1509 initial_interval_secs: r.initial_interval_secs,
1510 backoff_coefficient: r.backoff_coefficient,
1511 start_to_close_secs: r.start_to_close_secs,
1512 heartbeat_timeout_secs: r.heartbeat_timeout_secs,
1513 claimed_by: r.claimed_by,
1514 scheduled_at: r.scheduled_at,
1515 started_at: r.started_at,
1516 completed_at: r.completed_at,
1517 last_heartbeat: r.last_heartbeat,
1518 }
1519 }
1520}
1521
1522#[derive(sqlx::FromRow)]
1523struct SqliteTimerRow {
1524 id: i64,
1525 workflow_id: String,
1526 seq: i32,
1527 fire_at: f64,
1528 fired: bool,
1529}
1530
1531impl From<SqliteTimerRow> for WorkflowTimer {
1532 fn from(r: SqliteTimerRow) -> Self {
1533 Self {
1534 id: Some(r.id),
1535 workflow_id: r.workflow_id,
1536 seq: r.seq,
1537 fire_at: r.fire_at,
1538 fired: r.fired,
1539 }
1540 }
1541}
1542
1543#[derive(sqlx::FromRow)]
1544struct SqliteSignalRow {
1545 id: i64,
1546 workflow_id: String,
1547 name: String,
1548 payload: Option<String>,
1549 consumed: bool,
1550 received_at: f64,
1551}
1552
1553impl From<SqliteSignalRow> for WorkflowSignal {
1554 fn from(r: SqliteSignalRow) -> Self {
1555 Self {
1556 id: Some(r.id),
1557 workflow_id: r.workflow_id,
1558 name: r.name,
1559 payload: r.payload,
1560 consumed: r.consumed,
1561 received_at: r.received_at,
1562 }
1563 }
1564}
1565
1566#[derive(sqlx::FromRow)]
1567struct SqliteScheduleRow {
1568 name: String,
1569 namespace: String,
1570 workflow_type: String,
1571 cron_expr: String,
1572 timezone: String,
1573 input: Option<String>,
1574 task_queue: String,
1575 overlap_policy: String,
1576 paused: bool,
1577 last_run_at: Option<f64>,
1578 next_run_at: Option<f64>,
1579 last_workflow_id: Option<String>,
1580 created_at: f64,
1581}
1582
1583impl From<SqliteScheduleRow> for WorkflowSchedule {
1584 fn from(r: SqliteScheduleRow) -> Self {
1585 Self {
1586 name: r.name,
1587 namespace: r.namespace,
1588 workflow_type: r.workflow_type,
1589 cron_expr: r.cron_expr,
1590 timezone: r.timezone,
1591 input: r.input,
1592 task_queue: r.task_queue,
1593 overlap_policy: r.overlap_policy,
1594 paused: r.paused,
1595 last_run_at: r.last_run_at,
1596 next_run_at: r.next_run_at,
1597 last_workflow_id: r.last_workflow_id,
1598 created_at: r.created_at,
1599 }
1600 }
1601}
1602
1603#[derive(sqlx::FromRow)]
1604struct SqliteWorkerRow {
1605 id: String,
1606 namespace: String,
1607 identity: String,
1608 task_queue: String,
1609 workflows: Option<String>,
1610 activities: Option<String>,
1611 max_concurrent_workflows: i32,
1612 max_concurrent_activities: i32,
1613 active_tasks: i32,
1614 last_heartbeat: f64,
1615 registered_at: f64,
1616}
1617
1618impl From<SqliteWorkerRow> for WorkflowWorker {
1619 fn from(r: SqliteWorkerRow) -> Self {
1620 Self {
1621 id: r.id,
1622 namespace: r.namespace,
1623 identity: r.identity,
1624 task_queue: r.task_queue,
1625 workflows: r.workflows,
1626 activities: r.activities,
1627 max_concurrent_workflows: r.max_concurrent_workflows,
1628 max_concurrent_activities: r.max_concurrent_activities,
1629 active_tasks: r.active_tasks,
1630 last_heartbeat: r.last_heartbeat,
1631 registered_at: r.registered_at,
1632 }
1633 }
1634}