use anyhow::Result;
use rusqlite::Connection;
use serde::{Deserialize, Serialize};
use std::path::PathBuf;
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, Ordering};
use std::time::{Duration, Instant};
use crate::db;
use crate::llm::OllamaClient;
use crate::models::{Memory, Tier};
/// Default value of [`CuratorConfig::interval_secs`]: one hour between cycles.
pub const DEFAULT_INTERVAL_SECS: u64 = 3600;
/// Default value of [`CuratorConfig::max_ops_per_cycle`].
pub const DEFAULT_MAX_OPS_PER_CYCLE: usize = 100;
/// Minimum `content.len()` (bytes) a memory needs to be curated or to serve
/// as a sibling in the contradiction check.
pub const MIN_CONTENT_LEN: usize = 50;
/// Tunables for the curator, consumed by `run_once` and `run_daemon`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CuratorConfig {
/// Seconds to wait between daemon cycles; `run_daemon` clamps this to 60..=86400.
pub interval_secs: u64,
/// Maximum number of eligible memories processed per cycle. Candidate
/// collection fetches up to four times this many rows per tier.
pub max_ops_per_cycle: usize,
/// When true, `run_once` skips its metadata writes and the self-report write.
pub dry_run: bool,
/// When non-empty, only memories in exactly these namespaces are curated.
pub include_namespaces: Vec<String>,
/// Memories in exactly these namespaces are never curated.
pub exclude_namespaces: Vec<String>,
}
impl Default for CuratorConfig {
fn default() -> Self {
Self {
interval_secs: DEFAULT_INTERVAL_SECS,
max_ops_per_cycle: DEFAULT_MAX_OPS_PER_CYCLE,
dry_run: false,
include_namespaces: Vec::new(),
exclude_namespaces: Vec::new(),
}
}
}
/// Outcome of a single `run_once` cycle.
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct CuratorReport {
/// RFC 3339 UTC timestamp taken when the report was created.
pub started_at: String,
/// RFC 3339 UTC timestamp; starts equal to `started_at` and is overwritten
/// by `run_once` when the cycle finishes.
pub completed_at: String,
/// Wall-clock duration of the cycle in milliseconds.
pub cycle_duration_ms: u128,
/// Number of candidate memories collected for the cycle.
pub memories_scanned: usize,
/// Number of candidates that passed `needs_curation`.
pub memories_eligible: usize,
/// Memories for which the LLM returned a non-empty tag list (counted in
/// dry-run mode as well).
pub auto_tagged: usize,
/// Memories for which the LLM reported a contradiction with a sibling.
pub contradictions_found: usize,
/// Eligible memories processed before the per-cycle cap was reached.
pub operations_attempted: usize,
/// Eligible memories skipped because the per-cycle cap was reached.
pub operations_skipped_cap: usize,
/// Report returned by `crate::autonomy::run_autonomy_passes`.
#[serde(default)]
pub autonomy: crate::autonomy::AutonomyPassReport,
/// Human-readable messages for everything that went wrong or was
/// truncated, including the errors of the autonomy pass.
pub errors: Vec<String>,
/// Copy of the `dry_run` flag the cycle ran with.
pub dry_run: bool,
}
impl CuratorReport {
    /// Builds an otherwise-default report whose start and completion
    /// timestamps are both "now" (UTC, RFC 3339).
    fn new(dry_run: bool) -> Self {
        let stamp = chrono::Utc::now().to_rfc3339();
        let mut report = Self {
            dry_run,
            ..Self::default()
        };
        report.completed_at = stamp.clone();
        report.started_at = stamp;
        report
    }
}
/// Runs one curation cycle over `conn` and reports what happened.
///
/// The cycle collects candidates, filters them with `needs_curation`, and —
/// when an LLM client is available — auto-tags each eligible memory and
/// checks it against one sibling for contradictions, up to
/// `cfg.max_ops_per_cycle` memories. It then runs the autonomy passes,
/// persists a self-report (unless `cfg.dry_run`) and emits cycle metrics.
///
/// Without an LLM client the cycle ends right after the eligibility count,
/// with a "no LLM client configured" entry in `errors`.
///
/// # Errors
///
/// Only a failure to collect candidates aborts the cycle; every later
/// failure is recorded in `CuratorReport::errors` instead.
pub fn run_once(
    conn: &Connection,
    llm: Option<&OllamaClient>,
    cfg: &CuratorConfig,
) -> Result<CuratorReport> {
    let mut report = CuratorReport::new(cfg.dry_run);
    let clock = Instant::now();

    let batch = collect_candidates(conn, cfg)?;
    let candidates = batch.memories;
    report.memories_scanned = candidates.len();
    record_truncation(&mut report, batch.truncated, cfg);

    let mut eligible: Vec<&Memory> = Vec::new();
    for candidate in &candidates {
        if needs_curation(candidate, cfg) {
            eligible.push(candidate);
        }
    }
    report.memories_eligible = eligible.len();

    let Some(llm_client) = llm else {
        report.errors.push("no LLM client configured".to_string());
        report.completed_at = chrono::Utc::now().to_rfc3339();
        report.cycle_duration_ms = clock.elapsed().as_millis();
        return Ok(report);
    };

    for &mem in &eligible {
        // Once the budget is spent we keep walking only to count the leftovers.
        if report.operations_attempted >= cfg.max_ops_per_cycle {
            report.operations_skipped_cap += 1;
            continue;
        }
        report.operations_attempted += 1;

        match llm_client.auto_tag(&mem.title, &mem.content) {
            Err(e) => report
                .errors
                .push(format!("auto_tag failed for {}: {e}", mem.id)),
            Ok(tags) if tags.is_empty() => {}
            Ok(tags) => {
                let tag_list: Vec<String> = tags.into_iter().take(8).collect();
                let written = if cfg.dry_run {
                    Ok(())
                } else {
                    persist_auto_tags(conn, mem, &tag_list)
                };
                if let Err(e) = written {
                    report
                        .errors
                        .push(format!("auto_tag persist failed for {}: {e}", mem.id));
                    // A failed tag write also skips the contradiction check.
                    continue;
                }
                report.auto_tagged += 1;
            }
        }

        // Lookup failures and "no sibling" both mean there is nothing to compare.
        let Ok(Some(sibling)) = adjacent_memory(conn, mem) else {
            continue;
        };
        match llm_client.detect_contradiction(&mem.content, &sibling.content) {
            Ok(false) => {}
            Ok(true) => {
                let written = if cfg.dry_run {
                    Ok(())
                } else {
                    persist_contradiction(conn, mem, &sibling.id)
                };
                match written {
                    Ok(()) => report.contradictions_found += 1,
                    Err(e) => report
                        .errors
                        .push(format!("contradiction persist failed for {}: {e}", mem.id)),
                }
            }
            Err(e) => report.errors.push(format!(
                "detect_contradiction failed ({} vs {}): {e}",
                mem.id, sibling.id
            )),
        }
    }

    // The autonomy pass sees the same eligible set, judged on the snapshot
    // taken at the start of the cycle.
    let autonomy_candidates: Vec<crate::models::Memory> =
        eligible.iter().map(|&m| m.clone()).collect();
    let pass_report =
        crate::autonomy::run_autonomy_passes(conn, llm_client, &autonomy_candidates, cfg.dry_run);
    report.errors.extend(pass_report.errors.iter().cloned());
    report.autonomy = pass_report;

    report.completed_at = chrono::Utc::now().to_rfc3339();
    report.cycle_duration_ms = clock.elapsed().as_millis();

    if !cfg.dry_run {
        let saved = crate::autonomy::persist_self_report(
            conn,
            report.cycle_duration_ms,
            &report.autonomy,
            report.auto_tagged,
            report.contradictions_found,
            report.errors.len(),
        );
        if let Err(e) = saved {
            tracing::warn!("self-report persist failed: {e}");
        }
    }

    crate::metrics::curator_cycle_completed(
        report.operations_attempted,
        report.auto_tagged,
        report.contradictions_found,
        report.errors.len(),
    );
    Ok(report)
}
/// Runs curator cycles until `shutdown` becomes true.
///
/// Each iteration opens a fresh connection to `db_path`, runs `run_once`,
/// logs the outcome, and then waits `cfg.interval_secs` (clamped to
/// 60..=86400 seconds). The wait polls `shutdown` every 500 ms so the daemon
/// stops promptly. Failures to open the database or to run a cycle are
/// logged and do not end the loop.
#[allow(clippy::needless_pass_by_value)]
#[allow(dead_code)]
pub fn run_daemon(
    db_path: PathBuf,
    llm: Option<Arc<OllamaClient>>,
    cfg: CuratorConfig,
    shutdown: Arc<AtomicBool>,
) {
    const POLL_EVERY: Duration = Duration::from_millis(500);

    let interval = cfg.interval_secs.clamp(60, 86400);
    tracing::info!(
        "curator daemon started (interval={}s, max_ops={}, dry_run={})",
        interval,
        cfg.max_ops_per_cycle,
        cfg.dry_run
    );

    loop {
        if shutdown.load(Ordering::Relaxed) {
            break;
        }

        match Connection::open(&db_path) {
            Err(e) => tracing::error!("curator could not open db {}: {e}", db_path.display()),
            Ok(conn) => match run_once(&conn, llm.as_deref(), &cfg) {
                Err(e) => tracing::error!("curator cycle errored: {e}"),
                Ok(report) => tracing::info!(
                    "curator cycle: scanned={} eligible={} tagged={} contradictions={} errors={} ({}ms, dry_run={})",
                    report.memories_scanned,
                    report.memories_eligible,
                    report.auto_tagged,
                    report.contradictions_found,
                    report.errors.len(),
                    report.cycle_duration_ms,
                    report.dry_run
                ),
            },
        }

        // Sleep in short slices so a shutdown request is noticed quickly.
        let wake_at = Instant::now() + Duration::from_secs(interval);
        while Instant::now() < wake_at && !shutdown.load(Ordering::Relaxed) {
            std::thread::sleep(POLL_EVERY);
        }
    }

    tracing::info!("curator daemon shutdown");
}
/// Result of `collect_candidates`.
pub(crate) struct CandidateBatch {
/// Rows fetched across the scanned tiers, in fetch order.
pub memories: Vec<Memory>,
/// True when at least one tier returned as many rows as the per-tier cap,
/// i.e. more rows may exist than were fetched.
pub truncated: bool,
}
/// Adds a notice to `report.errors` when candidate collection hit its
/// per-tier cap; does nothing otherwise.
fn record_truncation(report: &mut CuratorReport, truncated: bool, cfg: &CuratorConfig) {
    if !truncated {
        return;
    }
    let per_tier_cap = cfg.max_ops_per_cycle.saturating_mul(4);
    let notice = format!(
        "collect_candidates truncated at cap={} per tier; consider raising max_ops_per_cycle or paginating across cycles",
        per_tier_cap
    );
    report.errors.push(notice);
}
/// Fetches up to `4 * max_ops_per_cycle` rows from each of the Mid and Long
/// tiers and flags the batch as truncated when any tier filled its cap.
///
/// # Errors
///
/// Propagates the first `db::list` failure.
fn collect_candidates(conn: &Connection, cfg: &CuratorConfig) -> Result<CandidateBatch> {
    let per_tier_cap = cfg.max_ops_per_cycle.saturating_mul(4);
    let mut collected = CandidateBatch {
        memories: Vec::new(),
        truncated: false,
    };
    for tier in [Tier::Mid, Tier::Long] {
        let rows = db::list(
            conn,
            None,
            Some(&tier),
            per_tier_cap,
            0,
            None,
            None,
            None,
            None,
            None,
        )?;
        // A full page means there may be more rows we did not see.
        collected.truncated |= rows.len() >= per_tier_cap;
        collected.memories.extend(rows);
    }
    Ok(collected)
}
/// Decides whether `mem` should be curated this cycle.
///
/// A memory is skipped when its namespace starts with `_`, when it falls
/// outside a non-empty include list, when it is on the exclude list, when
/// its content is shorter than `MIN_CONTENT_LEN` bytes, or when its metadata
/// already carries a non-empty `auto_tags` array.
fn needs_curation(mem: &Memory, cfg: &CuratorConfig) -> bool {
    let ns = &mem.namespace;
    let internal = ns.starts_with('_');
    let not_included = !cfg.include_namespaces.is_empty() && !cfg.include_namespaces.contains(ns);
    let excluded = cfg.exclude_namespaces.contains(ns);
    let too_short = mem.content.len() < MIN_CONTENT_LEN;
    if internal || not_included || excluded || too_short {
        return false;
    }

    let already_tagged = matches!(
        mem.metadata.get("auto_tags").and_then(|v| v.as_array()),
        Some(tags) if !tags.is_empty()
    );
    !already_tagged
}
fn persist_auto_tags(conn: &Connection, mem: &Memory, tags: &[String]) -> Result<()> {
let mut updated = mem.metadata.clone();
if let Some(obj) = updated.as_object_mut() {
obj.insert("auto_tags".to_string(), serde_json::json!(tags));
obj.insert(
"curated_at".to_string(),
serde_json::json!(chrono::Utc::now().to_rfc3339()),
);
}
db::update(
conn,
&mem.id,
None,
None,
None,
None,
None,
None,
None,
None,
Some(&updated),
)?;
Ok(())
}
fn persist_contradiction(conn: &Connection, mem: &Memory, against_id: &str) -> Result<()> {
let mut updated = mem.metadata.clone();
if let Some(obj) = updated.as_object_mut() {
let existing = obj
.get("confirmed_contradictions")
.and_then(|v| v.as_array())
.cloned()
.unwrap_or_default();
let mut ids: Vec<String> = existing
.into_iter()
.filter_map(|v| v.as_str().map(String::from))
.collect();
if !ids.iter().any(|id| id == against_id) {
ids.push(against_id.to_string());
}
obj.insert(
"confirmed_contradictions".to_string(),
serde_json::json!(ids),
);
}
db::update(
conn,
&mem.id,
None,
None,
None,
None,
None,
None,
None,
None,
Some(&updated),
)?;
Ok(())
}
/// Picks a sibling of `mem` for the contradiction check.
///
/// Looks at the first eight rows `db::list` returns for the namespace of
/// `mem` and yields the first one that is not `mem` itself and whose content
/// is at least `MIN_CONTENT_LEN` bytes long.
///
/// # Errors
///
/// Propagates a `db::list` failure.
fn adjacent_memory(conn: &Connection, mem: &Memory) -> Result<Option<Memory>> {
    let neighbours = db::list(
        conn,
        Some(&mem.namespace),
        None,
        8,
        0,
        None,
        None,
        None,
        None,
        None,
    )?;
    for neighbour in neighbours {
        let is_self = neighbour.id == mem.id;
        if !is_self && neighbour.content.len() >= MIN_CONTENT_LEN {
            return Ok(Some(neighbour));
        }
    }
    Ok(None)
}
#[cfg(test)]
mod tests {
use super::*;
/// The default configuration uses the published constants, performs real
/// writes and applies no namespace filter.
#[test]
fn default_config_has_sane_values() {
    let CuratorConfig {
        interval_secs,
        max_ops_per_cycle,
        dry_run,
        include_namespaces,
        exclude_namespaces,
    } = CuratorConfig::default();
    assert_eq!(interval_secs, DEFAULT_INTERVAL_SECS);
    assert_eq!(max_ops_per_cycle, DEFAULT_MAX_OPS_PER_CYCLE);
    assert!(!dry_run);
    assert!(include_namespaces.is_empty());
    assert!(exclude_namespaces.is_empty());
}
/// Namespaces starting with `_` are never curated, even when the content is
/// long enough and untagged.
#[test]
fn needs_curation_skips_internal_namespaces() {
    // Built from the shared fixture helper instead of a hand-copied literal.
    let mem = Memory {
        tier: Tier::Mid,
        ..make_test_memory("_messages/alice", "t", &"a".repeat(100))
    };
    assert!(!needs_curation(&mem, &CuratorConfig::default()));
}
/// Content shorter than `MIN_CONTENT_LEN` is not worth curating.
#[test]
fn needs_curation_skips_short_content() {
    // Built from the shared fixture helper instead of a hand-copied literal.
    let mem = Memory {
        tier: Tier::Mid,
        ..make_test_memory("app", "t", "short")
    };
    assert!(!needs_curation(&mem, &CuratorConfig::default()));
}
/// A memory whose metadata already has a non-empty `auto_tags` array is left
/// alone.
#[test]
fn needs_curation_skips_already_tagged() {
    // Built from the shared fixture helper instead of a hand-copied literal.
    let mem = Memory {
        metadata: serde_json::json!({"auto_tags":["x","y"]}),
        ..make_test_memory("app", "t", &"a".repeat(100))
    };
    assert!(!needs_curation(&mem, &CuratorConfig::default()));
}
/// With a non-empty include list only the listed namespaces are curated.
#[test]
fn needs_curation_respects_include_list() {
    // Built from the shared fixture helper instead of a hand-copied literal.
    let mem = make_test_memory("app", "t", &"a".repeat(100));

    let mut cfg = CuratorConfig {
        include_namespaces: vec!["other".to_string()],
        ..CuratorConfig::default()
    };
    assert!(!needs_curation(&mem, &cfg));

    cfg.include_namespaces = vec!["app".to_string()];
    assert!(needs_curation(&mem, &cfg));
}
/// Namespaces on the exclude list are never curated.
#[test]
fn needs_curation_respects_exclude_list() {
    // Built from the shared fixture helper instead of a hand-copied literal.
    let mem = make_test_memory("noisy", "t", &"a".repeat(100));
    let cfg = CuratorConfig {
        exclude_namespaces: vec!["noisy".to_string()],
        ..CuratorConfig::default()
    };
    assert!(!needs_curation(&mem, &cfg));
}
/// Without an LLM client the cycle still returns `Ok`, does no work on an
/// empty database and records the missing client in `errors`.
#[test]
fn run_once_without_llm_emits_error_but_succeeds() {
    let db_file = tempfile::NamedTempFile::new().unwrap();
    let conn = db::open(db_file.path()).unwrap();
    let report = run_once(&conn, None, &CuratorConfig::default()).unwrap();

    let counters = [
        report.memories_scanned,
        report.memories_eligible,
        report.operations_attempted,
    ];
    for counter in counters {
        assert_eq!(counter, 0);
    }
    assert!(report.errors.iter().any(|e| e.contains("no LLM")));
}
/// A fresh report serialises to JSON containing its field names.
#[test]
fn report_serialises_to_json() {
    let json = serde_json::to_string(&CuratorReport::new(true)).unwrap();
    for key in ["dry_run", "memories_scanned"] {
        assert!(json.contains(key));
    }
}
/// Builds a Long-tier fixture with a random id, empty object metadata and
/// "now" as both creation and update time.
fn make_test_memory(ns: &str, title: &str, content: &str) -> Memory {
    let stamp = chrono::Utc::now().to_rfc3339();
    Memory {
        id: uuid::Uuid::new_v4().to_string(),
        tier: Tier::Long,
        namespace: ns.to_owned(),
        title: title.to_owned(),
        content: content.to_owned(),
        tags: Vec::new(),
        priority: 5,
        confidence: 1.0,
        source: String::from("api"),
        access_count: 0,
        updated_at: stamp.clone(),
        created_at: stamp,
        last_accessed_at: None,
        expires_at: None,
        metadata: serde_json::json!({}),
    }
}
/// `persist_auto_tags` stores the tags in order and stamps `curated_at`.
#[test]
fn persist_auto_tags_writes_metadata() {
    let db_file = tempfile::NamedTempFile::new().unwrap();
    let conn = db::open(db_file.path()).unwrap();
    let mem = make_test_memory("curate-test", "anchor", &"a".repeat(120));
    db::insert(&conn, &mem).unwrap();

    let wanted = ["alpha".to_string(), "beta".to_string()];
    persist_auto_tags(&conn, &mem, &wanted).unwrap();

    let stored = db::get(&conn, &mem.id).unwrap().unwrap();
    let tags = stored
        .metadata
        .get("auto_tags")
        .unwrap()
        .as_array()
        .unwrap();
    assert_eq!(tags.len(), 2);
    assert_eq!(tags[0].as_str().unwrap(), "alpha");

    let curated_at = stored.metadata.get("curated_at").and_then(|v| v.as_str());
    assert!(curated_at.is_some_and(|s| !s.is_empty()));
}
/// An empty tag list is still written, as an empty `auto_tags` array.
#[test]
fn persist_auto_tags_with_empty_tag_list_still_writes_marker() {
    let db_file = tempfile::NamedTempFile::new().unwrap();
    let conn = db::open(db_file.path()).unwrap();
    let mem = make_test_memory("curate-test", "anchor", &"a".repeat(120));
    db::insert(&conn, &mem).unwrap();

    let no_tags: [String; 0] = [];
    persist_auto_tags(&conn, &mem, &no_tags).unwrap();

    let stored = db::get(&conn, &mem.id).unwrap().unwrap();
    let marker = stored
        .metadata
        .get("auto_tags")
        .unwrap()
        .as_array()
        .unwrap();
    assert!(marker.is_empty());
}
/// Recording the ids `id-1`, `id-2`, `id-1` against a memory (re-reading
/// the row between writes) leaves exactly the two distinct ids.
#[test]
fn persist_contradiction_appends_unique_ids() {
    let db_file = tempfile::NamedTempFile::new().unwrap();
    let conn = db::open(db_file.path()).unwrap();
    let mem = make_test_memory("curate-test", "anchor", &"a".repeat(120));
    db::insert(&conn, &mem).unwrap();

    let mut latest = mem.clone();
    for against in ["id-1", "id-2", "id-1"] {
        persist_contradiction(&conn, &latest, against).unwrap();
        latest = db::get(&conn, &mem.id).unwrap().unwrap();
    }

    let ids = latest
        .metadata
        .get("confirmed_contradictions")
        .unwrap()
        .as_array()
        .unwrap();
    assert_eq!(ids.len(), 2);
    let strs: Vec<String> = ids
        .iter()
        .filter_map(|v| v.as_str())
        .map(String::from)
        .collect();
    for expected in ["id-1", "id-2"] {
        assert!(strs.contains(&expected.to_string()));
    }
}
/// A memory that is alone in its namespace has no sibling.
#[test]
fn adjacent_memory_returns_none_when_only_self_exists() {
    let db_file = tempfile::NamedTempFile::new().unwrap();
    let conn = db::open(db_file.path()).unwrap();
    let only = make_test_memory("solo-ns", "only", &"a".repeat(120));
    db::insert(&conn, &only).unwrap();

    assert!(adjacent_memory(&conn, &only).unwrap().is_none());
}
/// With a second long-enough memory in the namespace, that one is returned.
#[test]
fn adjacent_memory_returns_some_when_sibling_present() {
    let db_file = tempfile::NamedTempFile::new().unwrap();
    let conn = db::open(db_file.path()).unwrap();
    let first = make_test_memory("dual-ns", "first", &"a".repeat(120));
    let second = make_test_memory("dual-ns", "second", &"b".repeat(120));
    for row in [&first, &second] {
        db::insert(&conn, row).unwrap();
    }

    let sibling = adjacent_memory(&conn, &first).unwrap().unwrap();
    assert_ne!(sibling.id, first.id);
    assert!(sibling.content.len() >= MIN_CONTENT_LEN);
}
/// A sibling whose content is below `MIN_CONTENT_LEN` is not eligible.
#[test]
fn adjacent_memory_skips_short_sibling() {
    let db_file = tempfile::NamedTempFile::new().unwrap();
    let conn = db::open(db_file.path()).unwrap();
    let anchor = make_test_memory("ns-short", "anchor", &"a".repeat(120));
    let tiny = make_test_memory("ns-short", "tiny-sibling", "short");
    db::insert(&conn, &anchor).unwrap();
    db::insert(&conn, &tiny).unwrap();

    assert!(adjacent_memory(&conn, &anchor).unwrap().is_none());
}
/// A truncated batch adds exactly one notice to the report.
#[test]
fn record_truncation_appends_when_truncated() {
    let mut report = CuratorReport::new(false);
    record_truncation(&mut report, true, &CuratorConfig::default());

    assert_eq!(report.errors.len(), 1);
    let notice = &report.errors[0];
    assert!(notice.contains("collect_candidates truncated"));
}
/// A complete batch leaves the report's error list untouched.
#[test]
fn record_truncation_noop_when_not_truncated() {
    let mut report = CuratorReport::new(false);
    record_truncation(&mut report, false, &CuratorConfig::default());
    assert!(report.errors.is_empty());
}
/// Three inserted rows are collected and are far below the truncation cap.
#[test]
fn collect_candidates_returns_eligible_memories() {
    let db_file = tempfile::NamedTempFile::new().unwrap();
    let conn = db::open(db_file.path()).unwrap();
    let body = "a".repeat(120);
    for i in 0..3 {
        let row = make_test_memory("cand-ns", &format!("row-{i}"), &body);
        db::insert(&conn, &row).unwrap();
    }

    let CandidateBatch {
        memories,
        truncated,
    } = collect_candidates(&conn, &CuratorConfig::default()).unwrap();
    assert!(!memories.is_empty());
    assert!(!truncated);
}
/// A dry-run cycle (here without an LLM client) reports `dry_run` and
/// leaves the stored metadata without `auto_tags`.
#[test]
fn run_once_with_dry_run_does_not_persist() {
    let db_file = tempfile::NamedTempFile::new().unwrap();
    let conn = db::open(db_file.path()).unwrap();
    let seeded = make_test_memory("dry-ns", "anchor", &"a".repeat(120));
    db::insert(&conn, &seeded).unwrap();

    let mut cfg = CuratorConfig::default();
    cfg.dry_run = true;
    let report = run_once(&conn, None, &cfg).unwrap();
    assert!(report.dry_run);

    let stored = db::get(&conn, &seeded.id).unwrap().unwrap();
    assert!(stored.metadata.get("auto_tags").is_none());
}
/// Starts the daemon on a seeded database, requests shutdown after 2.5 s
/// and checks that the thread returns without panicking.
///
/// Note: `run_daemon` clamps the interval to at least 60 s, so the
/// configured `interval_secs: 1` does not produce more than one cycle within
/// this test's window; what is verified is clean entry and exit.
#[test]
fn run_daemon_executes_multiple_cycles_and_respects_shutdown() {
    use std::sync::Mutex;
    use std::thread;
    use std::time::Duration;

    let tmp = tempfile::NamedTempFile::new().unwrap();
    let db_path = tmp.path().to_path_buf();

    // Seed five Mid-tier rows, then release the connection before the daemon
    // opens its own.
    {
        let conn = db::open(&db_path).unwrap();
        let now = chrono::Utc::now().to_rfc3339();
        for i in 0..5 {
            let row = Memory {
                id: format!("test-mem-{i}"),
                tier: Tier::Mid,
                namespace: "test".to_string(),
                title: format!("Memory {i}"),
                content: "x".repeat(100),
                tags: vec![],
                priority: 5,
                confidence: 1.0,
                source: "test".to_string(),
                access_count: 0,
                created_at: now.clone(),
                updated_at: now.clone(),
                last_accessed_at: None,
                expires_at: None,
                metadata: serde_json::json!({}),
            };
            db::insert(&conn, &row).unwrap();
        }
    }

    // Phase marker: 0 = not started, 1 = daemon entered, 2 = daemon returned.
    let phase = std::sync::Arc::new(Mutex::new(0));
    let phase_in_thread = std::sync::Arc::clone(&phase);

    let cfg = CuratorConfig {
        interval_secs: 1,
        max_ops_per_cycle: 50,
        dry_run: true,
        include_namespaces: vec![],
        exclude_namespaces: vec![],
    };
    let shutdown = std::sync::Arc::new(std::sync::atomic::AtomicBool::new(false));
    let shutdown_in_thread = std::sync::Arc::clone(&shutdown);

    let daemon = thread::spawn(move || {
        *phase_in_thread.lock().unwrap() = 1;
        run_daemon(db_path, None, cfg, shutdown_in_thread);
        *phase_in_thread.lock().unwrap() = 2;
    });

    thread::sleep(Duration::from_millis(2500));
    shutdown.store(true, std::sync::atomic::Ordering::Relaxed);

    let joined = daemon.join();
    assert!(joined.is_ok(), "daemon thread panicked or failed to join");
    let final_count = *phase.lock().unwrap();
    assert_eq!(
        final_count, 2,
        "daemon should have entered and exited cleanly"
    );
}
use std::io::{BufRead, BufReader, Read, Write};
use std::net::TcpListener;
use std::sync::Arc as StdArc;
use std::sync::atomic::{AtomicBool as StdAtomicBool, AtomicUsize, Ordering as StdOrdering};
use std::thread::JoinHandle;
/// Canned behaviour for the in-process fake Ollama server used by the tests.
#[derive(Clone)]
struct FakeOllamaCfg {
/// Chat reply when the request body mentions "tags" (and none of the
/// keywords checked before it).
tag_response: String,
/// Chat reply when the request body contains "contradict".
contradiction_answer: String,
/// Chat reply when the request body contains "Summarize" or "summari".
summary_response: String,
/// When true, every `POST /api/chat` is answered with HTTP 500.
chat_returns_error: bool,
}
impl Default for FakeOllamaCfg {
fn default() -> Self {
Self {
tag_response: "alpha\nbeta\ngamma".to_string(),
contradiction_answer: "no".to_string(),
summary_response: "consolidated summary".to_string(),
chat_returns_error: false,
}
}
}
/// Handle to a fake Ollama HTTP server listening on a loopback port.
struct FakeOllama {
/// Base URL of the server, `http://127.0.0.1:<port>`.
url: String,
/// Set to true to make the accept loop exit.
shutdown: StdArc<StdAtomicBool>,
/// Join handle of the accept-loop thread; taken on drop.
handle: Option<JoinHandle<()>>,
/// Number of `POST /api/chat` requests served so far.
chat_calls: StdArc<AtomicUsize>,
}
impl FakeOllama {
/// Binds an ephemeral loopback port and spawns a thread that accepts
/// connections until the shutdown flag is set, handling each connection on
/// its own thread with `handle_one`.
fn start(cfg: FakeOllamaCfg) -> Self {
let listener = TcpListener::bind("127.0.0.1:0").expect("bind 127.0.0.1");
let addr = listener.local_addr().unwrap();
// Non-blocking accept lets the loop re-check the shutdown flag regularly.
listener.set_nonblocking(true).unwrap();
let shutdown = StdArc::new(StdAtomicBool::new(false));
let chat_calls = StdArc::new(AtomicUsize::new(0));
let shutdown_for_thread = shutdown.clone();
let chat_calls_for_thread = chat_calls.clone();
let cfg_for_thread = cfg;
let handle = std::thread::spawn(move || {
while !shutdown_for_thread.load(StdOrdering::Relaxed) {
match listener.accept() {
Ok((mut stream, _peer)) => {
// Switch the accepted stream to blocking reads with a 2 s timeout.
stream.set_nonblocking(false).ok();
stream
.set_read_timeout(Some(std::time::Duration::from_secs(2)))
.ok();
let cfg = cfg_for_thread.clone();
let chat_calls = chat_calls_for_thread.clone();
// Per-connection threads are detached; only the accept loop is joined.
std::thread::spawn(move || {
handle_one(&mut stream, &cfg, &chat_calls);
});
}
// Nothing pending: back off briefly, then poll again.
Err(ref e) if e.kind() == std::io::ErrorKind::WouldBlock => {
std::thread::sleep(std::time::Duration::from_millis(20));
}
// Any other accept error ends the server loop.
Err(_) => break,
}
}
});
Self {
url: format!("http://127.0.0.1:{}", addr.port()),
shutdown,
handle: Some(handle),
chat_calls,
}
}
}
impl Drop for FakeOllama {
    /// Signals the accept loop to stop and waits for its thread to finish.
    fn drop(&mut self) {
        self.shutdown.store(true, StdOrdering::Relaxed);
        let Some(accept_thread) = self.handle.take() else {
            return;
        };
        // A panic inside the server thread is deliberately ignored here.
        let _ = accept_thread.join();
    }
}
/// Serves a single HTTP/1.1 request on `stream` and closes the write side.
///
/// Routes: `GET /api/tags` returns a one-model list; `POST /api/chat`
/// increments `chat_calls` and returns either a forced 500 or a canned
/// message chosen by keywords in the request body; anything else is a 404.
/// Malformed or unreadable requests are dropped without a response.
fn handle_one(stream: &mut std::net::TcpStream, cfg: &FakeOllamaCfg, chat_calls: &AtomicUsize) {
// Read through a buffered clone so `stream` stays available for the reply.
let mut reader = BufReader::new(stream.try_clone().expect("clone tcp"));
let mut request_line = String::new();
if reader.read_line(&mut request_line).is_err() {
return;
}
// Request line: "<METHOD> <PATH> <VERSION>".
let parts: Vec<&str> = request_line.split_whitespace().collect();
if parts.len() < 2 {
return;
}
let method = parts[0];
let path = parts[1];
let mut content_length: usize = 0;
// Consume headers up to the blank line, remembering Content-Length.
loop {
let mut header = String::new();
if reader.read_line(&mut header).is_err() {
return;
}
// An empty string means the peer closed the connection (end of input).
if header == "\r\n" || header.is_empty() {
break;
}
let lower = header.to_ascii_lowercase();
if let Some(rest) = lower.strip_prefix("content-length:") {
content_length = rest.trim().parse().unwrap_or(0);
}
}
let mut body = vec![0u8; content_length];
if content_length > 0 {
// A short or failed read leaves the remaining bytes zeroed.
let _ = reader.read_exact(&mut body);
}
let body_str = String::from_utf8_lossy(&body).to_string();
let (status, body): (&str, String) = if method == "GET" && path == "/api/tags" {
(
"200 OK",
serde_json::json!({"models": [{"name": "fake-model:latest"}]}).to_string(),
)
} else if method == "POST" && path == "/api/chat" {
chat_calls.fetch_add(1, StdOrdering::Relaxed);
if cfg.chat_returns_error {
(
"500 Internal Server Error",
"{\"error\":\"forced fault\"}".to_string(),
)
} else {
// Keyword checks run in this order; the first match picks the reply.
let response = if body_str.contains("contradict") {
cfg.contradiction_answer.clone()
} else if body_str.contains("Summarize") || body_str.contains("summari") {
cfg.summary_response.clone()
} else if body_str.contains("tags") {
cfg.tag_response.clone()
} else {
"ok".to_string()
};
(
"200 OK",
serde_json::json!({"message": {"content": response}}).to_string(),
)
}
} else {
("404 Not Found", "{}".to_string())
};
let resp = format!(
"HTTP/1.1 {status}\r\nContent-Type: application/json\r\nContent-Length: {}\r\nConnection: close\r\n\r\n{body}",
body.len()
);
// Write errors are ignored: the client may already have gone away.
let _ = stream.write_all(resp.as_bytes());
let _ = stream.flush();
let _ = stream.shutdown(std::net::Shutdown::Write);
}
/// Creates an `OllamaClient` for model `fake-model` pointed at `server`.
fn ollama_for(server: &FakeOllama) -> crate::llm::OllamaClient {
    let client = crate::llm::OllamaClient::new_with_url(&server.url, "fake-model");
    client.expect("client must reach fake server")
}
fn make_eligible_memory(ns: &str, title: &str) -> Memory {
let now = chrono::Utc::now().to_rfc3339();
Memory {
id: uuid::Uuid::new_v4().to_string(),
tier: Tier::Long,
namespace: ns.to_string(),
title: title.to_string(),
content: "a".repeat(120),
tags: vec![],
priority: 5,
confidence: 1.0,
source: "api".to_string(),
access_count: 0,
created_at: now.clone(),
updated_at: now,
last_accessed_at: None,
expires_at: None,
metadata: serde_json::json!({}),
}
}
/// With a working LLM, an eligible memory is tagged and the tags are stored.
#[test]
fn run_once_with_llm_tags_eligible_memories() {
    let server = FakeOllama::start(FakeOllamaCfg::default());
    let llm = ollama_for(&server);
    let db_file = tempfile::NamedTempFile::new().unwrap();
    let conn = db::open(db_file.path()).unwrap();
    let seeded = make_eligible_memory("autotag-ns", "anchor");
    db::insert(&conn, &seeded).unwrap();

    let cfg = CuratorConfig {
        include_namespaces: vec![String::from("autotag-ns")],
        ..CuratorConfig::default()
    };
    let report = run_once(&conn, Some(&llm), &cfg).unwrap();
    assert!(report.memories_eligible >= 1);
    assert!(report.auto_tagged >= 1, "report: {report:?}");

    let stored = db::get(&conn, &seeded.id).unwrap().unwrap();
    let tags = stored
        .metadata
        .get("auto_tags")
        .and_then(|v| v.as_array())
        .expect("auto_tags persisted");
    assert!(!tags.is_empty());
}
/// In dry-run mode neither the tags nor the self-report reach the database.
#[test]
fn run_once_with_llm_dry_run_skips_writes() {
    let server = FakeOllama::start(FakeOllamaCfg::default());
    let llm = ollama_for(&server);
    let db_file = tempfile::NamedTempFile::new().unwrap();
    let conn = db::open(db_file.path()).unwrap();
    let seeded = make_eligible_memory("dry-llm-ns", "anchor");
    db::insert(&conn, &seeded).unwrap();

    let cfg = CuratorConfig {
        include_namespaces: vec![String::from("dry-llm-ns")],
        dry_run: true,
        ..CuratorConfig::default()
    };
    let report = run_once(&conn, Some(&llm), &cfg).unwrap();
    assert!(report.dry_run);

    let stored = db::get(&conn, &seeded.id).unwrap().unwrap();
    assert!(stored.metadata.get("auto_tags").is_none());

    let self_reports = db::list(
        &conn,
        Some("_curator/reports"),
        None,
        10,
        0,
        None,
        None,
        None,
        None,
        None,
    )
    .unwrap();
    assert!(
        self_reports.is_empty(),
        "dry-run must not persist self-report"
    );
}
/// With a cap of one, a single memory is attempted and the rest are counted
/// as skipped.
#[test]
fn run_once_max_ops_cap_respected() {
    let server = FakeOllama::start(FakeOllamaCfg::default());
    let llm = ollama_for(&server);
    let db_file = tempfile::NamedTempFile::new().unwrap();
    let conn = db::open(db_file.path()).unwrap();
    for i in 0..3 {
        let title = format!("anchor-{i}");
        db::insert(&conn, &make_eligible_memory("capns", &title)).unwrap();
    }

    let cfg = CuratorConfig {
        include_namespaces: vec![String::from("capns")],
        max_ops_per_cycle: 1,
        ..CuratorConfig::default()
    };
    let report = run_once(&conn, Some(&llm), &cfg).unwrap();
    assert_eq!(report.operations_attempted, 1);
    assert!(report.operations_skipped_cap >= 2, "report: {report:?}");
}
/// Only the included namespace is eligible; the other row stays untagged.
#[test]
fn run_once_include_namespaces_filter() {
    let server = FakeOllama::start(FakeOllamaCfg::default());
    let llm = ollama_for(&server);
    let db_file = tempfile::NamedTempFile::new().unwrap();
    let conn = db::open(db_file.path()).unwrap();
    let inside = make_eligible_memory("included", "in");
    let outside = make_eligible_memory("not-included", "out");
    for row in [&inside, &outside] {
        db::insert(&conn, row).unwrap();
    }

    let cfg = CuratorConfig {
        include_namespaces: vec![String::from("included")],
        ..CuratorConfig::default()
    };
    let report = run_once(&conn, Some(&llm), &cfg).unwrap();
    assert!(report.memories_scanned >= 2);
    assert_eq!(report.memories_eligible, 1);

    let untouched = db::get(&conn, &outside.id).unwrap().unwrap();
    assert!(untouched.metadata.get("auto_tags").is_none());
}
/// The excluded namespace is scanned but neither eligible nor tagged.
#[test]
fn run_once_exclude_namespaces_filter() {
    let server = FakeOllama::start(FakeOllamaCfg::default());
    let llm = ollama_for(&server);
    let db_file = tempfile::NamedTempFile::new().unwrap();
    let conn = db::open(db_file.path()).unwrap();
    let kept = make_eligible_memory("kept", "k");
    let dropped = make_eligible_memory("dropped", "d");
    for row in [&kept, &dropped] {
        db::insert(&conn, row).unwrap();
    }

    let cfg = CuratorConfig {
        exclude_namespaces: vec![String::from("dropped")],
        ..CuratorConfig::default()
    };
    let report = run_once(&conn, Some(&llm), &cfg).unwrap();
    assert!(report.memories_scanned >= 2);
    assert_eq!(report.memories_eligible, 1);

    let untouched = db::get(&conn, &dropped.id).unwrap().unwrap();
    assert!(untouched.metadata.get("auto_tags").is_none());
}
/// On an empty database every counter in the report stays at zero.
#[test]
fn run_once_handles_zero_candidates() {
    let server = FakeOllama::start(FakeOllamaCfg::default());
    let llm = ollama_for(&server);
    let db_file = tempfile::NamedTempFile::new().unwrap();
    let conn = db::open(db_file.path()).unwrap();

    let report = run_once(&conn, Some(&llm), &CuratorConfig::default()).unwrap();
    let counters = [
        report.memories_scanned,
        report.memories_eligible,
        report.operations_attempted,
        report.auto_tagged,
        report.contradictions_found,
    ];
    for counter in counters {
        assert_eq!(counter, 0);
    }
}
/// When the LLM answers "yes" to the contradiction prompt, the report counts
/// at least one contradiction.
#[test]
fn run_once_records_contradictions_when_llm_affirms() {
    let mut server_cfg = FakeOllamaCfg::default();
    server_cfg.contradiction_answer = "yes".to_string();
    let server = FakeOllama::start(server_cfg);
    let llm = ollama_for(&server);
    let db_file = tempfile::NamedTempFile::new().unwrap();
    let conn = db::open(db_file.path()).unwrap();
    let first = make_eligible_memory("dual", "first");
    let second = make_eligible_memory("dual", "second");
    db::insert(&conn, &first).unwrap();
    db::insert(&conn, &second).unwrap();

    let cfg = CuratorConfig {
        include_namespaces: vec![String::from("dual")],
        ..CuratorConfig::default()
    };
    let report = run_once(&conn, Some(&llm), &cfg).unwrap();
    assert!(report.contradictions_found >= 1, "report: {report:?}");
}
/// When every chat call fails, the cycle still completes, logs an LLM error
/// in the report and writes no tags.
#[test]
fn run_once_records_errors_when_llm_fails() {
    let mut server_cfg = FakeOllamaCfg::default();
    server_cfg.chat_returns_error = true;
    let server = FakeOllama::start(server_cfg);
    let llm = ollama_for(&server);
    let db_file = tempfile::NamedTempFile::new().unwrap();
    let conn = db::open(db_file.path()).unwrap();
    let seeded = make_eligible_memory("fail-ns", "anchor");
    db::insert(&conn, &seeded).unwrap();

    let cfg = CuratorConfig {
        include_namespaces: vec![String::from("fail-ns")],
        ..CuratorConfig::default()
    };
    let report = run_once(&conn, Some(&llm), &cfg).unwrap();
    assert!(!report.completed_at.is_empty());

    let saw_llm_error = report
        .errors
        .iter()
        .any(|e| e.contains("auto_tag failed") || e.contains("detect_contradiction failed"));
    assert!(
        saw_llm_error,
        "expected an LLM-error entry in report.errors: {:?}",
        report.errors
    );

    let stored = db::get(&conn, &seeded.id).unwrap().unwrap();
    assert!(stored.metadata.get("auto_tags").is_none());
}
/// A real (non-dry-run) cycle stores exactly one self-report row under
/// `_curator/reports`.
#[test]
fn run_once_writes_self_report_when_not_dry_run() {
    let server = FakeOllama::start(FakeOllamaCfg::default());
    let llm = ollama_for(&server);
    let db_file = tempfile::NamedTempFile::new().unwrap();
    let conn = db::open(db_file.path()).unwrap();
    let seeded = make_eligible_memory("report-ns", "anchor");
    db::insert(&conn, &seeded).unwrap();

    let cfg = CuratorConfig {
        include_namespaces: vec![String::from("report-ns")],
        ..CuratorConfig::default()
    };
    run_once(&conn, Some(&llm), &cfg).unwrap();

    let self_reports = db::list(
        &conn,
        Some("_curator/reports"),
        None,
        10,
        0,
        None,
        None,
        None,
        None,
        None,
    )
    .unwrap();
    assert_eq!(self_reports.len(), 1);
    let stored = &self_reports[0];
    assert!(stored.content.contains("memories_consolidated"));
}
/// A second cycle finds nothing left to do for a row the first cycle tagged.
#[test]
fn run_once_idempotent_on_already_tagged_rows() {
    let server = FakeOllama::start(FakeOllamaCfg::default());
    let llm = ollama_for(&server);
    let db_file = tempfile::NamedTempFile::new().unwrap();
    let conn = db::open(db_file.path()).unwrap();
    db::insert(&conn, &make_eligible_memory("idem-ns", "anchor")).unwrap();

    let cfg = CuratorConfig {
        include_namespaces: vec![String::from("idem-ns")],
        ..CuratorConfig::default()
    };

    let first = run_once(&conn, Some(&llm), &cfg).unwrap();
    assert_eq!(first.memories_eligible, 1);

    let second = run_once(&conn, Some(&llm), &cfg).unwrap();
    assert!(second.memories_scanned >= 1);
    assert_eq!(second.memories_eligible, 0);
    assert_eq!(second.operations_attempted, 0);
}
/// Three eligible rows yield three attempts, three tagged memories and at
/// least three chat calls.
#[test]
fn run_once_iterates_through_multiple_rows() {
    let server = FakeOllama::start(FakeOllamaCfg::default());
    let llm = ollama_for(&server);
    let db_file = tempfile::NamedTempFile::new().unwrap();
    let conn = db::open(db_file.path()).unwrap();
    for i in 0..3 {
        let title = format!("anchor-{i}");
        db::insert(&conn, &make_eligible_memory("multi-ns", &title)).unwrap();
    }

    let cfg = CuratorConfig {
        include_namespaces: vec![String::from("multi-ns")],
        ..CuratorConfig::default()
    };
    let report = run_once(&conn, Some(&llm), &cfg).unwrap();
    assert_eq!(report.operations_attempted, 3);
    assert_eq!(report.auto_tagged, 3);

    let calls = server.chat_calls.load(StdOrdering::Relaxed);
    assert!(calls >= 3);
}
/// Two memories with identical content in one namespace lead to at least
/// three chat calls and at least one cluster in the autonomy report.
#[test]
fn run_once_smart_tier_consults_llm_for_clusters() {
    let server = FakeOllama::start(FakeOllamaCfg::default());
    let llm = ollama_for(&server);
    let db_file = tempfile::NamedTempFile::new().unwrap();
    let conn = db::open(db_file.path()).unwrap();

    let now = chrono::Utc::now().to_rfc3339();
    let shared_text = "kubernetes rolling canary deploy strategy kubernetes deploy";
    let build = |id: &str, title: &str| Memory {
        id: id.to_string(),
        tier: Tier::Long,
        namespace: "smart".to_string(),
        title: title.to_string(),
        content: shared_text.to_string(),
        tags: vec![],
        priority: 5,
        confidence: 1.0,
        source: "api".to_string(),
        access_count: 0,
        created_at: now.clone(),
        updated_at: now.clone(),
        last_accessed_at: None,
        expires_at: None,
        metadata: serde_json::json!({}),
    };
    let m_a = build("smart-a", "deploy plan");
    let m_b = build("smart-b", "deploy overview");
    db::insert(&conn, &m_a).unwrap();
    db::insert(&conn, &m_b).unwrap();

    let cfg = CuratorConfig {
        include_namespaces: vec![String::from("smart")],
        ..CuratorConfig::default()
    };
    let report = run_once(&conn, Some(&llm), &cfg).unwrap();

    let calls = server.chat_calls.load(StdOrdering::Relaxed);
    assert!(calls >= 3);
    assert!(report.autonomy.clusters_formed >= 1, "report: {report:?}");
}
}
/// Despite its name, this exercises `persist_auto_tags`: on success the
/// stored row must carry `auto_tags`; on failure only a non-empty error
/// message is required.
#[test]
fn apply_rollback_handles_storage_error() {
    let db_file = tempfile::NamedTempFile::new().unwrap();
    let conn = db::open(db_file.path()).unwrap();
    let stamp = "2026-01-01T00:00:00Z";
    let mem = Memory {
        id: "m1".to_string(),
        tier: Tier::Mid,
        namespace: "test".to_string(),
        title: "Test".to_string(),
        content: "a".repeat(100),
        tags: vec![],
        priority: 5,
        confidence: 1.0,
        source: "test".to_string(),
        access_count: 0,
        created_at: stamp.to_string(),
        updated_at: stamp.to_string(),
        last_accessed_at: None,
        expires_at: None,
        metadata: serde_json::json!({}),
    };
    db::insert(&conn, &mem).unwrap();

    let tags = vec!["test-tag".to_string()];
    if let Err(e) = persist_auto_tags(&conn, &mem, &tags) {
        assert!(!e.to_string().is_empty());
        return;
    }
    let rows = db::list(&conn, None, None, 10, 0, None, None, None, None, None).unwrap();
    let stored = rows.iter().find(|m| m.id == mem.id).unwrap();
    assert!(stored.metadata.get("auto_tags").is_some());
}
/// Despite its name, this exercises `adjacent_memory`: a memory in a
/// different namespace is never returned as a sibling.
#[test]
fn consolidate_pair_skips_when_namespaces_disagree() {
    let db_file = tempfile::NamedTempFile::new().unwrap();
    let conn = db::open(db_file.path()).unwrap();
    let now = chrono::Utc::now().to_rfc3339();

    let mem1 = Memory {
        id: "m1".to_string(),
        tier: Tier::Mid,
        namespace: "ns1".to_string(),
        title: "Title 1".to_string(),
        content: "a".repeat(100),
        tags: vec![],
        priority: 5,
        confidence: 1.0,
        source: "test".to_string(),
        access_count: 0,
        created_at: now.clone(),
        updated_at: now,
        last_accessed_at: None,
        expires_at: None,
        metadata: serde_json::json!({}),
    };
    // Same shape as `mem1`, differing only in identity, namespace and text.
    let mem2 = Memory {
        id: "m2".to_string(),
        namespace: "ns2".to_string(),
        title: "Title 2".to_string(),
        content: "b".repeat(100),
        ..mem1.clone()
    };
    db::insert(&conn, &mem1).unwrap();
    db::insert(&conn, &mem2).unwrap();

    assert!(adjacent_memory(&conn, &mem1).unwrap().is_none());
}
/// Despite its name, this only checks the arithmetic of the per-tier
/// candidate cap: four times a budget of 100 is 400, far from overflow.
#[test]
fn priority_feedback_caps_at_priority_10() {
    let cfg = CuratorConfig {
        dry_run: false,
        interval_secs: 3600,
        max_ops_per_cycle: 100,
        include_namespaces: Vec::new(),
        exclude_namespaces: Vec::new(),
    };
    let per_tier_cap = cfg.max_ops_per_cycle.saturating_mul(4);
    assert_eq!(per_tier_cap, 400);
    assert!(per_tier_cap <= usize::MAX / 10);
}
/// Despite its name, this only checks that the default operation budget is
/// positive and that `0 + 1` saturates to `1`.
#[test]
fn priority_feedback_floors_at_priority_1() {
    let budget = CuratorConfig::default().max_ops_per_cycle;
    assert!(budget > 0);

    let zero: usize = 0;
    assert_eq!(zero.saturating_add(1), 1);
}
/// Despite its name, no database error is provoked here: the test checks
/// that a cycle without an LLM client succeeds and reports the missing
/// client.
#[test]
fn cycle_aborts_on_database_error() {
    let db_file = tempfile::NamedTempFile::new().unwrap();
    let conn = db::open(db_file.path()).unwrap();

    let outcome = run_once(&conn, None, &CuratorConfig::default());
    assert!(outcome.is_ok());

    let errors = outcome.unwrap().errors;
    assert!(errors.iter().any(|e| e.contains("no LLM")));
}