impl MemoryDatabase {
pub async fn new(db_path: &Path) -> MemoryResult<Self> {
if let Some(parent) = db_path.parent() {
tokio::fs::create_dir_all(parent).await?;
}
unsafe {
sqlite3_auto_extension(Some(std::mem::transmute::<
*const (),
unsafe extern "C" fn(
*mut rusqlite::ffi::sqlite3,
*mut *mut i8,
*const rusqlite::ffi::sqlite3_api_routines,
) -> i32,
>(sqlite3_vec_init as *const ())));
}
let conn = Connection::open(db_path)?;
conn.busy_timeout(Duration::from_secs(10))?;
conn.query_row("PRAGMA journal_mode = WAL", [], |_| Ok(()))?;
conn.execute("PRAGMA synchronous = NORMAL", [])?;
let db = Self {
conn: Arc::new(Mutex::new(conn)),
db_path: db_path.to_path_buf(),
};
let _schema_init_guard = SCHEMA_INIT_LOCK.lock().await;
db.init_schema().await?;
if let Err(err) = db.validate_vector_tables().await {
match &err {
crate::types::MemoryError::Database(db_err)
if Self::is_vector_table_error(db_err) =>
{
tracing::warn!(
"Detected vector table corruption during startup ({}). Recreating vector tables.",
db_err
);
db.recreate_vector_tables().await?;
}
_ => return Err(err),
}
}
db.validate_integrity().await?;
Ok(db)
}
async fn validate_integrity(&self) -> MemoryResult<()> {
let conn = self.conn.lock().await;
let check = match conn.query_row("PRAGMA quick_check(1)", [], |row| row.get::<_, String>(0))
{
Ok(value) => value,
Err(err) => {
tracing::warn!(
"Skipping strict PRAGMA quick_check due to probe error: {}",
err
);
return Ok(());
}
};
if check.trim().eq_ignore_ascii_case("ok") {
return Ok(());
}
let lowered = check.to_lowercase();
if lowered.contains("malformed")
|| lowered.contains("corrupt")
|| lowered.contains("database disk image is malformed")
{
return Err(crate::types::MemoryError::InvalidConfig(format!(
"malformed database integrity check: {}",
check
)));
}
tracing::warn!(
"PRAGMA quick_check returned non-ok status but not a hard corruption signal: {}",
check
);
Ok(())
}
async fn init_schema(&self) -> MemoryResult<()> {
let conn = self.conn.lock().await;
conn.execute(
"CREATE TABLE IF NOT EXISTS session_memory_chunks (
id TEXT PRIMARY KEY,
content TEXT NOT NULL,
session_id TEXT NOT NULL,
project_id TEXT,
source TEXT NOT NULL,
created_at TEXT NOT NULL,
token_count INTEGER NOT NULL DEFAULT 0,
metadata TEXT
)",
[],
)?;
let session_existing_cols: HashSet<String> = {
let mut stmt = conn.prepare("PRAGMA table_info(session_memory_chunks)")?;
let rows = stmt.query_map([], |row| row.get::<_, String>(1))?;
rows.collect::<Result<HashSet<_>, _>>()?
};
if !session_existing_cols.contains("source_path") {
conn.execute(
"ALTER TABLE session_memory_chunks ADD COLUMN source_path TEXT",
[],
)?;
}
if !session_existing_cols.contains("source_mtime") {
conn.execute(
"ALTER TABLE session_memory_chunks ADD COLUMN source_mtime INTEGER",
[],
)?;
}
if !session_existing_cols.contains("source_size") {
conn.execute(
"ALTER TABLE session_memory_chunks ADD COLUMN source_size INTEGER",
[],
)?;
}
if !session_existing_cols.contains("source_hash") {
conn.execute(
"ALTER TABLE session_memory_chunks ADD COLUMN source_hash TEXT",
[],
)?;
}
if !session_existing_cols.contains("tenant_org_id") {
conn.execute(
"ALTER TABLE session_memory_chunks ADD COLUMN tenant_org_id TEXT NOT NULL DEFAULT 'local'",
[],
)?;
}
if !session_existing_cols.contains("tenant_workspace_id") {
conn.execute(
"ALTER TABLE session_memory_chunks ADD COLUMN tenant_workspace_id TEXT NOT NULL DEFAULT 'local'",
[],
)?;
}
if !session_existing_cols.contains("tenant_deployment_id") {
conn.execute(
"ALTER TABLE session_memory_chunks ADD COLUMN tenant_deployment_id TEXT",
[],
)?;
}
conn.execute(
"UPDATE session_memory_chunks SET tenant_org_id = 'local' WHERE tenant_org_id IS NULL OR tenant_org_id = ''",
[],
)?;
conn.execute(
"UPDATE session_memory_chunks SET tenant_workspace_id = 'local' WHERE tenant_workspace_id IS NULL OR tenant_workspace_id = ''",
[],
)?;
conn.execute(
&format!(
"CREATE VIRTUAL TABLE IF NOT EXISTS session_memory_vectors USING vec0(
chunk_id TEXT PRIMARY KEY,
embedding float[{}]
)",
DEFAULT_EMBEDDING_DIMENSION
),
[],
)?;
conn.execute(
"CREATE TABLE IF NOT EXISTS project_memory_chunks (
id TEXT PRIMARY KEY,
content TEXT NOT NULL,
project_id TEXT NOT NULL,
session_id TEXT,
source TEXT NOT NULL,
created_at TEXT NOT NULL,
token_count INTEGER NOT NULL DEFAULT 0,
metadata TEXT
)",
[],
)?;
let existing_cols: HashSet<String> = {
let mut stmt = conn.prepare("PRAGMA table_info(project_memory_chunks)")?;
let rows = stmt.query_map([], |row| row.get::<_, String>(1))?;
rows.collect::<Result<HashSet<_>, _>>()?
};
if !existing_cols.contains("source_path") {
conn.execute(
"ALTER TABLE project_memory_chunks ADD COLUMN source_path TEXT",
[],
)?;
}
if !existing_cols.contains("source_mtime") {
conn.execute(
"ALTER TABLE project_memory_chunks ADD COLUMN source_mtime INTEGER",
[],
)?;
}
if !existing_cols.contains("source_size") {
conn.execute(
"ALTER TABLE project_memory_chunks ADD COLUMN source_size INTEGER",
[],
)?;
}
if !existing_cols.contains("source_hash") {
conn.execute(
"ALTER TABLE project_memory_chunks ADD COLUMN source_hash TEXT",
[],
)?;
}
if !existing_cols.contains("tenant_org_id") {
conn.execute(
"ALTER TABLE project_memory_chunks ADD COLUMN tenant_org_id TEXT NOT NULL DEFAULT 'local'",
[],
)?;
}
if !existing_cols.contains("tenant_workspace_id") {
conn.execute(
"ALTER TABLE project_memory_chunks ADD COLUMN tenant_workspace_id TEXT NOT NULL DEFAULT 'local'",
[],
)?;
}
if !existing_cols.contains("tenant_deployment_id") {
conn.execute(
"ALTER TABLE project_memory_chunks ADD COLUMN tenant_deployment_id TEXT",
[],
)?;
}
conn.execute(
"UPDATE project_memory_chunks SET tenant_org_id = 'local' WHERE tenant_org_id IS NULL OR tenant_org_id = ''",
[],
)?;
conn.execute(
"UPDATE project_memory_chunks SET tenant_workspace_id = 'local' WHERE tenant_workspace_id IS NULL OR tenant_workspace_id = ''",
[],
)?;
conn.execute(
&format!(
"CREATE VIRTUAL TABLE IF NOT EXISTS project_memory_vectors USING vec0(
chunk_id TEXT PRIMARY KEY,
embedding float[{}]
)",
DEFAULT_EMBEDDING_DIMENSION
),
[],
)?;
conn.execute(
"CREATE TABLE IF NOT EXISTS project_file_index (
project_id TEXT NOT NULL,
path TEXT NOT NULL,
mtime INTEGER NOT NULL,
size INTEGER NOT NULL,
hash TEXT NOT NULL,
indexed_at TEXT NOT NULL,
PRIMARY KEY(project_id, path)
)",
[],
)?;
let project_file_index_cols: HashSet<String> = {
let mut stmt = conn.prepare("PRAGMA table_info(project_file_index)")?;
let rows = stmt.query_map([], |row| row.get::<_, String>(1))?;
rows.collect::<Result<HashSet<_>, _>>()?
};
if !project_file_index_cols.contains("tenant_org_id") {
conn.execute(
"CREATE TABLE project_file_index_new (
tenant_org_id TEXT NOT NULL DEFAULT 'local',
tenant_workspace_id TEXT NOT NULL DEFAULT 'local',
tenant_deployment_id TEXT NOT NULL DEFAULT '',
project_id TEXT NOT NULL,
path TEXT NOT NULL,
mtime INTEGER NOT NULL,
size INTEGER NOT NULL,
hash TEXT NOT NULL,
indexed_at TEXT NOT NULL,
PRIMARY KEY(tenant_org_id, tenant_workspace_id, tenant_deployment_id, project_id, path)
)",
[],
)?;
conn.execute(
"INSERT OR REPLACE INTO project_file_index_new
(tenant_org_id, tenant_workspace_id, tenant_deployment_id, project_id, path, mtime, size, hash, indexed_at)
SELECT 'local', 'local', '', project_id, path, mtime, size, hash, indexed_at
FROM project_file_index",
[],
)?;
conn.execute("DROP TABLE project_file_index", [])?;
conn.execute(
"ALTER TABLE project_file_index_new RENAME TO project_file_index",
[],
)?;
}
conn.execute(
"CREATE TABLE IF NOT EXISTS session_file_index (
session_id TEXT NOT NULL,
path TEXT NOT NULL,
mtime INTEGER NOT NULL,
size INTEGER NOT NULL,
hash TEXT NOT NULL,
indexed_at TEXT NOT NULL,
PRIMARY KEY(session_id, path)
)",
[],
)?;
let session_file_index_cols: HashSet<String> = {
let mut stmt = conn.prepare("PRAGMA table_info(session_file_index)")?;
let rows = stmt.query_map([], |row| row.get::<_, String>(1))?;
rows.collect::<Result<HashSet<_>, _>>()?
};
if !session_file_index_cols.contains("tenant_org_id") {
conn.execute(
"CREATE TABLE session_file_index_new (
tenant_org_id TEXT NOT NULL DEFAULT 'local',
tenant_workspace_id TEXT NOT NULL DEFAULT 'local',
tenant_deployment_id TEXT NOT NULL DEFAULT '',
session_id TEXT NOT NULL,
path TEXT NOT NULL,
mtime INTEGER NOT NULL,
size INTEGER NOT NULL,
hash TEXT NOT NULL,
indexed_at TEXT NOT NULL,
PRIMARY KEY(tenant_org_id, tenant_workspace_id, tenant_deployment_id, session_id, path)
)",
[],
)?;
conn.execute(
"INSERT OR REPLACE INTO session_file_index_new
(tenant_org_id, tenant_workspace_id, tenant_deployment_id, session_id, path, mtime, size, hash, indexed_at)
SELECT 'local', 'local', '', session_id, path, mtime, size, hash, indexed_at
FROM session_file_index",
[],
)?;
conn.execute("DROP TABLE session_file_index", [])?;
conn.execute(
"ALTER TABLE session_file_index_new RENAME TO session_file_index",
[],
)?;
}
conn.execute(
"CREATE TABLE IF NOT EXISTS project_index_status (
project_id TEXT PRIMARY KEY,
last_indexed_at TEXT,
last_total_files INTEGER,
last_processed_files INTEGER,
last_indexed_files INTEGER,
last_skipped_files INTEGER,
last_errors INTEGER
)",
[],
)?;
let project_index_status_cols: HashSet<String> = {
let mut stmt = conn.prepare("PRAGMA table_info(project_index_status)")?;
let rows = stmt.query_map([], |row| row.get::<_, String>(1))?;
rows.collect::<Result<HashSet<_>, _>>()?
};
if !project_index_status_cols.contains("tenant_org_id") {
conn.execute(
"CREATE TABLE project_index_status_new (
tenant_org_id TEXT NOT NULL DEFAULT 'local',
tenant_workspace_id TEXT NOT NULL DEFAULT 'local',
tenant_deployment_id TEXT NOT NULL DEFAULT '',
project_id TEXT NOT NULL,
last_indexed_at TEXT,
last_total_files INTEGER,
last_processed_files INTEGER,
last_indexed_files INTEGER,
last_skipped_files INTEGER,
last_errors INTEGER,
PRIMARY KEY(tenant_org_id, tenant_workspace_id, tenant_deployment_id, project_id)
)",
[],
)?;
conn.execute(
"INSERT OR REPLACE INTO project_index_status_new
(tenant_org_id, tenant_workspace_id, tenant_deployment_id, project_id, last_indexed_at, last_total_files, last_processed_files, last_indexed_files, last_skipped_files, last_errors)
SELECT 'local', 'local', '', project_id, last_indexed_at, last_total_files, last_processed_files, last_indexed_files, last_skipped_files, last_errors
FROM project_index_status",
[],
)?;
conn.execute("DROP TABLE project_index_status", [])?;
conn.execute(
"ALTER TABLE project_index_status_new RENAME TO project_index_status",
[],
)?;
}
conn.execute(
"CREATE TABLE IF NOT EXISTS global_memory_chunks (
id TEXT PRIMARY KEY,
content TEXT NOT NULL,
source TEXT NOT NULL,
created_at TEXT NOT NULL,
token_count INTEGER NOT NULL DEFAULT 0,
metadata TEXT
)",
[],
)?;
let global_existing_cols: HashSet<String> = {
let mut stmt = conn.prepare("PRAGMA table_info(global_memory_chunks)")?;
let rows = stmt.query_map([], |row| row.get::<_, String>(1))?;
rows.collect::<Result<HashSet<_>, _>>()?
};
if !global_existing_cols.contains("source_path") {
conn.execute(
"ALTER TABLE global_memory_chunks ADD COLUMN source_path TEXT",
[],
)?;
}
if !global_existing_cols.contains("source_mtime") {
conn.execute(
"ALTER TABLE global_memory_chunks ADD COLUMN source_mtime INTEGER",
[],
)?;
}
if !global_existing_cols.contains("source_size") {
conn.execute(
"ALTER TABLE global_memory_chunks ADD COLUMN source_size INTEGER",
[],
)?;
}
if !global_existing_cols.contains("source_hash") {
conn.execute(
"ALTER TABLE global_memory_chunks ADD COLUMN source_hash TEXT",
[],
)?;
}
if !global_existing_cols.contains("tenant_org_id") {
conn.execute(
"ALTER TABLE global_memory_chunks ADD COLUMN tenant_org_id TEXT NOT NULL DEFAULT 'local'",
[],
)?;
}
if !global_existing_cols.contains("tenant_workspace_id") {
conn.execute(
"ALTER TABLE global_memory_chunks ADD COLUMN tenant_workspace_id TEXT NOT NULL DEFAULT 'local'",
[],
)?;
}
if !global_existing_cols.contains("tenant_deployment_id") {
conn.execute(
"ALTER TABLE global_memory_chunks ADD COLUMN tenant_deployment_id TEXT",
[],
)?;
}
conn.execute(
"UPDATE global_memory_chunks SET tenant_org_id = 'local' WHERE tenant_org_id IS NULL OR tenant_org_id = ''",
[],
)?;
conn.execute(
"UPDATE global_memory_chunks SET tenant_workspace_id = 'local' WHERE tenant_workspace_id IS NULL OR tenant_workspace_id = ''",
[],
)?;
conn.execute(
&format!(
"CREATE VIRTUAL TABLE IF NOT EXISTS global_memory_vectors USING vec0(
chunk_id TEXT PRIMARY KEY,
embedding float[{}]
)",
DEFAULT_EMBEDDING_DIMENSION
),
[],
)?;
conn.execute(
"CREATE TABLE IF NOT EXISTS memory_config (
project_id TEXT PRIMARY KEY,
max_chunks INTEGER NOT NULL DEFAULT 10000,
chunk_size INTEGER NOT NULL DEFAULT 512,
retrieval_k INTEGER NOT NULL DEFAULT 5,
auto_cleanup INTEGER NOT NULL DEFAULT 1,
session_retention_days INTEGER NOT NULL DEFAULT 30,
token_budget INTEGER NOT NULL DEFAULT 5000,
chunk_overlap INTEGER NOT NULL DEFAULT 64,
updated_at TEXT NOT NULL
)",
[],
)?;
let memory_config_cols: HashSet<String> = {
let mut stmt = conn.prepare("PRAGMA table_info(memory_config)")?;
let rows = stmt.query_map([], |row| row.get::<_, String>(1))?;
rows.collect::<Result<HashSet<_>, _>>()?
};
if !memory_config_cols.contains("tenant_org_id") {
conn.execute(
"CREATE TABLE memory_config_new (
tenant_org_id TEXT NOT NULL DEFAULT 'local',
tenant_workspace_id TEXT NOT NULL DEFAULT 'local',
tenant_deployment_id TEXT NOT NULL DEFAULT '',
project_id TEXT NOT NULL,
max_chunks INTEGER NOT NULL DEFAULT 10000,
chunk_size INTEGER NOT NULL DEFAULT 512,
retrieval_k INTEGER NOT NULL DEFAULT 5,
auto_cleanup INTEGER NOT NULL DEFAULT 1,
session_retention_days INTEGER NOT NULL DEFAULT 30,
token_budget INTEGER NOT NULL DEFAULT 5000,
chunk_overlap INTEGER NOT NULL DEFAULT 64,
updated_at TEXT NOT NULL,
PRIMARY KEY(tenant_org_id, tenant_workspace_id, tenant_deployment_id, project_id)
)",
[],
)?;
conn.execute(
"INSERT OR REPLACE INTO memory_config_new
(tenant_org_id, tenant_workspace_id, tenant_deployment_id, project_id,
max_chunks, chunk_size, retrieval_k, auto_cleanup, session_retention_days,
token_budget, chunk_overlap, updated_at)
SELECT 'local', 'local', '', project_id, max_chunks, chunk_size, retrieval_k,
auto_cleanup, session_retention_days, token_budget, chunk_overlap, updated_at
FROM memory_config",
[],
)?;
conn.execute("DROP TABLE memory_config", [])?;
conn.execute("ALTER TABLE memory_config_new RENAME TO memory_config", [])?;
}
conn.execute(
"CREATE INDEX IF NOT EXISTS idx_memory_config_tenant_project
ON memory_config(tenant_org_id, tenant_workspace_id, tenant_deployment_id, project_id)",
[],
)?;
conn.execute(
"CREATE TABLE IF NOT EXISTS memory_cleanup_log (
id TEXT PRIMARY KEY,
cleanup_type TEXT NOT NULL,
tier TEXT NOT NULL,
project_id TEXT,
session_id TEXT,
chunks_deleted INTEGER NOT NULL DEFAULT 0,
bytes_reclaimed INTEGER NOT NULL DEFAULT 0,
created_at TEXT NOT NULL
)",
[],
)?;
let cleanup_log_cols: HashSet<String> = {
let mut stmt = conn.prepare("PRAGMA table_info(memory_cleanup_log)")?;
let rows = stmt.query_map([], |row| row.get::<_, String>(1))?;
rows.collect::<Result<HashSet<_>, _>>()?
};
if !cleanup_log_cols.contains("tenant_org_id") {
conn.execute(
"ALTER TABLE memory_cleanup_log ADD COLUMN tenant_org_id TEXT NOT NULL DEFAULT 'local'",
[],
)?;
}
if !cleanup_log_cols.contains("tenant_workspace_id") {
conn.execute(
"ALTER TABLE memory_cleanup_log ADD COLUMN tenant_workspace_id TEXT NOT NULL DEFAULT 'local'",
[],
)?;
}
if !cleanup_log_cols.contains("tenant_deployment_id") {
conn.execute(
"ALTER TABLE memory_cleanup_log ADD COLUMN tenant_deployment_id TEXT",
[],
)?;
}
conn.execute(
"UPDATE memory_cleanup_log SET tenant_org_id = 'local' WHERE tenant_org_id IS NULL OR tenant_org_id = ''",
[],
)?;
conn.execute(
"UPDATE memory_cleanup_log SET tenant_workspace_id = 'local' WHERE tenant_workspace_id IS NULL OR tenant_workspace_id = ''",
[],
)?;
conn.execute(
"CREATE INDEX IF NOT EXISTS idx_session_chunks_session ON session_memory_chunks(session_id)",
[],
)?;
conn.execute(
"CREATE INDEX IF NOT EXISTS idx_session_chunks_tenant_session ON session_memory_chunks(tenant_org_id, tenant_workspace_id, IFNULL(tenant_deployment_id, ''), session_id)",
[],
)?;
conn.execute(
"CREATE INDEX IF NOT EXISTS idx_session_chunks_project ON session_memory_chunks(project_id)",
[],
)?;
conn.execute(
"CREATE INDEX IF NOT EXISTS idx_session_file_chunks ON session_memory_chunks(session_id, source, source_path)",
[],
)?;
conn.execute(
"CREATE INDEX IF NOT EXISTS idx_project_chunks_project ON project_memory_chunks(project_id)",
[],
)?;
conn.execute(
"CREATE INDEX IF NOT EXISTS idx_project_chunks_tenant_project ON project_memory_chunks(tenant_org_id, tenant_workspace_id, IFNULL(tenant_deployment_id, ''), project_id)",
[],
)?;
conn.execute(
"CREATE INDEX IF NOT EXISTS idx_project_file_chunks ON project_memory_chunks(project_id, source, source_path)",
[],
)?;
conn.execute(
"CREATE INDEX IF NOT EXISTS idx_session_chunks_created ON session_memory_chunks(created_at)",
[],
)?;
conn.execute(
"CREATE INDEX IF NOT EXISTS idx_global_file_chunks ON global_memory_chunks(source, source_path)",
[],
)?;
conn.execute(
"CREATE INDEX IF NOT EXISTS idx_global_chunks_tenant_created ON global_memory_chunks(tenant_org_id, tenant_workspace_id, IFNULL(tenant_deployment_id, ''), created_at DESC)",
[],
)?;
conn.execute(
"CREATE INDEX IF NOT EXISTS idx_cleanup_log_created ON memory_cleanup_log(created_at)",
[],
)?;
conn.execute(
"CREATE INDEX IF NOT EXISTS idx_cleanup_log_tenant_created ON memory_cleanup_log(tenant_org_id, tenant_workspace_id, IFNULL(tenant_deployment_id, ''), created_at DESC)",
[],
)?;
conn.execute(
"CREATE TABLE IF NOT EXISTS global_file_index (
path TEXT PRIMARY KEY,
mtime INTEGER NOT NULL,
size INTEGER NOT NULL,
hash TEXT NOT NULL,
indexed_at TEXT NOT NULL
)",
[],
)?;
let global_file_index_cols: HashSet<String> = {
let mut stmt = conn.prepare("PRAGMA table_info(global_file_index)")?;
let rows = stmt.query_map([], |row| row.get::<_, String>(1))?;
rows.collect::<Result<HashSet<_>, _>>()?
};
if !global_file_index_cols.contains("tenant_org_id") {
conn.execute(
"CREATE TABLE global_file_index_new (
tenant_org_id TEXT NOT NULL DEFAULT 'local',
tenant_workspace_id TEXT NOT NULL DEFAULT 'local',
tenant_deployment_id TEXT NOT NULL DEFAULT '',
path TEXT NOT NULL,
mtime INTEGER NOT NULL,
size INTEGER NOT NULL,
hash TEXT NOT NULL,
indexed_at TEXT NOT NULL,
PRIMARY KEY(tenant_org_id, tenant_workspace_id, tenant_deployment_id, path)
)",
[],
)?;
conn.execute(
"INSERT OR REPLACE INTO global_file_index_new
(tenant_org_id, tenant_workspace_id, tenant_deployment_id, path, mtime, size, hash, indexed_at)
SELECT 'local', 'local', '', path, mtime, size, hash, indexed_at
FROM global_file_index",
[],
)?;
conn.execute("DROP TABLE global_file_index", [])?;
conn.execute(
"ALTER TABLE global_file_index_new RENAME TO global_file_index",
[],
)?;
}
conn.execute(
"CREATE INDEX IF NOT EXISTS idx_project_file_index_tenant_project ON project_file_index(tenant_org_id, tenant_workspace_id, IFNULL(tenant_deployment_id, ''), project_id)",
[],
)?;
conn.execute(
"CREATE INDEX IF NOT EXISTS idx_session_file_index_tenant_session ON session_file_index(tenant_org_id, tenant_workspace_id, IFNULL(tenant_deployment_id, ''), session_id)",
[],
)?;
conn.execute(
"CREATE INDEX IF NOT EXISTS idx_project_index_status_tenant_project ON project_index_status(tenant_org_id, tenant_workspace_id, IFNULL(tenant_deployment_id, ''), project_id)",
[],
)?;
conn.execute(
"CREATE TABLE IF NOT EXISTS source_object_lifecycle (
tenant_org_id TEXT NOT NULL,
tenant_workspace_id TEXT NOT NULL,
tenant_deployment_id TEXT NOT NULL DEFAULT '',
source_object_id TEXT NOT NULL,
source_binding_id TEXT NOT NULL,
connector_id TEXT NOT NULL,
state TEXT NOT NULL,
tier TEXT NOT NULL,
session_id TEXT,
project_id TEXT,
import_namespace TEXT NOT NULL,
indexed_path TEXT NOT NULL,
native_object_id TEXT NOT NULL,
resource_ref TEXT NOT NULL,
data_class TEXT NOT NULL,
content_hash TEXT,
source_hash TEXT,
first_seen_at_ms INTEGER NOT NULL,
last_seen_at_ms INTEGER NOT NULL,
tombstoned_at_ms INTEGER,
metadata TEXT,
PRIMARY KEY(tenant_org_id, tenant_workspace_id, tenant_deployment_id, source_object_id)
)",
[],
)?;
conn.execute(
"CREATE INDEX IF NOT EXISTS idx_source_object_lifecycle_binding
ON source_object_lifecycle(tenant_org_id, tenant_workspace_id, tenant_deployment_id, source_binding_id, state)",
[],
)?;
conn.execute(
"CREATE UNIQUE INDEX IF NOT EXISTS idx_source_object_lifecycle_native
ON source_object_lifecycle(tenant_org_id, tenant_workspace_id, tenant_deployment_id, source_binding_id, native_object_id)",
[],
)?;
conn.execute(
"CREATE TABLE IF NOT EXISTS knowledge_spaces (
id TEXT PRIMARY KEY,
scope TEXT NOT NULL,
project_id TEXT,
namespace TEXT,
title TEXT,
description TEXT,
trust_level TEXT NOT NULL,
metadata TEXT,
created_at_ms INTEGER NOT NULL,
updated_at_ms INTEGER NOT NULL
)",
[],
)?;
conn.execute(
"DROP INDEX IF EXISTS idx_knowledge_spaces_scope_project_namespace",
[],
)?;
let knowledge_space_cols: HashSet<String> = {
let mut stmt = conn.prepare("PRAGMA table_info(knowledge_spaces)")?;
let rows = stmt.query_map([], |row| row.get::<_, String>(1))?;
rows.collect::<Result<HashSet<_>, _>>()?
};
if !knowledge_space_cols.contains("tenant_org_id") {
conn.execute(
"CREATE TABLE knowledge_spaces_new (
id TEXT PRIMARY KEY,
tenant_org_id TEXT NOT NULL DEFAULT 'local',
tenant_workspace_id TEXT NOT NULL DEFAULT 'local',
tenant_deployment_id TEXT NOT NULL DEFAULT '',
scope TEXT NOT NULL,
project_id TEXT,
namespace TEXT,
title TEXT,
description TEXT,
trust_level TEXT NOT NULL,
metadata TEXT,
created_at_ms INTEGER NOT NULL,
updated_at_ms INTEGER NOT NULL
)",
[],
)?;
conn.execute(
"INSERT OR REPLACE INTO knowledge_spaces_new
(id, tenant_org_id, tenant_workspace_id, tenant_deployment_id, scope, project_id, namespace, title, description, trust_level, metadata, created_at_ms, updated_at_ms)
SELECT id, 'local', 'local', '', scope, project_id, namespace, title, description, trust_level, metadata, created_at_ms, updated_at_ms
FROM knowledge_spaces",
[],
)?;
conn.execute("DROP TABLE knowledge_spaces", [])?;
conn.execute(
"ALTER TABLE knowledge_spaces_new RENAME TO knowledge_spaces",
[],
)?;
}
conn.execute(
"CREATE UNIQUE INDEX IF NOT EXISTS idx_knowledge_spaces_tenant_scope_project_namespace
ON knowledge_spaces(tenant_org_id, tenant_workspace_id, tenant_deployment_id, scope, IFNULL(project_id, ''), IFNULL(namespace, ''))",
[],
)?;
conn.execute(
"CREATE INDEX IF NOT EXISTS idx_knowledge_spaces_tenant_project_updated
ON knowledge_spaces(tenant_org_id, tenant_workspace_id, tenant_deployment_id, IFNULL(project_id, ''), updated_at_ms DESC)",
[],
)?;
conn.execute(
"CREATE TABLE IF NOT EXISTS knowledge_items (
id TEXT PRIMARY KEY,
space_id TEXT NOT NULL,
coverage_key TEXT NOT NULL,
dedupe_key TEXT NOT NULL,
item_type TEXT NOT NULL,
title TEXT NOT NULL,
summary TEXT,
payload TEXT NOT NULL,
trust_level TEXT NOT NULL,
status TEXT NOT NULL,
run_id TEXT,
artifact_refs TEXT NOT NULL,
source_memory_ids TEXT NOT NULL,
freshness_expires_at_ms INTEGER,
metadata TEXT,
created_at_ms INTEGER NOT NULL,
updated_at_ms INTEGER NOT NULL,
FOREIGN KEY(space_id) REFERENCES knowledge_spaces(id)
)",
[],
)?;
conn.execute(
"CREATE UNIQUE INDEX IF NOT EXISTS idx_knowledge_items_space_dedupe
ON knowledge_items(space_id, dedupe_key)",
[],
)?;
conn.execute(
"CREATE INDEX IF NOT EXISTS idx_knowledge_items_space_coverage
ON knowledge_items(space_id, coverage_key)",
[],
)?;
conn.execute(
"CREATE INDEX IF NOT EXISTS idx_knowledge_items_space_created
ON knowledge_items(space_id, created_at_ms DESC)",
[],
)?;
conn.execute(
"CREATE TABLE IF NOT EXISTS knowledge_coverage (
coverage_key TEXT NOT NULL,
space_id TEXT NOT NULL,
latest_item_id TEXT,
latest_dedupe_key TEXT,
last_seen_at_ms INTEGER NOT NULL,
last_promoted_at_ms INTEGER,
freshness_expires_at_ms INTEGER,
metadata TEXT,
PRIMARY KEY(coverage_key, space_id),
FOREIGN KEY(space_id) REFERENCES knowledge_spaces(id)
)",
[],
)?;
conn.execute(
"CREATE INDEX IF NOT EXISTS idx_knowledge_coverage_space_seen
ON knowledge_coverage(space_id, last_seen_at_ms DESC)",
[],
)?;
conn.execute(
"CREATE TABLE IF NOT EXISTS memory_records (
id TEXT PRIMARY KEY,
tenant_org_id TEXT NOT NULL DEFAULT 'local',
tenant_workspace_id TEXT NOT NULL DEFAULT 'local',
tenant_deployment_id TEXT,
user_id TEXT NOT NULL,
source_type TEXT NOT NULL,
content TEXT NOT NULL,
content_hash TEXT NOT NULL,
run_id TEXT NOT NULL,
session_id TEXT,
message_id TEXT,
tool_name TEXT,
project_tag TEXT,
channel_tag TEXT,
host_tag TEXT,
metadata TEXT,
provenance TEXT,
redaction_status TEXT NOT NULL,
redaction_count INTEGER NOT NULL DEFAULT 0,
visibility TEXT NOT NULL DEFAULT 'private',
demoted INTEGER NOT NULL DEFAULT 0,
score_boost REAL NOT NULL DEFAULT 0.0,
created_at_ms INTEGER NOT NULL,
updated_at_ms INTEGER NOT NULL,
expires_at_ms INTEGER
)",
[],
)?;
let memory_record_cols: HashSet<String> = {
let mut stmt = conn.prepare("PRAGMA table_info(memory_records)")?;
let rows = stmt.query_map([], |row| row.get::<_, String>(1))?;
rows.collect::<Result<HashSet<_>, _>>()?
};
if !memory_record_cols.contains("tenant_org_id") {
conn.execute(
"ALTER TABLE memory_records ADD COLUMN tenant_org_id TEXT NOT NULL DEFAULT 'local'",
[],
)?;
}
if !memory_record_cols.contains("tenant_workspace_id") {
conn.execute(
"ALTER TABLE memory_records ADD COLUMN tenant_workspace_id TEXT NOT NULL DEFAULT 'local'",
[],
)?;
}
if !memory_record_cols.contains("tenant_deployment_id") {
conn.execute(
"ALTER TABLE memory_records ADD COLUMN tenant_deployment_id TEXT",
[],
)?;
}
conn.execute(
"UPDATE memory_records
SET tenant_org_id = 'local'
WHERE tenant_org_id IS NULL OR tenant_org_id = ''",
[],
)?;
conn.execute(
"UPDATE memory_records
SET tenant_workspace_id = 'local'
WHERE tenant_workspace_id IS NULL OR tenant_workspace_id = ''",
[],
)?;
conn.execute("DROP INDEX IF EXISTS idx_memory_records_dedup", [])?;
conn.execute(
"CREATE UNIQUE INDEX IF NOT EXISTS idx_memory_records_dedup
ON memory_records(tenant_org_id, tenant_workspace_id, IFNULL(tenant_deployment_id, ''), user_id, source_type, content_hash, run_id, IFNULL(session_id, ''), IFNULL(message_id, ''), IFNULL(tool_name, ''))",
[],
)?;
conn.execute(
"CREATE INDEX IF NOT EXISTS idx_memory_records_user_created
ON memory_records(tenant_org_id, tenant_workspace_id, IFNULL(tenant_deployment_id, ''), user_id, created_at_ms DESC)",
[],
)?;
conn.execute(
"CREATE INDEX IF NOT EXISTS idx_memory_records_run
ON memory_records(run_id)",
[],
)?;
conn.execute(
"CREATE VIRTUAL TABLE IF NOT EXISTS memory_records_fts USING fts5(
id UNINDEXED,
user_id UNINDEXED,
content
)",
[],
)?;
conn.execute(
"CREATE TRIGGER IF NOT EXISTS memory_records_ai AFTER INSERT ON memory_records BEGIN
INSERT INTO memory_records_fts(id, user_id, content) VALUES (new.id, new.user_id, new.content);
END",
[],
)?;
conn.execute(
"CREATE TRIGGER IF NOT EXISTS memory_records_ad AFTER DELETE ON memory_records BEGIN
DELETE FROM memory_records_fts WHERE id = old.id;
END",
[],
)?;
conn.execute(
"CREATE TRIGGER IF NOT EXISTS memory_records_au AFTER UPDATE OF content, user_id ON memory_records BEGIN
DELETE FROM memory_records_fts WHERE id = old.id;
INSERT INTO memory_records_fts(id, user_id, content) VALUES (new.id, new.user_id, new.content);
END",
[],
)?;
conn.execute(
"CREATE TABLE IF NOT EXISTS memory_nodes (
id TEXT PRIMARY KEY,
uri TEXT NOT NULL UNIQUE,
parent_uri TEXT,
node_type TEXT NOT NULL,
created_at TEXT NOT NULL,
updated_at TEXT NOT NULL,
metadata TEXT
)",
[],
)?;
conn.execute(
"CREATE INDEX IF NOT EXISTS idx_memory_nodes_uri ON memory_nodes(uri)",
[],
)?;
conn.execute(
"CREATE INDEX IF NOT EXISTS idx_memory_nodes_parent ON memory_nodes(parent_uri)",
[],
)?;
conn.execute(
"CREATE TABLE IF NOT EXISTS memory_layers (
id TEXT PRIMARY KEY,
node_id TEXT NOT NULL,
layer_type TEXT NOT NULL,
content TEXT NOT NULL,
token_count INTEGER NOT NULL,
embedding_id TEXT,
created_at TEXT NOT NULL,
source_chunk_id TEXT,
FOREIGN KEY (node_id) REFERENCES memory_nodes(id)
)",
[],
)?;
conn.execute(
"CREATE INDEX IF NOT EXISTS idx_memory_layers_node ON memory_layers(node_id)",
[],
)?;
conn.execute(
"CREATE INDEX IF NOT EXISTS idx_memory_layers_type ON memory_layers(layer_type)",
[],
)?;
conn.execute(
"CREATE TABLE IF NOT EXISTS memory_retrieval_state (
node_id TEXT PRIMARY KEY,
active_layer TEXT NOT NULL DEFAULT 'L0',
last_accessed TEXT,
access_count INTEGER DEFAULT 0,
FOREIGN KEY (node_id) REFERENCES memory_nodes(id)
)",
[],
)?;
Ok(())
}
pub async fn validate_vector_tables(&self) -> MemoryResult<()> {
let conn = self.conn.lock().await;
let probe_embedding = format!("[{}]", vec!["0.0"; DEFAULT_EMBEDDING_DIMENSION].join(","));
for table in [
"session_memory_vectors",
"project_memory_vectors",
"global_memory_vectors",
] {
let sql = format!("SELECT COUNT(*) FROM {}", table);
let row_count: i64 = conn.query_row(&sql, [], |row| row.get(0))?;
if row_count > 0 {
let probe_sql = format!(
"SELECT chunk_id, distance
FROM {}
WHERE embedding MATCH ?1 AND k = 1",
table
);
let mut stmt = conn.prepare(&probe_sql)?;
let mut rows = stmt.query(params![probe_embedding.as_str()])?;
let _ = rows.next()?;
}
}
Ok(())
}
fn is_vector_table_error(err: &rusqlite::Error) -> bool {
let text = err.to_string().to_lowercase();
text.contains("vector blob")
|| text.contains("chunks iter error")
|| text.contains("chunks iter")
|| text.contains("internal sqlite-vec error")
|| text.contains("insert rowids id")
|| text.contains("sql logic error")
|| text.contains("database disk image is malformed")
|| text.contains("session_memory_vectors")
|| text.contains("project_memory_vectors")
|| text.contains("global_memory_vectors")
|| text.contains("vec0")
}
async fn recreate_vector_tables(&self) -> MemoryResult<()> {
let conn = self.conn.lock().await;
for base in [
"session_memory_vectors",
"project_memory_vectors",
"global_memory_vectors",
] {
for name in [
base.to_string(),
format!("{}_chunks", base),
format!("{}_info", base),
format!("{}_rowids", base),
format!("{}_vector_chunks00", base),
] {
let sql = format!("DROP TABLE IF EXISTS \"{}\"", name.replace('"', "\"\""));
conn.execute(&sql, [])?;
}
let like_pattern = format!("{base}_%");
let mut stmt = conn.prepare(
"SELECT name FROM sqlite_master WHERE type='table' AND name LIKE ?1 ORDER BY name",
)?;
let table_names = stmt
.query_map(params![like_pattern], |row| row.get::<_, String>(0))?
.collect::<Result<Vec<_>, _>>()?;
drop(stmt);
for name in table_names {
let sql = format!("DROP TABLE IF EXISTS \"{}\"", name.replace('"', "\"\""));
conn.execute(&sql, [])?;
}
}
conn.execute(
&format!(
"CREATE VIRTUAL TABLE IF NOT EXISTS session_memory_vectors USING vec0(
chunk_id TEXT PRIMARY KEY,
embedding float[{}]
)",
DEFAULT_EMBEDDING_DIMENSION
),
[],
)?;
conn.execute(
&format!(
"CREATE VIRTUAL TABLE IF NOT EXISTS project_memory_vectors USING vec0(
chunk_id TEXT PRIMARY KEY,
embedding float[{}]
)",
DEFAULT_EMBEDDING_DIMENSION
),
[],
)?;
conn.execute(
&format!(
"CREATE VIRTUAL TABLE IF NOT EXISTS global_memory_vectors USING vec0(
chunk_id TEXT PRIMARY KEY,
embedding float[{}]
)",
DEFAULT_EMBEDDING_DIMENSION
),
[],
)?;
Ok(())
}
pub async fn ensure_vector_tables_healthy(&self) -> MemoryResult<bool> {
match self.validate_vector_tables().await {
Ok(()) => Ok(false),
Err(crate::types::MemoryError::Database(err)) if Self::is_vector_table_error(&err) => {
tracing::warn!(
"Memory vector tables appear corrupted ({}). Recreating vector tables.",
err
);
self.recreate_vector_tables().await?;
Ok(true)
}
Err(err) => Err(err),
}
}
pub async fn reset_all_memory_tables(&self) -> MemoryResult<()> {
let table_names = {
let conn = self.conn.lock().await;
let mut stmt = conn.prepare(
"SELECT name FROM sqlite_master
WHERE type='table'
AND name NOT LIKE 'sqlite_%'
ORDER BY name",
)?;
let names = stmt
.query_map([], |row| row.get::<_, String>(0))?
.collect::<Result<Vec<_>, _>>()?;
names
};
{
let conn = self.conn.lock().await;
for table in table_names {
let sql = format!("DROP TABLE IF EXISTS \"{}\"", table.replace('"', "\"\""));
let _ = conn.execute(&sql, []);
}
}
self.init_schema().await
}
pub async fn try_repair_after_error(
&self,
err: &crate::types::MemoryError,
) -> MemoryResult<bool> {
match err {
crate::types::MemoryError::Database(db_err) if Self::is_vector_table_error(db_err) => {
tracing::warn!(
"Memory write/read hit vector DB error ({}). Recreating vector tables immediately.",
db_err
);
self.recreate_vector_tables().await?;
Ok(true)
}
_ => Ok(false),
}
}
pub async fn store_chunk(&self, chunk: &MemoryChunk, embedding: &[f32]) -> MemoryResult<()> {
let conn = self.conn.lock().await;
let (chunks_table, vectors_table) = match chunk.tier {
MemoryTier::Session => ("session_memory_chunks", "session_memory_vectors"),
MemoryTier::Project => ("project_memory_chunks", "project_memory_vectors"),
MemoryTier::Global => ("global_memory_chunks", "global_memory_vectors"),
};
let created_at_str = chunk.created_at.to_rfc3339();
let metadata_str = chunk
.metadata
.as_ref()
.map(|m| m.to_string())
.unwrap_or_default();
match chunk.tier {
MemoryTier::Session => {
conn.execute(
&format!(
"INSERT INTO {} (
id, content, session_id, project_id, source, created_at, token_count, metadata,
source_path, source_mtime, source_size, source_hash,
tenant_org_id, tenant_workspace_id, tenant_deployment_id
) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12, ?13, ?14, ?15)",
chunks_table
),
params![
chunk.id,
chunk.content,
chunk.session_id.as_ref().unwrap_or(&String::new()),
chunk.project_id,
chunk.source,
created_at_str,
chunk.token_count,
metadata_str,
chunk.source_path.clone(),
chunk.source_mtime,
chunk.source_size,
chunk.source_hash.clone(),
chunk.tenant_scope.org_id.as_str(),
chunk.tenant_scope.workspace_id.as_str(),
chunk.tenant_scope.deployment_id.as_deref()
],
)?;
}
MemoryTier::Project => {
conn.execute(
&format!(
"INSERT INTO {} (
id, content, project_id, session_id, source, created_at, token_count, metadata,
source_path, source_mtime, source_size, source_hash,
tenant_org_id, tenant_workspace_id, tenant_deployment_id
) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12, ?13, ?14, ?15)",
chunks_table
),
params![
chunk.id,
chunk.content,
chunk.project_id.as_ref().unwrap_or(&String::new()),
chunk.session_id,
chunk.source,
created_at_str,
chunk.token_count,
metadata_str,
chunk.source_path.clone(),
chunk.source_mtime,
chunk.source_size,
chunk.source_hash.clone(),
chunk.tenant_scope.org_id.as_str(),
chunk.tenant_scope.workspace_id.as_str(),
chunk.tenant_scope.deployment_id.as_deref()
],
)?;
}
MemoryTier::Global => {
conn.execute(
&format!(
"INSERT INTO {} (
id, content, source, created_at, token_count, metadata,
source_path, source_mtime, source_size, source_hash,
tenant_org_id, tenant_workspace_id, tenant_deployment_id
) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12, ?13)",
chunks_table
),
params![
chunk.id,
chunk.content,
chunk.source,
created_at_str,
chunk.token_count,
metadata_str,
chunk.source_path.clone(),
chunk.source_mtime,
chunk.source_size,
chunk.source_hash.clone(),
chunk.tenant_scope.org_id.as_str(),
chunk.tenant_scope.workspace_id.as_str(),
chunk.tenant_scope.deployment_id.as_deref()
],
)?;
}
}
let embedding_json = format!(
"[{}]",
embedding
.iter()
.map(|f| f.to_string())
.collect::<Vec<_>>()
.join(",")
);
conn.execute(
&format!(
"INSERT INTO {} (chunk_id, embedding) VALUES (?1, ?2)",
vectors_table
),
params![chunk.id, embedding_json],
)?;
Ok(())
}
pub async fn search_similar(
&self,
query_embedding: &[f32],
tier: MemoryTier,
project_id: Option<&str>,
session_id: Option<&str>,
limit: i64,
) -> MemoryResult<Vec<(MemoryChunk, f64)>> {
self.search_similar_for_tenant(
query_embedding,
tier,
project_id,
session_id,
&MemoryTenantScope::local(),
limit,
)
.await
}
pub async fn search_similar_for_tenant(
&self,
query_embedding: &[f32],
tier: MemoryTier,
project_id: Option<&str>,
session_id: Option<&str>,
tenant_scope: &MemoryTenantScope,
limit: i64,
) -> MemoryResult<Vec<(MemoryChunk, f64)>> {
let conn = self.conn.lock().await;
let (chunks_table, vectors_table) = match tier {
MemoryTier::Session => ("session_memory_chunks", "session_memory_vectors"),
MemoryTier::Project => ("project_memory_chunks", "project_memory_vectors"),
MemoryTier::Global => ("global_memory_chunks", "global_memory_vectors"),
};
let embedding_json = format!(
"[{}]",
query_embedding
.iter()
.map(|f| f.to_string())
.collect::<Vec<_>>()
.join(",")
);
let results = match tier {
MemoryTier::Session => {
if let Some(sid) = session_id {
let tenant_clause = tenant_scope_matches_sql_clause("c", 2);
let sql = format!(
"SELECT c.id, c.content, c.session_id, c.project_id, c.source, c.created_at, c.token_count, c.metadata,
c.source_path, c.source_mtime, c.source_size, c.source_hash,
c.tenant_org_id, c.tenant_workspace_id, c.tenant_deployment_id,
vec_distance_cosine(v.embedding, ?5) AS distance
FROM {} AS v
JOIN {} AS c ON v.chunk_id = c.id
WHERE c.session_id = ?1 AND {}
ORDER BY distance
LIMIT ?6",
vectors_table, chunks_table, tenant_clause
);
let mut stmt = conn.prepare(&sql)?;
let results = stmt
.query_map(
params![
sid,
tenant_scope.org_id.as_str(),
tenant_scope.workspace_id.as_str(),
tenant_scope.deployment_id.as_deref(),
embedding_json,
limit
],
|row| Ok((row_to_chunk(row, tier)?, row.get::<_, f64>("distance")?)),
)?
.collect::<Result<Vec<_>, _>>()?;
results
} else if let Some(pid) = project_id {
let tenant_clause = tenant_scope_matches_sql_clause("c", 2);
let sql = format!(
"SELECT c.id, c.content, c.session_id, c.project_id, c.source, c.created_at, c.token_count, c.metadata,
c.source_path, c.source_mtime, c.source_size, c.source_hash,
c.tenant_org_id, c.tenant_workspace_id, c.tenant_deployment_id,
vec_distance_cosine(v.embedding, ?5) AS distance
FROM {} AS v
JOIN {} AS c ON v.chunk_id = c.id
WHERE c.project_id = ?1 AND {}
ORDER BY distance
LIMIT ?6",
vectors_table, chunks_table, tenant_clause
);
let mut stmt = conn.prepare(&sql)?;
let results = stmt
.query_map(
params![
pid,
tenant_scope.org_id.as_str(),
tenant_scope.workspace_id.as_str(),
tenant_scope.deployment_id.as_deref(),
embedding_json,
limit
],
|row| Ok((row_to_chunk(row, tier)?, row.get::<_, f64>("distance")?)),
)?
.collect::<Result<Vec<_>, _>>()?;
results
} else {
let tenant_clause = tenant_scope_matches_sql_clause("c", 1);
let sql = format!(
"SELECT c.id, c.content, c.session_id, c.project_id, c.source, c.created_at, c.token_count, c.metadata,
c.source_path, c.source_mtime, c.source_size, c.source_hash,
c.tenant_org_id, c.tenant_workspace_id, c.tenant_deployment_id,
vec_distance_cosine(v.embedding, ?4) AS distance
FROM {} AS v
JOIN {} AS c ON v.chunk_id = c.id
WHERE {}
ORDER BY distance
LIMIT ?5",
vectors_table, chunks_table, tenant_clause
);
let mut stmt = conn.prepare(&sql)?;
let results = stmt
.query_map(
params![
tenant_scope.org_id.as_str(),
tenant_scope.workspace_id.as_str(),
tenant_scope.deployment_id.as_deref(),
embedding_json,
limit
],
|row| Ok((row_to_chunk(row, tier)?, row.get::<_, f64>("distance")?)),
)?
.collect::<Result<Vec<_>, _>>()?;
results
}
}
MemoryTier::Project => {
if let Some(pid) = project_id {
let tenant_clause = tenant_scope_matches_sql_clause("c", 2);
let sql = format!(
"SELECT c.id, c.content, c.session_id, c.project_id, c.source, c.created_at, c.token_count, c.metadata,
c.source_path, c.source_mtime, c.source_size, c.source_hash,
c.tenant_org_id, c.tenant_workspace_id, c.tenant_deployment_id,
vec_distance_cosine(v.embedding, ?5) AS distance
FROM {} AS v
JOIN {} AS c ON v.chunk_id = c.id
WHERE c.project_id = ?1 AND {}
ORDER BY distance
LIMIT ?6",
vectors_table, chunks_table, tenant_clause
);
let mut stmt = conn.prepare(&sql)?;
let results = stmt
.query_map(
params![
pid,
tenant_scope.org_id.as_str(),
tenant_scope.workspace_id.as_str(),
tenant_scope.deployment_id.as_deref(),
embedding_json,
limit
],
|row| Ok((row_to_chunk(row, tier)?, row.get::<_, f64>("distance")?)),
)?
.collect::<Result<Vec<_>, _>>()?;
results
} else {
let tenant_clause = tenant_scope_matches_sql_clause("c", 1);
let sql = format!(
"SELECT c.id, c.content, c.session_id, c.project_id, c.source, c.created_at, c.token_count, c.metadata,
c.source_path, c.source_mtime, c.source_size, c.source_hash,
c.tenant_org_id, c.tenant_workspace_id, c.tenant_deployment_id,
vec_distance_cosine(v.embedding, ?4) AS distance
FROM {} AS v
JOIN {} AS c ON v.chunk_id = c.id
WHERE {}
ORDER BY distance
LIMIT ?5",
vectors_table, chunks_table, tenant_clause
);
let mut stmt = conn.prepare(&sql)?;
let results = stmt
.query_map(
params![
tenant_scope.org_id.as_str(),
tenant_scope.workspace_id.as_str(),
tenant_scope.deployment_id.as_deref(),
embedding_json,
limit
],
|row| Ok((row_to_chunk(row, tier)?, row.get::<_, f64>("distance")?)),
)?
.collect::<Result<Vec<_>, _>>()?;
results
}
}
MemoryTier::Global => {
let tenant_clause = tenant_scope_matches_sql_clause("c", 1);
let sql = format!(
"SELECT c.id, c.content, c.source, c.created_at, c.token_count, c.metadata,
c.source_path, c.source_mtime, c.source_size, c.source_hash,
c.tenant_org_id, c.tenant_workspace_id, c.tenant_deployment_id,
vec_distance_cosine(v.embedding, ?4) AS distance
FROM {} AS v
JOIN {} AS c ON v.chunk_id = c.id
WHERE {}
ORDER BY distance
LIMIT ?5",
vectors_table, chunks_table, tenant_clause
);
let mut stmt = conn.prepare(&sql)?;
let results = stmt
.query_map(
params![
tenant_scope.org_id.as_str(),
tenant_scope.workspace_id.as_str(),
tenant_scope.deployment_id.as_deref(),
embedding_json,
limit
],
|row| Ok((row_to_chunk(row, tier)?, row.get::<_, f64>("distance")?)),
)?
.collect::<Result<Vec<_>, _>>()?;
results
}
};
Ok(results)
}
pub async fn get_session_chunks(&self, session_id: &str) -> MemoryResult<Vec<MemoryChunk>> {
let conn = self.conn.lock().await;
let mut stmt = conn.prepare(
"SELECT id, content, session_id, project_id, source, created_at, token_count, metadata,
source_path, source_mtime, source_size, source_hash,
tenant_org_id, tenant_workspace_id, tenant_deployment_id
FROM session_memory_chunks
WHERE session_id = ?1
ORDER BY created_at DESC",
)?;
let chunks = stmt
.query_map(params![session_id], |row| {
row_to_chunk(row, MemoryTier::Session)
})?
.collect::<Result<Vec<_>, _>>()?;
Ok(chunks)
}
pub async fn get_session_chunks_for_tenant(
&self,
session_id: &str,
tenant_scope: &MemoryTenantScope,
) -> MemoryResult<Vec<MemoryChunk>> {
let conn = self.conn.lock().await;
let tenant_clause = tenant_scope_matches_sql_clause("session_memory_chunks", 2);
let sql = format!(
"SELECT id, content, session_id, project_id, source, created_at, token_count, metadata,
source_path, source_mtime, source_size, source_hash,
tenant_org_id, tenant_workspace_id, tenant_deployment_id
FROM session_memory_chunks
WHERE session_id = ?1 AND {}
ORDER BY created_at DESC",
tenant_clause
);
let mut stmt = conn.prepare(&sql)?;
let chunks = stmt
.query_map(
params![
session_id,
tenant_scope.org_id.as_str(),
tenant_scope.workspace_id.as_str(),
tenant_scope.deployment_id.as_deref()
],
|row| row_to_chunk(row, MemoryTier::Session),
)?
.collect::<Result<Vec<_>, _>>()?;
Ok(chunks)
}
pub async fn get_project_chunks(&self, project_id: &str) -> MemoryResult<Vec<MemoryChunk>> {
let conn = self.conn.lock().await;
let mut stmt = conn.prepare(
"SELECT id, content, session_id, project_id, source, created_at, token_count, metadata,
source_path, source_mtime, source_size, source_hash,
tenant_org_id, tenant_workspace_id, tenant_deployment_id
FROM project_memory_chunks
WHERE project_id = ?1
ORDER BY created_at DESC",
)?;
let chunks = stmt
.query_map(params![project_id], |row| {
row_to_chunk(row, MemoryTier::Project)
})?
.collect::<Result<Vec<_>, _>>()?;
Ok(chunks)
}
pub async fn get_project_chunks_for_tenant(
&self,
project_id: &str,
tenant_scope: &MemoryTenantScope,
) -> MemoryResult<Vec<MemoryChunk>> {
let conn = self.conn.lock().await;
let tenant_clause = tenant_scope_matches_sql_clause("project_memory_chunks", 2);
let sql = format!(
"SELECT id, content, session_id, project_id, source, created_at, token_count, metadata,
source_path, source_mtime, source_size, source_hash,
tenant_org_id, tenant_workspace_id, tenant_deployment_id
FROM project_memory_chunks
WHERE project_id = ?1 AND {}
ORDER BY created_at DESC",
tenant_clause
);
let mut stmt = conn.prepare(&sql)?;
let chunks = stmt
.query_map(
params![
project_id,
tenant_scope.org_id.as_str(),
tenant_scope.workspace_id.as_str(),
tenant_scope.deployment_id.as_deref()
],
|row| row_to_chunk(row, MemoryTier::Project),
)?
.collect::<Result<Vec<_>, _>>()?;
Ok(chunks)
}
pub async fn get_global_chunks(&self, limit: i64) -> MemoryResult<Vec<MemoryChunk>> {
let conn = self.conn.lock().await;
let mut stmt = conn.prepare(
"SELECT id, content, source, created_at, token_count, metadata,
source_path, source_mtime, source_size, source_hash,
tenant_org_id, tenant_workspace_id, tenant_deployment_id
FROM global_memory_chunks
ORDER BY created_at DESC
LIMIT ?1",
)?;
let chunks = stmt
.query_map(params![limit], |row| row_to_chunk(row, MemoryTier::Global))?
.collect::<Result<Vec<_>, _>>()?;
Ok(chunks)
}
pub async fn get_global_chunks_for_tenant(
&self,
tenant_scope: &MemoryTenantScope,
limit: i64,
) -> MemoryResult<Vec<MemoryChunk>> {
let conn = self.conn.lock().await;
let tenant_clause = tenant_scope_matches_sql_clause("global_memory_chunks", 1);
let sql = format!(
"SELECT id, content, source, created_at, token_count, metadata,
source_path, source_mtime, source_size, source_hash,
tenant_org_id, tenant_workspace_id, tenant_deployment_id
FROM global_memory_chunks
WHERE {}
ORDER BY created_at DESC
LIMIT ?4",
tenant_clause
);
let mut stmt = conn.prepare(&sql)?;
let chunks = stmt
.query_map(
params![
tenant_scope.org_id.as_str(),
tenant_scope.workspace_id.as_str(),
tenant_scope.deployment_id.as_deref(),
limit
],
|row| row_to_chunk(row, MemoryTier::Global),
)?
.collect::<Result<Vec<_>, _>>()?;
Ok(chunks)
}
pub async fn global_chunk_exists_by_source_hash(
&self,
source_hash: &str,
) -> MemoryResult<bool> {
self.global_chunk_exists_by_source_hash_for_tenant(source_hash, &MemoryTenantScope::local())
.await
}
pub async fn global_chunk_exists_by_source_hash_for_tenant(
&self,
source_hash: &str,
tenant_scope: &MemoryTenantScope,
) -> MemoryResult<bool> {
let conn = self.conn.lock().await;
let tenant_clause = tenant_scope_matches_sql_clause("global_memory_chunks", 2);
let sql = format!(
"SELECT 1 FROM global_memory_chunks
WHERE source_hash = ?1 AND {}
LIMIT 1",
tenant_clause
);
let exists = conn
.query_row(
&sql,
params![
source_hash,
tenant_scope.org_id.as_str(),
tenant_scope.workspace_id.as_str(),
tenant_scope.deployment_id.as_deref()
],
|_row| Ok(()),
)
.optional()?
.is_some();
Ok(exists)
}
pub async fn clear_session_memory(&self, session_id: &str) -> MemoryResult<u64> {
self.clear_session_memory_for_tenant(session_id, &MemoryTenantScope::local())
.await
}
pub async fn clear_session_memory_for_tenant(
&self,
session_id: &str,
tenant_scope: &MemoryTenantScope,
) -> MemoryResult<u64> {
let conn = self.conn.lock().await;
let count: i64 = conn.query_row(
"SELECT COUNT(*) FROM session_memory_chunks
WHERE session_id = ?1
AND tenant_org_id = ?2
AND tenant_workspace_id = ?3
AND IFNULL(tenant_deployment_id, '') = IFNULL(?4, '')",
params![
session_id,
tenant_scope.org_id.as_str(),
tenant_scope.workspace_id.as_str(),
tenant_scope.deployment_id.as_deref()
],
|row| row.get(0),
)?;
conn.execute(
"DELETE FROM session_memory_vectors WHERE chunk_id IN
(SELECT id FROM session_memory_chunks
WHERE session_id = ?1
AND tenant_org_id = ?2
AND tenant_workspace_id = ?3
AND IFNULL(tenant_deployment_id, '') = IFNULL(?4, ''))",
params![
session_id,
tenant_scope.org_id.as_str(),
tenant_scope.workspace_id.as_str(),
tenant_scope.deployment_id.as_deref()
],
)?;
conn.execute(
"DELETE FROM session_memory_chunks
WHERE session_id = ?1
AND tenant_org_id = ?2
AND tenant_workspace_id = ?3
AND IFNULL(tenant_deployment_id, '') = IFNULL(?4, '')",
params![
session_id,
tenant_scope.org_id.as_str(),
tenant_scope.workspace_id.as_str(),
tenant_scope.deployment_id.as_deref()
],
)?;
Ok(count as u64)
}
pub async fn clear_project_memory(&self, project_id: &str) -> MemoryResult<u64> {
self.clear_project_memory_for_tenant(project_id, &MemoryTenantScope::local())
.await
}
pub async fn clear_project_memory_for_tenant(
&self,
project_id: &str,
tenant_scope: &MemoryTenantScope,
) -> MemoryResult<u64> {
let conn = self.conn.lock().await;
let count: i64 = conn.query_row(
"SELECT COUNT(*) FROM project_memory_chunks
WHERE project_id = ?1
AND tenant_org_id = ?2
AND tenant_workspace_id = ?3
AND IFNULL(tenant_deployment_id, '') = IFNULL(?4, '')",
params![
project_id,
tenant_scope.org_id.as_str(),
tenant_scope.workspace_id.as_str(),
tenant_scope.deployment_id.as_deref()
],
|row| row.get(0),
)?;
conn.execute(
"DELETE FROM project_memory_vectors WHERE chunk_id IN
(SELECT id FROM project_memory_chunks
WHERE project_id = ?1
AND tenant_org_id = ?2
AND tenant_workspace_id = ?3
AND IFNULL(tenant_deployment_id, '') = IFNULL(?4, ''))",
params![
project_id,
tenant_scope.org_id.as_str(),
tenant_scope.workspace_id.as_str(),
tenant_scope.deployment_id.as_deref()
],
)?;
conn.execute(
"DELETE FROM project_memory_chunks
WHERE project_id = ?1
AND tenant_org_id = ?2
AND tenant_workspace_id = ?3
AND IFNULL(tenant_deployment_id, '') = IFNULL(?4, '')",
params![
project_id,
tenant_scope.org_id.as_str(),
tenant_scope.workspace_id.as_str(),
tenant_scope.deployment_id.as_deref()
],
)?;
Ok(count as u64)
}
pub async fn clear_global_memory_by_source_prefix(
&self,
source_prefix: &str,
) -> MemoryResult<u64> {
let conn = self.conn.lock().await;
let like = format!("{}%", source_prefix);
let count: i64 = conn.query_row(
"SELECT COUNT(*) FROM global_memory_chunks WHERE source LIKE ?1",
params![like],
|row| row.get(0),
)?;
conn.execute(
"DELETE FROM global_memory_vectors WHERE chunk_id IN
(SELECT id FROM global_memory_chunks WHERE source LIKE ?1)",
params![like],
)?;
conn.execute(
"DELETE FROM global_memory_chunks WHERE source LIKE ?1",
params![like],
)?;
Ok(count as u64)
}
pub async fn delete_chunk(
&self,
tier: MemoryTier,
chunk_id: &str,
project_id: Option<&str>,
session_id: Option<&str>,
) -> MemoryResult<u64> {
self.delete_chunk_for_tenant(
tier,
chunk_id,
project_id,
session_id,
&MemoryTenantScope::local(),
)
.await
}
pub async fn delete_chunk_for_tenant(
&self,
tier: MemoryTier,
chunk_id: &str,
project_id: Option<&str>,
session_id: Option<&str>,
tenant_scope: &MemoryTenantScope,
) -> MemoryResult<u64> {
let conn = self.conn.lock().await;
let deleted = match tier {
MemoryTier::Session => {
let Some(session_id) = session_id else {
return Err(MemoryError::InvalidConfig(
"session_id is required to delete session memory chunks".to_string(),
));
};
conn.execute(
"DELETE FROM session_memory_vectors WHERE chunk_id IN
(SELECT id FROM session_memory_chunks
WHERE id = ?1 AND session_id = ?2
AND tenant_org_id = ?3
AND tenant_workspace_id = ?4
AND IFNULL(tenant_deployment_id, '') = IFNULL(?5, ''))",
params![
chunk_id,
session_id,
tenant_scope.org_id.as_str(),
tenant_scope.workspace_id.as_str(),
tenant_scope.deployment_id.as_deref()
],
)?;
conn.execute(
"DELETE FROM session_memory_chunks
WHERE id = ?1 AND session_id = ?2
AND tenant_org_id = ?3
AND tenant_workspace_id = ?4
AND IFNULL(tenant_deployment_id, '') = IFNULL(?5, '')",
params![
chunk_id,
session_id,
tenant_scope.org_id.as_str(),
tenant_scope.workspace_id.as_str(),
tenant_scope.deployment_id.as_deref()
],
)?
}
MemoryTier::Project => {
let Some(project_id) = project_id else {
return Err(MemoryError::InvalidConfig(
"project_id is required to delete project memory chunks".to_string(),
));
};
conn.execute(
"DELETE FROM project_memory_vectors WHERE chunk_id IN
(SELECT id FROM project_memory_chunks
WHERE id = ?1 AND project_id = ?2
AND tenant_org_id = ?3
AND tenant_workspace_id = ?4
AND IFNULL(tenant_deployment_id, '') = IFNULL(?5, ''))",
params![
chunk_id,
project_id,
tenant_scope.org_id.as_str(),
tenant_scope.workspace_id.as_str(),
tenant_scope.deployment_id.as_deref()
],
)?;
conn.execute(
"DELETE FROM project_memory_chunks
WHERE id = ?1 AND project_id = ?2
AND tenant_org_id = ?3
AND tenant_workspace_id = ?4
AND IFNULL(tenant_deployment_id, '') = IFNULL(?5, '')",
params![
chunk_id,
project_id,
tenant_scope.org_id.as_str(),
tenant_scope.workspace_id.as_str(),
tenant_scope.deployment_id.as_deref()
],
)?
}
MemoryTier::Global => {
conn.execute(
"DELETE FROM global_memory_vectors WHERE chunk_id IN
(SELECT id FROM global_memory_chunks
WHERE id = ?1
AND tenant_org_id = ?2
AND tenant_workspace_id = ?3
AND IFNULL(tenant_deployment_id, '') = IFNULL(?4, ''))",
params![
chunk_id,
tenant_scope.org_id.as_str(),
tenant_scope.workspace_id.as_str(),
tenant_scope.deployment_id.as_deref()
],
)?;
conn.execute(
"DELETE FROM global_memory_chunks
WHERE id = ?1
AND tenant_org_id = ?2
AND tenant_workspace_id = ?3
AND IFNULL(tenant_deployment_id, '') = IFNULL(?4, '')",
params![
chunk_id,
tenant_scope.org_id.as_str(),
tenant_scope.workspace_id.as_str(),
tenant_scope.deployment_id.as_deref()
],
)?
}
};
Ok(deleted as u64)
}
pub async fn cleanup_old_sessions(&self, retention_days: i64) -> MemoryResult<u64> {
self.cleanup_old_sessions_for_tenant(retention_days, &MemoryTenantScope::local())
.await
}
pub async fn cleanup_old_sessions_for_tenant(
&self,
retention_days: i64,
tenant_scope: &MemoryTenantScope,
) -> MemoryResult<u64> {
let conn = self.conn.lock().await;
let cutoff = Utc::now() - chrono::Duration::days(retention_days);
let cutoff_str = cutoff.to_rfc3339();
let count: i64 = conn.query_row(
"SELECT COUNT(*) FROM session_memory_chunks
WHERE created_at < ?1
AND tenant_org_id = ?2
AND tenant_workspace_id = ?3
AND IFNULL(tenant_deployment_id, '') = IFNULL(?4, '')",
params![
cutoff_str,
tenant_scope.org_id.as_str(),
tenant_scope.workspace_id.as_str(),
tenant_scope.deployment_id.as_deref()
],
|row| row.get(0),
)?;
conn.execute(
"DELETE FROM session_memory_vectors WHERE chunk_id IN
(SELECT id FROM session_memory_chunks
WHERE created_at < ?1
AND tenant_org_id = ?2
AND tenant_workspace_id = ?3
AND IFNULL(tenant_deployment_id, '') = IFNULL(?4, ''))",
params![
cutoff_str,
tenant_scope.org_id.as_str(),
tenant_scope.workspace_id.as_str(),
tenant_scope.deployment_id.as_deref()
],
)?;
conn.execute(
"DELETE FROM session_memory_chunks
WHERE created_at < ?1
AND tenant_org_id = ?2
AND tenant_workspace_id = ?3
AND IFNULL(tenant_deployment_id, '') = IFNULL(?4, '')",
params![
cutoff_str,
tenant_scope.org_id.as_str(),
tenant_scope.workspace_id.as_str(),
tenant_scope.deployment_id.as_deref()
],
)?;
Ok(count as u64)
}
pub async fn get_or_create_config(&self, project_id: &str) -> MemoryResult<MemoryConfig> {
self.get_or_create_config_for_tenant(project_id, &MemoryTenantScope::local())
.await
}
pub async fn get_or_create_config_for_tenant(
&self,
project_id: &str,
tenant_scope: &MemoryTenantScope,
) -> MemoryResult<MemoryConfig> {
let conn = self.conn.lock().await;
let result: Option<MemoryConfig> = conn
.query_row(
"SELECT max_chunks, chunk_size, retrieval_k, auto_cleanup,
session_retention_days, token_budget, chunk_overlap
FROM memory_config
WHERE project_id = ?1
AND tenant_org_id = ?2
AND tenant_workspace_id = ?3
AND IFNULL(tenant_deployment_id, '') = IFNULL(?4, '')",
params![
project_id,
tenant_scope.org_id.as_str(),
tenant_scope.workspace_id.as_str(),
tenant_scope.deployment_id.as_deref()
],
|row| {
Ok(MemoryConfig {
max_chunks: row.get(0)?,
chunk_size: row.get(1)?,
retrieval_k: row.get(2)?,
auto_cleanup: row.get::<_, i64>(3)? != 0,
session_retention_days: row.get(4)?,
token_budget: row.get(5)?,
chunk_overlap: row.get(6)?,
})
},
)
.optional()?;
match result {
Some(config) => Ok(config),
None => {
let config = MemoryConfig::default();
let updated_at = Utc::now().to_rfc3339();
conn.execute(
"INSERT INTO memory_config
(tenant_org_id, tenant_workspace_id, tenant_deployment_id,
project_id, max_chunks, chunk_size, retrieval_k, auto_cleanup,
session_retention_days, token_budget, chunk_overlap, updated_at)
VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12)",
params![
tenant_scope.org_id.as_str(),
tenant_scope.workspace_id.as_str(),
tenant_scope.deployment_id.as_deref().unwrap_or(""),
project_id,
config.max_chunks,
config.chunk_size,
config.retrieval_k,
config.auto_cleanup as i64,
config.session_retention_days,
config.token_budget,
config.chunk_overlap,
updated_at
],
)?;
Ok(config)
}
}
}
pub async fn update_config(&self, project_id: &str, config: &MemoryConfig) -> MemoryResult<()> {
self.update_config_for_tenant(project_id, config, &MemoryTenantScope::local())
.await
}
pub async fn update_config_for_tenant(
&self,
project_id: &str,
config: &MemoryConfig,
tenant_scope: &MemoryTenantScope,
) -> MemoryResult<()> {
let conn = self.conn.lock().await;
let updated_at = Utc::now().to_rfc3339();
conn.execute(
"INSERT INTO memory_config
(tenant_org_id, tenant_workspace_id, tenant_deployment_id,
project_id, max_chunks, chunk_size, retrieval_k, auto_cleanup,
session_retention_days, token_budget, chunk_overlap, updated_at)
VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12)
ON CONFLICT(tenant_org_id, tenant_workspace_id, tenant_deployment_id, project_id)
DO UPDATE SET
max_chunks = excluded.max_chunks,
chunk_size = excluded.chunk_size,
retrieval_k = excluded.retrieval_k,
auto_cleanup = excluded.auto_cleanup,
session_retention_days = excluded.session_retention_days,
token_budget = excluded.token_budget,
chunk_overlap = excluded.chunk_overlap,
updated_at = excluded.updated_at",
params![
tenant_scope.org_id.as_str(),
tenant_scope.workspace_id.as_str(),
tenant_scope.deployment_id.as_deref().unwrap_or(""),
project_id,
config.max_chunks,
config.chunk_size,
config.retrieval_k,
config.auto_cleanup as i64,
config.session_retention_days,
config.token_budget,
config.chunk_overlap,
updated_at
],
)?;
Ok(())
}
pub async fn upsert_knowledge_space(&self, space: &KnowledgeSpaceRecord) -> MemoryResult<()> {
self.upsert_knowledge_space_for_tenant(space, &MemoryTenantScope::local())
.await
}
pub async fn upsert_knowledge_space_for_tenant(
&self,
space: &KnowledgeSpaceRecord,
tenant_scope: &MemoryTenantScope,
) -> MemoryResult<()> {
let conn = self.conn.lock().await;
conn.execute(
"INSERT INTO knowledge_spaces
(id, tenant_org_id, tenant_workspace_id, tenant_deployment_id, scope, project_id, namespace, title, description, trust_level, metadata, created_at_ms, updated_at_ms)
VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12, ?13)
ON CONFLICT(id) DO UPDATE SET
tenant_org_id = excluded.tenant_org_id,
tenant_workspace_id = excluded.tenant_workspace_id,
tenant_deployment_id = excluded.tenant_deployment_id,
scope = excluded.scope,
project_id = excluded.project_id,
namespace = excluded.namespace,
title = excluded.title,
description = excluded.description,
trust_level = excluded.trust_level,
metadata = excluded.metadata,
created_at_ms = excluded.created_at_ms,
updated_at_ms = excluded.updated_at_ms",
params![
space.id,
tenant_scope.org_id.as_str(),
tenant_scope.workspace_id.as_str(),
tenant_scope.deployment_id.as_deref().unwrap_or(""),
space.scope.to_string(),
space.project_id,
space.namespace,
space.title,
space.description,
space.trust_level.to_string(),
space.metadata.as_ref().map(|value| value.to_string()),
space.created_at_ms as i64,
space.updated_at_ms as i64,
],
)?;
Ok(())
}
pub async fn get_knowledge_space(
&self,
id: &str,
) -> MemoryResult<Option<KnowledgeSpaceRecord>> {
self.get_knowledge_space_for_tenant(id, &MemoryTenantScope::local())
.await
}
pub async fn get_knowledge_space_for_tenant(
&self,
id: &str,
tenant_scope: &MemoryTenantScope,
) -> MemoryResult<Option<KnowledgeSpaceRecord>> {
let conn = self.conn.lock().await;
Ok(
conn.query_row(
"SELECT id, scope, project_id, namespace, title, description, trust_level, metadata, created_at_ms, updated_at_ms
FROM knowledge_spaces
WHERE id = ?1
AND tenant_org_id = ?2
AND tenant_workspace_id = ?3
AND IFNULL(tenant_deployment_id, '') = IFNULL(?4, '')",
params![
id,
tenant_scope.org_id.as_str(),
tenant_scope.workspace_id.as_str(),
tenant_scope.deployment_id.as_deref()
],
row_to_knowledge_space,
)
.optional()?,
)
}
pub async fn list_knowledge_spaces(
&self,
project_id: Option<&str>,
) -> MemoryResult<Vec<KnowledgeSpaceRecord>> {
self.list_knowledge_spaces_for_tenant(project_id, &MemoryTenantScope::local())
.await
}
pub async fn list_knowledge_spaces_for_tenant(
&self,
project_id: Option<&str>,
tenant_scope: &MemoryTenantScope,
) -> MemoryResult<Vec<KnowledgeSpaceRecord>> {
let conn = self.conn.lock().await;
let mut stmt = if project_id.is_some() {
conn.prepare(
"SELECT id, scope, project_id, namespace, title, description, trust_level, metadata, created_at_ms, updated_at_ms
FROM knowledge_spaces
WHERE project_id = ?1
AND tenant_org_id = ?2
AND tenant_workspace_id = ?3
AND IFNULL(tenant_deployment_id, '') = IFNULL(?4, '')
ORDER BY updated_at_ms DESC",
)?
} else {
conn.prepare(
"SELECT id, scope, project_id, namespace, title, description, trust_level, metadata, created_at_ms, updated_at_ms
FROM knowledge_spaces
WHERE tenant_org_id = ?1
AND tenant_workspace_id = ?2
AND IFNULL(tenant_deployment_id, '') = IFNULL(?3, '')
ORDER BY updated_at_ms DESC",
)?
};
let rows = if let Some(project_id) = project_id {
stmt.query_map(
params![
project_id,
tenant_scope.org_id.as_str(),
tenant_scope.workspace_id.as_str(),
tenant_scope.deployment_id.as_deref()
],
row_to_knowledge_space,
)?
} else {
stmt.query_map(
params![
tenant_scope.org_id.as_str(),
tenant_scope.workspace_id.as_str(),
tenant_scope.deployment_id.as_deref()
],
row_to_knowledge_space,
)?
};
rows.collect::<Result<Vec<_>, _>>().map_err(Into::into)
}
pub async fn upsert_knowledge_item(&self, item: &KnowledgeItemRecord) -> MemoryResult<()> {
self.upsert_knowledge_item_for_tenant(item, &MemoryTenantScope::local())
.await
}
pub async fn upsert_knowledge_item_for_tenant(
&self,
item: &KnowledgeItemRecord,
tenant_scope: &MemoryTenantScope,
) -> MemoryResult<()> {
if self
.get_knowledge_space_for_tenant(&item.space_id, tenant_scope)
.await?
.is_none()
{
return Err(crate::types::MemoryError::InvalidConfig(
"knowledge item space not found for tenant".to_string(),
));
}
let conn = self.conn.lock().await;
conn.execute(
"INSERT OR REPLACE INTO knowledge_items
(id, space_id, coverage_key, dedupe_key, item_type, title, summary, payload, trust_level, status, run_id, artifact_refs, source_memory_ids, freshness_expires_at_ms, metadata, created_at_ms, updated_at_ms)
VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12, ?13, ?14, ?15, ?16, ?17)",
params![
item.id,
item.space_id,
item.coverage_key,
item.dedupe_key,
item.item_type,
item.title,
item.summary,
item.payload.to_string(),
item.trust_level.to_string(),
item.status.to_string(),
item.run_id,
serde_json::to_string(&item.artifact_refs)?,
serde_json::to_string(&item.source_memory_ids)?,
item.freshness_expires_at_ms.map(|value| value as i64),
item.metadata.as_ref().map(|value| value.to_string()),
item.created_at_ms as i64,
item.updated_at_ms as i64,
],
)?;
Ok(())
}
pub async fn list_knowledge_items(
&self,
space_id: &str,
coverage_key: Option<&str>,
) -> MemoryResult<Vec<KnowledgeItemRecord>> {
self.list_knowledge_items_for_tenant(space_id, coverage_key, &MemoryTenantScope::local())
.await
}
pub async fn list_knowledge_items_for_tenant(
&self,
space_id: &str,
coverage_key: Option<&str>,
tenant_scope: &MemoryTenantScope,
) -> MemoryResult<Vec<KnowledgeItemRecord>> {
let conn = self.conn.lock().await;
let mut stmt = if coverage_key.is_some() {
conn.prepare(
"SELECT i.id, i.space_id, i.coverage_key, i.dedupe_key, i.item_type, i.title, i.summary, i.payload, i.trust_level, i.status, i.run_id, i.artifact_refs, i.source_memory_ids, i.freshness_expires_at_ms, i.metadata, i.created_at_ms, i.updated_at_ms
FROM knowledge_items i
JOIN knowledge_spaces s ON s.id = i.space_id
WHERE i.space_id = ?1 AND i.coverage_key = ?2
AND s.tenant_org_id = ?3
AND s.tenant_workspace_id = ?4
AND IFNULL(s.tenant_deployment_id, '') = IFNULL(?5, '')
ORDER BY i.created_at_ms DESC",
)?
} else {
conn.prepare(
"SELECT i.id, i.space_id, i.coverage_key, i.dedupe_key, i.item_type, i.title, i.summary, i.payload, i.trust_level, i.status, i.run_id, i.artifact_refs, i.source_memory_ids, i.freshness_expires_at_ms, i.metadata, i.created_at_ms, i.updated_at_ms
FROM knowledge_items i
JOIN knowledge_spaces s ON s.id = i.space_id
WHERE i.space_id = ?1
AND s.tenant_org_id = ?2
AND s.tenant_workspace_id = ?3
AND IFNULL(s.tenant_deployment_id, '') = IFNULL(?4, '')
ORDER BY i.created_at_ms DESC",
)?
};
let rows = if let Some(coverage_key) = coverage_key {
stmt.query_map(
params![
space_id,
coverage_key,
tenant_scope.org_id.as_str(),
tenant_scope.workspace_id.as_str(),
tenant_scope.deployment_id.as_deref()
],
row_to_knowledge_item,
)?
} else {
stmt.query_map(
params![
space_id,
tenant_scope.org_id.as_str(),
tenant_scope.workspace_id.as_str(),
tenant_scope.deployment_id.as_deref()
],
row_to_knowledge_item,
)?
};
rows.collect::<Result<Vec<_>, _>>().map_err(Into::into)
}
pub async fn get_knowledge_item(&self, id: &str) -> MemoryResult<Option<KnowledgeItemRecord>> {
self.get_knowledge_item_for_tenant(id, &MemoryTenantScope::local())
.await
}
pub async fn get_knowledge_item_for_tenant(
&self,
id: &str,
tenant_scope: &MemoryTenantScope,
) -> MemoryResult<Option<KnowledgeItemRecord>> {
let conn = self.conn.lock().await;
Ok(
conn.query_row(
"SELECT i.id, i.space_id, i.coverage_key, i.dedupe_key, i.item_type, i.title, i.summary, i.payload, i.trust_level, i.status, i.run_id, i.artifact_refs, i.source_memory_ids, i.freshness_expires_at_ms, i.metadata, i.created_at_ms, i.updated_at_ms
FROM knowledge_items i
JOIN knowledge_spaces s ON s.id = i.space_id
WHERE i.id = ?1
AND s.tenant_org_id = ?2
AND s.tenant_workspace_id = ?3
AND IFNULL(s.tenant_deployment_id, '') = IFNULL(?4, '')",
params![
id,
tenant_scope.org_id.as_str(),
tenant_scope.workspace_id.as_str(),
tenant_scope.deployment_id.as_deref()
],
row_to_knowledge_item,
)
.optional()?,
)
}
pub async fn promote_knowledge_item(
&self,
request: &KnowledgePromotionRequest,
) -> MemoryResult<Option<KnowledgePromotionResult>> {
self.promote_knowledge_item_for_tenant(request, &MemoryTenantScope::local())
.await
}
pub async fn promote_knowledge_item_for_tenant(
&self,
request: &KnowledgePromotionRequest,
tenant_scope: &MemoryTenantScope,
) -> MemoryResult<Option<KnowledgePromotionResult>> {
let mut conn = self.conn.lock().await;
let tx = conn.transaction()?;
let Some(mut item) = tx
.query_row(
"SELECT i.id, i.space_id, i.coverage_key, i.dedupe_key, i.item_type, i.title, i.summary, i.payload, i.trust_level, i.status, i.run_id, i.artifact_refs, i.source_memory_ids, i.freshness_expires_at_ms, i.metadata, i.created_at_ms, i.updated_at_ms
FROM knowledge_items i
JOIN knowledge_spaces s ON s.id = i.space_id
WHERE i.id = ?1
AND s.tenant_org_id = ?2
AND s.tenant_workspace_id = ?3
AND IFNULL(s.tenant_deployment_id, '') = IFNULL(?4, '')",
params![
request.item_id,
tenant_scope.org_id.as_str(),
tenant_scope.workspace_id.as_str(),
tenant_scope.deployment_id.as_deref()
],
row_to_knowledge_item,
)
.optional()? else {
return Ok(None);
};
let previous_status = item.status;
let previous_trust_level = item.trust_level;
if previous_status == KnowledgeItemStatus::Deprecated
&& request.target_status != KnowledgeItemStatus::Deprecated
{
return Err(crate::types::MemoryError::InvalidConfig(
"cannot promote a deprecated knowledge item".to_string(),
));
}
let next_status = request.target_status;
match (previous_status, next_status) {
(KnowledgeItemStatus::Working, KnowledgeItemStatus::Promoted)
| (KnowledgeItemStatus::Promoted, KnowledgeItemStatus::Promoted)
| (KnowledgeItemStatus::Promoted, KnowledgeItemStatus::ApprovedDefault)
| (KnowledgeItemStatus::ApprovedDefault, KnowledgeItemStatus::ApprovedDefault)
| (KnowledgeItemStatus::Working, KnowledgeItemStatus::Deprecated)
| (KnowledgeItemStatus::Promoted, KnowledgeItemStatus::Deprecated)
| (KnowledgeItemStatus::ApprovedDefault, KnowledgeItemStatus::Deprecated) => {}
(KnowledgeItemStatus::Working, KnowledgeItemStatus::ApprovedDefault) => {
return Err(crate::types::MemoryError::InvalidConfig(
"approved_default requires an intermediate promoted item".to_string(),
));
}
(KnowledgeItemStatus::ApprovedDefault, KnowledgeItemStatus::Promoted) => {
return Err(crate::types::MemoryError::InvalidConfig(
"approved_default items do not downgrade back to promoted".to_string(),
));
}
(KnowledgeItemStatus::Promoted, KnowledgeItemStatus::Working)
| (KnowledgeItemStatus::ApprovedDefault, KnowledgeItemStatus::Working) => {
return Err(crate::types::MemoryError::InvalidConfig(
"knowledge items cannot be demoted back to working".to_string(),
));
}
(KnowledgeItemStatus::Working, KnowledgeItemStatus::Working) => {}
(KnowledgeItemStatus::Deprecated, _) => {}
}
if next_status == KnowledgeItemStatus::ApprovedDefault
&& (request.reviewer_id.is_none() || request.approval_id.is_none())
{
return Err(crate::types::MemoryError::InvalidConfig(
"approved_default promotion requires reviewer_id and approval_id".to_string(),
));
}
let promoted = next_status.is_active()
&& (previous_status != next_status || request.freshness_expires_at_ms.is_some());
let mut metadata_obj = item
.metadata
.clone()
.and_then(|value| value.as_object().cloned())
.unwrap_or_default();
metadata_obj.insert(
"promotion".to_string(),
serde_json::json!({
"from_status": previous_status.to_string(),
"to_status": next_status.to_string(),
"promoted_at_ms": request.promoted_at_ms,
"reason": request.reason,
"reviewer_id": request.reviewer_id,
"approval_id": request.approval_id,
"freshness_expires_at_ms": request.freshness_expires_at_ms,
}),
);
item.status = next_status;
if let Some(next_trust) = next_status.as_trust_level() {
item.trust_level = next_trust;
}
if let Some(freshness_expires_at_ms) = request.freshness_expires_at_ms {
item.freshness_expires_at_ms = Some(freshness_expires_at_ms);
}
item.metadata = Some(serde_json::Value::Object(metadata_obj));
item.updated_at_ms = request.promoted_at_ms;
let persisted_item = item.clone();
let item_id = persisted_item.id.clone();
let space_id = persisted_item.space_id.clone();
let coverage_key = persisted_item.coverage_key.clone();
let dedupe_key = persisted_item.dedupe_key.clone();
tx.execute(
"INSERT OR REPLACE INTO knowledge_items
(id, space_id, coverage_key, dedupe_key, item_type, title, summary, payload, trust_level, status, run_id, artifact_refs, source_memory_ids, freshness_expires_at_ms, metadata, created_at_ms, updated_at_ms)
VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12, ?13, ?14, ?15, ?16, ?17)",
params![
persisted_item.id,
persisted_item.space_id,
persisted_item.coverage_key,
persisted_item.dedupe_key,
persisted_item.item_type,
persisted_item.title,
persisted_item.summary,
persisted_item.payload.to_string(),
persisted_item.trust_level.to_string(),
persisted_item.status.to_string(),
persisted_item.run_id,
serde_json::to_string(&persisted_item.artifact_refs)?,
serde_json::to_string(&persisted_item.source_memory_ids)?,
persisted_item.freshness_expires_at_ms.map(|value| value as i64),
persisted_item.metadata.as_ref().map(|value| value.to_string()),
persisted_item.created_at_ms as i64,
persisted_item.updated_at_ms as i64,
],
)?;
let mut coverage = tx
.query_row(
"SELECT coverage_key, space_id, latest_item_id, latest_dedupe_key, last_seen_at_ms, last_promoted_at_ms, freshness_expires_at_ms, metadata
FROM knowledge_coverage WHERE coverage_key = ?1 AND space_id = ?2",
params![coverage_key.as_str(), space_id.as_str()],
row_to_knowledge_coverage,
)
.optional()?
.unwrap_or(KnowledgeCoverageRecord {
coverage_key: coverage_key.clone(),
space_id: space_id.clone(),
latest_item_id: None,
latest_dedupe_key: None,
last_seen_at_ms: request.promoted_at_ms,
last_promoted_at_ms: None,
freshness_expires_at_ms: None,
metadata: None,
});
coverage.latest_item_id = Some(item_id.clone());
coverage.latest_dedupe_key = Some(dedupe_key.clone());
coverage.last_seen_at_ms = request.promoted_at_ms;
if next_status.is_active() {
coverage.last_promoted_at_ms = Some(request.promoted_at_ms);
}
if let Some(freshness_expires_at_ms) = request.freshness_expires_at_ms {
coverage.freshness_expires_at_ms = Some(freshness_expires_at_ms);
}
let mut coverage_metadata = coverage
.metadata
.clone()
.and_then(|value| value.as_object().cloned())
.unwrap_or_default();
coverage_metadata.insert(
"promotion".to_string(),
serde_json::json!({
"item_id": item_id,
"from_status": previous_status.to_string(),
"to_status": next_status.to_string(),
"promoted_at_ms": request.promoted_at_ms,
"reason": request.reason,
"reviewer_id": request.reviewer_id,
"approval_id": request.approval_id,
}),
);
coverage.metadata = Some(serde_json::Value::Object(coverage_metadata));
tx.execute(
"INSERT OR REPLACE INTO knowledge_coverage
(coverage_key, space_id, latest_item_id, latest_dedupe_key, last_seen_at_ms, last_promoted_at_ms, freshness_expires_at_ms, metadata)
VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8)",
params![
coverage.coverage_key,
coverage.space_id,
coverage.latest_item_id,
coverage.latest_dedupe_key,
coverage.last_seen_at_ms as i64,
coverage.last_promoted_at_ms.map(|value| value as i64),
coverage.freshness_expires_at_ms.map(|value| value as i64),
coverage.metadata.as_ref().map(|value| value.to_string()),
],
)?;
tx.commit()?;
Ok(Some(KnowledgePromotionResult {
previous_status,
previous_trust_level,
promoted,
item: persisted_item,
coverage,
}))
}
pub async fn upsert_knowledge_coverage(
&self,
coverage: &KnowledgeCoverageRecord,
) -> MemoryResult<()> {
self.upsert_knowledge_coverage_for_tenant(coverage, &MemoryTenantScope::local())
.await
}
pub async fn upsert_knowledge_coverage_for_tenant(
&self,
coverage: &KnowledgeCoverageRecord,
tenant_scope: &MemoryTenantScope,
) -> MemoryResult<()> {
if self
.get_knowledge_space_for_tenant(&coverage.space_id, tenant_scope)
.await?
.is_none()
{
return Err(crate::types::MemoryError::InvalidConfig(
"knowledge coverage space not found for tenant".to_string(),
));
}
let conn = self.conn.lock().await;
conn.execute(
"INSERT OR REPLACE INTO knowledge_coverage
(coverage_key, space_id, latest_item_id, latest_dedupe_key, last_seen_at_ms, last_promoted_at_ms, freshness_expires_at_ms, metadata)
VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8)",
params![
coverage.coverage_key,
coverage.space_id,
coverage.latest_item_id,
coverage.latest_dedupe_key,
coverage.last_seen_at_ms as i64,
coverage.last_promoted_at_ms.map(|value| value as i64),
coverage.freshness_expires_at_ms.map(|value| value as i64),
coverage.metadata.as_ref().map(|value| value.to_string()),
],
)?;
Ok(())
}
pub async fn get_knowledge_coverage(
&self,
coverage_key: &str,
space_id: &str,
) -> MemoryResult<Option<KnowledgeCoverageRecord>> {
self.get_knowledge_coverage_for_tenant(coverage_key, space_id, &MemoryTenantScope::local())
.await
}
pub async fn get_knowledge_coverage_for_tenant(
&self,
coverage_key: &str,
space_id: &str,
tenant_scope: &MemoryTenantScope,
) -> MemoryResult<Option<KnowledgeCoverageRecord>> {
let conn = self.conn.lock().await;
Ok(
conn.query_row(
"SELECT c.coverage_key, c.space_id, c.latest_item_id, c.latest_dedupe_key, c.last_seen_at_ms, c.last_promoted_at_ms, c.freshness_expires_at_ms, c.metadata
FROM knowledge_coverage c
JOIN knowledge_spaces s ON s.id = c.space_id
WHERE c.coverage_key = ?1 AND c.space_id = ?2
AND s.tenant_org_id = ?3
AND s.tenant_workspace_id = ?4
AND IFNULL(s.tenant_deployment_id, '') = IFNULL(?5, '')",
params![
coverage_key,
space_id,
tenant_scope.org_id.as_str(),
tenant_scope.workspace_id.as_str(),
tenant_scope.deployment_id.as_deref()
],
row_to_knowledge_coverage,
)
.optional()?,
)
}
pub async fn get_stats(&self) -> MemoryResult<MemoryStats> {
let mut stats = self
.get_stats_for_tenant(&MemoryTenantScope::local())
.await?;
stats.file_size = std::fs::metadata(&self.db_path)?.len() as i64;
Ok(stats)
}
pub async fn get_stats_for_tenant(
&self,
tenant_scope: &MemoryTenantScope,
) -> MemoryResult<MemoryStats> {
let conn = self.conn.lock().await;
let session_chunks: i64 = conn.query_row(
"SELECT COUNT(*) FROM session_memory_chunks
WHERE tenant_org_id = ?1
AND tenant_workspace_id = ?2
AND IFNULL(tenant_deployment_id, '') = IFNULL(?3, '')",
params![
tenant_scope.org_id.as_str(),
tenant_scope.workspace_id.as_str(),
tenant_scope.deployment_id.as_deref()
],
|row| row.get(0),
)?;
let project_chunks: i64 = conn.query_row(
"SELECT COUNT(*) FROM project_memory_chunks
WHERE tenant_org_id = ?1
AND tenant_workspace_id = ?2
AND IFNULL(tenant_deployment_id, '') = IFNULL(?3, '')",
params![
tenant_scope.org_id.as_str(),
tenant_scope.workspace_id.as_str(),
tenant_scope.deployment_id.as_deref()
],
|row| row.get(0),
)?;
let global_chunks: i64 = conn.query_row(
"SELECT COUNT(*) FROM global_memory_chunks
WHERE tenant_org_id = ?1
AND tenant_workspace_id = ?2
AND IFNULL(tenant_deployment_id, '') = IFNULL(?3, '')",
params![
tenant_scope.org_id.as_str(),
tenant_scope.workspace_id.as_str(),
tenant_scope.deployment_id.as_deref()
],
|row| row.get(0),
)?;
let session_bytes: i64 = conn.query_row(
"SELECT COALESCE(SUM(LENGTH(content)), 0) FROM session_memory_chunks
WHERE tenant_org_id = ?1
AND tenant_workspace_id = ?2
AND IFNULL(tenant_deployment_id, '') = IFNULL(?3, '')",
params![
tenant_scope.org_id.as_str(),
tenant_scope.workspace_id.as_str(),
tenant_scope.deployment_id.as_deref()
],
|row| row.get(0),
)?;
let project_bytes: i64 = conn.query_row(
"SELECT COALESCE(SUM(LENGTH(content)), 0) FROM project_memory_chunks
WHERE tenant_org_id = ?1
AND tenant_workspace_id = ?2
AND IFNULL(tenant_deployment_id, '') = IFNULL(?3, '')",
params![
tenant_scope.org_id.as_str(),
tenant_scope.workspace_id.as_str(),
tenant_scope.deployment_id.as_deref()
],
|row| row.get(0),
)?;
let global_bytes: i64 = conn.query_row(
"SELECT COALESCE(SUM(LENGTH(content)), 0) FROM global_memory_chunks
WHERE tenant_org_id = ?1
AND tenant_workspace_id = ?2
AND IFNULL(tenant_deployment_id, '') = IFNULL(?3, '')",
params![
tenant_scope.org_id.as_str(),
tenant_scope.workspace_id.as_str(),
tenant_scope.deployment_id.as_deref()
],
|row| row.get(0),
)?;
let last_cleanup: Option<String> = conn
.query_row(
"SELECT created_at FROM memory_cleanup_log
WHERE tenant_org_id = ?1
AND tenant_workspace_id = ?2
AND IFNULL(tenant_deployment_id, '') = IFNULL(?3, '')
ORDER BY created_at DESC
LIMIT 1",
params![
tenant_scope.org_id.as_str(),
tenant_scope.workspace_id.as_str(),
tenant_scope.deployment_id.as_deref()
],
|row| row.get(0),
)
.optional()?;
let last_cleanup = last_cleanup.and_then(|s| {
DateTime::parse_from_rfc3339(&s)
.ok()
.map(|dt| dt.with_timezone(&Utc))
});
let total_bytes = session_bytes + project_bytes + global_bytes;
Ok(MemoryStats {
total_chunks: session_chunks + project_chunks + global_chunks,
session_chunks,
project_chunks,
global_chunks,
total_bytes,
session_bytes,
project_bytes,
global_bytes,
file_size: total_bytes,
last_cleanup,
})
}
pub async fn log_cleanup(
&self,
cleanup_type: &str,
tier: MemoryTier,
project_id: Option<&str>,
session_id: Option<&str>,
chunks_deleted: i64,
bytes_reclaimed: i64,
) -> MemoryResult<()> {
self.log_cleanup_for_tenant(
cleanup_type,
tier,
project_id,
session_id,
chunks_deleted,
bytes_reclaimed,
&MemoryTenantScope::local(),
)
.await
}
pub async fn log_cleanup_for_tenant(
&self,
cleanup_type: &str,
tier: MemoryTier,
project_id: Option<&str>,
session_id: Option<&str>,
chunks_deleted: i64,
bytes_reclaimed: i64,
tenant_scope: &MemoryTenantScope,
) -> MemoryResult<()> {
let conn = self.conn.lock().await;
let id = uuid::Uuid::new_v4().to_string();
let created_at = Utc::now().to_rfc3339();
conn.execute(
"INSERT INTO memory_cleanup_log
(id, cleanup_type, tier, project_id, session_id, chunks_deleted, bytes_reclaimed, created_at,
tenant_org_id, tenant_workspace_id, tenant_deployment_id)
VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11)",
params![
id,
cleanup_type,
tier.to_string(),
project_id,
session_id,
chunks_deleted,
bytes_reclaimed,
created_at,
tenant_scope.org_id.as_str(),
tenant_scope.workspace_id.as_str(),
tenant_scope.deployment_id.as_deref()
],
)?;
Ok(())
}
pub async fn vacuum(&self) -> MemoryResult<()> {
let conn = self.conn.lock().await;
conn.execute("VACUUM", [])?;
Ok(())
}
pub async fn project_file_index_count(&self, project_id: &str) -> MemoryResult<i64> {
self.project_file_index_count_for_tenant(project_id, &MemoryTenantScope::local())
.await
}
pub async fn project_file_index_count_for_tenant(
&self,
project_id: &str,
tenant_scope: &MemoryTenantScope,
) -> MemoryResult<i64> {
let conn = self.conn.lock().await;
let n: i64 = conn.query_row(
"SELECT COUNT(*) FROM project_file_index
WHERE project_id = ?1
AND tenant_org_id = ?2
AND tenant_workspace_id = ?3
AND IFNULL(tenant_deployment_id, '') = IFNULL(?4, '')",
params![
project_id,
tenant_scope.org_id.as_str(),
tenant_scope.workspace_id.as_str(),
tenant_scope.deployment_id.as_deref()
],
|row| row.get(0),
)?;
Ok(n)
}
pub async fn project_has_file_chunks(&self, project_id: &str) -> MemoryResult<bool> {
self.project_has_file_chunks_for_tenant(project_id, &MemoryTenantScope::local())
.await
}
pub async fn project_has_file_chunks_for_tenant(
&self,
project_id: &str,
tenant_scope: &MemoryTenantScope,
) -> MemoryResult<bool> {
let conn = self.conn.lock().await;
let exists: Option<i64> = conn
.query_row(
"SELECT 1 FROM project_memory_chunks
WHERE project_id = ?1 AND source = 'file'
AND tenant_org_id = ?2
AND tenant_workspace_id = ?3
AND IFNULL(tenant_deployment_id, '') = IFNULL(?4, '')
LIMIT 1",
params![
project_id,
tenant_scope.org_id.as_str(),
tenant_scope.workspace_id.as_str(),
tenant_scope.deployment_id.as_deref()
],
|row| row.get(0),
)
.optional()?;
Ok(exists.is_some())
}
pub async fn get_file_index_entry(
&self,
project_id: &str,
path: &str,
) -> MemoryResult<Option<(i64, i64, String)>> {
self.get_file_index_entry_for_tenant(project_id, path, &MemoryTenantScope::local())
.await
}
pub async fn get_file_index_entry_for_tenant(
&self,
project_id: &str,
path: &str,
tenant_scope: &MemoryTenantScope,
) -> MemoryResult<Option<(i64, i64, String)>> {
let conn = self.conn.lock().await;
let row: Option<(i64, i64, String)> = conn
.query_row(
"SELECT mtime, size, hash FROM project_file_index
WHERE project_id = ?1 AND path = ?2
AND tenant_org_id = ?3
AND tenant_workspace_id = ?4
AND IFNULL(tenant_deployment_id, '') = IFNULL(?5, '')",
params![
project_id,
path,
tenant_scope.org_id.as_str(),
tenant_scope.workspace_id.as_str(),
tenant_scope.deployment_id.as_deref()
],
|row| Ok((row.get(0)?, row.get(1)?, row.get(2)?)),
)
.optional()?;
Ok(row)
}
}