1use bytes::{BufMut, Bytes, BytesMut};
21use std::sync::atomic::{AtomicU64, Ordering};
22
23pub struct BatchEncoder {
25 buffer: BytesMut,
27 message_count: usize,
29 stats: EncoderStats,
31}
32
33impl BatchEncoder {
34 pub fn new() -> Self {
36 Self::with_capacity(64 * 1024) }
38
39 pub fn with_capacity(capacity: usize) -> Self {
41 Self {
42 buffer: BytesMut::with_capacity(capacity),
43 message_count: 0,
44 stats: EncoderStats::new(),
45 }
46 }
47
48 pub fn add_message(&mut self, key: Option<&[u8]>, value: &[u8], timestamp: i64) {
50 let key_len = key.map(|k| k.len()).unwrap_or(0);
54 let total_len = 8 + 4 + key_len + 4 + value.len() + 4;
55
56 if self.buffer.remaining_mut() < 4 + total_len {
58 self.buffer.reserve(4 + total_len);
59 }
60
61 self.buffer.put_u32(total_len as u32);
63
64 self.buffer.put_i64(timestamp);
66
67 self.buffer.put_u32(key_len as u32);
69 if let Some(k) = key {
70 self.buffer.extend_from_slice(k);
71 }
72
73 self.buffer.put_u32(value.len() as u32);
75 self.buffer.extend_from_slice(value);
76
77 let crc = crc32_fast(&self.buffer[self.buffer.len() - total_len + 4..]);
79 self.buffer.put_u32(crc);
80
81 self.message_count += 1;
82 self.stats.messages_encoded.fetch_add(1, Ordering::Relaxed);
83 self.stats
84 .bytes_encoded
85 .fetch_add((4 + total_len) as u64, Ordering::Relaxed);
86 }
87
88 pub fn add_messages(&mut self, messages: &[BatchMessage]) {
90 let total_size: usize = messages
92 .iter()
93 .map(|m| {
94 let key_len = m.key.as_ref().map(|k| k.len()).unwrap_or(0);
95 4 + 8 + 4 + key_len + 4 + m.value.len() + 4
96 })
97 .sum();
98
99 self.buffer.reserve(total_size);
100
101 for msg in messages {
103 self.add_message(msg.key.as_deref(), &msg.value, msg.timestamp);
104 }
105 }
106
107 pub fn finish(self) -> Bytes {
109 self.buffer.freeze()
110 }
111
112 pub fn len(&self) -> usize {
114 self.buffer.len()
115 }
116
117 pub fn is_empty(&self) -> bool {
119 self.buffer.is_empty()
120 }
121
122 pub fn message_count(&self) -> usize {
124 self.message_count
125 }
126
127 pub fn reset(&mut self) {
129 self.buffer.clear();
130 self.message_count = 0;
131 }
132
133 pub fn stats(&self) -> EncoderStatsSnapshot {
135 EncoderStatsSnapshot {
136 messages_encoded: self.stats.messages_encoded.load(Ordering::Relaxed),
137 bytes_encoded: self.stats.bytes_encoded.load(Ordering::Relaxed),
138 }
139 }
140}
141
142impl Default for BatchEncoder {
143 fn default() -> Self {
144 Self::new()
145 }
146}
147
/// An owned message waiting to be encoded.
#[derive(Debug, Clone)]
pub struct BatchMessage {
    /// Optional message key.
    pub key: Option<Vec<u8>>,
    /// Message payload.
    pub value: Vec<u8>,
    /// Milliseconds since the Unix epoch, captured when the message was constructed.
    pub timestamp: i64,
}

impl BatchMessage {
    /// Creates a keyless message stamped with the current wall-clock time.
    pub fn new(value: Vec<u8>) -> Self {
        Self::stamped(None, value)
    }

    /// Creates a keyed message stamped with the current wall-clock time.
    pub fn with_key(key: Vec<u8>, value: Vec<u8>) -> Self {
        Self::stamped(Some(key), value)
    }

    /// Builds a message whose timestamp is "now" in milliseconds since the Unix epoch.
    ///
    /// A system clock set before the epoch yields a timestamp of `0`.
    fn stamped(key: Option<Vec<u8>>, value: Vec<u8>) -> Self {
        let since_epoch = std::time::SystemTime::now()
            .duration_since(std::time::UNIX_EPOCH)
            .unwrap_or_default();
        Self {
            key,
            value,
            timestamp: since_epoch.as_millis() as i64,
        }
    }
}
179
/// Counters updated by the encoder as messages are appended.
struct EncoderStats {
    /// Total messages encoded.
    messages_encoded: AtomicU64,
    /// Total bytes written, length prefixes included.
    bytes_encoded: AtomicU64,
}

impl EncoderStats {
    /// Creates a counter set with every counter at zero.
    fn new() -> Self {
        Self {
            messages_encoded: AtomicU64::default(),
            bytes_encoded: AtomicU64::default(),
        }
    }
}
193
/// Plain-value copy of the encoder counters taken at one moment.
#[derive(Clone, Debug)]
pub struct EncoderStatsSnapshot {
    /// Messages encoded at the time of the snapshot.
    pub messages_encoded: u64,
    /// Bytes encoded at the time of the snapshot.
    pub bytes_encoded: u64,
}
199
200pub struct BatchDecoder {
202 stats: DecoderStats,
204}
205
206impl BatchDecoder {
207 pub fn new() -> Self {
208 Self {
209 stats: DecoderStats::new(),
210 }
211 }
212
213 pub fn decode_all(&self, data: &[u8]) -> Vec<DecodedMessage> {
215 let mut messages = Vec::new();
216 let mut offset = 0;
217
218 while offset + 4 <= data.len() {
219 let total_len = u32::from_be_bytes([
221 data[offset],
222 data[offset + 1],
223 data[offset + 2],
224 data[offset + 3],
225 ]) as usize;
226
227 if offset + 4 + total_len > data.len() {
228 break;
229 }
230
231 if let Some(msg) = self.decode_message(&data[offset + 4..offset + 4 + total_len]) {
232 messages.push(msg);
233 self.stats.messages_decoded.fetch_add(1, Ordering::Relaxed);
234 }
235
236 offset += 4 + total_len;
237 }
238
239 self.stats
240 .bytes_decoded
241 .fetch_add(offset as u64, Ordering::Relaxed);
242 messages
243 }
244
245 fn decode_message(&self, data: &[u8]) -> Option<DecodedMessage> {
247 if data.len() < 20 {
248 return None;
250 }
251
252 let stored_crc = u32::from_be_bytes([
254 data[data.len() - 4],
255 data[data.len() - 3],
256 data[data.len() - 2],
257 data[data.len() - 1],
258 ]);
259
260 let computed_crc = crc32_fast(&data[..data.len() - 4]);
261 if stored_crc != computed_crc {
262 return None;
263 }
264
265 let timestamp = i64::from_be_bytes([
267 data[0], data[1], data[2], data[3], data[4], data[5], data[6], data[7],
268 ]);
269
270 let key_len = u32::from_be_bytes([data[8], data[9], data[10], data[11]]) as usize;
271
272 let key = if key_len > 0 {
273 Some(Bytes::copy_from_slice(&data[12..12 + key_len]))
274 } else {
275 None
276 };
277
278 let value_offset = 12 + key_len;
279 let value_len = u32::from_be_bytes([
280 data[value_offset],
281 data[value_offset + 1],
282 data[value_offset + 2],
283 data[value_offset + 3],
284 ]) as usize;
285
286 let value = Bytes::copy_from_slice(&data[value_offset + 4..value_offset + 4 + value_len]);
287
288 Some(DecodedMessage {
289 timestamp,
290 key,
291 value,
292 })
293 }
294
295 pub fn stats(&self) -> DecoderStatsSnapshot {
297 DecoderStatsSnapshot {
298 messages_decoded: self.stats.messages_decoded.load(Ordering::Relaxed),
299 bytes_decoded: self.stats.bytes_decoded.load(Ordering::Relaxed),
300 }
301 }
302}
303
304impl Default for BatchDecoder {
305 fn default() -> Self {
306 Self::new()
307 }
308}
309
/// Counters updated by the decoder as frames are consumed.
struct DecoderStats {
    /// Total messages successfully decoded.
    messages_decoded: AtomicU64,
    /// Total input bytes consumed.
    bytes_decoded: AtomicU64,
}

impl DecoderStats {
    /// Creates a counter set with every counter at zero.
    fn new() -> Self {
        Self {
            messages_decoded: AtomicU64::default(),
            bytes_decoded: AtomicU64::default(),
        }
    }
}
323
/// Plain-value copy of the decoder counters taken at one moment.
#[derive(Clone, Debug)]
pub struct DecoderStatsSnapshot {
    /// Messages decoded at the time of the snapshot.
    pub messages_decoded: u64,
    /// Bytes consumed at the time of the snapshot.
    pub bytes_decoded: u64,
}
329
330#[derive(Debug, Clone)]
332pub struct DecodedMessage {
333 pub timestamp: i64,
334 pub key: Option<Bytes>,
335 pub value: Bytes,
336}
337
338#[inline]
340pub fn crc32_fast(data: &[u8]) -> u32 {
341 let mut hasher = crc32fast::Hasher::new();
343 hasher.update(data);
344 hasher.finalize()
345}
346
347pub fn crc32_batch(buffers: &[&[u8]]) -> Vec<u32> {
349 buffers.iter().map(|buf| crc32_fast(buf)).collect()
350}
351
/// Compares two byte slices lexicographically.
///
/// When one slice is a prefix of the other, the shorter slice orders first.
#[inline]
pub fn memcmp_fast(a: &[u8], b: &[u8]) -> std::cmp::Ordering {
    Ord::cmp(a, b)
}
357
358#[inline]
360pub fn memchr_fast(needle: u8, haystack: &[u8]) -> Option<usize> {
361 memchr::memchr(needle, haystack)
362}
363
364#[inline]
366pub fn memmem_fast(needle: &[u8], haystack: &[u8]) -> Option<usize> {
367 memchr::memmem::find(haystack, needle)
368}
369
/// Applies a function to batches of items, fanning large batches out over scoped threads.
pub struct BatchProcessor {
    /// Maximum number of worker threads per batch; always at least 1.
    workers: usize,
    /// Atomic counters surfaced through `stats`.
    stats: ProcessorStats,
}

impl BatchProcessor {
    /// Creates a processor using up to `workers` threads; `0` is treated as `1`.
    pub fn new(workers: usize) -> Self {
        Self {
            workers: workers.max(1),
            stats: ProcessorStats::new(),
        }
    }

    /// Applies `f` to every item and returns the results in input order.
    ///
    /// Batches with no more items than workers run on the calling thread; larger
    /// batches are split into contiguous chunks, one scoped thread per chunk. Every
    /// call is recorded in the processor's counters.
    ///
    /// # Panics
    ///
    /// If `f` panics on a worker thread, that panic is re-raised on the calling thread.
    pub fn process<T, R, F>(&self, items: Vec<T>, f: F) -> Vec<R>
    where
        T: Send + Sync,
        R: Send,
        F: Fn(&T) -> R + Send + Sync,
    {
        self.run_chunked(&items, |chunk| chunk.iter().map(&f).collect())
    }

    /// Applies `f` to every item and keeps the `Some` results, in input order.
    ///
    /// Scheduling and counter updates are the same as for [`BatchProcessor::process`];
    /// `items_processed` counts input items, not retained results.
    ///
    /// # Panics
    ///
    /// If `f` panics on a worker thread, that panic is re-raised on the calling thread.
    pub fn filter_map<T, R, F>(&self, items: Vec<T>, f: F) -> Vec<R>
    where
        T: Send + Sync,
        R: Send,
        F: Fn(&T) -> Option<R> + Send + Sync,
    {
        self.run_chunked(&items, |chunk| chunk.iter().filter_map(&f).collect())
    }

    /// Runs `per_chunk` over `items`, concatenates the per-chunk results in order and
    /// records the batch in the counters.
    fn run_chunked<T, R, C>(&self, items: &[T], per_chunk: C) -> Vec<R>
    where
        T: Sync,
        R: Send,
        C: Fn(&[T]) -> Vec<R> + Sync,
    {
        let results = if items.len() <= self.workers {
            // Too few items to be worth spawning threads for.
            per_chunk(items)
        } else {
            let chunk_size = items.len().div_ceil(self.workers);
            let per_chunk = &per_chunk;

            std::thread::scope(|s| {
                let mut handles = Vec::new();
                for chunk in items.chunks(chunk_size) {
                    handles.push(s.spawn(move || per_chunk(chunk)));
                }

                // Joining in spawn order keeps the output in input order.
                let mut merged = Vec::with_capacity(items.len());
                for handle in handles {
                    match handle.join() {
                        Ok(part) => merged.extend(part),
                        // Re-raise the worker's own panic payload instead of wrapping
                        // it in an opaque `unwrap` failure.
                        Err(payload) => std::panic::resume_unwind(payload),
                    }
                }
                merged
            })
        };

        // Counted on both the sequential and the threaded path.
        self.stats.batches_processed.fetch_add(1, Ordering::Relaxed);
        self.stats
            .items_processed
            .fetch_add(items.len() as u64, Ordering::Relaxed);

        results
    }

    /// Returns a point-in-time copy of the processor's counters.
    pub fn stats(&self) -> ProcessorStatsSnapshot {
        ProcessorStatsSnapshot {
            batches_processed: self.stats.batches_processed.load(Ordering::Relaxed),
            items_processed: self.stats.items_processed.load(Ordering::Relaxed),
        }
    }
}

/// Counters updated by the processor after each batch.
struct ProcessorStats {
    /// Total batches handled.
    batches_processed: AtomicU64,
    /// Total input items handled.
    items_processed: AtomicU64,
}

impl ProcessorStats {
    /// Creates a counter set with every counter at zero.
    fn new() -> Self {
        Self {
            batches_processed: AtomicU64::new(0),
            items_processed: AtomicU64::new(0),
        }
    }
}

/// Plain-value copy of the processor counters taken at one moment.
#[derive(Debug, Clone)]
pub struct ProcessorStatsSnapshot {
    /// Batches handled at the time of the snapshot.
    pub batches_processed: u64,
    /// Input items handled at the time of the snapshot.
    pub items_processed: u64,
}
480
/// Column-oriented batch of `(timestamp, key, value)` records.
///
/// Keys and values are stored concatenated in `key_data` / `value_data`. Record `i`
/// occupies `offsets[i]..offsets[i + 1]` of the matching data column, so each offsets
/// vector holds `len + 1` entries and starts with `0`.
#[derive(Debug)]
pub struct RecordBatch {
    /// Number of records in the batch.
    pub len: usize,
    /// One timestamp per record.
    pub timestamps: Vec<i64>,
    /// Boundaries of each record's key inside `key_data`.
    pub key_offsets: Vec<u32>,
    /// All keys, concatenated in record order.
    pub key_data: Vec<u8>,
    /// Boundaries of each record's value inside `value_data`.
    pub value_offsets: Vec<u32>,
    /// All values, concatenated in record order.
    pub value_data: Vec<u8>,
}

impl RecordBatch {
    /// Creates an empty batch.
    pub fn new() -> Self {
        Self {
            len: 0,
            timestamps: Vec::new(),
            key_offsets: vec![0],
            key_data: Vec::new(),
            value_offsets: vec![0],
            value_data: Vec::new(),
        }
    }

    /// Creates an empty batch with room for `records` records of the given average
    /// key and value sizes.
    pub fn with_capacity(records: usize, avg_key_size: usize, avg_value_size: usize) -> Self {
        Self {
            len: 0,
            timestamps: Vec::with_capacity(records),
            key_offsets: Self::offsets_with_capacity(records),
            key_data: Vec::with_capacity(records * avg_key_size),
            value_offsets: Self::offsets_with_capacity(records),
            value_data: Vec::with_capacity(records * avg_value_size),
        }
    }

    /// Builds an offsets column sized for `records` records.
    ///
    /// The leading `0` is required: the accessors read `offsets[i]` and
    /// `offsets[i + 1]`, so a column without it would misplace every record.
    fn offsets_with_capacity(records: usize) -> Vec<u32> {
        let mut offsets = Vec::with_capacity(records.saturating_add(1));
        offsets.push(0);
        offsets
    }

    /// Returns the end offset of a data column after `added` more bytes.
    ///
    /// Panics if the result does not fit in the `u32` offsets.
    fn checked_end(current: usize, added: usize, column: &str) -> u32 {
        let end = current
            .checked_add(added)
            .filter(|&end| end <= u32::MAX as usize);
        match end {
            Some(end) => end as u32,
            None => panic!("RecordBatch {} column would exceed u32::MAX bytes", column),
        }
    }

    /// Appends one record.
    ///
    /// A `None` key and an empty key are stored identically and both read back as
    /// `None` from [`RecordBatch::key`].
    ///
    /// # Panics
    ///
    /// Panics if the key or value column would grow past `u32::MAX` bytes, since the
    /// offsets could no longer address it. The batch is left unmodified in that case.
    pub fn add(&mut self, timestamp: i64, key: Option<&[u8]>, value: &[u8]) {
        let key_bytes: &[u8] = key.unwrap_or(&[]);

        // Validate both columns before touching any of them.
        let key_end = Self::checked_end(self.key_data.len(), key_bytes.len(), "key");
        let value_end = Self::checked_end(self.value_data.len(), value.len(), "value");

        self.timestamps.push(timestamp);

        self.key_data.extend_from_slice(key_bytes);
        self.key_offsets.push(key_end);

        self.value_data.extend_from_slice(value);
        self.value_offsets.push(value_end);

        self.len += 1;
    }

    /// Returns the timestamp of record `idx`.
    ///
    /// # Panics
    ///
    /// Panics if `idx` is out of range.
    pub fn timestamp(&self, idx: usize) -> i64 {
        self.timestamps[idx]
    }

    /// Returns the key of record `idx`, or `None` when its stored key is empty.
    ///
    /// # Panics
    ///
    /// Panics if `idx` is out of range.
    pub fn key(&self, idx: usize) -> Option<&[u8]> {
        let start = self.key_offsets[idx] as usize;
        let end = self.key_offsets[idx + 1] as usize;
        if start == end {
            None
        } else {
            Some(&self.key_data[start..end])
        }
    }

    /// Returns the value of record `idx`.
    ///
    /// # Panics
    ///
    /// Panics if `idx` is out of range.
    pub fn value(&self, idx: usize) -> &[u8] {
        let start = self.value_offsets[idx] as usize;
        let end = self.value_offsets[idx + 1] as usize;
        &self.value_data[start..end]
    }

    /// Returns `true` when the batch holds no records.
    pub fn is_empty(&self) -> bool {
        self.len == 0
    }

    /// Bytes occupied by the stored elements (computed from lengths, not capacities).
    pub fn memory_size(&self) -> usize {
        self.timestamps.len() * 8
            + self.key_offsets.len() * 4
            + self.key_data.len()
            + self.value_offsets.len() * 4
            + self.value_data.len()
    }

    /// Returns a new batch holding the records for which `predicate` returns `true`,
    /// in their original order.
    pub fn filter<F>(&self, predicate: F) -> RecordBatch
    where
        F: Fn(i64, Option<&[u8]>, &[u8]) -> bool,
    {
        let mut batch = RecordBatch::new();

        for i in 0..self.len {
            let ts = self.timestamp(i);
            let key = self.key(i);
            let value = self.value(i);

            if predicate(ts, key, value) {
                batch.add(ts, key, value);
            }
        }

        batch
    }

    /// Returns a new batch with the same timestamps and keys and every value replaced
    /// by `transform(value)`.
    pub fn map_values<F>(&self, transform: F) -> RecordBatch
    where
        F: Fn(&[u8]) -> Vec<u8>,
    {
        // Record count and total key size are known up front; value sizes are not.
        let mut batch = RecordBatch::with_capacity(self.len, 0, 0);
        batch.key_data.reserve(self.key_data.len());

        for i in 0..self.len {
            let ts = self.timestamp(i);
            let key = self.key(i);
            let value = transform(self.value(i));
            batch.add(ts, key, &value);
        }

        batch
    }
}
612
613impl Default for RecordBatch {
614 fn default() -> Self {
615 Self::new()
616 }
617}
618
619pub struct RecordBatchIter<'a> {
621 batch: &'a RecordBatch,
622 idx: usize,
623}
624
625impl<'a> Iterator for RecordBatchIter<'a> {
626 type Item = (i64, Option<&'a [u8]>, &'a [u8]);
627
628 fn next(&mut self) -> Option<Self::Item> {
629 if self.idx >= self.batch.len {
630 return None;
631 }
632
633 let ts = self.batch.timestamp(self.idx);
634 let key = self.batch.key(self.idx);
635 let value = self.batch.value(self.idx);
636
637 self.idx += 1;
638 Some((ts, key, value))
639 }
640
641 fn size_hint(&self) -> (usize, Option<usize>) {
642 let remaining = self.batch.len - self.idx;
643 (remaining, Some(remaining))
644 }
645}
646
647impl<'a> ExactSizeIterator for RecordBatchIter<'a> {}
648
649impl<'a> IntoIterator for &'a RecordBatch {
650 type Item = (i64, Option<&'a [u8]>, &'a [u8]);
651 type IntoIter = RecordBatchIter<'a>;
652
653 fn into_iter(self) -> Self::IntoIter {
654 RecordBatchIter {
655 batch: self,
656 idx: 0,
657 }
658 }
659}
660
#[cfg(test)]
mod tests {
    use super::*;

    /// Round-trips keyed and keyless messages through the encoder and decoder.
    #[test]
    fn test_batch_encoder_decoder() {
        let mut enc = BatchEncoder::new();
        enc.add_message(Some(b"key1"), b"value1", 1000);
        enc.add_message(Some(b"key2"), b"value2", 2000);
        enc.add_message(None, b"value3", 3000);
        assert_eq!(enc.message_count(), 3);

        let wire = enc.finish();
        let decoded = BatchDecoder::new().decode_all(&wire);
        assert_eq!(decoded.len(), 3);

        let first = &decoded[0];
        assert_eq!(first.timestamp, 1000);
        assert_eq!(first.key.as_ref().unwrap().as_ref(), b"key1");
        assert_eq!(first.value.as_ref(), b"value1");

        let last = &decoded[2];
        assert_eq!(last.timestamp, 3000);
        assert!(last.key.is_none());
        assert_eq!(last.value.as_ref(), b"value3");
    }

    /// Encodes a slice of owned messages in one call and decodes them all back.
    #[test]
    fn test_batch_messages() {
        let pending = vec![
            BatchMessage::with_key(b"k1".to_vec(), b"v1".to_vec()),
            BatchMessage::with_key(b"k2".to_vec(), b"v2".to_vec()),
            BatchMessage::new(b"v3".to_vec()),
        ];

        let mut enc = BatchEncoder::new();
        enc.add_messages(&pending);
        assert_eq!(enc.message_count(), 3);

        let wire = enc.finish();
        let decoded = BatchDecoder::new().decode_all(&wire);
        assert_eq!(decoded.len(), 3);
    }

    /// The checksum is deterministic and differs for different inputs.
    #[test]
    fn test_crc32_fast() {
        let first = crc32_fast(b"Hello, World!");
        assert_eq!(first, crc32_fast(b"Hello, World!"));

        let other = crc32_fast(b"Different data");
        assert_ne!(first, other);
    }

    /// The batch helper matches the single-buffer checksum, element by element.
    #[test]
    fn test_crc32_batch() {
        let inputs: [&[u8]; 3] = [b"data1", b"data2", b"data3"];
        let crcs = crc32_batch(&inputs);

        assert_eq!(crcs.len(), 3);
        assert_eq!(crcs[0], crc32_fast(b"data1"));
        assert_eq!(crcs[1], crc32_fast(b"data2"));
        assert_eq!(crcs[2], crc32_fast(b"data3"));
    }

    /// A batch larger than the worker count is mapped in order and counted.
    #[test]
    fn test_batch_processor() {
        let processor = BatchProcessor::new(4);

        let input: Vec<i32> = (0..100).collect();
        let doubled = processor.process(input, |x| x * 2);

        assert_eq!(doubled.len(), 100);
        let mut expected = 0;
        for value in &doubled {
            assert_eq!(*value, expected);
            expected += 2;
        }

        assert_eq!(processor.stats().items_processed, 100);
    }

    /// Only the even inputs survive, each doubled.
    #[test]
    fn test_batch_processor_filter_map() {
        let processor = BatchProcessor::new(4);

        let input: Vec<i32> = (0..100).collect();
        let kept = processor.filter_map(input, |x| match x % 2 {
            0 => Some(x * 2),
            _ => None,
        });

        assert_eq!(kept.len(), 50);
        for value in &kept {
            assert_eq!(value % 4, 0);
        }
    }

    /// Records read back exactly as added, including a keyless record.
    #[test]
    fn test_record_batch() {
        let mut batch = RecordBatch::new();
        batch.add(1000, Some(b"key1"), b"value1");
        batch.add(2000, Some(b"key2"), b"value2222");
        batch.add(3000, None, b"v3");

        assert_eq!(batch.len, 3);

        assert_eq!(batch.timestamp(0), 1000);
        assert_eq!(batch.key(0), Some(&b"key1"[..]));
        assert_eq!(batch.value(0), b"value1");

        assert_eq!(batch.timestamp(1), 2000);
        assert_eq!(batch.key(1), Some(&b"key2"[..]));
        assert_eq!(batch.value(1), b"value2222");

        assert_eq!(batch.timestamp(2), 3000);
        assert_eq!(batch.key(2), None);
        assert_eq!(batch.value(2), b"v3");
    }

    /// Filtering on the timestamp keeps the later half of the records.
    #[test]
    fn test_record_batch_filter() {
        let mut batch = RecordBatch::new();
        for i in 0..10 {
            let key = format!("key{}", i);
            let value = format!("value{}", i);
            batch.add(i * 100, Some(key.as_bytes()), value.as_bytes());
        }

        let recent = batch.filter(|ts, _, _| ts >= 500);

        assert_eq!(recent.len, 5);
        assert_eq!(recent.timestamp(0), 500);
    }

    /// Iterating a borrowed batch yields the records in insertion order.
    #[test]
    fn test_record_batch_iter() {
        let mut batch = RecordBatch::new();
        batch.add(1000, Some(b"k1"), b"v1");
        batch.add(2000, Some(b"k2"), b"v2");

        let records: Vec<_> = (&batch).into_iter().collect();

        assert_eq!(records.len(), 2);
        assert_eq!(records[0].0, 1000);
        assert_eq!(records[1].0, 2000);
    }

    /// Values are transformed while the record order is kept.
    #[test]
    fn test_record_batch_map_values() {
        let mut batch = RecordBatch::new();
        batch.add(1000, None, b"hello");
        batch.add(2000, None, b"world");

        let shouted = batch.map_values(|v| v.iter().map(|b| b.to_ascii_uppercase()).collect());

        assert_eq!(shouted.value(0), b"HELLO");
        assert_eq!(shouted.value(1), b"WORLD");
    }

    /// Finds the first matching byte, or nothing when the byte is absent.
    #[test]
    fn test_memchr_fast() {
        let text = b"hello, world!";

        assert_eq!(memchr_fast(b'w', text), Some(7));
        assert_eq!(memchr_fast(b'x', text), None);
    }

    /// Finds the first matching substring, or nothing when it is absent.
    #[test]
    fn test_memmem_fast() {
        let text = b"hello, world! world!";

        assert_eq!(memmem_fast(b"world", text), Some(7));
        assert_eq!(memmem_fast(b"xyz", text), None);
    }
}