// dbrest_core/app/streaming.rs
//! Streaming response support
//!
//! This module provides functionality to stream large JSON responses
//! instead of loading them entirely into memory.

use axum::body::Body;
use bytes::Bytes;
use futures::stream;
use std::pin::Pin;
use std::task::{Context, Poll};

12/// Stream a JSON array by parsing the body string and streaming it chunk by chunk.
13///
14/// This is useful for large result sets where we want to avoid loading
15/// the entire JSON array into memory at once.
16pub fn stream_json_array(body: String) -> Body {
17    const CHUNK_SIZE: usize = 8192;
18    let bytes: Bytes = Bytes::from(body.into_bytes());
19    let len = bytes.len();
20    let offsets: Vec<usize> = (0..len).step_by(CHUNK_SIZE).collect();
21    let stream = stream::iter(offsets.into_iter().map(move |start| {
22        let end = (start + CHUNK_SIZE).min(len);
23        Ok::<_, std::io::Error>(bytes.slice(start..end))
24    }));
25    Body::from_stream(stream)
26}
27
/// Check if a response body should be streamed based on size and configuration.
///
/// Streaming happens only when it is enabled AND the body is strictly
/// larger than `threshold` bytes.
pub fn should_stream(body_size: usize, streaming_enabled: bool, threshold: u64) -> bool {
    if !streaming_enabled {
        return false;
    }
    body_size as u64 > threshold
}
32
33/// Stream a JSON array response with proper formatting.
34///
35/// Takes a pre-formatted JSON array string and streams it in chunks.
36pub fn stream_json_response(json_body: String) -> Body {
37    const CHUNK_SIZE: usize = 8192;
38    let bytes: Bytes = Bytes::from(json_body.into_bytes());
39    let len = bytes.len();
40    let offsets: Vec<usize> = (0..len).step_by(CHUNK_SIZE).collect();
41    let stream = stream::iter(offsets.into_iter().map(move |start| {
42        let end = (start + CHUNK_SIZE).min(len);
43        Ok::<_, std::io::Error>(bytes.slice(start..end))
44    }));
45    Body::from_stream(stream)
46}
47
/// A stream that yields JSON array elements one at a time.
///
/// This allows streaming very large arrays without loading them all into memory.
///
/// NOTE(review): the source values are already fully materialized in `items`,
/// so what is actually avoided is building the entire serialized output
/// String at once — confirm this matches the intended memory-saving claim.
pub struct JsonArrayStream {
    // Source values awaiting serialization.
    items: Vec<serde_json::Value>,
    // Index into `items` of the next element to serialize.
    current_index: usize,
    // Serialized output accumulated since the last emitted chunk.
    buffer: String,
    // True once the opening '[' has been emitted.
    opened: bool,
    // True once the closing ']' has been emitted; the stream then ends.
    done: bool,
}
58
59impl JsonArrayStream {
60    /// Create a new JSON array stream from a vector of JSON values.
61    pub fn new(items: Vec<serde_json::Value>) -> Self {
62        Self {
63            items,
64            current_index: 0,
65            buffer: String::new(),
66            opened: false,
67            done: false,
68        }
69    }
70
71    /// Get the next chunk of JSON to send.
72    fn next_chunk(&mut self) -> Option<Bytes> {
73        if self.done {
74            return None;
75        }
76
77        if !self.opened {
78            self.buffer.push('[');
79            self.opened = true;
80            // Return opening bracket immediately
81            let chunk = Bytes::from(self.buffer.clone());
82            self.buffer.clear();
83            return Some(chunk);
84        }
85
86        // Stream items one at a time
87        while self.current_index < self.items.len() {
88            if self.current_index > 0 {
89                self.buffer.push(',');
90            }
91
92            // Serialize the current item
93            if let Ok(item_json) = serde_json::to_string(&self.items[self.current_index]) {
94                self.buffer.push_str(&item_json);
95            }
96
97            self.current_index += 1;
98
99            // Return chunk if buffer is large enough, or if it's the last item
100            if self.buffer.len() >= 8192 || self.current_index >= self.items.len() {
101                let chunk = Bytes::from(self.buffer.clone());
102                self.buffer.clear();
103
104                // If this was the last item, we need to close the array next
105                if self.current_index >= self.items.len() {
106                    // Don't mark as done yet - we still need to send the closing bracket
107                }
108
109                return Some(chunk);
110            }
111        }
112
113        // Close the array
114        if self.current_index >= self.items.len() && !self.done {
115            self.buffer.push(']');
116            self.done = true;
117            let chunk = Bytes::from(self.buffer.clone());
118            self.buffer.clear();
119            return Some(chunk);
120        }
121
122        None
123    }
124}
125
126impl futures::Stream for JsonArrayStream {
127    type Item = Result<Bytes, std::io::Error>;
128
129    fn poll_next(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
130        match self.next_chunk() {
131            Some(chunk) => Poll::Ready(Some(Ok(chunk))),
132            None => Poll::Ready(None),
133        }
134    }
135}
136
137/// Stream a vector of JSON values as a JSON array.
138pub fn stream_json_array_from_values(items: Vec<serde_json::Value>) -> Body {
139    let stream = JsonArrayStream::new(items);
140    Body::from_stream(stream)
141}
142
#[cfg(test)]
mod tests {
    use super::*;
    use serde_json::json;

    // End-to-end check of `JsonArrayStream`: stream three objects and verify
    // the collected bytes parse back into a 3-element JSON array.
    #[tokio::test]
    async fn test_json_array_stream() {
        use axum::body::Body;

        let items = vec![
            json!({"id": 1, "name": "Alice"}),
            json!({"id": 2, "name": "Bob"}),
            json!({"id": 3, "name": "Charlie"}),
        ];

        let stream = JsonArrayStream::new(items);
        let body = Body::from_stream(stream);

        // Collect all chunks using axum's body utilities
        let bytes = axum::body::to_bytes(body, 10 * 1024 * 1024).await.unwrap();
        let json_str = String::from_utf8(bytes.to_vec()).unwrap();
        let parsed: serde_json::Value = serde_json::from_str(&json_str).unwrap();

        assert!(parsed.is_array());
        assert_eq!(parsed.as_array().unwrap().len(), 3);
    }

    // Streaming a pre-formatted body must not alter its bytes.
    #[tokio::test]
    async fn test_stream_json_response_preserves_content() {
        let original = r#"[{"id":1},{"id":2},{"id":3}]"#.to_string();
        let body = stream_json_response(original.clone());
        let bytes = axum::body::to_bytes(body, 10 * 1024 * 1024).await.unwrap();
        assert_eq!(String::from_utf8(bytes.to_vec()).unwrap(), original);
    }

    #[tokio::test]
    async fn test_stream_json_response_large_body() {
        // Body larger than CHUNK_SIZE (8192) to test multi-chunk streaming
        let large = "x".repeat(20_000);
        let body = stream_json_response(large.clone());
        let bytes = axum::body::to_bytes(body, 10 * 1024 * 1024).await.unwrap();
        assert_eq!(bytes.len(), 20_000);
        assert_eq!(String::from_utf8(bytes.to_vec()).unwrap(), large);
    }

    // Same byte-preservation check for the `stream_json_array` variant.
    #[tokio::test]
    async fn test_stream_json_array_preserves_content() {
        let original = r#"[1,2,3]"#.to_string();
        let body = stream_json_array(original.clone());
        let bytes = axum::body::to_bytes(body, 10 * 1024 * 1024).await.unwrap();
        assert_eq!(String::from_utf8(bytes.to_vec()).unwrap(), original);
    }

    #[test]
    fn test_should_stream() {
        // Should stream if enabled and size exceeds threshold
        assert!(should_stream(11 * 1024 * 1024, true, 10 * 1024 * 1024));

        // Should not stream if disabled
        assert!(!should_stream(11 * 1024 * 1024, false, 10 * 1024 * 1024));

        // Should not stream if size is below threshold
        assert!(!should_stream(5 * 1024 * 1024, true, 10 * 1024 * 1024));
    }
}