1use anyhow::Result;
2use sqlx::PgPool;
3
4use crate::store::WorkflowStore;
5use crate::types::*;
6
/// Idempotent DDL applied on every startup.
///
/// The migration runner splits this text on `;` and executes each piece as a
/// separate statement, so neither a statement nor an SQL comment in here may
/// contain a literal semicolon.
///
/// `CREATE TABLE IF NOT EXISTS` does nothing when the table already exists, so
/// it cannot add columns to a `workflows` table created by an earlier version.
/// The dispatch columns are therefore also added with
/// `ALTER TABLE ... ADD COLUMN IF NOT EXISTS` before the index that references
/// them is created. On a fresh database those statements are no-ops.
const SCHEMA: &str = r#"
CREATE TABLE IF NOT EXISTS namespaces (
    name TEXT PRIMARY KEY,
    created_at DOUBLE PRECISION NOT NULL
);
INSERT INTO namespaces (name, created_at)
    VALUES ('main', EXTRACT(EPOCH FROM NOW()))
    ON CONFLICT DO NOTHING;

CREATE TABLE IF NOT EXISTS workflows (
    id TEXT PRIMARY KEY,
    namespace TEXT NOT NULL DEFAULT 'main',
    run_id TEXT NOT NULL,
    workflow_type TEXT NOT NULL,
    task_queue TEXT NOT NULL DEFAULT 'main',
    status TEXT NOT NULL DEFAULT 'PENDING',
    input TEXT,
    result TEXT,
    error TEXT,
    parent_id TEXT,
    claimed_by TEXT,
    -- Workflow-task dispatch (Phase 9): see sqlite.rs for the full comment.
    needs_dispatch BOOLEAN NOT NULL DEFAULT FALSE,
    dispatch_claimed_by TEXT,
    dispatch_last_heartbeat DOUBLE PRECISION,
    created_at DOUBLE PRECISION NOT NULL,
    updated_at DOUBLE PRECISION NOT NULL,
    completed_at DOUBLE PRECISION
);
ALTER TABLE workflows ADD COLUMN IF NOT EXISTS needs_dispatch BOOLEAN NOT NULL DEFAULT FALSE;
ALTER TABLE workflows ADD COLUMN IF NOT EXISTS dispatch_claimed_by TEXT;
ALTER TABLE workflows ADD COLUMN IF NOT EXISTS dispatch_last_heartbeat DOUBLE PRECISION;
CREATE INDEX IF NOT EXISTS idx_wf_status_queue ON workflows(status, task_queue);
CREATE INDEX IF NOT EXISTS idx_wf_namespace ON workflows(namespace);
CREATE INDEX IF NOT EXISTS idx_wf_dispatch ON workflows(task_queue, needs_dispatch, dispatch_claimed_by);

CREATE TABLE IF NOT EXISTS workflow_events (
    id BIGSERIAL PRIMARY KEY,
    workflow_id TEXT NOT NULL REFERENCES workflows(id),
    seq INTEGER NOT NULL,
    event_type TEXT NOT NULL,
    payload TEXT,
    timestamp DOUBLE PRECISION NOT NULL
);
CREATE INDEX IF NOT EXISTS idx_wf_events_lookup ON workflow_events(workflow_id, seq);

CREATE TABLE IF NOT EXISTS workflow_activities (
    id BIGSERIAL PRIMARY KEY,
    workflow_id TEXT NOT NULL REFERENCES workflows(id),
    seq INTEGER NOT NULL,
    name TEXT NOT NULL,
    task_queue TEXT NOT NULL DEFAULT 'main',
    input TEXT,
    status TEXT NOT NULL DEFAULT 'PENDING',
    result TEXT,
    error TEXT,
    attempt INTEGER NOT NULL DEFAULT 1,
    max_attempts INTEGER NOT NULL DEFAULT 3,
    initial_interval_secs DOUBLE PRECISION NOT NULL DEFAULT 1,
    backoff_coefficient DOUBLE PRECISION NOT NULL DEFAULT 2,
    start_to_close_secs DOUBLE PRECISION NOT NULL DEFAULT 300,
    heartbeat_timeout_secs DOUBLE PRECISION,
    claimed_by TEXT,
    scheduled_at DOUBLE PRECISION NOT NULL,
    started_at DOUBLE PRECISION,
    completed_at DOUBLE PRECISION,
    last_heartbeat DOUBLE PRECISION,
    UNIQUE (workflow_id, seq)
);
CREATE INDEX IF NOT EXISTS idx_wf_act_pending ON workflow_activities(task_queue, status, scheduled_at);

CREATE TABLE IF NOT EXISTS workflow_timers (
    id BIGSERIAL PRIMARY KEY,
    workflow_id TEXT NOT NULL REFERENCES workflows(id),
    seq INTEGER NOT NULL,
    fire_at DOUBLE PRECISION NOT NULL,
    fired BOOLEAN NOT NULL DEFAULT FALSE,
    UNIQUE (workflow_id, seq)
);
CREATE INDEX IF NOT EXISTS idx_wf_timers_due ON workflow_timers(fire_at) WHERE fired = FALSE;

CREATE TABLE IF NOT EXISTS workflow_signals (
    id BIGSERIAL PRIMARY KEY,
    workflow_id TEXT NOT NULL REFERENCES workflows(id),
    name TEXT NOT NULL,
    payload TEXT,
    consumed BOOLEAN NOT NULL DEFAULT FALSE,
    received_at DOUBLE PRECISION NOT NULL
);
CREATE INDEX IF NOT EXISTS idx_wf_signals_lookup ON workflow_signals(workflow_id, name, consumed);

CREATE TABLE IF NOT EXISTS workflow_schedules (
    namespace TEXT NOT NULL DEFAULT 'main',
    name TEXT NOT NULL,
    workflow_type TEXT NOT NULL,
    cron_expr TEXT NOT NULL,
    input TEXT,
    task_queue TEXT NOT NULL DEFAULT 'main',
    overlap_policy TEXT NOT NULL DEFAULT 'skip',
    paused BOOLEAN NOT NULL DEFAULT FALSE,
    last_run_at DOUBLE PRECISION,
    next_run_at DOUBLE PRECISION,
    last_workflow_id TEXT,
    created_at DOUBLE PRECISION NOT NULL,
    PRIMARY KEY (namespace, name)
);

CREATE TABLE IF NOT EXISTS workflow_workers (
    id TEXT PRIMARY KEY,
    namespace TEXT NOT NULL DEFAULT 'main',
    identity TEXT NOT NULL,
    task_queue TEXT NOT NULL,
    workflows TEXT,
    activities TEXT,
    max_concurrent_workflows INTEGER NOT NULL DEFAULT 10,
    max_concurrent_activities INTEGER NOT NULL DEFAULT 10,
    active_tasks INTEGER NOT NULL DEFAULT 0,
    last_heartbeat DOUBLE PRECISION NOT NULL,
    registered_at DOUBLE PRECISION NOT NULL
);

CREATE TABLE IF NOT EXISTS workflow_snapshots (
    workflow_id TEXT NOT NULL REFERENCES workflows(id),
    event_seq INTEGER NOT NULL,
    state_json TEXT NOT NULL,
    created_at DOUBLE PRECISION NOT NULL,
    PRIMARY KEY (workflow_id, event_seq)
);

CREATE TABLE IF NOT EXISTS api_keys (
    key_hash TEXT PRIMARY KEY,
    prefix TEXT NOT NULL,
    label TEXT,
    created_at DOUBLE PRECISION NOT NULL
);
CREATE INDEX IF NOT EXISTS idx_api_keys_prefix ON api_keys(prefix);
"#;
141
142pub struct PostgresStore {
143 pool: PgPool,
144}
145
146impl PostgresStore {
147 pub async fn new(url: &str) -> Result<Self> {
148 let pool = PgPool::connect(url).await?;
149 let store = Self { pool };
150 store.migrate().await?;
151 Ok(store)
152 }
153
154 async fn migrate(&self) -> Result<()> {
155 for statement in SCHEMA.split(';') {
157 let trimmed = statement.trim();
158 if !trimmed.is_empty() {
159 sqlx::query(trimmed).execute(&self.pool).await?;
160 }
161 }
162 Ok(())
163 }
164
165 pub async fn try_acquire_leader_lock(&self) -> Result<bool> {
168 let row: (bool,) =
169 sqlx::query_as("SELECT pg_try_advisory_lock(1)")
170 .fetch_one(&self.pool)
171 .await?;
172 Ok(row.0)
173 }
174}
175
176impl WorkflowStore for PostgresStore {
177 async fn create_namespace(&self, name: &str) -> Result<()> {
180 sqlx::query("INSERT INTO namespaces (name, created_at) VALUES ($1, EXTRACT(EPOCH FROM NOW()))")
181 .bind(name)
182 .execute(&self.pool)
183 .await?;
184 Ok(())
185 }
186
187 async fn list_namespaces(&self) -> Result<Vec<crate::store::NamespaceRecord>> {
188 let rows = sqlx::query_as::<_, (String, f64)>(
189 "SELECT name, created_at FROM namespaces ORDER BY name",
190 )
191 .fetch_all(&self.pool)
192 .await?;
193 Ok(rows
194 .into_iter()
195 .map(|(name, created_at)| crate::store::NamespaceRecord { name, created_at })
196 .collect())
197 }
198
199 async fn delete_namespace(&self, name: &str) -> Result<bool> {
200 let res = sqlx::query("DELETE FROM namespaces WHERE name = $1 AND name != 'main'")
201 .bind(name)
202 .execute(&self.pool)
203 .await?;
204 Ok(res.rows_affected() > 0)
205 }
206
207 async fn get_namespace_stats(&self, namespace: &str) -> Result<crate::store::NamespaceStats> {
208 let total: (i64,) = sqlx::query_as("SELECT COUNT(*) FROM workflows WHERE namespace = $1")
209 .bind(namespace)
210 .fetch_one(&self.pool)
211 .await?;
212 let running: (i64,) = sqlx::query_as(
213 "SELECT COUNT(*) FROM workflows WHERE namespace = $1 AND status = 'RUNNING'",
214 )
215 .bind(namespace)
216 .fetch_one(&self.pool)
217 .await?;
218 let pending: (i64,) = sqlx::query_as(
219 "SELECT COUNT(*) FROM workflows WHERE namespace = $1 AND status = 'PENDING'",
220 )
221 .bind(namespace)
222 .fetch_one(&self.pool)
223 .await?;
224 let completed: (i64,) = sqlx::query_as(
225 "SELECT COUNT(*) FROM workflows WHERE namespace = $1 AND status = 'COMPLETED'",
226 )
227 .bind(namespace)
228 .fetch_one(&self.pool)
229 .await?;
230 let failed: (i64,) = sqlx::query_as(
231 "SELECT COUNT(*) FROM workflows WHERE namespace = $1 AND status = 'FAILED'",
232 )
233 .bind(namespace)
234 .fetch_one(&self.pool)
235 .await?;
236 let schedules: (i64,) =
237 sqlx::query_as("SELECT COUNT(*) FROM workflow_schedules WHERE namespace = $1")
238 .bind(namespace)
239 .fetch_one(&self.pool)
240 .await?;
241 let workers: (i64,) =
242 sqlx::query_as("SELECT COUNT(*) FROM workflow_workers WHERE namespace = $1")
243 .bind(namespace)
244 .fetch_one(&self.pool)
245 .await?;
246
247 Ok(crate::store::NamespaceStats {
248 namespace: namespace.to_string(),
249 total_workflows: total.0,
250 running: running.0,
251 pending: pending.0,
252 completed: completed.0,
253 failed: failed.0,
254 schedules: schedules.0,
255 workers: workers.0,
256 })
257 }
258
259 async fn create_workflow(&self, wf: &WorkflowRecord) -> Result<()> {
262 sqlx::query(
263 "INSERT INTO workflows (id, namespace, run_id, workflow_type, task_queue, status, input, result, error, parent_id, claimed_by, created_at, updated_at, completed_at)
264 VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14)",
265 )
266 .bind(&wf.id)
267 .bind(&wf.namespace)
268 .bind(&wf.run_id)
269 .bind(&wf.workflow_type)
270 .bind(&wf.task_queue)
271 .bind(&wf.status)
272 .bind(&wf.input)
273 .bind(&wf.result)
274 .bind(&wf.error)
275 .bind(&wf.parent_id)
276 .bind(&wf.claimed_by)
277 .bind(wf.created_at)
278 .bind(wf.updated_at)
279 .bind(wf.completed_at)
280 .execute(&self.pool)
281 .await?;
282 Ok(())
283 }
284
285 async fn get_workflow(&self, id: &str) -> Result<Option<WorkflowRecord>> {
286 let row = sqlx::query_as::<_, PgWorkflowRow>(
287 "SELECT id, namespace, run_id, workflow_type, task_queue, status, input, result, error, parent_id, claimed_by, created_at, updated_at, completed_at FROM workflows WHERE id = $1",
288 )
289 .bind(id)
290 .fetch_optional(&self.pool)
291 .await?;
292 Ok(row.map(Into::into))
293 }
294
295 async fn list_workflows(
296 &self,
297 namespace: &str,
298 status: Option<WorkflowStatus>,
299 workflow_type: Option<&str>,
300 limit: i64,
301 offset: i64,
302 ) -> Result<Vec<WorkflowRecord>> {
303 let status_str = status.map(|s| s.to_string());
304 let rows = sqlx::query_as::<_, PgWorkflowRow>(
305 "SELECT id, namespace, run_id, workflow_type, task_queue, status, input, result, error, parent_id, claimed_by, created_at, updated_at, completed_at
306 FROM workflows
307 WHERE namespace = $1
308 AND ($2::TEXT IS NULL OR status = $2)
309 AND ($3::TEXT IS NULL OR workflow_type = $3)
310 ORDER BY created_at DESC
311 LIMIT $4 OFFSET $5",
312 )
313 .bind(namespace)
314 .bind(&status_str)
315 .bind(workflow_type)
316 .bind(limit)
317 .bind(offset)
318 .fetch_all(&self.pool)
319 .await?;
320 Ok(rows.into_iter().map(Into::into).collect())
321 }
322
323 async fn update_workflow_status(
324 &self,
325 id: &str,
326 status: WorkflowStatus,
327 result: Option<&str>,
328 error: Option<&str>,
329 ) -> Result<()> {
330 let now = timestamp_now();
331 let completed_at = if status.is_terminal() { Some(now) } else { None };
332 sqlx::query(
333 "UPDATE workflows SET status = $1, result = COALESCE($2, result), error = COALESCE($3, error), updated_at = $4, completed_at = COALESCE($5, completed_at) WHERE id = $6",
334 )
335 .bind(status.to_string())
336 .bind(result)
337 .bind(error)
338 .bind(now)
339 .bind(completed_at)
340 .bind(id)
341 .execute(&self.pool)
342 .await?;
343 Ok(())
344 }
345
346 async fn claim_workflow(&self, id: &str, worker_id: &str) -> Result<bool> {
347 let res = sqlx::query(
348 "UPDATE workflows SET claimed_by = $1, status = 'RUNNING', updated_at = $2 WHERE id = $3 AND claimed_by IS NULL",
349 )
350 .bind(worker_id)
351 .bind(timestamp_now())
352 .bind(id)
353 .execute(&self.pool)
354 .await?;
355 Ok(res.rows_affected() > 0)
356 }
357
358 async fn mark_workflow_dispatchable(&self, workflow_id: &str) -> Result<()> {
359 sqlx::query("UPDATE workflows SET needs_dispatch = TRUE WHERE id = $1")
360 .bind(workflow_id)
361 .execute(&self.pool)
362 .await?;
363 Ok(())
364 }
365
366 async fn claim_workflow_task(
367 &self,
368 task_queue: &str,
369 worker_id: &str,
370 ) -> Result<Option<WorkflowRecord>> {
371 let now = timestamp_now();
372 let row = sqlx::query_as::<_, PgWorkflowRow>(
375 "UPDATE workflows
376 SET dispatch_claimed_by = $1, dispatch_last_heartbeat = $2, needs_dispatch = FALSE
377 WHERE id = (
378 SELECT id FROM workflows
379 WHERE task_queue = $3
380 AND needs_dispatch = TRUE
381 AND dispatch_claimed_by IS NULL
382 AND status NOT IN ('COMPLETED', 'FAILED', 'CANCELLED', 'TIMED_OUT')
383 ORDER BY updated_at ASC
384 FOR UPDATE SKIP LOCKED
385 LIMIT 1
386 )
387 RETURNING id, namespace, run_id, workflow_type, task_queue, status, input, result, error, parent_id, claimed_by, created_at, updated_at, completed_at",
388 )
389 .bind(worker_id)
390 .bind(now)
391 .bind(task_queue)
392 .fetch_optional(&self.pool)
393 .await?;
394 Ok(row.map(Into::into))
395 }
396
397 async fn release_workflow_task(&self, workflow_id: &str, worker_id: &str) -> Result<()> {
398 sqlx::query(
399 "UPDATE workflows
400 SET dispatch_claimed_by = NULL, dispatch_last_heartbeat = NULL
401 WHERE id = $1 AND dispatch_claimed_by = $2",
402 )
403 .bind(workflow_id)
404 .bind(worker_id)
405 .execute(&self.pool)
406 .await?;
407 Ok(())
408 }
409
410 async fn release_stale_dispatch_leases(
411 &self,
412 now: f64,
413 timeout_secs: f64,
414 ) -> Result<u64> {
415 let res = sqlx::query(
416 "UPDATE workflows
417 SET dispatch_claimed_by = NULL,
418 dispatch_last_heartbeat = NULL,
419 needs_dispatch = TRUE
420 WHERE dispatch_claimed_by IS NOT NULL
421 AND ($1 - dispatch_last_heartbeat) > $2
422 AND status NOT IN ('COMPLETED', 'FAILED', 'CANCELLED', 'TIMED_OUT')",
423 )
424 .bind(now)
425 .bind(timeout_secs)
426 .execute(&self.pool)
427 .await?;
428 Ok(res.rows_affected())
429 }
430
431 async fn append_event(&self, ev: &WorkflowEvent) -> Result<i64> {
434 let row: (i64,) = sqlx::query_as(
435 "INSERT INTO workflow_events (workflow_id, seq, event_type, payload, timestamp) VALUES ($1, $2, $3, $4, $5) RETURNING id",
436 )
437 .bind(&ev.workflow_id)
438 .bind(ev.seq)
439 .bind(&ev.event_type)
440 .bind(&ev.payload)
441 .bind(ev.timestamp)
442 .fetch_one(&self.pool)
443 .await?;
444 Ok(row.0)
445 }
446
447 async fn list_events(&self, workflow_id: &str) -> Result<Vec<WorkflowEvent>> {
448 let rows = sqlx::query_as::<_, PgEventRow>(
449 "SELECT id, workflow_id, seq, event_type, payload, timestamp FROM workflow_events WHERE workflow_id = $1 ORDER BY seq ASC",
450 )
451 .bind(workflow_id)
452 .fetch_all(&self.pool)
453 .await?;
454 Ok(rows.into_iter().map(Into::into).collect())
455 }
456
457 async fn get_event_count(&self, workflow_id: &str) -> Result<i64> {
458 let row: (i64,) =
459 sqlx::query_as("SELECT COUNT(*) FROM workflow_events WHERE workflow_id = $1")
460 .bind(workflow_id)
461 .fetch_one(&self.pool)
462 .await?;
463 Ok(row.0)
464 }
465
466 async fn create_activity(&self, act: &WorkflowActivity) -> Result<i64> {
469 let row: (i64,) = sqlx::query_as(
470 "INSERT INTO workflow_activities (workflow_id, seq, name, task_queue, input, status, attempt, max_attempts, initial_interval_secs, backoff_coefficient, start_to_close_secs, heartbeat_timeout_secs, scheduled_at)
471 VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13) RETURNING id",
472 )
473 .bind(&act.workflow_id)
474 .bind(act.seq)
475 .bind(&act.name)
476 .bind(&act.task_queue)
477 .bind(&act.input)
478 .bind(&act.status)
479 .bind(act.attempt)
480 .bind(act.max_attempts)
481 .bind(act.initial_interval_secs)
482 .bind(act.backoff_coefficient)
483 .bind(act.start_to_close_secs)
484 .bind(act.heartbeat_timeout_secs)
485 .bind(act.scheduled_at)
486 .fetch_one(&self.pool)
487 .await?;
488 Ok(row.0)
489 }
490
491 async fn get_activity(&self, id: i64) -> Result<Option<WorkflowActivity>> {
492 let row = sqlx::query_as::<_, PgActivityRow>(
493 "SELECT id, workflow_id, seq, name, task_queue, input, status, result, error, attempt, max_attempts, initial_interval_secs, backoff_coefficient, start_to_close_secs, heartbeat_timeout_secs, claimed_by, scheduled_at, started_at, completed_at, last_heartbeat
494 FROM workflow_activities WHERE id = $1",
495 )
496 .bind(id)
497 .fetch_optional(&self.pool)
498 .await?;
499 Ok(row.map(Into::into))
500 }
501
502 async fn get_activity_by_workflow_seq(
503 &self,
504 workflow_id: &str,
505 seq: i32,
506 ) -> Result<Option<WorkflowActivity>> {
507 let row = sqlx::query_as::<_, PgActivityRow>(
508 "SELECT id, workflow_id, seq, name, task_queue, input, status, result, error, attempt, max_attempts, initial_interval_secs, backoff_coefficient, start_to_close_secs, heartbeat_timeout_secs, claimed_by, scheduled_at, started_at, completed_at, last_heartbeat
509 FROM workflow_activities WHERE workflow_id = $1 AND seq = $2",
510 )
511 .bind(workflow_id)
512 .bind(seq)
513 .fetch_optional(&self.pool)
514 .await?;
515 Ok(row.map(Into::into))
516 }
517
518 async fn claim_activity(
519 &self,
520 task_queue: &str,
521 worker_id: &str,
522 ) -> Result<Option<WorkflowActivity>> {
523 let now = timestamp_now();
524 let row = sqlx::query_as::<_, PgActivityRow>(
527 "UPDATE workflow_activities SET status = 'RUNNING', claimed_by = $1, started_at = $2
528 WHERE id = (
529 SELECT id FROM workflow_activities
530 WHERE task_queue = $3 AND status = 'PENDING'
531 ORDER BY scheduled_at ASC
532 FOR UPDATE SKIP LOCKED
533 LIMIT 1
534 )
535 RETURNING id, workflow_id, seq, name, task_queue, input, status, result, error, attempt, max_attempts, initial_interval_secs, backoff_coefficient, start_to_close_secs, heartbeat_timeout_secs, claimed_by, scheduled_at, started_at, completed_at, last_heartbeat",
536 )
537 .bind(worker_id)
538 .bind(now)
539 .bind(task_queue)
540 .fetch_optional(&self.pool)
541 .await?;
542 Ok(row.map(Into::into))
543 }
544
545 async fn requeue_activity_for_retry(
546 &self,
547 id: i64,
548 next_attempt: i32,
549 next_scheduled_at: f64,
550 ) -> Result<()> {
551 sqlx::query(
552 "UPDATE workflow_activities
553 SET status = 'PENDING', attempt = $1, scheduled_at = $2,
554 claimed_by = NULL, started_at = NULL, last_heartbeat = NULL,
555 error = NULL
556 WHERE id = $3",
557 )
558 .bind(next_attempt)
559 .bind(next_scheduled_at)
560 .bind(id)
561 .execute(&self.pool)
562 .await?;
563 Ok(())
564 }
565
566 async fn complete_activity(
567 &self,
568 id: i64,
569 result: Option<&str>,
570 error: Option<&str>,
571 failed: bool,
572 ) -> Result<()> {
573 let status = if failed { "FAILED" } else { "COMPLETED" };
574 sqlx::query(
575 "UPDATE workflow_activities SET status = $1, result = $2, error = $3, completed_at = $4 WHERE id = $5",
576 )
577 .bind(status)
578 .bind(result)
579 .bind(error)
580 .bind(timestamp_now())
581 .bind(id)
582 .execute(&self.pool)
583 .await?;
584 Ok(())
585 }
586
587 async fn heartbeat_activity(&self, id: i64, _details: Option<&str>) -> Result<()> {
588 sqlx::query("UPDATE workflow_activities SET last_heartbeat = $1 WHERE id = $2")
589 .bind(timestamp_now())
590 .bind(id)
591 .execute(&self.pool)
592 .await?;
593 Ok(())
594 }
595
596 async fn get_timed_out_activities(&self, now: f64) -> Result<Vec<WorkflowActivity>> {
597 let rows = sqlx::query_as::<_, PgActivityRow>(
598 "SELECT id, workflow_id, seq, name, task_queue, input, status, result, error, attempt, max_attempts, initial_interval_secs, backoff_coefficient, start_to_close_secs, heartbeat_timeout_secs, claimed_by, scheduled_at, started_at, completed_at, last_heartbeat
599 FROM workflow_activities
600 WHERE status = 'RUNNING'
601 AND heartbeat_timeout_secs IS NOT NULL
602 AND last_heartbeat IS NOT NULL
603 AND ($1 - last_heartbeat) > heartbeat_timeout_secs",
604 )
605 .bind(now)
606 .fetch_all(&self.pool)
607 .await?;
608 Ok(rows.into_iter().map(Into::into).collect())
609 }
610
611 async fn create_timer(&self, timer: &WorkflowTimer) -> Result<i64> {
614 let row: (i64,) = sqlx::query_as(
615 "INSERT INTO workflow_timers (workflow_id, seq, fire_at, fired) VALUES ($1, $2, $3, FALSE) RETURNING id",
616 )
617 .bind(&timer.workflow_id)
618 .bind(timer.seq)
619 .bind(timer.fire_at)
620 .fetch_one(&self.pool)
621 .await?;
622 Ok(row.0)
623 }
624
625 async fn cancel_pending_activities(&self, workflow_id: &str) -> Result<u64> {
626 let res = sqlx::query(
627 "UPDATE workflow_activities SET status = 'CANCELLED', completed_at = $1
628 WHERE workflow_id = $2 AND status = 'PENDING'",
629 )
630 .bind(timestamp_now())
631 .bind(workflow_id)
632 .execute(&self.pool)
633 .await?;
634 Ok(res.rows_affected())
635 }
636
637 async fn cancel_pending_timers(&self, workflow_id: &str) -> Result<u64> {
638 let res = sqlx::query(
639 "UPDATE workflow_timers SET fired = TRUE
640 WHERE workflow_id = $1 AND fired = FALSE",
641 )
642 .bind(workflow_id)
643 .execute(&self.pool)
644 .await?;
645 Ok(res.rows_affected())
646 }
647
648 async fn get_timer_by_workflow_seq(
649 &self,
650 workflow_id: &str,
651 seq: i32,
652 ) -> Result<Option<WorkflowTimer>> {
653 let row = sqlx::query_as::<_, PgTimerRow>(
654 "SELECT id, workflow_id, seq, fire_at, fired
655 FROM workflow_timers WHERE workflow_id = $1 AND seq = $2",
656 )
657 .bind(workflow_id)
658 .bind(seq)
659 .fetch_optional(&self.pool)
660 .await?;
661 Ok(row.map(Into::into))
662 }
663
664 async fn fire_due_timers(&self, now: f64) -> Result<Vec<WorkflowTimer>> {
665 let rows = sqlx::query_as::<_, PgTimerRow>(
666 "UPDATE workflow_timers SET fired = TRUE
667 WHERE fired = FALSE AND fire_at <= $1
668 RETURNING id, workflow_id, seq, fire_at, fired",
669 )
670 .bind(now)
671 .fetch_all(&self.pool)
672 .await?;
673 Ok(rows.into_iter().map(Into::into).collect())
674 }
675
676 async fn send_signal(&self, sig: &WorkflowSignal) -> Result<i64> {
679 let row: (i64,) = sqlx::query_as(
680 "INSERT INTO workflow_signals (workflow_id, name, payload, consumed, received_at) VALUES ($1, $2, $3, FALSE, $4) RETURNING id",
681 )
682 .bind(&sig.workflow_id)
683 .bind(&sig.name)
684 .bind(&sig.payload)
685 .bind(sig.received_at)
686 .fetch_one(&self.pool)
687 .await?;
688 Ok(row.0)
689 }
690
691 async fn consume_signals(
692 &self,
693 workflow_id: &str,
694 name: &str,
695 ) -> Result<Vec<WorkflowSignal>> {
696 let rows = sqlx::query_as::<_, PgSignalRow>(
697 "UPDATE workflow_signals SET consumed = TRUE
698 WHERE workflow_id = $1 AND name = $2 AND consumed = FALSE
699 RETURNING id, workflow_id, name, payload, consumed, received_at",
700 )
701 .bind(workflow_id)
702 .bind(name)
703 .fetch_all(&self.pool)
704 .await?;
705 Ok(rows.into_iter().map(Into::into).collect())
706 }
707
708 async fn create_schedule(&self, sched: &WorkflowSchedule) -> Result<()> {
711 sqlx::query(
712 "INSERT INTO workflow_schedules (namespace, name, workflow_type, cron_expr, input, task_queue, overlap_policy, paused, last_run_at, next_run_at, last_workflow_id, created_at)
713 VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12)",
714 )
715 .bind(&sched.namespace)
716 .bind(&sched.name)
717 .bind(&sched.workflow_type)
718 .bind(&sched.cron_expr)
719 .bind(&sched.input)
720 .bind(&sched.task_queue)
721 .bind(&sched.overlap_policy)
722 .bind(sched.paused)
723 .bind(sched.last_run_at)
724 .bind(sched.next_run_at)
725 .bind(&sched.last_workflow_id)
726 .bind(sched.created_at)
727 .execute(&self.pool)
728 .await?;
729 Ok(())
730 }
731
732 async fn get_schedule(&self, namespace: &str, name: &str) -> Result<Option<WorkflowSchedule>> {
733 let row = sqlx::query_as::<_, PgScheduleRow>(
734 "SELECT namespace, name, workflow_type, cron_expr, input, task_queue, overlap_policy, paused, last_run_at, next_run_at, last_workflow_id, created_at FROM workflow_schedules WHERE namespace = $1 AND name = $2",
735 )
736 .bind(namespace)
737 .bind(name)
738 .fetch_optional(&self.pool)
739 .await?;
740 Ok(row.map(Into::into))
741 }
742
743 async fn list_schedules(&self, namespace: &str) -> Result<Vec<WorkflowSchedule>> {
744 let rows = sqlx::query_as::<_, PgScheduleRow>(
745 "SELECT namespace, name, workflow_type, cron_expr, input, task_queue, overlap_policy, paused, last_run_at, next_run_at, last_workflow_id, created_at FROM workflow_schedules WHERE namespace = $1 ORDER BY name",
746 )
747 .bind(namespace)
748 .fetch_all(&self.pool)
749 .await?;
750 Ok(rows.into_iter().map(Into::into).collect())
751 }
752
753 async fn update_schedule_last_run(
754 &self,
755 namespace: &str,
756 name: &str,
757 last_run_at: f64,
758 next_run_at: f64,
759 workflow_id: &str,
760 ) -> Result<()> {
761 sqlx::query(
762 "UPDATE workflow_schedules SET last_run_at = $1, next_run_at = $2, last_workflow_id = $3 WHERE namespace = $4 AND name = $5",
763 )
764 .bind(last_run_at)
765 .bind(next_run_at)
766 .bind(workflow_id)
767 .bind(namespace)
768 .bind(name)
769 .execute(&self.pool)
770 .await?;
771 Ok(())
772 }
773
774 async fn delete_schedule(&self, namespace: &str, name: &str) -> Result<bool> {
775 let res = sqlx::query("DELETE FROM workflow_schedules WHERE namespace = $1 AND name = $2")
776 .bind(namespace)
777 .bind(name)
778 .execute(&self.pool)
779 .await?;
780 Ok(res.rows_affected() > 0)
781 }
782
783 async fn register_worker(&self, w: &WorkflowWorker) -> Result<()> {
786 sqlx::query(
787 "INSERT INTO workflow_workers (id, namespace, identity, task_queue, workflows, activities, max_concurrent_workflows, max_concurrent_activities, active_tasks, last_heartbeat, registered_at)
788 VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11)
789 ON CONFLICT (id) DO UPDATE SET last_heartbeat = EXCLUDED.last_heartbeat, identity = EXCLUDED.identity",
790 )
791 .bind(&w.id)
792 .bind(&w.namespace)
793 .bind(&w.identity)
794 .bind(&w.task_queue)
795 .bind(&w.workflows)
796 .bind(&w.activities)
797 .bind(w.max_concurrent_workflows)
798 .bind(w.max_concurrent_activities)
799 .bind(w.active_tasks)
800 .bind(w.last_heartbeat)
801 .bind(w.registered_at)
802 .execute(&self.pool)
803 .await?;
804 Ok(())
805 }
806
807 async fn heartbeat_worker(&self, id: &str, now: f64) -> Result<()> {
808 sqlx::query("UPDATE workflow_workers SET last_heartbeat = $1 WHERE id = $2")
809 .bind(now)
810 .bind(id)
811 .execute(&self.pool)
812 .await?;
813 Ok(())
814 }
815
816 async fn list_workers(&self, namespace: &str) -> Result<Vec<WorkflowWorker>> {
817 let rows = sqlx::query_as::<_, PgWorkerRow>(
818 "SELECT id, namespace, identity, task_queue, workflows, activities, max_concurrent_workflows, max_concurrent_activities, active_tasks, last_heartbeat, registered_at FROM workflow_workers WHERE namespace = $1 ORDER BY registered_at",
819 )
820 .bind(namespace)
821 .fetch_all(&self.pool)
822 .await?;
823 Ok(rows.into_iter().map(Into::into).collect())
824 }
825
826 async fn remove_dead_workers(&self, cutoff: f64) -> Result<Vec<String>> {
827 let rows: Vec<(String,)> =
828 sqlx::query_as("SELECT id FROM workflow_workers WHERE last_heartbeat < $1")
829 .bind(cutoff)
830 .fetch_all(&self.pool)
831 .await?;
832 let ids: Vec<String> = rows.into_iter().map(|r| r.0).collect();
833 if !ids.is_empty() {
834 sqlx::query("DELETE FROM workflow_workers WHERE last_heartbeat < $1")
835 .bind(cutoff)
836 .execute(&self.pool)
837 .await?;
838 }
839 Ok(ids)
840 }
841
842 async fn create_api_key(
845 &self,
846 key_hash: &str,
847 prefix: &str,
848 label: Option<&str>,
849 created_at: f64,
850 ) -> Result<()> {
851 sqlx::query("INSERT INTO api_keys (key_hash, prefix, label, created_at) VALUES ($1, $2, $3, $4)")
852 .bind(key_hash)
853 .bind(prefix)
854 .bind(label)
855 .bind(created_at)
856 .execute(&self.pool)
857 .await?;
858 Ok(())
859 }
860
861 async fn validate_api_key(&self, key_hash: &str) -> Result<bool> {
862 let row: Option<(i64,)> =
863 sqlx::query_as("SELECT 1::BIGINT FROM api_keys WHERE key_hash = $1")
864 .bind(key_hash)
865 .fetch_optional(&self.pool)
866 .await?;
867 Ok(row.is_some())
868 }
869
870 async fn list_api_keys(&self) -> Result<Vec<crate::store::ApiKeyRecord>> {
871 let rows = sqlx::query_as::<_, (String, Option<String>, f64)>(
872 "SELECT prefix, label, created_at FROM api_keys ORDER BY created_at DESC",
873 )
874 .fetch_all(&self.pool)
875 .await?;
876 Ok(rows
877 .into_iter()
878 .map(|(prefix, label, created_at)| crate::store::ApiKeyRecord {
879 prefix,
880 label,
881 created_at,
882 })
883 .collect())
884 }
885
886 async fn revoke_api_key(&self, prefix: &str) -> Result<bool> {
887 let res = sqlx::query("DELETE FROM api_keys WHERE prefix = $1")
888 .bind(prefix)
889 .execute(&self.pool)
890 .await?;
891 Ok(res.rows_affected() > 0)
892 }
893
894 async fn list_child_workflows(&self, parent_id: &str) -> Result<Vec<WorkflowRecord>> {
897 let rows = sqlx::query_as::<_, PgWorkflowRow>(
898 "SELECT id, namespace, run_id, workflow_type, task_queue, status, input, result, error, parent_id, claimed_by, created_at, updated_at, completed_at
899 FROM workflows WHERE parent_id = $1 ORDER BY created_at ASC",
900 )
901 .bind(parent_id)
902 .fetch_all(&self.pool)
903 .await?;
904 Ok(rows.into_iter().map(Into::into).collect())
905 }
906
907 async fn create_snapshot(
910 &self,
911 workflow_id: &str,
912 event_seq: i32,
913 state_json: &str,
914 ) -> Result<()> {
915 sqlx::query(
916 "INSERT INTO workflow_snapshots (workflow_id, event_seq, state_json, created_at)
917 VALUES ($1, $2, $3, $4)
918 ON CONFLICT (workflow_id, event_seq) DO UPDATE SET state_json = EXCLUDED.state_json, created_at = EXCLUDED.created_at",
919 )
920 .bind(workflow_id)
921 .bind(event_seq)
922 .bind(state_json)
923 .bind(timestamp_now())
924 .execute(&self.pool)
925 .await?;
926 Ok(())
927 }
928
929 async fn get_latest_snapshot(
930 &self,
931 workflow_id: &str,
932 ) -> Result<Option<WorkflowSnapshot>> {
933 let row = sqlx::query_as::<_, (String, i32, String, f64)>(
934 "SELECT workflow_id, event_seq, state_json, created_at
935 FROM workflow_snapshots WHERE workflow_id = $1
936 ORDER BY event_seq DESC LIMIT 1",
937 )
938 .bind(workflow_id)
939 .fetch_optional(&self.pool)
940 .await?;
941
942 Ok(row.map(|(workflow_id, event_seq, state_json, created_at)| WorkflowSnapshot {
943 workflow_id,
944 event_seq,
945 state_json,
946 created_at,
947 }))
948 }
949
950 async fn get_queue_stats(&self, namespace: &str) -> Result<Vec<crate::store::QueueStats>> {
953 let rows = sqlx::query_as::<_, (String, i64, i64, i64)>(
954 "SELECT
955 a.task_queue AS queue,
956 SUM(CASE WHEN a.status = 'PENDING' THEN 1 ELSE 0 END) AS pending,
957 SUM(CASE WHEN a.status = 'RUNNING' THEN 1 ELSE 0 END) AS running,
958 (SELECT COUNT(*) FROM workflow_workers w WHERE w.task_queue = a.task_queue AND w.namespace = $1) AS workers
959 FROM workflow_activities a
960 JOIN workflows wf ON a.workflow_id = wf.id AND wf.namespace = $1
961 GROUP BY a.task_queue",
962 )
963 .bind(namespace)
964 .fetch_all(&self.pool)
965 .await?;
966
967 Ok(rows
968 .into_iter()
969 .map(|(queue, pending, running, workers)| crate::store::QueueStats {
970 queue,
971 pending_activities: pending,
972 running_activities: running,
973 workers,
974 })
975 .collect())
976 }
977
978 async fn try_acquire_scheduler_lock(&self) -> Result<bool> {
981 let row: (bool,) =
985 sqlx::query_as("SELECT pg_try_advisory_lock(42)")
986 .fetch_one(&self.pool)
987 .await?;
988 Ok(row.0)
989 }
990}
991
/// Current wall-clock time as fractional seconds since the Unix epoch.
///
/// # Panics
/// Panics if the system clock reports a time before the Unix epoch.
fn timestamp_now() -> f64 {
    use std::time::{SystemTime, UNIX_EPOCH};

    let since_epoch = SystemTime::now().duration_since(UNIX_EPOCH).unwrap();
    since_epoch.as_secs_f64()
}
998
999#[derive(sqlx::FromRow)]
1002struct PgWorkflowRow {
1003 id: String,
1004 namespace: String,
1005 run_id: String,
1006 workflow_type: String,
1007 task_queue: String,
1008 status: String,
1009 input: Option<String>,
1010 result: Option<String>,
1011 error: Option<String>,
1012 parent_id: Option<String>,
1013 claimed_by: Option<String>,
1014 created_at: f64,
1015 updated_at: f64,
1016 completed_at: Option<f64>,
1017}
1018
1019impl From<PgWorkflowRow> for WorkflowRecord {
1020 fn from(r: PgWorkflowRow) -> Self {
1021 Self {
1022 id: r.id,
1023 namespace: r.namespace,
1024 run_id: r.run_id,
1025 workflow_type: r.workflow_type,
1026 task_queue: r.task_queue,
1027 status: r.status,
1028 input: r.input,
1029 result: r.result,
1030 error: r.error,
1031 parent_id: r.parent_id,
1032 claimed_by: r.claimed_by,
1033 created_at: r.created_at,
1034 updated_at: r.updated_at,
1035 completed_at: r.completed_at,
1036 }
1037 }
1038}
1039
1040#[derive(sqlx::FromRow)]
1041struct PgEventRow {
1042 id: i64,
1043 workflow_id: String,
1044 seq: i32,
1045 event_type: String,
1046 payload: Option<String>,
1047 timestamp: f64,
1048}
1049
1050impl From<PgEventRow> for WorkflowEvent {
1051 fn from(r: PgEventRow) -> Self {
1052 Self {
1053 id: Some(r.id),
1054 workflow_id: r.workflow_id,
1055 seq: r.seq,
1056 event_type: r.event_type,
1057 payload: r.payload,
1058 timestamp: r.timestamp,
1059 }
1060 }
1061}
1062
1063#[derive(sqlx::FromRow)]
1064struct PgActivityRow {
1065 id: i64,
1066 workflow_id: String,
1067 seq: i32,
1068 name: String,
1069 task_queue: String,
1070 input: Option<String>,
1071 status: String,
1072 result: Option<String>,
1073 error: Option<String>,
1074 attempt: i32,
1075 max_attempts: i32,
1076 initial_interval_secs: f64,
1077 backoff_coefficient: f64,
1078 start_to_close_secs: f64,
1079 heartbeat_timeout_secs: Option<f64>,
1080 claimed_by: Option<String>,
1081 scheduled_at: f64,
1082 started_at: Option<f64>,
1083 completed_at: Option<f64>,
1084 last_heartbeat: Option<f64>,
1085}
1086
1087impl From<PgActivityRow> for WorkflowActivity {
1088 fn from(r: PgActivityRow) -> Self {
1089 Self {
1090 id: Some(r.id),
1091 workflow_id: r.workflow_id,
1092 seq: r.seq,
1093 name: r.name,
1094 task_queue: r.task_queue,
1095 input: r.input,
1096 status: r.status,
1097 result: r.result,
1098 error: r.error,
1099 attempt: r.attempt,
1100 max_attempts: r.max_attempts,
1101 initial_interval_secs: r.initial_interval_secs,
1102 backoff_coefficient: r.backoff_coefficient,
1103 start_to_close_secs: r.start_to_close_secs,
1104 heartbeat_timeout_secs: r.heartbeat_timeout_secs,
1105 claimed_by: r.claimed_by,
1106 scheduled_at: r.scheduled_at,
1107 started_at: r.started_at,
1108 completed_at: r.completed_at,
1109 last_heartbeat: r.last_heartbeat,
1110 }
1111 }
1112}
1113
1114#[derive(sqlx::FromRow)]
1115struct PgTimerRow {
1116 id: i64,
1117 workflow_id: String,
1118 seq: i32,
1119 fire_at: f64,
1120 fired: bool,
1121}
1122
1123impl From<PgTimerRow> for WorkflowTimer {
1124 fn from(r: PgTimerRow) -> Self {
1125 Self {
1126 id: Some(r.id),
1127 workflow_id: r.workflow_id,
1128 seq: r.seq,
1129 fire_at: r.fire_at,
1130 fired: r.fired,
1131 }
1132 }
1133}
1134
1135#[derive(sqlx::FromRow)]
1136struct PgSignalRow {
1137 id: i64,
1138 workflow_id: String,
1139 name: String,
1140 payload: Option<String>,
1141 consumed: bool,
1142 received_at: f64,
1143}
1144
1145impl From<PgSignalRow> for WorkflowSignal {
1146 fn from(r: PgSignalRow) -> Self {
1147 Self {
1148 id: Some(r.id),
1149 workflow_id: r.workflow_id,
1150 name: r.name,
1151 payload: r.payload,
1152 consumed: r.consumed,
1153 received_at: r.received_at,
1154 }
1155 }
1156}
1157
1158#[derive(sqlx::FromRow)]
1159struct PgScheduleRow {
1160 namespace: String,
1161 name: String,
1162 workflow_type: String,
1163 cron_expr: String,
1164 input: Option<String>,
1165 task_queue: String,
1166 overlap_policy: String,
1167 paused: bool,
1168 last_run_at: Option<f64>,
1169 next_run_at: Option<f64>,
1170 last_workflow_id: Option<String>,
1171 created_at: f64,
1172}
1173
1174impl From<PgScheduleRow> for WorkflowSchedule {
1175 fn from(r: PgScheduleRow) -> Self {
1176 Self {
1177 namespace: r.namespace,
1178 name: r.name,
1179 workflow_type: r.workflow_type,
1180 cron_expr: r.cron_expr,
1181 input: r.input,
1182 task_queue: r.task_queue,
1183 overlap_policy: r.overlap_policy,
1184 paused: r.paused,
1185 last_run_at: r.last_run_at,
1186 next_run_at: r.next_run_at,
1187 last_workflow_id: r.last_workflow_id,
1188 created_at: r.created_at,
1189 }
1190 }
1191}
1192
1193#[derive(sqlx::FromRow)]
1194struct PgWorkerRow {
1195 id: String,
1196 namespace: String,
1197 identity: String,
1198 task_queue: String,
1199 workflows: Option<String>,
1200 activities: Option<String>,
1201 max_concurrent_workflows: i32,
1202 max_concurrent_activities: i32,
1203 active_tasks: i32,
1204 last_heartbeat: f64,
1205 registered_at: f64,
1206}
1207
1208impl From<PgWorkerRow> for WorkflowWorker {
1209 fn from(r: PgWorkerRow) -> Self {
1210 Self {
1211 id: r.id,
1212 namespace: r.namespace,
1213 identity: r.identity,
1214 task_queue: r.task_queue,
1215 workflows: r.workflows,
1216 activities: r.activities,
1217 max_concurrent_workflows: r.max_concurrent_workflows,
1218 max_concurrent_activities: r.max_concurrent_activities,
1219 active_tasks: r.active_tasks,
1220 last_heartbeat: r.last_heartbeat,
1221 registered_at: r.registered_at,
1222 }
1223 }
1224}