neomemx 0.1.2

A high-performance memory library for AI agents with semantic search
Documentation
use crate::core::ChangeLog;
use crate::error::{NeomemxError, Result};
use crate::storage::history::HistoryRecord;
use crate::storage::history::HistoryStore;
use async_trait::async_trait;
use parking_lot::Mutex;
use rusqlite::{params, Connection};
use std::path::Path;
use tracing::{debug, info};

/// Builder for creating history records (simplified for internal use)
#[derive(Default)]
pub struct HistoryBuilder<'a> {
    pub memory_id: &'a str,
    pub old_memory: Option<&'a str>,
    pub new_memory: Option<&'a str>,
    pub event: &'a str,
    pub created_at: Option<&'a str>,
    pub updated_at: Option<&'a str>,
    pub is_deleted: bool,
    pub actor_id: Option<&'a str>,
    pub role: Option<&'a str>,
    pub user_id: Option<&'a str>,
    pub agent_id: Option<&'a str>,
    pub session_id: Option<&'a str>,
}

impl<'a> HistoryBuilder<'a> {
    pub fn new(memory_id: &'a str, event: &'a str) -> Self {
        Self {
            memory_id,
            event,
            ..Default::default()
        }
    }

    pub fn old_memory(mut self, val: Option<&'a str>) -> Self {
        self.old_memory = val;
        self
    }
    pub fn new_memory(mut self, val: Option<&'a str>) -> Self {
        self.new_memory = val;
        self
    }
    pub fn created_at(mut self, val: Option<&'a str>) -> Self {
        self.created_at = val;
        self
    }
    pub fn actor_id(mut self, val: Option<&'a str>) -> Self {
        self.actor_id = val;
        self
    }
    pub fn user_id(mut self, val: Option<&'a str>) -> Self {
        self.user_id = val;
        self
    }
    pub fn agent_id(mut self, val: Option<&'a str>) -> Self {
        self.agent_id = val;
        self
    }
    pub fn session_id(mut self, val: Option<&'a str>) -> Self {
        self.session_id = val;
        self
    }
}

/// SQLite manager for storing memory history.
pub struct SqliteManager {
    connection: Mutex<Connection>,
    db_path: String,
}

impl SqliteManager {
    pub fn new<P: AsRef<Path>>(db_path: P) -> Result<Self> {
        let db_path_str = db_path.as_ref().to_string_lossy().to_string();

        if db_path_str != ":memory:" {
            if let Some(parent) = db_path.as_ref().parent() {
                std::fs::create_dir_all(parent)?;
            }
        }

        let connection = Connection::open(db_path.as_ref())
            .map_err(|e| NeomemxError::DatabaseError(format!("Failed to open database: {}", e)))?;

        // Enable WAL mode for better concurrent read performance
        if let Err(e) = connection.pragma_update(None, "journal_mode", "WAL") {
            if db_path_str != ":memory:" {
                tracing::warn!("Failed to enable WAL mode: {}", e);
            }
        }

        // Optimize for speed
        if let Err(e) = connection.pragma_update(None, "synchronous", "NORMAL") {
            tracing::warn!("Failed to set synchronous mode: {}", e);
        }

        // Cache size: default 64MB, but can be configured via env var
        let cache_size_kb = std::env::var("NEOMEMX_SQLITE_CACHE_SIZE_KB")
            .ok()
            .and_then(|s| s.parse::<i32>().ok())
            .unwrap_or(-64000); // -64000 = 64MB
        if let Err(e) = connection.pragma_update(None, "cache_size", &cache_size_kb.to_string()) {
            tracing::warn!("Failed to set cache size: {}", e);
        }

        let manager = Self {
            connection: Mutex::new(connection),
            db_path: db_path_str,
        };

        manager.create_history_table()?;
        info!("SQLite manager initialized at: {}", manager.db_path);

        Ok(manager)
    }

    fn create_history_table(&self) -> Result<()> {
        let conn = self.connection.lock();

        conn.execute(
            "CREATE TABLE IF NOT EXISTS history (
                id           TEXT PRIMARY KEY,
                memory_id    TEXT NOT NULL,
                old_memory   TEXT,
                new_memory   TEXT,
                event        TEXT NOT NULL,
                created_at   TEXT,
                updated_at   TEXT,
                is_deleted   INTEGER DEFAULT 0,
                actor_id     TEXT,
                role         TEXT,
                user_id      TEXT,
                agent_id     TEXT,
                session_id   TEXT
            )",
            [],
        )
        .map_err(|e| NeomemxError::DatabaseError(format!("Failed to create table: {}", e)))?;

        conn.execute(
            "CREATE INDEX IF NOT EXISTS idx_history_memory_id ON history(memory_id)",
            [],
        )
        .map_err(|e| NeomemxError::DatabaseError(format!("Failed to create index: {}", e)))?;

        debug!("History table created/verified");
        Ok(())
    }

    pub fn add(&self, builder: HistoryBuilder<'_>) -> Result<String> {
        let conn = self.connection.lock();
        let id = uuid::Uuid::new_v4().to_string();

        conn.execute(
            "INSERT INTO history (id, memory_id, old_memory, new_memory, event,
                created_at, updated_at, is_deleted, actor_id, role, user_id, agent_id, session_id)
             VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12, ?13)",
            params![
                id,
                builder.memory_id,
                builder.old_memory,
                builder.new_memory,
                builder.event,
                builder.created_at,
                builder.updated_at,
                builder.is_deleted as i32,
                builder.actor_id,
                builder.role,
                builder.user_id,
                builder.agent_id,
                builder.session_id,
            ],
        )
        .map_err(|e| NeomemxError::DatabaseError(format!("Failed to insert history: {}", e)))?;

        debug!(
            "Added history record {} for memory {}",
            id, builder.memory_id
        );
        Ok(id)
    }

    /// Retrieves history records for a specific memory ID.
    pub fn get_history_records(&self, memory_id: &str) -> Result<Vec<HistoryRecord>> {
        let conn = self.connection.lock();

        let mut stmt = conn
            .prepare_cached(
                "SELECT id, memory_id, old_memory, new_memory, event,
                        created_at, updated_at, is_deleted, actor_id, role,
                        user_id, agent_id, session_id
                 FROM history WHERE memory_id = ?1
                 ORDER BY created_at ASC, updated_at ASC",
            )
            .map_err(|e| NeomemxError::DatabaseError(format!("Failed to prepare query: {}", e)))?;

        let rows = stmt
            .query_map(params![memory_id], |row| {
                Ok(HistoryRecord {
                    id: row.get(0)?,
                    memory_id: row.get(1)?,
                    old_memory: row.get::<_, Option<String>>(2)?,
                    new_memory: row.get::<_, Option<String>>(3)?,
                    event: row.get(4)?,
                    created_at: row.get(5)?,
                    updated_at: row.get(6)?,
                    is_deleted: row.get::<_, i32>(7)? != 0,
                    actor_id: row.get(8)?,
                    role: row.get(9)?,
                    user_id: row.get(10)?,
                    agent_id: row.get(11)?,
                    session_id: row.get(12)?,
                })
            })
            .map_err(|e| NeomemxError::DatabaseError(format!("Failed to query history: {}", e)))?;

        rows.collect::<std::result::Result<Vec<_>, _>>()
            .map_err(|e| NeomemxError::DatabaseError(format!("Failed to read row: {}", e)))
    }

    /// Resets the history database by dropping and recreating the table.
    pub fn reset(&self) -> Result<()> {
        {
            let conn = self.connection.lock();
            conn.execute("DROP TABLE IF EXISTS history", [])
                .map_err(|e| NeomemxError::DatabaseError(format!("Failed to drop table: {}", e)))?;
        }

        self.create_history_table()?;
        info!("History table reset");
        Ok(())
    }
}

#[async_trait]
impl HistoryStore for SqliteManager {
    async fn record_change(&self, change: ChangeLog) -> Result<()> {
        let event = match change.change_type {
            crate::core::ChangeType::Created => "ADD",
            crate::core::ChangeType::Updated => "UPDATE",
            crate::core::ChangeType::Deleted => "DELETE",
            crate::core::ChangeType::Consolidated => "UPDATE",
        };

        let created_at_str = change.timestamp.to_rfc3339();
        let mut builder =
            HistoryBuilder::new(&change.fact_id, event).created_at(Some(&created_at_str));

        if let Some(ref prev) = change.previous_content {
            builder = builder.old_memory(Some(prev));
        }

        if let Some(ref new) = change.new_content {
            builder = builder.new_memory(Some(new));
        }

        if let Some(ref actor) = change.actor {
            builder = builder.actor_id(Some(actor));
        }

        // Store scope information
        if let Some(ref user) = change.scope.user {
            builder = builder.user_id(Some(user));
        }
        if let Some(ref agent) = change.scope.agent {
            builder = builder.agent_id(Some(agent));
        }
        if let Some(ref session) = change.scope.session {
            builder = builder.session_id(Some(session));
        }

        self.add(builder)?;
        Ok(())
    }

    async fn get_history(&self, fact_id: &crate::core::FactId) -> Result<Vec<ChangeLog>> {
        let records = self.get_history_records(fact_id)?;

        let mut changes = Vec::with_capacity(records.len());
        for record in records {
            let change_type = match record.event.as_str() {
                "ADD" => crate::core::ChangeType::Created,
                "UPDATE" => crate::core::ChangeType::Updated,
                "DELETE" => crate::core::ChangeType::Deleted,
                _ => crate::core::ChangeType::Updated,
            };

            let timestamp =
                chrono::DateTime::parse_from_rfc3339(&record.created_at.unwrap_or_default())
                    .map(|dt| dt.with_timezone(&chrono::Utc))
                    .unwrap_or_else(|_| chrono::Utc::now());

            // Restore scope from history
            let scope = crate::core::ScopeIdentifiers {
                user: record.user_id.clone(),
                agent: record.agent_id.clone(),
                session: record.session_id.clone(),
            };

            let change = ChangeLog {
                fact_id: record.memory_id.clone(),
                change_type,
                timestamp,
                previous_content: record.old_memory.clone(),
                new_content: record.new_memory.clone(),
                scope,
                actor: record.actor_id.clone(),
            };

            changes.push(change);
        }

        Ok(changes)
    }

    async fn delete_history(&self, fact_id: &crate::core::FactId) -> Result<()> {
        let conn = self.connection.lock();
        conn.execute("DELETE FROM history WHERE memory_id = ?1", params![fact_id])
            .map_err(|e| NeomemxError::DatabaseError(format!("Failed to delete history: {}", e)))?;
        Ok(())
    }

    async fn clear_all(&self) -> Result<()> {
        self.reset()
    }
}