use chrono::{DateTime, TimeZone, Utc};
use rusqlite::{params, Connection, OptionalExtension, Result as SqlResult};
use std::path::Path;
use crate::synthesis::types::{GateScores, ProvenanceRecord};
use crate::triple::{Triple, Predicate, TripleSource};
use crate::types::{AclEntry, CrossLink, HebbianLink, MemoryLayer, MergeOutcome, MemoryRecord, MemoryType, Permission};
use std::sync::OnceLock;
/// Returns the process-wide jieba segmenter, built lazily on first use.
fn jieba() -> &'static jieba_rs::Jieba {
    static INSTANCE: OnceLock<jieba_rs::Jieba> = OnceLock::new();
    INSTANCE.get_or_init(|| jieba_rs::Jieba::new())
}
/// Inserts ASCII-space word boundaries into text that contains CJK
/// characters, using jieba segmentation; non-CJK text passes through
/// unchanged. Runs of spaces produced by joining adjacent separator
/// tokens are collapsed to a single space.
fn tokenize_cjk_boundaries(text: &str) -> String {
    if !text.chars().any(is_cjk_char) {
        return text.to_string();
    }
    let segmented = jieba().cut(text, true).join(" ");
    let mut out = String::with_capacity(segmented.len());
    let mut last_was_space = false;
    for c in segmented.chars() {
        let is_space = c == ' ';
        if !(is_space && last_was_space) {
            out.push(c);
        }
        last_was_space = is_space;
    }
    out
}
/// Splits `text` into tokens the way SQLite's unicode61 tokenizer would:
/// alphanumeric (plus CJK) runs become tokens, everything else separates.
fn tokenize_like_unicode61(text: &str) -> Vec<String> {
    let mut out: Vec<String> = Vec::new();
    let mut word = String::new();
    for c in text.chars() {
        if c.is_alphanumeric() || is_cjk_char(c) {
            word.push(c);
        } else if !word.is_empty() {
            out.push(std::mem::take(&mut word));
        }
    }
    if !word.is_empty() {
        out.push(word);
    }
    out
}
/// True when `ch` falls in one of the major CJK Unicode blocks
/// (Han ideographs, CJK symbols/punctuation, kana, hangul syllables).
fn is_cjk_char(ch: char) -> bool {
    let cp = ch as u32;
    (0x4E00..=0x9FFF).contains(&cp)        // CJK Unified Ideographs
        || (0x3400..=0x4DBF).contains(&cp) // CJK Extension A
        || (0xF900..=0xFAFF).contains(&cp) // CJK Compatibility Ideographs
        || (0x3000..=0x303F).contains(&cp) // CJK Symbols and Punctuation
        || (0x3040..=0x309F).contains(&cp) // Hiragana
        || (0x30A0..=0x30FF).contains(&cp) // Katakana
        || (0xAC00..=0xD7AF).contains(&cp) // Hangul Syllables
}
/// Converts a UTC timestamp to fractional Unix seconds (sub-second part
/// taken from the nanosecond field).
fn datetime_to_f64(dt: &DateTime<Utc>) -> f64 {
    let whole = dt.timestamp() as f64;
    let frac = f64::from(dt.timestamp_subsec_nanos()) / 1_000_000_000.0;
    whole + frac
}
/// Converts fractional Unix seconds back into a UTC timestamp.
/// Non-positive fractional parts (including NaN inputs) clamp the
/// nanosecond component to zero; an out-of-range instant falls back to
/// the current time.
fn f64_to_datetime(ts: f64) -> DateTime<Utc> {
    let secs = ts.floor() as i64;
    let frac = ts - secs as f64;
    let nanos = if frac > 0.0 {
        (frac * 1_000_000_000.0) as u32
    } else {
        0
    };
    match Utc.timestamp_opt(secs, nanos).single() {
        Some(dt) => dt,
        None => Utc::now(),
    }
}
/// Current wall-clock time as fractional Unix seconds.
pub fn now_f64() -> f64 {
    let now = Utc::now();
    datetime_to_f64(&now)
}
/// Decodes a little-endian f32 BLOB into a vector. Trailing bytes that do
/// not complete a 4-byte group are ignored.
fn bytes_to_f32_vec(bytes: &[u8]) -> Vec<f32> {
    let mut floats = Vec::with_capacity(bytes.len() / 4);
    for quad in bytes.chunks_exact(4) {
        floats.push(f32::from_le_bytes([quad[0], quad[1], quad[2], quad[3]]));
    }
    floats
}
/// Summary of embedding coverage for the database, as reported to callers.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct EmbeddingStats {
/// Total number of memory rows considered.
pub total_memories: usize,
/// How many of those have at least one stored embedding.
pub embedded_count: usize,
/// Embedding model identifier, when known.
pub model: Option<String>,
/// Embedding vector dimensionality, when known.
pub dimensions: Option<usize>,
}
/// A row of the `entities` table: a named entity scoped to a namespace.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct EntityRecord {
/// Stable id (see `generate_entity_id`).
pub id: String,
pub name: String,
pub entity_type: String,
pub namespace: String,
/// Free-form JSON metadata, stored as text.
pub metadata: Option<String>,
/// Creation time, fractional Unix seconds.
pub created_at: f64,
/// Last-update time, fractional Unix seconds.
pub updated_at: f64,
}
/// Derives a stable 16-hex-digit id by hashing "name|type|namespace" with
/// 64-bit FNV-1a. Name and type are lowercased so ids are case-insensitive
/// on those parts; the namespace is used as-is.
fn generate_entity_id(name: &str, entity_type: &str, namespace: &str) -> String {
    let key = format!("{}|{}|{}", name.to_lowercase(), entity_type.to_lowercase(), namespace);
    // FNV-1a: offset basis 0xcbf29ce484222325, prime 0x100000001b3.
    let digest = key.bytes().fold(0xcbf29ce484222325u64, |acc, b| {
        (acc ^ u64::from(b)).wrapping_mul(0x100000001b3)
    });
    format!("{:016x}", digest)
}
/// SQLite-backed persistence layer for memories, access logs, hebbian
/// links, entities, triples, embeddings, and synthesis provenance.
pub struct Storage {
// Single owned connection; all methods serialize through it.
conn: Connection,
}
impl Storage {
/// Opens (or creates) the database at `path`, applies connection pragmas
/// (WAL journaling, foreign keys on, 5s busy timeout), and runs every
/// schema migration in order before returning the handle.
pub fn new<P: AsRef<Path>>(path: P) -> Result<Self, rusqlite::Error> {
let conn = Connection::open(path)?;
conn.execute_batch("PRAGMA journal_mode=WAL; PRAGMA foreign_keys=ON; PRAGMA busy_timeout=5000;")?;
// Base schema first, then additive migrations; order matters because
// later migrations assume tables created by earlier ones.
Self::create_schema(&conn)?;
Self::migrate_v2(&conn)?;
Self::migrate_embeddings(&conn)?;
Self::migrate_entities(&conn)?;
Self::rebuild_fts_if_needed(&conn)?;
Self::migrate_hebbian_signals(&conn)?;
Self::migrate_triples(&conn)?;
Self::migrate_promotions(&conn)?;
Self::migrate_cluster_state(&conn)?;
// Soft-delete column: "duplicate column name" is tolerated so the
// ALTER stays idempotent across restarts.
match conn.execute(
"ALTER TABLE memories ADD COLUMN deleted_at TEXT DEFAULT NULL",
[],
) {
Ok(_) => {},
Err(e) if e.to_string().contains("duplicate column name") => {},
Err(e) => return Err(e),
}
conn.execute_batch(
"CREATE INDEX IF NOT EXISTS idx_memories_deleted_at ON memories(deleted_at);"
)?;
// Supersession pointer: same idempotent ALTER pattern as above.
match conn.execute(
"ALTER TABLE memories ADD COLUMN superseded_by TEXT DEFAULT ''",
[],
) {
Ok(_) => {},
Err(e) if e.to_string().contains("duplicate column name") => {},
Err(e) => return Err(e),
}
Ok(Self { conn })
}
/// Borrows the underlying SQLite connection for ad-hoc queries.
pub fn connection(&self) -> &Connection {
&self.conn
}
/// Creates the baseline tables, indexes, and the manually-managed FTS5
/// table. Every statement is `IF NOT EXISTS` (or `INSERT OR IGNORE`), so
/// this is safe to run on every startup.
fn create_schema(conn: &Connection) -> SqlResult<()> {
conn.execute_batch(
r#"
CREATE TABLE IF NOT EXISTS memories (
id TEXT PRIMARY KEY,
content TEXT NOT NULL,
memory_type TEXT NOT NULL,
layer TEXT NOT NULL,
created_at REAL NOT NULL,
working_strength REAL NOT NULL DEFAULT 1.0,
core_strength REAL NOT NULL DEFAULT 0.0,
importance REAL NOT NULL DEFAULT 0.3,
pinned INTEGER NOT NULL DEFAULT 0,
consolidation_count INTEGER NOT NULL DEFAULT 0,
last_consolidated REAL,
source TEXT DEFAULT '',
contradicts TEXT DEFAULT '',
contradicted_by TEXT DEFAULT '',
metadata TEXT,
namespace TEXT NOT NULL DEFAULT 'default'
);
CREATE TABLE IF NOT EXISTS access_log (
memory_id TEXT NOT NULL REFERENCES memories(id) ON DELETE CASCADE,
accessed_at REAL NOT NULL
);
CREATE TABLE IF NOT EXISTS hebbian_links (
source_id TEXT NOT NULL REFERENCES memories(id) ON DELETE CASCADE,
target_id TEXT NOT NULL REFERENCES memories(id) ON DELETE CASCADE,
strength REAL NOT NULL DEFAULT 1.0,
coactivation_count INTEGER NOT NULL DEFAULT 0,
temporal_forward INTEGER NOT NULL DEFAULT 0,
temporal_backward INTEGER NOT NULL DEFAULT 0,
direction TEXT NOT NULL DEFAULT 'bidirectional',
created_at REAL NOT NULL,
namespace TEXT NOT NULL DEFAULT 'default',
PRIMARY KEY (source_id, target_id)
);
CREATE TABLE IF NOT EXISTS engram_acl (
agent_id TEXT NOT NULL,
namespace TEXT NOT NULL,
permission TEXT NOT NULL,
granted_by TEXT NOT NULL,
created_at REAL NOT NULL,
PRIMARY KEY (agent_id, namespace)
);
-- Schema metadata
CREATE TABLE IF NOT EXISTS engram_meta (
key TEXT PRIMARY KEY,
value TEXT NOT NULL
);
INSERT OR IGNORE INTO engram_meta VALUES ('schema_version', '1');
-- Entity tables (canonical schema)
CREATE TABLE IF NOT EXISTS entities (
id TEXT PRIMARY KEY,
name TEXT NOT NULL,
entity_type TEXT NOT NULL,
namespace TEXT NOT NULL DEFAULT 'default',
metadata TEXT,
created_at REAL NOT NULL,
updated_at REAL NOT NULL
);
CREATE TABLE IF NOT EXISTS entity_relations (
id TEXT PRIMARY KEY,
source_id TEXT NOT NULL REFERENCES entities(id) ON DELETE CASCADE,
target_id TEXT NOT NULL REFERENCES entities(id) ON DELETE CASCADE,
relation TEXT NOT NULL,
confidence REAL NOT NULL DEFAULT 1.0,
source TEXT,
namespace TEXT NOT NULL DEFAULT 'default',
created_at REAL NOT NULL,
metadata TEXT
);
CREATE TABLE IF NOT EXISTS memory_entities (
memory_id TEXT NOT NULL REFERENCES memories(id) ON DELETE CASCADE,
entity_id TEXT NOT NULL REFERENCES entities(id) ON DELETE CASCADE,
role TEXT NOT NULL DEFAULT 'mention',
PRIMARY KEY (memory_id, entity_id)
);
CREATE INDEX IF NOT EXISTS idx_access_log_mid ON access_log(memory_id);
CREATE INDEX IF NOT EXISTS idx_hebbian_source ON hebbian_links(source_id);
CREATE INDEX IF NOT EXISTS idx_hebbian_target ON hebbian_links(target_id);
CREATE INDEX IF NOT EXISTS idx_memories_type ON memories(memory_type);
CREATE INDEX IF NOT EXISTS idx_memories_namespace ON memories(namespace);
CREATE INDEX IF NOT EXISTS idx_hebbian_namespace ON hebbian_links(namespace);
CREATE INDEX IF NOT EXISTS idx_entities_namespace ON entities(namespace);
CREATE INDEX IF NOT EXISTS idx_entity_relations_source ON entity_relations(source_id);
CREATE INDEX IF NOT EXISTS idx_entity_relations_target ON entity_relations(target_id);
CREATE INDEX IF NOT EXISTS idx_memory_entities_memory ON memory_entities(memory_id);
CREATE INDEX IF NOT EXISTS idx_memory_entities_entity ON memory_entities(entity_id);
-- Synthesis provenance: tracks which source memories contributed to insights
CREATE TABLE IF NOT EXISTS synthesis_provenance (
id TEXT PRIMARY KEY,
insight_id TEXT NOT NULL,
source_id TEXT NOT NULL,
cluster_id TEXT NOT NULL,
synthesis_timestamp TEXT NOT NULL,
gate_decision TEXT NOT NULL,
gate_scores TEXT,
confidence REAL NOT NULL,
source_original_importance REAL,
FOREIGN KEY (insight_id) REFERENCES memories(id),
FOREIGN KEY (source_id) REFERENCES memories(id)
);
CREATE INDEX IF NOT EXISTS idx_provenance_insight ON synthesis_provenance(insight_id);
CREATE INDEX IF NOT EXISTS idx_provenance_source ON synthesis_provenance(source_id);
-- FTS5 for full-text search (manually managed, not via triggers,
-- so we can pre-process content for CJK/ASCII boundary tokenization)
CREATE VIRTUAL TABLE IF NOT EXISTS memories_fts USING fts5(
content
);
"#,
)?;
Ok(())
}
fn migrate_v2(conn: &Connection) -> SqlResult<()> {
let has_namespace: bool = conn.query_row(
"SELECT COUNT(*) > 0 FROM pragma_table_info('memories') WHERE name='namespace'",
[],
|row| row.get(0),
)?;
if !has_namespace {
conn.execute(
"ALTER TABLE memories ADD COLUMN namespace TEXT NOT NULL DEFAULT 'default'",
[],
)?;
conn.execute(
"CREATE INDEX IF NOT EXISTS idx_memories_namespace ON memories(namespace)",
[],
)?;
}
let has_hebbian_namespace: bool = conn.query_row(
"SELECT COUNT(*) > 0 FROM pragma_table_info('hebbian_links') WHERE name='namespace'",
[],
|row| row.get(0),
)?;
if !has_hebbian_namespace {
conn.execute(
"ALTER TABLE hebbian_links ADD COLUMN namespace TEXT NOT NULL DEFAULT 'default'",
[],
)?;
conn.execute(
"CREATE INDEX IF NOT EXISTS idx_hebbian_namespace ON hebbian_links(namespace)",
[],
)?;
}
conn.execute_batch(
r#"
CREATE TABLE IF NOT EXISTS engram_acl (
agent_id TEXT NOT NULL,
namespace TEXT NOT NULL,
permission TEXT NOT NULL,
granted_by TEXT NOT NULL,
created_at REAL NOT NULL,
PRIMARY KEY (agent_id, namespace)
);
"#,
)?;
Ok(())
}
/// Migrates the embedding store to protocol v2: one row per
/// (memory_id, model) pair, vectors stored as little-endian f32 BLOBs.
///
/// Three starting states are handled:
/// * already v2 — just (re)assert the schema and return;
/// * no table at all — create the v2 schema and record the version;
/// * legacy table — copy rows into a v2 table, converting JSON-text
///   embeddings to BLOBs, inferring dimensions, and normalizing model
///   names to a "provider/model" form, then swap the tables.
fn migrate_embeddings(conn: &Connection) -> SqlResult<()> {
let protocol_version = Self::get_meta(conn, "embedding_protocol_version")
.unwrap_or(None)
.unwrap_or_else(|| "0".to_string());
if protocol_version == "2" {
// Already migrated: re-run the idempotent DDL and bail out.
conn.execute_batch(
r#"
CREATE TABLE IF NOT EXISTS memory_embeddings (
memory_id TEXT NOT NULL REFERENCES memories(id) ON DELETE CASCADE,
model TEXT NOT NULL,
embedding BLOB NOT NULL,
dimensions INTEGER NOT NULL,
created_at TEXT NOT NULL,
PRIMARY KEY (memory_id, model)
);
CREATE INDEX IF NOT EXISTS idx_embeddings_model ON memory_embeddings(model);
"#,
)?;
return Ok(());
}
let table_exists: bool = conn.query_row(
"SELECT COUNT(*) FROM sqlite_master WHERE type='table' AND name='memory_embeddings'",
[],
|row| row.get::<_, i64>(0),
).map(|c| c > 0).unwrap_or(false);
if !table_exists {
// Fresh database: create straight at v2, nothing to convert.
conn.execute_batch(
r#"
CREATE TABLE IF NOT EXISTS memory_embeddings (
memory_id TEXT NOT NULL REFERENCES memories(id) ON DELETE CASCADE,
model TEXT NOT NULL,
embedding BLOB NOT NULL,
dimensions INTEGER NOT NULL,
created_at TEXT NOT NULL,
PRIMARY KEY (memory_id, model)
);
CREATE INDEX IF NOT EXISTS idx_embeddings_model ON memory_embeddings(model);
"#,
)?;
Self::set_meta(conn, "embedding_protocol_version", "2")?;
return Ok(());
}
eprintln!("[engram] Migrating memory_embeddings to protocol v2 (multi-model support)...");
// Probe the legacy table's column set to pick a compatible SELECT.
let cols: Vec<String> = conn
.prepare("PRAGMA table_info(memory_embeddings)")?
.query_map([], |row| row.get::<_, String>(1))?
.filter_map(|r| r.ok())
.collect();
let has_model = cols.contains(&"model".to_string());
let has_dimensions = cols.contains(&"dimensions".to_string());
let has_created_at = cols.contains(&"created_at".to_string());
conn.execute_batch(
r#"
CREATE TABLE IF NOT EXISTS memory_embeddings_v2 (
memory_id TEXT NOT NULL REFERENCES memories(id) ON DELETE CASCADE,
model TEXT NOT NULL,
embedding BLOB NOT NULL,
dimensions INTEGER NOT NULL,
created_at TEXT NOT NULL,
PRIMARY KEY (memory_id, model)
);
"#,
)?;
let mut migrated = 0;
let mut skipped = 0;
{
// Missing columns are filled with placeholder literals (0, '').
let select_sql = if has_model && has_dimensions && has_created_at {
"SELECT memory_id, embedding, model, dimensions, created_at FROM memory_embeddings"
} else if has_model {
"SELECT memory_id, embedding, model, 0, '' FROM memory_embeddings"
} else {
"SELECT memory_id, embedding, 'unknown/legacy', 0, '' FROM memory_embeddings"
};
let mut stmt = conn.prepare(select_sql)?;
// Materialize all rows first so the connection is free for inserts.
let rows: Vec<(String, Vec<u8>, String, i64, String)> = stmt.query_map([], |row| {
Ok((
row.get::<_, String>(0)?,
row.get::<_, Vec<u8>>(1)?,
row.get::<_, String>(2)?,
row.get::<_, i64>(3)?,
row.get::<_, String>(4)?,
))
})?.filter_map(|r| r.ok()).collect();
for (memory_id, blob_or_text, mut model, mut dims, created_at) in rows {
let final_blob: Vec<u8>;
// Length divisible by 4 suggests a raw f32 BLOB — unless it starts
// with '[' (0x5B) or '-' (0x2D), which marks serialized JSON text.
if blob_or_text.len() % 4 == 0 && !blob_or_text.is_empty() {
if blob_or_text.first() == Some(&0x5B) || blob_or_text.first() == Some(&0x2D) {
match Self::json_text_to_blob(&blob_or_text) {
Some((blob, d)) => {
final_blob = blob;
if dims == 0 { dims = d as i64; }
}
None => {
eprintln!("[engram] Skipping corrupt embedding for memory {}", memory_id);
skipped += 1;
continue;
}
}
} else {
final_blob = blob_or_text;
if dims == 0 { dims = final_blob.len() as i64 / 4; }
}
} else if !blob_or_text.is_empty() {
// Non-multiple-of-4 length can only be JSON text.
match Self::json_text_to_blob(&blob_or_text) {
Some((blob, d)) => {
final_blob = blob;
if dims == 0 { dims = d as i64; }
}
None => {
eprintln!("[engram] Skipping corrupt embedding for memory {}", memory_id);
skipped += 1;
continue;
}
}
} else {
skipped += 1;
continue;
}
// Normalize bare model names to "provider/model".
if !model.contains('/') {
if model == "unknown" || model.is_empty() {
model = "unknown/legacy".to_string();
} else {
model = if model.starts_with("text-embedding") {
format!("openai/{}", model)
} else {
format!("ollama/{}", model)
};
}
}
let ts = if created_at.is_empty() {
chrono::Utc::now().to_rfc3339()
} else {
created_at
};
conn.execute(
"INSERT OR REPLACE INTO memory_embeddings_v2 (memory_id, model, embedding, dimensions, created_at) VALUES (?, ?, ?, ?, ?)",
params![memory_id, model, final_blob, dims, ts],
)?;
migrated += 1;
}
}
// Swap the converted table into place.
conn.execute_batch(
r#"
DROP TABLE memory_embeddings;
ALTER TABLE memory_embeddings_v2 RENAME TO memory_embeddings;
CREATE INDEX IF NOT EXISTS idx_embeddings_model ON memory_embeddings(model);
"#,
)?;
Self::set_meta(conn, "embedding_protocol_version", "2")?;
eprintln!("[engram] Migration complete: {} migrated, {} skipped", migrated, skipped);
Ok(())
}
/// Parses a JSON array of numbers (stored as UTF-8 bytes) into a
/// little-endian f32 BLOB plus its element count. Returns None when the
/// bytes are not valid UTF-8 or not a JSON number array.
fn json_text_to_blob(data: &[u8]) -> Option<(Vec<u8>, usize)> {
    let text = std::str::from_utf8(data).ok()?;
    let values: Vec<f64> = serde_json::from_str(text).ok()?;
    let count = values.len();
    let mut blob = Vec::with_capacity(count * 4);
    for v in &values {
        blob.extend_from_slice(&(*v as f32).to_le_bytes());
    }
    Some((blob, count))
}
/// Reads one value from `engram_meta`, creating the table first so this
/// works on databases that predate it. Returns Ok(None) when absent.
fn get_meta(conn: &Connection, key: &str) -> SqlResult<Option<String>> {
conn.execute_batch(
"CREATE TABLE IF NOT EXISTS engram_meta (key TEXT PRIMARY KEY, value TEXT NOT NULL);"
)?;
conn.query_row(
"SELECT value FROM engram_meta WHERE key = ?",
params![key],
|row| row.get(0),
).optional()
}
/// Upserts one key/value pair into `engram_meta`, creating the table
/// first so this works on databases that predate it.
fn set_meta(conn: &Connection, key: &str, value: &str) -> SqlResult<()> {
conn.execute_batch(
"CREATE TABLE IF NOT EXISTS engram_meta (key TEXT PRIMARY KEY, value TEXT NOT NULL);"
)?;
conn.execute(
"INSERT OR REPLACE INTO engram_meta (key, value) VALUES (?, ?)",
params![key, value],
)?;
Ok(())
}
/// Adds signal-attribution columns to `hebbian_links`
/// (`signal_source`, `signal_detail`), backfills NULL sources to
/// 'corecall', and indexes the source column. The ALTERs tolerate
/// "duplicate column name" so reruns are no-ops.
fn migrate_hebbian_signals(conn: &Connection) -> SqlResult<()> {
match conn.execute(
"ALTER TABLE hebbian_links ADD COLUMN signal_source TEXT DEFAULT 'corecall'",
[],
) {
Ok(_) => {},
Err(e) if e.to_string().contains("duplicate column name") => {},
Err(e) => return Err(e),
}
match conn.execute(
"ALTER TABLE hebbian_links ADD COLUMN signal_detail TEXT DEFAULT NULL",
[],
) {
Ok(_) => {},
Err(e) if e.to_string().contains("duplicate column name") => {},
Err(e) => return Err(e),
}
// Backfill rows that existed before the column default applied.
conn.execute(
"UPDATE hebbian_links SET signal_source = 'corecall' WHERE signal_source IS NULL",
[],
)?;
conn.execute_batch(
"CREATE INDEX IF NOT EXISTS idx_hebbian_signal_source ON hebbian_links(signal_source);"
)?;
Ok(())
}
/// Creates the `triples` table (subject/predicate/object facts extracted
/// from memories, unique per memory) plus its indexes, and adds a
/// per-memory extraction-attempt counter to `memories` (idempotent ALTER).
fn migrate_triples(conn: &Connection) -> SqlResult<()> {
conn.execute_batch(
r#"
CREATE TABLE IF NOT EXISTS triples (
id INTEGER PRIMARY KEY AUTOINCREMENT,
memory_id TEXT NOT NULL REFERENCES memories(id) ON DELETE CASCADE,
subject TEXT NOT NULL,
predicate TEXT NOT NULL,
object TEXT NOT NULL,
confidence REAL NOT NULL DEFAULT 1.0,
source TEXT NOT NULL DEFAULT 'llm',
created_at TEXT NOT NULL,
UNIQUE(memory_id, subject, predicate, object)
);
CREATE INDEX IF NOT EXISTS idx_triples_memory ON triples(memory_id);
CREATE INDEX IF NOT EXISTS idx_triples_subject ON triples(subject);
CREATE INDEX IF NOT EXISTS idx_triples_object ON triples(object);
"#
)?;
match conn.execute(
"ALTER TABLE memories ADD COLUMN triple_extraction_attempts INTEGER NOT NULL DEFAULT 0",
[],
) {
Ok(_) => {},
Err(e) if e.to_string().contains("duplicate column name") => {},
Err(e) => return Err(e),
}
Ok(())
}
/// Adds uniqueness constraints over the entity tables: one entity per
/// (name, type, namespace) and one relation per (source, target, relation).
fn migrate_entities(conn: &Connection) -> SqlResult<()> {
    let statements = [
        "CREATE UNIQUE INDEX IF NOT EXISTS idx_entities_unique ON entities(name, entity_type, namespace);",
        "CREATE UNIQUE INDEX IF NOT EXISTS idx_entity_relations_unique ON entity_relations(source_id, target_id, relation);",
    ];
    for sql in statements {
        conn.execute_batch(sql)?;
    }
    Ok(())
}
/// Creates the `promotion_candidates` table: clusters of memories that
/// look worth promoting, with a pending/resolved status lifecycle.
/// member_ids and snippets are JSON-encoded text columns.
fn migrate_promotions(conn: &Connection) -> SqlResult<()> {
conn.execute_batch(
r#"
CREATE TABLE IF NOT EXISTS promotion_candidates (
id TEXT PRIMARY KEY,
member_ids TEXT NOT NULL,
snippets TEXT NOT NULL,
avg_core_strength REAL NOT NULL,
avg_importance REAL NOT NULL,
time_span_days REAL NOT NULL,
internal_link_count INTEGER NOT NULL,
suggested_target TEXT NOT NULL,
summary TEXT,
status TEXT NOT NULL DEFAULT 'pending',
created_at TEXT NOT NULL,
resolved_at TEXT
);
"#
)?;
Ok(())
}
/// Upserts a promotion candidate. member_ids and snippets are serialized
/// to JSON; serialization failure degrades to an empty string rather
/// than erroring.
pub fn store_promotion_candidate(&self, candidate: &crate::promotion::PromotionCandidate) -> Result<(), rusqlite::Error> {
self.conn.execute(
"INSERT OR REPLACE INTO promotion_candidates (id, member_ids, snippets, avg_core_strength, avg_importance, time_span_days, internal_link_count, suggested_target, summary, status, created_at) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11)",
rusqlite::params![
candidate.id,
serde_json::to_string(&candidate.member_ids).unwrap_or_default(),
serde_json::to_string(&candidate.snippets).unwrap_or_default(),
candidate.avg_core_strength,
candidate.avg_importance,
candidate.time_span_days,
candidate.internal_link_count,
candidate.suggested_target,
candidate.summary,
candidate.status,
candidate.created_at.to_rfc3339(),
],
)?;
Ok(())
}
/// Loads all candidates still in the 'pending' state. Malformed JSON in
/// member_ids/snippets degrades to empty vectors; an unparseable
/// created_at falls back to "now" rather than failing the row.
pub fn get_pending_promotions(&self) -> Result<Vec<crate::promotion::PromotionCandidate>, rusqlite::Error> {
let mut stmt = self.conn.prepare(
"SELECT id, member_ids, snippets, avg_core_strength, avg_importance, time_span_days, internal_link_count, suggested_target, summary, status, created_at FROM promotion_candidates WHERE status = 'pending'"
)?;
let rows = stmt.query_map([], |row| {
let member_ids_json: String = row.get(1)?;
let snippets_json: String = row.get(2)?;
let created_at_str: String = row.get(10)?;
Ok(crate::promotion::PromotionCandidate {
id: row.get(0)?,
member_ids: serde_json::from_str(&member_ids_json).unwrap_or_default(),
snippets: serde_json::from_str(&snippets_json).unwrap_or_default(),
avg_core_strength: row.get(3)?,
avg_importance: row.get(4)?,
time_span_days: row.get(5)?,
internal_link_count: row.get::<_, i64>(6)? as usize,
suggested_target: row.get(7)?,
summary: row.get(8)?,
status: row.get(9)?,
created_at: chrono::DateTime::parse_from_rfc3339(&created_at_str)
.map(|dt| dt.with_timezone(&chrono::Utc))
.unwrap_or_else(|_| chrono::Utc::now()),
})
})?;
rows.collect()
}
pub fn resolve_promotion(&self, id: &str, status: &str) -> Result<(), rusqlite::Error> {
self.conn.execute(
"UPDATE promotion_candidates SET status = ?1, resolved_at = ?2 WHERE id = ?3",
rusqlite::params![status, chrono::Utc::now().to_rfc3339(), id],
)?;
Ok(())
}
/// Returns true when a non-dismissed candidate already covers this
/// cluster: an overlap of at least 50% of the smaller member set counts
/// as "already seen". Rows with unparseable member_ids JSON are skipped.
pub fn is_cluster_already_promoted(&self, member_ids: &[String]) -> Result<bool, rusqlite::Error> {
    use std::collections::HashSet;
    let candidate: HashSet<&str> = member_ids.iter().map(String::as_str).collect();
    let mut stmt = self.conn.prepare(
        "SELECT member_ids FROM promotion_candidates WHERE status != 'dismissed'"
    )?;
    let stored = stmt.query_map([], |row| row.get::<_, String>(0))?;
    for json in stored {
        let json = json?;
        let Ok(ids) = serde_json::from_str::<Vec<String>>(&json) else {
            continue;
        };
        let existing: HashSet<&str> = ids.iter().map(String::as_str).collect();
        let smaller = candidate.len().min(existing.len());
        let shared = candidate.intersection(&existing).count();
        if smaller > 0 && shared * 2 >= smaller {
            return Ok(true);
        }
    }
    Ok(false)
}
/// Rebuilds the FTS index with CJK-aware pre-tokenization whenever the
/// stored `fts_cjk_version` differs from the current constant. An empty
/// memories table just records the version and returns.
fn rebuild_fts_if_needed(conn: &Connection) -> SqlResult<()> {
const FTS_CJK_VERSION: &str = "1";
// `.ok()` maps both "no such row" and query errors to None, forcing a rebuild.
let current: Option<String> = conn
.query_row(
"SELECT value FROM engram_meta WHERE key = 'fts_cjk_version'",
[],
|row| row.get(0),
)
.ok();
if current.as_deref() == Some(FTS_CJK_VERSION) {
return Ok(()); }
let count: i64 = conn.query_row("SELECT COUNT(*) FROM memories", [], |row| row.get(0))?;
if count == 0 {
conn.execute(
"INSERT OR REPLACE INTO engram_meta VALUES ('fts_cjk_version', ?1)",
params![FTS_CJK_VERSION],
)?;
return Ok(());
}
// NOTE(review): if any statement below fails after BEGIN IMMEDIATE, no
// explicit ROLLBACK is issued — the open transaction is only released
// when the connection is dropped. Confirm callers treat Err as fatal.
conn.execute_batch("BEGIN IMMEDIATE")?;
conn.execute("DELETE FROM memories_fts", [])?;
let mut stmt = conn.prepare("SELECT rowid, content FROM memories")?;
let rows: Vec<(i64, String)> = stmt
.query_map([], |row| Ok((row.get(0)?, row.get(1)?)))?
.filter_map(|r| r.ok())
.collect();
// FTS rowids mirror the memories rowids so joins stay trivial.
for (rowid, content) in &rows {
let tokenized = tokenize_cjk_boundaries(content);
conn.execute(
"INSERT INTO memories_fts(rowid, content) VALUES (?1, ?2)",
params![rowid, tokenized],
)?;
}
conn.execute(
"INSERT OR REPLACE INTO engram_meta VALUES ('fts_cjk_version', ?1)",
params![FTS_CJK_VERSION],
)?;
conn.execute_batch("COMMIT")?;
eprintln!("[engram] Rebuilt FTS index with CJK tokenization for {} memories", rows.len());
Ok(())
}
/// Inserts a new memory row, seeds its access log with the creation
/// time, and writes a CJK-tokenized shadow row into the FTS index —
/// all inside one transaction.
pub fn add(&mut self, record: &MemoryRecord, namespace: &str) -> Result<(), rusqlite::Error> {
let tx = self.conn.transaction()?;
// Metadata is stored as JSON text; serialization failure degrades to NULL.
let metadata_json = record.metadata.as_ref().and_then(|m| serde_json::to_string(m).ok());
tx.execute(
r#"
INSERT INTO memories (
id, content, memory_type, layer, created_at,
working_strength, core_strength, importance, pinned,
consolidation_count, last_consolidated, source,
contradicts, contradicted_by, metadata, namespace
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
"#,
params![
record.id,
record.content,
record.memory_type.to_string(),
record.layer.to_string(),
datetime_to_f64(&record.created_at),
record.working_strength,
record.core_strength,
record.importance,
record.pinned as i32,
record.consolidation_count,
record.last_consolidated.map(|dt| datetime_to_f64(&dt)),
record.source,
record.contradicts.as_ref().unwrap_or(&String::new()),
record.contradicted_by.as_ref().unwrap_or(&String::new()),
metadata_json,
namespace,
],
)?;
// Creation counts as the first access.
tx.execute(
"INSERT INTO access_log (memory_id, accessed_at) VALUES (?, ?)",
params![record.id, datetime_to_f64(&record.created_at)],
)?;
// FTS is manually managed (no triggers): mirror the memories rowid.
let tokenized = tokenize_cjk_boundaries(&record.content);
let rowid: i64 = tx.query_row(
"SELECT rowid FROM memories WHERE id = ?",
params![record.id],
|row| row.get(0),
)?;
tx.execute(
"INSERT INTO memories_fts(rowid, content) VALUES (?, ?)",
params![rowid, tokenized],
)?;
tx.commit()?;
Ok(())
}
/// Loads one memory by id with its full access history attached.
/// Returns Ok(None) for an unknown id. No deleted_at/superseded_by
/// filter is applied here, so soft-deleted rows are still returned.
pub fn get(&self, id: &str) -> Result<Option<MemoryRecord>, rusqlite::Error> {
// Fetched up front; the clone inside the closure satisfies FnMut capture.
let access_times = self.get_access_times(id)?;
self.conn
.query_row(
"SELECT * FROM memories WHERE id = ?",
params![id],
|row| self.row_to_record(row, access_times.clone()),
)
.optional()
}
/// Returns every live memory (not soft-deleted, not superseded) across
/// all namespaces, each with its access history attached.
pub fn all(&self) -> Result<Vec<MemoryRecord>, rusqlite::Error> {
let mut stmt = self.conn.prepare("SELECT * FROM memories WHERE deleted_at IS NULL AND (superseded_by IS NULL OR superseded_by = '')")?;
let rows = stmt.query_map([], |row| {
let id: String = row.get("id")?;
// Access-log failures degrade to an empty history, not an error.
let access_times = self.get_access_times(&id).unwrap_or_default();
self.row_to_record(row, access_times)
})?;
rows.collect()
}
/// Persists all mutable fields of `record` and refreshes its FTS row.
/// Opens its own IMMEDIATE transaction only when the connection is in
/// autocommit mode (i.e. no caller-owned transaction is active);
/// COMMIT errors propagate, ROLLBACK errors are ignored.
pub fn update(&mut self, record: &MemoryRecord) -> Result<(), rusqlite::Error> {
let metadata_json = record.metadata.as_ref().and_then(|m| serde_json::to_string(m).ok());
let needs_tx = self.conn.is_autocommit();
if needs_tx {
self.conn.execute_batch("BEGIN IMMEDIATE")?;
}
let result = self.update_inner(record, &metadata_json);
if needs_tx {
match &result {
Ok(_) => self.conn.execute_batch("COMMIT")?,
Err(_) => { let _ = self.conn.execute_batch("ROLLBACK"); }
}
}
result
}
/// Transaction-agnostic body of `update`: rewrites the memories row,
/// then replaces the FTS shadow row (delete + re-insert). FTS upkeep is
/// best-effort; a "malformed" FTS index triggers a rebuild attempt.
fn update_inner(&self, record: &MemoryRecord, metadata_json: &Option<String>) -> Result<(), rusqlite::Error> {
let rowid: i64 = self.conn.query_row(
"SELECT rowid FROM memories WHERE id = ?",
params![record.id],
|row| row.get(0),
)?;
// NOTE(review): namespace and created_at are intentionally not touched here.
self.conn.execute(
r#"
UPDATE memories SET
content = ?, memory_type = ?, layer = ?,
working_strength = ?, core_strength = ?, importance = ?,
pinned = ?, consolidation_count = ?, last_consolidated = ?,
source = ?, contradicts = ?, contradicted_by = ?, metadata = ?
WHERE id = ?
"#,
params![
record.content,
record.memory_type.to_string(),
record.layer.to_string(),
record.working_strength,
record.core_strength,
record.importance,
record.pinned as i32,
record.consolidation_count,
record.last_consolidated.map(|dt| datetime_to_f64(&dt)),
record.source,
record.contradicts.as_ref().unwrap_or(&String::new()),
record.contradicted_by.as_ref().unwrap_or(&String::new()),
metadata_json,
record.id,
],
)?;
match self.conn.execute("DELETE FROM memories_fts WHERE rowid = ?", params![rowid]) {
Ok(_) => {},
Err(e) if e.to_string().contains("malformed") => {
// Corrupt FTS shadow table: ask FTS5 to rebuild, then retry the delete.
eprintln!("[engram] FTS corruption detected during update, rebuilding index...");
let _ = self.conn.execute(
"INSERT INTO memories_fts(memories_fts) VALUES('rebuild')", []
);
let _ = self.conn.execute("DELETE FROM memories_fts WHERE rowid = ?", params![rowid]);
}
Err(_) => {} } // other FTS delete errors are deliberately swallowed
let tokenized = tokenize_cjk_boundaries(&record.content);
// Best-effort re-insert; the memories row update above is authoritative.
let _ = self.conn.execute(
"INSERT INTO memories_fts(rowid, content) VALUES (?, ?)",
params![rowid, tokenized],
);
Ok(())
}
/// Hard-deletes a memory and its FTS shadow row. Opens its own IMMEDIATE
/// transaction only when the connection is in autocommit mode; COMMIT
/// errors propagate, ROLLBACK errors are ignored.
pub fn delete(&mut self, id: &str) -> Result<(), rusqlite::Error> {
    let own_tx = self.conn.is_autocommit();
    if own_tx {
        self.conn.execute_batch("BEGIN IMMEDIATE")?;
    }
    let outcome = self.delete_inner(id);
    if own_tx {
        if outcome.is_ok() {
            self.conn.execute_batch("COMMIT")?;
        } else {
            let _ = self.conn.execute_batch("ROLLBACK");
        }
    }
    outcome
}
/// Transaction-agnostic body of `delete`: best-effort removal of the FTS
/// shadow row (lookup failures ignored), then the authoritative delete of
/// the memories row.
fn delete_inner(&self, id: &str) -> Result<(), rusqlite::Error> {
    let fts_rowid = self
        .conn
        .query_row(
            "SELECT rowid FROM memories WHERE id = ?",
            params![id],
            |row| row.get::<_, i64>(0),
        )
        .ok();
    if let Some(rowid) = fts_rowid {
        let _ = self.conn.execute(
            "DELETE FROM memories_fts WHERE rowid = ?",
            params![rowid],
        );
    }
    self.conn.execute("DELETE FROM memories WHERE id = ?", params![id])?;
    Ok(())
}
/// Replaces a memory's content and metadata (and refreshes its FTS row)
/// without touching strengths, importance, or timestamps. Opens its own
/// IMMEDIATE transaction only when the connection is in autocommit mode;
/// COMMIT errors propagate, ROLLBACK errors are ignored.
pub fn update_content(
&mut self,
id: &str,
new_content: &str,
metadata: Option<serde_json::Value>,
) -> Result<(), rusqlite::Error> {
// Serialization failure degrades to NULL metadata.
let metadata_json = metadata.and_then(|m| serde_json::to_string(&m).ok());
let needs_tx = self.conn.is_autocommit();
if needs_tx {
self.conn.execute_batch("BEGIN IMMEDIATE")?;
}
let result = self.update_content_inner(id, new_content, &metadata_json);
if needs_tx {
match &result {
Ok(_) => self.conn.execute_batch("COMMIT")?,
Err(_) => { let _ = self.conn.execute_batch("ROLLBACK"); }
}
}
result
}
/// Transaction-agnostic body of `update_content`: rewrites content and
/// metadata, then best-effort replaces the FTS shadow row (delete +
/// re-insert of the re-tokenized content).
fn update_content_inner(&self, id: &str, new_content: &str, metadata_json: &Option<String>) -> Result<(), rusqlite::Error> {
let rowid: i64 = self.conn.query_row(
"SELECT rowid FROM memories WHERE id = ?",
params![id],
|row| row.get(0),
)?;
self.conn.execute(
"UPDATE memories SET content = ?, metadata = ? WHERE id = ?",
params![new_content, metadata_json, id],
)?;
// FTS upkeep is best-effort; errors here are deliberately swallowed.
let _ = self.conn.execute("DELETE FROM memories_fts WHERE rowid = ?", params![rowid]);
let tokenized = tokenize_cjk_boundaries(new_content);
let _ = self.conn.execute(
"INSERT INTO memories_fts(rowid, content) VALUES (?, ?)",
params![rowid, tokenized],
);
Ok(())
}
/// Fetches up to `limit` live memories (not soft-deleted, not superseded)
/// of one type, highest importance first. `namespace` of Some("*")
/// searches all namespaces; None falls back to "default".
///
/// The two previously duplicated branches (wildcard vs. scoped) are
/// collapsed into one statement: the `?2 = '*'` predicate disables the
/// namespace filter when the wildcard is passed.
pub fn search_by_type_ns(
    &self,
    memory_type: MemoryType,
    namespace: Option<&str>,
    limit: usize,
) -> Result<Vec<MemoryRecord>, rusqlite::Error> {
    let ns = namespace.unwrap_or("default");
    let mut stmt = self.conn.prepare(
        "SELECT * FROM memories WHERE memory_type = ?1 AND (?2 = '*' OR namespace = ?2) \
         AND deleted_at IS NULL AND (superseded_by IS NULL OR superseded_by = '') \
         ORDER BY importance DESC LIMIT ?3",
    )?;
    let rows = stmt.query_map(params![memory_type.to_string(), ns, limit as i64], |row| {
        let id: String = row.get("id")?;
        // Access-log failures degrade to an empty history.
        let access_times = self.get_access_times(&id).unwrap_or_default();
        self.row_to_record(row, access_times)
    })?;
    rows.collect()
}
pub fn record_access(&mut self, id: &str) -> Result<(), rusqlite::Error> {
self.conn.execute(
"INSERT INTO access_log (memory_id, accessed_at) VALUES (?, ?)",
params![id, now_f64()],
)?;
Ok(())
}
pub fn get_access_times(&self, id: &str) -> Result<Vec<DateTime<Utc>>, rusqlite::Error> {
let mut stmt = self
.conn
.prepare("SELECT accessed_at FROM access_log WHERE memory_id = ? ORDER BY accessed_at")?;
let rows = stmt.query_map(params![id], |row| {
let ts: f64 = row.get(0)?;
Ok(f64_to_datetime(ts))
})?;
rows.collect()
}
/// Full-text search over live memories. The query is CJK-segmented,
/// split into unicode61-style tokens, and rebuilt as an OR-of-phrases
/// FTS5 MATCH expression; double-quoting each token keeps any FTS5
/// operators in user input inert (tokens are alphanumeric/CJK only, so
/// no embedded quotes are possible). Results are ordered by FTS rank.
pub fn search_fts(&self, query: &str, limit: usize) -> Result<Vec<MemoryRecord>, rusqlite::Error> {
let tokenized = tokenize_cjk_boundaries(query);
let words = tokenize_like_unicode61(&tokenized);
if words.is_empty() {
return Ok(vec![]);
}
let fts_query = words.iter().map(|w| format!("\"{}\"", w)).collect::<Vec<_>>().join(" OR ");
let mut stmt = self.conn.prepare(
r#"
SELECT m.* FROM memories m
JOIN memories_fts f ON m.rowid = f.rowid
WHERE memories_fts MATCH ? AND m.deleted_at IS NULL
AND (m.superseded_by IS NULL OR m.superseded_by = '')
ORDER BY rank LIMIT ?
"#,
)?;
let rows = stmt.query_map(params![fts_query, limit as i64], |row| {
let id: String = row.get("id")?;
let access_times = self.get_access_times(&id).unwrap_or_default();
self.row_to_record(row, access_times)
})?;
rows.collect()
}
/// Returns the `limit` most recently created live memories (not
/// soft-deleted, not superseded). `namespace` of Some("*") spans all
/// namespaces; None falls back to "default".
pub fn fetch_recent(
&self,
limit: usize,
namespace: Option<&str>,
) -> Result<Vec<MemoryRecord>, rusqlite::Error> {
let ns = namespace.unwrap_or("default");
if ns == "*" {
let mut stmt = self.conn.prepare(
"SELECT * FROM memories WHERE (superseded_by IS NULL OR superseded_by = '') AND deleted_at IS NULL ORDER BY created_at DESC LIMIT ?"
)?;
let rows = stmt.query_map(params![limit as i64], |row| {
let id: String = row.get("id")?;
// Access-log failures degrade to an empty history.
let access_times = self.get_access_times(&id).unwrap_or_default();
self.row_to_record(row, access_times)
})?;
rows.collect()
} else {
let mut stmt = self.conn.prepare(
"SELECT * FROM memories WHERE namespace = ? AND (superseded_by IS NULL OR superseded_by = '') AND deleted_at IS NULL ORDER BY created_at DESC LIMIT ?"
)?;
let rows = stmt.query_map(params![ns, limit as i64], |row| {
let id: String = row.get("id")?;
let access_times = self.get_access_times(&id).unwrap_or_default();
self.row_to_record(row, access_times)
})?;
rows.collect()
}
}
/// Returns every live memory of the given type, across all namespaces,
/// each with its access history attached.
pub fn search_by_type(&self, memory_type: MemoryType) -> Result<Vec<MemoryRecord>, rusqlite::Error> {
    let sql = "SELECT * FROM memories WHERE memory_type = ? AND deleted_at IS NULL AND (superseded_by IS NULL OR superseded_by = '')";
    let mut stmt = self.conn.prepare(sql)?;
    let records = stmt.query_map(params![memory_type.to_string()], |row| {
        let mem_id: String = row.get("id")?;
        let history = self.get_access_times(&mem_id).unwrap_or_default();
        self.row_to_record(row, history)
    })?;
    records.collect()
}
/// Lists ids linked FROM `memory_id` with positive strength (outgoing
/// direction only).
pub fn get_hebbian_neighbors(&self, memory_id: &str) -> Result<Vec<String>, rusqlite::Error> {
    let mut stmt = self.conn.prepare(
        "SELECT target_id FROM hebbian_links WHERE source_id = ? AND strength > 0"
    )?;
    let neighbors = stmt.query_map(params![memory_id], |row| row.get::<_, String>(0))?;
    neighbors.collect()
}
/// Returns (neighbor_id, strength) for every positive-strength link that
/// touches `memory_id` in either direction; the CASE expression picks
/// whichever endpoint is not `memory_id`.
pub fn get_hebbian_links_weighted(&self, memory_id: &str) -> Result<Vec<(String, f64)>, rusqlite::Error> {
let mut stmt = self.conn.prepare(
"SELECT CASE WHEN source_id = ?1 THEN target_id ELSE source_id END, strength \
FROM hebbian_links WHERE (source_id = ?1 OR target_id = ?1) AND strength > 0"
)?;
let rows = stmt.query_map(params![memory_id], |row| {
Ok((row.get::<_, String>(0)?, row.get::<_, f64>(1)?))
})?;
rows.collect()
}
/// Records that two memories fired together. Returns true only when this
/// call promoted the pair into a real (strength 1.0) link.
///
/// Pairs are stored under a canonical (lexicographically smaller,
/// larger) ordering so each pair has a single counting row. Three cases:
/// * active link (strength > 0): strength +0.1 capped at 1.0, count
///   bumped on both row orientations;
/// * counting row (strength == 0): bump count; once it reaches
///   `threshold`, promote to strength 1.0 and insert the mirror
///   (reverse-direction) row;
/// * no row yet: create a strength-0 counting row.
pub fn record_coactivation(
&mut self,
id1: &str,
id2: &str,
threshold: i32,
) -> Result<bool, rusqlite::Error> {
// Canonical ordering makes lookups direction-independent.
let (id1, id2) = if id1 < id2 { (id1, id2) } else { (id2, id1) };
let existing: Option<(f64, i32)> = self.conn
.query_row(
"SELECT strength, coactivation_count FROM hebbian_links WHERE source_id = ? AND target_id = ?",
params![id1, id2],
|row| Ok((row.get(0)?, row.get(1)?)),
)
.optional()?;
match existing {
Some((strength, _count)) if strength > 0.0 => {
let new_strength = (strength + 0.1).min(1.0);
// Keep both orientations in sync (the reverse row exists only
// after promotion; updating a missing row is a no-op).
self.conn.execute(
"UPDATE hebbian_links SET strength = ?, coactivation_count = coactivation_count + 1 WHERE source_id = ? AND target_id = ?",
params![new_strength, id1, id2],
)?;
self.conn.execute(
"UPDATE hebbian_links SET strength = ?, coactivation_count = coactivation_count + 1 WHERE source_id = ? AND target_id = ?",
params![new_strength, id2, id1],
)?;
Ok(false)
}
Some((_, count)) => {
let new_count = count + 1;
if new_count >= threshold {
// Promotion: forward row gets strength 1.0, mirror row is created.
self.conn.execute(
"UPDATE hebbian_links SET strength = 1.0, coactivation_count = ? WHERE source_id = ? AND target_id = ?",
params![new_count, id1, id2],
)?;
// NOTE(review): the mirror row gets a fresh created_at and
// default values for the other columns — confirm that
// asymmetry with the forward row is intended.
self.conn.execute(
"INSERT OR REPLACE INTO hebbian_links (source_id, target_id, strength, coactivation_count, created_at) VALUES (?, ?, 1.0, ?, ?)",
params![id2, id1, new_count, now_f64()],
)?;
Ok(true)
} else {
self.conn.execute(
"UPDATE hebbian_links SET coactivation_count = ? WHERE source_id = ? AND target_id = ?",
params![new_count, id1, id2],
)?;
Ok(false)
}
}
None => {
// First sighting: strength 0 marks a counting row, not a real link.
self.conn.execute(
"INSERT INTO hebbian_links (source_id, target_id, strength, coactivation_count, created_at) VALUES (?, ?, 0.0, 1, ?)",
params![id1, id2, now_f64()],
)?;
Ok(false)
}
}
}
/// Multiply the strength of every active link by `factor`, then prune
/// links that fell below 0.1. Returns the number of pruned rows.
pub fn decay_hebbian_links(&mut self, factor: f64) -> Result<usize, rusqlite::Error> {
    self.conn.execute(
        "UPDATE hebbian_links SET strength = strength * ? WHERE strength > 0",
        params![factor],
    )?;
    // Latent links (strength exactly 0) are deliberately kept: they hold the
    // co-activation counters, not decayed strength.
    let pruned = self.conn.execute(
        "DELETE FROM hebbian_links WHERE strength > 0 AND strength < 0.1",
        [],
    )?;
    Ok(pruned)
}
/// Transfer the donor's Hebbian links onto `target_id` (e.g. after a memory
/// merge), then delete all of the donor's links.
///
/// For links both memories share, the stronger weight wins; links only the
/// donor had are re-created pointing at the target. Returns the number of
/// donor links processed (excluding any donor->target link itself).
pub fn merge_hebbian_links(
    &mut self,
    donor_id: &str,
    target_id: &str,
) -> Result<usize, rusqlite::Error> {
    let links = self.get_hebbian_links_weighted(donor_id)?;
    let mut transferred = 0;
    for (other_id, weight) in &links {
        // Never create a self-link on the target.
        if other_id == target_id {
            continue;
        }
        let existing_weight: Option<f64> = self.conn.query_row(
            "SELECT strength FROM hebbian_links WHERE \
(source_id = ?1 AND target_id = ?2) OR (source_id = ?2 AND target_id = ?1)",
            params![target_id, other_id],
            |row| row.get(0),
        ).optional()?;
        match existing_weight {
            Some(existing) => {
                // Keep the stronger of the two weights.
                let max_weight = existing.max(*weight);
                self.conn.execute(
                    "UPDATE hebbian_links SET strength = ?1 WHERE \
(source_id = ?2 AND target_id = ?3) OR (source_id = ?3 AND target_id = ?2)",
                    params![max_weight, target_id, other_id],
                )?;
            }
            None => {
                self.conn.execute(
                    "INSERT OR IGNORE INTO hebbian_links \
(source_id, target_id, strength, coactivation_count, \
temporal_forward, temporal_backward, direction, created_at, namespace) \
VALUES (?1, ?2, ?3, 1, 0, 0, 'bidirectional', ?4, 'default')",
                    params![target_id, other_id, weight, now_f64()],
                )?;
            }
        }
        transferred += 1;
    }
    // Remove every remaining link that still references the donor.
    self.conn.execute(
        "DELETE FROM hebbian_links WHERE source_id = ?1 OR target_id = ?1",
        params![donor_id],
    )?;
    Ok(transferred)
}
/// Decay link strengths with a per-`signal_source` factor ('corecall',
/// 'multi', or anything else), then prune links below 0.1.
/// Returns the number of pruned rows.
pub fn decay_hebbian_links_differential(
    &mut self,
    decay_corecall: f64,
    decay_multi: f64,
    decay_single: f64,
) -> Result<usize, rusqlite::Error> {
    self.conn.execute(
        "UPDATE hebbian_links SET strength = strength * CASE \
WHEN signal_source = 'corecall' THEN ?1 \
WHEN signal_source = 'multi' THEN ?2 \
ELSE ?3 \
END \
WHERE strength > 0",
        params![decay_corecall, decay_multi, decay_single],
    )?;
    let pruned = self.conn.execute(
        "DELETE FROM hebbian_links WHERE strength > 0 AND strength < 0.1",
        [],
    )?;
    Ok(pruned)
}
/// Map a `memories` row (plus its pre-fetched access history) into a
/// `MemoryRecord`.
///
/// Deliberately lenient: unknown enum strings fall back to defaults and
/// malformed metadata JSON degrades to `None`, so old or hand-edited rows
/// still load instead of poisoning whole-table queries.
fn row_to_record(
    &self,
    row: &rusqlite::Row,
    access_times: Vec<DateTime<Utc>>,
) -> SqlResult<MemoryRecord> {
    let memory_type_str: String = row.get("memory_type")?;
    let layer_str: String = row.get("layer")?;
    let created_at_f64: f64 = row.get("created_at")?;
    let last_consolidated_f64: Option<f64> = row.get("last_consolidated")?;
    let metadata_str: Option<String> = row.get("metadata")?;
    // Unknown type strings fall back to Factual.
    let memory_type = match memory_type_str.as_str() {
        "factual" => MemoryType::Factual,
        "episodic" => MemoryType::Episodic,
        "relational" => MemoryType::Relational,
        "emotional" => MemoryType::Emotional,
        "procedural" => MemoryType::Procedural,
        "opinion" => MemoryType::Opinion,
        "causal" => MemoryType::Causal,
        _ => MemoryType::Factual,
    };
    // Unknown layer strings fall back to Working.
    let layer = match layer_str.as_str() {
        "core" => MemoryLayer::Core,
        "working" => MemoryLayer::Working,
        "archive" => MemoryLayer::Archive,
        _ => MemoryLayer::Working,
    };
    let created_at = f64_to_datetime(created_at_f64);
    let last_consolidated = last_consolidated_f64.map(f64_to_datetime);
    let contradicts_str: String = row.get("contradicts")?;
    let contradicted_by_str: String = row.get("contradicted_by")?;
    // A failed read (e.g. a NULL value) degrades to an empty string here.
    let superseded_by_str: String = row.get("superseded_by").unwrap_or_default();
    // Malformed metadata JSON degrades to None instead of failing the row.
    let metadata = metadata_str
        .and_then(|s| serde_json::from_str(&s).ok());
    Ok(MemoryRecord {
        id: row.get("id")?,
        content: row.get("content")?,
        memory_type,
        layer,
        created_at,
        access_times,
        working_strength: row.get("working_strength")?,
        core_strength: row.get("core_strength")?,
        importance: row.get("importance")?,
        pinned: row.get::<_, i32>("pinned")? != 0,
        consolidation_count: row.get("consolidation_count")?,
        last_consolidated,
        source: row.get("source")?,
        // Empty strings in the link columns encode "no link".
        contradicts: if contradicts_str.is_empty() { None } else { Some(contradicts_str) },
        contradicted_by: if contradicted_by_str.is_empty() { None } else { Some(contradicted_by_str) },
        superseded_by: if superseded_by_str.is_empty() { None } else { Some(superseded_by_str) },
        metadata,
    })
}
/// Namespace of a memory, or `Ok(None)` when no row with this id exists.
pub fn get_namespace(&self, id: &str) -> Result<Option<String>, rusqlite::Error> {
    self.conn
        .query_row(
            "SELECT namespace FROM memories WHERE id = ?",
            params![id],
            |row| row.get(0),
        )
        .optional()
}
/// Mark `old_id` as superseded by `new_id`.
///
/// Rejects self-supersession, unknown ids on either side, and pairs living
/// in different namespaces; only then writes the `superseded_by` pointer.
pub fn supersede(&self, old_id: &str, new_id: &str) -> Result<(), crate::types::SupersessionError> {
    use crate::types::SupersessionError;
    if old_id == new_id {
        return Err(SupersessionError::SelfSupersession(old_id.to_string()));
    }
    if self.get(old_id).map_err(SupersessionError::Db)?.is_none() {
        return Err(SupersessionError::NotFound(old_id.to_string()));
    }
    if self.get(new_id).map_err(SupersessionError::Db)?.is_none() {
        return Err(SupersessionError::NotFound(new_id.to_string()));
    }
    // Supersession is only meaningful within a single namespace.
    let old_ns = self.get_namespace(old_id).map_err(SupersessionError::Db)?;
    let new_ns = self.get_namespace(new_id).map_err(SupersessionError::Db)?;
    if old_ns != new_ns {
        return Err(SupersessionError::CrossNamespace {
            old_ns: old_ns.unwrap_or_default(),
            new_ns: new_ns.unwrap_or_default(),
        });
    }
    self.conn.execute(
        "UPDATE memories SET superseded_by = ? WHERE id = ?",
        params![new_id, old_id],
    ).map_err(SupersessionError::Db)?;
    Ok(())
}
/// Mark every id in `old_ids` as superseded by `new_id`, atomically.
///
/// Validates the full batch first (existence, no self-supersession, same
/// namespace) and only then applies the updates inside a savepoint, so a
/// failure mid-batch rolls everything back. Returns the number of updated
/// ids.
pub fn supersede_bulk(&self, old_ids: &[&str], new_id: &str) -> Result<usize, crate::types::SupersessionError> {
    use crate::types::SupersessionError;
    if old_ids.is_empty() {
        return Ok(0);
    }
    if self.get(new_id).map_err(SupersessionError::Db)?.is_none() {
        return Err(SupersessionError::NotFound(new_id.to_string()));
    }
    let new_ns = self.get_namespace(new_id).map_err(SupersessionError::Db)?;
    // Validation pass: collect all invalid ids before touching any row.
    let mut invalid_ids = Vec::new();
    for &old_id in old_ids {
        if old_id == new_id {
            invalid_ids.push(old_id.to_string());
            continue;
        }
        match self.get(old_id).map_err(SupersessionError::Db)? {
            None => invalid_ids.push(old_id.to_string()),
            Some(_) => {
                let old_ns = self.get_namespace(old_id).map_err(SupersessionError::Db)?;
                if old_ns != new_ns {
                    return Err(SupersessionError::CrossNamespace {
                        old_ns: old_ns.unwrap_or_default(),
                        new_ns: new_ns.unwrap_or_default(),
                    });
                }
            }
        }
    }
    if !invalid_ids.is_empty() {
        return Err(SupersessionError::InvalidIds(invalid_ids));
    }
    // Write pass: savepoint (not BEGIN) so this nests inside any caller
    // transaction.
    self.conn.execute("SAVEPOINT supersede_bulk", []).map_err(SupersessionError::Db)?;
    let result = (|| {
        for &old_id in old_ids {
            self.conn.execute(
                "UPDATE memories SET superseded_by = ? WHERE id = ?",
                params![new_id, old_id],
            ).map_err(SupersessionError::Db)?;
        }
        Ok::<usize, SupersessionError>(old_ids.len())
    })();
    match result {
        Ok(count) => {
            self.conn.execute("RELEASE supersede_bulk", []).map_err(SupersessionError::Db)?;
            Ok(count)
        }
        Err(e) => {
            // Best-effort rollback; the original error is what matters.
            let _ = self.conn.execute("ROLLBACK TO supersede_bulk", []);
            let _ = self.conn.execute("RELEASE supersede_bulk", []);
            Err(e)
        }
    }
}
/// Clear a memory's supersession marker, making it "live" again.
/// Errors if the id does not exist.
pub fn unsupersede(&self, id: &str) -> Result<(), crate::types::SupersessionError> {
    use crate::types::SupersessionError;
    if self.get(id).map_err(SupersessionError::Db)?.is_none() {
        return Err(SupersessionError::NotFound(id.to_string()));
    }
    // Empty string (not NULL) is this schema's "not superseded" marker.
    self.conn.execute(
        "UPDATE memories SET superseded_by = '' WHERE id = ?",
        params![id],
    ).map_err(SupersessionError::Db)?;
    Ok(())
}
/// All non-deleted superseded memories, newest first, paired with the id
/// that superseded each. `None` namespace means all namespaces.
///
/// Note `superseded_by != ''` also excludes NULL values (SQL three-valued
/// logic), so only explicitly-superseded rows are returned.
pub fn list_superseded(&self, namespace: Option<&str>) -> Result<Vec<(MemoryRecord, String)>, rusqlite::Error> {
    let query = if let Some(ns) = namespace {
        let mut stmt = self.conn.prepare(
            "SELECT * FROM memories WHERE superseded_by != '' AND namespace = ? AND deleted_at IS NULL ORDER BY created_at DESC"
        )?;
        let rows = stmt.query_map(params![ns], |row| {
            let id: String = row.get("id")?;
            let access_times = self.get_access_times(&id).unwrap_or_default();
            let record = self.row_to_record(row, access_times)?;
            let superseded_by: String = row.get("superseded_by")?;
            Ok((record, superseded_by))
        })?;
        rows.collect::<Result<Vec<_>, _>>()?
    } else {
        let mut stmt = self.conn.prepare(
            "SELECT * FROM memories WHERE superseded_by != '' AND deleted_at IS NULL ORDER BY created_at DESC"
        )?;
        let rows = stmt.query_map([], |row| {
            let id: String = row.get("id")?;
            let access_times = self.get_access_times(&id).unwrap_or_default();
            let record = self.row_to_record(row, access_times)?;
            let superseded_by: String = row.get("superseded_by")?;
            Ok((record, superseded_by))
        })?;
        rows.collect::<Result<Vec<_>, _>>()?
    };
    Ok(query)
}
pub fn resolve_chain_head(&self, id: &str) -> Result<Option<String>, rusqlite::Error> {
let mut current = id.to_string();
let mut visited = std::collections::HashSet::new();
loop {
if !visited.insert(current.clone()) {
log::warn!("Supersession cycle detected involving {}", current);
return Ok(None);
}
match self.get(¤t)? {
Some(record) => match &record.superseded_by {
Some(next) => current = next.clone(),
None => return Ok(Some(current)),
},
None => return Ok(None), }
}
}
/// Full-text search over live memories, optionally scoped to a namespace
/// (`"*"` searches all namespaces; `None` defaults to "default").
///
/// The query is first segmented at CJK word boundaries, then split into
/// unicode61-style tokens; each token is quoted and OR-joined into the FTS
/// MATCH expression. Returns an empty vec for queries with no tokens.
pub fn search_fts_ns(
    &self,
    query: &str,
    limit: usize,
    namespace: Option<&str>,
) -> Result<Vec<MemoryRecord>, rusqlite::Error> {
    let tokenized = tokenize_cjk_boundaries(query);
    let words = tokenize_like_unicode61(&tokenized);
    if words.is_empty() {
        return Ok(vec![]);
    }
    // Quoting each token prevents it from being parsed as FTS syntax.
    let fts_query = words.iter().map(|w| format!("\"{}\"", w)).collect::<Vec<_>>().join(" OR ");
    let ns = namespace.unwrap_or("default");
    if ns == "*" {
        let mut stmt = self.conn.prepare(
            r#"
SELECT m.* FROM memories m
JOIN memories_fts f ON m.rowid = f.rowid
WHERE memories_fts MATCH ? AND m.deleted_at IS NULL
AND (m.superseded_by IS NULL OR m.superseded_by = '')
ORDER BY rank LIMIT ?
"#,
        )?;
        let rows = stmt.query_map(params![fts_query, limit as i64], |row| {
            let id: String = row.get("id")?;
            let access_times = self.get_access_times(&id).unwrap_or_default();
            self.row_to_record(row, access_times)
        })?;
        rows.collect()
    } else {
        let mut stmt = self.conn.prepare(
            r#"
SELECT m.* FROM memories m
JOIN memories_fts f ON m.rowid = f.rowid
WHERE memories_fts MATCH ? AND m.namespace = ? AND m.deleted_at IS NULL
AND (m.superseded_by IS NULL OR m.superseded_by = '')
ORDER BY rank LIMIT ?
"#,
        )?;
        let rows = stmt.query_map(params![fts_query, ns, limit as i64], |row| {
            let id: String = row.get("id")?;
            let access_times = self.get_access_times(&id).unwrap_or_default();
            self.row_to_record(row, access_times)
        })?;
        rows.collect()
    }
}
/// All live memories in a namespace; `"*"` delegates to `all()`, `None`
/// defaults to "default".
pub fn all_in_namespace(&self, namespace: Option<&str>) -> Result<Vec<MemoryRecord>, rusqlite::Error> {
    let ns = namespace.unwrap_or("default");
    if ns == "*" {
        return self.all();
    }
    let mut stmt = self.conn.prepare("SELECT * FROM memories WHERE namespace = ? AND deleted_at IS NULL AND (superseded_by IS NULL OR superseded_by = '')")?;
    let rows = stmt.query_map(params![ns], |row| {
        let id: String = row.get("id")?;
        let access_times = self.get_access_times(&id).unwrap_or_default();
        self.row_to_record(row, access_times)
    })?;
    rows.collect()
}
/// Normalize an embedding model name to canonical `provider/model` form.
///
/// Already-qualified names pass through unchanged; OpenAI embedding models
/// gain an `openai/` prefix; empty or "unknown" names map to
/// "unknown/legacy"; everything else is assumed to be a local Ollama model.
fn normalize_model_id(model: &str) -> String {
    // Already provider-qualified: keep as-is.
    if model.contains('/') {
        return model.to_string();
    }
    // OpenAI's embedding family.
    if model.starts_with("text-embedding") {
        return format!("openai/{}", model);
    }
    // Legacy rows with no recorded model.
    if model.is_empty() || model == "unknown" {
        return "unknown/legacy".to_string();
    }
    // Default assumption: a bare name is a local Ollama model.
    format!("ollama/{}", model)
}
fn validate_embedding(embedding: &[f32]) -> Result<(), rusqlite::Error> {
if embedding.is_empty() {
return Err(rusqlite::Error::InvalidParameterName(
"Empty embedding".to_string(),
));
}
if !embedding.iter().all(|f| f.is_finite()) {
return Err(rusqlite::Error::InvalidParameterName(
"Non-finite value in embedding (NaN or Inf)".to_string(),
));
}
Ok(())
}
/// Store (or replace) the embedding for a memory under a normalized model
/// id, serialized as little-endian f32 bytes.
///
/// # Errors
/// Rejects empty or non-finite embeddings via `validate_embedding`.
pub fn store_embedding(
    &mut self,
    memory_id: &str,
    embedding: &[f32],
    model: &str,
    dimensions: usize,
) -> Result<(), rusqlite::Error> {
    Self::validate_embedding(embedding)?;
    // Stored under the canonical provider/model form so later lookups agree.
    let model = Self::normalize_model_id(model);
    let bytes: Vec<u8> = embedding
        .iter()
        .flat_map(|f| f.to_le_bytes())
        .collect();
    // 4 bytes per f32; catches caller passing a wrong `dimensions`.
    debug_assert_eq!(bytes.len(), dimensions * 4,
        "Blob size mismatch: {} bytes for {} dimensions", bytes.len(), dimensions);
    // NOTE(review): created_at here is an RFC 3339 string, unlike the f64
    // timestamps used elsewhere — presumably matching this table's schema.
    let now = chrono::Utc::now().to_rfc3339();
    self.conn.execute(
        r#"
INSERT OR REPLACE INTO memory_embeddings (memory_id, model, embedding, dimensions, created_at)
VALUES (?, ?, ?, ?, ?)
"#,
        params![memory_id, model, bytes, dimensions as i64, now],
    )?;
    Ok(())
}
/// Load a memory's embedding for a (normalized) model id, or `None` if no
/// embedding is stored for that pair.
pub fn get_embedding(&self, memory_id: &str, model: &str) -> Result<Option<Vec<f32>>, rusqlite::Error> {
    let model = Self::normalize_model_id(model);
    let result: Option<Vec<u8>> = self.conn
        .query_row(
            "SELECT embedding FROM memory_embeddings WHERE memory_id = ? AND model = ?",
            params![memory_id, model],
            |row| row.get(0),
        )
        .optional()?;
    // Blob is little-endian f32s, the inverse of store_embedding's encoding.
    Ok(result.map(|bytes| bytes_to_f32_vec(&bytes)))
}
/// All `(memory_id, embedding)` pairs for a model, restricted to live
/// (non-deleted, non-superseded) memories.
pub fn get_all_embeddings(&self, model: &str) -> Result<Vec<(String, Vec<f32>)>, rusqlite::Error> {
    let model = Self::normalize_model_id(model);
    let mut stmt = self.conn.prepare(
        r#"SELECT e.memory_id, e.embedding FROM memory_embeddings e
JOIN memories m ON e.memory_id = m.id
WHERE e.model = ? AND m.deleted_at IS NULL
AND (m.superseded_by IS NULL OR m.superseded_by = '')"#
    )?;
    let rows = stmt.query_map(params![model], |row| {
        let memory_id: String = row.get(0)?;
        let bytes: Vec<u8> = row.get(1)?;
        Ok((memory_id, bytes_to_f32_vec(&bytes)))
    })?;
    rows.collect()
}
/// Like `get_all_embeddings`, but scoped to a namespace (`"*"` delegates to
/// the unscoped query; `None` defaults to "default").
pub fn get_embeddings_in_namespace(
    &self,
    namespace: Option<&str>,
    model: &str,
) -> Result<Vec<(String, Vec<f32>)>, rusqlite::Error> {
    let model = Self::normalize_model_id(model);
    let ns = namespace.unwrap_or("default");
    if ns == "*" {
        // Model is already normalized; get_all_embeddings normalizing again
        // is harmless (normalization is idempotent for qualified names).
        return self.get_all_embeddings(&model);
    }
    let mut stmt = self.conn.prepare(
        r#"
SELECT e.memory_id, e.embedding FROM memory_embeddings e
JOIN memories m ON e.memory_id = m.id
WHERE m.namespace = ? AND e.model = ? AND m.deleted_at IS NULL
AND (m.superseded_by IS NULL OR m.superseded_by = '')
"#
    )?;
    let rows = stmt.query_map(params![ns, model], |row| {
        let memory_id: String = row.get(0)?;
        let bytes: Vec<u8> = row.get(1)?;
        Ok((memory_id, bytes_to_f32_vec(&bytes)))
    })?;
    rows.collect()
}
/// Borrow the underlying SQLite connection (escape hatch for callers that
/// need raw SQL access).
pub fn conn(&self) -> &Connection {
    &self.conn
}
/// Tombstone a memory by stamping `deleted_at` (RFC 3339); the row and its
/// dependents stay in the database until `hard_delete_cascade`.
pub fn soft_delete(&self, id: &str) -> Result<(), rusqlite::Error> {
    let now = chrono::Utc::now().to_rfc3339();
    self.conn.execute(
        "UPDATE memories SET deleted_at = ?1 WHERE id = ?2",
        params![now, id],
    )?;
    Ok(())
}
pub fn hard_delete_cascade(&self, id: &str) -> Result<(), rusqlite::Error> {
self.conn.execute("DELETE FROM memory_embeddings WHERE memory_id = ?1", params![id])?;
self.conn.execute("DELETE FROM access_log WHERE memory_id = ?1", params![id])?;
self.conn.execute("DELETE FROM hebbian_links WHERE source_id = ?1 OR target_id = ?1", params![id])?;
self.conn.execute("DELETE FROM memory_entities WHERE memory_id = ?1", params![id])?;
self.conn.execute("DELETE FROM synthesis_provenance WHERE source_id = ?1 OR insight_id = ?1", params![id])?;
let rowid: Result<i64, _> = self.conn.query_row(
"SELECT rowid FROM memories WHERE id = ?", params![id], |row| row.get(0),
);
if let Ok(rowid) = rowid {
let _ = self.conn.execute("DELETE FROM memories_fts WHERE rowid = ?", params![rowid]);
}
self.conn.execute("DELETE FROM memories WHERE id = ?1", params![id])?;
Ok(())
}
/// All soft-deleted (tombstoned) memories, optionally scoped to a namespace
/// (`"*"` lists all namespaces; `None` defaults to "default").
pub fn list_deleted(&self, namespace: Option<&str>) -> Result<Vec<MemoryRecord>, rusqlite::Error> {
    let ns = namespace.unwrap_or("default");
    if ns == "*" {
        let mut stmt = self.conn.prepare("SELECT * FROM memories WHERE deleted_at IS NOT NULL")?;
        let rows = stmt.query_map([], |row| {
            let id: String = row.get("id")?;
            let access_times = self.get_access_times(&id).unwrap_or_default();
            self.row_to_record(row, access_times)
        })?;
        rows.collect()
    } else {
        let mut stmt = self.conn.prepare(
            "SELECT * FROM memories WHERE namespace = ? AND deleted_at IS NOT NULL"
        )?;
        let rows = stmt.query_map(params![ns], |row| {
            let id: String = row.get("id")?;
            let access_times = self.get_access_times(&id).unwrap_or_default();
            self.row_to_record(row, access_times)
        })?;
        rows.collect()
    }
}
/// Number of memories currently soft-deleted (tombstoned but not purged),
/// across all namespaces.
pub fn count_soft_deleted(&self) -> Result<usize, rusqlite::Error> {
    self.conn
        .query_row(
            "SELECT COUNT(*) FROM memories WHERE deleted_at IS NOT NULL",
            [],
            |row| row.get::<_, i64>(0),
        )
        .map(|count| count as usize)
}
/// Soft-deletion timestamp (RFC 3339 string) for a memory.
///
/// Returns `Ok(None)` both when the memory exists but is not soft-deleted
/// and when no row with this id exists — mirroring `get_namespace`'s
/// `.optional()` handling instead of surfacing `QueryReturnedNoRows` as a
/// hard error for unknown ids.
pub fn get_deleted_at(&self, id: &str) -> Result<Option<String>, rusqlite::Error> {
    let result: Option<Option<String>> = self.conn.query_row(
        "SELECT deleted_at FROM memories WHERE id = ?",
        params![id],
        |row| row.get(0),
    ).optional()?;
    // Outer None: row missing; inner None: deleted_at is NULL.
    Ok(result.flatten())
}
/// Delete the embedding stored for a memory under one (normalized) model.
pub fn delete_embedding(&mut self, memory_id: &str, model: &str) -> Result<(), rusqlite::Error> {
    let model = Self::normalize_model_id(model);
    self.conn.execute(
        "DELETE FROM memory_embeddings WHERE memory_id = ? AND model = ?",
        params![memory_id, model],
    )?;
    Ok(())
}
/// Delete every embedding (all models) stored for a memory.
pub fn delete_all_embeddings(&mut self, memory_id: &str) -> Result<(), rusqlite::Error> {
    self.conn.execute(
        "DELETE FROM memory_embeddings WHERE memory_id = ?",
        params![memory_id],
    )?;
    Ok(())
}
pub fn get_memories_without_embeddings(&self, model: &str) -> Result<Vec<String>, rusqlite::Error> {
let mut stmt = self.conn.prepare(
r#"
SELECT m.id FROM memories m
LEFT JOIN memory_embeddings e ON m.id = e.memory_id AND e.model = ?
WHERE e.memory_id IS NULL
"#
)?;
let rows = stmt.query_map(params![model], |row| row.get(0))?;
rows.collect()
}
/// Aggregate embedding coverage: memory totals, count of embedded memories,
/// the most common model, and a sampled dimensionality.
pub fn embedding_stats(&self) -> Result<EmbeddingStats, rusqlite::Error> {
    let total_memories: usize = self.conn.query_row(
        "SELECT COUNT(*) FROM memories",
        [],
        |row| row.get(0),
    )?;
    let embedded_count: usize = self.conn.query_row(
        "SELECT COUNT(DISTINCT memory_id) FROM memory_embeddings",
        [],
        |row| row.get(0),
    )?;
    // Dominant model by stored-row count.
    let model: Option<String> = self.conn.query_row(
        "SELECT model FROM memory_embeddings GROUP BY model ORDER BY COUNT(*) DESC LIMIT 1",
        [],
        |row| row.get(0),
    ).optional()?;
    // NOTE(review): dimensions come from whichever row SQLite returns first
    // (no ORDER BY), which may not belong to the dominant model above.
    let dimensions: Option<usize> = self.conn.query_row(
        "SELECT dimensions FROM memory_embeddings LIMIT 1",
        [],
        |row| row.get::<_, i64>(0).map(|d| d as usize),
    ).optional()?;
    Ok(EmbeddingStats {
        total_memories,
        embedded_count,
        model,
        dimensions,
    })
}
/// Grant (or overwrite) an agent's permission on a namespace, recording who
/// granted it and when.
pub fn grant_permission(
    &mut self,
    agent_id: &str,
    namespace: &str,
    permission: Permission,
    granted_by: &str,
) -> Result<(), rusqlite::Error> {
    // INSERT OR REPLACE: one effective grant per (agent, namespace) pair.
    self.conn.execute(
        r#"
INSERT OR REPLACE INTO engram_acl (agent_id, namespace, permission, granted_by, created_at)
VALUES (?, ?, ?, ?, ?)
"#,
        params![
            agent_id,
            namespace,
            permission.to_string(),
            granted_by,
            now_f64(),
        ],
    )?;
    Ok(())
}
/// Remove an agent's explicit grant on a namespace (no-op if none exists).
pub fn revoke_permission(&mut self, agent_id: &str, namespace: &str) -> Result<(), rusqlite::Error> {
    self.conn.execute(
        "DELETE FROM engram_acl WHERE agent_id = ? AND namespace = ?",
        params![agent_id, namespace],
    )?;
    Ok(())
}
/// Check whether `agent_id` holds `required` permission on `namespace`.
///
/// Resolution order: an explicit grant for the namespace, then a `'*'`
/// wildcard grant, then built-in defaults (anyone may read "global"; an
/// agent may read/write its own namespace). A stored permission string
/// that fails to parse is skipped and resolution continues.
pub fn check_permission(
    &self,
    agent_id: &str,
    namespace: &str,
    required: Permission,
) -> Result<bool, rusqlite::Error> {
    let direct: Option<String> = self.conn
        .query_row(
            "SELECT permission FROM engram_acl WHERE agent_id = ? AND namespace = ?",
            params![agent_id, namespace],
            |row| row.get(0),
        )
        .optional()?;
    if let Some(perm_str) = direct {
        if let Ok(perm) = perm_str.parse::<Permission>() {
            return Ok(Self::permission_allows(perm, required));
        }
    }
    let wildcard: Option<String> = self.conn
        .query_row(
            "SELECT permission FROM engram_acl WHERE agent_id = ? AND namespace = '*'",
            params![agent_id],
            |row| row.get(0),
        )
        .optional()?;
    if let Some(perm_str) = wildcard {
        if let Ok(perm) = perm_str.parse::<Permission>() {
            return Ok(Self::permission_allows(perm, required));
        }
    }
    // Built-in default: the global namespace is world-readable.
    if namespace == "global" && matches!(required, Permission::Read) {
        return Ok(true);
    }
    // Built-in default: an agent owns its self-named namespace.
    if namespace == agent_id && matches!(required, Permission::Write | Permission::Read) {
        return Ok(true);
    }
    Ok(false)
}
/// True when the `granted` permission level satisfies the `required` one.
fn permission_allows(granted: Permission, required: Permission) -> bool {
    match required {
        Permission::Read => granted.can_read(),
        Permission::Write => granted.can_write(),
        Permission::Admin => granted.is_admin(),
    }
}
/// All explicit ACL grants for an agent.
pub fn list_permissions(&self, agent_id: &str) -> Result<Vec<AclEntry>, rusqlite::Error> {
    let mut stmt = self.conn.prepare(
        "SELECT agent_id, namespace, permission, granted_by, created_at FROM engram_acl WHERE agent_id = ?"
    )?;
    let rows = stmt.query_map(params![agent_id], |row| {
        let perm_str: String = row.get(2)?;
        let created_at_f64: f64 = row.get(4)?;
        Ok(AclEntry {
            agent_id: row.get(0)?,
            namespace: row.get(1)?,
            // An unparseable stored permission degrades to Read rather than
            // failing the listing.
            permission: perm_str.parse().unwrap_or(Permission::Read),
            granted_by: row.get(3)?,
            created_at: f64_to_datetime(created_at_f64),
        })
    })?;
    rows.collect()
}
/// Namespace-scoped variant of `get_hebbian_neighbors`; `None` or `"*"`
/// delegates to the unscoped query.
pub fn get_hebbian_neighbors_ns(
    &self,
    memory_id: &str,
    namespace: Option<&str>,
) -> Result<Vec<String>, rusqlite::Error> {
    match namespace {
        Some("*") | None => {
            self.get_hebbian_neighbors(memory_id)
        }
        Some(ns) => {
            // Filters on the link row's own namespace column.
            let mut stmt = self.conn.prepare(
                "SELECT target_id FROM hebbian_links WHERE source_id = ? AND strength > 0 AND namespace = ?"
            )?;
            let rows = stmt.query_map(params![memory_id, ns], |row| row.get(0))?;
            rows.collect()
        }
    }
}
/// Namespace-tagged variant of `record_coactivation`: identical
/// latent-link / threshold / promotion logic, but new link rows carry the
/// given namespace. Returns `Ok(true)` only on the promoting call.
pub fn record_coactivation_ns(
    &mut self,
    id1: &str,
    id2: &str,
    threshold: i32,
    namespace: &str,
) -> Result<bool, rusqlite::Error> {
    // Canonical ordering so (a, b) and (b, a) hit the same latent row.
    let (id1, id2) = if id1 < id2 { (id1, id2) } else { (id2, id1) };
    let existing: Option<(f64, i32)> = self.conn
        .query_row(
            "SELECT strength, coactivation_count FROM hebbian_links WHERE source_id = ? AND target_id = ?",
            params![id1, id2],
            |row| Ok((row.get(0)?, row.get(1)?)),
        )
        .optional()?;
    match existing {
        // Already-promoted link: reinforce both mirrored rows, capped at 1.0.
        Some((strength, _count)) if strength > 0.0 => {
            let new_strength = (strength + 0.1).min(1.0);
            self.conn.execute(
                "UPDATE hebbian_links SET strength = ?, coactivation_count = coactivation_count + 1 WHERE source_id = ? AND target_id = ?",
                params![new_strength, id1, id2],
            )?;
            self.conn.execute(
                "UPDATE hebbian_links SET strength = ?, coactivation_count = coactivation_count + 1 WHERE source_id = ? AND target_id = ?",
                params![new_strength, id2, id1],
            )?;
            Ok(false)
        }
        // Latent link: bump the counter, promoting once the threshold is hit.
        Some((_, count)) => {
            let new_count = count + 1;
            if new_count >= threshold {
                self.conn.execute(
                    "UPDATE hebbian_links SET strength = 1.0, coactivation_count = ? WHERE source_id = ? AND target_id = ?",
                    params![new_count, id1, id2],
                )?;
                // Mirror row carries the namespace tag.
                self.conn.execute(
                    "INSERT OR REPLACE INTO hebbian_links (source_id, target_id, strength, coactivation_count, created_at, namespace) VALUES (?, ?, 1.0, ?, ?, ?)",
                    params![id2, id1, new_count, now_f64(), namespace],
                )?;
                Ok(true)
            } else {
                self.conn.execute(
                    "UPDATE hebbian_links SET coactivation_count = ? WHERE source_id = ? AND target_id = ?",
                    params![new_count, id1, id2],
                )?;
                Ok(false)
            }
        }
        // First co-activation: create a latent (strength 0.0) tagged row.
        None => {
            self.conn.execute(
                "INSERT INTO hebbian_links (source_id, target_id, strength, coactivation_count, created_at, namespace) VALUES (?, ?, 0.0, 1, ?, ?)",
                params![id1, id2, now_f64(), namespace],
            )?;
            Ok(false)
        }
    }
}
/// Record a co-activation between memories in two different namespaces.
///
/// Same-namespace pairs delegate to `record_coactivation_ns`; otherwise
/// the same latent/threshold/promotion logic applies, with link rows tagged
/// by the composite "`ns1:ns2`" namespace. Returns `Ok(true)` only on the
/// promoting call.
pub fn record_cross_namespace_coactivation(
    &mut self,
    id1: &str,
    ns1: &str,
    id2: &str,
    ns2: &str,
    threshold: i32,
) -> Result<bool, rusqlite::Error> {
    if ns1 == ns2 {
        return self.record_coactivation_ns(id1, id2, threshold, ns1);
    }
    // Canonical ordering on (namespace, id) so either call order hits the
    // same row and produces the same composite tag.
    let (id1, id2, ns1, ns2) = if (ns1, id1) < (ns2, id2) {
        (id1, id2, ns1, ns2)
    } else {
        (id2, id1, ns2, ns1)
    };
    let existing: Option<(f64, i32)> = self.conn
        .query_row(
            "SELECT strength, coactivation_count FROM hebbian_links WHERE source_id = ? AND target_id = ?",
            params![id1, id2],
            |row| Ok((row.get(0)?, row.get(1)?)),
        )
        .optional()?;
    let cross_ns = format!("{}:{}", ns1, ns2);
    match existing {
        // Already-promoted link: reinforce both mirrored rows, capped at 1.0.
        Some((strength, _count)) if strength > 0.0 => {
            let new_strength = (strength + 0.1).min(1.0);
            self.conn.execute(
                "UPDATE hebbian_links SET strength = ?, coactivation_count = coactivation_count + 1 WHERE source_id = ? AND target_id = ?",
                params![new_strength, id1, id2],
            )?;
            self.conn.execute(
                "UPDATE hebbian_links SET strength = ?, coactivation_count = coactivation_count + 1 WHERE source_id = ? AND target_id = ?",
                params![new_strength, id2, id1],
            )?;
            Ok(false)
        }
        // Latent link: bump the counter, promoting once the threshold is hit.
        Some((_, count)) => {
            let new_count = count + 1;
            if new_count >= threshold {
                self.conn.execute(
                    "UPDATE hebbian_links SET strength = 1.0, coactivation_count = ? WHERE source_id = ? AND target_id = ?",
                    params![new_count, id1, id2],
                )?;
                self.conn.execute(
                    "INSERT OR REPLACE INTO hebbian_links (source_id, target_id, strength, coactivation_count, created_at, namespace) VALUES (?, ?, 1.0, ?, ?, ?)",
                    params![id2, id1, new_count, now_f64(), &cross_ns],
                )?;
                Ok(true)
            } else {
                self.conn.execute(
                    "UPDATE hebbian_links SET coactivation_count = ? WHERE source_id = ? AND target_id = ?",
                    params![new_count, id1, id2],
                )?;
                Ok(false)
            }
        }
        // First co-activation: latent row tagged with the composite namespace.
        None => {
            self.conn.execute(
                "INSERT INTO hebbian_links (source_id, target_id, strength, coactivation_count, created_at, namespace) VALUES (?, ?, 0.0, 1, ?, ?)",
                params![id1, id2, now_f64(), &cross_ns],
            )?;
            Ok(false)
        }
    }
}
/// Active links recorded under either composite tag ("a:b" or "b:a") for a
/// pair of namespaces, strongest first.
///
/// Endpoint namespaces are resolved via LEFT JOIN, so a link whose memory
/// row is gone still appears with a `None` namespace. Rows that fail to
/// map are silently dropped.
pub fn discover_cross_links(
    &self,
    namespace_a: &str,
    namespace_b: &str,
) -> Result<Vec<HebbianLink>, rusqlite::Error> {
    // The composite tag was written in canonical order; check both forms.
    let cross_ns_1 = format!("{}:{}", namespace_a, namespace_b);
    let cross_ns_2 = format!("{}:{}", namespace_b, namespace_a);
    let mut stmt = self.conn.prepare(
        r#"
SELECT h.source_id, h.target_id, h.strength, h.coactivation_count,
h.direction, h.created_at, h.namespace,
m1.namespace as source_ns, m2.namespace as target_ns
FROM hebbian_links h
LEFT JOIN memories m1 ON h.source_id = m1.id
LEFT JOIN memories m2 ON h.target_id = m2.id
WHERE h.strength > 0 AND (h.namespace = ? OR h.namespace = ?)
ORDER BY h.strength DESC
"#,
    )?;
    let rows = stmt.query_map(params![cross_ns_1, cross_ns_2], |row| {
        let created_at_f64: f64 = row.get(5)?;
        let source_ns: Option<String> = row.get(7)?;
        let target_ns: Option<String> = row.get(8)?;
        Ok(HebbianLink {
            source_id: row.get(0)?,
            target_id: row.get(1)?,
            strength: row.get(2)?,
            coactivation_count: row.get(3)?,
            direction: row.get(4)?,
            created_at: f64_to_datetime(created_at_f64),
            source_ns,
            target_ns,
        })
    })?;
    Ok(rows.filter_map(|r| r.ok()).collect())
}
/// Outgoing active links from `memory_id` whose target lives in a DIFFERENT
/// namespace, with the target's content as the link description.
///
/// Only the source -> target direction is followed; a missing source row
/// defaults its namespace to "default".
pub fn get_cross_namespace_neighbors(
    &self,
    memory_id: &str,
) -> Result<Vec<CrossLink>, rusqlite::Error> {
    let source_ns = self.get_namespace(memory_id)?;
    let mut stmt = self.conn.prepare(
        r#"
SELECT h.source_id, h.target_id, h.strength, m.namespace, m.content
FROM hebbian_links h
JOIN memories m ON h.target_id = m.id
WHERE h.source_id = ? AND h.strength > 0
"#,
    )?;
    let source_ns_str = source_ns.clone().unwrap_or_else(|| "default".to_string());
    let rows = stmt.query_map(params![memory_id], |row| {
        let target_ns: String = row.get(3)?;
        let content: String = row.get(4)?;
        Ok(CrossLink {
            source_id: row.get(0)?,
            source_ns: source_ns_str.clone(),
            target_id: row.get(1)?,
            target_ns,
            strength: row.get(2)?,
            description: Some(content),
        })
    })?;
    // Same-namespace links are filtered out after mapping.
    let source_ns_val = source_ns.unwrap_or_else(|| "default".to_string());
    Ok(rows
        .filter_map(|r| r.ok())
        .filter(|link| link.target_ns != source_ns_val)
        .collect())
}
/// Every active link whose two endpoint memories live in different
/// namespaces, strongest first; the target's content serves as the
/// description. Inner joins drop links with a missing endpoint row.
pub fn get_all_cross_links(&self) -> Result<Vec<CrossLink>, rusqlite::Error> {
    let mut stmt = self.conn.prepare(
        r#"
SELECT h.source_id, h.target_id, h.strength,
m1.namespace as source_ns, m2.namespace as target_ns,
m2.content as target_content
FROM hebbian_links h
JOIN memories m1 ON h.source_id = m1.id
JOIN memories m2 ON h.target_id = m2.id
WHERE h.strength > 0 AND m1.namespace != m2.namespace
ORDER BY h.strength DESC
"#,
    )?;
    let rows = stmt.query_map([], |row| {
        Ok(CrossLink {
            source_id: row.get(0)?,
            target_id: row.get(1)?,
            strength: row.get(2)?,
            source_ns: row.get(3)?,
            target_ns: row.get(4)?,
            description: row.get(5)?,
        })
    })?;
    Ok(rows.filter_map(|r| r.ok()).collect())
}
/// Begin an IMMEDIATE transaction (acquires the write lock up front,
/// failing fast instead of deadlocking on a later write).
pub fn begin_transaction(&mut self) -> Result<(), rusqlite::Error> {
    self.conn.execute_batch("BEGIN IMMEDIATE")?;
    Ok(())
}
/// Commit the transaction opened by `begin_transaction`.
pub fn commit_transaction(&mut self) -> Result<(), rusqlite::Error> {
    self.conn.execute_batch("COMMIT")?;
    Ok(())
}
/// Roll back the transaction opened by `begin_transaction`.
pub fn rollback_transaction(&mut self) -> Result<(), rusqlite::Error> {
    self.conn.execute_batch("ROLLBACK")?;
    Ok(())
}
/// Rebuild the full-text index from scratch using the FTS 'rebuild'
/// special command (useful after bulk edits or suspected index drift).
pub fn rebuild_fts(&mut self) -> Result<(), rusqlite::Error> {
    self.conn.execute("INSERT INTO memories_fts(memories_fts) VALUES('rebuild')", [])?;
    Ok(())
}
/// Run `PRAGMA integrity_check`; returns true iff SQLite reports "ok".
pub fn integrity_check(&self) -> Result<bool, rusqlite::Error> {
    let result: String = self.conn.query_row(
        "PRAGMA integrity_check", [], |row| row.get(0)
    )?;
    Ok(result == "ok")
}
/// Insert or refresh an entity, returning its deterministic id.
///
/// The id is derived from (name, type, namespace) via `generate_entity_id`,
/// so repeated upserts converge on one row; on conflict only `updated_at`
/// is refreshed, and metadata only when a new value is provided.
pub fn upsert_entity(
    &self,
    name: &str,
    entity_type: &str,
    namespace: &str,
    metadata: Option<&str>,
) -> Result<String, rusqlite::Error> {
    let entity_id = generate_entity_id(name, entity_type, namespace);
    let now = now_f64();
    self.conn.execute(
        r#"
INSERT INTO entities (id, name, entity_type, namespace, metadata, created_at, updated_at)
VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?6)
ON CONFLICT(id) DO UPDATE SET
updated_at = ?6,
metadata = COALESCE(?5, metadata)
"#,
        params![entity_id, name, entity_type, namespace, metadata, now],
    )?;
    Ok(entity_id)
}
/// Associate a memory with an entity under the given role; duplicates are
/// silently ignored.
pub fn link_memory_entity(
    &self,
    memory_id: &str,
    entity_id: &str,
    role: &str,
) -> Result<(), rusqlite::Error> {
    self.conn.execute(
        "INSERT OR IGNORE INTO memory_entities (memory_id, entity_id, role) VALUES (?1, ?2, ?3)",
        params![memory_id, entity_id, role],
    )?;
    Ok(())
}
/// Insert or reinforce a directed relation between two entities.
///
/// New relations start at confidence 0.1; each repeated observation bumps
/// confidence by 0.1 (capped at 1.0) and refreshes the timestamp.
pub fn upsert_entity_relation(
    &self,
    source_id: &str,
    target_id: &str,
    relation: &str,
    namespace: &str,
) -> Result<(), rusqlite::Error> {
    let now = now_f64();
    let id = format!("{}_{}", source_id, target_id);
    self.conn.execute(
        r#"
INSERT INTO entity_relations (id, source_id, target_id, relation, confidence, namespace, created_at)
VALUES (?1, ?2, ?3, ?4, 0.1, ?5, ?6)
ON CONFLICT(source_id, target_id, relation) DO UPDATE SET
confidence = MIN(confidence + 0.1, 1.0),
created_at = ?6
"#,
        params![id, source_id, target_id, relation, namespace, now],
    )?;
    Ok(())
}
pub fn find_entities(
&self,
query: &str,
namespace: Option<&str>,
limit: usize,
) -> Result<Vec<EntityRecord>, rusqlite::Error> {
match namespace {
Some(ns) => {
let mut stmt = self.conn.prepare(
"SELECT id, name, entity_type, namespace, metadata, created_at, updated_at \
FROM entities WHERE name = ?1 AND namespace = ?2 LIMIT ?3",
)?;
let rows = stmt.query_map(params![query, ns, limit as i64], |row| {
Ok(EntityRecord {
id: row.get(0)?,
name: row.get(1)?,
entity_type: row.get(2)?,
namespace: row.get(3)?,
metadata: row.get(4)?,
created_at: row.get(5)?,
updated_at: row.get(6)?,
})
})?;
Ok(rows.filter_map(|r| r.ok()).collect())
}
None => {
let mut stmt = self.conn.prepare(
"SELECT id, name, entity_type, namespace, metadata, created_at, updated_at \
FROM entities WHERE name = ?1 LIMIT ?2",
)?;
let rows = stmt.query_map(params![query, limit as i64], |row| {
Ok(EntityRecord {
id: row.get(0)?,
name: row.get(1)?,
entity_type: row.get(2)?,
namespace: row.get(3)?,
metadata: row.get(4)?,
created_at: row.get(5)?,
updated_at: row.get(6)?,
})
})?;
Ok(rows.filter_map(|r| r.ok()).collect())
}
}
}
/// Ids of all entities linked to a memory.
pub fn get_entity_ids_for_memory(&self, memory_id: &str) -> Result<Vec<String>, rusqlite::Error> {
    let mut stmt = self.conn.prepare(
        "SELECT entity_id FROM memory_entities WHERE memory_id = ?1"
    )?;
    let rows = stmt.query_map(params![memory_id], |row| row.get(0))?;
    rows.collect()
}
/// Ids of all memories linked to an entity; row errors are silently
/// dropped rather than propagated.
pub fn get_entity_memories(&self, entity_id: &str) -> Result<Vec<String>, rusqlite::Error> {
    let mut stmt = self.conn.prepare(
        "SELECT memory_id FROM memory_entities WHERE entity_id = ?1",
    )?;
    let rows = stmt.query_map(params![entity_id], |row| row.get(0))?;
    Ok(rows.filter_map(|r| r.ok()).collect())
}
/// Entities related to `entity_id` in either direction, as
/// `(other_entity_id, relation)` pairs, up to `limit`.
pub fn get_related_entities(
    &self,
    entity_id: &str,
    limit: usize,
) -> Result<Vec<(String, String)>, rusqlite::Error> {
    // UNION of both directions also deduplicates identical pairs.
    let mut stmt = self.conn.prepare(
        r#"
SELECT target_id, relation FROM entity_relations WHERE source_id = ?1
UNION
SELECT source_id, relation FROM entity_relations WHERE target_id = ?1
LIMIT ?2
"#,
    )?;
    let rows = stmt.query_map(params![entity_id, limit as i64], |row| {
        Ok((row.get::<_, String>(0)?, row.get::<_, String>(1)?))
    })?;
    Ok(rows.filter_map(|r| r.ok()).collect())
}
/// Fetch a single entity row by primary key; `None` when it does not exist.
pub fn get_entity(&self, id: &str) -> Result<Option<EntityRecord>, rusqlite::Error> {
    let sql = "SELECT id, name, entity_type, namespace, metadata, created_at, updated_at \
    FROM entities WHERE id = ?1";
    self.conn
        .query_row(sql, params![id], |row| {
            Ok(EntityRecord {
                id: row.get(0)?,
                name: row.get(1)?,
                entity_type: row.get(2)?,
                namespace: row.get(3)?,
                metadata: row.get(4)?,
                created_at: row.get(5)?,
                updated_at: row.get(6)?,
            })
        })
        .optional()
}
/// Number of entity rows, optionally restricted to one namespace.
pub fn count_entities(&self, namespace: Option<&str>) -> Result<usize, rusqlite::Error> {
    let total: i64 = match namespace {
        Some(ns) => self.conn.query_row(
            "SELECT COUNT(*) FROM entities WHERE namespace = ?1",
            params![ns],
            |row| row.get(0),
        )?,
        None => self
            .conn
            .query_row("SELECT COUNT(*) FROM entities", [], |row| row.get(0))?,
    };
    Ok(total as usize)
}
/// List entities with their memory-mention counts, most-mentioned first,
/// optionally filtered by entity type and/or namespace.
///
/// Parameter-binding note: all four SQL variants keep `?3` for LIMIT even when
/// `?1`/`?2` are unused. SQLite's bound-parameter count is the highest index
/// referenced, so binding three values (with "" placeholders for the unused
/// filters) always matches. Row errors are skipped.
pub fn list_entities(
    &self,
    entity_type: Option<&str>,
    namespace: Option<&str>,
    limit: usize,
) -> Result<Vec<(EntityRecord, usize)>, rusqlite::Error> {
    // Pick the WHERE clause combination for the supplied filters.
    let sql = match (entity_type, namespace) {
        (Some(_), Some(_)) => {
            r#"SELECT e.id, e.name, e.entity_type, e.namespace, e.metadata, e.created_at, e.updated_at,
    COUNT(me.memory_id) as mention_count
    FROM entities e
    LEFT JOIN memory_entities me ON e.id = me.entity_id
    WHERE e.entity_type = ?1 AND e.namespace = ?2
    GROUP BY e.id
    ORDER BY mention_count DESC, e.updated_at DESC
    LIMIT ?3"#
        }
        (Some(_), None) => {
            r#"SELECT e.id, e.name, e.entity_type, e.namespace, e.metadata, e.created_at, e.updated_at,
    COUNT(me.memory_id) as mention_count
    FROM entities e
    LEFT JOIN memory_entities me ON e.id = me.entity_id
    WHERE e.entity_type = ?1
    GROUP BY e.id
    ORDER BY mention_count DESC, e.updated_at DESC
    LIMIT ?3"#
        }
        (None, Some(_)) => {
            r#"SELECT e.id, e.name, e.entity_type, e.namespace, e.metadata, e.created_at, e.updated_at,
    COUNT(me.memory_id) as mention_count
    FROM entities e
    LEFT JOIN memory_entities me ON e.id = me.entity_id
    WHERE e.namespace = ?2
    GROUP BY e.id
    ORDER BY mention_count DESC, e.updated_at DESC
    LIMIT ?3"#
        }
        (None, None) => {
            r#"SELECT e.id, e.name, e.entity_type, e.namespace, e.metadata, e.created_at, e.updated_at,
    COUNT(me.memory_id) as mention_count
    FROM entities e
    LEFT JOIN memory_entities me ON e.id = me.entity_id
    GROUP BY e.id
    ORDER BY mention_count DESC, e.updated_at DESC
    LIMIT ?3"#
        }
    };
    let mut stmt = self.conn.prepare(sql)?;
    // Empty strings stand in for unused filter slots (see binding note above).
    let et = entity_type.unwrap_or("");
    let ns = namespace.unwrap_or("");
    let rows = stmt.query_map(params![et, ns, limit as i64], |row| {
        Ok((
            EntityRecord {
                id: row.get(0)?,
                name: row.get(1)?,
                entity_type: row.get(2)?,
                namespace: row.get(3)?,
                metadata: row.get(4)?,
                created_at: row.get(5)?,
                updated_at: row.get(6)?,
            },
            row.get::<_, i64>(7)? as usize,
        ))
    })?;
    Ok(rows.filter_map(|r| r.ok()).collect())
}
pub fn entity_stats(&self) -> Result<(usize, usize, usize), rusqlite::Error> {
let entity_count: i64 = self.conn.query_row(
"SELECT COUNT(*) FROM entities",
[],
|row| row.get(0),
)?;
let relation_count: i64 = self.conn.query_row(
"SELECT COUNT(*) FROM entity_relations",
[],
|row| row.get(0),
)?;
let link_count: i64 = self.conn.query_row(
"SELECT COUNT(*) FROM memory_entities",
[],
|row| row.get(0),
)?;
Ok((entity_count as usize, relation_count as usize, link_count as usize))
}
/// Delete one entity by id; returns whether a row was actually removed.
pub fn delete_entity(&self, entity_id: &str) -> Result<bool, rusqlite::Error> {
    let removed = self
        .conn
        .execute("DELETE FROM entities WHERE id = ?1", [entity_id])?;
    Ok(removed > 0)
}
/// Delete every entity matching an exact type and a GLOB name pattern.
///
/// Returns the number of rows deleted. Replaces the previous
/// SELECT-ids-then-delete-one-by-one loop with a single atomic DELETE,
/// which yields the same count without the intermediate id materialization
/// and cannot be interrupted half-way through the set.
pub fn delete_entities_by_filter(
    &self,
    entity_type: &str,
    name_pattern: &str,
) -> Result<usize, rusqlite::Error> {
    self.conn.execute(
        "DELETE FROM entities WHERE entity_type = ?1 AND name GLOB ?2",
        rusqlite::params![entity_type, name_pattern],
    )
}
/// Remove all entity links for each listed memory; returns total rows deleted.
pub fn clear_memory_entity_links(&self, memory_ids: &[String]) -> Result<usize, rusqlite::Error> {
    let mut removed = 0usize;
    for memory_id in memory_ids {
        removed += self
            .conn
            .execute("DELETE FROM memory_entities WHERE memory_id = ?1", [memory_id])?;
    }
    Ok(removed)
}
/// Brute-force cosine scan over all stored embeddings for `model` in the
/// namespace; returns the `(memory_id, similarity)` of the best match at or
/// above `threshold`, if any. On exact similarity ties the first-seen row
/// wins. Warns when the scan exceeds 100 ms.
pub fn find_nearest_embedding(
    &self,
    embedding: &[f32],
    model: &str,
    namespace: Option<&str>,
    threshold: f64,
) -> Result<Option<(String, f32)>, rusqlite::Error> {
    use crate::embeddings::EmbeddingProvider;
    let scan_start = std::time::Instant::now();
    let candidates = self.get_embeddings_in_namespace(namespace, model)?;
    let mut winner: Option<(String, f32)> = None;
    for (memory_id, candidate) in &candidates {
        let sim = EmbeddingProvider::cosine_similarity(embedding, candidate);
        if (sim as f64) < threshold {
            continue;
        }
        // Strict `>` keeps the earliest candidate on ties.
        if winner.as_ref().map_or(true, |(_, best)| sim > *best) {
            winner = Some((memory_id.clone(), sim));
        }
    }
    let elapsed = scan_start.elapsed();
    if elapsed.as_millis() > 100 {
        log::warn!(
            "Dedup scan took {}ms over {} embeddings",
            elapsed.as_millis(),
            candidates.len()
        );
    }
    Ok(winner)
}
/// All stored embeddings whose cosine similarity to `embedding` meets
/// `threshold`, sorted by similarity descending.
pub fn find_all_above_threshold(
    &self,
    embedding: &[f32],
    model: &str,
    namespace: Option<&str>,
    threshold: f64,
) -> Result<Vec<(String, f32)>, rusqlite::Error> {
    let stored = self.get_embeddings_in_namespace(namespace, model)?;
    let mut hits: Vec<(String, f32)> = stored
        .iter()
        .map(|(id, emb)| (id, crate::EmbeddingProvider::cosine_similarity(embedding, emb)))
        .filter(|&(_, sim)| sim as f64 >= threshold)
        .map(|(id, sim)| (id.clone(), sim))
        .collect();
    // NaN-safe descending sort; equal (or incomparable) scores keep insertion order.
    hits.sort_by(|a, b| b.1.partial_cmp(&a.1).unwrap_or(std::cmp::Ordering::Equal));
    Ok(hits)
}
/// First `max_chars` Unicode scalar values of a memory's content.
/// Errors (QueryReturnedNoRows) if the memory does not exist.
pub fn get_memory_content_preview(&self, id: &str, max_chars: usize) -> Result<String, rusqlite::Error> {
    let full: String = self.conn.query_row(
        "SELECT content FROM memories WHERE id = ?1",
        rusqlite::params![id],
        |row| row.get(0),
    )?;
    let preview: String = full.chars().take(max_chars).collect();
    Ok(preview)
}
/// Fold a near-duplicate new memory into an existing one instead of storing it.
///
/// Effects, in order: records an access (recency boost), raises importance to
/// the max of old and new, replaces the content only when the new text is
/// >30% longer, and appends a capped merge-history entry plus a merge counter
/// to the JSON metadata. Returns what was done.
pub fn merge_memory_into(
    &mut self,
    existing_id: &str,
    new_content: &str,
    new_importance: f64,
    similarity: f32,
) -> Result<MergeOutcome, rusqlite::Error> {
    // Treat the merge as an access so the survivor gains recency.
    self.conn.execute(
        "INSERT INTO access_log (memory_id, accessed_at) VALUES (?, ?)",
        params![existing_id, now_f64()],
    )?;
    // Importance can only go up as a result of a merge.
    self.conn.execute(
        "UPDATE memories SET importance = MAX(importance, ?) WHERE id = ?",
        params![new_importance, existing_id],
    )?;
    let (existing_content, existing_metadata_str): (String, Option<String>) = self.conn.query_row(
        "SELECT content, metadata FROM memories WHERE id = ?",
        params![existing_id],
        |row| Ok((row.get(0)?, row.get(1)?)),
    )?;
    // Replace the stored text only when the new version is substantially
    // (>30% by byte length) longer — i.e. probably more informative.
    let content_updated = new_content.len() > (existing_content.len() as f64 * 1.3) as usize;
    if content_updated {
        self.conn.execute(
            "UPDATE memories SET content = ? WHERE id = ?",
            params![new_content, existing_id],
        )?;
    }
    // Parse existing metadata; anything unparseable or non-object is reset to {}.
    let mut metadata: serde_json::Value = existing_metadata_str
        .as_deref()
        .and_then(|s| serde_json::from_str(s).ok())
        .unwrap_or_else(|| serde_json::json!({}));
    if !metadata.is_object() {
        metadata = serde_json::json!({});
    }
    let epoch_secs = std::time::SystemTime::now()
        .duration_since(std::time::UNIX_EPOCH)
        .unwrap_or_default()
        .as_secs();
    let history_entry = serde_json::json!({
        "ts": epoch_secs,
        "sim": similarity,
        "content_updated": content_updated,
        "prev_content_len": existing_content.len(),
        "new_content_len": new_content.len(),
    });
    let merge_history = metadata.get("merge_history")
        .and_then(|v| v.as_array().cloned())
        .unwrap_or_default();
    let mut new_history = merge_history;
    new_history.push(history_entry);
    // Keep only the 10 most recent merge-history entries.
    if new_history.len() > 10 {
        let start = new_history.len() - 10;
        new_history = new_history[start..].to_vec();
    }
    metadata["merge_history"] = serde_json::Value::Array(new_history);
    let merge_count = metadata.get("merge_count")
        .and_then(|v| v.as_i64())
        .unwrap_or(0) + 1;
    metadata["merge_count"] = serde_json::json!(merge_count);
    let metadata_str = serde_json::to_string(&metadata).unwrap_or_else(|_| "{}".to_string());
    self.conn.execute(
        "UPDATE memories SET metadata = ? WHERE id = ?",
        params![metadata_str, existing_id],
    )?;
    log::info!(
        "Merged duplicate into memory {}: boosted access + importance(max {}), content_updated={}, merge_count={}",
        existing_id,
        new_importance,
        content_updated,
        merge_count,
    );
    Ok(MergeOutcome {
        memory_id: existing_id.to_string(),
        content_updated,
        merge_count: merge_count as i32,
    })
}
/// Memories with no entity links, as `(id, content, namespace)` tuples;
/// a NULL namespace is reported as 'default'. Row errors are skipped.
pub fn get_memories_without_entities(
    &self,
    limit: usize,
) -> Result<Vec<(String, String, String)>, rusqlite::Error> {
    let sql = r#"
    SELECT m.id, m.content, COALESCE(m.namespace, 'default') as ns
    FROM memories m
    LEFT JOIN memory_entities me ON m.id = me.memory_id
    WHERE me.entity_id IS NULL
    LIMIT ?1
    "#;
    let mut stmt = self.conn.prepare(sql)?;
    let found = stmt.query_map(params![limit as i64], |row| {
        Ok((row.get(0)?, row.get(1)?, row.get(2)?))
    })?;
    Ok(found.flatten().collect())
}
/// Persist one synthesis provenance row linking an insight to a source memory.
///
/// The timestamp is stored as RFC 3339 text; gate scores are stored as a JSON
/// string (NULL when absent, empty string if serialization fails).
pub fn record_provenance(&self, record: &ProvenanceRecord) -> Result<(), Box<dyn std::error::Error>> {
    self.conn.execute(
        "INSERT INTO synthesis_provenance (id, insight_id, source_id, cluster_id, synthesis_timestamp, gate_decision, gate_scores, confidence, source_original_importance) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9)",
        params![
            record.id,
            record.insight_id,
            record.source_id,
            record.cluster_id,
            record.synthesis_timestamp.to_rfc3339(),
            record.gate_decision,
            record.gate_scores.as_ref().map(|s| serde_json::to_string(s).unwrap_or_default()),
            record.confidence,
            record.source_original_importance,
        ],
    )?;
    Ok(())
}
/// Every provenance row recorded for one insight.
pub fn get_insight_sources(&self, insight_id: &str) -> Result<Vec<ProvenanceRecord>, Box<dyn std::error::Error>> {
    let mut stmt = self.conn.prepare(
        "SELECT id, insight_id, source_id, cluster_id, synthesis_timestamp, gate_decision, gate_scores, confidence, source_original_importance FROM synthesis_provenance WHERE insight_id = ?1"
    )?;
    let records: Vec<ProvenanceRecord> = stmt
        .query_map([insight_id], Self::row_to_provenance)?
        .collect::<Result<_, _>>()?;
    Ok(records)
}
/// Every provenance row in which the given memory served as a source.
pub fn get_memory_insights(&self, source_id: &str) -> Result<Vec<ProvenanceRecord>, Box<dyn std::error::Error>> {
    let mut stmt = self.conn.prepare(
        "SELECT id, insight_id, source_id, cluster_id, synthesis_timestamp, gate_decision, gate_scores, confidence, source_original_importance FROM synthesis_provenance WHERE source_id = ?1"
    )?;
    let records: Vec<ProvenanceRecord> = stmt
        .query_map([source_id], Self::row_to_provenance)?
        .collect::<Result<_, _>>()?;
    Ok(records)
}
/// Delete all provenance rows for an insight; returns the number removed.
pub fn delete_provenance(&self, insight_id: &str) -> Result<usize, Box<dyn std::error::Error>> {
    let removed = self.conn.execute(
        "DELETE FROM synthesis_provenance WHERE insight_id = ?1",
        [insight_id],
    )?;
    Ok(removed)
}
/// Fraction of `member_ids` that appear as a source in at least one
/// synthesis_provenance row. Returns 0.0 for an empty slice.
pub fn check_coverage(&self, member_ids: &[String]) -> Result<f64, Box<dyn std::error::Error>> {
    if member_ids.is_empty() {
        return Ok(0.0);
    }
    let mut covered = 0usize;
    for id in member_ids {
        // EXISTS lets SQLite stop at the first matching row; the previous
        // COUNT(*) scanned every provenance row per id only to test > 0.
        let present: bool = self.conn.query_row(
            "SELECT EXISTS(SELECT 1 FROM synthesis_provenance WHERE source_id = ?1)",
            [id],
            |row| row.get(0),
        )?;
        if present {
            covered += 1;
        }
    }
    Ok(covered as f64 / member_ids.len() as f64)
}
/// Overwrite a memory's importance score (no-op if the id does not exist).
pub fn update_importance(&self, memory_id: &str, importance: f64) -> Result<(), Box<dyn std::error::Error>> {
    let _ = self.conn.execute(
        "UPDATE memories SET importance = ?1 WHERE id = ?2",
        params![importance, memory_id],
    )?;
    Ok(())
}
/// Insert a synthesis-produced memory directly into the core layer, along
/// with its initial access-log entry and full-text-search row.
///
/// All three inserts now run in a single transaction: previously a failure
/// after the first INSERT could leave a memory with no FTS row (invisible to
/// text search) or no access-log entry. Matches the `unchecked_transaction`
/// pattern used by `replace_clusters`/`save_full_cluster_state`.
pub fn store_raw(
    &self,
    id: &str,
    content: &str,
    memory_type: &str,
    importance: f64,
    metadata: Option<&str>,
) -> Result<(), Box<dyn std::error::Error>> {
    let now = datetime_to_f64(&Utc::now());
    let tx = self.conn.unchecked_transaction()?;
    tx.execute(
        r#"INSERT INTO memories (
    id, content, memory_type, importance, layer,
    working_strength, core_strength, source, created_at,
    last_consolidated, consolidation_count, pinned, metadata, namespace
    ) VALUES (?1, ?2, ?3, ?4, 'core', 0.5, 0.5, 'synthesis', ?5, NULL, 0, 0, ?6, 'default')"#,
        params![id, content, memory_type, importance, now, metadata],
    )?;
    tx.execute(
        "INSERT INTO access_log (memory_id, accessed_at) VALUES (?, ?)",
        params![id, now],
    )?;
    let rowid: i64 = tx.query_row(
        "SELECT rowid FROM memories WHERE id = ?",
        params![id],
        |row| row.get(0),
    )?;
    // Pre-segment CJK text so the FTS tokenizer sees word boundaries.
    let tokenized = tokenize_cjk_boundaries(content);
    tx.execute(
        "INSERT INTO memories_fts(rowid, content) VALUES (?, ?)",
        params![rowid, tokenized],
    )?;
    tx.commit()?;
    Ok(())
}
/// Decode one synthesis_provenance row (column order as selected by the
/// callers: id, insight_id, source_id, cluster_id, synthesis_timestamp,
/// gate_decision, gate_scores, confidence, source_original_importance).
///
/// Best-effort decoding: unparseable gate-score JSON becomes `None`, and an
/// unparseable timestamp falls back to `Utc::now()` rather than erroring.
fn row_to_provenance(row: &rusqlite::Row) -> Result<ProvenanceRecord, rusqlite::Error> {
    let gate_scores_str: Option<String> = row.get(6)?;
    let gate_scores: Option<GateScores> = gate_scores_str.and_then(|s| serde_json::from_str(&s).ok());
    let ts_str: String = row.get(4)?;
    let synthesis_timestamp = chrono::DateTime::parse_from_rfc3339(&ts_str)
        .map(|dt| dt.with_timezone(&chrono::Utc))
        .unwrap_or_else(|_| chrono::Utc::now());
    Ok(ProvenanceRecord {
        id: row.get(0)?,
        insight_id: row.get(1)?,
        source_id: row.get(2)?,
        cluster_id: row.get(3)?,
        synthesis_timestamp,
        gate_decision: row.get(5)?,
        gate_scores,
        confidence: row.get(7)?,
        source_original_importance: row.get(8)?,
    })
}
/// Names of all entities linked to a memory. Propagates row errors.
pub fn get_entities_for_memory(&self, memory_id: &str) -> Result<Vec<String>, rusqlite::Error> {
    let sql = "SELECT e.name FROM entities e \
    INNER JOIN memory_entities me ON e.id = me.entity_id \
    WHERE me.memory_id = ?1";
    let mut stmt = self.conn.prepare(sql)?;
    let names = stmt.query_map(params![memory_id], |row| row.get(0))?;
    names.collect()
}
pub fn get_embedding_for_memory(&self, memory_id: &str) -> Result<Option<Vec<f32>>, rusqlite::Error> {
let result: Option<Vec<u8>> = self.conn
.query_row(
"SELECT embedding FROM memory_embeddings WHERE memory_id = ?1 LIMIT 1",
params![memory_id],
|row| row.get(0),
)
.optional()?;
Ok(result.map(|bytes| bytes_to_f32_vec(&bytes)))
}
/// `created_at` (fractional epoch seconds) for a memory; `None` when absent.
pub fn get_memory_timestamp(&self, memory_id: &str) -> Result<Option<f64>, rusqlite::Error> {
    let lookup = self.conn.query_row(
        "SELECT created_at FROM memories WHERE id = ?1",
        params![memory_id],
        |row| row.get(0),
    );
    lookup.optional()
}
/// Find the live memory in `namespace` whose linked entities best overlap the
/// given entity names, by Jaccard similarity (|intersection| / |union|).
///
/// Only the top 10 candidates by raw overlap count are scored; returns the
/// best `(memory_id, jaccard)` at or above `threshold`, or `None`.
pub fn find_entity_overlap(
    &self,
    entity_names: &[String],
    namespace: &str,
    threshold: f64,
) -> Result<Option<(String, f64)>, rusqlite::Error> {
    if entity_names.is_empty() {
        return Ok(None);
    }
    // Build "?1, ?2, ..." for the IN clause; the namespace takes the next index.
    let placeholders: Vec<String> = entity_names.iter().enumerate()
        .map(|(i, _)| format!("?{}", i + 1))
        .collect();
    let in_clause = placeholders.join(", ");
    let sql = format!(
        r#"
    SELECT me.memory_id, COUNT(DISTINCT e.name) as overlap_count
    FROM memory_entities me
    JOIN entities e ON me.entity_id = e.id
    JOIN memories m ON me.memory_id = m.id
    WHERE e.name IN ({})
    AND m.namespace = ?{}
    AND m.deleted_at IS NULL
    GROUP BY me.memory_id
    ORDER BY overlap_count DESC
    LIMIT 10
    "#,
        in_clause,
        entity_names.len() + 1
    );
    let mut stmt = self.conn.prepare(&sql)?;
    // Dynamic parameter list: every entity name, then the namespace.
    let mut params_vec: Vec<Box<dyn rusqlite::types::ToSql>> = Vec::new();
    for name in entity_names {
        params_vec.push(Box::new(name.clone()));
    }
    params_vec.push(Box::new(namespace.to_string()));
    let param_refs: Vec<&dyn rusqlite::types::ToSql> = params_vec.iter()
        .map(|p| p.as_ref())
        .collect();
    let mut best: Option<(String, f64)> = None;
    let mut rows = stmt.query(param_refs.as_slice())?;
    let input_count = entity_names.len();
    while let Some(row) = rows.next()? {
        let memory_id: String = row.get(0)?;
        let overlap_count: usize = row.get::<_, i64>(1)? as usize;
        // Total entities linked to the candidate memory (for the union size).
        let target_count: usize = self.conn.query_row(
            "SELECT COUNT(*) FROM memory_entities WHERE memory_id = ?",
            params![memory_id],
            |r| r.get::<_, i64>(0),
        )? as usize;
        // |A ∪ B| = |A| + |B| - |A ∩ B|
        let union_count = input_count + target_count - overlap_count;
        if union_count == 0 { continue; }
        let jaccard = overlap_count as f64 / union_count as f64;
        if jaccard >= threshold {
            // Strictly-greater comparison keeps the earliest candidate on ties.
            match &best {
                Some((_, best_score)) if jaccard <= *best_score => {},
                _ => { best = Some((memory_id, jaccard)); }
            }
        }
    }
    Ok(best)
}
/// Append a merge-provenance entry (which source was folded in, when, at what
/// similarity) to the target memory's JSON metadata, keeping at most the
/// 10 most recent entries.
pub fn append_merge_provenance(
    &self,
    target_id: &str,
    source_id: &str,
    similarity: f32,
    content_updated: bool,
) -> Result<(), rusqlite::Error> {
    let metadata_str: Option<String> = self.conn.query_row(
        "SELECT metadata FROM memories WHERE id = ?",
        params![target_id],
        |row| row.get(0),
    )?;
    // Unparseable or non-object metadata is reset to an empty object.
    let mut metadata: serde_json::Value = metadata_str
        .as_deref()
        .and_then(|s| serde_json::from_str(s).ok())
        .unwrap_or_else(|| serde_json::json!({}));
    if !metadata.is_object() {
        metadata = serde_json::json!({});
    }
    let epoch_secs = std::time::SystemTime::now()
        .duration_since(std::time::UNIX_EPOCH)
        .unwrap_or_default()
        .as_secs();
    let entry = serde_json::json!({
        "source_id": source_id,
        "ts": epoch_secs,
        "sim": similarity,
        "content_updated": content_updated,
    });
    let mut history = metadata.get("merge_history")
        .and_then(|v| v.as_array().cloned())
        .unwrap_or_default();
    history.push(entry);
    // Cap the history at 10 entries, dropping the oldest first.
    while history.len() > 10 { history.remove(0); }
    metadata["merge_history"] = serde_json::Value::Array(history);
    let metadata_str = serde_json::to_string(&metadata).unwrap_or_else(|_| "{}".to_string());
    self.conn.execute(
        "UPDATE memories SET metadata = ? WHERE id = ?",
        params![metadata_str, target_id],
    )?;
    Ok(())
}
/// Record (or reinforce) an undirected Hebbian association between two memories.
///
/// If a link already exists in either direction, its strength is raised to
/// `max(existing, strength)`; the signal metadata is replaced only when the
/// new signal is the stronger one. Returns `true` when a new link row was
/// created, `false` when an existing one was found.
///
/// Fix: the old else-branch issued an UPDATE that wrote the unchanged
/// strength back (`max` equals the existing value there) — a pure no-op
/// write. It is now skipped entirely.
pub fn record_association(
    &self,
    source_id: &str,
    target_id: &str,
    strength: f64,
    signal_source: &str,
    signal_detail: &str,
    namespace: &str,
) -> Result<bool, rusqlite::Error> {
    // Links are undirected: look the pair up in either orientation.
    let existing: Option<(String, String, f64)> = self.conn
        .query_row(
            "SELECT source_id, target_id, strength FROM hebbian_links \
            WHERE (source_id = ?1 AND target_id = ?2) OR (source_id = ?2 AND target_id = ?1) \
            LIMIT 1",
            params![source_id, target_id],
            |row| Ok((row.get(0)?, row.get(1)?, row.get(2)?)),
        )
        .optional()?;
    match existing {
        Some((existing_src, existing_tgt, existing_strength)) => {
            if strength > existing_strength {
                // Stronger signal: bump the strength and take over the
                // signal metadata.
                self.conn.execute(
                    "UPDATE hebbian_links SET strength = ?1, signal_source = ?2, signal_detail = ?3 \
                    WHERE source_id = ?4 AND target_id = ?5",
                    params![strength, signal_source, signal_detail, existing_src, existing_tgt],
                )?;
            }
            Ok(false)
        }
        None => {
            self.conn.execute(
                "INSERT INTO hebbian_links (source_id, target_id, strength, coactivation_count, \
                created_at, signal_source, signal_detail, namespace) \
                VALUES (?1, ?2, ?3, 0, ?4, ?5, ?6, ?7)",
                params![
                    source_id,
                    target_id,
                    strength,
                    now_f64(),
                    signal_source,
                    signal_detail,
                    namespace,
                ],
            )?;
            Ok(true)
        }
    }
}
/// Up to 100 most recent memory ids created at or after `since_timestamp`
/// in the given namespace. Propagates row errors.
pub fn get_memory_ids_since(
    &self,
    since_timestamp: f64,
    namespace: &str,
) -> Result<Vec<String>, rusqlite::Error> {
    let mut recent = self.conn.prepare(
        "SELECT id FROM memories WHERE created_at >= ?1 AND namespace = ?2 \
        ORDER BY created_at DESC LIMIT 100"
    )?;
    let ids = recent.query_map(params![since_timestamp, namespace], |row| row.get(0))?;
    ids.collect()
}
/// Persist extracted triples for a memory; returns how many were newly
/// inserted (duplicates are ignored via INSERT OR IGNORE). Each new triple's
/// subject and object are also registered as lightweight entities linked to
/// the memory.
pub fn store_triples(&self, memory_id: &str, triples: &[Triple]) -> Result<usize, rusqlite::Error> {
    let now = chrono::Utc::now().to_rfc3339();
    let mut inserted = 0;
    for triple in triples {
        let rows = self.conn.execute(
            "INSERT OR IGNORE INTO triples (memory_id, subject, predicate, object, confidence, source, created_at) \
            VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7)",
            params![
                memory_id,
                triple.subject,
                triple.predicate.as_str(),
                triple.object,
                triple.confidence,
                // Store the provenance enum as its text tag.
                match &triple.source {
                    TripleSource::Llm => "llm",
                    TripleSource::Rule => "rule",
                    TripleSource::Manual => "manual",
                },
                now,
            ],
        )?;
        // rows == 0 means the triple already existed; only new ones create
        // entity rows and count toward the return value.
        if rows > 0 {
            inserted += 1;
            self.insert_triple_entity(memory_id, &triple.subject)?;
            self.insert_triple_entity(memory_id, &triple.object)?;
        }
    }
    Ok(inserted)
}
/// Register a triple subject/object as a 'concept' entity in the 'triple'
/// namespace and link it to the memory. Idempotent via INSERT OR IGNORE;
/// names are lowercased so casing variants collapse to one entity.
///
/// NOTE(review): the entity id is derived from `DefaultHasher`, whose hash
/// algorithm is not guaranteed stable across Rust releases — ids for the same
/// name could differ between builds, producing duplicate entity rows after a
/// toolchain upgrade. Consider a stable hash (e.g. FNV) — needs a migration.
fn insert_triple_entity(&self, memory_id: &str, entity_name: &str) -> Result<(), rusqlite::Error> {
    use std::hash::{Hash, Hasher};
    use std::collections::hash_map::DefaultHasher;
    let name_lower = entity_name.to_lowercase();
    let mut hasher = DefaultHasher::new();
    name_lower.hash(&mut hasher);
    let entity_id = format!("triple-{:x}", hasher.finish());
    let now = datetime_to_f64(&chrono::Utc::now());
    self.conn.execute(
        "INSERT OR IGNORE INTO entities (id, name, entity_type, namespace, metadata, created_at, updated_at) \
        VALUES (?1, ?2, 'concept', 'triple', '{}', ?3, ?3)",
        params![entity_id, name_lower, now],
    )?;
    self.conn.execute(
        "INSERT OR IGNORE INTO memory_entities (memory_id, entity_id, role) VALUES (?1, ?2, 'triple')",
        params![memory_id, entity_id],
    )?;
    Ok(())
}
/// Load all triples stored for a memory, reconstructing the predicate and
/// source enums from their text tags. Unknown source tags fall back to Llm;
/// confidence is clamped into [0, 1]. Propagates row errors.
pub fn get_triples(&self, memory_id: &str) -> Result<Vec<Triple>, rusqlite::Error> {
    let mut stmt = self.conn.prepare(
        "SELECT subject, predicate, object, confidence, source FROM triples WHERE memory_id = ?1"
    )?;
    let rows = stmt.query_map(params![memory_id], |row| {
        let subject: String = row.get(0)?;
        let predicate_str: String = row.get(1)?;
        let object: String = row.get(2)?;
        let confidence: f64 = row.get(3)?;
        let source_str: String = row.get(4)?;
        let predicate = Predicate::from_str_lossy(&predicate_str);
        let source = match source_str.as_str() {
            "rule" => TripleSource::Rule,
            "manual" => TripleSource::Manual,
            // Anything unrecognized is treated as LLM-extracted.
            _ => TripleSource::Llm,
        };
        Ok(Triple {
            subject,
            predicate,
            object,
            confidence: confidence.clamp(0.0, 1.0),
            source,
        })
    })?;
    rows.collect()
}
/// Whether any triples have been extracted for this memory.
///
/// Uses EXISTS so SQLite can stop at the first matching row; the previous
/// COUNT(*) scanned every triple for the memory just to test > 0.
pub fn has_triples(&self, memory_id: &str) -> Result<bool, rusqlite::Error> {
    self.conn.query_row(
        "SELECT EXISTS(SELECT 1 FROM triples WHERE memory_id = ?1)",
        params![memory_id],
        |row| row.get(0),
    )
}
/// Newest memory ids that have no extracted triples yet and fewer than
/// `max_retries` recorded extraction attempts.
pub fn get_unenriched_memory_ids(&self, limit: usize, max_retries: u32) -> Result<Vec<String>, rusqlite::Error> {
    let mut pending = self.conn.prepare(
        "SELECT id FROM memories \
        WHERE id NOT IN (SELECT DISTINCT memory_id FROM triples) \
        AND triple_extraction_attempts < ?1 \
        ORDER BY created_at DESC \
        LIMIT ?2"
    )?;
    let ids = pending.query_map(params![max_retries, limit], |row| row.get(0))?;
    ids.collect()
}
/// Bump the triple-extraction attempt counter for one memory.
pub fn increment_extraction_attempts(&self, memory_id: &str) -> Result<(), rusqlite::Error> {
    let _ = self.conn.execute(
        "UPDATE memories SET triple_extraction_attempts = triple_extraction_attempts + 1 WHERE id = ?1",
        params![memory_id],
    )?;
    Ok(())
}
/// Create all clustering tables if missing (idempotent migration):
/// `cluster_state` (singleton row tracking the last full re-cluster),
/// `cluster_assignments` (memory -> cluster), `cluster_centroids`
/// (LE f32 blob per cluster, with a dirty flag), `cluster_pending`
/// (memories awaiting assignment), and `cluster_incremental_state`
/// (per-cluster JSON blob), plus an index on assignments by cluster.
fn migrate_cluster_state(conn: &Connection) -> SqlResult<()> {
    conn.execute_batch(r#"
    CREATE TABLE IF NOT EXISTS cluster_state (
    id INTEGER PRIMARY KEY CHECK (id = 1),
    last_full_cluster_at TEXT,
    last_full_memory_count INTEGER DEFAULT 0,
    version INTEGER DEFAULT 1
    );
    INSERT OR IGNORE INTO cluster_state (id) VALUES (1);
    CREATE TABLE IF NOT EXISTS cluster_assignments (
    memory_id TEXT PRIMARY KEY,
    cluster_id TEXT NOT NULL,
    assigned_at TEXT NOT NULL,
    method TEXT NOT NULL,
    confidence REAL NOT NULL DEFAULT 1.0
    );
    CREATE TABLE IF NOT EXISTS cluster_centroids (
    cluster_id TEXT PRIMARY KEY,
    centroid BLOB NOT NULL,
    member_count INTEGER NOT NULL DEFAULT 0,
    dirty INTEGER NOT NULL DEFAULT 0,
    updated_at TEXT NOT NULL
    );
    CREATE TABLE IF NOT EXISTS cluster_pending (
    memory_id TEXT PRIMARY KEY,
    added_at TEXT NOT NULL
    );
    CREATE TABLE IF NOT EXISTS cluster_incremental_state (
    cluster_id TEXT PRIMARY KEY,
    state_json TEXT NOT NULL,
    updated_at TEXT NOT NULL
    );
    CREATE INDEX IF NOT EXISTS idx_cluster_assignments_cluster ON cluster_assignments(cluster_id);
    "#)?;
    Ok(())
}
/// Public entry point: ensure all clustering tables exist (idempotent).
pub fn init_cluster_tables(&self) -> Result<(), rusqlite::Error> {
    Self::migrate_cluster_state(&self.conn)
}
/// Every cluster centroid, decoded from its little-endian f32 blob.
pub fn get_cluster_centroids(&self) -> Result<Vec<(String, Vec<f32>)>, rusqlite::Error> {
    let mut stmt = self
        .conn
        .prepare("SELECT cluster_id, centroid FROM cluster_centroids")?;
    let centroids = stmt.query_map([], |row| {
        let cluster_id: String = row.get(0)?;
        let blob: Vec<u8> = row.get(1)?;
        Ok((cluster_id, bytes_to_f32_vec(&blob)))
    })?;
    centroids.collect()
}
/// Upsert a memory's cluster assignment, stamped with the current time.
pub fn assign_to_cluster(
    &self, memory_id: &str, cluster_id: &str, method: &str, confidence: f64,
) -> Result<(), rusqlite::Error> {
    let assigned_at = chrono::Utc::now().to_rfc3339();
    self.conn
        .execute(
            "INSERT OR REPLACE INTO cluster_assignments (memory_id, cluster_id, assigned_at, method, confidence)
    VALUES (?1, ?2, ?3, ?4, ?5)",
            params![memory_id, cluster_id, assigned_at, method, confidence],
        )
        .map(|_| ())
}
/// Fold one new member embedding into a cluster's centroid using the
/// incremental mean: new = (old * n + x) / (n + 1), where n is the stored
/// member count. Creates the centroid row (count 1) when the cluster is new.
pub fn update_centroid_incremental(
    &self, cluster_id: &str, new_embedding: &[f32],
) -> Result<(), rusqlite::Error> {
    let result: Option<(Vec<u8>, i64)> = self.conn.query_row(
        "SELECT centroid, member_count FROM cluster_centroids WHERE cluster_id = ?",
        params![cluster_id],
        |row| Ok((row.get(0)?, row.get(1)?)),
    ).optional()?;
    let now = chrono::Utc::now().to_rfc3339();
    match result {
        Some((old_bytes, count)) => {
            let old = bytes_to_f32_vec(&old_bytes);
            let n = count as f32;
            // Element-wise running mean. Note: zip silently truncates if the
            // dimensions ever disagree.
            let new_centroid: Vec<f32> = old.iter().zip(new_embedding.iter())
                .map(|(o, e)| (o * n + e) / (n + 1.0))
                .collect();
            // Re-encode as little-endian f32 bytes for storage.
            let new_bytes: Vec<u8> = new_centroid.iter()
                .flat_map(|f| f.to_le_bytes())
                .collect();
            self.conn.execute(
                "UPDATE cluster_centroids SET centroid = ?1, member_count = member_count + 1, updated_at = ?2 WHERE cluster_id = ?3",
                params![new_bytes, now, cluster_id],
            )?;
        }
        None => {
            // First member: the embedding itself is the centroid.
            let bytes: Vec<u8> = new_embedding.iter()
                .flat_map(|f| f.to_le_bytes())
                .collect();
            self.conn.execute(
                "INSERT INTO cluster_centroids (cluster_id, centroid, member_count, dirty, updated_at)
    VALUES (?1, ?2, 1, 0, ?3)",
                params![cluster_id, bytes, now],
            )?;
        }
    }
    Ok(())
}
/// Flag a cluster's centroid as needing recomputation.
pub fn mark_cluster_dirty(&self, cluster_id: &str) -> Result<(), rusqlite::Error> {
    self.conn
        .execute(
            "UPDATE cluster_centroids SET dirty = 1 WHERE cluster_id = ?",
            params![cluster_id],
        )
        .map(|_| ())
}
/// Ids of every cluster whose centroid is flagged dirty.
pub fn get_dirty_cluster_ids(&self) -> Result<Vec<String>, rusqlite::Error> {
    let mut dirty = self
        .conn
        .prepare("SELECT cluster_id FROM cluster_centroids WHERE dirty = 1")?;
    let ids = dirty.query_map([], |row| row.get(0))?;
    ids.collect()
}
/// Queue a memory for the next clustering pass (no-op if already queued).
pub fn add_pending_memory(&self, memory_id: &str) -> Result<(), rusqlite::Error> {
    let queued_at = chrono::Utc::now().to_rfc3339();
    self.conn
        .execute(
            "INSERT OR IGNORE INTO cluster_pending (memory_id, added_at) VALUES (?1, ?2)",
            params![memory_id, queued_at],
        )
        .map(|_| ())
}
/// All memory ids currently queued for clustering.
pub fn get_pending_memory_ids(&self) -> Result<Vec<String>, rusqlite::Error> {
    let mut pending = self.conn.prepare("SELECT memory_id FROM cluster_pending")?;
    let ids = pending.query_map([], |row| row.get(0))?;
    ids.collect()
}
/// Memory ids currently assigned to the given cluster.
pub fn get_cluster_members(&self, cluster_id: &str) -> Result<Vec<String>, rusqlite::Error> {
    let mut members = self
        .conn
        .prepare("SELECT memory_id FROM cluster_assignments WHERE cluster_id = ?")?;
    let ids = members.query_map(params![cluster_id], |row| row.get(0))?;
    ids.collect()
}
/// Atomically swap out a set of clusters: delete the old assignments and
/// centroids, then write the new clusters (each `(id, member_ids, centroid)`),
/// marking all new assignments with method 'warm' and confidence 1.0.
/// Runs inside a single transaction so readers never see a partial swap.
pub fn replace_clusters(
    &self, old_cluster_ids: &[String], new_clusters: &[(String, Vec<String>, Vec<f32>)],
) -> Result<(), rusqlite::Error> {
    let tx = self.conn.unchecked_transaction()?;
    for cid in old_cluster_ids {
        tx.execute("DELETE FROM cluster_assignments WHERE cluster_id = ?", params![cid])?;
        tx.execute("DELETE FROM cluster_centroids WHERE cluster_id = ?", params![cid])?;
    }
    let now = chrono::Utc::now().to_rfc3339();
    for (cluster_id, member_ids, centroid) in new_clusters {
        // Centroids are stored as little-endian f32 bytes.
        let centroid_bytes: Vec<u8> = centroid.iter().flat_map(|f| f.to_le_bytes()).collect();
        tx.execute(
            "INSERT OR REPLACE INTO cluster_centroids (cluster_id, centroid, member_count, dirty, updated_at)
    VALUES (?1, ?2, ?3, 0, ?4)",
            params![cluster_id, centroid_bytes, member_ids.len() as i64, now],
        )?;
        for mid in member_ids {
            tx.execute(
                "INSERT OR REPLACE INTO cluster_assignments (memory_id, cluster_id, assigned_at, method, confidence)
    VALUES (?1, ?2, ?3, 'warm', 1.0)",
                params![mid, cluster_id, now],
            )?;
        }
    }
    tx.commit()?;
    Ok(())
}
/// Fetch full records for the given ids, preserving order; ids that no
/// longer resolve are skipped. Stops at the first storage error.
pub fn get_memories_by_ids(&self, ids: &[String]) -> Result<Vec<MemoryRecord>, rusqlite::Error> {
    if ids.is_empty() {
        return Ok(Vec::new());
    }
    let fetched: Vec<Option<MemoryRecord>> = ids
        .iter()
        .map(|id| self.get(id))
        .collect::<Result<_, _>>()?;
    Ok(fetched.into_iter().flatten().collect())
}
/// Drain the pending-memory queue and clear every centroid's dirty flag
/// (called after a clustering pass has consumed both).
pub fn clear_pending_and_dirty(&self) -> Result<(), rusqlite::Error> {
    self.conn.execute("DELETE FROM cluster_pending", [])?;
    self.conn.execute("UPDATE cluster_centroids SET dirty = 0", [])?;
    Ok(())
}
/// Replace the entire clustering state after a full re-cluster: wipes all
/// assignments, centroids, and the pending queue, records the run's time and
/// total member count in `cluster_state`, then writes every new cluster with
/// method 'full'. All inside one transaction.
pub fn save_full_cluster_state(
    &self, clusters: &[(String, Vec<String>, Vec<f32>)],
) -> Result<(), rusqlite::Error> {
    let tx = self.conn.unchecked_transaction()?;
    tx.execute("DELETE FROM cluster_assignments", [])?;
    tx.execute("DELETE FROM cluster_centroids", [])?;
    tx.execute("DELETE FROM cluster_pending", [])?;
    let now = chrono::Utc::now().to_rfc3339();
    // Bookkeeping for the singleton cluster_state row (id = 1).
    tx.execute(
        "UPDATE cluster_state SET last_full_cluster_at = ?1, last_full_memory_count = ?2 WHERE id = 1",
        params![now, clusters.iter().map(|(_, members, _)| members.len()).sum::<usize>() as i64],
    )?;
    for (cluster_id, member_ids, centroid) in clusters {
        // Centroids are stored as little-endian f32 bytes.
        let centroid_bytes: Vec<u8> = centroid.iter().flat_map(|f| f.to_le_bytes()).collect();
        tx.execute(
            "INSERT INTO cluster_centroids (cluster_id, centroid, member_count, dirty, updated_at)
    VALUES (?1, ?2, ?3, 0, ?4)",
            params![cluster_id, centroid_bytes, member_ids.len() as i64, now],
        )?;
        for mid in member_ids {
            tx.execute(
                "INSERT INTO cluster_assignments (memory_id, cluster_id, assigned_at, method, confidence)
    VALUES (?1, ?2, ?3, 'full', 1.0)",
                params![mid, cluster_id, now],
            )?;
        }
    }
    tx.commit()?;
    Ok(())
}
/// Number of memories waiting in the clustering queue.
pub fn get_pending_count(&self) -> Result<usize, rusqlite::Error> {
    let pending: i64 = self
        .conn
        .query_row("SELECT COUNT(*) FROM cluster_pending", [], |row| row.get(0))?;
    Ok(pending as usize)
}
/// Total memory rows. Unlike `count_memories_in_namespace`, this applies no
/// `deleted_at` filter, so soft-deleted rows are included.
pub fn count_memories(&self) -> Result<usize, rusqlite::Error> {
    let total: i64 = self
        .conn
        .query_row("SELECT COUNT(*) FROM memories", [], |row| row.get(0))?;
    Ok(total as usize)
}
/// Load and deserialize a cluster's incremental state. Both a missing row
/// and malformed JSON yield `Ok(None)`.
pub fn get_incremental_state(&self, cluster_id: &str) -> Result<Option<crate::synthesis::types::IncrementalState>, rusqlite::Error> {
    let stored: Option<String> = self.conn.query_row(
        "SELECT state_json FROM cluster_incremental_state WHERE cluster_id = ?",
        params![cluster_id],
        |row| row.get(0),
    ).optional()?;
    Ok(stored.and_then(|json| serde_json::from_str(&json).ok()))
}
/// Serialize and upsert a cluster's incremental state.
/// NOTE(review): a serialization failure silently stores an empty string.
pub fn set_incremental_state(&self, cluster_id: &str, state: &crate::synthesis::types::IncrementalState) -> Result<(), rusqlite::Error> {
    let payload = serde_json::to_string(state).unwrap_or_default();
    let updated_at = chrono::Utc::now().to_rfc3339();
    self.conn
        .execute(
            "INSERT OR REPLACE INTO cluster_incremental_state (cluster_id, state_json, updated_at) VALUES (?1, ?2, ?3)",
            params![cluster_id, payload, updated_at],
        )
        .map(|_| ())
}
/// Reconstruct MemoryCluster values from the raw assignment table.
///
/// Members are sorted and the lexicographically-first member stands in as the
/// centroid id. The quality score (0.5) and the signals summary values are
/// fixed placeholders, not computed from data — callers should not treat
/// them as measured contributions.
pub fn get_all_cluster_data(&self) -> Result<Vec<crate::synthesis::types::MemoryCluster>, rusqlite::Error> {
    use std::collections::HashMap;
    let mut clusters: HashMap<String, Vec<String>> = HashMap::new();
    let mut stmt = self.conn.prepare("SELECT memory_id, cluster_id FROM cluster_assignments")?;
    let rows = stmt.query_map([], |row| {
        Ok((row.get::<_, String>(0)?, row.get::<_, String>(1)?))
    })?;
    // Group memory ids by their cluster id.
    for row in rows {
        let (memory_id, cluster_id) = row?;
        clusters.entry(cluster_id).or_default().push(memory_id);
    }
    let result = clusters.into_iter().map(|(cluster_id, mut members)| {
        members.sort();
        let centroid_id = members.first().cloned().unwrap_or_default();
        crate::synthesis::types::MemoryCluster {
            id: cluster_id,
            members,
            quality_score: 0.5, centroid_id,
            signals_summary: crate::synthesis::types::SignalsSummary {
                dominant_signal: crate::synthesis::types::ClusterSignal::Hebbian,
                hebbian_contribution: 0.4,
                entity_contribution: 0.3,
                embedding_contribution: 0.2,
                temporal_contribution: 0.1,
            },
        }
    }).collect();
    Ok(result)
}
/// Distinct namespaces among live (non-soft-deleted) memories.
pub fn list_namespaces(&self) -> Result<Vec<String>, rusqlite::Error> {
    let mut distinct = self
        .conn
        .prepare("SELECT DISTINCT namespace FROM memories WHERE deleted_at IS NULL")?;
    let namespaces = distinct.query_map([], |row| row.get::<_, String>(0))?;
    namespaces.collect()
}
/// Count live memories that have no stored embedding (candidates for
/// re-embedding / repair).
pub fn count_orphan_memories(&self) -> Result<usize, rusqlite::Error> {
    self.conn.query_row(
        "SELECT COUNT(*) FROM memories m WHERE m.deleted_at IS NULL AND NOT EXISTS (SELECT 1 FROM memory_embeddings me WHERE me.memory_id = m.id)",
        [],
        |row| row.get(0),
    )
}
/// Count Hebbian links whose source or target memory is missing or
/// soft-deleted (i.e. links eligible for cleanup).
pub fn count_dangling_hebbian(&self) -> Result<usize, rusqlite::Error> {
    self.conn.query_row(
        "SELECT COUNT(*) FROM hebbian_links h WHERE NOT EXISTS (SELECT 1 FROM memories m WHERE m.id = h.source_id AND m.deleted_at IS NULL) OR NOT EXISTS (SELECT 1 FROM memories m WHERE m.id = h.target_id AND m.deleted_at IS NULL)",
        [],
        |row| row.get(0),
    )
}
/// Ids of live memories that have no stored embedding.
pub fn get_orphan_memory_ids(&self) -> Result<Vec<String>, rusqlite::Error> {
    let mut orphans = self.conn.prepare(
        "SELECT m.id FROM memories m WHERE m.deleted_at IS NULL AND NOT EXISTS (SELECT 1 FROM memory_embeddings me WHERE me.memory_id = m.id)"
    )?;
    let ids = orphans.query_map([], |row| row.get::<_, String>(0))?;
    ids.collect()
}
/// Deletes access-log rows whose memory is missing or soft-deleted.
/// Returns the number of rows removed.
pub fn cleanup_orphaned_access_log(&self) -> Result<usize, rusqlite::Error> {
    let sql = "DELETE FROM access_log WHERE memory_id NOT IN (SELECT id FROM memories WHERE deleted_at IS NULL)";
    self.conn.execute(sql, [])
}
/// Deletes hebbian links where either endpoint is no longer a live memory.
/// Returns the number of links removed.
pub fn cleanup_dangling_hebbian(&self) -> Result<usize, rusqlite::Error> {
    let sql = "DELETE FROM hebbian_links WHERE source_id NOT IN (SELECT id FROM memories WHERE deleted_at IS NULL) OR target_id NOT IN (SELECT id FROM memories WHERE deleted_at IS NULL)";
    self.conn.execute(sql, [])
}
/// Deletes memory→entity link rows whose memory is missing or soft-deleted.
/// Returns the number of rows removed.
pub fn cleanup_orphaned_entity_links(&self) -> Result<usize, rusqlite::Error> {
    let sql = "DELETE FROM memory_entities WHERE memory_id NOT IN (SELECT id FROM memories WHERE deleted_at IS NULL)";
    self.conn.execute(sql, [])
}
/// Counts live memories, scoped to `namespace` when given, or across all
/// namespaces when `None`.
pub fn count_memories_in_namespace(&self, namespace: Option<&str>) -> Result<usize, rusqlite::Error> {
    if let Some(ns) = namespace {
        self.conn.query_row(
            "SELECT COUNT(*) FROM memories WHERE namespace = ? AND deleted_at IS NULL",
            params![ns],
            |r| r.get(0),
        )
    } else {
        self.conn.query_row(
            "SELECT COUNT(*) FROM memories WHERE deleted_at IS NULL",
            [],
            |r| r.get(0),
        )
    }
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::types::{MemoryLayer, MemoryRecord, MemoryType};
// Fresh, isolated in-memory database for each test.
fn test_storage() -> Storage {
    let storage = Storage::new(":memory:");
    storage.expect("in-memory storage")
}
// Minimal working-layer factual record: first access coincides with
// creation, and all optional/link fields start empty.
fn make_record(id: &str, content: &str, created_at: DateTime<Utc>) -> MemoryRecord {
    MemoryRecord {
        id: id.to_owned(),
        content: content.to_owned(),
        memory_type: MemoryType::Factual,
        layer: MemoryLayer::Working,
        created_at,
        access_times: vec![created_at],
        importance: 0.5,
        working_strength: 1.0,
        core_strength: 0.0,
        pinned: false,
        consolidation_count: 0,
        last_consolidated: None,
        source: String::new(),
        metadata: None,
        contradicts: None,
        contradicted_by: None,
        superseded_by: None,
    }
}
#[test]
fn test_record_association_new() {
    // A first-time association between two stored memories creates a hebbian
    // link carrying the given strength, signal source, and detail JSON.
    // Fix: the original built an unused `_storage` via test_storage() and then
    // opened a second ":memory:" database by hand; use the shared helper once,
    // like every sibling test.
    let mut storage = test_storage();
    let now = Utc::now();
    let m1 = make_record("mem_a", "memory about cats", now);
    let m2 = make_record("mem_b", "memory about dogs", now);
    storage.add(&m1, "default").unwrap();
    storage.add(&m2, "default").unwrap();
    let created = storage
        .record_association("mem_a", "mem_b", 0.5, "entity", r#"{"entity_overlap":0.4}"#, "default")
        .unwrap();
    assert!(created, "should create new link");
    let row: (f64, String, String) = storage.connection().query_row(
        "SELECT strength, signal_source, signal_detail FROM hebbian_links WHERE source_id = 'mem_a' AND target_id = 'mem_b'",
        [],
        |row| Ok((row.get(0)?, row.get(1)?, row.get(2)?)),
    ).unwrap();
    assert!((row.0 - 0.5).abs() < f64::EPSILON);
    assert_eq!(row.1, "entity");
    assert_eq!(row.2, r#"{"entity_overlap":0.4}"#);
}
#[test]
fn test_record_association_duplicate() {
    // Re-recording the same pair must update rather than insert: exactly one
    // link row exists afterwards, in either direction.
    let mut store = test_storage();
    let ts = Utc::now();
    let cats = make_record("mem_a", "memory about cats", ts);
    let dogs = make_record("mem_b", "memory about dogs", ts);
    store.add(&cats, "default").unwrap();
    store.add(&dogs, "default").unwrap();
    let first = store
        .record_association("mem_a", "mem_b", 0.5, "entity", "{}", "default")
        .unwrap();
    assert!(first);
    let second = store
        .record_association("mem_a", "mem_b", 0.3, "temporal", "{}", "default")
        .unwrap();
    assert!(!second, "should not create duplicate");
    let link_count: i64 = store.connection().query_row(
        "SELECT COUNT(*) FROM hebbian_links WHERE (source_id = 'mem_a' AND target_id = 'mem_b') OR (source_id = 'mem_b' AND target_id = 'mem_a')",
        [],
        |row| row.get(0),
    ).unwrap();
    assert_eq!(link_count, 1);
}
#[test]
fn test_record_association_bidirectional() {
    // Links are undirected: recording B→A when A→B exists updates the A→B
    // row (strength becomes 0.6) instead of inserting a mirror row.
    let mut store = test_storage();
    let ts = Utc::now();
    store.add(&make_record("mem_a", "memory about cats", ts), "default").unwrap();
    store.add(&make_record("mem_b", "memory about dogs", ts), "default").unwrap();
    let forward = store
        .record_association("mem_a", "mem_b", 0.5, "entity", "{}", "default")
        .unwrap();
    assert!(forward);
    let reverse = store
        .record_association("mem_b", "mem_a", 0.6, "multi", "{}", "default")
        .unwrap();
    assert!(!reverse, "B→A should not create duplicate when A→B exists");
    let link_count: i64 = store.connection().query_row(
        "SELECT COUNT(*) FROM hebbian_links WHERE (source_id = 'mem_a' AND target_id = 'mem_b') OR (source_id = 'mem_b' AND target_id = 'mem_a')",
        [],
        |row| row.get(0),
    ).unwrap();
    assert_eq!(link_count, 1);
    let strength: f64 = store.connection().query_row(
        "SELECT strength FROM hebbian_links WHERE source_id = 'mem_a' AND target_id = 'mem_b'",
        [],
        |row| row.get(0),
    ).unwrap();
    assert!((strength - 0.6).abs() < f64::EPSILON);
}
#[test]
fn test_decay_differential_rates() {
    // Each signal source decays at its own factor (corecall 0.95, multi 0.90,
    // entity 0.85 here), so after one pass from strength 1.0 the ordering
    // corecall > multi > entity must hold.
    let mut storage = test_storage();
    let now = Utc::now();
    for id in ["m1", "m2", "m3", "m4", "m5", "m6"] {
        storage.add(&make_record(id, &format!("memory {}", id), now), "default").unwrap();
    }
    let ts = now.timestamp() as f64;
    // One link per signal class, all starting at strength 1.0.
    for (src, tgt, signal) in [("m1", "m2", "corecall"), ("m3", "m4", "multi"), ("m5", "m6", "entity")] {
        storage.connection().execute(
            &format!("INSERT INTO hebbian_links (source_id, target_id, strength, coactivation_count, created_at, signal_source, namespace) VALUES ('{}', '{}', 1.0, 1, ?1, '{}', 'default')", src, tgt, signal),
            params![ts],
        ).unwrap();
    }
    storage.decay_hebbian_links_differential(0.95, 0.90, 0.85).unwrap();
    let strength_of = |src: &str, tgt: &str| -> f64 {
        storage.connection().query_row(
            "SELECT strength FROM hebbian_links WHERE source_id = ?1 AND target_id = ?2",
            params![src, tgt],
            |row| row.get(0),
        ).unwrap()
    };
    let corecall = strength_of("m1", "m2");
    let multi = strength_of("m3", "m4");
    let entity = strength_of("m5", "m6");
    assert!((corecall - 0.95).abs() < 1e-9, "corecall should be 0.95, got {}", corecall);
    assert!((multi - 0.90).abs() < 1e-9, "multi should be 0.90, got {}", multi);
    assert!((entity - 0.85).abs() < 1e-9, "entity should be 0.85, got {}", entity);
    assert!(corecall > multi);
    assert!(multi > entity);
}
#[test]
fn test_decay_differential_deletes_weak() {
    // Decay prunes links that fall below the deletion threshold: the 0.11
    // entity link should be removed, the 0.5 corecall link should survive.
    let mut storage = test_storage();
    let now = Utc::now();
    for id in ["m1", "m2", "m3", "m4"] {
        storage.add(&make_record(id, &format!("memory {}", id), now), "default").unwrap();
    }
    let ts = now.timestamp() as f64;
    for (src, tgt, strength, signal) in [("m1", "m2", 0.11, "entity"), ("m3", "m4", 0.5, "corecall")] {
        storage.connection().execute(
            &format!("INSERT INTO hebbian_links (source_id, target_id, strength, coactivation_count, created_at, signal_source, namespace) VALUES ('{}', '{}', {}, 1, ?1, '{}', 'default')", src, tgt, strength, signal),
            params![ts],
        ).unwrap();
    }
    let deleted = storage.decay_hebbian_links_differential(0.95, 0.90, 0.85).unwrap();
    assert_eq!(deleted, 1, "should delete 1 weak link");
    let remaining = |src: &str, tgt: &str| -> i64 {
        storage.connection().query_row(
            &format!("SELECT COUNT(*) FROM hebbian_links WHERE source_id = '{}' AND target_id = '{}'", src, tgt),
            [],
            |row| row.get(0),
        ).unwrap()
    };
    assert_eq!(remaining("m1", "m2"), 0, "weak entity link should be deleted");
    assert_eq!(remaining("m3", "m4"), 1, "strong corecall link should survive");
}
#[test]
fn test_hebbian_signal_migration_fresh_db() {
    // A freshly created database must already carry the signal columns and
    // the supporting index — the migration is baked into the fresh schema.
    let storage = test_storage();
    let column_exists = |name: &str| -> bool {
        storage.connection().query_row(
            &format!("SELECT COUNT(*) > 0 FROM pragma_table_info('hebbian_links') WHERE name='{}'", name),
            [],
            |row| row.get(0),
        ).unwrap()
    };
    assert!(column_exists("signal_source"), "signal_source column should exist on fresh DB");
    assert!(column_exists("signal_detail"), "signal_detail column should exist on fresh DB");
    let has_index: bool = storage.connection().query_row(
        "SELECT COUNT(*) > 0 FROM sqlite_master WHERE type='index' AND name='idx_hebbian_signal_source'",
        [],
        |row| row.get(0),
    ).unwrap();
    assert!(has_index, "idx_hebbian_signal_source index should exist");
}
#[test]
fn test_hebbian_signal_migration_idempotent() {
    // Re-running the migration on an already-migrated DB must not error,
    // and the column must still be present afterwards.
    let storage = test_storage();
    for _ in 0..2 {
        Storage::migrate_hebbian_signals(storage.connection()).unwrap();
    }
    let has_signal_source: bool = storage.connection().query_row(
        "SELECT COUNT(*) > 0 FROM pragma_table_info('hebbian_links') WHERE name='signal_source'",
        [],
        |row| row.get(0),
    ).unwrap();
    assert!(has_signal_source);
}
#[test]
fn test_hebbian_signal_migration_backfills_existing_rows() {
    // Build a pre-migration DB by hand (schema + v2 only), insert a link with
    // no signal_source, then check the migration backfills it to 'corecall'.
    let conn = Connection::open(":memory:").unwrap();
    conn.execute_batch("PRAGMA journal_mode=WAL; PRAGMA foreign_keys=ON; PRAGMA busy_timeout=5000;").unwrap();
    Storage::create_schema(&conn).unwrap();
    Storage::migrate_v2(&conn).unwrap();
    let ts = now_f64();
    for (id, content) in [("m1", "test1"), ("m2", "test2")] {
        conn.execute(
            &format!("INSERT INTO memories (id, content, memory_type, layer, created_at, namespace) VALUES ('{}', '{}', 'factual', 'working', ?1, 'default')", id, content),
            params![ts],
        ).unwrap();
    }
    conn.execute(
        "INSERT INTO hebbian_links (source_id, target_id, strength, coactivation_count, created_at, namespace) VALUES ('m1', 'm2', 1.0, 3, ?1, 'default')",
        params![ts],
    ).unwrap();
    Storage::migrate_hebbian_signals(&conn).unwrap();
    let source_after: String = conn.query_row(
        "SELECT signal_source FROM hebbian_links WHERE source_id = 'm1'",
        [],
        |row| row.get(0),
    ).unwrap();
    assert_eq!(source_after, "corecall", "signal_source should be backfilled to 'corecall'");
}
#[test]
fn test_cluster_centroids_roundtrip() {
    // A stored centroid should come back unchanged from get_cluster_centroids.
    let storage = test_storage();
    let centroid = vec![1.0f32, 2.0, 3.0];
    // Fix: `¢roid` was a mojibake of `&centroid` (the `&cent` prefix was
    // decoded as the HTML entity `&cent;` → '¢'), which does not compile.
    storage.update_centroid_incremental("cluster_a", &centroid).unwrap();
    let centroids = storage.get_cluster_centroids().unwrap();
    assert_eq!(centroids.len(), 1);
    assert_eq!(centroids[0].0, "cluster_a");
    assert_eq!(centroids[0].1, vec![1.0f32, 2.0, 3.0]);
}
#[test]
fn test_assign_to_cluster() {
    // Assigning a memory makes it appear in that cluster's member list.
    let store = test_storage();
    store.assign_to_cluster("mem_1", "cluster_a", "hot", 0.95).unwrap();
    let members = store.get_cluster_members("cluster_a").unwrap();
    assert_eq!(members, vec!["mem_1".to_string()]);
}
#[test]
fn test_centroid_incremental_update() {
    // Two incremental updates to the same cluster average the vectors:
    // mean([1,0,0], [0,1,0]) = [0.5, 0.5, 0].
    let storage = test_storage();
    storage.update_centroid_incremental("cluster_a", &[1.0, 0.0, 0.0]).unwrap();
    storage.update_centroid_incremental("cluster_a", &[0.0, 1.0, 0.0]).unwrap();
    let centroids = storage.get_cluster_centroids().unwrap();
    assert_eq!(centroids.len(), 1);
    // Fix: `¢roids[0]` was a mojibake of `&centroids[0]` (the `&cent` prefix
    // was decoded as the HTML entity `&cent;` → '¢'), which does not compile.
    let (id, vec) = &centroids[0];
    assert_eq!(id, "cluster_a");
    assert!((vec[0] - 0.5).abs() < 1e-6, "expected 0.5, got {}", vec[0]);
    assert!((vec[1] - 0.5).abs() < 1e-6, "expected 0.5, got {}", vec[1]);
    assert!((vec[2] - 0.0).abs() < 1e-6, "expected 0.0, got {}", vec[2]);
}
#[test]
fn test_dirty_cluster_tracking() {
    // Only explicitly-marked clusters are reported dirty, and
    // clear_pending_and_dirty resets the flag set.
    let store = test_storage();
    store.update_centroid_incremental("cluster_a", &[1.0, 0.0]).unwrap();
    store.update_centroid_incremental("cluster_b", &[0.0, 1.0]).unwrap();
    store.mark_cluster_dirty("cluster_a").unwrap();
    assert_eq!(store.get_dirty_cluster_ids().unwrap(), vec!["cluster_a".to_string()]);
    store.clear_pending_and_dirty().unwrap();
    assert!(store.get_dirty_cluster_ids().unwrap().is_empty());
}
#[test]
fn test_pending_memory_tracking() {
    // Pending ids behave as a set (re-adding mem_1 dedupes) and are wiped
    // wholesale by clear_pending_and_dirty.
    let store = test_storage();
    for id in ["mem_1", "mem_2", "mem_1"] {
        store.add_pending_memory(id).unwrap();
    }
    let pending = store.get_pending_memory_ids().unwrap();
    assert_eq!(pending.len(), 2);
    for expected in ["mem_1", "mem_2"] {
        assert!(pending.contains(&expected.to_string()));
    }
    assert_eq!(store.get_pending_count().unwrap(), 2);
    store.clear_pending_and_dirty().unwrap();
    assert!(store.get_pending_memory_ids().unwrap().is_empty());
    assert_eq!(store.get_pending_count().unwrap(), 0);
}
#[test]
fn test_replace_clusters() {
    // replace_clusters drops the listed old clusters and installs the new
    // assignments and centroids in their place.
    let store = test_storage();
    store.update_centroid_incremental("old_c1", &[1.0, 0.0]).unwrap();
    store.assign_to_cluster("mem_1", "old_c1", "full", 1.0).unwrap();
    store.assign_to_cluster("mem_2", "old_c1", "full", 1.0).unwrap();
    store.update_centroid_incremental("old_c2", &[0.0, 1.0]).unwrap();
    store.assign_to_cluster("mem_3", "old_c2", "full", 1.0).unwrap();
    let replacement = vec![
        ("new_c1".to_string(), vec!["mem_1".to_string(), "mem_3".to_string()], vec![0.5f32, 0.5]),
    ];
    let retired = ["old_c1".to_string(), "old_c2".to_string()];
    store.replace_clusters(&retired, &replacement).unwrap();
    assert!(store.get_cluster_members("old_c1").unwrap().is_empty());
    assert!(store.get_cluster_members("old_c2").unwrap().is_empty());
    let members = store.get_cluster_members("new_c1").unwrap();
    assert_eq!(members.len(), 2);
    assert!(members.contains(&"mem_1".to_string()));
    assert!(members.contains(&"mem_3".to_string()));
    let centroids = store.get_cluster_centroids().unwrap();
    let new_centroid = centroids.iter().find(|(id, _)| id == "new_c1").unwrap();
    assert_eq!(new_centroid.1, vec![0.5f32, 0.5]);
}
#[test]
fn test_save_full_cluster_state() {
    // A full save replaces all prior cluster state, clears pending ids, and
    // records the snapshot timestamp plus total member count in cluster_state.
    let store = test_storage();
    store.update_centroid_incremental("old_c", &[1.0]).unwrap();
    store.assign_to_cluster("mem_x", "old_c", "hot", 0.5).unwrap();
    store.add_pending_memory("mem_p").unwrap();
    let clusters = vec![
        ("c1".to_string(), vec!["m1".to_string(), "m2".to_string()], vec![1.0f32, 0.0]),
        ("c2".to_string(), vec!["m3".to_string()], vec![0.0f32, 1.0]),
    ];
    store.save_full_cluster_state(&clusters).unwrap();
    assert!(store.get_cluster_members("old_c").unwrap().is_empty());
    assert!(store.get_pending_memory_ids().unwrap().is_empty());
    let members_c1 = store.get_cluster_members("c1").unwrap();
    assert_eq!(members_c1.len(), 2);
    for expected in ["m1", "m2"] {
        assert!(members_c1.contains(&expected.to_string()));
    }
    assert_eq!(store.get_cluster_members("c2").unwrap(), vec!["m3".to_string()]);
    assert_eq!(store.get_cluster_centroids().unwrap().len(), 2);
    let (last_at, count): (String, i64) = store.conn.query_row(
        "SELECT last_full_cluster_at, last_full_memory_count FROM cluster_state WHERE id = 1",
        [],
        |row| Ok((row.get(0)?, row.get(1)?)),
    ).unwrap();
    assert!(!last_at.is_empty());
    // 3 = two members in c1 plus one in c2.
    assert_eq!(count, 3);
}
#[test]
fn test_get_memories_by_ids_empty() {
    // Querying with no ids must yield an empty result, not an error.
    let fetched = test_storage().get_memories_by_ids(&[]).unwrap();
    assert!(fetched.is_empty());
}
}