Skip to main content

pi/
session_store_v2.rs

//! Session Store V2: segmented append log + sidecar index primitives.
//!
//! This module provides the storage core requested by Phase-2 performance work:
//! - Segment append writer
//! - Sidecar offset index rows
//! - Reader helpers
//! - Integrity validation (checksum + payload hash)

9use crate::error::{Error, Result};
10use crate::session::SessionEntry;
11use serde::{Deserialize, Serialize};
12use serde_json::Value;
13use serde_json::value::RawValue;
14use sha2::{Digest, Sha256};
15use std::borrow::Cow;
16use std::collections::BTreeSet;
17use std::fs::{self, File, OpenOptions};
18use std::io::{BufRead, BufReader, Read, Seek, SeekFrom, Write};
19use std::path::{Path, PathBuf};
20
/// Build `OpenOptions` that create files with owner-only access.
///
/// On Unix the creation mode is restricted to `0o600` (subject to the
/// process umask); on other platforms the default options are returned.
fn secure_open_options() -> OpenOptions {
    // Platform hook: restrict permissions where the OS supports it.
    fn harden(_options: &mut OpenOptions) {
        #[cfg(unix)]
        {
            use std::os::unix::fs::OpenOptionsExt;
            _options.mode(0o600);
        }
    }

    let mut options = OpenOptions::new();
    harden(&mut options);
    options
}
31
/// Schema tag stamped on every segment frame line.
pub const SEGMENT_FRAME_SCHEMA: &str = "pi.session_store_v2.segment_frame.v1";
/// Schema tag for sidecar offset-index rows.
pub const OFFSET_INDEX_SCHEMA: &str = "pi.session_store_v2.offset_index.v1";
/// Schema tag for checkpoint snapshot documents.
pub const CHECKPOINT_SCHEMA: &str = "pi.session_store_v2.checkpoint.v1";
/// Schema tag for the store manifest document.
pub const MANIFEST_SCHEMA: &str = "pi.session_store_v2.manifest.v1";
/// Schema tag for migration ledger events.
pub const MIGRATION_EVENT_SCHEMA: &str = "pi.session_store_v2.migration_event.v1";

/// Maximum size for a single frame line (100MB) to prevent OOM on corrupted files.
const MAX_FRAME_READ_BYTES: u64 = 100 * 1024 * 1024;

/// Initial chain hash before any frames are appended
/// (64 hex zeros — the width of a hex-encoded SHA-256 digest).
const GENESIS_CHAIN_HASH: &str = "0000000000000000000000000000000000000000000000000000000000000000";
43
/// One JSONL line in a segment file: an envelope around a single entry payload.
///
/// Serialized in camelCase; `payload` is kept as raw JSON so reads and writes
/// do not re-serialize the payload bytes.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct SegmentFrame {
    /// Schema tag (`SEGMENT_FRAME_SCHEMA`).
    pub schema: Cow<'static, str>,
    /// Segment file this frame belongs to.
    pub segment_seq: u64,
    /// Position of the frame within its segment (reset to 1 on rotation).
    pub frame_seq: u64,
    /// Store-wide monotonic entry sequence.
    pub entry_seq: u64,
    /// Caller-supplied identifier of the entry.
    pub entry_id: String,
    /// Parent entry forming the branch chain; omitted from JSON when absent.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub parent_entry_id: Option<String>,
    /// Caller-supplied entry kind label.
    pub entry_type: String,
    /// RFC 3339 UTC timestamp (millisecond precision) taken at frame creation.
    pub timestamp: String,
    /// Hex SHA-256 of the payload, as produced by `payload_hash_and_size`.
    pub payload_sha256: String,
    /// Payload size reported by `payload_hash_and_size`
    /// (presumably the byte length of the raw JSON — confirm in that helper).
    pub payload_bytes: u64,
    /// Raw JSON payload, stored without re-serialization.
    pub payload: Box<RawValue>,
}
60
61impl SegmentFrame {
62    fn new(
63        segment_seq: u64,
64        frame_seq: u64,
65        entry_seq: u64,
66        entry_id: String,
67        parent_entry_id: Option<String>,
68        entry_type: String,
69        payload: Box<RawValue>,
70    ) -> Result<Self> {
71        let (payload_sha256, payload_bytes) = payload_hash_and_size(&payload)?;
72        Ok(Self {
73            schema: Cow::Borrowed(SEGMENT_FRAME_SCHEMA),
74            segment_seq,
75            frame_seq,
76            entry_seq,
77            entry_id,
78            parent_entry_id,
79            entry_type,
80            timestamp: chrono::Utc::now().to_rfc3339_opts(chrono::SecondsFormat::Millis, true),
81            payload_sha256,
82            payload_bytes,
83            payload,
84        })
85    }
86}
87
/// Sidecar index row locating one frame inside a segment file.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct OffsetIndexEntry {
    /// Schema tag (`OFFSET_INDEX_SCHEMA`).
    pub schema: Cow<'static, str>,
    /// Store-wide entry sequence of the frame.
    pub entry_seq: u64,
    /// Entry identifier duplicated from the frame for id-based lookups.
    pub entry_id: String,
    /// Segment file holding the frame.
    pub segment_seq: u64,
    /// Frame position within its segment.
    pub frame_seq: u64,
    /// Byte offset of the frame line within the segment file.
    pub byte_offset: u64,
    /// Length in bytes of the frame line, as computed by `line_length_u64`
    /// (presumably including the trailing newline — confirm in that helper).
    pub byte_length: u64,
    /// CRC32-C of the frame line including its newline (see `append_entry`),
    /// as formatted by `crc32c_upper`.
    pub crc32c: String,
    /// Row state; written as "active" on append.
    pub state: Cow<'static, str>,
}
101
/// Current head position of the store (last written entry).
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
#[serde(rename_all = "camelCase")]
pub struct StoreHead {
    /// Segment containing the head entry.
    pub segment_seq: u64,
    /// Sequence number of the head entry.
    pub entry_seq: u64,
    /// Identifier of the head entry.
    pub entry_id: String,
}
110
/// Periodic checkpoint snapshot metadata.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct Checkpoint {
    /// Schema tag (`CHECKPOINT_SCHEMA`).
    pub schema: String,
    /// Caller-assigned checkpoint sequence number.
    pub checkpoint_seq: u64,
    /// RFC 3339 UTC creation timestamp (millisecond precision).
    pub at: String,
    /// Entry sequence at the head when the checkpoint was taken.
    pub head_entry_seq: u64,
    /// Entry id at the head when the checkpoint was taken.
    pub head_entry_id: String,
    /// Store-relative reference to the checkpoint file
    /// (`checkpoints/<seq>.json`; see `create_checkpoint`).
    pub snapshot_ref: String,
    /// Compaction low-water mark; written as 0 by `create_checkpoint`.
    pub compacted_before_entry_seq: u64,
    /// Hash-chain value at checkpoint time.
    pub chain_hash: String,
    /// Caller-supplied reason for taking the checkpoint.
    pub reason: String,
}
125
/// Top-level manifest describing the store layout, head, counters, and
/// integrity summary; written as `manifest.json` in the store root.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct Manifest {
    /// Schema tag (`MANIFEST_SCHEMA`).
    pub schema: String,
    /// Store format version.
    pub store_version: u8,
    /// Session this store belongs to.
    pub session_id: String,
    /// Format of the source the store was created from.
    pub source_format: String,
    /// RFC 3339 creation timestamp; preserved across manifest rewrites
    /// (see `write_manifest`).
    pub created_at: String,
    /// RFC 3339 timestamp of the latest manifest write.
    pub updated_at: String,
    /// Head position at manifest time.
    pub head: StoreHead,
    /// Aggregate counters.
    pub counters: ManifestCounters,
    /// Locations of the store's component files.
    pub files: ManifestFiles,
    /// Integrity digests.
    pub integrity: ManifestIntegrity,
    /// Invariant-check results.
    pub invariants: ManifestInvariants,
}
141
/// Aggregate counters reported in the manifest.
///
/// NOTE(review): populated by `write_manifest`, which is not fully visible in
/// this chunk — confirm exact semantics there.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct ManifestCounters {
    /// Total entries in the store.
    pub entries_total: u64,
    /// Total message entries.
    pub messages_total: u64,
    /// Total branches.
    pub branches_total: u64,
    /// Total compactions performed.
    pub compactions_total: u64,
    /// Total bytes written.
    pub bytes_total: u64,
}
151
/// Locations of the store's component files as recorded in the manifest
/// (NOTE(review): presumably store-root-relative — confirm in `write_manifest`).
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct ManifestFiles {
    /// Directory holding segment files.
    pub segment_dir: String,
    /// Number of segment files.
    pub segment_count: u64,
    /// Offset index path.
    pub index_path: String,
    /// Checkpoint directory.
    pub checkpoint_dir: String,
    /// Migration ledger path.
    pub migration_ledger_path: String,
}
161
/// Integrity digests recorded in the manifest.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct ManifestIntegrity {
    /// Running payload hash chain at manifest time.
    pub chain_hash: String,
    /// Digest of the manifest itself (NOTE(review): computed by the writer,
    /// which is outside this chunk — confirm what it covers).
    pub manifest_hash: String,
    /// CRC32-C of the most recently appended frame line.
    pub last_crc32c: String,
}
169
/// Boolean invariant-check results reported in the manifest.
///
/// NOTE(review): populated by `write_manifest` (not fully visible here);
/// field comments below reflect the invariant names — confirm exact checks
/// against the writer.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
#[allow(clippy::struct_excessive_bools)] // invariants are naturally boolean checks
pub struct ManifestInvariants {
    /// Every parent link resolves to a known entry.
    pub parent_links_closed: bool,
    /// Entry sequence numbers increase monotonically.
    pub monotonic_entry_seq: bool,
    /// Segment sequence numbers increase monotonically.
    pub monotonic_segment_seq: bool,
    /// Index byte ranges fall within their segment files.
    pub index_within_segment_bounds: bool,
    /// Branch heads are present in the index.
    pub branch_heads_indexed: bool,
    /// Checkpoint sequences increase monotonically.
    pub checkpoints_monotonic: bool,
    /// Payload hash chain recomputes to the recorded value.
    pub hash_chain_valid: bool,
}
182
/// Per-check results of verifying a migration or rollback
/// (see `rollback_to_checkpoint` for how each flag is computed).
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
#[serde(rename_all = "camelCase")]
pub struct MigrationVerification {
    /// Entry count matched the expected value (e.g. the checkpoint head).
    pub entry_count_match: bool,
    /// Hash chain matched the expected value.
    pub hash_chain_match: bool,
    /// Store passed integrity validation.
    pub index_consistent: bool,
}
190
/// One line of the migration ledger (`migrations/ledger.jsonl`).
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
#[serde(rename_all = "camelCase")]
pub struct MigrationEvent {
    /// Schema tag (`MIGRATION_EVENT_SCHEMA`); defaulted if empty on append.
    pub schema: String,
    /// Identifier of the migration this event belongs to.
    pub migration_id: String,
    /// Phase label, e.g. "rollback".
    pub phase: String,
    /// RFC 3339 timestamp; defaulted to now if empty on append.
    pub at: String,
    /// Source of the operation (path or snapshot reference).
    pub source_path: String,
    /// Target of the operation (the store root for rollbacks).
    pub target_path: String,
    /// Source data format, e.g. "native_v2".
    pub source_format: String,
    /// Target data format.
    pub target_format: String,
    /// Verification check results.
    pub verification: MigrationVerification,
    /// Overall outcome: "ok", "recoverable_error", or "fatal_error"
    /// (values observed in `rollback_to_checkpoint`).
    pub outcome: String,
    /// Error classification when the outcome is not "ok"; omitted from JSON
    /// when absent.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub error_class: Option<String>,
    /// Correlation id for tracing related events.
    pub correlation_id: String,
}
208
/// Compact summary of the offset index contents
/// (NOTE(review): produced outside this chunk — confirm field semantics at
/// the producer).
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
#[serde(rename_all = "camelCase")]
pub struct IndexSummary {
    /// Number of rows in the index.
    pub entry_count: u64,
    /// Lowest entry sequence present.
    pub first_entry_seq: u64,
    /// Highest entry sequence present.
    pub last_entry_seq: u64,
    /// Entry id of the last row.
    pub last_entry_id: String,
}
217
/// Append-only segmented session store with a sidecar offset index.
///
/// Holds in-memory write cursors and integrity state; durable state lives
/// under `root` (`segments/`, `index/`, `checkpoints/`, `migrations/`, `tmp/`).
#[derive(Debug, Clone)]
pub struct SessionStoreV2 {
    /// Store root directory.
    root: PathBuf,
    /// Size budget per segment; exceeding it rotates to a new segment file.
    max_segment_bytes: u64,
    /// Segment the next frame will be written to.
    next_segment_seq: u64,
    /// Frame sequence within the current segment (reset to 1 on rotation).
    next_frame_seq: u64,
    /// Store-wide sequence assigned to the next appended entry.
    next_entry_seq: u64,
    /// Bytes already written to the current segment.
    current_segment_bytes: u64,
    /// Running SHA-256 hash chain: `H(prev_chain || payload_sha256)`.
    chain_hash: String,
    /// Total bytes written across all segments.
    total_bytes: u64,
    /// Last entry ID written (for head tracking).
    last_entry_id: Option<String>,
    /// Last CRC32-C written (for integrity checkpoints).
    last_crc32c: String,
}
235
236impl SessionStoreV2 {
237    /// Open a store handle for read-only inspection without bootstrap recovery.
238    pub fn open_for_inspection(root: impl AsRef<Path>, max_segment_bytes: u64) -> Result<Self> {
239        if max_segment_bytes == 0 {
240            return Err(Error::validation("max_segment_bytes must be > 0"));
241        }
242
243        Ok(Self {
244            root: root.as_ref().to_path_buf(),
245            max_segment_bytes,
246            next_segment_seq: 1,
247            next_frame_seq: 1,
248            next_entry_seq: 1,
249            current_segment_bytes: 0,
250            chain_hash: GENESIS_CHAIN_HASH.to_string(),
251            total_bytes: 0,
252            last_entry_id: None,
253            last_crc32c: "00000000".to_string(),
254        })
255    }
256
    /// Create or reopen a store rooted at `root` with segment rotation at
    /// `max_segment_bytes`, running bootstrap plus best-effort recovery.
    ///
    /// Recovery policy: bootstrap or integrity errors classified as
    /// recoverable by `is_recoverable_index_error` trigger an index rebuild
    /// followed by a fresh bootstrap; anything else propagates unchanged.
    /// Segment data paired with an empty index also triggers a rebuild so a
    /// lost index does not present as an empty session.
    ///
    /// # Errors
    /// Returns a validation error when `max_segment_bytes` is zero, plus any
    /// unrecoverable bootstrap, rebuild, or integrity-validation error.
    pub fn create(root: impl AsRef<Path>, max_segment_bytes: u64) -> Result<Self> {
        if max_segment_bytes == 0 {
            return Err(Error::validation("max_segment_bytes must be > 0"));
        }

        // Ensure the full on-disk layout exists before any bootstrap reads.
        let root = root.as_ref().to_path_buf();
        fs::create_dir_all(root.join("segments"))?;
        fs::create_dir_all(root.join("index"))?;
        fs::create_dir_all(root.join("checkpoints"))?;
        fs::create_dir_all(root.join("migrations"))?;
        fs::create_dir_all(root.join("tmp"))?;

        // Fresh in-memory state; bootstrap_from_disk refreshes it below.
        let mut store = Self {
            root,
            max_segment_bytes,
            next_segment_seq: 1,
            next_frame_seq: 1,
            next_entry_seq: 1,
            current_segment_bytes: 0,
            chain_hash: GENESIS_CHAIN_HASH.to_string(),
            total_bytes: 0,
            last_entry_id: None,
            last_crc32c: "00000000".to_string(),
        };
        if let Err(err) = store.bootstrap_from_disk() {
            if is_recoverable_index_error(&err) {
                tracing::warn!(
                    root = %store.root.display(),
                    error = %err,
                    "SessionStoreV2 bootstrap failed with recoverable index error; attempting index rebuild"
                );
                store.rebuild_index()?;
                store.bootstrap_from_disk()?;
            } else {
                return Err(err);
            }
        }

        // Recovery path: segments exist but index file is missing or empty.
        // Rebuild from segment frames so resume does not appear as an empty session.
        if store.entry_count() == 0 && store.segments_exist_with_data()? {
            tracing::warn!(
                root = %store.root.display(),
                "SessionStoreV2 detected segment data with empty index; rebuilding index"
            );
            store.rebuild_index()?;
            store.bootstrap_from_disk()?;
        }

        // Final gate: a failed integrity check gets one rebuild attempt,
        // after which validation must pass or the error propagates.
        if let Err(err) = store.validate_integrity() {
            if is_recoverable_index_error(&err) {
                tracing::warn!(
                    root = %store.root.display(),
                    error = %err,
                    "SessionStoreV2 integrity validation failed with recoverable error; rebuilding index"
                );
                store.rebuild_index()?;
                store.bootstrap_from_disk()?;
                store.validate_integrity()?;
            } else {
                return Err(err);
            }
        }
        Ok(store)
    }
322
323    pub fn segment_file_path(&self, segment_seq: u64) -> PathBuf {
324        self.root
325            .join("segments")
326            .join(format!("{segment_seq:016}.seg"))
327    }
328
329    pub fn index_file_path(&self) -> PathBuf {
330        self.root.join("index").join("offsets.jsonl")
331    }
332
333    fn manifest_path(&self) -> PathBuf {
334        self.root.join("manifest.json")
335    }
336
337    fn migration_ledger_path(&self) -> PathBuf {
338        self.root.join("migrations").join("ledger.jsonl")
339    }
340
341    fn list_segment_files(&self) -> Result<Vec<(u64, PathBuf)>> {
342        let segments_dir = self.root.join("segments");
343        if !segments_dir.exists() {
344            return Ok(Vec::new());
345        }
346
347        let mut segment_files = Vec::new();
348        for entry in fs::read_dir(segments_dir)? {
349            let entry = entry?;
350            let path = entry.path();
351            if path.extension().and_then(|ext| ext.to_str()) != Some("seg") {
352                continue;
353            }
354            let Some(stem) = path.file_stem().and_then(|s| s.to_str()) else {
355                continue;
356            };
357            let Ok(segment_seq) = stem.parse::<u64>() else {
358                continue;
359            };
360            segment_files.push((segment_seq, path));
361        }
362        segment_files.sort_by_key(|(segment_seq, _)| *segment_seq);
363        Ok(segment_files)
364    }
365
366    fn segments_exist_with_data(&self) -> Result<bool> {
367        for (_, path) in self.list_segment_files()? {
368            if fs::metadata(path)?.len() > 0 {
369                return Ok(true);
370            }
371        }
372        Ok(false)
373    }
374
    /// Append one entry to the active segment and record it in the offset
    /// index, rotating to a new segment when the size budget is exceeded.
    ///
    /// Write ordering is crash-conscious: the frame line is appended to the
    /// segment first and the index row second; if the index append fails the
    /// segment is truncated back so no unindexed frame survives. In-memory
    /// state (chain hash, totals, sequences, head) only advances after both
    /// writes succeed.
    ///
    /// # Errors
    /// Fails on payload serialization errors, I/O errors, or sequence
    /// counter overflow.
    #[allow(clippy::needless_pass_by_value)]
    pub fn append_entry(
        &mut self,
        entry_id: impl Into<String>,
        parent_entry_id: Option<String>,
        entry_type: impl Into<String>,
        payload: Value,
    ) -> Result<OffsetIndexEntry> {
        let entry_id = entry_id.into();
        let entry_type = entry_type.into();

        // Convert the generic Value into a RawValue (string slice) to avoid
        // re-serializing the payload when writing the full frame.
        // We do this by first serializing the Value to a string, then
        // creating a Box<RawValue> from it.
        let raw_string = serde_json::to_string(&payload)?;
        let raw_payload = RawValue::from_string(raw_string)
            .map_err(|e| Error::session(format!("failed to convert payload to RawValue: {e}")))?;

        let mut frame = SegmentFrame::new(
            self.next_segment_seq,
            self.next_frame_seq,
            self.next_entry_seq,
            entry_id,
            parent_entry_id,
            entry_type,
            raw_payload,
        )?;
        let mut encoded = serde_json::to_vec(&frame)?;
        let mut line_len = line_length_u64(&encoded)?;

        // Segment rotation: if this line would push a non-empty segment past
        // the budget, advance to a new segment and re-stamp + re-encode the
        // frame with the new segment/frame sequence numbers.
        if self.current_segment_bytes > 0
            && self.current_segment_bytes.saturating_add(line_len) > self.max_segment_bytes
        {
            self.next_segment_seq = self
                .next_segment_seq
                .checked_add(1)
                .ok_or_else(|| Error::session("segment sequence overflow"))?;
            self.next_frame_seq = 1;
            self.current_segment_bytes = 0;

            frame = SegmentFrame::new(
                self.next_segment_seq,
                self.next_frame_seq,
                self.next_entry_seq,
                frame.entry_id.clone(),
                frame.parent_entry_id.clone(),
                frame.entry_type.clone(),
                frame.payload.clone(),
            )?;
            encoded = serde_json::to_vec(&frame)?;
            line_len = line_length_u64(&encoded)?;
        }

        let segment_path = self.segment_file_path(self.next_segment_seq);

        // Prepare the write buffer by appending the newline to the encoded JSON
        let mut write_buf = encoded;
        write_buf.push(b'\n');

        // A brand-new segment (frame 1) is truncated on open so stale bytes
        // from an earlier crashed writer cannot precede the first frame.
        let is_new_segment = self.next_frame_seq == 1;
        let mut segment = secure_open_options()
            .create(true)
            .write(true)
            .truncate(is_new_segment)
            .open(&segment_path)?;

        let byte_offset = segment.seek(SeekFrom::End(0))?;
        if let Err(e) = segment.write_all(&write_buf) {
            // Best-effort rollback of a partially written frame.
            let _ = segment.set_len(byte_offset);
            return Err(Error::from(e));
        }

        // Use write_buf (which includes the newline) for CRC calculation
        let crc = crc32c_upper(&write_buf);
        let index_entry = OffsetIndexEntry {
            schema: Cow::Borrowed(OFFSET_INDEX_SCHEMA),
            entry_seq: frame.entry_seq,
            entry_id: frame.entry_id.clone(),
            segment_seq: frame.segment_seq,
            frame_seq: frame.frame_seq,
            byte_offset,
            byte_length: line_len,
            crc32c: crc.clone(),
            state: Cow::Borrowed("active"),
        };

        if let Err(e) = append_jsonl_line(&self.index_file_path(), &index_entry) {
            // Rollback: truncate segment to remove the unindexed frame.
            let _ = segment.set_len(byte_offset);
            return Err(e);
        }

        // Both writes landed; advance chain hash, totals, and head metadata.
        self.chain_hash = chain_hash_step(&self.chain_hash, &frame.payload_sha256);
        self.total_bytes = self.total_bytes.saturating_add(line_len);
        self.last_entry_id = Some(frame.entry_id);
        self.last_crc32c = crc;

        self.next_entry_seq = self
            .next_entry_seq
            .checked_add(1)
            .ok_or_else(|| Error::session("entry sequence overflow"))?;
        self.next_frame_seq = self
            .next_frame_seq
            .checked_add(1)
            .ok_or_else(|| Error::session("frame sequence overflow"))?;
        self.current_segment_bytes = self.current_segment_bytes.saturating_add(line_len);

        Ok(index_entry)
    }
485
486    pub fn read_segment(&self, segment_seq: u64) -> Result<Vec<SegmentFrame>> {
487        let path = self.segment_file_path(segment_seq);
488        if !path.exists() {
489            return Ok(Vec::new());
490        }
491        read_jsonl::<SegmentFrame>(&path)
492    }
493
494    pub fn read_index(&self) -> Result<Vec<OffsetIndexEntry>> {
495        let path = self.index_file_path();
496        if !path.exists() {
497            return Ok(Vec::new());
498        }
499        read_jsonl::<OffsetIndexEntry>(&path)
500    }
501
502    /// Seek to a specific entry by `entry_seq` using the offset index.
503    /// Returns `None` if the entry is not found.
504    pub fn lookup_entry(&self, target_entry_seq: u64) -> Result<Option<SegmentFrame>> {
505        let index_rows = self.read_index()?;
506        let row = index_rows.iter().find(|r| r.entry_seq == target_entry_seq);
507        let Some(row) = row else {
508            return Ok(None);
509        };
510        SegmentFileReader::new(self).read_frame(row)
511    }
512
513    /// Read all entries with `entry_seq >= from_entry_seq` (tail reading).
514    pub fn read_entries_from(&self, from_entry_seq: u64) -> Result<Vec<SegmentFrame>> {
515        let index_rows = self.read_index()?;
516        let mut frames = Vec::new();
517        let mut reader = SegmentFileReader::new(self);
518        for row in &index_rows {
519            if row.entry_seq < from_entry_seq {
520                continue;
521            }
522            if let Some(frame) = reader.read_frame(row)? {
523                frames.push(frame);
524            }
525        }
526        Ok(frames)
527    }
528
529    /// Read all entries across all segments in entry_seq order.
530    pub fn read_all_entries(&self) -> Result<Vec<SegmentFrame>> {
531        self.read_entries_from(1)
532    }
533
534    /// Read the last `count` entries by entry_seq using the offset index.
535    pub fn read_tail_entries(&self, count: u64) -> Result<Vec<SegmentFrame>> {
536        let index_rows = self.read_index()?;
537        let total = index_rows.len();
538        let skip = total.saturating_sub(usize::try_from(count).unwrap_or(usize::MAX));
539        let mut frames = Vec::with_capacity(total.saturating_sub(skip));
540        let mut reader = SegmentFileReader::new(self);
541        for row in &index_rows[skip..] {
542            if let Some(frame) = reader.read_frame(row)? {
543                frames.push(frame);
544            }
545        }
546        Ok(frames)
547    }
548
    /// Read entries on the active branch from `leaf_entry_id` back to root.
    /// Returns frames in root→leaf order.
    ///
    /// Corruption is surfaced rather than skipped: duplicate entry ids,
    /// cyclic parent chains, dangling parents (after the first hop), id
    /// mismatches between index row and frame, and index rows pointing at
    /// missing frames all produce errors. An unknown `leaf_entry_id` yields
    /// an empty path.
    pub fn read_active_path(&self, leaf_entry_id: &str) -> Result<Vec<SegmentFrame>> {
        // Build entry_id -> index row, rejecting duplicate ids outright.
        let index_rows = self.read_index()?;
        let mut id_to_row: std::collections::HashMap<&str, &OffsetIndexEntry> =
            std::collections::HashMap::with_capacity(index_rows.len());
        for row in &index_rows {
            if id_to_row.insert(row.entry_id.as_str(), row).is_some() {
                return Err(Error::session(format!(
                    "duplicate entry_id detected while reading active path: {}",
                    row.entry_id
                )));
            }
        }

        // Walk parent links leaf -> root, guarding against cycles.
        let mut frames = Vec::new();
        let mut current_id: Option<String> = Some(leaf_entry_id.to_string());
        let mut reader = SegmentFileReader::new(self);
        let mut visited = std::collections::HashSet::new();
        while let Some(ref entry_id) = current_id {
            if !visited.insert(entry_id.clone()) {
                return Err(Error::session(format!(
                    "cyclic parent chain detected while reading active path at entry_id={entry_id}"
                )));
            }
            let Some(&row) = id_to_row.get(entry_id.as_str()) else {
                // Unknown leaf => empty result; unknown *parent* mid-walk =>
                // broken chain, which is an error.
                if frames.is_empty() {
                    break;
                }
                return Err(Error::session(format!(
                    "missing parent entry detected while reading active path at entry_id={entry_id}"
                )));
            };
            match reader.read_frame(row)? {
                Some(frame) => {
                    if frame.entry_id != row.entry_id {
                        return Err(Error::session(format!(
                            "active path index/frame mismatch for entry_id={} frame={}",
                            row.entry_id, frame.entry_id
                        )));
                    }
                    current_id.clone_from(&frame.parent_entry_id);
                    frames.push(frame);
                }
                None => {
                    return Err(Error::session(format!(
                        "index references missing frame while reading active path at entry_id={entry_id}"
                    )));
                }
            }
        }
        // Frames were collected leaf->root; callers get root->leaf.
        frames.reverse();
        Ok(frames)
    }
603
604    /// Total number of entries appended so far.
605    pub const fn entry_count(&self) -> u64 {
606        self.next_entry_seq.saturating_sub(1)
607    }
608
609    /// Current head position, or `None` if the store is empty.
610    pub fn head(&self) -> Option<StoreHead> {
611        self.last_entry_id.as_ref().map(|entry_id| StoreHead {
612            segment_seq: self.next_segment_seq,
613            entry_seq: self.next_entry_seq.saturating_sub(1),
614            entry_id: entry_id.clone(),
615        })
616    }
617
618    fn checkpoint_path(&self, checkpoint_seq: u64) -> PathBuf {
619        self.root
620            .join("checkpoints")
621            .join(format!("{checkpoint_seq:016}.json"))
622    }
623
    /// Create a checkpoint snapshot at the current head.
    ///
    /// Written atomically: the document is serialized into `tmp/`, fsynced,
    /// renamed into `checkpoints/`, and the parent directory is synced so
    /// the rename itself is durable. An empty store checkpoints a zeroed
    /// head.
    ///
    /// # Errors
    /// Fails on serialization or file I/O errors; a failed temp write is
    /// cleaned up best-effort before the error propagates.
    pub fn create_checkpoint(&self, checkpoint_seq: u64, reason: &str) -> Result<Checkpoint> {
        let head = self.head().unwrap_or(StoreHead {
            segment_seq: 0,
            entry_seq: 0,
            entry_id: String::new(),
        });
        let snapshot_ref = format!("checkpoints/{checkpoint_seq:016}.json");
        let checkpoint = Checkpoint {
            schema: CHECKPOINT_SCHEMA.to_string(),
            checkpoint_seq,
            at: chrono::Utc::now().to_rfc3339_opts(chrono::SecondsFormat::Millis, true),
            head_entry_seq: head.entry_seq,
            head_entry_id: head.entry_id,
            snapshot_ref,
            compacted_before_entry_seq: 0,
            chain_hash: self.chain_hash.clone(),
            reason: reason.to_string(),
        };
        let tmp_path = self
            .root
            .join("tmp")
            .join(format!("{checkpoint_seq:016}.json.tmp"));

        // Write + fsync the temp file; any failure removes the partial file.
        let write_result: Result<()> = (|| {
            let mut file = secure_open_options()
                .create(true)
                .write(true)
                .truncate(true)
                .open(&tmp_path)?;
            file.write_all(&serde_json::to_vec_pretty(&checkpoint)?)?;
            file.sync_all()?;
            Ok(())
        })();

        if let Err(err) = write_result {
            let _ = fs::remove_file(&tmp_path);
            return Err(err);
        }

        // Atomic publish: rename into place, then sync the directory entry.
        let target_path = self.checkpoint_path(checkpoint_seq);
        fs::rename(&tmp_path, &target_path)?;
        sync_parent_dir(&target_path)?;
        Ok(checkpoint)
    }
669
670    /// Read a checkpoint by sequence number.
671    pub fn read_checkpoint(&self, checkpoint_seq: u64) -> Result<Option<Checkpoint>> {
672        let path = self.checkpoint_path(checkpoint_seq);
673        if !path.exists() {
674            return Ok(None);
675        }
676        let data = fs::read_to_string(&path)?;
677        let cp: Checkpoint = serde_json::from_str(&data)?;
678        Ok(Some(cp))
679    }
680
681    pub fn append_migration_event(&self, mut event: MigrationEvent) -> Result<()> {
682        if event.schema.is_empty() {
683            event.schema = MIGRATION_EVENT_SCHEMA.to_string();
684        }
685        if event.at.is_empty() {
686            event.at = chrono::Utc::now().to_rfc3339_opts(chrono::SecondsFormat::Millis, true);
687        }
688        append_jsonl_line(&self.migration_ledger_path(), &event)
689    }
690
691    pub fn read_migration_events(&self) -> Result<Vec<MigrationEvent>> {
692        let path = self.migration_ledger_path();
693        if !path.exists() {
694            return Ok(Vec::new());
695        }
696        read_jsonl::<MigrationEvent>(&path)
697    }
698
699    #[allow(clippy::too_many_lines)]
700    pub fn rollback_to_checkpoint(
701        &mut self,
702        checkpoint_seq: u64,
703        migration_id: impl Into<String>,
704        correlation_id: impl Into<String>,
705    ) -> Result<MigrationEvent> {
706        let migration_id = migration_id.into();
707        let correlation_id = correlation_id.into();
708
709        let rollback_result: Result<MigrationEvent> = (|| {
710            let checkpoint = self
711                .read_checkpoint(checkpoint_seq)?
712                .ok_or_else(|| Error::session(format!("checkpoint {checkpoint_seq} not found")))?;
713
714            let mut index_rows = self.read_index()?;
715            index_rows.retain(|row| row.entry_seq <= checkpoint.head_entry_seq);
716
717            let mut keep_len_by_segment: std::collections::HashMap<u64, u64> =
718                std::collections::HashMap::new();
719            for row in &index_rows {
720                let end = row
721                    .byte_offset
722                    .checked_add(row.byte_length)
723                    .ok_or_else(|| Error::session("index byte range overflow during rollback"))?;
724                keep_len_by_segment
725                    .entry(row.segment_seq)
726                    .and_modify(|current| *current = (*current).max(end))
727                    .or_insert(end);
728            }
729
730            let segments_dir = self.root.join("segments");
731            if segments_dir.exists() {
732                let mut segment_files: Vec<(u64, PathBuf)> = Vec::new();
733                for entry in fs::read_dir(&segments_dir)? {
734                    let entry = entry?;
735                    let path = entry.path();
736                    if path.extension().and_then(|ext| ext.to_str()) != Some("seg") {
737                        continue;
738                    }
739                    let Some(stem) = path.file_stem().and_then(|s| s.to_str()) else {
740                        continue;
741                    };
742                    let Ok(segment_seq) = stem.parse::<u64>() else {
743                        continue;
744                    };
745                    segment_files.push((segment_seq, path));
746                }
747                segment_files.sort_by_key(|(segment_seq, _)| *segment_seq);
748
749                for (segment_seq, path) in segment_files {
750                    match keep_len_by_segment.get(&segment_seq).copied() {
751                        Some(keep_len) if keep_len > 0 => {
752                            let current_len = fs::metadata(&path)?.len();
753                            if keep_len < current_len {
754                                truncate_file_to(&path, keep_len)?;
755                            }
756                        }
757                        _ => {
758                            fs::remove_file(&path)?;
759                        }
760                    }
761                }
762            }
763
764            let index_path = self.index_file_path();
765            let index_tmp = self.root.join("tmp").join("offsets.rollback.tmp");
766            if let Some(parent) = index_tmp.parent() {
767                let _ = fs::create_dir_all(parent);
768            }
769            write_jsonl_lines(&index_tmp, &index_rows)?;
770            fs::rename(&index_tmp, &index_path)?;
771            let _ = sync_parent_dir(&index_path);
772
773            self.next_segment_seq = 1;
774            self.next_frame_seq = 1;
775            self.next_entry_seq = 1;
776            self.current_segment_bytes = 0;
777            self.bootstrap_from_disk()?;
778
779            let verification = MigrationVerification {
780                entry_count_match: self.entry_count() == checkpoint.head_entry_seq,
781                hash_chain_match: self.chain_hash == checkpoint.chain_hash,
782                index_consistent: self.validate_integrity().is_ok(),
783            };
784
785            let (outcome, error_class) = if verification.entry_count_match
786                && verification.hash_chain_match
787                && verification.index_consistent
788            {
789                ("ok".to_string(), None)
790            } else if verification.index_consistent {
791                (
792                    "recoverable_error".to_string(),
793                    Some("integrity_mismatch".to_string()),
794                )
795            } else {
796                (
797                    "fatal_error".to_string(),
798                    Some("index_corruption".to_string()),
799                )
800            };
801
802            let event = MigrationEvent {
803                schema: MIGRATION_EVENT_SCHEMA.to_string(),
804                migration_id: migration_id.clone(),
805                phase: "rollback".to_string(),
806                at: chrono::Utc::now().to_rfc3339_opts(chrono::SecondsFormat::Millis, true),
807                source_path: checkpoint.snapshot_ref,
808                target_path: self.root.display().to_string(),
809                source_format: "native_v2".to_string(),
810                target_format: "native_v2".to_string(),
811                verification,
812                outcome: outcome.clone(),
813                error_class,
814                correlation_id: correlation_id.clone(),
815            };
816            self.append_migration_event(event.clone())?;
817
818            if outcome == "ok" {
819                Ok(event)
820            } else {
821                Err(Error::session(format!(
822                    "rollback verification failed for checkpoint {checkpoint_seq}"
823                )))
824            }
825        })();
826
827        match rollback_result {
828            Ok(event) => Ok(event),
829            Err(error) => {
830                if !rollback_failure_event_already_recorded(&error) {
831                    let failure_event = MigrationEvent {
832                        schema: MIGRATION_EVENT_SCHEMA.to_string(),
833                        migration_id,
834                        phase: "rollback".to_string(),
835                        at: chrono::Utc::now().to_rfc3339_opts(chrono::SecondsFormat::Millis, true),
836                        source_path: self.checkpoint_path(checkpoint_seq).display().to_string(),
837                        target_path: self.root.display().to_string(),
838                        source_format: "native_v2".to_string(),
839                        target_format: "native_v2".to_string(),
840                        verification: MigrationVerification {
841                            entry_count_match: false,
842                            hash_chain_match: false,
843                            index_consistent: false,
844                        },
845                        outcome: "fatal_error".to_string(),
846                        error_class: Some(classify_rollback_error(&error).to_string()),
847                        correlation_id,
848                    };
849                    let _ = self.append_migration_event(failure_event);
850                }
851                Err(error)
852            }
853        }
854    }
855
856    #[allow(clippy::too_many_lines)]
857    pub fn write_manifest(
858        &self,
859        session_id: impl Into<String>,
860        source_format: impl Into<String>,
861    ) -> Result<Manifest> {
862        let now = chrono::Utc::now().to_rfc3339_opts(chrono::SecondsFormat::Millis, true);
863        let created_at = self
864            .read_manifest()?
865            .map_or_else(|| now.clone(), |m| m.created_at);
866        let session_id = session_id.into();
867        let source_format = source_format.into();
868        let index_rows = self.read_index()?;
869
870        let mut parent_counts: std::collections::HashMap<String, u64> =
871            std::collections::HashMap::new();
872        let mut message_count = 0u64;
873        let mut compaction_count = 0u64;
874        let mut entry_ids = std::collections::HashSet::with_capacity(index_rows.len());
875
876        let mut recomputed_chain = GENESIS_CHAIN_HASH.to_string();
877        let mut parent_links_closed = true;
878        let mut reader = SegmentFileReader::new(self);
879
880        for row in &index_rows {
881            if let Some(frame) = reader.read_frame(row)? {
882                entry_ids.insert(frame.entry_id.clone());
883
884                if frame.entry_type == "message" {
885                    message_count = message_count.saturating_add(1);
886                }
887                if frame.entry_type == "compaction" {
888                    compaction_count = compaction_count.saturating_add(1);
889                }
890
891                if let Some(parent_id) = frame.parent_entry_id.as_deref() {
892                    *parent_counts.entry(parent_id.to_string()).or_insert(0) += 1;
893
894                    // In a valid append-only log, the parent must have appeared
895                    // (and thus been added to entry_ids) before the child.
896                    if !entry_ids.contains(parent_id) {
897                        parent_links_closed = false;
898                    }
899                }
900
901                recomputed_chain = chain_hash_step(&recomputed_chain, &frame.payload_sha256);
902            }
903        }
904
905        let branches_total = u64::try_from(parent_counts.values().filter(|&&n| n > 1).count())
906            .map_err(|_| Error::session("branch count exceeds u64"))?;
907
908        let mut monotonic_entry_seq = true;
909        let mut monotonic_segment_seq = true;
910        let mut last_entry_seq = 0u64;
911        let mut last_segment_seq = 0u64;
912        for row in &index_rows {
913            if row.entry_seq <= last_entry_seq {
914                monotonic_entry_seq = false;
915            }
916            if row.segment_seq < last_segment_seq {
917                monotonic_segment_seq = false;
918            }
919            last_entry_seq = row.entry_seq;
920            last_segment_seq = row.segment_seq;
921        }
922
923        let hash_chain_valid = recomputed_chain == self.chain_hash;
924
925        let head = self.head().unwrap_or(StoreHead {
926            segment_seq: 0,
927            entry_seq: 0,
928            entry_id: String::new(),
929        });
930        let segment_count = u64::try_from(
931            index_rows
932                .iter()
933                .map(|row| row.segment_seq)
934                .collect::<BTreeSet<_>>()
935                .len(),
936        )
937        .map_err(|_| Error::session("segment count exceeds u64"))?;
938
939        let mut manifest = Manifest {
940            schema: MANIFEST_SCHEMA.to_string(),
941            store_version: 2,
942            session_id,
943            source_format,
944            created_at,
945            updated_at: now,
946            head,
947            counters: ManifestCounters {
948                entries_total: u64::try_from(index_rows.len())
949                    .map_err(|_| Error::session("entry count exceeds u64"))?,
950                messages_total: message_count,
951                branches_total,
952                compactions_total: compaction_count,
953                bytes_total: self.total_bytes,
954            },
955            files: ManifestFiles {
956                segment_dir: "segments/".to_string(),
957                segment_count,
958                index_path: "index/offsets.jsonl".to_string(),
959                checkpoint_dir: "checkpoints/".to_string(),
960                migration_ledger_path: "migrations/ledger.jsonl".to_string(),
961            },
962            integrity: ManifestIntegrity {
963                chain_hash: self.chain_hash.clone(),
964                manifest_hash: String::new(),
965                last_crc32c: self.last_crc32c.clone(),
966            },
967            invariants: ManifestInvariants {
968                parent_links_closed,
969                monotonic_entry_seq,
970                monotonic_segment_seq,
971                index_within_segment_bounds: self.validate_integrity().is_ok(),
972                branch_heads_indexed: true,
973                checkpoints_monotonic: true,
974                hash_chain_valid,
975            },
976        };
977        manifest.integrity.manifest_hash = manifest_hash_hex(&manifest)?;
978
979        let tmp = self.root.join("tmp").join("manifest.json.tmp");
980
981        let write_result: Result<()> = (|| {
982            let mut file = secure_open_options()
983                .create(true)
984                .write(true)
985                .truncate(true)
986                .open(&tmp)?;
987            file.write_all(&serde_json::to_vec_pretty(&manifest)?)?;
988            file.sync_all()?;
989            Ok(())
990        })();
991
992        if let Err(err) = write_result {
993            let _ = fs::remove_file(&tmp);
994            return Err(err);
995        }
996
997        let target_path = self.manifest_path();
998        fs::rename(&tmp, &target_path)?;
999        sync_parent_dir(&target_path)?;
1000        Ok(manifest)
1001    }
1002
1003    pub fn read_manifest(&self) -> Result<Option<Manifest>> {
1004        let path = self.manifest_path();
1005        if !path.exists() {
1006            return Ok(None);
1007        }
1008        let content = fs::read_to_string(&path)?;
1009        let manifest: Manifest = serde_json::from_str(&content).map_err(|err| {
1010            Error::session(format!(
1011                "Failed to parse manifest {}: {err}",
1012                path.display()
1013            ))
1014        })?;
1015        Ok(Some(manifest))
1016    }
1017
1018    pub fn chain_hash(&self) -> &str {
1019        &self.chain_hash
1020    }
1021
    /// Total bytes of all indexed frame lines across segments.
    pub const fn total_bytes(&self) -> u64 {
        self.total_bytes
    }
1025
1026    pub fn index_summary(&self) -> Result<Option<IndexSummary>> {
1027        let rows = self.read_index()?;
1028        let (Some(first), Some(last)) = (rows.first(), rows.last()) else {
1029            return Ok(None);
1030        };
1031        Ok(Some(IndexSummary {
1032            entry_count: u64::try_from(rows.len())
1033                .map_err(|_| Error::session("entry count exceeds u64"))?,
1034            first_entry_seq: first.entry_seq,
1035            last_entry_seq: last.entry_seq,
1036            last_entry_id: last.entry_id.clone(),
1037        }))
1038    }
1039
1040    /// Rebuild the offset index by scanning all segment files.
1041    /// This is the recovery path when the index is missing or corrupted.
1042    #[allow(clippy::too_many_lines)]
1043    pub fn rebuild_index(&mut self) -> Result<u64> {
1044        let mut rebuilt_count = 0u64;
1045        let index_path = self.index_file_path();
1046        let index_tmp_path = self.root.join("tmp").join("offsets.rebuild.tmp");
1047
1048        // Ensure tmp dir exists
1049        if let Some(parent) = index_tmp_path.parent() {
1050            fs::create_dir_all(parent)?;
1051        }
1052
1053        // Start fresh with the temp file
1054        if index_tmp_path.exists() {
1055            fs::remove_file(&index_tmp_path)?;
1056        }
1057
1058        let mut index_writer = std::io::BufWriter::new(
1059            secure_open_options()
1060                .create(true)
1061                .write(true)
1062                .truncate(true)
1063                .open(&index_tmp_path)?,
1064        );
1065
1066        self.chain_hash = GENESIS_CHAIN_HASH.to_string();
1067        self.total_bytes = 0;
1068        self.last_entry_id = None;
1069        self.last_crc32c = "00000000".to_string();
1070
1071        let segment_files = self.list_segment_files()?;
1072        let mut last_observed_seq = 0u64;
1073
1074        'segments: for (i, (segment_seq, seg_path)) in segment_files.iter().enumerate() {
1075            let file = File::open(seg_path)?;
1076            let mut reader = BufReader::new(file);
1077            let mut byte_offset = 0u64;
1078            let mut line_number = 0u64;
1079            let mut expected_frame_seq = 1u64;
1080            let mut line = String::new();
1081
1082            loop {
1083                line.clear();
1084                // Use bounded read to prevent OOM on corrupted files (e.g. missing newlines)
1085                let bytes_read =
1086                    match read_line_with_limit(&mut reader, &mut line, MAX_FRAME_READ_BYTES) {
1087                        Ok(n) => n,
1088                        Err(e) if e.kind() == std::io::ErrorKind::InvalidData => {
1089                            return Err(Error::session(format!(
1090                                "failed to read segment frame while rebuilding index: \
1091                             segment={} line={}: {e}",
1092                                seg_path.display(),
1093                                line_number.saturating_add(1),
1094                            )));
1095                        }
1096                        Err(e) => return Err(Error::Io(Box::new(e))),
1097                    };
1098
1099                if bytes_read == 0 {
1100                    break;
1101                }
1102                line_number = line_number.saturating_add(1);
1103                let mut line_len = u64::try_from(bytes_read)
1104                    .map_err(|_| Error::session("line length exceeds u64"))?;
1105
1106                if line.trim().is_empty() {
1107                    byte_offset = byte_offset.saturating_add(line_len);
1108                    continue;
1109                }
1110
1111                let missing_newline = !line.ends_with('\n');
1112                let json_line = line.trim_end_matches('\n').trim_end_matches('\r');
1113                let frame: SegmentFrame = match serde_json::from_str(json_line) {
1114                    Ok(frame) => {
1115                        if missing_newline {
1116                            use std::io::{Read, Write};
1117                            tracing::warn!(
1118                                segment = %seg_path.display(),
1119                                line_number,
1120                                "SessionStoreV2 encountered valid frame missing trailing newline; healing segment"
1121                            );
1122                            let mut f = secure_open_options().append(true).open(seg_path)?;
1123                            f.write_all(b"\n")?;
1124                            line.push('\n');
1125                            line_len += 1;
1126                            // Consume the healed newline so the reader and offset accounting stay aligned.
1127                            let mut healed_newline = [0u8; 1];
1128                            reader.read_exact(&mut healed_newline).map_err(|err| {
1129                                Error::session(format!(
1130                                    "failed to consume healed newline while rebuilding index: \
1131                                     segment={} line={line_number}: {err}",
1132                                    seg_path.display()
1133                                ))
1134                            })?;
1135                            if healed_newline[0] != b'\n' {
1136                                return Err(Error::session(format!(
1137                                    "healed newline read back as non-newline byte while rebuilding index: \
1138                                     segment={} line={line_number}: 0x{:02X}",
1139                                    seg_path.display(),
1140                                    healed_newline[0]
1141                                )));
1142                            }
1143                        }
1144                        frame
1145                    }
1146                    Err(err) => {
1147                        let at_eof = reader.fill_buf().is_ok_and(<[u8]>::is_empty);
1148                        if !at_eof || !missing_newline {
1149                            return Err(Error::session(format!(
1150                                "failed to parse segment frame while rebuilding index: \
1151                                 segment={} line={line_number}: {err}",
1152                                seg_path.display()
1153                            )));
1154                        }
1155                        tracing::warn!(
1156                            segment = %seg_path.display(),
1157                            line_number,
1158                            error = %err,
1159                            at_eof,
1160                            missing_newline,
1161                            "SessionStoreV2 dropping corrupted frame during index rebuild; truncating segment and quarantining subsequent segments"
1162                        );
1163                        // Trim the incomplete tail so subsequent reads and appends remain valid.
1164                        drop(reader);
1165                        truncate_file_to(seg_path, byte_offset)?;
1166                        quarantine_segment_tail(&segment_files[i + 1..])?;
1167                        break 'segments;
1168                    }
1169                };
1170
1171                if frame.segment_seq != *segment_seq || frame.frame_seq != expected_frame_seq {
1172                    tracing::warn!(
1173                        segment = %seg_path.display(),
1174                        line_number,
1175                        expected_segment_seq = *segment_seq,
1176                        actual_segment_seq = frame.segment_seq,
1177                        expected_frame_seq,
1178                        actual_frame_seq = frame.frame_seq,
1179                        "SessionStoreV2 detected mismatched embedded frame coordinates during rebuild; truncating segment and quarantining subsequent segments"
1180                    );
1181                    drop(reader);
1182                    truncate_file_to(seg_path, byte_offset)?;
1183                    quarantine_segment_tail(&segment_files[i + 1..])?;
1184                    break 'segments;
1185                }
1186
1187                if frame.entry_seq <= last_observed_seq {
1188                    tracing::warn!(
1189                        segment = %seg_path.display(),
1190                        line_number,
1191                        entry_seq = frame.entry_seq,
1192                        last_seq = last_observed_seq,
1193                        "SessionStoreV2 detected non-monotonic entry sequence during rebuild; truncating segment and quarantining subsequent segments"
1194                    );
1195                    drop(reader);
1196                    truncate_file_to(seg_path, byte_offset)?;
1197                    quarantine_segment_tail(&segment_files[i + 1..])?;
1198                    break 'segments;
1199                }
1200                last_observed_seq = frame.entry_seq;
1201
1202                let crc = crc32c_upper(line.as_bytes());
1203
1204                let index_entry = OffsetIndexEntry {
1205                    schema: Cow::Borrowed(OFFSET_INDEX_SCHEMA),
1206                    entry_seq: frame.entry_seq,
1207                    entry_id: frame.entry_id.clone(),
1208                    segment_seq: *segment_seq,
1209                    frame_seq: expected_frame_seq,
1210                    byte_offset,
1211                    byte_length: line_len,
1212                    crc32c: crc.clone(),
1213                    state: Cow::Borrowed("active"),
1214                };
1215                serde_json::to_writer(&mut index_writer, &index_entry)?;
1216                index_writer.write_all(b"\n")?;
1217
1218                self.chain_hash = chain_hash_step(&self.chain_hash, &frame.payload_sha256);
1219                self.total_bytes = self.total_bytes.saturating_add(line_len);
1220                self.last_entry_id = Some(frame.entry_id);
1221                self.last_crc32c = crc;
1222
1223                byte_offset = byte_offset.saturating_add(line_len);
1224                rebuilt_count = rebuilt_count.saturating_add(1);
1225                expected_frame_seq = expected_frame_seq
1226                    .checked_add(1)
1227                    .ok_or_else(|| Error::session("frame sequence overflow during rebuild"))?;
1228            }
1229        }
1230
1231        index_writer.flush()?;
1232        let file = index_writer
1233            .into_inner()
1234            .map_err(std::io::IntoInnerError::into_error)?;
1235        file.sync_all()?;
1236        drop(file); // Close the file handle before renaming (fixes Windows ERROR_SHARING_VIOLATION)
1237
1238        // Atomically replace the old index with the rebuilt one
1239        fs::rename(&index_tmp_path, &index_path)?;
1240        sync_parent_dir(&index_path)?;
1241
1242        self.next_segment_seq = 1;
1243        self.next_frame_seq = 1;
1244        self.next_entry_seq = 1;
1245        self.current_segment_bytes = 0;
1246        self.bootstrap_from_disk()?;
1247
1248        Ok(rebuilt_count)
1249    }
1250
    /// Verify the offset index against the segment files: ordering, byte
    /// bounds, CRC32C checksums, index/frame coordinate agreement, payload
    /// hash/size, entry-id uniqueness, and parent-graph closure/acyclicity.
    ///
    /// Read-only; returns `Ok(())` only when every check passes.
    ///
    /// # Errors
    /// Returns a session error naming the first violated invariant, or an I/O
    /// error when a segment cannot be read.
    pub fn validate_integrity(&self) -> Result<()> {
        let index_rows = self.read_index()?;
        let mut last_entry_seq = 0;
        // entry_id -> parent_entry_id, collected for the graph checks at the end.
        let mut parent_by_entry: std::collections::HashMap<String, Option<String>> =
            std::collections::HashMap::with_capacity(index_rows.len());

        // Group rows by segment to minimize file opens
        let mut rows_by_segment: std::collections::BTreeMap<u64, Vec<&OffsetIndexEntry>> =
            std::collections::BTreeMap::new();
        for row in &index_rows {
            // Entry sequences must be strictly increasing across the whole index.
            if row.entry_seq <= last_entry_seq {
                return Err(Error::session(format!(
                    "entry sequence is not strictly increasing at entry_seq={}",
                    row.entry_seq
                )));
            }
            last_entry_seq = row.entry_seq;
            rows_by_segment
                .entry(row.segment_seq)
                .or_default()
                .push(row);
        }

        for (segment_seq, rows) in rows_by_segment {
            let segment_path = self.segment_file_path(segment_seq);
            let mut file = File::open(&segment_path).map_err(|err| {
                Error::session(format!(
                    "failed to open segment {}: {err}",
                    segment_path.display()
                ))
            })?;
            let segment_len = file.metadata()?.len();

            for row in rows {
                // The indexed byte range must lie entirely within the segment.
                let end = row
                    .byte_offset
                    .checked_add(row.byte_length)
                    .ok_or_else(|| Error::session("index byte range overflow"))?;
                if end > segment_len {
                    return Err(Error::session(format!(
                        "index out of bounds for segment {}: end={} len={segment_len}",
                        segment_path.display(),
                        end
                    )));
                }

                file.seek(SeekFrom::Start(row.byte_offset))?;
                let mut record_bytes = vec![
                    0u8;
                    usize::try_from(row.byte_length).map_err(|_| {
                        Error::session(format!("byte length too large: {}", row.byte_length))
                    })?
                ];
                file.read_exact(&mut record_bytes)?;

                // CRC is computed over the raw bytes, including the newline.
                let checksum = crc32c_upper(&record_bytes);
                if checksum != row.crc32c {
                    return Err(Error::session(format!(
                        "checksum mismatch for entry_seq={} expected={} actual={checksum}",
                        row.entry_seq, row.crc32c
                    )));
                }

                // Strip the trailing newline before JSON parsing.
                if record_bytes.last() == Some(&b'\n') {
                    record_bytes.pop();
                }
                let frame: SegmentFrame = serde_json::from_slice(&record_bytes)?;

                // The frame's embedded coordinates must agree with the index row.
                if frame.entry_seq != row.entry_seq
                    || frame.entry_id != row.entry_id
                    || frame.segment_seq != row.segment_seq
                    || frame.frame_seq != row.frame_seq
                {
                    return Err(Error::session(format!(
                        "index/frame mismatch at entry_seq={}",
                        row.entry_seq
                    )));
                }

                // Recompute the payload hash/size and compare with the stored values.
                let (payload_hash, payload_bytes) = payload_hash_and_size(&frame.payload)?;
                if frame.payload_sha256 != payload_hash || frame.payload_bytes != payload_bytes {
                    return Err(Error::session(format!(
                        "payload integrity mismatch at entry_seq={}",
                        row.entry_seq
                    )));
                }

                // `insert` returning Some means this entry_id was seen before.
                if parent_by_entry
                    .insert(frame.entry_id.clone(), frame.parent_entry_id.clone())
                    .is_some()
                {
                    return Err(Error::session(format!(
                        "duplicate entry_id detected in session store: {}",
                        frame.entry_id
                    )));
                }
            }
        }

        // Finally, validate the parent graph: every parent exists, no cycles.
        validate_parent_graph_links(&parent_by_entry)?;
        validate_parent_graph_acyclic(&parent_by_entry)?;

        Ok(())
    }
1355
    /// Restore in-memory counters and integrity state from the on-disk index.
    ///
    /// With a non-empty index: derives the next entry/frame sequence numbers
    /// from the last row, truncates any unindexed trailing bytes left in the
    /// active segment by a crash, and recomputes the chain hash and byte total
    /// by re-reading every indexed frame. With an empty index: resets all
    /// derived state to the genesis values.
    ///
    /// # Errors
    /// Fails on sequence overflow, a missing/unstatable active segment, or an
    /// index row that references a missing frame.
    fn bootstrap_from_disk(&mut self) -> Result<()> {
        let index_rows = self.read_index()?;
        if let Some(last) = index_rows.last() {
            self.next_entry_seq = last
                .entry_seq
                .checked_add(1)
                .ok_or_else(|| Error::session("entry sequence overflow while bootstrapping"))?;
            // NOTE(review): set to the *current* segment (not +1) — presumably
            // so new frames keep appending to the open segment; confirm against
            // the append path.
            self.next_segment_seq = last.segment_seq;
            self.next_frame_seq = last
                .frame_seq
                .checked_add(1)
                .ok_or_else(|| Error::session("frame sequence overflow while bootstrapping"))?;
            let segment_path = self.segment_file_path(last.segment_seq);
            // Bytes covered by the index; anything past this is an unindexed tail.
            let expected_segment_bytes = last.byte_offset.saturating_add(last.byte_length);
            let actual_segment_bytes =
                fs::metadata(&segment_path)
                    .map(|meta| meta.len())
                    .map_err(|err| {
                        Error::session(format!(
                            "failed to stat active segment {} while bootstrapping: {err}",
                            segment_path.display()
                        ))
                    })?;

            if actual_segment_bytes > expected_segment_bytes {
                tracing::warn!(
                    segment = %segment_path.display(),
                    expected = expected_segment_bytes,
                    actual = actual_segment_bytes,
                    "SessionStoreV2 truncating unindexed trailing bytes from active segment after crash recovery"
                );
                truncate_file_to(&segment_path, expected_segment_bytes)?;
            }
            self.current_segment_bytes = expected_segment_bytes;
            self.last_entry_id = Some(last.entry_id.clone());
            self.last_crc32c.clone_from(&last.crc32c);

            // Rebuild the chain hash and byte total by walking every frame.
            let mut chain = GENESIS_CHAIN_HASH.to_string();
            let mut total = 0u64;
            let mut reader = SegmentFileReader::new(self);
            for row in &index_rows {
                let frame = reader.read_frame(row)?.ok_or_else(|| {
                    Error::session(format!(
                        "index references missing frame during bootstrap: entry_seq={}, segment={}",
                        row.entry_seq, row.segment_seq
                    ))
                })?;
                chain = chain_hash_step(&chain, &frame.payload_sha256);
                total = total.saturating_add(row.byte_length);
            }
            self.chain_hash = chain;
            self.total_bytes = total;
        } else {
            // Empty index: reset derived state to genesis.
            self.chain_hash = GENESIS_CHAIN_HASH.to_string();
            self.total_bytes = 0;
            self.last_entry_id = None;
            self.last_crc32c = "00000000".to_string();
        }
        Ok(())
    }
1416}
1417
1418fn rollback_failure_event_already_recorded(error: &Error) -> bool {
1419    matches!(error, Error::Session(message) if message.contains("rollback verification failed"))
1420}
1421
1422fn classify_rollback_error(error: &Error) -> &'static str {
1423    match error {
1424        Error::Session(message) => {
1425            if message.contains("checkpoint") && message.contains("not found") {
1426                "checkpoint_not_found"
1427            } else if message.contains("index byte range overflow") {
1428                "index_range_overflow"
1429            } else if message.contains("rollback verification failed") {
1430                "rollback_verification_failed"
1431            } else {
1432                "session_error"
1433            }
1434        }
1435        _ => error.category_code(),
1436    }
1437}
1438
1439fn is_recoverable_index_error(error: &Error) -> bool {
1440    match error {
1441        Error::Json(_) => true,
1442        Error::Io(err) => matches!(
1443            err.kind(),
1444            std::io::ErrorKind::UnexpectedEof | std::io::ErrorKind::InvalidData
1445        ),
1446        Error::Session(message) => {
1447            let lower = message.to_ascii_lowercase();
1448            lower.contains("checksum mismatch")
1449                || lower.contains("index out of bounds")
1450                || lower.contains("index/frame mismatch")
1451                || lower.contains("index references missing frame")
1452                || lower.contains("payload integrity mismatch")
1453                || lower.contains("entry sequence is not strictly increasing")
1454                || lower.contains("index byte range overflow")
1455                || lower.contains("failed to stat active segment")
1456        }
1457        _ => false,
1458    }
1459}
1460
/// DFS bookkeeping state for cycle detection over the parent-entry graph.
#[derive(Clone, Copy, PartialEq, Eq)]
enum ParentGraphVisitState {
    // Node is on the current DFS path (entered but not fully expanded).
    Visiting,
    // Node (and its reachable parents) has been fully processed.
    Visited,
}
1466
1467fn validate_parent_graph_links(
1468    parent_by_entry: &std::collections::HashMap<String, Option<String>>,
1469) -> Result<()> {
1470    for (entry_id, parent_id) in parent_by_entry {
1471        if let Some(parent_id) = parent_id.as_deref()
1472            && !parent_by_entry.contains_key(parent_id)
1473        {
1474            return Err(Error::session(format!(
1475                "missing parent entry detected in session store: entry_id={entry_id} parent_id={parent_id}"
1476            )));
1477        }
1478    }
1479
1480    Ok(())
1481}
1482
/// Verify the parent-entry graph contains no cycles, using an iterative
/// (explicit-stack) depth-first search so deep chains cannot overflow the
/// call stack. Errors on the first entry found to lie on a cycle.
fn validate_parent_graph_acyclic(
    parent_by_entry: &std::collections::HashMap<String, Option<String>>,
) -> Result<()> {
    let mut visit_state: std::collections::HashMap<&str, ParentGraphVisitState> =
        std::collections::HashMap::with_capacity(parent_by_entry.len());

    for entry_id in parent_by_entry.keys() {
        // Skip roots already fully explored by an earlier traversal.
        if visit_state.get(entry_id.as_str()) == Some(&ParentGraphVisitState::Visited) {
            continue;
        }

        // Stack holds (node, expanded): a node is pushed once to expand it,
        // then popped a second time (expanded=true) to mark it Visited.
        let mut stack = vec![(entry_id.as_str(), false)];
        while let Some((current_id, expanded)) = stack.pop() {
            if expanded {
                visit_state.insert(current_id, ParentGraphVisitState::Visited);
                continue;
            }

            match visit_state.get(current_id).copied() {
                Some(ParentGraphVisitState::Visited) => continue,
                // Reaching a node that is still on the current path is a cycle.
                Some(ParentGraphVisitState::Visiting) => {
                    return Err(Error::session(format!(
                        "cyclic parent chain detected in session store at entry_id={current_id}"
                    )));
                }
                None => {}
            }

            visit_state.insert(current_id, ParentGraphVisitState::Visiting);
            stack.push((current_id, true));

            // Follow the parent edge only when the parent exists in the map;
            // dangling links are reported by validate_parent_graph_links.
            if let Some(parent_id) = parent_by_entry
                .get(current_id)
                .and_then(std::option::Option::as_deref)
                && parent_by_entry.contains_key(parent_id)
            {
                stack.push((parent_id, false));
            }
        }
    }

    Ok(())
}
1526
1527/// Convert a V2 `SegmentFrame` payload back into a `SessionEntry`.
1528pub fn frame_to_session_entry(frame: &SegmentFrame) -> Result<SessionEntry> {
1529    // Deserialize directly from the RawValue to avoid extra allocation/copying.
1530    // serde_json::from_str works on RawValue.get() which is &str.
1531    let entry: SessionEntry = serde_json::from_str(frame.payload.get()).map_err(|e| {
1532        Error::session(format!(
1533            "failed to deserialize SessionEntry from frame entry_id={}: {e}",
1534            frame.entry_id
1535        ))
1536    })?;
1537
1538    if let Some(base_id) = entry.base_id() {
1539        if base_id != &frame.entry_id {
1540            return Err(Error::session(format!(
1541                "frame entry_id mismatch: frame={} entry={}",
1542                frame.entry_id, base_id
1543            )));
1544        }
1545    }
1546
1547    Ok(entry)
1548}
1549
1550/// Extract the V2 frame arguments from a `SessionEntry`.
1551pub fn session_entry_to_frame_args(
1552    entry: &SessionEntry,
1553) -> Result<(String, Option<String>, String, Value)> {
1554    let base = entry.base();
1555    let entry_id = base
1556        .id
1557        .clone()
1558        .ok_or_else(|| Error::session("SessionEntry has no id"))?;
1559    let parent_entry_id = base.parent_id.clone();
1560
1561    let entry_type = match entry {
1562        SessionEntry::Message(_) => "message",
1563        SessionEntry::ModelChange(_) => "model_change",
1564        SessionEntry::ThinkingLevelChange(_) => "thinking_level_change",
1565        SessionEntry::Compaction(_) => "compaction",
1566        SessionEntry::BranchSummary(_) => "branch_summary",
1567        SessionEntry::Label(_) => "label",
1568        SessionEntry::SessionInfo(_) => "session_info",
1569        SessionEntry::Custom(_) => "custom",
1570    };
1571
1572    let payload = serde_json::to_value(entry).map_err(|e| {
1573        Error::session(format!(
1574            "failed to serialize SessionEntry to frame payload: {e}"
1575        ))
1576    })?;
1577
1578    Ok((entry_id, parent_entry_id, entry_type.to_string(), payload))
1579}
1580
/// Helper to cache the file descriptor when reading multiple frames sequentially.
///
/// `read_frame` keeps the most recently opened segment file (and its length at
/// open time) so that consecutive index rows pointing into the same segment
/// reuse one open handle instead of re-opening the file per frame.
struct SegmentFileReader<'a> {
    /// Store whose segment files are being read.
    store: &'a SessionStoreV2,
    /// Segment sequence number of the currently cached file, if any.
    current_segment_seq: Option<u64>,
    /// Open handle for `current_segment_seq`; `None` when that file is missing.
    current_file: Option<File>,
    /// Byte length of the cached file at open time, used for bounds checks.
    current_len: u64,
}
1588
1589impl<'a> SegmentFileReader<'a> {
1590    const fn new(store: &'a SessionStoreV2) -> Self {
1591        Self {
1592            store,
1593            current_segment_seq: None,
1594            current_file: None,
1595            current_len: 0,
1596        }
1597    }
1598
1599    fn read_frame(&mut self, row: &OffsetIndexEntry) -> Result<Option<SegmentFrame>> {
1600        if self.current_segment_seq != Some(row.segment_seq) {
1601            self.current_segment_seq = Some(row.segment_seq);
1602            let path = self.store.segment_file_path(row.segment_seq);
1603            if path.exists() {
1604                let file = File::open(&path)?;
1605                self.current_len = file.metadata()?.len();
1606                self.current_file = Some(file);
1607            } else {
1608                self.current_file = None;
1609            }
1610        }
1611
1612        let Some(file) = self.current_file.as_mut() else {
1613            return Ok(None);
1614        };
1615
1616        let end_offset = row
1617            .byte_offset
1618            .checked_add(row.byte_length)
1619            .ok_or_else(|| Error::session("index byte range overflow"))?;
1620
1621        if end_offset > self.current_len {
1622            return Err(Error::session(format!(
1623                "index out of bounds for segment {}: end={} len={}",
1624                self.store.segment_file_path(row.segment_seq).display(),
1625                end_offset,
1626                self.current_len
1627            )));
1628        }
1629
1630        file.seek(SeekFrom::Start(row.byte_offset))?;
1631        let byte_len = usize::try_from(row.byte_length)
1632            .map_err(|_| Error::session(format!("byte length too large: {}", row.byte_length)))?;
1633
1634        if row.byte_length > self.store.max_segment_bytes.max(100 * 1024 * 1024) {
1635            return Err(Error::session(format!(
1636                "frame byte length {byte_len} exceeds limit"
1637            )));
1638        }
1639
1640        let mut buf = vec![0u8; byte_len];
1641        file.read_exact(&mut buf)?;
1642        if buf.last() == Some(&b'\n') {
1643            buf.pop();
1644        }
1645        let frame: SegmentFrame = serde_json::from_slice(&buf)?;
1646        Ok(Some(frame))
1647    }
1648}
1649
1650/// Compute next hash chain value: `SHA-256(prev_chain_hex || payload_sha256_hex)`.
1651fn chain_hash_step(prev_chain: &str, payload_sha256: &str) -> String {
1652    let mut hasher = Sha256::new();
1653    hasher.update(prev_chain.as_bytes());
1654    hasher.update(payload_sha256.as_bytes());
1655    format!("{:x}", hasher.finalize())
1656}
1657
1658fn manifest_hash_hex(manifest: &Manifest) -> Result<String> {
1659    let encoded = serde_json::to_vec(manifest)?;
1660    Ok(format!("{:x}", Sha256::digest(&encoded)))
1661}
1662
/// Derive the V2 sidecar store root from a JSONL session file path.
///
/// `<dir>/<stem>.jsonl` maps to `<dir>/<stem>.v2`. A path without a file stem
/// falls back to the stem `session`; a path without a parent resolves under
/// the current directory.
pub fn v2_sidecar_path(jsonl_path: &Path) -> PathBuf {
    let stem = match jsonl_path.file_stem() {
        Some(s) => s.to_string_lossy().into_owned(),
        None => "session".to_string(),
    };
    let parent = jsonl_path.parent().unwrap_or(Path::new("."));
    parent.join(format!("{stem}.v2"))
}
1672
1673/// Check whether a V2 sidecar store exists for the given JSONL session.
1674pub fn has_v2_sidecar(jsonl_path: &Path) -> bool {
1675    let root = v2_sidecar_path(jsonl_path);
1676    root.join("manifest.json").exists() || root.join("index").join("offsets.jsonl").exists()
1677}
1678
1679fn append_jsonl_line<T: Serialize>(path: &Path, value: &T) -> Result<()> {
1680    let file = secure_open_options().create(true).append(true).open(path)?;
1681    let mut writer = std::io::BufWriter::new(file);
1682    // Serialize directly to buffered file — avoids intermediate Vec<u8> allocation
1683    // while preventing excessive write syscalls.
1684    serde_json::to_writer(&mut writer, value)?;
1685    writer.write_all(b"\n")?;
1686    writer.flush()?;
1687    Ok(())
1688}
1689
1690fn truncate_file_to(path: &Path, len: u64) -> Result<()> {
1691    let file = secure_open_options()
1692        .write(true)
1693        .truncate(false)
1694        .open(path)?;
1695    file.set_len(len)?;
1696    file.sync_all()?;
1697    Ok(())
1698}
1699
1700fn quarantine_segment_file(path: &Path) -> Result<PathBuf> {
1701    let parent = path
1702        .parent()
1703        .ok_or_else(|| Error::session(format!("segment has no parent: {}", path.display())))?;
1704    let file_name = path
1705        .file_name()
1706        .map(|name| name.to_string_lossy().into_owned())
1707        .ok_or_else(|| Error::session(format!("segment has no filename: {}", path.display())))?;
1708
1709    for suffix in 0u32..10_000 {
1710        let backup_name = if suffix == 0 {
1711            format!("{file_name}.bak")
1712        } else {
1713            format!("{file_name}.bak.{suffix}")
1714        };
1715        let backup_path = parent.join(backup_name);
1716        if backup_path.exists() {
1717            continue;
1718        }
1719
1720        fs::rename(path, &backup_path).map_err(|err| {
1721            Error::session(format!(
1722                "failed to quarantine segment {} -> {}: {err}",
1723                path.display(),
1724                backup_path.display()
1725            ))
1726        })?;
1727        return Ok(backup_path);
1728    }
1729
1730    Err(Error::session(format!(
1731        "failed to quarantine segment {}: exhausted backup suffixes",
1732        path.display()
1733    )))
1734}
1735
1736fn quarantine_segment_tail(segment_files: &[(u64, PathBuf)]) -> Result<()> {
1737    for (_, path) in segment_files {
1738        let backup_path = quarantine_segment_file(path)?;
1739        tracing::warn!(
1740            segment = %path.display(),
1741            backup = %backup_path.display(),
1742            "SessionStoreV2 quarantined trailing segment during rebuild"
1743        );
1744    }
1745    Ok(())
1746}
1747
#[cfg(unix)]
fn sync_parent_dir(path: &Path) -> std::io::Result<()> {
    // fsync the containing directory so a preceding create/rename is durable.
    // A path with no parent has nothing to sync.
    match path.parent() {
        Some(parent) => File::open(parent)?.sync_all(),
        None => Ok(()),
    }
}

#[cfg(not(unix))]
fn sync_parent_dir(_path: &Path) -> std::io::Result<()> {
    // Directory fsync is a Unix-ism; elsewhere this is a no-op.
    Ok(())
}
1760
1761fn write_jsonl_lines<T: Serialize>(path: &Path, rows: &[T]) -> Result<()> {
1762    let file = secure_open_options()
1763        .create(true)
1764        .write(true)
1765        .truncate(true)
1766        .open(path)?;
1767    let mut writer = std::io::BufWriter::new(file);
1768    for row in rows {
1769        serde_json::to_writer(&mut writer, row)?;
1770        writer.write_all(b"\n")?;
1771    }
1772    writer.flush()?;
1773    let file = writer
1774        .into_inner()
1775        .map_err(std::io::IntoInnerError::into_error)?;
1776    file.sync_all()?;
1777    Ok(())
1778}
1779
1780fn read_jsonl<T: for<'de> Deserialize<'de>>(path: &Path) -> Result<Vec<T>> {
1781    let file = File::open(path)?;
1782    let mut reader = BufReader::new(file);
1783    let mut out = Vec::new();
1784    let mut line = String::new();
1785    loop {
1786        line.clear();
1787        let bytes_read = read_line_with_limit(&mut reader, &mut line, MAX_FRAME_READ_BYTES)
1788            .map_err(|e| Error::Io(Box::new(e)))?;
1789        if bytes_read == 0 {
1790            break;
1791        }
1792        if line.trim().is_empty() {
1793            continue;
1794        }
1795        let json_line = line.trim_end_matches('\n').trim_end_matches('\r');
1796        out.push(serde_json::from_str::<T>(json_line)?);
1797    }
1798    Ok(out)
1799}
1800
1801fn payload_hash_and_size(payload: &RawValue) -> Result<(String, u64)> {
1802    // For RawValue, we can just get the string content directly.
1803    let bytes = payload.get().as_bytes();
1804    let payload_bytes = u64::try_from(bytes.len())
1805        .map_err(|_| Error::session(format!("payload is too large: {} bytes", bytes.len())))?;
1806    let hash = format!("{:x}", Sha256::digest(bytes));
1807    Ok((hash, payload_bytes))
1808}
1809
1810fn line_length_u64(encoded: &[u8]) -> Result<u64> {
1811    let line_len = encoded
1812        .len()
1813        .checked_add(1)
1814        .ok_or_else(|| Error::session("line length overflow"))?;
1815    u64::try_from(line_len).map_err(|_| Error::session("line length exceeds u64"))
1816}
1817
1818fn crc32c_upper(data: &[u8]) -> String {
1819    let crc = crc32c::crc32c(data);
1820    format!("{crc:08X}")
1821}
1822
/// Read one line (newline included) into `buf`, refusing lines longer than
/// `limit` bytes. Returns the number of bytes read; 0 signals EOF.
///
/// Errors with `InvalidData` when the line would exceed `limit`; a final line
/// of exactly `limit` bytes with no trailing newline is accepted.
fn read_line_with_limit<R: BufRead>(
    reader: &mut R,
    buf: &mut String,
    limit: u64,
) -> std::io::Result<usize> {
    let mut limited = reader.take(limit);
    let bytes_read = limited.read_line(buf)?;
    // The whole budget was consumed without a newline: over-long, unless the
    // stream happens to end exactly here.
    if bytes_read > 0 && limited.limit() == 0 && !buf.ends_with('\n') {
        let more_data = !limited.into_inner().fill_buf()?.is_empty();
        if more_data {
            return Err(std::io::Error::new(
                std::io::ErrorKind::InvalidData,
                format!("Line length exceeds limit of {limit} bytes"),
            ));
        }
    }
    Ok(bytes_read)
}
1843
#[cfg(test)]
mod proptests {
    //! Unit and property tests for the store's free-standing primitives:
    //! quarantine/rebuild recovery, hash chaining, CRC rendering, payload
    //! hashing, line-length accounting, and sidecar path derivation.

    use super::*;
    use proptest::prelude::*;
    use serde_json::json;
    use std::fs;

    #[test]
    fn quarantine_segment_file_moves_segment_to_backup() {
        let tmp = tempfile::tempdir().expect("tempdir");
        let segment = tmp.path().join("0000000000000002.seg");
        fs::write(&segment, b"hello").expect("write segment");

        let backup = quarantine_segment_file(&segment).expect("quarantine segment");

        assert_eq!(backup, tmp.path().join("0000000000000002.seg.bak"));
        assert!(!segment.exists(), "original segment should be moved away");
        assert_eq!(fs::read(&backup).expect("read backup"), b"hello");
    }

    #[test]
    fn quarantine_segment_file_uses_next_available_backup_suffix() {
        let tmp = tempfile::tempdir().expect("tempdir");
        let segment = tmp.path().join("0000000000000002.seg");
        let existing_backup = tmp.path().join("0000000000000002.seg.bak");
        fs::write(&segment, b"new").expect("write segment");
        fs::write(&existing_backup, b"old").expect("write existing backup");

        let backup = quarantine_segment_file(&segment).expect("quarantine segment");

        // The occupied `.bak` slot must be skipped, not overwritten.
        assert_eq!(backup, tmp.path().join("0000000000000002.seg.bak.1"));
        assert_eq!(
            fs::read(&existing_backup).expect("read existing backup"),
            b"old"
        );
        assert_eq!(fs::read(&backup).expect("read new backup"), b"new");
    }

    #[test]
    fn create_recovers_from_index_row_that_references_missing_segment() {
        let tmp = tempfile::tempdir().expect("tempdir");
        let root = tmp.path().join("store");
        let mut store = SessionStoreV2::create(&root, 4096).expect("create store");
        store
            .append_entry("entry-1", None, "message", json!({"n": 1}))
            .expect("append entry");

        // Corrupt the index so its only row points at a nonexistent segment.
        let mut rows = store.read_index().expect("read index");
        assert_eq!(rows.len(), 1);
        rows[0].segment_seq = 999;
        write_jsonl_lines(&store.index_file_path(), &rows).expect("write corrupted index");
        drop(store);

        // Reopening must rebuild the index from the surviving segment data.
        let reopened = SessionStoreV2::create(&root, 4096).expect("reopen store");
        assert_eq!(reopened.entry_count(), 1);

        let rebuilt_rows = reopened.read_index().expect("read rebuilt index");
        assert_eq!(rebuilt_rows.len(), 1);
        assert_eq!(rebuilt_rows[0].segment_seq, 1);
        assert!(reopened.lookup_entry(1).expect("lookup entry").is_some());
    }

    #[test]
    fn create_drops_frame_with_mismatched_embedded_segment_seq_during_rebuild() {
        let tmp = tempfile::tempdir().expect("tempdir");
        let root = tmp.path().join("store");
        let mut store = SessionStoreV2::create(&root, 4096).expect("create store");
        store
            .append_entry("entry-1", None, "message", json!({"n": 1}))
            .expect("append first entry");
        store
            .append_entry(
                "entry-2",
                Some("entry-1".to_string()),
                "message",
                json!({"n": 2}),
            )
            .expect("append second entry");

        // Corrupt the second frame's embedded segment_seq, then force a
        // rebuild by deleting the index.
        let segment_path = store.segment_file_path(1);
        let mut frames = store.read_segment(1).expect("read segment");
        assert_eq!(frames.len(), 2);
        frames[1].segment_seq = 77;
        write_jsonl_lines(&segment_path, &frames).expect("write corrupted segment");
        fs::remove_file(store.index_file_path()).expect("remove index");
        drop(store);

        // Only the intact first frame should survive the rebuild.
        let reopened = SessionStoreV2::create(&root, 4096).expect("reopen store");
        assert_eq!(reopened.entry_count(), 1);

        let rebuilt_rows = reopened.read_index().expect("read rebuilt index");
        assert_eq!(rebuilt_rows.len(), 1);
        assert_eq!(rebuilt_rows[0].entry_seq, 1);
        assert_eq!(reopened.read_segment(1).expect("read segment").len(), 1);
        assert!(reopened.lookup_entry(2).expect("lookup entry").is_none());
    }

    // ====================================================================
    // chain_hash_step
    // ====================================================================

    proptest! {
        #[test]
        fn chain_hash_output_is_64_hex(
            a in "[0-9a-f]{64}",
            b in "[0-9a-f]{64}",
        ) {
            let result = chain_hash_step(&a, &b);
            assert_eq!(result.len(), 64);
            assert!(result.chars().all(|c| c.is_ascii_hexdigit()));
        }

        #[test]
        fn chain_hash_deterministic(
            a in "[0-9a-f]{64}",
            b in "[0-9a-f]{64}",
        ) {
            assert_eq!(chain_hash_step(&a, &b), chain_hash_step(&a, &b));
        }

        #[test]
        fn chain_hash_non_commutative(
            a in "[0-9a-f]{64}",
            b in "[0-9a-f]{64}",
        ) {
            // Argument order matters (SHA-256 collisions aside); the guard
            // skips the (astronomically unlikely) a == b draw.
            if a != b {
                assert_ne!(chain_hash_step(&a, &b), chain_hash_step(&b, &a));
            }
        }

        #[test]
        fn chain_hash_genesis_differs_from_step(payload in "[0-9a-f]{64}") {
            let step1 = chain_hash_step(GENESIS_CHAIN_HASH, &payload);
            assert_ne!(step1, GENESIS_CHAIN_HASH);
        }
    }

    // ====================================================================
    // crc32c_upper
    // ====================================================================

    proptest! {
        #[test]
        fn crc32c_output_is_8_uppercase_hex(data in prop::collection::vec(any::<u8>(), 0..500)) {
            let result = crc32c_upper(&data);
            assert_eq!(result.len(), 8);
            assert!(result.chars().all(|c| matches!(c, '0'..='9' | 'A'..='F')));
        }

        #[test]
        fn crc32c_deterministic(data in prop::collection::vec(any::<u8>(), 0..500)) {
            assert_eq!(crc32c_upper(&data), crc32c_upper(&data));
        }

        #[test]
        fn crc32c_single_bit_sensitivity(byte in any::<u8>()) {
            // `byte` and `byte ^ 1` always differ (the previous
            // `if byte != byte ^ 1` guard was vacuously true for every u8),
            // so their checksums must differ too.
            let a = crc32c_upper(&[byte]);
            let b = crc32c_upper(&[byte ^ 1]);
            assert_ne!(a, b, "flipping LSB should change CRC");
        }
    }

    // ====================================================================
    // payload_hash_and_size
    // ====================================================================

    proptest! {
        #[test]
        fn payload_hash_is_64_hex(s in "[a-z]{0,50}") {
            let val = json!(s);
            let raw_string = serde_json::to_string(&val).unwrap();
            let raw = RawValue::from_string(raw_string).unwrap();
            let (hash, _size) = payload_hash_and_size(&raw).unwrap();
            assert_eq!(hash.len(), 64);
            assert!(hash.chars().all(|c| c.is_ascii_hexdigit()));
        }

        #[test]
        fn payload_size_matches_serialization(s in "[a-z]{0,50}") {
            let val = json!(s);
            let raw_string = serde_json::to_string(&val).unwrap();
            let raw = RawValue::from_string(raw_string).unwrap();
            let (_, size) = payload_hash_and_size(&raw).unwrap();
            let expected = serde_json::to_vec(&val).unwrap().len() as u64;
            assert_eq!(size, expected);
        }

        #[test]
        fn payload_hash_deterministic(n in 0i64..10000) {
            let val = json!(n);
            let raw_string = serde_json::to_string(&val).unwrap();
            let raw = RawValue::from_string(raw_string).unwrap();
            let (h1, s1) = payload_hash_and_size(&raw).unwrap();
            let (h2, s2) = payload_hash_and_size(&raw).unwrap();
            assert_eq!(h1, h2);
            assert_eq!(s1, s2);
        }
    }

    // ====================================================================
    // line_length_u64
    // ====================================================================

    proptest! {
        #[test]
        fn line_length_is_len_plus_one(data in prop::collection::vec(any::<u8>(), 0..1000)) {
            let result = line_length_u64(&data).unwrap();
            assert_eq!(result, data.len() as u64 + 1);
        }

        #[test]
        fn line_length_never_zero(data in prop::collection::vec(any::<u8>(), 0..100)) {
            let result = line_length_u64(&data).unwrap();
            assert!(result >= 1);
        }
    }

    // ====================================================================
    // v2_sidecar_path
    // ====================================================================

    proptest! {
        #[test]
        fn sidecar_path_ends_with_v2(stem in "[a-z]{1,10}") {
            let input = PathBuf::from(format!("/tmp/{stem}.jsonl"));
            let result = v2_sidecar_path(&input);
            let name = result.file_name().unwrap().to_str().unwrap();
            assert_eq!(
                Path::new(name).extension().and_then(|ext| ext.to_str()),
                Some("v2"),
                "expected .v2 suffix, got {name}"
            );
        }

        #[test]
        fn sidecar_path_preserves_parent(stem in "[a-z]{1,10}", dir in "[a-z]{1,8}") {
            let input = PathBuf::from(format!("/tmp/{dir}/{stem}.jsonl"));
            let result = v2_sidecar_path(&input);
            assert_eq!(
                result.parent().unwrap(),
                Path::new(&format!("/tmp/{dir}"))
            );
        }

        #[test]
        fn sidecar_path_deterministic(stem in "[a-z]{1,10}") {
            let input = PathBuf::from(format!("/sessions/{stem}.jsonl"));
            assert_eq!(v2_sidecar_path(&input), v2_sidecar_path(&input));
        }

        #[test]
        fn sidecar_path_contains_stem(stem in "[a-z]{1,10}") {
            let input = PathBuf::from(format!("/tmp/{stem}.jsonl"));
            let result = v2_sidecar_path(&input);
            let name = result.file_name().unwrap().to_str().unwrap();
            assert_eq!(name, format!("{stem}.v2"));
        }
    }
}