1use bytes::{BufMut, Bytes, BytesMut};
7use serde::{Deserialize, Serialize};
8use serde_json::value::RawValue;
9use std::fmt;
10use std::path::Path;
11use std::sync::Arc;
12use uuid::Uuid;
13
14use crate::error::{Error, Result};
15use crate::types::{ContentType, Timestamp};
16
/// A message whose payload lives in a reference-counted [`Bytes`] buffer.
///
/// The identifier is held behind an [`Arc`], so copies of a message can share
/// it instead of duplicating it.
#[derive(Debug, Clone)]
pub struct ZeroCopyMessage {
    /// Shared identifier of the message.
    pub id: Arc<MessageId>,

    /// Raw payload bytes.
    pub payload: Bytes,

    /// Cached raw-JSON form of `payload`. The constructors in this file leave
    /// it as `None`; `parse_json_lazy` fills it in on first success.
    pub lazy_json: Option<Box<RawValue>>,

    /// Descriptive data recorded alongside the payload.
    pub metadata: MessageMetadata,
}
32
/// Identifier attached to a message; one of three supported representations.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum MessageId {
    /// Textual identifier stored as a shared string slice.
    String(Arc<str>),
    /// Signed 64-bit numeric identifier.
    Number(i64),
    /// UUID identifier.
    Uuid(Uuid),
}
43
/// Descriptive data carried next to a message payload.
#[derive(Debug, Clone)]
pub struct MessageMetadata {
    /// Time the metadata was created (the constructors in this file use
    /// `Timestamp::now()`).
    pub created_at: Timestamp,
    /// Declared content type of the payload.
    pub content_type: ContentType,
    /// Payload length in bytes, as recorded at construction time.
    pub size: usize,
    /// Optional identifier linking this message to another one.
    // NOTE(review): nothing in this file ever sets a correlation id — confirm
    // how callers are expected to populate it.
    pub correlation_id: Option<Arc<str>>,
}
56
57impl ZeroCopyMessage {
58 #[inline]
60 pub fn from_bytes(id: MessageId, payload: Bytes) -> Self {
61 let size = payload.len();
62 Self {
63 id: Arc::new(id),
64 payload: payload.clone(),
65 lazy_json: None,
66 metadata: MessageMetadata {
67 created_at: Timestamp::now(),
68 content_type: ContentType::Json,
69 size,
70 correlation_id: None,
71 },
72 }
73 }
74
75 pub fn from_json<T: Serialize>(id: MessageId, value: &T) -> Result<Self> {
77 let mut buffer = BytesMut::with_capacity(1024);
79
80 serde_json::to_writer((&mut buffer).writer(), value)
82 .map_err(|e| Error::serialization(e.to_string()))?;
83
84 let payload = buffer.freeze();
85 let size = payload.len();
86
87 Ok(Self {
88 id: Arc::new(id),
89 payload,
90 lazy_json: None,
91 metadata: MessageMetadata {
92 created_at: Timestamp::now(),
93 content_type: ContentType::Json,
94 size,
95 correlation_id: None,
96 },
97 })
98 }
99
100 #[inline]
102 pub fn parse_json_lazy(&mut self) -> Result<&RawValue> {
103 if self.lazy_json.is_none() {
104 let raw: Box<RawValue> = serde_json::from_slice(&self.payload)
106 .map_err(|e| Error::serialization(format!("JSON parse error: {}", e)))?;
107
108 self.lazy_json = Some(raw);
110 }
111
112 Ok(self.lazy_json.as_ref().unwrap())
113 }
114
115 #[inline]
117 pub fn deserialize<T: for<'de> Deserialize<'de>>(&self) -> Result<T> {
118 #[cfg(feature = "simd")]
119 {
120 if self.payload.len() >= 64 {
122 let mut buffer = self.payload.to_vec();
124 simd_json::from_slice(&mut buffer)
125 .map_err(|e| Error::serialization(format!("SIMD deserialize error: {}", e)))
126 } else {
127 serde_json::from_slice(&self.payload)
129 .map_err(|e| Error::serialization(format!("Deserialization error: {}", e)))
130 }
131 }
132 #[cfg(not(feature = "simd"))]
133 {
134 serde_json::from_slice(&self.payload)
135 .map_err(|e| Error::serialization(format!("Deserialization error: {}", e)))
136 }
137 }
138
139 #[inline]
141 pub fn payload_slice(&self) -> &[u8] {
142 &self.payload
143 }
144
145 #[inline]
147 pub fn cheap_clone(&self) -> Self {
148 Self {
149 id: Arc::clone(&self.id),
150 payload: self.payload.clone(), lazy_json: self.lazy_json.clone(),
152 metadata: self.metadata.clone(),
153 }
154 }
155}
156
157#[derive(Debug)]
159pub struct BufferPool {
160 buffers: crossbeam::queue::ArrayQueue<BytesMut>,
162 capacity: usize,
164}
165
166impl BufferPool {
167 pub fn new(size: usize, capacity: usize) -> Self {
169 let buffers = crossbeam::queue::ArrayQueue::new(size);
170
171 for _ in 0..size {
173 let _ = buffers.push(BytesMut::with_capacity(capacity));
174 }
175
176 Self { buffers, capacity }
177 }
178
179 #[inline]
181 pub fn acquire(&self) -> BytesMut {
182 self.buffers
183 .pop()
184 .unwrap_or_else(|| BytesMut::with_capacity(self.capacity))
185 }
186
187 #[inline]
189 pub fn release(&self, mut buffer: BytesMut) {
190 buffer.clear();
191 let _ = self.buffers.push(buffer);
192 }
193}
194
195#[derive(Debug)]
197pub struct MessageBatch {
198 pub buffer: Bytes,
200 pub messages: Vec<(usize, usize)>,
202 pub ids: Vec<Arc<MessageId>>,
204}
205
206impl MessageBatch {
207 pub fn new(capacity: usize) -> Self {
209 Self {
210 buffer: Bytes::new(),
211 messages: Vec::with_capacity(capacity),
212 ids: Vec::with_capacity(capacity),
213 }
214 }
215
216 pub fn add(&mut self, id: MessageId, payload: Bytes) {
218 let offset = self.buffer.len();
219 let length = payload.len();
220
221 let mut buffer = BytesMut::from(self.buffer.as_ref());
223 buffer.extend_from_slice(&payload);
224 self.buffer = buffer.freeze();
225
226 self.messages.push((offset, length));
228 self.ids.push(Arc::new(id));
229 }
230
231 #[inline]
233 pub fn get(&self, index: usize) -> Option<Bytes> {
234 self.messages
235 .get(index)
236 .map(|(offset, length)| self.buffer.slice(*offset..*offset + *length))
237 }
238
239 pub fn iter(&self) -> impl Iterator<Item = (&Arc<MessageId>, Bytes)> + '_ {
241 self.ids
242 .iter()
243 .zip(self.messages.iter())
244 .map(move |(id, (offset, length))| (id, self.buffer.slice(*offset..*offset + *length)))
245 }
246}
247
248pub mod fast {
250 #[inline]
252 pub fn validate_utf8_fast(bytes: &[u8]) -> bool {
253 #[cfg(feature = "simd")]
254 {
255 if bytes.len() >= 64 {
257 simdutf8::basic::from_utf8(bytes).is_ok()
258 } else {
259 std::str::from_utf8(bytes).is_ok()
260 }
261 }
262 #[cfg(not(feature = "simd"))]
263 {
264 std::str::from_utf8(bytes).is_ok()
265 }
266 }
267
268 #[inline]
270 pub fn find_json_boundaries(bytes: &[u8]) -> Vec<usize> {
271 let mut boundaries = Vec::new();
272 let mut depth = 0;
273 let mut in_string = false;
274 let mut escaped = false;
275
276 for (i, &byte) in bytes.iter().enumerate() {
278 if escaped {
279 escaped = false;
280 continue;
281 }
282
283 match byte {
284 b'\\' if in_string => escaped = true,
285 b'"' if !escaped => in_string = !in_string,
286 b'{' | b'[' if !in_string => depth += 1,
287 b'}' | b']' if !in_string => {
288 depth -= 1;
289 if depth == 0 {
290 boundaries.push(i + 1);
291 }
292 }
293 _ => {}
294 }
295 }
296
297 boundaries
298 }
299
300 #[cfg(feature = "simd")]
302 #[inline]
303 pub fn validate_json_fast(bytes: &[u8]) -> bool {
304 if bytes.len() >= 64 {
305 let mut owned = bytes.to_vec();
307 simd_json::to_borrowed_value(&mut owned).is_ok()
308 } else {
309 serde_json::from_slice::<serde_json::Value>(bytes).is_ok()
311 }
312 }
313
314 #[cfg(not(feature = "simd"))]
315 #[inline]
316 pub fn validate_json_fast(bytes: &[u8]) -> bool {
317 serde_json::from_slice::<serde_json::Value>(bytes).is_ok()
318 }
319}
320
/// File-backed messages served from read-only memory maps.
///
/// Compiled only when the `mmap` feature is enabled.
#[cfg(feature = "mmap")]
pub mod mmap {
    use super::*;
    use memmap2::{Mmap, MmapOptions};
    use std::fs::File;
    use std::io;
    use std::ops::Deref;

    /// Maps the whole of `file` read-only.
    fn map_read_only(file: &File) -> io::Result<Mmap> {
        // SAFETY: the mapping is read-only and is only ever exposed as `&[u8]`.
        // NOTE(review): memmap2 marks `map` unsafe because the file must not be
        // truncated or modified while it is mapped; nothing in this module can
        // enforce that, so it remains the caller's responsibility.
        unsafe { MmapOptions::new().map(file) }
    }

    /// Error returned when a requested offset does not fall inside the file.
    fn offset_out_of_range() -> io::Error {
        io::Error::new(io::ErrorKind::InvalidInput, "Offset exceeds file size")
    }

    /// A message whose bytes are a window into a memory-mapped file.
    #[derive(Debug)]
    pub struct MmapMessage {
        /// Shared identifier of the message.
        pub id: Arc<MessageId>,
        /// Mapping of the entire backing file.
        pub mmap: Arc<Mmap>,
        /// Start of the message inside the mapping.
        pub offset: usize,
        /// Length of the message in bytes.
        pub length: usize,
        /// Descriptive data; `size` equals `length` at construction.
        pub metadata: MessageMetadata,
    }

    impl MmapMessage {
        /// Maps `path` and exposes `length` bytes starting at `offset`.
        ///
        /// A `length` of `None`, or one that runs past the end of the file,
        /// selects everything from `offset` to the end.
        ///
        /// # Errors
        ///
        /// Returns an I/O error when the file cannot be opened, inspected or
        /// mapped, and `InvalidInput` when `offset` is not inside the file
        /// (which includes every offset into an empty file).
        pub fn from_file(
            id: MessageId,
            path: &Path,
            offset: usize,
            length: Option<usize>,
        ) -> io::Result<Self> {
            let file = File::open(path)?;

            // Reject clearly bad offsets before paying for a mapping. The
            // comparison is done in `u64` so a file larger than `usize::MAX`
            // is not truncated on 32-bit targets.
            if offset as u64 >= file.metadata()?.len() {
                return Err(offset_out_of_range());
            }

            let mmap = map_read_only(&file)?;

            // Bounds are taken from the mapping itself, because `data()` slices
            // the mapping and the size reported above may be stale by now.
            let mapped_len = mmap.len();
            if offset >= mapped_len {
                return Err(offset_out_of_range());
            }
            let available = mapped_len - offset;
            let actual_length = length.map_or(available, |requested| requested.min(available));

            Ok(Self {
                id: Arc::new(id),
                mmap: Arc::new(mmap),
                offset,
                length: actual_length,
                metadata: MessageMetadata {
                    created_at: Timestamp::now(),
                    content_type: ContentType::Json,
                    size: actual_length,
                    correlation_id: None,
                },
            })
        }

        /// Borrows the message bytes directly from the mapping.
        ///
        /// # Panics
        ///
        /// Panics if the public `offset`/`length` fields have been changed to
        /// a range outside the mapping.
        #[inline]
        pub fn data(&self) -> &[u8] {
            &self.mmap[self.offset..self.offset + self.length]
        }

        /// Copies the message bytes into an owned [`Bytes`] buffer.
        #[inline]
        pub fn to_bytes(&self) -> Bytes {
            Bytes::copy_from_slice(self.data())
        }

        /// Deserializes the message bytes as JSON into an owned `T`.
        ///
        /// # Errors
        ///
        /// Returns `Error::serialization` when the bytes are not valid JSON
        /// for `T`.
        pub fn parse_json<T>(&self) -> Result<T>
        where
            T: for<'de> Deserialize<'de>,
        {
            serde_json::from_slice(self.data())
                .map_err(|e| Error::serialization(format!("JSON parse error: {}", e)))
        }

        /// Views the message bytes as UTF-8 text.
        ///
        /// # Errors
        ///
        /// Returns `Error::serialization` when the bytes are not valid UTF-8.
        pub fn as_str(&self) -> Result<&str> {
            std::str::from_utf8(self.data())
                .map_err(|e| Error::serialization(format!("Invalid UTF-8: {}", e)))
        }
    }

    /// Cache of file mappings keyed by path.
    #[derive(Debug)]
    pub struct MmapPool {
        /// Cached mappings.
        maps: dashmap::DashMap<std::path::PathBuf, Arc<Mmap>>,
        /// Entry count above which new mappings are no longer cached.
        max_size: usize,
    }

    impl MmapPool {
        /// Creates an empty pool that caches up to `max_size` mappings.
        pub fn new(max_size: usize) -> Self {
            Self {
                maps: dashmap::DashMap::new(),
                max_size,
            }
        }

        /// Returns the cached mapping for `path`, creating one on a miss.
        ///
        /// A freshly created mapping is cached only while the pool holds
        /// fewer than `max_size` entries; otherwise it is returned uncached.
        /// The lookup and the insert are separate steps, so concurrent callers
        /// may each map the same file.
        ///
        /// # Errors
        ///
        /// Returns an I/O error when the file cannot be opened or mapped.
        pub fn get_or_create(&self, path: &Path) -> io::Result<Arc<Mmap>> {
            if let Some(cached) = self.maps.get(path) {
                return Ok(Arc::clone(&*cached));
            }

            let file = File::open(path)?;
            let mmap = Arc::new(map_read_only(&file)?);

            if self.maps.len() < self.max_size {
                self.maps.insert(path.to_path_buf(), Arc::clone(&mmap));
            }

            Ok(mmap)
        }

        /// Removes every cached mapping from the pool.
        pub fn clear(&self) {
            self.maps.clear();
        }

        /// Number of mappings currently cached.
        pub fn size(&self) -> usize {
            self.maps.len()
        }
    }

    /// The non-empty lines of a memory-mapped file, each treated as a message.
    #[derive(Debug)]
    pub struct MmapBatch {
        /// Mapping of the entire backing file.
        mmap: Arc<Mmap>,
        /// `(offset, length)` of each non-empty line inside the mapping.
        messages: Vec<(usize, usize)>,
        /// Identifier of each line, parallel to `messages`.
        ids: Vec<Arc<MessageId>>,
    }

    impl MmapBatch {
        /// Maps `path` and indexes its `\n`-separated lines.
        ///
        /// Empty lines are skipped. Each remaining line gets a
        /// `MessageId::Number` equal to its zero-based line index in the file,
        /// counting skipped lines too. Line contents are not parsed.
        ///
        /// # Errors
        ///
        /// Returns an I/O error when the file cannot be opened or mapped.
        pub fn from_jsonl_file(path: &Path) -> io::Result<Self> {
            let file = File::open(path)?;
            let mmap = map_read_only(&file)?;

            let mut messages = Vec::new();
            let mut ids = Vec::new();
            let mut offset = 0;

            for (idx, line) in mmap.split(|&b| b == b'\n').enumerate() {
                if !line.is_empty() {
                    messages.push((offset, line.len()));
                    ids.push(Arc::new(MessageId::Number(idx as i64)));
                }
                // Step over the line and the separator that followed it.
                offset += line.len() + 1;
            }

            Ok(Self {
                mmap: Arc::new(mmap),
                messages,
                ids,
            })
        }

        /// Borrows the line at `index`, or `None` when `index` is out of range.
        #[inline]
        pub fn get(&self, index: usize) -> Option<&[u8]> {
            self.messages
                .get(index)
                .map(|&(offset, length)| &self.mmap[offset..offset + length])
        }

        /// Iterates over `(id, line)` pairs in file order.
        pub fn iter(&self) -> impl Iterator<Item = (&Arc<MessageId>, &[u8])> + '_ {
            self.ids
                .iter()
                .zip(self.messages.iter())
                .map(move |(id, &(offset, length))| {
                    (id, &self.mmap.deref()[offset..offset + length])
                })
        }

        /// Number of indexed lines.
        pub fn len(&self) -> usize {
            self.messages.len()
        }

        /// Returns `true` when the file contained no non-empty line.
        pub fn is_empty(&self) -> bool {
            self.messages.is_empty()
        }
    }
}
537
538#[cfg(not(feature = "mmap"))]
540pub mod mmap {
541 use super::*;
542 use std::fs;
543 use std::io;
544
545 #[derive(Debug)]
547 pub struct MmapMessage {
548 pub id: Arc<MessageId>,
550 pub data: Bytes,
552 pub metadata: MessageMetadata,
554 }
555
556 impl MmapMessage {
557 pub fn from_file(
559 id: MessageId,
560 path: &Path,
561 offset: usize,
562 length: Option<usize>,
563 ) -> io::Result<Self> {
564 let data = fs::read(path)?;
565 let file_size = data.len();
566
567 if offset >= file_size {
568 return Err(io::Error::new(
569 io::ErrorKind::InvalidInput,
570 "Offset exceeds file size",
571 ));
572 }
573
574 let actual_length = length.unwrap_or(file_size - offset);
575 let actual_length = actual_length.min(file_size - offset);
576
577 let data = Bytes::copy_from_slice(&data[offset..offset + actual_length]);
578
579 Ok(Self {
580 id: Arc::new(id),
581 data: data.clone(),
582 metadata: MessageMetadata {
583 created_at: Timestamp::now(),
584 content_type: ContentType::Json,
585 size: actual_length,
586 correlation_id: None,
587 },
588 })
589 }
590
591 #[inline]
593 pub fn data(&self) -> &[u8] {
594 &self.data
595 }
596
597 #[inline]
599 pub fn to_bytes(&self) -> Bytes {
600 self.data.clone()
601 }
602
603 pub fn parse_json<T>(&self) -> Result<T>
605 where
606 T: for<'de> Deserialize<'de>,
607 {
608 serde_json::from_slice(&self.data)
609 .map_err(|e| Error::serialization(format!("JSON parse error: {}", e)))
610 }
611
612 pub fn as_str(&self) -> Result<&str> {
614 std::str::from_utf8(&self.data)
615 .map_err(|e| Error::serialization(format!("Invalid UTF-8: {}", e)))
616 }
617 }
618
619 #[derive(Debug)]
621 pub struct MmapPool {
622 cache: dashmap::DashMap<std::path::PathBuf, Bytes>,
623 max_size: usize,
624 }
625
626 impl MmapPool {
627 pub fn new(max_size: usize) -> Self {
629 Self {
630 cache: dashmap::DashMap::new(),
631 max_size,
632 }
633 }
634
635 pub fn get_or_create(&self, path: &Path) -> io::Result<Bytes> {
637 if let Some(data) = self.cache.get(path) {
638 return Ok(data.clone());
639 }
640
641 let data = fs::read(path)?;
642 let bytes = Bytes::from(data);
643
644 if self.cache.len() < self.max_size {
645 self.cache.insert(path.to_path_buf(), bytes.clone());
646 }
647
648 Ok(bytes)
649 }
650
651 pub fn clear(&self) {
653 self.cache.clear();
654 }
655
656 pub fn size(&self) -> usize {
658 self.cache.len()
659 }
660 }
661
662 #[derive(Debug)]
664 pub struct MmapBatch {
665 data: Bytes,
666 messages: Vec<(usize, usize)>,
667 ids: Vec<Arc<MessageId>>,
668 }
669
670 impl MmapBatch {
671 pub fn from_jsonl_file(path: &Path) -> io::Result<Self> {
673 let data = fs::read(path)?;
674 let mut messages = Vec::new();
675 let mut ids = Vec::new();
676 let mut offset = 0;
677
678 for (idx, line) in data.split(|&b| b == b'\n').enumerate() {
679 if !line.is_empty() {
680 messages.push((offset, line.len()));
681 ids.push(Arc::new(MessageId::Number(idx as i64)));
682 }
683 offset += line.len() + 1;
684 }
685
686 Ok(Self {
687 data: Bytes::from(data),
688 messages,
689 ids,
690 })
691 }
692
693 #[inline]
695 pub fn get(&self, index: usize) -> Option<&[u8]> {
696 self.messages
697 .get(index)
698 .map(|(offset, length)| &self.data[*offset..*offset + *length])
699 }
700
701 pub fn iter(&self) -> impl Iterator<Item = (&Arc<MessageId>, &[u8])> + '_ {
703 self.ids
704 .iter()
705 .zip(self.messages.iter())
706 .map(move |(id, (offset, length))| (id, &self.data[*offset..*offset + *length]))
707 }
708
709 pub fn len(&self) -> usize {
711 self.messages.len()
712 }
713
714 pub fn is_empty(&self) -> bool {
716 self.messages.is_empty()
717 }
718 }
719}
720
721impl From<String> for MessageId {
722 fn from(s: String) -> Self {
723 Self::String(Arc::from(s))
724 }
725}
726
727impl From<&str> for MessageId {
728 fn from(s: &str) -> Self {
729 Self::String(Arc::from(s))
730 }
731}
732
733impl From<i64> for MessageId {
734 fn from(n: i64) -> Self {
735 Self::Number(n)
736 }
737}
738
739impl From<Uuid> for MessageId {
740 fn from(u: Uuid) -> Self {
741 Self::Uuid(u)
742 }
743}
744
745impl fmt::Display for MessageId {
746 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
747 match self {
748 Self::String(s) => write!(f, "{}", s),
749 Self::Number(n) => write!(f, "{}", n),
750 Self::Uuid(u) => write!(f, "{}", u),
751 }
752 }
753}
754
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a per-process path in the system temp directory.
    ///
    /// Including the process id keeps concurrent test runs, and leftovers from
    /// an earlier aborted run, from colliding on a shared file name.
    fn scratch_file(name: &str) -> std::path::PathBuf {
        std::env::temp_dir().join(format!("zero_copy_{}_{}", std::process::id(), name))
    }

    #[test]
    fn test_zero_copy_message_creation() {
        let payload = Bytes::from(r#"{"test": "data"}"#);
        let msg = ZeroCopyMessage::from_bytes(MessageId::from("test-1"), payload.clone());

        assert_eq!(msg.payload, payload);
        assert_eq!(msg.metadata.size, payload.len());
    }

    #[test]
    fn test_lazy_json_parsing() {
        let payload = Bytes::from(r#"{"key": "value", "number": 42}"#);
        let mut msg = ZeroCopyMessage::from_bytes(MessageId::from("test-2"), payload);

        let raw = msg.parse_json_lazy().unwrap();
        assert!(raw.get().contains("value"));

        // The parsed form must have been cached on the message.
        assert!(msg.lazy_json.is_some());
    }

    #[test]
    fn test_buffer_pool() {
        let pool = BufferPool::new(2, 1024);

        let buf1 = pool.acquire();
        let buf2 = pool.acquire();
        // The pool is empty by now, so this one is freshly allocated.
        let buf3 = pool.acquire();
        assert_eq!(buf1.capacity(), 1024);
        assert_eq!(buf2.capacity(), 1024);
        assert_eq!(buf3.capacity(), 1024);

        pool.release(buf1);
        let buf4 = pool.acquire();
        assert_eq!(buf4.capacity(), 1024);
    }

    #[test]
    fn test_message_batch() {
        let mut batch = MessageBatch::new(10);

        batch.add(MessageId::from("msg1"), Bytes::from("data1"));
        batch.add(MessageId::from("msg2"), Bytes::from("data2"));
        batch.add(MessageId::from("msg3"), Bytes::from("data3"));

        assert_eq!(batch.messages.len(), 3);

        let msg1 = batch.get(0).unwrap();
        assert_eq!(msg1, Bytes::from("data1"));

        let msg2 = batch.get(1).unwrap();
        assert_eq!(msg2, Bytes::from("data2"));

        assert!(batch.get(3).is_none());

        let mut count = 0;
        for (_id, payload) in batch.iter() {
            count += 1;
            assert!(!payload.is_empty());
        }
        assert_eq!(count, 3);
    }

    #[test]
    fn test_cheap_clone() {
        let msg = ZeroCopyMessage::from_bytes(MessageId::from("test"), Bytes::from("data"));

        let cloned = msg.cheap_clone();

        // The id must be shared, not duplicated.
        assert!(Arc::ptr_eq(&msg.id, &cloned.id));
        assert_eq!(msg.payload, cloned.payload);
    }

    #[test]
    fn test_mmap_message() {
        let test_file = scratch_file("test_mmap.json");
        let test_data = r#"{"test": "data", "value": 42}"#;
        std::fs::write(&test_file, test_data).unwrap();

        let msg = mmap::MmapMessage::from_file(MessageId::from("mmap-test"), &test_file, 0, None)
            .unwrap();

        assert_eq!(msg.data(), test_data.as_bytes());
        assert_eq!(msg.as_str().unwrap(), test_data);

        let value: serde_json::Value = msg.parse_json().unwrap();
        assert_eq!(value["test"], "data");
        assert_eq!(value["value"], 42);

        // Release the message (and any mapping it holds) before deleting the file.
        drop(msg);
        std::fs::remove_file(test_file).unwrap();
    }

    #[test]
    fn test_mmap_batch() {
        let test_file = scratch_file("test_batch.jsonl");
        let lines = concat!(
            r#"{"id": 1, "name": "first"}"#,
            "\n",
            r#"{"id": 2, "name": "second"}"#,
            "\n",
            r#"{"id": 3, "name": "third"}"#,
            "\n",
        );
        std::fs::write(&test_file, lines).unwrap();

        let batch = mmap::MmapBatch::from_jsonl_file(&test_file).unwrap();

        assert_eq!(batch.len(), 3);
        assert!(!batch.is_empty());

        let msg1 = batch.get(0).unwrap();
        let value: serde_json::Value = serde_json::from_slice(msg1).unwrap();
        assert_eq!(value["id"], 1);
        assert_eq!(value["name"], "first");

        let mut count = 0;
        for (_id, data) in batch.iter() {
            let value: serde_json::Value = serde_json::from_slice(data).unwrap();
            assert!(value["id"].is_number());
            assert!(value["name"].is_string());
            count += 1;
        }
        assert_eq!(count, 3);

        // Release the batch (and any mapping it holds) before deleting the file.
        drop(batch);
        std::fs::remove_file(test_file).unwrap();
    }

    #[test]
    fn test_mmap_pool() {
        let test_file1 = scratch_file("pool_test1.json");
        let test_file2 = scratch_file("pool_test2.json");
        std::fs::write(&test_file1, b"test1").unwrap();
        std::fs::write(&test_file2, b"test2").unwrap();

        let pool = mmap::MmapPool::new(10);

        assert_eq!(pool.size(), 0);

        let data1 = pool.get_or_create(&test_file1).unwrap();
        assert_eq!(pool.size(), 1);

        let data2 = pool.get_or_create(&test_file2).unwrap();
        assert_eq!(pool.size(), 2);

        // A repeated request is served from the cache and adds no entry.
        let data1_again = pool.get_or_create(&test_file1).unwrap();
        assert_eq!(pool.size(), 2);

        pool.clear();
        assert_eq!(pool.size(), 0);

        // Release every handle before deleting the backing files.
        drop((data1, data2, data1_again));
        std::fs::remove_file(test_file1).unwrap();
        std::fs::remove_file(test_file2).unwrap();
    }
}