// rivven_core/zero_copy.rs

//! Zero-Copy Producer/Consumer API
//!
//! Provides high-performance data paths that eliminate unnecessary memory copies:
//! - **ZeroCopyBuffer**: Pre-allocated buffers with reference counting
//! - **BufferSlice**: View into a buffer without copying
//! - **ZeroCopyProducer**: Produces messages without copying payloads
//! - **ZeroCopyConsumer**: Consumes messages with zero-copy access
//!
//! Performance characteristics:
//! - Eliminates 2-3 copies per message on the hot path
//! - Designed to pair with memory-mapped I/O for disk access
//! - Reference-counted buffers for safe sharing
//! - Cache-line aligned for optimal CPU performance
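//!
//! # Example
//!
//! A minimal usage sketch (paths assume this module is exposed as
//! `rivven_core::zero_copy`):
//!
//! ```ignore
//! use rivven_core::zero_copy::{ZeroCopyBufferPool, ZeroCopyConsumer, ZeroCopyProducer};
//! use std::sync::Arc;
//!
//! // Producer side: pooled buffers back the message payloads.
//! let pool = Arc::new(ZeroCopyBufferPool::new(256 * 1024, 16));
//! let producer = ZeroCopyProducer::new(pool);
//! let msg = producer.create_message("events", 0, Some(b"k"), b"payload");
//! assert_eq!(msg.value.as_bytes(), b"payload");
//!
//! // Consumer side: parse length-prefixed frames without copying payloads.
//! let consumer = ZeroCopyConsumer::new();
//! let messages = consumer.parse_messages(bytes::Bytes::new()); // empty input -> no messages
//! assert!(messages.is_empty());
//! ```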

use bytes::{Bytes, BytesMut};
use std::alloc::{alloc, dealloc, Layout};
use std::ops::Deref;
use std::ptr::NonNull;
use std::sync::atomic::{AtomicU32, AtomicU64, AtomicUsize, Ordering};
use std::sync::Arc;

/// Cache line size for alignment (64 bytes on most modern CPUs)
const CACHE_LINE_SIZE: usize = 64;

/// Default buffer size (256 KB - optimal for most workloads)
const DEFAULT_BUFFER_SIZE: usize = 256 * 1024;

/// A zero-copy buffer with reference counting and memory pooling
#[derive(Debug)]
pub struct ZeroCopyBuffer {
    /// Raw pointer to the buffer data
    data: NonNull<u8>,
    /// Total capacity of the buffer
    capacity: usize,
    /// Current write position
    write_pos: AtomicUsize,
    /// Reference count for safe sharing
    ref_count: AtomicU32,
    /// Buffer ID for tracking
    id: u64,
    /// Layout used for allocation (needed for deallocation)
    layout: Layout,
}

// Safety: ZeroCopyBuffer uses atomic operations for thread safety
unsafe impl Send for ZeroCopyBuffer {}
unsafe impl Sync for ZeroCopyBuffer {}

impl ZeroCopyBuffer {
    /// Create a new zero-copy buffer with the given capacity
    pub fn new(capacity: usize) -> Self {
        Self::with_id(capacity, 0)
    }

    /// Create a new zero-copy buffer with a custom ID
    pub fn with_id(capacity: usize, id: u64) -> Self {
        // Round the capacity up to a cache-line multiple for optimal performance
        let aligned_capacity = (capacity + CACHE_LINE_SIZE - 1) & !(CACHE_LINE_SIZE - 1);
        let layout =
            Layout::from_size_align(aligned_capacity, CACHE_LINE_SIZE).expect("Invalid layout");

        // Safety: We're allocating with a valid layout
        let data = unsafe {
            let ptr = alloc(layout);
            if ptr.is_null() {
                std::alloc::handle_alloc_error(layout);
            }
            NonNull::new_unchecked(ptr)
        };

        Self {
            data,
            capacity: aligned_capacity,
            write_pos: AtomicUsize::new(0),
            ref_count: AtomicU32::new(1),
            id,
            layout,
        }
    }

    /// Reserve a slice of the buffer for writing.
    /// Returns `None` if there is not enough space.
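    ///
    /// A minimal sketch of the reserve-then-write flow:
    ///
    /// ```ignore
    /// let buffer = ZeroCopyBuffer::new(1024);
    /// // Reserve 5 bytes; the returned slice views that range of the buffer.
    /// let mut slice = buffer.reserve(5).expect("capacity available");
    /// slice.write(b"hello");
    /// assert_eq!(slice.as_bytes(), b"hello");
    /// // Reserving more than the remaining capacity yields None.
    /// assert!(buffer.reserve(10_000).is_none());
    /// ```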
    pub fn reserve(&self, len: usize) -> Option<BufferSlice> {
        loop {
            let current = self.write_pos.load(Ordering::Acquire);
            let new_pos = current + len;

            if new_pos > self.capacity {
                return None;
            }

            if self
                .write_pos
                .compare_exchange_weak(current, new_pos, Ordering::AcqRel, Ordering::Relaxed)
                .is_ok()
            {
                self.ref_count.fetch_add(1, Ordering::Relaxed);

                return Some(BufferSlice {
                    buffer: self as *const ZeroCopyBuffer,
                    offset: current,
                    len,
                });
            }
            // CAS failed, retry
            std::hint::spin_loop();
        }
    }

    /// Get a mutable slice for the reserved range
    /// # Safety
    /// Caller must ensure exclusive access to this range.
    /// The mutable borrow from immutable self is intentional - this is interior
    /// mutability via raw pointers with atomic coordination for lock-free access.
    #[allow(clippy::mut_from_ref)]
    pub unsafe fn get_mut_slice(&self, offset: usize, len: usize) -> &mut [u8] {
        debug_assert!(offset + len <= self.capacity);
        std::slice::from_raw_parts_mut(self.data.as_ptr().add(offset), len)
    }

    /// Get an immutable slice
    pub fn get_slice(&self, offset: usize, len: usize) -> &[u8] {
        debug_assert!(offset + len <= self.write_pos.load(Ordering::Acquire));
        unsafe { std::slice::from_raw_parts(self.data.as_ptr().add(offset), len) }
    }

    /// Get the current write position
    pub fn len(&self) -> usize {
        self.write_pos.load(Ordering::Acquire)
    }

    /// Check if the buffer is empty
    pub fn is_empty(&self) -> bool {
        self.len() == 0
    }

    /// Get the remaining capacity
    pub fn remaining(&self) -> usize {
        self.capacity - self.len()
    }

    /// Get the total capacity
    pub fn capacity(&self) -> usize {
        self.capacity
    }

    /// Get the buffer ID
    pub fn id(&self) -> u64 {
        self.id
    }

    /// Reset the buffer for reuse (succeeds only when `ref_count == 1`)
    pub fn reset(&self) -> bool {
        if self.ref_count.load(Ordering::Acquire) == 1 {
            self.write_pos.store(0, Ordering::Release);
            true
        } else {
            false
        }
    }

    /// Increment the reference count
    pub fn add_ref(&self) {
        self.ref_count.fetch_add(1, Ordering::Relaxed);
    }

    /// Decrement the reference count; returns true if this was the last reference
    pub fn release(&self) -> bool {
        self.ref_count.fetch_sub(1, Ordering::AcqRel) == 1
    }

    /// Get the current reference count
    pub fn ref_count(&self) -> u32 {
        self.ref_count.load(Ordering::Relaxed)
    }

    /// Convert the written portion to `Bytes` (currently copies)
    pub fn freeze(&self) -> Bytes {
        let len = self.len();
        if len == 0 {
            return Bytes::new();
        }
        // This copies; a custom `Bytes` wrapper holding a reference to this
        // buffer would make it truly zero-copy.
        Bytes::copy_from_slice(self.get_slice(0, len))
    }
}

impl Drop for ZeroCopyBuffer {
    fn drop(&mut self) {
        // Safety: We allocated with this layout, and we're the owner
        unsafe {
            dealloc(self.data.as_ptr(), self.layout);
        }
    }
}

/// A slice view into a `ZeroCopyBuffer`.
/// Does not copy data; just holds an offset and a length.
#[derive(Debug)]
pub struct BufferSlice {
    buffer: *const ZeroCopyBuffer,
    offset: usize,
    len: usize,
}

// Safety: the buffer's atomic reservation scheme gives each slice exclusive
// access to its own range, so slices can move and be shared across threads.
unsafe impl Send for BufferSlice {}
unsafe impl Sync for BufferSlice {}

impl BufferSlice {
    /// Get the slice as bytes
    pub fn as_bytes(&self) -> &[u8] {
        // Safety: the buffer must remain valid while the slice exists
        unsafe { &*self.buffer }.get_slice(self.offset, self.len)
    }

    /// Get a mutable slice for writing
    /// # Safety
    /// Caller must ensure exclusive access to this range
    pub unsafe fn as_mut_bytes(&mut self) -> &mut [u8] {
        (*self.buffer).get_mut_slice(self.offset, self.len)
    }

    /// Write data into this slice, returning the number of bytes written
    pub fn write(&mut self, data: &[u8]) -> usize {
        let write_len = data.len().min(self.len);
        unsafe {
            let dest = self.as_mut_bytes();
            dest[..write_len].copy_from_slice(&data[..write_len]);
        }
        write_len
    }

    /// Get the length of this slice
    pub fn len(&self) -> usize {
        self.len
    }

    /// Check if the slice is empty
    pub fn is_empty(&self) -> bool {
        self.len == 0
    }

    /// Get the offset within the buffer
    pub fn offset(&self) -> usize {
        self.offset
    }

    /// Convert to `Bytes` (copies the data)
    pub fn to_bytes(&self) -> Bytes {
        Bytes::copy_from_slice(self.as_bytes())
    }
}

impl Drop for BufferSlice {
    fn drop(&mut self) {
        // Release our reference to the buffer
        unsafe {
            (*self.buffer).release();
        }
    }
}

impl Deref for BufferSlice {
    type Target = [u8];

    fn deref(&self) -> &Self::Target {
        self.as_bytes()
    }
}

impl AsRef<[u8]> for BufferSlice {
    fn as_ref(&self) -> &[u8] {
        self.as_bytes()
    }
}

/// Pool of zero-copy buffers for efficient allocation
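///
/// A minimal acquire/release sketch:
///
/// ```ignore
/// let pool = ZeroCopyBufferPool::new(64 * 1024, 4); // four pre-allocated 64 KB buffers
/// let buf = pool.acquire(); // reuses a pooled buffer when one is free
/// assert!(buf.capacity() >= 64 * 1024);
/// pool.release(buf); // returned to the pool for reuse
/// assert_eq!(pool.stats().in_use, 0);
/// ```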
pub struct ZeroCopyBufferPool {
    /// Free buffers available for use
    free_buffers: crossbeam_channel::Sender<Arc<ZeroCopyBuffer>>,
    /// Receiver for getting free buffers
    buffer_receiver: crossbeam_channel::Receiver<Arc<ZeroCopyBuffer>>,
    /// Buffer size
    buffer_size: usize,
    /// Next buffer ID
    next_id: AtomicU64,
    /// Total buffers created
    total_created: AtomicU64,
    /// Buffers currently in use
    in_use: AtomicU64,
}

impl ZeroCopyBufferPool {
    /// Create a new buffer pool
    pub fn new(buffer_size: usize, initial_count: usize) -> Self {
        let (tx, rx) = crossbeam_channel::bounded(initial_count * 2);

        let pool = Self {
            free_buffers: tx,
            buffer_receiver: rx,
            buffer_size,
            next_id: AtomicU64::new(0),
            total_created: AtomicU64::new(0),
            in_use: AtomicU64::new(0),
        };

        // Pre-allocate buffers
        for _ in 0..initial_count {
            let id = pool.next_id.fetch_add(1, Ordering::Relaxed);
            let buffer = Arc::new(ZeroCopyBuffer::with_id(buffer_size, id));
            pool.total_created.fetch_add(1, Ordering::Relaxed);
            let _ = pool.free_buffers.try_send(buffer);
        }

        pool
    }

    /// Get a buffer from the pool (or create a new one)
    pub fn acquire(&self) -> Arc<ZeroCopyBuffer> {
        match self.buffer_receiver.try_recv() {
            Ok(buffer) => {
                // Reset the buffer if we are its only holder
                if Arc::strong_count(&buffer) == 1 {
                    buffer.reset();
                }
                self.in_use.fetch_add(1, Ordering::Relaxed);
                buffer
            }
            Err(_) => {
                // Pool is empty: create a new buffer
                let id = self.next_id.fetch_add(1, Ordering::Relaxed);
                let buffer = Arc::new(ZeroCopyBuffer::with_id(self.buffer_size, id));
                self.total_created.fetch_add(1, Ordering::Relaxed);
                self.in_use.fetch_add(1, Ordering::Relaxed);
                buffer
            }
        }
    }

    /// Return a buffer to the pool
    pub fn release(&self, buffer: Arc<ZeroCopyBuffer>) {
        self.in_use.fetch_sub(1, Ordering::Relaxed);

        // Only return the buffer to the pool if we hold the sole Arc and the
        // buffer's internal reference count allows a reset.
        if Arc::strong_count(&buffer) == 1 && buffer.reset() {
            let _ = self.free_buffers.try_send(buffer);
        }
    }

    /// Get pool statistics
    pub fn stats(&self) -> BufferPoolStats {
        BufferPoolStats {
            buffer_size: self.buffer_size,
            total_created: self.total_created.load(Ordering::Relaxed),
            in_use: self.in_use.load(Ordering::Relaxed),
            available: self.buffer_receiver.len() as u64,
        }
    }
}

#[derive(Debug, Clone)]
pub struct BufferPoolStats {
    pub buffer_size: usize,
    pub total_created: u64,
    pub in_use: u64,
    pub available: u64,
}

/// Zero-copy message for production.
/// Holds references to data without copying.
#[derive(Debug)]
pub struct ZeroCopyMessage {
    /// Topic name (interned for efficiency)
    pub topic: Arc<str>,
    /// Partition ID
    pub partition: u32,
    /// Message key (optional, zero-copy reference)
    pub key: Option<BufferRef>,
    /// Message value (zero-copy reference)
    pub value: BufferRef,
    /// Message headers
    pub headers: Vec<(Arc<str>, BufferRef)>,
    /// Timestamp (milliseconds since epoch)
    pub timestamp: i64,
}

/// Reference to data in a buffer (zero-copy)
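///
/// Payloads of 64 bytes or fewer are stored inline; larger payloads keep a
/// reference. A quick sketch:
///
/// ```ignore
/// let small = BufferRef::from_bytes(b"tiny");     // -> BufferRef::Inline
/// let large = BufferRef::from_bytes(&[0u8; 100]); // -> BufferRef::External
/// assert_eq!(small.len(), 4);
/// assert_eq!(large.len(), 100);
/// ```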
#[derive(Debug, Clone)]
pub enum BufferRef {
    /// Inline small data (<= 64 bytes)
    Inline(SmallVec),
    /// Reference to an external buffer
    External(Bytes),
    /// Reference to a zero-copy buffer slice
    Slice {
        buffer: Arc<ZeroCopyBuffer>,
        offset: usize,
        len: usize,
    },
}

impl BufferRef {
    /// Create from a byte slice (copies into inline or external storage)
    pub fn from_bytes(data: &[u8]) -> Self {
        if data.len() <= 64 {
            BufferRef::Inline(SmallVec::from_slice(data))
        } else {
            BufferRef::External(Bytes::copy_from_slice(data))
        }
    }

    /// Create from `Bytes` (zero-copy for payloads over 64 bytes)
    pub fn from_external(data: Bytes) -> Self {
        if data.len() <= 64 {
            BufferRef::Inline(SmallVec::from_slice(&data))
        } else {
            BufferRef::External(data)
        }
    }

    /// Create from a buffer slice
    pub fn from_slice(buffer: Arc<ZeroCopyBuffer>, offset: usize, len: usize) -> Self {
        if len <= 64 {
            let data = buffer.get_slice(offset, len);
            BufferRef::Inline(SmallVec::from_slice(data))
        } else {
            BufferRef::Slice {
                buffer,
                offset,
                len,
            }
        }
    }

    /// Get as a byte slice
    pub fn as_bytes(&self) -> &[u8] {
        match self {
            BufferRef::Inline(sv) => sv.as_slice(),
            BufferRef::External(b) => b,
            BufferRef::Slice {
                buffer,
                offset,
                len,
            } => buffer.get_slice(*offset, *len),
        }
    }

    /// Get the length
    pub fn len(&self) -> usize {
        match self {
            BufferRef::Inline(sv) => sv.len(),
            BufferRef::External(b) => b.len(),
            BufferRef::Slice { len, .. } => *len,
        }
    }

    /// Check if empty
    pub fn is_empty(&self) -> bool {
        self.len() == 0
    }

    /// Convert to `Bytes`
    pub fn to_bytes(&self) -> Bytes {
        match self {
            BufferRef::Inline(sv) => Bytes::copy_from_slice(sv.as_slice()),
            BufferRef::External(b) => b.clone(),
            BufferRef::Slice {
                buffer,
                offset,
                len,
            } => Bytes::copy_from_slice(buffer.get_slice(*offset, *len)),
        }
    }
}

impl AsRef<[u8]> for BufferRef {
    fn as_ref(&self) -> &[u8] {
        self.as_bytes()
    }
}

/// Small inline vector (avoids heap allocation for payloads up to 64 bytes)
#[derive(Debug, Clone)]
pub struct SmallVec {
    data: [u8; 64],
    len: u8,
}

impl SmallVec {
    pub fn new() -> Self {
        Self {
            data: [0u8; 64],
            len: 0,
        }
    }

    /// Copy up to 64 bytes from `slice`; longer input is truncated
    pub fn from_slice(slice: &[u8]) -> Self {
        let len = slice.len().min(64);
        let mut sv = Self::new();
        sv.data[..len].copy_from_slice(&slice[..len]);
        sv.len = len as u8;
        sv
    }

    pub fn as_slice(&self) -> &[u8] {
        &self.data[..self.len as usize]
    }

    pub fn len(&self) -> usize {
        self.len as usize
    }

    pub fn is_empty(&self) -> bool {
        self.len == 0
    }
}

impl Default for SmallVec {
    fn default() -> Self {
        Self::new()
    }
}

/// Zero-copy producer for high-throughput message production
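///
/// Typical usage (a sketch; repeated topic names share one interned `Arc<str>`):
///
/// ```ignore
/// let producer = ZeroCopyProducer::with_defaults();
/// let msg = producer.create_message("orders", 0, Some(b"key"), b"value");
/// assert_eq!(&*msg.topic, "orders");
///
/// // For payloads already held as `Bytes` (zero-copy for values over 64 bytes):
/// let payload = bytes::Bytes::from_static(b"v2");
/// let msg2 = producer.create_message_from_bytes("orders", 0, None, payload);
/// ```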
pub struct ZeroCopyProducer {
    /// Buffer pool for allocating message buffers
    buffer_pool: Arc<ZeroCopyBufferPool>,
    /// Current write buffer
    current_buffer: parking_lot::Mutex<Option<Arc<ZeroCopyBuffer>>>,
    /// Interned topic names
    topic_cache: dashmap::DashMap<String, Arc<str>>,
    /// Statistics
    stats: ProducerStats,
}

impl ZeroCopyProducer {
    /// Create a new zero-copy producer
    pub fn new(buffer_pool: Arc<ZeroCopyBufferPool>) -> Self {
        Self {
            buffer_pool,
            current_buffer: parking_lot::Mutex::new(None),
            topic_cache: dashmap::DashMap::new(),
            stats: ProducerStats::new(),
        }
    }

    /// Create a new zero-copy producer with a default buffer pool
    pub fn with_defaults() -> Self {
        let pool = Arc::new(ZeroCopyBufferPool::new(DEFAULT_BUFFER_SIZE, 16));
        Self::new(pool)
    }

    /// Intern a topic name for efficient storage.
    /// A concurrent insert may briefly produce duplicate `Arc`s for the same
    /// topic; the race is benign because later lookups converge on one entry.
    fn intern_topic(&self, topic: &str) -> Arc<str> {
        if let Some(interned) = self.topic_cache.get(topic) {
            return interned.clone();
        }

        let interned: Arc<str> = Arc::from(topic);
        self.topic_cache.insert(topic.to_string(), interned.clone());
        interned
    }

    /// Create a message by copying the key and value into message-owned
    /// storage (use `create_message_from_bytes` for a true zero-copy path)
    pub fn create_message(
        &self,
        topic: &str,
        partition: u32,
        key: Option<&[u8]>,
        value: &[u8],
    ) -> ZeroCopyMessage {
        self.stats.messages_created.fetch_add(1, Ordering::Relaxed);
        self.stats
            .bytes_written
            .fetch_add(value.len() as u64, Ordering::Relaxed);

        let topic = self.intern_topic(topic);
        let timestamp = std::time::SystemTime::now()
            .duration_since(std::time::UNIX_EPOCH)
            .unwrap_or_default()
            .as_millis() as i64;

        ZeroCopyMessage {
            topic,
            partition,
            key: key.map(BufferRef::from_bytes),
            value: BufferRef::from_bytes(value),
            headers: Vec::new(),
            timestamp,
        }
    }

    /// Create a message from existing `Bytes` (true zero-copy for payloads
    /// over 64 bytes; smaller payloads are copied inline)
    pub fn create_message_from_bytes(
        &self,
        topic: &str,
        partition: u32,
        key: Option<Bytes>,
        value: Bytes,
    ) -> ZeroCopyMessage {
        self.stats.messages_created.fetch_add(1, Ordering::Relaxed);
        self.stats
            .bytes_written
            .fetch_add(value.len() as u64, Ordering::Relaxed);

        let topic = self.intern_topic(topic);
        let timestamp = std::time::SystemTime::now()
            .duration_since(std::time::UNIX_EPOCH)
            .unwrap_or_default()
            .as_millis() as i64;

        ZeroCopyMessage {
            topic,
            partition,
            key: key.map(BufferRef::from_external),
            value: BufferRef::from_external(value),
            headers: Vec::new(),
            timestamp,
        }
    }

    /// Allocate space in the current buffer, returning the buffer and the write offset
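    ///
    /// A direct-write sketch; the `unsafe` is sound because `allocate`
    /// reserved the returned offset range exclusively for this caller:
    ///
    /// ```ignore
    /// let producer = ZeroCopyProducer::with_defaults();
    /// if let Some((buffer, offset)) = producer.allocate(5) {
    ///     // Safety: [offset, offset + 5) was reserved exclusively for us.
    ///     unsafe { buffer.get_mut_slice(offset, 5) }.copy_from_slice(b"hello");
    ///     assert_eq!(buffer.get_slice(offset, 5), b"hello");
    /// }
    /// ```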
    pub fn allocate(&self, size: usize) -> Option<(Arc<ZeroCopyBuffer>, usize)> {
        let mut guard = self.current_buffer.lock();

        // Try to reserve in the current buffer
        if let Some(ref buffer) = *guard {
            if let Some(slice) = buffer.reserve(size) {
                let offset = slice.offset();
                std::mem::forget(slice); // Don't release the ref; the caller takes over the range
                return Some((buffer.clone(), offset));
            }
        }

        // Current buffer is full (or absent): start a new one
        let buffer = self.buffer_pool.acquire();
        if let Some(slice) = buffer.reserve(size) {
            let offset = slice.offset();
            std::mem::forget(slice);
            *guard = Some(buffer.clone());
            return Some((buffer, offset));
        }

        // The requested size exceeds even a fresh buffer's capacity
        None
    }

    /// Get producer statistics
    pub fn stats(&self) -> ProducerStatsSnapshot {
        ProducerStatsSnapshot {
            messages_created: self.stats.messages_created.load(Ordering::Relaxed),
            bytes_written: self.stats.bytes_written.load(Ordering::Relaxed),
            buffer_pool: self.buffer_pool.stats(),
        }
    }
}

struct ProducerStats {
    messages_created: AtomicU64,
    bytes_written: AtomicU64,
}

impl ProducerStats {
    fn new() -> Self {
        Self {
            messages_created: AtomicU64::new(0),
            bytes_written: AtomicU64::new(0),
        }
    }
}

#[derive(Debug, Clone)]
pub struct ProducerStatsSnapshot {
    pub messages_created: u64,
    pub bytes_written: u64,
    pub buffer_pool: BufferPoolStats,
}

/// Zero-copy consumer for high-throughput message consumption
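///
/// Expects length-prefixed frames: a 4-byte big-endian length, then an 8-byte
/// offset, an 8-byte timestamp, and the raw value. A parsing sketch:
///
/// ```ignore
/// use bytes::{BufMut, BytesMut};
///
/// let mut frame = BytesMut::new();
/// frame.put_u32(16 + 5);        // length: 16-byte header + 5-byte value
/// frame.put_u64(42);            // message offset
/// frame.put_i64(1_234_567_890); // timestamp (ms since epoch)
/// frame.put_slice(b"hello");    // value
///
/// let consumer = ZeroCopyConsumer::new();
/// let msgs = consumer.parse_messages(frame.freeze());
/// assert_eq!(msgs[0].offset, 42);
/// assert_eq!(&msgs[0].value[..], b"hello");
/// ```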
pub struct ZeroCopyConsumer {
    /// Read buffer for batch reads
    read_buffer: parking_lot::Mutex<BytesMut>,
    /// Statistics
    stats: ConsumerStats,
}

impl ZeroCopyConsumer {
    /// Create a new zero-copy consumer
    pub fn new() -> Self {
        Self {
            read_buffer: parking_lot::Mutex::new(BytesMut::with_capacity(DEFAULT_BUFFER_SIZE)),
            stats: ConsumerStats::new(),
        }
    }

    /// Parse messages from a bytes buffer without copying
    pub fn parse_messages(&self, data: Bytes) -> Vec<ConsumedMessage> {
        let mut messages = Vec::new();
        let mut offset = 0;

        while offset < data.len() {
            // Minimum frame size: 4 (len) + 8 (offset) + 8 (timestamp) = 20 bytes
            if offset + 20 > data.len() {
                break;
            }

            // Read message length
            let msg_len = u32::from_be_bytes([
                data[offset],
                data[offset + 1],
                data[offset + 2],
                data[offset + 3],
            ]) as usize;

            if offset + 4 + msg_len > data.len() {
                break;
            }

            // Create a slice of the message data (zero-copy)
            let msg_data = data.slice(offset + 4..offset + 4 + msg_len);

            if let Some(msg) = self.parse_single_message(msg_data) {
                messages.push(msg);
                self.stats.messages_consumed.fetch_add(1, Ordering::Relaxed);
            }

            offset += 4 + msg_len;
        }

        self.stats
            .bytes_read
            .fetch_add(offset as u64, Ordering::Relaxed);
        messages
    }

    /// Parse a single message from bytes
    fn parse_single_message(&self, data: Bytes) -> Option<ConsumedMessage> {
        if data.len() < 16 {
            return None;
        }

        let msg_offset = u64::from_be_bytes([
            data[0], data[1], data[2], data[3], data[4], data[5], data[6], data[7],
        ]);

        let timestamp = i64::from_be_bytes([
            data[8], data[9], data[10], data[11], data[12], data[13], data[14], data[15],
        ]);

        // The rest is the value (simplified - a real impl would carry key + headers)
        let value = data.slice(16..);

        Some(ConsumedMessage {
            offset: msg_offset,
            timestamp,
            key: None,
            value,
        })
    }

    /// Get consumer statistics
    pub fn stats(&self) -> ConsumerStatsSnapshot {
        ConsumerStatsSnapshot {
            messages_consumed: self.stats.messages_consumed.load(Ordering::Relaxed),
            bytes_read: self.stats.bytes_read.load(Ordering::Relaxed),
        }
    }

    /// Copy data into the internal buffer for processing (useful for network reads)
    pub fn buffer_data(&self, data: &[u8]) -> Bytes {
        let mut buffer = self.read_buffer.lock();
        buffer.clear();
        buffer.extend_from_slice(data);
        // `split().freeze()` hands the filled bytes to the caller without a
        // second copy; the internal buffer regrows lazily on the next call.
        buffer.split().freeze()
    }
}

impl Default for ZeroCopyConsumer {
    fn default() -> Self {
        Self::new()
    }
}

struct ConsumerStats {
    messages_consumed: AtomicU64,
    bytes_read: AtomicU64,
}

impl ConsumerStats {
    fn new() -> Self {
        Self {
            messages_consumed: AtomicU64::new(0),
            bytes_read: AtomicU64::new(0),
        }
    }
}

#[derive(Debug, Clone)]
pub struct ConsumerStatsSnapshot {
    pub messages_consumed: u64,
    pub bytes_read: u64,
}

/// A consumed message with zero-copy data access
#[derive(Debug, Clone)]
pub struct ConsumedMessage {
    /// Message offset
    pub offset: u64,
    /// Timestamp (milliseconds since epoch)
    pub timestamp: i64,
    /// Message key (zero-copy)
    pub key: Option<Bytes>,
    /// Message value (zero-copy)
    pub value: Bytes,
}

impl ConsumedMessage {
    /// Get the value as a string (`None` if not valid UTF-8)
    pub fn value_str(&self) -> Option<&str> {
        std::str::from_utf8(&self.value).ok()
    }

    /// Get the key as a string (`None` if absent or not valid UTF-8)
    pub fn key_str(&self) -> Option<&str> {
        self.key.as_ref().and_then(|k| std::str::from_utf8(k).ok())
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_zero_copy_buffer_basic() {
        let buffer = ZeroCopyBuffer::new(1024);
        assert_eq!(buffer.len(), 0);
        assert!(buffer.remaining() >= 1024);

        // Reserve space
        let slice = buffer.reserve(100).unwrap();
        assert_eq!(slice.len(), 100);
        assert_eq!(buffer.len(), 100);
    }

    #[test]
    fn test_zero_copy_buffer_write() {
        let buffer = ZeroCopyBuffer::new(1024);

        let mut slice = buffer.reserve(11).unwrap();
        slice.write(b"Hello World");

        assert_eq!(slice.as_bytes(), b"Hello World");
    }

    #[test]
    fn test_buffer_pool() {
        let pool = ZeroCopyBufferPool::new(1024, 4);
        let stats = pool.stats();
        assert_eq!(stats.total_created, 4);
        assert_eq!(stats.available, 4);

        // Acquire buffers
        let b1 = pool.acquire();
        let b2 = pool.acquire();

        let stats = pool.stats();
        assert_eq!(stats.in_use, 2);

        // Release buffers
        pool.release(b1);
        pool.release(b2);

        let stats = pool.stats();
        assert_eq!(stats.in_use, 0);
    }

    #[test]
    fn test_buffer_ref_inline() {
        let small_data = b"small";
        let buf_ref = BufferRef::from_bytes(small_data);

        match buf_ref {
            BufferRef::Inline(_) => {}
            _ => panic!("Expected inline storage for small data"),
        }

        assert_eq!(buf_ref.as_bytes(), small_data);
    }

    #[test]
    fn test_buffer_ref_external() {
        let large_data = vec![0u8; 100];
        let buf_ref = BufferRef::from_bytes(&large_data);

        match buf_ref {
            BufferRef::External(_) => {}
            _ => panic!("Expected external storage for large data"),
        }

        assert_eq!(buf_ref.len(), 100);
    }

    #[test]
    fn test_zero_copy_producer() {
        let producer = ZeroCopyProducer::with_defaults();

        let msg = producer.create_message("test-topic", 0, Some(b"key1"), b"value1");

        assert_eq!(&*msg.topic, "test-topic");
        assert_eq!(msg.partition, 0);
        assert_eq!(msg.key.unwrap().as_bytes(), b"key1");
        assert_eq!(msg.value.as_bytes(), b"value1");

        let stats = producer.stats();
        assert_eq!(stats.messages_created, 1);
    }

    #[test]
    fn test_zero_copy_consumer() {
        let consumer = ZeroCopyConsumer::new();

        // Create a simple message format
        let mut data = BytesMut::new();

        // Message length (16 + 5 = 21 bytes)
        data.extend_from_slice(&21u32.to_be_bytes());
        // Offset
        data.extend_from_slice(&42u64.to_be_bytes());
        // Timestamp
        data.extend_from_slice(&1234567890i64.to_be_bytes());
        // Value
        data.extend_from_slice(b"hello");

        let messages = consumer.parse_messages(data.freeze());

        assert_eq!(messages.len(), 1);
        assert_eq!(messages[0].offset, 42);
        assert_eq!(messages[0].timestamp, 1234567890);
        assert_eq!(&messages[0].value[..], b"hello");
    }

    #[test]
    fn test_small_vec() {
        let sv = SmallVec::from_slice(b"test data");
        assert_eq!(sv.as_slice(), b"test data");
        assert_eq!(sv.len(), 9);
    }

    #[test]
    fn test_topic_interning() {
        let producer = ZeroCopyProducer::with_defaults();

        let msg1 = producer.create_message("topic-a", 0, None, b"v1");
        let msg2 = producer.create_message("topic-a", 0, None, b"v2");
        let msg3 = producer.create_message("topic-b", 0, None, b"v3");

        // Same topic should share the same Arc
        assert!(Arc::ptr_eq(&msg1.topic, &msg2.topic));
        // Different topics should not
        assert!(!Arc::ptr_eq(&msg1.topic, &msg3.topic));
    }
}
964}