Skip to main content

mimir_core/
read.rs

1//! Hot-path read API per `read-protocol.md`.
2//!
3//! The entry point is [`Pipeline::execute_query`]. It parses a single
4//! `(query ...)` form, resolves the keyword predicates against the
5//! pipeline's symbol table (without mutation), and projects over the
6//! pipeline's record history and supersession DAG via the as-of
7//! resolver (6.4).
8//!
9//! Scope (7.2 — filters, flags, framing, filtered array):
10//!
11//! | Predicate             | Status |
12//! |-----------------------|---|
//! | `:kind`               | `sem`, `pro`, `inf` supported; `epi` returns empty |
//! | `:s @S`, `:p @P`      | supported (Semantic / Inferential only) |
15//! | `:as_of T`            | supported (delegates to resolver § 7.2) |
16//! | `:as_committed T`     | supported (§ 7.3) |
17//! | `:limit N`            | supported; sets `TRUNCATED` flag when hit |
18//! | `:include_retired`    | default `false`; retired-symbol records drop unless set |
19//! | `:include_projected`  | default `false`; projected records drop unless set |
20//! | `:confidence_threshold` | default 0.5; flag-only threshold (does not filter) |
21//! | `:explain_filtered`   | default `false`; when true, dropped records surface in `filtered` |
22//! | `:show_framing`       | default `false`; when true, `framings` parallels `records` |
23//! | `:debug_mode`         | shorthand for `:explain_filtered` + `:show_framing` |
24//! | everything else       | [`ReadError::UnsupportedPredicate`] |
25//!
26//! Flag surface (§ 6):
27//! - `STALE_SYMBOL` — any kept record references a retired symbol.
28//! - `LOW_CONFIDENCE` — any kept record's effective (decay-adjusted)
29//!   confidence < threshold. Computed via
30//!   [`crate::decay::effective_confidence`] against the pipeline's
31//!   current [`crate::decay::DecayConfig`].
32//! - `PROJECTED_PRESENT` — any kept record has `flags.projected`.
33//! - `TRUNCATED` — `:limit` was hit.
34//! - `EXPLAIN_FILTERED_ACTIVE` — `:explain_filtered` (or `:debug_mode`) active.
35//!
//! `Framing` values at 7.2, in priority order: `Projected` (record's
//! `flags.projected` bit set), `Authoritative` (record is in the
//! pipeline's pin or operator-authoritative set), `Historical`
//! (`:as_of` predates `query_committed_at`), `Advisory` (default).
40
41use thiserror::Error;
42
43use crate::bind::SymbolTable;
44use crate::canonical::{CanonicalRecord, EpiRecord, InfRecord, ProRecord, SemRecord};
45use crate::clock::ClockTime;
46use crate::confidence::Confidence;
47use crate::decay::{effective_confidence, DecayFlags};
48use crate::memory_kind::MemoryKindTag;
49use crate::parse::{self, ParseError, RawSymbolName, RawValue, UnboundForm};
50use crate::pipeline::Pipeline;
51use crate::resolver::{self, TemporalQuery};
52use crate::semantic::source_kind_from_name;
53use crate::source_kind::SourceKind;
54use crate::symbol::SymbolId;
55use crate::value::Value;
56
57/// Result of a read-path query per `read-protocol.md` § 5.
58#[derive(Clone, Debug, PartialEq)]
59pub struct ReadResult {
60    /// Matched canonical records — mixed kinds when `:kind` is
61    /// unspecified, single-kind otherwise. Order: by `committed_at`
62    /// ascending.
63    pub records: Vec<CanonicalRecord>,
64    /// Per-record framing, parallel to `records`. Empty unless
65    /// `:show_framing true` (or `:debug_mode true`). When populated,
66    /// `framings.len() == records.len()`.
67    pub framings: Vec<Framing>,
68    /// Records that were dropped by a filter, surfaced when
69    /// `:explain_filtered true` (or `:debug_mode true`). Empty
70    /// otherwise — the default silent-filter UX per spec § 11.1.
71    pub filtered: Vec<FilteredMemory>,
72    /// Flag bitset; see [`ReadFlags`] constants.
73    pub flags: ReadFlags,
74    /// Effective `as_of` used for this query (the pipeline's latest
75    /// commit if the predicate was absent).
76    pub as_of: ClockTime,
77    /// Effective `as_committed`.
78    pub as_committed: ClockTime,
79    /// Snapshot boundary — pipeline's latest committed clock at
80    /// query start. Per `read-protocol.md` § 9.
81    pub query_committed_at: ClockTime,
82}
83
84/// Framing classification per `read-protocol.md` § 5. Attached
85/// per-record in [`ReadResult::framings`] when `:show_framing true`.
86#[derive(Copy, Clone, Debug, PartialEq, Eq)]
87pub enum Framing {
88    /// Normal current-state record.
89    Advisory,
90    /// Returned because `:as_of` predates `query_committed_at`.
91    Historical,
92    /// Record has `flags.projected` — an intent / plan, not current
93    /// truth. Only reachable when `:include_projected true`.
94    Projected,
95    /// Record is pinned or operator-authoritative — decay suspended,
96    /// high trust. See `confidence-decay.md` §§ 7 / 8.
97    Authoritative {
98        /// Which flag source authorised the pin.
99        set_by: FramingSource,
100    },
101}
102
/// Origin of an authoritative framing. Spec § 5 /
/// `confidence-decay.md` § 7 / § 8.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum FramingSource {
    /// The agent-invokable `(pin @mem)` flag (spec § 7).
    AgentPinned,
    /// The user-applied `(authoritative_set @mem)` flag (spec § 8).
    OperatorAuthoritative,
}
112
/// The filter responsible for removing a record from
/// [`ReadResult::records`]. Reported through
/// [`ReadResult::filtered`] when `:explain_filtered true`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum FilterReason {
    /// `:include_retired` was not set and the record references one
    /// or more retired symbols.
    RetiredSymbolExcluded,
    /// `:include_projected` was not set and the record has
    /// `flags.projected`.
    ProjectedExcluded,
}
124
125/// A record that was dropped from the result set by a filter.
126#[derive(Clone, Debug, PartialEq)]
127pub struct FilteredMemory {
128    /// The record that was filtered. Agents may still inspect its
129    /// full shape.
130    pub record: CanonicalRecord,
131    /// Reason for the drop.
132    pub reason: FilterReason,
133}
134
/// Bitset of read-result flags (`read-protocol.md` § 5 / § 6).
///
/// Three bit positions belong to the stable on-wire layout but are
/// never issued in v1 and therefore always clear: bit 1 (formerly
/// `CONFLICT`), bit 5 (formerly `CROSS_WORKSPACE`) and bit 6
/// (formerly `CONTESTED`). See spec note.
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq)]
pub struct ReadFlags(u32);

impl ReadFlags {
    /// Bit 0 — some kept record references a retired symbol.
    pub const STALE_SYMBOL: u32 = 0x01;
    // Bit 1 (0x02) reserved — formerly CONFLICT (SSI); a single writer never sets it.
    /// Bit 2 — some kept record's confidence falls below the query's
    /// `:confidence_threshold` (default 0.5).
    pub const LOW_CONFIDENCE: u32 = 0x04;
    /// Bit 3 — some kept record has `flags.projected`.
    pub const PROJECTED_PRESENT: u32 = 0x08;
    /// Bit 4 — `:limit` cut the result short before all matches were
    /// returned.
    pub const TRUNCATED: u32 = 0x10;
    // Bit 5 (0x20) reserved — formerly CROSS_WORKSPACE; out of scope.
    // Bit 6 (0x40) reserved — formerly CONTESTED; out of scope.
    /// Bit 7 — `:explain_filtered` is on for this query, so
    /// `filtered` may hold entries.
    pub const EXPLAIN_FILTERED_ACTIVE: u32 = 0x80;

    /// A flag set with no bits raised.
    #[must_use]
    pub const fn empty() -> Self {
        Self(0)
    }

    /// Reports whether at least one bit of `bits` is raised in this
    /// set. Note the "any", not "all", semantics.
    #[must_use]
    pub const fn contains(self, bits: u32) -> bool {
        let Self(raw) = self;
        (raw & bits) != 0
    }

    /// Returns a copy of this set with `bits` additionally raised.
    #[must_use]
    pub const fn with(self, bits: u32) -> Self {
        let Self(raw) = self;
        Self(raw | bits)
    }

    /// The underlying `u32`, e.g. for wire serialization.
    #[must_use]
    pub const fn bits(self) -> u32 {
        let Self(raw) = self;
        raw
    }
}
184
/// The memory kind selected by a query's `:kind` predicate.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum KindFilter {
    /// Semantic memory.
    Sem,
    /// Procedural memory.
    Pro,
    /// Episodic memory. There is no Episodic resolver yet, so such
    /// queries return empty.
    Epi,
    /// Inferential memory. Its resolver was wired in Phase 3.1 per
    /// `temporal-model.md` § 5.4: keyed by `(s, p)`, with
    /// re-derivation-based auto-supersession mirroring Sem § 5.1.
    Inf,
}
199
200/// Read-path error family. Every variant means the query did not
201/// execute; no state was observed or mutated.
202#[derive(Debug, Error, PartialEq)]
203pub enum ReadError {
204    /// Parse failure on the query input.
205    #[error("parse error: {0}")]
206    Parse(#[from] ParseError),
207
208    /// Input was not a single `(query ...)` form — the read API
209    /// expects exactly one query per call.
210    #[error("expected a single (query ...) form, got {count} forms")]
211    NotASingleQuery {
212        /// Number of forms parsed.
213        count: usize,
214    },
215
216    /// Input parsed as something other than a `(query ...)` form.
217    #[error("input is not a query form")]
218    NotAQuery,
219
220    /// Parsed but includes a predicate whose behavior hasn't been
221    /// wired yet. 7.1 scope note: see the module docstring.
222    #[error("query predicate {predicate} is not supported in this milestone")]
223    UnsupportedPredicate {
224        /// Predicate keyword (e.g. `"in_episode"`).
225        predicate: &'static str,
226    },
227
228    /// A predicate's value had the wrong shape for its keyword
229    /// (e.g. `:limit @not_a_number`).
230    #[error("invalid value for {keyword}: {reason}")]
231    InvalidPredicate {
232        /// Predicate keyword.
233        keyword: &'static str,
234        /// Human-readable reason.
235        reason: String,
236    },
237
238    /// `:kind` was provided with a value that isn't one of the four
239    /// canonical kinds.
240    #[error("invalid kind {got}: expected one of sem, pro, epi, inf")]
241    InvalidKind {
242        /// The literal bareword supplied.
243        got: String,
244    },
245
246    /// A predicate was combined with a `:kind` value it doesn't
247    /// apply to — e.g. `:s @alice :kind pro`. Per
248    /// `read-protocol.md` § 4.1 the `:s` / `:p` predicates are
249    /// SEM / INF only; silently returning no records on this input
250    /// would violate spec invariant § 13 #9 "no silent information
251    /// loss," so v1 rejects with a typed error.
252    #[error(
253        "predicate {predicate} is not compatible with :kind {kind:?} (it applies to SEM / INF only)"
254    )]
255    IncompatiblePredicates {
256        /// The offending predicate keyword.
257        predicate: &'static str,
258        /// The `:kind` value it was combined with.
259        kind: KindFilter,
260    },
261}
262
263impl Pipeline {
264    /// Execute a single `(query ...)` form against this pipeline.
265    ///
266    /// Does NOT mutate pipeline state: no symbol allocations, no
267    /// clock advances, no log writes. Symbol references in the
268    /// query that do not resolve against the pipeline's current
269    /// symbol table produce no matches (a nonexistent symbol is
270    /// treated as "no memories at this key") rather than an error.
271    ///
272    /// # Errors
273    ///
274    /// See [`ReadError`] variants.
275    pub fn execute_query(&self, input: &str) -> Result<ReadResult, ReadError> {
276        let forms = parse::parse(input)?;
277        let count = forms.len();
278        let Some(form) = forms.into_iter().next().filter(|_| count == 1) else {
279            return Err(ReadError::NotASingleQuery { count });
280        };
281        let UnboundForm::Query { selector, keywords } = form else {
282            return Err(ReadError::NotAQuery);
283        };
284        // 7.1 rejects a positional selector — spec § 4 doesn't
285        // define selector semantics concretely yet.
286        if selector.is_some() {
287            return Err(ReadError::UnsupportedPredicate {
288                predicate: "selector",
289            });
290        }
291        execute(self, keywords)
292    }
293}
294
295fn execute(pipeline: &Pipeline, keywords: parse::KeywordArgs) -> Result<ReadResult, ReadError> {
296    let predicates = parse_predicates(pipeline, keywords)?;
297
298    // Snapshot boundary — read-protocol.md § 9 snapshot isolation.
299    // An empty pipeline has no commits yet; by § 9 convention every
300    // clock defaults to that watermark, so there's nothing to match.
301    let Some(query_committed_at) = pipeline.last_committed_at() else {
302        return Ok(empty_result(&predicates));
303    };
304
305    let effective_as_of = predicates.as_of.unwrap_or(query_committed_at);
306    let effective_as_committed = predicates.as_committed.unwrap_or(query_committed_at);
307    let temporal = TemporalQuery::bi_temporal(effective_as_of, effective_as_committed);
308
309    check_predicate_compatibility(&predicates)?;
310
311    let candidates = collect_candidates(pipeline, &predicates, temporal);
312    let (kept, filtered) = apply_filters(candidates, pipeline.table(), &predicates);
313    let mut flags = compute_flags(&kept, pipeline, query_committed_at, &predicates);
314
315    let limit_value = predicates.limit.unwrap_or(DEFAULT_LIMIT);
316    let (records, flags_with_limit) = apply_limit(kept, limit_value, flags);
317    flags = flags_with_limit;
318
319    let framings = if predicates.show_framing {
320        records
321            .iter()
322            .map(|r| compute_framing(r, pipeline, predicates.as_of, query_committed_at))
323            .collect()
324    } else {
325        Vec::new()
326    };
327
328    Ok(ReadResult {
329        records,
330        framings,
331        filtered,
332        flags,
333        as_of: effective_as_of,
334        as_committed: effective_as_committed,
335        query_committed_at,
336    })
337}
338
339/// `:s` and `:p` are SEM / INF only per read-protocol.md § 4.1.
340/// Combining them with `:kind pro` or `:kind epi` is a spec
341/// violation — reject loudly rather than silently returning an
342/// empty result (spec § 13 #9 "no silent information loss").
343fn check_predicate_compatibility(predicates: &Predicates) -> Result<(), ReadError> {
344    let Some(k) = predicates.kind else {
345        return Ok(());
346    };
347    if !matches!(k, KindFilter::Pro | KindFilter::Epi) {
348        return Ok(());
349    }
350    if !matches!(predicates.subject, SymbolFilter::Absent) {
351        return Err(ReadError::IncompatiblePredicates {
352            predicate: "s",
353            kind: k,
354        });
355    }
356    if !matches!(predicates.predicate, SymbolFilter::Absent) {
357        return Err(ReadError::IncompatiblePredicates {
358            predicate: "p",
359            kind: k,
360        });
361    }
362    Ok(())
363}
364
365/// Gather candidate records via the temporal resolver, plus projected
366/// records (which bypass the resolver because their `valid_at` is
367/// future) when `:include_projected true`. Applies the Episode-scoped
368/// predicate (`:in_episode` / `:after_episode` / `:before_episode`)
369/// last as a `committed_at`-based prune.
370fn collect_candidates(
371    pipeline: &Pipeline,
372    predicates: &Predicates,
373    temporal: TemporalQuery,
374) -> Vec<CanonicalRecord> {
375    // A NoMatch on either symbol predicate guarantees an empty
376    // result — the agent referenced an unknown name, which the spec
377    // treats as "no memories at this key" rather than an error.
378    if predicates.subject.is_no_match() || predicates.predicate.is_no_match() {
379        return Vec::new();
380    }
381    // Similarly, an Episode-scoped predicate that names an unknown
382    // Episode returns empty rather than erroring.
383    if predicates
384        .episode
385        .as_ref()
386        .is_some_and(EpisodeFilter::is_empty_set)
387    {
388        return Vec::new();
389    }
390    let mut candidates: Vec<CanonicalRecord> = Vec::new();
391    if matches!(predicates.kind, None | Some(KindFilter::Sem)) {
392        collect_semantic(
393            pipeline,
394            predicates.subject,
395            predicates.predicate,
396            temporal,
397            &mut candidates,
398        );
399    }
400    if matches!(predicates.kind, None | Some(KindFilter::Pro))
401        && matches!(predicates.subject, SymbolFilter::Absent)
402        && matches!(predicates.predicate, SymbolFilter::Absent)
403    {
404        collect_procedural(pipeline, temporal, &mut candidates);
405    }
406    if matches!(predicates.kind, None | Some(KindFilter::Inf)) {
407        collect_inferential(
408            pipeline,
409            predicates.subject,
410            predicates.predicate,
411            temporal,
412            &mut candidates,
413        );
414    }
415    if predicates.include_projected {
416        collect_projected(
417            pipeline,
418            predicates.kind,
419            predicates.subject,
420            predicates.predicate,
421            &mut candidates,
422        );
423    }
424    if let Some(filter) = predicates.episode.as_ref() {
425        candidates.retain(|r| filter.matches(r.committed_at()));
426    }
427    candidates.sort_by_key(CanonicalRecord::committed_at);
428    candidates
429}
430
431/// Apply the retired-symbol and projected filters. Dropped records
432/// surface in `filtered` only when `:explain_filtered true`;
433/// otherwise they drop silently per spec § 11.1.
434fn apply_filters(
435    candidates: Vec<CanonicalRecord>,
436    table: &SymbolTable,
437    predicates: &Predicates,
438) -> (Vec<CanonicalRecord>, Vec<FilteredMemory>) {
439    let mut kept: Vec<CanonicalRecord> = Vec::with_capacity(candidates.len());
440    let mut filtered: Vec<FilteredMemory> = Vec::new();
441    for record in candidates {
442        let retired_ref = record_references_retired(&record, table);
443        let projected = record_is_projected(&record);
444
445        if retired_ref && !predicates.include_retired {
446            if predicates.explain_filtered {
447                filtered.push(FilteredMemory {
448                    record,
449                    reason: FilterReason::RetiredSymbolExcluded,
450                });
451            }
452            continue;
453        }
454        if projected && !predicates.include_projected {
455            if predicates.explain_filtered {
456                filtered.push(FilteredMemory {
457                    record,
458                    reason: FilterReason::ProjectedExcluded,
459                });
460            }
461            continue;
462        }
463        kept.push(record);
464    }
465    (kept, filtered)
466}
467
468/// Compute flags over the kept record set. Flags describe the
469/// returned rows; they don't double-count filtered drops. The
470/// confidence comparison uses *effective* confidence (spec § 3):
471/// decay-adjusted per the pipeline's `DecayConfig` and relative to
472/// the query's snapshot time.
473fn compute_flags(
474    kept: &[CanonicalRecord],
475    pipeline: &Pipeline,
476    query_committed_at: ClockTime,
477    predicates: &Predicates,
478) -> ReadFlags {
479    let mut flags = ReadFlags::empty();
480    let table = pipeline.table();
481    for record in kept {
482        if record_references_retired(record, table) {
483            flags = flags.with(ReadFlags::STALE_SYMBOL);
484        }
485        if record_is_projected(record) {
486            flags = flags.with(ReadFlags::PROJECTED_PRESENT);
487        }
488        let effective = record_effective_confidence(record, pipeline, query_committed_at);
489        if effective < predicates.confidence_threshold {
490            flags = flags.with(ReadFlags::LOW_CONFIDENCE);
491        }
492    }
493    if predicates.explain_filtered {
494        flags = flags.with(ReadFlags::EXPLAIN_FILTERED_ACTIVE);
495    }
496    flags
497}
498
499/// The empty-pipeline result — no memories exist to match, but the
500/// result must still carry the query's effective clocks and the
501/// correct `EXPLAIN_FILTERED_ACTIVE` flag if the toggle was set.
502fn empty_result(predicates: &Predicates) -> ReadResult {
503    let mut flags = ReadFlags::empty();
504    if predicates.explain_filtered {
505        flags = flags.with(ReadFlags::EXPLAIN_FILTERED_ACTIVE);
506    }
507    ReadResult {
508        records: Vec::new(),
509        framings: Vec::new(),
510        filtered: Vec::new(),
511        flags,
512        as_of: predicates.as_of.unwrap_or_else(epoch_zero),
513        as_committed: predicates.as_committed.unwrap_or_else(epoch_zero),
514        query_committed_at: epoch_zero(),
515    }
516}
517
518/// Walk the pipeline's Semantic history, keep each record the
519/// resolver would classify as authoritative at `temporal`. When
520/// `s` / `p` are set, only memories at that specific `(s, p)` can
521/// match — the resolver already tie-breaks within a pair.
522///
523/// When `s` / `p` are unset we iterate distinct `(s, p)` pairs that
524/// appear in the history and run the resolver once per pair.
525fn collect_semantic(
526    pipeline: &Pipeline,
527    s: SymbolFilter,
528    p: SymbolFilter,
529    temporal: TemporalQuery,
530    out: &mut Vec<CanonicalRecord>,
531) {
532    if let (Some(sub), Some(pred)) = (s.as_id(), p.as_id()) {
533        if let Some(rec) = resolver::resolve_semantic(pipeline, sub, pred, temporal) {
534            out.push(CanonicalRecord::Sem(rec));
535        }
536        return;
537    }
538    // Enumerate distinct (s, p) pairs, applying the filter (which
539    // may be Absent on one axis) as we go.
540    let mut seen: std::collections::BTreeSet<(SymbolId, SymbolId)> =
541        std::collections::BTreeSet::new();
542    for record in pipeline.semantic_records() {
543        if !s.matches(record.s) || !p.matches(record.p) {
544            continue;
545        }
546        let key = (record.s, record.p);
547        if !seen.insert(key) {
548            continue;
549        }
550        if let Some(rec) = resolver::resolve_semantic(pipeline, key.0, key.1, temporal) {
551            out.push(CanonicalRecord::Sem(rec));
552        }
553    }
554}
555
556fn collect_procedural(
557    pipeline: &Pipeline,
558    temporal: TemporalQuery,
559    out: &mut Vec<CanonicalRecord>,
560) {
561    let mut seen_rules: std::collections::BTreeSet<SymbolId> = std::collections::BTreeSet::new();
562    for record in pipeline.procedural_records() {
563        if !seen_rules.insert(record.rule_id) {
564            continue;
565        }
566        if let Some(rec) = resolver::resolve_procedural(pipeline, record.rule_id, temporal) {
567            out.push(CanonicalRecord::Pro(rec));
568        }
569    }
570}
571
572/// Walk the pipeline's Inferential history, keep each record the
573/// resolver classifies as authoritative at `temporal`. Mirrors
574/// [`collect_semantic`]: when `s` / `p` are both pinned we resolve
575/// the single `(s, p)` bucket; otherwise we enumerate distinct
576/// `(s, p)` pairs from the history and run the resolver once per
577/// pair.
578fn collect_inferential(
579    pipeline: &Pipeline,
580    s: SymbolFilter,
581    p: SymbolFilter,
582    temporal: TemporalQuery,
583    out: &mut Vec<CanonicalRecord>,
584) {
585    if let (Some(sub), Some(pred)) = (s.as_id(), p.as_id()) {
586        if let Some(rec) = resolver::resolve_inferential(pipeline, sub, pred, temporal) {
587            out.push(CanonicalRecord::Inf(rec));
588        }
589        return;
590    }
591    let mut seen: std::collections::BTreeSet<(SymbolId, SymbolId)> =
592        std::collections::BTreeSet::new();
593    for record in pipeline.inferential_records() {
594        if !s.matches(record.s) || !p.matches(record.p) {
595            continue;
596        }
597        let key = (record.s, record.p);
598        if !seen.insert(key) {
599            continue;
600        }
601        if let Some(rec) = resolver::resolve_inferential(pipeline, key.0, key.1, temporal) {
602            out.push(CanonicalRecord::Inf(rec));
603        }
604    }
605}
606
607/// Collect memories whose `flags.projected` is set. These bypass the
608/// as-of resolver because projections are by definition future-valid.
609/// Records already present in `out` (via the resolver) are skipped to
610/// avoid duplicates.
611fn collect_projected(
612    pipeline: &Pipeline,
613    kind: Option<KindFilter>,
614    s: SymbolFilter,
615    p: SymbolFilter,
616    out: &mut Vec<CanonicalRecord>,
617) {
618    let existing: std::collections::BTreeSet<SymbolId> = out
619        .iter()
620        .filter_map(|r| match r {
621            CanonicalRecord::Sem(sem) => Some(sem.memory_id),
622            CanonicalRecord::Pro(pro) => Some(pro.memory_id),
623            CanonicalRecord::Inf(inf) => Some(inf.memory_id),
624            CanonicalRecord::Epi(epi) => Some(epi.memory_id),
625            _ => None,
626        })
627        .collect();
628    if matches!(kind, None | Some(KindFilter::Sem)) {
629        for record in pipeline.semantic_records() {
630            if !record.flags.projected {
631                continue;
632            }
633            if !s.matches(record.s) || !p.matches(record.p) {
634                continue;
635            }
636            if existing.contains(&record.memory_id) {
637                continue;
638            }
639            out.push(CanonicalRecord::Sem(record.clone()));
640        }
641    }
642    // Procedural records cannot carry `projected` after the wire-format
643    // split (ir-canonical-form.md § 5): `ProRecord` has no flags byte.
644    // Any future projection semantics for Pro would need its own opcode.
645    if matches!(kind, None | Some(KindFilter::Inf)) {
646        for record in pipeline.inferential_records() {
647            if !record.flags.projected {
648                continue;
649            }
650            if !s.matches(record.s) || !p.matches(record.p) {
651                continue;
652            }
653            if existing.contains(&record.memory_id) {
654                continue;
655            }
656            out.push(CanonicalRecord::Inf(record.clone()));
657        }
658    }
659}
660
661/// Apply `:limit` — truncate the record set and set the `TRUNCATED`
662/// flag on top of `existing_flags` if the limit was hit.
663fn apply_limit(
664    records: Vec<CanonicalRecord>,
665    limit: usize,
666    existing_flags: ReadFlags,
667) -> (Vec<CanonicalRecord>, ReadFlags) {
668    if records.len() > limit {
669        let truncated: Vec<_> = records.into_iter().take(limit).collect();
670        (truncated, existing_flags.with(ReadFlags::TRUNCATED))
671    } else {
672        (records, existing_flags)
673    }
674}
675
676/// True if any `SymbolId` the record references is currently retired
677/// in `table`. Per spec § 7 the check covers top-level symbol fields
678/// plus `Value::Symbol` payloads inside object / trigger / action /
679/// precondition slots. Inferential parent IDs (`derived_from`) are
680/// memory-id symbols; retirement on memory IDs is not meaningful in
681/// v1, but we still check them for spec completeness.
682fn record_references_retired(record: &CanonicalRecord, table: &SymbolTable) -> bool {
683    match record {
684        CanonicalRecord::Sem(r) => sem_has_retired_ref(r, table),
685        CanonicalRecord::Epi(r) => epi_has_retired_ref(r, table),
686        CanonicalRecord::Pro(r) => pro_has_retired_ref(r, table),
687        CanonicalRecord::Inf(r) => inf_has_retired_ref(r, table),
688        // Edge / checkpoint / symbol-event records are never returned
689        // as memory records, so this arm is unreachable in practice.
690        _ => false,
691    }
692}
693
694fn sem_has_retired_ref(r: &SemRecord, table: &SymbolTable) -> bool {
695    table.is_retired(r.s)
696        || table.is_retired(r.p)
697        || table.is_retired(r.source)
698        || value_has_retired_symbol(&r.o, table)
699}
700
701fn epi_has_retired_ref(r: &EpiRecord, table: &SymbolTable) -> bool {
702    table.is_retired(r.event_id)
703        || table.is_retired(r.kind)
704        || table.is_retired(r.location)
705        || table.is_retired(r.source)
706        || r.participants.iter().any(|p| table.is_retired(*p))
707}
708
709fn pro_has_retired_ref(r: &ProRecord, table: &SymbolTable) -> bool {
710    table.is_retired(r.rule_id)
711        || table.is_retired(r.scope)
712        || table.is_retired(r.source)
713        || value_has_retired_symbol(&r.trigger, table)
714        || value_has_retired_symbol(&r.action, table)
715        || r.precondition
716            .as_ref()
717            .is_some_and(|v| value_has_retired_symbol(v, table))
718}
719
720fn inf_has_retired_ref(r: &InfRecord, table: &SymbolTable) -> bool {
721    table.is_retired(r.s)
722        || table.is_retired(r.p)
723        || table.is_retired(r.method)
724        || value_has_retired_symbol(&r.o, table)
725        || r.derived_from.iter().any(|p| table.is_retired(*p))
726}
727
728fn value_has_retired_symbol(v: &Value, table: &SymbolTable) -> bool {
729    matches!(v, Value::Symbol(id) if table.is_retired(*id))
730}
731
732/// True if the record carries the `projected` flag. Only Sem and Inf
733/// carry a flags byte on the wire (ir-canonical-form.md § 5); Epi and
734/// Pro have no flags and therefore cannot be projected.
735fn record_is_projected(record: &CanonicalRecord) -> bool {
736    match record {
737        CanonicalRecord::Sem(r) => r.flags.projected,
738        CanonicalRecord::Inf(r) => r.flags.projected,
739        _ => false,
740    }
741}
742
743/// Compute the effective (decay-adjusted) confidence for a memory
744/// record at the query's snapshot time.
745///
746/// The confidence-decay spec (§ 3) defines effective as
747/// `stored × decay_factor(elapsed, half_life)` for non-Inferential
748/// memories, modulo pin / authoritative short-circuits (not yet
749/// wired — see PLAN.md Step 4). For Inferential memories the spec
750/// says decay composes from current parent effective confidences;
751/// that composition lands with the Inferential resolver, so for
752/// now we return stored.
753///
754/// `elapsed_ms` is `query_committed_at - valid_at` in milliseconds.
755/// The source kind is looked up from the source symbol's canonical
756/// name via [`source_kind_from_name`]; unknown symbols fall back to
757/// `SourceKind::Observation`, the same default the semantic stage
758/// uses at write time.
759fn record_effective_confidence(
760    record: &CanonicalRecord,
761    pipeline: &Pipeline,
762    query_committed_at: ClockTime,
763) -> Confidence {
764    let table = pipeline.table();
765    let decay_config = pipeline.decay_config();
766    let (stored, memory_kind, source_id, valid_at) = match record {
767        CanonicalRecord::Sem(r) => (
768            r.confidence,
769            MemoryKindTag::Semantic,
770            r.source,
771            r.clocks.valid_at,
772        ),
773        CanonicalRecord::Epi(r) => (r.confidence, MemoryKindTag::Episodic, r.source, r.at_time),
774        CanonicalRecord::Pro(r) => (
775            r.confidence,
776            MemoryKindTag::Procedural,
777            r.source,
778            r.clocks.valid_at,
779        ),
780        // Inferential decay composes from parents; defer to the
781        // Inferential resolver work (#29). For now use stored as a
782        // safe upper bound.
783        CanonicalRecord::Inf(r) => return r.confidence,
784        _ => return Confidence::ONE,
785    };
786
787    let source_kind = table.entry(source_id).map_or(SourceKind::Observation, |e| {
788        source_kind_from_name(e.canonical_name.as_str())
789    });
790    let elapsed_ms = query_committed_at
791        .as_millis()
792        .saturating_sub(valid_at.as_millis());
793    let memory_id = record_memory_id(record);
794    let pinned = memory_id.is_some_and(|id| pipeline.is_pinned(id));
795    let authoritative = memory_id.is_some_and(|id| pipeline.is_authoritative(id));
796    let flags = DecayFlags {
797        pinned,
798        authoritative,
799    };
800    effective_confidence(
801        stored,
802        elapsed_ms,
803        memory_kind,
804        source_kind,
805        flags,
806        decay_config,
807    )
808}
809
810/// Per-record framing. Priority order (spec § 5):
811/// - `Projected` if the record has `flags.projected` set — dominates
812///   other classifications because the agent needs to know the
813///   memory is intent, not current truth.
814/// - `Authoritative { set_by }` if the record is pinned or
815///   operator-authoritative. Pin takes precedence over operator
816///   flag when both are set (it's the narrower, agent-owned source
817///   — the operator flag is broader).
818/// - `Historical` if the query asked for `:as_of T` with T predating
819///   `query_committed_at`.
820/// - `Advisory` otherwise.
821fn compute_framing(
822    record: &CanonicalRecord,
823    pipeline: &Pipeline,
824    as_of: Option<ClockTime>,
825    query_committed_at: ClockTime,
826) -> Framing {
827    if record_is_projected(record) {
828        return Framing::Projected;
829    }
830    if let Some(mem_id) = record_memory_id(record) {
831        if pipeline.is_pinned(mem_id) {
832            return Framing::Authoritative {
833                set_by: FramingSource::AgentPinned,
834            };
835        }
836        if pipeline.is_authoritative(mem_id) {
837            return Framing::Authoritative {
838                set_by: FramingSource::OperatorAuthoritative,
839            };
840        }
841    }
842    if as_of.is_some_and(|t| t < query_committed_at) {
843        return Framing::Historical;
844    }
845    Framing::Advisory
846}
847
848/// Extract the `memory_id` from a record — used to check the
849/// pipeline's pin / authoritative sets.
850fn record_memory_id(record: &CanonicalRecord) -> Option<SymbolId> {
851    match record {
852        CanonicalRecord::Sem(r) => Some(r.memory_id),
853        CanonicalRecord::Epi(r) => Some(r.memory_id),
854        CanonicalRecord::Pro(r) => Some(r.memory_id),
855        CanonicalRecord::Inf(r) => Some(r.memory_id),
856        _ => None,
857    }
858}
859
/// Value used for `:limit` when the query does not supply one
/// (`read-protocol.md` § 4.2).
const DEFAULT_LIMIT: usize = 1_000;
862
863/// Parsed keyword arguments, resolved to pipeline state.
864///
865/// The four boolean toggles mirror the spec's independent read-side
866/// knobs (`include_retired`, `include_projected`, `explain_filtered`,
867/// `show_framing`). Collapsing them into a state machine would hide
868/// the one-to-one mapping with spec § 4.1, so we allow the pedantic
869/// bool-count here.
870#[allow(clippy::struct_excessive_bools)]
871struct Predicates {
872    kind: Option<KindFilter>,
873    subject: SymbolFilter,
874    predicate: SymbolFilter,
875    as_of: Option<ClockTime>,
876    as_committed: Option<ClockTime>,
877    limit: Option<usize>,
878    include_retired: bool,
879    include_projected: bool,
880    confidence_threshold: Confidence,
881    explain_filtered: bool,
882    show_framing: bool,
883    episode: Option<EpisodeFilter>,
884}
885
886/// How an Episode-scoped read predicate restricts the result set.
887/// See `read-protocol.md` § 4.1 and 6.
888#[derive(Clone, Debug, PartialEq, Eq)]
889enum EpisodeFilter {
890    /// Keep memories whose `committed_at` equals `at` — i.e. whose
891    /// Episode is the one the user specified.
892    In { at: ClockTime },
893    /// Keep memories whose `committed_at` > `at`.
894    After { at: ClockTime },
895    /// Keep memories whose `committed_at` < `at`.
896    Before { at: ClockTime },
897    /// Keep memories whose `committed_at` equals any Episode in
898    /// the chain `@E → parent → grandparent → …`. Backed by
899    /// `Pipeline::episode_chain` walking `parent_episode`.
900    Chain { ats: Vec<ClockTime> },
901    /// Predicate referenced an Episode symbol that the pipeline has
902    /// no record of — yields an empty result set rather than an
903    /// error, matching the `NoMatch` semantics of `:s` / `:p`.
904    UnknownEpisode,
905}
906
907impl EpisodeFilter {
908    fn matches(&self, committed_at: ClockTime) -> bool {
909        match self {
910            Self::In { at } => committed_at == *at,
911            Self::After { at } => committed_at > *at,
912            Self::Before { at } => committed_at < *at,
913            Self::Chain { ats } => ats.contains(&committed_at),
914            Self::UnknownEpisode => false,
915        }
916    }
917
918    fn is_empty_set(&self) -> bool {
919        matches!(self, Self::UnknownEpisode)
920    }
921}
922
923/// Tri-state for a symbol-valued predicate:
924/// - `Absent` — the predicate wasn't set; don't filter.
925/// - `Match(id)` — the predicate resolved to `id`; filter to memories at that key.
926/// - `NoMatch` — the predicate was set but its symbol doesn't exist
927///   in the workspace; the result is necessarily empty.
928#[derive(Copy, Clone, Debug, PartialEq, Eq)]
929enum SymbolFilter {
930    Absent,
931    Match(SymbolId),
932    NoMatch,
933}
934
935impl SymbolFilter {
936    fn from_lookup(resolved: Option<SymbolId>, set: bool) -> Self {
937        match (set, resolved) {
938            (false, _) => Self::Absent,
939            (true, Some(id)) => Self::Match(id),
940            (true, None) => Self::NoMatch,
941        }
942    }
943
944    /// `true` if this filter guarantees zero matches.
945    fn is_no_match(self) -> bool {
946        matches!(self, Self::NoMatch)
947    }
948
949    fn matches(self, id: SymbolId) -> bool {
950        match self {
951            Self::Absent => true,
952            Self::Match(expected) => id == expected,
953            Self::NoMatch => false,
954        }
955    }
956
957    fn as_id(self) -> Option<SymbolId> {
958        // `NoMatch` is short-circuited by `guaranteed_empty` in the
959        // caller before we reach `collect_semantic`, so in practice
960        // the `_ => None` arm only fires for `Absent`. Keeping it
961        // total rather than `unreachable!()` so a future caller that
962        // skips the short-circuit can't panic.
963        match self {
964            Self::Match(id) => Some(id),
965            _ => None,
966        }
967    }
968}
969
970fn parse_predicates(
971    pipeline: &Pipeline,
972    keywords: parse::KeywordArgs,
973) -> Result<Predicates, ReadError> {
974    let table = pipeline.table();
975    let mut out = Predicates {
976        kind: None,
977        subject: SymbolFilter::Absent,
978        predicate: SymbolFilter::Absent,
979        as_of: None,
980        as_committed: None,
981        limit: None,
982        include_retired: false,
983        include_projected: false,
984        confidence_threshold: default_confidence_threshold(),
985        explain_filtered: false,
986        show_framing: false,
987        episode: None,
988    };
989    let mut debug_mode = false;
990
991    for (key, value) in keywords {
992        match key.as_str() {
993            "kind" => out.kind = Some(parse_kind(&value)?),
994            "s" => {
995                out.subject = SymbolFilter::from_lookup(resolve_symbol(table, &value, "s")?, true);
996            }
997            "p" => {
998                out.predicate =
999                    SymbolFilter::from_lookup(resolve_symbol(table, &value, "p")?, true);
1000            }
1001            "as_of" => out.as_of = Some(parse_timestamp(&value, "as_of")?),
1002            "as_committed" => out.as_committed = Some(parse_timestamp(&value, "as_committed")?),
1003            "limit" => out.limit = Some(parse_limit(&value)?),
1004            "include_retired" => out.include_retired = parse_bool(&value, "include_retired")?,
1005            "include_projected" => out.include_projected = parse_bool(&value, "include_projected")?,
1006            "confidence_threshold" => {
1007                out.confidence_threshold = parse_confidence(&value)?;
1008            }
1009            "explain_filtered" => out.explain_filtered = parse_bool(&value, "explain_filtered")?,
1010            "show_framing" => out.show_framing = parse_bool(&value, "show_framing")?,
1011            "debug_mode" => debug_mode = parse_bool(&value, "debug_mode")?,
1012            "in_episode" | "after_episode" | "before_episode" | "episode_chain" => {
1013                // Only one Episode-scoped predicate can be active at
1014                // a time; rejecting the combination is friendlier
1015                // than letting the last one silently overwrite.
1016                if out.episode.is_some() {
1017                    return Err(ReadError::InvalidPredicate {
1018                        keyword: "in_episode / after_episode / before_episode / episode_chain",
1019                        reason: "at most one Episode-scoped predicate per query".into(),
1020                    });
1021                }
1022                out.episode = Some(parse_episode_filter(pipeline, &key, &value)?);
1023            }
1024            // Every other predicate — known-but-unwired or truly
1025            // unknown — surfaces as `UnsupportedPredicate`. The
1026            // `static_key_name` helper returns a stable static name
1027            // for the known ones and `"unknown_predicate"` otherwise.
1028            _ => {
1029                return Err(ReadError::UnsupportedPredicate {
1030                    predicate: static_key_name(&key),
1031                });
1032            }
1033        }
1034    }
1035
1036    // `:debug_mode true` implies both surfacing toggles per spec
1037    // § 4.1. Explicit per-toggle values can still turn things on;
1038    // debug mode is pure disjunction (OR), never disables.
1039    if debug_mode {
1040        out.explain_filtered = true;
1041        out.show_framing = true;
1042    }
1043
1044    Ok(out)
1045}
1046
1047/// Resolve `:in_episode @E` / `:after_episode @E` /
1048/// `:before_episode @E` / `:episode_chain @E` against the pipeline's
1049/// registered Episodes. An unknown-to-pipeline symbol yields
1050/// `UnknownEpisode`, producing an empty result set (matching the
1051/// `:s` / `:p` `NoMatch` semantics per spec § 4.1).
1052fn parse_episode_filter(
1053    pipeline: &Pipeline,
1054    keyword: &str,
1055    value: &RawValue,
1056) -> Result<EpisodeFilter, ReadError> {
1057    let static_key = match keyword {
1058        "in_episode" => "in_episode",
1059        "after_episode" => "after_episode",
1060        "before_episode" => "before_episode",
1061        "episode_chain" => "episode_chain",
1062        _ => "unknown_predicate",
1063    };
1064    let Some(id) = resolve_symbol(pipeline.table(), value, static_key)? else {
1065        return Ok(EpisodeFilter::UnknownEpisode);
1066    };
1067    let Some(at) = pipeline.episode_committed_at(id) else {
1068        return Ok(EpisodeFilter::UnknownEpisode);
1069    };
1070    Ok(match keyword {
1071        "in_episode" => EpisodeFilter::In { at },
1072        "after_episode" => EpisodeFilter::After { at },
1073        "before_episode" => EpisodeFilter::Before { at },
1074        // Precondition guaranteed by caller — only these four
1075        // keywords reach this helper.
1076        _ => {
1077            let ats: Vec<ClockTime> = pipeline
1078                .episode_chain(id)
1079                .filter_map(|ep| pipeline.episode_committed_at(ep))
1080                .collect();
1081            EpisodeFilter::Chain { ats }
1082        }
1083    })
1084}
1085
1086/// Default `:confidence_threshold` per spec § 4.2 — 0.5.
1087fn default_confidence_threshold() -> Confidence {
1088    // 0.5 is exactly representable in u16 fixed-point
1089    // (u16::MAX / 2 rounded), so try_from_f32 cannot fail.
1090    #[allow(clippy::expect_used)]
1091    Confidence::try_from_f32(0.5).expect("0.5 is a valid Confidence")
1092}
1093
1094fn parse_kind(value: &RawValue) -> Result<KindFilter, ReadError> {
1095    let name = match value {
1096        RawValue::Bareword(s) => s.as_str(),
1097        RawValue::RawSymbol(RawSymbolName { name, .. }) => name.as_str(),
1098        _ => {
1099            return Err(ReadError::InvalidPredicate {
1100                keyword: "kind",
1101                reason: "expected a bareword (sem, pro, epi, inf)".into(),
1102            })
1103        }
1104    };
1105    match name {
1106        "sem" => Ok(KindFilter::Sem),
1107        "pro" => Ok(KindFilter::Pro),
1108        "epi" => Ok(KindFilter::Epi),
1109        "inf" => Ok(KindFilter::Inf),
1110        other => Err(ReadError::InvalidKind {
1111            got: other.to_string(),
1112        }),
1113    }
1114}
1115
1116/// Resolve a symbol-valued predicate (`:s @X`, `:p @Y`) against the
1117/// pipeline's table. Unknown symbols return `Ok(None)` — the query
1118/// simply won't match any memories at that key, which is the
1119/// correct no-crash read semantic.
1120fn resolve_symbol(
1121    table: &SymbolTable,
1122    value: &RawValue,
1123    keyword: &'static str,
1124) -> Result<Option<SymbolId>, ReadError> {
1125    let name: &str = match value {
1126        RawValue::RawSymbol(sym) => sym.as_str(),
1127        RawValue::TypedSymbol { name, .. } => name.as_str(),
1128        RawValue::Bareword(text) => text.as_str(),
1129        _ => {
1130            return Err(ReadError::InvalidPredicate {
1131                keyword,
1132                reason: "expected a symbol reference like @name".into(),
1133            })
1134        }
1135    };
1136    Ok(table.lookup(name))
1137}
1138
1139fn parse_timestamp(value: &RawValue, keyword: &'static str) -> Result<ClockTime, ReadError> {
1140    match value {
1141        RawValue::Timestamp(t) => Ok(*t),
1142        _ => Err(ReadError::InvalidPredicate {
1143            keyword,
1144            reason: "expected an ISO-8601 timestamp".into(),
1145        }),
1146    }
1147}
1148
1149fn parse_limit(value: &RawValue) -> Result<usize, ReadError> {
1150    match value {
1151        RawValue::Integer(n) if *n >= 0 => {
1152            usize::try_from(*n).map_err(|_| ReadError::InvalidPredicate {
1153                keyword: "limit",
1154                reason: "limit exceeds usize".into(),
1155            })
1156        }
1157        _ => Err(ReadError::InvalidPredicate {
1158            keyword: "limit",
1159            reason: "expected a non-negative integer".into(),
1160        }),
1161    }
1162}
1163
1164fn parse_bool(value: &RawValue, keyword: &'static str) -> Result<bool, ReadError> {
1165    match value {
1166        RawValue::Boolean(b) => Ok(*b),
1167        _ => Err(ReadError::InvalidPredicate {
1168            keyword,
1169            reason: "expected a boolean".into(),
1170        }),
1171    }
1172}
1173
1174fn parse_confidence(value: &RawValue) -> Result<Confidence, ReadError> {
1175    let f = match value {
1176        RawValue::Float(f) => *f,
1177        // Accept `1` / `0` as shorthand for the extremes.
1178        RawValue::Integer(n) if *n == 0 || *n == 1 => f64::from(i32::try_from(*n).unwrap_or(0)),
1179        _ => {
1180            return Err(ReadError::InvalidPredicate {
1181                keyword: "confidence_threshold",
1182                reason: "expected a float in [0.0, 1.0]".into(),
1183            });
1184        }
1185    };
1186    #[allow(clippy::cast_possible_truncation)]
1187    Confidence::try_from_f32(f as f32).map_err(|_| ReadError::InvalidPredicate {
1188        keyword: "confidence_threshold",
1189        reason: "expected a float in [0.0, 1.0]".into(),
1190    })
1191}
1192
1193/// Fallback used for the `None`-pipeline branch. The only failing
1194/// input to `ClockTime::try_from_millis` is `u64::MAX`; `0` always
1195/// succeeds. The `expect` lives here (rather than bubbling up the
1196/// `Result`) because an empty pipeline's `ReadResult` already
1197/// carries zero records — the clocks are diagnostic, not load-
1198/// bearing, so there's no information loss in panicking on an
1199/// impossible branch.
1200#[allow(clippy::expect_used)]
1201fn epoch_zero() -> ClockTime {
1202    ClockTime::try_from_millis(0).expect("0ms is always a valid ClockTime")
1203}
1204
/// Translate a dynamic keyword into the `&'static str` stored in the
/// `UnsupportedPredicate` variant.
///
/// Only *known-but-unwired* keywords keep their own name; wired
/// keywords are handled upstream in `parse_predicates` and never
/// need this mapping. Every other keyword collapses to the generic
/// `"unknown_predicate"` label.
fn static_key_name(key: &str) -> &'static str {
    /// Keywords that are recognised but not implemented yet.
    const KNOWN_UNWIRED: [&str; 3] = ["o", "read_after", "timeout_ms"];

    KNOWN_UNWIRED
        .iter()
        .copied()
        .find(|known| *known == key)
        .unwrap_or("unknown_predicate")
}
1218
1219#[cfg(test)]
1220mod tests {
1221    use super::*;
1222
1223    fn now() -> ClockTime {
1224        ClockTime::try_from_millis(1_713_350_400_000).expect("non-sentinel")
1225    }
1226
1227    fn compile(pipe: &mut Pipeline, src: &str) {
1228        pipe.compile_batch(src, now()).expect("compile");
1229    }
1230
1231    const SEM_ALICE: &str = "(sem @alice @knows @bob :src @observation :c 0.8 :v 2024-01-15)";
1232    // Distinct (s, p) pair so both memories stay current (no supersession conflict).
1233    const SEM_TRUSTS: &str = "(sem @alice @trusts @carol :src @observation :c 0.8 :v 2024-01-15)";
1234    const PRO_RULE: &str = r#"(pro @rule_route "agent_write" "route_via_librarian"
1235        :scp @mimir :src @policy :c 1.0)"#;
1236
1237    #[test]
1238    fn empty_pipeline_returns_empty_result() {
1239        let pipe = Pipeline::new();
1240        let got = pipe.execute_query("(query :s @alice :p @knows)").unwrap();
1241        assert!(got.records.is_empty());
1242        assert_eq!(got.flags, ReadFlags::empty());
1243    }
1244
1245    #[test]
1246    fn exact_sp_match_returns_current_memory() {
1247        let mut pipe = Pipeline::new();
1248        compile(&mut pipe, SEM_ALICE);
1249        let got = pipe
1250            .execute_query("(query :s @alice :p @knows)")
1251            .expect("query");
1252        assert_eq!(got.records.len(), 1);
1253        let CanonicalRecord::Sem(sem) = &got.records[0] else {
1254            panic!("expected Sem");
1255        };
1256        let alice = pipe.table().lookup("alice").unwrap();
1257        assert_eq!(sem.s, alice);
1258    }
1259
1260    #[test]
1261    fn unknown_symbol_returns_empty_not_error() {
1262        let mut pipe = Pipeline::new();
1263        compile(&mut pipe, SEM_ALICE);
1264        let got = pipe
1265            .execute_query("(query :s @nonexistent :p @knows)")
1266            .expect("unknown symbol is OK");
1267        assert!(got.records.is_empty());
1268    }
1269
1270    #[test]
1271    fn unscoped_query_returns_current_across_pairs() {
1272        let mut pipe = Pipeline::new();
1273        compile(&mut pipe, SEM_ALICE);
1274        compile(&mut pipe, SEM_TRUSTS);
1275        let got = pipe.execute_query("(query)").expect("all");
1276        assert_eq!(got.records.len(), 2);
1277    }
1278
1279    #[test]
1280    fn kind_filter_isolates_sem_from_pro() {
1281        let mut pipe = Pipeline::new();
1282        compile(&mut pipe, SEM_ALICE);
1283        compile(&mut pipe, PRO_RULE);
1284
1285        let sem_only = pipe.execute_query("(query :kind sem)").expect("sem");
1286        assert_eq!(sem_only.records.len(), 1);
1287        assert!(matches!(sem_only.records[0], CanonicalRecord::Sem(_)));
1288
1289        let pro_only = pipe.execute_query("(query :kind pro)").expect("pro");
1290        assert_eq!(pro_only.records.len(), 1);
1291        assert!(matches!(pro_only.records[0], CanonicalRecord::Pro(_)));
1292    }
1293
1294    #[test]
1295    fn kind_epi_returns_empty_in_71_scope() {
1296        let mut pipe = Pipeline::new();
1297        compile(&mut pipe, SEM_ALICE);
1298        let got = pipe.execute_query("(query :kind epi)").expect("epi");
1299        assert!(got.records.is_empty());
1300    }
1301
1302    #[test]
1303    fn invalid_kind_bareword_is_rejected() {
1304        let pipe = Pipeline::new();
1305        let err = pipe
1306            .execute_query("(query :kind bogus)")
1307            .expect_err("bad kind");
1308        assert!(matches!(err, ReadError::InvalidKind { .. }));
1309    }
1310
1311    #[test]
1312    fn as_of_past_valid_time_returns_earlier_record() {
1313        let mut pipe = Pipeline::new();
1314        compile(&mut pipe, SEM_ALICE);
1315        compile(
1316            &mut pipe,
1317            "(sem @alice @knows @carol :src @observation :c 0.8 :v 2024-03-01)",
1318        );
1319
1320        // Query at 2024-02-01 — @bob was current, @carol not yet.
1321        let got = pipe
1322            .execute_query("(query :s @alice :p @knows :as_of 2024-02-01)")
1323            .expect("as_of");
1324        let CanonicalRecord::Sem(sem) = &got.records[0] else {
1325            panic!();
1326        };
1327        let bob = pipe.table().lookup("bob").unwrap();
1328        assert!(matches!(&sem.o, crate::Value::Symbol(id) if *id == bob));
1329
1330        // Current read returns @carol.
1331        let current = pipe
1332            .execute_query("(query :s @alice :p @knows)")
1333            .expect("current");
1334        let CanonicalRecord::Sem(sem) = &current.records[0] else {
1335            panic!();
1336        };
1337        let carol = pipe.table().lookup("carol").unwrap();
1338        assert!(matches!(&sem.o, crate::Value::Symbol(id) if *id == carol));
1339    }
1340
1341    #[test]
1342    fn limit_truncates_and_sets_flag() {
1343        let mut pipe = Pipeline::new();
1344        compile(&mut pipe, SEM_ALICE);
1345        compile(&mut pipe, SEM_TRUSTS);
1346        let got = pipe.execute_query("(query :limit 1)").expect("limit");
1347        assert_eq!(got.records.len(), 1);
1348        assert!(got.flags.contains(ReadFlags::TRUNCATED));
1349    }
1350
1351    #[test]
1352    fn limit_not_hit_leaves_flag_clear() {
1353        let mut pipe = Pipeline::new();
1354        compile(&mut pipe, SEM_ALICE);
1355        let got = pipe.execute_query("(query :limit 10)").expect("limit");
1356        assert_eq!(got.records.len(), 1);
1357        assert!(!got.flags.contains(ReadFlags::TRUNCATED));
1358    }
1359
1360    #[test]
1361    fn unsupported_predicate_returns_typed_error() {
1362        let mut pipe = Pipeline::new();
1363        compile(&mut pipe, SEM_ALICE);
1364        // `:read_after` is known-but-unwired — wire-architecture
1365        // work (Step 6) needs to land before it becomes useful.
1366        let err = pipe
1367            .execute_query("(query :read_after @foo)")
1368            .expect_err("unsupported");
1369        assert!(matches!(
1370            err,
1371            ReadError::UnsupportedPredicate {
1372                predicate: "read_after"
1373            }
1374        ));
1375    }
1376
1377    #[test]
1378    fn s_predicate_with_kind_pro_is_rejected() {
1379        let mut pipe = Pipeline::new();
1380        compile(&mut pipe, SEM_ALICE);
1381        compile(&mut pipe, PRO_RULE);
1382        let err = pipe
1383            .execute_query("(query :s @alice :kind pro)")
1384            .expect_err("s + kind pro must reject");
1385        assert!(matches!(
1386            err,
1387            ReadError::IncompatiblePredicates {
1388                predicate: "s",
1389                kind: KindFilter::Pro,
1390            }
1391        ));
1392    }
1393
1394    #[test]
1395    fn p_predicate_with_kind_epi_is_rejected() {
1396        let mut pipe = Pipeline::new();
1397        compile(&mut pipe, SEM_ALICE);
1398        let err = pipe
1399            .execute_query("(query :p @knows :kind epi)")
1400            .expect_err("p + kind epi must reject");
1401        assert!(matches!(
1402            err,
1403            ReadError::IncompatiblePredicates {
1404                predicate: "p",
1405                kind: KindFilter::Epi,
1406            }
1407        ));
1408    }
1409
1410    #[test]
1411    fn write_path_query_still_unsupported() {
1412        // The (query ...) form going through `compile_batch` still
1413        // returns EmitError::Unsupported — writes don't execute
1414        // reads. This keeps the read path cleanly separated until
1415        // the read-protocol spec defines how the two interact.
1416        let mut pipe = Pipeline::new();
1417        let err = pipe
1418            .compile_batch("(query :s @alice :p @knows)", now())
1419            .expect_err("write path rejects query");
1420        assert!(matches!(
1421            err,
1422            crate::pipeline::PipelineError::Emit(crate::pipeline::EmitError::Unsupported {
1423                form: "query"
1424            })
1425        ));
1426    }
1427
1428    #[test]
1429    fn multiple_forms_rejected() {
1430        let pipe = Pipeline::new();
1431        let err = pipe
1432            .execute_query("(query) (query)")
1433            .expect_err("two forms");
1434        assert!(matches!(err, ReadError::NotASingleQuery { count: 2 }));
1435    }
1436
1437    #[test]
1438    fn non_query_form_rejected() {
1439        let pipe = Pipeline::new();
1440        let err = pipe
1441            .execute_query("(sem @a @b @c :src @observation :c 0.8 :v 2024-01-15)")
1442            .expect_err("not a query");
1443        assert!(matches!(err, ReadError::NotAQuery));
1444    }
1445
1446    // ----- 7.2: filter predicates + flags + framing + filtered -----
1447
1448    const SEM_LOW_CONF: &str = "(sem @mira @likes @tea :src @self_report :c 0.3 :v 2024-01-15)";
1449    const SEM_PROJECTED: &str =
1450        "(sem @plan @deploys @mimir :src @agent_instruction :c 0.9 :v 2099-01-01 :projected true)";
1451
1452    #[test]
1453    fn retired_symbol_default_drops_record() {
1454        let mut pipe = Pipeline::new();
1455        compile(&mut pipe, SEM_ALICE);
1456        // Retire @bob — SEM_ALICE references it as the object.
1457        compile(&mut pipe, "(retire @bob)");
1458        let got = pipe.execute_query("(query)").expect("query");
1459        assert!(
1460            got.records.is_empty(),
1461            "retired-symbol record should drop by default, got {:?}",
1462            got.records
1463        );
1464        assert!(!got.flags.contains(ReadFlags::STALE_SYMBOL));
1465    }
1466
1467    #[test]
1468    fn include_retired_keeps_record_and_sets_flag() {
1469        let mut pipe = Pipeline::new();
1470        compile(&mut pipe, SEM_ALICE);
1471        compile(&mut pipe, "(retire @bob)");
1472        let got = pipe
1473            .execute_query("(query :include_retired true)")
1474            .expect("query");
1475        assert_eq!(got.records.len(), 1);
1476        assert!(got.flags.contains(ReadFlags::STALE_SYMBOL));
1477    }
1478
1479    #[test]
1480    fn projected_default_drops_record() {
1481        let mut pipe = Pipeline::new();
1482        compile(&mut pipe, SEM_ALICE);
1483        compile(&mut pipe, SEM_PROJECTED);
1484        let got = pipe.execute_query("(query)").expect("query");
1485        // Only SEM_ALICE should come back; SEM_PROJECTED is filtered.
1486        assert_eq!(got.records.len(), 1);
1487        assert!(!got.flags.contains(ReadFlags::PROJECTED_PRESENT));
1488    }
1489
1490    #[test]
1491    fn include_projected_keeps_record_and_sets_flag() {
1492        let mut pipe = Pipeline::new();
1493        compile(&mut pipe, SEM_ALICE);
1494        compile(&mut pipe, SEM_PROJECTED);
1495        let got = pipe
1496            .execute_query("(query :include_projected true)")
1497            .expect("query");
1498        assert_eq!(got.records.len(), 2);
1499        assert!(got.flags.contains(ReadFlags::PROJECTED_PRESENT));
1500    }
1501
1502    #[test]
1503    fn low_confidence_flag_fires_on_default_threshold() {
1504        let mut pipe = Pipeline::new();
1505        compile(&mut pipe, SEM_LOW_CONF);
1506        let got = pipe.execute_query("(query)").expect("query");
1507        assert_eq!(got.records.len(), 1);
1508        // 0.3 < 0.5 default → flag set.
1509        assert!(got.flags.contains(ReadFlags::LOW_CONFIDENCE));
1510    }
1511
1512    #[test]
1513    fn confidence_threshold_override_tightens_flag() {
1514        let mut pipe = Pipeline::new();
1515        compile(&mut pipe, SEM_ALICE); // c=0.8
1516                                       // At threshold 0.9, 0.8 should flag as low.
1517        let got = pipe
1518            .execute_query("(query :confidence_threshold 0.9)")
1519            .expect("query");
1520        assert!(got.flags.contains(ReadFlags::LOW_CONFIDENCE));
1521        // Default (0.5) — 0.8 should NOT flag.
1522        let got_default = pipe.execute_query("(query)").expect("query");
1523        assert!(!got_default.flags.contains(ReadFlags::LOW_CONFIDENCE));
1524    }
1525
1526    #[test]
1527    fn confidence_threshold_flag_only_does_not_filter() {
1528        let mut pipe = Pipeline::new();
1529        compile(&mut pipe, SEM_LOW_CONF); // c=0.3
1530        let got = pipe.execute_query("(query)").expect("query");
1531        // The low-confidence record is kept; flag just warns.
1532        assert_eq!(got.records.len(), 1);
1533        assert!(got.flags.contains(ReadFlags::LOW_CONFIDENCE));
1534    }
1535
1536    #[test]
1537    fn explain_filtered_surfaces_dropped_records() {
1538        let mut pipe = Pipeline::new();
1539        compile(&mut pipe, SEM_ALICE);
1540        compile(&mut pipe, "(retire @bob)");
1541        let got = pipe
1542            .execute_query("(query :explain_filtered true)")
1543            .expect("query");
1544        assert!(got.records.is_empty());
1545        assert_eq!(got.filtered.len(), 1);
1546        assert_eq!(got.filtered[0].reason, FilterReason::RetiredSymbolExcluded);
1547        assert!(got.flags.contains(ReadFlags::EXPLAIN_FILTERED_ACTIVE));
1548    }
1549
1550    #[test]
1551    fn explain_filtered_off_keeps_filtered_empty() {
1552        let mut pipe = Pipeline::new();
1553        compile(&mut pipe, SEM_ALICE);
1554        compile(&mut pipe, "(retire @bob)");
1555        let got = pipe.execute_query("(query)").expect("query");
1556        assert!(got.filtered.is_empty());
1557        assert!(!got.flags.contains(ReadFlags::EXPLAIN_FILTERED_ACTIVE));
1558    }
1559
1560    #[test]
1561    fn show_framing_populates_per_record() {
1562        let mut pipe = Pipeline::new();
1563        compile(&mut pipe, SEM_ALICE);
1564        let got = pipe
1565            .execute_query("(query :show_framing true)")
1566            .expect("query");
1567        assert_eq!(got.framings.len(), got.records.len());
1568        assert_eq!(got.framings[0], Framing::Advisory);
1569    }
1570
1571    #[test]
1572    fn show_framing_off_leaves_framings_empty() {
1573        let mut pipe = Pipeline::new();
1574        compile(&mut pipe, SEM_ALICE);
1575        let got = pipe.execute_query("(query)").expect("query");
1576        assert!(got.framings.is_empty());
1577    }
1578
1579    #[test]
1580    fn framing_historical_when_as_of_is_past() {
1581        let mut pipe = Pipeline::new();
1582        compile(&mut pipe, SEM_ALICE);
1583        let got = pipe
1584            .execute_query("(query :as_of 2024-01-20 :show_framing true)")
1585            .expect("query");
1586        assert_eq!(got.framings.len(), 1);
1587        assert_eq!(got.framings[0], Framing::Historical);
1588    }
1589
1590    #[test]
1591    fn framing_projected_for_projected_record() {
1592        let mut pipe = Pipeline::new();
1593        compile(&mut pipe, SEM_PROJECTED);
1594        let got = pipe
1595            .execute_query("(query :include_projected true :show_framing true)")
1596            .expect("query");
1597        assert_eq!(got.framings.len(), 1);
1598        assert_eq!(got.framings[0], Framing::Projected);
1599    }
1600
1601    #[test]
1602    fn debug_mode_enables_both_toggles() {
1603        let mut pipe = Pipeline::new();
1604        compile(&mut pipe, SEM_ALICE);
1605        compile(&mut pipe, "(retire @bob)");
1606        let got = pipe
1607            .execute_query("(query :debug_mode true)")
1608            .expect("query");
1609        // Filtered surfaces AND framings parallels records.
1610        assert!(got.flags.contains(ReadFlags::EXPLAIN_FILTERED_ACTIVE));
1611        assert_eq!(got.filtered.len(), 1);
1612        // records is empty (retired dropped by default); framings
1613        // should also be empty and same length as records.
1614        assert_eq!(got.framings.len(), got.records.len());
1615    }
1616
1617    #[test]
1618    fn include_retired_with_explain_filtered_shows_no_filtered() {
1619        let mut pipe = Pipeline::new();
1620        compile(&mut pipe, SEM_ALICE);
1621        compile(&mut pipe, "(retire @bob)");
1622        let got = pipe
1623            .execute_query("(query :include_retired true :explain_filtered true)")
1624            .expect("query");
1625        // Record is kept, so nothing gets filtered — filtered stays empty.
1626        assert_eq!(got.records.len(), 1);
1627        assert!(got.filtered.is_empty());
1628        assert!(got.flags.contains(ReadFlags::STALE_SYMBOL));
1629        assert!(got.flags.contains(ReadFlags::EXPLAIN_FILTERED_ACTIVE));
1630    }
1631
1632    #[test]
1633    fn invalid_boolean_predicate_is_rejected() {
1634        let pipe = Pipeline::new();
1635        let err = pipe
1636            .execute_query("(query :include_retired 5)")
1637            .expect_err("expected bool error");
1638        assert!(matches!(
1639            err,
1640            ReadError::InvalidPredicate {
1641                keyword: "include_retired",
1642                ..
1643            }
1644        ));
1645    }
1646
1647    #[test]
1648    fn invalid_confidence_threshold_is_rejected() {
1649        let pipe = Pipeline::new();
1650        let err = pipe
1651            .execute_query("(query :confidence_threshold 1.5)")
1652            .expect_err("out of range");
1653        assert!(matches!(
1654            err,
1655            ReadError::InvalidPredicate {
1656                keyword: "confidence_threshold",
1657                ..
1658            }
1659        ));
1660    }
1661
1662    // ----- 7.3: effective confidence drives LOW_CONFIDENCE -----
1663
    /// Fixture: a Semantic record whose stored confidence is above the
    /// default threshold but whose decay-adjusted confidence is below it.
    ///
    /// Stored 0.8, `valid_at` 136 days before `now()`. Semantic ×
    /// `@observation` → 180-day half-life → factor ≈ 0.59 →
    /// effective ≈ 0.47 → below the default 0.5 threshold.
    ///
    /// NOTE(review): the comment in `recent_memory_stays_above_threshold`
    /// places 2024-01-15 at 93 days before `now()`, which would put
    /// 2023-12-01 at 138 days rather than 136. The rounded factor and the
    /// conclusion are the same either way — confirm and reconcile.
    const SEM_DECAYED_BELOW: &str =
        "(sem @mira @saw @kilroy :src @observation :c 0.8 :v 2023-12-01)";
1669
1670    #[test]
1671    fn stored_above_threshold_but_effective_below_triggers_low_confidence() {
1672        let mut pipe = Pipeline::new();
1673        compile(&mut pipe, SEM_DECAYED_BELOW);
1674        let got = pipe.execute_query("(query)").expect("query");
1675        assert_eq!(got.records.len(), 1);
1676        let CanonicalRecord::Sem(sem) = &got.records[0] else {
1677            panic!("expected Sem");
1678        };
1679        assert!(sem.confidence.as_f32() > 0.5, "stored should be > 0.5");
1680        assert!(
1681            got.flags.contains(ReadFlags::LOW_CONFIDENCE),
1682            "effective (decay-adjusted) confidence should be < 0.5"
1683        );
1684    }
1685
1686    #[test]
1687    fn recent_memory_stays_above_threshold() {
1688        // SEM_ALICE is valid_at 2024-01-15 → 93 days before now().
1689        // Semantic × @observation, factor ≈ 0.70, effective ≈ 0.56.
1690        let mut pipe = Pipeline::new();
1691        compile(&mut pipe, SEM_ALICE);
1692        let got = pipe.execute_query("(query)").expect("query");
1693        assert!(!got.flags.contains(ReadFlags::LOW_CONFIDENCE));
1694    }
1695
1696    // ----- 8.1: Episode-scoped read predicates -----
1697
1698    /// Register the Episode that `compile_batch` implicitly commits
1699    /// under. Pipeline-level tests use this because `compile_batch`
1700    /// doesn't allocate the `__ep_{n}` symbol — that's `Store`'s job.
1701    /// Here we fabricate a Memory-kind symbol named `name`, point
1702    /// it at the current watermark, and register. An integration
1703    /// test against `Store` in `tests/round_trip.rs` covers the
1704    /// genuine end-to-end flow.
1705    fn register_latest_episode(pipe: &mut Pipeline, name: &str) -> crate::symbol::SymbolId {
1706        let at = pipe.last_committed_at().expect("committed");
1707        // Use a non-reserved name; `__ep_N` allocations from the
1708        // store path would collide across test calls since we don't
1709        // thread a counter here.
1710        let table_snapshot_len = pipe.table().iter_entries().count();
1711        let id = crate::symbol::SymbolId::new(u64::MAX - table_snapshot_len as u64);
1712        pipe.replay_allocate(id, name.into(), crate::symbol::SymbolKind::Memory)
1713            .expect("allocate");
1714        pipe.register_episode(id, at);
1715        id
1716    }
1717
1718    #[test]
1719    fn in_episode_filters_to_that_commit() {
1720        let mut pipe = Pipeline::new();
1721        compile(&mut pipe, SEM_ALICE);
1722        let ep_id = register_latest_episode(&mut pipe, "ep_alpha");
1723        // Commit a second batch under a different Episode.
1724        pipe.compile_batch(SEM_TRUSTS, later_now()).unwrap();
1725        let _beta = register_latest_episode(&mut pipe, "ep_beta");
1726
1727        let got = pipe
1728            .execute_query("(query :in_episode @ep_alpha)")
1729            .expect("query");
1730        assert_eq!(got.records.len(), 1);
1731        let CanonicalRecord::Sem(sem) = &got.records[0] else {
1732            panic!();
1733        };
1734        let alice = pipe.table().lookup("alice").unwrap();
1735        assert_eq!(sem.s, alice, "should be the SEM_ALICE record");
1736        // Just to make sure the second Episode's id was different
1737        // from ep_alpha's.
1738        assert_ne!(ep_id.as_u64(), 0);
1739    }
1740
1741    #[test]
1742    fn after_episode_filters_later_commits() {
1743        let mut pipe = Pipeline::new();
1744        compile(&mut pipe, SEM_ALICE);
1745        let _alpha = register_latest_episode(&mut pipe, "ep_alpha");
1746        pipe.compile_batch(SEM_TRUSTS, later_now()).unwrap();
1747        let _beta = register_latest_episode(&mut pipe, "ep_beta");
1748
1749        let got = pipe
1750            .execute_query("(query :after_episode @ep_alpha)")
1751            .expect("query");
1752        assert_eq!(got.records.len(), 1);
1753        let CanonicalRecord::Sem(sem) = &got.records[0] else {
1754            panic!();
1755        };
1756        let trusts = pipe.table().lookup("trusts").unwrap();
1757        assert_eq!(sem.p, trusts, "should be the SEM_TRUSTS record");
1758    }
1759
1760    #[test]
1761    fn before_episode_filters_earlier_commits() {
1762        let mut pipe = Pipeline::new();
1763        compile(&mut pipe, SEM_ALICE);
1764        let _alpha = register_latest_episode(&mut pipe, "ep_alpha");
1765        pipe.compile_batch(SEM_TRUSTS, later_now()).unwrap();
1766        let _beta = register_latest_episode(&mut pipe, "ep_beta");
1767
1768        let got = pipe
1769            .execute_query("(query :before_episode @ep_beta)")
1770            .expect("query");
1771        assert_eq!(got.records.len(), 1);
1772        let CanonicalRecord::Sem(sem) = &got.records[0] else {
1773            panic!();
1774        };
1775        let alice = pipe.table().lookup("alice").unwrap();
1776        assert_eq!(sem.s, alice, "should be SEM_ALICE");
1777    }
1778
1779    #[test]
1780    fn unknown_episode_symbol_returns_empty() {
1781        let mut pipe = Pipeline::new();
1782        compile(&mut pipe, SEM_ALICE);
1783        let got = pipe
1784            .execute_query("(query :in_episode @nonexistent)")
1785            .expect("query");
1786        assert!(got.records.is_empty());
1787    }
1788
1789    #[test]
1790    fn multiple_episode_scopes_rejected() {
1791        let mut pipe = Pipeline::new();
1792        compile(&mut pipe, SEM_ALICE);
1793        let _ = register_latest_episode(&mut pipe, "ep_alpha");
1794        let err = pipe
1795            .execute_query("(query :in_episode @ep_alpha :after_episode @ep_alpha)")
1796            .expect_err("two Episode-scoped predicates must reject");
1797        assert!(matches!(
1798            err,
1799            ReadError::InvalidPredicate {
1800                keyword: "in_episode / after_episode / before_episode / episode_chain",
1801                ..
1802            }
1803        ));
1804    }
1805
1806    fn later_now() -> ClockTime {
1807        ClockTime::try_from_millis(1_713_350_400_000 + 1_000).expect("non-sentinel")
1808    }
1809
1810    #[test]
1811    fn decay_config_override_suppresses_decay() {
1812        // Same record as SEM_DECAYED_BELOW, but we disable decay
1813        // for Sem×Observation — effective equals stored, which is
1814        // above the threshold.
1815        let mut pipe = Pipeline::new();
1816        let mut cfg = crate::decay::DecayConfig::librarian_defaults();
1817        cfg.sem_observation = crate::decay::HalfLife::no_decay();
1818        pipe.set_decay_config(cfg);
1819        compile(&mut pipe, SEM_DECAYED_BELOW);
1820        let got = pipe.execute_query("(query)").expect("query");
1821        assert!(
1822            !got.flags.contains(ReadFlags::LOW_CONFIDENCE),
1823            "with decay disabled, stored 0.8 stays above 0.5"
1824        );
1825    }
1826
1827    // ----------------------------------------------------------------
1828    // Inferential resolver — Phase 3.1
1829    //
    // Until Phase 3.1 wired the resolver, `(query :kind inf)` returned
1831    // empty. These tests drive the wiring per `temporal-model.md` § 5.4
1832    // + `read-protocol.md` § 3.1 (Inferentials keyed on `(s, p)` like
1833    // Semantic; re-derivation with same `(s, p)` + later `valid_at`
1834    // supersedes the prior, matching the Semantic rule).
1835    // ----------------------------------------------------------------
1836
1837    /// A committed Inferential matching `:kind inf` must appear in the
1838    /// result set. Roadmap Phase 3.1 exit criterion: "for any committed
1839    /// Inferential, a matching `(query :kind inf)` returns it."
1840    ///
1841    /// We seed one `@alice @knows @bob` Sem (so `@alice @knows` exists
1842    /// as bound symbols that the Inf can reference as a parent) and
1843    /// commit an Inferential `(inf @alice @friend_of @bob (@__mem_0)
1844    /// @citation_link :c 0.7 :v 2024-03-15)`.
1845    #[test]
1846    fn inf_kind_query_returns_committed_inf() {
1847        let mut pipe = Pipeline::new();
1848        compile(&mut pipe, SEM_ALICE);
1849        compile(
1850            &mut pipe,
1851            "(inf @alice @friend_of @bob (@__mem_0) @citation_link \
1852             :c 0.7 :v 2024-03-15)",
1853        );
1854        let got = pipe.execute_query("(query :kind inf)").expect("query");
1855        assert_eq!(
1856            got.records.len(),
1857            1,
1858            "expected the committed inferential to be returned; \
1859             got {} records: {:?}",
1860            got.records.len(),
1861            got.records,
1862        );
1863        let CanonicalRecord::Inf(inf) = &got.records[0] else {
1864            panic!("expected Inf record, got {:?}", got.records[0]);
1865        };
1866        let alice = pipe.table().lookup("alice").expect("alice bound");
1867        let friend_of = pipe.table().lookup("friend_of").expect("friend_of bound");
1868        assert_eq!(inf.s, alice);
1869        assert_eq!(inf.p, friend_of);
1870    }
1871
1872    /// `:s` / `:p` predicates filter Inferentials by subject /
1873    /// predicate — same semantics as for Sem per read-protocol.md
1874    /// § 4.1.
1875    #[test]
1876    fn inf_sp_query_filters_by_subject_predicate() {
1877        let mut pipe = Pipeline::new();
1878        compile(&mut pipe, SEM_ALICE);
1879        compile(
1880            &mut pipe,
1881            "(inf @alice @friend_of @bob (@__mem_0) @citation_link \
1882             :c 0.7 :v 2024-03-15)",
1883        );
1884        compile(
1885            &mut pipe,
1886            "(inf @alice @colleague_of @dave (@__mem_0) @citation_link \
1887             :c 0.7 :v 2024-03-15)",
1888        );
1889        let got = pipe
1890            .execute_query("(query :kind inf :s @alice :p @friend_of)")
1891            .expect("query");
1892        assert_eq!(
1893            got.records.len(),
1894            1,
1895            ":s @alice :p @friend_of must match exactly one Inf",
1896        );
1897        let CanonicalRecord::Inf(inf) = &got.records[0] else {
1898            panic!("expected Inf record");
1899        };
1900        let friend_of = pipe.table().lookup("friend_of").unwrap();
1901        assert_eq!(inf.p, friend_of);
1902    }
1903
1904    /// A bare `(query)` without `:kind` must return Inferentials too
1905    /// (in addition to Sem / Pro). "All memory types end-to-end"
1906    /// is the Phase 3.1 deliverable.
1907    #[test]
1908    fn bare_query_includes_inferentials() {
1909        let mut pipe = Pipeline::new();
1910        compile(&mut pipe, SEM_ALICE);
1911        compile(
1912            &mut pipe,
1913            "(inf @alice @friend_of @bob (@__mem_0) @citation_link \
1914             :c 0.7 :v 2024-03-15)",
1915        );
1916        let got = pipe.execute_query("(query)").expect("query");
1917        let has_sem = got
1918            .records
1919            .iter()
1920            .any(|r| matches!(r, CanonicalRecord::Sem(_)));
1921        let has_inf = got
1922            .records
1923            .iter()
1924            .any(|r| matches!(r, CanonicalRecord::Inf(_)));
1925        assert!(has_sem, "bare query must include Sem records");
1926        assert!(has_inf, "bare query must include Inf records");
1927    }
1928
1929    /// Two Inferentials at the same `(s, p)` with the same `valid_at`
1930    /// are a conflict under the single-writer invariant (same rule as
1931    /// Sem § 5.1 per temporal-model.md § 5.4). Verify the emit path
1932    /// rejects it rather than silently overwriting.
1933    #[test]
1934    fn inf_same_sp_same_valid_at_is_conflict() {
1935        let mut pipe = Pipeline::new();
1936        compile(&mut pipe, SEM_ALICE);
1937        compile(
1938            &mut pipe,
1939            "(inf @alice @friend_of @bob (@__mem_0) @citation_link \
1940             :c 0.7 :v 2024-01-15)",
1941        );
1942        let err = pipe
1943            .compile_batch(
1944                "(inf @alice @friend_of @carol (@__mem_0) @citation_link \
1945                 :c 0.8 :v 2024-01-15)",
1946                now(),
1947            )
1948            .expect_err("identical (s, p, valid_at) must conflict");
1949        assert!(
1950            matches!(
1951                err,
1952                crate::pipeline::PipelineError::Emit(
1953                    crate::pipeline::EmitError::InferentialSupersessionConflict { .. }
1954                )
1955            ),
1956            "expected InferentialSupersessionConflict; got {err:?}",
1957        );
1958    }
1959
1960    /// Re-derivation with the same `(s, p)` and a later `valid_at`
1961    /// supersedes the prior Inferential — mirroring Semantic § 5.1 per
1962    /// temporal-model.md § 5.4's "auto-supersession rule as if
1963    /// Inferential were Semantic." Only the newer record appears in
1964    /// the current-state result.
1965    #[test]
1966    fn inf_re_derivation_supersedes_earlier_inf() {
1967        let mut pipe = Pipeline::new();
1968        compile(&mut pipe, SEM_ALICE);
1969        compile(
1970            &mut pipe,
1971            "(inf @alice @friend_of @bob (@__mem_0) @citation_link \
1972             :c 0.7 :v 2024-01-15)",
1973        );
1974        compile(
1975            &mut pipe,
1976            "(inf @alice @friend_of @carol (@__mem_0) @citation_link \
1977             :c 0.9 :v 2024-03-15)",
1978        );
1979        let got = pipe.execute_query("(query :kind inf)").expect("query");
1980        assert_eq!(
1981            got.records.len(),
1982            1,
1983            "later valid_at re-derivation must supersede earlier Inf; \
1984             current-state query should return only one record. Got: {:?}",
1985            got.records,
1986        );
1987        let CanonicalRecord::Inf(inf) = &got.records[0] else {
1988            panic!("expected Inf");
1989        };
1990        let carol = pipe.table().lookup("carol").expect("carol bound");
1991        assert!(
1992            matches!(&inf.o, crate::Value::Symbol(id) if *id == carol),
1993            "expected the later-valid_at (carol) record; got {:?}",
1994            inf.o,
1995        );
1996    }
1997}