Skip to main content

jamjet_state/
backend.rs

1use crate::event::{Event, EventSequence};
2use crate::snapshot::Snapshot;
3use crate::tenant::{Tenant, TenantId, DEFAULT_TENANT};
4use async_trait::async_trait;
5use jamjet_core::workflow::{ExecutionId, WorkflowExecution, WorkflowStatus};
6use thiserror::Error;
7
8/// Workflow definition stored in the registry (the compiled IR as JSON).
9#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
10pub struct WorkflowDefinition {
11    pub workflow_id: String,
12    pub version: String,
13    /// The canonical IR JSON value.
14    pub ir: serde_json::Value,
15    pub created_at: chrono::DateTime<chrono::Utc>,
16    /// Tenant that owns this workflow definition.
17    #[serde(default = "default_tenant_string")]
18    pub tenant_id: String,
19}
20
21fn default_tenant_string() -> String {
22    DEFAULT_TENANT.to_string()
23}
24
25#[derive(Debug, Error)]
26pub enum StateBackendError {
27    #[error("execution not found: {0}")]
28    NotFound(String),
29
30    #[error("optimistic concurrency conflict: sequence mismatch for {0}")]
31    SequenceConflict(String),
32
33    #[error("database error: {0}")]
34    Database(String),
35
36    #[error("serialization error: {0}")]
37    Serialization(#[from] serde_json::Error),
38}
39
40pub type BackendResult<T> = Result<T, StateBackendError>;
41
42/// Trait abstracting the durable state storage backend.
43///
44/// Implementors: `SqliteBackend`, `PostgresBackend`.
45/// Both must guarantee transactional writes per state transition.
46#[async_trait]
47pub trait StateBackend: Send + Sync {
48    // ── Workflow definitions ─────────────────────────────────────────────
49
50    /// Store a compiled workflow IR.
51    async fn store_workflow(&self, def: WorkflowDefinition) -> BackendResult<()>;
52
53    /// Load a workflow definition by id and version.
54    async fn get_workflow(
55        &self,
56        workflow_id: &str,
57        version: &str,
58    ) -> BackendResult<Option<WorkflowDefinition>>;
59
60    // ── Workflow executions ──────────────────────────────────────────────
61
62    /// Create a new workflow execution record.
63    async fn create_execution(&self, execution: WorkflowExecution) -> BackendResult<()>;
64
65    /// Load a workflow execution by id.
66    async fn get_execution(&self, id: &ExecutionId) -> BackendResult<Option<WorkflowExecution>>;
67
68    /// Update the status of a workflow execution.
69    async fn update_execution_status(
70        &self,
71        id: &ExecutionId,
72        status: WorkflowStatus,
73    ) -> BackendResult<()>;
74
75    /// Update the current_state of a workflow execution (apply state patches).
76    async fn update_execution_current_state(
77        &self,
78        id: &ExecutionId,
79        current_state: &serde_json::Value,
80    ) -> BackendResult<()>;
81
82    /// Append a value to an array field in the execution's current_state.
83    /// Creates the array if it doesn't exist.
84    async fn patch_append_array(
85        &self,
86        execution_id: &ExecutionId,
87        key: &str,
88        value: serde_json::Value,
89    ) -> BackendResult<()>;
90
91    /// List executions, optionally filtered by status.
92    async fn list_executions(
93        &self,
94        status: Option<WorkflowStatus>,
95        limit: u32,
96        offset: u32,
97    ) -> BackendResult<Vec<WorkflowExecution>>;
98
99    // ── Event log ────────────────────────────────────────────────────────
100
101    /// Append an event to the event log.
102    /// Must be transactional — either fully written or not at all.
103    async fn append_event(&self, event: Event) -> BackendResult<EventSequence>;
104
105    /// Load all events for an execution, ordered by sequence.
106    async fn get_events(&self, execution_id: &ExecutionId) -> BackendResult<Vec<Event>>;
107
108    /// Load events since a given sequence number (exclusive).
109    async fn get_events_since(
110        &self,
111        execution_id: &ExecutionId,
112        since_sequence: EventSequence,
113    ) -> BackendResult<Vec<Event>>;
114
115    /// Get the latest event sequence for an execution.
116    async fn latest_sequence(&self, execution_id: &ExecutionId) -> BackendResult<EventSequence>;
117
118    // ── Snapshots ────────────────────────────────────────────────────────
119
120    /// Write a snapshot.
121    async fn write_snapshot(&self, snapshot: Snapshot) -> BackendResult<()>;
122
123    /// Load the latest snapshot for an execution.
124    async fn latest_snapshot(&self, execution_id: &ExecutionId) -> BackendResult<Option<Snapshot>>;
125
126    // ── Queue (simple Postgres/SQLite backed queue in v1) ────────────────
127
128    /// Enqueue a work item for execution.
129    async fn enqueue_work_item(&self, item: WorkItem) -> BackendResult<WorkItemId>;
130
131    /// Claim the next available work item for a given queue type.
132    /// Returns None if no items are available.
133    async fn claim_work_item(
134        &self,
135        worker_id: &str,
136        queue_types: &[&str],
137    ) -> BackendResult<Option<WorkItem>>;
138
139    /// Renew the lease on a claimed work item (heartbeat).
140    async fn renew_lease(&self, item_id: WorkItemId, worker_id: &str) -> BackendResult<()>;
141
142    /// Mark a work item as completed and release the lease.
143    async fn complete_work_item(&self, item_id: WorkItemId) -> BackendResult<()>;
144
145    /// Mark a work item as failed. The scheduler will decide whether to retry.
146    async fn fail_work_item(&self, item_id: WorkItemId, error: &str) -> BackendResult<()>;
147
148    /// Reclaim work items whose lease has expired (worker crashed or stalled).
149    ///
150    /// For each expired item:
151    /// - Increments `attempt`.
152    /// - If `attempt < max_attempts`: resets to `pending` with optional backoff delay
153    ///   via `retry_after`, and returns it in the `retryable` list.
154    /// - If `attempt >= max_attempts`: moves to `dead_letter` and returns it in
155    ///   the `exhausted` list.
156    async fn reclaim_expired_leases(&self) -> BackendResult<ReclaimResult>;
157
158    /// Move a failed work item to the dead-letter queue.
159    async fn move_to_dead_letter(&self, item_id: WorkItemId, last_error: &str)
160        -> BackendResult<()>;
161
162    // ── API token auth ───────────────────────────────────────────────────
163
164    /// Create a new API token. Returns `(plaintext_token, token_info)`.
165    /// The plaintext token is only returned here; only its hash is stored.
166    async fn create_token(&self, name: &str, role: &str) -> BackendResult<(String, ApiToken)>;
167
168    /// Validate a plaintext API token. Returns the token info if valid and not revoked.
169    async fn validate_token(&self, token: &str) -> BackendResult<Option<ApiToken>>;
170
171    // ── Tenant management ───────────────────────────────────────────────
172
173    /// Create a new tenant.
174    async fn create_tenant(&self, _tenant: Tenant) -> BackendResult<()> {
175        Err(StateBackendError::Database(
176            "tenant management not supported".into(),
177        ))
178    }
179
180    /// Get a tenant by ID.
181    async fn get_tenant(&self, _id: &TenantId) -> BackendResult<Option<Tenant>> {
182        Ok(None)
183    }
184
185    /// List all tenants.
186    async fn list_tenants(&self) -> BackendResult<Vec<Tenant>> {
187        Ok(vec![])
188    }
189
190    /// Update tenant metadata (name, status, policy, limits).
191    async fn update_tenant(&self, _tenant: Tenant) -> BackendResult<()> {
192        Err(StateBackendError::Database(
193            "tenant management not supported".into(),
194        ))
195    }
196}
197
198/// Metadata for an API token (does not contain the plaintext token).
199#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
200pub struct ApiToken {
201    pub id: String,
202    pub name: String,
203    pub role: String,
204    pub created_at: chrono::DateTime<chrono::Utc>,
205    pub expires_at: Option<chrono::DateTime<chrono::Utc>>,
206    /// Tenant this token belongs to.
207    #[serde(default = "default_tenant_string")]
208    pub tenant_id: String,
209}
210
211pub type WorkItemId = uuid::Uuid;
212
213/// A unit of work dispatched to a worker queue.
214#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
215pub struct WorkItem {
216    pub id: WorkItemId,
217    pub execution_id: ExecutionId,
218    pub node_id: String,
219    pub queue_type: String,
220    pub payload: serde_json::Value,
221    pub attempt: u32,
222    /// Maximum number of attempts before moving to dead-letter queue.
223    pub max_attempts: u32,
224    pub created_at: chrono::DateTime<chrono::Utc>,
225    pub lease_expires_at: Option<chrono::DateTime<chrono::Utc>>,
226    pub worker_id: Option<String>,
227    /// Tenant that owns this work item.
228    #[serde(default = "default_tenant_string")]
229    pub tenant_id: String,
230}
231
232/// Result of reclaiming expired work item leases.
233#[derive(Debug, Default)]
234pub struct ReclaimResult {
235    /// Items whose lease expired and have been reset to `pending` for retry.
236    pub retryable: Vec<WorkItem>,
237    /// Items that exhausted all attempts and were moved to dead-letter.
238    pub exhausted: Vec<WorkItem>,
239}