use bytes::{Bytes, BytesMut};
use std::alloc::{alloc, dealloc, Layout};
use std::ops::Deref;
use std::ptr::NonNull;
use std::sync::atomic::{AtomicU32, AtomicU64, AtomicUsize, Ordering};
use std::sync::Arc;

/// Allocation alignment and rounding granularity; 64 bytes matches the cache
/// line size on most modern x86-64 and AArch64 CPUs.
const CACHE_LINE_SIZE: usize = 64;

/// Default capacity of pooled buffers (256 KiB).
const DEFAULT_BUFFER_SIZE: usize = 256 * 1024;

/// A fixed-capacity, cache-line-aligned byte buffer whose space is handed out
/// through an atomic bump allocator, so multiple writers can fill disjoint
/// regions without locking.
#[derive(Debug)]
pub struct ZeroCopyBuffer {
    data: NonNull<u8>,
    capacity: usize,
    write_pos: AtomicUsize,
    ref_count: AtomicU32,
    id: u64,
    layout: Layout,
}

// SAFETY: the allocation is only written through non-overlapping reserved
// regions, and all shared state (`write_pos`, `ref_count`) is atomic.
unsafe impl Send for ZeroCopyBuffer {}
unsafe impl Sync for ZeroCopyBuffer {}

impl ZeroCopyBuffer {
    pub fn new(capacity: usize) -> Self {
        Self::with_id(capacity, 0)
    }

    pub fn with_id(capacity: usize, id: u64) -> Self {
        // Round the requested capacity up to a cache-line multiple; `max(1)`
        // also keeps a zero-byte request from producing a zero-size layout,
        // which `alloc` does not permit.
        let aligned_capacity =
            (capacity.max(1) + CACHE_LINE_SIZE - 1) & !(CACHE_LINE_SIZE - 1);
        let layout =
            Layout::from_size_align(aligned_capacity, CACHE_LINE_SIZE).expect("Invalid layout");

        let data = unsafe {
            let ptr = alloc(layout);
            if ptr.is_null() {
                std::alloc::handle_alloc_error(layout);
            }
            NonNull::new_unchecked(ptr)
        };

        Self {
            data,
            capacity: aligned_capacity,
            write_pos: AtomicUsize::new(0),
            ref_count: AtomicU32::new(1),
            id,
            layout,
        }
    }

    /// Atomically reserves `len` bytes, returning a slice handle on success or
    /// `None` if the buffer is full. A CAS loop bumps `write_pos`, so
    /// concurrent callers always receive disjoint regions.
    pub fn reserve(&self, len: usize) -> Option<BufferSlice> {
        loop {
            let current = self.write_pos.load(Ordering::Acquire);
            let new_pos = current + len;

            if new_pos > self.capacity {
                return None;
            }

            if self
                .write_pos
                .compare_exchange_weak(current, new_pos, Ordering::AcqRel, Ordering::Relaxed)
                .is_ok()
            {
                // The slice holds an extra reference until it is dropped.
                self.ref_count.fetch_add(1, Ordering::Relaxed);

                return Some(BufferSlice {
                    buffer: self as *const ZeroCopyBuffer,
                    offset: current,
                    len,
                });
            }
            std::hint::spin_loop();
        }
    }

    /// # Safety
    ///
    /// The caller must ensure `offset..offset + len` lies within a region it
    /// reserved, and that no other reference to that region is alive.
    #[allow(clippy::mut_from_ref)]
    pub unsafe fn get_mut_slice(&self, offset: usize, len: usize) -> &mut [u8] {
        debug_assert!(offset + len <= self.capacity);
        std::slice::from_raw_parts_mut(self.data.as_ptr().add(offset), len)
    }

    pub fn get_slice(&self, offset: usize, len: usize) -> &[u8] {
        debug_assert!(offset + len <= self.write_pos.load(Ordering::Acquire));
        unsafe { std::slice::from_raw_parts(self.data.as_ptr().add(offset), len) }
    }

    /// Number of bytes reserved so far.
    pub fn len(&self) -> usize {
        self.write_pos.load(Ordering::Acquire)
    }

    pub fn is_empty(&self) -> bool {
        self.len() == 0
    }

    pub fn remaining(&self) -> usize {
        self.capacity - self.len()
    }

    pub fn capacity(&self) -> usize {
        self.capacity
    }

    pub fn id(&self) -> u64 {
        self.id
    }

    /// Rewinds the write position, but only when no outstanding slices exist.
    pub fn reset(&self) -> bool {
        if self.ref_count.load(Ordering::Acquire) == 1 {
            self.write_pos.store(0, Ordering::Release);
            true
        } else {
            false
        }
    }

    pub fn add_ref(&self) {
        self.ref_count.fetch_add(1, Ordering::Relaxed);
    }

    /// Drops one reference; returns `true` when the last reference was released.
    pub fn release(&self) -> bool {
        self.ref_count.fetch_sub(1, Ordering::AcqRel) == 1
    }

    pub fn ref_count(&self) -> u32 {
        self.ref_count.load(Ordering::Relaxed)
    }

    /// Copies the written bytes into an immutable `Bytes`. Note that this is
    /// a copy, not a zero-copy view.
    pub fn freeze(&self) -> Bytes {
        let len = self.len();
        if len == 0 {
            return Bytes::new();
        }
        Bytes::copy_from_slice(self.get_slice(0, len))
    }
}

impl Drop for ZeroCopyBuffer {
    fn drop(&mut self) {
        // Deallocation is tied to this value being dropped (e.g. when the
        // owning Arc releases it), not to the internal `ref_count`.
        unsafe {
            dealloc(self.data.as_ptr(), self.layout);
        }
    }
}

/// A writable window into a `ZeroCopyBuffer`, produced by `reserve`.
#[derive(Debug)]
pub struct BufferSlice {
    // Raw pointer back to the owning buffer; the caller must keep that
    // buffer alive (e.g. via its Arc) for as long as the slice exists.
    buffer: *const ZeroCopyBuffer,
    offset: usize,
    len: usize,
}

// SAFETY: the slice only touches its own non-overlapping region, and the
// buffer's shared state is atomic.
unsafe impl Send for BufferSlice {}
unsafe impl Sync for BufferSlice {}

impl BufferSlice {
    pub fn as_bytes(&self) -> &[u8] {
        unsafe { &*self.buffer }.get_slice(self.offset, self.len)
    }

    /// # Safety
    ///
    /// The caller must not create overlapping mutable views of this region.
    pub unsafe fn as_mut_bytes(&mut self) -> &mut [u8] {
        (*self.buffer).get_mut_slice(self.offset, self.len)
    }

    /// Copies `data` into the slice, truncating to the slice length; returns
    /// the number of bytes written.
    pub fn write(&mut self, data: &[u8]) -> usize {
        let write_len = data.len().min(self.len);
        unsafe {
            let dest = self.as_mut_bytes();
            dest[..write_len].copy_from_slice(&data[..write_len]);
        }
        write_len
    }

    pub fn len(&self) -> usize {
        self.len
    }

    pub fn is_empty(&self) -> bool {
        self.len == 0
    }

    pub fn offset(&self) -> usize {
        self.offset
    }

    pub fn to_bytes(&self) -> Bytes {
        Bytes::copy_from_slice(self.as_bytes())
    }
}

impl Drop for BufferSlice {
    fn drop(&mut self) {
        // Give back the reference taken in `ZeroCopyBuffer::reserve`.
        unsafe {
            (*self.buffer).release();
        }
    }
}

impl Deref for BufferSlice {
    type Target = [u8];

    fn deref(&self) -> &Self::Target {
        self.as_bytes()
    }
}

impl AsRef<[u8]> for BufferSlice {
    fn as_ref(&self) -> &[u8] {
        self.as_bytes()
    }
}

/// A pool that recycles `ZeroCopyBuffer`s through a bounded crossbeam channel.
pub struct ZeroCopyBufferPool {
    free_buffers: crossbeam_channel::Sender<Arc<ZeroCopyBuffer>>,
    buffer_receiver: crossbeam_channel::Receiver<Arc<ZeroCopyBuffer>>,
    buffer_size: usize,
    next_id: AtomicU64,
    total_created: AtomicU64,
    in_use: AtomicU64,
}

impl ZeroCopyBufferPool {
    pub fn new(buffer_size: usize, initial_count: usize) -> Self {
        let (tx, rx) = crossbeam_channel::bounded(initial_count * 2);

        let pool = Self {
            free_buffers: tx,
            buffer_receiver: rx,
            buffer_size,
            next_id: AtomicU64::new(0),
            total_created: AtomicU64::new(0),
            in_use: AtomicU64::new(0),
        };

        // Pre-populate the free list.
        for _ in 0..initial_count {
            let id = pool.next_id.fetch_add(1, Ordering::Relaxed);
            let buffer = Arc::new(ZeroCopyBuffer::with_id(buffer_size, id));
            pool.total_created.fetch_add(1, Ordering::Relaxed);
            let _ = pool.free_buffers.try_send(buffer);
        }

        pool
    }

    /// Takes a buffer from the free list, or allocates a fresh one when the
    /// list is empty.
    pub fn acquire(&self) -> Arc<ZeroCopyBuffer> {
        match self.buffer_receiver.try_recv() {
            Ok(buffer) => {
                // Only reset if we hold the sole reference.
                if Arc::strong_count(&buffer) == 1 {
                    buffer.reset();
                }
                self.in_use.fetch_add(1, Ordering::Relaxed);
                buffer
            }
            Err(_) => {
                let id = self.next_id.fetch_add(1, Ordering::Relaxed);
                let buffer = Arc::new(ZeroCopyBuffer::with_id(self.buffer_size, id));
                self.total_created.fetch_add(1, Ordering::Relaxed);
                self.in_use.fetch_add(1, Ordering::Relaxed);
                buffer
            }
        }
    }

    /// Returns a buffer to the pool; buffers still shared elsewhere (or not
    /// fitting in the bounded channel) are simply dropped.
    pub fn release(&self, buffer: Arc<ZeroCopyBuffer>) {
        self.in_use.fetch_sub(1, Ordering::Relaxed);

        if Arc::strong_count(&buffer) == 1 {
            buffer.reset();
            let _ = self.free_buffers.try_send(buffer);
        }
    }

    pub fn stats(&self) -> BufferPoolStats {
        BufferPoolStats {
            buffer_size: self.buffer_size,
            total_created: self.total_created.load(Ordering::Relaxed),
            in_use: self.in_use.load(Ordering::Relaxed),
            available: self.buffer_receiver.len() as u64,
        }
    }
}

#[derive(Debug, Clone)]
pub struct BufferPoolStats {
    pub buffer_size: usize,
    pub total_created: u64,
    pub in_use: u64,
    pub available: u64,
}

/// A message whose key, value, and headers are stored as `BufferRef`s, so
/// small payloads are inlined and large ones can be shared.
#[derive(Debug)]
pub struct ZeroCopyMessage {
    pub topic: Arc<str>,
    pub partition: u32,
    pub key: Option<BufferRef>,
    pub value: BufferRef,
    pub headers: Vec<(Arc<str>, BufferRef)>,
    pub timestamp: i64,
}

/// Payload storage for messages.
#[derive(Debug, Clone)]
pub enum BufferRef {
    /// Up to 64 bytes stored inline, avoiding any heap indirection.
    Inline(SmallVec),
    /// Reference-counted external bytes.
    External(Bytes),
    /// A region inside a pooled `ZeroCopyBuffer`.
    Slice {
        buffer: Arc<ZeroCopyBuffer>,
        offset: usize,
        len: usize,
    },
}

impl BufferRef {
    pub fn from_bytes(data: &[u8]) -> Self {
        if data.len() <= 64 {
            BufferRef::Inline(SmallVec::from_slice(data))
        } else {
            BufferRef::External(Bytes::copy_from_slice(data))
        }
    }

    pub fn from_external(data: Bytes) -> Self {
        if data.len() <= 64 {
            BufferRef::Inline(SmallVec::from_slice(&data))
        } else {
            BufferRef::External(data)
        }
    }

    pub fn from_slice(buffer: Arc<ZeroCopyBuffer>, offset: usize, len: usize) -> Self {
        if len <= 64 {
            // Small payloads are cheaper to copy inline than to keep the
            // whole pooled buffer alive.
            let data = buffer.get_slice(offset, len);
            BufferRef::Inline(SmallVec::from_slice(data))
        } else {
            BufferRef::Slice {
                buffer,
                offset,
                len,
            }
        }
    }

    pub fn as_bytes(&self) -> &[u8] {
        match self {
            BufferRef::Inline(sv) => sv.as_slice(),
            BufferRef::External(b) => b,
            BufferRef::Slice {
                buffer,
                offset,
                len,
            } => buffer.get_slice(*offset, *len),
        }
    }

    pub fn len(&self) -> usize {
        match self {
            BufferRef::Inline(sv) => sv.len(),
            BufferRef::External(b) => b.len(),
            BufferRef::Slice { len, .. } => *len,
        }
    }

    pub fn is_empty(&self) -> bool {
        self.len() == 0
    }

    pub fn to_bytes(&self) -> Bytes {
        match self {
            BufferRef::Inline(sv) => Bytes::copy_from_slice(sv.as_slice()),
            BufferRef::External(b) => b.clone(),
            BufferRef::Slice {
                buffer,
                offset,
                len,
            } => Bytes::copy_from_slice(buffer.get_slice(*offset, *len)),
        }
    }
}

impl AsRef<[u8]> for BufferRef {
    fn as_ref(&self) -> &[u8] {
        self.as_bytes()
    }
}

/// A fixed 64-byte inline buffer; `from_slice` silently truncates longer
/// input, so callers check the length before choosing inline storage.
#[derive(Debug, Clone)]
pub struct SmallVec {
    data: [u8; 64],
    len: u8,
}

impl SmallVec {
    pub fn new() -> Self {
        Self {
            data: [0u8; 64],
            len: 0,
        }
    }

    pub fn from_slice(slice: &[u8]) -> Self {
        let len = slice.len().min(64);
        let mut sv = Self::new();
        sv.data[..len].copy_from_slice(&slice[..len]);
        sv.len = len as u8;
        sv
    }

    pub fn as_slice(&self) -> &[u8] {
        &self.data[..self.len as usize]
    }

    pub fn len(&self) -> usize {
        self.len as usize
    }

    pub fn is_empty(&self) -> bool {
        self.len == 0
    }
}

impl Default for SmallVec {
    fn default() -> Self {
        Self::new()
    }
}

pub struct ZeroCopyProducer {
    buffer_pool: Arc<ZeroCopyBufferPool>,
    current_buffer: parking_lot::Mutex<Option<Arc<ZeroCopyBuffer>>>,
    // Interned topic names, so repeated sends share one `Arc<str>`.
    topic_cache: dashmap::DashMap<String, Arc<str>>,
    stats: ProducerStats,
}

impl ZeroCopyProducer {
    pub fn new(buffer_pool: Arc<ZeroCopyBufferPool>) -> Self {
        Self {
            buffer_pool,
            current_buffer: parking_lot::Mutex::new(None),
            topic_cache: dashmap::DashMap::new(),
            stats: ProducerStats::new(),
        }
    }

    pub fn with_defaults() -> Self {
        let pool = Arc::new(ZeroCopyBufferPool::new(DEFAULT_BUFFER_SIZE, 16));
        Self::new(pool)
    }

    fn intern_topic(&self, topic: &str) -> Arc<str> {
        if let Some(interned) = self.topic_cache.get(topic) {
            return interned.clone();
        }

        // A concurrent insert of the same topic is benign: one Arc replaces
        // the other in the map and the loser is eventually dropped.
        let interned: Arc<str> = Arc::from(topic);
        self.topic_cache.insert(topic.to_string(), interned.clone());
        interned
    }

    pub fn create_message(
        &self,
        topic: &str,
        partition: u32,
        key: Option<&[u8]>,
        value: &[u8],
    ) -> ZeroCopyMessage {
        self.stats.messages_created.fetch_add(1, Ordering::Relaxed);
        self.stats
            .bytes_written
            .fetch_add(value.len() as u64, Ordering::Relaxed);

        let topic = self.intern_topic(topic);
        let timestamp = std::time::SystemTime::now()
            .duration_since(std::time::UNIX_EPOCH)
            .unwrap_or_default()
            .as_millis() as i64;

        ZeroCopyMessage {
            topic,
            partition,
            key: key.map(BufferRef::from_bytes),
            value: BufferRef::from_bytes(value),
            headers: Vec::new(),
            timestamp,
        }
    }

    /// Like `create_message`, but takes ownership of `Bytes` so large values
    /// are shared rather than copied.
    pub fn create_message_from_bytes(
        &self,
        topic: &str,
        partition: u32,
        key: Option<Bytes>,
        value: Bytes,
    ) -> ZeroCopyMessage {
        self.stats.messages_created.fetch_add(1, Ordering::Relaxed);
        self.stats
            .bytes_written
            .fetch_add(value.len() as u64, Ordering::Relaxed);

        let topic = self.intern_topic(topic);
        let timestamp = std::time::SystemTime::now()
            .duration_since(std::time::UNIX_EPOCH)
            .unwrap_or_default()
            .as_millis() as i64;

        ZeroCopyMessage {
            topic,
            partition,
            key: key.map(BufferRef::from_external),
            value: BufferRef::from_external(value),
            headers: Vec::new(),
            timestamp,
        }
    }

    /// Bump-allocates `size` bytes, preferring the producer's current buffer
    /// and falling back to a fresh one from the pool. Returns the buffer and
    /// the offset of the reserved region.
    pub fn allocate(&self, size: usize) -> Option<(Arc<ZeroCopyBuffer>, usize)> {
        let mut guard = self.current_buffer.lock();

        if let Some(ref buffer) = *guard {
            if let Some(slice) = buffer.reserve(size) {
                let offset = slice.offset();
                // Forget the slice so its Drop never runs: the reservation's
                // reference is intentionally kept on behalf of the caller.
                std::mem::forget(slice);
                return Some((buffer.clone(), offset));
            }
        }

        let buffer = self.buffer_pool.acquire();
        if let Some(slice) = buffer.reserve(size) {
            let offset = slice.offset();
            std::mem::forget(slice);
            *guard = Some(buffer.clone());
            return Some((buffer, offset));
        }

        None
    }

    pub fn stats(&self) -> ProducerStatsSnapshot {
        ProducerStatsSnapshot {
            messages_created: self.stats.messages_created.load(Ordering::Relaxed),
            bytes_written: self.stats.bytes_written.load(Ordering::Relaxed),
            buffer_pool: self.buffer_pool.stats(),
        }
    }
}

struct ProducerStats {
    messages_created: AtomicU64,
    bytes_written: AtomicU64,
}

impl ProducerStats {
    fn new() -> Self {
        Self {
            messages_created: AtomicU64::new(0),
            bytes_written: AtomicU64::new(0),
        }
    }
}

#[derive(Debug, Clone)]
pub struct ProducerStatsSnapshot {
    pub messages_created: u64,
    pub bytes_written: u64,
    pub buffer_pool: BufferPoolStats,
}

pub struct ZeroCopyConsumer {
    read_buffer: parking_lot::Mutex<BytesMut>,
    stats: ConsumerStats,
}

impl ZeroCopyConsumer {
    pub fn new() -> Self {
        Self {
            read_buffer: parking_lot::Mutex::new(BytesMut::with_capacity(DEFAULT_BUFFER_SIZE)),
            stats: ConsumerStats::new(),
        }
    }

    /// Parses length-prefixed frames: a 4-byte big-endian body length,
    /// followed by the body (8-byte offset, 8-byte timestamp, value).
    /// Incomplete trailing frames are left unparsed.
    pub fn parse_messages(&self, data: Bytes) -> Vec<ConsumedMessage> {
        let mut messages = Vec::new();
        let mut offset = 0;

        while offset < data.len() {
            // Smallest possible frame: 4-byte length + 16-byte body header.
            if offset + 20 > data.len() {
                break;
            }

            let msg_len = u32::from_be_bytes([
                data[offset],
                data[offset + 1],
                data[offset + 2],
                data[offset + 3],
            ]) as usize;

            if offset + 4 + msg_len > data.len() {
                break;
            }

            // `Bytes::slice` shares the underlying allocation; no copy here.
            let msg_data = data.slice(offset + 4..offset + 4 + msg_len);

            if let Some(msg) = self.parse_single_message(msg_data) {
                messages.push(msg);
                self.stats.messages_consumed.fetch_add(1, Ordering::Relaxed);
            }

            offset += 4 + msg_len;
        }

        self.stats
            .bytes_read
            .fetch_add(offset as u64, Ordering::Relaxed);
        messages
    }

    fn parse_single_message(&self, data: Bytes) -> Option<ConsumedMessage> {
        if data.len() < 16 {
            return None;
        }

        let msg_offset = u64::from_be_bytes([
            data[0], data[1], data[2], data[3], data[4], data[5], data[6], data[7],
        ]);

        let timestamp = i64::from_be_bytes([
            data[8], data[9], data[10], data[11], data[12], data[13], data[14], data[15],
        ]);

        // The value shares the frame's allocation rather than copying it.
        let value = data.slice(16..);

        Some(ConsumedMessage {
            offset: msg_offset,
            timestamp,
            key: None,
            value,
        })
    }

    pub fn stats(&self) -> ConsumerStatsSnapshot {
        ConsumerStatsSnapshot {
            messages_consumed: self.stats.messages_consumed.load(Ordering::Relaxed),
            bytes_read: self.stats.bytes_read.load(Ordering::Relaxed),
        }
    }

    /// Stages `data` in the internal read buffer and returns an immutable
    /// handle. Note that `BytesMut::clone` copies, so this path is not
    /// copy-free.
    pub fn buffer_data(&self, data: &[u8]) -> Bytes {
        let mut buffer = self.read_buffer.lock();
        buffer.clear();
        buffer.extend_from_slice(data);
        buffer.clone().freeze()
    }
}

impl Default for ZeroCopyConsumer {
    fn default() -> Self {
        Self::new()
    }
}

struct ConsumerStats {
    messages_consumed: AtomicU64,
    bytes_read: AtomicU64,
}

impl ConsumerStats {
    fn new() -> Self {
        Self {
            messages_consumed: AtomicU64::new(0),
            bytes_read: AtomicU64::new(0),
        }
    }
}

#[derive(Debug, Clone)]
pub struct ConsumerStatsSnapshot {
    pub messages_consumed: u64,
    pub bytes_read: u64,
}

#[derive(Debug, Clone)]
pub struct ConsumedMessage {
    pub offset: u64,
    pub timestamp: i64,
    pub key: Option<Bytes>,
    pub value: Bytes,
}

impl ConsumedMessage {
    pub fn value_str(&self) -> Option<&str> {
        std::str::from_utf8(&self.value).ok()
    }

    pub fn key_str(&self) -> Option<&str> {
        self.key.as_ref().and_then(|k| std::str::from_utf8(k).ok())
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_zero_copy_buffer_basic() {
        let buffer = ZeroCopyBuffer::new(1024);
        assert_eq!(buffer.len(), 0);
        assert!(buffer.remaining() >= 1024);

        let slice = buffer.reserve(100).unwrap();
        assert_eq!(slice.len(), 100);
        assert_eq!(buffer.len(), 100);
    }

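    // A reservation that would exceed capacity must fail rather than hand
    // out an out-of-bounds region.
    #[test]
    fn test_reserve_exhaustion() {
        let buffer = ZeroCopyBuffer::new(128);

        let slice = buffer.reserve(128).unwrap();
        assert_eq!(slice.len(), 128);
        assert!(buffer.reserve(1).is_none());
    }
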
    #[test]
    fn test_zero_copy_buffer_write() {
        let buffer = ZeroCopyBuffer::new(1024);

        let mut slice = buffer.reserve(11).unwrap();
        slice.write(b"Hello World");

        assert_eq!(slice.as_bytes(), b"Hello World");
    }

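    // Exercises the CAS loop in `reserve`: concurrent reservations must get
    // disjoint regions, so every returned offset is unique.
    #[test]
    fn test_concurrent_reserve_disjoint() {
        let buffer = Arc::new(ZeroCopyBuffer::new(4096));
        let mut handles = Vec::new();

        for _ in 0..8 {
            let buf = Arc::clone(&buffer);
            handles.push(std::thread::spawn(move || {
                buf.reserve(64).expect("8 * 64 bytes fits in 4096").offset()
            }));
        }

        let mut offsets: Vec<usize> = handles.into_iter().map(|h| h.join().unwrap()).collect();
        offsets.sort_unstable();
        offsets.dedup();
        assert_eq!(offsets.len(), 8);
    }
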
    #[test]
    fn test_buffer_pool() {
        let pool = ZeroCopyBufferPool::new(1024, 4);
        let stats = pool.stats();
        assert_eq!(stats.total_created, 4);
        assert_eq!(stats.available, 4);

        let b1 = pool.acquire();
        let b2 = pool.acquire();

        let stats = pool.stats();
        assert_eq!(stats.in_use, 2);

        pool.release(b1);
        pool.release(b2);

        let stats = pool.stats();
        assert_eq!(stats.in_use, 0);
    }

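    // Releasing the sole handle returns the buffer to the pool reset, so the
    // next acquire sees an empty buffer.
    #[test]
    fn test_buffer_pool_reuse_resets() {
        let pool = ZeroCopyBufferPool::new(1024, 1);

        let buffer = pool.acquire();
        drop(buffer.reserve(10).unwrap());
        assert_eq!(buffer.len(), 10);

        pool.release(buffer);
        let recycled = pool.acquire();
        assert_eq!(recycled.len(), 0);
    }
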
    #[test]
    fn test_buffer_ref_inline() {
        let small_data = b"small";
        let buf_ref = BufferRef::from_bytes(small_data);

        match buf_ref {
            BufferRef::Inline(_) => {}
            _ => panic!("Expected inline storage for small data"),
        }

        assert_eq!(buf_ref.as_bytes(), small_data);
    }

    #[test]
    fn test_buffer_ref_external() {
        let large_data = vec![0u8; 100];
        let buf_ref = BufferRef::from_bytes(&large_data);

        match buf_ref {
            BufferRef::External(_) => {}
            _ => panic!("Expected external storage for large data"),
        }

        assert_eq!(buf_ref.len(), 100);
    }

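    // `from_slice` follows the same 64-byte rule: large regions stay as
    // refcounted slices into the pooled buffer, small ones are copied inline.
    #[test]
    fn test_buffer_ref_from_slice() {
        let buffer = Arc::new(ZeroCopyBuffer::new(1024));
        let mut slice = buffer.reserve(100).unwrap();
        slice.write(&[7u8; 100]);
        drop(slice);

        let large = BufferRef::from_slice(Arc::clone(&buffer), 0, 100);
        assert!(matches!(large, BufferRef::Slice { .. }));
        assert_eq!(large.as_bytes(), &[7u8; 100][..]);

        let small = BufferRef::from_slice(buffer, 0, 8);
        assert!(matches!(small, BufferRef::Inline(_)));
        assert_eq!(small.as_bytes(), &[7u8; 8][..]);
    }
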
    #[test]
    fn test_zero_copy_producer() {
        let producer = ZeroCopyProducer::with_defaults();

        let msg = producer.create_message("test-topic", 0, Some(b"key1"), b"value1");

        assert_eq!(&*msg.topic, "test-topic");
        assert_eq!(msg.partition, 0);
        assert_eq!(msg.key.unwrap().as_bytes(), b"key1");
        assert_eq!(msg.value.as_bytes(), b"value1");

        let stats = producer.stats();
        assert_eq!(stats.messages_created, 1);
    }

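    // `allocate` bump-allocates out of the producer's current pooled buffer,
    // so consecutive calls return the same buffer with increasing offsets.
    #[test]
    fn test_producer_allocate_offsets() {
        let producer = ZeroCopyProducer::with_defaults();

        let (buf1, off1) = producer.allocate(100).unwrap();
        let (buf2, off2) = producer.allocate(100).unwrap();

        assert_eq!(buf1.id(), buf2.id());
        assert_eq!(off1, 0);
        assert_eq!(off2, 100);
    }
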
    #[test]
    fn test_zero_copy_consumer() {
        let consumer = ZeroCopyConsumer::new();

        // One frame: a 4-byte length prefix (8 + 8 + 5 = 21 bytes of body),
        // then the 8-byte offset, 8-byte timestamp, and 5-byte value.
        let mut data = BytesMut::new();
        data.extend_from_slice(&21u32.to_be_bytes());
        data.extend_from_slice(&42u64.to_be_bytes());
        data.extend_from_slice(&1234567890i64.to_be_bytes());
        data.extend_from_slice(b"hello");

        let messages = consumer.parse_messages(data.freeze());

        assert_eq!(messages.len(), 1);
        assert_eq!(messages[0].offset, 42);
        assert_eq!(messages[0].timestamp, 1234567890);
        assert_eq!(&messages[0].value[..], b"hello");
    }

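    // A frame whose length prefix promises more bytes than are present must
    // be left unparsed rather than read out of bounds.
    #[test]
    fn test_consumer_truncated_frame() {
        let consumer = ZeroCopyConsumer::new();

        let mut data = BytesMut::new();
        data.extend_from_slice(&100u32.to_be_bytes()); // claims a 100-byte body
        data.extend_from_slice(&[0u8; 20]); // only 20 bytes follow

        assert!(consumer.parse_messages(data.freeze()).is_empty());
    }
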
    #[test]
    fn test_small_vec() {
        let sv = SmallVec::from_slice(b"test data");
        assert_eq!(sv.as_slice(), b"test data");
        assert_eq!(sv.len(), 9);
    }

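    // `SmallVec::from_slice` truncates at its 64-byte inline capacity, which
    // is why `BufferRef` only inlines payloads of 64 bytes or fewer.
    #[test]
    fn test_small_vec_truncates() {
        let sv = SmallVec::from_slice(&[1u8; 100]);
        assert_eq!(sv.len(), 64);
        assert_eq!(sv.as_slice(), &[1u8; 64][..]);
    }
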
    #[test]
    fn test_topic_interning() {
        let producer = ZeroCopyProducer::with_defaults();

        let msg1 = producer.create_message("topic-a", 0, None, b"v1");
        let msg2 = producer.create_message("topic-a", 0, None, b"v2");
        let msg3 = producer.create_message("topic-b", 0, None, b"v3");

        assert!(Arc::ptr_eq(&msg1.topic, &msg2.topic));
        assert!(!Arc::ptr_eq(&msg1.topic, &msg3.topic));
    }
}