use crate::types::{
ClearFileIndexResult, GlobalMemoryRecord, GlobalMemorySearchHit, GlobalMemoryWriteResult,
MemoryChunk, MemoryConfig, MemoryError, MemoryResult, MemoryStats, MemoryTier,
ProjectMemoryStats, DEFAULT_EMBEDDING_DIMENSION,
};
use chrono::{DateTime, Utc};
use rusqlite::{ffi::sqlite3_auto_extension, params, Connection, OptionalExtension, Row};
use sqlite_vec::sqlite3_vec_init;
use std::collections::HashSet;
use std::path::Path;
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::Mutex;
type ProjectIndexStatusRow = (
Option<String>,
Option<i64>,
Option<i64>,
Option<i64>,
Option<i64>,
Option<i64>,
);
pub struct MemoryDatabase {
conn: Arc<Mutex<Connection>>,
db_path: std::path::PathBuf,
}
impl MemoryDatabase {
pub async fn new(db_path: &Path) -> MemoryResult<Self> {
unsafe {
sqlite3_auto_extension(Some(std::mem::transmute::<
*const (),
unsafe extern "C" fn(
*mut rusqlite::ffi::sqlite3,
*mut *mut i8,
*const rusqlite::ffi::sqlite3_api_routines,
) -> i32,
>(sqlite3_vec_init as *const ())));
}
let conn = Connection::open(db_path)?;
conn.busy_timeout(Duration::from_secs(10))?;
conn.query_row("PRAGMA journal_mode = WAL", [], |_| Ok(()))?;
conn.execute("PRAGMA synchronous = NORMAL", [])?;
let db = Self {
conn: Arc::new(Mutex::new(conn)),
db_path: db_path.to_path_buf(),
};
db.init_schema().await?;
if let Err(err) = db.validate_vector_tables().await {
match &err {
crate::types::MemoryError::Database(db_err)
if Self::is_vector_table_error(db_err) =>
{
tracing::warn!(
"Detected vector table corruption during startup ({}). Recreating vector tables.",
db_err
);
db.recreate_vector_tables().await?;
}
_ => return Err(err),
}
}
db.validate_integrity().await?;
Ok(db)
}
async fn validate_integrity(&self) -> MemoryResult<()> {
let conn = self.conn.lock().await;
let check = match conn.query_row("PRAGMA quick_check(1)", [], |row| row.get::<_, String>(0))
{
Ok(value) => value,
Err(err) => {
tracing::warn!(
"Skipping strict PRAGMA quick_check due to probe error: {}",
err
);
return Ok(());
}
};
if check.trim().eq_ignore_ascii_case("ok") {
return Ok(());
}
let lowered = check.to_lowercase();
if lowered.contains("malformed")
|| lowered.contains("corrupt")
|| lowered.contains("database disk image is malformed")
{
return Err(crate::types::MemoryError::InvalidConfig(format!(
"malformed database integrity check: {}",
check
)));
}
tracing::warn!(
"PRAGMA quick_check returned non-ok status but not a hard corruption signal: {}",
check
);
Ok(())
}
async fn init_schema(&self) -> MemoryResult<()> {
let conn = self.conn.lock().await;
conn.execute(
"CREATE TABLE IF NOT EXISTS session_memory_chunks (
id TEXT PRIMARY KEY,
content TEXT NOT NULL,
session_id TEXT NOT NULL,
project_id TEXT,
source TEXT NOT NULL,
created_at TEXT NOT NULL,
token_count INTEGER NOT NULL DEFAULT 0,
metadata TEXT
)",
[],
)?;
let session_existing_cols: HashSet<String> = {
let mut stmt = conn.prepare("PRAGMA table_info(session_memory_chunks)")?;
let rows = stmt.query_map([], |row| row.get::<_, String>(1))?;
rows.collect::<Result<HashSet<_>, _>>()?
};
if !session_existing_cols.contains("source_path") {
conn.execute(
"ALTER TABLE session_memory_chunks ADD COLUMN source_path TEXT",
[],
)?;
}
if !session_existing_cols.contains("source_mtime") {
conn.execute(
"ALTER TABLE session_memory_chunks ADD COLUMN source_mtime INTEGER",
[],
)?;
}
if !session_existing_cols.contains("source_size") {
conn.execute(
"ALTER TABLE session_memory_chunks ADD COLUMN source_size INTEGER",
[],
)?;
}
if !session_existing_cols.contains("source_hash") {
conn.execute(
"ALTER TABLE session_memory_chunks ADD COLUMN source_hash TEXT",
[],
)?;
}
conn.execute(
&format!(
"CREATE VIRTUAL TABLE IF NOT EXISTS session_memory_vectors USING vec0(
chunk_id TEXT PRIMARY KEY,
embedding float[{}]
)",
DEFAULT_EMBEDDING_DIMENSION
),
[],
)?;
conn.execute(
"CREATE TABLE IF NOT EXISTS project_memory_chunks (
id TEXT PRIMARY KEY,
content TEXT NOT NULL,
project_id TEXT NOT NULL,
session_id TEXT,
source TEXT NOT NULL,
created_at TEXT NOT NULL,
token_count INTEGER NOT NULL DEFAULT 0,
metadata TEXT
)",
[],
)?;
let existing_cols: HashSet<String> = {
let mut stmt = conn.prepare("PRAGMA table_info(project_memory_chunks)")?;
let rows = stmt.query_map([], |row| row.get::<_, String>(1))?;
rows.collect::<Result<HashSet<_>, _>>()?
};
if !existing_cols.contains("source_path") {
conn.execute(
"ALTER TABLE project_memory_chunks ADD COLUMN source_path TEXT",
[],
)?;
}
if !existing_cols.contains("source_mtime") {
conn.execute(
"ALTER TABLE project_memory_chunks ADD COLUMN source_mtime INTEGER",
[],
)?;
}
if !existing_cols.contains("source_size") {
conn.execute(
"ALTER TABLE project_memory_chunks ADD COLUMN source_size INTEGER",
[],
)?;
}
if !existing_cols.contains("source_hash") {
conn.execute(
"ALTER TABLE project_memory_chunks ADD COLUMN source_hash TEXT",
[],
)?;
}
conn.execute(
&format!(
"CREATE VIRTUAL TABLE IF NOT EXISTS project_memory_vectors USING vec0(
chunk_id TEXT PRIMARY KEY,
embedding float[{}]
)",
DEFAULT_EMBEDDING_DIMENSION
),
[],
)?;
conn.execute(
"CREATE TABLE IF NOT EXISTS project_file_index (
project_id TEXT NOT NULL,
path TEXT NOT NULL,
mtime INTEGER NOT NULL,
size INTEGER NOT NULL,
hash TEXT NOT NULL,
indexed_at TEXT NOT NULL,
PRIMARY KEY(project_id, path)
)",
[],
)?;
conn.execute(
"CREATE TABLE IF NOT EXISTS session_file_index (
session_id TEXT NOT NULL,
path TEXT NOT NULL,
mtime INTEGER NOT NULL,
size INTEGER NOT NULL,
hash TEXT NOT NULL,
indexed_at TEXT NOT NULL,
PRIMARY KEY(session_id, path)
)",
[],
)?;
conn.execute(
"CREATE TABLE IF NOT EXISTS project_index_status (
project_id TEXT PRIMARY KEY,
last_indexed_at TEXT,
last_total_files INTEGER,
last_processed_files INTEGER,
last_indexed_files INTEGER,
last_skipped_files INTEGER,
last_errors INTEGER
)",
[],
)?;
conn.execute(
"CREATE TABLE IF NOT EXISTS global_memory_chunks (
id TEXT PRIMARY KEY,
content TEXT NOT NULL,
source TEXT NOT NULL,
created_at TEXT NOT NULL,
token_count INTEGER NOT NULL DEFAULT 0,
metadata TEXT
)",
[],
)?;
let global_existing_cols: HashSet<String> = {
let mut stmt = conn.prepare("PRAGMA table_info(global_memory_chunks)")?;
let rows = stmt.query_map([], |row| row.get::<_, String>(1))?;
rows.collect::<Result<HashSet<_>, _>>()?
};
if !global_existing_cols.contains("source_path") {
conn.execute(
"ALTER TABLE global_memory_chunks ADD COLUMN source_path TEXT",
[],
)?;
}
if !global_existing_cols.contains("source_mtime") {
conn.execute(
"ALTER TABLE global_memory_chunks ADD COLUMN source_mtime INTEGER",
[],
)?;
}
if !global_existing_cols.contains("source_size") {
conn.execute(
"ALTER TABLE global_memory_chunks ADD COLUMN source_size INTEGER",
[],
)?;
}
if !global_existing_cols.contains("source_hash") {
conn.execute(
"ALTER TABLE global_memory_chunks ADD COLUMN source_hash TEXT",
[],
)?;
}
conn.execute(
&format!(
"CREATE VIRTUAL TABLE IF NOT EXISTS global_memory_vectors USING vec0(
chunk_id TEXT PRIMARY KEY,
embedding float[{}]
)",
DEFAULT_EMBEDDING_DIMENSION
),
[],
)?;
conn.execute(
"CREATE TABLE IF NOT EXISTS memory_config (
project_id TEXT PRIMARY KEY,
max_chunks INTEGER NOT NULL DEFAULT 10000,
chunk_size INTEGER NOT NULL DEFAULT 512,
retrieval_k INTEGER NOT NULL DEFAULT 5,
auto_cleanup INTEGER NOT NULL DEFAULT 1,
session_retention_days INTEGER NOT NULL DEFAULT 30,
token_budget INTEGER NOT NULL DEFAULT 5000,
chunk_overlap INTEGER NOT NULL DEFAULT 64,
updated_at TEXT NOT NULL
)",
[],
)?;
conn.execute(
"CREATE TABLE IF NOT EXISTS memory_cleanup_log (
id TEXT PRIMARY KEY,
cleanup_type TEXT NOT NULL,
tier TEXT NOT NULL,
project_id TEXT,
session_id TEXT,
chunks_deleted INTEGER NOT NULL DEFAULT 0,
bytes_reclaimed INTEGER NOT NULL DEFAULT 0,
created_at TEXT NOT NULL
)",
[],
)?;
conn.execute(
"CREATE INDEX IF NOT EXISTS idx_session_chunks_session ON session_memory_chunks(session_id)",
[],
)?;
conn.execute(
"CREATE INDEX IF NOT EXISTS idx_session_chunks_project ON session_memory_chunks(project_id)",
[],
)?;
conn.execute(
"CREATE INDEX IF NOT EXISTS idx_session_file_chunks ON session_memory_chunks(session_id, source, source_path)",
[],
)?;
conn.execute(
"CREATE INDEX IF NOT EXISTS idx_project_chunks_project ON project_memory_chunks(project_id)",
[],
)?;
conn.execute(
"CREATE INDEX IF NOT EXISTS idx_project_file_chunks ON project_memory_chunks(project_id, source, source_path)",
[],
)?;
conn.execute(
"CREATE INDEX IF NOT EXISTS idx_session_chunks_created ON session_memory_chunks(created_at)",
[],
)?;
conn.execute(
"CREATE INDEX IF NOT EXISTS idx_global_file_chunks ON global_memory_chunks(source, source_path)",
[],
)?;
conn.execute(
"CREATE INDEX IF NOT EXISTS idx_cleanup_log_created ON memory_cleanup_log(created_at)",
[],
)?;
conn.execute(
"CREATE TABLE IF NOT EXISTS global_file_index (
path TEXT PRIMARY KEY,
mtime INTEGER NOT NULL,
size INTEGER NOT NULL,
hash TEXT NOT NULL,
indexed_at TEXT NOT NULL
)",
[],
)?;
conn.execute(
"CREATE TABLE IF NOT EXISTS memory_records (
id TEXT PRIMARY KEY,
user_id TEXT NOT NULL,
source_type TEXT NOT NULL,
content TEXT NOT NULL,
content_hash TEXT NOT NULL,
run_id TEXT NOT NULL,
session_id TEXT,
message_id TEXT,
tool_name TEXT,
project_tag TEXT,
channel_tag TEXT,
host_tag TEXT,
metadata TEXT,
provenance TEXT,
redaction_status TEXT NOT NULL,
redaction_count INTEGER NOT NULL DEFAULT 0,
visibility TEXT NOT NULL DEFAULT 'private',
demoted INTEGER NOT NULL DEFAULT 0,
score_boost REAL NOT NULL DEFAULT 0.0,
created_at_ms INTEGER NOT NULL,
updated_at_ms INTEGER NOT NULL,
expires_at_ms INTEGER
)",
[],
)?;
conn.execute(
"CREATE UNIQUE INDEX IF NOT EXISTS idx_memory_records_dedup
ON memory_records(user_id, source_type, content_hash, run_id, IFNULL(session_id, ''), IFNULL(message_id, ''), IFNULL(tool_name, ''))",
[],
)?;
conn.execute(
"CREATE INDEX IF NOT EXISTS idx_memory_records_user_created
ON memory_records(user_id, created_at_ms DESC)",
[],
)?;
conn.execute(
"CREATE INDEX IF NOT EXISTS idx_memory_records_run
ON memory_records(run_id)",
[],
)?;
conn.execute(
"CREATE VIRTUAL TABLE IF NOT EXISTS memory_records_fts USING fts5(
id UNINDEXED,
user_id UNINDEXED,
content
)",
[],
)?;
conn.execute(
"CREATE TRIGGER IF NOT EXISTS memory_records_ai AFTER INSERT ON memory_records BEGIN
INSERT INTO memory_records_fts(id, user_id, content) VALUES (new.id, new.user_id, new.content);
END",
[],
)?;
conn.execute(
"CREATE TRIGGER IF NOT EXISTS memory_records_ad AFTER DELETE ON memory_records BEGIN
DELETE FROM memory_records_fts WHERE id = old.id;
END",
[],
)?;
conn.execute(
"CREATE TRIGGER IF NOT EXISTS memory_records_au AFTER UPDATE OF content, user_id ON memory_records BEGIN
DELETE FROM memory_records_fts WHERE id = old.id;
INSERT INTO memory_records_fts(id, user_id, content) VALUES (new.id, new.user_id, new.content);
END",
[],
)?;
conn.execute(
"CREATE TABLE IF NOT EXISTS memory_nodes (
id TEXT PRIMARY KEY,
uri TEXT NOT NULL UNIQUE,
parent_uri TEXT,
node_type TEXT NOT NULL,
created_at TEXT NOT NULL,
updated_at TEXT NOT NULL,
metadata TEXT
)",
[],
)?;
conn.execute(
"CREATE INDEX IF NOT EXISTS idx_memory_nodes_uri ON memory_nodes(uri)",
[],
)?;
conn.execute(
"CREATE INDEX IF NOT EXISTS idx_memory_nodes_parent ON memory_nodes(parent_uri)",
[],
)?;
conn.execute(
"CREATE TABLE IF NOT EXISTS memory_layers (
id TEXT PRIMARY KEY,
node_id TEXT NOT NULL,
layer_type TEXT NOT NULL,
content TEXT NOT NULL,
token_count INTEGER NOT NULL,
embedding_id TEXT,
created_at TEXT NOT NULL,
source_chunk_id TEXT,
FOREIGN KEY (node_id) REFERENCES memory_nodes(id)
)",
[],
)?;
conn.execute(
"CREATE INDEX IF NOT EXISTS idx_memory_layers_node ON memory_layers(node_id)",
[],
)?;
conn.execute(
"CREATE INDEX IF NOT EXISTS idx_memory_layers_type ON memory_layers(layer_type)",
[],
)?;
conn.execute(
"CREATE TABLE IF NOT EXISTS memory_retrieval_state (
node_id TEXT PRIMARY KEY,
active_layer TEXT NOT NULL DEFAULT 'L0',
last_accessed TEXT,
access_count INTEGER DEFAULT 0,
FOREIGN KEY (node_id) REFERENCES memory_nodes(id)
)",
[],
)?;
Ok(())
}
pub async fn validate_vector_tables(&self) -> MemoryResult<()> {
let conn = self.conn.lock().await;
let probe_embedding = format!("[{}]", vec!["0.0"; DEFAULT_EMBEDDING_DIMENSION].join(","));
for table in [
"session_memory_vectors",
"project_memory_vectors",
"global_memory_vectors",
] {
let sql = format!("SELECT COUNT(*) FROM {}", table);
let row_count: i64 = conn.query_row(&sql, [], |row| row.get(0))?;
if row_count > 0 {
let probe_sql = format!(
"SELECT chunk_id, distance
FROM {}
WHERE embedding MATCH ?1 AND k = 1",
table
);
let mut stmt = conn.prepare(&probe_sql)?;
let mut rows = stmt.query(params![probe_embedding.as_str()])?;
let _ = rows.next()?;
}
}
Ok(())
}
fn is_vector_table_error(err: &rusqlite::Error) -> bool {
let text = err.to_string().to_lowercase();
text.contains("vector blob")
|| text.contains("chunks iter error")
|| text.contains("chunks iter")
|| text.contains("internal sqlite-vec error")
|| text.contains("insert rowids id")
|| text.contains("sql logic error")
|| text.contains("database disk image is malformed")
|| text.contains("session_memory_vectors")
|| text.contains("project_memory_vectors")
|| text.contains("global_memory_vectors")
|| text.contains("vec0")
}
async fn recreate_vector_tables(&self) -> MemoryResult<()> {
let conn = self.conn.lock().await;
for base in [
"session_memory_vectors",
"project_memory_vectors",
"global_memory_vectors",
] {
for name in [
base.to_string(),
format!("{}_chunks", base),
format!("{}_info", base),
format!("{}_rowids", base),
format!("{}_vector_chunks00", base),
] {
let sql = format!("DROP TABLE IF EXISTS \"{}\"", name.replace('"', "\"\""));
conn.execute(&sql, [])?;
}
let like_pattern = format!("{base}_%");
let mut stmt = conn.prepare(
"SELECT name FROM sqlite_master WHERE type='table' AND name LIKE ?1 ORDER BY name",
)?;
let table_names = stmt
.query_map(params![like_pattern], |row| row.get::<_, String>(0))?
.collect::<Result<Vec<_>, _>>()?;
drop(stmt);
for name in table_names {
let sql = format!("DROP TABLE IF EXISTS \"{}\"", name.replace('"', "\"\""));
conn.execute(&sql, [])?;
}
}
conn.execute(
&format!(
"CREATE VIRTUAL TABLE IF NOT EXISTS session_memory_vectors USING vec0(
chunk_id TEXT PRIMARY KEY,
embedding float[{}]
)",
DEFAULT_EMBEDDING_DIMENSION
),
[],
)?;
conn.execute(
&format!(
"CREATE VIRTUAL TABLE IF NOT EXISTS project_memory_vectors USING vec0(
chunk_id TEXT PRIMARY KEY,
embedding float[{}]
)",
DEFAULT_EMBEDDING_DIMENSION
),
[],
)?;
conn.execute(
&format!(
"CREATE VIRTUAL TABLE IF NOT EXISTS global_memory_vectors USING vec0(
chunk_id TEXT PRIMARY KEY,
embedding float[{}]
)",
DEFAULT_EMBEDDING_DIMENSION
),
[],
)?;
Ok(())
}
pub async fn ensure_vector_tables_healthy(&self) -> MemoryResult<bool> {
match self.validate_vector_tables().await {
Ok(()) => Ok(false),
Err(crate::types::MemoryError::Database(err)) if Self::is_vector_table_error(&err) => {
tracing::warn!(
"Memory vector tables appear corrupted ({}). Recreating vector tables.",
err
);
self.recreate_vector_tables().await?;
Ok(true)
}
Err(err) => Err(err),
}
}
pub async fn reset_all_memory_tables(&self) -> MemoryResult<()> {
let table_names = {
let conn = self.conn.lock().await;
let mut stmt = conn.prepare(
"SELECT name FROM sqlite_master
WHERE type='table'
AND name NOT LIKE 'sqlite_%'
ORDER BY name",
)?;
let names = stmt
.query_map([], |row| row.get::<_, String>(0))?
.collect::<Result<Vec<_>, _>>()?;
names
};
{
let conn = self.conn.lock().await;
for table in table_names {
let sql = format!("DROP TABLE IF EXISTS \"{}\"", table.replace('"', "\"\""));
let _ = conn.execute(&sql, []);
}
}
self.init_schema().await
}
pub async fn try_repair_after_error(
&self,
err: &crate::types::MemoryError,
) -> MemoryResult<bool> {
match err {
crate::types::MemoryError::Database(db_err) if Self::is_vector_table_error(db_err) => {
tracing::warn!(
"Memory write/read hit vector DB error ({}). Recreating vector tables immediately.",
db_err
);
self.recreate_vector_tables().await?;
Ok(true)
}
_ => Ok(false),
}
}
pub async fn store_chunk(&self, chunk: &MemoryChunk, embedding: &[f32]) -> MemoryResult<()> {
let conn = self.conn.lock().await;
let (chunks_table, vectors_table) = match chunk.tier {
MemoryTier::Session => ("session_memory_chunks", "session_memory_vectors"),
MemoryTier::Project => ("project_memory_chunks", "project_memory_vectors"),
MemoryTier::Global => ("global_memory_chunks", "global_memory_vectors"),
};
let created_at_str = chunk.created_at.to_rfc3339();
let metadata_str = chunk
.metadata
.as_ref()
.map(|m| m.to_string())
.unwrap_or_default();
match chunk.tier {
MemoryTier::Session => {
conn.execute(
&format!(
"INSERT INTO {} (
id, content, session_id, project_id, source, created_at, token_count, metadata,
source_path, source_mtime, source_size, source_hash
) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12)",
chunks_table
),
params![
chunk.id,
chunk.content,
chunk.session_id.as_ref().unwrap_or(&String::new()),
chunk.project_id,
chunk.source,
created_at_str,
chunk.token_count,
metadata_str,
chunk.source_path.clone(),
chunk.source_mtime,
chunk.source_size,
chunk.source_hash.clone()
],
)?;
}
MemoryTier::Project => {
conn.execute(
&format!(
"INSERT INTO {} (
id, content, project_id, session_id, source, created_at, token_count, metadata,
source_path, source_mtime, source_size, source_hash
) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12)",
chunks_table
),
params![
chunk.id,
chunk.content,
chunk.project_id.as_ref().unwrap_or(&String::new()),
chunk.session_id,
chunk.source,
created_at_str,
chunk.token_count,
metadata_str,
chunk.source_path.clone(),
chunk.source_mtime,
chunk.source_size,
chunk.source_hash.clone()
],
)?;
}
MemoryTier::Global => {
conn.execute(
&format!(
"INSERT INTO {} (
id, content, source, created_at, token_count, metadata,
source_path, source_mtime, source_size, source_hash
) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10)",
chunks_table
),
params![
chunk.id,
chunk.content,
chunk.source,
created_at_str,
chunk.token_count,
metadata_str,
chunk.source_path.clone(),
chunk.source_mtime,
chunk.source_size,
chunk.source_hash.clone()
],
)?;
}
}
let embedding_json = format!(
"[{}]",
embedding
.iter()
.map(|f| f.to_string())
.collect::<Vec<_>>()
.join(",")
);
conn.execute(
&format!(
"INSERT INTO {} (chunk_id, embedding) VALUES (?1, ?2)",
vectors_table
),
params![chunk.id, embedding_json],
)?;
Ok(())
}
pub async fn search_similar(
&self,
query_embedding: &[f32],
tier: MemoryTier,
project_id: Option<&str>,
session_id: Option<&str>,
limit: i64,
) -> MemoryResult<Vec<(MemoryChunk, f64)>> {
let conn = self.conn.lock().await;
let (chunks_table, vectors_table) = match tier {
MemoryTier::Session => ("session_memory_chunks", "session_memory_vectors"),
MemoryTier::Project => ("project_memory_chunks", "project_memory_vectors"),
MemoryTier::Global => ("global_memory_chunks", "global_memory_vectors"),
};
let embedding_json = format!(
"[{}]",
query_embedding
.iter()
.map(|f| f.to_string())
.collect::<Vec<_>>()
.join(",")
);
let results = match tier {
MemoryTier::Session => {
if let Some(sid) = session_id {
let sql = format!(
"SELECT c.id, c.content, c.session_id, c.project_id, c.source, c.created_at, c.token_count, c.metadata,
c.source_path, c.source_mtime, c.source_size, c.source_hash,
v.distance
FROM {} AS v
JOIN {} AS c ON v.chunk_id = c.id
WHERE c.session_id = ?1 AND v.embedding MATCH ?2 AND k = ?3
ORDER BY v.distance",
vectors_table, chunks_table
);
let mut stmt = conn.prepare(&sql)?;
let results = stmt
.query_map(params![sid, embedding_json, limit], |row| {
Ok((row_to_chunk(row, tier)?, row.get::<_, f64>(12)?))
})?
.collect::<Result<Vec<_>, _>>()?;
results
} else if let Some(pid) = project_id {
let sql = format!(
"SELECT c.id, c.content, c.session_id, c.project_id, c.source, c.created_at, c.token_count, c.metadata,
c.source_path, c.source_mtime, c.source_size, c.source_hash,
v.distance
FROM {} AS v
JOIN {} AS c ON v.chunk_id = c.id
WHERE c.project_id = ?1 AND v.embedding MATCH ?2 AND k = ?3
ORDER BY v.distance",
vectors_table, chunks_table
);
let mut stmt = conn.prepare(&sql)?;
let results = stmt
.query_map(params![pid, embedding_json, limit], |row| {
Ok((row_to_chunk(row, tier)?, row.get::<_, f64>(12)?))
})?
.collect::<Result<Vec<_>, _>>()?;
results
} else {
let sql = format!(
"SELECT c.id, c.content, c.session_id, c.project_id, c.source, c.created_at, c.token_count, c.metadata,
c.source_path, c.source_mtime, c.source_size, c.source_hash,
v.distance
FROM {} AS v
JOIN {} AS c ON v.chunk_id = c.id
WHERE v.embedding MATCH ?1 AND k = ?2
ORDER BY v.distance",
vectors_table, chunks_table
);
let mut stmt = conn.prepare(&sql)?;
let results = stmt
.query_map(params![embedding_json, limit], |row| {
Ok((row_to_chunk(row, tier)?, row.get::<_, f64>(12)?))
})?
.collect::<Result<Vec<_>, _>>()?;
results
}
}
MemoryTier::Project => {
if let Some(pid) = project_id {
let sql = format!(
"SELECT c.id, c.content, c.session_id, c.project_id, c.source, c.created_at, c.token_count, c.metadata,
c.source_path, c.source_mtime, c.source_size, c.source_hash,
v.distance
FROM {} AS v
JOIN {} AS c ON v.chunk_id = c.id
WHERE c.project_id = ?1 AND v.embedding MATCH ?2 AND k = ?3
ORDER BY v.distance",
vectors_table, chunks_table
);
let mut stmt = conn.prepare(&sql)?;
let results = stmt
.query_map(params![pid, embedding_json, limit], |row| {
Ok((row_to_chunk(row, tier)?, row.get::<_, f64>(12)?))
})?
.collect::<Result<Vec<_>, _>>()?;
results
} else {
let sql = format!(
"SELECT c.id, c.content, c.session_id, c.project_id, c.source, c.created_at, c.token_count, c.metadata,
c.source_path, c.source_mtime, c.source_size, c.source_hash,
v.distance
FROM {} AS v
JOIN {} AS c ON v.chunk_id = c.id
WHERE v.embedding MATCH ?1 AND k = ?2
ORDER BY v.distance",
vectors_table, chunks_table
);
let mut stmt = conn.prepare(&sql)?;
let results = stmt
.query_map(params![embedding_json, limit], |row| {
Ok((row_to_chunk(row, tier)?, row.get::<_, f64>(12)?))
})?
.collect::<Result<Vec<_>, _>>()?;
results
}
}
MemoryTier::Global => {
let sql = format!(
"SELECT c.id, c.content, NULL as session_id, NULL as project_id, c.source, c.created_at, c.token_count, c.metadata,
c.source_path, c.source_mtime, c.source_size, c.source_hash,
v.distance
FROM {} AS v
JOIN {} AS c ON v.chunk_id = c.id
WHERE v.embedding MATCH ?1 AND k = ?2
ORDER BY v.distance",
vectors_table, chunks_table
);
let mut stmt = conn.prepare(&sql)?;
let results = stmt
.query_map(params![embedding_json, limit], |row| {
Ok((row_to_chunk(row, tier)?, row.get::<_, f64>(12)?))
})?
.collect::<Result<Vec<_>, _>>()?;
results
}
};
Ok(results)
}
pub async fn get_session_chunks(&self, session_id: &str) -> MemoryResult<Vec<MemoryChunk>> {
let conn = self.conn.lock().await;
let mut stmt = conn.prepare(
"SELECT id, content, session_id, project_id, source, created_at, token_count, metadata,
source_path, source_mtime, source_size, source_hash
FROM session_memory_chunks
WHERE session_id = ?1
ORDER BY created_at DESC",
)?;
let chunks = stmt
.query_map(params![session_id], |row| {
row_to_chunk(row, MemoryTier::Session)
})?
.collect::<Result<Vec<_>, _>>()?;
Ok(chunks)
}
pub async fn get_project_chunks(&self, project_id: &str) -> MemoryResult<Vec<MemoryChunk>> {
let conn = self.conn.lock().await;
let mut stmt = conn.prepare(
"SELECT id, content, session_id, project_id, source, created_at, token_count, metadata,
source_path, source_mtime, source_size, source_hash
FROM project_memory_chunks
WHERE project_id = ?1
ORDER BY created_at DESC",
)?;
let chunks = stmt
.query_map(params![project_id], |row| {
row_to_chunk(row, MemoryTier::Project)
})?
.collect::<Result<Vec<_>, _>>()?;
Ok(chunks)
}
pub async fn get_global_chunks(&self, limit: i64) -> MemoryResult<Vec<MemoryChunk>> {
let conn = self.conn.lock().await;
let mut stmt = conn.prepare(
"SELECT id, content, source, created_at, token_count, metadata,
source_path, source_mtime, source_size, source_hash
FROM global_memory_chunks
ORDER BY created_at DESC
LIMIT ?1",
)?;
let chunks = stmt
.query_map(params![limit], |row| row_to_chunk(row, MemoryTier::Global))?
.collect::<Result<Vec<_>, _>>()?;
Ok(chunks)
}
pub async fn global_chunk_exists_by_source_hash(
&self,
source_hash: &str,
) -> MemoryResult<bool> {
let conn = self.conn.lock().await;
let exists = conn
.query_row(
"SELECT 1 FROM global_memory_chunks WHERE source_hash = ?1 LIMIT 1",
params![source_hash],
|_row| Ok(()),
)
.optional()?
.is_some();
Ok(exists)
}
pub async fn clear_session_memory(&self, session_id: &str) -> MemoryResult<u64> {
let conn = self.conn.lock().await;
let count: i64 = conn.query_row(
"SELECT COUNT(*) FROM session_memory_chunks WHERE session_id = ?1",
params![session_id],
|row| row.get(0),
)?;
conn.execute(
"DELETE FROM session_memory_vectors WHERE chunk_id IN
(SELECT id FROM session_memory_chunks WHERE session_id = ?1)",
params![session_id],
)?;
conn.execute(
"DELETE FROM session_memory_chunks WHERE session_id = ?1",
params![session_id],
)?;
Ok(count as u64)
}
pub async fn clear_project_memory(&self, project_id: &str) -> MemoryResult<u64> {
let conn = self.conn.lock().await;
let count: i64 = conn.query_row(
"SELECT COUNT(*) FROM project_memory_chunks WHERE project_id = ?1",
params![project_id],
|row| row.get(0),
)?;
conn.execute(
"DELETE FROM project_memory_vectors WHERE chunk_id IN
(SELECT id FROM project_memory_chunks WHERE project_id = ?1)",
params![project_id],
)?;
conn.execute(
"DELETE FROM project_memory_chunks WHERE project_id = ?1",
params![project_id],
)?;
Ok(count as u64)
}
pub async fn clear_global_memory_by_source_prefix(
&self,
source_prefix: &str,
) -> MemoryResult<u64> {
let conn = self.conn.lock().await;
let like = format!("{}%", source_prefix);
let count: i64 = conn.query_row(
"SELECT COUNT(*) FROM global_memory_chunks WHERE source LIKE ?1",
params![like],
|row| row.get(0),
)?;
conn.execute(
"DELETE FROM global_memory_vectors WHERE chunk_id IN
(SELECT id FROM global_memory_chunks WHERE source LIKE ?1)",
params![like],
)?;
conn.execute(
"DELETE FROM global_memory_chunks WHERE source LIKE ?1",
params![like],
)?;
Ok(count as u64)
}
pub async fn delete_chunk(
&self,
tier: MemoryTier,
chunk_id: &str,
project_id: Option<&str>,
session_id: Option<&str>,
) -> MemoryResult<u64> {
let conn = self.conn.lock().await;
let deleted = match tier {
MemoryTier::Session => {
let Some(session_id) = session_id else {
return Err(MemoryError::InvalidConfig(
"session_id is required to delete session memory chunks".to_string(),
));
};
conn.execute(
"DELETE FROM session_memory_vectors WHERE chunk_id IN
(SELECT id FROM session_memory_chunks WHERE id = ?1 AND session_id = ?2)",
params![chunk_id, session_id],
)?;
conn.execute(
"DELETE FROM session_memory_chunks WHERE id = ?1 AND session_id = ?2",
params![chunk_id, session_id],
)?
}
MemoryTier::Project => {
let Some(project_id) = project_id else {
return Err(MemoryError::InvalidConfig(
"project_id is required to delete project memory chunks".to_string(),
));
};
conn.execute(
"DELETE FROM project_memory_vectors WHERE chunk_id IN
(SELECT id FROM project_memory_chunks WHERE id = ?1 AND project_id = ?2)",
params![chunk_id, project_id],
)?;
conn.execute(
"DELETE FROM project_memory_chunks WHERE id = ?1 AND project_id = ?2",
params![chunk_id, project_id],
)?
}
MemoryTier::Global => {
conn.execute(
"DELETE FROM global_memory_vectors WHERE chunk_id IN
(SELECT id FROM global_memory_chunks WHERE id = ?1)",
params![chunk_id],
)?;
conn.execute(
"DELETE FROM global_memory_chunks WHERE id = ?1",
params![chunk_id],
)?
}
};
Ok(deleted as u64)
}
pub async fn cleanup_old_sessions(&self, retention_days: i64) -> MemoryResult<u64> {
let conn = self.conn.lock().await;
let cutoff = Utc::now() - chrono::Duration::days(retention_days);
let cutoff_str = cutoff.to_rfc3339();
let count: i64 = conn.query_row(
"SELECT COUNT(*) FROM session_memory_chunks WHERE created_at < ?1",
params![cutoff_str],
|row| row.get(0),
)?;
conn.execute(
"DELETE FROM session_memory_vectors WHERE chunk_id IN
(SELECT id FROM session_memory_chunks WHERE created_at < ?1)",
params![cutoff_str],
)?;
conn.execute(
"DELETE FROM session_memory_chunks WHERE created_at < ?1",
params![cutoff_str],
)?;
Ok(count as u64)
}
pub async fn get_or_create_config(&self, project_id: &str) -> MemoryResult<MemoryConfig> {
let conn = self.conn.lock().await;
let result: Option<MemoryConfig> = conn
.query_row(
"SELECT max_chunks, chunk_size, retrieval_k, auto_cleanup,
session_retention_days, token_budget, chunk_overlap
FROM memory_config WHERE project_id = ?1",
params![project_id],
|row| {
Ok(MemoryConfig {
max_chunks: row.get(0)?,
chunk_size: row.get(1)?,
retrieval_k: row.get(2)?,
auto_cleanup: row.get::<_, i64>(3)? != 0,
session_retention_days: row.get(4)?,
token_budget: row.get(5)?,
chunk_overlap: row.get(6)?,
})
},
)
.optional()?;
match result {
Some(config) => Ok(config),
None => {
let config = MemoryConfig::default();
let updated_at = Utc::now().to_rfc3339();
conn.execute(
"INSERT INTO memory_config
(project_id, max_chunks, chunk_size, retrieval_k, auto_cleanup,
session_retention_days, token_budget, chunk_overlap, updated_at)
VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9)",
params![
project_id,
config.max_chunks,
config.chunk_size,
config.retrieval_k,
config.auto_cleanup as i64,
config.session_retention_days,
config.token_budget,
config.chunk_overlap,
updated_at
],
)?;
Ok(config)
}
}
}
pub async fn update_config(&self, project_id: &str, config: &MemoryConfig) -> MemoryResult<()> {
let conn = self.conn.lock().await;
let updated_at = Utc::now().to_rfc3339();
conn.execute(
"INSERT OR REPLACE INTO memory_config
(project_id, max_chunks, chunk_size, retrieval_k, auto_cleanup,
session_retention_days, token_budget, chunk_overlap, updated_at)
VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9)",
params![
project_id,
config.max_chunks,
config.chunk_size,
config.retrieval_k,
config.auto_cleanup as i64,
config.session_retention_days,
config.token_budget,
config.chunk_overlap,
updated_at
],
)?;
Ok(())
}
pub async fn get_stats(&self) -> MemoryResult<MemoryStats> {
let conn = self.conn.lock().await;
let session_chunks: i64 =
conn.query_row("SELECT COUNT(*) FROM session_memory_chunks", [], |row| {
row.get(0)
})?;
let project_chunks: i64 =
conn.query_row("SELECT COUNT(*) FROM project_memory_chunks", [], |row| {
row.get(0)
})?;
let global_chunks: i64 =
conn.query_row("SELECT COUNT(*) FROM global_memory_chunks", [], |row| {
row.get(0)
})?;
let session_bytes: i64 = conn.query_row(
"SELECT COALESCE(SUM(LENGTH(content)), 0) FROM session_memory_chunks",
[],
|row| row.get(0),
)?;
let project_bytes: i64 = conn.query_row(
"SELECT COALESCE(SUM(LENGTH(content)), 0) FROM project_memory_chunks",
[],
|row| row.get(0),
)?;
let global_bytes: i64 = conn.query_row(
"SELECT COALESCE(SUM(LENGTH(content)), 0) FROM global_memory_chunks",
[],
|row| row.get(0),
)?;
let last_cleanup: Option<String> = conn
.query_row(
"SELECT created_at FROM memory_cleanup_log ORDER BY created_at DESC LIMIT 1",
[],
|row| row.get(0),
)
.optional()?;
let last_cleanup = last_cleanup.and_then(|s| {
DateTime::parse_from_rfc3339(&s)
.ok()
.map(|dt| dt.with_timezone(&Utc))
});
let file_size = std::fs::metadata(&self.db_path)?.len() as i64;
Ok(MemoryStats {
total_chunks: session_chunks + project_chunks + global_chunks,
session_chunks,
project_chunks,
global_chunks,
total_bytes: session_bytes + project_bytes + global_bytes,
session_bytes,
project_bytes,
global_bytes,
file_size,
last_cleanup,
})
}
pub async fn log_cleanup(
&self,
cleanup_type: &str,
tier: MemoryTier,
project_id: Option<&str>,
session_id: Option<&str>,
chunks_deleted: i64,
bytes_reclaimed: i64,
) -> MemoryResult<()> {
let conn = self.conn.lock().await;
let id = uuid::Uuid::new_v4().to_string();
let created_at = Utc::now().to_rfc3339();
conn.execute(
"INSERT INTO memory_cleanup_log
(id, cleanup_type, tier, project_id, session_id, chunks_deleted, bytes_reclaimed, created_at)
VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8)",
params![
id,
cleanup_type,
tier.to_string(),
project_id,
session_id,
chunks_deleted,
bytes_reclaimed,
created_at
],
)?;
Ok(())
}
pub async fn vacuum(&self) -> MemoryResult<()> {
let conn = self.conn.lock().await;
conn.execute("VACUUM", [])?;
Ok(())
}
pub async fn project_file_index_count(&self, project_id: &str) -> MemoryResult<i64> {
let conn = self.conn.lock().await;
let n: i64 = conn.query_row(
"SELECT COUNT(*) FROM project_file_index WHERE project_id = ?1",
params![project_id],
|row| row.get(0),
)?;
Ok(n)
}
pub async fn project_has_file_chunks(&self, project_id: &str) -> MemoryResult<bool> {
let conn = self.conn.lock().await;
let exists: Option<i64> = conn
.query_row(
"SELECT 1 FROM project_memory_chunks WHERE project_id = ?1 AND source = 'file' LIMIT 1",
params![project_id],
|row| row.get(0),
)
.optional()?;
Ok(exists.is_some())
}
pub async fn get_file_index_entry(
&self,
project_id: &str,
path: &str,
) -> MemoryResult<Option<(i64, i64, String)>> {
let conn = self.conn.lock().await;
let row: Option<(i64, i64, String)> = conn
.query_row(
"SELECT mtime, size, hash FROM project_file_index WHERE project_id = ?1 AND path = ?2",
params![project_id, path],
|row| Ok((row.get(0)?, row.get(1)?, row.get(2)?)),
)
.optional()?;
Ok(row)
}
pub async fn upsert_file_index_entry(
&self,
project_id: &str,
path: &str,
mtime: i64,
size: i64,
hash: &str,
) -> MemoryResult<()> {
let conn = self.conn.lock().await;
let indexed_at = Utc::now().to_rfc3339();
conn.execute(
"INSERT INTO project_file_index (project_id, path, mtime, size, hash, indexed_at)
VALUES (?1, ?2, ?3, ?4, ?5, ?6)
ON CONFLICT(project_id, path) DO UPDATE SET
mtime = excluded.mtime,
size = excluded.size,
hash = excluded.hash,
indexed_at = excluded.indexed_at",
params![project_id, path, mtime, size, hash, indexed_at],
)?;
Ok(())
}
pub async fn delete_file_index_entry(&self, project_id: &str, path: &str) -> MemoryResult<()> {
let conn = self.conn.lock().await;
conn.execute(
"DELETE FROM project_file_index WHERE project_id = ?1 AND path = ?2",
params![project_id, path],
)?;
Ok(())
}
pub async fn list_file_index_paths(&self, project_id: &str) -> MemoryResult<Vec<String>> {
let conn = self.conn.lock().await;
let mut stmt = conn.prepare("SELECT path FROM project_file_index WHERE project_id = ?1")?;
let rows = stmt.query_map(params![project_id], |row| row.get::<_, String>(0))?;
Ok(rows.collect::<Result<Vec<_>, _>>()?)
}
pub async fn delete_project_file_chunks_by_path(
&self,
project_id: &str,
source_path: &str,
) -> MemoryResult<(i64, i64)> {
let conn = self.conn.lock().await;
let chunks_deleted: i64 = conn.query_row(
"SELECT COUNT(*) FROM project_memory_chunks
WHERE project_id = ?1 AND source = 'file' AND source_path = ?2",
params![project_id, source_path],
|row| row.get(0),
)?;
let bytes_estimated: i64 = conn.query_row(
"SELECT COALESCE(SUM(LENGTH(content)), 0) FROM project_memory_chunks
WHERE project_id = ?1 AND source = 'file' AND source_path = ?2",
params![project_id, source_path],
|row| row.get(0),
)?;
conn.execute(
"DELETE FROM project_memory_vectors WHERE chunk_id IN
(SELECT id FROM project_memory_chunks WHERE project_id = ?1 AND source = 'file' AND source_path = ?2)",
params![project_id, source_path],
)?;
conn.execute(
"DELETE FROM project_memory_chunks WHERE project_id = ?1 AND source = 'file' AND source_path = ?2",
params![project_id, source_path],
)?;
Ok((chunks_deleted, bytes_estimated))
}
pub async fn get_import_index_entry(
&self,
tier: MemoryTier,
session_id: Option<&str>,
project_id: Option<&str>,
path: &str,
) -> MemoryResult<Option<(i64, i64, String)>> {
let conn = self.conn.lock().await;
let row = match tier {
MemoryTier::Session => {
let session_id = require_scope_id(tier, session_id)?;
conn.query_row(
"SELECT mtime, size, hash FROM session_file_index WHERE session_id = ?1 AND path = ?2",
params![session_id, path],
|row| Ok((row.get(0)?, row.get(1)?, row.get(2)?)),
)
.optional()?
}
MemoryTier::Project => {
let project_id = require_scope_id(tier, project_id)?;
conn.query_row(
"SELECT mtime, size, hash FROM project_file_index WHERE project_id = ?1 AND path = ?2",
params![project_id, path],
|row| Ok((row.get(0)?, row.get(1)?, row.get(2)?)),
)
.optional()?
}
MemoryTier::Global => conn
.query_row(
"SELECT mtime, size, hash FROM global_file_index WHERE path = ?1",
params![path],
|row| Ok((row.get(0)?, row.get(1)?, row.get(2)?)),
)
.optional()?,
};
Ok(row)
}
pub async fn upsert_import_index_entry(
&self,
tier: MemoryTier,
session_id: Option<&str>,
project_id: Option<&str>,
path: &str,
mtime: i64,
size: i64,
hash: &str,
) -> MemoryResult<()> {
let conn = self.conn.lock().await;
let indexed_at = Utc::now().to_rfc3339();
match tier {
MemoryTier::Session => {
let session_id = require_scope_id(tier, session_id)?;
conn.execute(
"INSERT INTO session_file_index (session_id, path, mtime, size, hash, indexed_at)
VALUES (?1, ?2, ?3, ?4, ?5, ?6)
ON CONFLICT(session_id, path) DO UPDATE SET
mtime = excluded.mtime,
size = excluded.size,
hash = excluded.hash,
indexed_at = excluded.indexed_at",
params![session_id, path, mtime, size, hash, indexed_at],
)?;
}
MemoryTier::Project => {
let project_id = require_scope_id(tier, project_id)?;
conn.execute(
"INSERT INTO project_file_index (project_id, path, mtime, size, hash, indexed_at)
VALUES (?1, ?2, ?3, ?4, ?5, ?6)
ON CONFLICT(project_id, path) DO UPDATE SET
mtime = excluded.mtime,
size = excluded.size,
hash = excluded.hash,
indexed_at = excluded.indexed_at",
params![project_id, path, mtime, size, hash, indexed_at],
)?;
}
MemoryTier::Global => {
conn.execute(
"INSERT INTO global_file_index (path, mtime, size, hash, indexed_at)
VALUES (?1, ?2, ?3, ?4, ?5)
ON CONFLICT(path) DO UPDATE SET
mtime = excluded.mtime,
size = excluded.size,
hash = excluded.hash,
indexed_at = excluded.indexed_at",
params![path, mtime, size, hash, indexed_at],
)?;
}
}
Ok(())
}
pub async fn list_import_index_paths(
&self,
tier: MemoryTier,
session_id: Option<&str>,
project_id: Option<&str>,
) -> MemoryResult<Vec<String>> {
let conn = self.conn.lock().await;
let rows = match tier {
MemoryTier::Session => {
let session_id = require_scope_id(tier, session_id)?;
let mut stmt =
conn.prepare("SELECT path FROM session_file_index WHERE session_id = ?1")?;
let rows = stmt.query_map(params![session_id], |row| row.get::<_, String>(0))?;
rows.collect::<Result<Vec<_>, _>>()?
}
MemoryTier::Project => {
let project_id = require_scope_id(tier, project_id)?;
let mut stmt =
conn.prepare("SELECT path FROM project_file_index WHERE project_id = ?1")?;
let rows = stmt.query_map(params![project_id], |row| row.get::<_, String>(0))?;
rows.collect::<Result<Vec<_>, _>>()?
}
MemoryTier::Global => {
let mut stmt = conn.prepare("SELECT path FROM global_file_index")?;
let rows = stmt.query_map([], |row| row.get::<_, String>(0))?;
rows.collect::<Result<Vec<_>, _>>()?
}
};
Ok(rows)
}
pub async fn delete_import_index_entry(
&self,
tier: MemoryTier,
session_id: Option<&str>,
project_id: Option<&str>,
path: &str,
) -> MemoryResult<()> {
let conn = self.conn.lock().await;
match tier {
MemoryTier::Session => {
let session_id = require_scope_id(tier, session_id)?;
conn.execute(
"DELETE FROM session_file_index WHERE session_id = ?1 AND path = ?2",
params![session_id, path],
)?;
}
MemoryTier::Project => {
let project_id = require_scope_id(tier, project_id)?;
conn.execute(
"DELETE FROM project_file_index WHERE project_id = ?1 AND path = ?2",
params![project_id, path],
)?;
}
MemoryTier::Global => {
conn.execute(
"DELETE FROM global_file_index WHERE path = ?1",
params![path],
)?;
}
}
Ok(())
}
pub async fn delete_file_chunks_by_path(
&self,
tier: MemoryTier,
session_id: Option<&str>,
project_id: Option<&str>,
source_path: &str,
) -> MemoryResult<(i64, i64)> {
let conn = self.conn.lock().await;
let result = match tier {
MemoryTier::Session => {
let session_id = require_scope_id(tier, session_id)?;
let chunks_deleted: i64 = conn.query_row(
"SELECT COUNT(*) FROM session_memory_chunks
WHERE session_id = ?1 AND source = 'file' AND source_path = ?2",
params![session_id, source_path],
|row| row.get(0),
)?;
let bytes_estimated: i64 = conn.query_row(
"SELECT COALESCE(SUM(LENGTH(content)), 0) FROM session_memory_chunks
WHERE session_id = ?1 AND source = 'file' AND source_path = ?2",
params![session_id, source_path],
|row| row.get(0),
)?;
conn.execute(
"DELETE FROM session_memory_vectors WHERE chunk_id IN
(SELECT id FROM session_memory_chunks WHERE session_id = ?1 AND source = 'file' AND source_path = ?2)",
params![session_id, source_path],
)?;
conn.execute(
"DELETE FROM session_memory_chunks
WHERE session_id = ?1 AND source = 'file' AND source_path = ?2",
params![session_id, source_path],
)?;
(chunks_deleted, bytes_estimated)
}
MemoryTier::Project => {
let project_id = require_scope_id(tier, project_id)?;
let chunks_deleted: i64 = conn.query_row(
"SELECT COUNT(*) FROM project_memory_chunks
WHERE project_id = ?1 AND source = 'file' AND source_path = ?2",
params![project_id, source_path],
|row| row.get(0),
)?;
let bytes_estimated: i64 = conn.query_row(
"SELECT COALESCE(SUM(LENGTH(content)), 0) FROM project_memory_chunks
WHERE project_id = ?1 AND source = 'file' AND source_path = ?2",
params![project_id, source_path],
|row| row.get(0),
)?;
conn.execute(
"DELETE FROM project_memory_vectors WHERE chunk_id IN
(SELECT id FROM project_memory_chunks WHERE project_id = ?1 AND source = 'file' AND source_path = ?2)",
params![project_id, source_path],
)?;
conn.execute(
"DELETE FROM project_memory_chunks
WHERE project_id = ?1 AND source = 'file' AND source_path = ?2",
params![project_id, source_path],
)?;
(chunks_deleted, bytes_estimated)
}
MemoryTier::Global => {
let chunks_deleted: i64 = conn.query_row(
"SELECT COUNT(*) FROM global_memory_chunks
WHERE source = 'file' AND source_path = ?1",
params![source_path],
|row| row.get(0),
)?;
let bytes_estimated: i64 = conn.query_row(
"SELECT COALESCE(SUM(LENGTH(content)), 0) FROM global_memory_chunks
WHERE source = 'file' AND source_path = ?1",
params![source_path],
|row| row.get(0),
)?;
conn.execute(
"DELETE FROM global_memory_vectors WHERE chunk_id IN
(SELECT id FROM global_memory_chunks WHERE source = 'file' AND source_path = ?1)",
params![source_path],
)?;
conn.execute(
"DELETE FROM global_memory_chunks
WHERE source = 'file' AND source_path = ?1",
params![source_path],
)?;
(chunks_deleted, bytes_estimated)
}
};
Ok(result)
}
pub async fn upsert_project_index_status(
&self,
project_id: &str,
total_files: i64,
processed_files: i64,
indexed_files: i64,
skipped_files: i64,
errors: i64,
) -> MemoryResult<()> {
let conn = self.conn.lock().await;
let last_indexed_at = Utc::now().to_rfc3339();
conn.execute(
"INSERT INTO project_index_status (
project_id, last_indexed_at, last_total_files, last_processed_files,
last_indexed_files, last_skipped_files, last_errors
) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7)
ON CONFLICT(project_id) DO UPDATE SET
last_indexed_at = excluded.last_indexed_at,
last_total_files = excluded.last_total_files,
last_processed_files = excluded.last_processed_files,
last_indexed_files = excluded.last_indexed_files,
last_skipped_files = excluded.last_skipped_files,
last_errors = excluded.last_errors",
params![
project_id,
last_indexed_at,
total_files,
processed_files,
indexed_files,
skipped_files,
errors
],
)?;
Ok(())
}
pub async fn get_project_stats(&self, project_id: &str) -> MemoryResult<ProjectMemoryStats> {
let conn = self.conn.lock().await;
let project_chunks: i64 = conn.query_row(
"SELECT COUNT(*) FROM project_memory_chunks WHERE project_id = ?1",
params![project_id],
|row| row.get(0),
)?;
let project_bytes: i64 = conn.query_row(
"SELECT COALESCE(SUM(LENGTH(content)), 0) FROM project_memory_chunks WHERE project_id = ?1",
params![project_id],
|row| row.get(0),
)?;
let file_index_chunks: i64 = conn.query_row(
"SELECT COUNT(*) FROM project_memory_chunks WHERE project_id = ?1 AND source = 'file'",
params![project_id],
|row| row.get(0),
)?;
let file_index_bytes: i64 = conn.query_row(
"SELECT COALESCE(SUM(LENGTH(content)), 0) FROM project_memory_chunks WHERE project_id = ?1 AND source = 'file'",
params![project_id],
|row| row.get(0),
)?;
let indexed_files: i64 = conn.query_row(
"SELECT COUNT(*) FROM project_file_index WHERE project_id = ?1",
params![project_id],
|row| row.get(0),
)?;
let status_row: Option<ProjectIndexStatusRow> =
conn
.query_row(
"SELECT last_indexed_at, last_total_files, last_processed_files, last_indexed_files, last_skipped_files, last_errors
FROM project_index_status WHERE project_id = ?1",
params![project_id],
|row| {
Ok((
row.get(0)?,
row.get(1)?,
row.get(2)?,
row.get(3)?,
row.get(4)?,
row.get(5)?,
))
},
)
.optional()?;
let (
last_indexed_at,
last_total_files,
last_processed_files,
last_indexed_files,
last_skipped_files,
last_errors,
) = status_row.unwrap_or((None, None, None, None, None, None));
let last_indexed_at = last_indexed_at.and_then(|s| {
DateTime::parse_from_rfc3339(&s)
.ok()
.map(|dt| dt.with_timezone(&Utc))
});
Ok(ProjectMemoryStats {
project_id: project_id.to_string(),
project_chunks,
project_bytes,
file_index_chunks,
file_index_bytes,
indexed_files,
last_indexed_at,
last_total_files,
last_processed_files,
last_indexed_files,
last_skipped_files,
last_errors,
})
}
pub async fn clear_project_file_index(
&self,
project_id: &str,
vacuum: bool,
) -> MemoryResult<ClearFileIndexResult> {
let conn = self.conn.lock().await;
let chunks_deleted: i64 = conn.query_row(
"SELECT COUNT(*) FROM project_memory_chunks WHERE project_id = ?1 AND source = 'file'",
params![project_id],
|row| row.get(0),
)?;
let bytes_estimated: i64 = conn.query_row(
"SELECT COALESCE(SUM(LENGTH(content)), 0) FROM project_memory_chunks WHERE project_id = ?1 AND source = 'file'",
params![project_id],
|row| row.get(0),
)?;
conn.execute(
"DELETE FROM project_memory_vectors WHERE chunk_id IN
(SELECT id FROM project_memory_chunks WHERE project_id = ?1 AND source = 'file')",
params![project_id],
)?;
conn.execute(
"DELETE FROM project_memory_chunks WHERE project_id = ?1 AND source = 'file'",
params![project_id],
)?;
conn.execute(
"DELETE FROM project_file_index WHERE project_id = ?1",
params![project_id],
)?;
conn.execute(
"DELETE FROM project_index_status WHERE project_id = ?1",
params![project_id],
)?;
drop(conn);
if vacuum {
self.vacuum().await?;
}
Ok(ClearFileIndexResult {
chunks_deleted,
bytes_estimated,
did_vacuum: vacuum,
})
}
pub async fn prune_old_session_chunks(&self, retention_days: u32) -> MemoryResult<u64> {
if retention_days == 0 {
return Ok(0);
}
let conn = self.conn.lock().await;
let cutoff =
(chrono::Utc::now() - chrono::Duration::days(i64::from(retention_days))).to_rfc3339();
conn.execute(
"DELETE FROM session_memory_vectors
WHERE chunk_id IN (
SELECT id FROM session_memory_chunks WHERE created_at < ?1
)",
params![cutoff],
)?;
let deleted = conn.execute(
"DELETE FROM session_memory_chunks WHERE created_at < ?1",
params![cutoff],
)?;
if deleted > 0 {
tracing::info!(
retention_days,
deleted,
"memory hygiene: pruned old session chunks"
);
}
#[allow(clippy::cast_possible_truncation, clippy::cast_sign_loss)]
Ok(deleted as u64)
}
pub async fn run_hygiene(&self, env_override_days: u32) -> MemoryResult<u64> {
let retention_days = if env_override_days > 0 {
env_override_days
} else {
let conn = self.conn.lock().await;
let days: Option<i64> = conn
.query_row(
"SELECT session_retention_days FROM memory_config
WHERE project_id = '__global__' LIMIT 1",
[],
|row| row.get(0),
)
.ok();
drop(conn);
days.unwrap_or(30) as u32
};
self.prune_old_session_chunks(retention_days).await
}
pub async fn put_global_memory_record(
&self,
record: &GlobalMemoryRecord,
) -> MemoryResult<GlobalMemoryWriteResult> {
let conn = self.conn.lock().await;
let existing: Option<String> = conn
.query_row(
"SELECT id FROM memory_records
WHERE user_id = ?1
AND source_type = ?2
AND content_hash = ?3
AND run_id = ?4
AND IFNULL(session_id, '') = IFNULL(?5, '')
AND IFNULL(message_id, '') = IFNULL(?6, '')
AND IFNULL(tool_name, '') = IFNULL(?7, '')
LIMIT 1",
params![
record.user_id,
record.source_type,
record.content_hash,
record.run_id,
record.session_id,
record.message_id,
record.tool_name
],
|row| row.get(0),
)
.optional()?;
if let Some(id) = existing {
return Ok(GlobalMemoryWriteResult {
id,
stored: false,
deduped: true,
});
}
let metadata = record
.metadata
.as_ref()
.map(ToString::to_string)
.unwrap_or_default();
let provenance = record
.provenance
.as_ref()
.map(ToString::to_string)
.unwrap_or_default();
conn.execute(
"INSERT INTO memory_records(
id, user_id, source_type, content, content_hash, run_id, session_id, message_id, tool_name,
project_tag, channel_tag, host_tag, metadata, provenance, redaction_status, redaction_count,
visibility, demoted, score_boost, created_at_ms, updated_at_ms, expires_at_ms
) VALUES (
?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9,
?10, ?11, ?12, ?13, ?14, ?15, ?16,
?17, ?18, ?19, ?20, ?21, ?22
)",
params![
record.id,
record.user_id,
record.source_type,
record.content,
record.content_hash,
record.run_id,
record.session_id,
record.message_id,
record.tool_name,
record.project_tag,
record.channel_tag,
record.host_tag,
metadata,
provenance,
record.redaction_status,
i64::from(record.redaction_count),
record.visibility,
if record.demoted { 1i64 } else { 0i64 },
record.score_boost,
record.created_at_ms as i64,
record.updated_at_ms as i64,
record.expires_at_ms.map(|v| v as i64),
],
)?;
Ok(GlobalMemoryWriteResult {
id: record.id.clone(),
stored: true,
deduped: false,
})
}
#[allow(clippy::too_many_arguments)]
pub async fn search_global_memory(
&self,
user_id: &str,
query: &str,
limit: i64,
project_tag: Option<&str>,
channel_tag: Option<&str>,
host_tag: Option<&str>,
) -> MemoryResult<Vec<GlobalMemorySearchHit>> {
let conn = self.conn.lock().await;
let now_ms = chrono::Utc::now().timestamp_millis();
let mut hits = Vec::new();
let fts_query = build_fts_query(query);
let search_limit = limit.clamp(1, 100);
let maybe_rows = conn.prepare(
"SELECT
m.id, m.user_id, m.source_type, m.content, m.content_hash, m.run_id, m.session_id, m.message_id,
m.tool_name, m.project_tag, m.channel_tag, m.host_tag, m.metadata, m.provenance,
m.redaction_status, m.redaction_count, m.visibility, m.demoted, m.score_boost,
m.created_at_ms, m.updated_at_ms, m.expires_at_ms,
bm25(memory_records_fts) AS rank
FROM memory_records_fts
JOIN memory_records m ON m.id = memory_records_fts.id
WHERE memory_records_fts MATCH ?1
AND m.user_id = ?2
AND m.demoted = 0
AND (m.expires_at_ms IS NULL OR m.expires_at_ms > ?3)
AND (?4 IS NULL OR m.project_tag = ?4)
AND (?5 IS NULL OR m.channel_tag = ?5)
AND (?6 IS NULL OR m.host_tag = ?6)
ORDER BY rank ASC
LIMIT ?7"
);
if let Ok(mut stmt) = maybe_rows {
let rows = stmt.query_map(
params![
fts_query,
user_id,
now_ms,
project_tag,
channel_tag,
host_tag,
search_limit
],
|row| {
let record = row_to_global_record(row)?;
let rank = row.get::<_, f64>(22)?;
let score = 1.0 / (1.0 + rank.max(0.0));
Ok(GlobalMemorySearchHit { record, score })
},
)?;
for row in rows {
hits.push(row?);
}
}
if !hits.is_empty() {
return Ok(hits);
}
let like = format!("%{}%", query.trim());
let mut stmt = conn.prepare(
"SELECT
id, user_id, source_type, content, content_hash, run_id, session_id, message_id,
tool_name, project_tag, channel_tag, host_tag, metadata, provenance,
redaction_status, redaction_count, visibility, demoted, score_boost,
created_at_ms, updated_at_ms, expires_at_ms
FROM memory_records
WHERE user_id = ?1
AND demoted = 0
AND (expires_at_ms IS NULL OR expires_at_ms > ?2)
AND (?3 IS NULL OR project_tag = ?3)
AND (?4 IS NULL OR channel_tag = ?4)
AND (?5 IS NULL OR host_tag = ?5)
AND (?6 = '' OR content LIKE ?7)
ORDER BY created_at_ms DESC
LIMIT ?8",
)?;
let rows = stmt.query_map(
params![
user_id,
now_ms,
project_tag,
channel_tag,
host_tag,
query.trim(),
like,
search_limit
],
|row| {
let record = row_to_global_record(row)?;
Ok(GlobalMemorySearchHit {
record,
score: 0.25,
})
},
)?;
for row in rows {
hits.push(row?);
}
Ok(hits)
}
pub async fn list_global_memory(
&self,
user_id: &str,
q: Option<&str>,
project_tag: Option<&str>,
channel_tag: Option<&str>,
limit: i64,
offset: i64,
) -> MemoryResult<Vec<GlobalMemoryRecord>> {
let conn = self.conn.lock().await;
let query = q.unwrap_or("").trim();
let like = format!("%{}%", query);
let mut stmt = conn.prepare(
"SELECT
id, user_id, source_type, content, content_hash, run_id, session_id, message_id,
tool_name, project_tag, channel_tag, host_tag, metadata, provenance,
redaction_status, redaction_count, visibility, demoted, score_boost,
created_at_ms, updated_at_ms, expires_at_ms
FROM memory_records
WHERE user_id = ?1
AND (?2 = '' OR content LIKE ?3 OR source_type LIKE ?3 OR run_id LIKE ?3)
AND (?4 IS NULL OR project_tag = ?4)
AND (?5 IS NULL OR channel_tag = ?5)
ORDER BY created_at_ms DESC
LIMIT ?6 OFFSET ?7",
)?;
let rows = stmt.query_map(
params![
user_id,
query,
like,
project_tag,
channel_tag,
limit.clamp(1, 1000),
offset.max(0)
],
row_to_global_record,
)?;
let mut out = Vec::new();
for row in rows {
out.push(row?);
}
Ok(out)
}
pub async fn set_global_memory_visibility(
&self,
id: &str,
visibility: &str,
demoted: bool,
) -> MemoryResult<bool> {
let conn = self.conn.lock().await;
let now_ms = chrono::Utc::now().timestamp_millis();
let changed = conn.execute(
"UPDATE memory_records
SET visibility = ?2, demoted = ?3, updated_at_ms = ?4
WHERE id = ?1",
params![id, visibility, if demoted { 1i64 } else { 0i64 }, now_ms],
)?;
Ok(changed > 0)
}
pub async fn update_global_memory_context(
&self,
id: &str,
visibility: &str,
demoted: bool,
metadata: Option<&serde_json::Value>,
provenance: Option<&serde_json::Value>,
) -> MemoryResult<bool> {
let conn = self.conn.lock().await;
let now_ms = chrono::Utc::now().timestamp_millis();
let metadata = metadata.map(ToString::to_string).unwrap_or_default();
let provenance = provenance.map(ToString::to_string).unwrap_or_default();
let changed = conn.execute(
"UPDATE memory_records
SET visibility = ?2, demoted = ?3, metadata = ?4, provenance = ?5, updated_at_ms = ?6
WHERE id = ?1",
params![
id,
visibility,
if demoted { 1i64 } else { 0i64 },
metadata,
provenance,
now_ms,
],
)?;
Ok(changed > 0)
}
pub async fn get_global_memory(&self, id: &str) -> MemoryResult<Option<GlobalMemoryRecord>> {
let conn = self.conn.lock().await;
let mut stmt = conn.prepare(
"SELECT
id, user_id, source_type, content, content_hash, run_id, session_id, message_id,
tool_name, project_tag, channel_tag, host_tag, metadata, provenance,
redaction_status, redaction_count, visibility, demoted, score_boost,
created_at_ms, updated_at_ms, expires_at_ms
FROM memory_records
WHERE id = ?1
LIMIT 1",
)?;
let record = stmt
.query_row(params![id], row_to_global_record)
.optional()?;
Ok(record)
}
pub async fn delete_global_memory(&self, id: &str) -> MemoryResult<bool> {
let conn = self.conn.lock().await;
let changed = conn.execute("DELETE FROM memory_records WHERE id = ?1", params![id])?;
Ok(changed > 0)
}
}
fn row_to_chunk(row: &Row, tier: MemoryTier) -> Result<MemoryChunk, rusqlite::Error> {
let id: String = row.get(0)?;
let content: String = row.get(1)?;
let (session_id, project_id, source_idx, created_at_idx, token_count_idx, metadata_idx) =
match tier {
MemoryTier::Session => (
Some(row.get(2)?),
row.get(3)?,
4usize,
5usize,
6usize,
7usize,
),
MemoryTier::Project => (
row.get(2)?,
Some(row.get(3)?),
4usize,
5usize,
6usize,
7usize,
),
MemoryTier::Global => (None, None, 2usize, 3usize, 4usize, 5usize),
};
let source: String = row.get(source_idx)?;
let created_at_str: String = row.get(created_at_idx)?;
let token_count: i64 = row.get(token_count_idx)?;
let metadata_str: Option<String> = row.get(metadata_idx)?;
let created_at = DateTime::parse_from_rfc3339(&created_at_str)
.map_err(|e| {
rusqlite::Error::FromSqlConversionFailure(5, rusqlite::types::Type::Text, Box::new(e))
})?
.with_timezone(&Utc);
let metadata = metadata_str
.filter(|s| !s.is_empty())
.and_then(|s| serde_json::from_str(&s).ok());
let source_path = row.get::<_, Option<String>>("source_path").ok().flatten();
let source_mtime = row.get::<_, Option<i64>>("source_mtime").ok().flatten();
let source_size = row.get::<_, Option<i64>>("source_size").ok().flatten();
let source_hash = row.get::<_, Option<String>>("source_hash").ok().flatten();
Ok(MemoryChunk {
id,
content,
tier,
session_id,
project_id,
source,
source_path,
source_mtime,
source_size,
source_hash,
created_at,
token_count,
metadata,
})
}
fn require_scope_id<'a>(tier: MemoryTier, scope: Option<&'a str>) -> MemoryResult<&'a str> {
scope
.filter(|value| !value.trim().is_empty())
.ok_or_else(|| {
crate::types::MemoryError::InvalidConfig(match tier {
MemoryTier::Session => "tier=session requires session_id".to_string(),
MemoryTier::Project => "tier=project requires project_id".to_string(),
MemoryTier::Global => "tier=global does not require a scope id".to_string(),
})
})
}
fn row_to_global_record(row: &Row) -> Result<GlobalMemoryRecord, rusqlite::Error> {
let metadata_str: Option<String> = row.get(12)?;
let provenance_str: Option<String> = row.get(13)?;
Ok(GlobalMemoryRecord {
id: row.get(0)?,
user_id: row.get(1)?,
source_type: row.get(2)?,
content: row.get(3)?,
content_hash: row.get(4)?,
run_id: row.get(5)?,
session_id: row.get(6)?,
message_id: row.get(7)?,
tool_name: row.get(8)?,
project_tag: row.get(9)?,
channel_tag: row.get(10)?,
host_tag: row.get(11)?,
metadata: metadata_str
.filter(|s| !s.is_empty())
.and_then(|s| serde_json::from_str(&s).ok()),
provenance: provenance_str
.filter(|s| !s.is_empty())
.and_then(|s| serde_json::from_str(&s).ok()),
redaction_status: row.get(14)?,
redaction_count: row.get::<_, i64>(15)? as u32,
visibility: row.get(16)?,
demoted: row.get::<_, i64>(17)? != 0,
score_boost: row.get(18)?,
created_at_ms: row.get::<_, i64>(19)? as u64,
updated_at_ms: row.get::<_, i64>(20)? as u64,
expires_at_ms: row.get::<_, Option<i64>>(21)?.map(|v| v as u64),
})
}
impl MemoryDatabase {
pub async fn get_node_by_uri(
&self,
uri: &str,
) -> MemoryResult<Option<crate::types::MemoryNode>> {
let conn = self.conn.lock().await;
let mut stmt = conn.prepare(
"SELECT id, uri, parent_uri, node_type, created_at, updated_at, metadata
FROM memory_nodes WHERE uri = ?1",
)?;
let result = stmt.query_row(params![uri], |row| {
let node_type_str: String = row.get(3)?;
let node_type = node_type_str
.parse()
.unwrap_or(crate::types::NodeType::File);
let metadata_str: Option<String> = row.get(6)?;
Ok(crate::types::MemoryNode {
id: row.get(0)?,
uri: row.get(1)?,
parent_uri: row.get(2)?,
node_type,
created_at: row.get::<_, String>(4)?.parse().unwrap_or_default(),
updated_at: row.get::<_, String>(5)?.parse().unwrap_or_default(),
metadata: metadata_str.and_then(|s| serde_json::from_str(&s).ok()),
})
});
match result {
Ok(node) => Ok(Some(node)),
Err(rusqlite::Error::QueryReturnedNoRows) => Ok(None),
Err(e) => Err(MemoryError::Database(e)),
}
}
pub async fn create_node(
&self,
uri: &str,
parent_uri: Option<&str>,
node_type: crate::types::NodeType,
metadata: Option<&serde_json::Value>,
) -> MemoryResult<String> {
let id = uuid::Uuid::new_v4().to_string();
let now = Utc::now().to_rfc3339();
let metadata_str = metadata.map(|m| serde_json::to_string(m)).transpose()?;
let conn = self.conn.lock().await;
conn.execute(
"INSERT INTO memory_nodes (id, uri, parent_uri, node_type, created_at, updated_at, metadata)
VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7)",
params![id, uri, parent_uri, node_type.to_string(), now, now, metadata_str],
)?;
Ok(id)
}
pub async fn list_directory(&self, uri: &str) -> MemoryResult<Vec<crate::types::MemoryNode>> {
let conn = self.conn.lock().await;
let mut stmt = conn.prepare(
"SELECT id, uri, parent_uri, node_type, created_at, updated_at, metadata
FROM memory_nodes WHERE parent_uri = ?1 ORDER BY node_type DESC, uri ASC",
)?;
let rows = stmt.query_map(params![uri], |row| {
let node_type_str: String = row.get(3)?;
let node_type = node_type_str
.parse()
.unwrap_or(crate::types::NodeType::File);
let metadata_str: Option<String> = row.get(6)?;
Ok(crate::types::MemoryNode {
id: row.get(0)?,
uri: row.get(1)?,
parent_uri: row.get(2)?,
node_type,
created_at: row.get::<_, String>(4)?.parse().unwrap_or_default(),
updated_at: row.get::<_, String>(5)?.parse().unwrap_or_default(),
metadata: metadata_str.and_then(|s| serde_json::from_str(&s).ok()),
})
})?;
rows.collect::<Result<Vec<_>, _>>()
.map_err(MemoryError::Database)
}
pub async fn get_layer(
&self,
node_id: &str,
layer_type: crate::types::LayerType,
) -> MemoryResult<Option<crate::types::MemoryLayer>> {
let conn = self.conn.lock().await;
let mut stmt = conn.prepare(
"SELECT id, node_id, layer_type, content, token_count, embedding_id, created_at, source_chunk_id
FROM memory_layers WHERE node_id = ?1 AND layer_type = ?2"
)?;
let result = stmt.query_row(params![node_id, layer_type.to_string()], |row| {
let layer_type_str: String = row.get(2)?;
let layer_type = layer_type_str
.parse()
.unwrap_or(crate::types::LayerType::L2);
Ok(crate::types::MemoryLayer {
id: row.get(0)?,
node_id: row.get(1)?,
layer_type,
content: row.get(3)?,
token_count: row.get(4)?,
embedding_id: row.get(5)?,
created_at: row.get::<_, String>(6)?.parse().unwrap_or_default(),
source_chunk_id: row.get(7)?,
})
});
match result {
Ok(layer) => Ok(Some(layer)),
Err(rusqlite::Error::QueryReturnedNoRows) => Ok(None),
Err(e) => Err(MemoryError::Database(e)),
}
}
pub async fn create_layer(
&self,
node_id: &str,
layer_type: crate::types::LayerType,
content: &str,
token_count: i64,
source_chunk_id: Option<&str>,
) -> MemoryResult<String> {
let id = uuid::Uuid::new_v4().to_string();
let now = Utc::now().to_rfc3339();
let conn = self.conn.lock().await;
conn.execute(
"INSERT INTO memory_layers (id, node_id, layer_type, content, token_count, created_at, source_chunk_id)
VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7)",
params![id, node_id, layer_type.to_string(), content, token_count, now, source_chunk_id],
)?;
Ok(id)
}
pub async fn get_children_tree(
&self,
parent_uri: &str,
max_depth: usize,
) -> MemoryResult<Vec<crate::types::TreeNode>> {
if max_depth == 0 {
return Ok(Vec::new());
}
let children = self.list_directory(parent_uri).await?;
let mut tree_nodes = Vec::new();
for child in children {
let layer_summary = self.get_layer_summary(&child.id).await?;
let grandchildren = if child.node_type == crate::types::NodeType::Directory {
Box::pin(self.get_children_tree(&child.uri, max_depth.saturating_sub(1))).await?
} else {
Vec::new()
};
tree_nodes.push(crate::types::TreeNode {
node: child,
children: grandchildren,
layer_summary,
});
}
Ok(tree_nodes)
}
async fn get_layer_summary(
&self,
node_id: &str,
) -> MemoryResult<Option<crate::types::LayerSummary>> {
let l0 = self.get_layer(node_id, crate::types::LayerType::L0).await?;
let l1 = self.get_layer(node_id, crate::types::LayerType::L1).await?;
let has_l2 = self
.get_layer(node_id, crate::types::LayerType::L2)
.await?
.is_some();
if l0.is_none() && l1.is_none() && !has_l2 {
return Ok(None);
}
Ok(Some(crate::types::LayerSummary {
l0_preview: l0.map(|l| truncate_string(&l.content, 100)),
l1_preview: l1.map(|l| truncate_string(&l.content, 200)),
has_l2,
}))
}
pub async fn node_exists(&self, uri: &str) -> MemoryResult<bool> {
let conn = self.conn.lock().await;
let count: i64 = conn.query_row(
"SELECT COUNT(*) FROM memory_nodes WHERE uri = ?1",
params![uri],
|row| row.get(0),
)?;
Ok(count > 0)
}
}
fn truncate_string(s: &str, max_len: usize) -> String {
if s.len() <= max_len {
s.to_string()
} else {
format!("{}...", &s[..max_len.saturating_sub(3)])
}
}
fn build_fts_query(query: &str) -> String {
let tokens = query
.split_whitespace()
.filter_map(|tok| {
let cleaned =
tok.trim_matches(|c: char| !c.is_ascii_alphanumeric() && c != '_' && c != '-');
if cleaned.is_empty() {
None
} else {
Some(format!("\"{}\"", cleaned))
}
})
.collect::<Vec<_>>();
if tokens.is_empty() {
"\"\"".to_string()
} else {
tokens.join(" OR ")
}
}
#[cfg(test)]
mod tests {
use super::*;
use tempfile::TempDir;
async fn setup_test_db() -> (MemoryDatabase, TempDir) {
let temp_dir = TempDir::new().unwrap();
let db_path = temp_dir.path().join("test_memory.db");
let db = MemoryDatabase::new(&db_path).await.unwrap();
(db, temp_dir)
}
#[tokio::test]
async fn test_init_schema() {
let (db, _temp) = setup_test_db().await;
let stats = db.get_stats().await.unwrap();
assert_eq!(stats.total_chunks, 0);
}
#[tokio::test]
async fn test_store_and_retrieve_chunk() {
let (db, _temp) = setup_test_db().await;
let chunk = MemoryChunk {
id: "test-1".to_string(),
content: "Test content".to_string(),
tier: MemoryTier::Session,
session_id: Some("session-1".to_string()),
project_id: Some("project-1".to_string()),
source: "user_message".to_string(),
source_path: None,
source_mtime: None,
source_size: None,
source_hash: None,
created_at: Utc::now(),
token_count: 10,
metadata: None,
};
let embedding = vec![0.1f32; DEFAULT_EMBEDDING_DIMENSION];
db.store_chunk(&chunk, &embedding).await.unwrap();
let chunks = db.get_session_chunks("session-1").await.unwrap();
assert_eq!(chunks.len(), 1);
assert_eq!(chunks[0].content, "Test content");
}
#[tokio::test]
async fn test_store_and_retrieve_global_chunk() {
let (db, _temp) = setup_test_db().await;
let chunk = MemoryChunk {
id: "global-1".to_string(),
content: "Global note".to_string(),
tier: MemoryTier::Global,
session_id: None,
project_id: None,
source: "agent_note".to_string(),
source_path: None,
source_mtime: None,
source_size: None,
source_hash: None,
created_at: Utc::now(),
token_count: 7,
metadata: Some(serde_json::json!({"kind":"test"})),
};
let embedding = vec![0.2f32; DEFAULT_EMBEDDING_DIMENSION];
db.store_chunk(&chunk, &embedding).await.unwrap();
let chunks = db.get_global_chunks(10).await.unwrap();
assert_eq!(chunks.len(), 1);
assert_eq!(chunks[0].content, "Global note");
assert_eq!(chunks[0].source, "agent_note");
assert_eq!(chunks[0].token_count, 7);
assert_eq!(chunks[0].tier, MemoryTier::Global);
}
#[tokio::test]
async fn test_global_chunk_exists_by_source_hash() {
let (db, _temp) = setup_test_db().await;
let chunk = MemoryChunk {
id: "global-hash".to_string(),
content: "Global hash note".to_string(),
tier: MemoryTier::Global,
session_id: None,
project_id: None,
source: "chat_exchange".to_string(),
source_path: None,
source_mtime: None,
source_size: None,
source_hash: Some("hash-123".to_string()),
created_at: Utc::now(),
token_count: 5,
metadata: None,
};
let embedding = vec![0.3f32; DEFAULT_EMBEDDING_DIMENSION];
db.store_chunk(&chunk, &embedding).await.unwrap();
assert!(db
.global_chunk_exists_by_source_hash("hash-123")
.await
.unwrap());
assert!(!db
.global_chunk_exists_by_source_hash("missing-hash")
.await
.unwrap());
}
#[tokio::test]
async fn test_config_crud() {
let (db, _temp) = setup_test_db().await;
let config = db.get_or_create_config("project-1").await.unwrap();
assert_eq!(config.max_chunks, 10000);
let new_config = MemoryConfig {
max_chunks: 5000,
..Default::default()
};
db.update_config("project-1", &new_config).await.unwrap();
let updated = db.get_or_create_config("project-1").await.unwrap();
assert_eq!(updated.max_chunks, 5000);
}
#[tokio::test]
async fn test_global_memory_put_search_and_dedup() {
let (db, _temp) = setup_test_db().await;
let now = chrono::Utc::now().timestamp_millis() as u64;
let record = GlobalMemoryRecord {
id: "gm-1".to_string(),
user_id: "user-a".to_string(),
source_type: "user_message".to_string(),
content: "remember rust workspace layout".to_string(),
content_hash: "h1".to_string(),
run_id: "run-1".to_string(),
session_id: Some("s1".to_string()),
message_id: Some("m1".to_string()),
tool_name: None,
project_tag: Some("proj-x".to_string()),
channel_tag: None,
host_tag: None,
metadata: None,
provenance: None,
redaction_status: "passed".to_string(),
redaction_count: 0,
visibility: "private".to_string(),
demoted: false,
score_boost: 0.0,
created_at_ms: now,
updated_at_ms: now,
expires_at_ms: None,
};
let first = db.put_global_memory_record(&record).await.unwrap();
assert!(first.stored);
let second = db.put_global_memory_record(&record).await.unwrap();
assert!(second.deduped);
let hits = db
.search_global_memory("user-a", "rust workspace", 5, Some("proj-x"), None, None)
.await
.unwrap();
assert!(!hits.is_empty());
assert_eq!(hits[0].record.id, "gm-1");
}
}