1use crate::error::{Error, Result};
10use crate::session::SessionEntry;
11use serde::{Deserialize, Serialize};
12use serde_json::Value;
13use serde_json::value::RawValue;
14use sha2::{Digest, Sha256};
15use std::borrow::Cow;
16use std::collections::BTreeSet;
17use std::fs::{self, File, OpenOptions};
18use std::io::{BufRead, BufReader, Read, Seek, SeekFrom, Write};
19use std::path::{Path, PathBuf};
20
/// Schema tag stamped on every segment frame line.
pub const SEGMENT_FRAME_SCHEMA: &str = "pi.session_store_v2.segment_frame.v1";
/// Schema tag for rows in `index/offsets.jsonl`.
pub const OFFSET_INDEX_SCHEMA: &str = "pi.session_store_v2.offset_index.v1";
/// Schema tag for checkpoint JSON files under `checkpoints/`.
pub const CHECKPOINT_SCHEMA: &str = "pi.session_store_v2.checkpoint.v1";
/// Schema tag for `manifest.json`.
pub const MANIFEST_SCHEMA: &str = "pi.session_store_v2.manifest.v1";
/// Schema tag for rows in `migrations/ledger.jsonl`.
pub const MIGRATION_EVENT_SCHEMA: &str = "pi.session_store_v2.migration_event.v1";

// Hard cap (100 MiB) on a single segment line read during index rebuild.
const MAX_FRAME_READ_BYTES: u64 = 100 * 1024 * 1024;

// Seed for the rolling payload-hash chain (64 zero hex chars, SHA-256 width).
const GENESIS_CHAIN_HASH: &str = "0000000000000000000000000000000000000000000000000000000000000000";
32
/// One append-only record in a segment file: envelope metadata plus the raw
/// JSON payload. Each frame is serialized as a single JSONL line.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct SegmentFrame {
    /// Always `SEGMENT_FRAME_SCHEMA` for frames written by this module.
    pub schema: Cow<'static, str>,
    /// Segment file this frame belongs to.
    pub segment_seq: u64,
    /// 1-based position of the frame within its segment.
    pub frame_seq: u64,
    /// Store-wide, strictly increasing entry sequence number.
    pub entry_seq: u64,
    pub entry_id: String,
    /// Parent entry id for branch structure; omitted from JSON when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub parent_entry_id: Option<String>,
    pub entry_type: String,
    /// RFC 3339 UTC timestamp with millisecond precision.
    pub timestamp: String,
    /// SHA-256 of the payload; each frame's digest feeds the store chain hash.
    pub payload_sha256: String,
    pub payload_bytes: u64,
    /// Payload kept as raw JSON so re-serialization cannot alter its bytes.
    pub payload: Box<RawValue>,
}
49
50impl SegmentFrame {
51 fn new(
52 segment_seq: u64,
53 frame_seq: u64,
54 entry_seq: u64,
55 entry_id: String,
56 parent_entry_id: Option<String>,
57 entry_type: String,
58 payload: Box<RawValue>,
59 ) -> Result<Self> {
60 let (payload_sha256, payload_bytes) = payload_hash_and_size(&payload)?;
61 Ok(Self {
62 schema: Cow::Borrowed(SEGMENT_FRAME_SCHEMA),
63 segment_seq,
64 frame_seq,
65 entry_seq,
66 entry_id,
67 parent_entry_id,
68 entry_type,
69 timestamp: chrono::Utc::now().to_rfc3339_opts(chrono::SecondsFormat::Millis, true),
70 payload_sha256,
71 payload_bytes,
72 payload,
73 })
74 }
75}
76
/// One row of the offset index: locates a frame by segment, byte offset, and
/// byte length, and carries a CRC32C of the raw line for verification.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct OffsetIndexEntry {
    /// Always `OFFSET_INDEX_SCHEMA` for rows written by this module.
    pub schema: Cow<'static, str>,
    pub entry_seq: u64,
    pub entry_id: String,
    pub segment_seq: u64,
    pub frame_seq: u64,
    /// Byte position of the frame line within its segment file.
    pub byte_offset: u64,
    /// Length of the frame line including its trailing newline.
    pub byte_length: u64,
    /// CRC32C of the raw line bytes (as produced by `crc32c_upper`).
    pub crc32c: String,
    /// Row state; set to "active" on write.
    pub state: Cow<'static, str>,
}
90
/// The store's current head: segment, entry sequence, and id of the newest
/// appended entry.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
#[serde(rename_all = "camelCase")]
pub struct StoreHead {
    pub segment_seq: u64,
    pub entry_seq: u64,
    pub entry_id: String,
}
99
/// A point-in-time marker written atomically under `checkpoints/`; rollback
/// restores the store to the head recorded here.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct Checkpoint {
    pub schema: String,
    pub checkpoint_seq: u64,
    /// RFC 3339 UTC timestamp of checkpoint creation.
    pub at: String,
    /// Entry sequence of the head at checkpoint time.
    pub head_entry_seq: u64,
    pub head_entry_id: String,
    /// Store-relative path of this checkpoint file.
    pub snapshot_ref: String,
    pub compacted_before_entry_seq: u64,
    /// Chain hash at checkpoint time; compared during rollback verification.
    pub chain_hash: String,
    /// Free-form reason supplied by the caller.
    pub reason: String,
}
114
/// `manifest.json`: a summarized, integrity-checked view of the whole store,
/// rewritten atomically by `write_manifest`.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct Manifest {
    pub schema: String,
    /// Store layout version; 2 for this module.
    pub store_version: u8,
    pub session_id: String,
    pub source_format: String,
    /// Preserved from the previous manifest if one exists.
    pub created_at: String,
    pub updated_at: String,
    pub head: StoreHead,
    pub counters: ManifestCounters,
    pub files: ManifestFiles,
    pub integrity: ManifestIntegrity,
    pub invariants: ManifestInvariants,
}
130
/// Aggregate counts recomputed from the index when the manifest is written.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct ManifestCounters {
    /// Total rows in the offset index.
    pub entries_total: u64,
    /// Frames whose `entry_type` is "message".
    pub messages_total: u64,
    /// Parent ids referenced by more than one child (branch points).
    pub branches_total: u64,
    /// Frames whose `entry_type` is "compaction".
    pub compactions_total: u64,
    /// Sum of indexed frame line lengths in bytes.
    pub bytes_total: u64,
}
140
/// Store-relative file layout recorded in the manifest.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct ManifestFiles {
    pub segment_dir: String,
    /// Number of distinct segments referenced by the index.
    pub segment_count: u64,
    pub index_path: String,
    pub checkpoint_dir: String,
    pub migration_ledger_path: String,
}
150
/// Integrity digests captured alongside the manifest.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct ManifestIntegrity {
    /// Rolling hash over all frame payload digests.
    pub chain_hash: String,
    /// Hash of the manifest itself; filled in after construction.
    pub manifest_hash: String,
    /// CRC32C of the most recently appended frame line.
    pub last_crc32c: String,
}
158
159#[derive(Debug, Clone, Serialize, Deserialize)]
160#[serde(rename_all = "camelCase")]
161#[allow(clippy::struct_excessive_bools)] pub struct ManifestInvariants {
163 pub parent_links_closed: bool,
164 pub monotonic_entry_seq: bool,
165 pub monotonic_segment_seq: bool,
166 pub index_within_segment_bounds: bool,
167 pub branch_heads_indexed: bool,
168 pub checkpoints_monotonic: bool,
169 pub hash_chain_valid: bool,
170}
171
/// Post-rollback verification verdicts recorded in a `MigrationEvent`.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
#[serde(rename_all = "camelCase")]
pub struct MigrationVerification {
    /// Entry count after rollback equals the checkpoint's head entry seq.
    pub entry_count_match: bool,
    /// Chain hash after rollback equals the checkpoint's chain hash.
    pub hash_chain_match: bool,
    /// `validate_integrity` succeeded after rollback.
    pub index_consistent: bool,
}
179
/// One row of the migration ledger (`migrations/ledger.jsonl`).
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
#[serde(rename_all = "camelCase")]
pub struct MigrationEvent {
    pub schema: String,
    pub migration_id: String,
    /// Migration phase, e.g. "rollback".
    pub phase: String,
    /// RFC 3339 UTC timestamp; defaulted on append if left empty.
    pub at: String,
    pub source_path: String,
    pub target_path: String,
    pub source_format: String,
    pub target_format: String,
    pub verification: MigrationVerification,
    /// "ok", "recoverable_error", or "fatal_error".
    pub outcome: String,
    /// Error classification; omitted from JSON when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub error_class: Option<String>,
    pub correlation_id: String,
}
197
/// Compact summary of the offset index: row count plus first/last entries.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
#[serde(rename_all = "camelCase")]
pub struct IndexSummary {
    pub entry_count: u64,
    pub first_entry_seq: u64,
    pub last_entry_seq: u64,
    pub last_entry_id: String,
}
206
/// Append-only, segmented session store (v2) rooted at a directory laid out
/// as `segments/`, `index/`, `checkpoints/`, `migrations/`, and `tmp/`.
#[derive(Debug, Clone)]
pub struct SessionStoreV2 {
    root: PathBuf,
    // Soft cap that triggers rotation to a new segment file.
    max_segment_bytes: u64,
    next_segment_seq: u64,
    next_frame_seq: u64,
    next_entry_seq: u64,
    // Bytes already written to the current segment.
    current_segment_bytes: u64,
    // Rolling hash chained over each frame's payload_sha256.
    chain_hash: String,
    // Sum of indexed frame line lengths in bytes.
    total_bytes: u64,
    last_entry_id: Option<String>,
    last_crc32c: String,
}
224
225impl SessionStoreV2 {
    /// Opens (or initializes) a store rooted at `root`, creating the expected
    /// directory layout and bootstrapping in-memory state from disk.
    ///
    /// Recovery: a recoverable index error during bootstrap or integrity
    /// validation triggers an index rebuild followed by a second bootstrap;
    /// an empty index alongside non-empty segment data also forces a rebuild.
    ///
    /// # Errors
    /// Fails if `max_segment_bytes` is zero, on filesystem errors, or when a
    /// non-recoverable bootstrap/integrity error occurs.
    pub fn create(root: impl AsRef<Path>, max_segment_bytes: u64) -> Result<Self> {
        if max_segment_bytes == 0 {
            return Err(Error::validation("max_segment_bytes must be > 0"));
        }

        let root = root.as_ref().to_path_buf();
        fs::create_dir_all(root.join("segments"))?;
        fs::create_dir_all(root.join("index"))?;
        fs::create_dir_all(root.join("checkpoints"))?;
        fs::create_dir_all(root.join("migrations"))?;
        fs::create_dir_all(root.join("tmp"))?;

        let mut store = Self {
            root,
            max_segment_bytes,
            next_segment_seq: 1,
            next_frame_seq: 1,
            next_entry_seq: 1,
            current_segment_bytes: 0,
            chain_hash: GENESIS_CHAIN_HASH.to_string(),
            total_bytes: 0,
            last_entry_id: None,
            last_crc32c: "00000000".to_string(),
        };
        if let Err(err) = store.bootstrap_from_disk() {
            if is_recoverable_index_error(&err) {
                tracing::warn!(
                    root = %store.root.display(),
                    error = %err,
                    "SessionStoreV2 bootstrap failed with recoverable index error; attempting index rebuild"
                );
                store.rebuild_index()?;
                store.bootstrap_from_disk()?;
            } else {
                return Err(err);
            }
        }

        // Empty index but non-empty segments: the index was lost or never
        // written, so reconstruct it from the segment files.
        if store.entry_count() == 0 && store.segments_exist_with_data()? {
            tracing::warn!(
                root = %store.root.display(),
                "SessionStoreV2 detected segment data with empty index; rebuilding index"
            );
            store.rebuild_index()?;
            store.bootstrap_from_disk()?;
        }

        if let Err(err) = store.validate_integrity() {
            if is_recoverable_index_error(&err) {
                tracing::warn!(
                    root = %store.root.display(),
                    error = %err,
                    "SessionStoreV2 integrity validation failed with recoverable error; rebuilding index"
                );
                store.rebuild_index()?;
                store.bootstrap_from_disk()?;
                // Rebuild must produce a valid store; fail hard otherwise.
                store.validate_integrity()?;
            } else {
                return Err(err);
            }
        }
        Ok(store)
    }
291
292 pub fn segment_file_path(&self, segment_seq: u64) -> PathBuf {
293 self.root
294 .join("segments")
295 .join(format!("{segment_seq:016}.seg"))
296 }
297
298 pub fn index_file_path(&self) -> PathBuf {
299 self.root.join("index").join("offsets.jsonl")
300 }
301
302 fn manifest_path(&self) -> PathBuf {
303 self.root.join("manifest.json")
304 }
305
306 fn migration_ledger_path(&self) -> PathBuf {
307 self.root.join("migrations").join("ledger.jsonl")
308 }
309
310 fn list_segment_files(&self) -> Result<Vec<(u64, PathBuf)>> {
311 let segments_dir = self.root.join("segments");
312 if !segments_dir.exists() {
313 return Ok(Vec::new());
314 }
315
316 let mut segment_files = Vec::new();
317 for entry in fs::read_dir(segments_dir)? {
318 let entry = entry?;
319 let path = entry.path();
320 if path.extension().and_then(|ext| ext.to_str()) != Some("seg") {
321 continue;
322 }
323 let Some(stem) = path.file_stem().and_then(|s| s.to_str()) else {
324 continue;
325 };
326 let Ok(segment_seq) = stem.parse::<u64>() else {
327 continue;
328 };
329 segment_files.push((segment_seq, path));
330 }
331 segment_files.sort_by_key(|(segment_seq, _)| *segment_seq);
332 Ok(segment_files)
333 }
334
335 fn segments_exist_with_data(&self) -> Result<bool> {
336 for (_, path) in self.list_segment_files()? {
337 if fs::metadata(path)?.len() > 0 {
338 return Ok(true);
339 }
340 }
341 Ok(false)
342 }
343
    /// Appends one entry: serializes it as a frame, writes the line to the
    /// current segment (rotating first if the soft size cap would be
    /// exceeded), records an offset-index row, and advances the chain hash
    /// and all in-memory counters.
    ///
    /// On a failed segment or index write, the segment file is truncated back
    /// to its pre-write length so the two files stay consistent.
    ///
    /// # Errors
    /// Fails on serialization errors, I/O errors, or sequence overflow.
    #[allow(clippy::needless_pass_by_value)]
    pub fn append_entry(
        &mut self,
        entry_id: impl Into<String>,
        parent_entry_id: Option<String>,
        entry_type: impl Into<String>,
        payload: Value,
    ) -> Result<OffsetIndexEntry> {
        let entry_id = entry_id.into();
        let entry_type = entry_type.into();

        // Freeze the payload as raw JSON so its bytes (and hash) are stable.
        let raw_string = serde_json::to_string(&payload)?;
        let raw_payload = RawValue::from_string(raw_string)
            .map_err(|e| Error::session(format!("failed to convert payload to RawValue: {e}")))?;

        let mut frame = SegmentFrame::new(
            self.next_segment_seq,
            self.next_frame_seq,
            self.next_entry_seq,
            entry_id,
            parent_entry_id,
            entry_type,
            raw_payload,
        )?;
        let mut encoded = serde_json::to_vec(&frame)?;
        let mut line_len = line_length_u64(&encoded)?;

        // Rotate to a new segment when this line would push the current one
        // past the cap (never rotate an empty segment).
        if self.current_segment_bytes > 0
            && self.current_segment_bytes.saturating_add(line_len) > self.max_segment_bytes
        {
            self.next_segment_seq = self
                .next_segment_seq
                .checked_add(1)
                .ok_or_else(|| Error::session("segment sequence overflow"))?;
            self.next_frame_seq = 1;
            self.current_segment_bytes = 0;

            // Re-stamp the frame with the new segment/frame sequence and
            // re-encode, since those fields are embedded in the line.
            frame = SegmentFrame::new(
                self.next_segment_seq,
                self.next_frame_seq,
                self.next_entry_seq,
                frame.entry_id.clone(),
                frame.parent_entry_id.clone(),
                frame.entry_type.clone(),
                frame.payload.clone(),
            )?;
            encoded = serde_json::to_vec(&frame)?;
            line_len = line_length_u64(&encoded)?;
        }

        let segment_path = self.segment_file_path(self.next_segment_seq);
        let byte_offset = fs::metadata(&segment_path).map_or(0, |meta| meta.len());

        let mut write_buf = encoded;
        write_buf.push(b'\n');

        let mut segment = OpenOptions::new()
            .create(true)
            .write(true)
            .truncate(false)
            .open(&segment_path)?;

        // Remember the pre-write length so a failed write can be undone.
        let pre_write_len = segment.seek(SeekFrom::End(0))?;
        if let Err(e) = segment.write_all(&write_buf) {
            let _ = segment.set_len(pre_write_len);
            return Err(Error::from(e));
        }

        // CRC covers the full line, newline included.
        let crc = crc32c_upper(&write_buf);
        let index_entry = OffsetIndexEntry {
            schema: Cow::Borrowed(OFFSET_INDEX_SCHEMA),
            entry_seq: frame.entry_seq,
            entry_id: frame.entry_id.clone(),
            segment_seq: frame.segment_seq,
            frame_seq: frame.frame_seq,
            byte_offset,
            byte_length: line_len,
            crc32c: crc.clone(),
            state: Cow::Borrowed("active"),
        };

        // If the index row cannot be written, roll the segment back too so
        // segment and index never disagree.
        if let Err(e) = append_jsonl_line(&self.index_file_path(), &index_entry) {
            let _ = segment.set_len(pre_write_len);
            return Err(e);
        }

        // Both writes landed: advance derived state and counters.
        self.chain_hash = chain_hash_step(&self.chain_hash, &frame.payload_sha256);
        self.total_bytes = self.total_bytes.saturating_add(line_len);
        self.last_entry_id = Some(frame.entry_id);
        self.last_crc32c = crc;

        self.next_entry_seq = self
            .next_entry_seq
            .checked_add(1)
            .ok_or_else(|| Error::session("entry sequence overflow"))?;
        self.next_frame_seq = self
            .next_frame_seq
            .checked_add(1)
            .ok_or_else(|| Error::session("frame sequence overflow"))?;
        self.current_segment_bytes = self.current_segment_bytes.saturating_add(line_len);

        Ok(index_entry)
    }
454
455 pub fn read_segment(&self, segment_seq: u64) -> Result<Vec<SegmentFrame>> {
456 let path = self.segment_file_path(segment_seq);
457 if !path.exists() {
458 return Ok(Vec::new());
459 }
460 read_jsonl::<SegmentFrame>(&path)
461 }
462
463 pub fn read_index(&self) -> Result<Vec<OffsetIndexEntry>> {
464 let path = self.index_file_path();
465 if !path.exists() {
466 return Ok(Vec::new());
467 }
468 read_jsonl::<OffsetIndexEntry>(&path)
469 }
470
471 pub fn lookup_entry(&self, target_entry_seq: u64) -> Result<Option<SegmentFrame>> {
474 let index_rows = self.read_index()?;
475 let row = index_rows.iter().find(|r| r.entry_seq == target_entry_seq);
476 let Some(row) = row else {
477 return Ok(None);
478 };
479 seek_read_frame(self, row)
480 }
481
482 pub fn read_entries_from(&self, from_entry_seq: u64) -> Result<Vec<SegmentFrame>> {
484 let index_rows = self.read_index()?;
485 let mut frames = Vec::new();
486 for row in &index_rows {
487 if row.entry_seq < from_entry_seq {
488 continue;
489 }
490 if let Some(frame) = seek_read_frame(self, row)? {
491 frames.push(frame);
492 }
493 }
494 Ok(frames)
495 }
496
    /// Reads every entry in the store, in entry-sequence order.
    pub fn read_all_entries(&self) -> Result<Vec<SegmentFrame>> {
        self.read_entries_from(1)
    }
501
502 pub fn read_tail_entries(&self, count: u64) -> Result<Vec<SegmentFrame>> {
504 let index_rows = self.read_index()?;
505 let total = index_rows.len();
506 let skip = total.saturating_sub(usize::try_from(count).unwrap_or(usize::MAX));
507 let mut frames = Vec::with_capacity(total.saturating_sub(skip));
508 for row in &index_rows[skip..] {
509 if let Some(frame) = seek_read_frame(self, row)? {
510 frames.push(frame);
511 }
512 }
513 Ok(frames)
514 }
515
516 pub fn read_active_path(&self, leaf_entry_id: &str) -> Result<Vec<SegmentFrame>> {
519 let index_rows = self.read_index()?;
520 let id_to_row: std::collections::HashMap<&str, &OffsetIndexEntry> = index_rows
521 .iter()
522 .map(|row| (row.entry_id.as_str(), row))
523 .collect();
524
525 let mut frames = Vec::new();
526 let mut current_id: Option<String> = Some(leaf_entry_id.to_string());
527 while let Some(ref entry_id) = current_id {
528 let row = id_to_row.get(entry_id.as_str());
529 let row = match row {
530 Some(r) => *r,
531 None => break,
532 };
533 match seek_read_frame(self, row)? {
534 Some(frame) => {
535 current_id.clone_from(&frame.parent_entry_id);
536 frames.push(frame);
537 }
538 None => break,
539 }
540 }
541 frames.reverse();
542 Ok(frames)
543 }
544
545 pub const fn entry_count(&self) -> u64 {
547 self.next_entry_seq.saturating_sub(1)
548 }
549
550 pub fn head(&self) -> Option<StoreHead> {
552 self.last_entry_id.as_ref().map(|entry_id| StoreHead {
553 segment_seq: self.next_segment_seq,
554 entry_seq: self.next_entry_seq.saturating_sub(1),
555 entry_id: entry_id.clone(),
556 })
557 }
558
559 fn checkpoint_path(&self, checkpoint_seq: u64) -> PathBuf {
560 self.root
561 .join("checkpoints")
562 .join(format!("{checkpoint_seq:016}.json"))
563 }
564
565 pub fn create_checkpoint(&self, checkpoint_seq: u64, reason: &str) -> Result<Checkpoint> {
567 let head = self.head().unwrap_or(StoreHead {
568 segment_seq: 0,
569 entry_seq: 0,
570 entry_id: String::new(),
571 });
572 let snapshot_ref = format!("checkpoints/{checkpoint_seq:016}.json");
573 let checkpoint = Checkpoint {
574 schema: CHECKPOINT_SCHEMA.to_string(),
575 checkpoint_seq,
576 at: chrono::Utc::now().to_rfc3339_opts(chrono::SecondsFormat::Millis, true),
577 head_entry_seq: head.entry_seq,
578 head_entry_id: head.entry_id,
579 snapshot_ref,
580 compacted_before_entry_seq: 0,
581 chain_hash: self.chain_hash.clone(),
582 reason: reason.to_string(),
583 };
584 let tmp_path = self
585 .root
586 .join("tmp")
587 .join(format!("{checkpoint_seq:016}.json.tmp"));
588 fs::write(&tmp_path, serde_json::to_vec_pretty(&checkpoint)?)?;
589 fs::rename(&tmp_path, self.checkpoint_path(checkpoint_seq))?;
590 Ok(checkpoint)
591 }
592
593 pub fn read_checkpoint(&self, checkpoint_seq: u64) -> Result<Option<Checkpoint>> {
595 let path = self.checkpoint_path(checkpoint_seq);
596 if !path.exists() {
597 return Ok(None);
598 }
599 let data = fs::read_to_string(&path)?;
600 let cp: Checkpoint = serde_json::from_str(&data)?;
601 Ok(Some(cp))
602 }
603
604 pub fn append_migration_event(&self, mut event: MigrationEvent) -> Result<()> {
605 if event.schema.is_empty() {
606 event.schema = MIGRATION_EVENT_SCHEMA.to_string();
607 }
608 if event.at.is_empty() {
609 event.at = chrono::Utc::now().to_rfc3339_opts(chrono::SecondsFormat::Millis, true);
610 }
611 append_jsonl_line(&self.migration_ledger_path(), &event)
612 }
613
614 pub fn read_migration_events(&self) -> Result<Vec<MigrationEvent>> {
615 let path = self.migration_ledger_path();
616 if !path.exists() {
617 return Ok(Vec::new());
618 }
619 read_jsonl::<MigrationEvent>(&path)
620 }
621
    /// Rolls the store back to the state captured by checkpoint
    /// `checkpoint_seq`: discards newer index rows, truncates or removes
    /// segment data past the checkpoint head, re-bootstraps in-memory state,
    /// verifies the result, and records a migration-ledger event either way.
    ///
    /// # Errors
    /// Fails if the checkpoint is missing, a filesystem step fails, or
    /// post-rollback verification does not come back clean. On failure a
    /// fatal ledger event is recorded (best-effort) before returning.
    #[allow(clippy::too_many_lines)]
    pub fn rollback_to_checkpoint(
        &mut self,
        checkpoint_seq: u64,
        migration_id: impl Into<String>,
        correlation_id: impl Into<String>,
    ) -> Result<MigrationEvent> {
        let migration_id = migration_id.into();
        let correlation_id = correlation_id.into();

        // The whole rollback runs in a closure so any early error still
        // flows through the failure-ledger handling below.
        let rollback_result: Result<MigrationEvent> = (|| {
            let checkpoint = self
                .read_checkpoint(checkpoint_seq)?
                .ok_or_else(|| Error::session(format!("checkpoint {checkpoint_seq} not found")))?;

            // Keep only index rows at or before the checkpoint head.
            let mut index_rows = self.read_index()?;
            index_rows.retain(|row| row.entry_seq <= checkpoint.head_entry_seq);

            // For each segment, compute how many leading bytes must survive
            // (the max end offset of any retained row in that segment).
            let mut keep_len_by_segment: std::collections::HashMap<u64, u64> =
                std::collections::HashMap::new();
            for row in &index_rows {
                let end = row
                    .byte_offset
                    .checked_add(row.byte_length)
                    .ok_or_else(|| Error::session("index byte range overflow during rollback"))?;
                keep_len_by_segment
                    .entry(row.segment_seq)
                    .and_modify(|current| *current = (*current).max(end))
                    .or_insert(end);
            }

            let segments_dir = self.root.join("segments");
            if segments_dir.exists() {
                let mut segment_files: Vec<(u64, PathBuf)> = Vec::new();
                for entry in fs::read_dir(&segments_dir)? {
                    let entry = entry?;
                    let path = entry.path();
                    if path.extension().and_then(|ext| ext.to_str()) != Some("seg") {
                        continue;
                    }
                    let Some(stem) = path.file_stem().and_then(|s| s.to_str()) else {
                        continue;
                    };
                    let Ok(segment_seq) = stem.parse::<u64>() else {
                        continue;
                    };
                    segment_files.push((segment_seq, path));
                }
                segment_files.sort_by_key(|(segment_seq, _)| *segment_seq);

                for (segment_seq, path) in segment_files {
                    match keep_len_by_segment.get(&segment_seq).copied() {
                        Some(keep_len) if keep_len > 0 => {
                            // Truncate only if the file extends past keep_len.
                            let current_len = fs::metadata(&path)?.len();
                            if keep_len < current_len {
                                let file = OpenOptions::new().write(true).open(&path)?;
                                file.set_len(keep_len)?;
                            }
                        }
                        _ => {
                            // No surviving rows in this segment: delete it.
                            fs::remove_file(&path)?;
                        }
                    }
                }
            }

            // Atomically replace the index with the retained rows.
            let index_path = self.index_file_path();
            let index_tmp = self.root.join("tmp").join("offsets.rollback.tmp");
            write_jsonl_lines(&index_tmp, &index_rows)?;
            fs::rename(index_tmp, index_path)?;

            // Reset counters and re-derive all state from what is on disk.
            self.next_segment_seq = 1;
            self.next_frame_seq = 1;
            self.next_entry_seq = 1;
            self.current_segment_bytes = 0;
            self.bootstrap_from_disk()?;

            // Verify the rolled-back store against the checkpoint.
            let verification = MigrationVerification {
                entry_count_match: self.entry_count() == checkpoint.head_entry_seq,
                hash_chain_match: self.chain_hash == checkpoint.chain_hash,
                index_consistent: self.validate_integrity().is_ok(),
            };

            let (outcome, error_class) = if verification.entry_count_match
                && verification.hash_chain_match
                && verification.index_consistent
            {
                ("ok".to_string(), None)
            } else if verification.index_consistent {
                (
                    "recoverable_error".to_string(),
                    Some("integrity_mismatch".to_string()),
                )
            } else {
                (
                    "fatal_error".to_string(),
                    Some("index_corruption".to_string()),
                )
            };

            let event = MigrationEvent {
                schema: MIGRATION_EVENT_SCHEMA.to_string(),
                migration_id: migration_id.clone(),
                phase: "rollback".to_string(),
                at: chrono::Utc::now().to_rfc3339_opts(chrono::SecondsFormat::Millis, true),
                source_path: checkpoint.snapshot_ref,
                target_path: self.root.display().to_string(),
                source_format: "native_v2".to_string(),
                target_format: "native_v2".to_string(),
                verification,
                outcome: outcome.clone(),
                error_class,
                correlation_id: correlation_id.clone(),
            };
            self.append_migration_event(event.clone())?;

            if outcome == "ok" {
                Ok(event)
            } else {
                Err(Error::session(format!(
                    "rollback verification failed for checkpoint {checkpoint_seq}"
                )))
            }
        })();

        match rollback_result {
            Ok(event) => Ok(event),
            Err(error) => {
                // Record a fatal ledger event unless the helper determines one
                // was already written for this error.
                if !rollback_failure_event_already_recorded(&error) {
                    let failure_event = MigrationEvent {
                        schema: MIGRATION_EVENT_SCHEMA.to_string(),
                        migration_id,
                        phase: "rollback".to_string(),
                        at: chrono::Utc::now().to_rfc3339_opts(chrono::SecondsFormat::Millis, true),
                        source_path: self.checkpoint_path(checkpoint_seq).display().to_string(),
                        target_path: self.root.display().to_string(),
                        source_format: "native_v2".to_string(),
                        target_format: "native_v2".to_string(),
                        verification: MigrationVerification {
                            entry_count_match: false,
                            hash_chain_match: false,
                            index_consistent: false,
                        },
                        outcome: "fatal_error".to_string(),
                        error_class: Some(classify_rollback_error(&error).to_string()),
                        correlation_id,
                    };
                    // Best-effort: a ledger write failure must not mask `error`.
                    let _ = self.append_migration_event(failure_event);
                }
                Err(error)
            }
        }
    }
775
    /// Recomputes counters, invariants, and the hash chain from the index and
    /// segment data, then writes `manifest.json` atomically (temp + rename).
    ///
    /// `created_at` is carried over from an existing manifest when present.
    ///
    /// # Errors
    /// Fails on index/segment read errors, serialization errors, or I/O
    /// errors while writing the manifest.
    #[allow(clippy::too_many_lines)]
    pub fn write_manifest(
        &self,
        session_id: impl Into<String>,
        source_format: impl Into<String>,
    ) -> Result<Manifest> {
        let now = chrono::Utc::now().to_rfc3339_opts(chrono::SecondsFormat::Millis, true);
        let created_at = self
            .read_manifest()?
            .map_or_else(|| now.clone(), |m| m.created_at);
        let session_id = session_id.into();
        let source_format = source_format.into();
        let index_rows = self.read_index()?;

        let mut parent_counts: std::collections::HashMap<String, u64> =
            std::collections::HashMap::new();
        let mut message_count = 0u64;
        let mut compaction_count = 0u64;
        let mut entry_ids = std::collections::HashSet::with_capacity(index_rows.len());

        let mut recomputed_chain = GENESIS_CHAIN_HASH.to_string();
        let mut parent_links_closed = true;

        // Single pass over all frames: type counts, parent links, chain hash.
        for row in &index_rows {
            if let Some(frame) = seek_read_frame(self, row)? {
                entry_ids.insert(frame.entry_id.clone());

                if frame.entry_type == "message" {
                    message_count = message_count.saturating_add(1);
                }
                if frame.entry_type == "compaction" {
                    compaction_count = compaction_count.saturating_add(1);
                }

                if let Some(parent_id) = frame.parent_entry_id.as_deref() {
                    *parent_counts.entry(parent_id.to_string()).or_insert(0) += 1;

                    // Closed means every parent appears before its child in
                    // index order.
                    if !entry_ids.contains(parent_id) {
                        parent_links_closed = false;
                    }
                }

                recomputed_chain = chain_hash_step(&recomputed_chain, &frame.payload_sha256);
            }
        }

        // A branch point is a parent referenced by more than one child.
        let branches_total = u64::try_from(parent_counts.values().filter(|&&n| n > 1).count())
            .map_err(|_| Error::session("branch count exceeds u64"))?;

        let mut monotonic_entry_seq = true;
        let mut monotonic_segment_seq = true;
        let mut last_entry_seq = 0u64;
        let mut last_segment_seq = 0u64;
        for row in &index_rows {
            // Entry seqs must strictly increase; segment seqs must not drop.
            if row.entry_seq <= last_entry_seq {
                monotonic_entry_seq = false;
            }
            if row.segment_seq < last_segment_seq {
                monotonic_segment_seq = false;
            }
            last_entry_seq = row.entry_seq;
            last_segment_seq = row.segment_seq;
        }

        let hash_chain_valid = recomputed_chain == self.chain_hash;

        let head = self.head().unwrap_or(StoreHead {
            segment_seq: 0,
            entry_seq: 0,
            entry_id: String::new(),
        });
        // Count distinct segments actually referenced by the index.
        let segment_count = u64::try_from(
            index_rows
                .iter()
                .map(|row| row.segment_seq)
                .collect::<BTreeSet<_>>()
                .len(),
        )
        .map_err(|_| Error::session("segment count exceeds u64"))?;

        let mut manifest = Manifest {
            schema: MANIFEST_SCHEMA.to_string(),
            store_version: 2,
            session_id,
            source_format,
            created_at,
            updated_at: now,
            head,
            counters: ManifestCounters {
                entries_total: u64::try_from(index_rows.len())
                    .map_err(|_| Error::session("entry count exceeds u64"))?,
                messages_total: message_count,
                branches_total,
                compactions_total: compaction_count,
                bytes_total: self.total_bytes,
            },
            files: ManifestFiles {
                segment_dir: "segments/".to_string(),
                segment_count,
                index_path: "index/offsets.jsonl".to_string(),
                checkpoint_dir: "checkpoints/".to_string(),
                migration_ledger_path: "migrations/ledger.jsonl".to_string(),
            },
            integrity: ManifestIntegrity {
                chain_hash: self.chain_hash.clone(),
                // Filled in below once the rest of the manifest is final.
                manifest_hash: String::new(),
                last_crc32c: self.last_crc32c.clone(),
            },
            invariants: ManifestInvariants {
                parent_links_closed,
                monotonic_entry_seq,
                monotonic_segment_seq,
                index_within_segment_bounds: self.validate_integrity().is_ok(),
                branch_heads_indexed: true,
                checkpoints_monotonic: true,
                hash_chain_valid,
            },
        };
        manifest.integrity.manifest_hash = manifest_hash_hex(&manifest)?;

        // Atomic replace: write to tmp/, then rename over manifest.json.
        let tmp = self.root.join("tmp").join("manifest.json.tmp");
        fs::write(&tmp, serde_json::to_vec_pretty(&manifest)?)?;
        fs::rename(&tmp, self.manifest_path())?;
        Ok(manifest)
    }
903
904 pub fn read_manifest(&self) -> Result<Option<Manifest>> {
905 let path = self.manifest_path();
906 if !path.exists() {
907 return Ok(None);
908 }
909 let content = fs::read_to_string(path)?;
910 let manifest: Manifest = serde_json::from_str(&content)?;
911 Ok(Some(manifest))
912 }
913
914 pub fn chain_hash(&self) -> &str {
915 &self.chain_hash
916 }
917
    /// Total bytes of all indexed frame lines (trailing newlines included).
    pub const fn total_bytes(&self) -> u64 {
        self.total_bytes
    }
921
922 pub fn index_summary(&self) -> Result<Option<IndexSummary>> {
923 let rows = self.read_index()?;
924 let (Some(first), Some(last)) = (rows.first(), rows.last()) else {
925 return Ok(None);
926 };
927 Ok(Some(IndexSummary {
928 entry_count: u64::try_from(rows.len())
929 .map_err(|_| Error::session("entry count exceeds u64"))?,
930 first_entry_seq: first.entry_seq,
931 last_entry_seq: last.entry_seq,
932 last_entry_id: last.entry_id.clone(),
933 }))
934 }
935
936 #[allow(clippy::too_many_lines)]
939 pub fn rebuild_index(&mut self) -> Result<u64> {
940 let mut rebuilt_count = 0u64;
941 let index_path = self.index_file_path();
942 let index_tmp_path = self.root.join("tmp").join("offsets.rebuild.tmp");
943
944 if let Some(parent) = index_tmp_path.parent() {
946 fs::create_dir_all(parent)?;
947 }
948
949 if index_tmp_path.exists() {
951 fs::remove_file(&index_tmp_path)?;
952 }
953
954 let mut index_writer = std::io::BufWriter::new(
955 OpenOptions::new()
956 .create(true)
957 .write(true)
958 .truncate(true)
959 .open(&index_tmp_path)?,
960 );
961
962 self.chain_hash = GENESIS_CHAIN_HASH.to_string();
963 self.total_bytes = 0;
964 self.last_entry_id = None;
965 self.last_crc32c = "00000000".to_string();
966
967 let segment_files = self.list_segment_files()?;
968 let mut last_observed_seq = 0u64;
969
970 'segments: for (i, (_seg_seq, seg_path)) in segment_files.iter().enumerate() {
971 let file = File::open(seg_path)?;
972 let mut reader = BufReader::new(file);
973 let mut byte_offset = 0u64;
974 let mut line_number = 0u64;
975 let mut line = String::new();
976
977 loop {
978 line.clear();
979 let bytes_read = match read_line_with_limit(
981 &mut reader,
982 &mut line,
983 MAX_FRAME_READ_BYTES,
984 ) {
985 Ok(n) => n,
986 Err(e) if e.kind() == std::io::ErrorKind::InvalidData => {
987 tracing::warn!(
991 segment = %seg_path.display(),
992 line_number,
993 error = %e,
994 "SessionStoreV2 encountered oversized line during index rebuild; truncating segment"
995 );
996 drop(reader);
997 truncate_file_to(seg_path, byte_offset)?;
998 remove_orphaned_segments(&segment_files[i + 1..])?;
999 break 'segments;
1000 }
1001 Err(e) => return Err(Error::Io(Box::new(e))),
1002 };
1003
1004 if bytes_read == 0 {
1005 break;
1006 }
1007 line_number = line_number.saturating_add(1);
1008 let line_len = u64::try_from(bytes_read)
1009 .map_err(|_| Error::session("line length exceeds u64"))?;
1010
1011 if line.trim().is_empty() {
1012 byte_offset = byte_offset.saturating_add(line_len);
1013 continue;
1014 }
1015
1016 let missing_newline = !line.ends_with('\n');
1017 let json_line = line.trim_end_matches('\n').trim_end_matches('\r');
1018 let frame: SegmentFrame = match serde_json::from_str(json_line) {
1019 Ok(frame) => {
1020 if missing_newline {
1021 tracing::warn!(
1025 segment = %seg_path.display(),
1026 line_number,
1027 "SessionStoreV2 dropping valid but newline-missing trailing segment frame during index rebuild"
1028 );
1029 drop(reader);
1030 truncate_file_to(seg_path, byte_offset)?;
1031 remove_orphaned_segments(&segment_files[i + 1..])?;
1032 break 'segments;
1033 }
1034 frame
1035 }
1036 Err(err) => {
1037 let at_eof = reader.fill_buf()?.is_empty();
1038 if at_eof && missing_newline {
1039 tracing::warn!(
1040 segment = %seg_path.display(),
1041 line_number,
1042 error = %err,
1043 "SessionStoreV2 dropping truncated trailing segment frame during index rebuild"
1044 );
1045 drop(reader);
1047 truncate_file_to(seg_path, byte_offset)?;
1048 remove_orphaned_segments(&segment_files[i + 1..])?;
1049 break 'segments;
1050 }
1051 return Err(Error::session(format!(
1053 "failed to parse segment frame while rebuilding index: \
1054 segment={} line={line_number}: {err}",
1055 seg_path.display()
1056 )));
1057 }
1058 };
1059
1060 if frame.entry_seq <= last_observed_seq {
1061 tracing::warn!(
1062 segment = %seg_path.display(),
1063 line_number,
1064 entry_seq = frame.entry_seq,
1065 last_seq = last_observed_seq,
1066 "SessionStoreV2 detected non-monotonic entry sequence during rebuild; truncating segment"
1067 );
1068 drop(reader);
1069 truncate_file_to(seg_path, byte_offset)?;
1070 remove_orphaned_segments(&segment_files[i + 1..])?;
1071 break 'segments;
1072 }
1073 last_observed_seq = frame.entry_seq;
1074
1075 let record_bytes = line.as_bytes().to_vec();
1076 let crc = crc32c_upper(&record_bytes);
1077
1078 let index_entry = OffsetIndexEntry {
1079 schema: Cow::Borrowed(OFFSET_INDEX_SCHEMA),
1080 entry_seq: frame.entry_seq,
1081 entry_id: frame.entry_id.clone(),
1082 segment_seq: frame.segment_seq,
1083 frame_seq: frame.frame_seq,
1084 byte_offset,
1085 byte_length: line_len,
1086 crc32c: crc.clone(),
1087 state: Cow::Borrowed("active"),
1088 };
1089 serde_json::to_writer(&mut index_writer, &index_entry)?;
1090 index_writer.write_all(b"\n")?;
1091
1092 self.chain_hash = chain_hash_step(&self.chain_hash, &frame.payload_sha256);
1093 self.total_bytes = self.total_bytes.saturating_add(line_len);
1094 self.last_entry_id = Some(frame.entry_id);
1095 self.last_crc32c = crc;
1096
1097 byte_offset = byte_offset.saturating_add(line_len);
1098 rebuilt_count = rebuilt_count.saturating_add(1);
1099 }
1100 }
1101
1102 index_writer.flush()?;
1103 drop(index_writer); fs::rename(&index_tmp_path, &index_path)?;
1107
1108 self.next_segment_seq = 1;
1109 self.next_frame_seq = 1;
1110 self.next_entry_seq = 1;
1111 self.current_segment_bytes = 0;
1112 self.bootstrap_from_disk()?;
1113
1114 Ok(rebuilt_count)
1115 }
1116
    /// Verifies the on-disk consistency of the offset index against the
    /// segment files it references.
    ///
    /// Per index row this checks that:
    /// - entry sequence numbers are strictly increasing across the index;
    /// - the indexed byte range lies entirely inside the segment file;
    /// - the CRC32C of the raw record bytes matches the index row;
    /// - the frame parsed from those bytes agrees with the index row on
    ///   `entry_seq` / `entry_id` / `segment_seq` / `frame_seq`;
    /// - the frame's recorded payload hash and size match a recomputation.
    ///
    /// # Errors
    ///
    /// Returns the first violation found as an `Error::session` (the message
    /// texts here are matched by `is_recoverable_index_error`, so keep them
    /// stable). Returns `Ok(())` when every indexed frame verifies.
    pub fn validate_integrity(&self) -> Result<()> {
        let index_rows = self.read_index()?;
        let mut last_entry_seq = 0;

        // Group rows by segment so each segment file is opened exactly once.
        let mut rows_by_segment: std::collections::BTreeMap<u64, Vec<&OffsetIndexEntry>> =
            std::collections::BTreeMap::new();
        for row in &index_rows {
            // entry_seq must be strictly increasing across the whole index,
            // even across segment boundaries.
            if row.entry_seq <= last_entry_seq {
                return Err(Error::session(format!(
                    "entry sequence is not strictly increasing at entry_seq={}",
                    row.entry_seq
                )));
            }
            last_entry_seq = row.entry_seq;
            rows_by_segment
                .entry(row.segment_seq)
                .or_default()
                .push(row);
        }

        for (segment_seq, rows) in rows_by_segment {
            let segment_path = self.segment_file_path(segment_seq);
            let mut file = File::open(&segment_path).map_err(|err| {
                Error::session(format!(
                    "failed to open segment {}: {err}",
                    segment_path.display()
                ))
            })?;
            let segment_len = file.metadata()?.len();

            for row in rows {
                // Bounds-check the indexed byte range before seeking.
                let end = row
                    .byte_offset
                    .checked_add(row.byte_length)
                    .ok_or_else(|| Error::session("index byte range overflow"))?;
                if end > segment_len {
                    return Err(Error::session(format!(
                        "index out of bounds for segment {}: end={} len={segment_len}",
                        segment_path.display(),
                        end
                    )));
                }

                file.seek(SeekFrom::Start(row.byte_offset))?;
                let mut record_bytes = vec![
                    0u8;
                    usize::try_from(row.byte_length).map_err(|_| {
                        Error::session(format!("byte length too large: {}", row.byte_length))
                    })?
                ];
                file.read_exact(&mut record_bytes)?;

                // CRC covers the raw record bytes *including* the trailing
                // newline, matching how the index was written.
                let checksum = crc32c_upper(&record_bytes);
                if checksum != row.crc32c {
                    return Err(Error::session(format!(
                        "checksum mismatch for entry_seq={} expected={} actual={checksum}",
                        row.entry_seq, row.crc32c
                    )));
                }

                // Strip the record terminator before JSON parsing.
                if record_bytes.last() == Some(&b'\n') {
                    record_bytes.pop();
                }
                let frame: SegmentFrame = serde_json::from_slice(&record_bytes)?;

                // The frame's own header must agree with the index row.
                if frame.entry_seq != row.entry_seq
                    || frame.entry_id != row.entry_id
                    || frame.segment_seq != row.segment_seq
                    || frame.frame_seq != row.frame_seq
                {
                    return Err(Error::session(format!(
                        "index/frame mismatch at entry_seq={}",
                        row.entry_seq
                    )));
                }

                // Recompute the payload digest/size to catch payload-body
                // corruption that left the frame header intact.
                let (payload_hash, payload_bytes) = payload_hash_and_size(&frame.payload)?;
                if frame.payload_sha256 != payload_hash || frame.payload_bytes != payload_bytes {
                    return Err(Error::session(format!(
                        "payload integrity mismatch at entry_seq={}",
                        row.entry_seq
                    )));
                }
            }
        }

        Ok(())
    }
1206
    /// Rebuilds the store's in-memory state (sequence counters, active
    /// segment size, chain hash, running totals) from the on-disk offset
    /// index.
    ///
    /// When the index is empty, all state resets to genesis values. The
    /// chain hash is re-derived by re-reading every indexed frame, so this
    /// is O(total frames) of file I/O.
    fn bootstrap_from_disk(&mut self) -> Result<()> {
        let index_rows = self.read_index()?;
        if let Some(last) = index_rows.last() {
            self.next_entry_seq = last
                .entry_seq
                .checked_add(1)
                .ok_or_else(|| Error::session("entry sequence overflow while bootstrapping"))?;
            // NOTE(review): unlike the entry/frame counters this is NOT +1 —
            // it appears to track the segment currently open for appends
            // (its on-disk length seeds current_segment_bytes below).
            // Confirm against the writer's rotation logic.
            self.next_segment_seq = last.segment_seq;
            self.next_frame_seq = last
                .frame_seq
                .checked_add(1)
                .ok_or_else(|| Error::session("frame sequence overflow while bootstrapping"))?;
            let segment_path = self.segment_file_path(last.segment_seq);
            // The active segment must exist and be stat-able; its length
            // tells us where the next append lands.
            self.current_segment_bytes = fs::metadata(&segment_path)
                .map(|meta| meta.len())
                .map_err(|err| {
                    Error::session(format!(
                        "failed to stat active segment {} while bootstrapping: {err}",
                        segment_path.display()
                    ))
                })?;
            self.last_entry_id = Some(last.entry_id.clone());
            self.last_crc32c.clone_from(&last.crc32c);

            // Re-derive the tamper-evident chain hash and total byte count
            // by walking every frame referenced by the index.
            let mut chain = GENESIS_CHAIN_HASH.to_string();
            let mut total = 0u64;
            for row in &index_rows {
                let frame = seek_read_frame(self, row)?.ok_or_else(|| {
                    Error::session(format!(
                        "index references missing frame during bootstrap: entry_seq={}, segment={}",
                        row.entry_seq, row.segment_seq
                    ))
                })?;
                chain = chain_hash_step(&chain, &frame.payload_sha256);
                total = total.saturating_add(row.byte_length);
            }
            self.chain_hash = chain;
            self.total_bytes = total;
        } else {
            // Empty index: reset to genesis state.
            self.chain_hash = GENESIS_CHAIN_HASH.to_string();
            self.total_bytes = 0;
            self.last_entry_id = None;
            self.last_crc32c = "00000000".to_string();
        }
        Ok(())
    }
1253}
1254
1255fn rollback_failure_event_already_recorded(error: &Error) -> bool {
1256 matches!(error, Error::Session(message) if message.contains("rollback verification failed"))
1257}
1258
1259fn classify_rollback_error(error: &Error) -> &'static str {
1260 match error {
1261 Error::Session(message) => {
1262 if message.contains("checkpoint") && message.contains("not found") {
1263 "checkpoint_not_found"
1264 } else if message.contains("index byte range overflow") {
1265 "index_range_overflow"
1266 } else if message.contains("rollback verification failed") {
1267 "rollback_verification_failed"
1268 } else {
1269 "session_error"
1270 }
1271 }
1272 _ => error.category_code(),
1273 }
1274}
1275
1276fn is_recoverable_index_error(error: &Error) -> bool {
1277 match error {
1278 Error::Json(_) => true,
1279 Error::Io(err) => matches!(
1280 err.kind(),
1281 std::io::ErrorKind::UnexpectedEof | std::io::ErrorKind::InvalidData
1282 ),
1283 Error::Session(message) => {
1284 let lower = message.to_ascii_lowercase();
1285 lower.contains("checksum mismatch")
1286 || lower.contains("index out of bounds")
1287 || lower.contains("index/frame mismatch")
1288 || lower.contains("payload integrity mismatch")
1289 || lower.contains("entry sequence is not strictly increasing")
1290 || lower.contains("index byte range overflow")
1291 }
1292 _ => false,
1293 }
1294}
1295
1296pub fn frame_to_session_entry(frame: &SegmentFrame) -> Result<SessionEntry> {
1298 let entry: SessionEntry = serde_json::from_str(frame.payload.get()).map_err(|e| {
1301 Error::session(format!(
1302 "failed to deserialize SessionEntry from frame entry_id={}: {e}",
1303 frame.entry_id
1304 ))
1305 })?;
1306
1307 if let Some(base_id) = entry.base_id() {
1308 if base_id != &frame.entry_id {
1309 return Err(Error::session(format!(
1310 "frame entry_id mismatch: frame={} entry={}",
1311 frame.entry_id, base_id
1312 )));
1313 }
1314 }
1315
1316 Ok(entry)
1317}
1318
1319pub fn session_entry_to_frame_args(
1321 entry: &SessionEntry,
1322) -> Result<(String, Option<String>, String, Value)> {
1323 let base = entry.base();
1324 let entry_id = base
1325 .id
1326 .clone()
1327 .ok_or_else(|| Error::session("SessionEntry has no id"))?;
1328 let parent_entry_id = base.parent_id.clone();
1329
1330 let entry_type = match entry {
1331 SessionEntry::Message(_) => "message",
1332 SessionEntry::ModelChange(_) => "model_change",
1333 SessionEntry::ThinkingLevelChange(_) => "thinking_level_change",
1334 SessionEntry::Compaction(_) => "compaction",
1335 SessionEntry::BranchSummary(_) => "branch_summary",
1336 SessionEntry::Label(_) => "label",
1337 SessionEntry::SessionInfo(_) => "session_info",
1338 SessionEntry::Custom(_) => "custom",
1339 };
1340
1341 let payload = serde_json::to_value(entry).map_err(|e| {
1342 Error::session(format!(
1343 "failed to serialize SessionEntry to frame payload: {e}"
1344 ))
1345 })?;
1346
1347 Ok((entry_id, parent_entry_id, entry_type.to_string(), payload))
1348}
1349
1350fn seek_read_frame(store: &SessionStoreV2, row: &OffsetIndexEntry) -> Result<Option<SegmentFrame>> {
1352 let segment_path = store.segment_file_path(row.segment_seq);
1353 if !segment_path.exists() {
1354 return Ok(None);
1355 }
1356 let mut file = File::open(&segment_path)?;
1357 let file_len = file.metadata()?.len();
1358 let end_offset = row
1359 .byte_offset
1360 .checked_add(row.byte_length)
1361 .ok_or_else(|| Error::session("index byte range overflow"))?;
1362
1363 if end_offset > file_len {
1364 return Err(Error::session(format!(
1365 "index out of bounds for segment {}: end={} len={}",
1366 segment_path.display(),
1367 end_offset,
1368 file_len
1369 )));
1370 }
1371
1372 file.seek(SeekFrom::Start(row.byte_offset))?;
1373 let byte_len = usize::try_from(row.byte_length)
1374 .map_err(|_| Error::session(format!("byte length too large: {}", row.byte_length)))?;
1375
1376 if row.byte_length > store.max_segment_bytes.max(100 * 1024 * 1024) {
1377 return Err(Error::session(format!(
1378 "frame byte length {byte_len} exceeds limit"
1379 )));
1380 }
1381
1382 let mut buf = vec![0u8; byte_len];
1383 file.read_exact(&mut buf)?;
1384 if buf.last() == Some(&b'\n') {
1385 buf.pop();
1386 }
1387 let frame: SegmentFrame = serde_json::from_slice(&buf)?;
1388 Ok(Some(frame))
1389}
1390
1391fn chain_hash_step(prev_chain: &str, payload_sha256: &str) -> String {
1393 let mut hasher = Sha256::new();
1394 hasher.update(prev_chain.as_bytes());
1395 hasher.update(payload_sha256.as_bytes());
1396 format!("{:x}", hasher.finalize())
1397}
1398
1399fn manifest_hash_hex(manifest: &Manifest) -> Result<String> {
1400 let encoded = serde_json::to_vec(manifest)?;
1401 Ok(format!("{:x}", Sha256::digest(&encoded)))
1402}
1403
/// Derives the v2 sidecar directory for a legacy `*.jsonl` session file:
/// the sibling path `<stem>.v2` in the same directory.
///
/// Falls back to the stem `"session"` when the path has no file stem, and to
/// `"."` when it has no parent component.
pub fn v2_sidecar_path(jsonl_path: &Path) -> PathBuf {
    let stem: Cow<'_, str> = jsonl_path
        .file_stem()
        .map_or(Cow::Borrowed("session"), |s| s.to_string_lossy());
    jsonl_path
        .parent()
        .unwrap_or_else(|| Path::new("."))
        .join(format!("{stem}.v2"))
}
1413
1414pub fn has_v2_sidecar(jsonl_path: &Path) -> bool {
1416 let root = v2_sidecar_path(jsonl_path);
1417 root.join("manifest.json").exists() || root.join("index").join("offsets.jsonl").exists()
1418}
1419
1420fn append_jsonl_line<T: Serialize>(path: &Path, value: &T) -> Result<()> {
1421 let mut file = OpenOptions::new().create(true).append(true).open(path)?;
1422 serde_json::to_writer(&mut file, value)?;
1424 file.write_all(b"\n")?;
1425 Ok(())
1426}
1427
1428fn truncate_file_to(path: &Path, len: u64) -> Result<()> {
1429 let file = OpenOptions::new().write(true).truncate(false).open(path)?;
1430 file.set_len(len)?;
1431 Ok(())
1432}
1433
1434fn write_jsonl_lines<T: Serialize>(path: &Path, rows: &[T]) -> Result<()> {
1435 let file = OpenOptions::new()
1436 .create(true)
1437 .write(true)
1438 .truncate(true)
1439 .open(path)?;
1440 let mut writer = std::io::BufWriter::new(file);
1441 for row in rows {
1442 serde_json::to_writer(&mut writer, row)?;
1443 writer.write_all(b"\n")?;
1444 }
1445 writer.flush()?;
1446 Ok(())
1447}
1448
1449fn read_jsonl<T: for<'de> Deserialize<'de>>(path: &Path) -> Result<Vec<T>> {
1450 let file = File::open(path)?;
1451 let mut reader = BufReader::new(file);
1452 let mut out = Vec::new();
1453 let mut line = String::new();
1454 loop {
1455 line.clear();
1456 let bytes_read = read_line_with_limit(&mut reader, &mut line, MAX_FRAME_READ_BYTES)
1457 .map_err(|e| Error::Io(Box::new(e)))?;
1458 if bytes_read == 0 {
1459 break;
1460 }
1461 if line.trim().is_empty() {
1462 continue;
1463 }
1464 let json_line = line.trim_end_matches('\n').trim_end_matches('\r');
1465 out.push(serde_json::from_str::<T>(json_line)?);
1466 }
1467 Ok(out)
1468}
1469
1470fn payload_hash_and_size(payload: &RawValue) -> Result<(String, u64)> {
1471 let bytes = payload.get().as_bytes();
1473 let payload_bytes = u64::try_from(bytes.len())
1474 .map_err(|_| Error::session(format!("payload is too large: {} bytes", bytes.len())))?;
1475 let hash = format!("{:x}", Sha256::digest(bytes));
1476 Ok((hash, payload_bytes))
1477}
1478
1479fn line_length_u64(encoded: &[u8]) -> Result<u64> {
1480 let line_len = encoded
1481 .len()
1482 .checked_add(1)
1483 .ok_or_else(|| Error::session("line length overflow"))?;
1484 u64::try_from(line_len).map_err(|_| Error::session("line length exceeds u64"))
1485}
1486
1487fn crc32c_upper(data: &[u8]) -> String {
1488 let crc = crc32c::crc32c(data);
1489 format!("{crc:08X}")
1490}
1491
1492fn remove_orphaned_segments(segments: &[(u64, PathBuf)]) -> Result<()> {
1493 for (_, orphan_path) in segments {
1494 if orphan_path.exists() {
1495 tracing::warn!(
1496 path = %orphan_path.display(),
1497 "removing orphaned segment file after index rebuild truncation"
1498 );
1499 if let Err(e) = fs::remove_file(orphan_path) {
1500 if e.kind() != std::io::ErrorKind::NotFound {
1501 return Err(Error::Io(Box::new(e)));
1502 }
1503 }
1504 }
1505 }
1506 Ok(())
1507}
1508
/// Reads one line (up to and including `\n`) into `buf`, rejecting lines
/// longer than `limit` bytes instead of buffering them unboundedly.
///
/// Returns the number of bytes appended to `buf` (0 at EOF), mirroring
/// `BufRead::read_line`.
///
/// # Errors
///
/// I/O errors from the underlying reader, or `InvalidData` when the line
/// exceeds `limit` bytes. Previously, a final *unterminated* line of exactly
/// `limit` bytes was wrongly rejected (the `take` budget hit zero with no
/// trailing `\n` at EOF); reading one extra byte lets us tell "exactly at the
/// limit" apart from "over the limit". This also no longer depends on any
/// content already present in `buf`.
fn read_line_with_limit<R: BufRead>(
    reader: &mut R,
    buf: &mut String,
    limit: u64,
) -> std::io::Result<usize> {
    // Allow one byte beyond the limit: if we actually receive it, the line is
    // too long; if the line ends within `limit` bytes, we never read it.
    let mut take = reader.take(limit.saturating_add(1));
    let n = take.read_line(buf)?;
    if n as u64 > limit {
        return Err(std::io::Error::new(
            std::io::ErrorKind::InvalidData,
            format!("Line length exceeds limit of {limit} bytes"),
        ));
    }
    Ok(n)
}
1524
#[cfg(test)]
mod proptests {
    //! Property-based tests for the small pure helpers in this module:
    //! hash-chain stepping, CRC rendering, payload hashing, line-length
    //! accounting, and sidecar path derivation.

    use super::*;
    use proptest::prelude::*;
    use serde_json::json;

    // chain_hash_step: output shape and basic algebraic properties.
    proptest! {
        // Every step produces a full 64-char lowercase-hex SHA-256.
        #[test]
        fn chain_hash_output_is_64_hex(
            a in "[0-9a-f]{64}",
            b in "[0-9a-f]{64}",
        ) {
            let result = chain_hash_step(&a, &b);
            assert_eq!(result.len(), 64);
            assert!(result.chars().all(|c| c.is_ascii_hexdigit()));
        }

        // Same inputs always yield the same chain value.
        #[test]
        fn chain_hash_deterministic(
            a in "[0-9a-f]{64}",
            b in "[0-9a-f]{64}",
        ) {
            assert_eq!(chain_hash_step(&a, &b), chain_hash_step(&a, &b));
        }

        // Argument order matters — the chain encodes ordering.
        #[test]
        fn chain_hash_non_commutative(
            a in "[0-9a-f]{64}",
            b in "[0-9a-f]{64}",
        ) {
            if a != b {
                assert_ne!(chain_hash_step(&a, &b), chain_hash_step(&b, &a));
            }
        }

        // Stepping away from genesis never returns the genesis value.
        #[test]
        fn chain_hash_genesis_differs_from_step(payload in "[0-9a-f]{64}") {
            let step1 = chain_hash_step(GENESIS_CHAIN_HASH, &payload);
            assert_ne!(step1, GENESIS_CHAIN_HASH);
        }
    }

    // crc32c_upper: rendering format and sensitivity.
    proptest! {
        // Always exactly eight uppercase hex digits (zero-padded).
        #[test]
        fn crc32c_output_is_8_uppercase_hex(data in prop::collection::vec(any::<u8>(), 0..500)) {
            let result = crc32c_upper(&data);
            assert_eq!(result.len(), 8);
            assert!(result.chars().all(|c| matches!(c, '0'..='9' | 'A'..='F')));
        }

        #[test]
        fn crc32c_deterministic(data in prop::collection::vec(any::<u8>(), 0..500)) {
            assert_eq!(crc32c_upper(&data), crc32c_upper(&data));
        }

        // Flipping a single input bit must change the checksum.
        #[test]
        fn crc32c_single_bit_sensitivity(byte in any::<u8>()) {
            let a = crc32c_upper(&[byte]);
            let b = crc32c_upper(&[byte ^ 1]);
            if byte != byte ^ 1 {
                assert_ne!(a, b, "flipping LSB should change CRC");
            }
        }
    }

    // payload_hash_and_size: digest shape and size accounting over RawValue.
    proptest! {
        #[test]
        fn payload_hash_is_64_hex(s in "[a-z]{0,50}") {
            let val = json!(s);
            let raw_string = serde_json::to_string(&val).unwrap();
            let raw = RawValue::from_string(raw_string).unwrap();
            let (hash, _size) = payload_hash_and_size(&raw).unwrap();
            assert_eq!(hash.len(), 64);
            assert!(hash.chars().all(|c| c.is_ascii_hexdigit()));
        }

        // Reported size equals the serialized JSON byte length exactly.
        #[test]
        fn payload_size_matches_serialization(s in "[a-z]{0,50}") {
            let val = json!(s);
            let raw_string = serde_json::to_string(&val).unwrap();
            let raw = RawValue::from_string(raw_string).unwrap();
            let (_, size) = payload_hash_and_size(&raw).unwrap();
            let expected = serde_json::to_vec(&val).unwrap().len() as u64;
            assert_eq!(size, expected);
        }

        #[test]
        fn payload_hash_deterministic(n in 0i64..10000) {
            let val = json!(n);
            let raw_string = serde_json::to_string(&val).unwrap();
            let raw = RawValue::from_string(raw_string).unwrap();
            let (h1, s1) = payload_hash_and_size(&raw).unwrap();
            let (h2, s2) = payload_hash_and_size(&raw).unwrap();
            assert_eq!(h1, h2);
            assert_eq!(s1, s2);
        }
    }

    // line_length_u64: len + 1 for the trailing newline, never zero.
    proptest! {
        #[test]
        fn line_length_is_len_plus_one(data in prop::collection::vec(any::<u8>(), 0..1000)) {
            let result = line_length_u64(&data).unwrap();
            assert_eq!(result, data.len() as u64 + 1);
        }

        #[test]
        fn line_length_never_zero(data in prop::collection::vec(any::<u8>(), 0..100)) {
            let result = line_length_u64(&data).unwrap();
            assert!(result >= 1);
        }
    }

    // v2_sidecar_path: derived path shape (`<parent>/<stem>.v2`).
    proptest! {
        #[test]
        fn sidecar_path_ends_with_v2(stem in "[a-z]{1,10}") {
            let input = PathBuf::from(format!("/tmp/{stem}.jsonl"));
            let result = v2_sidecar_path(&input);
            let name = result.file_name().unwrap().to_str().unwrap();
            assert_eq!(
                Path::new(name).extension().and_then(|ext| ext.to_str()),
                Some("v2"),
                "expected .v2 suffix, got {name}"
            );
        }

        // Sidecar stays in the same directory as the source file.
        #[test]
        fn sidecar_path_preserves_parent(stem in "[a-z]{1,10}", dir in "[a-z]{1,8}") {
            let input = PathBuf::from(format!("/tmp/{dir}/{stem}.jsonl"));
            let result = v2_sidecar_path(&input);
            assert_eq!(
                result.parent().unwrap(),
                Path::new(&format!("/tmp/{dir}"))
            );
        }

        #[test]
        fn sidecar_path_deterministic(stem in "[a-z]{1,10}") {
            let input = PathBuf::from(format!("/sessions/{stem}.jsonl"));
            assert_eq!(v2_sidecar_path(&input), v2_sidecar_path(&input));
        }

        // File name is exactly "<stem>.v2" — the original stem is preserved.
        #[test]
        fn sidecar_path_contains_stem(stem in "[a-z]{1,10}") {
            let input = PathBuf::from(format!("/tmp/{stem}.jsonl"));
            let result = v2_sidecar_path(&input);
            let name = result.file_name().unwrap().to_str().unwrap();
            assert_eq!(name, format!("{stem}.v2"));
        }
    }
}