use std::collections::HashMap;
use bytes::Bytes;
use kimberlite_crypto::ChainHash;
use kimberlite_types::{CompressionKind, Offset, RecordKind, StreamId};
use crate::backend::StorageBackend;
use crate::error::StorageError;
use crate::record::Record;
/// Default active-segment size in bytes (256 MiB) used by `MemoryStorage::new`;
/// once an append batch leaves the active segment at or above this size, a new
/// segment is started.
const DEFAULT_MAX_SEGMENT_SIZE: u64 = 256 << 20;
/// Manifest entry describing one segment of a stream.
///
/// All record bytes of a stream live in a single buffer (`StreamState::bytes`);
/// segments are bookkeeping only, used for rotation and for the segment-count
/// queries on the backend.
#[derive(Debug, Clone, Copy)]
struct SegmentMeta {
    /// Sequential segment number; the first segment of a stream is 0.
    segment_num: u32,
    /// Offset of the first record assigned to this segment.
    // Currently only written, never read; the allow is scoped to this field so
    // the dead-code lint stays active for the rest of the struct.
    #[allow(dead_code)]
    first_offset: u64,
    /// One past the offset of the last record appended to this segment.
    // Currently only written, never read (see `first_offset`).
    #[allow(dead_code)]
    next_offset: u64,
    /// Total serialized size, in bytes, of the records appended to this segment.
    size_bytes: u64,
}
/// Per-stream in-memory state: the serialized record log plus its segment manifest.
#[derive(Debug)]
struct StreamState {
    /// Serialized records concatenated in append order.
    bytes: Vec<u8>,
    /// Byte position within `bytes` at which each appended record begins.
    record_starts: Vec<u64>,
    /// Segment manifest; one entry is pushed per rotation.
    segments: Vec<SegmentMeta>,
    /// `segment_num` of the segment currently receiving appends.
    active_segment: u32,
}

impl StreamState {
    /// Creates an empty stream whose manifest holds a single empty segment 0.
    fn new() -> Self {
        let initial = SegmentMeta {
            segment_num: 0,
            first_offset: 0,
            next_offset: 0,
            size_bytes: 0,
        };
        Self {
            bytes: Vec::new(),
            record_starts: Vec::new(),
            segments: vec![initial],
            active_segment: 0,
        }
    }

    /// Returns the manifest entry of the active segment for in-place updates.
    ///
    /// # Panics
    ///
    /// Panics if no manifest entry carries the `active_segment` number.
    fn active_mut(&mut self) -> &mut SegmentMeta {
        let wanted = self.active_segment;
        match self.segments.iter_mut().find(|seg| seg.segment_num == wanted) {
            Some(seg) => seg,
            None => panic!("active segment must exist in manifest"),
        }
    }

    /// Makes the next segment number active, with a fresh empty manifest entry
    /// whose first and next offsets are both `first_offset`.
    fn rotate(&mut self, first_offset: u64) {
        self.active_segment += 1;
        self.segments.push(SegmentMeta {
            segment_num: self.active_segment,
            first_offset,
            next_offset: first_offset,
            size_bytes: 0,
        });
    }
}
/// Storage backend that keeps every stream's serialized records in memory.
///
/// Each stream's log lives in a `Vec<u8>` inside a `HashMap`. The active
/// segment is rotated after an append batch leaves it at or above
/// `max_segment_size` bytes.
#[derive(Debug)]
pub struct MemoryStorage {
    /// Per-stream state; an entry is created on the first append to a stream.
    streams: HashMap<StreamId, StreamState>,
    /// Segment rotation threshold, in bytes.
    max_segment_size: u64,
}

impl MemoryStorage {
    /// Creates an empty store using `DEFAULT_MAX_SEGMENT_SIZE` (256 MiB).
    pub fn new() -> Self {
        Self::with_max_segment_size(DEFAULT_MAX_SEGMENT_SIZE)
    }

    /// Creates an empty store that rotates segments at `max_segment_size` bytes.
    pub fn with_max_segment_size(max_segment_size: u64) -> Self {
        let streams = HashMap::new();
        Self {
            streams,
            max_segment_size,
        }
    }

    /// Returns the configured segment rotation threshold, in bytes.
    pub fn max_segment_size(&self) -> u64 {
        self.max_segment_size
    }
}

impl Default for MemoryStorage {
    /// Same as `MemoryStorage::new`.
    fn default() -> Self {
        MemoryStorage::new()
    }
}
impl StorageBackend for MemoryStorage {
    /// Appends `events` to `stream_id` as consecutive data records, starting at
    /// `expected_offset` and chaining each record's `prev_hash` from `prev_hash`.
    ///
    /// Returns the offset one past the last appended record together with the
    /// chain hash of that record. `_fsync` is ignored; nothing is written to disk.
    /// Segment rotation is checked once per batch, so a segment may exceed
    /// `max_segment_size` by up to one batch.
    ///
    /// # Panics
    ///
    /// Panics if `events` is empty.
    fn append_batch(
        &mut self,
        stream_id: StreamId,
        events: Vec<Bytes>,
        expected_offset: Offset,
        prev_hash: Option<ChainHash>,
        _fsync: bool,
    ) -> Result<(Offset, ChainHash), StorageError> {
        assert!(!events.is_empty(), "cannot append empty batch");

        let stream = self
            .streams
            .entry(stream_id)
            .or_insert_with(StreamState::new);

        let mut current_offset = expected_offset;
        let mut current_hash = prev_hash;

        for event in events {
            // The chain hash comes from a `Record::new` built over the same
            // offset / prev-hash / payload as the stored record.
            // NOTE(review): `read_from` re-derives hashes from the stored
            // Data/None form, so these are presumably identical — confirm before
            // collapsing the two constructions into one.
            let hash_record = Record::new(current_offset, current_hash, event.clone());
            let record_hash = hash_record.compute_hash();
            let stored_record = Record::with_compression(
                current_offset,
                current_hash,
                RecordKind::Data,
                CompressionKind::None,
                event,
            );
            let bytes = stored_record.to_bytes();

            // `record_starts` and `bytes` are always updated together so that
            // every start position indexes the beginning of a record.
            stream.record_starts.push(stream.bytes.len() as u64);
            stream.bytes.extend_from_slice(&bytes);

            {
                let active = stream.active_mut();
                active.next_offset = current_offset.as_u64() + 1;
                active.size_bytes += bytes.len() as u64;
            }

            current_hash = Some(record_hash);
            current_offset += Offset::from(1u64);
        }

        let should_rotate = stream.active_mut().size_bytes >= self.max_segment_size;
        if should_rotate {
            stream.rotate(current_offset.as_u64());
        }

        assert!(
            current_offset.as_u64() >= expected_offset.as_u64(),
            "offset must only advance forward after append_batch"
        );

        Ok((
            current_offset,
            current_hash.expect("batch was non-empty, hash must be set"),
        ))
    }

    /// Returns the payloads of data records in `stream_id` whose offset is at
    /// least `from_offset`.
    ///
    /// The log is always scanned from its first record so that each record's
    /// `prev_hash` can be checked against the hash of its predecessor; the first
    /// record is expected to have no `prev_hash`. Checkpoint records are
    /// verified but not returned. Scanning stops once the returned payloads total
    /// at least `max_bytes`, so the last payload may push the total past the
    /// limit. An unknown stream yields an empty vector.
    ///
    /// # Errors
    ///
    /// Propagates decode errors from `Record::from_bytes`, and returns
    /// `StorageError::ChainVerificationFailed` on a hash-chain mismatch.
    fn read_from(
        &mut self,
        stream_id: StreamId,
        from_offset: Offset,
        max_bytes: u64,
    ) -> Result<Vec<Bytes>, StorageError> {
        let Some(stream) = self.streams.get(&stream_id) else {
            return Ok(Vec::new());
        };

        let mut results = Vec::new();
        let mut bytes_read: u64 = 0;
        let mut expected_prev_hash: Option<ChainHash> = None;

        let buf = Bytes::copy_from_slice(&stream.bytes);
        let mut pos = 0usize;

        while pos < buf.len() && bytes_read < max_bytes {
            let (record, consumed) = Record::from_bytes(&buf.slice(pos..))?;

            if record.prev_hash() != expected_prev_hash {
                return Err(StorageError::ChainVerificationFailed {
                    offset: record.offset(),
                    expected: expected_prev_hash,
                    actual: record.prev_hash(),
                });
            }
            expected_prev_hash = Some(record.compute_hash());
            pos += consumed;

            // Records before the requested offset still advance the chain check.
            if record.offset() < from_offset || record.is_checkpoint() {
                continue;
            }

            bytes_read += record.payload().len() as u64;
            results.push(record.payload().clone());
        }

        Ok(results)
    }

    /// Returns the chain hash of the last record in `stream_id`, or `None` when
    /// the stream is unknown or has no records.
    ///
    /// Only the final record is decoded: `record_starts` holds the start of every
    /// record, so the whole log does not need to be copied and re-parsed.
    ///
    /// # Errors
    ///
    /// Propagates decode errors from `Record::from_bytes`.
    fn latest_chain_hash(
        &mut self,
        stream_id: StreamId,
    ) -> Result<Option<ChainHash>, StorageError> {
        let Some(stream) = self.streams.get(&stream_id) else {
            return Ok(None);
        };
        let Some(&last_start) = stream.record_starts.last() else {
            return Ok(None);
        };

        // Start positions were recorded from `stream.bytes.len()`, so converting
        // back to `usize` cannot fail.
        let last_start = usize::try_from(last_start)
            .expect("record start was recorded from an in-memory buffer length");
        let tail = Bytes::copy_from_slice(&stream.bytes[last_start..]);
        let (record, consumed) = Record::from_bytes(&tail)?;
        debug_assert_eq!(
            consumed,
            tail.len(),
            "last record must extend exactly to the end of the stream buffer"
        );

        Ok(Some(record.compute_hash()))
    }

    /// Returns the number of manifest segments for `stream_id`, or 0 for an
    /// unknown stream.
    fn segment_count(&self, stream_id: StreamId) -> usize {
        self.streams.get(&stream_id).map_or(0, |s| s.segments.len())
    }

    /// Returns the numbers of every segment except the active one, in manifest
    /// order; empty for an unknown stream.
    fn completed_segments(&self, stream_id: StreamId) -> Vec<u32> {
        self.streams.get(&stream_id).map_or_else(Vec::new, |s| {
            s.segments
                .iter()
                .filter(|seg| seg.segment_num != s.active_segment)
                .map(|seg| seg.segment_num)
                .collect()
        })
    }

    /// No-op: this backend keeps no index files.
    fn flush_indexes(&mut self) -> Result<(), StorageError> {
        Ok(())
    }

    /// Drops every stream, returning the store to its freshly-constructed state
    /// (the segment size setting is kept).
    #[cfg(feature = "fuzz-reset")]
    fn reset(&mut self) -> Result<(), StorageError> {
        self.streams.clear();
        Ok(())
    }
}