1use std::collections::HashMap;
8use std::fmt;
9use std::sync::Arc;
10
11use bytes::{Bytes, BytesMut};
12use serde::{Deserialize, Serialize};
13use uuid::Uuid;
14
15#[cfg(feature = "messagepack")]
16use msgpacker::Packable;
17
18use crate::error::{Error, Result};
19use crate::types::{ContentType, ProtocolVersion, Timestamp};
20
21#[cfg(feature = "messagepack")]
23#[derive(Debug, Clone)]
24pub enum JsonValue {
25 Null,
27 Bool(bool),
29 Number(f64),
31 String(String),
33 Array(Vec<JsonValue>),
35 Object(std::collections::HashMap<String, JsonValue>),
37}
38
39#[cfg(feature = "messagepack")]
40impl JsonValue {
41 pub fn from_serde_json(value: &serde_json::Value) -> Self {
43 match value {
44 serde_json::Value::Null => JsonValue::Null,
45 serde_json::Value::Bool(b) => JsonValue::Bool(*b),
46 serde_json::Value::Number(n) => {
47 if let Some(i) = n.as_i64() {
48 JsonValue::Number(i as f64)
49 } else if let Some(u) = n.as_u64() {
50 JsonValue::Number(u as f64)
51 } else if let Some(f) = n.as_f64() {
52 JsonValue::Number(f)
53 } else {
54 JsonValue::Null
55 }
56 }
57 serde_json::Value::String(s) => JsonValue::String(s.clone()),
58 serde_json::Value::Array(arr) => {
59 JsonValue::Array(arr.iter().map(Self::from_serde_json).collect())
60 }
61 serde_json::Value::Object(obj) => {
62 let mut map = std::collections::HashMap::new();
63 for (k, v) in obj {
64 map.insert(k.clone(), Self::from_serde_json(v));
65 }
66 JsonValue::Object(map)
67 }
68 }
69 }
70}
71
72#[cfg(feature = "messagepack")]
73impl msgpacker::Packable for JsonValue {
74 fn pack<T>(&self, buf: &mut T) -> usize
75 where
76 T: Extend<u8>,
77 {
78 match self {
79 JsonValue::Null => {
80 buf.extend([0xc0]);
82 1
83 }
84 JsonValue::Bool(b) => b.pack(buf),
85 JsonValue::Number(n) => n.pack(buf),
86 JsonValue::String(s) => s.pack(buf),
87 JsonValue::Array(arr) => {
88 let len = arr.len();
90 let mut bytes_written = 0;
91
92 if len <= 15 {
94 buf.extend([0x90 + len as u8]);
95 bytes_written += 1;
96 } else if len <= u16::MAX as usize {
97 buf.extend([0xdc]);
98 buf.extend((len as u16).to_be_bytes());
99 bytes_written += 3;
100 } else {
101 buf.extend([0xdd]);
102 buf.extend((len as u32).to_be_bytes());
103 bytes_written += 5;
104 }
105
106 for item in arr {
108 bytes_written += item.pack(buf);
109 }
110
111 bytes_written
112 }
113 JsonValue::Object(obj) => {
114 let len = obj.len();
116 let mut bytes_written = 0;
117
118 if len <= 15 {
120 buf.extend([0x80 + len as u8]);
121 bytes_written += 1;
122 } else if len <= u16::MAX as usize {
123 buf.extend([0xde]);
124 buf.extend((len as u16).to_be_bytes());
125 bytes_written += 3;
126 } else {
127 buf.extend([0xdf]);
128 buf.extend((len as u32).to_be_bytes());
129 bytes_written += 5;
130 }
131
132 for (k, v) in obj {
134 bytes_written += k.pack(buf);
135 bytes_written += v.pack(buf);
136 }
137
138 bytes_written
139 }
140 }
141 }
142}
143
144#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
146#[serde(untagged)]
147pub enum MessageId {
148 String(String),
150 Number(i64),
152 Uuid(Uuid),
154}
155
156#[derive(Debug, Clone, Serialize, Deserialize)]
158pub struct MessageMetadata {
159 pub created_at: Timestamp,
161
162 pub protocol_version: ProtocolVersion,
164
165 pub encoding: Option<String>,
167
168 pub content_type: ContentType,
170
171 pub size: usize,
173
174 pub correlation_id: Option<String>,
176
177 pub headers: HashMap<String, String>,
179}
180
181#[derive(Debug, Clone)]
183pub struct Message {
184 pub id: MessageId,
186
187 pub metadata: MessageMetadata,
189
190 pub payload: MessagePayload,
192}
193
194#[derive(Debug, Clone)]
196pub enum MessagePayload {
197 Json(JsonPayload),
199
200 Binary(BinaryPayload),
202
203 Text(String),
205
206 Empty,
208}
209
210#[derive(Debug, Clone)]
212pub struct JsonPayload {
213 pub raw: Bytes,
215
216 pub parsed: Option<Arc<serde_json::Value>>,
218
219 pub is_valid: bool,
221}
222
223#[derive(Debug, Clone)]
225pub struct BinaryPayload {
226 pub data: Bytes,
228
229 pub format: BinaryFormat,
231}
232
233#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
235#[serde(rename_all = "lowercase")]
236pub enum BinaryFormat {
237 MessagePack,
239
240 ProtoBuf,
242
243 Cbor,
245
246 Custom,
248}
249
250#[derive(Debug)]
252pub struct MessageSerializer {
253 default_format: SerializationFormat,
255
256 enable_compression: bool,
258
259 compression_threshold: usize,
261}
262
263#[derive(Debug, Clone, Copy, PartialEq, Eq)]
265pub enum SerializationFormat {
266 Json,
268
269 #[cfg(feature = "simd")]
271 SimdJson,
272
273 MessagePack,
275
276 Cbor,
278}
279
280impl Message {
281 pub fn json(id: MessageId, value: impl Serialize) -> Result<Self> {
287 let json_bytes = Self::serialize_json(&value)?;
288 let payload = MessagePayload::Json(JsonPayload {
289 raw: json_bytes.freeze(),
290 parsed: Some(Arc::new(serde_json::to_value(value)?)),
291 is_valid: true,
292 });
293
294 Ok(Self {
295 id,
296 metadata: MessageMetadata::new(ContentType::Json, payload.size()),
297 payload,
298 })
299 }
300
301 pub fn binary(id: MessageId, data: Bytes, format: BinaryFormat) -> Self {
303 let size = data.len();
304 let payload = MessagePayload::Binary(BinaryPayload { data, format });
305
306 Self {
307 id,
308 metadata: MessageMetadata::new(ContentType::Binary, size),
309 payload,
310 }
311 }
312
313 #[must_use]
315 pub fn text(id: MessageId, text: String) -> Self {
316 let size = text.len();
317 let payload = MessagePayload::Text(text);
318
319 Self {
320 id,
321 metadata: MessageMetadata::new(ContentType::Text, size),
322 payload,
323 }
324 }
325
326 #[must_use]
328 pub fn empty(id: MessageId) -> Self {
329 Self {
330 id,
331 metadata: MessageMetadata::new(ContentType::Json, 0),
332 payload: MessagePayload::Empty,
333 }
334 }
335
336 pub const fn size(&self) -> usize {
338 self.metadata.size
339 }
340
341 pub const fn is_empty(&self) -> bool {
343 matches!(self.payload, MessagePayload::Empty)
344 }
345
346 pub fn serialize(&self, format: SerializationFormat) -> Result<Bytes> {
352 match format {
353 SerializationFormat::Json => self.serialize_json_format(),
354 #[cfg(feature = "simd")]
355 SerializationFormat::SimdJson => self.serialize_simd_json(),
356 SerializationFormat::MessagePack => self.serialize_messagepack(),
357 SerializationFormat::Cbor => self.serialize_cbor(),
358 }
359 }
360
361 pub fn deserialize(bytes: Bytes) -> Result<Self> {
367 let format = Self::detect_format(&bytes);
369 Self::deserialize_with_format(bytes, format)
370 }
371
372 pub fn deserialize_with_format(bytes: Bytes, format: SerializationFormat) -> Result<Self> {
374 match format {
375 SerializationFormat::Json => Ok(Self::deserialize_json(bytes)),
376 #[cfg(feature = "simd")]
377 SerializationFormat::SimdJson => Ok(Self::deserialize_simd_json(bytes)),
378 SerializationFormat::MessagePack => Ok(Self::deserialize_messagepack(bytes)),
379 SerializationFormat::Cbor => Self::deserialize_cbor(bytes),
380 }
381 }
382
383 pub fn parse_json<T>(&self) -> Result<T>
385 where
386 T: for<'de> Deserialize<'de>,
387 {
388 match &self.payload {
389 MessagePayload::Json(json_payload) => json_payload.parsed.as_ref().map_or_else(
390 || {
391 #[cfg(feature = "simd")]
392 {
393 let mut json_bytes = json_payload.raw.to_vec();
394 simd_json::from_slice(&mut json_bytes).map_err(|e| {
395 Error::serialization(format!("SIMD JSON parsing failed: {e}"))
396 })
397 }
398 #[cfg(not(feature = "simd"))]
399 {
400 serde_json::from_slice(&json_payload.raw).map_err(|e| {
401 Error::serialization(format!("JSON parsing failed: {}", e))
402 })
403 }
404 },
405 |parsed| {
406 serde_json::from_value((**parsed).clone())
407 .map_err(|e| Error::serialization(format!("JSON parsing failed: {e}")))
408 },
409 ),
410 _ => Err(Error::validation("Message payload is not JSON")),
411 }
412 }
413
414 fn serialize_json(value: &impl Serialize) -> Result<BytesMut> {
417 #[cfg(feature = "simd")]
418 {
419 sonic_rs::to_vec(value)
420 .map(|v| BytesMut::from(v.as_slice()))
421 .map_err(|e| Error::serialization(format!("SIMD JSON serialization failed: {e}")))
422 }
423 #[cfg(not(feature = "simd"))]
424 {
425 serde_json::to_vec(value)
426 .map(|v| BytesMut::from(v.as_slice()))
427 .map_err(|e| Error::serialization(format!("JSON serialization failed: {}", e)))
428 }
429 }
430
431 fn serialize_json_format(&self) -> Result<Bytes> {
432 match &self.payload {
433 MessagePayload::Json(json_payload) => Ok(json_payload.raw.clone()),
434 MessagePayload::Text(text) => Ok(Bytes::from(text.clone())),
435 MessagePayload::Empty => Ok(Bytes::from_static(b"{}")),
436 MessagePayload::Binary(_) => Err(Error::validation(
437 "Cannot serialize non-JSON payload as JSON",
438 )),
439 }
440 }
441
442 #[cfg(feature = "simd")]
443 fn serialize_simd_json(&self) -> Result<Bytes> {
444 match &self.payload {
445 MessagePayload::Json(json_payload) => {
446 if json_payload.is_valid {
447 Ok(json_payload.raw.clone())
448 } else {
449 Err(Error::serialization("Invalid JSON payload"))
450 }
451 }
452 _ => Err(Error::validation(
453 "Cannot serialize non-JSON payload with SIMD JSON",
454 )),
455 }
456 }
457
458 fn serialize_messagepack(&self) -> Result<Bytes> {
459 #[cfg(feature = "messagepack")]
460 {
461 match &self.payload {
462 MessagePayload::Binary(binary) if binary.format == BinaryFormat::MessagePack => {
463 Ok(binary.data.clone())
464 }
465 MessagePayload::Json(json_payload) => json_payload.parsed.as_ref().map_or_else(
466 || {
467 Err(Error::serialization(
468 "Cannot serialize unparsed JSON to MessagePack",
469 ))
470 },
471 |parsed| {
472 let packable_value = JsonValue::from_serde_json(parsed.as_ref());
474 let mut buffer = Vec::new();
475 packable_value.pack(&mut buffer);
476 Ok(Bytes::from(buffer))
477 },
478 ),
479 _ => Err(Error::validation("Cannot serialize payload as MessagePack")),
480 }
481 }
482 #[cfg(not(feature = "messagepack"))]
483 {
484 let _ = self; Err(Error::validation("MessagePack serialization not available"))
486 }
487 }
488
489 fn serialize_cbor(&self) -> Result<Bytes> {
490 match &self.payload {
491 MessagePayload::Binary(binary) if binary.format == BinaryFormat::Cbor => {
492 Ok(binary.data.clone())
493 }
494 MessagePayload::Json(json_payload) => {
495 if let Some(parsed) = &json_payload.parsed {
496 {
497 let mut buffer = Vec::new();
498 ciborium::into_writer(parsed.as_ref(), &mut buffer)
499 .map(|_| Bytes::from(buffer))
500 .map_err(|e| {
501 Error::serialization(format!("CBOR serialization failed: {e}"))
502 })
503 }
504 } else {
505 #[cfg(feature = "simd")]
507 {
508 let mut json_bytes = json_payload.raw.to_vec();
509 let value: serde_json::Value = simd_json::from_slice(&mut json_bytes)
510 .map_err(|e| {
511 Error::serialization(format!(
512 "SIMD JSON parsing failed before CBOR: {e}"
513 ))
514 })?;
515 {
516 let mut buffer = Vec::new();
517 ciborium::into_writer(&value, &mut buffer)
518 .map(|_| Bytes::from(buffer))
519 .map_err(|e| {
520 Error::serialization(format!("CBOR serialization failed: {e}"))
521 })
522 }
523 }
524 #[cfg(not(feature = "simd"))]
525 {
526 let value: serde_json::Value = serde_json::from_slice(&json_payload.raw)
527 .map_err(|e| {
528 Error::serialization(format!(
529 "JSON parsing failed before CBOR: {}",
530 e
531 ))
532 })?;
533 serde_cbor::to_vec(&value).map(Bytes::from).map_err(|e| {
534 Error::serialization(format!("CBOR serialization failed: {}", e))
535 })
536 }
537 }
538 }
539 _ => Err(Error::validation("Cannot serialize payload as CBOR")),
540 }
541 }
542
543 fn deserialize_json(bytes: Bytes) -> Self {
544 let is_valid = serde_json::from_slice::<serde_json::Value>(&bytes).is_ok();
546
547 let payload = MessagePayload::Json(JsonPayload {
548 raw: bytes,
549 parsed: None, is_valid,
551 });
552
553 Self {
554 id: MessageId::Uuid(Uuid::new_v4()),
555 metadata: MessageMetadata::new(ContentType::Json, payload.size()),
556 payload,
557 }
558 }
559
560 #[cfg(feature = "simd")]
561 fn deserialize_simd_json(bytes: Bytes) -> Self {
562 let mut json_bytes = bytes.to_vec();
563 let is_valid = simd_json::from_slice::<serde_json::Value>(&mut json_bytes).is_ok();
564
565 let payload = MessagePayload::Json(JsonPayload {
566 raw: bytes,
567 parsed: None,
568 is_valid,
569 });
570
571 Self {
572 id: MessageId::Uuid(Uuid::new_v4()),
573 metadata: MessageMetadata::new(ContentType::Json, payload.size()),
574 payload,
575 }
576 }
577
578 fn deserialize_messagepack(bytes: Bytes) -> Self {
579 let payload = MessagePayload::Binary(BinaryPayload {
580 data: bytes,
581 format: BinaryFormat::MessagePack,
582 });
583
584 Self {
585 id: MessageId::Uuid(Uuid::new_v4()),
586 metadata: MessageMetadata::new(ContentType::Binary, payload.size()),
587 payload,
588 }
589 }
590
591 fn deserialize_cbor(bytes: Bytes) -> Result<Self> {
592 if let Ok(value) = ciborium::from_reader::<serde_json::Value, _>(&bytes[..]) {
594 let raw = serde_json::to_vec(&value)
595 .map(Bytes::from)
596 .map_err(|e| Error::serialization(format!("JSON re-encode failed: {e}")))?;
597 let payload = MessagePayload::Json(JsonPayload {
598 raw,
599 parsed: Some(Arc::new(value)),
600 is_valid: true,
601 });
602 return Ok(Self {
603 id: MessageId::Uuid(Uuid::new_v4()),
604 metadata: MessageMetadata::new(ContentType::Json, payload.size()),
605 payload,
606 });
607 }
608
609 let payload = MessagePayload::Binary(BinaryPayload {
611 data: bytes,
612 format: BinaryFormat::Cbor,
613 });
614 Ok(Self {
615 id: MessageId::Uuid(Uuid::new_v4()),
616 metadata: MessageMetadata::new(ContentType::Binary, payload.size()),
617 payload,
618 })
619 }
620
621 fn detect_format(bytes: &[u8]) -> SerializationFormat {
622 if bytes.is_empty() {
623 return SerializationFormat::Json;
624 }
625
626 if matches!(bytes[0], b'{' | b'[') {
628 #[cfg(feature = "simd")]
629 {
630 return SerializationFormat::SimdJson;
631 }
632 #[cfg(not(feature = "simd"))]
633 {
634 return SerializationFormat::Json;
635 }
636 }
637
638 if bytes.len() >= 2 && (bytes[0] == 0x82 || bytes[0] == 0x83) {
640 return SerializationFormat::MessagePack;
641 }
642
643 #[cfg(feature = "simd")]
645 {
646 SerializationFormat::SimdJson
647 }
648 #[cfg(not(feature = "simd"))]
649 {
650 SerializationFormat::Json
651 }
652 }
653}
654
655impl MessagePayload {
656 pub const fn size(&self) -> usize {
658 match self {
659 Self::Json(json) => json.raw.len(),
660 Self::Binary(binary) => binary.data.len(),
661 Self::Text(text) => text.len(),
662 Self::Empty => 0,
663 }
664 }
665}
666
667impl MessageMetadata {
668 #[must_use]
670 pub fn new(content_type: ContentType, size: usize) -> Self {
671 Self {
672 created_at: Timestamp::now(),
673 protocol_version: ProtocolVersion::default(),
674 encoding: None,
675 content_type,
676 size,
677 correlation_id: None,
678 headers: HashMap::new(),
679 }
680 }
681
682 #[must_use]
684 pub fn with_header(mut self, key: String, value: String) -> Self {
685 self.headers.insert(key, value);
686 self
687 }
688
689 #[must_use]
691 pub fn with_correlation_id(mut self, correlation_id: String) -> Self {
692 self.correlation_id = Some(correlation_id);
693 self
694 }
695
696 #[must_use]
698 pub fn with_encoding(mut self, encoding: String) -> Self {
699 self.encoding = Some(encoding);
700 self
701 }
702}
703
704impl MessageSerializer {
705 #[must_use]
707 pub const fn new() -> Self {
708 Self {
709 default_format: SerializationFormat::Json,
710 enable_compression: false,
711 compression_threshold: 1024, }
713 }
714
715 #[must_use]
717 pub const fn with_format(mut self, format: SerializationFormat) -> Self {
718 self.default_format = format;
719 self
720 }
721
722 #[must_use]
724 pub const fn with_compression(mut self, enable: bool, threshold: usize) -> Self {
725 self.enable_compression = enable;
726 self.compression_threshold = threshold;
727 self
728 }
729
730 pub fn serialize(&self, message: &Message) -> Result<Bytes> {
732 let serialized = message.serialize(self.default_format)?;
733
734 if self.enable_compression && serialized.len() > self.compression_threshold {
736 Ok(self.compress(serialized))
737 } else {
738 Ok(serialized)
739 }
740 }
741
742 const fn compress(&self, data: Bytes) -> Bytes {
743 let _ = self; data
747 }
748}
749
750impl Default for MessageSerializer {
751 fn default() -> Self {
752 Self::new()
753 }
754}
755
756impl fmt::Display for MessageId {
757 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
758 match self {
759 Self::String(s) => write!(f, "{s}"),
760 Self::Number(n) => write!(f, "{n}"),
761 Self::Uuid(u) => write!(f, "{u}"),
762 }
763 }
764}
765
766impl From<String> for MessageId {
767 fn from(s: String) -> Self {
768 Self::String(s)
769 }
770}
771
772impl From<&str> for MessageId {
773 fn from(s: &str) -> Self {
774 Self::String(s.to_string())
775 }
776}
777
778impl From<i64> for MessageId {
779 fn from(n: i64) -> Self {
780 Self::Number(n)
781 }
782}
783
784impl From<Uuid> for MessageId {
785 fn from(u: Uuid) -> Self {
786 Self::Uuid(u)
787 }
788}
789
790#[cfg(test)]
791mod tests {
792 use super::*;
793 use serde_json::json;
794
795 #[test]
796 fn test_message_creation() {
797 let message = Message::json(MessageId::from("test"), json!({"key": "value"})).unwrap();
798 assert_eq!(message.id.to_string(), "test");
799 assert!(!message.is_empty());
800 }
801
802 #[test]
803 fn test_message_serialization() {
804 let message = Message::json(MessageId::from(1), json!({"test": true})).unwrap();
805 let serialized = message.serialize(SerializationFormat::Json).unwrap();
806 assert!(!serialized.is_empty());
807 }
808
809 #[derive(Deserialize, PartialEq, Debug)]
810 struct TestData {
811 number: i32,
812 }
813
814 #[test]
815 fn test_message_parsing() {
816 let message = Message::json(MessageId::from("test"), json!({"number": 42})).unwrap();
817
818 let parsed: TestData = message.parse_json().unwrap();
819 assert_eq!(parsed.number, 42);
820 }
821
822 #[test]
823 fn test_format_detection() {
824 let json_bytes = Bytes::from(r#"{"test": true}"#);
825 let format = Message::detect_format(&json_bytes);
826
827 #[cfg(feature = "simd")]
828 assert_eq!(format, SerializationFormat::SimdJson);
829 #[cfg(not(feature = "simd"))]
830 assert_eq!(format, SerializationFormat::Json);
831 }
832
833 #[test]
834 fn test_message_metadata() {
835 let metadata = MessageMetadata::new(ContentType::Json, 100)
836 .with_header("custom".to_string(), "value".to_string())
837 .with_correlation_id("corr-123".to_string());
838
839 assert_eq!(metadata.size, 100);
840 assert_eq!(metadata.headers.get("custom"), Some(&"value".to_string()));
841 assert_eq!(metadata.correlation_id, Some("corr-123".to_string()));
842 }
843}