use bytes::{BufMut, Bytes, BytesMut};
use std::sync::atomic::{AtomicU64, Ordering};
pub struct BatchEncoder {
buffer: BytesMut,
message_count: usize,
stats: EncoderStats,
}
impl BatchEncoder {
pub fn new() -> Self {
Self::with_capacity(64 * 1024) }
pub fn with_capacity(capacity: usize) -> Self {
Self {
buffer: BytesMut::with_capacity(capacity),
message_count: 0,
stats: EncoderStats::new(),
}
}
pub fn add_message(&mut self, key: Option<&[u8]>, value: &[u8], timestamp: i64) {
let key_len = key.map(|k| k.len()).unwrap_or(0);
let total_len = 8 + 4 + key_len + 4 + value.len() + 4;
if self.buffer.remaining_mut() < 4 + total_len {
self.buffer.reserve(4 + total_len);
}
self.buffer.put_u32(total_len as u32);
self.buffer.put_i64(timestamp);
self.buffer.put_u32(key_len as u32);
if let Some(k) = key {
self.buffer.extend_from_slice(k);
}
self.buffer.put_u32(value.len() as u32);
self.buffer.extend_from_slice(value);
let crc = crc32_fast(&self.buffer[self.buffer.len() - total_len + 4..]);
self.buffer.put_u32(crc);
self.message_count += 1;
self.stats.messages_encoded.fetch_add(1, Ordering::Relaxed);
self.stats
.bytes_encoded
.fetch_add((4 + total_len) as u64, Ordering::Relaxed);
}
pub fn add_messages(&mut self, messages: &[BatchMessage]) {
let total_size: usize = messages
.iter()
.map(|m| {
let key_len = m.key.as_ref().map(|k| k.len()).unwrap_or(0);
4 + 8 + 4 + key_len + 4 + m.value.len() + 4
})
.sum();
self.buffer.reserve(total_size);
for msg in messages {
self.add_message(msg.key.as_deref(), &msg.value, msg.timestamp);
}
}
pub fn finish(self) -> Bytes {
self.buffer.freeze()
}
pub fn len(&self) -> usize {
self.buffer.len()
}
pub fn is_empty(&self) -> bool {
self.buffer.is_empty()
}
pub fn message_count(&self) -> usize {
self.message_count
}
pub fn reset(&mut self) {
self.buffer.clear();
self.message_count = 0;
}
pub fn stats(&self) -> EncoderStatsSnapshot {
EncoderStatsSnapshot {
messages_encoded: self.stats.messages_encoded.load(Ordering::Relaxed),
bytes_encoded: self.stats.bytes_encoded.load(Ordering::Relaxed),
}
}
}
impl Default for BatchEncoder {
fn default() -> Self {
Self::new()
}
}
#[derive(Debug, Clone)]
pub struct BatchMessage {
pub key: Option<Vec<u8>>,
pub value: Vec<u8>,
pub timestamp: i64,
}
impl BatchMessage {
pub fn new(value: Vec<u8>) -> Self {
Self {
key: None,
value,
timestamp: std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap_or_default()
.as_millis() as i64,
}
}
pub fn with_key(key: Vec<u8>, value: Vec<u8>) -> Self {
Self {
key: Some(key),
value,
timestamp: std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap_or_default()
.as_millis() as i64,
}
}
}
struct EncoderStats {
messages_encoded: AtomicU64,
bytes_encoded: AtomicU64,
}
impl EncoderStats {
fn new() -> Self {
Self {
messages_encoded: AtomicU64::new(0),
bytes_encoded: AtomicU64::new(0),
}
}
}
#[derive(Debug, Clone)]
pub struct EncoderStatsSnapshot {
pub messages_encoded: u64,
pub bytes_encoded: u64,
}
pub struct BatchDecoder {
stats: DecoderStats,
}
impl BatchDecoder {
pub fn new() -> Self {
Self {
stats: DecoderStats::new(),
}
}
pub fn decode_all(&self, data: &[u8]) -> Vec<DecodedMessage> {
let mut messages = Vec::new();
let mut offset = 0;
while offset + 4 <= data.len() {
let total_len = u32::from_be_bytes([
data[offset],
data[offset + 1],
data[offset + 2],
data[offset + 3],
]) as usize;
if offset + 4 + total_len > data.len() {
break;
}
if let Some(msg) = self.decode_message(&data[offset + 4..offset + 4 + total_len]) {
messages.push(msg);
self.stats.messages_decoded.fetch_add(1, Ordering::Relaxed);
}
offset += 4 + total_len;
}
self.stats
.bytes_decoded
.fetch_add(offset as u64, Ordering::Relaxed);
messages
}
fn decode_message(&self, data: &[u8]) -> Option<DecodedMessage> {
if data.len() < 20 {
return None;
}
let stored_crc = u32::from_be_bytes([
data[data.len() - 4],
data[data.len() - 3],
data[data.len() - 2],
data[data.len() - 1],
]);
let computed_crc = crc32_fast(&data[..data.len() - 4]);
if stored_crc != computed_crc {
return None;
}
let timestamp = i64::from_be_bytes([
data[0], data[1], data[2], data[3], data[4], data[5], data[6], data[7],
]);
let key_len = u32::from_be_bytes([data[8], data[9], data[10], data[11]]) as usize;
let key = if key_len > 0 {
Some(Bytes::copy_from_slice(&data[12..12 + key_len]))
} else {
None
};
let value_offset = 12 + key_len;
let value_len = u32::from_be_bytes([
data[value_offset],
data[value_offset + 1],
data[value_offset + 2],
data[value_offset + 3],
]) as usize;
let value = Bytes::copy_from_slice(&data[value_offset + 4..value_offset + 4 + value_len]);
Some(DecodedMessage {
timestamp,
key,
value,
})
}
pub fn stats(&self) -> DecoderStatsSnapshot {
DecoderStatsSnapshot {
messages_decoded: self.stats.messages_decoded.load(Ordering::Relaxed),
bytes_decoded: self.stats.bytes_decoded.load(Ordering::Relaxed),
}
}
}
impl Default for BatchDecoder {
fn default() -> Self {
Self::new()
}
}
struct DecoderStats {
messages_decoded: AtomicU64,
bytes_decoded: AtomicU64,
}
impl DecoderStats {
fn new() -> Self {
Self {
messages_decoded: AtomicU64::new(0),
bytes_decoded: AtomicU64::new(0),
}
}
}
#[derive(Debug, Clone)]
pub struct DecoderStatsSnapshot {
pub messages_decoded: u64,
pub bytes_decoded: u64,
}
#[derive(Debug, Clone)]
pub struct DecodedMessage {
pub timestamp: i64,
pub key: Option<Bytes>,
pub value: Bytes,
}
#[inline]
pub fn crc32_fast(data: &[u8]) -> u32 {
let mut hasher = crc32fast::Hasher::new();
hasher.update(data);
hasher.finalize()
}
pub fn crc32_batch(buffers: &[&[u8]]) -> Vec<u32> {
buffers.iter().map(|buf| crc32_fast(buf)).collect()
}
#[inline]
pub fn memcmp_fast(a: &[u8], b: &[u8]) -> std::cmp::Ordering {
a.cmp(b)
}
#[inline]
pub fn memchr_fast(needle: u8, haystack: &[u8]) -> Option<usize> {
memchr::memchr(needle, haystack)
}
#[inline]
pub fn memmem_fast(needle: &[u8], haystack: &[u8]) -> Option<usize> {
memchr::memmem::find(haystack, needle)
}
pub struct BatchProcessor {
workers: usize,
stats: ProcessorStats,
}
impl BatchProcessor {
pub fn new(workers: usize) -> Self {
Self {
workers: workers.max(1),
stats: ProcessorStats::new(),
}
}
pub fn process<T, R, F>(&self, items: Vec<T>, f: F) -> Vec<R>
where
T: Send + Sync,
R: Send,
F: Fn(&T) -> R + Send + Sync,
{
if items.len() <= self.workers {
return items.iter().map(&f).collect();
}
let chunk_size = items.len().div_ceil(self.workers);
std::thread::scope(|s| {
let mut handles = vec![];
for chunk in items.chunks(chunk_size) {
let f = &f;
handles.push(s.spawn(move || chunk.iter().map(f).collect::<Vec<_>>()));
}
let mut results = Vec::with_capacity(items.len());
for handle in handles {
results.extend(handle.join().unwrap());
}
self.stats.batches_processed.fetch_add(1, Ordering::Relaxed);
self.stats
.items_processed
.fetch_add(items.len() as u64, Ordering::Relaxed);
results
})
}
pub fn filter_map<T, R, F>(&self, items: Vec<T>, f: F) -> Vec<R>
where
T: Send + Sync,
R: Send,
F: Fn(&T) -> Option<R> + Send + Sync,
{
if items.len() <= self.workers {
return items.iter().filter_map(&f).collect();
}
let chunk_size = items.len().div_ceil(self.workers);
std::thread::scope(|s| {
let mut handles = vec![];
for chunk in items.chunks(chunk_size) {
let f = &f;
handles.push(s.spawn(move || chunk.iter().filter_map(f).collect::<Vec<_>>()));
}
let mut results = Vec::with_capacity(items.len());
for handle in handles {
results.extend(handle.join().unwrap());
}
results
})
}
pub fn stats(&self) -> ProcessorStatsSnapshot {
ProcessorStatsSnapshot {
batches_processed: self.stats.batches_processed.load(Ordering::Relaxed),
items_processed: self.stats.items_processed.load(Ordering::Relaxed),
}
}
}
struct ProcessorStats {
batches_processed: AtomicU64,
items_processed: AtomicU64,
}
impl ProcessorStats {
fn new() -> Self {
Self {
batches_processed: AtomicU64::new(0),
items_processed: AtomicU64::new(0),
}
}
}
#[derive(Debug, Clone)]
pub struct ProcessorStatsSnapshot {
pub batches_processed: u64,
pub items_processed: u64,
}
#[derive(Debug)]
pub struct RecordBatch {
pub len: usize,
pub timestamps: Vec<i64>,
pub key_offsets: Vec<u32>,
pub key_data: Vec<u8>,
pub value_offsets: Vec<u32>,
pub value_data: Vec<u8>,
}
impl RecordBatch {
pub fn new() -> Self {
Self {
len: 0,
timestamps: Vec::new(),
key_offsets: vec![0],
key_data: Vec::new(),
value_offsets: vec![0],
value_data: Vec::new(),
}
}
pub fn with_capacity(records: usize, avg_key_size: usize, avg_value_size: usize) -> Self {
Self {
len: 0,
timestamps: Vec::with_capacity(records),
key_offsets: Vec::with_capacity(records + 1),
key_data: Vec::with_capacity(records * avg_key_size),
value_offsets: Vec::with_capacity(records + 1),
value_data: Vec::with_capacity(records * avg_value_size),
}
}
pub fn add(&mut self, timestamp: i64, key: Option<&[u8]>, value: &[u8]) {
self.timestamps.push(timestamp);
if let Some(k) = key {
self.key_data.extend_from_slice(k);
}
self.key_offsets.push(self.key_data.len() as u32);
self.value_data.extend_from_slice(value);
self.value_offsets.push(self.value_data.len() as u32);
self.len += 1;
}
pub fn timestamp(&self, idx: usize) -> i64 {
self.timestamps[idx]
}
pub fn key(&self, idx: usize) -> Option<&[u8]> {
let start = self.key_offsets[idx] as usize;
let end = self.key_offsets[idx + 1] as usize;
if start == end {
None
} else {
Some(&self.key_data[start..end])
}
}
pub fn value(&self, idx: usize) -> &[u8] {
let start = self.value_offsets[idx] as usize;
let end = self.value_offsets[idx + 1] as usize;
&self.value_data[start..end]
}
pub fn is_empty(&self) -> bool {
self.len == 0
}
pub fn memory_size(&self) -> usize {
self.timestamps.len() * 8
+ self.key_offsets.len() * 4
+ self.key_data.len()
+ self.value_offsets.len() * 4
+ self.value_data.len()
}
pub fn filter<F>(&self, predicate: F) -> RecordBatch
where
F: Fn(i64, Option<&[u8]>, &[u8]) -> bool,
{
let mut batch = RecordBatch::new();
for i in 0..self.len {
let ts = self.timestamp(i);
let key = self.key(i);
let value = self.value(i);
if predicate(ts, key, value) {
batch.add(ts, key, value);
}
}
batch
}
pub fn map_values<F>(&self, transform: F) -> RecordBatch
where
F: Fn(&[u8]) -> Vec<u8>,
{
let mut batch = RecordBatch::new();
for i in 0..self.len {
let ts = self.timestamp(i);
let key = self.key(i);
let value = transform(self.value(i));
batch.add(ts, key, &value);
}
batch
}
}
impl Default for RecordBatch {
fn default() -> Self {
Self::new()
}
}
pub struct RecordBatchIter<'a> {
batch: &'a RecordBatch,
idx: usize,
}
impl<'a> Iterator for RecordBatchIter<'a> {
type Item = (i64, Option<&'a [u8]>, &'a [u8]);
fn next(&mut self) -> Option<Self::Item> {
if self.idx >= self.batch.len {
return None;
}
let ts = self.batch.timestamp(self.idx);
let key = self.batch.key(self.idx);
let value = self.batch.value(self.idx);
self.idx += 1;
Some((ts, key, value))
}
fn size_hint(&self) -> (usize, Option<usize>) {
let remaining = self.batch.len - self.idx;
(remaining, Some(remaining))
}
}
impl<'a> ExactSizeIterator for RecordBatchIter<'a> {}
impl<'a> IntoIterator for &'a RecordBatch {
type Item = (i64, Option<&'a [u8]>, &'a [u8]);
type IntoIter = RecordBatchIter<'a>;
fn into_iter(self) -> Self::IntoIter {
RecordBatchIter {
batch: self,
idx: 0,
}
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_batch_encoder_decoder() {
let mut encoder = BatchEncoder::new();
encoder.add_message(Some(b"key1"), b"value1", 1000);
encoder.add_message(Some(b"key2"), b"value2", 2000);
encoder.add_message(None, b"value3", 3000);
assert_eq!(encoder.message_count(), 3);
let encoded = encoder.finish();
let decoder = BatchDecoder::new();
let messages = decoder.decode_all(&encoded);
assert_eq!(messages.len(), 3);
assert_eq!(messages[0].timestamp, 1000);
assert_eq!(messages[0].key.as_ref().unwrap().as_ref(), b"key1");
assert_eq!(messages[0].value.as_ref(), b"value1");
assert_eq!(messages[2].timestamp, 3000);
assert!(messages[2].key.is_none());
assert_eq!(messages[2].value.as_ref(), b"value3");
}
#[test]
fn test_batch_messages() {
let mut encoder = BatchEncoder::new();
let messages = vec![
BatchMessage::with_key(b"k1".to_vec(), b"v1".to_vec()),
BatchMessage::with_key(b"k2".to_vec(), b"v2".to_vec()),
BatchMessage::new(b"v3".to_vec()),
];
encoder.add_messages(&messages);
assert_eq!(encoder.message_count(), 3);
let encoded = encoder.finish();
let decoder = BatchDecoder::new();
let decoded = decoder.decode_all(&encoded);
assert_eq!(decoded.len(), 3);
}
#[test]
fn test_crc32_fast() {
let data = b"Hello, World!";
let crc = crc32_fast(data);
assert_eq!(crc, crc32_fast(data));
let crc2 = crc32_fast(b"Different data");
assert_ne!(crc, crc2);
}
#[test]
fn test_crc32_batch() {
let buffers: Vec<&[u8]> = vec![b"data1", b"data2", b"data3"];
let crcs = crc32_batch(&buffers);
assert_eq!(crcs.len(), 3);
assert_eq!(crcs[0], crc32_fast(b"data1"));
assert_eq!(crcs[1], crc32_fast(b"data2"));
assert_eq!(crcs[2], crc32_fast(b"data3"));
}
#[test]
fn test_batch_processor() {
let processor = BatchProcessor::new(4);
let items: Vec<i32> = (0..100).collect();
let results = processor.process(items, |x| x * 2);
assert_eq!(results.len(), 100);
for (i, r) in results.iter().enumerate() {
assert_eq!(*r, (i as i32) * 2);
}
let stats = processor.stats();
assert_eq!(stats.items_processed, 100);
}
#[test]
fn test_batch_processor_filter_map() {
let processor = BatchProcessor::new(4);
let items: Vec<i32> = (0..100).collect();
let results = processor.filter_map(items, |x| if x % 2 == 0 { Some(x * 2) } else { None });
assert_eq!(results.len(), 50);
for r in &results {
assert_eq!(r % 4, 0);
}
}
#[test]
fn test_record_batch() {
let mut batch = RecordBatch::new();
batch.add(1000, Some(b"key1"), b"value1");
batch.add(2000, Some(b"key2"), b"value2222");
batch.add(3000, None, b"v3");
assert_eq!(batch.len, 3);
assert_eq!(batch.timestamp(0), 1000);
assert_eq!(batch.key(0), Some(&b"key1"[..]));
assert_eq!(batch.value(0), b"value1");
assert_eq!(batch.timestamp(1), 2000);
assert_eq!(batch.key(1), Some(&b"key2"[..]));
assert_eq!(batch.value(1), b"value2222");
assert_eq!(batch.timestamp(2), 3000);
assert_eq!(batch.key(2), None);
assert_eq!(batch.value(2), b"v3");
}
#[test]
fn test_record_batch_filter() {
let mut batch = RecordBatch::new();
for i in 0..10 {
batch.add(
i * 100,
Some(format!("key{}", i).as_bytes()),
format!("value{}", i).as_bytes(),
);
}
let filtered = batch.filter(|ts, _, _| ts >= 500);
assert_eq!(filtered.len, 5);
assert_eq!(filtered.timestamp(0), 500);
}
#[test]
fn test_record_batch_iter() {
let mut batch = RecordBatch::new();
batch.add(1000, Some(b"k1"), b"v1");
batch.add(2000, Some(b"k2"), b"v2");
let collected: Vec<_> = batch.into_iter().collect();
assert_eq!(collected.len(), 2);
assert_eq!(collected[0].0, 1000);
assert_eq!(collected[1].0, 2000);
}
#[test]
fn test_record_batch_map_values() {
let mut batch = RecordBatch::new();
batch.add(1000, None, b"hello");
batch.add(2000, None, b"world");
let mapped = batch.map_values(|v| v.iter().map(|b| b.to_ascii_uppercase()).collect());
assert_eq!(mapped.value(0), b"HELLO");
assert_eq!(mapped.value(1), b"WORLD");
}
#[test]
fn test_memchr_fast() {
let haystack = b"hello, world!";
assert_eq!(memchr_fast(b'w', haystack), Some(7));
assert_eq!(memchr_fast(b'x', haystack), None);
}
#[test]
fn test_memmem_fast() {
let haystack = b"hello, world! world!";
assert_eq!(memmem_fast(b"world", haystack), Some(7));
assert_eq!(memmem_fast(b"xyz", haystack), None);
}
}