Skip to main content

pi/
session_store_v2.rs

1//! Session Store V2 segmented append log + sidecar index primitives.
2//!
3//! This module provides the storage core requested by Phase-2 performance work:
4//! - Segment append writer
5//! - Sidecar offset index rows
6//! - Reader helpers
7//! - Integrity validation (checksum + payload hash)
8
9use crate::error::{Error, Result};
10use crate::session::SessionEntry;
11use serde::{Deserialize, Serialize};
12use serde_json::Value;
13use serde_json::value::RawValue;
14use sha2::{Digest, Sha256};
15use std::borrow::Cow;
16use std::collections::BTreeSet;
17use std::fs::{self, File, OpenOptions};
18use std::io::{BufRead, BufReader, Read, Seek, SeekFrom, Write};
19use std::path::{Path, PathBuf};
20
/// Schema identifier stamped on every segment frame line.
pub const SEGMENT_FRAME_SCHEMA: &str = "pi.session_store_v2.segment_frame.v1";
/// Schema identifier for sidecar offset-index rows.
pub const OFFSET_INDEX_SCHEMA: &str = "pi.session_store_v2.offset_index.v1";
/// Schema identifier for checkpoint snapshot files.
pub const CHECKPOINT_SCHEMA: &str = "pi.session_store_v2.checkpoint.v1";
/// Schema identifier for the store manifest (`manifest.json`).
pub const MANIFEST_SCHEMA: &str = "pi.session_store_v2.manifest.v1";
/// Schema identifier for migration-ledger events.
pub const MIGRATION_EVENT_SCHEMA: &str = "pi.session_store_v2.migration_event.v1";

/// Maximum size for a single frame line (100MB) to prevent OOM on corrupted files.
const MAX_FRAME_READ_BYTES: u64 = 100 * 1024 * 1024;

/// Initial chain hash before any frames are appended (64 hex zeros, i.e. an
/// all-zero SHA-256-sized seed).
const GENESIS_CHAIN_HASH: &str = "0000000000000000000000000000000000000000000000000000000000000000";
32
/// A single append-log frame: one session entry serialized together with its
/// integrity metadata (payload hash + size) and its position in the log.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct SegmentFrame {
    /// Schema identifier (`SEGMENT_FRAME_SCHEMA`).
    pub schema: Cow<'static, str>,
    /// Segment file this frame was written to.
    pub segment_seq: u64,
    /// Position of the frame within its segment (resets to 1 on rotation).
    pub frame_seq: u64,
    /// Monotonic store-wide entry sequence number.
    pub entry_seq: u64,
    /// Caller-supplied identifier of the entry.
    pub entry_id: String,
    /// Parent entry for branch structure; `None` for root entries.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub parent_entry_id: Option<String>,
    /// Caller-supplied entry type label (e.g. "message", "compaction").
    pub entry_type: String,
    /// RFC 3339 timestamp (millisecond precision, UTC) set at frame creation.
    pub timestamp: String,
    /// Payload digest computed by `payload_hash_and_size` (SHA-256, per the
    /// module docs and the `sha2` import).
    pub payload_sha256: String,
    /// Payload size in bytes, as reported by `payload_hash_and_size`.
    pub payload_bytes: u64,
    /// Raw JSON payload kept unparsed so writing the frame does not
    /// re-serialize it.
    pub payload: Box<RawValue>,
}
49
50impl SegmentFrame {
51    fn new(
52        segment_seq: u64,
53        frame_seq: u64,
54        entry_seq: u64,
55        entry_id: String,
56        parent_entry_id: Option<String>,
57        entry_type: String,
58        payload: Box<RawValue>,
59    ) -> Result<Self> {
60        let (payload_sha256, payload_bytes) = payload_hash_and_size(&payload)?;
61        Ok(Self {
62            schema: Cow::Borrowed(SEGMENT_FRAME_SCHEMA),
63            segment_seq,
64            frame_seq,
65            entry_seq,
66            entry_id,
67            parent_entry_id,
68            entry_type,
69            timestamp: chrono::Utc::now().to_rfc3339_opts(chrono::SecondsFormat::Millis, true),
70            payload_sha256,
71            payload_bytes,
72            payload,
73        })
74    }
75}
76
/// One sidecar index row: locates a single frame line inside a segment file
/// and carries a line checksum for integrity verification.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct OffsetIndexEntry {
    /// Schema identifier (`OFFSET_INDEX_SCHEMA`).
    pub schema: Cow<'static, str>,
    /// Store-wide entry sequence number of the indexed frame.
    pub entry_seq: u64,
    /// Entry identifier of the indexed frame.
    pub entry_id: String,
    /// Segment file containing the frame.
    pub segment_seq: u64,
    /// Frame position within the segment.
    pub frame_seq: u64,
    /// Byte offset of the frame line within the segment file.
    pub byte_offset: u64,
    /// Length of the frame line in bytes.
    pub byte_length: u64,
    /// Checksum of the written line, produced by `crc32c_upper`.
    pub crc32c: String,
    /// Row state; written as "active" on append.
    pub state: Cow<'static, str>,
}
90
/// Current head position of the store (last written entry).
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
#[serde(rename_all = "camelCase")]
pub struct StoreHead {
    /// Segment the head entry lives in.
    pub segment_seq: u64,
    /// Sequence number of the head entry.
    pub entry_seq: u64,
    /// Identifier of the head entry.
    pub entry_id: String,
}
99
/// Periodic checkpoint snapshot metadata.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct Checkpoint {
    /// Schema identifier (`CHECKPOINT_SCHEMA`).
    pub schema: String,
    /// Checkpoint number chosen by the caller.
    pub checkpoint_seq: u64,
    /// RFC 3339 creation timestamp (millisecond precision, UTC).
    pub at: String,
    /// Entry sequence at the head when the checkpoint was taken.
    pub head_entry_seq: u64,
    /// Entry ID at the head when the checkpoint was taken.
    pub head_entry_id: String,
    /// Store-relative path of the checkpoint file
    /// (`checkpoints/<seq>.json`).
    pub snapshot_ref: String,
    /// Entries below this sequence were compacted away; written as 0 by
    /// `create_checkpoint`.
    pub compacted_before_entry_seq: u64,
    /// Running hash-chain value at checkpoint time; used by
    /// `rollback_to_checkpoint` to verify the restored state.
    pub chain_hash: String,
    /// Free-form reason supplied by the caller.
    pub reason: String,
}
114
/// Top-level manifest describing the store: identity, head position,
/// aggregate counters, file layout, integrity digests, and invariant checks.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct Manifest {
    /// Schema identifier (`MANIFEST_SCHEMA`).
    pub schema: String,
    /// Store format version; written as 2 by `write_manifest`.
    pub store_version: u8,
    /// Session this store belongs to.
    pub session_id: String,
    /// Format label of the data the store was created from.
    pub source_format: String,
    /// Creation timestamp; preserved from an existing manifest on rewrite.
    pub created_at: String,
    /// Timestamp of the latest manifest write.
    pub updated_at: String,
    /// Head position at manifest time.
    pub head: StoreHead,
    /// Aggregate counters recomputed from the index.
    pub counters: ManifestCounters,
    /// On-disk layout pointers.
    pub files: ManifestFiles,
    /// Integrity digests.
    pub integrity: ManifestIntegrity,
    /// Invariant check results.
    pub invariants: ManifestInvariants,
}
130
/// Aggregate counters recomputed from the offset index when the manifest is
/// written.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct ManifestCounters {
    /// Total indexed entries.
    pub entries_total: u64,
    /// Entries whose type is "message".
    pub messages_total: u64,
    /// Branch points: parents referenced by more than one child.
    pub branches_total: u64,
    /// Entries whose type is "compaction".
    pub compactions_total: u64,
    /// Total bytes written across all segments.
    pub bytes_total: u64,
}
140
/// On-disk layout pointers recorded in the manifest.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct ManifestFiles {
    /// Directory containing segment files.
    pub segment_dir: String,
    /// Number of distinct segments referenced by the index.
    pub segment_count: u64,
    /// Path of the sidecar offset index.
    pub index_path: String,
    /// Directory containing checkpoint snapshots.
    pub checkpoint_dir: String,
    /// Path of the migration event ledger.
    pub migration_ledger_path: String,
}
150
/// Integrity digests captured in the manifest.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct ManifestIntegrity {
    /// Running payload hash chain at manifest time.
    pub chain_hash: String,
    /// Digest of the manifest itself — presumably computed over the
    /// serialized manifest by `write_manifest`; TODO confirm (the writer is
    /// outside this chunk).
    pub manifest_hash: String,
    /// CRC32-C of the most recently written frame line.
    pub last_crc32c: String,
}
158
/// Boolean invariant checks recomputed when the manifest is written.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
#[allow(clippy::struct_excessive_bools)] // invariants are naturally boolean checks
pub struct ManifestInvariants {
    /// Every referenced parent entry appeared earlier in the log.
    pub parent_links_closed: bool,
    /// Index rows have strictly increasing `entry_seq`.
    pub monotonic_entry_seq: bool,
    /// Index rows have non-decreasing `segment_seq`.
    pub monotonic_segment_seq: bool,
    /// Index byte ranges fall within their segment files — computed outside
    /// this chunk; NOTE(review): verify in `write_manifest`.
    pub index_within_segment_bounds: bool,
    /// Branch head entries are present in the index — computed outside this
    /// chunk; NOTE(review): verify in `write_manifest`.
    pub branch_heads_indexed: bool,
    /// Checkpoint sequence numbers are monotonic — computed outside this
    /// chunk; NOTE(review): verify in `write_manifest`.
    pub checkpoints_monotonic: bool,
    /// Hash chain recomputed from frames matches the store's running chain.
    pub hash_chain_valid: bool,
}
171
/// Verification results attached to a migration/rollback ledger event.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
#[serde(rename_all = "camelCase")]
pub struct MigrationVerification {
    /// Entry count after the operation matches the expected count.
    pub entry_count_match: bool,
    /// Hash chain after the operation matches the expected chain.
    pub hash_chain_match: bool,
    /// Offset index passed integrity validation.
    pub index_consistent: bool,
}
179
/// One row of the migration ledger (`migrations/ledger.jsonl`).
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
#[serde(rename_all = "camelCase")]
pub struct MigrationEvent {
    /// Schema identifier; filled with `MIGRATION_EVENT_SCHEMA` on append
    /// when left empty.
    pub schema: String,
    /// Caller-chosen identifier of the migration run.
    pub migration_id: String,
    /// Phase label, e.g. "rollback".
    pub phase: String,
    /// RFC 3339 timestamp; filled in on append when left empty.
    pub at: String,
    /// Where the data came from (path or snapshot reference).
    pub source_path: String,
    /// Where the data went (store root).
    pub target_path: String,
    /// Format label of the source, e.g. "native_v2".
    pub source_format: String,
    /// Format label of the target, e.g. "native_v2".
    pub target_format: String,
    /// Post-operation verification results.
    pub verification: MigrationVerification,
    /// "ok", "recoverable_error", or "fatal_error".
    pub outcome: String,
    /// Error classification when the outcome is not "ok".
    #[serde(skip_serializing_if = "Option::is_none")]
    pub error_class: Option<String>,
    /// Correlation ID for tracing the operation across logs.
    pub correlation_id: String,
}
197
/// Compact summary of the offset index contents.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
#[serde(rename_all = "camelCase")]
pub struct IndexSummary {
    /// Number of rows in the index.
    pub entry_count: u64,
    /// Lowest entry sequence present.
    pub first_entry_seq: u64,
    /// Highest entry sequence present.
    pub last_entry_seq: u64,
    /// Entry ID of the last row.
    pub last_entry_id: String,
}
206
/// Segmented append-only session store: frames go to `segments/*.seg`,
/// offsets to a JSONL sidecar index, with checkpoint snapshots and a
/// migration ledger alongside.
#[derive(Debug, Clone)]
pub struct SessionStoreV2 {
    /// Store root directory (contains `segments/`, `index/`, `checkpoints/`,
    /// `migrations/`, `tmp/`).
    root: PathBuf,
    /// Size cap per segment file; an append that would exceed it rotates to
    /// a new segment first.
    max_segment_bytes: u64,
    /// Segment currently being appended to.
    next_segment_seq: u64,
    /// Frame number the next append will use within the current segment.
    next_frame_seq: u64,
    /// Store-wide entry sequence the next append will use.
    next_entry_seq: u64,
    /// Bytes written to the current segment so far.
    current_segment_bytes: u64,
    /// Running SHA-256 hash chain: `H(prev_chain || payload_sha256)`.
    chain_hash: String,
    /// Total bytes written across all segments.
    total_bytes: u64,
    /// Last entry ID written (for head tracking).
    last_entry_id: Option<String>,
    /// Last CRC32-C written (for integrity checkpoints).
    last_crc32c: String,
}
224
225impl SessionStoreV2 {
    /// Create (or re-open) a store rooted at `root`, building the on-disk
    /// directory layout and recovering in-memory counters from any
    /// pre-existing segments and index.
    ///
    /// Recovery policy: recoverable index errors (per
    /// `is_recoverable_index_error`) trigger an index rebuild from the
    /// segment frames followed by a second bootstrap; any other error is
    /// fatal. An empty index alongside non-empty segments also triggers a
    /// rebuild, so a resumed session does not appear empty.
    ///
    /// # Errors
    /// Returns a validation error when `max_segment_bytes` is 0, and
    /// propagates filesystem, bootstrap, and integrity-validation failures.
    pub fn create(root: impl AsRef<Path>, max_segment_bytes: u64) -> Result<Self> {
        if max_segment_bytes == 0 {
            return Err(Error::validation("max_segment_bytes must be > 0"));
        }

        let root = root.as_ref().to_path_buf();
        // Full directory layout; create_dir_all is idempotent on re-open.
        fs::create_dir_all(root.join("segments"))?;
        fs::create_dir_all(root.join("index"))?;
        fs::create_dir_all(root.join("checkpoints"))?;
        fs::create_dir_all(root.join("migrations"))?;
        fs::create_dir_all(root.join("tmp"))?;

        let mut store = Self {
            root,
            max_segment_bytes,
            next_segment_seq: 1,
            next_frame_seq: 1,
            next_entry_seq: 1,
            current_segment_bytes: 0,
            chain_hash: GENESIS_CHAIN_HASH.to_string(),
            total_bytes: 0,
            last_entry_id: None,
            last_crc32c: "00000000".to_string(),
        };
        if let Err(err) = store.bootstrap_from_disk() {
            if is_recoverable_index_error(&err) {
                tracing::warn!(
                    root = %store.root.display(),
                    error = %err,
                    "SessionStoreV2 bootstrap failed with recoverable index error; attempting index rebuild"
                );
                store.rebuild_index()?;
                store.bootstrap_from_disk()?;
            } else {
                return Err(err);
            }
        }

        // Recovery path: segments exist but index file is missing or empty.
        // Rebuild from segment frames so resume does not appear as an empty session.
        if store.entry_count() == 0 && store.segments_exist_with_data()? {
            tracing::warn!(
                root = %store.root.display(),
                "SessionStoreV2 detected segment data with empty index; rebuilding index"
            );
            store.rebuild_index()?;
            store.bootstrap_from_disk()?;
        }

        // Final gate: the store must validate (or become valid after one
        // rebuild) before it is handed to the caller.
        if let Err(err) = store.validate_integrity() {
            if is_recoverable_index_error(&err) {
                tracing::warn!(
                    root = %store.root.display(),
                    error = %err,
                    "SessionStoreV2 integrity validation failed with recoverable error; rebuilding index"
                );
                store.rebuild_index()?;
                store.bootstrap_from_disk()?;
                store.validate_integrity()?;
            } else {
                return Err(err);
            }
        }
        Ok(store)
    }
291
292    pub fn segment_file_path(&self, segment_seq: u64) -> PathBuf {
293        self.root
294            .join("segments")
295            .join(format!("{segment_seq:016}.seg"))
296    }
297
298    pub fn index_file_path(&self) -> PathBuf {
299        self.root.join("index").join("offsets.jsonl")
300    }
301
    /// Path of the store manifest (`<root>/manifest.json`).
    fn manifest_path(&self) -> PathBuf {
        self.root.join("manifest.json")
    }
305
    /// Path of the migration event ledger (`migrations/ledger.jsonl`).
    fn migration_ledger_path(&self) -> PathBuf {
        self.root.join("migrations").join("ledger.jsonl")
    }
309
310    fn list_segment_files(&self) -> Result<Vec<(u64, PathBuf)>> {
311        let segments_dir = self.root.join("segments");
312        if !segments_dir.exists() {
313            return Ok(Vec::new());
314        }
315
316        let mut segment_files = Vec::new();
317        for entry in fs::read_dir(segments_dir)? {
318            let entry = entry?;
319            let path = entry.path();
320            if path.extension().and_then(|ext| ext.to_str()) != Some("seg") {
321                continue;
322            }
323            let Some(stem) = path.file_stem().and_then(|s| s.to_str()) else {
324                continue;
325            };
326            let Ok(segment_seq) = stem.parse::<u64>() else {
327                continue;
328            };
329            segment_files.push((segment_seq, path));
330        }
331        segment_files.sort_by_key(|(segment_seq, _)| *segment_seq);
332        Ok(segment_files)
333    }
334
335    fn segments_exist_with_data(&self) -> Result<bool> {
336        for (_, path) in self.list_segment_files()? {
337            if fs::metadata(path)?.len() > 0 {
338                return Ok(true);
339            }
340        }
341        Ok(false)
342    }
343
    /// Append one entry to the log and record it in the sidecar offset index.
    ///
    /// The payload is serialized once up front; if the resulting line would
    /// push the current (non-empty) segment past `max_segment_bytes`, the
    /// writer rotates to a new segment and rebuilds the frame so its
    /// segment/frame sequence fields match the new location. If the index
    /// append fails after the segment write, the segment file is truncated
    /// back so segment and index never disagree.
    ///
    /// Returns the index row describing where the frame landed.
    ///
    /// # Errors
    /// Fails on payload serialization errors, filesystem errors, or
    /// sequence-counter overflow.
    #[allow(clippy::needless_pass_by_value)]
    pub fn append_entry(
        &mut self,
        entry_id: impl Into<String>,
        parent_entry_id: Option<String>,
        entry_type: impl Into<String>,
        payload: Value,
    ) -> Result<OffsetIndexEntry> {
        let entry_id = entry_id.into();
        let entry_type = entry_type.into();

        // Convert the generic Value into a RawValue (string slice) to avoid
        // re-serializing the payload when writing the full frame.
        // We do this by first serializing the Value to a string, then
        // creating a Box<RawValue> from it.
        let raw_string = serde_json::to_string(&payload)?;
        let raw_payload = RawValue::from_string(raw_string)
            .map_err(|e| Error::session(format!("failed to convert payload to RawValue: {e}")))?;

        let mut frame = SegmentFrame::new(
            self.next_segment_seq,
            self.next_frame_seq,
            self.next_entry_seq,
            entry_id,
            parent_entry_id,
            entry_type,
            raw_payload,
        )?;
        let mut encoded = serde_json::to_vec(&frame)?;
        let mut line_len = line_length_u64(&encoded)?;

        // Segment rotation: only when the current segment already holds data
        // (a single oversized entry still goes into an empty segment).
        if self.current_segment_bytes > 0
            && self.current_segment_bytes.saturating_add(line_len) > self.max_segment_bytes
        {
            self.next_segment_seq = self
                .next_segment_seq
                .checked_add(1)
                .ok_or_else(|| Error::session("segment sequence overflow"))?;
            self.next_frame_seq = 1;
            self.current_segment_bytes = 0;

            // Re-create the frame so segment_seq/frame_seq reflect the new
            // segment, then re-encode it.
            frame = SegmentFrame::new(
                self.next_segment_seq,
                self.next_frame_seq,
                self.next_entry_seq,
                frame.entry_id.clone(),
                frame.parent_entry_id.clone(),
                frame.entry_type.clone(),
                frame.payload.clone(),
            )?;
            encoded = serde_json::to_vec(&frame)?;
            line_len = line_length_u64(&encoded)?;
        }

        let segment_path = self.segment_file_path(self.next_segment_seq);
        let byte_offset = fs::metadata(&segment_path).map_or(0, |meta| meta.len());

        // Prepare the write buffer by appending the newline to the encoded JSON
        let mut write_buf = encoded;
        write_buf.push(b'\n');

        let mut segment = OpenOptions::new()
            .create(true)
            .write(true)
            .truncate(false)
            .open(&segment_path)?;

        // Remember the pre-write length so a failed write can be undone by
        // truncating the file back.
        let pre_write_len = segment.seek(SeekFrom::End(0))?;
        if let Err(e) = segment.write_all(&write_buf) {
            let _ = segment.set_len(pre_write_len);
            return Err(Error::from(e));
        }

        // Use write_buf (which includes the newline) for CRC calculation
        let crc = crc32c_upper(&write_buf);
        let index_entry = OffsetIndexEntry {
            schema: Cow::Borrowed(OFFSET_INDEX_SCHEMA),
            entry_seq: frame.entry_seq,
            entry_id: frame.entry_id.clone(),
            segment_seq: frame.segment_seq,
            frame_seq: frame.frame_seq,
            byte_offset,
            byte_length: line_len,
            crc32c: crc.clone(),
            state: Cow::Borrowed("active"),
        };

        if let Err(e) = append_jsonl_line(&self.index_file_path(), &index_entry) {
            // Rollback: truncate segment to remove the unindexed frame.
            let _ = segment.set_len(pre_write_len);
            return Err(e);
        }

        // Commit in-memory state only after both segment and index writes
        // have succeeded.
        self.chain_hash = chain_hash_step(&self.chain_hash, &frame.payload_sha256);
        self.total_bytes = self.total_bytes.saturating_add(line_len);
        self.last_entry_id = Some(frame.entry_id);
        self.last_crc32c = crc;

        self.next_entry_seq = self
            .next_entry_seq
            .checked_add(1)
            .ok_or_else(|| Error::session("entry sequence overflow"))?;
        self.next_frame_seq = self
            .next_frame_seq
            .checked_add(1)
            .ok_or_else(|| Error::session("frame sequence overflow"))?;
        self.current_segment_bytes = self.current_segment_bytes.saturating_add(line_len);

        Ok(index_entry)
    }
454
455    pub fn read_segment(&self, segment_seq: u64) -> Result<Vec<SegmentFrame>> {
456        let path = self.segment_file_path(segment_seq);
457        if !path.exists() {
458            return Ok(Vec::new());
459        }
460        read_jsonl::<SegmentFrame>(&path)
461    }
462
463    pub fn read_index(&self) -> Result<Vec<OffsetIndexEntry>> {
464        let path = self.index_file_path();
465        if !path.exists() {
466            return Ok(Vec::new());
467        }
468        read_jsonl::<OffsetIndexEntry>(&path)
469    }
470
471    /// Seek to a specific entry by `entry_seq` using the offset index.
472    /// Returns `None` if the entry is not found.
473    pub fn lookup_entry(&self, target_entry_seq: u64) -> Result<Option<SegmentFrame>> {
474        let index_rows = self.read_index()?;
475        let row = index_rows.iter().find(|r| r.entry_seq == target_entry_seq);
476        let Some(row) = row else {
477            return Ok(None);
478        };
479        seek_read_frame(self, row)
480    }
481
482    /// Read all entries with `entry_seq >= from_entry_seq` (tail reading).
483    pub fn read_entries_from(&self, from_entry_seq: u64) -> Result<Vec<SegmentFrame>> {
484        let index_rows = self.read_index()?;
485        let mut frames = Vec::new();
486        for row in &index_rows {
487            if row.entry_seq < from_entry_seq {
488                continue;
489            }
490            if let Some(frame) = seek_read_frame(self, row)? {
491                frames.push(frame);
492            }
493        }
494        Ok(frames)
495    }
496
    /// Read all entries across all segments in entry_seq order.
    ///
    /// Entry sequences start at 1, so this is `read_entries_from(1)`.
    pub fn read_all_entries(&self) -> Result<Vec<SegmentFrame>> {
        self.read_entries_from(1)
    }
501
502    /// Read the last `count` entries by entry_seq using the offset index.
503    pub fn read_tail_entries(&self, count: u64) -> Result<Vec<SegmentFrame>> {
504        let index_rows = self.read_index()?;
505        let total = index_rows.len();
506        let skip = total.saturating_sub(usize::try_from(count).unwrap_or(usize::MAX));
507        let mut frames = Vec::with_capacity(total.saturating_sub(skip));
508        for row in &index_rows[skip..] {
509            if let Some(frame) = seek_read_frame(self, row)? {
510                frames.push(frame);
511            }
512        }
513        Ok(frames)
514    }
515
516    /// Read entries on the active branch from `leaf_entry_id` back to root.
517    /// Returns frames in root→leaf order.
518    pub fn read_active_path(&self, leaf_entry_id: &str) -> Result<Vec<SegmentFrame>> {
519        let index_rows = self.read_index()?;
520        let id_to_row: std::collections::HashMap<&str, &OffsetIndexEntry> = index_rows
521            .iter()
522            .map(|row| (row.entry_id.as_str(), row))
523            .collect();
524
525        let mut frames = Vec::new();
526        let mut current_id: Option<String> = Some(leaf_entry_id.to_string());
527        while let Some(ref entry_id) = current_id {
528            let row = id_to_row.get(entry_id.as_str());
529            let row = match row {
530                Some(r) => *r,
531                None => break,
532            };
533            match seek_read_frame(self, row)? {
534                Some(frame) => {
535                    current_id.clone_from(&frame.parent_entry_id);
536                    frames.push(frame);
537                }
538                None => break,
539            }
540        }
541        frames.reverse();
542        Ok(frames)
543    }
544
    /// Total number of entries appended so far.
    ///
    /// Derived from the next entry sequence (which starts at 1); saturates
    /// to 0 for an empty store.
    pub const fn entry_count(&self) -> u64 {
        self.next_entry_seq.saturating_sub(1)
    }
549
550    /// Current head position, or `None` if the store is empty.
551    pub fn head(&self) -> Option<StoreHead> {
552        self.last_entry_id.as_ref().map(|entry_id| StoreHead {
553            segment_seq: self.next_segment_seq,
554            entry_seq: self.next_entry_seq.saturating_sub(1),
555            entry_id: entry_id.clone(),
556        })
557    }
558
559    fn checkpoint_path(&self, checkpoint_seq: u64) -> PathBuf {
560        self.root
561            .join("checkpoints")
562            .join(format!("{checkpoint_seq:016}.json"))
563    }
564
565    /// Create a checkpoint snapshot at the current head.
566    pub fn create_checkpoint(&self, checkpoint_seq: u64, reason: &str) -> Result<Checkpoint> {
567        let head = self.head().unwrap_or(StoreHead {
568            segment_seq: 0,
569            entry_seq: 0,
570            entry_id: String::new(),
571        });
572        let snapshot_ref = format!("checkpoints/{checkpoint_seq:016}.json");
573        let checkpoint = Checkpoint {
574            schema: CHECKPOINT_SCHEMA.to_string(),
575            checkpoint_seq,
576            at: chrono::Utc::now().to_rfc3339_opts(chrono::SecondsFormat::Millis, true),
577            head_entry_seq: head.entry_seq,
578            head_entry_id: head.entry_id,
579            snapshot_ref,
580            compacted_before_entry_seq: 0,
581            chain_hash: self.chain_hash.clone(),
582            reason: reason.to_string(),
583        };
584        let tmp_path = self
585            .root
586            .join("tmp")
587            .join(format!("{checkpoint_seq:016}.json.tmp"));
588        fs::write(&tmp_path, serde_json::to_vec_pretty(&checkpoint)?)?;
589        fs::rename(&tmp_path, self.checkpoint_path(checkpoint_seq))?;
590        Ok(checkpoint)
591    }
592
593    /// Read a checkpoint by sequence number.
594    pub fn read_checkpoint(&self, checkpoint_seq: u64) -> Result<Option<Checkpoint>> {
595        let path = self.checkpoint_path(checkpoint_seq);
596        if !path.exists() {
597            return Ok(None);
598        }
599        let data = fs::read_to_string(&path)?;
600        let cp: Checkpoint = serde_json::from_str(&data)?;
601        Ok(Some(cp))
602    }
603
604    pub fn append_migration_event(&self, mut event: MigrationEvent) -> Result<()> {
605        if event.schema.is_empty() {
606            event.schema = MIGRATION_EVENT_SCHEMA.to_string();
607        }
608        if event.at.is_empty() {
609            event.at = chrono::Utc::now().to_rfc3339_opts(chrono::SecondsFormat::Millis, true);
610        }
611        append_jsonl_line(&self.migration_ledger_path(), &event)
612    }
613
614    pub fn read_migration_events(&self) -> Result<Vec<MigrationEvent>> {
615        let path = self.migration_ledger_path();
616        if !path.exists() {
617            return Ok(Vec::new());
618        }
619        read_jsonl::<MigrationEvent>(&path)
620    }
621
622    #[allow(clippy::too_many_lines)]
623    pub fn rollback_to_checkpoint(
624        &mut self,
625        checkpoint_seq: u64,
626        migration_id: impl Into<String>,
627        correlation_id: impl Into<String>,
628    ) -> Result<MigrationEvent> {
629        let migration_id = migration_id.into();
630        let correlation_id = correlation_id.into();
631
632        let rollback_result: Result<MigrationEvent> = (|| {
633            let checkpoint = self
634                .read_checkpoint(checkpoint_seq)?
635                .ok_or_else(|| Error::session(format!("checkpoint {checkpoint_seq} not found")))?;
636
637            let mut index_rows = self.read_index()?;
638            index_rows.retain(|row| row.entry_seq <= checkpoint.head_entry_seq);
639
640            let mut keep_len_by_segment: std::collections::HashMap<u64, u64> =
641                std::collections::HashMap::new();
642            for row in &index_rows {
643                let end = row
644                    .byte_offset
645                    .checked_add(row.byte_length)
646                    .ok_or_else(|| Error::session("index byte range overflow during rollback"))?;
647                keep_len_by_segment
648                    .entry(row.segment_seq)
649                    .and_modify(|current| *current = (*current).max(end))
650                    .or_insert(end);
651            }
652
653            let segments_dir = self.root.join("segments");
654            if segments_dir.exists() {
655                let mut segment_files: Vec<(u64, PathBuf)> = Vec::new();
656                for entry in fs::read_dir(&segments_dir)? {
657                    let entry = entry?;
658                    let path = entry.path();
659                    if path.extension().and_then(|ext| ext.to_str()) != Some("seg") {
660                        continue;
661                    }
662                    let Some(stem) = path.file_stem().and_then(|s| s.to_str()) else {
663                        continue;
664                    };
665                    let Ok(segment_seq) = stem.parse::<u64>() else {
666                        continue;
667                    };
668                    segment_files.push((segment_seq, path));
669                }
670                segment_files.sort_by_key(|(segment_seq, _)| *segment_seq);
671
672                for (segment_seq, path) in segment_files {
673                    match keep_len_by_segment.get(&segment_seq).copied() {
674                        Some(keep_len) if keep_len > 0 => {
675                            let current_len = fs::metadata(&path)?.len();
676                            if keep_len < current_len {
677                                let file = OpenOptions::new().write(true).open(&path)?;
678                                file.set_len(keep_len)?;
679                            }
680                        }
681                        _ => {
682                            fs::remove_file(&path)?;
683                        }
684                    }
685                }
686            }
687
688            let index_path = self.index_file_path();
689            let index_tmp = self.root.join("tmp").join("offsets.rollback.tmp");
690            write_jsonl_lines(&index_tmp, &index_rows)?;
691            fs::rename(index_tmp, index_path)?;
692
693            self.next_segment_seq = 1;
694            self.next_frame_seq = 1;
695            self.next_entry_seq = 1;
696            self.current_segment_bytes = 0;
697            self.bootstrap_from_disk()?;
698
699            let verification = MigrationVerification {
700                entry_count_match: self.entry_count() == checkpoint.head_entry_seq,
701                hash_chain_match: self.chain_hash == checkpoint.chain_hash,
702                index_consistent: self.validate_integrity().is_ok(),
703            };
704
705            let (outcome, error_class) = if verification.entry_count_match
706                && verification.hash_chain_match
707                && verification.index_consistent
708            {
709                ("ok".to_string(), None)
710            } else if verification.index_consistent {
711                (
712                    "recoverable_error".to_string(),
713                    Some("integrity_mismatch".to_string()),
714                )
715            } else {
716                (
717                    "fatal_error".to_string(),
718                    Some("index_corruption".to_string()),
719                )
720            };
721
722            let event = MigrationEvent {
723                schema: MIGRATION_EVENT_SCHEMA.to_string(),
724                migration_id: migration_id.clone(),
725                phase: "rollback".to_string(),
726                at: chrono::Utc::now().to_rfc3339_opts(chrono::SecondsFormat::Millis, true),
727                source_path: checkpoint.snapshot_ref,
728                target_path: self.root.display().to_string(),
729                source_format: "native_v2".to_string(),
730                target_format: "native_v2".to_string(),
731                verification,
732                outcome: outcome.clone(),
733                error_class,
734                correlation_id: correlation_id.clone(),
735            };
736            self.append_migration_event(event.clone())?;
737
738            if outcome == "ok" {
739                Ok(event)
740            } else {
741                Err(Error::session(format!(
742                    "rollback verification failed for checkpoint {checkpoint_seq}"
743                )))
744            }
745        })();
746
747        match rollback_result {
748            Ok(event) => Ok(event),
749            Err(error) => {
750                if !rollback_failure_event_already_recorded(&error) {
751                    let failure_event = MigrationEvent {
752                        schema: MIGRATION_EVENT_SCHEMA.to_string(),
753                        migration_id,
754                        phase: "rollback".to_string(),
755                        at: chrono::Utc::now().to_rfc3339_opts(chrono::SecondsFormat::Millis, true),
756                        source_path: self.checkpoint_path(checkpoint_seq).display().to_string(),
757                        target_path: self.root.display().to_string(),
758                        source_format: "native_v2".to_string(),
759                        target_format: "native_v2".to_string(),
760                        verification: MigrationVerification {
761                            entry_count_match: false,
762                            hash_chain_match: false,
763                            index_consistent: false,
764                        },
765                        outcome: "fatal_error".to_string(),
766                        error_class: Some(classify_rollback_error(&error).to_string()),
767                        correlation_id,
768                    };
769                    let _ = self.append_migration_event(failure_event);
770                }
771                Err(error)
772            }
773        }
774    }
775
776    #[allow(clippy::too_many_lines)]
777    pub fn write_manifest(
778        &self,
779        session_id: impl Into<String>,
780        source_format: impl Into<String>,
781    ) -> Result<Manifest> {
782        let now = chrono::Utc::now().to_rfc3339_opts(chrono::SecondsFormat::Millis, true);
783        let created_at = self
784            .read_manifest()?
785            .map_or_else(|| now.clone(), |m| m.created_at);
786        let session_id = session_id.into();
787        let source_format = source_format.into();
788        let index_rows = self.read_index()?;
789
790        let mut parent_counts: std::collections::HashMap<String, u64> =
791            std::collections::HashMap::new();
792        let mut message_count = 0u64;
793        let mut compaction_count = 0u64;
794        let mut entry_ids = std::collections::HashSet::with_capacity(index_rows.len());
795
796        let mut recomputed_chain = GENESIS_CHAIN_HASH.to_string();
797        let mut parent_links_closed = true;
798
799        for row in &index_rows {
800            if let Some(frame) = seek_read_frame(self, row)? {
801                entry_ids.insert(frame.entry_id.clone());
802
803                if frame.entry_type == "message" {
804                    message_count = message_count.saturating_add(1);
805                }
806                if frame.entry_type == "compaction" {
807                    compaction_count = compaction_count.saturating_add(1);
808                }
809
810                if let Some(parent_id) = frame.parent_entry_id.as_deref() {
811                    *parent_counts.entry(parent_id.to_string()).or_insert(0) += 1;
812
813                    // In a valid append-only log, the parent must have appeared
814                    // (and thus been added to entry_ids) before the child.
815                    if !entry_ids.contains(parent_id) {
816                        parent_links_closed = false;
817                    }
818                }
819
820                recomputed_chain = chain_hash_step(&recomputed_chain, &frame.payload_sha256);
821            }
822        }
823
824        let branches_total = u64::try_from(parent_counts.values().filter(|&&n| n > 1).count())
825            .map_err(|_| Error::session("branch count exceeds u64"))?;
826
827        let mut monotonic_entry_seq = true;
828        let mut monotonic_segment_seq = true;
829        let mut last_entry_seq = 0u64;
830        let mut last_segment_seq = 0u64;
831        for row in &index_rows {
832            if row.entry_seq <= last_entry_seq {
833                monotonic_entry_seq = false;
834            }
835            if row.segment_seq < last_segment_seq {
836                monotonic_segment_seq = false;
837            }
838            last_entry_seq = row.entry_seq;
839            last_segment_seq = row.segment_seq;
840        }
841
842        let hash_chain_valid = recomputed_chain == self.chain_hash;
843
844        let head = self.head().unwrap_or(StoreHead {
845            segment_seq: 0,
846            entry_seq: 0,
847            entry_id: String::new(),
848        });
849        let segment_count = u64::try_from(
850            index_rows
851                .iter()
852                .map(|row| row.segment_seq)
853                .collect::<BTreeSet<_>>()
854                .len(),
855        )
856        .map_err(|_| Error::session("segment count exceeds u64"))?;
857
858        let mut manifest = Manifest {
859            schema: MANIFEST_SCHEMA.to_string(),
860            store_version: 2,
861            session_id,
862            source_format,
863            created_at,
864            updated_at: now,
865            head,
866            counters: ManifestCounters {
867                entries_total: u64::try_from(index_rows.len())
868                    .map_err(|_| Error::session("entry count exceeds u64"))?,
869                messages_total: message_count,
870                branches_total,
871                compactions_total: compaction_count,
872                bytes_total: self.total_bytes,
873            },
874            files: ManifestFiles {
875                segment_dir: "segments/".to_string(),
876                segment_count,
877                index_path: "index/offsets.jsonl".to_string(),
878                checkpoint_dir: "checkpoints/".to_string(),
879                migration_ledger_path: "migrations/ledger.jsonl".to_string(),
880            },
881            integrity: ManifestIntegrity {
882                chain_hash: self.chain_hash.clone(),
883                manifest_hash: String::new(),
884                last_crc32c: self.last_crc32c.clone(),
885            },
886            invariants: ManifestInvariants {
887                parent_links_closed,
888                monotonic_entry_seq,
889                monotonic_segment_seq,
890                index_within_segment_bounds: self.validate_integrity().is_ok(),
891                branch_heads_indexed: true,
892                checkpoints_monotonic: true,
893                hash_chain_valid,
894            },
895        };
896        manifest.integrity.manifest_hash = manifest_hash_hex(&manifest)?;
897
898        let tmp = self.root.join("tmp").join("manifest.json.tmp");
899        fs::write(&tmp, serde_json::to_vec_pretty(&manifest)?)?;
900        fs::rename(&tmp, self.manifest_path())?;
901        Ok(manifest)
902    }
903
904    pub fn read_manifest(&self) -> Result<Option<Manifest>> {
905        let path = self.manifest_path();
906        if !path.exists() {
907            return Ok(None);
908        }
909        let content = fs::read_to_string(path)?;
910        let manifest: Manifest = serde_json::from_str(&content)?;
911        Ok(Some(manifest))
912    }
913
914    pub fn chain_hash(&self) -> &str {
915        &self.chain_hash
916    }
917
    /// Total bytes of all indexed frame lines (newline included), as
    /// accumulated during append, rebuild, and bootstrap.
    pub const fn total_bytes(&self) -> u64 {
        self.total_bytes
    }
921
922    pub fn index_summary(&self) -> Result<Option<IndexSummary>> {
923        let rows = self.read_index()?;
924        let (Some(first), Some(last)) = (rows.first(), rows.last()) else {
925            return Ok(None);
926        };
927        Ok(Some(IndexSummary {
928            entry_count: u64::try_from(rows.len())
929                .map_err(|_| Error::session("entry count exceeds u64"))?,
930            first_entry_seq: first.entry_seq,
931            last_entry_seq: last.entry_seq,
932            last_entry_id: last.entry_id.clone(),
933        }))
934    }
935
936    /// Rebuild the offset index by scanning all segment files.
937    /// This is the recovery path when the index is missing or corrupted.
938    #[allow(clippy::too_many_lines)]
939    pub fn rebuild_index(&mut self) -> Result<u64> {
940        let mut rebuilt_count = 0u64;
941        let index_path = self.index_file_path();
942        let index_tmp_path = self.root.join("tmp").join("offsets.rebuild.tmp");
943
944        // Ensure tmp dir exists
945        if let Some(parent) = index_tmp_path.parent() {
946            fs::create_dir_all(parent)?;
947        }
948
949        // Start fresh with the temp file
950        if index_tmp_path.exists() {
951            fs::remove_file(&index_tmp_path)?;
952        }
953
954        let mut index_writer = std::io::BufWriter::new(
955            OpenOptions::new()
956                .create(true)
957                .write(true)
958                .truncate(true)
959                .open(&index_tmp_path)?,
960        );
961
962        self.chain_hash = GENESIS_CHAIN_HASH.to_string();
963        self.total_bytes = 0;
964        self.last_entry_id = None;
965        self.last_crc32c = "00000000".to_string();
966
967        let segment_files = self.list_segment_files()?;
968        let mut last_observed_seq = 0u64;
969
970        'segments: for (i, (_seg_seq, seg_path)) in segment_files.iter().enumerate() {
971            let file = File::open(seg_path)?;
972            let mut reader = BufReader::new(file);
973            let mut byte_offset = 0u64;
974            let mut line_number = 0u64;
975            let mut line = String::new();
976
977            loop {
978                line.clear();
979                // Use bounded read to prevent OOM on corrupted files (e.g. missing newlines)
980                let bytes_read = match read_line_with_limit(
981                    &mut reader,
982                    &mut line,
983                    MAX_FRAME_READ_BYTES,
984                ) {
985                    Ok(n) => n,
986                    Err(e) if e.kind() == std::io::ErrorKind::InvalidData => {
987                        // If line exceeds limit, we treat it as corruption and truncate.
988                        // However, we can't easily recover the offset without reading past the bad data.
989                        // For safety, we truncate at the start of this bad frame.
990                        tracing::warn!(
991                            segment = %seg_path.display(),
992                            line_number,
993                            error = %e,
994                            "SessionStoreV2 encountered oversized line during index rebuild; truncating segment"
995                        );
996                        drop(reader);
997                        truncate_file_to(seg_path, byte_offset)?;
998                        remove_orphaned_segments(&segment_files[i + 1..])?;
999                        break 'segments;
1000                    }
1001                    Err(e) => return Err(Error::Io(Box::new(e))),
1002                };
1003
1004                if bytes_read == 0 {
1005                    break;
1006                }
1007                line_number = line_number.saturating_add(1);
1008                let line_len = u64::try_from(bytes_read)
1009                    .map_err(|_| Error::session("line length exceeds u64"))?;
1010
1011                if line.trim().is_empty() {
1012                    byte_offset = byte_offset.saturating_add(line_len);
1013                    continue;
1014                }
1015
1016                let missing_newline = !line.ends_with('\n');
1017                let json_line = line.trim_end_matches('\n').trim_end_matches('\r');
1018                let frame: SegmentFrame = match serde_json::from_str(json_line) {
1019                    Ok(frame) => {
1020                        if missing_newline {
1021                            // If we successfully parsed the JSON but the line is missing a newline,
1022                            // the atomic write (which includes the newline) did not complete.
1023                            // We must discard this frame to ensure the file ends cleanly for future appends.
1024                            tracing::warn!(
1025                                segment = %seg_path.display(),
1026                                line_number,
1027                                "SessionStoreV2 dropping valid but newline-missing trailing segment frame during index rebuild"
1028                            );
1029                            drop(reader);
1030                            truncate_file_to(seg_path, byte_offset)?;
1031                            remove_orphaned_segments(&segment_files[i + 1..])?;
1032                            break 'segments;
1033                        }
1034                        frame
1035                    }
1036                    Err(err) => {
1037                        let at_eof = reader.fill_buf()?.is_empty();
1038                        if at_eof && missing_newline {
1039                            tracing::warn!(
1040                                segment = %seg_path.display(),
1041                                line_number,
1042                                error = %err,
1043                                "SessionStoreV2 dropping truncated trailing segment frame during index rebuild"
1044                            );
1045                            // Trim the incomplete tail so subsequent reads and appends remain valid.
1046                            drop(reader);
1047                            truncate_file_to(seg_path, byte_offset)?;
1048                            remove_orphaned_segments(&segment_files[i + 1..])?;
1049                            break 'segments;
1050                        }
1051                        // Non-EOF corruption: fail closed to prevent silent data loss.
1052                        return Err(Error::session(format!(
1053                            "failed to parse segment frame while rebuilding index: \
1054                                                             segment={} line={line_number}: {err}",
1055                            seg_path.display()
1056                        )));
1057                    }
1058                };
1059
1060                if frame.entry_seq <= last_observed_seq {
1061                    tracing::warn!(
1062                        segment = %seg_path.display(),
1063                        line_number,
1064                        entry_seq = frame.entry_seq,
1065                        last_seq = last_observed_seq,
1066                        "SessionStoreV2 detected non-monotonic entry sequence during rebuild; truncating segment"
1067                    );
1068                    drop(reader);
1069                    truncate_file_to(seg_path, byte_offset)?;
1070                    remove_orphaned_segments(&segment_files[i + 1..])?;
1071                    break 'segments;
1072                }
1073                last_observed_seq = frame.entry_seq;
1074
1075                let record_bytes = line.as_bytes().to_vec();
1076                let crc = crc32c_upper(&record_bytes);
1077
1078                let index_entry = OffsetIndexEntry {
1079                    schema: Cow::Borrowed(OFFSET_INDEX_SCHEMA),
1080                    entry_seq: frame.entry_seq,
1081                    entry_id: frame.entry_id.clone(),
1082                    segment_seq: frame.segment_seq,
1083                    frame_seq: frame.frame_seq,
1084                    byte_offset,
1085                    byte_length: line_len,
1086                    crc32c: crc.clone(),
1087                    state: Cow::Borrowed("active"),
1088                };
1089                serde_json::to_writer(&mut index_writer, &index_entry)?;
1090                index_writer.write_all(b"\n")?;
1091
1092                self.chain_hash = chain_hash_step(&self.chain_hash, &frame.payload_sha256);
1093                self.total_bytes = self.total_bytes.saturating_add(line_len);
1094                self.last_entry_id = Some(frame.entry_id);
1095                self.last_crc32c = crc;
1096
1097                byte_offset = byte_offset.saturating_add(line_len);
1098                rebuilt_count = rebuilt_count.saturating_add(1);
1099            }
1100        }
1101
1102        index_writer.flush()?;
1103        drop(index_writer); // Close the file handle before renaming (fixes Windows ERROR_SHARING_VIOLATION)
1104
1105        // Atomically replace the old index with the rebuilt one
1106        fs::rename(&index_tmp_path, &index_path)?;
1107
1108        self.next_segment_seq = 1;
1109        self.next_frame_seq = 1;
1110        self.next_entry_seq = 1;
1111        self.current_segment_bytes = 0;
1112        self.bootstrap_from_disk()?;
1113
1114        Ok(rebuilt_count)
1115    }
1116
    /// Verify the offset index against the segment files on disk.
    ///
    /// Checks, in order: strictly increasing `entry_seq` across index rows,
    /// every row's byte range within its segment file, the CRC32C of the raw
    /// record bytes, index-row/frame header agreement, and the frame's stored
    /// payload hash/size against a recomputation.
    ///
    /// Returns `Ok(())` only if every check passes; the first failure is
    /// reported as a `Session` error.
    pub fn validate_integrity(&self) -> Result<()> {
        let index_rows = self.read_index()?;
        let mut last_entry_seq = 0;

        // Group rows by segment to minimize file opens
        let mut rows_by_segment: std::collections::BTreeMap<u64, Vec<&OffsetIndexEntry>> =
            std::collections::BTreeMap::new();
        for row in &index_rows {
            if row.entry_seq <= last_entry_seq {
                return Err(Error::session(format!(
                    "entry sequence is not strictly increasing at entry_seq={}",
                    row.entry_seq
                )));
            }
            last_entry_seq = row.entry_seq;
            rows_by_segment
                .entry(row.segment_seq)
                .or_default()
                .push(row);
        }

        // BTreeMap iteration visits segments in ascending order; rows within
        // a segment keep their index order, so seeks are mostly forward.
        for (segment_seq, rows) in rows_by_segment {
            let segment_path = self.segment_file_path(segment_seq);
            let mut file = File::open(&segment_path).map_err(|err| {
                Error::session(format!(
                    "failed to open segment {}: {err}",
                    segment_path.display()
                ))
            })?;
            let segment_len = file.metadata()?.len();

            for row in rows {
                // Bounds check before any read: offset + length must not
                // overflow and must fit inside the segment file.
                let end = row
                    .byte_offset
                    .checked_add(row.byte_length)
                    .ok_or_else(|| Error::session("index byte range overflow"))?;
                if end > segment_len {
                    return Err(Error::session(format!(
                        "index out of bounds for segment {}: end={} len={segment_len}",
                        segment_path.display(),
                        end
                    )));
                }

                file.seek(SeekFrom::Start(row.byte_offset))?;
                let mut record_bytes = vec![
                    0u8;
                    usize::try_from(row.byte_length).map_err(|_| {
                        Error::session(format!("byte length too large: {}", row.byte_length))
                    })?
                ];
                file.read_exact(&mut record_bytes)?;

                // CRC is computed over the raw bytes including the trailing
                // newline, matching how the index row was produced.
                let checksum = crc32c_upper(&record_bytes);
                if checksum != row.crc32c {
                    return Err(Error::session(format!(
                        "checksum mismatch for entry_seq={} expected={} actual={checksum}",
                        row.entry_seq, row.crc32c
                    )));
                }

                // Strip the newline only for JSON parsing; it was part of the
                // CRC input above.
                if record_bytes.last() == Some(&b'\n') {
                    record_bytes.pop();
                }
                let frame: SegmentFrame = serde_json::from_slice(&record_bytes)?;

                // The frame's own header must agree with the index row that
                // located it.
                if frame.entry_seq != row.entry_seq
                    || frame.entry_id != row.entry_id
                    || frame.segment_seq != row.segment_seq
                    || frame.frame_seq != row.frame_seq
                {
                    return Err(Error::session(format!(
                        "index/frame mismatch at entry_seq={}",
                        row.entry_seq
                    )));
                }

                // Recompute payload hash/size and compare with the values
                // recorded at append time.
                let (payload_hash, payload_bytes) = payload_hash_and_size(&frame.payload)?;
                if frame.payload_sha256 != payload_hash || frame.payload_bytes != payload_bytes {
                    return Err(Error::session(format!(
                        "payload integrity mismatch at entry_seq={}",
                        row.entry_seq
                    )));
                }
            }
        }

        Ok(())
    }
1206
    /// Re-derive in-memory state from the on-disk offset index: next
    /// sequence counters, active-segment size, last entry id/CRC, the payload
    /// hash chain, and the total byte count.
    ///
    /// An empty index resets everything to the pristine (genesis) state.
    fn bootstrap_from_disk(&mut self) -> Result<()> {
        let index_rows = self.read_index()?;
        if let Some(last) = index_rows.last() {
            self.next_entry_seq = last
                .entry_seq
                .checked_add(1)
                .ok_or_else(|| Error::session("entry sequence overflow while bootstrapping"))?;
            // NOTE(review): unlike entry/frame counters this is NOT last+1 —
            // it appears to track the still-active segment so appends continue
            // into it; confirm against the append path.
            self.next_segment_seq = last.segment_seq;
            self.next_frame_seq = last
                .frame_seq
                .checked_add(1)
                .ok_or_else(|| Error::session("frame sequence overflow while bootstrapping"))?;
            // The active segment must exist; its current length seeds the
            // rotation bookkeeping.
            let segment_path = self.segment_file_path(last.segment_seq);
            self.current_segment_bytes = fs::metadata(&segment_path)
                .map(|meta| meta.len())
                .map_err(|err| {
                    Error::session(format!(
                        "failed to stat active segment {} while bootstrapping: {err}",
                        segment_path.display()
                    ))
                })?;
            self.last_entry_id = Some(last.entry_id.clone());
            self.last_crc32c.clone_from(&last.crc32c);

            // Recompute the hash chain and byte total from every indexed
            // frame; a row pointing at a missing frame is a hard error here
            // (unlike write_manifest, which tolerates gaps).
            let mut chain = GENESIS_CHAIN_HASH.to_string();
            let mut total = 0u64;
            for row in &index_rows {
                let frame = seek_read_frame(self, row)?.ok_or_else(|| {
                    Error::session(format!(
                        "index references missing frame during bootstrap: entry_seq={}, segment={}",
                        row.entry_seq, row.segment_seq
                    ))
                })?;
                chain = chain_hash_step(&chain, &frame.payload_sha256);
                total = total.saturating_add(row.byte_length);
            }
            self.chain_hash = chain;
            self.total_bytes = total;
        } else {
            // Empty index: reset to the pristine state.
            self.chain_hash = GENESIS_CHAIN_HASH.to_string();
            self.total_bytes = 0;
            self.last_entry_id = None;
            self.last_crc32c = "00000000".to_string();
        }
        Ok(())
    }
1253}
1254
1255fn rollback_failure_event_already_recorded(error: &Error) -> bool {
1256    matches!(error, Error::Session(message) if message.contains("rollback verification failed"))
1257}
1258
1259fn classify_rollback_error(error: &Error) -> &'static str {
1260    match error {
1261        Error::Session(message) => {
1262            if message.contains("checkpoint") && message.contains("not found") {
1263                "checkpoint_not_found"
1264            } else if message.contains("index byte range overflow") {
1265                "index_range_overflow"
1266            } else if message.contains("rollback verification failed") {
1267                "rollback_verification_failed"
1268            } else {
1269                "session_error"
1270            }
1271        }
1272        _ => error.category_code(),
1273    }
1274}
1275
1276fn is_recoverable_index_error(error: &Error) -> bool {
1277    match error {
1278        Error::Json(_) => true,
1279        Error::Io(err) => matches!(
1280            err.kind(),
1281            std::io::ErrorKind::UnexpectedEof | std::io::ErrorKind::InvalidData
1282        ),
1283        Error::Session(message) => {
1284            let lower = message.to_ascii_lowercase();
1285            lower.contains("checksum mismatch")
1286                || lower.contains("index out of bounds")
1287                || lower.contains("index/frame mismatch")
1288                || lower.contains("payload integrity mismatch")
1289                || lower.contains("entry sequence is not strictly increasing")
1290                || lower.contains("index byte range overflow")
1291        }
1292        _ => false,
1293    }
1294}
1295
1296/// Convert a V2 `SegmentFrame` payload back into a `SessionEntry`.
1297pub fn frame_to_session_entry(frame: &SegmentFrame) -> Result<SessionEntry> {
1298    // Deserialize directly from the RawValue to avoid extra allocation/copying.
1299    // serde_json::from_str works on RawValue.get() which is &str.
1300    let entry: SessionEntry = serde_json::from_str(frame.payload.get()).map_err(|e| {
1301        Error::session(format!(
1302            "failed to deserialize SessionEntry from frame entry_id={}: {e}",
1303            frame.entry_id
1304        ))
1305    })?;
1306
1307    if let Some(base_id) = entry.base_id() {
1308        if base_id != &frame.entry_id {
1309            return Err(Error::session(format!(
1310                "frame entry_id mismatch: frame={} entry={}",
1311                frame.entry_id, base_id
1312            )));
1313        }
1314    }
1315
1316    Ok(entry)
1317}
1318
1319/// Extract the V2 frame arguments from a `SessionEntry`.
1320pub fn session_entry_to_frame_args(
1321    entry: &SessionEntry,
1322) -> Result<(String, Option<String>, String, Value)> {
1323    let base = entry.base();
1324    let entry_id = base
1325        .id
1326        .clone()
1327        .ok_or_else(|| Error::session("SessionEntry has no id"))?;
1328    let parent_entry_id = base.parent_id.clone();
1329
1330    let entry_type = match entry {
1331        SessionEntry::Message(_) => "message",
1332        SessionEntry::ModelChange(_) => "model_change",
1333        SessionEntry::ThinkingLevelChange(_) => "thinking_level_change",
1334        SessionEntry::Compaction(_) => "compaction",
1335        SessionEntry::BranchSummary(_) => "branch_summary",
1336        SessionEntry::Label(_) => "label",
1337        SessionEntry::SessionInfo(_) => "session_info",
1338        SessionEntry::Custom(_) => "custom",
1339    };
1340
1341    let payload = serde_json::to_value(entry).map_err(|e| {
1342        Error::session(format!(
1343            "failed to serialize SessionEntry to frame payload: {e}"
1344        ))
1345    })?;
1346
1347    Ok((entry_id, parent_entry_id, entry_type.to_string(), payload))
1348}
1349
/// Seek-read a single frame from a segment using an index row.
///
/// Returns `Ok(None)` when the segment file no longer exists; callers decide
/// how to treat the gap. Errors when the row's byte range overflows, lies
/// outside the file, exceeds the size limit, or does not parse as a
/// `SegmentFrame`.
fn seek_read_frame(store: &SessionStoreV2, row: &OffsetIndexEntry) -> Result<Option<SegmentFrame>> {
    let segment_path = store.segment_file_path(row.segment_seq);
    if !segment_path.exists() {
        return Ok(None);
    }
    let mut file = File::open(&segment_path)?;
    let file_len = file.metadata()?.len();
    // Bounds check before any read: offset + length must not overflow and
    // must fit inside the segment file.
    let end_offset = row
        .byte_offset
        .checked_add(row.byte_length)
        .ok_or_else(|| Error::session("index byte range overflow"))?;

    if end_offset > file_len {
        return Err(Error::session(format!(
            "index out of bounds for segment {}: end={} len={}",
            segment_path.display(),
            end_offset,
            file_len
        )));
    }

    file.seek(SeekFrom::Start(row.byte_offset))?;
    let byte_len = usize::try_from(row.byte_length)
        .map_err(|_| Error::session(format!("byte length too large: {}", row.byte_length)))?;

    // Reject frames larger than the configured segment cap (with a 100 MiB
    // floor) before allocating the read buffer.
    if row.byte_length > store.max_segment_bytes.max(100 * 1024 * 1024) {
        return Err(Error::session(format!(
            "frame byte length {byte_len} exceeds limit"
        )));
    }

    let mut buf = vec![0u8; byte_len];
    file.read_exact(&mut buf)?;
    // The indexed record includes the trailing newline; strip it before
    // parsing the JSON.
    if buf.last() == Some(&b'\n') {
        buf.pop();
    }
    let frame: SegmentFrame = serde_json::from_slice(&buf)?;
    Ok(Some(frame))
}
1390
1391/// Compute next hash chain value: `SHA-256(prev_chain_hex || payload_sha256_hex)`.
1392fn chain_hash_step(prev_chain: &str, payload_sha256: &str) -> String {
1393    let mut hasher = Sha256::new();
1394    hasher.update(prev_chain.as_bytes());
1395    hasher.update(payload_sha256.as_bytes());
1396    format!("{:x}", hasher.finalize())
1397}
1398
1399fn manifest_hash_hex(manifest: &Manifest) -> Result<String> {
1400    let encoded = serde_json::to_vec(manifest)?;
1401    Ok(format!("{:x}", Sha256::digest(&encoded)))
1402}
1403
/// Derive the V2 sidecar store root from a JSONL session file path.
///
/// `dir/name.jsonl` maps to `dir/name.v2`; a path without a file stem falls
/// back to the stem `session`.
pub fn v2_sidecar_path(jsonl_path: &Path) -> PathBuf {
    // to_string_lossy keeps non-UTF-8 stems usable instead of discarding them.
    let stem: Cow<'_, str> = match jsonl_path.file_stem() {
        Some(stem) => stem.to_string_lossy(),
        None => Cow::Borrowed("session"),
    };
    jsonl_path
        .parent()
        .unwrap_or_else(|| Path::new("."))
        .join(format!("{stem}.v2"))
}
1413
1414/// Check whether a V2 sidecar store exists for the given JSONL session.
1415pub fn has_v2_sidecar(jsonl_path: &Path) -> bool {
1416    let root = v2_sidecar_path(jsonl_path);
1417    root.join("manifest.json").exists() || root.join("index").join("offsets.jsonl").exists()
1418}
1419
1420fn append_jsonl_line<T: Serialize>(path: &Path, value: &T) -> Result<()> {
1421    let mut file = OpenOptions::new().create(true).append(true).open(path)?;
1422    // Serialize directly to file — avoids intermediate Vec<u8> allocation.
1423    serde_json::to_writer(&mut file, value)?;
1424    file.write_all(b"\n")?;
1425    Ok(())
1426}
1427
1428fn truncate_file_to(path: &Path, len: u64) -> Result<()> {
1429    let file = OpenOptions::new().write(true).truncate(false).open(path)?;
1430    file.set_len(len)?;
1431    Ok(())
1432}
1433
1434fn write_jsonl_lines<T: Serialize>(path: &Path, rows: &[T]) -> Result<()> {
1435    let file = OpenOptions::new()
1436        .create(true)
1437        .write(true)
1438        .truncate(true)
1439        .open(path)?;
1440    let mut writer = std::io::BufWriter::new(file);
1441    for row in rows {
1442        serde_json::to_writer(&mut writer, row)?;
1443        writer.write_all(b"\n")?;
1444    }
1445    writer.flush()?;
1446    Ok(())
1447}
1448
1449fn read_jsonl<T: for<'de> Deserialize<'de>>(path: &Path) -> Result<Vec<T>> {
1450    let file = File::open(path)?;
1451    let mut reader = BufReader::new(file);
1452    let mut out = Vec::new();
1453    let mut line = String::new();
1454    loop {
1455        line.clear();
1456        let bytes_read = read_line_with_limit(&mut reader, &mut line, MAX_FRAME_READ_BYTES)
1457            .map_err(|e| Error::Io(Box::new(e)))?;
1458        if bytes_read == 0 {
1459            break;
1460        }
1461        if line.trim().is_empty() {
1462            continue;
1463        }
1464        let json_line = line.trim_end_matches('\n').trim_end_matches('\r');
1465        out.push(serde_json::from_str::<T>(json_line)?);
1466    }
1467    Ok(out)
1468}
1469
1470fn payload_hash_and_size(payload: &RawValue) -> Result<(String, u64)> {
1471    // For RawValue, we can just get the string content directly.
1472    let bytes = payload.get().as_bytes();
1473    let payload_bytes = u64::try_from(bytes.len())
1474        .map_err(|_| Error::session(format!("payload is too large: {} bytes", bytes.len())))?;
1475    let hash = format!("{:x}", Sha256::digest(bytes));
1476    Ok((hash, payload_bytes))
1477}
1478
1479fn line_length_u64(encoded: &[u8]) -> Result<u64> {
1480    let line_len = encoded
1481        .len()
1482        .checked_add(1)
1483        .ok_or_else(|| Error::session("line length overflow"))?;
1484    u64::try_from(line_len).map_err(|_| Error::session("line length exceeds u64"))
1485}
1486
1487fn crc32c_upper(data: &[u8]) -> String {
1488    let crc = crc32c::crc32c(data);
1489    format!("{crc:08X}")
1490}
1491
1492fn remove_orphaned_segments(segments: &[(u64, PathBuf)]) -> Result<()> {
1493    for (_, orphan_path) in segments {
1494        if orphan_path.exists() {
1495            tracing::warn!(
1496                path = %orphan_path.display(),
1497                "removing orphaned segment file after index rebuild truncation"
1498            );
1499            if let Err(e) = fs::remove_file(orphan_path) {
1500                if e.kind() != std::io::ErrorKind::NotFound {
1501                    return Err(Error::Io(Box::new(e)));
1502                }
1503            }
1504        }
1505    }
1506    Ok(())
1507}
1508
/// Read one line into `buf`, erroring with `InvalidData` if the line would
/// exceed `limit` bytes. Returns the number of bytes appended (0 at EOF).
fn read_line_with_limit<R: BufRead>(
    reader: &mut R,
    buf: &mut String,
    limit: u64,
) -> std::io::Result<usize> {
    let mut bounded = reader.take(limit);
    let bytes = bounded.read_line(buf)?;
    // If we consumed the entire budget without seeing a newline, the line is
    // longer than the limit allows.
    let hit_cap = bounded.limit() == 0;
    if bytes == 0 || buf.ends_with('\n') || !hit_cap {
        Ok(bytes)
    } else {
        Err(std::io::Error::new(
            std::io::ErrorKind::InvalidData,
            format!("Line length exceeds limit of {limit} bytes"),
        ))
    }
}
1524
#[cfg(test)]
mod proptests {
    //! Property-based tests for the pure helper functions of the V2 store:
    //! hash chaining, CRC32C formatting, payload hashing, line accounting,
    //! and sidecar path derivation.
    //!
    //! All assertions use the `prop_assert*` family so that failures are
    //! reported through proptest's shrinking machinery instead of panicking.

    use super::*;
    use proptest::prelude::*;
    use serde_json::json;

    // ====================================================================
    // chain_hash_step
    // ====================================================================

    proptest! {
        /// The chained hash is always a 64-character hex digest (SHA-256).
        #[test]
        fn chain_hash_output_is_64_hex(
            a in "[0-9a-f]{64}",
            b in "[0-9a-f]{64}",
        ) {
            let result = chain_hash_step(&a, &b);
            prop_assert_eq!(result.len(), 64);
            prop_assert!(result.chars().all(|c| c.is_ascii_hexdigit()));
        }

        /// Same inputs always produce the same chained hash.
        #[test]
        fn chain_hash_deterministic(
            a in "[0-9a-f]{64}",
            b in "[0-9a-f]{64}",
        ) {
            prop_assert_eq!(chain_hash_step(&a, &b), chain_hash_step(&a, &b));
        }

        /// Swapping the (prev, payload) order must change the result, so a
        /// reordered log cannot reproduce the same chain.
        #[test]
        fn chain_hash_non_commutative(
            a in "[0-9a-f]{64}",
            b in "[0-9a-f]{64}",
        ) {
            // Guard is required here: proptest can generate a == b, for
            // which commuted inputs are trivially equal.
            if a != b {
                prop_assert_ne!(chain_hash_step(&a, &b), chain_hash_step(&b, &a));
            }
        }

        /// Advancing the chain from genesis never yields the genesis value
        /// itself (the chain visibly moves on the first append).
        #[test]
        fn chain_hash_genesis_differs_from_step(payload in "[0-9a-f]{64}") {
            let step1 = chain_hash_step(GENESIS_CHAIN_HASH, &payload);
            prop_assert_ne!(step1, GENESIS_CHAIN_HASH);
        }
    }

    // ====================================================================
    // crc32c_upper
    // ====================================================================

    proptest! {
        /// The checksum renders as exactly 8 uppercase hex characters.
        #[test]
        fn crc32c_output_is_8_uppercase_hex(data in prop::collection::vec(any::<u8>(), 0..500)) {
            let result = crc32c_upper(&data);
            prop_assert_eq!(result.len(), 8);
            prop_assert!(result.chars().all(|c| matches!(c, '0'..='9' | 'A'..='F')));
        }

        /// Same bytes always produce the same checksum.
        #[test]
        fn crc32c_deterministic(data in prop::collection::vec(any::<u8>(), 0..500)) {
            prop_assert_eq!(crc32c_upper(&data), crc32c_upper(&data));
        }

        /// Flipping the least-significant bit of a byte must change the
        /// checksum. No guard is needed: `byte ^ 1` always differs from
        /// `byte`, so the assertion holds unconditionally.
        #[test]
        fn crc32c_single_bit_sensitivity(byte in any::<u8>()) {
            let a = crc32c_upper(&[byte]);
            let b = crc32c_upper(&[byte ^ 1]);
            prop_assert_ne!(a, b, "flipping LSB should change CRC");
        }
    }

    // ====================================================================
    // payload_hash_and_size
    // ====================================================================

    proptest! {
        /// The payload hash is always a 64-character hex digest (SHA-256).
        #[test]
        fn payload_hash_is_64_hex(s in "[a-z]{0,50}") {
            let val = json!(s);
            let raw_string = serde_json::to_string(&val).unwrap();
            let raw = RawValue::from_string(raw_string).unwrap();
            let (hash, _size) = payload_hash_and_size(&raw).unwrap();
            prop_assert_eq!(hash.len(), 64);
            prop_assert!(hash.chars().all(|c| c.is_ascii_hexdigit()));
        }

        /// The reported size matches the canonical serde_json byte length of
        /// the payload, so index rows account for exactly what is on disk.
        #[test]
        fn payload_size_matches_serialization(s in "[a-z]{0,50}") {
            let val = json!(s);
            let raw_string = serde_json::to_string(&val).unwrap();
            let raw = RawValue::from_string(raw_string).unwrap();
            let (_, size) = payload_hash_and_size(&raw).unwrap();
            let expected = serde_json::to_vec(&val).unwrap().len() as u64;
            prop_assert_eq!(size, expected);
        }

        /// Hash and size are stable across repeated calls on the same value.
        #[test]
        fn payload_hash_deterministic(n in 0i64..10000) {
            let val = json!(n);
            let raw_string = serde_json::to_string(&val).unwrap();
            let raw = RawValue::from_string(raw_string).unwrap();
            let (h1, s1) = payload_hash_and_size(&raw).unwrap();
            let (h2, s2) = payload_hash_and_size(&raw).unwrap();
            prop_assert_eq!(h1, h2);
            prop_assert_eq!(s1, s2);
        }
    }

    // ====================================================================
    // line_length_u64
    // ====================================================================

    proptest! {
        /// A written line occupies its byte length plus one for the trailing
        /// newline.
        #[test]
        fn line_length_is_len_plus_one(data in prop::collection::vec(any::<u8>(), 0..1000)) {
            let result = line_length_u64(&data).unwrap();
            prop_assert_eq!(result, data.len() as u64 + 1);
        }

        /// Even an empty payload consumes at least one byte (the newline).
        #[test]
        fn line_length_never_zero(data in prop::collection::vec(any::<u8>(), 0..100)) {
            let result = line_length_u64(&data).unwrap();
            prop_assert!(result >= 1);
        }
    }

    // ====================================================================
    // v2_sidecar_path
    // ====================================================================

    proptest! {
        /// The derived sidecar file always carries the `.v2` extension.
        #[test]
        fn sidecar_path_ends_with_v2(stem in "[a-z]{1,10}") {
            let input = PathBuf::from(format!("/tmp/{stem}.jsonl"));
            let result = v2_sidecar_path(&input);
            let name = result.file_name().unwrap().to_str().unwrap();
            prop_assert_eq!(
                Path::new(name).extension().and_then(|ext| ext.to_str()),
                Some("v2"),
                "expected .v2 suffix, got {}", name
            );
        }

        /// The sidecar lives next to the session file (same parent dir).
        #[test]
        fn sidecar_path_preserves_parent(stem in "[a-z]{1,10}", dir in "[a-z]{1,8}") {
            let input = PathBuf::from(format!("/tmp/{dir}/{stem}.jsonl"));
            let result = v2_sidecar_path(&input);
            prop_assert_eq!(
                result.parent().unwrap(),
                Path::new(&format!("/tmp/{dir}"))
            );
        }

        /// Path derivation is a pure function of the input path.
        #[test]
        fn sidecar_path_deterministic(stem in "[a-z]{1,10}") {
            let input = PathBuf::from(format!("/sessions/{stem}.jsonl"));
            prop_assert_eq!(v2_sidecar_path(&input), v2_sidecar_path(&input));
        }

        /// The sidecar name is exactly `<stem>.v2` — the original stem is
        /// preserved and only the extension is replaced.
        #[test]
        fn sidecar_path_contains_stem(stem in "[a-z]{1,10}") {
            let input = PathBuf::from(format!("/tmp/{stem}.jsonl"));
            let result = v2_sidecar_path(&input);
            let name = result.file_name().unwrap().to_str().unwrap();
            prop_assert_eq!(name, format!("{stem}.v2"));
        }
    }
}