1use bytes::{BufMut, Bytes, BytesMut};
50use serde::{Deserialize, Serialize};
51use serde_json::value::RawValue;
52use std::fmt;
53use std::path::Path;
54use std::sync::Arc;
55use uuid::Uuid;
56
57use crate::error::{Error, Result};
58use crate::types::{ContentType, Timestamp};
59
/// A message whose identifier and payload live behind reference-counted handles.
#[derive(Debug, Clone)]
pub struct ZeroCopyMessage {
    /// Shared identifier of this message.
    pub id: Arc<MessageId>,

    /// Raw payload bytes.
    pub payload: Bytes,

    /// Cached raw-JSON view of `payload`; `None` until `parse_json_lazy` has succeeded.
    pub lazy_json: Option<Box<RawValue>>,

    /// Bookkeeping data recorded when the message was constructed.
    pub metadata: MessageMetadata,
}
75
/// Identifier attached to a message.
///
/// Values can be built from `String`, `&str`, `i64` and [`Uuid`] through the `From`
/// implementations in this file.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum MessageId {
    /// Textual identifier stored as a shared string.
    String(Arc<str>),
    /// Numeric identifier.
    Number(i64),
    /// UUID identifier.
    Uuid(Uuid),
}
86
/// Descriptive data stored alongside a message payload.
#[derive(Debug, Clone)]
pub struct MessageMetadata {
    /// Time at which the owning message was constructed.
    pub created_at: Timestamp,
    /// Declared content type of the payload (constructors in this file set `ContentType::Json`).
    pub content_type: ContentType,
    /// Payload length in bytes.
    pub size: usize,
    /// Optional correlation identifier (constructors in this file leave it `None`).
    pub correlation_id: Option<Arc<str>>,
}
99
100impl ZeroCopyMessage {
101 #[inline]
103 pub fn from_bytes(id: MessageId, payload: Bytes) -> Self {
104 let size = payload.len();
105 Self {
106 id: Arc::new(id),
107 payload: payload.clone(),
108 lazy_json: None,
109 metadata: MessageMetadata {
110 created_at: Timestamp::now(),
111 content_type: ContentType::Json,
112 size,
113 correlation_id: None,
114 },
115 }
116 }
117
118 pub fn from_json<T: Serialize>(id: MessageId, value: &T) -> Result<Self> {
120 let mut buffer = BytesMut::with_capacity(1024);
122
123 serde_json::to_writer((&mut buffer).writer(), value)
125 .map_err(|e| Error::serialization(e.to_string()))?;
126
127 let payload = buffer.freeze();
128 let size = payload.len();
129
130 Ok(Self {
131 id: Arc::new(id),
132 payload,
133 lazy_json: None,
134 metadata: MessageMetadata {
135 created_at: Timestamp::now(),
136 content_type: ContentType::Json,
137 size,
138 correlation_id: None,
139 },
140 })
141 }
142
143 #[inline]
145 pub fn parse_json_lazy(&mut self) -> Result<&RawValue> {
146 if self.lazy_json.is_none() {
147 let raw: Box<RawValue> = serde_json::from_slice(&self.payload)
149 .map_err(|e| Error::serialization(format!("JSON parse error: {}", e)))?;
150
151 self.lazy_json = Some(raw);
153 }
154
155 Ok(self.lazy_json.as_ref().unwrap())
156 }
157
158 #[inline]
160 pub fn deserialize<T: for<'de> Deserialize<'de>>(&self) -> Result<T> {
161 #[cfg(feature = "simd")]
162 {
163 if self.payload.len() >= 64 {
165 let mut buffer = self.payload.to_vec();
167 simd_json::from_slice(&mut buffer)
168 .map_err(|e| Error::serialization(format!("SIMD deserialize error: {}", e)))
169 } else {
170 serde_json::from_slice(&self.payload)
172 .map_err(|e| Error::serialization(format!("Deserialization error: {}", e)))
173 }
174 }
175 #[cfg(not(feature = "simd"))]
176 {
177 serde_json::from_slice(&self.payload)
178 .map_err(|e| Error::serialization(format!("Deserialization error: {}", e)))
179 }
180 }
181
182 #[inline]
184 pub fn payload_slice(&self) -> &[u8] {
185 &self.payload
186 }
187
188 #[inline]
190 pub fn cheap_clone(&self) -> Self {
191 Self {
192 id: Arc::clone(&self.id),
193 payload: self.payload.clone(), lazy_json: self.lazy_json.clone(),
195 metadata: self.metadata.clone(),
196 }
197 }
198}
199
/// Bounded pool of reusable [`BytesMut`] buffers.
#[derive(Debug)]
pub struct BufferPool {
    /// Bounded queue holding the buffers that are currently idle.
    buffers: crossbeam::queue::ArrayQueue<BytesMut>,
    /// Capacity, in bytes, requested for every buffer the pool allocates.
    capacity: usize,
}
208
209impl BufferPool {
210 pub fn new(size: usize, capacity: usize) -> Self {
212 let buffers = crossbeam::queue::ArrayQueue::new(size);
213
214 for _ in 0..size {
216 let _ = buffers.push(BytesMut::with_capacity(capacity));
217 }
218
219 Self { buffers, capacity }
220 }
221
222 #[inline]
224 pub fn acquire(&self) -> BytesMut {
225 self.buffers
226 .pop()
227 .unwrap_or_else(|| BytesMut::with_capacity(self.capacity))
228 }
229
230 #[inline]
232 pub fn release(&self, mut buffer: BytesMut) {
233 buffer.clear();
234 let _ = self.buffers.push(buffer);
235 }
236}
237
/// A group of messages whose payloads are stored back to back in one buffer.
#[derive(Debug)]
pub struct MessageBatch {
    /// Concatenation of all payloads added so far.
    pub buffer: Bytes,
    /// `(offset, length)` of each message inside `buffer`, in insertion order.
    pub messages: Vec<(usize, usize)>,
    /// Identifier of each message, parallel to `messages`.
    pub ids: Vec<Arc<MessageId>>,
}
248
249impl MessageBatch {
250 pub fn new(capacity: usize) -> Self {
252 Self {
253 buffer: Bytes::new(),
254 messages: Vec::with_capacity(capacity),
255 ids: Vec::with_capacity(capacity),
256 }
257 }
258
259 pub fn add(&mut self, id: MessageId, payload: Bytes) {
261 let offset = self.buffer.len();
262 let length = payload.len();
263
264 let mut buffer = BytesMut::from(self.buffer.as_ref());
266 buffer.extend_from_slice(&payload);
267 self.buffer = buffer.freeze();
268
269 self.messages.push((offset, length));
271 self.ids.push(Arc::new(id));
272 }
273
274 #[inline]
276 pub fn get(&self, index: usize) -> Option<Bytes> {
277 self.messages
278 .get(index)
279 .map(|(offset, length)| self.buffer.slice(*offset..*offset + *length))
280 }
281
282 pub fn iter(&self) -> impl Iterator<Item = (&Arc<MessageId>, Bytes)> + '_ {
284 self.ids
285 .iter()
286 .zip(self.messages.iter())
287 .map(move |(id, (offset, length))| (id, self.buffer.slice(*offset..*offset + *length)))
288 }
289}
290
291pub mod fast {
293 #[inline]
295 pub fn validate_utf8_fast(bytes: &[u8]) -> bool {
296 #[cfg(feature = "simd")]
297 {
298 if bytes.len() >= 64 {
300 simdutf8::basic::from_utf8(bytes).is_ok()
301 } else {
302 std::str::from_utf8(bytes).is_ok()
303 }
304 }
305 #[cfg(not(feature = "simd"))]
306 {
307 std::str::from_utf8(bytes).is_ok()
308 }
309 }
310
311 #[inline]
313 pub fn find_json_boundaries(bytes: &[u8]) -> Vec<usize> {
314 let mut boundaries = Vec::new();
315 let mut depth = 0;
316 let mut in_string = false;
317 let mut escaped = false;
318
319 for (i, &byte) in bytes.iter().enumerate() {
321 if escaped {
322 escaped = false;
323 continue;
324 }
325
326 match byte {
327 b'\\' if in_string => escaped = true,
328 b'"' if !escaped => in_string = !in_string,
329 b'{' | b'[' if !in_string => depth += 1,
330 b'}' | b']' if !in_string => {
331 depth -= 1;
332 if depth == 0 {
333 boundaries.push(i + 1);
334 }
335 }
336 _ => {}
337 }
338 }
339
340 boundaries
341 }
342
343 #[cfg(feature = "simd")]
345 #[inline]
346 pub fn validate_json_fast(bytes: &[u8]) -> bool {
347 if bytes.len() >= 64 {
348 let mut owned = bytes.to_vec();
350 simd_json::to_borrowed_value(&mut owned).is_ok()
351 } else {
352 serde_json::from_slice::<serde_json::Value>(bytes).is_ok()
354 }
355 }
356
357 #[cfg(not(feature = "simd"))]
361 #[inline]
362 pub fn validate_json_fast(bytes: &[u8]) -> bool {
363 serde_json::from_slice::<serde_json::Value>(bytes).is_ok()
364 }
365}
366
367#[cfg(feature = "mmap")]
369pub mod mmap {
370 use super::*;
371 use memmap2::{Mmap, MmapOptions};
372 use std::fs::File;
373 use std::io;
374 use std::ops::Deref;
375
    /// A message whose bytes are a window into a memory-mapped file.
    #[derive(Debug)]
    pub struct MmapMessage {
        /// Shared identifier of this message.
        pub id: Arc<MessageId>,
        /// Shared mapping of the backing file.
        pub mmap: Arc<Mmap>,
        /// Start of the message within the mapping, in bytes.
        pub offset: usize,
        /// Length of the message in bytes.
        pub length: usize,
        /// Bookkeeping data recorded when the message was constructed.
        pub metadata: MessageMetadata,
    }
392
393 impl MmapMessage {
394 pub fn from_file(
399 id: MessageId,
400 path: &Path,
401 offset: usize,
402 length: Option<usize>,
403 ) -> io::Result<Self> {
404 let file = File::open(path)?;
406 let metadata = file.metadata()?;
407 let file_size = metadata.len() as usize;
408
409 if offset >= file_size {
411 return Err(io::Error::new(
412 io::ErrorKind::InvalidInput,
413 "Offset exceeds file size",
414 ));
415 }
416
417 let actual_length = length.unwrap_or(file_size - offset);
419 let actual_length = actual_length.min(file_size - offset);
420
421 let mmap = unsafe { MmapOptions::new().map(&file)? };
425
426 Ok(Self {
427 id: Arc::new(id),
428 mmap: Arc::new(mmap),
429 offset,
430 length: actual_length,
431 metadata: MessageMetadata {
432 created_at: Timestamp::now(),
433 content_type: ContentType::Json,
434 size: actual_length,
435 correlation_id: None,
436 },
437 })
438 }
439
440 #[allow(dead_code)] async fn from_file_internal(
443 id: MessageId,
444 path: &Path,
445 offset: usize,
446 length: Option<usize>,
447 ) -> io::Result<Self> {
448 let file = File::open(path)?;
449 let metadata = file.metadata()?;
450 let file_size = metadata.len() as usize;
451
452 if offset >= file_size {
454 return Err(io::Error::new(
455 io::ErrorKind::InvalidInput,
456 "Offset exceeds file size",
457 ));
458 }
459
460 let actual_length = length.unwrap_or(file_size - offset);
462 let actual_length = actual_length.min(file_size - offset);
463
464 let mmap = unsafe { MmapOptions::new().map(&file)? };
468
469 Ok(Self {
470 id: Arc::new(id),
471 mmap: Arc::new(mmap),
472 offset,
473 length: actual_length,
474 metadata: MessageMetadata {
475 created_at: Timestamp::now(),
476 content_type: ContentType::Json,
477 size: actual_length,
478 correlation_id: None,
479 },
480 })
481 }
482
483 pub async fn from_file_async(
494 id: MessageId,
495 path: &std::path::Path,
496 offset: usize,
497 length: Option<usize>,
498 ) -> std::io::Result<Self> {
499 let path = path.to_path_buf(); tokio::task::spawn_blocking(move || Self::from_file(id, &path, offset, length))
502 .await
503 .map_err(|join_err| {
504 std::io::Error::other(format!("Async mmap operation failed: {}", join_err))
505 })?
506 }
507
508 #[inline]
510 pub fn data(&self) -> &[u8] {
511 &self.mmap[self.offset..self.offset + self.length]
512 }
513
514 #[inline]
516 pub fn to_bytes(&self) -> Bytes {
517 Bytes::copy_from_slice(self.data())
518 }
519
520 pub fn parse_json<T>(&self) -> Result<T>
523 where
524 T: for<'de> Deserialize<'de>,
525 {
526 serde_json::from_slice(self.data())
527 .map_err(|e| Error::serialization(format!("JSON parse error: {}", e)))
528 }
529
530 pub fn as_str(&self) -> Result<&str> {
533 std::str::from_utf8(self.data())
534 .map_err(|e| Error::serialization(format!("Invalid UTF-8: {}", e)))
535 }
536 }
537
    /// Cache of file mappings keyed by path.
    #[derive(Debug)]
    pub struct MmapPool {
        /// Cached mappings, one per path.
        maps: dashmap::DashMap<std::path::PathBuf, Arc<Mmap>>,
        /// Number of entries at which new mappings stop being cached.
        max_size: usize,
    }
546
547 impl MmapPool {
548 pub fn new(max_size: usize) -> Self {
550 Self {
551 maps: dashmap::DashMap::new(),
552 max_size,
553 }
554 }
555
556 pub fn get_or_create(&self, path: &Path) -> io::Result<Arc<Mmap>> {
558 if let Some(mmap) = self.maps.get(path) {
560 return Ok(Arc::clone(&*mmap));
561 }
562
563 let file = File::open(path)?;
565 let mmap = unsafe { MmapOptions::new().map(&file)? };
568 let mmap = Arc::new(mmap);
569
570 if self.maps.len() < self.max_size {
572 self.maps.insert(path.to_path_buf(), Arc::clone(&mmap));
573 }
574
575 Ok(mmap)
576 }
577
578 pub fn clear(&self) {
580 self.maps.clear();
581 }
582
583 pub fn size(&self) -> usize {
585 self.maps.len()
586 }
587 }
588
    /// Messages indexed inside a single memory-mapped file.
    #[derive(Debug)]
    pub struct MmapBatch {
        /// Shared mapping of the backing file.
        mmap: Arc<Mmap>,
        /// `(offset, length)` of each message inside the mapping.
        messages: Vec<(usize, usize)>,
        /// Identifier of each message, parallel to `messages`.
        ids: Vec<Arc<MessageId>>,
    }
599
600 impl MmapBatch {
601 pub fn from_jsonl_file(path: &Path) -> io::Result<Self> {
603 let file = File::open(path)?;
604 let mmap = unsafe { MmapOptions::new().map(&file)? };
607
608 let mut messages = Vec::new();
609 let mut ids = Vec::new();
610 let mut offset = 0;
611
612 for (idx, line) in mmap.split(|&b| b == b'\n').enumerate() {
614 if !line.is_empty() {
615 messages.push((offset, line.len()));
616 ids.push(Arc::new(MessageId::Number(idx as i64)));
617 }
618 offset += line.len() + 1; }
620
621 Ok(Self {
622 mmap: Arc::new(mmap),
623 messages,
624 ids,
625 })
626 }
627
628 #[inline]
630 pub fn get(&self, index: usize) -> Option<&[u8]> {
631 self.messages
632 .get(index)
633 .map(|(offset, length)| &self.mmap[*offset..*offset + *length])
634 }
635
636 pub fn iter(&self) -> impl Iterator<Item = (&Arc<MessageId>, &[u8])> + '_ {
638 self.ids
639 .iter()
640 .zip(self.messages.iter())
641 .map(move |(id, (offset, length))| {
642 (id, &self.mmap.deref()[*offset..*offset + *length])
643 })
644 }
645
646 pub fn len(&self) -> usize {
648 self.messages.len()
649 }
650
651 pub fn is_empty(&self) -> bool {
653 self.messages.is_empty()
654 }
655
656 pub async fn from_jsonl_file_async(path: &std::path::Path) -> std::io::Result<Self> {
667 let path = path.to_path_buf(); tokio::task::spawn_blocking(move || Self::from_jsonl_file(&path))
670 .await
671 .map_err(|join_err| {
672 std::io::Error::other(format!(
673 "Async JSONL batch operation failed: {}",
674 join_err
675 ))
676 })?
677 }
678 }
679}
680
681#[cfg(not(feature = "mmap"))]
683pub mod mmap {
684 use super::*;
685 use std::fs;
686 use std::io;
687
    /// Fallback for builds without the `mmap` feature: the message bytes are read into memory.
    #[derive(Debug)]
    pub struct MmapMessage {
        /// Shared identifier of this message.
        pub id: Arc<MessageId>,
        /// The message bytes read from the file.
        pub data: Bytes,
        /// Bookkeeping data recorded when the message was constructed.
        pub metadata: MessageMetadata,
    }
698
699 impl MmapMessage {
700 pub fn from_file(
702 id: MessageId,
703 path: &Path,
704 offset: usize,
705 length: Option<usize>,
706 ) -> io::Result<Self> {
707 let data = fs::read(path)?;
708 let file_size = data.len();
709
710 if offset >= file_size {
711 return Err(io::Error::new(
712 io::ErrorKind::InvalidInput,
713 "Offset exceeds file size",
714 ));
715 }
716
717 let actual_length = length.unwrap_or(file_size - offset);
718 let actual_length = actual_length.min(file_size - offset);
719
720 let data = Bytes::copy_from_slice(&data[offset..offset + actual_length]);
721
722 Ok(Self {
723 id: Arc::new(id),
724 data: data.clone(),
725 metadata: MessageMetadata {
726 created_at: Timestamp::now(),
727 content_type: ContentType::Json,
728 size: actual_length,
729 correlation_id: None,
730 },
731 })
732 }
733
734 #[inline]
736 pub fn data(&self) -> &[u8] {
737 &self.data
738 }
739
740 #[inline]
742 pub fn to_bytes(&self) -> Bytes {
743 self.data.clone()
744 }
745
746 pub fn parse_json<T>(&self) -> Result<T>
748 where
749 T: for<'de> Deserialize<'de>,
750 {
751 serde_json::from_slice(&self.data)
752 .map_err(|e| Error::serialization(format!("JSON parse error: {}", e)))
753 }
754
755 pub fn as_str(&self) -> Result<&str> {
757 std::str::from_utf8(&self.data)
758 .map_err(|e| Error::serialization(format!("Invalid UTF-8: {}", e)))
759 }
760
761 pub async fn from_file_async(
766 id: MessageId,
767 path: &std::path::Path,
768 offset: usize,
769 length: Option<usize>,
770 ) -> std::io::Result<Self> {
771 let path = path.to_path_buf(); tokio::task::spawn_blocking(move || Self::from_file(id, &path, offset, length))
774 .await
775 .map_err(|join_err| {
776 std::io::Error::other(format!("Async file operation failed: {}", join_err))
777 })?
778 }
779 }
780
    /// Fallback for builds without the `mmap` feature: a cache of whole-file contents keyed by path.
    #[derive(Debug)]
    pub struct MmapPool {
        /// Cached file contents, one entry per path.
        cache: dashmap::DashMap<std::path::PathBuf, Bytes>,
        /// Number of entries at which new files stop being cached.
        max_size: usize,
    }
787
788 impl MmapPool {
789 pub fn new(max_size: usize) -> Self {
791 Self {
792 cache: dashmap::DashMap::new(),
793 max_size,
794 }
795 }
796
797 pub fn get_or_create(&self, path: &Path) -> io::Result<Bytes> {
799 if let Some(data) = self.cache.get(path) {
800 return Ok(data.clone());
801 }
802
803 let data = fs::read(path)?;
804 let bytes = Bytes::from(data);
805
806 if self.cache.len() < self.max_size {
807 self.cache.insert(path.to_path_buf(), bytes.clone());
808 }
809
810 Ok(bytes)
811 }
812
813 pub fn clear(&self) {
815 self.cache.clear();
816 }
817
818 pub fn size(&self) -> usize {
820 self.cache.len()
821 }
822 }
823
    /// Fallback for builds without the `mmap` feature: messages indexed inside one in-memory buffer.
    #[derive(Debug)]
    pub struct MmapBatch {
        /// Entire contents of the backing file.
        data: Bytes,
        /// `(offset, length)` of each message inside `data`.
        messages: Vec<(usize, usize)>,
        /// Identifier of each message, parallel to `messages`.
        ids: Vec<Arc<MessageId>>,
    }
831
832 impl MmapBatch {
833 pub fn from_jsonl_file(path: &Path) -> io::Result<Self> {
835 let data = fs::read(path)?;
836 let mut messages = Vec::new();
837 let mut ids = Vec::new();
838 let mut offset = 0;
839
840 for (idx, line) in data.split(|&b| b == b'\n').enumerate() {
841 if !line.is_empty() {
842 messages.push((offset, line.len()));
843 ids.push(Arc::new(MessageId::Number(idx as i64)));
844 }
845 offset += line.len() + 1;
846 }
847
848 Ok(Self {
849 data: Bytes::from(data),
850 messages,
851 ids,
852 })
853 }
854
855 #[inline]
857 pub fn get(&self, index: usize) -> Option<&[u8]> {
858 self.messages
859 .get(index)
860 .map(|(offset, length)| &self.data[*offset..*offset + *length])
861 }
862
863 pub fn iter(&self) -> impl Iterator<Item = (&Arc<MessageId>, &[u8])> + '_ {
865 self.ids
866 .iter()
867 .zip(self.messages.iter())
868 .map(move |(id, (offset, length))| (id, &self.data[*offset..*offset + *length]))
869 }
870
871 pub fn len(&self) -> usize {
873 self.messages.len()
874 }
875
876 pub fn is_empty(&self) -> bool {
878 self.messages.is_empty()
879 }
880
881 pub async fn from_jsonl_file_async(path: &std::path::Path) -> std::io::Result<Self> {
886 let path = path.to_path_buf(); tokio::task::spawn_blocking(move || Self::from_jsonl_file(&path))
889 .await
890 .map_err(|join_err| {
891 std::io::Error::other(format!(
892 "Async JSONL batch operation failed: {}",
893 join_err
894 ))
895 })?
896 }
897 }
898}
899
900impl From<String> for MessageId {
901 fn from(s: String) -> Self {
902 Self::String(Arc::from(s))
903 }
904}
905
906impl From<&str> for MessageId {
907 fn from(s: &str) -> Self {
908 Self::String(Arc::from(s))
909 }
910}
911
912impl From<i64> for MessageId {
913 fn from(n: i64) -> Self {
914 Self::Number(n)
915 }
916}
917
918impl From<Uuid> for MessageId {
919 fn from(u: Uuid) -> Self {
920 Self::Uuid(u)
921 }
922}
923
924impl fmt::Display for MessageId {
925 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
926 match self {
927 Self::String(s) => write!(f, "{}", s),
928 Self::Number(n) => write!(f, "{}", n),
929 Self::Uuid(u) => write!(f, "{}", u),
930 }
931 }
932}
933
934#[cfg(test)]
935mod tests {
936 use super::*;
937
938 #[test]
939 fn test_zero_copy_message_creation() {
940 let payload = Bytes::from(r#"{"test": "data"}"#);
941 let msg = ZeroCopyMessage::from_bytes(MessageId::from("test-1"), payload.clone());
942
943 assert_eq!(msg.payload, payload);
944 assert_eq!(msg.metadata.size, payload.len());
945 }
946
947 #[test]
948 fn test_lazy_json_parsing() {
949 let payload = Bytes::from(r#"{"key": "value", "number": 42}"#);
950 let mut msg = ZeroCopyMessage::from_bytes(MessageId::from("test-2"), payload);
951
952 let raw = msg.parse_json_lazy().unwrap();
954 assert!(raw.get().contains("value"));
955
956 assert!(msg.lazy_json.is_some());
958 }
959
960 #[test]
961 fn test_buffer_pool() {
962 let pool = BufferPool::new(2, 1024);
963
964 let buf1 = pool.acquire();
965 let buf2 = pool.acquire();
966 let buf3 = pool.acquire(); assert_eq!(buf1.capacity(), 1024);
969 assert_eq!(buf2.capacity(), 1024);
970 assert_eq!(buf3.capacity(), 1024);
971
972 pool.release(buf1);
973 let buf4 = pool.acquire(); assert_eq!(buf4.capacity(), 1024);
975 }
976
977 #[test]
978 fn test_message_batch() {
979 let mut batch = MessageBatch::new(10);
980
981 batch.add(MessageId::from("msg1"), Bytes::from("data1"));
982 batch.add(MessageId::from("msg2"), Bytes::from("data2"));
983 batch.add(MessageId::from("msg3"), Bytes::from("data3"));
984
985 assert_eq!(batch.messages.len(), 3);
986
987 let msg1 = batch.get(0).unwrap();
988 assert_eq!(msg1, Bytes::from("data1"));
989
990 let msg2 = batch.get(1).unwrap();
991 assert_eq!(msg2, Bytes::from("data2"));
992
993 let mut count = 0;
995 for (_id, payload) in batch.iter() {
996 count += 1;
997 assert!(!payload.is_empty());
998 }
999 assert_eq!(count, 3);
1000 }
1001
1002 #[test]
1003 fn test_cheap_clone() {
1004 let msg = ZeroCopyMessage::from_bytes(MessageId::from("test"), Bytes::from("data"));
1005
1006 let cloned = msg.cheap_clone();
1007
1008 assert!(Arc::ptr_eq(&msg.id, &cloned.id));
1010 assert_eq!(msg.payload, cloned.payload);
1011 }
1012
1013 #[test]
1014 fn test_mmap_message() {
1015 use std::io::Write;
1016
1017 let temp_dir = std::env::temp_dir();
1019 let test_file = temp_dir.join("test_mmap.json");
1020 let mut file = std::fs::File::create(&test_file).unwrap();
1021 let test_data = r#"{"test": "data", "value": 42}"#;
1022 file.write_all(test_data.as_bytes()).unwrap();
1023 file.sync_all().unwrap();
1024 drop(file);
1025
1026 let msg = mmap::MmapMessage::from_file(MessageId::from("mmap-test"), &test_file, 0, None)
1028 .unwrap();
1029
1030 assert_eq!(msg.data(), test_data.as_bytes());
1031 assert_eq!(msg.as_str().unwrap(), test_data);
1032
1033 let value: serde_json::Value = msg.parse_json().unwrap();
1035 assert_eq!(value["test"], "data");
1036 assert_eq!(value["value"], 42);
1037
1038 std::fs::remove_file(test_file).unwrap();
1040 }
1041
1042 #[test]
1043 fn test_mmap_batch() {
1044 use std::io::Write;
1045
1046 let temp_dir = std::env::temp_dir();
1048 let test_file = temp_dir.join("test_batch.jsonl");
1049 let mut file = std::fs::File::create(&test_file).unwrap();
1050 writeln!(file, r#"{{"id": 1, "name": "first"}}"#).unwrap();
1051 writeln!(file, r#"{{"id": 2, "name": "second"}}"#).unwrap();
1052 writeln!(file, r#"{{"id": 3, "name": "third"}}"#).unwrap();
1053 file.sync_all().unwrap();
1054 drop(file);
1055
1056 let batch = mmap::MmapBatch::from_jsonl_file(&test_file).unwrap();
1058
1059 assert_eq!(batch.len(), 3);
1060 assert!(!batch.is_empty());
1061
1062 let msg1 = batch.get(0).unwrap();
1064 let value: serde_json::Value = serde_json::from_slice(msg1).unwrap();
1065 assert_eq!(value["id"], 1);
1066 assert_eq!(value["name"], "first");
1067
1068 let mut count = 0;
1070 for (_id, data) in batch.iter() {
1071 let value: serde_json::Value = serde_json::from_slice(data).unwrap();
1072 assert!(value["id"].is_number());
1073 assert!(value["name"].is_string());
1074 count += 1;
1075 }
1076 assert_eq!(count, 3);
1077
1078 std::fs::remove_file(test_file).unwrap();
1080 }
1081
1082 #[test]
1083 fn test_mmap_pool() {
1084 use std::io::Write;
1085
1086 let temp_dir = std::env::temp_dir();
1088 let test_file1 = temp_dir.join("pool_test1.json");
1089 let test_file2 = temp_dir.join("pool_test2.json");
1090
1091 let mut file1 = std::fs::File::create(&test_file1).unwrap();
1092 file1.write_all(b"test1").unwrap();
1093 file1.sync_all().unwrap();
1094
1095 let mut file2 = std::fs::File::create(&test_file2).unwrap();
1096 file2.write_all(b"test2").unwrap();
1097 file2.sync_all().unwrap();
1098
1099 let pool = mmap::MmapPool::new(10);
1101
1102 assert_eq!(pool.size(), 0);
1103
1104 let _data1 = pool.get_or_create(&test_file1).unwrap();
1105 assert_eq!(pool.size(), 1);
1106
1107 let _data2 = pool.get_or_create(&test_file2).unwrap();
1108 assert_eq!(pool.size(), 2);
1109
1110 let _data1_again = pool.get_or_create(&test_file1).unwrap();
1112 assert_eq!(pool.size(), 2); pool.clear();
1115 assert_eq!(pool.size(), 0);
1116
1117 std::fs::remove_file(test_file1).unwrap();
1119 std::fs::remove_file(test_file2).unwrap();
1120 }
1121
1122 #[cfg(feature = "mmap")]
1127 mod async_mmap_tests {
1128 use super::MessageId;
1129 use super::mmap::*;
1130 use std::io::Write;
1131 use std::path::Path;
1132
1133 #[tokio::test]
1134 async fn test_mmap_message_from_file_async_performance() {
1135 let temp_dir = std::env::temp_dir();
1138 let test_file = temp_dir.join("async_mmap_test.json");
1139
1140 {
1142 let mut file = std::fs::File::create(&test_file).unwrap();
1143 let test_data = r#"{"test": "async_data", "large_field": "Lorem ipsum dolor sit amet, consectetur adipiscing elit. Sed do eiusmod tempor incididunt ut labore et dolore magna aliqua."}"#;
1144 file.write_all(test_data.as_bytes()).unwrap();
1145 file.sync_all().unwrap();
1146 }
1147
1148 let handles = (0..3)
1150 .map(|i| {
1151 let test_file = test_file.clone();
1152 tokio::spawn(async move {
1153 let start_time = std::time::Instant::now();
1154
1155 let result = MmapMessage::from_file_async(
1157 MessageId::from(format!("async-test-{}", i)),
1158 &test_file,
1159 0,
1160 None,
1161 )
1162 .await;
1163
1164 let duration = start_time.elapsed();
1165
1166 assert!(
1168 duration.as_millis() < 100,
1169 "Async mmap took {}ms - should be <100ms",
1170 duration.as_millis()
1171 );
1172
1173 (i, result)
1174 })
1175 })
1176 .collect::<Vec<_>>();
1177
1178 let start_time = std::time::Instant::now();
1179 let results = futures::future::join_all(handles).await;
1180 let total_duration = start_time.elapsed();
1181
1182 assert!(
1184 total_duration.as_millis() < 200,
1185 "Concurrent async mmap operations took {}ms - should be <200ms",
1186 total_duration.as_millis()
1187 );
1188
1189 for result in results {
1191 let (i, mmap_result) = result.unwrap();
1192 let mmap_msg = mmap_result.unwrap();
1193 assert_eq!(*mmap_msg.id, MessageId::from(format!("async-test-{}", i)));
1194 assert!(!mmap_msg.data().is_empty());
1195 }
1196
1197 std::fs::remove_file(test_file).unwrap();
1199 }
1200
1201 #[tokio::test]
1202 async fn test_mmap_batch_from_jsonl_file_async_concurrency() {
1203 let temp_dir = std::env::temp_dir();
1204 let test_file = temp_dir.join("async_batch_test.jsonl");
1205
1206 {
1208 let mut file = std::fs::File::create(&test_file).unwrap();
1209 writeln!(file, r#"{{"id": "msg1", "data": "test1"}}"#).unwrap();
1210 writeln!(file, r#"{{"id": "msg2", "data": "test2"}}"#).unwrap();
1211 writeln!(file, r#"{{"id": "msg3", "data": "test3"}}"#).unwrap();
1212 file.sync_all().unwrap();
1213 }
1214
1215 let handles = (0..5)
1217 .map(|_| {
1218 let test_file = test_file.clone();
1219 tokio::spawn(async move {
1220 let start_time = std::time::Instant::now();
1221
1222 let result = MmapBatch::from_jsonl_file_async(&test_file).await;
1224
1225 let duration = start_time.elapsed();
1226 assert!(
1227 duration.as_millis() < 150,
1228 "Async batch processing took {}ms - should be <150ms",
1229 duration.as_millis()
1230 );
1231
1232 result
1233 })
1234 })
1235 .collect::<Vec<_>>();
1236
1237 let results = futures::future::join_all(handles).await;
1238
1239 for result in results {
1241 let batch = result.unwrap().unwrap();
1242 assert_eq!(batch.len(), 3);
1243 }
1244
1245 std::fs::remove_file(test_file).unwrap();
1246 }
1247
1248 #[tokio::test]
1249 async fn test_async_mmap_error_handling() {
1250 let non_existent = Path::new("/tmp/does_not_exist_async.json");
1252
1253 let result = MmapMessage::from_file_async(
1254 MessageId::String("error-test".to_string().into()),
1255 non_existent,
1256 0,
1257 None,
1258 )
1259 .await;
1260
1261 assert!(result.is_err());
1262
1263 let error_msg = format!("{}", result.unwrap_err());
1265 assert!(
1266 error_msg.contains("No such file") || error_msg.contains("not found"),
1267 "Error should be descriptive: {}",
1268 error_msg
1269 );
1270 }
1271
1272 #[tokio::test]
1273 async fn test_async_mmap_maintains_functionality() {
1274 let temp_dir = std::env::temp_dir();
1276 let test_file = temp_dir.join("functionality_test.json");
1277
1278 let test_data = r#"{"test": "functionality", "value": 42}"#;
1279 std::fs::write(&test_file, test_data).unwrap();
1280
1281 let sync_result = MmapMessage::from_file(
1283 MessageId::String("sync".to_string().into()),
1284 &test_file,
1285 0,
1286 None,
1287 )
1288 .unwrap();
1289
1290 let async_result = MmapMessage::from_file_async(
1291 MessageId::String("async".to_string().into()),
1292 &test_file,
1293 0,
1294 None,
1295 )
1296 .await
1297 .unwrap();
1298
1299 assert_eq!(sync_result.data(), async_result.data());
1301 assert_eq!(sync_result.data(), test_data.as_bytes());
1302
1303 std::fs::remove_file(test_file).unwrap();
1304 }
1305 }
1306}