Skip to main content

mimir_core/
resolver.rs

1//! As-of query resolver — implements `temporal-model.md` § 7 read
2//! semantics over the pipeline's in-memory record history.
3//!
4//! Query shapes (§ 7.1 – 7.4) collapse into a single bi-temporal
5//! form:
6//!
7//! - `(as_of, as_committed) = (now, now)` — current state (§ 7.1).
8//! - `(T, now)` — as-of-valid-time (§ 7.2).
9//! - `(now, T_c)` — transaction-time snapshot (§ 7.3).
10//! - `(T, T_c)` — retroactive-correction-aware read (§ 7.4).
11//!
//! The resolver is a pure projection over `Pipeline::semantic_records`
//! / `Pipeline::procedural_records` / `Pipeline::inferential_records`
//! plus the supersession DAG. It does **not** mutate the pipeline;
//! multiple readers can call it concurrently against an
//! `Arc<Pipeline>` once that bound lands.
16//!
17//! Scope (6.4): Semantic and Procedural as-of queries. Episodic
18//! queries remain out of scope for the temporal-model graduation.
19//! Inferential resolution (§ 5.4) was added in Phase 3.1 of the
20//! prime-time roadmap — see [`resolve_inferential`].
21
22use crate::canonical::{InfRecord, ProRecord, SemRecord};
23use crate::clock::ClockTime;
24use crate::dag::EdgeKind;
25use crate::pipeline::Pipeline;
26use crate::symbol::SymbolId;
27
28/// Bi-temporal query descriptor. Both fields default to `None`,
29/// which means "use the pipeline's current commit watermark" — so an
30/// unspecified query is a current-state read.
31#[derive(Copy, Clone, Debug, Default, PartialEq, Eq)]
32pub struct TemporalQuery {
33    /// Valid-time point: "what was true at this time?"
34    /// `None` ⇒ use the pipeline's latest commit as the point.
35    pub as_of: Option<ClockTime>,
36    /// Transaction-time point: "what did the librarian know by
37    /// this time?" `None` ⇒ use the pipeline's latest commit.
38    pub as_committed: Option<ClockTime>,
39}
40
41impl TemporalQuery {
42    /// Current-state read — `(now, now)` per § 7.1.
43    #[must_use]
44    pub const fn current() -> Self {
45        Self {
46            as_of: None,
47            as_committed: None,
48        }
49    }
50
51    /// As-of-valid-time read — `(T, now)` per § 7.2.
52    #[must_use]
53    pub const fn as_of(t: ClockTime) -> Self {
54        Self {
55            as_of: Some(t),
56            as_committed: None,
57        }
58    }
59
60    /// Transaction-time snapshot — `(now, T_c)` per § 7.3.
61    #[must_use]
62    pub const fn as_committed(t: ClockTime) -> Self {
63        Self {
64            as_of: None,
65            as_committed: Some(t),
66        }
67    }
68
69    /// Fully-specified bi-temporal point per § 7.4.
70    #[must_use]
71    pub const fn bi_temporal(as_of: ClockTime, as_committed: ClockTime) -> Self {
72        Self {
73            as_of: Some(as_of),
74            as_committed: Some(as_committed),
75        }
76    }
77}
78
79/// Resolve the currently-authoritative Semantic memory for `(s, p)`
80/// under the temporal query, or `None` if no Semantic memory matches.
81///
82/// Returns a clone because the resolver is a pure projection — it
83/// doesn't hand out references tied to the pipeline's internal
84/// vector (which could be invalidated by a subsequent commit).
85#[must_use]
86pub fn resolve_semantic(
87    pipeline: &Pipeline,
88    s: SymbolId,
89    p: SymbolId,
90    query: TemporalQuery,
91) -> Option<SemRecord> {
92    let (as_of, as_committed) = effective_points(pipeline, query)?;
93
94    // Use the `(s, p) → indices` index so we scan only the history
95    // at this specific key — O(k) where k is typically 1–3 — instead
96    // of the whole `semantic_records` vec. Per `read-protocol.md`
97    // § 3.1 and graduation criterion #4 (p50 < 1ms on a 1M-record
98    // warm index).
99    let records = pipeline.semantic_records();
100    let mut best: Option<&SemRecord> = None;
101    for &idx in pipeline.semantic_history_at(s, p) {
102        let Some(record) = records.get(idx) else {
103            continue;
104        };
105        if !is_authoritative_sem(pipeline, record, as_of, as_committed) {
106            continue;
107        }
108        best = Some(match best {
109            None => record,
110            Some(cur) if record.clocks.committed_at > cur.clocks.committed_at => record,
111            // Stable tie-break: later committed_at wins. An equal
112            // committed_at would collide in memory_id — impossible,
113            // because auto-supersession rejects it (§ 5.1 equal
114            // valid_at → SemanticSupersessionConflict) — so we
115            // preserve the first candidate if this case ever fires.
116            Some(cur) => cur,
117        });
118    }
119    best.cloned()
120}
121
122/// Resolve the currently-authoritative Inferential memory for
123/// `(s, p)` under the temporal query, or `None` if no Inferential
124/// matches. Mirrors [`resolve_semantic`] in shape and semantics —
125/// per `temporal-model.md` § 5.4 an Inferential re-derivation with
126/// the same `(s, p)` and a later `valid_at` supersedes the prior
127/// (same rule as Semantic § 5.1).
128///
129/// Returns a clone for the same reason as [`resolve_semantic`]: the
130/// resolver is a pure projection and a subsequent commit may
131/// invalidate internal references.
132///
133/// `StaleParent` edges (§ 5.4's read-time stale-flag overlay) are
134/// explicitly NOT consulted here — surfacing them requires a
135/// `ReadFlags` bit that `read-protocol.md` § 5 does not yet allocate.
136/// That overlay lands with a spec amendment to the flag enum; the
137/// resolver's definition of "authoritative" is unchanged by it
138/// (stale Inferentials stay authoritative — only the surface flag
139/// changes).
140#[must_use]
141pub fn resolve_inferential(
142    pipeline: &Pipeline,
143    s: SymbolId,
144    p: SymbolId,
145    query: TemporalQuery,
146) -> Option<InfRecord> {
147    let (as_of, as_committed) = effective_points(pipeline, query)?;
148
149    // Same `(s, p) → indices` pattern as Sem — O(k) in the history
150    // length at this key rather than O(n) over the whole Inferential
151    // record vec.
152    let records = pipeline.inferential_records();
153    let mut best: Option<&InfRecord> = None;
154    for &idx in pipeline.inferential_history_at(s, p) {
155        let Some(record) = records.get(idx) else {
156            continue;
157        };
158        if !is_authoritative_inf(pipeline, record, as_of, as_committed) {
159            continue;
160        }
161        best = Some(match best {
162            None => record,
163            Some(cur) if record.clocks.committed_at > cur.clocks.committed_at => record,
164            Some(cur) => cur,
165        });
166    }
167    best.cloned()
168}
169
170/// Resolve the currently-authoritative Procedural memory for
171/// `rule_id` under the temporal query. Returns `None` if no
172/// Procedural with that rule is authoritative at the requested
173/// bi-temporal point.
174///
175/// Scope: keyed by `rule_id` only. The secondary `(trigger, scope)`
176/// index from 6.3b is not queried here — v1 read API exposes the
177/// primary key.
178#[must_use]
179pub fn resolve_procedural(
180    pipeline: &Pipeline,
181    rule_id: SymbolId,
182    query: TemporalQuery,
183) -> Option<ProRecord> {
184    let (as_of, as_committed) = effective_points(pipeline, query)?;
185
186    // `rule_id → indices` index avoids scanning the full history.
187    let records = pipeline.procedural_records();
188    let mut best: Option<&ProRecord> = None;
189    for &idx in pipeline.procedural_history_for(rule_id) {
190        let Some(record) = records.get(idx) else {
191            continue;
192        };
193        if !is_authoritative_pro(pipeline, record, as_of, as_committed) {
194            continue;
195        }
196        best = Some(match best {
197            None => record,
198            Some(cur) if record.clocks.committed_at > cur.clocks.committed_at => record,
199            Some(cur) => cur,
200        });
201    }
202    best.cloned()
203}
204
205/// Resolve `(as_of, as_committed)` against the pipeline's current
206/// commit watermark. Returns `None` if the pipeline hasn't committed
207/// anything yet — queries against an empty pipeline trivially return
208/// `None` because there are no records to match.
209fn effective_points(pipeline: &Pipeline, query: TemporalQuery) -> Option<(ClockTime, ClockTime)> {
210    let watermark = pipeline.last_committed_at()?;
211    Some((
212        query.as_of.unwrap_or(watermark),
213        query.as_committed.unwrap_or(watermark),
214    ))
215}
216
217/// A Semantic record is authoritative at `(as_of, as_committed)` iff:
218///
219/// 1. `record.committed_at ≤ as_committed` — the librarian knew about
220///    this record by the transaction-time point.
221/// 2. `record.valid_at ≤ as_of` — the record's validity started by
222///    the valid-time point.
223/// 3. `effective_invalid_at(record, as_committed) > as_of` OR None —
224///    the record's validity hadn't ended by the valid-time point,
225///    considering both retroactive record-level closures (§ 5.1
226///    backward case) and forward-case closures derived from
227///    `Supersedes` edges committed by `as_committed`.
228fn is_authoritative_sem(
229    pipeline: &Pipeline,
230    record: &SemRecord,
231    as_of: ClockTime,
232    as_committed: ClockTime,
233) -> bool {
234    if record.clocks.committed_at > as_committed {
235        return false;
236    }
237    if record.clocks.valid_at > as_of {
238        return false;
239    }
240    let effective_invalid = effective_invalid_at_sem(pipeline, record, as_committed);
241    match effective_invalid {
242        None => true,
243        Some(iv) => iv > as_of,
244    }
245}
246
247/// An Inferential record is authoritative at `(as_of, as_committed)`.
248/// Same shape as Semantic: must be committed by the transaction-time
249/// point, valid-time started by the valid-time point, and not closed
250/// by own `invalid_at` or a `Supersedes` edge committed by
251/// `as_committed`. Per temporal-model.md § 6.2 only `Supersedes`
252/// edges close validity — `StaleParent` does not, so the stale flag
253/// is orthogonal to authoritativeness.
254fn is_authoritative_inf(
255    pipeline: &Pipeline,
256    record: &InfRecord,
257    as_of: ClockTime,
258    as_committed: ClockTime,
259) -> bool {
260    if record.clocks.committed_at > as_committed {
261        return false;
262    }
263    if record.clocks.valid_at > as_of {
264        return false;
265    }
266    let effective_invalid = effective_invalid_at_inf(pipeline, record, as_committed);
267    match effective_invalid {
268        None => true,
269        Some(iv) => iv > as_of,
270    }
271}
272
273fn effective_invalid_at_inf(
274    pipeline: &Pipeline,
275    record: &InfRecord,
276    as_committed: ClockTime,
277) -> Option<ClockTime> {
278    let mut candidates: Vec<ClockTime> = Vec::new();
279    if let Some(iv) = record.clocks.invalid_at {
280        candidates.push(iv);
281    }
282    collect_edge_closures(pipeline, record.memory_id, as_committed, &mut candidates);
283    candidates.into_iter().min()
284}
285
286/// A Procedural record is authoritative at `(as_of, as_committed)`.
287/// Same shape as Semantic; `valid_at = committed_at` by spec § 4.3,
288/// so a Pro record's own `invalid_at` is always `None` (Pro has no
289/// retroactive case) and closures come entirely from `Supersedes`
290/// edges.
291fn is_authoritative_pro(
292    pipeline: &Pipeline,
293    record: &ProRecord,
294    as_of: ClockTime,
295    as_committed: ClockTime,
296) -> bool {
297    if record.clocks.committed_at > as_committed {
298        return false;
299    }
300    if record.clocks.valid_at > as_of {
301        return false;
302    }
303    let effective_invalid = effective_invalid_at_pro(pipeline, record, as_committed);
304    match effective_invalid {
305        None => true,
306        Some(iv) => iv > as_of,
307    }
308}
309
310/// Compute a Semantic record's effective `invalid_at` as observed at
311/// transaction time `as_committed`, combining:
312///
313/// - The record's own `invalid_at` field (set at write time for
314///   retroactive corrections per § 5.1 backward case).
315/// - The earliest `Supersedes`-edge-derived closure from any forward
316///   supersession known by `as_committed`: for each incoming
317///   Supersedes edge `e` with `e.at ≤ as_committed`, look up
318///   `e.from`'s `valid_at` and take the minimum.
319///
320/// Returns the minimum of all sources, or `None` if none apply.
321fn effective_invalid_at_sem(
322    pipeline: &Pipeline,
323    record: &SemRecord,
324    as_committed: ClockTime,
325) -> Option<ClockTime> {
326    let mut candidates: Vec<ClockTime> = Vec::new();
327    if let Some(iv) = record.clocks.invalid_at {
328        candidates.push(iv);
329    }
330    collect_edge_closures(pipeline, record.memory_id, as_committed, &mut candidates);
331    candidates.into_iter().min()
332}
333
334fn effective_invalid_at_pro(
335    pipeline: &Pipeline,
336    record: &ProRecord,
337    as_committed: ClockTime,
338) -> Option<ClockTime> {
339    let mut candidates: Vec<ClockTime> = Vec::new();
340    if let Some(iv) = record.clocks.invalid_at {
341        candidates.push(iv);
342    }
343    collect_edge_closures(pipeline, record.memory_id, as_committed, &mut candidates);
344    candidates.into_iter().min()
345}
346
347/// For every `Supersedes` edge targeting `target_memory` with
348/// `edge.at ≤ as_committed`, push the source memory's `valid_at`
349/// into `out`. Per invariant § 6.2 #4, a Supersedes edge closes the
350/// target's validity at the source's `valid_at`.
351///
352/// Sources are looked up in the pipeline's Sem and Pro histories.
353/// An edge whose source isn't found is skipped silently — this
354/// shouldn't happen against a well-formed log, but skipping is safer
355/// than panicking.
356fn collect_edge_closures(
357    pipeline: &Pipeline,
358    target_memory: SymbolId,
359    as_committed: ClockTime,
360    out: &mut Vec<ClockTime>,
361) {
362    for edge in pipeline.dag().edges_to(target_memory) {
363        if edge.kind != EdgeKind::Supersedes {
364            continue;
365        }
366        if edge.at > as_committed {
367            continue;
368        }
369        if let Some(source_valid_at) = lookup_source_valid_at(pipeline, edge.from) {
370            out.push(source_valid_at);
371        }
372    }
373}
374
375/// Find the `valid_at` of a memory by ID, searching Sem, Pro, then
376/// Inf histories. Linear in total record count — acceptable for v1;
377/// a `memory_id -> record` index is an obvious optimization.
378fn lookup_source_valid_at(pipeline: &Pipeline, memory_id: SymbolId) -> Option<ClockTime> {
379    for r in pipeline.semantic_records() {
380        if r.memory_id == memory_id {
381            return Some(r.clocks.valid_at);
382        }
383    }
384    for r in pipeline.procedural_records() {
385        if r.memory_id == memory_id {
386            return Some(r.clocks.valid_at);
387        }
388    }
389    for r in pipeline.inferential_records() {
390        if r.memory_id == memory_id {
391            return Some(r.clocks.valid_at);
392        }
393    }
394    None
395}
396
#[cfg(test)]
mod tests {
    use super::*;

    /// Build a `ClockTime` from a millisecond count, panicking if
    /// `try_from_millis` rejects the value.
    fn ms(v: u64) -> ClockTime {
        ClockTime::try_from_millis(v).expect("non-sentinel")
    }

    /// Fixed commit time used by [`compile`]. Read as Unix-epoch
    /// milliseconds (like the dated constants in the tests below) it
    /// falls on 2024-04-17.
    fn now() -> ClockTime {
        ms(1_713_350_400_000)
    }

    /// Compile `src` into `pipe` as one batch at [`now`], panicking
    /// on any compile error.
    fn compile(pipe: &mut Pipeline, src: &str) {
        pipe.compile_batch(src, now()).expect("compile");
    }

    /// Look up the `(alice, knows)` symbol pair in the pipeline's
    /// symbol table; panics if either symbol is missing.
    fn alice_knows(pipe: &Pipeline) -> (SymbolId, SymbolId) {
        let s = pipe.table().lookup("alice").expect("alice");
        let p = pipe.table().lookup("knows").expect("knows");
        (s, p)
    }

    /// A pipeline with no commits resolves every query to `None`.
    #[test]
    fn empty_pipeline_resolves_to_none() {
        let pipe = Pipeline::new();
        let q = TemporalQuery::current();
        // Can't even construct s/p symbols — pass in fabricated ones
        // to prove the resolver handles an empty state.
        let got = resolve_semantic(&pipe, SymbolId::new(0), SymbolId::new(1), q);
        assert!(got.is_none());
    }

    /// Current-state read over a two-record forward chain returns the
    /// record with the later `valid_at`.
    #[test]
    fn current_read_returns_latest_forward_supersessor() {
        let mut pipe = Pipeline::new();
        compile(
            &mut pipe,
            "(sem @alice @knows @bob :src @observation :c 0.8 :v 2024-01-15)",
        );
        compile(
            &mut pipe,
            "(sem @alice @knows @carol :src @observation :c 0.8 :v 2024-03-15)",
        );
        let (s, p) = alice_knows(&pipe);
        let got = resolve_semantic(&pipe, s, p, TemporalQuery::current())
            .expect("has authoritative record");
        // @carol's valid_at is later — the head of the forward chain.
        let carol = pipe.table().lookup("carol").expect("carol");
        assert!(matches!(&got.o, crate::Value::Symbol(id) if *id == carol));
    }

    /// An as-of read between two `valid_at` points returns the earlier
    /// record; an as-of read before both returns nothing.
    #[test]
    fn as_of_past_valid_time_returns_earlier_record() {
        let mut pipe = Pipeline::new();
        compile(
            &mut pipe,
            "(sem @alice @knows @bob :src @observation :c 0.8 :v 2024-01-15)",
        );
        compile(
            &mut pipe,
            "(sem @alice @knows @carol :src @observation :c 0.8 :v 2024-03-15)",
        );
        let (s, p) = alice_knows(&pipe);

        // Query at 2024-02-15 — between the two valid_at points.
        // @bob was valid, @carol not yet.
        let between = ms(1_707_955_200_000); // 2024-02-15
        let got = resolve_semantic(&pipe, s, p, TemporalQuery::as_of(between))
            .expect("bob valid at 2024-02-15");
        let bob = pipe.table().lookup("bob").expect("bob");
        assert!(matches!(&got.o, crate::Value::Symbol(id) if *id == bob));

        // Query at 2024-01-01 — before either record's valid_at.
        // Nothing authoritative yet.
        let before = ms(1_704_067_200_000); // 2024-01-01
        assert!(resolve_semantic(&pipe, s, p, TemporalQuery::as_of(before)).is_none());
    }

    /// When a retroactive record and an earlier forward record are
    /// both valid at the queried point, the later-committed
    /// (retroactive) record is returned.
    #[test]
    fn retroactive_record_wins_over_earlier_forward_record_in_overlap() {
        // Three writes (all with valid_at before `now()` = 2024-04-17):
        //   M1: valid_at 2024-01-15 (forward-superseded by M2)
        //   M2: valid_at 2024-04-01 (current)
        //   M3: retroactive at valid_at 2024-03-15, invalid_at=M2's
        // Query at 2024-03-20 — M3 and M1 both overlap:
        //   M1: valid 2024-01-15, edge-closed at M2's 2024-04-01 → still valid at 2024-03-20
        //   M3: valid 2024-03-15 to 2024-04-01 → valid
        // Tie-break by committed_at: M3 > M1, so M3 wins.
        let mut pipe = Pipeline::new();
        compile(
            &mut pipe,
            "(sem @alice @knows @bob :src @observation :c 0.8 :v 2024-01-15)",
        );
        compile(
            &mut pipe,
            "(sem @alice @knows @carol :src @observation :c 0.8 :v 2024-04-01)",
        );
        compile(
            &mut pipe,
            "(sem @alice @knows @dan :src @observation :c 0.8 :v 2024-03-15)",
        );
        let (s, p) = alice_knows(&pipe);
        let mar_20 = ms(1_710_892_800_000); // 2024-03-20
        let got = resolve_semantic(&pipe, s, p, TemporalQuery::as_of(mar_20))
            .expect("dan valid at 2024-03-20");
        let dan = pipe.table().lookup("dan").expect("dan");
        assert!(matches!(&got.o, crate::Value::Symbol(id) if *id == dan));
    }

    /// A transaction-time snapshot taken between two commits does not
    /// see the second commit.
    #[test]
    fn as_committed_hides_records_committed_after_snapshot() {
        // Two writes; query with as_committed between them — the
        // second write shouldn't be visible.
        let mut pipe = Pipeline::new();
        let t1 = ms(1_713_350_400_000);
        let t2 = ms(1_713_350_500_000);
        pipe.compile_batch(
            "(sem @alice @knows @bob :src @observation :c 0.8 :v 2024-01-15)",
            t1,
        )
        .expect("t1");
        pipe.compile_batch(
            "(sem @alice @knows @carol :src @observation :c 0.8 :v 2024-03-15)",
            t2,
        )
        .expect("t2");
        let (s, p) = alice_knows(&pipe);

        // Current read: sees t2's carol write.
        let now_got = resolve_semantic(&pipe, s, p, TemporalQuery::current()).expect("current");
        let carol = pipe.table().lookup("carol").expect("carol");
        assert!(matches!(&now_got.o, crate::Value::Symbol(id) if *id == carol));

        // as_committed between t1 and t2: t2's write invisible.
        let between = ms(t1.as_millis() + 1);
        let got = resolve_semantic(&pipe, s, p, TemporalQuery::as_committed(between))
            .expect("t1 visible, t2 not");
        let bob = pipe.table().lookup("bob").expect("bob");
        assert!(matches!(&got.o, crate::Value::Symbol(id) if *id == bob));
    }

    /// Current-state Procedural read returns the most recently
    /// committed version of a rule.
    #[test]
    fn procedural_current_read_follows_supersession_chain() {
        let mut pipe = Pipeline::new();
        compile(
            &mut pipe,
            r#"(pro @rule_x "t_a" "act_1" :scp @mimir :src @policy :c 1.0)"#,
        );
        compile(
            &mut pipe,
            r#"(pro @rule_x "t_b" "act_2" :scp @other :src @policy :c 1.0)"#,
        );
        let rule = pipe.table().lookup("rule_x").expect("rule_x");
        let got = resolve_procedural(&pipe, rule, TemporalQuery::current()).expect("current pro");
        // Second commit's action wins.
        assert!(matches!(&got.action, crate::Value::String(s) if s == "act_2"));
    }

    /// A Procedural snapshot at the first commit's time returns the
    /// first version of the rule, not the later one.
    #[test]
    fn procedural_as_committed_returns_older_version() {
        let mut pipe = Pipeline::new();
        let t1 = ms(1_713_350_400_000);
        let t2 = ms(1_713_350_500_000);
        pipe.compile_batch(
            r#"(pro @rule_x "t_a" "act_1" :scp @mimir :src @policy :c 1.0)"#,
            t1,
        )
        .expect("t1");
        pipe.compile_batch(
            r#"(pro @rule_x "t_b" "act_2" :scp @other :src @policy :c 1.0)"#,
            t2,
        )
        .expect("t2");
        let rule = pipe.table().lookup("rule_x").expect("rule_x");

        let got =
            resolve_procedural(&pipe, rule, TemporalQuery::as_committed(t1)).expect("t1-era pro");
        assert!(matches!(&got.action, crate::Value::String(s) if s == "act_1"));
    }

    /// A fully-specified bi-temporal read sees a retroactive
    /// correction only when `as_committed` is at or after the
    /// correction's commit.
    #[test]
    fn bi_temporal_read_returns_pre_correction_view() {
        // Spec § 7.4: at as_committed before the correction, the
        // pre-correction view. Build a chain with a retroactive
        // correction and verify the bi-temporal read ignores it
        // when as_committed predates the correction's commit.
        let mut pipe = Pipeline::new();
        let t1 = ms(1_713_350_400_000);
        let t2 = ms(1_713_350_500_000);
        pipe.compile_batch(
            "(sem @alice @knows @bob :src @observation :c 0.8 :v 2024-01-15)",
            t1,
        )
        .expect("t1 forward base");
        pipe.compile_batch(
            "(sem @alice @knows @carol :src @observation :c 0.8 :v 2024-04-01)",
            t1,
        )
        .expect("t1 forward super");
        // Retroactive correction committed at t2 only.
        pipe.compile_batch(
            "(sem @alice @knows @dan :src @observation :c 0.8 :v 2024-03-15)",
            t2,
        )
        .expect("t2 retroactive");
        let (s, p) = alice_knows(&pipe);
        let mar_20 = ms(1_710_892_800_000); // 2024-03-20

        // Post-correction bi-temporal read at 2024-03-20:
        // retroactive dan wins.
        let post = resolve_semantic(&pipe, s, p, TemporalQuery::bi_temporal(mar_20, t2))
            .expect("post-correction");
        let dan = pipe.table().lookup("dan").expect("dan");
        assert!(matches!(&post.o, crate::Value::Symbol(id) if *id == dan));

        // Pre-correction bi-temporal read at 2024-03-20 with
        // as_committed right after t1 — dan not yet committed. M1
        // (bob) is the pre-correction truth at 2024-03-20 because M2
        // (carol, valid_at=2024-04-01) doesn't start until after.
        // Use t1 + 2 so monotonic-enforced committed_at bumps for the
        // two t1-batches land below this snapshot.
        let pre = resolve_semantic(
            &pipe,
            s,
            p,
            TemporalQuery::bi_temporal(mar_20, ms(t1.as_millis() + 2)),
        )
        .expect("pre-correction");
        let bob = pipe.table().lookup("bob").expect("bob");
        assert!(matches!(&pre.o, crate::Value::Symbol(id) if *id == bob));
    }
}
628}