1use crate::result::{decoding_error, illegal_operation, illegal_operation_raw};
2use crate::text::parent_container::ParentContainer;
3
4use crate::element::iterators::SymbolsIterator;
5use crate::element::{Blob, Clob, Element};
6use crate::{
7 Decimal, Int, IonError, IonReader, IonResult, IonType, Str, StreamItem, Symbol, Timestamp,
8};
9use std::fmt::Display;
10use std::mem;
11
/// Capacity pre-allocated for the reader's `parents` stack when it is constructed, so that
/// stepping into containers nested up to this depth does not need to grow the `Vec`.
const INITIAL_PARENTS_CAPACITY: usize = 16;
13
14pub struct ElementStreamReader {
17 element: Option<Element>,
19 current_iter: Box<dyn Iterator<Item = (Option<Symbol>, Element)>>,
22 iter_stack: Vec<Box<dyn Iterator<Item = (Option<Symbol>, Element)>>>,
23 current_field_name: Option<Symbol>,
25 current_value: Option<Element>,
28 is_eof: bool,
29 parents: Vec<ParentContainer>,
30}
31
32impl ElementStreamReader {
33 pub fn new(input: Element) -> ElementStreamReader {
34 ElementStreamReader {
35 element: Some(input),
36 current_iter: Box::new(std::iter::empty()),
37 iter_stack: vec![],
38 current_field_name: None,
39 current_value: None,
40 is_eof: false,
41 parents: Vec::with_capacity(INITIAL_PARENTS_CAPACITY),
42 }
43 }
44
45 fn load_next_value(&mut self) -> IonResult<()> {
46 let need_to_skip_container = !self.is_null()
50 && self
51 .current_value
52 .as_ref()
53 .map(|v| v.ion_type().is_container())
54 .unwrap_or(false);
55
56 if need_to_skip_container {
57 self.step_in()?;
58 self.step_out()?;
59 }
60
61 self.current_value = None;
63 self.current_field_name = None;
64
65 if self.parents.is_empty() {
66 if self.is_eof {
69 self.current_value = None;
70 return Ok(());
71 }
72
73 self.current_value = self.element.take();
74 self.is_eof = true;
76 return Ok(());
77 }
78
79 let next_item = self.current_iter.next();
82 if next_item.is_none() {
83 self.current_value = None;
86 return Ok(());
87 }
88 let (field_name, value) = next_item.unwrap();
90 self.current_value = Some(value);
91 self.current_field_name = field_name;
94
95 Ok(())
96 }
97
98 fn expected<I: Display>(&self, expected: I) -> IonError {
102 illegal_operation_raw(format!(
103 "type mismatch: expected a(n) {} but positioned over a(n) {}",
104 expected,
105 self.current()
106 ))
107 }
108
109 fn container_values(value: Element) -> Box<dyn Iterator<Item = (Option<Symbol>, Element)>> {
110 match value.ion_type() {
111 IonType::List | IonType::SExp => Box::new(
112 value
113 .as_sequence()
114 .unwrap()
115 .elements()
116 .map(|e| (None, e.to_owned()))
117 .collect::<Vec<(Option<Symbol>, Element)>>()
118 .into_iter(),
119 ),
120 IonType::Struct => Box::new(
121 value
122 .as_struct()
123 .unwrap()
124 .iter()
125 .map(|(s, e)| (Some(s.to_owned()), e.to_owned()))
126 .collect::<Vec<(Option<Symbol>, Element)>>()
127 .into_iter(),
128 ),
129 _ => unreachable!("Can not step into a scalar type"),
130 }
131 }
132
133 fn current_value_as<T, F>(&self, expect_message: &'static str, map_fn: F) -> IonResult<T>
134 where
135 F: Fn(&Element) -> Option<T>,
136 {
137 self.current_value
138 .as_ref()
139 .and_then(map_fn)
140 .ok_or_else(|| self.expected(expect_message))
141 }
142}
143
144impl IonReader for ElementStreamReader {
145 type Item = StreamItem;
146 type Symbol = Symbol;
147
148 fn next(&mut self) -> IonResult<StreamItem> {
149 self.load_next_value()?;
151
152 Ok(self.current())
154 }
155
156 fn current(&self) -> StreamItem {
157 if let Some(ref value) = self.current_value {
158 StreamItem::nullable_value(value.ion_type(), value.is_null())
159 } else {
160 StreamItem::Nothing
161 }
162 }
163
164 fn ion_type(&self) -> Option<IonType> {
165 self.current_value.as_ref().map(|v| v.ion_type())
166 }
167
168 fn is_null(&self) -> bool {
169 if let Some(ref value) = self.current_value {
170 return value.is_null();
171 }
172 false
173 }
174
175 #[allow(clippy::redundant_closure)]
178 fn annotations<'a>(&'a self) -> Box<dyn Iterator<Item = IonResult<Self::Symbol>> + 'a> {
179 let iterator = self
180 .current_value
181 .as_ref()
182 .map(|value| value.annotations().iter())
183 .unwrap_or_else(|| SymbolsIterator::empty())
184 .cloned()
185 .map(Ok);
188 Box::new(iterator)
189 }
190
191 fn field_name(&self) -> IonResult<Self::Symbol> {
192 match self.current_field_name.as_ref() {
193 Some(name) => Ok(name.clone()),
194 None => illegal_operation(
195 "field_name() can only be called when the reader is positioned inside a struct",
196 ),
197 }
198 }
199
200 fn read_null(&mut self) -> IonResult<IonType> {
203 match self.current_value.as_ref() {
204 Some(element) if element.is_null() => Ok(element.ion_type()),
205 _ => Err(self.expected("null value")),
206 }
207 }
208
209 fn read_bool(&mut self) -> IonResult<bool> {
210 self.current_value_as("bool value", |v| v.as_bool())
211 }
212
213 fn read_int(&mut self) -> IonResult<Int> {
214 self.current_value_as("int value", |v| v.as_int().map(|i| i.to_owned()))
215 }
216
217 fn read_i64(&mut self) -> IonResult<i64> {
218 match self.current_value.as_ref() {
219 Some(element) if element.as_int().is_some() => match element.as_int().unwrap() {
220 Int::I64(value) => Ok(*value),
221 Int::BigInt(value) => {
222 decoding_error(format!("Integer {value} is too large to fit in an i64."))
223 }
224 },
225 _ => Err(self.expected("int value")),
226 }
227 }
228
229 fn read_f32(&mut self) -> IonResult<f32> {
230 self.current_value_as("float value", |v| v.as_float().map(|f| f as f32))
231 }
232
233 fn read_f64(&mut self) -> IonResult<f64> {
234 self.current_value_as("float value", |v| v.as_float())
235 }
236
237 fn read_decimal(&mut self) -> IonResult<Decimal> {
238 self.current_value_as("decimal value", |v| v.as_decimal().map(|i| i.to_owned()))
239 }
240
241 fn read_string(&mut self) -> IonResult<Str> {
242 match self.current_value.as_ref() {
243 Some(element) if element.as_text().is_some() => Ok(element.as_text().unwrap().into()),
244 _ => Err(self.expected("string value")),
245 }
246 }
247
248 fn read_str(&mut self) -> IonResult<&str> {
249 match self.current_value.as_ref() {
250 Some(element) if element.as_text().is_some() => Ok(element.as_text().unwrap()),
251 _ => Err(self.expected("string value")),
252 }
253 }
254
255 fn read_symbol(&mut self) -> IonResult<Self::Symbol> {
256 self.current_value_as("symbol value", |v| v.as_symbol().map(|i| i.to_owned()))
257 }
258
259 fn read_blob(&mut self) -> IonResult<Blob> {
260 match self.current_value.as_ref() {
261 Some(element) if element.as_blob().is_some() => {
262 Ok(Blob::from(element.as_blob().unwrap()))
263 }
264 _ => Err(self.expected("blog value")),
265 }
266 }
267
268 fn read_clob(&mut self) -> IonResult<Clob> {
269 match self.current_value.as_ref() {
270 Some(element) if element.as_clob().is_some() => {
271 Ok(Clob::from(element.as_clob().unwrap()))
272 }
273 _ => Err(self.expected("clob value")),
274 }
275 }
276
277 fn read_timestamp(&mut self) -> IonResult<Timestamp> {
278 self.current_value_as("timestamp value", |v| {
279 v.as_timestamp().map(|i| i.to_owned())
280 })
281 }
282
283 fn step_in(&mut self) -> IonResult<()> {
284 match &self.current_value {
285 Some(value) if value.ion_type().is_container() => {
286 self.parents.push(ParentContainer::new(value.ion_type()));
287 let mut iter = ElementStreamReader::container_values(value.to_owned());
289 mem::swap(&mut iter, &mut self.current_iter);
291 self.iter_stack.push(iter);
293 self.current_value = None;
294 Ok(())
295 }
296 Some(value) => {
297 illegal_operation(format!("Cannot step_in() to a {:?}", value.ion_type()))
298 }
299 None => illegal_operation(format!(
300 "{} {}",
301 "Cannot `step_in`: the reader is not positioned on a value.",
302 "Try calling `next()` to advance first."
303 )),
304 }
305 }
306
307 fn step_out(&mut self) -> IonResult<()> {
308 if self.parents.is_empty() {
309 return illegal_operation(
310 "Cannot call `step_out()` when the reader is at the top level.",
311 );
312 }
313
314 let parent = self.parents.last().unwrap();
316
317 if !parent.is_exhausted() {
319 while let StreamItem::Value(_) | StreamItem::Null(_) = self.next()? {}
320 }
321
322 let _ = self.parents.pop();
324
325 match self.iter_stack.pop() {
327 None => {}
328 Some(iter) => {
329 self.current_iter = iter;
330 }
331 }
332 self.current_value = None;
333
334 if self.parents.is_empty() {
335 return Ok(());
337 }
338
339 Ok(())
340 }
341
342 fn parent_type(&self) -> Option<IonType> {
343 self.parents.last().map(|parent| parent.ion_type())
344 }
345
346 fn depth(&self) -> usize {
347 self.parents.len()
348 }
349
350 fn ion_version(&self) -> (u8, u8) {
351 (1, 0)
354 }
355}
356
#[cfg(test)]
mod reader_tests {
    use rstest::*;

    use super::*;
    use crate::result::IonResult;
    use crate::stream_reader::IonReader;
    use crate::types::{Decimal, Timestamp};

    use crate::IonType;

    /// Parses `text` as a single Ion value, panicking if parsing fails.
    fn load_element(text: &str) -> Element {
        Element::read_one(text.as_bytes()).expect("parsing failed unexpectedly")
    }

    /// Advances `reader` and asserts that it lands on a value of `ion_type` with the given
    /// nullness.
    fn next_type(reader: &mut ElementStreamReader, ion_type: IonType, is_null: bool) {
        assert_eq!(
            reader.next().unwrap(),
            StreamItem::nullable_value(ion_type, is_null)
        );
    }

    /// Stepping out of a partially read container discards its remaining children.
    #[test]
    fn test_skipping_containers() -> IonResult<()> {
        let ion_data = load_element(
            r#"
            [1, 2, 3]
            "#,
        );
        let reader = &mut ElementStreamReader::new(ion_data);

        next_type(reader, IonType::List, false);
        reader.step_in()?;
        next_type(reader, IonType::Int, false);
        assert_eq!(reader.read_i64()?, 1);
        reader.step_out()?;
        assert_eq!(reader.next()?, StreamItem::Nothing);
        Ok(())
    }

    /// Calling `next()` while positioned on a container that was never stepped into moves on to
    /// the container's next sibling.
    #[test]
    fn test_next_skips_unentered_container() -> IonResult<()> {
        let ion_data = load_element(
            r#"
            [[1, 2], 3]
            "#,
        );
        let reader = &mut ElementStreamReader::new(ion_data);

        next_type(reader, IonType::List, false);
        reader.step_in()?;
        next_type(reader, IonType::List, false);
        next_type(reader, IonType::Int, false);
        assert_eq!(reader.read_i64()?, 3);
        assert_eq!(reader.next()?, StreamItem::Nothing);
        reader.step_out()?;
        Ok(())
    }

    #[test]
    fn test_read_nested_containers() -> IonResult<()> {
        let ion_data = load_element(
            r#"
            {
                foo: [
                    1,
                    [2, 3],
                    4
                ],
                bar: {
                    a: 5,
                    b: (true true true)
                }
            }
            "#,
        );
        let reader = &mut ElementStreamReader::new(ion_data);
        next_type(reader, IonType::Struct, false);
        reader.step_in()?;
        next_type(reader, IonType::List, false);
        reader.step_in()?;
        next_type(reader, IonType::Int, false);
        next_type(reader, IonType::List, false);
        reader.step_in()?;
        next_type(reader, IonType::Int, false);
        // Leave the inner list, then the outer list, without reading their remaining values.
        reader.step_out()?;
        reader.step_out()?;
        next_type(reader, IonType::Struct, false);
        reader.step_in()?;
        next_type(reader, IonType::Int, false);
        next_type(reader, IonType::SExp, false);
        reader.step_in()?;
        next_type(reader, IonType::Bool, false);
        next_type(reader, IonType::Bool, false);
        // Leave the s-expression, the inner struct, and finally the top-level struct.
        reader.step_out()?;
        reader.step_out()?;
        reader.step_out()?;
        Ok(())
    }

    #[test]
    fn test_read_container_with_mixed_scalars_and_containers() -> IonResult<()> {
        let ion_data = load_element(
            r#"
            {
                foo: 4,
                bar: {
                    a: 5,
                    b: (true true true)
                }
            }
            "#,
        );

        let reader = &mut ElementStreamReader::new(ion_data);
        next_type(reader, IonType::Struct, false);
        reader.step_in()?;
        next_type(reader, IonType::Int, false);
        assert_eq!(reader.field_name()?, Symbol::owned("foo"));
        next_type(reader, IonType::Struct, false);
        assert_eq!(reader.field_name()?, Symbol::owned("bar"));
        reader.step_in()?;
        next_type(reader, IonType::Int, false);
        assert_eq!(reader.read_i64()?, 5);
        reader.step_out()?;
        assert_eq!(reader.next()?, StreamItem::Nothing);
        reader.step_out()?;
        Ok(())
    }

    #[test]
    fn test_read_container_with_mixed_scalars() -> IonResult<()> {
        let ion_data = load_element(
            r#"
            [ {{ZW5jb2RlZA==}}, {{"hello"}}, 4.5e0, 4.5, 2007-07-12T, foo, "hi!" ]
            "#,
        );

        let reader = &mut ElementStreamReader::new(ion_data);
        next_type(reader, IonType::List, false);
        reader.step_in()?;
        next_type(reader, IonType::Blob, false);
        assert_eq!(reader.read_blob()?, Blob::from("encoded"));
        next_type(reader, IonType::Clob, false);
        assert_eq!(reader.read_clob()?, Clob::from("hello"));
        next_type(reader, IonType::Float, false);
        assert_eq!(reader.read_f64()?, 4.5);
        next_type(reader, IonType::Decimal, false);
        assert_eq!(reader.read_decimal()?, Decimal::new(45, -1));
        next_type(reader, IonType::Timestamp, false);
        assert_eq!(
            reader.read_timestamp()?,
            Timestamp::with_ymd(2007, 7, 12).build().unwrap()
        );
        next_type(reader, IonType::Symbol, false);
        assert_eq!(reader.read_symbol()?, Symbol::owned("foo"));
        next_type(reader, IonType::String, false);
        assert_eq!(reader.read_string()?, "hi!".to_string());
        reader.step_out()?;
        Ok(())
    }

    #[rstest]
    #[case(" null ", Element::from(IonType::Null))]
    #[case(" null.string ", Element::from(IonType::String))]
    #[case(" true ", true)]
    #[case(" false ", false)]
    #[case(" 738 ", 738)]
    #[case(" 2.5e0 ", 2.5)]
    #[case(" 2.5 ", Decimal::new(25, -1))]
    #[case(" 2007-07-12T ", Timestamp::with_ymd(2007, 7, 12).build().unwrap())]
    #[case(" foo ", Symbol::owned("foo"))]
    #[case(" \"hi!\" ", "hi!".to_owned())]
    #[case(" {{ZW5jb2RlZA==}} ", Blob::from("encoded"))]
    #[case(" {{\"hello\"}} ", Clob::from("hello"))]
    fn test_read_single_top_level_values<E: Into<Element>>(
        #[case] text: &str,
        #[case] expected_value: E,
    ) {
        let reader = &mut ElementStreamReader::new(load_element(text));
        let expected_element = expected_value.into();
        next_type(
            reader,
            expected_element.ion_type(),
            expected_element.is_null(),
        );
        let actual_element = reader.current_value.clone();
        assert_eq!(actual_element.unwrap(), expected_element);
    }

    #[rstest]
    #[case(" foo::bar::null ", Element::from(IonType::Null).with_annotations(["foo", "bar"]))]
    #[case(" foo::true ", Element::from(true).with_annotations(["foo"]))]
    #[case(" 'foo'::5 ", Element::from(5).with_annotations(["foo"]))]
    fn test_top_level_values_with_annotations<E: Into<Element>>(
        #[case] text: &str,
        #[case] expected_value: E,
    ) {
        let reader = &mut ElementStreamReader::new(load_element(text));
        let expected_element = expected_value.into();
        next_type(
            reader,
            expected_element.ion_type(),
            expected_element.is_null(),
        );
        let actual_element = reader.current_value.clone();
        assert_eq!(actual_element.unwrap(), expected_element);

        // The reader must report the same annotations, in the same order, as the element.
        let reader_annotations: Vec<Symbol> = reader
            .annotations()
            .collect::<IonResult<Vec<Symbol>>>()
            .expect("reading annotations failed unexpectedly");
        let expected_annotations: Vec<Symbol> =
            expected_element.annotations().iter().cloned().collect();
        assert_eq!(reader_annotations, expected_annotations);
    }

    #[test]
    fn structs_trailing_comma() -> IonResult<()> {
        let pretty_ion = load_element(
            r#"
            // Structs with last field with/without trailing comma
            (
                {a:1, b:2,} // with trailing comma
                {a:1, b:2 } // without trailing comma
            )
            "#,
        );
        let mut reader = ElementStreamReader::new(pretty_ion);
        assert_eq!(reader.next()?, StreamItem::Value(IonType::SExp));
        reader.step_in()?;
        assert_eq!(reader.next()?, StreamItem::Value(IonType::Struct));

        reader.step_in()?;
        assert_eq!(reader.next()?, StreamItem::Value(IonType::Int));
        assert_eq!(reader.field_name()?, Symbol::owned("a".to_string()));
        assert_eq!(reader.read_i64()?, 1);
        assert_eq!(reader.next()?, StreamItem::Value(IonType::Int));
        assert_eq!(reader.field_name()?, Symbol::owned("b".to_string()));
        assert_eq!(reader.read_i64()?, 2);
        reader.step_out()?;

        assert_eq!(reader.next()?, StreamItem::Value(IonType::Struct));
        reader.step_out()?;
        Ok(())
    }
}