#![allow(missing_docs)]
use std::sync::Arc;
use chrono::{DateTime, Utc};
use crate::{
application::ingest_claim::build_latest_disposition_map,
config::EngineConfig,
engine::{projection, truth_engine},
error::MemError,
ports::{PersistencePort, VectorPort},
};
use super::dto::{QueryMemoryRequest, QueryMemoryResponse};
pub struct QueryMemoryUseCase<P, V>
where
P: PersistencePort + Send + Sync + 'static,
V: VectorPort + Send + Sync + 'static,
{
persistence: Arc<P>,
#[allow(dead_code)]
vector: Option<Arc<V>>, config: EngineConfig,
}
impl<P, V> QueryMemoryUseCase<P, V>
where
P: PersistencePort + Send + Sync + 'static,
V: VectorPort + Send + Sync + 'static,
{
pub fn new(persistence: Arc<P>, vector: Option<Arc<V>>, config: EngineConfig) -> Self {
Self { persistence, vector, config }
}
pub fn execute_with_time(
&self,
req: QueryMemoryRequest,
now: DateTime<Utc>,
) -> Result<QueryMemoryResponse, MemError> {
let claims = self.persistence
.load_subject_line(&req.agent_id, &req.subject, &req.predicate)
.map_err(|e| MemError::Persistence { source: Box::new(e) })?;
let as_of = req.as_of_tx_time.unwrap_or(now);
let claim_refs: Vec<_> = claims.iter().map(|c| c.claim_ref().clone()).collect();
let all_ledger = self.persistence
.load_ledger_for_claims(&req.agent_id, &claim_refs)
.map_err(|e| MemError::Persistence { source: Box::new(e) })?;
let latest_disposition = build_latest_disposition_map(&all_ledger);
let fold = truth_engine::fold(
claims.clone(),
|cref| {
self.persistence
.load_validity_assertions_for(&req.agent_id, cref)
.unwrap_or_default()
},
as_of,
&self.config,
&latest_disposition,
);
let ledger_entries: Vec<_> = claims.iter().flat_map(|c| {
all_ledger.iter()
.filter(|e| &e.claim_ref == c.claim_ref())
.cloned()
}).collect();
let contested = fold.live_claims.iter().any(|cs| {
cs.last_disposition
.as_ref()
.map(|d| *d == mempill_types::Disposition::Contested)
.unwrap_or(false)
});
let belief = projection::project(&fold, &ledger_entries, now, &self.config, contested);
Ok(QueryMemoryResponse { belief })
}
pub fn execute(&self, req: QueryMemoryRequest) -> Result<QueryMemoryResponse, MemError> {
self.execute_with_time(req, Utc::now())
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::noop::NoOpVector;
use crate::ports::persistence::Txn;
use chrono::TimeZone;
use mempill_types::{
AgentId, BeliefStatus, Cardinality, Claim, ClaimEdge, ClaimRef, Confidence, Criticality,
ExternalAnchor, ExternalKind, Fact, LedgerEntry, ProvenanceLabel, TransactionTime,
ValidTime, ValidityAssertion,
};
use std::sync::Mutex;
struct MockTxn(AgentId);
impl Txn for MockTxn {
fn agent_id(&self) -> &AgentId { &self.0 }
}
#[derive(Debug, thiserror::Error)]
#[error("mock")]
struct MockErr;
#[derive(Default)]
struct MockStore {
claims: Mutex<Vec<Claim>>,
}
impl PersistencePort for MockStore {
type Transaction = MockTxn;
type Error = MockErr;
fn begin_atomic(&self, aid: &AgentId) -> Result<MockTxn, MockErr> { Ok(MockTxn(aid.clone())) }
fn append_claim(&self, _t: &mut MockTxn, c: &Claim) -> Result<ClaimRef, MockErr> {
self.claims.lock().unwrap().push(c.clone());
Ok(c.claim_ref().clone())
}
fn append_validity_assertion(&self, _t: &mut MockTxn, _a: &ValidityAssertion) -> Result<(), MockErr> { Ok(()) }
fn append_ledger_entry(&self, _t: &mut MockTxn, _e: &LedgerEntry) -> Result<(), MockErr> { Ok(()) }
fn append_claim_edge(&self, _t: &mut MockTxn, _e: &ClaimEdge) -> Result<(), MockErr> { Ok(()) }
fn commit(&self, _t: MockTxn) -> Result<(), MockErr> { Ok(()) }
fn rollback(&self, _t: MockTxn) -> Result<(), MockErr> { Ok(()) }
fn load_subject_line(&self, _aid: &AgentId, subject: &str, predicate: &str) -> Result<Vec<Claim>, MockErr> {
let claims = self.claims.lock().unwrap();
Ok(claims.iter()
.filter(|c| c.fact().subject == subject && c.fact().predicate == predicate)
.cloned()
.collect())
}
fn load_claim(&self, _aid: &AgentId, _r: &ClaimRef) -> Result<Option<Claim>, MockErr> { Ok(None) }
fn load_validity_assertions_for(&self, _aid: &AgentId, _r: &ClaimRef) -> Result<Vec<ValidityAssertion>, MockErr> { Ok(vec![]) }
fn load_ledger(&self, _aid: &AgentId, _from: Option<&mempill_types::TransactionTime>, _lim: usize) -> Result<Vec<LedgerEntry>, MockErr> { Ok(vec![]) }
fn load_ledger_for_claims(&self, _aid: &AgentId, _refs: &[ClaimRef]) -> Result<Vec<LedgerEntry>, MockErr> { Ok(vec![]) }
fn load_edges_for(&self, _aid: &AgentId, _r: &ClaimRef) -> Result<Vec<ClaimEdge>, MockErr> { Ok(vec![]) }
fn load_injected_claims(&self, _aid: &AgentId) -> Result<Vec<ClaimRef>, MockErr> { Ok(vec![]) }
fn load_lineage(&self, _aid: &AgentId, _r: &ClaimRef) -> Result<Vec<ClaimEdge>, MockErr> { Ok(vec![]) }
}
fn make_claim(subject: &str, predicate: &str, value: serde_json::Value, tx: DateTime<Utc>) -> Claim {
Claim::new(
ClaimRef::new_random(),
AgentId("agent".into()),
Fact { subject: subject.into(), predicate: predicate.into(), value },
Cardinality::Functional,
ProvenanceLabel::External(ExternalKind::UserAsserted),
ExternalAnchor { nearest_external_anchor: None, derivation_depth: 0 },
TransactionTime(tx),
ValidTime { start: None, end: None, valid_time_confidence: 0.0 },
Confidence { value_confidence: 0.9, valid_time_confidence: 0.0 },
Criticality::Medium,
vec![],
None,
None,
)
}
#[test]
fn query_no_claims_returns_no_belief() {
let store = Arc::new(MockStore::default());
let uc = QueryMemoryUseCase::new(
Arc::clone(&store),
None::<Arc<NoOpVector>>,
EngineConfig::default(),
);
let now = Utc::now();
let req = QueryMemoryRequest {
agent_id: AgentId("agent".into()),
subject: "user".into(),
predicate: "city".into(),
as_of_tx_time: None,
};
let resp = uc.execute_with_time(req, now).unwrap();
assert_eq!(resp.belief.status, BeliefStatus::NoBelief);
}
#[test]
fn query_with_one_claim_returns_resolved() {
let store = Arc::new(MockStore::default());
let tx = Utc.with_ymd_and_hms(2026, 1, 1, 0, 0, 0).unwrap();
let claim = make_claim("user", "city", serde_json::json!("Paris"), tx);
store.claims.lock().unwrap().push(claim);
let uc = QueryMemoryUseCase::new(
Arc::clone(&store),
None::<Arc<NoOpVector>>,
EngineConfig::default(),
);
let now = Utc.with_ymd_and_hms(2026, 1, 2, 0, 0, 0).unwrap();
let req = QueryMemoryRequest {
agent_id: AgentId("agent".into()),
subject: "user".into(),
predicate: "city".into(),
as_of_tx_time: None,
};
let resp = uc.execute_with_time(req, now).unwrap();
assert!(
matches!(resp.belief.status, BeliefStatus::TimingUncertain | BeliefStatus::Resolved),
"expected Resolved or TimingUncertain, got {:?}",
resp.belief.status
);
assert!(resp.belief.primary.is_some(), "primary belief must be present");
}
#[test]
fn query_now_injected_not_read_from_clock() {
let store = Arc::new(MockStore::default());
let old_tx = Utc.with_ymd_and_hms(2020, 1, 1, 0, 0, 0).unwrap();
let claim = make_claim("user", "job", serde_json::json!("Engineer"), old_tx);
store.claims.lock().unwrap().push(claim);
let uc = QueryMemoryUseCase::new(
Arc::clone(&store),
None::<Arc<NoOpVector>>,
EngineConfig::default(),
);
let near_now = Utc.with_ymd_and_hms(2020, 1, 2, 0, 0, 0).unwrap(); let req = QueryMemoryRequest {
agent_id: AgentId("agent".into()),
subject: "user".into(),
predicate: "job".into(),
as_of_tx_time: None,
};
let resp_near = uc.execute_with_time(req.clone(), near_now).unwrap();
assert!(resp_near.belief.primary.is_some());
assert_eq!(
resp_near.belief.primary.unwrap().currency_signal.state,
mempill_types::CurrencyState::Fresh,
"1 day after claim, currency must be Fresh (injected now, not system clock)"
);
let far_now = Utc.with_ymd_and_hms(2020, 7, 20, 0, 0, 0).unwrap(); let resp_far = uc.execute_with_time(req, far_now).unwrap();
assert!(resp_far.belief.primary.is_some());
assert_eq!(
resp_far.belief.primary.unwrap().currency_signal.state,
mempill_types::CurrencyState::Decayed,
"200 days after claim, currency must be Decayed (injected now, not system clock)"
);
}
}