use anyhow::Result;
use rusqlite::Connection;
use serde::{Deserialize, Serialize};
use crate::db;
use crate::llm::OllamaClient;
use crate::models::{Memory, Tier};
/// Minimum Jaccard similarity (inclusive) between two memories' contents for
/// the second one to be added to the first one's consolidation cluster.
pub const CONSOLIDATE_JACCARD_THRESHOLD: f64 = 0.55;
/// Upper bound on the number of memories gathered into one consolidation cluster.
pub const CONSOLIDATE_MAX_CLUSTER_SIZE: usize = 8;
/// Namespace root for records the curator writes itself; this file stores
/// rollback entries under `<root>/rollback` and cycle reports under `<root>/reports`.
pub const CURATOR_NAMESPACE: &str = "_curator";
/// LLM operations the curator relies on, expressed as a trait so that an
/// implementation other than [`OllamaClient`] (for example a test stub) can
/// be passed to [`run_autonomy_passes`].
// NOTE(review): `dead_code` is presumably allowed because not every method is
// invoked through the trait within this file — confirm before removing.
#[allow(dead_code)]
pub trait AutonomyLlm {
/// Returns a list of strings derived from a memory's title and content
/// (presumably suggested tags — verify against the implementation).
fn auto_tag(&self, title: &str, content: &str) -> Result<Vec<String>>;
/// Returns a boolean verdict for a pair of memory texts (presumably `true`
/// when they contradict each other — verify against the implementation).
fn detect_contradiction(&self, mem_a: &str, mem_b: &str) -> Result<bool>;
/// Produces a single summary string from `(title, content)` pairs; used by
/// the consolidation pass as the content of the merged memory.
fn summarize_memories(&self, memories: &[(String, String)]) -> Result<String>;
}
impl AutonomyLlm for OllamaClient {
fn auto_tag(&self, title: &str, content: &str) -> Result<Vec<String>> {
Self::auto_tag(self, title, content)
}
fn detect_contradiction(&self, mem_a: &str, mem_b: &str) -> Result<bool> {
Self::detect_contradiction(self, mem_a, mem_b)
}
fn summarize_memories(&self, memories: &[(String, String)]) -> Result<String> {
Self::summarize_memories(self, memories)
}
}
/// One reversible curator action, recorded so it can later be undone with
/// [`reverse_rollback_entry`].
///
/// Serialised as an internally tagged JSON object: the `action` field holds
/// the snake_case variant name.
// The variants differ a lot in size (full `Memory` snapshots vs. a few
// scalars); the clippy lint about that is silenced rather than boxing.
#[allow(clippy::large_enum_variant)]
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "action", rename_all = "snake_case")]
pub enum RollbackEntry {
/// Several memories were merged into one.
Consolidate {
/// Full snapshots of the memories that were merged.
originals: Vec<Memory>,
/// Id of the merged memory (`"dry-run"` when produced by a dry run).
result_id: String,
},
/// A memory was deleted; `snapshot` is its full pre-deletion state.
Forget { snapshot: Memory },
/// A memory's priority was changed.
PriorityAdjust {
/// Id of the adjusted memory.
memory_id: String,
/// Priority before the adjustment (restored on rollback).
before: i32,
/// Priority after the adjustment.
after: i32,
},
}
impl RollbackEntry {
fn action_tag(&self) -> &'static str {
match self {
Self::Consolidate { .. } => "consolidate",
Self::Forget { .. } => "forget",
Self::PriorityAdjust { .. } => "priority_adjust",
}
}
}
/// Counters and error messages accumulated by one call to
/// [`run_autonomy_passes`].
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct AutonomyPassReport {
/// Number of consolidation clusters found among the candidates.
pub clusters_formed: usize,
/// Total number of original memories covered by consolidation entries.
pub memories_consolidated: usize,
/// Number of memories for which a forget entry was produced.
pub memories_forgotten: usize,
/// Number of memories for which a priority-adjust entry was produced.
pub priority_adjustments: usize,
/// Number of rollback entries recorded without a write error (in dry-run
/// mode nothing is written, but the entry is still counted).
pub rollback_entries_written: usize,
/// Human-readable messages for every failure encountered during the passes.
pub errors: Vec<String>,
}
pub fn run_autonomy_passes(
conn: &Connection,
llm: &dyn AutonomyLlm,
candidates: &[Memory],
dry_run: bool,
) -> AutonomyPassReport {
let mut report = AutonomyPassReport::default();
let clusters = find_consolidation_clusters(candidates);
report.clusters_formed = clusters.len();
for cluster in clusters {
match consolidate_cluster(conn, llm, &cluster, dry_run) {
Ok(Some(entry)) => {
if !dry_run && let Err(e) = persist_rollback_entry(conn, &entry) {
report
.errors
.push(format!("rollback-log write failed: {e}"));
} else {
report.rollback_entries_written += 1;
}
if let RollbackEntry::Consolidate { originals, .. } = entry {
report.memories_consolidated += originals.len();
}
}
Ok(None) => {}
Err(e) => report.errors.push(format!("consolidate failed: {e}")),
}
}
for mem in candidates {
match forget_if_superseded(conn, mem, candidates, dry_run) {
Ok(Some(entry)) => {
if !dry_run && let Err(e) = persist_rollback_entry(conn, &entry) {
report
.errors
.push(format!("rollback-log write failed: {e}"));
} else {
report.rollback_entries_written += 1;
}
report.memories_forgotten += 1;
}
Ok(None) => {}
Err(e) => report.errors.push(format!("forget failed: {e}")),
}
}
#[allow(unused_assignments)]
for mem in candidates {
match apply_priority_feedback(conn, mem, dry_run) {
Ok(Some(entry)) => {
if !dry_run && let Err(e) = persist_rollback_entry(conn, &entry) {
report
.errors
.push(format!("rollback-log write failed: {e}"));
} else {
report.rollback_entries_written += 1;
}
report.priority_adjustments += 1;
}
Ok(None) => {}
Err(e) => report.errors.push(format!("priority feedback failed: {e}")),
}
}
report
}
/// Groups `candidates` into clusters of similar memories, one namespace at a time.
///
/// Memories whose namespace starts with `_` are ignored. Within a namespace,
/// each not-yet-clustered memory becomes a seed; every later, still-unclaimed
/// memory whose content has a Jaccard similarity to the *seed* of at least
/// [`CONSOLIDATE_JACCARD_THRESHOLD`] joins it, until the cluster holds
/// [`CONSOLIDATE_MAX_CLUSTER_SIZE`] members. Only clusters with two or more
/// members are returned. The order of clusters across namespaces follows hash
/// map iteration and is therefore unspecified.
fn find_consolidation_clusters(candidates: &[Memory]) -> Vec<Vec<Memory>> {
    use std::collections::HashMap;

    // Bucket eligible memories by namespace, preserving input order per bucket.
    let mut buckets: HashMap<&str, Vec<&Memory>> = HashMap::new();
    for memory in candidates {
        if !memory.namespace.starts_with('_') {
            buckets
                .entry(memory.namespace.as_str())
                .or_default()
                .push(memory);
        }
    }

    let mut found: Vec<Vec<Memory>> = Vec::new();
    for members in buckets.into_values() {
        let mut claimed = vec![false; members.len()];
        for (seed_idx, seed) in members.iter().enumerate() {
            if claimed[seed_idx] {
                continue;
            }
            claimed[seed_idx] = true;
            let mut cluster: Vec<Memory> = vec![(*seed).clone()];

            let first_other = seed_idx + 1;
            for (offset, other) in members[first_other..].iter().enumerate() {
                let other_idx = first_other + offset;
                if claimed[other_idx] {
                    continue;
                }
                if cluster.len() >= CONSOLIDATE_MAX_CLUSTER_SIZE {
                    break;
                }
                let similarity = jaccard_similarity(&seed.content, &other.content);
                if similarity >= CONSOLIDATE_JACCARD_THRESHOLD {
                    cluster.push((*other).clone());
                    claimed[other_idx] = true;
                }
            }

            if cluster.len() > 1 {
                found.push(cluster);
            }
        }
    }
    found
}
/// Jaccard similarity of the token sets of `a` and `b`, in `0.0..=1.0`.
///
/// Tokens are maximal runs of alphanumeric characters that are at least three
/// bytes long, compared case-insensitively (each token is lowercased). The
/// result is `|A ∩ B| / |A ∪ B|`, or `0.0` when neither text yields a token.
#[allow(clippy::cast_precision_loss)]
fn jaccard_similarity(a: &str, b: &str) -> f64 {
    use std::collections::HashSet;

    /// Extracts the lowercased token set of `text`.
    fn token_set(text: &str) -> HashSet<String> {
        let mut set = HashSet::new();
        for word in text.split(|ch: char| !ch.is_alphanumeric()) {
            // Length is checked on the raw token, before lowercasing.
            if word.len() < 3 {
                continue;
            }
            set.insert(word.to_lowercase());
        }
        set
    }

    let left = token_set(a);
    let right = token_set(b);

    let shared = left.iter().filter(|tok| right.contains(*tok)).count();
    // |A ∪ B| = |A| + |B| − |A ∩ B|
    let total = left.len() + right.len() - shared;
    if total == 0 {
        return 0.0;
    }
    shared as f64 / total as f64
}
fn consolidate_cluster(
conn: &Connection,
llm: &dyn AutonomyLlm,
cluster: &[Memory],
dry_run: bool,
) -> Result<Option<RollbackEntry>> {
if cluster.len() < 2 {
return Ok(None);
}
if cluster.iter().any(|m| m.namespace.starts_with('_')) {
return Ok(None);
}
let input: Vec<(String, String)> = cluster
.iter()
.map(|m| (m.title.clone(), m.content.clone()))
.collect();
let summary = llm.summarize_memories(&input)?;
let base_title = cluster
.iter()
.map(|m| m.title.as_str())
.next()
.unwrap_or("(consolidated)");
let title = format!("[consolidated] {base_title}");
if dry_run {
return Ok(Some(RollbackEntry::Consolidate {
originals: cluster.to_vec(),
result_id: "dry-run".to_string(),
}));
}
let ids: Vec<String> = cluster.iter().map(|m| m.id.clone()).collect();
let namespace = cluster[0].namespace.clone();
let tier = cluster
.iter()
.map(|m| m.tier.clone())
.max_by_key(tier_rank)
.unwrap_or(Tier::Mid);
let result_id = db::consolidate(
conn,
&ids,
&title,
&summary,
&namespace,
&tier,
"ai-memory curator (autonomy)",
"ai:curator",
)?;
Ok(Some(RollbackEntry::Consolidate {
originals: cluster.to_vec(),
result_id,
}))
}
/// Orders tiers for comparison: `Short` (0) < `Mid` (1) < `Long` (2).
fn tier_rank(t: &Tier) -> u8 {
    match *t {
        Tier::Long => 2,
        Tier::Mid => 1,
        Tier::Short => 0,
    }
}
/// Deletes `mem` when a memory it is confirmed to contradict has superseded it.
///
/// The ids listed under the `confirmed_contradictions` metadata key (an array
/// of strings; other element types are skipped) are looked up in `all`. `mem`
/// counts as superseded when any such memory has a strictly greater
/// `updated_at` and a `confidence` that is at least as high.
///
/// Returns `Ok(None)` when the metadata key is missing, not an array, empty,
/// or no listed memory supersedes `mem`. Otherwise returns a
/// [`RollbackEntry::Forget`] carrying a snapshot of `mem`; the row is deleted
/// via `db::delete` first unless `dry_run` is set.
///
/// # Errors
/// Propagates failures from `db::delete`.
fn forget_if_superseded(
    conn: &Connection,
    mem: &Memory,
    all: &[Memory],
    dry_run: bool,
) -> Result<Option<RollbackEntry>> {
    let listed = match mem
        .metadata
        .get("confirmed_contradictions")
        .and_then(|v| v.as_array())
    {
        Some(ids) if !ids.is_empty() => ids,
        _ => return Ok(None),
    };

    // Id -> memory index over the candidate set (a later duplicate id wins).
    let mut lookup: std::collections::HashMap<&str, &Memory> =
        std::collections::HashMap::with_capacity(all.len());
    for candidate in all {
        lookup.insert(candidate.id.as_str(), candidate);
    }

    let has_superseder = listed
        .iter()
        .filter_map(|raw| raw.as_str())
        .filter_map(|other_id| lookup.get(other_id))
        .any(|other| other.updated_at > mem.updated_at && other.confidence >= mem.confidence);
    if !has_superseder {
        return Ok(None);
    }

    if !dry_run {
        db::delete(conn, &mem.id)?;
    }
    Ok(Some(RollbackEntry::Forget {
        snapshot: mem.clone(),
    }))
}
/// Nudges a memory's priority by one step based on how it has been used.
///
/// * Hot: `access_count >= 10`, last accessed no more than 7 whole days ago,
///   and priority below 10 → priority + 1 (capped at 10).
/// * Cold: otherwise, `access_count == 0`, created at least 30 whole days ago,
///   and priority above 1 → priority − 1 (floored at 1).
///
/// Timestamps that are missing or not valid RFC 3339 never satisfy their
/// condition. Returns `Ok(None)` when the priority stays the same; otherwise
/// a [`RollbackEntry::PriorityAdjust`] with the old and new values, after
/// writing the new priority through `db::update` unless `dry_run` is set.
///
/// # Errors
/// Propagates failures from `db::update`.
fn apply_priority_feedback(
    conn: &Connection,
    mem: &Memory,
    dry_run: bool,
) -> Result<Option<RollbackEntry>> {
    /// Whole days elapsed between the RFC 3339 `stamp` and `now`, if it parses.
    fn days_since(now: chrono::DateTime<chrono::Utc>, stamp: &str) -> Option<i64> {
        let parsed = chrono::DateTime::parse_from_rfc3339(stamp).ok()?;
        let as_utc = chrono::DateTime::<chrono::Utc>::from(parsed);
        Some((now - as_utc).num_days())
    }

    let now = chrono::Utc::now();
    let before = mem.priority;

    let accessed_recently = mem
        .last_accessed_at
        .as_deref()
        .and_then(|stamp| days_since(now, stamp))
        .is_some_and(|days| days <= 7);
    let old_enough = days_since(now, &mem.created_at).is_some_and(|days| days >= 30);

    let is_hot = mem.access_count >= 10 && accessed_recently && before < 10;
    let is_cold = mem.access_count == 0 && old_enough && before > 1;

    let after = if is_hot {
        before.saturating_add(1).min(10)
    } else if is_cold {
        before.saturating_sub(1).max(1)
    } else {
        return Ok(None);
    };
    if after == before {
        return Ok(None);
    }

    if !dry_run {
        // Only the priority argument is set; every other field is left as is.
        db::update(
            conn,
            &mem.id,
            None,
            None,
            None,
            None,
            None,
            Some(after),
            None,
            None,
            None,
        )?;
    }

    Ok(Some(RollbackEntry::PriorityAdjust {
        memory_id: mem.id.clone(),
        before,
        after,
    }))
}
/// Stores `entry` as a new long-tier memory in the `_curator/rollback`
/// namespace so the action can be reversed later.
///
/// The record's content is the JSON serialisation of `entry`; its title, tags
/// and metadata carry the entry's action tag, and both timestamps are set to
/// the current UTC time.
///
/// # Errors
/// Propagates JSON serialisation failures and failures from `db::insert`.
fn persist_rollback_entry(conn: &Connection, entry: &RollbackEntry) -> Result<()> {
    let stamp = chrono::Utc::now().to_rfc3339();
    let action = entry.action_tag();

    let record = Memory {
        id: uuid::Uuid::new_v4().to_string(),
        tier: Tier::Long,
        namespace: format!("{CURATOR_NAMESPACE}/rollback"),
        title: format!("curator {action} @ {stamp}"),
        content: serde_json::to_string(entry)?,
        tags: vec![
            "_curator".to_owned(),
            "_rollback".to_owned(),
            action.to_owned(),
        ],
        priority: 3,
        confidence: 1.0,
        source: "ai-memory curator (autonomy)".to_owned(),
        access_count: 0,
        created_at: stamp.clone(),
        updated_at: stamp,
        last_accessed_at: None,
        expires_at: None,
        metadata: serde_json::json!({
            "agent_id": "ai:curator",
            "action": action,
        }),
    };

    db::insert(conn, &record)?;
    Ok(())
}
/// Writes a summary of one curator cycle as a mid-tier memory in the
/// `_curator/reports` namespace.
///
/// The memory's content is a pretty-printed JSON object holding the cycle
/// timestamp, `cycle_duration_ms`, `auto_tagged`, `contradictions_found`,
/// the counters of `pass_report`, and `errors_total`.
///
/// # Errors
/// Propagates JSON serialisation failures and failures from `db::insert`.
pub fn persist_self_report(
    conn: &Connection,
    cycle_duration_ms: u128,
    pass_report: &AutonomyPassReport,
    auto_tagged: usize,
    contradictions_found: usize,
    errors_total: usize,
) -> Result<()> {
    let stamp = chrono::Utc::now().to_rfc3339();

    let summary = serde_json::json!({
        "cycle_ts": stamp,
        "cycle_duration_ms": cycle_duration_ms,
        "auto_tagged": auto_tagged,
        "contradictions_found": contradictions_found,
        "clusters_formed": pass_report.clusters_formed,
        "memories_consolidated": pass_report.memories_consolidated,
        "memories_forgotten": pass_report.memories_forgotten,
        "priority_adjustments": pass_report.priority_adjustments,
        "rollback_entries_written": pass_report.rollback_entries_written,
        "errors_total": errors_total,
    });
    let content = serde_json::to_string_pretty(&summary)?;

    let record = Memory {
        id: uuid::Uuid::new_v4().to_string(),
        tier: Tier::Mid,
        namespace: format!("{CURATOR_NAMESPACE}/reports"),
        title: format!("curator cycle @ {stamp}"),
        content,
        tags: vec!["_curator".to_owned(), "_report".to_owned()],
        priority: 2,
        confidence: 1.0,
        source: "ai-memory curator (autonomy)".to_owned(),
        access_count: 0,
        created_at: stamp.clone(),
        updated_at: stamp,
        last_accessed_at: None,
        expires_at: None,
        metadata: serde_json::json!({"agent_id": "ai:curator"}),
    };

    db::insert(conn, &record)?;
    Ok(())
}
/// Undoes the curator action described by `entry`.
///
/// * `Consolidate` – after checking that none of the originals would collide
///   with a different memory of the same title and namespace, deletes the
///   merged memory and re-inserts every original. Returns whether the merged
///   memory existed (the value reported by `db::delete`).
/// * `Forget` – performs the same collision check, then re-inserts the
///   snapshot. Returns `true`.
/// * `PriorityAdjust` – sets the memory's priority back to `before` through
///   `db::update`, ignoring that call's success value. Returns `true`.
///
/// # Errors
/// Fails when a collision is detected or when any database call fails. The
/// steps are issued one by one from this function, so an error part-way
/// through leaves the earlier steps applied unless the caller wraps the call
/// in a transaction.
pub fn reverse_rollback_entry(conn: &Connection, entry: &RollbackEntry) -> Result<bool> {
    match entry {
        RollbackEntry::PriorityAdjust {
            memory_id, before, ..
        } => {
            let _ = db::update(
                conn,
                memory_id,
                None,
                None,
                None,
                None,
                None,
                Some(*before),
                None,
                None,
                None,
            )?;
            Ok(true)
        }
        RollbackEntry::Forget { snapshot } => {
            check_no_collision(conn, &snapshot.title, &snapshot.namespace, &snapshot.id)?;
            db::insert(conn, snapshot)?;
            Ok(true)
        }
        RollbackEntry::Consolidate {
            originals,
            result_id,
        } => {
            // Validate every original before touching the database.
            originals.iter().try_for_each(|original| {
                check_no_collision(conn, &original.title, &original.namespace, &original.id)
            })?;
            let merged_existed = db::delete(conn, result_id)?;
            for original in originals {
                db::insert(conn, original)?;
            }
            Ok(merged_existed)
        }
    }
}
/// Guards a rollback against overwriting a newer memory.
///
/// Lists memories in `namespace` and fails if one of them has the same
/// `title` and `namespace` but an id different from `expected_id`.
///
/// # Errors
/// Returns an error describing the conflicting memory, or propagates a
/// failure from `db::list`.
// NOTE(review): the listing is requested with the fixed arguments `50, 0`
// (presumably limit and offset — confirm against `db::list`); if so, a
// colliding memory beyond the first 50 rows of the namespace is not detected.
fn check_no_collision(
    conn: &Connection,
    title: &str,
    namespace: &str,
    expected_id: &str,
) -> Result<()> {
    let rows = db::list(
        conn,
        Some(namespace),
        None,
        50,
        0,
        None,
        None,
        None,
        None,
        None,
    )?;

    let occupant = rows
        .into_iter()
        .find(|row| row.namespace == namespace && row.title == title && row.id != expected_id);

    if let Some(row) = occupant {
        anyhow::bail!(
            "rollback aborted: memory {} now occupies (title={:?}, namespace={:?}) — \
             reverting would overwrite it. Resolve the conflict manually.",
            row.id,
            title,
            namespace
        );
    }
    Ok(())
}
// Unit and integration tests for the autonomy passes. Database-backed tests
// run against a fresh database opened on a temporary file; the LLM is
// replaced by `StubLlm`.
#[cfg(test)]
mod tests {
use super::*;
use std::sync::Mutex;
// Deterministic `AutonomyLlm` stand-in that returns canned answers and
// records every call it receives.
struct StubLlm {
// Value returned by every `auto_tag` call.
auto_tag_result: Vec<String>,
// Value returned by every `summarize_memories` call.
summary: String,
// `detect_contradiction` reports `true` when either input contains this text.
contradiction_sentinel: String,
// Log of calls, in order; behind a mutex because the trait methods take `&self`.
calls: Mutex<Vec<String>>,
}
impl StubLlm {
// Builds a stub whose summary is `summary`, with fixed tags and sentinel.
fn new(summary: &str) -> Self {
Self {
auto_tag_result: vec!["auto".to_string(), "stub".to_string()],
summary: summary.to_string(),
contradiction_sentinel: "CONTRADICTS".to_string(),
calls: Mutex::new(Vec::new()),
}
}
}
impl AutonomyLlm for StubLlm {
fn auto_tag(&self, title: &str, _content: &str) -> Result<Vec<String>> {
self.calls.lock().unwrap().push(format!("auto_tag:{title}"));
Ok(self.auto_tag_result.clone())
}
fn detect_contradiction(&self, a: &str, b: &str) -> Result<bool> {
self.calls
.lock()
.unwrap()
.push("detect_contradiction".to_string());
Ok(
a.contains(&self.contradiction_sentinel)
|| b.contains(&self.contradiction_sentinel),
)
}
fn summarize_memories(&self, memories: &[(String, String)]) -> Result<String> {
self.calls
.lock()
.unwrap()
.push(format!("summarize:{}", memories.len()));
Ok(self.summary.clone())
}
}
// Builds a memory with the given identity fields and fixed defaults:
// priority 5, confidence 1.0, no accesses, created/updated "now".
fn sample_mem(id: &str, ns: &str, title: &str, content: &str, tier: Tier) -> Memory {
let now = chrono::Utc::now().to_rfc3339();
Memory {
id: id.to_string(),
tier,
namespace: ns.to_string(),
title: title.to_string(),
content: content.to_string(),
tags: vec!["t".to_string()],
priority: 5,
confidence: 1.0,
source: "test".to_string(),
access_count: 0,
created_at: now.clone(),
updated_at: now,
last_accessed_at: None,
expires_at: None,
metadata: serde_json::json!({"agent_id":"ai:test"}),
}
}
// Opens a database on a fresh temporary file. The temp-file handle is
// returned alongside the connection so the caller can keep it alive for the
// duration of the test.
fn setup_conn() -> (tempfile::NamedTempFile, Connection) {
let tmp = tempfile::NamedTempFile::new().unwrap();
let conn = db::open(tmp.path()).unwrap();
(tmp, conn)
}
// Two sentences sharing most words score well above a loose lower bound.
#[test]
fn jaccard_similarity_basic() {
let sim = jaccard_similarity(
"the quick brown fox jumps over",
"quick brown fox over the lazy",
);
assert!(sim > 0.4, "unexpected sim {sim}");
}
// Empty input on both sides or on one side yields 0.0.
#[test]
fn jaccard_similarity_empty() {
assert!((jaccard_similarity("", "") - 0.0).abs() < 1e-9);
assert!((jaccard_similarity("abc", "") - 0.0).abs() < 1e-9);
}
// Identical content in a different namespace must not join the cluster.
#[test]
fn consolidation_clusters_group_by_namespace() {
let a = sample_mem(
"a",
"ns1",
"A",
"the quick brown fox jumps over lazy dog",
Tier::Mid,
);
let b = sample_mem(
"b",
"ns1",
"B",
"quick brown fox over lazy dog jumps",
Tier::Mid,
);
let c = sample_mem(
"c",
"ns2",
"C",
"the quick brown fox jumps over lazy dog",
Tier::Mid,
);
let clusters = find_consolidation_clusters(&[a, b, c]);
assert_eq!(clusters.len(), 1);
assert_eq!(clusters[0].len(), 2);
}
// Namespaces starting with `_` are never clustered, even for identical content.
#[test]
fn consolidation_skips_reserved_namespace() {
let a = sample_mem("a", "_curator/reports", "A", "content aaaa bbbb", Tier::Mid);
let b = sample_mem("b", "_curator/reports", "B", "content aaaa bbbb", Tier::Mid);
let clusters = find_consolidation_clusters(&[a, b]);
assert!(clusters.is_empty());
}
// A rollback entry survives a JSON round trip and carries its snake_case tag.
#[test]
fn rollback_entry_serialises() {
let e = RollbackEntry::PriorityAdjust {
memory_id: "m1".to_string(),
before: 5,
after: 6,
};
let json = serde_json::to_string(&e).unwrap();
assert!(json.contains("priority_adjust"));
let back: RollbackEntry = serde_json::from_str(&json).unwrap();
assert_eq!(back.action_tag(), "priority_adjust");
}
// A real (non-dry-run) consolidation creates a merged memory in the same
// namespace, titled `[consolidated] …`, containing the stub's summary.
#[test]
fn consolidate_cluster_merges_two_memories() {
let (_tmp, conn) = setup_conn();
let a = sample_mem(
"a",
"app",
"Deploy plan",
"kubernetes rolling deploy with canary",
Tier::Long,
);
let b = sample_mem(
"b",
"app",
"Deploy process",
"kubernetes deploy rolling canary strategy",
Tier::Long,
);
db::insert(&conn, &a).unwrap();
db::insert(&conn, &b).unwrap();
let llm = StubLlm::new("consolidated deploy plan");
let cluster = vec![a.clone(), b.clone()];
let entry = consolidate_cluster(&conn, &llm, &cluster, false)
.unwrap()
.expect("expected rollback entry");
match entry {
RollbackEntry::Consolidate {
originals,
result_id,
} => {
assert_eq!(originals.len(), 2);
assert_ne!(result_id, "dry-run");
let got = db::get(&conn, &result_id).unwrap().expect("result memory");
assert_eq!(got.namespace, "app");
assert!(got.title.starts_with("[consolidated]"));
assert!(got.content.contains("consolidated deploy plan"));
}
_ => panic!("expected Consolidate"),
}
}
// In dry-run mode the entry uses the placeholder id and both originals
// remain in the database.
#[test]
fn dry_run_does_not_write() {
let (_tmp, conn) = setup_conn();
let a = sample_mem(
"a",
"app",
"Deploy plan",
"kubernetes rolling deploy with canary",
Tier::Long,
);
let b = sample_mem(
"b",
"app",
"Deploy process",
"kubernetes deploy rolling canary strategy",
Tier::Long,
);
db::insert(&conn, &a).unwrap();
db::insert(&conn, &b).unwrap();
let llm = StubLlm::new("never persisted");
let cluster = vec![a.clone(), b.clone()];
let entry = consolidate_cluster(&conn, &llm, &cluster, true)
.unwrap()
.expect("dry-run returns entry");
if let RollbackEntry::Consolidate { result_id, .. } = entry {
assert_eq!(result_id, "dry-run");
}
assert!(db::get(&conn, "a").unwrap().is_some());
assert!(db::get(&conn, "b").unwrap().is_some());
}
// Consolidation removes the originals; reversing the entry restores them
// and removes the merged memory.
#[test]
fn reverse_consolidation_restores_originals() {
let (_tmp, conn) = setup_conn();
let a = sample_mem(
"a",
"app",
"Deploy plan",
"kubernetes rolling deploy canary",
Tier::Long,
);
let b = sample_mem(
"b",
"app",
"Deploy process",
"kubernetes rolling canary strategy",
Tier::Long,
);
db::insert(&conn, &a).unwrap();
db::insert(&conn, &b).unwrap();
let llm = StubLlm::new("summary");
let cluster = vec![a.clone(), b.clone()];
let entry = consolidate_cluster(&conn, &llm, &cluster, false)
.unwrap()
.expect("entry");
// After consolidation: merged memory present, originals gone.
if let RollbackEntry::Consolidate {
result_id,
originals,
} = &entry
{
assert!(db::get(&conn, result_id).unwrap().is_some());
for orig in originals {
assert!(
db::get(&conn, &orig.id).unwrap().is_none(),
"{} should be merged-away",
orig.id
);
}
}
reverse_rollback_entry(&conn, &entry).unwrap();
// After rollback: originals present again, merged memory gone.
assert!(db::get(&conn, "a").unwrap().is_some());
assert!(db::get(&conn, "b").unwrap().is_some());
if let RollbackEntry::Consolidate { result_id, .. } = &entry {
assert!(db::get(&conn, result_id).unwrap().is_none());
}
}
// End-to-end run over a mixed candidate set: two near-duplicates to
// consolidate, one unrelated memory, and an old fact contradicted by a
// newer one that should be forgotten.
#[test]
fn full_autonomy_cycle_end_to_end() {
let (_tmp, conn) = setup_conn();
let llm = StubLlm::new("consolidated");
let m_a = sample_mem(
"ma",
"deploy",
"canary deploy plan",
"kubernetes canary rolling deploy strategy",
Tier::Long,
);
let m_b = sample_mem(
"mb",
"deploy",
"canary deploy overview",
"kubernetes rolling canary deploy strategy",
Tier::Long,
);
let m_chat = sample_mem(
"mchat",
"chat",
"hello",
"hi there chat only content here",
Tier::Mid,
);
let mut m_old = sample_mem(
"mold",
"facts",
"fact v1",
"the sky is green always uniformly",
Tier::Long,
);
let m_new_id = "mnew";
// Mark the old fact as contradicted by the new one and back-date it so
// the new memory has the later `updated_at`.
m_old.metadata["confirmed_contradictions"] = serde_json::json!([m_new_id]);
m_old.updated_at = (chrono::Utc::now() - chrono::Duration::days(30)).to_rfc3339();
let m_new = sample_mem(
m_new_id,
"facts",
"fact v2",
"the sky is blue most of the time for sure",
Tier::Long,
);
for m in [&m_a, &m_b, &m_chat, &m_old, &m_new] {
db::insert(&conn, m).unwrap();
}
let candidates = vec![
m_a.clone(),
m_b.clone(),
m_chat.clone(),
m_old.clone(),
m_new.clone(),
];
let report = run_autonomy_passes(&conn, &llm, &candidates, false);
assert!(report.clusters_formed >= 1);
assert!(report.memories_consolidated >= 2);
assert!(
report.memories_forgotten >= 1,
"expected ≥1 forget, got {:?}",
report
);
assert!(report.rollback_entries_written >= report.clusters_formed);
// The rollback log namespace must contain at least one record.
let log = db::list(
&conn,
Some("_curator/rollback"),
None,
100,
0,
None,
None,
None,
None,
None,
)
.unwrap();
assert!(!log.is_empty(), "rollback log should be populated");
}
// A self-report lands as exactly one record in `_curator/reports` and its
// content includes the report counters.
#[test]
fn self_report_written_to_reports_namespace() {
let (_tmp, conn) = setup_conn();
let pass = AutonomyPassReport {
clusters_formed: 1,
memories_consolidated: 2,
memories_forgotten: 0,
priority_adjustments: 1,
rollback_entries_written: 2,
errors: vec![],
};
persist_self_report(&conn, 1234, &pass, 3, 0, 0).unwrap();
let reports = db::list(
&conn,
Some("_curator/reports"),
None,
10,
0,
None,
None,
None,
None,
None,
)
.unwrap();
assert_eq!(reports.len(), 1);
assert!(reports[0].content.contains("memories_consolidated"));
}
}