1#![cfg_attr(not(feature = "sal"), allow(dead_code, unused_imports))]
53
54use std::collections::HashSet;
55
56use chrono::{DateTime, Utc};
57use serde::{Deserialize, Serialize};
58
59#[cfg(feature = "sal")]
60use anyhow::Context;
61use anyhow::Result;
62
63#[cfg(feature = "sal")]
64use crate::autonomy::AutonomyLlm;
65#[cfg(feature = "sal")]
66use crate::identity::keypair::AgentKeypair;
67use crate::models::{Memory, MemoryKind, Tier};
68#[cfg(feature = "sal")]
69use crate::storage::reflect::{ReflectError, ReflectInput};
70#[cfg(feature = "sal")]
71use crate::store::{CallerContext, Filter, MemoryStore, StoreError};
72
73#[cfg(feature = "sal")]
74use super::pipeline::MemoryId;
75
76#[cfg(any(feature = "sal", test))]
77use crate::models::ConfidenceSource;
78
79#[cfg(feature = "sal")]
89async fn store_get_opt(
90 store: &dyn MemoryStore,
91 ctx: &CallerContext,
92 id: &str,
93) -> Result<Option<Memory>> {
94 match store.get(ctx, id).await {
95 Ok(mem) => Ok(Some(mem)),
96 Err(StoreError::NotFound { .. }) => Ok(None),
97 Err(e) => Err(anyhow::anyhow!(e)),
98 }
99}
100
101#[cfg(feature = "sal")]
109fn curator_caller_context(agent_id: &str) -> CallerContext {
110 CallerContext::for_admin(agent_id)
111}
112
113#[cfg(feature = "sal")]
121async fn store_list_namespace(
122 store: &dyn MemoryStore,
123 ctx: &CallerContext,
124 namespace: &str,
125 limit: usize,
126) -> Result<Vec<Memory>> {
127 let filter = Filter {
128 namespace: Some(namespace.to_string()),
129 limit,
130 ..Default::default()
131 };
132 store
133 .list(ctx, &filter)
134 .await
135 .map_err(|e| anyhow::anyhow!(e))
136}
137
138pub(crate) const MIN_CLUSTER_SIZE: usize = 3;
146
147pub(crate) const MAX_CLUSTER_SIZE: usize = 12;
151
152pub(crate) const TEMPORAL_WINDOW_DAYS: i64 = 7;
157
158pub(crate) const REFLECTION_JACCARD_THRESHOLD: f64 = 0.30;
163
164pub(crate) const MIN_RECALL_COUNT: i64 = 1;
170
171#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
183pub struct ReflectionPassConfig {
184 #[serde(default)]
186 pub enabled: bool,
187 #[serde(default, skip_serializing_if = "Option::is_none")]
195 pub max_depth: Option<u32>,
196}
197
198impl Default for ReflectionPassConfig {
199 fn default() -> Self {
200 Self {
201 enabled: false,
202 max_depth: None,
203 }
204 }
205}
206
207#[cfg(feature = "sal")]
224pub(crate) struct ReflectionPass<'a> {
225 pub(crate) store: &'a dyn MemoryStore,
230 pub(crate) ctx: CallerContext,
235 pub(crate) llm: &'a dyn AutonomyLlm,
238 pub(crate) keypair: Option<&'a AgentKeypair>,
243 pub(crate) max_depth: Option<u32>,
250 pub(crate) dry_run: bool,
255}
256
257#[cfg(feature = "sal")]
258impl<'a> ReflectionPass<'a> {
259 pub(crate) fn new(
264 store: &'a dyn MemoryStore,
265 llm: &'a dyn AutonomyLlm,
266 keypair: Option<&'a AgentKeypair>,
267 max_depth: Option<u32>,
268 dry_run: bool,
269 ) -> Self {
270 let agent_id = keypair.map_or_else(
271 || crate::identity::sentinels::AI_CURATOR.to_string(),
272 |k| k.agent_id.clone(),
273 );
274 Self {
275 store,
276 ctx: curator_caller_context(&agent_id),
277 llm,
278 keypair,
279 max_depth,
280 dry_run,
281 }
282 }
283
284 fn agent_id(&self) -> String {
291 self.keypair.map_or_else(
292 || crate::identity::sentinels::AI_CURATOR.to_string(),
293 |k| k.agent_id.clone(),
294 )
295 }
296
297 #[allow(dead_code)]
301 fn name(&self) -> &str {
302 "reflection"
303 }
304
305 fn cluster(&self, memories: &[Memory]) -> Vec<Vec<MemoryId>> {
320 let mut by_ns: std::collections::HashMap<&str, Vec<&Memory>> =
321 std::collections::HashMap::new();
322 for m in memories {
323 if !is_clusterable_observation(m) {
324 continue;
325 }
326 by_ns.entry(&m.namespace).or_default().push(m);
327 }
328
329 let mut clusters: Vec<Vec<MemoryId>> = Vec::new();
330 for (_ns, group) in by_ns {
331 let mut used = vec![false; group.len()];
332 for i in 0..group.len() {
333 if used[i] {
334 continue;
335 }
336 let mut cluster = vec![group[i].id.clone()];
337 used[i] = true;
338 for j in (i + 1)..group.len() {
339 if used[j] {
340 continue;
341 }
342 if cluster.len() >= MAX_CLUSTER_SIZE {
343 break;
344 }
345 if pair_co_occurs(group[i], group[j]) {
346 cluster.push(group[j].id.clone());
347 used[j] = true;
348 }
349 }
350 if cluster.len() >= MIN_CLUSTER_SIZE {
351 clusters.push(cluster);
352 }
353 }
354 }
355 clusters
356 }
357
358 fn eligible(&self, cluster: &[Memory]) -> bool {
370 if cluster.len() < MIN_CLUSTER_SIZE || cluster.len() > MAX_CLUSTER_SIZE {
371 return false;
372 }
373 let ns = &cluster[0].namespace;
374 if ns.starts_with('_') {
375 return false;
376 }
377 cluster.iter().all(|m| {
378 m.memory_kind == MemoryKind::Observation
379 && &m.namespace == ns
380 && m.access_count >= MIN_RECALL_COUNT
381 })
382 }
383
384 fn summarize(&self, cluster: &[Memory]) -> Result<Memory> {
402 if cluster.len() < MIN_CLUSTER_SIZE {
403 anyhow::bail!(
404 "summarize: cluster has {} members (< MIN_CLUSTER_SIZE = {})",
405 cluster.len(),
406 MIN_CLUSTER_SIZE
407 );
408 }
409
410 let input: Vec<(String, String)> = cluster
411 .iter()
412 .map(|m| (m.title.clone(), m.content.clone()))
413 .collect();
414 let summary_text = self
415 .llm
416 .summarize_memories(&input)
417 .context("ReflectionPass::summarize: LLM call failed")?;
418
419 let base_title = cluster
420 .iter()
421 .map(|m| m.title.as_str())
422 .next()
423 .unwrap_or("(reflection)");
424 let title = format!("[reflection] {base_title}");
425
426 let tier = cluster
427 .iter()
428 .map(|m| m.tier.clone())
429 .max_by_key(tier_rank)
430 .unwrap_or(Tier::Mid);
431 let priority = cluster.iter().map(|m| m.priority).max().unwrap_or(5);
432
433 let now = Utc::now().to_rfc3339();
434 Ok(Memory {
435 id: uuid::Uuid::new_v4().to_string(),
436 tier,
437 namespace: cluster[0].namespace.clone(),
438 title,
439 content: summary_text,
440 tags: vec![],
441 priority,
442 confidence: 1.0,
443 source: "system".to_string(),
447 access_count: 0,
448 created_at: now.clone(),
449 updated_at: now,
450 last_accessed_at: None,
451 expires_at: None,
452 metadata: serde_json::json!({}),
453 reflection_depth: 0,
454 memory_kind: MemoryKind::Reflection,
455 entity_id: None,
456 persona_version: None,
457 citations: Vec::new(),
458 source_uri: None,
459 source_span: None,
460 confidence_source: ConfidenceSource::CallerProvided,
461 confidence_signals: None,
462 confidence_decayed_at: None,
463 version: 1,
464 })
465 }
466
467 async fn persist(&self, summary: &Memory, sources: &[MemoryId]) -> Result<()> {
480 if self.dry_run || sources.is_empty() {
481 return Ok(());
482 }
483
484 if let Some(cap) = self.max_depth {
491 let mut max_src_depth: i32 = 0;
492 for id in sources {
493 if let Some(m) = store_get_opt(self.store, &self.ctx, id).await? {
494 max_src_depth = max_src_depth.max(m.reflection_depth);
495 }
496 }
497 let new_depth =
498 u32::try_from(max_src_depth.max(0).saturating_add(1)).unwrap_or(u32::MAX);
499 if new_depth > cap {
500 anyhow::bail!(
501 "ReflectionPass::persist: proposed depth {new_depth} exceeds \
502 curator --max-depth {cap}"
503 );
504 }
505 }
506
507 let input = ReflectInput {
508 source_ids: sources.to_vec(),
509 title: summary.title.clone(),
510 content: summary.content.clone(),
511 namespace: Some(summary.namespace.clone()),
512 tier: summary.tier.clone(),
513 tags: summary.tags.clone(),
514 priority: summary.priority,
515 confidence: summary.confidence,
516 source: summary.source.clone(),
517 agent_id: self.agent_id(),
518 metadata: summary.metadata.clone(),
519 };
520
521 match self.store.reflect(&self.ctx, &input, self.keypair).await {
530 Ok(_outcome) => Ok(()),
531 Err(ReflectError::DepthExceeded {
532 attempted,
533 cap,
534 namespace,
535 }) => {
536 anyhow::bail!(
537 "ReflectionPass::persist: substrate refused — proposed depth \
538 {attempted} exceeds namespace cap {cap} in '{namespace}'"
539 )
540 }
541 Err(other) => Err(anyhow::anyhow!(other.to_string())),
542 }
543 }
544
545 #[allow(dead_code)]
563 async fn verify(&self, summary_id: MemoryId) -> Result<()> {
564 let mem = store_get_opt(self.store, &self.ctx, &summary_id)
565 .await
566 .context("ReflectionPass::verify: store.get failed")?;
567 let mem = mem
568 .ok_or_else(|| anyhow::anyhow!("verify: reflection {} not found in DB", summary_id))?;
569 if mem.memory_kind != MemoryKind::Reflection {
570 anyhow::bail!(
571 "verify: memory {} is {:?}, expected Reflection",
572 summary_id,
573 mem.memory_kind
574 );
575 }
576
577 let links = self
578 .store
579 .get_links_for_anchor(&summary_id)
580 .await
581 .map_err(|e| anyhow::anyhow!(e))
582 .context("ReflectionPass::verify: store.get_links_for_anchor failed")?;
583 let mut saw_reflects_on = false;
584 for link in &links {
585 if link.source_id != summary_id {
589 continue;
590 }
591 if link.relation != crate::models::MemoryLinkRelation::ReflectsOn {
592 continue;
593 }
594 saw_reflects_on = true;
595 let target = store_get_opt(self.store, &self.ctx, &link.target_id).await?;
599 if target.is_none() {
600 anyhow::bail!(
601 "verify: reflects_on edge target {} not found",
602 link.target_id
603 );
604 }
605 }
606 if !saw_reflects_on {
607 anyhow::bail!("verify: reflection {} has no reflects_on edge", summary_id);
608 }
609 Ok(())
610 }
611}
612
613#[derive(Debug, Clone, Default, Serialize, Deserialize)]
620pub struct ReflectionPassReport {
621 pub started_at: String,
623 pub completed_at: String,
624 pub namespaces_visited: usize,
627 pub observations_scanned: usize,
630 pub clusters_formed: usize,
632 pub clusters_eligible: usize,
634 pub reflections_persisted: usize,
637 pub depth_refusals: usize,
640 pub errors: Vec<String>,
643 #[serde(default)]
647 pub dry_run_proposals: Vec<DryRunProposal>,
648 pub dry_run: bool,
650}
651
652#[derive(Debug, Clone, Serialize, Deserialize)]
656pub struct DryRunProposal {
657 pub namespace: String,
658 pub proposed_title: String,
659 pub source_ids: Vec<String>,
660}
661
662#[cfg(feature = "sal")]
675pub async fn run_reflection_pass(
676 store: &dyn MemoryStore,
677 llm: &dyn AutonomyLlm,
678 keypair: Option<&AgentKeypair>,
679 namespace: Option<&str>,
680 max_depth: Option<u32>,
681 dry_run: bool,
682 enabled_check: impl Fn(&str) -> bool,
683) -> Result<ReflectionPassReport> {
684 let mut report = ReflectionPassReport {
685 started_at: Utc::now().to_rfc3339(),
686 dry_run,
687 ..Default::default()
688 };
689
690 let pass = ReflectionPass::new(store, llm, keypair, max_depth, dry_run);
691
692 let namespaces: Vec<String> = match namespace {
693 Some(ns) => vec![ns.to_string()],
694 None => {
695 let counts = store
698 .list_namespaces()
699 .await
700 .map_err(|e| anyhow::anyhow!(e))
701 .context("run_reflection_pass: list_namespaces failed")?;
702 counts
703 .into_iter()
704 .map(|nc| nc.namespace)
705 .filter(|ns| !ns.starts_with('_'))
706 .collect()
707 }
708 };
709 report.namespaces_visited = namespaces.len();
710
711 for ns in &namespaces {
712 if !enabled_check(ns) {
713 continue;
714 }
715
716 let candidates = match store_list_namespace(
721 store,
722 &pass.ctx,
723 ns.as_str(),
724 MAX_CLUSTER_SIZE * 16,
725 )
726 .await
727 {
728 Ok(v) => v,
729 Err(e) => {
730 report
731 .errors
732 .push(format!("namespace '{ns}': store.list failed: {e}"));
733 continue;
734 }
735 };
736 let scanned_here = candidates.len();
737 report.observations_scanned += scanned_here;
738
739 let clusters = pass.cluster(&candidates);
741 report.clusters_formed += clusters.len();
742
743 for cluster_ids in clusters {
744 let mut cluster: Vec<Memory> = cluster_ids
746 .iter()
747 .filter_map(|id| candidates.iter().find(|m| &m.id == id).cloned())
748 .collect();
749
750 if !pass.eligible(&cluster) {
751 continue;
752 }
753 report.clusters_eligible += 1;
754
755 cluster.sort_by(|a, b| a.id.cmp(&b.id));
758
759 let summary = match pass.summarize(&cluster) {
760 Ok(s) => s,
761 Err(e) => {
762 report
763 .errors
764 .push(format!("namespace '{ns}': summarize failed: {e}"));
765 continue;
766 }
767 };
768
769 let source_ids: Vec<String> = cluster.iter().map(|m| m.id.clone()).collect();
770
771 if dry_run {
772 report.dry_run_proposals.push(DryRunProposal {
773 namespace: ns.clone(),
774 proposed_title: summary.title.clone(),
775 source_ids: source_ids.clone(),
776 });
777 continue;
778 }
779
780 match pass.persist(&summary, &source_ids).await {
781 Ok(()) => {
782 report.reflections_persisted += 1;
783 if let Err(e) = verify_recent(store, &pass.ctx, ns, &source_ids).await {
788 report
789 .errors
790 .push(format!("namespace '{ns}': verify failed: {e}"));
791 }
792 }
793 Err(e) => {
794 let msg = e.to_string();
795 if msg.contains("exceeds") && msg.contains("depth") {
796 report.depth_refusals += 1;
797 } else {
798 report
799 .errors
800 .push(format!("namespace '{ns}': persist failed: {e}"));
801 }
802 }
803 }
804 }
805 }
806
807 report.completed_at = Utc::now().to_rfc3339();
808 Ok(report)
809}
810
811#[cfg(feature = "sal")]
815async fn verify_recent(
816 store: &dyn MemoryStore,
817 ctx: &CallerContext,
818 namespace: &str,
819 source_ids: &[String],
820) -> Result<()> {
821 let candidates = store_list_namespace(store, ctx, namespace, 16)
822 .await
823 .context("verify_recent: store.list failed")?;
824 let target_set: HashSet<&str> = source_ids.iter().map(String::as_str).collect();
825 for cand in candidates
826 .iter()
827 .filter(|m| m.memory_kind == MemoryKind::Reflection)
828 {
829 let links = store
830 .get_links_for_anchor(&cand.id)
831 .await
832 .map_err(|e| anyhow::anyhow!(e))?;
833 let outbound: HashSet<&str> = links
834 .iter()
835 .filter(|l| {
836 l.source_id == cand.id
837 && l.relation == crate::models::MemoryLinkRelation::ReflectsOn
838 })
839 .map(|l| l.target_id.as_str())
840 .collect();
841 if outbound == target_set {
842 return Ok(());
849 }
850 }
851 anyhow::bail!(
852 "verify_recent: no Reflection in namespace '{namespace}' carries the \
853 expected reflects_on edge set"
854 )
855}
856
857fn tier_rank(t: &Tier) -> u8 {
862 match t {
863 Tier::Short => 0,
864 Tier::Mid => 1,
865 Tier::Long => 2,
866 }
867}
868
869fn is_clusterable_observation(m: &Memory) -> bool {
874 m.memory_kind == MemoryKind::Observation
875 && !m.namespace.starts_with('_')
876 && m.access_count >= MIN_RECALL_COUNT
877}
878
879fn pair_co_occurs(a: &Memory, b: &Memory) -> bool {
884 if a.namespace != b.namespace {
885 return false;
886 }
887 if let (Some(ta), Some(tb)) = (parse_rfc3339(&a.created_at), parse_rfc3339(&b.created_at)) {
888 let delta = (ta - tb).num_days().abs();
889 if delta > TEMPORAL_WINDOW_DAYS {
890 return false;
891 }
892 }
893 jaccard_similarity(&a.content, &b.content) >= REFLECTION_JACCARD_THRESHOLD
894}
895
896fn parse_rfc3339(s: &str) -> Option<DateTime<Utc>> {
900 DateTime::parse_from_rfc3339(s)
901 .ok()
902 .map(|d| d.with_timezone(&Utc))
903}
904
905fn jaccard_similarity(a: &str, b: &str) -> f64 {
910 let tokens = |s: &str| -> HashSet<String> {
911 s.split(|c: char| !c.is_alphanumeric())
912 .filter(|t| t.len() >= 3)
913 .map(str::to_lowercase)
914 .collect()
915 };
916 let ta = tokens(a);
917 let tb = tokens(b);
918 if ta.is_empty() && tb.is_empty() {
919 return 0.0;
920 }
921 let inter = ta.intersection(&tb).count();
922 let union = ta.union(&tb).count();
923 if union == 0 {
924 0.0
925 } else {
926 #[allow(clippy::cast_precision_loss)]
927 let result = inter as f64 / union as f64;
928 result
929 }
930}
931
932#[cfg(test)]
935pub(crate) fn temporal_window_seconds() -> i64 {
936 chrono::Duration::days(TEMPORAL_WINDOW_DAYS).num_seconds()
937}
938
939#[cfg(all(test, feature = "sal"))]
947mod tests {
948 use super::*;
949 use crate::models::{Memory, MemoryKind, Tier};
950 use anyhow::Result;
951 use chrono::Duration;
952 use std::sync::Mutex;
953
954 pub(super) struct StubLlm {
962 pub(super) summary: String,
963 pub(super) calls: Mutex<Vec<String>>,
964 }
965
966 impl StubLlm {
967 pub(super) fn new(summary: &str) -> Self {
968 Self {
969 summary: summary.to_string(),
970 calls: Mutex::new(Vec::new()),
971 }
972 }
973 }
974
975 impl AutonomyLlm for StubLlm {
976 fn auto_tag(&self, _title: &str, _content: &str) -> Result<Vec<String>> {
977 Ok(vec![])
978 }
979 fn detect_contradiction(&self, _a: &str, _b: &str) -> Result<bool> {
980 Ok(false)
981 }
982 fn summarize_memories(&self, memories: &[(String, String)]) -> Result<String> {
983 self.calls
984 .lock()
985 .unwrap()
986 .push(format!("summarize:{}", memories.len()));
987 Ok(self.summary.clone())
988 }
989 }
990
991 fn make_obs(id: &str, ns: &str, title: &str, content: &str, access: i64) -> Memory {
994 let now = Utc::now().to_rfc3339();
995 Memory {
996 id: id.to_string(),
997 tier: Tier::Long,
998 namespace: ns.to_string(),
999 title: title.to_string(),
1000 content: content.to_string(),
1001 tags: vec![],
1002 priority: 5,
1003 confidence: 1.0,
1004 source: "test".to_string(),
1005 access_count: access,
1006 created_at: now.clone(),
1007 updated_at: now,
1008 last_accessed_at: None,
1009 expires_at: None,
1010 metadata: serde_json::json!({}),
1011 reflection_depth: 0,
1012 memory_kind: MemoryKind::Observation,
1013 entity_id: None,
1014 persona_version: None,
1015 citations: Vec::new(),
1016 source_uri: None,
1017 source_span: None,
1018 confidence_source: ConfidenceSource::CallerProvided,
1019 confidence_signals: None,
1020 confidence_decayed_at: None,
1021 version: 1,
1022 }
1023 }
1024
1025 #[test]
1028 fn eligible_rejects_below_min_cluster_size() {
1029 let cluster: Vec<Memory> = (0..(MIN_CLUSTER_SIZE - 1))
1032 .map(|i| make_obs(&format!("m{i}"), "app", "t", "kubernetes deploy", 1))
1033 .collect();
1034 let result = cluster.len() >= MIN_CLUSTER_SIZE
1035 && cluster.len() <= MAX_CLUSTER_SIZE
1036 && !cluster[0].namespace.starts_with('_')
1037 && cluster.iter().all(|m| {
1038 m.memory_kind == MemoryKind::Observation
1039 && m.namespace == cluster[0].namespace
1040 && m.access_count >= MIN_RECALL_COUNT
1041 });
1042 assert!(!result, "below-MIN cluster must not be eligible");
1043 }
1044
1045 #[test]
1046 fn eligible_rejects_reflection_kind_member() {
1047 let mut cluster: Vec<Memory> = (0..MIN_CLUSTER_SIZE)
1052 .map(|i| make_obs(&format!("m{i}"), "app", "t", "kubernetes deploy", 1))
1053 .collect();
1054 cluster[0].memory_kind = MemoryKind::Reflection;
1055 let result = cluster
1056 .iter()
1057 .all(|m| m.memory_kind == MemoryKind::Observation);
1058 assert!(!result, "mixed-kind cluster must not be eligible");
1059 }
1060
1061 #[test]
1062 fn eligible_rejects_internal_namespace() {
1063 let cluster: Vec<Memory> = (0..MIN_CLUSTER_SIZE)
1064 .map(|i| make_obs(&format!("m{i}"), "_curator", "t", "kubernetes deploy", 1))
1065 .collect();
1066 let result = !cluster[0].namespace.starts_with('_');
1067 assert!(!result, "internal-namespace cluster must not be eligible");
1068 }
1069
1070 #[test]
1073 fn cluster_groups_three_co_occurring_observations() {
1074 let m1 = make_obs("a", "ns", "t1", "kubernetes rolling deploy strategy", 2);
1077 let m2 = make_obs("b", "ns", "t2", "kubernetes deploy canary strategy", 3);
1078 let m3 = make_obs("c", "ns", "t3", "kubernetes rolling deploy approach", 1);
1079
1080 let obs = [m1.clone(), m2.clone(), m3.clone()];
1084 let pairs = [
1085 pair_co_occurs(&m1, &m2),
1086 pair_co_occurs(&m1, &m3),
1087 pair_co_occurs(&m2, &m3),
1088 ];
1089 assert!(
1090 pairs.iter().all(|p| *p),
1091 "all three pairs must co-occur, got {pairs:?}"
1092 );
1093 assert_eq!(obs.len(), MIN_CLUSTER_SIZE);
1094 }
1095
1096 #[test]
1097 fn cluster_skips_observations_with_zero_access_count() {
1098 let cold = make_obs("cold", "ns", "t", "kubernetes deploy", 0);
1101 assert!(!is_clusterable_observation(&cold));
1102 }
1103
1104 #[test]
1105 fn pair_co_occurs_rejects_cross_namespace() {
1106 let a = make_obs("a", "ns1", "t", "shared content tokens", 1);
1107 let b = make_obs("b", "ns2", "t", "shared content tokens", 1);
1108 assert!(!pair_co_occurs(&a, &b));
1109 }
1110
1111 #[test]
1112 fn pair_co_occurs_respects_temporal_window() {
1113 let mut a = make_obs("a", "ns", "t", "shared content tokens here", 1);
1116 let mut b = make_obs("b", "ns", "t", "shared content tokens here", 1);
1117 let now = Utc::now();
1118 a.created_at = now.to_rfc3339();
1119 b.created_at = (now - Duration::days(TEMPORAL_WINDOW_DAYS + 2)).to_rfc3339();
1120 assert!(
1121 !pair_co_occurs(&a, &b),
1122 "outside-window pair must not co-occur"
1123 );
1124 }
1125
1126 #[test]
1127 fn pair_co_occurs_below_jaccard_threshold_is_false() {
1128 let a = make_obs("a", "ns", "t", "kubernetes deploy strategy", 1);
1129 let b = make_obs(
1130 "b",
1131 "ns",
1132 "t",
1133 "completely unrelated quantum mechanics text",
1134 1,
1135 );
1136 assert!(!pair_co_occurs(&a, &b));
1137 }
1138
1139 #[test]
1142 fn jaccard_similarity_is_symmetric() {
1143 let a = "kubernetes rolling deploy canary";
1144 let b = "kubernetes canary rolling deploy strategy";
1145 let sim_ab = jaccard_similarity(a, b);
1146 let sim_ba = jaccard_similarity(b, a);
1147 assert!((sim_ab - sim_ba).abs() < 1e-9);
1148 }
1149
1150 #[test]
1151 fn jaccard_similarity_empty_strings_zero() {
1152 assert_eq!(jaccard_similarity("", ""), 0.0);
1153 }
1154
1155 #[test]
1156 fn temporal_window_is_7_days() {
1157 assert_eq!(temporal_window_seconds(), crate::SECS_PER_WEEK);
1160 }
1161
1162 #[test]
1163 fn config_default_is_disabled() {
1164 let cfg = ReflectionPassConfig::default();
1166 assert!(!cfg.enabled);
1167 assert!(cfg.max_depth.is_none());
1168 }
1169
1170 #[test]
1171 fn config_round_trips_json() {
1172 let cfg = ReflectionPassConfig {
1173 enabled: true,
1174 max_depth: Some(2),
1175 };
1176 let json = serde_json::to_string(&cfg).unwrap();
1177 let back: ReflectionPassConfig = serde_json::from_str(&json).unwrap();
1178 assert_eq!(cfg, back);
1179 }
1180
1181 #[test]
1184 fn stub_llm_records_calls() {
1185 let stub = StubLlm::new("synthesised pattern");
1186 let out = stub
1187 .summarize_memories(&[("t1".into(), "c1".into()), ("t2".into(), "c2".into())])
1188 .unwrap();
1189 assert_eq!(out, "synthesised pattern");
1190 let calls = stub.calls.lock().unwrap();
1191 assert_eq!(calls.len(), 1);
1192 assert!(calls[0].starts_with("summarize:"));
1193 }
1194
1195 #[test]
1198 fn report_serialises_to_json() {
1199 let r = ReflectionPassReport {
1200 started_at: "2026-01-01T00:00:00Z".into(),
1201 completed_at: "2026-01-01T00:00:01Z".into(),
1202 namespaces_visited: 1,
1203 observations_scanned: 30,
1204 clusters_formed: 3,
1205 clusters_eligible: 3,
1206 reflections_persisted: 3,
1207 depth_refusals: 0,
1208 errors: vec![],
1209 dry_run_proposals: vec![],
1210 dry_run: false,
1211 };
1212 let json = serde_json::to_string(&r).unwrap();
1213 assert!(json.contains("reflections_persisted"));
1214 assert!(json.contains("clusters_eligible"));
1215 let back: ReflectionPassReport = serde_json::from_str(&json).unwrap();
1216 assert_eq!(back.observations_scanned, 30);
1217 }
1218
1219 #[cfg(feature = "sal")]
1232 mod sal_pass_tests {
1233 use super::*;
1234
1235 use crate::store::sqlite::SqliteStore;
1236
1237 fn open_db() -> (SqliteStore, tempfile::TempDir) {
1238 let dir = tempfile::tempdir().expect("tempdir");
1239 let path = dir.path().join("test.db");
1240 let store = SqliteStore::open(&path).expect("SqliteStore::open");
1241 (store, dir)
1242 }
1243
1244 fn conn_of(store: &SqliteStore) -> rusqlite::Connection {
1247 crate::db::open(store.path()).expect("db::open at store path")
1248 }
1249
1250 fn insert_observation(
1251 conn: &rusqlite::Connection,
1252 ns: &str,
1253 title: &str,
1254 content: &str,
1255 access_count: i64,
1256 ) -> String {
1257 let now = chrono::Utc::now().to_rfc3339();
1258 let mut metadata = crate::models::default_metadata();
1259 if let Some(obj) = metadata.as_object_mut() {
1260 obj.insert(
1261 "agent_id".to_string(),
1262 serde_json::Value::String("test-agent".to_string()),
1263 );
1264 }
1265 let mem = Memory {
1266 id: uuid::Uuid::new_v4().to_string(),
1267 tier: Tier::Long,
1268 namespace: ns.to_string(),
1269 title: title.to_string(),
1270 content: content.to_string(),
1271 tags: vec![],
1272 priority: 5,
1273 confidence: 1.0,
1274 source: "test".to_string(),
1275 access_count,
1276 created_at: now.clone(),
1277 updated_at: now,
1278 last_accessed_at: None,
1279 expires_at: None,
1280 metadata,
1281 reflection_depth: 0,
1282 memory_kind: MemoryKind::Observation,
1283 entity_id: None,
1284 persona_version: None,
1285 citations: Vec::new(),
1286 source_uri: None,
1287 source_span: None,
1288 confidence_source: ConfidenceSource::CallerProvided,
1289 confidence_signals: None,
1290 confidence_decayed_at: None,
1291 version: 1,
1292 };
1293 crate::db::insert(conn, &mem).unwrap()
1294 }
1295
1296 #[test]
1297 fn pass_name_is_reflection() {
1298 let (store, _dir) = open_db();
1299 let llm = StubLlm::new("S");
1300 let pass = ReflectionPass::new(&store, &llm, None, None, false);
1301 assert_eq!(pass.name(), "reflection");
1302 }
1303
1304 #[test]
1305 fn agent_id_falls_back_to_ai_curator_without_keypair() {
1306 let (store, _dir) = open_db();
1307 let llm = StubLlm::new("S");
1308 let pass = ReflectionPass::new(&store, &llm, None, None, false);
1309 assert_eq!(pass.agent_id(), "ai:curator");
1310 }
1311
1312 #[test]
1313 fn agent_id_uses_keypair_when_provided() {
1314 let (store, _dir) = open_db();
1315 let llm = StubLlm::new("S");
1316 use ed25519_dalek::{SigningKey, VerifyingKey};
1317 let mut rng = rand_core::OsRng;
1318 let sk = SigningKey::generate(&mut rng);
1319 let vk: VerifyingKey = (&sk).into();
1320 let kp = AgentKeypair {
1321 agent_id: "test:agent-x".to_string(),
1322 public: vk,
1323 private: Some(sk),
1324 };
1325 let pass = ReflectionPass::new(&store, &llm, Some(&kp), None, false);
1326 assert_eq!(pass.agent_id(), "test:agent-x");
1327 }
1328
1329 #[test]
1330 fn cluster_excludes_zero_access_observations() {
1331 let (store, _dir) = open_db();
1332 let llm = StubLlm::new("S");
1333 let pass = ReflectionPass::new(&store, &llm, None, None, false);
1334 let m1 = make_obs("a", "ns", "t", "shared keyword tokens here", 0); let m2 = make_obs("b", "ns", "t", "shared keyword tokens here", 5);
1336 let m3 = make_obs("c", "ns", "t", "shared keyword tokens here", 5);
1337 let m4 = make_obs("d", "ns", "t", "shared keyword tokens here", 5);
1338 let clusters = pass.cluster(&[m1, m2, m3, m4]);
1339 assert_eq!(clusters.len(), 1);
1340 assert_eq!(clusters[0].len(), 3);
1341 }
1342
1343 #[test]
1344 fn cluster_caps_at_max_cluster_size() {
1345 let (store, _dir) = open_db();
1346 let llm = StubLlm::new("S");
1347 let pass = ReflectionPass::new(&store, &llm, None, None, false);
1348 let mems: Vec<Memory> = (0..15)
1350 .map(|i| {
1351 make_obs(
1352 &format!("m{i:02}"),
1353 "ns",
1354 "t",
1355 "shared keyword tokens here pattern",
1356 1,
1357 )
1358 })
1359 .collect();
1360 let clusters = pass.cluster(&mems);
1361 for c in &clusters {
1363 assert!(c.len() <= MAX_CLUSTER_SIZE);
1364 }
1365 }
1366
1367 #[test]
1368 fn eligible_pass_method_accepts_valid() {
1369 let (store, _dir) = open_db();
1370 let llm = StubLlm::new("S");
1371 let pass = ReflectionPass::new(&store, &llm, None, None, false);
1372 let cluster: Vec<Memory> = (0..MIN_CLUSTER_SIZE)
1373 .map(|i| make_obs(&format!("m{i}"), "ns", "t", "c", 1))
1374 .collect();
1375 assert!(pass.eligible(&cluster));
1376 }
1377
1378 #[test]
1379 fn eligible_pass_method_rejects_oversize() {
1380 let (store, _dir) = open_db();
1381 let llm = StubLlm::new("S");
1382 let pass = ReflectionPass::new(&store, &llm, None, None, false);
1383 let cluster: Vec<Memory> = (0..(MAX_CLUSTER_SIZE + 1))
1384 .map(|i| make_obs(&format!("m{i:02}"), "ns", "t", "c", 1))
1385 .collect();
1386 assert!(!pass.eligible(&cluster));
1387 }
1388
1389 #[test]
1390 fn eligible_pass_method_rejects_reflection_member() {
1391 let (store, _dir) = open_db();
1392 let llm = StubLlm::new("S");
1393 let pass = ReflectionPass::new(&store, &llm, None, None, false);
1394 let mut cluster: Vec<Memory> = (0..MIN_CLUSTER_SIZE)
1395 .map(|i| make_obs(&format!("m{i}"), "ns", "t", "c", 1))
1396 .collect();
1397 cluster[0].memory_kind = MemoryKind::Reflection;
1398 assert!(!pass.eligible(&cluster));
1399 }
1400
1401 #[test]
1402 fn eligible_pass_method_rejects_zero_access() {
1403 let (store, _dir) = open_db();
1404 let llm = StubLlm::new("S");
1405 let pass = ReflectionPass::new(&store, &llm, None, None, false);
1406 let mut cluster: Vec<Memory> = (0..MIN_CLUSTER_SIZE)
1407 .map(|i| make_obs(&format!("m{i}"), "ns", "t", "c", 1))
1408 .collect();
1409 cluster[1].access_count = 0;
1410 assert!(!pass.eligible(&cluster));
1411 }
1412
1413 #[test]
1414 fn summarize_below_min_errors() {
1415 let (store, _dir) = open_db();
1416 let llm = StubLlm::new("S");
1417 let pass = ReflectionPass::new(&store, &llm, None, None, false);
1418 let cluster: Vec<Memory> = (0..(MIN_CLUSTER_SIZE - 1))
1419 .map(|i| make_obs(&format!("m{i}"), "ns", "t", "c", 1))
1420 .collect();
1421 let err = pass.summarize(&cluster).unwrap_err().to_string();
1422 assert!(err.contains("< MIN_CLUSTER_SIZE"));
1423 }
1424
1425 #[test]
1426 fn summarize_returns_reflection_typed_memory() {
1427 let (store, _dir) = open_db();
1428 let llm = StubLlm::new("synth pattern");
1429 let pass = ReflectionPass::new(&store, &llm, None, None, false);
1430 let cluster: Vec<Memory> = (0..MIN_CLUSTER_SIZE)
1431 .map(|i| {
1432 let mut m = make_obs(&format!("m{i}"), "ns", "Title-A", "shared content", 2);
1433 m.tier = if i == 0 { Tier::Long } else { Tier::Mid };
1434 m.priority = 5 + i32::try_from(i).unwrap();
1435 m
1436 })
1437 .collect();
1438 let summary = pass.summarize(&cluster).unwrap();
1439 assert_eq!(summary.memory_kind, MemoryKind::Reflection);
1440 assert!(summary.title.starts_with("[reflection]"));
1441 assert_eq!(summary.content, "synth pattern");
1442 assert_eq!(summary.tier, Tier::Long);
1443 assert_eq!(summary.source, "system");
1444 assert_eq!(summary.namespace, "ns");
1445 assert_eq!(
1447 summary.priority,
1448 5 + i32::try_from(MIN_CLUSTER_SIZE - 1).unwrap()
1449 );
1450 }
1451
1452 #[tokio::test]
1453 async fn persist_dry_run_is_noop() {
1454 let (store, _dir) = open_db();
1455 let llm = StubLlm::new("S");
1456 let pass = ReflectionPass::new(&store, &llm, None, None, true);
1457 let summary = make_obs("s", "ns", "[reflection]", "c", 1);
1458 pass.persist(&summary, &["x".to_string()]).await.unwrap();
1459 }
1460
1461 #[tokio::test]
1462 async fn persist_empty_sources_is_noop() {
1463 let (store, _dir) = open_db();
1464 let llm = StubLlm::new("S");
1465 let pass = ReflectionPass::new(&store, &llm, None, None, false);
1466 let summary = make_obs("s", "ns", "[reflection]", "c", 1);
1467 pass.persist(&summary, &[]).await.unwrap();
1468 }
1469
1470 #[tokio::test]
1471 async fn persist_refuses_when_max_depth_exceeded() {
1472 let (store, _dir) = open_db();
1473 let llm = StubLlm::new("S");
1474 let pass = ReflectionPass::new(&store, &llm, None, Some(1), false);
1476 let mut source = make_obs("src", "ns", "t", "c", 1);
1477 source.reflection_depth = 1;
1478 let src_id = crate::db::insert(&conn_of(&store), &source).unwrap();
1479 let summary = make_obs("s", "ns", "[reflection]", "c", 0);
1480 let err = pass
1481 .persist(&summary, &[src_id])
1482 .await
1483 .unwrap_err()
1484 .to_string();
1485 assert!(err.contains("exceeds"));
1486 assert!(err.contains("--max-depth"));
1487 }
1488
1489 #[tokio::test]
1490 async fn persist_writes_reflection_into_db() {
1491 let (store, _dir) = open_db();
1493 let conn = conn_of(&store);
1494 let llm = StubLlm::new("synthesised pattern");
1495 let pass = ReflectionPass::new(&store, &llm, None, None, false);
1496 let s1 = insert_observation(&conn, "app", "T1", "kubernetes deploy strategy notes", 2);
1498 let s2 =
1499 insert_observation(&conn, "app", "T2", "kubernetes rolling deploy approach", 3);
1500 let s3 = insert_observation(&conn, "app", "T3", "kubernetes canary deploy strategy", 1);
1501 let summary = pass
1502 .summarize(&[
1503 crate::db::get(&conn, &s1).unwrap().unwrap(),
1504 crate::db::get(&conn, &s2).unwrap().unwrap(),
1505 crate::db::get(&conn, &s3).unwrap().unwrap(),
1506 ])
1507 .unwrap();
1508 pass.persist(&summary, &[s1.clone(), s2.clone(), s3.clone()])
1509 .await
1510 .unwrap();
1511
1512 let listed = crate::db::list(
1514 &conn,
1515 Some("app"),
1516 None,
1517 32,
1518 0,
1519 None,
1520 None,
1521 None,
1522 None,
1523 None,
1524 )
1525 .unwrap();
1526 let refl = listed
1527 .iter()
1528 .find(|m| m.memory_kind == MemoryKind::Reflection)
1529 .expect("expected one reflection");
1530 pass.verify(refl.id.clone()).await.unwrap();
1532 }
1533
1534 #[tokio::test]
1535 async fn verify_missing_id_errors() {
1536 let (store, _dir) = open_db();
1537 let llm = StubLlm::new("S");
1538 let pass = ReflectionPass::new(&store, &llm, None, None, false);
1539 let err = pass
1540 .verify("no-such".to_string())
1541 .await
1542 .unwrap_err()
1543 .to_string();
1544 assert!(err.contains("not found in DB"));
1545 }
1546
1547 #[tokio::test]
1548 async fn verify_wrong_kind_errors() {
1549 let (store, _dir) = open_db();
1550 let conn = conn_of(&store);
1551 let llm = StubLlm::new("S");
1552 let pass = ReflectionPass::new(&store, &llm, None, None, false);
1553 let id = insert_observation(&conn, "ns", "T", "c", 1);
1555 let err = pass.verify(id).await.unwrap_err().to_string();
1556 assert!(err.contains("expected Reflection"));
1557 }
1558
1559 #[tokio::test]
1560 async fn verify_reflection_without_edges_errors() {
1561 let (store, _dir) = open_db();
1564 let conn = conn_of(&store);
1565 let llm = StubLlm::new("S");
1566 let pass = ReflectionPass::new(&store, &llm, None, None, false);
1567 let now = chrono::Utc::now().to_rfc3339();
1568 let mut metadata = crate::models::default_metadata();
1569 if let Some(obj) = metadata.as_object_mut() {
1570 obj.insert(
1571 "agent_id".to_string(),
1572 serde_json::Value::String("test-agent".to_string()),
1573 );
1574 }
1575 let m = Memory {
1576 id: uuid::Uuid::new_v4().to_string(),
1577 tier: Tier::Mid,
1578 namespace: "ns".to_string(),
1579 title: "[reflection] orphan".to_string(),
1580 content: "c".to_string(),
1581 tags: vec![],
1582 priority: 5,
1583 confidence: 1.0,
1584 source: "system".to_string(),
1585 access_count: 0,
1586 created_at: now.clone(),
1587 updated_at: now,
1588 last_accessed_at: None,
1589 expires_at: None,
1590 metadata,
1591 reflection_depth: 1,
1592 memory_kind: MemoryKind::Reflection,
1593 entity_id: None,
1594 persona_version: None,
1595 citations: Vec::new(),
1596 source_uri: None,
1597 source_span: None,
1598 confidence_source: ConfidenceSource::CallerProvided,
1599 confidence_signals: None,
1600 confidence_decayed_at: None,
1601 version: 1,
1602 };
1603 let id = crate::db::insert(&conn, &m).unwrap();
1604 let err = pass.verify(id).await.unwrap_err().to_string();
1605 assert!(err.contains("no reflects_on edge"));
1606 }
1607
1608 #[tokio::test]
1611 async fn run_reflection_pass_empty_db_dry_run_namespace() {
1612 let (store, _dir) = open_db();
1613 let llm = StubLlm::new("S");
1614 let report =
1615 run_reflection_pass(&store, &llm, None, Some("nope"), None, true, |_| true)
1616 .await
1617 .unwrap();
1618 assert!(report.dry_run);
1619 assert_eq!(report.namespaces_visited, 1);
1620 assert_eq!(report.clusters_formed, 0);
1621 assert_eq!(report.reflections_persisted, 0);
1622 }
1623
1624 #[tokio::test]
1625 async fn run_reflection_pass_all_namespaces_with_disabled_check() {
1626 let (store, _dir) = open_db();
1627 let conn = conn_of(&store);
1628 let llm = StubLlm::new("S");
1629 insert_observation(&conn, "ns1", "t", "shared content tokens here", 2);
1631 let report = run_reflection_pass(&store, &llm, None, None, None, true, |_| false)
1632 .await
1633 .unwrap();
1634 assert_eq!(report.observations_scanned, 0);
1636 }
1637
1638 #[tokio::test]
1639 async fn run_reflection_pass_dry_run_reports_proposals() {
1640 let (store, _dir) = open_db();
1641 let conn = conn_of(&store);
1642 let llm = StubLlm::new("synth");
1643 insert_observation(
1645 &conn,
1646 "app",
1647 "T1",
1648 "kubernetes rolling deploy strategy notes",
1649 2,
1650 );
1651 insert_observation(
1652 &conn,
1653 "app",
1654 "T2",
1655 "kubernetes rolling deploy strategy canary",
1656 3,
1657 );
1658 insert_observation(
1659 &conn,
1660 "app",
1661 "T3",
1662 "kubernetes canary deploy strategy rolling",
1663 1,
1664 );
1665 let report = run_reflection_pass(&store, &llm, None, Some("app"), None, true, |_| true)
1666 .await
1667 .unwrap();
1668 assert!(report.dry_run);
1669 assert!(report.observations_scanned >= 3);
1670 assert!(report.clusters_eligible >= 1);
1671 assert!(!report.dry_run_proposals.is_empty());
1672 assert_eq!(report.reflections_persisted, 0);
1673 }
1674
1675 #[tokio::test]
1676 async fn run_reflection_pass_persists_reflections() {
1677 let (store, _dir) = open_db();
1678 let conn = conn_of(&store);
1679 let llm = StubLlm::new("persisted pattern");
1680 insert_observation(&conn, "app", "T1", "shared keyword token strategy notes", 2);
1681 insert_observation(&conn, "app", "T2", "shared keyword token strategy plan", 3);
1682 insert_observation(
1683 &conn,
1684 "app",
1685 "T3",
1686 "shared keyword token strategy canary",
1687 1,
1688 );
1689 let report =
1690 run_reflection_pass(&store, &llm, None, Some("app"), None, false, |_| true)
1691 .await
1692 .unwrap();
1693 assert_eq!(report.dry_run, false);
1694 assert!(report.reflections_persisted >= 1);
1695 }
1696
1697 #[tokio::test]
1698 async fn run_reflection_pass_depth_refusal_increments_counter() {
1699 let (store, _dir) = open_db();
1700 let conn = conn_of(&store);
1701 let llm = StubLlm::new("synth");
1702 let now = chrono::Utc::now().to_rfc3339();
1705 for i in 0..3 {
1706 let mut metadata = crate::models::default_metadata();
1707 if let Some(obj) = metadata.as_object_mut() {
1708 obj.insert(
1709 "agent_id".to_string(),
1710 serde_json::Value::String("test-agent".to_string()),
1711 );
1712 }
1713 let m = Memory {
1714 id: uuid::Uuid::new_v4().to_string(),
1715 tier: Tier::Long,
1716 namespace: "deep".to_string(),
1717 title: format!("Tdeep-{i}"),
1718 content: "shared keyword token deep strategy".to_string(),
1719 tags: vec![],
1720 priority: 5,
1721 confidence: 1.0,
1722 source: "test".to_string(),
1723 access_count: 2,
1724 created_at: now.clone(),
1725 updated_at: now.clone(),
1726 last_accessed_at: None,
1727 expires_at: None,
1728 metadata,
1729 reflection_depth: 2,
1730 memory_kind: MemoryKind::Observation,
1731 entity_id: None,
1732 persona_version: None,
1733 citations: Vec::new(),
1734 source_uri: None,
1735 source_span: None,
1736 confidence_source: ConfidenceSource::CallerProvided,
1737 confidence_signals: None,
1738 confidence_decayed_at: None,
1739 version: 1,
1740 };
1741 crate::db::insert(&conn, &m).unwrap();
1742 }
1743 let report = run_reflection_pass(
1744 &store,
1745 &llm,
1746 None,
1747 Some("deep"),
1748 Some(2), false,
1750 |_| true,
1751 )
1752 .await
1753 .unwrap();
1754 assert!(report.depth_refusals >= 1);
1755 assert_eq!(report.reflections_persisted, 0);
1757 }
1758
1759 #[test]
1760 fn dry_run_proposal_serialises() {
1761 let p = DryRunProposal {
1762 namespace: "ns".into(),
1763 proposed_title: "[reflection] x".into(),
1764 source_ids: vec!["a".into(), "b".into()],
1765 };
1766 let j = serde_json::to_string(&p).unwrap();
1767 assert!(j.contains("source_ids"));
1768 let back: DryRunProposal = serde_json::from_str(&j).unwrap();
1769 assert_eq!(back.namespace, "ns");
1770 }
1771
1772 #[test]
1775 fn pair_co_occurs_unparseable_timestamps_still_checks_jaccard() {
1776 let mut a = make_obs("a", "ns", "t", "shared content tokens here", 1);
1777 let mut b = make_obs("b", "ns", "t", "shared content tokens here", 1);
1778 a.created_at = "not-a-timestamp".to_string();
1779 b.created_at = "also-invalid".to_string();
1780 assert!(pair_co_occurs(&a, &b));
1783 }
1784
1785 #[test]
1788 fn stub_llm_auto_tag_and_contradiction_paths() {
1789 let stub = StubLlm::new("S");
1790 let tags = stub.auto_tag("t", "c").unwrap();
1791 assert!(tags.is_empty());
1792 let conflict = stub.detect_contradiction("a", "b").unwrap();
1793 assert!(!conflict);
1794 }
1795
1796 #[test]
1797 fn eligible_pass_rejects_internal_namespace_directly() {
1798 let (store, _dir) = open_db();
1800 let llm = StubLlm::new("S");
1801 let pass = ReflectionPass::new(&store, &llm, None, None, false);
1802 let cluster: Vec<Memory> = (0..MIN_CLUSTER_SIZE)
1803 .map(|i| make_obs(&format!("m{i}"), "_curator", "t", "c", 1))
1804 .collect();
1805 assert!(!pass.eligible(&cluster));
1806 }
1807
1808 #[test]
1809 fn jaccard_similarity_zero_union_returns_zero() {
1810 let a = "a b c"; let b = "x";
1815 assert_eq!(jaccard_similarity(a, b), 0.0);
1816 }
1817
1818 #[test]
1819 fn tier_rank_all_variants() {
1820 assert_eq!(tier_rank(&Tier::Short), 0);
1822 assert_eq!(tier_rank(&Tier::Mid), 1);
1823 assert_eq!(tier_rank(&Tier::Long), 2);
1824 }
1825
1826 #[test]
1827 fn parse_rfc3339_invalid_returns_none() {
1828 assert!(parse_rfc3339("garbage").is_none());
1829 assert!(parse_rfc3339("2026-01-01T00:00:00Z").is_some());
1830 }
1831
1832 #[tokio::test]
1833 async fn verify_skips_inbound_links() {
1834 let (store, _dir) = open_db();
1837 let conn = conn_of(&store);
1838 let llm = StubLlm::new("S");
1839 let pass = ReflectionPass::new(&store, &llm, None, None, false);
1840 let s1 =
1842 insert_observation(&conn, "vrf", "T1", "shared keyword pattern tokens here", 2);
1843 let s2 =
1844 insert_observation(&conn, "vrf", "T2", "shared keyword pattern tokens here", 2);
1845 let s3 =
1846 insert_observation(&conn, "vrf", "T3", "shared keyword pattern tokens here", 2);
1847 let summary = pass
1848 .summarize(&[
1849 crate::db::get(&conn, &s1).unwrap().unwrap(),
1850 crate::db::get(&conn, &s2).unwrap().unwrap(),
1851 crate::db::get(&conn, &s3).unwrap().unwrap(),
1852 ])
1853 .unwrap();
1854 pass.persist(&summary, &[s1.clone(), s2.clone(), s3.clone()])
1855 .await
1856 .unwrap();
1857 let listed = crate::db::list(
1858 &conn,
1859 Some("vrf"),
1860 None,
1861 32,
1862 0,
1863 None,
1864 None,
1865 None,
1866 None,
1867 None,
1868 )
1869 .unwrap();
1870 let refl_id = listed
1871 .iter()
1872 .find(|m| m.memory_kind == MemoryKind::Reflection)
1873 .unwrap()
1874 .id
1875 .clone();
1876 let _ = crate::db::create_link(&conn, &s1, &refl_id, "related_to");
1879 pass.verify(refl_id).await.unwrap();
1880 }
1881
1882 #[tokio::test]
1883 async fn run_reflection_pass_summarize_error_recorded() {
1884 struct FailingLlm;
1888 impl AutonomyLlm for FailingLlm {
1889 fn auto_tag(&self, _t: &str, _c: &str) -> Result<Vec<String>> {
1890 Ok(vec![])
1891 }
1892 fn detect_contradiction(&self, _a: &str, _b: &str) -> Result<bool> {
1893 Ok(false)
1894 }
1895 fn summarize_memories(&self, _m: &[(String, String)]) -> Result<String> {
1896 anyhow::bail!("forced llm failure")
1897 }
1898 }
1899 let (store, _dir) = open_db();
1900 let conn = conn_of(&store);
1901 let llm = FailingLlm;
1902 insert_observation(&conn, "ns", "T1", "shared keyword pattern tokens here", 2);
1903 insert_observation(&conn, "ns", "T2", "shared keyword pattern tokens here", 2);
1904 insert_observation(&conn, "ns", "T3", "shared keyword pattern tokens here", 2);
1905 let report = run_reflection_pass(&store, &llm, None, Some("ns"), None, false, |_| true)
1906 .await
1907 .unwrap();
1908 assert!(report.errors.iter().any(|e| e.contains("summarize failed")));
1910 assert_eq!(report.reflections_persisted, 0);
1911 }
1912 } }