use bytes::{Bytes, BytesMut};
use kimberlite_crypto::ChainHash;
use kimberlite_types::{CompressionKind, Offset, RecordKind};
use crate::Record;
use crate::StorageError;
use crate::codec::CodecRegistry;
/// Result of serializing a batch of events, as produced by
/// `AppendPipeline::prepare_batch`.
#[derive(Debug)]
pub struct PreparedBatch {
/// Serialized bytes of every record in the batch, in event order.
pub data: BytesMut,
/// One `(offset, byte position)` pair per record. `prepare_batch` sets the
/// byte position to the caller's base position plus the number of batch
/// bytes serialized before that record.
pub index_entries: Vec<(Offset, u64)>,
/// Chain hash computed for the last record of the batch.
pub final_hash: ChainHash,
/// Length of `data` in bytes when filled by `prepare_batch`.
pub bytes_written: u64,
/// Number of events that were turned into records.
pub records_written: usize,
}
/// Serializes batches of events into a byte buffer that it owns between
/// calls.
#[derive(Debug)]
pub struct AppendPipeline {
// Buffer records are serialized into; handed out to the caller and
// replaced with a fresh one by `take_prepared`.
prepare_buf: BytesMut,
// Capacity requested for the initial buffer and for every replacement.
default_capacity: usize,
}
impl AppendPipeline {
    /// Creates a pipeline whose internal buffer is allocated with
    /// `default_capacity` bytes of capacity.
    pub fn new(default_capacity: usize) -> Self {
        let prepare_buf = BytesMut::with_capacity(default_capacity);
        Self {
            prepare_buf,
            default_capacity,
        }
    }

    /// Serializes `events` into consecutive data records and returns the
    /// resulting bytes together with index entries and the final chain hash.
    ///
    /// Records are numbered from `start_offset`, increasing by one per event.
    /// Each index entry pairs a record's offset with `base_byte_pos` plus the
    /// number of bytes serialized before it in this batch. The chain hash of
    /// each record is computed from a record built with the original event
    /// bytes and the previous hash (`prev_hash` for the first event); the
    /// stored record may carry a compressed payload instead.
    ///
    /// # Errors
    ///
    /// Propagates any error returned by `codecs.compress`.
    ///
    /// # Panics
    ///
    /// Panics if `events` is empty.
    pub fn prepare_batch(
        &mut self,
        events: &[Bytes],
        start_offset: Offset,
        prev_hash: Option<ChainHash>,
        base_byte_pos: u64,
        compression: CompressionKind,
        codecs: &CodecRegistry,
    ) -> Result<PreparedBatch, StorageError> {
        assert!(!events.is_empty(), "cannot prepare empty batch");

        // Drop anything left over from an earlier failed batch or from a
        // caller that wrote through `prepare_buffer`.
        self.prepare_buf.clear();

        let mut index = Vec::with_capacity(events.len());
        let mut next_offset = start_offset;
        let mut chain_tip = prev_hash;

        for event in events {
            // The record starts where the previously serialized bytes end.
            let record_start = base_byte_pos + self.prepare_buf.len() as u64;
            index.push((next_offset, record_start));

            let (payload, payload_compression) =
                Self::select_payload(event, compression, codecs)?;

            // Hash over the original event so the chain does not depend on
            // which payload form ends up being stored.
            let plain = Record::new(next_offset, chain_tip, event.clone());
            chain_tip = Some(plain.compute_hash());

            Record::with_compression(
                next_offset,
                plain.prev_hash(),
                RecordKind::Data,
                payload_compression,
                payload,
            )
            .to_bytes_into(&mut self.prepare_buf);

            next_offset += Offset::from(1u64);
        }

        let data = self.take_prepared();
        Ok(PreparedBatch {
            bytes_written: data.len() as u64,
            data,
            index_entries: index,
            final_hash: chain_tip.expect("batch was non-empty"),
            records_written: events.len(),
        })
    }

    /// Chooses the payload to store for `event` and the compression kind to
    /// record alongside it.
    ///
    /// The compressed form is used only when compression was requested and
    /// the result is strictly smaller than the original; otherwise the
    /// original bytes are stored and marked `CompressionKind::None`.
    fn select_payload(
        event: &Bytes,
        compression: CompressionKind,
        codecs: &CodecRegistry,
    ) -> Result<(Bytes, CompressionKind), StorageError> {
        if compression != CompressionKind::None {
            let packed = codecs.compress(compression, event)?;
            if packed.len() < event.len() {
                return Ok((Bytes::from(packed), compression));
            }
        }
        Ok((event.clone(), CompressionKind::None))
    }

    /// Clears the internal buffer and returns a mutable reference to it.
    pub fn prepare_buffer(&mut self) -> &mut BytesMut {
        let scratch = &mut self.prepare_buf;
        scratch.clear();
        scratch
    }

    /// Hands the current buffer to the caller and installs a new one with
    /// `default_capacity` bytes of capacity in its place.
    fn take_prepared(&mut self) -> BytesMut {
        let fresh = BytesMut::with_capacity(self.default_capacity);
        std::mem::replace(&mut self.prepare_buf, fresh)
    }

    /// Returns the capacity used when allocating the internal buffer.
    pub fn default_capacity(&self) -> usize {
        self.default_capacity
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Buffer capacity used by every pipeline in these tests.
    const TEST_CAPACITY: usize = 4096;

    /// Builds a pipeline and a codec registry for a test.
    fn fixture() -> (AppendPipeline, CodecRegistry) {
        (AppendPipeline::new(TEST_CAPACITY), CodecRegistry::new())
    }

    /// Writing through `prepare_buffer`, taking the buffer, and asking for
    /// the buffer again yields an empty one.
    #[test]
    fn pipeline_buffer_lifecycle() {
        let payload: &[u8] = b"hello world";
        let mut pipeline = AppendPipeline::new(TEST_CAPACITY);

        {
            let scratch = pipeline.prepare_buffer();
            scratch.extend_from_slice(payload);
            assert_eq!(scratch.len(), 11);
        }

        let taken = pipeline.take_prepared();
        assert_eq!(&taken[..], payload);

        assert!(pipeline.prepare_buffer().is_empty());
    }

    /// A hand-built `PreparedBatch` exposes the values it was given.
    #[test]
    fn prepared_batch_fields() {
        let index_entries = vec![(Offset::new(0), 0), (Offset::new(1), 50)];
        let batch = PreparedBatch {
            data: BytesMut::from(&b"test"[..]),
            index_entries,
            final_hash: ChainHash::from_bytes(&[0xab; 32]),
            bytes_written: 100,
            records_written: 2,
        };

        assert_eq!(batch.index_entries.len(), 2);
        assert_eq!(batch.bytes_written, 100);
        assert_eq!(batch.records_written, 2);
    }

    /// Three uncompressed events produce three records with consecutive
    /// offsets, the first starting at byte position zero.
    #[test]
    fn prepare_batch_uncompressed() {
        let (mut pipeline, codecs) = fixture();
        let events = [
            Bytes::from("event one"),
            Bytes::from("event two"),
            Bytes::from("event three"),
        ];

        let prepared = pipeline.prepare_batch(
            &events,
            Offset::new(0),
            None,
            0,
            CompressionKind::None,
            &codecs,
        );
        let batch = prepared.unwrap();

        assert_eq!(batch.records_written, 3);
        assert_eq!(batch.index_entries.len(), 3);
        assert!(batch.bytes_written > 0);
        assert_eq!(batch.index_entries[0].1, 0);

        let offsets: Vec<Offset> = batch
            .index_entries
            .iter()
            .map(|(offset, _)| *offset)
            .collect();
        assert_eq!(
            offsets,
            vec![Offset::new(0), Offset::new(1), Offset::new(2)]
        );
    }

    /// A highly repetitive payload prepared with LZ4 stays under the
    /// size bound checked here.
    #[test]
    fn prepare_batch_with_lz4() {
        let (mut pipeline, codecs) = fixture();
        let repetitive = Bytes::from(vec![42u8; 1000]);
        let events = [repetitive];

        let prepared = pipeline.prepare_batch(
            &events,
            Offset::new(0),
            None,
            0,
            CompressionKind::Lz4,
            &codecs,
        );
        let batch = prepared.unwrap();

        assert_eq!(batch.records_written, 1);
        assert!(batch.bytes_written < 1050);
    }

    /// A second batch can continue from the first one's final hash and
    /// byte count.
    #[test]
    fn prepare_batch_chain_continuity() {
        let (mut pipeline, codecs) = fixture();

        let first_events = [Bytes::from("first")];
        let first = pipeline
            .prepare_batch(
                &first_events,
                Offset::new(0),
                None,
                0,
                CompressionKind::None,
                &codecs,
            )
            .unwrap();

        let second_events = [Bytes::from("second")];
        let second = pipeline
            .prepare_batch(
                &second_events,
                Offset::new(1),
                Some(first.final_hash),
                first.bytes_written,
                CompressionKind::None,
                &codecs,
            )
            .unwrap();

        assert_eq!(second.records_written, 1);
        assert_eq!(second.index_entries[0].0, Offset::new(1));
        assert_eq!(second.index_entries[0].1, first.bytes_written);
        assert_ne!(first.final_hash, second.final_hash);
    }

    /// Preparing an empty batch is a programming error and panics.
    #[test]
    #[should_panic(expected = "cannot prepare empty batch")]
    fn prepare_batch_empty_panics() {
        let (mut pipeline, codecs) = fixture();
        let _ = pipeline.prepare_batch(
            &[],
            Offset::new(0),
            None,
            0,
            CompressionKind::None,
            &codecs,
        );
    }
}