#![cfg_attr(not(feature = "sal"), allow(dead_code, unused_imports))]
use crate::cli::CliOutput;
use crate::curator::reflection_pass;
use crate::identity::keypair as identity_keypair;
use crate::{autonomy, config, curator, db, llm};
use anyhow::{Context, Result};
use clap::Args;
use std::path::Path;
// Command-line flags for the curator subcommand.
//
// Field notes below use plain `//` comments on purpose: clap's derive turns
// `///` doc comments into `--help` text, so doc comments here would change
// the CLI's visible output.
#[derive(Args)]
#[allow(clippy::struct_excessive_bools)]
pub struct CuratorArgs {
    // Run a single curator cycle, print its report, and return.
    #[arg(long, conflicts_with = "daemon")]
    pub once: bool,
    // Daemon mode; `run` wires Ctrl-C to a shutdown notifier for it.
    #[arg(long)]
    pub daemon: bool,
    // Seconds between daemon cycles. The store-backed daemon loop clamps this
    // to the range 60..=SECS_PER_DAY before sleeping.
    #[arg(long, default_value_t = crate::SECS_PER_HOUR as u64)]
    pub interval_secs: u64,
    // Forwarded as `max_ops_per_cycle` (once) or to the daemon runtime.
    #[arg(long, default_value_t = 100)]
    pub max_ops: usize,
    // Forwarded to the curator / reflection pass and echoed in reports.
    #[arg(long)]
    pub dry_run: bool,
    // Namespace filters forwarded to `CuratorConfig` and the daemon runtime.
    // NOTE(review): the store-backed sweep in this file does not consult them.
    #[arg(long = "include-namespace")]
    pub include_namespaces: Vec<String>,
    #[arg(long = "exclude-namespace")]
    pub exclude_namespaces: Vec<String>,
    // Emit the report as pretty-printed JSON instead of the text layout.
    #[arg(long)]
    pub json: bool,
    // Id of a single rollback-log memory to reverse.
    #[arg(long, conflicts_with_all = ["once", "daemon"])]
    pub rollback: Option<String>,
    // Number of entries to list from the `_curator/rollback` namespace and
    // reverse (a value of 0 is treated as 1 by `run_rollback`).
    // NOTE(review): apart from the conflict declared on `reflect`, no
    // `conflicts_with` is set here, so clap accepts this next to `--once`,
    // `--daemon` or `--rollback`; `run` then routes to rollback handling first.
    #[arg(long)]
    pub rollback_last: Option<usize>,
    // Run the reflection pass instead of a curator cycle.
    #[arg(long, conflicts_with_all = ["once", "daemon", "rollback", "rollback_last"])]
    pub reflect: bool,
    // `--reflect` scope: exactly one of `namespace` / `all_namespaces` must be
    // given (validated in `run_reflect`, not by clap).
    #[arg(long)]
    pub namespace: Option<String>,
    // Passed through unchanged to the reflection pass.
    #[arg(long)]
    pub max_depth: Option<u32>,
    #[arg(long)]
    pub all_namespaces: bool,
    // Only present with the `sal` feature. When set together with `--once` or
    // `--daemon`, `run` switches to the store-backed reflection sweep.
    #[cfg(feature = "sal")]
    #[arg(long, value_name = "URL")]
    pub store_url: Option<String>,
}
/// Best-effort construction of the curator's LLM client.
///
/// Returns `None` when the resolved LLM settings come only from the compiled
/// defaults *and* the given tier does not name an `llm_model`, or when the
/// client cannot be built (build errors are deliberately discarded).
///
/// NOTE(review): this reloads `AppConfig` itself rather than using the
/// caller's instance — confirm that is intended.
fn build_curator_llm(tier: config::FeatureTier) -> Option<llm::OllamaClient> {
    let loaded = config::AppConfig::load();
    let llm_settings = loaded.resolve_llm(None, None, None);

    let only_compiled_default = matches!(
        llm_settings.source,
        config::ConfigSource::CompiledDefault
    );
    // `tier.config()` is only consulted when nothing overrides the defaults.
    if only_compiled_default && tier.config().llm_model.is_none() {
        return None;
    }

    match llm::OllamaClient::build_from_resolved(&llm_settings) {
        Ok(maybe_client) => maybe_client,
        Err(_) => None,
    }
}
/// Writes the human-readable curator cycle report to `out.stdout`: one line
/// per counter, followed by one `- …` line per recorded error.
fn print_curator_report(r: &curator::CuratorReport, out: &mut CliOutput<'_>) -> Result<()> {
    let curator::CuratorReport {
        started_at,
        completed_at,
        cycle_duration_ms,
        memories_scanned,
        memories_eligible,
        operations_attempted,
        auto_tagged,
        contradictions_found,
        operations_skipped_cap,
        errors,
        dry_run,
        ..
    } = r;

    writeln!(out.stdout, "curator cycle report")?;
    writeln!(out.stdout, " started_at: {started_at}")?;
    writeln!(out.stdout, " completed_at: {completed_at}")?;
    writeln!(out.stdout, " duration_ms: {cycle_duration_ms}")?;
    writeln!(out.stdout, " memories_scanned: {memories_scanned}")?;
    writeln!(out.stdout, " memories_eligible: {memories_eligible}")?;
    writeln!(out.stdout, " operations: {operations_attempted}")?;
    writeln!(out.stdout, " auto_tagged: {auto_tagged}")?;
    writeln!(out.stdout, " contradictions: {contradictions_found}")?;
    writeln!(out.stdout, " skipped (cap): {operations_skipped_cap}")?;
    writeln!(out.stdout, " errors: {}", errors.len())?;
    writeln!(out.stdout, " dry_run: {dry_run}")?;
    // Error details come last, one per line.
    for message in errors {
        writeln!(out.stdout, " - {message}")?;
    }
    Ok(())
}
/// Entry point for the curator subcommand.
///
/// Dispatch order (first match wins):
/// 1. `--rollback` / `--rollback-last` → `run_rollback`
/// 2. `--reflect` → `run_reflect`
/// 3. `--once` / `--daemon`. With the `sal` feature and a `--store-url`,
///    both modes are handled by the store-backed reflection sweep instead.
///
/// In `--once` mode a single cycle runs against the database at `db_path`
/// and its report is written to `out.stdout` (pretty JSON with `--json`).
/// In `--daemon` mode the daemon runtime runs until Ctrl-C is received.
///
/// # Errors
/// Fails when no mode flag is given, or when the selected mode fails
/// (database open, cycle execution, serialization, or output write).
pub async fn run(
    db_path: &Path,
    args: &CuratorArgs,
    app_config: &config::AppConfig,
    out: &mut CliOutput<'_>,
) -> Result<()> {
    if args.rollback.is_some() || args.rollback_last.is_some() {
        return run_rollback(db_path, args, out);
    }
    if args.reflect {
        return run_reflect(db_path, args, app_config, out).await;
    }
    if !args.once && !args.daemon {
        anyhow::bail!(
            "curator requires --once, --daemon, --reflect, --rollback <id>, or --rollback-last N"
        );
    }
    #[cfg(feature = "sal")]
    if curator_store_url(args).is_some() {
        return run_store_backed_sweep(db_path, args, app_config, out).await;
    }

    let feature_tier = app_config.effective_tier(None);
    let llm = build_curator_llm(feature_tier);

    if args.once {
        // Only the one-shot path needs a `CuratorConfig`; the daemon runtime
        // takes the same values as individual arguments.
        let cfg = curator::CuratorConfig {
            interval_secs: args.interval_secs,
            max_ops_per_cycle: args.max_ops,
            dry_run: args.dry_run,
            include_namespaces: args.include_namespaces.clone(),
            exclude_namespaces: args.exclude_namespaces.clone(),
            compaction: curator::CompactionConfig::default(),
        };
        let conn = db::open(db_path)?;
        let report = curator::run_once(&conn, llm.as_ref(), &cfg, None)?;
        if args.json {
            writeln!(out.stdout, "{}", serde_json::to_string_pretty(&report)?)?;
        } else {
            print_curator_report(&report, out)?;
        }
        return Ok(());
    }

    let shutdown = std::sync::Arc::new(tokio::sync::Notify::new());
    let shutdown_for_signal = shutdown.clone();
    tokio::spawn(async move {
        // Request shutdown only when a Ctrl-C was actually received. If the
        // signal listener cannot be installed, `ctrl_c()` resolves to `Err`
        // immediately; treating that as a shutdown request would stop the
        // daemon right after start-up.
        if tokio::signal::ctrl_c().await.is_ok() {
            shutdown_for_signal.notify_one();
        }
    });
    crate::daemon_runtime::run_curator_daemon_with_primitives(
        db_path.to_path_buf(),
        args.interval_secs,
        args.max_ops,
        args.dry_run,
        args.include_namespaces.clone(),
        args.exclude_namespaces.clone(),
        llm.map(std::sync::Arc::new),
        shutdown,
    )
    .await
}
/// The `--store-url` value, if one was supplied.
#[cfg(feature = "sal")]
#[must_use]
fn curator_store_url(args: &CuratorArgs) -> Option<&str> {
    args.store_url.as_deref()
}

/// Without the `sal` feature there is no `--store-url` flag, so this is
/// always `None`.
#[cfg(not(feature = "sal"))]
#[must_use]
fn curator_store_url(_args: &CuratorArgs) -> Option<&str> {
    None
}
/// Runs the reflection sweep against the store selected by `--store-url`.
///
/// With `--once` a single sweep runs and its report is written to
/// `out.stdout` (pretty JSON with `--json`). Otherwise sweeps repeat, each
/// followed by a sleep of `interval_secs` clamped to `60..=SECS_PER_DAY`,
/// until Ctrl-C is received.
///
/// # Errors
/// Fails when the store cannot be built, or when the one-shot report cannot
/// be serialized or written. Failures inside a sweep are carried in the
/// report's `errors` list rather than returned.
#[cfg(feature = "sal")]
async fn run_store_backed_sweep(
    db_path: &Path,
    args: &CuratorArgs,
    app_config: &config::AppConfig,
    out: &mut CliOutput<'_>,
) -> Result<()> {
    let store =
        crate::daemon_runtime::build_curator_store(curator_store_url(args), db_path, app_config)
            .await?;
    let keypair = load_curator_keypair_best_effort();
    let feature_tier = app_config.effective_tier(None);
    let llm = build_curator_llm(feature_tier);

    if args.once {
        let report = store_backed_reflection_sweep(
            store.as_ref(),
            llm.as_ref().map(|c| c as &dyn crate::autonomy::AutonomyLlm),
            keypair.as_ref(),
            args,
        )
        .await;
        if args.json {
            writeln!(out.stdout, "{}", serde_json::to_string_pretty(&report)?)?;
        } else {
            print_reflection_report(&report, out)?;
        }
        return Ok(());
    }

    // Two shutdown signals: the flag is checked at the top of each loop
    // iteration, the `Notify` wakes the loop out of its inter-cycle sleep.
    let shutdown = std::sync::Arc::new(tokio::sync::Notify::new());
    let shutdown_flag = std::sync::Arc::new(std::sync::atomic::AtomicBool::new(false));
    let shutdown_for_signal = shutdown.clone();
    let flag_for_signal = shutdown_flag.clone();
    tokio::spawn(async move {
        match tokio::signal::ctrl_c().await {
            Ok(()) => {
                flag_for_signal.store(true, std::sync::atomic::Ordering::Relaxed);
                shutdown_for_signal.notify_one();
            }
            // A listener that could not be installed is not a shutdown
            // request; keep the daemon running and surface the problem.
            Err(e) => {
                tracing::warn!("curator SAL daemon: cannot listen for Ctrl-C: {e}");
            }
        }
    });

    let interval_secs = args.interval_secs.clamp(60, crate::SECS_PER_DAY as u64);
    tracing::info!(
        "curator SAL daemon started (store-url backend, interval={interval_secs}s, \
         max_ops={}, dry_run={})",
        args.max_ops,
        args.dry_run,
    );
    while !shutdown_flag.load(std::sync::atomic::Ordering::Relaxed) {
        let report = store_backed_reflection_sweep(
            store.as_ref(),
            llm.as_ref().map(|c| c as &dyn crate::autonomy::AutonomyLlm),
            keypair.as_ref(),
            args,
        )
        .await;
        tracing::info!(
            "curator SAL cycle: namespaces={} observations={} clusters_eligible={} \
             reflections_persisted={} depth_refusals={} errors={} (dry_run={})",
            report.namespaces_visited,
            report.observations_scanned,
            report.clusters_eligible,
            report.reflections_persisted,
            report.depth_refusals,
            report.errors.len(),
            report.dry_run,
        );
        tokio::select! {
            () = tokio::time::sleep(std::time::Duration::from_secs(interval_secs)) => {}
            () = shutdown.notified() => break,
        }
    }
    tracing::info!("curator SAL daemon shutdown");
    Ok(())
}
/// One reflection sweep for the `--store-url` path: no namespace scope and
/// every namespace enabled; only `max_depth` and `dry_run` are taken from
/// `args`.
///
/// NOTE(review): `include_namespaces` / `exclude_namespaces` / `max_ops` are
/// not consulted here — confirm that is intended for store-backed sweeps.
#[cfg(feature = "sal")]
async fn store_backed_reflection_sweep(
    store: &dyn crate::store::MemoryStore,
    llm: Option<&dyn crate::autonomy::AutonomyLlm>,
    keypair: Option<&identity_keypair::AgentKeypair>,
    args: &CuratorArgs,
) -> reflection_pass::ReflectionPassReport {
    let every_namespace = |_: &str| true;
    let unscoped: Option<&str> = None;
    run_reflection_pass_with_optional_llm(
        store,
        llm,
        keypair,
        unscoped,
        args.max_depth,
        args.dry_run,
        every_namespace,
    )
    .await
}
/// Runs the reflection pass, turning both "no LLM available" and a failed
/// pass into a report instead of an error.
///
/// * `llm == None` → a default report with a single
///   "no LLM client configured" error.
/// * the pass returns `Err` → a default report with a single
///   "reflection pass failed: …" error.
/// * otherwise the pass's own report is returned unchanged.
///
/// In both synthesized reports `started_at` is the moment this function was
/// entered and `completed_at` the moment the report was built, so a failed
/// pass still shows how long it ran.
#[cfg(feature = "sal")]
async fn run_reflection_pass_with_optional_llm(
    store: &dyn crate::store::MemoryStore,
    llm: Option<&dyn crate::autonomy::AutonomyLlm>,
    keypair: Option<&identity_keypair::AgentKeypair>,
    namespace: Option<&str>,
    max_depth: Option<u32>,
    dry_run: bool,
    enabled_check: impl Fn(&str) -> bool,
) -> reflection_pass::ReflectionPassReport {
    let stamp = || chrono::Utc::now().to_rfc3339();
    // Captured up front: stamping after a failure would record the failure
    // time as the start time.
    let started_at = stamp();
    let failure_report = |started_at: String, message: String| {
        let mut report = reflection_pass::ReflectionPassReport {
            started_at,
            completed_at: stamp(),
            dry_run,
            ..Default::default()
        };
        report.errors.push(message);
        report
    };

    let Some(llm_client) = llm else {
        return failure_report(
            started_at,
            "no LLM client configured — set a feature tier that provides an llm_model"
                .to_string(),
        );
    };

    match reflection_pass::run_reflection_pass(
        store,
        llm_client,
        keypair,
        namespace,
        max_depth,
        dry_run,
        enabled_check,
    )
    .await
    {
        Ok(report) => report,
        Err(e) => failure_report(started_at, format!("reflection pass failed: {e}")),
    }
}
/// `--reflect` handler (SAL builds): validates the scope flags, runs the
/// reflection pass, and prints the report as text or pretty JSON.
///
/// Exactly one of `--namespace` / `--all-namespaces` must be given. With a
/// single namespace the per-namespace gate is bypassed; with all namespaces
/// each one is checked via `app_config.reflection_namespace_enabled`.
#[cfg(feature = "sal")]
async fn run_reflect(
    db_path: &Path,
    args: &CuratorArgs,
    app_config: &config::AppConfig,
    out: &mut CliOutput<'_>,
) -> Result<()> {
    let single_namespace = args.namespace.is_some();
    match (single_namespace, args.all_namespaces) {
        (false, false) => {
            anyhow::bail!("--reflect requires either --namespace <ns> or --all-namespaces")
        }
        (true, true) => {
            anyhow::bail!("--reflect: --namespace and --all-namespaces are mutually exclusive")
        }
        _ => {}
    }

    let store = build_reflect_store(db_path, args, app_config).await?;
    let keypair = load_curator_keypair_best_effort();
    let llm = build_curator_llm(app_config.effective_tier(None));

    let namespace_allowed =
        |ns: &str| single_namespace || app_config.reflection_namespace_enabled(ns);
    let llm_dyn = llm.as_ref().map(|c| c as &dyn crate::autonomy::AutonomyLlm);
    let report = run_reflection_pass_with_optional_llm(
        store.as_ref(),
        llm_dyn,
        keypair.as_ref(),
        args.namespace.as_deref(),
        args.max_depth,
        args.dry_run,
        namespace_allowed,
    )
    .await;

    if !args.json {
        print_reflection_report(&report, out)?;
        return Ok(());
    }
    writeln!(out.stdout, "{}", serde_json::to_string_pretty(&report)?)?;
    Ok(())
}
/// `--reflect` handler for builds without the `sal` feature: always fails
/// with a message explaining which feature is required. Kept `async` so the
/// call site in `run` is identical in both build configurations.
#[cfg(not(feature = "sal"))]
#[allow(clippy::unused_async)]
async fn run_reflect(
    _db_path: &Path,
    _args: &CuratorArgs,
    _app_config: &config::AppConfig,
    _out: &mut CliOutput<'_>,
) -> Result<()> {
    Err(anyhow::anyhow!(
        "curator --reflect requires a binary built with --features sal \
         (the reflection pass operates over the SAL MemoryStore trait)"
    ))
}
/// Builds the store used by `--reflect`, passing the optional `--store-url`
/// together with `db_path` and `app_config` to the daemon runtime's builder.
#[cfg(feature = "sal")]
async fn build_reflect_store(
    db_path: &Path,
    args: &CuratorArgs,
    app_config: &config::AppConfig,
) -> Result<std::sync::Arc<dyn crate::store::MemoryStore>> {
    let store_url = curator_store_url(args);
    crate::daemon_runtime::build_curator_store(store_url, db_path, app_config).await
}
/// Loads the first keypair listed in the default key directory.
///
/// Best effort by design: any failure along the way (no key directory,
/// listing error, empty listing, load error) yields `None`.
#[cfg_attr(not(feature = "sal"), allow(dead_code))]
fn load_curator_keypair_best_effort() -> Option<identity_keypair::AgentKeypair> {
    let key_dir = identity_keypair::default_key_dir().ok()?;
    identity_keypair::list(&key_dir)
        .ok()
        .and_then(|entries| entries.into_iter().next())
        .and_then(|entry| identity_keypair::load(&entry.agent_id, &key_dir).ok())
}
/// Writes the human-readable reflection pass report to `out.stdout`: the
/// counters first, then one `- …` line per error, then one `proposal:` line
/// per dry-run proposal.
#[cfg_attr(not(feature = "sal"), allow(dead_code))]
fn print_reflection_report(
    r: &reflection_pass::ReflectionPassReport,
    out: &mut CliOutput<'_>,
) -> Result<()> {
    let reflection_pass::ReflectionPassReport {
        started_at,
        completed_at,
        namespaces_visited,
        observations_scanned,
        clusters_formed,
        clusters_eligible,
        reflections_persisted,
        depth_refusals,
        errors,
        dry_run,
        dry_run_proposals,
        ..
    } = r;

    writeln!(out.stdout, "reflection pass report")?;
    writeln!(out.stdout, " started_at: {started_at}")?;
    writeln!(out.stdout, " completed_at: {completed_at}")?;
    writeln!(out.stdout, " namespaces_visited: {namespaces_visited}")?;
    writeln!(out.stdout, " observations_scanned: {observations_scanned}")?;
    writeln!(out.stdout, " clusters_formed: {clusters_formed}")?;
    writeln!(out.stdout, " clusters_eligible: {clusters_eligible}")?;
    writeln!(out.stdout, " reflections_persisted: {reflections_persisted}")?;
    writeln!(out.stdout, " depth_refusals: {depth_refusals}")?;
    writeln!(out.stdout, " errors: {}", errors.len())?;
    writeln!(out.stdout, " dry_run: {dry_run}")?;
    for message in errors {
        writeln!(out.stdout, " - {message}")?;
    }
    for proposal in dry_run_proposals {
        let source_count = proposal.source_ids.len();
        writeln!(
            out.stdout,
            " proposal: ns='{}' title='{}' sources={}",
            proposal.namespace, proposal.proposed_title, source_count
        )?;
    }
    Ok(())
}
fn run_rollback(db_path: &Path, args: &CuratorArgs, out: &mut CliOutput<'_>) -> Result<()> {
let conn = db::open(db_path)?;
if let Some(id) = &args.rollback {
let Some(mem) = db::get(&conn, id)? else {
anyhow::bail!("rollback entry {id} not found");
};
let entry: autonomy::RollbackEntry = serde_json::from_str(&mem.content)
.context("rollback entry content is not a valid RollbackEntry JSON")?;
let applied = autonomy::reverse_rollback_entry(&conn, &entry)?;
let mut tags = mem.tags.clone();
if !tags.iter().any(|t| t == "_reversed") {
tags.push("_reversed".to_string());
db::update(
&conn,
&mem.id,
None,
None,
None,
None,
Some(&tags),
None,
None,
None,
None,
)?;
}
writeln!(
out.stdout,
"rollback {id}: {}",
if applied { "applied" } else { "no-op" }
)?;
return Ok(());
}
if let Some(n) = args.rollback_last {
let log = db::list(
&conn,
Some("_curator/rollback"),
None,
n.max(1),
0,
None,
None,
None,
None,
None,
)?;
let mut reversed = 0usize;
for mem in &log {
if mem.tags.iter().any(|t| t == "_reversed") {
continue;
}
let Ok(entry) = serde_json::from_str::<autonomy::RollbackEntry>(&mem.content) else {
continue;
};
let applied = autonomy::reverse_rollback_entry(&conn, &entry)?;
if applied {
reversed += 1;
let mut tags = mem.tags.clone();
tags.push("_reversed".to_string());
db::update(
&conn,
&mem.id,
None,
None,
None,
None,
Some(&tags),
None,
None,
None,
None,
)?;
}
}
writeln!(out.stdout, "reversed {reversed} rollback entries")?;
return Ok(());
}
anyhow::bail!("run_rollback entered without --rollback or --rollback-last");
}
#[cfg(test)]
mod tests {
use super::*;
use crate::cli::test_utils::TestEnv;
/// Baseline `CuratorArgs` with no mode selected and the CLI's default values
/// for `interval_secs` / `max_ops`; tests flip only the fields they need.
fn default_args() -> CuratorArgs {
    CuratorArgs {
        // Mode flags.
        once: false,
        daemon: false,
        reflect: false,
        rollback: None,
        rollback_last: None,
        // Cycle tuning.
        interval_secs: crate::SECS_PER_HOUR as u64,
        max_ops: 100,
        dry_run: false,
        include_namespaces: vec![],
        exclude_namespaces: vec![],
        // Output.
        json: false,
        // Reflection scope.
        namespace: None,
        max_depth: None,
        all_namespaces: false,
        #[cfg(feature = "sal")]
        store_url: None,
    }
}
#[tokio::test]
async fn test_curator_requires_mode() {
    // With no mode flag set, `run` must refuse and list the accepted modes.
    let mut harness = TestEnv::fresh();
    let db_file = harness.db_path.clone();
    let app_cfg = config::AppConfig::default();
    let no_mode = default_args();
    let mut sink = harness.output();
    let err = run(&db_file, &no_mode, &app_cfg, &mut sink)
        .await
        .unwrap_err();
    assert!(err.to_string().contains("--once, --daemon, --reflect"));
}
#[tokio::test]
async fn test_curator_once_runs_single_sweep_text() {
    // `--once --dry-run` prints the text report header.
    let mut harness = TestEnv::fresh();
    let db_file = harness.db_path.clone();
    let app_cfg = config::AppConfig::default();
    let mut once_args = default_args();
    once_args.dry_run = true;
    once_args.once = true;
    {
        let mut sink = harness.output();
        run(&db_file, &once_args, &app_cfg, &mut sink)
            .await
            .unwrap();
    }
    let printed = harness.stdout_str();
    assert!(printed.contains("curator cycle report"));
}
#[tokio::test]
async fn test_curator_once_json_format() {
    // `--once --json --dry-run` emits a JSON document whose `dry_run` is true.
    let mut harness = TestEnv::fresh();
    let db_file = harness.db_path.clone();
    let app_cfg = config::AppConfig::default();
    let mut json_args = default_args();
    json_args.once = true;
    json_args.dry_run = true;
    json_args.json = true;
    {
        let mut sink = harness.output();
        run(&db_file, &json_args, &app_cfg, &mut sink)
            .await
            .unwrap();
    }
    let parsed: serde_json::Value = serde_json::from_str(harness.stdout_str().trim()).unwrap();
    let dry_run = parsed["dry_run"].as_bool().unwrap();
    assert!(dry_run);
}
#[tokio::test]
async fn test_curator_dry_run_skips_writes() {
    // A dry-run cycle completes and the report mentions the `dry_run` field
    // (in either the text or the JSON spelling).
    let mut harness = TestEnv::fresh();
    let db_file = harness.db_path.clone();
    let app_cfg = config::AppConfig::default();
    let mut dry_args = default_args();
    dry_args.once = true;
    dry_args.dry_run = true;
    {
        let mut sink = harness.output();
        run(&db_file, &dry_args, &app_cfg, &mut sink)
            .await
            .unwrap();
    }
    let printed = harness.stdout_str();
    let mentions_dry_run = ["dry_run:", "\"dry_run\""]
        .iter()
        .any(|needle| printed.contains(*needle));
    assert!(mentions_dry_run);
}
#[tokio::test]
async fn test_curator_include_namespaces_filter() {
    // An include filter is accepted and the cycle still reports.
    let mut harness = TestEnv::fresh();
    let db_file = harness.db_path.clone();
    let app_cfg = config::AppConfig::default();
    let mut filtered = default_args();
    filtered.include_namespaces = vec![String::from("only-this-ns")];
    filtered.once = true;
    filtered.dry_run = true;
    {
        let mut sink = harness.output();
        run(&db_file, &filtered, &app_cfg, &mut sink)
            .await
            .unwrap();
    }
    let printed = harness.stdout_str();
    assert!(printed.contains("operations:"));
}
#[tokio::test]
async fn test_curator_exclude_namespaces_filter() {
    // An exclude filter is accepted and the cycle still reports.
    let mut harness = TestEnv::fresh();
    let db_file = harness.db_path.clone();
    let app_cfg = config::AppConfig::default();
    let mut filtered = default_args();
    filtered.exclude_namespaces = vec![String::from("skip-me")];
    filtered.once = true;
    filtered.dry_run = true;
    {
        let mut sink = harness.output();
        run(&db_file, &filtered, &app_cfg, &mut sink)
            .await
            .unwrap();
    }
    let printed = harness.stdout_str();
    assert!(printed.contains("curator cycle report"));
}
#[tokio::test]
async fn test_curator_max_ops_cap_respected() {
    // A cap of zero operations must not make the cycle fail.
    let mut harness = TestEnv::fresh();
    let db_file = harness.db_path.clone();
    let app_cfg = config::AppConfig::default();
    let mut capped = default_args();
    capped.max_ops = 0;
    capped.once = true;
    capped.dry_run = true;
    {
        let mut sink = harness.output();
        run(&db_file, &capped, &app_cfg, &mut sink)
            .await
            .unwrap();
    }
    let printed = harness.stdout_str();
    assert!(printed.contains("operations:"));
}
#[tokio::test]
async fn test_curator_rollback_id_not_found() {
    // Rolling back an id that does not exist is an error naming the entry.
    let mut harness = TestEnv::fresh();
    let db_file = harness.db_path.clone();
    let app_cfg = config::AppConfig::default();
    let mut rollback_args = default_args();
    rollback_args.rollback = Some(String::from("00000000-0000-0000-0000-000000000000"));
    let mut sink = harness.output();
    let err = run(&db_file, &rollback_args, &app_cfg, &mut sink)
        .await
        .unwrap_err();
    assert!(err.to_string().contains("rollback entry"));
}
#[tokio::test]
async fn test_curator_rollback_last_zero_entries() {
    // With an empty rollback log, `--rollback-last` reverses nothing.
    let mut harness = TestEnv::fresh();
    let db_file = harness.db_path.clone();
    let app_cfg = config::AppConfig::default();
    let mut rollback_args = default_args();
    rollback_args.rollback_last = Some(5);
    {
        let mut sink = harness.output();
        run(&db_file, &rollback_args, &app_cfg, &mut sink)
            .await
            .unwrap();
    }
    let printed = harness.stdout_str();
    assert!(printed.contains("reversed 0"));
}
/// Serializes a `RollbackEntry::PriorityAdjust` for `memory_id` to JSON.
fn build_priority_rollback_entry_json(memory_id: &str, before: i32, after: i32) -> String {
    let entry = autonomy::RollbackEntry::PriorityAdjust {
        memory_id: memory_id.to_string(),
        before,
        after,
    };
    serde_json::to_string(&entry).unwrap()
}
/// Inserts an untagged memory into the `_curator/rollback` namespace with
/// the given raw `content` and returns the value `db::insert` yields for it.
fn seed_rollback_entry(db_path: &std::path::Path, content: &str) -> String {
    let conn = db::open(db_path).expect("db::open");

    let mut meta = crate::models::default_metadata();
    if let Some(fields) = meta.as_object_mut() {
        fields.insert(
            "agent_id".to_string(),
            serde_json::Value::String("test-agent".to_string()),
        );
    }

    let timestamp = chrono::Utc::now().to_rfc3339();
    let entry = crate::models::Memory {
        id: uuid::Uuid::new_v4().to_string(),
        tier: crate::models::Tier::Mid,
        namespace: "_curator/rollback".to_string(),
        title: format!("rollback-{}", uuid::Uuid::new_v4()),
        content: content.to_string(),
        tags: vec![],
        priority: 5,
        confidence: 1.0,
        source: "test".to_string(),
        access_count: 0,
        created_at: timestamp.clone(),
        updated_at: timestamp,
        last_accessed_at: None,
        expires_at: None,
        metadata: meta,
        reflection_depth: 0,
        memory_kind: crate::models::MemoryKind::Observation,
        entity_id: None,
        persona_version: None,
        citations: Vec::new(),
        source_uri: None,
        source_span: None,
        confidence_source: crate::models::ConfidenceSource::CallerProvided,
        confidence_signals: None,
        confidence_decayed_at: None,
        version: 1,
    };
    db::insert(&conn, &entry).expect("db::insert")
}
#[tokio::test]
async fn pr9i_curator_rollback_priority_adjust_applies() {
    let mut harness = TestEnv::fresh();
    let db_file = harness.db_path.clone();
    let app_cfg = config::AppConfig::default();

    // Target memory sits at priority 7; the rollback entry records 3 -> 7,
    // so reversing it should bring the priority back to 3.
    let target_id = {
        let conn = db::open(&db_file).unwrap();
        let timestamp = chrono::Utc::now().to_rfc3339();
        let mut meta = crate::models::default_metadata();
        if let Some(fields) = meta.as_object_mut() {
            fields.insert(
                "agent_id".to_string(),
                serde_json::Value::String("test-agent".to_string()),
            );
        }
        let memory = crate::models::Memory {
            id: uuid::Uuid::new_v4().to_string(),
            tier: crate::models::Tier::Mid,
            namespace: "ns".to_string(),
            title: "target".to_string(),
            content: "c".to_string(),
            tags: vec![],
            priority: 7,
            confidence: 1.0,
            source: "test".to_string(),
            access_count: 0,
            created_at: timestamp.clone(),
            updated_at: timestamp,
            last_accessed_at: None,
            expires_at: None,
            metadata: meta,
            reflection_depth: 0,
            memory_kind: crate::models::MemoryKind::Observation,
            entity_id: None,
            persona_version: None,
            citations: Vec::new(),
            source_uri: None,
            source_span: None,
            confidence_source: crate::models::ConfidenceSource::CallerProvided,
            confidence_signals: None,
            confidence_decayed_at: None,
            version: 1,
        };
        db::insert(&conn, &memory).unwrap()
    };

    let entry_id = seed_rollback_entry(
        &db_file,
        &build_priority_rollback_entry_json(&target_id, 3, 7),
    );
    let mut rollback_args = default_args();
    rollback_args.rollback = Some(entry_id.clone());
    {
        let mut sink = harness.output();
        run(&db_file, &rollback_args, &app_cfg, &mut sink)
            .await
            .unwrap();
    }

    let printed = harness.stdout_str();
    assert!(printed.contains(&format!("rollback {entry_id}")));
    assert!(printed.contains("applied"));

    // The target is restored and the log entry is tagged as reversed.
    let conn = db::open(&db_file).unwrap();
    let target_after = db::get(&conn, &target_id).unwrap().unwrap();
    assert_eq!(target_after.priority, 3);
    let entry_after = db::get(&conn, &entry_id).unwrap().unwrap();
    assert!(entry_after.tags.iter().any(|t| t == "_reversed"));
}
#[tokio::test]
async fn pr9i_curator_rollback_last_processes_multiple() {
    let mut harness = TestEnv::fresh();
    let db_file = harness.db_path.clone();
    let app_cfg = config::AppConfig::default();

    // Two target memories (priorities 8 and 9) sharing timestamp and metadata.
    let (t1, t2) = {
        let conn = db::open(&db_file).unwrap();
        let timestamp = chrono::Utc::now().to_rfc3339();
        let mut meta = crate::models::default_metadata();
        if let Some(fields) = meta.as_object_mut() {
            fields.insert(
                "agent_id".to_string(),
                serde_json::Value::String("test-agent".to_string()),
            );
        }
        let observation = |title: &str, content: &str, priority| crate::models::Memory {
            id: uuid::Uuid::new_v4().to_string(),
            tier: crate::models::Tier::Mid,
            namespace: "ns".to_string(),
            title: title.to_string(),
            content: content.to_string(),
            tags: vec![],
            priority,
            confidence: 1.0,
            source: "test".to_string(),
            access_count: 0,
            created_at: timestamp.clone(),
            updated_at: timestamp.clone(),
            last_accessed_at: None,
            expires_at: None,
            metadata: meta.clone(),
            reflection_depth: 0,
            memory_kind: crate::models::MemoryKind::Observation,
            entity_id: None,
            persona_version: None,
            citations: Vec::new(),
            source_uri: None,
            source_span: None,
            confidence_source: crate::models::ConfidenceSource::CallerProvided,
            confidence_signals: None,
            confidence_decayed_at: None,
            version: 1,
        };
        let first = observation("t1", "c1", 8);
        let second = observation("t2", "c2", 9);
        (
            db::insert(&conn, &first).unwrap(),
            db::insert(&conn, &second).unwrap(),
        )
    };

    // Two valid rollback entries plus one whose content is not valid JSON;
    // the malformed one must be skipped without failing the command.
    seed_rollback_entry(&db_file, &build_priority_rollback_entry_json(&t1, 4, 8));
    seed_rollback_entry(&db_file, &build_priority_rollback_entry_json(&t2, 5, 9));
    seed_rollback_entry(&db_file, "{not valid json: at all");

    let mut rollback_args = default_args();
    rollback_args.rollback_last = Some(5);
    {
        let mut sink = harness.output();
        run(&db_file, &rollback_args, &app_cfg, &mut sink)
            .await
            .unwrap();
    }

    let printed = harness.stdout_str();
    assert!(printed.contains("reversed 2"));
    let conn = db::open(&db_file).unwrap();
    let priority_of = |id: &str| db::get(&conn, id).unwrap().unwrap().priority;
    assert_eq!(priority_of(&t1), 4);
    assert_eq!(priority_of(&t2), 5);
}
#[tokio::test]
async fn pr9i_curator_rollback_last_skips_already_reversed() {
    let mut harness = TestEnv::fresh();
    let db_file = harness.db_path.clone();
    let app_cfg = config::AppConfig::default();

    // Inserts one memory through its own connection, with its own timestamp
    // and metadata, and returns what `db::insert` yields.
    let insert_memory =
        |namespace: &str, title: &str, content: String, tags: Vec<String>, priority| {
            let conn = db::open(&db_file).unwrap();
            let timestamp = chrono::Utc::now().to_rfc3339();
            let mut meta = crate::models::default_metadata();
            if let Some(fields) = meta.as_object_mut() {
                fields.insert(
                    "agent_id".to_string(),
                    serde_json::Value::String("test-agent".to_string()),
                );
            }
            let memory = crate::models::Memory {
                id: uuid::Uuid::new_v4().to_string(),
                tier: crate::models::Tier::Mid,
                namespace: namespace.to_string(),
                title: title.to_string(),
                content,
                tags,
                priority,
                confidence: 1.0,
                source: "test".to_string(),
                access_count: 0,
                created_at: timestamp.clone(),
                updated_at: timestamp,
                last_accessed_at: None,
                expires_at: None,
                metadata: meta,
                reflection_depth: 0,
                memory_kind: crate::models::MemoryKind::Observation,
                entity_id: None,
                persona_version: None,
                citations: Vec::new(),
                source_uri: None,
                source_span: None,
                confidence_source: crate::models::ConfidenceSource::CallerProvided,
                confidence_signals: None,
                confidence_decayed_at: None,
                version: 1,
            };
            db::insert(&conn, &memory).unwrap()
        };

    // Target at priority 7, and a rollback entry for it that is already
    // tagged `_reversed` before the command runs.
    let target = insert_memory("ns", "x", "c".to_string(), vec![], 7);
    let entry_json = build_priority_rollback_entry_json(&target, 2, 7);
    let entry_id = insert_memory(
        "_curator/rollback",
        "preexisting-reversed",
        entry_json,
        vec!["_reversed".to_string()],
        5,
    );

    let mut rollback_args = default_args();
    rollback_args.rollback_last = Some(5);
    {
        let mut sink = harness.output();
        run(&db_file, &rollback_args, &app_cfg, &mut sink)
            .await
            .unwrap();
    }

    // Nothing is reversed: the target keeps its priority and the entry keeps
    // its tag.
    let printed = harness.stdout_str();
    assert!(printed.contains("reversed 0"));
    let conn = db::open(&db_file).unwrap();
    assert_eq!(db::get(&conn, &target).unwrap().unwrap().priority, 7);
    let entry_after = db::get(&conn, &entry_id).unwrap().unwrap();
    assert!(entry_after.tags.iter().any(|t| t == "_reversed"));
}
#[tokio::test]
async fn pr9i_curator_rollback_id_with_malformed_content() {
    // A rollback entry whose content is not valid JSON must surface as a
    // parse error rather than being silently accepted.
    let mut harness = TestEnv::fresh();
    let db_file = harness.db_path.clone();
    let app_cfg = config::AppConfig::default();
    let mut rollback_args = default_args();
    rollback_args.rollback = Some(seed_rollback_entry(&db_file, "{invalid json"));
    let mut sink = harness.output();
    let err = run(&db_file, &rollback_args, &app_cfg, &mut sink)
        .await
        .unwrap_err()
        .to_string();
    let looks_like_parse_error = err.contains("rollback") || err.contains("RollbackEntry");
    assert!(
        looks_like_parse_error,
        "expected parse-error message, got: {err}"
    );
}
#[test]
fn build_curator_llm_with_keyword_tier_returns_none() {
    // With no config in the environment, the keyword tier yields no client.
    crate::cli::test_utils::ensure_no_config_env();
    assert!(build_curator_llm(config::FeatureTier::Keyword).is_none());
}
#[test]
fn build_curator_llm_with_smart_tier_runs_body() {
    // Smoke test only: the call must not panic; its result is discarded.
    crate::cli::test_utils::ensure_no_config_env();
    drop(build_curator_llm(config::FeatureTier::Smart));
}
#[cfg(unix)]
#[tokio::test(flavor = "multi_thread")]
async fn curator_daemon_mode_short_loop_returns_on_shutdown() {
    use std::path::PathBuf;
    use std::time::Duration;

    let env = TestEnv::fresh();
    let db_file: PathBuf = env.db_path.clone();
    let app_cfg = config::AppConfig::default();
    let mut daemon_args = default_args();
    daemon_args.daemon = true;
    daemon_args.dry_run = true;
    daemon_args.interval_secs = 60;

    // After a short delay, deliver SIGINT to this very process so the
    // daemon's Ctrl-C listener fires.
    let kicker = tokio::spawn(async {
        tokio::time::sleep(Duration::from_millis(200)).await;
        // SAFETY: `getpid` has no preconditions, and `kill` is called with
        // our own pid and a valid signal number.
        unsafe {
            libc::kill(libc::getpid(), libc::SIGINT);
        }
    });

    let mut stdout = Vec::<u8>::new();
    let mut stderr = Vec::<u8>::new();
    let mut sink = crate::cli::CliOutput::from_std(&mut stdout, &mut stderr);
    let outcome = tokio::time::timeout(
        Duration::from_secs(15),
        run(&db_file, &daemon_args, &app_cfg, &mut sink),
    )
    .await;
    let _ = kicker.await;

    match outcome {
        // NOTE(review): a timeout is tolerated here, so this test cannot
        // fail on a daemon that never shuts down.
        Err(_elapsed) => {
            eprintln!("daemon-mode test timed out; coverage already captured");
        }
        Ok(run_result) => {
            if let Err(e) = run_result {
                panic!("daemon mode errored: {e}");
            }
        }
    }
}
#[test]
fn print_curator_report_emits_error_list_lines() {
    // Each recorded error is printed on its own `- …` line.
    let mut report = crate::curator::CuratorReport::default();
    report.dry_run = true;
    report.errors = ["err A", "err B"].iter().map(|e| e.to_string()).collect();

    let mut stdout = Vec::<u8>::new();
    let mut stderr = Vec::<u8>::new();
    {
        let mut sink = crate::cli::CliOutput::from_std(&mut stdout, &mut stderr);
        print_curator_report(&report, &mut sink).unwrap();
    }

    let printed = String::from_utf8(stdout).unwrap();
    for expected in ["curator cycle report", "- err A", "- err B"] {
        assert!(printed.contains(expected));
    }
}
#[cfg(feature = "sal")]
#[tokio::test]
async fn reflect_requires_namespace_or_all_namespaces() {
    // `--reflect` without any scope flag is rejected.
    let mut harness = TestEnv::fresh();
    let db_file = harness.db_path.clone();
    let app_cfg = config::AppConfig::default();
    let mut reflect_args = default_args();
    reflect_args.reflect = true;
    let mut sink = harness.output();
    let message = run(&db_file, &reflect_args, &app_cfg, &mut sink)
        .await
        .unwrap_err()
        .to_string();
    assert!(message.contains("--namespace") || message.contains("--all-namespaces"));
}
#[cfg(feature = "sal")]
#[tokio::test]
async fn reflect_namespace_and_all_namespaces_mutually_exclusive() {
    // Supplying both scope flags is rejected.
    let mut harness = TestEnv::fresh();
    let db_file = harness.db_path.clone();
    let app_cfg = config::AppConfig::default();
    let mut reflect_args = default_args();
    reflect_args.reflect = true;
    reflect_args.all_namespaces = true;
    reflect_args.namespace = Some(String::from("ns"));
    let mut sink = harness.output();
    let message = run(&db_file, &reflect_args, &app_cfg, &mut sink)
        .await
        .unwrap_err()
        .to_string();
    assert!(message.contains("mutually exclusive"));
}
#[cfg(feature = "sal")]
#[tokio::test]
async fn reflect_no_llm_path_emits_error_in_report() {
    // Without an LLM (keyword tier) the text report carries the
    // "no LLM client configured" error instead of the command failing.
    let mut harness = TestEnv::fresh();
    let db_file = harness.db_path.clone();
    let mut app_cfg = config::AppConfig::default();
    app_cfg.tier = Some(String::from("keyword"));
    let mut reflect_args = default_args();
    reflect_args.reflect = true;
    reflect_args.dry_run = true;
    reflect_args.namespace = Some(String::from("ns"));
    {
        let mut sink = harness.output();
        run(&db_file, &reflect_args, &app_cfg, &mut sink)
            .await
            .unwrap();
    }
    let printed = harness.stdout_str();
    assert!(printed.contains("reflection pass report"));
    assert!(printed.contains("no LLM client configured"));
}
#[cfg(feature = "sal")]
#[tokio::test]
async fn reflect_no_llm_path_emits_json_report() {
    // Same as the text variant, but checking the JSON rendering.
    let mut harness = TestEnv::fresh();
    let db_file = harness.db_path.clone();
    let mut app_cfg = config::AppConfig::default();
    app_cfg.tier = Some(String::from("keyword"));
    let mut reflect_args = default_args();
    reflect_args.reflect = true;
    reflect_args.json = true;
    reflect_args.dry_run = true;
    reflect_args.namespace = Some(String::from("ns"));
    {
        let mut sink = harness.output();
        run(&db_file, &reflect_args, &app_cfg, &mut sink)
            .await
            .unwrap();
    }
    let parsed: serde_json::Value = serde_json::from_str(harness.stdout_str().trim()).unwrap();
    let reported = parsed["errors"].as_array().unwrap();
    let mentions_llm = reported
        .iter()
        .any(|e| e.as_str().unwrap().contains("no LLM"));
    assert!(mentions_llm);
    assert!(parsed["dry_run"].as_bool().unwrap());
}
#[cfg(feature = "sal")]
#[tokio::test]
async fn reflect_all_namespaces_text_output() {
    // `--reflect --all-namespaces` prints the text report header.
    let mut harness = TestEnv::fresh();
    let db_file = harness.db_path.clone();
    let mut app_cfg = config::AppConfig::default();
    app_cfg.tier = Some(String::from("keyword"));
    let mut reflect_args = default_args();
    reflect_args.reflect = true;
    reflect_args.dry_run = true;
    reflect_args.all_namespaces = true;
    {
        let mut sink = harness.output();
        run(&db_file, &reflect_args, &app_cfg, &mut sink)
            .await
            .unwrap();
    }
    let printed = harness.stdout_str();
    assert!(printed.contains("reflection pass report"));
}
#[cfg(feature = "sal")]
#[tokio::test]
async fn store_url_sqlite_once_text_runs_sweep() {
    // `--store-url sqlite://… --once` takes the store-backed sweep path and
    // prints a reflection report (with the no-LLM error under keyword tier).
    let mut harness = TestEnv::fresh();
    let db_file = harness.db_path.clone();
    let mut app_cfg = config::AppConfig::default();
    app_cfg.tier = Some(String::from("keyword"));
    let mut sweep_args = default_args();
    sweep_args.once = true;
    sweep_args.dry_run = true;
    sweep_args.store_url = Some(format!("sqlite://{}", db_file.display()));
    {
        let mut sink = harness.output();
        run(&db_file, &sweep_args, &app_cfg, &mut sink)
            .await
            .unwrap();
    }
    let printed = harness.stdout_str();
    assert!(printed.contains("reflection pass report"));
    assert!(printed.contains("no LLM client configured"));
}
#[cfg(feature = "sal")]
#[tokio::test]
async fn store_url_sqlite_once_json_runs_sweep() {
    // JSON rendering of the store-backed one-shot sweep.
    let mut harness = TestEnv::fresh();
    let db_file = harness.db_path.clone();
    let mut app_cfg = config::AppConfig::default();
    app_cfg.tier = Some(String::from("keyword"));
    let mut sweep_args = default_args();
    sweep_args.once = true;
    sweep_args.json = true;
    sweep_args.dry_run = true;
    sweep_args.store_url = Some(format!("sqlite://{}", db_file.display()));
    {
        let mut sink = harness.output();
        run(&db_file, &sweep_args, &app_cfg, &mut sink)
            .await
            .unwrap();
    }
    let parsed: serde_json::Value = serde_json::from_str(harness.stdout_str().trim()).unwrap();
    let reported = parsed["errors"].as_array().unwrap();
    let mentions_llm = reported
        .iter()
        .any(|e| e.as_str().unwrap().contains("no LLM"));
    assert!(mentions_llm);
    assert!(parsed["dry_run"].as_bool().unwrap());
}
/// Minimal `AutonomyLlm` stand-in: no tags, never a contradiction, and a
/// fixed summary string.
#[cfg(feature = "sal")]
struct CovStubLlm;

#[cfg(feature = "sal")]
impl crate::autonomy::AutonomyLlm for CovStubLlm {
    fn auto_tag(&self, _title: &str, _content: &str) -> anyhow::Result<Vec<String>> {
        Ok(vec![])
    }

    fn detect_contradiction(&self, _a: &str, _b: &str) -> anyhow::Result<bool> {
        Ok(false)
    }

    fn summarize_memories(&self, _memories: &[(String, String)]) -> anyhow::Result<String> {
        Ok(String::from("stub reflection summary"))
    }
}
/// Drives `run_reflection_pass_with_optional_llm` with a stub LLM on a fresh
/// SQLite store and expects a dry-run report without errors, then exercises
/// each stub method directly.
#[cfg(feature = "sal")]
#[tokio::test]
async fn reflection_helper_with_stub_llm_runs_with_llm_branch() {
    use crate::autonomy::AutonomyLlm;

    let env = TestEnv::fresh();
    let sqlite = crate::store::sqlite::SqliteStore::open(&env.db_path).expect("open store");
    let canned = CovStubLlm;
    let depth_limit = default_args().max_depth;
    let llm_ref: &dyn AutonomyLlm = &canned;

    // NOTE(review): the `true` argument appears to be the dry-run flag, since
    // the report's `dry_run` is asserted right after — confirm against the
    // helper's signature.
    let outcome = run_reflection_pass_with_optional_llm(
        &sqlite,
        Some(llm_ref),
        None,
        None,
        depth_limit,
        true,
        |_ns: &str| true,
    )
    .await;

    assert!(outcome.dry_run);
    assert!(
        outcome.errors.is_empty(),
        "unexpected errors: {:?}",
        outcome.errors
    );

    // Call the stub's methods directly and pin their canned answers.
    let tags = canned.auto_tag("t", "c").unwrap();
    assert!(tags.is_empty());
    let contradicts = canned.detect_contradiction("a", "b").unwrap();
    assert!(!contradicts);
    let pairs = [("a".to_string(), "b".to_string())];
    assert_eq!(
        canned.summarize_memories(&pairs).unwrap(),
        "stub reflection summary"
    );
}
/// Without an LLM, `run_reflection_pass_with_optional_llm` must return a
/// non-dry-run report whose errors mention the missing client.
#[cfg(feature = "sal")]
#[tokio::test]
async fn reflection_helper_with_none_llm_reports_configured_error() {
    let env = TestEnv::fresh();
    let sqlite = crate::store::sqlite::SqliteStore::open(&env.db_path).expect("open store");

    // NOTE(review): the `false` argument appears to be the dry-run flag, since
    // `!dry_run` is asserted on the report — confirm against the helper's
    // signature.
    let outcome = run_reflection_pass_with_optional_llm(
        &sqlite,
        None,
        None,
        Some("ns"),
        None,
        false,
        |_ns: &str| true,
    )
    .await;

    assert!(!outcome.dry_run);
    let missing_llm_reported = outcome
        .errors
        .iter()
        .any(|message| message.contains("no LLM client configured"));
    assert!(missing_llm_reported);
}
/// Daemon-mode run against a `sqlite://` `--store-url` must return `Ok` after
/// the process receives SIGINT.
///
/// A background task raises SIGINT against the current process after a 3000 ms
/// delay while `run` executes under a 90-second timeout; the test fails if the
/// timeout elapses or if `run` returns an error.
#[cfg(all(feature = "sal", unix))]
#[tokio::test(flavor = "multi_thread")]
async fn store_url_sqlite_daemon_loop_returns_on_shutdown() {
use std::path::PathBuf;
let env = TestEnv::fresh();
let db: PathBuf = env.db_path.clone();
let mut cfg = config::AppConfig::default();
cfg.tier = Some("keyword".to_string());
let mut args = default_args();
args.store_url = Some(format!("sqlite://{}", db.display()));
args.daemon = true;
// The 60 s interval is far longer than the 3 s signal delay.
// NOTE(review): presumably this means the signal arrives while the loop is
// waiting between cycles — confirm against the daemon loop.
args.interval_secs = 60;
args.dry_run = true;
// The kicker must be spawned before `run` is awaited, otherwise nothing
// would ever deliver the signal.
// NOTE(review): the 3000 ms delay presumably gives `run` time to install
// its SIGINT handling before the signal lands — confirm.
let kicker = tokio::spawn(async {
tokio::time::sleep(std::time::Duration::from_millis(3000)).await;
// SAFETY: `getpid` and `kill` are plain libc calls that take integer
// arguments only; no pointers or borrowed memory cross the FFI boundary.
unsafe {
libc::kill(libc::getpid(), libc::SIGINT);
}
});
// Output goes to in-memory buffers that the test never inspects.
let mut stdout = Vec::<u8>::new();
let mut stderr = Vec::<u8>::new();
let mut out = crate::cli::CliOutput::from_std(&mut stdout, &mut stderr);
// The timeout turns a daemon that never stops into a test failure
// instead of a hang.
let res = tokio::time::timeout(
std::time::Duration::from_secs(90),
run(&db, &args, &cfg, &mut out),
)
.await;
// Wait for the kicker task to finish; its join result is ignored.
let _ = kicker.await;
// Outer `Result` is the timeout outcome, inner `Result` is what `run` returned.
assert!(res.is_ok(), "SAL daemon did not return within timeout");
assert!(res.unwrap().is_ok());
}
/// `print_reflection_report` must render the banner, the counter labels, each
/// error as a `- ` bullet, and a proposal line carrying the namespace and the
/// number of source ids.
#[test]
fn print_reflection_report_emits_proposals_and_errors() {
    use crate::curator::reflection_pass::{DryRunProposal, ReflectionPassReport};

    // One proposal with three source ids, matching the `sources=3` check below.
    let proposal = DryRunProposal {
        namespace: String::from("app"),
        proposed_title: String::from("[reflection] pattern"),
        source_ids: ["a", "b", "c"]
            .iter()
            .map(|id| (*id).to_string())
            .collect(),
    };
    let report = ReflectionPassReport {
        started_at: "2026-01-01T00:00:00Z".into(),
        completed_at: "2026-01-01T00:00:01Z".into(),
        namespaces_visited: 2,
        observations_scanned: 5,
        clusters_formed: 1,
        clusters_eligible: 1,
        reflections_persisted: 0,
        depth_refusals: 0,
        errors: vec![String::from("a problem")],
        dry_run_proposals: vec![proposal],
        dry_run: true,
    };

    let mut out_buf: Vec<u8> = Vec::new();
    let mut err_buf: Vec<u8> = Vec::new();
    // Keep the writer in its own scope so the buffers can be consumed after.
    {
        let mut sink = crate::cli::CliOutput::from_std(&mut out_buf, &mut err_buf);
        print_reflection_report(&report, &mut sink).unwrap();
    }

    let text = String::from_utf8(out_buf).unwrap();
    assert!(text.contains("reflection pass report"));
    assert!(text.contains("namespaces_visited:"));
    assert!(text.contains("observations_scanned:"));
    assert!(text.contains("- a problem"));
    assert!(text.contains("proposal: ns='app'"));
    assert!(text.contains("sources=3"));
}
/// Smoke test: calling `load_curator_keypair_best_effort` must not panic; the
/// returned value is not inspected.
#[test]
fn load_curator_keypair_best_effort_returns_some_or_none() {
    let _keypair = load_curator_keypair_best_effort();
}
/// Smoke test: `build_curator_llm` must not panic for the `Autonomous` tier;
/// the returned value is not inspected.
#[test]
fn build_curator_llm_with_autonomous_tier() {
    crate::cli::test_utils::ensure_no_config_env();
    let tier = config::FeatureTier::Autonomous;
    let _client = build_curator_llm(tier);
}
/// `--reflect --all-namespaces --dry-run` on a database seeded with one
/// memory in namespace `myns` must still print the reflection report banner.
#[cfg(feature = "sal")]
#[tokio::test]
async fn reflect_with_seeded_observations_and_no_llm() {
    let mut harness = TestEnv::fresh();
    let db_path = harness.db_path.clone();
    let _seeded_id = crate::cli::test_utils::seed_memory(&db_path, "myns", "T", "C");

    let mut app_cfg = config::AppConfig::default();
    app_cfg.tier = Some(String::from("keyword"));

    let mut cli = default_args();
    cli.dry_run = true;
    cli.all_namespaces = true;
    cli.reflect = true;

    // Keep the writer in its own scope so it is dropped before the captured
    // stdout is read back.
    {
        let mut sink = harness.output();
        run(&db_path, &cli, &app_cfg, &mut sink).await.unwrap();
    }

    let printed = harness.stdout_str();
    assert!(printed.contains("reflection pass report"));
}
/// `run_rollback` called with default args (no rollback mode set) must return
/// an error whose message names the missing flags.
#[test]
fn qual_2_run_rollback_returns_error_when_no_mode_set() {
    let env = TestEnv::fresh();
    let db_path = env.db_path.clone();
    let cli = default_args();

    let mut out_buf: Vec<u8> = Vec::new();
    let mut err_buf: Vec<u8> = Vec::new();
    let mut sink = CliOutput::from_std(&mut out_buf, &mut err_buf);

    // An `Ok` result fails the test with the same message the assertion used.
    let err = match run_rollback(&db_path, &cli, &mut sink) {
        Err(err) => err,
        Ok(_) => panic!(
            "run_rollback must return Err when both --rollback and --rollback-last are None"
        ),
    };

    let msg = err.to_string();
    assert!(
        msg.contains("run_rollback entered without --rollback or --rollback-last"),
        "unexpected error message: {msg}"
    );
}
}