use crate::{Database, DbResultExt};
use chrono::Utc;
use roboticus_core::Result;
use rusqlite::OptionalExtension;
fn auto_index(
db: &Database,
source_table: &str,
source_id: &str,
summary: &str,
category: Option<&str>,
) {
if let Err(e) =
crate::memory_index::upsert_index_entry(db, source_table, source_id, summary, category)
{
tracing::warn!(error = %e, source_table, source_id, "auto-index failed");
}
}
#[derive(Debug, Clone)]
pub struct WorkingEntry {
pub id: String,
pub session_id: String,
pub entry_type: String,
pub content: String,
pub importance: i32,
pub created_at: String,
}
pub fn store_working(
db: &Database,
session_id: &str,
entry_type: &str,
content: &str,
importance: i32,
) -> Result<String> {
let conn = db.conn();
let id = uuid::Uuid::new_v4().to_string();
let now = Utc::now().to_rfc3339();
let tx = conn.unchecked_transaction().db_err()?;
tx.execute(
"INSERT INTO working_memory (id, session_id, entry_type, content, importance, created_at) \
VALUES (?1, ?2, ?3, ?4, ?5, ?6)",
rusqlite::params![id, session_id, entry_type, content, importance, now],
)
.db_err()?;
tx.execute(
"DELETE FROM memory_fts WHERE source_table = 'working' AND source_id = ?1",
rusqlite::params![id],
)
.db_err()?;
tx.execute(
"INSERT INTO memory_fts (content, category, source_table, source_id) VALUES (?1, ?2, 'working', ?3)",
rusqlite::params![content, entry_type, id],
)
.db_err()?;
tx.commit().db_err()?;
Ok(id)
}
pub fn retrieve_working(db: &Database, session_id: &str) -> Result<Vec<WorkingEntry>> {
let conn = db.conn();
let mut stmt = conn
.prepare(
"SELECT id, session_id, entry_type, content, importance, created_at \
FROM working_memory WHERE session_id = ?1 ORDER BY importance DESC, created_at DESC",
)
.db_err()?;
let rows = stmt
.query_map([session_id], |row| {
Ok(WorkingEntry {
id: row.get(0)?,
session_id: row.get(1)?,
entry_type: row.get(2)?,
content: row.get(3)?,
importance: row.get(4)?,
created_at: row.get(5)?,
})
})
.db_err()?;
rows.collect::<std::result::Result<Vec<_>, _>>().db_err()
}
pub fn retrieve_working_all(db: &Database, limit: i64) -> Result<Vec<WorkingEntry>> {
let conn = db.conn();
let mut stmt = conn
.prepare(
"SELECT id, session_id, entry_type, content, importance, created_at \
FROM working_memory ORDER BY importance DESC, created_at DESC LIMIT ?1",
)
.db_err()?;
let rows = stmt
.query_map([limit], |row| {
Ok(WorkingEntry {
id: row.get(0)?,
session_id: row.get(1)?,
entry_type: row.get(2)?,
content: row.get(3)?,
importance: row.get(4)?,
created_at: row.get(5)?,
})
})
.db_err()?;
rows.collect::<std::result::Result<Vec<_>, _>>().db_err()
}
#[derive(Debug, Clone)]
pub struct EpisodicEntry {
pub id: String,
pub classification: String,
pub content: String,
pub importance: i32,
pub owner_id: Option<String>,
pub memory_state: String,
pub state_reason: Option<String>,
pub created_at: String,
}
pub fn store_episodic(
db: &Database,
classification: &str,
content: &str,
importance: i32,
) -> Result<String> {
store_episodic_with_meta(
db,
classification,
content,
importance,
None,
"active",
None,
)
}
pub fn store_episodic_with_meta(
db: &Database,
classification: &str,
content: &str,
importance: i32,
owner_id: Option<&str>,
memory_state: &str,
state_reason: Option<&str>,
) -> Result<String> {
let id = uuid::Uuid::new_v4().to_string();
let now = Utc::now().to_rfc3339();
{
let conn = db.conn();
conn.execute(
"INSERT INTO episodic_memory
(id, classification, content, importance, owner_id, memory_state, state_reason, created_at) \
VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8)",
rusqlite::params![
id,
classification,
content,
importance,
owner_id,
memory_state,
state_reason,
now
],
)
.db_err()?;
}
let summary: String = content.chars().take(150).collect();
auto_index(db, "episodic_memory", &id, &summary, Some(classification));
Ok(id)
}
pub fn retrieve_episodic(db: &Database, limit: i64) -> Result<Vec<EpisodicEntry>> {
let conn = db.conn();
let mut stmt = conn
.prepare(
"SELECT id, classification, content, importance, owner_id, memory_state, state_reason, created_at \
FROM episodic_memory ORDER BY importance DESC, created_at DESC LIMIT ?1",
)
.db_err()?;
let rows = stmt
.query_map([limit], |row| {
Ok(EpisodicEntry {
id: row.get(0)?,
classification: row.get(1)?,
content: row.get(2)?,
importance: row.get(3)?,
owner_id: row.get(4)?,
memory_state: row.get(5)?,
state_reason: row.get(6)?,
created_at: row.get(7)?,
})
})
.db_err()?;
rows.collect::<std::result::Result<Vec<_>, _>>().db_err()
}
pub fn retrieve_recent_episodic(
db: &Database,
max_age_hours: u32,
limit: i64,
) -> Result<Vec<EpisodicEntry>> {
let conn = db.conn();
let mut stmt = conn
.prepare(
"SELECT id, classification, content, importance, owner_id, \
memory_state, state_reason, created_at \
FROM episodic_memory \
WHERE created_at >= datetime('now', ?1) \
AND (memory_state IS NULL OR memory_state NOT IN ('inactive', 'stale')) \
ORDER BY created_at DESC \
LIMIT ?2",
)
.db_err()?;
let age_modifier = format!("-{max_age_hours} hours");
let rows = stmt
.query_map(rusqlite::params![age_modifier, limit], |row| {
Ok(EpisodicEntry {
id: row.get(0)?,
classification: row.get(1)?,
content: row.get(2)?,
importance: row.get(3)?,
owner_id: row.get(4)?,
memory_state: row.get(5)?,
state_reason: row.get(6)?,
created_at: row.get(7)?,
})
})
.db_err()?;
rows.collect::<std::result::Result<Vec<_>, _>>().db_err()
}
pub fn mark_episodic_digests_stale_for_owner(
db: &Database,
owner_id: &str,
keep_id: &str,
reason: &str,
) -> Result<usize> {
let conn = db.conn();
let updated = conn
.execute(
"UPDATE episodic_memory
SET memory_state = 'stale', state_reason = ?1
WHERE owner_id = ?2
AND classification = 'digest'
AND id != ?3
AND memory_state = 'active'",
rusqlite::params![reason, owner_id, keep_id],
)
.db_err()?;
Ok(updated)
}
#[derive(Debug, Clone)]
pub struct SemanticEntry {
pub id: String,
pub category: String,
pub key: String,
pub value: String,
pub confidence: f64,
pub memory_state: String,
pub state_reason: Option<String>,
pub created_at: String,
pub updated_at: String,
}
pub fn store_semantic(
db: &Database,
category: &str,
key: &str,
value: &str,
confidence: f64,
) -> Result<String> {
let id = uuid::Uuid::new_v4().to_string();
let now = Utc::now().to_rfc3339();
let actual_id = {
let conn = db.conn();
let tx = conn.unchecked_transaction().db_err()?;
tx.execute(
"INSERT INTO semantic_memory
(id, category, key, value, confidence, memory_state, state_reason, created_at) \
VALUES (?1, ?2, ?3, ?4, ?5, 'active', NULL, ?6) \
ON CONFLICT(category, key) DO UPDATE SET value = excluded.value, \
confidence = excluded.confidence, memory_state = 'active', state_reason = NULL, updated_at = ?6",
rusqlite::params![id, category, key, value, confidence, now],
)
.db_err()?;
let actual_id: String = tx
.query_row(
"SELECT id FROM semantic_memory WHERE category = ?1 AND key = ?2",
rusqlite::params![category, key],
|row| row.get(0),
)
.db_err()?;
tx.execute(
"DELETE FROM memory_fts WHERE source_table = 'semantic' AND source_id = ?1",
rusqlite::params![actual_id],
)
.db_err()?;
tx.execute(
"INSERT INTO memory_fts (content, category, source_table, source_id) VALUES (?1, ?2, 'semantic', ?3)",
rusqlite::params![value, category, actual_id],
)
.db_err()?;
tx.commit().db_err()?;
actual_id
};
let summary: String = format!("{key}: {value}").chars().take(150).collect();
auto_index(db, "semantic_memory", &actual_id, &summary, Some(category));
Ok(actual_id)
}
pub fn retrieve_semantic(db: &Database, category: &str) -> Result<Vec<SemanticEntry>> {
let conn = db.conn();
let mut stmt = conn
.prepare(
"SELECT id, category, key, value, confidence, memory_state, state_reason, created_at, updated_at \
FROM semantic_memory WHERE category = ?1 ORDER BY confidence DESC",
)
.db_err()?;
let rows = stmt
.query_map([category], |row| {
Ok(SemanticEntry {
id: row.get(0)?,
category: row.get(1)?,
key: row.get(2)?,
value: row.get(3)?,
confidence: row.get(4)?,
memory_state: row.get(5)?,
state_reason: row.get(6)?,
created_at: row.get(7)?,
updated_at: row.get(8)?,
})
})
.db_err()?;
rows.collect::<std::result::Result<Vec<_>, _>>().db_err()
}
pub fn mark_semantic_stale_by_category_and_key_prefix(
db: &Database,
category: &str,
key_prefix: &str,
keep_id: &str,
reason: &str,
) -> Result<usize> {
let conn = db.conn();
let updated = conn
.execute(
"UPDATE semantic_memory
SET memory_state = 'stale', state_reason = ?1
WHERE category = ?2
AND key LIKE ?3
AND id != ?4
AND memory_state = 'active'",
rusqlite::params![reason, category, format!("{key_prefix}%"), keep_id],
)
.db_err()?;
Ok(updated)
}
pub fn list_semantic_categories(db: &Database) -> Result<Vec<(String, i64)>> {
let conn = db.conn();
let mut stmt = conn
.prepare(
"SELECT category, COUNT(*) as cnt FROM semantic_memory \
GROUP BY category ORDER BY cnt DESC",
)
.db_err()?;
let rows = stmt
.query_map([], |row| {
Ok((row.get::<_, String>(0)?, row.get::<_, i64>(1)?))
})
.db_err()?;
rows.collect::<std::result::Result<Vec<_>, _>>().db_err()
}
pub fn retrieve_semantic_all(db: &Database, limit: i64) -> Result<Vec<SemanticEntry>> {
let conn = db.conn();
let mut stmt = conn
.prepare(
"SELECT id, category, key, value, confidence, memory_state, state_reason, created_at, updated_at \
FROM semantic_memory ORDER BY confidence DESC, updated_at DESC LIMIT ?1",
)
.db_err()?;
let rows = stmt
.query_map([limit], |row| {
Ok(SemanticEntry {
id: row.get(0)?,
category: row.get(1)?,
key: row.get(2)?,
value: row.get(3)?,
confidence: row.get(4)?,
memory_state: row.get(5)?,
state_reason: row.get(6)?,
created_at: row.get(7)?,
updated_at: row.get(8)?,
})
})
.db_err()?;
rows.collect::<std::result::Result<Vec<_>, _>>().db_err()
}
#[derive(Debug, Clone)]
pub struct ProceduralEntry {
pub id: String,
pub name: String,
pub steps: String,
pub success_count: i64,
pub failure_count: i64,
pub created_at: String,
pub updated_at: String,
}
pub fn store_procedural(db: &Database, name: &str, steps: &str) -> Result<String> {
let id = uuid::Uuid::new_v4().to_string();
let now = Utc::now().to_rfc3339();
let persisted_id = {
let conn = db.conn();
conn.execute(
"INSERT INTO procedural_memory (id, name, steps, created_at) VALUES (?1, ?2, ?3, ?4) \
ON CONFLICT(name) DO UPDATE SET steps = excluded.steps, updated_at = ?4",
rusqlite::params![id, name, steps, now],
)
.db_err()?;
conn.query_row(
"SELECT id FROM procedural_memory WHERE name = ?1",
[name],
|r| r.get::<_, String>(0),
)
.db_err()?
};
auto_index(
db,
"procedural_memory",
&persisted_id,
&format!("Tool: {name}"),
Some("procedural"),
);
Ok(persisted_id)
}
pub fn retrieve_procedural(db: &Database, name: &str) -> Result<Option<ProceduralEntry>> {
let conn = db.conn();
conn.query_row(
"SELECT id, name, steps, success_count, failure_count, created_at, updated_at \
FROM procedural_memory WHERE name = ?1",
[name],
|row| {
Ok(ProceduralEntry {
id: row.get(0)?,
name: row.get(1)?,
steps: row.get(2)?,
success_count: row.get(3)?,
failure_count: row.get(4)?,
created_at: row.get(5)?,
updated_at: row.get(6)?,
})
},
)
.optional()
.db_err()
}
pub fn record_procedural_success(db: &Database, name: &str) -> Result<()> {
let persisted_id = {
let conn = db.conn();
conn.execute(
"INSERT INTO procedural_memory (id, name, steps, success_count, failure_count, created_at, updated_at) \
VALUES (lower(hex(randomblob(16))), ?1, '', 0, 0, datetime('now'), datetime('now')) \
ON CONFLICT(name) DO NOTHING",
[name],
)
.db_err()?;
conn.execute(
"UPDATE procedural_memory SET success_count = success_count + 1, updated_at = datetime('now') WHERE name = ?1",
[name],
)
.db_err()?;
conn.query_row(
"SELECT id FROM procedural_memory WHERE name = ?1",
[name],
|r| r.get::<_, String>(0),
)
.db_err()?
};
auto_index(
db,
"procedural_memory",
&persisted_id,
&format!("Tool: {name}"),
Some("procedural"),
);
Ok(())
}
pub fn record_procedural_failure(db: &Database, name: &str) -> Result<()> {
let persisted_id = {
let conn = db.conn();
conn.execute(
"INSERT INTO procedural_memory (id, name, steps, success_count, failure_count, created_at, updated_at) \
VALUES (lower(hex(randomblob(16))), ?1, '', 0, 0, datetime('now'), datetime('now')) \
ON CONFLICT(name) DO NOTHING",
[name],
)
.db_err()?;
conn.execute(
"UPDATE procedural_memory SET failure_count = failure_count + 1, updated_at = datetime('now') WHERE name = ?1",
[name],
)
.db_err()?;
conn.query_row(
"SELECT id FROM procedural_memory WHERE name = ?1",
[name],
|r| r.get::<_, String>(0),
)
.db_err()?
};
auto_index(
db,
"procedural_memory",
&persisted_id,
&format!("Tool: {name}"),
Some("procedural"),
);
Ok(())
}
pub fn prune_stale_procedural(db: &Database, stale_days: u32) -> Result<usize> {
let conn = db.conn();
let deleted = conn
.execute(
"DELETE FROM procedural_memory \
WHERE success_count = 0 AND failure_count = 0 \
AND updated_at < datetime('now', ?1)",
[format!("-{stale_days} days")],
)
.db_err()?;
Ok(deleted)
}
#[derive(Debug, Clone)]
pub struct RelationshipEntry {
pub id: String,
pub entity_id: String,
pub entity_name: Option<String>,
pub trust_score: f64,
pub interaction_summary: Option<String>,
pub interaction_count: i64,
pub last_interaction: Option<String>,
pub created_at: String,
}
pub fn store_relationship(
db: &Database,
entity_id: &str,
entity_name: &str,
trust_score: f64,
) -> Result<String> {
store_relationship_interaction(db, entity_id, entity_name, trust_score, None)
}
pub fn store_relationship_interaction(
db: &Database,
entity_id: &str,
entity_name: &str,
trust_score: f64,
interaction_summary: Option<&str>,
) -> Result<String> {
let id = uuid::Uuid::new_v4().to_string();
let now = Utc::now().to_rfc3339();
let persisted_id = {
let conn = db.conn();
conn.execute(
"INSERT INTO relationship_memory
(id, entity_id, entity_name, trust_score, interaction_summary, interaction_count, last_interaction, created_at) \
VALUES (?1, ?2, ?3, ?4, ?5, 1, ?6, ?6) \
ON CONFLICT(entity_id) DO UPDATE SET entity_name = excluded.entity_name, \
trust_score = excluded.trust_score, interaction_count = interaction_count + 1, \
interaction_summary = COALESCE(excluded.interaction_summary, relationship_memory.interaction_summary), \
last_interaction = excluded.last_interaction",
rusqlite::params![
id,
entity_id,
entity_name,
trust_score,
interaction_summary,
now
],
)
.db_err()?;
conn.query_row(
"SELECT id FROM relationship_memory WHERE entity_id = ?1",
[entity_id],
|r| r.get::<_, String>(0),
)
.db_err()?
};
auto_index(
db,
"relationship_memory",
&persisted_id,
&format!("Entity: {entity_name} (trust: {trust_score:.1})"),
Some("relationship"),
);
Ok(persisted_id)
}
pub fn retrieve_relationship(db: &Database, entity_id: &str) -> Result<Option<RelationshipEntry>> {
let conn = db.conn();
conn.query_row(
"SELECT id, entity_id, entity_name, trust_score, interaction_summary, \
interaction_count, last_interaction, created_at \
FROM relationship_memory WHERE entity_id = ?1",
[entity_id],
|row| {
Ok(RelationshipEntry {
id: row.get(0)?,
entity_id: row.get(1)?,
entity_name: row.get(2)?,
trust_score: row.get(3)?,
interaction_summary: row.get(4)?,
interaction_count: row.get(5)?,
last_interaction: row.get(6)?,
created_at: row.get(7)?,
})
},
)
.optional()
.db_err()
}
#[derive(Debug, Clone, serde::Serialize)]
pub struct MemoryTierHealth {
pub total: i64,
#[serde(skip_serializing_if = "Option::is_none")]
pub session_entries: Option<i64>,
#[serde(skip_serializing_if = "Option::is_none")]
pub active: Option<i64>,
#[serde(skip_serializing_if = "Option::is_none")]
pub stale: Option<i64>,
#[serde(skip_serializing_if = "Option::is_none")]
pub utilized: Option<i64>,
#[serde(skip_serializing_if = "Option::is_none")]
pub idle: Option<i64>,
#[serde(skip_serializing_if = "Option::is_none")]
pub total_interactions: Option<i64>,
}
#[derive(Debug, Clone, serde::Serialize)]
pub struct MemoryHealthSnapshot {
pub session_id: String,
pub working: MemoryTierHealth,
pub episodic: MemoryTierHealth,
pub semantic: MemoryTierHealth,
pub procedural: MemoryTierHealth,
pub relationship: MemoryTierHealth,
}
pub fn memory_health_snapshot(db: &Database, session_id: &str) -> Result<MemoryHealthSnapshot> {
let conn = db.conn();
let count = |sql: &str, params: &[&dyn rusqlite::ToSql]| -> Result<i64> {
conn.query_row(sql, params, |row| row.get(0)).db_err()
};
let working_total = count("SELECT COUNT(*) FROM working_memory", &[])?;
let working_session = count(
"SELECT COUNT(*) FROM working_memory WHERE session_id = ?1",
&[&session_id],
)?;
let episodic_total = count("SELECT COUNT(*) FROM episodic_memory", &[])?;
let episodic_active = count(
"SELECT COUNT(*) FROM episodic_memory WHERE memory_state = 'active'",
&[],
)?;
let episodic_stale = count(
"SELECT COUNT(*) FROM episodic_memory WHERE memory_state != 'active'",
&[],
)?;
let semantic_total = count("SELECT COUNT(*) FROM semantic_memory", &[])?;
let semantic_active = count(
"SELECT COUNT(*) FROM semantic_memory WHERE memory_state = 'active'",
&[],
)?;
let semantic_stale = count(
"SELECT COUNT(*) FROM semantic_memory WHERE memory_state != 'active'",
&[],
)?;
let procedural_total = count("SELECT COUNT(*) FROM procedural_memory", &[])?;
let procedural_utilized = count(
"SELECT COUNT(*) FROM procedural_memory WHERE success_count > 0 OR failure_count > 0",
&[],
)?;
let procedural_idle = count(
"SELECT COUNT(*) FROM procedural_memory WHERE success_count = 0 AND failure_count = 0",
&[],
)?;
let relationship_total = count("SELECT COUNT(*) FROM relationship_memory", &[])?;
let total_interactions = count(
"SELECT COALESCE(SUM(interaction_count), 0) FROM relationship_memory",
&[],
)?;
Ok(MemoryHealthSnapshot {
session_id: session_id.to_string(),
working: MemoryTierHealth {
total: working_total,
session_entries: Some(working_session),
active: None,
stale: None,
utilized: None,
idle: None,
total_interactions: None,
},
episodic: MemoryTierHealth {
total: episodic_total,
session_entries: None,
active: Some(episodic_active),
stale: Some(episodic_stale),
utilized: None,
idle: None,
total_interactions: None,
},
semantic: MemoryTierHealth {
total: semantic_total,
session_entries: None,
active: Some(semantic_active),
stale: Some(semantic_stale),
utilized: None,
idle: None,
total_interactions: None,
},
procedural: MemoryTierHealth {
total: procedural_total,
session_entries: None,
active: None,
stale: None,
utilized: Some(procedural_utilized),
idle: Some(procedural_idle),
total_interactions: None,
},
relationship: MemoryTierHealth {
total: relationship_total,
session_entries: None,
active: None,
stale: None,
utilized: None,
idle: None,
total_interactions: Some(total_interactions),
},
})
}
#[derive(Debug, Clone, serde::Serialize)]
pub struct MemorySearchResult {
pub content: String,
pub category: String,
pub source: String,
}
pub(crate) fn sanitize_fts_query(query: &str) -> String {
let stripped: String = query
.chars()
.filter(|c| c.is_alphanumeric() || c.is_whitespace())
.collect();
let tokens: Vec<String> = stripped
.split_whitespace()
.take(12)
.map(|t| format!("\"{}\"", t.replace('"', "\"\"")))
.collect();
if tokens.is_empty() {
"\"\"".to_string()
} else {
tokens.join(" OR ")
}
}
pub fn fts_search(db: &Database, query: &str, limit: i64) -> Result<Vec<MemorySearchResult>> {
let conn = db.conn();
let mut results: Vec<MemorySearchResult> = Vec::new();
let mut seen = std::collections::HashSet::new();
let fts_query = sanitize_fts_query(query);
match conn.prepare(
"SELECT content, category, source_table FROM memory_fts WHERE memory_fts MATCH ?1 LIMIT ?2",
) {
Ok(mut stmt) => {
match stmt.query_map(rusqlite::params![fts_query, limit], |row| {
Ok((
row.get::<_, String>(0)?,
row.get::<_, String>(1)?,
row.get::<_, String>(2)?,
))
}) {
Ok(rows) => {
for row in rows.flatten() {
let key = format!("{}|{}", row.2, row.0);
if seen.insert(key) {
results.push(MemorySearchResult {
content: row.0,
category: row.1,
source: row.2,
});
if results.len() as i64 >= limit {
return Ok(results);
}
}
}
}
Err(e) => tracing::warn!(error = %e, "FTS5 query_map failed"),
}
}
Err(e) => tracing::warn!(error = %e, "FTS5 query preparation failed"),
}
let escaped_query = query
.replace('\\', "\\\\")
.replace('%', "\\%")
.replace('_', "\\_");
let pattern = format!("%{escaped_query}%");
let tables_and_cols: &[(&str, &str)] = &[
("procedural_memory", "steps"),
("relationship_memory", "interaction_summary"),
];
for &(table, col) in tables_and_cols {
let sql = format!("SELECT {col} FROM {table} WHERE {col} LIKE ?1 ESCAPE '\\' LIMIT ?2");
match conn.prepare(&sql) {
Ok(mut stmt) => {
match stmt.query_map(rusqlite::params![pattern, limit], |row| {
row.get::<_, String>(0)
}) {
Ok(rows) => {
for row in rows.flatten() {
let key = format!("{table}|{row}");
if seen.insert(key) {
results.push(MemorySearchResult {
content: row,
category: table.replace("_memory", ""),
source: table.to_string(),
});
if results.len() as i64 >= limit {
return Ok(results);
}
}
}
}
Err(e) => {
tracing::warn!(error = %e, table, col, "LIKE fallback query_map failed")
}
}
}
Err(e) => {
tracing::warn!(error = %e, table, col, "LIKE fallback query preparation failed")
}
}
}
Ok(results)
}
pub fn prune_dead_episodic(db: &Database, stale_days: u32) -> Result<usize> {
let conn = db.conn();
let deleted = conn
.execute(
"DELETE FROM episodic_memory \
WHERE importance <= 1 \
AND created_at < datetime('now', ?1)",
[format!("-{stale_days} days")],
)
.db_err()?;
Ok(deleted)
}
pub fn mark_memory_stale(
db: &Database,
source_table: &str,
source_id: &str,
reason: &str,
) -> Result<()> {
let conn = db.conn();
match source_table {
"episodic_memory" => {
conn.execute(
"UPDATE episodic_memory SET memory_state = 'stale', state_reason = ?1 WHERE id = ?2",
rusqlite::params![reason, source_id],
).db_err()?;
}
"semantic_memory" => {
conn.execute(
"UPDATE semantic_memory SET memory_state = 'stale', state_reason = ?1 WHERE id = ?2",
rusqlite::params![reason, source_id],
).db_err()?;
}
_ => {} }
Ok(())
}
pub fn cleanup_orphaned_working_memory(db: &Database) -> Result<usize> {
let conn = db.conn();
let deleted = conn
.execute(
"DELETE FROM working_memory \
WHERE session_id NOT IN (SELECT id FROM sessions)",
[],
)
.db_err()?;
Ok(deleted)
}
pub fn mark_derivable_episodic_stale(db: &Database) -> Result<usize> {
let derivable_patterns = [
"Executed 'list_directory'%",
"Executed 'list-subagent-roster'%",
"Executed 'get_subagent_status'%",
"Executed 'get_runtime_context'%",
"Executed 'get_memory_stats'%",
"Executed 'get_channel_health'%",
"Executed 'list-open-tasks'%",
"Executed 'list-available-skills'%",
"Executed 'task-status'%",
"Executed 'get_wallet_balance'%",
"Executed 'read_file'%",
"Executed 'orchestrate-subagents'%",
"Used tool 'list_directory'%",
"Used tool 'get_runtime_context'%",
"Used tool 'get_memory_stats'%",
"Used tool 'get_channel_health'%",
"Used tool 'get_subagent_status'%",
"Used tool 'list-subagent-roster'%",
"Used tool 'read_file'%",
"Used tool 'orchestrate-subagents'%",
];
let conn = db.conn();
let mut total = 0usize;
for pattern in &derivable_patterns {
let updated = conn
.execute(
"UPDATE episodic_memory SET memory_state = 'stale', \
state_reason = 'derivable_cleanup' \
WHERE memory_state = 'active' AND content LIKE ?1",
rusqlite::params![pattern],
)
.db_err()?;
total += updated;
}
if total > 0 {
tracing::info!(total, "marked legacy derivable episodic entries as stale");
}
Ok(total)
}
pub fn episodic_content_exists(db: &Database, content: &str) -> bool {
let conn = db.conn();
conn.query_row(
"SELECT 1 FROM episodic_memory WHERE content = ?1 LIMIT 1",
rusqlite::params![content],
|_| Ok(()),
)
.is_ok()
}
pub fn cleanup_inactive_working_memory(db: &Database) -> Result<usize> {
let conn = db.conn();
let deleted = conn
.execute(
"DELETE FROM working_memory \
WHERE session_id NOT IN (SELECT id FROM sessions WHERE status = 'active')",
[],
)
.db_err()?;
if deleted > 0 {
tracing::info!(deleted, "cleaned working memory for inactive sessions");
}
Ok(deleted)
}
pub fn promote_episodic_to_semantic(
db: &Database,
episodic_id: &str,
category: &str,
key: &str,
preserve_original: bool,
) -> Result<String> {
let entry = retrieve_episodic_by_id(db, episodic_id)?.ok_or_else(|| {
roboticus_core::RoboticusError::Database(format!(
"episodic memory '{episodic_id}' not found"
))
})?;
let semantic_id = store_semantic(db, category, key, &entry.content, 1.0)?;
if preserve_original {
let conn = db.conn();
conn.execute(
"UPDATE episodic_memory SET memory_state = 'migrated', \
state_reason = ?1 WHERE id = ?2",
rusqlite::params![format!("promoted_to_semantic:{semantic_id}"), episodic_id],
)
.db_err()?;
} else {
let conn = db.conn();
conn.execute(
"DELETE FROM episodic_memory WHERE id = ?1",
rusqlite::params![episodic_id],
)
.db_err()?;
}
Ok(semantic_id)
}
fn retrieve_episodic_by_id(db: &Database, id: &str) -> Result<Option<EpisodicEntry>> {
let conn = db.conn();
conn.query_row(
"SELECT id, classification, content, importance, owner_id, memory_state, \
state_reason, created_at FROM episodic_memory WHERE id = ?1",
rusqlite::params![id],
|row| {
Ok(EpisodicEntry {
id: row.get(0)?,
classification: row.get(1)?,
content: row.get(2)?,
importance: row.get(3)?,
owner_id: row.get(4)?,
memory_state: row.get(5)?,
state_reason: row.get(6)?,
created_at: row.get(7)?,
})
},
)
.optional()
.db_err()
}
pub fn flag_immutable(db: &Database, source_id: &str) -> Result<()> {
let conn = db.conn();
conn.execute(
"UPDATE memory_index SET confidence = 2.0 WHERE source_id = ?1",
rusqlite::params![source_id],
)
.db_err()?;
Ok(())
}
#[cfg(test)]
mod tests {
use super::*;
fn test_db() -> Database {
Database::new(":memory:").unwrap()
}
#[test]
fn working_memory_roundtrip() {
let db = test_db();
store_working(&db, "sess-1", "goal", "find food", 8).unwrap();
store_working(&db, "sess-1", "observation", "sun is up", 3).unwrap();
let entries = retrieve_working(&db, "sess-1").unwrap();
assert_eq!(entries.len(), 2);
assert_eq!(entries[0].importance, 8, "higher importance first");
}
#[test]
fn episodic_memory_roundtrip() {
let db = test_db();
store_episodic(&db, "success", "deployed v1.0", 9).unwrap();
store_episodic(&db, "failure", "ran out of credits", 7).unwrap();
let entries = retrieve_episodic(&db, 10).unwrap();
assert_eq!(entries.len(), 2);
assert_eq!(entries[0].classification, "success");
assert_eq!(entries[0].memory_state, "active");
}
#[test]
fn semantic_memory_upsert() {
let db = test_db();
store_semantic(&db, "facts", "sky_color", "blue", 0.9).unwrap();
store_semantic(&db, "facts", "sky_color", "grey", 0.7).unwrap();
let entries = retrieve_semantic(&db, "facts").unwrap();
assert_eq!(entries.len(), 1);
assert_eq!(entries[0].value, "grey");
assert_eq!(entries[0].memory_state, "active");
}
#[test]
fn mark_semantic_stale_by_category_and_key_prefix_marks_older_rows() {
let db = test_db();
let old_id = store_semantic(&db, "learned", "session:s1:old", "older fact", 0.8).unwrap();
let keep_id = store_semantic(&db, "learned", "session:s1:new", "newer fact", 0.9).unwrap();
store_semantic(&db, "learned", "session:s2:other", "other session", 0.7).unwrap();
let updated = mark_semantic_stale_by_category_and_key_prefix(
&db,
"learned",
"session:s1:",
&keep_id,
"superseded_by_newer_session_summary",
)
.unwrap();
assert_eq!(updated, 1);
let entries = retrieve_semantic(&db, "learned").unwrap();
let old = entries.iter().find(|entry| entry.id == old_id).unwrap();
let keep = entries.iter().find(|entry| entry.id == keep_id).unwrap();
let other = entries
.iter()
.find(|entry| entry.key == "session:s2:other")
.unwrap();
assert_eq!(old.memory_state, "stale");
assert_eq!(
old.state_reason.as_deref(),
Some("superseded_by_newer_session_summary")
);
assert_eq!(keep.memory_state, "active");
assert_eq!(other.memory_state, "active");
}
#[test]
fn procedural_memory_roundtrip() {
let db = test_db();
store_procedural(&db, "deploy", r#"["build","push","verify"]"#).unwrap();
let entry = retrieve_procedural(&db, "deploy").unwrap().unwrap();
assert_eq!(entry.name, "deploy");
}
#[test]
fn relationship_memory_roundtrip() {
let db = test_db();
store_relationship(&db, "user-42", "Jon", 0.9).unwrap();
let entry = retrieve_relationship(&db, "user-42").unwrap().unwrap();
assert_eq!(entry.entity_name.as_deref(), Some("Jon"));
assert!((entry.trust_score - 0.9).abs() < f64::EPSILON);
assert_eq!(entry.interaction_count, 1);
}
#[test]
fn memory_health_snapshot_reports_live_counts_and_states() {
let db = test_db();
let session_id = crate::sessions::find_or_create(&db, "health-agent", None).unwrap();
store_working(&db, &session_id, "goal", "Keep recall sharp", 8).unwrap();
store_episodic_with_meta(
&db,
"digest",
"fresh digest",
8,
Some("health-agent"),
"active",
None,
)
.unwrap();
store_episodic_with_meta(
&db,
"digest",
"old digest",
6,
Some("health-agent"),
"stale",
Some("superseded"),
)
.unwrap();
store_semantic(&db, "learned", "session:test:1", "Current summary", 0.9).unwrap();
let stale_semantic =
store_semantic(&db, "learned", "session:test:0", "Old summary", 0.7).unwrap();
db.conn()
.execute(
"UPDATE semantic_memory SET memory_state = 'stale', state_reason = 'superseded' WHERE id = ?1",
[&stale_semantic],
)
.unwrap();
store_procedural(&db, "echo", "Say things back").unwrap();
record_procedural_success(&db, "echo").unwrap();
store_procedural(&db, "idle-tool", "Not used yet").unwrap();
store_relationship_interaction(
&db,
"peer:telegram:alice",
"Alice",
0.6,
Some("First hello"),
)
.unwrap();
store_relationship_interaction(
&db,
"peer:telegram:alice",
"Alice",
0.6,
Some("Second hello"),
)
.unwrap();
let snapshot = memory_health_snapshot(&db, &session_id).unwrap();
assert_eq!(snapshot.session_id, session_id);
assert_eq!(snapshot.working.total, 1);
assert_eq!(snapshot.working.session_entries, Some(1));
assert_eq!(snapshot.episodic.total, 2);
assert_eq!(snapshot.episodic.active, Some(1));
assert_eq!(snapshot.episodic.stale, Some(1));
assert_eq!(snapshot.semantic.total, 2);
assert_eq!(snapshot.semantic.active, Some(1));
assert_eq!(snapshot.semantic.stale, Some(1));
assert_eq!(snapshot.procedural.total, 2);
assert_eq!(snapshot.procedural.utilized, Some(1));
assert_eq!(snapshot.procedural.idle, Some(1));
assert_eq!(snapshot.relationship.total, 1);
assert_eq!(snapshot.relationship.total_interactions, Some(2));
}
#[test]
fn fts_search_finds_across_tiers() {
let db = test_db();
store_working(&db, "s1", "note", "the quick brown fox", 5).unwrap();
store_episodic(&db, "event", "a lazy dog appeared", 5).unwrap();
store_semantic(&db, "facts", "animal", "fox is quick", 0.8).unwrap();
let hits = fts_search(&db, "quick", 10).unwrap();
assert_eq!(hits.len(), 2, "should match working + semantic");
}
#[test]
fn fts_search_finds_episodic_via_trigger() {
let db = test_db();
store_episodic(&db, "discovery", "the quantum engine hummed", 9).unwrap();
let hits = fts_search(&db, "quantum", 10).unwrap();
assert_eq!(hits.len(), 1);
assert!(hits[0].content.contains("quantum"));
}
#[test]
fn fts_respects_limit() {
let db = test_db();
for i in 0..5 {
store_working(&db, "s1", "note", &format!("alpha item {i}"), 1).unwrap();
}
let hits = fts_search(&db, "alpha", 3).unwrap();
assert_eq!(hits.len(), 3);
}
#[test]
fn semantic_upsert_returns_existing_id() {
let db = test_db();
let id1 = store_semantic(&db, "prefs", "color", "blue", 0.9).unwrap();
let id2 = store_semantic(&db, "prefs", "color", "red", 0.8).unwrap();
assert_eq!(id1, id2, "upsert should return the original row id");
}
#[test]
fn procedural_failure_tracking() {
let db = test_db();
store_procedural(&db, "deploy", r#"["build","push"]"#).unwrap();
let entry = retrieve_procedural(&db, "deploy").unwrap().unwrap();
assert_eq!(entry.failure_count, 0);
record_procedural_failure(&db, "deploy").unwrap();
record_procedural_failure(&db, "deploy").unwrap();
let entry = retrieve_procedural(&db, "deploy").unwrap().unwrap();
assert_eq!(entry.failure_count, 2);
}
#[test]
fn store_working_writes_both_tables() {
let db = test_db();
let id = store_working(&db, "sess-1", "fact", "the sky is blue", 5).unwrap();
let conn = db.conn();
let count: i64 = conn
.query_row(
"SELECT COUNT(*) FROM working_memory WHERE id = ?1",
[&id],
|r| r.get(0),
)
.unwrap();
assert_eq!(count, 1);
let fts_count: i64 = conn
.query_row(
"SELECT COUNT(*) FROM memory_fts WHERE source_id = ?1",
[&id],
|r| r.get(0),
)
.unwrap();
assert_eq!(fts_count, 1);
}
#[test]
fn record_procedural_success_tracking() {
let db = test_db();
store_procedural(&db, "deploy", r#"["build","push"]"#).unwrap();
record_procedural_success(&db, "deploy").unwrap();
record_procedural_success(&db, "deploy").unwrap();
record_procedural_success(&db, "deploy").unwrap();
let entry = retrieve_procedural(&db, "deploy").unwrap().unwrap();
assert_eq!(entry.success_count, 3);
}
#[test]
fn retrieve_working_empty_session() {
let db = test_db();
let entries = retrieve_working(&db, "nonexistent-session").unwrap();
assert!(entries.is_empty());
}
#[test]
fn retrieve_working_is_session_isolated() {
let db = test_db();
store_working(&db, "sess-a", "note", "alpha", 5).unwrap();
store_working(&db, "sess-b", "note", "beta", 5).unwrap();
let a = retrieve_working(&db, "sess-a").unwrap();
let b = retrieve_working(&db, "sess-b").unwrap();
assert_eq!(a.len(), 1);
assert_eq!(b.len(), 1);
assert_eq!(a[0].content, "alpha");
assert_eq!(b[0].content, "beta");
}
#[test]
fn retrieve_episodic_limit_zero() {
let db = test_db();
store_episodic(&db, "event", "something happened", 5).unwrap();
let entries = retrieve_episodic(&db, 0).unwrap();
assert!(entries.is_empty());
}
#[test]
fn retrieve_semantic_empty_category() {
let db = test_db();
let entries = retrieve_semantic(&db, "no-such-category").unwrap();
assert!(entries.is_empty());
}
#[test]
fn retrieve_procedural_nonexistent() {
let db = test_db();
let entry = retrieve_procedural(&db, "nonexistent").unwrap();
assert!(entry.is_none());
}
#[test]
fn retrieve_relationship_nonexistent() {
let db = test_db();
let entry = retrieve_relationship(&db, "no-such-entity").unwrap();
assert!(entry.is_none());
}
#[test]
fn store_relationship_upsert_increments_interaction() {
let db = test_db();
store_relationship(&db, "user-1", "Alice", 0.5).unwrap();
store_relationship(&db, "user-1", "Alice Updated", 0.8).unwrap();
let entry = retrieve_relationship(&db, "user-1").unwrap().unwrap();
assert_eq!(entry.interaction_count, 2);
}
#[test]
fn mark_episodic_digests_stale_for_owner_marks_older_rows() {
let db = test_db();
let stale_id = store_episodic_with_meta(
&db,
"digest",
"older digest",
7,
Some("agent-1"),
"active",
None,
)
.unwrap();
let keep_id = store_episodic_with_meta(
&db,
"digest",
"newer digest",
8,
Some("agent-1"),
"active",
None,
)
.unwrap();
let changed =
mark_episodic_digests_stale_for_owner(&db, "agent-1", &keep_id, "superseded").unwrap();
assert_eq!(changed, 1);
let entries = retrieve_episodic(&db, 10).unwrap();
let stale = entries.iter().find(|e| e.id == stale_id).unwrap();
let keep = entries.iter().find(|e| e.id == keep_id).unwrap();
assert_eq!(stale.memory_state, "stale");
assert_eq!(stale.state_reason.as_deref(), Some("superseded"));
assert_eq!(keep.memory_state, "active");
}
#[test]
fn store_relationship_interaction_updates_summary() {
let db = test_db();
store_relationship_interaction(
&db,
"peer:telegram:alice",
"alice",
0.7,
Some("User: hello; Assistant: hi there"),
)
.unwrap();
let entry = retrieve_relationship(&db, "peer:telegram:alice")
.unwrap()
.expect("relationship should exist");
assert_eq!(
entry.interaction_summary.as_deref(),
Some("User: hello; Assistant: hi there")
);
}
#[test]
fn store_procedural_upsert_updates_steps() {
let db = test_db();
store_procedural(&db, "deploy", r#"["build"]"#).unwrap();
store_procedural(&db, "deploy", r#"["build","push","verify"]"#).unwrap();
let entry = retrieve_procedural(&db, "deploy").unwrap().unwrap();
assert_eq!(entry.steps, r#"["build","push","verify"]"#);
}
#[test]
fn fts_search_no_matches() {
let db = test_db();
store_working(&db, "s1", "note", "hello world", 5).unwrap();
let hits = fts_search(&db, "zzzznotfound", 10).unwrap();
assert!(hits.is_empty());
}
#[test]
fn fts_search_like_fallback_procedural() {
let db = test_db();
store_procedural(&db, "backup", "step one: tar the archive and compress").unwrap();
let hits = fts_search(&db, "tar the archive", 10).unwrap();
assert!(!hits.is_empty());
}
#[test]
fn retrieve_working_all_returns_across_sessions() {
let db = test_db();
store_working(&db, "sess-a", "note", "alpha entry", 5).unwrap();
store_working(&db, "sess-b", "note", "beta entry", 8).unwrap();
store_working(&db, "sess-c", "note", "gamma entry", 3).unwrap();
let entries = retrieve_working_all(&db, 100).unwrap();
assert_eq!(entries.len(), 3);
assert_eq!(entries[0].importance, 8);
assert_eq!(entries[1].importance, 5);
assert_eq!(entries[2].importance, 3);
}
#[test]
fn retrieve_working_all_respects_limit() {
let db = test_db();
for i in 0..5 {
store_working(&db, "sess", "note", &format!("entry {i}"), i).unwrap();
}
let entries = retrieve_working_all(&db, 2).unwrap();
assert_eq!(entries.len(), 2);
}
#[test]
fn retrieve_working_all_empty_db() {
let db = test_db();
let entries = retrieve_working_all(&db, 10).unwrap();
assert!(entries.is_empty());
}
#[test]
fn list_semantic_categories_returns_grouped() {
let db = test_db();
store_semantic(&db, "facts", "sky_color", "blue", 0.9).unwrap();
store_semantic(&db, "facts", "grass_color", "green", 0.8).unwrap();
store_semantic(&db, "prefs", "theme", "dark", 0.7).unwrap();
let categories = list_semantic_categories(&db).unwrap();
assert_eq!(categories.len(), 2);
assert_eq!(categories[0].0, "facts");
assert_eq!(categories[0].1, 2);
assert_eq!(categories[1].0, "prefs");
assert_eq!(categories[1].1, 1);
}
#[test]
fn list_semantic_categories_empty() {
let db = test_db();
let categories = list_semantic_categories(&db).unwrap();
assert!(categories.is_empty());
}
#[test]
fn retrieve_semantic_all_returns_across_categories() {
let db = test_db();
store_semantic(&db, "facts", "sky", "blue", 0.9).unwrap();
store_semantic(&db, "prefs", "theme", "dark", 0.7).unwrap();
store_semantic(&db, "facts", "grass", "green", 0.8).unwrap();
let entries = retrieve_semantic_all(&db, 100).unwrap();
assert_eq!(entries.len(), 3);
assert!((entries[0].confidence - 0.9).abs() < f64::EPSILON);
}
#[test]
fn retrieve_semantic_all_respects_limit() {
let db = test_db();
for i in 0..5 {
store_semantic(
&db,
"cat",
&format!("key{i}"),
&format!("val{i}"),
0.5 + i as f64 * 0.1,
)
.unwrap();
}
let entries = retrieve_semantic_all(&db, 2).unwrap();
assert_eq!(entries.len(), 2);
}
#[test]
fn retrieve_semantic_all_empty() {
let db = test_db();
let entries = retrieve_semantic_all(&db, 10).unwrap();
assert!(entries.is_empty());
}
#[test]
fn fts_search_like_fallback_relationship() {
let db = test_db();
{
let conn = db.conn();
conn.execute(
"INSERT INTO relationship_memory (id, entity_id, entity_name, trust_score, interaction_summary) \
VALUES ('r1', 'user-99', 'TestUser', 0.8, 'discussed the quantum physics experiment')",
[],
).unwrap();
}
let hits = fts_search(&db, "quantum physics", 10).unwrap();
assert!(
!hits.is_empty(),
"LIKE fallback should find relationship interaction_summary"
);
}
#[test]
fn fts_search_limit_reached_in_fts_phase() {
let db = test_db();
for i in 0..5 {
store_working(
&db,
"sess",
"note",
&format!("searchable keyword item {i}"),
5,
)
.unwrap();
}
let hits = fts_search(&db, "keyword", 2).unwrap();
assert_eq!(hits.len(), 2, "should stop at limit during FTS phase");
}
#[test]
fn fts_search_limit_reached_in_like_phase() {
let db = test_db();
for i in 0..5 {
store_procedural(
&db,
&format!("proc_{i}"),
&format!("step: run the xyzzy command {i}"),
)
.unwrap();
}
let hits = fts_search(&db, "xyzzy command", 2).unwrap();
assert_eq!(
hits.len(),
2,
"should stop at limit during LIKE fallback phase"
);
}
#[test]
fn fts_search_special_chars_in_query() {
let db = test_db();
store_working(
&db,
"sess",
"note",
"test with percent % and underscore _",
5,
)
.unwrap();
let hits = fts_search(&db, "percent", 10).unwrap();
assert!(!hits.is_empty());
}
#[test]
fn sanitize_fts_query_strips_operators() {
let result = sanitize_fts_query("hello AND world");
assert!(result.starts_with('"'));
assert!(result.ends_with('"'));
}
#[test]
fn sanitize_fts_query_empty() {
let result = sanitize_fts_query("");
assert_eq!(result, "\"\"");
}
#[test]
fn sanitize_fts_query_special_chars_stripped() {
let result = sanitize_fts_query("hello* OR world");
assert!(!result.contains('*'));
}
#[test]
fn prune_stale_procedural_removes_zero_activity_entries() {
let db = test_db();
store_procedural(&db, "stale-tool", "do something").unwrap();
db.conn()
.execute(
"UPDATE procedural_memory SET updated_at = datetime('now', '-60 days') WHERE name = ?1",
["stale-tool"],
)
.unwrap();
store_procedural(&db, "active-tool", "steps").unwrap();
record_procedural_success(&db, "active-tool").unwrap();
db.conn()
.execute(
"UPDATE procedural_memory SET updated_at = datetime('now', '-60 days') WHERE name = ?1",
["active-tool"],
)
.unwrap();
let pruned = prune_stale_procedural(&db, 30).unwrap();
assert_eq!(pruned, 1);
assert!(retrieve_procedural(&db, "stale-tool").unwrap().is_none());
assert!(retrieve_procedural(&db, "active-tool").unwrap().is_some());
}
#[test]
fn prune_stale_procedural_ignores_recent_entries() {
let db = test_db();
store_procedural(&db, "fresh-tool", "steps").unwrap();
let pruned = prune_stale_procedural(&db, 30).unwrap();
assert_eq!(pruned, 0);
assert!(retrieve_procedural(&db, "fresh-tool").unwrap().is_some());
}
#[test]
fn prune_dead_episodic_removes_low_importance_old() {
let db = test_db();
store_episodic(&db, "noise", "irrelevant chatter", 1).unwrap();
store_episodic(&db, "signal", "critical event", 8).unwrap();
db.conn()
.execute(
"UPDATE episodic_memory SET created_at = datetime('now', '-60 days') \
WHERE importance <= 1",
[],
)
.unwrap();
let pruned = prune_dead_episodic(&db, 30).unwrap();
assert_eq!(pruned, 1);
let remaining = retrieve_episodic(&db, 100).unwrap();
assert_eq!(remaining.len(), 1);
assert_eq!(remaining[0].content, "critical event");
}
#[test]
fn prune_dead_episodic_keeps_recent_low_importance() {
let db = test_db();
store_episodic(&db, "recent-noise", "just happened", 1).unwrap();
let pruned = prune_dead_episodic(&db, 30).unwrap();
assert_eq!(pruned, 0);
}
#[test]
fn prune_dead_episodic_keeps_old_high_importance() {
let db = test_db();
store_episodic(&db, "important", "old but critical", 5).unwrap();
db.conn()
.execute(
"UPDATE episodic_memory SET created_at = datetime('now', '-90 days')",
[],
)
.unwrap();
let pruned = prune_dead_episodic(&db, 30).unwrap();
assert_eq!(pruned, 0);
}
#[test]
fn cleanup_orphaned_working_memory_removes_dangling() {
let db = test_db();
let conn = db.conn();
conn.execute(
"INSERT INTO sessions (id, agent_id) VALUES ('live-sess', 'a')",
[],
)
.unwrap();
drop(conn);
store_working(&db, "live-sess", "note", "survives", 5).unwrap();
store_working(&db, "dead-sess", "note", "orphaned", 5).unwrap();
let deleted = cleanup_orphaned_working_memory(&db).unwrap();
assert_eq!(deleted, 1);
let remaining = retrieve_working(&db, "live-sess").unwrap();
assert_eq!(remaining.len(), 1);
let gone = retrieve_working(&db, "dead-sess").unwrap();
assert!(gone.is_empty());
}
#[test]
fn cleanup_orphaned_working_memory_noop_when_clean() {
let db = test_db();
let conn = db.conn();
conn.execute("INSERT INTO sessions (id, agent_id) VALUES ('s1', 'a')", [])
.unwrap();
drop(conn);
store_working(&db, "s1", "note", "ok", 5).unwrap();
let deleted = cleanup_orphaned_working_memory(&db).unwrap();
assert_eq!(deleted, 0);
}
#[test]
fn promote_episodic_to_semantic_with_preserve() {
let db = test_db();
crate::schema::initialize_db(&db).unwrap();
let eid = store_episodic(&db, "insight", "user prefers terse responses", 8).unwrap();
let sid = promote_episodic_to_semantic(&db, &eid, "preferences", "terse", true).unwrap();
let entries = retrieve_semantic(&db, "preferences").unwrap();
assert_eq!(entries.len(), 1);
assert_eq!(entries[0].id, sid);
assert!(entries[0].value.contains("terse"));
let original = retrieve_episodic_by_id(&db, &eid).unwrap().unwrap();
assert_eq!(original.memory_state, "migrated");
assert!(original.state_reason.as_deref().unwrap().contains(&sid));
}
#[test]
fn promote_episodic_to_semantic_without_preserve() {
let db = test_db();
crate::schema::initialize_db(&db).unwrap();
let eid = store_episodic(&db, "insight", "ephemeral fact", 5).unwrap();
promote_episodic_to_semantic(&db, &eid, "facts", "key", false).unwrap();
assert!(retrieve_episodic_by_id(&db, &eid).unwrap().is_none());
}
#[test]
fn promote_nonexistent_episodic_returns_error() {
let db = test_db();
crate::schema::initialize_db(&db).unwrap();
let result = promote_episodic_to_semantic(&db, "nope", "cat", "key", false);
assert!(result.is_err());
}
#[test]
fn flag_immutable_sets_confidence_above_one() {
let db = test_db();
crate::schema::initialize_db(&db).unwrap();
let sid = store_semantic(&db, "facts", "sky", "blue", 0.9).unwrap();
flag_immutable(&db, &sid).unwrap();
let conn = db.conn();
let confidence: f64 = conn
.query_row(
"SELECT confidence FROM memory_index WHERE source_id = ?1",
[&sid],
|row| row.get(0),
)
.unwrap();
assert!(
confidence > 1.0,
"immutable entries should have confidence > 1.0"
);
}
}