mod common;
use std::sync::Arc;
use chrono::Utc;
use mempill_core::application::{IngestClaimRequest, QueryMemoryRequest};
use mempill_core::noop::{NoOpOracle, NoOpVector};
use mempill_core::EngineConfig;
use mempill_postgres::open_postgres;
use mempill_types::{
AgentId, BeliefStatus, Cardinality, Confidence, Criticality,
Disposition, ExternalKind, ProvenanceLabel, ValidTime,
};
/// Parses an RFC 3339 timestamp string and re-expresses it in UTC.
///
/// # Panics
/// Panics if `rfc3339` is rejected by `DateTime::parse_from_rfc3339`; callers in this
/// file only pass literal timestamps.
fn dt(rfc3339: &str) -> chrono::DateTime<Utc> {
    let parsed = chrono::DateTime::parse_from_rfc3339(rfc3339).unwrap();
    parsed.with_timezone(&Utc)
}
/// Builds a [`ValidTime`] with `valid_time_confidence` set to `0.9`.
///
/// `start` is always present; `end` stays `None` for an open-ended window. Both
/// timestamps are parsed with [`dt`], so an invalid string panics.
fn vt_high(start: &str, end: Option<&str>) -> ValidTime {
    let start = Some(dt(start));
    let end = end.map(dt);
    ValidTime {
        start,
        end,
        valid_time_confidence: 0.9,
    }
}
/// Builds a [`ValidTime`] with `valid_time_confidence` set to `0.5`.
///
/// Same shape as the high-confidence builder, only the confidence differs. `end`
/// stays `None` for an open-ended window, and both timestamps are parsed with [`dt`],
/// so an invalid string panics.
fn vt_low(start: &str, end: Option<&str>) -> ValidTime {
    let start = Some(dt(start));
    let end = end.map(dt);
    ValidTime {
        start,
        end,
        valid_time_confidence: 0.5,
    }
}
/// Returns a [`Confidence`] whose value and valid-time components are both `0.9`.
fn confident() -> Confidence {
    Confidence {
        value_confidence: 0.9,
        valid_time_confidence: 0.9,
    }
}
fn run_succession_scenario(conn_str: &str) {
let conn_str = conn_str.to_owned();
let join = std::thread::spawn(move || {
let engine = open_postgres(
&conn_str,
None::<Arc<NoOpOracle>>,
None::<Arc<NoOpVector>>,
EngineConfig::default(),
)
.expect("open_postgres must succeed");
let rt = tokio::runtime::Builder::new_multi_thread()
.enable_all()
.build()
.expect("tokio runtime must build");
let result: Result<(), String> = rt.block_on(async {
let agent = AgentId("pg-succession-agent".into());
let r_alice = engine.ingest_claim(IngestClaimRequest {
agent_id: agent.clone(),
subject: "acme".into(),
predicate: "ceo".into(),
value: serde_json::json!("alice"),
provenance: ProvenanceLabel::External(ExternalKind::UserAsserted),
cardinality: Cardinality::Functional,
valid_time: Some(vt_high("2020-01-01T00:00:00Z", Some("2024-03-01T00:00:00Z"))),
confidence: confident(),
criticality: Criticality::Medium,
derived_from: vec![],
}).await.map_err(|e| format!("alice ingest failed: {e}"))?;
println!("[PG SUCC] alice ingest → {:?}", r_alice.disposition);
if r_alice.disposition != Disposition::CommittedCheap {
return Err(format!(
"alice MUST be CommittedCheap; got {:?}", r_alice.disposition
));
}
let r_bob = engine.ingest_claim(IngestClaimRequest {
agent_id: agent.clone(),
subject: "acme".into(),
predicate: "ceo".into(),
value: serde_json::json!("bob"),
provenance: ProvenanceLabel::External(ExternalKind::UserAsserted),
cardinality: Cardinality::Functional,
valid_time: Some(vt_high("2024-03-01T00:00:00Z", None)),
confidence: confident(),
criticality: Criticality::Medium,
derived_from: vec![],
}).await.map_err(|e| format!("bob ingest failed: {e}"))?;
println!("[PG SUCC] bob ingest → {:?}", r_bob.disposition);
if r_bob.disposition != Disposition::CommittedCheap {
return Err(format!(
"TASK-11: trusted non-overlapping succession MUST be CommittedCheap, not Contested; got {:?}",
r_bob.disposition
));
}
let qr_now = engine.query_memory(QueryMemoryRequest {
agent_id: agent.clone(),
subject: "acme".into(),
predicate: "ceo".into(),
as_of_tx_time: None,
}).await.map_err(|e| format!("query NOW failed: {e}"))?;
println!("[PG SUCC] query NOW → status={:?}, primary={:?}",
qr_now.belief.status,
qr_now.belief.primary.as_ref().map(|b| &b.fact.value));
if qr_now.belief.status != BeliefStatus::Resolved {
return Err(format!(
"Postgres: NOW query MUST be Resolved (succession selects Bob); got {:?}",
qr_now.belief.status
));
}
let primary_now = qr_now.belief.primary
.ok_or_else(|| "primary must exist at NOW".to_string())?;
if primary_now.fact.value != serde_json::json!("bob") {
return Err(format!(
"Postgres: primary at NOW MUST be Bob; got {:?}", primary_now.fact.value
));
}
if !qr_now.belief.alternatives.is_empty() {
return Err(format!(
"Postgres: no alternatives expected; got {:?}", qr_now.belief.alternatives
));
}
let qr_past = engine.query_memory(QueryMemoryRequest {
agent_id: agent.clone(),
subject: "acme".into(),
predicate: "ceo".into(),
as_of_tx_time: Some(dt("2022-06-01T00:00:00Z")),
}).await.map_err(|e| format!("query as_of 2022 failed: {e}"))?;
println!("[PG SUCC] query as_of 2022-06-01 → status={:?}, primary={:?}",
qr_past.belief.status,
qr_past.belief.primary.as_ref().map(|b| &b.fact.value));
if qr_past.belief.status != BeliefStatus::Resolved {
return Err(format!(
"Postgres: instant in Alice's window MUST be Resolved; got {:?}",
qr_past.belief.status
));
}
let primary_past = qr_past.belief.primary
.ok_or_else(|| "primary at 2022 must exist".to_string())?;
if primary_past.fact.value != serde_json::json!("alice") {
return Err(format!(
"Postgres: instant in Alice's window → primary MUST be Alice; got {:?}",
primary_past.fact.value
));
}
let boundary = dt("2024-03-01T00:00:00Z");
let qr_boundary = engine.query_memory(QueryMemoryRequest {
agent_id: agent.clone(),
subject: "acme".into(),
predicate: "ceo".into(),
as_of_tx_time: Some(boundary),
}).await.map_err(|e| format!("query at boundary failed: {e}"))?;
println!("[PG SUCC] query at boundary 2024-03-01 → status={:?}, primary={:?}",
qr_boundary.belief.status,
qr_boundary.belief.primary.as_ref().map(|b| &b.fact.value));
if qr_boundary.belief.status != BeliefStatus::Resolved {
return Err(format!(
"Postgres TIMESTAMPTZ boundary: 2024-03-01 MUST be Resolved; got {:?}",
qr_boundary.belief.status
));
}
let primary_boundary = qr_boundary.belief.primary
.ok_or_else(|| "primary at boundary must exist".to_string())?;
if primary_boundary.fact.value != serde_json::json!("bob") {
return Err(format!(
"Postgres TIMESTAMPTZ boundary: 2024-03-01 == Bob's start → MUST select Bob; got {:?}",
primary_boundary.fact.value
));
}
Ok(())
});
drop(engine);
result
});
join.join().expect("succession scenario thread must not panic")
.expect("PG succession + boundary scenario must pass");
}
fn run_low_confidence_contested(conn_str: &str) {
let conn_str = conn_str.to_owned();
let join = std::thread::spawn(move || {
let engine = open_postgres(
&conn_str,
None::<Arc<NoOpOracle>>,
None::<Arc<NoOpVector>>,
EngineConfig::default(),
)
.expect("open_postgres must succeed");
let rt = tokio::runtime::Builder::new_multi_thread()
.enable_all()
.build()
.expect("tokio runtime must build");
let result: Result<(), String> = rt.block_on(async {
let agent = AgentId("pg-lowconf-agent".into());
engine.ingest_claim(IngestClaimRequest {
agent_id: agent.clone(),
subject: "corp".into(),
predicate: "ceo".into(),
value: serde_json::json!("alice"),
provenance: ProvenanceLabel::External(ExternalKind::UserAsserted),
cardinality: Cardinality::Functional,
valid_time: Some(vt_low("2020-01-01T00:00:00Z", Some("2024-03-01T00:00:00Z"))),
confidence: Confidence { value_confidence: 0.9, valid_time_confidence: 0.5 },
criticality: Criticality::Medium,
derived_from: vec![],
}).await.map_err(|e| format!("alice ingest failed: {e}"))?;
let r_bob = engine.ingest_claim(IngestClaimRequest {
agent_id: agent.clone(),
subject: "corp".into(),
predicate: "ceo".into(),
value: serde_json::json!("bob"),
provenance: ProvenanceLabel::External(ExternalKind::UserAsserted),
cardinality: Cardinality::Functional,
valid_time: Some(vt_low("2024-03-01T00:00:00Z", None)),
confidence: Confidence { value_confidence: 0.9, valid_time_confidence: 0.5 },
criticality: Criticality::Medium,
derived_from: vec![],
}).await.map_err(|e| format!("bob ingest failed: {e}"))?;
println!("[PG LOWCONF] bob ingest → {:?}", r_bob.disposition);
if r_bob.disposition != Disposition::Contested {
return Err(format!(
"Postgres I2 fallback: confidence 0.5 < 0.7 MUST be Contested; got {:?}",
r_bob.disposition
));
}
Ok(())
});
drop(engine);
result
});
join.join().expect("low-confidence thread must not panic")
.expect("PG low-confidence contested scenario must pass");
}
/// Succession + boundary scenario using the `"16"` fixture from `common::with_pg_and_conn`
/// (presumably the Postgres major version — confirm in `common`).
#[test]
fn pg16_succession_and_boundary() {
    common::with_pg_and_conn("16", |_pg, url| run_succession_scenario(&url));
}
/// Low-confidence contested scenario using the `"16"` fixture from
/// `common::with_pg_and_conn` (presumably the Postgres major version — confirm in `common`).
#[test]
fn pg16_low_confidence_is_contested() {
    common::with_pg_and_conn("16", |_pg, url| run_low_confidence_contested(&url));
}
/// Succession + boundary scenario using the `"18"` fixture from `common::with_pg_and_conn`
/// (presumably the Postgres major version — confirm in `common`).
#[test]
fn pg18_succession_and_boundary() {
    common::with_pg_and_conn("18", |_pg, url| run_succession_scenario(&url));
}
/// Low-confidence contested scenario using the `"18"` fixture from
/// `common::with_pg_and_conn` (presumably the Postgres major version — confirm in `common`).
#[test]
fn pg18_low_confidence_is_contested() {
    common::with_pg_and_conn("18", |_pg, url| run_low_confidence_contested(&url));
}