pub(crate) mod candidates;
pub(crate) mod cluster;
pub(crate) mod compaction;
pub(crate) mod persist;
pub(crate) mod pipeline;
pub mod reflection_pass;
use anyhow::Result;
use rusqlite::Connection;
use serde::{Deserialize, Serialize};
use std::path::PathBuf;
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, Ordering};
use std::time::{Duration, Instant};
#[cfg(test)]
use crate::db;
use crate::llm::OllamaClient;
use crate::models::Memory;
#[cfg(test)]
use crate::models::Tier;
use candidates::{
CandidateBatch, adjacent_memory, collect_candidates, needs_curation, record_truncation,
};
use persist::{persist_auto_tags, persist_contradiction};
/// Default number of seconds between curator cycles (one hour, taken from
/// `crate::SECS_PER_HOUR`); used as the default for `CuratorConfig::interval_secs`.
pub const DEFAULT_INTERVAL_SECS: u64 = crate::SECS_PER_HOUR as u64;
/// Default for `CuratorConfig::max_ops_per_cycle`, the per-cycle cap that
/// `run_once` compares `operations_attempted` against.
pub const DEFAULT_MAX_OPS_PER_CYCLE: usize = 100;
/// Minimum content length threshold.
// NOTE(review): not read by non-test code in this file; presumably the
// `candidates` submodule uses it to reject short memories — confirm there.
pub const MIN_CONTENT_LEN: usize = 50;
/// Compaction settings nested under `CuratorConfig::compaction`.
///
/// Every field carries a serde default, so a serialized form that omits any
/// (or all) of them still deserializes.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CompactionConfig {
/// Whether compaction is turned on; defaults to `false`.
#[serde(default)]
pub enabled: bool,
/// Defaults to `cluster::DEFAULT_COSINE_THRESHOLD`.
// NOTE(review): presumably the similarity cut-off used when clustering
// memories for compaction — confirm in the `cluster` submodule.
#[serde(default = "default_cosine_threshold")]
pub cosine_threshold: f32,
/// Settings for the reflection pass; defaults to
/// `ReflectionPassConfig::default()`.
#[serde(default)]
pub reflection_pass: reflection_pass::ReflectionPassConfig,
}
/// Serde default provider for `CompactionConfig::cosine_threshold`; also used
/// by `CompactionConfig::default` so both paths agree on the value.
fn default_cosine_threshold() -> f32 {
crate::curator::cluster::DEFAULT_COSINE_THRESHOLD
}
impl Default for CompactionConfig {
    /// Compaction is off by default; the threshold comes from the same
    /// provider serde uses, and the reflection pass uses its own defaults.
    fn default() -> Self {
        let cosine_threshold = default_cosine_threshold();
        let reflection_pass = reflection_pass::ReflectionPassConfig::default();
        Self {
            enabled: false,
            cosine_threshold,
            reflection_pass,
        }
    }
}
/// Runtime configuration for the curator (`run_once` / `run_daemon`).
///
/// Only `compaction` has a serde default; every other field must be present
/// when deserializing.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CuratorConfig {
/// Seconds between daemon cycles. `run_daemon` clamps this to
/// `60..=crate::SECS_PER_DAY` before use.
pub interval_secs: u64,
/// Upper bound on `CuratorReport::operations_attempted` within one cycle;
/// work beyond it is counted in `operations_skipped_cap` instead.
pub max_ops_per_cycle: usize,
/// When `true`, `run_once` still calls the LLM and updates counters but
/// skips the persist calls (tags, contradictions, self-report).
pub dry_run: bool,
/// Namespace allow-list consulted by `needs_curation`; the tests in this
/// file show a memory outside a non-empty list is not eligible.
pub include_namespaces: Vec<String>,
/// Namespace deny-list consulted by `needs_curation`; the tests in this
/// file show a memory in a listed namespace is not eligible.
pub exclude_namespaces: Vec<String>,
/// Compaction settings; defaults to `CompactionConfig::default()` when
/// omitted from the serialized form.
#[serde(default)]
pub compaction: CompactionConfig,
}
impl Default for CuratorConfig {
    /// Hourly cycles, the default operation cap, real writes (not dry-run),
    /// no namespace filters, and default compaction settings.
    fn default() -> Self {
        let compaction = CompactionConfig::default();
        Self {
            dry_run: false,
            interval_secs: DEFAULT_INTERVAL_SECS,
            max_ops_per_cycle: DEFAULT_MAX_OPS_PER_CYCLE,
            include_namespaces: vec![],
            exclude_namespaces: vec![],
            compaction,
        }
    }
}
/// Summary of one curator cycle, as returned by `run_once`.
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct CuratorReport {
/// RFC 3339 UTC timestamp taken when the report was created.
pub started_at: String,
/// RFC 3339 UTC timestamp taken when the cycle finished (initially equal
/// to `started_at`).
pub completed_at: String,
/// Wall-clock duration of the cycle in milliseconds.
pub cycle_duration_ms: u128,
/// Number of memories returned by `collect_candidates`.
pub memories_scanned: usize,
/// Number of scanned memories for which `needs_curation` returned `true`.
pub memories_eligible: usize,
/// Memories that received a non-empty tag list from the LLM; in dry-run
/// mode this counts tags that would have been written.
pub auto_tagged: usize,
/// Memory pairs the LLM judged contradictory; in dry-run mode this counts
/// contradictions that would have been recorded.
pub contradictions_found: usize,
/// Operations counted against `CuratorConfig::max_ops_per_cycle`.
pub operations_attempted: usize,
/// Items skipped because the operation cap had already been reached.
pub operations_skipped_cap: usize,
/// Report returned by `crate::autonomy::run_autonomy_passes`.
#[serde(default)]
pub autonomy: crate::autonomy::AutonomyPassReport,
/// Personas generated by the persona sweep (counted, but not generated,
/// in dry-run mode).
#[serde(default)]
pub personas_generated: usize,
/// Human-readable messages for every failure recorded during the cycle.
pub errors: Vec<String>,
/// Copy of `CuratorConfig::dry_run` for the cycle that produced this report.
pub dry_run: bool,
}
impl CuratorReport {
    /// Creates an otherwise-default report stamped with the current UTC time
    /// for both `started_at` and `completed_at`, and the given `dry_run` flag.
    fn new(dry_run: bool) -> Self {
        let stamp = chrono::Utc::now().to_rfc3339();
        let mut report = Self::default();
        report.completed_at = stamp.clone();
        report.started_at = stamp;
        report.dry_run = dry_run;
        report
    }
}
/// Runs a single curation cycle against `conn` and returns its report.
///
/// Steps, in order:
/// 1. Collect candidates and keep those for which `needs_curation` is true.
/// 2. If `llm` is `None`, record `"no LLM client configured"` and return
///    early (no tagging, autonomy passes, persona sweep, self-report or
///    metrics).
/// 3. For each eligible memory, up to `cfg.max_ops_per_cycle` operations:
///    ask the LLM for tags (at most 8 are kept) and persist them, then look
///    up an adjacent memory and ask the LLM whether the two contradict.
/// 4. Run the autonomy passes over the eligible memories and the persona
///    sweep.
/// 5. Unless `cfg.dry_run`, persist a self-report; always emit cycle metrics.
///
/// Per-item failures never abort the cycle; they are appended to
/// `CuratorReport::errors`. In dry-run mode the persist calls are skipped but
/// counters are still incremented.
///
/// # Errors
///
/// Returns an error only when `collect_candidates` fails.
pub fn run_once(
    conn: &Connection,
    llm: Option<&OllamaClient>,
    cfg: &CuratorConfig,
    active_keypair: Option<&crate::identity::keypair::AgentKeypair>,
) -> Result<CuratorReport> {
    let mut report = CuratorReport::new(cfg.dry_run);
    let started = Instant::now();

    let CandidateBatch {
        memories: candidates,
        truncated,
    } = collect_candidates(conn, cfg)?;
    report.memories_scanned = candidates.len();
    record_truncation(&mut report, truncated, cfg);

    let eligible: Vec<&Memory> = candidates
        .iter()
        .filter(|m| needs_curation(m, cfg))
        .collect();
    report.memories_eligible = eligible.len();

    let Some(llm_client) = llm else {
        report.errors.push("no LLM client configured".to_string());
        report.completed_at = chrono::Utc::now().to_rfc3339();
        report.cycle_duration_ms = started.elapsed().as_millis();
        return Ok(report);
    };

    // Borrow the list so it can be reused for the autonomy passes below.
    for &mem in &eligible {
        if report.operations_attempted >= cfg.max_ops_per_cycle {
            report.operations_skipped_cap += 1;
            continue;
        }
        report.operations_attempted += 1;

        match llm_client.auto_tag(&mem.title, &mem.content, None) {
            Ok(tags) if !tags.is_empty() => {
                let tag_list: Vec<String> = tags.into_iter().take(8).collect();
                if !cfg.dry_run
                    && let Err(e) = persist_auto_tags(conn, mem, &tag_list)
                {
                    report
                        .errors
                        .push(format!("auto_tag persist failed for {}: {e}", mem.id));
                    // A failed tag write also skips the contradiction check
                    // for this memory.
                    continue;
                }
                report.auto_tagged += 1;
            }
            Ok(_) => {}
            Err(e) => {
                report
                    .errors
                    .push(format!("auto_tag failed for {}: {e}", mem.id));
            }
        }

        match adjacent_memory(conn, mem) {
            Ok(Some(sibling)) => {
                match llm_client.detect_contradiction(&mem.content, &sibling.content) {
                    Ok(true) => {
                        if !cfg.dry_run
                            && let Err(e) = persist_contradiction(conn, mem, &sibling.id)
                        {
                            report.errors.push(format!(
                                "contradiction persist failed for {}: {e}",
                                mem.id
                            ));
                        } else {
                            report.contradictions_found += 1;
                        }
                    }
                    Ok(false) => {}
                    Err(e) => {
                        report.errors.push(format!(
                            "detect_contradiction failed ({} vs {}): {e}",
                            mem.id, sibling.id
                        ));
                    }
                }
            }
            Ok(None) => {}
            // Surface lookup failures like every other per-item error
            // instead of dropping them silently.
            Err(e) => {
                report
                    .errors
                    .push(format!("adjacent_memory failed for {}: {e}", mem.id));
            }
        }
    }

    // The autonomy passes take owned memories; clone the already-filtered set
    // rather than re-running `needs_curation` over every candidate.
    let autonomy_candidates: Vec<Memory> = eligible.iter().map(|&m| m.clone()).collect();
    let pass_report =
        crate::autonomy::run_autonomy_passes(conn, llm_client, &autonomy_candidates, cfg.dry_run);
    report.errors.extend(pass_report.errors.iter().cloned());
    report.autonomy = pass_report;

    persona_sweep(
        conn,
        llm_client,
        &candidates,
        cfg,
        active_keypair,
        &mut report,
    );

    report.completed_at = chrono::Utc::now().to_rfc3339();
    report.cycle_duration_ms = started.elapsed().as_millis();

    if !cfg.dry_run
        && let Err(e) = crate::autonomy::persist_self_report(
            conn,
            report.cycle_duration_ms,
            &report.autonomy,
            report.auto_tagged,
            report.contradictions_found,
            report.personas_generated,
            report.errors.len(),
        )
    {
        tracing::warn!("self-report persist failed: {e}");
    }

    crate::metrics::curator_cycle_completed(
        report.operations_attempted,
        report.auto_tagged,
        report.contradictions_found,
        report.errors.len(),
    );
    Ok(report)
}
/// Generates a first persona for entities mentioned by recent reflection
/// memories.
///
/// Does nothing when `active_keypair` is `None`. Otherwise it:
/// 1. reads the newest `memory_kind = 'reflection'` rows that have a
///    `mentioned_entity_id` and whose namespace does not start with `_`,
///    limited to `max(2 * cfg.max_ops_per_cycle, 64)` rows, and de-duplicates
///    them into `(entity_id, namespace)` pairs;
/// 2. skips every pair for which `get_latest_persona` already finds a persona;
/// 3. for the remaining pairs, counts one operation against
///    `cfg.max_ops_per_cycle` (the counter is shared with the tagging loop
///    through `report`) and generates a persona — or, in dry-run mode, only
///    bumps `personas_generated`.
///
/// All failures are appended to `report.errors`; the function never fails.
/// `_candidates` is currently unused.
fn persona_sweep(
    conn: &Connection,
    llm_client: &OllamaClient,
    _candidates: &[Memory],
    cfg: &CuratorConfig,
    active_keypair: Option<&crate::identity::keypair::AgentKeypair>,
    report: &mut CuratorReport,
) {
    use crate::persona::{PersonaConfig, PersonaGenerator, get_latest_persona};
    use std::collections::BTreeSet;

    let Some(keypair) = active_keypair else {
        return;
    };

    let limit = cfg.max_ops_per_cycle.saturating_mul(2).max(64);
    // A plain `as i64` cast could wrap to a negative value, which SQLite
    // interprets as "no limit"; saturate explicitly instead.
    let sql_limit = i64::try_from(limit).unwrap_or(i64::MAX);

    let scan = || -> Result<BTreeSet<(String, String)>> {
        let mut stmt = conn.prepare(
            "SELECT mentioned_entity_id, namespace
             FROM memories
             WHERE memory_kind = 'reflection'
             AND mentioned_entity_id IS NOT NULL
             AND namespace NOT LIKE '\\_%' ESCAPE '\\'
             ORDER BY created_at DESC
             LIMIT ?1",
        )?;
        let rows = stmt.query_map(rusqlite::params![sql_limit], |r| {
            Ok((r.get::<_, String>(0)?, r.get::<_, String>(1)?))
        })?;
        let mut pairs = BTreeSet::new();
        for row in rows {
            pairs.insert(row?);
        }
        Ok(pairs)
    };
    let entity_pairs = match scan() {
        Ok(pairs) => pairs,
        Err(e) => {
            report.errors.push(format!(
                "persona_sweep: scan for mentioned_entity_id failed: {e}"
            ));
            return;
        }
    };
    if entity_pairs.is_empty() {
        return;
    }

    let config = PersonaConfig::default();
    let generator = PersonaGenerator::new(conn, llm_client, Some(keypair), config);

    for (entity_id, namespace) in entity_pairs {
        if report.operations_attempted >= cfg.max_ops_per_cycle {
            report.operations_skipped_cap += 1;
            continue;
        }

        // Only entities without any persona yet are handled here.
        match get_latest_persona(conn, &entity_id, &namespace) {
            Ok(Some(_)) => continue,
            Ok(None) => {}
            Err(e) => {
                report.errors.push(format!(
                    "persona_sweep: get_latest_persona failed for ({entity_id}, {namespace}): {e}"
                ));
                continue;
            }
        }

        report.operations_attempted += 1;
        if cfg.dry_run {
            report.personas_generated += 1;
            continue;
        }

        match generator.generate(&entity_id, &namespace) {
            Ok(_persona) => {
                report.personas_generated += 1;
            }
            Err(e) => {
                report.errors.push(format!(
                    "persona_sweep: generate failed for ({entity_id}, {namespace}): {e}"
                ));
            }
        }
    }
}
/// Blocking curator loop: runs `run_once` repeatedly until `shutdown` is set.
///
/// Each iteration opens a fresh connection to `db_path`, runs one cycle, logs
/// the outcome, then waits for the configured interval (clamped to
/// `60..=crate::SECS_PER_DAY` seconds). The wait polls `shutdown` every
/// 500 ms so a stop request is honoured without sitting out the full interval.
/// Failures to open the database or to run a cycle are logged and the loop
/// carries on.
#[allow(clippy::needless_pass_by_value)]
#[allow(dead_code)]
pub fn run_daemon(
    db_path: PathBuf,
    llm: Option<Arc<OllamaClient>>,
    cfg: CuratorConfig,
    shutdown: Arc<AtomicBool>,
    active_keypair: Option<Arc<crate::identity::keypair::AgentKeypair>>,
) {
    let interval = cfg.interval_secs.clamp(60, crate::SECS_PER_DAY as u64);
    tracing::info!(
        "curator daemon started (interval={}s, max_ops={}, dry_run={}, auto_persona={})",
        interval,
        cfg.max_ops_per_cycle,
        cfg.dry_run,
        active_keypair.is_some()
    );

    while !shutdown.load(Ordering::Relaxed) {
        match Connection::open(&db_path) {
            Err(e) => tracing::error!("curator could not open db {}: {e}", db_path.display()),
            Ok(conn) => {
                let outcome = run_once(&conn, llm.as_deref(), &cfg, active_keypair.as_deref());
                match outcome {
                    Err(e) => tracing::error!("curator cycle errored: {e}"),
                    Ok(report) => tracing::info!(
                        "curator cycle: scanned={} eligible={} tagged={} contradictions={} personas={} errors={} ({}ms, dry_run={})",
                        report.memories_scanned,
                        report.memories_eligible,
                        report.auto_tagged,
                        report.contradictions_found,
                        report.personas_generated,
                        report.errors.len(),
                        report.cycle_duration_ms,
                        report.dry_run
                    ),
                }
            }
        }

        // Sleep in short slices so shutdown is noticed promptly.
        let wake_at = Instant::now() + Duration::from_secs(interval);
        while Instant::now() < wake_at && !shutdown.load(Ordering::Relaxed) {
            std::thread::sleep(Duration::from_millis(500));
        }
    }

    tracing::info!("curator daemon shutdown");
}
#[cfg(test)]
mod tests {
use super::candidates::{
adjacent_memory, collect_candidates, needs_curation, record_truncation,
};
use super::persist::{persist_auto_tags, persist_contradiction};
use super::*;
// The default configuration uses the module constants, writes for real, and
// applies no namespace filters.
#[test]
fn default_config_has_sane_values() {
    let defaults = CuratorConfig::default();

    assert!(!defaults.dry_run);
    assert_eq!(defaults.max_ops_per_cycle, DEFAULT_MAX_OPS_PER_CYCLE);
    assert_eq!(defaults.interval_secs, DEFAULT_INTERVAL_SECS);
    assert!(defaults.exclude_namespaces.is_empty());
    assert!(defaults.include_namespaces.is_empty());
}
// A memory in an underscore-prefixed namespace is not eligible even though
// its content is long enough and it has no auto tags.
#[test]
fn needs_curation_skips_internal_namespaces() {
    let stamp = "2026-01-01T00:00:00Z";
    let internal = Memory {
        // identity and payload
        id: "m1".to_string(),
        namespace: "_messages/alice".to_string(),
        title: "t".to_string(),
        content: "a".repeat(100),
        tags: vec![],
        // classification
        tier: Tier::Mid,
        memory_kind: crate::models::MemoryKind::Observation,
        priority: 5,
        source: "test".to_string(),
        // confidence
        confidence: 1.0,
        confidence_source: crate::models::ConfidenceSource::CallerProvided,
        confidence_signals: None,
        confidence_decayed_at: None,
        // timestamps
        created_at: stamp.to_string(),
        updated_at: stamp.to_string(),
        last_accessed_at: None,
        expires_at: None,
        // metadata and references
        metadata: serde_json::json!({}),
        citations: Vec::new(),
        source_uri: None,
        source_span: None,
        entity_id: None,
        persona_version: None,
        reflection_depth: 0,
        // counters
        access_count: 0,
        version: 1,
    };

    let eligible = needs_curation(&internal, &CuratorConfig::default());
    assert!(!eligible);
}
// A memory whose content is only a few characters long is not eligible.
#[test]
fn needs_curation_skips_short_content() {
    let stamp = "2026-01-01T00:00:00Z";
    let tiny = Memory {
        // identity and payload
        id: "m1".to_string(),
        namespace: "app".to_string(),
        title: "t".to_string(),
        content: "short".to_string(),
        tags: vec![],
        // classification
        tier: Tier::Mid,
        memory_kind: crate::models::MemoryKind::Observation,
        priority: 5,
        source: "test".to_string(),
        // confidence
        confidence: 1.0,
        confidence_source: crate::models::ConfidenceSource::CallerProvided,
        confidence_signals: None,
        confidence_decayed_at: None,
        // timestamps
        created_at: stamp.to_string(),
        updated_at: stamp.to_string(),
        last_accessed_at: None,
        expires_at: None,
        // metadata and references
        metadata: serde_json::json!({}),
        citations: Vec::new(),
        source_uri: None,
        source_span: None,
        entity_id: None,
        persona_version: None,
        reflection_depth: 0,
        // counters
        access_count: 0,
        version: 1,
    };

    let eligible = needs_curation(&tiny, &CuratorConfig::default());
    assert!(!eligible);
}
// A memory whose metadata already carries `auto_tags` is not eligible again.
#[test]
fn needs_curation_skips_already_tagged() {
    let stamp = "2026-01-01T00:00:00Z";
    let tagged = Memory {
        // identity and payload
        id: "m1".to_string(),
        namespace: "app".to_string(),
        title: "t".to_string(),
        content: "a".repeat(100),
        tags: vec![],
        // classification
        tier: Tier::Long,
        memory_kind: crate::models::MemoryKind::Observation,
        priority: 5,
        source: "test".to_string(),
        // confidence
        confidence: 1.0,
        confidence_source: crate::models::ConfidenceSource::CallerProvided,
        confidence_signals: None,
        confidence_decayed_at: None,
        // timestamps
        created_at: stamp.to_string(),
        updated_at: stamp.to_string(),
        last_accessed_at: None,
        expires_at: None,
        // metadata and references
        metadata: serde_json::json!({"auto_tags":["x","y"]}),
        citations: Vec::new(),
        source_uri: None,
        source_span: None,
        entity_id: None,
        persona_version: None,
        reflection_depth: 0,
        // counters
        access_count: 0,
        version: 1,
    };

    let eligible = needs_curation(&tagged, &CuratorConfig::default());
    assert!(!eligible);
}
// With a non-empty include list, only memories in a listed namespace are
// eligible.
#[test]
fn needs_curation_respects_include_list() {
    let stamp = "2026-01-01T00:00:00Z";
    let mem = Memory {
        // identity and payload
        id: "m1".to_string(),
        namespace: "app".to_string(),
        title: "t".to_string(),
        content: "a".repeat(100),
        tags: vec![],
        // classification
        tier: Tier::Long,
        memory_kind: crate::models::MemoryKind::Observation,
        priority: 5,
        source: "test".to_string(),
        // confidence
        confidence: 1.0,
        confidence_source: crate::models::ConfidenceSource::CallerProvided,
        confidence_signals: None,
        confidence_decayed_at: None,
        // timestamps
        created_at: stamp.to_string(),
        updated_at: stamp.to_string(),
        last_accessed_at: None,
        expires_at: None,
        // metadata and references
        metadata: serde_json::json!({}),
        citations: Vec::new(),
        source_uri: None,
        source_span: None,
        entity_id: None,
        persona_version: None,
        reflection_depth: 0,
        // counters
        access_count: 0,
        version: 1,
    };

    let mut cfg = CuratorConfig::default();

    // Namespace "app" is not on the list: rejected.
    cfg.include_namespaces = vec!["other".to_string()];
    assert!(!needs_curation(&mem, &cfg));

    // Namespace "app" is on the list: accepted.
    cfg.include_namespaces = vec!["app".to_string()];
    assert!(needs_curation(&mem, &cfg));
}
// A memory in an excluded namespace is not eligible.
#[test]
fn needs_curation_respects_exclude_list() {
    let stamp = "2026-01-01T00:00:00Z";
    let noisy = Memory {
        // identity and payload
        id: "m1".to_string(),
        namespace: "noisy".to_string(),
        title: "t".to_string(),
        content: "a".repeat(100),
        tags: vec![],
        // classification
        tier: Tier::Long,
        memory_kind: crate::models::MemoryKind::Observation,
        priority: 5,
        source: "test".to_string(),
        // confidence
        confidence: 1.0,
        confidence_source: crate::models::ConfidenceSource::CallerProvided,
        confidence_signals: None,
        confidence_decayed_at: None,
        // timestamps
        created_at: stamp.to_string(),
        updated_at: stamp.to_string(),
        last_accessed_at: None,
        expires_at: None,
        // metadata and references
        metadata: serde_json::json!({}),
        citations: Vec::new(),
        source_uri: None,
        source_span: None,
        entity_id: None,
        persona_version: None,
        reflection_depth: 0,
        // counters
        access_count: 0,
        version: 1,
    };

    let mut cfg = CuratorConfig::default();
    cfg.exclude_namespaces = vec!["noisy".to_string()];

    assert!(!needs_curation(&noisy, &cfg));
}
// Without an LLM client the cycle still returns `Ok`, does no work, and
// records the missing client in `errors`.
#[test]
fn run_once_without_llm_emits_error_but_succeeds() {
    let db_file = tempfile::NamedTempFile::new().unwrap();
    let conn = db::open(db_file.path()).unwrap();

    let outcome = run_once(&conn, None, &CuratorConfig::default(), None).unwrap();

    assert_eq!(outcome.memories_scanned, 0);
    assert_eq!(outcome.memories_eligible, 0);
    assert_eq!(outcome.operations_attempted, 0);
    let mentions_llm = outcome.errors.iter().any(|msg| msg.contains("no LLM"));
    assert!(mentions_llm);
}
// A fresh report serialises to JSON that contains the expected field names.
#[test]
fn report_serialises_to_json() {
    let encoded = serde_json::to_string(&CuratorReport::new(true)).unwrap();
    for field in ["dry_run", "memories_scanned"] {
        assert!(encoded.contains(field));
    }
}
/// Builds a `Tier::Long` observation memory with a random UUID, the current
/// time for both timestamps, and the given namespace, title and content.
fn make_test_memory(ns: &str, title: &str, content: &str) -> Memory {
    let now = chrono::Utc::now().to_rfc3339();
    Memory {
        // identity and payload
        id: uuid::Uuid::new_v4().to_string(),
        namespace: ns.to_owned(),
        title: title.to_owned(),
        content: content.to_owned(),
        tags: Vec::new(),
        // classification
        tier: Tier::Long,
        memory_kind: crate::models::MemoryKind::Observation,
        priority: 5,
        source: "api".to_owned(),
        // confidence
        confidence: 1.0,
        confidence_source: crate::models::ConfidenceSource::CallerProvided,
        confidence_signals: None,
        confidence_decayed_at: None,
        // timestamps
        created_at: now.clone(),
        updated_at: now,
        last_accessed_at: None,
        expires_at: None,
        // metadata and references
        metadata: serde_json::json!({}),
        citations: Vec::new(),
        source_uri: None,
        source_span: None,
        entity_id: None,
        persona_version: None,
        reflection_depth: 0,
        // counters
        access_count: 0,
        version: 1,
    }
}
// Persisting tags stores them under `auto_tags` in order and stamps a
// non-empty `curated_at` marker.
#[test]
fn persist_auto_tags_writes_metadata() {
    let db_file = tempfile::NamedTempFile::new().unwrap();
    let conn = db::open(db_file.path()).unwrap();
    let anchor = make_test_memory("curate-test", "anchor", &"a".repeat(120));
    db::insert(&conn, &anchor).unwrap();

    persist_auto_tags(&conn, &anchor, &["alpha".to_string(), "beta".to_string()]).unwrap();

    let stored = db::get(&conn, &anchor.id).unwrap().unwrap();
    let auto_tags = stored
        .metadata
        .get("auto_tags")
        .unwrap()
        .as_array()
        .unwrap();
    assert_eq!(auto_tags.len(), 2);
    assert_eq!(auto_tags[0].as_str().unwrap(), "alpha");

    let curated_at = stored.metadata.get("curated_at").and_then(|v| v.as_str());
    assert!(curated_at.is_some_and(|s| !s.is_empty()));
}
// Persisting an empty tag list still writes an (empty) `auto_tags` array.
#[test]
fn persist_auto_tags_with_empty_tag_list_still_writes_marker() {
    let db_file = tempfile::NamedTempFile::new().unwrap();
    let conn = db::open(db_file.path()).unwrap();
    let anchor = make_test_memory("curate-test", "anchor", &"a".repeat(120));
    db::insert(&conn, &anchor).unwrap();

    persist_auto_tags(&conn, &anchor, &[]).unwrap();

    let stored = db::get(&conn, &anchor.id).unwrap().unwrap();
    let auto_tags = stored
        .metadata
        .get("auto_tags")
        .unwrap()
        .as_array()
        .unwrap();
    assert!(auto_tags.is_empty());
}
// Recording the same contradicting id twice keeps a single entry: after
// "id-1", "id-2", "id-1" the stored list holds exactly the two distinct ids.
#[test]
fn persist_contradiction_appends_unique_ids() {
    let db_file = tempfile::NamedTempFile::new().unwrap();
    let conn = db::open(db_file.path()).unwrap();
    let anchor = make_test_memory("curate-test", "anchor", &"a".repeat(120));
    db::insert(&conn, &anchor).unwrap();

    // Re-read the row after every write so each call sees the latest metadata.
    let anchor_id = anchor.id.clone();
    let mut current = anchor;
    for other_id in ["id-1", "id-2", "id-1"] {
        persist_contradiction(&conn, &current, other_id).unwrap();
        current = db::get(&conn, &anchor_id).unwrap().unwrap();
    }

    let recorded = current
        .metadata
        .get("confirmed_contradictions")
        .unwrap()
        .as_array()
        .unwrap();
    assert_eq!(recorded.len(), 2);

    let as_strs: Vec<&str> = recorded.iter().filter_map(|v| v.as_str()).collect();
    assert!(as_strs.contains(&"id-1"));
    assert!(as_strs.contains(&"id-2"));
}
// A memory that is alone in its namespace has no adjacent memory.
#[test]
fn adjacent_memory_returns_none_when_only_self_exists() {
    let db_file = tempfile::NamedTempFile::new().unwrap();
    let conn = db::open(db_file.path()).unwrap();
    let lonely = make_test_memory("solo-ns", "only", &"a".repeat(120));
    db::insert(&conn, &lonely).unwrap();

    let sibling = adjacent_memory(&conn, &lonely).unwrap();
    assert!(sibling.is_none());
}
// With two long-enough memories in one namespace, the lookup returns the
// other one.
#[test]
fn adjacent_memory_returns_some_when_sibling_present() {
    let db_file = tempfile::NamedTempFile::new().unwrap();
    let conn = db::open(db_file.path()).unwrap();
    let first = make_test_memory("dual-ns", "first", &"a".repeat(120));
    let second = make_test_memory("dual-ns", "second", &"b".repeat(120));
    for mem in [&first, &second] {
        db::insert(&conn, mem).unwrap();
    }

    let sibling = adjacent_memory(&conn, &first).unwrap().unwrap();
    assert_ne!(sibling.id, first.id);
    assert!(sibling.content.len() >= MIN_CONTENT_LEN);
}
// A sibling whose content is too short is not returned as adjacent.
#[test]
fn adjacent_memory_skips_short_sibling() {
    let db_file = tempfile::NamedTempFile::new().unwrap();
    let conn = db::open(db_file.path()).unwrap();
    let anchor = make_test_memory("ns-short", "anchor", &"a".repeat(120));
    let tiny_sibling = make_test_memory("ns-short", "tiny-sibling", "short");
    db::insert(&conn, &anchor).unwrap();
    db::insert(&conn, &tiny_sibling).unwrap();

    let sibling = adjacent_memory(&conn, &anchor).unwrap();
    assert!(sibling.is_none());
}
// A truncated candidate scan adds exactly one explanatory error entry.
#[test]
fn record_truncation_appends_when_truncated() {
    let cfg = CuratorConfig::default();
    let mut report = CuratorReport::new(false);

    record_truncation(&mut report, true, &cfg);

    assert_eq!(report.errors.len(), 1);
    let message = &report.errors[0];
    assert!(message.contains("collect_candidates truncated"));
}
// A complete (non-truncated) scan leaves the error list untouched.
#[test]
fn record_truncation_noop_when_not_truncated() {
    let cfg = CuratorConfig::default();
    let mut report = CuratorReport::new(false);

    record_truncation(&mut report, false, &cfg);

    assert!(report.errors.is_empty());
}
// Three inserted memories yield a non-empty, non-truncated candidate batch.
#[test]
fn collect_candidates_returns_eligible_memories() {
    let db_file = tempfile::NamedTempFile::new().unwrap();
    let conn = db::open(db_file.path()).unwrap();
    (0..3).for_each(|i| {
        let row = make_test_memory("cand-ns", &format!("row-{i}"), &"a".repeat(120));
        db::insert(&conn, &row).unwrap();
    });

    let batch = collect_candidates(&conn, &CuratorConfig::default()).unwrap();

    assert!(!batch.truncated);
    assert!(!batch.memories.is_empty());
}
// A dry-run cycle (here without an LLM client) reports `dry_run` and leaves
// the stored memory without `auto_tags`.
#[test]
fn run_once_with_dry_run_does_not_persist() {
    let db_file = tempfile::NamedTempFile::new().unwrap();
    let conn = db::open(db_file.path()).unwrap();
    let anchor = make_test_memory("dry-ns", "anchor", &"a".repeat(120));
    db::insert(&conn, &anchor).unwrap();

    let mut cfg = CuratorConfig::default();
    cfg.dry_run = true;
    let outcome = run_once(&conn, None, &cfg, None).unwrap();
    assert!(outcome.dry_run);

    let stored = db::get(&conn, &anchor.id).unwrap().unwrap();
    assert!(stored.metadata.get("auto_tags").is_none());
}
// Starts the daemon on a background thread, lets it run for 2.5 s, requests
// shutdown and checks that the thread exits cleanly.
//
// `run_daemon` clamps `interval_secs` to at least 60, so the `1` configured
// here does not produce a second cycle inside the sleep window; the test
// verifies clean entry and exit rather than a cycle count.
#[test]
fn run_daemon_executes_multiple_cycles_and_respects_shutdown() {
    use std::sync::Mutex;
    use std::thread;
    use std::time::Duration;

    let db_file = tempfile::NamedTempFile::new().unwrap();
    let db_path = db_file.path().to_path_buf();

    // Seed the database, then close the connection before the daemon opens
    // its own.
    {
        let conn = db::open(&db_path).unwrap();
        let now = chrono::Utc::now().to_rfc3339();
        for i in 0..5 {
            let seeded = Memory {
                // identity and payload
                id: format!("test-mem-{i}"),
                namespace: "test".to_string(),
                title: format!("Memory {i}"),
                content: "x".repeat(100),
                tags: vec![],
                // classification
                tier: crate::models::Tier::Mid,
                memory_kind: crate::models::MemoryKind::Observation,
                priority: 5,
                source: "test".to_string(),
                // confidence
                confidence: 1.0,
                confidence_source: crate::models::ConfidenceSource::CallerProvided,
                confidence_signals: None,
                confidence_decayed_at: None,
                // timestamps
                created_at: now.clone(),
                updated_at: now.clone(),
                last_accessed_at: None,
                expires_at: None,
                // metadata and references
                metadata: serde_json::json!({}),
                citations: Vec::new(),
                source_uri: None,
                source_span: None,
                entity_id: None,
                persona_version: None,
                reflection_depth: 0,
                // counters
                access_count: 0,
                version: 1,
            };
            db::insert(&conn, &seeded).unwrap();
        }
    }

    // Progress marker: 1 = daemon thread entered, 2 = daemon returned.
    let progress = std::sync::Arc::new(Mutex::new(0));
    let progress_in_thread = progress.clone();

    let cfg = CuratorConfig {
        interval_secs: 1,
        max_ops_per_cycle: 50,
        dry_run: true,
        include_namespaces: vec![],
        exclude_namespaces: vec![],
        ..CuratorConfig::default()
    };

    let stop = std::sync::Arc::new(std::sync::atomic::AtomicBool::new(false));
    let stop_in_thread = stop.clone();
    let worker = thread::spawn(move || {
        *progress_in_thread.lock().unwrap() = 1;
        run_daemon(db_path, None, cfg, stop_in_thread, None);
        *progress_in_thread.lock().unwrap() = 2;
    });

    thread::sleep(Duration::from_millis(2500));
    stop.store(true, std::sync::atomic::Ordering::Relaxed);

    assert!(
        worker.join().is_ok(),
        "daemon thread panicked or failed to join"
    );
    let reached = *progress.lock().unwrap();
    assert_eq!(reached, 2, "daemon should have entered and exited cleanly");
}
use std::io::{BufRead, BufReader, Read, Write};
use std::net::TcpListener;
use std::sync::Arc as StdArc;
use std::sync::atomic::{AtomicBool as StdAtomicBool, AtomicUsize, Ordering as StdOrdering};
use std::thread::JoinHandle;
/// Canned behaviour for the in-process fake Ollama server used by the tests.
#[derive(Clone)]
struct FakeOllamaCfg {
/// Reply for `/api/generate`, and for `/api/chat` requests whose body
/// contains "tags".
tag_response: String,
/// Reply for `/api/chat` requests whose body contains "contradict".
contradiction_answer: String,
/// Reply for `/api/chat` requests whose body contains "Summarize" or
/// "summari".
summary_response: String,
/// When `true`, `/api/chat` and `/api/generate` answer with HTTP 500.
chat_returns_error: bool,
}
impl Default for FakeOllamaCfg {
    /// A well-behaved server: three tags, "no" to contradiction questions, a
    /// fixed summary, and no forced errors.
    fn default() -> Self {
        Self {
            chat_returns_error: false,
            tag_response: String::from("alpha\nbeta\ngamma"),
            contradiction_answer: String::from("no"),
            summary_response: String::from("consolidated summary"),
        }
    }
}
/// Handle to a fake Ollama HTTP server running on a background thread; the
/// server is stopped and joined when the handle is dropped.
struct FakeOllama {
/// Base URL (`http://127.0.0.1:<port>`) clients should connect to.
url: String,
/// Set to `true` to make the accept loop exit.
shutdown: StdArc<StdAtomicBool>,
/// Join handle of the accept-loop thread; taken on drop.
handle: Option<JoinHandle<()>>,
/// Number of `/api/chat` and `/api/generate` requests served so far.
chat_calls: StdArc<AtomicUsize>,
}
impl FakeOllama {
    /// Binds an ephemeral localhost port and serves requests on a background
    /// thread until the returned handle is dropped.
    ///
    /// The listener is non-blocking so the accept loop can poll the shutdown
    /// flag every 20 ms; each accepted connection is switched back to blocking
    /// mode with a 2 s read timeout and handled on its own thread.
    fn start(cfg: FakeOllamaCfg) -> Self {
        let listener = TcpListener::bind("127.0.0.1:0").expect("bind 127.0.0.1");
        let port = listener.local_addr().unwrap().port();
        listener.set_nonblocking(true).unwrap();

        let shutdown = StdArc::new(StdAtomicBool::new(false));
        let chat_calls = StdArc::new(AtomicUsize::new(0));

        let stop_flag = shutdown.clone();
        let call_counter = chat_calls.clone();
        let accept_loop = move || {
            while !stop_flag.load(StdOrdering::Relaxed) {
                match listener.accept() {
                    Ok((mut stream, _peer)) => {
                        stream.set_nonblocking(false).ok();
                        stream
                            .set_read_timeout(Some(std::time::Duration::from_secs(2)))
                            .ok();
                        let conn_cfg = cfg.clone();
                        let conn_counter = call_counter.clone();
                        std::thread::spawn(move || {
                            handle_one(&mut stream, &conn_cfg, &conn_counter);
                        });
                    }
                    // Nothing pending yet: back off briefly, then re-check
                    // the shutdown flag.
                    Err(ref e) if e.kind() == std::io::ErrorKind::WouldBlock => {
                        std::thread::sleep(std::time::Duration::from_millis(20));
                    }
                    Err(_) => break,
                }
            }
        };
        let handle = std::thread::spawn(accept_loop);

        Self {
            url: format!("http://127.0.0.1:{}", port),
            shutdown,
            handle: Some(handle),
            chat_calls,
        }
    }
}
impl Drop for FakeOllama {
    /// Signals the accept loop to stop and waits for its thread to finish;
    /// a panicked server thread is ignored.
    fn drop(&mut self) {
        self.shutdown.store(true, StdOrdering::Relaxed);
        let Some(server_thread) = self.handle.take() else {
            return;
        };
        let _ = server_thread.join();
    }
}
/// Serves exactly one HTTP request on `stream` and closes the write side.
///
/// Routes:
/// - `GET /api/tags` returns a fixed model list.
/// - `POST /api/chat` counts the call, then returns HTTP 500 if
///   `cfg.chat_returns_error`; otherwise it picks a canned reply by looking
///   for "contradict", then "Summarize"/"summari", then "tags" in the request
///   body (falling back to "ok").
/// - `POST /api/generate` counts the call, then returns HTTP 500 or
///   `cfg.tag_response`.
/// - Anything else returns 404 with `{}`.
///
/// Malformed or unreadable requests are dropped without a response.
fn handle_one(stream: &mut std::net::TcpStream, cfg: &FakeOllamaCfg, chat_calls: &AtomicUsize) {
    let mut reader = BufReader::new(stream.try_clone().expect("clone tcp"));

    // Request line: "<METHOD> <PATH> ...".
    let mut request_line = String::new();
    if reader.read_line(&mut request_line).is_err() {
        return;
    }
    let mut tokens = request_line.split_whitespace();
    let (Some(method), Some(path)) = (tokens.next(), tokens.next()) else {
        return;
    };

    // Headers: only Content-Length matters; stop at the blank line or EOF.
    let mut content_length: usize = 0;
    loop {
        let mut header = String::new();
        if reader.read_line(&mut header).is_err() {
            return;
        }
        if header == "\r\n" || header.is_empty() {
            break;
        }
        let lowered = header.to_ascii_lowercase();
        if let Some(value) = lowered.strip_prefix("content-length:") {
            content_length = value.trim().parse().unwrap_or(0);
        }
    }

    // Body: best effort; a short read leaves the remainder zero-filled.
    let mut raw_body = vec![0u8; content_length];
    if content_length > 0 {
        let _ = reader.read_exact(&mut raw_body);
    }
    let request_body = String::from_utf8_lossy(&raw_body).to_string();

    let (status, payload): (&str, String) = match (method, path) {
        ("GET", "/api/tags") => (
            "200 OK",
            serde_json::json!({"models": [{"name": "fake-model:latest"}]}).to_string(),
        ),
        ("POST", "/api/chat") => {
            chat_calls.fetch_add(1, StdOrdering::Relaxed);
            if cfg.chat_returns_error {
                (
                    "500 Internal Server Error",
                    "{\"error\":\"forced fault\"}".to_string(),
                )
            } else {
                let reply = if request_body.contains("contradict") {
                    cfg.contradiction_answer.clone()
                } else if request_body.contains("Summarize") || request_body.contains("summari") {
                    cfg.summary_response.clone()
                } else if request_body.contains("tags") {
                    cfg.tag_response.clone()
                } else {
                    "ok".to_string()
                };
                (
                    "200 OK",
                    serde_json::json!({"message": {"content": reply}}).to_string(),
                )
            }
        }
        ("POST", "/api/generate") => {
            chat_calls.fetch_add(1, StdOrdering::Relaxed);
            if cfg.chat_returns_error {
                (
                    "500 Internal Server Error",
                    "{\"error\":\"forced fault\"}".to_string(),
                )
            } else {
                let reply = cfg.tag_response.clone();
                (
                    "200 OK",
                    serde_json::json!({"response": reply}).to_string(),
                )
            }
        }
        _ => ("404 Not Found", "{}".to_string()),
    };

    let resp = format!(
        "HTTP/1.1 {status}\r\nContent-Type: application/json\r\nContent-Length: {}\r\nConnection: close\r\n\r\n{payload}",
        payload.len()
    );
    let _ = stream.write_all(resp.as_bytes());
    let _ = stream.flush();
    let _ = stream.shutdown(std::net::Shutdown::Write);
}
/// Builds an `OllamaClient` pointed at the fake server, using the model name
/// "fake-model"; panics if the client cannot be constructed.
fn ollama_for(server: &FakeOllama) -> crate::llm::OllamaClient {
    let client = crate::llm::OllamaClient::new_with_url(&server.url, "fake-model");
    client.expect("client must reach fake server")
}
/// Builds a `Tier::Long` observation memory with 120 characters of content,
/// a random UUID and the current time for both timestamps.
fn make_eligible_memory(ns: &str, title: &str) -> Memory {
    let now = chrono::Utc::now().to_rfc3339();
    Memory {
        // identity and payload
        id: uuid::Uuid::new_v4().to_string(),
        namespace: ns.to_owned(),
        title: title.to_owned(),
        content: "a".repeat(120),
        tags: Vec::new(),
        // classification
        tier: Tier::Long,
        memory_kind: crate::models::MemoryKind::Observation,
        priority: 5,
        source: "api".to_owned(),
        // confidence
        confidence: 1.0,
        confidence_source: crate::models::ConfidenceSource::CallerProvided,
        confidence_signals: None,
        confidence_decayed_at: None,
        // timestamps
        created_at: now.clone(),
        updated_at: now,
        last_accessed_at: None,
        expires_at: None,
        // metadata and references
        metadata: serde_json::json!({}),
        citations: Vec::new(),
        source_uri: None,
        source_span: None,
        entity_id: None,
        persona_version: None,
        reflection_depth: 0,
        // counters
        access_count: 0,
        version: 1,
    }
}
// With a working LLM, an eligible memory is tagged and the tags are persisted.
#[test]
fn run_once_with_llm_tags_eligible_memories() {
    let server = FakeOllama::start(FakeOllamaCfg::default());
    let llm = ollama_for(&server);
    let db_file = tempfile::NamedTempFile::new().unwrap();
    let conn = db::open(db_file.path()).unwrap();
    let anchor = make_eligible_memory("autotag-ns", "anchor");
    db::insert(&conn, &anchor).unwrap();

    let mut cfg = CuratorConfig::default();
    cfg.include_namespaces = vec!["autotag-ns".to_string()];
    let report = run_once(&conn, Some(&llm), &cfg, None).unwrap();

    assert!(report.memories_eligible >= 1);
    assert!(report.auto_tagged >= 1, "report: {report:?}");

    let stored = db::get(&conn, &anchor.id).unwrap().unwrap();
    let auto_tags = stored
        .metadata
        .get("auto_tags")
        .and_then(|v| v.as_array())
        .expect("auto_tags persisted");
    assert!(!auto_tags.is_empty());
}
// In dry-run mode with a working LLM, neither the tags nor the curator
// self-report are written.
#[test]
fn run_once_with_llm_dry_run_skips_writes() {
    let server = FakeOllama::start(FakeOllamaCfg::default());
    let llm = ollama_for(&server);
    let db_file = tempfile::NamedTempFile::new().unwrap();
    let conn = db::open(db_file.path()).unwrap();
    let anchor = make_eligible_memory("dry-llm-ns", "anchor");
    db::insert(&conn, &anchor).unwrap();

    let mut cfg = CuratorConfig::default();
    cfg.dry_run = true;
    cfg.include_namespaces = vec!["dry-llm-ns".to_string()];
    let report = run_once(&conn, Some(&llm), &cfg, None).unwrap();
    assert!(report.dry_run);

    let stored = db::get(&conn, &anchor.id).unwrap().unwrap();
    assert!(stored.metadata.get("auto_tags").is_none());

    let self_reports = db::list(
        &conn,
        Some("_curator/reports"),
        None,
        10,
        0,
        None,
        None,
        None,
        None,
        None,
    )
    .unwrap();
    assert!(
        self_reports.is_empty(),
        "dry-run must not persist self-report"
    );
}
// With a cap of one operation and three eligible memories, exactly one is
// attempted and at least two are counted as skipped.
#[test]
fn run_once_max_ops_cap_respected() {
    let server = FakeOllama::start(FakeOllamaCfg::default());
    let llm = ollama_for(&server);
    let db_file = tempfile::NamedTempFile::new().unwrap();
    let conn = db::open(db_file.path()).unwrap();
    (0..3).for_each(|i| {
        let row = make_eligible_memory("capns", &format!("anchor-{i}"));
        db::insert(&conn, &row).unwrap();
    });

    let mut cfg = CuratorConfig::default();
    cfg.max_ops_per_cycle = 1;
    cfg.include_namespaces = vec!["capns".to_string()];
    let report = run_once(&conn, Some(&llm), &cfg, None).unwrap();

    assert_eq!(report.operations_attempted, 1);
    assert!(report.operations_skipped_cap >= 2, "report: {report:?}");
}
// Both memories are scanned, but only the one in the included namespace is
// eligible; the other is left untagged.
#[test]
fn run_once_include_namespaces_filter() {
    let server = FakeOllama::start(FakeOllamaCfg::default());
    let llm = ollama_for(&server);
    let db_file = tempfile::NamedTempFile::new().unwrap();
    let conn = db::open(db_file.path()).unwrap();
    let listed = make_eligible_memory("included", "in");
    let unlisted = make_eligible_memory("not-included", "out");
    db::insert(&conn, &listed).unwrap();
    db::insert(&conn, &unlisted).unwrap();

    let mut cfg = CuratorConfig::default();
    cfg.include_namespaces = vec!["included".to_string()];
    let report = run_once(&conn, Some(&llm), &cfg, None).unwrap();

    assert!(report.memories_scanned >= 2);
    assert_eq!(report.memories_eligible, 1);
    let stored_unlisted = db::get(&conn, &unlisted.id).unwrap().unwrap();
    assert!(stored_unlisted.metadata.get("auto_tags").is_none());
}
// Both memories are scanned, but the one in the excluded namespace is not
// eligible and stays untagged.
#[test]
fn run_once_exclude_namespaces_filter() {
    let server = FakeOllama::start(FakeOllamaCfg::default());
    let llm = ollama_for(&server);
    let db_file = tempfile::NamedTempFile::new().unwrap();
    let conn = db::open(db_file.path()).unwrap();
    let retained = make_eligible_memory("kept", "k");
    let excluded = make_eligible_memory("dropped", "d");
    db::insert(&conn, &retained).unwrap();
    db::insert(&conn, &excluded).unwrap();

    let mut cfg = CuratorConfig::default();
    cfg.exclude_namespaces = vec!["dropped".to_string()];
    let report = run_once(&conn, Some(&llm), &cfg, None).unwrap();

    assert!(report.memories_scanned >= 2);
    assert_eq!(report.memories_eligible, 1);
    let stored_excluded = db::get(&conn, &excluded.id).unwrap().unwrap();
    assert!(stored_excluded.metadata.get("auto_tags").is_none());
}
// On an empty database a cycle with a working LLM reports all-zero counters.
#[test]
fn run_once_handles_zero_candidates() {
    let server = FakeOllama::start(FakeOllamaCfg::default());
    let llm = ollama_for(&server);
    let db_file = tempfile::NamedTempFile::new().unwrap();
    let conn = db::open(db_file.path()).unwrap();

    let report = run_once(&conn, Some(&llm), &CuratorConfig::default(), None).unwrap();

    let counters = [
        report.memories_scanned,
        report.memories_eligible,
        report.operations_attempted,
        report.auto_tagged,
        report.contradictions_found,
    ];
    for counter in counters {
        assert_eq!(counter, 0);
    }
}
// When the LLM answers "yes" to the contradiction question, at least one
// contradiction is counted for two memories sharing a namespace.
#[test]
fn run_once_records_contradictions_when_llm_affirms() {
    let mut server_cfg = FakeOllamaCfg::default();
    server_cfg.contradiction_answer = "yes".to_string();
    let server = FakeOllama::start(server_cfg);
    let llm = ollama_for(&server);

    let db_file = tempfile::NamedTempFile::new().unwrap();
    let conn = db::open(db_file.path()).unwrap();
    let first = make_eligible_memory("dual", "first");
    let second = make_eligible_memory("dual", "second");
    db::insert(&conn, &first).unwrap();
    db::insert(&conn, &second).unwrap();

    let mut cfg = CuratorConfig::default();
    cfg.include_namespaces = vec!["dual".to_string()];
    let report = run_once(&conn, Some(&llm), &cfg, None).unwrap();

    assert!(report.contradictions_found >= 1, "report: {report:?}");
}
/// With the fake LLM configured to return errors from chat, the cycle still
/// completes, records an LLM failure in `report.errors`, and leaves the
/// memory without `auto_tags`.
#[test]
fn run_once_records_errors_when_llm_fails() {
    let server = FakeOllama::start(FakeOllamaCfg {
        chat_returns_error: true,
        ..FakeOllamaCfg::default()
    });
    let llm = ollama_for(&server);
    let tmp = tempfile::NamedTempFile::new().unwrap();
    let conn = db::open(tmp.path()).unwrap();

    let mem = make_eligible_memory("fail-ns", "anchor");
    db::insert(&conn, &mem).unwrap();

    let cfg = CuratorConfig {
        include_namespaces: vec![String::from("fail-ns")],
        ..CuratorConfig::default()
    };
    let report = run_once(&conn, Some(&llm), &cfg, None).unwrap();

    // The cycle ran to completion despite the LLM failures.
    assert!(!report.completed_at.is_empty());

    // Either LLM-backed operation may be the one that surfaced the error.
    let has_llm_error = report.errors.iter().any(|entry| {
        entry.contains("auto_tag failed") || entry.contains("detect_contradiction failed")
    });
    assert!(
        has_llm_error,
        "expected an LLM-error entry in report.errors: {:?}",
        report.errors
    );

    // Nothing was persisted for the row.
    let stored = db::get(&conn, &mem.id).unwrap().unwrap();
    assert!(stored.metadata.get("auto_tags").is_none());
}
/// A non-dry-run cycle stores exactly one report memory in the
/// `_curator/reports` namespace, and its content mentions
/// `memories_consolidated`.
#[test]
fn run_once_writes_self_report_when_not_dry_run() {
    let server = FakeOllama::start(FakeOllamaCfg::default());
    let llm = ollama_for(&server);
    let tmp = tempfile::NamedTempFile::new().unwrap();
    let conn = db::open(tmp.path()).unwrap();

    let mem = make_eligible_memory("report-ns", "anchor");
    db::insert(&conn, &mem).unwrap();

    let cfg = CuratorConfig {
        include_namespaces: vec![String::from("report-ns")],
        ..CuratorConfig::default()
    };
    // Only the side effect matters here; the returned report is discarded.
    let _ = run_once(&conn, Some(&llm), &cfg, None).unwrap();

    // List up to 10 rows from the report namespace; every other argument is
    // left at None / offset 0.
    let stored_reports = db::list(
        &conn,
        Some("_curator/reports"),
        None,
        10,
        0,
        None,
        None,
        None,
        None,
        None,
    )
    .unwrap();

    assert_eq!(stored_reports.len(), 1);
    assert!(stored_reports[0].content.contains("memories_consolidated"));
}
/// A second cycle over the same data finds nothing left to do: the row is
/// still scanned but is no longer eligible and no operations are attempted.
#[test]
fn run_once_idempotent_on_already_tagged_rows() {
    let server = FakeOllama::start(FakeOllamaCfg::default());
    let llm = ollama_for(&server);
    let tmp = tempfile::NamedTempFile::new().unwrap();
    let conn = db::open(tmp.path()).unwrap();

    let mem = make_eligible_memory("idem-ns", "anchor");
    db::insert(&conn, &mem).unwrap();

    let cfg = CuratorConfig {
        include_namespaces: vec![String::from("idem-ns")],
        ..CuratorConfig::default()
    };

    // First pass: the row is eligible.
    let first_pass = run_once(&conn, Some(&llm), &cfg, None).unwrap();
    assert_eq!(first_pass.memories_eligible, 1);

    // Second pass: same row, same config, nothing to do.
    let second_pass = run_once(&conn, Some(&llm), &cfg, None).unwrap();
    assert!(second_pass.memories_scanned >= 1);
    assert_eq!(second_pass.memories_eligible, 0);
    assert_eq!(second_pass.operations_attempted, 0);
}
/// With three eligible rows in one namespace, a cycle attempts and tags all
/// three and issues at least three chat calls to the fake LLM.
#[test]
fn run_once_iterates_through_multiple_rows() {
    let server = FakeOllama::start(FakeOllamaCfg::default());
    let llm = ollama_for(&server);
    let tmp = tempfile::NamedTempFile::new().unwrap();
    let conn = db::open(tmp.path()).unwrap();

    // Seed three rows titled anchor-0 .. anchor-2.
    for i in 0..3 {
        let title = format!("anchor-{i}");
        db::insert(&conn, &make_eligible_memory("multi-ns", &title)).unwrap();
    }

    let cfg = CuratorConfig {
        include_namespaces: vec![String::from("multi-ns")],
        ..CuratorConfig::default()
    };
    let report = run_once(&conn, Some(&llm), &cfg, None).unwrap();

    assert_eq!(report.operations_attempted, 3);
    assert_eq!(report.auto_tagged, 3);

    let chat_calls = server.chat_calls.load(StdOrdering::Relaxed);
    assert!(chat_calls >= 3);
}
/// Two long-tier memories with identical content in the same namespace
/// should lead to at least one cluster being formed and at least three chat
/// calls to the fake LLM.
#[test]
fn run_once_smart_tier_consults_llm_for_clusters() {
    let server = FakeOllama::start(FakeOllamaCfg::default());
    let llm = ollama_for(&server);
    let tmp = tempfile::NamedTempFile::new().unwrap();
    let conn = db::open(tmp.path()).unwrap();

    let timestamp = chrono::Utc::now().to_rfc3339();

    // Fully spelled-out fixture; the second row is derived from it below.
    let first = Memory {
        id: String::from("smart-a"),
        tier: Tier::Long,
        namespace: String::from("smart"),
        title: String::from("deploy plan"),
        content: String::from("kubernetes rolling canary deploy strategy kubernetes deploy"),
        tags: Vec::new(),
        priority: 5,
        confidence: 1.0,
        source: String::from("api"),
        access_count: 0,
        created_at: timestamp.clone(),
        updated_at: timestamp,
        last_accessed_at: None,
        expires_at: None,
        metadata: serde_json::json!({}),
        reflection_depth: 0,
        memory_kind: crate::models::MemoryKind::Observation,
        entity_id: None,
        persona_version: None,
        citations: Vec::new(),
        source_uri: None,
        source_span: None,
        confidence_source: crate::models::ConfidenceSource::CallerProvided,
        confidence_signals: None,
        confidence_decayed_at: None,
        version: 1,
    };
    // Same content, different id and title.
    let second = Memory {
        id: String::from("smart-b"),
        title: String::from("deploy overview"),
        content: String::from("kubernetes rolling canary deploy strategy kubernetes deploy"),
        ..first.clone()
    };
    db::insert(&conn, &first).unwrap();
    db::insert(&conn, &second).unwrap();

    let cfg = CuratorConfig {
        include_namespaces: vec![String::from("smart")],
        ..CuratorConfig::default()
    };
    let report = run_once(&conn, Some(&llm), &cfg, None).unwrap();

    let chat_calls = server.chat_calls.load(StdOrdering::Relaxed);
    assert!(chat_calls >= 3);
    assert!(report.autonomy.clusters_formed >= 1, "report: {report:?}");
}
/// A non-dry-run cycle given a signing keypair generates a persona for an
/// entity mentioned by a reflection. The persona row is `self_signed`, its
/// `metadata.persona.signature` decodes to 64 bytes, and every
/// `derived_from` link from the persona is `self_signed` with a 64-byte
/// signature.
#[test]
fn run_once_persona_sweep_generates_signed_persona_for_new_entity() {
    use base64::Engine;

    const NAMESPACE: &str = "auto-persona-ns";

    let server = FakeOllama::start(FakeOllamaCfg::default());
    let llm = ollama_for(&server);
    let tmp = tempfile::NamedTempFile::new().unwrap();
    let conn = db::open(tmp.path()).unwrap();

    // Seed an observation plus a depth-1 reflection that mentions the entity.
    let observation = make_eligible_memory(NAMESPACE, "observation");
    let obs_id = db::insert(&conn, &observation).unwrap();
    let entity_id = "auto-persona-entity-2026-05-16";

    let mut reflection = make_eligible_memory(NAMESPACE, "reflection-of-obs");
    reflection.memory_kind = crate::models::MemoryKind::Reflection;
    reflection.reflection_depth = 1;
    reflection.content = "This reflection mentions the entity under test.".to_string();
    let rfl_id = db::insert(&conn, &reflection).unwrap();

    // The mention is attached by updating the column directly after insert.
    conn.execute(
        "UPDATE memories SET mentioned_entity_id = ?1 WHERE id = ?2",
        rusqlite::params![entity_id, &rfl_id],
    )
    .unwrap();
    db::create_link(&conn, &rfl_id, &obs_id, "reflects_on").unwrap();

    let keypair = crate::identity::keypair::generate("daemon").unwrap();
    let cfg = CuratorConfig {
        include_namespaces: vec![NAMESPACE.to_string()],
        ..CuratorConfig::default()
    };
    let report = run_once(&conn, Some(&llm), &cfg, Some(&keypair)).unwrap();
    assert!(
        report.personas_generated >= 1,
        "expected at least one auto-persona generation, report.errors={:?}",
        report.errors
    );

    // 1. The persona row exists and is self-signed.
    let persona = crate::persona::get_latest_persona(&conn, entity_id, NAMESPACE)
        .expect("get_latest_persona failed")
        .expect("persona row must exist after sweep");
    assert_eq!(
        persona.attest_level, "self_signed",
        "persona attest_level must be self_signed (was {:?})",
        persona.attest_level
    );

    // 2. The raw metadata column carries a base64 signature of 64 bytes.
    let raw_metadata: String = conn
        .query_row(
            "SELECT metadata FROM memories WHERE id = ?1",
            rusqlite::params![&persona.id],
            |r| r.get(0),
        )
        .unwrap();
    let meta: serde_json::Value = serde_json::from_str(&raw_metadata).unwrap();
    let sig_b64 = meta
        .pointer("/persona/signature")
        .and_then(serde_json::Value::as_str)
        .expect("metadata.persona.signature missing");
    let sig_bytes = base64::engine::general_purpose::STANDARD
        .decode(sig_b64)
        .expect("signature must be valid base64");
    assert_eq!(
        sig_bytes.len(),
        64,
        "metadata.persona.signature must decode to 64 bytes (got {})",
        sig_bytes.len()
    );

    // 3. Every derived_from edge out of the persona is signed as well.
    let mut stmt = conn
        .prepare(
            "SELECT attest_level, length(signature) \
             FROM memory_links \
             WHERE source_id = ?1 AND relation = 'derived_from'",
        )
        .unwrap();
    let edges: Vec<(String, Option<i64>)> = stmt
        .query_map(rusqlite::params![&persona.id], |r| {
            Ok((r.get::<_, String>(0)?, r.get::<_, Option<i64>>(1)?))
        })
        .unwrap()
        .collect::<rusqlite::Result<_>>()
        .unwrap();
    assert!(
        !edges.is_empty(),
        "persona must emit at least one derived_from edge"
    );
    for (attest_level, sig_len) in &edges {
        assert_eq!(
            attest_level, "self_signed",
            "persona derived_from edges must be self_signed"
        );
        // A NULL signature is treated as length 0 and fails the check.
        assert_eq!(
            sig_len.unwrap_or(0),
            64,
            "persona derived_from signature must be 64 bytes"
        );
    }
}
/// In dry-run mode the persona sweep still counts the persona it would have
/// generated, but no persona row is written for the entity.
#[test]
fn run_once_persona_sweep_dry_run_counts_without_writing() {
    const NAMESPACE: &str = "dry-persona-ns";

    let server = FakeOllama::start(FakeOllamaCfg::default());
    let llm = ollama_for(&server);
    let tmp = tempfile::NamedTempFile::new().unwrap();
    let conn = db::open(tmp.path()).unwrap();

    // Seed an observation plus a depth-1 reflection that mentions the entity.
    let observation = make_eligible_memory(NAMESPACE, "observation");
    let obs_id = db::insert(&conn, &observation).unwrap();
    let entity_id = "dry-persona-entity-2026-05-18";

    let mut reflection = make_eligible_memory(NAMESPACE, "reflection-of-obs");
    reflection.memory_kind = crate::models::MemoryKind::Reflection;
    reflection.reflection_depth = 1;
    reflection.content = "Dry-run reflection mentions the entity under test.".to_string();
    let rfl_id = db::insert(&conn, &reflection).unwrap();

    conn.execute(
        "UPDATE memories SET mentioned_entity_id = ?1 WHERE id = ?2",
        rusqlite::params![entity_id, &rfl_id],
    )
    .unwrap();
    db::create_link(&conn, &rfl_id, &obs_id, "reflects_on").unwrap();

    let keypair = crate::identity::keypair::generate("daemon").unwrap();
    let cfg = CuratorConfig {
        dry_run: true,
        include_namespaces: vec![NAMESPACE.to_string()],
        ..CuratorConfig::default()
    };
    let report = run_once(&conn, Some(&llm), &cfg, Some(&keypair)).unwrap();
    assert!(
        report.personas_generated >= 1,
        "dry-run must still count would-be persona generations, errors={:?}",
        report.errors
    );

    // The lookup itself must succeed, and it must come back empty.
    let persona = crate::persona::get_latest_persona(&conn, entity_id, NAMESPACE)
        .expect("get_latest_persona must not error");
    assert!(
        persona.is_none(),
        "dry-run must NOT write a persona row, got: {persona:?}"
    );
}
}
/// Calls `persist_auto_tags` on a freshly inserted memory and accepts either
/// outcome: on success the row must carry `auto_tags` metadata; on failure
/// the error must render to a non-empty message.
///
/// NOTE(review): despite the name, nothing here forces a storage error or a
/// rollback; the `Err` arm is only reached if `persist_auto_tags` happens to
/// fail. Confirm whether a dedicated failure fixture was intended.
#[test]
fn apply_rollback_handles_storage_error() {
    let tmp = tempfile::NamedTempFile::new().unwrap();
    let conn = db::open(tmp.path()).unwrap();

    let timestamp = chrono::Utc::now().to_rfc3339();
    let mem = Memory {
        id: String::from("m1"),
        tier: Tier::Mid,
        namespace: String::from("test"),
        title: String::from("Test"),
        content: "a".repeat(100),
        tags: Vec::new(),
        priority: 5,
        confidence: 1.0,
        source: String::from("test"),
        access_count: 0,
        created_at: timestamp.clone(),
        updated_at: timestamp,
        last_accessed_at: None,
        expires_at: None,
        metadata: serde_json::json!({}),
        reflection_depth: 0,
        memory_kind: crate::models::MemoryKind::Observation,
        entity_id: None,
        persona_version: None,
        citations: Vec::new(),
        source_uri: None,
        source_span: None,
        confidence_source: crate::models::ConfidenceSource::CallerProvided,
        confidence_signals: None,
        confidence_decayed_at: None,
        version: 1,
    };
    db::insert(&conn, &mem).unwrap();

    let tags = vec![String::from("test-tag")];
    match persist_auto_tags(&conn, &mem, &tags) {
        Err(err) => {
            assert!(!err.to_string().is_empty());
        }
        Ok(_) => {
            // Re-read through the list API and locate our row by id.
            let listed = db::list(&conn, None, None, 10, 0, None, None, None, None, None).unwrap();
            let stored = listed.iter().find(|row| row.id == mem.id).unwrap();
            assert!(stored.metadata.get("auto_tags").is_some());
        }
    }
}
/// With two memories stored in different namespaces, `adjacent_memory` for
/// the first one returns `None`.
#[test]
fn consolidate_pair_skips_when_namespaces_disagree() {
    let tmp = tempfile::NamedTempFile::new().unwrap();
    let conn = db::open(tmp.path()).unwrap();

    let timestamp = chrono::Utc::now().to_rfc3339();
    let mem1 = Memory {
        id: String::from("m1"),
        tier: Tier::Mid,
        namespace: String::from("ns1"),
        title: String::from("Title 1"),
        content: "a".repeat(100),
        tags: Vec::new(),
        priority: 5,
        confidence: 1.0,
        source: String::from("test"),
        access_count: 0,
        created_at: timestamp.clone(),
        updated_at: timestamp,
        last_accessed_at: None,
        expires_at: None,
        metadata: serde_json::json!({}),
        reflection_depth: 0,
        memory_kind: crate::models::MemoryKind::Observation,
        entity_id: None,
        persona_version: None,
        citations: Vec::new(),
        source_uri: None,
        source_span: None,
        confidence_source: crate::models::ConfidenceSource::CallerProvided,
        confidence_signals: None,
        confidence_decayed_at: None,
        version: 1,
    };
    // Differs from `mem1` only in id, namespace, title and content.
    let mem2 = Memory {
        id: String::from("m2"),
        namespace: String::from("ns2"),
        title: String::from("Title 2"),
        content: "b".repeat(100),
        ..mem1.clone()
    };
    db::insert(&conn, &mem1).unwrap();
    db::insert(&conn, &mem2).unwrap();

    // The only other row lives in another namespace, so there is no neighbour.
    let neighbour = adjacent_memory(&conn, &mem1).unwrap();
    assert!(neighbour.is_none());
}
/// Checks that four times the configured `max_ops_per_cycle` (100) is 400
/// and stays well below `usize::MAX / 10`.
///
/// NOTE(review): the assertions exercise only arithmetic on
/// `max_ops_per_cycle`; no priority value is read or clamped here, so the
/// test name may be stale. Confirm the intended coverage.
#[test]
fn priority_feedback_caps_at_priority_10() {
    let cfg = CuratorConfig {
        dry_run: false,
        max_ops_per_cycle: 100,
        interval_secs: crate::SECS_PER_HOUR as u64,
        include_namespaces: Vec::new(),
        exclude_namespaces: Vec::new(),
        ..CuratorConfig::default()
    };

    let quadrupled = cfg.max_ops_per_cycle.saturating_mul(4);
    assert_eq!(quadrupled, 400);
    assert!(quadrupled <= usize::MAX / 10);
}
/// Checks that the default `max_ops_per_cycle` is positive and that
/// saturating `0 + 1` yields 1.
///
/// NOTE(review): no priority value is read or floored here; the second
/// assertion is pure `usize` arithmetic. Confirm the intended coverage.
#[test]
fn priority_feedback_floors_at_priority_1() {
    let defaults = CuratorConfig::default();
    assert!(defaults.max_ops_per_cycle > 0);

    let bumped = 0_usize.saturating_add(1);
    assert_eq!(bumped, 1);
}
/// Running a cycle with no LLM client returns `Ok`, and the report's error
/// list contains an entry mentioning "no LLM".
///
/// NOTE(review): the database here opens successfully, so despite the name
/// this covers the missing-LLM path rather than a database failure.
#[test]
fn cycle_aborts_on_database_error() {
    let tmp = tempfile::NamedTempFile::new().unwrap();
    let conn = db::open(tmp.path()).unwrap();

    let outcome = run_once(&conn, None, &CuratorConfig::default(), None);
    assert!(outcome.is_ok());

    let report = outcome.unwrap();
    let mentions_missing_llm = report.errors.iter().any(|entry| entry.contains("no LLM"));
    assert!(mentions_missing_llm);
}