#[cfg(any(feature = "sal", test))]
use crate::models::ConfidenceSource;
#[cfg(feature = "sal")]
use anyhow::Result;
#[cfg(feature = "sal")]
use crate::autonomy::AutonomyLlm;
#[cfg(feature = "sal")]
use crate::embeddings::Embedder;
#[cfg(not(feature = "sal"))]
use crate::models::Tier;
#[cfg(feature = "sal")]
use crate::models::{Memory, Tier};
#[cfg(feature = "sal")]
use crate::store::{CallerContext, MemoryStore, StoreError};
#[cfg(test)]
use crate::hooks::events::HookEvent;
#[cfg(feature = "sal")]
use super::cluster::{CosineClustering, JaccardClustering};
#[cfg(feature = "sal")]
use super::pipeline::MemoryId;
/// Agent id the consolidation pass acts under: it seeds the admin caller
/// context built in `ConsolidationPass::new` and is passed to the store's
/// `consolidate` call. Aliases the `AI_CURATOR` identity sentinel.
#[cfg(feature = "sal")]
const CONSOLIDATOR_AGENT_ID: &str = crate::identity::sentinels::AI_CURATOR;
/// State for one consolidation pass: borrowed handles to the store and the
/// LLM, plus the settings the pass's methods read.
#[allow(dead_code)]
#[cfg(feature = "sal")]
pub(crate) struct ConsolidationPass<'a> {
/// Store that `persist` consolidates into and `verify` reads back from.
pub(crate) store: &'a dyn MemoryStore,
/// Caller context handed to every store call; `new` builds it as an admin
/// context for `CONSOLIDATOR_AGENT_ID`.
pub(crate) ctx: CallerContext,
/// LLM that `summarize` asks for the consolidated summary text.
pub(crate) llm: &'a dyn AutonomyLlm,
/// Optional embedder; `cluster` clones it into the cosine clustering step.
pub(crate) embedder: Option<Embedder>,
/// When `true`, `persist` returns without touching the store.
pub(crate) dry_run: bool,
}
// Steps of the consolidation pass: group memories, vet a group, summarise it
// through the LLM, write the result to the store, and check it landed.
#[cfg(feature = "sal")]
#[allow(dead_code)]
impl<'a> ConsolidationPass<'a> {
    /// Creates a pass over `store` that summarises through `llm`.
    ///
    /// The caller context is always an admin context for
    /// `CONSOLIDATOR_AGENT_ID`; `embedder` and `dry_run` are stored as given.
    #[allow(dead_code)]
    pub(crate) fn new(
        store: &'a dyn MemoryStore,
        llm: &'a dyn AutonomyLlm,
        embedder: Option<Embedder>,
        dry_run: bool,
    ) -> Self {
        let ctx = CallerContext::for_admin(CONSOLIDATOR_AGENT_ID);
        Self {
            store,
            ctx,
            llm,
            embedder,
            dry_run,
        }
    }

    /// Returns the fixed pass name, `"consolidation"`.
    fn name(&self) -> &str {
        "consolidation"
    }

    /// Groups `memories` into clusters of memory ids.
    ///
    /// Cosine clustering (given a clone of the optional embedder) runs first;
    /// only when it produces no clusters at all is Jaccard clustering used.
    fn cluster(&self, memories: &[Memory]) -> Vec<Vec<MemoryId>> {
        let cosine_step = CosineClustering::new(self.embedder.clone());
        let primary = cosine_step.cluster_memories(memories);
        if primary.is_empty() {
            let jaccard_fallback = JaccardClustering::default();
            jaccard_fallback.cluster_memories(memories)
        } else {
            primary
        }
    }

    /// Decides whether `cluster` may be consolidated.
    ///
    /// A cluster qualifies only if it has at least two members, its namespace
    /// does not start with `_`, and every member shares that namespace.
    fn eligible(&self, cluster: &[Memory]) -> bool {
        let [head, rest @ ..] = cluster else {
            return false;
        };
        if rest.is_empty() || head.namespace.starts_with('_') {
            return false;
        }
        rest.iter().all(|member| member.namespace == head.namespace)
    }

    /// Builds the consolidated memory for `cluster` (not yet persisted).
    ///
    /// The content comes from the LLM, the title is the first member's title
    /// prefixed with `[consolidated] `, and tier / priority are the highest
    /// found among the members. The namespace is taken from the first member.
    ///
    /// # Errors
    /// Fails when `cluster` is empty or when the LLM call fails.
    fn summarize(&self, cluster: &[Memory]) -> Result<Memory> {
        let Some(first) = cluster.first() else {
            anyhow::bail!("summarize called on empty cluster");
        };

        // (title, content) pairs are the shape the LLM summariser takes.
        let mut input: Vec<(String, String)> = Vec::with_capacity(cluster.len());
        for member in cluster {
            input.push((member.title.clone(), member.content.clone()));
        }
        let summary_text = self.llm.summarize_memories(&input)?;

        let base_title = first.title.as_str();
        let title = format!("[consolidated] {base_title}");

        let tier = cluster
            .iter()
            .map(|member| &member.tier)
            .max_by_key(|candidate| tier_rank(candidate))
            .cloned()
            .unwrap_or(Tier::Mid);
        let priority = cluster
            .iter()
            .map(|member| member.priority)
            .max()
            .unwrap_or(5);

        let now = chrono::Utc::now().to_rfc3339();
        Ok(Memory {
            id: uuid::Uuid::new_v4().to_string(),
            tier,
            namespace: first.namespace.clone(),
            title,
            content: summary_text,
            tags: vec![],
            priority,
            confidence: 1.0,
            source: "ai-memory curator (compaction)".to_string(),
            access_count: 0,
            created_at: now.clone(),
            updated_at: now,
            last_accessed_at: None,
            expires_at: None,
            metadata: serde_json::json!({}),
            reflection_depth: 0,
            memory_kind: crate::models::MemoryKind::Observation,
            entity_id: None,
            persona_version: None,
            citations: Vec::new(),
            source_uri: None,
            source_span: None,
            confidence_source: ConfidenceSource::CallerProvided,
            confidence_signals: None,
            confidence_decayed_at: None,
            version: 1,
        })
    }

    /// Writes `summary` to the store as the consolidation of `sources`.
    ///
    /// Does nothing (and succeeds) in dry-run mode or when `sources` is empty.
    /// Only the summary's title, content, namespace, tier and source are
    /// forwarded to the store.
    ///
    /// # Errors
    /// Any store failure is returned wrapped in an `anyhow` error.
    async fn persist(&self, summary: &Memory, sources: &[MemoryId]) -> Result<()> {
        let nothing_to_write = self.dry_run || sources.is_empty();
        if nothing_to_write {
            return Ok(());
        }
        let outcome = self
            .store
            .consolidate(
                &self.ctx,
                sources,
                &summary.title,
                &summary.content,
                &summary.namespace,
                &summary.tier,
                &summary.source,
                CONSOLIDATOR_AGENT_ID,
            )
            .await;
        match outcome {
            Ok(_) => Ok(()),
            Err(e) => Err(anyhow::anyhow!(e)),
        }
    }

    /// Confirms that the memory `summary_id` can be read back from the store.
    ///
    /// # Errors
    /// A `NotFound` store error becomes a dedicated "not found in DB" error;
    /// every other store error is wrapped and returned as-is.
    async fn verify(&self, summary_id: MemoryId) -> Result<()> {
        let lookup = self.store.get(&self.ctx, &summary_id).await;
        match lookup {
            Err(StoreError::NotFound { .. }) => anyhow::bail!(
                "verify: consolidated summary {} not found in DB",
                summary_id
            ),
            Err(e) => Err(anyhow::anyhow!(e)),
            Ok(_) => Ok(()),
        }
    }
}
/// Gives each tier a numeric rank so tiers can be compared:
/// `Short` is 0, `Mid` is 1 and `Long` is 2.
#[cfg_attr(not(feature = "sal"), allow(dead_code))]
fn tier_rank(t: &Tier) -> u8 {
    match *t {
        Tier::Long => 2,
        Tier::Mid => 1,
        Tier::Short => 0,
    }
}
/// Test-only stand-in for firing the pre-compaction hook: the event is
/// ignored and the result is always `true`.
#[cfg(test)]
pub(super) fn fire_pre_compaction_hook(_event: HookEvent) -> bool {
true
}
/// Test-only predicate: `true` exactly when `event` is
/// `HookEvent::PreCompaction`, `false` for every other variant.
#[cfg(test)]
pub(super) fn is_pre_compaction(event: HookEvent) -> bool {
matches!(event, HookEvent::PreCompaction)
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::hooks::events::HookEvent;
    use crate::models::{Memory, Tier};

    /// Fixture: a `Tier::Long`, priority-5 memory titled `title-{id}`; every
    /// other field carries a fixed placeholder value.
    fn make_memory(id: &str, ns: &str, content: &str) -> Memory {
        let stamp = chrono::Utc::now().to_rfc3339();
        Memory {
            id: id.to_owned(),
            tier: Tier::Long,
            namespace: ns.to_owned(),
            title: format!("title-{id}"),
            content: content.to_owned(),
            tags: Vec::new(),
            priority: 5,
            confidence: 1.0,
            source: "test".to_owned(),
            access_count: 0,
            created_at: stamp.clone(),
            updated_at: stamp,
            last_accessed_at: None,
            expires_at: None,
            metadata: serde_json::json!({}),
            reflection_depth: 0,
            memory_kind: crate::models::MemoryKind::Observation,
            entity_id: None,
            persona_version: None,
            citations: Vec::new(),
            source_uri: None,
            source_span: None,
            confidence_source: ConfidenceSource::CallerProvided,
            confidence_signals: None,
            confidence_decayed_at: None,
            version: 1,
        }
    }

    /// Restates the eligibility rule (two or more members, namespace not
    /// starting with `_`, one shared namespace) so it can be exercised
    /// without the `sal` feature.
    fn meets_eligibility_rule(cluster: &[Memory]) -> bool {
        cluster.len() >= 2
            && !cluster[0].namespace.starts_with('_')
            && cluster
                .iter()
                .all(|member| member.namespace == cluster[0].namespace)
    }

    /// A one-member cluster fails the rule.
    #[test]
    fn eligible_rejects_single_member_cluster() {
        let lone = [make_memory("a", "ns", "content")];
        assert!(
            !meets_eligibility_rule(&lone),
            "singleton cluster must not be eligible"
        );
    }

    /// A cluster in an underscore-prefixed namespace fails the rule.
    #[test]
    fn eligible_rejects_reserved_namespace() {
        let reserved = [
            make_memory("a", "_curator", "content a"),
            make_memory("b", "_curator", "content b"),
        ];
        assert!(
            !meets_eligibility_rule(&reserved),
            "reserved namespace must not be eligible"
        );
    }

    /// Members from different namespaces fail the rule.
    #[test]
    fn eligible_rejects_mixed_namespace_cluster() {
        let mixed = [
            make_memory("a", "ns1", "content a"),
            make_memory("b", "ns2", "content b"),
        ];
        assert!(
            !meets_eligibility_rule(&mixed),
            "mixed-namespace cluster must not be eligible"
        );
    }

    /// Only `PreCompaction` is recognised by `is_pre_compaction`.
    #[test]
    fn pre_compaction_event_constant_is_correct() {
        assert!(is_pre_compaction(HookEvent::PreCompaction));
        for other in [HookEvent::OnCompactionRollback, HookEvent::PreStore] {
            assert!(!is_pre_compaction(other));
        }
    }

    /// The test hook stub returns `true` for `PreCompaction`.
    #[test]
    fn fire_pre_compaction_hook_passes_through_allow() {
        let allowed = fire_pre_compaction_hook(HookEvent::PreCompaction);
        assert!(allowed);
    }

    /// `OnCompactionRollback` is not classified as a pre-event.
    #[test]
    fn on_compaction_rollback_is_not_pre_event() {
        let classified_as_pre = crate::hooks::decision::is_pre_event(HookEvent::OnCompactionRollback);
        assert!(!classified_as_pre);
    }

    /// Ranks strictly increase from `Short` through `Mid` to `Long`.
    #[test]
    fn tier_rank_ordering() {
        let (short, mid, long) = (
            tier_rank(&Tier::Short),
            tier_rank(&Tier::Mid),
            tier_rank(&Tier::Long),
        );
        assert!(short < mid);
        assert!(mid < long);
    }

    /// Tests that drive `ConsolidationPass` itself; they need the `sal`
    /// feature and a throwaway SQLite store.
    #[cfg(feature = "sal")]
    mod sal_pass_tests {
        use super::*;
        use crate::autonomy::AutonomyLlm;
        use crate::store::sqlite::SqliteStore;
        use anyhow::Result;
        use std::sync::Mutex;

        /// LLM double that returns a canned summary and logs each
        /// `summarize_memories` call.
        struct StubLlm {
            summary: String,
            calls: Mutex<Vec<String>>,
        }

        impl StubLlm {
            /// Creates a stub whose summaries are always `summary`.
            fn new(summary: &str) -> Self {
                Self {
                    summary: summary.to_owned(),
                    calls: Mutex::default(),
                }
            }
        }

        impl AutonomyLlm for StubLlm {
            fn auto_tag(&self, _title: &str, _content: &str) -> Result<Vec<String>> {
                Ok(Vec::new())
            }

            fn detect_contradiction(&self, _a: &str, _b: &str) -> Result<bool> {
                Ok(false)
            }

            fn summarize_memories(&self, memories: &[(String, String)]) -> Result<String> {
                let entry = format!("summarize:{}", memories.len());
                self.calls.lock().unwrap().push(entry);
                Ok(self.summary.clone())
            }
        }

        /// Opens a fresh store in a temp directory; the directory guard is
        /// returned so the caller keeps it alive for the test's duration.
        fn open_db() -> (SqliteStore, tempfile::TempDir) {
            let dir = tempfile::tempdir().expect("tempdir");
            let db_file = dir.path().join("test.db");
            let store = SqliteStore::open(&db_file).expect("SqliteStore::open");
            (store, dir)
        }

        /// Opens a raw connection to the database file behind `store`.
        fn conn_of(store: &SqliteStore) -> rusqlite::Connection {
            crate::db::open(store.path()).expect("db::open at store path")
        }

        /// Like `make_memory`, but with caller-chosen title, tier and priority.
        fn make_memory_full(
            id: &str,
            ns: &str,
            title: &str,
            content: &str,
            tier: Tier,
            priority: i32,
        ) -> Memory {
            Memory {
                tier,
                title: title.to_owned(),
                priority,
                ..make_memory(id, ns, content)
            }
        }

        /// Two memories with identical content, ids `a` and `b`.
        fn duplicate_pair() -> [Memory; 2] {
            let text = "kubernetes rolling canary deploy strategy";
            [
                make_memory_full("a", "ns", "t", text, Tier::Mid, 5),
                make_memory_full("b", "ns", "t", text, Tier::Mid, 5),
            ]
        }

        #[test]
        fn pass_name_is_consolidation() {
            let (store, _dir) = open_db();
            let llm = StubLlm::new("S");
            let pass = ConsolidationPass::new(&store, &llm, None, false);
            assert_eq!(pass.name(), "consolidation");
        }

        /// With no embedder supplied, identical memories still come back as
        /// one two-member cluster.
        #[test]
        fn cluster_via_jaccard_fallback_returns_clusters() {
            let (store, _dir) = open_db();
            let llm = StubLlm::new("S");
            let pass = ConsolidationPass::new(&store, &llm, None, false);
            let clusters = pass.cluster(&duplicate_pair());
            assert_eq!(clusters.len(), 1);
            assert_eq!(clusters[0].len(), 2);
        }

        #[test]
        fn cluster_empty_returns_no_groups() {
            let (store, _dir) = open_db();
            let llm = StubLlm::new("S");
            let pass = ConsolidationPass::new(&store, &llm, None, false);
            assert!(pass.cluster(&[]).is_empty());
        }

        #[test]
        fn eligible_accepts_valid_cluster() {
            let (store, _dir) = open_db();
            let llm = StubLlm::new("S");
            let pass = ConsolidationPass::new(&store, &llm, None, false);
            let members = [
                make_memory_full("a", "ns", "t1", "c1", Tier::Long, 5),
                make_memory_full("b", "ns", "t2", "c2", Tier::Long, 5),
            ];
            assert!(pass.eligible(&members));
        }

        #[test]
        fn eligible_rejects_singleton_via_pass() {
            let (store, _dir) = open_db();
            let llm = StubLlm::new("S");
            let pass = ConsolidationPass::new(&store, &llm, None, false);
            let members = [make_memory_full("a", "ns", "t", "c", Tier::Long, 5)];
            assert!(!pass.eligible(&members));
        }

        #[test]
        fn eligible_rejects_reserved_namespace_via_pass() {
            let (store, _dir) = open_db();
            let llm = StubLlm::new("S");
            let pass = ConsolidationPass::new(&store, &llm, None, false);
            let members = [
                make_memory_full("a", "_curator", "t", "c", Tier::Long, 5),
                make_memory_full("b", "_curator", "t", "c", Tier::Long, 5),
            ];
            assert!(!pass.eligible(&members));
        }

        #[test]
        fn eligible_rejects_mixed_ns_via_pass() {
            let (store, _dir) = open_db();
            let llm = StubLlm::new("S");
            let pass = ConsolidationPass::new(&store, &llm, None, false);
            let members = [
                make_memory_full("a", "ns1", "t", "c", Tier::Long, 5),
                make_memory_full("b", "ns2", "t", "c", Tier::Long, 5),
            ];
            assert!(!pass.eligible(&members));
        }

        /// The summary takes the LLM text, the highest tier and the highest
        /// priority of its members.
        #[test]
        fn summarize_returns_consolidated_memory() {
            let (store, _dir) = open_db();
            let llm = StubLlm::new("synthesised summary");
            let pass = ConsolidationPass::new(&store, &llm, None, false);
            let members = [
                make_memory_full("a", "ns", "First", "c1", Tier::Mid, 3),
                make_memory_full("b", "ns", "Second", "c2", Tier::Long, 7),
            ];
            let summary = pass.summarize(&members).unwrap();
            assert!(summary.title.starts_with("[consolidated]"));
            assert_eq!(summary.namespace, "ns");
            assert_eq!(summary.content, "synthesised summary");
            assert_eq!(summary.tier, Tier::Long);
            assert_eq!(summary.priority, 7);
            assert_eq!(summary.source, "ai-memory curator (compaction)");
        }

        #[test]
        fn summarize_empty_cluster_errors() {
            let (store, _dir) = open_db();
            let llm = StubLlm::new("S");
            let pass = ConsolidationPass::new(&store, &llm, None, false);
            let message = pass.summarize(&[]).unwrap_err().to_string();
            assert!(message.contains("empty cluster"));
        }

        /// In dry-run mode `persist` succeeds even for ids that were never
        /// inserted.
        #[tokio::test]
        async fn persist_dry_run_is_noop() {
            let (store, _dir) = open_db();
            let llm = StubLlm::new("S");
            let pass = ConsolidationPass::new(&store, &llm, None, true);
            let summary = make_memory_full("s", "ns", "t", "c", Tier::Mid, 5);
            let sources = ["x".to_string(), "y".to_string()];
            pass.persist(&summary, &sources).await.unwrap();
        }

        #[tokio::test]
        async fn persist_empty_sources_is_noop() {
            let (store, _dir) = open_db();
            let llm = StubLlm::new("S");
            let pass = ConsolidationPass::new(&store, &llm, None, false);
            let summary = make_memory_full("s", "ns", "t", "c", Tier::Mid, 5);
            pass.persist(&summary, &[]).await.unwrap();
        }

        /// After persisting over two inserted sources, a `[consolidated]`
        /// title is listed in the namespace.
        #[tokio::test]
        async fn persist_writes_consolidated_memory() {
            let (store, _dir) = open_db();
            let conn = conn_of(&store);
            let llm = StubLlm::new("synth");
            let pass = ConsolidationPass::new(&store, &llm, None, false);

            let first = make_memory_full(
                &uuid::Uuid::new_v4().to_string(),
                "ns",
                "t1",
                "Some keyword content alpha",
                Tier::Mid,
                5,
            );
            let second = make_memory_full(
                &uuid::Uuid::new_v4().to_string(),
                "ns",
                "t2",
                "Some keyword content beta",
                Tier::Mid,
                5,
            );
            let first_id = crate::db::insert(&conn, &first).unwrap();
            let second_id = crate::db::insert(&conn, &second).unwrap();

            let summary = make_memory_full(
                "s",
                "ns",
                "[consolidated] title",
                "consolidated body",
                Tier::Long,
                5,
            );
            pass.persist(&summary, &[first_id, second_id])
                .await
                .unwrap();

            let listed =
                crate::db::list(&conn, Some("ns"), None, 16, 0, None, None, None, None, None)
                    .unwrap();
            let found = listed
                .iter()
                .any(|row| row.title.contains("[consolidated]"));
            assert!(found);
        }

        #[tokio::test]
        async fn verify_missing_id_returns_error() {
            let (store, _dir) = open_db();
            let llm = StubLlm::new("S");
            let pass = ConsolidationPass::new(&store, &llm, None, false);
            let outcome = pass.verify("no-such-id".to_string()).await;
            let message = outcome.unwrap_err().to_string();
            assert!(message.contains("not found in DB"));
        }

        #[tokio::test]
        async fn verify_existing_id_ok() {
            let (store, _dir) = open_db();
            let conn = conn_of(&store);
            let llm = StubLlm::new("S");
            let pass = ConsolidationPass::new(&store, &llm, None, false);
            let stored = make_memory_full(
                &uuid::Uuid::new_v4().to_string(),
                "ns",
                "t",
                "c",
                Tier::Mid,
                5,
            );
            let stored_id = crate::db::insert(&conn, &stored).unwrap();
            pass.verify(stored_id).await.unwrap();
        }

        /// Exercises the stub's two otherwise-unused trait methods.
        #[test]
        fn stub_llm_auto_tag_and_contradiction_paths() {
            let stub = StubLlm::new("S");
            let tags = stub.auto_tag("t", "c").unwrap();
            assert!(tags.is_empty());
            let contradicts = stub.detect_contradiction("a", "b").unwrap();
            assert!(!contradicts);
        }

        /// Runs only when a local embedder can be constructed; otherwise the
        /// test returns early without asserting anything.
        #[test]
        fn cluster_via_cosine_primary_when_embedder_available() {
            let Ok(embedder) = crate::embeddings::Embedder::new_local() else {
                return;
            };
            let (store, _dir) = open_db();
            let llm = StubLlm::new("S");
            let pass = ConsolidationPass::new(&store, &llm, Some(embedder), false);
            let clusters = pass.cluster(&duplicate_pair());
            assert_eq!(clusters.len(), 1);
        }

        /// Source ids that were never inserted make `persist` return an error.
        #[tokio::test]
        async fn persist_propagates_db_consolidate_failure() {
            let (store, _dir) = open_db();
            let llm = StubLlm::new("S");
            let pass = ConsolidationPass::new(&store, &llm, None, false);
            let summary = make_memory_full("s", "ns", "[consolidated] x", "c", Tier::Mid, 5);
            let missing = ["nope-id-1".to_string(), "nope-id-2".to_string()];
            let res = pass.persist(&summary, &missing).await;
            assert!(
                res.is_err(),
                "expected consolidate to fail on missing sources"
            );
        }
    }
}