Skip to main content

turbomcp_wire/
lib.rs

1//! # TurboMCP Wire Format Codec
2//!
3//! This crate provides wire format encoding/decoding abstractions for MCP messages.
4//! It enables pluggable serialization formats while maintaining MCP protocol compliance.
5//!
6//! ## Design Philosophy
7//!
8//! - **Wire format**: JSON-RPC 2.0 (MCP protocol standard)
9//! - **Extensible**: Support for alternative formats (MessagePack, etc.)
10//! - **Zero-copy ready**: Integration with rkyv for internal message passing
//! - **`no_std` compatible**: Works in embedded and WASM environments when the default `std` feature is disabled
12//!
13//! ## Usage
14//!
15//! ```rust
16//! use turbomcp_wire::{Codec, JsonCodec};
17//! use serde::{Serialize, Deserialize};
18//!
19//! #[derive(Serialize, Deserialize)]
20//! struct MyMessage {
21//!     id: u32,
22//!     method: String,
23//! }
24//!
25//! let codec = JsonCodec::new();
26//! let msg = MyMessage { id: 1, method: "test".into() };
27//!
28//! // Encode to bytes
29//! let bytes = codec.encode(&msg).unwrap();
30//!
31//! // Decode from bytes
32//! let decoded: MyMessage = codec.decode(&bytes).unwrap();
33//! ```
34//!
35//! ## Features
36//!
37//! - `std` - Standard library support (default)
38//! - `json` - JSON codec (default)
39//! - `simd` - SIMD-accelerated JSON (sonic-rs, simd-json)
40//! - `msgpack` - MessagePack binary format
41
42#![cfg_attr(not(feature = "std"), no_std)]
43#![deny(unsafe_code)]
44#![warn(missing_docs)]
45#![cfg_attr(docsrs, feature(doc_cfg))]
46
47extern crate alloc;
48
49use alloc::string::{String, ToString};
50use alloc::vec::Vec;
51use core::fmt;
52use serde::{Serialize, de::DeserializeOwned};
53
54// Re-export core types for convenience
55pub use turbomcp_core::error::McpError;
56
/// Wire format codec error.
///
/// Carries a human-readable `message` plus an optional free-form `source`
/// string supplied by the caller through [`CodecError::with_source`].
#[derive(Debug, Clone)]
pub struct CodecError {
    /// Error message
    pub message: String,
    /// Optional source location
    pub source: Option<String>,
}

impl fmt::Display for CodecError {
    /// Renders `codec error: <message>`, followed by ` (source: <source>)`
    /// when source information is attached.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "codec error: {}", self.message)?;
        // Include the attached context so it is not lost when the error is
        // logged or shown to a user.
        if let Some(source) = &self.source {
            write!(f, " (source: {})", source)?;
        }
        Ok(())
    }
}

#[cfg(feature = "std")]
impl std::error::Error for CodecError {}

impl CodecError {
    /// Create a new codec error with no source information.
    pub fn new(message: impl Into<String>) -> Self {
        Self {
            message: message.into(),
            source: None,
        }
    }

    /// Create a codec error with source information.
    ///
    /// `source` is a free-form description (e.g. a location) that is included
    /// in the `Display` output.
    pub fn with_source(message: impl Into<String>, source: impl Into<String>) -> Self {
        Self {
            message: message.into(),
            source: Some(source.into()),
        }
    }

    /// Create an encoding error; the message is prefixed with `"encode: "`.
    pub fn encode(message: impl Into<String>) -> Self {
        Self::new(Self::prefixed("encode: ", message))
    }

    /// Create a decoding error; the message is prefixed with `"decode: "`.
    pub fn decode(message: impl Into<String>) -> Self {
        Self::new(Self::prefixed("decode: ", message))
    }

    /// Concatenates `prefix` and `message` into a new `String`.
    fn prefixed(prefix: &str, message: impl Into<String>) -> String {
        let mut text = String::from(prefix);
        text.push_str(&message.into());
        text
    }
}
102
103impl From<CodecError> for McpError {
104    fn from(err: CodecError) -> Self {
105        McpError::parse_error(err.message)
106    }
107}
108
109/// Result type for codec operations
110pub type CodecResult<T> = Result<T, CodecError>;
111
112/// Wire format codec trait
113///
114/// This trait abstracts over different serialization formats, allowing
115/// pluggable encoding/decoding while maintaining type safety.
116///
117/// # Send + Sync Bounds
118///
119/// The `Send + Sync` bounds are required because codecs are typically shared across
120/// multiple threads/tasks in multi-threaded runtimes (tokio, async-std). This enables:
121///
122/// - **Concurrent encoding/decoding**: Multiple tasks can use the codec simultaneously
123/// - **Zero-copy sharing**: Codec instances can be wrapped in Arc and shared cheaply
124/// - **Thread-safe initialization**: Codec configuration is immutable after creation
125///
126/// ## WASM Implications
127///
128/// On WASM targets (single-threaded), these bounds are automatically satisfied since
129/// all types are `Send + Sync` by default in single-threaded environments. The trait
130/// bounds don't add overhead on WASM - they're purely compile-time constraints that
131/// prevent accidental use of non-thread-safe types on native platforms.
132///
133/// # Implementors
134///
135/// - [`JsonCodec`] - Standard JSON encoding (default)
136/// - `SimdJsonCodec` - SIMD-accelerated JSON (requires `simd` feature)
137/// - `MsgPackCodec` - MessagePack binary format (requires `msgpack` feature)
138pub trait Codec: Send + Sync {
139    /// Encode a value to bytes
140    fn encode<T: Serialize>(&self, value: &T) -> CodecResult<Vec<u8>>;
141
142    /// Decode bytes to a value
143    fn decode<T: DeserializeOwned>(&self, bytes: &[u8]) -> CodecResult<T>;
144
145    /// Get the content type for this codec (e.g., "application/json")
146    fn content_type(&self) -> &'static str;
147
148    /// Check if this codec supports streaming
149    fn supports_streaming(&self) -> bool {
150        false
151    }
152
153    /// Get codec name for debugging
154    fn name(&self) -> &'static str;
155}
156
/// JSON codec backed by `serde_json`.
///
/// This is the default codec for MCP protocol compliance. Its output is
/// human-readable JSON, which makes it convenient for debugging and logging.
#[derive(Clone, Debug, Default)]
pub struct JsonCodec {
    /// When `true`, output is pretty-printed; the default is compact output.
    pub pretty: bool,
}

impl JsonCodec {
    /// Build a JSON codec that emits compact output.
    pub fn new() -> Self {
        Self { pretty: false }
    }

    /// Build a JSON codec that emits pretty-printed output.
    pub fn pretty() -> Self {
        Self { pretty: true }
    }
}
178
179impl Codec for JsonCodec {
180    fn encode<T: Serialize>(&self, value: &T) -> CodecResult<Vec<u8>> {
181        if self.pretty {
182            serde_json::to_vec_pretty(value)
183        } else {
184            serde_json::to_vec(value)
185        }
186        .map_err(|e| CodecError::encode(e.to_string()))
187    }
188
189    fn decode<T: DeserializeOwned>(&self, bytes: &[u8]) -> CodecResult<T> {
190        serde_json::from_slice(bytes).map_err(|e| CodecError::decode(e.to_string()))
191    }
192
193    fn content_type(&self) -> &'static str {
194        "application/json"
195    }
196
197    fn supports_streaming(&self) -> bool {
198        true
199    }
200
201    fn name(&self) -> &'static str {
202        "json"
203    }
204}
205
/// SIMD-accelerated JSON codec using sonic-rs
///
/// Encoding and decoding are delegated to `sonic_rs`, which uses SIMD
/// instructions to speed up JSON processing. This type performs no fallback
/// of its own: behavior on targets without SIMD support is whatever
/// `sonic_rs` provides.
///
/// It reads and writes standard JSON and reports the `application/json`
/// content type.
#[cfg(feature = "simd")]
#[cfg_attr(docsrs, doc(cfg(feature = "simd")))]
#[derive(Debug, Clone, Default)]
pub struct SimdJsonCodec;

#[cfg(feature = "simd")]
impl SimdJsonCodec {
    /// Create a new SIMD JSON codec (equivalent to `SimdJsonCodec::default()`).
    pub fn new() -> Self {
        Self
    }
}
222
#[cfg(feature = "simd")]
impl Codec for SimdJsonCodec {
    /// Serialize `value` to compact JSON via `sonic_rs`.
    fn encode<T: Serialize>(&self, value: &T) -> CodecResult<Vec<u8>> {
        let bytes = sonic_rs::to_vec(value).map_err(|err| CodecError::encode(err.to_string()))?;
        Ok(bytes)
    }

    /// Parse a JSON document from `bytes` via `sonic_rs`.
    fn decode<T: DeserializeOwned>(&self, bytes: &[u8]) -> CodecResult<T> {
        let value: T =
            sonic_rs::from_slice(bytes).map_err(|err| CodecError::decode(err.to_string()))?;
        Ok(value)
    }

    fn content_type(&self) -> &'static str {
        "application/json"
    }

    fn supports_streaming(&self) -> bool {
        true
    }

    fn name(&self) -> &'static str {
        "simd-json"
    }
}
245
/// MessagePack binary codec
///
/// Produces compact binary output, which suits high-throughput scenarios
/// where bandwidth is limited.
///
/// **Note**: MessagePack is not MCP-compliant for external communication.
/// Reserve it for internal message passing.
///
/// # Security Considerations
///
/// Keep the following in mind when decoding untrusted MessagePack input.
///
/// ## Nesting Depth
///
/// Deeply nested structures can overflow the stack. The underlying
/// `rmp-serde` library applies default recursion limits, but extremely nested
/// payloads may still cause problems. Consider validating message structure
/// before decoding.
///
/// ## Binary Field Size
///
/// MessagePack can encode binary/string fields of arbitrary length.
/// Applications should:
/// - Enforce maximum message size limits at the transport layer
/// - Use streaming decoders for large payloads when possible
/// - Set appropriate memory limits in production environments
///
/// ## Type Confusion
///
/// MessagePack is dynamically typed, which opens the door to type confusion
/// attacks. Always:
/// - Validate that deserialized data matches the expected schema
/// - Prefer strongly-typed Rust structs over `serde_json::Value`
/// - Check for unexpected field types after deserialization
///
/// ## Recommended Usage
///
/// For production systems that handle untrusted input, prefer JSON (with
/// schema validation). Otherwise, confine MessagePack to trusted boundaries
/// such as internal microservices.
#[cfg(feature = "msgpack")]
#[cfg_attr(docsrs, doc(cfg(feature = "msgpack")))]
#[derive(Clone, Debug, Default)]
pub struct MsgPackCodec;

#[cfg(feature = "msgpack")]
impl MsgPackCodec {
    /// Construct a MessagePack codec (same as `MsgPackCodec::default()`).
    pub fn new() -> Self {
        MsgPackCodec
    }
}
294
#[cfg(feature = "msgpack")]
impl Codec for MsgPackCodec {
    /// Serialize `value` as a MessagePack map keyed by field name.
    ///
    /// Named (map) encoding is used instead of positional arrays so that
    /// fields omitted via `skip_serializing_if` still decode correctly.
    fn encode<T: Serialize>(&self, value: &T) -> CodecResult<Vec<u8>> {
        let bytes =
            rmp_serde::to_vec_named(value).map_err(|err| CodecError::encode(err.to_string()))?;
        Ok(bytes)
    }

    /// Deserialize a MessagePack document from `bytes`.
    fn decode<T: DeserializeOwned>(&self, bytes: &[u8]) -> CodecResult<T> {
        let value: T =
            rmp_serde::from_slice(bytes).map_err(|err| CodecError::decode(err.to_string()))?;
        Ok(value)
    }

    fn content_type(&self) -> &'static str {
        "application/msgpack"
    }

    /// MessagePack output is not line-delimited, so streaming is unsupported.
    fn supports_streaming(&self) -> bool {
        false
    }

    fn name(&self) -> &'static str {
        "msgpack"
    }
}
318
/// Default cap on buffered streaming data: 1 MiB (2^20 bytes).
///
/// Bounds memory use so that a peer cannot grow the buffer without limit.
const MAX_STREAMING_BUFFER_SIZE: usize = 1 << 20;
321
322/// Streaming JSON decoder for Server-Sent Events (SSE)
323///
324/// This decoder handles newline-delimited JSON streams commonly
325/// used in HTTP/SSE transports.
326///
327/// # Security
328///
329/// The decoder enforces a maximum buffer size of 1MB to prevent
330/// denial-of-service attacks via unbounded memory consumption.
331/// If an attacker sends continuous data without newlines, the
332/// buffer will be cleared after exceeding the limit.
333#[derive(Debug)]
334pub struct StreamingJsonDecoder {
335    buffer: Vec<u8>,
336    max_buffer_size: usize,
337}
338
339impl Default for StreamingJsonDecoder {
340    fn default() -> Self {
341        Self::new()
342    }
343}
344
345impl StreamingJsonDecoder {
346    /// Create a new streaming decoder with default 1MB buffer limit
347    pub fn new() -> Self {
348        Self {
349            buffer: Vec::new(),
350            max_buffer_size: MAX_STREAMING_BUFFER_SIZE,
351        }
352    }
353
354    /// Create with pre-allocated buffer capacity and default limit
355    pub fn with_capacity(capacity: usize) -> Self {
356        Self {
357            buffer: Vec::with_capacity(capacity),
358            max_buffer_size: MAX_STREAMING_BUFFER_SIZE,
359        }
360    }
361
362    /// Create with custom maximum buffer size
363    ///
364    /// # Arguments
365    ///
366    /// * `max_size` - Maximum buffer size in bytes (capped at 10MB for safety)
367    ///
368    /// # Security
369    ///
370    /// Setting this too high may allow DoS attacks via memory exhaustion.
371    /// The value is capped at 10MB regardless of input.
372    pub fn with_max_size(max_size: usize) -> Self {
373        Self {
374            buffer: Vec::new(),
375            max_buffer_size: max_size.min(10 * 1024 * 1024), // Cap at 10MB
376        }
377    }
378
379    /// Feed data into the decoder
380    ///
381    /// # Security
382    ///
383    /// If the buffer exceeds the maximum size, it will be cleared and
384    /// an error will be returned on the next `try_decode` call.
385    pub fn feed(&mut self, data: &[u8]) {
386        self.buffer.extend_from_slice(data);
387
388        // Enforce buffer size limit to prevent DoS
389        if self.buffer.len() > self.max_buffer_size {
390            #[cfg(feature = "std")]
391            tracing::warn!(
392                buffer_size = self.buffer.len(),
393                max_size = self.max_buffer_size,
394                "Streaming buffer exceeded maximum size, clearing buffer"
395            );
396            self.buffer.clear();
397        }
398    }
399
400    /// Try to decode the next complete message
401    ///
402    /// Returns `Some(T)` if a complete message is available,
403    /// `None` if more data is needed.
404    pub fn try_decode<T: DeserializeOwned>(&mut self) -> CodecResult<Option<T>> {
405        // Look for newline delimiter
406        if let Some(pos) = self.buffer.iter().position(|&b| b == b'\n') {
407            let line = &self.buffer[..pos];
408
409            // Skip empty lines
410            if line.is_empty() || line.iter().all(|b| b.is_ascii_whitespace()) {
411                self.buffer.drain(..=pos);
412                return Ok(None);
413            }
414
415            // Try to decode
416            let result = serde_json::from_slice(line);
417
418            // Remove processed data (including newline)
419            self.buffer.drain(..=pos);
420
421            match result {
422                Ok(value) => Ok(Some(value)),
423                Err(e) => Err(CodecError::decode(e.to_string())),
424            }
425        } else {
426            Ok(None)
427        }
428    }
429
430    /// Clear the internal buffer
431    pub fn clear(&mut self) {
432        self.buffer.clear();
433    }
434
435    /// Check if buffer is empty
436    pub fn is_empty(&self) -> bool {
437        self.buffer.is_empty()
438    }
439
440    /// Get current buffer length
441    pub fn len(&self) -> usize {
442        self.buffer.len()
443    }
444
445    /// Get maximum buffer size
446    pub fn max_buffer_size(&self) -> usize {
447        self.max_buffer_size
448    }
449}
450
451/// Enum wrapper for all codec types
452///
453/// This provides a unified type for codec selection without requiring
454/// dyn trait objects (which aren't compatible with generic methods).
455#[derive(Debug, Clone)]
456pub enum AnyCodec {
457    /// Standard JSON codec
458    Json(JsonCodec),
459    /// SIMD-accelerated JSON codec
460    #[cfg(feature = "simd")]
461    #[cfg_attr(docsrs, doc(cfg(feature = "simd")))]
462    SimdJson(SimdJsonCodec),
463    /// MessagePack binary codec
464    #[cfg(feature = "msgpack")]
465    #[cfg_attr(docsrs, doc(cfg(feature = "msgpack")))]
466    MsgPack(MsgPackCodec),
467}
468
469impl AnyCodec {
470    /// Create a codec by name
471    ///
472    /// Supported names:
473    /// - `"json"` - Standard JSON codec
474    /// - `"simd"` or `"simd-json"` - SIMD-accelerated JSON (requires `simd` feature)
475    /// - `"msgpack"` - MessagePack binary (requires `msgpack` feature)
476    pub fn from_name(name: &str) -> Option<Self> {
477        match name {
478            "json" => Some(Self::Json(JsonCodec::new())),
479            #[cfg(feature = "simd")]
480            "simd" | "simd-json" => Some(Self::SimdJson(SimdJsonCodec::new())),
481            #[cfg(feature = "msgpack")]
482            "msgpack" => Some(Self::MsgPack(MsgPackCodec::new())),
483            _ => None,
484        }
485    }
486
487    /// List available codec names
488    pub fn available_names() -> &'static [&'static str] {
489        &[
490            "json",
491            #[cfg(feature = "simd")]
492            "simd-json",
493            #[cfg(feature = "msgpack")]
494            "msgpack",
495        ]
496    }
497
498    /// Encode a value to bytes
499    pub fn encode<T: Serialize>(&self, value: &T) -> CodecResult<Vec<u8>> {
500        match self {
501            Self::Json(c) => c.encode(value),
502            #[cfg(feature = "simd")]
503            Self::SimdJson(c) => c.encode(value),
504            #[cfg(feature = "msgpack")]
505            Self::MsgPack(c) => c.encode(value),
506        }
507    }
508
509    /// Decode bytes to a value
510    pub fn decode<T: DeserializeOwned>(&self, bytes: &[u8]) -> CodecResult<T> {
511        match self {
512            Self::Json(c) => c.decode(bytes),
513            #[cfg(feature = "simd")]
514            Self::SimdJson(c) => c.decode(bytes),
515            #[cfg(feature = "msgpack")]
516            Self::MsgPack(c) => c.decode(bytes),
517        }
518    }
519
520    /// Get the content type
521    pub fn content_type(&self) -> &'static str {
522        match self {
523            Self::Json(c) => c.content_type(),
524            #[cfg(feature = "simd")]
525            Self::SimdJson(c) => c.content_type(),
526            #[cfg(feature = "msgpack")]
527            Self::MsgPack(c) => c.content_type(),
528        }
529    }
530
531    /// Get codec name
532    pub fn name(&self) -> &'static str {
533        match self {
534            Self::Json(c) => c.name(),
535            #[cfg(feature = "simd")]
536            Self::SimdJson(c) => c.name(),
537            #[cfg(feature = "msgpack")]
538            Self::MsgPack(c) => c.name(),
539        }
540    }
541}
542
#[cfg(test)]
mod tests {
    use super::*;
    use serde::{Deserialize, Serialize};

    #[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
    struct TestMessage {
        id: u32,
        method: String,
        params: Option<serde_json::Value>,
    }

    /// Shorthand constructor for test fixtures.
    fn message(id: u32, method: &str, params: Option<serde_json::Value>) -> TestMessage {
        TestMessage {
            id,
            method: method.into(),
            params,
        }
    }

    #[test]
    fn test_json_codec_roundtrip() {
        let original = message(42, "test/method", Some(serde_json::json!({"key": "value"})));
        let codec = JsonCodec::new();

        let bytes = codec.encode(&original).unwrap();
        let restored = codec.decode::<TestMessage>(&bytes).unwrap();

        assert_eq!(original, restored);
    }

    #[test]
    fn test_json_codec_pretty() {
        let bytes = JsonCodec::pretty()
            .encode(&message(1, "test", None))
            .unwrap();
        let text = String::from_utf8(bytes).unwrap();

        // Pretty output should contain newlines
        assert!(text.contains('\n'));
    }

    #[test]
    fn test_codec_content_type() {
        let codec = JsonCodec::new();
        assert_eq!(
            (codec.content_type(), codec.name()),
            ("application/json", "json")
        );
    }

    #[test]
    fn test_streaming_decoder() {
        let mut decoder = StreamingJsonDecoder::new();

        // Without its terminating newline the message is still incomplete.
        decoder.feed(br#"{"id":1,"method":"a","params":null}"#);
        assert!(decoder.try_decode::<TestMessage>().unwrap().is_none());

        // The newline completes it.
        decoder.feed(b"\n");
        let decoded = decoder
            .try_decode::<TestMessage>()
            .unwrap()
            .expect("message should be complete");
        assert_eq!(decoded.id, 1);
        assert_eq!(decoded.method, "a");
    }

    #[test]
    fn test_streaming_decoder_multiple() {
        let mut decoder = StreamingJsonDecoder::new();

        // Several messages delivered in one chunk.
        let stream = concat!(
            r#"{"id":1,"method":"a","params":null}"#,
            "\n",
            r#"{"id":2,"method":"b","params":null}"#,
            "\n",
        );
        decoder.feed(stream.as_bytes());

        for expected_id in [1, 2] {
            let decoded: TestMessage = decoder.try_decode().unwrap().unwrap();
            assert_eq!(decoded.id, expected_id);
        }

        // Nothing left to decode.
        assert!(decoder.try_decode::<TestMessage>().unwrap().is_none());
    }

    #[test]
    fn test_streaming_decoder_buffer_limit() {
        let mut decoder = StreamingJsonDecoder::with_max_size(100);

        // 150 bytes with no newline overflow the 100-byte limit.
        decoder.feed(&[b'x'; 150]);

        assert!(
            decoder.is_empty(),
            "Buffer should be cleared after exceeding limit"
        );
    }

    #[test]
    fn test_streaming_decoder_max_size_cap() {
        const TEN_MB: usize = 10 * 1024 * 1024;

        // Requesting 100MB must be clamped to the 10MB ceiling.
        let decoder = StreamingJsonDecoder::with_max_size(10 * TEN_MB);

        assert_eq!(decoder.max_buffer_size(), TEN_MB);
    }

    #[test]
    fn test_streaming_decoder_default_limit() {
        // 1MB default
        assert_eq!(StreamingJsonDecoder::new().max_buffer_size(), 1024 * 1024);
    }

    #[test]
    fn test_any_codec() {
        assert_eq!(
            AnyCodec::from_name("json").map(|codec| codec.name()),
            Some("json")
        );
        assert!(AnyCodec::from_name("unknown").is_none());
        assert!(AnyCodec::available_names().contains(&"json"));
    }

    #[test]
    fn test_codec_error() {
        let outcome: CodecResult<TestMessage> = JsonCodec::new().decode(b"invalid json");
        let err = outcome.expect_err("invalid JSON must not decode");
        assert!(err.message.contains("decode"));
    }

    #[cfg(feature = "simd")]
    #[test]
    fn test_simd_codec_roundtrip() {
        let original = message(99, "simd/test", Some(serde_json::json!([1, 2, 3])));
        let codec = SimdJsonCodec::new();

        let bytes = codec.encode(&original).unwrap();
        let restored = codec.decode::<TestMessage>(&bytes).unwrap();

        assert_eq!(original, restored);
    }

    #[cfg(feature = "msgpack")]
    #[test]
    fn test_msgpack_codec_roundtrip() {
        let original = message(77, "msgpack/test", None);
        let codec = MsgPackCodec::new();

        let bytes = codec.encode(&original).unwrap();
        let restored = codec.decode::<TestMessage>(&bytes).unwrap();

        assert_eq!(original, restored);
        assert_eq!(codec.content_type(), "application/msgpack");
    }
}