rsonpath/input/
buffered.rs

//! Acquires a [`Read`] instance and reads it on demand into a buffer.
2//! All of the bytes read are kept in memory.
3//!
4//! Choose this implementation if:
5//!
6//! 1. You have a [`Read`] source that might contain relatively large amounts
7//!    of data.
8//! 2. You want to run the JSONPath query on the input and then discard it.
9//!
10//! ## Performance characteristics
11//!
12//! This is the best choice for a relatively large read-once input that is not a file
13//! (or when memory maps are not supported). It is faster than first reading all of
14//! the contents and then passing them to [`BorrowedBytes`](`super::BorrowedBytes`). It is, however,
//! slow compared to other choices. If you know the approximate length of the input,
//! use the [`with_capacity`](`BufferedInput::with_capacity`) function to avoid
17//! reallocating the internal buffers.
18
19use super::{
20    error::InputError, repr_align_block_size, Input, InputBlock, InputBlockIterator, SliceSeekable, MAX_BLOCK_SIZE,
21};
22use crate::{error::InternalRsonpathError, result::InputRecorder, string_pattern::StringPattern, JSON_SPACE_BYTE};
23use std::{cell::RefCell, io::Read, ops::Deref, slice};
24
25// The buffer has to be a multiple of MAX_BLOCK_SIZE.
26// It could technically be as small as MAX_BLOCK_SIZE, but there is a performance consideration.
27// The fewer reads we make, the smoother the pipeline of the engine can go.
28// 8KB is too little and hurts performance. 64KB appears to be a good compromise.
29const BUF_SIZE: usize = 64 * 1024;
30
31static_assertions::const_assert!(BUF_SIZE >= MAX_BLOCK_SIZE);
32static_assertions::const_assert!(BUF_SIZE % MAX_BLOCK_SIZE == 0);
33
/// Input supporting a buffered read over a [`Read`] implementation.
///
/// The source is read lazily, one fixed-size chunk at a time, and every chunk read is kept
/// in memory for as long as the input lives. The buffer sits in a [`RefCell`] so that the
/// `&self` methods of [`Input`] can pull more data from the source when they run out.
pub struct BufferedInput<R>(RefCell<InternalBuffer<R>>);
36
/// Mutable state behind a [`BufferedInput`]: the reader plus every chunk read from it so far.
struct InternalBuffer<R> {
    /// Reader the bytes are pulled from.
    source: R,
    /// Allocated chunks; the first `chunk_idx` of them make up the view returned by `as_slice`.
    bytes: Vec<BufferedChunk>,
    /// Number of chunks handed out by `read_more` so far. This includes a chunk that may have
    /// received no data at all when end of input was hit.
    chunk_idx: usize,
    /// Total number of bytes actually returned by `source`.
    source_read: usize,
    /// Set once a read from `source` returned zero bytes.
    eof: bool,
}
44
// One fixed-size chunk of buffered input, `BUF_SIZE` bytes long. `InternalBuffer::as_slice`
// reinterprets a run of these as one flat byte slice. That relies on the layout produced by
// `repr_align_block_size!` (documented there as `repr(C)`) — NOTE(review): confirm the
// macro's alignment divides `BUF_SIZE`.
repr_align_block_size! {
    struct BufferedChunk([u8; BUF_SIZE]);
}
48
/// Iterator over a [`BufferedInput`].
///
/// Yields consecutive blocks of `N` bytes, reading more of the source whenever the buffered
/// data runs out, and reports every block it yields to the recorder.
pub struct BufferedInputBlockIterator<'a, 'r, R, IR, const N: usize> {
    /// The input being iterated over.
    input: &'a BufferedInput<R>,
    /// Byte offset of the next block to yield.
    idx: usize,
    /// Recorder notified of each block start.
    recorder: &'r IR,
}
55
/// Block returned from a [`BufferedInputBlockIterator`].
///
/// An owned copy of `N` consecutive input bytes. It is a plain byte array, so it is cheap to
/// copy and can be printed for debugging.
#[derive(Debug, Clone, Copy)]
pub struct BufferedInputBlock<const N: usize>([u8; N]);
58
59impl<R: Read> InternalBuffer<R> {
60    fn as_slice(&self) -> &[u8] {
61        let len = self.len();
62        let ptr = self.bytes.as_slice().as_ptr().cast();
63
64        // SAFETY: BufferedChunk has the same layout as an array of bytes due to repr(C).
65        // `BUF_SIZE >= MAX_BLOCK_SIZE`, and `BUF_SIZE` is a multiple of `MAX_BLOCK_SIZE`
66        // (static asserts at the top), so [BufferedChunk; N] has the same repr as [[u8; BUF_SIZE]; N],
67        // which in turn is guaranteed to have the same repr as [u8; BUF_SIZE * N].
68        // https://doc.rust-lang.org/reference/type-layout.html#array-layout
69        unsafe { slice::from_raw_parts(ptr, len) }
70    }
71
72    fn len(&self) -> usize {
73        self.chunk_idx * BUF_SIZE
74    }
75
76    fn read_more(&mut self) -> Result<bool, InputError> {
77        if self.eof {
78            return Ok(false);
79        }
80
81        if self.chunk_idx == self.bytes.len() {
82            self.bytes.push(BufferedChunk([JSON_SPACE_BYTE; BUF_SIZE]));
83        }
84
85        let buf = &mut self.bytes[self.chunk_idx].0;
86        let mut total = 0;
87        self.chunk_idx += 1;
88
89        while total < BUF_SIZE && !self.eof {
90            let size = self.source.read(&mut buf[total..])?;
91
92            if size == 0 {
93                self.eof = true;
94            }
95
96            total += size;
97            self.source_read += size;
98        }
99
100        Ok(total > 0)
101    }
102}
103
104impl<R: Read> BufferedInput<R> {
105    /// Create a new [`BufferedInput`] reading from the given `source`.
106    #[inline]
107    pub fn new(source: R) -> Self {
108        Self(RefCell::new(InternalBuffer {
109            source,
110            bytes: vec![],
111            eof: false,
112            chunk_idx: 0,
113            source_read: 0,
114        }))
115    }
116
117    /// Create a new [`BufferedInput`] reading from the given `source`,
118    /// preallocating at least `capacity` bytes up front.
119    #[inline]
120    pub fn with_capacity(source: R, capacity: usize) -> Self {
121        let blocks_needed = capacity / MAX_BLOCK_SIZE + 1;
122        Self(RefCell::new(InternalBuffer {
123            source,
124            bytes: Vec::with_capacity(blocks_needed),
125            eof: false,
126            chunk_idx: 0,
127            source_read: 0,
128        }))
129    }
130}
131
132impl<R: Read> Input for BufferedInput<R> {
133    type BlockIterator<'a, 'r, IR, const N: usize>
134        = BufferedInputBlockIterator<'a, 'r, R, IR, N>
135    where
136        Self: 'a,
137        IR: InputRecorder<BufferedInputBlock<N>> + 'r;
138
139    type Error = InputError;
140    type Block<'a, const N: usize>
141        = BufferedInputBlock<N>
142    where
143        Self: 'a;
144
145    #[inline(always)]
146    fn leading_padding_len(&self) -> usize {
147        0
148    }
149
150    #[inline(always)]
151    fn trailing_padding_len(&self) -> usize {
152        let rem = self.0.borrow().source_read % BUF_SIZE;
153        if rem == 0 {
154            0
155        } else {
156            BUF_SIZE - rem
157        }
158    }
159
160    #[inline(always)]
161    fn iter_blocks<'i, 'r, IR, const N: usize>(&'i self, recorder: &'r IR) -> Self::BlockIterator<'i, 'r, IR, N>
162    where
163        IR: InputRecorder<Self::Block<'i, N>>,
164    {
165        BufferedInputBlockIterator {
166            input: self,
167            idx: 0,
168            recorder,
169        }
170    }
171
172    #[inline(always)]
173    fn seek_backward(&self, from: usize, needle: u8) -> Option<usize> {
174        let buf = self.0.borrow();
175        buf.as_slice().seek_backward(from, needle)
176    }
177
178    #[inline]
179    fn seek_forward<const N: usize>(&self, from: usize, needles: [u8; N]) -> Result<Option<(usize, u8)>, InputError> {
180        let mut buf = self.0.borrow_mut();
181        let mut moving_from = from;
182
183        loop {
184            let res = buf.as_slice().seek_forward(moving_from, needles);
185
186            moving_from = buf.len();
187
188            if res.is_some() {
189                return Ok(res);
190            } else if !buf.read_more()? {
191                return Ok(None);
192            }
193        }
194    }
195
196    #[inline]
197    fn seek_non_whitespace_forward(&self, from: usize) -> Result<Option<(usize, u8)>, InputError> {
198        let mut buf = self.0.borrow_mut();
199        let mut moving_from = from;
200
201        loop {
202            let res = buf.as_slice().seek_non_whitespace_forward(moving_from);
203
204            moving_from = buf.len();
205
206            if res.is_some() {
207                return Ok(res);
208            } else if !buf.read_more()? {
209                return Ok(None);
210            }
211        }
212    }
213
214    #[inline(always)]
215    fn seek_non_whitespace_backward(&self, from: usize) -> Option<(usize, u8)> {
216        let buf = self.0.borrow();
217        buf.as_slice().seek_non_whitespace_backward(from)
218    }
219
220    #[inline(always)]
221    fn is_member_match(&self, from: usize, to: usize, member: &StringPattern) -> Result<bool, Self::Error> {
222        let mut buf = self.0.borrow_mut();
223
224        while buf.len() < to {
225            if !buf.read_more()? {
226                return Ok(false);
227            }
228        }
229
230        let bytes = buf.as_slice();
231        let slice = &bytes[from..to];
232        Ok(member.quoted() == slice && (from == 0 || bytes[from - 1] != b'\\'))
233    }
234}
235
236impl<'a, R: Read, IR, const N: usize> InputBlockIterator<'a, N> for BufferedInputBlockIterator<'a, '_, R, IR, N>
237where
238    IR: InputRecorder<BufferedInputBlock<N>>,
239{
240    type Block = BufferedInputBlock<N>;
241    type Error = InputError;
242
243    #[inline]
244    fn next(&mut self) -> Result<Option<Self::Block>, Self::Error> {
245        let buf = self.input.0.borrow();
246
247        if self.idx + N <= buf.len() {
248            let slice = &buf.as_slice()[self.idx..self.idx + N];
249            let block: [u8; N] = slice
250                .try_into()
251                .map_err(|err| InternalRsonpathError::from_error(err, "slice of size N is not of size N"))?;
252            self.idx += N;
253
254            self.recorder.record_block_start(BufferedInputBlock(block));
255
256            Ok(Some(BufferedInputBlock(block)))
257        } else {
258            drop(buf);
259            let mut buf_mut = self.input.0.borrow_mut();
260
261            if !buf_mut.read_more()? {
262                Ok(None)
263            } else {
264                drop(buf_mut);
265                self.next()
266            }
267        }
268    }
269
270    #[inline(always)]
271    fn offset(&mut self, count: isize) {
272        assert!(count >= 0);
273        self.idx += count as usize * N;
274    }
275
276    #[inline(always)]
277    fn get_offset(&self) -> usize {
278        self.idx
279    }
280}
281
282impl<const N: usize> Deref for BufferedInputBlock<N> {
283    type Target = [u8];
284
285    #[inline(always)]
286    fn deref(&self) -> &Self::Target {
287        &self.0
288    }
289}
290
291impl<const N: usize> InputBlock<'_, N> for BufferedInputBlock<N> {
292    #[inline(always)]
293    fn halves(&self) -> (&[u8], &[u8]) {
294        assert_eq!(N % 2, 0);
295        (&self[..N / 2], &self[N / 2..])
296    }
297}