use crate::models::ConfidenceSource;
use crate::models::field_names;
use anyhow::Result;
use rusqlite::Connection;
use serde::{Deserialize, Serialize};
use crate::db;
use crate::llm::OllamaClient;
use crate::models::{Memory, Tier};
/// Value written to the `source` field of memories created by the curator in
/// this module, and passed to `db::consolidate` for merged memories.
const CURATOR_SOURCE_LABEL: &str = "ai-memory curator (autonomy)";
/// Minimum token-level Jaccard similarity (see `jaccard_similarity`) a pair
/// must reach before it is considered for the same consolidation cluster.
pub const CONSOLIDATE_JACCARD_THRESHOLD: f64 = 0.55;
/// Minimum cosine similarity required when both memories of a pair have a
/// stored embedding; pairs lacking an embedding skip this check.
pub const CONSOLIDATE_COSINE_THRESHOLD: f64 = 0.75;
/// Upper bound on the number of memories placed in one consolidation cluster.
pub const CONSOLIDATE_MAX_CLUSTER_SIZE: usize = 8;
/// Namespace prefix under which the curator stores its own records
/// (`{CURATOR_NAMESPACE}/rollback` and `{CURATOR_NAMESPACE}/reports`).
pub const CURATOR_NAMESPACE: &str = "_curator";
/// LLM operations the autonomy passes are written against, so that a real
/// client (`OllamaClient`) and a test double can be used interchangeably.
///
/// Within this file only `summarize_memories` is called by the passes; the
/// other two methods are part of the trait surface (hence `allow(dead_code)`).
#[allow(dead_code)]
pub trait AutonomyLlm {
/// Produces a list of tag strings for a memory given its title and content.
fn auto_tag(&self, title: &str, content: &str) -> Result<Vec<String>>;
/// Reports whether the two memory texts contradict each other.
fn detect_contradiction(&self, mem_a: &str, mem_b: &str) -> Result<bool>;
/// Produces one summary string from `(title, content)` pairs.
fn summarize_memories(&self, memories: &[(String, String)]) -> Result<String>;
}
impl AutonomyLlm for OllamaClient {
fn auto_tag(&self, title: &str, content: &str) -> Result<Vec<String>> {
Self::auto_tag(self, title, content, None)
}
fn detect_contradiction(&self, mem_a: &str, mem_b: &str) -> Result<bool> {
Self::detect_contradiction(self, mem_a, mem_b)
}
fn summarize_memories(&self, memories: &[(String, String)]) -> Result<String> {
Self::summarize_memories(self, memories)
}
}
/// One reversible curator action, serialised as JSON into the rollback log.
///
/// Serde uses an internal `"action"` tag with snake_case variant names
/// (e.g. `"priority_adjust"`).
#[allow(clippy::large_enum_variant)]
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "action", rename_all = "snake_case")]
pub enum RollbackEntry {
/// Several memories were merged into one.
Consolidate {
/// Full copies of the memories that were merged.
originals: Vec<Memory>,
/// Id of the merged memory, or the literal `"dry-run"` for a dry run.
result_id: String,
},
/// A memory was deleted; `snapshot` is the full copy taken beforehand.
Forget { snapshot: Memory },
/// A memory's priority was changed from `before` to `after`.
PriorityAdjust {
memory_id: String,
before: i32,
after: i32,
},
}
impl RollbackEntry {
/// Short label for the action kind; used in the title, tags and metadata of
/// persisted rollback-log memories.
fn action_tag(&self) -> &'static str {
match self {
// Shares its label with the audit module's consolidate operation constant.
Self::Consolidate { .. } => crate::audit::OP_CONSOLIDATE,
Self::Forget { .. } => "forget",
Self::PriorityAdjust { .. } => "priority_adjust",
}
}
}
/// Counters and error messages collected by one `run_autonomy_passes` call.
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct AutonomyPassReport {
/// Number of consolidation clusters found among the candidates.
pub clusters_formed: usize,
/// Total number of original memories covered by consolidation entries.
pub memories_consolidated: usize,
/// Number of `Forget` entries produced.
pub memories_forgotten: usize,
/// Number of `PriorityAdjust` entries produced.
pub priority_adjustments: usize,
/// Entries persisted to the rollback log; in a dry run every produced entry
/// is counted here even though nothing is persisted.
pub rollback_entries_written: usize,
/// Human-readable messages for every failed step; a failure does not stop
/// the remaining work.
pub errors: Vec<String>,
}
pub fn run_autonomy_passes(
conn: &Connection,
llm: &dyn AutonomyLlm,
candidates: &[Memory],
dry_run: bool,
) -> AutonomyPassReport {
let mut report = AutonomyPassReport::default();
let clusters = find_consolidation_clusters(conn, candidates);
report.clusters_formed = clusters.len();
for cluster in clusters {
match consolidate_cluster(conn, llm, &cluster, dry_run) {
Ok(Some(entry)) => {
if !dry_run && let Err(e) = persist_rollback_entry(conn, &entry) {
report.errors.push(rollback_log_write_failed(&e));
} else {
report.rollback_entries_written += 1;
}
if let RollbackEntry::Consolidate { originals, .. } = entry {
report.memories_consolidated += originals.len();
}
}
Ok(None) => {}
Err(e) => report.errors.push(format!("consolidate failed: {e}")),
}
}
for mem in candidates {
match forget_if_superseded(conn, mem, candidates, dry_run) {
Ok(Some(entry)) => {
if !dry_run && let Err(e) = persist_rollback_entry(conn, &entry) {
report.errors.push(rollback_log_write_failed(&e));
} else {
report.rollback_entries_written += 1;
}
report.memories_forgotten += 1;
}
Ok(None) => {}
Err(e) => report.errors.push(format!("forget failed: {e}")),
}
}
#[allow(unused_assignments)]
for mem in candidates {
match apply_priority_feedback(conn, mem, dry_run) {
Ok(Some(entry)) => {
if !dry_run && let Err(e) = persist_rollback_entry(conn, &entry) {
report.errors.push(rollback_log_write_failed(&e));
} else {
report.rollback_entries_written += 1;
}
report.priority_adjustments += 1;
}
Ok(None) => {}
Err(e) => report.errors.push(format!("priority feedback failed: {e}")),
}
}
report
}
fn find_consolidation_clusters(conn: &Connection, candidates: &[Memory]) -> Vec<Vec<Memory>> {
let mut by_ns: std::collections::HashMap<&str, Vec<&Memory>> = std::collections::HashMap::new();
for m in candidates {
if m.namespace.starts_with('_') {
continue;
}
by_ns.entry(&m.namespace).or_default().push(m);
}
let mut clusters: Vec<Vec<Memory>> = Vec::new();
for (_ns, group) in by_ns {
let mut used = vec![false; group.len()];
for i in 0..group.len() {
if used[i] {
continue;
}
let mut cluster = vec![group[i].clone()];
used[i] = true;
let seed_emb = db::get_embedding(conn, &group[i].id).ok().flatten();
for j in (i + 1)..group.len() {
if used[j] {
continue;
}
if cluster.len() >= CONSOLIDATE_MAX_CLUSTER_SIZE {
break;
}
let j_sim = jaccard_similarity(&group[i].content, &group[j].content);
if j_sim < CONSOLIDATE_JACCARD_THRESHOLD {
continue;
}
let pair_emb = db::get_embedding(conn, &group[j].id).ok().flatten();
let matches_cluster = match (seed_emb.as_ref(), pair_emb.as_ref()) {
(Some(a), Some(b)) => {
let cos = f64::from(crate::embeddings::Embedder::cosine_similarity(a, b));
cos >= CONSOLIDATE_COSINE_THRESHOLD
}
_ => true,
};
if matches_cluster {
cluster.push(group[j].clone());
used[j] = true;
}
}
if cluster.len() >= 2 {
clusters.push(cluster);
}
}
}
clusters
}
/// Jaccard similarity of the token sets of `a` and `b`, in `[0.0, 1.0]`.
///
/// Tokens are maximal alphanumeric runs at least three bytes long, compared
/// case-insensitively. Returns `0.0` when neither text has any token.
#[allow(clippy::cast_precision_loss)]
fn jaccard_similarity(a: &str, b: &str) -> f64 {
    /// Lower-cased set of the tokens of `text`.
    fn token_set(text: &str) -> std::collections::HashSet<String> {
        text.split(|c: char| !c.is_alphanumeric())
            .filter(|word| word.len() >= 3)
            .map(str::to_lowercase)
            .collect()
    }

    let left = token_set(a);
    let right = token_set(b);
    let shared = left.intersection(&right).count();
    // |A ∪ B| = |A| + |B| − |A ∩ B|
    let total = left.len() + right.len() - shared;
    match total {
        0 => 0.0,
        _ => shared as f64 / total as f64,
    }
}
fn consolidate_cluster(
conn: &Connection,
llm: &dyn AutonomyLlm,
cluster: &[Memory],
dry_run: bool,
) -> Result<Option<RollbackEntry>> {
if cluster.len() < 2 {
return Ok(None);
}
if cluster.iter().any(|m| m.namespace.starts_with('_')) {
return Ok(None);
}
let input: Vec<(String, String)> = cluster
.iter()
.map(|m| (m.title.clone(), m.content.clone()))
.collect();
let summary = llm.summarize_memories(&input)?;
let base_title = cluster
.iter()
.map(|m| m.title.as_str())
.next()
.unwrap_or("(consolidated)");
let title = format!("[consolidated] {base_title}");
if dry_run {
return Ok(Some(RollbackEntry::Consolidate {
originals: cluster.to_vec(),
result_id: "dry-run".to_string(),
}));
}
let ids: Vec<String> = cluster.iter().map(|m| m.id.clone()).collect();
let namespace = cluster[0].namespace.clone();
let tier = cluster
.iter()
.map(|m| m.tier.clone())
.max_by_key(tier_rank)
.unwrap_or(Tier::Mid);
let result_id = db::consolidate(
conn,
&ids,
&title,
&summary,
&namespace,
&tier,
CURATOR_SOURCE_LABEL,
crate::identity::sentinels::AI_CURATOR,
)?;
Ok(Some(RollbackEntry::Consolidate {
originals: cluster.to_vec(),
result_id,
}))
}
/// Orders tiers for comparison: `Short` < `Mid` < `Long`. Used to pick the
/// highest tier among the members of a consolidation cluster.
fn tier_rank(t: &Tier) -> u8 {
match t {
Tier::Short => 0,
Tier::Mid => 1,
Tier::Long => 2,
}
}
/// Deletes `mem` when a confirmed contradiction has superseded it.
///
/// `mem` counts as superseded when its metadata lists (under
/// `field_names::CONFIRMED_CONTRADICTIONS`) the id of a memory in `all` that
/// has a later `updated_at` and a confidence at least as high as `mem`'s.
/// Non-string ids and ids not present in `all` are ignored.
///
/// Returns:
/// * `Ok(None)` — `mem` is not superseded, or (real run) the delete found
///   nothing to remove, e.g. because an earlier pass of the same cycle already
///   removed it. No rollback entry is produced in that case, so a later
///   rollback cannot "restore" something this step never deleted.
/// * `Ok(Some(Forget))` — `mem` was deleted (or would be, in a dry run); the
///   entry carries a full snapshot of `mem`.
///
/// # Errors
/// Propagates failures from `db::delete`.
fn forget_if_superseded(
    conn: &Connection,
    mem: &Memory,
    all: &[Memory],
    dry_run: bool,
) -> Result<Option<RollbackEntry>> {
    let Some(contradictions) = mem
        .metadata
        .get(field_names::CONFIRMED_CONTRADICTIONS)
        .and_then(|v| v.as_array())
    else {
        return Ok(None);
    };
    if contradictions.is_empty() {
        return Ok(None);
    }

    let by_id: std::collections::HashMap<&str, &Memory> =
        all.iter().map(|m| (m.id.as_str(), m)).collect();
    // NOTE(review): `updated_at` is compared as stored; this presumably relies
    // on all timestamps sharing one RFC 3339 format/offset — confirm.
    let superseded = contradictions
        .iter()
        .filter_map(|v| v.as_str())
        .filter_map(|other_id| by_id.get(other_id))
        .any(|other| other.updated_at > mem.updated_at && other.confidence >= mem.confidence);
    if !superseded {
        return Ok(None);
    }

    if !dry_run {
        // `db::delete` reports whether a row was actually removed.
        let existed = db::delete(conn, &mem.id)?;
        if !existed {
            return Ok(None);
        }
    }

    Ok(Some(RollbackEntry::Forget {
        snapshot: mem.clone(),
    }))
}
/// Nudges a memory's priority by one step based on how it has been used.
///
/// * +1 (capped at 10) when it has at least 10 accesses and was last accessed
///   no more than 7 days ago;
/// * −1 (floored at 1) when it has never been accessed and was created at
///   least 30 days ago.
///
/// Timestamps that are absent or fail RFC 3339 parsing never satisfy either
/// rule. Returns `Ok(None)` when the priority stays the same; otherwise a
/// `PriorityAdjust` entry. The database is only updated when `dry_run` is
/// `false`.
///
/// # Errors
/// Propagates failures from `db::update`.
fn apply_priority_feedback(
    conn: &Connection,
    mem: &Memory,
    dry_run: bool,
) -> Result<Option<RollbackEntry>> {
    /// Parses an RFC 3339 timestamp into UTC; `None` when it does not parse.
    fn parse_utc(raw: &str) -> Option<chrono::DateTime<chrono::Utc>> {
        chrono::DateTime::parse_from_rfc3339(raw)
            .ok()
            .map(chrono::DateTime::<chrono::Utc>::from)
    }

    let now = chrono::Utc::now();
    let accessed_recently = mem
        .last_accessed_at
        .as_deref()
        .and_then(parse_utc)
        .is_some_and(|at| (now - at).num_days() <= 7);
    let old_enough = parse_utc(&mem.created_at).is_some_and(|at| (now - at).num_days() >= 30);

    let before = mem.priority;
    let after = if mem.access_count >= 10 && accessed_recently && before < 10 {
        before.saturating_add(1).min(10)
    } else if mem.access_count == 0 && old_enough && before > 1 {
        before.saturating_sub(1).max(1)
    } else {
        before
    };
    if after == before {
        return Ok(None);
    }

    if !dry_run {
        // Only the priority argument is set; every other field is left as is.
        db::update(
            conn,
            &mem.id,
            None,
            None,
            None,
            None,
            None,
            Some(after),
            None,
            None,
            None,
        )?;
    }

    Ok(Some(RollbackEntry::PriorityAdjust {
        memory_id: mem.id.clone(),
        before,
        after,
    }))
}
/// Builds the report message for a rollback-log write that failed with `e`.
fn rollback_log_write_failed(e: &dyn std::fmt::Display) -> String {
    const PREFIX: &str = "rollback-log write failed: ";
    format!("{PREFIX}{e}")
}
/// Stores `entry` as a new memory in the `{CURATOR_NAMESPACE}/rollback`
/// namespace so the action can be reversed later.
///
/// The memory's content is the JSON serialisation of `entry`; title, tags and
/// metadata carry the entry's action tag.
///
/// # Errors
/// Propagates JSON serialisation and `db::insert` failures.
fn persist_rollback_entry(conn: &Connection, entry: &RollbackEntry) -> Result<()> {
    let stamp = chrono::Utc::now().to_rfc3339();
    let action = entry.action_tag();
    let payload = serde_json::to_string(entry)?;
    let title = format!("curator {} @ {}", action, stamp);

    let record = Memory {
        id: uuid::Uuid::new_v4().to_string(),
        tier: Tier::Long,
        namespace: format!("{CURATOR_NAMESPACE}/rollback"),
        title,
        content: payload,
        tags: vec![
            "_curator".to_string(),
            "_rollback".to_string(),
            action.to_string(),
        ],
        priority: 3,
        confidence: 1.0,
        source: CURATOR_SOURCE_LABEL.to_string(),
        access_count: 0,
        created_at: stamp.clone(),
        updated_at: stamp,
        last_accessed_at: None,
        expires_at: None,
        metadata: serde_json::json!({
            "agent_id": crate::identity::sentinels::AI_CURATOR,
            "action": action,
        }),
        reflection_depth: 0,
        memory_kind: crate::models::MemoryKind::Observation,
        entity_id: None,
        persona_version: None,
        citations: Vec::new(),
        source_uri: None,
        source_span: None,
        confidence_source: ConfidenceSource::CallerProvided,
        confidence_signals: None,
        confidence_decayed_at: None,
        version: 1,
    };

    db::insert(conn, &record)?;
    Ok(())
}
/// Stores a summary of one curator cycle as a memory in the
/// `{CURATOR_NAMESPACE}/reports` namespace.
///
/// The content is pretty-printed JSON combining the caller-supplied figures
/// (`cycle_duration_ms`, `auto_tagged`, `contradictions_found`,
/// `personas_generated`, `errors_total`) with the counters of `pass_report`.
///
/// # Errors
/// Propagates JSON serialisation and `db::insert` failures.
pub fn persist_self_report(
    conn: &Connection,
    cycle_duration_ms: u128,
    pass_report: &AutonomyPassReport,
    auto_tagged: usize,
    contradictions_found: usize,
    personas_generated: usize,
    errors_total: usize,
) -> Result<()> {
    let stamp = chrono::Utc::now().to_rfc3339();

    let summary = serde_json::json!({
        "cycle_ts": stamp,
        "cycle_duration_ms": cycle_duration_ms,
        "auto_tagged": auto_tagged,
        "contradictions_found": contradictions_found,
        "personas_generated": personas_generated,
        "clusters_formed": pass_report.clusters_formed,
        "memories_consolidated": pass_report.memories_consolidated,
        "memories_forgotten": pass_report.memories_forgotten,
        "priority_adjustments": pass_report.priority_adjustments,
        "rollback_entries_written": pass_report.rollback_entries_written,
        "errors_total": errors_total,
    });
    let rendered = serde_json::to_string_pretty(&summary)?;

    let record = Memory {
        id: uuid::Uuid::new_v4().to_string(),
        tier: Tier::Mid,
        namespace: format!("{CURATOR_NAMESPACE}/reports"),
        title: format!("curator cycle @ {stamp}"),
        content: rendered,
        tags: vec!["_curator".to_string(), "_report".to_string()],
        priority: 2,
        confidence: 1.0,
        source: CURATOR_SOURCE_LABEL.to_string(),
        access_count: 0,
        created_at: stamp.clone(),
        updated_at: stamp,
        last_accessed_at: None,
        expires_at: None,
        metadata: serde_json::json!({"agent_id": crate::identity::sentinels::AI_CURATOR}),
        reflection_depth: 0,
        memory_kind: crate::models::MemoryKind::Observation,
        entity_id: None,
        persona_version: None,
        citations: Vec::new(),
        source_uri: None,
        source_span: None,
        confidence_source: ConfidenceSource::CallerProvided,
        confidence_signals: None,
        confidence_decayed_at: None,
        version: 1,
    };

    db::insert(conn, &record)?;
    Ok(())
}
/// Undoes the curator action described by `entry`.
///
/// * `Consolidate` — after checking that no original would collide with an
///   existing memory, deletes the merged memory and re-inserts every original.
///   Returns whether the merged memory still existed.
/// * `Forget` — after the same collision check, re-inserts the snapshot.
///   Returns `true`.
/// * `PriorityAdjust` — writes the `before` priority back. Returns `true`.
///
/// # Errors
/// Fails when a collision is detected (nothing has been changed at that
/// point) or when any database call fails. The individual steps are not
/// wrapped in a transaction here.
pub fn reverse_rollback_entry(conn: &Connection, entry: &RollbackEntry) -> Result<bool> {
    match entry {
        RollbackEntry::Forget { snapshot } => {
            check_no_collision(conn, &snapshot.title, &snapshot.namespace, &snapshot.id)?;
            db::insert(conn, snapshot)?;
            Ok(true)
        }
        RollbackEntry::PriorityAdjust {
            memory_id, before, ..
        } => {
            // Only the priority argument is set; the result is not inspected.
            let _ = db::update(
                conn,
                memory_id,
                None,
                None,
                None,
                None,
                None,
                Some(*before),
                None,
                None,
                None,
            )?;
            Ok(true)
        }
        RollbackEntry::Consolidate {
            originals,
            result_id,
        } => {
            // Validate every original first so a collision aborts before any write.
            originals.iter().try_for_each(|orig| {
                check_no_collision(conn, &orig.title, &orig.namespace, &orig.id)
            })?;
            let merged_existed = db::delete(conn, result_id)?;
            for orig in originals {
                db::insert(conn, orig)?;
            }
            Ok(merged_existed)
        }
    }
}
/// Ensures that restoring a memory would not clash with a different memory
/// that now has the same `(title, namespace)`.
///
/// Walks the complete listing of `namespace` page by page (50 rows at a time)
/// rather than looking at the first page only, so collisions in namespaces
/// with more than 50 memories are also detected. A row whose id equals
/// `expected_id` is not a collision.
///
/// NOTE(review): assumes the 4th and 5th arguments of `db::list` are the page
/// size and the row offset, as the call sites in this file suggest — confirm.
///
/// # Errors
/// Returns a "rollback aborted" error naming the conflicting memory, or
/// propagates a `db::list` failure.
fn check_no_collision(
    conn: &Connection,
    title: &str,
    namespace: &str,
    expected_id: &str,
) -> Result<()> {
    let mut offset = 0;
    loop {
        let page = db::list(
            conn,
            Some(namespace),
            None,
            50,
            offset,
            None,
            None,
            None,
            None,
            None,
        )?;
        let fetched = page.len();

        // The namespace is re-checked per row in case the listing returns
        // rows from other namespaces as well.
        let conflict = page
            .iter()
            .find(|row| row.namespace == namespace && row.title == title && row.id != expected_id);
        if let Some(row) = conflict {
            anyhow::bail!(
                "rollback aborted: memory {} now occupies (title={:?}, namespace={:?}) — \
                 reverting would overwrite it. Resolve the conflict manually.",
                row.id,
                title,
                namespace
            );
        }

        // A short page means the listing is exhausted.
        if fetched < 50 {
            return Ok(());
        }
        offset += 50;
    }
}
#[cfg(test)]
mod tests {
use super::*;
use std::sync::Mutex;
// In-memory `AutonomyLlm` double: returns canned values and records every
// call in `calls` so tests can inspect what was invoked.
struct StubLlm {
#[allow(dead_code)]
auto_tag_result: Vec<String>,
// Returned verbatim by `summarize_memories`.
summary: String,
// `detect_contradiction` answers `true` when either input contains this text.
#[allow(dead_code)]
contradiction_sentinel: String,
// Call log; entries look like "auto_tag:<title>", "detect_contradiction",
// "summarize:<count>".
calls: Mutex<Vec<String>>,
}
impl StubLlm {
// Builds a stub whose summary is `summary`, with fixed tags
// ["auto", "stub"] and the sentinel "CONTRADICTS".
fn new(summary: &str) -> Self {
Self {
auto_tag_result: vec!["auto".to_string(), "stub".to_string()],
summary: summary.to_string(),
contradiction_sentinel: "CONTRADICTS".to_string(),
calls: Mutex::new(Vec::new()),
}
}
}
impl AutonomyLlm for StubLlm {
fn auto_tag(&self, title: &str, _content: &str) -> Result<Vec<String>> {
self.calls.lock().unwrap().push(format!("auto_tag:{title}"));
Ok(self.auto_tag_result.clone())
}
fn detect_contradiction(&self, a: &str, b: &str) -> Result<bool> {
self.calls
.lock()
.unwrap()
.push("detect_contradiction".to_string());
Ok(
a.contains(&self.contradiction_sentinel)
|| b.contains(&self.contradiction_sentinel),
)
}
fn summarize_memories(&self, memories: &[(String, String)]) -> Result<String> {
self.calls
.lock()
.unwrap()
.push(format!("summarize:{}", memories.len()));
Ok(self.summary.clone())
}
}
// Builds a test memory with the given id, namespace, title, content and tier.
// Defaults: priority 5, confidence 1.0, zero accesses, created/updated "now",
// never accessed, metadata containing only an agent id.
fn sample_mem(id: &str, ns: &str, title: &str, content: &str, tier: Tier) -> Memory {
let now = chrono::Utc::now().to_rfc3339();
Memory {
id: id.to_string(),
tier,
namespace: ns.to_string(),
title: title.to_string(),
content: content.to_string(),
tags: vec!["t".to_string()],
priority: 5,
confidence: 1.0,
source: "test".to_string(),
access_count: 0,
created_at: now.clone(),
updated_at: now,
last_accessed_at: None,
expires_at: None,
metadata: serde_json::json!({"agent_id":"ai:test"}),
reflection_depth: 0,
memory_kind: crate::models::MemoryKind::Observation,
entity_id: None,
persona_version: None,
citations: Vec::new(),
source_uri: None,
source_span: None,
confidence_source: ConfidenceSource::CallerProvided,
confidence_signals: None,
confidence_decayed_at: None,
version: 1,
}
}
// Opens a database on a fresh temporary file. The temp-file handle is
// returned alongside the connection so the file outlives the test body.
fn setup_conn() -> (tempfile::NamedTempFile, Connection) {
let tmp = tempfile::NamedTempFile::new().unwrap();
let conn = db::open(tmp.path()).unwrap();
(tmp, conn)
}
// Two sentences sharing most of their words score above 0.4.
#[test]
fn jaccard_similarity_basic() {
let sim = jaccard_similarity(
"the quick brown fox jumps over",
"quick brown fox over the lazy",
);
assert!(sim > 0.4, "unexpected sim {sim}");
}
// Similarity is 0.0 when both inputs, or just one of them, have no tokens.
#[test]
fn jaccard_similarity_empty() {
assert!((jaccard_similarity("", "") - 0.0).abs() < 1e-9);
assert!((jaccard_similarity("abc", "") - 0.0).abs() < 1e-9);
}
// Similar memories cluster only within one namespace: `a` and `b` (ns1) form
// a cluster, while `c` (ns2) with identical content to `a` stays out.
#[test]
fn consolidation_clusters_group_by_namespace() {
let a = sample_mem(
"a",
"ns1",
"A",
"the quick brown fox jumps over lazy dog",
Tier::Mid,
);
let b = sample_mem(
"b",
"ns1",
"B",
"quick brown fox over lazy dog jumps",
Tier::Mid,
);
let c = sample_mem(
"c",
"ns2",
"C",
"the quick brown fox jumps over lazy dog",
Tier::Mid,
);
let (_tmp, conn) = setup_conn();
let clusters = find_consolidation_clusters(&conn, &[a, b, c]);
assert_eq!(clusters.len(), 1);
assert_eq!(clusters[0].len(), 2);
}
// Memories in a `_`-prefixed namespace are never clustered, even when their
// contents are identical.
#[test]
fn consolidation_skips_reserved_namespace() {
let a = sample_mem("a", "_curator/reports", "A", "content aaaa bbbb", Tier::Mid);
let b = sample_mem("b", "_curator/reports", "B", "content aaaa bbbb", Tier::Mid);
let (_tmp, conn) = setup_conn();
let clusters = find_consolidation_clusters(&conn, &[a, b]);
assert!(clusters.is_empty());
}
// Scales `values` to unit Euclidean length for use as a synthetic embedding.
// Vectors whose norm is below 1e-12 (e.g. all zeros) are returned unchanged.
fn synth_emb(values: &[f32]) -> Vec<f32> {
    let squared_sum: f32 = values.iter().map(|v| v * v).sum();
    let norm = squared_sum.sqrt();
    if norm < 1e-12 {
        values.to_vec()
    } else {
        values.iter().map(|v| v / norm).collect()
    }
}
// With embeddings stored for both memories, cosine similarity decides:
// orthogonal embeddings block a cluster despite identical text (a/b), while
// near-parallel embeddings allow it (c/d).
#[test]
fn test_consolidation_uses_cosine_when_embeddings_present() {
let (_tmp, conn) = setup_conn();
let a = sample_mem(
"a",
"ns1",
"A",
"the quick brown fox jumps over lazy dog",
Tier::Mid,
);
let b = sample_mem(
"b",
"ns1",
"B",
"the quick brown fox jumps over lazy dog",
Tier::Mid,
);
db::insert(&conn, &a).unwrap();
db::insert(&conn, &b).unwrap();
db::set_embedding(&conn, &a.id, &synth_emb(&[1.0, 0.0, 0.0, 0.0])).unwrap();
db::set_embedding(&conn, &b.id, &synth_emb(&[0.0, 1.0, 0.0, 0.0])).unwrap();
let clusters = find_consolidation_clusters(&conn, &[a, b]);
assert!(
clusters.is_empty(),
"cosine-dissimilar embeddings must defeat the Jaccard-only cluster (cosine is primary)",
);
let c = sample_mem(
"c",
"ns2",
"C",
"the quick brown fox jumps over lazy dog",
Tier::Mid,
);
let d = sample_mem(
"d",
"ns2",
"D",
"the quick brown fox jumps over lazy dog",
Tier::Mid,
);
db::insert(&conn, &c).unwrap();
db::insert(&conn, &d).unwrap();
db::set_embedding(&conn, &c.id, &synth_emb(&[1.0, 0.0, 0.0, 0.0])).unwrap();
db::set_embedding(&conn, &d.id, &synth_emb(&[0.99, 0.1, 0.0, 0.0])).unwrap();
let clusters2 = find_consolidation_clusters(&conn, &[c, d]);
assert_eq!(
clusters2.len(),
1,
"cosine-similar embeddings on a Jaccard-similar pair must cluster"
);
assert_eq!(clusters2[0].len(), 2);
}
// Without any stored embeddings, text-identical memories still cluster on
// Jaccard similarity alone.
#[test]
fn test_consolidation_falls_back_to_jaccard_no_embeddings() {
let (_tmp, conn) = setup_conn();
let a = sample_mem(
"a",
"ns",
"A",
"kubernetes rolling canary deploy strategy keyword keyword",
Tier::Long,
);
let b = sample_mem(
"b",
"ns",
"B",
"kubernetes rolling canary deploy strategy keyword keyword",
Tier::Long,
);
db::insert(&conn, &a).unwrap();
db::insert(&conn, &b).unwrap();
let clusters = find_consolidation_clusters(&conn, &[a, b]);
assert_eq!(
clusters.len(),
1,
"keyword-tier corpus (no embeddings) must still cluster via Jaccard"
);
assert_eq!(clusters[0].len(), 2);
}
// A `PriorityAdjust` entry round-trips through JSON and carries the
// snake_case action tag.
#[test]
fn rollback_entry_serialises() {
let e = RollbackEntry::PriorityAdjust {
memory_id: "m1".to_string(),
before: 5,
after: 6,
};
let json = serde_json::to_string(&e).unwrap();
assert!(json.contains("priority_adjust"));
let back: RollbackEntry = serde_json::from_str(&json).unwrap();
assert_eq!(back.action_tag(), "priority_adjust");
}
// A real (non-dry) consolidation of two memories stores a merged memory in
// the same namespace, titled "[consolidated] …", containing the LLM summary.
#[test]
fn consolidate_cluster_merges_two_memories() {
let (_tmp, conn) = setup_conn();
let a = sample_mem(
"a",
"app",
"Deploy plan",
"kubernetes rolling deploy with canary",
Tier::Long,
);
let b = sample_mem(
"b",
"app",
"Deploy process",
"kubernetes deploy rolling canary strategy",
Tier::Long,
);
db::insert(&conn, &a).unwrap();
db::insert(&conn, &b).unwrap();
let llm = StubLlm::new("consolidated deploy plan");
let cluster = vec![a.clone(), b.clone()];
let entry = consolidate_cluster(&conn, &llm, &cluster, false)
.unwrap()
.expect("expected rollback entry");
match entry {
RollbackEntry::Consolidate {
originals,
result_id,
} => {
assert_eq!(originals.len(), 2);
assert_ne!(result_id, "dry-run");
let got = db::get(&conn, &result_id).unwrap().expect("result memory");
assert_eq!(got.namespace, "app");
assert!(got.title.starts_with("[consolidated]"));
assert!(got.content.contains("consolidated deploy plan"));
}
_ => panic!("expected Consolidate"),
}
}
// A dry-run consolidation returns an entry with `result_id == "dry-run"` and
// leaves both originals in the database.
#[test]
fn dry_run_does_not_write() {
let (_tmp, conn) = setup_conn();
let a = sample_mem(
"a",
"app",
"Deploy plan",
"kubernetes rolling deploy with canary",
Tier::Long,
);
let b = sample_mem(
"b",
"app",
"Deploy process",
"kubernetes deploy rolling canary strategy",
Tier::Long,
);
db::insert(&conn, &a).unwrap();
db::insert(&conn, &b).unwrap();
let llm = StubLlm::new("never persisted");
let cluster = vec![a.clone(), b.clone()];
let entry = consolidate_cluster(&conn, &llm, &cluster, true)
.unwrap()
.expect("dry-run returns entry");
if let RollbackEntry::Consolidate { result_id, .. } = entry {
assert_eq!(result_id, "dry-run");
}
assert!(db::get(&conn, "a").unwrap().is_some());
assert!(db::get(&conn, "b").unwrap().is_some());
}
// Consolidation removes the originals and creates the merged memory;
// reversing the entry restores the originals and removes the merged memory.
#[test]
fn reverse_consolidation_restores_originals() {
let (_tmp, conn) = setup_conn();
let a = sample_mem(
"a",
"app",
"Deploy plan",
"kubernetes rolling deploy canary",
Tier::Long,
);
let b = sample_mem(
"b",
"app",
"Deploy process",
"kubernetes rolling canary strategy",
Tier::Long,
);
db::insert(&conn, &a).unwrap();
db::insert(&conn, &b).unwrap();
let llm = StubLlm::new("summary");
let cluster = vec![a.clone(), b.clone()];
let entry = consolidate_cluster(&conn, &llm, &cluster, false)
.unwrap()
.expect("entry");
if let RollbackEntry::Consolidate {
result_id,
originals,
} = &entry
{
assert!(db::get(&conn, result_id).unwrap().is_some());
for orig in originals {
assert!(
db::get(&conn, &orig.id).unwrap().is_none(),
"{} should be merged-away",
orig.id
);
}
}
reverse_rollback_entry(&conn, &entry).unwrap();
assert!(db::get(&conn, "a").unwrap().is_some());
assert!(db::get(&conn, "b").unwrap().is_some());
if let RollbackEntry::Consolidate { result_id, .. } = &entry {
assert!(db::get(&conn, result_id).unwrap().is_none());
}
}
// End-to-end run over a mixed candidate set: two similar "deploy" memories
// (consolidation), an unrelated "chat" memory, and an old fact whose metadata
// names a newer contradicting fact (forget). Checks the report counters and
// that the rollback log namespace is populated.
#[test]
fn full_autonomy_cycle_end_to_end() {
let (_tmp, conn) = setup_conn();
let llm = StubLlm::new("consolidated");
let m_a = sample_mem(
"ma",
"deploy",
"canary deploy plan",
"kubernetes canary rolling deploy strategy",
Tier::Long,
);
let m_b = sample_mem(
"mb",
"deploy",
"canary deploy overview",
"kubernetes rolling canary deploy strategy",
Tier::Long,
);
let m_chat = sample_mem(
"mchat",
"chat",
"hello",
"hi there chat only content here",
Tier::Mid,
);
let mut m_old = sample_mem(
"mold",
"facts",
"fact v1",
"the sky is green always uniformly",
Tier::Long,
);
let m_new_id = "mnew";
// Mark the old fact as contradicted by the new one and backdate it so the
// new fact has the later `updated_at`.
m_old.metadata["confirmed_contradictions"] = serde_json::json!([m_new_id]);
m_old.updated_at = (chrono::Utc::now() - chrono::Duration::days(30)).to_rfc3339();
let m_new = sample_mem(
m_new_id,
"facts",
"fact v2",
"the sky is blue most of the time for sure",
Tier::Long,
);
for m in [&m_a, &m_b, &m_chat, &m_old, &m_new] {
db::insert(&conn, m).unwrap();
}
let candidates = vec![
m_a.clone(),
m_b.clone(),
m_chat.clone(),
m_old.clone(),
m_new.clone(),
];
let report = run_autonomy_passes(&conn, &llm, &candidates, false);
assert!(report.clusters_formed >= 1);
assert!(report.memories_consolidated >= 2);
assert!(
report.memories_forgotten >= 1,
"expected ≥1 forget, got {report:?}"
);
assert!(report.rollback_entries_written >= report.clusters_formed);
let log = db::list(
&conn,
Some("_curator/rollback"),
None,
100,
0,
None,
None,
None,
None,
None,
)
.unwrap();
assert!(!log.is_empty(), "rollback log should be populated");
}
// `persist_self_report` writes exactly one memory into `_curator/reports`
// whose content includes the pass counters.
#[test]
fn self_report_written_to_reports_namespace() {
let (_tmp, conn) = setup_conn();
let pass = AutonomyPassReport {
clusters_formed: 1,
memories_consolidated: 2,
memories_forgotten: 0,
priority_adjustments: 1,
rollback_entries_written: 2,
errors: vec![],
};
persist_self_report(&conn, 1234, &pass, 3, 0, 0, 0).unwrap();
let reports = db::list(
&conn,
Some("_curator/reports"),
None,
10,
0,
None,
None,
None,
None,
None,
)
.unwrap();
assert_eq!(reports.len(), 1);
assert!(reports[0].content.contains("memories_consolidated"));
}
// A full pass over two near-identical memories, with the stub LLM supplying
// the summary, forms a cluster and consolidates it.
#[test]
fn smart_tier_mock_cycle_summarize() {
let (_tmp, conn) = setup_conn();
let a = sample_mem(
"mem-a",
"app",
"Deploy A",
"kubernetes deployment rolling canary strategy kubernetes rolling deploy canary",
Tier::Mid,
);
let b = sample_mem(
"mem-b",
"app",
"Deploy B",
"kubernetes deployment rolling canary approach kubernetes rolling canary deploy",
Tier::Mid,
);
db::insert(&conn, &a).unwrap();
db::insert(&conn, &b).unwrap();
let llm = StubLlm::new("LLM-generated consolidated summary");
let candidates = vec![a, b];
let report = run_autonomy_passes(&conn, &llm, &candidates, false);
assert!(report.clusters_formed > 0);
assert!(report.memories_consolidated > 0);
}
// A full pass over two text-identical memories completes without errors and
// writes at least one rollback entry.
#[test]
fn autonomy_cycle_with_mock_ollama() {
let (_tmp, conn) = setup_conn();
let a = sample_mem(
"id-1",
"ns1",
"Title A",
"content similar enough for clustering test similar clustering",
Tier::Mid,
);
let b = sample_mem(
"id-2",
"ns1",
"Title B",
"content similar enough for clustering test similar clustering",
Tier::Mid,
);
db::insert(&conn, &a).unwrap();
db::insert(&conn, &b).unwrap();
let llm = StubLlm::new("mock summary result");
let candidates = vec![a, b];
let report = run_autonomy_passes(&conn, &llm, &candidates, false);
assert_eq!(report.errors.len(), 0, "autonomy cycle should not error");
assert!(
report.rollback_entries_written > 0,
"autonomy cycle should write rollback entries"
);
}
// Persisting a consolidation entry creates exactly one memory in
// `_curator/rollback` whose content mentions "consolidate".
#[test]
fn rollback_log_captures_consolidation() {
let (_tmp, conn) = setup_conn();
let a = sample_mem(
"a",
"test-ns",
"Memory A",
"test content aaaa bbbb cccc aaaa bbbb",
Tier::Mid,
);
let b = sample_mem(
"b",
"test-ns",
"Memory B",
"test content aaaa bbbb cccc aaaa bbbb",
Tier::Mid,
);
db::insert(&conn, &a).unwrap();
db::insert(&conn, &b).unwrap();
let llm = StubLlm::new("consolidated");
let cluster = vec![a.clone(), b.clone()];
let entry = consolidate_cluster(&conn, &llm, &cluster, false)
.unwrap()
.expect("rollback entry");
persist_rollback_entry(&conn, &entry).unwrap();
let log = db::list(
&conn,
Some("_curator/rollback"),
None,
100,
0,
None,
None,
None,
None,
None,
)
.unwrap();
assert_eq!(log.len(), 1);
assert!(log[0].content.contains("consolidate"));
}
// A heavily and recently accessed memory (100 accesses, accessed now) gets a
// priority increase recorded in a `PriorityAdjust` entry.
#[test]
fn priority_feedback_adjusts_memory() {
let (_tmp, conn) = setup_conn();
let mut mem = sample_mem("id", "ns", "Title", "content", Tier::Mid);
mem.priority = 5;
mem.access_count = 100;
mem.last_accessed_at = Some(chrono::Utc::now().to_rfc3339());
db::insert(&conn, &mem).unwrap();
let entry = apply_priority_feedback(&conn, &mem, false)
.unwrap()
.expect("priority feedback should produce entry");
match entry {
RollbackEntry::PriorityAdjust {
memory_id,
before,
after,
} => {
assert_eq!(memory_id, "id");
assert_eq!(before, 5);
assert!(after > before, "high access should increase priority");
}
_ => panic!("expected PriorityAdjust"),
}
}
// A dry-run pass leaves the number of memories in the candidates' namespace
// unchanged.
#[test]
fn dry_run_autonomy_does_not_write() {
let (_tmp, conn) = setup_conn();
let a = sample_mem(
"a",
"test-ns",
"Memory A",
"test content aaaa bbbb cccc aaaa bbbb",
Tier::Mid,
);
let b = sample_mem(
"b",
"test-ns",
"Memory B",
"test content aaaa bbbb cccc aaaa bbbb",
Tier::Mid,
);
db::insert(&conn, &a).unwrap();
db::insert(&conn, &b).unwrap();
let initial_count = db::list(
&conn,
Some("test-ns"),
None,
100,
0,
None,
None,
None,
None,
None,
)
.unwrap()
.len();
let llm = StubLlm::new("consolidated");
let candidates = vec![a, b];
let _report = run_autonomy_passes(&conn, &llm, &candidates, true);
let final_count = db::list(
&conn,
Some("test-ns"),
None,
100,
0,
None,
None,
None,
None,
None,
)
.unwrap()
.len();
assert_eq!(
initial_count, final_count,
"dry-run should not modify database"
);
}
// A pass with nothing to do reports all-zero counters and no errors; a pass
// whose LLM fails during consolidation records the failure in
// `report.errors` and carries on instead of aborting.
#[test]
fn autonomy_passes_report_aggregates_errors() {
    // LLM double whose summarisation always fails.
    struct FailingLlm;
    impl AutonomyLlm for FailingLlm {
        fn auto_tag(&self, _title: &str, _content: &str) -> Result<Vec<String>> {
            Ok(Vec::new())
        }
        fn detect_contradiction(&self, _a: &str, _b: &str) -> Result<bool> {
            Ok(false)
        }
        fn summarize_memories(&self, _memories: &[(String, String)]) -> Result<String> {
            Err(anyhow::anyhow!("summarize unavailable"))
        }
    }

    let (_tmp, conn) = setup_conn();

    // Baseline: a single fresh memory triggers none of the three passes.
    let solo = sample_mem("id", "ns", "Title", "content", Tier::Mid);
    let llm = StubLlm::new("summary");
    let quiet = run_autonomy_passes(&conn, &llm, &[solo], false);
    assert_eq!(quiet.clusters_formed, 0);
    assert_eq!(quiet.memories_consolidated, 0);
    assert_eq!(quiet.memories_forgotten, 0);
    assert_eq!(quiet.priority_adjustments, 0);
    assert_eq!(quiet.rollback_entries_written, 0);
    assert!(quiet.errors.is_empty(), "unexpected errors: {:?}", quiet.errors);
    assert!(
        llm.calls.lock().unwrap().is_empty(),
        "no cluster means the LLM must not be called"
    );

    // Failure path: the cluster forms, the summary call fails, the error is
    // collected and the originals stay untouched.
    let a = sample_mem(
        "err-a",
        "errs",
        "A",
        "kubernetes rolling canary deploy strategy",
        Tier::Mid,
    );
    let b = sample_mem(
        "err-b",
        "errs",
        "B",
        "kubernetes rolling canary deploy strategy",
        Tier::Mid,
    );
    db::insert(&conn, &a).unwrap();
    db::insert(&conn, &b).unwrap();
    let failed = run_autonomy_passes(&conn, &FailingLlm, &[a, b], false);
    assert_eq!(failed.clusters_formed, 1);
    assert_eq!(failed.memories_consolidated, 0);
    assert_eq!(failed.rollback_entries_written, 0);
    assert_eq!(failed.errors.len(), 1, "errors: {:?}", failed.errors);
    assert!(failed.errors[0].starts_with("consolidate failed: "));
    assert!(failed.errors[0].contains("summarize unavailable"));
    assert!(db::get(&conn, "err-a").unwrap().is_some());
    assert!(db::get(&conn, "err-b").unwrap().is_some());
}
// Reversing a `PriorityAdjust` entry writes the `before` priority (7) back
// over the current value (9).
#[test]
fn reverse_priority_adjust_restores_before_value() {
let (_tmp, conn) = setup_conn();
let mut mem = sample_mem("pa-id", "ns", "Title", "content", Tier::Mid);
mem.priority = 7;
db::insert(&conn, &mem).unwrap();
db::update(
&conn,
&mem.id,
None,
None,
None,
None,
None,
Some(9),
None,
None,
None,
)
.unwrap();
assert_eq!(db::get(&conn, &mem.id).unwrap().unwrap().priority, 9);
let entry = RollbackEntry::PriorityAdjust {
memory_id: mem.id.clone(),
before: 7,
after: 9,
};
let applied = reverse_rollback_entry(&conn, &entry).unwrap();
assert!(applied);
assert_eq!(db::get(&conn, &mem.id).unwrap().unwrap().priority, 7);
}
// Reversing a `Forget` entry re-inserts the snapshot of a deleted memory
// with its original id, title and namespace.
#[test]
fn reverse_forget_restores_snapshot() {
let (_tmp, conn) = setup_conn();
let mem = sample_mem(
"forget-id",
"factual",
"Snapshot",
"saved content body abc",
Tier::Long,
);
db::insert(&conn, &mem).unwrap();
db::delete(&conn, &mem.id).unwrap();
assert!(db::get(&conn, &mem.id).unwrap().is_none());
let entry = RollbackEntry::Forget {
snapshot: mem.clone(),
};
let applied = reverse_rollback_entry(&conn, &entry).unwrap();
assert!(applied);
let restored = db::get(&conn, &mem.id).unwrap().expect("snapshot restored");
assert_eq!(restored.title, "Snapshot");
assert_eq!(restored.namespace, "factual");
}
// Reversing a consolidation fails with "rollback aborted" when a different
// memory now holds an original's (title, namespace); the colliding memory is
// left in place.
#[test]
fn reverse_consolidate_collision_aborts() {
let (_tmp, conn) = setup_conn();
let original = sample_mem(
"o1",
"app",
"Deploy plan",
"kubernetes rolling deploy canary",
Tier::Long,
);
let merged_id = "merged".to_string();
let entry = RollbackEntry::Consolidate {
originals: vec![original.clone()],
result_id: merged_id.clone(),
};
let collider = sample_mem(
"collider-id",
"app",
"Deploy plan",
"different content here entirely",
Tier::Long,
);
db::insert(&conn, &collider).unwrap();
let err = reverse_rollback_entry(&conn, &entry).expect_err("collision must abort");
let msg = format!("{err}");
assert!(msg.contains("rollback aborted"), "unexpected msg: {msg}");
assert!(db::get(&conn, "collider-id").unwrap().is_some());
}
// A one-element cluster is not consolidated: the result is `None`.
#[test]
fn consolidate_cluster_returns_none_for_singleton() {
let (_tmp, conn) = setup_conn();
let llm = StubLlm::new("never called");
let solo = sample_mem("a", "ns", "T", "content body word word", Tier::Mid);
let result = consolidate_cluster(&conn, &llm, std::slice::from_ref(&solo), false).unwrap();
assert!(result.is_none());
}
// `consolidate_cluster` itself refuses clusters from a `_`-prefixed
// namespace, independently of the filtering done when clusters are formed.
#[test]
fn consolidate_cluster_skips_reserved_namespace_defensive() {
let (_tmp, conn) = setup_conn();
let llm = StubLlm::new("never called");
let a = sample_mem("a", "_curator/rollback", "T1", "abc abc abc abc", Tier::Mid);
let b = sample_mem("b", "_curator/rollback", "T2", "abc abc abc abc", Tier::Mid);
let result = consolidate_cluster(&conn, &llm, &[a, b], false).unwrap();
assert!(
result.is_none(),
"reserved-namespace cluster must be skipped"
);
}
// In dry-run mode a superseded memory yields a `Forget` entry but stays in
// the database.
#[test]
fn forget_if_superseded_dry_run_returns_entry_without_delete() {
let (_tmp, conn) = setup_conn();
let mut older = sample_mem("old", "facts", "fact v1", "the sky is green", Tier::Long);
older.metadata["confirmed_contradictions"] = serde_json::json!(["new"]);
older.updated_at = (chrono::Utc::now() - chrono::Duration::days(30)).to_rfc3339();
let newer = sample_mem("new", "facts", "fact v2", "the sky is blue", Tier::Long);
db::insert(&conn, &older).unwrap();
db::insert(&conn, &newer).unwrap();
let result = forget_if_superseded(&conn, &older, &[older.clone(), newer], true).unwrap();
match result {
Some(RollbackEntry::Forget { snapshot }) => {
assert_eq!(snapshot.id, "old");
}
_ => panic!("expected Forget entry from dry-run forget"),
}
assert!(db::get(&conn, "old").unwrap().is_some());
}
// Contradiction ids that are not strings (42) or that match no candidate
// ("missing-id") are ignored, so nothing is forgotten.
#[test]
fn forget_if_superseded_skips_non_string_contradiction_ids() {
let (_tmp, conn) = setup_conn();
let mut mem = sample_mem("m", "facts", "T", "content body word", Tier::Mid);
mem.metadata["confirmed_contradictions"] = serde_json::json!([42, "missing-id"]);
let result = forget_if_superseded(&conn, &mem, std::slice::from_ref(&mem), false).unwrap();
assert!(result.is_none());
}
// Exercises the stub's `auto_tag` and `detect_contradiction` through the
// trait and checks that both calls were logged.
#[test]
fn stub_llm_auto_tag_and_detect_contradiction() {
let llm = StubLlm::new("summary");
let tags = AutonomyLlm::auto_tag(&llm, "Some Title", "body").unwrap();
assert_eq!(tags, vec!["auto".to_string(), "stub".to_string()]);
assert!(AutonomyLlm::detect_contradiction(&llm, "this CONTRADICTS that", "ok").unwrap());
assert!(!AutonomyLlm::detect_contradiction(&llm, "ok", "fine").unwrap());
let calls = llm.calls.lock().unwrap();
assert!(calls.iter().any(|c| c.starts_with("auto_tag:")));
assert!(calls.iter().any(|c| c == "detect_contradiction"));
}
// A dry-run pass over candidates that would trigger all three passes
// (consolidation, forget, priority bump) still forms clusters in the report
// but writes nothing: no rollback memories, unchanged priority, and the
// would-be-forgotten and would-be-merged memories remain.
#[test]
fn run_autonomy_passes_dry_run_writes_no_changes() {
let (_tmp, conn) = setup_conn();
let m_a = sample_mem(
"ma",
"deploy",
"canary deploy plan",
"kubernetes canary rolling deploy strategy",
Tier::Long,
);
let m_b = sample_mem(
"mb",
"deploy",
"canary deploy overview",
"kubernetes rolling canary deploy strategy",
Tier::Long,
);
let mut m_old = sample_mem(
"mold",
"facts",
"fact v1",
"the sky is green always uniformly",
Tier::Long,
);
m_old.metadata["confirmed_contradictions"] = serde_json::json!(["mnew"]);
m_old.updated_at = (chrono::Utc::now() - chrono::Duration::days(30)).to_rfc3339();
let m_new = sample_mem(
"mnew",
"facts",
"fact v2",
"the sky is blue most of the time",
Tier::Long,
);
let mut m_hot = sample_mem(
"hot",
"ns",
"Hot",
"this is hot content for priority bump",
Tier::Mid,
);
m_hot.priority = 5;
m_hot.access_count = 100;
m_hot.last_accessed_at = Some(chrono::Utc::now().to_rfc3339());
for m in [&m_a, &m_b, &m_old, &m_new, &m_hot] {
db::insert(&conn, m).unwrap();
}
let candidates = vec![
m_a.clone(),
m_b.clone(),
m_old.clone(),
m_new.clone(),
m_hot.clone(),
];
let pre_priority = db::get(&conn, &m_hot.id).unwrap().unwrap().priority;
assert!(db::get(&conn, "mold").unwrap().is_some());
let llm = StubLlm::new("dry-run summary");
let report = run_autonomy_passes(&conn, &llm, &candidates, true);
assert!(report.clusters_formed >= 1);
let log = db::list(
&conn,
Some("_curator/rollback"),
None,
100,
0,
None,
None,
None,
None,
None,
)
.unwrap();
assert!(log.is_empty(), "dry-run must not persist rollback memories");
assert_eq!(
db::get(&conn, &m_hot.id).unwrap().unwrap().priority,
pre_priority
);
assert!(db::get(&conn, "mold").unwrap().is_some());
assert!(db::get(&conn, "ma").unwrap().is_some());
}
// With more identical memories than the cap allows, every returned cluster
// has at most `CONSOLIDATE_MAX_CLUSTER_SIZE` members.
#[test]
fn consolidation_cluster_respects_max_size_cap() {
let n = CONSOLIDATE_MAX_CLUSTER_SIZE + 4;
let mut candidates: Vec<Memory> = Vec::with_capacity(n);
for i in 0..n {
candidates.push(sample_mem(
&format!("m{i}"),
"deploy",
&format!("title-{i}"),
"kubernetes rolling canary deploy strategy",
Tier::Long,
));
}
let (_tmp, conn) = setup_conn();
let clusters = find_consolidation_clusters(&conn, &candidates);
assert!(!clusters.is_empty());
for c in &clusters {
assert!(
c.len() <= CONSOLIDATE_MAX_CLUSTER_SIZE,
"cluster size {} exceeded cap {}",
c.len(),
CONSOLIDATE_MAX_CLUSTER_SIZE
);
}
}
// A never-accessed memory created 60 days ago has its priority lowered by
// exactly one (5 -> 4).
#[test]
fn priority_feedback_decrements_cold_old_memory() {
let (_tmp, conn) = setup_conn();
let mut mem = sample_mem(
"cold-id",
"ns",
"Cold",
"content body content body",
Tier::Mid,
);
mem.priority = 5;
mem.access_count = 0;
mem.created_at = (chrono::Utc::now() - chrono::Duration::days(60)).to_rfc3339();
db::insert(&conn, &mem).unwrap();
let entry = apply_priority_feedback(&conn, &mem, false)
.unwrap()
.expect("cold memory must produce a -1 adjustment");
match entry {
RollbackEntry::PriorityAdjust {
memory_id,
before,
after,
} => {
assert_eq!(memory_id, "cold-id");
assert_eq!(before, 5);
assert_eq!(after, 4);
}
_ => panic!("expected PriorityAdjust"),
}
}
}