1#![allow(missing_docs)]
2use std::sync::Arc;
21
22use chrono::{DateTime, Utc};
23use mempill_types::{Claim, ClaimRef, HistoryEntryStatus, ProvenanceLabel, ExternalKind};
24
25use crate::{
26 application::ingest_claim::build_latest_disposition_map,
27 config::EngineConfig,
28 engine::truth_engine,
29 error::MemError,
30 ports::{PersistencePort, VectorPort},
31};
32
33use super::dto::{HistoryEntry, QueryHistoryRequest, QueryHistoryResponse};
34
35fn ordering_key_dt(claim: &Claim, config: &EngineConfig) -> DateTime<Utc> {
40 if claim.valid_time().valid_time_confidence >= config.valid_time_confidence_threshold {
41 claim.valid_time().start.unwrap_or(claim.transaction_time().0)
42 } else {
43 claim.transaction_time().0
44 }
45}
46
47fn format_provenance(p: &ProvenanceLabel) -> String {
52 match p {
53 ProvenanceLabel::External(ExternalKind::UserAsserted) => {
54 "External/UserAsserted".to_owned()
55 }
56 ProvenanceLabel::External(ExternalKind::ExternalFirstHand) => {
57 "External/ExternalFirstHand".to_owned()
58 }
59 ProvenanceLabel::RecallReEntry => "RecallReEntry".to_owned(),
60 ProvenanceLabel::ModelDerived => "ModelDerived".to_owned(),
61 _ => format!("{p:?}"),
62 }
63}
64
65pub fn compute_effective_windows(
76 sorted: &[&Claim],
77 config: &EngineConfig,
78) -> Vec<Option<DateTime<Utc>>> {
79 let n = sorted.len();
80 let mut windows = Vec::with_capacity(n);
81 for i in 0..n {
82 if i + 1 < n {
83 windows.push(Some(ordering_key_dt(sorted[i + 1], config)));
85 } else {
86 windows.push(None);
88 }
89 }
90 windows
91}
92
/// Use case that assembles the ordered history of one (subject, predicate)
/// line for an agent.
///
/// `P` is the persistence backend the claims, ledger entries and validity
/// assertions are loaded from; `V` is the vector backend type accepted by the
/// constructor.
pub struct QueryHistoryUseCase<P, V>
where
    P: PersistencePort + Send + Sync + 'static,
    V: VectorPort + Send + Sync + 'static,
{
    /// Store queried for the subject line, its ledger entries and its
    /// validity assertions.
    persistence: Arc<P>,
    // Stored by `new` but not read by any code in this file, hence the
    // `dead_code` allowance.
    #[allow(dead_code)]
    vector: Option<Arc<V>>,
    /// Engine configuration: supplies the valid-time confidence threshold used
    /// for ordering and is passed to the truth-engine fold.
    config: EngineConfig,
}
108
109impl<P, V> QueryHistoryUseCase<P, V>
110where
111 P: PersistencePort + Send + Sync + 'static,
112 V: VectorPort + Send + Sync + 'static,
113{
114 pub fn new(persistence: Arc<P>, vector: Option<Arc<V>>, config: EngineConfig) -> Self {
115 Self { persistence, vector, config }
116 }
117
118 pub fn execute_with_time(
122 &self,
123 req: QueryHistoryRequest,
124 now: DateTime<Utc>,
125 ) -> Result<QueryHistoryResponse, MemError> {
126 let claims = self.persistence
128 .load_subject_line(&req.agent_id, &req.subject, &req.predicate)
129 .map_err(|e| MemError::Persistence { source: Box::new(e) })?;
130
131 if claims.is_empty() {
132 return Ok(QueryHistoryResponse { entries: vec![] });
133 }
134
135 let claim_refs: Vec<_> = claims.iter().map(|c| c.claim_ref().clone()).collect();
138 let all_ledger = self.persistence
139 .load_ledger_for_claims(&req.agent_id, &claim_refs)
140 .map_err(|e| MemError::Persistence { source: Box::new(e) })?;
141 let latest_disposition = build_latest_disposition_map(&all_ledger);
142
143 let fold = truth_engine::fold(
145 claims.clone(),
146 |cref| {
147 self.persistence
148 .load_validity_assertions_for(&req.agent_id, cref)
149 .unwrap_or_default()
150 },
151 now,
152 &self.config,
153 &latest_disposition,
154 );
155
156 let live_refs: std::collections::HashSet<&ClaimRef> = fold
158 .live_claims
159 .iter()
160 .map(|cs| cs.claim.claim_ref())
161 .collect();
162
163 let mut sorted_claims = claims;
165 sorted_claims.sort_by(|a, b| {
166 let ka = ordering_key_dt(a, &self.config);
167 let kb = ordering_key_dt(b, &self.config);
168 ka.cmp(&kb)
169 .then(a.transaction_time().0.cmp(&b.transaction_time().0))
170 .then(a.claim_ref().0.as_u128().cmp(&b.claim_ref().0.as_u128()))
171 });
172
173 let refs: Vec<&Claim> = sorted_claims.iter().collect();
175 let windows = compute_effective_windows(&refs, &self.config);
176
177 let entries: Vec<HistoryEntry> = sorted_claims
179 .iter()
180 .zip(windows)
181 .map(|(claim, valid_until)| {
182 let status = if live_refs.contains(claim.claim_ref()) {
183 HistoryEntryStatus::Current
184 } else {
185 HistoryEntryStatus::Superseded
186 };
187 HistoryEntry {
188 claim_ref: claim.claim_ref().clone(),
189 value: claim.fact().value.clone(),
190 valid_from: claim.valid_time().start,
191 valid_until,
192 status,
193 provenance: format_provenance(claim.provenance()),
194 value_confidence: claim.confidence().value_confidence,
195 }
196 })
197 .collect();
198
199 Ok(QueryHistoryResponse { entries })
200 }
201
202 pub fn execute(&self, req: QueryHistoryRequest) -> Result<QueryHistoryResponse, MemError> {
204 self.execute_with_time(req, Utc::now())
205 }
206}
207
#[cfg(test)]
mod tests {
    use super::*;
    use crate::config::EngineConfig;
    use crate::noop::NoOpVector;
    use crate::ports::persistence::Txn;
    use chrono::TimeZone;
    use mempill_types::{
        AgentId, Cardinality, Claim, ClaimEdge, ClaimRef, Confidence, Criticality,
        ExternalAnchor, ExternalKind, Fact, LedgerEntry, ProvenanceLabel, TransactionTime,
        ValidTime, ValidityAssertion,
    };
    use std::sync::Mutex;

    // ---- In-memory persistence double -------------------------------------

    /// Transaction handle that only remembers which agent opened it.
    struct MockTxn(AgentId);

    impl Txn for MockTxn {
        fn agent_id(&self) -> &AgentId {
            &self.0
        }
    }

    /// Error type required by the port; no mock method ever returns it.
    #[derive(Debug, thiserror::Error)]
    #[error("mock")]
    struct MockErr;

    /// Store backed by two in-memory vectors. Ledger, edge, lineage and
    /// injected-claim queries always come back empty.
    #[derive(Default)]
    struct MockStore {
        claims: Mutex<Vec<Claim>>,
        assertions: Mutex<Vec<ValidityAssertion>>,
    }

    impl PersistencePort for MockStore {
        type Transaction = MockTxn;
        type Error = MockErr;

        fn begin_atomic(&self, aid: &AgentId) -> Result<MockTxn, MockErr> {
            Ok(MockTxn(aid.clone()))
        }

        fn append_claim(&self, _txn: &mut MockTxn, claim: &Claim) -> Result<ClaimRef, MockErr> {
            self.claims.lock().unwrap().push(claim.clone());
            Ok(claim.claim_ref().clone())
        }

        fn append_validity_assertion(
            &self,
            _txn: &mut MockTxn,
            assertion: &ValidityAssertion,
        ) -> Result<(), MockErr> {
            self.assertions.lock().unwrap().push(assertion.clone());
            Ok(())
        }

        fn append_ledger_entry(
            &self,
            _txn: &mut MockTxn,
            _entry: &LedgerEntry,
        ) -> Result<(), MockErr> {
            Ok(())
        }

        fn append_claim_edge(&self, _txn: &mut MockTxn, _edge: &ClaimEdge) -> Result<(), MockErr> {
            Ok(())
        }

        fn commit(&self, _txn: MockTxn) -> Result<(), MockErr> {
            Ok(())
        }

        fn rollback(&self, _txn: MockTxn) -> Result<(), MockErr> {
            Ok(())
        }

        fn load_subject_line(
            &self,
            _aid: &AgentId,
            subject: &str,
            predicate: &str,
        ) -> Result<Vec<Claim>, MockErr> {
            let matching: Vec<Claim> = self
                .claims
                .lock()
                .unwrap()
                .iter()
                .filter(|c| c.fact().subject == subject && c.fact().predicate == predicate)
                .cloned()
                .collect();
            Ok(matching)
        }

        fn load_claim(&self, _aid: &AgentId, r: &ClaimRef) -> Result<Option<Claim>, MockErr> {
            let stored = self.claims.lock().unwrap();
            Ok(stored.iter().find(|c| c.claim_ref() == r).cloned())
        }

        fn load_validity_assertions_for(
            &self,
            _aid: &AgentId,
            r: &ClaimRef,
        ) -> Result<Vec<ValidityAssertion>, MockErr> {
            let matching: Vec<ValidityAssertion> = self
                .assertions
                .lock()
                .unwrap()
                .iter()
                .filter(|a| &a.target_claim == r)
                .cloned()
                .collect();
            Ok(matching)
        }

        fn load_ledger(
            &self,
            _aid: &AgentId,
            _from: Option<&TransactionTime>,
            _limit: usize,
        ) -> Result<Vec<LedgerEntry>, MockErr> {
            Ok(Vec::new())
        }

        fn load_ledger_for_claims(
            &self,
            _aid: &AgentId,
            _refs: &[ClaimRef],
        ) -> Result<Vec<LedgerEntry>, MockErr> {
            Ok(Vec::new())
        }

        fn load_edges_for(&self, _aid: &AgentId, _r: &ClaimRef) -> Result<Vec<ClaimEdge>, MockErr> {
            Ok(Vec::new())
        }

        fn load_injected_claims(&self, _aid: &AgentId) -> Result<Vec<ClaimRef>, MockErr> {
            Ok(Vec::new())
        }

        fn load_lineage(&self, _aid: &AgentId, _r: &ClaimRef) -> Result<Vec<ClaimEdge>, MockErr> {
            Ok(Vec::new())
        }
    }

    // ---- Fixtures ----------------------------------------------------------

    /// Agent id shared by every test.
    fn test_agent() -> AgentId {
        AgentId("test-agent".into())
    }

    /// Midnight UTC on the given calendar date.
    fn midnight(year: i32, month: u32, day: u32) -> DateTime<Utc> {
        Utc.with_ymd_and_hms(year, month, day, 0, 0, 0).unwrap()
    }

    /// Builds a functional, user-asserted claim with the given fact and times.
    #[allow(clippy::too_many_arguments)]
    fn make_claim(
        agent_id: &AgentId,
        subject: &str,
        predicate: &str,
        value: serde_json::Value,
        tx: DateTime<Utc>,
        vt_start: Option<DateTime<Utc>>,
        vt_end: Option<DateTime<Utc>>,
        vt_confidence: f32,
    ) -> Claim {
        let fact = Fact { subject: subject.into(), predicate: predicate.into(), value };
        let valid_time =
            ValidTime { start: vt_start, end: vt_end, valid_time_confidence: vt_confidence };
        let confidence = Confidence { value_confidence: 0.9, valid_time_confidence: vt_confidence };

        Claim::new(
            ClaimRef::new_random(),
            agent_id.clone(),
            fact,
            Cardinality::Functional,
            ProvenanceLabel::External(ExternalKind::UserAsserted),
            ExternalAnchor { nearest_external_anchor: None, derivation_depth: 0 },
            TransactionTime(tx),
            valid_time,
            confidence,
            Criticality::Medium,
            vec![],
            None,
            None,
        )
    }

    /// Claim with a string value, no valid-time bounds and zero valid-time
    /// confidence.
    fn undated_claim(
        agent_id: &AgentId,
        subject: &str,
        predicate: &str,
        value: &str,
        tx: DateTime<Utc>,
    ) -> Claim {
        make_claim(agent_id, subject, predicate, serde_json::json!(value), tx, None, None, 0.0)
    }

    /// Store pre-populated with `claims`, kept in the given order.
    fn store_with(claims: Vec<Claim>) -> Arc<MockStore> {
        Arc::new(MockStore { claims: Mutex::new(claims), assertions: Mutex::default() })
    }

    /// Use case over `store` with no vector port and the default config.
    fn use_case_over(store: Arc<MockStore>) -> QueryHistoryUseCase<MockStore, NoOpVector> {
        QueryHistoryUseCase::new(store, None::<Arc<NoOpVector>>, EngineConfig::default())
    }

    /// Runs the history query for `(subject, predicate)` at `now` and unwraps
    /// the response.
    fn query_history(
        store: Arc<MockStore>,
        agent_id: AgentId,
        subject: &str,
        predicate: &str,
        now: DateTime<Utc>,
    ) -> QueryHistoryResponse {
        let request =
            QueryHistoryRequest { agent_id, subject: subject.into(), predicate: predicate.into() };
        use_case_over(store).execute_with_time(request, now).unwrap()
    }

    // ---- End-to-end behaviour of the use case ------------------------------

    #[test]
    fn empty_subject_line_returns_empty_entries() {
        let resp = query_history(store_with(vec![]), test_agent(), "nobody", "nothing", Utc::now());

        assert!(resp.entries.is_empty(), "no claims → empty history");
        assert!(resp.current().is_none(), "no current entry");
    }

    #[test]
    fn single_claim_returns_one_current_entry() {
        let agent = test_agent();
        let claim = undated_claim(&agent, "acme", "ceo", "Alice", midnight(2024, 1, 1));

        let resp = query_history(store_with(vec![claim]), agent, "acme", "ceo", midnight(2025, 1, 1));

        assert_eq!(resp.entries.len(), 1, "one claim → one entry");
        let only = &resp.entries[0];
        assert_eq!(only.status, HistoryEntryStatus::Current);
        assert_eq!(only.value, serde_json::json!("Alice"));
        assert!(only.valid_until.is_none(), "single entry has no successor → open-ended");
    }

    #[test]
    fn succession_ordering_oldest_first() {
        let agent = test_agent();
        let newer = undated_claim(&agent, "acme", "ceo", "Bob", midnight(2024, 1, 1));
        let older = undated_claim(&agent, "acme", "ceo", "Alice", midnight(2020, 1, 1));

        // Stored newest-first on purpose: the use case must do the sorting.
        let store = store_with(vec![newer, older]);
        let resp = query_history(store, agent, "acme", "ceo", midnight(2025, 6, 1));

        assert_eq!(resp.entries.len(), 2);
        assert_eq!(resp.entries[0].value, serde_json::json!("Alice"), "oldest first");
        assert_eq!(resp.entries[1].value, serde_json::json!("Bob"), "newer second");
    }

    #[test]
    fn current_entry_value_matches_recall_primary() {
        let agent = test_agent();
        let first = undated_claim(&agent, "acme", "ceo", "Alice", midnight(2020, 1, 1));
        let second = undated_claim(&agent, "acme", "ceo", "Bob", midnight(2024, 1, 1));

        let store = store_with(vec![first, second]);
        let resp = query_history(store, agent, "acme", "ceo", midnight(2025, 1, 1));

        let has_current = resp.entries.iter().any(|e| e.status == HistoryEntryStatus::Current);
        assert!(has_current, "at least one Current entry must exist");
    }

    #[test]
    fn all_claims_bounded_returns_all_superseded() {
        use mempill_types::AssertionKind;
        use uuid::Uuid;

        let agent = test_agent();
        let bound_at = midnight(2021, 1, 1);
        let claim = undated_claim(&agent, "acme", "ceo", "Alice", midnight(2020, 1, 1));

        let assertion = ValidityAssertion {
            assertion_ref: Uuid::new_v4(),
            agent_id: agent.clone(),
            target_claim: claim.claim_ref().clone(),
            kind: AssertionKind::Bound { bound_at },
            provenance: ProvenanceLabel::External(ExternalKind::UserAsserted),
            confidence: Confidence { value_confidence: 1.0, valid_time_confidence: 1.0 },
            asserted_at: TransactionTime(bound_at),
        };

        let store = store_with(vec![claim]);
        store.assertions.lock().unwrap().push(assertion);

        let resp = query_history(store, agent, "acme", "ceo", midnight(2025, 1, 1));

        assert_eq!(resp.entries.len(), 1, "one claim in history");
        assert_eq!(
            resp.entries[0].status,
            HistoryEntryStatus::Superseded,
            "bounded claim must be Superseded"
        );
        assert!(resp.current().is_none(), "no current entry when all claims are bounded");
    }

    #[test]
    fn provenance_formatted_correctly_in_entry() {
        let agent = test_agent();
        let claim = undated_claim(&agent, "acme", "ceo", "Alice", midnight(2024, 1, 1));

        let resp = query_history(store_with(vec![claim]), agent, "acme", "ceo", midnight(2025, 1, 1));

        assert_eq!(resp.entries[0].provenance, "External/UserAsserted");
    }

    // ---- Ordering key and effective windows --------------------------------

    #[test]
    fn effective_window_successor_closes_prior_entry() {
        let config = EngineConfig::default();
        let agent = test_agent();
        let t2 = midnight(2024, 1, 1);

        let c1 = undated_claim(&agent, "a", "b", "v1", midnight(2020, 1, 1));
        let c2 = undated_claim(&agent, "a", "b", "v2", t2);

        let windows = compute_effective_windows(&[&c1, &c2], &config);

        assert_eq!(windows[0], Some(t2), "c1 closed by c2's ordering key");
        assert_eq!(windows[1], None, "last entry is open-ended");
    }

    #[test]
    fn high_confidence_ordering_key_uses_valid_time_start() {
        let config = EngineConfig::default();
        let agent = test_agent();

        // A was recorded late but is confidently valid from an early date.
        let claim_a = make_claim(
            &agent,
            "x",
            "y",
            serde_json::json!("A"),
            midnight(2024, 6, 1),
            Some(midnight(2020, 1, 1)),
            None,
            0.9,
        );
        // B carries no valid time, so it is keyed by its transaction time.
        let claim_b = undated_claim(&agent, "x", "y", "B", midnight(2023, 1, 1));

        let key_a = ordering_key_dt(&claim_a, &config);
        let key_b = ordering_key_dt(&claim_b, &config);
        assert!(key_a < key_b, "high-confidence A (vt=2020) must precede B (tx=2023)");

        let windows = compute_effective_windows(&[&claim_a, &claim_b], &config);
        assert_eq!(windows[0], Some(key_b), "A's valid_until = B's ordering key");
        assert_eq!(windows[1], None, "B is last → open-ended");
    }

    #[test]
    fn compute_effective_windows_empty() {
        let config = EngineConfig::default();

        let windows = compute_effective_windows(&[], &config);

        assert!(windows.is_empty());
    }

    #[test]
    fn compute_effective_windows_single() {
        let config = EngineConfig::default();
        let agent = test_agent();
        let c = undated_claim(&agent, "a", "b", "v", midnight(2024, 1, 1));

        let windows = compute_effective_windows(&[&c], &config);

        assert_eq!(windows.len(), 1);
        assert_eq!(windows[0], None, "single claim → open-ended");
    }

    #[test]
    fn compute_effective_windows_three_entries() {
        let config = EngineConfig::default();
        let agent = test_agent();
        let t1 = midnight(2020, 1, 1);
        let t2 = midnight(2022, 1, 1);
        let t3 = midnight(2024, 1, 1);

        let c1 = undated_claim(&agent, "a", "b", "v1", t1);
        let c2 = undated_claim(&agent, "a", "b", "v2", t2);
        let c3 = undated_claim(&agent, "a", "b", "v3", t3);

        let windows = compute_effective_windows(&[&c1, &c2, &c3], &config);

        assert_eq!(windows.len(), 3);
        assert_eq!(windows[0], Some(t2));
        assert_eq!(windows[1], Some(t3));
        assert_eq!(windows[2], None);
    }
}