1use anyhow::Result;
2use sqlx::PgPool;
3
4use crate::store::WorkflowStore;
5use crate::types::*;
6
/// Idempotent Postgres schema, run statement-by-statement at startup (see
/// `sanitise_schema` / `PostgresStore::migrate`). Every DDL statement uses
/// `IF NOT EXISTS` and the seed insert uses `ON CONFLICT DO NOTHING`, so
/// re-executing the whole script on each boot is safe.
///
/// Conventions: all timestamps are DOUBLE PRECISION Unix epoch seconds;
/// JSON payloads (input/result/search_attributes/…) are stored as TEXT.
const SCHEMA: &str = r#"
CREATE TABLE IF NOT EXISTS namespaces (
    name TEXT PRIMARY KEY,
    created_at DOUBLE PRECISION NOT NULL
);
INSERT INTO namespaces (name, created_at)
    VALUES ('main', EXTRACT(EPOCH FROM NOW()))
    ON CONFLICT DO NOTHING;

CREATE TABLE IF NOT EXISTS workflows (
    id TEXT PRIMARY KEY,
    namespace TEXT NOT NULL DEFAULT 'main',
    run_id TEXT NOT NULL,
    workflow_type TEXT NOT NULL,
    task_queue TEXT NOT NULL DEFAULT 'main',
    status TEXT NOT NULL DEFAULT 'PENDING',
    input TEXT,
    result TEXT,
    error TEXT,
    parent_id TEXT,
    claimed_by TEXT,
    search_attributes TEXT,
    archived_at DOUBLE PRECISION,
    archive_uri TEXT,
    -- Workflow-task dispatch (Phase 9): see sqlite.rs for the full comment.
    needs_dispatch BOOLEAN NOT NULL DEFAULT FALSE,
    dispatch_claimed_by TEXT,
    dispatch_last_heartbeat DOUBLE PRECISION,
    created_at DOUBLE PRECISION NOT NULL,
    updated_at DOUBLE PRECISION NOT NULL,
    completed_at DOUBLE PRECISION
);
CREATE INDEX IF NOT EXISTS idx_wf_status_queue ON workflows(status, task_queue);
CREATE INDEX IF NOT EXISTS idx_wf_namespace ON workflows(namespace);
CREATE INDEX IF NOT EXISTS idx_wf_dispatch ON workflows(task_queue, needs_dispatch, dispatch_claimed_by);

CREATE TABLE IF NOT EXISTS workflow_events (
    id BIGSERIAL PRIMARY KEY,
    workflow_id TEXT NOT NULL REFERENCES workflows(id),
    seq INTEGER NOT NULL,
    event_type TEXT NOT NULL,
    payload TEXT,
    timestamp DOUBLE PRECISION NOT NULL
);
CREATE INDEX IF NOT EXISTS idx_wf_events_lookup ON workflow_events(workflow_id, seq);

CREATE TABLE IF NOT EXISTS workflow_activities (
    id BIGSERIAL PRIMARY KEY,
    workflow_id TEXT NOT NULL REFERENCES workflows(id),
    seq INTEGER NOT NULL,
    name TEXT NOT NULL,
    task_queue TEXT NOT NULL DEFAULT 'main',
    input TEXT,
    status TEXT NOT NULL DEFAULT 'PENDING',
    result TEXT,
    error TEXT,
    attempt INTEGER NOT NULL DEFAULT 1,
    max_attempts INTEGER NOT NULL DEFAULT 3,
    initial_interval_secs DOUBLE PRECISION NOT NULL DEFAULT 1,
    backoff_coefficient DOUBLE PRECISION NOT NULL DEFAULT 2,
    start_to_close_secs DOUBLE PRECISION NOT NULL DEFAULT 300,
    heartbeat_timeout_secs DOUBLE PRECISION,
    claimed_by TEXT,
    scheduled_at DOUBLE PRECISION NOT NULL,
    started_at DOUBLE PRECISION,
    completed_at DOUBLE PRECISION,
    last_heartbeat DOUBLE PRECISION,
    UNIQUE (workflow_id, seq)
);
CREATE INDEX IF NOT EXISTS idx_wf_act_pending ON workflow_activities(task_queue, status, scheduled_at);

CREATE TABLE IF NOT EXISTS workflow_timers (
    id BIGSERIAL PRIMARY KEY,
    workflow_id TEXT NOT NULL REFERENCES workflows(id),
    seq INTEGER NOT NULL,
    fire_at DOUBLE PRECISION NOT NULL,
    fired BOOLEAN NOT NULL DEFAULT FALSE,
    UNIQUE (workflow_id, seq)
);
CREATE INDEX IF NOT EXISTS idx_wf_timers_due ON workflow_timers(fire_at) WHERE fired = FALSE;

CREATE TABLE IF NOT EXISTS workflow_signals (
    id BIGSERIAL PRIMARY KEY,
    workflow_id TEXT NOT NULL REFERENCES workflows(id),
    name TEXT NOT NULL,
    payload TEXT,
    consumed BOOLEAN NOT NULL DEFAULT FALSE,
    received_at DOUBLE PRECISION NOT NULL
);
CREATE INDEX IF NOT EXISTS idx_wf_signals_lookup ON workflow_signals(workflow_id, name, consumed);

CREATE TABLE IF NOT EXISTS workflow_schedules (
    namespace TEXT NOT NULL DEFAULT 'main',
    name TEXT NOT NULL,
    workflow_type TEXT NOT NULL,
    cron_expr TEXT NOT NULL,
    timezone TEXT NOT NULL DEFAULT 'UTC',
    input TEXT,
    task_queue TEXT NOT NULL DEFAULT 'main',
    overlap_policy TEXT NOT NULL DEFAULT 'skip',
    paused BOOLEAN NOT NULL DEFAULT FALSE,
    last_run_at DOUBLE PRECISION,
    next_run_at DOUBLE PRECISION,
    last_workflow_id TEXT,
    created_at DOUBLE PRECISION NOT NULL,
    PRIMARY KEY (namespace, name)
);

CREATE TABLE IF NOT EXISTS workflow_workers (
    id TEXT PRIMARY KEY,
    namespace TEXT NOT NULL DEFAULT 'main',
    identity TEXT NOT NULL,
    task_queue TEXT NOT NULL,
    workflows TEXT,
    activities TEXT,
    max_concurrent_workflows INTEGER NOT NULL DEFAULT 10,
    max_concurrent_activities INTEGER NOT NULL DEFAULT 10,
    active_tasks INTEGER NOT NULL DEFAULT 0,
    last_heartbeat DOUBLE PRECISION NOT NULL,
    registered_at DOUBLE PRECISION NOT NULL
);

CREATE TABLE IF NOT EXISTS workflow_snapshots (
    workflow_id TEXT NOT NULL REFERENCES workflows(id),
    event_seq INTEGER NOT NULL,
    state_json TEXT NOT NULL,
    created_at DOUBLE PRECISION NOT NULL,
    PRIMARY KEY (workflow_id, event_seq)
);

CREATE TABLE IF NOT EXISTS api_keys (
    key_hash TEXT PRIMARY KEY,
    prefix TEXT NOT NULL,
    label TEXT,
    created_at DOUBLE PRECISION NOT NULL
);
CREATE INDEX IF NOT EXISTS idx_api_keys_prefix ON api_keys(prefix);

-- Future additive migrations go below this line. Postgres supports
-- `ADD COLUMN IF NOT EXISTS` natively, so the pattern is simply:
--
-- ALTER TABLE workflows ADD COLUMN IF NOT EXISTS some_new_field TEXT;
--
-- Idempotent across startups; fresh installs pick the column up from the
-- CREATE TABLE above so the ADD is a no-op. Currently no pending
-- migrations — baseline schema in CREATE TABLE statements above is the
-- source of truth through v0.11.3.
"#;
155
/// Splits a multi-statement SQL script into individual executable
/// statements: SQL line comments (`--` lines) are removed first, then the
/// remainder is split on `;`, trimmed, and empty fragments discarded.
///
/// NOTE(review): this is a naive splitter — it assumes no statement embeds
/// a semicolon inside a string literal, which holds for `SCHEMA` above.
fn sanitise_schema(schema: &str) -> Vec<String> {
    // Drop whole-line comments before splitting on statement terminators.
    let code_only = schema
        .lines()
        .filter(|line| !line.trim_start().starts_with("--"))
        .collect::<Vec<_>>()
        .join("\n");

    code_only
        .split(';')
        .filter_map(|fragment| {
            let fragment = fragment.trim();
            if fragment.is_empty() {
                None
            } else {
                Some(fragment.to_string())
            }
        })
        .collect()
}
180
/// Postgres-backed implementation of [`WorkflowStore`] built on an sqlx
/// connection pool.
pub struct PostgresStore {
    // Shared connection pool; cloned cheaply into each query call.
    pool: PgPool,
}
184
impl PostgresStore {
    /// Connects to `url`, runs the idempotent schema migration, and
    /// returns a ready-to-use store.
    pub async fn new(url: &str) -> Result<Self> {
        let pool = PgPool::connect(url).await?;
        let store = Self { pool };
        store.migrate().await?;
        Ok(store)
    }

    /// Executes every statement of `SCHEMA` in order. All statements are
    /// `IF NOT EXISTS` / `ON CONFLICT DO NOTHING`, so this is safe to run
    /// on every startup.
    async fn migrate(&self) -> Result<()> {
        for statement in sanitise_schema(SCHEMA) {
            sqlx::query(&statement).execute(&self.pool).await?;
        }
        Ok(())
    }

    /// Attempts to take session-level advisory lock 1; returns whether it
    /// was acquired.
    ///
    /// NOTE(review): pg advisory locks are bound to the *session* that ran
    /// the query — here, an arbitrary pooled connection — and are released
    /// when that session closes. Presumably the pool keeps the connection
    /// alive for the process lifetime; confirm, or hold a dedicated
    /// connection for leadership.
    pub async fn try_acquire_leader_lock(&self) -> Result<bool> {
        let row: (bool,) =
            sqlx::query_as("SELECT pg_try_advisory_lock(1)")
                .fetch_one(&self.pool)
                .await?;
        Ok(row.0)
    }
}
210
211impl WorkflowStore for PostgresStore {
212 async fn create_namespace(&self, name: &str) -> Result<()> {
215 sqlx::query("INSERT INTO namespaces (name, created_at) VALUES ($1, EXTRACT(EPOCH FROM NOW()))")
216 .bind(name)
217 .execute(&self.pool)
218 .await?;
219 Ok(())
220 }
221
222 async fn list_namespaces(&self) -> Result<Vec<crate::store::NamespaceRecord>> {
223 let rows = sqlx::query_as::<_, (String, f64)>(
224 "SELECT name, created_at FROM namespaces ORDER BY name",
225 )
226 .fetch_all(&self.pool)
227 .await?;
228 Ok(rows
229 .into_iter()
230 .map(|(name, created_at)| crate::store::NamespaceRecord { name, created_at })
231 .collect())
232 }
233
234 async fn delete_namespace(&self, name: &str) -> Result<bool> {
235 let res = sqlx::query("DELETE FROM namespaces WHERE name = $1 AND name != 'main'")
236 .bind(name)
237 .execute(&self.pool)
238 .await?;
239 Ok(res.rows_affected() > 0)
240 }
241
242 async fn get_namespace_stats(&self, namespace: &str) -> Result<crate::store::NamespaceStats> {
243 let total: (i64,) = sqlx::query_as("SELECT COUNT(*) FROM workflows WHERE namespace = $1")
244 .bind(namespace)
245 .fetch_one(&self.pool)
246 .await?;
247 let running: (i64,) = sqlx::query_as(
248 "SELECT COUNT(*) FROM workflows WHERE namespace = $1 AND status = 'RUNNING'",
249 )
250 .bind(namespace)
251 .fetch_one(&self.pool)
252 .await?;
253 let pending: (i64,) = sqlx::query_as(
254 "SELECT COUNT(*) FROM workflows WHERE namespace = $1 AND status = 'PENDING'",
255 )
256 .bind(namespace)
257 .fetch_one(&self.pool)
258 .await?;
259 let completed: (i64,) = sqlx::query_as(
260 "SELECT COUNT(*) FROM workflows WHERE namespace = $1 AND status = 'COMPLETED'",
261 )
262 .bind(namespace)
263 .fetch_one(&self.pool)
264 .await?;
265 let failed: (i64,) = sqlx::query_as(
266 "SELECT COUNT(*) FROM workflows WHERE namespace = $1 AND status = 'FAILED'",
267 )
268 .bind(namespace)
269 .fetch_one(&self.pool)
270 .await?;
271 let schedules: (i64,) =
272 sqlx::query_as("SELECT COUNT(*) FROM workflow_schedules WHERE namespace = $1")
273 .bind(namespace)
274 .fetch_one(&self.pool)
275 .await?;
276 let workers: (i64,) =
277 sqlx::query_as("SELECT COUNT(*) FROM workflow_workers WHERE namespace = $1")
278 .bind(namespace)
279 .fetch_one(&self.pool)
280 .await?;
281
282 Ok(crate::store::NamespaceStats {
283 namespace: namespace.to_string(),
284 total_workflows: total.0,
285 running: running.0,
286 pending: pending.0,
287 completed: completed.0,
288 failed: failed.0,
289 schedules: schedules.0,
290 workers: workers.0,
291 })
292 }
293
    /// Inserts a new workflow row.
    ///
    /// Bind order must match the column list exactly ($1..$17). The
    /// dispatch columns (needs_dispatch, dispatch_claimed_by,
    /// dispatch_last_heartbeat) are intentionally omitted and take their
    /// schema defaults.
    async fn create_workflow(&self, wf: &WorkflowRecord) -> Result<()> {
        sqlx::query(
            "INSERT INTO workflows (id, namespace, run_id, workflow_type, task_queue, status, input, result, error, parent_id, claimed_by, search_attributes, archived_at, archive_uri, created_at, updated_at, completed_at)
             VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16, $17)",
        )
        .bind(&wf.id)
        .bind(&wf.namespace)
        .bind(&wf.run_id)
        .bind(&wf.workflow_type)
        .bind(&wf.task_queue)
        .bind(&wf.status)
        .bind(&wf.input)
        .bind(&wf.result)
        .bind(&wf.error)
        .bind(&wf.parent_id)
        .bind(&wf.claimed_by)
        .bind(&wf.search_attributes)
        .bind(wf.archived_at)
        .bind(&wf.archive_uri)
        .bind(wf.created_at)
        .bind(wf.updated_at)
        .bind(wf.completed_at)
        .execute(&self.pool)
        .await?;
        Ok(())
    }
322
323 async fn get_workflow(&self, id: &str) -> Result<Option<WorkflowRecord>> {
324 let row = sqlx::query_as::<_, PgWorkflowRow>(
325 "SELECT id, namespace, run_id, workflow_type, task_queue, status, input, result, error, parent_id, claimed_by, search_attributes, archived_at, archive_uri, created_at, updated_at, completed_at FROM workflows WHERE id = $1",
326 )
327 .bind(id)
328 .fetch_optional(&self.pool)
329 .await?;
330 Ok(row.map(Into::into))
331 }
332
    /// Lists workflows in a namespace, newest first, with optional status,
    /// type and search-attribute filters plus pagination.
    ///
    /// The SQL is built dynamically: $1..$3 are fixed (namespace, status,
    /// workflow_type — the latter two nullable with `IS NULL OR` guards),
    /// then one `($n, $n+1)` key/value placeholder pair per search-attr
    /// filter, then LIMIT/OFFSET. The bind calls below MUST stay in the
    /// same order the placeholders are appended.
    async fn list_workflows(
        &self,
        namespace: &str,
        status: Option<WorkflowStatus>,
        workflow_type: Option<&str>,
        search_attrs_filter: Option<&str>,
        limit: i64,
        offset: i64,
    ) -> Result<Vec<WorkflowRecord>> {
        let status_str = status.map(|s| s.to_string());

        // Parse the filter JSON; anything that is not a JSON object is
        // silently treated as "no filter".
        let filter_pairs: Vec<(String, serde_json::Value)> = search_attrs_filter
            .and_then(|s| serde_json::from_str::<serde_json::Value>(s).ok())
            .and_then(|v| v.as_object().cloned())
            .map(|m| m.into_iter().collect())
            .unwrap_or_default();

        let mut sql = String::from(
            "SELECT id, namespace, run_id, workflow_type, task_queue, status, input, result, error, parent_id, claimed_by, search_attributes, archived_at, archive_uri, created_at, updated_at, completed_at
             FROM workflows
             WHERE namespace = $1
               AND ($2::TEXT IS NULL OR status = $2)
               AND ($3::TEXT IS NULL OR workflow_type = $3)",
        );
        // Next free placeholder index; two placeholders per filter pair
        // (key for ->>, value for the comparison).
        let mut idx = 4usize;
        for _ in &filter_pairs {
            sql.push_str(&format!(
                " AND (search_attributes::jsonb)->>${} = ${}",
                idx,
                idx + 1
            ));
            idx += 2;
        }
        sql.push_str(&format!(" ORDER BY created_at DESC LIMIT ${} OFFSET ${}", idx, idx + 1));

        let mut q = sqlx::query_as::<_, PgWorkflowRow>(&sql)
            .bind(namespace)
            .bind(&status_str)
            .bind(workflow_type);
        for (key, value) in &filter_pairs {
            q = q.bind(key.clone());
            // JSON strings compare by their raw contents; any other JSON
            // value is compared by its serialized form.
            let as_text = match value {
                serde_json::Value::String(s) => s.clone(),
                other => other.to_string(),
            };
            q = q.bind(as_text);
        }
        let rows = q.bind(limit).bind(offset).fetch_all(&self.pool).await?;
        Ok(rows.into_iter().map(Into::into).collect())
    }
385
    /// Transitions a workflow to `status`, stamping `updated_at` and —
    /// when the new status is terminal — `completed_at`.
    ///
    /// COALESCE semantics: passing `None` for `result`/`error` preserves
    /// any previously stored value instead of nulling it out; likewise an
    /// existing `completed_at` is kept if the row was already terminal.
    async fn update_workflow_status(
        &self,
        id: &str,
        status: WorkflowStatus,
        result: Option<&str>,
        error: Option<&str>,
    ) -> Result<()> {
        let now = timestamp_now();
        let completed_at = if status.is_terminal() { Some(now) } else { None };
        sqlx::query(
            "UPDATE workflows SET status = $1, result = COALESCE($2, result), error = COALESCE($3, error), updated_at = $4, completed_at = COALESCE($5, completed_at) WHERE id = $6",
        )
        .bind(status.to_string())
        .bind(result)
        .bind(error)
        .bind(now)
        .bind(completed_at)
        .bind(id)
        .execute(&self.pool)
        .await?;
        Ok(())
    }
408
    /// Atomically claims an unclaimed workflow for `worker_id`, moving it
    /// to RUNNING. Returns `false` if the row was missing or already
    /// claimed (the `claimed_by IS NULL` guard failed).
    ///
    /// NOTE(review): the guard checks only `claimed_by`, not the current
    /// status — a terminal workflow whose claim was never set could be
    /// flipped back to RUNNING. Confirm callers never pass such ids.
    async fn claim_workflow(&self, id: &str, worker_id: &str) -> Result<bool> {
        let res = sqlx::query(
            "UPDATE workflows SET claimed_by = $1, status = 'RUNNING', updated_at = $2 WHERE id = $3 AND claimed_by IS NULL",
        )
        .bind(worker_id)
        .bind(timestamp_now())
        .bind(id)
        .execute(&self.pool)
        .await?;
        Ok(res.rows_affected() > 0)
    }
420
421 async fn mark_workflow_dispatchable(&self, workflow_id: &str) -> Result<()> {
422 sqlx::query("UPDATE workflows SET needs_dispatch = TRUE WHERE id = $1")
423 .bind(workflow_id)
424 .execute(&self.pool)
425 .await?;
426 Ok(())
427 }
428
    /// Claims at most one dispatchable workflow task from `task_queue`.
    ///
    /// Uses the UPDATE-over-`SELECT ... FOR UPDATE SKIP LOCKED` pattern so
    /// concurrent pollers never grab the same row. Candidates are unclaimed,
    /// flagged `needs_dispatch`, non-terminal rows; the oldest `updated_at`
    /// wins. Claiming clears the flag and starts the lease heartbeat.
    /// Returns `None` when the queue has nothing to dispatch.
    async fn claim_workflow_task(
        &self,
        task_queue: &str,
        worker_id: &str,
    ) -> Result<Option<WorkflowRecord>> {
        let now = timestamp_now();
        let row = sqlx::query_as::<_, PgWorkflowRow>(
            "UPDATE workflows
             SET dispatch_claimed_by = $1, dispatch_last_heartbeat = $2, needs_dispatch = FALSE
             WHERE id = (
                 SELECT id FROM workflows
                 WHERE task_queue = $3
                   AND needs_dispatch = TRUE
                   AND dispatch_claimed_by IS NULL
                   AND status NOT IN ('COMPLETED', 'FAILED', 'CANCELLED', 'TIMED_OUT')
                 ORDER BY updated_at ASC
                 FOR UPDATE SKIP LOCKED
                 LIMIT 1
             )
             RETURNING id, namespace, run_id, workflow_type, task_queue, status, input, result, error, parent_id, claimed_by, search_attributes, archived_at, archive_uri, created_at, updated_at, completed_at",
        )
        .bind(worker_id)
        .bind(now)
        .bind(task_queue)
        .fetch_optional(&self.pool)
        .await?;
        Ok(row.map(Into::into))
    }
459
460 async fn release_workflow_task(&self, workflow_id: &str, worker_id: &str) -> Result<()> {
461 sqlx::query(
462 "UPDATE workflows
463 SET dispatch_claimed_by = NULL, dispatch_last_heartbeat = NULL
464 WHERE id = $1 AND dispatch_claimed_by = $2",
465 )
466 .bind(workflow_id)
467 .bind(worker_id)
468 .execute(&self.pool)
469 .await?;
470 Ok(())
471 }
472
    /// Reclaims dispatch leases whose last heartbeat is older than
    /// `timeout_secs`, re-flagging those workflows for dispatch. Returns
    /// the number of leases released.
    ///
    /// Rows with a NULL `dispatch_last_heartbeat` are left untouched (the
    /// SQL comparison evaluates to NULL, so the predicate fails); claims
    /// always set the heartbeat, so such rows shouldn't normally exist.
    async fn release_stale_dispatch_leases(
        &self,
        now: f64,
        timeout_secs: f64,
    ) -> Result<u64> {
        let res = sqlx::query(
            "UPDATE workflows
             SET dispatch_claimed_by = NULL,
                 dispatch_last_heartbeat = NULL,
                 needs_dispatch = TRUE
             WHERE dispatch_claimed_by IS NOT NULL
               AND ($1 - dispatch_last_heartbeat) > $2
               AND status NOT IN ('COMPLETED', 'FAILED', 'CANCELLED', 'TIMED_OUT')",
        )
        .bind(now)
        .bind(timeout_secs)
        .execute(&self.pool)
        .await?;
        Ok(res.rows_affected())
    }
493
494 async fn append_event(&self, ev: &WorkflowEvent) -> Result<i64> {
497 let row: (i64,) = sqlx::query_as(
498 "INSERT INTO workflow_events (workflow_id, seq, event_type, payload, timestamp) VALUES ($1, $2, $3, $4, $5) RETURNING id",
499 )
500 .bind(&ev.workflow_id)
501 .bind(ev.seq)
502 .bind(&ev.event_type)
503 .bind(&ev.payload)
504 .bind(ev.timestamp)
505 .fetch_one(&self.pool)
506 .await?;
507 Ok(row.0)
508 }
509
510 async fn list_events(&self, workflow_id: &str) -> Result<Vec<WorkflowEvent>> {
511 let rows = sqlx::query_as::<_, PgEventRow>(
512 "SELECT id, workflow_id, seq, event_type, payload, timestamp FROM workflow_events WHERE workflow_id = $1 ORDER BY seq ASC",
513 )
514 .bind(workflow_id)
515 .fetch_all(&self.pool)
516 .await?;
517 Ok(rows.into_iter().map(Into::into).collect())
518 }
519
520 async fn get_event_count(&self, workflow_id: &str) -> Result<i64> {
521 let row: (i64,) =
522 sqlx::query_as("SELECT COUNT(*) FROM workflow_events WHERE workflow_id = $1")
523 .bind(workflow_id)
524 .fetch_one(&self.pool)
525 .await?;
526 Ok(row.0)
527 }
528
    /// Inserts a scheduled activity and returns its generated row id.
    ///
    /// Bind order must match the 13-column list exactly. Retry policy
    /// fields come from the record (overriding the schema defaults);
    /// result/error/claim/timing columns start NULL.
    async fn create_activity(&self, act: &WorkflowActivity) -> Result<i64> {
        let row: (i64,) = sqlx::query_as(
            "INSERT INTO workflow_activities (workflow_id, seq, name, task_queue, input, status, attempt, max_attempts, initial_interval_secs, backoff_coefficient, start_to_close_secs, heartbeat_timeout_secs, scheduled_at)
             VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13) RETURNING id",
        )
        .bind(&act.workflow_id)
        .bind(act.seq)
        .bind(&act.name)
        .bind(&act.task_queue)
        .bind(&act.input)
        .bind(&act.status)
        .bind(act.attempt)
        .bind(act.max_attempts)
        .bind(act.initial_interval_secs)
        .bind(act.backoff_coefficient)
        .bind(act.start_to_close_secs)
        .bind(act.heartbeat_timeout_secs)
        .bind(act.scheduled_at)
        .fetch_one(&self.pool)
        .await?;
        Ok(row.0)
    }
553
554 async fn get_activity(&self, id: i64) -> Result<Option<WorkflowActivity>> {
555 let row = sqlx::query_as::<_, PgActivityRow>(
556 "SELECT id, workflow_id, seq, name, task_queue, input, status, result, error, attempt, max_attempts, initial_interval_secs, backoff_coefficient, start_to_close_secs, heartbeat_timeout_secs, claimed_by, scheduled_at, started_at, completed_at, last_heartbeat
557 FROM workflow_activities WHERE id = $1",
558 )
559 .bind(id)
560 .fetch_optional(&self.pool)
561 .await?;
562 Ok(row.map(Into::into))
563 }
564
565 async fn get_activity_by_workflow_seq(
566 &self,
567 workflow_id: &str,
568 seq: i32,
569 ) -> Result<Option<WorkflowActivity>> {
570 let row = sqlx::query_as::<_, PgActivityRow>(
571 "SELECT id, workflow_id, seq, name, task_queue, input, status, result, error, attempt, max_attempts, initial_interval_secs, backoff_coefficient, start_to_close_secs, heartbeat_timeout_secs, claimed_by, scheduled_at, started_at, completed_at, last_heartbeat
572 FROM workflow_activities WHERE workflow_id = $1 AND seq = $2",
573 )
574 .bind(workflow_id)
575 .bind(seq)
576 .fetch_optional(&self.pool)
577 .await?;
578 Ok(row.map(Into::into))
579 }
580
    /// Claims at most one PENDING activity from `task_queue`, moving it to
    /// RUNNING and stamping `started_at`.
    ///
    /// Same UPDATE-over-`SELECT ... FOR UPDATE SKIP LOCKED` pattern as
    /// `claim_workflow_task`: concurrent workers skip rows another
    /// transaction already locked, so no activity is double-claimed.
    /// Oldest `scheduled_at` first; `None` when the queue is empty.
    async fn claim_activity(
        &self,
        task_queue: &str,
        worker_id: &str,
    ) -> Result<Option<WorkflowActivity>> {
        let now = timestamp_now();
        let row = sqlx::query_as::<_, PgActivityRow>(
            "UPDATE workflow_activities SET status = 'RUNNING', claimed_by = $1, started_at = $2
             WHERE id = (
                 SELECT id FROM workflow_activities
                 WHERE task_queue = $3 AND status = 'PENDING'
                 ORDER BY scheduled_at ASC
                 FOR UPDATE SKIP LOCKED
                 LIMIT 1
             )
             RETURNING id, workflow_id, seq, name, task_queue, input, status, result, error, attempt, max_attempts, initial_interval_secs, backoff_coefficient, start_to_close_secs, heartbeat_timeout_secs, claimed_by, scheduled_at, started_at, completed_at, last_heartbeat",
        )
        .bind(worker_id)
        .bind(now)
        .bind(task_queue)
        .fetch_optional(&self.pool)
        .await?;
        Ok(row.map(Into::into))
    }
607
    /// Returns a failed/timed-out activity to the PENDING pool for its
    /// next attempt: sets the caller-computed attempt number and retry
    /// time, and clears the previous claim, start time, heartbeat and
    /// error so the next claim starts clean.
    async fn requeue_activity_for_retry(
        &self,
        id: i64,
        next_attempt: i32,
        next_scheduled_at: f64,
    ) -> Result<()> {
        sqlx::query(
            "UPDATE workflow_activities
             SET status = 'PENDING', attempt = $1, scheduled_at = $2,
                 claimed_by = NULL, started_at = NULL, last_heartbeat = NULL,
                 error = NULL
             WHERE id = $3",
        )
        .bind(next_attempt)
        .bind(next_scheduled_at)
        .bind(id)
        .execute(&self.pool)
        .await?;
        Ok(())
    }
628
629 async fn complete_activity(
630 &self,
631 id: i64,
632 result: Option<&str>,
633 error: Option<&str>,
634 failed: bool,
635 ) -> Result<()> {
636 let status = if failed { "FAILED" } else { "COMPLETED" };
637 sqlx::query(
638 "UPDATE workflow_activities SET status = $1, result = $2, error = $3, completed_at = $4 WHERE id = $5",
639 )
640 .bind(status)
641 .bind(result)
642 .bind(error)
643 .bind(timestamp_now())
644 .bind(id)
645 .execute(&self.pool)
646 .await?;
647 Ok(())
648 }
649
650 async fn heartbeat_activity(&self, id: i64, _details: Option<&str>) -> Result<()> {
651 sqlx::query("UPDATE workflow_activities SET last_heartbeat = $1 WHERE id = $2")
652 .bind(timestamp_now())
653 .bind(id)
654 .execute(&self.pool)
655 .await?;
656 Ok(())
657 }
658
    /// Lists RUNNING activities whose last heartbeat is older than their
    /// configured `heartbeat_timeout_secs`. Activities with no heartbeat
    /// timeout configured are never reported.
    ///
    /// NOTE(review): an activity that was claimed but never sent a first
    /// heartbeat (last_heartbeat IS NULL) is also invisible here and could
    /// stall indefinitely; presumably `started_at` should seed the check —
    /// confirm against the sqlite implementation.
    async fn get_timed_out_activities(&self, now: f64) -> Result<Vec<WorkflowActivity>> {
        let rows = sqlx::query_as::<_, PgActivityRow>(
            "SELECT id, workflow_id, seq, name, task_queue, input, status, result, error, attempt, max_attempts, initial_interval_secs, backoff_coefficient, start_to_close_secs, heartbeat_timeout_secs, claimed_by, scheduled_at, started_at, completed_at, last_heartbeat
             FROM workflow_activities
             WHERE status = 'RUNNING'
               AND heartbeat_timeout_secs IS NOT NULL
               AND last_heartbeat IS NOT NULL
               AND ($1 - last_heartbeat) > heartbeat_timeout_secs",
        )
        .bind(now)
        .fetch_all(&self.pool)
        .await?;
        Ok(rows.into_iter().map(Into::into).collect())
    }
673
674 async fn create_timer(&self, timer: &WorkflowTimer) -> Result<i64> {
677 let row: (i64,) = sqlx::query_as(
678 "INSERT INTO workflow_timers (workflow_id, seq, fire_at, fired) VALUES ($1, $2, $3, FALSE) RETURNING id",
679 )
680 .bind(&timer.workflow_id)
681 .bind(timer.seq)
682 .bind(timer.fire_at)
683 .fetch_one(&self.pool)
684 .await?;
685 Ok(row.0)
686 }
687
688 async fn cancel_pending_activities(&self, workflow_id: &str) -> Result<u64> {
689 let res = sqlx::query(
690 "UPDATE workflow_activities SET status = 'CANCELLED', completed_at = $1
691 WHERE workflow_id = $2 AND status = 'PENDING'",
692 )
693 .bind(timestamp_now())
694 .bind(workflow_id)
695 .execute(&self.pool)
696 .await?;
697 Ok(res.rows_affected())
698 }
699
700 async fn cancel_pending_timers(&self, workflow_id: &str) -> Result<u64> {
701 let res = sqlx::query(
702 "UPDATE workflow_timers SET fired = TRUE
703 WHERE workflow_id = $1 AND fired = FALSE",
704 )
705 .bind(workflow_id)
706 .execute(&self.pool)
707 .await?;
708 Ok(res.rows_affected())
709 }
710
711 async fn get_timer_by_workflow_seq(
712 &self,
713 workflow_id: &str,
714 seq: i32,
715 ) -> Result<Option<WorkflowTimer>> {
716 let row = sqlx::query_as::<_, PgTimerRow>(
717 "SELECT id, workflow_id, seq, fire_at, fired
718 FROM workflow_timers WHERE workflow_id = $1 AND seq = $2",
719 )
720 .bind(workflow_id)
721 .bind(seq)
722 .fetch_optional(&self.pool)
723 .await?;
724 Ok(row.map(Into::into))
725 }
726
    /// Marks every due, unfired timer as fired and returns them, in one
    /// atomic `UPDATE ... RETURNING` round trip — a timer can therefore be
    /// delivered at most once even with concurrent callers.
    async fn fire_due_timers(&self, now: f64) -> Result<Vec<WorkflowTimer>> {
        let rows = sqlx::query_as::<_, PgTimerRow>(
            "UPDATE workflow_timers SET fired = TRUE
             WHERE fired = FALSE AND fire_at <= $1
             RETURNING id, workflow_id, seq, fire_at, fired",
        )
        .bind(now)
        .fetch_all(&self.pool)
        .await?;
        Ok(rows.into_iter().map(Into::into).collect())
    }
738
739 async fn send_signal(&self, sig: &WorkflowSignal) -> Result<i64> {
742 let row: (i64,) = sqlx::query_as(
743 "INSERT INTO workflow_signals (workflow_id, name, payload, consumed, received_at) VALUES ($1, $2, $3, FALSE, $4) RETURNING id",
744 )
745 .bind(&sig.workflow_id)
746 .bind(&sig.name)
747 .bind(&sig.payload)
748 .bind(sig.received_at)
749 .fetch_one(&self.pool)
750 .await?;
751 Ok(row.0)
752 }
753
    /// Atomically consumes all pending signals with `name` for a workflow:
    /// the single `UPDATE ... RETURNING` flips `consumed` and returns the
    /// flipped rows, so each signal is delivered exactly once even with
    /// concurrent consumers.
    async fn consume_signals(
        &self,
        workflow_id: &str,
        name: &str,
    ) -> Result<Vec<WorkflowSignal>> {
        let rows = sqlx::query_as::<_, PgSignalRow>(
            "UPDATE workflow_signals SET consumed = TRUE
             WHERE workflow_id = $1 AND name = $2 AND consumed = FALSE
             RETURNING id, workflow_id, name, payload, consumed, received_at",
        )
        .bind(workflow_id)
        .bind(name)
        .fetch_all(&self.pool)
        .await?;
        Ok(rows.into_iter().map(Into::into).collect())
    }
770
    /// Inserts a cron schedule. Bind order must match the 13-column list
    /// exactly. Fails with a unique-violation error if the
    /// (namespace, name) primary key already exists.
    async fn create_schedule(&self, sched: &WorkflowSchedule) -> Result<()> {
        sqlx::query(
            "INSERT INTO workflow_schedules (namespace, name, workflow_type, cron_expr, timezone, input, task_queue, overlap_policy, paused, last_run_at, next_run_at, last_workflow_id, created_at)
             VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13)",
        )
        .bind(&sched.namespace)
        .bind(&sched.name)
        .bind(&sched.workflow_type)
        .bind(&sched.cron_expr)
        .bind(&sched.timezone)
        .bind(&sched.input)
        .bind(&sched.task_queue)
        .bind(&sched.overlap_policy)
        .bind(sched.paused)
        .bind(sched.last_run_at)
        .bind(sched.next_run_at)
        .bind(&sched.last_workflow_id)
        .bind(sched.created_at)
        .execute(&self.pool)
        .await?;
        Ok(())
    }
795
796 async fn get_schedule(&self, namespace: &str, name: &str) -> Result<Option<WorkflowSchedule>> {
797 let row = sqlx::query_as::<_, PgScheduleRow>(
798 "SELECT namespace, name, workflow_type, cron_expr, timezone, input, task_queue, overlap_policy, paused, last_run_at, next_run_at, last_workflow_id, created_at FROM workflow_schedules WHERE namespace = $1 AND name = $2",
799 )
800 .bind(namespace)
801 .bind(name)
802 .fetch_optional(&self.pool)
803 .await?;
804 Ok(row.map(Into::into))
805 }
806
807 async fn list_schedules(&self, namespace: &str) -> Result<Vec<WorkflowSchedule>> {
808 let rows = sqlx::query_as::<_, PgScheduleRow>(
809 "SELECT namespace, name, workflow_type, cron_expr, timezone, input, task_queue, overlap_policy, paused, last_run_at, next_run_at, last_workflow_id, created_at FROM workflow_schedules WHERE namespace = $1 ORDER BY name",
810 )
811 .bind(namespace)
812 .fetch_all(&self.pool)
813 .await?;
814 Ok(rows.into_iter().map(Into::into).collect())
815 }
816
817 async fn update_schedule_last_run(
818 &self,
819 namespace: &str,
820 name: &str,
821 last_run_at: f64,
822 next_run_at: f64,
823 workflow_id: &str,
824 ) -> Result<()> {
825 sqlx::query(
826 "UPDATE workflow_schedules SET last_run_at = $1, next_run_at = $2, last_workflow_id = $3 WHERE namespace = $4 AND name = $5",
827 )
828 .bind(last_run_at)
829 .bind(next_run_at)
830 .bind(workflow_id)
831 .bind(namespace)
832 .bind(name)
833 .execute(&self.pool)
834 .await?;
835 Ok(())
836 }
837
838 async fn delete_schedule(&self, namespace: &str, name: &str) -> Result<bool> {
839 let res = sqlx::query("DELETE FROM workflow_schedules WHERE namespace = $1 AND name = $2")
840 .bind(namespace)
841 .bind(name)
842 .execute(&self.pool)
843 .await?;
844 Ok(res.rows_affected() > 0)
845 }
846
    /// Lists up to `limit` terminal, not-yet-archived workflows that
    /// completed before `cutoff`, oldest first — the archiver's work
    /// queue. `archived_at IS NULL` keeps already-archived rows out.
    async fn list_archivable_workflows(
        &self,
        cutoff: f64,
        limit: i64,
    ) -> Result<Vec<WorkflowRecord>> {
        let rows = sqlx::query_as::<_, PgWorkflowRow>(
            "SELECT id, namespace, run_id, workflow_type, task_queue, status, input, result, error, parent_id, claimed_by, search_attributes, archived_at, archive_uri, created_at, updated_at, completed_at
             FROM workflows
             WHERE status IN ('COMPLETED', 'FAILED', 'CANCELLED', 'TIMED_OUT')
               AND completed_at IS NOT NULL
               AND completed_at < $1
               AND archived_at IS NULL
             ORDER BY completed_at ASC
             LIMIT $2",
        )
        .bind(cutoff)
        .bind(limit)
        .fetch_all(&self.pool)
        .await?;
        Ok(rows.into_iter().map(Into::into).collect())
    }
868
869 async fn mark_archived_and_purge(
870 &self,
871 workflow_id: &str,
872 archive_uri: &str,
873 archived_at: f64,
874 ) -> Result<()> {
875 let mut tx = self.pool.begin().await?;
876 sqlx::query("DELETE FROM workflow_events WHERE workflow_id = $1")
877 .bind(workflow_id)
878 .execute(&mut *tx)
879 .await?;
880 sqlx::query("DELETE FROM workflow_activities WHERE workflow_id = $1")
881 .bind(workflow_id)
882 .execute(&mut *tx)
883 .await?;
884 sqlx::query("DELETE FROM workflow_timers WHERE workflow_id = $1")
885 .bind(workflow_id)
886 .execute(&mut *tx)
887 .await?;
888 sqlx::query("DELETE FROM workflow_signals WHERE workflow_id = $1")
889 .bind(workflow_id)
890 .execute(&mut *tx)
891 .await?;
892 sqlx::query("DELETE FROM workflow_snapshots WHERE workflow_id = $1")
893 .bind(workflow_id)
894 .execute(&mut *tx)
895 .await?;
896 sqlx::query(
897 "UPDATE workflows SET archived_at = $1, archive_uri = $2 WHERE id = $3",
898 )
899 .bind(archived_at)
900 .bind(archive_uri)
901 .bind(workflow_id)
902 .execute(&mut *tx)
903 .await?;
904 tx.commit().await?;
905 Ok(())
906 }
907
    /// Merges `patch_json` into the workflow's stored search attributes,
    /// reusing the sqlite backend's merge helper so both backends share
    /// identical semantics.
    ///
    /// NOTE(review): this read-merge-write is not atomic — two concurrent
    /// patches to the same workflow can lose one update. A single
    /// jsonb-merge UPDATE (or SELECT ... FOR UPDATE inside a transaction)
    /// would close the race; confirm `merge_search_attrs` semantics before
    /// switching.
    async fn upsert_search_attributes(
        &self,
        workflow_id: &str,
        patch_json: &str,
    ) -> Result<()> {
        // Missing workflow yields None here; the UPDATE below is then a
        // no-op on zero rows.
        let current: Option<(Option<String>,)> =
            sqlx::query_as("SELECT search_attributes FROM workflows WHERE id = $1")
                .bind(workflow_id)
                .fetch_optional(&self.pool)
                .await?;
        let merged = crate::store::sqlite::merge_search_attrs(
            current.and_then(|(s,)| s).as_deref(),
            patch_json,
        )?;
        sqlx::query("UPDATE workflows SET search_attributes = $1 WHERE id = $2")
            .bind(merged)
            .bind(workflow_id)
            .execute(&self.pool)
            .await?;
        Ok(())
    }
929
    /// Applies a partial update to a schedule and returns the updated row,
    /// or `None` if the (namespace, name) key doesn't exist.
    ///
    /// The SET clause is built dynamically, one `field = $n` fragment per
    /// present patch field. The bind sequence below MUST mirror the push
    /// order exactly (cron_expr, timezone, input, task_queue,
    /// overlap_policy), followed by namespace and name for the WHERE
    /// clause. An empty patch short-circuits to a plain read.
    async fn update_schedule(
        &self,
        namespace: &str,
        name: &str,
        patch: &SchedulePatch,
    ) -> Result<Option<WorkflowSchedule>> {
        let mut sets: Vec<String> = Vec::new();
        // Next free placeholder index, advanced per present field.
        let mut idx = 1usize;
        if patch.cron_expr.is_some() {
            sets.push(format!("cron_expr = ${idx}"));
            idx += 1;
        }
        if patch.timezone.is_some() {
            sets.push(format!("timezone = ${idx}"));
            idx += 1;
        }
        if patch.input.is_some() {
            sets.push(format!("input = ${idx}"));
            idx += 1;
        }
        if patch.task_queue.is_some() {
            sets.push(format!("task_queue = ${idx}"));
            idx += 1;
        }
        if patch.overlap_policy.is_some() {
            sets.push(format!("overlap_policy = ${idx}"));
            idx += 1;
        }
        if sets.is_empty() {
            // Nothing to change — report current state.
            return self.get_schedule(namespace, name).await;
        }
        let sql = format!(
            "UPDATE workflow_schedules SET {} WHERE namespace = ${} AND name = ${}",
            sets.join(", "),
            idx,
            idx + 1
        );
        let mut q = sqlx::query(&sql);
        if let Some(ref v) = patch.cron_expr {
            q = q.bind(v);
        }
        if let Some(ref v) = patch.timezone {
            q = q.bind(v);
        }
        if let Some(ref v) = patch.input {
            // Input is stored as serialized text.
            q = q.bind(v.to_string());
        }
        if let Some(ref v) = patch.task_queue {
            q = q.bind(v);
        }
        if let Some(ref v) = patch.overlap_policy {
            q = q.bind(v);
        }
        let res = q
            .bind(namespace)
            .bind(name)
            .execute(&self.pool)
            .await?;
        if res.rows_affected() == 0 {
            return Ok(None);
        }
        self.get_schedule(namespace, name).await
    }
993
994 async fn set_schedule_paused(
995 &self,
996 namespace: &str,
997 name: &str,
998 paused: bool,
999 ) -> Result<Option<WorkflowSchedule>> {
1000 let res = sqlx::query(
1001 "UPDATE workflow_schedules SET paused = $1 WHERE namespace = $2 AND name = $3",
1002 )
1003 .bind(paused)
1004 .bind(namespace)
1005 .bind(name)
1006 .execute(&self.pool)
1007 .await?;
1008 if res.rows_affected() == 0 {
1009 return Ok(None);
1010 }
1011 self.get_schedule(namespace, name).await
1012 }
1013
    /// Registers a worker, or refreshes it if the id is already known.
    ///
    /// NOTE(review): the ON CONFLICT arm updates only `last_heartbeat` and
    /// `identity` — a re-registration that changes task_queue, capability
    /// lists or concurrency limits is silently ignored. Confirm that is
    /// intended.
    async fn register_worker(&self, w: &WorkflowWorker) -> Result<()> {
        sqlx::query(
            "INSERT INTO workflow_workers (id, namespace, identity, task_queue, workflows, activities, max_concurrent_workflows, max_concurrent_activities, active_tasks, last_heartbeat, registered_at)
             VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11)
             ON CONFLICT (id) DO UPDATE SET last_heartbeat = EXCLUDED.last_heartbeat, identity = EXCLUDED.identity",
        )
        .bind(&w.id)
        .bind(&w.namespace)
        .bind(&w.identity)
        .bind(&w.task_queue)
        .bind(&w.workflows)
        .bind(&w.activities)
        .bind(w.max_concurrent_workflows)
        .bind(w.max_concurrent_activities)
        .bind(w.active_tasks)
        .bind(w.last_heartbeat)
        .bind(w.registered_at)
        .execute(&self.pool)
        .await?;
        Ok(())
    }
1037
1038 async fn heartbeat_worker(&self, id: &str, now: f64) -> Result<()> {
1039 sqlx::query("UPDATE workflow_workers SET last_heartbeat = $1 WHERE id = $2")
1040 .bind(now)
1041 .bind(id)
1042 .execute(&self.pool)
1043 .await?;
1044 Ok(())
1045 }
1046
1047 async fn list_workers(&self, namespace: &str) -> Result<Vec<WorkflowWorker>> {
1048 let rows = sqlx::query_as::<_, PgWorkerRow>(
1049 "SELECT id, namespace, identity, task_queue, workflows, activities, max_concurrent_workflows, max_concurrent_activities, active_tasks, last_heartbeat, registered_at FROM workflow_workers WHERE namespace = $1 ORDER BY registered_at",
1050 )
1051 .bind(namespace)
1052 .fetch_all(&self.pool)
1053 .await?;
1054 Ok(rows.into_iter().map(Into::into).collect())
1055 }
1056
1057 async fn remove_dead_workers(&self, cutoff: f64) -> Result<Vec<String>> {
1058 let rows: Vec<(String,)> =
1059 sqlx::query_as("SELECT id FROM workflow_workers WHERE last_heartbeat < $1")
1060 .bind(cutoff)
1061 .fetch_all(&self.pool)
1062 .await?;
1063 let ids: Vec<String> = rows.into_iter().map(|r| r.0).collect();
1064 if !ids.is_empty() {
1065 sqlx::query("DELETE FROM workflow_workers WHERE last_heartbeat < $1")
1066 .bind(cutoff)
1067 .execute(&self.pool)
1068 .await?;
1069 }
1070 Ok(ids)
1071 }
1072
1073 async fn create_api_key(
1076 &self,
1077 key_hash: &str,
1078 prefix: &str,
1079 label: Option<&str>,
1080 created_at: f64,
1081 ) -> Result<()> {
1082 sqlx::query("INSERT INTO api_keys (key_hash, prefix, label, created_at) VALUES ($1, $2, $3, $4)")
1083 .bind(key_hash)
1084 .bind(prefix)
1085 .bind(label)
1086 .bind(created_at)
1087 .execute(&self.pool)
1088 .await?;
1089 Ok(())
1090 }
1091
1092 async fn validate_api_key(&self, key_hash: &str) -> Result<bool> {
1093 let row: Option<(i64,)> =
1094 sqlx::query_as("SELECT 1::BIGINT FROM api_keys WHERE key_hash = $1")
1095 .bind(key_hash)
1096 .fetch_optional(&self.pool)
1097 .await?;
1098 Ok(row.is_some())
1099 }
1100
1101 async fn list_api_keys(&self) -> Result<Vec<crate::store::ApiKeyRecord>> {
1102 let rows = sqlx::query_as::<_, (String, Option<String>, f64)>(
1103 "SELECT prefix, label, created_at FROM api_keys ORDER BY created_at DESC",
1104 )
1105 .fetch_all(&self.pool)
1106 .await?;
1107 Ok(rows
1108 .into_iter()
1109 .map(|(prefix, label, created_at)| crate::store::ApiKeyRecord {
1110 prefix,
1111 label,
1112 created_at,
1113 })
1114 .collect())
1115 }
1116
1117 async fn revoke_api_key(&self, prefix: &str) -> Result<bool> {
1118 let res = sqlx::query("DELETE FROM api_keys WHERE prefix = $1")
1119 .bind(prefix)
1120 .execute(&self.pool)
1121 .await?;
1122 Ok(res.rows_affected() > 0)
1123 }
1124
1125 async fn list_child_workflows(&self, parent_id: &str) -> Result<Vec<WorkflowRecord>> {
1128 let rows = sqlx::query_as::<_, PgWorkflowRow>(
1129 "SELECT id, namespace, run_id, workflow_type, task_queue, status, input, result, error, parent_id, claimed_by, search_attributes, archived_at, archive_uri, created_at, updated_at, completed_at
1130 FROM workflows WHERE parent_id = $1 ORDER BY created_at ASC",
1131 )
1132 .bind(parent_id)
1133 .fetch_all(&self.pool)
1134 .await?;
1135 Ok(rows.into_iter().map(Into::into).collect())
1136 }
1137
1138 async fn create_snapshot(
1141 &self,
1142 workflow_id: &str,
1143 event_seq: i32,
1144 state_json: &str,
1145 ) -> Result<()> {
1146 sqlx::query(
1147 "INSERT INTO workflow_snapshots (workflow_id, event_seq, state_json, created_at)
1148 VALUES ($1, $2, $3, $4)
1149 ON CONFLICT (workflow_id, event_seq) DO UPDATE SET state_json = EXCLUDED.state_json, created_at = EXCLUDED.created_at",
1150 )
1151 .bind(workflow_id)
1152 .bind(event_seq)
1153 .bind(state_json)
1154 .bind(timestamp_now())
1155 .execute(&self.pool)
1156 .await?;
1157 Ok(())
1158 }
1159
1160 async fn get_latest_snapshot(
1161 &self,
1162 workflow_id: &str,
1163 ) -> Result<Option<WorkflowSnapshot>> {
1164 let row = sqlx::query_as::<_, (String, i32, String, f64)>(
1165 "SELECT workflow_id, event_seq, state_json, created_at
1166 FROM workflow_snapshots WHERE workflow_id = $1
1167 ORDER BY event_seq DESC LIMIT 1",
1168 )
1169 .bind(workflow_id)
1170 .fetch_optional(&self.pool)
1171 .await?;
1172
1173 Ok(row.map(|(workflow_id, event_seq, state_json, created_at)| WorkflowSnapshot {
1174 workflow_id,
1175 event_seq,
1176 state_json,
1177 created_at,
1178 }))
1179 }
1180
1181 async fn get_queue_stats(&self, namespace: &str) -> Result<Vec<crate::store::QueueStats>> {
1184 let rows = sqlx::query_as::<_, (String, i64, i64, i64)>(
1185 "SELECT
1186 a.task_queue AS queue,
1187 SUM(CASE WHEN a.status = 'PENDING' THEN 1 ELSE 0 END) AS pending,
1188 SUM(CASE WHEN a.status = 'RUNNING' THEN 1 ELSE 0 END) AS running,
1189 (SELECT COUNT(*) FROM workflow_workers w WHERE w.task_queue = a.task_queue AND w.namespace = $1) AS workers
1190 FROM workflow_activities a
1191 JOIN workflows wf ON a.workflow_id = wf.id AND wf.namespace = $1
1192 GROUP BY a.task_queue",
1193 )
1194 .bind(namespace)
1195 .fetch_all(&self.pool)
1196 .await?;
1197
1198 Ok(rows
1199 .into_iter()
1200 .map(|(queue, pending, running, workers)| crate::store::QueueStats {
1201 queue,
1202 pending_activities: pending,
1203 running_activities: running,
1204 workers,
1205 })
1206 .collect())
1207 }
1208
    /// Attempt to take the cluster-wide scheduler leadership lock.
    ///
    /// Uses a Postgres session-level advisory lock on the arbitrary
    /// application-chosen key 42; returns `true` iff this call obtained it.
    ///
    /// NOTE(review): `pg_try_advisory_lock` binds the lock to whichever
    /// pooled connection happens to run this statement, and it stays held
    /// until that session ends or explicitly unlocks. With a connection
    /// pool the holding connection is then reused for unrelated queries —
    /// confirm that release/ownership is handled correctly elsewhere.
    async fn try_acquire_scheduler_lock(&self) -> Result<bool> {
        let row: (bool,) =
            sqlx::query_as("SELECT pg_try_advisory_lock(42)")
                .fetch_one(&self.pool)
                .await?;
        Ok(row.0)
    }
1221}
1222
/// Current wall-clock time as fractional seconds since the UNIX epoch.
///
/// # Panics
/// Panics only if the system clock reports a time before 1970-01-01 —
/// a broken-host invariant violation, not a recoverable error, so the
/// invariant is stated via `expect` rather than a bare `unwrap`.
fn timestamp_now() -> f64 {
    std::time::SystemTime::now()
        .duration_since(std::time::UNIX_EPOCH)
        .expect("system clock is before the UNIX epoch")
        .as_secs_f64()
}
1229
/// sqlx row mapper for `workflows` SELECTs.
///
/// `#[derive(sqlx::FromRow)]` binds fields to result columns by name, so
/// these identifiers must match the column list used in the queries above.
#[derive(sqlx::FromRow)]
struct PgWorkflowRow {
    id: String,
    namespace: String,
    run_id: String,
    workflow_type: String,
    task_queue: String,
    status: String,
    input: Option<String>,
    result: Option<String>,
    error: Option<String>,
    parent_id: Option<String>,
    claimed_by: Option<String>,
    search_attributes: Option<String>,
    archived_at: Option<f64>,
    archive_uri: Option<String>,
    // Timestamps are DOUBLE PRECISION epoch seconds (see SCHEMA).
    created_at: f64,
    updated_at: f64,
    completed_at: Option<f64>,
}
1252
1253impl From<PgWorkflowRow> for WorkflowRecord {
1254 fn from(r: PgWorkflowRow) -> Self {
1255 Self {
1256 id: r.id,
1257 namespace: r.namespace,
1258 run_id: r.run_id,
1259 workflow_type: r.workflow_type,
1260 task_queue: r.task_queue,
1261 status: r.status,
1262 input: r.input,
1263 result: r.result,
1264 error: r.error,
1265 parent_id: r.parent_id,
1266 claimed_by: r.claimed_by,
1267 search_attributes: r.search_attributes,
1268 archived_at: r.archived_at,
1269 archive_uri: r.archive_uri,
1270 created_at: r.created_at,
1271 updated_at: r.updated_at,
1272 completed_at: r.completed_at,
1273 }
1274 }
1275}
1276
/// sqlx row mapper for `workflow_events` (FromRow binds by column name).
#[derive(sqlx::FromRow)]
struct PgEventRow {
    id: i64,
    workflow_id: String,
    seq: i32,
    event_type: String,
    payload: Option<String>,
    // Epoch seconds (DOUBLE PRECISION in the schema).
    timestamp: f64,
}
1286
1287impl From<PgEventRow> for WorkflowEvent {
1288 fn from(r: PgEventRow) -> Self {
1289 Self {
1290 id: Some(r.id),
1291 workflow_id: r.workflow_id,
1292 seq: r.seq,
1293 event_type: r.event_type,
1294 payload: r.payload,
1295 timestamp: r.timestamp,
1296 }
1297 }
1298}
1299
/// sqlx row mapper for `workflow_activities` (FromRow binds by column name).
#[derive(sqlx::FromRow)]
struct PgActivityRow {
    id: i64,
    workflow_id: String,
    seq: i32,
    name: String,
    task_queue: String,
    input: Option<String>,
    status: String,
    result: Option<String>,
    error: Option<String>,
    // Retry bookkeeping and timeout policy.
    attempt: i32,
    max_attempts: i32,
    initial_interval_secs: f64,
    backoff_coefficient: f64,
    start_to_close_secs: f64,
    heartbeat_timeout_secs: Option<f64>,
    claimed_by: Option<String>,
    // Lifecycle timestamps, epoch seconds.
    scheduled_at: f64,
    started_at: Option<f64>,
    completed_at: Option<f64>,
    last_heartbeat: Option<f64>,
}
1323
1324impl From<PgActivityRow> for WorkflowActivity {
1325 fn from(r: PgActivityRow) -> Self {
1326 Self {
1327 id: Some(r.id),
1328 workflow_id: r.workflow_id,
1329 seq: r.seq,
1330 name: r.name,
1331 task_queue: r.task_queue,
1332 input: r.input,
1333 status: r.status,
1334 result: r.result,
1335 error: r.error,
1336 attempt: r.attempt,
1337 max_attempts: r.max_attempts,
1338 initial_interval_secs: r.initial_interval_secs,
1339 backoff_coefficient: r.backoff_coefficient,
1340 start_to_close_secs: r.start_to_close_secs,
1341 heartbeat_timeout_secs: r.heartbeat_timeout_secs,
1342 claimed_by: r.claimed_by,
1343 scheduled_at: r.scheduled_at,
1344 started_at: r.started_at,
1345 completed_at: r.completed_at,
1346 last_heartbeat: r.last_heartbeat,
1347 }
1348 }
1349}
1350
/// sqlx row mapper for workflow timers (FromRow binds by column name).
#[derive(sqlx::FromRow)]
struct PgTimerRow {
    id: i64,
    workflow_id: String,
    seq: i32,
    // Epoch seconds at which the timer should fire.
    fire_at: f64,
    fired: bool,
}
1359
1360impl From<PgTimerRow> for WorkflowTimer {
1361 fn from(r: PgTimerRow) -> Self {
1362 Self {
1363 id: Some(r.id),
1364 workflow_id: r.workflow_id,
1365 seq: r.seq,
1366 fire_at: r.fire_at,
1367 fired: r.fired,
1368 }
1369 }
1370}
1371
/// sqlx row mapper for workflow signals (FromRow binds by column name).
#[derive(sqlx::FromRow)]
struct PgSignalRow {
    id: i64,
    workflow_id: String,
    name: String,
    payload: Option<String>,
    consumed: bool,
    // Epoch seconds.
    received_at: f64,
}
1381
1382impl From<PgSignalRow> for WorkflowSignal {
1383 fn from(r: PgSignalRow) -> Self {
1384 Self {
1385 id: Some(r.id),
1386 workflow_id: r.workflow_id,
1387 name: r.name,
1388 payload: r.payload,
1389 consumed: r.consumed,
1390 received_at: r.received_at,
1391 }
1392 }
1393}
1394
/// sqlx row mapper for `workflow_schedules` (FromRow binds by column name).
#[derive(sqlx::FromRow)]
struct PgScheduleRow {
    namespace: String,
    name: String,
    workflow_type: String,
    cron_expr: String,
    timezone: String,
    input: Option<String>,
    task_queue: String,
    overlap_policy: String,
    paused: bool,
    // Epoch-second timestamps; None until the schedule has run / been planned.
    last_run_at: Option<f64>,
    next_run_at: Option<f64>,
    last_workflow_id: Option<String>,
    created_at: f64,
}
1411
1412impl From<PgScheduleRow> for WorkflowSchedule {
1413 fn from(r: PgScheduleRow) -> Self {
1414 Self {
1415 namespace: r.namespace,
1416 name: r.name,
1417 workflow_type: r.workflow_type,
1418 cron_expr: r.cron_expr,
1419 timezone: r.timezone,
1420 input: r.input,
1421 task_queue: r.task_queue,
1422 overlap_policy: r.overlap_policy,
1423 paused: r.paused,
1424 last_run_at: r.last_run_at,
1425 next_run_at: r.next_run_at,
1426 last_workflow_id: r.last_workflow_id,
1427 created_at: r.created_at,
1428 }
1429 }
1430}
1431
/// sqlx row mapper for `workflow_workers` (FromRow binds by column name).
#[derive(sqlx::FromRow)]
struct PgWorkerRow {
    id: String,
    namespace: String,
    identity: String,
    task_queue: String,
    workflows: Option<String>,
    activities: Option<String>,
    max_concurrent_workflows: i32,
    max_concurrent_activities: i32,
    active_tasks: i32,
    // Epoch seconds.
    last_heartbeat: f64,
    registered_at: f64,
}
1446
1447impl From<PgWorkerRow> for WorkflowWorker {
1448 fn from(r: PgWorkerRow) -> Self {
1449 Self {
1450 id: r.id,
1451 namespace: r.namespace,
1452 identity: r.identity,
1453 task_queue: r.task_queue,
1454 workflows: r.workflows,
1455 activities: r.activities,
1456 max_concurrent_workflows: r.max_concurrent_workflows,
1457 max_concurrent_activities: r.max_concurrent_activities,
1458 active_tasks: r.active_tasks,
1459 last_heartbeat: r.last_heartbeat,
1460 registered_at: r.registered_at,
1461 }
1462 }
1463}
1464
#[cfg(test)]
mod tests {
    use super::*;

    // Two plain DDL statements survive untouched.
    #[test]
    fn sanitise_schema_keeps_statements_intact() {
        let src = "CREATE TABLE foo (x INT);\nCREATE INDEX idx_foo ON foo(x);\n";
        let stmts = sanitise_schema(src);
        assert_eq!(stmts.len(), 2);
        assert!(stmts[0].starts_with("CREATE TABLE foo"));
        assert!(stmts[1].starts_with("CREATE INDEX idx_foo"));
    }

    // Lines that are nothing but `--` comments are stripped entirely.
    #[test]
    fn sanitise_schema_drops_pure_comment_lines() {
        let src = "-- header comment\nCREATE TABLE foo (x INT);\n-- trailing comment\n";
        let stmts = sanitise_schema(src);
        assert_eq!(stmts.len(), 1);
        assert!(stmts[0].starts_with("CREATE TABLE foo"));
    }

    // A semicolon inside comment prose must not start a new statement.
    #[test]
    fn sanitise_schema_ignores_semicolons_inside_comment_prose() {
        let src = "\
CREATE TABLE foo (x INT);
-- Idempotent across startups; fresh installs pick the column up from the
-- CREATE TABLE above so the ADD is a no-op.
";
        let stmts = sanitise_schema(src);
        assert_eq!(
            stmts.len(),
            1,
            "expected 1 real statement, got {}: {:?}",
            stmts.len(),
            stmts
        );
        assert!(stmts[0].starts_with("CREATE TABLE foo"));
    }

    // Comments preceded by whitespace are still comments.
    #[test]
    fn sanitise_schema_drops_indented_comment_lines() {
        let src = " -- indented comment\n\tCREATE TABLE foo (x INT);\n";
        let stmts = sanitise_schema(src);
        assert_eq!(stmts.len(), 1);
        assert!(stmts[0].contains("CREATE TABLE foo"));
    }

    // Running the real SCHEMA constant through the sanitiser must yield
    // only statements that begin with a DDL/DML keyword.
    #[test]
    fn sanitise_schema_real_constant_produces_only_ddl() {
        for stmt in sanitise_schema(SCHEMA) {
            let first_word = stmt
                .split_whitespace()
                .next()
                .expect("non-empty statement")
                .to_uppercase();
            assert!(
                matches!(
                    first_word.as_str(),
                    "CREATE" | "INSERT" | "UPDATE" | "DROP" | "ALTER" | "WITH"
                ),
                "SCHEMA produced non-DDL statement starting with {first_word:?}: {stmt:?}"
            );
        }
    }
}