Skip to main content

bob_adapters/
tape_memory.rs

1//! # In-Memory Tape Store
2//!
3//! A `TapeStorePort` implementation backed by `scc::HashMap` for fast concurrent
4//! access. Each session gets its own append-only `Vec<TapeEntry>`.
5//!
6//! ## Usage
7//!
8//! ```rust,ignore
9//! use bob_adapters::tape_memory::InMemoryTapeStore;
10//! let tape = InMemoryTapeStore::new();
11//! ```
12//!
13//! This implementation is suitable for development, testing, and single-process
14//! deployments. For persistent tape storage, implement `TapeStorePort` with a
15//! file-backed (JSONL) or database-backed adapter.
16
17use std::sync::atomic::{AtomicU64, Ordering};
18
19use bob_core::{
20    error::StoreError,
21    tape::{TapeEntry, TapeEntryKind, TapeSearchResult, now_ms},
22    types::SessionId,
23};
24
25/// In-memory tape store using `scc::HashMap` for concurrent access.
26#[derive(Debug)]
27pub struct InMemoryTapeStore {
28    tapes: scc::HashMap<SessionId, Vec<TapeEntry>>,
29    next_id: AtomicU64,
30}
31
32impl InMemoryTapeStore {
33    /// Create a new empty tape store.
34    #[must_use]
35    pub fn new() -> Self {
36        Self { tapes: scc::HashMap::new(), next_id: AtomicU64::new(1) }
37    }
38
39    /// Allocate the next unique entry ID.
40    fn next_entry_id(&self) -> u64 {
41        self.next_id.fetch_add(1, Ordering::Relaxed)
42    }
43}
44
45impl Default for InMemoryTapeStore {
46    fn default() -> Self {
47        Self::new()
48    }
49}
50
51#[async_trait::async_trait]
52impl bob_core::ports::TapeStorePort for InMemoryTapeStore {
53    async fn append(
54        &self,
55        session_id: &SessionId,
56        kind: TapeEntryKind,
57    ) -> Result<TapeEntry, StoreError> {
58        let entry = TapeEntry { id: self.next_entry_id(), kind, timestamp_ms: now_ms() };
59        let entry_clone = entry.clone();
60
61        // Insert or update the session's tape.
62        let map_entry = self.tapes.entry_async(session_id.clone()).await;
63        match map_entry {
64            scc::hash_map::Entry::Occupied(mut occ) => {
65                occ.get_mut().push(entry_clone);
66            }
67            scc::hash_map::Entry::Vacant(vac) => {
68                let _ = vac.insert_entry(vec![entry_clone]);
69            }
70        }
71
72        Ok(entry)
73    }
74
75    async fn entries_since_last_handoff(
76        &self,
77        session_id: &SessionId,
78    ) -> Result<Vec<TapeEntry>, StoreError> {
79        let result = self
80            .tapes
81            .read_async(session_id, |_, entries| {
82                // Find the last handoff index.
83                let last_handoff =
84                    entries.iter().rposition(|e| matches!(e.kind, TapeEntryKind::Handoff { .. }));
85
86                match last_handoff {
87                    Some(idx) => entries[idx + 1..].to_vec(),
88                    None => entries.clone(),
89                }
90            })
91            .await;
92
93        Ok(result.unwrap_or_default())
94    }
95
96    async fn search(
97        &self,
98        session_id: &SessionId,
99        query: &str,
100    ) -> Result<Vec<TapeSearchResult>, StoreError> {
101        let query_lower = query.to_lowercase();
102        let result = self
103            .tapes
104            .read_async(session_id, |_, entries| {
105                entries
106                    .iter()
107                    .filter_map(|entry| {
108                        let text = entry_text(entry);
109                        text.to_lowercase().contains(&query_lower).then(|| {
110                            // Build a snippet (up to 80 chars around the first match).
111                            let snippet = build_snippet(&text, &query_lower);
112                            TapeSearchResult { entry: entry.clone(), snippet }
113                        })
114                    })
115                    .collect::<Vec<_>>()
116            })
117            .await;
118
119        Ok(result.unwrap_or_default())
120    }
121
122    async fn all_entries(&self, session_id: &SessionId) -> Result<Vec<TapeEntry>, StoreError> {
123        let result = self.tapes.read_async(session_id, |_, entries| entries.clone()).await;
124        Ok(result.unwrap_or_default())
125    }
126
127    async fn anchors(&self, session_id: &SessionId) -> Result<Vec<TapeEntry>, StoreError> {
128        let result = self
129            .tapes
130            .read_async(session_id, |_, entries| {
131                entries
132                    .iter()
133                    .filter(|e| matches!(e.kind, TapeEntryKind::Anchor { .. }))
134                    .cloned()
135                    .collect::<Vec<_>>()
136            })
137            .await;
138
139        Ok(result.unwrap_or_default())
140    }
141}
142
143/// Extract searchable text from a tape entry.
144fn entry_text(entry: &TapeEntry) -> String {
145    match &entry.kind {
146        TapeEntryKind::Message { content, .. } => content.clone(),
147        TapeEntryKind::Event { event, payload } => {
148            format!("{event}: {}", serde_json::to_string(payload).unwrap_or_default())
149        }
150        TapeEntryKind::Anchor { name, .. } => format!("anchor: {name}"),
151        TapeEntryKind::Handoff { name, summary, .. } => {
152            let s = summary.as_deref().unwrap_or("");
153            format!("handoff: {name} {s}")
154        }
155    }
156}
157
/// Build a short excerpt of `text` around the first occurrence of `query`.
///
/// `query` is searched for in the lowercased form of `text`, so callers
/// should pass an already-lowercased query. The excerpt keeps roughly 40
/// bytes of context on each side of the match and is wrapped in `"..."` on
/// whichever side was truncated. If `query` does not occur, the excerpt
/// starts at the beginning of `text`.
///
/// The window is measured in bytes but is always widened to the nearest
/// UTF-8 character boundaries, so this never panics on multi-byte text.
fn build_snippet(text: &str, query: &str) -> String {
    /// Bytes of context kept on each side of the match.
    const CONTEXT_BYTES: usize = 40;

    let lower = text.to_lowercase();
    // The offset is found in the lowercased copy. Lowercasing can change byte
    // lengths (e.g. 'İ' grows from 2 to 3 bytes), so the offset may overshoot
    // the original text; clamp it so the window below stays in range. In that
    // rare case the window is approximate rather than exactly centred.
    let pos = lower.find(query).unwrap_or(0).min(text.len());

    // Walk the window start back to a char boundary (offset 0 always is one).
    let mut start = pos.saturating_sub(CONTEXT_BYTES);
    while !text.is_char_boundary(start) {
        start -= 1;
    }

    // Walk the window end forward to a char boundary (`text.len()` always is one).
    let mut end = pos.saturating_add(query.len()).saturating_add(CONTEXT_BYTES).min(text.len());
    while !text.is_char_boundary(end) {
        end += 1;
    }

    let mut snippet = String::with_capacity(end - start + 6);
    if start > 0 {
        snippet.push_str("...");
    }
    snippet.push_str(&text[start..end]);
    if end < text.len() {
        snippet.push_str("...");
    }
    snippet
}
174
175// ── Tests ────────────────────────────────────────────────────────────
176
#[cfg(test)]
mod tests {
    use bob_core::{ports::TapeStorePort, types::Role};

    use super::*;

    /// Shorthand for building a `Message` entry kind.
    fn message(role: Role, content: &str) -> TapeEntryKind {
        TapeEntryKind::Message { role, content: content.into() }
    }

    /// Two appended messages are both visible through `all_entries`.
    #[tokio::test]
    async fn append_and_read_entries() {
        let store = InMemoryTapeStore::new();
        let sid = String::from("session-1");

        let first = store.append(&sid, message(Role::User, "hello")).await;
        assert!(first.is_ok());

        let second = store.append(&sid, message(Role::Assistant, "hi there")).await;
        assert!(second.is_ok());

        let all = store.all_entries(&sid).await;
        assert!(all.is_ok());
        assert_eq!(all.map_or(0, |entries| entries.len()), 2);
    }

    /// Only the entry appended after the handoff is returned.
    #[tokio::test]
    async fn entries_since_handoff() {
        let store = InMemoryTapeStore::new();
        let sid = String::from("session-2");

        // Two messages, then a handoff, then one more message.
        let _ = store.append(&sid, message(Role::User, "msg1")).await;
        let _ = store.append(&sid, message(Role::Assistant, "resp1")).await;

        let handoff = TapeEntryKind::Handoff {
            name: "topic-switch".into(),
            entries_before: 2,
            summary: None,
        };
        let _ = store.append(&sid, handoff).await;

        let _ = store.append(&sid, message(Role::User, "msg2")).await;

        let since = store.entries_since_last_handoff(&sid).await;
        assert!(since.is_ok());
        assert_eq!(since.map_or(0, |entries| entries.len()), 1);
    }

    /// A lowercase query matches differently-cased content in exactly one entry.
    #[tokio::test]
    async fn search_finds_matching_entries() {
        let store = InMemoryTapeStore::new();
        let sid = String::from("session-3");

        let _ = store.append(&sid, message(Role::User, "compile the Rust project")).await;
        let _ = store.append(&sid, message(Role::Assistant, "Running cargo build...")).await;

        let results = store.search(&sid, "rust").await;
        assert!(results.is_ok());
        assert_eq!(results.map_or(0, |hits| hits.len()), 1);
    }

    /// `anchors` skips non-anchor entries.
    #[tokio::test]
    async fn anchors_only_returns_anchor_entries() {
        let store = InMemoryTapeStore::new();
        let sid = String::from("session-4");

        let _ = store.append(&sid, message(Role::User, "msg")).await;

        let anchor = TapeEntryKind::Anchor {
            name: "phase-1".into(),
            state: serde_json::json!({"status": "done"}),
        };
        let _ = store.append(&sid, anchor).await;

        let anchors = store.anchors(&sid).await;
        assert!(anchors.is_ok());
        assert_eq!(anchors.map_or(0, |found| found.len()), 1);
    }
}