use crate::canonical::{InfRecord, ProRecord, SemRecord};
use crate::clock::ClockTime;
use crate::dag::EdgeKind;
use crate::pipeline::Pipeline;
use crate::symbol::SymbolId;
/// Selects the two time points a resolver evaluates records against.
///
/// A field left as `None` is filled in by `effective_points` with
/// `Pipeline::last_committed_at()`.
#[derive(Copy, Clone, Debug, Default, PartialEq, Eq)]
pub struct TemporalQuery {
/// Point compared against a record's `valid_at` and its effective
/// invalidation time.
pub as_of: Option<ClockTime>,
/// Point compared against a record's `committed_at` and against the `at`
/// of each `Supersedes` edge.
pub as_committed: Option<ClockTime>,
}
impl TemporalQuery {
    /// Shared constructor: stores the two optional time points exactly as given.
    const fn with_points(as_of: Option<ClockTime>, as_committed: Option<ClockTime>) -> Self {
        Self {
            as_of,
            as_committed,
        }
    }

    /// Query with neither point pinned; both fields are `None`.
    #[must_use]
    pub const fn current() -> Self {
        Self::with_points(None, None)
    }

    /// Query that pins only `as_of` to `t`, leaving `as_committed` unset.
    #[must_use]
    pub const fn as_of(t: ClockTime) -> Self {
        Self::with_points(Some(t), None)
    }

    /// Query that pins only `as_committed` to `t`, leaving `as_of` unset.
    #[must_use]
    pub const fn as_committed(t: ClockTime) -> Self {
        Self::with_points(None, Some(t))
    }

    /// Query that pins both points explicitly.
    #[must_use]
    pub const fn bi_temporal(as_of: ClockTime, as_committed: ClockTime) -> Self {
        Self::with_points(Some(as_of), Some(as_committed))
    }
}
/// Returns the semantic record for `(s, p)` that is authoritative under `query`.
///
/// Every index listed by `Pipeline::semantic_history_at` is considered; indices
/// with no backing record and records rejected by `is_authoritative_sem` are
/// skipped. Among the survivors the one with the greatest `committed_at` is
/// cloned and returned; on equal commit times the record seen first wins.
///
/// Returns `None` when `effective_points` yields nothing (the pipeline reports
/// no last commit time) or when no record in the history qualifies.
#[must_use]
pub fn resolve_semantic(
    pipeline: &Pipeline,
    s: SymbolId,
    p: SymbolId,
    query: TemporalQuery,
) -> Option<SemRecord> {
    let (as_of, as_committed) = effective_points(pipeline, query)?;
    let store = pipeline.semantic_records();
    let mut winner: Option<&SemRecord> = None;
    for &slot in pipeline.semantic_history_at(s, p) {
        let candidate = match store.get(slot) {
            Some(rec) if is_authoritative_sem(pipeline, rec, as_of, as_committed) => rec,
            _ => continue,
        };
        winner = match winner {
            // Only a strictly newer commit displaces the current holder.
            Some(held) if held.clocks.committed_at >= candidate.clocks.committed_at => Some(held),
            _ => Some(candidate),
        };
    }
    winner.cloned()
}
/// Returns the inferential record for `(s, p)` that is authoritative under `query`.
///
/// Every index listed by `Pipeline::inferential_history_at` is considered;
/// indices with no backing record and records rejected by
/// `is_authoritative_inf` are skipped. Among the survivors the one with the
/// greatest `committed_at` is cloned and returned; on equal commit times the
/// record seen first wins.
///
/// Returns `None` when `effective_points` yields nothing (the pipeline reports
/// no last commit time) or when no record in the history qualifies.
#[must_use]
pub fn resolve_inferential(
    pipeline: &Pipeline,
    s: SymbolId,
    p: SymbolId,
    query: TemporalQuery,
) -> Option<InfRecord> {
    let (as_of, as_committed) = effective_points(pipeline, query)?;
    let store = pipeline.inferential_records();
    let mut winner: Option<&InfRecord> = None;
    for &slot in pipeline.inferential_history_at(s, p) {
        let candidate = match store.get(slot) {
            Some(rec) if is_authoritative_inf(pipeline, rec, as_of, as_committed) => rec,
            _ => continue,
        };
        winner = match winner {
            // Only a strictly newer commit displaces the current holder.
            Some(held) if held.clocks.committed_at >= candidate.clocks.committed_at => Some(held),
            _ => Some(candidate),
        };
    }
    winner.cloned()
}
/// Returns the procedural record for `rule_id` that is authoritative under `query`.
///
/// Every index listed by `Pipeline::procedural_history_for` is considered;
/// indices with no backing record and records rejected by
/// `is_authoritative_pro` are skipped. Among the survivors the one with the
/// greatest `committed_at` is cloned and returned; on equal commit times the
/// record seen first wins.
///
/// Returns `None` when `effective_points` yields nothing (the pipeline reports
/// no last commit time) or when no record in the history qualifies.
#[must_use]
pub fn resolve_procedural(
    pipeline: &Pipeline,
    rule_id: SymbolId,
    query: TemporalQuery,
) -> Option<ProRecord> {
    let (as_of, as_committed) = effective_points(pipeline, query)?;
    let store = pipeline.procedural_records();
    let mut winner: Option<&ProRecord> = None;
    for &slot in pipeline.procedural_history_for(rule_id) {
        let candidate = match store.get(slot) {
            Some(rec) if is_authoritative_pro(pipeline, rec, as_of, as_committed) => rec,
            _ => continue,
        };
        winner = match winner {
            // Only a strictly newer commit displaces the current holder.
            Some(held) if held.clocks.committed_at >= candidate.clocks.committed_at => Some(held),
            _ => Some(candidate),
        };
    }
    winner.cloned()
}
fn effective_points(pipeline: &Pipeline, query: TemporalQuery) -> Option<(ClockTime, ClockTime)> {
let watermark = pipeline.last_committed_at()?;
Some((
query.as_of.unwrap_or(watermark),
query.as_committed.unwrap_or(watermark),
))
}
/// Decides whether the semantic `record` is authoritative at `as_of`, using
/// only what was committed up to `as_committed`.
///
/// The record is rejected when its `committed_at` is after `as_committed`, when
/// its `valid_at` is after `as_of`, or when the time computed by
/// `effective_invalid_at_sem` is at or before `as_of`.
fn is_authoritative_sem(
    pipeline: &Pipeline,
    record: &SemRecord,
    as_of: ClockTime,
    as_committed: ClockTime,
) -> bool {
    let clocks = &record.clocks;
    if clocks.committed_at > as_committed || clocks.valid_at > as_of {
        return false;
    }
    // With no invalidation time at all the record passes.
    let Some(closed_at) = effective_invalid_at_sem(pipeline, record, as_committed) else {
        return true;
    };
    closed_at > as_of
}
/// Decides whether the inferential `record` is authoritative at `as_of`, using
/// only what was committed up to `as_committed`.
///
/// The record is rejected when its `committed_at` is after `as_committed`, when
/// its `valid_at` is after `as_of`, or when the time computed by
/// `effective_invalid_at_inf` is at or before `as_of`.
fn is_authoritative_inf(
    pipeline: &Pipeline,
    record: &InfRecord,
    as_of: ClockTime,
    as_committed: ClockTime,
) -> bool {
    let clocks = &record.clocks;
    if clocks.committed_at > as_committed || clocks.valid_at > as_of {
        return false;
    }
    // With no invalidation time at all the record passes.
    let Some(closed_at) = effective_invalid_at_inf(pipeline, record, as_committed) else {
        return true;
    };
    closed_at > as_of
}
/// Computes the earliest invalidation time that applies to the inferential
/// `record` as seen from `as_committed`.
///
/// Candidates are the record's own `invalid_at` (when set) plus every time
/// `collect_edge_closures` contributes for the record's `memory_id`. Returns
/// the minimum candidate, or `None` when there are none.
fn effective_invalid_at_inf(
    pipeline: &Pipeline,
    record: &InfRecord,
    as_committed: ClockTime,
) -> Option<ClockTime> {
    // Seed with the record's own invalidation time, if it carries one.
    let mut closures: Vec<ClockTime> = record.clocks.invalid_at.into_iter().collect();
    collect_edge_closures(pipeline, record.memory_id, as_committed, &mut closures);
    closures.iter().copied().min()
}
/// Decides whether the procedural `record` is authoritative at `as_of`, using
/// only what was committed up to `as_committed`.
///
/// The record is rejected when its `committed_at` is after `as_committed`, when
/// its `valid_at` is after `as_of`, or when the time computed by
/// `effective_invalid_at_pro` is at or before `as_of`.
fn is_authoritative_pro(
    pipeline: &Pipeline,
    record: &ProRecord,
    as_of: ClockTime,
    as_committed: ClockTime,
) -> bool {
    let clocks = &record.clocks;
    if clocks.committed_at > as_committed || clocks.valid_at > as_of {
        return false;
    }
    // With no invalidation time at all the record passes.
    let Some(closed_at) = effective_invalid_at_pro(pipeline, record, as_committed) else {
        return true;
    };
    closed_at > as_of
}
/// Computes the earliest invalidation time that applies to the semantic
/// `record` as seen from `as_committed`.
///
/// Candidates are the record's own `invalid_at` (when set) plus every time
/// `collect_edge_closures` contributes for the record's `memory_id`. Returns
/// the minimum candidate, or `None` when there are none.
fn effective_invalid_at_sem(
    pipeline: &Pipeline,
    record: &SemRecord,
    as_committed: ClockTime,
) -> Option<ClockTime> {
    // Seed with the record's own invalidation time, if it carries one.
    let mut closures: Vec<ClockTime> = record.clocks.invalid_at.into_iter().collect();
    collect_edge_closures(pipeline, record.memory_id, as_committed, &mut closures);
    closures.iter().copied().min()
}
/// Computes the earliest invalidation time that applies to the procedural
/// `record` as seen from `as_committed`.
///
/// Candidates are the record's own `invalid_at` (when set) plus every time
/// `collect_edge_closures` contributes for the record's `memory_id`. Returns
/// the minimum candidate, or `None` when there are none.
fn effective_invalid_at_pro(
    pipeline: &Pipeline,
    record: &ProRecord,
    as_committed: ClockTime,
) -> Option<ClockTime> {
    // Seed with the record's own invalidation time, if it carries one.
    let mut closures: Vec<ClockTime> = record.clocks.invalid_at.into_iter().collect();
    collect_edge_closures(pipeline, record.memory_id, as_committed, &mut closures);
    closures.iter().copied().min()
}
/// Appends to `out` the invalidation times implied by supersession edges that
/// point at `target_memory`.
///
/// Only edges of kind `EdgeKind::Supersedes` whose `at` is not after
/// `as_committed` are considered. Each such edge contributes the `valid_at` of
/// its source record, as found by `lookup_source_valid_at`; an edge whose
/// source cannot be found contributes nothing.
fn collect_edge_closures(
    pipeline: &Pipeline,
    target_memory: SymbolId,
    as_committed: ClockTime,
    out: &mut Vec<ClockTime>,
) {
    for edge in pipeline.dag().edges_to(target_memory) {
        let applies = edge.kind == EdgeKind::Supersedes && edge.at <= as_committed;
        if !applies {
            continue;
        }
        // `extend` with an `Option` pushes the value when present, nothing otherwise.
        out.extend(lookup_source_valid_at(pipeline, edge.from));
    }
}
/// Finds the `valid_at` of the record whose `memory_id` equals `memory_id`.
///
/// The semantic store is searched first, then the procedural store, then the
/// inferential store; the first match wins. Returns `None` when no store holds
/// a matching record. Each call walks the stores front to back, so its cost
/// grows with the total number of records.
fn lookup_source_valid_at(pipeline: &Pipeline, memory_id: SymbolId) -> Option<ClockTime> {
    pipeline
        .semantic_records()
        .iter()
        .find(|rec| rec.memory_id == memory_id)
        .map(|rec| rec.clocks.valid_at)
        .or_else(|| {
            pipeline
                .procedural_records()
                .iter()
                .find(|rec| rec.memory_id == memory_id)
                .map(|rec| rec.clocks.valid_at)
        })
        .or_else(|| {
            pipeline
                .inferential_records()
                .iter()
                .find(|rec| rec.memory_id == memory_id)
                .map(|rec| rec.clocks.valid_at)
        })
}
// Unit tests for the temporal resolvers. Each test compiles a few records
// into a fresh `Pipeline` and checks which one a given `TemporalQuery` selects.
#[cfg(test)]
mod tests {
use super::*;
// Builds a `ClockTime` from a millisecond count; panics if `try_from_millis`
// rejects the value. The literals used below are presumably Unix-epoch
// milliseconds (they line up with the dates named in the expect messages).
fn ms(v: u64) -> ClockTime {
ClockTime::try_from_millis(v).expect("non-sentinel")
}
// Fixed timestamp passed to `compile_batch` by the `compile` helper
// (2024-04-17 UTC if read as Unix-epoch milliseconds).
fn now() -> ClockTime {
ms(1_713_350_400_000)
}
// Compiles `src` as one batch at `now()`, panicking on failure.
fn compile(pipe: &mut Pipeline, src: &str) {
pipe.compile_batch(src, now()).expect("compile");
}
// Looks up the symbol ids for `alice` and `knows`; panics if either is missing.
fn alice_knows(pipe: &Pipeline) -> (SymbolId, SymbolId) {
let s = pipe.table().lookup("alice").expect("alice");
let p = pipe.table().lookup("knows").expect("knows");
(s, p)
}
// A pipeline with nothing compiled into it resolves to `None`.
#[test]
fn empty_pipeline_resolves_to_none() {
let pipe = Pipeline::new();
let q = TemporalQuery::current();
let got = resolve_semantic(&pipe, SymbolId::new(0), SymbolId::new(1), q);
assert!(got.is_none());
}
// With two records for the same (s, p), a current read returns the one with
// the later `:v` date (carol).
#[test]
fn current_read_returns_latest_forward_supersessor() {
let mut pipe = Pipeline::new();
compile(
&mut pipe,
"(sem @alice @knows @bob :src @observation :c 0.8 :v 2024-01-15)",
);
compile(
&mut pipe,
"(sem @alice @knows @carol :src @observation :c 0.8 :v 2024-03-15)",
);
let (s, p) = alice_knows(&pipe);
let got = resolve_semantic(&pipe, s, p, TemporalQuery::current())
.expect("has authoritative record");
let carol = pipe.table().lookup("carol").expect("carol");
assert!(matches!(&got.o, crate::Value::Symbol(id) if *id == carol));
}
// An `as_of` between the two `:v` dates selects the earlier record (bob);
// an `as_of` before both selects nothing.
#[test]
fn as_of_past_valid_time_returns_earlier_record() {
let mut pipe = Pipeline::new();
compile(
&mut pipe,
"(sem @alice @knows @bob :src @observation :c 0.8 :v 2024-01-15)",
);
compile(
&mut pipe,
"(sem @alice @knows @carol :src @observation :c 0.8 :v 2024-03-15)",
);
let (s, p) = alice_knows(&pipe);
// 1_707_955_200_000 ms is 2024-02-15T00:00:00Z as Unix-epoch milliseconds.
let between = ms(1_707_955_200_000); let got = resolve_semantic(&pipe, s, p, TemporalQuery::as_of(between))
.expect("bob valid at 2024-02-15");
let bob = pipe.table().lookup("bob").expect("bob");
assert!(matches!(&got.o, crate::Value::Symbol(id) if *id == bob));
// 1_704_067_200_000 ms is 2024-01-01T00:00:00Z, earlier than both `:v` dates.
let before = ms(1_704_067_200_000); assert!(resolve_semantic(&pipe, s, p, TemporalQuery::as_of(before)).is_none());
}
// A record compiled last but carrying an earlier `:v` date (dan, 2024-03-15)
// is the one selected for an `as_of` inside the span it covers.
#[test]
fn retroactive_record_wins_over_earlier_forward_record_in_overlap() {
let mut pipe = Pipeline::new();
compile(
&mut pipe,
"(sem @alice @knows @bob :src @observation :c 0.8 :v 2024-01-15)",
);
compile(
&mut pipe,
"(sem @alice @knows @carol :src @observation :c 0.8 :v 2024-04-01)",
);
compile(
&mut pipe,
"(sem @alice @knows @dan :src @observation :c 0.8 :v 2024-03-15)",
);
let (s, p) = alice_knows(&pipe);
// 1_710_892_800_000 ms is 2024-03-20T00:00:00Z as Unix-epoch milliseconds.
let mar_20 = ms(1_710_892_800_000); let got = resolve_semantic(&pipe, s, p, TemporalQuery::as_of(mar_20))
.expect("dan valid at 2024-03-20");
let dan = pipe.table().lookup("dan").expect("dan");
assert!(matches!(&got.o, crate::Value::Symbol(id) if *id == dan));
}
// An `as_committed` snapshot taken between the two batches sees only the
// first batch's record (bob); a current read sees carol.
#[test]
fn as_committed_hides_records_committed_after_snapshot() {
let mut pipe = Pipeline::new();
let t1 = ms(1_713_350_400_000);
let t2 = ms(1_713_350_500_000);
pipe.compile_batch(
"(sem @alice @knows @bob :src @observation :c 0.8 :v 2024-01-15)",
t1,
)
.expect("t1");
pipe.compile_batch(
"(sem @alice @knows @carol :src @observation :c 0.8 :v 2024-03-15)",
t2,
)
.expect("t2");
let (s, p) = alice_knows(&pipe);
let now_got = resolve_semantic(&pipe, s, p, TemporalQuery::current()).expect("current");
let carol = pipe.table().lookup("carol").expect("carol");
assert!(matches!(&now_got.o, crate::Value::Symbol(id) if *id == carol));
// One millisecond after t1: later than the first batch, earlier than t2.
let between = ms(t1.as_millis() + 1);
let got = resolve_semantic(&pipe, s, p, TemporalQuery::as_committed(between))
.expect("t1 visible, t2 not");
let bob = pipe.table().lookup("bob").expect("bob");
assert!(matches!(&got.o, crate::Value::Symbol(id) if *id == bob));
}
// Two `pro` forms for the same rule id: a current read returns the second
// one's action.
#[test]
fn procedural_current_read_follows_supersession_chain() {
let mut pipe = Pipeline::new();
compile(
&mut pipe,
r#"(pro @rule_x "t_a" "act_1" :scp @mimir :src @policy :c 1.0)"#,
);
compile(
&mut pipe,
r#"(pro @rule_x "t_b" "act_2" :scp @other :src @policy :c 1.0)"#,
);
let rule = pipe.table().lookup("rule_x").expect("rule_x");
let got = resolve_procedural(&pipe, rule, TemporalQuery::current()).expect("current pro");
assert!(matches!(&got.action, crate::Value::String(s) if s == "act_2"));
}
// Reading as committed at exactly t1 returns the first version of the rule.
#[test]
fn procedural_as_committed_returns_older_version() {
let mut pipe = Pipeline::new();
let t1 = ms(1_713_350_400_000);
let t2 = ms(1_713_350_500_000);
pipe.compile_batch(
r#"(pro @rule_x "t_a" "act_1" :scp @mimir :src @policy :c 1.0)"#,
t1,
)
.expect("t1");
pipe.compile_batch(
r#"(pro @rule_x "t_b" "act_2" :scp @other :src @policy :c 1.0)"#,
t2,
)
.expect("t2");
let rule = pipe.table().lookup("rule_x").expect("rule_x");
let got =
resolve_procedural(&pipe, rule, TemporalQuery::as_committed(t1)).expect("t1-era pro");
assert!(matches!(&got.action, crate::Value::String(s) if s == "act_1"));
}
// Same valid-time point (2024-03-20), two commit-time snapshots: at t2 the
// retroactive record (dan) is selected, shortly after t1 the answer is bob.
#[test]
fn bi_temporal_read_returns_pre_correction_view() {
let mut pipe = Pipeline::new();
let t1 = ms(1_713_350_400_000);
let t2 = ms(1_713_350_500_000);
pipe.compile_batch(
"(sem @alice @knows @bob :src @observation :c 0.8 :v 2024-01-15)",
t1,
)
.expect("t1 forward base");
pipe.compile_batch(
"(sem @alice @knows @carol :src @observation :c 0.8 :v 2024-04-01)",
t1,
)
.expect("t1 forward super");
pipe.compile_batch(
"(sem @alice @knows @dan :src @observation :c 0.8 :v 2024-03-15)",
t2,
)
.expect("t2 retroactive");
let (s, p) = alice_knows(&pipe);
// 1_710_892_800_000 ms is 2024-03-20T00:00:00Z as Unix-epoch milliseconds.
let mar_20 = ms(1_710_892_800_000);
let post = resolve_semantic(&pipe, s, p, TemporalQuery::bi_temporal(mar_20, t2))
.expect("post-correction");
let dan = pipe.table().lookup("dan").expect("dan");
assert!(matches!(&post.o, crate::Value::Symbol(id) if *id == dan));
// NOTE(review): the snapshot is t1 + 2 rather than t1, presumably so that it
// covers both batches submitted with `t1` - confirm how `compile_batch`
// assigns `committed_at` when the same timestamp is passed twice.
let pre = resolve_semantic(
&pipe,
s,
p,
TemporalQuery::bi_temporal(mar_20, ms(t1.as_millis() + 2)),
)
.expect("pre-correction");
let bob = pipe.table().lookup("bob").expect("bob");
assert!(matches!(&pre.o, crate::Value::Symbol(id) if *id == bob));
}
}