use crate::reasoning::loop_types::{JournalEntry, JournalError, JournalWriter};
use crate::types::AgentId;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
use tokio::sync::Mutex;
#[async_trait::async_trait]
pub trait JournalStorage: Send + Sync {
async fn store(&self, entry: &JournalEntry) -> Result<(), JournalError>;
async fn read_entries(&self, agent_id: &AgentId) -> Result<Vec<JournalEntry>, JournalError>;
async fn read_from(
&self,
agent_id: &AgentId,
from_sequence: u64,
) -> Result<Vec<JournalEntry>, JournalError>;
async fn latest_sequence(&self, agent_id: &AgentId) -> Result<u64, JournalError>;
async fn compact(&self, agent_id: &AgentId) -> Result<u64, JournalError>;
}
pub struct MemoryJournalStorage {
entries: Mutex<Vec<JournalEntry>>,
}
impl Default for MemoryJournalStorage {
fn default() -> Self {
Self::new()
}
}
impl MemoryJournalStorage {
pub fn new() -> Self {
Self {
entries: Mutex::new(Vec::new()),
}
}
}
#[async_trait::async_trait]
impl JournalStorage for MemoryJournalStorage {
async fn store(&self, entry: &JournalEntry) -> Result<(), JournalError> {
self.entries.lock().await.push(entry.clone());
Ok(())
}
async fn read_entries(&self, agent_id: &AgentId) -> Result<Vec<JournalEntry>, JournalError> {
let entries = self.entries.lock().await;
Ok(entries
.iter()
.filter(|e| e.agent_id == *agent_id)
.cloned()
.collect())
}
async fn read_from(
&self,
agent_id: &AgentId,
from_sequence: u64,
) -> Result<Vec<JournalEntry>, JournalError> {
let entries = self.entries.lock().await;
Ok(entries
.iter()
.filter(|e| e.agent_id == *agent_id && e.sequence >= from_sequence)
.cloned()
.collect())
}
async fn latest_sequence(&self, agent_id: &AgentId) -> Result<u64, JournalError> {
let entries = self.entries.lock().await;
Ok(entries
.iter()
.filter(|e| e.agent_id == *agent_id)
.map(|e| e.sequence)
.max()
.unwrap_or(0))
}
async fn compact(&self, agent_id: &AgentId) -> Result<u64, JournalError> {
let mut entries = self.entries.lock().await;
let before = entries.len();
entries.retain(|e| e.agent_id != *agent_id);
Ok((before - entries.len()) as u64)
}
}
pub struct DurableJournal {
storage: Arc<dyn JournalStorage>,
sequence: AtomicU64,
agent_id: AgentId,
}
impl DurableJournal {
pub fn new(storage: Arc<dyn JournalStorage>, agent_id: AgentId) -> Self {
Self {
storage,
sequence: AtomicU64::new(0),
agent_id,
}
}
pub async fn initialize(&self) -> Result<(), JournalError> {
let latest = self.storage.latest_sequence(&self.agent_id).await?;
self.sequence.store(latest, Ordering::SeqCst);
Ok(())
}
pub async fn replay(&self) -> Result<Vec<JournalEntry>, JournalError> {
self.storage.read_entries(&self.agent_id).await
}
pub async fn replay_from(&self, from_sequence: u64) -> Result<Vec<JournalEntry>, JournalError> {
self.storage.read_from(&self.agent_id, from_sequence).await
}
pub async fn compact(&self) -> Result<u64, JournalError> {
let removed = self.storage.compact(&self.agent_id).await?;
self.sequence.store(0, Ordering::SeqCst);
Ok(removed)
}
pub async fn last_completed_iteration(&self) -> Result<u32, JournalError> {
let entries = self.storage.read_entries(&self.agent_id).await?;
Ok(entries.iter().map(|e| e.iteration).max().unwrap_or(0))
}
}
#[async_trait::async_trait]
impl JournalWriter for DurableJournal {
async fn append(&self, mut entry: JournalEntry) -> Result<(), JournalError> {
let seq = self.sequence.fetch_add(1, Ordering::SeqCst);
entry.sequence = seq;
entry.agent_id = self.agent_id;
self.storage.store(&entry).await
}
async fn next_sequence(&self) -> u64 {
self.sequence.load(Ordering::SeqCst)
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::reasoning::loop_types::{LoopConfig, LoopEvent};
fn make_entry(agent_id: AgentId, sequence: u64, iteration: u32) -> JournalEntry {
JournalEntry {
sequence,
timestamp: chrono::Utc::now(),
agent_id,
iteration,
event: LoopEvent::Started {
agent_id,
config: Box::new(LoopConfig::default()),
},
}
}
#[tokio::test]
async fn test_memory_storage_store_and_read() {
let storage = MemoryJournalStorage::new();
let agent = AgentId::new();
storage.store(&make_entry(agent, 0, 0)).await.unwrap();
storage.store(&make_entry(agent, 1, 1)).await.unwrap();
let entries = storage.read_entries(&agent).await.unwrap();
assert_eq!(entries.len(), 2);
assert_eq!(entries[0].sequence, 0);
assert_eq!(entries[1].sequence, 1);
}
#[tokio::test]
async fn test_memory_storage_read_from() {
let storage = MemoryJournalStorage::new();
let agent = AgentId::new();
for i in 0..5 {
storage
.store(&make_entry(agent, i, i as u32))
.await
.unwrap();
}
let entries = storage.read_from(&agent, 3).await.unwrap();
assert_eq!(entries.len(), 2);
assert_eq!(entries[0].sequence, 3);
assert_eq!(entries[1].sequence, 4);
}
#[tokio::test]
async fn test_memory_storage_latest_sequence() {
let storage = MemoryJournalStorage::new();
let agent = AgentId::new();
assert_eq!(storage.latest_sequence(&agent).await.unwrap(), 0);
storage.store(&make_entry(agent, 0, 0)).await.unwrap();
storage.store(&make_entry(agent, 5, 2)).await.unwrap();
assert_eq!(storage.latest_sequence(&agent).await.unwrap(), 5);
}
#[tokio::test]
async fn test_memory_storage_compact() {
let storage = MemoryJournalStorage::new();
let agent = AgentId::new();
storage.store(&make_entry(agent, 0, 0)).await.unwrap();
storage.store(&make_entry(agent, 1, 1)).await.unwrap();
let removed = storage.compact(&agent).await.unwrap();
assert_eq!(removed, 2);
let entries = storage.read_entries(&agent).await.unwrap();
assert!(entries.is_empty());
}
#[tokio::test]
async fn test_memory_storage_agent_isolation() {
let storage = MemoryJournalStorage::new();
let agent_a = AgentId::new();
let agent_b = AgentId::new();
storage.store(&make_entry(agent_a, 0, 0)).await.unwrap();
storage.store(&make_entry(agent_b, 0, 0)).await.unwrap();
storage.store(&make_entry(agent_a, 1, 1)).await.unwrap();
assert_eq!(storage.read_entries(&agent_a).await.unwrap().len(), 2);
assert_eq!(storage.read_entries(&agent_b).await.unwrap().len(), 1);
storage.compact(&agent_a).await.unwrap();
assert_eq!(storage.read_entries(&agent_a).await.unwrap().len(), 0);
assert_eq!(storage.read_entries(&agent_b).await.unwrap().len(), 1);
}
#[tokio::test]
async fn test_durable_journal_append_and_replay() {
let storage = Arc::new(MemoryJournalStorage::new());
let agent = AgentId::new();
let journal = DurableJournal::new(storage, agent);
journal.append(make_entry(agent, 0, 0)).await.unwrap();
journal.append(make_entry(agent, 0, 1)).await.unwrap();
assert_eq!(journal.next_sequence().await, 2);
let entries = journal.replay().await.unwrap();
assert_eq!(entries.len(), 2);
assert_eq!(entries[0].sequence, 0);
assert_eq!(entries[1].sequence, 1);
}
#[tokio::test]
async fn test_durable_journal_replay_from() {
let storage = Arc::new(MemoryJournalStorage::new());
let agent = AgentId::new();
let journal = DurableJournal::new(storage, agent);
for _ in 0..5 {
journal.append(make_entry(agent, 0, 0)).await.unwrap();
}
let entries = journal.replay_from(3).await.unwrap();
assert_eq!(entries.len(), 2);
}
#[tokio::test]
async fn test_durable_journal_initialize_resumes_sequence() {
let storage = Arc::new(MemoryJournalStorage::new());
let agent = AgentId::new();
for i in 0..3 {
storage
.store(&make_entry(agent, i, i as u32))
.await
.unwrap();
}
let journal = DurableJournal::new(storage, agent);
journal.initialize().await.unwrap();
assert_eq!(journal.next_sequence().await, 2);
journal.append(make_entry(agent, 0, 3)).await.unwrap();
assert_eq!(journal.next_sequence().await, 3);
}
#[tokio::test]
async fn test_durable_journal_compact() {
let storage = Arc::new(MemoryJournalStorage::new());
let agent = AgentId::new();
let journal = DurableJournal::new(storage, agent);
journal.append(make_entry(agent, 0, 0)).await.unwrap();
journal.append(make_entry(agent, 0, 1)).await.unwrap();
let removed = journal.compact().await.unwrap();
assert_eq!(removed, 2);
assert_eq!(journal.next_sequence().await, 0);
let entries = journal.replay().await.unwrap();
assert!(entries.is_empty());
}
#[tokio::test]
async fn test_last_completed_iteration() {
let storage = Arc::new(MemoryJournalStorage::new());
let agent = AgentId::new();
let journal = DurableJournal::new(storage, agent);
assert_eq!(journal.last_completed_iteration().await.unwrap(), 0);
let mut entry = make_entry(agent, 0, 3);
journal.append(entry.clone()).await.unwrap();
entry.iteration = 7;
journal.append(entry).await.unwrap();
assert_eq!(journal.last_completed_iteration().await.unwrap(), 7);
}
}