1use std::collections::HashMap;
38use std::fs::{self, OpenOptions};
39use std::io::Write;
40use std::path::PathBuf;
41
42use bytes::Bytes;
43use kimberlite_crypto::ChainHash;
44use kimberlite_types::{CheckpointPolicy, CompressionKind, Offset, RecordKind, StreamId};
45
46use crate::checkpoint::{
47 CheckpointIndex, deserialize_checkpoint_payload, serialize_checkpoint_payload,
48};
49use crate::codec::CodecRegistry;
50use crate::{OffsetIndex, Record, StorageError};
51
/// Number of index entries that may accumulate in memory before an append
/// persists the offset index, even when the caller did not request an fsync.
const INDEX_FLUSH_THRESHOLD: usize = 100;

/// Segment size limit (256 MiB) used by constructors that do not take an
/// explicit `max_segment_size`.
const DEFAULT_MAX_SEGMENT_SIZE: u64 = 256 << 20;

/// File name of the per-stream segment manifest inside the stream directory.
const MANIFEST_FILENAME: &str = "manifest.json";
60
/// Returns the log file name for a segment: `segment_NNNNNN.log`, with the
/// segment number zero-padded to at least six digits.
fn segment_filename(segment_num: u32) -> String {
    format!("segment_{:06}.log", segment_num)
}
65
/// Returns the offset-index file name for a segment: the segment's log file
/// name (`segment_NNNNNN.log`) with `.idx` appended.
fn segment_index_filename(segment_num: u32) -> String {
    format!("segment_{:06}.log.idx", segment_num)
}
70
71#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
73struct SegmentMeta {
74 segment_num: u32,
76 first_offset: u64,
78 next_offset: u64,
81 size_bytes: u64,
83}
84
85#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
87struct SegmentManifest {
88 segments: Vec<SegmentMeta>,
90 active_segment: u32,
92}
93
94impl SegmentManifest {
95 fn new() -> Self {
97 Self {
98 segments: vec![SegmentMeta {
99 segment_num: 0,
100 first_offset: 0,
101 next_offset: 0,
102 size_bytes: 0,
103 }],
104 active_segment: 0,
105 }
106 }
107
108 fn save(&self, stream_dir: &std::path::Path) -> Result<(), StorageError> {
110 let path = stream_dir.join(MANIFEST_FILENAME);
111 let json = serde_json::to_string_pretty(self).map_err(std::io::Error::other)?;
112 fs::write(path, json)?;
113 Ok(())
114 }
115
116 fn load(stream_dir: &std::path::Path) -> Result<Self, StorageError> {
118 let path = stream_dir.join(MANIFEST_FILENAME);
119 let json = fs::read_to_string(path)?;
120 let manifest: Self = serde_json::from_str(&json).map_err(std::io::Error::other)?;
121 Ok(manifest)
122 }
123
124 fn active_mut(&mut self) -> &mut SegmentMeta {
126 self.segments
127 .iter_mut()
128 .find(|s| s.segment_num == self.active_segment)
129 .expect("active segment must exist in manifest")
130 }
131
132 fn find_segment_for_offset(&self, offset: u64) -> Option<&SegmentMeta> {
134 match self
136 .segments
137 .binary_search_by_key(&offset, |s| s.first_offset)
138 {
139 Ok(idx) => Some(&self.segments[idx]),
140 Err(idx) => {
141 if idx == 0 {
142 None
143 } else {
144 let seg = &self.segments[idx - 1];
145 if offset < seg.next_offset {
146 Some(seg)
147 } else {
148 self.segments.last()
150 }
151 }
152 }
153 }
154 }
155
156 fn rotate(&mut self, first_offset: u64) -> u32 {
158 let new_num = self.active_segment + 1;
159 self.segments.push(SegmentMeta {
160 segment_num: new_num,
161 first_offset,
162 next_offset: first_offset,
163 size_bytes: 0,
164 });
165 self.active_segment = new_num;
166 new_num
167 }
168}
169
170#[derive(Debug)]
184pub struct Storage {
185 data_dir: PathBuf,
187
188 index_cache: HashMap<(StreamId, u32), OffsetIndex>,
191
192 checkpoint_cache: HashMap<StreamId, CheckpointIndex>,
195
196 checkpoint_policy: CheckpointPolicy,
198
199 index_dirty_count: HashMap<(StreamId, u32), usize>,
202
203 manifests: HashMap<StreamId, SegmentManifest>,
205
206 max_segment_size: u64,
208
209 index_flushed_count: HashMap<(StreamId, u32), usize>,
212
213 segment_data_cache: HashMap<(StreamId, u32), Bytes>,
219
220 default_compression: CompressionKind,
222
223 codec_registry: CodecRegistry,
225}
226
227impl Storage {
228 pub fn new(data_dir: impl Into<PathBuf>) -> Self {
233 Self::with_checkpoint_policy(data_dir, CheckpointPolicy::default())
234 }
235
236 pub fn with_checkpoint_policy(
238 data_dir: impl Into<PathBuf>,
239 checkpoint_policy: CheckpointPolicy,
240 ) -> Self {
241 Self {
242 data_dir: data_dir.into(),
243 index_cache: HashMap::new(),
244 checkpoint_cache: HashMap::new(),
245 checkpoint_policy,
246 index_dirty_count: HashMap::new(),
247 manifests: HashMap::new(),
248 max_segment_size: DEFAULT_MAX_SEGMENT_SIZE,
249 index_flushed_count: HashMap::new(),
250 segment_data_cache: HashMap::new(),
251 default_compression: CompressionKind::None,
252 codec_registry: CodecRegistry::new(),
253 }
254 }
255
256 pub fn with_max_segment_size(
258 data_dir: impl Into<PathBuf>,
259 checkpoint_policy: CheckpointPolicy,
260 max_segment_size: u64,
261 ) -> Self {
262 Self {
263 data_dir: data_dir.into(),
264 index_cache: HashMap::new(),
265 checkpoint_cache: HashMap::new(),
266 checkpoint_policy,
267 index_dirty_count: HashMap::new(),
268 manifests: HashMap::new(),
269 max_segment_size,
270 index_flushed_count: HashMap::new(),
271 segment_data_cache: HashMap::new(),
272 default_compression: CompressionKind::None,
273 codec_registry: CodecRegistry::new(),
274 }
275 }
276
277 pub fn with_compression(
279 data_dir: impl Into<PathBuf>,
280 checkpoint_policy: CheckpointPolicy,
281 compression: CompressionKind,
282 ) -> Self {
283 Self {
284 data_dir: data_dir.into(),
285 index_cache: HashMap::new(),
286 checkpoint_cache: HashMap::new(),
287 checkpoint_policy,
288 index_dirty_count: HashMap::new(),
289 manifests: HashMap::new(),
290 max_segment_size: DEFAULT_MAX_SEGMENT_SIZE,
291 index_flushed_count: HashMap::new(),
292 segment_data_cache: HashMap::new(),
293 default_compression: compression,
294 codec_registry: CodecRegistry::new(),
295 }
296 }
297
298 pub fn default_compression(&self) -> CompressionKind {
300 self.default_compression
301 }
302
/// Wipes the storage: removes the data directory (when present), recreates
/// it empty, and clears every in-memory cache. Configuration (checkpoint
/// policy, segment size, compression) is kept.
///
/// Only compiled with the `fuzz-reset` feature.
///
/// # Errors
///
/// Returns an error when the directory cannot be removed or recreated; the
/// in-memory caches are left untouched in that case.
#[cfg(feature = "fuzz-reset")]
pub fn reset(&mut self) -> Result<(), crate::error::StorageError> {
    let root = &self.data_dir;
    if root.exists() {
        fs::remove_dir_all(root)?;
    }
    fs::create_dir_all(root)?;

    self.manifests.clear();
    self.index_cache.clear();
    self.index_dirty_count.clear();
    self.index_flushed_count.clear();
    self.checkpoint_cache.clear();
    self.segment_data_cache.clear();
    Ok(())
}
329
330 pub fn checkpoint_policy(&self) -> &CheckpointPolicy {
332 &self.checkpoint_policy
333 }
334
335 pub fn data_dir(&self) -> &PathBuf {
337 &self.data_dir
338 }
339
340 pub fn max_segment_size(&self) -> u64 {
342 self.max_segment_size
343 }
344
345 fn stream_dir(&self, stream_id: StreamId) -> PathBuf {
347 self.data_dir.join(stream_id.to_string())
348 }
349
350 fn segment_path_for(&self, stream_id: StreamId, segment_num: u32) -> PathBuf {
352 self.stream_dir(stream_id)
353 .join(segment_filename(segment_num))
354 }
355
356 fn index_path_for(&self, stream_id: StreamId, segment_num: u32) -> PathBuf {
358 self.stream_dir(stream_id)
359 .join(segment_index_filename(segment_num))
360 }
361
362 pub fn index_path(&self, stream_id: StreamId) -> PathBuf {
364 let segment_num = self
365 .manifests
366 .get(&stream_id)
367 .map_or(0, |m| m.active_segment);
368 self.index_path_for(stream_id, segment_num)
369 }
370
371 fn get_or_load_manifest(
375 &mut self,
376 stream_id: StreamId,
377 ) -> Result<&mut SegmentManifest, StorageError> {
378 if !self.manifests.contains_key(&stream_id) {
379 let stream_dir = self.stream_dir(stream_id);
380 let manifest = if stream_dir.join(MANIFEST_FILENAME).exists() {
381 SegmentManifest::load(&stream_dir)?
382 } else {
383 SegmentManifest::new()
384 };
385 self.manifests.insert(stream_id, manifest);
386 }
387 Ok(self.manifests.get_mut(&stream_id).expect("just inserted"))
388 }
389
390 pub fn rebuild_index(&self, stream_id: StreamId) -> Result<OffsetIndex, StorageError> {
404 let segment_num = self
405 .manifests
406 .get(&stream_id)
407 .map_or(0, |m| m.active_segment);
408 self.rebuild_index_for_segment(stream_id, segment_num)
409 }
410
411 fn rebuild_index_for_segment(
423 &self,
424 stream_id: StreamId,
425 segment_num: u32,
426 ) -> Result<OffsetIndex, StorageError> {
427 let segment_path = self.segment_path_for(stream_id, segment_num);
428
429 if !segment_path.exists() {
430 return Ok(OffsetIndex::new());
431 }
432
433 let data: Bytes = fs::read(&segment_path)?.into();
434 let mut index = OffsetIndex::new();
435 let mut pos = 0;
436
437 loop {
438 if pos >= data.len() {
439 break;
440 }
441
442 match Record::from_bytes(&data.slice(pos..)) {
443 Ok((_, consumed)) => {
444 index.append(pos as u64);
445 pos += consumed;
446 }
447 Err(StorageError::TornWrite { ref reason }) => {
448 tracing::warn!(
455 stream_id = %stream_id,
456 segment_num = segment_num,
457 torn_byte_offset = pos,
458 complete_records = index.len(),
459 reason = %reason,
460 "torn write detected during recovery — truncating log at last complete record"
461 );
462
463 let file = fs::OpenOptions::new().write(true).open(&segment_path)?;
466 file.set_len(pos as u64)?;
467
468 tracing::info!(
469 stream_id = %stream_id,
470 segment_num = segment_num,
471 truncated_to_bytes = pos,
472 "log truncated to last complete record"
473 );
474 break;
475 }
476 Err(StorageError::UnexpectedEof) => {
477 tracing::warn!(
480 stream_id = %stream_id,
481 segment_num = segment_num,
482 partial_byte_offset = pos,
483 complete_records = index.len(),
484 "unexpected EOF during recovery — truncating log at last complete record"
485 );
486
487 let file = fs::OpenOptions::new().write(true).open(&segment_path)?;
488 file.set_len(pos as u64)?;
489 break;
490 }
491 Err(e) => {
492 return Err(e);
496 }
497 }
498 }
499
500 let index_path = self.index_path_for(stream_id, segment_num);
501 index.save(&index_path)?;
502
503 Ok(index)
504 }
505
506 fn load_or_rebuild_index_for_segment(
511 &self,
512 stream_id: StreamId,
513 segment_num: u32,
514 ) -> Result<OffsetIndex, StorageError> {
515 let index_path = self.index_path_for(stream_id, segment_num);
516
517 if !index_path.exists() {
520 let wal_path = index_path.with_extension("idx.wal");
521 if !wal_path.exists() {
522 return self.rebuild_index_for_segment(stream_id, segment_num);
523 }
524 }
525
526 if let Ok(index) = OffsetIndex::load_with_wal(&index_path) {
528 return Ok(index);
529 }
530
531 if let Ok(index) = OffsetIndex::load(&index_path) {
533 return Ok(index);
534 }
535
536 tracing::warn!(
538 stream_id = %stream_id,
539 segment_num = segment_num,
540 "index corrupted, rebuilding from log"
541 );
542 self.rebuild_index_for_segment(stream_id, segment_num)
543 }
544
545 pub fn load_or_rebuild_index(&self, stream_id: StreamId) -> Result<OffsetIndex, StorageError> {
549 let segment_num = self
550 .manifests
551 .get(&stream_id)
552 .map_or(0, |m| m.active_segment);
553 self.load_or_rebuild_index_for_segment(stream_id, segment_num)
554 }
555
556 fn ensure_index_cached(
558 &mut self,
559 stream_id: StreamId,
560 segment_num: u32,
561 ) -> Result<(), StorageError> {
562 let key = (stream_id, segment_num);
563 if !self.index_cache.contains_key(&key) {
564 let loaded = self.load_or_rebuild_index_for_segment(stream_id, segment_num)?;
565 let flushed = loaded.len(); self.index_cache.insert(key, loaded);
567 self.index_flushed_count.insert(key, flushed);
568 }
569 Ok(())
570 }
571
572 fn read_segment_data(
577 &mut self,
578 stream_id: StreamId,
579 segment_num: u32,
580 ) -> Result<Bytes, StorageError> {
581 let is_active = self
582 .manifests
583 .get(&stream_id)
584 .is_some_and(|m| m.active_segment == segment_num);
585
586 if is_active {
587 let path = self.segment_path_for(stream_id, segment_num);
589 Ok(fs::read(&path)?.into())
590 } else {
591 let key = (stream_id, segment_num);
593 if let Some(cached) = self.segment_data_cache.get(&key) {
594 return Ok(cached.clone());
595 }
596
597 let path = self.segment_path_for(stream_id, segment_num);
598 let data: Bytes = fs::read(&path)?.into();
599 self.segment_data_cache.insert(key, data.clone());
600 Ok(data)
601 }
602 }
603
604 fn segment_numbers(&self, stream_id: StreamId) -> Vec<u32> {
606 self.manifests.get(&stream_id).map_or_else(
607 || {
608 if self.segment_path_for(stream_id, 0).exists() {
610 vec![0]
611 } else {
612 vec![]
613 }
614 },
615 |m| m.segments.iter().map(|s| s.segment_num).collect(),
616 )
617 }
618
619 pub fn append_batch(
646 &mut self,
647 stream_id: StreamId,
648 events: Vec<Bytes>,
649 expected_offset: Offset,
650 prev_hash: Option<ChainHash>,
651 fsync: bool,
652 ) -> Result<(Offset, ChainHash), StorageError> {
653 assert!(!events.is_empty(), "cannot append empty batch");
655
656 let event_count = events.len();
657
658 let stream_dir = self.stream_dir(stream_id);
660 fs::create_dir_all(&stream_dir)?;
661
662 let manifest = self.get_or_load_manifest(stream_id)?;
664 let active_seg = manifest.active_segment;
665
666 let segment_path = self.segment_path_for(stream_id, active_seg);
668 let mut file = OpenOptions::new()
669 .create(true)
670 .append(true)
671 .open(&segment_path)?;
672
673 let mut byte_position: u64 = file.metadata()?.len();
675
676 let index_path = self.index_path_for(stream_id, active_seg);
678 let cache_key = (stream_id, active_seg);
679
680 self.ensure_index_cached(stream_id, active_seg)?;
682
683 let index = self
684 .index_cache
685 .get_mut(&cache_key)
686 .expect("index exists: just ensured");
687
688 let mut current_offset = expected_offset;
689 let mut current_hash = prev_hash;
690
691 let compression = self.default_compression;
692
693 for event in events {
694 index.append(byte_position);
696
697 let (stored_payload, record_compression) = if compression == CompressionKind::None {
699 (event.clone(), CompressionKind::None)
700 } else {
701 let compressed = self.codec_registry.compress(compression, &event)?;
702 if compressed.len() < event.len() {
704 (Bytes::from(compressed), compression)
705 } else {
706 (event.clone(), CompressionKind::None)
707 }
708 };
709
710 let hash_record = Record::new(current_offset, current_hash, event);
712 current_hash = Some(hash_record.compute_hash());
713
714 let record = Record::with_compression(
716 current_offset,
717 hash_record.prev_hash(),
718 RecordKind::Data,
719 record_compression,
720 stored_payload,
721 );
722 let record_bytes = record.to_bytes();
723
724 byte_position += record_bytes.len() as u64;
726
727 file.write_all(&record_bytes)?;
728
729 current_offset += Offset::from(1u64);
730 }
731
732 if fsync {
734 file.sync_all()?;
735 }
736
737 let manifest = self.manifests.get_mut(&stream_id).expect("manifest loaded");
739 let active_meta = manifest.active_mut();
740 active_meta.size_bytes = byte_position;
741 active_meta.next_offset = current_offset.as_u64();
742
743 let cache_key_for_flush = (stream_id, active_seg);
747 let dirty = self
748 .index_dirty_count
749 .entry(cache_key_for_flush)
750 .or_insert(0);
751 *dirty += event_count;
752 if *dirty >= INDEX_FLUSH_THRESHOLD || fsync {
753 let index = self
754 .index_cache
755 .get(&cache_key_for_flush)
756 .expect("index exists: just used above");
757 let flushed = *self
758 .index_flushed_count
759 .get(&cache_key_for_flush)
760 .unwrap_or(&0);
761 index.save_incremental(&index_path, flushed, crate::index::MAX_WAL_BYTES)?;
763 self.index_flushed_count
765 .insert(cache_key_for_flush, index.len());
766 *dirty = 0;
767 }
768
769 if byte_position >= self.max_segment_size {
771 self.rotate_segment(stream_id, current_offset)?;
772 }
773
774 let stream_dir = self.stream_dir(stream_id);
776 let manifest = self.manifests.get(&stream_id).expect("manifest loaded");
777 manifest.save(&stream_dir)?;
778
779 debug_assert_eq!(
781 current_offset.as_u64() - expected_offset.as_u64(),
782 event_count as u64,
783 "offset mismatch after batch write"
784 );
785
786 kimberlite_properties::always!(
788 current_offset.as_u64() >= expected_offset.as_u64(),
789 "storage.offset_advances_forward",
790 "offset must only advance forward after append_batch"
791 );
792
793 kimberlite_properties::always!(
795 current_hash.is_some(),
796 "storage.hash_chain_valid_after_append",
797 "hash chain must produce a valid hash after non-empty batch append"
798 );
799
800 Ok((current_offset, current_hash.expect("batch was non-empty")))
801 }
802
803 fn rotate_segment(
807 &mut self,
808 stream_id: StreamId,
809 next_offset: Offset,
810 ) -> Result<(), StorageError> {
811 let old_seg = self
812 .manifests
813 .get(&stream_id)
814 .expect("manifest loaded")
815 .active_segment;
816
817 let old_key = (stream_id, old_seg);
819 if let Some(index) = self.index_cache.get(&old_key) {
820 let index_path = self.index_path_for(stream_id, old_seg);
821 index.save(&index_path)?;
822 }
823 self.index_dirty_count.insert(old_key, 0);
824
825 let manifest = self.manifests.get_mut(&stream_id).expect("manifest loaded");
827 let new_seg = manifest.rotate(next_offset.as_u64());
828
829 tracing::info!(
830 stream_id = %stream_id,
831 old_segment = old_seg,
832 new_segment = new_seg,
833 "rotated segment"
834 );
835
836 Ok(())
837 }
838
839 pub fn read_from(
847 &mut self,
848 stream_id: StreamId,
849 from_offset: Offset,
850 max_bytes: u64,
851 ) -> Result<Vec<Bytes>, StorageError> {
852 let records = self.read_records_verified(stream_id, from_offset, max_bytes)?;
853 Ok(records.into_iter().map(|r| r.payload().clone()).collect())
854 }
855
856 pub fn read_from_genesis(
862 &mut self,
863 stream_id: StreamId,
864 from_offset: Offset,
865 max_bytes: u64,
866 ) -> Result<Vec<Bytes>, StorageError> {
867 let records = self.read_records_from_genesis(stream_id, from_offset, max_bytes)?;
868 Ok(records.into_iter().map(|r| r.payload().clone()).collect())
869 }
870
871 pub fn read_records_from_genesis(
876 &mut self,
877 stream_id: StreamId,
878 from_offset: Offset,
879 max_bytes: u64,
880 ) -> Result<Vec<Record>, StorageError> {
881 let segment_nums = self.segment_numbers(stream_id);
882
883 let mut results = Vec::new();
884 let mut bytes_read: u64 = 0;
885 let mut expected_prev_hash: Option<ChainHash> = None;
886 let mut records_verified: u64 = 0;
887
888 for seg_num in segment_nums {
889 let seg_path = self.segment_path_for(stream_id, seg_num);
890 if !seg_path.exists() {
891 continue;
892 }
893
894 let data = self.read_segment_data(stream_id, seg_num)?;
895 let mut pos = 0;
896
897 while pos < data.len() && bytes_read < max_bytes {
898 let (record, consumed) = Record::from_bytes(&data.slice(pos..))?;
899
900 let record = self.decompress_record(record)?;
902
903 if record.prev_hash() != expected_prev_hash {
905 return Err(StorageError::ChainVerificationFailed {
906 offset: record.offset(),
907 expected: expected_prev_hash,
908 actual: record.prev_hash(),
909 });
910 }
911
912 kimberlite_properties::always!(
914 record.prev_hash() == expected_prev_hash,
915 "storage.hash_chain_valid_on_genesis_read",
916 "prev_hash must match expected hash during genesis-verified read"
917 );
918
919 expected_prev_hash = Some(record.compute_hash());
920 records_verified += 1;
921 pos += consumed;
922
923 if record.offset() < from_offset {
925 continue;
926 }
927
928 bytes_read += record.payload().len() as u64;
929 results.push(record);
930 }
931
932 if bytes_read >= max_bytes {
933 break;
934 }
935 }
936
937 debug_assert!(
939 records_verified == 0 || expected_prev_hash.is_some(),
940 "verified records but no final hash"
941 );
942
943 kimberlite_properties::sometimes!(
945 !results.is_empty(),
946 "storage.read_after_write_exercised",
947 "simulation should exercise reading non-empty results from storage"
948 );
949
950 Ok(results)
951 }
952
953 pub fn append_batch_pipelined(
965 &mut self,
966 stream_id: StreamId,
967 events: &[Bytes],
968 expected_offset: Offset,
969 prev_hash: Option<ChainHash>,
970 fsync: bool,
971 pipeline: &mut crate::AppendPipeline,
972 ) -> Result<(Offset, ChainHash), StorageError> {
973 assert!(!events.is_empty(), "cannot append empty batch");
974
975 let event_count = events.len();
976
977 let stream_dir = self.stream_dir(stream_id);
979 fs::create_dir_all(&stream_dir)?;
980
981 let manifest = self.get_or_load_manifest(stream_id)?;
983 let active_seg = manifest.active_segment;
984
985 let segment_path = self.segment_path_for(stream_id, active_seg);
987 let mut file = OpenOptions::new()
988 .create(true)
989 .append(true)
990 .open(&segment_path)?;
991
992 let base_byte_pos: u64 = file.metadata()?.len();
993
994 let batch = pipeline.prepare_batch(
996 events,
997 expected_offset,
998 prev_hash,
999 base_byte_pos,
1000 self.default_compression,
1001 &self.codec_registry,
1002 )?;
1003
1004 file.write_all(&batch.data)?;
1006
1007 if fsync {
1008 file.sync_all()?;
1009 }
1010
1011 let index_path = self.index_path_for(stream_id, active_seg);
1013 let cache_key = (stream_id, active_seg);
1014 self.ensure_index_cached(stream_id, active_seg)?;
1015 let index = self
1016 .index_cache
1017 .get_mut(&cache_key)
1018 .expect("index exists: just ensured");
1019
1020 for &(_offset, byte_pos) in &batch.index_entries {
1021 index.append(byte_pos);
1022 }
1023
1024 let new_byte_pos = base_byte_pos + batch.bytes_written;
1025 let new_offset = expected_offset + Offset::from(event_count as u64);
1026
1027 let manifest = self.manifests.get_mut(&stream_id).expect("manifest loaded");
1029 let active_meta = manifest.active_mut();
1030 active_meta.size_bytes = new_byte_pos;
1031 active_meta.next_offset = new_offset.as_u64();
1032
1033 let dirty = self.index_dirty_count.entry(cache_key).or_insert(0);
1035 *dirty += event_count;
1036 if *dirty >= INDEX_FLUSH_THRESHOLD || fsync {
1037 let index = self.index_cache.get(&cache_key).expect("index exists");
1038 let flushed = *self.index_flushed_count.get(&cache_key).unwrap_or(&0);
1039 index.save_incremental(&index_path, flushed, 1000)?;
1040 self.index_flushed_count.insert(cache_key, index.len());
1041 *dirty = 0;
1042 }
1043
1044 if new_byte_pos >= self.max_segment_size {
1046 self.rotate_segment(stream_id, new_offset)?;
1047 }
1048
1049 let stream_dir = self.stream_dir(stream_id);
1051 let manifest = self.manifests.get(&stream_id).expect("manifest loaded");
1052 manifest.save(&stream_dir)?;
1053
1054 debug_assert_eq!(
1055 new_offset.as_u64() - expected_offset.as_u64(),
1056 event_count as u64,
1057 "offset mismatch after pipelined batch write"
1058 );
1059
1060 Ok((new_offset, batch.final_hash))
1061 }
1062
1063 fn decompress_record(&self, record: Record) -> Result<Record, StorageError> {
1072 if record.compression() == CompressionKind::None {
1073 return Ok(record);
1074 }
1075
1076 let decompressed = self
1077 .codec_registry
1078 .decompress(record.compression(), record.payload())?;
1079
1080 Ok(Record::with_compression(
1081 record.offset(),
1082 record.prev_hash(),
1083 record.kind(),
1084 CompressionKind::None,
1085 Bytes::from(decompressed),
1086 ))
1087 }
1088
1089 pub fn rebuild_checkpoint_index(
1095 &mut self,
1096 stream_id: StreamId,
1097 ) -> Result<CheckpointIndex, StorageError> {
1098 let segment_nums = self.segment_numbers(stream_id);
1099 let mut checkpoint_index = CheckpointIndex::new();
1100
1101 for seg_num in segment_nums {
1102 let seg_path = self.segment_path_for(stream_id, seg_num);
1103 if !seg_path.exists() {
1104 continue;
1105 }
1106
1107 let data = self.read_segment_data(stream_id, seg_num)?;
1108 let mut pos = 0;
1109
1110 while pos < data.len() {
1111 let (record, consumed) = Record::from_bytes(&data.slice(pos..))?;
1112
1113 if record.is_checkpoint() {
1114 checkpoint_index.add(record.offset());
1115 }
1116
1117 pos += consumed;
1118 }
1119 }
1120
1121 tracing::debug!(
1122 stream_id = %stream_id,
1123 checkpoint_count = checkpoint_index.len(),
1124 "rebuilt checkpoint index"
1125 );
1126
1127 Ok(checkpoint_index)
1128 }
1129
1130 fn get_or_rebuild_checkpoint_index(
1132 &mut self,
1133 stream_id: StreamId,
1134 ) -> Result<&CheckpointIndex, StorageError> {
1135 if !self.checkpoint_cache.contains_key(&stream_id) {
1136 let index = self.rebuild_checkpoint_index(stream_id)?;
1137 self.checkpoint_cache.insert(stream_id, index);
1138 }
1139 Ok(self
1140 .checkpoint_cache
1141 .get(&stream_id)
1142 .expect("just inserted"))
1143 }
1144
1145 pub fn create_checkpoint(
1147 &mut self,
1148 stream_id: StreamId,
1149 current_offset: Offset,
1150 prev_hash: Option<ChainHash>,
1151 record_count: u64,
1152 fsync: bool,
1153 ) -> Result<(Offset, ChainHash), StorageError> {
1154 let chain_hash = prev_hash.unwrap_or_else(|| ChainHash::from_bytes(&[0u8; 32]));
1155
1156 let payload = serialize_checkpoint_payload(&chain_hash, record_count);
1157
1158 let record = Record::with_kind(current_offset, prev_hash, RecordKind::Checkpoint, payload);
1159 let record_bytes = record.to_bytes();
1160 let record_hash = record.compute_hash();
1161
1162 let stream_dir = self.stream_dir(stream_id);
1164 fs::create_dir_all(&stream_dir)?;
1165 let manifest = self.get_or_load_manifest(stream_id)?;
1166 let active_seg = manifest.active_segment;
1167
1168 let segment_path = self.segment_path_for(stream_id, active_seg);
1170 let mut file = OpenOptions::new()
1171 .create(true)
1172 .append(true)
1173 .open(&segment_path)?;
1174
1175 let byte_position = file.metadata()?.len();
1176
1177 file.write_all(&record_bytes)?;
1178
1179 if fsync {
1180 file.sync_all()?;
1181 }
1182
1183 let cache_key = (stream_id, active_seg);
1185 let index_path = self.index_path_for(stream_id, active_seg);
1186 self.ensure_index_cached(stream_id, active_seg)?;
1187 let index = self.index_cache.get_mut(&cache_key).expect("just loaded");
1188 index.append(byte_position);
1189 index.save(&index_path)?;
1191 self.index_dirty_count.insert(cache_key, 0);
1192 self.index_flushed_count.insert(cache_key, index.len());
1193 let wal_path = {
1195 let mut p = index_path.as_os_str().to_owned();
1196 p.push(".wal");
1197 std::path::PathBuf::from(p)
1198 };
1199 let _ = fs::remove_file(wal_path);
1200
1201 let new_size = byte_position + record_bytes.len() as u64;
1203 let manifest = self.manifests.get_mut(&stream_id).expect("manifest loaded");
1204 let active_meta = manifest.active_mut();
1205 active_meta.size_bytes = new_size;
1206 active_meta.next_offset = current_offset.as_u64() + 1;
1207 manifest.save(&stream_dir)?;
1208
1209 if let Some(cp_index) = self.checkpoint_cache.get_mut(&stream_id) {
1211 cp_index.add(current_offset);
1212 }
1213
1214 tracing::info!(
1215 stream_id = %stream_id,
1216 offset = %current_offset,
1217 record_count = record_count,
1218 "created checkpoint"
1219 );
1220
1221 let next_offset = current_offset + Offset::from(1u64);
1222 Ok((next_offset, record_hash))
1223 }
1224
1225 pub fn read_records_verified(
1230 &mut self,
1231 stream_id: StreamId,
1232 from_offset: Offset,
1233 max_bytes: u64,
1234 ) -> Result<Vec<Record>, StorageError> {
1235 let _ = self.get_or_load_manifest(stream_id);
1237 let segment_nums = self.segment_numbers(stream_id);
1238
1239 if segment_nums.is_empty() {
1240 return Ok(Vec::new());
1241 }
1242
1243 let first_seg_path = self.segment_path_for(stream_id, segment_nums[0]);
1245 if !first_seg_path.exists() {
1246 return Ok(Vec::new());
1247 }
1248
1249 let checkpoint_index = self.get_or_rebuild_checkpoint_index(stream_id)?;
1251 let verification_start = checkpoint_index.find_nearest(from_offset);
1252
1253 let (start_seg_num, start_pos, mut expected_prev_hash) = match verification_start {
1255 Some(cp_offset) => {
1256 let manifest = self.manifests.get(&stream_id);
1258 let seg_num = manifest
1259 .and_then(|m| {
1260 m.find_segment_for_offset(cp_offset.as_u64())
1261 .map(|s| s.segment_num)
1262 })
1263 .unwrap_or(0);
1264
1265 self.ensure_index_cached(stream_id, seg_num)?;
1267 let offset_index = self
1268 .index_cache
1269 .get(&(stream_id, seg_num))
1270 .expect("just ensured");
1271
1272 let first_offset_in_seg = self
1274 .manifests
1275 .get(&stream_id)
1276 .and_then(|m| {
1277 m.find_segment_for_offset(cp_offset.as_u64())
1278 .map(|s| s.first_offset)
1279 })
1280 .unwrap_or(0);
1281 let local_offset = Offset::new(cp_offset.as_u64() - first_offset_in_seg);
1282
1283 let byte_pos = offset_index
1284 .lookup(local_offset)
1285 .ok_or(StorageError::UnexpectedEof)?;
1286
1287 let data = self.read_segment_data(stream_id, seg_num)?;
1289 let (cp_record, _) = Record::from_bytes(&data.slice(byte_pos as usize..))?;
1290 debug_assert!(cp_record.is_checkpoint());
1291
1292 let (chain_hash, _) =
1293 deserialize_checkpoint_payload(cp_record.payload(), cp_offset)?;
1294
1295 (seg_num, byte_pos as usize, Some(chain_hash))
1296 }
1297 None => (segment_nums[0], 0, None),
1298 };
1299
1300 let mut results = Vec::new();
1301 let mut bytes_read: u64 = 0;
1302 let mut started = false;
1303
1304 for &seg_num in &segment_nums {
1305 if seg_num < start_seg_num {
1306 continue;
1307 }
1308
1309 let seg_path = self.segment_path_for(stream_id, seg_num);
1310 if !seg_path.exists() {
1311 continue;
1312 }
1313
1314 let data = self.read_segment_data(stream_id, seg_num)?;
1315 let mut pos = if seg_num == start_seg_num && !started {
1316 started = true;
1317 start_pos
1318 } else {
1319 0
1320 };
1321
1322 while pos < data.len() && bytes_read < max_bytes {
1323 let (record, consumed) = Record::from_bytes(&data.slice(pos..))?;
1324
1325 let record = self.decompress_record(record)?;
1327
1328 kimberlite_properties::never!(
1330 record.prev_hash() != expected_prev_hash,
1331 "storage.verified_read_chain_break",
1332 "verified read must never encounter a hash chain break in non-corrupted storage"
1333 );
1334
1335 if record.prev_hash() != expected_prev_hash {
1336 return Err(StorageError::ChainVerificationFailed {
1337 offset: record.offset(),
1338 expected: expected_prev_hash,
1339 actual: record.prev_hash(),
1340 });
1341 }
1342
1343 expected_prev_hash = Some(record.compute_hash());
1344 pos += consumed;
1345
1346 if record.offset() >= from_offset && !record.is_checkpoint() {
1347 bytes_read += record.payload().len() as u64;
1348 results.push(record);
1349 }
1350 }
1351
1352 if bytes_read >= max_bytes {
1353 break;
1354 }
1355 }
1356
1357 Ok(results)
1358 }
1359
1360 pub fn last_checkpoint(&mut self, stream_id: StreamId) -> Result<Option<Offset>, StorageError> {
1362 let index = self.get_or_rebuild_checkpoint_index(stream_id)?;
1363 Ok(index.last())
1364 }
1365
1366 pub fn latest_chain_hash(
1383 &mut self,
1384 stream_id: StreamId,
1385 ) -> Result<Option<ChainHash>, StorageError> {
1386 let segment_nums = self.segment_numbers(stream_id);
1387 if segment_nums.is_empty() {
1388 return Ok(None);
1389 }
1390 let last_seg_num = *segment_nums.last().expect("segment_nums non-empty");
1391 let seg_path = self.segment_path_for(stream_id, last_seg_num);
1392 if !seg_path.exists() {
1393 return Ok(None);
1394 }
1395
1396 let data = self.read_segment_data(stream_id, last_seg_num)?;
1397 if data.is_empty() {
1398 return Ok(None);
1399 }
1400
1401 let mut pos = 0usize;
1402 let mut last_hash: Option<ChainHash> = None;
1403 while pos < data.len() {
1404 let (record, consumed) = Record::from_bytes(&data.slice(pos..))?;
1405 let record = self.decompress_record(record)?;
1406 last_hash = Some(record.compute_hash());
1407 pos += consumed;
1408 }
1409
1410 Ok(last_hash)
1411 }
1412
1413 pub fn segment_count(&self, stream_id: StreamId) -> usize {
1415 self.manifests
1416 .get(&stream_id)
1417 .map_or(0, |m| m.segments.len())
1418 }
1419
1420 pub fn completed_segments(&self, stream_id: StreamId) -> Vec<u32> {
1422 self.manifests.get(&stream_id).map_or_else(Vec::new, |m| {
1423 m.segments
1424 .iter()
1425 .filter(|s| s.segment_num != m.active_segment)
1426 .map(|s| s.segment_num)
1427 .collect()
1428 })
1429 }
1430
1431 pub fn flush_indexes(&mut self) -> Result<(), StorageError> {
1436 let dirty_keys: Vec<(StreamId, u32)> = self
1437 .index_dirty_count
1438 .iter()
1439 .filter(|(_, count)| **count > 0)
1440 .map(|(&key, _)| key)
1441 .collect();
1442
1443 let mut first_error: Option<StorageError> = None;
1444
1445 for (stream_id, segment_num) in dirty_keys {
1446 if let Some(index) = self.index_cache.get(&(stream_id, segment_num)) {
1447 let index_path = self.index_path_for(stream_id, segment_num);
1448 if let Err(e) = index.save(&index_path) {
1450 tracing::error!(
1451 stream_id = %stream_id,
1452 segment_num = segment_num,
1453 error = %e,
1454 "failed to flush index on shutdown"
1455 );
1456 if first_error.is_none() {
1457 first_error = Some(e);
1458 }
1459 } else {
1460 self.index_dirty_count.insert((stream_id, segment_num), 0);
1461 self.index_flushed_count
1462 .insert((stream_id, segment_num), index.len());
1463 let wal_path = {
1465 let mut p = index_path.as_os_str().to_owned();
1466 p.push(".wal");
1467 std::path::PathBuf::from(p)
1468 };
1469 let _ = fs::remove_file(wal_path);
1470 }
1471 }
1472 }
1473
1474 match first_error {
1475 Some(e) => Err(e),
1476 None => Ok(()),
1477 }
1478 }
1479}
1480
1481impl Drop for Storage {
1482 fn drop(&mut self) {
1483 if let Err(e) = self.flush_indexes() {
1484 tracing::error!(error = %e, "failed to flush indexes during Storage drop");
1485 }
1486 }
1487}
1488
1489impl crate::backend::StorageBackend for Storage {
1494 fn append_batch(
1495 &mut self,
1496 stream_id: StreamId,
1497 events: Vec<Bytes>,
1498 expected_offset: Offset,
1499 prev_hash: Option<ChainHash>,
1500 fsync: bool,
1501 ) -> Result<(Offset, ChainHash), StorageError> {
1502 Storage::append_batch(self, stream_id, events, expected_offset, prev_hash, fsync)
1503 }
1504
1505 fn read_from(
1506 &mut self,
1507 stream_id: StreamId,
1508 from_offset: Offset,
1509 max_bytes: u64,
1510 ) -> Result<Vec<Bytes>, StorageError> {
1511 Storage::read_from(self, stream_id, from_offset, max_bytes)
1512 }
1513
1514 fn latest_chain_hash(
1515 &mut self,
1516 stream_id: StreamId,
1517 ) -> Result<Option<ChainHash>, StorageError> {
1518 Storage::latest_chain_hash(self, stream_id)
1519 }
1520
1521 fn segment_count(&self, stream_id: StreamId) -> usize {
1522 Storage::segment_count(self, stream_id)
1523 }
1524
1525 fn completed_segments(&self, stream_id: StreamId) -> Vec<u32> {
1526 Storage::completed_segments(self, stream_id)
1527 }
1528
1529 fn flush_indexes(&mut self) -> Result<(), StorageError> {
1530 Storage::flush_indexes(self)
1531 }
1532
1533 #[cfg(feature = "fuzz-reset")]
1534 fn reset(&mut self) -> Result<(), StorageError> {
1535 Storage::reset(self)
1536 }
1537}