1use flate2::Compression;
44use flate2::write::GzEncoder;
45use std::collections::HashMap;
46use std::fmt;
47use std::sync::Arc;
48
49use bytes::{Bytes, BytesMut};
50use serde::{Deserialize, Serialize};
51use uuid::Uuid;
52
53#[cfg(feature = "messagepack")]
54use msgpacker::Packable;
55
56use crate::error::{Error, Result};
57use crate::types::{ContentType, ProtocolVersion, Timestamp};
58
59#[cfg(feature = "messagepack")]
61#[derive(Debug, Clone)]
62pub enum JsonValue {
63 Null,
65 Bool(bool),
67 Number(f64),
69 String(String),
71 Array(Vec<JsonValue>),
73 Object(std::collections::HashMap<String, JsonValue>),
75}
76
77#[cfg(feature = "messagepack")]
78impl JsonValue {
79 pub fn from_serde_json(value: &serde_json::Value) -> Self {
81 match value {
82 serde_json::Value::Null => JsonValue::Null,
83 serde_json::Value::Bool(b) => JsonValue::Bool(*b),
84 serde_json::Value::Number(n) => {
85 if let Some(i) = n.as_i64() {
86 JsonValue::Number(i as f64)
87 } else if let Some(u) = n.as_u64() {
88 JsonValue::Number(u as f64)
89 } else if let Some(f) = n.as_f64() {
90 JsonValue::Number(f)
91 } else {
92 JsonValue::Null
93 }
94 }
95 serde_json::Value::String(s) => JsonValue::String(s.clone()),
96 serde_json::Value::Array(arr) => {
97 JsonValue::Array(arr.iter().map(Self::from_serde_json).collect())
98 }
99 serde_json::Value::Object(obj) => {
100 let mut map = std::collections::HashMap::new();
101 for (k, v) in obj {
102 map.insert(k.clone(), Self::from_serde_json(v));
103 }
104 JsonValue::Object(map)
105 }
106 }
107 }
108}
109
110#[cfg(feature = "messagepack")]
111impl msgpacker::Packable for JsonValue {
112 fn pack<T>(&self, buf: &mut T) -> usize
113 where
114 T: Extend<u8>,
115 {
116 match self {
117 JsonValue::Null => {
118 buf.extend([0xc0]);
120 1
121 }
122 JsonValue::Bool(b) => b.pack(buf),
123 JsonValue::Number(n) => n.pack(buf),
124 JsonValue::String(s) => s.pack(buf),
125 JsonValue::Array(arr) => {
126 let len = arr.len();
128 let mut bytes_written = 0;
129
130 if len <= 15 {
132 buf.extend([0x90 + len as u8]);
133 bytes_written += 1;
134 } else if len <= u16::MAX as usize {
135 buf.extend([0xdc]);
136 buf.extend((len as u16).to_be_bytes());
137 bytes_written += 3;
138 } else {
139 buf.extend([0xdd]);
140 buf.extend((len as u32).to_be_bytes());
141 bytes_written += 5;
142 }
143
144 for item in arr {
146 bytes_written += item.pack(buf);
147 }
148
149 bytes_written
150 }
151 JsonValue::Object(obj) => {
152 let len = obj.len();
154 let mut bytes_written = 0;
155
156 if len <= 15 {
158 buf.extend([0x80 + len as u8]);
159 bytes_written += 1;
160 } else if len <= u16::MAX as usize {
161 buf.extend([0xde]);
162 buf.extend((len as u16).to_be_bytes());
163 bytes_written += 3;
164 } else {
165 buf.extend([0xdf]);
166 buf.extend((len as u32).to_be_bytes());
167 bytes_written += 5;
168 }
169
170 for (k, v) in obj {
172 bytes_written += k.pack(buf);
173 bytes_written += v.pack(buf);
174 }
175
176 bytes_written
177 }
178 }
179 }
180}
181
182#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
184#[serde(untagged)]
185pub enum MessageId {
186 String(String),
188 Number(i64),
190 Uuid(Uuid),
192}
193
194#[derive(Debug, Clone, Serialize, Deserialize)]
196pub struct MessageMetadata {
197 pub created_at: Timestamp,
199
200 pub protocol_version: ProtocolVersion,
202
203 pub encoding: Option<String>,
205
206 pub content_type: ContentType,
208
209 pub size: usize,
211
212 pub correlation_id: Option<String>,
214
215 pub headers: HashMap<String, String>,
217}
218
219#[derive(Debug, Clone)]
221pub struct Message {
222 pub id: MessageId,
224
225 pub metadata: MessageMetadata,
227
228 pub payload: MessagePayload,
230}
231
232#[derive(Debug, Clone)]
234pub enum MessagePayload {
235 Json(JsonPayload),
237
238 Binary(BinaryPayload),
240
241 Text(String),
243
244 Empty,
246}
247
248#[derive(Debug, Clone)]
250pub struct JsonPayload {
251 pub raw: Bytes,
253
254 pub parsed: Option<Arc<serde_json::Value>>,
256
257 pub is_valid: bool,
259}
260
261#[derive(Debug, Clone)]
263pub struct BinaryPayload {
264 pub data: Bytes,
266
267 pub format: BinaryFormat,
269}
270
271#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
273#[serde(rename_all = "lowercase")]
274pub enum BinaryFormat {
275 MessagePack,
277
278 ProtoBuf,
280
281 Cbor,
283
284 Custom,
286}
287
288#[derive(Debug)]
290pub struct MessageSerializer {
291 default_format: SerializationFormat,
293
294 enable_compression: bool,
296
297 compression_threshold: usize,
299}
300
301#[derive(Debug, Clone, Copy, PartialEq, Eq)]
303pub enum SerializationFormat {
304 Json,
306
307 #[cfg(feature = "simd")]
309 SimdJson,
310
311 MessagePack,
313
314 Cbor,
316}
317
318impl Message {
319 pub fn json(id: MessageId, value: impl Serialize) -> Result<Self> {
325 let json_bytes = Self::serialize_json(&value)?;
326 let payload = MessagePayload::Json(JsonPayload {
327 raw: json_bytes.freeze(),
328 parsed: Some(Arc::new(serde_json::to_value(value)?)),
329 is_valid: true,
330 });
331
332 Ok(Self {
333 id,
334 metadata: MessageMetadata::new(ContentType::Json, payload.size()),
335 payload,
336 })
337 }
338
339 pub fn binary(id: MessageId, data: Bytes, format: BinaryFormat) -> Self {
341 let size = data.len();
342 let payload = MessagePayload::Binary(BinaryPayload { data, format });
343
344 Self {
345 id,
346 metadata: MessageMetadata::new(ContentType::Binary, size),
347 payload,
348 }
349 }
350
351 #[must_use]
353 pub fn text(id: MessageId, text: String) -> Self {
354 let size = text.len();
355 let payload = MessagePayload::Text(text);
356
357 Self {
358 id,
359 metadata: MessageMetadata::new(ContentType::Text, size),
360 payload,
361 }
362 }
363
364 #[must_use]
366 pub fn empty(id: MessageId) -> Self {
367 Self {
368 id,
369 metadata: MessageMetadata::new(ContentType::Json, 0),
370 payload: MessagePayload::Empty,
371 }
372 }
373
374 pub const fn size(&self) -> usize {
376 self.metadata.size
377 }
378
379 pub const fn is_empty(&self) -> bool {
381 matches!(self.payload, MessagePayload::Empty)
382 }
383
384 pub fn serialize(&self, format: SerializationFormat) -> Result<Bytes> {
390 match format {
391 SerializationFormat::Json => self.serialize_json_format(),
392 #[cfg(feature = "simd")]
393 SerializationFormat::SimdJson => self.serialize_simd_json(),
394 SerializationFormat::MessagePack => self.serialize_messagepack(),
395 SerializationFormat::Cbor => self.serialize_cbor(),
396 }
397 }
398
399 pub fn deserialize(bytes: Bytes) -> Result<Self> {
405 let format = Self::detect_format(&bytes);
407 Self::deserialize_with_format(bytes, format)
408 }
409
410 pub fn deserialize_with_format(bytes: Bytes, format: SerializationFormat) -> Result<Self> {
412 match format {
413 SerializationFormat::Json => Ok(Self::deserialize_json(bytes)),
414 #[cfg(feature = "simd")]
415 SerializationFormat::SimdJson => Ok(Self::deserialize_simd_json(bytes)),
416 SerializationFormat::MessagePack => Ok(Self::deserialize_messagepack(bytes)),
417 SerializationFormat::Cbor => Self::deserialize_cbor(bytes),
418 }
419 }
420
421 pub fn parse_json<T>(&self) -> Result<T>
423 where
424 T: for<'de> Deserialize<'de>,
425 {
426 match &self.payload {
427 MessagePayload::Json(json_payload) => json_payload.parsed.as_ref().map_or_else(
428 || {
429 #[cfg(feature = "simd")]
430 {
431 let mut json_bytes = json_payload.raw.to_vec();
432 simd_json::from_slice(&mut json_bytes).map_err(|e| {
433 Error::serialization(format!("SIMD JSON parsing failed: {e}"))
434 })
435 }
436 #[cfg(not(feature = "simd"))]
437 {
438 serde_json::from_slice(&json_payload.raw).map_err(|e| {
439 Error::serialization(format!("JSON parsing failed: {}", e))
440 })
441 }
442 },
443 |parsed| {
444 serde_json::from_value((**parsed).clone())
445 .map_err(|e| Error::serialization(format!("JSON parsing failed: {e}")))
446 },
447 ),
448 _ => Err(Error::validation("Message payload is not JSON")),
449 }
450 }
451
452 fn serialize_json(value: &impl Serialize) -> Result<BytesMut> {
455 #[cfg(feature = "simd")]
456 {
457 sonic_rs::to_vec(value)
458 .map(|v| BytesMut::from(v.as_slice()))
459 .map_err(|e| Error::serialization(format!("SIMD JSON serialization failed: {e}")))
460 }
461 #[cfg(not(feature = "simd"))]
462 {
463 serde_json::to_vec(value)
464 .map(|v| BytesMut::from(v.as_slice()))
465 .map_err(|e| Error::serialization(format!("JSON serialization failed: {}", e)))
466 }
467 }
468
469 fn serialize_json_format(&self) -> Result<Bytes> {
470 match &self.payload {
471 MessagePayload::Json(json_payload) => Ok(json_payload.raw.clone()),
472 MessagePayload::Text(text) => Ok(Bytes::from(text.clone())),
473 MessagePayload::Empty => Ok(Bytes::from_static(b"{}")),
474 MessagePayload::Binary(_) => Err(Error::validation(
475 "Cannot serialize non-JSON payload as JSON",
476 )),
477 }
478 }
479
480 #[cfg(feature = "simd")]
481 fn serialize_simd_json(&self) -> Result<Bytes> {
482 match &self.payload {
483 MessagePayload::Json(json_payload) => {
484 if json_payload.is_valid {
485 Ok(json_payload.raw.clone())
486 } else {
487 Err(Error::serialization("Invalid JSON payload"))
488 }
489 }
490 _ => Err(Error::validation(
491 "Cannot serialize non-JSON payload with SIMD JSON",
492 )),
493 }
494 }
495
496 fn serialize_messagepack(&self) -> Result<Bytes> {
497 #[cfg(feature = "messagepack")]
498 {
499 match &self.payload {
500 MessagePayload::Binary(binary) if binary.format == BinaryFormat::MessagePack => {
501 Ok(binary.data.clone())
502 }
503 MessagePayload::Json(json_payload) => json_payload.parsed.as_ref().map_or_else(
504 || {
505 Err(Error::serialization(
506 "Cannot serialize unparsed JSON to MessagePack",
507 ))
508 },
509 |parsed| {
510 let packable_value = JsonValue::from_serde_json(parsed.as_ref());
512 let mut buffer = Vec::new();
513 packable_value.pack(&mut buffer);
514 Ok(Bytes::from(buffer))
515 },
516 ),
517 _ => Err(Error::validation("Cannot serialize payload as MessagePack")),
518 }
519 }
520 #[cfg(not(feature = "messagepack"))]
521 {
522 let _ = self; Err(Error::validation("MessagePack serialization not available"))
524 }
525 }
526
527 fn serialize_cbor(&self) -> Result<Bytes> {
528 match &self.payload {
529 MessagePayload::Binary(binary) if binary.format == BinaryFormat::Cbor => {
530 Ok(binary.data.clone())
531 }
532 MessagePayload::Json(json_payload) => {
533 if let Some(parsed) = &json_payload.parsed {
534 {
535 let mut buffer = Vec::new();
536 ciborium::into_writer(parsed.as_ref(), &mut buffer)
537 .map(|_| Bytes::from(buffer))
538 .map_err(|e| {
539 Error::serialization(format!("CBOR serialization failed: {e}"))
540 })
541 }
542 } else {
543 #[cfg(feature = "simd")]
545 {
546 let mut json_bytes = json_payload.raw.to_vec();
547 let value: serde_json::Value = simd_json::from_slice(&mut json_bytes)
548 .map_err(|e| {
549 Error::serialization(format!(
550 "SIMD JSON parsing failed before CBOR: {e}"
551 ))
552 })?;
553 {
554 let mut buffer = Vec::new();
555 ciborium::into_writer(&value, &mut buffer)
556 .map(|_| Bytes::from(buffer))
557 .map_err(|e| {
558 Error::serialization(format!("CBOR serialization failed: {e}"))
559 })
560 }
561 }
562 #[cfg(not(feature = "simd"))]
563 {
564 let value: serde_json::Value = serde_json::from_slice(&json_payload.raw)
565 .map_err(|e| {
566 Error::serialization(format!(
567 "JSON parsing failed before CBOR: {}",
568 e
569 ))
570 })?;
571 let mut buf = Vec::new();
572 ciborium::ser::into_writer(&value, &mut buf).map_err(|e| {
573 Error::serialization(format!("CBOR serialization failed: {}", e))
574 })?;
575 Ok(Bytes::from(buf))
576 }
577 }
578 }
579 _ => Err(Error::validation("Cannot serialize payload as CBOR")),
580 }
581 }
582
583 fn deserialize_json(bytes: Bytes) -> Self {
584 let is_valid = serde_json::from_slice::<serde_json::Value>(&bytes).is_ok();
586
587 let payload = MessagePayload::Json(JsonPayload {
588 raw: bytes,
589 parsed: None, is_valid,
591 });
592
593 Self {
594 id: MessageId::Uuid(Uuid::new_v4()),
595 metadata: MessageMetadata::new(ContentType::Json, payload.size()),
596 payload,
597 }
598 }
599
600 #[cfg(feature = "simd")]
601 fn deserialize_simd_json(bytes: Bytes) -> Self {
602 let mut json_bytes = bytes.to_vec();
603 let is_valid = simd_json::from_slice::<serde_json::Value>(&mut json_bytes).is_ok();
604
605 let payload = MessagePayload::Json(JsonPayload {
606 raw: bytes,
607 parsed: None,
608 is_valid,
609 });
610
611 Self {
612 id: MessageId::Uuid(Uuid::new_v4()),
613 metadata: MessageMetadata::new(ContentType::Json, payload.size()),
614 payload,
615 }
616 }
617
618 fn deserialize_messagepack(bytes: Bytes) -> Self {
619 let payload = MessagePayload::Binary(BinaryPayload {
620 data: bytes,
621 format: BinaryFormat::MessagePack,
622 });
623
624 Self {
625 id: MessageId::Uuid(Uuid::new_v4()),
626 metadata: MessageMetadata::new(ContentType::Binary, payload.size()),
627 payload,
628 }
629 }
630
631 fn deserialize_cbor(bytes: Bytes) -> Result<Self> {
632 if let Ok(value) = ciborium::from_reader::<serde_json::Value, _>(&bytes[..]) {
634 let raw = serde_json::to_vec(&value)
635 .map(Bytes::from)
636 .map_err(|e| Error::serialization(format!("JSON re-encode failed: {e}")))?;
637 let payload = MessagePayload::Json(JsonPayload {
638 raw,
639 parsed: Some(Arc::new(value)),
640 is_valid: true,
641 });
642 return Ok(Self {
643 id: MessageId::Uuid(Uuid::new_v4()),
644 metadata: MessageMetadata::new(ContentType::Json, payload.size()),
645 payload,
646 });
647 }
648
649 let payload = MessagePayload::Binary(BinaryPayload {
651 data: bytes,
652 format: BinaryFormat::Cbor,
653 });
654 Ok(Self {
655 id: MessageId::Uuid(Uuid::new_v4()),
656 metadata: MessageMetadata::new(ContentType::Binary, payload.size()),
657 payload,
658 })
659 }
660
661 fn detect_format(bytes: &[u8]) -> SerializationFormat {
662 if bytes.is_empty() {
663 return SerializationFormat::Json;
664 }
665
666 if matches!(bytes[0], b'{' | b'[') {
668 #[cfg(feature = "simd")]
669 {
670 return SerializationFormat::SimdJson;
671 }
672 #[cfg(not(feature = "simd"))]
673 {
674 return SerializationFormat::Json;
675 }
676 }
677
678 if bytes.len() >= 2 && (bytes[0] == 0x82 || bytes[0] == 0x83) {
680 return SerializationFormat::MessagePack;
681 }
682
683 #[cfg(feature = "simd")]
685 {
686 SerializationFormat::SimdJson
687 }
688 #[cfg(not(feature = "simd"))]
689 {
690 SerializationFormat::Json
691 }
692 }
693}
694
695impl MessagePayload {
696 pub const fn size(&self) -> usize {
698 match self {
699 Self::Json(json) => json.raw.len(),
700 Self::Binary(binary) => binary.data.len(),
701 Self::Text(text) => text.len(),
702 Self::Empty => 0,
703 }
704 }
705}
706
707impl MessageMetadata {
708 #[must_use]
710 pub fn new(content_type: ContentType, size: usize) -> Self {
711 Self {
712 created_at: Timestamp::now(),
713 protocol_version: crate::PROTOCOL_VERSION.to_string(),
714 encoding: None,
715 content_type,
716 size,
717 correlation_id: None,
718 headers: HashMap::new(),
719 }
720 }
721
722 #[must_use]
724 pub fn with_header(mut self, key: String, value: String) -> Self {
725 self.headers.insert(key, value);
726 self
727 }
728
729 #[must_use]
731 pub fn with_correlation_id(mut self, correlation_id: String) -> Self {
732 self.correlation_id = Some(correlation_id);
733 self
734 }
735
736 #[must_use]
738 pub fn with_encoding(mut self, encoding: String) -> Self {
739 self.encoding = Some(encoding);
740 self
741 }
742}
743
744impl MessageSerializer {
745 #[must_use]
747 pub const fn new() -> Self {
748 Self {
749 default_format: SerializationFormat::Json,
750 enable_compression: false,
751 compression_threshold: 1024, }
753 }
754
755 #[must_use]
757 pub const fn with_format(mut self, format: SerializationFormat) -> Self {
758 self.default_format = format;
759 self
760 }
761
762 #[must_use]
764 pub const fn with_compression(mut self, enable: bool, threshold: usize) -> Self {
765 self.enable_compression = enable;
766 self.compression_threshold = threshold;
767 self
768 }
769
770 pub fn serialize(&self, message: &mut Message) -> Result<Bytes> {
772 let serialized = message.serialize(self.default_format)?;
773
774 if self.enable_compression && serialized.len() > self.compression_threshold {
776 message.metadata.encoding = Some("gzip".to_string()); Ok(self.compress(serialized))
778 } else {
779 Ok(serialized)
780 }
781 }
782
783 fn compress(&self, data: Bytes) -> Bytes {
786 let mut encoder = GzEncoder::new(Vec::new(), Compression::default());
787 if let Err(e) = std::io::Write::write_all(&mut encoder, &data) {
788 eprintln!("Failed to compress data: {}", e);
789 return data; }
791 match encoder.finish() {
792 Ok(compressed_data) => Bytes::from(compressed_data),
793 Err(e) => {
794 eprintln!("Failed to finish compression: {}", e);
795 data }
797 }
798 }
799}
800
801impl Default for MessageSerializer {
802 fn default() -> Self {
803 Self::new()
804 }
805}
806
807impl fmt::Display for MessageId {
808 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
809 match self {
810 Self::String(s) => write!(f, "{s}"),
811 Self::Number(n) => write!(f, "{n}"),
812 Self::Uuid(u) => write!(f, "{u}"),
813 }
814 }
815}
816
817impl From<String> for MessageId {
818 fn from(s: String) -> Self {
819 Self::String(s)
820 }
821}
822
823impl From<&str> for MessageId {
824 fn from(s: &str) -> Self {
825 Self::String(s.to_string())
826 }
827}
828
829impl From<i64> for MessageId {
830 fn from(n: i64) -> Self {
831 Self::Number(n)
832 }
833}
834
835impl From<Uuid> for MessageId {
836 fn from(u: Uuid) -> Self {
837 Self::Uuid(u)
838 }
839}
840
841#[cfg(test)]
842mod tests {
843 use super::*;
844 use serde_json::json;
845
846 #[test]
847 fn test_message_creation() {
848 let message = Message::json(MessageId::from("test"), json!({"key": "value"})).unwrap();
849 assert_eq!(message.id.to_string(), "test");
850 assert!(!message.is_empty());
851 }
852
853 #[test]
854 fn test_message_serialization() {
855 let message = Message::json(MessageId::from(1), json!({"test": true})).unwrap();
856 let serialized = message.serialize(SerializationFormat::Json).unwrap();
857 assert!(!serialized.is_empty());
858 }
859
860 #[derive(Deserialize, PartialEq, Debug)]
861 struct TestData {
862 number: i32,
863 }
864
865 #[test]
866 fn test_message_parsing() {
867 let message = Message::json(MessageId::from("test"), json!({"number": 42})).unwrap();
868
869 let parsed: TestData = message.parse_json().unwrap();
870 assert_eq!(parsed.number, 42);
871 }
872
873 #[test]
874 fn test_format_detection() {
875 let json_bytes = Bytes::from(r#"{"test": true}"#);
876 let format = Message::detect_format(&json_bytes);
877
878 #[cfg(feature = "simd")]
879 assert_eq!(format, SerializationFormat::SimdJson);
880 #[cfg(not(feature = "simd"))]
881 assert_eq!(format, SerializationFormat::Json);
882 }
883
884 #[test]
885 fn test_message_metadata() {
886 let metadata = MessageMetadata::new(ContentType::Json, 100)
887 .with_header("custom".to_string(), "value".to_string())
888 .with_correlation_id("corr-123".to_string());
889
890 assert_eq!(metadata.size, 100);
891 assert_eq!(metadata.headers.get("custom"), Some(&"value".to_string()));
892 assert_eq!(metadata.correlation_id, Some("corr-123".to_string()));
893 }
894
895 #[test]
896 fn test_message_serializer_compression() {
897 use flate2::read::GzDecoder;
898 use std::io::Read;
899
900 let serializer = MessageSerializer::new().with_compression(true, 10); let large_json = json!({
903 "data": "a".repeat(100), });
905 let mut message =
906 Message::json(MessageId::from("compressed_test"), large_json.clone()).unwrap();
907
908 let original_size = message.size();
909 assert!(
910 original_size > 10,
911 "Original message size should be greater than compression threshold"
912 );
913
914 let compressed_bytes = serializer.serialize(&mut message).unwrap();
915
916 assert_eq!(message.metadata.encoding, Some("gzip".to_string()));
918
919 assert!(
921 compressed_bytes.len() < original_size,
922 "Compressed size should be smaller than original"
923 );
924
925 let mut decoder = GzDecoder::new(&compressed_bytes[..]);
927 let mut decompressed_data = Vec::new();
928 decoder.read_to_end(&mut decompressed_data).unwrap();
929
930 let decompressed_message = Message::deserialize(Bytes::from(decompressed_data)).unwrap();
931 let parsed_json: serde_json::Value = decompressed_message.parse_json().unwrap();
932
933 assert_eq!(parsed_json, large_json);
934 }
935}