#![cfg_attr(not(feature = "std"), no_std)]
#![deny(unsafe_code)]
#![warn(missing_docs)]
#![cfg_attr(docsrs, feature(doc_cfg))]
extern crate alloc;
use alloc::string::{String, ToString};
use alloc::vec::Vec;
use core::fmt;
use serde::{Serialize, de::DeserializeOwned};
pub use turbomcp_core::error::McpError;
#[derive(Debug, Clone)]
pub struct CodecError {
pub message: String,
pub source: Option<String>,
}
impl fmt::Display for CodecError {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "codec error: {}", self.message)
}
}
#[cfg(feature = "std")]
impl std::error::Error for CodecError {}
impl CodecError {
pub fn new(message: impl Into<String>) -> Self {
Self {
message: message.into(),
source: None,
}
}
pub fn with_source(message: impl Into<String>, source: impl Into<String>) -> Self {
Self {
message: message.into(),
source: Some(source.into()),
}
}
pub fn encode(message: impl Into<String>) -> Self {
Self::new(alloc::format!("encode: {}", message.into()))
}
pub fn decode(message: impl Into<String>) -> Self {
Self::new(alloc::format!("decode: {}", message.into()))
}
}
impl From<CodecError> for McpError {
fn from(err: CodecError) -> Self {
McpError::parse_error(err.message)
}
}
pub type CodecResult<T> = Result<T, CodecError>;
pub trait Codec: Send + Sync {
fn encode<T: Serialize>(&self, value: &T) -> CodecResult<Vec<u8>>;
fn decode<T: DeserializeOwned>(&self, bytes: &[u8]) -> CodecResult<T>;
fn content_type(&self) -> &'static str;
fn supports_streaming(&self) -> bool {
false
}
fn name(&self) -> &'static str;
}
#[derive(Debug, Clone, Default)]
pub struct JsonCodec {
pub pretty: bool,
}
impl JsonCodec {
pub fn new() -> Self {
Self::default()
}
pub fn pretty() -> Self {
Self { pretty: true }
}
}
impl Codec for JsonCodec {
fn encode<T: Serialize>(&self, value: &T) -> CodecResult<Vec<u8>> {
if self.pretty {
serde_json::to_vec_pretty(value)
} else {
serde_json::to_vec(value)
}
.map_err(|e| CodecError::encode(e.to_string()))
}
fn decode<T: DeserializeOwned>(&self, bytes: &[u8]) -> CodecResult<T> {
serde_json::from_slice(bytes).map_err(|e| CodecError::decode(e.to_string()))
}
fn content_type(&self) -> &'static str {
"application/json"
}
fn supports_streaming(&self) -> bool {
true
}
fn name(&self) -> &'static str {
"json"
}
}
#[cfg(feature = "simd")]
#[cfg_attr(docsrs, doc(cfg(feature = "simd")))]
#[derive(Debug, Clone, Default)]
pub struct SimdJsonCodec;
#[cfg(feature = "simd")]
impl SimdJsonCodec {
pub fn new() -> Self {
Self
}
}
#[cfg(feature = "simd")]
impl Codec for SimdJsonCodec {
fn encode<T: Serialize>(&self, value: &T) -> CodecResult<Vec<u8>> {
sonic_rs::to_vec(value).map_err(|e| CodecError::encode(e.to_string()))
}
fn decode<T: DeserializeOwned>(&self, bytes: &[u8]) -> CodecResult<T> {
sonic_rs::from_slice(bytes).map_err(|e| CodecError::decode(e.to_string()))
}
fn content_type(&self) -> &'static str {
"application/json"
}
fn supports_streaming(&self) -> bool {
true
}
fn name(&self) -> &'static str {
"simd-json"
}
}
#[cfg(feature = "msgpack")]
#[cfg_attr(docsrs, doc(cfg(feature = "msgpack")))]
#[derive(Debug, Clone, Default)]
pub struct MsgPackCodec;
#[cfg(feature = "msgpack")]
impl MsgPackCodec {
pub fn new() -> Self {
Self
}
}
#[cfg(feature = "msgpack")]
impl Codec for MsgPackCodec {
fn encode<T: Serialize>(&self, value: &T) -> CodecResult<Vec<u8>> {
rmp_serde::to_vec_named(value).map_err(|e| CodecError::encode(e.to_string()))
}
fn decode<T: DeserializeOwned>(&self, bytes: &[u8]) -> CodecResult<T> {
rmp_serde::from_slice(bytes).map_err(|e| CodecError::decode(e.to_string()))
}
fn content_type(&self) -> &'static str {
"application/msgpack"
}
fn supports_streaming(&self) -> bool {
false
}
fn name(&self) -> &'static str {
"msgpack"
}
}
const MAX_STREAMING_BUFFER_SIZE: usize = 1024 * 1024;
#[derive(Debug)]
pub struct StreamingJsonDecoder {
buffer: Vec<u8>,
max_buffer_size: usize,
}
impl Default for StreamingJsonDecoder {
fn default() -> Self {
Self::new()
}
}
impl StreamingJsonDecoder {
pub fn new() -> Self {
Self {
buffer: Vec::new(),
max_buffer_size: MAX_STREAMING_BUFFER_SIZE,
}
}
pub fn with_capacity(capacity: usize) -> Self {
Self {
buffer: Vec::with_capacity(capacity),
max_buffer_size: MAX_STREAMING_BUFFER_SIZE,
}
}
pub fn with_max_size(max_size: usize) -> Self {
Self {
buffer: Vec::new(),
max_buffer_size: max_size.min(10 * 1024 * 1024), }
}
pub fn feed(&mut self, data: &[u8]) {
self.buffer.extend_from_slice(data);
if self.buffer.len() > self.max_buffer_size {
#[cfg(feature = "std")]
tracing::warn!(
buffer_size = self.buffer.len(),
max_size = self.max_buffer_size,
"Streaming buffer exceeded maximum size, clearing buffer"
);
self.buffer.clear();
}
}
pub fn try_decode<T: DeserializeOwned>(&mut self) -> CodecResult<Option<T>> {
if let Some(pos) = self.buffer.iter().position(|&b| b == b'\n') {
let line = &self.buffer[..pos];
if line.is_empty() || line.iter().all(|b| b.is_ascii_whitespace()) {
self.buffer.drain(..=pos);
return Ok(None);
}
let result = serde_json::from_slice(line);
self.buffer.drain(..=pos);
match result {
Ok(value) => Ok(Some(value)),
Err(e) => Err(CodecError::decode(e.to_string())),
}
} else {
Ok(None)
}
}
pub fn clear(&mut self) {
self.buffer.clear();
}
pub fn is_empty(&self) -> bool {
self.buffer.is_empty()
}
pub fn len(&self) -> usize {
self.buffer.len()
}
pub fn max_buffer_size(&self) -> usize {
self.max_buffer_size
}
}
#[derive(Debug, Clone)]
pub enum AnyCodec {
Json(JsonCodec),
#[cfg(feature = "simd")]
#[cfg_attr(docsrs, doc(cfg(feature = "simd")))]
SimdJson(SimdJsonCodec),
#[cfg(feature = "msgpack")]
#[cfg_attr(docsrs, doc(cfg(feature = "msgpack")))]
MsgPack(MsgPackCodec),
}
impl AnyCodec {
pub fn from_name(name: &str) -> Option<Self> {
match name {
"json" => Some(Self::Json(JsonCodec::new())),
#[cfg(feature = "simd")]
"simd" | "simd-json" => Some(Self::SimdJson(SimdJsonCodec::new())),
#[cfg(feature = "msgpack")]
"msgpack" => Some(Self::MsgPack(MsgPackCodec::new())),
_ => None,
}
}
pub fn available_names() -> &'static [&'static str] {
&[
"json",
#[cfg(feature = "simd")]
"simd-json",
#[cfg(feature = "msgpack")]
"msgpack",
]
}
pub fn encode<T: Serialize>(&self, value: &T) -> CodecResult<Vec<u8>> {
match self {
Self::Json(c) => c.encode(value),
#[cfg(feature = "simd")]
Self::SimdJson(c) => c.encode(value),
#[cfg(feature = "msgpack")]
Self::MsgPack(c) => c.encode(value),
}
}
pub fn decode<T: DeserializeOwned>(&self, bytes: &[u8]) -> CodecResult<T> {
match self {
Self::Json(c) => c.decode(bytes),
#[cfg(feature = "simd")]
Self::SimdJson(c) => c.decode(bytes),
#[cfg(feature = "msgpack")]
Self::MsgPack(c) => c.decode(bytes),
}
}
pub fn content_type(&self) -> &'static str {
match self {
Self::Json(c) => c.content_type(),
#[cfg(feature = "simd")]
Self::SimdJson(c) => c.content_type(),
#[cfg(feature = "msgpack")]
Self::MsgPack(c) => c.content_type(),
}
}
pub fn name(&self) -> &'static str {
match self {
Self::Json(c) => c.name(),
#[cfg(feature = "simd")]
Self::SimdJson(c) => c.name(),
#[cfg(feature = "msgpack")]
Self::MsgPack(c) => c.name(),
}
}
}
#[cfg(test)]
mod tests {
use super::*;
use serde::{Deserialize, Serialize};
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
struct TestMessage {
id: u32,
method: String,
params: Option<serde_json::Value>,
}
#[test]
fn test_json_codec_roundtrip() {
let codec = JsonCodec::new();
let msg = TestMessage {
id: 42,
method: "test/method".into(),
params: Some(serde_json::json!({"key": "value"})),
};
let encoded = codec.encode(&msg).unwrap();
let decoded: TestMessage = codec.decode(&encoded).unwrap();
assert_eq!(msg, decoded);
}
#[test]
fn test_json_codec_pretty() {
let codec = JsonCodec::pretty();
let msg = TestMessage {
id: 1,
method: "test".into(),
params: None,
};
let encoded = codec.encode(&msg).unwrap();
let output = String::from_utf8(encoded).unwrap();
assert!(output.contains('\n'));
}
#[test]
fn test_codec_content_type() {
let json = JsonCodec::new();
assert_eq!(json.content_type(), "application/json");
assert_eq!(json.name(), "json");
}
#[test]
fn test_streaming_decoder() {
let mut decoder = StreamingJsonDecoder::new();
decoder.feed(br#"{"id":1,"method":"a","params":null}"#);
assert!(decoder.try_decode::<TestMessage>().unwrap().is_none());
decoder.feed(b"\n");
let msg: TestMessage = decoder.try_decode().unwrap().unwrap();
assert_eq!(msg.id, 1);
assert_eq!(msg.method, "a");
}
#[test]
fn test_streaming_decoder_multiple() {
let mut decoder = StreamingJsonDecoder::new();
decoder.feed(
br#"{"id":1,"method":"a","params":null}
{"id":2,"method":"b","params":null}
"#,
);
let msg1: TestMessage = decoder.try_decode().unwrap().unwrap();
assert_eq!(msg1.id, 1);
let msg2: TestMessage = decoder.try_decode().unwrap().unwrap();
assert_eq!(msg2.id, 2);
assert!(decoder.try_decode::<TestMessage>().unwrap().is_none());
}
#[test]
fn test_streaming_decoder_buffer_limit() {
let mut decoder = StreamingJsonDecoder::with_max_size(100);
let large_data = vec![b'x'; 150];
decoder.feed(&large_data);
assert!(
decoder.is_empty(),
"Buffer should be cleared after exceeding limit"
);
}
#[test]
fn test_streaming_decoder_max_size_cap() {
let decoder = StreamingJsonDecoder::with_max_size(100 * 1024 * 1024);
assert_eq!(decoder.max_buffer_size(), 10 * 1024 * 1024);
}
#[test]
fn test_streaming_decoder_default_limit() {
let decoder = StreamingJsonDecoder::new();
assert_eq!(decoder.max_buffer_size(), 1024 * 1024); }
#[test]
fn test_any_codec() {
let codec = AnyCodec::from_name("json").unwrap();
assert_eq!(codec.name(), "json");
assert!(AnyCodec::from_name("unknown").is_none());
assert!(AnyCodec::available_names().contains(&"json"));
}
#[test]
fn test_codec_error() {
let codec = JsonCodec::new();
let result: CodecResult<TestMessage> = codec.decode(b"invalid json");
assert!(result.is_err());
let err = result.unwrap_err();
assert!(err.message.contains("decode"));
}
#[cfg(feature = "simd")]
#[test]
fn test_simd_codec_roundtrip() {
let codec = SimdJsonCodec::new();
let msg = TestMessage {
id: 99,
method: "simd/test".into(),
params: Some(serde_json::json!([1, 2, 3])),
};
let encoded = codec.encode(&msg).unwrap();
let decoded: TestMessage = codec.decode(&encoded).unwrap();
assert_eq!(msg, decoded);
}
#[cfg(feature = "msgpack")]
#[test]
fn test_msgpack_codec_roundtrip() {
let codec = MsgPackCodec::new();
let msg = TestMessage {
id: 77,
method: "msgpack/test".into(),
params: None,
};
let encoded = codec.encode(&msg).unwrap();
let decoded: TestMessage = codec.decode(&encoded).unwrap();
assert_eq!(msg, decoded);
assert_eq!(codec.content_type(), "application/msgpack");
}
}