1use std::collections::HashMap;
44use std::fmt;
45use std::sync::Arc;
46
47use bytes::{Bytes, BytesMut};
48use serde::{Deserialize, Serialize};
49use uuid::Uuid;
50
51#[cfg(feature = "messagepack")]
52use msgpacker::Packable;
53
54use crate::error::{Error, Result};
55use crate::types::{ContentType, ProtocolVersion, Timestamp};
56
57#[cfg(feature = "messagepack")]
59#[derive(Debug, Clone)]
60pub enum JsonValue {
61 Null,
63 Bool(bool),
65 Number(f64),
67 String(String),
69 Array(Vec<JsonValue>),
71 Object(std::collections::HashMap<String, JsonValue>),
73}
74
75#[cfg(feature = "messagepack")]
76impl JsonValue {
77 pub fn from_serde_json(value: &serde_json::Value) -> Self {
79 match value {
80 serde_json::Value::Null => JsonValue::Null,
81 serde_json::Value::Bool(b) => JsonValue::Bool(*b),
82 serde_json::Value::Number(n) => {
83 if let Some(i) = n.as_i64() {
84 JsonValue::Number(i as f64)
85 } else if let Some(u) = n.as_u64() {
86 JsonValue::Number(u as f64)
87 } else if let Some(f) = n.as_f64() {
88 JsonValue::Number(f)
89 } else {
90 JsonValue::Null
91 }
92 }
93 serde_json::Value::String(s) => JsonValue::String(s.clone()),
94 serde_json::Value::Array(arr) => {
95 JsonValue::Array(arr.iter().map(Self::from_serde_json).collect())
96 }
97 serde_json::Value::Object(obj) => {
98 let mut map = std::collections::HashMap::new();
99 for (k, v) in obj {
100 map.insert(k.clone(), Self::from_serde_json(v));
101 }
102 JsonValue::Object(map)
103 }
104 }
105 }
106}
107
108#[cfg(feature = "messagepack")]
109impl msgpacker::Packable for JsonValue {
110 fn pack<T>(&self, buf: &mut T) -> usize
111 where
112 T: Extend<u8>,
113 {
114 match self {
115 JsonValue::Null => {
116 buf.extend([0xc0]);
118 1
119 }
120 JsonValue::Bool(b) => b.pack(buf),
121 JsonValue::Number(n) => n.pack(buf),
122 JsonValue::String(s) => s.pack(buf),
123 JsonValue::Array(arr) => {
124 let len = arr.len();
126 let mut bytes_written = 0;
127
128 if len <= 15 {
130 buf.extend([0x90 + len as u8]);
131 bytes_written += 1;
132 } else if len <= u16::MAX as usize {
133 buf.extend([0xdc]);
134 buf.extend((len as u16).to_be_bytes());
135 bytes_written += 3;
136 } else {
137 buf.extend([0xdd]);
138 buf.extend((len as u32).to_be_bytes());
139 bytes_written += 5;
140 }
141
142 for item in arr {
144 bytes_written += item.pack(buf);
145 }
146
147 bytes_written
148 }
149 JsonValue::Object(obj) => {
150 let len = obj.len();
152 let mut bytes_written = 0;
153
154 if len <= 15 {
156 buf.extend([0x80 + len as u8]);
157 bytes_written += 1;
158 } else if len <= u16::MAX as usize {
159 buf.extend([0xde]);
160 buf.extend((len as u16).to_be_bytes());
161 bytes_written += 3;
162 } else {
163 buf.extend([0xdf]);
164 buf.extend((len as u32).to_be_bytes());
165 bytes_written += 5;
166 }
167
168 for (k, v) in obj {
170 bytes_written += k.pack(buf);
171 bytes_written += v.pack(buf);
172 }
173
174 bytes_written
175 }
176 }
177 }
178}
179
180#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
182#[serde(untagged)]
183pub enum MessageId {
184 String(String),
186 Number(i64),
188 Uuid(Uuid),
190}
191
192#[derive(Debug, Clone, Serialize, Deserialize)]
194pub struct MessageMetadata {
195 pub created_at: Timestamp,
197
198 pub protocol_version: ProtocolVersion,
200
201 pub encoding: Option<String>,
203
204 pub content_type: ContentType,
206
207 pub size: usize,
209
210 pub correlation_id: Option<String>,
212
213 pub headers: HashMap<String, String>,
215}
216
217#[derive(Debug, Clone)]
219pub struct Message {
220 pub id: MessageId,
222
223 pub metadata: MessageMetadata,
225
226 pub payload: MessagePayload,
228}
229
230#[derive(Debug, Clone)]
232pub enum MessagePayload {
233 Json(JsonPayload),
235
236 Binary(BinaryPayload),
238
239 Text(String),
241
242 Empty,
244}
245
246#[derive(Debug, Clone)]
248pub struct JsonPayload {
249 pub raw: Bytes,
251
252 pub parsed: Option<Arc<serde_json::Value>>,
254
255 pub is_valid: bool,
257}
258
259#[derive(Debug, Clone)]
261pub struct BinaryPayload {
262 pub data: Bytes,
264
265 pub format: BinaryFormat,
267}
268
269#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
271#[serde(rename_all = "lowercase")]
272pub enum BinaryFormat {
273 MessagePack,
275
276 ProtoBuf,
278
279 Cbor,
281
282 Custom,
284}
285
286#[derive(Debug)]
288pub struct MessageSerializer {
289 default_format: SerializationFormat,
291
292 enable_compression: bool,
294
295 compression_threshold: usize,
297}
298
299#[derive(Debug, Clone, Copy, PartialEq, Eq)]
301pub enum SerializationFormat {
302 Json,
304
305 #[cfg(feature = "simd")]
307 SimdJson,
308
309 MessagePack,
311
312 Cbor,
314}
315
316impl Message {
317 pub fn json(id: MessageId, value: impl Serialize) -> Result<Self> {
323 let json_bytes = Self::serialize_json(&value)?;
324 let payload = MessagePayload::Json(JsonPayload {
325 raw: json_bytes.freeze(),
326 parsed: Some(Arc::new(serde_json::to_value(value)?)),
327 is_valid: true,
328 });
329
330 Ok(Self {
331 id,
332 metadata: MessageMetadata::new(ContentType::Json, payload.size()),
333 payload,
334 })
335 }
336
337 pub fn binary(id: MessageId, data: Bytes, format: BinaryFormat) -> Self {
339 let size = data.len();
340 let payload = MessagePayload::Binary(BinaryPayload { data, format });
341
342 Self {
343 id,
344 metadata: MessageMetadata::new(ContentType::Binary, size),
345 payload,
346 }
347 }
348
349 #[must_use]
351 pub fn text(id: MessageId, text: String) -> Self {
352 let size = text.len();
353 let payload = MessagePayload::Text(text);
354
355 Self {
356 id,
357 metadata: MessageMetadata::new(ContentType::Text, size),
358 payload,
359 }
360 }
361
362 #[must_use]
364 pub fn empty(id: MessageId) -> Self {
365 Self {
366 id,
367 metadata: MessageMetadata::new(ContentType::Json, 0),
368 payload: MessagePayload::Empty,
369 }
370 }
371
372 pub const fn size(&self) -> usize {
374 self.metadata.size
375 }
376
377 pub const fn is_empty(&self) -> bool {
379 matches!(self.payload, MessagePayload::Empty)
380 }
381
382 pub fn serialize(&self, format: SerializationFormat) -> Result<Bytes> {
388 match format {
389 SerializationFormat::Json => self.serialize_json_format(),
390 #[cfg(feature = "simd")]
391 SerializationFormat::SimdJson => self.serialize_simd_json(),
392 SerializationFormat::MessagePack => self.serialize_messagepack(),
393 SerializationFormat::Cbor => self.serialize_cbor(),
394 }
395 }
396
397 pub fn deserialize(bytes: Bytes) -> Result<Self> {
403 let format = Self::detect_format(&bytes);
405 Self::deserialize_with_format(bytes, format)
406 }
407
408 pub fn deserialize_with_format(bytes: Bytes, format: SerializationFormat) -> Result<Self> {
410 match format {
411 SerializationFormat::Json => Ok(Self::deserialize_json(bytes)),
412 #[cfg(feature = "simd")]
413 SerializationFormat::SimdJson => Ok(Self::deserialize_simd_json(bytes)),
414 SerializationFormat::MessagePack => Ok(Self::deserialize_messagepack(bytes)),
415 SerializationFormat::Cbor => Self::deserialize_cbor(bytes),
416 }
417 }
418
419 pub fn parse_json<T>(&self) -> Result<T>
421 where
422 T: for<'de> Deserialize<'de>,
423 {
424 match &self.payload {
425 MessagePayload::Json(json_payload) => json_payload.parsed.as_ref().map_or_else(
426 || {
427 #[cfg(feature = "simd")]
428 {
429 let mut json_bytes = json_payload.raw.to_vec();
430 simd_json::from_slice(&mut json_bytes).map_err(|e| {
431 Error::serialization(format!("SIMD JSON parsing failed: {e}"))
432 })
433 }
434 #[cfg(not(feature = "simd"))]
435 {
436 serde_json::from_slice(&json_payload.raw).map_err(|e| {
437 Error::serialization(format!("JSON parsing failed: {}", e))
438 })
439 }
440 },
441 |parsed| {
442 serde_json::from_value((**parsed).clone())
443 .map_err(|e| Error::serialization(format!("JSON parsing failed: {e}")))
444 },
445 ),
446 _ => Err(Error::validation("Message payload is not JSON")),
447 }
448 }
449
450 fn serialize_json(value: &impl Serialize) -> Result<BytesMut> {
453 #[cfg(feature = "simd")]
454 {
455 sonic_rs::to_vec(value)
456 .map(|v| BytesMut::from(v.as_slice()))
457 .map_err(|e| Error::serialization(format!("SIMD JSON serialization failed: {e}")))
458 }
459 #[cfg(not(feature = "simd"))]
460 {
461 serde_json::to_vec(value)
462 .map(|v| BytesMut::from(v.as_slice()))
463 .map_err(|e| Error::serialization(format!("JSON serialization failed: {}", e)))
464 }
465 }
466
467 fn serialize_json_format(&self) -> Result<Bytes> {
468 match &self.payload {
469 MessagePayload::Json(json_payload) => Ok(json_payload.raw.clone()),
470 MessagePayload::Text(text) => Ok(Bytes::from(text.clone())),
471 MessagePayload::Empty => Ok(Bytes::from_static(b"{}")),
472 MessagePayload::Binary(_) => Err(Error::validation(
473 "Cannot serialize non-JSON payload as JSON",
474 )),
475 }
476 }
477
478 #[cfg(feature = "simd")]
479 fn serialize_simd_json(&self) -> Result<Bytes> {
480 match &self.payload {
481 MessagePayload::Json(json_payload) => {
482 if json_payload.is_valid {
483 Ok(json_payload.raw.clone())
484 } else {
485 Err(Error::serialization("Invalid JSON payload"))
486 }
487 }
488 _ => Err(Error::validation(
489 "Cannot serialize non-JSON payload with SIMD JSON",
490 )),
491 }
492 }
493
494 fn serialize_messagepack(&self) -> Result<Bytes> {
495 #[cfg(feature = "messagepack")]
496 {
497 match &self.payload {
498 MessagePayload::Binary(binary) if binary.format == BinaryFormat::MessagePack => {
499 Ok(binary.data.clone())
500 }
501 MessagePayload::Json(json_payload) => json_payload.parsed.as_ref().map_or_else(
502 || {
503 Err(Error::serialization(
504 "Cannot serialize unparsed JSON to MessagePack",
505 ))
506 },
507 |parsed| {
508 let packable_value = JsonValue::from_serde_json(parsed.as_ref());
510 let mut buffer = Vec::new();
511 packable_value.pack(&mut buffer);
512 Ok(Bytes::from(buffer))
513 },
514 ),
515 _ => Err(Error::validation("Cannot serialize payload as MessagePack")),
516 }
517 }
518 #[cfg(not(feature = "messagepack"))]
519 {
520 let _ = self; Err(Error::validation("MessagePack serialization not available"))
522 }
523 }
524
525 fn serialize_cbor(&self) -> Result<Bytes> {
526 match &self.payload {
527 MessagePayload::Binary(binary) if binary.format == BinaryFormat::Cbor => {
528 Ok(binary.data.clone())
529 }
530 MessagePayload::Json(json_payload) => {
531 if let Some(parsed) = &json_payload.parsed {
532 {
533 let mut buffer = Vec::new();
534 ciborium::into_writer(parsed.as_ref(), &mut buffer)
535 .map(|_| Bytes::from(buffer))
536 .map_err(|e| {
537 Error::serialization(format!("CBOR serialization failed: {e}"))
538 })
539 }
540 } else {
541 #[cfg(feature = "simd")]
543 {
544 let mut json_bytes = json_payload.raw.to_vec();
545 let value: serde_json::Value = simd_json::from_slice(&mut json_bytes)
546 .map_err(|e| {
547 Error::serialization(format!(
548 "SIMD JSON parsing failed before CBOR: {e}"
549 ))
550 })?;
551 {
552 let mut buffer = Vec::new();
553 ciborium::into_writer(&value, &mut buffer)
554 .map(|_| Bytes::from(buffer))
555 .map_err(|e| {
556 Error::serialization(format!("CBOR serialization failed: {e}"))
557 })
558 }
559 }
560 #[cfg(not(feature = "simd"))]
561 {
562 let value: serde_json::Value = serde_json::from_slice(&json_payload.raw)
563 .map_err(|e| {
564 Error::serialization(format!(
565 "JSON parsing failed before CBOR: {}",
566 e
567 ))
568 })?;
569 let mut buf = Vec::new();
570 ciborium::ser::into_writer(&value, &mut buf).map_err(|e| {
571 Error::serialization(format!("CBOR serialization failed: {}", e))
572 })?;
573 Ok(Bytes::from(buf))
574 }
575 }
576 }
577 _ => Err(Error::validation("Cannot serialize payload as CBOR")),
578 }
579 }
580
581 fn deserialize_json(bytes: Bytes) -> Self {
582 let is_valid = serde_json::from_slice::<serde_json::Value>(&bytes).is_ok();
584
585 let payload = MessagePayload::Json(JsonPayload {
586 raw: bytes,
587 parsed: None, is_valid,
589 });
590
591 Self {
592 id: MessageId::Uuid(Uuid::new_v4()),
593 metadata: MessageMetadata::new(ContentType::Json, payload.size()),
594 payload,
595 }
596 }
597
598 #[cfg(feature = "simd")]
599 fn deserialize_simd_json(bytes: Bytes) -> Self {
600 let mut json_bytes = bytes.to_vec();
601 let is_valid = simd_json::from_slice::<serde_json::Value>(&mut json_bytes).is_ok();
602
603 let payload = MessagePayload::Json(JsonPayload {
604 raw: bytes,
605 parsed: None,
606 is_valid,
607 });
608
609 Self {
610 id: MessageId::Uuid(Uuid::new_v4()),
611 metadata: MessageMetadata::new(ContentType::Json, payload.size()),
612 payload,
613 }
614 }
615
616 fn deserialize_messagepack(bytes: Bytes) -> Self {
617 let payload = MessagePayload::Binary(BinaryPayload {
618 data: bytes,
619 format: BinaryFormat::MessagePack,
620 });
621
622 Self {
623 id: MessageId::Uuid(Uuid::new_v4()),
624 metadata: MessageMetadata::new(ContentType::Binary, payload.size()),
625 payload,
626 }
627 }
628
629 fn deserialize_cbor(bytes: Bytes) -> Result<Self> {
630 if let Ok(value) = ciborium::from_reader::<serde_json::Value, _>(&bytes[..]) {
632 let raw = serde_json::to_vec(&value)
633 .map(Bytes::from)
634 .map_err(|e| Error::serialization(format!("JSON re-encode failed: {e}")))?;
635 let payload = MessagePayload::Json(JsonPayload {
636 raw,
637 parsed: Some(Arc::new(value)),
638 is_valid: true,
639 });
640 return Ok(Self {
641 id: MessageId::Uuid(Uuid::new_v4()),
642 metadata: MessageMetadata::new(ContentType::Json, payload.size()),
643 payload,
644 });
645 }
646
647 let payload = MessagePayload::Binary(BinaryPayload {
649 data: bytes,
650 format: BinaryFormat::Cbor,
651 });
652 Ok(Self {
653 id: MessageId::Uuid(Uuid::new_v4()),
654 metadata: MessageMetadata::new(ContentType::Binary, payload.size()),
655 payload,
656 })
657 }
658
659 fn detect_format(bytes: &[u8]) -> SerializationFormat {
660 if bytes.is_empty() {
661 return SerializationFormat::Json;
662 }
663
664 if matches!(bytes[0], b'{' | b'[') {
666 #[cfg(feature = "simd")]
667 {
668 return SerializationFormat::SimdJson;
669 }
670 #[cfg(not(feature = "simd"))]
671 {
672 return SerializationFormat::Json;
673 }
674 }
675
676 if bytes.len() >= 2 && (bytes[0] == 0x82 || bytes[0] == 0x83) {
678 return SerializationFormat::MessagePack;
679 }
680
681 #[cfg(feature = "simd")]
683 {
684 SerializationFormat::SimdJson
685 }
686 #[cfg(not(feature = "simd"))]
687 {
688 SerializationFormat::Json
689 }
690 }
691}
692
693impl MessagePayload {
694 pub const fn size(&self) -> usize {
696 match self {
697 Self::Json(json) => json.raw.len(),
698 Self::Binary(binary) => binary.data.len(),
699 Self::Text(text) => text.len(),
700 Self::Empty => 0,
701 }
702 }
703}
704
705impl MessageMetadata {
706 #[must_use]
708 pub fn new(content_type: ContentType, size: usize) -> Self {
709 Self {
710 created_at: Timestamp::now(),
711 protocol_version: crate::PROTOCOL_VERSION.to_string(),
712 encoding: None,
713 content_type,
714 size,
715 correlation_id: None,
716 headers: HashMap::new(),
717 }
718 }
719
720 #[must_use]
722 pub fn with_header(mut self, key: String, value: String) -> Self {
723 self.headers.insert(key, value);
724 self
725 }
726
727 #[must_use]
729 pub fn with_correlation_id(mut self, correlation_id: String) -> Self {
730 self.correlation_id = Some(correlation_id);
731 self
732 }
733
734 #[must_use]
736 pub fn with_encoding(mut self, encoding: String) -> Self {
737 self.encoding = Some(encoding);
738 self
739 }
740}
741
742impl MessageSerializer {
743 #[must_use]
745 pub const fn new() -> Self {
746 Self {
747 default_format: SerializationFormat::Json,
748 enable_compression: false,
749 compression_threshold: 1024, }
751 }
752
753 #[must_use]
755 pub const fn with_format(mut self, format: SerializationFormat) -> Self {
756 self.default_format = format;
757 self
758 }
759
760 #[must_use]
762 pub const fn with_compression(mut self, enable: bool, threshold: usize) -> Self {
763 self.enable_compression = enable;
764 self.compression_threshold = threshold;
765 self
766 }
767
768 pub fn serialize(&self, message: &Message) -> Result<Bytes> {
770 let serialized = message.serialize(self.default_format)?;
771
772 if self.enable_compression && serialized.len() > self.compression_threshold {
774 Ok(self.compress(serialized))
775 } else {
776 Ok(serialized)
777 }
778 }
779
780 fn compress(&self, data: Bytes) -> Bytes {
781 let _ = self;
785 data
786 }
787}
788
789impl Default for MessageSerializer {
790 fn default() -> Self {
791 Self::new()
792 }
793}
794
795impl fmt::Display for MessageId {
796 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
797 match self {
798 Self::String(s) => write!(f, "{s}"),
799 Self::Number(n) => write!(f, "{n}"),
800 Self::Uuid(u) => write!(f, "{u}"),
801 }
802 }
803}
804
805impl From<String> for MessageId {
806 fn from(s: String) -> Self {
807 Self::String(s)
808 }
809}
810
811impl From<&str> for MessageId {
812 fn from(s: &str) -> Self {
813 Self::String(s.to_string())
814 }
815}
816
817impl From<i64> for MessageId {
818 fn from(n: i64) -> Self {
819 Self::Number(n)
820 }
821}
822
823impl From<Uuid> for MessageId {
824 fn from(u: Uuid) -> Self {
825 Self::Uuid(u)
826 }
827}
828
829#[cfg(test)]
830mod tests {
831 use super::*;
832 use serde_json::json;
833
834 #[test]
835 fn test_message_creation() {
836 let message = Message::json(MessageId::from("test"), json!({"key": "value"})).unwrap();
837 assert_eq!(message.id.to_string(), "test");
838 assert!(!message.is_empty());
839 }
840
841 #[test]
842 fn test_message_serialization() {
843 let message = Message::json(MessageId::from(1), json!({"test": true})).unwrap();
844 let serialized = message.serialize(SerializationFormat::Json).unwrap();
845 assert!(!serialized.is_empty());
846 }
847
848 #[derive(Deserialize, PartialEq, Debug)]
849 struct TestData {
850 number: i32,
851 }
852
853 #[test]
854 fn test_message_parsing() {
855 let message = Message::json(MessageId::from("test"), json!({"number": 42})).unwrap();
856
857 let parsed: TestData = message.parse_json().unwrap();
858 assert_eq!(parsed.number, 42);
859 }
860
861 #[test]
862 fn test_format_detection() {
863 let json_bytes = Bytes::from(r#"{"test": true}"#);
864 let format = Message::detect_format(&json_bytes);
865
866 #[cfg(feature = "simd")]
867 assert_eq!(format, SerializationFormat::SimdJson);
868 #[cfg(not(feature = "simd"))]
869 assert_eq!(format, SerializationFormat::Json);
870 }
871
872 #[test]
873 fn test_message_metadata() {
874 let metadata = MessageMetadata::new(ContentType::Json, 100)
875 .with_header("custom".to_string(), "value".to_string())
876 .with_correlation_id("corr-123".to_string());
877
878 assert_eq!(metadata.size, 100);
879 assert_eq!(metadata.headers.get("custom"), Some(&"value".to_string()));
880 assert_eq!(metadata.correlation_id, Some("corr-123".to_string()));
881 }
882}