#![deny(missing_docs)]

//! A library to parse Newline Delimited JSON values from a byte stream.
//!
//! # Example
//!
//! ```rust
//! use json_stream::JsonStream;
//! use futures::stream::once;
//! use futures::StreamExt;
//! use serde::Deserialize;
//!
//! #[derive(Deserialize, Debug)]
//! struct Foo {
//!    bar: String,
//! }
//!
//! #[tokio::main]
//! async fn main() {
//!     // This could be any stream that yields bytes, such as a file stream or a network stream.
//!     let pinned_bytes_future = Box::pin(async {
//!         Ok::<_, std::io::Error>("{\"bar\": \"foo\"}\n{\"bar\": \"qux\"}\n{\"bar\": \"baz\"}".as_bytes())
//!     });
//!     let mut json_stream = JsonStream::<Foo, _>::new(once(pinned_bytes_future));
//!
//!     while let Some(Ok(foo)) = json_stream.next().await {
//!         println!("{:?}", foo);
//!     }
//! }
//! ```
use std::collections::VecDeque;
use std::ops::Deref;
use std::pin::Pin;
use std::task::{Context, Poll, ready};
use futures::Stream;
use serde::de::DeserializeOwned;
use pin_project::pin_project;
use serde_json::Deserializer;
use tracing::trace;

// should be 2^n - 1 for VecDeque to work efficiently
const DEFAULT_BUFFER_CAPACITY: usize = 1024 * 256 - 1; // 256KB
/// The default maximum buffer capacity for a [`JsonStream`]: the maximum number of bytes that will
/// be buffered before the stream is terminated, unless a different limit is configured.
pub const DEFAULT_MAX_BUFFER_CAPACITY: usize = 1024 * 1024 * 8 - 1; // 8 MB

/// A [`Stream`] implementation that can be used to parse Newline Delimited JSON values from a byte stream.
/// It does so by buffering bytes internally and parsing them as they are received.
/// This means that the stream will not yield values until a full JSON value has been received.
///
/// After a full JSON value has been parsed and yielded, the stream removes the bytes that were used
/// to parse that value from the internal buffer. This means that the stream will not use more memory
/// than is necessary to hold the specified maximum buffer size, plus any JSON values that have been
/// parsed but not yet yielded.
#[pin_project]
pub struct JsonStream<T, S> {
    #[pin]
    stream: S,
    entry_buffer: VecDeque<T>,
    byte_buffer: VecDeque<u8>,
    finished: bool,
    max_buffer_capacity: usize,
}

impl<T, S: Unpin> JsonStream<T, S> {
    /// Create a new [`JsonStream`] with the default buffer capacity.
    pub fn new(stream: S) -> Self {
        Self::new_with_max_capacity(stream, DEFAULT_MAX_BUFFER_CAPACITY)
    }

    /// Create a new [`JsonStream`] with a custom maximum buffer capacity.
    ///
    /// The maximum buffer capacity is the maximum amount of bytes that will be buffered before the
    /// stream is terminated. This is to prevent malformed streams from causing the server to run out
    /// of memory.
    ///
    /// As a rule of thumb, this number should be at least as large as the largest entry in the stream.
    /// Additionally, it's best if it's a power of 2 minus 1 (e.g. 1023, 2047, 4095, etc.) as this
    /// allows the internal buffer to be more efficient. This is not a requirement, however.
    ///
    /// Lastly, it is not guaranteed that this is the maximum amount of memory that will be used by
    /// the stream. This is because the internal buffer _may_ allocate 2x the amount of bytes specified
    /// as well as waste some space in the buffer.
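    ///
    /// For example, a minimal sketch of constructing a stream with a custom limit (the in-memory
    /// byte source is purely illustrative; any `Stream` of `Result<impl Deref<Target = [u8]>, E>`
    /// works the same way):
    ///
    /// ```rust
    /// use json_stream::JsonStream;
    /// use futures::stream::once;
    /// use serde_json::Value;
    ///
    /// // A single-chunk byte stream standing in for a file or network source.
    /// let bytes = Box::pin(async { Ok::<_, std::io::Error>(&b"{\"bar\": 1}\n"[..]) });
    /// // Allow the internal buffer to grow to roughly 1 MiB before the stream is terminated.
    /// let json_stream = JsonStream::<Value, _>::new_with_max_capacity(once(bytes), 1024 * 1024 - 1);
    /// ```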
    pub fn new_with_max_capacity(stream: S, max_capacity: usize) -> Self {
        Self {
            stream,
            entry_buffer: VecDeque::new(),
            byte_buffer: VecDeque::with_capacity(std::cmp::min(DEFAULT_BUFFER_CAPACITY, max_capacity)),
            finished: false,
            max_buffer_capacity: max_capacity
        }
    }

    /// Controls how large the internal buffer can grow, in bytes. If the buffer grows larger than this,
    /// the stream is terminated, as it is assumed that the stream is malformed. If this number is too
    /// large, a malformed stream can cause the server to run out of memory.
    ///
    /// As a rule of thumb, this number should be at least as large as the largest entry in the stream.
    ///
    /// The default value is 8 MB.
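    ///
    /// A minimal sketch (the cap chosen here and the in-memory byte source are only illustrative):
    ///
    /// ```rust
    /// use json_stream::JsonStream;
    /// use futures::stream::once;
    /// use serde_json::Value;
    ///
    /// let bytes = Box::pin(async { Ok::<_, std::io::Error>(&b"{}\n"[..]) });
    /// let mut json_stream = JsonStream::<Value, _>::new(once(bytes));
    /// // Raise the cap to 16 MiB for streams expected to contain unusually large entries.
    /// json_stream.set_max_buffer_size(1024 * 1024 * 16 - 1);
    /// ```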
    pub fn set_max_buffer_size(&mut self, max_capacity: usize) {
        self.max_buffer_capacity = max_capacity;
    }

    /// Marks this stream as "finished" which means that no more entries will be read from the
    /// underlying stream. While this stream is likely going to be dropped soon, we might as well
    /// clear memory we do not need to use.
    fn finish(mut self: Pin<&mut Self>) {
        self.finished = true;
        self.entry_buffer.clear();
        self.entry_buffer.shrink_to_fit();
        self.byte_buffer.clear();
        self.byte_buffer.shrink_to_fit();
    }
}

impl<T, S, B, E> Stream for JsonStream<T, S>
    where
        T: DeserializeOwned,
        B: Deref<Target = [u8]>,
        S: Stream<Item = Result<B, E>> + Unpin
{
    type Item = Result<T, E>;

    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        // efficiently check if we should stop
        if self.finished {
            return Poll::Ready(None);
        }

        let mut this = self.as_mut().project();

        loop {
            // if we have a buffered entry, yield it immediately (FIFO, so input order is preserved)
            if let Some(entry) = this.entry_buffer.pop_front() {
                return Poll::Ready(Some(Ok(entry)));
            }

            // try to fetch the next chunk
            let next_chunk = loop {
                match ready!(this.stream.as_mut().poll_next(cx)) {
                    Some(Ok(chunk)) => break chunk,
                    Some(Err(err)) => {
                        self.finish();
                        return Poll::Ready(Some(Err(err)));
                    },
                    None => {
                        self.finish();
                        return Poll::Ready(None);
                    }
                }
            };

            // if there is no room for this chunk, we should give up
            match this.byte_buffer.len().checked_add(next_chunk.len()) {
                Some(new_size) if new_size > *this.max_buffer_capacity => {
                    // no room for this chunk
                    self.finish();
                    return Poll::Ready(None);
                },
                None => {
                    // overflow occurred
                    self.finish();
                    return Poll::Ready(None);
                }
                _ => {}
            }

            // room is available, so let's add the chunk
            this.byte_buffer.extend(&*next_chunk);

            // the buffer is realigned after every drain (see `make_contiguous` below), but extending it
            // can still wrap around the ring buffer, so make it contiguous again to guarantee that a
            // single slice contains all of the buffered bytes before parsing
            let buffer: &[u8] = this.byte_buffer.make_contiguous();
            let mut json_iter = Deserializer::from_slice(buffer).into_iter::<T>();
            let mut last_read_pos = 0;

            // read each entry from the buffer
            loop {
                match json_iter.next() {
                    Some(Ok(entry)) => {
                        last_read_pos = json_iter.byte_offset();
                        this.entry_buffer.push_back(entry);
                    },
                    // if there was an error, log it and stop parsing for now, as this could just be
                    // a partial entry that will complete once more bytes arrive
                    Some(Err(err)) => {
                        trace!(err = ?err, "failed to parse json entry");
                        break
                    },
                    // nothing left to parse, so move on
                    None => break
                }
            }

            // remove the read bytes - this is very efficient as it's a ring buffer
            let _ = this.byte_buffer.drain(..last_read_pos);
            // realign the buffer to the beginning so we can get contiguous slices
            // we want to do this with all of the read bytes removed because this operation becomes a memcpy
            // if we waited until after we added bytes again, it could devolve into a much slower operation
            this.byte_buffer.make_contiguous();
        }
    }
}