// car-memgine 0.15.0
//
// Memgine — graph-based memory engine for Common Agent Runtime
// Documentation
//! Persistent conversation storage with bounded write-through.
//!
//! Writes conversation turns to a JSONL file as they are ingested,
//! caps at a configurable maximum (default 200), and loads on init.
//! Enables conversation resumption across sessions.

use chrono::{DateTime, Utc};
use serde::{Deserialize, Serialize};
use std::io::{BufRead, Write};
use std::path::{Path, PathBuf};

/// A persisted conversation turn.
///
/// Serialized with serde; `ConversationStore` writes each turn as one JSON
/// object per line of its backing file.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct StoredTurn {
    /// Who produced the turn (the tests in this file use `"user"` and
    /// `"assistant"`; no fixed set of values is enforced here).
    pub speaker: String,
    /// The text content of the turn.
    pub text: String,
    /// UTC timestamp attached to the turn.
    // NOTE(review): presumably the time the turn was produced — confirm against callers.
    pub timestamp: DateTime<Utc>,
}

/// Bounded JSONL conversation store.
///
/// Appends turns on write, caps the file at `max_turns` by rewriting
/// when the count exceeds `max_turns * 1.5` (amortized compaction).
///
/// The threshold is computed with integer arithmetic as
/// `max_turns + max_turns / 2`, so between compactions the file can hold
/// more than `max_turns` turns.
pub struct ConversationStore {
    /// Location of the backing JSONL file.
    path: PathBuf,
    /// Number of most-recent turns kept when the file is compacted.
    max_turns: usize,
    /// Running count of turns in the file: seeded in `new` from the number of
    /// non-blank lines, incremented by `append`, and reset by `compact`.
    turn_count: usize,
}

impl ConversationStore {
    /// Create a new store at the given path.
    pub fn new(path: PathBuf, max_turns: usize) -> Self {
        let turn_count = Self::count_lines(&path).unwrap_or(0);
        Self {
            path,
            max_turns,
            turn_count,
        }
    }

    /// Append a turn to the store. Compacts if over the threshold.
    pub fn append(&mut self, turn: &StoredTurn) -> Result<(), std::io::Error> {
        // Ensure parent directory exists
        if let Some(parent) = self.path.parent() {
            std::fs::create_dir_all(parent)?;
        }

        let mut file = std::fs::OpenOptions::new()
            .create(true)
            .append(true)
            .open(&self.path)?;

        let line = serde_json::to_string(turn)
            .map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e))?;
        writeln!(file, "{}", line)?;
        self.turn_count += 1;

        // Compact when we exceed 1.5x max_turns (amortized)
        if self.turn_count > self.max_turns + self.max_turns / 2 {
            self.compact()?;
        }

        Ok(())
    }

    /// Load the most recent `limit` turns from the store.
    pub fn load(&self, limit: usize) -> Result<Vec<StoredTurn>, std::io::Error> {
        self.load_from_path(&self.path, limit)
    }

    /// Load all turns from the store.
    pub fn load_all(&self) -> Result<Vec<StoredTurn>, std::io::Error> {
        self.load_from_path(&self.path, usize::MAX)
    }

    /// Number of turns currently in the store.
    pub fn len(&self) -> usize {
        self.turn_count
    }

    pub fn is_empty(&self) -> bool {
        self.turn_count == 0
    }

    /// Path to the store file.
    pub fn path(&self) -> &Path {
        &self.path
    }

    /// Compact the store to keep only the most recent `max_turns`.
    fn compact(&mut self) -> Result<(), std::io::Error> {
        let all = self.load_from_path(&self.path, usize::MAX)?;
        let keep = if all.len() > self.max_turns {
            &all[all.len() - self.max_turns..]
        } else {
            &all
        };

        // Write to temp file then rename (atomic on most filesystems)
        let tmp_path = self.path.with_extension("jsonl.tmp");
        {
            let mut file = std::fs::File::create(&tmp_path)?;
            for turn in keep {
                let line = serde_json::to_string(turn)
                    .map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e))?;
                writeln!(file, "{}", line)?;
            }
        }
        std::fs::rename(&tmp_path, &self.path)?;
        self.turn_count = keep.len();
        Ok(())
    }

    fn load_from_path(&self, path: &Path, limit: usize) -> Result<Vec<StoredTurn>, std::io::Error> {
        if !path.exists() {
            return Ok(Vec::new());
        }

        let file = std::fs::File::open(path)?;
        let reader = std::io::BufReader::new(file);
        let mut turns: Vec<StoredTurn> = Vec::new();

        for line in reader.lines() {
            let line = line?;
            if line.trim().is_empty() {
                continue;
            }
            if let Ok(turn) = serde_json::from_str::<StoredTurn>(&line) {
                turns.push(turn);
            }
        }

        // Return only the most recent `limit` turns
        if turns.len() > limit {
            turns = turns.split_off(turns.len() - limit);
        }

        Ok(turns)
    }

    fn count_lines(path: &Path) -> Result<usize, std::io::Error> {
        if !path.exists() {
            return Ok(0);
        }
        let file = std::fs::File::open(path)?;
        let reader = std::io::BufReader::new(file);
        Ok(reader
            .lines()
            .filter(|l| l.as_ref().map_or(false, |s| !s.trim().is_empty()))
            .count())
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::NamedTempFile;

    /// Build a turn with the given speaker and text, stamped with the current time.
    fn make_turn(speaker: &str, text: &str) -> StoredTurn {
        StoredTurn {
            speaker: speaker.to_string(),
            text: text.to_string(),
            timestamp: Utc::now(),
        }
    }

    /// Turns come back in the order they were appended.
    #[test]
    fn append_and_load() {
        let tmp = NamedTempFile::new().unwrap();
        let path = tmp.path().to_path_buf();
        let mut store = ConversationStore::new(path, 200);

        store.append(&make_turn("user", "hello")).unwrap();
        store.append(&make_turn("assistant", "hi there")).unwrap();

        let turns = store.load_all().unwrap();
        assert_eq!(turns.len(), 2);
        assert_eq!(turns[0].speaker, "user");
        assert_eq!(turns[1].text, "hi there");
    }

    /// `load(limit)` returns the newest `limit` turns, oldest first.
    #[test]
    fn load_with_limit() {
        let tmp = NamedTempFile::new().unwrap();
        let path = tmp.path().to_path_buf();
        let mut store = ConversationStore::new(path, 200);

        for i in 0..10 {
            store
                .append(&make_turn("user", &format!("msg {}", i)))
                .unwrap();
        }

        let turns = store.load(3).unwrap();
        assert_eq!(turns.len(), 3);
        assert_eq!(turns[0].text, "msg 7"); // most recent 3
    }

    /// Crossing the compaction threshold trims the file back to `max_turns`.
    #[test]
    fn compaction_caps_at_max() {
        let tmp = NamedTempFile::new().unwrap();
        let path = tmp.path().to_path_buf();
        let mut store = ConversationStore::new(path.clone(), 5);

        // Threshold is 5 + 5 / 2 = 7 (integer division), so the 8th append
        // triggers compaction.
        for i in 0..8 {
            store
                .append(&make_turn("user", &format!("msg {}", i)))
                .unwrap();
        }

        assert_eq!(store.len(), 5); // compacted
        let turns = store.load_all().unwrap();
        assert_eq!(turns.len(), 5);
        assert_eq!(turns[0].text, "msg 3"); // oldest kept
        assert_eq!(turns[4].text, "msg 7"); // newest
    }

    /// A store over an existing but empty file reports no turns.
    #[test]
    fn empty_store() {
        let tmp = NamedTempFile::new().unwrap();
        let path = tmp.path().to_path_buf();
        let store = ConversationStore::new(path, 200);

        assert!(store.is_empty());
        assert_eq!(store.load_all().unwrap().len(), 0);
    }

    /// A store whose file does not exist yet behaves as empty.
    #[test]
    fn nonexistent_path() {
        // Use a fresh private directory instead of a fixed name under /tmp, so
        // the path is guaranteed absent and the test is portable.
        let dir = tempfile::tempdir().unwrap();
        let store = ConversationStore::new(dir.path().join("missing.jsonl"), 200);

        assert!(store.is_empty());
        assert_eq!(store.load_all().unwrap().len(), 0);
    }

    /// Reopening a store picks up the turns written by the previous instance.
    #[test]
    fn reopen_counts_existing_turns() {
        let tmp = NamedTempFile::new().unwrap();
        let path = tmp.path().to_path_buf();

        {
            let mut store = ConversationStore::new(path.clone(), 200);
            for i in 0..3 {
                store
                    .append(&make_turn("user", &format!("msg {}", i)))
                    .unwrap();
            }
        }

        let reopened = ConversationStore::new(path, 200);
        assert_eq!(reopened.len(), 3);
        assert_eq!(reopened.load_all().unwrap().len(), 3);
    }
}