Skip to main content

mempill_core/application/
query_memory.rs

1#![allow(missing_docs)]
2//! QueryMemoryUseCase — application layer read path.
3//!
4//! Read-only: no Txn opened, no writes. Delegates to TruthEngine (fold)
5//! then Projection (project). `now` is injected by the EngineHandle boundary.
6
7use std::sync::Arc;
8
9use chrono::{DateTime, Utc};
10
11use crate::{
12    application::ingest_claim::build_latest_disposition_map,
13    config::EngineConfig,
14    engine::{projection, truth_engine},
15    error::MemError,
16    ports::{PersistencePort, VectorPort},
17};
18
19use super::dto::{QueryMemoryRequest, QueryMemoryResponse};
20
21/// Use-case: query the canonical belief for a (subject, predicate) line.
22/// Generic over persistence and vector ports.
23/// Vector is optional: None = structural-only mode (v0.1 default).
24pub struct QueryMemoryUseCase<P, V>
25where
26    P: PersistencePort + Send + Sync + 'static,
27    V: VectorPort + Send + Sync + 'static,
28{
29    persistence: Arc<P>,
30    #[allow(dead_code)]
31    vector: Option<Arc<V>>, // v0.1: unused; structural-only query
32    config: EngineConfig,
33}
34
35impl<P, V> QueryMemoryUseCase<P, V>
36where
37    P: PersistencePort + Send + Sync + 'static,
38    V: VectorPort + Send + Sync + 'static,
39{
40    pub fn new(persistence: Arc<P>, vector: Option<Arc<V>>, config: EngineConfig) -> Self {
41        Self { persistence, vector, config }
42    }
43
44    /// Read path: no Txn (read-only). TruthEngine fold → Projection → DTO.
45    ///
46    /// `now` is injected by the EngineHandle (DETERMINISM — no clock reads here).
47    pub fn execute_with_time(
48        &self,
49        req: QueryMemoryRequest,
50        now: DateTime<Utc>,
51    ) -> Result<QueryMemoryResponse, MemError> {
52        // Load all claims for the subject-line.
53        let claims = self.persistence
54            .load_subject_line(&req.agent_id, &req.subject, &req.predicate)
55            .map_err(|e| MemError::Persistence { source: Box::new(e) })?;
56
57        // Determine the bi-temporal as-of point: use the request's as_of_tx_time if supplied,
58        // otherwise use the injected `now`.
59        let as_of = req.as_of_tx_time.unwrap_or(now);
60
61        // Load ledger for the disposition-based liveness filter — scoped to exactly the
62        // claims on this subject-line (no agent-wide cap; always complete regardless of
63        // total agent ledger size — fixes the silent-wrong-belief-at-scale bug).
64        let claim_refs: Vec<_> = claims.iter().map(|c| c.claim_ref().clone()).collect();
65        let all_ledger = self.persistence
66            .load_ledger_for_claims(&req.agent_id, &claim_refs)
67            .map_err(|e| MemError::Persistence { source: Box::new(e) })?;
68        let latest_disposition = build_latest_disposition_map(&all_ledger);
69
70        // C2: canonical valid-time fold (with disposition filter).
71        let fold = truth_engine::fold(
72            claims.clone(),
73            |cref| {
74                self.persistence
75                    .load_validity_assertions_for(&req.agent_id, cref)
76                    .unwrap_or_default()
77            },
78            as_of,
79            &self.config,
80            &latest_disposition,
81        );
82
83        // Build ledger entries per claim (for A26 PendingReview detection).
84        // Reuse the already-loaded all_ledger from above (no second load needed).
85        let ledger_entries: Vec<_> = claims.iter().flat_map(|c| {
86            all_ledger.iter()
87                .filter(|e| &e.claim_ref == c.claim_ref())
88                .cloned()
89        }).collect();
90
91        // Determine contested state from ledger (Contested disposition in live claims).
92        let contested = fold.live_claims.iter().any(|cs| {
93            cs.last_disposition
94                .as_ref()
95                .map(|d| *d == mempill_types::Disposition::Contested)
96                .unwrap_or(false)
97        });
98
99        // C5: projection.
100        let belief = projection::project(&fold, &ledger_entries, now, &self.config, contested);
101
102        Ok(QueryMemoryResponse { belief })
103    }
104
105    /// Convenience wrapper that stamps now internally (for direct calls outside EngineHandle).
106    pub fn execute(&self, req: QueryMemoryRequest) -> Result<QueryMemoryResponse, MemError> {
107        self.execute_with_time(req, Utc::now())
108    }
109}
110
111// ── Tests ──────────────────────────────────────────────────────────────────────
112
113#[cfg(test)]
114mod tests {
115    use super::*;
116    use crate::noop::NoOpVector;
117    use crate::ports::persistence::Txn;
118    use chrono::TimeZone;
119    use mempill_types::{
120        AgentId, BeliefStatus, Cardinality, Claim, ClaimEdge, ClaimRef, Confidence, Criticality,
121        ExternalAnchor, ExternalKind, Fact, LedgerEntry, ProvenanceLabel, TransactionTime,
122        ValidTime, ValidityAssertion,
123    };
124    use std::sync::Mutex;
125
126    struct MockTxn(AgentId);
127    impl Txn for MockTxn {
128        fn agent_id(&self) -> &AgentId { &self.0 }
129    }
130
131    #[derive(Debug, thiserror::Error)]
132    #[error("mock")]
133    struct MockErr;
134
135    #[derive(Default)]
136    struct MockStore {
137        claims: Mutex<Vec<Claim>>,
138    }
139
140    impl PersistencePort for MockStore {
141        type Transaction = MockTxn;
142        type Error = MockErr;
143        fn begin_atomic(&self, aid: &AgentId) -> Result<MockTxn, MockErr> { Ok(MockTxn(aid.clone())) }
144        fn append_claim(&self, _t: &mut MockTxn, c: &Claim) -> Result<ClaimRef, MockErr> {
145            self.claims.lock().unwrap().push(c.clone());
146            Ok(c.claim_ref().clone())
147        }
148        fn append_validity_assertion(&self, _t: &mut MockTxn, _a: &ValidityAssertion) -> Result<(), MockErr> { Ok(()) }
149        fn append_ledger_entry(&self, _t: &mut MockTxn, _e: &LedgerEntry) -> Result<(), MockErr> { Ok(()) }
150        fn append_claim_edge(&self, _t: &mut MockTxn, _e: &ClaimEdge) -> Result<(), MockErr> { Ok(()) }
151        fn commit(&self, _t: MockTxn) -> Result<(), MockErr> { Ok(()) }
152        fn rollback(&self, _t: MockTxn) -> Result<(), MockErr> { Ok(()) }
153        fn load_subject_line(&self, _aid: &AgentId, subject: &str, predicate: &str) -> Result<Vec<Claim>, MockErr> {
154            let claims = self.claims.lock().unwrap();
155            Ok(claims.iter()
156                .filter(|c| c.fact().subject == subject && c.fact().predicate == predicate)
157                .cloned()
158                .collect())
159        }
160        fn load_claim(&self, _aid: &AgentId, _r: &ClaimRef) -> Result<Option<Claim>, MockErr> { Ok(None) }
161        fn load_validity_assertions_for(&self, _aid: &AgentId, _r: &ClaimRef) -> Result<Vec<ValidityAssertion>, MockErr> { Ok(vec![]) }
162        fn load_ledger(&self, _aid: &AgentId, _from: Option<&mempill_types::TransactionTime>, _lim: usize) -> Result<Vec<LedgerEntry>, MockErr> { Ok(vec![]) }
163        fn load_ledger_for_claims(&self, _aid: &AgentId, _refs: &[ClaimRef]) -> Result<Vec<LedgerEntry>, MockErr> { Ok(vec![]) }
164        fn load_edges_for(&self, _aid: &AgentId, _r: &ClaimRef) -> Result<Vec<ClaimEdge>, MockErr> { Ok(vec![]) }
165        fn load_injected_claims(&self, _aid: &AgentId) -> Result<Vec<ClaimRef>, MockErr> { Ok(vec![]) }
166        fn load_lineage(&self, _aid: &AgentId, _r: &ClaimRef) -> Result<Vec<ClaimEdge>, MockErr> { Ok(vec![]) }
167    }
168
169    fn make_claim(subject: &str, predicate: &str, value: serde_json::Value, tx: DateTime<Utc>) -> Claim {
170        Claim::new(
171            ClaimRef::new_random(),
172            AgentId("agent".into()),
173            Fact { subject: subject.into(), predicate: predicate.into(), value },
174            Cardinality::Functional,
175            ProvenanceLabel::External(ExternalKind::UserAsserted),
176            ExternalAnchor { nearest_external_anchor: None, derivation_depth: 0 },
177            TransactionTime(tx),
178            ValidTime { start: None, end: None, valid_time_confidence: 0.0 },
179            Confidence { value_confidence: 0.9, valid_time_confidence: 0.0 },
180            Criticality::Medium,
181            vec![],
182            None,
183            None,
184        )
185    }
186
187    #[test]
188    fn query_no_claims_returns_no_belief() {
189        let store = Arc::new(MockStore::default());
190        let uc = QueryMemoryUseCase::new(
191            Arc::clone(&store),
192            None::<Arc<NoOpVector>>,
193            EngineConfig::default(),
194        );
195        let now = Utc::now();
196        let req = QueryMemoryRequest {
197            agent_id: AgentId("agent".into()),
198            subject: "user".into(),
199            predicate: "city".into(),
200            as_of_tx_time: None,
201        };
202        let resp = uc.execute_with_time(req, now).unwrap();
203        assert_eq!(resp.belief.status, BeliefStatus::NoBelief);
204    }
205
206    #[test]
207    fn query_with_one_claim_returns_resolved() {
208        let store = Arc::new(MockStore::default());
209        let tx = Utc.with_ymd_and_hms(2026, 1, 1, 0, 0, 0).unwrap();
210        let claim = make_claim("user", "city", serde_json::json!("Paris"), tx);
211        store.claims.lock().unwrap().push(claim);
212
213        let uc = QueryMemoryUseCase::new(
214            Arc::clone(&store),
215            None::<Arc<NoOpVector>>,
216            EngineConfig::default(),
217        );
218        let now = Utc.with_ymd_and_hms(2026, 1, 2, 0, 0, 0).unwrap();
219        let req = QueryMemoryRequest {
220            agent_id: AgentId("agent".into()),
221            subject: "user".into(),
222            predicate: "city".into(),
223            as_of_tx_time: None,
224        };
225        let resp = uc.execute_with_time(req, now).unwrap();
226        // Single live claim with unknown valid_time → TimingUncertain (valid_time is None).
227        assert!(
228            matches!(resp.belief.status, BeliefStatus::TimingUncertain | BeliefStatus::Resolved),
229            "expected Resolved or TimingUncertain, got {:?}",
230            resp.belief.status
231        );
232        assert!(resp.belief.primary.is_some(), "primary belief must be present");
233    }
234
235    #[test]
236    fn query_now_injected_not_read_from_clock() {
237        // Verify that the injected `now` flows into projection (currency decay) rather than
238        // the system clock. Two queries with different injected 'now' values on the same
239        // claim yield different CurrencyState in the result.
240        let store = Arc::new(MockStore::default());
241        // A claim from 200 days ago.
242        let old_tx = Utc.with_ymd_and_hms(2020, 1, 1, 0, 0, 0).unwrap();
243        let claim = make_claim("user", "job", serde_json::json!("Engineer"), old_tx);
244        store.claims.lock().unwrap().push(claim);
245
246        let uc = QueryMemoryUseCase::new(
247            Arc::clone(&store),
248            None::<Arc<NoOpVector>>,
249            EngineConfig::default(),
250        );
251
252        // Query with 'now' very close to the claim's tx_time → should be Fresh.
253        let near_now = Utc.with_ymd_and_hms(2020, 1, 2, 0, 0, 0).unwrap(); // 1 day later
254        let req = QueryMemoryRequest {
255            agent_id: AgentId("agent".into()),
256            subject: "user".into(),
257            predicate: "job".into(),
258            as_of_tx_time: None,
259        };
260        let resp_near = uc.execute_with_time(req.clone(), near_now).unwrap();
261        assert!(resp_near.belief.primary.is_some());
262        assert_eq!(
263            resp_near.belief.primary.unwrap().currency_signal.state,
264            mempill_types::CurrencyState::Fresh,
265            "1 day after claim, currency must be Fresh (injected now, not system clock)"
266        );
267
268        // Query with 'now' 200 days after claim → should be Decayed (decayed_threshold_days=90).
269        let far_now = Utc.with_ymd_and_hms(2020, 7, 20, 0, 0, 0).unwrap(); // ~200 days later
270        let resp_far = uc.execute_with_time(req, far_now).unwrap();
271        assert!(resp_far.belief.primary.is_some());
272        assert_eq!(
273            resp_far.belief.primary.unwrap().currency_signal.state,
274            mempill_types::CurrencyState::Decayed,
275            "200 days after claim, currency must be Decayed (injected now, not system clock)"
276        );
277    }
278}