Skip to main content

agentic_memory/v3/
immortal_log.rs

1//! The append-only immortal log. Never deletes. Never modifies. Only appends.
2
3use super::block::{Block, BlockContent, BlockHash, BlockType};
4use serde::{Deserialize, Serialize};
5use std::collections::HashMap;
6use std::fs::{File, OpenOptions};
7use std::io::{BufReader, Read, Seek, Write};
8use std::path::PathBuf;
9
10/// The append-only immortal log.
11/// Source of truth — everything written here is permanent.
12pub struct ImmortalLog {
13    /// Path to the log file
14    path: PathBuf,
15
16    /// File handle for writing
17    file: File,
18
19    /// Current write position
20    write_pos: u64,
21
22    /// Block count
23    block_count: u64,
24
25    /// Last block hash (for chaining)
26    last_hash: BlockHash,
27
28    /// Block offset index (sequence -> file offset)
29    offsets: Vec<u64>,
30
31    /// Content-address index (hash -> sequence)
32    content_index: HashMap<BlockHash, u64>,
33}
34
35impl ImmortalLog {
36    /// Create or open an immortal log
37    pub fn open(path: PathBuf) -> Result<Self, std::io::Error> {
38        let exists = path.exists() && std::fs::metadata(&path)?.len() > 0;
39
40        if exists {
41            Self::load_existing(path)
42        } else {
43            Self::create_new(path)
44        }
45    }
46
47    fn create_new(path: PathBuf) -> Result<Self, std::io::Error> {
48        let file = OpenOptions::new()
49            .read(true)
50            .write(true)
51            .create(true)
52            .truncate(true)
53            .open(&path)?;
54
55        Ok(Self {
56            path,
57            file,
58            write_pos: 0,
59            block_count: 0,
60            last_hash: BlockHash::zero(),
61            offsets: Vec::new(),
62            content_index: HashMap::new(),
63        })
64    }
65
66    fn load_existing(path: PathBuf) -> Result<Self, std::io::Error> {
67        let file = OpenOptions::new().read(true).write(true).open(&path)?;
68
69        let mut log = Self {
70            path,
71            file,
72            write_pos: 0,
73            block_count: 0,
74            last_hash: BlockHash::zero(),
75            offsets: Vec::new(),
76            content_index: HashMap::new(),
77        };
78
79        // Scan and rebuild indexes
80        let read_file = OpenOptions::new().read(true).open(&log.path)?;
81        let mut reader = BufReader::new(read_file);
82
83        loop {
84            let pos = reader.stream_position()?;
85
86            // Read block length prefix (4 bytes)
87            let mut len_buf = [0u8; 4];
88            match reader.read_exact(&mut len_buf) {
89                Ok(_) => {}
90                Err(e) if e.kind() == std::io::ErrorKind::UnexpectedEof => break,
91                Err(e) => return Err(e),
92            }
93            let block_len = u32::from_le_bytes(len_buf) as usize;
94
95            if block_len == 0 {
96                break;
97            }
98
99            // Read block data
100            let mut block_data = vec![0u8; block_len];
101            match reader.read_exact(&mut block_data) {
102                Ok(_) => {}
103                Err(e) if e.kind() == std::io::ErrorKind::UnexpectedEof => break,
104                Err(e) => return Err(e),
105            }
106
107            // Deserialize
108            if let Ok(block) = serde_json::from_slice::<Block>(&block_data) {
109                log.offsets.push(pos);
110                log.content_index.insert(block.hash, block.sequence);
111                log.last_hash = block.hash;
112                log.block_count = block.sequence + 1;
113            }
114
115            log.write_pos = reader.stream_position()?;
116        }
117
118        Ok(log)
119    }
120
121    /// Append a block to the log
122    pub fn append(
123        &mut self,
124        block_type: BlockType,
125        content: BlockContent,
126    ) -> Result<Block, std::io::Error> {
127        let block = Block::new(self.last_hash, self.block_count, block_type, content);
128
129        // Serialize
130        let block_data = serde_json::to_vec(&block)?;
131        let block_len = block_data.len() as u32;
132
133        // Seek to write position and write length prefix + data
134        self.file.seek(std::io::SeekFrom::Start(self.write_pos))?;
135        self.file.write_all(&block_len.to_le_bytes())?;
136        self.file.write_all(&block_data)?;
137        self.file.flush()?;
138
139        // Update indexes
140        self.offsets.push(self.write_pos);
141        self.content_index.insert(block.hash, block.sequence);
142        self.last_hash = block.hash;
143        self.block_count += 1;
144        self.write_pos += 4 + block_data.len() as u64;
145
146        Ok(block)
147    }
148
149    /// Get block by sequence number
150    pub fn get(&self, sequence: u64) -> Option<Block> {
151        let offset = *self.offsets.get(sequence as usize)?;
152        self.read_block_at(offset)
153    }
154
155    /// Get block by hash
156    pub fn get_by_hash(&self, hash: &BlockHash) -> Option<Block> {
157        let sequence = *self.content_index.get(hash)?;
158        self.get(sequence)
159    }
160
161    /// Read block at file offset
162    fn read_block_at(&self, offset: u64) -> Option<Block> {
163        let file = OpenOptions::new().read(true).open(&self.path).ok()?;
164        let mut reader = BufReader::new(file);
165        reader.seek(std::io::SeekFrom::Start(offset)).ok()?;
166
167        let mut len_buf = [0u8; 4];
168        reader.read_exact(&mut len_buf).ok()?;
169        let block_len = u32::from_le_bytes(len_buf) as usize;
170
171        if block_len == 0 {
172            return None;
173        }
174
175        let mut block_data = vec![0u8; block_len];
176        reader.read_exact(&mut block_data).ok()?;
177
178        serde_json::from_slice(&block_data).ok()
179    }
180
181    /// Iterate over all blocks
182    pub fn iter(&self) -> impl Iterator<Item = Block> + '_ {
183        (0..self.block_count).filter_map(move |seq| self.get(seq))
184    }
185
186    /// Iterate over blocks in sequence range
187    pub fn iter_range(&self, start: u64, end: u64) -> impl Iterator<Item = Block> + '_ {
188        (start..std::cmp::min(end, self.block_count)).filter_map(move |seq| self.get(seq))
189    }
190
191    /// Get block count
192    pub fn len(&self) -> u64 {
193        self.block_count
194    }
195
196    /// Check if log is empty
197    pub fn is_empty(&self) -> bool {
198        self.block_count == 0
199    }
200
201    /// Get last block hash
202    pub fn last_hash(&self) -> BlockHash {
203        self.last_hash
204    }
205
206    /// Verify integrity of entire log
207    pub fn verify_integrity(&self) -> IntegrityReport {
208        let mut report = IntegrityReport {
209            verified: true,
210            blocks_checked: 0,
211            chain_intact: true,
212            missing_blocks: vec![],
213            corrupted_blocks: vec![],
214        };
215
216        let mut expected_prev = BlockHash::zero();
217
218        for seq in 0..self.block_count {
219            match self.get(seq) {
220                Some(block) => {
221                    report.blocks_checked += 1;
222
223                    if !block.verify() {
224                        report.corrupted_blocks.push(seq);
225                        report.verified = false;
226                    }
227
228                    if block.prev_hash != expected_prev {
229                        report.chain_intact = false;
230                        report.verified = false;
231                    }
232
233                    expected_prev = block.hash;
234                }
235                None => {
236                    report.missing_blocks.push(seq);
237                    report.verified = false;
238                }
239            }
240        }
241
242        report
243    }
244}
245
246#[derive(Debug, Clone, Serialize, Deserialize)]
247pub struct IntegrityReport {
248    pub verified: bool,
249    pub blocks_checked: u64,
250    pub chain_intact: bool,
251    pub missing_blocks: Vec<u64>,
252    pub corrupted_blocks: Vec<u64>,
253}
254
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::TempDir;

    /// Build a plain-text block payload with no role and no token count.
    fn text(s: &str) -> BlockContent {
        BlockContent::Text {
            text: s.to_string(),
            role: None,
            tokens: None,
        }
    }

    #[test]
    fn test_create_and_append() {
        let dir = TempDir::new().unwrap();
        let path = dir.path().join("test.log");

        let mut log = ImmortalLog::open(path).unwrap();
        assert_eq!(log.len(), 0);

        let block = log
            .append(
                BlockType::UserMessage,
                BlockContent::Text {
                    text: "Hello".to_string(),
                    role: Some("user".to_string()),
                    tokens: None,
                },
            )
            .unwrap();

        assert_eq!(log.len(), 1);
        assert!(block.verify());
    }

    #[test]
    fn test_persistence() {
        let dir = TempDir::new().unwrap();
        let path = dir.path().join("test.log");

        // Write
        {
            let mut log = ImmortalLog::open(path.clone()).unwrap();
            log.append(BlockType::UserMessage, text("First")).unwrap();
            log.append(BlockType::AssistantMessage, text("Second"))
                .unwrap();
        }

        // Read back
        {
            let log = ImmortalLog::open(path).unwrap();
            assert_eq!(log.len(), 2);

            let b0 = log.get(0).unwrap();
            assert_eq!(b0.block_type, BlockType::UserMessage);

            let b1 = log.get(1).unwrap();
            assert_eq!(b1.block_type, BlockType::AssistantMessage);
            assert_eq!(b1.prev_hash, b0.hash);
        }
    }

    #[test]
    fn test_integrity_verification() {
        let dir = TempDir::new().unwrap();
        let path = dir.path().join("test.log");

        let mut log = ImmortalLog::open(path).unwrap();
        for i in 0..10 {
            log.append(BlockType::UserMessage, text(&format!("Message {}", i)))
                .unwrap();
        }

        let report = log.verify_integrity();
        assert!(report.verified);
        assert_eq!(report.blocks_checked, 10);
        assert!(report.chain_intact);
    }

    /// Reopening a log and appending must continue both the sequence and the hash chain.
    #[test]
    fn test_append_after_reopen_continues_chain() {
        let dir = TempDir::new().unwrap();
        let path = dir.path().join("test.log");

        let first = {
            let mut log = ImmortalLog::open(path.clone()).unwrap();
            log.append(BlockType::UserMessage, text("First")).unwrap()
        };

        {
            let mut log = ImmortalLog::open(path.clone()).unwrap();
            assert_eq!(log.len(), 1);
            let second = log
                .append(BlockType::AssistantMessage, text("Second"))
                .unwrap();
            assert_eq!(second.sequence, 1);
            assert_eq!(second.prev_hash, first.hash);
        }

        let log = ImmortalLog::open(path).unwrap();
        assert_eq!(log.len(), 2);
        let report = log.verify_integrity();
        assert!(report.verified);
        assert!(report.chain_intact);
    }

    /// An incomplete record at the end of the file (a torn write) is ignored on
    /// open, and the next append replaces it.
    #[test]
    fn test_torn_tail_is_ignored() {
        let dir = TempDir::new().unwrap();
        let path = dir.path().join("test.log");

        {
            let mut log = ImmortalLog::open(path.clone()).unwrap();
            log.append(BlockType::UserMessage, text("First")).unwrap();
        }

        // Simulate a crash mid-append: a prefix promising 100 bytes, followed by only 3.
        {
            use std::io::Write as _;
            let mut f = std::fs::OpenOptions::new()
                .append(true)
                .open(&path)
                .unwrap();
            f.write_all(&100u32.to_le_bytes()).unwrap();
            f.write_all(b"{\"x").unwrap();
        }

        {
            let mut log = ImmortalLog::open(path.clone()).unwrap();
            assert_eq!(log.len(), 1);
            log.append(BlockType::AssistantMessage, text("Second"))
                .unwrap();
        }

        let log = ImmortalLog::open(path).unwrap();
        assert_eq!(log.len(), 2);
        assert!(log.verify_integrity().verified);
    }
}