turbomcp_core/
zero_copy.rs

1//! Zero-copy message processing for ultra-high performance
2//!
//! This module provides zero-copy message handling built on `bytes::Bytes`:
//! payloads are shared by reference count instead of being copied, keeping
//! memory overhead low. (Some operations, such as serialization, still allocate.)
5
6use bytes::{BufMut, Bytes, BytesMut};
7use serde::{Deserialize, Serialize};
8use serde_json::value::RawValue;
9use std::fmt;
10use std::path::Path;
11use std::sync::Arc;
12use uuid::Uuid;
13
14use crate::error::{Error, Result};
15use crate::types::{ContentType, Timestamp};
16
/// Zero-copy message with lazy deserialization.
///
/// The payload is kept as raw `Bytes`; JSON is only parsed when
/// `parse_json_lazy` or `deserialize` is called.
#[derive(Debug, Clone)]
pub struct ZeroCopyMessage {
    /// Message ID - using Arc for cheap cloning
    pub id: Arc<MessageId>,

    /// Raw message payload - zero-copy bytes
    pub payload: Bytes,

    /// Cache filled by `parse_json_lazy` (the constructors in this file start it
    /// as `None`). Note: cloning a message deep-copies this boxed value.
    pub lazy_json: Option<Box<RawValue>>,

    /// Message metadata
    pub metadata: MessageMetadata,
}
32
/// Message identifier.
///
/// Cloning is cheap for every variant: string IDs share an `Arc<str>`, numeric
/// and UUID IDs are plain values. Equality is variant-sensitive, so
/// `String("1")` and `Number(1)` are different IDs.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum MessageId {
    /// String ID with Arc for sharing
    String(Arc<str>),
    /// Numeric ID (stored inline, no heap allocation)
    Number(i64),
    /// UUID (stored inline, no heap allocation)
    Uuid(Uuid),
}
43
/// Lightweight message metadata
#[derive(Debug, Clone)]
pub struct MessageMetadata {
    /// Creation timestamp (the constructors in this file use `Timestamp::now()`)
    pub created_at: Timestamp,
    /// Content type
    pub content_type: ContentType,
    /// Payload size in bytes, recorded when the message was created
    pub size: usize,
    /// Optional correlation ID (Arc for sharing)
    pub correlation_id: Option<Arc<str>>,
}
56
57impl ZeroCopyMessage {
58    /// Create a new zero-copy message from bytes
59    #[inline]
60    pub fn from_bytes(id: MessageId, payload: Bytes) -> Self {
61        let size = payload.len();
62        Self {
63            id: Arc::new(id),
64            payload: payload.clone(),
65            lazy_json: None,
66            metadata: MessageMetadata {
67                created_at: Timestamp::now(),
68                content_type: ContentType::Json,
69                size,
70                correlation_id: None,
71            },
72        }
73    }
74
75    /// Create from a JSON value with zero-copy optimization
76    pub fn from_json<T: Serialize>(id: MessageId, value: &T) -> Result<Self> {
77        // Use a reusable buffer pool in production
78        let mut buffer = BytesMut::with_capacity(1024);
79
80        // Serialize directly to bytes
81        serde_json::to_writer((&mut buffer).writer(), value)
82            .map_err(|e| Error::serialization(e.to_string()))?;
83
84        let payload = buffer.freeze();
85        let size = payload.len();
86
87        Ok(Self {
88            id: Arc::new(id),
89            payload,
90            lazy_json: None,
91            metadata: MessageMetadata {
92                created_at: Timestamp::now(),
93                content_type: ContentType::Json,
94                size,
95                correlation_id: None,
96            },
97        })
98    }
99
100    /// Parse JSON lazily - only when needed
101    #[inline]
102    pub fn parse_json_lazy(&mut self) -> Result<&RawValue> {
103        if self.lazy_json.is_none() {
104            // Parse without deserializing the full structure
105            let raw: Box<RawValue> = serde_json::from_slice(&self.payload)
106                .map_err(|e| Error::serialization(format!("JSON parse error: {}", e)))?;
107
108            // Store the parsed raw value
109            self.lazy_json = Some(raw);
110        }
111
112        Ok(self.lazy_json.as_ref().unwrap())
113    }
114
115    /// Deserialize a specific type from the message with SIMD acceleration when available
116    #[inline]
117    pub fn deserialize<T: for<'de> Deserialize<'de>>(&self) -> Result<T> {
118        #[cfg(feature = "simd")]
119        {
120            // Use simd-json for faster parsing when payload is large enough
121            if self.payload.len() >= 64 {
122                // Clone to mutable buffer for SIMD parsing
123                let mut buffer = self.payload.to_vec();
124                simd_json::from_slice(&mut buffer)
125                    .map_err(|e| Error::serialization(format!("SIMD deserialize error: {}", e)))
126            } else {
127                // Fall back to standard parsing for small payloads
128                serde_json::from_slice(&self.payload)
129                    .map_err(|e| Error::serialization(format!("Deserialization error: {}", e)))
130            }
131        }
132        #[cfg(not(feature = "simd"))]
133        {
134            serde_json::from_slice(&self.payload)
135                .map_err(|e| Error::serialization(format!("Deserialization error: {}", e)))
136        }
137    }
138
139    /// Get a zero-copy view of the payload
140    #[inline]
141    pub fn payload_slice(&self) -> &[u8] {
142        &self.payload
143    }
144
145    /// Clone the message cheaply (Arc increments only)
146    #[inline]
147    pub fn cheap_clone(&self) -> Self {
148        Self {
149            id: Arc::clone(&self.id),
150            payload: self.payload.clone(), // Bytes is already Arc-based
151            lazy_json: self.lazy_json.clone(),
152            metadata: self.metadata.clone(),
153        }
154    }
155}
156
157/// Buffer pool for reusing allocations
158#[derive(Debug)]
159pub struct BufferPool {
160    /// Pool of reusable buffers
161    buffers: crossbeam::queue::ArrayQueue<BytesMut>,
162    /// Default buffer capacity
163    capacity: usize,
164}
165
166impl BufferPool {
167    /// Create a new buffer pool
168    pub fn new(size: usize, capacity: usize) -> Self {
169        let buffers = crossbeam::queue::ArrayQueue::new(size);
170
171        // Pre-allocate buffers
172        for _ in 0..size {
173            let _ = buffers.push(BytesMut::with_capacity(capacity));
174        }
175
176        Self { buffers, capacity }
177    }
178
179    /// Get a buffer from the pool or create a new one
180    #[inline]
181    pub fn acquire(&self) -> BytesMut {
182        self.buffers
183            .pop()
184            .unwrap_or_else(|| BytesMut::with_capacity(self.capacity))
185    }
186
187    /// Return a buffer to the pool for reuse
188    #[inline]
189    pub fn release(&self, mut buffer: BytesMut) {
190        buffer.clear();
191        let _ = self.buffers.push(buffer);
192    }
193}
194
195/// Zero-copy message batch for efficient bulk processing
196#[derive(Debug)]
197pub struct MessageBatch {
198    /// Contiguous buffer containing all messages
199    pub buffer: Bytes,
200    /// Offsets and lengths of individual messages
201    pub messages: Vec<(usize, usize)>,
202    /// Shared message IDs
203    pub ids: Vec<Arc<MessageId>>,
204}
205
206impl MessageBatch {
207    /// Create a new message batch
208    pub fn new(capacity: usize) -> Self {
209        Self {
210            buffer: Bytes::new(),
211            messages: Vec::with_capacity(capacity),
212            ids: Vec::with_capacity(capacity),
213        }
214    }
215
216    /// Add a message to the batch
217    pub fn add(&mut self, id: MessageId, payload: Bytes) {
218        let offset = self.buffer.len();
219        let length = payload.len();
220
221        // Extend the buffer
222        let mut buffer = BytesMut::from(self.buffer.as_ref());
223        buffer.extend_from_slice(&payload);
224        self.buffer = buffer.freeze();
225
226        // Store offset and length
227        self.messages.push((offset, length));
228        self.ids.push(Arc::new(id));
229    }
230
231    /// Get a zero-copy view of a message
232    #[inline]
233    pub fn get(&self, index: usize) -> Option<Bytes> {
234        self.messages
235            .get(index)
236            .map(|(offset, length)| self.buffer.slice(*offset..*offset + *length))
237    }
238
239    /// Iterate over messages without copying
240    pub fn iter(&self) -> impl Iterator<Item = (&Arc<MessageId>, Bytes)> + '_ {
241        self.ids
242            .iter()
243            .zip(self.messages.iter())
244            .map(move |(id, (offset, length))| (id, self.buffer.slice(*offset..*offset + *length)))
245    }
246}
247
248/// Fast utilities for message processing with SIMD acceleration
249pub mod fast {
250    /// Fast UTF-8 validation with SIMD when available
251    #[inline]
252    pub fn validate_utf8_fast(bytes: &[u8]) -> bool {
253        #[cfg(feature = "simd")]
254        {
255            // Use SIMD-accelerated validation for larger inputs
256            if bytes.len() >= 64 {
257                simdutf8::basic::from_utf8(bytes).is_ok()
258            } else {
259                std::str::from_utf8(bytes).is_ok()
260            }
261        }
262        #[cfg(not(feature = "simd"))]
263        {
264            std::str::from_utf8(bytes).is_ok()
265        }
266    }
267
268    /// Fast JSON boundary detection with optimized scanning
269    #[inline]
270    pub fn find_json_boundaries(bytes: &[u8]) -> Vec<usize> {
271        let mut boundaries = Vec::new();
272        let mut depth = 0;
273        let mut in_string = false;
274        let mut escaped = false;
275
276        // Optimized boundary detection with proper string handling
277        for (i, &byte) in bytes.iter().enumerate() {
278            if escaped {
279                escaped = false;
280                continue;
281            }
282
283            match byte {
284                b'\\' if in_string => escaped = true,
285                b'"' if !escaped => in_string = !in_string,
286                b'{' | b'[' if !in_string => depth += 1,
287                b'}' | b']' if !in_string => {
288                    depth -= 1;
289                    if depth == 0 {
290                        boundaries.push(i + 1);
291                    }
292                }
293                _ => {}
294            }
295        }
296
297        boundaries
298    }
299
300    /// SIMD-accelerated JSON validation
301    #[cfg(feature = "simd")]
302    #[inline]
303    pub fn validate_json_fast(bytes: &[u8]) -> bool {
304        if bytes.len() >= 64 {
305            // Use simd-json for validation
306            let mut owned = bytes.to_vec();
307            simd_json::to_borrowed_value(&mut owned).is_ok()
308        } else {
309            // Fall back to standard validation for small inputs
310            serde_json::from_slice::<serde_json::Value>(bytes).is_ok()
311        }
312    }
313
314    #[cfg(not(feature = "simd"))]
315    #[inline]
316    pub fn validate_json_fast(bytes: &[u8]) -> bool {
317        serde_json::from_slice::<serde_json::Value>(bytes).is_ok()
318    }
319}
320
/// Memory-mapped file support for zero-copy access to large files.
#[cfg(feature = "mmap")]
pub mod mmap {
    use super::*;
    use memmap2::{Mmap, MmapOptions};
    use std::fs::File;
    use std::io;

    /// Build the error returned when a requested offset is not inside the file.
    fn offset_out_of_range() -> io::Error {
        io::Error::new(io::ErrorKind::InvalidInput, "Offset exceeds file size")
    }

    /// Map the whole of `file` read-only. This is the single home for this
    /// module's `unsafe` code.
    fn map_file(file: &File) -> io::Result<Mmap> {
        // SAFETY: `file` is a valid handle opened for reading, and the mapping is
        // read-only. memmap2 cannot stop this or another process from modifying
        // or truncating the file while the map is alive; doing so is undefined
        // behaviour (or SIGBUS on access). Callers must not mutate files mapped
        // through this module while the resulting maps are in use.
        unsafe { MmapOptions::new().map(file) }
    }

    /// A memory-mapped message for zero-copy file access.
    ///
    /// The whole file is mapped; `offset` and `length` select the message bytes.
    #[derive(Debug)]
    pub struct MmapMessage {
        /// Message ID
        pub id: Arc<MessageId>,
        /// Memory-mapped data (the whole file)
        pub mmap: Arc<Mmap>,
        /// Offset of the message within the mapped region
        pub offset: usize,
        /// Length of the message data
        pub length: usize,
        /// Message metadata
        pub metadata: MessageMetadata,
    }

    impl MmapMessage {
        /// Create a message from a memory-mapped file.
        ///
        /// Maps `path` and selects `length` bytes starting at `offset`. If
        /// `length` is `None`, or runs past the end of the file, the message
        /// extends to the end of the file.
        ///
        /// # Errors
        /// Returns `InvalidInput` if `offset` is not inside the file, or any I/O
        /// error from opening, inspecting or mapping the file.
        pub fn from_file(
            id: MessageId,
            path: &Path,
            offset: usize,
            length: Option<usize>,
        ) -> io::Result<Self> {
            let file = File::open(path)?;

            // Reject before mapping. Comparing in u64 avoids truncating large
            // file sizes on 32-bit targets.
            if offset as u64 >= file.metadata()?.len() {
                return Err(offset_out_of_range());
            }

            let mmap = map_file(&file)?;

            // Re-validate against the mapping itself: `data()` indexes into it,
            // and the file may have shrunk after the metadata was read.
            let mapped_len = mmap.len();
            if offset >= mapped_len {
                return Err(offset_out_of_range());
            }
            let available = mapped_len - offset;
            let actual_length = length.map_or(available, |len| len.min(available));

            Ok(Self {
                id: Arc::new(id),
                mmap: Arc::new(mmap),
                offset,
                length: actual_length,
                metadata: MessageMetadata {
                    created_at: Timestamp::now(),
                    content_type: ContentType::Json,
                    size: actual_length,
                    correlation_id: None,
                },
            })
        }

        /// Borrow the message bytes directly from the mapping.
        ///
        /// # Panics
        /// Panics if the public `offset`/`length` fields were changed to
        /// describe a range outside the mapping.
        #[inline]
        pub fn data(&self) -> &[u8] {
            &self.mmap[self.offset..self.offset + self.length]
        }

        /// Copy the message bytes into a new `Bytes` (one allocation plus a copy).
        /// Use [`MmapMessage::data`] to borrow without copying.
        #[inline]
        pub fn to_bytes(&self) -> Bytes {
            Bytes::copy_from_slice(self.data())
        }

        /// Deserialize the message bytes as JSON into `T`.
        ///
        /// The data is parsed on every call; nothing is cached.
        pub fn parse_json<T>(&self) -> Result<T>
        where
            T: for<'de> Deserialize<'de>,
        {
            serde_json::from_slice(self.data())
                .map_err(|e| Error::serialization(format!("JSON parse error: {}", e)))
        }

        /// Borrow the message bytes as `&str` without copying, if they are
        /// valid UTF-8.
        pub fn as_str(&self) -> Result<&str> {
            std::str::from_utf8(self.data())
                .map_err(|e| Error::serialization(format!("Invalid UTF-8: {}", e)))
        }
    }

    /// A cache of memory maps keyed by file path.
    ///
    /// Cached maps are kept until [`MmapPool::clear`] and are not refreshed if
    /// the file changes on disk. Once `max_size` entries are cached, new maps
    /// are still created and returned but no longer cached. The length check
    /// and the insert are separate steps, so the limit is approximate under
    /// concurrent use.
    #[derive(Debug)]
    pub struct MmapPool {
        /// Cached memory maps
        maps: dashmap::DashMap<std::path::PathBuf, Arc<Mmap>>,
        /// Maximum number of cached maps
        max_size: usize,
    }

    impl MmapPool {
        /// Create an empty pool that caches at most `max_size` maps.
        pub fn new(max_size: usize) -> Self {
            Self {
                maps: dashmap::DashMap::new(),
                max_size,
            }
        }

        /// Return the cached map for `path`, or map the file (caching it if
        /// there is room).
        pub fn get_or_create(&self, path: &Path) -> io::Result<Arc<Mmap>> {
            if let Some(cached) = self.maps.get(path) {
                return Ok(Arc::clone(&*cached));
            }

            let file = File::open(path)?;
            let mmap = Arc::new(map_file(&file)?);

            if self.maps.len() < self.max_size {
                self.maps.insert(path.to_path_buf(), Arc::clone(&mmap));
            }

            Ok(mmap)
        }

        /// Drop every cached map. Maps still held by callers stay valid.
        pub fn clear(&self) {
            self.maps.clear();
        }

        /// Number of cached maps.
        pub fn size(&self) -> usize {
            self.maps.len()
        }
    }

    /// Memory-mapped batch of JSON Lines messages from one file.
    #[derive(Debug)]
    pub struct MmapBatch {
        /// Memory-mapped file
        mmap: Arc<Mmap>,
        /// Message boundaries (offset, length)
        messages: Vec<(usize, usize)>,
        /// Message IDs
        ids: Vec<Arc<MessageId>>,
    }

    impl MmapBatch {
        /// Map a JSON Lines file and record one message per non-empty line.
        ///
        /// Each message ID is `MessageId::Number(i)`, where `i` is the zero-based
        /// line index. Empty lines are skipped but still counted. Lines are split
        /// on `\n` only, so a trailing `\r` from CRLF files stays in the data.
        pub fn from_jsonl_file(path: &Path) -> io::Result<Self> {
            let file = File::open(path)?;
            let mmap = map_file(&file)?;

            let mut messages = Vec::new();
            let mut ids = Vec::new();
            let mut offset = 0;

            for (idx, line) in mmap.split(|&b| b == b'\n').enumerate() {
                if !line.is_empty() {
                    messages.push((offset, line.len()));
                    ids.push(Arc::new(MessageId::Number(idx as i64)));
                }
                offset += line.len() + 1; // +1 for the newline separator
            }

            Ok(Self {
                mmap: Arc::new(mmap),
                messages,
                ids,
            })
        }

        /// Borrow message `index` from the mapping, or `None` if out of range.
        #[inline]
        pub fn get(&self, index: usize) -> Option<&[u8]> {
            self.messages
                .get(index)
                .map(|(offset, length)| &self.mmap[*offset..*offset + *length])
        }

        /// Iterate over `(id, bytes)` pairs, borrowing from the mapping.
        pub fn iter(&self) -> impl Iterator<Item = (&Arc<MessageId>, &[u8])> + '_ {
            self.ids
                .iter()
                .zip(self.messages.iter())
                .map(move |(id, (offset, length))| (id, &self.mmap[*offset..*offset + *length]))
        }

        /// Number of messages in the batch.
        pub fn len(&self) -> usize {
            self.messages.len()
        }

        /// Whether the batch contains no messages.
        pub fn is_empty(&self) -> bool {
            self.messages.is_empty()
        }
    }
}
537
538/// Memory-mapped file support (safe wrapper when feature is disabled)
539#[cfg(not(feature = "mmap"))]
540pub mod mmap {
541    use super::*;
542    use std::fs;
543    use std::io;
544
545    /// Fallback implementation using regular file I/O
546    #[derive(Debug)]
547    pub struct MmapMessage {
548        /// Unique message identifier
549        pub id: Arc<MessageId>,
550        /// Message data as bytes
551        pub data: Bytes,
552        /// Message metadata
553        pub metadata: MessageMetadata,
554    }
555
556    impl MmapMessage {
557        /// Create a message by reading from a file
558        pub fn from_file(
559            id: MessageId,
560            path: &Path,
561            offset: usize,
562            length: Option<usize>,
563        ) -> io::Result<Self> {
564            let data = fs::read(path)?;
565            let file_size = data.len();
566
567            if offset >= file_size {
568                return Err(io::Error::new(
569                    io::ErrorKind::InvalidInput,
570                    "Offset exceeds file size",
571                ));
572            }
573
574            let actual_length = length.unwrap_or(file_size - offset);
575            let actual_length = actual_length.min(file_size - offset);
576
577            let data = Bytes::copy_from_slice(&data[offset..offset + actual_length]);
578
579            Ok(Self {
580                id: Arc::new(id),
581                data: data.clone(),
582                metadata: MessageMetadata {
583                    created_at: Timestamp::now(),
584                    content_type: ContentType::Json,
585                    size: actual_length,
586                    correlation_id: None,
587                },
588            })
589        }
590
591        /// Get the message data as a byte slice
592        #[inline]
593        pub fn data(&self) -> &[u8] {
594            &self.data
595        }
596
597        /// Convert the message data to Bytes
598        #[inline]
599        pub fn to_bytes(&self) -> Bytes {
600            self.data.clone()
601        }
602
603        /// Parse the message data as JSON
604        pub fn parse_json<T>(&self) -> Result<T>
605        where
606            T: for<'de> Deserialize<'de>,
607        {
608            serde_json::from_slice(&self.data)
609                .map_err(|e| Error::serialization(format!("JSON parse error: {}", e)))
610        }
611
612        /// Get the message data as a string slice
613        pub fn as_str(&self) -> Result<&str> {
614            std::str::from_utf8(&self.data)
615                .map_err(|e| Error::serialization(format!("Invalid UTF-8: {}", e)))
616        }
617    }
618
619    /// Fallback pool implementation
620    #[derive(Debug)]
621    pub struct MmapPool {
622        cache: dashmap::DashMap<std::path::PathBuf, Bytes>,
623        max_size: usize,
624    }
625
626    impl MmapPool {
627        /// Create a new MmapPool with the specified maximum cache size
628        pub fn new(max_size: usize) -> Self {
629            Self {
630                cache: dashmap::DashMap::new(),
631                max_size,
632            }
633        }
634
635        /// Get or create a cached file read
636        pub fn get_or_create(&self, path: &Path) -> io::Result<Bytes> {
637            if let Some(data) = self.cache.get(path) {
638                return Ok(data.clone());
639            }
640
641            let data = fs::read(path)?;
642            let bytes = Bytes::from(data);
643
644            if self.cache.len() < self.max_size {
645                self.cache.insert(path.to_path_buf(), bytes.clone());
646            }
647
648            Ok(bytes)
649        }
650
651        /// Clear all cached entries
652        pub fn clear(&self) {
653            self.cache.clear();
654        }
655
656        /// Get the current number of cached entries
657        pub fn size(&self) -> usize {
658            self.cache.len()
659        }
660    }
661
662    /// Fallback batch implementation
663    #[derive(Debug)]
664    pub struct MmapBatch {
665        data: Bytes,
666        messages: Vec<(usize, usize)>,
667        ids: Vec<Arc<MessageId>>,
668    }
669
670    impl MmapBatch {
671        /// Create a batch from a JSONL file
672        pub fn from_jsonl_file(path: &Path) -> io::Result<Self> {
673            let data = fs::read(path)?;
674            let mut messages = Vec::new();
675            let mut ids = Vec::new();
676            let mut offset = 0;
677
678            for (idx, line) in data.split(|&b| b == b'\n').enumerate() {
679                if !line.is_empty() {
680                    messages.push((offset, line.len()));
681                    ids.push(Arc::new(MessageId::Number(idx as i64)));
682                }
683                offset += line.len() + 1;
684            }
685
686            Ok(Self {
687                data: Bytes::from(data),
688                messages,
689                ids,
690            })
691        }
692
693        /// Get a message by index
694        #[inline]
695        pub fn get(&self, index: usize) -> Option<&[u8]> {
696            self.messages
697                .get(index)
698                .map(|(offset, length)| &self.data[*offset..*offset + *length])
699        }
700
701        /// Iterate over all messages in the batch
702        pub fn iter(&self) -> impl Iterator<Item = (&Arc<MessageId>, &[u8])> + '_ {
703            self.ids
704                .iter()
705                .zip(self.messages.iter())
706                .map(move |(id, (offset, length))| (id, &self.data[*offset..*offset + *length]))
707        }
708
709        /// Get the number of messages in the batch
710        pub fn len(&self) -> usize {
711            self.messages.len()
712        }
713
714        /// Check if the batch is empty
715        pub fn is_empty(&self) -> bool {
716            self.messages.is_empty()
717        }
718    }
719}
720
721impl From<String> for MessageId {
722    fn from(s: String) -> Self {
723        Self::String(Arc::from(s))
724    }
725}
726
727impl From<&str> for MessageId {
728    fn from(s: &str) -> Self {
729        Self::String(Arc::from(s))
730    }
731}
732
733impl From<i64> for MessageId {
734    fn from(n: i64) -> Self {
735        Self::Number(n)
736    }
737}
738
739impl From<Uuid> for MessageId {
740    fn from(u: Uuid) -> Self {
741        Self::Uuid(u)
742    }
743}
744
745impl fmt::Display for MessageId {
746    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
747        match self {
748            Self::String(s) => write!(f, "{}", s),
749            Self::Number(n) => write!(f, "{}", n),
750            Self::Uuid(u) => write!(f, "{}", u),
751        }
752    }
753}
754
#[cfg(test)]
mod tests {
    use super::*;

    /// Temp-file path unique to this process, so concurrent runs of the suite
    /// (parallel CI jobs, multiple checkouts) don't clobber each other's files.
    fn temp_path(name: &str) -> std::path::PathBuf {
        std::env::temp_dir().join(format!(
            "turbomcp_zero_copy_{}_{}",
            std::process::id(),
            name
        ))
    }

    #[test]
    fn test_zero_copy_message_creation() {
        let payload = Bytes::from(r#"{"test": "data"}"#);
        let msg = ZeroCopyMessage::from_bytes(MessageId::from("test-1"), payload.clone());

        assert_eq!(msg.payload, payload);
        assert_eq!(msg.metadata.size, payload.len());
    }

    #[test]
    fn test_from_json_roundtrip() {
        let msg = ZeroCopyMessage::from_json(MessageId::from(1i64), &serde_json::json!({"a": 1}))
            .unwrap();
        assert_eq!(msg.metadata.size, msg.payload.len());

        let value: serde_json::Value = msg.deserialize().unwrap();
        assert_eq!(value["a"], 1);
    }

    #[test]
    fn test_lazy_json_parsing() {
        let payload = Bytes::from(r#"{"key": "value", "number": 42}"#);
        let mut msg = ZeroCopyMessage::from_bytes(MessageId::from("test-2"), payload);

        let raw = msg.parse_json_lazy().unwrap();
        assert!(raw.get().contains("value"));

        // The parsed value is now cached.
        assert!(msg.lazy_json.is_some());
    }

    #[test]
    fn test_lazy_json_parsing_invalid() {
        let mut msg = ZeroCopyMessage::from_bytes(MessageId::from("bad"), Bytes::from("{not json"));

        assert!(msg.parse_json_lazy().is_err());
        // A failed parse must not leave anything cached.
        assert!(msg.lazy_json.is_none());
    }

    #[test]
    fn test_buffer_pool() {
        let pool = BufferPool::new(2, 1024);

        let buf1 = pool.acquire();
        let buf2 = pool.acquire();
        let buf3 = pool.acquire(); // Pool exhausted: freshly allocated

        assert_eq!(buf1.capacity(), 1024);
        assert_eq!(buf2.capacity(), 1024);
        assert_eq!(buf3.capacity(), 1024);

        pool.release(buf1);
        let buf4 = pool.acquire(); // Reused from the pool
        assert_eq!(buf4.capacity(), 1024);
    }

    #[test]
    fn test_message_batch() {
        let mut batch = MessageBatch::new(10);

        batch.add(MessageId::from("msg1"), Bytes::from("data1"));
        batch.add(MessageId::from("msg2"), Bytes::from("data2"));
        batch.add(MessageId::from("msg3"), Bytes::from("data3"));

        assert_eq!(batch.messages.len(), 3);

        assert_eq!(batch.get(0).unwrap(), Bytes::from("data1"));
        assert_eq!(batch.get(1).unwrap(), Bytes::from("data2"));
        assert_eq!(batch.get(2).unwrap(), Bytes::from("data3"));
        assert!(batch.get(3).is_none());

        let mut count = 0;
        for (_id, payload) in batch.iter() {
            count += 1;
            assert!(!payload.is_empty());
        }
        assert_eq!(count, 3);
    }

    #[test]
    fn test_cheap_clone() {
        let msg = ZeroCopyMessage::from_bytes(MessageId::from("test"), Bytes::from("data"));

        let cloned = msg.cheap_clone();

        // The ID Arc is shared, not duplicated.
        assert!(Arc::ptr_eq(&msg.id, &cloned.id));
        assert_eq!(msg.payload, cloned.payload);
    }

    #[test]
    fn test_find_json_boundaries() {
        // Brackets inside strings (including after an escaped quote) are ignored.
        let input = br#"{"a":"}"}[1,2]{"b":"\"}"}"#;
        assert_eq!(fast::find_json_boundaries(input), vec![9, 14, 25]);
        assert!(fast::find_json_boundaries(b"").is_empty());
    }

    #[test]
    fn test_fast_validators() {
        assert!(fast::validate_utf8_fast("h\u{e9}llo".as_bytes()));
        assert!(!fast::validate_utf8_fast(&[0xff, 0xfe]));
        assert!(fast::validate_json_fast(br#"{"k": [1, 2]}"#));
        assert!(!fast::validate_json_fast(b"{"));
    }

    #[test]
    fn test_message_id_display() {
        assert_eq!(MessageId::from(7i64).to_string(), "7");
        assert_eq!(MessageId::from("abc").to_string(), "abc");
    }

    #[test]
    fn test_mmap_message() {
        let test_file = temp_path("test_mmap.json");
        let test_data = r#"{"test": "data", "value": 42}"#;
        std::fs::write(&test_file, test_data).unwrap();

        let msg = mmap::MmapMessage::from_file(MessageId::from("mmap-test"), &test_file, 0, None)
            .unwrap();

        assert_eq!(msg.data(), test_data.as_bytes());
        assert_eq!(msg.as_str().unwrap(), test_data);

        let value: serde_json::Value = msg.parse_json().unwrap();
        assert_eq!(value["test"], "data");
        assert_eq!(value["value"], 42);

        // Release the mapping before deleting: Windows refuses to remove a file
        // that still has a mapped view open.
        drop(msg);
        std::fs::remove_file(test_file).unwrap();
    }

    #[test]
    fn test_mmap_batch() {
        let test_file = temp_path("test_batch.jsonl");
        let contents = concat!(
            r#"{"id": 1, "name": "first"}"#,
            "\n",
            r#"{"id": 2, "name": "second"}"#,
            "\n",
            r#"{"id": 3, "name": "third"}"#,
            "\n",
        );
        std::fs::write(&test_file, contents).unwrap();

        let batch = mmap::MmapBatch::from_jsonl_file(&test_file).unwrap();

        assert_eq!(batch.len(), 3);
        assert!(!batch.is_empty());

        let msg1 = batch.get(0).unwrap();
        let value: serde_json::Value = serde_json::from_slice(msg1).unwrap();
        assert_eq!(value["id"], 1);
        assert_eq!(value["name"], "first");

        let mut count = 0;
        for (_id, data) in batch.iter() {
            let value: serde_json::Value = serde_json::from_slice(data).unwrap();
            assert!(value["id"].is_number());
            assert!(value["name"].is_string());
            count += 1;
        }
        assert_eq!(count, 3);

        // Release the mapping before deleting (required on Windows).
        drop(batch);
        std::fs::remove_file(test_file).unwrap();
    }

    #[test]
    fn test_mmap_pool() {
        let test_file1 = temp_path("pool_test1.json");
        let test_file2 = temp_path("pool_test2.json");
        std::fs::write(&test_file1, b"test1").unwrap();
        std::fs::write(&test_file2, b"test2").unwrap();

        let pool = mmap::MmapPool::new(10);
        assert_eq!(pool.size(), 0);

        let data1 = pool.get_or_create(&test_file1).unwrap();
        assert_eq!(pool.size(), 1);
        assert_eq!(&data1[..], b"test1");

        let data2 = pool.get_or_create(&test_file2).unwrap();
        assert_eq!(pool.size(), 2);
        assert_eq!(&data2[..], b"test2");

        // A second lookup is served from the cache.
        let data1_again = pool.get_or_create(&test_file1).unwrap();
        assert_eq!(pool.size(), 2);
        assert_eq!(&data1_again[..], b"test1");

        pool.clear();
        assert_eq!(pool.size(), 0);

        // Release every mapping before deleting (required on Windows).
        drop((data1, data2, data1_again));
        drop(pool);
        std::fs::remove_file(test_file1).unwrap();
        std::fs::remove_file(test_file2).unwrap();
    }
}