1use thiserror::Error;
42
43use crate::bind::SymbolTable;
44use crate::canonical::{CanonicalRecord, EpiRecord, InfRecord, ProRecord, SemRecord};
45use crate::clock::ClockTime;
46use crate::confidence::Confidence;
47use crate::decay::{effective_confidence, DecayFlags};
48use crate::memory_kind::MemoryKindTag;
49use crate::parse::{self, ParseError, RawSymbolName, RawValue, UnboundForm};
50use crate::pipeline::Pipeline;
51use crate::resolver::{self, TemporalQuery};
52use crate::semantic::source_kind_from_name;
53use crate::source_kind::SourceKind;
54use crate::symbol::SymbolId;
55use crate::value::Value;
56
57#[derive(Clone, Debug, PartialEq)]
59pub struct ReadResult {
60 pub records: Vec<CanonicalRecord>,
64 pub framings: Vec<Framing>,
68 pub filtered: Vec<FilteredMemory>,
72 pub flags: ReadFlags,
74 pub as_of: ClockTime,
77 pub as_committed: ClockTime,
79 pub query_committed_at: ClockTime,
82}
83
84#[derive(Copy, Clone, Debug, PartialEq, Eq)]
87pub enum Framing {
88 Advisory,
90 Historical,
92 Projected,
95 Authoritative {
98 set_by: FramingSource,
100 },
101}
102
103#[derive(Copy, Clone, Debug, PartialEq, Eq)]
106pub enum FramingSource {
107 AgentPinned,
109 OperatorAuthoritative,
111}
112
113#[derive(Copy, Clone, Debug, PartialEq, Eq)]
116pub enum FilterReason {
117 RetiredSymbolExcluded,
120 ProjectedExcluded,
123}
124
125#[derive(Clone, Debug, PartialEq)]
127pub struct FilteredMemory {
128 pub record: CanonicalRecord,
131 pub reason: FilterReason,
133}
134
135#[derive(Copy, Clone, Debug, Default, PartialEq, Eq)]
141pub struct ReadFlags(u32);
142
143impl ReadFlags {
144 pub const STALE_SYMBOL: u32 = 1 << 0;
146 pub const LOW_CONFIDENCE: u32 = 1 << 2;
150 pub const PROJECTED_PRESENT: u32 = 1 << 3;
152 pub const TRUNCATED: u32 = 1 << 4;
154 pub const EXPLAIN_FILTERED_ACTIVE: u32 = 1 << 7;
159
160 #[must_use]
162 pub const fn empty() -> Self {
163 Self(0)
164 }
165
166 #[must_use]
168 pub const fn contains(self, bits: u32) -> bool {
169 self.0 & bits != 0
170 }
171
172 #[must_use]
174 pub const fn with(self, bits: u32) -> Self {
175 Self(self.0 | bits)
176 }
177
178 #[must_use]
180 pub const fn bits(self) -> u32 {
181 self.0
182 }
183}
184
185#[derive(Copy, Clone, Debug, PartialEq, Eq)]
187pub enum KindFilter {
188 Sem,
190 Pro,
192 Epi,
194 Inf,
198}
199
200#[derive(Debug, Error, PartialEq)]
203pub enum ReadError {
204 #[error("parse error: {0}")]
206 Parse(#[from] ParseError),
207
208 #[error("expected a single (query ...) form, got {count} forms")]
211 NotASingleQuery {
212 count: usize,
214 },
215
216 #[error("input is not a query form")]
218 NotAQuery,
219
220 #[error("query predicate {predicate} is not supported in this milestone")]
223 UnsupportedPredicate {
224 predicate: &'static str,
226 },
227
228 #[error("invalid value for {keyword}: {reason}")]
231 InvalidPredicate {
232 keyword: &'static str,
234 reason: String,
236 },
237
238 #[error("invalid kind {got}: expected one of sem, pro, epi, inf")]
241 InvalidKind {
242 got: String,
244 },
245
246 #[error(
253 "predicate {predicate} is not compatible with :kind {kind:?} (it applies to SEM / INF only)"
254 )]
255 IncompatiblePredicates {
256 predicate: &'static str,
258 kind: KindFilter,
260 },
261}
262
263impl Pipeline {
264 pub fn execute_query(&self, input: &str) -> Result<ReadResult, ReadError> {
276 let forms = parse::parse(input)?;
277 let count = forms.len();
278 let Some(form) = forms.into_iter().next().filter(|_| count == 1) else {
279 return Err(ReadError::NotASingleQuery { count });
280 };
281 let UnboundForm::Query { selector, keywords } = form else {
282 return Err(ReadError::NotAQuery);
283 };
284 if selector.is_some() {
287 return Err(ReadError::UnsupportedPredicate {
288 predicate: "selector",
289 });
290 }
291 execute(self, keywords)
292 }
293}
294
295fn execute(pipeline: &Pipeline, keywords: parse::KeywordArgs) -> Result<ReadResult, ReadError> {
296 let predicates = parse_predicates(pipeline, keywords)?;
297
298 let Some(query_committed_at) = pipeline.last_committed_at() else {
302 return Ok(empty_result(&predicates));
303 };
304
305 let effective_as_of = predicates.as_of.unwrap_or(query_committed_at);
306 let effective_as_committed = predicates.as_committed.unwrap_or(query_committed_at);
307 let temporal = TemporalQuery::bi_temporal(effective_as_of, effective_as_committed);
308
309 check_predicate_compatibility(&predicates)?;
310
311 let candidates = collect_candidates(pipeline, &predicates, temporal);
312 let (kept, filtered) = apply_filters(candidates, pipeline.table(), &predicates);
313 let mut flags = compute_flags(&kept, pipeline, query_committed_at, &predicates);
314
315 let limit_value = predicates.limit.unwrap_or(DEFAULT_LIMIT);
316 let (records, flags_with_limit) = apply_limit(kept, limit_value, flags);
317 flags = flags_with_limit;
318
319 let framings = if predicates.show_framing {
320 records
321 .iter()
322 .map(|r| compute_framing(r, pipeline, predicates.as_of, query_committed_at))
323 .collect()
324 } else {
325 Vec::new()
326 };
327
328 Ok(ReadResult {
329 records,
330 framings,
331 filtered,
332 flags,
333 as_of: effective_as_of,
334 as_committed: effective_as_committed,
335 query_committed_at,
336 })
337}
338
339fn check_predicate_compatibility(predicates: &Predicates) -> Result<(), ReadError> {
344 let Some(k) = predicates.kind else {
345 return Ok(());
346 };
347 if !matches!(k, KindFilter::Pro | KindFilter::Epi) {
348 return Ok(());
349 }
350 if !matches!(predicates.subject, SymbolFilter::Absent) {
351 return Err(ReadError::IncompatiblePredicates {
352 predicate: "s",
353 kind: k,
354 });
355 }
356 if !matches!(predicates.predicate, SymbolFilter::Absent) {
357 return Err(ReadError::IncompatiblePredicates {
358 predicate: "p",
359 kind: k,
360 });
361 }
362 Ok(())
363}
364
365fn collect_candidates(
371 pipeline: &Pipeline,
372 predicates: &Predicates,
373 temporal: TemporalQuery,
374) -> Vec<CanonicalRecord> {
375 if predicates.subject.is_no_match() || predicates.predicate.is_no_match() {
379 return Vec::new();
380 }
381 if predicates
384 .episode
385 .as_ref()
386 .is_some_and(EpisodeFilter::is_empty_set)
387 {
388 return Vec::new();
389 }
390 let mut candidates: Vec<CanonicalRecord> = Vec::new();
391 if matches!(predicates.kind, None | Some(KindFilter::Sem)) {
392 collect_semantic(
393 pipeline,
394 predicates.subject,
395 predicates.predicate,
396 temporal,
397 &mut candidates,
398 );
399 }
400 if matches!(predicates.kind, None | Some(KindFilter::Pro))
401 && matches!(predicates.subject, SymbolFilter::Absent)
402 && matches!(predicates.predicate, SymbolFilter::Absent)
403 {
404 collect_procedural(pipeline, temporal, &mut candidates);
405 }
406 if matches!(predicates.kind, None | Some(KindFilter::Inf)) {
407 collect_inferential(
408 pipeline,
409 predicates.subject,
410 predicates.predicate,
411 temporal,
412 &mut candidates,
413 );
414 }
415 if predicates.include_projected {
416 collect_projected(
417 pipeline,
418 predicates.kind,
419 predicates.subject,
420 predicates.predicate,
421 &mut candidates,
422 );
423 }
424 if let Some(filter) = predicates.episode.as_ref() {
425 candidates.retain(|r| filter.matches(r.committed_at()));
426 }
427 candidates.sort_by_key(CanonicalRecord::committed_at);
428 candidates
429}
430
431fn apply_filters(
435 candidates: Vec<CanonicalRecord>,
436 table: &SymbolTable,
437 predicates: &Predicates,
438) -> (Vec<CanonicalRecord>, Vec<FilteredMemory>) {
439 let mut kept: Vec<CanonicalRecord> = Vec::with_capacity(candidates.len());
440 let mut filtered: Vec<FilteredMemory> = Vec::new();
441 for record in candidates {
442 let retired_ref = record_references_retired(&record, table);
443 let projected = record_is_projected(&record);
444
445 if retired_ref && !predicates.include_retired {
446 if predicates.explain_filtered {
447 filtered.push(FilteredMemory {
448 record,
449 reason: FilterReason::RetiredSymbolExcluded,
450 });
451 }
452 continue;
453 }
454 if projected && !predicates.include_projected {
455 if predicates.explain_filtered {
456 filtered.push(FilteredMemory {
457 record,
458 reason: FilterReason::ProjectedExcluded,
459 });
460 }
461 continue;
462 }
463 kept.push(record);
464 }
465 (kept, filtered)
466}
467
468fn compute_flags(
474 kept: &[CanonicalRecord],
475 pipeline: &Pipeline,
476 query_committed_at: ClockTime,
477 predicates: &Predicates,
478) -> ReadFlags {
479 let mut flags = ReadFlags::empty();
480 let table = pipeline.table();
481 for record in kept {
482 if record_references_retired(record, table) {
483 flags = flags.with(ReadFlags::STALE_SYMBOL);
484 }
485 if record_is_projected(record) {
486 flags = flags.with(ReadFlags::PROJECTED_PRESENT);
487 }
488 let effective = record_effective_confidence(record, pipeline, query_committed_at);
489 if effective < predicates.confidence_threshold {
490 flags = flags.with(ReadFlags::LOW_CONFIDENCE);
491 }
492 }
493 if predicates.explain_filtered {
494 flags = flags.with(ReadFlags::EXPLAIN_FILTERED_ACTIVE);
495 }
496 flags
497}
498
499fn empty_result(predicates: &Predicates) -> ReadResult {
503 let mut flags = ReadFlags::empty();
504 if predicates.explain_filtered {
505 flags = flags.with(ReadFlags::EXPLAIN_FILTERED_ACTIVE);
506 }
507 ReadResult {
508 records: Vec::new(),
509 framings: Vec::new(),
510 filtered: Vec::new(),
511 flags,
512 as_of: predicates.as_of.unwrap_or_else(epoch_zero),
513 as_committed: predicates.as_committed.unwrap_or_else(epoch_zero),
514 query_committed_at: epoch_zero(),
515 }
516}
517
518fn collect_semantic(
526 pipeline: &Pipeline,
527 s: SymbolFilter,
528 p: SymbolFilter,
529 temporal: TemporalQuery,
530 out: &mut Vec<CanonicalRecord>,
531) {
532 if let (Some(sub), Some(pred)) = (s.as_id(), p.as_id()) {
533 if let Some(rec) = resolver::resolve_semantic(pipeline, sub, pred, temporal) {
534 out.push(CanonicalRecord::Sem(rec));
535 }
536 return;
537 }
538 let mut seen: std::collections::BTreeSet<(SymbolId, SymbolId)> =
541 std::collections::BTreeSet::new();
542 for record in pipeline.semantic_records() {
543 if !s.matches(record.s) || !p.matches(record.p) {
544 continue;
545 }
546 let key = (record.s, record.p);
547 if !seen.insert(key) {
548 continue;
549 }
550 if let Some(rec) = resolver::resolve_semantic(pipeline, key.0, key.1, temporal) {
551 out.push(CanonicalRecord::Sem(rec));
552 }
553 }
554}
555
556fn collect_procedural(
557 pipeline: &Pipeline,
558 temporal: TemporalQuery,
559 out: &mut Vec<CanonicalRecord>,
560) {
561 let mut seen_rules: std::collections::BTreeSet<SymbolId> = std::collections::BTreeSet::new();
562 for record in pipeline.procedural_records() {
563 if !seen_rules.insert(record.rule_id) {
564 continue;
565 }
566 if let Some(rec) = resolver::resolve_procedural(pipeline, record.rule_id, temporal) {
567 out.push(CanonicalRecord::Pro(rec));
568 }
569 }
570}
571
572fn collect_inferential(
579 pipeline: &Pipeline,
580 s: SymbolFilter,
581 p: SymbolFilter,
582 temporal: TemporalQuery,
583 out: &mut Vec<CanonicalRecord>,
584) {
585 if let (Some(sub), Some(pred)) = (s.as_id(), p.as_id()) {
586 if let Some(rec) = resolver::resolve_inferential(pipeline, sub, pred, temporal) {
587 out.push(CanonicalRecord::Inf(rec));
588 }
589 return;
590 }
591 let mut seen: std::collections::BTreeSet<(SymbolId, SymbolId)> =
592 std::collections::BTreeSet::new();
593 for record in pipeline.inferential_records() {
594 if !s.matches(record.s) || !p.matches(record.p) {
595 continue;
596 }
597 let key = (record.s, record.p);
598 if !seen.insert(key) {
599 continue;
600 }
601 if let Some(rec) = resolver::resolve_inferential(pipeline, key.0, key.1, temporal) {
602 out.push(CanonicalRecord::Inf(rec));
603 }
604 }
605}
606
607fn collect_projected(
612 pipeline: &Pipeline,
613 kind: Option<KindFilter>,
614 s: SymbolFilter,
615 p: SymbolFilter,
616 out: &mut Vec<CanonicalRecord>,
617) {
618 let existing: std::collections::BTreeSet<SymbolId> = out
619 .iter()
620 .filter_map(|r| match r {
621 CanonicalRecord::Sem(sem) => Some(sem.memory_id),
622 CanonicalRecord::Pro(pro) => Some(pro.memory_id),
623 CanonicalRecord::Inf(inf) => Some(inf.memory_id),
624 CanonicalRecord::Epi(epi) => Some(epi.memory_id),
625 _ => None,
626 })
627 .collect();
628 if matches!(kind, None | Some(KindFilter::Sem)) {
629 for record in pipeline.semantic_records() {
630 if !record.flags.projected {
631 continue;
632 }
633 if !s.matches(record.s) || !p.matches(record.p) {
634 continue;
635 }
636 if existing.contains(&record.memory_id) {
637 continue;
638 }
639 out.push(CanonicalRecord::Sem(record.clone()));
640 }
641 }
642 if matches!(kind, None | Some(KindFilter::Inf)) {
646 for record in pipeline.inferential_records() {
647 if !record.flags.projected {
648 continue;
649 }
650 if !s.matches(record.s) || !p.matches(record.p) {
651 continue;
652 }
653 if existing.contains(&record.memory_id) {
654 continue;
655 }
656 out.push(CanonicalRecord::Inf(record.clone()));
657 }
658 }
659}
660
661fn apply_limit(
664 records: Vec<CanonicalRecord>,
665 limit: usize,
666 existing_flags: ReadFlags,
667) -> (Vec<CanonicalRecord>, ReadFlags) {
668 if records.len() > limit {
669 let truncated: Vec<_> = records.into_iter().take(limit).collect();
670 (truncated, existing_flags.with(ReadFlags::TRUNCATED))
671 } else {
672 (records, existing_flags)
673 }
674}
675
676fn record_references_retired(record: &CanonicalRecord, table: &SymbolTable) -> bool {
683 match record {
684 CanonicalRecord::Sem(r) => sem_has_retired_ref(r, table),
685 CanonicalRecord::Epi(r) => epi_has_retired_ref(r, table),
686 CanonicalRecord::Pro(r) => pro_has_retired_ref(r, table),
687 CanonicalRecord::Inf(r) => inf_has_retired_ref(r, table),
688 _ => false,
691 }
692}
693
694fn sem_has_retired_ref(r: &SemRecord, table: &SymbolTable) -> bool {
695 table.is_retired(r.s)
696 || table.is_retired(r.p)
697 || table.is_retired(r.source)
698 || value_has_retired_symbol(&r.o, table)
699}
700
701fn epi_has_retired_ref(r: &EpiRecord, table: &SymbolTable) -> bool {
702 table.is_retired(r.event_id)
703 || table.is_retired(r.kind)
704 || table.is_retired(r.location)
705 || table.is_retired(r.source)
706 || r.participants.iter().any(|p| table.is_retired(*p))
707}
708
709fn pro_has_retired_ref(r: &ProRecord, table: &SymbolTable) -> bool {
710 table.is_retired(r.rule_id)
711 || table.is_retired(r.scope)
712 || table.is_retired(r.source)
713 || value_has_retired_symbol(&r.trigger, table)
714 || value_has_retired_symbol(&r.action, table)
715 || r.precondition
716 .as_ref()
717 .is_some_and(|v| value_has_retired_symbol(v, table))
718}
719
720fn inf_has_retired_ref(r: &InfRecord, table: &SymbolTable) -> bool {
721 table.is_retired(r.s)
722 || table.is_retired(r.p)
723 || table.is_retired(r.method)
724 || value_has_retired_symbol(&r.o, table)
725 || r.derived_from.iter().any(|p| table.is_retired(*p))
726}
727
728fn value_has_retired_symbol(v: &Value, table: &SymbolTable) -> bool {
729 matches!(v, Value::Symbol(id) if table.is_retired(*id))
730}
731
732fn record_is_projected(record: &CanonicalRecord) -> bool {
736 match record {
737 CanonicalRecord::Sem(r) => r.flags.projected,
738 CanonicalRecord::Inf(r) => r.flags.projected,
739 _ => false,
740 }
741}
742
743fn record_effective_confidence(
760 record: &CanonicalRecord,
761 pipeline: &Pipeline,
762 query_committed_at: ClockTime,
763) -> Confidence {
764 let table = pipeline.table();
765 let decay_config = pipeline.decay_config();
766 let (stored, memory_kind, source_id, valid_at) = match record {
767 CanonicalRecord::Sem(r) => (
768 r.confidence,
769 MemoryKindTag::Semantic,
770 r.source,
771 r.clocks.valid_at,
772 ),
773 CanonicalRecord::Epi(r) => (r.confidence, MemoryKindTag::Episodic, r.source, r.at_time),
774 CanonicalRecord::Pro(r) => (
775 r.confidence,
776 MemoryKindTag::Procedural,
777 r.source,
778 r.clocks.valid_at,
779 ),
780 CanonicalRecord::Inf(r) => return r.confidence,
784 _ => return Confidence::ONE,
785 };
786
787 let source_kind = table.entry(source_id).map_or(SourceKind::Observation, |e| {
788 source_kind_from_name(e.canonical_name.as_str())
789 });
790 let elapsed_ms = query_committed_at
791 .as_millis()
792 .saturating_sub(valid_at.as_millis());
793 let memory_id = record_memory_id(record);
794 let pinned = memory_id.is_some_and(|id| pipeline.is_pinned(id));
795 let authoritative = memory_id.is_some_and(|id| pipeline.is_authoritative(id));
796 let flags = DecayFlags {
797 pinned,
798 authoritative,
799 };
800 effective_confidence(
801 stored,
802 elapsed_ms,
803 memory_kind,
804 source_kind,
805 flags,
806 decay_config,
807 )
808}
809
810fn compute_framing(
822 record: &CanonicalRecord,
823 pipeline: &Pipeline,
824 as_of: Option<ClockTime>,
825 query_committed_at: ClockTime,
826) -> Framing {
827 if record_is_projected(record) {
828 return Framing::Projected;
829 }
830 if let Some(mem_id) = record_memory_id(record) {
831 if pipeline.is_pinned(mem_id) {
832 return Framing::Authoritative {
833 set_by: FramingSource::AgentPinned,
834 };
835 }
836 if pipeline.is_authoritative(mem_id) {
837 return Framing::Authoritative {
838 set_by: FramingSource::OperatorAuthoritative,
839 };
840 }
841 }
842 if as_of.is_some_and(|t| t < query_committed_at) {
843 return Framing::Historical;
844 }
845 Framing::Advisory
846}
847
848fn record_memory_id(record: &CanonicalRecord) -> Option<SymbolId> {
851 match record {
852 CanonicalRecord::Sem(r) => Some(r.memory_id),
853 CanonicalRecord::Epi(r) => Some(r.memory_id),
854 CanonicalRecord::Pro(r) => Some(r.memory_id),
855 CanonicalRecord::Inf(r) => Some(r.memory_id),
856 _ => None,
857 }
858}
859
860const DEFAULT_LIMIT: usize = 1000;
862
863#[allow(clippy::struct_excessive_bools)]
871struct Predicates {
872 kind: Option<KindFilter>,
873 subject: SymbolFilter,
874 predicate: SymbolFilter,
875 as_of: Option<ClockTime>,
876 as_committed: Option<ClockTime>,
877 limit: Option<usize>,
878 include_retired: bool,
879 include_projected: bool,
880 confidence_threshold: Confidence,
881 explain_filtered: bool,
882 show_framing: bool,
883 episode: Option<EpisodeFilter>,
884}
885
886#[derive(Clone, Debug, PartialEq, Eq)]
889enum EpisodeFilter {
890 In { at: ClockTime },
893 After { at: ClockTime },
895 Before { at: ClockTime },
897 Chain { ats: Vec<ClockTime> },
901 UnknownEpisode,
905}
906
907impl EpisodeFilter {
908 fn matches(&self, committed_at: ClockTime) -> bool {
909 match self {
910 Self::In { at } => committed_at == *at,
911 Self::After { at } => committed_at > *at,
912 Self::Before { at } => committed_at < *at,
913 Self::Chain { ats } => ats.contains(&committed_at),
914 Self::UnknownEpisode => false,
915 }
916 }
917
918 fn is_empty_set(&self) -> bool {
919 matches!(self, Self::UnknownEpisode)
920 }
921}
922
923#[derive(Copy, Clone, Debug, PartialEq, Eq)]
929enum SymbolFilter {
930 Absent,
931 Match(SymbolId),
932 NoMatch,
933}
934
935impl SymbolFilter {
936 fn from_lookup(resolved: Option<SymbolId>, set: bool) -> Self {
937 match (set, resolved) {
938 (false, _) => Self::Absent,
939 (true, Some(id)) => Self::Match(id),
940 (true, None) => Self::NoMatch,
941 }
942 }
943
944 fn is_no_match(self) -> bool {
946 matches!(self, Self::NoMatch)
947 }
948
949 fn matches(self, id: SymbolId) -> bool {
950 match self {
951 Self::Absent => true,
952 Self::Match(expected) => id == expected,
953 Self::NoMatch => false,
954 }
955 }
956
957 fn as_id(self) -> Option<SymbolId> {
958 match self {
964 Self::Match(id) => Some(id),
965 _ => None,
966 }
967 }
968}
969
970fn parse_predicates(
971 pipeline: &Pipeline,
972 keywords: parse::KeywordArgs,
973) -> Result<Predicates, ReadError> {
974 let table = pipeline.table();
975 let mut out = Predicates {
976 kind: None,
977 subject: SymbolFilter::Absent,
978 predicate: SymbolFilter::Absent,
979 as_of: None,
980 as_committed: None,
981 limit: None,
982 include_retired: false,
983 include_projected: false,
984 confidence_threshold: default_confidence_threshold(),
985 explain_filtered: false,
986 show_framing: false,
987 episode: None,
988 };
989 let mut debug_mode = false;
990
991 for (key, value) in keywords {
992 match key.as_str() {
993 "kind" => out.kind = Some(parse_kind(&value)?),
994 "s" => {
995 out.subject = SymbolFilter::from_lookup(resolve_symbol(table, &value, "s")?, true);
996 }
997 "p" => {
998 out.predicate =
999 SymbolFilter::from_lookup(resolve_symbol(table, &value, "p")?, true);
1000 }
1001 "as_of" => out.as_of = Some(parse_timestamp(&value, "as_of")?),
1002 "as_committed" => out.as_committed = Some(parse_timestamp(&value, "as_committed")?),
1003 "limit" => out.limit = Some(parse_limit(&value)?),
1004 "include_retired" => out.include_retired = parse_bool(&value, "include_retired")?,
1005 "include_projected" => out.include_projected = parse_bool(&value, "include_projected")?,
1006 "confidence_threshold" => {
1007 out.confidence_threshold = parse_confidence(&value)?;
1008 }
1009 "explain_filtered" => out.explain_filtered = parse_bool(&value, "explain_filtered")?,
1010 "show_framing" => out.show_framing = parse_bool(&value, "show_framing")?,
1011 "debug_mode" => debug_mode = parse_bool(&value, "debug_mode")?,
1012 "in_episode" | "after_episode" | "before_episode" | "episode_chain" => {
1013 if out.episode.is_some() {
1017 return Err(ReadError::InvalidPredicate {
1018 keyword: "in_episode / after_episode / before_episode / episode_chain",
1019 reason: "at most one Episode-scoped predicate per query".into(),
1020 });
1021 }
1022 out.episode = Some(parse_episode_filter(pipeline, &key, &value)?);
1023 }
1024 _ => {
1029 return Err(ReadError::UnsupportedPredicate {
1030 predicate: static_key_name(&key),
1031 });
1032 }
1033 }
1034 }
1035
1036 if debug_mode {
1040 out.explain_filtered = true;
1041 out.show_framing = true;
1042 }
1043
1044 Ok(out)
1045}
1046
1047fn parse_episode_filter(
1053 pipeline: &Pipeline,
1054 keyword: &str,
1055 value: &RawValue,
1056) -> Result<EpisodeFilter, ReadError> {
1057 let static_key = match keyword {
1058 "in_episode" => "in_episode",
1059 "after_episode" => "after_episode",
1060 "before_episode" => "before_episode",
1061 "episode_chain" => "episode_chain",
1062 _ => "unknown_predicate",
1063 };
1064 let Some(id) = resolve_symbol(pipeline.table(), value, static_key)? else {
1065 return Ok(EpisodeFilter::UnknownEpisode);
1066 };
1067 let Some(at) = pipeline.episode_committed_at(id) else {
1068 return Ok(EpisodeFilter::UnknownEpisode);
1069 };
1070 Ok(match keyword {
1071 "in_episode" => EpisodeFilter::In { at },
1072 "after_episode" => EpisodeFilter::After { at },
1073 "before_episode" => EpisodeFilter::Before { at },
1074 _ => {
1077 let ats: Vec<ClockTime> = pipeline
1078 .episode_chain(id)
1079 .filter_map(|ep| pipeline.episode_committed_at(ep))
1080 .collect();
1081 EpisodeFilter::Chain { ats }
1082 }
1083 })
1084}
1085
1086fn default_confidence_threshold() -> Confidence {
1088 #[allow(clippy::expect_used)]
1091 Confidence::try_from_f32(0.5).expect("0.5 is a valid Confidence")
1092}
1093
1094fn parse_kind(value: &RawValue) -> Result<KindFilter, ReadError> {
1095 let name = match value {
1096 RawValue::Bareword(s) => s.as_str(),
1097 RawValue::RawSymbol(RawSymbolName { name, .. }) => name.as_str(),
1098 _ => {
1099 return Err(ReadError::InvalidPredicate {
1100 keyword: "kind",
1101 reason: "expected a bareword (sem, pro, epi, inf)".into(),
1102 })
1103 }
1104 };
1105 match name {
1106 "sem" => Ok(KindFilter::Sem),
1107 "pro" => Ok(KindFilter::Pro),
1108 "epi" => Ok(KindFilter::Epi),
1109 "inf" => Ok(KindFilter::Inf),
1110 other => Err(ReadError::InvalidKind {
1111 got: other.to_string(),
1112 }),
1113 }
1114}
1115
1116fn resolve_symbol(
1121 table: &SymbolTable,
1122 value: &RawValue,
1123 keyword: &'static str,
1124) -> Result<Option<SymbolId>, ReadError> {
1125 let name: &str = match value {
1126 RawValue::RawSymbol(sym) => sym.as_str(),
1127 RawValue::TypedSymbol { name, .. } => name.as_str(),
1128 RawValue::Bareword(text) => text.as_str(),
1129 _ => {
1130 return Err(ReadError::InvalidPredicate {
1131 keyword,
1132 reason: "expected a symbol reference like @name".into(),
1133 })
1134 }
1135 };
1136 Ok(table.lookup(name))
1137}
1138
1139fn parse_timestamp(value: &RawValue, keyword: &'static str) -> Result<ClockTime, ReadError> {
1140 match value {
1141 RawValue::Timestamp(t) => Ok(*t),
1142 _ => Err(ReadError::InvalidPredicate {
1143 keyword,
1144 reason: "expected an ISO-8601 timestamp".into(),
1145 }),
1146 }
1147}
1148
1149fn parse_limit(value: &RawValue) -> Result<usize, ReadError> {
1150 match value {
1151 RawValue::Integer(n) if *n >= 0 => {
1152 usize::try_from(*n).map_err(|_| ReadError::InvalidPredicate {
1153 keyword: "limit",
1154 reason: "limit exceeds usize".into(),
1155 })
1156 }
1157 _ => Err(ReadError::InvalidPredicate {
1158 keyword: "limit",
1159 reason: "expected a non-negative integer".into(),
1160 }),
1161 }
1162}
1163
1164fn parse_bool(value: &RawValue, keyword: &'static str) -> Result<bool, ReadError> {
1165 match value {
1166 RawValue::Boolean(b) => Ok(*b),
1167 _ => Err(ReadError::InvalidPredicate {
1168 keyword,
1169 reason: "expected a boolean".into(),
1170 }),
1171 }
1172}
1173
1174fn parse_confidence(value: &RawValue) -> Result<Confidence, ReadError> {
1175 let f = match value {
1176 RawValue::Float(f) => *f,
1177 RawValue::Integer(n) if *n == 0 || *n == 1 => f64::from(i32::try_from(*n).unwrap_or(0)),
1179 _ => {
1180 return Err(ReadError::InvalidPredicate {
1181 keyword: "confidence_threshold",
1182 reason: "expected a float in [0.0, 1.0]".into(),
1183 });
1184 }
1185 };
1186 #[allow(clippy::cast_possible_truncation)]
1187 Confidence::try_from_f32(f as f32).map_err(|_| ReadError::InvalidPredicate {
1188 keyword: "confidence_threshold",
1189 reason: "expected a float in [0.0, 1.0]".into(),
1190 })
1191}
1192
1193#[allow(clippy::expect_used)]
1201fn epoch_zero() -> ClockTime {
1202 ClockTime::try_from_millis(0).expect("0ms is always a valid ClockTime")
1203}
1204
1205fn static_key_name(key: &str) -> &'static str {
1211 match key {
1212 "o" => "o",
1213 "read_after" => "read_after",
1214 "timeout_ms" => "timeout_ms",
1215 _ => "unknown_predicate",
1216 }
1217}
1218
1219#[cfg(test)]
1220mod tests {
1221 use super::*;
1222
1223 fn now() -> ClockTime {
1224 ClockTime::try_from_millis(1_713_350_400_000).expect("non-sentinel")
1225 }
1226
1227 fn compile(pipe: &mut Pipeline, src: &str) {
1228 pipe.compile_batch(src, now()).expect("compile");
1229 }
1230
1231 const SEM_ALICE: &str = "(sem @alice @knows @bob :src @observation :c 0.8 :v 2024-01-15)";
1232 const SEM_TRUSTS: &str = "(sem @alice @trusts @carol :src @observation :c 0.8 :v 2024-01-15)";
1234 const PRO_RULE: &str = r#"(pro @rule_route "agent_write" "route_via_librarian"
1235 :scp @mimir :src @policy :c 1.0)"#;
1236
1237 #[test]
1238 fn empty_pipeline_returns_empty_result() {
1239 let pipe = Pipeline::new();
1240 let got = pipe.execute_query("(query :s @alice :p @knows)").unwrap();
1241 assert!(got.records.is_empty());
1242 assert_eq!(got.flags, ReadFlags::empty());
1243 }
1244
1245 #[test]
1246 fn exact_sp_match_returns_current_memory() {
1247 let mut pipe = Pipeline::new();
1248 compile(&mut pipe, SEM_ALICE);
1249 let got = pipe
1250 .execute_query("(query :s @alice :p @knows)")
1251 .expect("query");
1252 assert_eq!(got.records.len(), 1);
1253 let CanonicalRecord::Sem(sem) = &got.records[0] else {
1254 panic!("expected Sem");
1255 };
1256 let alice = pipe.table().lookup("alice").unwrap();
1257 assert_eq!(sem.s, alice);
1258 }
1259
1260 #[test]
1261 fn unknown_symbol_returns_empty_not_error() {
1262 let mut pipe = Pipeline::new();
1263 compile(&mut pipe, SEM_ALICE);
1264 let got = pipe
1265 .execute_query("(query :s @nonexistent :p @knows)")
1266 .expect("unknown symbol is OK");
1267 assert!(got.records.is_empty());
1268 }
1269
1270 #[test]
1271 fn unscoped_query_returns_current_across_pairs() {
1272 let mut pipe = Pipeline::new();
1273 compile(&mut pipe, SEM_ALICE);
1274 compile(&mut pipe, SEM_TRUSTS);
1275 let got = pipe.execute_query("(query)").expect("all");
1276 assert_eq!(got.records.len(), 2);
1277 }
1278
1279 #[test]
1280 fn kind_filter_isolates_sem_from_pro() {
1281 let mut pipe = Pipeline::new();
1282 compile(&mut pipe, SEM_ALICE);
1283 compile(&mut pipe, PRO_RULE);
1284
1285 let sem_only = pipe.execute_query("(query :kind sem)").expect("sem");
1286 assert_eq!(sem_only.records.len(), 1);
1287 assert!(matches!(sem_only.records[0], CanonicalRecord::Sem(_)));
1288
1289 let pro_only = pipe.execute_query("(query :kind pro)").expect("pro");
1290 assert_eq!(pro_only.records.len(), 1);
1291 assert!(matches!(pro_only.records[0], CanonicalRecord::Pro(_)));
1292 }
1293
1294 #[test]
1295 fn kind_epi_returns_empty_in_71_scope() {
1296 let mut pipe = Pipeline::new();
1297 compile(&mut pipe, SEM_ALICE);
1298 let got = pipe.execute_query("(query :kind epi)").expect("epi");
1299 assert!(got.records.is_empty());
1300 }
1301
1302 #[test]
1303 fn invalid_kind_bareword_is_rejected() {
1304 let pipe = Pipeline::new();
1305 let err = pipe
1306 .execute_query("(query :kind bogus)")
1307 .expect_err("bad kind");
1308 assert!(matches!(err, ReadError::InvalidKind { .. }));
1309 }
1310
1311 #[test]
1312 fn as_of_past_valid_time_returns_earlier_record() {
1313 let mut pipe = Pipeline::new();
1314 compile(&mut pipe, SEM_ALICE);
1315 compile(
1316 &mut pipe,
1317 "(sem @alice @knows @carol :src @observation :c 0.8 :v 2024-03-01)",
1318 );
1319
1320 let got = pipe
1322 .execute_query("(query :s @alice :p @knows :as_of 2024-02-01)")
1323 .expect("as_of");
1324 let CanonicalRecord::Sem(sem) = &got.records[0] else {
1325 panic!();
1326 };
1327 let bob = pipe.table().lookup("bob").unwrap();
1328 assert!(matches!(&sem.o, crate::Value::Symbol(id) if *id == bob));
1329
1330 let current = pipe
1332 .execute_query("(query :s @alice :p @knows)")
1333 .expect("current");
1334 let CanonicalRecord::Sem(sem) = ¤t.records[0] else {
1335 panic!();
1336 };
1337 let carol = pipe.table().lookup("carol").unwrap();
1338 assert!(matches!(&sem.o, crate::Value::Symbol(id) if *id == carol));
1339 }
1340
1341 #[test]
1342 fn limit_truncates_and_sets_flag() {
1343 let mut pipe = Pipeline::new();
1344 compile(&mut pipe, SEM_ALICE);
1345 compile(&mut pipe, SEM_TRUSTS);
1346 let got = pipe.execute_query("(query :limit 1)").expect("limit");
1347 assert_eq!(got.records.len(), 1);
1348 assert!(got.flags.contains(ReadFlags::TRUNCATED));
1349 }
1350
1351 #[test]
1352 fn limit_not_hit_leaves_flag_clear() {
1353 let mut pipe = Pipeline::new();
1354 compile(&mut pipe, SEM_ALICE);
1355 let got = pipe.execute_query("(query :limit 10)").expect("limit");
1356 assert_eq!(got.records.len(), 1);
1357 assert!(!got.flags.contains(ReadFlags::TRUNCATED));
1358 }
1359
1360 #[test]
1361 fn unsupported_predicate_returns_typed_error() {
1362 let mut pipe = Pipeline::new();
1363 compile(&mut pipe, SEM_ALICE);
1364 let err = pipe
1367 .execute_query("(query :read_after @foo)")
1368 .expect_err("unsupported");
1369 assert!(matches!(
1370 err,
1371 ReadError::UnsupportedPredicate {
1372 predicate: "read_after"
1373 }
1374 ));
1375 }
1376
1377 #[test]
1378 fn s_predicate_with_kind_pro_is_rejected() {
1379 let mut pipe = Pipeline::new();
1380 compile(&mut pipe, SEM_ALICE);
1381 compile(&mut pipe, PRO_RULE);
1382 let err = pipe
1383 .execute_query("(query :s @alice :kind pro)")
1384 .expect_err("s + kind pro must reject");
1385 assert!(matches!(
1386 err,
1387 ReadError::IncompatiblePredicates {
1388 predicate: "s",
1389 kind: KindFilter::Pro,
1390 }
1391 ));
1392 }
1393
1394 #[test]
1395 fn p_predicate_with_kind_epi_is_rejected() {
1396 let mut pipe = Pipeline::new();
1397 compile(&mut pipe, SEM_ALICE);
1398 let err = pipe
1399 .execute_query("(query :p @knows :kind epi)")
1400 .expect_err("p + kind epi must reject");
1401 assert!(matches!(
1402 err,
1403 ReadError::IncompatiblePredicates {
1404 predicate: "p",
1405 kind: KindFilter::Epi,
1406 }
1407 ));
1408 }
1409
1410 #[test]
1411 fn write_path_query_still_unsupported() {
1412 let mut pipe = Pipeline::new();
1417 let err = pipe
1418 .compile_batch("(query :s @alice :p @knows)", now())
1419 .expect_err("write path rejects query");
1420 assert!(matches!(
1421 err,
1422 crate::pipeline::PipelineError::Emit(crate::pipeline::EmitError::Unsupported {
1423 form: "query"
1424 })
1425 ));
1426 }
1427
1428 #[test]
1429 fn multiple_forms_rejected() {
1430 let pipe = Pipeline::new();
1431 let err = pipe
1432 .execute_query("(query) (query)")
1433 .expect_err("two forms");
1434 assert!(matches!(err, ReadError::NotASingleQuery { count: 2 }));
1435 }
1436
1437 #[test]
1438 fn non_query_form_rejected() {
1439 let pipe = Pipeline::new();
1440 let err = pipe
1441 .execute_query("(sem @a @b @c :src @observation :c 0.8 :v 2024-01-15)")
1442 .expect_err("not a query");
1443 assert!(matches!(err, ReadError::NotAQuery));
1444 }
1445
1446 const SEM_LOW_CONF: &str = "(sem @mira @likes @tea :src @self_report :c 0.3 :v 2024-01-15)";
1449 const SEM_PROJECTED: &str =
1450 "(sem @plan @deploys @mimir :src @agent_instruction :c 0.9 :v 2099-01-01 :projected true)";
1451
1452 #[test]
1453 fn retired_symbol_default_drops_record() {
1454 let mut pipe = Pipeline::new();
1455 compile(&mut pipe, SEM_ALICE);
1456 compile(&mut pipe, "(retire @bob)");
1458 let got = pipe.execute_query("(query)").expect("query");
1459 assert!(
1460 got.records.is_empty(),
1461 "retired-symbol record should drop by default, got {:?}",
1462 got.records
1463 );
1464 assert!(!got.flags.contains(ReadFlags::STALE_SYMBOL));
1465 }
1466
1467 #[test]
1468 fn include_retired_keeps_record_and_sets_flag() {
1469 let mut pipe = Pipeline::new();
1470 compile(&mut pipe, SEM_ALICE);
1471 compile(&mut pipe, "(retire @bob)");
1472 let got = pipe
1473 .execute_query("(query :include_retired true)")
1474 .expect("query");
1475 assert_eq!(got.records.len(), 1);
1476 assert!(got.flags.contains(ReadFlags::STALE_SYMBOL));
1477 }
1478
1479 #[test]
1480 fn projected_default_drops_record() {
1481 let mut pipe = Pipeline::new();
1482 compile(&mut pipe, SEM_ALICE);
1483 compile(&mut pipe, SEM_PROJECTED);
1484 let got = pipe.execute_query("(query)").expect("query");
1485 assert_eq!(got.records.len(), 1);
1487 assert!(!got.flags.contains(ReadFlags::PROJECTED_PRESENT));
1488 }
1489
1490 #[test]
1491 fn include_projected_keeps_record_and_sets_flag() {
1492 let mut pipe = Pipeline::new();
1493 compile(&mut pipe, SEM_ALICE);
1494 compile(&mut pipe, SEM_PROJECTED);
1495 let got = pipe
1496 .execute_query("(query :include_projected true)")
1497 .expect("query");
1498 assert_eq!(got.records.len(), 2);
1499 assert!(got.flags.contains(ReadFlags::PROJECTED_PRESENT));
1500 }
1501
1502 #[test]
1503 fn low_confidence_flag_fires_on_default_threshold() {
1504 let mut pipe = Pipeline::new();
1505 compile(&mut pipe, SEM_LOW_CONF);
1506 let got = pipe.execute_query("(query)").expect("query");
1507 assert_eq!(got.records.len(), 1);
1508 assert!(got.flags.contains(ReadFlags::LOW_CONFIDENCE));
1510 }
1511
1512 #[test]
1513 fn confidence_threshold_override_tightens_flag() {
1514 let mut pipe = Pipeline::new();
1515 compile(&mut pipe, SEM_ALICE); let got = pipe
1518 .execute_query("(query :confidence_threshold 0.9)")
1519 .expect("query");
1520 assert!(got.flags.contains(ReadFlags::LOW_CONFIDENCE));
1521 let got_default = pipe.execute_query("(query)").expect("query");
1523 assert!(!got_default.flags.contains(ReadFlags::LOW_CONFIDENCE));
1524 }
1525
1526 #[test]
1527 fn confidence_threshold_flag_only_does_not_filter() {
1528 let mut pipe = Pipeline::new();
1529 compile(&mut pipe, SEM_LOW_CONF); let got = pipe.execute_query("(query)").expect("query");
1531 assert_eq!(got.records.len(), 1);
1533 assert!(got.flags.contains(ReadFlags::LOW_CONFIDENCE));
1534 }
1535
1536 #[test]
1537 fn explain_filtered_surfaces_dropped_records() {
1538 let mut pipe = Pipeline::new();
1539 compile(&mut pipe, SEM_ALICE);
1540 compile(&mut pipe, "(retire @bob)");
1541 let got = pipe
1542 .execute_query("(query :explain_filtered true)")
1543 .expect("query");
1544 assert!(got.records.is_empty());
1545 assert_eq!(got.filtered.len(), 1);
1546 assert_eq!(got.filtered[0].reason, FilterReason::RetiredSymbolExcluded);
1547 assert!(got.flags.contains(ReadFlags::EXPLAIN_FILTERED_ACTIVE));
1548 }
1549
1550 #[test]
1551 fn explain_filtered_off_keeps_filtered_empty() {
1552 let mut pipe = Pipeline::new();
1553 compile(&mut pipe, SEM_ALICE);
1554 compile(&mut pipe, "(retire @bob)");
1555 let got = pipe.execute_query("(query)").expect("query");
1556 assert!(got.filtered.is_empty());
1557 assert!(!got.flags.contains(ReadFlags::EXPLAIN_FILTERED_ACTIVE));
1558 }
1559
1560 #[test]
1561 fn show_framing_populates_per_record() {
1562 let mut pipe = Pipeline::new();
1563 compile(&mut pipe, SEM_ALICE);
1564 let got = pipe
1565 .execute_query("(query :show_framing true)")
1566 .expect("query");
1567 assert_eq!(got.framings.len(), got.records.len());
1568 assert_eq!(got.framings[0], Framing::Advisory);
1569 }
1570
1571 #[test]
1572 fn show_framing_off_leaves_framings_empty() {
1573 let mut pipe = Pipeline::new();
1574 compile(&mut pipe, SEM_ALICE);
1575 let got = pipe.execute_query("(query)").expect("query");
1576 assert!(got.framings.is_empty());
1577 }
1578
1579 #[test]
1580 fn framing_historical_when_as_of_is_past() {
1581 let mut pipe = Pipeline::new();
1582 compile(&mut pipe, SEM_ALICE);
1583 let got = pipe
1584 .execute_query("(query :as_of 2024-01-20 :show_framing true)")
1585 .expect("query");
1586 assert_eq!(got.framings.len(), 1);
1587 assert_eq!(got.framings[0], Framing::Historical);
1588 }
1589
1590 #[test]
1591 fn framing_projected_for_projected_record() {
1592 let mut pipe = Pipeline::new();
1593 compile(&mut pipe, SEM_PROJECTED);
1594 let got = pipe
1595 .execute_query("(query :include_projected true :show_framing true)")
1596 .expect("query");
1597 assert_eq!(got.framings.len(), 1);
1598 assert_eq!(got.framings[0], Framing::Projected);
1599 }
1600
1601 #[test]
1602 fn debug_mode_enables_both_toggles() {
1603 let mut pipe = Pipeline::new();
1604 compile(&mut pipe, SEM_ALICE);
1605 compile(&mut pipe, "(retire @bob)");
1606 let got = pipe
1607 .execute_query("(query :debug_mode true)")
1608 .expect("query");
1609 assert!(got.flags.contains(ReadFlags::EXPLAIN_FILTERED_ACTIVE));
1611 assert_eq!(got.filtered.len(), 1);
1612 assert_eq!(got.framings.len(), got.records.len());
1615 }
1616
1617 #[test]
1618 fn include_retired_with_explain_filtered_shows_no_filtered() {
1619 let mut pipe = Pipeline::new();
1620 compile(&mut pipe, SEM_ALICE);
1621 compile(&mut pipe, "(retire @bob)");
1622 let got = pipe
1623 .execute_query("(query :include_retired true :explain_filtered true)")
1624 .expect("query");
1625 assert_eq!(got.records.len(), 1);
1627 assert!(got.filtered.is_empty());
1628 assert!(got.flags.contains(ReadFlags::STALE_SYMBOL));
1629 assert!(got.flags.contains(ReadFlags::EXPLAIN_FILTERED_ACTIVE));
1630 }
1631
1632 #[test]
1633 fn invalid_boolean_predicate_is_rejected() {
1634 let pipe = Pipeline::new();
1635 let err = pipe
1636 .execute_query("(query :include_retired 5)")
1637 .expect_err("expected bool error");
1638 assert!(matches!(
1639 err,
1640 ReadError::InvalidPredicate {
1641 keyword: "include_retired",
1642 ..
1643 }
1644 ));
1645 }
1646
1647 #[test]
1648 fn invalid_confidence_threshold_is_rejected() {
1649 let pipe = Pipeline::new();
1650 let err = pipe
1651 .execute_query("(query :confidence_threshold 1.5)")
1652 .expect_err("out of range");
1653 assert!(matches!(
1654 err,
1655 ReadError::InvalidPredicate {
1656 keyword: "confidence_threshold",
1657 ..
1658 }
1659 ));
1660 }
1661
1662 const SEM_DECAYED_BELOW: &str =
1668 "(sem @mira @saw @kilroy :src @observation :c 0.8 :v 2023-12-01)";
1669
1670 #[test]
1671 fn stored_above_threshold_but_effective_below_triggers_low_confidence() {
1672 let mut pipe = Pipeline::new();
1673 compile(&mut pipe, SEM_DECAYED_BELOW);
1674 let got = pipe.execute_query("(query)").expect("query");
1675 assert_eq!(got.records.len(), 1);
1676 let CanonicalRecord::Sem(sem) = &got.records[0] else {
1677 panic!("expected Sem");
1678 };
1679 assert!(sem.confidence.as_f32() > 0.5, "stored should be > 0.5");
1680 assert!(
1681 got.flags.contains(ReadFlags::LOW_CONFIDENCE),
1682 "effective (decay-adjusted) confidence should be < 0.5"
1683 );
1684 }
1685
1686 #[test]
1687 fn recent_memory_stays_above_threshold() {
1688 let mut pipe = Pipeline::new();
1691 compile(&mut pipe, SEM_ALICE);
1692 let got = pipe.execute_query("(query)").expect("query");
1693 assert!(!got.flags.contains(ReadFlags::LOW_CONFIDENCE));
1694 }
1695
1696 fn register_latest_episode(pipe: &mut Pipeline, name: &str) -> crate::symbol::SymbolId {
1706 let at = pipe.last_committed_at().expect("committed");
1707 let table_snapshot_len = pipe.table().iter_entries().count();
1711 let id = crate::symbol::SymbolId::new(u64::MAX - table_snapshot_len as u64);
1712 pipe.replay_allocate(id, name.into(), crate::symbol::SymbolKind::Memory)
1713 .expect("allocate");
1714 pipe.register_episode(id, at);
1715 id
1716 }
1717
1718 #[test]
1719 fn in_episode_filters_to_that_commit() {
1720 let mut pipe = Pipeline::new();
1721 compile(&mut pipe, SEM_ALICE);
1722 let ep_id = register_latest_episode(&mut pipe, "ep_alpha");
1723 pipe.compile_batch(SEM_TRUSTS, later_now()).unwrap();
1725 let _beta = register_latest_episode(&mut pipe, "ep_beta");
1726
1727 let got = pipe
1728 .execute_query("(query :in_episode @ep_alpha)")
1729 .expect("query");
1730 assert_eq!(got.records.len(), 1);
1731 let CanonicalRecord::Sem(sem) = &got.records[0] else {
1732 panic!();
1733 };
1734 let alice = pipe.table().lookup("alice").unwrap();
1735 assert_eq!(sem.s, alice, "should be the SEM_ALICE record");
1736 assert_ne!(ep_id.as_u64(), 0);
1739 }
1740
1741 #[test]
1742 fn after_episode_filters_later_commits() {
1743 let mut pipe = Pipeline::new();
1744 compile(&mut pipe, SEM_ALICE);
1745 let _alpha = register_latest_episode(&mut pipe, "ep_alpha");
1746 pipe.compile_batch(SEM_TRUSTS, later_now()).unwrap();
1747 let _beta = register_latest_episode(&mut pipe, "ep_beta");
1748
1749 let got = pipe
1750 .execute_query("(query :after_episode @ep_alpha)")
1751 .expect("query");
1752 assert_eq!(got.records.len(), 1);
1753 let CanonicalRecord::Sem(sem) = &got.records[0] else {
1754 panic!();
1755 };
1756 let trusts = pipe.table().lookup("trusts").unwrap();
1757 assert_eq!(sem.p, trusts, "should be the SEM_TRUSTS record");
1758 }
1759
1760 #[test]
1761 fn before_episode_filters_earlier_commits() {
1762 let mut pipe = Pipeline::new();
1763 compile(&mut pipe, SEM_ALICE);
1764 let _alpha = register_latest_episode(&mut pipe, "ep_alpha");
1765 pipe.compile_batch(SEM_TRUSTS, later_now()).unwrap();
1766 let _beta = register_latest_episode(&mut pipe, "ep_beta");
1767
1768 let got = pipe
1769 .execute_query("(query :before_episode @ep_beta)")
1770 .expect("query");
1771 assert_eq!(got.records.len(), 1);
1772 let CanonicalRecord::Sem(sem) = &got.records[0] else {
1773 panic!();
1774 };
1775 let alice = pipe.table().lookup("alice").unwrap();
1776 assert_eq!(sem.s, alice, "should be SEM_ALICE");
1777 }
1778
1779 #[test]
1780 fn unknown_episode_symbol_returns_empty() {
1781 let mut pipe = Pipeline::new();
1782 compile(&mut pipe, SEM_ALICE);
1783 let got = pipe
1784 .execute_query("(query :in_episode @nonexistent)")
1785 .expect("query");
1786 assert!(got.records.is_empty());
1787 }
1788
1789 #[test]
1790 fn multiple_episode_scopes_rejected() {
1791 let mut pipe = Pipeline::new();
1792 compile(&mut pipe, SEM_ALICE);
1793 let _ = register_latest_episode(&mut pipe, "ep_alpha");
1794 let err = pipe
1795 .execute_query("(query :in_episode @ep_alpha :after_episode @ep_alpha)")
1796 .expect_err("two Episode-scoped predicates must reject");
1797 assert!(matches!(
1798 err,
1799 ReadError::InvalidPredicate {
1800 keyword: "in_episode / after_episode / before_episode / episode_chain",
1801 ..
1802 }
1803 ));
1804 }
1805
1806 fn later_now() -> ClockTime {
1807 ClockTime::try_from_millis(1_713_350_400_000 + 1_000).expect("non-sentinel")
1808 }
1809
1810 #[test]
1811 fn decay_config_override_suppresses_decay() {
1812 let mut pipe = Pipeline::new();
1816 let mut cfg = crate::decay::DecayConfig::librarian_defaults();
1817 cfg.sem_observation = crate::decay::HalfLife::no_decay();
1818 pipe.set_decay_config(cfg);
1819 compile(&mut pipe, SEM_DECAYED_BELOW);
1820 let got = pipe.execute_query("(query)").expect("query");
1821 assert!(
1822 !got.flags.contains(ReadFlags::LOW_CONFIDENCE),
1823 "with decay disabled, stored 0.8 stays above 0.5"
1824 );
1825 }
1826
1827 #[test]
1846 fn inf_kind_query_returns_committed_inf() {
1847 let mut pipe = Pipeline::new();
1848 compile(&mut pipe, SEM_ALICE);
1849 compile(
1850 &mut pipe,
1851 "(inf @alice @friend_of @bob (@__mem_0) @citation_link \
1852 :c 0.7 :v 2024-03-15)",
1853 );
1854 let got = pipe.execute_query("(query :kind inf)").expect("query");
1855 assert_eq!(
1856 got.records.len(),
1857 1,
1858 "expected the committed inferential to be returned; \
1859 got {} records: {:?}",
1860 got.records.len(),
1861 got.records,
1862 );
1863 let CanonicalRecord::Inf(inf) = &got.records[0] else {
1864 panic!("expected Inf record, got {:?}", got.records[0]);
1865 };
1866 let alice = pipe.table().lookup("alice").expect("alice bound");
1867 let friend_of = pipe.table().lookup("friend_of").expect("friend_of bound");
1868 assert_eq!(inf.s, alice);
1869 assert_eq!(inf.p, friend_of);
1870 }
1871
1872 #[test]
1876 fn inf_sp_query_filters_by_subject_predicate() {
1877 let mut pipe = Pipeline::new();
1878 compile(&mut pipe, SEM_ALICE);
1879 compile(
1880 &mut pipe,
1881 "(inf @alice @friend_of @bob (@__mem_0) @citation_link \
1882 :c 0.7 :v 2024-03-15)",
1883 );
1884 compile(
1885 &mut pipe,
1886 "(inf @alice @colleague_of @dave (@__mem_0) @citation_link \
1887 :c 0.7 :v 2024-03-15)",
1888 );
1889 let got = pipe
1890 .execute_query("(query :kind inf :s @alice :p @friend_of)")
1891 .expect("query");
1892 assert_eq!(
1893 got.records.len(),
1894 1,
1895 ":s @alice :p @friend_of must match exactly one Inf",
1896 );
1897 let CanonicalRecord::Inf(inf) = &got.records[0] else {
1898 panic!("expected Inf record");
1899 };
1900 let friend_of = pipe.table().lookup("friend_of").unwrap();
1901 assert_eq!(inf.p, friend_of);
1902 }
1903
1904 #[test]
1908 fn bare_query_includes_inferentials() {
1909 let mut pipe = Pipeline::new();
1910 compile(&mut pipe, SEM_ALICE);
1911 compile(
1912 &mut pipe,
1913 "(inf @alice @friend_of @bob (@__mem_0) @citation_link \
1914 :c 0.7 :v 2024-03-15)",
1915 );
1916 let got = pipe.execute_query("(query)").expect("query");
1917 let has_sem = got
1918 .records
1919 .iter()
1920 .any(|r| matches!(r, CanonicalRecord::Sem(_)));
1921 let has_inf = got
1922 .records
1923 .iter()
1924 .any(|r| matches!(r, CanonicalRecord::Inf(_)));
1925 assert!(has_sem, "bare query must include Sem records");
1926 assert!(has_inf, "bare query must include Inf records");
1927 }
1928
1929 #[test]
1934 fn inf_same_sp_same_valid_at_is_conflict() {
1935 let mut pipe = Pipeline::new();
1936 compile(&mut pipe, SEM_ALICE);
1937 compile(
1938 &mut pipe,
1939 "(inf @alice @friend_of @bob (@__mem_0) @citation_link \
1940 :c 0.7 :v 2024-01-15)",
1941 );
1942 let err = pipe
1943 .compile_batch(
1944 "(inf @alice @friend_of @carol (@__mem_0) @citation_link \
1945 :c 0.8 :v 2024-01-15)",
1946 now(),
1947 )
1948 .expect_err("identical (s, p, valid_at) must conflict");
1949 assert!(
1950 matches!(
1951 err,
1952 crate::pipeline::PipelineError::Emit(
1953 crate::pipeline::EmitError::InferentialSupersessionConflict { .. }
1954 )
1955 ),
1956 "expected InferentialSupersessionConflict; got {err:?}",
1957 );
1958 }
1959
1960 #[test]
1966 fn inf_re_derivation_supersedes_earlier_inf() {
1967 let mut pipe = Pipeline::new();
1968 compile(&mut pipe, SEM_ALICE);
1969 compile(
1970 &mut pipe,
1971 "(inf @alice @friend_of @bob (@__mem_0) @citation_link \
1972 :c 0.7 :v 2024-01-15)",
1973 );
1974 compile(
1975 &mut pipe,
1976 "(inf @alice @friend_of @carol (@__mem_0) @citation_link \
1977 :c 0.9 :v 2024-03-15)",
1978 );
1979 let got = pipe.execute_query("(query :kind inf)").expect("query");
1980 assert_eq!(
1981 got.records.len(),
1982 1,
1983 "later valid_at re-derivation must supersede earlier Inf; \
1984 current-state query should return only one record. Got: {:?}",
1985 got.records,
1986 );
1987 let CanonicalRecord::Inf(inf) = &got.records[0] else {
1988 panic!("expected Inf");
1989 };
1990 let carol = pipe.table().lookup("carol").expect("carol bound");
1991 assert!(
1992 matches!(&inf.o, crate::Value::Symbol(id) if *id == carol),
1993 "expected the later-valid_at (carol) record; got {:?}",
1994 inf.o,
1995 );
1996 }
1997}