turbomcp_core/
message.rs

1//! Optimized message types and serialization.
2//!
3//! This module provides zero-copy message handling with optimized serialization
4//! for maximum performance. It supports multiple serialization formats and
5//! includes SIMD acceleration when available.
6
7use std::collections::HashMap;
8use std::fmt;
9use std::sync::Arc;
10
11use bytes::{Bytes, BytesMut};
12use serde::{Deserialize, Serialize};
13use uuid::Uuid;
14
15#[cfg(feature = "messagepack")]
16use msgpacker::Packable;
17
18use crate::error::{Error, Result};
19use crate::types::{ContentType, ProtocolVersion, Timestamp};
20
/// JSON value tree in a shape that can be handed to `msgpacker`.
///
/// The six variants correspond one-to-one to the variants of `serde_json::Value`.
#[cfg(feature = "messagepack")]
#[derive(Clone, Debug)]
pub enum JsonValue {
    /// JSON `null`
    Null,
    /// JSON `true` / `false`
    Bool(bool),
    /// JSON number; every number is held as an `f64`
    Number(f64),
    /// JSON string
    String(String),
    /// JSON array of nested values
    Array(Vec<JsonValue>),
    /// JSON object, keyed by member name
    Object(HashMap<String, JsonValue>),
}
38
#[cfg(feature = "messagepack")]
impl JsonValue {
    /// Builds a [`JsonValue`] tree mirroring `value`, recursing into arrays and objects.
    ///
    /// Numbers are widened to `f64`: the integer accessors are tried first
    /// (`as_i64`, then `as_u64`), then `as_f64`. A number that none of them can
    /// represent becomes [`JsonValue::Null`].
    pub fn from_serde_json(value: &serde_json::Value) -> Self {
        use serde_json::Value;

        match value {
            Value::Null => Self::Null,
            Value::Bool(flag) => Self::Bool(*flag),
            Value::Number(num) => num
                .as_i64()
                .map(|int| int as f64)
                .or_else(|| num.as_u64().map(|uint| uint as f64))
                .or_else(|| num.as_f64())
                .map_or(Self::Null, Self::Number),
            Value::String(text) => Self::String(text.clone()),
            Value::Array(items) => Self::Array(items.iter().map(Self::from_serde_json).collect()),
            Value::Object(members) => Self::Object(
                members
                    .iter()
                    .map(|(key, member)| (key.clone(), Self::from_serde_json(member)))
                    .collect(),
            ),
        }
    }
}
71
#[cfg(feature = "messagepack")]
impl msgpacker::Packable for JsonValue {
    /// Appends the MessagePack encoding of this value to `buf` and returns the
    /// number of bytes appended.
    ///
    /// Scalars delegate to the `Packable` impls of `bool`, `f64` and `String`;
    /// `Null`, arrays and objects are encoded by hand.
    fn pack<T>(&self, buf: &mut T) -> usize
    where
        T: Extend<u8>,
    {
        /// Writes a collection header for `len` entries and returns its size in bytes.
        ///
        /// `fix_base` is the marker for the compact form (up to 15 entries, length
        /// folded into the marker byte); `marker16` / `marker32` introduce the
        /// 16-bit and 32-bit big-endian length forms.
        fn pack_header<B: Extend<u8>>(
            buf: &mut B,
            len: usize,
            fix_base: u8,
            marker16: u8,
            marker32: u8,
        ) -> usize {
            if len <= 15 {
                buf.extend([fix_base + len as u8]);
                1
            } else if len <= u16::MAX as usize {
                buf.extend([marker16]);
                buf.extend((len as u16).to_be_bytes());
                3
            } else {
                buf.extend([marker32]);
                buf.extend((len as u32).to_be_bytes());
                5
            }
        }

        match self {
            Self::Null => {
                // 0xc0 is the nil marker
                buf.extend([0xc0]);
                1
            }
            Self::Bool(flag) => flag.pack(buf),
            Self::Number(num) => num.pack(buf),
            Self::String(text) => text.pack(buf),
            Self::Array(items) => {
                // Header first, then every element in order
                let mut written = pack_header(buf, items.len(), 0x90, 0xdc, 0xdd);
                for item in items {
                    written += item.pack(buf);
                }
                written
            }
            Self::Object(members) => {
                // Header first, then each key immediately followed by its value
                let mut written = pack_header(buf, members.len(), 0x80, 0xde, 0xdf);
                for (key, member) in members {
                    written += key.pack(buf);
                    written += member.pack(buf);
                }
                written
            }
        }
    }
}
143
144/// Unique identifier for messages
145#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
146#[serde(untagged)]
147pub enum MessageId {
148    /// String identifier
149    String(String),
150    /// Numeric identifier
151    Number(i64),
152    /// UUID identifier
153    Uuid(Uuid),
154}
155
156/// Message metadata for tracking and debugging
157#[derive(Debug, Clone, Serialize, Deserialize)]
158pub struct MessageMetadata {
159    /// Message creation timestamp
160    pub created_at: Timestamp,
161
162    /// Protocol version used
163    pub protocol_version: ProtocolVersion,
164
165    /// Content encoding (gzip, brotli, etc.)
166    pub encoding: Option<String>,
167
168    /// Content type of the payload
169    pub content_type: ContentType,
170
171    /// Message size in bytes
172    pub size: usize,
173
174    /// Correlation ID for request tracing
175    pub correlation_id: Option<String>,
176
177    /// Custom headers
178    pub headers: HashMap<String, String>,
179}
180
181/// Optimized message container with zero-copy support
182#[derive(Debug, Clone)]
183pub struct Message {
184    /// Message identifier
185    pub id: MessageId,
186
187    /// Message metadata
188    pub metadata: MessageMetadata,
189
190    /// Message payload with zero-copy optimization
191    pub payload: MessagePayload,
192}
193
194/// Zero-copy message payload
195#[derive(Debug, Clone)]
196pub enum MessagePayload {
197    /// JSON payload with potential zero-copy
198    Json(JsonPayload),
199
200    /// Binary payload (`MessagePack`, Protocol Buffers, etc.)
201    Binary(BinaryPayload),
202
203    /// Text payload
204    Text(String),
205
206    /// Empty payload
207    Empty,
208}
209
210/// JSON payload with zero-copy support
211#[derive(Debug, Clone)]
212pub struct JsonPayload {
213    /// Raw JSON bytes (zero-copy when possible)
214    pub raw: Bytes,
215
216    /// Parsed JSON value (lazily evaluated)
217    pub parsed: Option<Arc<serde_json::Value>>,
218
219    /// Whether the raw bytes are valid JSON
220    pub is_valid: bool,
221}
222
223/// Binary payload for efficient serialization formats
224#[derive(Debug, Clone)]
225pub struct BinaryPayload {
226    /// Raw binary data
227    pub data: Bytes,
228
229    /// Binary format identifier
230    pub format: BinaryFormat,
231}
232
233/// Supported binary serialization formats
234#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
235#[serde(rename_all = "lowercase")]
236pub enum BinaryFormat {
237    /// `MessagePack` format
238    MessagePack,
239
240    /// Protocol Buffers
241    ProtoBuf,
242
243    /// CBOR (Concise Binary Object Representation)
244    Cbor,
245
246    /// Custom binary format
247    Custom,
248}
249
250/// Message serializer with format detection
251#[derive(Debug)]
252pub struct MessageSerializer {
253    /// Default serialization format
254    default_format: SerializationFormat,
255
256    /// Whether to enable compression
257    enable_compression: bool,
258
259    /// Compression threshold in bytes
260    compression_threshold: usize,
261}
262
/// Wire formats a message can be serialized to or deserialized from.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum SerializationFormat {
    /// Plain JSON
    Json,

    /// JSON handled through the SIMD-accelerated code paths
    #[cfg(feature = "simd")]
    SimdJson,

    /// `MessagePack` binary encoding
    MessagePack,

    /// CBOR binary encoding
    Cbor,
}
279
280impl Message {
281    /// Create a new message with JSON payload
282    ///
283    /// # Errors
284    ///
285    /// Returns an error if the value cannot be serialized to JSON.
286    pub fn json(id: MessageId, value: impl Serialize) -> Result<Self> {
287        let json_bytes = Self::serialize_json(&value)?;
288        let payload = MessagePayload::Json(JsonPayload {
289            raw: json_bytes.freeze(),
290            parsed: Some(Arc::new(serde_json::to_value(value)?)),
291            is_valid: true,
292        });
293
294        Ok(Self {
295            id,
296            metadata: MessageMetadata::new(ContentType::Json, payload.size()),
297            payload,
298        })
299    }
300
301    /// Create a new message with binary payload
302    pub fn binary(id: MessageId, data: Bytes, format: BinaryFormat) -> Self {
303        let size = data.len();
304        let payload = MessagePayload::Binary(BinaryPayload { data, format });
305
306        Self {
307            id,
308            metadata: MessageMetadata::new(ContentType::Binary, size),
309            payload,
310        }
311    }
312
313    /// Create a new message with text payload
314    #[must_use]
315    pub fn text(id: MessageId, text: String) -> Self {
316        let size = text.len();
317        let payload = MessagePayload::Text(text);
318
319        Self {
320            id,
321            metadata: MessageMetadata::new(ContentType::Text, size),
322            payload,
323        }
324    }
325
326    /// Create an empty message
327    #[must_use]
328    pub fn empty(id: MessageId) -> Self {
329        Self {
330            id,
331            metadata: MessageMetadata::new(ContentType::Json, 0),
332            payload: MessagePayload::Empty,
333        }
334    }
335
336    /// Get the message size in bytes
337    pub const fn size(&self) -> usize {
338        self.metadata.size
339    }
340
341    /// Check if the message is empty
342    pub const fn is_empty(&self) -> bool {
343        matches!(self.payload, MessagePayload::Empty)
344    }
345
346    /// Serialize message to bytes using the specified format
347    ///
348    /// # Errors
349    ///
350    /// Returns an error if serialization fails for the specified format.
351    pub fn serialize(&self, format: SerializationFormat) -> Result<Bytes> {
352        match format {
353            SerializationFormat::Json => self.serialize_json_format(),
354            #[cfg(feature = "simd")]
355            SerializationFormat::SimdJson => self.serialize_simd_json(),
356            SerializationFormat::MessagePack => self.serialize_messagepack(),
357            SerializationFormat::Cbor => self.serialize_cbor(),
358        }
359    }
360
361    /// Deserialize message from bytes with format auto-detection
362    ///
363    /// # Errors
364    ///
365    /// Returns an error if format detection fails or deserialization fails.
366    pub fn deserialize(bytes: Bytes) -> Result<Self> {
367        // Try to detect format from content
368        let format = Self::detect_format(&bytes);
369        Self::deserialize_with_format(bytes, format)
370    }
371
372    /// Deserialize message from bytes using specified format
373    pub fn deserialize_with_format(bytes: Bytes, format: SerializationFormat) -> Result<Self> {
374        match format {
375            SerializationFormat::Json => Ok(Self::deserialize_json(bytes)),
376            #[cfg(feature = "simd")]
377            SerializationFormat::SimdJson => Ok(Self::deserialize_simd_json(bytes)),
378            SerializationFormat::MessagePack => Ok(Self::deserialize_messagepack(bytes)),
379            SerializationFormat::Cbor => Self::deserialize_cbor(bytes),
380        }
381    }
382
383    /// Parse JSON payload to structured data
384    pub fn parse_json<T>(&self) -> Result<T>
385    where
386        T: for<'de> Deserialize<'de>,
387    {
388        match &self.payload {
389            MessagePayload::Json(json_payload) => json_payload.parsed.as_ref().map_or_else(
390                || {
391                    #[cfg(feature = "simd")]
392                    {
393                        let mut json_bytes = json_payload.raw.to_vec();
394                        simd_json::from_slice(&mut json_bytes).map_err(|e| {
395                            Error::serialization(format!("SIMD JSON parsing failed: {e}"))
396                        })
397                    }
398                    #[cfg(not(feature = "simd"))]
399                    {
400                        serde_json::from_slice(&json_payload.raw).map_err(|e| {
401                            Error::serialization(format!("JSON parsing failed: {}", e))
402                        })
403                    }
404                },
405                |parsed| {
406                    serde_json::from_value((**parsed).clone())
407                        .map_err(|e| Error::serialization(format!("JSON parsing failed: {e}")))
408                },
409            ),
410            _ => Err(Error::validation("Message payload is not JSON")),
411        }
412    }
413
414    // Private helper methods
415
416    fn serialize_json(value: &impl Serialize) -> Result<BytesMut> {
417        #[cfg(feature = "simd")]
418        {
419            sonic_rs::to_vec(value)
420                .map(|v| BytesMut::from(v.as_slice()))
421                .map_err(|e| Error::serialization(format!("SIMD JSON serialization failed: {e}")))
422        }
423        #[cfg(not(feature = "simd"))]
424        {
425            serde_json::to_vec(value)
426                .map(|v| BytesMut::from(v.as_slice()))
427                .map_err(|e| Error::serialization(format!("JSON serialization failed: {}", e)))
428        }
429    }
430
431    fn serialize_json_format(&self) -> Result<Bytes> {
432        match &self.payload {
433            MessagePayload::Json(json_payload) => Ok(json_payload.raw.clone()),
434            MessagePayload::Text(text) => Ok(Bytes::from(text.clone())),
435            MessagePayload::Empty => Ok(Bytes::from_static(b"{}")),
436            MessagePayload::Binary(_) => Err(Error::validation(
437                "Cannot serialize non-JSON payload as JSON",
438            )),
439        }
440    }
441
442    #[cfg(feature = "simd")]
443    fn serialize_simd_json(&self) -> Result<Bytes> {
444        match &self.payload {
445            MessagePayload::Json(json_payload) => {
446                if json_payload.is_valid {
447                    Ok(json_payload.raw.clone())
448                } else {
449                    Err(Error::serialization("Invalid JSON payload"))
450                }
451            }
452            _ => Err(Error::validation(
453                "Cannot serialize non-JSON payload with SIMD JSON",
454            )),
455        }
456    }
457
458    fn serialize_messagepack(&self) -> Result<Bytes> {
459        #[cfg(feature = "messagepack")]
460        {
461            match &self.payload {
462                MessagePayload::Binary(binary) if binary.format == BinaryFormat::MessagePack => {
463                    Ok(binary.data.clone())
464                }
465                MessagePayload::Json(json_payload) => json_payload.parsed.as_ref().map_or_else(
466                    || {
467                        Err(Error::serialization(
468                            "Cannot serialize unparsed JSON to MessagePack",
469                        ))
470                    },
471                    |parsed| {
472                        // Convert serde_json::Value to msgpacker-compatible format
473                        let packable_value = JsonValue::from_serde_json(parsed.as_ref());
474                        let mut buffer = Vec::new();
475                        packable_value.pack(&mut buffer);
476                        Ok(Bytes::from(buffer))
477                    },
478                ),
479                _ => Err(Error::validation("Cannot serialize payload as MessagePack")),
480            }
481        }
482        #[cfg(not(feature = "messagepack"))]
483        {
484            let _ = self; // Silence unused warning
485            Err(Error::validation("MessagePack serialization not available"))
486        }
487    }
488
489    fn serialize_cbor(&self) -> Result<Bytes> {
490        match &self.payload {
491            MessagePayload::Binary(binary) if binary.format == BinaryFormat::Cbor => {
492                Ok(binary.data.clone())
493            }
494            MessagePayload::Json(json_payload) => {
495                if let Some(parsed) = &json_payload.parsed {
496                    {
497                        let mut buffer = Vec::new();
498                        ciborium::into_writer(parsed.as_ref(), &mut buffer)
499                            .map(|_| Bytes::from(buffer))
500                            .map_err(|e| {
501                                Error::serialization(format!("CBOR serialization failed: {e}"))
502                            })
503                    }
504                } else {
505                    // Fallback: attempt to parse then encode
506                    #[cfg(feature = "simd")]
507                    {
508                        let mut json_bytes = json_payload.raw.to_vec();
509                        let value: serde_json::Value = simd_json::from_slice(&mut json_bytes)
510                            .map_err(|e| {
511                                Error::serialization(format!(
512                                    "SIMD JSON parsing failed before CBOR: {e}"
513                                ))
514                            })?;
515                        {
516                            let mut buffer = Vec::new();
517                            ciborium::into_writer(&value, &mut buffer)
518                                .map(|_| Bytes::from(buffer))
519                                .map_err(|e| {
520                                    Error::serialization(format!("CBOR serialization failed: {e}"))
521                                })
522                        }
523                    }
524                    #[cfg(not(feature = "simd"))]
525                    {
526                        let value: serde_json::Value = serde_json::from_slice(&json_payload.raw)
527                            .map_err(|e| {
528                                Error::serialization(format!(
529                                    "JSON parsing failed before CBOR: {}",
530                                    e
531                                ))
532                            })?;
533                        serde_cbor::to_vec(&value).map(Bytes::from).map_err(|e| {
534                            Error::serialization(format!("CBOR serialization failed: {}", e))
535                        })
536                    }
537                }
538            }
539            _ => Err(Error::validation("Cannot serialize payload as CBOR")),
540        }
541    }
542
543    fn deserialize_json(bytes: Bytes) -> Self {
544        // Validate JSON format
545        let is_valid = serde_json::from_slice::<serde_json::Value>(&bytes).is_ok();
546
547        let payload = MessagePayload::Json(JsonPayload {
548            raw: bytes,
549            parsed: None, // Lazy evaluation
550            is_valid,
551        });
552
553        Self {
554            id: MessageId::Uuid(Uuid::new_v4()),
555            metadata: MessageMetadata::new(ContentType::Json, payload.size()),
556            payload,
557        }
558    }
559
560    #[cfg(feature = "simd")]
561    fn deserialize_simd_json(bytes: Bytes) -> Self {
562        let mut json_bytes = bytes.to_vec();
563        let is_valid = simd_json::from_slice::<serde_json::Value>(&mut json_bytes).is_ok();
564
565        let payload = MessagePayload::Json(JsonPayload {
566            raw: bytes,
567            parsed: None,
568            is_valid,
569        });
570
571        Self {
572            id: MessageId::Uuid(Uuid::new_v4()),
573            metadata: MessageMetadata::new(ContentType::Json, payload.size()),
574            payload,
575        }
576    }
577
578    fn deserialize_messagepack(bytes: Bytes) -> Self {
579        let payload = MessagePayload::Binary(BinaryPayload {
580            data: bytes,
581            format: BinaryFormat::MessagePack,
582        });
583
584        Self {
585            id: MessageId::Uuid(Uuid::new_v4()),
586            metadata: MessageMetadata::new(ContentType::Binary, payload.size()),
587            payload,
588        }
589    }
590
591    fn deserialize_cbor(bytes: Bytes) -> Result<Self> {
592        // Accept raw CBOR as binary or attempt to decode into JSON Value
593        if let Ok(value) = ciborium::from_reader::<serde_json::Value, _>(&bytes[..]) {
594            let raw = serde_json::to_vec(&value)
595                .map(Bytes::from)
596                .map_err(|e| Error::serialization(format!("JSON re-encode failed: {e}")))?;
597            let payload = MessagePayload::Json(JsonPayload {
598                raw,
599                parsed: Some(Arc::new(value)),
600                is_valid: true,
601            });
602            return Ok(Self {
603                id: MessageId::Uuid(Uuid::new_v4()),
604                metadata: MessageMetadata::new(ContentType::Json, payload.size()),
605                payload,
606            });
607        }
608
609        // If decoding to JSON fails, keep as CBOR binary
610        let payload = MessagePayload::Binary(BinaryPayload {
611            data: bytes,
612            format: BinaryFormat::Cbor,
613        });
614        Ok(Self {
615            id: MessageId::Uuid(Uuid::new_v4()),
616            metadata: MessageMetadata::new(ContentType::Binary, payload.size()),
617            payload,
618        })
619    }
620
621    fn detect_format(bytes: &[u8]) -> SerializationFormat {
622        if bytes.is_empty() {
623            return SerializationFormat::Json;
624        }
625
626        // Check for JSON (starts with '{' or '[')
627        if matches!(bytes[0], b'{' | b'[') {
628            #[cfg(feature = "simd")]
629            {
630                return SerializationFormat::SimdJson;
631            }
632            #[cfg(not(feature = "simd"))]
633            {
634                return SerializationFormat::Json;
635            }
636        }
637
638        // Check for MessagePack (starts with specific bytes)
639        if bytes.len() >= 2 && (bytes[0] == 0x82 || bytes[0] == 0x83) {
640            return SerializationFormat::MessagePack;
641        }
642
643        // Default to JSON
644        #[cfg(feature = "simd")]
645        {
646            SerializationFormat::SimdJson
647        }
648        #[cfg(not(feature = "simd"))]
649        {
650            SerializationFormat::Json
651        }
652    }
653}
654
655impl MessagePayload {
656    /// Get the size of the payload in bytes
657    pub const fn size(&self) -> usize {
658        match self {
659            Self::Json(json) => json.raw.len(),
660            Self::Binary(binary) => binary.data.len(),
661            Self::Text(text) => text.len(),
662            Self::Empty => 0,
663        }
664    }
665}
666
667impl MessageMetadata {
668    /// Create new message metadata
669    #[must_use]
670    pub fn new(content_type: ContentType, size: usize) -> Self {
671        Self {
672            created_at: Timestamp::now(),
673            protocol_version: ProtocolVersion::default(),
674            encoding: None,
675            content_type,
676            size,
677            correlation_id: None,
678            headers: HashMap::new(),
679        }
680    }
681
682    /// Add a custom header
683    #[must_use]
684    pub fn with_header(mut self, key: String, value: String) -> Self {
685        self.headers.insert(key, value);
686        self
687    }
688
689    /// Set correlation ID for tracing
690    #[must_use]
691    pub fn with_correlation_id(mut self, correlation_id: String) -> Self {
692        self.correlation_id = Some(correlation_id);
693        self
694    }
695
696    /// Set content encoding
697    #[must_use]
698    pub fn with_encoding(mut self, encoding: String) -> Self {
699        self.encoding = Some(encoding);
700        self
701    }
702}
703
704impl MessageSerializer {
705    /// Create a new message serializer with default settings
706    #[must_use]
707    pub const fn new() -> Self {
708        Self {
709            default_format: SerializationFormat::Json,
710            enable_compression: false,
711            compression_threshold: 1024, // 1KB
712        }
713    }
714
715    /// Set the default serialization format
716    #[must_use]
717    pub const fn with_format(mut self, format: SerializationFormat) -> Self {
718        self.default_format = format;
719        self
720    }
721
722    /// Enable compression for messages above threshold
723    #[must_use]
724    pub const fn with_compression(mut self, enable: bool, threshold: usize) -> Self {
725        self.enable_compression = enable;
726        self.compression_threshold = threshold;
727        self
728    }
729
730    /// Serialize a message using the default format
731    pub fn serialize(&self, message: &Message) -> Result<Bytes> {
732        let serialized = message.serialize(self.default_format)?;
733
734        // Apply compression if enabled and message is large enough
735        if self.enable_compression && serialized.len() > self.compression_threshold {
736            Ok(self.compress(serialized))
737        } else {
738            Ok(serialized)
739        }
740    }
741
742    const fn compress(&self, data: Bytes) -> Bytes {
743        // Compression implementation would go here
744        // For now, just return the original data
745        let _ = self; // Will use self when compression is implemented
746        data
747    }
748}
749
750impl Default for MessageSerializer {
751    fn default() -> Self {
752        Self::new()
753    }
754}
755
756impl fmt::Display for MessageId {
757    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
758        match self {
759            Self::String(s) => write!(f, "{s}"),
760            Self::Number(n) => write!(f, "{n}"),
761            Self::Uuid(u) => write!(f, "{u}"),
762        }
763    }
764}
765
766impl From<String> for MessageId {
767    fn from(s: String) -> Self {
768        Self::String(s)
769    }
770}
771
772impl From<&str> for MessageId {
773    fn from(s: &str) -> Self {
774        Self::String(s.to_string())
775    }
776}
777
778impl From<i64> for MessageId {
779    fn from(n: i64) -> Self {
780        Self::Number(n)
781    }
782}
783
784impl From<Uuid> for MessageId {
785    fn from(u: Uuid) -> Self {
786        Self::Uuid(u)
787    }
788}
789
#[cfg(test)]
mod tests {
    use super::*;
    use serde_json::json;

    /// Target type for the typed-parsing test.
    #[derive(Deserialize, PartialEq, Debug)]
    struct TestData {
        number: i32,
    }

    /// A JSON message keeps its id and is not reported as empty.
    #[test]
    fn test_message_creation() {
        let id = MessageId::from("test");
        let message = Message::json(id, json!({"key": "value"})).unwrap();

        assert_eq!(message.id.to_string(), "test");
        assert!(!message.is_empty());
    }

    /// Serializing a JSON message as JSON yields non-empty output.
    #[test]
    fn test_message_serialization() {
        let message = Message::json(MessageId::from(1), json!({"test": true})).unwrap();

        let serialized = message.serialize(SerializationFormat::Json).unwrap();

        assert!(!serialized.is_empty());
    }

    /// A JSON payload can be parsed back into a typed struct.
    #[test]
    fn test_message_parsing() {
        let message = Message::json(MessageId::from("test"), json!({"number": 42})).unwrap();

        let parsed: TestData = message.parse_json().unwrap();

        assert_eq!(parsed.number, 42);
    }

    /// Input starting with `{` is detected as the JSON flavour for the active features.
    #[test]
    fn test_format_detection() {
        #[cfg(feature = "simd")]
        let expected = SerializationFormat::SimdJson;
        #[cfg(not(feature = "simd"))]
        let expected = SerializationFormat::Json;

        let json_bytes = Bytes::from(r#"{"test": true}"#);

        assert_eq!(Message::detect_format(&json_bytes), expected);
    }

    /// Builder methods store the header and the correlation id.
    #[test]
    fn test_message_metadata() {
        let metadata = MessageMetadata::new(ContentType::Json, 100)
            .with_header("custom".to_string(), "value".to_string())
            .with_correlation_id("corr-123".to_string());

        assert_eq!(metadata.size, 100);
        assert_eq!(
            metadata.headers.get("custom").map(String::as_str),
            Some("value")
        );
        assert_eq!(metadata.correlation_id.as_deref(), Some("corr-123"));
    }
}