use crate::core::ChangeLog;
use crate::error::{NeomemxError, Result};
use crate::storage::history::HistoryRecord;
use crate::storage::history::HistoryStore;
use async_trait::async_trait;
use parking_lot::Mutex;
use rusqlite::{params, Connection};
use std::path::Path;
use tracing::{debug, info};
/// Borrowed, chainable description of a single row for the `history` table.
///
/// Only `memory_id` and `event` are mandatory (see [`HistoryBuilder::new`]).
/// Every other field starts at its `Default` value (`None` / `false`) and can
/// be filled in either through the chained setters below or by assigning the
/// public fields directly.
#[derive(Debug, Clone, Default)]
pub struct HistoryBuilder<'a> {
    /// Identifier of the memory the record belongs to (`memory_id` column).
    pub memory_id: &'a str,
    /// Content before the change, if any (`old_memory` column).
    pub old_memory: Option<&'a str>,
    /// Content after the change, if any (`new_memory` column).
    pub new_memory: Option<&'a str>,
    /// Event label stored in the `event` column (e.g. `"ADD"`, `"UPDATE"`, `"DELETE"`).
    pub event: &'a str,
    /// Creation timestamp, stored verbatim as text (`created_at` column).
    pub created_at: Option<&'a str>,
    /// Last-update timestamp, stored verbatim as text (`updated_at` column).
    pub updated_at: Option<&'a str>,
    /// Deletion flag (`is_deleted` column); defaults to `false`.
    pub is_deleted: bool,
    /// Value for the `actor_id` column.
    pub actor_id: Option<&'a str>,
    /// Value for the `role` column.
    pub role: Option<&'a str>,
    /// Value for the `user_id` column.
    pub user_id: Option<&'a str>,
    /// Value for the `agent_id` column.
    pub agent_id: Option<&'a str>,
    /// Value for the `session_id` column.
    pub session_id: Option<&'a str>,
}

impl<'a> HistoryBuilder<'a> {
    /// Starts a builder with the two required fields; all others keep their defaults.
    pub fn new(memory_id: &'a str, event: &'a str) -> Self {
        Self {
            memory_id,
            event,
            ..Default::default()
        }
    }

    /// Sets the content before the change.
    pub fn old_memory(mut self, val: Option<&'a str>) -> Self {
        self.old_memory = val;
        self
    }

    /// Sets the content after the change.
    pub fn new_memory(mut self, val: Option<&'a str>) -> Self {
        self.new_memory = val;
        self
    }

    /// Sets the creation timestamp text.
    pub fn created_at(mut self, val: Option<&'a str>) -> Self {
        self.created_at = val;
        self
    }

    /// Sets the last-update timestamp text.
    pub fn updated_at(mut self, val: Option<&'a str>) -> Self {
        self.updated_at = val;
        self
    }

    /// Sets the deletion flag.
    pub fn is_deleted(mut self, val: bool) -> Self {
        self.is_deleted = val;
        self
    }

    /// Sets the actor identifier.
    pub fn actor_id(mut self, val: Option<&'a str>) -> Self {
        self.actor_id = val;
        self
    }

    /// Sets the role.
    pub fn role(mut self, val: Option<&'a str>) -> Self {
        self.role = val;
        self
    }

    /// Sets the user identifier.
    pub fn user_id(mut self, val: Option<&'a str>) -> Self {
        self.user_id = val;
        self
    }

    /// Sets the agent identifier.
    pub fn agent_id(mut self, val: Option<&'a str>) -> Self {
        self.agent_id = val;
        self
    }

    /// Sets the session identifier.
    pub fn session_id(mut self, val: Option<&'a str>) -> Self {
        self.session_id = val;
        self
    }
}
/// SQLite-backed storage for memory history records.
///
/// Wraps a single `rusqlite` connection behind a `parking_lot::Mutex`; every
/// operation locks that mutex for the duration of its SQL statements.
pub struct SqliteManager {
    /// The one shared database connection, guarded by a mutex.
    connection: Mutex<Connection>,
    /// The path the database was opened with, as a (lossily converted) string.
    /// Within this file it is only read when logging initialization.
    db_path: String,
}
impl SqliteManager {
pub fn new<P: AsRef<Path>>(db_path: P) -> Result<Self> {
let db_path_str = db_path.as_ref().to_string_lossy().to_string();
if db_path_str != ":memory:" {
if let Some(parent) = db_path.as_ref().parent() {
std::fs::create_dir_all(parent)?;
}
}
let connection = Connection::open(db_path.as_ref())
.map_err(|e| NeomemxError::DatabaseError(format!("Failed to open database: {}", e)))?;
if let Err(e) = connection.pragma_update(None, "journal_mode", "WAL") {
if db_path_str != ":memory:" {
tracing::warn!("Failed to enable WAL mode: {}", e);
}
}
if let Err(e) = connection.pragma_update(None, "synchronous", "NORMAL") {
tracing::warn!("Failed to set synchronous mode: {}", e);
}
let cache_size_kb = std::env::var("NEOMEMX_SQLITE_CACHE_SIZE_KB")
.ok()
.and_then(|s| s.parse::<i32>().ok())
.unwrap_or(-64000); if let Err(e) = connection.pragma_update(None, "cache_size", &cache_size_kb.to_string()) {
tracing::warn!("Failed to set cache size: {}", e);
}
let manager = Self {
connection: Mutex::new(connection),
db_path: db_path_str,
};
manager.create_history_table()?;
info!("SQLite manager initialized at: {}", manager.db_path);
Ok(manager)
}
fn create_history_table(&self) -> Result<()> {
let conn = self.connection.lock();
conn.execute(
"CREATE TABLE IF NOT EXISTS history (
id TEXT PRIMARY KEY,
memory_id TEXT NOT NULL,
old_memory TEXT,
new_memory TEXT,
event TEXT NOT NULL,
created_at TEXT,
updated_at TEXT,
is_deleted INTEGER DEFAULT 0,
actor_id TEXT,
role TEXT,
user_id TEXT,
agent_id TEXT,
session_id TEXT
)",
[],
)
.map_err(|e| NeomemxError::DatabaseError(format!("Failed to create table: {}", e)))?;
conn.execute(
"CREATE INDEX IF NOT EXISTS idx_history_memory_id ON history(memory_id)",
[],
)
.map_err(|e| NeomemxError::DatabaseError(format!("Failed to create index: {}", e)))?;
debug!("History table created/verified");
Ok(())
}
pub fn add(&self, builder: HistoryBuilder<'_>) -> Result<String> {
let conn = self.connection.lock();
let id = uuid::Uuid::new_v4().to_string();
conn.execute(
"INSERT INTO history (id, memory_id, old_memory, new_memory, event,
created_at, updated_at, is_deleted, actor_id, role, user_id, agent_id, session_id)
VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12, ?13)",
params![
id,
builder.memory_id,
builder.old_memory,
builder.new_memory,
builder.event,
builder.created_at,
builder.updated_at,
builder.is_deleted as i32,
builder.actor_id,
builder.role,
builder.user_id,
builder.agent_id,
builder.session_id,
],
)
.map_err(|e| NeomemxError::DatabaseError(format!("Failed to insert history: {}", e)))?;
debug!(
"Added history record {} for memory {}",
id, builder.memory_id
);
Ok(id)
}
pub fn get_history_records(&self, memory_id: &str) -> Result<Vec<HistoryRecord>> {
let conn = self.connection.lock();
let mut stmt = conn
.prepare_cached(
"SELECT id, memory_id, old_memory, new_memory, event,
created_at, updated_at, is_deleted, actor_id, role,
user_id, agent_id, session_id
FROM history WHERE memory_id = ?1
ORDER BY created_at ASC, updated_at ASC",
)
.map_err(|e| NeomemxError::DatabaseError(format!("Failed to prepare query: {}", e)))?;
let rows = stmt
.query_map(params![memory_id], |row| {
Ok(HistoryRecord {
id: row.get(0)?,
memory_id: row.get(1)?,
old_memory: row.get::<_, Option<String>>(2)?,
new_memory: row.get::<_, Option<String>>(3)?,
event: row.get(4)?,
created_at: row.get(5)?,
updated_at: row.get(6)?,
is_deleted: row.get::<_, i32>(7)? != 0,
actor_id: row.get(8)?,
role: row.get(9)?,
user_id: row.get(10)?,
agent_id: row.get(11)?,
session_id: row.get(12)?,
})
})
.map_err(|e| NeomemxError::DatabaseError(format!("Failed to query history: {}", e)))?;
rows.collect::<std::result::Result<Vec<_>, _>>()
.map_err(|e| NeomemxError::DatabaseError(format!("Failed to read row: {}", e)))
}
pub fn reset(&self) -> Result<()> {
{
let conn = self.connection.lock();
conn.execute("DROP TABLE IF EXISTS history", [])
.map_err(|e| NeomemxError::DatabaseError(format!("Failed to drop table: {}", e)))?;
}
self.create_history_table()?;
info!("History table reset");
Ok(())
}
}
#[async_trait]
impl HistoryStore for SqliteManager {
async fn record_change(&self, change: ChangeLog) -> Result<()> {
let event = match change.change_type {
crate::core::ChangeType::Created => "ADD",
crate::core::ChangeType::Updated => "UPDATE",
crate::core::ChangeType::Deleted => "DELETE",
crate::core::ChangeType::Consolidated => "UPDATE",
};
let created_at_str = change.timestamp.to_rfc3339();
let mut builder =
HistoryBuilder::new(&change.fact_id, event).created_at(Some(&created_at_str));
if let Some(ref prev) = change.previous_content {
builder = builder.old_memory(Some(prev));
}
if let Some(ref new) = change.new_content {
builder = builder.new_memory(Some(new));
}
if let Some(ref actor) = change.actor {
builder = builder.actor_id(Some(actor));
}
if let Some(ref user) = change.scope.user {
builder = builder.user_id(Some(user));
}
if let Some(ref agent) = change.scope.agent {
builder = builder.agent_id(Some(agent));
}
if let Some(ref session) = change.scope.session {
builder = builder.session_id(Some(session));
}
self.add(builder)?;
Ok(())
}
async fn get_history(&self, fact_id: &crate::core::FactId) -> Result<Vec<ChangeLog>> {
let records = self.get_history_records(fact_id)?;
let mut changes = Vec::with_capacity(records.len());
for record in records {
let change_type = match record.event.as_str() {
"ADD" => crate::core::ChangeType::Created,
"UPDATE" => crate::core::ChangeType::Updated,
"DELETE" => crate::core::ChangeType::Deleted,
_ => crate::core::ChangeType::Updated,
};
let timestamp =
chrono::DateTime::parse_from_rfc3339(&record.created_at.unwrap_or_default())
.map(|dt| dt.with_timezone(&chrono::Utc))
.unwrap_or_else(|_| chrono::Utc::now());
let scope = crate::core::ScopeIdentifiers {
user: record.user_id.clone(),
agent: record.agent_id.clone(),
session: record.session_id.clone(),
};
let change = ChangeLog {
fact_id: record.memory_id.clone(),
change_type,
timestamp,
previous_content: record.old_memory.clone(),
new_content: record.new_memory.clone(),
scope,
actor: record.actor_id.clone(),
};
changes.push(change);
}
Ok(changes)
}
async fn delete_history(&self, fact_id: &crate::core::FactId) -> Result<()> {
let conn = self.connection.lock();
conn.execute("DELETE FROM history WHERE memory_id = ?1", params![fact_id])
.map_err(|e| NeomemxError::DatabaseError(format!("Failed to delete history: {}", e)))?;
Ok(())
}
async fn clear_all(&self) -> Result<()> {
self.reset()
}
}