1use crate::event::{Event, EventSequence};
2use crate::snapshot::Snapshot;
3use crate::tenant::{Tenant, TenantId, DEFAULT_TENANT};
4use async_trait::async_trait;
5use jamjet_core::workflow::{ExecutionId, WorkflowExecution, WorkflowStatus};
6use thiserror::Error;
7
8#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
10pub struct WorkflowDefinition {
11 pub workflow_id: String,
12 pub version: String,
13 pub ir: serde_json::Value,
15 pub created_at: chrono::DateTime<chrono::Utc>,
16 #[serde(default = "default_tenant_string")]
18 pub tenant_id: String,
19}
20
21fn default_tenant_string() -> String {
22 DEFAULT_TENANT.to_string()
23}
24
25#[derive(Debug, Error)]
26pub enum StateBackendError {
27 #[error("execution not found: {0}")]
28 NotFound(String),
29
30 #[error("optimistic concurrency conflict: sequence mismatch for {0}")]
31 SequenceConflict(String),
32
33 #[error("database error: {0}")]
34 Database(String),
35
36 #[error("serialization error: {0}")]
37 Serialization(#[from] serde_json::Error),
38}
39
40pub type BackendResult<T> = Result<T, StateBackendError>;
41
42#[async_trait]
47pub trait StateBackend: Send + Sync {
48 async fn store_workflow(&self, def: WorkflowDefinition) -> BackendResult<()>;
52
53 async fn get_workflow(
55 &self,
56 workflow_id: &str,
57 version: &str,
58 ) -> BackendResult<Option<WorkflowDefinition>>;
59
60 async fn create_execution(&self, execution: WorkflowExecution) -> BackendResult<()>;
64
65 async fn get_execution(&self, id: &ExecutionId) -> BackendResult<Option<WorkflowExecution>>;
67
68 async fn update_execution_status(
70 &self,
71 id: &ExecutionId,
72 status: WorkflowStatus,
73 ) -> BackendResult<()>;
74
75 async fn update_execution_current_state(
77 &self,
78 id: &ExecutionId,
79 current_state: &serde_json::Value,
80 ) -> BackendResult<()>;
81
82 async fn patch_append_array(
85 &self,
86 execution_id: &ExecutionId,
87 key: &str,
88 value: serde_json::Value,
89 ) -> BackendResult<()>;
90
91 async fn list_executions(
93 &self,
94 status: Option<WorkflowStatus>,
95 limit: u32,
96 offset: u32,
97 ) -> BackendResult<Vec<WorkflowExecution>>;
98
99 async fn append_event(&self, event: Event) -> BackendResult<EventSequence>;
104
105 async fn get_events(&self, execution_id: &ExecutionId) -> BackendResult<Vec<Event>>;
107
108 async fn get_events_since(
110 &self,
111 execution_id: &ExecutionId,
112 since_sequence: EventSequence,
113 ) -> BackendResult<Vec<Event>>;
114
115 async fn latest_sequence(&self, execution_id: &ExecutionId) -> BackendResult<EventSequence>;
117
118 async fn write_snapshot(&self, snapshot: Snapshot) -> BackendResult<()>;
122
123 async fn latest_snapshot(&self, execution_id: &ExecutionId) -> BackendResult<Option<Snapshot>>;
125
126 async fn enqueue_work_item(&self, item: WorkItem) -> BackendResult<WorkItemId>;
130
131 async fn claim_work_item(
134 &self,
135 worker_id: &str,
136 queue_types: &[&str],
137 ) -> BackendResult<Option<WorkItem>>;
138
139 async fn renew_lease(&self, item_id: WorkItemId, worker_id: &str) -> BackendResult<()>;
141
142 async fn complete_work_item(&self, item_id: WorkItemId) -> BackendResult<()>;
144
145 async fn fail_work_item(&self, item_id: WorkItemId, error: &str) -> BackendResult<()>;
147
148 async fn reclaim_expired_leases(&self) -> BackendResult<ReclaimResult>;
157
158 async fn move_to_dead_letter(&self, item_id: WorkItemId, last_error: &str)
160 -> BackendResult<()>;
161
162 async fn create_token(&self, name: &str, role: &str) -> BackendResult<(String, ApiToken)>;
167
168 async fn validate_token(&self, token: &str) -> BackendResult<Option<ApiToken>>;
170
171 async fn create_tenant(&self, _tenant: Tenant) -> BackendResult<()> {
175 Err(StateBackendError::Database(
176 "tenant management not supported".into(),
177 ))
178 }
179
180 async fn get_tenant(&self, _id: &TenantId) -> BackendResult<Option<Tenant>> {
182 Ok(None)
183 }
184
185 async fn list_tenants(&self) -> BackendResult<Vec<Tenant>> {
187 Ok(vec![])
188 }
189
190 async fn update_tenant(&self, _tenant: Tenant) -> BackendResult<()> {
192 Err(StateBackendError::Database(
193 "tenant management not supported".into(),
194 ))
195 }
196}
197
198#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
200pub struct ApiToken {
201 pub id: String,
202 pub name: String,
203 pub role: String,
204 pub created_at: chrono::DateTime<chrono::Utc>,
205 pub expires_at: Option<chrono::DateTime<chrono::Utc>>,
206 #[serde(default = "default_tenant_string")]
208 pub tenant_id: String,
209}
210
211pub type WorkItemId = uuid::Uuid;
212
213#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
215pub struct WorkItem {
216 pub id: WorkItemId,
217 pub execution_id: ExecutionId,
218 pub node_id: String,
219 pub queue_type: String,
220 pub payload: serde_json::Value,
221 pub attempt: u32,
222 pub max_attempts: u32,
224 pub created_at: chrono::DateTime<chrono::Utc>,
225 pub lease_expires_at: Option<chrono::DateTime<chrono::Utc>>,
226 pub worker_id: Option<String>,
227 #[serde(default = "default_tenant_string")]
229 pub tenant_id: String,
230}
231
232#[derive(Debug, Default)]
234pub struct ReclaimResult {
235 pub retryable: Vec<WorkItem>,
237 pub exhausted: Vec<WorkItem>,
239}