1use std::collections::HashMap;
8use std::fmt;
9use std::sync::Arc;
10
11use bytes::{Bytes, BytesMut};
12use serde::{Deserialize, Serialize};
13use uuid::Uuid;
14
15use crate::error::{Error, Result};
16use crate::types::{ContentType, ProtocolVersion, Timestamp};
17
18#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
20#[serde(untagged)]
21pub enum MessageId {
22 String(String),
24 Number(i64),
26 Uuid(Uuid),
28}
29
30#[derive(Debug, Clone, Serialize, Deserialize)]
32pub struct MessageMetadata {
33 pub created_at: Timestamp,
35
36 pub protocol_version: ProtocolVersion,
38
39 pub encoding: Option<String>,
41
42 pub content_type: ContentType,
44
45 pub size: usize,
47
48 pub correlation_id: Option<String>,
50
51 pub headers: HashMap<String, String>,
53}
54
55#[derive(Debug, Clone)]
57pub struct Message {
58 pub id: MessageId,
60
61 pub metadata: MessageMetadata,
63
64 pub payload: MessagePayload,
66}
67
68#[derive(Debug, Clone)]
70pub enum MessagePayload {
71 Json(JsonPayload),
73
74 Binary(BinaryPayload),
76
77 Text(String),
79
80 Empty,
82}
83
84#[derive(Debug, Clone)]
86pub struct JsonPayload {
87 pub raw: Bytes,
89
90 pub parsed: Option<Arc<serde_json::Value>>,
92
93 pub is_valid: bool,
95}
96
97#[derive(Debug, Clone)]
99pub struct BinaryPayload {
100 pub data: Bytes,
102
103 pub format: BinaryFormat,
105}
106
107#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
109#[serde(rename_all = "lowercase")]
110pub enum BinaryFormat {
111 MessagePack,
113
114 ProtoBuf,
116
117 Cbor,
119
120 Custom,
122}
123
124#[derive(Debug)]
126pub struct MessageSerializer {
127 default_format: SerializationFormat,
129
130 enable_compression: bool,
132
133 compression_threshold: usize,
135}
136
137#[derive(Debug, Clone, Copy, PartialEq, Eq)]
139pub enum SerializationFormat {
140 Json,
142
143 #[cfg(feature = "simd")]
145 SimdJson,
146
147 MessagePack,
149
150 Cbor,
152}
153
154impl Message {
155 pub fn json(id: MessageId, value: impl Serialize) -> Result<Self> {
161 let json_bytes = Self::serialize_json(&value)?;
162 let payload = MessagePayload::Json(JsonPayload {
163 raw: json_bytes.freeze(),
164 parsed: Some(Arc::new(serde_json::to_value(value)?)),
165 is_valid: true,
166 });
167
168 Ok(Self {
169 id,
170 metadata: MessageMetadata::new(ContentType::Json, payload.size()),
171 payload,
172 })
173 }
174
175 pub fn binary(id: MessageId, data: Bytes, format: BinaryFormat) -> Self {
177 let size = data.len();
178 let payload = MessagePayload::Binary(BinaryPayload { data, format });
179
180 Self {
181 id,
182 metadata: MessageMetadata::new(ContentType::Binary, size),
183 payload,
184 }
185 }
186
187 #[must_use]
189 pub fn text(id: MessageId, text: String) -> Self {
190 let size = text.len();
191 let payload = MessagePayload::Text(text);
192
193 Self {
194 id,
195 metadata: MessageMetadata::new(ContentType::Text, size),
196 payload,
197 }
198 }
199
200 #[must_use]
202 pub fn empty(id: MessageId) -> Self {
203 Self {
204 id,
205 metadata: MessageMetadata::new(ContentType::Json, 0),
206 payload: MessagePayload::Empty,
207 }
208 }
209
210 pub const fn size(&self) -> usize {
212 self.metadata.size
213 }
214
215 pub const fn is_empty(&self) -> bool {
217 matches!(self.payload, MessagePayload::Empty)
218 }
219
220 pub fn serialize(&self, format: SerializationFormat) -> Result<Bytes> {
226 match format {
227 SerializationFormat::Json => self.serialize_json_format(),
228 #[cfg(feature = "simd")]
229 SerializationFormat::SimdJson => self.serialize_simd_json(),
230 SerializationFormat::MessagePack => self.serialize_messagepack(),
231 SerializationFormat::Cbor => self.serialize_cbor(),
232 }
233 }
234
235 pub fn deserialize(bytes: Bytes) -> Result<Self> {
241 let format = Self::detect_format(&bytes);
243 Self::deserialize_with_format(bytes, format)
244 }
245
246 pub fn deserialize_with_format(bytes: Bytes, format: SerializationFormat) -> Result<Self> {
248 match format {
249 SerializationFormat::Json => Ok(Self::deserialize_json(bytes)),
250 #[cfg(feature = "simd")]
251 SerializationFormat::SimdJson => Ok(Self::deserialize_simd_json(bytes)),
252 SerializationFormat::MessagePack => Ok(Self::deserialize_messagepack(bytes)),
253 SerializationFormat::Cbor => Self::deserialize_cbor(bytes),
254 }
255 }
256
257 pub fn parse_json<T>(&self) -> Result<T>
259 where
260 T: for<'de> Deserialize<'de>,
261 {
262 match &self.payload {
263 MessagePayload::Json(json_payload) => json_payload.parsed.as_ref().map_or_else(
264 || {
265 #[cfg(feature = "simd")]
266 {
267 let mut json_bytes = json_payload.raw.to_vec();
268 simd_json::from_slice(&mut json_bytes).map_err(|e| {
269 Error::serialization(format!("SIMD JSON parsing failed: {e}"))
270 })
271 }
272 #[cfg(not(feature = "simd"))]
273 {
274 serde_json::from_slice(&json_payload.raw).map_err(|e| {
275 Error::serialization(format!("JSON parsing failed: {}", e))
276 })
277 }
278 },
279 |parsed| {
280 serde_json::from_value((**parsed).clone())
281 .map_err(|e| Error::serialization(format!("JSON parsing failed: {e}")))
282 },
283 ),
284 _ => Err(Error::validation("Message payload is not JSON")),
285 }
286 }
287
288 fn serialize_json(value: &impl Serialize) -> Result<BytesMut> {
291 #[cfg(feature = "simd")]
292 {
293 sonic_rs::to_vec(value)
294 .map(|v| BytesMut::from(v.as_slice()))
295 .map_err(|e| Error::serialization(format!("SIMD JSON serialization failed: {e}")))
296 }
297 #[cfg(not(feature = "simd"))]
298 {
299 serde_json::to_vec(value)
300 .map(|v| BytesMut::from(v.as_slice()))
301 .map_err(|e| Error::serialization(format!("JSON serialization failed: {}", e)))
302 }
303 }
304
305 fn serialize_json_format(&self) -> Result<Bytes> {
306 match &self.payload {
307 MessagePayload::Json(json_payload) => Ok(json_payload.raw.clone()),
308 MessagePayload::Text(text) => Ok(Bytes::from(text.clone())),
309 MessagePayload::Empty => Ok(Bytes::from_static(b"{}")),
310 MessagePayload::Binary(_) => Err(Error::validation(
311 "Cannot serialize non-JSON payload as JSON",
312 )),
313 }
314 }
315
316 #[cfg(feature = "simd")]
317 fn serialize_simd_json(&self) -> Result<Bytes> {
318 match &self.payload {
319 MessagePayload::Json(json_payload) => {
320 if json_payload.is_valid {
321 Ok(json_payload.raw.clone())
322 } else {
323 Err(Error::serialization("Invalid JSON payload"))
324 }
325 }
326 _ => Err(Error::validation(
327 "Cannot serialize non-JSON payload with SIMD JSON",
328 )),
329 }
330 }
331
332 fn serialize_messagepack(&self) -> Result<Bytes> {
333 #[cfg(feature = "messagepack")]
334 {
335 match &self.payload {
336 MessagePayload::Binary(binary) if binary.format == BinaryFormat::MessagePack => {
337 Ok(binary.data.clone())
338 }
339 MessagePayload::Json(json_payload) => json_payload.parsed.as_ref().map_or_else(
340 || {
341 Err(Error::serialization(
342 "Cannot serialize unparsed JSON to MessagePack",
343 ))
344 },
345 |parsed| {
346 rmp_serde::to_vec(parsed.as_ref())
347 .map(Bytes::from)
348 .map_err(|e| {
349 Error::serialization(format!(
350 "MessagePack serialization failed: {e}"
351 ))
352 })
353 },
354 ),
355 _ => Err(Error::validation("Cannot serialize payload as MessagePack")),
356 }
357 }
358 #[cfg(not(feature = "messagepack"))]
359 {
360 let _ = self; Err(Error::validation("MessagePack serialization not available"))
362 }
363 }
364
365 fn serialize_cbor(&self) -> Result<Bytes> {
366 match &self.payload {
367 MessagePayload::Binary(binary) if binary.format == BinaryFormat::Cbor => {
368 Ok(binary.data.clone())
369 }
370 MessagePayload::Json(json_payload) => {
371 if let Some(parsed) = &json_payload.parsed {
372 {
373 let mut buffer = Vec::new();
374 ciborium::into_writer(parsed.as_ref(), &mut buffer)
375 .map(|_| Bytes::from(buffer))
376 .map_err(|e| {
377 Error::serialization(format!("CBOR serialization failed: {e}"))
378 })
379 }
380 } else {
381 #[cfg(feature = "simd")]
383 {
384 let mut json_bytes = json_payload.raw.to_vec();
385 let value: serde_json::Value = simd_json::from_slice(&mut json_bytes)
386 .map_err(|e| {
387 Error::serialization(format!(
388 "SIMD JSON parsing failed before CBOR: {e}"
389 ))
390 })?;
391 {
392 let mut buffer = Vec::new();
393 ciborium::into_writer(&value, &mut buffer)
394 .map(|_| Bytes::from(buffer))
395 .map_err(|e| {
396 Error::serialization(format!("CBOR serialization failed: {e}"))
397 })
398 }
399 }
400 #[cfg(not(feature = "simd"))]
401 {
402 let value: serde_json::Value = serde_json::from_slice(&json_payload.raw)
403 .map_err(|e| {
404 Error::serialization(format!(
405 "JSON parsing failed before CBOR: {}",
406 e
407 ))
408 })?;
409 serde_cbor::to_vec(&value).map(Bytes::from).map_err(|e| {
410 Error::serialization(format!("CBOR serialization failed: {}", e))
411 })
412 }
413 }
414 }
415 _ => Err(Error::validation("Cannot serialize payload as CBOR")),
416 }
417 }
418
419 fn deserialize_json(bytes: Bytes) -> Self {
420 let is_valid = serde_json::from_slice::<serde_json::Value>(&bytes).is_ok();
422
423 let payload = MessagePayload::Json(JsonPayload {
424 raw: bytes,
425 parsed: None, is_valid,
427 });
428
429 Self {
430 id: MessageId::Uuid(Uuid::new_v4()),
431 metadata: MessageMetadata::new(ContentType::Json, payload.size()),
432 payload,
433 }
434 }
435
436 #[cfg(feature = "simd")]
437 fn deserialize_simd_json(bytes: Bytes) -> Self {
438 let mut json_bytes = bytes.to_vec();
439 let is_valid = simd_json::from_slice::<serde_json::Value>(&mut json_bytes).is_ok();
440
441 let payload = MessagePayload::Json(JsonPayload {
442 raw: bytes,
443 parsed: None,
444 is_valid,
445 });
446
447 Self {
448 id: MessageId::Uuid(Uuid::new_v4()),
449 metadata: MessageMetadata::new(ContentType::Json, payload.size()),
450 payload,
451 }
452 }
453
454 fn deserialize_messagepack(bytes: Bytes) -> Self {
455 let payload = MessagePayload::Binary(BinaryPayload {
456 data: bytes,
457 format: BinaryFormat::MessagePack,
458 });
459
460 Self {
461 id: MessageId::Uuid(Uuid::new_v4()),
462 metadata: MessageMetadata::new(ContentType::Binary, payload.size()),
463 payload,
464 }
465 }
466
467 fn deserialize_cbor(bytes: Bytes) -> Result<Self> {
468 if let Ok(value) = ciborium::from_reader::<serde_json::Value, _>(&bytes[..]) {
470 let raw = serde_json::to_vec(&value)
471 .map(Bytes::from)
472 .map_err(|e| Error::serialization(format!("JSON re-encode failed: {e}")))?;
473 let payload = MessagePayload::Json(JsonPayload {
474 raw,
475 parsed: Some(Arc::new(value)),
476 is_valid: true,
477 });
478 return Ok(Self {
479 id: MessageId::Uuid(Uuid::new_v4()),
480 metadata: MessageMetadata::new(ContentType::Json, payload.size()),
481 payload,
482 });
483 }
484
485 let payload = MessagePayload::Binary(BinaryPayload {
487 data: bytes,
488 format: BinaryFormat::Cbor,
489 });
490 Ok(Self {
491 id: MessageId::Uuid(Uuid::new_v4()),
492 metadata: MessageMetadata::new(ContentType::Binary, payload.size()),
493 payload,
494 })
495 }
496
497 fn detect_format(bytes: &[u8]) -> SerializationFormat {
498 if bytes.is_empty() {
499 return SerializationFormat::Json;
500 }
501
502 if matches!(bytes[0], b'{' | b'[') {
504 #[cfg(feature = "simd")]
505 {
506 return SerializationFormat::SimdJson;
507 }
508 #[cfg(not(feature = "simd"))]
509 {
510 return SerializationFormat::Json;
511 }
512 }
513
514 if bytes.len() >= 2 && (bytes[0] == 0x82 || bytes[0] == 0x83) {
516 return SerializationFormat::MessagePack;
517 }
518
519 #[cfg(feature = "simd")]
521 {
522 SerializationFormat::SimdJson
523 }
524 #[cfg(not(feature = "simd"))]
525 {
526 SerializationFormat::Json
527 }
528 }
529}
530
531impl MessagePayload {
532 pub const fn size(&self) -> usize {
534 match self {
535 Self::Json(json) => json.raw.len(),
536 Self::Binary(binary) => binary.data.len(),
537 Self::Text(text) => text.len(),
538 Self::Empty => 0,
539 }
540 }
541}
542
543impl MessageMetadata {
544 #[must_use]
546 pub fn new(content_type: ContentType, size: usize) -> Self {
547 Self {
548 created_at: Timestamp::now(),
549 protocol_version: ProtocolVersion::default(),
550 encoding: None,
551 content_type,
552 size,
553 correlation_id: None,
554 headers: HashMap::new(),
555 }
556 }
557
558 #[must_use]
560 pub fn with_header(mut self, key: String, value: String) -> Self {
561 self.headers.insert(key, value);
562 self
563 }
564
565 #[must_use]
567 pub fn with_correlation_id(mut self, correlation_id: String) -> Self {
568 self.correlation_id = Some(correlation_id);
569 self
570 }
571
572 #[must_use]
574 pub fn with_encoding(mut self, encoding: String) -> Self {
575 self.encoding = Some(encoding);
576 self
577 }
578}
579
580impl MessageSerializer {
581 #[must_use]
583 pub const fn new() -> Self {
584 Self {
585 default_format: SerializationFormat::Json,
586 enable_compression: false,
587 compression_threshold: 1024, }
589 }
590
591 #[must_use]
593 pub const fn with_format(mut self, format: SerializationFormat) -> Self {
594 self.default_format = format;
595 self
596 }
597
598 #[must_use]
600 pub const fn with_compression(mut self, enable: bool, threshold: usize) -> Self {
601 self.enable_compression = enable;
602 self.compression_threshold = threshold;
603 self
604 }
605
606 pub fn serialize(&self, message: &Message) -> Result<Bytes> {
608 let serialized = message.serialize(self.default_format)?;
609
610 if self.enable_compression && serialized.len() > self.compression_threshold {
612 Ok(self.compress(serialized))
613 } else {
614 Ok(serialized)
615 }
616 }
617
618 const fn compress(&self, data: Bytes) -> Bytes {
619 let _ = self; data
623 }
624}
625
626impl Default for MessageSerializer {
627 fn default() -> Self {
628 Self::new()
629 }
630}
631
632impl fmt::Display for MessageId {
633 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
634 match self {
635 Self::String(s) => write!(f, "{s}"),
636 Self::Number(n) => write!(f, "{n}"),
637 Self::Uuid(u) => write!(f, "{u}"),
638 }
639 }
640}
641
642impl From<String> for MessageId {
643 fn from(s: String) -> Self {
644 Self::String(s)
645 }
646}
647
648impl From<&str> for MessageId {
649 fn from(s: &str) -> Self {
650 Self::String(s.to_string())
651 }
652}
653
654impl From<i64> for MessageId {
655 fn from(n: i64) -> Self {
656 Self::Number(n)
657 }
658}
659
660impl From<Uuid> for MessageId {
661 fn from(u: Uuid) -> Self {
662 Self::Uuid(u)
663 }
664}
665
666#[cfg(test)]
667mod tests {
668 use super::*;
669 use serde_json::json;
670
671 #[test]
672 fn test_message_creation() {
673 let message = Message::json(MessageId::from("test"), json!({"key": "value"})).unwrap();
674 assert_eq!(message.id.to_string(), "test");
675 assert!(!message.is_empty());
676 }
677
678 #[test]
679 fn test_message_serialization() {
680 let message = Message::json(MessageId::from(1), json!({"test": true})).unwrap();
681 let serialized = message.serialize(SerializationFormat::Json).unwrap();
682 assert!(!serialized.is_empty());
683 }
684
685 #[derive(Deserialize, PartialEq, Debug)]
686 struct TestData {
687 number: i32,
688 }
689
690 #[test]
691 fn test_message_parsing() {
692 let message = Message::json(MessageId::from("test"), json!({"number": 42})).unwrap();
693
694 let parsed: TestData = message.parse_json().unwrap();
695 assert_eq!(parsed.number, 42);
696 }
697
698 #[test]
699 fn test_format_detection() {
700 let json_bytes = Bytes::from(r#"{"test": true}"#);
701 let format = Message::detect_format(&json_bytes);
702
703 #[cfg(feature = "simd")]
704 assert_eq!(format, SerializationFormat::SimdJson);
705 #[cfg(not(feature = "simd"))]
706 assert_eq!(format, SerializationFormat::Json);
707 }
708
709 #[test]
710 fn test_message_metadata() {
711 let metadata = MessageMetadata::new(ContentType::Json, 100)
712 .with_header("custom".to_string(), "value".to_string())
713 .with_correlation_id("corr-123".to_string());
714
715 assert_eq!(metadata.size, 100);
716 assert_eq!(metadata.headers.get("custom"), Some(&"value".to_string()));
717 assert_eq!(metadata.correlation_id, Some("corr-123".to_string()));
718 }
719}