use std::collections::HashMap;
use std::fs::{self, OpenOptions};
use std::io::Write;
use std::path::PathBuf;
use bytes::Bytes;
use kimberlite_crypto::ChainHash;
use kimberlite_types::{CheckpointPolicy, Offset, RecordKind, StreamId};
use crate::checkpoint::{
CheckpointIndex, deserialize_checkpoint_payload, serialize_checkpoint_payload,
};
use crate::{OffsetIndex, Record, StorageError};
/// File name of the log segment stored inside each stream's directory.
///
/// Every path built in this file uses this single name; the offset index file
/// name is derived from it (see `Storage::index_path`).
const SEGMENT_FILENAME: &str = "segment_000000.log";
/// File-backed storage for per-stream, hash-chained record logs.
///
/// Each stream lives in its own subdirectory of `data_dir` (named after
/// `stream_id.to_string()`) holding one segment file plus an offset index file.
#[derive(Debug, Clone)]
pub struct Storage {
/// Root directory under which every stream's subdirectory is created.
data_dir: PathBuf,
/// In-memory offset indexes keyed by stream; filled lazily by the write paths
/// (`append_batch`, `create_checkpoint`) and consulted by `read_records_verified`.
index_cache: HashMap<StreamId, OffsetIndex>,
/// In-memory checkpoint indexes keyed by stream; filled lazily by
/// `get_or_rebuild_checkpoint_index`.
checkpoint_cache: HashMap<StreamId, CheckpointIndex>,
/// Checkpoint policy handed in at construction. The methods visible in this
/// file only store and expose it; they do not consult it.
checkpoint_policy: CheckpointPolicy,
}
impl Storage {
/// Creates a storage rooted at `data_dir` using `CheckpointPolicy::default()`.
///
/// Nothing is touched on disk here; directories are created by the write paths.
pub fn new(data_dir: impl Into<PathBuf>) -> Self {
    let default_policy = CheckpointPolicy::default();
    Self::with_checkpoint_policy(data_dir, default_policy)
}
/// Creates a storage rooted at `data_dir` with an explicit checkpoint policy.
///
/// Both in-memory caches start empty and are populated on demand.
pub fn with_checkpoint_policy(
    data_dir: impl Into<PathBuf>,
    checkpoint_policy: CheckpointPolicy,
) -> Self {
    let data_dir: PathBuf = data_dir.into();
    let index_cache = HashMap::new();
    let checkpoint_cache = HashMap::new();
    Self {
        data_dir,
        index_cache,
        checkpoint_cache,
        checkpoint_policy,
    }
}
pub fn checkpoint_policy(&self) -> &CheckpointPolicy {
&self.checkpoint_policy
}
pub fn data_dir(&self) -> &PathBuf {
&self.data_dir
}
/// Builds `<data_dir>/<stream_id>/<SEGMENT_FILENAME>` for the given stream.
///
/// Purely a path computation; the file is not required to exist.
fn segment_path(&self, stream_id: StreamId) -> PathBuf {
    let mut path = self.data_dir.join(stream_id.to_string());
    path.push(SEGMENT_FILENAME);
    path
}
/// Returns the path of the offset index file that sits next to the stream's segment.
///
/// The segment's extension is replaced with `log.idx`, so the index for
/// `segment_000000.log` is `segment_000000.log.idx`.
pub fn index_path(&self, stream_id: StreamId) -> PathBuf {
    self.segment_path(stream_id).with_extension("log.idx")
}
/// Rebuilds the offset index of `stream_id` by scanning its segment file from the start.
///
/// Every record's starting byte position is appended to a fresh index, which is
/// then saved to [`Storage::index_path`] and returned. If the segment file does
/// not exist, an empty index is returned and nothing is written to disk.
///
/// # Errors
///
/// Returns an error if the segment cannot be read, if any record fails to
/// parse, or if the rebuilt index cannot be saved.
pub fn rebuild_index(&self, stream_id: StreamId) -> Result<OffsetIndex, StorageError> {
    let segment_path = self.segment_path(stream_id);
    if !segment_path.exists() {
        return Ok(OffsetIndex::new());
    }

    let data: Bytes = fs::read(&segment_path)?.into();
    let mut index = OffsetIndex::new();
    // Counted during the single scan so the sanity check below does not have to
    // parse the whole segment a second time.
    let mut record_count = 0;
    let mut pos = 0;
    while pos < data.len() {
        index.append(pos as u64);
        let (_, consumed) = Record::from_bytes(&data.slice(pos..))?;
        pos += consumed;
        record_count += 1;
    }
    debug_assert_eq!(index.len(), record_count, "index entry count mismatch");

    index.save(&self.index_path(stream_id))?;
    Ok(index)
}
/// Loads the persisted offset index for `stream_id`, rebuilding it from the
/// segment when loading fails.
///
/// Any load failure (missing or unreadable index) is logged as a warning and
/// answered with [`Storage::rebuild_index`]; only errors from the rebuild are
/// surfaced to the caller.
pub fn load_or_rebuild_index(&self, stream_id: StreamId) -> Result<OffsetIndex, StorageError> {
    match OffsetIndex::load(&self.index_path(stream_id)) {
        Ok(index) => Ok(index),
        Err(_) => {
            tracing::warn!(
                stream_id = %stream_id,
                "index missing or corrupted, rebuilding from log"
            );
            self.rebuild_index(stream_id)
        }
    }
}
/// Appends `events` to the stream's segment as a hash-chained run of records.
///
/// The first record is created at `expected_offset` with `prev_hash` as its
/// predecessor hash; each following record uses the next offset and the hash of
/// the record before it. The stream directory and segment file are created on
/// demand. `expected_offset` is taken as given: it is not validated against the
/// current index.
///
/// The whole batch is encoded into one buffer and written with a single
/// `write_all`. The cached offset index is only updated once that write has
/// succeeded, so a failed write does not leave index entries that point at
/// records which never reached the file.
///
/// When `fsync` is true the segment file is synced before the index is saved.
///
/// Returns the offset following the last appended record and the hash of the
/// last appended record.
///
/// # Panics
///
/// Panics if `events` is empty.
///
/// # Errors
///
/// Returns an error if the directory or file cannot be created or opened, if
/// the index cannot be loaded or rebuilt, or if writing, syncing or saving the
/// index fails.
pub fn append_batch(
    &mut self,
    stream_id: StreamId,
    events: Vec<Bytes>,
    expected_offset: Offset,
    prev_hash: Option<ChainHash>,
    fsync: bool,
) -> Result<(Offset, ChainHash), StorageError> {
    assert!(!events.is_empty(), "cannot append empty batch");
    let event_count = events.len();

    let stream_dir = self.data_dir.join(stream_id.to_string());
    fs::create_dir_all(&stream_dir)?;

    let segment_path = self.segment_path(stream_id);
    let mut file = OpenOptions::new()
        .create(true)
        .append(true)
        .open(&segment_path)?;
    let start_position: u64 = file.metadata()?.len();

    // Resolve the index before anything is written: a rebuild scans the
    // segment, so it must only see records that existed before this batch.
    let index_path = self.index_path(stream_id);
    if !self.index_cache.contains_key(&stream_id) {
        let loaded = self.load_or_rebuild_index(stream_id)?;
        self.index_cache.insert(stream_id, loaded);
    }

    // Encode the complete batch up front, remembering where each record starts.
    let mut batch_bytes: Vec<u8> = Vec::new();
    let mut record_positions: Vec<u64> = Vec::with_capacity(event_count);
    let mut current_offset = expected_offset;
    let mut current_hash = prev_hash;
    for event in events {
        let record = Record::new(current_offset, current_hash, event);
        let record_bytes = record.to_bytes();
        record_positions.push(start_position + batch_bytes.len() as u64);
        batch_bytes.extend_from_slice(&record_bytes);
        current_hash = Some(record.compute_hash());
        current_offset += Offset::from(1u64);
    }

    file.write_all(&batch_bytes)?;

    // The bytes are in the file now, so the in-memory index may reflect them.
    let index = self
        .index_cache
        .get_mut(&stream_id)
        .expect("index exists: just inserted or already present");
    for position in record_positions {
        index.append(position);
    }

    if fsync {
        file.sync_all()?;
    }
    index.save(&index_path)?;

    debug_assert_eq!(
        current_offset.as_u64() - expected_offset.as_u64(),
        event_count as u64,
        "offset mismatch after batch write"
    );
    Ok((current_offset, current_hash.expect("batch was non-empty")))
}
/// Reads record payloads of `stream_id` starting at `from_offset`.
///
/// Thin wrapper over [`Storage::read_records_from`] that keeps only each
/// record's payload; limits and errors are those of that method.
pub fn read_from(
    &self,
    stream_id: StreamId,
    from_offset: Offset,
    max_bytes: u64,
) -> Result<Vec<Bytes>, StorageError> {
    let records = self.read_records_from(stream_id, from_offset, max_bytes)?;
    let mut payloads = Vec::with_capacity(records.len());
    for record in &records {
        payloads.push(record.payload().clone());
    }
    Ok(payloads)
}
/// Reads records of `stream_id` from `from_offset`, verifying the hash chain
/// from the very first record of the segment.
///
/// Every record scanned must carry the hash of its predecessor (`None` for the
/// first record). Records below `from_offset` are verified but not returned.
/// The byte budget counts payload bytes of returned records and is checked
/// before each record is read, so the result may overshoot `max_bytes` by one
/// record's payload.
///
/// # Errors
///
/// Returns an error if the segment cannot be read (including when it does not
/// exist), if a record fails to parse, or
/// `StorageError::ChainVerificationFailed` when a record's predecessor hash
/// does not match.
pub fn read_records_from(
    &self,
    stream_id: StreamId,
    from_offset: Offset,
    max_bytes: u64,
) -> Result<Vec<Record>, StorageError> {
    let data: Bytes = fs::read(self.segment_path(stream_id))?.into();

    let mut collected = Vec::new();
    let mut payload_bytes: u64 = 0;
    let mut cursor = 0;
    let mut chain_tip: Option<ChainHash> = None;
    let mut verified_count: u64 = 0;

    loop {
        if cursor >= data.len() || payload_bytes >= max_bytes {
            break;
        }

        let (record, consumed) = Record::from_bytes(&data.slice(cursor..))?;
        let actual = record.prev_hash();
        if actual != chain_tip {
            return Err(StorageError::ChainVerificationFailed {
                offset: record.offset(),
                expected: chain_tip,
                actual,
            });
        }

        chain_tip = Some(record.compute_hash());
        verified_count += 1;
        cursor += consumed;

        // Earlier records only take part in verification.
        if record.offset() >= from_offset {
            payload_bytes += record.payload().len() as u64;
            collected.push(record);
        }
    }

    debug_assert!(
        verified_count == 0 || chain_tip.is_some(),
        "verified records but no final hash"
    );
    Ok(collected)
}
/// Scans the whole segment of `stream_id` and collects the offsets of all
/// checkpoint records into a fresh `CheckpointIndex`.
///
/// A missing segment yields an empty index. The result is returned to the
/// caller only; this method neither caches nor persists it.
///
/// # Errors
///
/// Returns an error if the segment cannot be read or a record fails to parse.
pub fn rebuild_checkpoint_index(
    &self,
    stream_id: StreamId,
) -> Result<CheckpointIndex, StorageError> {
    let segment_path = self.segment_path(stream_id);
    let mut checkpoints = CheckpointIndex::new();
    if !segment_path.exists() {
        return Ok(checkpoints);
    }

    let data: Bytes = fs::read(&segment_path)?.into();
    let mut cursor = 0;
    loop {
        if cursor >= data.len() {
            break;
        }
        let (record, consumed) = Record::from_bytes(&data.slice(cursor..))?;
        if record.is_checkpoint() {
            checkpoints.add(record.offset());
        }
        cursor += consumed;
    }

    tracing::debug!(
        stream_id = %stream_id,
        checkpoint_count = checkpoints.len(),
        "rebuilt checkpoint index"
    );
    Ok(checkpoints)
}
/// Returns the cached checkpoint index for `stream_id`, rebuilding and caching
/// it first when the cache has no entry.
///
/// The lookup and the insert are kept separate because the rebuild borrows
/// `self` immutably, which rules out holding a map entry across the call.
fn get_or_rebuild_checkpoint_index(
    &mut self,
    stream_id: StreamId,
) -> Result<&CheckpointIndex, StorageError> {
    let already_cached = self.checkpoint_cache.contains_key(&stream_id);
    if !already_cached {
        let rebuilt = self.rebuild_checkpoint_index(stream_id)?;
        self.checkpoint_cache.insert(stream_id, rebuilt);
    }

    let cached = self
        .checkpoint_cache
        .get(&stream_id)
        .expect("just inserted");
    Ok(cached)
}
/// Appends a checkpoint record at `current_offset` to the stream's segment.
///
/// The checkpoint payload is built from `prev_hash` (an all-zero hash stands in
/// when `prev_hash` is `None`) and `record_count`. The record itself is chained
/// to `prev_hash` like any other record. The stream directory and segment file
/// are created on demand.
///
/// The offset index is resolved *before* the record is written. Resolving it
/// afterwards could rebuild the index from a segment that already contains the
/// new checkpoint, and the subsequent append would then register the same
/// position twice.
///
/// When `fsync` is true the segment file is synced before the index is updated
/// and saved. An already cached checkpoint index is updated in place; if none
/// is cached, nothing is done here and it is rebuilt from the segment on
/// demand.
///
/// Returns the offset following the checkpoint and the checkpoint record's hash.
///
/// # Errors
///
/// Returns an error if the directory or file cannot be created or opened, if
/// the index cannot be loaded or rebuilt, or if writing, syncing or saving the
/// index fails.
pub fn create_checkpoint(
    &mut self,
    stream_id: StreamId,
    current_offset: Offset,
    prev_hash: Option<ChainHash>,
    record_count: u64,
    fsync: bool,
) -> Result<(Offset, ChainHash), StorageError> {
    let chain_hash = prev_hash.unwrap_or_else(|| ChainHash::from_bytes(&[0u8; 32]));
    let payload = serialize_checkpoint_payload(&chain_hash, record_count);
    let record = Record::with_kind(current_offset, prev_hash, RecordKind::Checkpoint, payload);
    let record_bytes = record.to_bytes();
    let record_hash = record.compute_hash();

    let stream_dir = self.data_dir.join(stream_id.to_string());
    fs::create_dir_all(&stream_dir)?;

    let segment_path = self.segment_path(stream_id);
    let mut file = OpenOptions::new()
        .create(true)
        .append(true)
        .open(&segment_path)?;
    let byte_position = file.metadata()?.len();

    // Must happen before the write so a rebuild only sees pre-existing records.
    let index_path = self.index_path(stream_id);
    if !self.index_cache.contains_key(&stream_id) {
        let loaded = self.load_or_rebuild_index(stream_id)?;
        self.index_cache.insert(stream_id, loaded);
    }

    file.write_all(&record_bytes)?;
    if fsync {
        file.sync_all()?;
    }

    let index = self.index_cache.get_mut(&stream_id).expect("just loaded");
    index.append(byte_position);
    index.save(&index_path)?;

    if let Some(cp_index) = self.checkpoint_cache.get_mut(&stream_id) {
        cp_index.add(current_offset);
    }

    tracing::info!(
        stream_id = %stream_id,
        offset = %current_offset,
        record_count = record_count,
        "created checkpoint"
    );

    let next_offset = current_offset + Offset::from(1u64);
    Ok((next_offset, record_hash))
}
pub fn read_records_verified(
&mut self,
stream_id: StreamId,
from_offset: Offset,
max_bytes: u64,
) -> Result<Vec<Record>, StorageError> {
let segment_path = self.segment_path(stream_id);
if !segment_path.exists() {
return Ok(Vec::new());
}
let checkpoint_index = self.get_or_rebuild_checkpoint_index(stream_id)?;
let verification_start = checkpoint_index.find_nearest(from_offset);
let data: Bytes = fs::read(&segment_path)?.into();
let offset_index = if let Some(idx) = self.index_cache.get(&stream_id) {
idx.clone()
} else {
self.load_or_rebuild_index(stream_id)?
};
let (start_pos, mut expected_prev_hash) = match verification_start {
Some(cp_offset) => {
let byte_pos = offset_index
.lookup(cp_offset)
.ok_or(StorageError::UnexpectedEof)?;
let (cp_record, _) = Record::from_bytes(&data.slice(byte_pos as usize..))?;
debug_assert!(cp_record.is_checkpoint());
let (chain_hash, _) =
deserialize_checkpoint_payload(cp_record.payload(), cp_offset)?;
(byte_pos as usize, Some(chain_hash))
}
None => {
(0, None)
}
};
let mut results = Vec::new();
let mut bytes_read: u64 = 0;
let mut pos = start_pos;
while pos < data.len() && bytes_read < max_bytes {
let (record, consumed) = Record::from_bytes(&data.slice(pos..))?;
if record.prev_hash() != expected_prev_hash {
return Err(StorageError::ChainVerificationFailed {
offset: record.offset(),
expected: expected_prev_hash,
actual: record.prev_hash(),
});
}
expected_prev_hash = Some(record.compute_hash());
pos += consumed;
if record.offset() >= from_offset && !record.is_checkpoint() {
bytes_read += record.payload().len() as u64;
results.push(record);
}
}
Ok(results)
}
/// Returns the offset of the most recent checkpoint of `stream_id`, if any.
///
/// Uses the cached checkpoint index, rebuilding it from the segment on first use.
pub fn last_checkpoint(&mut self, stream_id: StreamId) -> Result<Option<Offset>, StorageError> {
    let latest = self.get_or_rebuild_checkpoint_index(stream_id)?.last();
    Ok(latest)
}
}