1use anyhow::Result;
2use sqlx::SqlitePool;
3
4use crate::store::{ApiKeyRecord, NamespaceRecord, NamespaceStats, QueueStats, WorkflowStore};
5use crate::types::*;
6
/// Full SQLite schema for the workflow store.
///
/// Every statement is idempotent (`IF NOT EXISTS` / `OR IGNORE`), so the whole
/// script can be re-applied on each start-up.
///
/// `SqliteStore::migrate` executes this text one statement at a time by
/// splitting on `;`. No statement, string literal, or SQL comment in here may
/// therefore contain a semicolon other than the statement terminator.
const SCHEMA: &str = r#"
CREATE TABLE IF NOT EXISTS namespaces (
    name TEXT PRIMARY KEY,
    created_at REAL NOT NULL
);

INSERT OR IGNORE INTO namespaces (name, created_at)
    VALUES ('main', strftime('%s', 'now'));

CREATE TABLE IF NOT EXISTS workflows (
    id TEXT PRIMARY KEY,
    namespace TEXT NOT NULL DEFAULT 'main',
    run_id TEXT NOT NULL,
    workflow_type TEXT NOT NULL,
    task_queue TEXT NOT NULL DEFAULT 'main',
    status TEXT NOT NULL DEFAULT 'PENDING',
    input TEXT,
    result TEXT,
    error TEXT,
    parent_id TEXT,
    claimed_by TEXT,
    search_attributes TEXT,
    archived_at REAL,
    archive_uri TEXT,
    -- Workflow-task dispatch (Phase 9): a workflow is "dispatchable" when
    -- it has new events a worker needs to replay against. Set true on
    -- start, on activity completion, on timer fire, on signal arrival.
    -- Cleared when a worker claims the dispatch lease.
    needs_dispatch INTEGER NOT NULL DEFAULT 0,
    dispatch_claimed_by TEXT,
    dispatch_last_heartbeat REAL,
    created_at REAL NOT NULL,
    updated_at REAL NOT NULL,
    completed_at REAL
);
CREATE INDEX IF NOT EXISTS idx_wf_status_queue ON workflows(status, task_queue);
CREATE INDEX IF NOT EXISTS idx_wf_namespace ON workflows(namespace);
CREATE INDEX IF NOT EXISTS idx_wf_dispatch ON workflows(task_queue, needs_dispatch, dispatch_claimed_by);

CREATE TABLE IF NOT EXISTS workflow_events (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    workflow_id TEXT NOT NULL REFERENCES workflows(id),
    seq INTEGER NOT NULL,
    event_type TEXT NOT NULL,
    payload TEXT,
    timestamp REAL NOT NULL
);
CREATE INDEX IF NOT EXISTS idx_wf_events_lookup ON workflow_events(workflow_id, seq);

CREATE TABLE IF NOT EXISTS workflow_activities (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    workflow_id TEXT NOT NULL REFERENCES workflows(id),
    seq INTEGER NOT NULL,
    name TEXT NOT NULL,
    task_queue TEXT NOT NULL DEFAULT 'main',
    input TEXT,
    status TEXT NOT NULL DEFAULT 'PENDING',
    result TEXT,
    error TEXT,
    attempt INTEGER NOT NULL DEFAULT 1,
    max_attempts INTEGER NOT NULL DEFAULT 3,
    initial_interval_secs REAL NOT NULL DEFAULT 1,
    backoff_coefficient REAL NOT NULL DEFAULT 2,
    start_to_close_secs REAL NOT NULL DEFAULT 300,
    heartbeat_timeout_secs REAL,
    claimed_by TEXT,
    scheduled_at REAL NOT NULL,
    started_at REAL,
    completed_at REAL,
    last_heartbeat REAL,
    UNIQUE (workflow_id, seq)
);
CREATE INDEX IF NOT EXISTS idx_wf_act_pending ON workflow_activities(task_queue, status, scheduled_at);

CREATE TABLE IF NOT EXISTS workflow_timers (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    workflow_id TEXT NOT NULL REFERENCES workflows(id),
    seq INTEGER NOT NULL,
    fire_at REAL NOT NULL,
    fired INTEGER NOT NULL DEFAULT 0,
    UNIQUE (workflow_id, seq)
);
CREATE INDEX IF NOT EXISTS idx_wf_timers_due ON workflow_timers(fire_at);

CREATE TABLE IF NOT EXISTS workflow_signals (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    workflow_id TEXT NOT NULL REFERENCES workflows(id),
    name TEXT NOT NULL,
    payload TEXT,
    consumed INTEGER NOT NULL DEFAULT 0,
    received_at REAL NOT NULL
);
CREATE INDEX IF NOT EXISTS idx_wf_signals_lookup ON workflow_signals(workflow_id, name, consumed);

CREATE TABLE IF NOT EXISTS workflow_schedules (
    name TEXT NOT NULL,
    namespace TEXT NOT NULL DEFAULT 'main',
    workflow_type TEXT NOT NULL,
    cron_expr TEXT NOT NULL,
    timezone TEXT NOT NULL DEFAULT 'UTC',
    input TEXT,
    task_queue TEXT NOT NULL DEFAULT 'main',
    overlap_policy TEXT NOT NULL DEFAULT 'skip',
    paused INTEGER NOT NULL DEFAULT 0,
    last_run_at REAL,
    next_run_at REAL,
    last_workflow_id TEXT,
    created_at REAL NOT NULL,
    PRIMARY KEY (namespace, name)
);

CREATE TABLE IF NOT EXISTS workflow_workers (
    id TEXT PRIMARY KEY,
    namespace TEXT NOT NULL DEFAULT 'main',
    identity TEXT NOT NULL,
    task_queue TEXT NOT NULL,
    workflows TEXT,
    activities TEXT,
    max_concurrent_workflows INTEGER NOT NULL DEFAULT 10,
    max_concurrent_activities INTEGER NOT NULL DEFAULT 10,
    active_tasks INTEGER NOT NULL DEFAULT 0,
    last_heartbeat REAL NOT NULL,
    registered_at REAL NOT NULL
);

CREATE TABLE IF NOT EXISTS workflow_snapshots (
    workflow_id TEXT NOT NULL REFERENCES workflows(id),
    event_seq INTEGER NOT NULL,
    state_json TEXT NOT NULL,
    created_at REAL NOT NULL,
    PRIMARY KEY (workflow_id, event_seq)
);

CREATE TABLE IF NOT EXISTS api_keys (
    key_hash TEXT PRIMARY KEY,
    prefix TEXT NOT NULL,
    label TEXT,
    created_at REAL NOT NULL
);
CREATE INDEX IF NOT EXISTS idx_api_keys_prefix ON api_keys(prefix);

CREATE TABLE IF NOT EXISTS engine_lock (
    id INTEGER PRIMARY KEY CHECK (id = 1),
    instance_id TEXT NOT NULL,
    started_at REAL NOT NULL,
    last_heartbeat REAL NOT NULL
);
"#;
155
/// Age (in seconds) of the last engine-lock heartbeat beyond which
/// `acquire_engine_lock` treats the existing lock as stale and takes it over.
const LOCK_STALE_SECS: f64 = 60.0;

/// Period (in seconds) at which `spawn_lock_heartbeat` refreshes the engine lock.
const LOCK_HEARTBEAT_SECS: u64 = 15;
161
162pub struct SqliteStore {
163 pool: SqlitePool,
164 instance_id: String,
165}
166
167impl SqliteStore {
168 pub async fn new(url: &str) -> Result<Self> {
169 let pool = SqlitePool::connect(url).await?;
170 let instance_id = format!("assay-{:016x}", {
171 use std::collections::hash_map::DefaultHasher;
172 use std::hash::{Hash, Hasher};
173 let mut h = DefaultHasher::new();
174 std::time::SystemTime::now().hash(&mut h);
175 std::process::id().hash(&mut h);
176 h.finish()
177 });
178 let store = Self { pool, instance_id };
179 store.migrate().await?;
180 Ok(store)
181 }
182
183 pub async fn acquire_engine_lock(&self) -> Result<()> {
186 let now = timestamp_now();
187
188 let result = sqlx::query(
190 "INSERT INTO engine_lock (id, instance_id, started_at, last_heartbeat) VALUES (1, ?, ?, ?)",
191 )
192 .bind(&self.instance_id)
193 .bind(now)
194 .bind(now)
195 .execute(&self.pool)
196 .await;
197
198 match result {
199 Ok(_) => Ok(()),
200 Err(_) => {
201 let row: Option<(String, f64)> = sqlx::query_as(
203 "SELECT instance_id, last_heartbeat FROM engine_lock WHERE id = 1",
204 )
205 .fetch_optional(&self.pool)
206 .await?;
207
208 if let Some((existing_id, last_hb)) = row {
209 if now - last_hb > LOCK_STALE_SECS {
210 sqlx::query(
212 "UPDATE engine_lock SET instance_id = ?, started_at = ?, last_heartbeat = ? WHERE id = 1",
213 )
214 .bind(&self.instance_id)
215 .bind(now)
216 .bind(now)
217 .execute(&self.pool)
218 .await?;
219 tracing::warn!(
220 "Took over stale engine lock from {existing_id} (last heartbeat {:.0}s ago)",
221 now - last_hb
222 );
223 Ok(())
224 } else {
225 let age = now - last_hb;
226 anyhow::bail!(
227 "Another assay engine instance is already running (id: {existing_id}, \
228 last heartbeat {age:.0}s ago).\n\n\
229 SQLite only supports a single engine instance. For multi-instance \
230 deployment (Kubernetes, Docker Swarm), use PostgreSQL:\n\n\
231 \x20 assay serve --backend postgres://user:pass@host:5432/dbname"
232 );
233 }
234 } else {
235 anyhow::bail!("Unexpected engine lock state");
236 }
237 }
238 }
239 }
240
241 pub async fn refresh_engine_lock(&self) -> Result<()> {
243 sqlx::query("UPDATE engine_lock SET last_heartbeat = ? WHERE id = 1 AND instance_id = ?")
244 .bind(timestamp_now())
245 .bind(&self.instance_id)
246 .execute(&self.pool)
247 .await?;
248 Ok(())
249 }
250
251 pub async fn release_engine_lock(&self) -> Result<()> {
253 sqlx::query("DELETE FROM engine_lock WHERE id = 1 AND instance_id = ?")
254 .bind(&self.instance_id)
255 .execute(&self.pool)
256 .await?;
257 Ok(())
258 }
259
260 pub fn spawn_lock_heartbeat(self: &std::sync::Arc<Self>) {
262 let store = std::sync::Arc::clone(self);
263 tokio::spawn(async move {
264 let mut tick = tokio::time::interval(std::time::Duration::from_secs(LOCK_HEARTBEAT_SECS));
265 loop {
266 tick.tick().await;
267 if let Err(e) = store.refresh_engine_lock().await {
268 tracing::error!("Engine lock heartbeat failed: {e}");
269 }
270 }
271 });
272 }
273
274 async fn migrate(&self) -> Result<()> {
287 for statement in SCHEMA.split(';') {
288 let trimmed = statement.trim();
289 if !trimmed.is_empty() {
290 sqlx::query(trimmed).execute(&self.pool).await?;
291 }
292 }
293 Ok(())
295 }
296
297 #[allow(dead_code)]
307 async fn add_column_if_missing(
308 pool: &SqlitePool,
309 table: &str,
310 column: &str,
311 type_def: &str,
312 ) -> Result<()> {
313 let exists: Option<(String,)> =
314 sqlx::query_as("SELECT name FROM pragma_table_info(?) WHERE name = ?")
315 .bind(table)
316 .bind(column)
317 .fetch_optional(pool)
318 .await?;
319 if exists.is_none() {
320 let sql = format!("ALTER TABLE {table} ADD COLUMN {column} {type_def}");
321 sqlx::query(&sql).execute(pool).await?;
322 }
323 Ok(())
324 }
325}
326
327impl WorkflowStore for SqliteStore {
328 async fn create_namespace(&self, name: &str) -> Result<()> {
331 sqlx::query("INSERT INTO namespaces (name, created_at) VALUES (?, ?)")
332 .bind(name)
333 .bind(timestamp_now())
334 .execute(&self.pool)
335 .await?;
336 Ok(())
337 }
338
339 async fn list_namespaces(&self) -> Result<Vec<NamespaceRecord>> {
340 let rows = sqlx::query_as::<_, (String, f64)>(
341 "SELECT name, created_at FROM namespaces ORDER BY name",
342 )
343 .fetch_all(&self.pool)
344 .await?;
345 Ok(rows
346 .into_iter()
347 .map(|(name, created_at)| NamespaceRecord { name, created_at })
348 .collect())
349 }
350
351 async fn delete_namespace(&self, name: &str) -> Result<bool> {
352 let res = sqlx::query("DELETE FROM namespaces WHERE name = ?")
353 .bind(name)
354 .execute(&self.pool)
355 .await?;
356 Ok(res.rows_affected() > 0)
357 }
358
359 async fn get_namespace_stats(&self, namespace: &str) -> Result<NamespaceStats> {
360 let total: (i64,) =
361 sqlx::query_as("SELECT COUNT(*) FROM workflows WHERE namespace = ?")
362 .bind(namespace)
363 .fetch_one(&self.pool)
364 .await?;
365 let running: (i64,) = sqlx::query_as(
366 "SELECT COUNT(*) FROM workflows WHERE namespace = ? AND status = 'RUNNING'",
367 )
368 .bind(namespace)
369 .fetch_one(&self.pool)
370 .await?;
371 let pending: (i64,) = sqlx::query_as(
372 "SELECT COUNT(*) FROM workflows WHERE namespace = ? AND status = 'PENDING'",
373 )
374 .bind(namespace)
375 .fetch_one(&self.pool)
376 .await?;
377 let completed: (i64,) = sqlx::query_as(
378 "SELECT COUNT(*) FROM workflows WHERE namespace = ? AND status = 'COMPLETED'",
379 )
380 .bind(namespace)
381 .fetch_one(&self.pool)
382 .await?;
383 let failed: (i64,) = sqlx::query_as(
384 "SELECT COUNT(*) FROM workflows WHERE namespace = ? AND status = 'FAILED'",
385 )
386 .bind(namespace)
387 .fetch_one(&self.pool)
388 .await?;
389 let schedules: (i64,) =
390 sqlx::query_as("SELECT COUNT(*) FROM workflow_schedules WHERE namespace = ?")
391 .bind(namespace)
392 .fetch_one(&self.pool)
393 .await?;
394 let workers: (i64,) =
395 sqlx::query_as("SELECT COUNT(*) FROM workflow_workers WHERE namespace = ?")
396 .bind(namespace)
397 .fetch_one(&self.pool)
398 .await?;
399
400 Ok(NamespaceStats {
401 namespace: namespace.to_string(),
402 total_workflows: total.0,
403 running: running.0,
404 pending: pending.0,
405 completed: completed.0,
406 failed: failed.0,
407 schedules: schedules.0,
408 workers: workers.0,
409 })
410 }
411
412 async fn create_workflow(&self, wf: &WorkflowRecord) -> Result<()> {
415 sqlx::query(
416 "INSERT INTO workflows (id, namespace, run_id, workflow_type, task_queue, status, input, result, error, parent_id, claimed_by, search_attributes, archived_at, archive_uri, created_at, updated_at, completed_at)
417 VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)",
418 )
419 .bind(&wf.id)
420 .bind(&wf.namespace)
421 .bind(&wf.run_id)
422 .bind(&wf.workflow_type)
423 .bind(&wf.task_queue)
424 .bind(&wf.status)
425 .bind(&wf.input)
426 .bind(&wf.result)
427 .bind(&wf.error)
428 .bind(&wf.parent_id)
429 .bind(&wf.claimed_by)
430 .bind(&wf.search_attributes)
431 .bind(wf.archived_at)
432 .bind(&wf.archive_uri)
433 .bind(wf.created_at)
434 .bind(wf.updated_at)
435 .bind(wf.completed_at)
436 .execute(&self.pool)
437 .await?;
438 Ok(())
439 }
440
441 async fn get_workflow(&self, id: &str) -> Result<Option<WorkflowRecord>> {
442 let row = sqlx::query_as::<_, SqliteWorkflowRow>(
443 "SELECT id, namespace, run_id, workflow_type, task_queue, status, input, result, error, parent_id, claimed_by, search_attributes, archived_at, archive_uri, created_at, updated_at, completed_at FROM workflows WHERE id = ?",
444 )
445 .bind(id)
446 .fetch_optional(&self.pool)
447 .await?;
448 Ok(row.map(Into::into))
449 }
450
451 async fn list_workflows(
452 &self,
453 namespace: &str,
454 status: Option<WorkflowStatus>,
455 workflow_type: Option<&str>,
456 search_attrs_filter: Option<&str>,
457 limit: i64,
458 offset: i64,
459 ) -> Result<Vec<WorkflowRecord>> {
460 let status_str = status.map(|s| s.to_string());
461
462 let filter_pairs: Vec<(String, serde_json::Value)> = search_attrs_filter
467 .and_then(|s| serde_json::from_str::<serde_json::Value>(s).ok())
468 .and_then(|v| v.as_object().cloned())
469 .map(|m| m.into_iter().collect())
470 .unwrap_or_default();
471
472 let mut sql = String::from(
473 "SELECT id, namespace, run_id, workflow_type, task_queue, status, input, result, error, parent_id, claimed_by, search_attributes, archived_at, archive_uri, created_at, updated_at, completed_at
474 FROM workflows
475 WHERE namespace = ?
476 AND (? IS NULL OR status = ?)
477 AND (? IS NULL OR workflow_type = ?)",
478 );
479 for _ in &filter_pairs {
480 sql.push_str(" AND json_extract(search_attributes, '$.' || ?) = ?");
481 }
482 sql.push_str(" ORDER BY created_at DESC LIMIT ? OFFSET ?");
483
484 let mut q = sqlx::query_as::<_, SqliteWorkflowRow>(&sql)
485 .bind(namespace)
486 .bind(&status_str)
487 .bind(&status_str)
488 .bind(workflow_type)
489 .bind(workflow_type);
490 for (key, value) in &filter_pairs {
491 q = q.bind(key.clone());
492 match value {
497 serde_json::Value::String(s) => q = q.bind(s.clone()),
498 serde_json::Value::Number(n) => {
499 if let Some(i) = n.as_i64() {
500 q = q.bind(i);
501 } else if let Some(f) = n.as_f64() {
502 q = q.bind(f);
503 } else {
504 q = q.bind(n.to_string());
505 }
506 }
507 serde_json::Value::Bool(b) => q = q.bind(*b as i64),
508 _ => q = q.bind(value.to_string()),
509 }
510 }
511 let rows = q
512 .bind(limit)
513 .bind(offset)
514 .fetch_all(&self.pool)
515 .await?;
516 Ok(rows.into_iter().map(Into::into).collect())
517 }
518
519 async fn update_workflow_status(
520 &self,
521 id: &str,
522 status: WorkflowStatus,
523 result: Option<&str>,
524 error: Option<&str>,
525 ) -> Result<()> {
526 let now = timestamp_now();
527 let completed_at = if status.is_terminal() { Some(now) } else { None };
528 sqlx::query(
529 "UPDATE workflows SET status = ?, result = COALESCE(?, result), error = COALESCE(?, error), updated_at = ?, completed_at = COALESCE(?, completed_at) WHERE id = ?",
530 )
531 .bind(status.to_string())
532 .bind(result)
533 .bind(error)
534 .bind(now)
535 .bind(completed_at)
536 .bind(id)
537 .execute(&self.pool)
538 .await?;
539 Ok(())
540 }
541
542 async fn claim_workflow(&self, id: &str, worker_id: &str) -> Result<bool> {
543 let res = sqlx::query(
544 "UPDATE workflows SET claimed_by = ?, status = 'RUNNING', updated_at = ? WHERE id = ? AND claimed_by IS NULL",
545 )
546 .bind(worker_id)
547 .bind(timestamp_now())
548 .bind(id)
549 .execute(&self.pool)
550 .await?;
551 Ok(res.rows_affected() > 0)
552 }
553
554 async fn mark_workflow_dispatchable(&self, workflow_id: &str) -> Result<()> {
555 sqlx::query("UPDATE workflows SET needs_dispatch = 1 WHERE id = ?")
556 .bind(workflow_id)
557 .execute(&self.pool)
558 .await?;
559 Ok(())
560 }
561
562 async fn claim_workflow_task(
563 &self,
564 task_queue: &str,
565 worker_id: &str,
566 ) -> Result<Option<WorkflowRecord>> {
567 let now = timestamp_now();
568 let row = sqlx::query_as::<_, SqliteWorkflowRow>(
570 "UPDATE workflows
571 SET dispatch_claimed_by = ?, dispatch_last_heartbeat = ?, needs_dispatch = 0
572 WHERE id = (
573 SELECT id FROM workflows
574 WHERE task_queue = ?
575 AND needs_dispatch = 1
576 AND dispatch_claimed_by IS NULL
577 AND status NOT IN ('COMPLETED', 'FAILED', 'CANCELLED', 'TIMED_OUT')
578 ORDER BY updated_at ASC
579 LIMIT 1
580 )
581 RETURNING id, namespace, run_id, workflow_type, task_queue, status, input, result, error, parent_id, claimed_by, search_attributes, archived_at, archive_uri, created_at, updated_at, completed_at",
582 )
583 .bind(worker_id)
584 .bind(now)
585 .bind(task_queue)
586 .fetch_optional(&self.pool)
587 .await?;
588 Ok(row.map(Into::into))
589 }
590
591 async fn release_workflow_task(&self, workflow_id: &str, worker_id: &str) -> Result<()> {
592 sqlx::query(
593 "UPDATE workflows
594 SET dispatch_claimed_by = NULL, dispatch_last_heartbeat = NULL
595 WHERE id = ? AND dispatch_claimed_by = ?",
596 )
597 .bind(workflow_id)
598 .bind(worker_id)
599 .execute(&self.pool)
600 .await?;
601 Ok(())
602 }
603
604 async fn release_stale_dispatch_leases(
605 &self,
606 now: f64,
607 timeout_secs: f64,
608 ) -> Result<u64> {
609 let res = sqlx::query(
613 "UPDATE workflows
614 SET dispatch_claimed_by = NULL,
615 dispatch_last_heartbeat = NULL,
616 needs_dispatch = 1
617 WHERE dispatch_claimed_by IS NOT NULL
618 AND (? - dispatch_last_heartbeat) > ?
619 AND status NOT IN ('COMPLETED', 'FAILED', 'CANCELLED', 'TIMED_OUT')",
620 )
621 .bind(now)
622 .bind(timeout_secs)
623 .execute(&self.pool)
624 .await?;
625 Ok(res.rows_affected())
626 }
627
628 async fn append_event(&self, ev: &WorkflowEvent) -> Result<i64> {
631 let res = sqlx::query(
632 "INSERT INTO workflow_events (workflow_id, seq, event_type, payload, timestamp) VALUES (?, ?, ?, ?, ?)",
633 )
634 .bind(&ev.workflow_id)
635 .bind(ev.seq)
636 .bind(&ev.event_type)
637 .bind(&ev.payload)
638 .bind(ev.timestamp)
639 .execute(&self.pool)
640 .await?;
641 Ok(res.last_insert_rowid())
642 }
643
644 async fn list_events(&self, workflow_id: &str) -> Result<Vec<WorkflowEvent>> {
645 let rows = sqlx::query_as::<_, SqliteEventRow>(
646 "SELECT id, workflow_id, seq, event_type, payload, timestamp FROM workflow_events WHERE workflow_id = ? ORDER BY seq ASC",
647 )
648 .bind(workflow_id)
649 .fetch_all(&self.pool)
650 .await?;
651 Ok(rows.into_iter().map(Into::into).collect())
652 }
653
654 async fn get_event_count(&self, workflow_id: &str) -> Result<i64> {
655 let row: (i64,) =
656 sqlx::query_as("SELECT COUNT(*) FROM workflow_events WHERE workflow_id = ?")
657 .bind(workflow_id)
658 .fetch_one(&self.pool)
659 .await?;
660 Ok(row.0)
661 }
662
663 async fn create_activity(&self, act: &WorkflowActivity) -> Result<i64> {
666 let res = sqlx::query(
667 "INSERT INTO workflow_activities (workflow_id, seq, name, task_queue, input, status, attempt, max_attempts, initial_interval_secs, backoff_coefficient, start_to_close_secs, heartbeat_timeout_secs, scheduled_at)
668 VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)",
669 )
670 .bind(&act.workflow_id)
671 .bind(act.seq)
672 .bind(&act.name)
673 .bind(&act.task_queue)
674 .bind(&act.input)
675 .bind(&act.status)
676 .bind(act.attempt)
677 .bind(act.max_attempts)
678 .bind(act.initial_interval_secs)
679 .bind(act.backoff_coefficient)
680 .bind(act.start_to_close_secs)
681 .bind(act.heartbeat_timeout_secs)
682 .bind(act.scheduled_at)
683 .execute(&self.pool)
684 .await?;
685 Ok(res.last_insert_rowid())
686 }
687
688 async fn get_activity(&self, id: i64) -> Result<Option<WorkflowActivity>> {
689 let row = sqlx::query_as::<_, SqliteActivityRow>(
690 "SELECT id, workflow_id, seq, name, task_queue, input, status, result, error, attempt, max_attempts, initial_interval_secs, backoff_coefficient, start_to_close_secs, heartbeat_timeout_secs, claimed_by, scheduled_at, started_at, completed_at, last_heartbeat
691 FROM workflow_activities WHERE id = ?",
692 )
693 .bind(id)
694 .fetch_optional(&self.pool)
695 .await?;
696 Ok(row.map(Into::into))
697 }
698
699 async fn get_activity_by_workflow_seq(
700 &self,
701 workflow_id: &str,
702 seq: i32,
703 ) -> Result<Option<WorkflowActivity>> {
704 let row = sqlx::query_as::<_, SqliteActivityRow>(
705 "SELECT id, workflow_id, seq, name, task_queue, input, status, result, error, attempt, max_attempts, initial_interval_secs, backoff_coefficient, start_to_close_secs, heartbeat_timeout_secs, claimed_by, scheduled_at, started_at, completed_at, last_heartbeat
706 FROM workflow_activities WHERE workflow_id = ? AND seq = ?",
707 )
708 .bind(workflow_id)
709 .bind(seq)
710 .fetch_optional(&self.pool)
711 .await?;
712 Ok(row.map(Into::into))
713 }
714
715 async fn claim_activity(
716 &self,
717 task_queue: &str,
718 worker_id: &str,
719 ) -> Result<Option<WorkflowActivity>> {
720 let now = timestamp_now();
721 let row = sqlx::query_as::<_, SqliteActivityRow>(
722 "UPDATE workflow_activities SET status = 'RUNNING', claimed_by = ?, started_at = ?
723 WHERE id = (
724 SELECT id FROM workflow_activities
725 WHERE task_queue = ? AND status = 'PENDING'
726 ORDER BY scheduled_at ASC
727 LIMIT 1
728 )
729 RETURNING id, workflow_id, seq, name, task_queue, input, status, result, error, attempt, max_attempts, initial_interval_secs, backoff_coefficient, start_to_close_secs, heartbeat_timeout_secs, claimed_by, scheduled_at, started_at, completed_at, last_heartbeat",
730 )
731 .bind(worker_id)
732 .bind(now)
733 .bind(task_queue)
734 .fetch_optional(&self.pool)
735 .await?;
736 Ok(row.map(Into::into))
737 }
738
739 async fn requeue_activity_for_retry(
740 &self,
741 id: i64,
742 next_attempt: i32,
743 next_scheduled_at: f64,
744 ) -> Result<()> {
745 sqlx::query(
746 "UPDATE workflow_activities
747 SET status = 'PENDING', attempt = ?, scheduled_at = ?,
748 claimed_by = NULL, started_at = NULL, last_heartbeat = NULL,
749 error = NULL
750 WHERE id = ?",
751 )
752 .bind(next_attempt)
753 .bind(next_scheduled_at)
754 .bind(id)
755 .execute(&self.pool)
756 .await?;
757 Ok(())
758 }
759
760 async fn complete_activity(
761 &self,
762 id: i64,
763 result: Option<&str>,
764 error: Option<&str>,
765 failed: bool,
766 ) -> Result<()> {
767 let status = if failed { "FAILED" } else { "COMPLETED" };
768 sqlx::query(
769 "UPDATE workflow_activities SET status = ?, result = ?, error = ?, completed_at = ? WHERE id = ?",
770 )
771 .bind(status)
772 .bind(result)
773 .bind(error)
774 .bind(timestamp_now())
775 .bind(id)
776 .execute(&self.pool)
777 .await?;
778 Ok(())
779 }
780
781 async fn heartbeat_activity(&self, id: i64, _details: Option<&str>) -> Result<()> {
782 sqlx::query("UPDATE workflow_activities SET last_heartbeat = ? WHERE id = ?")
783 .bind(timestamp_now())
784 .bind(id)
785 .execute(&self.pool)
786 .await?;
787 Ok(())
788 }
789
790 async fn get_timed_out_activities(&self, now: f64) -> Result<Vec<WorkflowActivity>> {
791 let rows = sqlx::query_as::<_, SqliteActivityRow>(
792 "SELECT id, workflow_id, seq, name, task_queue, input, status, result, error, attempt, max_attempts, initial_interval_secs, backoff_coefficient, start_to_close_secs, heartbeat_timeout_secs, claimed_by, scheduled_at, started_at, completed_at, last_heartbeat
793 FROM workflow_activities
794 WHERE status = 'RUNNING'
795 AND heartbeat_timeout_secs IS NOT NULL
796 AND last_heartbeat IS NOT NULL
797 AND (? - last_heartbeat) > heartbeat_timeout_secs",
798 )
799 .bind(now)
800 .fetch_all(&self.pool)
801 .await?;
802 Ok(rows.into_iter().map(Into::into).collect())
803 }
804
805 async fn create_timer(&self, timer: &WorkflowTimer) -> Result<i64> {
808 let res = sqlx::query(
809 "INSERT INTO workflow_timers (workflow_id, seq, fire_at, fired) VALUES (?, ?, ?, 0)",
810 )
811 .bind(&timer.workflow_id)
812 .bind(timer.seq)
813 .bind(timer.fire_at)
814 .execute(&self.pool)
815 .await?;
816 Ok(res.last_insert_rowid())
817 }
818
819 async fn cancel_pending_activities(&self, workflow_id: &str) -> Result<u64> {
820 let res = sqlx::query(
821 "UPDATE workflow_activities SET status = 'CANCELLED', completed_at = ?
822 WHERE workflow_id = ? AND status = 'PENDING'",
823 )
824 .bind(timestamp_now())
825 .bind(workflow_id)
826 .execute(&self.pool)
827 .await?;
828 Ok(res.rows_affected())
829 }
830
831 async fn cancel_pending_timers(&self, workflow_id: &str) -> Result<u64> {
832 let res = sqlx::query(
833 "UPDATE workflow_timers SET fired = 1
834 WHERE workflow_id = ? AND fired = 0",
835 )
836 .bind(workflow_id)
837 .execute(&self.pool)
838 .await?;
839 Ok(res.rows_affected())
840 }
841
842 async fn get_timer_by_workflow_seq(
843 &self,
844 workflow_id: &str,
845 seq: i32,
846 ) -> Result<Option<WorkflowTimer>> {
847 let row = sqlx::query_as::<_, SqliteTimerRow>(
848 "SELECT id, workflow_id, seq, fire_at, fired
849 FROM workflow_timers WHERE workflow_id = ? AND seq = ?",
850 )
851 .bind(workflow_id)
852 .bind(seq)
853 .fetch_optional(&self.pool)
854 .await?;
855 Ok(row.map(Into::into))
856 }
857
858 async fn fire_due_timers(&self, now: f64) -> Result<Vec<WorkflowTimer>> {
859 let rows = sqlx::query_as::<_, SqliteTimerRow>(
860 "UPDATE workflow_timers SET fired = 1
861 WHERE fired = 0 AND fire_at <= ?
862 RETURNING id, workflow_id, seq, fire_at, fired",
863 )
864 .bind(now)
865 .fetch_all(&self.pool)
866 .await?;
867 Ok(rows.into_iter().map(Into::into).collect())
868 }
869
870 async fn send_signal(&self, sig: &WorkflowSignal) -> Result<i64> {
873 let res = sqlx::query(
874 "INSERT INTO workflow_signals (workflow_id, name, payload, consumed, received_at) VALUES (?, ?, ?, 0, ?)",
875 )
876 .bind(&sig.workflow_id)
877 .bind(&sig.name)
878 .bind(&sig.payload)
879 .bind(sig.received_at)
880 .execute(&self.pool)
881 .await?;
882 Ok(res.last_insert_rowid())
883 }
884
885 async fn consume_signals(
886 &self,
887 workflow_id: &str,
888 name: &str,
889 ) -> Result<Vec<WorkflowSignal>> {
890 let rows = sqlx::query_as::<_, SqliteSignalRow>(
891 "UPDATE workflow_signals SET consumed = 1
892 WHERE workflow_id = ? AND name = ? AND consumed = 0
893 RETURNING id, workflow_id, name, payload, consumed, received_at",
894 )
895 .bind(workflow_id)
896 .bind(name)
897 .fetch_all(&self.pool)
898 .await?;
899 Ok(rows.into_iter().map(Into::into).collect())
900 }
901
902 async fn create_schedule(&self, sched: &WorkflowSchedule) -> Result<()> {
905 sqlx::query(
906 "INSERT INTO workflow_schedules (name, namespace, workflow_type, cron_expr, timezone, input, task_queue, overlap_policy, paused, last_run_at, next_run_at, last_workflow_id, created_at)
907 VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)",
908 )
909 .bind(&sched.name)
910 .bind(&sched.namespace)
911 .bind(&sched.workflow_type)
912 .bind(&sched.cron_expr)
913 .bind(&sched.timezone)
914 .bind(&sched.input)
915 .bind(&sched.task_queue)
916 .bind(&sched.overlap_policy)
917 .bind(sched.paused)
918 .bind(sched.last_run_at)
919 .bind(sched.next_run_at)
920 .bind(&sched.last_workflow_id)
921 .bind(sched.created_at)
922 .execute(&self.pool)
923 .await?;
924 Ok(())
925 }
926
927 async fn get_schedule(&self, namespace: &str, name: &str) -> Result<Option<WorkflowSchedule>> {
928 let row = sqlx::query_as::<_, SqliteScheduleRow>(
929 "SELECT name, namespace, workflow_type, cron_expr, timezone, input, task_queue, overlap_policy, paused, last_run_at, next_run_at, last_workflow_id, created_at
930 FROM workflow_schedules WHERE namespace = ? AND name = ?",
931 )
932 .bind(namespace)
933 .bind(name)
934 .fetch_optional(&self.pool)
935 .await?;
936 Ok(row.map(Into::into))
937 }
938
939 async fn list_schedules(&self, namespace: &str) -> Result<Vec<WorkflowSchedule>> {
940 let rows = sqlx::query_as::<_, SqliteScheduleRow>(
941 "SELECT name, namespace, workflow_type, cron_expr, timezone, input, task_queue, overlap_policy, paused, last_run_at, next_run_at, last_workflow_id, created_at
942 FROM workflow_schedules WHERE namespace = ? ORDER BY name",
943 )
944 .bind(namespace)
945 .fetch_all(&self.pool)
946 .await?;
947 Ok(rows.into_iter().map(Into::into).collect())
948 }
949
950 async fn update_schedule_last_run(
951 &self,
952 namespace: &str,
953 name: &str,
954 last_run_at: f64,
955 next_run_at: f64,
956 workflow_id: &str,
957 ) -> Result<()> {
958 sqlx::query(
959 "UPDATE workflow_schedules SET last_run_at = ?, next_run_at = ?, last_workflow_id = ? WHERE namespace = ? AND name = ?",
960 )
961 .bind(last_run_at)
962 .bind(next_run_at)
963 .bind(workflow_id)
964 .bind(namespace)
965 .bind(name)
966 .execute(&self.pool)
967 .await?;
968 Ok(())
969 }
970
971 async fn delete_schedule(&self, namespace: &str, name: &str) -> Result<bool> {
972 let res =
973 sqlx::query("DELETE FROM workflow_schedules WHERE namespace = ? AND name = ?")
974 .bind(namespace)
975 .bind(name)
976 .execute(&self.pool)
977 .await?;
978 Ok(res.rows_affected() > 0)
979 }
980
981 async fn list_archivable_workflows(
982 &self,
983 cutoff: f64,
984 limit: i64,
985 ) -> Result<Vec<WorkflowRecord>> {
986 let rows = sqlx::query_as::<_, SqliteWorkflowRow>(
987 "SELECT id, namespace, run_id, workflow_type, task_queue, status, input, result, error, parent_id, claimed_by, search_attributes, archived_at, archive_uri, created_at, updated_at, completed_at
988 FROM workflows
989 WHERE status IN ('COMPLETED', 'FAILED', 'CANCELLED', 'TIMED_OUT')
990 AND completed_at IS NOT NULL
991 AND completed_at < ?
992 AND archived_at IS NULL
993 ORDER BY completed_at ASC
994 LIMIT ?",
995 )
996 .bind(cutoff)
997 .bind(limit)
998 .fetch_all(&self.pool)
999 .await?;
1000 Ok(rows.into_iter().map(Into::into).collect())
1001 }
1002
1003 async fn mark_archived_and_purge(
1004 &self,
1005 workflow_id: &str,
1006 archive_uri: &str,
1007 archived_at: f64,
1008 ) -> Result<()> {
1009 let mut tx = self.pool.begin().await?;
1010 sqlx::query("DELETE FROM workflow_events WHERE workflow_id = ?")
1011 .bind(workflow_id)
1012 .execute(&mut *tx)
1013 .await?;
1014 sqlx::query("DELETE FROM workflow_activities WHERE workflow_id = ?")
1015 .bind(workflow_id)
1016 .execute(&mut *tx)
1017 .await?;
1018 sqlx::query("DELETE FROM workflow_timers WHERE workflow_id = ?")
1019 .bind(workflow_id)
1020 .execute(&mut *tx)
1021 .await?;
1022 sqlx::query("DELETE FROM workflow_signals WHERE workflow_id = ?")
1023 .bind(workflow_id)
1024 .execute(&mut *tx)
1025 .await?;
1026 sqlx::query("DELETE FROM workflow_snapshots WHERE workflow_id = ?")
1027 .bind(workflow_id)
1028 .execute(&mut *tx)
1029 .await?;
1030 sqlx::query(
1031 "UPDATE workflows SET archived_at = ?, archive_uri = ? WHERE id = ?",
1032 )
1033 .bind(archived_at)
1034 .bind(archive_uri)
1035 .bind(workflow_id)
1036 .execute(&mut *tx)
1037 .await?;
1038 tx.commit().await?;
1039 Ok(())
1040 }
1041
1042 async fn upsert_search_attributes(
1043 &self,
1044 workflow_id: &str,
1045 patch_json: &str,
1046 ) -> Result<()> {
1047 let current: Option<(Option<String>,)> =
1050 sqlx::query_as("SELECT search_attributes FROM workflows WHERE id = ?")
1051 .bind(workflow_id)
1052 .fetch_optional(&self.pool)
1053 .await?;
1054 let merged = merge_search_attrs(
1055 current.and_then(|(s,)| s).as_deref(),
1056 patch_json,
1057 )?;
1058 sqlx::query("UPDATE workflows SET search_attributes = ? WHERE id = ?")
1059 .bind(merged)
1060 .bind(workflow_id)
1061 .execute(&self.pool)
1062 .await?;
1063 Ok(())
1064 }
1065
1066 async fn update_schedule(
1067 &self,
1068 namespace: &str,
1069 name: &str,
1070 patch: &SchedulePatch,
1071 ) -> Result<Option<WorkflowSchedule>> {
1072 let mut sets: Vec<&'static str> = Vec::new();
1075 if patch.cron_expr.is_some() {
1076 sets.push("cron_expr = ?");
1077 }
1078 if patch.timezone.is_some() {
1079 sets.push("timezone = ?");
1080 }
1081 if patch.input.is_some() {
1082 sets.push("input = ?");
1083 }
1084 if patch.task_queue.is_some() {
1085 sets.push("task_queue = ?");
1086 }
1087 if patch.overlap_policy.is_some() {
1088 sets.push("overlap_policy = ?");
1089 }
1090 if sets.is_empty() {
1092 return self.get_schedule(namespace, name).await;
1093 }
1094
1095 let sql = format!(
1096 "UPDATE workflow_schedules SET {} WHERE namespace = ? AND name = ?",
1097 sets.join(", ")
1098 );
1099 let mut q = sqlx::query(&sql);
1100 if let Some(ref v) = patch.cron_expr {
1101 q = q.bind(v);
1102 }
1103 if let Some(ref v) = patch.timezone {
1104 q = q.bind(v);
1105 }
1106 if let Some(ref v) = patch.input {
1107 q = q.bind(v.to_string());
1108 }
1109 if let Some(ref v) = patch.task_queue {
1110 q = q.bind(v);
1111 }
1112 if let Some(ref v) = patch.overlap_policy {
1113 q = q.bind(v);
1114 }
1115 let res = q
1116 .bind(namespace)
1117 .bind(name)
1118 .execute(&self.pool)
1119 .await?;
1120 if res.rows_affected() == 0 {
1121 return Ok(None);
1122 }
1123 self.get_schedule(namespace, name).await
1124 }
1125
1126 async fn set_schedule_paused(
1127 &self,
1128 namespace: &str,
1129 name: &str,
1130 paused: bool,
1131 ) -> Result<Option<WorkflowSchedule>> {
1132 let res = sqlx::query(
1133 "UPDATE workflow_schedules SET paused = ? WHERE namespace = ? AND name = ?",
1134 )
1135 .bind(paused)
1136 .bind(namespace)
1137 .bind(name)
1138 .execute(&self.pool)
1139 .await?;
1140 if res.rows_affected() == 0 {
1141 return Ok(None);
1142 }
1143 self.get_schedule(namespace, name).await
1144 }
1145
1146 async fn register_worker(&self, w: &WorkflowWorker) -> Result<()> {
1149 sqlx::query(
1150 "INSERT OR REPLACE INTO workflow_workers (id, namespace, identity, task_queue, workflows, activities, max_concurrent_workflows, max_concurrent_activities, active_tasks, last_heartbeat, registered_at)
1151 VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)",
1152 )
1153 .bind(&w.id)
1154 .bind(&w.namespace)
1155 .bind(&w.identity)
1156 .bind(&w.task_queue)
1157 .bind(&w.workflows)
1158 .bind(&w.activities)
1159 .bind(w.max_concurrent_workflows)
1160 .bind(w.max_concurrent_activities)
1161 .bind(w.active_tasks)
1162 .bind(w.last_heartbeat)
1163 .bind(w.registered_at)
1164 .execute(&self.pool)
1165 .await?;
1166 Ok(())
1167 }
1168
1169 async fn heartbeat_worker(&self, id: &str, now: f64) -> Result<()> {
1170 sqlx::query("UPDATE workflow_workers SET last_heartbeat = ? WHERE id = ?")
1171 .bind(now)
1172 .bind(id)
1173 .execute(&self.pool)
1174 .await?;
1175 Ok(())
1176 }
1177
1178 async fn list_workers(&self, namespace: &str) -> Result<Vec<WorkflowWorker>> {
1179 let rows = sqlx::query_as::<_, SqliteWorkerRow>(
1180 "SELECT id, namespace, identity, task_queue, workflows, activities, max_concurrent_workflows, max_concurrent_activities, active_tasks, last_heartbeat, registered_at
1181 FROM workflow_workers WHERE namespace = ? ORDER BY registered_at",
1182 )
1183 .bind(namespace)
1184 .fetch_all(&self.pool)
1185 .await?;
1186 Ok(rows.into_iter().map(Into::into).collect())
1187 }
1188
1189 async fn remove_dead_workers(&self, cutoff: f64) -> Result<Vec<String>> {
1190 let rows: Vec<(String,)> =
1191 sqlx::query_as("SELECT id FROM workflow_workers WHERE last_heartbeat < ?")
1192 .bind(cutoff)
1193 .fetch_all(&self.pool)
1194 .await?;
1195 let ids: Vec<String> = rows.into_iter().map(|r| r.0).collect();
1196 if !ids.is_empty() {
1197 sqlx::query("DELETE FROM workflow_workers WHERE last_heartbeat < ?")
1198 .bind(cutoff)
1199 .execute(&self.pool)
1200 .await?;
1201 }
1202 Ok(ids)
1203 }
1204
1205 async fn create_api_key(
1208 &self,
1209 key_hash: &str,
1210 prefix: &str,
1211 label: Option<&str>,
1212 created_at: f64,
1213 ) -> Result<()> {
1214 sqlx::query(
1215 "INSERT INTO api_keys (key_hash, prefix, label, created_at) VALUES (?, ?, ?, ?)",
1216 )
1217 .bind(key_hash)
1218 .bind(prefix)
1219 .bind(label)
1220 .bind(created_at)
1221 .execute(&self.pool)
1222 .await?;
1223 Ok(())
1224 }
1225
1226 async fn validate_api_key(&self, key_hash: &str) -> Result<bool> {
1227 let row: Option<(i64,)> =
1228 sqlx::query_as("SELECT 1 FROM api_keys WHERE key_hash = ?")
1229 .bind(key_hash)
1230 .fetch_optional(&self.pool)
1231 .await?;
1232 Ok(row.is_some())
1233 }
1234
1235 async fn list_api_keys(&self) -> Result<Vec<ApiKeyRecord>> {
1236 let rows = sqlx::query_as::<_, (String, Option<String>, f64)>(
1237 "SELECT prefix, label, created_at FROM api_keys ORDER BY created_at DESC",
1238 )
1239 .fetch_all(&self.pool)
1240 .await?;
1241 Ok(rows
1242 .into_iter()
1243 .map(|(prefix, label, created_at)| ApiKeyRecord {
1244 prefix,
1245 label,
1246 created_at,
1247 })
1248 .collect())
1249 }
1250
1251 async fn revoke_api_key(&self, prefix: &str) -> Result<bool> {
1252 let res = sqlx::query("DELETE FROM api_keys WHERE prefix = ?")
1253 .bind(prefix)
1254 .execute(&self.pool)
1255 .await?;
1256 Ok(res.rows_affected() > 0)
1257 }
1258
1259 async fn api_keys_empty(&self) -> Result<bool> {
1260 let row: (i64,) = sqlx::query_as("SELECT COUNT(*) FROM api_keys")
1261 .fetch_one(&self.pool)
1262 .await?;
1263 Ok(row.0 == 0)
1264 }
1265
1266 async fn get_api_key_by_label(&self, label: &str) -> Result<Option<ApiKeyRecord>> {
1267 let row: Option<(String, Option<String>, f64)> = sqlx::query_as(
1268 "SELECT prefix, label, created_at FROM api_keys WHERE label = ? LIMIT 1",
1269 )
1270 .bind(label)
1271 .fetch_optional(&self.pool)
1272 .await?;
1273 Ok(row.map(|(prefix, label, created_at)| ApiKeyRecord {
1274 prefix,
1275 label,
1276 created_at,
1277 }))
1278 }
1279
1280 async fn list_child_workflows(&self, parent_id: &str) -> Result<Vec<WorkflowRecord>> {
1283 let rows = sqlx::query_as::<_, SqliteWorkflowRow>(
1284 "SELECT id, namespace, run_id, workflow_type, task_queue, status, input, result, error, parent_id, claimed_by, search_attributes, archived_at, archive_uri, created_at, updated_at, completed_at
1285 FROM workflows WHERE parent_id = ? ORDER BY created_at ASC",
1286 )
1287 .bind(parent_id)
1288 .fetch_all(&self.pool)
1289 .await?;
1290 Ok(rows.into_iter().map(Into::into).collect())
1291 }
1292
1293 async fn create_snapshot(
1296 &self,
1297 workflow_id: &str,
1298 event_seq: i32,
1299 state_json: &str,
1300 ) -> Result<()> {
1301 sqlx::query(
1302 "INSERT OR REPLACE INTO workflow_snapshots (workflow_id, event_seq, state_json, created_at)
1303 VALUES (?, ?, ?, ?)",
1304 )
1305 .bind(workflow_id)
1306 .bind(event_seq)
1307 .bind(state_json)
1308 .bind(timestamp_now())
1309 .execute(&self.pool)
1310 .await?;
1311 Ok(())
1312 }
1313
1314 async fn get_latest_snapshot(
1315 &self,
1316 workflow_id: &str,
1317 ) -> Result<Option<WorkflowSnapshot>> {
1318 let row = sqlx::query_as::<_, (String, i32, String, f64)>(
1319 "SELECT workflow_id, event_seq, state_json, created_at
1320 FROM workflow_snapshots WHERE workflow_id = ?
1321 ORDER BY event_seq DESC LIMIT 1",
1322 )
1323 .bind(workflow_id)
1324 .fetch_optional(&self.pool)
1325 .await?;
1326
1327 Ok(row.map(|(workflow_id, event_seq, state_json, created_at)| WorkflowSnapshot {
1328 workflow_id,
1329 event_seq,
1330 state_json,
1331 created_at,
1332 }))
1333 }
1334
1335 async fn get_queue_stats(&self, namespace: &str) -> Result<Vec<QueueStats>> {
1338 let rows = sqlx::query_as::<_, (String, i64, i64)>(
1340 "SELECT a.task_queue,
1341 SUM(CASE WHEN a.status = 'PENDING' THEN 1 ELSE 0 END),
1342 SUM(CASE WHEN a.status = 'RUNNING' THEN 1 ELSE 0 END)
1343 FROM workflow_activities a
1344 INNER JOIN workflows w ON w.id = a.workflow_id
1345 WHERE w.namespace = ?
1346 GROUP BY a.task_queue",
1347 )
1348 .bind(namespace)
1349 .fetch_all(&self.pool)
1350 .await?;
1351
1352 let mut stats: Vec<QueueStats> = rows
1353 .into_iter()
1354 .map(|(queue, pending, running)| QueueStats {
1355 queue,
1356 pending_activities: pending,
1357 running_activities: running,
1358 workers: 0,
1359 })
1360 .collect();
1361
1362 let worker_rows = sqlx::query_as::<_, (String, i64)>(
1364 "SELECT task_queue, COUNT(*) FROM workflow_workers WHERE namespace = ? GROUP BY task_queue",
1365 )
1366 .bind(namespace)
1367 .fetch_all(&self.pool)
1368 .await?;
1369
1370 for (queue, count) in worker_rows {
1371 if let Some(s) = stats.iter_mut().find(|s| s.queue == queue) {
1372 s.workers = count;
1373 } else {
1374 stats.push(QueueStats {
1375 queue,
1376 pending_activities: 0,
1377 running_activities: 0,
1378 workers: count,
1379 });
1380 }
1381 }
1382
1383 stats.sort_by(|a, b| a.queue.cmp(&b.queue));
1384 Ok(stats)
1385 }
1386
1387 async fn try_acquire_scheduler_lock(&self) -> Result<bool> {
1390 self.refresh_engine_lock().await.ok();
1393 Ok(true)
1394 }
1395}
1396
/// Current wall-clock time as fractional seconds since the Unix epoch.
///
/// # Panics
/// Panics if the system clock reads earlier than the Unix epoch.
fn timestamp_now() -> f64 {
    let since_epoch = std::time::UNIX_EPOCH.elapsed().unwrap();
    since_epoch.as_secs_f64()
}
1403
1404pub(crate) fn merge_search_attrs(current: Option<&str>, patch_json: &str) -> Result<String> {
1407 let mut current_map: serde_json::Map<String, serde_json::Value> = current
1408 .and_then(|s| serde_json::from_str::<serde_json::Value>(s).ok())
1409 .and_then(|v| v.as_object().cloned())
1410 .unwrap_or_default();
1411 let patch: serde_json::Value = serde_json::from_str(patch_json)
1412 .map_err(|e| anyhow::anyhow!("invalid search_attributes patch: {e}"))?;
1413 let patch_obj = patch
1414 .as_object()
1415 .ok_or_else(|| anyhow::anyhow!("search_attributes patch must be a JSON object"))?;
1416 for (k, v) in patch_obj {
1417 current_map.insert(k.clone(), v.clone());
1418 }
1419 Ok(serde_json::Value::Object(current_map).to_string())
1420}
1421
/// Row shape used when reading from the `workflows` table through
/// `sqlx::query_as`; converted into a `WorkflowRecord` via `From`.
///
/// Field names correspond to the selected column names. Timestamps are
/// stored as `REAL` columns and read as `f64`.
#[derive(sqlx::FromRow)]
struct SqliteWorkflowRow {
    id: String,
    namespace: String,
    run_id: String,
    workflow_type: String,
    task_queue: String,
    status: String,
    input: Option<String>,
    result: Option<String>,
    error: Option<String>,
    parent_id: Option<String>,
    claimed_by: Option<String>,
    search_attributes: Option<String>,
    archived_at: Option<f64>,
    archive_uri: Option<String>,
    created_at: f64,
    updated_at: f64,
    completed_at: Option<f64>,
}
1444
1445impl From<SqliteWorkflowRow> for WorkflowRecord {
1446 fn from(r: SqliteWorkflowRow) -> Self {
1447 Self {
1448 id: r.id,
1449 namespace: r.namespace,
1450 run_id: r.run_id,
1451 workflow_type: r.workflow_type,
1452 task_queue: r.task_queue,
1453 status: r.status,
1454 input: r.input,
1455 result: r.result,
1456 error: r.error,
1457 parent_id: r.parent_id,
1458 claimed_by: r.claimed_by,
1459 search_attributes: r.search_attributes,
1460 archived_at: r.archived_at,
1461 archive_uri: r.archive_uri,
1462 created_at: r.created_at,
1463 updated_at: r.updated_at,
1464 completed_at: r.completed_at,
1465 }
1466 }
1467}
1468
/// Row shape matching the columns of the `workflow_events` table; converted
/// into a `WorkflowEvent` via `From`.
#[derive(sqlx::FromRow)]
struct SqliteEventRow {
    // Auto-increment primary key of the event row.
    id: i64,
    workflow_id: String,
    seq: i32,
    event_type: String,
    payload: Option<String>,
    timestamp: f64,
}
1478
1479impl From<SqliteEventRow> for WorkflowEvent {
1480 fn from(r: SqliteEventRow) -> Self {
1481 Self {
1482 id: Some(r.id),
1483 workflow_id: r.workflow_id,
1484 seq: r.seq,
1485 event_type: r.event_type,
1486 payload: r.payload,
1487 timestamp: r.timestamp,
1488 }
1489 }
1490}
1491
/// Row shape for an activity record (presumably read from the
/// `workflow_activities` table — confirm against the queries that use it);
/// converted into a `WorkflowActivity` via `From`.
#[derive(sqlx::FromRow)]
struct SqliteActivityRow {
    id: i64,
    workflow_id: String,
    seq: i32,
    name: String,
    task_queue: String,
    input: Option<String>,
    status: String,
    result: Option<String>,
    error: Option<String>,
    attempt: i32,
    max_attempts: i32,
    initial_interval_secs: f64,
    backoff_coefficient: f64,
    start_to_close_secs: f64,
    heartbeat_timeout_secs: Option<f64>,
    claimed_by: Option<String>,
    scheduled_at: f64,
    started_at: Option<f64>,
    completed_at: Option<f64>,
    last_heartbeat: Option<f64>,
}
1515
1516impl From<SqliteActivityRow> for WorkflowActivity {
1517 fn from(r: SqliteActivityRow) -> Self {
1518 Self {
1519 id: Some(r.id),
1520 workflow_id: r.workflow_id,
1521 seq: r.seq,
1522 name: r.name,
1523 task_queue: r.task_queue,
1524 input: r.input,
1525 status: r.status,
1526 result: r.result,
1527 error: r.error,
1528 attempt: r.attempt,
1529 max_attempts: r.max_attempts,
1530 initial_interval_secs: r.initial_interval_secs,
1531 backoff_coefficient: r.backoff_coefficient,
1532 start_to_close_secs: r.start_to_close_secs,
1533 heartbeat_timeout_secs: r.heartbeat_timeout_secs,
1534 claimed_by: r.claimed_by,
1535 scheduled_at: r.scheduled_at,
1536 started_at: r.started_at,
1537 completed_at: r.completed_at,
1538 last_heartbeat: r.last_heartbeat,
1539 }
1540 }
1541}
1542
/// Row shape for a timer record (presumably read from the `workflow_timers`
/// table — confirm against the queries that use it); converted into a
/// `WorkflowTimer` via `From`.
#[derive(sqlx::FromRow)]
struct SqliteTimerRow {
    id: i64,
    workflow_id: String,
    seq: i32,
    fire_at: f64,
    fired: bool,
}
1551
1552impl From<SqliteTimerRow> for WorkflowTimer {
1553 fn from(r: SqliteTimerRow) -> Self {
1554 Self {
1555 id: Some(r.id),
1556 workflow_id: r.workflow_id,
1557 seq: r.seq,
1558 fire_at: r.fire_at,
1559 fired: r.fired,
1560 }
1561 }
1562}
1563
/// Row shape for a signal record (presumably read from the
/// `workflow_signals` table — confirm against the queries that use it);
/// converted into a `WorkflowSignal` via `From`.
#[derive(sqlx::FromRow)]
struct SqliteSignalRow {
    id: i64,
    workflow_id: String,
    name: String,
    payload: Option<String>,
    consumed: bool,
    received_at: f64,
}
1573
1574impl From<SqliteSignalRow> for WorkflowSignal {
1575 fn from(r: SqliteSignalRow) -> Self {
1576 Self {
1577 id: Some(r.id),
1578 workflow_id: r.workflow_id,
1579 name: r.name,
1580 payload: r.payload,
1581 consumed: r.consumed,
1582 received_at: r.received_at,
1583 }
1584 }
1585}
1586
/// Row shape for a schedule record (presumably read from the
/// `workflow_schedules` table — confirm against the queries that use it);
/// converted into a `WorkflowSchedule` via `From`.
#[derive(sqlx::FromRow)]
struct SqliteScheduleRow {
    name: String,
    namespace: String,
    workflow_type: String,
    cron_expr: String,
    timezone: String,
    input: Option<String>,
    task_queue: String,
    overlap_policy: String,
    paused: bool,
    last_run_at: Option<f64>,
    next_run_at: Option<f64>,
    last_workflow_id: Option<String>,
    created_at: f64,
}
1603
1604impl From<SqliteScheduleRow> for WorkflowSchedule {
1605 fn from(r: SqliteScheduleRow) -> Self {
1606 Self {
1607 name: r.name,
1608 namespace: r.namespace,
1609 workflow_type: r.workflow_type,
1610 cron_expr: r.cron_expr,
1611 timezone: r.timezone,
1612 input: r.input,
1613 task_queue: r.task_queue,
1614 overlap_policy: r.overlap_policy,
1615 paused: r.paused,
1616 last_run_at: r.last_run_at,
1617 next_run_at: r.next_run_at,
1618 last_workflow_id: r.last_workflow_id,
1619 created_at: r.created_at,
1620 }
1621 }
1622}
1623
/// Row shape used when reading from the `workflow_workers` table through
/// `sqlx::query_as`; converted into a `WorkflowWorker` via `From`.
#[derive(sqlx::FromRow)]
struct SqliteWorkerRow {
    id: String,
    namespace: String,
    identity: String,
    task_queue: String,
    workflows: Option<String>,
    activities: Option<String>,
    max_concurrent_workflows: i32,
    max_concurrent_activities: i32,
    active_tasks: i32,
    last_heartbeat: f64,
    registered_at: f64,
}
1638
1639impl From<SqliteWorkerRow> for WorkflowWorker {
1640 fn from(r: SqliteWorkerRow) -> Self {
1641 Self {
1642 id: r.id,
1643 namespace: r.namespace,
1644 identity: r.identity,
1645 task_queue: r.task_queue,
1646 workflows: r.workflows,
1647 activities: r.activities,
1648 max_concurrent_workflows: r.max_concurrent_workflows,
1649 max_concurrent_activities: r.max_concurrent_activities,
1650 active_tasks: r.active_tasks,
1651 last_heartbeat: r.last_heartbeat,
1652 registered_at: r.registered_at,
1653 }
1654 }
1655}