#![cfg_attr(not(feature = "sal"), allow(dead_code, unused_imports))]
use std::collections::HashSet;
use chrono::{DateTime, Utc};
use serde::{Deserialize, Serialize};
#[cfg(feature = "sal")]
use anyhow::Context;
use anyhow::Result;
#[cfg(feature = "sal")]
use crate::autonomy::AutonomyLlm;
#[cfg(feature = "sal")]
use crate::identity::keypair::AgentKeypair;
use crate::models::{Memory, MemoryKind, Tier};
#[cfg(feature = "sal")]
use crate::storage::reflect::{ReflectError, ReflectInput};
#[cfg(feature = "sal")]
use crate::store::{CallerContext, Filter, MemoryStore, StoreError};
#[cfg(feature = "sal")]
use super::pipeline::MemoryId;
#[cfg(any(feature = "sal", test))]
use crate::models::ConfidenceSource;
/// Looks up a memory by id, turning the store's "not found" error into `Ok(None)`.
///
/// # Errors
///
/// Any store error other than `StoreError::NotFound` is wrapped in an
/// `anyhow::Error` and returned.
#[cfg(feature = "sal")]
async fn store_get_opt(
    store: &dyn MemoryStore,
    ctx: &CallerContext,
    id: &str,
) -> Result<Option<Memory>> {
    let fetched = store.get(ctx, id).await;
    // A missing row is an expected outcome for callers, not a failure.
    if let Err(StoreError::NotFound { .. }) = fetched {
        return Ok(None);
    }
    fetched.map(Some).map_err(|err| anyhow::anyhow!(err))
}
/// Builds the `CallerContext` used for the pass's store calls.
///
/// Delegates to `CallerContext::for_admin` with the given agent id.
#[cfg(feature = "sal")]
fn curator_caller_context(agent_id: &str) -> CallerContext {
CallerContext::for_admin(agent_id)
}
/// Lists at most `limit` memories from `namespace`.
///
/// Every other `Filter` field keeps its default value.
///
/// # Errors
///
/// A failing `store.list` call is wrapped in an `anyhow::Error`.
#[cfg(feature = "sal")]
async fn store_list_namespace(
    store: &dyn MemoryStore,
    ctx: &CallerContext,
    namespace: &str,
    limit: usize,
) -> Result<Vec<Memory>> {
    let by_namespace = Filter {
        namespace: Some(namespace.to_owned()),
        limit,
        ..Default::default()
    };
    match store.list(ctx, &by_namespace).await {
        Ok(memories) => Ok(memories),
        Err(err) => Err(anyhow::anyhow!(err)),
    }
}
/// Smallest number of members a cluster needs before it is kept, eligible, or summarized.
pub(crate) const MIN_CLUSTER_SIZE: usize = 3;
/// Largest number of members a single cluster may hold.
pub(crate) const MAX_CLUSTER_SIZE: usize = 12;
/// Maximum distance, in days, between the `created_at` timestamps of two co-occurring memories.
pub(crate) const TEMPORAL_WINDOW_DAYS: i64 = 7;
/// Minimum Jaccard similarity of two memories' content tokens for them to co-occur.
pub(crate) const REFLECTION_JACCARD_THRESHOLD: f64 = 0.30;
/// Minimum `access_count` a memory needs to take part in clustering.
pub(crate) const MIN_RECALL_COUNT: i64 = 1;
/// Configuration for the reflection pass.
///
/// `Default` is derived: the pass is disabled (`enabled == false`) and has no
/// depth cap (`max_depth == None`), exactly what the former hand-written impl
/// produced.
#[derive(Debug, Clone, Default, Serialize, Deserialize, PartialEq, Eq)]
pub struct ReflectionPassConfig {
    /// Whether the pass is enabled; `false` when the key is absent from the input.
    #[serde(default)]
    pub enabled: bool,
    /// Optional depth cap; left out of the serialized form when `None`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub max_depth: Option<u32>,
}
/// State shared by the steps of one reflection pass (cluster, eligibility
/// check, summarize, persist, verify).
#[cfg(feature = "sal")]
pub(crate) struct ReflectionPass<'a> {
/// Store the pass reads memories from and writes reflections to.
pub(crate) store: &'a dyn MemoryStore,
/// Caller context passed to store calls; built by `new` from the agent id.
pub(crate) ctx: CallerContext,
/// LLM used by `summarize` to condense a cluster into text.
pub(crate) llm: &'a dyn AutonomyLlm,
/// Optional keypair; supplies the agent id and is forwarded to `store.reflect`.
pub(crate) keypair: Option<&'a AgentKeypair>,
/// Optional depth cap checked by `persist` before writing.
pub(crate) max_depth: Option<u32>,
/// When `true`, `persist` returns without writing anything.
pub(crate) dry_run: bool,
}
#[cfg(feature = "sal")]
impl<'a> ReflectionPass<'a> {
/// Creates a pass bound to `store` and `llm`.
///
/// The caller context is derived from the keypair's agent id, or from the
/// `AI_CURATOR` sentinel when no keypair is given.
pub(crate) fn new(
    store: &'a dyn MemoryStore,
    llm: &'a dyn AutonomyLlm,
    keypair: Option<&'a AgentKeypair>,
    max_depth: Option<u32>,
    dry_run: bool,
) -> Self {
    let acting_agent = match keypair {
        Some(kp) => kp.agent_id.clone(),
        None => crate::identity::sentinels::AI_CURATOR.to_string(),
    };
    let ctx = curator_caller_context(&acting_agent);
    Self {
        store,
        ctx,
        llm,
        keypair,
        max_depth,
        dry_run,
    }
}
/// Returns the agent id the pass acts as: the keypair's id when a keypair is
/// present, otherwise the `AI_CURATOR` sentinel.
fn agent_id(&self) -> String {
    match self.keypair {
        Some(kp) => kp.agent_id.clone(),
        None => crate::identity::sentinels::AI_CURATOR.to_string(),
    }
}
/// Returns the fixed name of this pass, `"reflection"`.
#[allow(dead_code)]
fn name(&self) -> &str {
"reflection"
}
/// Greedily groups clusterable observations into clusters of memory ids.
///
/// Memories are first bucketed by namespace (non-clusterable ones are
/// dropped). Within a bucket each not-yet-clustered memory acts as a seed and
/// collects later memories that `pair_co_occurs` with it, up to
/// `MAX_CLUSTER_SIZE` members. Only clusters with at least
/// `MIN_CLUSTER_SIZE` members are returned.
///
/// Candidates gathered by a seed whose cluster turns out to be too small are
/// *not* consumed: they stay available as seeds or members for later
/// clusters. Buckets are visited in namespace order so the output is
/// deterministic for a given input.
fn cluster(&self, memories: &[Memory]) -> Vec<Vec<MemoryId>> {
    // BTreeMap rather than HashMap: iteration order decides the order of the result.
    let mut by_ns: std::collections::BTreeMap<&str, Vec<&Memory>> =
        std::collections::BTreeMap::new();
    for m in memories.iter().filter(|m| is_clusterable_observation(m)) {
        by_ns.entry(m.namespace.as_str()).or_default().push(m);
    }

    let mut clusters: Vec<Vec<MemoryId>> = Vec::new();
    for group in by_ns.into_values() {
        let mut used = vec![false; group.len()];
        for (i, seed) in group.iter().enumerate() {
            if used[i] {
                continue;
            }
            used[i] = true;

            // Collect candidate indices first; they are only marked as used
            // once the cluster is known to be large enough.
            let mut members: Vec<usize> = Vec::new();
            for j in (i + 1)..group.len() {
                if 1 + members.len() >= MAX_CLUSTER_SIZE {
                    break;
                }
                if !used[j] && pair_co_occurs(seed, group[j]) {
                    members.push(j);
                }
            }
            if 1 + members.len() < MIN_CLUSTER_SIZE {
                continue;
            }

            let mut cluster: Vec<MemoryId> = Vec::with_capacity(1 + members.len());
            cluster.push(seed.id.clone());
            for j in members {
                used[j] = true;
                cluster.push(group[j].id.clone());
            }
            clusters.push(cluster);
        }
    }
    clusters
}
/// Decides whether a materialized cluster may be summarized.
///
/// A cluster qualifies when its size lies within
/// `MIN_CLUSTER_SIZE..=MAX_CLUSTER_SIZE`, its namespace does not start with
/// `_`, and every member is an `Observation` in that same namespace with an
/// `access_count` of at least `MIN_RECALL_COUNT`.
fn eligible(&self, cluster: &[Memory]) -> bool {
    let size_ok = (MIN_CLUSTER_SIZE..=MAX_CLUSTER_SIZE).contains(&cluster.len());
    if !size_ok {
        return false;
    }
    // Safe to index: the size check above guarantees at least one member.
    let shared_ns = cluster[0].namespace.as_str();
    if shared_ns.starts_with('_') {
        return false;
    }
    for member in cluster {
        let is_observation = member.memory_kind == MemoryKind::Observation;
        let same_namespace = member.namespace.as_str() == shared_ns;
        let recalled = member.access_count >= MIN_RECALL_COUNT;
        if !(is_observation && same_namespace && recalled) {
            return false;
        }
    }
    true
}
fn summarize(&self, cluster: &[Memory]) -> Result<Memory> {
if cluster.len() < MIN_CLUSTER_SIZE {
anyhow::bail!(
"summarize: cluster has {} members (< MIN_CLUSTER_SIZE = {})",
cluster.len(),
MIN_CLUSTER_SIZE
);
}
let input: Vec<(String, String)> = cluster
.iter()
.map(|m| (m.title.clone(), m.content.clone()))
.collect();
let summary_text = self
.llm
.summarize_memories(&input)
.context("ReflectionPass::summarize: LLM call failed")?;
let base_title = cluster
.iter()
.map(|m| m.title.as_str())
.next()
.unwrap_or("(reflection)");
let title = format!("[reflection] {base_title}");
let tier = cluster
.iter()
.map(|m| m.tier.clone())
.max_by_key(tier_rank)
.unwrap_or(Tier::Mid);
let priority = cluster.iter().map(|m| m.priority).max().unwrap_or(5);
let now = Utc::now().to_rfc3339();
Ok(Memory {
id: uuid::Uuid::new_v4().to_string(),
tier,
namespace: cluster[0].namespace.clone(),
title,
content: summary_text,
tags: vec![],
priority,
confidence: 1.0,
source: "system".to_string(),
access_count: 0,
created_at: now.clone(),
updated_at: now,
last_accessed_at: None,
expires_at: None,
metadata: serde_json::json!({}),
reflection_depth: 0,
memory_kind: MemoryKind::Reflection,
entity_id: None,
persona_version: None,
citations: Vec::new(),
source_uri: None,
source_span: None,
confidence_source: ConfidenceSource::CallerProvided,
confidence_signals: None,
confidence_decayed_at: None,
version: 1,
})
}
/// Writes `summary` to the store as a reflection over `sources`.
///
/// Does nothing in dry-run mode or when `sources` is empty. When a
/// `max_depth` cap is configured, the proposed depth (deepest source depth
/// plus one) is checked against it before the store is asked to write;
/// sources that cannot be found are ignored for that check.
///
/// # Errors
///
/// Fails when the proposed depth exceeds the configured cap, when the store
/// refuses the write because of its own depth cap, or on any other store
/// error.
async fn persist(&self, summary: &Memory, sources: &[MemoryId]) -> Result<()> {
    if sources.is_empty() || self.dry_run {
        return Ok(());
    }

    if let Some(cap) = self.max_depth {
        let mut deepest: i32 = 0;
        for source_id in sources {
            if let Some(source) = store_get_opt(self.store, &self.ctx, source_id).await? {
                if source.reflection_depth > deepest {
                    deepest = source.reflection_depth;
                }
            }
        }
        let proposed = deepest.max(0).saturating_add(1);
        let new_depth = u32::try_from(proposed).unwrap_or(u32::MAX);
        if new_depth > cap {
            anyhow::bail!(
                "ReflectionPass::persist: proposed depth {new_depth} exceeds \
                 curator --max-depth {cap}"
            );
        }
    }

    let request = ReflectInput {
        source_ids: sources.to_vec(),
        title: summary.title.clone(),
        content: summary.content.clone(),
        namespace: Some(summary.namespace.clone()),
        tier: summary.tier.clone(),
        tags: summary.tags.clone(),
        priority: summary.priority,
        confidence: summary.confidence,
        source: summary.source.clone(),
        agent_id: self.agent_id(),
        metadata: summary.metadata.clone(),
    };

    let outcome = self.store.reflect(&self.ctx, &request, self.keypair).await;
    match outcome {
        Ok(_) => Ok(()),
        Err(ReflectError::DepthExceeded {
            attempted,
            cap,
            namespace,
        }) => Err(anyhow::anyhow!(
            "ReflectionPass::persist: substrate refused — proposed depth \
             {attempted} exceeds namespace cap {cap} in '{namespace}'"
        )),
        Err(other) => Err(anyhow::anyhow!(other.to_string())),
    }
}
/// Checks that `summary_id` names a stored `Reflection` with at least one
/// outbound `ReflectsOn` link, and that every such link points at a memory
/// that exists.
///
/// # Errors
///
/// Fails when the memory is missing, is not a `Reflection`, has no outbound
/// `ReflectsOn` link, has a link whose target cannot be found, or when a
/// store call fails.
#[allow(dead_code)]
async fn verify(&self, summary_id: MemoryId) -> Result<()> {
    let fetched = store_get_opt(self.store, &self.ctx, &summary_id)
        .await
        .context("ReflectionPass::verify: store.get failed")?;
    let reflection = match fetched {
        Some(found) => found,
        None => anyhow::bail!("verify: reflection {} not found in DB", summary_id),
    };
    if reflection.memory_kind != MemoryKind::Reflection {
        anyhow::bail!(
            "verify: memory {} is {:?}, expected Reflection",
            summary_id,
            reflection.memory_kind
        );
    }

    let links = self
        .store
        .get_links_for_anchor(&summary_id)
        .await
        .map_err(|e| anyhow::anyhow!(e))
        .context("ReflectionPass::verify: store.get_links_for_anchor failed")?;

    let mut reflects_on_edges = 0usize;
    for link in links.iter() {
        // Only edges that start at the reflection and carry the ReflectsOn relation count.
        let is_outbound_reflects_on = link.source_id == summary_id
            && link.relation == crate::models::MemoryLinkRelation::ReflectsOn;
        if !is_outbound_reflects_on {
            continue;
        }
        reflects_on_edges += 1;
        let target_exists = store_get_opt(self.store, &self.ctx, &link.target_id)
            .await?
            .is_some();
        if !target_exists {
            anyhow::bail!(
                "verify: reflects_on edge target {} not found",
                link.target_id
            );
        }
    }

    if reflects_on_edges == 0 {
        anyhow::bail!("verify: reflection {} has no reflects_on edge", summary_id);
    }
    Ok(())
}
}
/// Summary of one `run_reflection_pass` invocation.
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct ReflectionPassReport {
/// RFC 3339 timestamp taken when the pass started.
pub started_at: String,
/// RFC 3339 timestamp taken just before the report is returned.
pub completed_at: String,
/// Number of candidate namespaces (counted before the enabled check is applied).
pub namespaces_visited: usize,
/// Total number of memories returned by the per-namespace listings.
pub observations_scanned: usize,
/// Number of clusters produced by clustering.
pub clusters_formed: usize,
/// Number of clusters that passed the eligibility check.
pub clusters_eligible: usize,
/// Number of reflections successfully persisted.
pub reflections_persisted: usize,
/// Number of persist attempts rejected because a depth limit was exceeded.
pub depth_refusals: usize,
/// Human-readable messages for failures that did not abort the pass.
pub errors: Vec<String>,
/// Proposals recorded instead of writes when running in dry-run mode.
#[serde(default)]
pub dry_run_proposals: Vec<DryRunProposal>,
/// Whether the pass ran in dry-run mode.
pub dry_run: bool,
}
/// A reflection that a dry run would have written.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DryRunProposal {
/// Namespace the reflection would be written to.
pub namespace: String,
/// Title the reflection would carry.
pub proposed_title: String,
/// Ids of the memories the reflection would be built from.
pub source_ids: Vec<String>,
}
/// Runs one reflection pass and returns a report of what it did.
///
/// When `namespace` is `Some`, only that namespace is processed; otherwise
/// the store's namespaces are listed and those starting with `_` are dropped.
/// Namespaces for which `enabled_check` returns `false` are skipped. For each
/// remaining namespace up to `MAX_CLUSTER_SIZE * 16` memories are listed and
/// clustered; every eligible cluster is summarized and then either recorded
/// as a dry-run proposal (`dry_run == true`) or persisted and checked with
/// `verify_recent`.
///
/// # Errors
///
/// Returns `Err` only when listing the namespaces fails. Failures while
/// listing, summarizing, persisting or verifying inside a namespace are
/// appended to `report.errors` and processing continues.
#[cfg(feature = "sal")]
pub async fn run_reflection_pass(
store: &dyn MemoryStore,
llm: &dyn AutonomyLlm,
keypair: Option<&AgentKeypair>,
namespace: Option<&str>,
max_depth: Option<u32>,
dry_run: bool,
enabled_check: impl Fn(&str) -> bool,
) -> Result<ReflectionPassReport> {
let mut report = ReflectionPassReport {
started_at: Utc::now().to_rfc3339(),
dry_run,
..Default::default()
};
let pass = ReflectionPass::new(store, llm, keypair, max_depth, dry_run);
// An explicit namespace is used as given; the `_` prefix filter only applies
// to the namespaces discovered through the store.
let namespaces: Vec<String> = match namespace {
Some(ns) => vec![ns.to_string()],
None => {
let counts = store
.list_namespaces()
.await
.map_err(|e| anyhow::anyhow!(e))
.context("run_reflection_pass: list_namespaces failed")?;
counts
.into_iter()
.map(|nc| nc.namespace)
.filter(|ns| !ns.starts_with('_'))
.collect()
}
};
// Counts every candidate namespace, including ones skipped by `enabled_check` below.
report.namespaces_visited = namespaces.len();
for ns in &namespaces {
if !enabled_check(ns) {
continue;
}
let candidates = match store_list_namespace(
store,
&pass.ctx,
ns.as_str(),
MAX_CLUSTER_SIZE * 16,
)
.await
{
Ok(v) => v,
Err(e) => {
report
.errors
.push(format!("namespace '{ns}': store.list failed: {e}"));
continue;
}
};
// Counts every memory returned by the listing, whatever its kind.
let scanned_here = candidates.len();
report.observations_scanned += scanned_here;
let clusters = pass.cluster(&candidates);
report.clusters_formed += clusters.len();
for cluster_ids in clusters {
// Resolve the cluster's ids back to the full memories from this listing.
let mut cluster: Vec<Memory> = cluster_ids
.iter()
.filter_map(|id| candidates.iter().find(|m| &m.id == id).cloned())
.collect();
if !pass.eligible(&cluster) {
continue;
}
report.clusters_eligible += 1;
// Sort by id so the summary input and the source id list have a stable order.
cluster.sort_by(|a, b| a.id.cmp(&b.id));
let summary = match pass.summarize(&cluster) {
Ok(s) => s,
Err(e) => {
report
.errors
.push(format!("namespace '{ns}': summarize failed: {e}"));
continue;
}
};
let source_ids: Vec<String> = cluster.iter().map(|m| m.id.clone()).collect();
// Dry run: record the proposal only. Note that `summarize` (and therefore
// the LLM call) has already happened at this point.
if dry_run {
report.dry_run_proposals.push(DryRunProposal {
namespace: ns.clone(),
proposed_title: summary.title.clone(),
source_ids: source_ids.clone(),
});
continue;
}
match pass.persist(&summary, &source_ids).await {
Ok(()) => {
report.reflections_persisted += 1;
// A failed verification is reported but does not undo the persisted count.
if let Err(e) = verify_recent(store, &pass.ctx, ns, &source_ids).await {
report
.errors
.push(format!("namespace '{ns}': verify failed: {e}"));
}
}
Err(e) => {
// Depth refusals are recognised by the wording of the error message
// built in `persist`.
// NOTE(review): this couples the counter to message text; any other
// error whose message contains both words would be counted too.
let msg = e.to_string();
if msg.contains("exceeds") && msg.contains("depth") {
report.depth_refusals += 1;
} else {
report
.errors
.push(format!("namespace '{ns}': persist failed: {e}"));
}
}
}
}
}
report.completed_at = Utc::now().to_rfc3339();
Ok(report)
}
/// Confirms that some `Reflection` in `namespace` has outbound `ReflectsOn`
/// links to exactly the memories in `source_ids`.
///
/// NOTE(review): only the first 16 memories returned by the listing are
/// inspected; whether a freshly written reflection is guaranteed to be among
/// them depends on the store's list ordering — confirm.
///
/// # Errors
///
/// Fails when listing or link lookup fails, or when no inspected reflection
/// carries the expected edge set.
#[cfg(feature = "sal")]
async fn verify_recent(
    store: &dyn MemoryStore,
    ctx: &CallerContext,
    namespace: &str,
    source_ids: &[String],
) -> Result<()> {
    let listed = store_list_namespace(store, ctx, namespace, 16)
        .await
        .context("verify_recent: store.list failed")?;
    let expected: HashSet<&str> = source_ids.iter().map(|id| id.as_str()).collect();

    for reflection in &listed {
        if reflection.memory_kind != MemoryKind::Reflection {
            continue;
        }
        let links = store
            .get_links_for_anchor(&reflection.id)
            .await
            .map_err(|e| anyhow::anyhow!(e))?;

        // Targets of the ReflectsOn edges that start at this reflection.
        let mut reflected: HashSet<&str> = HashSet::new();
        for link in &links {
            let starts_here = link.source_id == reflection.id;
            if starts_here && link.relation == crate::models::MemoryLinkRelation::ReflectsOn {
                reflected.insert(link.target_id.as_str());
            }
        }
        if reflected == expected {
            return Ok(());
        }
    }

    Err(anyhow::anyhow!(
        "verify_recent: no Reflection in namespace '{namespace}' carries the \
         expected reflects_on edge set"
    ))
}
/// Maps a tier to an ordinal so tiers can be compared: `Short` < `Mid` < `Long`.
fn tier_rank(t: &Tier) -> u8 {
    match *t {
        Tier::Long => 2,
        Tier::Mid => 1,
        Tier::Short => 0,
    }
}
/// Returns `true` when `m` may take part in clustering: it is an
/// `Observation`, its namespace does not start with `_`, and its
/// `access_count` is at least `MIN_RECALL_COUNT`.
fn is_clusterable_observation(m: &Memory) -> bool {
    if m.memory_kind != MemoryKind::Observation {
        return false;
    }
    if m.namespace.starts_with('_') {
        return false;
    }
    m.access_count >= MIN_RECALL_COUNT
}
fn pair_co_occurs(a: &Memory, b: &Memory) -> bool {
if a.namespace != b.namespace {
return false;
}
if let (Some(ta), Some(tb)) = (parse_rfc3339(&a.created_at), parse_rfc3339(&b.created_at)) {
let delta = (ta - tb).num_days().abs();
if delta > TEMPORAL_WINDOW_DAYS {
return false;
}
}
jaccard_similarity(&a.content, &b.content) >= REFLECTION_JACCARD_THRESHOLD
}
/// Parses an RFC 3339 timestamp and converts it to UTC; `None` when `s` does not parse.
fn parse_rfc3339(s: &str) -> Option<DateTime<Utc>> {
    match DateTime::parse_from_rfc3339(s) {
        Ok(parsed) => Some(parsed.with_timezone(&Utc)),
        Err(_) => None,
    }
}
/// Jaccard similarity of the token sets of `a` and `b`, in `0.0..=1.0`.
///
/// Text is split on non-alphanumeric characters; tokens shorter than three
/// *characters* are dropped and the rest are lower-cased. Returns `0.0` when
/// neither input yields any token.
// The counts are small token-set sizes, so the usize -> f64 conversion is exact in practice.
#[allow(clippy::cast_precision_loss)]
fn jaccard_similarity(a: &str, b: &str) -> f64 {
    fn tokens(text: &str) -> HashSet<String> {
        text.split(|c: char| !c.is_alphanumeric())
            // Count characters, not bytes: `str::len` would let short
            // multi-byte tokens (e.g. two accented letters) slip through.
            .filter(|tok| tok.chars().count() >= 3)
            .map(str::to_lowercase)
            .collect()
    }

    let ta = tokens(a);
    let tb = tokens(b);
    let union = ta.union(&tb).count();
    if union == 0 {
        // Both token sets are empty; define the similarity as zero.
        return 0.0;
    }
    let inter = ta.intersection(&tb).count();
    inter as f64 / union as f64
}
/// Test helper: the temporal window (`TEMPORAL_WINDOW_DAYS`) expressed in seconds.
#[cfg(test)]
pub(crate) fn temporal_window_seconds() -> i64 {
    let window = chrono::Duration::days(TEMPORAL_WINDOW_DAYS);
    window.num_seconds()
}
#[cfg(all(test, feature = "sal"))]
mod tests {
use super::*;
use crate::models::{Memory, MemoryKind, Tier};
use anyhow::Result;
use chrono::Duration;
use std::sync::Mutex;
pub(super) struct StubLlm {
pub(super) summary: String,
pub(super) calls: Mutex<Vec<String>>,
}
impl StubLlm {
pub(super) fn new(summary: &str) -> Self {
Self {
summary: summary.to_string(),
calls: Mutex::new(Vec::new()),
}
}
}
impl AutonomyLlm for StubLlm {
fn auto_tag(&self, _title: &str, _content: &str) -> Result<Vec<String>> {
Ok(vec![])
}
fn detect_contradiction(&self, _a: &str, _b: &str) -> Result<bool> {
Ok(false)
}
fn summarize_memories(&self, memories: &[(String, String)]) -> Result<String> {
self.calls
.lock()
.unwrap()
.push(format!("summarize:{}", memories.len()));
Ok(self.summary.clone())
}
}
fn make_obs(id: &str, ns: &str, title: &str, content: &str, access: i64) -> Memory {
let now = Utc::now().to_rfc3339();
Memory {
id: id.to_string(),
tier: Tier::Long,
namespace: ns.to_string(),
title: title.to_string(),
content: content.to_string(),
tags: vec![],
priority: 5,
confidence: 1.0,
source: "test".to_string(),
access_count: access,
created_at: now.clone(),
updated_at: now,
last_accessed_at: None,
expires_at: None,
metadata: serde_json::json!({}),
reflection_depth: 0,
memory_kind: MemoryKind::Observation,
entity_id: None,
persona_version: None,
citations: Vec::new(),
source_uri: None,
source_span: None,
confidence_source: ConfidenceSource::CallerProvided,
confidence_signals: None,
confidence_decayed_at: None,
version: 1,
}
}
#[test]
fn eligible_rejects_below_min_cluster_size() {
let cluster: Vec<Memory> = (0..(MIN_CLUSTER_SIZE - 1))
.map(|i| make_obs(&format!("m{i}"), "app", "t", "kubernetes deploy", 1))
.collect();
let result = cluster.len() >= MIN_CLUSTER_SIZE
&& cluster.len() <= MAX_CLUSTER_SIZE
&& !cluster[0].namespace.starts_with('_')
&& cluster.iter().all(|m| {
m.memory_kind == MemoryKind::Observation
&& m.namespace == cluster[0].namespace
&& m.access_count >= MIN_RECALL_COUNT
});
assert!(!result, "below-MIN cluster must not be eligible");
}
#[test]
fn eligible_rejects_reflection_kind_member() {
let mut cluster: Vec<Memory> = (0..MIN_CLUSTER_SIZE)
.map(|i| make_obs(&format!("m{i}"), "app", "t", "kubernetes deploy", 1))
.collect();
cluster[0].memory_kind = MemoryKind::Reflection;
let result = cluster
.iter()
.all(|m| m.memory_kind == MemoryKind::Observation);
assert!(!result, "mixed-kind cluster must not be eligible");
}
#[test]
fn eligible_rejects_internal_namespace() {
let cluster: Vec<Memory> = (0..MIN_CLUSTER_SIZE)
.map(|i| make_obs(&format!("m{i}"), "_curator", "t", "kubernetes deploy", 1))
.collect();
let result = !cluster[0].namespace.starts_with('_');
assert!(!result, "internal-namespace cluster must not be eligible");
}
#[test]
fn cluster_groups_three_co_occurring_observations() {
let m1 = make_obs("a", "ns", "t1", "kubernetes rolling deploy strategy", 2);
let m2 = make_obs("b", "ns", "t2", "kubernetes deploy canary strategy", 3);
let m3 = make_obs("c", "ns", "t3", "kubernetes rolling deploy approach", 1);
let obs = [m1.clone(), m2.clone(), m3.clone()];
let pairs = [
pair_co_occurs(&m1, &m2),
pair_co_occurs(&m1, &m3),
pair_co_occurs(&m2, &m3),
];
assert!(
pairs.iter().all(|p| *p),
"all three pairs must co-occur, got {pairs:?}"
);
assert_eq!(obs.len(), MIN_CLUSTER_SIZE);
}
#[test]
fn cluster_skips_observations_with_zero_access_count() {
let cold = make_obs("cold", "ns", "t", "kubernetes deploy", 0);
assert!(!is_clusterable_observation(&cold));
}
#[test]
fn pair_co_occurs_rejects_cross_namespace() {
let a = make_obs("a", "ns1", "t", "shared content tokens", 1);
let b = make_obs("b", "ns2", "t", "shared content tokens", 1);
assert!(!pair_co_occurs(&a, &b));
}
#[test]
fn pair_co_occurs_respects_temporal_window() {
let mut a = make_obs("a", "ns", "t", "shared content tokens here", 1);
let mut b = make_obs("b", "ns", "t", "shared content tokens here", 1);
let now = Utc::now();
a.created_at = now.to_rfc3339();
b.created_at = (now - Duration::days(TEMPORAL_WINDOW_DAYS + 2)).to_rfc3339();
assert!(
!pair_co_occurs(&a, &b),
"outside-window pair must not co-occur"
);
}
#[test]
fn pair_co_occurs_below_jaccard_threshold_is_false() {
let a = make_obs("a", "ns", "t", "kubernetes deploy strategy", 1);
let b = make_obs(
"b",
"ns",
"t",
"completely unrelated quantum mechanics text",
1,
);
assert!(!pair_co_occurs(&a, &b));
}
#[test]
fn jaccard_similarity_is_symmetric() {
let a = "kubernetes rolling deploy canary";
let b = "kubernetes canary rolling deploy strategy";
let sim_ab = jaccard_similarity(a, b);
let sim_ba = jaccard_similarity(b, a);
assert!((sim_ab - sim_ba).abs() < 1e-9);
}
#[test]
fn jaccard_similarity_empty_strings_zero() {
assert_eq!(jaccard_similarity("", ""), 0.0);
}
#[test]
fn temporal_window_is_7_days() {
assert_eq!(temporal_window_seconds(), crate::SECS_PER_WEEK);
}
#[test]
fn config_default_is_disabled() {
let cfg = ReflectionPassConfig::default();
assert!(!cfg.enabled);
assert!(cfg.max_depth.is_none());
}
#[test]
fn config_round_trips_json() {
let cfg = ReflectionPassConfig {
enabled: true,
max_depth: Some(2),
};
let json = serde_json::to_string(&cfg).unwrap();
let back: ReflectionPassConfig = serde_json::from_str(&json).unwrap();
assert_eq!(cfg, back);
}
#[test]
fn stub_llm_records_calls() {
let stub = StubLlm::new("synthesised pattern");
let out = stub
.summarize_memories(&[("t1".into(), "c1".into()), ("t2".into(), "c2".into())])
.unwrap();
assert_eq!(out, "synthesised pattern");
let calls = stub.calls.lock().unwrap();
assert_eq!(calls.len(), 1);
assert!(calls[0].starts_with("summarize:"));
}
#[test]
fn report_serialises_to_json() {
let r = ReflectionPassReport {
started_at: "2026-01-01T00:00:00Z".into(),
completed_at: "2026-01-01T00:00:01Z".into(),
namespaces_visited: 1,
observations_scanned: 30,
clusters_formed: 3,
clusters_eligible: 3,
reflections_persisted: 3,
depth_refusals: 0,
errors: vec![],
dry_run_proposals: vec![],
dry_run: false,
};
let json = serde_json::to_string(&r).unwrap();
assert!(json.contains("reflections_persisted"));
assert!(json.contains("clusters_eligible"));
let back: ReflectionPassReport = serde_json::from_str(&json).unwrap();
assert_eq!(back.observations_scanned, 30);
}
#[cfg(feature = "sal")]
mod sal_pass_tests {
use super::*;
use crate::store::sqlite::SqliteStore;
fn open_db() -> (SqliteStore, tempfile::TempDir) {
let dir = tempfile::tempdir().expect("tempdir");
let path = dir.path().join("test.db");
let store = SqliteStore::open(&path).expect("SqliteStore::open");
(store, dir)
}
fn conn_of(store: &SqliteStore) -> rusqlite::Connection {
crate::db::open(store.path()).expect("db::open at store path")
}
fn insert_observation(
conn: &rusqlite::Connection,
ns: &str,
title: &str,
content: &str,
access_count: i64,
) -> String {
let now = chrono::Utc::now().to_rfc3339();
let mut metadata = crate::models::default_metadata();
if let Some(obj) = metadata.as_object_mut() {
obj.insert(
"agent_id".to_string(),
serde_json::Value::String("test-agent".to_string()),
);
}
let mem = Memory {
id: uuid::Uuid::new_v4().to_string(),
tier: Tier::Long,
namespace: ns.to_string(),
title: title.to_string(),
content: content.to_string(),
tags: vec![],
priority: 5,
confidence: 1.0,
source: "test".to_string(),
access_count,
created_at: now.clone(),
updated_at: now,
last_accessed_at: None,
expires_at: None,
metadata,
reflection_depth: 0,
memory_kind: MemoryKind::Observation,
entity_id: None,
persona_version: None,
citations: Vec::new(),
source_uri: None,
source_span: None,
confidence_source: ConfidenceSource::CallerProvided,
confidence_signals: None,
confidence_decayed_at: None,
version: 1,
};
crate::db::insert(conn, &mem).unwrap()
}
#[test]
fn pass_name_is_reflection() {
let (store, _dir) = open_db();
let llm = StubLlm::new("S");
let pass = ReflectionPass::new(&store, &llm, None, None, false);
assert_eq!(pass.name(), "reflection");
}
#[test]
fn agent_id_falls_back_to_ai_curator_without_keypair() {
let (store, _dir) = open_db();
let llm = StubLlm::new("S");
let pass = ReflectionPass::new(&store, &llm, None, None, false);
assert_eq!(pass.agent_id(), "ai:curator");
}
#[test]
fn agent_id_uses_keypair_when_provided() {
let (store, _dir) = open_db();
let llm = StubLlm::new("S");
use ed25519_dalek::{SigningKey, VerifyingKey};
let mut rng = rand_core::OsRng;
let sk = SigningKey::generate(&mut rng);
let vk: VerifyingKey = (&sk).into();
let kp = AgentKeypair {
agent_id: "test:agent-x".to_string(),
public: vk,
private: Some(sk),
};
let pass = ReflectionPass::new(&store, &llm, Some(&kp), None, false);
assert_eq!(pass.agent_id(), "test:agent-x");
}
#[test]
fn cluster_excludes_zero_access_observations() {
    let (store, _dir) = open_db();
    let llm = StubLlm::new("S");
    let pass = ReflectionPass::new(&store, &llm, None, None, false);
    // One never-recalled observation followed by three recalled ones, all with
    // identical content: only the recalled three may form a cluster.
    let observations: Vec<Memory> = [("a", 0), ("b", 5), ("c", 5), ("d", 5)]
        .iter()
        .map(|(id, access)| make_obs(id, "ns", "t", "shared keyword tokens here", *access))
        .collect();
    let clusters = pass.cluster(&observations);
    assert_eq!(clusters.len(), 1);
    assert_eq!(clusters[0].len(), 3);
}
#[test]
fn cluster_caps_at_max_cluster_size() {
let (store, _dir) = open_db();
let llm = StubLlm::new("S");
let pass = ReflectionPass::new(&store, &llm, None, None, false);
let mems: Vec<Memory> = (0..15)
.map(|i| {
make_obs(
&format!("m{i:02}"),
"ns",
"t",
"shared keyword tokens here pattern",
1,
)
})
.collect();
let clusters = pass.cluster(&mems);
for c in &clusters {
assert!(c.len() <= MAX_CLUSTER_SIZE);
}
}
#[test]
fn eligible_pass_method_accepts_valid() {
let (store, _dir) = open_db();
let llm = StubLlm::new("S");
let pass = ReflectionPass::new(&store, &llm, None, None, false);
let cluster: Vec<Memory> = (0..MIN_CLUSTER_SIZE)
.map(|i| make_obs(&format!("m{i}"), "ns", "t", "c", 1))
.collect();
assert!(pass.eligible(&cluster));
}
#[test]
fn eligible_pass_method_rejects_oversize() {
let (store, _dir) = open_db();
let llm = StubLlm::new("S");
let pass = ReflectionPass::new(&store, &llm, None, None, false);
let cluster: Vec<Memory> = (0..(MAX_CLUSTER_SIZE + 1))
.map(|i| make_obs(&format!("m{i:02}"), "ns", "t", "c", 1))
.collect();
assert!(!pass.eligible(&cluster));
}
#[test]
fn eligible_pass_method_rejects_reflection_member() {
let (store, _dir) = open_db();
let llm = StubLlm::new("S");
let pass = ReflectionPass::new(&store, &llm, None, None, false);
let mut cluster: Vec<Memory> = (0..MIN_CLUSTER_SIZE)
.map(|i| make_obs(&format!("m{i}"), "ns", "t", "c", 1))
.collect();
cluster[0].memory_kind = MemoryKind::Reflection;
assert!(!pass.eligible(&cluster));
}
#[test]
fn eligible_pass_method_rejects_zero_access() {
let (store, _dir) = open_db();
let llm = StubLlm::new("S");
let pass = ReflectionPass::new(&store, &llm, None, None, false);
let mut cluster: Vec<Memory> = (0..MIN_CLUSTER_SIZE)
.map(|i| make_obs(&format!("m{i}"), "ns", "t", "c", 1))
.collect();
cluster[1].access_count = 0;
assert!(!pass.eligible(&cluster));
}
#[test]
fn summarize_below_min_errors() {
let (store, _dir) = open_db();
let llm = StubLlm::new("S");
let pass = ReflectionPass::new(&store, &llm, None, None, false);
let cluster: Vec<Memory> = (0..(MIN_CLUSTER_SIZE - 1))
.map(|i| make_obs(&format!("m{i}"), "ns", "t", "c", 1))
.collect();
let err = pass.summarize(&cluster).unwrap_err().to_string();
assert!(err.contains("< MIN_CLUSTER_SIZE"));
}
#[test]
fn summarize_returns_reflection_typed_memory() {
let (store, _dir) = open_db();
let llm = StubLlm::new("synth pattern");
let pass = ReflectionPass::new(&store, &llm, None, None, false);
let cluster: Vec<Memory> = (0..MIN_CLUSTER_SIZE)
.map(|i| {
let mut m = make_obs(&format!("m{i}"), "ns", "Title-A", "shared content", 2);
m.tier = if i == 0 { Tier::Long } else { Tier::Mid };
m.priority = 5 + i32::try_from(i).unwrap();
m
})
.collect();
let summary = pass.summarize(&cluster).unwrap();
assert_eq!(summary.memory_kind, MemoryKind::Reflection);
assert!(summary.title.starts_with("[reflection]"));
assert_eq!(summary.content, "synth pattern");
assert_eq!(summary.tier, Tier::Long);
assert_eq!(summary.source, "system");
assert_eq!(summary.namespace, "ns");
assert_eq!(
summary.priority,
5 + i32::try_from(MIN_CLUSTER_SIZE - 1).unwrap()
);
}
#[tokio::test]
async fn persist_dry_run_is_noop() {
let (store, _dir) = open_db();
let llm = StubLlm::new("S");
let pass = ReflectionPass::new(&store, &llm, None, None, true);
let summary = make_obs("s", "ns", "[reflection]", "c", 1);
pass.persist(&summary, &["x".to_string()]).await.unwrap();
}
#[tokio::test]
async fn persist_empty_sources_is_noop() {
let (store, _dir) = open_db();
let llm = StubLlm::new("S");
let pass = ReflectionPass::new(&store, &llm, None, None, false);
let summary = make_obs("s", "ns", "[reflection]", "c", 1);
pass.persist(&summary, &[]).await.unwrap();
}
#[tokio::test]
async fn persist_refuses_when_max_depth_exceeded() {
let (store, _dir) = open_db();
let llm = StubLlm::new("S");
let pass = ReflectionPass::new(&store, &llm, None, Some(1), false);
let mut source = make_obs("src", "ns", "t", "c", 1);
source.reflection_depth = 1;
let src_id = crate::db::insert(&conn_of(&store), &source).unwrap();
let summary = make_obs("s", "ns", "[reflection]", "c", 0);
let err = pass
.persist(&summary, &[src_id])
.await
.unwrap_err()
.to_string();
assert!(err.contains("exceeds"));
assert!(err.contains("--max-depth"));
}
#[tokio::test]
async fn persist_writes_reflection_into_db() {
let (store, _dir) = open_db();
let conn = conn_of(&store);
let llm = StubLlm::new("synthesised pattern");
let pass = ReflectionPass::new(&store, &llm, None, None, false);
let s1 = insert_observation(&conn, "app", "T1", "kubernetes deploy strategy notes", 2);
let s2 =
insert_observation(&conn, "app", "T2", "kubernetes rolling deploy approach", 3);
let s3 = insert_observation(&conn, "app", "T3", "kubernetes canary deploy strategy", 1);
let summary = pass
.summarize(&[
crate::db::get(&conn, &s1).unwrap().unwrap(),
crate::db::get(&conn, &s2).unwrap().unwrap(),
crate::db::get(&conn, &s3).unwrap().unwrap(),
])
.unwrap();
pass.persist(&summary, &[s1.clone(), s2.clone(), s3.clone()])
.await
.unwrap();
let listed = crate::db::list(
&conn,
Some("app"),
None,
32,
0,
None,
None,
None,
None,
None,
)
.unwrap();
let refl = listed
.iter()
.find(|m| m.memory_kind == MemoryKind::Reflection)
.expect("expected one reflection");
pass.verify(refl.id.clone()).await.unwrap();
}
#[tokio::test]
async fn verify_missing_id_errors() {
let (store, _dir) = open_db();
let llm = StubLlm::new("S");
let pass = ReflectionPass::new(&store, &llm, None, None, false);
let err = pass
.verify("no-such".to_string())
.await
.unwrap_err()
.to_string();
assert!(err.contains("not found in DB"));
}
#[tokio::test]
async fn verify_wrong_kind_errors() {
let (store, _dir) = open_db();
let conn = conn_of(&store);
let llm = StubLlm::new("S");
let pass = ReflectionPass::new(&store, &llm, None, None, false);
let id = insert_observation(&conn, "ns", "T", "c", 1);
let err = pass.verify(id).await.unwrap_err().to_string();
assert!(err.contains("expected Reflection"));
}
#[tokio::test]
async fn verify_reflection_without_edges_errors() {
let (store, _dir) = open_db();
let conn = conn_of(&store);
let llm = StubLlm::new("S");
let pass = ReflectionPass::new(&store, &llm, None, None, false);
let now = chrono::Utc::now().to_rfc3339();
let mut metadata = crate::models::default_metadata();
if let Some(obj) = metadata.as_object_mut() {
obj.insert(
"agent_id".to_string(),
serde_json::Value::String("test-agent".to_string()),
);
}
let m = Memory {
id: uuid::Uuid::new_v4().to_string(),
tier: Tier::Mid,
namespace: "ns".to_string(),
title: "[reflection] orphan".to_string(),
content: "c".to_string(),
tags: vec![],
priority: 5,
confidence: 1.0,
source: "system".to_string(),
access_count: 0,
created_at: now.clone(),
updated_at: now,
last_accessed_at: None,
expires_at: None,
metadata,
reflection_depth: 1,
memory_kind: MemoryKind::Reflection,
entity_id: None,
persona_version: None,
citations: Vec::new(),
source_uri: None,
source_span: None,
confidence_source: ConfidenceSource::CallerProvided,
confidence_signals: None,
confidence_decayed_at: None,
version: 1,
};
let id = crate::db::insert(&conn, &m).unwrap();
let err = pass.verify(id).await.unwrap_err().to_string();
assert!(err.contains("no reflects_on edge"));
}
#[tokio::test]
async fn run_reflection_pass_empty_db_dry_run_namespace() {
let (store, _dir) = open_db();
let llm = StubLlm::new("S");
let report =
run_reflection_pass(&store, &llm, None, Some("nope"), None, true, |_| true)
.await
.unwrap();
assert!(report.dry_run);
assert_eq!(report.namespaces_visited, 1);
assert_eq!(report.clusters_formed, 0);
assert_eq!(report.reflections_persisted, 0);
}
#[tokio::test]
async fn run_reflection_pass_all_namespaces_with_disabled_check() {
let (store, _dir) = open_db();
let conn = conn_of(&store);
let llm = StubLlm::new("S");
insert_observation(&conn, "ns1", "t", "shared content tokens here", 2);
let report = run_reflection_pass(&store, &llm, None, None, None, true, |_| false)
.await
.unwrap();
assert_eq!(report.observations_scanned, 0);
}
#[tokio::test]
async fn run_reflection_pass_dry_run_reports_proposals() {
let (store, _dir) = open_db();
let conn = conn_of(&store);
let llm = StubLlm::new("synth");
insert_observation(
&conn,
"app",
"T1",
"kubernetes rolling deploy strategy notes",
2,
);
insert_observation(
&conn,
"app",
"T2",
"kubernetes rolling deploy strategy canary",
3,
);
insert_observation(
&conn,
"app",
"T3",
"kubernetes canary deploy strategy rolling",
1,
);
let report = run_reflection_pass(&store, &llm, None, Some("app"), None, true, |_| true)
.await
.unwrap();
assert!(report.dry_run);
assert!(report.observations_scanned >= 3);
assert!(report.clusters_eligible >= 1);
assert!(!report.dry_run_proposals.is_empty());
assert_eq!(report.reflections_persisted, 0);
}
/// A non-dry-run pass over three similar, recalled observations must persist
/// at least one reflection.
#[tokio::test]
async fn run_reflection_pass_persists_reflections() {
    let (store, _dir) = open_db();
    let conn = conn_of(&store);
    let llm = StubLlm::new("persisted pattern");
    insert_observation(&conn, "app", "T1", "shared keyword token strategy notes", 2);
    insert_observation(&conn, "app", "T2", "shared keyword token strategy plan", 3);
    insert_observation(
        &conn,
        "app",
        "T3",
        "shared keyword token strategy canary",
        1,
    );
    let report = run_reflection_pass(&store, &llm, None, Some("app"), None, false, |_| true)
        .await
        .unwrap();
    // Assert on the boolean directly instead of comparing it with a literal.
    assert!(!report.dry_run);
    assert!(report.reflections_persisted >= 1);
}
/// Inserts three `deep` observations whose `reflection_depth` is already 2,
/// then runs the pass with `Some(2)` as the fifth argument (presumably the
/// maximum reflection depth — confirm against `run_reflection_pass`). The
/// report must count at least one depth refusal and persist no reflection.
#[tokio::test]
async fn run_reflection_pass_depth_refusal_increments_counter() {
    let (store, _dir) = open_db();
    let conn = conn_of(&store);
    let llm = StubLlm::new("synth");
    let stamp = chrono::Utc::now().to_rfc3339();

    for idx in 0..3 {
        // Start from the default metadata and tag it with an agent id when
        // the default value is a JSON object.
        let mut metadata = crate::models::default_metadata();
        if let Some(fields) = metadata.as_object_mut() {
            fields.insert(
                String::from("agent_id"),
                serde_json::Value::from("test-agent"),
            );
        }

        let seeded = Memory {
            id: uuid::Uuid::new_v4().to_string(),
            tier: Tier::Long,
            namespace: String::from("deep"),
            title: format!("Tdeep-{idx}"),
            content: String::from("shared keyword token deep strategy"),
            tags: Vec::new(),
            priority: 5,
            confidence: 1.0,
            source: String::from("test"),
            access_count: 2,
            created_at: stamp.clone(),
            updated_at: stamp.clone(),
            last_accessed_at: None,
            expires_at: None,
            metadata,
            // Already at depth 2, the same value passed to the pass below.
            reflection_depth: 2,
            memory_kind: MemoryKind::Observation,
            entity_id: None,
            persona_version: None,
            citations: Vec::new(),
            source_uri: None,
            source_span: None,
            confidence_source: ConfidenceSource::CallerProvided,
            confidence_signals: None,
            confidence_decayed_at: None,
            version: 1,
        };
        crate::db::insert(&conn, &seeded).unwrap();
    }

    let outcome = run_reflection_pass(
        &store,
        &llm,
        None,
        Some("deep"),
        Some(2),
        false,
        |_| true,
    )
    .await
    .unwrap();

    assert!(outcome.depth_refusals >= 1);
    assert_eq!(outcome.reflections_persisted, 0);
}
/// Round-trips a `DryRunProposal` through JSON: the encoded text must mention
/// the `source_ids` key and the decoded value must keep its namespace.
#[test]
fn dry_run_proposal_serialises() {
    let proposal = DryRunProposal {
        namespace: "ns".into(),
        proposed_title: "[reflection] x".into(),
        source_ids: vec!["a".into(), "b".into()],
    };

    let encoded = serde_json::to_string(&proposal).unwrap();
    assert!(encoded.contains("source_ids"));

    let decoded: DryRunProposal = serde_json::from_str(&encoded).unwrap();
    assert_eq!(decoded.namespace, "ns");
}
/// Two observations with identical content but `created_at` strings that are
/// not timestamps must still be reported as co-occurring by `pair_co_occurs`.
#[test]
fn pair_co_occurs_unparseable_timestamps_still_checks_jaccard() {
    // Builds an observation and overwrites its creation stamp with junk.
    let with_bad_stamp = |id: &str, stamp: &str| {
        let mut obs = make_obs(id, "ns", "t", "shared content tokens here", 1);
        obs.created_at = stamp.to_string();
        obs
    };

    let left = with_bad_stamp("a", "not-a-timestamp");
    let right = with_bad_stamp("b", "also-invalid");

    assert!(pair_co_occurs(&left, &right));
}
/// Exercises the two `StubLlm` methods the other tests do not call directly:
/// `auto_tag` must yield no tags and `detect_contradiction` must yield `false`.
#[test]
fn stub_llm_auto_tag_and_contradiction_paths() {
    let llm = StubLlm::new("S");

    assert!(llm.auto_tag("t", "c").unwrap().is_empty());
    assert!(!llm.detect_contradiction("a", "b").unwrap());
}
/// Builds a cluster of exactly `MIN_CLUSTER_SIZE` observations in the
/// `_curator` namespace (presumably treated as internal — confirm against
/// `ReflectionPass::eligible`) and checks that the pass refuses it.
#[test]
fn eligible_pass_rejects_internal_namespace_directly() {
    let (store, _dir) = open_db();
    let llm = StubLlm::new("S");
    let pass = ReflectionPass::new(&store, &llm, None, None, false);

    let mut cluster: Vec<Memory> = Vec::with_capacity(MIN_CLUSTER_SIZE);
    for idx in 0..MIN_CLUSTER_SIZE {
        let id = format!("m{idx}");
        cluster.push(make_obs(&id, "_curator", "t", "c", 1));
    }

    assert!(!pass.eligible(&cluster));
}
/// `jaccard_similarity("a b c", "x")` must be exactly `0.0`.
///
/// NOTE(review): the test name refers to an empty union, which only holds if
/// the tokenizer discards these single-character tokens — confirm against
/// `jaccard_similarity`; otherwise this covers the empty-intersection case.
#[test]
fn jaccard_similarity_zero_union_returns_zero() {
    assert_eq!(jaccard_similarity("a b c", "x"), 0.0);
}
/// Pins the rank that `tier_rank` assigns to each `Tier` variant used here:
/// `Short` is 0, `Mid` is 1 and `Long` is 2.
#[test]
fn tier_rank_all_variants() {
    let expectations = [(Tier::Short, 0), (Tier::Mid, 1), (Tier::Long, 2)];
    for (tier, rank) in expectations {
        assert_eq!(tier_rank(&tier), rank);
    }
}
/// `parse_rfc3339` must return `None` for a non-timestamp string and `Some`
/// for a well-formed RFC 3339 instant.
#[test]
fn parse_rfc3339_invalid_returns_none() {
    let rejected = parse_rfc3339("garbage");
    assert!(rejected.is_none());

    let accepted = parse_rfc3339("2026-01-01T00:00:00Z");
    assert!(accepted.is_some());
}
#[tokio::test]
async fn verify_skips_inbound_links() {
let (store, _dir) = open_db();
let conn = conn_of(&store);
let llm = StubLlm::new("S");
let pass = ReflectionPass::new(&store, &llm, None, None, false);
let s1 =
insert_observation(&conn, "vrf", "T1", "shared keyword pattern tokens here", 2);
let s2 =
insert_observation(&conn, "vrf", "T2", "shared keyword pattern tokens here", 2);
let s3 =
insert_observation(&conn, "vrf", "T3", "shared keyword pattern tokens here", 2);
let summary = pass
.summarize(&[
crate::db::get(&conn, &s1).unwrap().unwrap(),
crate::db::get(&conn, &s2).unwrap().unwrap(),
crate::db::get(&conn, &s3).unwrap().unwrap(),
])
.unwrap();
pass.persist(&summary, &[s1.clone(), s2.clone(), s3.clone()])
.await
.unwrap();
let listed = crate::db::list(
&conn,
Some("vrf"),
None,
32,
0,
None,
None,
None,
None,
None,
)
.unwrap();
let refl_id = listed
.iter()
.find(|m| m.memory_kind == MemoryKind::Reflection)
.unwrap()
.id
.clone();
let _ = crate::db::create_link(&conn, &s1, &refl_id, "related_to");
pass.verify(refl_id).await.unwrap();
}
/// Uses an LLM whose `summarize_memories` always fails and checks that the
/// pass still returns a report: one error entry containing
/// `"summarize failed"` and zero persisted reflections.
#[tokio::test]
async fn run_reflection_pass_summarize_error_recorded() {
    /// Test double: tagging and contradiction checks succeed trivially,
    /// summarisation always errors.
    struct FailingLlm;

    impl AutonomyLlm for FailingLlm {
        fn auto_tag(&self, _title: &str, _content: &str) -> Result<Vec<String>> {
            Ok(Vec::new())
        }

        fn detect_contradiction(&self, _left: &str, _right: &str) -> Result<bool> {
            Ok(false)
        }

        fn summarize_memories(&self, _memories: &[(String, String)]) -> Result<String> {
            Err(anyhow::anyhow!("forced llm failure"))
        }
    }

    let (store, _dir) = open_db();
    let conn = conn_of(&store);
    let failing = FailingLlm;

    // Three observations with identical content, inserted in title order.
    for title in ["T1", "T2", "T3"] {
        insert_observation(&conn, "ns", title, "shared keyword pattern tokens here", 2);
    }

    let outcome = run_reflection_pass(&store, &failing, None, Some("ns"), None, false, |_| true)
        .await
        .unwrap();

    let recorded = outcome
        .errors
        .iter()
        .any(|msg| msg.contains("summarize failed"));
    assert!(recorded);
    assert_eq!(outcome.reflections_persisted, 0);
}
} }