hyperi_rustlib/worker/engine/
parse.rs1use super::types::PayloadFormat;
19
20#[derive(Debug)]
22pub enum ParseError {
23 Empty,
25 Json(sonic_rs::Error),
27 MsgPack(String),
29 UnsupportedFormat(&'static str),
31}
32
33impl std::fmt::Display for ParseError {
34 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
35 match self {
36 Self::Empty => write!(f, "empty payload"),
37 Self::Json(e) => write!(f, "json parse error: {e}"),
38 Self::MsgPack(msg) => write!(f, "msgpack decode error: {msg}"),
39 Self::UnsupportedFormat(msg) => write!(f, "unsupported format: {msg}"),
40 }
41 }
42}
43
44impl std::error::Error for ParseError {
45 fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
46 match self {
47 Self::Json(e) => Some(e),
48 _ => None,
49 }
50 }
51}
52
53pub fn parse_payload(payload: &[u8], format: PayloadFormat) -> Result<sonic_rs::Value, ParseError> {
68 if payload.is_empty() {
69 return Err(ParseError::Empty);
70 }
71
72 let effective = match format {
73 PayloadFormat::Auto => PayloadFormat::detect(payload),
74 other => other,
75 };
76
77 match effective {
78 PayloadFormat::Json | PayloadFormat::Auto => {
80 sonic_rs::from_slice(payload).map_err(ParseError::Json)
81 }
82 PayloadFormat::MsgPack => {
83 #[cfg(feature = "worker-msgpack")]
84 {
85 let mut cursor: &[u8] = payload;
93 let value = rmpv::decode::read_value(&mut cursor)
94 .map_err(|e| ParseError::MsgPack(e.to_string()))?;
95 Ok(rmpv_to_sonic(&value))
96 }
97 #[cfg(not(feature = "worker-msgpack"))]
98 {
99 Err(ParseError::UnsupportedFormat(
100 "msgpack requires the worker-msgpack feature",
101 ))
102 }
103 }
104 }
105}
106
/// Recursively convert a decoded MessagePack value into a `sonic_rs::Value`.
///
/// Mapping applied:
/// - `Nil` and extension values (`Ext`) become `null`.
/// - Integers become `i64` when they fit, otherwise `u64`; anything else is `null`.
/// - `F32` is widened to `f64`. Any float that `Value::new_f64` refuses (it returns
///   `None`, presumably for NaN/±inf — confirm against sonic_rs docs) becomes `null`.
/// - Strings are copied as-is when valid UTF-8, otherwise decoded lossily
///   (invalid sequences become U+FFFD).
/// - Binary blobs are decoded lossily as UTF-8 text.
/// - Arrays and maps are converted element by element; map keys are stringified
///   with `msgpack_key_to_string`.
#[cfg(feature = "worker-msgpack")]
fn rmpv_to_sonic(value: &rmpv::Value) -> sonic_rs::Value {
    use rmpv::Value as Mp;
    use sonic_rs::Value as Json;

    let float_or_null = |x: f64| Json::new_f64(x).unwrap_or_else(Json::new_null);

    match value {
        Mp::Nil | Mp::Ext(..) => Json::new_null(),
        Mp::Boolean(flag) => Json::new_bool(*flag),
        Mp::Integer(int) => match (int.as_i64(), int.as_u64()) {
            (Some(signed), _) => Json::new_i64(signed),
            (None, Some(unsigned)) => Json::new_u64(unsigned),
            (None, None) => Json::new_null(),
        },
        Mp::F32(single) => float_or_null(f64::from(*single)),
        Mp::F64(double) => float_or_null(*double),
        Mp::String(utf8) => {
            if let Some(valid) = utf8.as_str() {
                Json::from(valid)
            } else {
                Json::from(String::from_utf8_lossy(utf8.as_bytes()))
            }
        }
        Mp::Binary(raw) => Json::from(String::from_utf8_lossy(raw)),
        Mp::Array(elements) => {
            let mut list = sonic_rs::Array::new();
            elements
                .iter()
                .for_each(|element| list.push(rmpv_to_sonic(element)));
            Json::from(list)
        }
        Mp::Map(entries) => {
            let mut object = sonic_rs::Object::new();
            for (raw_key, raw_val) in entries.iter() {
                let name = msgpack_key_to_string(raw_key);
                object.insert(&name, rmpv_to_sonic(raw_val));
            }
            Json::from(object)
        }
    }
}
177
/// Turn a MessagePack map key into a JSON object key (JSON keys must be strings).
///
/// - String keys: used verbatim when valid UTF-8, otherwise decoded lossily.
/// - Integer and boolean keys: rendered with their `Display` impl.
/// - `Nil`: the literal `"null"`.
/// - Any other value (floats, arrays, maps, binary, ext): rendered with
///   `rmpv::Value`'s `Display` impl.
#[cfg(feature = "worker-msgpack")]
fn msgpack_key_to_string(key: &rmpv::Value) -> String {
    match key {
        rmpv::Value::String(raw) => raw.as_str().map_or_else(
            || String::from_utf8_lossy(raw.as_bytes()).into_owned(),
            str::to_owned,
        ),
        rmpv::Value::Integer(number) => number.to_string(),
        rmpv::Value::Boolean(flag) => flag.to_string(),
        rmpv::Value::Nil => String::from("null"),
        fallback => fallback.to_string(),
    }
}
195
#[cfg(test)]
mod tests {
    use sonic_rs::JsonValueTrait as _;

    use super::*;

    #[test]
    fn parse_valid_json() {
        let payload = br#"{"host": "web1", "status": 200}"#;
        let value = parse_payload(payload, PayloadFormat::Json).unwrap();
        assert_eq!(value.get("host").and_then(|v| v.as_str()), Some("web1"));
        assert_eq!(value.get("status").and_then(|v| v.as_u64()), Some(200));
    }

    #[test]
    fn parse_auto_detects_json() {
        let payload = br#"{"_table": "events"}"#;
        let value = parse_payload(payload, PayloadFormat::Auto).unwrap();
        assert_eq!(value.get("_table").and_then(|v| v.as_str()), Some("events"));
    }

    #[test]
    fn parse_invalid_json_returns_error() {
        let payload = b"this is not json {";
        let result = parse_payload(payload, PayloadFormat::Json);
        assert!(
            matches!(result, Err(ParseError::Json(_))),
            "expected Json error, got {result:?}"
        );
    }

    #[test]
    fn parse_empty_payload_returns_empty_error() {
        let result = parse_payload(b"", PayloadFormat::Json);
        assert!(
            matches!(result, Err(ParseError::Empty)),
            "expected Empty error, got {result:?}"
        );
    }

    #[test]
    fn parse_empty_payload_auto_returns_empty_error() {
        let result = parse_payload(b"", PayloadFormat::Auto);
        assert!(matches!(result, Err(ParseError::Empty)));
    }

    #[test]
    fn parse_nested_json() {
        let payload = br#"{"meta": {"source": "kafka", "version": 3}, "data": [1, 2, 3]}"#;
        let value = parse_payload(payload, PayloadFormat::Json).unwrap();

        let meta = value.get("meta").expect("meta object present");
        assert_eq!(meta.get("source").and_then(|v| v.as_str()), Some("kafka"));
        assert_eq!(meta.get("version").and_then(|v| v.as_u64()), Some(3));

        let data = value.get("data").expect("data array present");
        assert_eq!(data[0].as_i64(), Some(1));
        assert_eq!(data[1].as_i64(), Some(2));
        assert_eq!(data[2].as_i64(), Some(3));
    }

    #[test]
    fn parse_json_with_unicode() {
        let payload = "{\"name\": \"caf\\u00e9\"}".as_bytes();
        let value = parse_payload(payload, PayloadFormat::Json).unwrap();
        // The JSON `\u00e9` escape must decode to the real character.
        assert_eq!(
            value.get("name").and_then(|v| v.as_str()),
            Some("caf\u{e9}")
        );
    }

    #[test]
    fn parse_error_display_empty() {
        let e = ParseError::Empty;
        assert_eq!(e.to_string(), "empty payload");
    }

    #[test]
    fn parse_error_display_msgpack_message() {
        let e = ParseError::MsgPack("boom".to_string());
        assert_eq!(e.to_string(), "msgpack decode error: boom");
    }

    #[test]
    fn parse_error_json_display_and_source() {
        use std::error::Error as _;

        let err = parse_payload(b"{", PayloadFormat::Json).unwrap_err();
        assert!(
            err.to_string().starts_with("json parse error: "),
            "unexpected display: {err}"
        );
        assert!(err.source().is_some(), "json errors expose their source");
        assert!(ParseError::Empty.source().is_none());
    }

    #[test]
    fn parse_error_display_msgpack_unsupported() {
        #[cfg(not(feature = "worker-msgpack"))]
        {
            // {"key": 1} as MessagePack.
            let payload: &[u8] = &[0x81, 0xa3, b'k', b'e', b'y', 0x01];
            let result = parse_payload(payload, PayloadFormat::MsgPack);
            assert!(
                matches!(result, Err(ParseError::UnsupportedFormat(_))),
                "expected UnsupportedFormat, got {result:?}"
            );
        }
        #[cfg(feature = "worker-msgpack")]
        {
            // With the feature on, MessagePack is supported; just check Display.
            let e = ParseError::UnsupportedFormat("test");
            assert!(e.to_string().contains("test"));
        }
    }

    #[cfg(feature = "worker-msgpack")]
    mod msgpack_native {
        use super::*;

        /// Encode `s` as a MessagePack fixstr (header byte `0xa0 | len`).
        fn fixstr(s: &str) -> Vec<u8> {
            let bytes = s.as_bytes();
            // fixstr only has 5 length bits; a longer string would corrupt the header.
            assert!(bytes.len() < 32, "fixstr requires len < 32");
            let mut out = vec![0xa0 | u8::try_from(bytes.len()).expect("len < 32")];
            out.extend_from_slice(bytes);
            out
        }

        /// `{"_table": "events", "org_id": 42, "live": true, "ratio": 1.5, "missing": nil}`
        fn sample() -> Vec<u8> {
            let mut buf = vec![0x80 | 5]; // fixmap with 5 entries
            buf.extend(fixstr("_table"));
            buf.extend(fixstr("events"));
            buf.extend(fixstr("org_id"));
            buf.push(42); // positive fixint
            buf.extend(fixstr("live"));
            buf.push(0xc3); // true
            buf.extend(fixstr("ratio"));
            buf.push(0xcb); // float 64, big-endian payload follows
            buf.extend_from_slice(&1.5f64.to_be_bytes());
            buf.extend(fixstr("missing"));
            buf.push(0xc0); // nil
            buf
        }

        #[test]
        fn msgpack_native_decode_extracts_string_field() {
            let value = parse_payload(&sample(), PayloadFormat::MsgPack).unwrap();
            assert_eq!(value.get("_table").and_then(|v| v.as_str()), Some("events"));
        }

        #[test]
        fn msgpack_native_decode_preserves_scalar_types() {
            let value = parse_payload(&sample(), PayloadFormat::MsgPack).unwrap();
            assert_eq!(value.get("org_id").and_then(|v| v.as_i64()), Some(42));
            assert_eq!(value.get("live").and_then(|v| v.as_bool()), Some(true));
            assert_eq!(value.get("ratio").and_then(|v| v.as_f64()), Some(1.5));
            assert!(value.get("missing").is_some_and(|v| v.is_null()));
        }

        #[test]
        fn msgpack_auto_detects_and_decodes_natively() {
            let value = parse_payload(&sample(), PayloadFormat::Auto).unwrap();
            assert_eq!(value.get("_table").and_then(|v| v.as_str()), Some("events"));
        }

        #[test]
        fn msgpack_nested_array_and_map_walk() {
            // {"items": [1, 2], "meta": {"k": "v"}}
            let mut buf = vec![0x80 | 2];
            buf.extend(fixstr("items"));
            buf.push(0x90 | 2); // fixarray with 2 elements
            buf.push(1);
            buf.push(2);
            buf.extend(fixstr("meta"));
            buf.push(0x80 | 1);
            buf.extend(fixstr("k"));
            buf.extend(fixstr("v"));

            let value = parse_payload(&buf, PayloadFormat::MsgPack).unwrap();
            let items = value.get("items").unwrap();
            assert_eq!(items[0].as_i64(), Some(1));
            assert_eq!(items[1].as_i64(), Some(2));
            assert_eq!(
                value
                    .get("meta")
                    .and_then(|m| m.get("k"))
                    .and_then(|v| v.as_str()),
                Some("v")
            );
        }

        #[test]
        fn msgpack_non_string_key_is_stringified() {
            // {1: "a"}
            let mut buf = vec![0x80 | 1, 0x01];
            buf.extend(fixstr("a"));

            let value = parse_payload(&buf, PayloadFormat::MsgPack).unwrap();
            assert_eq!(value.get("1").and_then(|v| v.as_str()), Some("a"));
        }

        #[test]
        fn msgpack_binary_becomes_string_and_ext_becomes_null() {
            // {"b": bin8 "hi", "e": fixext1(type 1, 0x00)}
            let mut buf = vec![0x80 | 2];
            buf.extend(fixstr("b"));
            buf.extend_from_slice(&[0xc4, 0x02, b'h', b'i']);
            buf.extend(fixstr("e"));
            buf.extend_from_slice(&[0xd4, 0x01, 0x00]);

            let value = parse_payload(&buf, PayloadFormat::MsgPack).unwrap();
            assert_eq!(value.get("b").and_then(|v| v.as_str()), Some("hi"));
            assert!(value.get("e").is_some_and(|v| v.is_null()));
        }

        #[test]
        fn malformed_msgpack_returns_msgpack_error() {
            // A fixmap header promising one entry, with no entry bytes.
            let result = parse_payload(&[0x81], PayloadFormat::MsgPack);
            assert!(
                matches!(result, Err(ParseError::MsgPack(_))),
                "expected MsgPack error, got {result:?}"
            );
        }
    }
}