use roboticus_core::Result;
use roboticus_db::Database;
use std::collections::{HashMap, HashSet};
/// Per-phase counters produced by one `run_consolidation` pass.
///
/// Every counter starts at 0 (`Default`). A phase that fails only logs a warning, so its
/// counter keeps whatever value it had before that phase ran.
#[derive(Default, Debug)]
pub struct ConsolidationReport {
    /// Count reported by `backfill_missing_index_entries` (phase 1).
    pub indexed: usize,
    /// Obsidian notes that received a new `memory_index` entry (phase 2).
    pub obsidian_indexed: usize,
    /// Duplicate entries retired by the dedup phase (phase 3).
    pub deduped: usize,
    /// Index rows whose confidence was realigned with their memory tier (phase 4).
    pub tier_synced: usize,
    /// Count reported by `prune_low_confidence` (phase 5).
    pub pruned: usize,
    /// Index entries for vanished Obsidian notes plus orphaned index, working-memory and
    /// embedding rows removed by the cleanup sweeps.
    pub orphans_cleaned: usize,
    /// Wall-clock time of the whole pass, in milliseconds.
    pub duration_ms: u64,
}

impl ConsolidationReport {
    /// Returns the sum of all per-phase counters.
    ///
    /// `duration_ms` is a timing rather than an action and is not included.
    pub fn total_actions(&self) -> usize {
        [
            self.indexed,
            self.obsidian_indexed,
            self.deduped,
            self.tier_synced,
            self.pruned,
            self.orphans_cleaned,
        ]
        .iter()
        .sum()
    }
}
pub fn run_consolidation(
db: &Database,
vault: Option<&crate::obsidian::ObsidianVault>,
) -> Result<ConsolidationReport> {
let start = std::time::Instant::now();
let mut report = ConsolidationReport::default();
match roboticus_db::memory_index::backfill_missing_index_entries(db, 50) {
Ok(n) => report.indexed = n,
Err(e) => tracing::warn!(error = %e, "consolidation phase 1 (index sync) failed"),
}
if let Some(vault) = vault {
match index_obsidian_vault(db, vault) {
Ok(n) => report.obsidian_indexed = n,
Err(e) => tracing::warn!(error = %e, "consolidation phase 2 (obsidian scan) failed"),
}
if let Ok(n) = cleanup_deleted_obsidian_entries(db, vault) {
report.orphans_cleaned += n;
}
}
let quiescent = is_quiescent(db);
if quiescent {
match run_dedup(db, 0.85) {
Ok(n) => report.deduped = n,
Err(e) => tracing::warn!(error = %e, "consolidation phase 3 (dedup) failed"),
}
}
match sync_index_with_tier_state(db) {
Ok(n) => report.tier_synced = n,
Err(e) => tracing::warn!(error = %e, "consolidation phase 4 (tier sync) failed"),
}
match roboticus_db::memory_index::prune_low_confidence(db, 0.05) {
Ok(n) => report.pruned = n,
Err(e) => tracing::warn!(error = %e, "consolidation phase 5 (prune) failed"),
}
let mut orphans = 0usize;
if let Ok(n) = roboticus_db::memory_index::cleanup_orphaned_index_entries(db) {
orphans += n;
}
if let Ok(n) = roboticus_db::memory::cleanup_orphaned_working_memory(db) {
orphans += n;
}
if let Ok(n) = roboticus_db::embeddings::cleanup_orphaned_embeddings(db) {
orphans += n;
}
report.orphans_cleaned += orphans;
report.duration_ms = start.elapsed().as_millis() as u64;
Ok(report)
}
/// Returns `true` when no session with `status = 'active'` has an `updated_at` later than
/// SQLite's `datetime('now', '-5 seconds')`.
///
/// If the count query fails for any reason, this returns `false` (treated as "busy"), so
/// callers skip the work they were guarding.
///
/// NOTE(review): the comparison is done by SQLite on whatever `updated_at` holds. This
/// presumably relies on `updated_at` using the same `YYYY-MM-DD HH:MM:SS` text format that
/// `datetime()` produces. TODO confirm against the `sessions` schema.
fn is_quiescent(db: &Database) -> bool {
    let recent_active_sessions = db.conn().query_row(
        "SELECT COUNT(*) FROM sessions\n\
         WHERE status = 'active'\n\
         AND updated_at > datetime('now', '-5 seconds')",
        [],
        |row| row.get::<_, i64>(0),
    );
    // Only a successful count of exactly zero means quiescent; an error counts as busy.
    matches!(recent_active_sessions, Ok(0))
}
/// Adds a `memory_index` entry for every Obsidian note that does not have one yet.
///
/// Each note from `vault.notes_in_folder("")` is looked up in `memory_index` by
/// `source_table = 'obsidian'` and `source_id` = the note's relative path. Notes without a
/// row are upserted with category `obsidian_note` and a summary of the title, followed by
/// ` - ` and the comma-separated tags when there are any, truncated to 150 characters.
///
/// # Returns
/// The number of notes newly indexed by this call.
///
/// # Errors
/// Propagates the first error from `upsert_index_entry`. A failed existence lookup is
/// treated as "not indexed yet", so the note is upserted.
fn index_obsidian_vault(db: &Database, vault: &crate::obsidian::ObsidianVault) -> Result<usize> {
    let all_notes = vault.notes_in_folder("");
    let mut count = 0usize;
    for (rel_path, note) in &all_notes {
        // Keep the connection guard scoped to this lookup. `upsert_index_entry` below is
        // handed `db` and presumably locks the connection itself, so holding this guard
        // across that call would self-deadlock. Elsewhere this module also drops
        // `db.conn()` guards before calling other `roboticus_db` helpers.
        let already_indexed = {
            let conn = db.conn();
            let hits: i64 = conn
                .query_row(
                    "SELECT COUNT(*) FROM memory_index\n\
                     WHERE source_table = 'obsidian' AND source_id = ?1",
                    [rel_path],
                    |r| r.get::<_, i64>(0),
                )
                .unwrap_or(0);
            hits > 0
        };
        if already_indexed {
            continue;
        }
        let tags = note.tags.join(", ");
        let summary: String = if tags.is_empty() {
            note.title.chars().take(150).collect()
        } else {
            format!("{} - {}", note.title, tags)
                .chars()
                .take(150)
                .collect()
        };
        roboticus_db::memory_index::upsert_index_entry(
            db,
            "obsidian",
            rel_path,
            &summary,
            Some("obsidian_note"),
        )?;
        count += 1;
    }
    Ok(count)
}
/// Removes `memory_index` rows for Obsidian notes that no longer exist in `vault`.
///
/// Every row with `source_table = 'obsidian'` is checked with `vault.get_note(source_id)`.
/// Rows whose note lookup returns `None` are deleted one at a time.
///
/// # Returns
/// The number of rows actually deleted. Rows that fail to decode, and deletes that fail,
/// are skipped without being reported.
///
/// # Errors
/// `RoboticusError::Database` if preparing or running the initial `SELECT` fails.
fn cleanup_deleted_obsidian_entries(
    db: &Database,
    vault: &crate::obsidian::ObsidianVault,
) -> Result<usize> {
    fn db_err<E: std::fmt::Display>(e: E) -> roboticus_core::RoboticusError {
        roboticus_core::RoboticusError::Database(e.to_string())
    }

    let conn = db.conn();

    // Read all (index id, relative path) pairs first. The prepared statement is dropped at
    // the end of this block, before any DELETE is issued.
    let indexed: Vec<(String, String)> = {
        let mut stmt = conn
            .prepare("SELECT id, source_id FROM memory_index WHERE source_table = 'obsidian'")
            .map_err(db_err)?;
        let pairs: Vec<(String, String)> = stmt
            .query_map([], |r| Ok((r.get(0)?, r.get(1)?)))
            .map_err(db_err)?
            .filter_map(|r| r.ok())
            .collect();
        pairs
    };

    let vanished: Vec<&String> = indexed
        .iter()
        .filter(|(_, rel_path)| vault.get_note(rel_path).is_none())
        .map(|(idx_id, _)| idx_id)
        .collect();

    let removed = vanished
        .into_iter()
        .map(|idx_id| conn.execute("DELETE FROM memory_index WHERE id = ?1", [idx_id]))
        .filter(|outcome| outcome.is_ok())
        .count();
    Ok(removed)
}
/// Jaccard similarity of the whitespace-separated token sets of `a` and `b`.
///
/// Returns `|A ∩ B| / |A ∪ B|` in `[0.0, 1.0]`. Repeated tokens count once. Comparison is
/// case-sensitive, so callers lowercase first if they want case-insensitive matching. When
/// neither string has any token, the union is empty and the score is 0.0.
fn jaccard_similarity(a: &str, b: &str) -> f64 {
    let left: HashSet<&str> = a.split_whitespace().collect();
    let right: HashSet<&str> = b.split_whitespace().collect();
    match left.union(&right).count() {
        0 => 0.0,
        union_size => left.intersection(&right).count() as f64 / union_size as f64,
    }
}
fn run_dedup(db: &Database, threshold: f64) -> Result<usize> {
let entries = roboticus_db::memory_index::top_entries(db, 500)?;
if entries.len() < 2 {
return Ok(0);
}
let mut groups: HashMap<(String, String), Vec<&roboticus_db::memory_index::IndexEntry>> =
HashMap::new();
for entry in &entries {
if entry.source_table == "system" {
continue;
}
let key = (
entry.source_table.clone(),
entry.category.as_deref().unwrap_or("general").to_string(),
);
groups.entry(key).or_default().push(entry);
}
struct DedupAction {
victim_index_id: String,
victim_source_table: String,
victim_source_id: String,
keeper_index_id: String,
}
let mut actions: Vec<DedupAction> = Vec::new();
for group in groups.values() {
let mut eliminated: HashSet<String> = HashSet::new();
for i in 0..group.len() {
if eliminated.contains(&group[i].id) {
continue;
}
for j in (i + 1)..group.len() {
if eliminated.contains(&group[j].id) {
continue;
}
let sim = jaccard_similarity(
&group[i].summary.to_lowercase(),
&group[j].summary.to_lowercase(),
);
if sim >= threshold {
let (keeper, victim) = if group[i].confidence >= group[j].confidence {
(group[i], group[j])
} else {
(group[j], group[i])
};
actions.push(DedupAction {
victim_index_id: victim.id.clone(),
victim_source_table: victim.source_table.clone(),
victim_source_id: victim.source_id.clone(),
keeper_index_id: keeper.id.clone(),
});
eliminated.insert(victim.id.clone());
}
}
}
}
for action in &actions {
let has_memory_state = matches!(
action.victim_source_table.as_str(),
"episodic_memory" | "semantic_memory"
);
{
let conn = db.conn();
if has_memory_state {
let _ = conn.execute(
"DELETE FROM memory_index WHERE id = ?1",
[&action.victim_index_id],
);
} else {
let _ = conn.execute(
"UPDATE memory_index SET confidence = -1.0,
summary = '[dedup] ' || summary WHERE id = ?1",
[&action.victim_index_id],
);
}
let _ = conn.execute(
"UPDATE memory_index SET confidence = 1.0 WHERE id = ?1",
[&action.keeper_index_id],
);
}
if has_memory_state {
let stale_reason = format!("dedup:{}", action.keeper_index_id);
let _ = roboticus_db::memory::mark_memory_stale(
db,
&action.victim_source_table,
&action.victim_source_id,
&stale_reason,
);
}
}
Ok(actions.len())
}
fn sync_index_with_tier_state(db: &Database) -> Result<usize> {
let mut synced = 0usize;
synced += db
.conn()
.execute(
"UPDATE memory_index SET confidence = 0.0
WHERE source_table = 'episodic_memory'
AND confidence > 0.0
AND source_id IN (
SELECT id FROM episodic_memory WHERE memory_state = 'stale'
)",
[],
)
.unwrap_or(0);
synced += db
.conn()
.execute(
"UPDATE memory_index SET confidence = 0.0
WHERE source_table = 'semantic_memory'
AND confidence > 0.0
AND source_id IN (
SELECT id FROM semantic_memory WHERE memory_state = 'stale'
)",
[],
)
.unwrap_or(0);
synced += db
.conn()
.execute(
"UPDATE memory_index SET confidence = 0.1
WHERE source_table = 'procedural_memory'
AND confidence > 0.1
AND source_id IN (
SELECT id FROM procedural_memory
WHERE (success_count + failure_count) > 3
AND CAST(failure_count AS REAL) / (success_count + failure_count) > 0.8
)",
[],
)
.unwrap_or(0);
synced += db.conn()
.execute(
"UPDATE memory_index SET confidence = MIN(1.0, MAX(0.1, CAST(ls.priority AS REAL) / 100.0))
FROM learned_skills ls
WHERE memory_index.source_table = 'learned_skills'
AND memory_index.source_id = ls.id
AND ABS(memory_index.confidence - MIN(1.0, MAX(0.1, CAST(ls.priority AS REAL) / 100.0))) > 0.05",
[],
)
.unwrap_or(0);
Ok(synced)
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Fresh in-memory database with the project schema applied.
    fn test_db() -> Database {
        let db = Database::new(":memory:").unwrap();
        roboticus_db::schema::initialize_db(&db).unwrap();
        db
    }

    #[test]
    fn jaccard_identical() {
        assert!((jaccard_similarity("the sky is blue", "the sky is blue") - 1.0).abs() < 0.01);
    }

    #[test]
    fn jaccard_disjoint() {
        assert!((jaccard_similarity("hello world", "foo bar")).abs() < 0.01);
    }

    #[test]
    fn jaccard_partial() {
        let sim = jaccard_similarity("the quick brown fox", "the slow brown dog");
        assert!(sim > 0.3 && sim < 0.4);
    }

    #[test]
    fn jaccard_empty_inputs_score_zero() {
        // Token-less summaries must never look like duplicates of each other.
        assert!(jaccard_similarity("", "").abs() < 0.01);
        assert!(jaccard_similarity("hello", "").abs() < 0.01);
    }

    #[test]
    fn dedup_within_same_tier() {
        let db = test_db();
        let ep1_id = roboticus_db::memory::store_episodic(
            &db,
            "incident",
            "SLO target breached at 99.90% on API gateway",
            7,
        )
        .unwrap();
        let _ep2_id = roboticus_db::memory::store_episodic(
            &db,
            "incident",
            "SLO target breached at 99.85% on API gateway",
            7,
        )
        .unwrap();
        let entries = roboticus_db::memory_index::top_entries(&db, 100).unwrap();
        let ep_before = entries
            .iter()
            .filter(|e| e.source_table == "episodic_memory")
            .count();
        assert!(ep_before >= 2, "should have 2 episodic index entries");
        let deduped = run_dedup(&db, 0.6).unwrap();
        assert!(
            deduped >= 1,
            "similar within-tier entries should be deduped"
        );
        let conn = db.conn();
        let stale_count: i64 = conn
            .query_row(
                "SELECT COUNT(*) FROM episodic_memory WHERE memory_state = 'stale'",
                [],
                |r| r.get(0),
            )
            .unwrap();
        assert!(stale_count >= 1, "victim should be marked stale");
        drop(conn);
        let entries_after = roboticus_db::memory_index::top_entries(&db, 100).unwrap();
        assert!(entries_after.iter().any(|e| e.source_id == ep1_id));
    }

    #[test]
    fn dedup_does_not_cross_tiers() {
        let db = test_db();
        roboticus_db::memory::store_episodic(&db, "incident", "SLO target breached at 99.90%", 7)
            .unwrap();
        roboticus_db::memory::store_semantic(
            &db,
            "facts",
            "slo_target",
            "SLO target is 99.95%",
            0.9,
        )
        .unwrap();
        let entries = roboticus_db::memory_index::top_entries(&db, 100).unwrap();
        let before = entries.len();
        let deduped = run_dedup(&db, 0.3).unwrap();
        assert_eq!(deduped, 0, "cross-tier entries should not be deduped");
        let entries_after = roboticus_db::memory_index::top_entries(&db, 100).unwrap();
        assert_eq!(entries_after.len(), before);
    }

    #[test]
    fn tier_sync_zeros_stale_entries() {
        let db = test_db();
        let ep_id =
            roboticus_db::memory::store_episodic(&db, "incident", "Something happened", 5).unwrap();
        roboticus_db::memory::mark_memory_stale(&db, "episodic_memory", &ep_id, "test").unwrap();
        let synced = sync_index_with_tier_state(&db).unwrap();
        assert!(synced >= 1, "stale entry should be synced");
        // Read the index row directly. Going through `top_entries` with `if let Some(..)`
        // passed vacuously whenever the zeroed entry was not among the returned rows.
        let conn = db.conn();
        let confidence: f64 = conn
            .query_row(
                "SELECT confidence FROM memory_index \
                 WHERE source_table = 'episodic_memory' AND source_id = ?1",
                [&ep_id],
                |r| r.get(0),
            )
            .unwrap();
        assert!(
            confidence < 0.01,
            "confidence should be zeroed, got {}",
            confidence
        );
    }

    #[test]
    fn consolidation_runs_without_error_on_empty_db() {
        let db = test_db();
        let report = run_consolidation(&db, None).unwrap();
        assert!(
            report.total_actions() <= 1,
            "empty DB should have minimal consolidation work, got {}",
            report.total_actions()
        );
    }

    #[test]
    fn consolidation_backfills_unindexed_memories() {
        let db = test_db();
        roboticus_db::memory::store_procedural(&db, "test_tool", "step1; step2").unwrap();
        {
            let conn = db.conn();
            conn.execute(
                "DELETE FROM memory_index WHERE source_table = 'procedural_memory'",
                [],
            )
            .unwrap();
        }
        let report = run_consolidation(&db, None).unwrap();
        assert!(
            report.indexed >= 1,
            "should backfill the unindexed procedural memory"
        );
    }

    #[test]
    fn confidence_reinforcement_on_recall() {
        let db = test_db();
        let id = roboticus_db::memory::store_semantic(
            &db,
            "facts",
            "weather",
            "It rains in Switzerland",
            0.9,
        )
        .unwrap();
        roboticus_db::memory_index::decay_confidence(&db, 0.5).unwrap();
        let entries = roboticus_db::memory_index::top_entries(&db, 100).unwrap();
        let entry = entries.iter().find(|e| e.source_id == id).unwrap();
        assert!(entry.confidence < 0.6, "confidence should have decayed");
        let content =
            roboticus_db::memory_index::recall_content(&db, "semantic_memory", &id).unwrap();
        assert!(content.is_some());
        let entries = roboticus_db::memory_index::top_entries(&db, 100).unwrap();
        let entry = entries.iter().find(|e| e.source_id == id).unwrap();
        assert!(
            (entry.confidence - 1.0).abs() < 0.01,
            "recall should reinforce confidence to 1.0, got {}",
            entry.confidence
        );
    }
}