json_stream/lib.rs
#![deny(missing_docs)]

//! A library to parse Newline Delimited JSON values from a byte stream.
//!
//! # Example
//!
//! ```rust
//! use json_stream::JsonStream;
//! use futures::stream::once;
//! use futures::StreamExt;
//! use serde::Deserialize;
//!
//! #[derive(Deserialize, Debug)]
//! struct Foo {
//!     bar: String,
//! }
//!
//! #[tokio::main]
//! async fn main() {
//!     // This could be any stream that yields bytes, such as a file stream or a network stream.
//!     let pinned_bytes_future = Box::pin(async {
//!         Ok::<_, std::io::Error>("{\"bar\": \"foo\"}\n{\"bar\": \"qux\"}\n{\"bar\": \"baz\"}".as_bytes())
//!     });
//!     let mut json_stream = JsonStream::<Foo, _>::new(once(pinned_bytes_future));
//!
//!     while let Some(Ok(foo)) = json_stream.next().await {
//!         println!("{:?}", foo);
//!     }
//! }
//! ```
use std::collections::VecDeque;
use std::ops::Deref;
use std::pin::Pin;
use std::task::{Context, Poll, ready};
use futures::Stream;
use serde::de::DeserializeOwned;
use pin_project::pin_project;
use serde_json::Deserializer;
use tracing::trace;

// should be 2^n - 1 for VecDeque to work efficiently
const DEFAULT_BUFFER_CAPACITY: usize = 1024 * 256 - 1; // ~256 KiB
/// The default maximum buffer capacity for the [`JsonStream`]. This is the maximum number of
/// bytes that will be buffered before the stream is terminated.
pub const DEFAULT_MAX_BUFFER_CAPACITY: usize = 1024 * 1024 * 8 - 1; // ~8 MiB

/// A [`Stream`] implementation that can be used to parse Newline Delimited JSON values from a byte stream.
/// It does so by buffering bytes internally and parsing them as they are received.
/// This means that the stream will not yield values until a full JSON value has been received.
///
/// After a full JSON value has been parsed and yielded, the stream removes the bytes that were used
/// to parse the value from the internal buffer. This means that the stream will not use more memory
/// than is necessary to hold the specified maximum buffer size, plus any JSON values previously
/// parsed but not yet yielded.
#[pin_project]
pub struct JsonStream<T, S> {
    #[pin]
    stream: S,
    entry_buffer: VecDeque<T>,
    byte_buffer: VecDeque<u8>,
    finished: bool,
    max_buffer_capacity: usize,
}

impl<T, S: Unpin> JsonStream<T, S> {
    /// Create a new [`JsonStream`] with the default maximum buffer capacity.
    pub fn new(stream: S) -> Self {
        Self::new_with_max_capacity(stream, DEFAULT_MAX_BUFFER_CAPACITY)
    }

    /// Create a new [`JsonStream`] with a custom maximum buffer capacity.
    ///
    /// The maximum buffer capacity is the maximum number of bytes that will be buffered before
    /// the stream is terminated. This prevents malformed streams from causing the server to run
    /// out of memory.
    ///
    /// As a rule of thumb, this number should be at least as large as the largest entry in the stream.
    /// Additionally, it's best if it's a power of 2 minus 1 (e.g. 1023, 2047, 4095, etc.), as this
    /// allows the internal buffer to be used more efficiently. This is not a requirement, however.
    ///
    /// Lastly, this is not guaranteed to be the maximum amount of memory the stream will use:
    /// the internal buffer _may_ allocate up to twice the number of bytes specified, and some
    /// space in the buffer may be wasted.
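    ///
    /// For example, to cap the internal buffer at roughly 1 MiB (a value chosen here purely for
    /// illustration; any stream of byte chunks works):
    ///
    /// ```rust
    /// use json_stream::JsonStream;
    /// use futures::stream;
    ///
    /// let bytes = stream::iter(vec![Ok::<_, std::io::Error>("{}".as_bytes())]);
    /// let json_stream = JsonStream::<serde_json::Value, _>::new_with_max_capacity(bytes, 1024 * 1024 - 1);
    /// # drop(json_stream);
    /// ```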
    pub fn new_with_max_capacity(stream: S, max_capacity: usize) -> Self {
        Self {
            stream,
            entry_buffer: VecDeque::new(),
            byte_buffer: VecDeque::with_capacity(std::cmp::min(DEFAULT_BUFFER_CAPACITY, max_capacity)),
            finished: false,
            max_buffer_capacity: max_capacity,
        }
    }

    /// Controls how large the internal buffer can grow, in bytes. If the buffer grows larger than
    /// this, the stream is terminated, as it is assumed that the stream is malformed. If this number
    /// is too large, a malformed stream can cause the server to run out of memory.
    ///
    /// As a rule of thumb, this number should be at least as large as the largest entry in the stream.
    ///
    /// The default value is 8 MiB.
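    ///
    /// For example, raising the cap for streams whose entries may be large (the 16 MiB figure is
    /// purely illustrative):
    ///
    /// ```rust
    /// use json_stream::JsonStream;
    /// use futures::stream;
    ///
    /// let bytes = stream::iter(vec![Ok::<_, std::io::Error>("{}".as_bytes())]);
    /// let mut json_stream = JsonStream::<serde_json::Value, _>::new(bytes);
    /// // 2^24 - 1, following the power-of-two-minus-one rule of thumb
    /// json_stream.set_max_buffer_size(1024 * 1024 * 16 - 1);
    /// ```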
    pub fn set_max_buffer_size(&mut self, max_capacity: usize) {
        self.max_buffer_capacity = max_capacity;
    }

    /// Marks this stream as "finished", meaning that no more entries will be read from the
    /// underlying stream. While this stream is likely going to be dropped soon, we might as well
    /// clear memory we do not need to use.
    fn finish(mut self: Pin<&mut Self>) {
        self.finished = true;
        self.entry_buffer.clear();
        self.entry_buffer.shrink_to_fit();
        self.byte_buffer.clear();
        self.byte_buffer.shrink_to_fit();
    }
}

impl<T, S, B, E> Stream for JsonStream<T, S>
where
    T: DeserializeOwned,
    B: Deref<Target = [u8]>,
    S: Stream<Item = Result<B, E>> + Unpin,
{
    type Item = Result<T, E>;

    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        // efficiently check if we should stop
        if self.finished {
            return Poll::Ready(None);
        }

        let mut this = self.as_mut().project();

        loop {
            // if we have a parsed entry ready, return it immediately (front first, so entries are
            // yielded in the order they were parsed)
            if let Some(entry) = this.entry_buffer.pop_front() {
                return Poll::Ready(Some(Ok(entry)));
            }

            // try to fetch the next chunk
            let next_chunk = loop {
                match ready!(this.stream.as_mut().poll_next(cx)) {
                    Some(Ok(chunk)) => break chunk,
                    Some(Err(err)) => {
                        self.finish();
                        return Poll::Ready(Some(Err(err)));
                    },
                    None => {
                        self.finish();
                        return Poll::Ready(None);
                    }
                }
            };

            // if there is no room for this chunk, we should give up
            match this.byte_buffer.len().checked_add(next_chunk.len()) {
                Some(new_size) if new_size > *this.max_buffer_capacity => {
                    // the configured maximum buffer capacity would be exceeded
                    self.finish();
                    return Poll::Ready(None);
                },
                None => {
                    // overflow occurred
                    self.finish();
                    return Poll::Ready(None);
                }
                _ => {}
            }

            // room is available, so let's add the chunk
            this.byte_buffer.extend(&*next_chunk);

            // because we inserted more data into the VecDeque, we need to restore its contiguous layout
            this.byte_buffer.make_contiguous();
            // after make_contiguous, all of the data is located in the first slice
            let (buffer, _) = this.byte_buffer.as_slices();
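            // serde_json's StreamDeserializer parses one complete value at a time from the slice
            // and reports, via byte_offset(), how many bytes it has consumed, so we can later
            // drain exactly the fully parsed prefix from the buffer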
            let mut json_iter = Deserializer::from_slice(buffer).into_iter::<T>();
            let mut last_read_pos = 0;

            // read each entry from the buffer
            loop {
                match json_iter.next() {
                    Some(Ok(entry)) => {
                        last_read_pos = json_iter.byte_offset();
                        this.entry_buffer.push_back(entry);
                    },
                    // if there was an error, log it but move on, because this could be a partial entry
                    Some(Err(err)) => {
                        trace!(err = ?err, "failed to parse json entry");
                        break
                    },
                    // nothing left, so we move on
                    None => break
                }
            }

            // remove the read bytes - this is very efficient as it's a ring buffer
            let _ = this.byte_buffer.drain(..last_read_pos);
            // realign the buffer to the beginning so we can get contiguous slices
            // we do this while the read bytes are removed, as the operation is then a straight memcpy;
            // if we waited until after adding bytes again, it could devolve into a much slower operation
            this.byte_buffer.make_contiguous();
        }
    }
}
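
#[cfg(test)]
mod tests {
    //! A minimal sketch of the buffering behavior described on [`JsonStream`]: a value split
    //! across two chunks is held in the internal buffer until the rest of it arrives. Assumes
    //! `tokio` (with the `macros` and `rt` features) and `futures` are available as
    //! dev-dependencies.

    use super::JsonStream;
    use futures::{stream, StreamExt};

    #[tokio::test]
    async fn parses_values_split_across_chunks() {
        // the second value is cut in half across the chunk boundary
        let chunks = vec![
            Ok::<_, std::io::Error>(&b"{\"a\": 1}\n{\"a\""[..]),
            Ok(&b": 2}\n"[..]),
        ];
        let mut json_stream = JsonStream::<serde_json::Value, _>::new(stream::iter(chunks));

        let mut seen = Vec::new();
        while let Some(Ok(value)) = json_stream.next().await {
            seen.push(value["a"].as_i64().unwrap());
        }
        assert_eq!(seen, vec![1, 2]);
    }
}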