Skip to main content

bob_adapters/
journal_file.rs

1//! # File-Backed Activity Journal
2//!
3//! Append-only NDJSON activity journal persisted to a single file.
4//!
5//! Each entry is serialized as one JSON line (newline-delimited JSON).
6//! Writes are atomic at the line level: open in append mode, write the
7//! serialized line, flush. Readers scan the full file and filter by
8//! the requested time window.
9//!
10//! ## Usage
11//!
12//! ```rust,ignore
13//! use bob_adapters::journal_file::FileActivityJournal;
14//! let journal = FileActivityJournal::new("/var/bob/activity.journal").await?;
15//! ```
16
17use std::{
18    path::PathBuf,
19    sync::atomic::{AtomicU64, Ordering},
20};
21
22use bob_core::{
23    error::StoreError,
24    ports::ActivityJournalPort,
25    types::{ActivityEntry, ActivityQuery},
26};
27
28/// File-backed activity journal using NDJSON format.
29///
30/// All entries are appended to a single file. A write mutex ensures
31/// line-level atomicity. The entry count is tracked in-memory and
32/// incremented on every successful append.
33#[derive(Debug)]
34pub struct FileActivityJournal {
35    path: PathBuf,
36    write_guard: tokio::sync::Mutex<()>,
37    count: AtomicU64,
38}
39
40impl FileActivityJournal {
41    /// Open (or create) an activity journal at the given file path.
42    ///
43    /// Parent directories are created automatically. The initial entry
44    /// count is determined by scanning the existing file.
45    ///
46    /// # Errors
47    /// Returns a backend error if the directory cannot be created or the
48    /// file cannot be opened for the first scan.
49    pub async fn new(path: PathBuf) -> Result<Self, StoreError> {
50        if let Some(parent) = path.parent() {
51            tokio::fs::create_dir_all(parent).await.map_err(|err| {
52                StoreError::Backend(format!("failed to create journal dir: {err}"))
53            })?;
54        }
55
56        let count = Self::count_lines(&path).await?;
57        Ok(Self { path, write_guard: tokio::sync::Mutex::new(()), count: AtomicU64::new(count) })
58    }
59
60    /// Return the file path backing this journal.
61    #[must_use]
62    pub fn path(&self) -> &PathBuf {
63        &self.path
64    }
65
66    async fn count_lines(path: &PathBuf) -> Result<u64, StoreError> {
67        let raw = match tokio::fs::read_to_string(path).await {
68            Ok(raw) => raw,
69            Err(err) if err.kind() == std::io::ErrorKind::NotFound => return Ok(0),
70            Err(err) => {
71                return Err(StoreError::Backend(format!(
72                    "failed to read journal '{}': {err}",
73                    path.display()
74                )));
75            }
76        };
77        let count = raw.lines().filter(|l| !l.trim().is_empty()).count() as u64;
78        Ok(count)
79    }
80}
81
82#[async_trait::async_trait]
83impl ActivityJournalPort for FileActivityJournal {
84    async fn append(&self, entry: ActivityEntry) -> Result<(), StoreError> {
85        let line = serde_json::to_string(&entry)
86            .map_err(|err| StoreError::Serialization(err.to_string()))?;
87        let _lock = self.write_guard.lock().await;
88
89        tokio::fs::OpenOptions::new()
90            .create(true)
91            .append(true)
92            .open(&self.path)
93            .await
94            .map_err(|err| {
95                StoreError::Backend(format!(
96                    "failed to open journal '{}': {err}",
97                    self.path.display()
98                ))
99            })?
100            .write_all(format!("{line}\n").as_bytes())
101            .await
102            .map_err(|err| StoreError::Backend(format!("failed to write journal entry: {err}")))?;
103
104        self.count.fetch_add(1, Ordering::Relaxed);
105        Ok(())
106    }
107
108    async fn query(&self, query: &ActivityQuery) -> Result<Vec<ActivityEntry>, StoreError> {
109        let raw = match tokio::fs::read_to_string(&self.path).await {
110            Ok(raw) => raw,
111            Err(err) if err.kind() == std::io::ErrorKind::NotFound => return Ok(Vec::new()),
112            Err(err) => {
113                return Err(StoreError::Backend(format!(
114                    "failed to read journal '{}': {err}",
115                    self.path.display()
116                )));
117            }
118        };
119
120        let lower = query.lower_bound_ms();
121        let upper = query.upper_bound_ms();
122
123        let entries: Vec<ActivityEntry> = raw
124            .lines()
125            .filter(|l| !l.trim().is_empty())
126            .filter_map(|line| serde_json::from_str::<ActivityEntry>(line).ok())
127            .filter(|entry| entry.timestamp_ms >= lower && entry.timestamp_ms <= upper)
128            .filter(|entry| query.role_filter.as_ref().is_none_or(|role| entry.role == *role))
129            .collect();
130
131        Ok(entries)
132    }
133
134    async fn count(&self) -> Result<u64, StoreError> {
135        Ok(self.count.load(Ordering::Relaxed))
136    }
137}
138
139// Need the write trait for `write_all`.
140use tokio::io::AsyncWriteExt;
141
#[cfg(test)]
mod tests {
    use bob_core::ports::ActivityJournalPort;

    use super::*;

    /// Open a journal at `activity.journal` inside a new temporary directory.
    ///
    /// The returned `TempDir` must stay bound for the whole test: the
    /// directory is removed when it is dropped. Returns `None` (after a
    /// failed assertion) if setup fails.
    async fn fresh_journal() -> Option<(tempfile::TempDir, FileActivityJournal)> {
        let temp_dir = tempfile::tempdir();
        assert!(temp_dir.is_ok(), "tempdir should be created");
        let temp_dir = temp_dir.ok()?;
        let journal = FileActivityJournal::new(temp_dir.path().join("activity.journal")).await;
        assert!(journal.is_ok(), "journal should initialize");
        Some((temp_dir, journal.ok()?))
    }

    /// Build an entry in session `"s"` with no event type or metadata.
    ///
    /// The timestamp is 0. Tests override fields with struct-update syntax,
    /// e.g. `ActivityEntry { timestamp_ms: 42, ..entry("user", "hi") }`.
    fn entry(role: &str, content: &str) -> ActivityEntry {
        ActivityEntry {
            timestamp_ms: 0,
            session_key: "s".into(),
            role: role.into(),
            content: content.into(),
            event_type: None,
            metadata: None,
        }
    }

    /// Append `entry`, asserting that the append succeeded instead of
    /// letting a failure surface later as a confusing query mismatch.
    async fn append_ok(journal: &FileActivityJournal, entry: ActivityEntry) {
        let appended = journal.append(entry).await;
        assert!(appended.is_ok(), "append should succeed");
    }

    #[tokio::test]
    async fn append_and_query_roundtrip() {
        let Some((_temp_dir, journal)) = fresh_journal().await else { return };

        append_ok(&journal, ActivityEntry { timestamp_ms: 1_000_000, ..entry("user", "hello world") })
            .await;

        let query = ActivityQuery { anchor_ms: 1_000_000, window_minutes: 10, role_filter: None };
        let results = journal.query(&query).await;
        assert!(results.is_ok(), "query should succeed");
        let results = results.unwrap_or_default();
        assert_eq!(results.len(), 1);
        assert_eq!(results[0].content, "hello world");
    }

    #[tokio::test]
    async fn time_window_filtering() {
        let Some((_temp_dir, journal)) = fresh_journal().await else { return };

        // Three entries 1000 seconds apart.
        append_ok(&journal, ActivityEntry { timestamp_ms: 0, ..entry("user", "early") }).await;
        append_ok(&journal, ActivityEntry { timestamp_ms: 1_000_000, ..entry("agent", "middle") })
            .await;
        append_ok(&journal, ActivityEntry { timestamp_ms: 2_000_000, ..entry("system", "late") })
            .await;

        // Query a 10-minute window around t=1_000_000.
        let query = ActivityQuery { anchor_ms: 1_000_000, window_minutes: 10, role_filter: None };
        let results = journal.query(&query).await.unwrap_or_default();
        assert_eq!(results.len(), 1, "only the middle entry should match");
        assert_eq!(results[0].content, "middle");
    }

    #[tokio::test]
    async fn role_filtering() {
        let Some((_temp_dir, journal)) = fresh_journal().await else { return };

        append_ok(&journal, ActivityEntry { timestamp_ms: 500_000, ..entry("user", "user msg") })
            .await;
        append_ok(&journal, ActivityEntry { timestamp_ms: 500_000, ..entry("agent", "agent msg") })
            .await;

        let query = ActivityQuery {
            anchor_ms: 500_000,
            window_minutes: 60,
            role_filter: Some("user".into()),
        };
        let results = journal.query(&query).await.unwrap_or_default();
        assert_eq!(results.len(), 1, "only user entries should match");
        assert_eq!(results[0].role, "user");
    }

    #[tokio::test]
    async fn empty_results_for_out_of_range() {
        let Some((_temp_dir, journal)) = fresh_journal().await else { return };

        append_ok(&journal, ActivityEntry { timestamp_ms: 1_000_000, ..entry("user", "msg") }).await;

        // Query far away from the entry.
        let query =
            ActivityQuery { anchor_ms: 9_999_999_999, window_minutes: 1, role_filter: None };
        let results = journal.query(&query).await.unwrap_or_default();
        assert!(results.is_empty(), "should return empty for out-of-range query");
    }

    #[tokio::test]
    async fn query_skips_blank_and_malformed_lines() {
        let Some((_temp_dir, journal)) = fresh_journal().await else { return };

        // Seed the file with a garbage line and a blank line before a real entry.
        let seeded = tokio::fs::write(journal.path(), "not json\n\n").await;
        assert!(seeded.is_ok(), "journal file should be seeded");
        append_ok(&journal, ActivityEntry { timestamp_ms: 1_000, ..entry("user", "valid") }).await;

        let query = ActivityQuery { anchor_ms: 1_000, window_minutes: 1, role_filter: None };
        let results = journal.query(&query);
        let results = results.await;
        assert!(results.is_ok(), "malformed lines must not fail the query");
        let results = results.unwrap_or_default();
        assert_eq!(results.len(), 1, "only the well-formed entry should be returned");
        assert_eq!(results[0].content, "valid");
    }

    #[tokio::test]
    async fn persistence_across_instances() {
        let Some((_temp_dir, first)) = fresh_journal().await else { return };

        append_ok(
            &first,
            ActivityEntry {
                timestamp_ms: 42,
                event_type: Some("file_created".into()),
                ..entry("system", "persisted")
            },
        )
        .await;

        // Reopen the same file with a second instance (simulates restart).
        let second = FileActivityJournal::new(first.path().clone()).await;
        assert!(second.is_ok(), "second journal should initialize");
        let Ok(second) = second else { return };

        let query = ActivityQuery { anchor_ms: 42, window_minutes: 1, role_filter: None };
        let results = second.query(&query).await.unwrap_or_default();
        assert_eq!(results.len(), 1, "entry should persist across instances");
        assert_eq!(results[0].content, "persisted");
        assert_eq!(results[0].event_type.as_deref(), Some("file_created"));
        assert_eq!(second.count().await.unwrap_or(0), 1, "count should be restored on reopen");
    }

    #[tokio::test]
    async fn count_tracks_appends() {
        let Some((_temp_dir, journal)) = fresh_journal().await else { return };

        assert_eq!(journal.count().await.unwrap_or(99), 0, "fresh journal should have 0 entries");

        for i in 0..5 {
            append_ok(&journal, ActivityEntry { timestamp_ms: i, ..entry("user", &format!("msg-{i}")) })
                .await;
        }

        assert_eq!(journal.count().await.unwrap_or(0), 5, "count should be 5 after 5 appends");
    }
}