1use crate::canonical::{InfRecord, ProRecord, SemRecord};
23use crate::clock::ClockTime;
24use crate::dag::EdgeKind;
25use crate::pipeline::Pipeline;
26use crate::symbol::SymbolId;
27
/// Pair of optional time points that selects which version of a record a read resolves to.
///
/// When a query is resolved, any point left as `None` is replaced by the
/// pipeline's last committed time.
#[derive(Copy, Clone, Debug, Default, PartialEq, Eq)]
pub struct TemporalQuery {
    /// Valid-time point of the read: records whose `valid_at` is later than
    /// this point are not eligible.
    pub as_of: Option<ClockTime>,
    /// Commit-time snapshot of the read: records whose `committed_at` is later
    /// than this point are not eligible.
    pub as_committed: Option<ClockTime>,
}
40
41impl TemporalQuery {
42 #[must_use]
44 pub const fn current() -> Self {
45 Self {
46 as_of: None,
47 as_committed: None,
48 }
49 }
50
51 #[must_use]
53 pub const fn as_of(t: ClockTime) -> Self {
54 Self {
55 as_of: Some(t),
56 as_committed: None,
57 }
58 }
59
60 #[must_use]
62 pub const fn as_committed(t: ClockTime) -> Self {
63 Self {
64 as_of: None,
65 as_committed: Some(t),
66 }
67 }
68
69 #[must_use]
71 pub const fn bi_temporal(as_of: ClockTime, as_committed: ClockTime) -> Self {
72 Self {
73 as_of: Some(as_of),
74 as_committed: Some(as_committed),
75 }
76 }
77}
78
79#[must_use]
86pub fn resolve_semantic(
87 pipeline: &Pipeline,
88 s: SymbolId,
89 p: SymbolId,
90 query: TemporalQuery,
91) -> Option<SemRecord> {
92 let (as_of, as_committed) = effective_points(pipeline, query)?;
93
94 let records = pipeline.semantic_records();
100 let mut best: Option<&SemRecord> = None;
101 for &idx in pipeline.semantic_history_at(s, p) {
102 let Some(record) = records.get(idx) else {
103 continue;
104 };
105 if !is_authoritative_sem(pipeline, record, as_of, as_committed) {
106 continue;
107 }
108 best = Some(match best {
109 None => record,
110 Some(cur) if record.clocks.committed_at > cur.clocks.committed_at => record,
111 Some(cur) => cur,
117 });
118 }
119 best.cloned()
120}
121
122#[must_use]
141pub fn resolve_inferential(
142 pipeline: &Pipeline,
143 s: SymbolId,
144 p: SymbolId,
145 query: TemporalQuery,
146) -> Option<InfRecord> {
147 let (as_of, as_committed) = effective_points(pipeline, query)?;
148
149 let records = pipeline.inferential_records();
153 let mut best: Option<&InfRecord> = None;
154 for &idx in pipeline.inferential_history_at(s, p) {
155 let Some(record) = records.get(idx) else {
156 continue;
157 };
158 if !is_authoritative_inf(pipeline, record, as_of, as_committed) {
159 continue;
160 }
161 best = Some(match best {
162 None => record,
163 Some(cur) if record.clocks.committed_at > cur.clocks.committed_at => record,
164 Some(cur) => cur,
165 });
166 }
167 best.cloned()
168}
169
170#[must_use]
179pub fn resolve_procedural(
180 pipeline: &Pipeline,
181 rule_id: SymbolId,
182 query: TemporalQuery,
183) -> Option<ProRecord> {
184 let (as_of, as_committed) = effective_points(pipeline, query)?;
185
186 let records = pipeline.procedural_records();
188 let mut best: Option<&ProRecord> = None;
189 for &idx in pipeline.procedural_history_for(rule_id) {
190 let Some(record) = records.get(idx) else {
191 continue;
192 };
193 if !is_authoritative_pro(pipeline, record, as_of, as_committed) {
194 continue;
195 }
196 best = Some(match best {
197 None => record,
198 Some(cur) if record.clocks.committed_at > cur.clocks.committed_at => record,
199 Some(cur) => cur,
200 });
201 }
202 best.cloned()
203}
204
205fn effective_points(pipeline: &Pipeline, query: TemporalQuery) -> Option<(ClockTime, ClockTime)> {
210 let watermark = pipeline.last_committed_at()?;
211 Some((
212 query.as_of.unwrap_or(watermark),
213 query.as_committed.unwrap_or(watermark),
214 ))
215}
216
217fn is_authoritative_sem(
229 pipeline: &Pipeline,
230 record: &SemRecord,
231 as_of: ClockTime,
232 as_committed: ClockTime,
233) -> bool {
234 if record.clocks.committed_at > as_committed {
235 return false;
236 }
237 if record.clocks.valid_at > as_of {
238 return false;
239 }
240 let effective_invalid = effective_invalid_at_sem(pipeline, record, as_committed);
241 match effective_invalid {
242 None => true,
243 Some(iv) => iv > as_of,
244 }
245}
246
247fn is_authoritative_inf(
255 pipeline: &Pipeline,
256 record: &InfRecord,
257 as_of: ClockTime,
258 as_committed: ClockTime,
259) -> bool {
260 if record.clocks.committed_at > as_committed {
261 return false;
262 }
263 if record.clocks.valid_at > as_of {
264 return false;
265 }
266 let effective_invalid = effective_invalid_at_inf(pipeline, record, as_committed);
267 match effective_invalid {
268 None => true,
269 Some(iv) => iv > as_of,
270 }
271}
272
273fn effective_invalid_at_inf(
274 pipeline: &Pipeline,
275 record: &InfRecord,
276 as_committed: ClockTime,
277) -> Option<ClockTime> {
278 let mut candidates: Vec<ClockTime> = Vec::new();
279 if let Some(iv) = record.clocks.invalid_at {
280 candidates.push(iv);
281 }
282 collect_edge_closures(pipeline, record.memory_id, as_committed, &mut candidates);
283 candidates.into_iter().min()
284}
285
286fn is_authoritative_pro(
292 pipeline: &Pipeline,
293 record: &ProRecord,
294 as_of: ClockTime,
295 as_committed: ClockTime,
296) -> bool {
297 if record.clocks.committed_at > as_committed {
298 return false;
299 }
300 if record.clocks.valid_at > as_of {
301 return false;
302 }
303 let effective_invalid = effective_invalid_at_pro(pipeline, record, as_committed);
304 match effective_invalid {
305 None => true,
306 Some(iv) => iv > as_of,
307 }
308}
309
310fn effective_invalid_at_sem(
322 pipeline: &Pipeline,
323 record: &SemRecord,
324 as_committed: ClockTime,
325) -> Option<ClockTime> {
326 let mut candidates: Vec<ClockTime> = Vec::new();
327 if let Some(iv) = record.clocks.invalid_at {
328 candidates.push(iv);
329 }
330 collect_edge_closures(pipeline, record.memory_id, as_committed, &mut candidates);
331 candidates.into_iter().min()
332}
333
334fn effective_invalid_at_pro(
335 pipeline: &Pipeline,
336 record: &ProRecord,
337 as_committed: ClockTime,
338) -> Option<ClockTime> {
339 let mut candidates: Vec<ClockTime> = Vec::new();
340 if let Some(iv) = record.clocks.invalid_at {
341 candidates.push(iv);
342 }
343 collect_edge_closures(pipeline, record.memory_id, as_committed, &mut candidates);
344 candidates.into_iter().min()
345}
346
347fn collect_edge_closures(
357 pipeline: &Pipeline,
358 target_memory: SymbolId,
359 as_committed: ClockTime,
360 out: &mut Vec<ClockTime>,
361) {
362 for edge in pipeline.dag().edges_to(target_memory) {
363 if edge.kind != EdgeKind::Supersedes {
364 continue;
365 }
366 if edge.at > as_committed {
367 continue;
368 }
369 if let Some(source_valid_at) = lookup_source_valid_at(pipeline, edge.from) {
370 out.push(source_valid_at);
371 }
372 }
373}
374
375fn lookup_source_valid_at(pipeline: &Pipeline, memory_id: SymbolId) -> Option<ClockTime> {
379 for r in pipeline.semantic_records() {
380 if r.memory_id == memory_id {
381 return Some(r.clocks.valid_at);
382 }
383 }
384 for r in pipeline.procedural_records() {
385 if r.memory_id == memory_id {
386 return Some(r.clocks.valid_at);
387 }
388 }
389 for r in pipeline.inferential_records() {
390 if r.memory_id == memory_id {
391 return Some(r.clocks.valid_at);
392 }
393 }
394 None
395}
396
#[cfg(test)]
mod tests {
    use super::*;

    // Source forms fed to the pipeline. The `:v` date on each semantic form is
    // what the valid-time assertions below key on.
    const BOB_V_JAN_15: &str =
        "(sem @alice @knows @bob :src @observation :c 0.8 :v 2024-01-15)";
    const CAROL_V_MAR_15: &str =
        "(sem @alice @knows @carol :src @observation :c 0.8 :v 2024-03-15)";
    const CAROL_V_APR_01: &str =
        "(sem @alice @knows @carol :src @observation :c 0.8 :v 2024-04-01)";
    const DAN_V_MAR_15: &str =
        "(sem @alice @knows @dan :src @observation :c 0.8 :v 2024-03-15)";
    const RULE_X_FIRST: &str = r#"(pro @rule_x "t_a" "act_1" :scp @mimir :src @policy :c 1.0)"#;
    const RULE_X_SECOND: &str = r#"(pro @rule_x "t_b" "act_2" :scp @other :src @policy :c 1.0)"#;

    // Epoch-millisecond instants used as commit times.
    const T1_MS: u64 = 1_713_350_400_000;
    const T2_MS: u64 = 1_713_350_500_000;

    // Epoch-millisecond instants used as valid-time probes (midnight UTC).
    const JAN_01_2024_MS: u64 = 1_704_067_200_000;
    const FEB_15_2024_MS: u64 = 1_707_955_200_000;
    const MAR_20_2024_MS: u64 = 1_710_892_800_000;

    /// Builds a `ClockTime` from epoch milliseconds, panicking on rejection.
    fn ms(v: u64) -> ClockTime {
        ClockTime::try_from_millis(v).expect("non-sentinel")
    }

    /// Fixed commit instant used by tests that do not care about commit order.
    fn now() -> ClockTime {
        ms(T1_MS)
    }

    /// Compiles `src` at the fixed `now()` instant.
    fn compile(pipe: &mut Pipeline, src: &str) {
        pipe.compile_batch(src, now()).expect("compile");
    }

    /// Compiles `src` at an explicit instant; `label` is the panic message on failure.
    fn compile_at(pipe: &mut Pipeline, src: &str, at: ClockTime, label: &str) {
        pipe.compile_batch(src, at).expect(label);
    }

    /// Looks up an already-interned symbol; the panic message is the symbol name.
    fn sym(pipe: &Pipeline, name: &str) -> SymbolId {
        pipe.table().lookup(name).expect(name)
    }

    /// Subject/predicate pair shared by the semantic tests.
    fn alice_knows(pipe: &Pipeline) -> (SymbolId, SymbolId) {
        (sym(pipe, "alice"), sym(pipe, "knows"))
    }

    /// Asserts that the record's object is the symbol `expected`.
    fn assert_object(record: &SemRecord, expected: SymbolId) {
        assert!(matches!(&record.o, crate::Value::Symbol(id) if *id == expected));
    }

    /// Asserts that the record's action is the string `expected`.
    fn assert_action(record: &ProRecord, expected: &str) {
        assert!(matches!(&record.action, crate::Value::String(s) if s == expected));
    }

    #[test]
    fn empty_pipeline_resolves_to_none() {
        let pipe = Pipeline::new();
        let got = resolve_semantic(
            &pipe,
            SymbolId::new(0),
            SymbolId::new(1),
            TemporalQuery::current(),
        );
        assert!(got.is_none());
    }

    #[test]
    fn current_read_returns_latest_forward_supersessor() {
        let mut pipe = Pipeline::new();
        for src in [BOB_V_JAN_15, CAROL_V_MAR_15] {
            compile(&mut pipe, src);
        }
        let (s, p) = alice_knows(&pipe);

        let got = resolve_semantic(&pipe, s, p, TemporalQuery::current())
            .expect("has authoritative record");
        assert_object(&got, sym(&pipe, "carol"));
    }

    #[test]
    fn as_of_past_valid_time_returns_earlier_record() {
        let mut pipe = Pipeline::new();
        for src in [BOB_V_JAN_15, CAROL_V_MAR_15] {
            compile(&mut pipe, src);
        }
        let (s, p) = alice_knows(&pipe);

        // Between the two valid-time starts: only bob is in force.
        let between = TemporalQuery::as_of(ms(FEB_15_2024_MS));
        let got = resolve_semantic(&pipe, s, p, between).expect("bob valid at 2024-02-15");
        assert_object(&got, sym(&pipe, "bob"));

        // Earlier than every valid-time start: nothing is in force.
        let before = TemporalQuery::as_of(ms(JAN_01_2024_MS));
        assert!(resolve_semantic(&pipe, s, p, before).is_none());
    }

    #[test]
    fn retroactive_record_wins_over_earlier_forward_record_in_overlap() {
        let mut pipe = Pipeline::new();
        // dan is compiled last but starts being valid before carol does, so a
        // read at 2024-03-20 is expected to land on dan.
        for src in [BOB_V_JAN_15, CAROL_V_APR_01, DAN_V_MAR_15] {
            compile(&mut pipe, src);
        }
        let (s, p) = alice_knows(&pipe);

        let probe = TemporalQuery::as_of(ms(MAR_20_2024_MS));
        let got = resolve_semantic(&pipe, s, p, probe).expect("dan valid at 2024-03-20");
        assert_object(&got, sym(&pipe, "dan"));
    }

    #[test]
    fn as_committed_hides_records_committed_after_snapshot() {
        let mut pipe = Pipeline::new();
        let t1 = ms(T1_MS);
        let t2 = ms(T2_MS);
        compile_at(&mut pipe, BOB_V_JAN_15, t1, "t1");
        compile_at(&mut pipe, CAROL_V_MAR_15, t2, "t2");
        let (s, p) = alice_knows(&pipe);

        let now_got = resolve_semantic(&pipe, s, p, TemporalQuery::current()).expect("current");
        assert_object(&now_got, sym(&pipe, "carol"));

        // A snapshot just after t1 must not see the t2 batch.
        let snapshot = TemporalQuery::as_committed(ms(t1.as_millis() + 1));
        let got = resolve_semantic(&pipe, s, p, snapshot).expect("t1 visible, t2 not");
        assert_object(&got, sym(&pipe, "bob"));
    }

    #[test]
    fn procedural_current_read_follows_supersession_chain() {
        let mut pipe = Pipeline::new();
        for src in [RULE_X_FIRST, RULE_X_SECOND] {
            compile(&mut pipe, src);
        }
        let rule = sym(&pipe, "rule_x");

        let got = resolve_procedural(&pipe, rule, TemporalQuery::current()).expect("current pro");
        assert_action(&got, "act_2");
    }

    #[test]
    fn procedural_as_committed_returns_older_version() {
        let mut pipe = Pipeline::new();
        let t1 = ms(T1_MS);
        let t2 = ms(T2_MS);
        compile_at(&mut pipe, RULE_X_FIRST, t1, "t1");
        compile_at(&mut pipe, RULE_X_SECOND, t2, "t2");
        let rule = sym(&pipe, "rule_x");

        let got =
            resolve_procedural(&pipe, rule, TemporalQuery::as_committed(t1)).expect("t1-era pro");
        assert_action(&got, "act_1");
    }

    #[test]
    fn bi_temporal_read_returns_pre_correction_view() {
        let mut pipe = Pipeline::new();
        let t1 = ms(T1_MS);
        let t2 = ms(T2_MS);
        compile_at(&mut pipe, BOB_V_JAN_15, t1, "t1 forward base");
        compile_at(&mut pipe, CAROL_V_APR_01, t1, "t1 forward super");
        compile_at(&mut pipe, DAN_V_MAR_15, t2, "t2 retroactive");
        let (s, p) = alice_knows(&pipe);
        let mar_20 = ms(MAR_20_2024_MS);

        // Snapshot at t2 includes the retroactive dan record.
        let post = resolve_semantic(&pipe, s, p, TemporalQuery::bi_temporal(mar_20, t2))
            .expect("post-correction");
        assert_object(&post, sym(&pipe, "dan"));

        // Snapshot shortly after t1 predates the correction.
        // NOTE(review): the +2 margin presumably covers the second batch
        // submitted at t1 being committed slightly after t1 — confirm against
        // `Pipeline::compile_batch`.
        let pre_snapshot = ms(t1.as_millis() + 2);
        let pre = resolve_semantic(
            &pipe,
            s,
            p,
            TemporalQuery::bi_temporal(mar_20, pre_snapshot),
        )
        .expect("pre-correction");
        assert_object(&pre, sym(&pipe, "bob"));
    }
}