1use crate::reasoning::loop_types::{JournalEntry, JournalError, JournalWriter};
10use crate::types::AgentId;
11use std::sync::atomic::{AtomicU64, Ordering};
12use std::sync::Arc;
13use tokio::sync::Mutex;
14
15#[async_trait::async_trait]
17pub trait JournalStorage: Send + Sync {
18 async fn store(&self, entry: &JournalEntry) -> Result<(), JournalError>;
20
21 async fn read_entries(&self, agent_id: &AgentId) -> Result<Vec<JournalEntry>, JournalError>;
23
24 async fn read_from(
26 &self,
27 agent_id: &AgentId,
28 from_sequence: u64,
29 ) -> Result<Vec<JournalEntry>, JournalError>;
30
31 async fn latest_sequence(&self, agent_id: &AgentId) -> Result<u64, JournalError>;
33
34 async fn compact(&self, agent_id: &AgentId) -> Result<u64, JournalError>;
36}
37
38pub struct MemoryJournalStorage {
40 entries: Mutex<Vec<JournalEntry>>,
41}
42
43impl Default for MemoryJournalStorage {
44 fn default() -> Self {
45 Self::new()
46 }
47}
48
49impl MemoryJournalStorage {
50 pub fn new() -> Self {
51 Self {
52 entries: Mutex::new(Vec::new()),
53 }
54 }
55}
56
57#[async_trait::async_trait]
58impl JournalStorage for MemoryJournalStorage {
59 async fn store(&self, entry: &JournalEntry) -> Result<(), JournalError> {
60 self.entries.lock().await.push(entry.clone());
61 Ok(())
62 }
63
64 async fn read_entries(&self, agent_id: &AgentId) -> Result<Vec<JournalEntry>, JournalError> {
65 let entries = self.entries.lock().await;
66 Ok(entries
67 .iter()
68 .filter(|e| e.agent_id == *agent_id)
69 .cloned()
70 .collect())
71 }
72
73 async fn read_from(
74 &self,
75 agent_id: &AgentId,
76 from_sequence: u64,
77 ) -> Result<Vec<JournalEntry>, JournalError> {
78 let entries = self.entries.lock().await;
79 Ok(entries
80 .iter()
81 .filter(|e| e.agent_id == *agent_id && e.sequence >= from_sequence)
82 .cloned()
83 .collect())
84 }
85
86 async fn latest_sequence(&self, agent_id: &AgentId) -> Result<u64, JournalError> {
87 let entries = self.entries.lock().await;
88 Ok(entries
89 .iter()
90 .filter(|e| e.agent_id == *agent_id)
91 .map(|e| e.sequence)
92 .max()
93 .unwrap_or(0))
94 }
95
96 async fn compact(&self, agent_id: &AgentId) -> Result<u64, JournalError> {
97 let mut entries = self.entries.lock().await;
98 let before = entries.len();
99 entries.retain(|e| e.agent_id != *agent_id);
100 Ok((before - entries.len()) as u64)
101 }
102}
103
104pub struct DurableJournal {
109 storage: Arc<dyn JournalStorage>,
110 sequence: AtomicU64,
111 agent_id: AgentId,
112}
113
114impl DurableJournal {
115 pub fn new(storage: Arc<dyn JournalStorage>, agent_id: AgentId) -> Self {
117 Self {
118 storage,
119 sequence: AtomicU64::new(0),
120 agent_id,
121 }
122 }
123
124 pub async fn initialize(&self) -> Result<(), JournalError> {
126 let latest = self.storage.latest_sequence(&self.agent_id).await?;
127 self.sequence.store(latest, Ordering::SeqCst);
128 Ok(())
129 }
130
131 pub async fn replay(&self) -> Result<Vec<JournalEntry>, JournalError> {
133 self.storage.read_entries(&self.agent_id).await
134 }
135
136 pub async fn replay_from(&self, from_sequence: u64) -> Result<Vec<JournalEntry>, JournalError> {
138 self.storage.read_from(&self.agent_id, from_sequence).await
139 }
140
141 pub async fn compact(&self) -> Result<u64, JournalError> {
143 let removed = self.storage.compact(&self.agent_id).await?;
144 self.sequence.store(0, Ordering::SeqCst);
145 Ok(removed)
146 }
147
148 pub async fn last_completed_iteration(&self) -> Result<u32, JournalError> {
150 let entries = self.storage.read_entries(&self.agent_id).await?;
151 Ok(entries.iter().map(|e| e.iteration).max().unwrap_or(0))
152 }
153}
154
155#[async_trait::async_trait]
156impl JournalWriter for DurableJournal {
157 async fn append(&self, mut entry: JournalEntry) -> Result<(), JournalError> {
158 let seq = self.sequence.fetch_add(1, Ordering::SeqCst);
159 entry.sequence = seq;
160 entry.agent_id = self.agent_id;
161 self.storage.store(&entry).await
162 }
163
164 async fn next_sequence(&self) -> u64 {
165 self.sequence.load(Ordering::SeqCst)
166 }
167}
168
169pub async fn export_entries(
171 storage: &dyn JournalStorage,
172 agent_id: &AgentId,
173) -> Result<String, JournalError> {
174 let entries = storage.read_entries(agent_id).await?;
175 serde_json::to_string_pretty(&entries)
176 .map_err(|e| JournalError::WriteFailed(format!("Failed to serialize journal entries: {e}")))
177}
178
179pub async fn import_entries(
184 storage: &dyn JournalStorage,
185 json: &str,
186) -> Result<usize, JournalError> {
187 let entries: Vec<JournalEntry> = serde_json::from_str(json).map_err(|e| {
188 JournalError::ReadFailed(format!("Failed to deserialize journal entries: {e}"))
189 })?;
190 let count = entries.len();
191 for entry in &entries {
192 storage.store(entry).await?;
193 }
194 Ok(count)
195}
196
197#[cfg(test)]
198mod tests {
199 use super::*;
200 use crate::reasoning::loop_types::{LoopConfig, LoopEvent};
201
202 fn make_entry(agent_id: AgentId, sequence: u64, iteration: u32) -> JournalEntry {
203 JournalEntry {
204 sequence,
205 timestamp: chrono::Utc::now(),
206 agent_id,
207 iteration,
208 event: LoopEvent::Started {
209 agent_id,
210 config: Box::new(LoopConfig::default()),
211 },
212 }
213 }
214
215 #[tokio::test]
216 async fn test_memory_storage_store_and_read() {
217 let storage = MemoryJournalStorage::new();
218 let agent = AgentId::new();
219
220 storage.store(&make_entry(agent, 0, 0)).await.unwrap();
221 storage.store(&make_entry(agent, 1, 1)).await.unwrap();
222
223 let entries = storage.read_entries(&agent).await.unwrap();
224 assert_eq!(entries.len(), 2);
225 assert_eq!(entries[0].sequence, 0);
226 assert_eq!(entries[1].sequence, 1);
227 }
228
229 #[tokio::test]
230 async fn test_memory_storage_read_from() {
231 let storage = MemoryJournalStorage::new();
232 let agent = AgentId::new();
233
234 for i in 0..5 {
235 storage
236 .store(&make_entry(agent, i, i as u32))
237 .await
238 .unwrap();
239 }
240
241 let entries = storage.read_from(&agent, 3).await.unwrap();
242 assert_eq!(entries.len(), 2);
243 assert_eq!(entries[0].sequence, 3);
244 assert_eq!(entries[1].sequence, 4);
245 }
246
247 #[tokio::test]
248 async fn test_memory_storage_latest_sequence() {
249 let storage = MemoryJournalStorage::new();
250 let agent = AgentId::new();
251
252 assert_eq!(storage.latest_sequence(&agent).await.unwrap(), 0);
253
254 storage.store(&make_entry(agent, 0, 0)).await.unwrap();
255 storage.store(&make_entry(agent, 5, 2)).await.unwrap();
256
257 assert_eq!(storage.latest_sequence(&agent).await.unwrap(), 5);
258 }
259
260 #[tokio::test]
261 async fn test_memory_storage_compact() {
262 let storage = MemoryJournalStorage::new();
263 let agent = AgentId::new();
264
265 storage.store(&make_entry(agent, 0, 0)).await.unwrap();
266 storage.store(&make_entry(agent, 1, 1)).await.unwrap();
267
268 let removed = storage.compact(&agent).await.unwrap();
269 assert_eq!(removed, 2);
270
271 let entries = storage.read_entries(&agent).await.unwrap();
272 assert!(entries.is_empty());
273 }
274
275 #[tokio::test]
276 async fn test_memory_storage_agent_isolation() {
277 let storage = MemoryJournalStorage::new();
278 let agent_a = AgentId::new();
279 let agent_b = AgentId::new();
280
281 storage.store(&make_entry(agent_a, 0, 0)).await.unwrap();
282 storage.store(&make_entry(agent_b, 0, 0)).await.unwrap();
283 storage.store(&make_entry(agent_a, 1, 1)).await.unwrap();
284
285 assert_eq!(storage.read_entries(&agent_a).await.unwrap().len(), 2);
286 assert_eq!(storage.read_entries(&agent_b).await.unwrap().len(), 1);
287
288 storage.compact(&agent_a).await.unwrap();
290 assert_eq!(storage.read_entries(&agent_a).await.unwrap().len(), 0);
291 assert_eq!(storage.read_entries(&agent_b).await.unwrap().len(), 1);
292 }
293
294 #[tokio::test]
295 async fn test_durable_journal_append_and_replay() {
296 let storage = Arc::new(MemoryJournalStorage::new());
297 let agent = AgentId::new();
298 let journal = DurableJournal::new(storage, agent);
299
300 journal.append(make_entry(agent, 0, 0)).await.unwrap();
301 journal.append(make_entry(agent, 0, 1)).await.unwrap();
302
303 assert_eq!(journal.next_sequence().await, 2);
304
305 let entries = journal.replay().await.unwrap();
306 assert_eq!(entries.len(), 2);
307 assert_eq!(entries[0].sequence, 0);
309 assert_eq!(entries[1].sequence, 1);
310 }
311
312 #[tokio::test]
313 async fn test_durable_journal_replay_from() {
314 let storage = Arc::new(MemoryJournalStorage::new());
315 let agent = AgentId::new();
316 let journal = DurableJournal::new(storage, agent);
317
318 for _ in 0..5 {
319 journal.append(make_entry(agent, 0, 0)).await.unwrap();
320 }
321
322 let entries = journal.replay_from(3).await.unwrap();
323 assert_eq!(entries.len(), 2);
324 }
325
326 #[tokio::test]
327 async fn test_durable_journal_initialize_resumes_sequence() {
328 let storage = Arc::new(MemoryJournalStorage::new());
329 let agent = AgentId::new();
330
331 for i in 0..3 {
333 storage
334 .store(&make_entry(agent, i, i as u32))
335 .await
336 .unwrap();
337 }
338
339 let journal = DurableJournal::new(storage, agent);
341 journal.initialize().await.unwrap();
342 assert_eq!(journal.next_sequence().await, 2);
343
344 journal.append(make_entry(agent, 0, 3)).await.unwrap();
346 assert_eq!(journal.next_sequence().await, 3);
347 }
348
349 #[tokio::test]
350 async fn test_durable_journal_compact() {
351 let storage = Arc::new(MemoryJournalStorage::new());
352 let agent = AgentId::new();
353 let journal = DurableJournal::new(storage, agent);
354
355 journal.append(make_entry(agent, 0, 0)).await.unwrap();
356 journal.append(make_entry(agent, 0, 1)).await.unwrap();
357
358 let removed = journal.compact().await.unwrap();
359 assert_eq!(removed, 2);
360 assert_eq!(journal.next_sequence().await, 0);
361
362 let entries = journal.replay().await.unwrap();
363 assert!(entries.is_empty());
364 }
365
366 #[tokio::test]
367 async fn test_last_completed_iteration() {
368 let storage = Arc::new(MemoryJournalStorage::new());
369 let agent = AgentId::new();
370 let journal = DurableJournal::new(storage, agent);
371
372 assert_eq!(journal.last_completed_iteration().await.unwrap(), 0);
373
374 let mut entry = make_entry(agent, 0, 3);
375 journal.append(entry.clone()).await.unwrap();
376 entry.iteration = 7;
377 journal.append(entry).await.unwrap();
378
379 assert_eq!(journal.last_completed_iteration().await.unwrap(), 7);
380 }
381
382 #[tokio::test]
383 async fn test_export_entries() {
384 let storage = MemoryJournalStorage::new();
385 let agent = AgentId::new();
386
387 storage.store(&make_entry(agent, 0, 0)).await.unwrap();
388 storage.store(&make_entry(agent, 1, 1)).await.unwrap();
389
390 let json = export_entries(&storage, &agent).await.unwrap();
391 assert!(json.contains("sequence"));
392
393 let parsed: Vec<JournalEntry> = serde_json::from_str(&json).unwrap();
395 assert_eq!(parsed.len(), 2);
396 }
397
398 #[tokio::test]
399 async fn test_import_entries() {
400 let storage = MemoryJournalStorage::new();
401 let agent = AgentId::new();
402
403 storage.store(&make_entry(agent, 0, 0)).await.unwrap();
405 storage.store(&make_entry(agent, 1, 1)).await.unwrap();
406
407 let json = export_entries(&storage, &agent).await.unwrap();
408
409 let fresh_storage = MemoryJournalStorage::new();
410 let count = import_entries(&fresh_storage, &json).await.unwrap();
411 assert_eq!(count, 2);
412
413 let entries = fresh_storage.read_entries(&agent).await.unwrap();
414 assert_eq!(entries.len(), 2);
415 }
416
417 #[tokio::test]
418 async fn test_import_invalid_json() {
419 let storage = MemoryJournalStorage::new();
420 let result = import_entries(&storage, "not valid json").await;
421 assert!(result.is_err());
422 }
423}