ion_rs/
data_source.rs

1use std::fs::File;
2use std::io;
3use std::io::{BufRead, BufReader, Read, StdinLock};
4
5use crate::result::{decoding_error, IonError, IonResult};
6
7/// Optimized read operations for parsing Ion.
8///
9/// The [binary Ion spec](https://amazon-ion.github.io/ion-docs/docs/binary.html) calls for a number of reading
10/// patterns, including:
11///
12/// * Type descriptor octets (value headers) require that a single byte be read from input.
13/// * Variable length integers (both signed and unsigned) require that a single byte at a time be
14///   read from the data source until some condition is met.
15/// * Fixed length values require that `n` bytes be read from the data source and interpreted as a
16///   single value.
17/// * Skipping over values, partial or whole, requires that the next `n` bytes of the data source be
18///   ignored altogether.
19///
20/// The IonDataSource trait extends functionality offered by the [BufRead] trait by providing
21/// methods that are tailored to these use cases. They have been optimized to prefer operating
22/// on data that's already in the input buffer in-place rather than copying it out to another
23/// byte array.
24pub trait IonDataSource: BufRead {
25    /// Ignore the next `number_of_bytes` bytes in the data source.
26    fn skip_bytes(&mut self, number_of_bytes: usize) -> IonResult<()> {
27        let mut bytes_skipped = 0;
28        while bytes_skipped < number_of_bytes {
29            let buffer = self.fill_buf()?;
30            if buffer.is_empty() {
31                // TODO: IonResult should have a distinct `IncompleteData` error case
32                //       https://github.com/amazon-ion/ion-rust/issues/299
33                return decoding_error("Unexpected end of stream.");
34            }
35            let bytes_in_buffer = buffer.len();
36            let bytes_to_skip = (number_of_bytes - bytes_skipped).min(bytes_in_buffer);
37            self.consume(bytes_to_skip);
38            bytes_skipped += bytes_to_skip;
39        }
40        Ok(())
41    }
42
43    /// Returns the next byte in the data source if available.
44    fn next_byte(&mut self) -> IonResult<Option<u8>> {
45        match self.fill_buf()?.first() {
46            Some(&byte) => {
47                self.consume(1);
48                Ok(Some(byte))
49            }
50            None => Ok(None),
51        }
52    }
53
54    /// Calls `byte_processor` on each byte in the data source until it returns false.
55    /// Returns the number of bytes that were read and processed.
56    fn read_next_byte_while<F>(&mut self, byte_processor: &mut F) -> IonResult<usize>
57    where
58        F: FnMut(u8) -> bool,
59    {
60        // The number of bytes that have been processed by the provided closure
61        let mut number_of_bytes_processed: usize = 0;
62        // The number of bytes currently available in the data source's input buffer
63        let mut number_of_buffered_bytes: usize;
64        // The number of bytes that have been flushed from the input buffer after processing them
65        let mut number_of_bytes_consumed: usize = 0;
66
67        loop {
68            // Get a reference to the data source's input buffer, refilling it if it's empty.
69            let buffer = self.fill_buf()?;
70            number_of_buffered_bytes = buffer.len();
71
72            if number_of_buffered_bytes == 0 {
73                // TODO: IonResult should have a distinct `IncompleteData` error case
74                //       https://github.com/amazon-ion/ion-rust/issues/299
75                return decoding_error("Unexpected end of stream.");
76            }
77
78            // Iterate over the bytes already in the buffer, calling the provided lambda on each
79            // one.
80            for byte in buffer {
81                number_of_bytes_processed += 1;
82                if !byte_processor(*byte) {
83                    // If the lambda is finished reading, notify the data source of how many bytes
84                    // we've read from the buffer so they can be removed.
85                    self.consume(number_of_bytes_processed - number_of_bytes_consumed);
86                    return Ok(number_of_bytes_processed);
87                }
88            }
89
90            // If we read all of the available data in the buffer but the lambda isn't finished yet,
91            // empty the buffer so the next loop iteration will refill it.
92            self.consume(number_of_buffered_bytes);
93            number_of_bytes_consumed += number_of_buffered_bytes;
94        }
95    }
96
97    /// Calls `slice_processor` on a slice containing the next `length` bytes from the
98    /// data source. If the required bytes are already in the input buffer, a reference to that
99    /// slice of the input buffer will be used. If they are not, the required bytes will be read
100    /// into `fallback_buffer` and that will be used instead. If `fallback_buffer` does not have
101    /// enough capacity to store the requested data, it will be resized. It will never be shrunk,
102    /// however--it is the caller's responsibility to manage this memory.
103    fn read_slice<T, F>(
104        &mut self,
105        length: usize,
106        fallback_buffer: &mut Vec<u8>,
107        slice_processor: F,
108    ) -> IonResult<T>
109    where
110        F: FnOnce(&[u8]) -> IonResult<T>,
111    {
112        // Get a reference to the data source's input buffer, refilling it if it's empty.
113        let buffer = self.fill_buf()?;
114
115        // If the buffer is still empty, we've run out of data.
116        if buffer.is_empty() && length > 0 {
117            // TODO: IonResult should have a distinct `IncompleteData` error case
118            //       https://github.com/amazon-ion/ion-rust/issues/299
119            return decoding_error("Unexpected end of stream.");
120        }
121
122        // If the requested value is already in our input buffer, there's no need to copy it out
123        // into a separate buffer. We can return a slice of the input buffer and consume() that
124        // number of bytes.
125        if buffer.len() >= length {
126            let result = slice_processor(&buffer[..length]);
127            self.consume(length);
128            return result;
129        }
130
131        // Grow the Vec to accommodate the requested data if needed
132        let buffer: &mut [u8] = if fallback_buffer.len() < length {
133            fallback_buffer.resize(length, 0);
134            // Get a mutable reference to the underlying byte array
135            fallback_buffer.as_mut()
136        } else {
137            // Otherwise, split the Vec's underlying storage to get a &mut [u8] slice of the
138            // required size
139            let (required_buffer, _) = fallback_buffer.split_at_mut(length);
140            required_buffer
141        };
142
143        // Fill the fallback buffer with bytes from the data source
144        match self.read_exact(buffer) {
145            Ok(()) => slice_processor(buffer),
146            Err(ref e) if e.kind() == std::io::ErrorKind::UnexpectedEof =>
147            // TODO: IonResult should have a distinct `IncompleteData` error case
148            //       https://github.com/amazon-ion/ion-rust/issues/299
149            {
150                decoding_error("Unexpected end of stream.")
151            }
152            Err(io_error) => Err(IonError::IoError { source: io_error }),
153        }
154    }
155}
156
157impl<T> IonDataSource for T where T: BufRead {}
158
#[cfg(test)]
mod tests {
    use super::IonDataSource;
    use crate::result::IonError;
    use std::io::BufReader;

    /// Wraps `data` in a `BufReader` whose internal buffer holds at most `buffer_size` bytes.
    /// Small buffer sizes force the data source methods to cross buffer-refill boundaries.
    fn test_data(buffer_size: usize, data: &'static [u8]) -> impl IonDataSource {
        BufReader::with_capacity(buffer_size, data)
    }

    #[test]
    fn test_next_byte() {
        let mut data_source = test_data(2, &[1, 2, 3, 4, 5]);

        assert_eq!(Some(1), data_source.next_byte().unwrap());
        assert_eq!(Some(2), data_source.next_byte().unwrap());
        assert_eq!(Some(3), data_source.next_byte().unwrap());
        assert_eq!(Some(4), data_source.next_byte().unwrap());
        assert_eq!(Some(5), data_source.next_byte().unwrap());
        // Once the input is exhausted, `next_byte` reports `None` rather than an error.
        assert_eq!(None, data_source.next_byte().unwrap());
    }

    #[test]
    fn test_skip_bytes() {
        let mut data_source = test_data(2, &[1, 2, 3, 4, 5]);
        data_source.skip_bytes(3).unwrap();
        assert_eq!(Some(4), data_source.next_byte().unwrap());
        data_source.skip_bytes(1).unwrap();
        assert_eq!(None, data_source.next_byte().unwrap());
    }

    #[test]
    fn test_skip_zero_bytes() {
        // Skipping nothing must leave the input untouched.
        let mut data_source = test_data(2, &[1, 2, 3]);
        data_source.skip_bytes(0).unwrap();
        assert_eq!(Some(1), data_source.next_byte().unwrap());
    }

    #[test]
    fn test_read_next_byte_while() {
        let mut data_source = test_data(2, &[1, 2, 3, 4, 5]);
        let mut sum: u64 = 0;
        let processor = &mut |byte: u8| {
            sum += byte as u64;
            byte < 4
        };
        let number_of_bytes_processed = data_source.read_next_byte_while(processor).unwrap();
        assert_eq!(number_of_bytes_processed, 4);
        assert_eq!(sum, 10);
        // Only the processed bytes were consumed; the remainder is still readable.
        assert_eq!(Some(5), data_source.next_byte().unwrap());
    }

    #[test]
    fn test_read_slice() {
        let mut data_source = test_data(2, &[1, 2, 3, 4, 5]);
        let processor = &mut |data: &[u8]| Ok(data.iter().map(|byte| *byte as i32).sum());
        let sum = data_source
            .read_slice(4, &mut Vec::new(), processor)
            .unwrap();
        assert_eq!(10, sum);
    }

    #[test]
    fn test_read_slice_from_input_buffer() {
        // The requested bytes fit in the input buffer, so the fallback buffer is never touched.
        let mut data_source = test_data(4, &[1, 2, 3, 4, 5]);
        let mut fallback_buffer = Vec::new();
        let bytes = data_source
            .read_slice(2, &mut fallback_buffer, |bytes: &[u8]| Ok(bytes.to_vec()))
            .unwrap();
        assert_eq!(bytes, vec![1u8, 2]);
        assert!(fallback_buffer.is_empty());
        assert_eq!(Some(3), data_source.next_byte().unwrap());
    }

    #[test]
    fn test_read_slice_uses_fallback_buffer() {
        // The input buffer holds only 2 bytes, so a 3-byte read is copied into the fallback
        // buffer. A fallback buffer that is already large enough is not shrunk.
        let mut data_source = test_data(2, &[1, 2, 3, 4, 5]);
        let mut fallback_buffer = vec![0u8; 10];
        let bytes = data_source
            .read_slice(3, &mut fallback_buffer, |bytes: &[u8]| Ok(bytes.to_vec()))
            .unwrap();
        assert_eq!(bytes, vec![1u8, 2, 3]);
        assert_eq!(fallback_buffer.len(), 10);
        assert_eq!(Some(4), data_source.next_byte().unwrap());
    }

    #[test]
    fn test_read_slice_zero_length() {
        let mut data_source = test_data(2, &[1, 2, 3]);
        let length = data_source
            .read_slice(0, &mut Vec::new(), |bytes: &[u8]| Ok(bytes.len()))
            .unwrap();
        assert_eq!(0, length);
        // A zero-length read consumes nothing.
        assert_eq!(Some(1), data_source.next_byte().unwrap());
    }

    #[test]
    fn test_read_slice_zero_length_at_end_of_stream() {
        // Requesting zero bytes succeeds even when no input remains.
        let mut data_source = test_data(2, &[]);
        let length = data_source
            .read_slice(0, &mut Vec::new(), |bytes: &[u8]| Ok(bytes.len()))
            .unwrap();
        assert_eq!(0, length);
    }

    #[test]
    fn test_eof_during_skip_bytes() {
        let mut data_source = test_data(2, &[1, 2, 3, 4, 5]);

        // We expect to encounter an end-of-file error because we're skipping more bytes than
        // we have in input.
        let result = data_source.skip_bytes(42);

        // TODO: IonResult should have a distinct `IncompleteData` error case
        //       https://github.com/amazon-ion/ion-rust/issues/299
        assert!(matches!(result, Err(IonError::DecodingError { .. })));
    }

    #[test]
    fn test_eof_during_read_next_byte_while() {
        let mut data_source = test_data(2, &[1, 2, 3, 4, 5]);
        let mut sum: u64 = 0;
        // This processor reads until it finds a 6, which our data does not contain.
        let processor = &mut |byte: u8| {
            sum += byte as u64;
            byte != 6
        };
        // We expect to encounter an end-of-file error because the data ends before the processor
        // is satisfied.
        let result = data_source.read_next_byte_while(processor);

        // TODO: IonResult should have a distinct `IncompleteData` error case
        //       https://github.com/amazon-ion/ion-rust/issues/299
        assert!(matches!(result, Err(IonError::DecodingError { .. })));
    }

    #[test]
    fn test_eof_during_read_slice() {
        let mut data_source = test_data(2, &[1, 2, 3, 4, 5]);
        let mut fallback_buffer = vec![];
        // This trivial processor will report the length of the slice it's passed.
        // It will not be used because we expect to encounter an error before then.
        let processor = &mut |bytes: &[u8]| Ok(bytes.len());
        // We expect to encounter an end-of-file error because the input buffer doesn't have
        // enough data.
        let result = data_source.read_slice(
            42, // Number of bytes requested from input
            &mut fallback_buffer,
            processor,
        );

        // TODO: IonResult should have a distinct `IncompleteData` error case
        //       https://github.com/amazon-ion/ion-rust/issues/299
        assert!(matches!(result, Err(IonError::DecodingError { .. })));
    }
}
263
264/// Types that implement this trait can be converted into an implementation of [io::BufRead],
265/// allowing users to build a [Reader](crate::reader::Reader) from a variety of types that might not
266/// define I/O operations on their own.
267pub trait ToIonDataSource {
268    type DataSource: IonDataSource;
269    fn to_ion_data_source(self) -> Self::DataSource;
270}
271
272impl ToIonDataSource for String {
273    type DataSource = io::Cursor<Self>;
274
275    fn to_ion_data_source(self) -> Self::DataSource {
276        io::Cursor::new(self)
277    }
278}
279
280impl<'a> ToIonDataSource for &'a str {
281    type DataSource = io::Cursor<Self>;
282
283    fn to_ion_data_source(self) -> Self::DataSource {
284        io::Cursor::new(self)
285    }
286}
287
288impl<'a> ToIonDataSource for &'a [u8] {
289    type DataSource = io::Cursor<Self>;
290
291    fn to_ion_data_source(self) -> Self::DataSource {
292        io::Cursor::new(self)
293    }
294}
295
296impl<'a, const N: usize> ToIonDataSource for &'a [u8; N] {
297    type DataSource = io::Cursor<Self>;
298
299    fn to_ion_data_source(self) -> Self::DataSource {
300        io::Cursor::new(self)
301    }
302}
303
304impl ToIonDataSource for Vec<u8> {
305    type DataSource = io::Cursor<Self>;
306
307    fn to_ion_data_source(self) -> Self::DataSource {
308        io::Cursor::new(self)
309    }
310}
311
312impl<T: BufRead, U: BufRead> ToIonDataSource for io::Chain<T, U> {
313    type DataSource = Self;
314
315    fn to_ion_data_source(self) -> Self::DataSource {
316        self
317    }
318}
319
320impl<T> ToIonDataSource for io::Cursor<T>
321where
322    T: AsRef<[u8]>,
323{
324    type DataSource = Self;
325
326    fn to_ion_data_source(self) -> Self::DataSource {
327        self
328    }
329}
330
331impl<T: Read> ToIonDataSource for BufReader<T> {
332    type DataSource = Self;
333
334    fn to_ion_data_source(self) -> Self::DataSource {
335        self
336    }
337}
338
339impl ToIonDataSource for File {
340    type DataSource = BufReader<Self>;
341
342    fn to_ion_data_source(self) -> Self::DataSource {
343        BufReader::new(self)
344    }
345}
346
347// Allows Readers to consume Ion directly from STDIN
348impl<'a> ToIonDataSource for StdinLock<'a> {
349    type DataSource = Self;
350
351    fn to_ion_data_source(self) -> Self::DataSource {
352        self
353    }
354}