turbomcp_protocol/
message.rs

1//! Optimized message types and serialization.
2//!
3//! This module provides the standard message handling abstraction for `TurboMCP`.
4//! It supports multiple serialization formats (`JSON`, `MessagePack`, `CBOR`) and
5//! includes SIMD acceleration when available.
6//!
7//! ## Message Types
8//!
9//! This is the **recommended message type** for most use cases. It provides:
10//!
11//! - Multiple serialization formats (`JSON`, `MessagePack`, `CBOR`)
12//! - Automatic format detection
13//! - SIMD-accelerated JSON parsing (when `simd` feature enabled)
14//! - Cached parsed values for efficient reuse
15//! - Ergonomic API for common operations
16//!
17//! For extreme performance scenarios, see [`ZeroCopyMessage`](crate::zero_copy::ZeroCopyMessage).
18//!
19//! ## Example
20//!
21//! ```rust
22//! use turbomcp_protocol::{Message, MessageId};
23//! use serde_json::json;
24//!
25//! // Create a JSON message
26//! let msg = Message::json(
27//!     MessageId::from("req-1"),
28//!     json!({"method": "test", "params": {}})
29//! )?;
30//!
31//! // Parse to specific type
32//! #[derive(serde::Deserialize)]
33//! struct Request {
34//!     method: String,
35//!     params: serde_json::Value,
36//! }
37//!
38//! let request: Request = msg.parse_json()?;
39//! assert_eq!(request.method, "test");
40//! # Ok::<(), Box<dyn std::error::Error>>(())
41//! ```
42
43use flate2::Compression;
44use flate2::write::GzEncoder;
45use std::collections::HashMap;
46use std::fmt;
47use std::sync::Arc;
48
49use bytes::{Bytes, BytesMut};
50use serde::{Deserialize, Serialize};
51use uuid::Uuid;
52
53#[cfg(feature = "messagepack")]
54use msgpacker::Packable;
55
56use crate::error::{Error, Result};
57use crate::types::{ContentType, ProtocolVersion, Timestamp};
58
/// JSON value tree in a shape that this module can hand-encode through `msgpacker`.
///
/// Only compiled when the `messagepack` feature is enabled.
#[cfg(feature = "messagepack")]
#[derive(Debug, Clone)]
pub enum JsonValue {
    /// JSON `null`
    Null,
    /// JSON `true` / `false`
    Bool(bool),
    /// Any JSON number, always held as an `f64`
    Number(f64),
    /// JSON string
    String(String),
    /// JSON array, elements kept in order
    Array(Vec<JsonValue>),
    /// JSON object keyed by member name
    Object(HashMap<String, JsonValue>),
}
76
#[cfg(feature = "messagepack")]
impl JsonValue {
    /// Builds a [`JsonValue`] tree mirroring the given `serde_json::Value`.
    ///
    /// Numbers are tried as `i64`, then `u64`, then `f64`, and are stored as
    /// `f64` in every case (integers are cast with `as f64`). A number that
    /// fits none of the three accessors becomes [`JsonValue::Null`].
    pub fn from_serde_json(value: &serde_json::Value) -> Self {
        use serde_json::Value;

        match value {
            Value::Null => Self::Null,
            Value::Bool(flag) => Self::Bool(*flag),
            Value::Number(num) => num
                .as_i64()
                .map(|signed| signed as f64)
                .or_else(|| num.as_u64().map(|unsigned| unsigned as f64))
                .or_else(|| num.as_f64())
                .map_or(Self::Null, Self::Number),
            Value::String(text) => Self::String(text.clone()),
            Value::Array(items) => Self::Array(items.iter().map(Self::from_serde_json).collect()),
            Value::Object(members) => Self::Object(
                members
                    .iter()
                    .map(|(name, member)| (name.clone(), Self::from_serde_json(member)))
                    .collect(),
            ),
        }
    }
}
109
#[cfg(feature = "messagepack")]
impl msgpacker::Packable for JsonValue {
    /// Appends the encoding of this value to `buf` and returns the number of
    /// bytes written.
    ///
    /// Scalars delegate to the `Packable` impls of `bool`, `f64` and `String`;
    /// `null`, arrays and objects are encoded by hand.
    fn pack<T>(&self, buf: &mut T) -> usize
    where
        T: Extend<u8>,
    {
        match self {
            Self::Null => {
                // nil marker
                buf.extend([0xc0]);
                1
            }
            Self::Bool(flag) => flag.pack(buf),
            Self::Number(num) => num.pack(buf),
            Self::String(text) => text.pack(buf),
            Self::Array(items) => {
                // Array header first, then each element in order.
                let mut written = pack_collection_header(buf, items.len(), 0x90, 0xdc, 0xdd);
                for element in items {
                    written += element.pack(buf);
                }
                written
            }
            Self::Object(members) => {
                // Map header first, then alternating key / value.
                let mut written = pack_collection_header(buf, members.len(), 0x80, 0xde, 0xdf);
                for (name, member) in members {
                    written += name.pack(buf);
                    written += member.pack(buf);
                }
                written
            }
        }
    }
}

/// Writes a collection length header and returns its size in bytes.
///
/// Lengths up to 15 are folded into a single byte (`fix_base + len`); lengths
/// up to `u16::MAX` use `marker16` followed by a big-endian `u16`; anything
/// larger uses `marker32` followed by a big-endian `u32` (the length is cast
/// with `as u32`).
#[cfg(feature = "messagepack")]
fn pack_collection_header<T>(
    buf: &mut T,
    len: usize,
    fix_base: u8,
    marker16: u8,
    marker32: u8,
) -> usize
where
    T: Extend<u8>,
{
    if len <= 15 {
        buf.extend([fix_base + len as u8]);
        1
    } else if len <= u16::MAX as usize {
        buf.extend([marker16]);
        buf.extend((len as u16).to_be_bytes());
        3
    } else {
        buf.extend([marker32]);
        buf.extend((len as u32).to_be_bytes());
        5
    }
}
181
182/// Unique identifier for messages
183#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
184#[serde(untagged)]
185pub enum MessageId {
186    /// String identifier
187    String(String),
188    /// Numeric identifier
189    Number(i64),
190    /// UUID identifier
191    Uuid(Uuid),
192}
193
194/// Message metadata for tracking and debugging
195#[derive(Debug, Clone, Serialize, Deserialize)]
196pub struct MessageMetadata {
197    /// Message creation timestamp
198    pub created_at: Timestamp,
199
200    /// Protocol version used
201    pub protocol_version: ProtocolVersion,
202
203    /// Content encoding (gzip, brotli, etc.)
204    pub encoding: Option<String>,
205
206    /// Content type of the payload
207    pub content_type: ContentType,
208
209    /// Message size in bytes
210    pub size: usize,
211
212    /// Correlation ID for request tracing
213    pub correlation_id: Option<String>,
214
215    /// Custom headers
216    pub headers: HashMap<String, String>,
217}
218
219/// Optimized message container with zero-copy support
220#[derive(Debug, Clone)]
221pub struct Message {
222    /// Message identifier
223    pub id: MessageId,
224
225    /// Message metadata
226    pub metadata: MessageMetadata,
227
228    /// Message payload with zero-copy optimization
229    pub payload: MessagePayload,
230}
231
232/// Zero-copy message payload
233#[derive(Debug, Clone)]
234pub enum MessagePayload {
235    /// JSON payload with potential zero-copy
236    Json(JsonPayload),
237
238    /// Binary payload (`MessagePack`, Protocol Buffers, etc.)
239    Binary(BinaryPayload),
240
241    /// Text payload
242    Text(String),
243
244    /// Empty payload
245    Empty,
246}
247
248/// JSON payload with zero-copy support
249#[derive(Debug, Clone)]
250pub struct JsonPayload {
251    /// Raw JSON bytes (zero-copy when possible)
252    pub raw: Bytes,
253
254    /// Parsed JSON value (lazily evaluated)
255    pub parsed: Option<Arc<serde_json::Value>>,
256
257    /// Whether the raw bytes are valid JSON
258    pub is_valid: bool,
259}
260
261/// Binary payload for efficient serialization formats
262#[derive(Debug, Clone)]
263pub struct BinaryPayload {
264    /// Raw binary data
265    pub data: Bytes,
266
267    /// Binary format identifier
268    pub format: BinaryFormat,
269}
270
271/// Supported binary serialization formats
272#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
273#[serde(rename_all = "lowercase")]
274pub enum BinaryFormat {
275    /// `MessagePack` format
276    MessagePack,
277
278    /// Protocol Buffers
279    ProtoBuf,
280
281    /// CBOR (Concise Binary Object Representation)
282    Cbor,
283
284    /// Custom binary format
285    Custom,
286}
287
288/// Message serializer with format detection
289#[derive(Debug)]
290pub struct MessageSerializer {
291    /// Default serialization format
292    default_format: SerializationFormat,
293
294    /// Whether to enable compression
295    enable_compression: bool,
296
297    /// Compression threshold in bytes
298    compression_threshold: usize,
299}
300
/// Wire formats a [`Message`] can be serialized to or deserialized from.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum SerializationFormat {
    /// Plain JSON
    Json,

    /// JSON handled through the SIMD code path (only with the `simd` feature)
    #[cfg(feature = "simd")]
    SimdJson,

    /// `MessagePack` binary encoding
    MessagePack,

    /// CBOR binary encoding
    Cbor,
}
317
318impl Message {
319    /// Create a new message with JSON payload
320    ///
321    /// # Errors
322    ///
323    /// Returns an error if the value cannot be serialized to JSON.
324    pub fn json(id: MessageId, value: impl Serialize) -> Result<Self> {
325        let json_bytes = Self::serialize_json(&value)?;
326        let payload = MessagePayload::Json(JsonPayload {
327            raw: json_bytes.freeze(),
328            parsed: Some(Arc::new(serde_json::to_value(value)?)),
329            is_valid: true,
330        });
331
332        Ok(Self {
333            id,
334            metadata: MessageMetadata::new(ContentType::Json, payload.size()),
335            payload,
336        })
337    }
338
339    /// Create a new message with binary payload
340    pub fn binary(id: MessageId, data: Bytes, format: BinaryFormat) -> Self {
341        let size = data.len();
342        let payload = MessagePayload::Binary(BinaryPayload { data, format });
343
344        Self {
345            id,
346            metadata: MessageMetadata::new(ContentType::Binary, size),
347            payload,
348        }
349    }
350
351    /// Create a new message with text payload
352    #[must_use]
353    pub fn text(id: MessageId, text: String) -> Self {
354        let size = text.len();
355        let payload = MessagePayload::Text(text);
356
357        Self {
358            id,
359            metadata: MessageMetadata::new(ContentType::Text, size),
360            payload,
361        }
362    }
363
364    /// Create an empty message
365    #[must_use]
366    pub fn empty(id: MessageId) -> Self {
367        Self {
368            id,
369            metadata: MessageMetadata::new(ContentType::Json, 0),
370            payload: MessagePayload::Empty,
371        }
372    }
373
374    /// Get the message size in bytes
375    pub const fn size(&self) -> usize {
376        self.metadata.size
377    }
378
379    /// Check if the message is empty
380    pub const fn is_empty(&self) -> bool {
381        matches!(self.payload, MessagePayload::Empty)
382    }
383
384    /// Serialize message to bytes using the specified format
385    ///
386    /// # Errors
387    ///
388    /// Returns an error if serialization fails for the specified format.
389    pub fn serialize(&self, format: SerializationFormat) -> Result<Bytes> {
390        match format {
391            SerializationFormat::Json => self.serialize_json_format(),
392            #[cfg(feature = "simd")]
393            SerializationFormat::SimdJson => self.serialize_simd_json(),
394            SerializationFormat::MessagePack => self.serialize_messagepack(),
395            SerializationFormat::Cbor => self.serialize_cbor(),
396        }
397    }
398
399    /// Deserialize message from bytes with format auto-detection
400    ///
401    /// # Errors
402    ///
403    /// Returns an error if format detection fails or deserialization fails.
404    pub fn deserialize(bytes: Bytes) -> Result<Self> {
405        // Try to detect format from content
406        let format = Self::detect_format(&bytes);
407        Self::deserialize_with_format(bytes, format)
408    }
409
410    /// Deserialize message from bytes using specified format
411    pub fn deserialize_with_format(bytes: Bytes, format: SerializationFormat) -> Result<Self> {
412        match format {
413            SerializationFormat::Json => Ok(Self::deserialize_json(bytes)),
414            #[cfg(feature = "simd")]
415            SerializationFormat::SimdJson => Ok(Self::deserialize_simd_json(bytes)),
416            SerializationFormat::MessagePack => Ok(Self::deserialize_messagepack(bytes)),
417            SerializationFormat::Cbor => Self::deserialize_cbor(bytes),
418        }
419    }
420
421    /// Parse JSON payload to structured data
422    pub fn parse_json<T>(&self) -> Result<T>
423    where
424        T: for<'de> Deserialize<'de>,
425    {
426        match &self.payload {
427            MessagePayload::Json(json_payload) => json_payload.parsed.as_ref().map_or_else(
428                || {
429                    #[cfg(feature = "simd")]
430                    {
431                        let mut json_bytes = json_payload.raw.to_vec();
432                        simd_json::from_slice(&mut json_bytes).map_err(|e| {
433                            Error::serialization(format!("SIMD JSON parsing failed: {e}"))
434                        })
435                    }
436                    #[cfg(not(feature = "simd"))]
437                    {
438                        serde_json::from_slice(&json_payload.raw).map_err(|e| {
439                            Error::serialization(format!("JSON parsing failed: {}", e))
440                        })
441                    }
442                },
443                |parsed| {
444                    serde_json::from_value((**parsed).clone())
445                        .map_err(|e| Error::serialization(format!("JSON parsing failed: {e}")))
446                },
447            ),
448            _ => Err(Error::validation("Message payload is not JSON")),
449        }
450    }
451
452    // Private helper methods
453
454    fn serialize_json(value: &impl Serialize) -> Result<BytesMut> {
455        #[cfg(feature = "simd")]
456        {
457            sonic_rs::to_vec(value)
458                .map(|v| BytesMut::from(v.as_slice()))
459                .map_err(|e| Error::serialization(format!("SIMD JSON serialization failed: {e}")))
460        }
461        #[cfg(not(feature = "simd"))]
462        {
463            serde_json::to_vec(value)
464                .map(|v| BytesMut::from(v.as_slice()))
465                .map_err(|e| Error::serialization(format!("JSON serialization failed: {}", e)))
466        }
467    }
468
469    fn serialize_json_format(&self) -> Result<Bytes> {
470        match &self.payload {
471            MessagePayload::Json(json_payload) => Ok(json_payload.raw.clone()),
472            MessagePayload::Text(text) => Ok(Bytes::from(text.clone())),
473            MessagePayload::Empty => Ok(Bytes::from_static(b"{}")),
474            MessagePayload::Binary(_) => Err(Error::validation(
475                "Cannot serialize non-JSON payload as JSON",
476            )),
477        }
478    }
479
480    #[cfg(feature = "simd")]
481    fn serialize_simd_json(&self) -> Result<Bytes> {
482        match &self.payload {
483            MessagePayload::Json(json_payload) => {
484                if json_payload.is_valid {
485                    Ok(json_payload.raw.clone())
486                } else {
487                    Err(Error::serialization("Invalid JSON payload"))
488                }
489            }
490            _ => Err(Error::validation(
491                "Cannot serialize non-JSON payload with SIMD JSON",
492            )),
493        }
494    }
495
496    fn serialize_messagepack(&self) -> Result<Bytes> {
497        #[cfg(feature = "messagepack")]
498        {
499            match &self.payload {
500                MessagePayload::Binary(binary) if binary.format == BinaryFormat::MessagePack => {
501                    Ok(binary.data.clone())
502                }
503                MessagePayload::Json(json_payload) => json_payload.parsed.as_ref().map_or_else(
504                    || {
505                        Err(Error::serialization(
506                            "Cannot serialize unparsed JSON to MessagePack",
507                        ))
508                    },
509                    |parsed| {
510                        // Convert serde_json::Value to msgpacker-compatible format
511                        let packable_value = JsonValue::from_serde_json(parsed.as_ref());
512                        let mut buffer = Vec::new();
513                        packable_value.pack(&mut buffer);
514                        Ok(Bytes::from(buffer))
515                    },
516                ),
517                _ => Err(Error::validation("Cannot serialize payload as MessagePack")),
518            }
519        }
520        #[cfg(not(feature = "messagepack"))]
521        {
522            let _ = self; // Silence unused warning
523            Err(Error::validation("MessagePack serialization not available"))
524        }
525    }
526
527    fn serialize_cbor(&self) -> Result<Bytes> {
528        match &self.payload {
529            MessagePayload::Binary(binary) if binary.format == BinaryFormat::Cbor => {
530                Ok(binary.data.clone())
531            }
532            MessagePayload::Json(json_payload) => {
533                if let Some(parsed) = &json_payload.parsed {
534                    {
535                        let mut buffer = Vec::new();
536                        ciborium::into_writer(parsed.as_ref(), &mut buffer)
537                            .map(|_| Bytes::from(buffer))
538                            .map_err(|e| {
539                                Error::serialization(format!("CBOR serialization failed: {e}"))
540                            })
541                    }
542                } else {
543                    // Fallback: attempt to parse then encode
544                    #[cfg(feature = "simd")]
545                    {
546                        let mut json_bytes = json_payload.raw.to_vec();
547                        let value: serde_json::Value = simd_json::from_slice(&mut json_bytes)
548                            .map_err(|e| {
549                                Error::serialization(format!(
550                                    "SIMD JSON parsing failed before CBOR: {e}"
551                                ))
552                            })?;
553                        {
554                            let mut buffer = Vec::new();
555                            ciborium::into_writer(&value, &mut buffer)
556                                .map(|_| Bytes::from(buffer))
557                                .map_err(|e| {
558                                    Error::serialization(format!("CBOR serialization failed: {e}"))
559                                })
560                        }
561                    }
562                    #[cfg(not(feature = "simd"))]
563                    {
564                        let value: serde_json::Value = serde_json::from_slice(&json_payload.raw)
565                            .map_err(|e| {
566                                Error::serialization(format!(
567                                    "JSON parsing failed before CBOR: {}",
568                                    e
569                                ))
570                            })?;
571                        let mut buf = Vec::new();
572                        ciborium::ser::into_writer(&value, &mut buf).map_err(|e| {
573                            Error::serialization(format!("CBOR serialization failed: {}", e))
574                        })?;
575                        Ok(Bytes::from(buf))
576                    }
577                }
578            }
579            _ => Err(Error::validation("Cannot serialize payload as CBOR")),
580        }
581    }
582
583    fn deserialize_json(bytes: Bytes) -> Self {
584        // Validate JSON format
585        let is_valid = serde_json::from_slice::<serde_json::Value>(&bytes).is_ok();
586
587        let payload = MessagePayload::Json(JsonPayload {
588            raw: bytes,
589            parsed: None, // Lazy evaluation
590            is_valid,
591        });
592
593        Self {
594            id: MessageId::Uuid(Uuid::new_v4()),
595            metadata: MessageMetadata::new(ContentType::Json, payload.size()),
596            payload,
597        }
598    }
599
600    #[cfg(feature = "simd")]
601    fn deserialize_simd_json(bytes: Bytes) -> Self {
602        let mut json_bytes = bytes.to_vec();
603        let is_valid = simd_json::from_slice::<serde_json::Value>(&mut json_bytes).is_ok();
604
605        let payload = MessagePayload::Json(JsonPayload {
606            raw: bytes,
607            parsed: None,
608            is_valid,
609        });
610
611        Self {
612            id: MessageId::Uuid(Uuid::new_v4()),
613            metadata: MessageMetadata::new(ContentType::Json, payload.size()),
614            payload,
615        }
616    }
617
618    fn deserialize_messagepack(bytes: Bytes) -> Self {
619        let payload = MessagePayload::Binary(BinaryPayload {
620            data: bytes,
621            format: BinaryFormat::MessagePack,
622        });
623
624        Self {
625            id: MessageId::Uuid(Uuid::new_v4()),
626            metadata: MessageMetadata::new(ContentType::Binary, payload.size()),
627            payload,
628        }
629    }
630
631    fn deserialize_cbor(bytes: Bytes) -> Result<Self> {
632        // Accept raw CBOR as binary or attempt to decode into JSON Value
633        if let Ok(value) = ciborium::from_reader::<serde_json::Value, _>(&bytes[..]) {
634            let raw = serde_json::to_vec(&value)
635                .map(Bytes::from)
636                .map_err(|e| Error::serialization(format!("JSON re-encode failed: {e}")))?;
637            let payload = MessagePayload::Json(JsonPayload {
638                raw,
639                parsed: Some(Arc::new(value)),
640                is_valid: true,
641            });
642            return Ok(Self {
643                id: MessageId::Uuid(Uuid::new_v4()),
644                metadata: MessageMetadata::new(ContentType::Json, payload.size()),
645                payload,
646            });
647        }
648
649        // If decoding to JSON fails, keep as CBOR binary
650        let payload = MessagePayload::Binary(BinaryPayload {
651            data: bytes,
652            format: BinaryFormat::Cbor,
653        });
654        Ok(Self {
655            id: MessageId::Uuid(Uuid::new_v4()),
656            metadata: MessageMetadata::new(ContentType::Binary, payload.size()),
657            payload,
658        })
659    }
660
661    fn detect_format(bytes: &[u8]) -> SerializationFormat {
662        if bytes.is_empty() {
663            return SerializationFormat::Json;
664        }
665
666        // Check for JSON (starts with '{' or '[')
667        if matches!(bytes[0], b'{' | b'[') {
668            #[cfg(feature = "simd")]
669            {
670                return SerializationFormat::SimdJson;
671            }
672            #[cfg(not(feature = "simd"))]
673            {
674                return SerializationFormat::Json;
675            }
676        }
677
678        // Check for MessagePack (starts with specific bytes)
679        if bytes.len() >= 2 && (bytes[0] == 0x82 || bytes[0] == 0x83) {
680            return SerializationFormat::MessagePack;
681        }
682
683        // Default to JSON
684        #[cfg(feature = "simd")]
685        {
686            SerializationFormat::SimdJson
687        }
688        #[cfg(not(feature = "simd"))]
689        {
690            SerializationFormat::Json
691        }
692    }
693}
694
695impl MessagePayload {
696    /// Get the size of the payload in bytes
697    pub const fn size(&self) -> usize {
698        match self {
699            Self::Json(json) => json.raw.len(),
700            Self::Binary(binary) => binary.data.len(),
701            Self::Text(text) => text.len(),
702            Self::Empty => 0,
703        }
704    }
705}
706
707impl MessageMetadata {
708    /// Create new message metadata
709    #[must_use]
710    pub fn new(content_type: ContentType, size: usize) -> Self {
711        Self {
712            created_at: Timestamp::now(),
713            protocol_version: crate::PROTOCOL_VERSION.to_string(),
714            encoding: None,
715            content_type,
716            size,
717            correlation_id: None,
718            headers: HashMap::new(),
719        }
720    }
721
722    /// Add a custom header
723    #[must_use]
724    pub fn with_header(mut self, key: String, value: String) -> Self {
725        self.headers.insert(key, value);
726        self
727    }
728
729    /// Set correlation ID for tracing
730    #[must_use]
731    pub fn with_correlation_id(mut self, correlation_id: String) -> Self {
732        self.correlation_id = Some(correlation_id);
733        self
734    }
735
736    /// Set content encoding
737    #[must_use]
738    pub fn with_encoding(mut self, encoding: String) -> Self {
739        self.encoding = Some(encoding);
740        self
741    }
742}
743
744impl MessageSerializer {
745    /// Create a new message serializer with default settings
746    #[must_use]
747    pub const fn new() -> Self {
748        Self {
749            default_format: SerializationFormat::Json,
750            enable_compression: false,
751            compression_threshold: 1024, // 1KB
752        }
753    }
754
755    /// Set the default serialization format
756    #[must_use]
757    pub const fn with_format(mut self, format: SerializationFormat) -> Self {
758        self.default_format = format;
759        self
760    }
761
762    /// Enable compression for messages above threshold
763    #[must_use]
764    pub const fn with_compression(mut self, enable: bool, threshold: usize) -> Self {
765        self.enable_compression = enable;
766        self.compression_threshold = threshold;
767        self
768    }
769
770    /// Serialize a message using the default format
771    pub fn serialize(&self, message: &mut Message) -> Result<Bytes> {
772        let serialized = message.serialize(self.default_format)?;
773
774        // Apply compression if enabled and message is large enough
775        if self.enable_compression && serialized.len() > self.compression_threshold {
776            message.metadata.encoding = Some("gzip".to_string()); // Set encoding to gzip
777            Ok(self.compress(serialized))
778        } else {
779            Ok(serialized)
780        }
781    }
782
783    /// Compresses the given data using gzip.
784    /// Returns the compressed data, or the original data if compression fails.
785    fn compress(&self, data: Bytes) -> Bytes {
786        let mut encoder = GzEncoder::new(Vec::new(), Compression::default());
787        if let Err(e) = std::io::Write::write_all(&mut encoder, &data) {
788            eprintln!("Failed to compress data: {}", e);
789            return data; // Return original data on error
790        }
791        match encoder.finish() {
792            Ok(compressed_data) => Bytes::from(compressed_data),
793            Err(e) => {
794                eprintln!("Failed to finish compression: {}", e);
795                data // Return original data on error
796            }
797        }
798    }
799}
800
801impl Default for MessageSerializer {
802    fn default() -> Self {
803        Self::new()
804    }
805}
806
807impl fmt::Display for MessageId {
808    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
809        match self {
810            Self::String(s) => write!(f, "{s}"),
811            Self::Number(n) => write!(f, "{n}"),
812            Self::Uuid(u) => write!(f, "{u}"),
813        }
814    }
815}
816
817impl From<String> for MessageId {
818    fn from(s: String) -> Self {
819        Self::String(s)
820    }
821}
822
823impl From<&str> for MessageId {
824    fn from(s: &str) -> Self {
825        Self::String(s.to_string())
826    }
827}
828
829impl From<i64> for MessageId {
830    fn from(n: i64) -> Self {
831        Self::Number(n)
832    }
833}
834
835impl From<Uuid> for MessageId {
836    fn from(u: Uuid) -> Self {
837        Self::Uuid(u)
838    }
839}
840
#[cfg(test)]
mod tests {
    use super::*;
    use serde_json::json;

    /// Minimal typed target used to exercise `Message::parse_json`.
    #[derive(Deserialize, PartialEq, Debug)]
    struct TestData {
        number: i32,
    }

    /// A JSON message keeps its id and is not reported as empty.
    #[test]
    fn test_message_creation() {
        let id = MessageId::from("test");
        let body = json!({"key": "value"});

        let message = Message::json(id, body).unwrap();

        assert_eq!(message.id.to_string(), "test");
        assert!(!message.is_empty());
    }

    /// Serializing a JSON message as JSON yields some bytes.
    #[test]
    fn test_message_serialization() {
        let message = Message::json(MessageId::from(1), json!({"test": true})).unwrap();

        let bytes = message.serialize(SerializationFormat::Json).unwrap();

        assert!(!bytes.is_empty());
    }

    /// The JSON payload can be decoded into a typed struct.
    #[test]
    fn test_message_parsing() {
        let message = Message::json(MessageId::from("test"), json!({"number": 42})).unwrap();

        let decoded: TestData = message.parse_json().unwrap();

        assert_eq!(decoded.number, 42);
    }

    /// Input starting with `{` is detected as JSON (SIMD flavour when enabled).
    #[test]
    fn test_format_detection() {
        let input = Bytes::from(r#"{"test": true}"#);

        #[cfg(feature = "simd")]
        let expected = SerializationFormat::SimdJson;
        #[cfg(not(feature = "simd"))]
        let expected = SerializationFormat::Json;

        assert_eq!(Message::detect_format(&input), expected);
    }

    /// Builder methods record the header and correlation ID and keep the size.
    #[test]
    fn test_message_metadata() {
        let base = MessageMetadata::new(ContentType::Json, 100);
        let metadata = base
            .with_header("custom".to_string(), "value".to_string())
            .with_correlation_id("corr-123".to_string());

        assert_eq!(metadata.size, 100);
        assert_eq!(metadata.headers.get("custom"), Some(&"value".to_string()));
        assert_eq!(metadata.correlation_id, Some("corr-123".to_string()));
    }

    /// With a low threshold the serializer gzips the output, tags the message
    /// as gzip-encoded, and the bytes decompress back to the original JSON.
    #[test]
    fn test_message_serializer_compression() {
        use flate2::read::GzDecoder;
        use std::io::Read;

        // Threshold of 10 bytes so the payload below always qualifies.
        let serializer = MessageSerializer::new().with_compression(true, 10);

        let expected_json = json!({
            "data": "a".repeat(100),
        });
        let mut message =
            Message::json(MessageId::from("compressed_test"), expected_json.clone()).unwrap();

        let uncompressed_len = message.size();
        assert!(
            uncompressed_len > 10,
            "Original message size should be greater than compression threshold"
        );

        let wire_bytes = serializer.serialize(&mut message).unwrap();

        // The metadata must advertise the encoding that was applied.
        assert_eq!(message.metadata.encoding, Some("gzip".to_string()));

        // A run of identical characters compresses well below its original size.
        assert!(
            wire_bytes.len() < uncompressed_len,
            "Compressed size should be smaller than original"
        );

        // Round-trip: gunzip, deserialize, and compare with the input value.
        let mut inflated = Vec::new();
        GzDecoder::new(&wire_bytes[..])
            .read_to_end(&mut inflated)
            .unwrap();

        let restored = Message::deserialize(Bytes::from(inflated)).unwrap();
        let restored_json: serde_json::Value = restored.parse_json().unwrap();

        assert_eq!(restored_json, expected_json);
    }
}
935}