dbrest_core/app/
streaming.rs1use axum::body::Body;
7use bytes::Bytes;
8use futures::stream;
9use std::pin::Pin;
10use std::task::{Context, Poll};
11
12pub fn stream_json_array(body: String) -> Body {
17 const CHUNK_SIZE: usize = 8192;
18 let bytes: Bytes = Bytes::from(body.into_bytes());
19 let len = bytes.len();
20 let offsets: Vec<usize> = (0..len).step_by(CHUNK_SIZE).collect();
21 let stream = stream::iter(offsets.into_iter().map(move |start| {
22 let end = (start + CHUNK_SIZE).min(len);
23 Ok::<_, std::io::Error>(bytes.slice(start..end))
24 }));
25 Body::from_stream(stream)
26}
27
28pub fn should_stream(body_size: usize, streaming_enabled: bool, threshold: u64) -> bool {
30 streaming_enabled && (body_size as u64) > threshold
31}
32
33pub fn stream_json_response(json_body: String) -> Body {
37 const CHUNK_SIZE: usize = 8192;
38 let bytes: Bytes = Bytes::from(json_body.into_bytes());
39 let len = bytes.len();
40 let offsets: Vec<usize> = (0..len).step_by(CHUNK_SIZE).collect();
41 let stream = stream::iter(offsets.into_iter().map(move |start| {
42 let end = (start + CHUNK_SIZE).min(len);
43 Ok::<_, std::io::Error>(bytes.slice(start..end))
44 }));
45 Body::from_stream(stream)
46}
47
48pub struct JsonArrayStream {
52 items: Vec<serde_json::Value>,
53 current_index: usize,
54 buffer: String,
55 opened: bool,
56 done: bool,
57}
58
59impl JsonArrayStream {
60 pub fn new(items: Vec<serde_json::Value>) -> Self {
62 Self {
63 items,
64 current_index: 0,
65 buffer: String::new(),
66 opened: false,
67 done: false,
68 }
69 }
70
71 fn next_chunk(&mut self) -> Option<Bytes> {
73 if self.done {
74 return None;
75 }
76
77 if !self.opened {
78 self.buffer.push('[');
79 self.opened = true;
80 let chunk = Bytes::from(self.buffer.clone());
82 self.buffer.clear();
83 return Some(chunk);
84 }
85
86 while self.current_index < self.items.len() {
88 if self.current_index > 0 {
89 self.buffer.push(',');
90 }
91
92 if let Ok(item_json) = serde_json::to_string(&self.items[self.current_index]) {
94 self.buffer.push_str(&item_json);
95 }
96
97 self.current_index += 1;
98
99 if self.buffer.len() >= 8192 || self.current_index >= self.items.len() {
101 let chunk = Bytes::from(self.buffer.clone());
102 self.buffer.clear();
103
104 if self.current_index >= self.items.len() {
106 }
108
109 return Some(chunk);
110 }
111 }
112
113 if self.current_index >= self.items.len() && !self.done {
115 self.buffer.push(']');
116 self.done = true;
117 let chunk = Bytes::from(self.buffer.clone());
118 self.buffer.clear();
119 return Some(chunk);
120 }
121
122 None
123 }
124}
125
126impl futures::Stream for JsonArrayStream {
127 type Item = Result<Bytes, std::io::Error>;
128
129 fn poll_next(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
130 match self.next_chunk() {
131 Some(chunk) => Poll::Ready(Some(Ok(chunk))),
132 None => Poll::Ready(None),
133 }
134 }
135}
136
137pub fn stream_json_array_from_values(items: Vec<serde_json::Value>) -> Body {
139 let stream = JsonArrayStream::new(items);
140 Body::from_stream(stream)
141}
142
143#[cfg(test)]
144mod tests {
145 use super::*;
146 use serde_json::json;
147
148 #[tokio::test]
149 async fn test_json_array_stream() {
150 use axum::body::Body;
151
152 let items = vec![
153 json!({"id": 1, "name": "Alice"}),
154 json!({"id": 2, "name": "Bob"}),
155 json!({"id": 3, "name": "Charlie"}),
156 ];
157
158 let stream = JsonArrayStream::new(items);
159 let body = Body::from_stream(stream);
160
161 let bytes = axum::body::to_bytes(body, 10 * 1024 * 1024).await.unwrap();
163 let json_str = String::from_utf8(bytes.to_vec()).unwrap();
164 let parsed: serde_json::Value = serde_json::from_str(&json_str).unwrap();
165
166 assert!(parsed.is_array());
167 assert_eq!(parsed.as_array().unwrap().len(), 3);
168 }
169
170 #[tokio::test]
171 async fn test_stream_json_response_preserves_content() {
172 let original = r#"[{"id":1},{"id":2},{"id":3}]"#.to_string();
173 let body = stream_json_response(original.clone());
174 let bytes = axum::body::to_bytes(body, 10 * 1024 * 1024).await.unwrap();
175 assert_eq!(String::from_utf8(bytes.to_vec()).unwrap(), original);
176 }
177
178 #[tokio::test]
179 async fn test_stream_json_response_large_body() {
180 let large = "x".repeat(20_000);
182 let body = stream_json_response(large.clone());
183 let bytes = axum::body::to_bytes(body, 10 * 1024 * 1024).await.unwrap();
184 assert_eq!(bytes.len(), 20_000);
185 assert_eq!(String::from_utf8(bytes.to_vec()).unwrap(), large);
186 }
187
188 #[tokio::test]
189 async fn test_stream_json_array_preserves_content() {
190 let original = r#"[1,2,3]"#.to_string();
191 let body = stream_json_array(original.clone());
192 let bytes = axum::body::to_bytes(body, 10 * 1024 * 1024).await.unwrap();
193 assert_eq!(String::from_utf8(bytes.to_vec()).unwrap(), original);
194 }
195
196 #[test]
197 fn test_should_stream() {
198 assert!(should_stream(11 * 1024 * 1024, true, 10 * 1024 * 1024));
200
201 assert!(!should_stream(11 * 1024 * 1024, false, 10 * 1024 * 1024));
203
204 assert!(!should_stream(5 * 1024 * 1024, true, 10 * 1024 * 1024));
206 }
207}