Skip to main content

entelix_graph/
in_memory_checkpointer.rs

//! `InMemoryCheckpointer` — in-process `Checkpointer<S>` impl.
//!
//! Uses `parking_lot::Mutex` internally for synchronous access; the
//! trait methods are async only to match the [`Checkpointer`]
//! interface — none of them await, each call just takes the mutex
//! briefly and returns. Suited for single-process tests and
//! short-lived agents where durability across crashes is not
//! required. Postgres- and Redis-backed checkpointers live in
//! `entelix-persistence`.
9
10use std::collections::HashMap;
11use std::sync::Arc;
12
13use async_trait::async_trait;
14use entelix_core::{Error, Result, TenantId, ThreadKey};
15use parking_lot::Mutex;
16
17use crate::checkpoint::{Checkpoint, CheckpointId, Checkpointer};
18
19/// Internal partition key — bypasses the public `ThreadKey` so the
20/// `HashMap` can be cheaply cloned for entry lookups without
21/// constructing a new `ThreadKey` per operation. Cloning a
22/// [`TenantId`] is an `Arc<str>` refcount bump, so the partition
23/// remains cheap to materialise.
24type Partition = (TenantId, String);
25
26fn partition(key: &ThreadKey) -> Partition {
27    (key.tenant_id().clone(), key.thread_id().to_owned())
28}
29
30/// In-process checkpointer backed by a
31/// `HashMap<(tenant_id, thread_id), Vec<Checkpoint>>`. The
32/// composite key encodes Invariant 11 (multi-tenant isolation):
33/// the same `thread_id` under two tenants resolves to two distinct
34/// histories.
35///
36/// Cheap to clone — internal state is `Arc<Mutex<...>>`-shared.
37#[derive(Clone)]
38pub struct InMemoryCheckpointer<S>
39where
40    S: Clone + Send + Sync + 'static,
41{
42    inner: Arc<Mutex<HashMap<Partition, Vec<Checkpoint<S>>>>>,
43}
44
45impl<S> InMemoryCheckpointer<S>
46where
47    S: Clone + Send + Sync + 'static,
48{
49    /// Empty checkpointer.
50    pub fn new() -> Self {
51        Self {
52            inner: Arc::new(Mutex::new(HashMap::new())),
53        }
54    }
55
56    /// Total number of checkpoints stored across all
57    /// `(tenant_id, thread_id)` partitions. Test helper.
58    pub fn total_checkpoints(&self) -> usize {
59        self.inner.lock().values().map(Vec::len).sum()
60    }
61
62    /// Number of distinct `(tenant_id, thread_id)` partitions that
63    /// have at least one checkpoint.
64    pub fn thread_count(&self) -> usize {
65        self.inner.lock().len()
66    }
67}
68
69impl<S> Default for InMemoryCheckpointer<S>
70where
71    S: Clone + Send + Sync + 'static,
72{
73    fn default() -> Self {
74        Self::new()
75    }
76}
77
78#[async_trait]
79impl<S> Checkpointer<S> for InMemoryCheckpointer<S>
80where
81    S: Clone + Send + Sync + 'static,
82{
83    async fn put(&self, checkpoint: Checkpoint<S>) -> Result<()> {
84        let key = (checkpoint.tenant_id.clone(), checkpoint.thread_id.clone());
85        // Vec::push may reallocate, dropping the previous backing
86        // buffer (and therefore each `S` it held) while we hold the
87        // mutex. Per the `Checkpointer` trait contract `S::drop` does
88        // not block, so this is safe — but worth flagging for
89        // implementors of new state types.
90        self.inner.lock().entry(key).or_default().push(checkpoint);
91        Ok(())
92    }
93
94    async fn get_latest(&self, key: &ThreadKey) -> Result<Option<Checkpoint<S>>> {
95        let guard = self.inner.lock();
96        Ok(guard
97            .get(&partition(key))
98            .and_then(|history| history.last().cloned()))
99    }
100
101    async fn get_by_id(&self, key: &ThreadKey, id: &CheckpointId) -> Result<Option<Checkpoint<S>>> {
102        let guard = self.inner.lock();
103        Ok(guard
104            .get(&partition(key))
105            .and_then(|h| h.iter().find(|cp| &cp.id == id).cloned()))
106    }
107
108    async fn list_history(&self, key: &ThreadKey, limit: usize) -> Result<Vec<Checkpoint<S>>> {
109        let guard = self.inner.lock();
110        Ok(guard
111            .get(&partition(key))
112            .map(|h| h.iter().rev().take(limit).cloned().collect::<Vec<_>>())
113            .unwrap_or_default())
114    }
115
116    async fn update_state(
117        &self,
118        key: &ThreadKey,
119        parent_id: &CheckpointId,
120        new_state: S,
121    ) -> Result<CheckpointId> {
122        let part = partition(key);
123        // Look up the parent's bits, drop the read guard, then build
124        // the error or new checkpoint outside the lock scope.
125        let parent_bits: Option<(Option<String>, usize)> = {
126            let guard = self.inner.lock();
127            guard
128                .get(&part)
129                .and_then(|h| h.iter().find(|cp| &cp.id == parent_id))
130                .map(|cp| (cp.next_node.clone(), cp.step.saturating_add(1)))
131        };
132        let (next_node, step) = parent_bits.ok_or_else(|| {
133            Error::invalid_request(format!(
134                "InMemoryCheckpointer::update_state: unknown parent_id in tenant '{}' thread '{}'",
135                key.tenant_id(),
136                key.thread_id()
137            ))
138        })?;
139        let new_checkpoint =
140            Checkpoint::new(key, step, new_state, next_node).with_parent(parent_id.clone());
141        let new_id = new_checkpoint.id.clone();
142        self.inner
143            .lock()
144            .entry(part)
145            .or_default()
146            .push(new_checkpoint);
147        Ok(new_id)
148    }
149}