1use crate::error::{Error, Result};
10use crate::session::SessionEntry;
11use serde::{Deserialize, Serialize};
12use serde_json::Value;
13use serde_json::value::RawValue;
14use sha2::{Digest, Sha256};
15use std::borrow::Cow;
16use std::collections::BTreeSet;
17use std::fs::{self, File, OpenOptions};
18use std::io::{BufRead, BufReader, Read, Seek, SeekFrom, Write};
19use std::path::{Path, PathBuf};
20
/// Build an `OpenOptions` that creates files readable only by the owner.
///
/// On Unix the create mode is forced to `0o600` so session data is never
/// group/world readable; on other targets this is a plain `OpenOptions`.
fn secure_open_options() -> OpenOptions {
    #[cfg_attr(not(unix), allow(unused_mut))]
    let mut options = OpenOptions::new();
    #[cfg(unix)]
    {
        use std::os::unix::fs::OpenOptionsExt;
        options.mode(0o600);
    }
    options
}
31
/// Schema tag stamped on every segment frame line.
pub const SEGMENT_FRAME_SCHEMA: &str = "pi.session_store_v2.segment_frame.v1";
/// Schema tag for rows of the offset index (`index/offsets.jsonl`).
pub const OFFSET_INDEX_SCHEMA: &str = "pi.session_store_v2.offset_index.v1";
/// Schema tag for checkpoint documents under `checkpoints/`.
pub const CHECKPOINT_SCHEMA: &str = "pi.session_store_v2.checkpoint.v1";
/// Schema tag for the store manifest (`manifest.json`).
pub const MANIFEST_SCHEMA: &str = "pi.session_store_v2.manifest.v1";
/// Schema tag for events appended to the migration ledger.
pub const MIGRATION_EVENT_SCHEMA: &str = "pi.session_store_v2.migration_event.v1";

// Upper bound on a single frame line read during index rebuild (100 MiB);
// guards against unbounded memory use on a corrupted segment.
const MAX_FRAME_READ_BYTES: u64 = 100 * 1024 * 1024;

// Hash-chain seed: 64 zero hex characters (the width of a SHA-256 digest).
const GENESIS_CHAIN_HASH: &str = "0000000000000000000000000000000000000000000000000000000000000000";
43
/// One JSONL line in a segment file: an envelope around a raw JSON payload
/// plus addressing (segment/frame/entry sequence) and integrity metadata.
///
/// Field order is part of the serialized format — do not reorder.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct SegmentFrame {
    // Always `SEGMENT_FRAME_SCHEMA` for frames written by this module.
    pub schema: Cow<'static, str>,
    // Segment file this frame belongs to (1-based).
    pub segment_seq: u64,
    // Position within the segment (1-based; resets on segment rollover).
    pub frame_seq: u64,
    // Globally monotonic entry sequence across all segments (1-based).
    pub entry_seq: u64,
    // Caller-supplied identifier for the entry.
    pub entry_id: String,
    // Id of the parent entry when this entry extends a chain; omitted when None.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub parent_entry_id: Option<String>,
    // Caller-supplied kind, e.g. "message" or "compaction" (counted in the manifest).
    pub entry_type: String,
    // RFC 3339 UTC timestamp (millisecond precision) set at frame creation.
    pub timestamp: String,
    // SHA-256 of the payload as computed by `payload_hash_and_size`.
    pub payload_sha256: String,
    // Payload size in bytes, from the same helper.
    pub payload_bytes: u64,
    // The payload kept as raw JSON so re-serialization is byte-stable.
    pub payload: Box<RawValue>,
}
60
61impl SegmentFrame {
62 fn new(
63 segment_seq: u64,
64 frame_seq: u64,
65 entry_seq: u64,
66 entry_id: String,
67 parent_entry_id: Option<String>,
68 entry_type: String,
69 payload: Box<RawValue>,
70 ) -> Result<Self> {
71 let (payload_sha256, payload_bytes) = payload_hash_and_size(&payload)?;
72 Ok(Self {
73 schema: Cow::Borrowed(SEGMENT_FRAME_SCHEMA),
74 segment_seq,
75 frame_seq,
76 entry_seq,
77 entry_id,
78 parent_entry_id,
79 entry_type,
80 timestamp: chrono::Utc::now().to_rfc3339_opts(chrono::SecondsFormat::Millis, true),
81 payload_sha256,
82 payload_bytes,
83 payload,
84 })
85 }
86}
87
/// One row of `index/offsets.jsonl`: locates and checksums a single frame
/// line inside a segment file. Field order is part of the serialized format.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct OffsetIndexEntry {
    // Always `OFFSET_INDEX_SCHEMA` for rows written by this module.
    pub schema: Cow<'static, str>,
    // Entry sequence of the frame this row points at.
    pub entry_seq: u64,
    // Entry id of that frame (must match the frame's own id).
    pub entry_id: String,
    // Segment file containing the frame.
    pub segment_seq: u64,
    // Frame position within that segment.
    pub frame_seq: u64,
    // Byte offset of the line's start within the segment file.
    pub byte_offset: u64,
    // Length of the line in bytes (including its trailing newline).
    pub byte_length: u64,
    // CRC32C of the written line, upper-case hex (see `crc32c_upper`).
    pub crc32c: String,
    // Row state; `append_entry` always writes "active".
    pub state: Cow<'static, str>,
}
101
/// The store's current head position: the most recently appended entry.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
#[serde(rename_all = "camelCase")]
pub struct StoreHead {
    // Segment currently being appended to.
    pub segment_seq: u64,
    // Sequence of the last appended entry.
    pub entry_seq: u64,
    // Id of the last appended entry.
    pub entry_id: String,
}
110
/// A durable snapshot of the store head, written under `checkpoints/` and
/// used by `rollback_to_checkpoint` to restore a prior state.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct Checkpoint {
    // Always `CHECKPOINT_SCHEMA`.
    pub schema: String,
    // Caller-assigned checkpoint number; also determines the file name.
    pub checkpoint_seq: u64,
    // RFC 3339 UTC creation timestamp (millisecond precision).
    pub at: String,
    // Head entry sequence at checkpoint time.
    pub head_entry_seq: u64,
    // Head entry id at checkpoint time.
    pub head_entry_id: String,
    // Relative path of this checkpoint file, e.g. "checkpoints/<seq>.json".
    pub snapshot_ref: String,
    // Always written as 0 by `create_checkpoint`; compaction marker.
    pub compacted_before_entry_seq: u64,
    // Hash-chain value at checkpoint time; verified on rollback.
    pub chain_hash: String,
    // Free-form reason supplied by the caller.
    pub reason: String,
}
125
/// Top-level store summary written atomically to `manifest.json` by
/// `write_manifest`.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct Manifest {
    // Always `MANIFEST_SCHEMA`.
    pub schema: String,
    // On-disk layout version; written as 2.
    pub store_version: u8,
    // Session this store belongs to (caller-supplied).
    pub session_id: String,
    // Format the session originated from (caller-supplied).
    pub source_format: String,
    // Creation time; preserved from an existing manifest on rewrite.
    pub created_at: String,
    // Time of the most recent manifest write.
    pub updated_at: String,
    // Current head position (zeroed when the store is empty).
    pub head: StoreHead,
    pub counters: ManifestCounters,
    pub files: ManifestFiles,
    pub integrity: ManifestIntegrity,
    pub invariants: ManifestInvariants,
}
141
/// Aggregate counts computed from a full index/frame walk in `write_manifest`.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct ManifestCounters {
    // Number of rows in the offset index.
    pub entries_total: u64,
    // Frames whose entry_type == "message".
    pub messages_total: u64,
    // Parents referenced by more than one child (branch points).
    pub branches_total: u64,
    // Frames whose entry_type == "compaction".
    pub compactions_total: u64,
    // Total bytes appended across all segments.
    pub bytes_total: u64,
}
151
/// Store-relative file layout recorded in the manifest.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct ManifestFiles {
    // Directory holding segment files ("segments/").
    pub segment_dir: String,
    // Number of distinct segments referenced by the index.
    pub segment_count: u64,
    // Offset index path ("index/offsets.jsonl").
    pub index_path: String,
    // Checkpoint directory ("checkpoints/").
    pub checkpoint_dir: String,
    // Migration ledger path ("migrations/ledger.jsonl").
    pub migration_ledger_path: String,
}
161
/// Integrity digests recorded in the manifest.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct ManifestIntegrity {
    // Hash-chain value over all appended payload SHA-256 digests.
    pub chain_hash: String,
    // Hash of the manifest itself; computed with this field empty, then
    // filled in (see `write_manifest` / `manifest_hash_hex`).
    pub manifest_hash: String,
    // CRC32C of the most recently appended segment line.
    pub last_crc32c: String,
}
169
/// Results of the invariant checks `write_manifest` performs; each flag is
/// true when the corresponding property held at write time.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
#[allow(clippy::struct_excessive_bools)] pub struct ManifestInvariants {
    // Every parent_entry_id appeared earlier in index order.
    pub parent_links_closed: bool,
    // entry_seq strictly increasing across index rows.
    pub monotonic_entry_seq: bool,
    // segment_seq non-decreasing across index rows.
    pub monotonic_segment_seq: bool,
    // `validate_integrity` succeeded.
    pub index_within_segment_bounds: bool,
    // Always written as true by `write_manifest` (not currently checked).
    pub branch_heads_indexed: bool,
    // Always written as true by `write_manifest` (not currently checked).
    pub checkpoints_monotonic: bool,
    // Recomputed hash chain matched the in-memory chain.
    pub hash_chain_valid: bool,
}
182
/// Post-rollback verification results recorded on a `MigrationEvent`.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
#[serde(rename_all = "camelCase")]
pub struct MigrationVerification {
    // Entry count after rollback equals the checkpoint's head_entry_seq.
    pub entry_count_match: bool,
    // Rebuilt hash chain equals the checkpoint's chain_hash.
    pub hash_chain_match: bool,
    // `validate_integrity` succeeded after rollback.
    pub index_consistent: bool,
}
190
/// One line of the migration ledger (`migrations/ledger.jsonl`), recording a
/// migration or rollback attempt and its outcome.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
#[serde(rename_all = "camelCase")]
pub struct MigrationEvent {
    // `MIGRATION_EVENT_SCHEMA`; filled in by `append_migration_event` if empty.
    pub schema: String,
    // Caller-assigned identifier for the migration run.
    pub migration_id: String,
    // Phase name, e.g. "rollback".
    pub phase: String,
    // RFC 3339 UTC timestamp; filled in by `append_migration_event` if empty.
    pub at: String,
    // Where the data came from (checkpoint ref or path).
    pub source_path: String,
    // Where the data went (store root).
    pub target_path: String,
    // Source format label, e.g. "native_v2".
    pub source_format: String,
    // Target format label, e.g. "native_v2".
    pub target_format: String,
    // Verification flags for this attempt.
    pub verification: MigrationVerification,
    // "ok", "recoverable_error", or "fatal_error" (see rollback_to_checkpoint).
    pub outcome: String,
    // Error classification on failure; omitted on success.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub error_class: Option<String>,
    // Caller-supplied id for correlating related events.
    pub correlation_id: String,
}
208
/// Compact summary of the offset index, as returned by `index_summary`.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
#[serde(rename_all = "camelCase")]
pub struct IndexSummary {
    // Number of index rows.
    pub entry_count: u64,
    // entry_seq of the first row.
    pub first_entry_seq: u64,
    // entry_seq of the last row.
    pub last_entry_seq: u64,
    // entry_id of the last row.
    pub last_entry_id: String,
}
217
/// Append-only, segmented JSONL session store (v2 on-disk layout).
///
/// Layout under `root`: `segments/*.seg`, `index/offsets.jsonl`,
/// `checkpoints/*.json`, `migrations/ledger.jsonl`, `manifest.json`, and a
/// `tmp/` staging directory used for atomic writes.
#[derive(Debug, Clone)]
pub struct SessionStoreV2 {
    // Store root directory.
    root: PathBuf,
    // Soft cap on a segment's size; exceeding it triggers segment rollover.
    max_segment_bytes: u64,
    // Segment currently being appended to (1-based).
    next_segment_seq: u64,
    // Frame number to assign within the current segment (1-based).
    next_frame_seq: u64,
    // Globally monotonic entry sequence to assign next (1-based).
    next_entry_seq: u64,
    // Bytes already written to the current segment.
    current_segment_bytes: u64,
    // Rolling hash chained over each appended payload's SHA-256.
    chain_hash: String,
    // Total bytes appended across all segments.
    total_bytes: u64,
    // Entry id of the most recently appended frame, if any.
    last_entry_id: Option<String>,
    // CRC32C (hex) of the most recently appended line; "00000000" when empty.
    last_crc32c: String,
}
235
236impl SessionStoreV2 {
237 pub fn open_for_inspection(root: impl AsRef<Path>, max_segment_bytes: u64) -> Result<Self> {
239 if max_segment_bytes == 0 {
240 return Err(Error::validation("max_segment_bytes must be > 0"));
241 }
242
243 Ok(Self {
244 root: root.as_ref().to_path_buf(),
245 max_segment_bytes,
246 next_segment_seq: 1,
247 next_frame_seq: 1,
248 next_entry_seq: 1,
249 current_segment_bytes: 0,
250 chain_hash: GENESIS_CHAIN_HASH.to_string(),
251 total_bytes: 0,
252 last_entry_id: None,
253 last_crc32c: "00000000".to_string(),
254 })
255 }
256
    /// Create (or reopen) a store rooted at `root`, building the directory
    /// layout and recovering in-memory state from any existing files.
    ///
    /// Recovery is self-healing: recoverable index errors during bootstrap or
    /// integrity validation trigger an index rebuild followed by a fresh
    /// bootstrap, and segment data found alongside an empty index also forces
    /// a rebuild.
    ///
    /// # Errors
    /// Returns an error when `max_segment_bytes` is zero, on I/O failure, or
    /// when a bootstrap/integrity problem is not classified as recoverable.
    pub fn create(root: impl AsRef<Path>, max_segment_bytes: u64) -> Result<Self> {
        if max_segment_bytes == 0 {
            return Err(Error::validation("max_segment_bytes must be > 0"));
        }

        let root = root.as_ref().to_path_buf();
        // Full on-disk layout; `tmp` is the staging area for atomic renames.
        fs::create_dir_all(root.join("segments"))?;
        fs::create_dir_all(root.join("index"))?;
        fs::create_dir_all(root.join("checkpoints"))?;
        fs::create_dir_all(root.join("migrations"))?;
        fs::create_dir_all(root.join("tmp"))?;

        let mut store = Self {
            root,
            max_segment_bytes,
            next_segment_seq: 1,
            next_frame_seq: 1,
            next_entry_seq: 1,
            current_segment_bytes: 0,
            chain_hash: GENESIS_CHAIN_HASH.to_string(),
            total_bytes: 0,
            last_entry_id: None,
            last_crc32c: "00000000".to_string(),
        };
        // First recovery pass: load counters/hashes from disk; rebuild the
        // index and retry once if the failure looks like index corruption.
        if let Err(err) = store.bootstrap_from_disk() {
            if is_recoverable_index_error(&err) {
                tracing::warn!(
                    root = %store.root.display(),
                    error = %err,
                    "SessionStoreV2 bootstrap failed with recoverable index error; attempting index rebuild"
                );
                store.rebuild_index()?;
                store.bootstrap_from_disk()?;
            } else {
                return Err(err);
            }
        }

        // An empty index with non-empty segment files means the index was
        // lost or truncated; rebuild it from the segments.
        if store.entry_count() == 0 && store.segments_exist_with_data()? {
            tracing::warn!(
                root = %store.root.display(),
                "SessionStoreV2 detected segment data with empty index; rebuilding index"
            );
            store.rebuild_index()?;
            store.bootstrap_from_disk()?;
        }

        // Final check; one more rebuild attempt for recoverable failures,
        // after which validation must pass.
        if let Err(err) = store.validate_integrity() {
            if is_recoverable_index_error(&err) {
                tracing::warn!(
                    root = %store.root.display(),
                    error = %err,
                    "SessionStoreV2 integrity validation failed with recoverable error; rebuilding index"
                );
                store.rebuild_index()?;
                store.bootstrap_from_disk()?;
                store.validate_integrity()?;
            } else {
                return Err(err);
            }
        }
        Ok(store)
    }
322
323 pub fn segment_file_path(&self, segment_seq: u64) -> PathBuf {
324 self.root
325 .join("segments")
326 .join(format!("{segment_seq:016}.seg"))
327 }
328
329 pub fn index_file_path(&self) -> PathBuf {
330 self.root.join("index").join("offsets.jsonl")
331 }
332
333 fn manifest_path(&self) -> PathBuf {
334 self.root.join("manifest.json")
335 }
336
337 fn migration_ledger_path(&self) -> PathBuf {
338 self.root.join("migrations").join("ledger.jsonl")
339 }
340
341 fn list_segment_files(&self) -> Result<Vec<(u64, PathBuf)>> {
342 let segments_dir = self.root.join("segments");
343 if !segments_dir.exists() {
344 return Ok(Vec::new());
345 }
346
347 let mut segment_files = Vec::new();
348 for entry in fs::read_dir(segments_dir)? {
349 let entry = entry?;
350 let path = entry.path();
351 if path.extension().and_then(|ext| ext.to_str()) != Some("seg") {
352 continue;
353 }
354 let Some(stem) = path.file_stem().and_then(|s| s.to_str()) else {
355 continue;
356 };
357 let Ok(segment_seq) = stem.parse::<u64>() else {
358 continue;
359 };
360 segment_files.push((segment_seq, path));
361 }
362 segment_files.sort_by_key(|(segment_seq, _)| *segment_seq);
363 Ok(segment_files)
364 }
365
366 fn segments_exist_with_data(&self) -> Result<bool> {
367 for (_, path) in self.list_segment_files()? {
368 if fs::metadata(path)?.len() > 0 {
369 return Ok(true);
370 }
371 }
372 Ok(false)
373 }
374
    /// Append one entry to the store and return its new offset-index row.
    ///
    /// The payload is wrapped in a [`SegmentFrame`], written as one JSONL
    /// line to the current segment — rolling over to a new segment when the
    /// line would push the segment past `max_segment_bytes` — and then
    /// recorded in the offset index. If either write fails, the segment file
    /// is truncated back to its prior length so no partial or unindexed line
    /// is left behind. In-memory counters are only updated after both writes
    /// succeed.
    ///
    /// # Errors
    /// Fails on serialization or I/O errors, or on sequence-counter overflow.
    #[allow(clippy::needless_pass_by_value)]
    pub fn append_entry(
        &mut self,
        entry_id: impl Into<String>,
        parent_entry_id: Option<String>,
        entry_type: impl Into<String>,
        payload: Value,
    ) -> Result<OffsetIndexEntry> {
        let entry_id = entry_id.into();
        let entry_type = entry_type.into();

        // Serialize the payload once and keep it as raw JSON so the stored
        // bytes (and their hash) are stable.
        let raw_string = serde_json::to_string(&payload)?;
        let raw_payload = RawValue::from_string(raw_string)
            .map_err(|e| Error::session(format!("failed to convert payload to RawValue: {e}")))?;

        let mut frame = SegmentFrame::new(
            self.next_segment_seq,
            self.next_frame_seq,
            self.next_entry_seq,
            entry_id,
            parent_entry_id,
            entry_type,
            raw_payload,
        )?;
        let mut encoded = serde_json::to_vec(&frame)?;
        let mut line_len = line_length_u64(&encoded)?;

        // Segment rollover: never split a line across segments, and never
        // roll an empty segment (an oversized single entry still lands
        // somewhere). The frame must be re-built so it carries the new
        // segment/frame coordinates.
        if self.current_segment_bytes > 0
            && self.current_segment_bytes.saturating_add(line_len) > self.max_segment_bytes
        {
            self.next_segment_seq = self
                .next_segment_seq
                .checked_add(1)
                .ok_or_else(|| Error::session("segment sequence overflow"))?;
            self.next_frame_seq = 1;
            self.current_segment_bytes = 0;

            frame = SegmentFrame::new(
                self.next_segment_seq,
                self.next_frame_seq,
                self.next_entry_seq,
                frame.entry_id.clone(),
                frame.parent_entry_id.clone(),
                frame.entry_type.clone(),
                frame.payload.clone(),
            )?;
            encoded = serde_json::to_vec(&frame)?;
            line_len = line_length_u64(&encoded)?;
        }

        let segment_path = self.segment_file_path(self.next_segment_seq);

        let mut write_buf = encoded;
        write_buf.push(b'\n');

        // frame_seq == 1 means this is the segment's first line; truncating
        // then clears any stale file left at that path.
        let is_new_segment = self.next_frame_seq == 1;
        let mut segment = secure_open_options()
            .create(true)
            .write(true)
            .truncate(is_new_segment)
            .open(&segment_path)?;

        // Append at end-of-file; on a failed write, roll the file back to
        // its pre-write length.
        let byte_offset = segment.seek(SeekFrom::End(0))?;
        if let Err(e) = segment.write_all(&write_buf) {
            let _ = segment.set_len(byte_offset);
            return Err(Error::from(e));
        }

        let crc = crc32c_upper(&write_buf);
        let index_entry = OffsetIndexEntry {
            schema: Cow::Borrowed(OFFSET_INDEX_SCHEMA),
            entry_seq: frame.entry_seq,
            entry_id: frame.entry_id.clone(),
            segment_seq: frame.segment_seq,
            frame_seq: frame.frame_seq,
            byte_offset,
            byte_length: line_len,
            crc32c: crc.clone(),
            state: Cow::Borrowed("active"),
        };

        // If the index append fails, undo the segment write so segment and
        // index stay consistent.
        if let Err(e) = append_jsonl_line(&self.index_file_path(), &index_entry) {
            let _ = segment.set_len(byte_offset);
            return Err(e);
        }

        // Both writes succeeded: advance the hash chain and counters.
        self.chain_hash = chain_hash_step(&self.chain_hash, &frame.payload_sha256);
        self.total_bytes = self.total_bytes.saturating_add(line_len);
        self.last_entry_id = Some(frame.entry_id);
        self.last_crc32c = crc;

        self.next_entry_seq = self
            .next_entry_seq
            .checked_add(1)
            .ok_or_else(|| Error::session("entry sequence overflow"))?;
        self.next_frame_seq = self
            .next_frame_seq
            .checked_add(1)
            .ok_or_else(|| Error::session("frame sequence overflow"))?;
        self.current_segment_bytes = self.current_segment_bytes.saturating_add(line_len);

        Ok(index_entry)
    }
485
486 pub fn read_segment(&self, segment_seq: u64) -> Result<Vec<SegmentFrame>> {
487 let path = self.segment_file_path(segment_seq);
488 if !path.exists() {
489 return Ok(Vec::new());
490 }
491 read_jsonl::<SegmentFrame>(&path)
492 }
493
494 pub fn read_index(&self) -> Result<Vec<OffsetIndexEntry>> {
495 let path = self.index_file_path();
496 if !path.exists() {
497 return Ok(Vec::new());
498 }
499 read_jsonl::<OffsetIndexEntry>(&path)
500 }
501
502 pub fn lookup_entry(&self, target_entry_seq: u64) -> Result<Option<SegmentFrame>> {
505 let index_rows = self.read_index()?;
506 let row = index_rows.iter().find(|r| r.entry_seq == target_entry_seq);
507 let Some(row) = row else {
508 return Ok(None);
509 };
510 SegmentFileReader::new(self).read_frame(row)
511 }
512
513 pub fn read_entries_from(&self, from_entry_seq: u64) -> Result<Vec<SegmentFrame>> {
515 let index_rows = self.read_index()?;
516 let mut frames = Vec::new();
517 let mut reader = SegmentFileReader::new(self);
518 for row in &index_rows {
519 if row.entry_seq < from_entry_seq {
520 continue;
521 }
522 if let Some(frame) = reader.read_frame(row)? {
523 frames.push(frame);
524 }
525 }
526 Ok(frames)
527 }
528
    /// Read every indexed frame; entry sequences start at 1, so this covers
    /// the whole store.
    pub fn read_all_entries(&self) -> Result<Vec<SegmentFrame>> {
        self.read_entries_from(1)
    }
533
534 pub fn read_tail_entries(&self, count: u64) -> Result<Vec<SegmentFrame>> {
536 let index_rows = self.read_index()?;
537 let total = index_rows.len();
538 let skip = total.saturating_sub(usize::try_from(count).unwrap_or(usize::MAX));
539 let mut frames = Vec::with_capacity(total.saturating_sub(skip));
540 let mut reader = SegmentFileReader::new(self);
541 for row in &index_rows[skip..] {
542 if let Some(frame) = reader.read_frame(row)? {
543 frames.push(frame);
544 }
545 }
546 Ok(frames)
547 }
548
    /// Read the chain of frames ending at `leaf_entry_id` by walking parent
    /// links backwards, returning them in root-to-leaf order.
    ///
    /// Returns an empty vector when `leaf_entry_id` is not in the index.
    ///
    /// # Errors
    /// Fails on duplicate entry ids in the index, a cycle in the parent
    /// chain, a parent id that is missing from the index (other than the
    /// unknown-leaf case), an index/frame entry-id mismatch, or an index row
    /// whose frame cannot be read.
    pub fn read_active_path(&self, leaf_entry_id: &str) -> Result<Vec<SegmentFrame>> {
        // Build an id -> index-row map, rejecting duplicates outright since
        // parent-chain resolution would be ambiguous.
        let index_rows = self.read_index()?;
        let mut id_to_row: std::collections::HashMap<&str, &OffsetIndexEntry> =
            std::collections::HashMap::with_capacity(index_rows.len());
        for row in &index_rows {
            if id_to_row.insert(row.entry_id.as_str(), row).is_some() {
                return Err(Error::session(format!(
                    "duplicate entry_id detected while reading active path: {}",
                    row.entry_id
                )));
            }
        }

        let mut frames = Vec::new();
        let mut current_id: Option<String> = Some(leaf_entry_id.to_string());
        let mut reader = SegmentFileReader::new(self);
        // Cycle guard: every visited id must be new.
        let mut visited = std::collections::HashSet::new();
        while let Some(ref entry_id) = current_id {
            if !visited.insert(entry_id.clone()) {
                return Err(Error::session(format!(
                    "cyclic parent chain detected while reading active path at entry_id={entry_id}"
                )));
            }
            let Some(&row) = id_to_row.get(entry_id.as_str()) else {
                // Unknown leaf: empty path. Unknown *parent* mid-walk: the
                // chain is broken — report it.
                if frames.is_empty() {
                    break;
                }
                return Err(Error::session(format!(
                    "missing parent entry detected while reading active path at entry_id={entry_id}"
                )));
            };
            match reader.read_frame(row)? {
                Some(frame) => {
                    // The frame on disk must agree with the index row.
                    if frame.entry_id != row.entry_id {
                        return Err(Error::session(format!(
                            "active path index/frame mismatch for entry_id={} frame={}",
                            row.entry_id, frame.entry_id
                        )));
                    }
                    current_id.clone_from(&frame.parent_entry_id);
                    frames.push(frame);
                }
                None => {
                    return Err(Error::session(format!(
                        "index references missing frame while reading active path at entry_id={entry_id}"
                    )));
                }
            }
        }
        // Collected leaf-first; flip to root-first for the caller.
        frames.reverse();
        Ok(frames)
    }
603
604 pub const fn entry_count(&self) -> u64 {
606 self.next_entry_seq.saturating_sub(1)
607 }
608
609 pub fn head(&self) -> Option<StoreHead> {
611 self.last_entry_id.as_ref().map(|entry_id| StoreHead {
612 segment_seq: self.next_segment_seq,
613 entry_seq: self.next_entry_seq.saturating_sub(1),
614 entry_id: entry_id.clone(),
615 })
616 }
617
618 fn checkpoint_path(&self, checkpoint_seq: u64) -> PathBuf {
619 self.root
620 .join("checkpoints")
621 .join(format!("{checkpoint_seq:016}.json"))
622 }
623
    /// Write checkpoint `checkpoint_seq` capturing the current head and
    /// chain hash.
    ///
    /// The document is staged under `tmp/`, fsynced, renamed into
    /// `checkpoints/`, and the directory is synced, so a crash never leaves
    /// a partially written checkpoint in place. Note: `checkpoint_seq` is
    /// caller-assigned and not checked for monotonicity here.
    ///
    /// # Errors
    /// Fails on serialization or I/O errors; the staging file is removed on
    /// a failed write.
    pub fn create_checkpoint(&self, checkpoint_seq: u64, reason: &str) -> Result<Checkpoint> {
        // An empty store checkpoints a zeroed head.
        let head = self.head().unwrap_or(StoreHead {
            segment_seq: 0,
            entry_seq: 0,
            entry_id: String::new(),
        });
        let snapshot_ref = format!("checkpoints/{checkpoint_seq:016}.json");
        let checkpoint = Checkpoint {
            schema: CHECKPOINT_SCHEMA.to_string(),
            checkpoint_seq,
            at: chrono::Utc::now().to_rfc3339_opts(chrono::SecondsFormat::Millis, true),
            head_entry_seq: head.entry_seq,
            head_entry_id: head.entry_id,
            snapshot_ref,
            compacted_before_entry_seq: 0,
            chain_hash: self.chain_hash.clone(),
            reason: reason.to_string(),
        };
        let tmp_path = self
            .root
            .join("tmp")
            .join(format!("{checkpoint_seq:016}.json.tmp"));

        // Stage + fsync in a closure so any failure can clean up the tmp file.
        let write_result: Result<()> = (|| {
            let mut file = secure_open_options()
                .create(true)
                .write(true)
                .truncate(true)
                .open(&tmp_path)?;
            file.write_all(&serde_json::to_vec_pretty(&checkpoint)?)?;
            file.sync_all()?;
            Ok(())
        })();

        if let Err(err) = write_result {
            let _ = fs::remove_file(&tmp_path);
            return Err(err);
        }

        // Atomic publish: rename into place, then sync the directory entry.
        let target_path = self.checkpoint_path(checkpoint_seq);
        fs::rename(&tmp_path, &target_path)?;
        sync_parent_dir(&target_path)?;
        Ok(checkpoint)
    }
669
670 pub fn read_checkpoint(&self, checkpoint_seq: u64) -> Result<Option<Checkpoint>> {
672 let path = self.checkpoint_path(checkpoint_seq);
673 if !path.exists() {
674 return Ok(None);
675 }
676 let data = fs::read_to_string(&path)?;
677 let cp: Checkpoint = serde_json::from_str(&data)?;
678 Ok(Some(cp))
679 }
680
681 pub fn append_migration_event(&self, mut event: MigrationEvent) -> Result<()> {
682 if event.schema.is_empty() {
683 event.schema = MIGRATION_EVENT_SCHEMA.to_string();
684 }
685 if event.at.is_empty() {
686 event.at = chrono::Utc::now().to_rfc3339_opts(chrono::SecondsFormat::Millis, true);
687 }
688 append_jsonl_line(&self.migration_ledger_path(), &event)
689 }
690
691 pub fn read_migration_events(&self) -> Result<Vec<MigrationEvent>> {
692 let path = self.migration_ledger_path();
693 if !path.exists() {
694 return Ok(Vec::new());
695 }
696 read_jsonl::<MigrationEvent>(&path)
697 }
698
699 #[allow(clippy::too_many_lines)]
700 pub fn rollback_to_checkpoint(
701 &mut self,
702 checkpoint_seq: u64,
703 migration_id: impl Into<String>,
704 correlation_id: impl Into<String>,
705 ) -> Result<MigrationEvent> {
706 let migration_id = migration_id.into();
707 let correlation_id = correlation_id.into();
708
709 let rollback_result: Result<MigrationEvent> = (|| {
710 let checkpoint = self
711 .read_checkpoint(checkpoint_seq)?
712 .ok_or_else(|| Error::session(format!("checkpoint {checkpoint_seq} not found")))?;
713
714 let mut index_rows = self.read_index()?;
715 index_rows.retain(|row| row.entry_seq <= checkpoint.head_entry_seq);
716
717 let mut keep_len_by_segment: std::collections::HashMap<u64, u64> =
718 std::collections::HashMap::new();
719 for row in &index_rows {
720 let end = row
721 .byte_offset
722 .checked_add(row.byte_length)
723 .ok_or_else(|| Error::session("index byte range overflow during rollback"))?;
724 keep_len_by_segment
725 .entry(row.segment_seq)
726 .and_modify(|current| *current = (*current).max(end))
727 .or_insert(end);
728 }
729
730 let segments_dir = self.root.join("segments");
731 if segments_dir.exists() {
732 let mut segment_files: Vec<(u64, PathBuf)> = Vec::new();
733 for entry in fs::read_dir(&segments_dir)? {
734 let entry = entry?;
735 let path = entry.path();
736 if path.extension().and_then(|ext| ext.to_str()) != Some("seg") {
737 continue;
738 }
739 let Some(stem) = path.file_stem().and_then(|s| s.to_str()) else {
740 continue;
741 };
742 let Ok(segment_seq) = stem.parse::<u64>() else {
743 continue;
744 };
745 segment_files.push((segment_seq, path));
746 }
747 segment_files.sort_by_key(|(segment_seq, _)| *segment_seq);
748
749 for (segment_seq, path) in segment_files {
750 match keep_len_by_segment.get(&segment_seq).copied() {
751 Some(keep_len) if keep_len > 0 => {
752 let current_len = fs::metadata(&path)?.len();
753 if keep_len < current_len {
754 truncate_file_to(&path, keep_len)?;
755 }
756 }
757 _ => {
758 fs::remove_file(&path)?;
759 }
760 }
761 }
762 }
763
764 let index_path = self.index_file_path();
765 let index_tmp = self.root.join("tmp").join("offsets.rollback.tmp");
766 if let Some(parent) = index_tmp.parent() {
767 let _ = fs::create_dir_all(parent);
768 }
769 write_jsonl_lines(&index_tmp, &index_rows)?;
770 fs::rename(&index_tmp, &index_path)?;
771 let _ = sync_parent_dir(&index_path);
772
773 self.next_segment_seq = 1;
774 self.next_frame_seq = 1;
775 self.next_entry_seq = 1;
776 self.current_segment_bytes = 0;
777 self.bootstrap_from_disk()?;
778
779 let verification = MigrationVerification {
780 entry_count_match: self.entry_count() == checkpoint.head_entry_seq,
781 hash_chain_match: self.chain_hash == checkpoint.chain_hash,
782 index_consistent: self.validate_integrity().is_ok(),
783 };
784
785 let (outcome, error_class) = if verification.entry_count_match
786 && verification.hash_chain_match
787 && verification.index_consistent
788 {
789 ("ok".to_string(), None)
790 } else if verification.index_consistent {
791 (
792 "recoverable_error".to_string(),
793 Some("integrity_mismatch".to_string()),
794 )
795 } else {
796 (
797 "fatal_error".to_string(),
798 Some("index_corruption".to_string()),
799 )
800 };
801
802 let event = MigrationEvent {
803 schema: MIGRATION_EVENT_SCHEMA.to_string(),
804 migration_id: migration_id.clone(),
805 phase: "rollback".to_string(),
806 at: chrono::Utc::now().to_rfc3339_opts(chrono::SecondsFormat::Millis, true),
807 source_path: checkpoint.snapshot_ref,
808 target_path: self.root.display().to_string(),
809 source_format: "native_v2".to_string(),
810 target_format: "native_v2".to_string(),
811 verification,
812 outcome: outcome.clone(),
813 error_class,
814 correlation_id: correlation_id.clone(),
815 };
816 self.append_migration_event(event.clone())?;
817
818 if outcome == "ok" {
819 Ok(event)
820 } else {
821 Err(Error::session(format!(
822 "rollback verification failed for checkpoint {checkpoint_seq}"
823 )))
824 }
825 })();
826
827 match rollback_result {
828 Ok(event) => Ok(event),
829 Err(error) => {
830 if !rollback_failure_event_already_recorded(&error) {
831 let failure_event = MigrationEvent {
832 schema: MIGRATION_EVENT_SCHEMA.to_string(),
833 migration_id,
834 phase: "rollback".to_string(),
835 at: chrono::Utc::now().to_rfc3339_opts(chrono::SecondsFormat::Millis, true),
836 source_path: self.checkpoint_path(checkpoint_seq).display().to_string(),
837 target_path: self.root.display().to_string(),
838 source_format: "native_v2".to_string(),
839 target_format: "native_v2".to_string(),
840 verification: MigrationVerification {
841 entry_count_match: false,
842 hash_chain_match: false,
843 index_consistent: false,
844 },
845 outcome: "fatal_error".to_string(),
846 error_class: Some(classify_rollback_error(&error).to_string()),
847 correlation_id,
848 };
849 let _ = self.append_migration_event(failure_event);
850 }
851 Err(error)
852 }
853 }
854 }
855
    /// Compute and atomically write `manifest.json`, summarizing counters,
    /// file layout, integrity hashes, and invariant checks for the store.
    ///
    /// `created_at` is preserved from an existing manifest when present. The
    /// document is staged under `tmp/`, fsynced, and renamed into place so a
    /// crash never leaves a half-written manifest.
    ///
    /// # Errors
    /// Fails on I/O or serialization errors, on an unreadable existing
    /// manifest, or if a count does not fit in `u64`.
    #[allow(clippy::too_many_lines)]
    pub fn write_manifest(
        &self,
        session_id: impl Into<String>,
        source_format: impl Into<String>,
    ) -> Result<Manifest> {
        let now = chrono::Utc::now().to_rfc3339_opts(chrono::SecondsFormat::Millis, true);
        // Keep the original creation time across manifest rewrites.
        let created_at = self
            .read_manifest()?
            .map_or_else(|| now.clone(), |m| m.created_at);
        let session_id = session_id.into();
        let source_format = source_format.into();
        let index_rows = self.read_index()?;

        // Single pass over every indexed frame: count entry types, tally
        // children per parent (branch detection), recompute the hash chain,
        // and check that parents appear before children in index order.
        let mut parent_counts: std::collections::HashMap<String, u64> =
            std::collections::HashMap::new();
        let mut message_count = 0u64;
        let mut compaction_count = 0u64;
        let mut entry_ids = std::collections::HashSet::with_capacity(index_rows.len());

        let mut recomputed_chain = GENESIS_CHAIN_HASH.to_string();
        let mut parent_links_closed = true;
        let mut reader = SegmentFileReader::new(self);

        for row in &index_rows {
            if let Some(frame) = reader.read_frame(row)? {
                entry_ids.insert(frame.entry_id.clone());

                if frame.entry_type == "message" {
                    message_count = message_count.saturating_add(1);
                }
                if frame.entry_type == "compaction" {
                    compaction_count = compaction_count.saturating_add(1);
                }

                if let Some(parent_id) = frame.parent_entry_id.as_deref() {
                    *parent_counts.entry(parent_id.to_string()).or_insert(0) += 1;

                    // A parent not yet seen means the link is not closed
                    // under index order.
                    if !entry_ids.contains(parent_id) {
                        parent_links_closed = false;
                    }
                }

                recomputed_chain = chain_hash_step(&recomputed_chain, &frame.payload_sha256);
            }
        }

        // A parent referenced by more than one child marks a branch point.
        let branches_total = u64::try_from(parent_counts.values().filter(|&&n| n > 1).count())
            .map_err(|_| Error::session("branch count exceeds u64"))?;

        // Sequence invariants over index order: entry_seq strictly
        // increasing, segment_seq non-decreasing.
        let mut monotonic_entry_seq = true;
        let mut monotonic_segment_seq = true;
        let mut last_entry_seq = 0u64;
        let mut last_segment_seq = 0u64;
        for row in &index_rows {
            if row.entry_seq <= last_entry_seq {
                monotonic_entry_seq = false;
            }
            if row.segment_seq < last_segment_seq {
                monotonic_segment_seq = false;
            }
            last_entry_seq = row.entry_seq;
            last_segment_seq = row.segment_seq;
        }

        let hash_chain_valid = recomputed_chain == self.chain_hash;

        // An empty store gets a zeroed head.
        let head = self.head().unwrap_or(StoreHead {
            segment_seq: 0,
            entry_seq: 0,
            entry_id: String::new(),
        });
        // Distinct segments actually referenced by the index.
        let segment_count = u64::try_from(
            index_rows
                .iter()
                .map(|row| row.segment_seq)
                .collect::<BTreeSet<_>>()
                .len(),
        )
        .map_err(|_| Error::session("segment count exceeds u64"))?;

        let mut manifest = Manifest {
            schema: MANIFEST_SCHEMA.to_string(),
            store_version: 2,
            session_id,
            source_format,
            created_at,
            updated_at: now,
            head,
            counters: ManifestCounters {
                entries_total: u64::try_from(index_rows.len())
                    .map_err(|_| Error::session("entry count exceeds u64"))?,
                messages_total: message_count,
                branches_total,
                compactions_total: compaction_count,
                bytes_total: self.total_bytes,
            },
            files: ManifestFiles {
                segment_dir: "segments/".to_string(),
                segment_count,
                index_path: "index/offsets.jsonl".to_string(),
                checkpoint_dir: "checkpoints/".to_string(),
                migration_ledger_path: "migrations/ledger.jsonl".to_string(),
            },
            integrity: ManifestIntegrity {
                chain_hash: self.chain_hash.clone(),
                // Placeholder; the manifest hash is computed with this field
                // empty and filled in just below.
                manifest_hash: String::new(),
                last_crc32c: self.last_crc32c.clone(),
            },
            invariants: ManifestInvariants {
                parent_links_closed,
                monotonic_entry_seq,
                monotonic_segment_seq,
                index_within_segment_bounds: self.validate_integrity().is_ok(),
                branch_heads_indexed: true,
                checkpoints_monotonic: true,
                hash_chain_valid,
            },
        };
        manifest.integrity.manifest_hash = manifest_hash_hex(&manifest)?;

        let tmp = self.root.join("tmp").join("manifest.json.tmp");

        // Stage + fsync in a closure so any failure can clean up the tmp file.
        let write_result: Result<()> = (|| {
            let mut file = secure_open_options()
                .create(true)
                .write(true)
                .truncate(true)
                .open(&tmp)?;
            file.write_all(&serde_json::to_vec_pretty(&manifest)?)?;
            file.sync_all()?;
            Ok(())
        })();

        if let Err(err) = write_result {
            let _ = fs::remove_file(&tmp);
            return Err(err);
        }

        // Atomic publish: rename into place, then sync the directory entry.
        let target_path = self.manifest_path();
        fs::rename(&tmp, &target_path)?;
        sync_parent_dir(&target_path)?;
        Ok(manifest)
    }
1002
1003 pub fn read_manifest(&self) -> Result<Option<Manifest>> {
1004 let path = self.manifest_path();
1005 if !path.exists() {
1006 return Ok(None);
1007 }
1008 let content = fs::read_to_string(&path)?;
1009 let manifest: Manifest = serde_json::from_str(&content).map_err(|err| {
1010 Error::session(format!(
1011 "Failed to parse manifest {}: {err}",
1012 path.display()
1013 ))
1014 })?;
1015 Ok(Some(manifest))
1016 }
1017
1018 pub fn chain_hash(&self) -> &str {
1019 &self.chain_hash
1020 }
1021
1022 pub const fn total_bytes(&self) -> u64 {
1023 self.total_bytes
1024 }
1025
1026 pub fn index_summary(&self) -> Result<Option<IndexSummary>> {
1027 let rows = self.read_index()?;
1028 let (Some(first), Some(last)) = (rows.first(), rows.last()) else {
1029 return Ok(None);
1030 };
1031 Ok(Some(IndexSummary {
1032 entry_count: u64::try_from(rows.len())
1033 .map_err(|_| Error::session("entry count exceeds u64"))?,
1034 first_entry_seq: first.entry_seq,
1035 last_entry_seq: last.entry_seq,
1036 last_entry_id: last.entry_id.clone(),
1037 }))
1038 }
1039
1040 #[allow(clippy::too_many_lines)]
    /// Rebuilds the offset index from scratch by replaying every segment file.
    ///
    /// Writes index rows to a temp file, then atomically renames it over the
    /// real index. Resets the integrity chain (`chain_hash`, `total_bytes`,
    /// `last_entry_id`, `last_crc32c`) and re-derives it frame by frame.
    /// On a corrupt tail frame (parse failure at EOF with no trailing newline,
    /// mismatched embedded coordinates, or a non-monotonic entry sequence) the
    /// current segment is truncated at the last good frame and all later
    /// segments are quarantined (renamed to `.bak*`).
    ///
    /// Returns the number of frames successfully indexed.
    ///
    /// # Errors
    /// I/O failures, oversized/unparseable frames not at a healable tail, and
    /// frame-sequence overflow are all fatal.
    pub fn rebuild_index(&mut self) -> Result<u64> {
        let mut rebuilt_count = 0u64;
        let index_path = self.index_file_path();
        let index_tmp_path = self.root.join("tmp").join("offsets.rebuild.tmp");

        if let Some(parent) = index_tmp_path.parent() {
            fs::create_dir_all(parent)?;
        }

        // Remove any leftover temp file from a previously interrupted rebuild.
        if index_tmp_path.exists() {
            fs::remove_file(&index_tmp_path)?;
        }

        let mut index_writer = std::io::BufWriter::new(
            secure_open_options()
                .create(true)
                .write(true)
                .truncate(true)
                .open(&index_tmp_path)?,
        );

        // Reset in-memory integrity state; it is re-derived below.
        self.chain_hash = GENESIS_CHAIN_HASH.to_string();
        self.total_bytes = 0;
        self.last_entry_id = None;
        self.last_crc32c = "00000000".to_string();

        let segment_files = self.list_segment_files()?;
        // Last entry_seq seen across ALL segments; enforces global monotonicity.
        let mut last_observed_seq = 0u64;

        'segments: for (i, (segment_seq, seg_path)) in segment_files.iter().enumerate() {
            let file = File::open(seg_path)?;
            let mut reader = BufReader::new(file);
            let mut byte_offset = 0u64;
            let mut line_number = 0u64;
            let mut expected_frame_seq = 1u64;
            let mut line = String::new();

            loop {
                line.clear();
                let bytes_read =
                    match read_line_with_limit(&mut reader, &mut line, MAX_FRAME_READ_BYTES) {
                        Ok(n) => n,
                        // InvalidData here means the line exceeded the frame size
                        // limit (mid-stream): not healable, so abort the rebuild.
                        Err(e) if e.kind() == std::io::ErrorKind::InvalidData => {
                            return Err(Error::session(format!(
                                "failed to read segment frame while rebuilding index: \
                                 segment={} line={}: {e}",
                                seg_path.display(),
                                line_number.saturating_add(1),
                            )));
                        }
                        Err(e) => return Err(Error::Io(Box::new(e))),
                    };

                if bytes_read == 0 {
                    break;
                }
                line_number = line_number.saturating_add(1);
                let mut line_len = u64::try_from(bytes_read)
                    .map_err(|_| Error::session("line length exceeds u64"))?;

                // Blank lines are skipped but still advance the byte offset.
                if line.trim().is_empty() {
                    byte_offset = byte_offset.saturating_add(line_len);
                    continue;
                }

                let missing_newline = !line.ends_with('\n');
                let json_line = line.trim_end_matches('\n').trim_end_matches('\r');
                let frame: SegmentFrame = match serde_json::from_str(json_line) {
                    Ok(frame) => {
                        if missing_newline {
                            use std::io::{Read, Write};
                            // The frame parsed fine but lost its trailing '\n'
                            // (e.g. crash mid-append): heal the file by appending
                            // one, and mirror that in `line`/`line_len` so the
                            // index row covers the newline byte.
                            tracing::warn!(
                                segment = %seg_path.display(),
                                line_number,
                                "SessionStoreV2 encountered valid frame missing trailing newline; healing segment"
                            );
                            let mut f = secure_open_options().append(true).open(seg_path)?;
                            f.write_all(b"\n")?;
                            line.push('\n');
                            line_len += 1;
                            // Consume the byte we just appended through `reader`
                            // (a separate handle) so its position stays in sync,
                            // and verify we actually read back a newline.
                            let mut healed_newline = [0u8; 1];
                            reader.read_exact(&mut healed_newline).map_err(|err| {
                                Error::session(format!(
                                    "failed to consume healed newline while rebuilding index: \
                                     segment={} line={line_number}: {err}",
                                    seg_path.display()
                                ))
                            })?;
                            if healed_newline[0] != b'\n' {
                                return Err(Error::session(format!(
                                    "healed newline read back as non-newline byte while rebuilding index: \
                                     segment={} line={line_number}: 0x{:02X}",
                                    seg_path.display(),
                                    healed_newline[0]
                                )));
                            }
                        }
                        frame
                    }
                    Err(err) => {
                        // A parse failure is only recoverable when it is the
                        // torn final line of the log (EOF, no newline).
                        let at_eof = reader.fill_buf().is_ok_and(<[u8]>::is_empty);
                        if !at_eof || !missing_newline {
                            return Err(Error::session(format!(
                                "failed to parse segment frame while rebuilding index: \
                                 segment={} line={line_number}: {err}",
                                seg_path.display()
                            )));
                        }
                        tracing::warn!(
                            segment = %seg_path.display(),
                            line_number,
                            error = %err,
                            at_eof,
                            missing_newline,
                            "SessionStoreV2 dropping corrupted frame during index rebuild; truncating segment and quarantining subsequent segments"
                        );
                        // Drop the reader before mutating the file it holds open.
                        drop(reader);
                        truncate_file_to(seg_path, byte_offset)?;
                        quarantine_segment_tail(&segment_files[i + 1..])?;
                        break 'segments;
                    }
                };

                // The frame's embedded coordinates must match the file name and
                // its position; a mismatch means the tail was corrupted/copied.
                if frame.segment_seq != *segment_seq || frame.frame_seq != expected_frame_seq {
                    tracing::warn!(
                        segment = %seg_path.display(),
                        line_number,
                        expected_segment_seq = *segment_seq,
                        actual_segment_seq = frame.segment_seq,
                        expected_frame_seq,
                        actual_frame_seq = frame.frame_seq,
                        "SessionStoreV2 detected mismatched embedded frame coordinates during rebuild; truncating segment and quarantining subsequent segments"
                    );
                    drop(reader);
                    truncate_file_to(seg_path, byte_offset)?;
                    quarantine_segment_tail(&segment_files[i + 1..])?;
                    break 'segments;
                }

                // Entry sequence must be strictly increasing across segments.
                if frame.entry_seq <= last_observed_seq {
                    tracing::warn!(
                        segment = %seg_path.display(),
                        line_number,
                        entry_seq = frame.entry_seq,
                        last_seq = last_observed_seq,
                        "SessionStoreV2 detected non-monotonic entry sequence during rebuild; truncating segment and quarantining subsequent segments"
                    );
                    drop(reader);
                    truncate_file_to(seg_path, byte_offset)?;
                    quarantine_segment_tail(&segment_files[i + 1..])?;
                    break 'segments;
                }
                last_observed_seq = frame.entry_seq;

                // CRC covers the whole line, including the trailing newline.
                let crc = crc32c_upper(line.as_bytes());

                let index_entry = OffsetIndexEntry {
                    schema: Cow::Borrowed(OFFSET_INDEX_SCHEMA),
                    entry_seq: frame.entry_seq,
                    entry_id: frame.entry_id.clone(),
                    segment_seq: *segment_seq,
                    frame_seq: expected_frame_seq,
                    byte_offset,
                    byte_length: line_len,
                    crc32c: crc.clone(),
                    state: Cow::Borrowed("active"),
                };
                serde_json::to_writer(&mut index_writer, &index_entry)?;
                index_writer.write_all(b"\n")?;

                // Fold this frame into the running integrity chain.
                self.chain_hash = chain_hash_step(&self.chain_hash, &frame.payload_sha256);
                self.total_bytes = self.total_bytes.saturating_add(line_len);
                self.last_entry_id = Some(frame.entry_id);
                self.last_crc32c = crc;

                byte_offset = byte_offset.saturating_add(line_len);
                rebuilt_count = rebuilt_count.saturating_add(1);
                expected_frame_seq = expected_frame_seq
                    .checked_add(1)
                    .ok_or_else(|| Error::session("frame sequence overflow during rebuild"))?;
            }
        }

        // Flush + fsync the temp index, then atomically publish it.
        index_writer.flush()?;
        let file = index_writer
            .into_inner()
            .map_err(std::io::IntoInnerError::into_error)?;
        file.sync_all()?;
        drop(file); fs::rename(&index_tmp_path, &index_path)?;
        sync_parent_dir(&index_path)?;

        // Reset write cursors and let bootstrap re-derive them from the index.
        self.next_segment_seq = 1;
        self.next_frame_seq = 1;
        self.next_entry_seq = 1;
        self.current_segment_bytes = 0;
        self.bootstrap_from_disk()?;

        Ok(rebuilt_count)
    }
1250
    /// Verifies the whole store end-to-end against the offset index.
    ///
    /// Checks, in order: strictly increasing `entry_seq` in the index; every
    /// indexed byte range fits inside its segment; CRC32C of the raw record
    /// matches the index row; the decoded frame's coordinates match the index
    /// row; the payload hash/size embedded in the frame match the payload; no
    /// duplicate `entry_id`; and finally that the parent graph has no dangling
    /// links and no cycles.
    ///
    /// # Errors
    /// Returns a session error describing the first violation found, or an
    /// I/O/JSON error if reading or decoding fails.
    pub fn validate_integrity(&self) -> Result<()> {
        let index_rows = self.read_index()?;
        let mut last_entry_seq = 0;
        // entry_id -> parent_entry_id, collected for the graph checks below.
        let mut parent_by_entry: std::collections::HashMap<String, Option<String>> =
            std::collections::HashMap::with_capacity(index_rows.len());

        // Group rows per segment so each segment file is opened exactly once.
        let mut rows_by_segment: std::collections::BTreeMap<u64, Vec<&OffsetIndexEntry>> =
            std::collections::BTreeMap::new();
        for row in &index_rows {
            if row.entry_seq <= last_entry_seq {
                return Err(Error::session(format!(
                    "entry sequence is not strictly increasing at entry_seq={}",
                    row.entry_seq
                )));
            }
            last_entry_seq = row.entry_seq;
            rows_by_segment
                .entry(row.segment_seq)
                .or_default()
                .push(row);
        }

        for (segment_seq, rows) in rows_by_segment {
            let segment_path = self.segment_file_path(segment_seq);
            let mut file = File::open(&segment_path).map_err(|err| {
                Error::session(format!(
                    "failed to open segment {}: {err}",
                    segment_path.display()
                ))
            })?;
            let segment_len = file.metadata()?.len();

            for row in rows {
                // Bounds check before seeking/reading the record.
                let end = row
                    .byte_offset
                    .checked_add(row.byte_length)
                    .ok_or_else(|| Error::session("index byte range overflow"))?;
                if end > segment_len {
                    return Err(Error::session(format!(
                        "index out of bounds for segment {}: end={} len={segment_len}",
                        segment_path.display(),
                        end
                    )));
                }

                file.seek(SeekFrom::Start(row.byte_offset))?;
                let mut record_bytes = vec![
                    0u8;
                    usize::try_from(row.byte_length).map_err(|_| {
                        Error::session(format!("byte length too large: {}", row.byte_length))
                    })?
                ];
                file.read_exact(&mut record_bytes)?;

                // CRC is computed over the raw record, newline included.
                let checksum = crc32c_upper(&record_bytes);
                if checksum != row.crc32c {
                    return Err(Error::session(format!(
                        "checksum mismatch for entry_seq={} expected={} actual={checksum}",
                        row.entry_seq, row.crc32c
                    )));
                }

                // Strip the trailing newline before JSON-decoding the frame.
                if record_bytes.last() == Some(&b'\n') {
                    record_bytes.pop();
                }
                let frame: SegmentFrame = serde_json::from_slice(&record_bytes)?;

                // The frame's embedded coordinates must agree with the index row.
                if frame.entry_seq != row.entry_seq
                    || frame.entry_id != row.entry_id
                    || frame.segment_seq != row.segment_seq
                    || frame.frame_seq != row.frame_seq
                {
                    return Err(Error::session(format!(
                        "index/frame mismatch at entry_seq={}",
                        row.entry_seq
                    )));
                }

                // Recompute the payload hash/size and compare to the frame's claim.
                let (payload_hash, payload_bytes) = payload_hash_and_size(&frame.payload)?;
                if frame.payload_sha256 != payload_hash || frame.payload_bytes != payload_bytes {
                    return Err(Error::session(format!(
                        "payload integrity mismatch at entry_seq={}",
                        row.entry_seq
                    )));
                }

                // `insert` returning Some means the entry_id was seen before.
                if parent_by_entry
                    .insert(frame.entry_id.clone(), frame.parent_entry_id.clone())
                    .is_some()
                {
                    return Err(Error::session(format!(
                        "duplicate entry_id detected in session store: {}",
                        frame.entry_id
                    )));
                }
            }
        }

        // Graph-level checks: no dangling parents, no cycles.
        validate_parent_graph_links(&parent_by_entry)?;
        validate_parent_graph_acyclic(&parent_by_entry)?;

        Ok(())
    }
1355
    /// Re-derives all in-memory state (sequence cursors, integrity chain,
    /// byte counters) from the on-disk offset index.
    ///
    /// If the active segment is longer than the last indexed byte range,
    /// the unindexed tail is assumed to be a torn write from a crash and is
    /// truncated away. An empty index resets everything to genesis values.
    ///
    /// # Errors
    /// Fails on sequence overflow, a missing/unstatable active segment, or
    /// when the index references a frame that cannot be read back.
    fn bootstrap_from_disk(&mut self) -> Result<()> {
        let index_rows = self.read_index()?;
        if let Some(last) = index_rows.last() {
            // Next cursors continue from the last indexed row.
            self.next_entry_seq = last
                .entry_seq
                .checked_add(1)
                .ok_or_else(|| Error::session("entry sequence overflow while bootstrapping"))?;
            self.next_segment_seq = last.segment_seq;
            self.next_frame_seq = last
                .frame_seq
                .checked_add(1)
                .ok_or_else(|| Error::session("frame sequence overflow while bootstrapping"))?;
            let segment_path = self.segment_file_path(last.segment_seq);
            // The active segment should end exactly where the last index row ends.
            let expected_segment_bytes = last.byte_offset.saturating_add(last.byte_length);
            let actual_segment_bytes =
                fs::metadata(&segment_path)
                    .map(|meta| meta.len())
                    .map_err(|err| {
                        Error::session(format!(
                            "failed to stat active segment {} while bootstrapping: {err}",
                            segment_path.display()
                        ))
                    })?;

            if actual_segment_bytes > expected_segment_bytes {
                // Crash recovery: drop bytes that were appended but never indexed.
                tracing::warn!(
                    segment = %segment_path.display(),
                    expected = expected_segment_bytes,
                    actual = actual_segment_bytes,
                    "SessionStoreV2 truncating unindexed trailing bytes from active segment after crash recovery"
                );
                truncate_file_to(&segment_path, expected_segment_bytes)?;
            }
            self.current_segment_bytes = expected_segment_bytes;
            self.last_entry_id = Some(last.entry_id.clone());
            self.last_crc32c.clone_from(&last.crc32c);

            // Replay every frame to rebuild the payload hash chain and total size.
            let mut chain = GENESIS_CHAIN_HASH.to_string();
            let mut total = 0u64;
            let mut reader = SegmentFileReader::new(self);
            for row in &index_rows {
                let frame = reader.read_frame(row)?.ok_or_else(|| {
                    Error::session(format!(
                        "index references missing frame during bootstrap: entry_seq={}, segment={}",
                        row.entry_seq, row.segment_seq
                    ))
                })?;
                chain = chain_hash_step(&chain, &frame.payload_sha256);
                total = total.saturating_add(row.byte_length);
            }
            self.chain_hash = chain;
            self.total_bytes = total;
        } else {
            // Empty index: reset to genesis state.
            self.chain_hash = GENESIS_CHAIN_HASH.to_string();
            self.total_bytes = 0;
            self.last_entry_id = None;
            self.last_crc32c = "00000000".to_string();
        }
        Ok(())
    }
1416}
1417
1418fn rollback_failure_event_already_recorded(error: &Error) -> bool {
1419 matches!(error, Error::Session(message) if message.contains("rollback verification failed"))
1420}
1421
1422fn classify_rollback_error(error: &Error) -> &'static str {
1423 match error {
1424 Error::Session(message) => {
1425 if message.contains("checkpoint") && message.contains("not found") {
1426 "checkpoint_not_found"
1427 } else if message.contains("index byte range overflow") {
1428 "index_range_overflow"
1429 } else if message.contains("rollback verification failed") {
1430 "rollback_verification_failed"
1431 } else {
1432 "session_error"
1433 }
1434 }
1435 _ => error.category_code(),
1436 }
1437}
1438
1439fn is_recoverable_index_error(error: &Error) -> bool {
1440 match error {
1441 Error::Json(_) => true,
1442 Error::Io(err) => matches!(
1443 err.kind(),
1444 std::io::ErrorKind::UnexpectedEof | std::io::ErrorKind::InvalidData
1445 ),
1446 Error::Session(message) => {
1447 let lower = message.to_ascii_lowercase();
1448 lower.contains("checksum mismatch")
1449 || lower.contains("index out of bounds")
1450 || lower.contains("index/frame mismatch")
1451 || lower.contains("index references missing frame")
1452 || lower.contains("payload integrity mismatch")
1453 || lower.contains("entry sequence is not strictly increasing")
1454 || lower.contains("index byte range overflow")
1455 || lower.contains("failed to stat active segment")
1456 }
1457 _ => false,
1458 }
1459}
1460
/// Two-color DFS bookkeeping used by `validate_parent_graph_acyclic`.
#[derive(Clone, Copy, PartialEq, Eq)]
enum ParentGraphVisitState {
    // Node is on the current DFS path; revisiting it means a cycle.
    Visiting,
    // Node (and its whole parent chain) has been fully explored.
    Visited,
}
1466
1467fn validate_parent_graph_links(
1468 parent_by_entry: &std::collections::HashMap<String, Option<String>>,
1469) -> Result<()> {
1470 for (entry_id, parent_id) in parent_by_entry {
1471 if let Some(parent_id) = parent_id.as_deref()
1472 && !parent_by_entry.contains_key(parent_id)
1473 {
1474 return Err(Error::session(format!(
1475 "missing parent entry detected in session store: entry_id={entry_id} parent_id={parent_id}"
1476 )));
1477 }
1478 }
1479
1480 Ok(())
1481}
1482
/// Checks that following parent links never forms a cycle.
///
/// Runs an iterative (explicit-stack) DFS over every entry. Each stack item is
/// `(id, expanded)`: the first pop marks the node `Visiting` and re-pushes it
/// with `expanded = true` behind its parent; the second pop marks it
/// `Visited`. Popping a node already in `Visiting` state means the parent
/// chain loops back into the current path.
///
/// Parents missing from the map are skipped here; `validate_parent_graph_links`
/// reports those separately.
///
/// # Errors
/// Returns a session error naming an entry on the detected cycle.
fn validate_parent_graph_acyclic(
    parent_by_entry: &std::collections::HashMap<String, Option<String>>,
) -> Result<()> {
    let mut visit_state: std::collections::HashMap<&str, ParentGraphVisitState> =
        std::collections::HashMap::with_capacity(parent_by_entry.len());

    for entry_id in parent_by_entry.keys() {
        // Skip nodes already fully explored by an earlier traversal.
        if visit_state.get(entry_id.as_str()) == Some(&ParentGraphVisitState::Visited) {
            continue;
        }

        let mut stack = vec![(entry_id.as_str(), false)];
        while let Some((current_id, expanded)) = stack.pop() {
            if expanded {
                // Second visit: the node's parent chain is fully explored.
                visit_state.insert(current_id, ParentGraphVisitState::Visited);
                continue;
            }

            match visit_state.get(current_id).copied() {
                Some(ParentGraphVisitState::Visited) => continue,
                // Still on the current DFS path: the parent chain is cyclic.
                Some(ParentGraphVisitState::Visiting) => {
                    return Err(Error::session(format!(
                        "cyclic parent chain detected in session store at entry_id={current_id}"
                    )));
                }
                None => {}
            }

            visit_state.insert(current_id, ParentGraphVisitState::Visiting);
            // Re-push for post-order finalization, then descend into the parent.
            stack.push((current_id, true));

            if let Some(parent_id) = parent_by_entry
                .get(current_id)
                .and_then(std::option::Option::as_deref)
                && parent_by_entry.contains_key(parent_id)
            {
                stack.push((parent_id, false));
            }
        }
    }

    Ok(())
}
1526
1527pub fn frame_to_session_entry(frame: &SegmentFrame) -> Result<SessionEntry> {
1529 let entry: SessionEntry = serde_json::from_str(frame.payload.get()).map_err(|e| {
1532 Error::session(format!(
1533 "failed to deserialize SessionEntry from frame entry_id={}: {e}",
1534 frame.entry_id
1535 ))
1536 })?;
1537
1538 if let Some(base_id) = entry.base_id() {
1539 if base_id != &frame.entry_id {
1540 return Err(Error::session(format!(
1541 "frame entry_id mismatch: frame={} entry={}",
1542 frame.entry_id, base_id
1543 )));
1544 }
1545 }
1546
1547 Ok(entry)
1548}
1549
1550pub fn session_entry_to_frame_args(
1552 entry: &SessionEntry,
1553) -> Result<(String, Option<String>, String, Value)> {
1554 let base = entry.base();
1555 let entry_id = base
1556 .id
1557 .clone()
1558 .ok_or_else(|| Error::session("SessionEntry has no id"))?;
1559 let parent_entry_id = base.parent_id.clone();
1560
1561 let entry_type = match entry {
1562 SessionEntry::Message(_) => "message",
1563 SessionEntry::ModelChange(_) => "model_change",
1564 SessionEntry::ThinkingLevelChange(_) => "thinking_level_change",
1565 SessionEntry::Compaction(_) => "compaction",
1566 SessionEntry::BranchSummary(_) => "branch_summary",
1567 SessionEntry::Label(_) => "label",
1568 SessionEntry::SessionInfo(_) => "session_info",
1569 SessionEntry::Custom(_) => "custom",
1570 };
1571
1572 let payload = serde_json::to_value(entry).map_err(|e| {
1573 Error::session(format!(
1574 "failed to serialize SessionEntry to frame payload: {e}"
1575 ))
1576 })?;
1577
1578 Ok((entry_id, parent_entry_id, entry_type.to_string(), payload))
1579}
1580
/// Random-access frame reader that caches the most recently opened segment
/// file so consecutive reads from the same segment avoid re-opening it.
struct SegmentFileReader<'a> {
    // Store used to resolve segment sequence numbers to file paths.
    store: &'a SessionStoreV2,
    // Segment whose file handle is currently cached, if any.
    current_segment_seq: Option<u64>,
    // Open handle for `current_segment_seq`; `None` if that file was missing.
    current_file: Option<File>,
    // Length of the cached file at open time, used for bounds checks.
    current_len: u64,
}
1588
impl<'a> SegmentFileReader<'a> {
    /// Creates a reader with an empty file cache.
    const fn new(store: &'a SessionStoreV2) -> Self {
        Self {
            store,
            current_segment_seq: None,
            current_file: None,
            current_len: 0,
        }
    }

    /// Reads and decodes the frame described by an index row.
    ///
    /// Returns `Ok(None)` when the row's segment file does not exist (e.g. it
    /// was quarantined). Bounds-checks the row's byte range against the file
    /// length captured at open time and rejects implausibly large lengths
    /// before allocating.
    ///
    /// # Errors
    /// Fails on range overflow/out-of-bounds rows, oversized frames, I/O
    /// errors, or JSON decode failures.
    fn read_frame(&mut self, row: &OffsetIndexEntry) -> Result<Option<SegmentFrame>> {
        // Switch the cached handle when the row targets a different segment.
        if self.current_segment_seq != Some(row.segment_seq) {
            self.current_segment_seq = Some(row.segment_seq);
            let path = self.store.segment_file_path(row.segment_seq);
            if path.exists() {
                let file = File::open(&path)?;
                self.current_len = file.metadata()?.len();
                self.current_file = Some(file);
            } else {
                // Missing segment: remember that so repeated rows short-circuit.
                self.current_file = None;
            }
        }

        let Some(file) = self.current_file.as_mut() else {
            return Ok(None);
        };

        let end_offset = row
            .byte_offset
            .checked_add(row.byte_length)
            .ok_or_else(|| Error::session("index byte range overflow"))?;

        if end_offset > self.current_len {
            return Err(Error::session(format!(
                "index out of bounds for segment {}: end={} len={}",
                self.store.segment_file_path(row.segment_seq).display(),
                end_offset,
                self.current_len
            )));
        }

        file.seek(SeekFrom::Start(row.byte_offset))?;
        let byte_len = usize::try_from(row.byte_length)
            .map_err(|_| Error::session(format!("byte length too large: {}", row.byte_length)))?;

        // Sanity cap (segment limit or 100 MiB, whichever is larger) before
        // allocating the read buffer, so a corrupt row cannot trigger a huge
        // allocation.
        if row.byte_length > self.store.max_segment_bytes.max(100 * 1024 * 1024) {
            return Err(Error::session(format!(
                "frame byte length {byte_len} exceeds limit"
            )));
        }

        let mut buf = vec![0u8; byte_len];
        file.read_exact(&mut buf)?;
        // Drop the trailing newline before decoding the JSON frame.
        if buf.last() == Some(&b'\n') {
            buf.pop();
        }
        let frame: SegmentFrame = serde_json::from_slice(&buf)?;
        Ok(Some(frame))
    }
}
1649
1650fn chain_hash_step(prev_chain: &str, payload_sha256: &str) -> String {
1652 let mut hasher = Sha256::new();
1653 hasher.update(prev_chain.as_bytes());
1654 hasher.update(payload_sha256.as_bytes());
1655 format!("{:x}", hasher.finalize())
1656}
1657
1658fn manifest_hash_hex(manifest: &Manifest) -> Result<String> {
1659 let encoded = serde_json::to_vec(manifest)?;
1660 Ok(format!("{:x}", Sha256::digest(&encoded)))
1661}
1662
/// Derives the v2 sidecar directory for a v1 `.jsonl` session file:
/// same directory, same stem, `.v2` extension (e.g. `foo.jsonl` -> `foo.v2`).
///
/// Falls back to the stem `"session"` when the path has no file stem, and to
/// `"."` when it has no parent directory.
pub fn v2_sidecar_path(jsonl_path: &Path) -> PathBuf {
    let stem = match jsonl_path.file_stem() {
        Some(stem) => stem.to_string_lossy().into_owned(),
        None => String::from("session"),
    };
    jsonl_path
        .parent()
        .unwrap_or_else(|| Path::new("."))
        .join(format!("{stem}.v2"))
}
1672
1673pub fn has_v2_sidecar(jsonl_path: &Path) -> bool {
1675 let root = v2_sidecar_path(jsonl_path);
1676 root.join("manifest.json").exists() || root.join("index").join("offsets.jsonl").exists()
1677}
1678
1679fn append_jsonl_line<T: Serialize>(path: &Path, value: &T) -> Result<()> {
1680 let file = secure_open_options().create(true).append(true).open(path)?;
1681 let mut writer = std::io::BufWriter::new(file);
1682 serde_json::to_writer(&mut writer, value)?;
1685 writer.write_all(b"\n")?;
1686 writer.flush()?;
1687 Ok(())
1688}
1689
1690fn truncate_file_to(path: &Path, len: u64) -> Result<()> {
1691 let file = secure_open_options()
1692 .write(true)
1693 .truncate(false)
1694 .open(path)?;
1695 file.set_len(len)?;
1696 file.sync_all()?;
1697 Ok(())
1698}
1699
1700fn quarantine_segment_file(path: &Path) -> Result<PathBuf> {
1701 let parent = path
1702 .parent()
1703 .ok_or_else(|| Error::session(format!("segment has no parent: {}", path.display())))?;
1704 let file_name = path
1705 .file_name()
1706 .map(|name| name.to_string_lossy().into_owned())
1707 .ok_or_else(|| Error::session(format!("segment has no filename: {}", path.display())))?;
1708
1709 for suffix in 0u32..10_000 {
1710 let backup_name = if suffix == 0 {
1711 format!("{file_name}.bak")
1712 } else {
1713 format!("{file_name}.bak.{suffix}")
1714 };
1715 let backup_path = parent.join(backup_name);
1716 if backup_path.exists() {
1717 continue;
1718 }
1719
1720 fs::rename(path, &backup_path).map_err(|err| {
1721 Error::session(format!(
1722 "failed to quarantine segment {} -> {}: {err}",
1723 path.display(),
1724 backup_path.display()
1725 ))
1726 })?;
1727 return Ok(backup_path);
1728 }
1729
1730 Err(Error::session(format!(
1731 "failed to quarantine segment {}: exhausted backup suffixes",
1732 path.display()
1733 )))
1734}
1735
1736fn quarantine_segment_tail(segment_files: &[(u64, PathBuf)]) -> Result<()> {
1737 for (_, path) in segment_files {
1738 let backup_path = quarantine_segment_file(path)?;
1739 tracing::warn!(
1740 segment = %path.display(),
1741 backup = %backup_path.display(),
1742 "SessionStoreV2 quarantined trailing segment during rebuild"
1743 );
1744 }
1745 Ok(())
1746}
1747
/// Fsyncs the directory containing `path` so a just-completed rename of a
/// file inside it is durable. A path with no parent is a no-op.
#[cfg(unix)]
fn sync_parent_dir(path: &Path) -> std::io::Result<()> {
    match path.parent() {
        Some(parent) => File::open(parent)?.sync_all(),
        None => Ok(()),
    }
}
1755
#[cfg(not(unix))]
fn sync_parent_dir(_path: &Path) -> std::io::Result<()> {
    // Directory fsync is not portable off Unix; treat it as a best-effort no-op.
    Ok(())
}
1760
1761fn write_jsonl_lines<T: Serialize>(path: &Path, rows: &[T]) -> Result<()> {
1762 let file = secure_open_options()
1763 .create(true)
1764 .write(true)
1765 .truncate(true)
1766 .open(path)?;
1767 let mut writer = std::io::BufWriter::new(file);
1768 for row in rows {
1769 serde_json::to_writer(&mut writer, row)?;
1770 writer.write_all(b"\n")?;
1771 }
1772 writer.flush()?;
1773 let file = writer
1774 .into_inner()
1775 .map_err(std::io::IntoInnerError::into_error)?;
1776 file.sync_all()?;
1777 Ok(())
1778}
1779
1780fn read_jsonl<T: for<'de> Deserialize<'de>>(path: &Path) -> Result<Vec<T>> {
1781 let file = File::open(path)?;
1782 let mut reader = BufReader::new(file);
1783 let mut out = Vec::new();
1784 let mut line = String::new();
1785 loop {
1786 line.clear();
1787 let bytes_read = read_line_with_limit(&mut reader, &mut line, MAX_FRAME_READ_BYTES)
1788 .map_err(|e| Error::Io(Box::new(e)))?;
1789 if bytes_read == 0 {
1790 break;
1791 }
1792 if line.trim().is_empty() {
1793 continue;
1794 }
1795 let json_line = line.trim_end_matches('\n').trim_end_matches('\r');
1796 out.push(serde_json::from_str::<T>(json_line)?);
1797 }
1798 Ok(out)
1799}
1800
1801fn payload_hash_and_size(payload: &RawValue) -> Result<(String, u64)> {
1802 let bytes = payload.get().as_bytes();
1804 let payload_bytes = u64::try_from(bytes.len())
1805 .map_err(|_| Error::session(format!("payload is too large: {} bytes", bytes.len())))?;
1806 let hash = format!("{:x}", Sha256::digest(bytes));
1807 Ok((hash, payload_bytes))
1808}
1809
1810fn line_length_u64(encoded: &[u8]) -> Result<u64> {
1811 let line_len = encoded
1812 .len()
1813 .checked_add(1)
1814 .ok_or_else(|| Error::session("line length overflow"))?;
1815 u64::try_from(line_len).map_err(|_| Error::session("line length exceeds u64"))
1816}
1817
1818fn crc32c_upper(data: &[u8]) -> String {
1819 let crc = crc32c::crc32c(data);
1820 format!("{crc:08X}")
1821}
1822
/// Reads one line (including its `\n`) into `buf`, refusing lines longer than
/// `limit` bytes.
///
/// Returns the number of bytes read; `0` means EOF. Hitting the limit exactly
/// at EOF without a trailing newline is accepted (a final unterminated line),
/// but hitting it with more data pending yields `InvalidData`.
///
/// # Errors
/// Propagates I/O errors from the reader and returns `InvalidData` for
/// over-limit lines.
fn read_line_with_limit<R: BufRead>(
    reader: &mut R,
    buf: &mut String,
    limit: u64,
) -> std::io::Result<usize> {
    let mut limited = reader.take(limit);
    let bytes_read = limited.read_line(buf)?;
    // The limit was exhausted mid-line only if we read something, consumed the
    // whole allowance, and still have no terminating newline.
    let exhausted_mid_line = bytes_read > 0 && limited.limit() == 0 && !buf.ends_with('\n');
    if exhausted_mid_line && !limited.into_inner().fill_buf()?.is_empty() {
        // More bytes remain on this line beyond the allowance: reject it.
        return Err(std::io::Error::new(
            std::io::ErrorKind::InvalidData,
            format!("Line length exceeds limit of {limit} bytes"),
        ));
    }
    Ok(bytes_read)
}
1843
/// Unit tests for corruption handling plus property tests for the pure
/// helpers (hash chain, CRC formatting, payload hashing, line lengths,
/// sidecar path derivation).
#[cfg(test)]
mod proptests {
    use super::*;
    use proptest::prelude::*;
    use serde_json::json;
    use std::fs;

    // Quarantine moves the file to `<name>.bak` and preserves its contents.
    #[test]
    fn quarantine_segment_file_moves_segment_to_backup() {
        let tmp = tempfile::tempdir().expect("tempdir");
        let segment = tmp.path().join("0000000000000002.seg");
        fs::write(&segment, b"hello").expect("write segment");

        let backup = quarantine_segment_file(&segment).expect("quarantine segment");

        assert_eq!(backup, tmp.path().join("0000000000000002.seg.bak"));
        assert!(!segment.exists(), "original segment should be moved away");
        assert_eq!(fs::read(&backup).expect("read backup"), b"hello");
    }

    // A pre-existing `.bak` must not be clobbered; the next suffix is used.
    #[test]
    fn quarantine_segment_file_uses_next_available_backup_suffix() {
        let tmp = tempfile::tempdir().expect("tempdir");
        let segment = tmp.path().join("0000000000000002.seg");
        let existing_backup = tmp.path().join("0000000000000002.seg.bak");
        fs::write(&segment, b"new").expect("write segment");
        fs::write(&existing_backup, b"old").expect("write existing backup");

        let backup = quarantine_segment_file(&segment).expect("quarantine segment");

        assert_eq!(backup, tmp.path().join("0000000000000002.seg.bak.1"));
        assert_eq!(
            fs::read(&existing_backup).expect("read existing backup"),
            b"old"
        );
        assert_eq!(fs::read(&backup).expect("read new backup"), b"new");
    }

    // Reopening with an index row pointing at a nonexistent segment must
    // trigger a rebuild that restores the correct row.
    #[test]
    fn create_recovers_from_index_row_that_references_missing_segment() {
        let tmp = tempfile::tempdir().expect("tempdir");
        let root = tmp.path().join("store");
        let mut store = SessionStoreV2::create(&root, 4096).expect("create store");
        store
            .append_entry("entry-1", None, "message", json!({"n": 1}))
            .expect("append entry");

        // Corrupt the index so its only row references segment 999.
        let mut rows = store.read_index().expect("read index");
        assert_eq!(rows.len(), 1);
        rows[0].segment_seq = 999;
        write_jsonl_lines(&store.index_file_path(), &rows).expect("write corrupted index");
        drop(store);

        let reopened = SessionStoreV2::create(&root, 4096).expect("reopen store");
        assert_eq!(reopened.entry_count(), 1);

        let rebuilt_rows = reopened.read_index().expect("read rebuilt index");
        assert_eq!(rebuilt_rows.len(), 1);
        assert_eq!(rebuilt_rows[0].segment_seq, 1);
        assert!(reopened.lookup_entry(1).expect("lookup entry").is_some());
    }

    // A frame whose embedded segment_seq disagrees with its file must be
    // dropped (segment truncated) during the rebuild on reopen.
    #[test]
    fn create_drops_frame_with_mismatched_embedded_segment_seq_during_rebuild() {
        let tmp = tempfile::tempdir().expect("tempdir");
        let root = tmp.path().join("store");
        let mut store = SessionStoreV2::create(&root, 4096).expect("create store");
        store
            .append_entry("entry-1", None, "message", json!({"n": 1}))
            .expect("append first entry");
        store
            .append_entry(
                "entry-2",
                Some("entry-1".to_string()),
                "message",
                json!({"n": 2}),
            )
            .expect("append second entry");

        // Rewrite the segment with a bogus embedded segment_seq in frame 2,
        // and remove the index to force a rebuild on reopen.
        let segment_path = store.segment_file_path(1);
        let mut frames = store.read_segment(1).expect("read segment");
        assert_eq!(frames.len(), 2);
        frames[1].segment_seq = 77;
        write_jsonl_lines(&segment_path, &frames).expect("write corrupted segment");
        fs::remove_file(store.index_file_path()).expect("remove index");
        drop(store);

        let reopened = SessionStoreV2::create(&root, 4096).expect("reopen store");
        assert_eq!(reopened.entry_count(), 1);

        let rebuilt_rows = reopened.read_index().expect("read rebuilt index");
        assert_eq!(rebuilt_rows.len(), 1);
        assert_eq!(rebuilt_rows[0].entry_seq, 1);
        assert_eq!(reopened.read_segment(1).expect("read segment").len(), 1);
        assert!(reopened.lookup_entry(2).expect("lookup entry").is_none());
    }

    // Properties of the payload hash chain.
    proptest! {
        #[test]
        fn chain_hash_output_is_64_hex(
            a in "[0-9a-f]{64}",
            b in "[0-9a-f]{64}",
        ) {
            let result = chain_hash_step(&a, &b);
            assert_eq!(result.len(), 64);
            assert!(result.chars().all(|c| c.is_ascii_hexdigit()));
        }

        #[test]
        fn chain_hash_deterministic(
            a in "[0-9a-f]{64}",
            b in "[0-9a-f]{64}",
        ) {
            assert_eq!(chain_hash_step(&a, &b), chain_hash_step(&a, &b));
        }

        #[test]
        fn chain_hash_non_commutative(
            a in "[0-9a-f]{64}",
            b in "[0-9a-f]{64}",
        ) {
            if a != b {
                assert_ne!(chain_hash_step(&a, &b), chain_hash_step(&b, &a));
            }
        }

        #[test]
        fn chain_hash_genesis_differs_from_step(payload in "[0-9a-f]{64}") {
            let step1 = chain_hash_step(GENESIS_CHAIN_HASH, &payload);
            assert_ne!(step1, GENESIS_CHAIN_HASH);
        }
    }

    // Properties of the CRC32C hex formatting.
    proptest! {
        #[test]
        fn crc32c_output_is_8_uppercase_hex(data in prop::collection::vec(any::<u8>(), 0..500)) {
            let result = crc32c_upper(&data);
            assert_eq!(result.len(), 8);
            assert!(result.chars().all(|c| matches!(c, '0'..='9' | 'A'..='F')));
        }

        #[test]
        fn crc32c_deterministic(data in prop::collection::vec(any::<u8>(), 0..500)) {
            assert_eq!(crc32c_upper(&data), crc32c_upper(&data));
        }

        #[test]
        fn crc32c_single_bit_sensitivity(byte in any::<u8>()) {
            let a = crc32c_upper(&[byte]);
            let b = crc32c_upper(&[byte ^ 1]);
            if byte != byte ^ 1 {
                assert_ne!(a, b, "flipping LSB should change CRC");
            }
        }
    }

    // Properties of payload hashing and sizing.
    proptest! {
        #[test]
        fn payload_hash_is_64_hex(s in "[a-z]{0,50}") {
            let val = json!(s);
            let raw_string = serde_json::to_string(&val).unwrap();
            let raw = RawValue::from_string(raw_string).unwrap();
            let (hash, _size) = payload_hash_and_size(&raw).unwrap();
            assert_eq!(hash.len(), 64);
            assert!(hash.chars().all(|c| c.is_ascii_hexdigit()));
        }

        #[test]
        fn payload_size_matches_serialization(s in "[a-z]{0,50}") {
            let val = json!(s);
            let raw_string = serde_json::to_string(&val).unwrap();
            let raw = RawValue::from_string(raw_string).unwrap();
            let (_, size) = payload_hash_and_size(&raw).unwrap();
            let expected = serde_json::to_vec(&val).unwrap().len() as u64;
            assert_eq!(size, expected);
        }

        #[test]
        fn payload_hash_deterministic(n in 0i64..10000) {
            let val = json!(n);
            let raw_string = serde_json::to_string(&val).unwrap();
            let raw = RawValue::from_string(raw_string).unwrap();
            let (h1, s1) = payload_hash_and_size(&raw).unwrap();
            let (h2, s2) = payload_hash_and_size(&raw).unwrap();
            assert_eq!(h1, h2);
            assert_eq!(s1, s2);
        }
    }

    // Properties of the JSONL record length helper.
    proptest! {
        #[test]
        fn line_length_is_len_plus_one(data in prop::collection::vec(any::<u8>(), 0..1000)) {
            let result = line_length_u64(&data).unwrap();
            assert_eq!(result, data.len() as u64 + 1);
        }

        #[test]
        fn line_length_never_zero(data in prop::collection::vec(any::<u8>(), 0..100)) {
            let result = line_length_u64(&data).unwrap();
            assert!(result >= 1);
        }
    }

    // Properties of the v2 sidecar path derivation.
    proptest! {
        #[test]
        fn sidecar_path_ends_with_v2(stem in "[a-z]{1,10}") {
            let input = PathBuf::from(format!("/tmp/{stem}.jsonl"));
            let result = v2_sidecar_path(&input);
            let name = result.file_name().unwrap().to_str().unwrap();
            assert_eq!(
                Path::new(name).extension().and_then(|ext| ext.to_str()),
                Some("v2"),
                "expected .v2 suffix, got {name}"
            );
        }

        #[test]
        fn sidecar_path_preserves_parent(stem in "[a-z]{1,10}", dir in "[a-z]{1,8}") {
            let input = PathBuf::from(format!("/tmp/{dir}/{stem}.jsonl"));
            let result = v2_sidecar_path(&input);
            assert_eq!(
                result.parent().unwrap(),
                Path::new(&format!("/tmp/{dir}"))
            );
        }

        #[test]
        fn sidecar_path_deterministic(stem in "[a-z]{1,10}") {
            let input = PathBuf::from(format!("/sessions/{stem}.jsonl"));
            assert_eq!(v2_sidecar_path(&input), v2_sidecar_path(&input));
        }

        #[test]
        fn sidecar_path_contains_stem(stem in "[a-z]{1,10}") {
            let input = PathBuf::from(format!("/tmp/{stem}.jsonl"));
            let result = v2_sidecar_path(&input);
            let name = result.file_name().unwrap().to_str().unwrap();
            assert_eq!(name, format!("{stem}.v2"));
        }
    }
}