1use crate::backend::{
7 ApiToken, BackendResult, ReclaimResult, StateBackend, StateBackendError, WorkItem, WorkItemId,
8 WorkflowDefinition,
9};
10use crate::event::{Event, EventSequence};
11use crate::snapshot::Snapshot;
12use crate::tenant::{Tenant, TenantId};
13use async_trait::async_trait;
14use chrono::Utc;
15use dashmap::DashMap;
16use jamjet_core::workflow::{ExecutionId, WorkflowExecution, WorkflowStatus};
17use std::sync::atomic::{AtomicI64, Ordering};
18use uuid::Uuid;
19
20pub struct InMemoryBackend {
21 workflows: DashMap<(String, String), WorkflowDefinition>,
23 executions: DashMap<ExecutionId, WorkflowExecution>,
24 events: DashMap<ExecutionId, Vec<Event>>,
26 snapshots: DashMap<ExecutionId, Vec<Snapshot>>,
28 work_items: DashMap<WorkItemId, WorkItem>,
30 dead_letter: DashMap<WorkItemId, WorkItem>,
32 tokens: DashMap<String, ApiToken>,
34 tenants: DashMap<TenantId, Tenant>,
35 next_sequence: AtomicI64,
37}
38
39impl InMemoryBackend {
40 pub fn new() -> Self {
41 Self {
42 workflows: DashMap::new(),
43 executions: DashMap::new(),
44 events: DashMap::new(),
45 snapshots: DashMap::new(),
46 work_items: DashMap::new(),
47 dead_letter: DashMap::new(),
48 tokens: DashMap::new(),
49 tenants: DashMap::new(),
50 next_sequence: AtomicI64::new(1),
51 }
52 }
53}
54
55impl Default for InMemoryBackend {
56 fn default() -> Self {
57 Self::new()
58 }
59}
60
61#[async_trait]
62impl StateBackend for InMemoryBackend {
63 async fn store_workflow(&self, def: WorkflowDefinition) -> BackendResult<()> {
66 self.workflows
67 .insert((def.workflow_id.clone(), def.version.clone()), def);
68 Ok(())
69 }
70
71 async fn get_workflow(
72 &self,
73 workflow_id: &str,
74 version: &str,
75 ) -> BackendResult<Option<WorkflowDefinition>> {
76 Ok(self
77 .workflows
78 .get(&(workflow_id.to_string(), version.to_string()))
79 .map(|r| r.value().clone()))
80 }
81
82 async fn create_execution(&self, execution: WorkflowExecution) -> BackendResult<()> {
85 self.executions
86 .insert(execution.execution_id.clone(), execution);
87 Ok(())
88 }
89
90 async fn get_execution(&self, id: &ExecutionId) -> BackendResult<Option<WorkflowExecution>> {
91 Ok(self.executions.get(id).map(|r| r.value().clone()))
92 }
93
94 async fn update_execution_status(
95 &self,
96 id: &ExecutionId,
97 status: WorkflowStatus,
98 ) -> BackendResult<()> {
99 match self.executions.get_mut(id) {
100 Some(mut entry) => {
101 entry.status = status;
102 entry.updated_at = Utc::now();
103 Ok(())
104 }
105 None => Err(StateBackendError::NotFound(id.to_string())),
106 }
107 }
108
109 async fn update_execution_current_state(
110 &self,
111 id: &ExecutionId,
112 current_state: &serde_json::Value,
113 ) -> BackendResult<()> {
114 match self.executions.get_mut(id) {
115 Some(mut entry) => {
116 entry.current_state = current_state.clone();
117 entry.updated_at = Utc::now();
118 Ok(())
119 }
120 None => Err(StateBackendError::NotFound(id.to_string())),
121 }
122 }
123
124 async fn patch_append_array(
125 &self,
126 execution_id: &ExecutionId,
127 key: &str,
128 value: serde_json::Value,
129 ) -> BackendResult<()> {
130 match self.executions.get_mut(execution_id) {
131 Some(mut entry) => {
132 let state = &mut entry.current_state;
133 if let Some(obj) = state.as_object_mut() {
134 let arr = obj
135 .entry(key.to_string())
136 .or_insert_with(|| serde_json::Value::Array(vec![]));
137 if let Some(a) = arr.as_array_mut() {
138 a.push(value);
139 }
140 }
141 entry.updated_at = Utc::now();
142 Ok(())
143 }
144 None => Err(StateBackendError::NotFound(execution_id.to_string())),
145 }
146 }
147
148 async fn list_executions(
149 &self,
150 status: Option<WorkflowStatus>,
151 limit: u32,
152 offset: u32,
153 ) -> BackendResult<Vec<WorkflowExecution>> {
154 let mut results: Vec<WorkflowExecution> = self
155 .executions
156 .iter()
157 .filter(|r| {
158 if let Some(ref s) = status {
159 &r.value().status == s
160 } else {
161 true
162 }
163 })
164 .map(|r| r.value().clone())
165 .collect();
166 results.sort_by(|a, b| b.started_at.cmp(&a.started_at));
167 Ok(results
168 .into_iter()
169 .skip(offset as usize)
170 .take(limit as usize)
171 .collect())
172 }
173
174 async fn append_event(&self, mut event: Event) -> BackendResult<EventSequence> {
177 let seq = self.next_sequence.fetch_add(1, Ordering::SeqCst);
178 event.sequence = seq;
179 self.events
180 .entry(event.execution_id.clone())
181 .or_default()
182 .push(event);
183 Ok(seq)
184 }
185
186 async fn get_events(&self, execution_id: &ExecutionId) -> BackendResult<Vec<Event>> {
187 Ok(self
188 .events
189 .get(execution_id)
190 .map(|r| r.value().clone())
191 .unwrap_or_default())
192 }
193
194 async fn get_events_since(
195 &self,
196 execution_id: &ExecutionId,
197 since_sequence: EventSequence,
198 ) -> BackendResult<Vec<Event>> {
199 Ok(self
200 .events
201 .get(execution_id)
202 .map(|r| {
203 r.value()
204 .iter()
205 .filter(|e| e.sequence > since_sequence)
206 .cloned()
207 .collect()
208 })
209 .unwrap_or_default())
210 }
211
212 async fn latest_sequence(&self, execution_id: &ExecutionId) -> BackendResult<EventSequence> {
213 Ok(self
214 .events
215 .get(execution_id)
216 .and_then(|r| r.value().last().map(|e| e.sequence))
217 .unwrap_or(0))
218 }
219
220 async fn write_snapshot(&self, snapshot: Snapshot) -> BackendResult<()> {
223 self.snapshots
224 .entry(snapshot.execution_id.clone())
225 .or_default()
226 .push(snapshot);
227 Ok(())
228 }
229
230 async fn latest_snapshot(&self, execution_id: &ExecutionId) -> BackendResult<Option<Snapshot>> {
231 Ok(self
232 .snapshots
233 .get(execution_id)
234 .and_then(|r| r.value().last().cloned()))
235 }
236
237 async fn enqueue_work_item(&self, item: WorkItem) -> BackendResult<WorkItemId> {
240 let id = item.id;
241 self.work_items.insert(id, item);
242 Ok(id)
243 }
244
245 async fn claim_work_item(
246 &self,
247 worker_id: &str,
248 queue_types: &[&str],
249 ) -> BackendResult<Option<WorkItem>> {
250 for mut entry in self.work_items.iter_mut() {
251 let item = entry.value_mut();
252 if item.worker_id.is_none() && queue_types.contains(&item.queue_type.as_str()) {
253 item.worker_id = Some(worker_id.to_string());
254 item.lease_expires_at = Some(Utc::now() + chrono::Duration::seconds(30));
255 return Ok(Some(item.clone()));
256 }
257 }
258 Ok(None)
259 }
260
261 async fn renew_lease(&self, item_id: WorkItemId, worker_id: &str) -> BackendResult<()> {
262 match self.work_items.get_mut(&item_id) {
263 Some(mut entry) => {
264 if entry.worker_id.as_deref() == Some(worker_id) {
265 entry.lease_expires_at = Some(Utc::now() + chrono::Duration::seconds(30));
266 Ok(())
267 } else {
268 Err(StateBackendError::NotFound(format!(
269 "lease not held by {worker_id}"
270 )))
271 }
272 }
273 None => Err(StateBackendError::NotFound(item_id.to_string())),
274 }
275 }
276
277 async fn complete_work_item(&self, item_id: WorkItemId) -> BackendResult<()> {
278 self.work_items.remove(&item_id);
279 Ok(())
280 }
281
282 async fn fail_work_item(&self, item_id: WorkItemId, error: &str) -> BackendResult<()> {
283 match self.work_items.get_mut(&item_id) {
284 Some(mut entry) => {
285 entry.attempt += 1;
286 entry.worker_id = None;
287 entry.lease_expires_at = None;
288 if let Some(obj) = entry.payload.as_object_mut() {
289 obj.insert(
290 "last_error".into(),
291 serde_json::Value::String(error.to_string()),
292 );
293 }
294 Ok(())
295 }
296 None => Err(StateBackendError::NotFound(item_id.to_string())),
297 }
298 }
299
300 async fn reclaim_expired_leases(&self) -> BackendResult<ReclaimResult> {
301 let now = Utc::now();
302 let mut result = ReclaimResult::default();
303 let mut to_dead_letter = vec![];
304
305 for mut entry in self.work_items.iter_mut() {
306 let item = entry.value_mut();
307 if let Some(expires) = item.lease_expires_at {
308 if expires < now && item.worker_id.is_some() {
309 item.attempt += 1;
310 if item.attempt < item.max_attempts {
311 item.worker_id = None;
312 item.lease_expires_at = None;
313 result.retryable.push(item.clone());
314 } else {
315 to_dead_letter.push(item.clone());
316 }
317 }
318 }
319 }
320
321 for item in to_dead_letter {
322 self.work_items.remove(&item.id);
323 result.exhausted.push(item.clone());
324 self.dead_letter.insert(item.id, item);
325 }
326
327 Ok(result)
328 }
329
330 async fn move_to_dead_letter(
331 &self,
332 item_id: WorkItemId,
333 _last_error: &str,
334 ) -> BackendResult<()> {
335 if let Some((_, item)) = self.work_items.remove(&item_id) {
336 self.dead_letter.insert(item_id, item);
337 }
338 Ok(())
339 }
340
341 async fn create_token(&self, name: &str, role: &str) -> BackendResult<(String, ApiToken)> {
344 let plaintext = format!("jj_{}", Uuid::new_v4().to_string().replace('-', ""));
345 let token = ApiToken {
346 id: Uuid::new_v4().to_string(),
347 name: name.to_string(),
348 role: role.to_string(),
349 created_at: Utc::now(),
350 expires_at: None,
351 tenant_id: crate::tenant::DEFAULT_TENANT.to_string(),
352 };
353 self.tokens.insert(plaintext.clone(), token.clone());
354 Ok((plaintext, token))
355 }
356
357 async fn validate_token(&self, token: &str) -> BackendResult<Option<ApiToken>> {
358 Ok(self.tokens.get(token).map(|r| r.value().clone()))
359 }
360
361 async fn create_tenant(&self, tenant: Tenant) -> BackendResult<()> {
364 self.tenants.insert(tenant.id.clone(), tenant);
365 Ok(())
366 }
367
368 async fn get_tenant(&self, id: &TenantId) -> BackendResult<Option<Tenant>> {
369 Ok(self.tenants.get(id).map(|r| r.value().clone()))
370 }
371
372 async fn list_tenants(&self) -> BackendResult<Vec<Tenant>> {
373 Ok(self.tenants.iter().map(|r| r.value().clone()).collect())
374 }
375
376 async fn update_tenant(&self, tenant: Tenant) -> BackendResult<()> {
377 self.tenants.insert(tenant.id.clone(), tenant);
378 Ok(())
379 }
380}