use std::collections::HashMap;
use std::fs::{self, OpenOptions};
use std::io::Write;
use std::path::PathBuf;
use bytes::Bytes;
use kimberlite_crypto::ChainHash;
use kimberlite_types::{CheckpointPolicy, CompressionKind, Offset, RecordKind, StreamId};
use crate::checkpoint::{
CheckpointIndex, deserialize_checkpoint_payload, serialize_checkpoint_payload,
};
use crate::codec::CodecRegistry;
use crate::{OffsetIndex, Record, StorageError};
/// Number of index entries appended to a segment's cached offset index before
/// the append paths flush them to disk (an `fsync` append flushes regardless).
const INDEX_FLUSH_THRESHOLD: usize = 100;
/// Default size in bytes (256 MiB) at which the active segment is rotated.
const DEFAULT_MAX_SEGMENT_SIZE: u64 = 256 * 1024 * 1024;
/// Name of the per-stream JSON file that records the segment layout.
const MANIFEST_FILENAME: &str = "manifest.json";
/// File name of log segment `segment_num`, zero-padded to at least six digits
/// (e.g. `segment_000042.log`).
fn segment_filename(segment_num: u32) -> String {
    format!("segment_{:06}.log", segment_num)
}
/// File name of the offset-index file for segment `segment_num`: the segment
/// log name with an `.idx` suffix (e.g. `segment_000042.log.idx`).
fn segment_index_filename(segment_num: u32) -> String {
    let mut name = format!("segment_{:06}.log", segment_num);
    name.push_str(".idx");
    name
}
/// Bookkeeping for one log segment, persisted as part of the stream manifest.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
struct SegmentMeta {
    /// Sequence number from which the segment's file names are derived.
    segment_num: u32,
    /// Stream offset of the first record stored in this segment.
    first_offset: u64,
    /// Offset one past the last record recorded for this segment.
    next_offset: u64,
    /// Size of the segment file in bytes as of the last recorded append.
    size_bytes: u64,
}
/// Per-stream segment layout, persisted as `manifest.json` in the stream directory.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
struct SegmentManifest {
    /// Segments in creation order. `find_segment_for_offset` binary-searches
    /// this list by `first_offset`, so it must stay sorted by that field.
    segments: Vec<SegmentMeta>,
    /// `segment_num` of the segment currently receiving appends.
    active_segment: u32,
}
impl SegmentManifest {
    /// Manifest for a brand-new stream: a single empty segment 0, which is active.
    fn new() -> Self {
        Self {
            segments: vec![SegmentMeta {
                segment_num: 0,
                first_offset: 0,
                next_offset: 0,
                size_bytes: 0,
            }],
            active_segment: 0,
        }
    }

    /// Writes the manifest as pretty-printed JSON to `<stream_dir>/manifest.json`.
    ///
    /// The JSON is first written to a temporary sibling file and then renamed
    /// over the real manifest. An interrupted write therefore leaves the
    /// previous manifest intact instead of a truncated file that `load` could
    /// not parse. The file is not fsynced here (this runs on every append), so
    /// durability across power loss still depends on the filesystem.
    ///
    /// # Errors
    /// Serialization or I/O failures.
    fn save(&self, stream_dir: &std::path::Path) -> Result<(), StorageError> {
        let path = stream_dir.join(MANIFEST_FILENAME);
        let tmp_path = stream_dir.join(format!("{}.tmp", MANIFEST_FILENAME));
        let json = serde_json::to_string_pretty(self).map_err(std::io::Error::other)?;
        fs::write(&tmp_path, json)?;
        fs::rename(&tmp_path, &path)?;
        Ok(())
    }

    /// Reads and parses `<stream_dir>/manifest.json`.
    ///
    /// # Errors
    /// If the file cannot be read or is not a valid manifest.
    fn load(stream_dir: &std::path::Path) -> Result<Self, StorageError> {
        let path = stream_dir.join(MANIFEST_FILENAME);
        let json = fs::read_to_string(path)?;
        let manifest: Self = serde_json::from_str(&json).map_err(std::io::Error::other)?;
        Ok(manifest)
    }

    /// Mutable metadata of the active segment.
    ///
    /// # Panics
    /// If `active_segment` is not listed in `segments`.
    fn active_mut(&mut self) -> &mut SegmentMeta {
        self.segments
            .iter_mut()
            .find(|s| s.segment_num == self.active_segment)
            .expect("active segment must exist in manifest")
    }

    /// Finds the segment that holds `offset`.
    ///
    /// An exact match on a segment's `first_offset` wins. Otherwise the
    /// preceding segment is returned if `offset` is below its `next_offset`,
    /// and the last segment in all other cases. Returns `None` only when
    /// `offset` precedes the first segment (or there are no segments).
    fn find_segment_for_offset(&self, offset: u64) -> Option<&SegmentMeta> {
        match self
            .segments
            .binary_search_by_key(&offset, |s| s.first_offset)
        {
            Ok(idx) => Some(&self.segments[idx]),
            Err(idx) => {
                if idx == 0 {
                    None
                } else {
                    let seg = &self.segments[idx - 1];
                    if offset < seg.next_offset {
                        Some(seg)
                    } else {
                        self.segments.last()
                    }
                }
            }
        }
    }

    /// Appends a new empty segment numbered `active_segment + 1` that starts
    /// at `first_offset`, makes it active, and returns its number.
    fn rotate(&mut self, first_offset: u64) -> u32 {
        let new_num = self.active_segment + 1;
        self.segments.push(SegmentMeta {
            segment_num: new_num,
            first_offset,
            next_offset: first_offset,
            size_bytes: 0,
        });
        self.active_segment = new_num;
        new_num
    }
}
/// File-backed, segmented, hash-chained append-only storage for many streams.
///
/// Each stream lives under `<data_dir>/<stream_id>/` as numbered segment log
/// files, one offset-index file per segment, and a JSON manifest. The fields
/// below are configuration plus in-memory caches of that on-disk state.
#[derive(Debug)]
pub struct Storage {
    /// Root directory; every stream gets its own subdirectory here.
    data_dir: PathBuf,
    /// Cached offset index per `(stream, segment)`: the byte position of each
    /// record within the segment file, in record order.
    index_cache: HashMap<(StreamId, u32), OffsetIndex>,
    /// Cached checkpoint offsets per stream, rebuilt lazily by scanning segments.
    checkpoint_cache: HashMap<StreamId, CheckpointIndex>,
    /// Checkpoint policy supplied at construction.
    checkpoint_policy: CheckpointPolicy,
    /// Index entries appended per `(stream, segment)` since the last flush.
    index_dirty_count: HashMap<(StreamId, u32), usize>,
    /// Loaded segment manifests per stream.
    manifests: HashMap<StreamId, SegmentManifest>,
    /// Size in bytes at which the active segment is rotated.
    max_segment_size: u64,
    /// Index entries per `(stream, segment)` already persisted; incremental
    /// flushes only write the entries after this count.
    index_flushed_count: HashMap<(StreamId, u32), usize>,
    /// Raw contents of segment files that are not the active segment.
    segment_data_cache: HashMap<(StreamId, u32), Bytes>,
    /// Compression requested for newly appended data records.
    default_compression: CompressionKind,
    /// Codecs used to compress and decompress record payloads.
    codec_registry: CodecRegistry,
}
impl Storage {
/// Creates storage rooted at `data_dir` with the default [`CheckpointPolicy`].
pub fn new(data_dir: impl Into<PathBuf>) -> Self {
    let policy = CheckpointPolicy::default();
    Self::with_checkpoint_policy(data_dir, policy)
}
/// Creates storage rooted at `data_dir` with the given checkpoint policy.
///
/// Uses the default 256 MiB segment size and no compression. All caches start
/// empty, and nothing is read from or written to disk.
pub fn with_checkpoint_policy(
    data_dir: impl Into<PathBuf>,
    checkpoint_policy: CheckpointPolicy,
) -> Self {
    let data_dir: PathBuf = data_dir.into();
    Self {
        data_dir,
        checkpoint_policy,
        max_segment_size: DEFAULT_MAX_SEGMENT_SIZE,
        default_compression: CompressionKind::None,
        codec_registry: CodecRegistry::new(),
        manifests: HashMap::new(),
        index_cache: HashMap::new(),
        index_dirty_count: HashMap::new(),
        index_flushed_count: HashMap::new(),
        checkpoint_cache: HashMap::new(),
        segment_data_cache: HashMap::new(),
    }
}
pub fn with_max_segment_size(
data_dir: impl Into<PathBuf>,
checkpoint_policy: CheckpointPolicy,
max_segment_size: u64,
) -> Self {
Self {
data_dir: data_dir.into(),
index_cache: HashMap::new(),
checkpoint_cache: HashMap::new(),
checkpoint_policy,
index_dirty_count: HashMap::new(),
manifests: HashMap::new(),
max_segment_size,
index_flushed_count: HashMap::new(),
segment_data_cache: HashMap::new(),
default_compression: CompressionKind::None,
codec_registry: CodecRegistry::new(),
}
}
pub fn with_compression(
data_dir: impl Into<PathBuf>,
checkpoint_policy: CheckpointPolicy,
compression: CompressionKind,
) -> Self {
Self {
data_dir: data_dir.into(),
index_cache: HashMap::new(),
checkpoint_cache: HashMap::new(),
checkpoint_policy,
index_dirty_count: HashMap::new(),
manifests: HashMap::new(),
max_segment_size: DEFAULT_MAX_SEGMENT_SIZE,
index_flushed_count: HashMap::new(),
segment_data_cache: HashMap::new(),
default_compression: compression,
codec_registry: CodecRegistry::new(),
}
}
pub fn default_compression(&self) -> CompressionKind {
self.default_compression
}
/// Deletes everything under the data directory, recreates it empty, and
/// drops every per-stream cache. Configuration (checkpoint policy, segment
/// size, compression) is kept.
///
/// Only compiled with the `fuzz-reset` feature.
#[cfg(feature = "fuzz-reset")]
pub fn reset(&mut self) -> Result<(), crate::error::StorageError> {
    if self.data_dir.exists() {
        fs::remove_dir_all(&self.data_dir)?;
    }
    fs::create_dir_all(&self.data_dir)?;
    // Every cache below mirrors on-disk state that was just deleted.
    self.manifests.clear();
    self.index_cache.clear();
    self.index_dirty_count.clear();
    self.index_flushed_count.clear();
    self.checkpoint_cache.clear();
    self.segment_data_cache.clear();
    Ok(())
}
pub fn checkpoint_policy(&self) -> &CheckpointPolicy {
&self.checkpoint_policy
}
pub fn data_dir(&self) -> &PathBuf {
&self.data_dir
}
pub fn max_segment_size(&self) -> u64 {
self.max_segment_size
}
/// Directory holding all files of `stream_id`: `<data_dir>/<stream_id>`.
fn stream_dir(&self, stream_id: StreamId) -> PathBuf {
    let dir_name = stream_id.to_string();
    self.data_dir.join(dir_name)
}
/// Path of segment `segment_num`'s log file inside the stream directory.
fn segment_path_for(&self, stream_id: StreamId, segment_num: u32) -> PathBuf {
    let mut path = self.stream_dir(stream_id);
    path.push(segment_filename(segment_num));
    path
}
/// Path of segment `segment_num`'s offset-index file inside the stream directory.
fn index_path_for(&self, stream_id: StreamId, segment_num: u32) -> PathBuf {
    let mut path = self.stream_dir(stream_id);
    path.push(segment_index_filename(segment_num));
    path
}
/// Path of the offset-index file of the stream's active segment. Segment 0 is
/// used when no manifest is loaded for the stream.
pub fn index_path(&self, stream_id: StreamId) -> PathBuf {
    let active = match self.manifests.get(&stream_id) {
        Some(manifest) => manifest.active_segment,
        None => 0,
    };
    self.index_path_for(stream_id, active)
}
/// Returns the stream's manifest, caching it in memory on first use.
///
/// On a cache miss, `manifest.json` is loaded from the stream directory if it
/// exists; otherwise a fresh single-segment manifest is created. Nothing is
/// written to disk here.
///
/// # Errors
/// If an existing manifest file cannot be read or parsed.
fn get_or_load_manifest(
    &mut self,
    stream_id: StreamId,
) -> Result<&mut SegmentManifest, StorageError> {
    if self.manifests.contains_key(&stream_id) {
        return Ok(self.manifests.get_mut(&stream_id).expect("just inserted"));
    }
    let stream_dir = self.stream_dir(stream_id);
    let on_disk = stream_dir.join(MANIFEST_FILENAME).exists();
    let manifest = if on_disk {
        SegmentManifest::load(&stream_dir)?
    } else {
        SegmentManifest::new()
    };
    Ok(self.manifests.entry(stream_id).or_insert(manifest))
}
/// Rebuilds (and saves) the offset index of the stream's active segment by
/// scanning its log file. Segment 0 is used when no manifest is loaded.
pub fn rebuild_index(&self, stream_id: StreamId) -> Result<OffsetIndex, StorageError> {
    let active = self
        .manifests
        .get(&stream_id)
        .map(|manifest| manifest.active_segment)
        .unwrap_or_default();
    self.rebuild_index_for_segment(stream_id, active)
}
fn rebuild_index_for_segment(
&self,
stream_id: StreamId,
segment_num: u32,
) -> Result<OffsetIndex, StorageError> {
let segment_path = self.segment_path_for(stream_id, segment_num);
if !segment_path.exists() {
return Ok(OffsetIndex::new());
}
let data: Bytes = fs::read(&segment_path)?.into();
let mut index = OffsetIndex::new();
let mut pos = 0;
loop {
if pos >= data.len() {
break;
}
match Record::from_bytes(&data.slice(pos..)) {
Ok((_, consumed)) => {
index.append(pos as u64);
pos += consumed;
}
Err(StorageError::TornWrite { ref reason }) => {
tracing::warn!(
stream_id = %stream_id,
segment_num = segment_num,
torn_byte_offset = pos,
complete_records = index.len(),
reason = %reason,
"torn write detected during recovery — truncating log at last complete record"
);
let file = fs::OpenOptions::new().write(true).open(&segment_path)?;
file.set_len(pos as u64)?;
tracing::info!(
stream_id = %stream_id,
segment_num = segment_num,
truncated_to_bytes = pos,
"log truncated to last complete record"
);
break;
}
Err(StorageError::UnexpectedEof) => {
tracing::warn!(
stream_id = %stream_id,
segment_num = segment_num,
partial_byte_offset = pos,
complete_records = index.len(),
"unexpected EOF during recovery — truncating log at last complete record"
);
let file = fs::OpenOptions::new().write(true).open(&segment_path)?;
file.set_len(pos as u64)?;
break;
}
Err(e) => {
return Err(e);
}
}
}
let index_path = self.index_path_for(stream_id, segment_num);
index.save(&index_path)?;
Ok(index)
}
/// Loads a segment's offset index from disk, falling back to a rebuild.
///
/// If neither the `.idx` file nor its `.idx.wal` exists, the index is rebuilt
/// directly. Otherwise the index plus WAL is tried first, then the bare index.
/// Only when both fail is a warning logged and the index rebuilt from the log.
///
/// NOTE(review): a successfully loaded index is trusted as-is. It is not
/// compared against the log, so records written after the last index flush
/// (or a torn tail) before a crash are not reconciled here. Confirm whether
/// something else covers that case.
fn load_or_rebuild_index_for_segment(
    &self,
    stream_id: StreamId,
    segment_num: u32,
) -> Result<OffsetIndex, StorageError> {
    let index_path = self.index_path_for(stream_id, segment_num);
    let wal_path = index_path.with_extension("idx.wal");
    if !(index_path.exists() || wal_path.exists()) {
        return self.rebuild_index_for_segment(stream_id, segment_num);
    }
    let loaded = OffsetIndex::load_with_wal(&index_path)
        .ok()
        .or_else(|| OffsetIndex::load(&index_path).ok());
    match loaded {
        Some(index) => Ok(index),
        None => {
            tracing::warn!(
                stream_id = %stream_id,
                segment_num = segment_num,
                "index corrupted, rebuilding from log"
            );
            self.rebuild_index_for_segment(stream_id, segment_num)
        }
    }
}
/// Loads (or rebuilds) the offset index of the stream's active segment.
/// Segment 0 is used when no manifest is loaded for the stream.
pub fn load_or_rebuild_index(&self, stream_id: StreamId) -> Result<OffsetIndex, StorageError> {
    let segment = if let Some(manifest) = self.manifests.get(&stream_id) {
        manifest.active_segment
    } else {
        0
    };
    self.load_or_rebuild_index_for_segment(stream_id, segment)
}
/// Makes sure the offset index of `(stream_id, segment_num)` is cached,
/// loading or rebuilding it on a miss.
///
/// A freshly loaded index counts as fully persisted, so its length seeds the
/// flushed-entry counter used by incremental flushes.
fn ensure_index_cached(
    &mut self,
    stream_id: StreamId,
    segment_num: u32,
) -> Result<(), StorageError> {
    let key = (stream_id, segment_num);
    if self.index_cache.contains_key(&key) {
        return Ok(());
    }
    let index = self.load_or_rebuild_index_for_segment(stream_id, segment_num)?;
    self.index_flushed_count.insert(key, index.len());
    self.index_cache.insert(key, index);
    Ok(())
}
/// Returns the full contents of one segment file.
///
/// This type only appends to the active segment, so the bytes of any other
/// segment are cached in `segment_data_cache`. The active segment is always
/// re-read from disk. If the stream's manifest is not loaded, the active
/// segment cannot be identified, so the read also bypasses the cache.
/// Caching it then would leave a stale copy that gets served once the segment
/// later stops being active.
///
/// # Errors
/// I/O errors reading the segment file.
fn read_segment_data(
    &mut self,
    stream_id: StreamId,
    segment_num: u32,
) -> Result<Bytes, StorageError> {
    let is_completed = self
        .manifests
        .get(&stream_id)
        .is_some_and(|m| m.active_segment != segment_num);
    let path = self.segment_path_for(stream_id, segment_num);
    if !is_completed {
        return Ok(fs::read(&path)?.into());
    }
    let key = (stream_id, segment_num);
    if let Some(cached) = self.segment_data_cache.get(&key) {
        return Ok(cached.clone());
    }
    let data: Bytes = fs::read(&path)?.into();
    self.segment_data_cache.insert(key, data.clone());
    Ok(data)
}
/// Segment numbers of the stream in manifest order.
///
/// Without a loaded manifest this falls back to `[0]` when segment 0's file
/// exists and to an empty list otherwise. It never reads the manifest from disk.
fn segment_numbers(&self, stream_id: StreamId) -> Vec<u32> {
    match self.manifests.get(&stream_id) {
        Some(manifest) => manifest
            .segments
            .iter()
            .map(|meta| meta.segment_num)
            .collect(),
        None if self.segment_path_for(stream_id, 0).exists() => vec![0],
        None => Vec::new(),
    }
}
/// Appends `events` as consecutive data records to the stream's active
/// segment, extending the hash chain from `prev_hash`.
///
/// Event `i` gets offset `expected_offset + i`. Payloads are stored with the
/// default compression only when that makes them smaller. The chain hash is
/// always computed over the uncompressed payload. The offset index is flushed
/// incrementally once `INDEX_FLUSH_THRESHOLD` entries are pending or when
/// `fsync` is set. The segment is rotated once it reaches `max_segment_size`,
/// and the manifest is saved.
///
/// Returns the next free offset and the hash of the last appended record.
///
/// # Panics
/// If `events` is empty.
///
/// # Errors
/// I/O, compression, or index persistence errors.
pub fn append_batch(
    &mut self,
    stream_id: StreamId,
    events: Vec<Bytes>,
    expected_offset: Offset,
    prev_hash: Option<ChainHash>,
    fsync: bool,
) -> Result<(Offset, ChainHash), StorageError> {
    assert!(!events.is_empty(), "cannot append empty batch");
    let event_count = events.len();
    let stream_dir = self.stream_dir(stream_id);
    fs::create_dir_all(&stream_dir)?;
    let manifest = self.get_or_load_manifest(stream_id)?;
    let active_seg = manifest.active_segment;
    let segment_path = self.segment_path_for(stream_id, active_seg);
    let mut file = OpenOptions::new()
        .create(true)
        .append(true)
        .open(&segment_path)?;
    let index_path = self.index_path_for(stream_id, active_seg);
    let cache_key = (stream_id, active_seg);
    // Load (or rebuild) the index BEFORE sampling the file length. A rebuild
    // truncates a torn tail, which would make an earlier length stale and
    // point every new index entry past the real record start.
    self.ensure_index_cached(stream_id, active_seg)?;
    let mut byte_position: u64 = file.metadata()?.len();
    let index = self
        .index_cache
        .get_mut(&cache_key)
        .expect("index exists: just ensured");
    let mut current_offset = expected_offset;
    let mut current_hash = prev_hash;
    let compression = self.default_compression;
    for event in events {
        // Keep the compressed form only when it actually saves space.
        let (stored_payload, record_compression) = if compression == CompressionKind::None {
            (event.clone(), CompressionKind::None)
        } else {
            let compressed = self.codec_registry.compress(compression, &event)?;
            if compressed.len() < event.len() {
                (Bytes::from(compressed), compression)
            } else {
                (event.clone(), CompressionKind::None)
            }
        };
        // The chain hash is computed over the uncompressed event.
        let hash_record = Record::new(current_offset, current_hash, event);
        current_hash = Some(hash_record.compute_hash());
        let record = Record::with_compression(
            current_offset,
            hash_record.prev_hash(),
            RecordKind::Data,
            record_compression,
            stored_payload,
        );
        let record_bytes = record.to_bytes();
        file.write_all(&record_bytes)?;
        // Index the record only once its bytes were accepted by the file, so a
        // failed write or compression error leaves no dangling cache entry.
        index.append(byte_position);
        byte_position += record_bytes.len() as u64;
        current_offset += Offset::from(1u64);
    }
    if fsync {
        file.sync_all()?;
    }
    let manifest = self.manifests.get_mut(&stream_id).expect("manifest loaded");
    let active_meta = manifest.active_mut();
    active_meta.size_bytes = byte_position;
    active_meta.next_offset = current_offset.as_u64();
    let dirty = self.index_dirty_count.entry(cache_key).or_insert(0);
    *dirty += event_count;
    if *dirty >= INDEX_FLUSH_THRESHOLD || fsync {
        let index = self
            .index_cache
            .get(&cache_key)
            .expect("index exists: just used above");
        let flushed = *self.index_flushed_count.get(&cache_key).unwrap_or(&0);
        index.save_incremental(&index_path, flushed, crate::index::MAX_WAL_BYTES)?;
        self.index_flushed_count.insert(cache_key, index.len());
        *dirty = 0;
    }
    if byte_position >= self.max_segment_size {
        self.rotate_segment(stream_id, current_offset)?;
    }
    let stream_dir = self.stream_dir(stream_id);
    let manifest = self.manifests.get(&stream_id).expect("manifest loaded");
    manifest.save(&stream_dir)?;
    debug_assert_eq!(
        current_offset.as_u64() - expected_offset.as_u64(),
        event_count as u64,
        "offset mismatch after batch write"
    );
    kimberlite_properties::always!(
        current_offset.as_u64() >= expected_offset.as_u64(),
        "storage.offset_advances_forward",
        "offset must only advance forward after append_batch"
    );
    kimberlite_properties::always!(
        current_hash.is_some(),
        "storage.hash_chain_valid_after_append",
        "hash chain must produce a valid hash after non-empty batch append"
    );
    Ok((current_offset, current_hash.expect("batch was non-empty")))
}
fn rotate_segment(
&mut self,
stream_id: StreamId,
next_offset: Offset,
) -> Result<(), StorageError> {
let old_seg = self
.manifests
.get(&stream_id)
.expect("manifest loaded")
.active_segment;
let old_key = (stream_id, old_seg);
if let Some(index) = self.index_cache.get(&old_key) {
let index_path = self.index_path_for(stream_id, old_seg);
index.save(&index_path)?;
}
self.index_dirty_count.insert(old_key, 0);
let manifest = self.manifests.get_mut(&stream_id).expect("manifest loaded");
let new_seg = manifest.rotate(next_offset.as_u64());
tracing::info!(
stream_id = %stream_id,
old_segment = old_seg,
new_segment = new_seg,
"rotated segment"
);
Ok(())
}
/// Returns event payloads starting at `from_offset`, with the hash chain
/// verified from the nearest checkpoint (see `read_records_verified`).
/// Checkpoint records are not included. Reading stops once at least
/// `max_bytes` of payload has been collected.
pub fn read_from(
    &mut self,
    stream_id: StreamId,
    from_offset: Offset,
    max_bytes: u64,
) -> Result<Vec<Bytes>, StorageError> {
    let payloads: Vec<Bytes> = self
        .read_records_verified(stream_id, from_offset, max_bytes)?
        .iter()
        .map(|record| record.payload().clone())
        .collect();
    Ok(payloads)
}
/// Like `read_from`, but verifies the hash chain from the first record of the
/// stream instead of from the nearest checkpoint.
///
/// Only event payloads are returned. Checkpoint records are verified as part
/// of the chain but filtered out, matching `read_from`: a caller receiving
/// bare `Bytes` cannot tell a checkpoint payload from an event.
pub fn read_from_genesis(
    &mut self,
    stream_id: StreamId,
    from_offset: Offset,
    max_bytes: u64,
) -> Result<Vec<Bytes>, StorageError> {
    let records = self.read_records_from_genesis(stream_id, from_offset, max_bytes)?;
    Ok(records
        .into_iter()
        .filter(|r| !r.is_checkpoint())
        .map(|r| r.payload().clone())
        .collect())
}
pub fn read_records_from_genesis(
&mut self,
stream_id: StreamId,
from_offset: Offset,
max_bytes: u64,
) -> Result<Vec<Record>, StorageError> {
let segment_nums = self.segment_numbers(stream_id);
let mut results = Vec::new();
let mut bytes_read: u64 = 0;
let mut expected_prev_hash: Option<ChainHash> = None;
let mut records_verified: u64 = 0;
for seg_num in segment_nums {
let seg_path = self.segment_path_for(stream_id, seg_num);
if !seg_path.exists() {
continue;
}
let data = self.read_segment_data(stream_id, seg_num)?;
let mut pos = 0;
while pos < data.len() && bytes_read < max_bytes {
let (record, consumed) = Record::from_bytes(&data.slice(pos..))?;
let record = self.decompress_record(record)?;
if record.prev_hash() != expected_prev_hash {
return Err(StorageError::ChainVerificationFailed {
offset: record.offset(),
expected: expected_prev_hash,
actual: record.prev_hash(),
});
}
kimberlite_properties::always!(
record.prev_hash() == expected_prev_hash,
"storage.hash_chain_valid_on_genesis_read",
"prev_hash must match expected hash during genesis-verified read"
);
expected_prev_hash = Some(record.compute_hash());
records_verified += 1;
pos += consumed;
if record.offset() < from_offset {
continue;
}
bytes_read += record.payload().len() as u64;
results.push(record);
}
if bytes_read >= max_bytes {
break;
}
}
debug_assert!(
records_verified == 0 || expected_prev_hash.is_some(),
"verified records but no final hash"
);
kimberlite_properties::sometimes!(
!results.is_empty(),
"storage.read_after_write_exercised",
"simulation should exercise reading non-empty results from storage"
);
Ok(results)
}
pub fn append_batch_pipelined(
&mut self,
stream_id: StreamId,
events: &[Bytes],
expected_offset: Offset,
prev_hash: Option<ChainHash>,
fsync: bool,
pipeline: &mut crate::AppendPipeline,
) -> Result<(Offset, ChainHash), StorageError> {
assert!(!events.is_empty(), "cannot append empty batch");
let event_count = events.len();
let stream_dir = self.stream_dir(stream_id);
fs::create_dir_all(&stream_dir)?;
let manifest = self.get_or_load_manifest(stream_id)?;
let active_seg = manifest.active_segment;
let segment_path = self.segment_path_for(stream_id, active_seg);
let mut file = OpenOptions::new()
.create(true)
.append(true)
.open(&segment_path)?;
let base_byte_pos: u64 = file.metadata()?.len();
let batch = pipeline.prepare_batch(
events,
expected_offset,
prev_hash,
base_byte_pos,
self.default_compression,
&self.codec_registry,
)?;
file.write_all(&batch.data)?;
if fsync {
file.sync_all()?;
}
let index_path = self.index_path_for(stream_id, active_seg);
let cache_key = (stream_id, active_seg);
self.ensure_index_cached(stream_id, active_seg)?;
let index = self
.index_cache
.get_mut(&cache_key)
.expect("index exists: just ensured");
for &(_offset, byte_pos) in &batch.index_entries {
index.append(byte_pos);
}
let new_byte_pos = base_byte_pos + batch.bytes_written;
let new_offset = expected_offset + Offset::from(event_count as u64);
let manifest = self.manifests.get_mut(&stream_id).expect("manifest loaded");
let active_meta = manifest.active_mut();
active_meta.size_bytes = new_byte_pos;
active_meta.next_offset = new_offset.as_u64();
let dirty = self.index_dirty_count.entry(cache_key).or_insert(0);
*dirty += event_count;
if *dirty >= INDEX_FLUSH_THRESHOLD || fsync {
let index = self.index_cache.get(&cache_key).expect("index exists");
let flushed = *self.index_flushed_count.get(&cache_key).unwrap_or(&0);
index.save_incremental(&index_path, flushed, 1000)?;
self.index_flushed_count.insert(cache_key, index.len());
*dirty = 0;
}
if new_byte_pos >= self.max_segment_size {
self.rotate_segment(stream_id, new_offset)?;
}
let stream_dir = self.stream_dir(stream_id);
let manifest = self.manifests.get(&stream_id).expect("manifest loaded");
manifest.save(&stream_dir)?;
debug_assert_eq!(
new_offset.as_u64() - expected_offset.as_u64(),
event_count as u64,
"offset mismatch after pipelined batch write"
);
Ok((new_offset, batch.final_hash))
}
/// Returns `record` with its payload decompressed and its compression marked
/// as `None`. Uncompressed records are returned unchanged.
///
/// # Errors
/// If the codec registry fails to decompress the payload.
fn decompress_record(&self, record: Record) -> Result<Record, StorageError> {
    match record.compression() {
        CompressionKind::None => Ok(record),
        codec => {
            let plain = self.codec_registry.decompress(codec, record.payload())?;
            Ok(Record::with_compression(
                record.offset(),
                record.prev_hash(),
                record.kind(),
                CompressionKind::None,
                Bytes::from(plain),
            ))
        }
    }
}
pub fn rebuild_checkpoint_index(
&mut self,
stream_id: StreamId,
) -> Result<CheckpointIndex, StorageError> {
let segment_nums = self.segment_numbers(stream_id);
let mut checkpoint_index = CheckpointIndex::new();
for seg_num in segment_nums {
let seg_path = self.segment_path_for(stream_id, seg_num);
if !seg_path.exists() {
continue;
}
let data = self.read_segment_data(stream_id, seg_num)?;
let mut pos = 0;
while pos < data.len() {
let (record, consumed) = Record::from_bytes(&data.slice(pos..))?;
if record.is_checkpoint() {
checkpoint_index.add(record.offset());
}
pos += consumed;
}
}
tracing::debug!(
stream_id = %stream_id,
checkpoint_count = checkpoint_index.len(),
"rebuilt checkpoint index"
);
Ok(checkpoint_index)
}
/// Returns the cached checkpoint index of the stream, building and caching it
/// on first use.
///
/// # Errors
/// Propagates failures from `rebuild_checkpoint_index`.
fn get_or_rebuild_checkpoint_index(
    &mut self,
    stream_id: StreamId,
) -> Result<&CheckpointIndex, StorageError> {
    if self.checkpoint_cache.get(&stream_id).is_none() {
        let rebuilt = self.rebuild_checkpoint_index(stream_id)?;
        let cached: &CheckpointIndex = self.checkpoint_cache.entry(stream_id).or_insert(rebuilt);
        return Ok(cached);
    }
    Ok(self
        .checkpoint_cache
        .get(&stream_id)
        .expect("just inserted"))
}
/// Appends a checkpoint record at `current_offset` to the stream's active
/// segment.
///
/// The checkpoint payload stores `prev_hash` (an all-zero hash when there is
/// none) and `record_count`. The record itself also carries `prev_hash`, so it
/// joins the hash chain like any other record. Unlike data appends, the
/// segment's offset index is saved in full immediately and any index WAL is
/// deleted. The manifest is then saved, and a loaded checkpoint index cache
/// learns the new offset.
///
/// Returns the next free offset and the checkpoint record's hash.
///
/// # Errors
/// I/O or index persistence errors.
pub fn create_checkpoint(
    &mut self,
    stream_id: StreamId,
    current_offset: Offset,
    prev_hash: Option<ChainHash>,
    record_count: u64,
    fsync: bool,
) -> Result<(Offset, ChainHash), StorageError> {
    let chain_hash = prev_hash.unwrap_or_else(|| ChainHash::from_bytes(&[0u8; 32]));
    let payload = serialize_checkpoint_payload(&chain_hash, record_count);
    let record = Record::with_kind(current_offset, prev_hash, RecordKind::Checkpoint, payload);
    let record_bytes = record.to_bytes();
    let record_hash = record.compute_hash();
    let stream_dir = self.stream_dir(stream_id);
    fs::create_dir_all(&stream_dir)?;
    let manifest = self.get_or_load_manifest(stream_id)?;
    let active_seg = manifest.active_segment;
    let segment_path = self.segment_path_for(stream_id, active_seg);
    let mut file = OpenOptions::new()
        .create(true)
        .append(true)
        .open(&segment_path)?;
    let cache_key = (stream_id, active_seg);
    let index_path = self.index_path_for(stream_id, active_seg);
    // Cache the index before writing. A rebuild from the log would otherwise
    // already contain the checkpoint, and the append below would index it
    // twice. Loading first also lets a rebuild truncate a torn tail before
    // the file length is sampled.
    self.ensure_index_cached(stream_id, active_seg)?;
    let byte_position = file.metadata()?.len();
    file.write_all(&record_bytes)?;
    if fsync {
        file.sync_all()?;
    }
    let index = self.index_cache.get_mut(&cache_key).expect("just loaded");
    index.append(byte_position);
    index.save(&index_path)?;
    self.index_dirty_count.insert(cache_key, 0);
    self.index_flushed_count.insert(cache_key, index.len());
    let wal_path = {
        let mut p = index_path.as_os_str().to_owned();
        p.push(".wal");
        std::path::PathBuf::from(p)
    };
    // The full save supersedes the WAL; a missing WAL is fine.
    let _ = fs::remove_file(wal_path);
    let new_size = byte_position + record_bytes.len() as u64;
    let manifest = self.manifests.get_mut(&stream_id).expect("manifest loaded");
    let active_meta = manifest.active_mut();
    active_meta.size_bytes = new_size;
    active_meta.next_offset = current_offset.as_u64() + 1;
    manifest.save(&stream_dir)?;
    if let Some(cp_index) = self.checkpoint_cache.get_mut(&stream_id) {
        cp_index.add(current_offset);
    }
    tracing::info!(
        stream_id = %stream_id,
        offset = %current_offset,
        record_count = record_count,
        "created checkpoint"
    );
    let next_offset = current_offset + Offset::from(1u64);
    Ok((next_offset, record_hash))
}
pub fn read_records_verified(
&mut self,
stream_id: StreamId,
from_offset: Offset,
max_bytes: u64,
) -> Result<Vec<Record>, StorageError> {
let _ = self.get_or_load_manifest(stream_id);
let segment_nums = self.segment_numbers(stream_id);
if segment_nums.is_empty() {
return Ok(Vec::new());
}
let first_seg_path = self.segment_path_for(stream_id, segment_nums[0]);
if !first_seg_path.exists() {
return Ok(Vec::new());
}
let checkpoint_index = self.get_or_rebuild_checkpoint_index(stream_id)?;
let verification_start = checkpoint_index.find_nearest(from_offset);
let (start_seg_num, start_pos, mut expected_prev_hash) = match verification_start {
Some(cp_offset) => {
let manifest = self.manifests.get(&stream_id);
let seg_num = manifest
.and_then(|m| {
m.find_segment_for_offset(cp_offset.as_u64())
.map(|s| s.segment_num)
})
.unwrap_or(0);
self.ensure_index_cached(stream_id, seg_num)?;
let offset_index = self
.index_cache
.get(&(stream_id, seg_num))
.expect("just ensured");
let first_offset_in_seg = self
.manifests
.get(&stream_id)
.and_then(|m| {
m.find_segment_for_offset(cp_offset.as_u64())
.map(|s| s.first_offset)
})
.unwrap_or(0);
let local_offset = Offset::new(cp_offset.as_u64() - first_offset_in_seg);
let byte_pos = offset_index
.lookup(local_offset)
.ok_or(StorageError::UnexpectedEof)?;
let data = self.read_segment_data(stream_id, seg_num)?;
let (cp_record, _) = Record::from_bytes(&data.slice(byte_pos as usize..))?;
debug_assert!(cp_record.is_checkpoint());
let (chain_hash, _) =
deserialize_checkpoint_payload(cp_record.payload(), cp_offset)?;
(seg_num, byte_pos as usize, Some(chain_hash))
}
None => (segment_nums[0], 0, None),
};
let mut results = Vec::new();
let mut bytes_read: u64 = 0;
let mut started = false;
for &seg_num in &segment_nums {
if seg_num < start_seg_num {
continue;
}
let seg_path = self.segment_path_for(stream_id, seg_num);
if !seg_path.exists() {
continue;
}
let data = self.read_segment_data(stream_id, seg_num)?;
let mut pos = if seg_num == start_seg_num && !started {
started = true;
start_pos
} else {
0
};
while pos < data.len() && bytes_read < max_bytes {
let (record, consumed) = Record::from_bytes(&data.slice(pos..))?;
let record = self.decompress_record(record)?;
kimberlite_properties::never!(
record.prev_hash() != expected_prev_hash,
"storage.verified_read_chain_break",
"verified read must never encounter a hash chain break in non-corrupted storage"
);
if record.prev_hash() != expected_prev_hash {
return Err(StorageError::ChainVerificationFailed {
offset: record.offset(),
expected: expected_prev_hash,
actual: record.prev_hash(),
});
}
expected_prev_hash = Some(record.compute_hash());
pos += consumed;
if record.offset() >= from_offset && !record.is_checkpoint() {
bytes_read += record.payload().len() as u64;
results.push(record);
}
}
if bytes_read >= max_bytes {
break;
}
}
Ok(results)
}
/// Offset of the stream's most recent checkpoint, if any, as reported by the
/// (lazily built) checkpoint index.
pub fn last_checkpoint(&mut self, stream_id: StreamId) -> Result<Option<Offset>, StorageError> {
    self.get_or_rebuild_checkpoint_index(stream_id)
        .map(|checkpoints| checkpoints.last())
}
pub fn latest_chain_hash(
&mut self,
stream_id: StreamId,
) -> Result<Option<ChainHash>, StorageError> {
let segment_nums = self.segment_numbers(stream_id);
if segment_nums.is_empty() {
return Ok(None);
}
let last_seg_num = *segment_nums.last().expect("segment_nums non-empty");
let seg_path = self.segment_path_for(stream_id, last_seg_num);
if !seg_path.exists() {
return Ok(None);
}
let data = self.read_segment_data(stream_id, last_seg_num)?;
if data.is_empty() {
return Ok(None);
}
let mut pos = 0usize;
let mut last_hash: Option<ChainHash> = None;
while pos < data.len() {
let (record, consumed) = Record::from_bytes(&data.slice(pos..))?;
let record = self.decompress_record(record)?;
last_hash = Some(record.compute_hash());
pos += consumed;
}
Ok(last_hash)
}
/// Number of segments in the stream's loaded manifest (0 if none is loaded).
pub fn segment_count(&self, stream_id: StreamId) -> usize {
    match self.manifests.get(&stream_id) {
        Some(manifest) => manifest.segments.len(),
        None => 0,
    }
}
/// Numbers of all segments except the active one, in manifest order. Empty if
/// no manifest is loaded for the stream.
pub fn completed_segments(&self, stream_id: StreamId) -> Vec<u32> {
    let Some(manifest) = self.manifests.get(&stream_id) else {
        return Vec::new();
    };
    manifest
        .segments
        .iter()
        .map(|meta| meta.segment_num)
        .filter(|&num| num != manifest.active_segment)
        .collect()
}
pub fn flush_indexes(&mut self) -> Result<(), StorageError> {
let dirty_keys: Vec<(StreamId, u32)> = self
.index_dirty_count
.iter()
.filter(|(_, count)| **count > 0)
.map(|(&key, _)| key)
.collect();
let mut first_error: Option<StorageError> = None;
for (stream_id, segment_num) in dirty_keys {
if let Some(index) = self.index_cache.get(&(stream_id, segment_num)) {
let index_path = self.index_path_for(stream_id, segment_num);
if let Err(e) = index.save(&index_path) {
tracing::error!(
stream_id = %stream_id,
segment_num = segment_num,
error = %e,
"failed to flush index on shutdown"
);
if first_error.is_none() {
first_error = Some(e);
}
} else {
self.index_dirty_count.insert((stream_id, segment_num), 0);
self.index_flushed_count
.insert((stream_id, segment_num), index.len());
let wal_path = {
let mut p = index_path.as_os_str().to_owned();
p.push(".wal");
std::path::PathBuf::from(p)
};
let _ = fs::remove_file(wal_path);
}
}
}
match first_error {
Some(e) => Err(e),
None => Ok(()),
}
}
}
impl Drop for Storage {
fn drop(&mut self) {
if let Err(e) = self.flush_indexes() {
tracing::error!(error = %e, "failed to flush indexes during Storage drop");
}
}
}
/// Backend implementation that forwards every operation to the inherent
/// `Storage` method of the same name. Inherent methods take precedence in
/// method-call resolution, and the trait is not imported into this module,
/// so `self.method(..)` resolves to the inherent method.
impl crate::backend::StorageBackend for Storage {
    fn append_batch(
        &mut self,
        stream_id: StreamId,
        events: Vec<Bytes>,
        expected_offset: Offset,
        prev_hash: Option<ChainHash>,
        fsync: bool,
    ) -> Result<(Offset, ChainHash), StorageError> {
        self.append_batch(stream_id, events, expected_offset, prev_hash, fsync)
    }

    fn read_from(
        &mut self,
        stream_id: StreamId,
        from_offset: Offset,
        max_bytes: u64,
    ) -> Result<Vec<Bytes>, StorageError> {
        self.read_from(stream_id, from_offset, max_bytes)
    }

    fn latest_chain_hash(
        &mut self,
        stream_id: StreamId,
    ) -> Result<Option<ChainHash>, StorageError> {
        self.latest_chain_hash(stream_id)
    }

    fn segment_count(&self, stream_id: StreamId) -> usize {
        self.segment_count(stream_id)
    }

    fn completed_segments(&self, stream_id: StreamId) -> Vec<u32> {
        self.completed_segments(stream_id)
    }

    fn flush_indexes(&mut self) -> Result<(), StorageError> {
        self.flush_indexes()
    }

    #[cfg(feature = "fuzz-reset")]
    fn reset(&mut self) -> Result<(), StorageError> {
        self.reset()
    }
}