1use delegate::delegate;
2use std::ops::Range;
3
4use crate::binary::non_blocking::raw_binary_reader::RawBinaryReader;
5use crate::data_source::ToIonDataSource;
6use crate::element::{Blob, Clob};
7use crate::raw_reader::BufferedRawReader;
8use crate::result::IonResult;
9use crate::stream_reader::IonReader;
10use crate::text::non_blocking::raw_text_reader::RawTextReader;
11use crate::types::Timestamp;
12use crate::{Decimal, Int, IonError, IonType, Str};
13
/// A [`BlockingRawReader`] wrapping the non-blocking [`RawTextReader`] over a `Vec<u8>` buffer.
/// `T` is the input type that is converted into the data source.
pub type BlockingRawTextReader<T> = BlockingRawReader<RawTextReader<Vec<u8>>, T>;
/// A [`BlockingRawReader`] wrapping the non-blocking [`RawBinaryReader`] over a `Vec<u8>` buffer.
/// `T` is the input type that is converted into the data source.
pub type BlockingRawBinaryReader<T> = BlockingRawReader<RawBinaryReader<Vec<u8>>, T>;
16
17pub struct BlockingRawReader<R: BufferedRawReader, T: ToIonDataSource> {
20 source: T::DataSource,
21 reader: R,
22 expected_read_size: usize,
23}
24
/// Buffer capacity and base read size, in bytes (4 KiB), used by [`BlockingRawReader::new`].
const READER_DEFAULT_BUFFER_CAPACITY: usize = 1024 * 4;
26
27impl<R: BufferedRawReader, T: ToIonDataSource> BlockingRawReader<R, T> {
28 pub fn read_source(&mut self, length: usize) -> IonResult<usize> {
29 let mut bytes_read = 0;
30 loop {
31 let n = self.reader.read_from(&mut self.source, length)?;
32 bytes_read += n;
33 if n == 0 || bytes_read >= length {
34 break;
35 }
36 }
37 Ok(bytes_read)
38 }
39
40 pub fn new(input: T) -> IonResult<Self> {
41 Self::new_with_size(input, READER_DEFAULT_BUFFER_CAPACITY)
42 }
43
44 pub fn new_with_size(input: T, size: usize) -> IonResult<Self> {
45 let buffer = Vec::with_capacity(size);
46 let mut reader = Self {
47 source: input.to_ion_data_source(),
48 reader: buffer.into(),
49 expected_read_size: size,
50 };
51 reader.read_source(size)?;
52 Ok(reader)
53 }
54}
55
56impl<R: BufferedRawReader, T: ToIonDataSource> IonReader for BlockingRawReader<R, T> {
57 type Item = R::Item;
58 type Symbol = R::Symbol;
59
60 fn ion_version(&self) -> (u8, u8) {
61 (1, 0)
62 }
63
64 fn next(&mut self) -> IonResult<Self::Item> {
65 let mut read_size = self.expected_read_size;
66
67 loop {
68 let result = self.reader.next();
69 if let Err(IonError::Incomplete { .. }) = result {
70 let bytes_read = self.read_source(read_size)?;
71 if 0 == bytes_read {
75 if self.reader.is_stream_complete() {
76 return result;
77 } else {
78 self.reader.stream_complete();
79 }
80 }
81 read_size = std::cmp::min(read_size * 2, self.expected_read_size * 10);
86 } else {
87 return result;
88 }
89 }
90 }
91
92 fn current(&self) -> Self::Item {
93 self.reader.current()
94 }
95
96 fn ion_type(&self) -> Option<IonType> {
97 self.reader.ion_type()
98 }
99
100 fn is_null(&self) -> bool {
101 self.reader.is_null()
102 }
103
104 fn annotations<'a>(&'a self) -> Box<dyn Iterator<Item = IonResult<Self::Symbol>> + 'a> {
105 self.reader.annotations()
106 }
107
108 fn has_annotations(&self) -> bool {
109 self.reader.has_annotations()
110 }
111
112 fn number_of_annotations(&self) -> usize {
113 self.reader.number_of_annotations()
114 }
115
116 fn field_name(&self) -> IonResult<Self::Symbol> {
117 self.reader.field_name()
118 }
119
120 fn read_null(&mut self) -> IonResult<IonType> {
121 self.reader.read_null()
122 }
123
124 fn read_bool(&mut self) -> IonResult<bool> {
125 self.reader.read_bool()
126 }
127
128 fn read_int(&mut self) -> IonResult<Int> {
129 self.reader.read_int()
130 }
131
132 fn read_i64(&mut self) -> IonResult<i64> {
133 self.reader.read_i64()
134 }
135
136 fn read_f32(&mut self) -> IonResult<f32> {
137 self.reader.read_f32()
138 }
139
140 fn read_f64(&mut self) -> IonResult<f64> {
141 self.reader.read_f64()
142 }
143
144 fn read_decimal(&mut self) -> IonResult<Decimal> {
145 self.reader.read_decimal()
146 }
147
148 fn read_string(&mut self) -> IonResult<Str> {
149 self.reader.read_string()
150 }
151
152 fn read_str(&mut self) -> IonResult<&str> {
153 self.reader.read_str()
154 }
155
156 fn read_symbol(&mut self) -> IonResult<Self::Symbol> {
157 self.reader.read_symbol()
158 }
159
160 fn read_blob(&mut self) -> IonResult<Blob> {
161 self.reader.read_blob()
162 }
163
164 fn read_clob(&mut self) -> IonResult<Clob> {
165 self.reader.read_clob()
166 }
167
168 fn read_timestamp(&mut self) -> IonResult<Timestamp> {
169 self.reader.read_timestamp()
170 }
171
172 fn step_in(&mut self) -> IonResult<()> {
173 self.reader.step_in()
174 }
175
176 fn step_out(&mut self) -> IonResult<()> {
177 let mut read_size = self.expected_read_size;
178 loop {
179 let result = self.reader.step_out();
180 if let Err(IonError::Incomplete { .. }) = result {
181 if 0 == self.read_source(read_size)? {
182 return result;
183 }
184 } else {
185 return result;
186 }
187 read_size = std::cmp::min(read_size * 2, self.expected_read_size * 10);
188 }
189 }
190
191 fn parent_type(&self) -> Option<IonType> {
192 self.reader.parent_type()
193 }
194
195 fn depth(&self) -> usize {
196 self.reader.depth()
197 }
198}
199
// Binary-only accessors: these are available only when the wrapped reader is a
// `RawBinaryReader<Vec<u8>>`. Each method is forwarded unchanged to `self.reader` by the
// `delegate!` macro; the exact meaning of each range/slice is defined by `RawBinaryReader`.
impl<T: ToIonDataSource> BlockingRawReader<RawBinaryReader<Vec<u8>>, T> {
    delegate! {
        to self.reader {
            // Raw bytes associated with the current item.
            pub fn raw_bytes(&self) -> Option<&[u8]>;

            // Field ID accessors (optional: the tests below show `None` when the current
            // value has no field ID).
            pub fn field_id_length(&self) -> Option<usize>;
            pub fn field_id_offset(&self) -> Option<usize>;
            pub fn field_id_range(&self) -> Option<Range<usize>>;
            pub fn raw_field_id_bytes(&self) -> Option<&[u8]>;

            // Annotations accessors (optional: `None` when the value is not annotated).
            pub fn annotations_length(&self) -> Option<usize>;
            pub fn annotations_offset(&self) -> Option<usize>;
            pub fn annotations_range(&self) -> Option<Range<usize>>;
            pub fn raw_annotations_bytes(&self) -> Option<&[u8]>;

            // Header accessors.
            pub fn header_length(&self) -> usize;
            pub fn header_offset(&self) -> usize;
            pub fn header_range(&self) -> Range<usize>;
            pub fn raw_header_bytes(&self) -> Option<&[u8]>;

            // Value-body accessors.
            pub fn value_length(&self) -> usize;
            pub fn value_offset(&self) -> usize;
            pub fn value_range(&self) -> Range<usize>;
            pub fn raw_value_bytes(&self) -> Option<&[u8]>;
        }
    }
}
227
#[cfg(test)]
mod tests {
    use super::*;
    use crate::binary::non_blocking::raw_binary_reader::RawBinaryReader as NBRawBinaryReader;
    use crate::raw_reader::RawStreamItem;
    use crate::result::IonResult;
    use crate::text::non_blocking::raw_text_reader::RawTextReader;

    // Builds a blocking binary reader over a copy of `source` with the default read size.
    fn bin_reader(source: &[u8]) -> BlockingRawBinaryReader<Vec<u8>> {
        let reader = BlockingRawReader::<NBRawBinaryReader<Vec<u8>>, Vec<u8>>::new(source.to_vec());
        reader.unwrap()
    }

    // Builds a blocking text reader over a copy of `source` with the default read size.
    fn text_reader(source: &[u8]) -> BlockingRawTextReader<Vec<u8>> {
        let reader = BlockingRawReader::<RawTextReader<Vec<u8>>, Vec<u8>>::new(source.to_vec());
        reader.unwrap()
    }

    // Test fixtures. The `binary_reader` and `text_reader` modules define constants with the
    // same names so that `raw_reader_tests!` can run one set of tests against both encodings.
    mod data {
        pub mod binary_reader {
            // Version marker followed by a list that declares 6 bytes of content but is cut
            // off after the header byte of its third int.
            pub const BASIC_INCOMPLETE: &[u8] = &[
                0xe0, 0x01, 0x00, 0xea, // Ion 1.0 version marker
                0xb6, // List header (6 bytes of content)
                0x21, 0x01, // Int 1
                0x21, 0x02, // Int 2
                0x21, // Int header with its value byte missing
            ];

            // Version marker followed by the 11-byte string "Hello World".
            pub const STRING_BASIC: &[u8] = &[
                0xe0, 0x01, 0x00, 0xea, 0x8b, 0x48, 0x65, 0x6c, 0x6c, 0x6f, 0x20, 0x57, 0x6f, 0x72,
                0x6c, 0x64,
            ];

            // Binary counterpart of `text_reader::STRUCT_NESTED`; the shared `nested_struct`
            // test expects the same sequence of items from both.
            pub const STRUCT_NESTED: &[u8] = &[
                0xe0, 0x01, 0x00, 0xea, 0xee, 0x95, 0x81, 0x83, 0xde, 0x91, 0x86, 0x71, 0x03, 0x87,
                0xbc, 0x83, 0x66, 0x6f, 0x6f, 0x83, 0x62, 0x61, 0x72, 0x81, 0x61, 0x81, 0x62, 0xde,
                0x95, 0x8a, 0xb9, 0x21, 0x01, 0xb4, 0x21, 0x02, 0x21, 0x03, 0x21, 0x04, 0x8b, 0xd8,
                0x8c, 0x21, 0x05, 0x8d, 0xc3, 0x11, 0x11, 0x11, 0x21, 0x0b,
            ];

            // Binary counterpart of `text_reader::BASIC_SYMBOL_TABLE`: a symbol table
            // declaring "foo", "bar", "baz", followed by symbols $10, $11 and $12.
            pub const BASIC_SYMBOL_TABLE: &[u8] = &[
                0xe0, 0x01, 0x00, 0xea, 0xee, 0x95, 0x81, 0x83, 0xde, 0x91, 0x86,
                0x71, 0x03, 0x87, 0xbc, 0x83, 0x66, 0x6f, 0x6f, 0x83, 0x62, 0x61,
                0x72, 0x83, 0x62, 0x61, 0x7a, 0x71, 0x0a, 0x71, 0x0b, 0x71, 0x0c,
            ];
        }
        pub mod text_reader {
            // A list that is never closed.
            pub const BASIC_INCOMPLETE: &[u8] = r#"
            $ion_1_0
            [1, 2, 3
            "#
            .as_bytes();
            pub const STRING_BASIC: &[u8] = r#"
            $ion_1_0
            "Hello World"
            "#
            .as_bytes();

            // An (empty) symbol table, a struct with nested containers, then the int 11.
            pub const STRUCT_NESTED: &[u8] = r#"
            $ion_1_0
            $ion_symbol_table::{}
            {
                foo: [
                    1,
                    [2, 3],
                    4
                ],
                bar: {
                    a: 5,
                    b: (true true true)
                }
            }
            11
            "#
            .as_bytes();

            pub const BASIC_SYMBOL_TABLE: &[u8] = r#"
            $ion_1_0
            $ion_symbol_table::{
                imports: $ion_symbol_table,
                symbols: ["foo", "bar", "baz"],
            }
            $10
            $11
            $12
            "#
            .as_bytes();
        }
    }

    // Generates one test module per `name: reader type` pair. `$name` must match a module
    // in `data` (its constants are glob-imported) and `$type` is the non-blocking reader
    // wrapped by `BlockingRawReader`.
    macro_rules! raw_reader_tests {
        ($($name:ident: $type:ty,)*) => {
            $(
                mod $name {
                    use super::*;
                    use super::data::$name::*;
                    use crate::raw_symbol_token::RawSymbolToken;

                    // Advances the reader and asserts that the next item is a value of
                    // `ion_type` with the given null-ness.
                    fn next_type(reader: &mut BlockingRawReader<$type, Vec<u8>>, ion_type: IonType, is_null: bool) {
                        assert_eq!(
                            reader.next().unwrap(),
                            RawStreamItem::nullable_value(ion_type, is_null)
                        );
                    }

                    // Uses a deliberately small read size (24 bytes), which is shorter than
                    // several of the fixtures, so the tests also exercise refilling the
                    // buffer from the source.
                    fn new_reader(source: &[u8]) -> BlockingRawReader<$type, Vec<u8>> {
                        let reader = BlockingRawReader::<$type, Vec<u8>>::new_with_size(source.to_vec(), 24);
                        reader.unwrap()
                    }

                    // Stepping out of a container whose data ends prematurely must surface
                    // `IonError::Incomplete`.
                    #[test]
                    fn basic_incomplete() -> IonResult<()> {
                        let reader = &mut new_reader(BASIC_INCOMPLETE);
                        assert_eq!(reader.next().unwrap(), RawStreamItem::VersionMarker(1, 0));
                        next_type(reader, IonType::List, false);
                        reader.step_in()?;

                        next_type(reader, IonType::Int, false);
                        assert_eq!(reader.read_i64()?, 1);
                        let result = reader.step_out();
                        match result {
                            Err(IonError::Incomplete { .. }) => (),
                            r => panic!("Unexpected result: {:?}", r),
                        }
                        assert!(result.is_err());

                        Ok(())
                    }

                    // Reads a single top-level string value.
                    #[test]
                    fn incomplete_string() -> IonResult<()> {
                        let reader = &mut new_reader(STRING_BASIC);
                        assert_eq!(reader.next().unwrap(), RawStreamItem::VersionMarker(1, 0));
                        next_type(reader, IonType::String, false);
                        assert_eq!(reader.read_string()?, "Hello World");
                        Ok(())
                    }

                    // Walks in and out of nested containers, stepping out before some of
                    // them have been fully read.
                    #[test]
                    fn nested_struct() -> IonResult<()> {
                        let reader = &mut new_reader(STRUCT_NESTED);
                        assert_eq!(reader.next().unwrap(), RawStreamItem::VersionMarker(1, 0));

                        // The symbol table is reported as an ordinary struct.
                        next_type(reader, IonType::Struct, false);
                        // The top-level user struct.
                        next_type(reader, IonType::Struct, false);
                        reader.step_in()?;
                        next_type(reader, IonType::List, false); // foo
                        assert!(reader.field_name().is_ok());

                        reader.step_in()?;
                        next_type(reader, IonType::Int, false);
                        assert_eq!(reader.read_i64()?, 1);
                        next_type(reader, IonType::List, false);
                        reader.step_in()?;
                        next_type(reader, IonType::Int, false);
                        assert_eq!(reader.read_i64()?, 2);
                        // Leave the inner list early (3 is skipped)...
                        reader.step_out()?;
                        // ...and leave `foo` early as well (4 is skipped).
                        reader.step_out()?;
                        next_type(reader, IonType::Struct, false); // bar
                        assert!(reader.field_name().is_ok());

                        reader.step_in()?;
                        next_type(reader, IonType::Int, false);
                        assert_eq!(reader.read_i64()?, 5);
                        next_type(reader, IonType::SExp, false);
                        reader.step_in()?;
                        next_type(reader, IonType::Bool, false);
                        assert_eq!(reader.read_bool()?, true);
                        next_type(reader, IonType::Bool, false);
                        assert_eq!(reader.read_bool()?, true);
                        next_type(reader, IonType::Bool, false);
                        assert_eq!(reader.read_bool()?, true);
                        // Step out of the s-expression, then `bar`, then the top-level struct.
                        reader.step_out()?;
                        reader.step_out()?;
                        reader.step_out()?;
                        next_type(reader, IonType::Int, false);
                        assert_eq!(reader.read_i64()?, 11);
                        Ok(())
                    }

                    // Reads a symbol table's contents as plain values, then the symbol IDs
                    // that follow it.
                    #[test]
                    fn basic_symbol_table() -> IonResult<()> {
                        let reader = &mut new_reader(BASIC_SYMBOL_TABLE);
                        assert_eq!(reader.next().unwrap(), RawStreamItem::VersionMarker(1, 0));

                        next_type(reader, IonType::Struct, false);
                        reader.step_in()?;

                        next_type(reader, IonType::Symbol, false);

                        next_type(reader, IonType::List, false);
                        reader.step_in()?;

                        next_type(reader, IonType::String, false);
                        assert_eq!(reader.read_string()?, "foo");
                        next_type(reader, IonType::String, false);
                        assert_eq!(reader.read_string()?, "bar");
                        next_type(reader, IonType::String, false);
                        assert_eq!(reader.read_string()?, "baz");

                        // Step out of the symbols list, then out of the symbol table struct.
                        reader.step_out()?;
                        reader.step_out()?;
                        next_type(reader, IonType::Symbol, false);
                        assert_eq!(reader.read_symbol()?, RawSymbolToken::SymbolId(10));

                        next_type(reader, IonType::Symbol, false);
                        assert_eq!(reader.read_symbol()?, RawSymbolToken::SymbolId(11));

                        next_type(reader, IonType::Symbol, false);
                        assert_eq!(reader.read_symbol()?, RawSymbolToken::SymbolId(12));

                        Ok(())
                    }
                }


            )*
        }
    }

    // Instantiate the shared tests for both encodings.
    raw_reader_tests! {
        binary_reader: RawBinaryReader<Vec<u8>>,
        text_reader: RawTextReader<Vec<u8>>,
    }

    // Checks the binary-only raw byte accessors at each position while walking the stream.
    #[test]
    fn test_raw_bytes() -> IonResult<()> {
        // {$11: [1, 2, 3], $10: 1} followed by an annotated `false`.
        let ion_data: &[u8] = &[
            // First top-level value in the stream
            0xDB, // 11-byte struct
            0x8B, // Field ID 11
            0xB6, // 6-byte list
            0x21, 0x01, // Int 1
            0x21, 0x02, // Int 2
            0x21, 0x03, // Int 3
            0x8A, // Field ID 10
            0x21, 0x01, // Int 1
            // Second top-level value in the stream
            0xE3, // 3-byte annotations wrapper
            0x81, // Annotations take 1 byte
            0x8C, // Annotation with symbol ID 12
            0x10, // Bool false
        ];
        let mut cursor = BlockingRawBinaryReader::new(ion_data.to_owned())?;
        assert_eq!(RawStreamItem::Value(IonType::Struct), cursor.next()?);
        // For the struct, only its header byte is reported.
        assert_eq!(cursor.raw_bytes(), Some(&ion_data[0..1]));
        assert_eq!(cursor.raw_field_id_bytes(), None);
        assert_eq!(cursor.raw_annotations_bytes(), None);
        assert_eq!(cursor.raw_header_bytes(), Some(&ion_data[0..=0]));
        assert_eq!(cursor.raw_value_bytes(), None);
        cursor.step_in()?;
        assert_eq!(RawStreamItem::Value(IonType::List), cursor.next()?);
        assert_eq!(cursor.raw_bytes(), Some(&ion_data[1..3]));
        assert_eq!(cursor.raw_field_id_bytes(), Some(&ion_data[1..=1]));
        assert_eq!(cursor.raw_annotations_bytes(), None);
        assert_eq!(cursor.raw_header_bytes(), Some(&ion_data[2..=2]));
        assert_eq!(cursor.raw_value_bytes(), None);
        cursor.step_in()?;
        assert_eq!(RawStreamItem::Value(IonType::Int), cursor.next()?);
        assert_eq!(cursor.raw_bytes(), Some(&ion_data[3..=4]));
        assert_eq!(cursor.raw_field_id_bytes(), None);
        assert_eq!(cursor.raw_annotations_bytes(), None);
        assert_eq!(cursor.raw_header_bytes(), Some(&ion_data[3..=3]));
        assert_eq!(cursor.raw_value_bytes(), Some(&ion_data[4..=4]));
        assert_eq!(RawStreamItem::Value(IonType::Int), cursor.next()?);
        assert_eq!(cursor.raw_bytes(), Some(&ion_data[5..=6]));
        assert_eq!(cursor.raw_field_id_bytes(), None);
        assert_eq!(cursor.raw_annotations_bytes(), None);
        assert_eq!(cursor.raw_header_bytes(), Some(&ion_data[5..=5]));
        assert_eq!(cursor.raw_value_bytes(), Some(&ion_data[6..=6]));
        assert_eq!(RawStreamItem::Value(IonType::Int), cursor.next()?);
        assert_eq!(cursor.raw_bytes(), Some(&ion_data[7..=8]));
        assert_eq!(cursor.raw_field_id_bytes(), None);
        assert_eq!(cursor.raw_annotations_bytes(), None);
        assert_eq!(cursor.raw_header_bytes(), Some(&ion_data[7..=7]));
        assert_eq!(cursor.raw_value_bytes(), Some(&ion_data[8..=8]));

        // Step out of the list; the next field in the struct is `$10: 1`.
        cursor.step_out()?;
        assert_eq!(RawStreamItem::Value(IonType::Int), cursor.next()?);
        assert_eq!(cursor.raw_bytes(), Some(&ion_data[9..=11]));
        assert_eq!(cursor.raw_field_id_bytes(), Some(&ion_data[9..=9]));
        assert_eq!(cursor.raw_annotations_bytes(), None);
        assert_eq!(cursor.raw_header_bytes(), Some(&ion_data[10..=10]));
        assert_eq!(cursor.raw_value_bytes(), Some(&ion_data[11..=11]));

        // Step out of the struct; the next top-level value is the annotated bool.
        cursor.step_out()?;
        assert_eq!(RawStreamItem::Value(IonType::Bool), cursor.next()?);
        assert_eq!(cursor.raw_bytes(), Some(&ion_data[12..16]));
        assert_eq!(cursor.raw_field_id_bytes(), None);
        assert_eq!(cursor.raw_annotations_bytes(), Some(&ion_data[12..=14]));
        assert_eq!(cursor.raw_header_bytes(), Some(&ion_data[15..=15]));
        assert_eq!(
            cursor.raw_value_bytes(),
            Some(&ion_data[15..15] /* empty: the bool has no bytes after its header */)
        );
        Ok(())
    }

    // Reads a second top-level value when the configured read size (12) equals the length
    // of the first value. NOTE(review): the intent appears to be that the second `next()`
    // must pull more bytes from the source; confirm against `read_from`'s behaviour.
    #[test]
    fn test_binary_end_of_stream() -> IonResult<()> {
        // {$11: [1, 2, 3], $10: 1} followed by an annotated `false`.
        let ion_data: &[u8] = &[
            // First top-level value in the stream (12 bytes in total)
            0xDB, // 11-byte struct
            0x8B, // Field ID 11
            0xB6, // 6-byte list
            0x21, 0x01, // Int 1
            0x21, 0x02, // Int 2
            0x21, 0x03, // Int 3
            0x8A, // Field ID 10
            0x21, 0x01, // Int 1
            // Second top-level value in the stream
            0xE3, // 3-byte annotations wrapper
            0x81, // Annotations take 1 byte
            0x8C, // Annotation with symbol ID 12
            0x10, // Bool false
        ];
        let mut cursor = BlockingRawBinaryReader::new_with_size(ion_data.to_owned(), 12)?;
        assert_eq!(RawStreamItem::Value(IonType::Struct), cursor.next()?);
        assert_eq!(RawStreamItem::Value(IonType::Bool), cursor.next()?);

        Ok(())
    }
}