Skip to main content

ai_memory/cli/
curator.rs

1// Copyright 2026 AlphaOne LLC
2// SPDX-License-Identifier: Apache-2.0
3
4//! `cmd_curator` migration. The daemon-mode body delegates to
5//! `daemon_runtime::run_curator_daemon_with_primitives` (W3 work);
6//! this module owns only the outer wrapper and the report printer.
7
8use crate::cli::CliOutput;
9use crate::{autonomy, config, curator, db, llm};
10use anyhow::{Context, Result};
11use clap::Args;
12use std::path::Path;
13
14#[derive(Args)]
15#[allow(clippy::struct_excessive_bools)]
16pub struct CuratorArgs {
17    /// Run exactly one sweep and exit. Mutually exclusive with --daemon.
18    #[arg(long, conflicts_with = "daemon")]
19    pub once: bool,
20    /// Loop forever, sleeping --interval-secs between sweeps. SIGINT /
21    /// SIGTERM trigger a clean shutdown between cycles.
22    #[arg(long)]
23    pub daemon: bool,
24    /// Seconds between daemon sweeps. Clamped to [60, 86400].
25    #[arg(long, default_value_t = 3600)]
26    pub interval_secs: u64,
27    /// Hard cap on LLM-invoking operations per cycle.
28    #[arg(long, default_value_t = 100)]
29    pub max_ops: usize,
30    /// Emit the report without persisting any metadata changes.
31    #[arg(long)]
32    pub dry_run: bool,
33    /// Only curate memories in these namespaces. Repeat flag for multiple.
34    #[arg(long = "include-namespace")]
35    pub include_namespaces: Vec<String>,
36    /// Exclude these namespaces from curation. Repeat flag for multiple.
37    #[arg(long = "exclude-namespace")]
38    pub exclude_namespaces: Vec<String>,
39    /// Print the report as JSON rather than a human-readable summary.
40    #[arg(long)]
41    pub json: bool,
42    /// Reverse rollback-log entries instead of running a sweep. Accepts
43    /// a specific rollback-memory id, or `--last N` for the most recent.
44    /// Mutually exclusive with `--once` and `--daemon`.
45    #[arg(long, conflicts_with_all = ["once", "daemon"])]
46    pub rollback: Option<String>,
47    /// With `--rollback`, reverse the N most recent rollback-log entries
48    /// instead of a single id.
49    #[arg(long)]
50    pub rollback_last: Option<usize>,
51}
52
53fn build_curator_llm(tier: config::FeatureTier) -> Option<llm::OllamaClient> {
54    let llm_model = tier.config().llm_model?;
55    let model = llm_model.ollama_model_id().to_string();
56    llm::OllamaClient::new(&model).ok()
57}
58
59fn print_curator_report(r: &curator::CuratorReport, out: &mut CliOutput<'_>) -> Result<()> {
60    writeln!(out.stdout, "curator cycle report")?;
61    writeln!(out.stdout, "  started_at:        {}", r.started_at)?;
62    writeln!(out.stdout, "  completed_at:      {}", r.completed_at)?;
63    writeln!(out.stdout, "  duration_ms:       {}", r.cycle_duration_ms)?;
64    writeln!(out.stdout, "  memories_scanned:  {}", r.memories_scanned)?;
65    writeln!(out.stdout, "  memories_eligible: {}", r.memories_eligible)?;
66    writeln!(
67        out.stdout,
68        "  operations:        {}",
69        r.operations_attempted
70    )?;
71    writeln!(out.stdout, "  auto_tagged:       {}", r.auto_tagged)?;
72    writeln!(
73        out.stdout,
74        "  contradictions:    {}",
75        r.contradictions_found
76    )?;
77    writeln!(
78        out.stdout,
79        "  skipped (cap):     {}",
80        r.operations_skipped_cap
81    )?;
82    writeln!(out.stdout, "  errors:            {}", r.errors.len())?;
83    writeln!(out.stdout, "  dry_run:           {}", r.dry_run)?;
84    for e in &r.errors {
85        writeln!(out.stdout, "    - {e}")?;
86    }
87    Ok(())
88}
89
90/// `curator` handler. Daemon-mode delegates to `daemon_runtime`.
91pub async fn run(
92    db_path: &Path,
93    args: &CuratorArgs,
94    app_config: &config::AppConfig,
95    out: &mut CliOutput<'_>,
96) -> Result<()> {
97    if args.rollback.is_some() || args.rollback_last.is_some() {
98        return run_rollback(db_path, args, out);
99    }
100
101    if !args.once && !args.daemon {
102        anyhow::bail!("curator requires --once, --daemon, --rollback <id>, or --rollback-last N");
103    }
104
105    let cfg = curator::CuratorConfig {
106        interval_secs: args.interval_secs,
107        max_ops_per_cycle: args.max_ops,
108        dry_run: args.dry_run,
109        include_namespaces: args.include_namespaces.clone(),
110        exclude_namespaces: args.exclude_namespaces.clone(),
111    };
112
113    let feature_tier = app_config.effective_tier(None);
114    let llm = build_curator_llm(feature_tier);
115
116    if args.once {
117        let conn = db::open(db_path)?;
118        let report = curator::run_once(&conn, llm.as_ref(), &cfg)?;
119        if args.json {
120            writeln!(out.stdout, "{}", serde_json::to_string_pretty(&report)?)?;
121        } else {
122            print_curator_report(&report, out)?;
123        }
124        return Ok(());
125    }
126
127    // Daemon mode — delegate to daemon_runtime.
128    let shutdown = std::sync::Arc::new(tokio::sync::Notify::new());
129    let shutdown_for_signal = shutdown.clone();
130    tokio::spawn(async move {
131        let _ = tokio::signal::ctrl_c().await;
132        shutdown_for_signal.notify_one();
133    });
134
135    let ollama_model = feature_tier
136        .config()
137        .llm_model
138        .map(|m| m.ollama_model_id().to_string());
139
140    crate::daemon_runtime::run_curator_daemon_with_primitives(
141        db_path.to_path_buf(),
142        args.interval_secs,
143        args.max_ops,
144        args.dry_run,
145        args.include_namespaces.clone(),
146        args.exclude_namespaces.clone(),
147        ollama_model,
148        shutdown,
149    )
150    .await
151}
152
153fn run_rollback(db_path: &Path, args: &CuratorArgs, out: &mut CliOutput<'_>) -> Result<()> {
154    let conn = db::open(db_path)?;
155
156    if let Some(id) = &args.rollback {
157        let Some(mem) = db::get(&conn, id)? else {
158            anyhow::bail!("rollback entry {id} not found");
159        };
160        let entry: autonomy::RollbackEntry = serde_json::from_str(&mem.content)
161            .context("rollback entry content is not a valid RollbackEntry JSON")?;
162        let applied = autonomy::reverse_rollback_entry(&conn, &entry)?;
163        let mut tags = mem.tags.clone();
164        if !tags.iter().any(|t| t == "_reversed") {
165            tags.push("_reversed".to_string());
166            db::update(
167                &conn,
168                &mem.id,
169                None,
170                None,
171                None,
172                None,
173                Some(&tags),
174                None,
175                None,
176                None,
177                None,
178            )?;
179        }
180        writeln!(
181            out.stdout,
182            "rollback {id}: {}",
183            if applied { "applied" } else { "no-op" }
184        )?;
185        return Ok(());
186    }
187
188    if let Some(n) = args.rollback_last {
189        let log = db::list(
190            &conn,
191            Some("_curator/rollback"),
192            None,
193            n.max(1),
194            0,
195            None,
196            None,
197            None,
198            None,
199            None,
200        )?;
201        let mut reversed = 0usize;
202        for mem in &log {
203            if mem.tags.iter().any(|t| t == "_reversed") {
204                continue;
205            }
206            let Ok(entry) = serde_json::from_str::<autonomy::RollbackEntry>(&mem.content) else {
207                continue;
208            };
209            let applied = autonomy::reverse_rollback_entry(&conn, &entry)?;
210            if applied {
211                reversed += 1;
212                let mut tags = mem.tags.clone();
213                tags.push("_reversed".to_string());
214                db::update(
215                    &conn,
216                    &mem.id,
217                    None,
218                    None,
219                    None,
220                    None,
221                    Some(&tags),
222                    None,
223                    None,
224                    None,
225                    None,
226                )?;
227            }
228        }
229        writeln!(out.stdout, "reversed {reversed} rollback entries")?;
230        return Ok(());
231    }
232
233    unreachable!("run_rollback entered without --rollback or --rollback-last");
234}
235
236#[cfg(test)]
237mod tests {
238    use super::*;
239    use crate::cli::test_utils::TestEnv;
240
241    fn default_args() -> CuratorArgs {
242        CuratorArgs {
243            once: false,
244            daemon: false,
245            interval_secs: 3600,
246            max_ops: 100,
247            dry_run: false,
248            include_namespaces: Vec::new(),
249            exclude_namespaces: Vec::new(),
250            json: false,
251            rollback: None,
252            rollback_last: None,
253        }
254    }
255
256    #[tokio::test]
257    async fn test_curator_requires_mode() {
258        let mut env = TestEnv::fresh();
259        let db = env.db_path.clone();
260        let cfg = config::AppConfig::default();
261        let args = default_args();
262        let mut out = env.output();
263        let res = run(&db, &args, &cfg, &mut out).await;
264        assert!(res.is_err());
265        assert!(
266            res.unwrap_err()
267                .to_string()
268                .contains("--once, --daemon, --rollback")
269        );
270    }
271
272    #[tokio::test]
273    async fn test_curator_once_runs_single_sweep_text() {
274        let mut env = TestEnv::fresh();
275        let db = env.db_path.clone();
276        let cfg = config::AppConfig::default();
277        let mut args = default_args();
278        args.once = true;
279        args.dry_run = true;
280        {
281            let mut out = env.output();
282            run(&db, &args, &cfg, &mut out).await.unwrap();
283        }
284        assert!(env.stdout_str().contains("curator cycle report"));
285    }
286
287    #[tokio::test]
288    async fn test_curator_once_json_format() {
289        let mut env = TestEnv::fresh();
290        let db = env.db_path.clone();
291        let cfg = config::AppConfig::default();
292        let mut args = default_args();
293        args.once = true;
294        args.json = true;
295        args.dry_run = true;
296        {
297            let mut out = env.output();
298            run(&db, &args, &cfg, &mut out).await.unwrap();
299        }
300        let v: serde_json::Value = serde_json::from_str(env.stdout_str().trim()).unwrap();
301        assert!(v["dry_run"].as_bool().unwrap());
302    }
303
304    #[tokio::test]
305    async fn test_curator_dry_run_skips_writes() {
306        let mut env = TestEnv::fresh();
307        let db = env.db_path.clone();
308        let cfg = config::AppConfig::default();
309        let mut args = default_args();
310        args.once = true;
311        args.dry_run = true;
312        {
313            let mut out = env.output();
314            run(&db, &args, &cfg, &mut out).await.unwrap();
315        }
316        // Report mentions dry_run flag.
317        let s = env.stdout_str();
318        assert!(s.contains("dry_run:") || s.contains("\"dry_run\""));
319    }
320
321    #[tokio::test]
322    async fn test_curator_include_namespaces_filter() {
323        let mut env = TestEnv::fresh();
324        let db = env.db_path.clone();
325        let cfg = config::AppConfig::default();
326        let mut args = default_args();
327        args.once = true;
328        args.dry_run = true;
329        args.include_namespaces = vec!["only-this-ns".to_string()];
330        {
331            let mut out = env.output();
332            run(&db, &args, &cfg, &mut out).await.unwrap();
333        }
334        // No memories — operations attempted should be 0.
335        assert!(env.stdout_str().contains("operations:"));
336    }
337
338    #[tokio::test]
339    async fn test_curator_exclude_namespaces_filter() {
340        let mut env = TestEnv::fresh();
341        let db = env.db_path.clone();
342        let cfg = config::AppConfig::default();
343        let mut args = default_args();
344        args.once = true;
345        args.dry_run = true;
346        args.exclude_namespaces = vec!["skip-me".to_string()];
347        {
348            let mut out = env.output();
349            run(&db, &args, &cfg, &mut out).await.unwrap();
350        }
351        assert!(env.stdout_str().contains("curator cycle report"));
352    }
353
354    #[tokio::test]
355    async fn test_curator_max_ops_cap_respected() {
356        let mut env = TestEnv::fresh();
357        let db = env.db_path.clone();
358        let cfg = config::AppConfig::default();
359        let mut args = default_args();
360        args.once = true;
361        args.dry_run = true;
362        args.max_ops = 0; // immediately at cap
363        {
364            let mut out = env.output();
365            run(&db, &args, &cfg, &mut out).await.unwrap();
366        }
367        assert!(env.stdout_str().contains("operations:"));
368    }
369
370    #[tokio::test]
371    async fn test_curator_rollback_id_not_found() {
372        let mut env = TestEnv::fresh();
373        let db = env.db_path.clone();
374        let cfg = config::AppConfig::default();
375        let mut args = default_args();
376        args.rollback = Some("00000000-0000-0000-0000-000000000000".to_string());
377        let mut out = env.output();
378        let res = run(&db, &args, &cfg, &mut out).await;
379        assert!(res.is_err());
380        assert!(res.unwrap_err().to_string().contains("rollback entry"));
381    }
382
383    #[tokio::test]
384    async fn test_curator_rollback_last_zero_entries() {
385        let mut env = TestEnv::fresh();
386        let db = env.db_path.clone();
387        let cfg = config::AppConfig::default();
388        let mut args = default_args();
389        args.rollback_last = Some(5);
390        {
391            let mut out = env.output();
392            run(&db, &args, &cfg, &mut out).await.unwrap();
393        }
394        // No rollback log entries; should report 0.
395        assert!(env.stdout_str().contains("reversed 0"));
396    }
397}