turbomcp_protocol/
zero_copy.rs

1//! Zero-copy message processing with minimal allocations
2//!
3//! This module provides zero-allocation message handling using `bytes::Bytes`
4//! for maximum throughput and minimal memory overhead.
5//!
6//! ## When to Use ZeroCopyMessage
7//!
8//! **Most users should use [`Message`](crate::Message) instead.**
9//!
10//! `ZeroCopyMessage` is designed for extreme performance scenarios where:
11//! - You process **millions of messages per second**
12//! - **Every allocation matters** for your performance profile
13//! - You can **defer deserialization** until absolutely necessary
14//! - You're willing to **trade ergonomics for performance**
15//!
16//! ### Message vs ZeroCopyMessage
17//!
18//! | Feature | [`Message`](crate::Message) | `ZeroCopyMessage` |
19//! |---------|---------|------------------|
20//! | Ergonomics | ✅ Excellent | ⚠️ Manual |
21//! | Memory | ✅ Good | ✅ Optimal |
22//! | Deserialization | Eager | Lazy |
23//! | Multiple formats | ✅ JSON/CBOR/MessagePack | JSON only |
24//! | ID storage | Stack/String | Arc (shared) |
25//! | Use case | General purpose | Ultra-high throughput |
26//!
27//! ### Example Usage
28//!
29//! ```rust
30//! use turbomcp_protocol::zero_copy::{ZeroCopyMessage, MessageId};
31//! use bytes::Bytes;
32//!
33//! // Create from raw bytes (no allocation)
34//! let payload = Bytes::from(r#"{"method": "test", "id": 1}"#);
35//! let mut msg = ZeroCopyMessage::from_bytes(MessageId::from("req-1"), payload);
36//!
37//! // Lazy parsing - returns RawValue without full deserialization
38//! let raw = msg.parse_json_lazy()?;
39//! assert!(raw.get().contains("method"));
40//!
41//! // Full deserialization when needed
42//! let data: serde_json::Value = msg.deserialize()?;
43//! # Ok::<(), Box<dyn std::error::Error>>(())
44//! ```
45//!
46//! **Performance Tip**: Use `ZeroCopyMessage` in hot paths where you need to
47//! route messages based on metadata but don't need to parse the payload.
48
49use bytes::{BufMut, Bytes, BytesMut};
50use serde::{Deserialize, Serialize};
51use serde_json::value::RawValue;
52use std::fmt;
53use std::path::Path;
54use std::sync::Arc;
55use uuid::Uuid;
56
57use crate::error::{Error, Result};
58use crate::types::{ContentType, Timestamp};
59
/// Zero-copy message with lazy deserialization.
///
/// The payload is kept as raw [`Bytes`]; nothing is parsed until
/// [`ZeroCopyMessage::parse_json_lazy`] or [`ZeroCopyMessage::deserialize`]
/// is called. All fields are public, so callers may read or replace them
/// directly.
#[derive(Debug, Clone)]
pub struct ZeroCopyMessage {
    /// Message ID - using Arc for cheap cloning
    pub id: Arc<MessageId>,

    /// Raw message payload - zero-copy bytes
    pub payload: Bytes,

    /// Lazy-parsed JSON value for deferred deserialization.
    ///
    /// `None` until [`ZeroCopyMessage::parse_json_lazy`] succeeds; a populated
    /// value is cloned together with the message.
    pub lazy_json: Option<Box<RawValue>>,

    /// Message metadata
    pub metadata: MessageMetadata,
}
75
/// Optimized message ID with Arc sharing.
///
/// Can be built through the `From` conversions in this file (`String`,
/// `&str`, `i64`, `Uuid`) and rendered with `Display`.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum MessageId {
    /// String ID with Arc for sharing (cloning does not copy the text)
    String(Arc<str>),
    /// Numeric ID (stack-allocated)
    Number(i64),
    /// UUID (stack-allocated)
    Uuid(Uuid),
}
86
/// Lightweight message metadata.
///
/// The constructors in this file fill it with the current timestamp, a JSON
/// content type, the payload length and no correlation ID.
#[derive(Debug, Clone)]
pub struct MessageMetadata {
    /// Creation timestamp
    pub created_at: Timestamp,
    /// Content type
    pub content_type: ContentType,
    /// Message size in bytes.
    ///
    /// Recorded once at construction; it is a plain public field and is not
    /// updated automatically if the payload is replaced later.
    pub size: usize,
    /// Optional correlation ID (Arc for sharing)
    pub correlation_id: Option<Arc<str>>,
}
99
100impl ZeroCopyMessage {
101    /// Create a new zero-copy message from bytes
102    #[inline]
103    pub fn from_bytes(id: MessageId, payload: Bytes) -> Self {
104        let size = payload.len();
105        Self {
106            id: Arc::new(id),
107            payload: payload.clone(),
108            lazy_json: None,
109            metadata: MessageMetadata {
110                created_at: Timestamp::now(),
111                content_type: ContentType::Json,
112                size,
113                correlation_id: None,
114            },
115        }
116    }
117
118    /// Create from a JSON value with zero-copy optimization
119    pub fn from_json<T: Serialize>(id: MessageId, value: &T) -> Result<Self> {
120        // Use a reusable buffer pool in production
121        let mut buffer = BytesMut::with_capacity(1024);
122
123        // Serialize directly to bytes
124        serde_json::to_writer((&mut buffer).writer(), value)
125            .map_err(|e| Error::serialization(e.to_string()))?;
126
127        let payload = buffer.freeze();
128        let size = payload.len();
129
130        Ok(Self {
131            id: Arc::new(id),
132            payload,
133            lazy_json: None,
134            metadata: MessageMetadata {
135                created_at: Timestamp::now(),
136                content_type: ContentType::Json,
137                size,
138                correlation_id: None,
139            },
140        })
141    }
142
143    /// Parse JSON lazily - only when needed
144    #[inline]
145    pub fn parse_json_lazy(&mut self) -> Result<&RawValue> {
146        if self.lazy_json.is_none() {
147            // Parse without deserializing the full structure
148            let raw: Box<RawValue> = serde_json::from_slice(&self.payload)
149                .map_err(|e| Error::serialization(format!("JSON parse error: {}", e)))?;
150
151            // Store the parsed raw value
152            self.lazy_json = Some(raw);
153        }
154
155        Ok(self.lazy_json.as_ref().unwrap())
156    }
157
158    /// Deserialize a specific type from the message with SIMD acceleration when available
159    #[inline]
160    pub fn deserialize<T: for<'de> Deserialize<'de>>(&self) -> Result<T> {
161        #[cfg(feature = "simd")]
162        {
163            // Use simd-json for faster parsing when payload is large enough
164            if self.payload.len() >= 64 {
165                // Clone to mutable buffer for SIMD parsing
166                let mut buffer = self.payload.to_vec();
167                simd_json::from_slice(&mut buffer)
168                    .map_err(|e| Error::serialization(format!("SIMD deserialize error: {}", e)))
169            } else {
170                // Fall back to standard parsing for small payloads
171                serde_json::from_slice(&self.payload)
172                    .map_err(|e| Error::serialization(format!("Deserialization error: {}", e)))
173            }
174        }
175        #[cfg(not(feature = "simd"))]
176        {
177            serde_json::from_slice(&self.payload)
178                .map_err(|e| Error::serialization(format!("Deserialization error: {}", e)))
179        }
180    }
181
182    /// Get a zero-copy view of the payload
183    #[inline]
184    pub fn payload_slice(&self) -> &[u8] {
185        &self.payload
186    }
187
188    /// Clone the message cheaply (Arc increments only)
189    #[inline]
190    pub fn cheap_clone(&self) -> Self {
191        Self {
192            id: Arc::clone(&self.id),
193            payload: self.payload.clone(), // Bytes is already Arc-based
194            lazy_json: self.lazy_json.clone(),
195            metadata: self.metadata.clone(),
196        }
197    }
198}
199
200/// Buffer pool for reusing allocations
201#[derive(Debug)]
202pub struct BufferPool {
203    /// Pool of reusable buffers
204    buffers: crossbeam::queue::ArrayQueue<BytesMut>,
205    /// Default buffer capacity
206    capacity: usize,
207}
208
209impl BufferPool {
210    /// Create a new buffer pool
211    pub fn new(size: usize, capacity: usize) -> Self {
212        let buffers = crossbeam::queue::ArrayQueue::new(size);
213
214        // Pre-allocate buffers
215        for _ in 0..size {
216            let _ = buffers.push(BytesMut::with_capacity(capacity));
217        }
218
219        Self { buffers, capacity }
220    }
221
222    /// Get a buffer from the pool or create a new one
223    #[inline]
224    pub fn acquire(&self) -> BytesMut {
225        self.buffers
226            .pop()
227            .unwrap_or_else(|| BytesMut::with_capacity(self.capacity))
228    }
229
230    /// Return a buffer to the pool for reuse
231    #[inline]
232    pub fn release(&self, mut buffer: BytesMut) {
233        buffer.clear();
234        let _ = self.buffers.push(buffer);
235    }
236}
237
238/// Zero-copy message batch for efficient bulk processing
239#[derive(Debug)]
240pub struct MessageBatch {
241    /// Contiguous buffer containing all messages
242    pub buffer: Bytes,
243    /// Offsets and lengths of individual messages
244    pub messages: Vec<(usize, usize)>,
245    /// Shared message IDs
246    pub ids: Vec<Arc<MessageId>>,
247}
248
249impl MessageBatch {
250    /// Create a new message batch
251    pub fn new(capacity: usize) -> Self {
252        Self {
253            buffer: Bytes::new(),
254            messages: Vec::with_capacity(capacity),
255            ids: Vec::with_capacity(capacity),
256        }
257    }
258
259    /// Add a message to the batch
260    pub fn add(&mut self, id: MessageId, payload: Bytes) {
261        let offset = self.buffer.len();
262        let length = payload.len();
263
264        // Extend the buffer
265        let mut buffer = BytesMut::from(self.buffer.as_ref());
266        buffer.extend_from_slice(&payload);
267        self.buffer = buffer.freeze();
268
269        // Store offset and length
270        self.messages.push((offset, length));
271        self.ids.push(Arc::new(id));
272    }
273
274    /// Get a zero-copy view of a message
275    #[inline]
276    pub fn get(&self, index: usize) -> Option<Bytes> {
277        self.messages
278            .get(index)
279            .map(|(offset, length)| self.buffer.slice(*offset..*offset + *length))
280    }
281
282    /// Iterate over messages without copying
283    pub fn iter(&self) -> impl Iterator<Item = (&Arc<MessageId>, Bytes)> + '_ {
284        self.ids
285            .iter()
286            .zip(self.messages.iter())
287            .map(move |(id, (offset, length))| (id, self.buffer.slice(*offset..*offset + *length)))
288    }
289}
290
291/// Fast utilities for message processing with SIMD acceleration
292pub mod fast {
293    /// Fast UTF-8 validation with SIMD when available
294    #[inline]
295    pub fn validate_utf8_fast(bytes: &[u8]) -> bool {
296        #[cfg(feature = "simd")]
297        {
298            // Use SIMD-accelerated validation for larger inputs
299            if bytes.len() >= 64 {
300                simdutf8::basic::from_utf8(bytes).is_ok()
301            } else {
302                std::str::from_utf8(bytes).is_ok()
303            }
304        }
305        #[cfg(not(feature = "simd"))]
306        {
307            std::str::from_utf8(bytes).is_ok()
308        }
309    }
310
311    /// Fast JSON boundary detection with optimized scanning
312    #[inline]
313    pub fn find_json_boundaries(bytes: &[u8]) -> Vec<usize> {
314        let mut boundaries = Vec::new();
315        let mut depth = 0;
316        let mut in_string = false;
317        let mut escaped = false;
318
319        // Optimized boundary detection with proper string handling
320        for (i, &byte) in bytes.iter().enumerate() {
321            if escaped {
322                escaped = false;
323                continue;
324            }
325
326            match byte {
327                b'\\' if in_string => escaped = true,
328                b'"' if !escaped => in_string = !in_string,
329                b'{' | b'[' if !in_string => depth += 1,
330                b'}' | b']' if !in_string => {
331                    depth -= 1;
332                    if depth == 0 {
333                        boundaries.push(i + 1);
334                    }
335                }
336                _ => {}
337            }
338        }
339
340        boundaries
341    }
342
343    /// SIMD-accelerated JSON validation
344    #[cfg(feature = "simd")]
345    #[inline]
346    pub fn validate_json_fast(bytes: &[u8]) -> bool {
347        if bytes.len() >= 64 {
348            // Use simd-json for validation
349            let mut owned = bytes.to_vec();
350            simd_json::to_borrowed_value(&mut owned).is_ok()
351        } else {
352            // Fall back to standard validation for small inputs
353            serde_json::from_slice::<serde_json::Value>(bytes).is_ok()
354        }
355    }
356
357    /// Standard JSON validation (non-SIMD fallback)
358    ///
359    /// Validates JSON syntax using serde_json's parser.
360    #[cfg(not(feature = "simd"))]
361    #[inline]
362    pub fn validate_json_fast(bytes: &[u8]) -> bool {
363        serde_json::from_slice::<serde_json::Value>(bytes).is_ok()
364    }
365}
366
367/// Memory-mapped file support for efficient large file processing
368#[cfg(feature = "mmap")]
369pub mod mmap {
370    use super::*;
371    use memmap2::{Mmap, MmapOptions};
372    use std::fs::File;
373    use std::io;
374    use std::ops::Deref;
375
376    // Import security module if available
377
    /// A memory-mapped message for zero-copy file access.
    ///
    /// The message is the window `offset..offset + length` of the mapped
    /// region. All fields are public, so the window is only as valid as the
    /// values stored here.
    #[derive(Debug)]
    pub struct MmapMessage {
        /// Message ID
        pub id: Arc<MessageId>,
        /// Memory-mapped data (shared, so the mapping stays alive while any
        /// clone of this `Arc` exists)
        pub mmap: Arc<Mmap>,
        /// Offset within the mapped region
        pub offset: usize,
        /// Length of the message data
        pub length: usize,
        /// Message metadata
        pub metadata: MessageMetadata,
    }
392
393    impl MmapMessage {
394        /// Create a message from a memory-mapped file (legacy method without security)
395        ///
396        /// **SECURITY WARNING**: This method bypasses security validation.
397        /// Use `from_file_secure` for production systems.
398        pub fn from_file(
399            id: MessageId,
400            path: &Path,
401            offset: usize,
402            length: Option<usize>,
403        ) -> io::Result<Self> {
404            // For backwards compatibility, perform basic validation
405            let file = File::open(path)?;
406            let metadata = file.metadata()?;
407            let file_size = metadata.len() as usize;
408
409            // Validate offset
410            if offset >= file_size {
411                return Err(io::Error::new(
412                    io::ErrorKind::InvalidInput,
413                    "Offset exceeds file size",
414                ));
415            }
416
417            // Calculate actual length
418            let actual_length = length.unwrap_or(file_size - offset);
419            let actual_length = actual_length.min(file_size - offset);
420
421            // Create memory map
422            // SAFETY: file handle is valid and opened for reading. memmap2 provides
423            // safe abstractions over POSIX mmap. The resulting mapping is read-only.
424            let mmap = unsafe { MmapOptions::new().map(&file)? };
425
426            Ok(Self {
427                id: Arc::new(id),
428                mmap: Arc::new(mmap),
429                offset,
430                length: actual_length,
431                metadata: MessageMetadata {
432                    created_at: Timestamp::now(),
433                    content_type: ContentType::Json,
434                    size: actual_length,
435                    correlation_id: None,
436                },
437            })
438        }
439
440        /// Internal method for creating memory-mapped files after security validation
441        #[allow(dead_code)] // Used with mmap feature
442        async fn from_file_internal(
443            id: MessageId,
444            path: &Path,
445            offset: usize,
446            length: Option<usize>,
447        ) -> io::Result<Self> {
448            let file = File::open(path)?;
449            let metadata = file.metadata()?;
450            let file_size = metadata.len() as usize;
451
452            // Validate offset (already validated by security layer, but double-check)
453            if offset >= file_size {
454                return Err(io::Error::new(
455                    io::ErrorKind::InvalidInput,
456                    "Offset exceeds file size",
457                ));
458            }
459
460            // Calculate actual length
461            let actual_length = length.unwrap_or(file_size - offset);
462            let actual_length = actual_length.min(file_size - offset);
463
464            // Create memory map
465            // SAFETY: file handle is valid and opened for reading. memmap2 provides
466            // safe abstractions over POSIX mmap. The resulting mapping is read-only.
467            let mmap = unsafe { MmapOptions::new().map(&file)? };
468
469            Ok(Self {
470                id: Arc::new(id),
471                mmap: Arc::new(mmap),
472                offset,
473                length: actual_length,
474                metadata: MessageMetadata {
475                    created_at: Timestamp::now(),
476                    content_type: ContentType::Json,
477                    size: actual_length,
478                    correlation_id: None,
479                },
480            })
481        }
482
483        /// Create a message from a memory-mapped file (ASYNC - Non-blocking!)
484        ///
485        /// This is the async version of `from_file` that uses `tokio::task::spawn_blocking`
486        /// to avoid blocking the async runtime during file I/O operations.
487        ///
488        /// # Production-Grade Async I/O
489        /// - Uses spawn_blocking for CPU-intensive mmap operations
490        /// - Maintains same functionality as sync version
491        /// - Safe to call from async contexts without blocking
492        /// - Proper error propagation and resource cleanup
493        pub async fn from_file_async(
494            id: MessageId,
495            path: &std::path::Path,
496            offset: usize,
497            length: Option<usize>,
498        ) -> std::io::Result<Self> {
499            let path = path.to_path_buf(); // Clone for move into spawn_blocking
500
501            tokio::task::spawn_blocking(move || Self::from_file(id, &path, offset, length))
502                .await
503                .map_err(|join_err| {
504                    std::io::Error::other(format!("Async mmap operation failed: {}", join_err))
505                })?
506        }
507
508        /// Get the message data as a byte slice
509        #[inline]
510        pub fn data(&self) -> &[u8] {
511            &self.mmap[self.offset..self.offset + self.length]
512        }
513
514        /// Convert to a Bytes instance for compatibility
515        #[inline]
516        pub fn to_bytes(&self) -> Bytes {
517            Bytes::copy_from_slice(self.data())
518        }
519
520        /// Parse JSON lazily from the mapped data
521        /// Parse the message data as JSON
522        pub fn parse_json<T>(&self) -> Result<T>
523        where
524            T: for<'de> Deserialize<'de>,
525        {
526            serde_json::from_slice(self.data())
527                .map_err(|e| Error::serialization(format!("JSON parse error: {}", e)))
528        }
529
530        /// Get a zero-copy string view if the data is valid UTF-8
531        /// Get the message data as a string slice
532        pub fn as_str(&self) -> Result<&str> {
533            std::str::from_utf8(self.data())
534                .map_err(|e| Error::serialization(format!("Invalid UTF-8: {}", e)))
535        }
536    }
537
538    /// A pool of memory-mapped files for efficient reuse
539    #[derive(Debug)]
540    pub struct MmapPool {
541        /// Cached memory maps
542        maps: dashmap::DashMap<std::path::PathBuf, Arc<Mmap>>,
543        /// Maximum number of cached maps
544        max_size: usize,
545    }
546
547    impl MmapPool {
548        /// Create a new memory map pool
549        pub fn new(max_size: usize) -> Self {
550            Self {
551                maps: dashmap::DashMap::new(),
552                max_size,
553            }
554        }
555
556        /// Get or create a memory map for a file
557        pub fn get_or_create(&self, path: &Path) -> io::Result<Arc<Mmap>> {
558            // Check if already cached
559            if let Some(mmap) = self.maps.get(path) {
560                return Ok(Arc::clone(&*mmap));
561            }
562
563            // Create new memory map
564            let file = File::open(path)?;
565            // SAFETY: file handle is valid and opened for reading. memmap2 provides
566            // safe abstractions over POSIX mmap. The resulting mapping is read-only.
567            let mmap = unsafe { MmapOptions::new().map(&file)? };
568            let mmap = Arc::new(mmap);
569
570            // Cache if under limit
571            if self.maps.len() < self.max_size {
572                self.maps.insert(path.to_path_buf(), Arc::clone(&mmap));
573            }
574
575            Ok(mmap)
576        }
577
578        /// Clear the cache
579        pub fn clear(&self) {
580            self.maps.clear();
581        }
582
583        /// Get cache size
584        pub fn size(&self) -> usize {
585            self.maps.len()
586        }
587    }
588
589    /// Memory-mapped message batch for processing multiple messages from a file
590    #[derive(Debug)]
591    pub struct MmapBatch {
592        /// Memory-mapped file
593        mmap: Arc<Mmap>,
594        /// Message boundaries (offset, length)
595        messages: Vec<(usize, usize)>,
596        /// Message IDs
597        ids: Vec<Arc<MessageId>>,
598    }
599
600    impl MmapBatch {
601        /// Create a batch from a memory-mapped file with JSON lines
602        pub fn from_jsonl_file(path: &Path) -> io::Result<Self> {
603            let file = File::open(path)?;
604            // SAFETY: file handle is valid and opened for reading. memmap2 provides
605            // safe abstractions over POSIX mmap. The resulting mapping is read-only.
606            let mmap = unsafe { MmapOptions::new().map(&file)? };
607
608            let mut messages = Vec::new();
609            let mut ids = Vec::new();
610            let mut offset = 0;
611
612            // Parse JSON lines
613            for (idx, line) in mmap.split(|&b| b == b'\n').enumerate() {
614                if !line.is_empty() {
615                    messages.push((offset, line.len()));
616                    ids.push(Arc::new(MessageId::Number(idx as i64)));
617                }
618                offset += line.len() + 1; // +1 for newline
619            }
620
621            Ok(Self {
622                mmap: Arc::new(mmap),
623                messages,
624                ids,
625            })
626        }
627
628        /// Get a message by index
629        #[inline]
630        pub fn get(&self, index: usize) -> Option<&[u8]> {
631            self.messages
632                .get(index)
633                .map(|(offset, length)| &self.mmap[*offset..*offset + *length])
634        }
635
636        /// Iterate over messages
637        pub fn iter(&self) -> impl Iterator<Item = (&Arc<MessageId>, &[u8])> + '_ {
638            self.ids
639                .iter()
640                .zip(self.messages.iter())
641                .map(move |(id, (offset, length))| {
642                    (id, &self.mmap.deref()[*offset..*offset + *length])
643                })
644        }
645
646        /// Get the number of messages
647        pub fn len(&self) -> usize {
648            self.messages.len()
649        }
650
651        /// Check if batch is empty
652        pub fn is_empty(&self) -> bool {
653            self.messages.is_empty()
654        }
655
656        /// Create a batch from a memory-mapped JSONL file (ASYNC - Non-blocking!)
657        ///
658        /// This is the async version of `from_jsonl_file` that uses `tokio::task::spawn_blocking`
659        /// to avoid blocking the async runtime during file I/O and parsing operations.
660        ///
661        /// # Production-Grade Async I/O
662        /// - Uses spawn_blocking for CPU-intensive mmap and parsing operations
663        /// - Maintains same functionality as sync version
664        /// - Safe to call from async contexts without blocking
665        /// - Proper error propagation and resource cleanup
666        pub async fn from_jsonl_file_async(path: &std::path::Path) -> std::io::Result<Self> {
667            let path = path.to_path_buf(); // Clone for move into spawn_blocking
668
669            tokio::task::spawn_blocking(move || Self::from_jsonl_file(&path))
670                .await
671                .map_err(|join_err| {
672                    std::io::Error::other(format!(
673                        "Async JSONL batch operation failed: {}",
674                        join_err
675                    ))
676                })?
677        }
678    }
679}
680
681/// Memory-mapped file support (safe wrapper when feature is disabled)
682#[cfg(not(feature = "mmap"))]
683pub mod mmap {
684    use super::*;
685    use std::fs;
686    use std::io;
687
688    /// Fallback implementation using regular file I/O
689    #[derive(Debug)]
690    pub struct MmapMessage {
691        /// Unique message identifier
692        pub id: Arc<MessageId>,
693        /// Message data as bytes
694        pub data: Bytes,
695        /// Message metadata
696        pub metadata: MessageMetadata,
697    }
698
699    impl MmapMessage {
700        /// Create a message by reading from a file
701        pub fn from_file(
702            id: MessageId,
703            path: &Path,
704            offset: usize,
705            length: Option<usize>,
706        ) -> io::Result<Self> {
707            let data = fs::read(path)?;
708            let file_size = data.len();
709
710            if offset >= file_size {
711                return Err(io::Error::new(
712                    io::ErrorKind::InvalidInput,
713                    "Offset exceeds file size",
714                ));
715            }
716
717            let actual_length = length.unwrap_or(file_size - offset);
718            let actual_length = actual_length.min(file_size - offset);
719
720            let data = Bytes::copy_from_slice(&data[offset..offset + actual_length]);
721
722            Ok(Self {
723                id: Arc::new(id),
724                data: data.clone(),
725                metadata: MessageMetadata {
726                    created_at: Timestamp::now(),
727                    content_type: ContentType::Json,
728                    size: actual_length,
729                    correlation_id: None,
730                },
731            })
732        }
733
734        /// Get the message data as a byte slice
735        #[inline]
736        pub fn data(&self) -> &[u8] {
737            &self.data
738        }
739
740        /// Convert the message data to Bytes
741        #[inline]
742        pub fn to_bytes(&self) -> Bytes {
743            self.data.clone()
744        }
745
746        /// Parse the message data as JSON
747        pub fn parse_json<T>(&self) -> Result<T>
748        where
749            T: for<'de> Deserialize<'de>,
750        {
751            serde_json::from_slice(&self.data)
752                .map_err(|e| Error::serialization(format!("JSON parse error: {}", e)))
753        }
754
755        /// Get the message data as a string slice
756        pub fn as_str(&self) -> Result<&str> {
757            std::str::from_utf8(&self.data)
758                .map_err(|e| Error::serialization(format!("Invalid UTF-8: {}", e)))
759        }
760
761        /// Create a message from a file (ASYNC - Non-blocking fallback!)
762        ///
763        /// This is the async version of `from_file` for the non-mmap fallback
764        /// that uses `tokio::task::spawn_blocking` to avoid blocking the async runtime.
765        pub async fn from_file_async(
766            id: MessageId,
767            path: &std::path::Path,
768            offset: usize,
769            length: Option<usize>,
770        ) -> std::io::Result<Self> {
771            let path = path.to_path_buf(); // Clone for move into spawn_blocking
772
773            tokio::task::spawn_blocking(move || Self::from_file(id, &path, offset, length))
774                .await
775                .map_err(|join_err| {
776                    std::io::Error::other(format!("Async file operation failed: {}", join_err))
777                })?
778        }
779    }
780
781    /// Fallback pool implementation
782    #[derive(Debug)]
783    pub struct MmapPool {
784        cache: dashmap::DashMap<std::path::PathBuf, Bytes>,
785        max_size: usize,
786    }
787
788    impl MmapPool {
789        /// Create a new MmapPool with the specified maximum cache size
790        pub fn new(max_size: usize) -> Self {
791            Self {
792                cache: dashmap::DashMap::new(),
793                max_size,
794            }
795        }
796
797        /// Get or create a cached file read
798        pub fn get_or_create(&self, path: &Path) -> io::Result<Bytes> {
799            if let Some(data) = self.cache.get(path) {
800                return Ok(data.clone());
801            }
802
803            let data = fs::read(path)?;
804            let bytes = Bytes::from(data);
805
806            if self.cache.len() < self.max_size {
807                self.cache.insert(path.to_path_buf(), bytes.clone());
808            }
809
810            Ok(bytes)
811        }
812
813        /// Clear all cached entries
814        pub fn clear(&self) {
815            self.cache.clear();
816        }
817
818        /// Get the current number of cached entries
819        pub fn size(&self) -> usize {
820            self.cache.len()
821        }
822    }
823
824    /// Fallback batch implementation
825    #[derive(Debug)]
826    pub struct MmapBatch {
827        data: Bytes,
828        messages: Vec<(usize, usize)>,
829        ids: Vec<Arc<MessageId>>,
830    }
831
832    impl MmapBatch {
833        /// Create a batch from a JSONL file
834        pub fn from_jsonl_file(path: &Path) -> io::Result<Self> {
835            let data = fs::read(path)?;
836            let mut messages = Vec::new();
837            let mut ids = Vec::new();
838            let mut offset = 0;
839
840            for (idx, line) in data.split(|&b| b == b'\n').enumerate() {
841                if !line.is_empty() {
842                    messages.push((offset, line.len()));
843                    ids.push(Arc::new(MessageId::Number(idx as i64)));
844                }
845                offset += line.len() + 1;
846            }
847
848            Ok(Self {
849                data: Bytes::from(data),
850                messages,
851                ids,
852            })
853        }
854
855        /// Get a message by index
856        #[inline]
857        pub fn get(&self, index: usize) -> Option<&[u8]> {
858            self.messages
859                .get(index)
860                .map(|(offset, length)| &self.data[*offset..*offset + *length])
861        }
862
863        /// Iterate over all messages in the batch
864        pub fn iter(&self) -> impl Iterator<Item = (&Arc<MessageId>, &[u8])> + '_ {
865            self.ids
866                .iter()
867                .zip(self.messages.iter())
868                .map(move |(id, (offset, length))| (id, &self.data[*offset..*offset + *length]))
869        }
870
871        /// Get the number of messages in the batch
872        pub fn len(&self) -> usize {
873            self.messages.len()
874        }
875
876        /// Check if the batch is empty
877        pub fn is_empty(&self) -> bool {
878            self.messages.is_empty()
879        }
880
881        /// Create a batch from a JSONL file (ASYNC - Non-blocking fallback!)
882        ///
883        /// This is the async version of `from_jsonl_file` for the non-mmap fallback
884        /// that uses `tokio::task::spawn_blocking` to avoid blocking the async runtime.
885        pub async fn from_jsonl_file_async(path: &std::path::Path) -> std::io::Result<Self> {
886            let path = path.to_path_buf(); // Clone for move into spawn_blocking
887
888            tokio::task::spawn_blocking(move || Self::from_jsonl_file(&path))
889                .await
890                .map_err(|join_err| {
891                    std::io::Error::other(format!(
892                        "Async JSONL batch operation failed: {}",
893                        join_err
894                    ))
895                })?
896        }
897    }
898}
899
900impl From<String> for MessageId {
901    fn from(s: String) -> Self {
902        Self::String(Arc::from(s))
903    }
904}
905
906impl From<&str> for MessageId {
907    fn from(s: &str) -> Self {
908        Self::String(Arc::from(s))
909    }
910}
911
912impl From<i64> for MessageId {
913    fn from(n: i64) -> Self {
914        Self::Number(n)
915    }
916}
917
918impl From<Uuid> for MessageId {
919    fn from(u: Uuid) -> Self {
920        Self::Uuid(u)
921    }
922}
923
924impl fmt::Display for MessageId {
925    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
926        match self {
927            Self::String(s) => write!(f, "{}", s),
928            Self::Number(n) => write!(f, "{}", n),
929            Self::Uuid(u) => write!(f, "{}", u),
930        }
931    }
932}
933
934#[cfg(test)]
935mod tests {
936    use super::*;
937
938    #[test]
939    fn test_zero_copy_message_creation() {
940        let payload = Bytes::from(r#"{"test": "data"}"#);
941        let msg = ZeroCopyMessage::from_bytes(MessageId::from("test-1"), payload.clone());
942
943        assert_eq!(msg.payload, payload);
944        assert_eq!(msg.metadata.size, payload.len());
945    }
946
947    #[test]
948    fn test_lazy_json_parsing() {
949        let payload = Bytes::from(r#"{"key": "value", "number": 42}"#);
950        let mut msg = ZeroCopyMessage::from_bytes(MessageId::from("test-2"), payload);
951
952        // Parse lazily
953        let raw = msg.parse_json_lazy().unwrap();
954        assert!(raw.get().contains("value"));
955
956        // Check that lazy_json is now populated
957        assert!(msg.lazy_json.is_some());
958    }
959
960    #[test]
961    fn test_buffer_pool() {
962        let pool = BufferPool::new(2, 1024);
963
964        let buf1 = pool.acquire();
965        let buf2 = pool.acquire();
966        let buf3 = pool.acquire(); // Should create new
967
968        assert_eq!(buf1.capacity(), 1024);
969        assert_eq!(buf2.capacity(), 1024);
970        assert_eq!(buf3.capacity(), 1024);
971
972        pool.release(buf1);
973        let buf4 = pool.acquire(); // Should reuse
974        assert_eq!(buf4.capacity(), 1024);
975    }
976
977    #[test]
978    fn test_message_batch() {
979        let mut batch = MessageBatch::new(10);
980
981        batch.add(MessageId::from("msg1"), Bytes::from("data1"));
982        batch.add(MessageId::from("msg2"), Bytes::from("data2"));
983        batch.add(MessageId::from("msg3"), Bytes::from("data3"));
984
985        assert_eq!(batch.messages.len(), 3);
986
987        let msg1 = batch.get(0).unwrap();
988        assert_eq!(msg1, Bytes::from("data1"));
989
990        let msg2 = batch.get(1).unwrap();
991        assert_eq!(msg2, Bytes::from("data2"));
992
993        // Iterate without copying
994        let mut count = 0;
995        for (_id, payload) in batch.iter() {
996            count += 1;
997            assert!(!payload.is_empty());
998        }
999        assert_eq!(count, 3);
1000    }
1001
1002    #[test]
1003    fn test_cheap_clone() {
1004        let msg = ZeroCopyMessage::from_bytes(MessageId::from("test"), Bytes::from("data"));
1005
1006        let cloned = msg.cheap_clone();
1007
1008        // Should share the same Arc pointers
1009        assert!(Arc::ptr_eq(&msg.id, &cloned.id));
1010        assert_eq!(msg.payload, cloned.payload);
1011    }
1012
1013    #[test]
1014    fn test_mmap_message() {
1015        use std::io::Write;
1016
1017        // Create a test file
1018        let temp_dir = std::env::temp_dir();
1019        let test_file = temp_dir.join("test_mmap.json");
1020        let mut file = std::fs::File::create(&test_file).unwrap();
1021        let test_data = r#"{"test": "data", "value": 42}"#;
1022        file.write_all(test_data.as_bytes()).unwrap();
1023        file.sync_all().unwrap();
1024        drop(file);
1025
1026        // Test memory-mapped message
1027        let msg = mmap::MmapMessage::from_file(MessageId::from("mmap-test"), &test_file, 0, None)
1028            .unwrap();
1029
1030        assert_eq!(msg.data(), test_data.as_bytes());
1031        assert_eq!(msg.as_str().unwrap(), test_data);
1032
1033        // Test JSON parsing
1034        let value: serde_json::Value = msg.parse_json().unwrap();
1035        assert_eq!(value["test"], "data");
1036        assert_eq!(value["value"], 42);
1037
1038        // Clean up
1039        std::fs::remove_file(test_file).unwrap();
1040    }
1041
1042    #[test]
1043    fn test_mmap_batch() {
1044        use std::io::Write;
1045
1046        // Create a test JSONL file
1047        let temp_dir = std::env::temp_dir();
1048        let test_file = temp_dir.join("test_batch.jsonl");
1049        let mut file = std::fs::File::create(&test_file).unwrap();
1050        writeln!(file, r#"{{"id": 1, "name": "first"}}"#).unwrap();
1051        writeln!(file, r#"{{"id": 2, "name": "second"}}"#).unwrap();
1052        writeln!(file, r#"{{"id": 3, "name": "third"}}"#).unwrap();
1053        file.sync_all().unwrap();
1054        drop(file);
1055
1056        // Test batch processing
1057        let batch = mmap::MmapBatch::from_jsonl_file(&test_file).unwrap();
1058
1059        assert_eq!(batch.len(), 3);
1060        assert!(!batch.is_empty());
1061
1062        // Test individual access
1063        let msg1 = batch.get(0).unwrap();
1064        let value: serde_json::Value = serde_json::from_slice(msg1).unwrap();
1065        assert_eq!(value["id"], 1);
1066        assert_eq!(value["name"], "first");
1067
1068        // Test iteration
1069        let mut count = 0;
1070        for (_id, data) in batch.iter() {
1071            let value: serde_json::Value = serde_json::from_slice(data).unwrap();
1072            assert!(value["id"].is_number());
1073            assert!(value["name"].is_string());
1074            count += 1;
1075        }
1076        assert_eq!(count, 3);
1077
1078        // Clean up
1079        std::fs::remove_file(test_file).unwrap();
1080    }
1081
1082    #[test]
1083    fn test_mmap_pool() {
1084        use std::io::Write;
1085
1086        // Create test files
1087        let temp_dir = std::env::temp_dir();
1088        let test_file1 = temp_dir.join("pool_test1.json");
1089        let test_file2 = temp_dir.join("pool_test2.json");
1090
1091        let mut file1 = std::fs::File::create(&test_file1).unwrap();
1092        file1.write_all(b"test1").unwrap();
1093        file1.sync_all().unwrap();
1094
1095        let mut file2 = std::fs::File::create(&test_file2).unwrap();
1096        file2.write_all(b"test2").unwrap();
1097        file2.sync_all().unwrap();
1098
1099        // Test pool
1100        let pool = mmap::MmapPool::new(10);
1101
1102        assert_eq!(pool.size(), 0);
1103
1104        let _data1 = pool.get_or_create(&test_file1).unwrap();
1105        assert_eq!(pool.size(), 1);
1106
1107        let _data2 = pool.get_or_create(&test_file2).unwrap();
1108        assert_eq!(pool.size(), 2);
1109
1110        // Getting again should use cache
1111        let _data1_again = pool.get_or_create(&test_file1).unwrap();
1112        assert_eq!(pool.size(), 2); // Still 2, used cache
1113
1114        pool.clear();
1115        assert_eq!(pool.size(), 0);
1116
1117        // Clean up
1118        std::fs::remove_file(test_file1).unwrap();
1119        std::fs::remove_file(test_file2).unwrap();
1120    }
1121
1122    // ============================================================================
1123    // Async Memory Mapping Tests - Production-Grade TDD
1124    // ============================================================================
1125
1126    #[cfg(feature = "mmap")]
1127    mod async_mmap_tests {
1128        use super::MessageId;
1129        use super::mmap::*;
1130        use std::io::Write;
1131        use std::path::Path;
1132
1133        #[tokio::test]
1134        async fn test_mmap_message_from_file_async_performance() {
1135            // Test that from_file_async doesn't block the async runtime
1136
1137            let temp_dir = std::env::temp_dir();
1138            let test_file = temp_dir.join("async_mmap_test.json");
1139
1140            // Create test file
1141            {
1142                let mut file = std::fs::File::create(&test_file).unwrap();
1143                let test_data = r#"{"test": "async_data", "large_field": "Lorem ipsum dolor sit amet, consectetur adipiscing elit. Sed do eiusmod tempor incididunt ut labore et dolore magna aliqua."}"#;
1144                file.write_all(test_data.as_bytes()).unwrap();
1145                file.sync_all().unwrap();
1146            }
1147
1148            // Test concurrent async calls don't block each other
1149            let handles = (0..3)
1150                .map(|i| {
1151                    let test_file = test_file.clone();
1152                    tokio::spawn(async move {
1153                        let start_time = std::time::Instant::now();
1154
1155                        // This will FAIL initially because we need to implement from_file_async
1156                        let result = MmapMessage::from_file_async(
1157                            MessageId::from(format!("async-test-{}", i)),
1158                            &test_file,
1159                            0,
1160                            None,
1161                        )
1162                        .await;
1163
1164                        let duration = start_time.elapsed();
1165
1166                        // Should complete quickly without blocking other async tasks
1167                        assert!(
1168                            duration.as_millis() < 100,
1169                            "Async mmap took {}ms - should be <100ms",
1170                            duration.as_millis()
1171                        );
1172
1173                        (i, result)
1174                    })
1175                })
1176                .collect::<Vec<_>>();
1177
1178            let start_time = std::time::Instant::now();
1179            let results = futures::future::join_all(handles).await;
1180            let total_duration = start_time.elapsed();
1181
1182            // All concurrent operations should complete quickly
1183            assert!(
1184                total_duration.as_millis() < 200,
1185                "Concurrent async mmap operations took {}ms - should be <200ms",
1186                total_duration.as_millis()
1187            );
1188
1189            // All should succeed and return valid messages
1190            for result in results {
1191                let (i, mmap_result) = result.unwrap();
1192                let mmap_msg = mmap_result.unwrap();
1193                assert_eq!(*mmap_msg.id, MessageId::from(format!("async-test-{}", i)));
1194                assert!(!mmap_msg.data().is_empty());
1195            }
1196
1197            // Clean up
1198            std::fs::remove_file(test_file).unwrap();
1199        }
1200
1201        #[tokio::test]
1202        async fn test_mmap_batch_from_jsonl_file_async_concurrency() {
1203            let temp_dir = std::env::temp_dir();
1204            let test_file = temp_dir.join("async_batch_test.jsonl");
1205
1206            // Create JSONL test file
1207            {
1208                let mut file = std::fs::File::create(&test_file).unwrap();
1209                writeln!(file, r#"{{"id": "msg1", "data": "test1"}}"#).unwrap();
1210                writeln!(file, r#"{{"id": "msg2", "data": "test2"}}"#).unwrap();
1211                writeln!(file, r#"{{"id": "msg3", "data": "test3"}}"#).unwrap();
1212                file.sync_all().unwrap();
1213            }
1214
1215            // Test that async version doesn't block concurrent operations
1216            let handles = (0..5)
1217                .map(|_| {
1218                    let test_file = test_file.clone();
1219                    tokio::spawn(async move {
1220                        let start_time = std::time::Instant::now();
1221
1222                        // This will FAIL initially - need to implement from_jsonl_file_async
1223                        let result = MmapBatch::from_jsonl_file_async(&test_file).await;
1224
1225                        let duration = start_time.elapsed();
1226                        assert!(
1227                            duration.as_millis() < 150,
1228                            "Async batch processing took {}ms - should be <150ms",
1229                            duration.as_millis()
1230                        );
1231
1232                        result
1233                    })
1234                })
1235                .collect::<Vec<_>>();
1236
1237            let results = futures::future::join_all(handles).await;
1238
1239            // All should succeed
1240            for result in results {
1241                let batch = result.unwrap().unwrap();
1242                assert_eq!(batch.len(), 3);
1243            }
1244
1245            std::fs::remove_file(test_file).unwrap();
1246        }
1247
1248        #[tokio::test]
1249        async fn test_async_mmap_error_handling() {
1250            // Test async error handling for non-existent files
1251            let non_existent = Path::new("/tmp/does_not_exist_async.json");
1252
1253            let result = MmapMessage::from_file_async(
1254                MessageId::String("error-test".to_string().into()),
1255                non_existent,
1256                0,
1257                None,
1258            )
1259            .await;
1260
1261            assert!(result.is_err());
1262
1263            // Error should be descriptive, not generic blocking I/O error
1264            let error_msg = format!("{}", result.unwrap_err());
1265            assert!(
1266                error_msg.contains("No such file") || error_msg.contains("not found"),
1267                "Error should be descriptive: {}",
1268                error_msg
1269            );
1270        }
1271
1272        #[tokio::test]
1273        async fn test_async_mmap_maintains_functionality() {
1274            // Verify async versions provide same functionality as sync versions
1275            let temp_dir = std::env::temp_dir();
1276            let test_file = temp_dir.join("functionality_test.json");
1277
1278            let test_data = r#"{"test": "functionality", "value": 42}"#;
1279            std::fs::write(&test_file, test_data).unwrap();
1280
1281            // Test both sync and async versions
1282            let sync_result = MmapMessage::from_file(
1283                MessageId::String("sync".to_string().into()),
1284                &test_file,
1285                0,
1286                None,
1287            )
1288            .unwrap();
1289
1290            let async_result = MmapMessage::from_file_async(
1291                MessageId::String("async".to_string().into()),
1292                &test_file,
1293                0,
1294                None,
1295            )
1296            .await
1297            .unwrap();
1298
1299            // Same data should be accessible
1300            assert_eq!(sync_result.data(), async_result.data());
1301            assert_eq!(sync_result.data(), test_data.as_bytes());
1302
1303            std::fs::remove_file(test_file).unwrap();
1304        }
1305    }
1306}