Skip to main content

symbi_runtime/reasoning/
journal.rs

//! Durable execution journal with SQLite storage
//!
//! Provides append-only, crash-recoverable journal storage for reasoning loops.
//! Each phase boundary is a checkpoint; crashed loops resume deterministically
//! by replaying journal entries.
//!
//! Feature-gated behind `cron` (which includes `rusqlite`).
8
9use crate::reasoning::loop_types::{JournalEntry, JournalError, JournalWriter};
10use crate::types::AgentId;
11use std::sync::atomic::{AtomicU64, Ordering};
12use std::sync::Arc;
13use tokio::sync::Mutex;
14
15/// Trait for durable journal storage backends.
16#[async_trait::async_trait]
17pub trait JournalStorage: Send + Sync {
18    /// Append an entry to persistent storage.
19    async fn store(&self, entry: &JournalEntry) -> Result<(), JournalError>;
20
21    /// Read all entries for a given agent, ordered by sequence.
22    async fn read_entries(&self, agent_id: &AgentId) -> Result<Vec<JournalEntry>, JournalError>;
23
24    /// Read entries starting from a given sequence number.
25    async fn read_from(
26        &self,
27        agent_id: &AgentId,
28        from_sequence: u64,
29    ) -> Result<Vec<JournalEntry>, JournalError>;
30
31    /// Get the latest sequence number for an agent (0 if none).
32    async fn latest_sequence(&self, agent_id: &AgentId) -> Result<u64, JournalError>;
33
34    /// Delete all entries for an agent (compaction after loop completion).
35    async fn compact(&self, agent_id: &AgentId) -> Result<u64, JournalError>;
36}
37
38/// In-memory journal storage for testing and lightweight use.
39pub struct MemoryJournalStorage {
40    entries: Mutex<Vec<JournalEntry>>,
41}
42
43impl Default for MemoryJournalStorage {
44    fn default() -> Self {
45        Self::new()
46    }
47}
48
49impl MemoryJournalStorage {
50    pub fn new() -> Self {
51        Self {
52            entries: Mutex::new(Vec::new()),
53        }
54    }
55}
56
57#[async_trait::async_trait]
58impl JournalStorage for MemoryJournalStorage {
59    async fn store(&self, entry: &JournalEntry) -> Result<(), JournalError> {
60        self.entries.lock().await.push(entry.clone());
61        Ok(())
62    }
63
64    async fn read_entries(&self, agent_id: &AgentId) -> Result<Vec<JournalEntry>, JournalError> {
65        let entries = self.entries.lock().await;
66        Ok(entries
67            .iter()
68            .filter(|e| e.agent_id == *agent_id)
69            .cloned()
70            .collect())
71    }
72
73    async fn read_from(
74        &self,
75        agent_id: &AgentId,
76        from_sequence: u64,
77    ) -> Result<Vec<JournalEntry>, JournalError> {
78        let entries = self.entries.lock().await;
79        Ok(entries
80            .iter()
81            .filter(|e| e.agent_id == *agent_id && e.sequence >= from_sequence)
82            .cloned()
83            .collect())
84    }
85
86    async fn latest_sequence(&self, agent_id: &AgentId) -> Result<u64, JournalError> {
87        let entries = self.entries.lock().await;
88        Ok(entries
89            .iter()
90            .filter(|e| e.agent_id == *agent_id)
91            .map(|e| e.sequence)
92            .max()
93            .unwrap_or(0))
94    }
95
96    async fn compact(&self, agent_id: &AgentId) -> Result<u64, JournalError> {
97        let mut entries = self.entries.lock().await;
98        let before = entries.len();
99        entries.retain(|e| e.agent_id != *agent_id);
100        Ok((before - entries.len()) as u64)
101    }
102}
103
104/// Durable journal backed by a `JournalStorage` implementation.
105///
106/// Implements `JournalWriter` so it can be used as a drop-in replacement
107/// for `BufferedJournal` in the reasoning loop.
108pub struct DurableJournal {
109    storage: Arc<dyn JournalStorage>,
110    sequence: AtomicU64,
111    agent_id: AgentId,
112}
113
114impl DurableJournal {
115    /// Create a new durable journal for the given agent.
116    pub fn new(storage: Arc<dyn JournalStorage>, agent_id: AgentId) -> Self {
117        Self {
118            storage,
119            sequence: AtomicU64::new(0),
120            agent_id,
121        }
122    }
123
124    /// Initialize from storage, resuming the sequence counter.
125    pub async fn initialize(&self) -> Result<(), JournalError> {
126        let latest = self.storage.latest_sequence(&self.agent_id).await?;
127        self.sequence.store(latest, Ordering::SeqCst);
128        Ok(())
129    }
130
131    /// Replay all journal entries for this agent.
132    pub async fn replay(&self) -> Result<Vec<JournalEntry>, JournalError> {
133        self.storage.read_entries(&self.agent_id).await
134    }
135
136    /// Replay entries starting from a given sequence.
137    pub async fn replay_from(&self, from_sequence: u64) -> Result<Vec<JournalEntry>, JournalError> {
138        self.storage.read_from(&self.agent_id, from_sequence).await
139    }
140
141    /// Compact (remove) all entries for this agent after successful loop completion.
142    pub async fn compact(&self) -> Result<u64, JournalError> {
143        let removed = self.storage.compact(&self.agent_id).await?;
144        self.sequence.store(0, Ordering::SeqCst);
145        Ok(removed)
146    }
147
148    /// Determine the last completed iteration from the journal.
149    pub async fn last_completed_iteration(&self) -> Result<u32, JournalError> {
150        let entries = self.storage.read_entries(&self.agent_id).await?;
151        Ok(entries.iter().map(|e| e.iteration).max().unwrap_or(0))
152    }
153}
154
155#[async_trait::async_trait]
156impl JournalWriter for DurableJournal {
157    async fn append(&self, mut entry: JournalEntry) -> Result<(), JournalError> {
158        let seq = self.sequence.fetch_add(1, Ordering::SeqCst);
159        entry.sequence = seq;
160        entry.agent_id = self.agent_id;
161        self.storage.store(&entry).await
162    }
163
164    async fn next_sequence(&self) -> u64 {
165        self.sequence.load(Ordering::SeqCst)
166    }
167}
168
169/// Export all journal entries for an agent as a JSON string for backup.
170pub async fn export_entries(
171    storage: &dyn JournalStorage,
172    agent_id: &AgentId,
173) -> Result<String, JournalError> {
174    let entries = storage.read_entries(agent_id).await?;
175    serde_json::to_string_pretty(&entries)
176        .map_err(|e| JournalError::WriteFailed(format!("Failed to serialize journal entries: {e}")))
177}
178
179/// Import journal entries from a JSON string (restore from backup).
180///
181/// Entries are appended to storage. Callers should compact first if
182/// a clean restore is desired.
183pub async fn import_entries(
184    storage: &dyn JournalStorage,
185    json: &str,
186) -> Result<usize, JournalError> {
187    let entries: Vec<JournalEntry> = serde_json::from_str(json).map_err(|e| {
188        JournalError::ReadFailed(format!("Failed to deserialize journal entries: {e}"))
189    })?;
190    let count = entries.len();
191    for entry in &entries {
192        storage.store(entry).await?;
193    }
194    Ok(count)
195}
196
#[cfg(test)]
mod tests {
    use super::*;
    use crate::reasoning::loop_types::{LoopConfig, LoopEvent};

    /// Build a `Started` entry for `agent_id` with the given sequence and
    /// iteration, a default loop config and the current UTC time.
    fn make_entry(agent_id: AgentId, sequence: u64, iteration: u32) -> JournalEntry {
        let event = LoopEvent::Started {
            agent_id,
            config: Box::new(LoopConfig::default()),
        };
        JournalEntry {
            sequence,
            timestamp: chrono::Utc::now(),
            agent_id,
            iteration,
            event,
        }
    }

    /// Stored entries come back for the right agent, in sequence order.
    #[tokio::test]
    async fn test_memory_storage_store_and_read() {
        let agent = AgentId::new();
        let storage = MemoryJournalStorage::new();

        for (sequence, iteration) in [(0, 0), (1, 1)] {
            let entry = make_entry(agent, sequence, iteration);
            storage.store(&entry).await.unwrap();
        }

        let sequences: Vec<u64> = storage
            .read_entries(&agent)
            .await
            .unwrap()
            .into_iter()
            .map(|e| e.sequence)
            .collect();
        assert_eq!(sequences, vec![0, 1]);
    }

    /// `read_from` returns only entries at or past the requested sequence.
    #[tokio::test]
    async fn test_memory_storage_read_from() {
        let agent = AgentId::new();
        let storage = MemoryJournalStorage::new();

        for iteration in 0u32..5 {
            let entry = make_entry(agent, u64::from(iteration), iteration);
            storage.store(&entry).await.unwrap();
        }

        let sequences: Vec<u64> = storage
            .read_from(&agent, 3)
            .await
            .unwrap()
            .into_iter()
            .map(|e| e.sequence)
            .collect();
        assert_eq!(sequences, vec![3, 4]);
    }

    /// `latest_sequence` is 0 for an unknown agent and the maximum otherwise.
    #[tokio::test]
    async fn test_memory_storage_latest_sequence() {
        let agent = AgentId::new();
        let storage = MemoryJournalStorage::new();

        let when_empty = storage.latest_sequence(&agent).await.unwrap();
        assert_eq!(when_empty, 0);

        storage.store(&make_entry(agent, 0, 0)).await.unwrap();
        storage.store(&make_entry(agent, 5, 2)).await.unwrap();

        let when_filled = storage.latest_sequence(&agent).await.unwrap();
        assert_eq!(when_filled, 5);
    }

    /// Compaction removes all of an agent's entries and reports the count.
    #[tokio::test]
    async fn test_memory_storage_compact() {
        let agent = AgentId::new();
        let storage = MemoryJournalStorage::new();

        storage.store(&make_entry(agent, 0, 0)).await.unwrap();
        storage.store(&make_entry(agent, 1, 1)).await.unwrap();

        assert_eq!(storage.compact(&agent).await.unwrap(), 2);

        let remaining = storage.read_entries(&agent).await.unwrap();
        assert!(remaining.is_empty());
    }

    /// Entries of different agents do not leak into each other.
    #[tokio::test]
    async fn test_memory_storage_agent_isolation() {
        let storage = MemoryJournalStorage::new();
        let agent_a = AgentId::new();
        let agent_b = AgentId::new();

        let seeded = [
            make_entry(agent_a, 0, 0),
            make_entry(agent_b, 0, 0),
            make_entry(agent_a, 1, 1),
        ];
        for entry in &seeded {
            storage.store(entry).await.unwrap();
        }

        let count_a = storage.read_entries(&agent_a).await.unwrap().len();
        let count_b = storage.read_entries(&agent_b).await.unwrap().len();
        assert_eq!(count_a, 2);
        assert_eq!(count_b, 1);

        // Compacting agent_a must leave agent_b's entries alone.
        storage.compact(&agent_a).await.unwrap();
        let count_a = storage.read_entries(&agent_a).await.unwrap().len();
        let count_b = storage.read_entries(&agent_b).await.unwrap().len();
        assert_eq!(count_a, 0);
        assert_eq!(count_b, 1);
    }

    /// The journal, not the caller, assigns sequence numbers on append.
    #[tokio::test]
    async fn test_durable_journal_append_and_replay() {
        let agent = AgentId::new();
        let storage = Arc::new(MemoryJournalStorage::new());
        let journal = DurableJournal::new(storage, agent);

        for iteration in [0, 1] {
            journal
                .append(make_entry(agent, 0, iteration))
                .await
                .unwrap();
        }

        assert_eq!(journal.next_sequence().await, 2);

        let replayed = journal.replay().await.unwrap();
        assert_eq!(replayed.len(), 2);
        // Both entries were submitted with sequence 0; the journal renumbered them.
        assert_eq!(replayed[0].sequence, 0);
        assert_eq!(replayed[1].sequence, 1);
    }

    /// `replay_from` skips entries before the requested sequence.
    #[tokio::test]
    async fn test_durable_journal_replay_from() {
        let agent = AgentId::new();
        let storage = Arc::new(MemoryJournalStorage::new());
        let journal = DurableJournal::new(storage, agent);

        let mut appended = 0;
        while appended < 5 {
            journal.append(make_entry(agent, 0, 0)).await.unwrap();
            appended += 1;
        }

        let tail = journal.replay_from(3).await.unwrap();
        assert_eq!(tail.len(), 2);
    }

    /// `initialize` picks the counter up from what is already in storage.
    #[tokio::test]
    async fn test_durable_journal_initialize_resumes_sequence() {
        let agent = AgentId::new();
        let storage = Arc::new(MemoryJournalStorage::new());

        // Seed storage directly, bypassing the journal.
        for iteration in 0u32..3 {
            let entry = make_entry(agent, u64::from(iteration), iteration);
            storage.store(&entry).await.unwrap();
        }

        // A fresh journal over the same storage resumes at sequence 2.
        let journal = DurableJournal::new(storage, agent);
        journal.initialize().await.unwrap();
        assert_eq!(journal.next_sequence().await, 2);

        // The next append takes sequence 2 and advances the counter to 3.
        journal.append(make_entry(agent, 0, 3)).await.unwrap();
        assert_eq!(journal.next_sequence().await, 3);
    }

    /// Journal-level compaction clears entries and resets the counter.
    #[tokio::test]
    async fn test_durable_journal_compact() {
        let agent = AgentId::new();
        let storage = Arc::new(MemoryJournalStorage::new());
        let journal = DurableJournal::new(storage, agent);

        for iteration in [0, 1] {
            journal
                .append(make_entry(agent, 0, iteration))
                .await
                .unwrap();
        }

        assert_eq!(journal.compact().await.unwrap(), 2);
        assert_eq!(journal.next_sequence().await, 0);

        let replayed = journal.replay().await.unwrap();
        assert!(replayed.is_empty());
    }

    /// The last completed iteration is the maximum iteration on record.
    #[tokio::test]
    async fn test_last_completed_iteration() {
        let agent = AgentId::new();
        let storage = Arc::new(MemoryJournalStorage::new());
        let journal = DurableJournal::new(storage, agent);

        let before = journal.last_completed_iteration().await.unwrap();
        assert_eq!(before, 0);

        let first = make_entry(agent, 0, 3);
        let second = JournalEntry {
            iteration: 7,
            ..first.clone()
        };
        journal.append(first).await.unwrap();
        journal.append(second).await.unwrap();

        let after = journal.last_completed_iteration().await.unwrap();
        assert_eq!(after, 7);
    }

    /// Export yields a JSON array that round-trips through serde.
    #[tokio::test]
    async fn test_export_entries() {
        let agent = AgentId::new();
        let storage = MemoryJournalStorage::new();

        storage.store(&make_entry(agent, 0, 0)).await.unwrap();
        storage.store(&make_entry(agent, 1, 1)).await.unwrap();

        let exported = export_entries(&storage, &agent).await.unwrap();
        assert!(exported.contains("sequence"));

        // The output must parse back into the same number of entries.
        let round_tripped: Vec<JournalEntry> = serde_json::from_str(&exported).unwrap();
        assert_eq!(round_tripped.len(), 2);
    }

    /// Importing an export into an empty store restores every entry.
    #[tokio::test]
    async fn test_import_entries() {
        let agent = AgentId::new();
        let source = MemoryJournalStorage::new();

        // Populate one store, export it, then import into a second, empty store.
        source.store(&make_entry(agent, 0, 0)).await.unwrap();
        source.store(&make_entry(agent, 1, 1)).await.unwrap();
        let exported = export_entries(&source, &agent).await.unwrap();

        let target = MemoryJournalStorage::new();
        let imported = import_entries(&target, &exported).await.unwrap();
        assert_eq!(imported, 2);

        let restored = target.read_entries(&agent).await.unwrap();
        assert_eq!(restored.len(), 2);
    }

    /// Malformed JSON is rejected with an error.
    #[tokio::test]
    async fn test_import_invalid_json() {
        let storage = MemoryJournalStorage::new();
        let outcome = import_entries(&storage, "not valid json").await;
        assert!(outcome.is_err());
    }
}
423}