1#![allow(missing_docs)]
2use std::sync::Arc;
8
9use chrono::{DateTime, Utc};
10
11use crate::{
12 application::ingest_claim::build_latest_disposition_map,
13 config::EngineConfig,
14 engine::{projection, truth_engine},
15 error::MemError,
16 ports::{PersistencePort, VectorPort},
17};
18
19use super::dto::{QueryMemoryRequest, QueryMemoryResponse};
20
21pub struct QueryMemoryUseCase<P, V>
25where
26 P: PersistencePort + Send + Sync + 'static,
27 V: VectorPort + Send + Sync + 'static,
28{
29 persistence: Arc<P>,
30 #[allow(dead_code)]
31 vector: Option<Arc<V>>, config: EngineConfig,
33}
34
35impl<P, V> QueryMemoryUseCase<P, V>
36where
37 P: PersistencePort + Send + Sync + 'static,
38 V: VectorPort + Send + Sync + 'static,
39{
40 pub fn new(persistence: Arc<P>, vector: Option<Arc<V>>, config: EngineConfig) -> Self {
41 Self { persistence, vector, config }
42 }
43
44 pub fn execute_with_time(
48 &self,
49 req: QueryMemoryRequest,
50 now: DateTime<Utc>,
51 ) -> Result<QueryMemoryResponse, MemError> {
52 let claims = self.persistence
54 .load_subject_line(&req.agent_id, &req.subject, &req.predicate)
55 .map_err(|e| MemError::Persistence { source: Box::new(e) })?;
56
57 let as_of = req.as_of_tx_time.unwrap_or(now);
60
61 let claim_refs: Vec<_> = claims.iter().map(|c| c.claim_ref().clone()).collect();
65 let all_ledger = self.persistence
66 .load_ledger_for_claims(&req.agent_id, &claim_refs)
67 .map_err(|e| MemError::Persistence { source: Box::new(e) })?;
68 let latest_disposition = build_latest_disposition_map(&all_ledger);
69
70 let fold = truth_engine::fold(
72 claims.clone(),
73 |cref| {
74 self.persistence
75 .load_validity_assertions_for(&req.agent_id, cref)
76 .unwrap_or_default()
77 },
78 as_of,
79 &self.config,
80 &latest_disposition,
81 );
82
83 let ledger_entries: Vec<_> = claims.iter().flat_map(|c| {
86 all_ledger.iter()
87 .filter(|e| &e.claim_ref == c.claim_ref())
88 .cloned()
89 }).collect();
90
91 let contested = fold.live_claims.iter().any(|cs| {
93 cs.last_disposition
94 .as_ref()
95 .map(|d| *d == mempill_types::Disposition::Contested)
96 .unwrap_or(false)
97 });
98
99 let belief = projection::project(&fold, &ledger_entries, now, &self.config, contested);
101
102 Ok(QueryMemoryResponse { belief })
103 }
104
105 pub fn execute(&self, req: QueryMemoryRequest) -> Result<QueryMemoryResponse, MemError> {
107 self.execute_with_time(req, Utc::now())
108 }
109}
110
#[cfg(test)]
mod tests {
    use super::*;
    use crate::noop::NoOpVector;
    use crate::ports::persistence::Txn;
    use chrono::TimeZone;
    use mempill_types::{
        AgentId, BeliefStatus, Cardinality, Claim, ClaimEdge, ClaimRef, Confidence, Criticality,
        ExternalAnchor, ExternalKind, Fact, LedgerEntry, ProvenanceLabel, TransactionTime,
        ValidTime, ValidityAssertion,
    };
    use std::sync::Mutex;

    /// Transaction stand-in that only remembers the agent it was opened for.
    struct MockTxn(AgentId);

    impl Txn for MockTxn {
        fn agent_id(&self) -> &AgentId {
            &self.0
        }
    }

    /// Error type required by the port; none of the mock methods return it.
    #[derive(Debug, thiserror::Error)]
    #[error("mock")]
    struct MockErr;

    /// In-memory store: claims live in a mutex-guarded vector, everything
    /// else is a no-op or returns an empty result.
    #[derive(Default)]
    struct MockStore {
        claims: Mutex<Vec<Claim>>,
    }

    impl PersistencePort for MockStore {
        type Transaction = MockTxn;
        type Error = MockErr;

        fn begin_atomic(&self, aid: &AgentId) -> Result<MockTxn, MockErr> {
            Ok(MockTxn(aid.clone()))
        }

        fn append_claim(&self, _t: &mut MockTxn, c: &Claim) -> Result<ClaimRef, MockErr> {
            let mut stored = self.claims.lock().unwrap();
            stored.push(c.clone());
            Ok(c.claim_ref().clone())
        }

        fn append_validity_assertion(
            &self,
            _t: &mut MockTxn,
            _a: &ValidityAssertion,
        ) -> Result<(), MockErr> {
            Ok(())
        }

        fn append_ledger_entry(&self, _t: &mut MockTxn, _e: &LedgerEntry) -> Result<(), MockErr> {
            Ok(())
        }

        fn append_claim_edge(&self, _t: &mut MockTxn, _e: &ClaimEdge) -> Result<(), MockErr> {
            Ok(())
        }

        fn commit(&self, _t: MockTxn) -> Result<(), MockErr> {
            Ok(())
        }

        fn rollback(&self, _t: MockTxn) -> Result<(), MockErr> {
            Ok(())
        }

        /// Returns clones of the stored claims whose fact matches both
        /// `subject` and `predicate`; the agent id is ignored.
        fn load_subject_line(
            &self,
            _aid: &AgentId,
            subject: &str,
            predicate: &str,
        ) -> Result<Vec<Claim>, MockErr> {
            let stored = self.claims.lock().unwrap();
            let mut matching = Vec::new();
            for claim in stored.iter() {
                if claim.fact().subject == subject && claim.fact().predicate == predicate {
                    matching.push(claim.clone());
                }
            }
            Ok(matching)
        }

        fn load_claim(&self, _aid: &AgentId, _r: &ClaimRef) -> Result<Option<Claim>, MockErr> {
            Ok(None)
        }

        fn load_validity_assertions_for(
            &self,
            _aid: &AgentId,
            _r: &ClaimRef,
        ) -> Result<Vec<ValidityAssertion>, MockErr> {
            Ok(vec![])
        }

        fn load_ledger(
            &self,
            _aid: &AgentId,
            _from: Option<&mempill_types::TransactionTime>,
            _lim: usize,
        ) -> Result<Vec<LedgerEntry>, MockErr> {
            Ok(vec![])
        }

        fn load_ledger_for_claims(
            &self,
            _aid: &AgentId,
            _refs: &[ClaimRef],
        ) -> Result<Vec<LedgerEntry>, MockErr> {
            Ok(vec![])
        }

        fn load_edges_for(&self, _aid: &AgentId, _r: &ClaimRef) -> Result<Vec<ClaimEdge>, MockErr> {
            Ok(vec![])
        }

        fn load_injected_claims(&self, _aid: &AgentId) -> Result<Vec<ClaimRef>, MockErr> {
            Ok(vec![])
        }

        fn load_lineage(&self, _aid: &AgentId, _r: &ClaimRef) -> Result<Vec<ClaimEdge>, MockErr> {
            Ok(vec![])
        }
    }

    /// Builds a user-asserted, functional claim for agent "agent" recorded at
    /// transaction time `tx`, with no valid-time bounds.
    fn make_claim(
        subject: &str,
        predicate: &str,
        value: serde_json::Value,
        tx: DateTime<Utc>,
    ) -> Claim {
        let fact = Fact { subject: subject.into(), predicate: predicate.into(), value };
        let anchor = ExternalAnchor { nearest_external_anchor: None, derivation_depth: 0 };
        let valid_time = ValidTime { start: None, end: None, valid_time_confidence: 0.0 };
        let confidence = Confidence { value_confidence: 0.9, valid_time_confidence: 0.0 };

        Claim::new(
            ClaimRef::new_random(),
            AgentId("agent".into()),
            fact,
            Cardinality::Functional,
            ProvenanceLabel::External(ExternalKind::UserAsserted),
            anchor,
            TransactionTime(tx),
            valid_time,
            confidence,
            Criticality::Medium,
            vec![],
            None,
            None,
        )
    }

    /// Use case over `store` with no vector backend and default engine config.
    fn use_case_over(store: &Arc<MockStore>) -> QueryMemoryUseCase<MockStore, NoOpVector> {
        QueryMemoryUseCase::new(Arc::clone(store), None, EngineConfig::default())
    }

    /// Request for subject "user" and the given predicate, with no as-of override.
    fn request_for(predicate: &str) -> QueryMemoryRequest {
        QueryMemoryRequest {
            agent_id: AgentId("agent".into()),
            subject: "user".into(),
            predicate: predicate.into(),
            as_of_tx_time: None,
        }
    }

    #[test]
    fn query_no_claims_returns_no_belief() {
        let store = Arc::new(MockStore::default());
        let uc = use_case_over(&store);

        let now = Utc::now();
        let resp = uc.execute_with_time(request_for("city"), now).unwrap();

        assert_eq!(resp.belief.status, BeliefStatus::NoBelief);
    }

    #[test]
    fn query_with_one_claim_returns_resolved() {
        let store = Arc::new(MockStore::default());
        let recorded_at = Utc.with_ymd_and_hms(2026, 1, 1, 0, 0, 0).unwrap();
        store
            .claims
            .lock()
            .unwrap()
            .push(make_claim("user", "city", serde_json::json!("Paris"), recorded_at));

        let uc = use_case_over(&store);
        let now = Utc.with_ymd_and_hms(2026, 1, 2, 0, 0, 0).unwrap();
        let resp = uc.execute_with_time(request_for("city"), now).unwrap();

        assert!(
            matches!(resp.belief.status, BeliefStatus::TimingUncertain | BeliefStatus::Resolved),
            "expected Resolved or TimingUncertain, got {:?}",
            resp.belief.status
        );
        assert!(resp.belief.primary.is_some(), "primary belief must be present");
    }

    #[test]
    fn query_now_injected_not_read_from_clock() {
        let store = Arc::new(MockStore::default());
        let recorded_at = Utc.with_ymd_and_hms(2020, 1, 1, 0, 0, 0).unwrap();
        store
            .claims
            .lock()
            .unwrap()
            .push(make_claim("user", "job", serde_json::json!("Engineer"), recorded_at));

        let uc = use_case_over(&store);
        let req = request_for("job");

        // Same stored claim, queried with an injected `now` one day after it.
        let near_now = Utc.with_ymd_and_hms(2020, 1, 2, 0, 0, 0).unwrap();
        let resp_near = uc.execute_with_time(req.clone(), near_now).unwrap();
        assert!(resp_near.belief.primary.is_some());
        let near_primary = resp_near.belief.primary.unwrap();
        assert_eq!(
            near_primary.currency_signal.state,
            mempill_types::CurrencyState::Fresh,
            "1 day after claim, currency must be Fresh (injected now, not system clock)"
        );

        // ...and again with an injected `now` in late July of the same year.
        let far_now = Utc.with_ymd_and_hms(2020, 7, 20, 0, 0, 0).unwrap();
        let resp_far = uc.execute_with_time(req, far_now).unwrap();
        assert!(resp_far.belief.primary.is_some());
        let far_primary = resp_far.belief.primary.unwrap();
        assert_eq!(
            far_primary.currency_signal.state,
            mempill_types::CurrencyState::Decayed,
            "200 days after claim, currency must be Decayed (injected now, not system clock)"
        );
    }
}