1use anyhow::Result;
2use sqlx::SqlitePool;
3
4use crate::store::{ApiKeyRecord, NamespaceRecord, NamespaceStats, QueueStats, WorkflowStore};
5use crate::types::*;
6
/// SQL applied by `SqliteStore::migrate`, which splits this text on `;` and
/// executes each non-empty piece in order.
///
/// Every `CREATE` uses `IF NOT EXISTS` and the seed row uses `INSERT OR IGNORE`,
/// so the statements can be run again against an existing database. Because the
/// split is purely textual, no statement or comment in here may contain a `;`
/// other than the statement terminator.
const SCHEMA: &str = r#"
CREATE TABLE IF NOT EXISTS namespaces (
 name TEXT PRIMARY KEY,
 created_at REAL NOT NULL
);

INSERT OR IGNORE INTO namespaces (name, created_at)
 VALUES ('main', strftime('%s', 'now'));

CREATE TABLE IF NOT EXISTS workflows (
 id TEXT PRIMARY KEY,
 namespace TEXT NOT NULL DEFAULT 'main',
 run_id TEXT NOT NULL,
 workflow_type TEXT NOT NULL,
 task_queue TEXT NOT NULL DEFAULT 'main',
 status TEXT NOT NULL DEFAULT 'PENDING',
 input TEXT,
 result TEXT,
 error TEXT,
 parent_id TEXT,
 claimed_by TEXT,
 -- Workflow-task dispatch (Phase 9): a workflow is "dispatchable" when
 -- it has new events a worker needs to replay against. Set true on
 -- start, on activity completion, on timer fire, on signal arrival.
 -- Cleared when a worker claims the dispatch lease.
 needs_dispatch INTEGER NOT NULL DEFAULT 0,
 dispatch_claimed_by TEXT,
 dispatch_last_heartbeat REAL,
 created_at REAL NOT NULL,
 updated_at REAL NOT NULL,
 completed_at REAL
);
CREATE INDEX IF NOT EXISTS idx_wf_status_queue ON workflows(status, task_queue);
CREATE INDEX IF NOT EXISTS idx_wf_namespace ON workflows(namespace);
CREATE INDEX IF NOT EXISTS idx_wf_dispatch ON workflows(task_queue, needs_dispatch, dispatch_claimed_by);

CREATE TABLE IF NOT EXISTS workflow_events (
 id INTEGER PRIMARY KEY AUTOINCREMENT,
 workflow_id TEXT NOT NULL REFERENCES workflows(id),
 seq INTEGER NOT NULL,
 event_type TEXT NOT NULL,
 payload TEXT,
 timestamp REAL NOT NULL
);
CREATE INDEX IF NOT EXISTS idx_wf_events_lookup ON workflow_events(workflow_id, seq);

CREATE TABLE IF NOT EXISTS workflow_activities (
 id INTEGER PRIMARY KEY AUTOINCREMENT,
 workflow_id TEXT NOT NULL REFERENCES workflows(id),
 seq INTEGER NOT NULL,
 name TEXT NOT NULL,
 task_queue TEXT NOT NULL DEFAULT 'main',
 input TEXT,
 status TEXT NOT NULL DEFAULT 'PENDING',
 result TEXT,
 error TEXT,
 attempt INTEGER NOT NULL DEFAULT 1,
 max_attempts INTEGER NOT NULL DEFAULT 3,
 initial_interval_secs REAL NOT NULL DEFAULT 1,
 backoff_coefficient REAL NOT NULL DEFAULT 2,
 start_to_close_secs REAL NOT NULL DEFAULT 300,
 heartbeat_timeout_secs REAL,
 claimed_by TEXT,
 scheduled_at REAL NOT NULL,
 started_at REAL,
 completed_at REAL,
 last_heartbeat REAL,
 UNIQUE (workflow_id, seq)
);
CREATE INDEX IF NOT EXISTS idx_wf_act_pending ON workflow_activities(task_queue, status, scheduled_at);

CREATE TABLE IF NOT EXISTS workflow_timers (
 id INTEGER PRIMARY KEY AUTOINCREMENT,
 workflow_id TEXT NOT NULL REFERENCES workflows(id),
 seq INTEGER NOT NULL,
 fire_at REAL NOT NULL,
 fired INTEGER NOT NULL DEFAULT 0,
 UNIQUE (workflow_id, seq)
);
CREATE INDEX IF NOT EXISTS idx_wf_timers_due ON workflow_timers(fire_at);

CREATE TABLE IF NOT EXISTS workflow_signals (
 id INTEGER PRIMARY KEY AUTOINCREMENT,
 workflow_id TEXT NOT NULL REFERENCES workflows(id),
 name TEXT NOT NULL,
 payload TEXT,
 consumed INTEGER NOT NULL DEFAULT 0,
 received_at REAL NOT NULL
);
CREATE INDEX IF NOT EXISTS idx_wf_signals_lookup ON workflow_signals(workflow_id, name, consumed);

CREATE TABLE IF NOT EXISTS workflow_schedules (
 name TEXT NOT NULL,
 namespace TEXT NOT NULL DEFAULT 'main',
 workflow_type TEXT NOT NULL,
 cron_expr TEXT NOT NULL,
 input TEXT,
 task_queue TEXT NOT NULL DEFAULT 'main',
 overlap_policy TEXT NOT NULL DEFAULT 'skip',
 paused INTEGER NOT NULL DEFAULT 0,
 last_run_at REAL,
 next_run_at REAL,
 last_workflow_id TEXT,
 created_at REAL NOT NULL,
 PRIMARY KEY (namespace, name)
);

CREATE TABLE IF NOT EXISTS workflow_workers (
 id TEXT PRIMARY KEY,
 namespace TEXT NOT NULL DEFAULT 'main',
 identity TEXT NOT NULL,
 task_queue TEXT NOT NULL,
 workflows TEXT,
 activities TEXT,
 max_concurrent_workflows INTEGER NOT NULL DEFAULT 10,
 max_concurrent_activities INTEGER NOT NULL DEFAULT 10,
 active_tasks INTEGER NOT NULL DEFAULT 0,
 last_heartbeat REAL NOT NULL,
 registered_at REAL NOT NULL
);

CREATE TABLE IF NOT EXISTS workflow_snapshots (
 workflow_id TEXT NOT NULL REFERENCES workflows(id),
 event_seq INTEGER NOT NULL,
 state_json TEXT NOT NULL,
 created_at REAL NOT NULL,
 PRIMARY KEY (workflow_id, event_seq)
);

CREATE TABLE IF NOT EXISTS api_keys (
 key_hash TEXT PRIMARY KEY,
 prefix TEXT NOT NULL,
 label TEXT,
 created_at REAL NOT NULL
);
CREATE INDEX IF NOT EXISTS idx_api_keys_prefix ON api_keys(prefix);

CREATE TABLE IF NOT EXISTS engine_lock (
 id INTEGER PRIMARY KEY CHECK (id = 1),
 instance_id TEXT NOT NULL,
 started_at REAL NOT NULL,
 last_heartbeat REAL NOT NULL
);
"#;
151
/// Heartbeat age, in seconds, beyond which `acquire_engine_lock` treats the
/// existing engine lock as stale and takes it over.
const LOCK_STALE_SECS: f64 = 60.0;
/// Period, in seconds, of the heartbeat loop started by `spawn_lock_heartbeat`.
const LOCK_HEARTBEAT_SECS: u64 = 15;
157
/// SQLite-backed store: a connection pool plus the identity this process uses
/// when claiming the `engine_lock` row.
pub struct SqliteStore {
    /// Connection pool every query in this store runs against.
    pool: SqlitePool,
    /// Identifier generated in `new` (`assay-` + 16 hex digits); written to and
    /// matched against `engine_lock.instance_id`.
    instance_id: String,
}
162
163impl SqliteStore {
164 pub async fn new(url: &str) -> Result<Self> {
165 let pool = SqlitePool::connect(url).await?;
166 let instance_id = format!("assay-{:016x}", {
167 use std::collections::hash_map::DefaultHasher;
168 use std::hash::{Hash, Hasher};
169 let mut h = DefaultHasher::new();
170 std::time::SystemTime::now().hash(&mut h);
171 std::process::id().hash(&mut h);
172 h.finish()
173 });
174 let store = Self { pool, instance_id };
175 store.migrate().await?;
176 Ok(store)
177 }
178
179 pub async fn acquire_engine_lock(&self) -> Result<()> {
182 let now = timestamp_now();
183
184 let result = sqlx::query(
186 "INSERT INTO engine_lock (id, instance_id, started_at, last_heartbeat) VALUES (1, ?, ?, ?)",
187 )
188 .bind(&self.instance_id)
189 .bind(now)
190 .bind(now)
191 .execute(&self.pool)
192 .await;
193
194 match result {
195 Ok(_) => Ok(()),
196 Err(_) => {
197 let row: Option<(String, f64)> = sqlx::query_as(
199 "SELECT instance_id, last_heartbeat FROM engine_lock WHERE id = 1",
200 )
201 .fetch_optional(&self.pool)
202 .await?;
203
204 if let Some((existing_id, last_hb)) = row {
205 if now - last_hb > LOCK_STALE_SECS {
206 sqlx::query(
208 "UPDATE engine_lock SET instance_id = ?, started_at = ?, last_heartbeat = ? WHERE id = 1",
209 )
210 .bind(&self.instance_id)
211 .bind(now)
212 .bind(now)
213 .execute(&self.pool)
214 .await?;
215 tracing::warn!(
216 "Took over stale engine lock from {existing_id} (last heartbeat {:.0}s ago)",
217 now - last_hb
218 );
219 Ok(())
220 } else {
221 let age = now - last_hb;
222 anyhow::bail!(
223 "Another assay engine instance is already running (id: {existing_id}, \
224 last heartbeat {age:.0}s ago).\n\n\
225 SQLite only supports a single engine instance. For multi-instance \
226 deployment (Kubernetes, Docker Swarm), use PostgreSQL:\n\n\
227 \x20 assay serve --backend postgres://user:pass@host:5432/dbname"
228 );
229 }
230 } else {
231 anyhow::bail!("Unexpected engine lock state");
232 }
233 }
234 }
235 }
236
237 pub async fn refresh_engine_lock(&self) -> Result<()> {
239 sqlx::query("UPDATE engine_lock SET last_heartbeat = ? WHERE id = 1 AND instance_id = ?")
240 .bind(timestamp_now())
241 .bind(&self.instance_id)
242 .execute(&self.pool)
243 .await?;
244 Ok(())
245 }
246
247 pub async fn release_engine_lock(&self) -> Result<()> {
249 sqlx::query("DELETE FROM engine_lock WHERE id = 1 AND instance_id = ?")
250 .bind(&self.instance_id)
251 .execute(&self.pool)
252 .await?;
253 Ok(())
254 }
255
256 pub fn spawn_lock_heartbeat(self: &std::sync::Arc<Self>) {
258 let store = std::sync::Arc::clone(self);
259 tokio::spawn(async move {
260 let mut tick = tokio::time::interval(std::time::Duration::from_secs(LOCK_HEARTBEAT_SECS));
261 loop {
262 tick.tick().await;
263 if let Err(e) = store.refresh_engine_lock().await {
264 tracing::error!("Engine lock heartbeat failed: {e}");
265 }
266 }
267 });
268 }
269
270 async fn migrate(&self) -> Result<()> {
271 for statement in SCHEMA.split(';') {
272 let trimmed = statement.trim();
273 if !trimmed.is_empty() {
274 sqlx::query(trimmed).execute(&self.pool).await?;
275 }
276 }
277 Ok(())
278 }
279}
280
281impl WorkflowStore for SqliteStore {
282 async fn create_namespace(&self, name: &str) -> Result<()> {
285 sqlx::query("INSERT INTO namespaces (name, created_at) VALUES (?, ?)")
286 .bind(name)
287 .bind(timestamp_now())
288 .execute(&self.pool)
289 .await?;
290 Ok(())
291 }
292
293 async fn list_namespaces(&self) -> Result<Vec<NamespaceRecord>> {
294 let rows = sqlx::query_as::<_, (String, f64)>(
295 "SELECT name, created_at FROM namespaces ORDER BY name",
296 )
297 .fetch_all(&self.pool)
298 .await?;
299 Ok(rows
300 .into_iter()
301 .map(|(name, created_at)| NamespaceRecord { name, created_at })
302 .collect())
303 }
304
305 async fn delete_namespace(&self, name: &str) -> Result<bool> {
306 let res = sqlx::query("DELETE FROM namespaces WHERE name = ?")
307 .bind(name)
308 .execute(&self.pool)
309 .await?;
310 Ok(res.rows_affected() > 0)
311 }
312
313 async fn get_namespace_stats(&self, namespace: &str) -> Result<NamespaceStats> {
314 let total: (i64,) =
315 sqlx::query_as("SELECT COUNT(*) FROM workflows WHERE namespace = ?")
316 .bind(namespace)
317 .fetch_one(&self.pool)
318 .await?;
319 let running: (i64,) = sqlx::query_as(
320 "SELECT COUNT(*) FROM workflows WHERE namespace = ? AND status = 'RUNNING'",
321 )
322 .bind(namespace)
323 .fetch_one(&self.pool)
324 .await?;
325 let pending: (i64,) = sqlx::query_as(
326 "SELECT COUNT(*) FROM workflows WHERE namespace = ? AND status = 'PENDING'",
327 )
328 .bind(namespace)
329 .fetch_one(&self.pool)
330 .await?;
331 let completed: (i64,) = sqlx::query_as(
332 "SELECT COUNT(*) FROM workflows WHERE namespace = ? AND status = 'COMPLETED'",
333 )
334 .bind(namespace)
335 .fetch_one(&self.pool)
336 .await?;
337 let failed: (i64,) = sqlx::query_as(
338 "SELECT COUNT(*) FROM workflows WHERE namespace = ? AND status = 'FAILED'",
339 )
340 .bind(namespace)
341 .fetch_one(&self.pool)
342 .await?;
343 let schedules: (i64,) =
344 sqlx::query_as("SELECT COUNT(*) FROM workflow_schedules WHERE namespace = ?")
345 .bind(namespace)
346 .fetch_one(&self.pool)
347 .await?;
348 let workers: (i64,) =
349 sqlx::query_as("SELECT COUNT(*) FROM workflow_workers WHERE namespace = ?")
350 .bind(namespace)
351 .fetch_one(&self.pool)
352 .await?;
353
354 Ok(NamespaceStats {
355 namespace: namespace.to_string(),
356 total_workflows: total.0,
357 running: running.0,
358 pending: pending.0,
359 completed: completed.0,
360 failed: failed.0,
361 schedules: schedules.0,
362 workers: workers.0,
363 })
364 }
365
366 async fn create_workflow(&self, wf: &WorkflowRecord) -> Result<()> {
369 sqlx::query(
370 "INSERT INTO workflows (id, namespace, run_id, workflow_type, task_queue, status, input, result, error, parent_id, claimed_by, created_at, updated_at, completed_at)
371 VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)",
372 )
373 .bind(&wf.id)
374 .bind(&wf.namespace)
375 .bind(&wf.run_id)
376 .bind(&wf.workflow_type)
377 .bind(&wf.task_queue)
378 .bind(&wf.status)
379 .bind(&wf.input)
380 .bind(&wf.result)
381 .bind(&wf.error)
382 .bind(&wf.parent_id)
383 .bind(&wf.claimed_by)
384 .bind(wf.created_at)
385 .bind(wf.updated_at)
386 .bind(wf.completed_at)
387 .execute(&self.pool)
388 .await?;
389 Ok(())
390 }
391
392 async fn get_workflow(&self, id: &str) -> Result<Option<WorkflowRecord>> {
393 let row = sqlx::query_as::<_, SqliteWorkflowRow>(
394 "SELECT id, namespace, run_id, workflow_type, task_queue, status, input, result, error, parent_id, claimed_by, created_at, updated_at, completed_at FROM workflows WHERE id = ?",
395 )
396 .bind(id)
397 .fetch_optional(&self.pool)
398 .await?;
399 Ok(row.map(Into::into))
400 }
401
402 async fn list_workflows(
403 &self,
404 namespace: &str,
405 status: Option<WorkflowStatus>,
406 workflow_type: Option<&str>,
407 limit: i64,
408 offset: i64,
409 ) -> Result<Vec<WorkflowRecord>> {
410 let status_str = status.map(|s| s.to_string());
411 let rows = sqlx::query_as::<_, SqliteWorkflowRow>(
412 "SELECT id, namespace, run_id, workflow_type, task_queue, status, input, result, error, parent_id, claimed_by, created_at, updated_at, completed_at
413 FROM workflows
414 WHERE namespace = ?
415 AND (? IS NULL OR status = ?)
416 AND (? IS NULL OR workflow_type = ?)
417 ORDER BY created_at DESC
418 LIMIT ? OFFSET ?",
419 )
420 .bind(namespace)
421 .bind(&status_str)
422 .bind(&status_str)
423 .bind(workflow_type)
424 .bind(workflow_type)
425 .bind(limit)
426 .bind(offset)
427 .fetch_all(&self.pool)
428 .await?;
429 Ok(rows.into_iter().map(Into::into).collect())
430 }
431
432 async fn update_workflow_status(
433 &self,
434 id: &str,
435 status: WorkflowStatus,
436 result: Option<&str>,
437 error: Option<&str>,
438 ) -> Result<()> {
439 let now = timestamp_now();
440 let completed_at = if status.is_terminal() { Some(now) } else { None };
441 sqlx::query(
442 "UPDATE workflows SET status = ?, result = COALESCE(?, result), error = COALESCE(?, error), updated_at = ?, completed_at = COALESCE(?, completed_at) WHERE id = ?",
443 )
444 .bind(status.to_string())
445 .bind(result)
446 .bind(error)
447 .bind(now)
448 .bind(completed_at)
449 .bind(id)
450 .execute(&self.pool)
451 .await?;
452 Ok(())
453 }
454
455 async fn claim_workflow(&self, id: &str, worker_id: &str) -> Result<bool> {
456 let res = sqlx::query(
457 "UPDATE workflows SET claimed_by = ?, status = 'RUNNING', updated_at = ? WHERE id = ? AND claimed_by IS NULL",
458 )
459 .bind(worker_id)
460 .bind(timestamp_now())
461 .bind(id)
462 .execute(&self.pool)
463 .await?;
464 Ok(res.rows_affected() > 0)
465 }
466
467 async fn mark_workflow_dispatchable(&self, workflow_id: &str) -> Result<()> {
468 sqlx::query("UPDATE workflows SET needs_dispatch = 1 WHERE id = ?")
469 .bind(workflow_id)
470 .execute(&self.pool)
471 .await?;
472 Ok(())
473 }
474
475 async fn claim_workflow_task(
476 &self,
477 task_queue: &str,
478 worker_id: &str,
479 ) -> Result<Option<WorkflowRecord>> {
480 let now = timestamp_now();
481 let row = sqlx::query_as::<_, SqliteWorkflowRow>(
483 "UPDATE workflows
484 SET dispatch_claimed_by = ?, dispatch_last_heartbeat = ?, needs_dispatch = 0
485 WHERE id = (
486 SELECT id FROM workflows
487 WHERE task_queue = ?
488 AND needs_dispatch = 1
489 AND dispatch_claimed_by IS NULL
490 AND status NOT IN ('COMPLETED', 'FAILED', 'CANCELLED', 'TIMED_OUT')
491 ORDER BY updated_at ASC
492 LIMIT 1
493 )
494 RETURNING id, namespace, run_id, workflow_type, task_queue, status, input, result, error, parent_id, claimed_by, created_at, updated_at, completed_at",
495 )
496 .bind(worker_id)
497 .bind(now)
498 .bind(task_queue)
499 .fetch_optional(&self.pool)
500 .await?;
501 Ok(row.map(Into::into))
502 }
503
504 async fn release_workflow_task(&self, workflow_id: &str, worker_id: &str) -> Result<()> {
505 sqlx::query(
506 "UPDATE workflows
507 SET dispatch_claimed_by = NULL, dispatch_last_heartbeat = NULL
508 WHERE id = ? AND dispatch_claimed_by = ?",
509 )
510 .bind(workflow_id)
511 .bind(worker_id)
512 .execute(&self.pool)
513 .await?;
514 Ok(())
515 }
516
517 async fn release_stale_dispatch_leases(
518 &self,
519 now: f64,
520 timeout_secs: f64,
521 ) -> Result<u64> {
522 let res = sqlx::query(
526 "UPDATE workflows
527 SET dispatch_claimed_by = NULL,
528 dispatch_last_heartbeat = NULL,
529 needs_dispatch = 1
530 WHERE dispatch_claimed_by IS NOT NULL
531 AND (? - dispatch_last_heartbeat) > ?
532 AND status NOT IN ('COMPLETED', 'FAILED', 'CANCELLED', 'TIMED_OUT')",
533 )
534 .bind(now)
535 .bind(timeout_secs)
536 .execute(&self.pool)
537 .await?;
538 Ok(res.rows_affected())
539 }
540
541 async fn append_event(&self, ev: &WorkflowEvent) -> Result<i64> {
544 let res = sqlx::query(
545 "INSERT INTO workflow_events (workflow_id, seq, event_type, payload, timestamp) VALUES (?, ?, ?, ?, ?)",
546 )
547 .bind(&ev.workflow_id)
548 .bind(ev.seq)
549 .bind(&ev.event_type)
550 .bind(&ev.payload)
551 .bind(ev.timestamp)
552 .execute(&self.pool)
553 .await?;
554 Ok(res.last_insert_rowid())
555 }
556
557 async fn list_events(&self, workflow_id: &str) -> Result<Vec<WorkflowEvent>> {
558 let rows = sqlx::query_as::<_, SqliteEventRow>(
559 "SELECT id, workflow_id, seq, event_type, payload, timestamp FROM workflow_events WHERE workflow_id = ? ORDER BY seq ASC",
560 )
561 .bind(workflow_id)
562 .fetch_all(&self.pool)
563 .await?;
564 Ok(rows.into_iter().map(Into::into).collect())
565 }
566
567 async fn get_event_count(&self, workflow_id: &str) -> Result<i64> {
568 let row: (i64,) =
569 sqlx::query_as("SELECT COUNT(*) FROM workflow_events WHERE workflow_id = ?")
570 .bind(workflow_id)
571 .fetch_one(&self.pool)
572 .await?;
573 Ok(row.0)
574 }
575
576 async fn create_activity(&self, act: &WorkflowActivity) -> Result<i64> {
579 let res = sqlx::query(
580 "INSERT INTO workflow_activities (workflow_id, seq, name, task_queue, input, status, attempt, max_attempts, initial_interval_secs, backoff_coefficient, start_to_close_secs, heartbeat_timeout_secs, scheduled_at)
581 VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)",
582 )
583 .bind(&act.workflow_id)
584 .bind(act.seq)
585 .bind(&act.name)
586 .bind(&act.task_queue)
587 .bind(&act.input)
588 .bind(&act.status)
589 .bind(act.attempt)
590 .bind(act.max_attempts)
591 .bind(act.initial_interval_secs)
592 .bind(act.backoff_coefficient)
593 .bind(act.start_to_close_secs)
594 .bind(act.heartbeat_timeout_secs)
595 .bind(act.scheduled_at)
596 .execute(&self.pool)
597 .await?;
598 Ok(res.last_insert_rowid())
599 }
600
601 async fn get_activity(&self, id: i64) -> Result<Option<WorkflowActivity>> {
602 let row = sqlx::query_as::<_, SqliteActivityRow>(
603 "SELECT id, workflow_id, seq, name, task_queue, input, status, result, error, attempt, max_attempts, initial_interval_secs, backoff_coefficient, start_to_close_secs, heartbeat_timeout_secs, claimed_by, scheduled_at, started_at, completed_at, last_heartbeat
604 FROM workflow_activities WHERE id = ?",
605 )
606 .bind(id)
607 .fetch_optional(&self.pool)
608 .await?;
609 Ok(row.map(Into::into))
610 }
611
612 async fn get_activity_by_workflow_seq(
613 &self,
614 workflow_id: &str,
615 seq: i32,
616 ) -> Result<Option<WorkflowActivity>> {
617 let row = sqlx::query_as::<_, SqliteActivityRow>(
618 "SELECT id, workflow_id, seq, name, task_queue, input, status, result, error, attempt, max_attempts, initial_interval_secs, backoff_coefficient, start_to_close_secs, heartbeat_timeout_secs, claimed_by, scheduled_at, started_at, completed_at, last_heartbeat
619 FROM workflow_activities WHERE workflow_id = ? AND seq = ?",
620 )
621 .bind(workflow_id)
622 .bind(seq)
623 .fetch_optional(&self.pool)
624 .await?;
625 Ok(row.map(Into::into))
626 }
627
628 async fn claim_activity(
629 &self,
630 task_queue: &str,
631 worker_id: &str,
632 ) -> Result<Option<WorkflowActivity>> {
633 let now = timestamp_now();
634 let row = sqlx::query_as::<_, SqliteActivityRow>(
635 "UPDATE workflow_activities SET status = 'RUNNING', claimed_by = ?, started_at = ?
636 WHERE id = (
637 SELECT id FROM workflow_activities
638 WHERE task_queue = ? AND status = 'PENDING'
639 ORDER BY scheduled_at ASC
640 LIMIT 1
641 )
642 RETURNING id, workflow_id, seq, name, task_queue, input, status, result, error, attempt, max_attempts, initial_interval_secs, backoff_coefficient, start_to_close_secs, heartbeat_timeout_secs, claimed_by, scheduled_at, started_at, completed_at, last_heartbeat",
643 )
644 .bind(worker_id)
645 .bind(now)
646 .bind(task_queue)
647 .fetch_optional(&self.pool)
648 .await?;
649 Ok(row.map(Into::into))
650 }
651
652 async fn requeue_activity_for_retry(
653 &self,
654 id: i64,
655 next_attempt: i32,
656 next_scheduled_at: f64,
657 ) -> Result<()> {
658 sqlx::query(
659 "UPDATE workflow_activities
660 SET status = 'PENDING', attempt = ?, scheduled_at = ?,
661 claimed_by = NULL, started_at = NULL, last_heartbeat = NULL,
662 error = NULL
663 WHERE id = ?",
664 )
665 .bind(next_attempt)
666 .bind(next_scheduled_at)
667 .bind(id)
668 .execute(&self.pool)
669 .await?;
670 Ok(())
671 }
672
673 async fn complete_activity(
674 &self,
675 id: i64,
676 result: Option<&str>,
677 error: Option<&str>,
678 failed: bool,
679 ) -> Result<()> {
680 let status = if failed { "FAILED" } else { "COMPLETED" };
681 sqlx::query(
682 "UPDATE workflow_activities SET status = ?, result = ?, error = ?, completed_at = ? WHERE id = ?",
683 )
684 .bind(status)
685 .bind(result)
686 .bind(error)
687 .bind(timestamp_now())
688 .bind(id)
689 .execute(&self.pool)
690 .await?;
691 Ok(())
692 }
693
694 async fn heartbeat_activity(&self, id: i64, _details: Option<&str>) -> Result<()> {
695 sqlx::query("UPDATE workflow_activities SET last_heartbeat = ? WHERE id = ?")
696 .bind(timestamp_now())
697 .bind(id)
698 .execute(&self.pool)
699 .await?;
700 Ok(())
701 }
702
703 async fn get_timed_out_activities(&self, now: f64) -> Result<Vec<WorkflowActivity>> {
704 let rows = sqlx::query_as::<_, SqliteActivityRow>(
705 "SELECT id, workflow_id, seq, name, task_queue, input, status, result, error, attempt, max_attempts, initial_interval_secs, backoff_coefficient, start_to_close_secs, heartbeat_timeout_secs, claimed_by, scheduled_at, started_at, completed_at, last_heartbeat
706 FROM workflow_activities
707 WHERE status = 'RUNNING'
708 AND heartbeat_timeout_secs IS NOT NULL
709 AND last_heartbeat IS NOT NULL
710 AND (? - last_heartbeat) > heartbeat_timeout_secs",
711 )
712 .bind(now)
713 .fetch_all(&self.pool)
714 .await?;
715 Ok(rows.into_iter().map(Into::into).collect())
716 }
717
718 async fn create_timer(&self, timer: &WorkflowTimer) -> Result<i64> {
721 let res = sqlx::query(
722 "INSERT INTO workflow_timers (workflow_id, seq, fire_at, fired) VALUES (?, ?, ?, 0)",
723 )
724 .bind(&timer.workflow_id)
725 .bind(timer.seq)
726 .bind(timer.fire_at)
727 .execute(&self.pool)
728 .await?;
729 Ok(res.last_insert_rowid())
730 }
731
732 async fn cancel_pending_activities(&self, workflow_id: &str) -> Result<u64> {
733 let res = sqlx::query(
734 "UPDATE workflow_activities SET status = 'CANCELLED', completed_at = ?
735 WHERE workflow_id = ? AND status = 'PENDING'",
736 )
737 .bind(timestamp_now())
738 .bind(workflow_id)
739 .execute(&self.pool)
740 .await?;
741 Ok(res.rows_affected())
742 }
743
744 async fn cancel_pending_timers(&self, workflow_id: &str) -> Result<u64> {
745 let res = sqlx::query(
746 "UPDATE workflow_timers SET fired = 1
747 WHERE workflow_id = ? AND fired = 0",
748 )
749 .bind(workflow_id)
750 .execute(&self.pool)
751 .await?;
752 Ok(res.rows_affected())
753 }
754
755 async fn get_timer_by_workflow_seq(
756 &self,
757 workflow_id: &str,
758 seq: i32,
759 ) -> Result<Option<WorkflowTimer>> {
760 let row = sqlx::query_as::<_, SqliteTimerRow>(
761 "SELECT id, workflow_id, seq, fire_at, fired
762 FROM workflow_timers WHERE workflow_id = ? AND seq = ?",
763 )
764 .bind(workflow_id)
765 .bind(seq)
766 .fetch_optional(&self.pool)
767 .await?;
768 Ok(row.map(Into::into))
769 }
770
771 async fn fire_due_timers(&self, now: f64) -> Result<Vec<WorkflowTimer>> {
772 let rows = sqlx::query_as::<_, SqliteTimerRow>(
773 "UPDATE workflow_timers SET fired = 1
774 WHERE fired = 0 AND fire_at <= ?
775 RETURNING id, workflow_id, seq, fire_at, fired",
776 )
777 .bind(now)
778 .fetch_all(&self.pool)
779 .await?;
780 Ok(rows.into_iter().map(Into::into).collect())
781 }
782
783 async fn send_signal(&self, sig: &WorkflowSignal) -> Result<i64> {
786 let res = sqlx::query(
787 "INSERT INTO workflow_signals (workflow_id, name, payload, consumed, received_at) VALUES (?, ?, ?, 0, ?)",
788 )
789 .bind(&sig.workflow_id)
790 .bind(&sig.name)
791 .bind(&sig.payload)
792 .bind(sig.received_at)
793 .execute(&self.pool)
794 .await?;
795 Ok(res.last_insert_rowid())
796 }
797
798 async fn consume_signals(
799 &self,
800 workflow_id: &str,
801 name: &str,
802 ) -> Result<Vec<WorkflowSignal>> {
803 let rows = sqlx::query_as::<_, SqliteSignalRow>(
804 "UPDATE workflow_signals SET consumed = 1
805 WHERE workflow_id = ? AND name = ? AND consumed = 0
806 RETURNING id, workflow_id, name, payload, consumed, received_at",
807 )
808 .bind(workflow_id)
809 .bind(name)
810 .fetch_all(&self.pool)
811 .await?;
812 Ok(rows.into_iter().map(Into::into).collect())
813 }
814
815 async fn create_schedule(&self, sched: &WorkflowSchedule) -> Result<()> {
818 sqlx::query(
819 "INSERT INTO workflow_schedules (name, namespace, workflow_type, cron_expr, input, task_queue, overlap_policy, paused, last_run_at, next_run_at, last_workflow_id, created_at)
820 VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)",
821 )
822 .bind(&sched.name)
823 .bind(&sched.namespace)
824 .bind(&sched.workflow_type)
825 .bind(&sched.cron_expr)
826 .bind(&sched.input)
827 .bind(&sched.task_queue)
828 .bind(&sched.overlap_policy)
829 .bind(sched.paused)
830 .bind(sched.last_run_at)
831 .bind(sched.next_run_at)
832 .bind(&sched.last_workflow_id)
833 .bind(sched.created_at)
834 .execute(&self.pool)
835 .await?;
836 Ok(())
837 }
838
839 async fn get_schedule(&self, namespace: &str, name: &str) -> Result<Option<WorkflowSchedule>> {
840 let row = sqlx::query_as::<_, SqliteScheduleRow>(
841 "SELECT name, namespace, workflow_type, cron_expr, input, task_queue, overlap_policy, paused, last_run_at, next_run_at, last_workflow_id, created_at
842 FROM workflow_schedules WHERE namespace = ? AND name = ?",
843 )
844 .bind(namespace)
845 .bind(name)
846 .fetch_optional(&self.pool)
847 .await?;
848 Ok(row.map(Into::into))
849 }
850
851 async fn list_schedules(&self, namespace: &str) -> Result<Vec<WorkflowSchedule>> {
852 let rows = sqlx::query_as::<_, SqliteScheduleRow>(
853 "SELECT name, namespace, workflow_type, cron_expr, input, task_queue, overlap_policy, paused, last_run_at, next_run_at, last_workflow_id, created_at
854 FROM workflow_schedules WHERE namespace = ? ORDER BY name",
855 )
856 .bind(namespace)
857 .fetch_all(&self.pool)
858 .await?;
859 Ok(rows.into_iter().map(Into::into).collect())
860 }
861
862 async fn update_schedule_last_run(
863 &self,
864 namespace: &str,
865 name: &str,
866 last_run_at: f64,
867 next_run_at: f64,
868 workflow_id: &str,
869 ) -> Result<()> {
870 sqlx::query(
871 "UPDATE workflow_schedules SET last_run_at = ?, next_run_at = ?, last_workflow_id = ? WHERE namespace = ? AND name = ?",
872 )
873 .bind(last_run_at)
874 .bind(next_run_at)
875 .bind(workflow_id)
876 .bind(namespace)
877 .bind(name)
878 .execute(&self.pool)
879 .await?;
880 Ok(())
881 }
882
883 async fn delete_schedule(&self, namespace: &str, name: &str) -> Result<bool> {
884 let res =
885 sqlx::query("DELETE FROM workflow_schedules WHERE namespace = ? AND name = ?")
886 .bind(namespace)
887 .bind(name)
888 .execute(&self.pool)
889 .await?;
890 Ok(res.rows_affected() > 0)
891 }
892
893 async fn register_worker(&self, w: &WorkflowWorker) -> Result<()> {
896 sqlx::query(
897 "INSERT OR REPLACE INTO workflow_workers (id, namespace, identity, task_queue, workflows, activities, max_concurrent_workflows, max_concurrent_activities, active_tasks, last_heartbeat, registered_at)
898 VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)",
899 )
900 .bind(&w.id)
901 .bind(&w.namespace)
902 .bind(&w.identity)
903 .bind(&w.task_queue)
904 .bind(&w.workflows)
905 .bind(&w.activities)
906 .bind(w.max_concurrent_workflows)
907 .bind(w.max_concurrent_activities)
908 .bind(w.active_tasks)
909 .bind(w.last_heartbeat)
910 .bind(w.registered_at)
911 .execute(&self.pool)
912 .await?;
913 Ok(())
914 }
915
916 async fn heartbeat_worker(&self, id: &str, now: f64) -> Result<()> {
917 sqlx::query("UPDATE workflow_workers SET last_heartbeat = ? WHERE id = ?")
918 .bind(now)
919 .bind(id)
920 .execute(&self.pool)
921 .await?;
922 Ok(())
923 }
924
925 async fn list_workers(&self, namespace: &str) -> Result<Vec<WorkflowWorker>> {
926 let rows = sqlx::query_as::<_, SqliteWorkerRow>(
927 "SELECT id, namespace, identity, task_queue, workflows, activities, max_concurrent_workflows, max_concurrent_activities, active_tasks, last_heartbeat, registered_at
928 FROM workflow_workers WHERE namespace = ? ORDER BY registered_at",
929 )
930 .bind(namespace)
931 .fetch_all(&self.pool)
932 .await?;
933 Ok(rows.into_iter().map(Into::into).collect())
934 }
935
936 async fn remove_dead_workers(&self, cutoff: f64) -> Result<Vec<String>> {
937 let rows: Vec<(String,)> =
938 sqlx::query_as("SELECT id FROM workflow_workers WHERE last_heartbeat < ?")
939 .bind(cutoff)
940 .fetch_all(&self.pool)
941 .await?;
942 let ids: Vec<String> = rows.into_iter().map(|r| r.0).collect();
943 if !ids.is_empty() {
944 sqlx::query("DELETE FROM workflow_workers WHERE last_heartbeat < ?")
945 .bind(cutoff)
946 .execute(&self.pool)
947 .await?;
948 }
949 Ok(ids)
950 }
951
952 async fn create_api_key(
955 &self,
956 key_hash: &str,
957 prefix: &str,
958 label: Option<&str>,
959 created_at: f64,
960 ) -> Result<()> {
961 sqlx::query(
962 "INSERT INTO api_keys (key_hash, prefix, label, created_at) VALUES (?, ?, ?, ?)",
963 )
964 .bind(key_hash)
965 .bind(prefix)
966 .bind(label)
967 .bind(created_at)
968 .execute(&self.pool)
969 .await?;
970 Ok(())
971 }
972
973 async fn validate_api_key(&self, key_hash: &str) -> Result<bool> {
974 let row: Option<(i64,)> =
975 sqlx::query_as("SELECT 1 FROM api_keys WHERE key_hash = ?")
976 .bind(key_hash)
977 .fetch_optional(&self.pool)
978 .await?;
979 Ok(row.is_some())
980 }
981
982 async fn list_api_keys(&self) -> Result<Vec<ApiKeyRecord>> {
983 let rows = sqlx::query_as::<_, (String, Option<String>, f64)>(
984 "SELECT prefix, label, created_at FROM api_keys ORDER BY created_at DESC",
985 )
986 .fetch_all(&self.pool)
987 .await?;
988 Ok(rows
989 .into_iter()
990 .map(|(prefix, label, created_at)| ApiKeyRecord {
991 prefix,
992 label,
993 created_at,
994 })
995 .collect())
996 }
997
998 async fn revoke_api_key(&self, prefix: &str) -> Result<bool> {
999 let res = sqlx::query("DELETE FROM api_keys WHERE prefix = ?")
1000 .bind(prefix)
1001 .execute(&self.pool)
1002 .await?;
1003 Ok(res.rows_affected() > 0)
1004 }
1005
1006 async fn list_child_workflows(&self, parent_id: &str) -> Result<Vec<WorkflowRecord>> {
1009 let rows = sqlx::query_as::<_, SqliteWorkflowRow>(
1010 "SELECT id, namespace, run_id, workflow_type, task_queue, status, input, result, error, parent_id, claimed_by, created_at, updated_at, completed_at
1011 FROM workflows WHERE parent_id = ? ORDER BY created_at ASC",
1012 )
1013 .bind(parent_id)
1014 .fetch_all(&self.pool)
1015 .await?;
1016 Ok(rows.into_iter().map(Into::into).collect())
1017 }
1018
1019 async fn create_snapshot(
1022 &self,
1023 workflow_id: &str,
1024 event_seq: i32,
1025 state_json: &str,
1026 ) -> Result<()> {
1027 sqlx::query(
1028 "INSERT OR REPLACE INTO workflow_snapshots (workflow_id, event_seq, state_json, created_at)
1029 VALUES (?, ?, ?, ?)",
1030 )
1031 .bind(workflow_id)
1032 .bind(event_seq)
1033 .bind(state_json)
1034 .bind(timestamp_now())
1035 .execute(&self.pool)
1036 .await?;
1037 Ok(())
1038 }
1039
1040 async fn get_latest_snapshot(
1041 &self,
1042 workflow_id: &str,
1043 ) -> Result<Option<WorkflowSnapshot>> {
1044 let row = sqlx::query_as::<_, (String, i32, String, f64)>(
1045 "SELECT workflow_id, event_seq, state_json, created_at
1046 FROM workflow_snapshots WHERE workflow_id = ?
1047 ORDER BY event_seq DESC LIMIT 1",
1048 )
1049 .bind(workflow_id)
1050 .fetch_optional(&self.pool)
1051 .await?;
1052
1053 Ok(row.map(|(workflow_id, event_seq, state_json, created_at)| WorkflowSnapshot {
1054 workflow_id,
1055 event_seq,
1056 state_json,
1057 created_at,
1058 }))
1059 }
1060
1061 async fn get_queue_stats(&self, namespace: &str) -> Result<Vec<QueueStats>> {
1064 let rows = sqlx::query_as::<_, (String, i64, i64)>(
1066 "SELECT a.task_queue,
1067 SUM(CASE WHEN a.status = 'PENDING' THEN 1 ELSE 0 END),
1068 SUM(CASE WHEN a.status = 'RUNNING' THEN 1 ELSE 0 END)
1069 FROM workflow_activities a
1070 INNER JOIN workflows w ON w.id = a.workflow_id
1071 WHERE w.namespace = ?
1072 GROUP BY a.task_queue",
1073 )
1074 .bind(namespace)
1075 .fetch_all(&self.pool)
1076 .await?;
1077
1078 let mut stats: Vec<QueueStats> = rows
1079 .into_iter()
1080 .map(|(queue, pending, running)| QueueStats {
1081 queue,
1082 pending_activities: pending,
1083 running_activities: running,
1084 workers: 0,
1085 })
1086 .collect();
1087
1088 let worker_rows = sqlx::query_as::<_, (String, i64)>(
1090 "SELECT task_queue, COUNT(*) FROM workflow_workers WHERE namespace = ? GROUP BY task_queue",
1091 )
1092 .bind(namespace)
1093 .fetch_all(&self.pool)
1094 .await?;
1095
1096 for (queue, count) in worker_rows {
1097 if let Some(s) = stats.iter_mut().find(|s| s.queue == queue) {
1098 s.workers = count;
1099 } else {
1100 stats.push(QueueStats {
1101 queue,
1102 pending_activities: 0,
1103 running_activities: 0,
1104 workers: count,
1105 });
1106 }
1107 }
1108
1109 stats.sort_by(|a, b| a.queue.cmp(&b.queue));
1110 Ok(stats)
1111 }
1112
1113 async fn try_acquire_scheduler_lock(&self) -> Result<bool> {
1116 self.refresh_engine_lock().await.ok();
1119 Ok(true)
1120 }
1121}
1122
/// Returns the current wall-clock time as fractional seconds since the Unix
/// epoch.
///
/// Never panics: if the system clock is set before 1970-01-01 the result is
/// the (negative) offset from the epoch instead of an `unwrap` failure.
fn timestamp_now() -> f64 {
    secs_since_unix_epoch(std::time::SystemTime::now())
}

/// Converts `time` into fractional seconds relative to the Unix epoch;
/// the value is negative when `time` precedes the epoch.
fn secs_since_unix_epoch(time: std::time::SystemTime) -> f64 {
    match time.duration_since(std::time::UNIX_EPOCH) {
        Ok(since_epoch) => since_epoch.as_secs_f64(),
        // `SystemTimeError::duration` is how far *before* the epoch `time` lies.
        Err(before_epoch) => -before_epoch.duration().as_secs_f64(),
    }
}
1129
1130#[derive(sqlx::FromRow)]
1133struct SqliteWorkflowRow {
1134 id: String,
1135 namespace: String,
1136 run_id: String,
1137 workflow_type: String,
1138 task_queue: String,
1139 status: String,
1140 input: Option<String>,
1141 result: Option<String>,
1142 error: Option<String>,
1143 parent_id: Option<String>,
1144 claimed_by: Option<String>,
1145 created_at: f64,
1146 updated_at: f64,
1147 completed_at: Option<f64>,
1148}
1149
1150impl From<SqliteWorkflowRow> for WorkflowRecord {
1151 fn from(r: SqliteWorkflowRow) -> Self {
1152 Self {
1153 id: r.id,
1154 namespace: r.namespace,
1155 run_id: r.run_id,
1156 workflow_type: r.workflow_type,
1157 task_queue: r.task_queue,
1158 status: r.status,
1159 input: r.input,
1160 result: r.result,
1161 error: r.error,
1162 parent_id: r.parent_id,
1163 claimed_by: r.claimed_by,
1164 created_at: r.created_at,
1165 updated_at: r.updated_at,
1166 completed_at: r.completed_at,
1167 }
1168 }
1169}
1170
1171#[derive(sqlx::FromRow)]
1172struct SqliteEventRow {
1173 id: i64,
1174 workflow_id: String,
1175 seq: i32,
1176 event_type: String,
1177 payload: Option<String>,
1178 timestamp: f64,
1179}
1180
1181impl From<SqliteEventRow> for WorkflowEvent {
1182 fn from(r: SqliteEventRow) -> Self {
1183 Self {
1184 id: Some(r.id),
1185 workflow_id: r.workflow_id,
1186 seq: r.seq,
1187 event_type: r.event_type,
1188 payload: r.payload,
1189 timestamp: r.timestamp,
1190 }
1191 }
1192}
1193
1194#[derive(sqlx::FromRow)]
1195struct SqliteActivityRow {
1196 id: i64,
1197 workflow_id: String,
1198 seq: i32,
1199 name: String,
1200 task_queue: String,
1201 input: Option<String>,
1202 status: String,
1203 result: Option<String>,
1204 error: Option<String>,
1205 attempt: i32,
1206 max_attempts: i32,
1207 initial_interval_secs: f64,
1208 backoff_coefficient: f64,
1209 start_to_close_secs: f64,
1210 heartbeat_timeout_secs: Option<f64>,
1211 claimed_by: Option<String>,
1212 scheduled_at: f64,
1213 started_at: Option<f64>,
1214 completed_at: Option<f64>,
1215 last_heartbeat: Option<f64>,
1216}
1217
1218impl From<SqliteActivityRow> for WorkflowActivity {
1219 fn from(r: SqliteActivityRow) -> Self {
1220 Self {
1221 id: Some(r.id),
1222 workflow_id: r.workflow_id,
1223 seq: r.seq,
1224 name: r.name,
1225 task_queue: r.task_queue,
1226 input: r.input,
1227 status: r.status,
1228 result: r.result,
1229 error: r.error,
1230 attempt: r.attempt,
1231 max_attempts: r.max_attempts,
1232 initial_interval_secs: r.initial_interval_secs,
1233 backoff_coefficient: r.backoff_coefficient,
1234 start_to_close_secs: r.start_to_close_secs,
1235 heartbeat_timeout_secs: r.heartbeat_timeout_secs,
1236 claimed_by: r.claimed_by,
1237 scheduled_at: r.scheduled_at,
1238 started_at: r.started_at,
1239 completed_at: r.completed_at,
1240 last_heartbeat: r.last_heartbeat,
1241 }
1242 }
1243}
1244
1245#[derive(sqlx::FromRow)]
1246struct SqliteTimerRow {
1247 id: i64,
1248 workflow_id: String,
1249 seq: i32,
1250 fire_at: f64,
1251 fired: bool,
1252}
1253
1254impl From<SqliteTimerRow> for WorkflowTimer {
1255 fn from(r: SqliteTimerRow) -> Self {
1256 Self {
1257 id: Some(r.id),
1258 workflow_id: r.workflow_id,
1259 seq: r.seq,
1260 fire_at: r.fire_at,
1261 fired: r.fired,
1262 }
1263 }
1264}
1265
1266#[derive(sqlx::FromRow)]
1267struct SqliteSignalRow {
1268 id: i64,
1269 workflow_id: String,
1270 name: String,
1271 payload: Option<String>,
1272 consumed: bool,
1273 received_at: f64,
1274}
1275
1276impl From<SqliteSignalRow> for WorkflowSignal {
1277 fn from(r: SqliteSignalRow) -> Self {
1278 Self {
1279 id: Some(r.id),
1280 workflow_id: r.workflow_id,
1281 name: r.name,
1282 payload: r.payload,
1283 consumed: r.consumed,
1284 received_at: r.received_at,
1285 }
1286 }
1287}
1288
1289#[derive(sqlx::FromRow)]
1290struct SqliteScheduleRow {
1291 name: String,
1292 namespace: String,
1293 workflow_type: String,
1294 cron_expr: String,
1295 input: Option<String>,
1296 task_queue: String,
1297 overlap_policy: String,
1298 paused: bool,
1299 last_run_at: Option<f64>,
1300 next_run_at: Option<f64>,
1301 last_workflow_id: Option<String>,
1302 created_at: f64,
1303}
1304
1305impl From<SqliteScheduleRow> for WorkflowSchedule {
1306 fn from(r: SqliteScheduleRow) -> Self {
1307 Self {
1308 name: r.name,
1309 namespace: r.namespace,
1310 workflow_type: r.workflow_type,
1311 cron_expr: r.cron_expr,
1312 input: r.input,
1313 task_queue: r.task_queue,
1314 overlap_policy: r.overlap_policy,
1315 paused: r.paused,
1316 last_run_at: r.last_run_at,
1317 next_run_at: r.next_run_at,
1318 last_workflow_id: r.last_workflow_id,
1319 created_at: r.created_at,
1320 }
1321 }
1322}
1323
1324#[derive(sqlx::FromRow)]
1325struct SqliteWorkerRow {
1326 id: String,
1327 namespace: String,
1328 identity: String,
1329 task_queue: String,
1330 workflows: Option<String>,
1331 activities: Option<String>,
1332 max_concurrent_workflows: i32,
1333 max_concurrent_activities: i32,
1334 active_tasks: i32,
1335 last_heartbeat: f64,
1336 registered_at: f64,
1337}
1338
1339impl From<SqliteWorkerRow> for WorkflowWorker {
1340 fn from(r: SqliteWorkerRow) -> Self {
1341 Self {
1342 id: r.id,
1343 namespace: r.namespace,
1344 identity: r.identity,
1345 task_queue: r.task_queue,
1346 workflows: r.workflows,
1347 activities: r.activities,
1348 max_concurrent_workflows: r.max_concurrent_workflows,
1349 max_concurrent_activities: r.max_concurrent_activities,
1350 active_tasks: r.active_tasks,
1351 last_heartbeat: r.last_heartbeat,
1352 registered_at: r.registered_at,
1353 }
1354 }
1355}