1#![cfg_attr(not(feature = "sal"), allow(dead_code, unused_imports))]
53
54use std::collections::HashSet;
55
56use chrono::{DateTime, Utc};
57use serde::{Deserialize, Serialize};
58
59#[cfg(feature = "sal")]
60use anyhow::Context;
61use anyhow::Result;
62
63#[cfg(feature = "sal")]
64use crate::autonomy::AutonomyLlm;
65#[cfg(feature = "sal")]
66use crate::identity::keypair::AgentKeypair;
67use crate::models::{Memory, MemoryKind, Tier};
68#[cfg(feature = "sal")]
69use crate::storage::reflect::{ReflectError, ReflectInput};
70#[cfg(feature = "sal")]
71use crate::store::{CallerContext, Filter, MemoryStore, StoreError};
72
73#[cfg(feature = "sal")]
74use super::pipeline::MemoryId;
75
76#[cfg(any(feature = "sal", test))]
77use crate::models::ConfidenceSource;
78
/// Fetches the memory `id` from `store`, treating "not found" as absence.
///
/// # Errors
/// `StoreError::NotFound` is mapped to `Ok(None)`; every other store error is
/// converted into an `anyhow` error and returned.
#[cfg(feature = "sal")]
async fn store_get_opt(
    store: &dyn MemoryStore,
    ctx: &CallerContext,
    id: &str,
) -> Result<Option<Memory>> {
    let fetched = store.get(ctx, id).await;
    match fetched {
        Err(StoreError::NotFound { .. }) => Ok(None),
        Err(other) => Err(anyhow::anyhow!(other)),
        Ok(found) => Ok(Some(found)),
    }
}
100
/// Builds the [`CallerContext`] used for the curator's store calls.
///
/// This is a thin wrapper over `CallerContext::for_admin`, so the returned
/// context is an admin context for `agent_id`.
#[cfg(feature = "sal")]
fn curator_caller_context(agent_id: &str) -> CallerContext {
    CallerContext::for_admin(agent_id)
}
112
/// Lists up to `limit` memories from `namespace`.
///
/// Only the namespace and the limit are set on the [`Filter`]; every other
/// filter field keeps its default value.
///
/// # Errors
/// Any store error is converted into an `anyhow` error.
#[cfg(feature = "sal")]
async fn store_list_namespace(
    store: &dyn MemoryStore,
    ctx: &CallerContext,
    namespace: &str,
    limit: usize,
) -> Result<Vec<Memory>> {
    let query = Filter {
        namespace: Some(namespace.to_owned()),
        limit,
        ..Filter::default()
    };
    match store.list(ctx, &query).await {
        Ok(rows) => Ok(rows),
        Err(err) => Err(anyhow::anyhow!(err)),
    }
}
137
/// Smallest number of members a cluster needs before it is emitted by
/// clustering, accepted by the eligibility check, or summarized.
pub(crate) const MIN_CLUSTER_SIZE: usize = 3;

/// Largest number of members a single cluster may hold. Clustering stops
/// growing a cluster at this size and eligibility rejects anything larger.
pub(crate) const MAX_CLUSTER_SIZE: usize = 12;

/// Width, in days, of the creation-time window two memories must share to be
/// considered co-occurring.
pub(crate) const TEMPORAL_WINDOW_DAYS: i64 = 7;

/// Minimum Jaccard similarity between the contents of two memories for them
/// to be considered co-occurring.
pub(crate) const REFLECTION_JACCARD_THRESHOLD: f64 = 0.30;

/// Minimum `access_count` a memory needs to take part in clustering.
pub(crate) const MIN_RECALL_COUNT: i64 = 1;
170
171#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
183pub struct ReflectionPassConfig {
184 #[serde(default)]
186 pub enabled: bool,
187 #[serde(default, skip_serializing_if = "Option::is_none")]
195 pub max_depth: Option<u32>,
196}
197
198impl Default for ReflectionPassConfig {
199 fn default() -> Self {
200 Self {
201 enabled: false,
202 max_depth: None,
203 }
204 }
205}
206
/// State shared by the steps of one reflection pass: cluster, check
/// eligibility, summarize, persist and verify.
#[cfg(feature = "sal")]
pub(crate) struct ReflectionPass<'a> {
    /// Store the pass reads candidates from and writes reflections to.
    pub(crate) store: &'a dyn MemoryStore,
    /// Caller context used for every store call made by the pass.
    pub(crate) ctx: CallerContext,
    /// LLM used to summarize a cluster into reflection text.
    pub(crate) llm: &'a dyn AutonomyLlm,
    /// Optional keypair; supplies the agent id and is handed to the store
    /// when a reflection is written.
    pub(crate) keypair: Option<&'a AgentKeypair>,
    /// Optional cap on the depth of a newly written reflection, enforced
    /// before the store is asked to persist it.
    pub(crate) max_depth: Option<u32>,
    /// When set, persisting is a no-op.
    pub(crate) dry_run: bool,
}
255
256#[cfg(feature = "sal")]
257impl<'a> ReflectionPass<'a> {
258 pub(crate) fn new(
263 store: &'a dyn MemoryStore,
264 llm: &'a dyn AutonomyLlm,
265 keypair: Option<&'a AgentKeypair>,
266 max_depth: Option<u32>,
267 dry_run: bool,
268 ) -> Self {
269 let agent_id = keypair.map_or_else(
270 || crate::identity::sentinels::AI_CURATOR.to_string(),
271 |k| k.agent_id.clone(),
272 );
273 Self {
274 store,
275 ctx: curator_caller_context(&agent_id),
276 llm,
277 keypair,
278 max_depth,
279 dry_run,
280 }
281 }
282
283 fn agent_id(&self) -> String {
290 self.keypair.map_or_else(
291 || crate::identity::sentinels::AI_CURATOR.to_string(),
292 |k| k.agent_id.clone(),
293 )
294 }
295
296 #[allow(dead_code)]
300 fn name(&self) -> &str {
301 "reflection"
302 }
303
304 fn cluster(&self, memories: &[Memory]) -> Vec<Vec<MemoryId>> {
319 let mut by_ns: std::collections::HashMap<&str, Vec<&Memory>> =
320 std::collections::HashMap::new();
321 for m in memories {
322 if !is_clusterable_observation(m) {
323 continue;
324 }
325 by_ns.entry(&m.namespace).or_default().push(m);
326 }
327
328 let mut clusters: Vec<Vec<MemoryId>> = Vec::new();
329 for (_ns, group) in by_ns {
330 let mut used = vec![false; group.len()];
331 for i in 0..group.len() {
332 if used[i] {
333 continue;
334 }
335 let mut cluster = vec![group[i].id.clone()];
336 used[i] = true;
337 for j in (i + 1)..group.len() {
338 if used[j] {
339 continue;
340 }
341 if cluster.len() >= MAX_CLUSTER_SIZE {
342 break;
343 }
344 if pair_co_occurs(group[i], group[j]) {
345 cluster.push(group[j].id.clone());
346 used[j] = true;
347 }
348 }
349 if cluster.len() >= MIN_CLUSTER_SIZE {
350 clusters.push(cluster);
351 }
352 }
353 }
354 clusters
355 }
356
357 fn eligible(&self, cluster: &[Memory]) -> bool {
369 if cluster.len() < MIN_CLUSTER_SIZE || cluster.len() > MAX_CLUSTER_SIZE {
370 return false;
371 }
372 let ns = &cluster[0].namespace;
373 if ns.starts_with('_') {
374 return false;
375 }
376 cluster.iter().all(|m| {
377 m.memory_kind == MemoryKind::Observation
378 && &m.namespace == ns
379 && m.access_count >= MIN_RECALL_COUNT
380 })
381 }
382
383 fn summarize(&self, cluster: &[Memory]) -> Result<Memory> {
401 if cluster.len() < MIN_CLUSTER_SIZE {
402 anyhow::bail!(
403 "summarize: cluster has {} members (< MIN_CLUSTER_SIZE = {})",
404 cluster.len(),
405 MIN_CLUSTER_SIZE
406 );
407 }
408
409 let input: Vec<(String, String)> = cluster
410 .iter()
411 .map(|m| (m.title.clone(), m.content.clone()))
412 .collect();
413 let summary_text = self
414 .llm
415 .summarize_memories(&input)
416 .context("ReflectionPass::summarize: LLM call failed")?;
417
418 let base_title = cluster
419 .iter()
420 .map(|m| m.title.as_str())
421 .next()
422 .unwrap_or("(reflection)");
423 let title = format!("[reflection] {base_title}");
424
425 let tier = cluster
426 .iter()
427 .map(|m| m.tier.clone())
428 .max_by_key(tier_rank)
429 .unwrap_or(Tier::Mid);
430 let priority = cluster.iter().map(|m| m.priority).max().unwrap_or(5);
431
432 let now = Utc::now().to_rfc3339();
433 Ok(Memory {
434 id: uuid::Uuid::new_v4().to_string(),
435 tier,
436 namespace: cluster[0].namespace.clone(),
437 title,
438 content: summary_text,
439 tags: vec![],
440 priority,
441 confidence: 1.0,
442 source: "system".to_string(),
446 access_count: 0,
447 created_at: now.clone(),
448 updated_at: now,
449 last_accessed_at: None,
450 expires_at: None,
451 metadata: serde_json::json!({}),
452 reflection_depth: 0,
453 memory_kind: MemoryKind::Reflection,
454 entity_id: None,
455 persona_version: None,
456 citations: Vec::new(),
457 source_uri: None,
458 source_span: None,
459 confidence_source: ConfidenceSource::CallerProvided,
460 confidence_signals: None,
461 confidence_decayed_at: None,
462 version: 1,
463 })
464 }
465
466 async fn persist(&self, summary: &Memory, sources: &[MemoryId]) -> Result<()> {
479 if self.dry_run || sources.is_empty() {
480 return Ok(());
481 }
482
483 if let Some(cap) = self.max_depth {
490 let mut max_src_depth: i32 = 0;
491 for id in sources {
492 if let Some(m) = store_get_opt(self.store, &self.ctx, id).await? {
493 max_src_depth = max_src_depth.max(m.reflection_depth);
494 }
495 }
496 let new_depth =
497 u32::try_from(max_src_depth.max(0).saturating_add(1)).unwrap_or(u32::MAX);
498 if new_depth > cap {
499 anyhow::bail!(
500 "ReflectionPass::persist: proposed depth {new_depth} exceeds \
501 curator --max-depth {cap}"
502 );
503 }
504 }
505
506 let input = ReflectInput {
507 source_ids: sources.to_vec(),
508 title: summary.title.clone(),
509 content: summary.content.clone(),
510 namespace: Some(summary.namespace.clone()),
511 tier: summary.tier.clone(),
512 tags: summary.tags.clone(),
513 priority: summary.priority,
514 confidence: summary.confidence,
515 source: summary.source.clone(),
516 agent_id: self.agent_id(),
517 metadata: summary.metadata.clone(),
518 };
519
520 match self.store.reflect(&self.ctx, &input, self.keypair).await {
529 Ok(_outcome) => Ok(()),
530 Err(ReflectError::DepthExceeded {
531 attempted,
532 cap,
533 namespace,
534 }) => {
535 anyhow::bail!(
536 "ReflectionPass::persist: substrate refused — proposed depth \
537 {attempted} exceeds namespace cap {cap} in '{namespace}'"
538 )
539 }
540 Err(other) => Err(anyhow::anyhow!(other.to_string())),
541 }
542 }
543
544 #[allow(dead_code)]
562 async fn verify(&self, summary_id: MemoryId) -> Result<()> {
563 let mem = store_get_opt(self.store, &self.ctx, &summary_id)
564 .await
565 .context("ReflectionPass::verify: store.get failed")?;
566 let mem = mem
567 .ok_or_else(|| anyhow::anyhow!("verify: reflection {} not found in DB", summary_id))?;
568 if mem.memory_kind != MemoryKind::Reflection {
569 anyhow::bail!(
570 "verify: memory {} is {:?}, expected Reflection",
571 summary_id,
572 mem.memory_kind
573 );
574 }
575
576 let links = self
577 .store
578 .get_links_for_anchor(&summary_id)
579 .await
580 .map_err(|e| anyhow::anyhow!(e))
581 .context("ReflectionPass::verify: store.get_links_for_anchor failed")?;
582 let mut saw_reflects_on = false;
583 for link in &links {
584 if link.source_id != summary_id {
588 continue;
589 }
590 if link.relation != crate::models::MemoryLinkRelation::ReflectsOn {
591 continue;
592 }
593 saw_reflects_on = true;
594 let target = store_get_opt(self.store, &self.ctx, &link.target_id).await?;
598 if target.is_none() {
599 anyhow::bail!(
600 "verify: reflects_on edge target {} not found",
601 link.target_id
602 );
603 }
604 }
605 if !saw_reflects_on {
606 anyhow::bail!("verify: reflection {} has no reflects_on edge", summary_id);
607 }
608 Ok(())
609 }
610}
611
612#[derive(Debug, Clone, Default, Serialize, Deserialize)]
619pub struct ReflectionPassReport {
620 pub started_at: String,
622 pub completed_at: String,
623 pub namespaces_visited: usize,
626 pub observations_scanned: usize,
629 pub clusters_formed: usize,
631 pub clusters_eligible: usize,
633 pub reflections_persisted: usize,
636 pub depth_refusals: usize,
639 pub errors: Vec<String>,
642 #[serde(default)]
646 pub dry_run_proposals: Vec<DryRunProposal>,
647 pub dry_run: bool,
649}
650
651#[derive(Debug, Clone, Serialize, Deserialize)]
655pub struct DryRunProposal {
656 pub namespace: String,
657 pub proposed_title: String,
658 pub source_ids: Vec<String>,
659}
660
/// Runs one reflection pass and reports what happened.
///
/// When `namespace` is given only that namespace is processed; otherwise
/// every namespace reported by the store whose name does not start with `_`
/// is processed. Namespaces for which `enabled_check` returns `false` are
/// skipped, but still count towards `namespaces_visited`.
///
/// For each namespace the pass lists up to `MAX_CLUSTER_SIZE * 16` memories,
/// clusters them, and for every eligible cluster builds a summary. In dry-run
/// mode the summary is recorded as a proposal; otherwise it is persisted and
/// then verified.
///
/// # Errors
/// Only a failure to list namespaces aborts the run. Per-namespace and
/// per-cluster failures are collected in the report's `errors`.
#[cfg(feature = "sal")]
pub async fn run_reflection_pass(
    store: &dyn MemoryStore,
    llm: &dyn AutonomyLlm,
    keypair: Option<&AgentKeypair>,
    namespace: Option<&str>,
    max_depth: Option<u32>,
    dry_run: bool,
    enabled_check: impl Fn(&str) -> bool,
) -> Result<ReflectionPassReport> {
    let mut report = ReflectionPassReport {
        started_at: Utc::now().to_rfc3339(),
        dry_run,
        ..Default::default()
    };

    let pass = ReflectionPass::new(store, llm, keypair, max_depth, dry_run);

    let namespaces: Vec<String> = match namespace {
        Some(ns) => vec![ns.to_string()],
        None => {
            let counts = store
                .list_namespaces()
                .await
                .map_err(|e| anyhow::anyhow!(e))
                .context("run_reflection_pass: list_namespaces failed")?;
            counts
                .into_iter()
                .map(|nc| nc.namespace)
                .filter(|ns| !ns.starts_with('_'))
                .collect()
        }
    };
    report.namespaces_visited = namespaces.len();

    for ns in &namespaces {
        if !enabled_check(ns) {
            continue;
        }

        let candidates = match store_list_namespace(
            store,
            &pass.ctx,
            ns.as_str(),
            MAX_CLUSTER_SIZE * 16,
        )
        .await
        {
            Ok(listed) => listed,
            Err(e) => {
                report
                    .errors
                    .push(format!("namespace '{ns}': store.list failed: {e}"));
                continue;
            }
        };
        report.observations_scanned += candidates.len();

        let cluster_id_sets = pass.cluster(&candidates);
        report.clusters_formed += cluster_id_sets.len();

        // Resolve every cluster's ids through one index instead of scanning
        // `candidates` once per id. The index is dropped before any await.
        let resolved_clusters: Vec<Vec<Memory>> = {
            let mut by_id: std::collections::HashMap<&str, &Memory> =
                std::collections::HashMap::with_capacity(candidates.len());
            for candidate in &candidates {
                // Keep the first occurrence of an id, as a linear search would.
                by_id.entry(candidate.id.as_str()).or_insert(candidate);
            }
            cluster_id_sets
                .iter()
                .map(|ids| {
                    ids.iter()
                        .filter_map(|id| by_id.get(id.as_str()).copied().cloned())
                        .collect()
                })
                .collect()
        };

        for mut cluster in resolved_clusters {
            if !pass.eligible(&cluster) {
                continue;
            }
            report.clusters_eligible += 1;

            // Order members by id before summarizing and persisting.
            cluster.sort_by(|a, b| a.id.cmp(&b.id));

            let summary = match pass.summarize(&cluster) {
                Ok(summary) => summary,
                Err(e) => {
                    report
                        .errors
                        .push(format!("namespace '{ns}': summarize failed: {e}"));
                    continue;
                }
            };

            let source_ids: Vec<String> = cluster.iter().map(|m| m.id.clone()).collect();

            if dry_run {
                report.dry_run_proposals.push(DryRunProposal {
                    namespace: ns.clone(),
                    proposed_title: summary.title.clone(),
                    source_ids,
                });
                continue;
            }

            match pass.persist(&summary, &source_ids).await {
                Ok(()) => {
                    report.reflections_persisted += 1;
                    if let Err(e) = verify_recent(store, &pass.ctx, ns, &source_ids).await {
                        report
                            .errors
                            .push(format!("namespace '{ns}': verify failed: {e}"));
                    }
                }
                Err(e) => {
                    // NOTE(review): depth refusals are recognised by message
                    // text. Any other error mentioning both words is counted
                    // as a refusal too; a typed error would be more robust.
                    let msg = e.to_string();
                    if msg.contains("exceeds") && msg.contains("depth") {
                        report.depth_refusals += 1;
                    } else {
                        report
                            .errors
                            .push(format!("namespace '{ns}': persist failed: {e}"));
                    }
                }
            }
        }
    }

    report.completed_at = Utc::now().to_rfc3339();
    Ok(report)
}
809
/// Confirms that a reflection over exactly `source_ids` exists in `namespace`.
///
/// Lists up to 16 memories from the namespace and looks for a `Reflection`
/// whose outbound `ReflectsOn` targets equal `source_ids` as a set.
///
/// # Errors
/// Fails when listing or link lookup fails, or when no listed reflection
/// carries the expected edge set.
#[cfg(feature = "sal")]
async fn verify_recent(
    store: &dyn MemoryStore,
    ctx: &CallerContext,
    namespace: &str,
    source_ids: &[String],
) -> Result<()> {
    let listed = store_list_namespace(store, ctx, namespace, 16)
        .await
        .context("verify_recent: store.list failed")?;
    let expected: HashSet<&str> = source_ids.iter().map(String::as_str).collect();

    for cand in &listed {
        if cand.memory_kind != MemoryKind::Reflection {
            continue;
        }
        let links = store
            .get_links_for_anchor(&cand.id)
            .await
            .map_err(|e| anyhow::anyhow!(e))?;

        let mut outbound: HashSet<&str> = HashSet::new();
        for link in &links {
            let from_candidate = link.source_id == cand.id;
            if from_candidate && link.relation == crate::models::MemoryLinkRelation::ReflectsOn {
                outbound.insert(link.target_id.as_str());
            }
        }
        if outbound == expected {
            return Ok(());
        }
    }

    anyhow::bail!(
        "verify_recent: no Reflection in namespace '{namespace}' carries the \
         expected reflects_on edge set"
    )
}
855
856fn tier_rank(t: &Tier) -> u8 {
861 match t {
862 Tier::Short => 0,
863 Tier::Mid => 1,
864 Tier::Long => 2,
865 }
866}
867
868fn is_clusterable_observation(m: &Memory) -> bool {
873 m.memory_kind == MemoryKind::Observation
874 && !m.namespace.starts_with('_')
875 && m.access_count >= MIN_RECALL_COUNT
876}
877
878fn pair_co_occurs(a: &Memory, b: &Memory) -> bool {
883 if a.namespace != b.namespace {
884 return false;
885 }
886 if let (Some(ta), Some(tb)) = (parse_rfc3339(&a.created_at), parse_rfc3339(&b.created_at)) {
887 let delta = (ta - tb).num_days().abs();
888 if delta > TEMPORAL_WINDOW_DAYS {
889 return false;
890 }
891 }
892 jaccard_similarity(&a.content, &b.content) >= REFLECTION_JACCARD_THRESHOLD
893}
894
895fn parse_rfc3339(s: &str) -> Option<DateTime<Utc>> {
899 DateTime::parse_from_rfc3339(s)
900 .ok()
901 .map(|d| d.with_timezone(&Utc))
902}
903
/// Computes the Jaccard similarity of the token sets of `a` and `b`.
///
/// Text is split on non-alphanumeric characters, tokens shorter than three
/// characters are dropped, and the rest are lowercased. Token length is
/// counted in characters, not bytes, so short non-ASCII words are filtered
/// the same way as short ASCII ones.
///
/// Returns `0.0` when neither input yields any token.
// The set sizes are small counts; converting them to `f64` is exact here.
#[allow(clippy::cast_precision_loss)]
fn jaccard_similarity(a: &str, b: &str) -> f64 {
    /// Splits `text` into its set of lowercased tokens of three or more characters.
    fn tokenize(text: &str) -> HashSet<String> {
        text.split(|c: char| !c.is_alphanumeric())
            .filter(|token| token.chars().count() >= 3)
            .map(str::to_lowercase)
            .collect()
    }

    let left = tokenize(a);
    let right = tokenize(b);
    let shared = left.intersection(&right).count();
    // |A ∪ B| = |A| + |B| - |A ∩ B|; avoids a second pass over both sets.
    let combined = left.len() + right.len() - shared;
    if combined == 0 {
        return 0.0;
    }
    shared as f64 / combined as f64
}
930
/// Test-only helper: the temporal window expressed in seconds.
#[cfg(test)]
pub(crate) fn temporal_window_seconds() -> i64 {
    let window = chrono::Duration::days(TEMPORAL_WINDOW_DAYS);
    window.num_seconds()
}
937
938#[cfg(all(test, feature = "sal"))]
946mod tests {
947 use super::*;
948 use crate::models::{Memory, MemoryKind, Tier};
949 use anyhow::Result;
950 use chrono::Duration;
951 use std::sync::Mutex;
952
953 pub(super) struct StubLlm {
961 pub(super) summary: String,
962 pub(super) calls: Mutex<Vec<String>>,
963 }
964
965 impl StubLlm {
966 pub(super) fn new(summary: &str) -> Self {
967 Self {
968 summary: summary.to_string(),
969 calls: Mutex::new(Vec::new()),
970 }
971 }
972 }
973
974 impl AutonomyLlm for StubLlm {
975 fn auto_tag(&self, _title: &str, _content: &str) -> Result<Vec<String>> {
976 Ok(vec![])
977 }
978 fn detect_contradiction(&self, _a: &str, _b: &str) -> Result<bool> {
979 Ok(false)
980 }
981 fn summarize_memories(&self, memories: &[(String, String)]) -> Result<String> {
982 self.calls
983 .lock()
984 .unwrap()
985 .push(format!("summarize:{}", memories.len()));
986 Ok(self.summary.clone())
987 }
988 }
989
990 fn make_obs(id: &str, ns: &str, title: &str, content: &str, access: i64) -> Memory {
993 let now = Utc::now().to_rfc3339();
994 Memory {
995 id: id.to_string(),
996 tier: Tier::Long,
997 namespace: ns.to_string(),
998 title: title.to_string(),
999 content: content.to_string(),
1000 tags: vec![],
1001 priority: 5,
1002 confidence: 1.0,
1003 source: "test".to_string(),
1004 access_count: access,
1005 created_at: now.clone(),
1006 updated_at: now,
1007 last_accessed_at: None,
1008 expires_at: None,
1009 metadata: serde_json::json!({}),
1010 reflection_depth: 0,
1011 memory_kind: MemoryKind::Observation,
1012 entity_id: None,
1013 persona_version: None,
1014 citations: Vec::new(),
1015 source_uri: None,
1016 source_span: None,
1017 confidence_source: ConfidenceSource::CallerProvided,
1018 confidence_signals: None,
1019 confidence_decayed_at: None,
1020 version: 1,
1021 }
1022 }
1023
1024 #[test]
1027 fn eligible_rejects_below_min_cluster_size() {
1028 let cluster: Vec<Memory> = (0..(MIN_CLUSTER_SIZE - 1))
1031 .map(|i| make_obs(&format!("m{i}"), "app", "t", "kubernetes deploy", 1))
1032 .collect();
1033 let result = cluster.len() >= MIN_CLUSTER_SIZE
1034 && cluster.len() <= MAX_CLUSTER_SIZE
1035 && !cluster[0].namespace.starts_with('_')
1036 && cluster.iter().all(|m| {
1037 m.memory_kind == MemoryKind::Observation
1038 && m.namespace == cluster[0].namespace
1039 && m.access_count >= MIN_RECALL_COUNT
1040 });
1041 assert!(!result, "below-MIN cluster must not be eligible");
1042 }
1043
1044 #[test]
1045 fn eligible_rejects_reflection_kind_member() {
1046 let mut cluster: Vec<Memory> = (0..MIN_CLUSTER_SIZE)
1051 .map(|i| make_obs(&format!("m{i}"), "app", "t", "kubernetes deploy", 1))
1052 .collect();
1053 cluster[0].memory_kind = MemoryKind::Reflection;
1054 let result = cluster
1055 .iter()
1056 .all(|m| m.memory_kind == MemoryKind::Observation);
1057 assert!(!result, "mixed-kind cluster must not be eligible");
1058 }
1059
1060 #[test]
1061 fn eligible_rejects_internal_namespace() {
1062 let cluster: Vec<Memory> = (0..MIN_CLUSTER_SIZE)
1063 .map(|i| make_obs(&format!("m{i}"), "_curator", "t", "kubernetes deploy", 1))
1064 .collect();
1065 let result = !cluster[0].namespace.starts_with('_');
1066 assert!(!result, "internal-namespace cluster must not be eligible");
1067 }
1068
1069 #[test]
1072 fn cluster_groups_three_co_occurring_observations() {
1073 let m1 = make_obs("a", "ns", "t1", "kubernetes rolling deploy strategy", 2);
1076 let m2 = make_obs("b", "ns", "t2", "kubernetes deploy canary strategy", 3);
1077 let m3 = make_obs("c", "ns", "t3", "kubernetes rolling deploy approach", 1);
1078
1079 let obs = [m1.clone(), m2.clone(), m3.clone()];
1083 let pairs = [
1084 pair_co_occurs(&m1, &m2),
1085 pair_co_occurs(&m1, &m3),
1086 pair_co_occurs(&m2, &m3),
1087 ];
1088 assert!(
1089 pairs.iter().all(|p| *p),
1090 "all three pairs must co-occur, got {pairs:?}"
1091 );
1092 assert_eq!(obs.len(), MIN_CLUSTER_SIZE);
1093 }
1094
1095 #[test]
1096 fn cluster_skips_observations_with_zero_access_count() {
1097 let cold = make_obs("cold", "ns", "t", "kubernetes deploy", 0);
1100 assert!(!is_clusterable_observation(&cold));
1101 }
1102
1103 #[test]
1104 fn pair_co_occurs_rejects_cross_namespace() {
1105 let a = make_obs("a", "ns1", "t", "shared content tokens", 1);
1106 let b = make_obs("b", "ns2", "t", "shared content tokens", 1);
1107 assert!(!pair_co_occurs(&a, &b));
1108 }
1109
1110 #[test]
1111 fn pair_co_occurs_respects_temporal_window() {
1112 let mut a = make_obs("a", "ns", "t", "shared content tokens here", 1);
1115 let mut b = make_obs("b", "ns", "t", "shared content tokens here", 1);
1116 let now = Utc::now();
1117 a.created_at = now.to_rfc3339();
1118 b.created_at = (now - Duration::days(TEMPORAL_WINDOW_DAYS + 2)).to_rfc3339();
1119 assert!(
1120 !pair_co_occurs(&a, &b),
1121 "outside-window pair must not co-occur"
1122 );
1123 }
1124
1125 #[test]
1126 fn pair_co_occurs_below_jaccard_threshold_is_false() {
1127 let a = make_obs("a", "ns", "t", "kubernetes deploy strategy", 1);
1128 let b = make_obs(
1129 "b",
1130 "ns",
1131 "t",
1132 "completely unrelated quantum mechanics text",
1133 1,
1134 );
1135 assert!(!pair_co_occurs(&a, &b));
1136 }
1137
1138 #[test]
1141 fn jaccard_similarity_is_symmetric() {
1142 let a = "kubernetes rolling deploy canary";
1143 let b = "kubernetes canary rolling deploy strategy";
1144 let sim_ab = jaccard_similarity(a, b);
1145 let sim_ba = jaccard_similarity(b, a);
1146 assert!((sim_ab - sim_ba).abs() < 1e-9);
1147 }
1148
1149 #[test]
1150 fn jaccard_similarity_empty_strings_zero() {
1151 assert_eq!(jaccard_similarity("", ""), 0.0);
1152 }
1153
1154 #[test]
1155 fn temporal_window_is_7_days() {
1156 assert_eq!(temporal_window_seconds(), crate::SECS_PER_WEEK);
1159 }
1160
1161 #[test]
1162 fn config_default_is_disabled() {
1163 let cfg = ReflectionPassConfig::default();
1165 assert!(!cfg.enabled);
1166 assert!(cfg.max_depth.is_none());
1167 }
1168
1169 #[test]
1170 fn config_round_trips_json() {
1171 let cfg = ReflectionPassConfig {
1172 enabled: true,
1173 max_depth: Some(2),
1174 };
1175 let json = serde_json::to_string(&cfg).unwrap();
1176 let back: ReflectionPassConfig = serde_json::from_str(&json).unwrap();
1177 assert_eq!(cfg, back);
1178 }
1179
1180 #[test]
1183 fn stub_llm_records_calls() {
1184 let stub = StubLlm::new("synthesised pattern");
1185 let out = stub
1186 .summarize_memories(&[("t1".into(), "c1".into()), ("t2".into(), "c2".into())])
1187 .unwrap();
1188 assert_eq!(out, "synthesised pattern");
1189 let calls = stub.calls.lock().unwrap();
1190 assert_eq!(calls.len(), 1);
1191 assert!(calls[0].starts_with("summarize:"));
1192 }
1193
1194 #[test]
1197 fn report_serialises_to_json() {
1198 let r = ReflectionPassReport {
1199 started_at: "2026-01-01T00:00:00Z".into(),
1200 completed_at: "2026-01-01T00:00:01Z".into(),
1201 namespaces_visited: 1,
1202 observations_scanned: 30,
1203 clusters_formed: 3,
1204 clusters_eligible: 3,
1205 reflections_persisted: 3,
1206 depth_refusals: 0,
1207 errors: vec![],
1208 dry_run_proposals: vec![],
1209 dry_run: false,
1210 };
1211 let json = serde_json::to_string(&r).unwrap();
1212 assert!(json.contains("reflections_persisted"));
1213 assert!(json.contains("clusters_eligible"));
1214 let back: ReflectionPassReport = serde_json::from_str(&json).unwrap();
1215 assert_eq!(back.observations_scanned, 30);
1216 }
1217
1218 #[cfg(feature = "sal")]
1231 mod sal_pass_tests {
1232 use super::*;
1233
1234 use crate::store::sqlite::SqliteStore;
1235
1236 fn open_db() -> (SqliteStore, tempfile::TempDir) {
1237 let dir = tempfile::tempdir().expect("tempdir");
1238 let path = dir.path().join("test.db");
1239 let store = SqliteStore::open(&path).expect("SqliteStore::open");
1240 (store, dir)
1241 }
1242
1243 fn conn_of(store: &SqliteStore) -> rusqlite::Connection {
1246 crate::db::open(store.path()).expect("db::open at store path")
1247 }
1248
1249 fn insert_observation(
1250 conn: &rusqlite::Connection,
1251 ns: &str,
1252 title: &str,
1253 content: &str,
1254 access_count: i64,
1255 ) -> String {
1256 let now = chrono::Utc::now().to_rfc3339();
1257 let mut metadata = crate::models::default_metadata();
1258 if let Some(obj) = metadata.as_object_mut() {
1259 obj.insert(
1260 "agent_id".to_string(),
1261 serde_json::Value::String("test-agent".to_string()),
1262 );
1263 }
1264 let mem = Memory {
1265 id: uuid::Uuid::new_v4().to_string(),
1266 tier: Tier::Long,
1267 namespace: ns.to_string(),
1268 title: title.to_string(),
1269 content: content.to_string(),
1270 tags: vec![],
1271 priority: 5,
1272 confidence: 1.0,
1273 source: "test".to_string(),
1274 access_count,
1275 created_at: now.clone(),
1276 updated_at: now,
1277 last_accessed_at: None,
1278 expires_at: None,
1279 metadata,
1280 reflection_depth: 0,
1281 memory_kind: MemoryKind::Observation,
1282 entity_id: None,
1283 persona_version: None,
1284 citations: Vec::new(),
1285 source_uri: None,
1286 source_span: None,
1287 confidence_source: ConfidenceSource::CallerProvided,
1288 confidence_signals: None,
1289 confidence_decayed_at: None,
1290 version: 1,
1291 };
1292 crate::db::insert(conn, &mem).unwrap()
1293 }
1294
1295 #[test]
1296 fn pass_name_is_reflection() {
1297 let (store, _dir) = open_db();
1298 let llm = StubLlm::new("S");
1299 let pass = ReflectionPass::new(&store, &llm, None, None, false);
1300 assert_eq!(pass.name(), "reflection");
1301 }
1302
1303 #[test]
1304 fn agent_id_falls_back_to_ai_curator_without_keypair() {
1305 let (store, _dir) = open_db();
1306 let llm = StubLlm::new("S");
1307 let pass = ReflectionPass::new(&store, &llm, None, None, false);
1308 assert_eq!(pass.agent_id(), "ai:curator");
1309 }
1310
1311 #[test]
1312 fn agent_id_uses_keypair_when_provided() {
1313 let (store, _dir) = open_db();
1314 let llm = StubLlm::new("S");
1315 use ed25519_dalek::{SigningKey, VerifyingKey};
1316 let mut rng = rand_core::OsRng;
1317 let sk = SigningKey::generate(&mut rng);
1318 let vk: VerifyingKey = (&sk).into();
1319 let kp = AgentKeypair {
1320 agent_id: "test:agent-x".to_string(),
1321 public: vk,
1322 private: Some(sk),
1323 };
1324 let pass = ReflectionPass::new(&store, &llm, Some(&kp), None, false);
1325 assert_eq!(pass.agent_id(), "test:agent-x");
1326 }
1327
1328 #[test]
1329 fn cluster_excludes_zero_access_observations() {
1330 let (store, _dir) = open_db();
1331 let llm = StubLlm::new("S");
1332 let pass = ReflectionPass::new(&store, &llm, None, None, false);
1333 let m1 = make_obs("a", "ns", "t", "shared keyword tokens here", 0); let m2 = make_obs("b", "ns", "t", "shared keyword tokens here", 5);
1335 let m3 = make_obs("c", "ns", "t", "shared keyword tokens here", 5);
1336 let m4 = make_obs("d", "ns", "t", "shared keyword tokens here", 5);
1337 let clusters = pass.cluster(&[m1, m2, m3, m4]);
1338 assert_eq!(clusters.len(), 1);
1339 assert_eq!(clusters[0].len(), 3);
1340 }
1341
1342 #[test]
1343 fn cluster_caps_at_max_cluster_size() {
1344 let (store, _dir) = open_db();
1345 let llm = StubLlm::new("S");
1346 let pass = ReflectionPass::new(&store, &llm, None, None, false);
1347 let mems: Vec<Memory> = (0..15)
1349 .map(|i| {
1350 make_obs(
1351 &format!("m{i:02}"),
1352 "ns",
1353 "t",
1354 "shared keyword tokens here pattern",
1355 1,
1356 )
1357 })
1358 .collect();
1359 let clusters = pass.cluster(&mems);
1360 for c in &clusters {
1362 assert!(c.len() <= MAX_CLUSTER_SIZE);
1363 }
1364 }
1365
1366 #[test]
1367 fn eligible_pass_method_accepts_valid() {
1368 let (store, _dir) = open_db();
1369 let llm = StubLlm::new("S");
1370 let pass = ReflectionPass::new(&store, &llm, None, None, false);
1371 let cluster: Vec<Memory> = (0..MIN_CLUSTER_SIZE)
1372 .map(|i| make_obs(&format!("m{i}"), "ns", "t", "c", 1))
1373 .collect();
1374 assert!(pass.eligible(&cluster));
1375 }
1376
1377 #[test]
1378 fn eligible_pass_method_rejects_oversize() {
1379 let (store, _dir) = open_db();
1380 let llm = StubLlm::new("S");
1381 let pass = ReflectionPass::new(&store, &llm, None, None, false);
1382 let cluster: Vec<Memory> = (0..(MAX_CLUSTER_SIZE + 1))
1383 .map(|i| make_obs(&format!("m{i:02}"), "ns", "t", "c", 1))
1384 .collect();
1385 assert!(!pass.eligible(&cluster));
1386 }
1387
1388 #[test]
1389 fn eligible_pass_method_rejects_reflection_member() {
1390 let (store, _dir) = open_db();
1391 let llm = StubLlm::new("S");
1392 let pass = ReflectionPass::new(&store, &llm, None, None, false);
1393 let mut cluster: Vec<Memory> = (0..MIN_CLUSTER_SIZE)
1394 .map(|i| make_obs(&format!("m{i}"), "ns", "t", "c", 1))
1395 .collect();
1396 cluster[0].memory_kind = MemoryKind::Reflection;
1397 assert!(!pass.eligible(&cluster));
1398 }
1399
1400 #[test]
1401 fn eligible_pass_method_rejects_zero_access() {
1402 let (store, _dir) = open_db();
1403 let llm = StubLlm::new("S");
1404 let pass = ReflectionPass::new(&store, &llm, None, None, false);
1405 let mut cluster: Vec<Memory> = (0..MIN_CLUSTER_SIZE)
1406 .map(|i| make_obs(&format!("m{i}"), "ns", "t", "c", 1))
1407 .collect();
1408 cluster[1].access_count = 0;
1409 assert!(!pass.eligible(&cluster));
1410 }
1411
1412 #[test]
1413 fn summarize_below_min_errors() {
1414 let (store, _dir) = open_db();
1415 let llm = StubLlm::new("S");
1416 let pass = ReflectionPass::new(&store, &llm, None, None, false);
1417 let cluster: Vec<Memory> = (0..(MIN_CLUSTER_SIZE - 1))
1418 .map(|i| make_obs(&format!("m{i}"), "ns", "t", "c", 1))
1419 .collect();
1420 let err = pass.summarize(&cluster).unwrap_err().to_string();
1421 assert!(err.contains("< MIN_CLUSTER_SIZE"));
1422 }
1423
1424 #[test]
1425 fn summarize_returns_reflection_typed_memory() {
1426 let (store, _dir) = open_db();
1427 let llm = StubLlm::new("synth pattern");
1428 let pass = ReflectionPass::new(&store, &llm, None, None, false);
1429 let cluster: Vec<Memory> = (0..MIN_CLUSTER_SIZE)
1430 .map(|i| {
1431 let mut m = make_obs(&format!("m{i}"), "ns", "Title-A", "shared content", 2);
1432 m.tier = if i == 0 { Tier::Long } else { Tier::Mid };
1433 m.priority = 5 + i32::try_from(i).unwrap();
1434 m
1435 })
1436 .collect();
1437 let summary = pass.summarize(&cluster).unwrap();
1438 assert_eq!(summary.memory_kind, MemoryKind::Reflection);
1439 assert!(summary.title.starts_with("[reflection]"));
1440 assert_eq!(summary.content, "synth pattern");
1441 assert_eq!(summary.tier, Tier::Long);
1442 assert_eq!(summary.source, "system");
1443 assert_eq!(summary.namespace, "ns");
1444 assert_eq!(
1446 summary.priority,
1447 5 + i32::try_from(MIN_CLUSTER_SIZE - 1).unwrap()
1448 );
1449 }
1450
1451 #[tokio::test]
1452 async fn persist_dry_run_is_noop() {
1453 let (store, _dir) = open_db();
1454 let llm = StubLlm::new("S");
1455 let pass = ReflectionPass::new(&store, &llm, None, None, true);
1456 let summary = make_obs("s", "ns", "[reflection]", "c", 1);
1457 pass.persist(&summary, &["x".to_string()]).await.unwrap();
1458 }
1459
1460 #[tokio::test]
1461 async fn persist_empty_sources_is_noop() {
1462 let (store, _dir) = open_db();
1463 let llm = StubLlm::new("S");
1464 let pass = ReflectionPass::new(&store, &llm, None, None, false);
1465 let summary = make_obs("s", "ns", "[reflection]", "c", 1);
1466 pass.persist(&summary, &[]).await.unwrap();
1467 }
1468
1469 #[tokio::test]
1470 async fn persist_refuses_when_max_depth_exceeded() {
1471 let (store, _dir) = open_db();
1472 let llm = StubLlm::new("S");
1473 let pass = ReflectionPass::new(&store, &llm, None, Some(1), false);
1475 let mut source = make_obs("src", "ns", "t", "c", 1);
1476 source.reflection_depth = 1;
1477 let src_id = crate::db::insert(&conn_of(&store), &source).unwrap();
1478 let summary = make_obs("s", "ns", "[reflection]", "c", 0);
1479 let err = pass
1480 .persist(&summary, &[src_id])
1481 .await
1482 .unwrap_err()
1483 .to_string();
1484 assert!(err.contains("exceeds"));
1485 assert!(err.contains("--max-depth"));
1486 }
1487
1488 #[tokio::test]
1489 async fn persist_writes_reflection_into_db() {
1490 let (store, _dir) = open_db();
1492 let conn = conn_of(&store);
1493 let llm = StubLlm::new("synthesised pattern");
1494 let pass = ReflectionPass::new(&store, &llm, None, None, false);
1495 let s1 = insert_observation(&conn, "app", "T1", "kubernetes deploy strategy notes", 2);
1497 let s2 =
1498 insert_observation(&conn, "app", "T2", "kubernetes rolling deploy approach", 3);
1499 let s3 = insert_observation(&conn, "app", "T3", "kubernetes canary deploy strategy", 1);
1500 let summary = pass
1501 .summarize(&[
1502 crate::db::get(&conn, &s1).unwrap().unwrap(),
1503 crate::db::get(&conn, &s2).unwrap().unwrap(),
1504 crate::db::get(&conn, &s3).unwrap().unwrap(),
1505 ])
1506 .unwrap();
1507 pass.persist(&summary, &[s1.clone(), s2.clone(), s3.clone()])
1508 .await
1509 .unwrap();
1510
1511 let listed = crate::db::list(
1513 &conn,
1514 Some("app"),
1515 None,
1516 32,
1517 0,
1518 None,
1519 None,
1520 None,
1521 None,
1522 None,
1523 )
1524 .unwrap();
1525 let refl = listed
1526 .iter()
1527 .find(|m| m.memory_kind == MemoryKind::Reflection)
1528 .expect("expected one reflection");
1529 pass.verify(refl.id.clone()).await.unwrap();
1531 }
1532
1533 #[tokio::test]
1534 async fn verify_missing_id_errors() {
1535 let (store, _dir) = open_db();
1536 let llm = StubLlm::new("S");
1537 let pass = ReflectionPass::new(&store, &llm, None, None, false);
1538 let err = pass
1539 .verify("no-such".to_string())
1540 .await
1541 .unwrap_err()
1542 .to_string();
1543 assert!(err.contains("not found in DB"));
1544 }
1545
1546 #[tokio::test]
1547 async fn verify_wrong_kind_errors() {
1548 let (store, _dir) = open_db();
1549 let conn = conn_of(&store);
1550 let llm = StubLlm::new("S");
1551 let pass = ReflectionPass::new(&store, &llm, None, None, false);
1552 let id = insert_observation(&conn, "ns", "T", "c", 1);
1554 let err = pass.verify(id).await.unwrap_err().to_string();
1555 assert!(err.contains("expected Reflection"));
1556 }
1557
1558 #[tokio::test]
1559 async fn verify_reflection_without_edges_errors() {
1560 let (store, _dir) = open_db();
1563 let conn = conn_of(&store);
1564 let llm = StubLlm::new("S");
1565 let pass = ReflectionPass::new(&store, &llm, None, None, false);
1566 let now = chrono::Utc::now().to_rfc3339();
1567 let mut metadata = crate::models::default_metadata();
1568 if let Some(obj) = metadata.as_object_mut() {
1569 obj.insert(
1570 "agent_id".to_string(),
1571 serde_json::Value::String("test-agent".to_string()),
1572 );
1573 }
1574 let m = Memory {
1575 id: uuid::Uuid::new_v4().to_string(),
1576 tier: Tier::Mid,
1577 namespace: "ns".to_string(),
1578 title: "[reflection] orphan".to_string(),
1579 content: "c".to_string(),
1580 tags: vec![],
1581 priority: 5,
1582 confidence: 1.0,
1583 source: "system".to_string(),
1584 access_count: 0,
1585 created_at: now.clone(),
1586 updated_at: now,
1587 last_accessed_at: None,
1588 expires_at: None,
1589 metadata,
1590 reflection_depth: 1,
1591 memory_kind: MemoryKind::Reflection,
1592 entity_id: None,
1593 persona_version: None,
1594 citations: Vec::new(),
1595 source_uri: None,
1596 source_span: None,
1597 confidence_source: ConfidenceSource::CallerProvided,
1598 confidence_signals: None,
1599 confidence_decayed_at: None,
1600 version: 1,
1601 };
1602 let id = crate::db::insert(&conn, &m).unwrap();
1603 let err = pass.verify(id).await.unwrap_err().to_string();
1604 assert!(err.contains("no reflects_on edge"));
1605 }
1606
1607 #[tokio::test]
1610 async fn run_reflection_pass_empty_db_dry_run_namespace() {
1611 let (store, _dir) = open_db();
1612 let llm = StubLlm::new("S");
1613 let report =
1614 run_reflection_pass(&store, &llm, None, Some("nope"), None, true, |_| true)
1615 .await
1616 .unwrap();
1617 assert!(report.dry_run);
1618 assert_eq!(report.namespaces_visited, 1);
1619 assert_eq!(report.clusters_formed, 0);
1620 assert_eq!(report.reflections_persisted, 0);
1621 }
1622
1623 #[tokio::test]
1624 async fn run_reflection_pass_all_namespaces_with_disabled_check() {
1625 let (store, _dir) = open_db();
1626 let conn = conn_of(&store);
1627 let llm = StubLlm::new("S");
1628 insert_observation(&conn, "ns1", "t", "shared content tokens here", 2);
1630 let report = run_reflection_pass(&store, &llm, None, None, None, true, |_| false)
1631 .await
1632 .unwrap();
1633 assert_eq!(report.observations_scanned, 0);
1635 }
1636
1637 #[tokio::test]
1638 async fn run_reflection_pass_dry_run_reports_proposals() {
1639 let (store, _dir) = open_db();
1640 let conn = conn_of(&store);
1641 let llm = StubLlm::new("synth");
1642 insert_observation(
1644 &conn,
1645 "app",
1646 "T1",
1647 "kubernetes rolling deploy strategy notes",
1648 2,
1649 );
1650 insert_observation(
1651 &conn,
1652 "app",
1653 "T2",
1654 "kubernetes rolling deploy strategy canary",
1655 3,
1656 );
1657 insert_observation(
1658 &conn,
1659 "app",
1660 "T3",
1661 "kubernetes canary deploy strategy rolling",
1662 1,
1663 );
1664 let report = run_reflection_pass(&store, &llm, None, Some("app"), None, true, |_| true)
1665 .await
1666 .unwrap();
1667 assert!(report.dry_run);
1668 assert!(report.observations_scanned >= 3);
1669 assert!(report.clusters_eligible >= 1);
1670 assert!(!report.dry_run_proposals.is_empty());
1671 assert_eq!(report.reflections_persisted, 0);
1672 }
1673
1674 #[tokio::test]
1675 async fn run_reflection_pass_persists_reflections() {
1676 let (store, _dir) = open_db();
1677 let conn = conn_of(&store);
1678 let llm = StubLlm::new("persisted pattern");
1679 insert_observation(&conn, "app", "T1", "shared keyword token strategy notes", 2);
1680 insert_observation(&conn, "app", "T2", "shared keyword token strategy plan", 3);
1681 insert_observation(
1682 &conn,
1683 "app",
1684 "T3",
1685 "shared keyword token strategy canary",
1686 1,
1687 );
1688 let report =
1689 run_reflection_pass(&store, &llm, None, Some("app"), None, false, |_| true)
1690 .await
1691 .unwrap();
1692 assert_eq!(report.dry_run, false);
1693 assert!(report.reflections_persisted >= 1);
1694 }
1695
1696 #[tokio::test]
1697 async fn run_reflection_pass_depth_refusal_increments_counter() {
1698 let (store, _dir) = open_db();
1699 let conn = conn_of(&store);
1700 let llm = StubLlm::new("synth");
1701 let now = chrono::Utc::now().to_rfc3339();
1704 for i in 0..3 {
1705 let mut metadata = crate::models::default_metadata();
1706 if let Some(obj) = metadata.as_object_mut() {
1707 obj.insert(
1708 "agent_id".to_string(),
1709 serde_json::Value::String("test-agent".to_string()),
1710 );
1711 }
1712 let m = Memory {
1713 id: uuid::Uuid::new_v4().to_string(),
1714 tier: Tier::Long,
1715 namespace: "deep".to_string(),
1716 title: format!("Tdeep-{i}"),
1717 content: "shared keyword token deep strategy".to_string(),
1718 tags: vec![],
1719 priority: 5,
1720 confidence: 1.0,
1721 source: "test".to_string(),
1722 access_count: 2,
1723 created_at: now.clone(),
1724 updated_at: now.clone(),
1725 last_accessed_at: None,
1726 expires_at: None,
1727 metadata,
1728 reflection_depth: 2,
1729 memory_kind: MemoryKind::Observation,
1730 entity_id: None,
1731 persona_version: None,
1732 citations: Vec::new(),
1733 source_uri: None,
1734 source_span: None,
1735 confidence_source: ConfidenceSource::CallerProvided,
1736 confidence_signals: None,
1737 confidence_decayed_at: None,
1738 version: 1,
1739 };
1740 crate::db::insert(&conn, &m).unwrap();
1741 }
1742 let report = run_reflection_pass(
1743 &store,
1744 &llm,
1745 None,
1746 Some("deep"),
1747 Some(2), false,
1749 |_| true,
1750 )
1751 .await
1752 .unwrap();
1753 assert!(report.depth_refusals >= 1);
1754 assert_eq!(report.reflections_persisted, 0);
1756 }
1757
/// `DryRunProposal` must survive a JSON round trip with every field intact.
///
/// The serialised form is also checked for the literal `source_ids` key.
#[test]
fn dry_run_proposal_serialises() {
    let p = DryRunProposal {
        namespace: "ns".into(),
        proposed_title: "[reflection] x".into(),
        source_ids: vec!["a".into(), "b".into()],
    };
    let j = serde_json::to_string(&p).unwrap();
    assert!(j.contains("source_ids"));
    let back: DryRunProposal = serde_json::from_str(&j).unwrap();
    assert_eq!(back.namespace, "ns");
    // Compare the remaining fields against the original value so a dropped or
    // renamed field cannot slip through the round trip unnoticed.
    assert_eq!(back.proposed_title, p.proposed_title);
    assert_eq!(back.source_ids, p.source_ids);
}
1770
/// Two observations with identical content but unparseable `created_at`
/// strings must still be reported as co-occurring by `pair_co_occurs`.
#[test]
fn pair_co_occurs_unparseable_timestamps_still_checks_jaccard() {
    let left = Memory {
        created_at: "not-a-timestamp".to_string(),
        ..make_obs("a", "ns", "t", "shared content tokens here", 1)
    };
    let right = Memory {
        created_at: "also-invalid".to_string(),
        ..make_obs("b", "ns", "t", "shared content tokens here", 1)
    };

    let co_occurs = pair_co_occurs(&left, &right);
    assert!(co_occurs);
}
1783
/// Exercises the `StubLlm` methods `auto_tag` and `detect_contradiction`:
/// the former must yield no tags and the latter must report no conflict.
#[test]
fn stub_llm_auto_tag_and_contradiction_paths() {
    let llm = StubLlm::new("S");

    assert!(llm.auto_tag("t", "c").unwrap().is_empty());
    assert!(!llm.detect_contradiction("a", "b").unwrap());
}
1794
/// A cluster of `MIN_CLUSTER_SIZE` observations in the `_curator` namespace
/// must not be considered eligible by `ReflectionPass::eligible`.
#[test]
fn eligible_pass_rejects_internal_namespace_directly() {
    let (store, _dir) = open_db();
    let stub = StubLlm::new("S");
    let pass = ReflectionPass::new(&store, &stub, None, None, false);

    let mut cluster: Vec<Memory> = Vec::with_capacity(MIN_CLUSTER_SIZE);
    for index in 0..MIN_CLUSTER_SIZE {
        let id = format!("m{index}");
        cluster.push(make_obs(&id, "_curator", "t", "c", 1));
    }

    let accepted = pass.eligible(&cluster);
    assert!(!accepted);
}
1806
/// `jaccard_similarity("a b c", "x")` must be exactly `0.0`.
// NOTE(review): the raw inputs share no words; the "zero union" in the name
// presumably relies on the tokenizer discarding these tokens — confirm against
// `jaccard_similarity`.
#[test]
fn jaccard_similarity_zero_union_returns_zero() {
    let left = "a b c";
    let right = "x";

    let score = jaccard_similarity(left, right);
    assert_eq!(score, 0.0);
}
1816
/// `tier_rank` maps `Short`, `Mid` and `Long` to 0, 1 and 2 respectively.
#[test]
fn tier_rank_all_variants() {
    let expectations = [(Tier::Short, 0), (Tier::Mid, 1), (Tier::Long, 2)];
    for (tier, rank) in expectations {
        assert_eq!(tier_rank(&tier), rank);
    }
}
1824
/// `parse_rfc3339` yields `None` for a non-timestamp string and `Some` for a
/// well-formed RFC 3339 timestamp.
#[test]
fn parse_rfc3339_invalid_returns_none() {
    let rejected = parse_rfc3339("garbage");
    assert!(rejected.is_none());

    let accepted = parse_rfc3339("2026-01-01T00:00:00Z");
    assert!(accepted.is_some());
}
1830
1831 #[tokio::test]
1832 async fn verify_skips_inbound_links() {
1833 let (store, _dir) = open_db();
1836 let conn = conn_of(&store);
1837 let llm = StubLlm::new("S");
1838 let pass = ReflectionPass::new(&store, &llm, None, None, false);
1839 let s1 =
1841 insert_observation(&conn, "vrf", "T1", "shared keyword pattern tokens here", 2);
1842 let s2 =
1843 insert_observation(&conn, "vrf", "T2", "shared keyword pattern tokens here", 2);
1844 let s3 =
1845 insert_observation(&conn, "vrf", "T3", "shared keyword pattern tokens here", 2);
1846 let summary = pass
1847 .summarize(&[
1848 crate::db::get(&conn, &s1).unwrap().unwrap(),
1849 crate::db::get(&conn, &s2).unwrap().unwrap(),
1850 crate::db::get(&conn, &s3).unwrap().unwrap(),
1851 ])
1852 .unwrap();
1853 pass.persist(&summary, &[s1.clone(), s2.clone(), s3.clone()])
1854 .await
1855 .unwrap();
1856 let listed = crate::db::list(
1857 &conn,
1858 Some("vrf"),
1859 None,
1860 32,
1861 0,
1862 None,
1863 None,
1864 None,
1865 None,
1866 None,
1867 )
1868 .unwrap();
1869 let refl_id = listed
1870 .iter()
1871 .find(|m| m.memory_kind == MemoryKind::Reflection)
1872 .unwrap()
1873 .id
1874 .clone();
1875 let _ = crate::db::create_link(&conn, &s1, &refl_id, "related_to");
1878 pass.verify(refl_id).await.unwrap();
1879 }
1880
1881 #[tokio::test]
1882 async fn run_reflection_pass_summarize_error_recorded() {
1883 struct FailingLlm;
1887 impl AutonomyLlm for FailingLlm {
1888 fn auto_tag(&self, _t: &str, _c: &str) -> Result<Vec<String>> {
1889 Ok(vec![])
1890 }
1891 fn detect_contradiction(&self, _a: &str, _b: &str) -> Result<bool> {
1892 Ok(false)
1893 }
1894 fn summarize_memories(&self, _m: &[(String, String)]) -> Result<String> {
1895 anyhow::bail!("forced llm failure")
1896 }
1897 }
1898 let (store, _dir) = open_db();
1899 let conn = conn_of(&store);
1900 let llm = FailingLlm;
1901 insert_observation(&conn, "ns", "T1", "shared keyword pattern tokens here", 2);
1902 insert_observation(&conn, "ns", "T2", "shared keyword pattern tokens here", 2);
1903 insert_observation(&conn, "ns", "T3", "shared keyword pattern tokens here", 2);
1904 let report = run_reflection_pass(&store, &llm, None, Some("ns"), None, false, |_| true)
1905 .await
1906 .unwrap();
1907 assert!(report.errors.iter().any(|e| e.contains("summarize failed")));
1909 assert_eq!(report.reflections_persisted, 0);
1910 }
1911 } }