1use anyhow::Result;
2use sqlx::PgPool;
3
4use crate::store::WorkflowStore;
5use crate::types::*;
6
/// Baseline Postgres schema, applied idempotently at startup by
/// [`PostgresStore::migrate`].
///
/// The string is split on `;` and each chunk executed as one statement, so
/// no `;` may ever appear inside a string literal or comment in this text.
/// All timestamps are stored as `DOUBLE PRECISION` Unix epoch seconds
/// (matching the `f64` fields bound throughout this file).
const SCHEMA: &str = r#"
CREATE TABLE IF NOT EXISTS namespaces (
    name TEXT PRIMARY KEY,
    created_at DOUBLE PRECISION NOT NULL
);
INSERT INTO namespaces (name, created_at)
    VALUES ('main', EXTRACT(EPOCH FROM NOW()))
    ON CONFLICT DO NOTHING;

CREATE TABLE IF NOT EXISTS workflows (
    id TEXT PRIMARY KEY,
    namespace TEXT NOT NULL DEFAULT 'main',
    run_id TEXT NOT NULL,
    workflow_type TEXT NOT NULL,
    task_queue TEXT NOT NULL DEFAULT 'main',
    status TEXT NOT NULL DEFAULT 'PENDING',
    input TEXT,
    result TEXT,
    error TEXT,
    parent_id TEXT,
    claimed_by TEXT,
    search_attributes TEXT,
    archived_at DOUBLE PRECISION,
    archive_uri TEXT,
    -- Workflow-task dispatch (Phase 9): see sqlite.rs for the full comment.
    needs_dispatch BOOLEAN NOT NULL DEFAULT FALSE,
    dispatch_claimed_by TEXT,
    dispatch_last_heartbeat DOUBLE PRECISION,
    created_at DOUBLE PRECISION NOT NULL,
    updated_at DOUBLE PRECISION NOT NULL,
    completed_at DOUBLE PRECISION
);
CREATE INDEX IF NOT EXISTS idx_wf_status_queue ON workflows(status, task_queue);
CREATE INDEX IF NOT EXISTS idx_wf_namespace ON workflows(namespace);
CREATE INDEX IF NOT EXISTS idx_wf_dispatch ON workflows(task_queue, needs_dispatch, dispatch_claimed_by);

CREATE TABLE IF NOT EXISTS workflow_events (
    id BIGSERIAL PRIMARY KEY,
    workflow_id TEXT NOT NULL REFERENCES workflows(id),
    seq INTEGER NOT NULL,
    event_type TEXT NOT NULL,
    payload TEXT,
    timestamp DOUBLE PRECISION NOT NULL
);
CREATE INDEX IF NOT EXISTS idx_wf_events_lookup ON workflow_events(workflow_id, seq);

CREATE TABLE IF NOT EXISTS workflow_activities (
    id BIGSERIAL PRIMARY KEY,
    workflow_id TEXT NOT NULL REFERENCES workflows(id),
    seq INTEGER NOT NULL,
    name TEXT NOT NULL,
    task_queue TEXT NOT NULL DEFAULT 'main',
    input TEXT,
    status TEXT NOT NULL DEFAULT 'PENDING',
    result TEXT,
    error TEXT,
    attempt INTEGER NOT NULL DEFAULT 1,
    max_attempts INTEGER NOT NULL DEFAULT 3,
    initial_interval_secs DOUBLE PRECISION NOT NULL DEFAULT 1,
    backoff_coefficient DOUBLE PRECISION NOT NULL DEFAULT 2,
    start_to_close_secs DOUBLE PRECISION NOT NULL DEFAULT 300,
    heartbeat_timeout_secs DOUBLE PRECISION,
    claimed_by TEXT,
    scheduled_at DOUBLE PRECISION NOT NULL,
    started_at DOUBLE PRECISION,
    completed_at DOUBLE PRECISION,
    last_heartbeat DOUBLE PRECISION,
    UNIQUE (workflow_id, seq)
);
CREATE INDEX IF NOT EXISTS idx_wf_act_pending ON workflow_activities(task_queue, status, scheduled_at);

CREATE TABLE IF NOT EXISTS workflow_timers (
    id BIGSERIAL PRIMARY KEY,
    workflow_id TEXT NOT NULL REFERENCES workflows(id),
    seq INTEGER NOT NULL,
    fire_at DOUBLE PRECISION NOT NULL,
    fired BOOLEAN NOT NULL DEFAULT FALSE,
    UNIQUE (workflow_id, seq)
);
CREATE INDEX IF NOT EXISTS idx_wf_timers_due ON workflow_timers(fire_at) WHERE fired = FALSE;

CREATE TABLE IF NOT EXISTS workflow_signals (
    id BIGSERIAL PRIMARY KEY,
    workflow_id TEXT NOT NULL REFERENCES workflows(id),
    name TEXT NOT NULL,
    payload TEXT,
    consumed BOOLEAN NOT NULL DEFAULT FALSE,
    received_at DOUBLE PRECISION NOT NULL
);
CREATE INDEX IF NOT EXISTS idx_wf_signals_lookup ON workflow_signals(workflow_id, name, consumed);

CREATE TABLE IF NOT EXISTS workflow_schedules (
    namespace TEXT NOT NULL DEFAULT 'main',
    name TEXT NOT NULL,
    workflow_type TEXT NOT NULL,
    cron_expr TEXT NOT NULL,
    timezone TEXT NOT NULL DEFAULT 'UTC',
    input TEXT,
    task_queue TEXT NOT NULL DEFAULT 'main',
    overlap_policy TEXT NOT NULL DEFAULT 'skip',
    paused BOOLEAN NOT NULL DEFAULT FALSE,
    last_run_at DOUBLE PRECISION,
    next_run_at DOUBLE PRECISION,
    last_workflow_id TEXT,
    created_at DOUBLE PRECISION NOT NULL,
    PRIMARY KEY (namespace, name)
);

CREATE TABLE IF NOT EXISTS workflow_workers (
    id TEXT PRIMARY KEY,
    namespace TEXT NOT NULL DEFAULT 'main',
    identity TEXT NOT NULL,
    task_queue TEXT NOT NULL,
    workflows TEXT,
    activities TEXT,
    max_concurrent_workflows INTEGER NOT NULL DEFAULT 10,
    max_concurrent_activities INTEGER NOT NULL DEFAULT 10,
    active_tasks INTEGER NOT NULL DEFAULT 0,
    last_heartbeat DOUBLE PRECISION NOT NULL,
    registered_at DOUBLE PRECISION NOT NULL
);

CREATE TABLE IF NOT EXISTS workflow_snapshots (
    workflow_id TEXT NOT NULL REFERENCES workflows(id),
    event_seq INTEGER NOT NULL,
    state_json TEXT NOT NULL,
    created_at DOUBLE PRECISION NOT NULL,
    PRIMARY KEY (workflow_id, event_seq)
);

CREATE TABLE IF NOT EXISTS api_keys (
    key_hash TEXT PRIMARY KEY,
    prefix TEXT NOT NULL,
    label TEXT,
    created_at DOUBLE PRECISION NOT NULL
);
CREATE INDEX IF NOT EXISTS idx_api_keys_prefix ON api_keys(prefix);

-- Future additive migrations go below this line. Postgres supports
-- `ADD COLUMN IF NOT EXISTS` natively, so the pattern is simply:
--
-- ALTER TABLE workflows ADD COLUMN IF NOT EXISTS some_new_field TEXT;
--
-- Idempotent across startups; fresh installs pick the column up from the
-- CREATE TABLE above so the ADD is a no-op. Currently no pending
-- migrations — baseline schema in CREATE TABLE statements above is the
-- source of truth through v0.11.3.
"#;
155
/// Postgres-backed implementation of [`WorkflowStore`].
///
/// Holds a single `sqlx` connection pool; every method issues its queries
/// through this pool.
pub struct PostgresStore {
    // Shared connection pool; sqlx checks out a connection per query.
    pool: PgPool,
}
159
160impl PostgresStore {
161 pub async fn new(url: &str) -> Result<Self> {
162 let pool = PgPool::connect(url).await?;
163 let store = Self { pool };
164 store.migrate().await?;
165 Ok(store)
166 }
167
168 async fn migrate(&self) -> Result<()> {
169 for statement in SCHEMA.split(';') {
171 let trimmed = statement.trim();
172 if !trimmed.is_empty() {
173 sqlx::query(trimmed).execute(&self.pool).await?;
174 }
175 }
176 Ok(())
177 }
178
179 pub async fn try_acquire_leader_lock(&self) -> Result<bool> {
182 let row: (bool,) =
183 sqlx::query_as("SELECT pg_try_advisory_lock(1)")
184 .fetch_one(&self.pool)
185 .await?;
186 Ok(row.0)
187 }
188}
189
190impl WorkflowStore for PostgresStore {
    /// Inserts a new namespace row stamped with the database server's clock.
    ///
    /// NOTE(review): a duplicate name hits the primary key and surfaces as an
    /// error (no ON CONFLICT here, unlike the 'main' seed insert in SCHEMA) —
    /// confirm callers expect "already exists" to fail.
    async fn create_namespace(&self, name: &str) -> Result<()> {
        sqlx::query("INSERT INTO namespaces (name, created_at) VALUES ($1, EXTRACT(EPOCH FROM NOW()))")
            .bind(name)
            .execute(&self.pool)
            .await?;
        Ok(())
    }
200
201 async fn list_namespaces(&self) -> Result<Vec<crate::store::NamespaceRecord>> {
202 let rows = sqlx::query_as::<_, (String, f64)>(
203 "SELECT name, created_at FROM namespaces ORDER BY name",
204 )
205 .fetch_all(&self.pool)
206 .await?;
207 Ok(rows
208 .into_iter()
209 .map(|(name, created_at)| crate::store::NamespaceRecord { name, created_at })
210 .collect())
211 }
212
213 async fn delete_namespace(&self, name: &str) -> Result<bool> {
214 let res = sqlx::query("DELETE FROM namespaces WHERE name = $1 AND name != 'main'")
215 .bind(name)
216 .execute(&self.pool)
217 .await?;
218 Ok(res.rows_affected() > 0)
219 }
220
221 async fn get_namespace_stats(&self, namespace: &str) -> Result<crate::store::NamespaceStats> {
222 let total: (i64,) = sqlx::query_as("SELECT COUNT(*) FROM workflows WHERE namespace = $1")
223 .bind(namespace)
224 .fetch_one(&self.pool)
225 .await?;
226 let running: (i64,) = sqlx::query_as(
227 "SELECT COUNT(*) FROM workflows WHERE namespace = $1 AND status = 'RUNNING'",
228 )
229 .bind(namespace)
230 .fetch_one(&self.pool)
231 .await?;
232 let pending: (i64,) = sqlx::query_as(
233 "SELECT COUNT(*) FROM workflows WHERE namespace = $1 AND status = 'PENDING'",
234 )
235 .bind(namespace)
236 .fetch_one(&self.pool)
237 .await?;
238 let completed: (i64,) = sqlx::query_as(
239 "SELECT COUNT(*) FROM workflows WHERE namespace = $1 AND status = 'COMPLETED'",
240 )
241 .bind(namespace)
242 .fetch_one(&self.pool)
243 .await?;
244 let failed: (i64,) = sqlx::query_as(
245 "SELECT COUNT(*) FROM workflows WHERE namespace = $1 AND status = 'FAILED'",
246 )
247 .bind(namespace)
248 .fetch_one(&self.pool)
249 .await?;
250 let schedules: (i64,) =
251 sqlx::query_as("SELECT COUNT(*) FROM workflow_schedules WHERE namespace = $1")
252 .bind(namespace)
253 .fetch_one(&self.pool)
254 .await?;
255 let workers: (i64,) =
256 sqlx::query_as("SELECT COUNT(*) FROM workflow_workers WHERE namespace = $1")
257 .bind(namespace)
258 .fetch_one(&self.pool)
259 .await?;
260
261 Ok(crate::store::NamespaceStats {
262 namespace: namespace.to_string(),
263 total_workflows: total.0,
264 running: running.0,
265 pending: pending.0,
266 completed: completed.0,
267 failed: failed.0,
268 schedules: schedules.0,
269 workers: workers.0,
270 })
271 }
272
    /// Inserts a freshly created workflow row. All values come from the
    /// caller-supplied record; dispatch fields keep their schema defaults.
    ///
    /// Fails on a duplicate `wf.id` (primary-key violation).
    async fn create_workflow(&self, wf: &WorkflowRecord) -> Result<()> {
        // 17 columns ↔ 17 positional binds, in declaration order.
        sqlx::query(
            "INSERT INTO workflows (id, namespace, run_id, workflow_type, task_queue, status, input, result, error, parent_id, claimed_by, search_attributes, archived_at, archive_uri, created_at, updated_at, completed_at)
             VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16, $17)",
        )
        .bind(&wf.id)
        .bind(&wf.namespace)
        .bind(&wf.run_id)
        .bind(&wf.workflow_type)
        .bind(&wf.task_queue)
        .bind(&wf.status)
        .bind(&wf.input)
        .bind(&wf.result)
        .bind(&wf.error)
        .bind(&wf.parent_id)
        .bind(&wf.claimed_by)
        .bind(&wf.search_attributes)
        .bind(wf.archived_at)
        .bind(&wf.archive_uri)
        .bind(wf.created_at)
        .bind(wf.updated_at)
        .bind(wf.completed_at)
        .execute(&self.pool)
        .await?;
        Ok(())
    }
301
    /// Fetches a single workflow by id, or `None` if it does not exist.
    async fn get_workflow(&self, id: &str) -> Result<Option<WorkflowRecord>> {
        let row = sqlx::query_as::<_, PgWorkflowRow>(
            "SELECT id, namespace, run_id, workflow_type, task_queue, status, input, result, error, parent_id, claimed_by, search_attributes, archived_at, archive_uri, created_at, updated_at, completed_at FROM workflows WHERE id = $1",
        )
        .bind(id)
        .fetch_optional(&self.pool)
        .await?;
        Ok(row.map(Into::into))
    }
311
    /// Lists workflows in `namespace`, newest first, with optional filters.
    ///
    /// * `status` / `workflow_type` — folded into the base query via
    ///   `($n::TEXT IS NULL OR col = $n)` so a single statement covers both
    ///   filtered and unfiltered calls.
    /// * `search_attrs_filter` — optional JSON object; each key/value pair
    ///   appends one `(search_attributes::jsonb)->>$key = $value` predicate.
    ///   Non-string JSON values are compared via their serialized text form.
    ///   NOTE(review): rows whose `search_attributes` column holds invalid
    ///   JSON make the `::jsonb` cast fail at query time — confirm writers
    ///   always store valid JSON.
    /// * `limit` / `offset` — plain pagination.
    async fn list_workflows(
        &self,
        namespace: &str,
        status: Option<WorkflowStatus>,
        workflow_type: Option<&str>,
        search_attrs_filter: Option<&str>,
        limit: i64,
        offset: i64,
    ) -> Result<Vec<WorkflowRecord>> {
        let status_str = status.map(|s| s.to_string());

        // Anything that is not a JSON object parses to "no attribute filter".
        let filter_pairs: Vec<(String, serde_json::Value)> = search_attrs_filter
            .and_then(|s| serde_json::from_str::<serde_json::Value>(s).ok())
            .and_then(|v| v.as_object().cloned())
            .map(|m| m.into_iter().collect())
            .unwrap_or_default();

        let mut sql = String::from(
            "SELECT id, namespace, run_id, workflow_type, task_queue, status, input, result, error, parent_id, claimed_by, search_attributes, archived_at, archive_uri, created_at, updated_at, completed_at
             FROM workflows
             WHERE namespace = $1
               AND ($2::TEXT IS NULL OR status = $2)
               AND ($3::TEXT IS NULL OR workflow_type = $3)",
        );
        // $1..$3 are taken; each attribute filter consumes two placeholders
        // (key, value), then LIMIT/OFFSET take the final two.
        let mut idx = 4usize;
        for _ in &filter_pairs {
            sql.push_str(&format!(
                " AND (search_attributes::jsonb)->>${} = ${}",
                idx,
                idx + 1
            ));
            idx += 2;
        }
        sql.push_str(&format!(" ORDER BY created_at DESC LIMIT ${} OFFSET ${}", idx, idx + 1));

        let mut q = sqlx::query_as::<_, PgWorkflowRow>(&sql)
            .bind(namespace)
            .bind(&status_str)
            .bind(workflow_type);
        for (key, value) in &filter_pairs {
            q = q.bind(key.clone());
            // Strings bind as-is; other JSON values bind as their JSON text.
            let as_text = match value {
                serde_json::Value::String(s) => s.clone(),
                other => other.to_string(),
            };
            q = q.bind(as_text);
        }
        let rows = q.bind(limit).bind(offset).fetch_all(&self.pool).await?;
        Ok(rows.into_iter().map(Into::into).collect())
    }
364
    /// Transitions a workflow to `status`, merging in result/error.
    ///
    /// `COALESCE` preserves any previously stored result/error/completed_at
    /// when the corresponding argument is NULL, so a partial update cannot
    /// erase earlier data. `completed_at` is stamped only for terminal states.
    async fn update_workflow_status(
        &self,
        id: &str,
        status: WorkflowStatus,
        result: Option<&str>,
        error: Option<&str>,
    ) -> Result<()> {
        let now = timestamp_now();
        let completed_at = if status.is_terminal() { Some(now) } else { None };
        sqlx::query(
            "UPDATE workflows SET status = $1, result = COALESCE($2, result), error = COALESCE($3, error), updated_at = $4, completed_at = COALESCE($5, completed_at) WHERE id = $6",
        )
        .bind(status.to_string())
        .bind(result)
        .bind(error)
        .bind(now)
        .bind(completed_at)
        .bind(id)
        .execute(&self.pool)
        .await?;
        Ok(())
    }
387
388 async fn claim_workflow(&self, id: &str, worker_id: &str) -> Result<bool> {
389 let res = sqlx::query(
390 "UPDATE workflows SET claimed_by = $1, status = 'RUNNING', updated_at = $2 WHERE id = $3 AND claimed_by IS NULL",
391 )
392 .bind(worker_id)
393 .bind(timestamp_now())
394 .bind(id)
395 .execute(&self.pool)
396 .await?;
397 Ok(res.rows_affected() > 0)
398 }
399
400 async fn mark_workflow_dispatchable(&self, workflow_id: &str) -> Result<()> {
401 sqlx::query("UPDATE workflows SET needs_dispatch = TRUE WHERE id = $1")
402 .bind(workflow_id)
403 .execute(&self.pool)
404 .await?;
405 Ok(())
406 }
407
    /// Claims the oldest dispatchable workflow task on `task_queue` for
    /// `worker_id`, or returns `None` when nothing is pending.
    ///
    /// `FOR UPDATE SKIP LOCKED` lets concurrent pollers pick different rows
    /// without blocking on each other; the UPDATE clears `needs_dispatch`
    /// and records the lease (`dispatch_claimed_by` + heartbeat) atomically.
    async fn claim_workflow_task(
        &self,
        task_queue: &str,
        worker_id: &str,
    ) -> Result<Option<WorkflowRecord>> {
        let now = timestamp_now();
        let row = sqlx::query_as::<_, PgWorkflowRow>(
            "UPDATE workflows
             SET dispatch_claimed_by = $1, dispatch_last_heartbeat = $2, needs_dispatch = FALSE
             WHERE id = (
                 SELECT id FROM workflows
                 WHERE task_queue = $3
                   AND needs_dispatch = TRUE
                   AND dispatch_claimed_by IS NULL
                   AND status NOT IN ('COMPLETED', 'FAILED', 'CANCELLED', 'TIMED_OUT')
                 ORDER BY updated_at ASC
                 FOR UPDATE SKIP LOCKED
                 LIMIT 1
             )
             RETURNING id, namespace, run_id, workflow_type, task_queue, status, input, result, error, parent_id, claimed_by, search_attributes, archived_at, archive_uri, created_at, updated_at, completed_at",
        )
        .bind(worker_id)
        .bind(now)
        .bind(task_queue)
        .fetch_optional(&self.pool)
        .await?;
        Ok(row.map(Into::into))
    }
438
    /// Releases a workflow-task lease, but only if `worker_id` still holds
    /// it — a stale worker cannot clobber a lease that was reassigned.
    async fn release_workflow_task(&self, workflow_id: &str, worker_id: &str) -> Result<()> {
        sqlx::query(
            "UPDATE workflows
             SET dispatch_claimed_by = NULL, dispatch_last_heartbeat = NULL
             WHERE id = $1 AND dispatch_claimed_by = $2",
        )
        .bind(workflow_id)
        .bind(worker_id)
        .execute(&self.pool)
        .await?;
        Ok(())
    }
451
    /// Reclaims dispatch leases whose heartbeat is older than `timeout_secs`,
    /// re-flagging the workflows for dispatch. Returns the number reclaimed.
    ///
    /// NOTE(review): a claimed row with a NULL `dispatch_last_heartbeat`
    /// never matches (`$1 - NULL` is NULL). `claim_workflow_task` always
    /// sets the heartbeat at claim time, so such rows shouldn't exist —
    /// confirm no other writer clears the heartbeat while keeping the claim.
    async fn release_stale_dispatch_leases(
        &self,
        now: f64,
        timeout_secs: f64,
    ) -> Result<u64> {
        let res = sqlx::query(
            "UPDATE workflows
             SET dispatch_claimed_by = NULL,
                 dispatch_last_heartbeat = NULL,
                 needs_dispatch = TRUE
             WHERE dispatch_claimed_by IS NOT NULL
               AND ($1 - dispatch_last_heartbeat) > $2
               AND status NOT IN ('COMPLETED', 'FAILED', 'CANCELLED', 'TIMED_OUT')",
        )
        .bind(now)
        .bind(timeout_secs)
        .execute(&self.pool)
        .await?;
        Ok(res.rows_affected())
    }
472
    /// Appends one history event and returns its server-assigned row id.
    ///
    /// `seq` ordering/uniqueness is the caller's responsibility — the
    /// workflow_events table carries no constraint on (workflow_id, seq).
    async fn append_event(&self, ev: &WorkflowEvent) -> Result<i64> {
        let row: (i64,) = sqlx::query_as(
            "INSERT INTO workflow_events (workflow_id, seq, event_type, payload, timestamp) VALUES ($1, $2, $3, $4, $5) RETURNING id",
        )
        .bind(&ev.workflow_id)
        .bind(ev.seq)
        .bind(&ev.event_type)
        .bind(&ev.payload)
        .bind(ev.timestamp)
        .fetch_one(&self.pool)
        .await?;
        Ok(row.0)
    }
488
    /// Returns a workflow's full event history in ascending `seq` order.
    async fn list_events(&self, workflow_id: &str) -> Result<Vec<WorkflowEvent>> {
        let rows = sqlx::query_as::<_, PgEventRow>(
            "SELECT id, workflow_id, seq, event_type, payload, timestamp FROM workflow_events WHERE workflow_id = $1 ORDER BY seq ASC",
        )
        .bind(workflow_id)
        .fetch_all(&self.pool)
        .await?;
        Ok(rows.into_iter().map(Into::into).collect())
    }
498
499 async fn get_event_count(&self, workflow_id: &str) -> Result<i64> {
500 let row: (i64,) =
501 sqlx::query_as("SELECT COUNT(*) FROM workflow_events WHERE workflow_id = $1")
502 .bind(workflow_id)
503 .fetch_one(&self.pool)
504 .await?;
505 Ok(row.0)
506 }
507
    /// Schedules a new activity task and returns its server-assigned row id.
    ///
    /// Retry/timeout policy fields are persisted from the record; the
    /// UNIQUE (workflow_id, seq) constraint rejects duplicate scheduling of
    /// the same activity seq for a workflow.
    async fn create_activity(&self, act: &WorkflowActivity) -> Result<i64> {
        // 13 columns ↔ 13 positional binds, in declaration order.
        let row: (i64,) = sqlx::query_as(
            "INSERT INTO workflow_activities (workflow_id, seq, name, task_queue, input, status, attempt, max_attempts, initial_interval_secs, backoff_coefficient, start_to_close_secs, heartbeat_timeout_secs, scheduled_at)
             VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13) RETURNING id",
        )
        .bind(&act.workflow_id)
        .bind(act.seq)
        .bind(&act.name)
        .bind(&act.task_queue)
        .bind(&act.input)
        .bind(&act.status)
        .bind(act.attempt)
        .bind(act.max_attempts)
        .bind(act.initial_interval_secs)
        .bind(act.backoff_coefficient)
        .bind(act.start_to_close_secs)
        .bind(act.heartbeat_timeout_secs)
        .bind(act.scheduled_at)
        .fetch_one(&self.pool)
        .await?;
        Ok(row.0)
    }
532
    /// Fetches an activity by its row id, or `None` if absent.
    async fn get_activity(&self, id: i64) -> Result<Option<WorkflowActivity>> {
        let row = sqlx::query_as::<_, PgActivityRow>(
            "SELECT id, workflow_id, seq, name, task_queue, input, status, result, error, attempt, max_attempts, initial_interval_secs, backoff_coefficient, start_to_close_secs, heartbeat_timeout_secs, claimed_by, scheduled_at, started_at, completed_at, last_heartbeat
             FROM workflow_activities WHERE id = $1",
        )
        .bind(id)
        .fetch_optional(&self.pool)
        .await?;
        Ok(row.map(Into::into))
    }
543
    /// Fetches an activity by its (workflow_id, seq) pair — unique per the
    /// table constraint — or `None` if absent.
    async fn get_activity_by_workflow_seq(
        &self,
        workflow_id: &str,
        seq: i32,
    ) -> Result<Option<WorkflowActivity>> {
        let row = sqlx::query_as::<_, PgActivityRow>(
            "SELECT id, workflow_id, seq, name, task_queue, input, status, result, error, attempt, max_attempts, initial_interval_secs, backoff_coefficient, start_to_close_secs, heartbeat_timeout_secs, claimed_by, scheduled_at, started_at, completed_at, last_heartbeat
             FROM workflow_activities WHERE workflow_id = $1 AND seq = $2",
        )
        .bind(workflow_id)
        .bind(seq)
        .fetch_optional(&self.pool)
        .await?;
        Ok(row.map(Into::into))
    }
559
    /// Claims the oldest PENDING activity on `task_queue` for `worker_id`,
    /// marking it RUNNING with a start timestamp, or returns `None`.
    ///
    /// Same `FOR UPDATE SKIP LOCKED` pattern as `claim_workflow_task`:
    /// concurrent pollers skip each other's locked rows instead of blocking.
    /// NOTE(review): `scheduled_at` is not compared against "now" here, so a
    /// retry scheduled in the future becomes claimable immediately — confirm
    /// callers only requeue with past/near-past `scheduled_at`, or that this
    /// is intended.
    async fn claim_activity(
        &self,
        task_queue: &str,
        worker_id: &str,
    ) -> Result<Option<WorkflowActivity>> {
        let now = timestamp_now();
        let row = sqlx::query_as::<_, PgActivityRow>(
            "UPDATE workflow_activities SET status = 'RUNNING', claimed_by = $1, started_at = $2
             WHERE id = (
                 SELECT id FROM workflow_activities
                 WHERE task_queue = $3 AND status = 'PENDING'
                 ORDER BY scheduled_at ASC
                 FOR UPDATE SKIP LOCKED
                 LIMIT 1
             )
             RETURNING id, workflow_id, seq, name, task_queue, input, status, result, error, attempt, max_attempts, initial_interval_secs, backoff_coefficient, start_to_close_secs, heartbeat_timeout_secs, claimed_by, scheduled_at, started_at, completed_at, last_heartbeat",
        )
        .bind(worker_id)
        .bind(now)
        .bind(task_queue)
        .fetch_optional(&self.pool)
        .await?;
        Ok(row.map(Into::into))
    }
586
    /// Returns a failed activity to PENDING for another attempt, resetting
    /// all per-run state (claim, start time, heartbeat, error) so the next
    /// claimant starts from a clean slate.
    async fn requeue_activity_for_retry(
        &self,
        id: i64,
        next_attempt: i32,
        next_scheduled_at: f64,
    ) -> Result<()> {
        sqlx::query(
            "UPDATE workflow_activities
             SET status = 'PENDING', attempt = $1, scheduled_at = $2,
                 claimed_by = NULL, started_at = NULL, last_heartbeat = NULL,
                 error = NULL
             WHERE id = $3",
        )
        .bind(next_attempt)
        .bind(next_scheduled_at)
        .bind(id)
        .execute(&self.pool)
        .await?;
        Ok(())
    }
607
608 async fn complete_activity(
609 &self,
610 id: i64,
611 result: Option<&str>,
612 error: Option<&str>,
613 failed: bool,
614 ) -> Result<()> {
615 let status = if failed { "FAILED" } else { "COMPLETED" };
616 sqlx::query(
617 "UPDATE workflow_activities SET status = $1, result = $2, error = $3, completed_at = $4 WHERE id = $5",
618 )
619 .bind(status)
620 .bind(result)
621 .bind(error)
622 .bind(timestamp_now())
623 .bind(id)
624 .execute(&self.pool)
625 .await?;
626 Ok(())
627 }
628
629 async fn heartbeat_activity(&self, id: i64, _details: Option<&str>) -> Result<()> {
630 sqlx::query("UPDATE workflow_activities SET last_heartbeat = $1 WHERE id = $2")
631 .bind(timestamp_now())
632 .bind(id)
633 .execute(&self.pool)
634 .await?;
635 Ok(())
636 }
637
    /// Finds RUNNING activities whose last heartbeat is older than their
    /// configured heartbeat timeout.
    ///
    /// Only activities that have both a timeout and at least one recorded
    /// heartbeat are considered (`IS NOT NULL` guards) — an activity that
    /// never heartbeats is not caught by this check.
    async fn get_timed_out_activities(&self, now: f64) -> Result<Vec<WorkflowActivity>> {
        let rows = sqlx::query_as::<_, PgActivityRow>(
            "SELECT id, workflow_id, seq, name, task_queue, input, status, result, error, attempt, max_attempts, initial_interval_secs, backoff_coefficient, start_to_close_secs, heartbeat_timeout_secs, claimed_by, scheduled_at, started_at, completed_at, last_heartbeat
             FROM workflow_activities
             WHERE status = 'RUNNING'
               AND heartbeat_timeout_secs IS NOT NULL
               AND last_heartbeat IS NOT NULL
               AND ($1 - last_heartbeat) > heartbeat_timeout_secs",
        )
        .bind(now)
        .fetch_all(&self.pool)
        .await?;
        Ok(rows.into_iter().map(Into::into).collect())
    }
652
    /// Creates an unfired timer and returns its server-assigned row id.
    /// Duplicate (workflow_id, seq) pairs are rejected by the table's
    /// UNIQUE constraint.
    async fn create_timer(&self, timer: &WorkflowTimer) -> Result<i64> {
        let row: (i64,) = sqlx::query_as(
            "INSERT INTO workflow_timers (workflow_id, seq, fire_at, fired) VALUES ($1, $2, $3, FALSE) RETURNING id",
        )
        .bind(&timer.workflow_id)
        .bind(timer.seq)
        .bind(timer.fire_at)
        .fetch_one(&self.pool)
        .await?;
        Ok(row.0)
    }
666
    /// Cancels all still-PENDING activities of a workflow (e.g. on workflow
    /// cancellation); RUNNING activities are left untouched. Returns the
    /// number of rows cancelled.
    async fn cancel_pending_activities(&self, workflow_id: &str) -> Result<u64> {
        let res = sqlx::query(
            "UPDATE workflow_activities SET status = 'CANCELLED', completed_at = $1
             WHERE workflow_id = $2 AND status = 'PENDING'",
        )
        .bind(timestamp_now())
        .bind(workflow_id)
        .execute(&self.pool)
        .await?;
        Ok(res.rows_affected())
    }
678
    /// Neutralizes a workflow's unfired timers by marking them fired, so
    /// `fire_due_timers` never returns them. Returns the number affected.
    async fn cancel_pending_timers(&self, workflow_id: &str) -> Result<u64> {
        let res = sqlx::query(
            "UPDATE workflow_timers SET fired = TRUE
             WHERE workflow_id = $1 AND fired = FALSE",
        )
        .bind(workflow_id)
        .execute(&self.pool)
        .await?;
        Ok(res.rows_affected())
    }
689
    /// Fetches a timer by its (workflow_id, seq) pair — unique per the
    /// table constraint — or `None` if absent.
    async fn get_timer_by_workflow_seq(
        &self,
        workflow_id: &str,
        seq: i32,
    ) -> Result<Option<WorkflowTimer>> {
        let row = sqlx::query_as::<_, PgTimerRow>(
            "SELECT id, workflow_id, seq, fire_at, fired
             FROM workflow_timers WHERE workflow_id = $1 AND seq = $2",
        )
        .bind(workflow_id)
        .bind(seq)
        .fetch_optional(&self.pool)
        .await?;
        Ok(row.map(Into::into))
    }
705
    /// Atomically marks every timer due at or before `now` as fired and
    /// returns them for processing.
    ///
    /// NOTE(review): `UPDATE ... RETURNING` yields rows in an unspecified
    /// order in Postgres — confirm callers don't rely on firing order across
    /// workflows.
    async fn fire_due_timers(&self, now: f64) -> Result<Vec<WorkflowTimer>> {
        let rows = sqlx::query_as::<_, PgTimerRow>(
            "UPDATE workflow_timers SET fired = TRUE
             WHERE fired = FALSE AND fire_at <= $1
             RETURNING id, workflow_id, seq, fire_at, fired",
        )
        .bind(now)
        .fetch_all(&self.pool)
        .await?;
        Ok(rows.into_iter().map(Into::into).collect())
    }
717
    /// Stores an incoming signal (initially unconsumed) and returns its
    /// server-assigned row id.
    async fn send_signal(&self, sig: &WorkflowSignal) -> Result<i64> {
        let row: (i64,) = sqlx::query_as(
            "INSERT INTO workflow_signals (workflow_id, name, payload, consumed, received_at) VALUES ($1, $2, $3, FALSE, $4) RETURNING id",
        )
        .bind(&sig.workflow_id)
        .bind(&sig.name)
        .bind(&sig.payload)
        .bind(sig.received_at)
        .fetch_one(&self.pool)
        .await?;
        Ok(row.0)
    }
732
733 async fn consume_signals(
734 &self,
735 workflow_id: &str,
736 name: &str,
737 ) -> Result<Vec<WorkflowSignal>> {
738 let rows = sqlx::query_as::<_, PgSignalRow>(
739 "UPDATE workflow_signals SET consumed = TRUE
740 WHERE workflow_id = $1 AND name = $2 AND consumed = FALSE
741 RETURNING id, workflow_id, name, payload, consumed, received_at",
742 )
743 .bind(workflow_id)
744 .bind(name)
745 .fetch_all(&self.pool)
746 .await?;
747 Ok(rows.into_iter().map(Into::into).collect())
748 }
749
    /// Inserts a cron schedule row. Duplicate (namespace, name) pairs are
    /// rejected by the composite primary key.
    async fn create_schedule(&self, sched: &WorkflowSchedule) -> Result<()> {
        // 13 columns ↔ 13 positional binds, in declaration order.
        sqlx::query(
            "INSERT INTO workflow_schedules (namespace, name, workflow_type, cron_expr, timezone, input, task_queue, overlap_policy, paused, last_run_at, next_run_at, last_workflow_id, created_at)
             VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13)",
        )
        .bind(&sched.namespace)
        .bind(&sched.name)
        .bind(&sched.workflow_type)
        .bind(&sched.cron_expr)
        .bind(&sched.timezone)
        .bind(&sched.input)
        .bind(&sched.task_queue)
        .bind(&sched.overlap_policy)
        .bind(sched.paused)
        .bind(sched.last_run_at)
        .bind(sched.next_run_at)
        .bind(&sched.last_workflow_id)
        .bind(sched.created_at)
        .execute(&self.pool)
        .await?;
        Ok(())
    }
774
    /// Fetches a schedule by (namespace, name), or `None` if absent.
    async fn get_schedule(&self, namespace: &str, name: &str) -> Result<Option<WorkflowSchedule>> {
        let row = sqlx::query_as::<_, PgScheduleRow>(
            "SELECT namespace, name, workflow_type, cron_expr, timezone, input, task_queue, overlap_policy, paused, last_run_at, next_run_at, last_workflow_id, created_at FROM workflow_schedules WHERE namespace = $1 AND name = $2",
        )
        .bind(namespace)
        .bind(name)
        .fetch_optional(&self.pool)
        .await?;
        Ok(row.map(Into::into))
    }
785
    /// Lists all schedules in a namespace, ordered by name.
    async fn list_schedules(&self, namespace: &str) -> Result<Vec<WorkflowSchedule>> {
        let rows = sqlx::query_as::<_, PgScheduleRow>(
            "SELECT namespace, name, workflow_type, cron_expr, timezone, input, task_queue, overlap_policy, paused, last_run_at, next_run_at, last_workflow_id, created_at FROM workflow_schedules WHERE namespace = $1 ORDER BY name",
        )
        .bind(namespace)
        .fetch_all(&self.pool)
        .await?;
        Ok(rows.into_iter().map(Into::into).collect())
    }
795
    /// Records the outcome of a schedule tick: when it last ran, when it
    /// should run next, and which workflow that tick started.
    async fn update_schedule_last_run(
        &self,
        namespace: &str,
        name: &str,
        last_run_at: f64,
        next_run_at: f64,
        workflow_id: &str,
    ) -> Result<()> {
        sqlx::query(
            "UPDATE workflow_schedules SET last_run_at = $1, next_run_at = $2, last_workflow_id = $3 WHERE namespace = $4 AND name = $5",
        )
        .bind(last_run_at)
        .bind(next_run_at)
        .bind(workflow_id)
        .bind(namespace)
        .bind(name)
        .execute(&self.pool)
        .await?;
        Ok(())
    }
816
817 async fn delete_schedule(&self, namespace: &str, name: &str) -> Result<bool> {
818 let res = sqlx::query("DELETE FROM workflow_schedules WHERE namespace = $1 AND name = $2")
819 .bind(namespace)
820 .bind(name)
821 .execute(&self.pool)
822 .await?;
823 Ok(res.rows_affected() > 0)
824 }
825
    /// Returns up to `limit` terminal, not-yet-archived workflows that
    /// completed before `cutoff`, oldest completion first — the archiver's
    /// work queue.
    async fn list_archivable_workflows(
        &self,
        cutoff: f64,
        limit: i64,
    ) -> Result<Vec<WorkflowRecord>> {
        let rows = sqlx::query_as::<_, PgWorkflowRow>(
            "SELECT id, namespace, run_id, workflow_type, task_queue, status, input, result, error, parent_id, claimed_by, search_attributes, archived_at, archive_uri, created_at, updated_at, completed_at
             FROM workflows
             WHERE status IN ('COMPLETED', 'FAILED', 'CANCELLED', 'TIMED_OUT')
               AND completed_at IS NOT NULL
               AND completed_at < $1
               AND archived_at IS NULL
             ORDER BY completed_at ASC
             LIMIT $2",
        )
        .bind(cutoff)
        .bind(limit)
        .fetch_all(&self.pool)
        .await?;
        Ok(rows.into_iter().map(Into::into).collect())
    }
847
    /// Marks a workflow as archived (URI + timestamp) and purges all of its
    /// child rows (events, activities, timers, signals, snapshots).
    ///
    /// Runs in a single transaction so a crash mid-purge can't leave a
    /// workflow half-deleted: either everything is purged and the workflow
    /// row is stamped, or nothing changes. Children are deleted before the
    /// parent row is updated; the workflows row itself is kept as a stub
    /// pointing at `archive_uri`.
    async fn mark_archived_and_purge(
        &self,
        workflow_id: &str,
        archive_uri: &str,
        archived_at: f64,
    ) -> Result<()> {
        let mut tx = self.pool.begin().await?;
        sqlx::query("DELETE FROM workflow_events WHERE workflow_id = $1")
            .bind(workflow_id)
            .execute(&mut *tx)
            .await?;
        sqlx::query("DELETE FROM workflow_activities WHERE workflow_id = $1")
            .bind(workflow_id)
            .execute(&mut *tx)
            .await?;
        sqlx::query("DELETE FROM workflow_timers WHERE workflow_id = $1")
            .bind(workflow_id)
            .execute(&mut *tx)
            .await?;
        sqlx::query("DELETE FROM workflow_signals WHERE workflow_id = $1")
            .bind(workflow_id)
            .execute(&mut *tx)
            .await?;
        sqlx::query("DELETE FROM workflow_snapshots WHERE workflow_id = $1")
            .bind(workflow_id)
            .execute(&mut *tx)
            .await?;
        sqlx::query(
            "UPDATE workflows SET archived_at = $1, archive_uri = $2 WHERE id = $3",
        )
        .bind(archived_at)
        .bind(archive_uri)
        .bind(workflow_id)
        .execute(&mut *tx)
        .await?;
        tx.commit().await?;
        Ok(())
    }
886
887 async fn upsert_search_attributes(
888 &self,
889 workflow_id: &str,
890 patch_json: &str,
891 ) -> Result<()> {
892 let current: Option<(Option<String>,)> =
893 sqlx::query_as("SELECT search_attributes FROM workflows WHERE id = $1")
894 .bind(workflow_id)
895 .fetch_optional(&self.pool)
896 .await?;
897 let merged = crate::store::sqlite::merge_search_attrs(
898 current.and_then(|(s,)| s).as_deref(),
899 patch_json,
900 )?;
901 sqlx::query("UPDATE workflows SET search_attributes = $1 WHERE id = $2")
902 .bind(merged)
903 .bind(workflow_id)
904 .execute(&self.pool)
905 .await?;
906 Ok(())
907 }
908
    /// Applies a partial update to a schedule and returns the new state, or
    /// `None` when the schedule does not exist.
    ///
    /// The SET clause is built dynamically from whichever patch fields are
    /// present. The bind sequence below MUST mirror the order in which the
    /// `sets` entries are pushed (cron_expr, timezone, input, task_queue,
    /// overlap_policy) — the placeholders are positional.
    async fn update_schedule(
        &self,
        namespace: &str,
        name: &str,
        patch: &SchedulePatch,
    ) -> Result<Option<WorkflowSchedule>> {
        let mut sets: Vec<String> = Vec::new();
        let mut idx = 1usize;
        if patch.cron_expr.is_some() {
            sets.push(format!("cron_expr = ${idx}"));
            idx += 1;
        }
        if patch.timezone.is_some() {
            sets.push(format!("timezone = ${idx}"));
            idx += 1;
        }
        if patch.input.is_some() {
            sets.push(format!("input = ${idx}"));
            idx += 1;
        }
        if patch.task_queue.is_some() {
            sets.push(format!("task_queue = ${idx}"));
            idx += 1;
        }
        if patch.overlap_policy.is_some() {
            sets.push(format!("overlap_policy = ${idx}"));
            idx += 1;
        }
        // Empty patch: nothing to write, just echo the current row.
        if sets.is_empty() {
            return self.get_schedule(namespace, name).await;
        }
        let sql = format!(
            "UPDATE workflow_schedules SET {} WHERE namespace = ${} AND name = ${}",
            sets.join(", "),
            idx,
            idx + 1
        );
        let mut q = sqlx::query(&sql);
        if let Some(ref v) = patch.cron_expr {
            q = q.bind(v);
        }
        if let Some(ref v) = patch.timezone {
            q = q.bind(v);
        }
        if let Some(ref v) = patch.input {
            // Serialized to its JSON text form for the TEXT column.
            q = q.bind(v.to_string());
        }
        if let Some(ref v) = patch.task_queue {
            q = q.bind(v);
        }
        if let Some(ref v) = patch.overlap_policy {
            q = q.bind(v);
        }
        let res = q
            .bind(namespace)
            .bind(name)
            .execute(&self.pool)
            .await?;
        if res.rows_affected() == 0 {
            return Ok(None);
        }
        self.get_schedule(namespace, name).await
    }
972
973 async fn set_schedule_paused(
974 &self,
975 namespace: &str,
976 name: &str,
977 paused: bool,
978 ) -> Result<Option<WorkflowSchedule>> {
979 let res = sqlx::query(
980 "UPDATE workflow_schedules SET paused = $1 WHERE namespace = $2 AND name = $3",
981 )
982 .bind(paused)
983 .bind(namespace)
984 .bind(name)
985 .execute(&self.pool)
986 .await?;
987 if res.rows_affected() == 0 {
988 return Ok(None);
989 }
990 self.get_schedule(namespace, name).await
991 }
992
    /// Registers a worker, or refreshes it when the id already exists.
    ///
    /// NOTE(review): on conflict only `last_heartbeat` and `identity` are
    /// updated — a re-registering worker with changed task_queue, capability
    /// lists, or concurrency limits keeps its OLD values. Confirm workers
    /// always get a fresh id on restart, or widen the ON CONFLICT SET list.
    async fn register_worker(&self, w: &WorkflowWorker) -> Result<()> {
        sqlx::query(
            "INSERT INTO workflow_workers (id, namespace, identity, task_queue, workflows, activities, max_concurrent_workflows, max_concurrent_activities, active_tasks, last_heartbeat, registered_at)
             VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11)
             ON CONFLICT (id) DO UPDATE SET last_heartbeat = EXCLUDED.last_heartbeat, identity = EXCLUDED.identity",
        )
        .bind(&w.id)
        .bind(&w.namespace)
        .bind(&w.identity)
        .bind(&w.task_queue)
        .bind(&w.workflows)
        .bind(&w.activities)
        .bind(w.max_concurrent_workflows)
        .bind(w.max_concurrent_activities)
        .bind(w.active_tasks)
        .bind(w.last_heartbeat)
        .bind(w.registered_at)
        .execute(&self.pool)
        .await?;
        Ok(())
    }
1016
1017 async fn heartbeat_worker(&self, id: &str, now: f64) -> Result<()> {
1018 sqlx::query("UPDATE workflow_workers SET last_heartbeat = $1 WHERE id = $2")
1019 .bind(now)
1020 .bind(id)
1021 .execute(&self.pool)
1022 .await?;
1023 Ok(())
1024 }
1025
    /// Lists registered workers in a namespace, oldest registration first.
    async fn list_workers(&self, namespace: &str) -> Result<Vec<WorkflowWorker>> {
        let rows = sqlx::query_as::<_, PgWorkerRow>(
            "SELECT id, namespace, identity, task_queue, workflows, activities, max_concurrent_workflows, max_concurrent_activities, active_tasks, last_heartbeat, registered_at FROM workflow_workers WHERE namespace = $1 ORDER BY registered_at",
        )
        .bind(namespace)
        .fetch_all(&self.pool)
        .await?;
        Ok(rows.into_iter().map(Into::into).collect())
    }
1035
1036 async fn remove_dead_workers(&self, cutoff: f64) -> Result<Vec<String>> {
1037 let rows: Vec<(String,)> =
1038 sqlx::query_as("SELECT id FROM workflow_workers WHERE last_heartbeat < $1")
1039 .bind(cutoff)
1040 .fetch_all(&self.pool)
1041 .await?;
1042 let ids: Vec<String> = rows.into_iter().map(|r| r.0).collect();
1043 if !ids.is_empty() {
1044 sqlx::query("DELETE FROM workflow_workers WHERE last_heartbeat < $1")
1045 .bind(cutoff)
1046 .execute(&self.pool)
1047 .await?;
1048 }
1049 Ok(ids)
1050 }
1051
1052 async fn create_api_key(
1055 &self,
1056 key_hash: &str,
1057 prefix: &str,
1058 label: Option<&str>,
1059 created_at: f64,
1060 ) -> Result<()> {
1061 sqlx::query("INSERT INTO api_keys (key_hash, prefix, label, created_at) VALUES ($1, $2, $3, $4)")
1062 .bind(key_hash)
1063 .bind(prefix)
1064 .bind(label)
1065 .bind(created_at)
1066 .execute(&self.pool)
1067 .await?;
1068 Ok(())
1069 }
1070
1071 async fn validate_api_key(&self, key_hash: &str) -> Result<bool> {
1072 let row: Option<(i64,)> =
1073 sqlx::query_as("SELECT 1::BIGINT FROM api_keys WHERE key_hash = $1")
1074 .bind(key_hash)
1075 .fetch_optional(&self.pool)
1076 .await?;
1077 Ok(row.is_some())
1078 }
1079
1080 async fn list_api_keys(&self) -> Result<Vec<crate::store::ApiKeyRecord>> {
1081 let rows = sqlx::query_as::<_, (String, Option<String>, f64)>(
1082 "SELECT prefix, label, created_at FROM api_keys ORDER BY created_at DESC",
1083 )
1084 .fetch_all(&self.pool)
1085 .await?;
1086 Ok(rows
1087 .into_iter()
1088 .map(|(prefix, label, created_at)| crate::store::ApiKeyRecord {
1089 prefix,
1090 label,
1091 created_at,
1092 })
1093 .collect())
1094 }
1095
1096 async fn revoke_api_key(&self, prefix: &str) -> Result<bool> {
1097 let res = sqlx::query("DELETE FROM api_keys WHERE prefix = $1")
1098 .bind(prefix)
1099 .execute(&self.pool)
1100 .await?;
1101 Ok(res.rows_affected() > 0)
1102 }
1103
1104 async fn list_child_workflows(&self, parent_id: &str) -> Result<Vec<WorkflowRecord>> {
1107 let rows = sqlx::query_as::<_, PgWorkflowRow>(
1108 "SELECT id, namespace, run_id, workflow_type, task_queue, status, input, result, error, parent_id, claimed_by, search_attributes, archived_at, archive_uri, created_at, updated_at, completed_at
1109 FROM workflows WHERE parent_id = $1 ORDER BY created_at ASC",
1110 )
1111 .bind(parent_id)
1112 .fetch_all(&self.pool)
1113 .await?;
1114 Ok(rows.into_iter().map(Into::into).collect())
1115 }
1116
1117 async fn create_snapshot(
1120 &self,
1121 workflow_id: &str,
1122 event_seq: i32,
1123 state_json: &str,
1124 ) -> Result<()> {
1125 sqlx::query(
1126 "INSERT INTO workflow_snapshots (workflow_id, event_seq, state_json, created_at)
1127 VALUES ($1, $2, $3, $4)
1128 ON CONFLICT (workflow_id, event_seq) DO UPDATE SET state_json = EXCLUDED.state_json, created_at = EXCLUDED.created_at",
1129 )
1130 .bind(workflow_id)
1131 .bind(event_seq)
1132 .bind(state_json)
1133 .bind(timestamp_now())
1134 .execute(&self.pool)
1135 .await?;
1136 Ok(())
1137 }
1138
1139 async fn get_latest_snapshot(
1140 &self,
1141 workflow_id: &str,
1142 ) -> Result<Option<WorkflowSnapshot>> {
1143 let row = sqlx::query_as::<_, (String, i32, String, f64)>(
1144 "SELECT workflow_id, event_seq, state_json, created_at
1145 FROM workflow_snapshots WHERE workflow_id = $1
1146 ORDER BY event_seq DESC LIMIT 1",
1147 )
1148 .bind(workflow_id)
1149 .fetch_optional(&self.pool)
1150 .await?;
1151
1152 Ok(row.map(|(workflow_id, event_seq, state_json, created_at)| WorkflowSnapshot {
1153 workflow_id,
1154 event_seq,
1155 state_json,
1156 created_at,
1157 }))
1158 }
1159
    /// Aggregates per-task-queue statistics for a namespace.
    ///
    /// For each task queue that has at least one activity row belonging to
    /// a workflow in `namespace`, returns how many activities are PENDING
    /// and RUNNING plus the number of workers registered on that queue in
    /// the same namespace (via the correlated subquery).
    ///
    /// NOTE(review): queues that have registered workers but no activity
    /// rows at all do not appear in the result — confirm callers don't
    /// expect idle queues to be listed.
    async fn get_queue_stats(&self, namespace: &str) -> Result<Vec<crate::store::QueueStats>> {
        let rows = sqlx::query_as::<_, (String, i64, i64, i64)>(
            "SELECT
                a.task_queue AS queue,
                SUM(CASE WHEN a.status = 'PENDING' THEN 1 ELSE 0 END) AS pending,
                SUM(CASE WHEN a.status = 'RUNNING' THEN 1 ELSE 0 END) AS running,
                (SELECT COUNT(*) FROM workflow_workers w WHERE w.task_queue = a.task_queue AND w.namespace = $1) AS workers
             FROM workflow_activities a
             JOIN workflows wf ON a.workflow_id = wf.id AND wf.namespace = $1
             GROUP BY a.task_queue",
        )
        .bind(namespace)
        .fetch_all(&self.pool)
        .await?;

        Ok(rows
            .into_iter()
            .map(|(queue, pending, running, workers)| crate::store::QueueStats {
                queue,
                pending_activities: pending,
                running_activities: running,
                workers,
            })
            .collect())
    }
1187
    /// Attempts to take the global scheduler leadership lock (Postgres
    /// advisory lock key 42). Returns `true` when this call acquired it.
    ///
    /// NOTE(review): `pg_try_advisory_lock` is *session*-scoped, and this
    /// statement runs on an arbitrary pooled connection. The lock stays
    /// attached to whichever pool connection executed it and is only
    /// released when that session ends (or via `pg_advisory_unlock` on the
    /// same connection), so with a pool its lifetime is hard to control.
    /// Consider acquiring a dedicated connection and holding it for the
    /// leadership duration — TODO confirm intended semantics.
    async fn try_acquire_scheduler_lock(&self) -> Result<bool> {
        let row: (bool,) =
            sqlx::query_as("SELECT pg_try_advisory_lock(42)")
                .fetch_one(&self.pool)
                .await?;
        Ok(row.0)
    }
1200}
1201
/// Current wall-clock time as fractional seconds since the Unix epoch,
/// matching the DOUBLE PRECISION timestamps stored in the schema.
///
/// # Panics
/// Panics if the system clock reports a time before the Unix epoch.
fn timestamp_now() -> f64 {
    use std::time::{SystemTime, UNIX_EPOCH};
    SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .unwrap()
        .as_secs_f64()
}
1208
/// Row mapping for the `workflows` table.
///
/// Field names must match the column names selected by the workflow
/// queries — `sqlx::FromRow` maps columns by name, not position.
#[derive(sqlx::FromRow)]
struct PgWorkflowRow {
    id: String,
    namespace: String,
    run_id: String,
    workflow_type: String,
    task_queue: String,
    status: String,
    input: Option<String>,
    result: Option<String>,
    error: Option<String>,
    parent_id: Option<String>,
    claimed_by: Option<String>,
    search_attributes: Option<String>,
    archived_at: Option<f64>,
    archive_uri: Option<String>,
    created_at: f64,
    updated_at: f64,
    completed_at: Option<f64>,
}
1231
1232impl From<PgWorkflowRow> for WorkflowRecord {
1233 fn from(r: PgWorkflowRow) -> Self {
1234 Self {
1235 id: r.id,
1236 namespace: r.namespace,
1237 run_id: r.run_id,
1238 workflow_type: r.workflow_type,
1239 task_queue: r.task_queue,
1240 status: r.status,
1241 input: r.input,
1242 result: r.result,
1243 error: r.error,
1244 parent_id: r.parent_id,
1245 claimed_by: r.claimed_by,
1246 search_attributes: r.search_attributes,
1247 archived_at: r.archived_at,
1248 archive_uri: r.archive_uri,
1249 created_at: r.created_at,
1250 updated_at: r.updated_at,
1251 completed_at: r.completed_at,
1252 }
1253 }
1254}
1255
/// Row mapping for the `workflow_events` history table; columns are
/// matched by name via `sqlx::FromRow`.
#[derive(sqlx::FromRow)]
struct PgEventRow {
    id: i64,
    workflow_id: String,
    seq: i32,
    event_type: String,
    payload: Option<String>,
    timestamp: f64,
}
1265
1266impl From<PgEventRow> for WorkflowEvent {
1267 fn from(r: PgEventRow) -> Self {
1268 Self {
1269 id: Some(r.id),
1270 workflow_id: r.workflow_id,
1271 seq: r.seq,
1272 event_type: r.event_type,
1273 payload: r.payload,
1274 timestamp: r.timestamp,
1275 }
1276 }
1277}
1278
/// Row mapping for the `workflow_activities` table; columns are matched
/// by name via `sqlx::FromRow`.
#[derive(sqlx::FromRow)]
struct PgActivityRow {
    id: i64,
    workflow_id: String,
    seq: i32,
    name: String,
    task_queue: String,
    input: Option<String>,
    status: String,
    result: Option<String>,
    error: Option<String>,
    attempt: i32,
    max_attempts: i32,
    initial_interval_secs: f64,
    backoff_coefficient: f64,
    start_to_close_secs: f64,
    heartbeat_timeout_secs: Option<f64>,
    claimed_by: Option<String>,
    scheduled_at: f64,
    started_at: Option<f64>,
    completed_at: Option<f64>,
    last_heartbeat: Option<f64>,
}
1302
1303impl From<PgActivityRow> for WorkflowActivity {
1304 fn from(r: PgActivityRow) -> Self {
1305 Self {
1306 id: Some(r.id),
1307 workflow_id: r.workflow_id,
1308 seq: r.seq,
1309 name: r.name,
1310 task_queue: r.task_queue,
1311 input: r.input,
1312 status: r.status,
1313 result: r.result,
1314 error: r.error,
1315 attempt: r.attempt,
1316 max_attempts: r.max_attempts,
1317 initial_interval_secs: r.initial_interval_secs,
1318 backoff_coefficient: r.backoff_coefficient,
1319 start_to_close_secs: r.start_to_close_secs,
1320 heartbeat_timeout_secs: r.heartbeat_timeout_secs,
1321 claimed_by: r.claimed_by,
1322 scheduled_at: r.scheduled_at,
1323 started_at: r.started_at,
1324 completed_at: r.completed_at,
1325 last_heartbeat: r.last_heartbeat,
1326 }
1327 }
1328}
1329
/// Row mapping for the workflow timers table; columns are matched by
/// name via `sqlx::FromRow`.
#[derive(sqlx::FromRow)]
struct PgTimerRow {
    id: i64,
    workflow_id: String,
    seq: i32,
    fire_at: f64,
    fired: bool,
}
1338
1339impl From<PgTimerRow> for WorkflowTimer {
1340 fn from(r: PgTimerRow) -> Self {
1341 Self {
1342 id: Some(r.id),
1343 workflow_id: r.workflow_id,
1344 seq: r.seq,
1345 fire_at: r.fire_at,
1346 fired: r.fired,
1347 }
1348 }
1349}
1350
/// Row mapping for the workflow signals table; columns are matched by
/// name via `sqlx::FromRow`.
#[derive(sqlx::FromRow)]
struct PgSignalRow {
    id: i64,
    workflow_id: String,
    name: String,
    payload: Option<String>,
    consumed: bool,
    received_at: f64,
}
1360
1361impl From<PgSignalRow> for WorkflowSignal {
1362 fn from(r: PgSignalRow) -> Self {
1363 Self {
1364 id: Some(r.id),
1365 workflow_id: r.workflow_id,
1366 name: r.name,
1367 payload: r.payload,
1368 consumed: r.consumed,
1369 received_at: r.received_at,
1370 }
1371 }
1372}
1373
/// Row mapping for the `workflow_schedules` table; columns are matched
/// by name via `sqlx::FromRow`.
#[derive(sqlx::FromRow)]
struct PgScheduleRow {
    namespace: String,
    name: String,
    workflow_type: String,
    cron_expr: String,
    timezone: String,
    input: Option<String>,
    task_queue: String,
    overlap_policy: String,
    paused: bool,
    last_run_at: Option<f64>,
    next_run_at: Option<f64>,
    last_workflow_id: Option<String>,
    created_at: f64,
}
1390
1391impl From<PgScheduleRow> for WorkflowSchedule {
1392 fn from(r: PgScheduleRow) -> Self {
1393 Self {
1394 namespace: r.namespace,
1395 name: r.name,
1396 workflow_type: r.workflow_type,
1397 cron_expr: r.cron_expr,
1398 timezone: r.timezone,
1399 input: r.input,
1400 task_queue: r.task_queue,
1401 overlap_policy: r.overlap_policy,
1402 paused: r.paused,
1403 last_run_at: r.last_run_at,
1404 next_run_at: r.next_run_at,
1405 last_workflow_id: r.last_workflow_id,
1406 created_at: r.created_at,
1407 }
1408 }
1409}
1410
/// Row mapping for the `workflow_workers` table; columns are matched by
/// name via `sqlx::FromRow`.
#[derive(sqlx::FromRow)]
struct PgWorkerRow {
    id: String,
    namespace: String,
    identity: String,
    task_queue: String,
    workflows: Option<String>,
    activities: Option<String>,
    max_concurrent_workflows: i32,
    max_concurrent_activities: i32,
    active_tasks: i32,
    last_heartbeat: f64,
    registered_at: f64,
}
1425
1426impl From<PgWorkerRow> for WorkflowWorker {
1427 fn from(r: PgWorkerRow) -> Self {
1428 Self {
1429 id: r.id,
1430 namespace: r.namespace,
1431 identity: r.identity,
1432 task_queue: r.task_queue,
1433 workflows: r.workflows,
1434 activities: r.activities,
1435 max_concurrent_workflows: r.max_concurrent_workflows,
1436 max_concurrent_activities: r.max_concurrent_activities,
1437 active_tasks: r.active_tasks,
1438 last_heartbeat: r.last_heartbeat,
1439 registered_at: r.registered_at,
1440 }
1441 }
1442}