1mod error;
9pub mod postgres;
10
11pub use error::StoreError;
12pub use postgres::PostgresStore;
13
14use std::future::Future;
15
16use chrono::{DateTime, FixedOffset};
17
18use crate::memory::{ExtractionStat, ForgetTarget, Memory, MemoryKind, Scope, StatsFilter, SupersessionEvent};
19
20#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, strum::Display, strum::EnumString, strum::AsRefStr)]
26#[strum(serialize_all = "lowercase")]
27pub enum IndexStatus {
28 Pending,
30
31 Indexed,
33
34 Failed,
36}
37
38#[derive(Debug, Clone)]
46pub struct NewMemory {
47 pub scope: Scope,
49
50 pub content: String,
52
53 pub metadata: serde_json::Value,
55
56 pub kind: MemoryKind,
58
59 pub source_pid: Option<String>,
61
62 pub event_at: Option<DateTime<FixedOffset>>,
64
65 pub confidence: crate::memory::Confidence,
68}
69
70#[derive(Debug, Clone, Default)]
78pub struct EditPatch {
79 pub content: Option<String>,
81
82 pub metadata: Option<serde_json::Value>,
84
85 pub event_at: Option<Option<DateTime<FixedOffset>>>,
87}
88
89impl EditPatch {
90 #[must_use]
92 pub fn is_empty(&self) -> bool {
93 self.content.is_none() && self.metadata.is_none() && self.event_at.is_none()
94 }
95}
96
97#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Default)]
99pub enum TimelineDirection {
100 #[default]
102 Descending,
103
104 Ascending,
106}
107
108pub const DEFAULT_TIMELINE_LIMIT: usize = 50;
110
111#[derive(Debug, Clone)]
117pub struct TimelineParams {
118 pub kinds: crate::memory::KindSelector,
119 pub created_after: Option<DateTime<FixedOffset>>,
120 pub created_before: Option<DateTime<FixedOffset>>,
121 pub event_at_after: Option<DateTime<FixedOffset>>,
122 pub event_at_before: Option<DateTime<FixedOffset>>,
123 pub include_superseded: bool,
124 pub limit: usize,
125 pub direction: TimelineDirection,
126}
127
128impl Default for TimelineParams {
129 fn default() -> Self {
130 Self {
131 kinds: crate::memory::KindSelector::default(),
132 created_after: None,
133 created_before: None,
134 event_at_after: None,
135 event_at_before: None,
136 include_superseded: true,
137 limit: DEFAULT_TIMELINE_LIMIT,
138 direction: TimelineDirection::Descending,
139 }
140 }
141}
142
143#[derive(Debug, Clone)]
148pub struct AsOfParams {
149 pub as_of: DateTime<FixedOffset>,
150 pub kinds: crate::memory::KindSelector,
151 pub limit: usize,
152}
153
154impl AsOfParams {
155 pub fn new(as_of: DateTime<FixedOffset>) -> Self {
157 Self {
158 as_of,
159 kinds: crate::memory::KindSelector::default(),
160 limit: DEFAULT_TIMELINE_LIMIT,
161 }
162 }
163}
164
165pub trait MemoryStore: Send + Sync + 'static {
171 fn remember(&self, new: NewMemory) -> impl Future<Output = Result<Memory, StoreError>> + Send;
186
187 fn recall(&self, pid: &str) -> impl Future<Output = Result<Memory, StoreError>> + Send;
194
195 fn timeline(
208 &self,
209 scope: Scope,
210 params: TimelineParams,
211 ) -> impl Future<Output = Result<Vec<Memory>, StoreError>> + Send;
212
213 fn memories_as_of(
226 &self,
227 scope: Scope,
228 params: AsOfParams,
229 ) -> impl Future<Output = Result<Vec<Memory>, StoreError>> + Send;
230
231 fn find_by_pids(&self, pids: &[&str]) -> impl Future<Output = Result<Vec<Memory>, StoreError>> + Send;
241
242 fn active_semantics_for_source(
255 &self,
256 source_pid: &str,
257 ) -> impl Future<Output = Result<Vec<Memory>, StoreError>> + Send;
258
259 fn extraction_stats(
274 &self,
275 filter: StatsFilter,
276 ) -> impl Future<Output = Result<Vec<ExtractionStat>, StoreError>> + Send;
277
278 fn forget(&self, target: ForgetTarget) -> impl Future<Output = Result<Vec<String>, StoreError>> + Send;
291
292 fn set_index_status(&self, pid: &str, status: IndexStatus) -> impl Future<Output = Result<(), StoreError>> + Send;
302
303 fn set_category(&self, pid: &str, category: &str) -> impl Future<Output = Result<(), StoreError>> + Send;
314
315 fn retire(
329 &self,
330 pid: &str,
331 reason: crate::memory::RetirementReason,
332 ) -> impl Future<Output = Result<(), StoreError>> + Send;
333
334 fn find_failed(&self, limit: usize) -> impl Future<Output = Result<Vec<Memory>, StoreError>> + Send;
343
344 fn list_scopes(&self) -> impl Future<Output = Result<Vec<Scope>, StoreError>> + Send;
354
355 fn list_agent_ids(
366 &self,
367 org_id: &str,
368 user_id: &str,
369 ) -> impl Future<Output = Result<Vec<String>, StoreError>> + Send;
370
371 fn indexed_pids_in_scope(&self, scope: &Scope) -> impl Future<Output = Result<Vec<String>, StoreError>> + Send;
383
384 fn edit(&self, pid: &str, patch: EditPatch) -> impl Future<Output = Result<Memory, StoreError>> + Send;
402
403 fn supersede(&self, pid: &str, by_pid: &str) -> impl Future<Output = Result<(), StoreError>> + Send;
418
419 fn unsupersede(&self, pid: &str) -> impl Future<Output = Result<(), StoreError>> + Send;
431
432 fn supersession_at(
449 &self,
450 pid: &str,
451 as_of: DateTime<FixedOffset>,
452 ) -> impl Future<Output = Result<Option<String>, StoreError>> + Send;
453
454 fn supersession_history(
471 &self,
472 pid: &str,
473 ) -> impl Future<Output = Result<Vec<SupersessionEvent>, StoreError>> + Send;
474}
475
476#[cfg(test)]
477mod tests {
478 use super::*;
479 use chrono::Utc;
480 use std::collections::BTreeMap;
481 use std::sync::Mutex;
482
483 #[derive(Debug, Clone)]
488 struct StubEvent {
489 loser_pid: String,
490 winner_pid: Option<String>,
491 decided_at: DateTime<FixedOffset>,
492 }
493
494 #[derive(Default)]
495 struct StubStore {
496 memories: Mutex<Vec<Memory>>,
497 events: Mutex<Vec<StubEvent>>,
498 }
499
500 impl StubStore {
501 fn refresh_cache(&self, pid: &str) {
507 let events = self.events.lock().unwrap();
508 let latest = events
509 .iter()
510 .filter(|e| e.loser_pid == pid)
511 .max_by_key(|e| e.decided_at);
512 let supersession = latest.and_then(|e| {
513 e.winner_pid.clone().map(|winner_pid| crate::memory::SupersessionInfo {
514 winner_pid,
515 at: e.decided_at,
516 })
517 });
518 drop(events);
519 let mut memories = self.memories.lock().unwrap();
520 if let Some(m) = memories.iter_mut().find(|m| m.pid == pid) {
521 m.supersession = supersession;
522 }
523 }
524 }
525
526 impl MemoryStore for StubStore {
527 async fn remember(&self, new: NewMemory) -> Result<Memory, StoreError> {
528 let NewMemory {
529 scope,
530 content,
531 metadata,
532 kind,
533 source_pid,
534 event_at,
535 confidence,
536 } = new;
537 let now: chrono::DateTime<chrono::FixedOffset> = Utc::now().into();
538 let memory = Memory {
539 pid: format!("test-{}", self.memories.lock().unwrap().len()),
540 scope,
541 content,
542 metadata,
543 kind,
544 source_pid,
545 supersession: None,
546 created_at: now,
547 updated_at: now,
548 event_at,
549 score: None,
550 status: IndexStatus::Pending,
551 confidence,
552 category: None,
553 retirement: None,
554 };
555 self.memories.lock().unwrap().push(memory.clone());
556 Ok(memory)
557 }
558
559 async fn recall(&self, pid: &str) -> Result<Memory, StoreError> {
560 self.memories
561 .lock()
562 .unwrap()
563 .iter()
564 .find(|m| m.pid == pid)
565 .cloned()
566 .ok_or_else(|| StoreError::NotFound(pid.to_string()))
567 }
568
569 async fn find_by_pids(&self, pids: &[&str]) -> Result<Vec<Memory>, StoreError> {
570 let memories = self.memories.lock().unwrap();
571 Ok(pids
572 .iter()
573 .filter_map(|pid| memories.iter().find(|m| m.pid == *pid).cloned())
574 .collect())
575 }
576
577 async fn active_semantics_for_source(&self, source_pid: &str) -> Result<Vec<Memory>, StoreError> {
578 let memories = self.memories.lock().unwrap();
579 Ok(memories
580 .iter()
581 .filter(|m| m.kind == MemoryKind::Semantic)
582 .filter(|m| m.source_pid.as_deref() == Some(source_pid))
583 .filter(|m| m.supersession.is_none() && m.retirement.is_none())
584 .cloned()
585 .collect())
586 }
587
588 async fn extraction_stats(&self, filter: StatsFilter) -> Result<Vec<ExtractionStat>, StoreError> {
589 let memories = self.memories.lock().unwrap();
590 let mut tallies: BTreeMap<(String, String), (u64, u64)> = BTreeMap::new();
591 for m in memories.iter() {
592 if m.kind != MemoryKind::Semantic {
593 continue;
594 }
595 if filter.agent_id.as_ref().is_some_and(|a| a != &m.scope.agent_id)
596 || filter.org_id.as_ref().is_some_and(|o| o != &m.scope.org_id)
597 || filter.user_id.as_ref().is_some_and(|u| u != &m.scope.user_id)
598 {
599 continue;
600 }
601 let provider = m.metadata.get("provider").and_then(|v| v.as_str()).unwrap_or("").to_string();
602 let model = m.metadata.get("model").and_then(|v| v.as_str()).unwrap_or("").to_string();
603 let entry = tallies.entry((provider, model)).or_insert((0, 0));
604 entry.0 += 1;
605 if m.retirement == Some(crate::memory::RetirementReason::Rejected) {
606 entry.1 += 1;
607 }
608 }
609
610 Ok(tallies
611 .into_iter()
612 .map(|((provider, model), (total, rejected))| ExtractionStat {
613 provider,
614 model,
615 total,
616 rejected,
617 })
618 .collect())
619 }
620
621 async fn timeline(&self, scope: Scope, params: TimelineParams) -> Result<Vec<Memory>, StoreError> {
622 scope.validate()?;
623 let memories = self.memories.lock().unwrap();
624
625 let mut filtered: Vec<Memory> = memories
626 .iter()
627 .filter(|m| m.scope == scope)
628 .filter(|m| match m.kind {
629 MemoryKind::Episodic => params.kinds.episodic,
630 MemoryKind::Semantic => params.kinds.semantic,
631 })
632 .filter(|m| params.created_after.is_none_or(|t| m.created_at >= t))
633 .filter(|m| params.created_before.is_none_or(|t| m.created_at < t))
634 .filter(|m| {
635 params
636 .event_at_after
637 .is_none_or(|t| m.event_at.is_some_and(|ev| ev >= t))
638 })
639 .filter(|m| {
640 params
641 .event_at_before
642 .is_none_or(|t| m.event_at.is_some_and(|ev| ev < t))
643 })
644 .filter(|m| params.include_superseded || m.supersession.is_none())
645 .cloned()
646 .collect();
647
648 filtered.sort_by(|a, b| match params.direction {
649 TimelineDirection::Descending => b.created_at.cmp(&a.created_at),
650 TimelineDirection::Ascending => a.created_at.cmp(&b.created_at),
651 });
652 filtered.truncate(params.limit);
653 Ok(filtered)
654 }
655
656 async fn memories_as_of(&self, scope: Scope, params: AsOfParams) -> Result<Vec<Memory>, StoreError> {
657 scope.validate()?;
658 let memories = self.memories.lock().unwrap();
659 let events = self.events.lock().unwrap();
660
661 let mut filtered: Vec<Memory> = memories
662 .iter()
663 .filter(|m| m.scope == scope)
664 .filter(|m| m.created_at <= params.as_of)
665 .filter(|m| match m.kind {
666 MemoryKind::Episodic => params.kinds.episodic,
667 MemoryKind::Semantic => params.kinds.semantic,
668 })
669 .filter(|m| {
670 let latest = events
671 .iter()
672 .filter(|e| e.loser_pid == m.pid && e.decided_at <= params.as_of)
673 .max_by_key(|e| e.decided_at);
674 match latest {
675 None => true,
676 Some(e) => e.winner_pid.is_none(),
677 }
678 })
679 .cloned()
680 .collect();
681
682 filtered.sort_by(|a, b| b.created_at.cmp(&a.created_at));
683 filtered.truncate(params.limit);
684 Ok(filtered)
685 }
686
687 async fn forget(&self, target: ForgetTarget) -> Result<Vec<String>, StoreError> {
688 let mut memories = self.memories.lock().unwrap();
689 let mut deleted = Vec::new();
690 match target {
691 ForgetTarget::Pid(pid) => {
692 memories.retain(|m| {
695 if m.pid == pid || m.source_pid.as_deref() == Some(pid.as_str()) {
696 deleted.push(m.pid.clone());
697 false
698 } else {
699 true
700 }
701 });
702 }
703 ForgetTarget::Scope(scope) => {
704 memories.retain(|m| {
705 if m.scope == scope {
706 deleted.push(m.pid.clone());
707 false
708 } else {
709 true
710 }
711 });
712 }
713 }
714 Ok(deleted)
715 }
716
717 async fn set_index_status(&self, _pid: &str, _status: IndexStatus) -> Result<(), StoreError> {
718 Ok(())
719 }
720
721 async fn set_category(&self, pid: &str, category: &str) -> Result<(), StoreError> {
722 let mut memories = self.memories.lock().unwrap();
723 let memory = memories
724 .iter_mut()
725 .find(|m| m.pid == pid)
726 .ok_or_else(|| StoreError::NotFound(pid.to_string()))?;
727 memory.category = Some(category.to_string());
728 Ok(())
729 }
730
731 async fn retire(&self, pid: &str, reason: crate::memory::RetirementReason) -> Result<(), StoreError> {
732 let mut memories = self.memories.lock().unwrap();
733 let memory = memories
734 .iter_mut()
735 .find(|m| m.pid == pid)
736 .ok_or_else(|| StoreError::NotFound(pid.to_string()))?;
737 memory.retirement = Some(reason);
738 Ok(())
739 }
740
741 async fn find_failed(&self, _limit: usize) -> Result<Vec<Memory>, StoreError> {
742 Ok(Vec::new())
743 }
744
745 async fn list_scopes(&self) -> Result<Vec<Scope>, StoreError> {
746 let scopes: std::collections::HashSet<Scope> =
747 self.memories.lock().unwrap().iter().map(|m| m.scope.clone()).collect();
748 Ok(scopes.into_iter().collect())
749 }
750
751 async fn list_agent_ids(&self, org_id: &str, user_id: &str) -> Result<Vec<String>, StoreError> {
752 let agent_ids: std::collections::BTreeSet<String> = self
753 .memories
754 .lock()
755 .unwrap()
756 .iter()
757 .filter(|m| m.scope.org_id == org_id && m.scope.user_id == user_id)
758 .map(|m| m.scope.agent_id.clone())
759 .collect();
760 Ok(agent_ids.into_iter().collect())
761 }
762
763 async fn indexed_pids_in_scope(&self, scope: &Scope) -> Result<Vec<String>, StoreError> {
764 Ok(self
765 .memories
766 .lock()
767 .unwrap()
768 .iter()
769 .filter(|m| &m.scope == scope)
770 .map(|m| m.pid.clone())
771 .collect())
772 }
773
774 async fn edit(&self, pid: &str, patch: EditPatch) -> Result<Memory, StoreError> {
775 let mut memories = self.memories.lock().unwrap();
776 let memory = memories
777 .iter_mut()
778 .find(|m| m.pid == pid)
779 .ok_or_else(|| StoreError::NotFound(pid.to_string()))?;
780 if memory.kind != MemoryKind::Episodic {
781 return Err(StoreError::UnsupportedEdit {
782 pid: pid.to_string(),
783 kind: memory.kind,
784 });
785 }
786 if let Some(content) = patch.content {
787 memory.content = content;
788 }
789 if let Some(metadata) = patch.metadata {
790 memory.metadata = metadata;
791 }
792 if let Some(event_at) = patch.event_at {
793 memory.event_at = event_at;
794 }
795 memory.updated_at = Utc::now().into();
796 Ok(memory.clone())
797 }
798
799 async fn supersede(&self, pid: &str, by_pid: &str) -> Result<(), StoreError> {
800 {
803 let memories = self.memories.lock().unwrap();
804 if !memories.iter().any(|m| m.pid == pid) {
805 return Err(StoreError::NotFound(pid.to_string()));
806 }
807 }
808 self.events.lock().unwrap().push(StubEvent {
809 loser_pid: pid.to_string(),
810 winner_pid: Some(by_pid.to_string()),
811 decided_at: Utc::now().into(),
812 });
813 self.refresh_cache(pid);
814 Ok(())
815 }
816
817 async fn unsupersede(&self, pid: &str) -> Result<(), StoreError> {
818 {
819 let memories = self.memories.lock().unwrap();
820 if !memories.iter().any(|m| m.pid == pid) {
821 return Err(StoreError::NotFound(pid.to_string()));
822 }
823 }
824 self.events.lock().unwrap().push(StubEvent {
826 loser_pid: pid.to_string(),
827 winner_pid: None,
828 decided_at: Utc::now().into(),
829 });
830 self.refresh_cache(pid);
831 Ok(())
832 }
833
834 async fn supersession_at(&self, pid: &str, as_of: DateTime<FixedOffset>) -> Result<Option<String>, StoreError> {
835 let events = self.events.lock().unwrap();
836 let latest = events
837 .iter()
838 .filter(|e| e.loser_pid == pid && e.decided_at <= as_of)
839 .max_by_key(|e| e.decided_at);
840 Ok(latest.and_then(|e| e.winner_pid.clone()))
841 }
842
843 async fn supersession_history(&self, pid: &str) -> Result<Vec<SupersessionEvent>, StoreError> {
844 let events = self.events.lock().unwrap();
845 let mut trail: Vec<SupersessionEvent> = events
846 .iter()
847 .filter(|e| e.loser_pid == pid)
848 .map(|e| SupersessionEvent {
849 winner_pid: e.winner_pid.clone(),
850 decided_at: e.decided_at,
851 })
852 .collect();
853 trail.sort_by_key(|e| e.decided_at);
854 Ok(trail)
855 }
856 }
857
858 #[tokio::test(flavor = "current_thread")]
859 async fn should_implement_trait_with_in_test_stub() {
860 let store = StubStore::default();
861 let scope = Scope {
862 agent_id: "a".to_string(),
863 org_id: "o".to_string(),
864 user_id: "u".to_string(),
865 };
866
867 let memory = store
868 .remember(NewMemory {
869 scope: scope.clone(),
870 content: "content".to_string(),
871 metadata: serde_json::json!({}),
872 kind: MemoryKind::Episodic,
873 source_pid: None,
874 event_at: None,
875 confidence: crate::memory::Confidence::MAX,
876 })
877 .await
878 .unwrap();
879 assert_eq!(memory.content, "content");
880
881 let recalled = store.recall(&memory.pid).await.unwrap();
882 assert_eq!(recalled.pid, memory.pid);
883
884 let deleted = store.forget(ForgetTarget::Pid(memory.pid.clone())).await.unwrap();
885 assert_eq!(deleted, vec![memory.pid.clone()]);
886
887 let not_found = store.recall(&memory.pid).await;
888 assert!(matches!(not_found, Err(StoreError::NotFound(_))));
889 }
890
891 #[tokio::test]
892 async fn should_list_distinct_sorted_agent_ids_for_matching_org_and_user() {
893 let store = StubStore::default();
894 let remember = async |agent: &str, org: &str, user: &str| {
895 store
896 .remember(NewMemory {
897 scope: Scope {
898 agent_id: agent.to_string(),
899 org_id: org.to_string(),
900 user_id: user.to_string(),
901 },
902 content: "c".to_string(),
903 metadata: serde_json::json!({}),
904 kind: MemoryKind::Episodic,
905 source_pid: None,
906 event_at: None,
907 confidence: crate::memory::Confidence::MAX,
908 })
909 .await
910 .unwrap();
911 };
912
913 remember("zeta", "o", "u").await;
914 remember("alpha", "o", "u").await;
915 remember("alpha", "o", "u").await; remember("other-org", "o2", "u").await; remember("other-user", "o", "u2").await; let agents = store.list_agent_ids("o", "u").await.unwrap();
920
921 assert_eq!(agents, vec!["alpha".to_string(), "zeta".to_string()]);
922 }
923
924 #[tokio::test]
925 async fn should_return_empty_agent_ids_when_scope_has_no_memories() {
926 let store = StubStore::default();
927 let agents = store.list_agent_ids("empty-org", "empty-user").await.unwrap();
928 assert!(agents.is_empty());
929 }
930
931 async fn write_semantic(store: &StubStore, scope: Scope, provider: &str, model: &str) -> String {
933 store
934 .remember(NewMemory {
935 scope,
936 content: "a derived fact".to_string(),
937 metadata: serde_json::json!({ "provider": provider, "model": model }),
938 kind: MemoryKind::Semantic,
939 source_pid: Some("src".to_string()),
940 event_at: None,
941 confidence: crate::memory::Confidence::new(80),
942 })
943 .await
944 .unwrap()
945 .pid
946 }
947
948 #[tokio::test]
949 async fn should_count_only_rejected_rows_in_extraction_numerator() {
950 let store = StubStore::default();
951 let scope = Scope {
952 agent_id: "a".to_string(),
953 org_id: "o".to_string(),
954 user_id: "u".to_string(),
955 };
956
957 let rejected = write_semantic(&store, scope.clone(), "ollama", "qwen3:14b").await;
960 let stale = write_semantic(&store, scope.clone(), "ollama", "qwen3:14b").await;
961 let superseded = write_semantic(&store, scope.clone(), "ollama", "qwen3:14b").await;
962 let winner = write_semantic(&store, scope.clone(), "ollama", "qwen3:14b").await;
963
964 store.retire(&rejected, crate::memory::RetirementReason::Rejected).await.unwrap();
965 store.retire(&stale, crate::memory::RetirementReason::Stale).await.unwrap();
966 store.supersede(&superseded, &winner).await.unwrap();
967
968 let stats = store.extraction_stats(StatsFilter::default()).await.unwrap();
969
970 assert_eq!(stats.len(), 1, "one (provider, model) pair");
971 let stat = &stats[0];
972 assert_eq!(stat.total, 4, "every semantic row counts in the denominator, regardless of retirement");
973 assert_eq!(
974 stat.rejected, 1,
975 "only Rejected counts; Stale (source changed) and Superseded (newer fact won) are not model errors",
976 );
977 }
978
979 #[tokio::test]
980 async fn should_break_extraction_stats_down_per_provider_and_model() {
981 let store = StubStore::default();
982 let scope = Scope {
983 agent_id: "a".to_string(),
984 org_id: "o".to_string(),
985 user_id: "u".to_string(),
986 };
987
988 let weak = write_semantic(&store, scope.clone(), "ollama", "llama3.2:1b").await;
989 write_semantic(&store, scope.clone(), "ollama", "llama3.2:1b").await;
990 write_semantic(&store, scope.clone(), "ollama", "qwen3:14b").await;
991 store.retire(&weak, crate::memory::RetirementReason::Rejected).await.unwrap();
992
993 let stats = store.extraction_stats(StatsFilter::default()).await.unwrap();
994
995 assert_eq!(stats.len(), 2, "one row per distinct (provider, model)");
997 assert_eq!(stats[0].model, "llama3.2:1b");
998 assert_eq!((stats[0].total, stats[0].rejected), (2, 1));
999 assert_eq!(stats[1].model, "qwen3:14b");
1000 assert_eq!((stats[1].total, stats[1].rejected), (1, 0), "rejecting one model must not touch the other");
1001 }
1002
1003 #[tokio::test]
1004 async fn should_scope_extraction_stats_to_the_filtered_subset() {
1005 let store = StubStore::default();
1006 let mine = Scope {
1007 agent_id: "a".to_string(),
1008 org_id: "acme".to_string(),
1009 user_id: "u".to_string(),
1010 };
1011 let theirs = Scope {
1012 agent_id: "a".to_string(),
1013 org_id: "other".to_string(),
1014 user_id: "u".to_string(),
1015 };
1016
1017 write_semantic(&store, mine.clone(), "ollama", "m").await;
1018 write_semantic(&store, theirs.clone(), "ollama", "m").await;
1019
1020 let filter = StatsFilter {
1021 org_id: Some("acme".to_string()),
1022 ..StatsFilter::default()
1023 };
1024 let stats = store.extraction_stats(filter).await.unwrap();
1025
1026 assert_eq!(stats.len(), 1);
1027 assert_eq!(stats[0].total, 1, "the org filter must exclude another org's extractions");
1028 }
1029
1030 #[test]
1031 fn should_render_index_status_as_lowercase_string() {
1032 assert_eq!(IndexStatus::Pending.as_ref(), "pending");
1033 assert_eq!(IndexStatus::Indexed.as_ref(), "indexed");
1034 assert_eq!(IndexStatus::Failed.as_ref(), "failed");
1035 }
1036
1037 async fn write(store: &StubStore, content: &str) -> Memory {
1038 let scope = Scope {
1039 agent_id: "a".to_string(),
1040 org_id: "o".to_string(),
1041 user_id: "u".to_string(),
1042 };
1043 store
1044 .remember(NewMemory {
1045 scope,
1046 content: content.to_string(),
1047 metadata: serde_json::json!({}),
1048 kind: MemoryKind::Semantic,
1049 source_pid: None,
1050 event_at: None,
1051 confidence: crate::memory::Confidence::MAX,
1052 })
1053 .await
1054 .unwrap()
1055 }
1056
1057 #[tokio::test(flavor = "current_thread")]
1058 async fn should_set_superseded_by_when_supersede_called() {
1059 let store = StubStore::default();
1060 let loser = write(&store, "old fact").await;
1061 let winner = write(&store, "new fact").await;
1062
1063 store.supersede(&loser.pid, &winner.pid).await.unwrap();
1064
1065 let after = store.recall(&loser.pid).await.unwrap();
1066 let supersession = after.supersession.as_ref().expect("supersession set");
1067 assert_eq!(supersession.winner_pid, winner.pid);
1068 }
1069
1070 #[tokio::test(flavor = "current_thread")]
1071 async fn should_clear_superseded_by_when_unsupersede_called() {
1072 let store = StubStore::default();
1073 let loser = write(&store, "old fact").await;
1074 let winner = write(&store, "new fact").await;
1075 store.supersede(&loser.pid, &winner.pid).await.unwrap();
1076
1077 store.unsupersede(&loser.pid).await.unwrap();
1078
1079 let after = store.recall(&loser.pid).await.unwrap();
1080 assert_eq!(after.supersession, None);
1081 }
1082
1083 #[tokio::test(flavor = "current_thread")]
1084 async fn should_return_not_found_when_supersede_targets_missing_pid() {
1085 let store = StubStore::default();
1086 let winner = write(&store, "fact").await;
1087
1088 let result = store.supersede("does-not-exist", &winner.pid).await;
1089
1090 assert!(matches!(result, Err(StoreError::NotFound(_))));
1091 }
1092
1093 #[tokio::test(flavor = "current_thread")]
1094 async fn should_return_not_found_when_unsupersede_targets_missing_pid() {
1095 let store = StubStore::default();
1096
1097 let result = store.unsupersede("does-not-exist").await;
1098
1099 assert!(matches!(result, Err(StoreError::NotFound(_))));
1100 }
1101
1102 #[tokio::test(flavor = "current_thread")]
1103 async fn should_resolve_to_latest_winner_when_resuperseded() {
1104 let store = StubStore::default();
1105 let loser = write(&store, "old").await;
1106 let first_winner = write(&store, "first").await;
1107 let second_winner = write(&store, "second").await;
1108
1109 store.supersede(&loser.pid, &first_winner.pid).await.unwrap();
1110 store.supersede(&loser.pid, &second_winner.pid).await.unwrap();
1111
1112 let after = store.recall(&loser.pid).await.unwrap();
1113 let supersession = after.supersession.as_ref().expect("supersession set");
1114 assert_eq!(
1115 supersession.winner_pid, second_winner.pid,
1116 "latest event wins the cache"
1117 );
1118 }
1119
1120 #[tokio::test(flavor = "current_thread")]
1121 async fn should_return_winner_pid_from_supersession_at_for_past_timestamp() {
1122 let store = StubStore::default();
1123 let loser = write(&store, "loser").await;
1124 let winner = write(&store, "winner").await;
1125 store.supersede(&loser.pid, &winner.pid).await.unwrap();
1126 let now: DateTime<FixedOffset> = Utc::now().into();
1127
1128 let result = store.supersession_at(&loser.pid, now).await.unwrap();
1129
1130 assert_eq!(result.as_deref(), Some(winner.pid.as_str()));
1131 }
1132
1133 #[tokio::test(flavor = "current_thread")]
1134 async fn should_return_none_from_supersession_at_when_as_of_predates_event() {
1135 let store = StubStore::default();
1136 let loser = write(&store, "loser").await;
1137 let winner = write(&store, "winner").await;
1138 let before: DateTime<FixedOffset> = Utc::now().into();
1139 tokio::time::sleep(std::time::Duration::from_millis(5)).await;
1142 store.supersede(&loser.pid, &winner.pid).await.unwrap();
1143
1144 let result = store.supersession_at(&loser.pid, before).await.unwrap();
1145
1146 assert!(result.is_none(), "events after as_of must not count");
1147 }
1148
1149 #[tokio::test(flavor = "current_thread")]
1150 async fn should_return_none_from_supersession_at_when_latest_event_was_unsupersede() {
1151 let store = StubStore::default();
1152 let loser = write(&store, "loser").await;
1153 let winner = write(&store, "winner").await;
1154 store.supersede(&loser.pid, &winner.pid).await.unwrap();
1155 store.unsupersede(&loser.pid).await.unwrap();
1156 let now: DateTime<FixedOffset> = Utc::now().into();
1157
1158 let result = store.supersession_at(&loser.pid, now).await.unwrap();
1159
1160 assert!(result.is_none(), "unsupersede event clears the as-of answer");
1161 }
1162
1163 #[tokio::test(flavor = "current_thread")]
1164 async fn should_return_empty_supersession_history_when_pid_has_no_events() {
1165 let store = StubStore::default();
1166 let solo = write(&store, "never superseded").await;
1167
1168 let trail = store.supersession_history(&solo.pid).await.unwrap();
1169
1170 assert!(trail.is_empty(), "no events = empty trail, not NotFound");
1171 }
1172
1173 #[tokio::test(flavor = "current_thread")]
1174 async fn should_return_supersession_history_in_ascending_order() {
1175 let store = StubStore::default();
1176 let loser = write(&store, "old").await;
1177 let first = write(&store, "first").await;
1178 let second = write(&store, "second").await;
1179 store.supersede(&loser.pid, &first.pid).await.unwrap();
1180 tokio::time::sleep(std::time::Duration::from_millis(5)).await;
1181 store.supersede(&loser.pid, &second.pid).await.unwrap();
1182
1183 let trail = store.supersession_history(&loser.pid).await.unwrap();
1184
1185 assert_eq!(trail.len(), 2, "both events present");
1186 assert_eq!(trail[0].winner_pid.as_deref(), Some(first.pid.as_str()));
1187 assert_eq!(trail[1].winner_pid.as_deref(), Some(second.pid.as_str()));
1188 assert!(trail[0].decided_at <= trail[1].decided_at, "ascending by decided_at");
1189 }
1190
1191 #[tokio::test(flavor = "current_thread")]
1192 async fn should_include_unsupersede_events_in_supersession_history() {
1193 let store = StubStore::default();
1194 let loser = write(&store, "old").await;
1195 let winner = write(&store, "winner").await;
1196 store.supersede(&loser.pid, &winner.pid).await.unwrap();
1197 tokio::time::sleep(std::time::Duration::from_millis(5)).await;
1198 store.unsupersede(&loser.pid).await.unwrap();
1199
1200 let trail = store.supersession_history(&loser.pid).await.unwrap();
1201
1202 assert_eq!(trail.len(), 2);
1203 assert_eq!(
1204 trail[0].winner_pid.as_deref(),
1205 Some(winner.pid.as_str()),
1206 "supersede first"
1207 );
1208 assert!(
1209 trail[1].winner_pid.is_none(),
1210 "unsupersede represented as winner_pid=None"
1211 );
1212 }
1213
1214 #[tokio::test(flavor = "current_thread")]
1215 async fn should_reject_edit_when_kind_is_not_episodic() {
1216 let store = StubStore::default();
1217 let semantic = write(&store, "derived fact").await;
1218
1219 let result = store
1220 .edit(
1221 &semantic.pid,
1222 EditPatch {
1223 content: Some("hand edit".to_string()),
1224 ..EditPatch::default()
1225 },
1226 )
1227 .await;
1228
1229 match result {
1230 Err(StoreError::UnsupportedEdit { pid, kind }) => {
1231 assert_eq!(pid, semantic.pid);
1232 assert_eq!(kind, MemoryKind::Semantic);
1233 }
1234 other => panic!("expected UnsupportedEdit for semantic kind; got {other:?}"),
1235 }
1236 }
1237
1238 #[tokio::test(flavor = "current_thread")]
1239 async fn should_return_not_found_when_editing_missing_pid() {
1240 let store = StubStore::default();
1241
1242 let result = store
1243 .edit(
1244 "no-such-pid",
1245 EditPatch {
1246 content: Some("anything".to_string()),
1247 ..EditPatch::default()
1248 },
1249 )
1250 .await;
1251
1252 match result {
1253 Err(StoreError::NotFound(pid)) => assert_eq!(pid, "no-such-pid"),
1254 other => panic!("expected NotFound for missing pid; got {other:?}"),
1255 }
1256 }
1257}