1use crate::backend::{
9 ApiToken, BackendResult, ReclaimResult, StateBackend, StateBackendError, WorkItem, WorkItemId,
10 WorkflowDefinition,
11};
12use crate::event::{Event, EventKind, EventSequence};
13use crate::snapshot::Snapshot;
14use crate::sqlite::{map_db_err, parse_datetime, parse_execution_id};
15use crate::tenant::{Tenant, TenantId, TenantLimits, TenantStatus, DEFAULT_TENANT};
16use async_trait::async_trait;
17use chrono::{DateTime, Utc};
18use jamjet_core::workflow::{ExecutionId, WorkflowExecution, WorkflowStatus};
19use sqlx::{Row, SqlitePool};
20use tracing::instrument;
21use uuid::Uuid;
22
23pub struct TenantScopedSqliteBackend {
28 pub(crate) pool: SqlitePool,
29 pub(crate) tenant_id: TenantId,
30}
31
32impl TenantScopedSqliteBackend {
33 pub fn new(pool: SqlitePool, tenant_id: TenantId) -> Self {
34 Self { pool, tenant_id }
35 }
36
37 pub fn tenant_id(&self) -> &TenantId {
38 &self.tenant_id
39 }
40}
41
42fn status_to_str(s: &WorkflowStatus) -> &'static str {
45 match s {
46 WorkflowStatus::Pending => "pending",
47 WorkflowStatus::Running => "running",
48 WorkflowStatus::Paused => "paused",
49 WorkflowStatus::Completed => "completed",
50 WorkflowStatus::Failed => "failed",
51 WorkflowStatus::Cancelled => "cancelled",
52 WorkflowStatus::LimitExceeded => "limit_exceeded",
53 }
54}
55
56fn str_to_status(s: &str) -> BackendResult<WorkflowStatus> {
57 match s {
58 "pending" => Ok(WorkflowStatus::Pending),
59 "running" => Ok(WorkflowStatus::Running),
60 "paused" => Ok(WorkflowStatus::Paused),
61 "completed" => Ok(WorkflowStatus::Completed),
62 "failed" => Ok(WorkflowStatus::Failed),
63 "cancelled" => Ok(WorkflowStatus::Cancelled),
64 "limit_exceeded" => Ok(WorkflowStatus::LimitExceeded),
65 other => Err(StateBackendError::Database(format!(
66 "unknown status: {other}"
67 ))),
68 }
69}
70
71fn execution_id_str(id: &ExecutionId) -> String {
72 id.0.to_string()
73}
74
75fn row_to_execution(row: &sqlx::sqlite::SqliteRow) -> BackendResult<WorkflowExecution> {
76 let execution_id =
77 parse_execution_id(row.try_get::<&str, _>("execution_id").map_err(map_db_err)?)?;
78 let status = str_to_status(row.try_get::<&str, _>("status").map_err(map_db_err)?)?;
79 let initial_input: serde_json::Value = serde_json::from_str(
80 row.try_get::<&str, _>("initial_input")
81 .map_err(map_db_err)?,
82 )
83 .map_err(StateBackendError::Serialization)?;
84 let current_state: serde_json::Value = serde_json::from_str(
85 row.try_get::<&str, _>("current_state")
86 .map_err(map_db_err)?,
87 )
88 .map_err(StateBackendError::Serialization)?;
89 let started_at = parse_datetime(row.try_get::<&str, _>("started_at").map_err(map_db_err)?)?;
90 let updated_at = parse_datetime(row.try_get::<&str, _>("updated_at").map_err(map_db_err)?)?;
91 let completed_at: Option<DateTime<Utc>> = row
92 .try_get::<Option<&str>, _>("completed_at")
93 .map_err(map_db_err)?
94 .map(parse_datetime)
95 .transpose()?;
96
97 Ok(WorkflowExecution {
98 execution_id,
99 workflow_id: row
100 .try_get::<String, _>("workflow_id")
101 .map_err(map_db_err)?,
102 workflow_version: row
103 .try_get::<String, _>("workflow_version")
104 .map_err(map_db_err)?,
105 status,
106 initial_input,
107 current_state,
108 started_at,
109 updated_at,
110 completed_at,
111 session_type: None,
112 })
113}
114
115fn row_to_event(row: &sqlx::sqlite::SqliteRow) -> BackendResult<Event> {
116 let id = Uuid::parse_str(row.try_get::<&str, _>("id").map_err(map_db_err)?)
117 .map_err(|e| StateBackendError::Database(e.to_string()))?;
118 let execution_id =
119 parse_execution_id(row.try_get::<&str, _>("execution_id").map_err(map_db_err)?)?;
120 let sequence: i64 = row.try_get("sequence").map_err(map_db_err)?;
121 let kind: EventKind =
122 serde_json::from_str(row.try_get::<&str, _>("kind_json").map_err(map_db_err)?)
123 .map_err(StateBackendError::Serialization)?;
124 let created_at = parse_datetime(row.try_get::<&str, _>("created_at").map_err(map_db_err)?)?;
125
126 Ok(Event {
127 id,
128 execution_id,
129 sequence,
130 kind,
131 created_at,
132 })
133}
134
135fn row_to_work_item(row: &sqlx::sqlite::SqliteRow) -> BackendResult<WorkItem> {
136 let id = Uuid::parse_str(row.try_get::<&str, _>("id").map_err(map_db_err)?)
137 .map_err(|e| StateBackendError::Database(e.to_string()))?;
138 let execution_id =
139 parse_execution_id(row.try_get::<&str, _>("execution_id").map_err(map_db_err)?)?;
140 let payload: serde_json::Value =
141 serde_json::from_str(row.try_get::<&str, _>("payload_json").map_err(map_db_err)?)
142 .map_err(StateBackendError::Serialization)?;
143 let lease_expires_at: Option<DateTime<Utc>> = row
144 .try_get::<Option<&str>, _>("lease_expires_at")
145 .map_err(map_db_err)?
146 .map(parse_datetime)
147 .transpose()?;
148 let created_at = parse_datetime(row.try_get::<&str, _>("created_at").map_err(map_db_err)?)?;
149 let attempt: i64 = row.try_get("attempt").map_err(map_db_err)?;
150 let max_attempts: i64 = row.try_get("max_attempts").unwrap_or(3);
151 let tenant_id: String = row
152 .try_get("tenant_id")
153 .unwrap_or_else(|_| DEFAULT_TENANT.to_string());
154
155 Ok(WorkItem {
156 id,
157 execution_id,
158 node_id: row.try_get::<String, _>("node_id").map_err(map_db_err)?,
159 queue_type: row.try_get::<String, _>("queue_type").map_err(map_db_err)?,
160 payload,
161 attempt: attempt as u32,
162 max_attempts: max_attempts as u32,
163 created_at,
164 lease_expires_at,
165 worker_id: row
166 .try_get::<Option<String>, _>("worker_id")
167 .map_err(map_db_err)?,
168 tenant_id,
169 })
170}
171
172#[async_trait]
175impl StateBackend for TenantScopedSqliteBackend {
176 #[instrument(skip(self, def), fields(tenant = %self.tenant_id, workflow_id = %def.workflow_id))]
179 async fn store_workflow(&self, mut def: WorkflowDefinition) -> BackendResult<()> {
180 def.tenant_id = self.tenant_id.0.clone();
181 let ir_json = serde_json::to_string(&def.ir)?;
182 let created_at = def.created_at.to_rfc3339();
183
184 sqlx::query(
185 r#"INSERT OR REPLACE INTO workflow_definitions (workflow_id, version, ir_json, created_at, tenant_id)
186 VALUES (?, ?, ?, ?, ?)"#,
187 )
188 .bind(&def.workflow_id)
189 .bind(&def.version)
190 .bind(&ir_json)
191 .bind(&created_at)
192 .bind(&self.tenant_id.0)
193 .execute(&self.pool)
194 .await
195 .map_err(map_db_err)?;
196
197 Ok(())
198 }
199
200 #[instrument(skip(self), fields(tenant = %self.tenant_id, workflow_id = workflow_id))]
201 async fn get_workflow(
202 &self,
203 workflow_id: &str,
204 version: &str,
205 ) -> BackendResult<Option<WorkflowDefinition>> {
206 let row = sqlx::query(
207 "SELECT * FROM workflow_definitions WHERE workflow_id = ? AND version = ? AND tenant_id = ?",
208 )
209 .bind(workflow_id)
210 .bind(version)
211 .bind(&self.tenant_id.0)
212 .fetch_optional(&self.pool)
213 .await
214 .map_err(map_db_err)?;
215
216 let Some(row) = row else { return Ok(None) };
217
218 let ir: serde_json::Value =
219 serde_json::from_str(row.try_get::<&str, _>("ir_json").map_err(map_db_err)?)
220 .map_err(StateBackendError::Serialization)?;
221 let created_at = parse_datetime(row.try_get::<&str, _>("created_at").map_err(map_db_err)?)?;
222
223 Ok(Some(WorkflowDefinition {
224 workflow_id: row
225 .try_get::<String, _>("workflow_id")
226 .map_err(map_db_err)?,
227 version: row.try_get::<String, _>("version").map_err(map_db_err)?,
228 ir,
229 created_at,
230 tenant_id: self.tenant_id.0.clone(),
231 }))
232 }
233
234 #[instrument(skip(self, execution), fields(tenant = %self.tenant_id, execution_id = %execution.execution_id))]
237 async fn create_execution(&self, execution: WorkflowExecution) -> BackendResult<()> {
238 let id = execution_id_str(&execution.execution_id);
239 let status = status_to_str(&execution.status);
240 let initial_input = serde_json::to_string(&execution.initial_input)?;
241 let current_state = serde_json::to_string(&execution.current_state)?;
242 let started_at = execution.started_at.to_rfc3339();
243 let updated_at = execution.updated_at.to_rfc3339();
244 let completed_at = execution.completed_at.map(|dt| dt.to_rfc3339());
245
246 sqlx::query(
247 r#"INSERT INTO workflow_executions
248 (execution_id, workflow_id, workflow_version, status, initial_input, current_state,
249 started_at, updated_at, completed_at, tenant_id)
250 VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)"#,
251 )
252 .bind(&id)
253 .bind(&execution.workflow_id)
254 .bind(&execution.workflow_version)
255 .bind(status)
256 .bind(&initial_input)
257 .bind(¤t_state)
258 .bind(&started_at)
259 .bind(&updated_at)
260 .bind(completed_at.as_deref())
261 .bind(&self.tenant_id.0)
262 .execute(&self.pool)
263 .await
264 .map_err(map_db_err)?;
265
266 Ok(())
267 }
268
269 #[instrument(skip(self), fields(tenant = %self.tenant_id, execution_id = %id))]
270 async fn get_execution(&self, id: &ExecutionId) -> BackendResult<Option<WorkflowExecution>> {
271 let id_str = execution_id_str(id);
272 let row = sqlx::query(
273 "SELECT * FROM workflow_executions WHERE execution_id = ? AND tenant_id = ?",
274 )
275 .bind(&id_str)
276 .bind(&self.tenant_id.0)
277 .fetch_optional(&self.pool)
278 .await
279 .map_err(map_db_err)?;
280
281 row.map(|r| row_to_execution(&r)).transpose()
282 }
283
284 #[instrument(skip(self), fields(tenant = %self.tenant_id, execution_id = %id))]
285 async fn update_execution_status(
286 &self,
287 id: &ExecutionId,
288 status: WorkflowStatus,
289 ) -> BackendResult<()> {
290 let id_str = execution_id_str(id);
291 let status_str = status_to_str(&status);
292 let now = Utc::now().to_rfc3339();
293 let completed_at = if status.is_terminal() {
294 Some(now.clone())
295 } else {
296 None
297 };
298
299 let rows_affected = sqlx::query(
300 "UPDATE workflow_executions SET status = ?, updated_at = ?, completed_at = COALESCE(?, completed_at) WHERE execution_id = ? AND tenant_id = ?",
301 )
302 .bind(status_str)
303 .bind(&now)
304 .bind(completed_at.as_deref())
305 .bind(&id_str)
306 .bind(&self.tenant_id.0)
307 .execute(&self.pool)
308 .await
309 .map_err(map_db_err)?
310 .rows_affected();
311
312 if rows_affected == 0 {
313 return Err(StateBackendError::NotFound(id_str));
314 }
315 Ok(())
316 }
317
318 async fn update_execution_current_state(
319 &self,
320 id: &ExecutionId,
321 current_state: &serde_json::Value,
322 ) -> BackendResult<()> {
323 let id_str = execution_id_str(id);
324 let state_str =
325 serde_json::to_string(current_state).map_err(StateBackendError::Serialization)?;
326 let now = Utc::now().to_rfc3339();
327 sqlx::query(
328 "UPDATE workflow_executions SET current_state = ?, updated_at = ? WHERE execution_id = ? AND tenant_id = ?",
329 )
330 .bind(&state_str)
331 .bind(&now)
332 .bind(&id_str)
333 .bind(&self.tenant_id.0)
334 .execute(&self.pool)
335 .await
336 .map_err(map_db_err)?;
337 Ok(())
338 }
339
340 async fn patch_append_array(
341 &self,
342 execution_id: &ExecutionId,
343 key: &str,
344 value: serde_json::Value,
345 ) -> BackendResult<()> {
346 let exec = self
347 .get_execution(execution_id)
348 .await?
349 .ok_or_else(|| StateBackendError::NotFound(format!("execution {execution_id}")))?;
350 let mut state = exec.current_state.clone();
351 let arr = state
352 .as_object_mut()
353 .ok_or_else(|| StateBackendError::Database("state is not a JSON object".into()))?
354 .entry(key)
355 .or_insert_with(|| serde_json::json!([]));
356 arr.as_array_mut()
357 .ok_or_else(|| StateBackendError::Database(format!("{key} is not an array")))?
358 .push(value);
359 self.update_execution_current_state(execution_id, &state)
360 .await
361 }
362
363 #[instrument(skip(self), fields(tenant = %self.tenant_id))]
364 async fn list_executions(
365 &self,
366 status: Option<WorkflowStatus>,
367 limit: u32,
368 offset: u32,
369 ) -> BackendResult<Vec<WorkflowExecution>> {
370 let rows = match status {
371 Some(s) => {
372 let status_str = status_to_str(&s);
373 sqlx::query(
374 "SELECT * FROM workflow_executions WHERE status = ? AND tenant_id = ? ORDER BY updated_at DESC LIMIT ? OFFSET ?",
375 )
376 .bind(status_str)
377 .bind(&self.tenant_id.0)
378 .bind(limit as i64)
379 .bind(offset as i64)
380 .fetch_all(&self.pool)
381 .await
382 .map_err(map_db_err)?
383 }
384 None => sqlx::query(
385 "SELECT * FROM workflow_executions WHERE tenant_id = ? ORDER BY updated_at DESC LIMIT ? OFFSET ?",
386 )
387 .bind(&self.tenant_id.0)
388 .bind(limit as i64)
389 .bind(offset as i64)
390 .fetch_all(&self.pool)
391 .await
392 .map_err(map_db_err)?,
393 };
394
395 rows.iter().map(row_to_execution).collect()
396 }
397
398 #[instrument(skip(self, event), fields(tenant = %self.tenant_id, execution_id = %event.execution_id))]
401 async fn append_event(&self, event: Event) -> BackendResult<EventSequence> {
402 let id = event.id.to_string();
403 let execution_id = execution_id_str(&event.execution_id);
404 let kind_json = serde_json::to_string(&event.kind)?;
405 let created_at = event.created_at.to_rfc3339();
406
407 let mut tx = self.pool.begin().await.map_err(map_db_err)?;
412 let seq_row = sqlx::query(
413 "SELECT COALESCE(MAX(sequence), 0) + 1 AS seq FROM events WHERE execution_id = ?",
414 )
415 .bind(&execution_id)
416 .fetch_one(&mut *tx)
417 .await
418 .map_err(map_db_err)?;
419 let sequence: i64 = seq_row.try_get::<i64, _>("seq").map_err(map_db_err)?;
420
421 sqlx::query(
422 r#"INSERT INTO events (id, execution_id, sequence, kind_json, created_at, tenant_id)
423 VALUES (?, ?, ?, ?, ?, ?)"#,
424 )
425 .bind(&id)
426 .bind(&execution_id)
427 .bind(sequence)
428 .bind(&kind_json)
429 .bind(&created_at)
430 .bind(&self.tenant_id.0)
431 .execute(&mut *tx)
432 .await
433 .map_err(map_db_err)?;
434 tx.commit().await.map_err(map_db_err)?;
435
436 Ok(sequence)
437 }
438
439 #[instrument(skip(self), fields(tenant = %self.tenant_id, execution_id = %execution_id))]
440 async fn get_events(&self, execution_id: &ExecutionId) -> BackendResult<Vec<Event>> {
441 let id_str = execution_id_str(execution_id);
442 let rows = sqlx::query(
443 "SELECT * FROM events WHERE execution_id = ? AND tenant_id = ? ORDER BY sequence ASC",
444 )
445 .bind(&id_str)
446 .bind(&self.tenant_id.0)
447 .fetch_all(&self.pool)
448 .await
449 .map_err(map_db_err)?;
450
451 rows.iter().map(row_to_event).collect()
452 }
453
454 #[instrument(skip(self), fields(tenant = %self.tenant_id, execution_id = %execution_id))]
455 async fn get_events_since(
456 &self,
457 execution_id: &ExecutionId,
458 since_sequence: EventSequence,
459 ) -> BackendResult<Vec<Event>> {
460 let id_str = execution_id_str(execution_id);
461 let rows = sqlx::query(
462 "SELECT * FROM events WHERE execution_id = ? AND tenant_id = ? AND sequence > ? ORDER BY sequence ASC",
463 )
464 .bind(&id_str)
465 .bind(&self.tenant_id.0)
466 .bind(since_sequence)
467 .fetch_all(&self.pool)
468 .await
469 .map_err(map_db_err)?;
470
471 rows.iter().map(row_to_event).collect()
472 }
473
474 #[instrument(skip(self), fields(tenant = %self.tenant_id, execution_id = %execution_id))]
475 async fn latest_sequence(&self, execution_id: &ExecutionId) -> BackendResult<EventSequence> {
476 let id_str = execution_id_str(execution_id);
477 let row = sqlx::query(
478 "SELECT COALESCE(MAX(sequence), 0) as seq FROM events WHERE execution_id = ? AND tenant_id = ?",
479 )
480 .bind(&id_str)
481 .bind(&self.tenant_id.0)
482 .fetch_one(&self.pool)
483 .await
484 .map_err(map_db_err)?;
485
486 Ok(row.try_get::<i64, _>("seq").map_err(map_db_err)?)
487 }
488
489 #[instrument(skip(self, snapshot), fields(tenant = %self.tenant_id, execution_id = %snapshot.execution_id))]
492 async fn write_snapshot(&self, snapshot: Snapshot) -> BackendResult<()> {
493 let id = snapshot.id.to_string();
494 let execution_id = execution_id_str(&snapshot.execution_id);
495 let state_json = serde_json::to_string(&snapshot.state)?;
496 let created_at = snapshot.created_at.to_rfc3339();
497
498 sqlx::query(
499 r#"INSERT OR REPLACE INTO snapshots (id, execution_id, at_sequence, state_json, created_at, tenant_id)
500 VALUES (?, ?, ?, ?, ?, ?)"#,
501 )
502 .bind(&id)
503 .bind(&execution_id)
504 .bind(snapshot.at_sequence)
505 .bind(&state_json)
506 .bind(&created_at)
507 .bind(&self.tenant_id.0)
508 .execute(&self.pool)
509 .await
510 .map_err(map_db_err)?;
511
512 Ok(())
513 }
514
515 #[instrument(skip(self), fields(tenant = %self.tenant_id, execution_id = %execution_id))]
516 async fn latest_snapshot(&self, execution_id: &ExecutionId) -> BackendResult<Option<Snapshot>> {
517 let id_str = execution_id_str(execution_id);
518 let row = sqlx::query(
519 "SELECT * FROM snapshots WHERE execution_id = ? AND tenant_id = ? ORDER BY at_sequence DESC LIMIT 1",
520 )
521 .bind(&id_str)
522 .bind(&self.tenant_id.0)
523 .fetch_optional(&self.pool)
524 .await
525 .map_err(map_db_err)?;
526
527 let Some(row) = row else { return Ok(None) };
528
529 let id = Uuid::parse_str(row.try_get::<&str, _>("id").map_err(map_db_err)?)
530 .map_err(|e| StateBackendError::Database(e.to_string()))?;
531 let execution_id =
532 parse_execution_id(row.try_get::<&str, _>("execution_id").map_err(map_db_err)?)?;
533 let at_sequence: i64 = row.try_get("at_sequence").map_err(map_db_err)?;
534 let state: serde_json::Value =
535 serde_json::from_str(row.try_get::<&str, _>("state_json").map_err(map_db_err)?)
536 .map_err(StateBackendError::Serialization)?;
537 let created_at = parse_datetime(row.try_get::<&str, _>("created_at").map_err(map_db_err)?)?;
538
539 Ok(Some(Snapshot {
540 id,
541 execution_id,
542 at_sequence,
543 state,
544 created_at,
545 }))
546 }
547
548 #[instrument(skip(self, item), fields(tenant = %self.tenant_id, execution_id = %item.execution_id))]
551 async fn enqueue_work_item(&self, mut item: WorkItem) -> BackendResult<WorkItemId> {
552 item.tenant_id = self.tenant_id.0.clone();
553 let id = item.id.to_string();
554 let execution_id = execution_id_str(&item.execution_id);
555 let payload_json = serde_json::to_string(&item.payload)?;
556 let created_at = item.created_at.to_rfc3339();
557
558 sqlx::query(
559 r#"INSERT INTO work_items
560 (id, execution_id, node_id, queue_type, payload_json, attempt, max_attempts, status, created_at, tenant_id)
561 VALUES (?, ?, ?, ?, ?, ?, ?, 'pending', ?, ?)"#,
562 )
563 .bind(&id)
564 .bind(&execution_id)
565 .bind(&item.node_id)
566 .bind(&item.queue_type)
567 .bind(&payload_json)
568 .bind(item.attempt as i64)
569 .bind(item.max_attempts as i64)
570 .bind(&created_at)
571 .bind(&self.tenant_id.0)
572 .execute(&self.pool)
573 .await
574 .map_err(map_db_err)?;
575
576 Ok(item.id)
577 }
578
579 #[instrument(skip(self), fields(tenant = %self.tenant_id, worker_id = worker_id))]
580 async fn claim_work_item(
581 &self,
582 worker_id: &str,
583 queue_types: &[&str],
584 ) -> BackendResult<Option<WorkItem>> {
585 if queue_types.is_empty() {
586 return Ok(None);
587 }
588
589 let now = Utc::now().to_rfc3339();
590 sqlx::query(
592 "UPDATE work_items SET status = 'pending', worker_id = NULL, lease_expires_at = NULL \
593 WHERE status = 'claimed' AND lease_expires_at < ? AND tenant_id = ?",
594 )
595 .bind(&now)
596 .bind(&self.tenant_id.0)
597 .execute(&self.pool)
598 .await
599 .map_err(map_db_err)?;
600
601 let mut tx = self.pool.begin().await.map_err(map_db_err)?;
602
603 let placeholders = queue_types
604 .iter()
605 .map(|_| "?")
606 .collect::<Vec<_>>()
607 .join(",");
608 let query_str = format!(
609 "SELECT * FROM work_items WHERE status = 'pending' AND queue_type IN ({}) \
610 AND tenant_id = ? AND (retry_after IS NULL OR retry_after <= ?) ORDER BY created_at ASC LIMIT 1",
611 placeholders
612 );
613 let mut q = sqlx::query(&query_str);
614 for qt in queue_types {
615 q = q.bind(*qt);
616 }
617 q = q.bind(&self.tenant_id.0);
618 q = q.bind(&now);
619 let row = q.fetch_optional(&mut *tx).await.map_err(map_db_err)?;
620
621 let Some(row) = row else {
622 tx.rollback().await.map_err(map_db_err)?;
623 return Ok(None);
624 };
625
626 let item = row_to_work_item(&row)?;
627 let item_id = item.id.to_string();
628 let lease_expires_at = (Utc::now() + chrono::Duration::seconds(30)).to_rfc3339();
629 let claimed_at = Utc::now().to_rfc3339();
630
631 sqlx::query(
632 "UPDATE work_items SET status = 'claimed', worker_id = ?, lease_expires_at = ?, claimed_at = ? WHERE id = ?",
633 )
634 .bind(worker_id)
635 .bind(&lease_expires_at)
636 .bind(&claimed_at)
637 .bind(&item_id)
638 .execute(&mut *tx)
639 .await
640 .map_err(map_db_err)?;
641
642 tx.commit().await.map_err(map_db_err)?;
643
644 let mut claimed = item;
645 claimed.worker_id = Some(worker_id.to_string());
646 claimed.lease_expires_at = Some(
647 DateTime::parse_from_rfc3339(&lease_expires_at)
648 .map(|dt| dt.with_timezone(&Utc))
649 .map_err(|e| StateBackendError::Database(e.to_string()))?,
650 );
651 Ok(Some(claimed))
652 }
653
654 #[instrument(skip(self), fields(tenant = %self.tenant_id, item_id = %item_id))]
655 async fn renew_lease(&self, item_id: WorkItemId, worker_id: &str) -> BackendResult<()> {
656 let lease_expires_at = (Utc::now() + chrono::Duration::seconds(30)).to_rfc3339();
657 let id_str = item_id.to_string();
658
659 let rows_affected = sqlx::query(
660 "UPDATE work_items SET lease_expires_at = ? WHERE id = ? AND worker_id = ? AND status = 'claimed' AND tenant_id = ?",
661 )
662 .bind(&lease_expires_at)
663 .bind(&id_str)
664 .bind(worker_id)
665 .bind(&self.tenant_id.0)
666 .execute(&self.pool)
667 .await
668 .map_err(map_db_err)?
669 .rows_affected();
670
671 if rows_affected == 0 {
672 return Err(StateBackendError::NotFound(id_str));
673 }
674 Ok(())
675 }
676
677 #[instrument(skip(self), fields(tenant = %self.tenant_id, item_id = %item_id))]
678 async fn complete_work_item(&self, item_id: WorkItemId) -> BackendResult<()> {
679 let id_str = item_id.to_string();
680 let completed_at = Utc::now().to_rfc3339();
681
682 let rows_affected = sqlx::query(
683 "UPDATE work_items SET status = 'completed', completed_at = ?, lease_expires_at = NULL WHERE id = ? AND tenant_id = ?",
684 )
685 .bind(&completed_at)
686 .bind(&id_str)
687 .bind(&self.tenant_id.0)
688 .execute(&self.pool)
689 .await
690 .map_err(map_db_err)?
691 .rows_affected();
692
693 if rows_affected == 0 {
694 return Err(StateBackendError::NotFound(id_str));
695 }
696 Ok(())
697 }
698
699 #[instrument(skip(self, error), fields(tenant = %self.tenant_id, item_id = %item_id))]
700 async fn fail_work_item(&self, item_id: WorkItemId, error: &str) -> BackendResult<()> {
701 let id_str = item_id.to_string();
702 let _ = error;
703
704 let rows_affected = sqlx::query(
705 "UPDATE work_items SET status = 'failed', lease_expires_at = NULL, worker_id = NULL WHERE id = ? AND tenant_id = ?",
706 )
707 .bind(&id_str)
708 .bind(&self.tenant_id.0)
709 .execute(&self.pool)
710 .await
711 .map_err(map_db_err)?
712 .rows_affected();
713
714 if rows_affected == 0 {
715 return Err(StateBackendError::NotFound(id_str));
716 }
717 Ok(())
718 }
719
720 #[instrument(skip(self), fields(tenant = %self.tenant_id))]
721 async fn reclaim_expired_leases(&self) -> BackendResult<ReclaimResult> {
722 let now = Utc::now().to_rfc3339();
723
724 let rows = sqlx::query(
725 "SELECT * FROM work_items WHERE status = 'claimed' AND lease_expires_at < ? AND tenant_id = ? ORDER BY created_at ASC",
726 )
727 .bind(&now)
728 .bind(&self.tenant_id.0)
729 .fetch_all(&self.pool)
730 .await
731 .map_err(map_db_err)?;
732
733 let mut result = ReclaimResult::default();
734
735 for row in &rows {
736 let item = row_to_work_item(row)?;
737 let new_attempt = item.attempt + 1;
738 let id_str = item.id.to_string();
739
740 if new_attempt >= item.max_attempts {
741 let dead_lettered_at = Utc::now().to_rfc3339();
742 sqlx::query(
743 r#"INSERT OR IGNORE INTO dead_letter_items
744 (id, execution_id, node_id, queue_type, payload_json, attempt, last_error, created_at, dead_lettered_at, tenant_id)
745 VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)"#,
746 )
747 .bind(&id_str)
748 .bind(execution_id_str(&item.execution_id))
749 .bind(&item.node_id)
750 .bind(&item.queue_type)
751 .bind(serde_json::to_string(&item.payload)?)
752 .bind(new_attempt as i64)
753 .bind("lease expired: worker dead")
754 .bind(item.created_at.to_rfc3339())
755 .bind(dead_lettered_at)
756 .bind(&self.tenant_id.0)
757 .execute(&self.pool)
758 .await
759 .map_err(map_db_err)?;
760
761 sqlx::query("UPDATE work_items SET status = 'dead_lettered', attempt = ?, lease_expires_at = NULL, worker_id = NULL WHERE id = ?")
762 .bind(new_attempt as i64)
763 .bind(&id_str)
764 .execute(&self.pool)
765 .await
766 .map_err(map_db_err)?;
767
768 let mut exhausted_item = item;
769 exhausted_item.attempt = new_attempt;
770 result.exhausted.push(exhausted_item);
771 } else {
772 let backoff_secs = 1u64 << new_attempt.min(6);
773 let retry_after =
774 (Utc::now() + chrono::Duration::seconds(backoff_secs as i64)).to_rfc3339();
775
776 sqlx::query(
777 "UPDATE work_items SET status = 'pending', attempt = ?, worker_id = NULL, lease_expires_at = NULL, retry_after = ? WHERE id = ?",
778 )
779 .bind(new_attempt as i64)
780 .bind(&retry_after)
781 .bind(&id_str)
782 .execute(&self.pool)
783 .await
784 .map_err(map_db_err)?;
785
786 let mut retry_item = item;
787 retry_item.attempt = new_attempt;
788 result.retryable.push(retry_item);
789 }
790 }
791
792 Ok(result)
793 }
794
795 #[instrument(skip(self, last_error), fields(tenant = %self.tenant_id, item_id = %item_id))]
796 async fn move_to_dead_letter(
797 &self,
798 item_id: WorkItemId,
799 last_error: &str,
800 ) -> BackendResult<()> {
801 let id_str = item_id.to_string();
802
803 let row = sqlx::query("SELECT * FROM work_items WHERE id = ? AND tenant_id = ?")
804 .bind(&id_str)
805 .bind(&self.tenant_id.0)
806 .fetch_optional(&self.pool)
807 .await
808 .map_err(map_db_err)?;
809
810 let Some(row) = row else {
811 return Err(StateBackendError::NotFound(id_str));
812 };
813 let item = row_to_work_item(&row)?;
814 let dead_lettered_at = Utc::now().to_rfc3339();
815
816 sqlx::query(
817 r#"INSERT OR REPLACE INTO dead_letter_items
818 (id, execution_id, node_id, queue_type, payload_json, attempt, last_error, created_at, dead_lettered_at, tenant_id)
819 VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)"#,
820 )
821 .bind(&id_str)
822 .bind(execution_id_str(&item.execution_id))
823 .bind(&item.node_id)
824 .bind(&item.queue_type)
825 .bind(serde_json::to_string(&item.payload)?)
826 .bind(item.attempt as i64)
827 .bind(last_error)
828 .bind(item.created_at.to_rfc3339())
829 .bind(dead_lettered_at)
830 .bind(&self.tenant_id.0)
831 .execute(&self.pool)
832 .await
833 .map_err(map_db_err)?;
834
835 sqlx::query("UPDATE work_items SET status = 'dead_lettered', lease_expires_at = NULL, worker_id = NULL WHERE id = ? AND tenant_id = ?")
836 .bind(&id_str)
837 .bind(&self.tenant_id.0)
838 .execute(&self.pool)
839 .await
840 .map_err(map_db_err)?;
841
842 Ok(())
843 }
844
845 async fn create_token(&self, name: &str, role: &str) -> BackendResult<(String, ApiToken)> {
848 use rand::Rng;
849 use sha2::{Digest, Sha256};
850
851 let random_bytes: [u8; 32] = rand::thread_rng().gen();
852 let token = format!(
853 "jj_{}",
854 random_bytes
855 .iter()
856 .map(|b| format!("{b:02x}"))
857 .collect::<String>()
858 );
859 let token_hash = format!("{:x}", Sha256::digest(token.as_bytes()));
860 let id = Uuid::new_v4().to_string();
861 let now = Utc::now().to_rfc3339();
862
863 sqlx::query(
864 r#"INSERT INTO api_tokens (id, token_hash, name, role, created_at, tenant_id)
865 VALUES (?, ?, ?, ?, ?, ?)"#,
866 )
867 .bind(&id)
868 .bind(&token_hash)
869 .bind(name)
870 .bind(role)
871 .bind(&now)
872 .bind(&self.tenant_id.0)
873 .execute(&self.pool)
874 .await
875 .map_err(map_db_err)?;
876
877 let info = ApiToken {
878 id,
879 name: name.to_string(),
880 role: role.to_string(),
881 created_at: Utc::now(),
882 expires_at: None,
883 tenant_id: self.tenant_id.0.clone(),
884 };
885 Ok((token, info))
886 }
887
888 async fn validate_token(&self, token: &str) -> BackendResult<Option<ApiToken>> {
889 use sha2::{Digest, Sha256};
890
891 let token_hash = format!("{:x}", Sha256::digest(token.as_bytes()));
892 let now = Utc::now().to_rfc3339();
893
894 let row = sqlx::query(
895 r#"SELECT id, name, role, created_at, expires_at, tenant_id
896 FROM api_tokens
897 WHERE token_hash = ?
898 AND revoked_at IS NULL
899 AND (expires_at IS NULL OR expires_at > ?)"#,
900 )
901 .bind(&token_hash)
902 .bind(&now)
903 .fetch_optional(&self.pool)
904 .await
905 .map_err(map_db_err)?;
906
907 let Some(row) = row else { return Ok(None) };
908
909 let id: String = row.get("id");
910 sqlx::query("UPDATE api_tokens SET last_used_at = ? WHERE id = ?")
911 .bind(&now)
912 .bind(&id)
913 .execute(&self.pool)
914 .await
915 .map_err(map_db_err)?;
916
917 let expires_at: Option<String> = row.get("expires_at");
918 let tenant_id: String = row
919 .try_get("tenant_id")
920 .unwrap_or_else(|_| DEFAULT_TENANT.to_string());
921
922 let info = ApiToken {
923 id,
924 name: row.get("name"),
925 role: row.get("role"),
926 created_at: row
927 .get::<String, _>("created_at")
928 .parse::<chrono::DateTime<Utc>>()
929 .unwrap_or_else(|_| Utc::now()),
930 expires_at: expires_at.and_then(|s| s.parse().ok()),
931 tenant_id,
932 };
933 Ok(Some(info))
934 }
935
936 async fn create_tenant(&self, tenant: Tenant) -> BackendResult<()> {
939 let policy_json = tenant
940 .policy
941 .as_ref()
942 .map(serde_json::to_string)
943 .transpose()?;
944 let limits_json = tenant
945 .limits
946 .as_ref()
947 .map(serde_json::to_string)
948 .transpose()
949 .map_err(StateBackendError::Serialization)?;
950
951 sqlx::query(
952 r#"INSERT INTO tenants (id, name, status, policy_json, limits_json, created_at, updated_at)
953 VALUES (?, ?, ?, ?, ?, ?, ?)"#,
954 )
955 .bind(&tenant.id.0)
956 .bind(&tenant.name)
957 .bind(tenant.status.as_str())
958 .bind(policy_json.as_deref())
959 .bind(limits_json.as_deref())
960 .bind(tenant.created_at.to_rfc3339())
961 .bind(tenant.updated_at.to_rfc3339())
962 .execute(&self.pool)
963 .await
964 .map_err(map_db_err)?;
965
966 Ok(())
967 }
968
969 async fn get_tenant(&self, id: &TenantId) -> BackendResult<Option<Tenant>> {
970 let row = sqlx::query("SELECT * FROM tenants WHERE id = ?")
971 .bind(&id.0)
972 .fetch_optional(&self.pool)
973 .await
974 .map_err(map_db_err)?;
975
976 let Some(row) = row else { return Ok(None) };
977 row_to_tenant(&row).map(Some)
978 }
979
980 async fn list_tenants(&self) -> BackendResult<Vec<Tenant>> {
981 let rows = sqlx::query("SELECT * FROM tenants ORDER BY created_at ASC")
982 .fetch_all(&self.pool)
983 .await
984 .map_err(map_db_err)?;
985
986 rows.iter().map(row_to_tenant).collect()
987 }
988
989 async fn update_tenant(&self, tenant: Tenant) -> BackendResult<()> {
990 let policy_json = tenant
991 .policy
992 .as_ref()
993 .map(serde_json::to_string)
994 .transpose()?;
995 let limits_json = tenant
996 .limits
997 .as_ref()
998 .map(serde_json::to_string)
999 .transpose()
1000 .map_err(StateBackendError::Serialization)?;
1001
1002 let rows_affected = sqlx::query(
1003 r#"UPDATE tenants SET name = ?, status = ?, policy_json = ?, limits_json = ?, updated_at = ?
1004 WHERE id = ?"#,
1005 )
1006 .bind(&tenant.name)
1007 .bind(tenant.status.as_str())
1008 .bind(policy_json.as_deref())
1009 .bind(limits_json.as_deref())
1010 .bind(tenant.updated_at.to_rfc3339())
1011 .bind(&tenant.id.0)
1012 .execute(&self.pool)
1013 .await
1014 .map_err(map_db_err)?
1015 .rows_affected();
1016
1017 if rows_affected == 0 {
1018 return Err(StateBackendError::NotFound(tenant.id.0));
1019 }
1020 Ok(())
1021 }
1022}
1023
1024fn parse_datetime_flexible(s: &str) -> BackendResult<DateTime<Utc>> {
1026 if let Ok(dt) = DateTime::parse_from_rfc3339(s) {
1028 return Ok(dt.with_timezone(&Utc));
1029 }
1030 if let Ok(naive) = chrono::NaiveDateTime::parse_from_str(s, "%Y-%m-%d %H:%M:%S") {
1032 return Ok(naive.and_utc());
1033 }
1034 Err(StateBackendError::Database(format!(
1035 "invalid datetime: {s}"
1036 )))
1037}
1038
1039fn row_to_tenant(row: &sqlx::sqlite::SqliteRow) -> BackendResult<Tenant> {
1040 let policy: Option<serde_json::Value> = row
1041 .try_get::<Option<&str>, _>("policy_json")
1042 .map_err(map_db_err)?
1043 .map(serde_json::from_str)
1044 .transpose()
1045 .map_err(StateBackendError::Serialization)?;
1046
1047 let limits: Option<TenantLimits> = row
1048 .try_get::<Option<&str>, _>("limits_json")
1049 .map_err(map_db_err)?
1050 .map(serde_json::from_str)
1051 .transpose()
1052 .map_err(StateBackendError::Serialization)?;
1053
1054 let created_at =
1055 parse_datetime_flexible(row.try_get::<&str, _>("created_at").map_err(map_db_err)?)?;
1056 let updated_at =
1057 parse_datetime_flexible(row.try_get::<&str, _>("updated_at").map_err(map_db_err)?)?;
1058
1059 Ok(Tenant {
1060 id: TenantId(row.try_get::<String, _>("id").map_err(map_db_err)?),
1061 name: row.try_get::<String, _>("name").map_err(map_db_err)?,
1062 status: TenantStatus::parse(row.try_get::<&str, _>("status").map_err(map_db_err)?),
1063 policy,
1064 limits,
1065 created_at,
1066 updated_at,
1067 })
1068}