use crate::error::{Error, Result};
use crate::session::SessionEntry;
use serde::{Deserialize, Serialize};
use serde_json::Value;
use serde_json::value::RawValue;
use sha2::{Digest, Sha256};
use std::borrow::Cow;
use std::collections::BTreeSet;
use std::fs::{self, File, OpenOptions};
use std::io::{BufRead, BufReader, Read, Seek, SeekFrom, Write};
use std::path::{Path, PathBuf};
pub const SEGMENT_FRAME_SCHEMA: &str = "pi.session_store_v2.segment_frame.v1";
pub const OFFSET_INDEX_SCHEMA: &str = "pi.session_store_v2.offset_index.v1";
pub const CHECKPOINT_SCHEMA: &str = "pi.session_store_v2.checkpoint.v1";
pub const MANIFEST_SCHEMA: &str = "pi.session_store_v2.manifest.v1";
pub const MIGRATION_EVENT_SCHEMA: &str = "pi.session_store_v2.migration_event.v1";
const MAX_FRAME_READ_BYTES: u64 = 100 * 1024 * 1024;
const GENESIS_CHAIN_HASH: &str = "0000000000000000000000000000000000000000000000000000000000000000";
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct SegmentFrame {
pub schema: Cow<'static, str>,
pub segment_seq: u64,
pub frame_seq: u64,
pub entry_seq: u64,
pub entry_id: String,
#[serde(skip_serializing_if = "Option::is_none")]
pub parent_entry_id: Option<String>,
pub entry_type: String,
pub timestamp: String,
pub payload_sha256: String,
pub payload_bytes: u64,
pub payload: Box<RawValue>,
}
impl SegmentFrame {
fn new(
segment_seq: u64,
frame_seq: u64,
entry_seq: u64,
entry_id: String,
parent_entry_id: Option<String>,
entry_type: String,
payload: Box<RawValue>,
) -> Result<Self> {
let (payload_sha256, payload_bytes) = payload_hash_and_size(&payload)?;
Ok(Self {
schema: Cow::Borrowed(SEGMENT_FRAME_SCHEMA),
segment_seq,
frame_seq,
entry_seq,
entry_id,
parent_entry_id,
entry_type,
timestamp: chrono::Utc::now().to_rfc3339_opts(chrono::SecondsFormat::Millis, true),
payload_sha256,
payload_bytes,
payload,
})
}
}
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct OffsetIndexEntry {
pub schema: Cow<'static, str>,
pub entry_seq: u64,
pub entry_id: String,
pub segment_seq: u64,
pub frame_seq: u64,
pub byte_offset: u64,
pub byte_length: u64,
pub crc32c: String,
pub state: Cow<'static, str>,
}
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
#[serde(rename_all = "camelCase")]
pub struct StoreHead {
pub segment_seq: u64,
pub entry_seq: u64,
pub entry_id: String,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct Checkpoint {
pub schema: String,
pub checkpoint_seq: u64,
pub at: String,
pub head_entry_seq: u64,
pub head_entry_id: String,
pub snapshot_ref: String,
pub compacted_before_entry_seq: u64,
pub chain_hash: String,
pub reason: String,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct Manifest {
pub schema: String,
pub store_version: u8,
pub session_id: String,
pub source_format: String,
pub created_at: String,
pub updated_at: String,
pub head: StoreHead,
pub counters: ManifestCounters,
pub files: ManifestFiles,
pub integrity: ManifestIntegrity,
pub invariants: ManifestInvariants,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct ManifestCounters {
pub entries_total: u64,
pub messages_total: u64,
pub branches_total: u64,
pub compactions_total: u64,
pub bytes_total: u64,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct ManifestFiles {
pub segment_dir: String,
pub segment_count: u64,
pub index_path: String,
pub checkpoint_dir: String,
pub migration_ledger_path: String,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct ManifestIntegrity {
pub chain_hash: String,
pub manifest_hash: String,
pub last_crc32c: String,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
#[allow(clippy::struct_excessive_bools)] pub struct ManifestInvariants {
pub parent_links_closed: bool,
pub monotonic_entry_seq: bool,
pub monotonic_segment_seq: bool,
pub index_within_segment_bounds: bool,
pub branch_heads_indexed: bool,
pub checkpoints_monotonic: bool,
pub hash_chain_valid: bool,
}
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
#[serde(rename_all = "camelCase")]
pub struct MigrationVerification {
pub entry_count_match: bool,
pub hash_chain_match: bool,
pub index_consistent: bool,
}
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
#[serde(rename_all = "camelCase")]
pub struct MigrationEvent {
pub schema: String,
pub migration_id: String,
pub phase: String,
pub at: String,
pub source_path: String,
pub target_path: String,
pub source_format: String,
pub target_format: String,
pub verification: MigrationVerification,
pub outcome: String,
#[serde(skip_serializing_if = "Option::is_none")]
pub error_class: Option<String>,
pub correlation_id: String,
}
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
#[serde(rename_all = "camelCase")]
pub struct IndexSummary {
pub entry_count: u64,
pub first_entry_seq: u64,
pub last_entry_seq: u64,
pub last_entry_id: String,
}
#[derive(Debug, Clone)]
pub struct SessionStoreV2 {
root: PathBuf,
max_segment_bytes: u64,
next_segment_seq: u64,
next_frame_seq: u64,
next_entry_seq: u64,
current_segment_bytes: u64,
chain_hash: String,
total_bytes: u64,
last_entry_id: Option<String>,
last_crc32c: String,
}
impl SessionStoreV2 {
pub fn create(root: impl AsRef<Path>, max_segment_bytes: u64) -> Result<Self> {
if max_segment_bytes == 0 {
return Err(Error::validation("max_segment_bytes must be > 0"));
}
let root = root.as_ref().to_path_buf();
fs::create_dir_all(root.join("segments"))?;
fs::create_dir_all(root.join("index"))?;
fs::create_dir_all(root.join("checkpoints"))?;
fs::create_dir_all(root.join("migrations"))?;
fs::create_dir_all(root.join("tmp"))?;
let mut store = Self {
root,
max_segment_bytes,
next_segment_seq: 1,
next_frame_seq: 1,
next_entry_seq: 1,
current_segment_bytes: 0,
chain_hash: GENESIS_CHAIN_HASH.to_string(),
total_bytes: 0,
last_entry_id: None,
last_crc32c: "00000000".to_string(),
};
if let Err(err) = store.bootstrap_from_disk() {
if is_recoverable_index_error(&err) {
tracing::warn!(
root = %store.root.display(),
error = %err,
"SessionStoreV2 bootstrap failed with recoverable index error; attempting index rebuild"
);
store.rebuild_index()?;
store.bootstrap_from_disk()?;
} else {
return Err(err);
}
}
if store.entry_count() == 0 && store.segments_exist_with_data()? {
tracing::warn!(
root = %store.root.display(),
"SessionStoreV2 detected segment data with empty index; rebuilding index"
);
store.rebuild_index()?;
store.bootstrap_from_disk()?;
}
if let Err(err) = store.validate_integrity() {
if is_recoverable_index_error(&err) {
tracing::warn!(
root = %store.root.display(),
error = %err,
"SessionStoreV2 integrity validation failed with recoverable error; rebuilding index"
);
store.rebuild_index()?;
store.bootstrap_from_disk()?;
store.validate_integrity()?;
} else {
return Err(err);
}
}
Ok(store)
}
pub fn segment_file_path(&self, segment_seq: u64) -> PathBuf {
self.root
.join("segments")
.join(format!("{segment_seq:016}.seg"))
}
pub fn index_file_path(&self) -> PathBuf {
self.root.join("index").join("offsets.jsonl")
}
fn manifest_path(&self) -> PathBuf {
self.root.join("manifest.json")
}
fn migration_ledger_path(&self) -> PathBuf {
self.root.join("migrations").join("ledger.jsonl")
}
fn list_segment_files(&self) -> Result<Vec<(u64, PathBuf)>> {
let segments_dir = self.root.join("segments");
if !segments_dir.exists() {
return Ok(Vec::new());
}
let mut segment_files = Vec::new();
for entry in fs::read_dir(segments_dir)? {
let entry = entry?;
let path = entry.path();
if path.extension().and_then(|ext| ext.to_str()) != Some("seg") {
continue;
}
let Some(stem) = path.file_stem().and_then(|s| s.to_str()) else {
continue;
};
let Ok(segment_seq) = stem.parse::<u64>() else {
continue;
};
segment_files.push((segment_seq, path));
}
segment_files.sort_by_key(|(segment_seq, _)| *segment_seq);
Ok(segment_files)
}
fn segments_exist_with_data(&self) -> Result<bool> {
for (_, path) in self.list_segment_files()? {
if fs::metadata(path)?.len() > 0 {
return Ok(true);
}
}
Ok(false)
}
#[allow(clippy::needless_pass_by_value)]
pub fn append_entry(
&mut self,
entry_id: impl Into<String>,
parent_entry_id: Option<String>,
entry_type: impl Into<String>,
payload: Value,
) -> Result<OffsetIndexEntry> {
let entry_id = entry_id.into();
let entry_type = entry_type.into();
let raw_string = serde_json::to_string(&payload)?;
let raw_payload = RawValue::from_string(raw_string)
.map_err(|e| Error::session(format!("failed to convert payload to RawValue: {e}")))?;
let mut frame = SegmentFrame::new(
self.next_segment_seq,
self.next_frame_seq,
self.next_entry_seq,
entry_id,
parent_entry_id,
entry_type,
raw_payload,
)?;
let mut encoded = serde_json::to_vec(&frame)?;
let mut line_len = line_length_u64(&encoded)?;
if self.current_segment_bytes > 0
&& self.current_segment_bytes.saturating_add(line_len) > self.max_segment_bytes
{
self.next_segment_seq = self
.next_segment_seq
.checked_add(1)
.ok_or_else(|| Error::session("segment sequence overflow"))?;
self.next_frame_seq = 1;
self.current_segment_bytes = 0;
frame = SegmentFrame::new(
self.next_segment_seq,
self.next_frame_seq,
self.next_entry_seq,
frame.entry_id.clone(),
frame.parent_entry_id.clone(),
frame.entry_type.clone(),
frame.payload.clone(),
)?;
encoded = serde_json::to_vec(&frame)?;
line_len = line_length_u64(&encoded)?;
}
let segment_path = self.segment_file_path(self.next_segment_seq);
let byte_offset = fs::metadata(&segment_path).map_or(0, |meta| meta.len());
let mut write_buf = encoded;
write_buf.push(b'\n');
let mut segment = OpenOptions::new()
.create(true)
.write(true)
.truncate(false)
.open(&segment_path)?;
let pre_write_len = segment.seek(SeekFrom::End(0))?;
if let Err(e) = segment.write_all(&write_buf) {
let _ = segment.set_len(pre_write_len);
return Err(Error::from(e));
}
let crc = crc32c_upper(&write_buf);
let index_entry = OffsetIndexEntry {
schema: Cow::Borrowed(OFFSET_INDEX_SCHEMA),
entry_seq: frame.entry_seq,
entry_id: frame.entry_id.clone(),
segment_seq: frame.segment_seq,
frame_seq: frame.frame_seq,
byte_offset,
byte_length: line_len,
crc32c: crc.clone(),
state: Cow::Borrowed("active"),
};
if let Err(e) = append_jsonl_line(&self.index_file_path(), &index_entry) {
let _ = segment.set_len(pre_write_len);
return Err(e);
}
self.chain_hash = chain_hash_step(&self.chain_hash, &frame.payload_sha256);
self.total_bytes = self.total_bytes.saturating_add(line_len);
self.last_entry_id = Some(frame.entry_id);
self.last_crc32c = crc;
self.next_entry_seq = self
.next_entry_seq
.checked_add(1)
.ok_or_else(|| Error::session("entry sequence overflow"))?;
self.next_frame_seq = self
.next_frame_seq
.checked_add(1)
.ok_or_else(|| Error::session("frame sequence overflow"))?;
self.current_segment_bytes = self.current_segment_bytes.saturating_add(line_len);
Ok(index_entry)
}
pub fn read_segment(&self, segment_seq: u64) -> Result<Vec<SegmentFrame>> {
let path = self.segment_file_path(segment_seq);
if !path.exists() {
return Ok(Vec::new());
}
read_jsonl::<SegmentFrame>(&path)
}
pub fn read_index(&self) -> Result<Vec<OffsetIndexEntry>> {
let path = self.index_file_path();
if !path.exists() {
return Ok(Vec::new());
}
read_jsonl::<OffsetIndexEntry>(&path)
}
pub fn lookup_entry(&self, target_entry_seq: u64) -> Result<Option<SegmentFrame>> {
let index_rows = self.read_index()?;
let row = index_rows.iter().find(|r| r.entry_seq == target_entry_seq);
let Some(row) = row else {
return Ok(None);
};
seek_read_frame(self, row)
}
pub fn read_entries_from(&self, from_entry_seq: u64) -> Result<Vec<SegmentFrame>> {
let index_rows = self.read_index()?;
let mut frames = Vec::new();
for row in &index_rows {
if row.entry_seq < from_entry_seq {
continue;
}
if let Some(frame) = seek_read_frame(self, row)? {
frames.push(frame);
}
}
Ok(frames)
}
pub fn read_all_entries(&self) -> Result<Vec<SegmentFrame>> {
self.read_entries_from(1)
}
pub fn read_tail_entries(&self, count: u64) -> Result<Vec<SegmentFrame>> {
let index_rows = self.read_index()?;
let total = index_rows.len();
let skip = total.saturating_sub(usize::try_from(count).unwrap_or(usize::MAX));
let mut frames = Vec::with_capacity(total.saturating_sub(skip));
for row in &index_rows[skip..] {
if let Some(frame) = seek_read_frame(self, row)? {
frames.push(frame);
}
}
Ok(frames)
}
pub fn read_active_path(&self, leaf_entry_id: &str) -> Result<Vec<SegmentFrame>> {
let index_rows = self.read_index()?;
let id_to_row: std::collections::HashMap<&str, &OffsetIndexEntry> = index_rows
.iter()
.map(|row| (row.entry_id.as_str(), row))
.collect();
let mut frames = Vec::new();
let mut current_id: Option<String> = Some(leaf_entry_id.to_string());
while let Some(ref entry_id) = current_id {
let row = id_to_row.get(entry_id.as_str());
let row = match row {
Some(r) => *r,
None => break,
};
match seek_read_frame(self, row)? {
Some(frame) => {
current_id.clone_from(&frame.parent_entry_id);
frames.push(frame);
}
None => break,
}
}
frames.reverse();
Ok(frames)
}
pub const fn entry_count(&self) -> u64 {
self.next_entry_seq.saturating_sub(1)
}
pub fn head(&self) -> Option<StoreHead> {
self.last_entry_id.as_ref().map(|entry_id| StoreHead {
segment_seq: self.next_segment_seq,
entry_seq: self.next_entry_seq.saturating_sub(1),
entry_id: entry_id.clone(),
})
}
fn checkpoint_path(&self, checkpoint_seq: u64) -> PathBuf {
self.root
.join("checkpoints")
.join(format!("{checkpoint_seq:016}.json"))
}
pub fn create_checkpoint(&self, checkpoint_seq: u64, reason: &str) -> Result<Checkpoint> {
let head = self.head().unwrap_or(StoreHead {
segment_seq: 0,
entry_seq: 0,
entry_id: String::new(),
});
let snapshot_ref = format!("checkpoints/{checkpoint_seq:016}.json");
let checkpoint = Checkpoint {
schema: CHECKPOINT_SCHEMA.to_string(),
checkpoint_seq,
at: chrono::Utc::now().to_rfc3339_opts(chrono::SecondsFormat::Millis, true),
head_entry_seq: head.entry_seq,
head_entry_id: head.entry_id,
snapshot_ref,
compacted_before_entry_seq: 0,
chain_hash: self.chain_hash.clone(),
reason: reason.to_string(),
};
let tmp_path = self
.root
.join("tmp")
.join(format!("{checkpoint_seq:016}.json.tmp"));
fs::write(&tmp_path, serde_json::to_vec_pretty(&checkpoint)?)?;
fs::rename(&tmp_path, self.checkpoint_path(checkpoint_seq))?;
Ok(checkpoint)
}
pub fn read_checkpoint(&self, checkpoint_seq: u64) -> Result<Option<Checkpoint>> {
let path = self.checkpoint_path(checkpoint_seq);
if !path.exists() {
return Ok(None);
}
let data = fs::read_to_string(&path)?;
let cp: Checkpoint = serde_json::from_str(&data)?;
Ok(Some(cp))
}
pub fn append_migration_event(&self, mut event: MigrationEvent) -> Result<()> {
if event.schema.is_empty() {
event.schema = MIGRATION_EVENT_SCHEMA.to_string();
}
if event.at.is_empty() {
event.at = chrono::Utc::now().to_rfc3339_opts(chrono::SecondsFormat::Millis, true);
}
append_jsonl_line(&self.migration_ledger_path(), &event)
}
pub fn read_migration_events(&self) -> Result<Vec<MigrationEvent>> {
let path = self.migration_ledger_path();
if !path.exists() {
return Ok(Vec::new());
}
read_jsonl::<MigrationEvent>(&path)
}
#[allow(clippy::too_many_lines)]
pub fn rollback_to_checkpoint(
&mut self,
checkpoint_seq: u64,
migration_id: impl Into<String>,
correlation_id: impl Into<String>,
) -> Result<MigrationEvent> {
let migration_id = migration_id.into();
let correlation_id = correlation_id.into();
let rollback_result: Result<MigrationEvent> = (|| {
let checkpoint = self
.read_checkpoint(checkpoint_seq)?
.ok_or_else(|| Error::session(format!("checkpoint {checkpoint_seq} not found")))?;
let mut index_rows = self.read_index()?;
index_rows.retain(|row| row.entry_seq <= checkpoint.head_entry_seq);
let mut keep_len_by_segment: std::collections::HashMap<u64, u64> =
std::collections::HashMap::new();
for row in &index_rows {
let end = row
.byte_offset
.checked_add(row.byte_length)
.ok_or_else(|| Error::session("index byte range overflow during rollback"))?;
keep_len_by_segment
.entry(row.segment_seq)
.and_modify(|current| *current = (*current).max(end))
.or_insert(end);
}
let segments_dir = self.root.join("segments");
if segments_dir.exists() {
let mut segment_files: Vec<(u64, PathBuf)> = Vec::new();
for entry in fs::read_dir(&segments_dir)? {
let entry = entry?;
let path = entry.path();
if path.extension().and_then(|ext| ext.to_str()) != Some("seg") {
continue;
}
let Some(stem) = path.file_stem().and_then(|s| s.to_str()) else {
continue;
};
let Ok(segment_seq) = stem.parse::<u64>() else {
continue;
};
segment_files.push((segment_seq, path));
}
segment_files.sort_by_key(|(segment_seq, _)| *segment_seq);
for (segment_seq, path) in segment_files {
match keep_len_by_segment.get(&segment_seq).copied() {
Some(keep_len) if keep_len > 0 => {
let current_len = fs::metadata(&path)?.len();
if keep_len < current_len {
let file = OpenOptions::new().write(true).open(&path)?;
file.set_len(keep_len)?;
}
}
_ => {
fs::remove_file(&path)?;
}
}
}
}
let index_path = self.index_file_path();
let index_tmp = self.root.join("tmp").join("offsets.rollback.tmp");
write_jsonl_lines(&index_tmp, &index_rows)?;
fs::rename(index_tmp, index_path)?;
self.next_segment_seq = 1;
self.next_frame_seq = 1;
self.next_entry_seq = 1;
self.current_segment_bytes = 0;
self.bootstrap_from_disk()?;
let verification = MigrationVerification {
entry_count_match: self.entry_count() == checkpoint.head_entry_seq,
hash_chain_match: self.chain_hash == checkpoint.chain_hash,
index_consistent: self.validate_integrity().is_ok(),
};
let (outcome, error_class) = if verification.entry_count_match
&& verification.hash_chain_match
&& verification.index_consistent
{
("ok".to_string(), None)
} else if verification.index_consistent {
(
"recoverable_error".to_string(),
Some("integrity_mismatch".to_string()),
)
} else {
(
"fatal_error".to_string(),
Some("index_corruption".to_string()),
)
};
let event = MigrationEvent {
schema: MIGRATION_EVENT_SCHEMA.to_string(),
migration_id: migration_id.clone(),
phase: "rollback".to_string(),
at: chrono::Utc::now().to_rfc3339_opts(chrono::SecondsFormat::Millis, true),
source_path: checkpoint.snapshot_ref,
target_path: self.root.display().to_string(),
source_format: "native_v2".to_string(),
target_format: "native_v2".to_string(),
verification,
outcome: outcome.clone(),
error_class,
correlation_id: correlation_id.clone(),
};
self.append_migration_event(event.clone())?;
if outcome == "ok" {
Ok(event)
} else {
Err(Error::session(format!(
"rollback verification failed for checkpoint {checkpoint_seq}"
)))
}
})();
match rollback_result {
Ok(event) => Ok(event),
Err(error) => {
if !rollback_failure_event_already_recorded(&error) {
let failure_event = MigrationEvent {
schema: MIGRATION_EVENT_SCHEMA.to_string(),
migration_id,
phase: "rollback".to_string(),
at: chrono::Utc::now().to_rfc3339_opts(chrono::SecondsFormat::Millis, true),
source_path: self.checkpoint_path(checkpoint_seq).display().to_string(),
target_path: self.root.display().to_string(),
source_format: "native_v2".to_string(),
target_format: "native_v2".to_string(),
verification: MigrationVerification {
entry_count_match: false,
hash_chain_match: false,
index_consistent: false,
},
outcome: "fatal_error".to_string(),
error_class: Some(classify_rollback_error(&error).to_string()),
correlation_id,
};
let _ = self.append_migration_event(failure_event);
}
Err(error)
}
}
}
#[allow(clippy::too_many_lines)]
pub fn write_manifest(
&self,
session_id: impl Into<String>,
source_format: impl Into<String>,
) -> Result<Manifest> {
let now = chrono::Utc::now().to_rfc3339_opts(chrono::SecondsFormat::Millis, true);
let created_at = self
.read_manifest()?
.map_or_else(|| now.clone(), |m| m.created_at);
let session_id = session_id.into();
let source_format = source_format.into();
let index_rows = self.read_index()?;
let mut parent_counts: std::collections::HashMap<String, u64> =
std::collections::HashMap::new();
let mut message_count = 0u64;
let mut compaction_count = 0u64;
let mut entry_ids = std::collections::HashSet::with_capacity(index_rows.len());
let mut recomputed_chain = GENESIS_CHAIN_HASH.to_string();
let mut parent_links_closed = true;
for row in &index_rows {
if let Some(frame) = seek_read_frame(self, row)? {
entry_ids.insert(frame.entry_id.clone());
if frame.entry_type == "message" {
message_count = message_count.saturating_add(1);
}
if frame.entry_type == "compaction" {
compaction_count = compaction_count.saturating_add(1);
}
if let Some(parent_id) = frame.parent_entry_id.as_deref() {
*parent_counts.entry(parent_id.to_string()).or_insert(0) += 1;
if !entry_ids.contains(parent_id) {
parent_links_closed = false;
}
}
recomputed_chain = chain_hash_step(&recomputed_chain, &frame.payload_sha256);
}
}
let branches_total = u64::try_from(parent_counts.values().filter(|&&n| n > 1).count())
.map_err(|_| Error::session("branch count exceeds u64"))?;
let mut monotonic_entry_seq = true;
let mut monotonic_segment_seq = true;
let mut last_entry_seq = 0u64;
let mut last_segment_seq = 0u64;
for row in &index_rows {
if row.entry_seq <= last_entry_seq {
monotonic_entry_seq = false;
}
if row.segment_seq < last_segment_seq {
monotonic_segment_seq = false;
}
last_entry_seq = row.entry_seq;
last_segment_seq = row.segment_seq;
}
let hash_chain_valid = recomputed_chain == self.chain_hash;
let head = self.head().unwrap_or(StoreHead {
segment_seq: 0,
entry_seq: 0,
entry_id: String::new(),
});
let segment_count = u64::try_from(
index_rows
.iter()
.map(|row| row.segment_seq)
.collect::<BTreeSet<_>>()
.len(),
)
.map_err(|_| Error::session("segment count exceeds u64"))?;
let mut manifest = Manifest {
schema: MANIFEST_SCHEMA.to_string(),
store_version: 2,
session_id,
source_format,
created_at,
updated_at: now,
head,
counters: ManifestCounters {
entries_total: u64::try_from(index_rows.len())
.map_err(|_| Error::session("entry count exceeds u64"))?,
messages_total: message_count,
branches_total,
compactions_total: compaction_count,
bytes_total: self.total_bytes,
},
files: ManifestFiles {
segment_dir: "segments/".to_string(),
segment_count,
index_path: "index/offsets.jsonl".to_string(),
checkpoint_dir: "checkpoints/".to_string(),
migration_ledger_path: "migrations/ledger.jsonl".to_string(),
},
integrity: ManifestIntegrity {
chain_hash: self.chain_hash.clone(),
manifest_hash: String::new(),
last_crc32c: self.last_crc32c.clone(),
},
invariants: ManifestInvariants {
parent_links_closed,
monotonic_entry_seq,
monotonic_segment_seq,
index_within_segment_bounds: self.validate_integrity().is_ok(),
branch_heads_indexed: true,
checkpoints_monotonic: true,
hash_chain_valid,
},
};
manifest.integrity.manifest_hash = manifest_hash_hex(&manifest)?;
let tmp = self.root.join("tmp").join("manifest.json.tmp");
fs::write(&tmp, serde_json::to_vec_pretty(&manifest)?)?;
fs::rename(&tmp, self.manifest_path())?;
Ok(manifest)
}
pub fn read_manifest(&self) -> Result<Option<Manifest>> {
let path = self.manifest_path();
if !path.exists() {
return Ok(None);
}
let content = fs::read_to_string(path)?;
let manifest: Manifest = serde_json::from_str(&content)?;
Ok(Some(manifest))
}
pub fn chain_hash(&self) -> &str {
&self.chain_hash
}
pub const fn total_bytes(&self) -> u64 {
self.total_bytes
}
pub fn index_summary(&self) -> Result<Option<IndexSummary>> {
let rows = self.read_index()?;
let (Some(first), Some(last)) = (rows.first(), rows.last()) else {
return Ok(None);
};
Ok(Some(IndexSummary {
entry_count: u64::try_from(rows.len())
.map_err(|_| Error::session("entry count exceeds u64"))?,
first_entry_seq: first.entry_seq,
last_entry_seq: last.entry_seq,
last_entry_id: last.entry_id.clone(),
}))
}
#[allow(clippy::too_many_lines)]
pub fn rebuild_index(&mut self) -> Result<u64> {
let mut rebuilt_count = 0u64;
let index_path = self.index_file_path();
let index_tmp_path = self.root.join("tmp").join("offsets.rebuild.tmp");
if let Some(parent) = index_tmp_path.parent() {
fs::create_dir_all(parent)?;
}
if index_tmp_path.exists() {
fs::remove_file(&index_tmp_path)?;
}
let mut index_writer = std::io::BufWriter::new(
OpenOptions::new()
.create(true)
.write(true)
.truncate(true)
.open(&index_tmp_path)?,
);
self.chain_hash = GENESIS_CHAIN_HASH.to_string();
self.total_bytes = 0;
self.last_entry_id = None;
self.last_crc32c = "00000000".to_string();
let segment_files = self.list_segment_files()?;
let mut last_observed_seq = 0u64;
'segments: for (i, (_seg_seq, seg_path)) in segment_files.iter().enumerate() {
let file = File::open(seg_path)?;
let mut reader = BufReader::new(file);
let mut byte_offset = 0u64;
let mut line_number = 0u64;
let mut line = String::new();
loop {
line.clear();
let bytes_read = match read_line_with_limit(
&mut reader,
&mut line,
MAX_FRAME_READ_BYTES,
) {
Ok(n) => n,
Err(e) if e.kind() == std::io::ErrorKind::InvalidData => {
tracing::warn!(
segment = %seg_path.display(),
line_number,
error = %e,
"SessionStoreV2 encountered oversized line during index rebuild; truncating segment"
);
drop(reader);
truncate_file_to(seg_path, byte_offset)?;
remove_orphaned_segments(&segment_files[i + 1..])?;
break 'segments;
}
Err(e) => return Err(Error::Io(Box::new(e))),
};
if bytes_read == 0 {
break;
}
line_number = line_number.saturating_add(1);
let line_len = u64::try_from(bytes_read)
.map_err(|_| Error::session("line length exceeds u64"))?;
if line.trim().is_empty() {
byte_offset = byte_offset.saturating_add(line_len);
continue;
}
let missing_newline = !line.ends_with('\n');
let json_line = line.trim_end_matches('\n').trim_end_matches('\r');
let frame: SegmentFrame = match serde_json::from_str(json_line) {
Ok(frame) => {
if missing_newline {
tracing::warn!(
segment = %seg_path.display(),
line_number,
"SessionStoreV2 dropping valid but newline-missing trailing segment frame during index rebuild"
);
drop(reader);
truncate_file_to(seg_path, byte_offset)?;
remove_orphaned_segments(&segment_files[i + 1..])?;
break 'segments;
}
frame
}
Err(err) => {
let at_eof = reader.fill_buf()?.is_empty();
if at_eof && missing_newline {
tracing::warn!(
segment = %seg_path.display(),
line_number,
error = %err,
"SessionStoreV2 dropping truncated trailing segment frame during index rebuild"
);
drop(reader);
truncate_file_to(seg_path, byte_offset)?;
remove_orphaned_segments(&segment_files[i + 1..])?;
break 'segments;
}
return Err(Error::session(format!(
"failed to parse segment frame while rebuilding index: \
segment={} line={line_number}: {err}",
seg_path.display()
)));
}
};
if frame.entry_seq <= last_observed_seq {
tracing::warn!(
segment = %seg_path.display(),
line_number,
entry_seq = frame.entry_seq,
last_seq = last_observed_seq,
"SessionStoreV2 detected non-monotonic entry sequence during rebuild; truncating segment"
);
drop(reader);
truncate_file_to(seg_path, byte_offset)?;
remove_orphaned_segments(&segment_files[i + 1..])?;
break 'segments;
}
last_observed_seq = frame.entry_seq;
let record_bytes = line.as_bytes().to_vec();
let crc = crc32c_upper(&record_bytes);
let index_entry = OffsetIndexEntry {
schema: Cow::Borrowed(OFFSET_INDEX_SCHEMA),
entry_seq: frame.entry_seq,
entry_id: frame.entry_id.clone(),
segment_seq: frame.segment_seq,
frame_seq: frame.frame_seq,
byte_offset,
byte_length: line_len,
crc32c: crc.clone(),
state: Cow::Borrowed("active"),
};
serde_json::to_writer(&mut index_writer, &index_entry)?;
index_writer.write_all(b"\n")?;
self.chain_hash = chain_hash_step(&self.chain_hash, &frame.payload_sha256);
self.total_bytes = self.total_bytes.saturating_add(line_len);
self.last_entry_id = Some(frame.entry_id);
self.last_crc32c = crc;
byte_offset = byte_offset.saturating_add(line_len);
rebuilt_count = rebuilt_count.saturating_add(1);
}
}
index_writer.flush()?;
drop(index_writer);
fs::rename(&index_tmp_path, &index_path)?;
self.next_segment_seq = 1;
self.next_frame_seq = 1;
self.next_entry_seq = 1;
self.current_segment_bytes = 0;
self.bootstrap_from_disk()?;
Ok(rebuilt_count)
}
pub fn validate_integrity(&self) -> Result<()> {
let index_rows = self.read_index()?;
let mut last_entry_seq = 0;
let mut rows_by_segment: std::collections::BTreeMap<u64, Vec<&OffsetIndexEntry>> =
std::collections::BTreeMap::new();
for row in &index_rows {
if row.entry_seq <= last_entry_seq {
return Err(Error::session(format!(
"entry sequence is not strictly increasing at entry_seq={}",
row.entry_seq
)));
}
last_entry_seq = row.entry_seq;
rows_by_segment
.entry(row.segment_seq)
.or_default()
.push(row);
}
for (segment_seq, rows) in rows_by_segment {
let segment_path = self.segment_file_path(segment_seq);
let mut file = File::open(&segment_path).map_err(|err| {
Error::session(format!(
"failed to open segment {}: {err}",
segment_path.display()
))
})?;
let segment_len = file.metadata()?.len();
for row in rows {
let end = row
.byte_offset
.checked_add(row.byte_length)
.ok_or_else(|| Error::session("index byte range overflow"))?;
if end > segment_len {
return Err(Error::session(format!(
"index out of bounds for segment {}: end={} len={segment_len}",
segment_path.display(),
end
)));
}
file.seek(SeekFrom::Start(row.byte_offset))?;
let mut record_bytes = vec![
0u8;
usize::try_from(row.byte_length).map_err(|_| {
Error::session(format!("byte length too large: {}", row.byte_length))
})?
];
file.read_exact(&mut record_bytes)?;
let checksum = crc32c_upper(&record_bytes);
if checksum != row.crc32c {
return Err(Error::session(format!(
"checksum mismatch for entry_seq={} expected={} actual={checksum}",
row.entry_seq, row.crc32c
)));
}
if record_bytes.last() == Some(&b'\n') {
record_bytes.pop();
}
let frame: SegmentFrame = serde_json::from_slice(&record_bytes)?;
if frame.entry_seq != row.entry_seq
|| frame.entry_id != row.entry_id
|| frame.segment_seq != row.segment_seq
|| frame.frame_seq != row.frame_seq
{
return Err(Error::session(format!(
"index/frame mismatch at entry_seq={}",
row.entry_seq
)));
}
let (payload_hash, payload_bytes) = payload_hash_and_size(&frame.payload)?;
if frame.payload_sha256 != payload_hash || frame.payload_bytes != payload_bytes {
return Err(Error::session(format!(
"payload integrity mismatch at entry_seq={}",
row.entry_seq
)));
}
}
}
Ok(())
}
fn bootstrap_from_disk(&mut self) -> Result<()> {
let index_rows = self.read_index()?;
if let Some(last) = index_rows.last() {
self.next_entry_seq = last
.entry_seq
.checked_add(1)
.ok_or_else(|| Error::session("entry sequence overflow while bootstrapping"))?;
self.next_segment_seq = last.segment_seq;
self.next_frame_seq = last
.frame_seq
.checked_add(1)
.ok_or_else(|| Error::session("frame sequence overflow while bootstrapping"))?;
let segment_path = self.segment_file_path(last.segment_seq);
self.current_segment_bytes = fs::metadata(&segment_path)
.map(|meta| meta.len())
.map_err(|err| {
Error::session(format!(
"failed to stat active segment {} while bootstrapping: {err}",
segment_path.display()
))
})?;
self.last_entry_id = Some(last.entry_id.clone());
self.last_crc32c.clone_from(&last.crc32c);
let mut chain = GENESIS_CHAIN_HASH.to_string();
let mut total = 0u64;
for row in &index_rows {
let frame = seek_read_frame(self, row)?.ok_or_else(|| {
Error::session(format!(
"index references missing frame during bootstrap: entry_seq={}, segment={}",
row.entry_seq, row.segment_seq
))
})?;
chain = chain_hash_step(&chain, &frame.payload_sha256);
total = total.saturating_add(row.byte_length);
}
self.chain_hash = chain;
self.total_bytes = total;
} else {
self.chain_hash = GENESIS_CHAIN_HASH.to_string();
self.total_bytes = 0;
self.last_entry_id = None;
self.last_crc32c = "00000000".to_string();
}
Ok(())
}
}
fn rollback_failure_event_already_recorded(error: &Error) -> bool {
matches!(error, Error::Session(message) if message.contains("rollback verification failed"))
}
fn classify_rollback_error(error: &Error) -> &'static str {
match error {
Error::Session(message) => {
if message.contains("checkpoint") && message.contains("not found") {
"checkpoint_not_found"
} else if message.contains("index byte range overflow") {
"index_range_overflow"
} else if message.contains("rollback verification failed") {
"rollback_verification_failed"
} else {
"session_error"
}
}
_ => error.category_code(),
}
}
fn is_recoverable_index_error(error: &Error) -> bool {
match error {
Error::Json(_) => true,
Error::Io(err) => matches!(
err.kind(),
std::io::ErrorKind::UnexpectedEof | std::io::ErrorKind::InvalidData
),
Error::Session(message) => {
let lower = message.to_ascii_lowercase();
lower.contains("checksum mismatch")
|| lower.contains("index out of bounds")
|| lower.contains("index/frame mismatch")
|| lower.contains("payload integrity mismatch")
|| lower.contains("entry sequence is not strictly increasing")
|| lower.contains("index byte range overflow")
}
_ => false,
}
}
pub fn frame_to_session_entry(frame: &SegmentFrame) -> Result<SessionEntry> {
let entry: SessionEntry = serde_json::from_str(frame.payload.get()).map_err(|e| {
Error::session(format!(
"failed to deserialize SessionEntry from frame entry_id={}: {e}",
frame.entry_id
))
})?;
if let Some(base_id) = entry.base_id() {
if base_id != &frame.entry_id {
return Err(Error::session(format!(
"frame entry_id mismatch: frame={} entry={}",
frame.entry_id, base_id
)));
}
}
Ok(entry)
}
pub fn session_entry_to_frame_args(
entry: &SessionEntry,
) -> Result<(String, Option<String>, String, Value)> {
let base = entry.base();
let entry_id = base
.id
.clone()
.ok_or_else(|| Error::session("SessionEntry has no id"))?;
let parent_entry_id = base.parent_id.clone();
let entry_type = match entry {
SessionEntry::Message(_) => "message",
SessionEntry::ModelChange(_) => "model_change",
SessionEntry::ThinkingLevelChange(_) => "thinking_level_change",
SessionEntry::Compaction(_) => "compaction",
SessionEntry::BranchSummary(_) => "branch_summary",
SessionEntry::Label(_) => "label",
SessionEntry::SessionInfo(_) => "session_info",
SessionEntry::Custom(_) => "custom",
};
let payload = serde_json::to_value(entry).map_err(|e| {
Error::session(format!(
"failed to serialize SessionEntry to frame payload: {e}"
))
})?;
Ok((entry_id, parent_entry_id, entry_type.to_string(), payload))
}
fn seek_read_frame(store: &SessionStoreV2, row: &OffsetIndexEntry) -> Result<Option<SegmentFrame>> {
let segment_path = store.segment_file_path(row.segment_seq);
if !segment_path.exists() {
return Ok(None);
}
let mut file = File::open(&segment_path)?;
let file_len = file.metadata()?.len();
let end_offset = row
.byte_offset
.checked_add(row.byte_length)
.ok_or_else(|| Error::session("index byte range overflow"))?;
if end_offset > file_len {
return Err(Error::session(format!(
"index out of bounds for segment {}: end={} len={}",
segment_path.display(),
end_offset,
file_len
)));
}
file.seek(SeekFrom::Start(row.byte_offset))?;
let byte_len = usize::try_from(row.byte_length)
.map_err(|_| Error::session(format!("byte length too large: {}", row.byte_length)))?;
if row.byte_length > store.max_segment_bytes.max(100 * 1024 * 1024) {
return Err(Error::session(format!(
"frame byte length {byte_len} exceeds limit"
)));
}
let mut buf = vec![0u8; byte_len];
file.read_exact(&mut buf)?;
if buf.last() == Some(&b'\n') {
buf.pop();
}
let frame: SegmentFrame = serde_json::from_slice(&buf)?;
Ok(Some(frame))
}
fn chain_hash_step(prev_chain: &str, payload_sha256: &str) -> String {
let mut hasher = Sha256::new();
hasher.update(prev_chain.as_bytes());
hasher.update(payload_sha256.as_bytes());
format!("{:x}", hasher.finalize())
}
fn manifest_hash_hex(manifest: &Manifest) -> Result<String> {
let encoded = serde_json::to_vec(manifest)?;
Ok(format!("{:x}", Sha256::digest(&encoded)))
}
pub fn v2_sidecar_path(jsonl_path: &Path) -> PathBuf {
let stem = jsonl_path.file_stem().map_or_else(
|| "session".to_string(),
|s| s.to_string_lossy().into_owned(),
);
let parent = jsonl_path.parent().unwrap_or_else(|| Path::new("."));
parent.join(format!("{stem}.v2"))
}
pub fn has_v2_sidecar(jsonl_path: &Path) -> bool {
let root = v2_sidecar_path(jsonl_path);
root.join("manifest.json").exists() || root.join("index").join("offsets.jsonl").exists()
}
fn append_jsonl_line<T: Serialize>(path: &Path, value: &T) -> Result<()> {
let mut file = OpenOptions::new().create(true).append(true).open(path)?;
serde_json::to_writer(&mut file, value)?;
file.write_all(b"\n")?;
Ok(())
}
fn truncate_file_to(path: &Path, len: u64) -> Result<()> {
let file = OpenOptions::new().write(true).truncate(false).open(path)?;
file.set_len(len)?;
Ok(())
}
fn write_jsonl_lines<T: Serialize>(path: &Path, rows: &[T]) -> Result<()> {
let file = OpenOptions::new()
.create(true)
.write(true)
.truncate(true)
.open(path)?;
let mut writer = std::io::BufWriter::new(file);
for row in rows {
serde_json::to_writer(&mut writer, row)?;
writer.write_all(b"\n")?;
}
writer.flush()?;
Ok(())
}
fn read_jsonl<T: for<'de> Deserialize<'de>>(path: &Path) -> Result<Vec<T>> {
let file = File::open(path)?;
let mut reader = BufReader::new(file);
let mut out = Vec::new();
let mut line = String::new();
loop {
line.clear();
let bytes_read = read_line_with_limit(&mut reader, &mut line, MAX_FRAME_READ_BYTES)
.map_err(|e| Error::Io(Box::new(e)))?;
if bytes_read == 0 {
break;
}
if line.trim().is_empty() {
continue;
}
let json_line = line.trim_end_matches('\n').trim_end_matches('\r');
out.push(serde_json::from_str::<T>(json_line)?);
}
Ok(out)
}
fn payload_hash_and_size(payload: &RawValue) -> Result<(String, u64)> {
let bytes = payload.get().as_bytes();
let payload_bytes = u64::try_from(bytes.len())
.map_err(|_| Error::session(format!("payload is too large: {} bytes", bytes.len())))?;
let hash = format!("{:x}", Sha256::digest(bytes));
Ok((hash, payload_bytes))
}
fn line_length_u64(encoded: &[u8]) -> Result<u64> {
let line_len = encoded
.len()
.checked_add(1)
.ok_or_else(|| Error::session("line length overflow"))?;
u64::try_from(line_len).map_err(|_| Error::session("line length exceeds u64"))
}
fn crc32c_upper(data: &[u8]) -> String {
let crc = crc32c::crc32c(data);
format!("{crc:08X}")
}
fn remove_orphaned_segments(segments: &[(u64, PathBuf)]) -> Result<()> {
for (_, orphan_path) in segments {
if orphan_path.exists() {
tracing::warn!(
path = %orphan_path.display(),
"removing orphaned segment file after index rebuild truncation"
);
if let Err(e) = fs::remove_file(orphan_path) {
if e.kind() != std::io::ErrorKind::NotFound {
return Err(Error::Io(Box::new(e)));
}
}
}
}
Ok(())
}
fn read_line_with_limit<R: BufRead>(
reader: &mut R,
buf: &mut String,
limit: u64,
) -> std::io::Result<usize> {
let mut take = reader.take(limit);
let n = take.read_line(buf)?;
if n > 0 && take.limit() == 0 && !buf.ends_with('\n') {
return Err(std::io::Error::new(
std::io::ErrorKind::InvalidData,
format!("Line length exceeds limit of {limit} bytes"),
));
}
Ok(n)
}
#[cfg(test)]
mod proptests {
use super::*;
use proptest::prelude::*;
use serde_json::json;
proptest! {
#[test]
fn chain_hash_output_is_64_hex(
a in "[0-9a-f]{64}",
b in "[0-9a-f]{64}",
) {
let result = chain_hash_step(&a, &b);
assert_eq!(result.len(), 64);
assert!(result.chars().all(|c| c.is_ascii_hexdigit()));
}
#[test]
fn chain_hash_deterministic(
a in "[0-9a-f]{64}",
b in "[0-9a-f]{64}",
) {
assert_eq!(chain_hash_step(&a, &b), chain_hash_step(&a, &b));
}
#[test]
fn chain_hash_non_commutative(
a in "[0-9a-f]{64}",
b in "[0-9a-f]{64}",
) {
if a != b {
assert_ne!(chain_hash_step(&a, &b), chain_hash_step(&b, &a));
}
}
#[test]
fn chain_hash_genesis_differs_from_step(payload in "[0-9a-f]{64}") {
let step1 = chain_hash_step(GENESIS_CHAIN_HASH, &payload);
assert_ne!(step1, GENESIS_CHAIN_HASH);
}
}
proptest! {
#[test]
fn crc32c_output_is_8_uppercase_hex(data in prop::collection::vec(any::<u8>(), 0..500)) {
let result = crc32c_upper(&data);
assert_eq!(result.len(), 8);
assert!(result.chars().all(|c| matches!(c, '0'..='9' | 'A'..='F')));
}
#[test]
fn crc32c_deterministic(data in prop::collection::vec(any::<u8>(), 0..500)) {
assert_eq!(crc32c_upper(&data), crc32c_upper(&data));
}
#[test]
fn crc32c_single_bit_sensitivity(byte in any::<u8>()) {
let a = crc32c_upper(&[byte]);
let b = crc32c_upper(&[byte ^ 1]);
if byte != byte ^ 1 {
assert_ne!(a, b, "flipping LSB should change CRC");
}
}
}
proptest! {
#[test]
fn payload_hash_is_64_hex(s in "[a-z]{0,50}") {
let val = json!(s);
let raw_string = serde_json::to_string(&val).unwrap();
let raw = RawValue::from_string(raw_string).unwrap();
let (hash, _size) = payload_hash_and_size(&raw).unwrap();
assert_eq!(hash.len(), 64);
assert!(hash.chars().all(|c| c.is_ascii_hexdigit()));
}
#[test]
fn payload_size_matches_serialization(s in "[a-z]{0,50}") {
let val = json!(s);
let raw_string = serde_json::to_string(&val).unwrap();
let raw = RawValue::from_string(raw_string).unwrap();
let (_, size) = payload_hash_and_size(&raw).unwrap();
let expected = serde_json::to_vec(&val).unwrap().len() as u64;
assert_eq!(size, expected);
}
#[test]
fn payload_hash_deterministic(n in 0i64..10000) {
let val = json!(n);
let raw_string = serde_json::to_string(&val).unwrap();
let raw = RawValue::from_string(raw_string).unwrap();
let (h1, s1) = payload_hash_and_size(&raw).unwrap();
let (h2, s2) = payload_hash_and_size(&raw).unwrap();
assert_eq!(h1, h2);
assert_eq!(s1, s2);
}
}
proptest! {
#[test]
fn line_length_is_len_plus_one(data in prop::collection::vec(any::<u8>(), 0..1000)) {
let result = line_length_u64(&data).unwrap();
assert_eq!(result, data.len() as u64 + 1);
}
#[test]
fn line_length_never_zero(data in prop::collection::vec(any::<u8>(), 0..100)) {
let result = line_length_u64(&data).unwrap();
assert!(result >= 1);
}
}
proptest! {
#[test]
fn sidecar_path_ends_with_v2(stem in "[a-z]{1,10}") {
let input = PathBuf::from(format!("/tmp/{stem}.jsonl"));
let result = v2_sidecar_path(&input);
let name = result.file_name().unwrap().to_str().unwrap();
assert_eq!(
Path::new(name).extension().and_then(|ext| ext.to_str()),
Some("v2"),
"expected .v2 suffix, got {name}"
);
}
#[test]
fn sidecar_path_preserves_parent(stem in "[a-z]{1,10}", dir in "[a-z]{1,8}") {
let input = PathBuf::from(format!("/tmp/{dir}/{stem}.jsonl"));
let result = v2_sidecar_path(&input);
assert_eq!(
result.parent().unwrap(),
Path::new(&format!("/tmp/{dir}"))
);
}
#[test]
fn sidecar_path_deterministic(stem in "[a-z]{1,10}") {
let input = PathBuf::from(format!("/sessions/{stem}.jsonl"));
assert_eq!(v2_sidecar_path(&input), v2_sidecar_path(&input));
}
#[test]
fn sidecar_path_contains_stem(stem in "[a-z]{1,10}") {
let input = PathBuf::from(format!("/tmp/{stem}.jsonl"));
let result = v2_sidecar_path(&input);
let name = result.file_name().unwrap().to_str().unwrap();
assert_eq!(name, format!("{stem}.v2"));
}
}
}