1pub(crate) mod candidates;
36pub(crate) mod cluster;
37pub(crate) mod compaction;
38pub(crate) mod persist;
39pub(crate) mod pipeline;
40pub mod reflection_pass;
46
47use anyhow::Result;
48use rusqlite::Connection;
49use serde::{Deserialize, Serialize};
50use std::path::PathBuf;
51use std::sync::Arc;
52use std::sync::atomic::{AtomicBool, Ordering};
53use std::time::{Duration, Instant};
54
55#[cfg(test)]
56use crate::db;
57use crate::llm::OllamaClient;
58use crate::models::Memory;
59#[cfg(test)]
60use crate::models::Tier;
61
62use candidates::{
63 CandidateBatch, adjacent_memory, collect_candidates, needs_curation, record_truncation,
64};
65use persist::{persist_auto_tags, persist_contradiction};
66
67pub const DEFAULT_INTERVAL_SECS: u64 = crate::SECS_PER_HOUR as u64;
69
70pub const DEFAULT_MAX_OPS_PER_CYCLE: usize = 100;
72
73pub const MIN_CONTENT_LEN: usize = 50;
76
77#[derive(Debug, Clone, Serialize, Deserialize)]
84pub struct CompactionConfig {
85 #[serde(default)]
88 pub enabled: bool,
89 #[serde(default = "default_cosine_threshold")]
93 pub cosine_threshold: f32,
94 #[serde(default)]
102 pub reflection_pass: reflection_pass::ReflectionPassConfig,
103}
104
105fn default_cosine_threshold() -> f32 {
106 crate::curator::cluster::DEFAULT_COSINE_THRESHOLD
107}
108
109impl Default for CompactionConfig {
110 fn default() -> Self {
111 Self {
112 enabled: false,
113 cosine_threshold: default_cosine_threshold(),
114 reflection_pass: reflection_pass::ReflectionPassConfig::default(),
115 }
116 }
117}
118
119#[derive(Debug, Clone, Serialize, Deserialize)]
121pub struct CuratorConfig {
122 pub interval_secs: u64,
125 pub max_ops_per_cycle: usize,
127 pub dry_run: bool,
129 pub include_namespaces: Vec<String>,
131 pub exclude_namespaces: Vec<String>,
133 #[serde(default)]
136 pub compaction: CompactionConfig,
137}
138
139impl Default for CuratorConfig {
140 fn default() -> Self {
141 Self {
142 interval_secs: DEFAULT_INTERVAL_SECS,
143 max_ops_per_cycle: DEFAULT_MAX_OPS_PER_CYCLE,
144 dry_run: false,
145 include_namespaces: Vec::new(),
146 exclude_namespaces: Vec::new(),
147 compaction: CompactionConfig::default(),
148 }
149 }
150}
151
152#[derive(Debug, Clone, Default, Serialize, Deserialize)]
156pub struct CuratorReport {
157 pub started_at: String,
158 pub completed_at: String,
159 pub cycle_duration_ms: u128,
160 pub memories_scanned: usize,
161 pub memories_eligible: usize,
162 pub auto_tagged: usize,
163 pub contradictions_found: usize,
164 pub operations_attempted: usize,
165 pub operations_skipped_cap: usize,
166 #[serde(default)]
170 pub autonomy: crate::autonomy::AutonomyPassReport,
171 #[serde(default)]
180 pub personas_generated: usize,
181 pub errors: Vec<String>,
182 pub dry_run: bool,
183}
184
185impl CuratorReport {
186 fn new(dry_run: bool) -> Self {
187 let now = chrono::Utc::now().to_rfc3339();
188 Self {
189 started_at: now.clone(),
190 completed_at: now,
191 dry_run,
192 ..Self::default()
193 }
194 }
195}
196
197pub fn run_once(
211 conn: &Connection,
212 llm: Option<&OllamaClient>,
213 cfg: &CuratorConfig,
214 active_keypair: Option<&crate::identity::keypair::AgentKeypair>,
215) -> Result<CuratorReport> {
216 let mut report = CuratorReport::new(cfg.dry_run);
217 let started = Instant::now();
218
219 let CandidateBatch {
220 memories: candidates,
221 truncated,
222 } = collect_candidates(conn, cfg)?;
223 report.memories_scanned = candidates.len();
224 record_truncation(&mut report, truncated, cfg);
225
226 let eligible: Vec<&Memory> = candidates
227 .iter()
228 .filter(|m| needs_curation(m, cfg))
229 .collect();
230 report.memories_eligible = eligible.len();
231
232 let Some(llm_client) = llm else {
233 report.errors.push("no LLM client configured".to_string());
234 report.completed_at = chrono::Utc::now().to_rfc3339();
235 report.cycle_duration_ms = started.elapsed().as_millis();
236 return Ok(report);
237 };
238
239 for mem in eligible {
240 if report.operations_attempted >= cfg.max_ops_per_cycle {
241 report.operations_skipped_cap += 1;
242 continue;
243 }
244 report.operations_attempted += 1;
245
246 match llm_client.auto_tag(&mem.title, &mem.content, None) {
247 Ok(tags) if !tags.is_empty() => {
248 let tag_list: Vec<String> = tags.into_iter().take(8).collect::<Vec<String>>();
249 if !cfg.dry_run
250 && let Err(e) = persist_auto_tags(conn, mem, &tag_list)
251 {
252 report
253 .errors
254 .push(format!("auto_tag persist failed for {}: {e}", mem.id));
255 continue;
256 }
257 report.auto_tagged += 1;
258 }
259 Ok(_) => {}
260 Err(e) => {
261 report
262 .errors
263 .push(format!("auto_tag failed for {}: {e}", mem.id));
264 }
265 }
266
267 if let Ok(Some(sibling)) = adjacent_memory(conn, mem) {
272 match llm_client.detect_contradiction(&mem.content, &sibling.content) {
273 Ok(true) => {
274 if !cfg.dry_run
275 && let Err(e) = persist_contradiction(conn, mem, &sibling.id)
276 {
277 report
278 .errors
279 .push(format!("contradiction persist failed for {}: {e}", mem.id));
280 continue;
281 }
282 report.contradictions_found += 1;
283 }
284 Ok(false) => {}
285 Err(e) => {
286 report.errors.push(format!(
287 "detect_contradiction failed ({} vs {}): {e}",
288 mem.id, sibling.id
289 ));
290 }
291 }
292 }
293 }
294
295 let autonomy_candidates: Vec<crate::models::Memory> = candidates
299 .iter()
300 .filter(|m| needs_curation(m, cfg))
301 .cloned()
302 .collect();
303 let pass_report =
304 crate::autonomy::run_autonomy_passes(conn, llm_client, &autonomy_candidates, cfg.dry_run);
305 report.errors.extend(pass_report.errors.clone());
306 report.autonomy = pass_report;
307
308 persona_sweep(
325 conn,
326 llm_client,
327 &candidates,
328 cfg,
329 active_keypair,
330 &mut report,
331 );
332
333 report.completed_at = chrono::Utc::now().to_rfc3339();
334 report.cycle_duration_ms = started.elapsed().as_millis();
335
336 if !cfg.dry_run
341 && let Err(e) = crate::autonomy::persist_self_report(
342 conn,
343 report.cycle_duration_ms,
344 &report.autonomy,
345 report.auto_tagged,
346 report.contradictions_found,
347 report.personas_generated,
348 report.errors.len(),
349 )
350 {
351 tracing::warn!("self-report persist failed: {e}");
352 }
353
354 crate::metrics::curator_cycle_completed(
355 report.operations_attempted,
356 report.auto_tagged,
357 report.contradictions_found,
358 report.errors.len(),
359 );
360
361 Ok(report)
362}
363
364fn persona_sweep(
390 conn: &Connection,
391 _llm_client: &OllamaClient,
392 _candidates: &[Memory],
393 cfg: &CuratorConfig,
394 active_keypair: Option<&crate::identity::keypair::AgentKeypair>,
395 report: &mut CuratorReport,
396) {
397 let Some(keypair) = active_keypair else {
398 return;
399 };
400
401 use std::collections::BTreeSet;
415 let limit = (cfg.max_ops_per_cycle.saturating_mul(2)).max(64);
416 let mut entity_pairs: BTreeSet<(String, String)> = BTreeSet::new();
417 let scan_result = (|| -> Result<()> {
418 let mut stmt = conn.prepare(
419 "SELECT mentioned_entity_id, namespace
420 FROM memories
421 WHERE memory_kind = 'reflection'
422 AND mentioned_entity_id IS NOT NULL
423 AND namespace NOT LIKE '\\_%' ESCAPE '\\'
424 ORDER BY created_at DESC
425 LIMIT ?1",
426 )?;
427 let rows = stmt.query_map(rusqlite::params![limit as i64], |r| {
428 Ok((r.get::<_, String>(0)?, r.get::<_, String>(1)?))
429 })?;
430 for row in rows {
431 let (eid, ns) = row?;
432 entity_pairs.insert((eid, ns));
433 }
434 Ok(())
435 })();
436 if let Err(e) = scan_result {
437 report.errors.push(format!(
438 "persona_sweep: scan for mentioned_entity_id failed: {e}"
439 ));
440 return;
441 }
442
443 if entity_pairs.is_empty() {
444 return;
445 }
446
447 use crate::persona::{PersonaConfig, PersonaGenerator, get_latest_persona};
450 let config = PersonaConfig::default();
451 let generator = PersonaGenerator::new(conn, _llm_client, Some(keypair), config);
452
453 for (entity_id, namespace) in entity_pairs {
454 if report.operations_attempted >= cfg.max_ops_per_cycle {
455 report.operations_skipped_cap += 1;
456 continue;
457 }
458
459 match get_latest_persona(conn, &entity_id, &namespace) {
467 Ok(Some(_)) => continue,
468 Ok(None) => {}
469 Err(e) => {
470 report.errors.push(format!(
471 "persona_sweep: get_latest_persona failed for ({entity_id}, {namespace}): {e}"
472 ));
473 continue;
474 }
475 }
476
477 report.operations_attempted += 1;
478
479 if cfg.dry_run {
480 report.personas_generated += 1;
485 continue;
486 }
487
488 match generator.generate(&entity_id, &namespace) {
489 Ok(_persona) => {
490 report.personas_generated += 1;
491 }
492 Err(e) => {
493 report.errors.push(format!(
494 "persona_sweep: generate failed for ({entity_id}, {namespace}): {e}"
495 ));
496 }
497 }
498 }
499}
500
501#[allow(clippy::needless_pass_by_value)]
507#[allow(dead_code)] pub fn run_daemon(
509 db_path: PathBuf,
510 llm: Option<Arc<OllamaClient>>,
511 cfg: CuratorConfig,
512 shutdown: Arc<AtomicBool>,
513 active_keypair: Option<Arc<crate::identity::keypair::AgentKeypair>>,
521) {
522 let interval = cfg.interval_secs.clamp(60, crate::SECS_PER_DAY as u64);
523 tracing::info!(
524 "curator daemon started (interval={}s, max_ops={}, dry_run={}, auto_persona={})",
525 interval,
526 cfg.max_ops_per_cycle,
527 cfg.dry_run,
528 active_keypair.is_some()
529 );
530
531 while !shutdown.load(Ordering::Relaxed) {
532 match Connection::open(&db_path) {
533 Ok(conn) => {
534 let llm_ref = llm.as_deref();
535 let kp_ref = active_keypair.as_deref();
536 match run_once(&conn, llm_ref, &cfg, kp_ref) {
537 Ok(report) => tracing::info!(
538 "curator cycle: scanned={} eligible={} tagged={} contradictions={} personas={} errors={} ({}ms, dry_run={})",
539 report.memories_scanned,
540 report.memories_eligible,
541 report.auto_tagged,
542 report.contradictions_found,
543 report.personas_generated,
544 report.errors.len(),
545 report.cycle_duration_ms,
546 report.dry_run
547 ),
548 Err(e) => tracing::error!("curator cycle errored: {e}"),
549 }
550 }
551 Err(e) => tracing::error!("curator could not open db {}: {e}", db_path.display()),
552 }
553
554 let deadline = Instant::now() + Duration::from_secs(interval);
555 while Instant::now() < deadline {
556 if shutdown.load(Ordering::Relaxed) {
557 break;
558 }
559 std::thread::sleep(Duration::from_millis(500));
560 }
561 }
562
563 tracing::info!("curator daemon shutdown");
564}
565
566#[cfg(test)]
567mod tests {
568 use super::candidates::{
573 adjacent_memory, collect_candidates, needs_curation, record_truncation,
574 };
575 use super::persist::{persist_auto_tags, persist_contradiction};
576 use super::*;
577
#[test]
fn default_config_has_sane_values() {
    // Pull the fields apart once, then check each against its expected default.
    let CuratorConfig {
        interval_secs,
        max_ops_per_cycle,
        dry_run,
        include_namespaces,
        exclude_namespaces,
        ..
    } = CuratorConfig::default();

    assert_eq!(interval_secs, DEFAULT_INTERVAL_SECS);
    assert_eq!(max_ops_per_cycle, DEFAULT_MAX_OPS_PER_CYCLE);
    assert!(!dry_run);
    assert!(include_namespaces.is_empty());
    assert!(exclude_namespaces.is_empty());
}
587
#[test]
fn needs_curation_skips_internal_namespaces() {
    // 100 characters of content and empty metadata; the namespace starts with `_`.
    let stamp = "2026-01-01T00:00:00Z";
    let internal = Memory {
        id: String::from("m1"),
        tier: Tier::Mid,
        namespace: String::from("_messages/alice"),
        title: String::from("t"),
        content: "a".repeat(100),
        tags: Vec::new(),
        priority: 5,
        confidence: 1.0,
        source: String::from("test"),
        access_count: 0,
        created_at: stamp.to_owned(),
        updated_at: stamp.to_owned(),
        last_accessed_at: None,
        expires_at: None,
        metadata: serde_json::json!({}),
        reflection_depth: 0,
        memory_kind: crate::models::MemoryKind::Observation,
        entity_id: None,
        persona_version: None,
        citations: Vec::new(),
        source_uri: None,
        source_span: None,
        confidence_source: crate::models::ConfidenceSource::CallerProvided,
        confidence_signals: None,
        confidence_decayed_at: None,
        version: 1,
    };

    let cfg = CuratorConfig::default();
    assert!(!needs_curation(&internal, &cfg));
}
620
#[test]
fn needs_curation_skips_short_content() {
    // Ordinary namespace, but the content is only five characters long.
    let stamp = "2026-01-01T00:00:00Z";
    let tiny = Memory {
        id: String::from("m1"),
        tier: Tier::Mid,
        namespace: String::from("app"),
        title: String::from("t"),
        content: String::from("short"),
        tags: Vec::new(),
        priority: 5,
        confidence: 1.0,
        source: String::from("test"),
        access_count: 0,
        created_at: stamp.to_owned(),
        updated_at: stamp.to_owned(),
        last_accessed_at: None,
        expires_at: None,
        metadata: serde_json::json!({}),
        reflection_depth: 0,
        memory_kind: crate::models::MemoryKind::Observation,
        entity_id: None,
        persona_version: None,
        citations: Vec::new(),
        source_uri: None,
        source_span: None,
        confidence_source: crate::models::ConfidenceSource::CallerProvided,
        confidence_signals: None,
        confidence_decayed_at: None,
        version: 1,
    };

    let cfg = CuratorConfig::default();
    assert!(!needs_curation(&tiny, &cfg));
}
653
#[test]
fn needs_curation_skips_already_tagged() {
    // Metadata already carries an `auto_tags` array.
    let stamp = "2026-01-01T00:00:00Z";
    let tagged = Memory {
        id: String::from("m1"),
        tier: Tier::Long,
        namespace: String::from("app"),
        title: String::from("t"),
        content: "a".repeat(100),
        tags: Vec::new(),
        priority: 5,
        confidence: 1.0,
        source: String::from("test"),
        access_count: 0,
        created_at: stamp.to_owned(),
        updated_at: stamp.to_owned(),
        last_accessed_at: None,
        expires_at: None,
        metadata: serde_json::json!({"auto_tags":["x","y"]}),
        reflection_depth: 0,
        memory_kind: crate::models::MemoryKind::Observation,
        entity_id: None,
        persona_version: None,
        citations: Vec::new(),
        source_uri: None,
        source_span: None,
        confidence_source: crate::models::ConfidenceSource::CallerProvided,
        confidence_signals: None,
        confidence_decayed_at: None,
        version: 1,
    };

    let cfg = CuratorConfig::default();
    assert!(!needs_curation(&tagged, &cfg));
}
686
#[test]
fn needs_curation_respects_include_list() {
    let stamp = "2026-01-01T00:00:00Z";
    let in_app = Memory {
        id: String::from("m1"),
        tier: Tier::Long,
        namespace: String::from("app"),
        title: String::from("t"),
        content: "a".repeat(100),
        tags: Vec::new(),
        priority: 5,
        confidence: 1.0,
        source: String::from("test"),
        access_count: 0,
        created_at: stamp.to_owned(),
        updated_at: stamp.to_owned(),
        last_accessed_at: None,
        expires_at: None,
        metadata: serde_json::json!({}),
        reflection_depth: 0,
        memory_kind: crate::models::MemoryKind::Observation,
        entity_id: None,
        persona_version: None,
        citations: Vec::new(),
        source_uri: None,
        source_span: None,
        confidence_source: crate::models::ConfidenceSource::CallerProvided,
        confidence_signals: None,
        confidence_decayed_at: None,
        version: 1,
    };

    // An include list naming a different namespace rejects the memory.
    let other_only = CuratorConfig {
        include_namespaces: vec![String::from("other")],
        ..CuratorConfig::default()
    };
    assert!(!needs_curation(&in_app, &other_only));

    // Naming the memory's own namespace accepts it.
    let app_only = CuratorConfig {
        include_namespaces: vec![String::from("app")],
        ..other_only
    };
    assert!(needs_curation(&in_app, &app_only));
}
725
#[test]
fn needs_curation_respects_exclude_list() {
    let stamp = "2026-01-01T00:00:00Z";
    let noisy = Memory {
        id: String::from("m1"),
        tier: Tier::Long,
        namespace: String::from("noisy"),
        title: String::from("t"),
        content: "a".repeat(100),
        tags: Vec::new(),
        priority: 5,
        confidence: 1.0,
        source: String::from("test"),
        access_count: 0,
        created_at: stamp.to_owned(),
        updated_at: stamp.to_owned(),
        last_accessed_at: None,
        expires_at: None,
        metadata: serde_json::json!({}),
        reflection_depth: 0,
        memory_kind: crate::models::MemoryKind::Observation,
        entity_id: None,
        persona_version: None,
        citations: Vec::new(),
        source_uri: None,
        source_span: None,
        confidence_source: crate::models::ConfidenceSource::CallerProvided,
        confidence_signals: None,
        confidence_decayed_at: None,
        version: 1,
    };

    // The memory's namespace is on the exclude list.
    let mut cfg = CuratorConfig::default();
    cfg.exclude_namespaces = vec![String::from("noisy")];
    assert!(!needs_curation(&noisy, &cfg));
}
762
#[test]
fn run_once_without_llm_emits_error_but_succeeds() {
    // Empty database, no LLM: the cycle returns Ok with a recorded error.
    let db_file = tempfile::NamedTempFile::new().unwrap();
    let conn = db::open(db_file.path()).unwrap();

    let report = run_once(&conn, None, &CuratorConfig::default(), None).unwrap();

    assert_eq!(report.memories_scanned, 0);
    assert_eq!(report.memories_eligible, 0);
    assert_eq!(report.operations_attempted, 0);
    let mentions_llm = report.errors.iter().any(|msg| msg.contains("no LLM"));
    assert!(mentions_llm);
}
774
#[test]
fn report_serialises_to_json() {
    let encoded = serde_json::to_string(&CuratorReport::new(true)).unwrap();
    for key in ["dry_run", "memories_scanned"] {
        assert!(encoded.contains(key));
    }
}
782
783 fn make_test_memory(ns: &str, title: &str, content: &str) -> Memory {
787 let now = chrono::Utc::now().to_rfc3339();
788 Memory {
789 id: uuid::Uuid::new_v4().to_string(),
790 tier: Tier::Long,
791 namespace: ns.to_string(),
792 title: title.to_string(),
793 content: content.to_string(),
794 tags: vec![],
795 priority: 5,
796 confidence: 1.0,
797 source: "api".to_string(),
798 access_count: 0,
799 created_at: now.clone(),
800 updated_at: now,
801 last_accessed_at: None,
802 expires_at: None,
803 metadata: serde_json::json!({}),
804 reflection_depth: 0,
805 memory_kind: crate::models::MemoryKind::Observation,
806 entity_id: None,
807 persona_version: None,
808 citations: Vec::new(),
809 source_uri: None,
810 source_span: None,
811 confidence_source: crate::models::ConfidenceSource::CallerProvided,
812 confidence_signals: None,
813 confidence_decayed_at: None,
814 version: 1,
815 }
816 }
817
#[test]
fn persist_auto_tags_writes_metadata() {
    let db_file = tempfile::NamedTempFile::new().unwrap();
    let conn = db::open(db_file.path()).unwrap();
    let anchor = make_test_memory("curate-test", "anchor", &"a".repeat(120));
    db::insert(&conn, &anchor).unwrap();

    let wanted = [String::from("alpha"), String::from("beta")];
    persist_auto_tags(&conn, &anchor, &wanted).unwrap();

    // Re-read the row and inspect what landed in its metadata.
    let stored = db::get(&conn, &anchor.id).unwrap().unwrap();
    let tags = stored
        .metadata
        .get("auto_tags")
        .and_then(|v| v.as_array())
        .unwrap();
    assert_eq!(tags.len(), 2);
    assert_eq!(tags[0].as_str().unwrap(), "alpha");

    let curated_at = stored.metadata.get("curated_at").and_then(|v| v.as_str());
    assert!(curated_at.is_some_and(|s| !s.is_empty()));
}
846
#[test]
fn persist_auto_tags_with_empty_tag_list_still_writes_marker() {
    let db_file = tempfile::NamedTempFile::new().unwrap();
    let conn = db::open(db_file.path()).unwrap();
    let anchor = make_test_memory("curate-test", "anchor", &"a".repeat(120));
    db::insert(&conn, &anchor).unwrap();

    persist_auto_tags(&conn, &anchor, &[]).unwrap();

    // The key must exist and hold an (empty) array.
    let stored = db::get(&conn, &anchor.id).unwrap().unwrap();
    let tags = stored
        .metadata
        .get("auto_tags")
        .and_then(|v| v.as_array())
        .unwrap();
    assert!(tags.is_empty());
}
867
#[test]
fn persist_contradiction_appends_unique_ids() {
    let db_file = tempfile::NamedTempFile::new().unwrap();
    let conn = db::open(db_file.path()).unwrap();
    let anchor = make_test_memory("curate-test", "anchor", &"a".repeat(120));
    db::insert(&conn, &anchor).unwrap();

    // Record id-1, id-2, then id-1 again, re-reading the row after each write
    // so the next call sees the metadata written by the previous one.
    let mut snapshot = anchor.clone();
    for sibling_id in ["id-1", "id-2", "id-1"] {
        persist_contradiction(&conn, &snapshot, sibling_id).unwrap();
        snapshot = db::get(&conn, &anchor.id).unwrap().unwrap();
    }

    let ids = snapshot
        .metadata
        .get("confirmed_contradictions")
        .and_then(|v| v.as_array())
        .unwrap();
    assert_eq!(ids.len(), 2);
    let has = |wanted: &str| ids.iter().any(|v| v.as_str() == Some(wanted));
    assert!(has("id-1"));
    assert!(has("id-2"));
}
900
#[test]
fn adjacent_memory_returns_none_when_only_self_exists() {
    let db_file = tempfile::NamedTempFile::new().unwrap();
    let conn = db::open(db_file.path()).unwrap();
    let only = make_test_memory("solo-ns", "only", &"a".repeat(120));
    db::insert(&conn, &only).unwrap();

    // With a single row in the namespace there is nothing to pair with.
    let sibling = adjacent_memory(&conn, &only).unwrap();
    assert!(sibling.is_none());
}
912
#[test]
fn adjacent_memory_returns_some_when_sibling_present() {
    let db_file = tempfile::NamedTempFile::new().unwrap();
    let conn = db::open(db_file.path()).unwrap();

    let first = make_test_memory("dual-ns", "first", &"a".repeat(120));
    let second = make_test_memory("dual-ns", "second", &"b".repeat(120));
    for row in [&first, &second] {
        db::insert(&conn, row).unwrap();
    }

    let sibling = adjacent_memory(&conn, &first).unwrap().unwrap();
    assert_ne!(sibling.id, first.id);
    assert!(sibling.content.len() >= MIN_CONTENT_LEN);
}
928
#[test]
fn adjacent_memory_skips_short_sibling() {
    let db_file = tempfile::NamedTempFile::new().unwrap();
    let conn = db::open(db_file.path()).unwrap();

    let anchor = make_test_memory("ns-short", "anchor", &"a".repeat(120));
    // The only other row in the namespace has five characters of content.
    let tiny = make_test_memory("ns-short", "tiny-sibling", "short");
    db::insert(&conn, &anchor).unwrap();
    db::insert(&conn, &tiny).unwrap();

    let sibling = adjacent_memory(&conn, &anchor).unwrap();
    assert!(sibling.is_none());
}
943
#[test]
fn record_truncation_appends_when_truncated() {
    let mut report = CuratorReport::new(false);
    record_truncation(&mut report, true, &CuratorConfig::default());

    // Exactly one entry, and it names the truncation.
    assert_eq!(report.errors.len(), 1);
    let first = &report.errors[0];
    assert!(first.contains("collect_candidates truncated"));
}
952
#[test]
fn record_truncation_noop_when_not_truncated() {
    let mut report = CuratorReport::new(false);
    record_truncation(&mut report, false, &CuratorConfig::default());
    assert!(report.errors.is_empty());
}
960
#[test]
fn collect_candidates_returns_eligible_memories() {
    let db_file = tempfile::NamedTempFile::new().unwrap();
    let conn = db::open(db_file.path()).unwrap();

    // Three rows with 120 characters of content each.
    let body = "a".repeat(120);
    (0..3)
        .map(|i| make_test_memory("cand-ns", &format!("row-{i}"), &body))
        .for_each(|row| {
            db::insert(&conn, &row).unwrap();
        });

    let batch = collect_candidates(&conn, &CuratorConfig::default()).unwrap();
    assert!(!batch.memories.is_empty());
    assert!(!batch.truncated);
}
977
#[test]
fn run_once_with_dry_run_does_not_persist() {
    let db_file = tempfile::NamedTempFile::new().unwrap();
    let conn = db::open(db_file.path()).unwrap();
    let anchor = make_test_memory("dry-ns", "anchor", &"a".repeat(120));
    db::insert(&conn, &anchor).unwrap();

    let mut cfg = CuratorConfig::default();
    cfg.dry_run = true;

    let report = run_once(&conn, None, &cfg, None).unwrap();
    assert!(report.dry_run);

    // The stored row must not have gained auto tags.
    let stored = db::get(&conn, &anchor.id).unwrap().unwrap();
    assert!(stored.metadata.get("auto_tags").is_none());
}
997
#[test]
fn run_daemon_executes_multiple_cycles_and_respects_shutdown() {
    use std::sync::Mutex;
    use std::thread;
    use std::time::Duration;

    let tmp = tempfile::NamedTempFile::new().unwrap();
    let db_path = tmp.path().to_path_buf();

    // Seed five memories, then release the connection before the daemon
    // opens its own.
    {
        let conn = db::open(&db_path).unwrap();
        let now = chrono::Utc::now().to_rfc3339();
        for i in 0..5 {
            let seeded = Memory {
                id: format!("test-mem-{i}"),
                tier: crate::models::Tier::Mid,
                namespace: String::from("test"),
                title: format!("Memory {i}"),
                content: "x".repeat(100),
                tags: Vec::new(),
                priority: 5,
                confidence: 1.0,
                source: String::from("test"),
                access_count: 0,
                created_at: now.clone(),
                updated_at: now.clone(),
                last_accessed_at: None,
                expires_at: None,
                metadata: serde_json::json!({}),
                reflection_depth: 0,
                memory_kind: crate::models::MemoryKind::Observation,
                entity_id: None,
                persona_version: None,
                citations: Vec::new(),
                source_uri: None,
                source_span: None,
                confidence_source: crate::models::ConfidenceSource::CallerProvided,
                confidence_signals: None,
                confidence_decayed_at: None,
                version: 1,
            };
            db::insert(&conn, &seeded).unwrap();
        }
    }

    // Despite its name this is a stage marker, not a cycle counter:
    // 1 = daemon thread entered, 2 = `run_daemon` returned.
    let cycle_count = std::sync::Arc::new(Mutex::new(0));
    let stage_in_thread = std::sync::Arc::clone(&cycle_count);

    // NOTE: `run_daemon` clamps the interval to at least 60 seconds, so
    // `interval_secs: 1` does not produce a one-second cadence.
    let cfg = CuratorConfig {
        interval_secs: 1,
        max_ops_per_cycle: 50,
        dry_run: true,
        ..CuratorConfig::default()
    };

    let shutdown = std::sync::Arc::new(std::sync::atomic::AtomicBool::new(false));
    let shutdown_in_thread = std::sync::Arc::clone(&shutdown);

    let daemon_thread = thread::spawn(move || {
        *stage_in_thread.lock().unwrap() = 1;
        run_daemon(db_path, None, cfg, shutdown_in_thread, None);
        *stage_in_thread.lock().unwrap() = 2;
    });

    // Let the daemon run for a while, then ask it to stop.
    thread::sleep(Duration::from_millis(2500));
    shutdown.store(true, std::sync::atomic::Ordering::Relaxed);

    let joined = daemon_thread.join();
    assert!(
        joined.is_ok(),
        "daemon thread panicked or failed to join"
    );

    let final_stage = *cycle_count.lock().unwrap();
    assert_eq!(
        final_stage, 2,
        "daemon should have entered and exited cleanly"
    );
}
1090
1091 use std::io::{BufRead, BufReader, Read, Write};
1100 use std::net::TcpListener;
1101 use std::sync::Arc as StdArc;
1102 use std::sync::atomic::{AtomicBool as StdAtomicBool, AtomicUsize, Ordering as StdOrdering};
1103 use std::thread::JoinHandle;
1104
/// Canned behaviour for the fake Ollama HTTP server used by these tests.
#[derive(Clone)]
struct FakeOllamaCfg {
    /// Reply text for tagging requests and for `/api/generate`.
    tag_response: String,
    /// Reply text for chat requests whose body mentions "contradict".
    contradiction_answer: String,
    /// Reply text for chat requests whose body asks for a summary.
    summary_response: String,
    /// When set, chat and generate requests are answered with HTTP 500.
    chat_returns_error: bool,
}

impl Default for FakeOllamaCfg {
    /// Three tags, a "no" contradiction verdict, a fixed summary, no faults.
    fn default() -> Self {
        let tag_response = String::from("alpha\nbeta\ngamma");
        let contradiction_answer = String::from("no");
        let summary_response = String::from("consolidated summary");
        Self {
            tag_response,
            contradiction_answer,
            summary_response,
            chat_returns_error: false,
        }
    }
}
1128
/// Handle to a fake Ollama server listening on a loopback port.
struct FakeOllama {
    /// Base URL of the server, e.g. `http://127.0.0.1:<port>`.
    url: String,
    /// Flag the accept loop polls to know when to stop.
    shutdown: StdArc<StdAtomicBool>,
    /// Join handle of the accept-loop thread; taken on drop.
    handle: Option<JoinHandle<()>>,
    /// Count of chat/generate requests received.
    chat_calls: StdArc<AtomicUsize>,
}
1136
1137 impl FakeOllama {
1138 fn start(cfg: FakeOllamaCfg) -> Self {
1139 let listener = TcpListener::bind("127.0.0.1:0").expect("bind 127.0.0.1");
1140 let addr = listener.local_addr().unwrap();
1141 listener.set_nonblocking(true).unwrap();
1143 let shutdown = StdArc::new(StdAtomicBool::new(false));
1144 let chat_calls = StdArc::new(AtomicUsize::new(0));
1145 let shutdown_for_thread = shutdown.clone();
1146 let chat_calls_for_thread = chat_calls.clone();
1147 let cfg_for_thread = cfg;
1148
1149 let handle = std::thread::spawn(move || {
1150 while !shutdown_for_thread.load(StdOrdering::Relaxed) {
1151 match listener.accept() {
1152 Ok((mut stream, _peer)) => {
1153 stream.set_nonblocking(false).ok();
1154 stream
1155 .set_read_timeout(Some(std::time::Duration::from_secs(2)))
1156 .ok();
1157 let cfg = cfg_for_thread.clone();
1158 let chat_calls = chat_calls_for_thread.clone();
1159 std::thread::spawn(move || {
1160 handle_one(&mut stream, &cfg, &chat_calls);
1161 });
1162 }
1163 Err(ref e) if e.kind() == std::io::ErrorKind::WouldBlock => {
1164 std::thread::sleep(std::time::Duration::from_millis(20));
1165 }
1166 Err(_) => break,
1167 }
1168 }
1169 });
1170
1171 Self {
1172 url: format!("http://127.0.0.1:{}", addr.port()),
1173 shutdown,
1174 handle: Some(handle),
1175 chat_calls,
1176 }
1177 }
1178 }
1179
1180 impl Drop for FakeOllama {
1181 fn drop(&mut self) {
1182 self.shutdown.store(true, StdOrdering::Relaxed);
1183 if let Some(h) = self.handle.take() {
1184 let _ = h.join();
1185 }
1186 }
1187 }
1188
1189 fn handle_one(stream: &mut std::net::TcpStream, cfg: &FakeOllamaCfg, chat_calls: &AtomicUsize) {
1193 let mut reader = BufReader::new(stream.try_clone().expect("clone tcp"));
1194 let mut request_line = String::new();
1196 if reader.read_line(&mut request_line).is_err() {
1197 return;
1198 }
1199 let parts: Vec<&str> = request_line.split_whitespace().collect();
1200 if parts.len() < 2 {
1201 return;
1202 }
1203 let method = parts[0];
1204 let path = parts[1];
1205
1206 let mut content_length: usize = 0;
1208 loop {
1209 let mut header = String::new();
1210 if reader.read_line(&mut header).is_err() {
1211 return;
1212 }
1213 if header == "\r\n" || header.is_empty() {
1214 break;
1215 }
1216 let lower = header.to_ascii_lowercase();
1217 if let Some(rest) = lower.strip_prefix("content-length:") {
1218 content_length = rest.trim().parse().unwrap_or(0);
1219 }
1220 }
1221
1222 let mut body = vec![0u8; content_length];
1224 if content_length > 0 {
1225 let _ = reader.read_exact(&mut body);
1226 }
1227 let body_str = String::from_utf8_lossy(&body).to_string();
1228
1229 let (status, body): (&str, String) = if method == "GET" && path == "/api/tags" {
1230 (
1232 "200 OK",
1233 serde_json::json!({"models": [{"name": "fake-model:latest"}]}).to_string(),
1234 )
1235 } else if method == "POST" && path == "/api/chat" {
1236 chat_calls.fetch_add(1, StdOrdering::Relaxed);
1237 if cfg.chat_returns_error {
1238 (
1239 "500 Internal Server Error",
1240 "{\"error\":\"forced fault\"}".to_string(),
1241 )
1242 } else {
1243 let response = if body_str.contains("contradict") {
1245 cfg.contradiction_answer.clone()
1246 } else if body_str.contains("Summarize") || body_str.contains("summari") {
1247 cfg.summary_response.clone()
1248 } else if body_str.contains("tags") {
1249 cfg.tag_response.clone()
1250 } else {
1251 "ok".to_string()
1252 };
1253 (
1254 "200 OK",
1255 serde_json::json!({"message": {"content": response}}).to_string(),
1256 )
1257 }
1258 } else if method == "POST" && path == "/api/generate" {
1259 chat_calls.fetch_add(1, StdOrdering::Relaxed);
1266 if cfg.chat_returns_error {
1267 (
1268 "500 Internal Server Error",
1269 "{\"error\":\"forced fault\"}".to_string(),
1270 )
1271 } else {
1272 let response = cfg.tag_response.clone();
1273 (
1274 "200 OK",
1275 serde_json::json!({"response": response}).to_string(),
1276 )
1277 }
1278 } else {
1279 ("404 Not Found", "{}".to_string())
1280 };
1281
1282 let resp = format!(
1283 "HTTP/1.1 {status}\r\nContent-Type: application/json\r\nContent-Length: {}\r\nConnection: close\r\n\r\n{body}",
1284 body.len()
1285 );
1286 let _ = stream.write_all(resp.as_bytes());
1287 let _ = stream.flush();
1288 let _ = stream.shutdown(std::net::Shutdown::Write);
1289 }
1290
1291 fn ollama_for(server: &FakeOllama) -> crate::llm::OllamaClient {
1293 crate::llm::OllamaClient::new_with_url(&server.url, "fake-model")
1294 .expect("client must reach fake server")
1295 }
1296
1297 fn make_eligible_memory(ns: &str, title: &str) -> Memory {
1298 let now = chrono::Utc::now().to_rfc3339();
1299 Memory {
1300 id: uuid::Uuid::new_v4().to_string(),
1301 tier: Tier::Long,
1302 namespace: ns.to_string(),
1303 title: title.to_string(),
1304 content: "a".repeat(120),
1305 tags: vec![],
1306 priority: 5,
1307 confidence: 1.0,
1308 source: "api".to_string(),
1309 access_count: 0,
1310 created_at: now.clone(),
1311 updated_at: now,
1312 last_accessed_at: None,
1313 expires_at: None,
1314 metadata: serde_json::json!({}),
1315 reflection_depth: 0,
1316 memory_kind: crate::models::MemoryKind::Observation,
1317 entity_id: None,
1318 persona_version: None,
1319 citations: Vec::new(),
1320 source_uri: None,
1321 source_span: None,
1322 confidence_source: crate::models::ConfidenceSource::CallerProvided,
1323 confidence_signals: None,
1324 confidence_decayed_at: None,
1325 version: 1,
1326 }
1327 }
1328
#[test]
fn run_once_with_llm_tags_eligible_memories() {
    let server = FakeOllama::start(FakeOllamaCfg::default());
    let llm = ollama_for(&server);

    let db_file = tempfile::NamedTempFile::new().unwrap();
    let conn = db::open(db_file.path()).unwrap();
    let anchor = make_eligible_memory("autotag-ns", "anchor");
    db::insert(&conn, &anchor).unwrap();

    // Restrict the cycle to the namespace seeded above.
    let mut cfg = CuratorConfig::default();
    cfg.include_namespaces = vec![String::from("autotag-ns")];

    let report = run_once(&conn, Some(&llm), &cfg, None).unwrap();
    assert!(report.memories_eligible >= 1);
    assert!(report.auto_tagged >= 1, "report: {report:?}");

    // The tags must have reached the stored row.
    let stored = db::get(&conn, &anchor.id).unwrap().unwrap();
    let persisted = stored.metadata.get("auto_tags").and_then(|v| v.as_array());
    let tags = persisted.expect("auto_tags persisted");
    assert!(!tags.is_empty());
}
1360
/// With `dry_run` enabled, a cycle must flag the report as dry-run, leave the
/// seeded memory without `auto_tags`, and store nothing under
/// `_curator/reports`.
#[test]
fn run_once_with_llm_dry_run_skips_writes() {
    let namespace = "dry-llm-ns";

    let server = FakeOllama::start(FakeOllamaCfg::default());
    let llm = ollama_for(&server);

    let db_file = tempfile::NamedTempFile::new().unwrap();
    let conn = db::open(db_file.path()).unwrap();
    let seeded = make_eligible_memory(namespace, "anchor");
    db::insert(&conn, &seeded).unwrap();

    let cfg = CuratorConfig {
        include_namespaces: vec![String::from(namespace)],
        dry_run: true,
        ..CuratorConfig::default()
    };
    let report = run_once(&conn, Some(&llm), &cfg, None).unwrap();
    assert!(report.dry_run);

    // The seeded row must be untouched.
    let stored = db::get(&conn, &seeded.id).unwrap().unwrap();
    let persisted_tags = stored.metadata.get("auto_tags");
    assert!(persisted_tags.is_none());

    // No self-report may have been written either.
    let report_namespace = Some("_curator/reports");
    let stored_reports = db::list(
        &conn,
        report_namespace,
        None,
        10,
        0,
        None,
        None,
        None,
        None,
        None,
    )
    .unwrap();
    assert!(
        stored_reports.is_empty(),
        "dry-run must not persist self-report"
    );
}
1400
/// With `max_ops_per_cycle` set to 1 and three eligible rows, exactly one
/// operation is attempted and at least two are reported as skipped by the cap.
#[test]
fn run_once_max_ops_cap_respected() {
    let server = FakeOllama::start(FakeOllamaCfg::default());
    let llm = ollama_for(&server);

    let db_file = tempfile::NamedTempFile::new().unwrap();
    let conn = db::open(db_file.path()).unwrap();

    // Seed three eligible rows in the same namespace.
    for title in (0..3).map(|i| format!("anchor-{i}")) {
        let row = make_eligible_memory("capns", &title);
        db::insert(&conn, &row).unwrap();
    }

    let cfg = CuratorConfig {
        include_namespaces: vec![String::from("capns")],
        max_ops_per_cycle: 1,
        ..CuratorConfig::default()
    };
    let report = run_once(&conn, Some(&llm), &cfg, None).unwrap();

    assert_eq!(report.operations_attempted, 1);
    assert!(report.operations_skipped_cap >= 2, "report: {report:?}");
}
1424
/// `include_namespaces` restricts eligibility: both rows are scanned, only the
/// row in the included namespace is eligible, and the other row receives no
/// `auto_tags`.
#[test]
fn run_once_include_namespaces_filter() {
    let server = FakeOllama::start(FakeOllamaCfg::default());
    let llm = ollama_for(&server);

    let db_file = tempfile::NamedTempFile::new().unwrap();
    let conn = db::open(db_file.path()).unwrap();

    let inside = make_eligible_memory("included", "in");
    let outside = make_eligible_memory("not-included", "out");
    for row in [&inside, &outside] {
        db::insert(&conn, row).unwrap();
    }

    let cfg = CuratorConfig {
        include_namespaces: vec![String::from("included")],
        ..CuratorConfig::default()
    };
    let report = run_once(&conn, Some(&llm), &cfg, None).unwrap();

    assert!(report.memories_scanned >= 2);
    assert_eq!(report.memories_eligible, 1);

    // The row outside the include list must not have been tagged.
    let outside_now = db::get(&conn, &outside.id).unwrap().unwrap();
    let outside_tags = outside_now.metadata.get("auto_tags");
    assert!(outside_tags.is_none());
}
1452
/// `exclude_namespaces` removes a namespace from eligibility: both rows are
/// scanned, only the non-excluded one is eligible, and the excluded row
/// receives no `auto_tags`.
#[test]
fn run_once_exclude_namespaces_filter() {
    let server = FakeOllama::start(FakeOllamaCfg::default());
    let llm = ollama_for(&server);

    let db_file = tempfile::NamedTempFile::new().unwrap();
    let conn = db::open(db_file.path()).unwrap();

    let kept = make_eligible_memory("kept", "k");
    let dropped = make_eligible_memory("dropped", "d");
    for row in [&kept, &dropped] {
        db::insert(&conn, row).unwrap();
    }

    let cfg = CuratorConfig {
        exclude_namespaces: vec![String::from("dropped")],
        ..CuratorConfig::default()
    };
    let report = run_once(&conn, Some(&llm), &cfg, None).unwrap();

    assert!(report.memories_scanned >= 2);
    assert_eq!(report.memories_eligible, 1);

    // The excluded row must not have been tagged.
    let dropped_now = db::get(&conn, &dropped.id).unwrap().unwrap();
    let dropped_tags = dropped_now.metadata.get("auto_tags");
    assert!(dropped_tags.is_none());
}
1477
/// A cycle over a freshly opened database with no inserted rows succeeds and
/// reports zero for every counter checked here.
#[test]
fn run_once_handles_zero_candidates() {
    let server = FakeOllama::start(FakeOllamaCfg::default());
    let llm = ollama_for(&server);

    let db_file = tempfile::NamedTempFile::new().unwrap();
    let conn = db::open(db_file.path()).unwrap();

    let report = run_once(&conn, Some(&llm), &CuratorConfig::default(), None).unwrap();

    // Nothing was inserted, so nothing may be scanned, attempted or produced.
    assert_eq!(report.memories_scanned, 0);
    assert_eq!(report.memories_eligible, 0);
    assert_eq!(report.operations_attempted, 0);
    assert_eq!(report.auto_tagged, 0);
    assert_eq!(report.contradictions_found, 0);
}
1497
/// When the fake LLM is configured with a `"yes"` contradiction answer, a
/// cycle over two rows in the same namespace must report at least one
/// contradiction.
#[test]
fn run_once_records_contradictions_when_llm_affirms() {
    let namespace = "dual";

    let affirming = FakeOllamaCfg {
        contradiction_answer: String::from("yes"),
        ..FakeOllamaCfg::default()
    };
    let server = FakeOllama::start(affirming);
    let llm = ollama_for(&server);

    let db_file = tempfile::NamedTempFile::new().unwrap();
    let conn = db::open(db_file.path()).unwrap();

    let first = make_eligible_memory(namespace, "first");
    let second = make_eligible_memory(namespace, "second");
    db::insert(&conn, &first).unwrap();
    db::insert(&conn, &second).unwrap();

    let cfg = CuratorConfig {
        include_namespaces: vec![String::from(namespace)],
        ..CuratorConfig::default()
    };
    let report = run_once(&conn, Some(&llm), &cfg, None).unwrap();

    assert!(report.contradictions_found >= 1, "report: {report:?}");
}
1524
/// When the fake LLM is configured with `chat_returns_error`, the cycle still
/// returns a completed report, records an `auto_tag failed` or
/// `detect_contradiction failed` entry in `errors`, and leaves the memory
/// without `auto_tags`.
#[test]
fn run_once_records_errors_when_llm_fails() {
    let namespace = "fail-ns";

    let failing = FakeOllamaCfg {
        chat_returns_error: true,
        ..FakeOllamaCfg::default()
    };
    let server = FakeOllama::start(failing);
    let llm = ollama_for(&server);

    let db_file = tempfile::NamedTempFile::new().unwrap();
    let conn = db::open(db_file.path()).unwrap();
    let seeded = make_eligible_memory(namespace, "anchor");
    db::insert(&conn, &seeded).unwrap();

    let cfg = CuratorConfig {
        include_namespaces: vec![String::from(namespace)],
        ..CuratorConfig::default()
    };
    // The cycle itself must not fail just because the LLM did.
    let report = run_once(&conn, Some(&llm), &cfg, None).unwrap();
    assert!(!report.completed_at.is_empty());

    let saw_llm_failure = report.errors.iter().any(|entry| {
        entry.contains("auto_tag failed") || entry.contains("detect_contradiction failed")
    });
    assert!(
        saw_llm_failure,
        "expected an LLM-error entry in report.errors: {:?}",
        report.errors
    );

    // A failed tagging call must not leave tags behind.
    let stored = db::get(&conn, &seeded.id).unwrap().unwrap();
    let persisted_tags = stored.metadata.get("auto_tags");
    assert!(persisted_tags.is_none());
}
1562
/// A non-dry-run cycle stores exactly one row under `_curator/reports`, and
/// that row's content mentions `memories_consolidated`.
#[test]
fn run_once_writes_self_report_when_not_dry_run() {
    let namespace = "report-ns";

    let server = FakeOllama::start(FakeOllamaCfg::default());
    let llm = ollama_for(&server);

    let db_file = tempfile::NamedTempFile::new().unwrap();
    let conn = db::open(db_file.path()).unwrap();
    let seeded = make_eligible_memory(namespace, "anchor");
    db::insert(&conn, &seeded).unwrap();

    let cfg = CuratorConfig {
        include_namespaces: vec![String::from(namespace)],
        ..CuratorConfig::default()
    };
    // Only the persisted side effect matters here; the returned report is unused.
    let _ = run_once(&conn, Some(&llm), &cfg, None).unwrap();

    let report_namespace = Some("_curator/reports");
    let stored_reports = db::list(
        &conn,
        report_namespace,
        None,
        10,
        0,
        None,
        None,
        None,
        None,
        None,
    )
    .unwrap();

    assert_eq!(stored_reports.len(), 1);
    let body = &stored_reports[0].content;
    assert!(body.contains("memories_consolidated"));
}
1598
/// Running the cycle twice over the same row: the first pass finds it
/// eligible, the second pass still scans it but finds nothing eligible and
/// attempts no operations.
#[test]
fn run_once_idempotent_on_already_tagged_rows() {
    let namespace = "idem-ns";

    let server = FakeOllama::start(FakeOllamaCfg::default());
    let llm = ollama_for(&server);

    let db_file = tempfile::NamedTempFile::new().unwrap();
    let conn = db::open(db_file.path()).unwrap();
    let seeded = make_eligible_memory(namespace, "anchor");
    db::insert(&conn, &seeded).unwrap();

    let cfg = CuratorConfig {
        include_namespaces: vec![String::from(namespace)],
        ..CuratorConfig::default()
    };

    // First pass: the row is picked up.
    let first_pass = run_once(&conn, Some(&llm), &cfg, None).unwrap();
    assert_eq!(first_pass.memories_eligible, 1);

    // Second pass: the row is seen again but no longer needs work.
    let second_pass = run_once(&conn, Some(&llm), &cfg, None).unwrap();
    assert!(second_pass.memories_scanned >= 1);
    assert_eq!(second_pass.memories_eligible, 0);
    assert_eq!(second_pass.operations_attempted, 0);
}
1624
/// With three eligible rows and the default operation cap, the cycle attempts
/// and tags all three, and the fake server sees at least three chat calls.
#[test]
fn run_once_iterates_through_multiple_rows() {
    let namespace = "multi-ns";

    let server = FakeOllama::start(FakeOllamaCfg::default());
    let llm = ollama_for(&server);

    let db_file = tempfile::NamedTempFile::new().unwrap();
    let conn = db::open(db_file.path()).unwrap();

    // Seed three eligible rows in the same namespace.
    for title in (0..3).map(|i| format!("anchor-{i}")) {
        let row = make_eligible_memory(namespace, &title);
        db::insert(&conn, &row).unwrap();
    }

    let cfg = CuratorConfig {
        include_namespaces: vec![String::from(namespace)],
        ..CuratorConfig::default()
    };
    let report = run_once(&conn, Some(&llm), &cfg, None).unwrap();

    assert_eq!(report.operations_attempted, 3);
    assert_eq!(report.auto_tagged, 3);

    let chat_calls = server.chat_calls.load(StdOrdering::Relaxed);
    assert!(chat_calls >= 3);
}
1651
/// Two rows with identical content in one namespace: the cycle must make at
/// least three chat calls to the fake server and report at least one formed
/// cluster under `report.autonomy`.
#[test]
fn run_once_smart_tier_consults_llm_for_clusters() {
    // Both rows carry the same text — presumably so they land in one cluster.
    const SHARED_CONTENT: &str = "kubernetes rolling canary deploy strategy kubernetes deploy";

    let server = FakeOllama::start(FakeOllamaCfg::default());
    let llm = ollama_for(&server);

    let tmp = tempfile::NamedTempFile::new().unwrap();
    let conn = db::open(tmp.path()).unwrap();

    // Start from the shared fixture builder instead of re-spelling every
    // `Memory` field; only the id and content differ from its defaults.
    let m_a = Memory {
        id: "smart-a".to_string(),
        content: SHARED_CONTENT.to_string(),
        ..make_eligible_memory("smart", "deploy plan")
    };
    // The second row is a copy of the first with its own id and title.
    let m_b = Memory {
        id: "smart-b".to_string(),
        title: "deploy overview".to_string(),
        ..m_a.clone()
    };
    db::insert(&conn, &m_a).unwrap();
    db::insert(&conn, &m_b).unwrap();

    let cfg = CuratorConfig {
        include_namespaces: vec!["smart".to_string()],
        ..CuratorConfig::default()
    };
    let report = run_once(&conn, Some(&llm), &cfg, None).unwrap();

    assert!(server.chat_calls.load(StdOrdering::Relaxed) >= 3);
    assert!(report.autonomy.clusters_formed >= 1, "report: {report:?}");
}
1713
/// A cycle run with a signing keypair must generate a persona for an entity
/// mentioned by a reflection.
///
/// Setup: one observation, plus one depth-1 reflection whose
/// `mentioned_entity_id` column is set by direct SQL and which is linked to
/// the observation with a `reflects_on` link.
///
/// Checks: at least one persona is reported; the latest persona row for the
/// entity has `attest_level == "self_signed"`; its stored
/// `metadata.persona.signature` is base64 that decodes to 64 bytes; and every
/// `derived_from` link from the persona is `self_signed` with a 64-byte
/// signature.
#[test]
fn run_once_persona_sweep_generates_signed_persona_for_new_entity() {
    use base64::Engine;

    let namespace = "auto-persona-ns";
    let entity_id = "auto-persona-entity-2026-05-16";

    let server = FakeOllama::start(FakeOllamaCfg::default());
    let llm = ollama_for(&server);

    let db_file = tempfile::NamedTempFile::new().unwrap();
    let conn = db::open(db_file.path()).unwrap();

    // Source observation.
    let observation = make_eligible_memory(namespace, "observation");
    let obs_id = db::insert(&conn, &observation).unwrap();

    // Reflection on that observation, tagged with the entity via raw SQL.
    let reflection = Memory {
        memory_kind: crate::models::MemoryKind::Reflection,
        reflection_depth: 1,
        content: String::from("This reflection mentions the entity under test."),
        ..make_eligible_memory(namespace, "reflection-of-obs")
    };
    let rfl_id = db::insert(&conn, &reflection).unwrap();
    conn.execute(
        "UPDATE memories SET mentioned_entity_id = ?1 WHERE id = ?2",
        rusqlite::params![entity_id, &rfl_id],
    )
    .unwrap();
    db::create_link(&conn, &rfl_id, &obs_id, "reflects_on").unwrap();

    let keypair = crate::identity::keypair::generate("daemon").unwrap();

    let cfg = CuratorConfig {
        include_namespaces: vec![String::from(namespace)],
        ..CuratorConfig::default()
    };
    let report = run_once(&conn, Some(&llm), &cfg, Some(&keypair)).unwrap();

    assert!(
        report.personas_generated >= 1,
        "expected at least one auto-persona generation, report.errors={:?}",
        report.errors
    );

    // The persona row itself.
    let lookup = crate::persona::get_latest_persona(&conn, entity_id, namespace);
    let persona = lookup
        .expect("get_latest_persona failed")
        .expect("persona row must exist after sweep");
    assert_eq!(
        persona.attest_level, "self_signed",
        "persona attest_level must be self_signed (was {:?})",
        persona.attest_level
    );

    // The signature stored in the persona's raw metadata column.
    let raw_meta = conn
        .query_row(
            "SELECT metadata FROM memories WHERE id = ?1",
            rusqlite::params![&persona.id],
            |r| r.get::<_, String>(0),
        )
        .unwrap();
    let meta: serde_json::Value = serde_json::from_str(&raw_meta).unwrap();
    let persona_meta = meta.get("persona");
    let sig_b64 = persona_meta
        .and_then(|p| p.get("signature"))
        .and_then(serde_json::Value::as_str)
        .expect("metadata.persona.signature missing");
    let sig_bytes = base64::engine::general_purpose::STANDARD
        .decode(sig_b64)
        .expect("signature must be valid base64");
    assert_eq!(
        sig_bytes.len(),
        64,
        "metadata.persona.signature must decode to 64 bytes (got {})",
        sig_bytes.len()
    );

    // The provenance links emitted by the persona.
    let mut link_stmt = conn
        .prepare(
            "SELECT attest_level, length(signature) \
             FROM memory_links \
             WHERE source_id = ?1 AND relation = 'derived_from'",
        )
        .unwrap();
    let edges: Vec<(String, Option<i64>)> = link_stmt
        .query_map(rusqlite::params![&persona.id], |r| {
            let level: String = r.get(0)?;
            let sig_len: Option<i64> = r.get(1)?;
            Ok((level, sig_len))
        })
        .unwrap()
        .map(|edge| edge.unwrap())
        .collect();
    assert!(
        !edges.is_empty(),
        "persona must emit at least one derived_from edge"
    );
    for (attest_level, sig_len) in edges.iter() {
        assert_eq!(
            attest_level, "self_signed",
            "persona derived_from edges must be self_signed"
        );
        // A NULL signature length is treated as 0 and therefore fails.
        assert_eq!(
            sig_len.unwrap_or(0),
            64,
            "persona derived_from signature must be 64 bytes"
        );
    }
}
1855
/// In dry-run mode the persona sweep must still count the persona it would
/// generate, but must not write a persona row for the entity.
///
/// Setup: one observation plus one depth-1 reflection whose
/// `mentioned_entity_id` column is set by direct SQL and which is linked to
/// the observation with a `reflects_on` link.
#[test]
fn run_once_persona_sweep_dry_run_counts_without_writing() {
    let namespace = "dry-persona-ns";
    let entity_id = "dry-persona-entity-2026-05-18";

    let server = FakeOllama::start(FakeOllamaCfg::default());
    let llm = ollama_for(&server);

    let db_file = tempfile::NamedTempFile::new().unwrap();
    let conn = db::open(db_file.path()).unwrap();

    // Source observation.
    let observation = make_eligible_memory(namespace, "observation");
    let obs_id = db::insert(&conn, &observation).unwrap();

    // Reflection on that observation, tagged with the entity via raw SQL.
    let reflection = Memory {
        memory_kind: crate::models::MemoryKind::Reflection,
        reflection_depth: 1,
        content: String::from("Dry-run reflection mentions the entity under test."),
        ..make_eligible_memory(namespace, "reflection-of-obs")
    };
    let rfl_id = db::insert(&conn, &reflection).unwrap();
    conn.execute(
        "UPDATE memories SET mentioned_entity_id = ?1 WHERE id = ?2",
        rusqlite::params![entity_id, &rfl_id],
    )
    .unwrap();
    db::create_link(&conn, &rfl_id, &obs_id, "reflects_on").unwrap();

    let keypair = crate::identity::keypair::generate("daemon").unwrap();

    let cfg = CuratorConfig {
        dry_run: true,
        include_namespaces: vec![String::from(namespace)],
        ..CuratorConfig::default()
    };
    let report = run_once(&conn, Some(&llm), &cfg, Some(&keypair)).unwrap();

    assert!(
        report.personas_generated >= 1,
        "dry-run must still count would-be persona generations, errors={:?}",
        report.errors
    );

    // Nothing may have been persisted for the entity.
    let lookup = crate::persona::get_latest_persona(&conn, entity_id, namespace);
    let persona = lookup.expect("get_latest_persona must not error");
    assert!(
        persona.is_none(),
        "dry-run must NOT write a persona row, got: {persona:?}"
    );
}
1916}
1917
/// Calls `persist_auto_tags` on a freshly inserted row and accepts either
/// outcome: on success the listed row must carry `auto_tags` metadata; on
/// failure the error must render to a non-empty message.
#[test]
fn apply_rollback_handles_storage_error() {
    let db_file = tempfile::NamedTempFile::new().unwrap();
    let conn = db::open(db_file.path()).unwrap();

    let stamp = chrono::Utc::now().to_rfc3339();
    let mem = Memory {
        id: String::from("m1"),
        tier: Tier::Mid,
        namespace: String::from("test"),
        title: String::from("Test"),
        content: "a".repeat(100),
        tags: Vec::new(),
        priority: 5,
        confidence: 1.0,
        source: String::from("test"),
        access_count: 0,
        created_at: stamp.clone(),
        updated_at: stamp,
        last_accessed_at: None,
        expires_at: None,
        metadata: serde_json::json!({}),
        reflection_depth: 0,
        memory_kind: crate::models::MemoryKind::Observation,
        entity_id: None,
        persona_version: None,
        citations: vec![],
        source_uri: None,
        source_span: None,
        confidence_source: crate::models::ConfidenceSource::CallerProvided,
        confidence_signals: None,
        confidence_decayed_at: None,
        version: 1,
    };
    db::insert(&conn, &mem).unwrap();

    let tags = vec![String::from("test-tag")];

    // Failure path: the only requirement is a non-empty error message.
    if let Err(err) = persist_auto_tags(&conn, &mem, &tags) {
        assert!(!err.to_string().is_empty());
        return;
    }

    // Success path: find the row in an unfiltered listing and check its tags.
    let listed = db::list(&conn, None, None, 10, 0, None, None, None, None, None).unwrap();
    let updated = listed.iter().find(|row| row.id == mem.id).unwrap();
    let persisted_tags = updated.metadata.get("auto_tags");
    assert!(persisted_tags.is_some());
}
1980
/// With two rows stored in different namespaces (`ns1` and `ns2`),
/// `adjacent_memory` for the first row must return `None`.
#[test]
fn consolidate_pair_skips_when_namespaces_disagree() {
    let db_file = tempfile::NamedTempFile::new().unwrap();
    let conn = db::open(db_file.path()).unwrap();

    let stamp = chrono::Utc::now().to_rfc3339();
    let mem1 = Memory {
        id: String::from("m1"),
        tier: Tier::Mid,
        namespace: String::from("ns1"),
        title: String::from("Title 1"),
        content: "a".repeat(100),
        tags: Vec::new(),
        priority: 5,
        confidence: 1.0,
        source: String::from("test"),
        access_count: 0,
        created_at: stamp.clone(),
        updated_at: stamp,
        last_accessed_at: None,
        expires_at: None,
        metadata: serde_json::json!({}),
        reflection_depth: 0,
        memory_kind: crate::models::MemoryKind::Observation,
        entity_id: None,
        persona_version: None,
        citations: vec![],
        source_uri: None,
        source_span: None,
        confidence_source: crate::models::ConfidenceSource::CallerProvided,
        confidence_signals: None,
        confidence_decayed_at: None,
        version: 1,
    };

    // The second row differs only in id, namespace, title and content.
    let mem2 = Memory {
        id: String::from("m2"),
        namespace: String::from("ns2"),
        title: String::from("Title 2"),
        content: "b".repeat(100),
        ..mem1.clone()
    };

    for row in [&mem1, &mem2] {
        db::insert(&conn, row).unwrap();
    }

    let neighbour = adjacent_memory(&conn, &mem1).unwrap();
    assert!(neighbour.is_none());
}
2056
/// Checks that four times a `max_ops_per_cycle` of 100 is exactly 400 and
/// sits at or below `usize::MAX / 10`.
#[test]
fn priority_feedback_caps_at_priority_10() {
    let cfg = CuratorConfig {
        max_ops_per_cycle: 100,
        interval_secs: crate::SECS_PER_HOUR as u64,
        dry_run: false,
        exclude_namespaces: Vec::new(),
        include_namespaces: Vec::new(),
        ..CuratorConfig::default()
    };

    // Saturating multiply, so the product itself can never overflow.
    let quadrupled = cfg.max_ops_per_cycle.saturating_mul(4);
    assert_eq!(quadrupled, 400);

    let headroom_limit = usize::MAX / 10;
    assert!(quadrupled <= headroom_limit);
}
2075
/// Checks that the default `max_ops_per_cycle` is positive and that a
/// saturating add of 1 to `0_usize` yields 1.
#[test]
fn priority_feedback_floors_at_priority_1() {
    let default_cap = CuratorConfig::default().max_ops_per_cycle;
    assert!(default_cap > 0);

    let lowest = 0_usize;
    assert_eq!(lowest.saturating_add(1), 1);
}
2086
/// Runs a cycle with no LLM client and no keypair on an empty database: the
/// call must succeed, and the report's `errors` must contain an entry
/// mentioning `"no LLM"`.
#[test]
fn cycle_aborts_on_database_error() {
    let db_file = tempfile::NamedTempFile::new().unwrap();
    let conn = db::open(db_file.path()).unwrap();

    let outcome = run_once(&conn, None, &CuratorConfig::default(), None);
    assert!(outcome.is_ok());

    let report = outcome.unwrap();
    let mentions_missing_llm = report.errors.iter().any(|entry| entry.contains("no LLM"));
    assert!(mentions_missing_llm);
}