Skip to main content

jamjet_state/
tenant_scoped.rs

//! Tenant-scoped SQLite backend — wraps a shared `SqlitePool` and scopes
//! queries by `tenant_id`. The one deliberate exception is event sequence
//! allocation, which is computed per execution regardless of tenant.
//!
//! Workers serve all tenants: they claim any pending work item, then read the
//! `tenant_id` from the claimed item and use a scoped backend for the rest of
//! that item's execution.
7
8use crate::backend::{
9    ApiToken, BackendResult, ReclaimResult, StateBackend, StateBackendError, WorkItem, WorkItemId,
10    WorkflowDefinition,
11};
12use crate::event::{Event, EventKind, EventSequence};
13use crate::snapshot::Snapshot;
14use crate::sqlite::{map_db_err, parse_datetime, parse_execution_id};
15use crate::tenant::{Tenant, TenantId, TenantLimits, TenantStatus, DEFAULT_TENANT};
16use async_trait::async_trait;
17use chrono::{DateTime, Utc};
18use jamjet_core::workflow::{ExecutionId, WorkflowExecution, WorkflowStatus};
19use sqlx::{Row, SqlitePool};
20use tracing::instrument;
21use uuid::Uuid;
22
23/// A tenant-scoped view over a shared `SqlitePool`.
24///
25/// Every read/write operation is filtered by `tenant_id`, ensuring
26/// complete data isolation between tenants.
27pub struct TenantScopedSqliteBackend {
28    pub(crate) pool: SqlitePool,
29    pub(crate) tenant_id: TenantId,
30}
31
32impl TenantScopedSqliteBackend {
33    pub fn new(pool: SqlitePool, tenant_id: TenantId) -> Self {
34        Self { pool, tenant_id }
35    }
36
37    pub fn tenant_id(&self) -> &TenantId {
38        &self.tenant_id
39    }
40}
41
42// ── Helpers (re-use sqlite.rs parsers) ────────────────────────────────────────
43
44fn status_to_str(s: &WorkflowStatus) -> &'static str {
45    match s {
46        WorkflowStatus::Pending => "pending",
47        WorkflowStatus::Running => "running",
48        WorkflowStatus::Paused => "paused",
49        WorkflowStatus::Completed => "completed",
50        WorkflowStatus::Failed => "failed",
51        WorkflowStatus::Cancelled => "cancelled",
52        WorkflowStatus::LimitExceeded => "limit_exceeded",
53    }
54}
55
56fn str_to_status(s: &str) -> BackendResult<WorkflowStatus> {
57    match s {
58        "pending" => Ok(WorkflowStatus::Pending),
59        "running" => Ok(WorkflowStatus::Running),
60        "paused" => Ok(WorkflowStatus::Paused),
61        "completed" => Ok(WorkflowStatus::Completed),
62        "failed" => Ok(WorkflowStatus::Failed),
63        "cancelled" => Ok(WorkflowStatus::Cancelled),
64        "limit_exceeded" => Ok(WorkflowStatus::LimitExceeded),
65        other => Err(StateBackendError::Database(format!(
66            "unknown status: {other}"
67        ))),
68    }
69}
70
71fn execution_id_str(id: &ExecutionId) -> String {
72    id.0.to_string()
73}
74
75fn row_to_execution(row: &sqlx::sqlite::SqliteRow) -> BackendResult<WorkflowExecution> {
76    let execution_id =
77        parse_execution_id(row.try_get::<&str, _>("execution_id").map_err(map_db_err)?)?;
78    let status = str_to_status(row.try_get::<&str, _>("status").map_err(map_db_err)?)?;
79    let initial_input: serde_json::Value = serde_json::from_str(
80        row.try_get::<&str, _>("initial_input")
81            .map_err(map_db_err)?,
82    )
83    .map_err(StateBackendError::Serialization)?;
84    let current_state: serde_json::Value = serde_json::from_str(
85        row.try_get::<&str, _>("current_state")
86            .map_err(map_db_err)?,
87    )
88    .map_err(StateBackendError::Serialization)?;
89    let started_at = parse_datetime(row.try_get::<&str, _>("started_at").map_err(map_db_err)?)?;
90    let updated_at = parse_datetime(row.try_get::<&str, _>("updated_at").map_err(map_db_err)?)?;
91    let completed_at: Option<DateTime<Utc>> = row
92        .try_get::<Option<&str>, _>("completed_at")
93        .map_err(map_db_err)?
94        .map(parse_datetime)
95        .transpose()?;
96
97    Ok(WorkflowExecution {
98        execution_id,
99        workflow_id: row
100            .try_get::<String, _>("workflow_id")
101            .map_err(map_db_err)?,
102        workflow_version: row
103            .try_get::<String, _>("workflow_version")
104            .map_err(map_db_err)?,
105        status,
106        initial_input,
107        current_state,
108        started_at,
109        updated_at,
110        completed_at,
111        session_type: None,
112    })
113}
114
115fn row_to_event(row: &sqlx::sqlite::SqliteRow) -> BackendResult<Event> {
116    let id = Uuid::parse_str(row.try_get::<&str, _>("id").map_err(map_db_err)?)
117        .map_err(|e| StateBackendError::Database(e.to_string()))?;
118    let execution_id =
119        parse_execution_id(row.try_get::<&str, _>("execution_id").map_err(map_db_err)?)?;
120    let sequence: i64 = row.try_get("sequence").map_err(map_db_err)?;
121    let kind: EventKind =
122        serde_json::from_str(row.try_get::<&str, _>("kind_json").map_err(map_db_err)?)
123            .map_err(StateBackendError::Serialization)?;
124    let created_at = parse_datetime(row.try_get::<&str, _>("created_at").map_err(map_db_err)?)?;
125
126    Ok(Event {
127        id,
128        execution_id,
129        sequence,
130        kind,
131        created_at,
132    })
133}
134
135fn row_to_work_item(row: &sqlx::sqlite::SqliteRow) -> BackendResult<WorkItem> {
136    let id = Uuid::parse_str(row.try_get::<&str, _>("id").map_err(map_db_err)?)
137        .map_err(|e| StateBackendError::Database(e.to_string()))?;
138    let execution_id =
139        parse_execution_id(row.try_get::<&str, _>("execution_id").map_err(map_db_err)?)?;
140    let payload: serde_json::Value =
141        serde_json::from_str(row.try_get::<&str, _>("payload_json").map_err(map_db_err)?)
142            .map_err(StateBackendError::Serialization)?;
143    let lease_expires_at: Option<DateTime<Utc>> = row
144        .try_get::<Option<&str>, _>("lease_expires_at")
145        .map_err(map_db_err)?
146        .map(parse_datetime)
147        .transpose()?;
148    let created_at = parse_datetime(row.try_get::<&str, _>("created_at").map_err(map_db_err)?)?;
149    let attempt: i64 = row.try_get("attempt").map_err(map_db_err)?;
150    let max_attempts: i64 = row.try_get("max_attempts").unwrap_or(3);
151    let tenant_id: String = row
152        .try_get("tenant_id")
153        .unwrap_or_else(|_| DEFAULT_TENANT.to_string());
154
155    Ok(WorkItem {
156        id,
157        execution_id,
158        node_id: row.try_get::<String, _>("node_id").map_err(map_db_err)?,
159        queue_type: row.try_get::<String, _>("queue_type").map_err(map_db_err)?,
160        payload,
161        attempt: attempt as u32,
162        max_attempts: max_attempts as u32,
163        created_at,
164        lease_expires_at,
165        worker_id: row
166            .try_get::<Option<String>, _>("worker_id")
167            .map_err(map_db_err)?,
168        tenant_id,
169    })
170}
171
172// ── StateBackend impl ─────────────────────────────────────────────────────────
173
174#[async_trait]
175impl StateBackend for TenantScopedSqliteBackend {
176    // ── Workflow definitions ──────────────────────────────────────────────
177
178    #[instrument(skip(self, def), fields(tenant = %self.tenant_id, workflow_id = %def.workflow_id))]
179    async fn store_workflow(&self, mut def: WorkflowDefinition) -> BackendResult<()> {
180        def.tenant_id = self.tenant_id.0.clone();
181        let ir_json = serde_json::to_string(&def.ir)?;
182        let created_at = def.created_at.to_rfc3339();
183
184        sqlx::query(
185            r#"INSERT OR REPLACE INTO workflow_definitions (workflow_id, version, ir_json, created_at, tenant_id)
186               VALUES (?, ?, ?, ?, ?)"#,
187        )
188        .bind(&def.workflow_id)
189        .bind(&def.version)
190        .bind(&ir_json)
191        .bind(&created_at)
192        .bind(&self.tenant_id.0)
193        .execute(&self.pool)
194        .await
195        .map_err(map_db_err)?;
196
197        Ok(())
198    }
199
200    #[instrument(skip(self), fields(tenant = %self.tenant_id, workflow_id = workflow_id))]
201    async fn get_workflow(
202        &self,
203        workflow_id: &str,
204        version: &str,
205    ) -> BackendResult<Option<WorkflowDefinition>> {
206        let row = sqlx::query(
207            "SELECT * FROM workflow_definitions WHERE workflow_id = ? AND version = ? AND tenant_id = ?",
208        )
209        .bind(workflow_id)
210        .bind(version)
211        .bind(&self.tenant_id.0)
212        .fetch_optional(&self.pool)
213        .await
214        .map_err(map_db_err)?;
215
216        let Some(row) = row else { return Ok(None) };
217
218        let ir: serde_json::Value =
219            serde_json::from_str(row.try_get::<&str, _>("ir_json").map_err(map_db_err)?)
220                .map_err(StateBackendError::Serialization)?;
221        let created_at = parse_datetime(row.try_get::<&str, _>("created_at").map_err(map_db_err)?)?;
222
223        Ok(Some(WorkflowDefinition {
224            workflow_id: row
225                .try_get::<String, _>("workflow_id")
226                .map_err(map_db_err)?,
227            version: row.try_get::<String, _>("version").map_err(map_db_err)?,
228            ir,
229            created_at,
230            tenant_id: self.tenant_id.0.clone(),
231        }))
232    }
233
234    // ── Executions ────────────────────────────────────────────────────────
235
236    #[instrument(skip(self, execution), fields(tenant = %self.tenant_id, execution_id = %execution.execution_id))]
237    async fn create_execution(&self, execution: WorkflowExecution) -> BackendResult<()> {
238        let id = execution_id_str(&execution.execution_id);
239        let status = status_to_str(&execution.status);
240        let initial_input = serde_json::to_string(&execution.initial_input)?;
241        let current_state = serde_json::to_string(&execution.current_state)?;
242        let started_at = execution.started_at.to_rfc3339();
243        let updated_at = execution.updated_at.to_rfc3339();
244        let completed_at = execution.completed_at.map(|dt| dt.to_rfc3339());
245
246        sqlx::query(
247            r#"INSERT INTO workflow_executions
248               (execution_id, workflow_id, workflow_version, status, initial_input, current_state,
249                started_at, updated_at, completed_at, tenant_id)
250               VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)"#,
251        )
252        .bind(&id)
253        .bind(&execution.workflow_id)
254        .bind(&execution.workflow_version)
255        .bind(status)
256        .bind(&initial_input)
257        .bind(&current_state)
258        .bind(&started_at)
259        .bind(&updated_at)
260        .bind(completed_at.as_deref())
261        .bind(&self.tenant_id.0)
262        .execute(&self.pool)
263        .await
264        .map_err(map_db_err)?;
265
266        Ok(())
267    }
268
269    #[instrument(skip(self), fields(tenant = %self.tenant_id, execution_id = %id))]
270    async fn get_execution(&self, id: &ExecutionId) -> BackendResult<Option<WorkflowExecution>> {
271        let id_str = execution_id_str(id);
272        let row = sqlx::query(
273            "SELECT * FROM workflow_executions WHERE execution_id = ? AND tenant_id = ?",
274        )
275        .bind(&id_str)
276        .bind(&self.tenant_id.0)
277        .fetch_optional(&self.pool)
278        .await
279        .map_err(map_db_err)?;
280
281        row.map(|r| row_to_execution(&r)).transpose()
282    }
283
284    #[instrument(skip(self), fields(tenant = %self.tenant_id, execution_id = %id))]
285    async fn update_execution_status(
286        &self,
287        id: &ExecutionId,
288        status: WorkflowStatus,
289    ) -> BackendResult<()> {
290        let id_str = execution_id_str(id);
291        let status_str = status_to_str(&status);
292        let now = Utc::now().to_rfc3339();
293        let completed_at = if status.is_terminal() {
294            Some(now.clone())
295        } else {
296            None
297        };
298
299        let rows_affected = sqlx::query(
300            "UPDATE workflow_executions SET status = ?, updated_at = ?, completed_at = COALESCE(?, completed_at) WHERE execution_id = ? AND tenant_id = ?",
301        )
302        .bind(status_str)
303        .bind(&now)
304        .bind(completed_at.as_deref())
305        .bind(&id_str)
306        .bind(&self.tenant_id.0)
307        .execute(&self.pool)
308        .await
309        .map_err(map_db_err)?
310        .rows_affected();
311
312        if rows_affected == 0 {
313            return Err(StateBackendError::NotFound(id_str));
314        }
315        Ok(())
316    }
317
318    async fn update_execution_current_state(
319        &self,
320        id: &ExecutionId,
321        current_state: &serde_json::Value,
322    ) -> BackendResult<()> {
323        let id_str = execution_id_str(id);
324        let state_str =
325            serde_json::to_string(current_state).map_err(StateBackendError::Serialization)?;
326        let now = Utc::now().to_rfc3339();
327        sqlx::query(
328            "UPDATE workflow_executions SET current_state = ?, updated_at = ? WHERE execution_id = ? AND tenant_id = ?",
329        )
330        .bind(&state_str)
331        .bind(&now)
332        .bind(&id_str)
333        .bind(&self.tenant_id.0)
334        .execute(&self.pool)
335        .await
336        .map_err(map_db_err)?;
337        Ok(())
338    }
339
340    async fn patch_append_array(
341        &self,
342        execution_id: &ExecutionId,
343        key: &str,
344        value: serde_json::Value,
345    ) -> BackendResult<()> {
346        let exec = self
347            .get_execution(execution_id)
348            .await?
349            .ok_or_else(|| StateBackendError::NotFound(format!("execution {execution_id}")))?;
350        let mut state = exec.current_state.clone();
351        let arr = state
352            .as_object_mut()
353            .ok_or_else(|| StateBackendError::Database("state is not a JSON object".into()))?
354            .entry(key)
355            .or_insert_with(|| serde_json::json!([]));
356        arr.as_array_mut()
357            .ok_or_else(|| StateBackendError::Database(format!("{key} is not an array")))?
358            .push(value);
359        self.update_execution_current_state(execution_id, &state)
360            .await
361    }
362
363    #[instrument(skip(self), fields(tenant = %self.tenant_id))]
364    async fn list_executions(
365        &self,
366        status: Option<WorkflowStatus>,
367        limit: u32,
368        offset: u32,
369    ) -> BackendResult<Vec<WorkflowExecution>> {
370        let rows = match status {
371            Some(s) => {
372                let status_str = status_to_str(&s);
373                sqlx::query(
374                    "SELECT * FROM workflow_executions WHERE status = ? AND tenant_id = ? ORDER BY updated_at DESC LIMIT ? OFFSET ?",
375                )
376                .bind(status_str)
377                .bind(&self.tenant_id.0)
378                .bind(limit as i64)
379                .bind(offset as i64)
380                .fetch_all(&self.pool)
381                .await
382                .map_err(map_db_err)?
383            }
384            None => sqlx::query(
385                "SELECT * FROM workflow_executions WHERE tenant_id = ? ORDER BY updated_at DESC LIMIT ? OFFSET ?",
386            )
387            .bind(&self.tenant_id.0)
388            .bind(limit as i64)
389            .bind(offset as i64)
390            .fetch_all(&self.pool)
391            .await
392            .map_err(map_db_err)?,
393        };
394
395        rows.iter().map(row_to_execution).collect()
396    }
397
398    // ── Event log ─────────────────────────────────────────────────────────
399
400    #[instrument(skip(self, event), fields(tenant = %self.tenant_id, execution_id = %event.execution_id))]
401    async fn append_event(&self, event: Event) -> BackendResult<EventSequence> {
402        let id = event.id.to_string();
403        let execution_id = execution_id_str(&event.execution_id);
404        let kind_json = serde_json::to_string(&event.kind)?;
405        let created_at = event.created_at.to_rfc3339();
406
407        // Assign the sequence atomically (see SqliteBackend::append_event). The
408        // MAX is scoped to the execution only (not the tenant), so it agrees with
409        // the base backend's sequence even when an execution's events carry mixed
410        // tenant_id rows (scheduler/worker run on the base backend in dev).
411        let mut tx = self.pool.begin().await.map_err(map_db_err)?;
412        let seq_row = sqlx::query(
413            "SELECT COALESCE(MAX(sequence), 0) + 1 AS seq FROM events WHERE execution_id = ?",
414        )
415        .bind(&execution_id)
416        .fetch_one(&mut *tx)
417        .await
418        .map_err(map_db_err)?;
419        let sequence: i64 = seq_row.try_get::<i64, _>("seq").map_err(map_db_err)?;
420
421        sqlx::query(
422            r#"INSERT INTO events (id, execution_id, sequence, kind_json, created_at, tenant_id)
423               VALUES (?, ?, ?, ?, ?, ?)"#,
424        )
425        .bind(&id)
426        .bind(&execution_id)
427        .bind(sequence)
428        .bind(&kind_json)
429        .bind(&created_at)
430        .bind(&self.tenant_id.0)
431        .execute(&mut *tx)
432        .await
433        .map_err(map_db_err)?;
434        tx.commit().await.map_err(map_db_err)?;
435
436        Ok(sequence)
437    }
438
439    #[instrument(skip(self), fields(tenant = %self.tenant_id, execution_id = %execution_id))]
440    async fn get_events(&self, execution_id: &ExecutionId) -> BackendResult<Vec<Event>> {
441        let id_str = execution_id_str(execution_id);
442        let rows = sqlx::query(
443            "SELECT * FROM events WHERE execution_id = ? AND tenant_id = ? ORDER BY sequence ASC",
444        )
445        .bind(&id_str)
446        .bind(&self.tenant_id.0)
447        .fetch_all(&self.pool)
448        .await
449        .map_err(map_db_err)?;
450
451        rows.iter().map(row_to_event).collect()
452    }
453
454    #[instrument(skip(self), fields(tenant = %self.tenant_id, execution_id = %execution_id))]
455    async fn get_events_since(
456        &self,
457        execution_id: &ExecutionId,
458        since_sequence: EventSequence,
459    ) -> BackendResult<Vec<Event>> {
460        let id_str = execution_id_str(execution_id);
461        let rows = sqlx::query(
462            "SELECT * FROM events WHERE execution_id = ? AND tenant_id = ? AND sequence > ? ORDER BY sequence ASC",
463        )
464        .bind(&id_str)
465        .bind(&self.tenant_id.0)
466        .bind(since_sequence)
467        .fetch_all(&self.pool)
468        .await
469        .map_err(map_db_err)?;
470
471        rows.iter().map(row_to_event).collect()
472    }
473
474    #[instrument(skip(self), fields(tenant = %self.tenant_id, execution_id = %execution_id))]
475    async fn latest_sequence(&self, execution_id: &ExecutionId) -> BackendResult<EventSequence> {
476        let id_str = execution_id_str(execution_id);
477        let row = sqlx::query(
478            "SELECT COALESCE(MAX(sequence), 0) as seq FROM events WHERE execution_id = ? AND tenant_id = ?",
479        )
480        .bind(&id_str)
481        .bind(&self.tenant_id.0)
482        .fetch_one(&self.pool)
483        .await
484        .map_err(map_db_err)?;
485
486        Ok(row.try_get::<i64, _>("seq").map_err(map_db_err)?)
487    }
488
489    // ── Snapshots ─────────────────────────────────────────────────────────
490
491    #[instrument(skip(self, snapshot), fields(tenant = %self.tenant_id, execution_id = %snapshot.execution_id))]
492    async fn write_snapshot(&self, snapshot: Snapshot) -> BackendResult<()> {
493        let id = snapshot.id.to_string();
494        let execution_id = execution_id_str(&snapshot.execution_id);
495        let state_json = serde_json::to_string(&snapshot.state)?;
496        let created_at = snapshot.created_at.to_rfc3339();
497
498        sqlx::query(
499            r#"INSERT OR REPLACE INTO snapshots (id, execution_id, at_sequence, state_json, created_at, tenant_id)
500               VALUES (?, ?, ?, ?, ?, ?)"#,
501        )
502        .bind(&id)
503        .bind(&execution_id)
504        .bind(snapshot.at_sequence)
505        .bind(&state_json)
506        .bind(&created_at)
507        .bind(&self.tenant_id.0)
508        .execute(&self.pool)
509        .await
510        .map_err(map_db_err)?;
511
512        Ok(())
513    }
514
515    #[instrument(skip(self), fields(tenant = %self.tenant_id, execution_id = %execution_id))]
516    async fn latest_snapshot(&self, execution_id: &ExecutionId) -> BackendResult<Option<Snapshot>> {
517        let id_str = execution_id_str(execution_id);
518        let row = sqlx::query(
519            "SELECT * FROM snapshots WHERE execution_id = ? AND tenant_id = ? ORDER BY at_sequence DESC LIMIT 1",
520        )
521        .bind(&id_str)
522        .bind(&self.tenant_id.0)
523        .fetch_optional(&self.pool)
524        .await
525        .map_err(map_db_err)?;
526
527        let Some(row) = row else { return Ok(None) };
528
529        let id = Uuid::parse_str(row.try_get::<&str, _>("id").map_err(map_db_err)?)
530            .map_err(|e| StateBackendError::Database(e.to_string()))?;
531        let execution_id =
532            parse_execution_id(row.try_get::<&str, _>("execution_id").map_err(map_db_err)?)?;
533        let at_sequence: i64 = row.try_get("at_sequence").map_err(map_db_err)?;
534        let state: serde_json::Value =
535            serde_json::from_str(row.try_get::<&str, _>("state_json").map_err(map_db_err)?)
536                .map_err(StateBackendError::Serialization)?;
537        let created_at = parse_datetime(row.try_get::<&str, _>("created_at").map_err(map_db_err)?)?;
538
539        Ok(Some(Snapshot {
540            id,
541            execution_id,
542            at_sequence,
543            state,
544            created_at,
545        }))
546    }
547
548    // ── Work item queue ───────────────────────────────────────────────────
549
550    #[instrument(skip(self, item), fields(tenant = %self.tenant_id, execution_id = %item.execution_id))]
551    async fn enqueue_work_item(&self, mut item: WorkItem) -> BackendResult<WorkItemId> {
552        item.tenant_id = self.tenant_id.0.clone();
553        let id = item.id.to_string();
554        let execution_id = execution_id_str(&item.execution_id);
555        let payload_json = serde_json::to_string(&item.payload)?;
556        let created_at = item.created_at.to_rfc3339();
557
558        sqlx::query(
559            r#"INSERT INTO work_items
560               (id, execution_id, node_id, queue_type, payload_json, attempt, max_attempts, status, created_at, tenant_id)
561               VALUES (?, ?, ?, ?, ?, ?, ?, 'pending', ?, ?)"#,
562        )
563        .bind(&id)
564        .bind(&execution_id)
565        .bind(&item.node_id)
566        .bind(&item.queue_type)
567        .bind(&payload_json)
568        .bind(item.attempt as i64)
569        .bind(item.max_attempts as i64)
570        .bind(&created_at)
571        .bind(&self.tenant_id.0)
572        .execute(&self.pool)
573        .await
574        .map_err(map_db_err)?;
575
576        Ok(item.id)
577    }
578
579    #[instrument(skip(self), fields(tenant = %self.tenant_id, worker_id = worker_id))]
580    async fn claim_work_item(
581        &self,
582        worker_id: &str,
583        queue_types: &[&str],
584    ) -> BackendResult<Option<WorkItem>> {
585        if queue_types.is_empty() {
586            return Ok(None);
587        }
588
589        let now = Utc::now().to_rfc3339();
590        // Expire stale leases for this tenant.
591        sqlx::query(
592            "UPDATE work_items SET status = 'pending', worker_id = NULL, lease_expires_at = NULL \
593             WHERE status = 'claimed' AND lease_expires_at < ? AND tenant_id = ?",
594        )
595        .bind(&now)
596        .bind(&self.tenant_id.0)
597        .execute(&self.pool)
598        .await
599        .map_err(map_db_err)?;
600
601        let mut tx = self.pool.begin().await.map_err(map_db_err)?;
602
603        let placeholders = queue_types
604            .iter()
605            .map(|_| "?")
606            .collect::<Vec<_>>()
607            .join(",");
608        let query_str = format!(
609            "SELECT * FROM work_items WHERE status = 'pending' AND queue_type IN ({}) \
610             AND tenant_id = ? AND (retry_after IS NULL OR retry_after <= ?) ORDER BY created_at ASC LIMIT 1",
611            placeholders
612        );
613        let mut q = sqlx::query(&query_str);
614        for qt in queue_types {
615            q = q.bind(*qt);
616        }
617        q = q.bind(&self.tenant_id.0);
618        q = q.bind(&now);
619        let row = q.fetch_optional(&mut *tx).await.map_err(map_db_err)?;
620
621        let Some(row) = row else {
622            tx.rollback().await.map_err(map_db_err)?;
623            return Ok(None);
624        };
625
626        let item = row_to_work_item(&row)?;
627        let item_id = item.id.to_string();
628        let lease_expires_at = (Utc::now() + chrono::Duration::seconds(30)).to_rfc3339();
629        let claimed_at = Utc::now().to_rfc3339();
630
631        sqlx::query(
632            "UPDATE work_items SET status = 'claimed', worker_id = ?, lease_expires_at = ?, claimed_at = ? WHERE id = ?",
633        )
634        .bind(worker_id)
635        .bind(&lease_expires_at)
636        .bind(&claimed_at)
637        .bind(&item_id)
638        .execute(&mut *tx)
639        .await
640        .map_err(map_db_err)?;
641
642        tx.commit().await.map_err(map_db_err)?;
643
644        let mut claimed = item;
645        claimed.worker_id = Some(worker_id.to_string());
646        claimed.lease_expires_at = Some(
647            DateTime::parse_from_rfc3339(&lease_expires_at)
648                .map(|dt| dt.with_timezone(&Utc))
649                .map_err(|e| StateBackendError::Database(e.to_string()))?,
650        );
651        Ok(Some(claimed))
652    }
653
654    #[instrument(skip(self), fields(tenant = %self.tenant_id, item_id = %item_id))]
655    async fn renew_lease(&self, item_id: WorkItemId, worker_id: &str) -> BackendResult<()> {
656        let lease_expires_at = (Utc::now() + chrono::Duration::seconds(30)).to_rfc3339();
657        let id_str = item_id.to_string();
658
659        let rows_affected = sqlx::query(
660            "UPDATE work_items SET lease_expires_at = ? WHERE id = ? AND worker_id = ? AND status = 'claimed' AND tenant_id = ?",
661        )
662        .bind(&lease_expires_at)
663        .bind(&id_str)
664        .bind(worker_id)
665        .bind(&self.tenant_id.0)
666        .execute(&self.pool)
667        .await
668        .map_err(map_db_err)?
669        .rows_affected();
670
671        if rows_affected == 0 {
672            return Err(StateBackendError::NotFound(id_str));
673        }
674        Ok(())
675    }
676
677    #[instrument(skip(self), fields(tenant = %self.tenant_id, item_id = %item_id))]
678    async fn complete_work_item(&self, item_id: WorkItemId) -> BackendResult<()> {
679        let id_str = item_id.to_string();
680        let completed_at = Utc::now().to_rfc3339();
681
682        let rows_affected = sqlx::query(
683            "UPDATE work_items SET status = 'completed', completed_at = ?, lease_expires_at = NULL WHERE id = ? AND tenant_id = ?",
684        )
685        .bind(&completed_at)
686        .bind(&id_str)
687        .bind(&self.tenant_id.0)
688        .execute(&self.pool)
689        .await
690        .map_err(map_db_err)?
691        .rows_affected();
692
693        if rows_affected == 0 {
694            return Err(StateBackendError::NotFound(id_str));
695        }
696        Ok(())
697    }
698
699    #[instrument(skip(self, error), fields(tenant = %self.tenant_id, item_id = %item_id))]
700    async fn fail_work_item(&self, item_id: WorkItemId, error: &str) -> BackendResult<()> {
701        let id_str = item_id.to_string();
702        let _ = error;
703
704        let rows_affected = sqlx::query(
705            "UPDATE work_items SET status = 'failed', lease_expires_at = NULL, worker_id = NULL WHERE id = ? AND tenant_id = ?",
706        )
707        .bind(&id_str)
708        .bind(&self.tenant_id.0)
709        .execute(&self.pool)
710        .await
711        .map_err(map_db_err)?
712        .rows_affected();
713
714        if rows_affected == 0 {
715            return Err(StateBackendError::NotFound(id_str));
716        }
717        Ok(())
718    }
719
720    #[instrument(skip(self), fields(tenant = %self.tenant_id))]
721    async fn reclaim_expired_leases(&self) -> BackendResult<ReclaimResult> {
722        let now = Utc::now().to_rfc3339();
723
724        let rows = sqlx::query(
725            "SELECT * FROM work_items WHERE status = 'claimed' AND lease_expires_at < ? AND tenant_id = ? ORDER BY created_at ASC",
726        )
727        .bind(&now)
728        .bind(&self.tenant_id.0)
729        .fetch_all(&self.pool)
730        .await
731        .map_err(map_db_err)?;
732
733        let mut result = ReclaimResult::default();
734
735        for row in &rows {
736            let item = row_to_work_item(row)?;
737            let new_attempt = item.attempt + 1;
738            let id_str = item.id.to_string();
739
740            if new_attempt >= item.max_attempts {
741                let dead_lettered_at = Utc::now().to_rfc3339();
742                sqlx::query(
743                    r#"INSERT OR IGNORE INTO dead_letter_items
744                       (id, execution_id, node_id, queue_type, payload_json, attempt, last_error, created_at, dead_lettered_at, tenant_id)
745                       VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)"#,
746                )
747                .bind(&id_str)
748                .bind(execution_id_str(&item.execution_id))
749                .bind(&item.node_id)
750                .bind(&item.queue_type)
751                .bind(serde_json::to_string(&item.payload)?)
752                .bind(new_attempt as i64)
753                .bind("lease expired: worker dead")
754                .bind(item.created_at.to_rfc3339())
755                .bind(dead_lettered_at)
756                .bind(&self.tenant_id.0)
757                .execute(&self.pool)
758                .await
759                .map_err(map_db_err)?;
760
761                sqlx::query("UPDATE work_items SET status = 'dead_lettered', attempt = ?, lease_expires_at = NULL, worker_id = NULL WHERE id = ?")
762                    .bind(new_attempt as i64)
763                    .bind(&id_str)
764                    .execute(&self.pool)
765                    .await
766                    .map_err(map_db_err)?;
767
768                let mut exhausted_item = item;
769                exhausted_item.attempt = new_attempt;
770                result.exhausted.push(exhausted_item);
771            } else {
772                let backoff_secs = 1u64 << new_attempt.min(6);
773                let retry_after =
774                    (Utc::now() + chrono::Duration::seconds(backoff_secs as i64)).to_rfc3339();
775
776                sqlx::query(
777                    "UPDATE work_items SET status = 'pending', attempt = ?, worker_id = NULL, lease_expires_at = NULL, retry_after = ? WHERE id = ?",
778                )
779                .bind(new_attempt as i64)
780                .bind(&retry_after)
781                .bind(&id_str)
782                .execute(&self.pool)
783                .await
784                .map_err(map_db_err)?;
785
786                let mut retry_item = item;
787                retry_item.attempt = new_attempt;
788                result.retryable.push(retry_item);
789            }
790        }
791
792        Ok(result)
793    }
794
795    #[instrument(skip(self, last_error), fields(tenant = %self.tenant_id, item_id = %item_id))]
796    async fn move_to_dead_letter(
797        &self,
798        item_id: WorkItemId,
799        last_error: &str,
800    ) -> BackendResult<()> {
801        let id_str = item_id.to_string();
802
803        let row = sqlx::query("SELECT * FROM work_items WHERE id = ? AND tenant_id = ?")
804            .bind(&id_str)
805            .bind(&self.tenant_id.0)
806            .fetch_optional(&self.pool)
807            .await
808            .map_err(map_db_err)?;
809
810        let Some(row) = row else {
811            return Err(StateBackendError::NotFound(id_str));
812        };
813        let item = row_to_work_item(&row)?;
814        let dead_lettered_at = Utc::now().to_rfc3339();
815
816        sqlx::query(
817            r#"INSERT OR REPLACE INTO dead_letter_items
818               (id, execution_id, node_id, queue_type, payload_json, attempt, last_error, created_at, dead_lettered_at, tenant_id)
819               VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)"#,
820        )
821        .bind(&id_str)
822        .bind(execution_id_str(&item.execution_id))
823        .bind(&item.node_id)
824        .bind(&item.queue_type)
825        .bind(serde_json::to_string(&item.payload)?)
826        .bind(item.attempt as i64)
827        .bind(last_error)
828        .bind(item.created_at.to_rfc3339())
829        .bind(dead_lettered_at)
830        .bind(&self.tenant_id.0)
831        .execute(&self.pool)
832        .await
833        .map_err(map_db_err)?;
834
835        sqlx::query("UPDATE work_items SET status = 'dead_lettered', lease_expires_at = NULL, worker_id = NULL WHERE id = ? AND tenant_id = ?")
836            .bind(&id_str)
837            .bind(&self.tenant_id.0)
838            .execute(&self.pool)
839            .await
840            .map_err(map_db_err)?;
841
842        Ok(())
843    }
844
845    // ── API tokens ────────────────────────────────────────────────────────
846
847    async fn create_token(&self, name: &str, role: &str) -> BackendResult<(String, ApiToken)> {
848        use rand::Rng;
849        use sha2::{Digest, Sha256};
850
851        let random_bytes: [u8; 32] = rand::thread_rng().gen();
852        let token = format!(
853            "jj_{}",
854            random_bytes
855                .iter()
856                .map(|b| format!("{b:02x}"))
857                .collect::<String>()
858        );
859        let token_hash = format!("{:x}", Sha256::digest(token.as_bytes()));
860        let id = Uuid::new_v4().to_string();
861        let now = Utc::now().to_rfc3339();
862
863        sqlx::query(
864            r#"INSERT INTO api_tokens (id, token_hash, name, role, created_at, tenant_id)
865               VALUES (?, ?, ?, ?, ?, ?)"#,
866        )
867        .bind(&id)
868        .bind(&token_hash)
869        .bind(name)
870        .bind(role)
871        .bind(&now)
872        .bind(&self.tenant_id.0)
873        .execute(&self.pool)
874        .await
875        .map_err(map_db_err)?;
876
877        let info = ApiToken {
878            id,
879            name: name.to_string(),
880            role: role.to_string(),
881            created_at: Utc::now(),
882            expires_at: None,
883            tenant_id: self.tenant_id.0.clone(),
884        };
885        Ok((token, info))
886    }
887
888    async fn validate_token(&self, token: &str) -> BackendResult<Option<ApiToken>> {
889        use sha2::{Digest, Sha256};
890
891        let token_hash = format!("{:x}", Sha256::digest(token.as_bytes()));
892        let now = Utc::now().to_rfc3339();
893
894        let row = sqlx::query(
895            r#"SELECT id, name, role, created_at, expires_at, tenant_id
896               FROM api_tokens
897               WHERE token_hash = ?
898                 AND revoked_at IS NULL
899                 AND (expires_at IS NULL OR expires_at > ?)"#,
900        )
901        .bind(&token_hash)
902        .bind(&now)
903        .fetch_optional(&self.pool)
904        .await
905        .map_err(map_db_err)?;
906
907        let Some(row) = row else { return Ok(None) };
908
909        let id: String = row.get("id");
910        sqlx::query("UPDATE api_tokens SET last_used_at = ? WHERE id = ?")
911            .bind(&now)
912            .bind(&id)
913            .execute(&self.pool)
914            .await
915            .map_err(map_db_err)?;
916
917        let expires_at: Option<String> = row.get("expires_at");
918        let tenant_id: String = row
919            .try_get("tenant_id")
920            .unwrap_or_else(|_| DEFAULT_TENANT.to_string());
921
922        let info = ApiToken {
923            id,
924            name: row.get("name"),
925            role: row.get("role"),
926            created_at: row
927                .get::<String, _>("created_at")
928                .parse::<chrono::DateTime<Utc>>()
929                .unwrap_or_else(|_| Utc::now()),
930            expires_at: expires_at.and_then(|s| s.parse().ok()),
931            tenant_id,
932        };
933        Ok(Some(info))
934    }
935
936    // ── Tenant management ─────────────────────────────────────────────────
937
938    async fn create_tenant(&self, tenant: Tenant) -> BackendResult<()> {
939        let policy_json = tenant
940            .policy
941            .as_ref()
942            .map(serde_json::to_string)
943            .transpose()?;
944        let limits_json = tenant
945            .limits
946            .as_ref()
947            .map(serde_json::to_string)
948            .transpose()
949            .map_err(StateBackendError::Serialization)?;
950
951        sqlx::query(
952            r#"INSERT INTO tenants (id, name, status, policy_json, limits_json, created_at, updated_at)
953               VALUES (?, ?, ?, ?, ?, ?, ?)"#,
954        )
955        .bind(&tenant.id.0)
956        .bind(&tenant.name)
957        .bind(tenant.status.as_str())
958        .bind(policy_json.as_deref())
959        .bind(limits_json.as_deref())
960        .bind(tenant.created_at.to_rfc3339())
961        .bind(tenant.updated_at.to_rfc3339())
962        .execute(&self.pool)
963        .await
964        .map_err(map_db_err)?;
965
966        Ok(())
967    }
968
969    async fn get_tenant(&self, id: &TenantId) -> BackendResult<Option<Tenant>> {
970        let row = sqlx::query("SELECT * FROM tenants WHERE id = ?")
971            .bind(&id.0)
972            .fetch_optional(&self.pool)
973            .await
974            .map_err(map_db_err)?;
975
976        let Some(row) = row else { return Ok(None) };
977        row_to_tenant(&row).map(Some)
978    }
979
980    async fn list_tenants(&self) -> BackendResult<Vec<Tenant>> {
981        let rows = sqlx::query("SELECT * FROM tenants ORDER BY created_at ASC")
982            .fetch_all(&self.pool)
983            .await
984            .map_err(map_db_err)?;
985
986        rows.iter().map(row_to_tenant).collect()
987    }
988
989    async fn update_tenant(&self, tenant: Tenant) -> BackendResult<()> {
990        let policy_json = tenant
991            .policy
992            .as_ref()
993            .map(serde_json::to_string)
994            .transpose()?;
995        let limits_json = tenant
996            .limits
997            .as_ref()
998            .map(serde_json::to_string)
999            .transpose()
1000            .map_err(StateBackendError::Serialization)?;
1001
1002        let rows_affected = sqlx::query(
1003            r#"UPDATE tenants SET name = ?, status = ?, policy_json = ?, limits_json = ?, updated_at = ?
1004               WHERE id = ?"#,
1005        )
1006        .bind(&tenant.name)
1007        .bind(tenant.status.as_str())
1008        .bind(policy_json.as_deref())
1009        .bind(limits_json.as_deref())
1010        .bind(tenant.updated_at.to_rfc3339())
1011        .bind(&tenant.id.0)
1012        .execute(&self.pool)
1013        .await
1014        .map_err(map_db_err)?
1015        .rows_affected();
1016
1017        if rows_affected == 0 {
1018            return Err(StateBackendError::NotFound(tenant.id.0));
1019        }
1020        Ok(())
1021    }
1022}
1023
1024/// Parse a datetime that might be either RFC 3339 or SQLite's `datetime('now')` format.
1025fn parse_datetime_flexible(s: &str) -> BackendResult<DateTime<Utc>> {
1026    // Try RFC 3339 first (e.g. "2026-03-12T00:00:00+00:00")
1027    if let Ok(dt) = DateTime::parse_from_rfc3339(s) {
1028        return Ok(dt.with_timezone(&Utc));
1029    }
1030    // Fallback: SQLite datetime format "YYYY-MM-DD HH:MM:SS"
1031    if let Ok(naive) = chrono::NaiveDateTime::parse_from_str(s, "%Y-%m-%d %H:%M:%S") {
1032        return Ok(naive.and_utc());
1033    }
1034    Err(StateBackendError::Database(format!(
1035        "invalid datetime: {s}"
1036    )))
1037}
1038
1039fn row_to_tenant(row: &sqlx::sqlite::SqliteRow) -> BackendResult<Tenant> {
1040    let policy: Option<serde_json::Value> = row
1041        .try_get::<Option<&str>, _>("policy_json")
1042        .map_err(map_db_err)?
1043        .map(serde_json::from_str)
1044        .transpose()
1045        .map_err(StateBackendError::Serialization)?;
1046
1047    let limits: Option<TenantLimits> = row
1048        .try_get::<Option<&str>, _>("limits_json")
1049        .map_err(map_db_err)?
1050        .map(serde_json::from_str)
1051        .transpose()
1052        .map_err(StateBackendError::Serialization)?;
1053
1054    let created_at =
1055        parse_datetime_flexible(row.try_get::<&str, _>("created_at").map_err(map_db_err)?)?;
1056    let updated_at =
1057        parse_datetime_flexible(row.try_get::<&str, _>("updated_at").map_err(map_db_err)?)?;
1058
1059    Ok(Tenant {
1060        id: TenantId(row.try_get::<String, _>("id").map_err(map_db_err)?),
1061        name: row.try_get::<String, _>("name").map_err(map_db_err)?,
1062        status: TenantStatus::parse(row.try_get::<&str, _>("status").map_err(map_db_err)?),
1063        policy,
1064        limits,
1065        created_at,
1066        updated_at,
1067    })
1068}