ion_rs/element/
element_stream_reader.rs

1use crate::result::{decoding_error, illegal_operation, illegal_operation_raw};
2use crate::text::parent_container::ParentContainer;
3
4use crate::element::iterators::SymbolsIterator;
5use crate::element::{Blob, Clob, Element};
6use crate::{
7    Decimal, Int, IonError, IonReader, IonResult, IonType, Str, StreamItem, Symbol, Timestamp,
8};
9use std::fmt::Display;
10use std::mem;
11
12const INITIAL_PARENTS_CAPACITY: usize = 16;
13
14// TODO: Add an IonElementReader trait implementation
15// TODO: once ElementReader trait is removed this can  use the name `ElementReader`
16pub struct ElementStreamReader {
17    // Represents the element that will be read using this reader
18    element: Option<Element>,
19    // If the reader is not positioned on a struct the iterator item will store (None, _element_)
20    // Otherwise it will store (Some(_field_name_), _element_)
21    current_iter: Box<dyn Iterator<Item = (Option<Symbol>, Element)>>,
22    iter_stack: Vec<Box<dyn Iterator<Item = (Option<Symbol>, Element)>>>,
23    // If the reader is not positioned over a value inside a struct, this is None.
24    current_field_name: Option<Symbol>,
25    // If the reader has not yet begun reading at the current level or is positioned over an IVM,
26    // this is None.
27    current_value: Option<Element>,
28    is_eof: bool,
29    parents: Vec<ParentContainer>,
30}
31
32impl ElementStreamReader {
33    pub fn new(input: Element) -> ElementStreamReader {
34        ElementStreamReader {
35            element: Some(input),
36            current_iter: Box::new(std::iter::empty()),
37            iter_stack: vec![],
38            current_field_name: None,
39            current_value: None,
40            is_eof: false,
41            parents: Vec::with_capacity(INITIAL_PARENTS_CAPACITY),
42        }
43    }
44
45    fn load_next_value(&mut self) -> IonResult<()> {
46        // If the reader's current value is the beginning of a container and the user calls `next()`,
47        // we need to skip the entire container. We can do this by stepping into and then out of
48        // that container; `step_out()` has logic that will exhaust the remaining values.
49        let need_to_skip_container = !self.is_null()
50            && self
51                .current_value
52                .as_ref()
53                .map(|v| v.ion_type().is_container())
54                .unwrap_or(false);
55
56        if need_to_skip_container {
57            self.step_in()?;
58            self.step_out()?;
59        }
60
61        // Unset variables holding onto information about the previous position.
62        self.current_value = None;
63        self.current_field_name = None;
64
65        if self.parents.is_empty() {
66            // If the reader has already found EOF (the end of the top level), there's no need to
67            // try to read more data. Return Ok(None).
68            if self.is_eof {
69                self.current_value = None;
70                return Ok(());
71            }
72
73            self.current_value = self.element.take();
74            // As we already loaded the single element provided to the reader, we have reached eof
75            self.is_eof = true;
76            return Ok(());
77        }
78
79        // If the parent is not empty that means we are inside a container
80        // Get the next value of the container using the iterator
81        let next_item = self.current_iter.next();
82        if next_item.is_none() {
83            // If there are no next values left within the iterator
84            // then early return
85            self.current_value = None;
86            return Ok(());
87        }
88        // Otherwise if there is a next value available then set current value accordingly
89        let (field_name, value) = next_item.unwrap();
90        self.current_value = Some(value);
91        // Field name will either be `None` for container types SExpression, List
92        // But for struct it will contain the field name `Symbol`
93        self.current_field_name = field_name;
94
95        Ok(())
96    }
97
98    /// Constructs an [IonError::IllegalOperation] which explains that the reader was asked to
99    /// perform an action that is only allowed when it is positioned over the item type described
100    /// by the parameter `expected`.
101    fn expected<I: Display>(&self, expected: I) -> IonError {
102        illegal_operation_raw(format!(
103            "type mismatch: expected a(n) {} but positioned over a(n) {}",
104            expected,
105            self.current()
106        ))
107    }
108
109    fn container_values(value: Element) -> Box<dyn Iterator<Item = (Option<Symbol>, Element)>> {
110        match value.ion_type() {
111            IonType::List | IonType::SExp => Box::new(
112                value
113                    .as_sequence()
114                    .unwrap()
115                    .elements()
116                    .map(|e| (None, e.to_owned()))
117                    .collect::<Vec<(Option<Symbol>, Element)>>()
118                    .into_iter(),
119            ),
120            IonType::Struct => Box::new(
121                value
122                    .as_struct()
123                    .unwrap()
124                    .iter()
125                    .map(|(s, e)| (Some(s.to_owned()), e.to_owned()))
126                    .collect::<Vec<(Option<Symbol>, Element)>>()
127                    .into_iter(),
128            ),
129            _ => unreachable!("Can not step into a scalar type"),
130        }
131    }
132
133    fn current_value_as<T, F>(&self, expect_message: &'static str, map_fn: F) -> IonResult<T>
134    where
135        F: Fn(&Element) -> Option<T>,
136    {
137        self.current_value
138            .as_ref()
139            .and_then(map_fn)
140            .ok_or_else(|| self.expected(expect_message))
141    }
142}
143
144impl IonReader for ElementStreamReader {
145    type Item = StreamItem;
146    type Symbol = Symbol;
147
148    fn next(&mut self) -> IonResult<StreamItem> {
149        // Parse the next value from the stream, storing it in `self.current_value`.
150        self.load_next_value()?;
151
152        // If we're positioned on a value, return its IonType and whether it's null.
153        Ok(self.current())
154    }
155
156    fn current(&self) -> StreamItem {
157        if let Some(ref value) = self.current_value {
158            StreamItem::nullable_value(value.ion_type(), value.is_null())
159        } else {
160            StreamItem::Nothing
161        }
162    }
163
164    fn ion_type(&self) -> Option<IonType> {
165        self.current_value.as_ref().map(|v| v.ion_type())
166    }
167
168    fn is_null(&self) -> bool {
169        if let Some(ref value) = self.current_value {
170            return value.is_null();
171        }
172        false
173    }
174
175    // Clippy reports a redundant closure, but fixing it causes the code to break.
176    // See: https://github.com/amazon-ion/ion-rust/issues/472
177    #[allow(clippy::redundant_closure)]
178    fn annotations<'a>(&'a self) -> Box<dyn Iterator<Item = IonResult<Self::Symbol>> + 'a> {
179        let iterator = self
180            .current_value
181            .as_ref()
182            .map(|value| value.annotations().iter())
183            .unwrap_or_else(|| SymbolsIterator::empty())
184            .cloned()
185            // The annotations are already in memory and are already resolved to text, so
186            // this step cannot fail. Map each token to Ok(token).
187            .map(Ok);
188        Box::new(iterator)
189    }
190
191    fn field_name(&self) -> IonResult<Self::Symbol> {
192        match self.current_field_name.as_ref() {
193            Some(name) => Ok(name.clone()),
194            None => illegal_operation(
195                "field_name() can only be called when the reader is positioned inside a struct",
196            ),
197        }
198    }
199
200    // TODO: See if the match statements for read_*() below could be simplified
201
202    fn read_null(&mut self) -> IonResult<IonType> {
203        match self.current_value.as_ref() {
204            Some(element) if element.is_null() => Ok(element.ion_type()),
205            _ => Err(self.expected("null value")),
206        }
207    }
208
209    fn read_bool(&mut self) -> IonResult<bool> {
210        self.current_value_as("bool value", |v| v.as_bool())
211    }
212
213    fn read_int(&mut self) -> IonResult<Int> {
214        self.current_value_as("int value", |v| v.as_int().map(|i| i.to_owned()))
215    }
216
217    fn read_i64(&mut self) -> IonResult<i64> {
218        match self.current_value.as_ref() {
219            Some(element) if element.as_int().is_some() => match element.as_int().unwrap() {
220                Int::I64(value) => Ok(*value),
221                Int::BigInt(value) => {
222                    decoding_error(format!("Integer {value} is too large to fit in an i64."))
223                }
224            },
225            _ => Err(self.expected("int value")),
226        }
227    }
228
229    fn read_f32(&mut self) -> IonResult<f32> {
230        self.current_value_as("float value", |v| v.as_float().map(|f| f as f32))
231    }
232
233    fn read_f64(&mut self) -> IonResult<f64> {
234        self.current_value_as("float value", |v| v.as_float())
235    }
236
237    fn read_decimal(&mut self) -> IonResult<Decimal> {
238        self.current_value_as("decimal value", |v| v.as_decimal().map(|i| i.to_owned()))
239    }
240
241    fn read_string(&mut self) -> IonResult<Str> {
242        match self.current_value.as_ref() {
243            Some(element) if element.as_text().is_some() => Ok(element.as_text().unwrap().into()),
244            _ => Err(self.expected("string value")),
245        }
246    }
247
248    fn read_str(&mut self) -> IonResult<&str> {
249        match self.current_value.as_ref() {
250            Some(element) if element.as_text().is_some() => Ok(element.as_text().unwrap()),
251            _ => Err(self.expected("string value")),
252        }
253    }
254
255    fn read_symbol(&mut self) -> IonResult<Self::Symbol> {
256        self.current_value_as("symbol value", |v| v.as_symbol().map(|i| i.to_owned()))
257    }
258
259    fn read_blob(&mut self) -> IonResult<Blob> {
260        match self.current_value.as_ref() {
261            Some(element) if element.as_blob().is_some() => {
262                Ok(Blob::from(element.as_blob().unwrap()))
263            }
264            _ => Err(self.expected("blog value")),
265        }
266    }
267
268    fn read_clob(&mut self) -> IonResult<Clob> {
269        match self.current_value.as_ref() {
270            Some(element) if element.as_clob().is_some() => {
271                Ok(Clob::from(element.as_clob().unwrap()))
272            }
273            _ => Err(self.expected("clob value")),
274        }
275    }
276
277    fn read_timestamp(&mut self) -> IonResult<Timestamp> {
278        self.current_value_as("timestamp value", |v| {
279            v.as_timestamp().map(|i| i.to_owned())
280        })
281    }
282
283    fn step_in(&mut self) -> IonResult<()> {
284        match &self.current_value {
285            Some(value) if value.ion_type().is_container() => {
286                self.parents.push(ParentContainer::new(value.ion_type()));
287                // Create a new iterator for values of the container that we are stepping into
288                let mut iter = ElementStreamReader::container_values(value.to_owned());
289                // Set `current_iter` to point to the new one, storing the old one in `iter`.
290                mem::swap(&mut iter, &mut self.current_iter);
291                // Put the old iterator on the stack
292                self.iter_stack.push(iter);
293                self.current_value = None;
294                Ok(())
295            }
296            Some(value) => {
297                illegal_operation(format!("Cannot step_in() to a {:?}", value.ion_type()))
298            }
299            None => illegal_operation(format!(
300                "{} {}",
301                "Cannot `step_in`: the reader is not positioned on a value.",
302                "Try calling `next()` to advance first."
303            )),
304        }
305    }
306
307    fn step_out(&mut self) -> IonResult<()> {
308        if self.parents.is_empty() {
309            return illegal_operation(
310                "Cannot call `step_out()` when the reader is at the top level.",
311            );
312        }
313
314        // The container we're stepping out of.
315        let parent = self.parents.last().unwrap();
316
317        // If we're not at the end of the current container, advance the cursor until we are.
318        if !parent.is_exhausted() {
319            while let StreamItem::Value(_) | StreamItem::Null(_) = self.next()? {}
320        }
321
322        // Remove the parent container from the stack and clear the current value.
323        let _ = self.parents.pop();
324
325        // Remove the iterator related to the parent container from stack and set it as current iterator
326        match self.iter_stack.pop() {
327            None => {}
328            Some(iter) => {
329                self.current_iter = iter;
330            }
331        }
332        self.current_value = None;
333
334        if self.parents.is_empty() {
335            // We're at the top level; nothing left to do.
336            return Ok(());
337        }
338
339        Ok(())
340    }
341
342    fn parent_type(&self) -> Option<IonType> {
343        self.parents.last().map(|parent| parent.ion_type())
344    }
345
346    fn depth(&self) -> usize {
347        self.parents.len()
348    }
349
350    fn ion_version(&self) -> (u8, u8) {
351        // An `Element` doesn't have an Ion version associated with it
352        // Since `Element`s are an in-memory representation fo Ion data, all versions of 1.x share the same Ion version.
353        (1, 0)
354    }
355}
356
#[cfg(test)]
mod reader_tests {
    use rstest::*;

    use super::*;
    use crate::result::IonResult;
    use crate::stream_reader::IonReader;
    use crate::types::{Decimal, Timestamp};

    use crate::IonType;

    /// Parses `text` as a single Ion value, panicking if parsing fails.
    fn load_element(text: &str) -> Element {
        let bytes = text.as_bytes();
        Element::read_one(bytes).expect("parsing failed unexpectedly")
    }

    /// Advances `reader` and asserts that it lands on a value of `ion_type` with the given
    /// nullness.
    fn next_type(reader: &mut ElementStreamReader, ion_type: IonType, is_null: bool) {
        let expected_item = StreamItem::nullable_value(ion_type, is_null);
        let actual_item = reader.next().unwrap();
        assert_eq!(actual_item, expected_item);
    }

    #[test]
    fn test_skipping_containers() -> IonResult<()> {
        let ion_data = load_element(
            r#"
            [1, 2, 3]
        "#,
        );
        let mut reader = ElementStreamReader::new(ion_data);

        next_type(&mut reader, IonType::List, false);
        reader.step_in()?;
        next_type(&mut reader, IonType::Int, false);
        assert_eq!(reader.read_i64()?, 1);
        // Stepping out skips the unread 2 and 3, which puts the reader at the end of the
        // element; the following `next()` must therefore report `Nothing`.
        reader.step_out()?;
        assert_eq!(reader.next()?, StreamItem::Nothing);
        Ok(())
    }

    #[test]
    fn test_read_nested_containers() -> IonResult<()> {
        let ion_data = load_element(
            r#"
            {
                foo: [
                    1,
                    [2, 3],
                    4
                ],
                bar: {
                    a: 5,
                    b: (true true true)
                }
            }
        "#,
        );
        let mut reader = ElementStreamReader::new(ion_data);

        next_type(&mut reader, IonType::Struct, false);
        reader.step_in()?;

        // Field 'foo'
        next_type(&mut reader, IonType::List, false);
        reader.step_in()?;
        next_type(&mut reader, IonType::Int, false);
        next_type(&mut reader, IonType::List, false);
        reader.step_in()?;
        next_type(&mut reader, IonType::Int, false);
        // Positioned on the '2' nested inside of 'foo'; leave both lists.
        reader.step_out()?;
        reader.step_out()?;

        // Field 'bar'
        next_type(&mut reader, IonType::Struct, false);
        reader.step_in()?;
        next_type(&mut reader, IonType::Int, false);
        next_type(&mut reader, IonType::SExp, false);
        reader.step_in()?;
        next_type(&mut reader, IonType::Bool, false);
        next_type(&mut reader, IonType::Bool, false);
        // Positioned on the second 'true' of the s-expression in 'bar'/'b'; unwind to the top.
        reader.step_out()?;
        reader.step_out()?;
        reader.step_out()?;
        Ok(())
    }

    #[test]
    fn test_read_container_with_mixed_scalars_and_containers() -> IonResult<()> {
        let ion_data = load_element(
            r#"
            {
                foo: 4,
                bar: {
                    a: 5,
                    b: (true true true)
                }
            }
        "#,
        );
        let mut reader = ElementStreamReader::new(ion_data);

        next_type(&mut reader, IonType::Struct, false);
        reader.step_in()?;

        next_type(&mut reader, IonType::Int, false);
        assert_eq!(reader.field_name()?, Symbol::owned("foo"));

        next_type(&mut reader, IonType::Struct, false);
        assert_eq!(reader.field_name()?, Symbol::owned("bar"));
        reader.step_in()?;
        next_type(&mut reader, IonType::Int, false);
        assert_eq!(reader.read_i64()?, 5);
        reader.step_out()?;

        // 'bar' was the last field of the outer struct.
        assert_eq!(reader.next()?, StreamItem::Nothing);
        reader.step_out()?;
        Ok(())
    }

    #[test]
    fn test_read_container_with_mixed_scalars() -> IonResult<()> {
        let ion_data = load_element(
            r#"
            [ {{ZW5jb2RlZA==}}, {{"hello"}}, 4.5e0, 4.5, 2007-07-12T, foo, "hi!" ]
        "#,
        );
        let mut reader = ElementStreamReader::new(ion_data);

        next_type(&mut reader, IonType::List, false);
        reader.step_in()?;

        next_type(&mut reader, IonType::Blob, false);
        assert_eq!(reader.read_blob()?, Blob::from("encoded"));

        next_type(&mut reader, IonType::Clob, false);
        assert_eq!(reader.read_clob()?, Clob::from("hello"));

        next_type(&mut reader, IonType::Float, false);
        assert_eq!(reader.read_f64()?, 4.5);

        next_type(&mut reader, IonType::Decimal, false);
        assert_eq!(reader.read_decimal()?, Decimal::new(45, -1));

        next_type(&mut reader, IonType::Timestamp, false);
        assert_eq!(
            reader.read_timestamp()?,
            Timestamp::with_ymd(2007, 7, 12).build().unwrap()
        );

        next_type(&mut reader, IonType::Symbol, false);
        assert_eq!(reader.read_symbol()?, Symbol::owned("foo"));

        next_type(&mut reader, IonType::String, false);
        assert_eq!(reader.read_string()?, "hi!".to_string());

        reader.step_out()?;
        Ok(())
    }

    #[rstest]
    #[case(" null ", Element::from(IonType::Null))]
    #[case(" null.string ", Element::from(IonType::String))]
    #[case(" true ", true)]
    #[case(" false ", false)]
    #[case(" 738 ", 738)]
    #[case(" 2.5e0 ", 2.5)]
    #[case(" 2.5 ", Decimal::new(25, -1))]
    #[case(" 2007-07-12T ", Timestamp::with_ymd(2007, 7, 12).build().unwrap())]
    #[case(" foo ", Symbol::owned("foo"))]
    #[case(" \"hi!\" ", "hi!".to_owned())]
    #[case(" {{ZW5jb2RlZA==}} ", Blob::from("encoded"))]
    #[case(" {{\"hello\"}} ", Clob::from("hello"))]
    fn test_read_single_top_level_values<E: Into<Element>>(
        #[case] text: &str,
        #[case] expected_value: E,
    ) {
        let mut reader = ElementStreamReader::new(load_element(text));
        let expected_element: Element = expected_value.into();
        next_type(
            &mut reader,
            expected_element.ion_type(),
            expected_element.is_null(),
        );
        // TODO: Redo (or remove?) this test. There's not an API that exposes the
        //       AnnotatedTextValue any more. We're directly accessing `current_value` as a hack.
        let actual_element = reader.current_value.clone().unwrap();
        assert_eq!(actual_element, expected_element);
    }

    #[rstest]
    #[case(" foo::bar::null ", Element::from(IonType::Null).with_annotations(["foo", "bar"]))]
    #[case(" foo::true ", Element::from(true).with_annotations(["foo"]))]
    #[case(" 'foo'::5 ", Element::from(5).with_annotations(["foo"]))]
    fn test_top_level_values_with_annotations<E: Into<Element>>(
        #[case] text: &str,
        #[case] expected_value: E,
    ) {
        let mut reader = ElementStreamReader::new(load_element(text));
        let expected_element: Element = expected_value.into();
        next_type(
            &mut reader,
            expected_element.ion_type(),
            expected_element.is_null(),
        );
        // Element equality also takes the annotations into account.
        let actual_element = reader.current_value.clone().unwrap();
        assert_eq!(actual_element, expected_element);

        // The annotations must also be readable through the reader without error.
        let reader_annotations: IonResult<Vec<Symbol>> = reader.annotations().collect();
        assert!(reader_annotations.is_ok());
    }

    #[test]
    fn structs_trailing_comma() -> IonResult<()> {
        let pretty_ion = load_element(
            r#"
            // Structs with last field with/without trailing comma
            (
                {a:1, b:2,}     // with trailing comma
                {a:1, b:2 }     // without trailing comma
            )
        "#,
        );
        let mut reader = ElementStreamReader::new(pretty_ion);

        assert_eq!(reader.next()?, StreamItem::Value(IonType::SExp));
        reader.step_in()?;

        // First struct: read both fields.
        assert_eq!(reader.next()?, StreamItem::Value(IonType::Struct));
        reader.step_in()?;
        assert_eq!(reader.next()?, StreamItem::Value(IonType::Int));
        assert_eq!(reader.field_name()?, Symbol::owned("a".to_string()));
        assert_eq!(reader.read_i64()?, 1);
        assert_eq!(reader.next()?, StreamItem::Value(IonType::Int));
        assert_eq!(reader.field_name()?, Symbol::owned("b".to_string()));
        assert_eq!(reader.read_i64()?, 2);
        reader.step_out()?;

        // Second struct: only check that it is found, then leave the s-expression.
        assert_eq!(reader.next()?, StreamItem::Value(IonType::Struct));
        reader.step_out()?;
        Ok(())
    }
}