use anyhow::Result;
use sqlx::PgPool;

use crate::store::WorkflowStore;
use crate::types::*;

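// Baseline schema applied on every startup. Each statement is idempotent
// (CREATE ... IF NOT EXISTS / INSERT ... ON CONFLICT DO NOTHING), so rerunning
// the whole block against an existing database is safe.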
const SCHEMA: &str = r#"
CREATE TABLE IF NOT EXISTS namespaces (
    name TEXT PRIMARY KEY,
    created_at DOUBLE PRECISION NOT NULL
);
INSERT INTO namespaces (name, created_at)
    VALUES ('main', EXTRACT(EPOCH FROM NOW()))
    ON CONFLICT DO NOTHING;

CREATE TABLE IF NOT EXISTS workflows (
    id TEXT PRIMARY KEY,
    namespace TEXT NOT NULL DEFAULT 'main',
    run_id TEXT NOT NULL,
    workflow_type TEXT NOT NULL,
    task_queue TEXT NOT NULL DEFAULT 'main',
    status TEXT NOT NULL DEFAULT 'PENDING',
    input TEXT,
    result TEXT,
    error TEXT,
    parent_id TEXT,
    claimed_by TEXT,
    search_attributes TEXT,
    archived_at DOUBLE PRECISION,
    archive_uri TEXT,
    -- Workflow-task dispatch (Phase 9): see sqlite.rs for the full comment.
    needs_dispatch BOOLEAN NOT NULL DEFAULT FALSE,
    dispatch_claimed_by TEXT,
    dispatch_last_heartbeat DOUBLE PRECISION,
    created_at DOUBLE PRECISION NOT NULL,
    updated_at DOUBLE PRECISION NOT NULL,
    completed_at DOUBLE PRECISION
);
CREATE INDEX IF NOT EXISTS idx_wf_status_queue ON workflows(status, task_queue);
CREATE INDEX IF NOT EXISTS idx_wf_namespace ON workflows(namespace);
CREATE INDEX IF NOT EXISTS idx_wf_dispatch ON workflows(task_queue, needs_dispatch, dispatch_claimed_by);

CREATE TABLE IF NOT EXISTS workflow_events (
    id BIGSERIAL PRIMARY KEY,
    workflow_id TEXT NOT NULL REFERENCES workflows(id),
    seq INTEGER NOT NULL,
    event_type TEXT NOT NULL,
    payload TEXT,
    timestamp DOUBLE PRECISION NOT NULL
);
CREATE INDEX IF NOT EXISTS idx_wf_events_lookup ON workflow_events(workflow_id, seq);

CREATE TABLE IF NOT EXISTS workflow_activities (
    id BIGSERIAL PRIMARY KEY,
    workflow_id TEXT NOT NULL REFERENCES workflows(id),
    seq INTEGER NOT NULL,
    name TEXT NOT NULL,
    task_queue TEXT NOT NULL DEFAULT 'main',
    input TEXT,
    status TEXT NOT NULL DEFAULT 'PENDING',
    result TEXT,
    error TEXT,
    attempt INTEGER NOT NULL DEFAULT 1,
    max_attempts INTEGER NOT NULL DEFAULT 3,
    initial_interval_secs DOUBLE PRECISION NOT NULL DEFAULT 1,
    backoff_coefficient DOUBLE PRECISION NOT NULL DEFAULT 2,
    start_to_close_secs DOUBLE PRECISION NOT NULL DEFAULT 300,
    heartbeat_timeout_secs DOUBLE PRECISION,
    claimed_by TEXT,
    scheduled_at DOUBLE PRECISION NOT NULL,
    started_at DOUBLE PRECISION,
    completed_at DOUBLE PRECISION,
    last_heartbeat DOUBLE PRECISION,
    UNIQUE (workflow_id, seq)
);
CREATE INDEX IF NOT EXISTS idx_wf_act_pending ON workflow_activities(task_queue, status, scheduled_at);

CREATE TABLE IF NOT EXISTS workflow_timers (
    id BIGSERIAL PRIMARY KEY,
    workflow_id TEXT NOT NULL REFERENCES workflows(id),
    seq INTEGER NOT NULL,
    fire_at DOUBLE PRECISION NOT NULL,
    fired BOOLEAN NOT NULL DEFAULT FALSE,
    UNIQUE (workflow_id, seq)
);
CREATE INDEX IF NOT EXISTS idx_wf_timers_due ON workflow_timers(fire_at) WHERE fired = FALSE;

CREATE TABLE IF NOT EXISTS workflow_signals (
    id BIGSERIAL PRIMARY KEY,
    workflow_id TEXT NOT NULL REFERENCES workflows(id),
    name TEXT NOT NULL,
    payload TEXT,
    consumed BOOLEAN NOT NULL DEFAULT FALSE,
    received_at DOUBLE PRECISION NOT NULL
);
CREATE INDEX IF NOT EXISTS idx_wf_signals_lookup ON workflow_signals(workflow_id, name, consumed);

CREATE TABLE IF NOT EXISTS workflow_schedules (
    namespace TEXT NOT NULL DEFAULT 'main',
    name TEXT NOT NULL,
    workflow_type TEXT NOT NULL,
    cron_expr TEXT NOT NULL,
    timezone TEXT NOT NULL DEFAULT 'UTC',
    input TEXT,
    task_queue TEXT NOT NULL DEFAULT 'main',
    overlap_policy TEXT NOT NULL DEFAULT 'skip',
    paused BOOLEAN NOT NULL DEFAULT FALSE,
    last_run_at DOUBLE PRECISION,
    next_run_at DOUBLE PRECISION,
    last_workflow_id TEXT,
    created_at DOUBLE PRECISION NOT NULL,
    PRIMARY KEY (namespace, name)
);

CREATE TABLE IF NOT EXISTS workflow_workers (
    id TEXT PRIMARY KEY,
    namespace TEXT NOT NULL DEFAULT 'main',
    identity TEXT NOT NULL,
    task_queue TEXT NOT NULL,
    workflows TEXT,
    activities TEXT,
    max_concurrent_workflows INTEGER NOT NULL DEFAULT 10,
    max_concurrent_activities INTEGER NOT NULL DEFAULT 10,
    active_tasks INTEGER NOT NULL DEFAULT 0,
    last_heartbeat DOUBLE PRECISION NOT NULL,
    registered_at DOUBLE PRECISION NOT NULL
);

CREATE TABLE IF NOT EXISTS workflow_snapshots (
    workflow_id TEXT NOT NULL REFERENCES workflows(id),
    event_seq INTEGER NOT NULL,
    state_json TEXT NOT NULL,
    created_at DOUBLE PRECISION NOT NULL,
    PRIMARY KEY (workflow_id, event_seq)
);

CREATE TABLE IF NOT EXISTS api_keys (
    key_hash TEXT PRIMARY KEY,
    prefix TEXT NOT NULL,
    label TEXT,
    created_at DOUBLE PRECISION NOT NULL
);
CREATE INDEX IF NOT EXISTS idx_api_keys_prefix ON api_keys(prefix);

-- Future additive migrations go below this line. Postgres supports
-- `ADD COLUMN IF NOT EXISTS` natively, so the pattern is simply:
--
-- ALTER TABLE workflows ADD COLUMN IF NOT EXISTS some_new_field TEXT;
--
-- Idempotent across startups; fresh installs pick the column up from the
-- CREATE TABLE above so the ADD is a no-op. Currently no pending
-- migrations; the baseline schema in the CREATE TABLE statements above is
-- the source of truth through v0.11.3.
"#;

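/// Split the schema blob into individual statements.
///
/// Each statement is executed separately because `sqlx::query` prepares its
/// input, and a prepared statement only accepts a single SQL command. Comment
/// lines are dropped first so that semicolons inside comment prose do not
/// produce bogus "statements".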
fn sanitise_schema(schema: &str) -> Vec<String> {
    let without_comments: String = schema
        .lines()
        .filter(|line| !line.trim_start().starts_with("--"))
        .collect::<Vec<_>>()
        .join("\n");

    without_comments
        .split(';')
        .map(|s| s.trim().to_string())
        .filter(|s| !s.is_empty())
        .collect()
}

pub struct PostgresStore {
    pool: PgPool,
}

impl PostgresStore {
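    /// Connect to Postgres and apply the idempotent schema.
    ///
    /// Illustrative usage (a sketch, not a doctest; the URL is a placeholder
    /// and a reachable database plus an async runtime are assumed):
    ///
    /// ```ignore
    /// let store = PostgresStore::new("postgres://app:secret@localhost/workflows").await?;
    /// let is_leader = store.try_acquire_leader_lock().await?;
    /// ```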
    pub async fn new(url: &str) -> Result<Self> {
        let pool = PgPool::connect(url).await?;
        let store = Self { pool };
        store.migrate().await?;
        Ok(store)
    }

    async fn migrate(&self) -> Result<()> {
        for statement in sanitise_schema(SCHEMA) {
            sqlx::query(&statement).execute(&self.pool).await?;
        }
        Ok(())
    }

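    /// Try to become the leader for singleton background work.
    ///
    /// `pg_try_advisory_lock(1)` never blocks: it returns a boolean
    /// immediately, and the lock is session-scoped, so it is held for as long
    /// as the pooled connection that acquired it stays open.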
    pub async fn try_acquire_leader_lock(&self) -> Result<bool> {
        let row: (bool,) =
            sqlx::query_as("SELECT pg_try_advisory_lock(1)")
                .fetch_one(&self.pool)
                .await?;
        Ok(row.0)
    }
}

impl WorkflowStore for PostgresStore {
    async fn create_namespace(&self, name: &str) -> Result<()> {
        sqlx::query("INSERT INTO namespaces (name, created_at) VALUES ($1, EXTRACT(EPOCH FROM NOW()))")
            .bind(name)
            .execute(&self.pool)
            .await?;
        Ok(())
    }

    async fn list_namespaces(&self) -> Result<Vec<crate::store::NamespaceRecord>> {
        let rows = sqlx::query_as::<_, (String, f64)>(
            "SELECT name, created_at FROM namespaces ORDER BY name",
        )
        .fetch_all(&self.pool)
        .await?;
        Ok(rows
            .into_iter()
            .map(|(name, created_at)| crate::store::NamespaceRecord { name, created_at })
            .collect())
    }

    async fn delete_namespace(&self, name: &str) -> Result<bool> {
        let res = sqlx::query("DELETE FROM namespaces WHERE name = $1 AND name != 'main'")
            .bind(name)
            .execute(&self.pool)
            .await?;
        Ok(res.rows_affected() > 0)
    }

    async fn get_namespace_stats(&self, namespace: &str) -> Result<crate::store::NamespaceStats> {
        let total: (i64,) = sqlx::query_as("SELECT COUNT(*) FROM workflows WHERE namespace = $1")
            .bind(namespace)
            .fetch_one(&self.pool)
            .await?;
        let running: (i64,) = sqlx::query_as(
            "SELECT COUNT(*) FROM workflows WHERE namespace = $1 AND status = 'RUNNING'",
        )
        .bind(namespace)
        .fetch_one(&self.pool)
        .await?;
        let pending: (i64,) = sqlx::query_as(
            "SELECT COUNT(*) FROM workflows WHERE namespace = $1 AND status = 'PENDING'",
        )
        .bind(namespace)
        .fetch_one(&self.pool)
        .await?;
        let completed: (i64,) = sqlx::query_as(
            "SELECT COUNT(*) FROM workflows WHERE namespace = $1 AND status = 'COMPLETED'",
        )
        .bind(namespace)
        .fetch_one(&self.pool)
        .await?;
        let failed: (i64,) = sqlx::query_as(
            "SELECT COUNT(*) FROM workflows WHERE namespace = $1 AND status = 'FAILED'",
        )
        .bind(namespace)
        .fetch_one(&self.pool)
        .await?;
        let schedules: (i64,) =
            sqlx::query_as("SELECT COUNT(*) FROM workflow_schedules WHERE namespace = $1")
                .bind(namespace)
                .fetch_one(&self.pool)
                .await?;
        let workers: (i64,) =
            sqlx::query_as("SELECT COUNT(*) FROM workflow_workers WHERE namespace = $1")
                .bind(namespace)
                .fetch_one(&self.pool)
                .await?;

        Ok(crate::store::NamespaceStats {
            namespace: namespace.to_string(),
            total_workflows: total.0,
            running: running.0,
            pending: pending.0,
            completed: completed.0,
            failed: failed.0,
            schedules: schedules.0,
            workers: workers.0,
        })
    }

    async fn create_workflow(&self, wf: &WorkflowRecord) -> Result<()> {
        sqlx::query(
            "INSERT INTO workflows (id, namespace, run_id, workflow_type, task_queue, status, input, result, error, parent_id, claimed_by, search_attributes, archived_at, archive_uri, created_at, updated_at, completed_at)
             VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16, $17)",
        )
        .bind(&wf.id)
        .bind(&wf.namespace)
        .bind(&wf.run_id)
        .bind(&wf.workflow_type)
        .bind(&wf.task_queue)
        .bind(&wf.status)
        .bind(&wf.input)
        .bind(&wf.result)
        .bind(&wf.error)
        .bind(&wf.parent_id)
        .bind(&wf.claimed_by)
        .bind(&wf.search_attributes)
        .bind(wf.archived_at)
        .bind(&wf.archive_uri)
        .bind(wf.created_at)
        .bind(wf.updated_at)
        .bind(wf.completed_at)
        .execute(&self.pool)
        .await?;
        Ok(())
    }

    async fn get_workflow(&self, id: &str) -> Result<Option<WorkflowRecord>> {
        let row = sqlx::query_as::<_, PgWorkflowRow>(
            "SELECT id, namespace, run_id, workflow_type, task_queue, status, input, result, error, parent_id, claimed_by, search_attributes, archived_at, archive_uri, created_at, updated_at, completed_at FROM workflows WHERE id = $1",
        )
        .bind(id)
        .fetch_optional(&self.pool)
        .await?;
        Ok(row.map(Into::into))
    }

    async fn list_workflows(
        &self,
        namespace: &str,
        status: Option<WorkflowStatus>,
        workflow_type: Option<&str>,
        search_attrs_filter: Option<&str>,
        limit: i64,
        offset: i64,
    ) -> Result<Vec<WorkflowRecord>> {
        let status_str = status.map(|s| s.to_string());

        let filter_pairs: Vec<(String, serde_json::Value)> = search_attrs_filter
            .and_then(|s| serde_json::from_str::<serde_json::Value>(s).ok())
            .and_then(|v| v.as_object().cloned())
            .map(|m| m.into_iter().collect())
            .unwrap_or_default();

        let mut sql = String::from(
            "SELECT id, namespace, run_id, workflow_type, task_queue, status, input, result, error, parent_id, claimed_by, search_attributes, archived_at, archive_uri, created_at, updated_at, completed_at
             FROM workflows
             WHERE namespace = $1
               AND ($2::TEXT IS NULL OR status = $2)
               AND ($3::TEXT IS NULL OR workflow_type = $3)",
        );
        let mut idx = 4usize;
        for _ in &filter_pairs {
            sql.push_str(&format!(
                " AND (search_attributes::jsonb)->>${} = ${}",
                idx,
                idx + 1
            ));
            idx += 2;
        }
        sql.push_str(&format!(" ORDER BY created_at DESC LIMIT ${} OFFSET ${}", idx, idx + 1));

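        // With a single search-attribute filter the generated tail is, for
        // illustration:
        //   AND (search_attributes::jsonb)->>$4 = $5
        //   ORDER BY created_at DESC LIMIT $6 OFFSET $7
        // so both the key and its value stay bound parameters instead of being
        // interpolated into the SQL text.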
        let mut q = sqlx::query_as::<_, PgWorkflowRow>(&sql)
            .bind(namespace)
            .bind(&status_str)
            .bind(workflow_type);
        for (key, value) in &filter_pairs {
            q = q.bind(key.clone());
            let as_text = match value {
                serde_json::Value::String(s) => s.clone(),
                other => other.to_string(),
            };
            q = q.bind(as_text);
        }
        let rows = q.bind(limit).bind(offset).fetch_all(&self.pool).await?;
        Ok(rows.into_iter().map(Into::into).collect())
    }

    async fn update_workflow_status(
        &self,
        id: &str,
        status: WorkflowStatus,
        result: Option<&str>,
        error: Option<&str>,
    ) -> Result<()> {
        let now = timestamp_now();
        let completed_at = if status.is_terminal() { Some(now) } else { None };
        sqlx::query(
            "UPDATE workflows SET status = $1, result = COALESCE($2, result), error = COALESCE($3, error), updated_at = $4, completed_at = COALESCE($5, completed_at) WHERE id = $6",
        )
        .bind(status.to_string())
        .bind(result)
        .bind(error)
        .bind(now)
        .bind(completed_at)
        .bind(id)
        .execute(&self.pool)
        .await?;
        Ok(())
    }

    async fn claim_workflow(&self, id: &str, worker_id: &str) -> Result<bool> {
        let res = sqlx::query(
            "UPDATE workflows SET claimed_by = $1, status = 'RUNNING', updated_at = $2 WHERE id = $3 AND claimed_by IS NULL",
        )
        .bind(worker_id)
        .bind(timestamp_now())
        .bind(id)
        .execute(&self.pool)
        .await?;
        Ok(res.rows_affected() > 0)
    }

    async fn mark_workflow_dispatchable(&self, workflow_id: &str) -> Result<()> {
        sqlx::query("UPDATE workflows SET needs_dispatch = TRUE WHERE id = $1")
            .bind(workflow_id)
            .execute(&self.pool)
            .await?;
        Ok(())
    }

    async fn claim_workflow_task(
        &self,
        task_queue: &str,
        worker_id: &str,
    ) -> Result<Option<WorkflowRecord>> {
        let now = timestamp_now();
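        // Single-row claim: the inner SELECT uses FOR UPDATE SKIP LOCKED so
        // concurrent pollers skip rows another worker is already claiming, and
        // the oldest dispatchable workflow on the queue wins.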
        let row = sqlx::query_as::<_, PgWorkflowRow>(
            "UPDATE workflows
             SET dispatch_claimed_by = $1, dispatch_last_heartbeat = $2, needs_dispatch = FALSE
             WHERE id = (
                 SELECT id FROM workflows
                 WHERE task_queue = $3
                   AND needs_dispatch = TRUE
                   AND dispatch_claimed_by IS NULL
                   AND status NOT IN ('COMPLETED', 'FAILED', 'CANCELLED', 'TIMED_OUT')
                 ORDER BY updated_at ASC
                 FOR UPDATE SKIP LOCKED
                 LIMIT 1
             )
             RETURNING id, namespace, run_id, workflow_type, task_queue, status, input, result, error, parent_id, claimed_by, search_attributes, archived_at, archive_uri, created_at, updated_at, completed_at",
        )
        .bind(worker_id)
        .bind(now)
        .bind(task_queue)
        .fetch_optional(&self.pool)
        .await?;
        Ok(row.map(Into::into))
    }

    async fn release_workflow_task(&self, workflow_id: &str, worker_id: &str) -> Result<()> {
        sqlx::query(
            "UPDATE workflows
             SET dispatch_claimed_by = NULL, dispatch_last_heartbeat = NULL
             WHERE id = $1 AND dispatch_claimed_by = $2",
        )
        .bind(workflow_id)
        .bind(worker_id)
        .execute(&self.pool)
        .await?;
        Ok(())
    }

    async fn release_stale_dispatch_leases(
        &self,
        now: f64,
        timeout_secs: f64,
    ) -> Result<u64> {
        let res = sqlx::query(
            "UPDATE workflows
             SET dispatch_claimed_by = NULL,
                 dispatch_last_heartbeat = NULL,
                 needs_dispatch = TRUE
             WHERE dispatch_claimed_by IS NOT NULL
               AND ($1 - dispatch_last_heartbeat) > $2
               AND status NOT IN ('COMPLETED', 'FAILED', 'CANCELLED', 'TIMED_OUT')",
        )
        .bind(now)
        .bind(timeout_secs)
        .execute(&self.pool)
        .await?;
        Ok(res.rows_affected())
    }

    async fn append_event(&self, ev: &WorkflowEvent) -> Result<i64> {
        let row: (i64,) = sqlx::query_as(
            "INSERT INTO workflow_events (workflow_id, seq, event_type, payload, timestamp) VALUES ($1, $2, $3, $4, $5) RETURNING id",
        )
        .bind(&ev.workflow_id)
        .bind(ev.seq)
        .bind(&ev.event_type)
        .bind(&ev.payload)
        .bind(ev.timestamp)
        .fetch_one(&self.pool)
        .await?;
        Ok(row.0)
    }

    async fn list_events(&self, workflow_id: &str) -> Result<Vec<WorkflowEvent>> {
        let rows = sqlx::query_as::<_, PgEventRow>(
            "SELECT id, workflow_id, seq, event_type, payload, timestamp FROM workflow_events WHERE workflow_id = $1 ORDER BY seq ASC",
        )
        .bind(workflow_id)
        .fetch_all(&self.pool)
        .await?;
        Ok(rows.into_iter().map(Into::into).collect())
    }

    async fn get_event_count(&self, workflow_id: &str) -> Result<i64> {
        let row: (i64,) =
            sqlx::query_as("SELECT COUNT(*) FROM workflow_events WHERE workflow_id = $1")
                .bind(workflow_id)
                .fetch_one(&self.pool)
                .await?;
        Ok(row.0)
    }

    async fn create_activity(&self, act: &WorkflowActivity) -> Result<i64> {
        let row: (i64,) = sqlx::query_as(
            "INSERT INTO workflow_activities (workflow_id, seq, name, task_queue, input, status, attempt, max_attempts, initial_interval_secs, backoff_coefficient, start_to_close_secs, heartbeat_timeout_secs, scheduled_at)
             VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13) RETURNING id",
        )
        .bind(&act.workflow_id)
        .bind(act.seq)
        .bind(&act.name)
        .bind(&act.task_queue)
        .bind(&act.input)
        .bind(&act.status)
        .bind(act.attempt)
        .bind(act.max_attempts)
        .bind(act.initial_interval_secs)
        .bind(act.backoff_coefficient)
        .bind(act.start_to_close_secs)
        .bind(act.heartbeat_timeout_secs)
        .bind(act.scheduled_at)
        .fetch_one(&self.pool)
        .await?;
        Ok(row.0)
    }

    async fn get_activity(&self, id: i64) -> Result<Option<WorkflowActivity>> {
        let row = sqlx::query_as::<_, PgActivityRow>(
            "SELECT id, workflow_id, seq, name, task_queue, input, status, result, error, attempt, max_attempts, initial_interval_secs, backoff_coefficient, start_to_close_secs, heartbeat_timeout_secs, claimed_by, scheduled_at, started_at, completed_at, last_heartbeat
             FROM workflow_activities WHERE id = $1",
        )
        .bind(id)
        .fetch_optional(&self.pool)
        .await?;
        Ok(row.map(Into::into))
    }

    async fn get_activity_by_workflow_seq(
        &self,
        workflow_id: &str,
        seq: i32,
    ) -> Result<Option<WorkflowActivity>> {
        let row = sqlx::query_as::<_, PgActivityRow>(
            "SELECT id, workflow_id, seq, name, task_queue, input, status, result, error, attempt, max_attempts, initial_interval_secs, backoff_coefficient, start_to_close_secs, heartbeat_timeout_secs, claimed_by, scheduled_at, started_at, completed_at, last_heartbeat
             FROM workflow_activities WHERE workflow_id = $1 AND seq = $2",
        )
        .bind(workflow_id)
        .bind(seq)
        .fetch_optional(&self.pool)
        .await?;
        Ok(row.map(Into::into))
    }

    async fn claim_activity(
        &self,
        task_queue: &str,
        worker_id: &str,
    ) -> Result<Option<WorkflowActivity>> {
        let now = timestamp_now();
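        // Same FOR UPDATE SKIP LOCKED claim pattern as claim_workflow_task,
        // applied to the oldest PENDING activity on this queue.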
        let row = sqlx::query_as::<_, PgActivityRow>(
            "UPDATE workflow_activities SET status = 'RUNNING', claimed_by = $1, started_at = $2
             WHERE id = (
                 SELECT id FROM workflow_activities
                 WHERE task_queue = $3 AND status = 'PENDING'
                 ORDER BY scheduled_at ASC
                 FOR UPDATE SKIP LOCKED
                 LIMIT 1
             )
             RETURNING id, workflow_id, seq, name, task_queue, input, status, result, error, attempt, max_attempts, initial_interval_secs, backoff_coefficient, start_to_close_secs, heartbeat_timeout_secs, claimed_by, scheduled_at, started_at, completed_at, last_heartbeat",
        )
        .bind(worker_id)
        .bind(now)
        .bind(task_queue)
        .fetch_optional(&self.pool)
        .await?;
        Ok(row.map(Into::into))
    }

    async fn requeue_activity_for_retry(
        &self,
        id: i64,
        next_attempt: i32,
        next_scheduled_at: f64,
    ) -> Result<()> {
        sqlx::query(
            "UPDATE workflow_activities
             SET status = 'PENDING', attempt = $1, scheduled_at = $2,
                 claimed_by = NULL, started_at = NULL, last_heartbeat = NULL,
                 error = NULL
             WHERE id = $3",
        )
        .bind(next_attempt)
        .bind(next_scheduled_at)
        .bind(id)
        .execute(&self.pool)
        .await?;
        Ok(())
    }

    async fn complete_activity(
        &self,
        id: i64,
        result: Option<&str>,
        error: Option<&str>,
        failed: bool,
    ) -> Result<()> {
        let status = if failed { "FAILED" } else { "COMPLETED" };
        sqlx::query(
            "UPDATE workflow_activities SET status = $1, result = $2, error = $3, completed_at = $4 WHERE id = $5",
        )
        .bind(status)
        .bind(result)
        .bind(error)
        .bind(timestamp_now())
        .bind(id)
        .execute(&self.pool)
        .await?;
        Ok(())
    }

    async fn heartbeat_activity(&self, id: i64, _details: Option<&str>) -> Result<()> {
        sqlx::query("UPDATE workflow_activities SET last_heartbeat = $1 WHERE id = $2")
            .bind(timestamp_now())
            .bind(id)
            .execute(&self.pool)
            .await?;
        Ok(())
    }

    async fn get_timed_out_activities(&self, now: f64) -> Result<Vec<WorkflowActivity>> {
        let rows = sqlx::query_as::<_, PgActivityRow>(
            "SELECT id, workflow_id, seq, name, task_queue, input, status, result, error, attempt, max_attempts, initial_interval_secs, backoff_coefficient, start_to_close_secs, heartbeat_timeout_secs, claimed_by, scheduled_at, started_at, completed_at, last_heartbeat
             FROM workflow_activities
             WHERE status = 'RUNNING'
               AND heartbeat_timeout_secs IS NOT NULL
               AND last_heartbeat IS NOT NULL
               AND ($1 - last_heartbeat) > heartbeat_timeout_secs",
        )
        .bind(now)
        .fetch_all(&self.pool)
        .await?;
        Ok(rows.into_iter().map(Into::into).collect())
    }

    async fn create_timer(&self, timer: &WorkflowTimer) -> Result<i64> {
        let row: (i64,) = sqlx::query_as(
            "INSERT INTO workflow_timers (workflow_id, seq, fire_at, fired) VALUES ($1, $2, $3, FALSE) RETURNING id",
        )
        .bind(&timer.workflow_id)
        .bind(timer.seq)
        .bind(timer.fire_at)
        .fetch_one(&self.pool)
        .await?;
        Ok(row.0)
    }

    async fn cancel_pending_activities(&self, workflow_id: &str) -> Result<u64> {
        let res = sqlx::query(
            "UPDATE workflow_activities SET status = 'CANCELLED', completed_at = $1
             WHERE workflow_id = $2 AND status = 'PENDING'",
        )
        .bind(timestamp_now())
        .bind(workflow_id)
        .execute(&self.pool)
        .await?;
        Ok(res.rows_affected())
    }

    async fn cancel_pending_timers(&self, workflow_id: &str) -> Result<u64> {
        let res = sqlx::query(
            "UPDATE workflow_timers SET fired = TRUE
             WHERE workflow_id = $1 AND fired = FALSE",
        )
        .bind(workflow_id)
        .execute(&self.pool)
        .await?;
        Ok(res.rows_affected())
    }

    async fn get_timer_by_workflow_seq(
        &self,
        workflow_id: &str,
        seq: i32,
    ) -> Result<Option<WorkflowTimer>> {
        let row = sqlx::query_as::<_, PgTimerRow>(
            "SELECT id, workflow_id, seq, fire_at, fired
             FROM workflow_timers WHERE workflow_id = $1 AND seq = $2",
        )
        .bind(workflow_id)
        .bind(seq)
        .fetch_optional(&self.pool)
        .await?;
        Ok(row.map(Into::into))
    }

    async fn fire_due_timers(&self, now: f64) -> Result<Vec<WorkflowTimer>> {
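        // Flip and fetch in one round trip: UPDATE ... RETURNING marks every
        // due timer as fired and hands the affected rows back, so concurrent
        // sweeps do not hand the same timer out twice.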
        let rows = sqlx::query_as::<_, PgTimerRow>(
            "UPDATE workflow_timers SET fired = TRUE
             WHERE fired = FALSE AND fire_at <= $1
             RETURNING id, workflow_id, seq, fire_at, fired",
        )
        .bind(now)
        .fetch_all(&self.pool)
        .await?;
        Ok(rows.into_iter().map(Into::into).collect())
    }

    async fn send_signal(&self, sig: &WorkflowSignal) -> Result<i64> {
        let row: (i64,) = sqlx::query_as(
            "INSERT INTO workflow_signals (workflow_id, name, payload, consumed, received_at) VALUES ($1, $2, $3, FALSE, $4) RETURNING id",
        )
        .bind(&sig.workflow_id)
        .bind(&sig.name)
        .bind(&sig.payload)
        .bind(sig.received_at)
        .fetch_one(&self.pool)
        .await?;
        Ok(row.0)
    }

    async fn consume_signals(
        &self,
        workflow_id: &str,
        name: &str,
    ) -> Result<Vec<WorkflowSignal>> {
        let rows = sqlx::query_as::<_, PgSignalRow>(
            "UPDATE workflow_signals SET consumed = TRUE
             WHERE workflow_id = $1 AND name = $2 AND consumed = FALSE
             RETURNING id, workflow_id, name, payload, consumed, received_at",
        )
        .bind(workflow_id)
        .bind(name)
        .fetch_all(&self.pool)
        .await?;
        Ok(rows.into_iter().map(Into::into).collect())
    }

    async fn create_schedule(&self, sched: &WorkflowSchedule) -> Result<()> {
        sqlx::query(
            "INSERT INTO workflow_schedules (namespace, name, workflow_type, cron_expr, timezone, input, task_queue, overlap_policy, paused, last_run_at, next_run_at, last_workflow_id, created_at)
             VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13)",
        )
        .bind(&sched.namespace)
        .bind(&sched.name)
        .bind(&sched.workflow_type)
        .bind(&sched.cron_expr)
        .bind(&sched.timezone)
        .bind(&sched.input)
        .bind(&sched.task_queue)
        .bind(&sched.overlap_policy)
        .bind(sched.paused)
        .bind(sched.last_run_at)
        .bind(sched.next_run_at)
        .bind(&sched.last_workflow_id)
        .bind(sched.created_at)
        .execute(&self.pool)
        .await?;
        Ok(())
    }

    async fn get_schedule(&self, namespace: &str, name: &str) -> Result<Option<WorkflowSchedule>> {
        let row = sqlx::query_as::<_, PgScheduleRow>(
            "SELECT namespace, name, workflow_type, cron_expr, timezone, input, task_queue, overlap_policy, paused, last_run_at, next_run_at, last_workflow_id, created_at FROM workflow_schedules WHERE namespace = $1 AND name = $2",
        )
        .bind(namespace)
        .bind(name)
        .fetch_optional(&self.pool)
        .await?;
        Ok(row.map(Into::into))
    }

    async fn list_schedules(&self, namespace: &str) -> Result<Vec<WorkflowSchedule>> {
        let rows = sqlx::query_as::<_, PgScheduleRow>(
            "SELECT namespace, name, workflow_type, cron_expr, timezone, input, task_queue, overlap_policy, paused, last_run_at, next_run_at, last_workflow_id, created_at FROM workflow_schedules WHERE namespace = $1 ORDER BY name",
        )
        .bind(namespace)
        .fetch_all(&self.pool)
        .await?;
        Ok(rows.into_iter().map(Into::into).collect())
    }

    async fn update_schedule_last_run(
        &self,
        namespace: &str,
        name: &str,
        last_run_at: f64,
        next_run_at: f64,
        workflow_id: &str,
    ) -> Result<()> {
        sqlx::query(
            "UPDATE workflow_schedules SET last_run_at = $1, next_run_at = $2, last_workflow_id = $3 WHERE namespace = $4 AND name = $5",
        )
        .bind(last_run_at)
        .bind(next_run_at)
        .bind(workflow_id)
        .bind(namespace)
        .bind(name)
        .execute(&self.pool)
        .await?;
        Ok(())
    }

    async fn delete_schedule(&self, namespace: &str, name: &str) -> Result<bool> {
        let res = sqlx::query("DELETE FROM workflow_schedules WHERE namespace = $1 AND name = $2")
            .bind(namespace)
            .bind(name)
            .execute(&self.pool)
            .await?;
        Ok(res.rows_affected() > 0)
    }

    async fn list_archivable_workflows(
        &self,
        cutoff: f64,
        limit: i64,
    ) -> Result<Vec<WorkflowRecord>> {
        let rows = sqlx::query_as::<_, PgWorkflowRow>(
            "SELECT id, namespace, run_id, workflow_type, task_queue, status, input, result, error, parent_id, claimed_by, search_attributes, archived_at, archive_uri, created_at, updated_at, completed_at
             FROM workflows
             WHERE status IN ('COMPLETED', 'FAILED', 'CANCELLED', 'TIMED_OUT')
               AND completed_at IS NOT NULL
               AND completed_at < $1
               AND archived_at IS NULL
             ORDER BY completed_at ASC
             LIMIT $2",
        )
        .bind(cutoff)
        .bind(limit)
        .fetch_all(&self.pool)
        .await?;
        Ok(rows.into_iter().map(Into::into).collect())
    }

    async fn mark_archived_and_purge(
        &self,
        workflow_id: &str,
        archive_uri: &str,
        archived_at: f64,
    ) -> Result<()> {
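        // Purge history and stamp the archive marker in one transaction so a
        // crash cannot leave a workflow half-purged but not marked as archived.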
        let mut tx = self.pool.begin().await?;
        sqlx::query("DELETE FROM workflow_events WHERE workflow_id = $1")
            .bind(workflow_id)
            .execute(&mut *tx)
            .await?;
        sqlx::query("DELETE FROM workflow_activities WHERE workflow_id = $1")
            .bind(workflow_id)
            .execute(&mut *tx)
            .await?;
        sqlx::query("DELETE FROM workflow_timers WHERE workflow_id = $1")
            .bind(workflow_id)
            .execute(&mut *tx)
            .await?;
        sqlx::query("DELETE FROM workflow_signals WHERE workflow_id = $1")
            .bind(workflow_id)
            .execute(&mut *tx)
            .await?;
        sqlx::query("DELETE FROM workflow_snapshots WHERE workflow_id = $1")
            .bind(workflow_id)
            .execute(&mut *tx)
            .await?;
        sqlx::query(
            "UPDATE workflows SET archived_at = $1, archive_uri = $2 WHERE id = $3",
        )
        .bind(archived_at)
        .bind(archive_uri)
        .bind(workflow_id)
        .execute(&mut *tx)
        .await?;
        tx.commit().await?;
        Ok(())
    }

    async fn upsert_search_attributes(
        &self,
        workflow_id: &str,
        patch_json: &str,
    ) -> Result<()> {
        let current: Option<(Option<String>,)> =
            sqlx::query_as("SELECT search_attributes FROM workflows WHERE id = $1")
                .bind(workflow_id)
                .fetch_optional(&self.pool)
                .await?;
        let merged = crate::store::sqlite::merge_search_attrs(
            current.and_then(|(s,)| s).as_deref(),
            patch_json,
        )?;
        sqlx::query("UPDATE workflows SET search_attributes = $1 WHERE id = $2")
            .bind(merged)
            .bind(workflow_id)
            .execute(&self.pool)
            .await?;
        Ok(())
    }

    async fn update_schedule(
        &self,
        namespace: &str,
        name: &str,
        patch: &SchedulePatch,
    ) -> Result<Option<WorkflowSchedule>> {
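        // Build the SET clause only from the fields present in the patch. For a
        // patch carrying cron_expr and timezone the generated statement is,
        // illustratively:
        //   UPDATE workflow_schedules SET cron_expr = $1, timezone = $2
        //   WHERE namespace = $3 AND name = $4
        // with the binds applied in the same field order below.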
        let mut sets: Vec<String> = Vec::new();
        let mut idx = 1usize;
        if patch.cron_expr.is_some() {
            sets.push(format!("cron_expr = ${idx}"));
            idx += 1;
        }
        if patch.timezone.is_some() {
            sets.push(format!("timezone = ${idx}"));
            idx += 1;
        }
        if patch.input.is_some() {
            sets.push(format!("input = ${idx}"));
            idx += 1;
        }
        if patch.task_queue.is_some() {
            sets.push(format!("task_queue = ${idx}"));
            idx += 1;
        }
        if patch.overlap_policy.is_some() {
            sets.push(format!("overlap_policy = ${idx}"));
            idx += 1;
        }
        if sets.is_empty() {
            return self.get_schedule(namespace, name).await;
        }
        let sql = format!(
            "UPDATE workflow_schedules SET {} WHERE namespace = ${} AND name = ${}",
            sets.join(", "),
            idx,
            idx + 1
        );
        let mut q = sqlx::query(&sql);
        if let Some(ref v) = patch.cron_expr {
            q = q.bind(v);
        }
        if let Some(ref v) = patch.timezone {
            q = q.bind(v);
        }
        if let Some(ref v) = patch.input {
            q = q.bind(v.to_string());
        }
        if let Some(ref v) = patch.task_queue {
            q = q.bind(v);
        }
        if let Some(ref v) = patch.overlap_policy {
            q = q.bind(v);
        }
        let res = q
            .bind(namespace)
            .bind(name)
            .execute(&self.pool)
            .await?;
        if res.rows_affected() == 0 {
            return Ok(None);
        }
        self.get_schedule(namespace, name).await
    }

    async fn set_schedule_paused(
        &self,
        namespace: &str,
        name: &str,
        paused: bool,
    ) -> Result<Option<WorkflowSchedule>> {
        let res = sqlx::query(
            "UPDATE workflow_schedules SET paused = $1 WHERE namespace = $2 AND name = $3",
        )
        .bind(paused)
        .bind(namespace)
        .bind(name)
        .execute(&self.pool)
        .await?;
        if res.rows_affected() == 0 {
            return Ok(None);
        }
        self.get_schedule(namespace, name).await
    }

    async fn register_worker(&self, w: &WorkflowWorker) -> Result<()> {
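        // Upsert keyed on the worker id: re-registering a known worker just
        // refreshes its heartbeat and identity instead of failing the insert.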
        sqlx::query(
            "INSERT INTO workflow_workers (id, namespace, identity, task_queue, workflows, activities, max_concurrent_workflows, max_concurrent_activities, active_tasks, last_heartbeat, registered_at)
             VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11)
             ON CONFLICT (id) DO UPDATE SET last_heartbeat = EXCLUDED.last_heartbeat, identity = EXCLUDED.identity",
        )
        .bind(&w.id)
        .bind(&w.namespace)
        .bind(&w.identity)
        .bind(&w.task_queue)
        .bind(&w.workflows)
        .bind(&w.activities)
        .bind(w.max_concurrent_workflows)
        .bind(w.max_concurrent_activities)
        .bind(w.active_tasks)
        .bind(w.last_heartbeat)
        .bind(w.registered_at)
        .execute(&self.pool)
        .await?;
        Ok(())
    }

    async fn heartbeat_worker(&self, id: &str, now: f64) -> Result<()> {
        sqlx::query("UPDATE workflow_workers SET last_heartbeat = $1 WHERE id = $2")
            .bind(now)
            .bind(id)
            .execute(&self.pool)
            .await?;
        Ok(())
    }

    async fn list_workers(&self, namespace: &str) -> Result<Vec<WorkflowWorker>> {
        let rows = sqlx::query_as::<_, PgWorkerRow>(
            "SELECT id, namespace, identity, task_queue, workflows, activities, max_concurrent_workflows, max_concurrent_activities, active_tasks, last_heartbeat, registered_at FROM workflow_workers WHERE namespace = $1 ORDER BY registered_at",
        )
        .bind(namespace)
        .fetch_all(&self.pool)
        .await?;
        Ok(rows.into_iter().map(Into::into).collect())
    }

    async fn remove_dead_workers(&self, cutoff: f64) -> Result<Vec<String>> {
        let rows: Vec<(String,)> =
            sqlx::query_as("SELECT id FROM workflow_workers WHERE last_heartbeat < $1")
                .bind(cutoff)
                .fetch_all(&self.pool)
                .await?;
        let ids: Vec<String> = rows.into_iter().map(|r| r.0).collect();
        if !ids.is_empty() {
            sqlx::query("DELETE FROM workflow_workers WHERE last_heartbeat < $1")
                .bind(cutoff)
                .execute(&self.pool)
                .await?;
        }
        Ok(ids)
    }

    async fn create_api_key(
        &self,
        key_hash: &str,
        prefix: &str,
        label: Option<&str>,
        created_at: f64,
    ) -> Result<()> {
        sqlx::query("INSERT INTO api_keys (key_hash, prefix, label, created_at) VALUES ($1, $2, $3, $4)")
            .bind(key_hash)
            .bind(prefix)
            .bind(label)
            .bind(created_at)
            .execute(&self.pool)
            .await?;
        Ok(())
    }

    async fn validate_api_key(&self, key_hash: &str) -> Result<bool> {
        let row: Option<(i64,)> =
            sqlx::query_as("SELECT 1::BIGINT FROM api_keys WHERE key_hash = $1")
                .bind(key_hash)
                .fetch_optional(&self.pool)
                .await?;
        Ok(row.is_some())
    }

    async fn list_api_keys(&self) -> Result<Vec<crate::store::ApiKeyRecord>> {
        let rows = sqlx::query_as::<_, (String, Option<String>, f64)>(
            "SELECT prefix, label, created_at FROM api_keys ORDER BY created_at DESC",
        )
        .fetch_all(&self.pool)
        .await?;
        Ok(rows
            .into_iter()
            .map(|(prefix, label, created_at)| crate::store::ApiKeyRecord {
                prefix,
                label,
                created_at,
            })
            .collect())
    }

    async fn revoke_api_key(&self, prefix: &str) -> Result<bool> {
        let res = sqlx::query("DELETE FROM api_keys WHERE prefix = $1")
            .bind(prefix)
            .execute(&self.pool)
            .await?;
        Ok(res.rows_affected() > 0)
    }

    async fn api_keys_empty(&self) -> Result<bool> {
        let row: (i64,) = sqlx::query_as("SELECT COUNT(*) FROM api_keys")
            .fetch_one(&self.pool)
            .await?;
        Ok(row.0 == 0)
    }

    async fn get_api_key_by_label(
        &self,
        label: &str,
    ) -> Result<Option<crate::store::ApiKeyRecord>> {
        let row: Option<(String, Option<String>, f64)> = sqlx::query_as(
            "SELECT prefix, label, created_at FROM api_keys WHERE label = $1 LIMIT 1",
        )
        .bind(label)
        .fetch_optional(&self.pool)
        .await?;
        Ok(row.map(|(prefix, label, created_at)| crate::store::ApiKeyRecord {
            prefix,
            label,
            created_at,
        }))
    }

    async fn list_child_workflows(&self, parent_id: &str) -> Result<Vec<WorkflowRecord>> {
        let rows = sqlx::query_as::<_, PgWorkflowRow>(
            "SELECT id, namespace, run_id, workflow_type, task_queue, status, input, result, error, parent_id, claimed_by, search_attributes, archived_at, archive_uri, created_at, updated_at, completed_at
             FROM workflows WHERE parent_id = $1 ORDER BY created_at ASC",
        )
        .bind(parent_id)
        .fetch_all(&self.pool)
        .await?;
        Ok(rows.into_iter().map(Into::into).collect())
    }

    async fn create_snapshot(
        &self,
        workflow_id: &str,
        event_seq: i32,
        state_json: &str,
    ) -> Result<()> {
        sqlx::query(
            "INSERT INTO workflow_snapshots (workflow_id, event_seq, state_json, created_at)
             VALUES ($1, $2, $3, $4)
             ON CONFLICT (workflow_id, event_seq) DO UPDATE SET state_json = EXCLUDED.state_json, created_at = EXCLUDED.created_at",
        )
        .bind(workflow_id)
        .bind(event_seq)
        .bind(state_json)
        .bind(timestamp_now())
        .execute(&self.pool)
        .await?;
        Ok(())
    }

    async fn get_latest_snapshot(
        &self,
        workflow_id: &str,
    ) -> Result<Option<WorkflowSnapshot>> {
        let row = sqlx::query_as::<_, (String, i32, String, f64)>(
            "SELECT workflow_id, event_seq, state_json, created_at
             FROM workflow_snapshots WHERE workflow_id = $1
             ORDER BY event_seq DESC LIMIT 1",
        )
        .bind(workflow_id)
        .fetch_optional(&self.pool)
        .await?;

        Ok(row.map(|(workflow_id, event_seq, state_json, created_at)| WorkflowSnapshot {
            workflow_id,
            event_seq,
            state_json,
            created_at,
        }))
    }

    async fn get_queue_stats(&self, namespace: &str) -> Result<Vec<crate::store::QueueStats>> {
        let rows = sqlx::query_as::<_, (String, i64, i64, i64)>(
            "SELECT
                 a.task_queue AS queue,
                 SUM(CASE WHEN a.status = 'PENDING' THEN 1 ELSE 0 END) AS pending,
                 SUM(CASE WHEN a.status = 'RUNNING' THEN 1 ELSE 0 END) AS running,
                 (SELECT COUNT(*) FROM workflow_workers w WHERE w.task_queue = a.task_queue AND w.namespace = $1) AS workers
             FROM workflow_activities a
             JOIN workflows wf ON a.workflow_id = wf.id AND wf.namespace = $1
             GROUP BY a.task_queue",
        )
        .bind(namespace)
        .fetch_all(&self.pool)
        .await?;

        Ok(rows
            .into_iter()
            .map(|(queue, pending, running, workers)| crate::store::QueueStats {
                queue,
                pending_activities: pending,
                running_activities: running,
                workers,
            })
            .collect())
    }

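    // Scheduler singleton lock. Advisory key 42 is distinct from the leader
    // lock's key 1, so the two roles can be held by different processes.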
    async fn try_acquire_scheduler_lock(&self) -> Result<bool> {
        let row: (bool,) =
            sqlx::query_as("SELECT pg_try_advisory_lock(42)")
                .fetch_one(&self.pool)
                .await?;
        Ok(row.0)
    }
}

fn timestamp_now() -> f64 {
    std::time::SystemTime::now()
        .duration_since(std::time::UNIX_EPOCH)
        .unwrap()
        .as_secs_f64()
}

#[derive(sqlx::FromRow)]
struct PgWorkflowRow {
    id: String,
    namespace: String,
    run_id: String,
    workflow_type: String,
    task_queue: String,
    status: String,
    input: Option<String>,
    result: Option<String>,
    error: Option<String>,
    parent_id: Option<String>,
    claimed_by: Option<String>,
    search_attributes: Option<String>,
    archived_at: Option<f64>,
    archive_uri: Option<String>,
    created_at: f64,
    updated_at: f64,
    completed_at: Option<f64>,
}

impl From<PgWorkflowRow> for WorkflowRecord {
    fn from(r: PgWorkflowRow) -> Self {
        Self {
            id: r.id,
            namespace: r.namespace,
            run_id: r.run_id,
            workflow_type: r.workflow_type,
            task_queue: r.task_queue,
            status: r.status,
            input: r.input,
            result: r.result,
            error: r.error,
            parent_id: r.parent_id,
            claimed_by: r.claimed_by,
            search_attributes: r.search_attributes,
            archived_at: r.archived_at,
            archive_uri: r.archive_uri,
            created_at: r.created_at,
            updated_at: r.updated_at,
            completed_at: r.completed_at,
        }
    }
}

#[derive(sqlx::FromRow)]
struct PgEventRow {
    id: i64,
    workflow_id: String,
    seq: i32,
    event_type: String,
    payload: Option<String>,
    timestamp: f64,
}

impl From<PgEventRow> for WorkflowEvent {
    fn from(r: PgEventRow) -> Self {
        Self {
            id: Some(r.id),
            workflow_id: r.workflow_id,
            seq: r.seq,
            event_type: r.event_type,
            payload: r.payload,
            timestamp: r.timestamp,
        }
    }
}

#[derive(sqlx::FromRow)]
struct PgActivityRow {
    id: i64,
    workflow_id: String,
    seq: i32,
    name: String,
    task_queue: String,
    input: Option<String>,
    status: String,
    result: Option<String>,
    error: Option<String>,
    attempt: i32,
    max_attempts: i32,
    initial_interval_secs: f64,
    backoff_coefficient: f64,
    start_to_close_secs: f64,
    heartbeat_timeout_secs: Option<f64>,
    claimed_by: Option<String>,
    scheduled_at: f64,
    started_at: Option<f64>,
    completed_at: Option<f64>,
    last_heartbeat: Option<f64>,
}

impl From<PgActivityRow> for WorkflowActivity {
    fn from(r: PgActivityRow) -> Self {
        Self {
            id: Some(r.id),
            workflow_id: r.workflow_id,
            seq: r.seq,
            name: r.name,
            task_queue: r.task_queue,
            input: r.input,
            status: r.status,
            result: r.result,
            error: r.error,
            attempt: r.attempt,
            max_attempts: r.max_attempts,
            initial_interval_secs: r.initial_interval_secs,
            backoff_coefficient: r.backoff_coefficient,
            start_to_close_secs: r.start_to_close_secs,
            heartbeat_timeout_secs: r.heartbeat_timeout_secs,
            claimed_by: r.claimed_by,
            scheduled_at: r.scheduled_at,
            started_at: r.started_at,
            completed_at: r.completed_at,
            last_heartbeat: r.last_heartbeat,
        }
    }
}

#[derive(sqlx::FromRow)]
struct PgTimerRow {
    id: i64,
    workflow_id: String,
    seq: i32,
    fire_at: f64,
    fired: bool,
}

impl From<PgTimerRow> for WorkflowTimer {
    fn from(r: PgTimerRow) -> Self {
        Self {
            id: Some(r.id),
            workflow_id: r.workflow_id,
            seq: r.seq,
            fire_at: r.fire_at,
            fired: r.fired,
        }
    }
}

#[derive(sqlx::FromRow)]
struct PgSignalRow {
    id: i64,
    workflow_id: String,
    name: String,
    payload: Option<String>,
    consumed: bool,
    received_at: f64,
}

impl From<PgSignalRow> for WorkflowSignal {
    fn from(r: PgSignalRow) -> Self {
        Self {
            id: Some(r.id),
            workflow_id: r.workflow_id,
            name: r.name,
            payload: r.payload,
            consumed: r.consumed,
            received_at: r.received_at,
        }
    }
}

#[derive(sqlx::FromRow)]
struct PgScheduleRow {
    namespace: String,
    name: String,
    workflow_type: String,
    cron_expr: String,
    timezone: String,
    input: Option<String>,
    task_queue: String,
    overlap_policy: String,
    paused: bool,
    last_run_at: Option<f64>,
    next_run_at: Option<f64>,
    last_workflow_id: Option<String>,
    created_at: f64,
}

impl From<PgScheduleRow> for WorkflowSchedule {
    fn from(r: PgScheduleRow) -> Self {
        Self {
            namespace: r.namespace,
            name: r.name,
            workflow_type: r.workflow_type,
            cron_expr: r.cron_expr,
            timezone: r.timezone,
            input: r.input,
            task_queue: r.task_queue,
            overlap_policy: r.overlap_policy,
            paused: r.paused,
            last_run_at: r.last_run_at,
            next_run_at: r.next_run_at,
            last_workflow_id: r.last_workflow_id,
            created_at: r.created_at,
        }
    }
}

#[derive(sqlx::FromRow)]
struct PgWorkerRow {
    id: String,
    namespace: String,
    identity: String,
    task_queue: String,
    workflows: Option<String>,
    activities: Option<String>,
    max_concurrent_workflows: i32,
    max_concurrent_activities: i32,
    active_tasks: i32,
    last_heartbeat: f64,
    registered_at: f64,
}

impl From<PgWorkerRow> for WorkflowWorker {
    fn from(r: PgWorkerRow) -> Self {
        Self {
            id: r.id,
            namespace: r.namespace,
            identity: r.identity,
            task_queue: r.task_queue,
            workflows: r.workflows,
            activities: r.activities,
            max_concurrent_workflows: r.max_concurrent_workflows,
            max_concurrent_activities: r.max_concurrent_activities,
            active_tasks: r.active_tasks,
            last_heartbeat: r.last_heartbeat,
            registered_at: r.registered_at,
        }
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn sanitise_schema_keeps_statements_intact() {
        let input = "CREATE TABLE foo (x INT);\nCREATE INDEX idx_foo ON foo(x);\n";
        let out = sanitise_schema(input);
        assert_eq!(out.len(), 2);
        assert!(out[0].starts_with("CREATE TABLE foo"));
        assert!(out[1].starts_with("CREATE INDEX idx_foo"));
    }

    #[test]
    fn sanitise_schema_drops_pure_comment_lines() {
        let input = "-- header comment\nCREATE TABLE foo (x INT);\n-- trailing comment\n";
        let out = sanitise_schema(input);
        assert_eq!(out.len(), 1);
        assert!(out[0].starts_with("CREATE TABLE foo"));
    }

    #[test]
    fn sanitise_schema_ignores_semicolons_inside_comment_prose() {
        let input = "\
CREATE TABLE foo (x INT);
-- Idempotent across startups; fresh installs pick the column up from the
-- CREATE TABLE above so the ADD is a no-op.
";
        let out = sanitise_schema(input);
        assert_eq!(
            out.len(),
            1,
            "expected 1 real statement, got {}: {:?}",
            out.len(),
            out
        );
        assert!(out[0].starts_with("CREATE TABLE foo"));
    }

    #[test]
    fn sanitise_schema_drops_indented_comment_lines() {
        let input = " -- indented comment\n\tCREATE TABLE foo (x INT);\n";
        let out = sanitise_schema(input);
        assert_eq!(out.len(), 1);
        assert!(out[0].contains("CREATE TABLE foo"));
    }

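    // Added sketch: stray semicolons only yield empty fragments, which the
    // final is_empty filter drops.
    #[test]
    fn sanitise_schema_drops_empty_fragments() {
        let input = ";;\nCREATE TABLE foo (x INT);;\n";
        let out = sanitise_schema(input);
        assert_eq!(out.len(), 1);
        assert!(out[0].starts_with("CREATE TABLE foo"));
    }
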
    #[test]
    fn sanitise_schema_real_constant_produces_only_ddl() {
        for stmt in sanitise_schema(SCHEMA) {
            let first_word = stmt
                .split_whitespace()
                .next()
                .expect("non-empty statement")
                .to_uppercase();
            assert!(
                matches!(
                    first_word.as_str(),
                    "CREATE" | "INSERT" | "UPDATE" | "DROP" | "ALTER" | "WITH"
                ),
                "SCHEMA produced non-DDL statement starting with {first_word:?}: {stmt:?}"
            );
        }
    }
}