1use bytes::{BufMut, Bytes, BytesMut};
16use std::sync::atomic::{AtomicU64, Ordering};
17
/// Length-prefixed, CRC32-protected message batch writer.
///
/// Frame layout per message (big-endian):
/// `[u32 total_len][i64 timestamp][u32 key_len][key][u32 value_len][value][u32 crc]`.
pub struct BatchEncoder {
    // Accumulates encoded frames until `finish()` freezes them into `Bytes`.
    buffer: BytesMut,
    // Messages added since creation or the last `reset()`.
    message_count: usize,
    // Lifetime counters; NOT cleared by `reset()`.
    stats: EncoderStats,
}
27
28impl BatchEncoder {
29 pub fn new() -> Self {
31 Self::with_capacity(64 * 1024) }
33
34 pub fn with_capacity(capacity: usize) -> Self {
36 Self {
37 buffer: BytesMut::with_capacity(capacity),
38 message_count: 0,
39 stats: EncoderStats::new(),
40 }
41 }
42
43 pub fn add_message(&mut self, key: Option<&[u8]>, value: &[u8], timestamp: i64) {
45 let key_len = key.map(|k| k.len()).unwrap_or(0);
49 let total_len = 8 + 4 + key_len + 4 + value.len() + 4;
50
51 if self.buffer.remaining_mut() < 4 + total_len {
53 self.buffer.reserve(4 + total_len);
54 }
55
56 self.buffer.put_u32(total_len as u32);
58
59 self.buffer.put_i64(timestamp);
61
62 self.buffer.put_u32(key_len as u32);
64 if let Some(k) = key {
65 self.buffer.extend_from_slice(k);
66 }
67
68 self.buffer.put_u32(value.len() as u32);
70 self.buffer.extend_from_slice(value);
71
72 let crc = crc32_fast(&self.buffer[self.buffer.len() - total_len + 4..]);
74 self.buffer.put_u32(crc);
75
76 self.message_count += 1;
77 self.stats.messages_encoded.fetch_add(1, Ordering::Relaxed);
78 self.stats
79 .bytes_encoded
80 .fetch_add((4 + total_len) as u64, Ordering::Relaxed);
81 }
82
83 pub fn add_messages(&mut self, messages: &[BatchMessage]) {
85 let total_size: usize = messages
87 .iter()
88 .map(|m| {
89 let key_len = m.key.as_ref().map(|k| k.len()).unwrap_or(0);
90 4 + 8 + 4 + key_len + 4 + m.value.len() + 4
91 })
92 .sum();
93
94 self.buffer.reserve(total_size);
95
96 for msg in messages {
98 self.add_message(msg.key.as_deref(), &msg.value, msg.timestamp);
99 }
100 }
101
102 pub fn finish(self) -> Bytes {
104 self.buffer.freeze()
105 }
106
107 pub fn len(&self) -> usize {
109 self.buffer.len()
110 }
111
112 pub fn is_empty(&self) -> bool {
114 self.buffer.is_empty()
115 }
116
117 pub fn message_count(&self) -> usize {
119 self.message_count
120 }
121
122 pub fn reset(&mut self) {
124 self.buffer.clear();
125 self.message_count = 0;
126 }
127
128 pub fn stats(&self) -> EncoderStatsSnapshot {
130 EncoderStatsSnapshot {
131 messages_encoded: self.stats.messages_encoded.load(Ordering::Relaxed),
132 bytes_encoded: self.stats.bytes_encoded.load(Ordering::Relaxed),
133 }
134 }
135}
136
137impl Default for BatchEncoder {
138 fn default() -> Self {
139 Self::new()
140 }
141}
142
/// An owned message (optional key, value, millisecond timestamp) queued for
/// bulk encoding via `BatchEncoder::add_messages`.
#[derive(Debug, Clone)]
pub struct BatchMessage {
    pub key: Option<Vec<u8>>,
    pub value: Vec<u8>,
    // Milliseconds since the Unix epoch (stamped by the constructors).
    pub timestamp: i64,
}
150
151impl BatchMessage {
152 pub fn new(value: Vec<u8>) -> Self {
153 Self {
154 key: None,
155 value,
156 timestamp: std::time::SystemTime::now()
157 .duration_since(std::time::UNIX_EPOCH)
158 .unwrap_or_default()
159 .as_millis() as i64,
160 }
161 }
162
163 pub fn with_key(key: Vec<u8>, value: Vec<u8>) -> Self {
164 Self {
165 key: Some(key),
166 value,
167 timestamp: std::time::SystemTime::now()
168 .duration_since(std::time::UNIX_EPOCH)
169 .unwrap_or_default()
170 .as_millis() as i64,
171 }
172 }
173}
174
/// Lifetime encoder counters; atomics so snapshots only need `&self`.
struct EncoderStats {
    messages_encoded: AtomicU64,
    bytes_encoded: AtomicU64,
}
179
180impl EncoderStats {
181 fn new() -> Self {
182 Self {
183 messages_encoded: AtomicU64::new(0),
184 bytes_encoded: AtomicU64::new(0),
185 }
186 }
187}
188
/// Point-in-time copy of the encoder's lifetime counters.
#[derive(Debug, Clone)]
pub struct EncoderStatsSnapshot {
    pub messages_encoded: u64,
    pub bytes_encoded: u64,
}
194
/// Decoder for batches produced by `BatchEncoder`; stateless apart from
/// lifetime counters.
pub struct BatchDecoder {
    // Lifetime decode counters, updated through `&self` via atomics.
    stats: DecoderStats,
}
200
201impl BatchDecoder {
202 pub fn new() -> Self {
203 Self {
204 stats: DecoderStats::new(),
205 }
206 }
207
208 pub fn decode_all(&self, data: &[u8]) -> Vec<DecodedMessage> {
210 let mut messages = Vec::new();
211 let mut offset = 0;
212
213 while offset + 4 <= data.len() {
214 let total_len = u32::from_be_bytes([
216 data[offset],
217 data[offset + 1],
218 data[offset + 2],
219 data[offset + 3],
220 ]) as usize;
221
222 if offset + 4 + total_len > data.len() {
223 break;
224 }
225
226 if let Some(msg) = self.decode_message(&data[offset + 4..offset + 4 + total_len]) {
227 messages.push(msg);
228 self.stats.messages_decoded.fetch_add(1, Ordering::Relaxed);
229 }
230
231 offset += 4 + total_len;
232 }
233
234 self.stats
235 .bytes_decoded
236 .fetch_add(offset as u64, Ordering::Relaxed);
237 messages
238 }
239
240 fn decode_message(&self, data: &[u8]) -> Option<DecodedMessage> {
242 if data.len() < 20 {
243 return None;
245 }
246
247 let stored_crc = u32::from_be_bytes([
249 data[data.len() - 4],
250 data[data.len() - 3],
251 data[data.len() - 2],
252 data[data.len() - 1],
253 ]);
254
255 let computed_crc = crc32_fast(&data[..data.len() - 4]);
256 if stored_crc != computed_crc {
257 return None;
258 }
259
260 let timestamp = i64::from_be_bytes([
262 data[0], data[1], data[2], data[3], data[4], data[5], data[6], data[7],
263 ]);
264
265 let key_len = u32::from_be_bytes([data[8], data[9], data[10], data[11]]) as usize;
266
267 let key = if key_len > 0 {
268 Some(Bytes::copy_from_slice(&data[12..12 + key_len]))
269 } else {
270 None
271 };
272
273 let value_offset = 12 + key_len;
274 let value_len = u32::from_be_bytes([
275 data[value_offset],
276 data[value_offset + 1],
277 data[value_offset + 2],
278 data[value_offset + 3],
279 ]) as usize;
280
281 let value = Bytes::copy_from_slice(&data[value_offset + 4..value_offset + 4 + value_len]);
282
283 Some(DecodedMessage {
284 timestamp,
285 key,
286 value,
287 })
288 }
289
290 pub fn stats(&self) -> DecoderStatsSnapshot {
292 DecoderStatsSnapshot {
293 messages_decoded: self.stats.messages_decoded.load(Ordering::Relaxed),
294 bytes_decoded: self.stats.bytes_decoded.load(Ordering::Relaxed),
295 }
296 }
297}
298
299impl Default for BatchDecoder {
300 fn default() -> Self {
301 Self::new()
302 }
303}
304
/// Lifetime decoder counters; atomics so they can be bumped through `&self`.
struct DecoderStats {
    messages_decoded: AtomicU64,
    bytes_decoded: AtomicU64,
}
309
310impl DecoderStats {
311 fn new() -> Self {
312 Self {
313 messages_decoded: AtomicU64::new(0),
314 bytes_decoded: AtomicU64::new(0),
315 }
316 }
317}
318
/// Point-in-time copy of the decoder's lifetime counters.
#[derive(Debug, Clone)]
pub struct DecoderStatsSnapshot {
    pub messages_decoded: u64,
    pub bytes_decoded: u64,
}
324
/// One message decoded from a batch; key and value are owned `Bytes` copies.
#[derive(Debug, Clone)]
pub struct DecodedMessage {
    pub timestamp: i64,
    // `None` when the frame's key length was zero.
    pub key: Option<Bytes>,
    pub value: Bytes,
}
332
333#[inline]
335pub fn crc32_fast(data: &[u8]) -> u32 {
336 let mut hasher = crc32fast::Hasher::new();
338 hasher.update(data);
339 hasher.finalize()
340}
341
342pub fn crc32_batch(buffers: &[&[u8]]) -> Vec<u32> {
344 buffers.iter().map(|buf| crc32_fast(buf)).collect()
345}
346
/// Lexicographically compares two byte slices (a shorter prefix sorts first),
/// delegating to the slice `Ord` implementation.
#[inline]
pub fn memcmp_fast(a: &[u8], b: &[u8]) -> std::cmp::Ordering {
    Ord::cmp(a, b)
}
352
/// Returns the byte index of the first occurrence of `needle` in `haystack`,
/// delegating to the SIMD-accelerated `memchr` crate.
#[inline]
pub fn memchr_fast(needle: u8, haystack: &[u8]) -> Option<usize> {
    memchr::memchr(needle, haystack)
}
358
/// Returns the start index of the first occurrence of `needle` in `haystack`.
/// Note the parameter order here is (needle, haystack) while the underlying
/// `memchr::memmem::find` takes (haystack, needle).
#[inline]
pub fn memmem_fast(needle: &[u8], haystack: &[u8]) -> Option<usize> {
    memchr::memmem::find(haystack, needle)
}
364
/// Fans per-item work out across scoped OS threads.
pub struct BatchProcessor {
    // Upper bound on spawned threads; clamped to at least 1 in `new`.
    workers: usize,
    // Lifetime counters, updated through `&self` via atomics.
    stats: ProcessorStats,
}
372
373impl BatchProcessor {
374 pub fn new(workers: usize) -> Self {
375 Self {
376 workers: workers.max(1),
377 stats: ProcessorStats::new(),
378 }
379 }
380
381 pub fn process<T, R, F>(&self, items: Vec<T>, f: F) -> Vec<R>
383 where
384 T: Send + Sync,
385 R: Send,
386 F: Fn(&T) -> R + Send + Sync,
387 {
388 if items.len() <= self.workers {
389 return items.iter().map(&f).collect();
391 }
392
393 let chunk_size = items.len().div_ceil(self.workers);
394
395 std::thread::scope(|s| {
396 let mut handles = vec![];
397
398 for chunk in items.chunks(chunk_size) {
399 let f = &f;
400 handles.push(s.spawn(move || chunk.iter().map(f).collect::<Vec<_>>()));
401 }
402
403 let mut results = Vec::with_capacity(items.len());
404 for handle in handles {
405 results.extend(handle.join().unwrap());
406 }
407
408 self.stats.batches_processed.fetch_add(1, Ordering::Relaxed);
409 self.stats
410 .items_processed
411 .fetch_add(items.len() as u64, Ordering::Relaxed);
412
413 results
414 })
415 }
416
417 pub fn filter_map<T, R, F>(&self, items: Vec<T>, f: F) -> Vec<R>
419 where
420 T: Send + Sync,
421 R: Send,
422 F: Fn(&T) -> Option<R> + Send + Sync,
423 {
424 if items.len() <= self.workers {
425 return items.iter().filter_map(&f).collect();
426 }
427
428 let chunk_size = items.len().div_ceil(self.workers);
429
430 std::thread::scope(|s| {
431 let mut handles = vec![];
432
433 for chunk in items.chunks(chunk_size) {
434 let f = &f;
435 handles.push(s.spawn(move || chunk.iter().filter_map(f).collect::<Vec<_>>()));
436 }
437
438 let mut results = Vec::with_capacity(items.len());
439 for handle in handles {
440 results.extend(handle.join().unwrap());
441 }
442
443 results
444 })
445 }
446
447 pub fn stats(&self) -> ProcessorStatsSnapshot {
449 ProcessorStatsSnapshot {
450 batches_processed: self.stats.batches_processed.load(Ordering::Relaxed),
451 items_processed: self.stats.items_processed.load(Ordering::Relaxed),
452 }
453 }
454}
455
/// Lifetime processor counters; atomics so they can be bumped through `&self`.
struct ProcessorStats {
    batches_processed: AtomicU64,
    items_processed: AtomicU64,
}
460
461impl ProcessorStats {
462 fn new() -> Self {
463 Self {
464 batches_processed: AtomicU64::new(0),
465 items_processed: AtomicU64::new(0),
466 }
467 }
468}
469
/// Point-in-time copy of the processor's lifetime counters.
#[derive(Debug, Clone)]
pub struct ProcessorStatsSnapshot {
    pub batches_processed: u64,
    pub items_processed: u64,
}
475
/// Column-oriented (structure-of-arrays) record store: timestamps, keys and
/// values live in contiguous buffers indexed by offset tables.
#[derive(Debug)]
pub struct RecordBatch {
    // Number of records stored.
    pub len: usize,
    // One timestamp per record.
    pub timestamps: Vec<i64>,
    // `len + 1` entries; record i's key spans key_offsets[i]..key_offsets[i+1].
    pub key_offsets: Vec<u32>,
    // Concatenated key bytes.
    pub key_data: Vec<u8>,
    // `len + 1` entries; record i's value spans value_offsets[i]..value_offsets[i+1].
    pub value_offsets: Vec<u32>,
    // Concatenated value bytes.
    pub value_data: Vec<u8>,
}
492
493impl RecordBatch {
494 pub fn new() -> Self {
496 Self {
497 len: 0,
498 timestamps: Vec::new(),
499 key_offsets: vec![0],
500 key_data: Vec::new(),
501 value_offsets: vec![0],
502 value_data: Vec::new(),
503 }
504 }
505
506 pub fn with_capacity(records: usize, avg_key_size: usize, avg_value_size: usize) -> Self {
508 Self {
509 len: 0,
510 timestamps: Vec::with_capacity(records),
511 key_offsets: Vec::with_capacity(records + 1),
512 key_data: Vec::with_capacity(records * avg_key_size),
513 value_offsets: Vec::with_capacity(records + 1),
514 value_data: Vec::with_capacity(records * avg_value_size),
515 }
516 }
517
518 pub fn add(&mut self, timestamp: i64, key: Option<&[u8]>, value: &[u8]) {
520 self.timestamps.push(timestamp);
521
522 if let Some(k) = key {
523 self.key_data.extend_from_slice(k);
524 }
525 self.key_offsets.push(self.key_data.len() as u32);
526
527 self.value_data.extend_from_slice(value);
528 self.value_offsets.push(self.value_data.len() as u32);
529
530 self.len += 1;
531 }
532
533 pub fn timestamp(&self, idx: usize) -> i64 {
535 self.timestamps[idx]
536 }
537
538 pub fn key(&self, idx: usize) -> Option<&[u8]> {
540 let start = self.key_offsets[idx] as usize;
541 let end = self.key_offsets[idx + 1] as usize;
542 if start == end {
543 None
544 } else {
545 Some(&self.key_data[start..end])
546 }
547 }
548
549 pub fn value(&self, idx: usize) -> &[u8] {
551 let start = self.value_offsets[idx] as usize;
552 let end = self.value_offsets[idx + 1] as usize;
553 &self.value_data[start..end]
554 }
555
556 pub fn is_empty(&self) -> bool {
558 self.len == 0
559 }
560
561 pub fn memory_size(&self) -> usize {
563 self.timestamps.len() * 8
564 + self.key_offsets.len() * 4
565 + self.key_data.len()
566 + self.value_offsets.len() * 4
567 + self.value_data.len()
568 }
569
570 pub fn filter<F>(&self, predicate: F) -> RecordBatch
572 where
573 F: Fn(i64, Option<&[u8]>, &[u8]) -> bool,
574 {
575 let mut batch = RecordBatch::new();
576
577 for i in 0..self.len {
578 let ts = self.timestamp(i);
579 let key = self.key(i);
580 let value = self.value(i);
581
582 if predicate(ts, key, value) {
583 batch.add(ts, key, value);
584 }
585 }
586
587 batch
588 }
589
590 pub fn map_values<F>(&self, transform: F) -> RecordBatch
592 where
593 F: Fn(&[u8]) -> Vec<u8>,
594 {
595 let mut batch = RecordBatch::new();
596
597 for i in 0..self.len {
598 let ts = self.timestamp(i);
599 let key = self.key(i);
600 let value = transform(self.value(i));
601 batch.add(ts, key, &value);
602 }
603
604 batch
605 }
606}
607
608impl Default for RecordBatch {
609 fn default() -> Self {
610 Self::new()
611 }
612}
613
/// Borrowing iterator over a `RecordBatch`, yielding
/// `(timestamp, key, value)` tuples.
pub struct RecordBatchIter<'a> {
    batch: &'a RecordBatch,
    // Index of the next record to yield.
    idx: usize,
}
619
620impl<'a> Iterator for RecordBatchIter<'a> {
621 type Item = (i64, Option<&'a [u8]>, &'a [u8]);
622
623 fn next(&mut self) -> Option<Self::Item> {
624 if self.idx >= self.batch.len {
625 return None;
626 }
627
628 let ts = self.batch.timestamp(self.idx);
629 let key = self.batch.key(self.idx);
630 let value = self.batch.value(self.idx);
631
632 self.idx += 1;
633 Some((ts, key, value))
634 }
635
636 fn size_hint(&self) -> (usize, Option<usize>) {
637 let remaining = self.batch.len - self.idx;
638 (remaining, Some(remaining))
639 }
640}
641
// Sound because `size_hint` reports identical lower and upper bounds.
impl<'a> ExactSizeIterator for RecordBatchIter<'a> {}
643
644impl<'a> IntoIterator for &'a RecordBatch {
645 type Item = (i64, Option<&'a [u8]>, &'a [u8]);
646 type IntoIter = RecordBatchIter<'a>;
647
648 fn into_iter(self) -> Self::IntoIter {
649 RecordBatchIter {
650 batch: self,
651 idx: 0,
652 }
653 }
654}
655
#[cfg(test)]
mod tests {
    use super::*;

    // Round-trips keyed and key-less messages through encode + decode,
    // checking timestamps, keys, and values survive intact.
    #[test]
    fn test_batch_encoder_decoder() {
        let mut encoder = BatchEncoder::new();

        encoder.add_message(Some(b"key1"), b"value1", 1000);
        encoder.add_message(Some(b"key2"), b"value2", 2000);
        encoder.add_message(None, b"value3", 3000);

        assert_eq!(encoder.message_count(), 3);

        let encoded = encoder.finish();

        let decoder = BatchDecoder::new();
        let messages = decoder.decode_all(&encoded);

        assert_eq!(messages.len(), 3);
        assert_eq!(messages[0].timestamp, 1000);
        assert_eq!(messages[0].key.as_ref().unwrap().as_ref(), b"key1");
        assert_eq!(messages[0].value.as_ref(), b"value1");

        assert_eq!(messages[2].timestamp, 3000);
        assert!(messages[2].key.is_none());
        assert_eq!(messages[2].value.as_ref(), b"value3");
    }

    // The bulk `add_messages` path produces frames the decoder accepts.
    #[test]
    fn test_batch_messages() {
        let mut encoder = BatchEncoder::new();

        let messages = vec![
            BatchMessage::with_key(b"k1".to_vec(), b"v1".to_vec()),
            BatchMessage::with_key(b"k2".to_vec(), b"v2".to_vec()),
            BatchMessage::new(b"v3".to_vec()),
        ];

        encoder.add_messages(&messages);

        assert_eq!(encoder.message_count(), 3);

        let encoded = encoder.finish();
        let decoder = BatchDecoder::new();
        let decoded = decoder.decode_all(&encoded);

        assert_eq!(decoded.len(), 3);
    }

    // CRC32 is deterministic for equal input and differs for different input.
    #[test]
    fn test_crc32_fast() {
        let data = b"Hello, World!";
        let crc = crc32_fast(data);

        assert_eq!(crc, crc32_fast(data));

        let crc2 = crc32_fast(b"Different data");
        assert_ne!(crc, crc2);
    }

    // The batch CRC helper matches per-buffer results, in order.
    #[test]
    fn test_crc32_batch() {
        let buffers: Vec<&[u8]> = vec![b"data1", b"data2", b"data3"];
        let crcs = crc32_batch(&buffers);

        assert_eq!(crcs.len(), 3);
        assert_eq!(crcs[0], crc32_fast(b"data1"));
        assert_eq!(crcs[1], crc32_fast(b"data2"));
        assert_eq!(crcs[2], crc32_fast(b"data3"));
    }

    // 100 items > 4 workers exercises the threaded path; output order must
    // match input order and stats must count every item.
    #[test]
    fn test_batch_processor() {
        let processor = BatchProcessor::new(4);

        let items: Vec<i32> = (0..100).collect();
        let results = processor.process(items, |x| x * 2);

        assert_eq!(results.len(), 100);
        for (i, r) in results.iter().enumerate() {
            assert_eq!(*r, (i as i32) * 2);
        }

        let stats = processor.stats();
        assert_eq!(stats.items_processed, 100);
    }

    // Parallel filter_map keeps only even inputs (doubled), i.e. multiples of 4.
    #[test]
    fn test_batch_processor_filter_map() {
        let processor = BatchProcessor::new(4);

        let items: Vec<i32> = (0..100).collect();
        let results = processor.filter_map(items, |x| if x % 2 == 0 { Some(x * 2) } else { None });

        assert_eq!(results.len(), 50);
        for r in &results {
            assert_eq!(r % 4, 0);
        }
    }

    // Columnar storage round-trips timestamps, keys (including None), values.
    #[test]
    fn test_record_batch() {
        let mut batch = RecordBatch::new();

        batch.add(1000, Some(b"key1"), b"value1");
        batch.add(2000, Some(b"key2"), b"value2222");
        batch.add(3000, None, b"v3");

        assert_eq!(batch.len, 3);

        assert_eq!(batch.timestamp(0), 1000);
        assert_eq!(batch.key(0), Some(&b"key1"[..]));
        assert_eq!(batch.value(0), b"value1");

        assert_eq!(batch.timestamp(1), 2000);
        assert_eq!(batch.key(1), Some(&b"key2"[..]));
        assert_eq!(batch.value(1), b"value2222");

        assert_eq!(batch.timestamp(2), 3000);
        assert_eq!(batch.key(2), None);
        assert_eq!(batch.value(2), b"v3");
    }

    // Timestamps 0,100,...,900; ts >= 500 keeps 500..900 -> 5 records.
    #[test]
    fn test_record_batch_filter() {
        let mut batch = RecordBatch::new();

        for i in 0..10 {
            batch.add(
                i * 100,
                Some(format!("key{}", i).as_bytes()),
                format!("value{}", i).as_bytes(),
            );
        }

        let filtered = batch.filter(|ts, _, _| ts >= 500);

        assert_eq!(filtered.len, 5);
        assert_eq!(filtered.timestamp(0), 500);
    }

    // `&RecordBatch` iterates as (timestamp, key, value) tuples in order.
    #[test]
    fn test_record_batch_iter() {
        let mut batch = RecordBatch::new();

        batch.add(1000, Some(b"k1"), b"v1");
        batch.add(2000, Some(b"k2"), b"v2");

        let collected: Vec<_> = batch.into_iter().collect();

        assert_eq!(collected.len(), 2);
        assert_eq!(collected[0].0, 1000);
        assert_eq!(collected[1].0, 2000);
    }

    // map_values transforms values while leaving timestamps/keys alone.
    #[test]
    fn test_record_batch_map_values() {
        let mut batch = RecordBatch::new();

        batch.add(1000, None, b"hello");
        batch.add(2000, None, b"world");

        let mapped = batch.map_values(|v| v.iter().map(|b| b.to_ascii_uppercase()).collect());

        assert_eq!(mapped.value(0), b"HELLO");
        assert_eq!(mapped.value(1), b"WORLD");
    }

    // Single-byte search: first 'w' in the haystack is at index 7.
    #[test]
    fn test_memchr_fast() {
        let haystack = b"hello, world!";

        assert_eq!(memchr_fast(b'w', haystack), Some(7));
        assert_eq!(memchr_fast(b'x', haystack), None);
    }

    // Substring search returns the first match only.
    #[test]
    fn test_memmem_fast() {
        let haystack = b"hello, world! world!";

        assert_eq!(memmem_fast(b"world", haystack), Some(7));
        assert_eq!(memmem_fast(b"xyz", haystack), None);
    }
}