ion_rs/
blocking_reader.rs

1use delegate::delegate;
2use std::ops::Range;
3
4use crate::binary::non_blocking::raw_binary_reader::RawBinaryReader;
5use crate::data_source::ToIonDataSource;
6use crate::element::{Blob, Clob};
7use crate::raw_reader::BufferedRawReader;
8use crate::result::IonResult;
9use crate::stream_reader::IonReader;
10use crate::text::non_blocking::raw_text_reader::RawTextReader;
11use crate::types::Timestamp;
12use crate::{Decimal, Int, IonError, IonType, Str};
13
14pub type BlockingRawTextReader<T> = BlockingRawReader<RawTextReader<Vec<u8>>, T>;
15pub type BlockingRawBinaryReader<T> = BlockingRawReader<RawBinaryReader<Vec<u8>>, T>;
16
17/// The BlockingRawReader wraps a non-blocking RawReader that implements the BufferedReader trait,
18/// providing a blocking RawReader.
19pub struct BlockingRawReader<R: BufferedRawReader, T: ToIonDataSource> {
20    source: T::DataSource,
21    reader: R,
22    expected_read_size: usize,
23}
24
/// Buffer capacity and initial refill size (4 KiB) used by `BlockingRawReader::new`.
const READER_DEFAULT_BUFFER_CAPACITY: usize = 4 * 1024;
26
27impl<R: BufferedRawReader, T: ToIonDataSource> BlockingRawReader<R, T> {
28    pub fn read_source(&mut self, length: usize) -> IonResult<usize> {
29        let mut bytes_read = 0;
30        loop {
31            let n = self.reader.read_from(&mut self.source, length)?;
32            bytes_read += n;
33            if n == 0 || bytes_read >= length {
34                break;
35            }
36        }
37        Ok(bytes_read)
38    }
39
40    pub fn new(input: T) -> IonResult<Self> {
41        Self::new_with_size(input, READER_DEFAULT_BUFFER_CAPACITY)
42    }
43
44    pub fn new_with_size(input: T, size: usize) -> IonResult<Self> {
45        let buffer = Vec::with_capacity(size);
46        let mut reader = Self {
47            source: input.to_ion_data_source(),
48            reader: buffer.into(),
49            expected_read_size: size,
50        };
51        reader.read_source(size)?;
52        Ok(reader)
53    }
54}
55
56impl<R: BufferedRawReader, T: ToIonDataSource> IonReader for BlockingRawReader<R, T> {
57    type Item = R::Item;
58    type Symbol = R::Symbol;
59
60    fn ion_version(&self) -> (u8, u8) {
61        (1, 0)
62    }
63
64    fn next(&mut self) -> IonResult<Self::Item> {
65        let mut read_size = self.expected_read_size;
66
67        loop {
68            let result = self.reader.next();
69            if let Err(IonError::Incomplete { .. }) = result {
70                let bytes_read = self.read_source(read_size)?;
71                // if we have no bytes, and our stream has been marked as fully loaded, then we
72                // need to bubble up the error. Otherwise, if our stream has not been marked as
73                // loaded, then we need to mark it as loaded and retry.
74                if 0 == bytes_read {
75                    if self.reader.is_stream_complete() {
76                        return result;
77                    } else {
78                        self.reader.stream_complete();
79                    }
80                }
81                // The assumption here is that most buffer sizes will start at a magnitude the user
82                // is comfortable with in terms of memory usage. So if we're reading more in order
83                // to reach a parsable point we do not want to start consuming more than an order of
84                // magnitude more memory just to get there.
85                read_size = std::cmp::min(read_size * 2, self.expected_read_size * 10);
86            } else {
87                return result;
88            }
89        }
90    }
91
92    fn current(&self) -> Self::Item {
93        self.reader.current()
94    }
95
96    fn ion_type(&self) -> Option<IonType> {
97        self.reader.ion_type()
98    }
99
100    fn is_null(&self) -> bool {
101        self.reader.is_null()
102    }
103
104    fn annotations<'a>(&'a self) -> Box<dyn Iterator<Item = IonResult<Self::Symbol>> + 'a> {
105        self.reader.annotations()
106    }
107
108    fn has_annotations(&self) -> bool {
109        self.reader.has_annotations()
110    }
111
112    fn number_of_annotations(&self) -> usize {
113        self.reader.number_of_annotations()
114    }
115
116    fn field_name(&self) -> IonResult<Self::Symbol> {
117        self.reader.field_name()
118    }
119
120    fn read_null(&mut self) -> IonResult<IonType> {
121        self.reader.read_null()
122    }
123
124    fn read_bool(&mut self) -> IonResult<bool> {
125        self.reader.read_bool()
126    }
127
128    fn read_int(&mut self) -> IonResult<Int> {
129        self.reader.read_int()
130    }
131
132    fn read_i64(&mut self) -> IonResult<i64> {
133        self.reader.read_i64()
134    }
135
136    fn read_f32(&mut self) -> IonResult<f32> {
137        self.reader.read_f32()
138    }
139
140    fn read_f64(&mut self) -> IonResult<f64> {
141        self.reader.read_f64()
142    }
143
144    fn read_decimal(&mut self) -> IonResult<Decimal> {
145        self.reader.read_decimal()
146    }
147
148    fn read_string(&mut self) -> IonResult<Str> {
149        self.reader.read_string()
150    }
151
152    fn read_str(&mut self) -> IonResult<&str> {
153        self.reader.read_str()
154    }
155
156    fn read_symbol(&mut self) -> IonResult<Self::Symbol> {
157        self.reader.read_symbol()
158    }
159
160    fn read_blob(&mut self) -> IonResult<Blob> {
161        self.reader.read_blob()
162    }
163
164    fn read_clob(&mut self) -> IonResult<Clob> {
165        self.reader.read_clob()
166    }
167
168    fn read_timestamp(&mut self) -> IonResult<Timestamp> {
169        self.reader.read_timestamp()
170    }
171
172    fn step_in(&mut self) -> IonResult<()> {
173        self.reader.step_in()
174    }
175
176    fn step_out(&mut self) -> IonResult<()> {
177        let mut read_size = self.expected_read_size;
178        loop {
179            let result = self.reader.step_out();
180            if let Err(IonError::Incomplete { .. }) = result {
181                if 0 == self.read_source(read_size)? {
182                    return result;
183                }
184            } else {
185                return result;
186            }
187            read_size = std::cmp::min(read_size * 2, self.expected_read_size * 10);
188        }
189    }
190
191    fn parent_type(&self) -> Option<IonType> {
192        self.reader.parent_type()
193    }
194
195    fn depth(&self) -> usize {
196        self.reader.depth()
197    }
198}
199
200impl<T: ToIonDataSource> BlockingRawReader<RawBinaryReader<Vec<u8>>, T> {
201    delegate! {
202        to self.reader {
203            pub fn raw_bytes(&self) ->  Option<&[u8]>;
204
205            pub fn field_id_length(&self) -> Option<usize>;
206            pub fn field_id_offset(&self) -> Option<usize>;
207            pub fn field_id_range(&self) -> Option<Range<usize>>;
208            pub fn raw_field_id_bytes(&self) -> Option<&[u8]>;
209
210            pub fn annotations_length(&self) -> Option<usize>;
211            pub fn annotations_offset(&self) -> Option<usize>;
212            pub fn annotations_range(&self) -> Option<Range<usize>>;
213            pub fn raw_annotations_bytes(&self) -> Option<&[u8]>;
214
215            pub fn header_length(&self) -> usize;
216            pub fn header_offset(&self) -> usize;
217            pub fn header_range(&self) -> Range<usize>;
218            pub fn raw_header_bytes(&self) -> Option<&[u8]>;
219
220            pub fn value_length(&self) -> usize;
221            pub fn value_offset(&self) -> usize;
222            pub fn value_range(&self) -> Range<usize>;
223            pub fn raw_value_bytes(&self) -> Option<&[u8]>;
224        }
225    }
226}
227
#[cfg(test)]
mod tests {
    use super::*;
    use crate::raw_reader::RawStreamItem;
    use crate::result::IonResult;
    use crate::text::non_blocking::raw_text_reader::RawTextReader;

    mod data {
        pub mod binary_reader {
            // The binary data was converted from the text data below. Where the binary output
            // gained a symbol table, an empty struct was added to the text version so that both
            // encodings produce the same sequence of values for the tests.
            pub const BASIC_INCOMPLETE: &[u8] = &[
                0xe0, 0x01, 0x00, 0xea, 0xb6, 0x21, 0x01, 0x21, 0x02, 0x21, // 0x03 deliberately omitted
            ];

            pub const STRING_BASIC: &[u8] = &[
                0xe0, 0x01, 0x00, 0xea, 0x8b, 0x48, 0x65, 0x6c, 0x6c, 0x6f, 0x20, 0x57, 0x6f, 0x72,
                0x6c, 0x64,
            ];

            pub const STRUCT_NESTED: &[u8] = &[
                0xe0, 0x01, 0x00, 0xea, 0xee, 0x95, 0x81, 0x83, 0xde, 0x91, 0x86, 0x71, 0x03, 0x87,
                0xbc, 0x83, 0x66, 0x6f, 0x6f, 0x83, 0x62, 0x61, 0x72, 0x81, 0x61, 0x81, 0x62, 0xde,
                0x95, 0x8a, 0xb9, 0x21, 0x01, 0xb4, 0x21, 0x02, 0x21, 0x03, 0x21, 0x04, 0x8b, 0xd8,
                0x8c, 0x21, 0x05, 0x8d, 0xc3, 0x11, 0x11, 0x11, 0x21, 0x0b,
            ];

            pub const BASIC_SYMBOL_TABLE: &[u8] = &[
                /* 0x00 */ 0xe0, 0x01, 0x00, 0xea, 0xee, 0x95, 0x81, 0x83, 0xde, 0x91, 0x86,
                0x71, 0x03, 0x87, 0xbc, 0x83, 0x66, /* 0x10 */ 0x6f, 0x6f, 0x83, 0x62, 0x61,
                0x72, 0x83, 0x62, 0x61, 0x7a, 0x71, 0x0a, 0x71, 0x0b, 0x71, 0x0c,
            ];
        }
        pub mod text_reader {
            // The list is deliberately left unterminated.
            pub const BASIC_INCOMPLETE: &[u8] = r#"
              $ion_1_0
              [1, 2, 3
              "#
            .as_bytes();
            pub const STRING_BASIC: &[u8] = r#"
            $ion_1_0
            "Hello World"
            "#
            .as_bytes();

            pub const STRUCT_NESTED: &[u8] = r#"
                $ion_1_0
                $ion_symbol_table::{}
                {
                    foo: [
                        1,
                        [2, 3],
                        4
                    ],
                    bar: {
                        a: 5,
                        b: (true true true)
                    }
                }
                11
            "#
            .as_bytes();

            pub const BASIC_SYMBOL_TABLE: &[u8] = r#"
               $ion_1_0
               $ion_symbol_table::{
                  imports: $ion_symbol_table,
                  symbols: ["foo", "bar", "baz"],
               }
               $10
               $11
               $12
            "#
            .as_bytes();
        }
    }

    macro_rules! raw_reader_tests {
        ($($name:ident: $type:ty,)*) => {
        $(
            mod $name {
                use super::*;
                use super::data::$name::*;
                use crate::raw_symbol_token::RawSymbolToken;

                /// Advances `reader` and asserts that it lands on a value of `ion_type` whose
                /// nullness matches `is_null`.
                fn next_type(
                    reader: &mut BlockingRawReader<$type, Vec<u8>>,
                    ion_type: IonType,
                    is_null: bool,
                ) {
                    assert_eq!(
                        reader.next().unwrap(),
                        RawStreamItem::nullable_value(ion_type, is_null)
                    );
                }

                /// Creates a reader of the type under test with a small (24-byte) initial read
                /// size, so that the larger inputs have to be pulled in over several refills.
                fn new_reader(source: &[u8]) -> BlockingRawReader<$type, Vec<u8>> {
                    BlockingRawReader::<$type, Vec<u8>>::new_with_size(source.to_vec(), 24)
                        .unwrap()
                }

                #[test]
                fn basic_incomplete() -> IonResult<()> {
                    let reader = &mut new_reader(BASIC_INCOMPLETE);
                    assert_eq!(reader.next().unwrap(), RawStreamItem::VersionMarker(1, 0));
                    next_type(reader, IonType::List, false);
                    reader.step_in()?;

                    next_type(reader, IonType::Int, false);
                    assert_eq!(reader.read_i64()?, 1);
                    // The list is never closed, so stepping out must run out of data.
                    match reader.step_out() {
                        Err(IonError::Incomplete { .. }) => (),
                        other => panic!("Unexpected result: {:?}", other),
                    }

                    Ok(())
                }

                #[test]
                fn incomplete_string() -> IonResult<()> {
                    let reader = &mut new_reader(STRING_BASIC);
                    assert_eq!(reader.next().unwrap(), RawStreamItem::VersionMarker(1, 0));
                    next_type(reader, IonType::String, false);
                    assert_eq!(reader.read_string()?, "Hello World");
                    Ok(())
                }

                #[test]
                fn nested_struct() -> IonResult<()> {
                    let reader = &mut new_reader(STRUCT_NESTED);
                    assert_eq!(reader.next().unwrap(), RawStreamItem::VersionMarker(1, 0));

                    next_type(reader, IonType::Struct, false); // Local symbol table

                    next_type(reader, IonType::Struct, false);
                    reader.step_in()?;
                    next_type(reader, IonType::List, false);
                    assert!(reader.field_name().is_ok());

                    reader.step_in()?;
                    next_type(reader, IonType::Int, false);
                    assert_eq!(reader.read_i64()?, 1);
                    next_type(reader, IonType::List, false);
                    reader.step_in()?;
                    next_type(reader, IonType::Int, false);
                    assert_eq!(reader.read_i64()?, 2);
                    // The `3` in foo[1] is deliberately left unread so that `step_out` has to
                    // skip over it.
                    reader.step_out()?; // Step out of foo[1]
                    reader.step_out()?; // Step out of foo, skipping the unread `4`

                    next_type(reader, IonType::Struct, false);
                    assert!(reader.field_name().is_ok());

                    reader.step_in()?;
                    next_type(reader, IonType::Int, false);
                    assert_eq!(reader.read_i64()?, 5);
                    next_type(reader, IonType::SExp, false);
                    reader.step_in()?;
                    next_type(reader, IonType::Bool, false);
                    assert!(reader.read_bool()?);
                    next_type(reader, IonType::Bool, false);
                    assert!(reader.read_bool()?);
                    next_type(reader, IonType::Bool, false);
                    assert!(reader.read_bool()?);
                    // The reader is now at the third (last) `true` in the s-expression `bar.b`.
                    reader.step_out()?; // Step out of bar.b
                    reader.step_out()?; // Step out of bar
                    reader.step_out()?; // Step out of the top-level struct

                    next_type(reader, IonType::Int, false);
                    assert_eq!(reader.read_i64()?, 11);
                    Ok(())
                }

                #[test]
                fn basic_symbol_table() -> IonResult<()> {
                    let reader = &mut new_reader(BASIC_SYMBOL_TABLE);
                    assert_eq!(reader.next().unwrap(), RawStreamItem::VersionMarker(1, 0));

                    next_type(reader, IonType::Struct, false);
                    reader.step_in()?;

                    next_type(reader, IonType::Symbol, false); // imports

                    next_type(reader, IonType::List, false); // symbols
                    reader.step_in()?;

                    next_type(reader, IonType::String, false);
                    assert_eq!(reader.read_string()?, "foo");
                    next_type(reader, IonType::String, false);
                    assert_eq!(reader.read_string()?, "bar");
                    next_type(reader, IonType::String, false);
                    assert_eq!(reader.read_string()?, "baz");

                    reader.step_out()?; // List
                    reader.step_out()?; // Symbol table
                    next_type(reader, IonType::Symbol, false);
                    assert_eq!(reader.read_symbol()?, RawSymbolToken::SymbolId(10));

                    next_type(reader, IonType::Symbol, false);
                    assert_eq!(reader.read_symbol()?, RawSymbolToken::SymbolId(11));

                    next_type(reader, IonType::Symbol, false);
                    assert_eq!(reader.read_symbol()?, RawSymbolToken::SymbolId(12));

                    Ok(())
                }
            }
        )*
        }
    }

    raw_reader_tests! {
        binary_reader: RawBinaryReader<Vec<u8>>,
        text_reader: RawTextReader<Vec<u8>>,
    }

    #[test]
    fn test_raw_bytes() -> IonResult<()> {
        // Note: technically invalid Ion because the symbol IDs referenced are never added to the
        // symbol table.

        // {$11: [1, 2, 3], $10: 1}
        // $12::false
        let ion_data: &[u8] = &[
            // First top-level value in the stream
            0xDB, // 11-byte struct
            0x8B, // Field ID 11
            0xB6, // 6-byte List
            0x21, 0x01, // Integer 1
            0x21, 0x02, // Integer 2
            0x21, 0x03, // Integer 3
            0x8A, // Field ID 10
            0x21, 0x01, // Integer 1
            // Second top-level value in the stream
            0xE3, // 3-byte annotations envelope
            0x81, // * Annotations themselves take 1 byte
            0x8C, // * Annotation w/SID $12
            0x10, // Boolean false
        ];
        let mut cursor = BlockingRawBinaryReader::new(ion_data.to_owned())?;
        assert_eq!(RawStreamItem::Value(IonType::Struct), cursor.next()?);
        assert_eq!(cursor.raw_bytes(), Some(&ion_data[0..1])); // No value bytes for containers.
        assert_eq!(cursor.raw_field_id_bytes(), None);
        assert_eq!(cursor.raw_annotations_bytes(), None);
        assert_eq!(cursor.raw_header_bytes(), Some(&ion_data[0..=0]));
        assert_eq!(cursor.raw_value_bytes(), None);
        cursor.step_in()?;
        assert_eq!(RawStreamItem::Value(IonType::List), cursor.next()?);
        assert_eq!(cursor.raw_bytes(), Some(&ion_data[1..3]));
        assert_eq!(cursor.raw_field_id_bytes(), Some(&ion_data[1..=1]));
        assert_eq!(cursor.raw_annotations_bytes(), None);
        assert_eq!(cursor.raw_header_bytes(), Some(&ion_data[2..=2]));
        assert_eq!(cursor.raw_value_bytes(), None);
        cursor.step_in()?;
        assert_eq!(RawStreamItem::Value(IonType::Int), cursor.next()?);
        assert_eq!(cursor.raw_bytes(), Some(&ion_data[3..=4]));
        assert_eq!(cursor.raw_field_id_bytes(), None);
        assert_eq!(cursor.raw_annotations_bytes(), None);
        assert_eq!(cursor.raw_header_bytes(), Some(&ion_data[3..=3]));
        assert_eq!(cursor.raw_value_bytes(), Some(&ion_data[4..=4]));
        assert_eq!(RawStreamItem::Value(IonType::Int), cursor.next()?);
        assert_eq!(cursor.raw_bytes(), Some(&ion_data[5..=6]));
        assert_eq!(cursor.raw_field_id_bytes(), None);
        assert_eq!(cursor.raw_annotations_bytes(), None);
        assert_eq!(cursor.raw_header_bytes(), Some(&ion_data[5..=5]));
        assert_eq!(cursor.raw_value_bytes(), Some(&ion_data[6..=6]));
        assert_eq!(RawStreamItem::Value(IonType::Int), cursor.next()?);
        assert_eq!(cursor.raw_bytes(), Some(&ion_data[7..=8]));
        assert_eq!(cursor.raw_field_id_bytes(), None);
        assert_eq!(cursor.raw_annotations_bytes(), None);
        assert_eq!(cursor.raw_header_bytes(), Some(&ion_data[7..=7]));
        assert_eq!(cursor.raw_value_bytes(), Some(&ion_data[8..=8]));

        cursor.step_out()?; // Step out of list

        assert_eq!(RawStreamItem::Value(IonType::Int), cursor.next()?);
        assert_eq!(cursor.raw_bytes(), Some(&ion_data[9..=11]));
        assert_eq!(cursor.raw_field_id_bytes(), Some(&ion_data[9..=9]));
        assert_eq!(cursor.raw_annotations_bytes(), None);
        assert_eq!(cursor.raw_header_bytes(), Some(&ion_data[10..=10]));
        assert_eq!(cursor.raw_value_bytes(), Some(&ion_data[11..=11]));

        cursor.step_out()?; // Step out of struct

        // Second top-level value
        assert_eq!(RawStreamItem::Value(IonType::Bool), cursor.next()?);
        assert_eq!(cursor.raw_bytes(), Some(&ion_data[12..16]));
        assert_eq!(cursor.raw_field_id_bytes(), None);
        assert_eq!(cursor.raw_annotations_bytes(), Some(&ion_data[12..=14]));
        assert_eq!(cursor.raw_header_bytes(), Some(&ion_data[15..=15]));
        assert_eq!(
            cursor.raw_value_bytes(),
            Some(&ion_data[15..15] /* That is, zero bytes */)
        );
        Ok(())
    }

    #[test]
    fn test_binary_end_of_stream() -> IonResult<()> {
        // Ensures that the non-blocking binary reader honors the end-of-stream flag and that the
        // blocking reader makes use of it. A previous bug existed where the binary reader ignored
        // the flag, so a read ending exactly on a value boundary left the blocking reader unable
        // to provide more data, since it relies on Incomplete errors to do so.

        // {$11: [1, 2, 3], $10: 1}
        // $12::false
        let ion_data: &[u8] = &[
            // First top-level value in the stream
            0xDB, // 11-byte struct
            0x8B, // Field ID 11
            0xB6, // 6-byte List
            0x21, 0x01, // Integer 1
            0x21, 0x02, // Integer 2
            0x21, 0x03, // Integer 3
            0x8A, // Field ID 10
            0x21, 0x01, // Integer 1
            // Second top-level value in the stream
            0xE3, // 3-byte annotations envelope
            0x81, // * Annotations themselves take 1 byte
            0x8C, // * Annotation w/SID $12
            0x10, // Boolean false
        ];
        // Create a blocking reader with an initial buffer size of 12, so that it reads exactly
        // the first value. If EOS is not honored, the second call to `next` would yield no value,
        // since the blocking reader would not know to provide more data.
        let mut cursor = BlockingRawBinaryReader::new_with_size(ion_data.to_owned(), 12)?;
        assert_eq!(RawStreamItem::Value(IonType::Struct), cursor.next()?);
        assert_eq!(RawStreamItem::Value(IonType::Bool), cursor.next()?);

        Ok(())
    }
}