json_stream/
lib.rs

#![deny(missing_docs)]

//! A library to parse Newline Delimited JSON values from a byte stream.
//!
//! # Example
//!
//! ```rust
//! use json_stream::JsonStream;
//! use futures::stream::once;
//! use futures::StreamExt;
//! use serde::Deserialize;
//!
//! #[derive(Deserialize, Debug)]
//! struct Foo {
//!     bar: String,
//! }
//!
//! #[tokio::main]
//! async fn main() {
//!     // This could be any stream that yields bytes, such as a file stream or a network stream.
//!     let pinned_bytes_future = Box::pin(async {
//!         Ok::<_, std::io::Error>("{\"bar\": \"foo\"}\n{\"bar\": \"qux\"}\n{\"bar\": \"baz\"}".as_bytes())
//!     });
//!     let mut json_stream = JsonStream::<Foo, _>::new(once(pinned_bytes_future));
//!
//!     while let Some(Ok(foo)) = json_stream.next().await {
//!         println!("{:?}", foo);
//!     }
//! }
//! ```

use std::collections::VecDeque;
use std::ops::Deref;
use std::pin::Pin;
use std::task::{Context, Poll, ready};

use futures::Stream;
use pin_project::pin_project;
use serde::de::DeserializeOwned;
use serde_json::Deserializer;
use tracing::trace;

// should be 2^n - 1 for VecDeque to work efficiently
const DEFAULT_BUFFER_CAPACITY: usize = 1024 * 256 - 1; // 256 KB
/// The default maximum buffer capacity for a [`JsonStream`]: the maximum number of bytes that will
/// be buffered before the stream is terminated.
pub const DEFAULT_MAX_BUFFER_CAPACITY: usize = 1024 * 1024 * 8 - 1; // 8 MB

/// A [`Stream`] implementation that parses Newline Delimited JSON values from a byte stream.
/// It does so by buffering bytes internally and parsing them as they are received.
/// This means that the stream will not yield values until a full JSON value has been received.
///
/// After a full JSON value has been parsed and yielded, the stream removes the bytes that were used
/// to parse it from the internal buffer. This means that the stream will not use more memory than
/// is necessary to hold the specified maximum buffer size, plus any JSON values that have been
/// parsed but not yet yielded.
#[pin_project]
pub struct JsonStream<T, S> {
    #[pin]
    stream: S,
    entry_buffer: VecDeque<T>,
    byte_buffer: VecDeque<u8>,
    finished: bool,
    max_buffer_capacity: usize,
}

impl<T, S: Unpin> JsonStream<T, S> {
    /// Create a new [`JsonStream`] with the default maximum buffer capacity.
    pub fn new(stream: S) -> Self {
        Self::new_with_max_capacity(stream, DEFAULT_MAX_BUFFER_CAPACITY)
    }

    /// Create a new [`JsonStream`] with a custom maximum buffer capacity.
    ///
    /// The maximum buffer capacity is the maximum number of bytes that will be buffered before the
    /// stream is terminated. This is to prevent malformed streams from causing the server to run
    /// out of memory.
    ///
    /// As a rule of thumb, this number should be at least as large as the largest entry in the stream.
    /// Additionally, it's best if it's a power of 2 minus 1 (e.g. 1023, 2047, 4095, etc.) as this
    /// allows the internal buffer to be more efficient. This is not a requirement, however.
    ///
    /// Lastly, this is not guaranteed to be the maximum amount of memory used by the stream:
    /// the internal buffer _may_ allocate up to twice the number of bytes specified and may also
    /// waste some space in the buffer.
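    ///
    /// # Example
    ///
    /// A minimal sketch of constructing a stream with a custom limit, assuming a
    /// `futures::stream::iter` source of byte chunks (the `Event` type here is hypothetical):
    ///
    /// ```rust
    /// use json_stream::JsonStream;
    /// use futures::stream::iter;
    /// use serde::Deserialize;
    ///
    /// #[derive(Deserialize)]
    /// struct Event { id: u64 }
    ///
    /// let chunks = vec![Ok::<_, std::io::Error>("{\"id\": 1}\n".as_bytes())];
    /// // Allow at most ~1 MB of buffered bytes before the stream gives up.
    /// let _stream = JsonStream::<Event, _>::new_with_max_capacity(iter(chunks), 1024 * 1024 - 1);
    /// ```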
    pub fn new_with_max_capacity(stream: S, max_capacity: usize) -> Self {
        Self {
            stream,
            entry_buffer: VecDeque::new(),
            byte_buffer: VecDeque::with_capacity(std::cmp::min(DEFAULT_BUFFER_CAPACITY, max_capacity)),
            finished: false,
            max_buffer_capacity: max_capacity,
        }
    }

    /// Controls how large the internal buffer can grow, in bytes. If the buffer grows larger than
    /// this, the stream is terminated, as it is assumed that the stream is malformed. If this number
    /// is too large, a malformed stream can cause the server to run out of memory.
    ///
    /// As a rule of thumb, this number should be at least as large as the largest entry in the stream.
    ///
    /// The default value is 8 MB.
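    ///
    /// # Example
    ///
    /// A minimal sketch, assuming a `futures::stream::iter` source of byte chunks:
    ///
    /// ```rust
    /// use json_stream::JsonStream;
    /// use futures::stream::iter;
    ///
    /// let chunks = vec![Ok::<_, std::io::Error>("{}\n".as_bytes())];
    /// let mut stream = JsonStream::<serde_json::Value, _>::new(iter(chunks));
    /// // Raise the limit for streams that contain unusually large entries.
    /// stream.set_max_buffer_size(1024 * 1024 * 16 - 1);
    /// ```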
    pub fn set_max_buffer_size(&mut self, max_capacity: usize) {
        self.max_buffer_capacity = max_capacity;
    }

    /// Marks this stream as "finished", which means that no more entries will be read from the
    /// underlying stream. While this stream is likely going to be dropped soon, we might as well
    /// release the memory we no longer need.
    fn finish(mut self: Pin<&mut Self>) {
        self.finished = true;
        self.entry_buffer.clear();
        self.entry_buffer.shrink_to_fit();
        self.byte_buffer.clear();
        self.byte_buffer.shrink_to_fit();
    }
}

impl<T, S, B, E> Stream for JsonStream<T, S>
where
    T: DeserializeOwned,
    B: Deref<Target = [u8]>,
    S: Stream<Item = Result<B, E>> + Unpin,
{
    type Item = Result<T, E>;

    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        // efficiently check if we should stop
        if self.finished {
            return Poll::Ready(None);
        }

        let mut this = self.as_mut().project();

        loop {
            // if we have an entry ready, return it immediately, preserving the order it was parsed in
            if let Some(entry) = this.entry_buffer.pop_front() {
                return Poll::Ready(Some(Ok(entry)));
            }

            // try to fetch the next chunk
            let next_chunk = match ready!(this.stream.as_mut().poll_next(cx)) {
                Some(Ok(chunk)) => chunk,
                Some(Err(err)) => {
                    self.finish();
                    return Poll::Ready(Some(Err(err)));
                },
                None => {
                    self.finish();
                    return Poll::Ready(None);
                }
            };

            // if there is no room for this chunk, we should give up
            match this.byte_buffer.len().checked_add(next_chunk.len()) {
                Some(new_size) if new_size > *this.max_buffer_capacity => {
                    // the configured maximum buffer capacity would be exceeded
                    self.finish();
                    return Poll::Ready(None);
                },
                None => {
                    // overflow occurred
                    self.finish();
                    return Poll::Ready(None);
                }
                _ => {}
            }

            // room is available, so let's add the chunk
            this.byte_buffer.extend(&*next_chunk);

            // because we just inserted more data, make sure the VecDeque's contents are laid out contiguously
            this.byte_buffer.make_contiguous();
            // after make_contiguous, all of the data is in the first slice
            let (buffer, _) = this.byte_buffer.as_slices();
            let mut json_iter = Deserializer::from_slice(buffer).into_iter::<T>();
            let mut last_read_pos = 0;

            // read each entry from the buffer
            loop {
                match json_iter.next() {
                    Some(Ok(entry)) => {
                        last_read_pos = json_iter.byte_offset();
                        this.entry_buffer.push_back(entry);
                    },
                    // if there was an error, log it but move on, because this could just be a partial entry
                    Some(Err(err)) => {
                        trace!(err = ?err, "failed to parse json entry");
                        break;
                    },
                    // nothing left, so we move on
                    None => break,
                }
            }

            // remove the bytes we have read - this is efficient because it's a ring buffer
            let _ = this.byte_buffer.drain(..last_read_pos);
            // realign the buffer to the beginning so we can get contiguous slices next time
            // we want to do this while the read bytes are removed because the operation is then a simple memcpy;
            // if we waited until after more bytes were added, it could devolve into a much slower operation
            this.byte_buffer.make_contiguous();
        }
    }
}
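
// A minimal test sketch of the buffering behaviour described above: a JSON value split across two
// chunks is held in the byte buffer until it is complete, and entries are yielded in the order they
// appear in the stream. This assumes `tokio` (with the `macros` and `rt` features) and `futures`
// are available as dev-dependencies, as the crate-level example already requires.
#[cfg(test)]
mod tests {
    use super::*;
    use futures::stream::iter;
    use futures::StreamExt;
    use serde::Deserialize;

    #[derive(Deserialize, Debug)]
    struct Entry {
        id: u32,
    }

    #[tokio::test]
    async fn yields_entries_split_across_chunks_in_order() {
        // the second value starts in the first chunk and is only completed by the second chunk
        let chunks = vec![
            Ok::<_, std::io::Error>("{\"id\": 1}\n{\"id\"".as_bytes()),
            Ok(": 2}\n".as_bytes()),
        ];
        let mut stream = JsonStream::<Entry, _>::new(iter(chunks));

        let mut ids = Vec::new();
        while let Some(Ok(entry)) = stream.next().await {
            ids.push(entry.id);
        }

        assert_eq!(ids, vec![1, 2]);
    }
}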