Skip to main content

jamjet_state/
memory.rs

1//! In-memory state backend — no persistence, no external dependencies.
2//!
3//! Suitable for sandbox deployments (Glama), integration testing, and
4//! quick prototyping where durability is not needed.
5
6use crate::backend::{
7    ApiToken, BackendResult, ReclaimResult, StateBackend, StateBackendError, WorkItem, WorkItemId,
8    WorkflowDefinition,
9};
10use crate::event::{Event, EventSequence};
11use crate::snapshot::Snapshot;
12use crate::tenant::{Tenant, TenantId};
13use async_trait::async_trait;
14use chrono::Utc;
15use dashmap::DashMap;
16use jamjet_core::workflow::{ExecutionId, WorkflowExecution, WorkflowStatus};
17use std::sync::atomic::{AtomicI64, Ordering};
18use uuid::Uuid;
19
/// State backend that keeps every record in process memory.
///
/// Each collection is a concurrent `DashMap`, and event sequence numbers are
/// drawn from one atomic counter shared by all executions. Nothing is
/// persisted, so the contents disappear when the value is dropped.
pub struct InMemoryBackend {
    /// Workflow definitions keyed by `(workflow_id, version)`.
    workflows: DashMap<(String, String), WorkflowDefinition>,
    /// Workflow executions keyed by their execution id.
    executions: DashMap<ExecutionId, WorkflowExecution>,
    /// Per-execution event log; events are appended at the end of the list.
    events: DashMap<ExecutionId, Vec<Event>>,
    /// Per-execution snapshots in write order; the last entry is the latest.
    snapshots: DashMap<ExecutionId, Vec<Snapshot>>,
    /// Work queue: live (claimed or unclaimed) items keyed by item id.
    work_items: DashMap<WorkItemId, WorkItem>,
    /// Dead-letter queue: items removed from `work_items` for good.
    dead_letter: DashMap<WorkItemId, WorkItem>,
    /// API tokens keyed by the plaintext token string.
    tokens: DashMap<String, ApiToken>,
    /// Tenants keyed by tenant id.
    tenants: DashMap<TenantId, Tenant>,
    /// Global event sequence counter (monotonic across all executions).
    next_sequence: AtomicI64,
}
38
39impl InMemoryBackend {
40    pub fn new() -> Self {
41        Self {
42            workflows: DashMap::new(),
43            executions: DashMap::new(),
44            events: DashMap::new(),
45            snapshots: DashMap::new(),
46            work_items: DashMap::new(),
47            dead_letter: DashMap::new(),
48            tokens: DashMap::new(),
49            tenants: DashMap::new(),
50            next_sequence: AtomicI64::new(1),
51        }
52    }
53}
54
55impl Default for InMemoryBackend {
56    fn default() -> Self {
57        Self::new()
58    }
59}
60
61#[async_trait]
62impl StateBackend for InMemoryBackend {
63    // ── Workflow definitions ─────────────────────────────────────────
64
65    async fn store_workflow(&self, def: WorkflowDefinition) -> BackendResult<()> {
66        self.workflows
67            .insert((def.workflow_id.clone(), def.version.clone()), def);
68        Ok(())
69    }
70
71    async fn get_workflow(
72        &self,
73        workflow_id: &str,
74        version: &str,
75    ) -> BackendResult<Option<WorkflowDefinition>> {
76        Ok(self
77            .workflows
78            .get(&(workflow_id.to_string(), version.to_string()))
79            .map(|r| r.value().clone()))
80    }
81
82    // ── Workflow executions ──────────────────────────────────────────
83
84    async fn create_execution(&self, execution: WorkflowExecution) -> BackendResult<()> {
85        self.executions
86            .insert(execution.execution_id.clone(), execution);
87        Ok(())
88    }
89
90    async fn get_execution(&self, id: &ExecutionId) -> BackendResult<Option<WorkflowExecution>> {
91        Ok(self.executions.get(id).map(|r| r.value().clone()))
92    }
93
94    async fn update_execution_status(
95        &self,
96        id: &ExecutionId,
97        status: WorkflowStatus,
98    ) -> BackendResult<()> {
99        match self.executions.get_mut(id) {
100            Some(mut entry) => {
101                entry.status = status;
102                entry.updated_at = Utc::now();
103                Ok(())
104            }
105            None => Err(StateBackendError::NotFound(id.to_string())),
106        }
107    }
108
109    async fn update_execution_current_state(
110        &self,
111        id: &ExecutionId,
112        current_state: &serde_json::Value,
113    ) -> BackendResult<()> {
114        match self.executions.get_mut(id) {
115            Some(mut entry) => {
116                entry.current_state = current_state.clone();
117                entry.updated_at = Utc::now();
118                Ok(())
119            }
120            None => Err(StateBackendError::NotFound(id.to_string())),
121        }
122    }
123
124    async fn patch_append_array(
125        &self,
126        execution_id: &ExecutionId,
127        key: &str,
128        value: serde_json::Value,
129    ) -> BackendResult<()> {
130        match self.executions.get_mut(execution_id) {
131            Some(mut entry) => {
132                let state = &mut entry.current_state;
133                if let Some(obj) = state.as_object_mut() {
134                    let arr = obj
135                        .entry(key.to_string())
136                        .or_insert_with(|| serde_json::Value::Array(vec![]));
137                    if let Some(a) = arr.as_array_mut() {
138                        a.push(value);
139                    }
140                }
141                entry.updated_at = Utc::now();
142                Ok(())
143            }
144            None => Err(StateBackendError::NotFound(execution_id.to_string())),
145        }
146    }
147
148    async fn list_executions(
149        &self,
150        status: Option<WorkflowStatus>,
151        limit: u32,
152        offset: u32,
153    ) -> BackendResult<Vec<WorkflowExecution>> {
154        let mut results: Vec<WorkflowExecution> = self
155            .executions
156            .iter()
157            .filter(|r| {
158                if let Some(ref s) = status {
159                    &r.value().status == s
160                } else {
161                    true
162                }
163            })
164            .map(|r| r.value().clone())
165            .collect();
166        results.sort_by(|a, b| b.started_at.cmp(&a.started_at));
167        Ok(results
168            .into_iter()
169            .skip(offset as usize)
170            .take(limit as usize)
171            .collect())
172    }
173
174    // ── Event log ────────────────────────────────────────────────────
175
176    async fn append_event(&self, mut event: Event) -> BackendResult<EventSequence> {
177        let seq = self.next_sequence.fetch_add(1, Ordering::SeqCst);
178        event.sequence = seq;
179        self.events
180            .entry(event.execution_id.clone())
181            .or_default()
182            .push(event);
183        Ok(seq)
184    }
185
186    async fn get_events(&self, execution_id: &ExecutionId) -> BackendResult<Vec<Event>> {
187        Ok(self
188            .events
189            .get(execution_id)
190            .map(|r| r.value().clone())
191            .unwrap_or_default())
192    }
193
194    async fn get_events_since(
195        &self,
196        execution_id: &ExecutionId,
197        since_sequence: EventSequence,
198    ) -> BackendResult<Vec<Event>> {
199        Ok(self
200            .events
201            .get(execution_id)
202            .map(|r| {
203                r.value()
204                    .iter()
205                    .filter(|e| e.sequence > since_sequence)
206                    .cloned()
207                    .collect()
208            })
209            .unwrap_or_default())
210    }
211
212    async fn latest_sequence(&self, execution_id: &ExecutionId) -> BackendResult<EventSequence> {
213        Ok(self
214            .events
215            .get(execution_id)
216            .and_then(|r| r.value().last().map(|e| e.sequence))
217            .unwrap_or(0))
218    }
219
220    // ── Snapshots ────────────────────────────────────────────────────
221
222    async fn write_snapshot(&self, snapshot: Snapshot) -> BackendResult<()> {
223        self.snapshots
224            .entry(snapshot.execution_id.clone())
225            .or_default()
226            .push(snapshot);
227        Ok(())
228    }
229
230    async fn latest_snapshot(&self, execution_id: &ExecutionId) -> BackendResult<Option<Snapshot>> {
231        Ok(self
232            .snapshots
233            .get(execution_id)
234            .and_then(|r| r.value().last().cloned()))
235    }
236
237    // ── Work queue ───────────────────────────────────────────────────
238
239    async fn enqueue_work_item(&self, item: WorkItem) -> BackendResult<WorkItemId> {
240        let id = item.id;
241        self.work_items.insert(id, item);
242        Ok(id)
243    }
244
245    async fn claim_work_item(
246        &self,
247        worker_id: &str,
248        queue_types: &[&str],
249    ) -> BackendResult<Option<WorkItem>> {
250        for mut entry in self.work_items.iter_mut() {
251            let item = entry.value_mut();
252            if item.worker_id.is_none() && queue_types.contains(&item.queue_type.as_str()) {
253                item.worker_id = Some(worker_id.to_string());
254                item.lease_expires_at = Some(Utc::now() + chrono::Duration::seconds(30));
255                return Ok(Some(item.clone()));
256            }
257        }
258        Ok(None)
259    }
260
261    async fn renew_lease(&self, item_id: WorkItemId, worker_id: &str) -> BackendResult<()> {
262        match self.work_items.get_mut(&item_id) {
263            Some(mut entry) => {
264                if entry.worker_id.as_deref() == Some(worker_id) {
265                    entry.lease_expires_at = Some(Utc::now() + chrono::Duration::seconds(30));
266                    Ok(())
267                } else {
268                    Err(StateBackendError::NotFound(format!(
269                        "lease not held by {worker_id}"
270                    )))
271                }
272            }
273            None => Err(StateBackendError::NotFound(item_id.to_string())),
274        }
275    }
276
277    async fn complete_work_item(&self, item_id: WorkItemId) -> BackendResult<()> {
278        self.work_items.remove(&item_id);
279        Ok(())
280    }
281
282    async fn fail_work_item(&self, item_id: WorkItemId, error: &str) -> BackendResult<()> {
283        match self.work_items.get_mut(&item_id) {
284            Some(mut entry) => {
285                entry.attempt += 1;
286                entry.worker_id = None;
287                entry.lease_expires_at = None;
288                if let Some(obj) = entry.payload.as_object_mut() {
289                    obj.insert(
290                        "last_error".into(),
291                        serde_json::Value::String(error.to_string()),
292                    );
293                }
294                Ok(())
295            }
296            None => Err(StateBackendError::NotFound(item_id.to_string())),
297        }
298    }
299
300    async fn reclaim_expired_leases(&self) -> BackendResult<ReclaimResult> {
301        let now = Utc::now();
302        let mut result = ReclaimResult::default();
303        let mut to_dead_letter = vec![];
304
305        for mut entry in self.work_items.iter_mut() {
306            let item = entry.value_mut();
307            if let Some(expires) = item.lease_expires_at {
308                if expires < now && item.worker_id.is_some() {
309                    item.attempt += 1;
310                    if item.attempt < item.max_attempts {
311                        item.worker_id = None;
312                        item.lease_expires_at = None;
313                        result.retryable.push(item.clone());
314                    } else {
315                        to_dead_letter.push(item.clone());
316                    }
317                }
318            }
319        }
320
321        for item in to_dead_letter {
322            self.work_items.remove(&item.id);
323            result.exhausted.push(item.clone());
324            self.dead_letter.insert(item.id, item);
325        }
326
327        Ok(result)
328    }
329
330    async fn move_to_dead_letter(
331        &self,
332        item_id: WorkItemId,
333        _last_error: &str,
334    ) -> BackendResult<()> {
335        if let Some((_, item)) = self.work_items.remove(&item_id) {
336            self.dead_letter.insert(item_id, item);
337        }
338        Ok(())
339    }
340
341    // ── API tokens ───────────────────────────────────────────────────
342
343    async fn create_token(&self, name: &str, role: &str) -> BackendResult<(String, ApiToken)> {
344        let plaintext = format!("jj_{}", Uuid::new_v4().to_string().replace('-', ""));
345        let token = ApiToken {
346            id: Uuid::new_v4().to_string(),
347            name: name.to_string(),
348            role: role.to_string(),
349            created_at: Utc::now(),
350            expires_at: None,
351            tenant_id: crate::tenant::DEFAULT_TENANT.to_string(),
352        };
353        self.tokens.insert(plaintext.clone(), token.clone());
354        Ok((plaintext, token))
355    }
356
357    async fn validate_token(&self, token: &str) -> BackendResult<Option<ApiToken>> {
358        Ok(self.tokens.get(token).map(|r| r.value().clone()))
359    }
360
361    // ── Tenants ──────────────────────────────────────────────────────
362
363    async fn create_tenant(&self, tenant: Tenant) -> BackendResult<()> {
364        self.tenants.insert(tenant.id.clone(), tenant);
365        Ok(())
366    }
367
368    async fn get_tenant(&self, id: &TenantId) -> BackendResult<Option<Tenant>> {
369        Ok(self.tenants.get(id).map(|r| r.value().clone()))
370    }
371
372    async fn list_tenants(&self) -> BackendResult<Vec<Tenant>> {
373        Ok(self.tenants.iter().map(|r| r.value().clone()).collect())
374    }
375
376    async fn update_tenant(&self, tenant: Tenant) -> BackendResult<()> {
377        self.tenants.insert(tenant.id.clone(), tenant);
378        Ok(())
379    }
380}