1use std::{
18 path::PathBuf,
19 sync::atomic::{AtomicU64, Ordering},
20};
21
22use bob_core::{
23 error::StoreError,
24 ports::ActivityJournalPort,
25 types::{ActivityEntry, ActivityQuery},
26};
27
28#[derive(Debug)]
34pub struct FileActivityJournal {
35 path: PathBuf,
36 write_guard: tokio::sync::Mutex<()>,
37 count: AtomicU64,
38}
39
40impl FileActivityJournal {
41 pub async fn new(path: PathBuf) -> Result<Self, StoreError> {
50 if let Some(parent) = path.parent() {
51 tokio::fs::create_dir_all(parent).await.map_err(|err| {
52 StoreError::Backend(format!("failed to create journal dir: {err}"))
53 })?;
54 }
55
56 let count = Self::count_lines(&path).await?;
57 Ok(Self { path, write_guard: tokio::sync::Mutex::new(()), count: AtomicU64::new(count) })
58 }
59
60 #[must_use]
62 pub fn path(&self) -> &PathBuf {
63 &self.path
64 }
65
66 async fn count_lines(path: &PathBuf) -> Result<u64, StoreError> {
67 let raw = match tokio::fs::read_to_string(path).await {
68 Ok(raw) => raw,
69 Err(err) if err.kind() == std::io::ErrorKind::NotFound => return Ok(0),
70 Err(err) => {
71 return Err(StoreError::Backend(format!(
72 "failed to read journal '{}': {err}",
73 path.display()
74 )));
75 }
76 };
77 let count = raw.lines().filter(|l| !l.trim().is_empty()).count() as u64;
78 Ok(count)
79 }
80}
81
82#[async_trait::async_trait]
83impl ActivityJournalPort for FileActivityJournal {
84 async fn append(&self, entry: ActivityEntry) -> Result<(), StoreError> {
85 let line = serde_json::to_string(&entry)
86 .map_err(|err| StoreError::Serialization(err.to_string()))?;
87 let _lock = self.write_guard.lock().await;
88
89 tokio::fs::OpenOptions::new()
90 .create(true)
91 .append(true)
92 .open(&self.path)
93 .await
94 .map_err(|err| {
95 StoreError::Backend(format!(
96 "failed to open journal '{}': {err}",
97 self.path.display()
98 ))
99 })?
100 .write_all(format!("{line}\n").as_bytes())
101 .await
102 .map_err(|err| StoreError::Backend(format!("failed to write journal entry: {err}")))?;
103
104 self.count.fetch_add(1, Ordering::Relaxed);
105 Ok(())
106 }
107
108 async fn query(&self, query: &ActivityQuery) -> Result<Vec<ActivityEntry>, StoreError> {
109 let raw = match tokio::fs::read_to_string(&self.path).await {
110 Ok(raw) => raw,
111 Err(err) if err.kind() == std::io::ErrorKind::NotFound => return Ok(Vec::new()),
112 Err(err) => {
113 return Err(StoreError::Backend(format!(
114 "failed to read journal '{}': {err}",
115 self.path.display()
116 )));
117 }
118 };
119
120 let lower = query.lower_bound_ms();
121 let upper = query.upper_bound_ms();
122
123 let entries: Vec<ActivityEntry> = raw
124 .lines()
125 .filter(|l| !l.trim().is_empty())
126 .filter_map(|line| serde_json::from_str::<ActivityEntry>(line).ok())
127 .filter(|entry| entry.timestamp_ms >= lower && entry.timestamp_ms <= upper)
128 .filter(|entry| query.role_filter.as_ref().is_none_or(|role| entry.role == *role))
129 .collect();
130
131 Ok(entries)
132 }
133
134 async fn count(&self) -> Result<u64, StoreError> {
135 Ok(self.count.load(Ordering::Relaxed))
136 }
137}
138
139use tokio::io::AsyncWriteExt;
141
#[cfg(test)]
mod tests {
    use bob_core::ports::ActivityJournalPort;

    use super::*;

    // The helpers below assert on failure and hand back an `Option`, so tests
    // can bail out with `let ... else { return }` instead of `unwrap`/`expect`.
    // The `return` arms are only reachable if the preceding assertion is
    // compiled out.

    /// Creates a scratch directory that is removed when the handle is dropped.
    fn scratch_dir() -> Option<tempfile::TempDir> {
        let dir = tempfile::tempdir();
        assert!(dir.is_ok(), "tempdir should be created");
        dir.ok()
    }

    /// Opens a journal at `path`, failing the test if initialization fails.
    async fn open_journal(path: std::path::PathBuf) -> Option<FileActivityJournal> {
        let journal = FileActivityJournal::new(path).await;
        assert!(journal.is_ok(), "journal should initialize");
        journal.ok()
    }

    /// Appends `entry`, failing the test if the append reports an error.
    ///
    /// Without this check a failed append would go unnoticed and could let a
    /// test that expects "no results" pass for the wrong reason.
    async fn append_checked(journal: &FileActivityJournal, entry: ActivityEntry) {
        let appended = journal.append(entry).await;
        assert!(appended.is_ok(), "append should succeed");
    }

    /// Runs `query`, failing the test if the query itself reports an error.
    async fn query_checked(
        journal: &FileActivityJournal,
        query: &ActivityQuery,
    ) -> Vec<ActivityEntry> {
        let results = journal.query(query).await;
        assert!(results.is_ok(), "query should succeed");
        results.unwrap_or_default()
    }

    #[tokio::test]
    async fn append_and_query_roundtrip() {
        let Some(temp_dir) = scratch_dir() else { return };
        let journal_path = temp_dir.path().join("activity.journal");
        let Some(journal) = open_journal(journal_path).await else { return };

        append_checked(
            &journal,
            ActivityEntry {
                timestamp_ms: 1_000_000,
                session_key: "sess-1".into(),
                role: "user".into(),
                content: "hello world".into(),
                event_type: None,
                metadata: None,
            },
        )
        .await;

        let query = ActivityQuery { anchor_ms: 1_000_000, window_minutes: 10, role_filter: None };
        let results = query_checked(&journal, &query).await;
        assert_eq!(results.len(), 1);
        assert_eq!(results[0].content, "hello world");
    }

    #[tokio::test]
    async fn time_window_filtering() {
        let Some(temp_dir) = scratch_dir() else { return };
        let journal_path = temp_dir.path().join("activity.journal");
        let Some(journal) = open_journal(journal_path).await else { return };

        append_checked(
            &journal,
            ActivityEntry {
                timestamp_ms: 0,
                session_key: "s".into(),
                role: "user".into(),
                content: "early".into(),
                event_type: None,
                metadata: None,
            },
        )
        .await;
        append_checked(
            &journal,
            ActivityEntry {
                timestamp_ms: 1_000_000,
                session_key: "s".into(),
                role: "agent".into(),
                content: "middle".into(),
                event_type: None,
                metadata: None,
            },
        )
        .await;
        append_checked(
            &journal,
            ActivityEntry {
                timestamp_ms: 2_000_000,
                session_key: "s".into(),
                role: "system".into(),
                content: "late".into(),
                event_type: None,
                metadata: None,
            },
        )
        .await;

        let query = ActivityQuery { anchor_ms: 1_000_000, window_minutes: 10, role_filter: None };
        let results = query_checked(&journal, &query).await;
        assert_eq!(results.len(), 1, "only the middle entry should match");
        assert_eq!(results[0].content, "middle");
    }

    #[tokio::test]
    async fn role_filtering() {
        let Some(temp_dir) = scratch_dir() else { return };
        let journal_path = temp_dir.path().join("activity.journal");
        let Some(journal) = open_journal(journal_path).await else { return };

        append_checked(
            &journal,
            ActivityEntry {
                timestamp_ms: 500_000,
                session_key: "s".into(),
                role: "user".into(),
                content: "user msg".into(),
                event_type: None,
                metadata: None,
            },
        )
        .await;
        append_checked(
            &journal,
            ActivityEntry {
                timestamp_ms: 500_000,
                session_key: "s".into(),
                role: "agent".into(),
                content: "agent msg".into(),
                event_type: None,
                metadata: None,
            },
        )
        .await;

        let query = ActivityQuery {
            anchor_ms: 500_000,
            window_minutes: 60,
            role_filter: Some("user".into()),
        };
        let results = query_checked(&journal, &query).await;
        assert_eq!(results.len(), 1, "only user entries should match");
        assert_eq!(results[0].role, "user");
    }

    #[tokio::test]
    async fn empty_results_for_out_of_range() {
        let Some(temp_dir) = scratch_dir() else { return };
        let journal_path = temp_dir.path().join("activity.journal");
        let Some(journal) = open_journal(journal_path).await else { return };

        append_checked(
            &journal,
            ActivityEntry {
                timestamp_ms: 1_000_000,
                session_key: "s".into(),
                role: "user".into(),
                content: "msg".into(),
                event_type: None,
                metadata: None,
            },
        )
        .await;

        let query =
            ActivityQuery { anchor_ms: 9_999_999_999, window_minutes: 1, role_filter: None };
        // The checked helpers matter most here: an empty result must come from
        // the time window, not from a failed append or a failed query.
        let results = query_checked(&journal, &query).await;
        assert!(results.is_empty(), "should return empty for out-of-range query");
    }

    #[tokio::test]
    async fn persistence_across_instances() {
        let Some(temp_dir) = scratch_dir() else { return };
        let journal_path = temp_dir.path().join("activity.journal");

        let Some(first) = open_journal(journal_path.clone()).await else { return };
        append_checked(
            &first,
            ActivityEntry {
                timestamp_ms: 42,
                session_key: "s".into(),
                role: "system".into(),
                content: "persisted".into(),
                event_type: Some("file_created".into()),
                metadata: None,
            },
        )
        .await;

        let Some(second) = open_journal(journal_path).await else { return };

        let query = ActivityQuery { anchor_ms: 42, window_minutes: 1, role_filter: None };
        let results = query_checked(&second, &query).await;
        assert_eq!(results.len(), 1, "entry should persist across instances");
        assert_eq!(results[0].content, "persisted");
        assert_eq!(results[0].event_type.as_deref(), Some("file_created"));
        assert_eq!(second.count().await.unwrap_or(0), 1, "count should be restored on reopen");
    }

    #[tokio::test]
    async fn count_tracks_appends() {
        let Some(temp_dir) = scratch_dir() else { return };
        let journal_path = temp_dir.path().join("activity.journal");
        let Some(journal) = open_journal(journal_path).await else { return };

        // The fallback differs from the expected value so an `Err` fails the assertion.
        assert_eq!(journal.count().await.unwrap_or(99), 0, "fresh journal should have 0 entries");

        for i in 0..5 {
            append_checked(
                &journal,
                ActivityEntry {
                    timestamp_ms: i,
                    session_key: "s".into(),
                    role: "user".into(),
                    content: format!("msg-{i}"),
                    event_type: None,
                    metadata: None,
                },
            )
            .await;
        }

        assert_eq!(journal.count().await.unwrap_or(0), 5, "count should be 5 after 5 appends");
    }
}