turbomcp_core/
message.rs

1//! Optimized message types and serialization.
2//!
3//! This module provides zero-copy message handling with optimized serialization
4//! for maximum performance. It supports multiple serialization formats and
5//! includes SIMD acceleration when available.
6
7use std::collections::HashMap;
8use std::fmt;
9use std::sync::Arc;
10
11use bytes::{Bytes, BytesMut};
12use serde::{Deserialize, Serialize};
13use uuid::Uuid;
14
15use crate::error::{Error, Result};
16use crate::types::{ContentType, ProtocolVersion, Timestamp};
17
/// Unique identifier for messages
///
/// The enum is `#[serde(untagged)]`, so it serializes as the bare inner
/// value (a string, a number, or a UUID) with no variant tag.
///
/// NOTE(review): serde tries untagged variants in declaration order, so a
/// UUID written as a string will presumably deserialize as `String` rather
/// than `Uuid` — confirm whether that is intended.
#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
#[serde(untagged)]
pub enum MessageId {
    /// String identifier
    String(String),
    /// Numeric identifier
    Number(i64),
    /// UUID identifier
    Uuid(Uuid),
}
29
/// Message metadata for tracking and debugging
///
/// Created through [`MessageMetadata::new`], which stamps the creation time
/// and the default protocol version and leaves the optional fields empty.
/// The `with_*` builder methods fill in the optional fields.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MessageMetadata {
    /// Message creation timestamp
    pub created_at: Timestamp,

    /// Protocol version used
    pub protocol_version: ProtocolVersion,

    /// Content encoding (gzip, brotli, etc.); `None` when not set
    pub encoding: Option<String>,

    /// Content type of the payload
    pub content_type: ContentType,

    /// Message size in bytes, as supplied to the constructor
    pub size: usize,

    /// Correlation ID for request tracing; `None` when not set
    pub correlation_id: Option<String>,

    /// Custom headers
    pub headers: HashMap<String, String>,
}
54
/// Optimized message container with zero-copy support
///
/// The constructors in this module set `metadata.size` from the payload
/// length at construction time.
#[derive(Debug, Clone)]
pub struct Message {
    /// Message identifier
    pub id: MessageId,

    /// Message metadata
    pub metadata: MessageMetadata,

    /// Message payload with zero-copy optimization
    pub payload: MessagePayload,
}
67
/// Zero-copy message payload
///
/// Use [`MessagePayload::size`] to get the byte length of any variant.
#[derive(Debug, Clone)]
pub enum MessagePayload {
    /// JSON payload with potential zero-copy
    Json(JsonPayload),

    /// Binary payload (`MessagePack`, Protocol Buffers, etc.)
    Binary(BinaryPayload),

    /// Text payload
    Text(String),

    /// Empty payload (size 0)
    Empty,
}
83
/// JSON payload with zero-copy support
#[derive(Debug, Clone)]
pub struct JsonPayload {
    /// Raw JSON bytes (zero-copy when possible)
    pub raw: Bytes,

    /// Parsed JSON value (lazily evaluated)
    ///
    /// NOTE(review): nothing in this module fills this in after construction;
    /// when it is `None`, `Message::parse_json` re-parses `raw` on every call.
    pub parsed: Option<Arc<serde_json::Value>>,

    /// Whether the raw bytes are valid JSON
    ///
    /// The JSON deserializers record a failed validation here instead of
    /// returning an error.
    pub is_valid: bool,
}
96
/// Binary payload for efficient serialization formats
///
/// The bytes are stored opaquely; `format` only labels how they are encoded.
#[derive(Debug, Clone)]
pub struct BinaryPayload {
    /// Raw binary data
    pub data: Bytes,

    /// Binary format identifier
    pub format: BinaryFormat,
}
106
/// Supported binary serialization formats
///
/// Serialized by serde with lowercase variant names
/// (`#[serde(rename_all = "lowercase")]`).
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
#[serde(rename_all = "lowercase")]
pub enum BinaryFormat {
    /// `MessagePack` format
    MessagePack,

    /// Protocol Buffers
    ProtoBuf,

    /// CBOR (Concise Binary Object Representation)
    Cbor,

    /// Custom binary format
    Custom,
}
123
/// Message serializer with format detection
///
/// Holds the default output format and the compression settings that
/// `MessageSerializer::serialize` consults.
#[derive(Debug)]
pub struct MessageSerializer {
    /// Default serialization format
    default_format: SerializationFormat,

    /// Whether to enable compression
    enable_compression: bool,

    /// Compression threshold in bytes; only output strictly larger than this
    /// is passed to the compression step
    compression_threshold: usize,
}
136
/// Supported serialization formats
///
/// The `SimdJson` variant only exists when the `simd` feature is enabled.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum SerializationFormat {
    /// Standard JSON
    Json,

    /// Fast JSON with SIMD
    #[cfg(feature = "simd")]
    SimdJson,

    /// `MessagePack` binary format
    MessagePack,

    /// CBOR binary format
    Cbor,
}
153
154impl Message {
155    /// Create a new message with JSON payload
156    ///
157    /// # Errors
158    ///
159    /// Returns an error if the value cannot be serialized to JSON.
160    pub fn json(id: MessageId, value: impl Serialize) -> Result<Self> {
161        let json_bytes = Self::serialize_json(&value)?;
162        let payload = MessagePayload::Json(JsonPayload {
163            raw: json_bytes.freeze(),
164            parsed: Some(Arc::new(serde_json::to_value(value)?)),
165            is_valid: true,
166        });
167
168        Ok(Self {
169            id,
170            metadata: MessageMetadata::new(ContentType::Json, payload.size()),
171            payload,
172        })
173    }
174
175    /// Create a new message with binary payload
176    pub fn binary(id: MessageId, data: Bytes, format: BinaryFormat) -> Self {
177        let size = data.len();
178        let payload = MessagePayload::Binary(BinaryPayload { data, format });
179
180        Self {
181            id,
182            metadata: MessageMetadata::new(ContentType::Binary, size),
183            payload,
184        }
185    }
186
187    /// Create a new message with text payload
188    #[must_use]
189    pub fn text(id: MessageId, text: String) -> Self {
190        let size = text.len();
191        let payload = MessagePayload::Text(text);
192
193        Self {
194            id,
195            metadata: MessageMetadata::new(ContentType::Text, size),
196            payload,
197        }
198    }
199
    /// Create an empty message
    ///
    /// The payload is `MessagePayload::Empty`, and the metadata records a
    /// size of 0 with `ContentType::Json`.
    #[must_use]
    pub fn empty(id: MessageId) -> Self {
        Self {
            id,
            metadata: MessageMetadata::new(ContentType::Json, 0),
            payload: MessagePayload::Empty,
        }
    }
209
    /// Get the message size in bytes
    ///
    /// This is the value stored in `metadata.size`; it is not recomputed
    /// from the payload.
    pub const fn size(&self) -> usize {
        self.metadata.size
    }
214
    /// Check if the message is empty
    ///
    /// Returns `true` only for the `MessagePayload::Empty` variant; a JSON,
    /// binary, or text payload of zero length is not considered empty.
    pub const fn is_empty(&self) -> bool {
        matches!(self.payload, MessagePayload::Empty)
    }
219
    /// Serialize message to bytes using the specified format
    ///
    /// Dispatches to the private serializer for `format`; each serializer
    /// decides which payload variants it accepts.
    ///
    /// # Errors
    ///
    /// Returns an error if serialization fails for the specified format.
    pub fn serialize(&self, format: SerializationFormat) -> Result<Bytes> {
        match format {
            SerializationFormat::Json => self.serialize_json_format(),
            #[cfg(feature = "simd")]
            SerializationFormat::SimdJson => self.serialize_simd_json(),
            SerializationFormat::MessagePack => self.serialize_messagepack(),
            SerializationFormat::Cbor => self.serialize_cbor(),
        }
    }
234
    /// Deserialize message from bytes with format auto-detection
    ///
    /// The format is guessed from the leading bytes by `detect_format`, which
    /// always yields a format, and the bytes are then handed to
    /// [`Message::deserialize_with_format`].
    ///
    /// # Errors
    ///
    /// Returns an error if deserialization in the detected format fails.
    pub fn deserialize(bytes: Bytes) -> Result<Self> {
        // Try to detect format from content
        let format = Self::detect_format(&bytes);
        Self::deserialize_with_format(bytes, format)
    }
245
    /// Deserialize message from bytes using specified format
    ///
    /// The JSON, SIMD JSON, and `MessagePack` paths always produce a message
    /// (their results are wrapped in `Ok` here); only the CBOR path returns
    /// a `Result` of its own.
    ///
    /// # Errors
    ///
    /// Returns an error if the CBOR path fails to build the message.
    pub fn deserialize_with_format(bytes: Bytes, format: SerializationFormat) -> Result<Self> {
        match format {
            SerializationFormat::Json => Ok(Self::deserialize_json(bytes)),
            #[cfg(feature = "simd")]
            SerializationFormat::SimdJson => Ok(Self::deserialize_simd_json(bytes)),
            SerializationFormat::MessagePack => Ok(Self::deserialize_messagepack(bytes)),
            SerializationFormat::Cbor => Self::deserialize_cbor(bytes),
        }
    }
256
257    /// Parse JSON payload to structured data
258    pub fn parse_json<T>(&self) -> Result<T>
259    where
260        T: for<'de> Deserialize<'de>,
261    {
262        match &self.payload {
263            MessagePayload::Json(json_payload) => json_payload.parsed.as_ref().map_or_else(
264                || {
265                    #[cfg(feature = "simd")]
266                    {
267                        let mut json_bytes = json_payload.raw.to_vec();
268                        simd_json::from_slice(&mut json_bytes).map_err(|e| {
269                            Error::serialization(format!("SIMD JSON parsing failed: {e}"))
270                        })
271                    }
272                    #[cfg(not(feature = "simd"))]
273                    {
274                        serde_json::from_slice(&json_payload.raw).map_err(|e| {
275                            Error::serialization(format!("JSON parsing failed: {}", e))
276                        })
277                    }
278                },
279                |parsed| {
280                    serde_json::from_value((**parsed).clone())
281                        .map_err(|e| Error::serialization(format!("JSON parsing failed: {e}")))
282                },
283            ),
284            _ => Err(Error::validation("Message payload is not JSON")),
285        }
286    }
287
288    // Private helper methods
289
290    fn serialize_json(value: &impl Serialize) -> Result<BytesMut> {
291        #[cfg(feature = "simd")]
292        {
293            sonic_rs::to_vec(value)
294                .map(|v| BytesMut::from(v.as_slice()))
295                .map_err(|e| Error::serialization(format!("SIMD JSON serialization failed: {e}")))
296        }
297        #[cfg(not(feature = "simd"))]
298        {
299            serde_json::to_vec(value)
300                .map(|v| BytesMut::from(v.as_slice()))
301                .map_err(|e| Error::serialization(format!("JSON serialization failed: {}", e)))
302        }
303    }
304
305    fn serialize_json_format(&self) -> Result<Bytes> {
306        match &self.payload {
307            MessagePayload::Json(json_payload) => Ok(json_payload.raw.clone()),
308            MessagePayload::Text(text) => Ok(Bytes::from(text.clone())),
309            MessagePayload::Empty => Ok(Bytes::from_static(b"{}")),
310            MessagePayload::Binary(_) => Err(Error::validation(
311                "Cannot serialize non-JSON payload as JSON",
312            )),
313        }
314    }
315
316    #[cfg(feature = "simd")]
317    fn serialize_simd_json(&self) -> Result<Bytes> {
318        match &self.payload {
319            MessagePayload::Json(json_payload) => {
320                if json_payload.is_valid {
321                    Ok(json_payload.raw.clone())
322                } else {
323                    Err(Error::serialization("Invalid JSON payload"))
324                }
325            }
326            _ => Err(Error::validation(
327                "Cannot serialize non-JSON payload with SIMD JSON",
328            )),
329        }
330    }
331
332    fn serialize_messagepack(&self) -> Result<Bytes> {
333        #[cfg(feature = "messagepack")]
334        {
335            match &self.payload {
336                MessagePayload::Binary(binary) if binary.format == BinaryFormat::MessagePack => {
337                    Ok(binary.data.clone())
338                }
339                MessagePayload::Json(json_payload) => json_payload.parsed.as_ref().map_or_else(
340                    || {
341                        Err(Error::serialization(
342                            "Cannot serialize unparsed JSON to MessagePack",
343                        ))
344                    },
345                    |parsed| {
346                        rmp_serde::to_vec(parsed.as_ref())
347                            .map(Bytes::from)
348                            .map_err(|e| {
349                                Error::serialization(format!(
350                                    "MessagePack serialization failed: {e}"
351                                ))
352                            })
353                    },
354                ),
355                _ => Err(Error::validation("Cannot serialize payload as MessagePack")),
356            }
357        }
358        #[cfg(not(feature = "messagepack"))]
359        {
360            let _ = self; // Silence unused warning
361            Err(Error::validation("MessagePack serialization not available"))
362        }
363    }
364
365    fn serialize_cbor(&self) -> Result<Bytes> {
366        match &self.payload {
367            MessagePayload::Binary(binary) if binary.format == BinaryFormat::Cbor => {
368                Ok(binary.data.clone())
369            }
370            MessagePayload::Json(json_payload) => {
371                if let Some(parsed) = &json_payload.parsed {
372                    {
373                        let mut buffer = Vec::new();
374                        ciborium::into_writer(parsed.as_ref(), &mut buffer)
375                            .map(|_| Bytes::from(buffer))
376                            .map_err(|e| {
377                                Error::serialization(format!("CBOR serialization failed: {e}"))
378                            })
379                    }
380                } else {
381                    // Fallback: attempt to parse then encode
382                    #[cfg(feature = "simd")]
383                    {
384                        let mut json_bytes = json_payload.raw.to_vec();
385                        let value: serde_json::Value = simd_json::from_slice(&mut json_bytes)
386                            .map_err(|e| {
387                                Error::serialization(format!(
388                                    "SIMD JSON parsing failed before CBOR: {e}"
389                                ))
390                            })?;
391                        {
392                            let mut buffer = Vec::new();
393                            ciborium::into_writer(&value, &mut buffer)
394                                .map(|_| Bytes::from(buffer))
395                                .map_err(|e| {
396                                    Error::serialization(format!("CBOR serialization failed: {e}"))
397                                })
398                        }
399                    }
400                    #[cfg(not(feature = "simd"))]
401                    {
402                        let value: serde_json::Value = serde_json::from_slice(&json_payload.raw)
403                            .map_err(|e| {
404                                Error::serialization(format!(
405                                    "JSON parsing failed before CBOR: {}",
406                                    e
407                                ))
408                            })?;
409                        serde_cbor::to_vec(&value).map(Bytes::from).map_err(|e| {
410                            Error::serialization(format!("CBOR serialization failed: {}", e))
411                        })
412                    }
413                }
414            }
415            _ => Err(Error::validation("Cannot serialize payload as CBOR")),
416        }
417    }
418
419    fn deserialize_json(bytes: Bytes) -> Self {
420        // Validate JSON format
421        let is_valid = serde_json::from_slice::<serde_json::Value>(&bytes).is_ok();
422
423        let payload = MessagePayload::Json(JsonPayload {
424            raw: bytes,
425            parsed: None, // Lazy evaluation
426            is_valid,
427        });
428
429        Self {
430            id: MessageId::Uuid(Uuid::new_v4()),
431            metadata: MessageMetadata::new(ContentType::Json, payload.size()),
432            payload,
433        }
434    }
435
436    #[cfg(feature = "simd")]
437    fn deserialize_simd_json(bytes: Bytes) -> Self {
438        let mut json_bytes = bytes.to_vec();
439        let is_valid = simd_json::from_slice::<serde_json::Value>(&mut json_bytes).is_ok();
440
441        let payload = MessagePayload::Json(JsonPayload {
442            raw: bytes,
443            parsed: None,
444            is_valid,
445        });
446
447        Self {
448            id: MessageId::Uuid(Uuid::new_v4()),
449            metadata: MessageMetadata::new(ContentType::Json, payload.size()),
450            payload,
451        }
452    }
453
454    fn deserialize_messagepack(bytes: Bytes) -> Self {
455        let payload = MessagePayload::Binary(BinaryPayload {
456            data: bytes,
457            format: BinaryFormat::MessagePack,
458        });
459
460        Self {
461            id: MessageId::Uuid(Uuid::new_v4()),
462            metadata: MessageMetadata::new(ContentType::Binary, payload.size()),
463            payload,
464        }
465    }
466
467    fn deserialize_cbor(bytes: Bytes) -> Result<Self> {
468        // Accept raw CBOR as binary or attempt to decode into JSON Value
469        if let Ok(value) = ciborium::from_reader::<serde_json::Value, _>(&bytes[..]) {
470            let raw = serde_json::to_vec(&value)
471                .map(Bytes::from)
472                .map_err(|e| Error::serialization(format!("JSON re-encode failed: {e}")))?;
473            let payload = MessagePayload::Json(JsonPayload {
474                raw,
475                parsed: Some(Arc::new(value)),
476                is_valid: true,
477            });
478            return Ok(Self {
479                id: MessageId::Uuid(Uuid::new_v4()),
480                metadata: MessageMetadata::new(ContentType::Json, payload.size()),
481                payload,
482            });
483        }
484
485        // If decoding to JSON fails, keep as CBOR binary
486        let payload = MessagePayload::Binary(BinaryPayload {
487            data: bytes,
488            format: BinaryFormat::Cbor,
489        });
490        Ok(Self {
491            id: MessageId::Uuid(Uuid::new_v4()),
492            metadata: MessageMetadata::new(ContentType::Binary, payload.size()),
493            payload,
494        })
495    }
496
497    fn detect_format(bytes: &[u8]) -> SerializationFormat {
498        if bytes.is_empty() {
499            return SerializationFormat::Json;
500        }
501
502        // Check for JSON (starts with '{' or '[')
503        if matches!(bytes[0], b'{' | b'[') {
504            #[cfg(feature = "simd")]
505            {
506                return SerializationFormat::SimdJson;
507            }
508            #[cfg(not(feature = "simd"))]
509            {
510                return SerializationFormat::Json;
511            }
512        }
513
514        // Check for MessagePack (starts with specific bytes)
515        if bytes.len() >= 2 && (bytes[0] == 0x82 || bytes[0] == 0x83) {
516            return SerializationFormat::MessagePack;
517        }
518
519        // Default to JSON
520        #[cfg(feature = "simd")]
521        {
522            SerializationFormat::SimdJson
523        }
524        #[cfg(not(feature = "simd"))]
525        {
526            SerializationFormat::Json
527        }
528    }
529}
530
531impl MessagePayload {
532    /// Get the size of the payload in bytes
533    pub const fn size(&self) -> usize {
534        match self {
535            Self::Json(json) => json.raw.len(),
536            Self::Binary(binary) => binary.data.len(),
537            Self::Text(text) => text.len(),
538            Self::Empty => 0,
539        }
540    }
541}
542
543impl MessageMetadata {
544    /// Create new message metadata
545    #[must_use]
546    pub fn new(content_type: ContentType, size: usize) -> Self {
547        Self {
548            created_at: Timestamp::now(),
549            protocol_version: ProtocolVersion::default(),
550            encoding: None,
551            content_type,
552            size,
553            correlation_id: None,
554            headers: HashMap::new(),
555        }
556    }
557
558    /// Add a custom header
559    #[must_use]
560    pub fn with_header(mut self, key: String, value: String) -> Self {
561        self.headers.insert(key, value);
562        self
563    }
564
565    /// Set correlation ID for tracing
566    #[must_use]
567    pub fn with_correlation_id(mut self, correlation_id: String) -> Self {
568        self.correlation_id = Some(correlation_id);
569        self
570    }
571
572    /// Set content encoding
573    #[must_use]
574    pub fn with_encoding(mut self, encoding: String) -> Self {
575        self.encoding = Some(encoding);
576        self
577    }
578}
579
580impl MessageSerializer {
581    /// Create a new message serializer with default settings
582    #[must_use]
583    pub const fn new() -> Self {
584        Self {
585            default_format: SerializationFormat::Json,
586            enable_compression: false,
587            compression_threshold: 1024, // 1KB
588        }
589    }
590
591    /// Set the default serialization format
592    #[must_use]
593    pub const fn with_format(mut self, format: SerializationFormat) -> Self {
594        self.default_format = format;
595        self
596    }
597
598    /// Enable compression for messages above threshold
599    #[must_use]
600    pub const fn with_compression(mut self, enable: bool, threshold: usize) -> Self {
601        self.enable_compression = enable;
602        self.compression_threshold = threshold;
603        self
604    }
605
606    /// Serialize a message using the default format
607    pub fn serialize(&self, message: &Message) -> Result<Bytes> {
608        let serialized = message.serialize(self.default_format)?;
609
610        // Apply compression if enabled and message is large enough
611        if self.enable_compression && serialized.len() > self.compression_threshold {
612            Ok(self.compress(serialized))
613        } else {
614            Ok(serialized)
615        }
616    }
617
618    const fn compress(&self, data: Bytes) -> Bytes {
619        // Compression implementation would go here
620        // For now, just return the original data
621        let _ = self; // Will use self when compression is implemented
622        data
623    }
624}
625
/// The default serializer is the one produced by [`MessageSerializer::new`].
impl Default for MessageSerializer {
    fn default() -> Self {
        Self::new()
    }
}
631
632impl fmt::Display for MessageId {
633    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
634        match self {
635            Self::String(s) => write!(f, "{s}"),
636            Self::Number(n) => write!(f, "{n}"),
637            Self::Uuid(u) => write!(f, "{u}"),
638        }
639    }
640}
641
/// Wraps an owned string as a `MessageId::String` without copying it.
impl From<String> for MessageId {
    fn from(s: String) -> Self {
        Self::String(s)
    }
}
647
648impl From<&str> for MessageId {
649    fn from(s: &str) -> Self {
650        Self::String(s.to_string())
651    }
652}
653
/// Wraps a signed 64-bit integer as a `MessageId::Number`.
impl From<i64> for MessageId {
    fn from(n: i64) -> Self {
        Self::Number(n)
    }
}
659
/// Wraps a UUID as a `MessageId::Uuid`.
impl From<Uuid> for MessageId {
    fn from(u: Uuid) -> Self {
        Self::Uuid(u)
    }
}
665
// Unit tests for message construction, serialization, parsing, format
// detection, and the metadata builders.
#[cfg(test)]
mod tests {
    use super::*;
    use serde_json::json;

    // A JSON message keeps its string id and is not reported as empty.
    #[test]
    fn test_message_creation() {
        let message = Message::json(MessageId::from("test"), json!({"key": "value"})).unwrap();
        assert_eq!(message.id.to_string(), "test");
        assert!(!message.is_empty());
    }

    // Serializing a JSON message as JSON yields non-empty output.
    #[test]
    fn test_message_serialization() {
        let message = Message::json(MessageId::from(1), json!({"test": true})).unwrap();
        let serialized = message.serialize(SerializationFormat::Json).unwrap();
        assert!(!serialized.is_empty());
    }

    // Target type for the typed-parsing test below.
    #[derive(Deserialize, PartialEq, Debug)]
    struct TestData {
        number: i32,
    }

    // `parse_json` deserializes the payload into a caller-chosen type.
    #[test]
    fn test_message_parsing() {
        let message = Message::json(MessageId::from("test"), json!({"number": 42})).unwrap();

        let parsed: TestData = message.parse_json().unwrap();
        assert_eq!(parsed.number, 42);
    }

    // Input starting with `{` is detected as JSON; the exact variant depends
    // on whether the `simd` feature is enabled.
    #[test]
    fn test_format_detection() {
        let json_bytes = Bytes::from(r#"{"test": true}"#);
        let format = Message::detect_format(&json_bytes);

        #[cfg(feature = "simd")]
        assert_eq!(format, SerializationFormat::SimdJson);
        #[cfg(not(feature = "simd"))]
        assert_eq!(format, SerializationFormat::Json);
    }

    // The builder methods store the size, header, and correlation ID.
    #[test]
    fn test_message_metadata() {
        let metadata = MessageMetadata::new(ContentType::Json, 100)
            .with_header("custom".to_string(), "value".to_string())
            .with_correlation_id("corr-123".to_string());

        assert_eq!(metadata.size, 100);
        assert_eq!(metadata.headers.get("custom"), Some(&"value".to_string()));
        assert_eq!(metadata.correlation_id, Some("corr-123".to_string()));
    }
}