1use std::sync::atomic::{AtomicU64, Ordering};
18
19use bob_core::{
20 error::StoreError,
21 tape::{TapeEntry, TapeEntryKind, TapeSearchResult, now_ms},
22 types::SessionId,
23};
24
/// Tape store that keeps every session's entries in process memory.
///
/// Entries are grouped per session in a concurrent `scc::HashMap`, and entry ids are drawn from
/// a single atomic counter shared by all sessions of this store.
#[derive(Debug)]
pub struct InMemoryTapeStore {
    /// Per-session tapes: the entries appended for each session, in append order.
    tapes: scc::HashMap<SessionId, Vec<TapeEntry>>,
    /// The id that will be handed to the next appended entry.
    next_id: AtomicU64,
}
31
32impl InMemoryTapeStore {
33 #[must_use]
35 pub fn new() -> Self {
36 Self { tapes: scc::HashMap::new(), next_id: AtomicU64::new(1) }
37 }
38
39 fn next_entry_id(&self) -> u64 {
41 self.next_id.fetch_add(1, Ordering::Relaxed)
42 }
43}
44
/// The default store is the same as the one built by `InMemoryTapeStore::new`.
impl Default for InMemoryTapeStore {
    /// Delegates to `Self::new()`.
    fn default() -> Self {
        Self::new()
    }
}
50
51#[async_trait::async_trait]
52impl bob_core::ports::TapeStorePort for InMemoryTapeStore {
53 async fn append(
54 &self,
55 session_id: &SessionId,
56 kind: TapeEntryKind,
57 ) -> Result<TapeEntry, StoreError> {
58 let entry = TapeEntry { id: self.next_entry_id(), kind, timestamp_ms: now_ms() };
59 let entry_clone = entry.clone();
60
61 let map_entry = self.tapes.entry_async(session_id.clone()).await;
63 match map_entry {
64 scc::hash_map::Entry::Occupied(mut occ) => {
65 occ.get_mut().push(entry_clone);
66 }
67 scc::hash_map::Entry::Vacant(vac) => {
68 let _ = vac.insert_entry(vec![entry_clone]);
69 }
70 }
71
72 Ok(entry)
73 }
74
75 async fn entries_since_last_handoff(
76 &self,
77 session_id: &SessionId,
78 ) -> Result<Vec<TapeEntry>, StoreError> {
79 let result = self
80 .tapes
81 .read_async(session_id, |_, entries| {
82 let last_handoff =
84 entries.iter().rposition(|e| matches!(e.kind, TapeEntryKind::Handoff { .. }));
85
86 match last_handoff {
87 Some(idx) => entries[idx + 1..].to_vec(),
88 None => entries.clone(),
89 }
90 })
91 .await;
92
93 Ok(result.unwrap_or_default())
94 }
95
96 async fn search(
97 &self,
98 session_id: &SessionId,
99 query: &str,
100 ) -> Result<Vec<TapeSearchResult>, StoreError> {
101 let query_lower = query.to_lowercase();
102 let result = self
103 .tapes
104 .read_async(session_id, |_, entries| {
105 entries
106 .iter()
107 .filter_map(|entry| {
108 let text = entry_text(entry);
109 text.to_lowercase().contains(&query_lower).then(|| {
110 let snippet = build_snippet(&text, &query_lower);
112 TapeSearchResult { entry: entry.clone(), snippet }
113 })
114 })
115 .collect::<Vec<_>>()
116 })
117 .await;
118
119 Ok(result.unwrap_or_default())
120 }
121
122 async fn all_entries(&self, session_id: &SessionId) -> Result<Vec<TapeEntry>, StoreError> {
123 let result = self.tapes.read_async(session_id, |_, entries| entries.clone()).await;
124 Ok(result.unwrap_or_default())
125 }
126
127 async fn anchors(&self, session_id: &SessionId) -> Result<Vec<TapeEntry>, StoreError> {
128 let result = self
129 .tapes
130 .read_async(session_id, |_, entries| {
131 entries
132 .iter()
133 .filter(|e| matches!(e.kind, TapeEntryKind::Anchor { .. }))
134 .cloned()
135 .collect::<Vec<_>>()
136 })
137 .await;
138
139 Ok(result.unwrap_or_default())
140 }
141}
142
143fn entry_text(entry: &TapeEntry) -> String {
145 match &entry.kind {
146 TapeEntryKind::Message { content, .. } => content.clone(),
147 TapeEntryKind::Event { event, payload } => {
148 format!("{event}: {}", serde_json::to_string(payload).unwrap_or_default())
149 }
150 TapeEntryKind::Anchor { name, .. } => format!("anchor: {name}"),
151 TapeEntryKind::Handoff { name, summary, .. } => {
152 let s = summary.as_deref().unwrap_or("");
153 format!("handoff: {name} {s}")
154 }
155 }
156}
157
/// Builds a short excerpt of `text` around the first case-insensitive occurrence of `query`.
///
/// `query` must already be lowercase, because it is searched for in a lowercased copy of
/// `text`. Up to 40 bytes of context are kept on each side of the match, and `...` is added on
/// each side where text was cut off. If `query` does not occur, the excerpt starts at the
/// beginning of `text`.
///
/// The window is always snapped outwards to UTF-8 character boundaries, so the function never
/// panics on multi-byte text. The match offset comes from the lowercased copy; for the rare
/// characters whose lowercase form has a different byte length, the window may be shifted
/// relative to the match but is still a valid slice of `text`.
fn build_snippet(text: &str, query: &str) -> String {
    const CONTEXT_BYTES: usize = 40;
    const ELLIPSIS: &str = "...";

    let lower = text.to_lowercase();
    // Offsets in `lower` can exceed `text.len()` when lowercasing grows the string; clamp so
    // that every later index is within `text`.
    let pos = lower.find(query).unwrap_or(0).min(text.len());

    // Widen the window to the left until it starts on a character boundary (offset 0 always is).
    let mut start = pos.saturating_sub(CONTEXT_BYTES);
    while !text.is_char_boundary(start) {
        start -= 1;
    }

    // Widen the window to the right until it ends on a character boundary (`text.len()` always
    // is one).
    let mut end = pos.saturating_add(query.len()).saturating_add(CONTEXT_BYTES).min(text.len());
    while !text.is_char_boundary(end) {
        end += 1;
    }

    let mut snippet = String::with_capacity(end - start + 2 * ELLIPSIS.len());
    if start > 0 {
        snippet.push_str(ELLIPSIS);
    }
    snippet.push_str(&text[start..end]);
    if end < text.len() {
        snippet.push_str(ELLIPSIS);
    }
    snippet
}
174
#[cfg(test)]
mod tests {
    use bob_core::{ports::TapeStorePort, types::Role};

    use super::*;

    /// Shorthand for a `Message` entry kind with the given role and content.
    fn message(role: Role, content: &str) -> TapeEntryKind {
        TapeEntryKind::Message { role, content: content.into() }
    }

    /// Two appended messages are both returned by `all_entries`.
    #[tokio::test]
    async fn append_and_read_entries() {
        let store = InMemoryTapeStore::new();
        let sid = "session-1".to_string();

        assert!(store.append(&sid, message(Role::User, "hello")).await.is_ok());
        assert!(store.append(&sid, message(Role::Assistant, "hi there")).await.is_ok());

        // `Some(2)` holds only if the call succeeded *and* returned exactly two entries.
        let count = store.all_entries(&sid).await.map(|entries| entries.len()).ok();
        assert_eq!(count, Some(2));
    }

    /// Only the entries appended after the handoff are returned.
    #[tokio::test]
    async fn entries_since_handoff() {
        let store = InMemoryTapeStore::new();
        let sid = "session-2".to_string();

        // Two messages before the handoff...
        let _ = store.append(&sid, message(Role::User, "msg1")).await;
        let _ = store.append(&sid, message(Role::Assistant, "resp1")).await;

        // ...the handoff itself...
        let handoff = TapeEntryKind::Handoff {
            name: "topic-switch".into(),
            entries_before: 2,
            summary: None,
        };
        let _ = store.append(&sid, handoff).await;

        // ...and one message after it.
        let _ = store.append(&sid, message(Role::User, "msg2")).await;

        let count = store.entries_since_last_handoff(&sid).await.map(|entries| entries.len()).ok();
        assert_eq!(count, Some(1));
    }

    /// A lowercase query matches differently-cased content, and only the matching entry.
    #[tokio::test]
    async fn search_finds_matching_entries() {
        let store = InMemoryTapeStore::new();
        let sid = "session-3".to_string();

        let _ = store.append(&sid, message(Role::User, "compile the Rust project")).await;
        let _ = store.append(&sid, message(Role::Assistant, "Running cargo build...")).await;

        let count = store.search(&sid, "rust").await.map(|hits| hits.len()).ok();
        assert_eq!(count, Some(1));
    }

    /// `anchors` skips non-anchor entries.
    #[tokio::test]
    async fn anchors_only_returns_anchor_entries() {
        let store = InMemoryTapeStore::new();
        let sid = "session-4".to_string();

        let _ = store.append(&sid, message(Role::User, "msg")).await;

        let anchor = TapeEntryKind::Anchor {
            name: "phase-1".into(),
            state: serde_json::json!({"status": "done"}),
        };
        let _ = store.append(&sid, anchor).await;

        let count = store.anchors(&sid).await.map(|anchors| anchors.len()).ok();
        assert_eq!(count, Some(1));
    }
}
300}