// rivven_core/src/zero_copy.rs

1//! Zero-Copy Producer/Consumer API
2//!
3//! Provides high-performance data paths that eliminate unnecessary memory copies:
4//! - **ZeroCopyBuffer**: Pre-allocated buffers with reference counting
5//! - **BufferSlice**: View into buffer without copying
6//! - **ZeroCopyProducer**: Produces messages without copying payload
7//! - **ZeroCopyConsumer**: Consumes messages with zero-copy access
8//!
9//! # Performance Characteristics
10//!
11//! - Eliminates 2-3 copies per message on hot path
12//! - Uses memory-mapped I/O for disk access
13//! - Reference-counted buffers for safe sharing
14//! - Cache-line aligned for optimal CPU performance
15//!
16//! # Safety Requirements
17//!
18//! The `ZeroCopyBuffer` type relies on `Arc` for reference counting.
19//! When using `BufferSlice`, the following safety invariants must be maintained:
20//!
21//! 1. **Lifetime**: A `ZeroCopyBuffer` must outlive all `BufferSlice` instances
22//!    created from it. The recommended pattern is to use `Arc<ZeroCopyBuffer>`
23//!    via `ZeroCopyBufferPool`.
24//!
//! 2. **Reference Counting**: each `BufferSlice` holds an `Arc<ZeroCopyBuffer>`,
//!    so dropping a slice decrements the buffer's reference count automatically;
//!    the buffer is freed only once no slices (or other owners) exist.
28//!
29//! 3. **Thread Safety**: While individual operations are atomic, the caller must
30//!    ensure exclusive access to mutable slice regions.
31//!
32//! # Recommended Usage
33//!
34//! Use `ZeroCopyBufferPool` to manage buffer lifecycle safely:
35//!
36//! ```rust,ignore
37//! let pool = ZeroCopyBufferPool::new(256 * 1024, 16);
38//! let buffer = pool.acquire();  // Returns Arc<ZeroCopyBuffer>
39//! // buffer is safely reference-counted
40//! ```
41
42use bytes::{Bytes, BytesMut};
43use std::alloc::{alloc, dealloc, Layout};
44use std::ops::Deref;
45use std::ptr::NonNull;
46use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
47use std::sync::Arc;
48
/// Cache line size for alignment (64 bytes on most modern CPUs)
const CACHE_LINE_SIZE: usize = 64;

/// Default buffer size (256 KB - optimal for most workloads).
/// Used by `ZeroCopyProducer::with_defaults` and `ZeroCopyConsumer::new`.
const DEFAULT_BUFFER_SIZE: usize = 256 * 1024;
54
/// A zero-copy buffer with memory pooling
///
/// Reference counting is handled by `Arc<ZeroCopyBuffer>` — the internal
/// `ref_count` field has been removed in favour of a single source of truth.
#[derive(Debug)]
pub struct ZeroCopyBuffer {
    /// Raw pointer to the buffer data (cache-line aligned heap allocation)
    data: NonNull<u8>,
    /// Total capacity of the buffer (rounded up to a cache-line multiple)
    capacity: usize,
    /// Current write position; bumped atomically for lock-free reservation
    write_pos: AtomicUsize,
    /// Buffer ID for tracking
    id: u64,
    /// Layout used for allocation (needed for deallocation in `Drop`)
    layout: Layout,
}

// Safety: the raw `data` pointer refers to an allocation owned exclusively by
// this struct. The only interior mutation is the atomic `write_pos` bump plus
// writes through `get_mut_slice`, whose exclusivity contract is explicitly
// delegated to the caller — so moving/sharing across threads is sound.
unsafe impl Send for ZeroCopyBuffer {}
unsafe impl Sync for ZeroCopyBuffer {}
76
77impl ZeroCopyBuffer {
78    /// Create a new zero-copy buffer with the given capacity
79    pub fn new(capacity: usize) -> Self {
80        Self::with_id(capacity, 0)
81    }
82
83    /// Create a new zero-copy buffer with custom ID
84    pub fn with_id(capacity: usize, id: u64) -> Self {
85        // Align to cache line for optimal performance
86        let aligned_capacity = (capacity + CACHE_LINE_SIZE - 1) & !(CACHE_LINE_SIZE - 1);
87        let layout =
88            Layout::from_size_align(aligned_capacity, CACHE_LINE_SIZE).expect("Invalid layout");
89
90        // Safety: We're allocating with a valid layout
91        let data = unsafe {
92            let ptr = alloc(layout);
93            if ptr.is_null() {
94                std::alloc::handle_alloc_error(layout);
95            }
96            NonNull::new_unchecked(ptr)
97        };
98
99        Self {
100            data,
101            capacity: aligned_capacity,
102            write_pos: AtomicUsize::new(0),
103            id,
104            layout,
105        }
106    }
107
108    /// Get a slice of the buffer for writing.
109    /// Requires an Arc reference to safely create a BufferSlice.
110    pub fn reserve(self: &Arc<Self>, len: usize) -> Option<BufferSlice> {
111        loop {
112            let current = self.write_pos.load(Ordering::Acquire);
113            let new_pos = current + len;
114
115            if new_pos > self.capacity {
116                return None;
117            }
118
119            if self
120                .write_pos
121                .compare_exchange_weak(current, new_pos, Ordering::AcqRel, Ordering::Relaxed)
122                .is_ok()
123            {
124                return Some(BufferSlice::new(Arc::clone(self), current, len));
125            }
126            // CAS failed, retry
127            std::hint::spin_loop();
128        }
129    }
130
131    /// Get a mutable slice for the reserved range
132    /// # Safety
133    /// Caller must ensure exclusive access to this range.
134    /// The mutable borrow from immutable self is intentional - this is interior
135    /// mutability via raw pointers with atomic coordination for lock-free access.
136    #[allow(clippy::mut_from_ref)]
137    pub unsafe fn get_mut_slice(&self, offset: usize, len: usize) -> &mut [u8] {
138        assert!(
139            offset + len <= self.capacity,
140            "get_mut_slice out of bounds: offset={} len={} capacity={}",
141            offset,
142            len,
143            self.capacity
144        );
145        std::slice::from_raw_parts_mut(self.data.as_ptr().add(offset), len)
146    }
147
148    /// Get an immutable slice
149    pub fn get_slice(&self, offset: usize, len: usize) -> &[u8] {
150        let write_pos = self.write_pos.load(Ordering::Acquire);
151        assert!(
152            offset + len <= write_pos,
153            "get_slice out of bounds: offset={} len={} write_pos={}",
154            offset,
155            len,
156            write_pos
157        );
158        unsafe { std::slice::from_raw_parts(self.data.as_ptr().add(offset), len) }
159    }
160
161    /// Get the current write position
162    pub fn len(&self) -> usize {
163        self.write_pos.load(Ordering::Acquire)
164    }
165
166    /// Check if buffer is empty
167    pub fn is_empty(&self) -> bool {
168        self.len() == 0
169    }
170
171    /// Get remaining capacity
172    pub fn remaining(&self) -> usize {
173        self.capacity - self.len()
174    }
175
176    /// Get total capacity
177    pub fn capacity(&self) -> usize {
178        self.capacity
179    }
180
181    /// Get buffer ID
182    pub fn id(&self) -> u64 {
183        self.id
184    }
185
186    /// Reset buffer for reuse.
187    ///
188    /// Resets the write position to 0. The caller must ensure exclusive ownership
189    /// (e.g. `Arc::strong_count() == 1`) before calling.
190    pub fn reset(&self) -> bool {
191        self.write_pos.store(0, Ordering::Release);
192        true
193    }
194
195    /// Compatibility shim — ref counting is now handled by `Arc`.
196    #[deprecated(note = "Use Arc::clone instead")]
197    pub fn add_ref(&self) {}
198
199    /// Allocate `len` bytes and advance the write position, returning the start offset.
200    /// Does NOT create a `BufferSlice` — the caller is expected
201    /// to hold an `Arc<ZeroCopyBuffer>` which keeps the buffer alive.
202    pub fn try_allocate(&self, len: usize) -> Option<usize> {
203        loop {
204            let current = self.write_pos.load(Ordering::Acquire);
205            let new_pos = current + len;
206
207            if new_pos > self.capacity {
208                return None;
209            }
210
211            if self
212                .write_pos
213                .compare_exchange_weak(current, new_pos, Ordering::AcqRel, Ordering::Relaxed)
214                .is_ok()
215            {
216                return Some(current);
217            }
218            std::hint::spin_loop();
219        }
220    }
221
222    /// Compatibility shim — ref counting is now handled by `Arc`.
223    #[deprecated(note = "Use Arc::strong_count instead")]
224    pub fn release(&self) -> bool {
225        false
226    }
227
228    /// Compatibility shim — ref counting is now handled by `Arc`.
229    #[deprecated(note = "Use Arc::strong_count instead")]
230    pub fn ref_count(&self) -> u32 {
231        0
232    }
233
234    /// Convert entire written portion to Bytes (zero-copy if possible)
235    pub fn freeze(&self) -> Bytes {
236        let len = self.len();
237        if len == 0 {
238            return Bytes::new();
239        }
240        // This does copy, but we could implement a custom Bytes wrapper
241        // that holds a reference to this buffer for true zero-copy
242        Bytes::copy_from_slice(self.get_slice(0, len))
243    }
244}
245
impl Drop for ZeroCopyBuffer {
    fn drop(&mut self) {
        // Safety: `data` was allocated in `with_id` with exactly `self.layout`,
        // and Drop runs at most once, so this matching dealloc is sound.
        unsafe {
            dealloc(self.data.as_ptr(), self.layout);
        }
    }
}
254
255/// A slice view into a ZeroCopyBuffer
256/// Holds an `Arc` to the underlying buffer for safe, reference-counted access.
257#[derive(Debug, Clone)]
258pub struct BufferSlice {
259    buffer: Arc<ZeroCopyBuffer>,
260    offset: usize,
261    len: usize,
262}
263
264impl BufferSlice {
265    /// Create a BufferSlice from an Arc reference
266    pub fn new(buffer: Arc<ZeroCopyBuffer>, offset: usize, len: usize) -> Self {
267        Self {
268            buffer,
269            offset,
270            len,
271        }
272    }
273
274    /// Get the slice as bytes
275    pub fn as_bytes(&self) -> &[u8] {
276        self.buffer.get_slice(self.offset, self.len)
277    }
278
279    /// Get a mutable slice for writing
280    /// # Safety
281    /// Caller must ensure exclusive access to this range
282    pub unsafe fn as_mut_bytes(&mut self) -> &mut [u8] {
283        self.buffer.get_mut_slice(self.offset, self.len)
284    }
285
286    /// Write data into this slice
287    pub fn write(&mut self, data: &[u8]) -> usize {
288        let write_len = data.len().min(self.len);
289        unsafe {
290            let dest = self.as_mut_bytes();
291            dest[..write_len].copy_from_slice(&data[..write_len]);
292        }
293        write_len
294    }
295
296    /// Get the length of this slice
297    pub fn len(&self) -> usize {
298        self.len
299    }
300
301    /// Check if slice is empty
302    pub fn is_empty(&self) -> bool {
303        self.len == 0
304    }
305
306    /// Get offset within buffer
307    pub fn offset(&self) -> usize {
308        self.offset
309    }
310
311    /// Convert to Bytes (copies the data)
312    pub fn to_bytes(&self) -> Bytes {
313        Bytes::copy_from_slice(self.as_bytes())
314    }
315}
316
317impl Deref for BufferSlice {
318    type Target = [u8];
319
320    fn deref(&self) -> &Self::Target {
321        self.as_bytes()
322    }
323}
324
325impl AsRef<[u8]> for BufferSlice {
326    fn as_ref(&self) -> &[u8] {
327        self.as_bytes()
328    }
329}
330
/// Pool of zero-copy buffers for efficient allocation
///
/// Buffers are recycled through a bounded channel acting as a free list;
/// when the free list is empty, `acquire` allocates a fresh buffer instead.
pub struct ZeroCopyBufferPool {
    /// Free buffers available for use (sender side of the free list)
    free_buffers: crossbeam_channel::Sender<Arc<ZeroCopyBuffer>>,
    /// Receiver for getting free buffers (receiver side of the free list)
    buffer_receiver: crossbeam_channel::Receiver<Arc<ZeroCopyBuffer>>,
    /// Buffer size in bytes for every buffer created by this pool
    buffer_size: usize,
    /// Next buffer ID (monotonically increasing)
    next_id: AtomicU64,
    /// Total buffers created (pre-allocated + on demand)
    total_created: AtomicU64,
    /// Buffers currently in use (checked out via `acquire`)
    in_use: AtomicU64,
}
346
347impl ZeroCopyBufferPool {
348    /// Create a new buffer pool
349    pub fn new(buffer_size: usize, initial_count: usize) -> Self {
350        let (tx, rx) = crossbeam_channel::bounded(initial_count * 2);
351
352        let pool = Self {
353            free_buffers: tx,
354            buffer_receiver: rx,
355            buffer_size,
356            next_id: AtomicU64::new(0),
357            total_created: AtomicU64::new(0),
358            in_use: AtomicU64::new(0),
359        };
360
361        // Pre-allocate buffers
362        for _ in 0..initial_count {
363            let id = pool.next_id.fetch_add(1, Ordering::Relaxed);
364            let buffer = Arc::new(ZeroCopyBuffer::with_id(buffer_size, id));
365            pool.total_created.fetch_add(1, Ordering::Relaxed);
366            let _ = pool.free_buffers.try_send(buffer);
367        }
368
369        pool
370    }
371
372    /// Get a buffer from the pool (or create new one)
373    pub fn acquire(&self) -> Arc<ZeroCopyBuffer> {
374        match self.buffer_receiver.try_recv() {
375            Ok(buffer) => {
376                // Try to reset the buffer
377                if Arc::strong_count(&buffer) == 1 {
378                    buffer.reset();
379                }
380                self.in_use.fetch_add(1, Ordering::Relaxed);
381                buffer
382            }
383            Err(_) => {
384                // Create new buffer
385                let id = self.next_id.fetch_add(1, Ordering::Relaxed);
386                let buffer = Arc::new(ZeroCopyBuffer::with_id(self.buffer_size, id));
387                self.total_created.fetch_add(1, Ordering::Relaxed);
388                self.in_use.fetch_add(1, Ordering::Relaxed);
389                buffer
390            }
391        }
392    }
393
394    /// Return a buffer to the pool
395    pub fn release(&self, buffer: Arc<ZeroCopyBuffer>) {
396        self.in_use.fetch_sub(1, Ordering::Relaxed);
397
398        // Only return to pool if we're the only holder
399        if Arc::strong_count(&buffer) == 1 {
400            buffer.reset();
401            let _ = self.free_buffers.try_send(buffer);
402        }
403    }
404
405    /// Get pool statistics
406    pub fn stats(&self) -> BufferPoolStats {
407        BufferPoolStats {
408            buffer_size: self.buffer_size,
409            total_created: self.total_created.load(Ordering::Relaxed),
410            in_use: self.in_use.load(Ordering::Relaxed),
411            available: self.buffer_receiver.len() as u64,
412        }
413    }
414}
415
/// Point-in-time snapshot of `ZeroCopyBufferPool` counters.
#[derive(Debug, Clone)]
pub struct BufferPoolStats {
    /// Size in bytes of each pooled buffer
    pub buffer_size: usize,
    /// Total buffers ever created by the pool
    pub total_created: u64,
    /// Buffers currently handed out via `acquire`
    pub in_use: u64,
    /// Buffers currently sitting in the free list
    pub available: u64,
}
423
/// Zero-copy message for production
/// Holds references to data (inline, `Bytes`, or buffer slices) without copying
#[derive(Debug)]
pub struct ZeroCopyMessage {
    /// Topic name (interned so repeated topics share one allocation)
    pub topic: Arc<str>,
    /// Partition ID
    pub partition: u32,
    /// Message key (optional, zero-copy reference)
    pub key: Option<BufferRef>,
    /// Message value (zero-copy reference)
    pub value: BufferRef,
    /// Message headers as (name, value) pairs
    pub headers: Vec<(Arc<str>, BufferRef)>,
    /// Timestamp (milliseconds since epoch)
    pub timestamp: i64,
}
441
/// Reference to data in a buffer (zero-copy)
#[derive(Debug, Clone)]
pub enum BufferRef {
    /// Inline small data (64 bytes or fewer — see `from_bytes`; no heap allocation)
    Inline(SmallVec),
    /// Reference to external reference-counted bytes
    External(Bytes),
    /// Reference to zero-copy buffer slice
    Slice {
        /// Buffer kept alive by this reference
        buffer: Arc<ZeroCopyBuffer>,
        /// Start of the region within the buffer
        offset: usize,
        /// Region length in bytes
        len: usize,
    },
}
456
457impl BufferRef {
458    /// Create from bytes
459    pub fn from_bytes(data: &[u8]) -> Self {
460        if data.len() <= 64 {
461            BufferRef::Inline(SmallVec::from_slice(data))
462        } else {
463            BufferRef::External(Bytes::copy_from_slice(data))
464        }
465    }
466
467    /// Create from Bytes (zero-copy)
468    pub fn from_external(data: Bytes) -> Self {
469        if data.len() <= 64 {
470            BufferRef::Inline(SmallVec::from_slice(&data))
471        } else {
472            BufferRef::External(data)
473        }
474    }
475
476    /// Create from buffer slice
477    pub fn from_slice(buffer: Arc<ZeroCopyBuffer>, offset: usize, len: usize) -> Self {
478        if len <= 64 {
479            let data = buffer.get_slice(offset, len);
480            BufferRef::Inline(SmallVec::from_slice(data))
481        } else {
482            BufferRef::Slice {
483                buffer,
484                offset,
485                len,
486            }
487        }
488    }
489
490    /// Get as bytes slice
491    pub fn as_bytes(&self) -> &[u8] {
492        match self {
493            BufferRef::Inline(sv) => sv.as_slice(),
494            BufferRef::External(b) => b,
495            BufferRef::Slice {
496                buffer,
497                offset,
498                len,
499            } => buffer.get_slice(*offset, *len),
500        }
501    }
502
503    /// Get length
504    pub fn len(&self) -> usize {
505        match self {
506            BufferRef::Inline(sv) => sv.len(),
507            BufferRef::External(b) => b.len(),
508            BufferRef::Slice { len, .. } => *len,
509        }
510    }
511
512    /// Check if empty
513    pub fn is_empty(&self) -> bool {
514        self.len() == 0
515    }
516
517    /// Convert to Bytes
518    pub fn to_bytes(&self) -> Bytes {
519        match self {
520            BufferRef::Inline(sv) => Bytes::copy_from_slice(sv.as_slice()),
521            BufferRef::External(b) => b.clone(),
522            BufferRef::Slice {
523                buffer,
524                offset,
525                len,
526            } => Bytes::copy_from_slice(buffer.get_slice(*offset, *len)),
527        }
528    }
529}
530
531impl AsRef<[u8]> for BufferRef {
532    fn as_ref(&self) -> &[u8] {
533        self.as_bytes()
534    }
535}
536
/// Inline byte storage for small payloads (up to 64 bytes, no heap
/// allocation). Used by `BufferRef::Inline` for messages below the
/// inline threshold.
#[derive(Debug, Clone)]
pub struct SmallVec {
    // Fixed backing storage; only the first `len` bytes are meaningful.
    data: [u8; 64],
    // Number of valid bytes in `data`.
    len: u8,
}

impl SmallVec {
    /// Create an empty inline vector (zeroed storage, length 0).
    pub fn new() -> Self {
        SmallVec {
            data: [0; 64],
            len: 0,
        }
    }

    /// Copy up to 64 bytes from `slice`; longer input is silently truncated
    /// (callers such as `BufferRef::from_bytes` only pass 64 bytes or fewer).
    pub fn from_slice(slice: &[u8]) -> Self {
        let mut out = Self::new();
        let take = slice.len().min(out.data.len());
        out.data[..take].copy_from_slice(&slice[..take]);
        out.len = take as u8;
        out
    }

    /// View of the valid bytes.
    pub fn as_slice(&self) -> &[u8] {
        &self.data[..usize::from(self.len)]
    }

    /// Number of valid bytes.
    pub fn len(&self) -> usize {
        usize::from(self.len)
    }

    /// True when no bytes are stored.
    pub fn is_empty(&self) -> bool {
        self.len == 0
    }
}

impl Default for SmallVec {
    /// Same as [`SmallVec::new`].
    fn default() -> Self {
        Self::new()
    }
}
578
/// Zero-copy producer for high-throughput message production
pub struct ZeroCopyProducer {
    /// Buffer pool for allocating message buffers
    buffer_pool: Arc<ZeroCopyBufferPool>,
    /// Current write buffer used by `allocate` (replaced when it fills up)
    current_buffer: parking_lot::Mutex<Option<Arc<ZeroCopyBuffer>>>,
    /// Interned topic names (maps topic string -> shared `Arc<str>`)
    topic_cache: dashmap::DashMap<String, Arc<str>>,
    /// Counters for messages/bytes produced
    stats: ProducerStats,
}
590
591impl ZeroCopyProducer {
592    /// Create a new zero-copy producer
593    pub fn new(buffer_pool: Arc<ZeroCopyBufferPool>) -> Self {
594        Self {
595            buffer_pool,
596            current_buffer: parking_lot::Mutex::new(None),
597            topic_cache: dashmap::DashMap::new(),
598            stats: ProducerStats::new(),
599        }
600    }
601
602    /// Create a new zero-copy producer with default buffer pool
603    pub fn with_defaults() -> Self {
604        let pool = Arc::new(ZeroCopyBufferPool::new(DEFAULT_BUFFER_SIZE, 16));
605        Self::new(pool)
606    }
607
608    /// Intern a topic name for efficient storage
609    fn intern_topic(&self, topic: &str) -> Arc<str> {
610        if let Some(interned) = self.topic_cache.get(topic) {
611            return interned.clone();
612        }
613
614        let interned: Arc<str> = Arc::from(topic);
615        self.topic_cache.insert(topic.to_string(), interned.clone());
616        interned
617    }
618
619    /// Create a message with zero-copy value
620    pub fn create_message(
621        &self,
622        topic: &str,
623        partition: u32,
624        key: Option<&[u8]>,
625        value: &[u8],
626    ) -> ZeroCopyMessage {
627        self.stats.messages_created.fetch_add(1, Ordering::Relaxed);
628        self.stats
629            .bytes_written
630            .fetch_add(value.len() as u64, Ordering::Relaxed);
631
632        let topic = self.intern_topic(topic);
633        let timestamp = std::time::SystemTime::now()
634            .duration_since(std::time::UNIX_EPOCH)
635            .unwrap_or_default()
636            .as_millis() as i64;
637
638        ZeroCopyMessage {
639            topic,
640            partition,
641            key: key.map(BufferRef::from_bytes),
642            value: BufferRef::from_bytes(value),
643            headers: Vec::new(),
644            timestamp,
645        }
646    }
647
648    /// Create a message from existing Bytes (true zero-copy)
649    pub fn create_message_from_bytes(
650        &self,
651        topic: &str,
652        partition: u32,
653        key: Option<Bytes>,
654        value: Bytes,
655    ) -> ZeroCopyMessage {
656        self.stats.messages_created.fetch_add(1, Ordering::Relaxed);
657        self.stats
658            .bytes_written
659            .fetch_add(value.len() as u64, Ordering::Relaxed);
660
661        let topic = self.intern_topic(topic);
662        let timestamp = std::time::SystemTime::now()
663            .duration_since(std::time::UNIX_EPOCH)
664            .unwrap_or_default()
665            .as_millis() as i64;
666
667        ZeroCopyMessage {
668            topic,
669            partition,
670            key: key.map(BufferRef::from_external),
671            value: BufferRef::from_external(value),
672            headers: Vec::new(),
673            timestamp,
674        }
675    }
676
677    /// Allocate space in current buffer and return (buffer, offset) for direct writing.
678    ///
679    /// The caller is responsible for writing into the buffer at the returned offset.
680    /// Unlike `reserve()`, this does not create a `BufferSlice` — it increments the
681    /// buffer's write position atomically and returns the raw offset.
682    pub fn allocate(&self, size: usize) -> Option<(Arc<ZeroCopyBuffer>, usize)> {
683        let mut guard = self.current_buffer.lock();
684
685        // Try to reserve in current buffer
686        if let Some(ref buffer) = *guard {
687            if let Some(offset) = buffer.try_allocate(size) {
688                return Some((buffer.clone(), offset));
689            }
690        }
691
692        // Need a new buffer
693        let buffer = self.buffer_pool.acquire();
694        if let Some(offset) = buffer.try_allocate(size) {
695            *guard = Some(buffer.clone());
696            return Some((buffer, offset));
697        }
698
699        None
700    }
701
702    /// Get producer statistics
703    pub fn stats(&self) -> ProducerStatsSnapshot {
704        ProducerStatsSnapshot {
705            messages_created: self.stats.messages_created.load(Ordering::Relaxed),
706            bytes_written: self.stats.bytes_written.load(Ordering::Relaxed),
707            buffer_pool: self.buffer_pool.stats(),
708        }
709    }
710}
711
/// Internal producer counters, updated with relaxed atomics on the hot path.
struct ProducerStats {
    // Messages created via `create_message` / `create_message_from_bytes`.
    messages_created: AtomicU64,
    // Total value bytes recorded across created messages.
    bytes_written: AtomicU64,
}

impl ProducerStats {
    /// All counters start at zero.
    fn new() -> Self {
        ProducerStats {
            messages_created: AtomicU64::default(),
            bytes_written: AtomicU64::default(),
        }
    }
}
725
/// Point-in-time snapshot of producer statistics.
#[derive(Debug, Clone)]
pub struct ProducerStatsSnapshot {
    /// Messages created since the producer was constructed
    pub messages_created: u64,
    /// Total value bytes recorded across created messages
    pub bytes_written: u64,
    /// Statistics of the underlying buffer pool
    pub buffer_pool: BufferPoolStats,
}
732
/// Zero-copy consumer for high-throughput message consumption
pub struct ZeroCopyConsumer {
    /// Read buffer for batch reads (reused by `buffer_data`)
    read_buffer: parking_lot::Mutex<BytesMut>,
    /// Counters for messages/bytes consumed
    stats: ConsumerStats,
}
740
741impl ZeroCopyConsumer {
742    /// Create a new zero-copy consumer
743    pub fn new() -> Self {
744        Self {
745            read_buffer: parking_lot::Mutex::new(BytesMut::with_capacity(DEFAULT_BUFFER_SIZE)),
746            stats: ConsumerStats::new(),
747        }
748    }
749
750    /// Parse messages from a bytes buffer without copying
751    pub fn parse_messages(&self, data: Bytes) -> Vec<ConsumedMessage> {
752        let mut messages = Vec::new();
753        let mut offset = 0;
754
755        while offset < data.len() {
756            // Minimum message header: 4 (len) + 8 (offset) + 8 (timestamp) = 20 bytes
757            if offset + 20 > data.len() {
758                break;
759            }
760
761            // Read message length
762            let msg_len = u32::from_be_bytes([
763                data[offset],
764                data[offset + 1],
765                data[offset + 2],
766                data[offset + 3],
767            ]) as usize;
768
769            if offset + 4 + msg_len > data.len() {
770                break;
771            }
772
773            // Create a slice of the message data (zero-copy)
774            let msg_data = data.slice(offset + 4..offset + 4 + msg_len);
775
776            if let Some(msg) = self.parse_single_message(msg_data) {
777                messages.push(msg);
778                self.stats.messages_consumed.fetch_add(1, Ordering::Relaxed);
779            }
780
781            offset += 4 + msg_len;
782        }
783
784        self.stats
785            .bytes_read
786            .fetch_add(offset as u64, Ordering::Relaxed);
787        messages
788    }
789
790    /// Parse a single message from bytes
791    fn parse_single_message(&self, data: Bytes) -> Option<ConsumedMessage> {
792        if data.len() < 16 {
793            return None;
794        }
795
796        let msg_offset = u64::from_be_bytes([
797            data[0], data[1], data[2], data[3], data[4], data[5], data[6], data[7],
798        ]);
799
800        let timestamp = i64::from_be_bytes([
801            data[8], data[9], data[10], data[11], data[12], data[13], data[14], data[15],
802        ]);
803
804        // Rest is the value (simplified - real impl would have key + headers)
805        let value = data.slice(16..);
806
807        Some(ConsumedMessage {
808            offset: msg_offset,
809            timestamp,
810            key: None,
811            value,
812        })
813    }
814
815    /// Get consumer statistics
816    pub fn stats(&self) -> ConsumerStatsSnapshot {
817        ConsumerStatsSnapshot {
818            messages_consumed: self.stats.messages_consumed.load(Ordering::Relaxed),
819            bytes_read: self.stats.bytes_read.load(Ordering::Relaxed),
820        }
821    }
822
823    /// Copy data into internal buffer for processing (useful for network reads)
824    pub fn buffer_data(&self, data: &[u8]) -> Bytes {
825        let mut buffer = self.read_buffer.lock();
826        buffer.clear();
827        buffer.extend_from_slice(data);
828        buffer.clone().freeze()
829    }
830}
831
impl Default for ZeroCopyConsumer {
    /// Equivalent to [`ZeroCopyConsumer::new`].
    fn default() -> Self {
        Self::new()
    }
}
837
/// Internal consumer counters, updated with relaxed atomics.
struct ConsumerStats {
    // Frames successfully parsed by `parse_messages`.
    messages_consumed: AtomicU64,
    // Bytes scanned by `parse_messages`.
    bytes_read: AtomicU64,
}

impl ConsumerStats {
    /// All counters start at zero.
    fn new() -> Self {
        ConsumerStats {
            messages_consumed: AtomicU64::default(),
            bytes_read: AtomicU64::default(),
        }
    }
}
851
/// Point-in-time snapshot of consumer statistics.
#[derive(Debug, Clone)]
pub struct ConsumerStatsSnapshot {
    /// Messages successfully parsed
    pub messages_consumed: u64,
    /// Bytes scanned by `parse_messages`
    pub bytes_read: u64,
}
857
/// A consumed message with zero-copy data access
///
/// `key` and `value` are `Bytes` handles sharing the original receive buffer.
#[derive(Debug, Clone)]
pub struct ConsumedMessage {
    /// Message offset
    pub offset: u64,
    /// Timestamp (milliseconds since epoch)
    pub timestamp: i64,
    /// Message key (zero-copy)
    pub key: Option<Bytes>,
    /// Message value (zero-copy)
    pub value: Bytes,
}
870
impl ConsumedMessage {
    /// Get the value as a string slice; returns `None` if the value is not
    /// valid UTF-8. No copy is made either way.
    pub fn value_str(&self) -> Option<&str> {
        std::str::from_utf8(&self.value).ok()
    }

    /// Get the key as a string slice; `None` if absent or not valid UTF-8.
    pub fn key_str(&self) -> Option<&str> {
        self.key.as_ref().and_then(|k| std::str::from_utf8(k).ok())
    }
}
882
#[cfg(test)]
mod tests {
    use super::*;

    // Reservation advances the buffer's write position and yields a slice
    // of the requested length.
    #[test]
    fn test_zero_copy_buffer_basic() {
        let buffer = Arc::new(ZeroCopyBuffer::new(1024));
        assert_eq!(buffer.len(), 0);
        assert!(buffer.remaining() >= 1024);

        // Reserve space
        let slice = buffer.reserve(100).unwrap();
        assert_eq!(slice.len(), 100);
        assert_eq!(buffer.len(), 100);
    }

    // Data written through a BufferSlice is readable back through the same
    // slice view.
    #[test]
    fn test_zero_copy_buffer_write() {
        let buffer = Arc::new(ZeroCopyBuffer::new(1024));

        let mut slice = buffer.reserve(11).unwrap();
        slice.write(b"Hello World");

        assert_eq!(slice.as_bytes(), b"Hello World");
    }

    // The pool pre-allocates buffers and tracks acquire/release counts.
    #[test]
    fn test_buffer_pool() {
        let pool = ZeroCopyBufferPool::new(1024, 4);
        let stats = pool.stats();
        assert_eq!(stats.total_created, 4);
        assert_eq!(stats.available, 4);

        // Acquire buffers
        let b1 = pool.acquire();
        let b2 = pool.acquire();

        let stats = pool.stats();
        assert_eq!(stats.in_use, 2);

        // Release buffers
        pool.release(b1);
        pool.release(b2);

        let stats = pool.stats();
        assert_eq!(stats.in_use, 0);
    }

    // Payloads of 64 bytes or fewer are stored inline (no heap allocation).
    #[test]
    fn test_buffer_ref_inline() {
        let small_data = b"small";
        let buf_ref = BufferRef::from_bytes(small_data);

        match buf_ref {
            BufferRef::Inline(_) => {}
            _ => panic!("Expected inline storage for small data"),
        }

        assert_eq!(buf_ref.as_bytes(), small_data);
    }

    // Payloads larger than 64 bytes fall back to external Bytes storage.
    #[test]
    fn test_buffer_ref_external() {
        let large_data = vec![0u8; 100];
        let buf_ref = BufferRef::from_bytes(&large_data);

        match buf_ref {
            BufferRef::External(_) => {}
            _ => panic!("Expected external storage for large data"),
        }

        assert_eq!(buf_ref.len(), 100);
    }

    // End-to-end message creation: topic interning, key/value storage and
    // statistics accounting.
    #[test]
    fn test_zero_copy_producer() {
        let producer = ZeroCopyProducer::with_defaults();

        let msg = producer.create_message("test-topic", 0, Some(b"key1"), b"value1");

        assert_eq!(&*msg.topic, "test-topic");
        assert_eq!(msg.partition, 0);
        assert_eq!(msg.key.unwrap().as_bytes(), b"key1");
        assert_eq!(msg.value.as_bytes(), b"value1");

        let stats = producer.stats();
        assert_eq!(stats.messages_created, 1);
    }

    // Parses one length-prefixed frame: 4-byte length, 8-byte offset,
    // 8-byte timestamp, then the value payload.
    #[test]
    fn test_zero_copy_consumer() {
        let consumer = ZeroCopyConsumer::new();

        // Create a simple message format
        let mut data = BytesMut::new();

        // Message length (16 + 5 = 21 bytes)
        data.extend_from_slice(&21u32.to_be_bytes());
        // Offset
        data.extend_from_slice(&42u64.to_be_bytes());
        // Timestamp
        data.extend_from_slice(&1234567890i64.to_be_bytes());
        // Value
        data.extend_from_slice(b"hello");

        let messages = consumer.parse_messages(data.freeze());

        assert_eq!(messages.len(), 1);
        assert_eq!(messages[0].offset, 42);
        assert_eq!(messages[0].timestamp, 1234567890);
        assert_eq!(&messages[0].value[..], b"hello");
    }

    // SmallVec round-trips data within its 64-byte inline capacity.
    #[test]
    fn test_small_vec() {
        let sv = SmallVec::from_slice(b"test data");
        assert_eq!(sv.as_slice(), b"test data");
        assert_eq!(sv.len(), 9);
    }

    // Interning returns the same Arc for repeated topics and distinct Arcs
    // for different topics.
    #[test]
    fn test_topic_interning() {
        let producer = ZeroCopyProducer::with_defaults();

        let msg1 = producer.create_message("topic-a", 0, None, b"v1");
        let msg2 = producer.create_message("topic-a", 0, None, b"v2");
        let msg3 = producer.create_message("topic-b", 0, None, b"v3");

        // Same topic should share the same Arc
        assert!(Arc::ptr_eq(&msg1.topic, &msg2.topic));
        // Different topics should not
        assert!(!Arc::ptr_eq(&msg1.topic, &msg3.topic));
    }
}