Skip to main content

durable_streams_server/protocol/
json_mode.rs

1use crate::protocol::error::{Error, Result};
2use bytes::{BufMut, Bytes, BytesMut};
3use serde_json::Value;
4
5/// Process JSON data for append: validate and flatten arrays
6///
7/// If the input is a JSON array, returns each element as a separate message.
8/// If the input is a single JSON value, returns it as one message.
9/// Empty arrays return `Ok(vec![])` — callers decide whether that's an error.
10///
11/// # Errors
12///
13/// Returns `Error::InvalidJson` if input is not valid JSON.
14///
15/// # Panics
16///
17/// Panics if serializing validated JSON back to bytes fails, which should never happen.
18pub fn process_append(data: &[u8]) -> Result<Vec<Bytes>> {
19    // Parse JSON
20    let value: Value =
21        serde_json::from_slice(data).map_err(|e| Error::InvalidJson(e.to_string()))?;
22
23    if let Value::Array(arr) = value {
24        // Empty arrays return empty vec — caller decides policy
25        if arr.is_empty() {
26            return Ok(vec![]);
27        }
28
29        // Flatten array: each element becomes a separate message
30        let mut messages = Vec::with_capacity(arr.len());
31        for element in arr {
32            let json_bytes =
33                serde_json::to_vec(&element).expect("serializing validated JSON should not fail");
34            messages.push(Bytes::from(json_bytes));
35        }
36        Ok(messages)
37    } else {
38        // Single value: return as-is
39        let json_bytes =
40            serde_json::to_vec(&value).expect("serializing validated JSON should not fail");
41        Ok(vec![Bytes::from(json_bytes)])
42    }
43}
44
45/// Wrap messages in JSON array for read.
46///
47/// Takes a collection of JSON message bytes and wraps them in a JSON array.
48/// If the input is empty, returns an empty array `[]`.
49///
50/// # Errors
51///
52/// This function currently does not return errors and always returns `Ok`.
53pub fn wrap_read(messages: &[Bytes]) -> Result<Bytes> {
54    wrap_read_iter(messages.iter())
55}
56
57/// Wrap JSON messages (iterator form) in a JSON array for read responses.
58///
59/// Accepts any iterator of `&Bytes` to avoid intermediate allocations
60/// in handler hot paths.
61///
62/// # Errors
63///
64/// This function currently does not return errors and always returns `Ok`.
65pub fn wrap_read_iter<'a, I>(messages: I) -> Result<Bytes>
66where
67    I: IntoIterator<Item = &'a Bytes>,
68{
69    // Stored JSON messages are validated on write; build the array directly
70    // to avoid per-message parse + re-serialize in read hot paths.
71    let iter = messages.into_iter();
72    let (lo, _) = iter.size_hint();
73    // Rough estimate: '[' + ']' + per-message avg ~64 bytes + commas
74    let mut out = BytesMut::with_capacity(2 + lo * 65);
75    out.put_u8(b'[');
76    let mut wrote_any = false;
77    for msg in iter {
78        if wrote_any {
79            out.put_u8(b',');
80        }
81        out.extend_from_slice(msg);
82        wrote_any = true;
83    }
84    out.put_u8(b']');
85    Ok(out.freeze())
86}
87
88/// Check if a content type is JSON
89///
90/// Returns true if the normalized content type is "application/json".
91#[must_use]
92pub fn is_json_content_type(content_type: &str) -> bool {
93    content_type == "application/json"
94}
95
96#[cfg(test)]
97mod tests {
98    use super::*;
99
100    #[test]
101    fn test_process_append_single_value() {
102        let data = br#"{"event":"click","x":100}"#;
103        let result = process_append(data).unwrap();
104
105        assert_eq!(result.len(), 1);
106        assert_eq!(result[0], Bytes::from(r#"{"event":"click","x":100}"#));
107    }
108
109    #[test]
110    fn test_process_append_array_flattening() {
111        let data = br#"[{"a":1},{"b":2}]"#;
112        let result = process_append(data).unwrap();
113
114        assert_eq!(result.len(), 2);
115        assert_eq!(result[0], Bytes::from(r#"{"a":1}"#));
116        assert_eq!(result[1], Bytes::from(r#"{"b":2}"#));
117    }
118
119    #[test]
120    fn test_process_append_empty_array_returns_empty_vec() {
121        let data = b"[]";
122        let result = process_append(data).unwrap();
123        assert!(result.is_empty());
124    }
125
126    #[test]
127    fn test_process_append_invalid_json() {
128        let data = b"{invalid}";
129        let result = process_append(data);
130
131        assert!(matches!(result, Err(Error::InvalidJson(_))));
132    }
133
134    #[test]
135    fn test_process_append_nested_arrays_preserved() {
136        let data = br#"[{"tags":["a","b"]},{"items":[1,2,3]}]"#;
137        let result = process_append(data).unwrap();
138
139        assert_eq!(result.len(), 2);
140        assert_eq!(result[0], Bytes::from(r#"{"tags":["a","b"]}"#));
141        assert_eq!(result[1], Bytes::from(r#"{"items":[1,2,3]}"#));
142    }
143
144    #[test]
145    fn test_wrap_read_empty() {
146        let messages: Vec<Bytes> = vec![];
147        let result = wrap_read(&messages).unwrap();
148
149        assert_eq!(result, Bytes::from("[]"));
150    }
151
152    #[test]
153    fn test_wrap_read_single_message() {
154        let messages = vec![Bytes::from(r#"{"a":1}"#)];
155        let result = wrap_read(&messages).unwrap();
156
157        assert_eq!(result, Bytes::from(r#"[{"a":1}]"#));
158    }
159
160    #[test]
161    fn test_wrap_read_multiple_messages() {
162        let messages = vec![
163            Bytes::from(r#"{"a":1}"#),
164            Bytes::from(r#"{"b":2}"#),
165            Bytes::from(r#"{"c":3}"#),
166        ];
167        let result = wrap_read(&messages).unwrap();
168
169        assert_eq!(result, Bytes::from(r#"[{"a":1},{"b":2},{"c":3}]"#));
170    }
171
172    #[test]
173    fn test_is_json_content_type() {
174        assert!(is_json_content_type("application/json"));
175        assert!(!is_json_content_type("text/plain"));
176        assert!(!is_json_content_type("application/xml"));
177    }
178}