1use std::sync::atomic::{AtomicU64, Ordering};
8
9use bob_core::{
10 error::StoreError,
11 journal::{JournalEntry, ToolJournalPort},
12 types::{ActivityEntry, ActivityQuery, SessionId},
13};
14
15#[derive(Debug)]
19pub struct InMemoryToolJournal {
20 inner: scc::HashMap<SessionId, Vec<JournalEntry>>,
21}
22
23impl Default for InMemoryToolJournal {
24 fn default() -> Self {
25 Self::new()
26 }
27}
28
29impl InMemoryToolJournal {
30 #[must_use]
32 pub fn new() -> Self {
33 Self { inner: scc::HashMap::new() }
34 }
35}
36
37#[async_trait::async_trait]
38impl ToolJournalPort for InMemoryToolJournal {
39 async fn append(&self, entry: JournalEntry) -> Result<(), StoreError> {
40 let session_id = entry.session_id.clone();
41 let map_entry = self.inner.entry_async(session_id).await;
42 match map_entry {
43 scc::hash_map::Entry::Occupied(mut occ) => {
44 occ.get_mut().push(entry);
45 }
46 scc::hash_map::Entry::Vacant(vac) => {
47 let _ = vac.insert_entry(vec![entry]);
48 }
49 }
50 Ok(())
51 }
52
53 async fn lookup(
54 &self,
55 session_id: &SessionId,
56 fingerprint: &str,
57 ) -> Result<Option<JournalEntry>, StoreError> {
58 let found = self
59 .inner
60 .read_async(session_id, |_k, entries| {
61 entries.iter().find(|e| e.call_fingerprint == fingerprint).cloned()
62 })
63 .await;
64 Ok(found.flatten())
65 }
66
67 async fn entries(&self, session_id: &SessionId) -> Result<Vec<JournalEntry>, StoreError> {
68 let entries =
69 self.inner.read_async(session_id, |_k, v| v.clone()).await.unwrap_or_default();
70 Ok(entries)
71 }
72}
73
74#[derive(Debug)]
81pub struct MemoryActivityJournal {
82 entries: tokio::sync::Mutex<Vec<ActivityEntry>>,
83 count: AtomicU64,
84}
85
86impl Default for MemoryActivityJournal {
87 fn default() -> Self {
88 Self::new()
89 }
90}
91
92impl MemoryActivityJournal {
93 #[must_use]
95 pub fn new() -> Self {
96 Self { entries: tokio::sync::Mutex::new(Vec::new()), count: AtomicU64::new(0) }
97 }
98}
99
100#[async_trait::async_trait]
101impl bob_core::ports::ActivityJournalPort for MemoryActivityJournal {
102 async fn append(&self, entry: ActivityEntry) -> Result<(), StoreError> {
103 let mut entries = self.entries.lock().await;
104 entries.push(entry);
105 self.count.fetch_add(1, Ordering::Relaxed);
106 Ok(())
107 }
108
109 async fn query(&self, query: &ActivityQuery) -> Result<Vec<ActivityEntry>, StoreError> {
110 let entries = self.entries.lock().await;
111 let lower = query.lower_bound_ms();
112 let upper = query.upper_bound_ms();
113
114 let results: Vec<ActivityEntry> = entries
115 .iter()
116 .filter(|e| e.timestamp_ms >= lower && e.timestamp_ms <= upper)
117 .filter(|e| query.role_filter.as_ref().is_none_or(|role| e.role == *role))
118 .cloned()
119 .collect();
120
121 Ok(results)
122 }
123
124 async fn count(&self) -> Result<u64, StoreError> {
125 Ok(self.count.load(Ordering::Relaxed))
126 }
127}
128
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use bob_core::{journal::JournalEntry, ports::ActivityJournalPort};
    use serde_json::json;

    use super::*;

    /// Builds a non-error tool-call record for `session` with timestamp 0.
    ///
    /// The fingerprint is computed from `tool` and `arguments` with
    /// [`JournalEntry::fingerprint`].
    fn tool_call(
        session: &str,
        tool: &str,
        arguments: serde_json::Value,
        result: serde_json::Value,
    ) -> JournalEntry {
        JournalEntry {
            session_id: session.to_string(),
            call_fingerprint: JournalEntry::fingerprint(tool, &arguments),
            tool_name: tool.to_string(),
            arguments,
            result,
            is_error: false,
            timestamp_ms: 0,
        }
    }

    /// Builds an activity entry for session `"s"` at timestamp 0 with no
    /// event type or metadata; tests override individual fields with
    /// struct-update syntax.
    fn activity(role: &'static str, content: &'static str) -> ActivityEntry {
        ActivityEntry {
            timestamp_ms: 0,
            session_key: "s".into(),
            role: role.into(),
            content: content.into(),
            event_type: None,
            metadata: None,
        }
    }

    // --- InMemoryToolJournal ---

    #[tokio::test]
    async fn append_and_lookup_roundtrip() {
        let store = InMemoryToolJournal::new();
        let session = "sess-1".to_string();
        let arguments = json!({"path": "/tmp/a"});
        let fingerprint = JournalEntry::fingerprint("read_file", &arguments);

        let record = tool_call(&session, "read_file", arguments, json!({"content": "hello"}));
        store.append(record).await.ok();

        let hit = store.lookup(&session, &fingerprint).await.ok().flatten();
        assert!(hit.is_some(), "should find the appended entry");
        assert_eq!(hit.as_ref().map(|e| e.tool_name.as_str()), Some("read_file"));
    }

    #[tokio::test]
    async fn lookup_returns_none_for_missing() {
        let store = InMemoryToolJournal::new();
        let unknown_session = "missing".to_string();

        let hit = store.lookup(&unknown_session, "nonexistent:fingerprint").await.ok().flatten();
        assert!(hit.is_none(), "should return None for missing session");
    }

    #[tokio::test]
    async fn entries_returns_all_for_session() {
        let store = InMemoryToolJournal::new();
        let session = "sess-2".to_string();

        for i in 0..3 {
            let record = tool_call(&session, "tool", json!({"i": i}), json!({"i": i}));
            store.append(record).await.ok();
        }

        let recorded = store.entries(&session).await.ok().unwrap_or_default();
        assert_eq!(recorded.len(), 3);
    }

    #[tokio::test]
    async fn arc_dyn_journal_works() {
        let store: Arc<dyn ToolJournalPort> = Arc::new(InMemoryToolJournal::new());
        let session = "sess-arc".to_string();

        store.append(tool_call(&session, "t", json!({}), json!(null))).await.ok();

        let fingerprint = JournalEntry::fingerprint("t", &json!({}));
        let hit = store.lookup(&session, &fingerprint).await.ok().flatten();
        assert!(hit.is_some());
    }

    // --- MemoryActivityJournal ---

    #[tokio::test]
    async fn activity_append_and_query_roundtrip() {
        let log = MemoryActivityJournal::new();
        let first = ActivityEntry {
            timestamp_ms: 1_000_000,
            session_key: "sess-1".into(),
            ..activity("user", "hello world")
        };

        let appended = log.append(first).await;
        assert!(appended.is_ok(), "append should succeed");

        let window = ActivityQuery { anchor_ms: 1_000_000, window_minutes: 10, role_filter: None };
        let outcome = log.query(&window).await;
        assert!(outcome.is_ok(), "query should succeed");
        let hits = outcome.unwrap_or_default();
        assert_eq!(hits.len(), 1);
        assert_eq!(hits[0].content, "hello world");
    }

    #[tokio::test]
    async fn activity_time_window_filtering() {
        let log = MemoryActivityJournal::new();

        let timeline =
            [(0, "user", "early"), (1_000_000, "agent", "middle"), (2_000_000, "system", "late")];
        for (timestamp_ms, role, content) in timeline {
            let _ = log.append(ActivityEntry { timestamp_ms, ..activity(role, content) }).await;
        }

        let window = ActivityQuery { anchor_ms: 1_000_000, window_minutes: 10, role_filter: None };
        let hits = log.query(&window).await.unwrap_or_default();
        assert_eq!(hits.len(), 1, "only the middle entry should match");
        assert_eq!(hits[0].content, "middle");
    }

    #[tokio::test]
    async fn activity_role_filtering() {
        let log = MemoryActivityJournal::new();

        for (role, content) in [("user", "user msg"), ("agent", "agent msg")] {
            let _ = log
                .append(ActivityEntry { timestamp_ms: 500_000, ..activity(role, content) })
                .await;
        }

        let users_only = ActivityQuery {
            anchor_ms: 500_000,
            window_minutes: 60,
            role_filter: Some("user".into()),
        };
        let hits = log.query(&users_only).await.unwrap_or_default();
        assert_eq!(hits.len(), 1, "only user entries should match");
        assert_eq!(hits[0].role, "user");
    }

    #[tokio::test]
    async fn activity_empty_results_for_out_of_range() {
        let log = MemoryActivityJournal::new();

        let _ = log.append(ActivityEntry { timestamp_ms: 1_000_000, ..activity("user", "msg") }).await;

        let far_away =
            ActivityQuery { anchor_ms: 9_999_999_999, window_minutes: 1, role_filter: None };
        let hits = log.query(&far_away).await.unwrap_or_default();
        assert!(hits.is_empty(), "should return empty for out-of-range query");
    }

    #[tokio::test]
    async fn activity_count_tracks_appends() {
        let log = MemoryActivityJournal::new();

        // The fallback differs from the expected value so an error cannot pass.
        assert_eq!(log.count().await.unwrap_or(99), 0, "fresh journal should have 0 entries");

        for i in 0..5 {
            let numbered = ActivityEntry {
                timestamp_ms: i,
                content: format!("msg-{i}"),
                ..activity("user", "")
            };
            let _ = log.append(numbered).await;
        }

        assert_eq!(log.count().await.unwrap_or(0), 5, "count should be 5 after 5 appends");
    }

    #[tokio::test]
    async fn activity_arc_dyn_journal_works() {
        let log: Arc<dyn ActivityJournalPort> = Arc::new(MemoryActivityJournal::new());

        let created = ActivityEntry {
            timestamp_ms: 42,
            event_type: Some("file_created".into()),
            metadata: Some(json!({"key": "value"})),
            ..activity("system", "arc test")
        };
        let _ = log.append(created).await;

        let window = ActivityQuery { anchor_ms: 42, window_minutes: 1, role_filter: None };
        let hits = log.query(&window).await.unwrap_or_default();
        assert_eq!(hits.len(), 1);
        assert_eq!(hits[0].event_type.as_deref(), Some("file_created"));
    }
}