durable_streams_server/protocol/
json_mode.rs1use crate::protocol::error::{Error, Result};
2use bytes::{BufMut, Bytes, BytesMut};
3use serde_json::Value;
4
5pub fn process_append(data: &[u8]) -> Result<Vec<Bytes>> {
19 let value: Value =
21 serde_json::from_slice(data).map_err(|e| Error::InvalidJson(e.to_string()))?;
22
23 if let Value::Array(arr) = value {
24 if arr.is_empty() {
26 return Ok(vec![]);
27 }
28
29 let mut messages = Vec::with_capacity(arr.len());
31 for element in arr {
32 let json_bytes =
33 serde_json::to_vec(&element).expect("serializing validated JSON should not fail");
34 messages.push(Bytes::from(json_bytes));
35 }
36 Ok(messages)
37 } else {
38 let json_bytes =
40 serde_json::to_vec(&value).expect("serializing validated JSON should not fail");
41 Ok(vec![Bytes::from(json_bytes)])
42 }
43}
44
45pub fn wrap_read(messages: &[Bytes]) -> Result<Bytes> {
54 wrap_read_iter(messages.iter())
55}
56
57pub fn wrap_read_iter<'a, I>(messages: I) -> Result<Bytes>
66where
67 I: IntoIterator<Item = &'a Bytes>,
68{
69 let iter = messages.into_iter();
72 let (lo, _) = iter.size_hint();
73 let mut out = BytesMut::with_capacity(2 + lo * 65);
75 out.put_u8(b'[');
76 let mut wrote_any = false;
77 for msg in iter {
78 if wrote_any {
79 out.put_u8(b',');
80 }
81 out.extend_from_slice(msg);
82 wrote_any = true;
83 }
84 out.put_u8(b']');
85 Ok(out.freeze())
86}
87
88#[must_use]
92pub fn is_json_content_type(content_type: &str) -> bool {
93 content_type == "application/json"
94}
95
96#[cfg(test)]
97mod tests {
98 use super::*;
99
100 #[test]
101 fn test_process_append_single_value() {
102 let data = br#"{"event":"click","x":100}"#;
103 let result = process_append(data).unwrap();
104
105 assert_eq!(result.len(), 1);
106 assert_eq!(result[0], Bytes::from(r#"{"event":"click","x":100}"#));
107 }
108
109 #[test]
110 fn test_process_append_array_flattening() {
111 let data = br#"[{"a":1},{"b":2}]"#;
112 let result = process_append(data).unwrap();
113
114 assert_eq!(result.len(), 2);
115 assert_eq!(result[0], Bytes::from(r#"{"a":1}"#));
116 assert_eq!(result[1], Bytes::from(r#"{"b":2}"#));
117 }
118
119 #[test]
120 fn test_process_append_empty_array_returns_empty_vec() {
121 let data = b"[]";
122 let result = process_append(data).unwrap();
123 assert!(result.is_empty());
124 }
125
126 #[test]
127 fn test_process_append_invalid_json() {
128 let data = b"{invalid}";
129 let result = process_append(data);
130
131 assert!(matches!(result, Err(Error::InvalidJson(_))));
132 }
133
134 #[test]
135 fn test_process_append_nested_arrays_preserved() {
136 let data = br#"[{"tags":["a","b"]},{"items":[1,2,3]}]"#;
137 let result = process_append(data).unwrap();
138
139 assert_eq!(result.len(), 2);
140 assert_eq!(result[0], Bytes::from(r#"{"tags":["a","b"]}"#));
141 assert_eq!(result[1], Bytes::from(r#"{"items":[1,2,3]}"#));
142 }
143
144 #[test]
145 fn test_wrap_read_empty() {
146 let messages: Vec<Bytes> = vec![];
147 let result = wrap_read(&messages).unwrap();
148
149 assert_eq!(result, Bytes::from("[]"));
150 }
151
152 #[test]
153 fn test_wrap_read_single_message() {
154 let messages = vec![Bytes::from(r#"{"a":1}"#)];
155 let result = wrap_read(&messages).unwrap();
156
157 assert_eq!(result, Bytes::from(r#"[{"a":1}]"#));
158 }
159
160 #[test]
161 fn test_wrap_read_multiple_messages() {
162 let messages = vec![
163 Bytes::from(r#"{"a":1}"#),
164 Bytes::from(r#"{"b":2}"#),
165 Bytes::from(r#"{"c":3}"#),
166 ];
167 let result = wrap_read(&messages).unwrap();
168
169 assert_eq!(result, Bytes::from(r#"[{"a":1},{"b":2},{"c":3}]"#));
170 }
171
172 #[test]
173 fn test_is_json_content_type() {
174 assert!(is_json_content_type("application/json"));
175 assert!(!is_json_content_type("text/plain"));
176 assert!(!is_json_content_type("application/xml"));
177 }
178}