agent_memory/
journal.rs

1//! Durable episodic memory journal implementations.
2
3use std::path::{Path, PathBuf};
4
5use async_trait::async_trait;
6use tokio::fs::{self, OpenOptions};
7use tokio::io::{AsyncSeekExt, AsyncWriteExt};
8use tokio::sync::Mutex;
9
10use crate::MemoryResult;
11use crate::record::MemoryRecord;
12
13/// Trait implemented by durable journals.
14#[async_trait]
15pub trait Journal: Send + Sync {
16    /// Appends a record to the journal.
17    async fn append(&self, record: &MemoryRecord) -> MemoryResult<()>;
18
19    /// Returns the most recent `limit` records, ordered oldest to newest.
20    async fn tail(&self, limit: usize) -> MemoryResult<Vec<MemoryRecord>>;
21
22    /// Clears the journal contents.
23    async fn clear(&self) -> MemoryResult<()>;
24}
25
26/// File-backed journal writing newline-delimited JSON entries.
27pub struct FileJournal {
28    path: PathBuf,
29    file: Mutex<tokio::fs::File>,
30}
31
32impl FileJournal {
33    /// Opens (or creates) a journal file at the provided path.
34    ///
35    /// # Errors
36    ///
37    /// Propagates I/O and serialization errors encountered while preparing the
38    /// file.
39    pub async fn open(path: impl Into<PathBuf>) -> MemoryResult<Self> {
40        let path = path.into();
41        if let Some(parent) = path.parent() {
42            fs::create_dir_all(parent).await?;
43        }
44
45        let file = OpenOptions::new()
46            .create(true)
47            .append(true)
48            .read(true)
49            .open(&path)
50            .await?;
51
52        Ok(Self {
53            path,
54            file: Mutex::new(file),
55        })
56    }
57
58    /// Returns the underlying path of the journal file.
59    #[must_use]
60    pub fn path(&self) -> &Path {
61        &self.path
62    }
63}
64
65#[async_trait]
66impl Journal for FileJournal {
67    async fn append(&self, record: &MemoryRecord) -> MemoryResult<()> {
68        let line = serde_json::to_vec(record)?;
69        let mut guard = self.file.lock().await;
70        guard.write_all(&line).await?;
71        guard.write_u8(b'\n').await?;
72        guard.flush().await?;
73        Ok(())
74    }
75
76    async fn tail(&self, limit: usize) -> MemoryResult<Vec<MemoryRecord>> {
77        if limit == 0 {
78            return Ok(Vec::new());
79        }
80
81        let data = fs::read(&self.path).await?;
82        if data.is_empty() {
83            return Ok(Vec::new());
84        }
85
86        let mut records = Vec::new();
87        for chunk in data
88            .split(|byte| *byte == b'\n')
89            .filter(|chunk| !chunk.is_empty())
90        {
91            let record: MemoryRecord = serde_json::from_slice(chunk)?;
92            records.push(record);
93        }
94
95        if records.len() <= limit {
96            return Ok(records);
97        }
98
99        let skip = records.len() - limit;
100        Ok(records.into_iter().skip(skip).collect())
101    }
102
103    async fn clear(&self) -> MemoryResult<()> {
104        let mut guard = self.file.lock().await;
105        guard.rewind().await?;
106        guard.set_len(0).await?;
107        guard.flush().await?;
108        Ok(())
109    }
110}
111
112#[cfg(test)]
113mod tests {
114    use super::*;
115    use bytes::Bytes;
116    use uuid::Uuid;
117
118    use crate::record::MemoryChannel;
119
120    fn temp_path() -> PathBuf {
121        let mut path = std::env::temp_dir();
122        path.push(format!("memory-journal-{}.log", Uuid::new_v4()));
123        path
124    }
125
126    #[tokio::test]
127    async fn append_and_tail_roundtrip() {
128        let path = temp_path();
129        let journal = FileJournal::open(&path).await.unwrap();
130
131        for content in ["one", "two", "three"] {
132            let record = crate::record::MemoryRecord::builder(
133                MemoryChannel::Input,
134                Bytes::from_static(content.as_bytes()),
135            )
136            .build()
137            .unwrap();
138            journal.append(&record).await.unwrap();
139        }
140
141        let tail = journal.tail(2).await.unwrap();
142        assert_eq!(tail.len(), 2);
143        assert_eq!(tail[0].payload(), &Bytes::from_static(b"two"));
144        assert_eq!(tail[1].payload(), &Bytes::from_static(b"three"));
145
146        journal.clear().await.unwrap();
147        let empty = journal.tail(10).await.unwrap();
148        assert!(empty.is_empty());
149
150        if path.exists() {
151            let _ = std::fs::remove_file(path);
152        }
153    }
154}