#![deny(missing_docs)]
use std::collections::VecDeque;
use std::ops::Deref;
use std::pin::Pin;
use std::task::{Context, Poll, ready};
use futures::Stream;
use serde::de::DeserializeOwned;
use pin_project::pin_project;
use serde_json::Deserializer;
use tracing::trace;
/// Initial capacity, in bytes, reserved for the internal byte buffer (256 KiB minus one byte).
const DEFAULT_BUFFER_CAPACITY: usize = 256 * 1024 - 1;

/// Maximum buffer capacity, in bytes, used when no explicit limit is supplied
/// (8 MiB minus one byte).
pub const DEFAULT_MAX_BUFFER_CAPACITY: usize = 8 * 1024 * 1024 - 1;
/// Adapter that wraps a stream of byte chunks and decodes JSON values of type `T` from it.
///
/// Incoming bytes are accumulated in an internal buffer; every complete value found in
/// that buffer is decoded and queued until it is handed out.
#[pin_project]
pub struct JsonStream<T, S> {
    /// The wrapped source of byte chunks.
    #[pin]
    stream: S,
    /// Values that have been decoded but not yet yielded.
    entry_buffer: Vec<T>,
    /// Bytes received from `stream` that have not yet been decoded into a value.
    byte_buffer: VecDeque<u8>,
    /// Set once the stream has terminated; both buffers are released at that point.
    finished: bool,
    /// Configured maximum size of `byte_buffer`, in bytes.
    max_buffer_capacity: usize,
}
impl<T, S> JsonStream<T, S>
where
    S: Unpin,
{
    /// Wraps `stream`, using [`DEFAULT_MAX_BUFFER_CAPACITY`] as the maximum buffer capacity.
    pub fn new(stream: S) -> Self {
        Self::new_with_max_capacity(stream, DEFAULT_MAX_BUFFER_CAPACITY)
    }

    /// Wraps `stream` with a caller-supplied maximum buffer capacity of `max_capacity` bytes.
    ///
    /// The byte buffer is pre-allocated with the smaller of the default initial capacity
    /// and `max_capacity`.
    pub fn new_with_max_capacity(stream: S, max_capacity: usize) -> Self {
        let initial_capacity = DEFAULT_BUFFER_CAPACITY.min(max_capacity);
        Self {
            stream,
            entry_buffer: Vec::new(),
            byte_buffer: VecDeque::with_capacity(initial_capacity),
            finished: false,
            max_buffer_capacity: max_capacity,
        }
    }

    /// Replaces the stored maximum buffer capacity with `max_capacity` bytes.
    pub fn set_max_buffer_size(&mut self, max_capacity: usize) {
        self.max_buffer_capacity = max_capacity;
    }

    /// Marks the stream as finished and releases everything held in both buffers.
    fn finish(self: Pin<&mut Self>) {
        let Self {
            finished,
            entry_buffer,
            byte_buffer,
            ..
        } = self.get_mut();

        *finished = true;

        // Drop any queued entries and give their storage back.
        entry_buffer.clear();
        entry_buffer.shrink_to_fit();

        // Same for the undecoded bytes.
        byte_buffer.clear();
        byte_buffer.shrink_to_fit();
    }
}
/// Yields every JSON value of type `T` that can be decoded from the byte chunks produced by
/// the wrapped stream, in the order the values appear in the byte stream.
///
/// Termination rules:
/// - an error from the wrapped stream is forwarded once as `Some(Err(_))`, then the stream ends;
/// - the stream ends when the wrapped stream ends (undecoded leftover bytes are discarded);
/// - the stream ends, without an error item, when appending the next chunk would make the
///   byte buffer larger than the configured maximum buffer capacity.
impl<T, S, B, E> Stream for JsonStream<T, S>
where
    T: DeserializeOwned,
    B: Deref<Target = [u8]>,
    S: Stream<Item = Result<B, E>> + Unpin,
{
    type Item = Result<T, E>;

    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        if self.finished {
            return Poll::Ready(None);
        }

        let mut this = self.as_mut().project();
        // Honor the limit configured on this instance rather than the crate-wide default.
        let max_buffer_capacity = *this.max_buffer_capacity;

        loop {
            // Hand out already-decoded entries first. The buffer is stored in reverse order
            // (see below), so `pop` returns the oldest pending entry.
            if let Some(entry) = this.entry_buffer.pop() {
                return Poll::Ready(Some(Ok(entry)));
            }

            // Nothing queued: pull the next chunk, returning `Pending` if none is ready yet.
            let next_chunk = match ready!(this.stream.as_mut().poll_next(cx)) {
                Some(Ok(chunk)) => chunk,
                Some(Err(err)) => {
                    self.finish();
                    return Poll::Ready(Some(Err(err)));
                }
                None => {
                    self.finish();
                    return Poll::Ready(None);
                }
            };

            // Give up if the chunk does not fit (or the size computation overflows).
            let fits = matches!(
                this.byte_buffer.len().checked_add(next_chunk.len()),
                Some(new_size) if new_size <= max_buffer_capacity
            );
            if !fits {
                self.finish();
                return Poll::Ready(None);
            }

            this.byte_buffer.extend(&*next_chunk);

            // The deserializer needs one contiguous slice covering all buffered bytes.
            let buffer: &[u8] = this.byte_buffer.make_contiguous();
            let mut json_iter = Deserializer::from_slice(buffer).into_iter::<T>();
            let mut last_read_pos = 0;

            while let Some(result) = json_iter.next() {
                match result {
                    Ok(entry) => {
                        // Remember where the last complete value ended.
                        last_read_pos = json_iter.byte_offset();
                        this.entry_buffer.push(entry);
                    }
                    Err(err) => {
                        // Not fatal: this may simply be a value whose remaining bytes
                        // have not arrived yet, so keep the bytes and wait for more.
                        trace!(err = ?err, "failed to parse json entry");
                        break;
                    }
                }
            }

            // `entry_buffer` was empty before this batch, so reversing it makes the
            // `pop` at the top of the loop yield entries in the order they were decoded.
            this.entry_buffer.reverse();

            // Discard the bytes that were decoded, then re-align what is left so the
            // next chunk is appended to an already contiguous buffer.
            let _ = this.byte_buffer.drain(..last_read_pos);
            this.byte_buffer.make_contiguous();
        }
    }
}