1use std::path::Path;
40
41use thiserror::Error;
42
43use crate::canonical::{
44 decode_all, decode_record, encode_record, CanonicalRecord, CheckpointRecord, DecodeError,
45 EpisodeMetaRecord, SymbolEventRecord,
46};
47use crate::clock::ClockTime;
48use crate::log::{CanonicalLog, LogBackend, LogError};
49use crate::pipeline::{Pipeline, PipelineError};
50use crate::symbol::{SymbolId, SymbolKind};
51
/// Opaque identifier for a committed episode, wrapping the episode's
/// allocated `SymbolId` (the reserved `__ep_N` symbol).
#[derive(Copy, Clone, Debug, PartialEq, Eq, Hash)]
pub struct EpisodeId(SymbolId);

impl EpisodeId {
    /// Returns the underlying `SymbolId` backing this episode.
    #[must_use]
    pub const fn as_symbol(self) -> SymbolId {
        self.0
    }
}
65
/// Durable memory store: an append-only canonical log plus the in-memory
/// pipeline replayed from it. Generic over the log backend so tests can
/// inject fault-injecting backends.
pub struct Store<L: LogBackend = CanonicalLog> {
    // Append-only persistence backend.
    log: L,
    // In-memory state rebuilt from the committed log on open.
    pipeline: Pipeline,
    // Suffix for the next reserved `__ep_N` episode symbol name.
    next_episode_counter: u64,
}
75
76impl Store<CanonicalLog> {
77 pub fn open(path: impl AsRef<Path>) -> Result<Self, StoreError> {
85 let log = CanonicalLog::open(path).map_err(StoreError::Log)?;
86 Self::from_backend(log)
87 }
88
89 pub fn open_in_workspace(
104 data_root: impl AsRef<Path>,
105 workspace_id: crate::WorkspaceId,
106 ) -> Result<Self, StoreError> {
107 use std::fmt::Write;
112 let mut hex = String::with_capacity(workspace_id.as_bytes().len() * 2);
113 for b in workspace_id.as_bytes() {
114 let _ = write!(hex, "{b:02x}");
116 }
117 let workspace_dir = data_root.as_ref().join(&hex);
118 std::fs::create_dir_all(&workspace_dir)
119 .map_err(|e| StoreError::Log(crate::log::LogError::Io(e)))?;
120 let log_path = workspace_dir.join("canonical.log");
121 let log = CanonicalLog::open(log_path).map_err(StoreError::Log)?;
122 Self::from_backend(log)
123 }
124}
125
126impl<L: LogBackend> Store<L> {
    /// Builds a store from an already-open log backend.
    ///
    /// Recovery sequence:
    /// 1. Read the whole log and locate the end of the last decodable
    ///    CHECKPOINT record; truncate any orphan bytes past it (a torn
    ///    final batch is dropped, not an error).
    /// 2. Decode the committed prefix and replay every record into a
    ///    fresh [`Pipeline`]: supersession edges, memory records, flags,
    ///    symbol events, checkpoints, and episode metadata.
    /// 3. Recover the reserved `__mem_N` / `__ep_N` name counters from
    ///    replayed symbol allocations so new allocations never collide.
    ///
    /// # Errors
    /// Returns log, decode, or pipeline-replay failures as [`StoreError`].
    pub fn from_backend(mut log: L) -> Result<Self, StoreError> {
        let log_len_on_open = log.len();
        let bytes_on_open = log.read_all().map_err(StoreError::Log)?;
        let committed_end = Self::committed_end_for_open(&bytes_on_open)?;
        if committed_end < bytes_on_open.len() {
            // Torn/partial tail: drop everything past the last CHECKPOINT.
            let committed_end_u64 =
                u64::try_from(committed_end).map_err(|_| StoreError::Log(LogError::LogOverflow))?;
            let orphan_bytes = log_len_on_open - committed_end_u64;
            log.truncate(committed_end_u64).map_err(StoreError::Log)?;
            tracing::warn!(
                target: "mimir.recovery.orphan_truncated",
                log_len_before = log_len_on_open,
                committed_end = committed_end_u64,
                orphan_bytes,
                "truncated orphan bytes past last CHECKPOINT on open",
            );
        }
        let records = decode_all(&bytes_on_open[..committed_end])?;
        let mut pipeline = Pipeline::new();
        let mut next_memory_counter = 0_u64;
        let mut next_episode_counter = 0_u64;
        // Replay statistics, logged once at the end when non-trivial.
        let mut symbol_alloc_count = 0_u64;
        let mut symbol_mutation_count = 0_u64;
        let mut checkpoint_count = 0_u64;
        for record in records {
            pipeline.advance_last_committed_at(record.committed_at());
            // Supersession edges replay before the per-kind handling below.
            if let Some(edge) = crate::dag::Edge::try_from_record(&record) {
                pipeline.replay_edge(edge)?;
            }
            pipeline.replay_memory_record(&record);
            pipeline.replay_flag(&record);
            match record {
                CanonicalRecord::SymbolAlloc(event) => {
                    pipeline
                        .replay_allocate(event.symbol_id, event.name.clone(), event.symbol_kind)
                        .map_err(|e| StoreError::Pipeline(PipelineError::Bind(e)))?;
                    // Reserved names carry the counters that must resume
                    // past any replayed suffix.
                    Self::advance_reserved_counter("__mem_", &event.name, &mut next_memory_counter);
                    Self::advance_reserved_counter("__ep_", &event.name, &mut next_episode_counter);
                    symbol_alloc_count += 1;
                }
                CanonicalRecord::SymbolAlias(event) => {
                    pipeline
                        .replay_alias(event.symbol_id, event.name)
                        .map_err(|e| StoreError::Pipeline(PipelineError::Bind(e)))?;
                    symbol_mutation_count += 1;
                }
                CanonicalRecord::SymbolRename(event) => {
                    pipeline
                        .replay_rename(event.symbol_id, event.name)
                        .map_err(|e| StoreError::Pipeline(PipelineError::Bind(e)))?;
                    symbol_mutation_count += 1;
                }
                CanonicalRecord::SymbolRetire(event) => {
                    pipeline
                        .replay_retire(event.symbol_id, event.name)
                        .map_err(|e| StoreError::Pipeline(PipelineError::Bind(e)))?;
                    symbol_mutation_count += 1;
                }
                CanonicalRecord::Checkpoint(cp) => {
                    pipeline.register_episode(cp.episode_id, cp.at);
                    checkpoint_count += 1;
                }
                CanonicalRecord::EpisodeMeta(meta) => {
                    pipeline.register_episode(meta.episode_id, meta.at);
                    if let Some(parent) = meta.parent_episode_id {
                        pipeline.register_episode_parent(meta.episode_id, parent);
                    }
                }
                _ => {}
            }
        }
        pipeline.set_next_memory_counter(next_memory_counter);
        if symbol_alloc_count > 0 || symbol_mutation_count > 0 || checkpoint_count > 0 {
            tracing::info!(
                target: "mimir.recovery.symbol_replay",
                symbol_alloc_count,
                symbol_mutation_count,
                checkpoint_count,
                next_memory_counter,
                next_episode_counter,
                "replayed committed log into pipeline on open",
            );
        }
        Ok(Self {
            log,
            pipeline,
            next_episode_counter,
        })
    }
256
257 fn committed_end_for_open(bytes: &[u8]) -> Result<usize, StoreError> {
258 let mut pos = 0_usize;
259 let mut last_checkpoint_end = 0_usize;
260 while pos < bytes.len() {
261 match decode_record(&bytes[pos..]) {
262 Ok((record, consumed)) => {
263 pos += consumed;
264 if matches!(record, CanonicalRecord::Checkpoint(_)) {
265 last_checkpoint_end = pos;
266 }
267 }
268 Err(source) if Self::is_recoverable_tail_decode_error(&source) => {
269 return Ok(last_checkpoint_end);
270 }
271 Err(source) => {
272 let offset =
273 u64::try_from(pos).map_err(|_| StoreError::Log(LogError::LogOverflow))?;
274 return Err(StoreError::CorruptTail { offset, source });
275 }
276 }
277 }
278 Ok(last_checkpoint_end)
279 }
280
281 const fn is_recoverable_tail_decode_error(error: &DecodeError) -> bool {
282 matches!(
283 error,
284 DecodeError::Truncated { .. } | DecodeError::LengthMismatch { .. }
285 )
286 }
287
288 fn advance_reserved_counter(prefix: &str, name: &str, counter: &mut u64) {
289 if let Some(suffix) = name.strip_prefix(prefix) {
290 if let Ok(n) = suffix.parse::<u64>() {
291 if n + 1 > *counter {
292 *counter = n + 1;
293 }
294 }
295 }
296 }
297
    /// Current length of the underlying log, in bytes.
    #[must_use]
    pub fn log_len(&self) -> u64 {
        self.log.len()
    }
303
    /// Shared view of the replayed in-memory pipeline.
    #[must_use]
    pub fn pipeline(&self) -> &Pipeline {
        &self.pipeline
    }

    /// Mutable access to the pipeline (e.g. for executing queries).
    pub fn pipeline_mut(&mut self) -> &mut Pipeline {
        &mut self.pipeline
    }
320
    /// Commits `input` as one batch with default (empty) episode metadata.
    /// See [`Self::commit_batch_with_metadata`] for the full contract.
    ///
    /// # Errors
    /// Same failure modes as [`Self::commit_batch_with_metadata`].
    pub fn commit_batch(&mut self, input: &str, now: ClockTime) -> Result<EpisodeId, StoreError> {
        self.commit_batch_with_metadata(input, now, &EpisodeMetadata::default())
    }
347
    /// Compiles `input`, appends the resulting records plus an episode
    /// `SymbolAlloc`, an optional `EpisodeMeta`, and a trailing
    /// CHECKPOINT to the log in a single append, fsyncs, then registers
    /// the new episode with the pipeline.
    ///
    /// Any failure after the pipeline has been mutated rolls both the
    /// pipeline and the log back to their pre-call state.
    ///
    /// # Errors
    /// Metadata validation, batch compilation, episode allocation, and
    /// log append/sync failures are all surfaced as [`StoreError`].
    pub fn commit_batch_with_metadata(
        &mut self,
        input: &str,
        now: ClockTime,
        metadata: &EpisodeMetadata,
    ) -> Result<EpisodeId, StoreError> {
        let span = tracing::info_span!(
            "mimir.commit.batch",
            log_offset_before = self.log.len(),
            log_offset_after = tracing::field::Empty,
            record_count = tracing::field::Empty,
            episode_id = tracing::field::Empty,
            fsync_micros = tracing::field::Empty,
        );
        let _enter = span.enter();

        metadata.validate()?;
        // Snapshots taken before any mutation, for rollback on failure.
        let pipeline_snapshot = self.pipeline.clone();
        let episode_counter_snapshot = self.next_episode_counter;
        let log_len_before = self.log.len();

        let records = self.pipeline.compile_batch(input, now)?;

        // In-batch `(episode …)` directives override caller-supplied
        // metadata field-by-field; the merged result is re-validated.
        let pending = self.pipeline.take_pending_episode_metadata();
        let mut resolved_meta = metadata.clone();
        if let Some(p) = pending {
            if p.label.is_some() {
                resolved_meta.label = p.label;
            }
            if p.parent_episode.is_some() {
                resolved_meta.parent_episode = p.parent_episode;
            }
            if !p.retracts.is_empty() {
                resolved_meta.retracts = p.retracts;
            }
            resolved_meta.validate()?;
        }

        // Use the batch's committed-at time when the compiler advanced it.
        let effective_now = self.pipeline.last_committed_at().unwrap_or(now);

        let episode_id = self
            .pipeline
            .allocate_episode_symbol(self.next_episode_counter)
            .map_err(|e| {
                // Allocation failed after compile mutated the pipeline:
                // restore the snapshots before surfacing the error.
                self.pipeline = pipeline_snapshot.clone();
                self.next_episode_counter = episode_counter_snapshot;
                StoreError::Pipeline(PipelineError::Emit(e))
            })?;
        self.next_episode_counter += 1;

        let checkpoint = CheckpointRecord {
            episode_id,
            at: effective_now,
            memory_count: memory_record_count(&records),
        };

        let episode_alloc = CanonicalRecord::SymbolAlloc(SymbolEventRecord {
            symbol_id: episode_id,
            name: format!("__ep_{episode_counter_snapshot}"),
            symbol_kind: SymbolKind::Memory,
            at: effective_now,
        });

        let episode_meta = resolved_meta.to_record(episode_id, effective_now);

        // Encode the whole batch into one buffer so it lands in a single
        // append: alloc first, memory records, optional meta, checkpoint
        // last (the checkpoint is the commit marker recovery looks for).
        let mut buf = Vec::new();
        encode_record(&episode_alloc, &mut buf);
        for r in &records {
            encode_record(r, &mut buf);
        }
        if let Some(ref meta_rec) = episode_meta {
            encode_record(&CanonicalRecord::EpisodeMeta(meta_rec.clone()), &mut buf);
        }
        encode_record(&CanonicalRecord::Checkpoint(checkpoint), &mut buf);

        if let Err(e) = self.log.append(&buf) {
            self.rollback(&pipeline_snapshot, episode_counter_snapshot, log_len_before)?;
            return Err(StoreError::Log(e));
        }

        // Durability point: the batch only counts once fsync succeeds.
        let fsync_start = std::time::Instant::now();
        if let Err(e) = self.log.sync() {
            self.rollback(&pipeline_snapshot, episode_counter_snapshot, log_len_before)?;
            return Err(StoreError::Log(e));
        }
        let fsync_micros = u64::try_from(fsync_start.elapsed().as_micros()).unwrap_or(u64::MAX);

        self.pipeline.register_episode(episode_id, effective_now);
        if let Some(ref meta_rec) = episode_meta {
            if let Some(parent) = meta_rec.parent_episode_id {
                self.pipeline.register_episode_parent(episode_id, parent);
            }
        }

        span.record("log_offset_after", self.log.len());
        span.record("record_count", records.len());
        span.record("episode_id", tracing::field::display(episode_id));
        span.record("fsync_micros", fsync_micros);

        Ok(EpisodeId(episode_id))
    }
490
491 fn rollback(
495 &mut self,
496 pipeline_snapshot: &Pipeline,
497 episode_counter_snapshot: u64,
498 log_len_before: u64,
499 ) -> Result<(), StoreError> {
500 self.pipeline = pipeline_snapshot.clone();
501 self.next_episode_counter = episode_counter_snapshot;
502 if self.log.len() > log_len_before {
507 self.log.truncate(log_len_before).map_err(StoreError::Log)?;
508 }
509 Ok(())
510 }
511}
512
513fn memory_record_count(records: &[CanonicalRecord]) -> u64 {
514 records
515 .iter()
516 .filter(|record| {
517 matches!(
518 record,
519 CanonicalRecord::Sem(_)
520 | CanonicalRecord::Epi(_)
521 | CanonicalRecord::Pro(_)
522 | CanonicalRecord::Inf(_)
523 )
524 })
525 .count() as u64
526}
527
/// Errors returned by [`Store`] operations.
#[derive(Debug, Error)]
pub enum StoreError {
    /// Batch compilation, replay, or episode allocation failed.
    #[error("pipeline error: {0}")]
    Pipeline(#[from] PipelineError),

    /// The underlying log backend failed (I/O, overflow, truncate).
    #[error("log error: {0}")]
    Log(#[from] LogError),

    /// Bytes before the end of the log failed to decode in a way that is
    /// not a recoverable torn tail (see `is_recoverable_tail_decode_error`).
    #[error("corrupt canonical log tail at offset {offset}: {source}")]
    CorruptTail {
        /// Byte offset at which decoding failed.
        offset: u64,
        /// The underlying decode failure.
        source: DecodeError,
    },

    /// The committed (pre-CHECKPOINT) region failed to decode on replay.
    #[error("committed canonical log failed to decode: {source}")]
    CorruptCommittedLog {
        #[from]
        source: crate::canonical::DecodeError,
    },

    /// Re-applying supersession edges during replay failed.
    #[error("supersession DAG replay failed: {source}")]
    DagReplay {
        #[from]
        source: crate::dag::DagError,
    },

    /// Caller-supplied episode metadata failed validation.
    #[error("invalid episode metadata: {reason}")]
    InvalidEpisodeMetadata {
        /// Human-readable description of the validation failure.
        reason: &'static str,
    },
}
587
/// Optional metadata attached to a committed episode.
#[derive(Clone, Debug, Default, PartialEq, Eq)]
pub struct EpisodeMetadata {
    /// Human-readable label, capped at [`EpisodeMetadata::MAX_LABEL_BYTES`].
    pub label: Option<String>,
    /// Parent episode, if this episode continues a chain.
    pub parent_episode: Option<SymbolId>,
    /// Episodes this episode retracts (corrections).
    pub retracts: Vec<SymbolId>,
}
601
602impl EpisodeMetadata {
603 pub const MAX_LABEL_BYTES: usize = 256;
605
606 #[must_use]
609 pub fn is_empty(&self) -> bool {
610 self.label.as_deref().is_none_or(str::is_empty)
611 && self.parent_episode.is_none()
612 && self.retracts.is_empty()
613 }
614
615 fn validate(&self) -> Result<(), StoreError> {
617 if let Some(label) = self.label.as_deref() {
618 if label.len() > Self::MAX_LABEL_BYTES {
619 return Err(StoreError::InvalidEpisodeMetadata {
620 reason: "label exceeds 256-byte cap",
621 });
622 }
623 }
624 Ok(())
625 }
626
627 fn to_record(&self, episode_id: SymbolId, at: ClockTime) -> Option<EpisodeMetaRecord> {
632 if self.is_empty() {
633 return None;
634 }
635 Some(EpisodeMetaRecord {
636 episode_id,
637 at,
638 label: self.label.clone().filter(|s| !s.is_empty()),
639 parent_episode_id: self.parent_episode,
640 retracts: self.retracts.clone(),
641 })
642 }
643}
644
645#[cfg(test)]
646mod tests {
647 use super::*;
648 use crate::canonical::{decode_all, decode_record, CanonicalRecord};
649 use crate::read::{Framing, FramingSource, ReadFlags};
650 use tempfile::TempDir;
651
    // Well-formed SEM batches reused across tests.
    const SEM_OK: &str = "(sem @alice @knows @bob :src @observation :c 0.8 :v 2024-01-15)";
    const SEM_OK_2: &str = "(sem @alice @likes @carol :src @observation :c 0.7 :v 2024-01-16)";

    // Deterministic commit timestamp shared by every test.
    fn fixed_now() -> ClockTime {
        ClockTime::try_from_millis(1_713_350_400_000).expect("non-sentinel")
    }

    // Opens a fresh store on a log file inside the temp dir.
    fn open_fresh(dir: &TempDir) -> Store {
        Store::open(dir.path().join("canonical.log")).expect("open")
    }
662
    // In-memory LogBackend whose next append/sync/truncate can be armed
    // to fail exactly once, for exercising commit rollback paths.
    #[derive(Default)]
    struct FaultyLog {
        bytes: Vec<u8>,
        fail_next_append: Option<std::io::ErrorKind>,
        fail_next_sync: Option<std::io::ErrorKind>,
        fail_next_truncate: Option<std::io::ErrorKind>,
    }

    impl FaultyLog {
        fn new() -> Self {
            Self::default()
        }

        // Arms a one-shot failure for the next `append` call.
        fn arm_append_failure(&mut self, kind: std::io::ErrorKind) {
            self.fail_next_append = Some(kind);
        }

        // Arms a one-shot failure for the next `sync` call.
        fn arm_sync_failure(&mut self, kind: std::io::ErrorKind) {
            self.fail_next_sync = Some(kind);
        }

        // Arms a one-shot failure for the next `truncate` call.
        fn arm_truncate_failure(&mut self, kind: std::io::ErrorKind) {
            self.fail_next_truncate = Some(kind);
        }
    }
692
    impl LogBackend for FaultyLog {
        // Appends to the in-memory buffer unless armed to fail once.
        fn append(&mut self, bytes: &[u8]) -> Result<(), LogError> {
            if let Some(kind) = self.fail_next_append.take() {
                return Err(LogError::Io(std::io::Error::from(kind)));
            }
            self.bytes.extend_from_slice(bytes);
            Ok(())
        }

        // No-op sync unless armed to fail once.
        fn sync(&mut self) -> Result<(), LogError> {
            if let Some(kind) = self.fail_next_sync.take() {
                return Err(LogError::Io(std::io::Error::from(kind)));
            }
            Ok(())
        }

        // Truncates the in-memory buffer; rejects growth requests.
        fn truncate(&mut self, new_len: u64) -> Result<(), LogError> {
            if let Some(kind) = self.fail_next_truncate.take() {
                return Err(LogError::Io(std::io::Error::from(kind)));
            }
            let current = self.bytes.len() as u64;
            if new_len > current {
                return Err(LogError::TruncateBeyondEnd {
                    requested: new_len,
                    current,
                });
            }
            let new_len_usize = usize::try_from(new_len).unwrap_or(self.bytes.len());
            self.bytes.truncate(new_len_usize);
            Ok(())
        }

        fn read_all(&mut self) -> Result<Vec<u8>, LogError> {
            Ok(self.bytes.clone())
        }

        fn len(&self) -> u64 {
            self.bytes.len() as u64
        }

        // Mirrors Store::committed_end_for_open, except every decode
        // failure simply ends the scan.
        fn last_checkpoint_end(&mut self) -> Result<u64, LogError> {
            let mut pos: usize = 0;
            let mut last_checkpoint_end: u64 = 0;
            while pos < self.bytes.len() {
                match decode_record(&self.bytes[pos..]) {
                    Ok((record, consumed)) => {
                        pos += consumed;
                        if matches!(record, CanonicalRecord::Checkpoint(_)) {
                            last_checkpoint_end = pos as u64;
                        }
                    }
                    Err(_) => break,
                }
            }
            Ok(last_checkpoint_end)
        }
    }
750
    // A single commit writes the SEM record plus a trailing CHECKPOINT,
    // and the checkpoint's memory_count counts only memory records.
    #[test]
    fn commit_single_batch_persists_records_and_checkpoint() {
        let dir = TempDir::new().expect("tmp");
        let mut store = open_fresh(&dir);
        let _ = store.commit_batch(SEM_OK, fixed_now()).expect("commit");

        let bytes = store.log.read_all().expect("read");
        let records = decode_all(&bytes).expect("decode");
        assert!(matches!(
            records.last(),
            Some(CanonicalRecord::Checkpoint(_))
        ));
        let checkpoint = records
            .iter()
            .find_map(|r| match r {
                CanonicalRecord::Checkpoint(c) => Some(c),
                _ => None,
            })
            .expect("checkpoint");
        assert_eq!(
            checkpoint.memory_count, 1,
            "checkpoint memory_count must count memory records, not symbol events"
        );
        let mem_count = records
            .iter()
            .filter(|r| matches!(r, CanonicalRecord::Sem(_)))
            .count();
        assert_eq!(mem_count, 1);
    }
782
    // Each commit's episode is queryable, and later commits order after it.
    #[test]
    fn commit_registers_episode_with_pipeline() {
        let dir = TempDir::new().expect("tmp");
        let mut store = open_fresh(&dir);
        let first = store.commit_batch(SEM_OK, fixed_now()).expect("first");
        let second = store.commit_batch(SEM_OK_2, fixed_now()).expect("second");

        let got1 = store
            .pipeline_mut()
            .execute_query("(query :in_episode @__ep_0)")
            .expect("q1");
        assert_eq!(got1.records.len(), 1, "first Episode holds SEM_OK");

        let got2 = store
            .pipeline_mut()
            .execute_query("(query :after_episode @__ep_0)")
            .expect("q2");
        assert_eq!(got2.records.len(), 1, "SEM_OK_2 commits after __ep_0");

        assert_ne!(first, second);
    }

    // Reopening replays CHECKPOINTs so episode queries still resolve.
    #[test]
    fn replay_registers_episodes_with_pipeline() {
        let dir = TempDir::new().expect("tmp");
        {
            let mut store = open_fresh(&dir);
            store.commit_batch(SEM_OK, fixed_now()).expect("first");
        }
        let mut reopened = open_fresh(&dir);
        let got = reopened
            .pipeline_mut()
            .execute_query("(query :in_episode @__ep_0)")
            .expect("query");
        assert_eq!(
            got.records.len(),
            1,
            "replay must re-register Episodes with the pipeline"
        );
    }
830
    // A metadata-carrying commit emits exactly one EpisodeMeta record
    // with the supplied label and parent link.
    #[test]
    fn commit_with_metadata_emits_episode_meta_record() {
        let dir = TempDir::new().expect("tmp");
        let mut store = open_fresh(&dir);
        let first = store.commit_batch(SEM_OK, fixed_now()).expect("first");
        let meta = EpisodeMetadata {
            label: Some("design-session".into()),
            parent_episode: Some(first.0),
            retracts: Vec::new(),
        };
        store
            .commit_batch_with_metadata(SEM_OK_2, fixed_now(), &meta)
            .expect("second with metadata");

        let bytes = store.log.read_all().expect("read");
        let records = decode_all(&bytes).expect("decode");
        let meta_count = records
            .iter()
            .filter(|r| matches!(r, CanonicalRecord::EpisodeMeta(_)))
            .count();
        assert_eq!(
            meta_count, 1,
            "only the metadata-carrying commit should emit an EpisodeMeta"
        );
        let meta_rec = records
            .iter()
            .find_map(|r| match r {
                CanonicalRecord::EpisodeMeta(m) => Some(m),
                _ => None,
            })
            .expect("EpisodeMeta present");
        assert_eq!(meta_rec.label.as_deref(), Some("design-session"));
        assert_eq!(meta_rec.parent_episode_id, Some(first.0));
    }

    // Parent links persisted via EpisodeMeta survive a reopen and are
    // walkable with :episode_chain.
    #[test]
    fn episode_chain_walks_parent_links_after_replay() {
        let dir = TempDir::new().expect("tmp");
        let (first, second, third);
        {
            let mut store = open_fresh(&dir);
            first = store.commit_batch(SEM_OK, fixed_now()).expect("first");
            second = store
                .commit_batch_with_metadata(
                    "(sem @alice @likes @carol :src @observation :c 0.7 :v 2024-01-16)",
                    fixed_now(),
                    &EpisodeMetadata {
                        label: None,
                        parent_episode: Some(first.0),
                        retracts: Vec::new(),
                    },
                )
                .expect("second");
            third = store
                .commit_batch_with_metadata(
                    "(sem @charlie @knows @dana :src @observation :c 0.7 :v 2024-01-17)",
                    fixed_now(),
                    &EpisodeMetadata {
                        label: None,
                        parent_episode: Some(second.0),
                        retracts: Vec::new(),
                    },
                )
                .expect("third");
        }

        let mut reopened = open_fresh(&dir);
        let _ = (first, second, third);
        let got = reopened
            .pipeline_mut()
            .execute_query("(query :episode_chain @__ep_2)")
            .expect("query");
        assert_eq!(
            got.records.len(),
            3,
            "episode_chain over three linked Episodes returns all three memories"
        );
    }
914
    // Labels over MAX_LABEL_BYTES are rejected before anything is written.
    #[test]
    fn label_exceeding_cap_rejects() {
        let dir = TempDir::new().expect("tmp");
        let mut store = open_fresh(&dir);
        let bad_label = "x".repeat(EpisodeMetadata::MAX_LABEL_BYTES + 1);
        let err = store
            .commit_batch_with_metadata(
                SEM_OK,
                fixed_now(),
                &EpisodeMetadata {
                    label: Some(bad_label),
                    parent_episode: None,
                    retracts: Vec::new(),
                },
            )
            .expect_err("label too long");
        assert!(matches!(
            err,
            StoreError::InvalidEpisodeMetadata { reason } if reason.contains("256")
        ));
    }

    // An in-batch (episode :start :label …) directive produces the
    // EpisodeMeta record without caller-side metadata.
    #[test]
    fn episode_start_form_writes_episode_meta_end_to_end() {
        let dir = TempDir::new().expect("tmp");
        let mut store = open_fresh(&dir);
        let input = r#"(episode :start :label "design-session")
            (sem @alice @knows @bob :src @observation :c 0.8 :v 2024-01-15)"#;
        store.commit_batch(input, fixed_now()).expect("commit");

        let bytes = store.log.read_all().expect("read");
        let records = decode_all(&bytes).expect("decode");
        let meta = records
            .iter()
            .find_map(|r| match r {
                CanonicalRecord::EpisodeMeta(m) => Some(m),
                _ => None,
            })
            .expect("EpisodeMeta present");
        assert_eq!(meta.label.as_deref(), Some("design-session"));
    }

    // (episode :close) alone is accepted and writes no EpisodeMeta.
    #[test]
    fn episode_close_form_is_accepted_no_op() {
        let dir = TempDir::new().expect("tmp");
        let mut store = open_fresh(&dir);
        let input = "(sem @alice @knows @bob :src @observation :c 0.8 :v 2024-01-15)
            (episode :close)";
        store.commit_batch(input, fixed_now()).expect("commit");

        let bytes = store.log.read_all().expect("read");
        let records = decode_all(&bytes).expect("decode");
        let meta_count = records
            .iter()
            .filter(|r| matches!(r, CanonicalRecord::EpisodeMeta(_)))
            .count();
        assert_eq!(
            meta_count, 0,
            ":close alone carries no metadata; no EpisodeMeta record"
        );
    }
980
    // :parent_episode in the episode directive links the chain.
    #[test]
    fn episode_start_with_parent_links_chain() {
        let dir = TempDir::new().expect("tmp");
        let mut store = open_fresh(&dir);
        let first = store
            .commit_batch(
                r#"(episode :start :label "parent")
                (sem @alice @knows @bob :src @observation :c 0.8 :v 2024-01-15)"#,
                fixed_now(),
            )
            .expect("parent");
        let second_input = "(episode :start :parent_episode @__ep_0)\n\
            (sem @alice @likes @carol :src @observation :c 0.7 :v 2024-01-16)";
        store
            .commit_batch(second_input, fixed_now())
            .expect("child");

        let got = store
            .pipeline_mut()
            .execute_query("(query :episode_chain @__ep_1)")
            .expect("query");
        assert_eq!(
            got.records.len(),
            2,
            "chain walk returns both linked Episodes (got {first:?})"
        );
    }

    // :retracts in the episode directive is persisted in EpisodeMeta.
    #[test]
    fn episode_start_with_retracts_records_metadata() {
        let dir = TempDir::new().expect("tmp");
        let mut store = open_fresh(&dir);
        let _bad = store
            .commit_batch(SEM_OK, fixed_now())
            .expect("bad episode");
        let input = r#"(episode :start :label "correction" :retracts (@__ep_0))
            (sem @alice @knows @charlie :src @observation :c 0.95 :v 2024-01-16)"#;
        store.commit_batch(input, fixed_now()).expect("correction");

        let bytes = store.log.read_all().expect("read");
        let records = decode_all(&bytes).expect("decode");
        let meta = records
            .iter()
            .find_map(|r| match r {
                CanonicalRecord::EpisodeMeta(m) => Some(m),
                _ => None,
            })
            .expect("EpisodeMeta present on the correction batch");
        assert_eq!(meta.retracts.len(), 1);
        assert_eq!(meta.label.as_deref(), Some("correction"));
    }

    // More than one (episode …) directive per batch is a semantic error.
    #[test]
    fn two_episode_directives_in_one_batch_reject() {
        let dir = TempDir::new().expect("tmp");
        let mut store = open_fresh(&dir);
        let input = r#"(episode :start :label "a")
            (episode :start :label "b")
            (sem @alice @knows @bob :src @observation :c 0.8 :v 2024-01-15)"#;
        let err = store
            .commit_batch(input, fixed_now())
            .expect_err("multiple episode directives must reject");
        assert!(matches!(
            err,
            StoreError::Pipeline(PipelineError::Semantic(
                crate::semantic::SemanticError::MultipleEpisodeDirectives { count: 2 }
            ))
        ));
    }
1058
    // Pinning a memory suspends decay and surfaces AgentPinned framing.
    #[test]
    fn pin_suspends_decay_and_flags_authoritative() {
        let dir = TempDir::new().expect("tmp");
        let mut store = open_fresh(&dir);
        let old_sem = "(sem @mira @saw @kilroy :src @observation :c 0.8 :v 2023-12-01)";
        let _ = store.commit_batch(old_sem, fixed_now()).expect("old sem");

        let before = store
            .pipeline_mut()
            .execute_query("(query)")
            .expect("before");
        assert!(
            before.flags.contains(ReadFlags::LOW_CONFIDENCE),
            "decayed stored 0.8 should be < 0.5 before pin"
        );

        store
            .commit_batch("(pin @__mem_0 :actor @mira)", fixed_now())
            .expect("pin");

        let after = store
            .pipeline_mut()
            .execute_query("(query :show_framing true)")
            .expect("after");
        assert!(
            !after.flags.contains(ReadFlags::LOW_CONFIDENCE),
            "pin must suspend decay"
        );
        assert_eq!(after.framings.len(), 1);
        assert_eq!(
            after.framings[0],
            Framing::Authoritative {
                set_by: FramingSource::AgentPinned
            }
        );
    }

    // Unpinning resumes decay, restoring the low-confidence flag.
    #[test]
    fn unpin_restores_decay() {
        let dir = TempDir::new().expect("tmp");
        let mut store = open_fresh(&dir);
        let old_sem = "(sem @mira @saw @kilroy :src @observation :c 0.8 :v 2023-12-01)";
        store.commit_batch(old_sem, fixed_now()).expect("old sem");
        store
            .commit_batch("(pin @__mem_0 :actor @mira)", fixed_now())
            .expect("pin");
        store
            .commit_batch("(unpin @__mem_0 :actor @mira)", fixed_now())
            .expect("unpin");

        let got = store
            .pipeline_mut()
            .execute_query("(query)")
            .expect("query");
        assert!(
            got.flags.contains(ReadFlags::LOW_CONFIDENCE),
            "unpin should restore decay"
        );
    }
1124
    // authoritative_set surfaces OperatorAuthoritative framing.
    #[test]
    fn authoritative_set_surfaces_operator_framing() {
        let dir = TempDir::new().expect("tmp");
        let mut store = open_fresh(&dir);
        let sem = "(sem @alice @knows @bob :src @observation :c 0.8 :v 2024-01-15)";
        store.commit_batch(sem, fixed_now()).expect("sem");
        store
            .commit_batch("(authoritative_set @__mem_0 :actor @operator)", fixed_now())
            .expect("auth-set");

        let got = store
            .pipeline_mut()
            .execute_query("(query :show_framing true)")
            .expect("query");
        assert_eq!(got.framings.len(), 1);
        assert_eq!(
            got.framings[0],
            Framing::Authoritative {
                set_by: FramingSource::OperatorAuthoritative
            }
        );
    }

    // authoritative_clear undoes the set, so decay applies again.
    #[test]
    fn authoritative_clear_resumes_decay() {
        let dir = TempDir::new().expect("tmp");
        let mut store = open_fresh(&dir);
        let old_sem = "(sem @mira @saw @kilroy :src @observation :c 0.8 :v 2023-12-01)";
        store.commit_batch(old_sem, fixed_now()).expect("sem");
        store
            .commit_batch("(authoritative_set @__mem_0 :actor @operator)", fixed_now())
            .expect("set");
        store
            .commit_batch(
                "(authoritative_clear @__mem_0 :actor @operator)",
                fixed_now(),
            )
            .expect("clear");

        let got = store
            .pipeline_mut()
            .execute_query("(query)")
            .expect("query");
        assert!(
            got.flags.contains(ReadFlags::LOW_CONFIDENCE),
            "clear should restore decay"
        );
    }

    // Pin state is replayed from the log on reopen.
    #[test]
    fn pin_replay_survives_reopen() {
        let dir = TempDir::new().expect("tmp");
        let old_sem = "(sem @mira @saw @kilroy :src @observation :c 0.8 :v 2023-12-01)";
        {
            let mut store = open_fresh(&dir);
            store.commit_batch(old_sem, fixed_now()).expect("sem");
            store
                .commit_batch("(pin @__mem_0 :actor @mira)", fixed_now())
                .expect("pin");
        }
        let mut reopened = open_fresh(&dir);
        let got = reopened
            .pipeline_mut()
            .execute_query("(query :show_framing true)")
            .expect("reopened query");
        assert_eq!(got.framings.len(), 1);
        assert_eq!(
            got.framings[0],
            Framing::Authoritative {
                set_by: FramingSource::AgentPinned
            }
        );
    }
1199
    // Two commits leave two SEMs and two CHECKPOINTs in the log.
    #[test]
    fn multiple_commits_accumulate_in_log() {
        let dir = TempDir::new().expect("tmp");
        let mut store = open_fresh(&dir);
        let _ = store.commit_batch(SEM_OK, fixed_now()).expect("first");
        let input2 = "(sem @alice @likes @carol :src @observation :c 0.7 :v 2024-01-16)";
        let _ = store.commit_batch(input2, fixed_now()).expect("second");

        let bytes = store.log.read_all().expect("read");
        let records = decode_all(&bytes).expect("decode");
        let checkpoints = records
            .iter()
            .filter(|r| matches!(r, CanonicalRecord::Checkpoint(_)))
            .count();
        assert_eq!(checkpoints, 2);
        let sems = records
            .iter()
            .filter(|r| matches!(r, CanonicalRecord::Sem(_)))
            .count();
        assert_eq!(sems, 2);
    }

    // Compile failures happen before any log write.
    #[test]
    fn pipeline_error_does_not_write_log() {
        let dir = TempDir::new().expect("tmp");
        let mut store = open_fresh(&dir);
        let err = store
            .commit_batch("(sem @a", fixed_now())
            .expect_err("malformed");
        assert!(matches!(err, StoreError::Pipeline(_)));
        assert_eq!(store.log.len(), 0);
    }

    // Each commit gets its own episode symbol.
    #[test]
    fn commits_assign_distinct_episode_ids() {
        let dir = TempDir::new().expect("tmp");
        let mut store = open_fresh(&dir);
        let a = store.commit_batch(SEM_OK, fixed_now()).expect("a");
        let input2 = "(sem @alice @likes @carol :src @observation :c 0.7 :v 2024-01-16)";
        let b = store.commit_batch(input2, fixed_now()).expect("b");
        assert_ne!(a.as_symbol(), b.as_symbol());
    }

    // Junk appended past the last CHECKPOINT is truncated on reopen.
    #[test]
    fn reopen_truncates_orphans_past_last_checkpoint() {
        let dir = TempDir::new().expect("tmp");
        let path = dir.path().join("canonical.log");
        let committed_len;
        {
            let mut store = Store::open(&path).expect("open");
            let _ = store.commit_batch(SEM_OK, fixed_now()).expect("commit");
            committed_len = store.log.len();
        }
        {
            // Simulate a torn write by appending raw garbage directly.
            let mut raw = CanonicalLog::open(&path).expect("reopen raw");
            raw.append(&[0x01, 0x42, 0xFF, 0xFF]).expect("append");
            raw.sync().expect("sync");
            assert!(raw.len() > committed_len);
        }
        let store = Store::open(&path).expect("reopen store");
        assert_eq!(store.log.len(), committed_len);
    }

    // Opening a brand-new log yields an empty store.
    #[test]
    fn reopen_on_empty_workspace_is_clean() {
        let dir = TempDir::new().expect("tmp");
        let store = Store::open(dir.path().join("canonical.log")).expect("open");
        assert_eq!(store.log_len(), 0);
    }
1274
    // A forced __ep_ name collision fails the commit and restores the
    // pipeline, episode counter, and log to their pre-commit state.
    #[test]
    fn episode_allocation_collision_restores_pipeline_state() {
        let dir = TempDir::new().expect("tmp");
        let mut store = open_fresh(&dir);
        let _ = store.commit_batch(SEM_OK, fixed_now()).expect("first");
        assert_eq!(store.next_episode_counter, 1);
        let snapshot = store.pipeline.clone();
        let log_len_after_first = store.log.len();

        // Force the next allocation to reuse the already-taken __ep_0.
        store.next_episode_counter = 0;

        let input2 = "(sem @alice @likes @carol :src @observation :c 0.7 :v 2024-01-16)";
        let err = store
            .commit_batch(input2, fixed_now())
            .expect_err("collision");
        assert!(matches!(err, StoreError::Pipeline(_)));

        assert_eq!(store.next_episode_counter, 0);
        assert_eq!(store.pipeline, snapshot);
        assert_eq!(store.log.len(), log_len_after_first);
    }

    // Symbol ids are stable across a reopen.
    #[test]
    fn reopen_restores_symbol_table_from_log() {
        let dir = TempDir::new().expect("tmp");
        let path = dir.path().join("canonical.log");
        let alice_id;
        {
            let mut store = Store::open(&path).expect("open");
            let _ = store.commit_batch(SEM_OK, fixed_now()).expect("commit");
            alice_id = store
                .pipeline
                .table()
                .lookup("alice")
                .expect("alice allocated");
        }
        let store = Store::open(&path).expect("reopen");
        assert_eq!(store.pipeline.table().lookup("alice"), Some(alice_id));
        assert!(store.pipeline.table().lookup("knows").is_some());
        assert!(store.pipeline.table().lookup("bob").is_some());
    }
1328
    // EPI batches restore their event/witness symbols and the episodic
    // records themselves on reopen.
    #[test]
    fn reopen_restores_table_from_epi_batch() {
        let dir = TempDir::new().expect("tmp");
        let path = dir.path().join("canonical.log");
        let evt_id;
        let alice_id;
        {
            let mut store = Store::open(&path).expect("open");
            let input = "(epi @evt_001 @rename (@old @new) @github \
                :at 2024-01-15T10:00:00Z :obs 2024-01-15T10:00:05Z \
                :src @alice :c 0.9)";
            let _ = store.commit_batch(input, fixed_now()).expect("commit");
            evt_id = store
                .pipeline
                .table()
                .lookup("evt_001")
                .expect("event id allocated");
            alice_id = store
                .pipeline
                .table()
                .lookup("alice")
                .expect("witness allocated");
        }
        let store = Store::open(&path).expect("reopen");
        assert_eq!(store.pipeline.table().lookup("evt_001"), Some(evt_id));
        assert_eq!(store.pipeline.table().lookup("alice"), Some(alice_id));
        assert!(store.pipeline.table().lookup("old").is_some());
        assert!(store.pipeline.table().lookup("new").is_some());
        assert!(store.pipeline.table().lookup("github").is_some());
        assert_eq!(store.pipeline.episodic_records().len(), 1);
        assert_eq!(store.pipeline.episodic_records()[0].event_id, evt_id);
        assert_eq!(store.pipeline.episodic_records()[0].source, alice_id);
    }

    // PRO batches restore their rule/scope/source symbols on reopen.
    #[test]
    fn reopen_restores_table_from_pro_batch() {
        let dir = TempDir::new().expect("tmp");
        let path = dir.path().join("canonical.log");
        let rule_id;
        {
            let mut store = Store::open(&path).expect("open");
            let input = r#"(pro @rule_1 "trigger text" "action text" :scp @mimir :src @agent_instruction :c 0.9)"#;
            let _ = store.commit_batch(input, fixed_now()).expect("commit");
            rule_id = store
                .pipeline
                .table()
                .lookup("rule_1")
                .expect("rule allocated");
        }
        let store = Store::open(&path).expect("reopen");
        assert_eq!(store.pipeline.table().lookup("rule_1"), Some(rule_id));
        assert!(store.pipeline.table().lookup("mimir").is_some());
        assert!(store.pipeline.table().lookup("agent_instruction").is_some());
    }
1383
1384 #[test]
1385 fn reopen_restores_table_from_inf_batch() {
1386 let dir = TempDir::new().expect("tmp");
1387 let path = dir.path().join("canonical.log");
1388 let method_id;
1389 {
1390 let mut store = Store::open(&path).expect("open");
1391 let input = "(inf @alice @friend_of @carol (@m0 @m1) @citation_link \
1392 :c 0.6 :v 2024-01-15)";
1393 let _ = store.commit_batch(input, fixed_now()).expect("commit");
1394 method_id = store
1395 .pipeline
1396 .table()
1397 .lookup("citation_link")
1398 .expect("method allocated");
1399 }
1400 let store = Store::open(&path).expect("reopen");
1401 assert_eq!(
1402 store.pipeline.table().lookup("citation_link"),
1403 Some(method_id)
1404 );
1405 for name in ["alice", "friend_of", "carol", "m0", "m1"] {
1406 assert!(
1407 store.pipeline.table().lookup(name).is_some(),
1408 "{name} lost on reopen"
1409 );
1410 }
1411 }
1412
1413 #[test]
1414 fn reopen_advances_memory_and_episode_counters() {
1415 let dir = TempDir::new().expect("tmp");
1416 let path = dir.path().join("canonical.log");
1417 {
1418 let mut store = Store::open(&path).expect("open");
1419 let _ = store.commit_batch(SEM_OK, fixed_now()).expect("commit");
1420 }
1421 let mut store = Store::open(&path).expect("reopen");
1422 assert_eq!(store.next_episode_counter, 1);
1423 let input2 = "(sem @alice @likes @carol :src @observation :c 0.7 :v 2024-01-16)";
1426 let _ = store.commit_batch(input2, fixed_now()).expect("second");
1427 }
1428
    #[test]
    fn checkpoint_and_episode_alloc_use_monotonic_clock_under_regressed_wall_clock() {
        // Commit once at a high wall-clock time, then again with a *regressed*
        // wall clock; every timestamp written to the log must still advance.
        let dir = TempDir::new().expect("tmp");
        let path = dir.path().join("canonical.log");
        let high = ClockTime::try_from_millis(2_000_000_000_000).expect("non-sentinel");
        let regressed = ClockTime::try_from_millis(1_800_000_000_000).expect("non-sentinel");
        {
            let mut store = Store::open(&path).expect("open");
            let _ = store.commit_batch(SEM_OK, high).expect("high");
            let _ = store
                .commit_batch(
                    "(sem @alice @likes @dan :src @observation :c 0.8 :v 2024-01-15)",
                    regressed,
                )
                .expect("regressed");
        }

        // Decode the raw log bytes (skipping the fixed-size file header) so
        // the on-disk timestamps can be inspected directly.
        let raw = std::fs::read(&path).expect("read log");
        let header_size = usize::try_from(crate::log::LOG_HEADER_SIZE).expect("header fits");
        let bytes = &raw[header_size..];
        let records = decode_all(bytes).expect("decode");

        // The second episode's symbol alloc (__ep_1) must be stamped one
        // millisecond past the first commit's time, not with the regressed
        // wall clock.
        let ep1_alloc = records
            .iter()
            .find(|r| matches!(r, CanonicalRecord::SymbolAlloc(ev) if ev.name == "__ep_1"))
            .expect("__ep_1 alloc present");
        let CanonicalRecord::SymbolAlloc(ep1) = ep1_alloc else {
            unreachable!();
        };
        let expected = ClockTime::try_from_millis(high.as_millis() + 1).expect("non-sentinel");
        assert_eq!(ep1.at, expected, "__ep_1 alloc must use monotonic clock");

        // The same guarantee must hold for the checkpoint written by the
        // second batch.
        let checkpoints: Vec<_> = records
            .iter()
            .filter_map(|r| match r {
                CanonicalRecord::Checkpoint(c) => Some(c),
                _ => None,
            })
            .collect();
        assert_eq!(checkpoints.len(), 2, "two batches → two checkpoints");
        assert_eq!(
            checkpoints[1].at, expected,
            "second checkpoint.at must use monotonic clock, not regressed wall clock"
        );
    }
1492
1493 #[test]
1494 fn reopen_restores_monotonic_commit_watermark() {
1495 let dir = TempDir::new().expect("tmp");
1502 let path = dir.path().join("canonical.log");
1503 let high = ClockTime::try_from_millis(2_000_000_000_000).expect("non-sentinel");
1504 {
1505 let mut store = Store::open(&path).expect("open");
1506 let _ = store.commit_batch(SEM_OK, high).expect("commit at high");
1507 assert_eq!(store.pipeline.last_committed_at(), Some(high));
1508 }
1509 let mut store = Store::open(&path).expect("reopen");
1511 assert_eq!(store.pipeline.last_committed_at(), Some(high));
1512
1513 let low = ClockTime::try_from_millis(1_800_000_000_000).expect("non-sentinel");
1517 let _ = store
1518 .commit_batch(
1519 "(sem @alice @likes @dan :src @observation :c 0.8 :v 2024-01-15)",
1520 low,
1521 )
1522 .expect("regressed commit");
1523 let watermark = store
1524 .pipeline
1525 .last_committed_at()
1526 .expect("watermark set after commit");
1527 assert_eq!(watermark.as_millis(), high.as_millis() + 1);
1528 }
1529
1530 #[test]
1531 fn reopen_replays_rename_and_retire() {
1532 let dir = TempDir::new().expect("tmp");
1533 let path = dir.path().join("canonical.log");
1534 {
1535 let mut store = Store::open(&path).expect("open");
1536 let _ = store.commit_batch(SEM_OK, fixed_now()).expect("first");
1537 let _ = store
1538 .commit_batch("(rename @alice @alice_v2)", fixed_now())
1539 .expect("rename");
1540 let _ = store
1541 .commit_batch("(retire @bob)", fixed_now())
1542 .expect("retire");
1543 }
1544 let store = Store::open(&path).expect("reopen");
1545 let alice_id = store
1546 .pipeline
1547 .table()
1548 .lookup("alice_v2")
1549 .expect("canonical rotated");
1550 assert_eq!(
1551 store
1552 .pipeline
1553 .table()
1554 .entry(alice_id)
1555 .expect("entry")
1556 .canonical_name,
1557 "alice_v2"
1558 );
1559 let bob_id = store.pipeline.table().lookup("bob").expect("bob");
1560 assert!(store.pipeline.table().is_retired(bob_id));
1561 }
1562
1563 #[test]
1568 fn row_3_orphan_memory_record_without_checkpoint_truncated_on_reopen() {
1569 let dir = TempDir::new().expect("tmp");
1574 let path = dir.path().join("canonical.log");
1575 let committed_len;
1576 {
1577 let mut store = Store::open(&path).expect("open");
1578 let _ = store.commit_batch(SEM_OK, fixed_now()).expect("commit");
1579 committed_len = store.log_len();
1580 }
1581 {
1584 let mut raw = CanonicalLog::open(&path).expect("raw");
1585 let fake_alloc = CanonicalRecord::SymbolAlloc(SymbolEventRecord {
1586 symbol_id: SymbolId::new(999),
1587 name: "orphan_symbol".into(),
1588 symbol_kind: SymbolKind::Literal,
1589 at: fixed_now(),
1590 });
1591 let mut buf = Vec::new();
1592 encode_record(&fake_alloc, &mut buf);
1593 raw.append(&buf).expect("append");
1594 raw.sync().expect("sync");
1595 assert!(raw.len() > committed_len);
1596 }
1597 let store = Store::open(&path).expect("reopen");
1598 assert_eq!(store.log_len(), committed_len);
1599 assert!(store.pipeline.table().lookup("orphan_symbol").is_none());
1601 }
1602
1603 #[test]
1604 fn row_6_append_failure_rolls_back_pipeline_and_log() {
1605 let mut store = Store::from_backend(FaultyLog::new()).expect("open");
1608 let pre_commit_pipeline = store.pipeline.clone();
1609 store
1610 .log
1611 .arm_append_failure(std::io::ErrorKind::StorageFull);
1612
1613 let err = store
1614 .commit_batch(SEM_OK, fixed_now())
1615 .expect_err("append failure");
1616 assert!(matches!(err, StoreError::Log(_)));
1617 assert_eq!(store.log.len(), 0);
1619 assert_eq!(store.pipeline, pre_commit_pipeline);
1620 assert_eq!(store.next_episode_counter, 0);
1621 }
1622
1623 #[test]
1624 fn row_7_sync_failure_rolls_back_pipeline_and_log() {
1625 let mut store = Store::from_backend(FaultyLog::new()).expect("open");
1629 let pre_commit_pipeline = store.pipeline.clone();
1630 store.log.arm_sync_failure(std::io::ErrorKind::Other);
1631
1632 let err = store
1633 .commit_batch(SEM_OK, fixed_now())
1634 .expect_err("sync failure");
1635 assert!(matches!(err, StoreError::Log(_)));
1636 assert_eq!(store.log.len(), 0);
1638 assert_eq!(store.pipeline, pre_commit_pipeline);
1639 assert_eq!(store.next_episode_counter, 0);
1640 }
1641
1642 #[test]
1643 fn rollback_truncate_failure_still_surfaces_an_error() {
1644 let mut store = Store::from_backend(FaultyLog::new()).expect("open");
1652 let _ = store.commit_batch(SEM_OK, fixed_now()).expect("first");
1653 let pre_second_pipeline = store.pipeline.clone();
1654 let len_after_first = store.log.len();
1655
1656 store.log.arm_sync_failure(std::io::ErrorKind::Other);
1657 store
1658 .log
1659 .arm_truncate_failure(std::io::ErrorKind::PermissionDenied);
1660
1661 let err = store
1662 .commit_batch(SEM_OK_2, fixed_now())
1663 .expect_err("compound failure");
1664 assert!(matches!(err, StoreError::Log(_)));
1665 assert!(store.log.len() >= len_after_first);
1669 assert_eq!(store.pipeline, pre_second_pipeline);
1673 assert_eq!(store.next_episode_counter, 1);
1674 }
1675
1676 #[test]
1677 fn rollback_preserves_earlier_committed_bytes() {
1678 let mut store = Store::from_backend(FaultyLog::new()).expect("open");
1682 let _ = store.commit_batch(SEM_OK, fixed_now()).expect("first");
1683 let len_after_first = store.log.len();
1684 assert!(len_after_first > 0);
1685
1686 store.log.arm_sync_failure(std::io::ErrorKind::Other);
1687 let err = store
1688 .commit_batch(SEM_OK_2, fixed_now())
1689 .expect_err("sync failure");
1690 assert!(matches!(err, StoreError::Log(_)));
1691 assert_eq!(store.log.len(), len_after_first);
1692 }
1693
1694 #[test]
1695 fn orphan_truncation_is_idempotent() {
1696 let dir = TempDir::new().expect("tmp");
1700 let path = dir.path().join("canonical.log");
1701 {
1702 let mut store = Store::open(&path).expect("open");
1703 let _ = store.commit_batch(SEM_OK, fixed_now()).expect("first");
1704 let _ = store.commit_batch(SEM_OK_2, fixed_now()).expect("second");
1705 }
1706 {
1708 let mut raw = CanonicalLog::open(&path).expect("raw");
1709 raw.append(&[0x01_u8]).expect("append partial frame");
1710 raw.sync().expect("sync");
1711 }
1712 let len_after_first_recovery = {
1714 let store = Store::open(&path).expect("recover once");
1715 store.log_len()
1716 };
1717 let store = Store::open(&path).expect("recover twice");
1719 assert_eq!(store.log_len(), len_after_first_recovery);
1720 }
1721
1722 #[test]
1723 fn reopen_rejects_corrupt_tail_after_last_checkpoint() {
1724 let dir = TempDir::new().expect("tmp");
1725 let path = dir.path().join("canonical.log");
1726 let committed_len;
1727 {
1728 let mut store = Store::open(&path).expect("open");
1729 let _ = store.commit_batch(SEM_OK, fixed_now()).expect("commit");
1730 committed_len = store.log_len();
1731 }
1732
1733 {
1734 let mut raw = CanonicalLog::open(&path).expect("raw");
1735 raw.append(&[0x05_u8; 7]).expect("append corrupt tail");
1736 raw.sync().expect("sync");
1737 assert!(raw.len() > committed_len);
1738 }
1739
1740 let Err(err) = Store::open(&path) else {
1741 panic!("corrupt tail must not truncate");
1742 };
1743 assert!(
1744 matches!(err, StoreError::CorruptTail { .. }),
1745 "expected corrupt-tail error, got {err:?}"
1746 );
1747
1748 let raw = CanonicalLog::open(&path).expect("raw reopen");
1749 assert!(
1750 raw.len() > committed_len,
1751 "corrupt tail must be preserved for inspection"
1752 );
1753 }
1754
1755 #[test]
1756 fn reopen_rejects_corrupt_log_without_checkpoint() {
1757 let dir = TempDir::new().expect("tmp");
1758 let path = dir.path().join("canonical.log");
1759 {
1760 let mut raw = CanonicalLog::open(&path).expect("raw");
1761 raw.append(&[0x05_u8]).expect("append corrupt log");
1762 raw.sync().expect("sync");
1763 }
1764
1765 let Err(err) = Store::open(&path) else {
1766 panic!("corrupt checkpoint-free log must not truncate to empty");
1767 };
1768 assert!(
1769 matches!(
1770 err,
1771 StoreError::CorruptTail {
1772 offset: 0,
1773 source: DecodeError::UnknownOpcode { .. }
1774 }
1775 ),
1776 "expected corrupt-tail unknown-opcode error, got {err:?}"
1777 );
1778
1779 let raw = CanonicalLog::open(&path).expect("raw reopen");
1780 assert_eq!(raw.len(), 1, "corrupt bytes must be preserved");
1781 }
1782
1783 #[test]
1784 fn symbol_table_replay_reproduces_pre_crash_state() {
1785 let dir = TempDir::new().expect("tmp");
1793 let path = dir.path().join("canonical.log");
1794 let table_before;
1795 let counter_before;
1796 let memory_counter_before;
1797 {
1798 let mut store = Store::open(&path).expect("open");
1799 let _ = store.commit_batch(SEM_OK, fixed_now()).expect("commit");
1800 let _ = store
1801 .commit_batch("(rename @alice @alice_v2)", fixed_now())
1802 .expect("rename");
1803 let _ = store
1804 .commit_batch("(retire @bob)", fixed_now())
1805 .expect("retire");
1806 table_before = store.pipeline.table().clone();
1807 counter_before = store.next_episode_counter;
1808 memory_counter_before = store.pipeline.next_memory_counter();
1809 }
1810 let store = Store::open(&path).expect("reopen");
1811 assert_eq!(store.pipeline.table(), &table_before);
1812 assert_eq!(store.next_episode_counter, counter_before);
1813 assert_eq!(store.pipeline.next_memory_counter(), memory_counter_before);
1817 }
1818
1819 #[test]
1820 fn checkpoint_is_atomic_commit_boundary() {
1821 let dir = TempDir::new().expect("tmp");
1827 let path = dir.path().join("canonical.log");
1828 let len_after_first;
1829 {
1830 let mut store = Store::open(&path).expect("open");
1831 let _ = store.commit_batch(SEM_OK, fixed_now()).expect("first");
1832 len_after_first = store.log_len();
1833 let _ = store.commit_batch(SEM_OK_2, fixed_now()).expect("second");
1834 }
1835 {
1838 let mut raw = CanonicalLog::open(&path).expect("raw");
1839 raw.truncate(len_after_first).expect("truncate");
1840 }
1841 let store = Store::open(&path).expect("reopen");
1842 assert!(store.pipeline.table().lookup("carol").is_none());
1844 assert!(store.pipeline.table().lookup("alice").is_some());
1846 }
1847
1848 #[test]
1851 fn open_in_workspace_creates_partitioned_directory() {
1852 use crate::WorkspaceId;
1856 let data_root = TempDir::new().expect("tmp");
1857 let ws_a = WorkspaceId::from_git_remote("https://github.com/foo/mimir").unwrap();
1858 let ws_b = WorkspaceId::from_git_remote("https://github.com/bar/mimir").unwrap();
1859 assert_ne!(ws_a, ws_b);
1860
1861 {
1862 let mut store_a = Store::open_in_workspace(data_root.path(), ws_a).expect("open ws a");
1863 let _ = store_a.commit_batch(SEM_OK, fixed_now()).expect("commit a");
1864 }
1865 {
1866 let mut store_b = Store::open_in_workspace(data_root.path(), ws_b).expect("open ws b");
1867 assert!(store_b.pipeline.table().lookup("alice").is_none());
1870 let _ = store_b.commit_batch(SEM_OK, fixed_now()).expect("commit b");
1871 }
1872 let store_a_again = Store::open_in_workspace(data_root.path(), ws_a).expect("reopen ws a");
1874 assert!(store_a_again.pipeline.table().lookup("alice").is_some());
1875 }
1876
    #[test]
    fn reopen_restores_procedural_supersession_index() {
        // Seed one procedural rule, reopen, then compile a second version of
        // the same @rule_route id: the restored index must auto-supersede
        // the seeded version.
        let dir = TempDir::new().expect("tmp");
        let path = dir.path().join("canonical.log");
        let pro_seed = r#"(pro @rule_route "agent_write" "route_via_librarian"
            :scp @mimir :src @policy :c 1.0)"#;
        {
            let mut store = Store::open(&path).expect("open");
            let _ = store.commit_batch(pro_seed, fixed_now()).expect("seed");
        }
        let mut store = Store::open(&path).expect("reopen");
        let records = store
            .pipeline
            .compile_batch(
                r#"(pro @rule_route "other_trigger" "other_action"
                :scp @other_scope :src @policy :c 0.9)"#,
                fixed_now(),
            )
            .expect("post-reopen compile");
        // Exactly one supersession edge: the new rule body over the seed.
        let edges: Vec<_> = records
            .iter()
            .filter(|r| matches!(r, CanonicalRecord::Supersedes(_)))
            .collect();
        assert_eq!(
            edges.len(),
            1,
            "post-reopen same-rule_id write must auto-supersede"
        );
    }
1912
1913 #[test]
1914 fn reopen_restores_supersession_index_so_post_reopen_auto_supersedes() {
1915 let dir = TempDir::new().expect("tmp");
1921 let path = dir.path().join("canonical.log");
1922 {
1923 let mut store = Store::open(&path).expect("open");
1924 let _ = store.commit_batch(SEM_OK, fixed_now()).expect("seed");
1925 }
1926 let mut store = Store::open(&path).expect("reopen");
1928 let records = store
1929 .pipeline
1930 .compile_batch(
1931 "(sem @alice @knows @mallory :src @observation :c 0.8 :v 2024-03-01)",
1932 fixed_now(),
1933 )
1934 .expect("post-reopen compile");
1935 let edges: Vec<_> = records
1936 .iter()
1937 .filter(|r| matches!(r, CanonicalRecord::Supersedes(_)))
1938 .collect();
1939 assert_eq!(
1940 edges.len(),
1941 1,
1942 "post-reopen forward write must auto-supersede the pre-reopen memory"
1943 );
1944 }
1945
1946 #[test]
1947 fn reopen_on_fully_committed_log_preserves_length() {
1948 let dir = TempDir::new().expect("tmp");
1949 let path = dir.path().join("canonical.log");
1950 let committed_len;
1951 {
1952 let mut store = Store::open(&path).expect("open");
1953 let _ = store.commit_batch(SEM_OK, fixed_now()).expect("commit");
1954 committed_len = store.log.len();
1955 }
1956 let store = Store::open(&path).expect("reopen");
1957 assert_eq!(store.log.len(), committed_len);
1958 }
1959
1960 fn fabricate_committed_segment<L: LogBackend>(log: &mut L, records: &[CanonicalRecord]) {
1963 let mut buf = Vec::new();
1964 for r in records {
1965 encode_record(r, &mut buf);
1966 }
1967 log.append(&buf).expect("append");
1968 log.sync().expect("sync");
1969 }
1970
    #[test]
    fn reopen_replays_supersession_edges_into_dag() {
        use crate::canonical::{CheckpointRecord, EdgeRecord};
        use crate::dag::EdgeKind;

        // Hand-fabricate a committed segment — four symbol allocs, two edge
        // records, one checkpoint — so replay is the only code path under
        // test.
        let mut log = FaultyLog::new();
        let ep0 = SymbolId::new(100);
        let m1 = SymbolId::new(101);
        let m2 = SymbolId::new(102);
        let m3 = SymbolId::new(103);
        let ts = fixed_now();

        let records = vec![
            CanonicalRecord::SymbolAlloc(SymbolEventRecord {
                symbol_id: ep0,
                name: "__ep_0".into(),
                symbol_kind: SymbolKind::Memory,
                at: ts,
            }),
            CanonicalRecord::SymbolAlloc(SymbolEventRecord {
                symbol_id: m1,
                name: "__mem_0".into(),
                symbol_kind: SymbolKind::Memory,
                at: ts,
            }),
            CanonicalRecord::SymbolAlloc(SymbolEventRecord {
                symbol_id: m2,
                name: "__mem_1".into(),
                symbol_kind: SymbolKind::Memory,
                at: ts,
            }),
            CanonicalRecord::SymbolAlloc(SymbolEventRecord {
                symbol_id: m3,
                name: "__mem_2".into(),
                symbol_kind: SymbolKind::Memory,
                at: ts,
            }),
            // One edge of each kind (direction as defined by EdgeRecord).
            CanonicalRecord::Supersedes(EdgeRecord {
                from: m1,
                to: m2,
                at: ts,
            }),
            CanonicalRecord::Corrects(EdgeRecord {
                from: m2,
                to: m3,
                at: ts,
            }),
            // The trailing checkpoint makes the whole segment committed.
            CanonicalRecord::Checkpoint(CheckpointRecord {
                episode_id: ep0,
                at: ts,
                memory_count: 0,
            }),
        ];
        fabricate_committed_segment(&mut log, &records);

        // Replay must land both edges in the DAG, in log order, preserving
        // endpoints and edge kinds.
        let store = Store::from_backend(log).expect("open");
        assert_eq!(store.pipeline.dag().len(), 2);
        let edges: Vec<_> = store.pipeline.dag().edges().to_vec();
        assert_eq!(edges[0].kind, EdgeKind::Supersedes);
        assert_eq!(edges[0].from, m1);
        assert_eq!(edges[0].to, m2);
        assert_eq!(edges[1].kind, EdgeKind::Corrects);
    }
2042
    #[test]
    fn reopen_surfaces_dag_replay_error_on_cyclic_edges() {
        use crate::canonical::{CheckpointRecord, EdgeRecord};

        // Fabricate a committed segment whose two supersession edges form a
        // cycle (m1 -> m2 -> m1); replay must reject it instead of building
        // a cyclic "DAG".
        let mut log = FaultyLog::new();
        let ep0 = SymbolId::new(200);
        let m1 = SymbolId::new(201);
        let m2 = SymbolId::new(202);
        let ts = fixed_now();

        let records = vec![
            CanonicalRecord::SymbolAlloc(SymbolEventRecord {
                symbol_id: ep0,
                name: "__ep_0".into(),
                symbol_kind: SymbolKind::Memory,
                at: ts,
            }),
            CanonicalRecord::Supersedes(EdgeRecord {
                from: m1,
                to: m2,
                at: ts,
            }),
            CanonicalRecord::Supersedes(EdgeRecord {
                from: m2,
                to: m1,
                at: ts,
            }),
            // Checkpoint so the segment is treated as fully committed.
            CanonicalRecord::Checkpoint(CheckpointRecord {
                episode_id: ep0,
                at: ts,
                memory_count: 0,
            }),
        ];
        fabricate_committed_segment(&mut log, &records);

        let Err(err) = Store::from_backend(log) else {
            panic!("cyclic edges must not replay cleanly");
        };
        assert!(
            matches!(err, StoreError::DagReplay { .. }),
            "expected DagReplay, got {err:?}"
        );
    }
2089}