turbomcp_core/
message.rs

1//! Optimized message types and serialization.
2//!
3//! This module provides zero-copy message handling with optimized serialization
4//! for maximum performance. It supports multiple serialization formats and
5//! includes SIMD acceleration when available.
6
7use std::collections::HashMap;
8use std::fmt;
9use std::sync::Arc;
10
11use bytes::{Bytes, BytesMut};
12use serde::{Deserialize, Serialize};
13use uuid::Uuid;
14
15use crate::error::{Error, Result};
16use crate::types::{ContentType, ProtocolVersion, Timestamp};
17
18/// Unique identifier for messages
19#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
20#[serde(untagged)]
21pub enum MessageId {
22    /// String identifier
23    String(String),
24    /// Numeric identifier
25    Number(i64),
26    /// UUID identifier
27    Uuid(Uuid),
28}
29
30/// Message metadata for tracking and debugging
31#[derive(Debug, Clone, Serialize, Deserialize)]
32pub struct MessageMetadata {
33    /// Message creation timestamp
34    pub created_at: Timestamp,
35
36    /// Protocol version used
37    pub protocol_version: ProtocolVersion,
38
39    /// Content encoding (gzip, brotli, etc.)
40    pub encoding: Option<String>,
41
42    /// Content type of the payload
43    pub content_type: ContentType,
44
45    /// Message size in bytes
46    pub size: usize,
47
48    /// Correlation ID for request tracing
49    pub correlation_id: Option<String>,
50
51    /// Custom headers
52    pub headers: HashMap<String, String>,
53}
54
55/// Optimized message container with zero-copy support
56#[derive(Debug, Clone)]
57pub struct Message {
58    /// Message identifier
59    pub id: MessageId,
60
61    /// Message metadata
62    pub metadata: MessageMetadata,
63
64    /// Message payload with zero-copy optimization
65    pub payload: MessagePayload,
66}
67
68/// Zero-copy message payload
69#[derive(Debug, Clone)]
70pub enum MessagePayload {
71    /// JSON payload with potential zero-copy
72    Json(JsonPayload),
73
74    /// Binary payload (`MessagePack`, Protocol Buffers, etc.)
75    Binary(BinaryPayload),
76
77    /// Text payload
78    Text(String),
79
80    /// Empty payload
81    Empty,
82}
83
84/// JSON payload with zero-copy support
85#[derive(Debug, Clone)]
86pub struct JsonPayload {
87    /// Raw JSON bytes (zero-copy when possible)
88    pub raw: Bytes,
89
90    /// Parsed JSON value (lazily evaluated)
91    pub parsed: Option<Arc<serde_json::Value>>,
92
93    /// Whether the raw bytes are valid JSON
94    pub is_valid: bool,
95}
96
97/// Binary payload for efficient serialization formats
98#[derive(Debug, Clone)]
99pub struct BinaryPayload {
100    /// Raw binary data
101    pub data: Bytes,
102
103    /// Binary format identifier
104    pub format: BinaryFormat,
105}
106
107/// Supported binary serialization formats
108#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
109#[serde(rename_all = "lowercase")]
110pub enum BinaryFormat {
111    /// `MessagePack` format
112    MessagePack,
113
114    /// Protocol Buffers
115    ProtoBuf,
116
117    /// CBOR (Concise Binary Object Representation)
118    Cbor,
119
120    /// Custom binary format
121    Custom,
122}
123
124/// Message serializer with format detection
125#[derive(Debug)]
126pub struct MessageSerializer {
127    /// Default serialization format
128    default_format: SerializationFormat,
129
130    /// Whether to enable compression
131    enable_compression: bool,
132
133    /// Compression threshold in bytes
134    compression_threshold: usize,
135}
136
137/// Supported serialization formats
138#[derive(Debug, Clone, Copy, PartialEq, Eq)]
139pub enum SerializationFormat {
140    /// Standard JSON
141    Json,
142
143    /// Fast JSON with SIMD
144    #[cfg(feature = "simd")]
145    SimdJson,
146
147    /// `MessagePack` binary format
148    MessagePack,
149
150    /// CBOR binary format
151    Cbor,
152}
153
154impl Message {
155    /// Create a new message with JSON payload
156    ///
157    /// # Errors
158    ///
159    /// Returns an error if the value cannot be serialized to JSON.
160    pub fn json(id: MessageId, value: impl Serialize) -> Result<Self> {
161        let json_bytes = Self::serialize_json(&value)?;
162        let payload = MessagePayload::Json(JsonPayload {
163            raw: json_bytes.freeze(),
164            parsed: Some(Arc::new(serde_json::to_value(value)?)),
165            is_valid: true,
166        });
167
168        Ok(Self {
169            id,
170            metadata: MessageMetadata::new(ContentType::Json, payload.size()),
171            payload,
172        })
173    }
174
175    /// Create a new message with binary payload
176    pub fn binary(id: MessageId, data: Bytes, format: BinaryFormat) -> Self {
177        let size = data.len();
178        let payload = MessagePayload::Binary(BinaryPayload { data, format });
179
180        Self {
181            id,
182            metadata: MessageMetadata::new(ContentType::Binary, size),
183            payload,
184        }
185    }
186
187    /// Create a new message with text payload
188    #[must_use]
189    pub fn text(id: MessageId, text: String) -> Self {
190        let size = text.len();
191        let payload = MessagePayload::Text(text);
192
193        Self {
194            id,
195            metadata: MessageMetadata::new(ContentType::Text, size),
196            payload,
197        }
198    }
199
200    /// Create an empty message
201    #[must_use]
202    pub fn empty(id: MessageId) -> Self {
203        Self {
204            id,
205            metadata: MessageMetadata::new(ContentType::Json, 0),
206            payload: MessagePayload::Empty,
207        }
208    }
209
210    /// Get the message size in bytes
211    pub const fn size(&self) -> usize {
212        self.metadata.size
213    }
214
215    /// Check if the message is empty
216    pub const fn is_empty(&self) -> bool {
217        matches!(self.payload, MessagePayload::Empty)
218    }
219
220    /// Serialize message to bytes using the specified format
221    ///
222    /// # Errors
223    ///
224    /// Returns an error if serialization fails for the specified format.
225    pub fn serialize(&self, format: SerializationFormat) -> Result<Bytes> {
226        match format {
227            SerializationFormat::Json => self.serialize_json_format(),
228            #[cfg(feature = "simd")]
229            SerializationFormat::SimdJson => self.serialize_simd_json(),
230            SerializationFormat::MessagePack => self.serialize_messagepack(),
231            SerializationFormat::Cbor => self.serialize_cbor(),
232        }
233    }
234
235    /// Deserialize message from bytes with format auto-detection
236    ///
237    /// # Errors
238    ///
239    /// Returns an error if format detection fails or deserialization fails.
240    pub fn deserialize(bytes: Bytes) -> Result<Self> {
241        // Try to detect format from content
242        let format = Self::detect_format(&bytes);
243        Self::deserialize_with_format(bytes, format)
244    }
245
246    /// Deserialize message from bytes using specified format
247    pub fn deserialize_with_format(bytes: Bytes, format: SerializationFormat) -> Result<Self> {
248        match format {
249            SerializationFormat::Json => Ok(Self::deserialize_json(bytes)),
250            #[cfg(feature = "simd")]
251            SerializationFormat::SimdJson => Ok(Self::deserialize_simd_json(bytes)),
252            SerializationFormat::MessagePack => Ok(Self::deserialize_messagepack(bytes)),
253            SerializationFormat::Cbor => Self::deserialize_cbor(bytes),
254        }
255    }
256
257    /// Parse JSON payload to structured data
258    pub fn parse_json<T>(&self) -> Result<T>
259    where
260        T: for<'de> Deserialize<'de>,
261    {
262        match &self.payload {
263            MessagePayload::Json(json_payload) => json_payload.parsed.as_ref().map_or_else(
264                || {
265                    #[cfg(feature = "simd")]
266                    {
267                        let mut json_bytes = json_payload.raw.to_vec();
268                        simd_json::from_slice(&mut json_bytes).map_err(|e| {
269                            Error::serialization(format!("SIMD JSON parsing failed: {e}"))
270                        })
271                    }
272                    #[cfg(not(feature = "simd"))]
273                    {
274                        serde_json::from_slice(&json_payload.raw).map_err(|e| {
275                            Error::serialization(format!("JSON parsing failed: {}", e))
276                        })
277                    }
278                },
279                |parsed| {
280                    serde_json::from_value((**parsed).clone())
281                        .map_err(|e| Error::serialization(format!("JSON parsing failed: {e}")))
282                },
283            ),
284            _ => Err(Error::validation("Message payload is not JSON")),
285        }
286    }
287
288    // Private helper methods
289
290    fn serialize_json(value: &impl Serialize) -> Result<BytesMut> {
291        #[cfg(feature = "simd")]
292        {
293            sonic_rs::to_vec(value)
294                .map(|v| BytesMut::from(v.as_slice()))
295                .map_err(|e| Error::serialization(format!("SIMD JSON serialization failed: {e}")))
296        }
297        #[cfg(not(feature = "simd"))]
298        {
299            serde_json::to_vec(value)
300                .map(|v| BytesMut::from(v.as_slice()))
301                .map_err(|e| Error::serialization(format!("JSON serialization failed: {}", e)))
302        }
303    }
304
305    fn serialize_json_format(&self) -> Result<Bytes> {
306        match &self.payload {
307            MessagePayload::Json(json_payload) => Ok(json_payload.raw.clone()),
308            MessagePayload::Text(text) => Ok(Bytes::from(text.clone())),
309            MessagePayload::Empty => Ok(Bytes::from_static(b"{}")),
310            MessagePayload::Binary(_) => Err(Error::validation(
311                "Cannot serialize non-JSON payload as JSON",
312            )),
313        }
314    }
315
316    #[cfg(feature = "simd")]
317    fn serialize_simd_json(&self) -> Result<Bytes> {
318        match &self.payload {
319            MessagePayload::Json(json_payload) => {
320                if json_payload.is_valid {
321                    Ok(json_payload.raw.clone())
322                } else {
323                    Err(Error::serialization("Invalid JSON payload"))
324                }
325            }
326            _ => Err(Error::validation(
327                "Cannot serialize non-JSON payload with SIMD JSON",
328            )),
329        }
330    }
331
332    fn serialize_messagepack(&self) -> Result<Bytes> {
333        #[cfg(feature = "messagepack")]
334        {
335            match &self.payload {
336                MessagePayload::Binary(binary) if binary.format == BinaryFormat::MessagePack => {
337                    Ok(binary.data.clone())
338                }
339                MessagePayload::Json(json_payload) => json_payload.parsed.as_ref().map_or_else(
340                    || {
341                        Err(Error::serialization(
342                            "Cannot serialize unparsed JSON to MessagePack",
343                        ))
344                    },
345                    |parsed| {
346                        rmp_serde::to_vec(parsed.as_ref())
347                            .map(Bytes::from)
348                            .map_err(|e| {
349                                Error::serialization(format!(
350                                    "MessagePack serialization failed: {e}"
351                                ))
352                            })
353                    },
354                ),
355                _ => Err(Error::validation("Cannot serialize payload as MessagePack")),
356            }
357        }
358        #[cfg(not(feature = "messagepack"))]
359        {
360            let _ = self; // Silence unused warning
361            Err(Error::validation("MessagePack serialization not available"))
362        }
363    }
364
365    fn serialize_cbor(&self) -> Result<Bytes> {
366        match &self.payload {
367            MessagePayload::Binary(binary) if binary.format == BinaryFormat::Cbor => {
368                Ok(binary.data.clone())
369            }
370            MessagePayload::Json(json_payload) => {
371                if let Some(parsed) = &json_payload.parsed {
372                    serde_cbor::to_vec(parsed.as_ref())
373                        .map(Bytes::from)
374                        .map_err(|e| {
375                            Error::serialization(format!("CBOR serialization failed: {e}"))
376                        })
377                } else {
378                    // Fallback: attempt to parse then encode
379                    #[cfg(feature = "simd")]
380                    {
381                        let mut json_bytes = json_payload.raw.to_vec();
382                        let value: serde_json::Value = simd_json::from_slice(&mut json_bytes)
383                            .map_err(|e| {
384                                Error::serialization(format!(
385                                    "SIMD JSON parsing failed before CBOR: {e}"
386                                ))
387                            })?;
388                        serde_cbor::to_vec(&value).map(Bytes::from).map_err(|e| {
389                            Error::serialization(format!("CBOR serialization failed: {e}"))
390                        })
391                    }
392                    #[cfg(not(feature = "simd"))]
393                    {
394                        let value: serde_json::Value = serde_json::from_slice(&json_payload.raw)
395                            .map_err(|e| {
396                                Error::serialization(format!(
397                                    "JSON parsing failed before CBOR: {}",
398                                    e
399                                ))
400                            })?;
401                        serde_cbor::to_vec(&value).map(Bytes::from).map_err(|e| {
402                            Error::serialization(format!("CBOR serialization failed: {}", e))
403                        })
404                    }
405                }
406            }
407            _ => Err(Error::validation("Cannot serialize payload as CBOR")),
408        }
409    }
410
411    fn deserialize_json(bytes: Bytes) -> Self {
412        // Validate JSON format
413        let is_valid = serde_json::from_slice::<serde_json::Value>(&bytes).is_ok();
414
415        let payload = MessagePayload::Json(JsonPayload {
416            raw: bytes,
417            parsed: None, // Lazy evaluation
418            is_valid,
419        });
420
421        Self {
422            id: MessageId::Uuid(Uuid::new_v4()),
423            metadata: MessageMetadata::new(ContentType::Json, payload.size()),
424            payload,
425        }
426    }
427
428    #[cfg(feature = "simd")]
429    fn deserialize_simd_json(bytes: Bytes) -> Self {
430        let mut json_bytes = bytes.to_vec();
431        let is_valid = simd_json::from_slice::<serde_json::Value>(&mut json_bytes).is_ok();
432
433        let payload = MessagePayload::Json(JsonPayload {
434            raw: bytes,
435            parsed: None,
436            is_valid,
437        });
438
439        Self {
440            id: MessageId::Uuid(Uuid::new_v4()),
441            metadata: MessageMetadata::new(ContentType::Json, payload.size()),
442            payload,
443        }
444    }
445
446    fn deserialize_messagepack(bytes: Bytes) -> Self {
447        let payload = MessagePayload::Binary(BinaryPayload {
448            data: bytes,
449            format: BinaryFormat::MessagePack,
450        });
451
452        Self {
453            id: MessageId::Uuid(Uuid::new_v4()),
454            metadata: MessageMetadata::new(ContentType::Binary, payload.size()),
455            payload,
456        }
457    }
458
459    fn deserialize_cbor(bytes: Bytes) -> Result<Self> {
460        // Accept raw CBOR as binary or attempt to decode into JSON Value
461        if let Ok(value) = serde_cbor::from_slice::<serde_json::Value>(&bytes) {
462            let raw = serde_json::to_vec(&value)
463                .map(Bytes::from)
464                .map_err(|e| Error::serialization(format!("JSON re-encode failed: {e}")))?;
465            let payload = MessagePayload::Json(JsonPayload {
466                raw,
467                parsed: Some(Arc::new(value)),
468                is_valid: true,
469            });
470            return Ok(Self {
471                id: MessageId::Uuid(Uuid::new_v4()),
472                metadata: MessageMetadata::new(ContentType::Json, payload.size()),
473                payload,
474            });
475        }
476
477        // If decoding to JSON fails, keep as CBOR binary
478        let payload = MessagePayload::Binary(BinaryPayload {
479            data: bytes,
480            format: BinaryFormat::Cbor,
481        });
482        Ok(Self {
483            id: MessageId::Uuid(Uuid::new_v4()),
484            metadata: MessageMetadata::new(ContentType::Binary, payload.size()),
485            payload,
486        })
487    }
488
489    fn detect_format(bytes: &[u8]) -> SerializationFormat {
490        if bytes.is_empty() {
491            return SerializationFormat::Json;
492        }
493
494        // Check for JSON (starts with '{' or '[')
495        if matches!(bytes[0], b'{' | b'[') {
496            #[cfg(feature = "simd")]
497            {
498                return SerializationFormat::SimdJson;
499            }
500            #[cfg(not(feature = "simd"))]
501            {
502                return SerializationFormat::Json;
503            }
504        }
505
506        // Check for MessagePack (starts with specific bytes)
507        if bytes.len() >= 2 && (bytes[0] == 0x82 || bytes[0] == 0x83) {
508            return SerializationFormat::MessagePack;
509        }
510
511        // Default to JSON
512        #[cfg(feature = "simd")]
513        {
514            SerializationFormat::SimdJson
515        }
516        #[cfg(not(feature = "simd"))]
517        {
518            SerializationFormat::Json
519        }
520    }
521}
522
523impl MessagePayload {
524    /// Get the size of the payload in bytes
525    pub const fn size(&self) -> usize {
526        match self {
527            Self::Json(json) => json.raw.len(),
528            Self::Binary(binary) => binary.data.len(),
529            Self::Text(text) => text.len(),
530            Self::Empty => 0,
531        }
532    }
533}
534
535impl MessageMetadata {
536    /// Create new message metadata
537    #[must_use]
538    pub fn new(content_type: ContentType, size: usize) -> Self {
539        Self {
540            created_at: Timestamp::now(),
541            protocol_version: ProtocolVersion::default(),
542            encoding: None,
543            content_type,
544            size,
545            correlation_id: None,
546            headers: HashMap::new(),
547        }
548    }
549
550    /// Add a custom header
551    #[must_use]
552    pub fn with_header(mut self, key: String, value: String) -> Self {
553        self.headers.insert(key, value);
554        self
555    }
556
557    /// Set correlation ID for tracing
558    #[must_use]
559    pub fn with_correlation_id(mut self, correlation_id: String) -> Self {
560        self.correlation_id = Some(correlation_id);
561        self
562    }
563
564    /// Set content encoding
565    #[must_use]
566    pub fn with_encoding(mut self, encoding: String) -> Self {
567        self.encoding = Some(encoding);
568        self
569    }
570}
571
572impl MessageSerializer {
573    /// Create a new message serializer with default settings
574    #[must_use]
575    pub const fn new() -> Self {
576        Self {
577            default_format: SerializationFormat::Json,
578            enable_compression: false,
579            compression_threshold: 1024, // 1KB
580        }
581    }
582
583    /// Set the default serialization format
584    #[must_use]
585    pub const fn with_format(mut self, format: SerializationFormat) -> Self {
586        self.default_format = format;
587        self
588    }
589
590    /// Enable compression for messages above threshold
591    #[must_use]
592    pub const fn with_compression(mut self, enable: bool, threshold: usize) -> Self {
593        self.enable_compression = enable;
594        self.compression_threshold = threshold;
595        self
596    }
597
598    /// Serialize a message using the default format
599    pub fn serialize(&self, message: &Message) -> Result<Bytes> {
600        let serialized = message.serialize(self.default_format)?;
601
602        // Apply compression if enabled and message is large enough
603        if self.enable_compression && serialized.len() > self.compression_threshold {
604            Ok(self.compress(serialized))
605        } else {
606            Ok(serialized)
607        }
608    }
609
610    const fn compress(&self, data: Bytes) -> Bytes {
611        // Compression implementation would go here
612        // For now, just return the original data
613        let _ = self; // Will use self when compression is implemented
614        data
615    }
616}
617
618impl Default for MessageSerializer {
619    fn default() -> Self {
620        Self::new()
621    }
622}
623
624impl fmt::Display for MessageId {
625    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
626        match self {
627            Self::String(s) => write!(f, "{s}"),
628            Self::Number(n) => write!(f, "{n}"),
629            Self::Uuid(u) => write!(f, "{u}"),
630        }
631    }
632}
633
634impl From<String> for MessageId {
635    fn from(s: String) -> Self {
636        Self::String(s)
637    }
638}
639
640impl From<&str> for MessageId {
641    fn from(s: &str) -> Self {
642        Self::String(s.to_string())
643    }
644}
645
646impl From<i64> for MessageId {
647    fn from(n: i64) -> Self {
648        Self::Number(n)
649    }
650}
651
652impl From<Uuid> for MessageId {
653    fn from(u: Uuid) -> Self {
654        Self::Uuid(u)
655    }
656}
657
658#[cfg(test)]
659mod tests {
660    use super::*;
661    use serde_json::json;
662
663    #[test]
664    fn test_message_creation() {
665        let message = Message::json(MessageId::from("test"), json!({"key": "value"})).unwrap();
666        assert_eq!(message.id.to_string(), "test");
667        assert!(!message.is_empty());
668    }
669
670    #[test]
671    fn test_message_serialization() {
672        let message = Message::json(MessageId::from(1), json!({"test": true})).unwrap();
673        let serialized = message.serialize(SerializationFormat::Json).unwrap();
674        assert!(!serialized.is_empty());
675    }
676
677    #[derive(Deserialize, PartialEq, Debug)]
678    struct TestData {
679        number: i32,
680    }
681
682    #[test]
683    fn test_message_parsing() {
684        let message = Message::json(MessageId::from("test"), json!({"number": 42})).unwrap();
685
686        let parsed: TestData = message.parse_json().unwrap();
687        assert_eq!(parsed.number, 42);
688    }
689
690    #[test]
691    fn test_format_detection() {
692        let json_bytes = Bytes::from(r#"{"test": true}"#);
693        let format = Message::detect_format(&json_bytes);
694
695        #[cfg(feature = "simd")]
696        assert_eq!(format, SerializationFormat::SimdJson);
697        #[cfg(not(feature = "simd"))]
698        assert_eq!(format, SerializationFormat::Json);
699    }
700
701    #[test]
702    fn test_message_metadata() {
703        let metadata = MessageMetadata::new(ContentType::Json, 100)
704            .with_header("custom".to_string(), "value".to_string())
705            .with_correlation_id("corr-123".to_string());
706
707        assert_eq!(metadata.size, 100);
708        assert_eq!(metadata.headers.get("custom"), Some(&"value".to_string()));
709        assert_eq!(metadata.correlation_id, Some("corr-123".to_string()));
710    }
711}