use super::types::PayloadFormat;
#[derive(Debug)]
pub enum ParseError {
Empty,
Json(sonic_rs::Error),
MsgPack(String),
UnsupportedFormat(&'static str),
}
impl std::fmt::Display for ParseError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Self::Empty => write!(f, "empty payload"),
Self::Json(e) => write!(f, "json parse error: {e}"),
Self::MsgPack(msg) => write!(f, "msgpack decode error: {msg}"),
Self::UnsupportedFormat(msg) => write!(f, "unsupported format: {msg}"),
}
}
}
impl std::error::Error for ParseError {
fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
match self {
Self::Json(e) => Some(e),
_ => None,
}
}
}
pub fn parse_payload(payload: &[u8], format: PayloadFormat) -> Result<sonic_rs::Value, ParseError> {
if payload.is_empty() {
return Err(ParseError::Empty);
}
let effective = match format {
PayloadFormat::Auto => PayloadFormat::detect(payload),
other => other,
};
match effective {
PayloadFormat::Json | PayloadFormat::Auto => {
sonic_rs::from_slice(payload).map_err(ParseError::Json)
}
PayloadFormat::MsgPack => {
#[cfg(feature = "worker-msgpack")]
{
let mut cursor: &[u8] = payload;
let value = rmpv::decode::read_value(&mut cursor)
.map_err(|e| ParseError::MsgPack(e.to_string()))?;
Ok(rmpv_to_sonic(&value))
}
#[cfg(not(feature = "worker-msgpack"))]
{
Err(ParseError::UnsupportedFormat(
"msgpack requires the worker-msgpack feature",
))
}
}
}
}
#[cfg(feature = "worker-msgpack")]
fn rmpv_to_sonic(value: &rmpv::Value) -> sonic_rs::Value {
use rmpv::Value as M;
use sonic_rs::Value as S;
let from_f64 = |f: f64| S::new_f64(f).unwrap_or_else(S::new_null);
match value {
M::Nil | M::Ext(_, _) => S::new_null(),
M::Boolean(b) => S::new_bool(*b),
M::Integer(i) => {
if let Some(n) = i.as_i64() {
S::new_i64(n)
} else if let Some(n) = i.as_u64() {
S::new_u64(n)
} else {
S::new_null()
}
}
M::F32(f) => from_f64(f64::from(*f)),
M::F64(f) => from_f64(*f),
M::String(s) => match s.as_str() {
Some(text) => S::from(text),
None => S::from(String::from_utf8_lossy(s.as_bytes())),
},
M::Binary(bytes) => S::from(String::from_utf8_lossy(bytes)),
M::Array(items) => {
let mut arr = sonic_rs::Array::new();
for item in items {
arr.push(rmpv_to_sonic(item));
}
S::from(arr)
}
M::Map(pairs) => {
let mut obj = sonic_rs::Object::new();
for (k, v) in pairs {
let key = msgpack_key_to_string(k);
obj.insert(&key, rmpv_to_sonic(v));
}
S::from(obj)
}
}
}
#[cfg(feature = "worker-msgpack")]
fn msgpack_key_to_string(key: &rmpv::Value) -> String {
use rmpv::Value as M;
match key {
M::String(s) => match s.as_str() {
Some(text) => text.to_string(),
None => String::from_utf8_lossy(s.as_bytes()).into_owned(),
},
M::Integer(i) => i.to_string(),
M::Boolean(b) => b.to_string(),
M::Nil => "null".to_string(),
other => format!("{other}"),
}
}
#[cfg(test)]
mod tests {
use sonic_rs::JsonValueTrait as _;
use super::*;
#[test]
fn parse_valid_json() {
let payload = br#"{"host": "web1", "status": 200}"#;
let value = parse_payload(payload, PayloadFormat::Json).unwrap();
assert_eq!(value.get("host").and_then(|v| v.as_str()), Some("web1"));
assert_eq!(value.get("status").and_then(|v| v.as_u64()), Some(200));
}
#[test]
fn parse_auto_detects_json() {
let payload = br#"{"_table": "events"}"#;
let value = parse_payload(payload, PayloadFormat::Auto).unwrap();
assert_eq!(value.get("_table").and_then(|v| v.as_str()), Some("events"));
}
#[test]
fn parse_invalid_json_returns_error() {
let payload = b"this is not json {";
let result = parse_payload(payload, PayloadFormat::Json);
assert!(
matches!(result, Err(ParseError::Json(_))),
"expected Json error, got {result:?}"
);
}
#[test]
fn parse_empty_payload_returns_empty_error() {
let result = parse_payload(b"", PayloadFormat::Json);
assert!(
matches!(result, Err(ParseError::Empty)),
"expected Empty error, got {result:?}"
);
}
#[test]
fn parse_empty_payload_auto_returns_empty_error() {
let result = parse_payload(b"", PayloadFormat::Auto);
assert!(matches!(result, Err(ParseError::Empty)));
}
#[test]
fn parse_nested_json() {
let payload = br#"{"meta": {"source": "kafka", "version": 3}, "data": [1, 2, 3]}"#;
let value = parse_payload(payload, PayloadFormat::Json).unwrap();
assert!(value.get("meta").is_some());
assert!(value.get("data").is_some());
let meta = value.get("meta").unwrap();
assert_eq!(meta.get("source").and_then(|v| v.as_str()), Some("kafka"));
}
#[test]
fn parse_json_with_unicode() {
let payload = "{\"name\": \"caf\\u00e9\"}".as_bytes();
let value = parse_payload(payload, PayloadFormat::Json).unwrap();
assert!(value.get("name").is_some());
}
#[test]
fn parse_error_display_empty() {
let e = ParseError::Empty;
assert_eq!(e.to_string(), "empty payload");
}
#[test]
fn parse_error_display_msgpack_unsupported() {
#[cfg(not(feature = "worker-msgpack"))]
{
let payload: &[u8] = &[0x81, 0xa3, b'k', b'e', b'y', 0x01];
let result = parse_payload(payload, PayloadFormat::MsgPack);
assert!(
matches!(result, Err(ParseError::UnsupportedFormat(_))),
"expected UnsupportedFormat, got {result:?}"
);
}
#[cfg(feature = "worker-msgpack")]
{
let e = ParseError::UnsupportedFormat("test");
assert!(e.to_string().contains("test"));
}
}
#[cfg(feature = "worker-msgpack")]
mod msgpack_native {
use super::*;
fn fixstr(s: &str) -> Vec<u8> {
let bytes = s.as_bytes();
let mut out = vec![0xa0 | u8::try_from(bytes.len()).expect("len < 32")];
out.extend_from_slice(bytes);
out
}
fn sample() -> Vec<u8> {
let mut buf = vec![0x80 | 5]; buf.extend(fixstr("_table"));
buf.extend(fixstr("events"));
buf.extend(fixstr("org_id"));
buf.push(42); buf.extend(fixstr("live"));
buf.push(0xc3); buf.extend(fixstr("ratio"));
buf.push(0xcb); buf.extend_from_slice(&1.5f64.to_be_bytes());
buf.extend(fixstr("missing"));
buf.push(0xc0); buf
}
#[test]
fn msgpack_native_decode_extracts_string_field() {
let value = parse_payload(&sample(), PayloadFormat::MsgPack).unwrap();
assert_eq!(value.get("_table").and_then(|v| v.as_str()), Some("events"));
}
#[test]
fn msgpack_native_decode_preserves_scalar_types() {
let value = parse_payload(&sample(), PayloadFormat::MsgPack).unwrap();
assert_eq!(value.get("org_id").and_then(|v| v.as_i64()), Some(42));
assert_eq!(value.get("live").and_then(|v| v.as_bool()), Some(true));
assert_eq!(value.get("ratio").and_then(|v| v.as_f64()), Some(1.5));
assert!(value.get("missing").is_some_and(|v| v.is_null()));
}
#[test]
fn msgpack_auto_detects_and_decodes_natively() {
let value = parse_payload(&sample(), PayloadFormat::Auto).unwrap();
assert_eq!(value.get("_table").and_then(|v| v.as_str()), Some("events"));
}
#[test]
fn msgpack_nested_array_and_map_walk() {
let mut buf = vec![0x80 | 2];
buf.extend(fixstr("items"));
buf.push(0x90 | 2); buf.push(1);
buf.push(2);
buf.extend(fixstr("meta"));
buf.push(0x80 | 1); buf.extend(fixstr("k"));
buf.extend(fixstr("v"));
let value = parse_payload(&buf, PayloadFormat::MsgPack).unwrap();
let items = value.get("items").unwrap();
assert_eq!(items[0].as_i64(), Some(1));
assert_eq!(items[1].as_i64(), Some(2));
assert_eq!(
value
.get("meta")
.and_then(|m| m.get("k"))
.and_then(|v| v.as_str()),
Some("v")
);
}
#[test]
fn malformed_msgpack_returns_msgpack_error() {
let result = parse_payload(&[0x81], PayloadFormat::MsgPack);
assert!(
matches!(result, Err(ParseError::MsgPack(_))),
"expected MsgPack error, got {result:?}"
);
}
}
}