1use std::collections::HashMap;
8use std::fmt;
9use std::sync::Arc;
10
11use bytes::{Bytes, BytesMut};
12use serde::{Deserialize, Serialize};
13use uuid::Uuid;
14
15use crate::error::{Error, Result};
16use crate::types::{ContentType, ProtocolVersion, Timestamp};
17
18#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
20#[serde(untagged)]
21pub enum MessageId {
22 String(String),
24 Number(i64),
26 Uuid(Uuid),
28}
29
30#[derive(Debug, Clone, Serialize, Deserialize)]
32pub struct MessageMetadata {
33 pub created_at: Timestamp,
35
36 pub protocol_version: ProtocolVersion,
38
39 pub encoding: Option<String>,
41
42 pub content_type: ContentType,
44
45 pub size: usize,
47
48 pub correlation_id: Option<String>,
50
51 pub headers: HashMap<String, String>,
53}
54
55#[derive(Debug, Clone)]
57pub struct Message {
58 pub id: MessageId,
60
61 pub metadata: MessageMetadata,
63
64 pub payload: MessagePayload,
66}
67
68#[derive(Debug, Clone)]
70pub enum MessagePayload {
71 Json(JsonPayload),
73
74 Binary(BinaryPayload),
76
77 Text(String),
79
80 Empty,
82}
83
84#[derive(Debug, Clone)]
86pub struct JsonPayload {
87 pub raw: Bytes,
89
90 pub parsed: Option<Arc<serde_json::Value>>,
92
93 pub is_valid: bool,
95}
96
97#[derive(Debug, Clone)]
99pub struct BinaryPayload {
100 pub data: Bytes,
102
103 pub format: BinaryFormat,
105}
106
107#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
109#[serde(rename_all = "lowercase")]
110pub enum BinaryFormat {
111 MessagePack,
113
114 ProtoBuf,
116
117 Cbor,
119
120 Custom,
122}
123
124#[derive(Debug)]
126pub struct MessageSerializer {
127 default_format: SerializationFormat,
129
130 enable_compression: bool,
132
133 compression_threshold: usize,
135}
136
/// Wire formats a [`Message`] can be encoded to or decoded from.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum SerializationFormat {
    /// JSON.
    Json,

    /// JSON through the SIMD code path; only exists with the `simd` feature.
    #[cfg(feature = "simd")]
    SimdJson,

    /// MessagePack.
    MessagePack,

    /// CBOR.
    Cbor,
}
153
154impl Message {
155 pub fn json(id: MessageId, value: impl Serialize) -> Result<Self> {
161 let json_bytes = Self::serialize_json(&value)?;
162 let payload = MessagePayload::Json(JsonPayload {
163 raw: json_bytes.freeze(),
164 parsed: Some(Arc::new(serde_json::to_value(value)?)),
165 is_valid: true,
166 });
167
168 Ok(Self {
169 id,
170 metadata: MessageMetadata::new(ContentType::Json, payload.size()),
171 payload,
172 })
173 }
174
175 pub fn binary(id: MessageId, data: Bytes, format: BinaryFormat) -> Self {
177 let size = data.len();
178 let payload = MessagePayload::Binary(BinaryPayload { data, format });
179
180 Self {
181 id,
182 metadata: MessageMetadata::new(ContentType::Binary, size),
183 payload,
184 }
185 }
186
187 #[must_use]
189 pub fn text(id: MessageId, text: String) -> Self {
190 let size = text.len();
191 let payload = MessagePayload::Text(text);
192
193 Self {
194 id,
195 metadata: MessageMetadata::new(ContentType::Text, size),
196 payload,
197 }
198 }
199
200 #[must_use]
202 pub fn empty(id: MessageId) -> Self {
203 Self {
204 id,
205 metadata: MessageMetadata::new(ContentType::Json, 0),
206 payload: MessagePayload::Empty,
207 }
208 }
209
210 pub const fn size(&self) -> usize {
212 self.metadata.size
213 }
214
215 pub const fn is_empty(&self) -> bool {
217 matches!(self.payload, MessagePayload::Empty)
218 }
219
220 pub fn serialize(&self, format: SerializationFormat) -> Result<Bytes> {
226 match format {
227 SerializationFormat::Json => self.serialize_json_format(),
228 #[cfg(feature = "simd")]
229 SerializationFormat::SimdJson => self.serialize_simd_json(),
230 SerializationFormat::MessagePack => self.serialize_messagepack(),
231 SerializationFormat::Cbor => self.serialize_cbor(),
232 }
233 }
234
235 pub fn deserialize(bytes: Bytes) -> Result<Self> {
241 let format = Self::detect_format(&bytes);
243 Self::deserialize_with_format(bytes, format)
244 }
245
246 pub fn deserialize_with_format(bytes: Bytes, format: SerializationFormat) -> Result<Self> {
248 match format {
249 SerializationFormat::Json => Ok(Self::deserialize_json(bytes)),
250 #[cfg(feature = "simd")]
251 SerializationFormat::SimdJson => Ok(Self::deserialize_simd_json(bytes)),
252 SerializationFormat::MessagePack => Ok(Self::deserialize_messagepack(bytes)),
253 SerializationFormat::Cbor => Self::deserialize_cbor(bytes),
254 }
255 }
256
257 pub fn parse_json<T>(&self) -> Result<T>
259 where
260 T: for<'de> Deserialize<'de>,
261 {
262 match &self.payload {
263 MessagePayload::Json(json_payload) => json_payload.parsed.as_ref().map_or_else(
264 || {
265 #[cfg(feature = "simd")]
266 {
267 let mut json_bytes = json_payload.raw.to_vec();
268 simd_json::from_slice(&mut json_bytes).map_err(|e| {
269 Error::serialization(format!("SIMD JSON parsing failed: {e}"))
270 })
271 }
272 #[cfg(not(feature = "simd"))]
273 {
274 serde_json::from_slice(&json_payload.raw).map_err(|e| {
275 Error::serialization(format!("JSON parsing failed: {}", e))
276 })
277 }
278 },
279 |parsed| {
280 serde_json::from_value((**parsed).clone())
281 .map_err(|e| Error::serialization(format!("JSON parsing failed: {e}")))
282 },
283 ),
284 _ => Err(Error::validation("Message payload is not JSON")),
285 }
286 }
287
288 fn serialize_json(value: &impl Serialize) -> Result<BytesMut> {
291 #[cfg(feature = "simd")]
292 {
293 sonic_rs::to_vec(value)
294 .map(|v| BytesMut::from(v.as_slice()))
295 .map_err(|e| Error::serialization(format!("SIMD JSON serialization failed: {e}")))
296 }
297 #[cfg(not(feature = "simd"))]
298 {
299 serde_json::to_vec(value)
300 .map(|v| BytesMut::from(v.as_slice()))
301 .map_err(|e| Error::serialization(format!("JSON serialization failed: {}", e)))
302 }
303 }
304
305 fn serialize_json_format(&self) -> Result<Bytes> {
306 match &self.payload {
307 MessagePayload::Json(json_payload) => Ok(json_payload.raw.clone()),
308 MessagePayload::Text(text) => Ok(Bytes::from(text.clone())),
309 MessagePayload::Empty => Ok(Bytes::from_static(b"{}")),
310 MessagePayload::Binary(_) => Err(Error::validation(
311 "Cannot serialize non-JSON payload as JSON",
312 )),
313 }
314 }
315
316 #[cfg(feature = "simd")]
317 fn serialize_simd_json(&self) -> Result<Bytes> {
318 match &self.payload {
319 MessagePayload::Json(json_payload) => {
320 if json_payload.is_valid {
321 Ok(json_payload.raw.clone())
322 } else {
323 Err(Error::serialization("Invalid JSON payload"))
324 }
325 }
326 _ => Err(Error::validation(
327 "Cannot serialize non-JSON payload with SIMD JSON",
328 )),
329 }
330 }
331
332 fn serialize_messagepack(&self) -> Result<Bytes> {
333 #[cfg(feature = "messagepack")]
334 {
335 match &self.payload {
336 MessagePayload::Binary(binary) if binary.format == BinaryFormat::MessagePack => {
337 Ok(binary.data.clone())
338 }
339 MessagePayload::Json(json_payload) => json_payload.parsed.as_ref().map_or_else(
340 || {
341 Err(Error::serialization(
342 "Cannot serialize unparsed JSON to MessagePack",
343 ))
344 },
345 |parsed| {
346 rmp_serde::to_vec(parsed.as_ref())
347 .map(Bytes::from)
348 .map_err(|e| {
349 Error::serialization(format!(
350 "MessagePack serialization failed: {e}"
351 ))
352 })
353 },
354 ),
355 _ => Err(Error::validation("Cannot serialize payload as MessagePack")),
356 }
357 }
358 #[cfg(not(feature = "messagepack"))]
359 {
360 let _ = self; Err(Error::validation("MessagePack serialization not available"))
362 }
363 }
364
365 fn serialize_cbor(&self) -> Result<Bytes> {
366 match &self.payload {
367 MessagePayload::Binary(binary) if binary.format == BinaryFormat::Cbor => {
368 Ok(binary.data.clone())
369 }
370 MessagePayload::Json(json_payload) => {
371 if let Some(parsed) = &json_payload.parsed {
372 serde_cbor::to_vec(parsed.as_ref())
373 .map(Bytes::from)
374 .map_err(|e| {
375 Error::serialization(format!("CBOR serialization failed: {e}"))
376 })
377 } else {
378 #[cfg(feature = "simd")]
380 {
381 let mut json_bytes = json_payload.raw.to_vec();
382 let value: serde_json::Value = simd_json::from_slice(&mut json_bytes)
383 .map_err(|e| {
384 Error::serialization(format!(
385 "SIMD JSON parsing failed before CBOR: {e}"
386 ))
387 })?;
388 serde_cbor::to_vec(&value).map(Bytes::from).map_err(|e| {
389 Error::serialization(format!("CBOR serialization failed: {e}"))
390 })
391 }
392 #[cfg(not(feature = "simd"))]
393 {
394 let value: serde_json::Value = serde_json::from_slice(&json_payload.raw)
395 .map_err(|e| {
396 Error::serialization(format!(
397 "JSON parsing failed before CBOR: {}",
398 e
399 ))
400 })?;
401 serde_cbor::to_vec(&value).map(Bytes::from).map_err(|e| {
402 Error::serialization(format!("CBOR serialization failed: {}", e))
403 })
404 }
405 }
406 }
407 _ => Err(Error::validation("Cannot serialize payload as CBOR")),
408 }
409 }
410
411 fn deserialize_json(bytes: Bytes) -> Self {
412 let is_valid = serde_json::from_slice::<serde_json::Value>(&bytes).is_ok();
414
415 let payload = MessagePayload::Json(JsonPayload {
416 raw: bytes,
417 parsed: None, is_valid,
419 });
420
421 Self {
422 id: MessageId::Uuid(Uuid::new_v4()),
423 metadata: MessageMetadata::new(ContentType::Json, payload.size()),
424 payload,
425 }
426 }
427
428 #[cfg(feature = "simd")]
429 fn deserialize_simd_json(bytes: Bytes) -> Self {
430 let mut json_bytes = bytes.to_vec();
431 let is_valid = simd_json::from_slice::<serde_json::Value>(&mut json_bytes).is_ok();
432
433 let payload = MessagePayload::Json(JsonPayload {
434 raw: bytes,
435 parsed: None,
436 is_valid,
437 });
438
439 Self {
440 id: MessageId::Uuid(Uuid::new_v4()),
441 metadata: MessageMetadata::new(ContentType::Json, payload.size()),
442 payload,
443 }
444 }
445
446 fn deserialize_messagepack(bytes: Bytes) -> Self {
447 let payload = MessagePayload::Binary(BinaryPayload {
448 data: bytes,
449 format: BinaryFormat::MessagePack,
450 });
451
452 Self {
453 id: MessageId::Uuid(Uuid::new_v4()),
454 metadata: MessageMetadata::new(ContentType::Binary, payload.size()),
455 payload,
456 }
457 }
458
459 fn deserialize_cbor(bytes: Bytes) -> Result<Self> {
460 if let Ok(value) = serde_cbor::from_slice::<serde_json::Value>(&bytes) {
462 let raw = serde_json::to_vec(&value)
463 .map(Bytes::from)
464 .map_err(|e| Error::serialization(format!("JSON re-encode failed: {e}")))?;
465 let payload = MessagePayload::Json(JsonPayload {
466 raw,
467 parsed: Some(Arc::new(value)),
468 is_valid: true,
469 });
470 return Ok(Self {
471 id: MessageId::Uuid(Uuid::new_v4()),
472 metadata: MessageMetadata::new(ContentType::Json, payload.size()),
473 payload,
474 });
475 }
476
477 let payload = MessagePayload::Binary(BinaryPayload {
479 data: bytes,
480 format: BinaryFormat::Cbor,
481 });
482 Ok(Self {
483 id: MessageId::Uuid(Uuid::new_v4()),
484 metadata: MessageMetadata::new(ContentType::Binary, payload.size()),
485 payload,
486 })
487 }
488
489 fn detect_format(bytes: &[u8]) -> SerializationFormat {
490 if bytes.is_empty() {
491 return SerializationFormat::Json;
492 }
493
494 if matches!(bytes[0], b'{' | b'[') {
496 #[cfg(feature = "simd")]
497 {
498 return SerializationFormat::SimdJson;
499 }
500 #[cfg(not(feature = "simd"))]
501 {
502 return SerializationFormat::Json;
503 }
504 }
505
506 if bytes.len() >= 2 && (bytes[0] == 0x82 || bytes[0] == 0x83) {
508 return SerializationFormat::MessagePack;
509 }
510
511 #[cfg(feature = "simd")]
513 {
514 SerializationFormat::SimdJson
515 }
516 #[cfg(not(feature = "simd"))]
517 {
518 SerializationFormat::Json
519 }
520 }
521}
522
523impl MessagePayload {
524 pub const fn size(&self) -> usize {
526 match self {
527 Self::Json(json) => json.raw.len(),
528 Self::Binary(binary) => binary.data.len(),
529 Self::Text(text) => text.len(),
530 Self::Empty => 0,
531 }
532 }
533}
534
535impl MessageMetadata {
536 #[must_use]
538 pub fn new(content_type: ContentType, size: usize) -> Self {
539 Self {
540 created_at: Timestamp::now(),
541 protocol_version: ProtocolVersion::default(),
542 encoding: None,
543 content_type,
544 size,
545 correlation_id: None,
546 headers: HashMap::new(),
547 }
548 }
549
550 #[must_use]
552 pub fn with_header(mut self, key: String, value: String) -> Self {
553 self.headers.insert(key, value);
554 self
555 }
556
557 #[must_use]
559 pub fn with_correlation_id(mut self, correlation_id: String) -> Self {
560 self.correlation_id = Some(correlation_id);
561 self
562 }
563
564 #[must_use]
566 pub fn with_encoding(mut self, encoding: String) -> Self {
567 self.encoding = Some(encoding);
568 self
569 }
570}
571
572impl MessageSerializer {
573 #[must_use]
575 pub const fn new() -> Self {
576 Self {
577 default_format: SerializationFormat::Json,
578 enable_compression: false,
579 compression_threshold: 1024, }
581 }
582
583 #[must_use]
585 pub const fn with_format(mut self, format: SerializationFormat) -> Self {
586 self.default_format = format;
587 self
588 }
589
590 #[must_use]
592 pub const fn with_compression(mut self, enable: bool, threshold: usize) -> Self {
593 self.enable_compression = enable;
594 self.compression_threshold = threshold;
595 self
596 }
597
598 pub fn serialize(&self, message: &Message) -> Result<Bytes> {
600 let serialized = message.serialize(self.default_format)?;
601
602 if self.enable_compression && serialized.len() > self.compression_threshold {
604 Ok(self.compress(serialized))
605 } else {
606 Ok(serialized)
607 }
608 }
609
610 const fn compress(&self, data: Bytes) -> Bytes {
611 let _ = self; data
615 }
616}
617
618impl Default for MessageSerializer {
619 fn default() -> Self {
620 Self::new()
621 }
622}
623
624impl fmt::Display for MessageId {
625 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
626 match self {
627 Self::String(s) => write!(f, "{s}"),
628 Self::Number(n) => write!(f, "{n}"),
629 Self::Uuid(u) => write!(f, "{u}"),
630 }
631 }
632}
633
634impl From<String> for MessageId {
635 fn from(s: String) -> Self {
636 Self::String(s)
637 }
638}
639
640impl From<&str> for MessageId {
641 fn from(s: &str) -> Self {
642 Self::String(s.to_string())
643 }
644}
645
646impl From<i64> for MessageId {
647 fn from(n: i64) -> Self {
648 Self::Number(n)
649 }
650}
651
652impl From<Uuid> for MessageId {
653 fn from(u: Uuid) -> Self {
654 Self::Uuid(u)
655 }
656}
657
#[cfg(test)]
mod tests {
    use super::*;
    use serde_json::json;

    /// Target type for `test_message_parsing`.
    #[derive(Deserialize, PartialEq, Debug)]
    struct TestData {
        number: i32,
    }

    /// A JSON message keeps its id and is not reported as empty.
    #[test]
    fn test_message_creation() {
        let id = MessageId::from("test");
        let message = Message::json(id, json!({"key": "value"})).unwrap();

        assert_eq!(message.id.to_string(), "test");
        assert!(!message.is_empty());
    }

    /// Serializing a JSON message as JSON yields some bytes.
    #[test]
    fn test_message_serialization() {
        let message = Message::json(MessageId::from(1), json!({"test": true})).unwrap();

        let serialized = message.serialize(SerializationFormat::Json).unwrap();

        assert!(!serialized.is_empty());
    }

    /// The JSON payload can be read back into a typed struct.
    #[test]
    fn test_message_parsing() {
        let message = Message::json(MessageId::from("test"), json!({"number": 42})).unwrap();

        let parsed: TestData = message.parse_json().unwrap();

        assert_eq!(parsed.number, 42);
    }

    /// Input starting with `{` is detected as the feature-selected JSON format.
    #[test]
    fn test_format_detection() {
        #[cfg(feature = "simd")]
        let expected = SerializationFormat::SimdJson;
        #[cfg(not(feature = "simd"))]
        let expected = SerializationFormat::Json;

        let json_bytes = Bytes::from(r#"{"test": true}"#);

        assert_eq!(Message::detect_format(&json_bytes), expected);
    }

    /// Builder methods store the header and correlation id; size is kept.
    #[test]
    fn test_message_metadata() {
        let metadata = MessageMetadata::new(ContentType::Json, 100)
            .with_header("custom".to_string(), "value".to_string())
            .with_correlation_id("corr-123".to_string());

        assert_eq!(metadata.size, 100);
        assert_eq!(
            metadata.headers.get("custom").map(String::as_str),
            Some("value")
        );
        assert_eq!(metadata.correlation_id.as_deref(), Some("corr-123"));
    }
}