Skip to main content

bob_adapters/
journal_memory.rs

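//! # In-Memory Journal Implementations
//!
//! In-memory implementations of the journal ports: the tool journal is backed
//! by an `scc::HashMap` of per-session `Vec`s, and the activity journal by a
//! single `Vec` behind a `tokio::sync::Mutex`.
//!
//! Suitable for development and single-process deployments.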
6
7use std::sync::atomic::{AtomicU64, Ordering};
8
9use bob_core::{
10    error::StoreError,
11    journal::{JournalEntry, ToolJournalPort},
12    types::{ActivityEntry, ActivityQuery, SessionId},
13};
14
15/// In-memory tool call journal.
16///
17/// Entries are stored per-session in a lock-free concurrent map.
18#[derive(Debug)]
19pub struct InMemoryToolJournal {
20    inner: scc::HashMap<SessionId, Vec<JournalEntry>>,
21}
22
23impl Default for InMemoryToolJournal {
24    fn default() -> Self {
25        Self::new()
26    }
27}
28
29impl InMemoryToolJournal {
30    /// Create an empty journal.
31    #[must_use]
32    pub fn new() -> Self {
33        Self { inner: scc::HashMap::new() }
34    }
35}
36
37#[async_trait::async_trait]
38impl ToolJournalPort for InMemoryToolJournal {
39    async fn append(&self, entry: JournalEntry) -> Result<(), StoreError> {
40        let session_id = entry.session_id.clone();
41        let map_entry = self.inner.entry_async(session_id).await;
42        match map_entry {
43            scc::hash_map::Entry::Occupied(mut occ) => {
44                occ.get_mut().push(entry);
45            }
46            scc::hash_map::Entry::Vacant(vac) => {
47                let _ = vac.insert_entry(vec![entry]);
48            }
49        }
50        Ok(())
51    }
52
53    async fn lookup(
54        &self,
55        session_id: &SessionId,
56        fingerprint: &str,
57    ) -> Result<Option<JournalEntry>, StoreError> {
58        let found = self
59            .inner
60            .read_async(session_id, |_k, entries| {
61                entries.iter().find(|e| e.call_fingerprint == fingerprint).cloned()
62            })
63            .await;
64        Ok(found.flatten())
65    }
66
67    async fn entries(&self, session_id: &SessionId) -> Result<Vec<JournalEntry>, StoreError> {
68        let entries =
69            self.inner.read_async(session_id, |_k, v| v.clone()).await.unwrap_or_default();
70        Ok(entries)
71    }
72}
73
74// ── In-Memory Activity Journal ────────────────────────────────────────
75
76/// In-memory activity journal for testing and development.
77///
78/// Entries are stored in a `Vec` behind a `tokio::sync::Mutex` for
79/// simplicity. Time-window queries filter over the full vector.
80#[derive(Debug)]
81pub struct MemoryActivityJournal {
82    entries: tokio::sync::Mutex<Vec<ActivityEntry>>,
83    count: AtomicU64,
84}
85
86impl Default for MemoryActivityJournal {
87    fn default() -> Self {
88        Self::new()
89    }
90}
91
92impl MemoryActivityJournal {
93    /// Create an empty activity journal.
94    #[must_use]
95    pub fn new() -> Self {
96        Self { entries: tokio::sync::Mutex::new(Vec::new()), count: AtomicU64::new(0) }
97    }
98}
99
100#[async_trait::async_trait]
101impl bob_core::ports::ActivityJournalPort for MemoryActivityJournal {
102    async fn append(&self, entry: ActivityEntry) -> Result<(), StoreError> {
103        let mut entries = self.entries.lock().await;
104        entries.push(entry);
105        self.count.fetch_add(1, Ordering::Relaxed);
106        Ok(())
107    }
108
109    async fn query(&self, query: &ActivityQuery) -> Result<Vec<ActivityEntry>, StoreError> {
110        let entries = self.entries.lock().await;
111        let lower = query.lower_bound_ms();
112        let upper = query.upper_bound_ms();
113
114        let results: Vec<ActivityEntry> = entries
115            .iter()
116            .filter(|e| e.timestamp_ms >= lower && e.timestamp_ms <= upper)
117            .filter(|e| query.role_filter.as_ref().is_none_or(|role| e.role == *role))
118            .cloned()
119            .collect();
120
121        Ok(results)
122    }
123
124    async fn count(&self) -> Result<u64, StoreError> {
125        Ok(self.count.load(Ordering::Relaxed))
126    }
127}
128
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use bob_core::{journal::JournalEntry, ports::ActivityJournalPort};

    use super::*;

    // ── Fixtures ──────────────────────────────────────────────────

    /// Build a non-error tool journal entry with timestamp 0 whose
    /// fingerprint is derived from `tool_name` and `arguments`.
    fn tool_entry(
        session_id: &SessionId,
        tool_name: &str,
        arguments: serde_json::Value,
        result: serde_json::Value,
    ) -> JournalEntry {
        JournalEntry {
            session_id: session_id.clone(),
            call_fingerprint: JournalEntry::fingerprint(tool_name, &arguments),
            tool_name: tool_name.to_string(),
            arguments,
            result,
            is_error: false,
            timestamp_ms: 0,
        }
    }

    /// Build an activity entry at timestamp 0 in session `"s"` with no event
    /// type and no metadata. Tests override individual fields with
    /// struct-update syntax (`ActivityEntry { timestamp_ms: 5, ..activity(..) }`).
    fn activity(role: &'static str, content: &'static str) -> ActivityEntry {
        ActivityEntry {
            timestamp_ms: 0,
            session_key: "s".into(),
            role: role.into(),
            content: content.into(),
            event_type: None,
            metadata: None,
        }
    }

    // ── InMemoryToolJournal tests ─────────────────────────────────

    #[tokio::test]
    async fn append_and_lookup_roundtrip() {
        let journal = InMemoryToolJournal::new();
        let sid = "sess-1".to_string();
        let args = serde_json::json!({"path": "/tmp/a"});
        let fp = JournalEntry::fingerprint("read_file", &args);

        let entry = tool_entry(&sid, "read_file", args, serde_json::json!({"content": "hello"}));
        assert!(journal.append(entry).await.is_ok(), "append should succeed");

        let found = journal.lookup(&sid, &fp).await.ok().flatten();
        assert!(found.is_some(), "should find the appended entry");
        assert_eq!(found.as_ref().map(|e| e.tool_name.as_str()), Some("read_file"));
    }

    #[tokio::test]
    async fn lookup_returns_none_for_missing() {
        let journal = InMemoryToolJournal::new();
        let found =
            journal.lookup(&"missing".to_string(), "nonexistent:fingerprint").await.ok().flatten();
        assert!(found.is_none(), "should return None for missing session");
    }

    #[tokio::test]
    async fn entries_returns_all_for_session() {
        let journal = InMemoryToolJournal::new();
        let sid = "sess-2".to_string();

        for i in 0..3 {
            let entry =
                tool_entry(&sid, "tool", serde_json::json!({"i": i}), serde_json::json!({"i": i}));
            assert!(journal.append(entry).await.is_ok(), "append should succeed");
        }

        let all = journal.entries(&sid).await.ok().unwrap_or_default();
        assert_eq!(all.len(), 3);
    }

    #[tokio::test]
    async fn arc_dyn_journal_works() {
        let journal: Arc<dyn ToolJournalPort> = Arc::new(InMemoryToolJournal::new());
        let sid = "sess-arc".to_string();

        let entry = tool_entry(&sid, "t", serde_json::json!({}), serde_json::json!(null));
        assert!(journal.append(entry).await.is_ok(), "append should succeed");

        // Recompute the fingerprint from scratch: lookup must not depend on
        // reusing the exact value stored in the entry.
        let found = journal
            .lookup(&sid, &JournalEntry::fingerprint("t", &serde_json::json!({})))
            .await
            .ok()
            .flatten();
        assert!(found.is_some());
    }

    // ── MemoryActivityJournal tests ───────────────────────────────

    #[tokio::test]
    async fn activity_append_and_query_roundtrip() {
        let journal = MemoryActivityJournal::new();

        let entry = ActivityEntry {
            timestamp_ms: 1_000_000,
            session_key: "sess-1".into(),
            ..activity("user", "hello world")
        };
        assert!(journal.append(entry).await.is_ok(), "append should succeed");

        let query = ActivityQuery { anchor_ms: 1_000_000, window_minutes: 10, role_filter: None };
        let results = journal.query(&query).await;
        assert!(results.is_ok(), "query should succeed");
        let results = results.unwrap_or_default();
        assert_eq!(results.len(), 1);
        assert_eq!(results[0].content, "hello world");
    }

    #[tokio::test]
    async fn activity_time_window_filtering() {
        let journal = MemoryActivityJournal::new();

        let fixtures = [
            ActivityEntry { timestamp_ms: 0, ..activity("user", "early") },
            ActivityEntry { timestamp_ms: 1_000_000, ..activity("agent", "middle") },
            ActivityEntry { timestamp_ms: 2_000_000, ..activity("system", "late") },
        ];
        for entry in fixtures {
            assert!(journal.append(entry).await.is_ok(), "append should succeed");
        }

        let query = ActivityQuery { anchor_ms: 1_000_000, window_minutes: 10, role_filter: None };
        let results = journal.query(&query).await.unwrap_or_default();
        assert_eq!(results.len(), 1, "only the middle entry should match");
        assert_eq!(results[0].content, "middle");
    }

    #[tokio::test]
    async fn activity_role_filtering() {
        let journal = MemoryActivityJournal::new();

        let fixtures = [
            ActivityEntry { timestamp_ms: 500_000, ..activity("user", "user msg") },
            ActivityEntry { timestamp_ms: 500_000, ..activity("agent", "agent msg") },
        ];
        for entry in fixtures {
            assert!(journal.append(entry).await.is_ok(), "append should succeed");
        }

        let query = ActivityQuery {
            anchor_ms: 500_000,
            window_minutes: 60,
            role_filter: Some("user".into()),
        };
        let results = journal.query(&query).await.unwrap_or_default();
        assert_eq!(results.len(), 1, "only user entries should match");
        assert_eq!(results[0].role, "user");
    }

    #[tokio::test]
    async fn activity_empty_results_for_out_of_range() {
        let journal = MemoryActivityJournal::new();

        let entry = ActivityEntry { timestamp_ms: 1_000_000, ..activity("user", "msg") };
        assert!(journal.append(entry).await.is_ok(), "append should succeed");

        let query =
            ActivityQuery { anchor_ms: 9_999_999_999, window_minutes: 1, role_filter: None };
        let results = journal.query(&query).await.unwrap_or_default();
        assert!(results.is_empty(), "should return empty for out-of-range query");
    }

    #[tokio::test]
    async fn activity_count_tracks_appends() {
        let journal = MemoryActivityJournal::new();

        // The fallback values are deliberately different from the expected
        // counts so an `Err` from `count()` fails the assertion.
        assert_eq!(journal.count().await.unwrap_or(99), 0, "fresh journal should have 0 entries");

        for i in 0..5 {
            let entry =
                ActivityEntry { timestamp_ms: i, content: format!("msg-{i}"), ..activity("user", "") };
            assert!(journal.append(entry).await.is_ok(), "append should succeed");
        }

        assert_eq!(journal.count().await.unwrap_or(0), 5, "count should be 5 after 5 appends");
    }

    #[tokio::test]
    async fn activity_arc_dyn_journal_works() {
        let journal: Arc<dyn bob_core::ports::ActivityJournalPort> =
            Arc::new(MemoryActivityJournal::new());

        let entry = ActivityEntry {
            timestamp_ms: 42,
            event_type: Some("file_created".into()),
            metadata: Some(serde_json::json!({"key": "value"})),
            ..activity("system", "arc test")
        };
        assert!(journal.append(entry).await.is_ok(), "append should succeed");

        let query = ActivityQuery { anchor_ms: 42, window_minutes: 1, role_filter: None };
        let results = journal.query(&query).await.unwrap_or_default();
        assert_eq!(results.len(), 1);
        assert_eq!(results[0].event_type.as_deref(), Some("file_created"));
    }
}
385}