partiql_extension_ion/
decode.rs

1use delegate::delegate;
2use ion_rs_old::{Decimal, Int, IonError, IonReader, IonType, StreamItem, Symbol};
3use once_cell::sync::Lazy;
4use partiql_value::{Bag, DateTime, EdgeSpec, Graph, List, SimpleGraph, Tuple, Value, Variant};
5use regex::RegexSet;
6use rust_decimal::prelude::ToPrimitive;
7use std::collections::HashSet;
8
9use crate::boxed_ion::BoxedIonType;
10use crate::common::{
11    Encoding, BAG_ANNOT, BOXED_ION_ANNOT, DATE_ANNOT, GRAPH_ANNOT, MISSING_ANNOT,
12    RE_SET_TIME_PARTS, TIME_ANNOT, TIME_PARTS_HOUR, TIME_PARTS_MINUTE, TIME_PARTS_SECOND,
13    TIME_PARTS_TZ_HOUR, TIME_PARTS_TZ_MINUTE,
14};
15use std::num::NonZeroU8;
16use std::rc::Rc;
17use std::str::FromStr;
18use thiserror::Error;
19use time::Duration;
20
21/// Errors in ion decoding.
22///
23/// ### Notes
24/// This is marked `#[non_exhaustive]`, to reserve the right to add more variants in the future.
25#[derive(Error, Debug, Clone, PartialEq)]
26#[non_exhaustive]
27pub enum IonDecodeError {
28    /// Ion Reader error.
29    #[error("Ion read error: `{}`", .0)]
30    IonReaderError(IonError),
31
32    /// Unsupported type error.
33    #[error("Ion read error: unsupported value of type `{}`", .0)]
34    UnsupportedType(&'static str),
35
36    /// Conversion error.
37    #[error("Ion read error: conversion error `{}`", .0)]
38    ConversionError(String),
39
40    /// Stream error.
41    #[error("Ion read error: stream error `{}`", .0)]
42    StreamError(String),
43
44    /// Any other reading error.
45    #[error("Ion read error: unknown error")]
46    Unknown,
47}
48
49impl From<IonError> for IonDecodeError {
50    fn from(value: IonError) -> Self {
51        IonDecodeError::IonReaderError(value)
52    }
53}
54
55impl From<rust_decimal::Error> for IonDecodeError {
56    fn from(value: rust_decimal::Error) -> Self {
57        IonDecodeError::ConversionError(format!("bad decimal conversion: `{value}`"))
58    }
59}
60
61/// Result of attempts to decode a [`Value`] from Ion.
62pub type IonDecodeResult = Result<Value, IonDecodeError>;
63
64/// Config for construction an Ion decoder.
65pub struct IonDecoderConfig {
66    mode: Encoding,
67}
68
69impl IonDecoderConfig {
70    /// Set the mode to `mode`
71    #[must_use]
72    pub fn with_mode(mut self, mode: crate::Encoding) -> Self {
73        self.mode = mode;
74        self
75    }
76}
77
78impl Default for IonDecoderConfig {
79    fn default() -> Self {
80        IonDecoderConfig {
81            mode: crate::Encoding::Ion,
82        }
83    }
84}
85
86/// Builder for creating a decoder.
87pub struct IonDecoderBuilder {
88    config: IonDecoderConfig,
89}
90
91impl IonDecoderBuilder {
92    /// Create the builder from 'config'
93    #[must_use]
94    pub fn new(config: IonDecoderConfig) -> Self {
95        Self { config }
96    }
97
98    /// Create a decoder given the previously specified config and an ion [`Reader`].
99    pub fn build<'a>(
100        self,
101        reader: impl 'a + IonReader<Item = StreamItem, Symbol = Symbol>,
102    ) -> Result<IonValueIter<'a>, IonDecodeError> {
103        let decoder = SimpleIonValueDecoder {};
104        let inner: Box<dyn Iterator<Item = IonDecodeResult>> = match self.config.mode {
105            crate::Encoding::Ion => Box::new(IonValueIterInner { reader, decoder }),
106            crate::Encoding::PartiqlEncodedAsIon => {
107                let decoder = PartiqlEncodedIonValueDecoder { inner: decoder };
108                Box::new(IonValueIterInner { reader, decoder })
109            }
110        };
111
112        Ok(IonValueIter { inner })
113    }
114}
115
116impl Default for IonDecoderBuilder {
117    fn default() -> Self {
118        Self::new(Default::default())
119    }
120}
121
122/// An Iterator over [`IonDecodeResult`] corresponding to the decoded top-level Ion stream values.
123pub struct IonValueIter<'a> {
124    inner: Box<dyn Iterator<Item = IonDecodeResult> + 'a>,
125}
126
127impl Iterator for IonValueIter<'_> {
128    type Item = IonDecodeResult;
129
130    #[inline]
131    fn next(&mut self) -> Option<Self::Item> {
132        self.inner.next()
133    }
134
135    #[inline]
136    fn size_hint(&self) -> (usize, Option<usize>) {
137        self.inner.size_hint()
138    }
139}
140
141struct IonValueIterInner<D, R>
142where
143    D: IonValueDecoder<R>,
144    R: IonReader<Item = StreamItem, Symbol = Symbol>,
145{
146    reader: R,
147    decoder: D,
148}
149
150impl<D, R> Iterator for IonValueIterInner<D, R>
151where
152    D: IonValueDecoder<R>,
153    R: IonReader<Item = StreamItem, Symbol = Symbol>,
154{
155    type Item = IonDecodeResult;
156
157    fn next(&mut self) -> Option<Self::Item> {
158        match self.reader.next() {
159            Ok(StreamItem::Value(typ)) => Some(self.decoder.decode_value(&mut self.reader, typ)),
160            Ok(StreamItem::Null(_)) => Some(self.decoder.decode_null(&mut self.reader)),
161            Ok(StreamItem::Nothing) => None,
162            Err(e) => Some(Err(e.into())),
163        }
164    }
165}
166#[inline]
167fn dispatch_decode_value<R, D>(decoder: &D, reader: &mut R, typ: IonType) -> IonDecodeResult
168where
169    R: IonReader<Item = StreamItem, Symbol = Symbol>,
170    D: IonValueDecoder<R> + ?Sized,
171{
172    match typ {
173        IonType::Null => decoder.decode_null(reader),
174        IonType::Bool => decoder.decode_bool(reader),
175        IonType::Int => decoder.decode_int(reader),
176        IonType::Float => decoder.decode_float(reader),
177        IonType::Decimal => decoder.decode_decimal(reader),
178        IonType::Timestamp => decoder.decode_timestamp(reader),
179        IonType::Symbol => decoder.decode_symbol(reader),
180        IonType::String => decoder.decode_string(reader),
181        IonType::Clob => decoder.decode_clob(reader),
182        IonType::Blob => decoder.decode_blob(reader),
183        IonType::List => decoder.decode_list(reader),
184        IonType::SExp => decoder.decode_sexp(reader),
185        IonType::Struct => decoder.decode_struct(reader),
186    }
187}
188
189trait IonValueDecoder<R>
190where
191    R: IonReader<Item = StreamItem, Symbol = Symbol>,
192{
193    #[inline]
194    fn decode_value(&self, reader: &mut R, typ: IonType) -> IonDecodeResult {
195        dispatch_decode_value(self, reader, typ)
196    }
197
198    fn decode_null(&self, reader: &mut R) -> IonDecodeResult;
199    fn decode_bool(&self, reader: &mut R) -> IonDecodeResult;
200    fn decode_int(&self, reader: &mut R) -> IonDecodeResult;
201    fn decode_float(&self, reader: &mut R) -> IonDecodeResult;
202    fn decode_decimal(&self, reader: &mut R) -> IonDecodeResult;
203    fn decode_timestamp(&self, reader: &mut R) -> IonDecodeResult;
204    fn decode_symbol(&self, reader: &mut R) -> IonDecodeResult;
205    fn decode_string(&self, reader: &mut R) -> IonDecodeResult;
206    fn decode_clob(&self, reader: &mut R) -> IonDecodeResult;
207    fn decode_blob(&self, reader: &mut R) -> IonDecodeResult;
208    fn decode_list(&self, reader: &mut R) -> IonDecodeResult;
209    fn decode_sexp(&self, reader: &mut R) -> IonDecodeResult;
210    fn decode_struct(&self, reader: &mut R) -> IonDecodeResult;
211}
212
213fn ion_decimal_to_decimal(ion_dec: Decimal) -> Result<rust_decimal::Decimal, rust_decimal::Error> {
214    // TODO ion Decimal doesn't give a lot of functionality to get at the data currently
215    // TODO    and it's not clear whether we'll continue with rust decimal or switch to big decimal
216    let ion_dec_str = format!("{ion_dec}").replace('d', "e");
217    rust_decimal::Decimal::from_str(&ion_dec_str)
218        .or_else(|_| rust_decimal::Decimal::from_scientific(&ion_dec_str))
219}
220
221struct SimpleIonValueDecoder {}
222
223impl<R> IonValueDecoder<R> for SimpleIonValueDecoder
224where
225    R: IonReader<Item = StreamItem, Symbol = Symbol>,
226{
227    #[inline]
228    fn decode_null(&self, _: &mut R) -> IonDecodeResult {
229        Ok(Value::Null)
230    }
231
232    #[inline]
233    fn decode_bool(&self, reader: &mut R) -> IonDecodeResult {
234        Ok(Value::Boolean(reader.read_bool()?))
235    }
236
237    #[inline]
238    fn decode_int(&self, reader: &mut R) -> IonDecodeResult {
239        match reader.read_int()? {
240            Int::I64(i) => Ok(Value::Integer(i)),
241            Int::BigInt(_) => Err(IonDecodeError::UnsupportedType("bigint")),
242        }
243    }
244
245    #[inline]
246    fn decode_float(&self, reader: &mut R) -> IonDecodeResult {
247        Ok(Value::Real(reader.read_f64()?.into()))
248    }
249
250    #[inline]
251    fn decode_decimal(&self, reader: &mut R) -> IonDecodeResult {
252        let dec = ion_decimal_to_decimal(reader.read_decimal()?);
253        Ok(Value::Decimal(Box::new(dec?)))
254    }
255
256    #[inline]
257    fn decode_timestamp(&self, reader: &mut R) -> IonDecodeResult {
258        let ts = reader.read_timestamp()?;
259        let offset = ts.offset();
260        let datetime = DateTime::from_ymdhms_nano_offset_minutes(
261            ts.year(),
262            NonZeroU8::new(ts.month() as u8).ok_or(IonDecodeError::ConversionError(
263                "month outside of range".into(),
264            ))?,
265            ts.day() as u8,
266            ts.hour() as u8,
267            ts.minute() as u8,
268            ts.second() as u8,
269            ts.nanoseconds(),
270            offset,
271        );
272        Ok(datetime.into())
273    }
274
275    #[inline]
276    fn decode_symbol(&self, reader: &mut R) -> IonDecodeResult {
277        Ok(Value::String(Box::new(
278            reader.read_symbol()?.text_or_error()?.to_string(),
279        )))
280    }
281
282    #[inline]
283    fn decode_string(&self, reader: &mut R) -> IonDecodeResult {
284        Ok(Value::String(Box::new(
285            reader.read_string()?.text().to_string(),
286        )))
287    }
288
289    #[inline]
290    fn decode_clob(&self, reader: &mut R) -> IonDecodeResult {
291        Ok(Value::Blob(Box::new(reader.read_clob()?.as_slice().into())))
292    }
293
294    #[inline]
295    fn decode_blob(&self, reader: &mut R) -> IonDecodeResult {
296        Ok(Value::Blob(Box::new(reader.read_blob()?.as_slice().into())))
297    }
298
299    #[inline]
300    fn decode_list(&self, reader: &mut R) -> IonDecodeResult {
301        decode_list(self, reader)
302    }
303
304    #[inline]
305    fn decode_sexp(&self, _: &mut R) -> IonDecodeResult {
306        Err(IonDecodeError::UnsupportedType("sexp"))
307    }
308
309    #[inline]
310    fn decode_struct(&self, reader: &mut R) -> IonDecodeResult {
311        decode_struct(self, reader)
312    }
313}
314
315#[inline]
316fn decode_list<R>(decoder: &impl IonValueDecoder<R>, reader: &mut R) -> IonDecodeResult
317where
318    R: IonReader<Item = StreamItem, Symbol = Symbol>,
319{
320    reader.step_in()?;
321    let mut values = vec![];
322    'values: loop {
323        let item = reader.next()?;
324        let val = match item {
325            StreamItem::Value(typ) => decoder.decode_value(reader, typ)?,
326            StreamItem::Null(_) => decoder.decode_null(reader)?,
327            StreamItem::Nothing => break 'values,
328        };
329        values.push(val);
330    }
331    reader.step_out()?;
332    Ok(List::from(values).into())
333}
334
335#[inline]
336fn decode_struct<R>(decoder: &impl IonValueDecoder<R>, reader: &mut R) -> IonDecodeResult
337where
338    R: IonReader<Item = StreamItem, Symbol = Symbol>,
339{
340    let mut tuple = Tuple::new();
341    reader.step_in()?;
342    'kv: loop {
343        let item = reader.next()?;
344        let (key, value) = match item {
345            StreamItem::Value(typ) => {
346                let field_name = reader.field_name()?;
347                (field_name, decoder.decode_value(reader, typ)?)
348            }
349            StreamItem::Null(_) => (reader.field_name()?, decoder.decode_null(reader)?),
350            StreamItem::Nothing => break 'kv,
351        };
352        tuple.insert(key.text_or_error()?, value);
353    }
354    reader.step_out()?;
355    Ok(tuple.into())
356}
357
358struct PartiqlEncodedIonValueDecoder {
359    inner: SimpleIonValueDecoder,
360}
361
362#[inline]
363fn has_annotation(
364    reader: &impl IonReader<Item = StreamItem, Symbol = Symbol>,
365    annot: &str,
366) -> bool {
367    reader.annotations().any(|a| a.is_ok_and(|a| a == annot))
368}
369
370static TIME_PARTS_PATTERN_SET: Lazy<RegexSet> =
371    Lazy::new(|| RegexSet::new(RE_SET_TIME_PARTS).unwrap());
372
373type GNode = (String, HashSet<String>, Option<Value>);
374type GNodes = (Vec<String>, Vec<HashSet<String>>, Vec<Option<Value>>);
375#[allow(clippy::type_complexity)]
376type GEdge = (
377    String,
378    HashSet<String>,
379    (String, String, String),
380    Option<Value>,
381);
382#[allow(clippy::type_complexity)]
383type GEdges = (
384    Vec<String>,
385    Vec<HashSet<String>>,
386    Vec<(String, String, String)>,
387    Vec<Option<Value>>,
388);
389
390impl PartiqlEncodedIonValueDecoder {
391    fn decode_date<R>(&self, reader: &mut R) -> IonDecodeResult
392    where
393        R: IonReader<Item = StreamItem, Symbol = Symbol>,
394    {
395        let ts = reader.read_timestamp()?;
396        let datetime = DateTime::from_ymd(
397            ts.year(),
398            NonZeroU8::new(ts.month() as u8).ok_or(IonDecodeError::ConversionError(
399                "month outside of range".into(),
400            ))?,
401            ts.day() as u8,
402        );
403        Ok(datetime.into())
404    }
405
406    fn decode_time<R>(&self, reader: &mut R) -> IonDecodeResult
407    where
408        R: IonReader<Item = StreamItem, Symbol = Symbol>,
409    {
410        fn expect_u8<R>(
411            reader: &mut R,
412            typ: Option<IonType>,
413            unit: &'static str,
414        ) -> Result<u8, IonDecodeError>
415        where
416            R: IonReader<Item = StreamItem, Symbol = Symbol>,
417        {
418            match typ {
419                Some(IonType::Int) => match reader.read_int()? {
420                    Int::I64(i) => Ok(i as u8), // TODO check range
421                    Int::BigInt(_) => Err(IonDecodeError::ConversionError(format!(
422                        "value for {unit} outside of range"
423                    ))),
424                },
425                _ => Err(IonDecodeError::ConversionError(format!(
426                    "value for {unit} unexpected type"
427                ))),
428            }
429        }
430        fn maybe_i8<R>(
431            reader: &mut R,
432            typ: Option<IonType>,
433            unit: &'static str,
434        ) -> Result<Option<i8>, IonDecodeError>
435        where
436            R: IonReader<Item = StreamItem, Symbol = Symbol>,
437        {
438            match typ {
439                Some(IonType::Int) => match reader.read_int()? {
440                    Int::I64(i) => Ok(Some(i as i8)), // TODO check range
441                    Int::BigInt(_) => Err(IonDecodeError::ConversionError(format!(
442                        "value for {unit} outside of range"
443                    ))),
444                },
445                None => Ok(None),
446                Some(IonType::Null) => Ok(None),
447                _ => Err(IonDecodeError::ConversionError(format!(
448                    "value for {unit} unexpected type {typ:?}"
449                ))),
450            }
451        }
452        fn expect_f64<R>(
453            reader: &mut R,
454            typ: Option<IonType>,
455            unit: &'static str,
456        ) -> Result<f64, IonDecodeError>
457        where
458            R: IonReader<Item = StreamItem, Symbol = Symbol>,
459        {
460            match typ {
461                Some(IonType::Decimal) => {
462                    let dec = ion_decimal_to_decimal(reader.read_decimal()?);
463                    Ok(dec?.to_f64().unwrap_or(0f64))
464                }
465                Some(IonType::Float) => Ok(reader.read_f64()?),
466                _ => Err(IonDecodeError::ConversionError(format!(
467                    "value for {unit} unexpected type"
468                ))),
469            }
470        }
471
472        #[derive(Default)]
473        struct TimeParts {
474            pub hour: Option<u8>,
475            pub minute: Option<u8>,
476            pub second: Option<f64>,
477            pub tz_hour: Option<i8>,
478            pub tz_minute: Option<i8>,
479        }
480
481        let mut time = TimeParts::default();
482        let patterns: &RegexSet = &TIME_PARTS_PATTERN_SET;
483
484        reader.step_in()?;
485        #[allow(irrefutable_let_patterns)]
486        while let item = reader.next()? {
487            let (key, typ) = match item {
488                StreamItem::Value(typ) => (reader.field_name()?, Some(typ)),
489                StreamItem::Null(_) => (reader.field_name()?, None),
490                StreamItem::Nothing => break,
491            };
492            let matches = patterns.matches(key.text_or_error()?);
493            match matches.into_iter().next() {
494                Some(TIME_PARTS_HOUR) => time.hour = Some(expect_u8(reader, typ, "hour")?),
495                Some(TIME_PARTS_MINUTE) => time.minute = Some(expect_u8(reader, typ, "minute")?),
496                Some(TIME_PARTS_SECOND) => time.second = Some(expect_f64(reader, typ, "second")?),
497                Some(TIME_PARTS_TZ_HOUR) => time.tz_hour = maybe_i8(reader, typ, "tz_hour")?,
498                Some(TIME_PARTS_TZ_MINUTE) => time.tz_minute = maybe_i8(reader, typ, "tz_minute")?,
499                _ => {
500                    return Err(IonDecodeError::ConversionError(
501                        "unexpected field name for time".to_string(),
502                    ))
503                }
504            }
505        }
506        reader.step_out()?;
507
508        let hour = time.hour.ok_or_else(|| {
509            IonDecodeError::ConversionError("expected `hour` key for DateTime".into())
510        })?;
511        let minute = time.minute.ok_or_else(|| {
512            IonDecodeError::ConversionError("expected `minute` key for DateTime".into())
513        })?;
514        let second = time.second.ok_or_else(|| {
515            IonDecodeError::ConversionError("expected `second` key for DateTime".into())
516        })?;
517        let seconds = Duration::seconds_f64(second);
518        let datetime = DateTime::from_hms_nano_tz(
519            hour,
520            minute,
521            seconds.whole_seconds() as u8,
522            seconds.subsec_nanoseconds() as u32,
523            time.tz_hour,
524            time.tz_minute,
525        );
526        Ok(datetime.into())
527    }
528
529    fn decode_boxed<R>(&self, reader: &mut R) -> IonDecodeResult
530    where
531        R: IonReader<Item = StreamItem, Symbol = Symbol>,
532    {
533        let annot: Vec<_> = reader
534            .annotations()
535            .skip(1) // skip the `$ion` boxing annotation
536            .filter_map(|s| s.ok().and_then(|s| s.text().map(|s| s.to_string())))
537            .collect();
538        let mut loader = ion_elt::ElementLoader::for_reader(reader);
539        let elt = loader.materialize_current()?.unwrap();
540        let elt = elt.with_annotations(annot);
541
542        let ion_ctor = Box::new(BoxedIonType {});
543        let contents = elt.to_string();
544        Ok(Value::from(
545            Variant::new(contents, ion_ctor)
546                .map_err(|e| IonDecodeError::StreamError(e.to_string()))?,
547        ))
548    }
549
550    fn decode_graph<R>(&self, reader: &mut R) -> IonDecodeResult
551    where
552        R: IonReader<Item = StreamItem, Symbol = Symbol>,
553    {
554        let err = || IonDecodeError::ConversionError("Invalid graph specified".into());
555        let mut nodes = None;
556        let mut edges = None;
557        reader.step_in()?;
558        'kv: loop {
559            match reader.next()? {
560                StreamItem::Value(typ) => match typ {
561                    IonType::List => match reader.field_name()?.text_or_error()? {
562                        "nodes" => nodes = Some(self.decode_nodes(reader)?),
563                        "edges" => edges = Some(self.decode_edges(reader)?),
564                        _ => return Err(err()),
565                    },
566                    _ => return Err(err()),
567                },
568                StreamItem::Null(_) => return Err(err()),
569                StreamItem::Nothing => break 'kv,
570            }
571        }
572        reader.step_out()?;
573
574        let nodes = nodes.ok_or_else(err)?;
575        let (ids, labels, ends, payloads) = edges.ok_or_else(err)?;
576        let edge_specs = ends
577            .into_iter()
578            .map(|(l, dir, r)| match dir.as_str() {
579                "->" => Ok(EdgeSpec::Directed(l, r)),
580                "<-" => Ok(EdgeSpec::Directed(r, l)),
581                "--" => Ok(EdgeSpec::Undirected(l, r)),
582                _ => Err(err()),
583            })
584            .collect::<Result<Vec<EdgeSpec>, _>>()?;
585        Ok(Value::Graph(Box::new(Graph::Simple(Rc::new(
586            SimpleGraph::from_spec(nodes, (ids, labels, edge_specs, payloads)),
587        )))))
588    }
589
590    fn decode_nodes<R>(&self, reader: &mut R) -> Result<GNodes, IonDecodeError>
591    where
592        R: IonReader<Item = StreamItem, Symbol = Symbol>,
593    {
594        let err = || IonDecodeError::ConversionError("Invalid graph specified".into());
595        reader.step_in()?;
596        let mut ids = vec![];
597        let mut labels = vec![];
598        let mut payloads = vec![];
599        'values: loop {
600            let item = reader.next()?;
601            match item {
602                StreamItem::Nothing => break 'values,
603                StreamItem::Value(IonType::Struct) => {
604                    let (id, labelset, payload) = self.decode_node(reader)?;
605                    ids.push(id);
606                    labels.push(labelset);
607                    payloads.push(payload);
608                }
609                _ => return Err(err()),
610            }
611        }
612        reader.step_out()?;
613        Ok((ids, labels, payloads))
614    }
615
616    fn decode_node<R>(&self, reader: &mut R) -> Result<GNode, IonDecodeError>
617    where
618        R: IonReader<Item = StreamItem, Symbol = Symbol>,
619    {
620        let err = || IonDecodeError::ConversionError("Invalid graph specified".into());
621        let mut id = None;
622        let mut labels = None;
623        let mut payload = None;
624        reader.step_in()?;
625        'kv: loop {
626            let item = reader.next()?;
627            if item == StreamItem::Nothing {
628                break 'kv;
629            }
630            let fname = reader.field_name()?;
631            let fname = fname.text_or_error()?;
632            match (fname, item) {
633                ("id", StreamItem::Value(IonType::Symbol)) => {
634                    id = Some(reader.read_symbol()?.text_or_error()?.to_string());
635                }
636                ("labels", StreamItem::Value(IonType::List)) => {
637                    let mut labelset = HashSet::new();
638                    reader.step_in()?;
639                    #[allow(irrefutable_let_patterns)]
640                    while let item = reader.next()? {
641                        match item {
642                            StreamItem::Value(IonType::String) => {
643                                labelset.insert(reader.read_string()?.text().to_string());
644                            }
645                            StreamItem::Nothing => break,
646                            _ => return Err(err()),
647                        }
648                    }
649                    reader.step_out()?;
650                    labels = Some(labelset);
651                }
652                ("payload", StreamItem::Value(typ)) => {
653                    payload = Some(self.decode_value(reader, typ)?);
654                }
655                _ => return Err(err()),
656            }
657        }
658        reader.step_out()?;
659
660        let id = id.ok_or_else(err)?;
661        let labels = labels.unwrap_or_else(Default::default);
662        Ok((id, labels, payload))
663    }
664
665    fn decode_edges<R>(&self, reader: &mut R) -> Result<GEdges, IonDecodeError>
666    where
667        R: IonReader<Item = StreamItem, Symbol = Symbol>,
668    {
669        let err = || IonDecodeError::ConversionError("Invalid graph specified".into());
670        reader.step_in()?;
671        let mut ids = vec![];
672        let mut labels = vec![];
673        let mut ends = vec![];
674        let mut payloads = vec![];
675        'values: loop {
676            let item = reader.next()?;
677            match item {
678                StreamItem::Nothing => break 'values,
679                StreamItem::Value(IonType::Struct) => {
680                    let (id, labelset, end, payload) = self.decode_edge(reader)?;
681                    ids.push(id);
682                    labels.push(labelset);
683                    ends.push(end);
684                    payloads.push(payload);
685                }
686                _ => return Err(err()),
687            }
688        }
689        reader.step_out()?;
690        Ok((ids, labels, ends, payloads))
691    }
692
693    fn decode_edge<R>(&self, reader: &mut R) -> Result<GEdge, IonDecodeError>
694    where
695        R: IonReader<Item = StreamItem, Symbol = Symbol>,
696    {
697        let err = || IonDecodeError::ConversionError("Invalid graph specified".into());
698        let mut id = None;
699        let mut labels = None;
700        let mut ends = None;
701        let mut payload = None;
702        reader.step_in()?;
703        'kv: loop {
704            let item = reader.next()?;
705            if item == StreamItem::Nothing {
706                break 'kv;
707            }
708            let fname = reader.field_name()?;
709            let fname = fname.text_or_error()?;
710            match (fname, item) {
711                ("id", StreamItem::Value(IonType::Symbol)) => {
712                    id = Some(reader.read_symbol()?.text_or_error()?.to_string());
713                }
714                ("labels", StreamItem::Value(IonType::List)) => {
715                    let mut labelset = HashSet::new();
716                    reader.step_in()?;
717                    #[allow(irrefutable_let_patterns)]
718                    while let item = reader.next()? {
719                        match item {
720                            StreamItem::Value(IonType::String) => {
721                                labelset.insert(reader.read_string()?.text().to_string());
722                            }
723                            StreamItem::Nothing => break,
724                            _ => return Err(err()),
725                        }
726                    }
727                    reader.step_out()?;
728                    labels = Some(labelset);
729                }
730                ("ends", StreamItem::Value(IonType::SExp)) => {
731                    reader.step_in()?;
732                    reader.next()?;
733                    let l = reader.read_symbol()?.text_or_error()?.to_string();
734                    reader.next()?;
735                    let dir = reader.read_symbol()?.text_or_error()?.to_string();
736                    reader.next()?;
737                    let r = reader.read_symbol()?.text_or_error()?.to_string();
738                    reader.step_out()?;
739                    ends = Some((l, dir, r));
740                }
741                ("payload", StreamItem::Value(typ)) => {
742                    payload = Some(self.decode_value(reader, typ)?);
743                }
744                _ => return Err(err()),
745            }
746        }
747        reader.step_out()?;
748
749        let id = id.ok_or_else(err)?;
750        let labels = labels.unwrap_or_else(Default::default);
751        let ends = ends.ok_or_else(err)?;
752        Ok((id, labels, ends, payload))
753    }
754}
755
756impl<R> IonValueDecoder<R> for PartiqlEncodedIonValueDecoder
757where
758    R: IonReader<Item = StreamItem, Symbol = Symbol>,
759{
760    fn decode_value(&self, reader: &mut R, typ: IonType) -> IonDecodeResult {
761        if has_annotation(reader, BOXED_ION_ANNOT) {
762            self.decode_boxed(reader)
763        } else {
764            dispatch_decode_value(self, reader, typ)
765        }
766    }
767
768    #[inline]
769    fn decode_null(&self, reader: &mut R) -> IonDecodeResult {
770        if has_annotation(reader, BOXED_ION_ANNOT) {
771            self.decode_boxed(reader)
772        } else if has_annotation(reader, MISSING_ANNOT) {
773            Ok(Value::Missing)
774        } else {
775            Ok(Value::Null)
776        }
777    }
778
779    #[inline]
780    fn decode_timestamp(&self, reader: &mut R) -> IonDecodeResult {
781        if has_annotation(reader, DATE_ANNOT) {
782            self.decode_date(reader)
783        } else {
784            self.inner.decode_timestamp(reader)
785        }
786    }
787
788    #[inline]
789    fn decode_list(&self, reader: &mut R) -> IonDecodeResult {
790        let is_bag = has_annotation(reader, BAG_ANNOT);
791        let list = decode_list(self, reader);
792        if is_bag {
793            Ok(Bag::from(list?.coerce_into_list()).into())
794        } else {
795            list
796        }
797    }
798
799    #[inline]
800    fn decode_struct(&self, reader: &mut R) -> IonDecodeResult {
801        if has_annotation(reader, TIME_ANNOT) {
802            self.decode_time(reader)
803        } else if has_annotation(reader, GRAPH_ANNOT) {
804            self.decode_graph(reader)
805        } else {
806            decode_struct(self, reader)
807        }
808    }
809
810    delegate! {
811        to self.inner {
812            fn decode_bool(&self, reader: &mut R) -> IonDecodeResult;
813            fn decode_int(&self, reader: &mut R) -> IonDecodeResult;
814            fn decode_float(&self, reader: &mut R) -> IonDecodeResult;
815            fn decode_decimal(&self, reader: &mut R) -> IonDecodeResult;
816            fn decode_symbol(&self, reader: &mut R) -> IonDecodeResult;
817            fn decode_string(&self, reader: &mut R) -> IonDecodeResult;
818            fn decode_clob(&self, reader: &mut R) -> IonDecodeResult;
819            fn decode_blob(&self, reader: &mut R) -> IonDecodeResult;
820            fn decode_sexp(&self, reader: &mut R) -> IonDecodeResult;
821        }
822    }
823}
824
825// Code in this module is copied from ion-rs v0.18, in order to make use of `materialize_current`,
826// which is not exposed there.
827mod ion_elt {
828    use ion_rs_old::element::{Element, IntoAnnotatedElement, Sequence, Struct, Value};
829    use ion_rs_old::{IonReader, IonResult, StreamItem, Symbol};
830
831    /// Helper type; wraps an [ElementReader] and recursively materializes the next value in the
832    /// reader's input, reporting any errors that might occur along the way.
833    pub(crate) struct ElementLoader<'a, R: ?Sized> {
834        reader: &'a mut R,
835    }
836
837    impl<'a, R: IonReader<Item = StreamItem, Symbol = Symbol> + ?Sized> ElementLoader<'a, R> {
838        pub(crate) fn for_reader(reader: &'a mut R) -> ElementLoader<'a, R> {
839            ElementLoader { reader }
840        }
841
842        /// Advances the reader to the next value in the stream and uses [Self::materialize_current]
843        /// to materialize it.
844        pub(crate) fn materialize_next(&mut self) -> IonResult<Option<Element>> {
845            // Advance the reader to the next value
846            let _ = self.reader.next()?;
847            self.materialize_current()
848        }
849
850        /// Recursively materialize the reader's current Ion value and returns it as `Ok(Some(value))`.
851        /// If there are no more values at this level, returns `Ok(None)`.
852        /// If an error occurs while materializing the value, returns an `Err`.
853        /// Calling this method advances the reader and consumes the current value.
854        pub(crate) fn materialize_current(&mut self) -> IonResult<Option<Element>> {
855            // Collect this item's annotations into a Vec. We have to do this before materializing the
856            // value itself because materializing a collection requires advancing the reader further.
857            let mut annotations = Vec::new();
858            // Current API limitations require `self.reader.annotations()` to heap allocate its
859            // iterator even if there aren't annotations. `self.reader.has_annotations()` is trivial
860            // and allows us to skip the heap allocation in the common case.
861            if self.reader.has_annotations() {
862                for annotation in self.reader.annotations() {
863                    annotations.push(annotation?);
864                }
865            }
866
867            let value = match self.reader.current() {
868                // No more values at this level of the stream
869                StreamItem::Nothing => return Ok(None),
870                // This is a typed null
871                StreamItem::Null(ion_type) => Value::Null(ion_type),
872                // This is a non-null value
873                StreamItem::Value(ion_type) => {
874                    use ion_rs_old::IonType::*;
875                    match ion_type {
876                        Null => unreachable!("non-null value had IonType::Null"),
877                        Bool => Value::Bool(self.reader.read_bool()?),
878                        Int => Value::Int(self.reader.read_int()?),
879                        Float => Value::Float(self.reader.read_f64()?),
880                        Decimal => Value::Decimal(self.reader.read_decimal()?),
881                        Timestamp => Value::Timestamp(self.reader.read_timestamp()?),
882                        Symbol => Value::Symbol(self.reader.read_symbol()?),
883                        String => Value::String(self.reader.read_string()?),
884                        Clob => Value::Clob(self.reader.read_clob()?.into()),
885                        Blob => Value::Blob(self.reader.read_blob()?.into()),
886                        // It's a collection; recursively materialize all of this value's children
887                        List => Value::List(self.materialize_sequence()?),
888                        SExp => Value::SExp(self.materialize_sequence()?),
889                        Struct => Value::Struct(self.materialize_struct()?),
890                    }
891                }
892            };
893            Ok(Some(value.with_annotations(annotations)))
894        }
895
896        /// Steps into the current sequence and materializes each of its children to construct
897        /// an [`Vec<Element>`]. When all of the children have been materialized, steps out.
898        /// The reader MUST be positioned over a list or s-expression when this is called.
899        fn materialize_sequence(&mut self) -> IonResult<Sequence> {
900            let mut child_elements = Vec::new();
901            self.reader.step_in()?;
902            while let Some(element) = self.materialize_next()? {
903                child_elements.push(element);
904            }
905            self.reader.step_out()?;
906            Ok(child_elements.into())
907        }
908
909        /// Steps into the current struct and materializes each of its fields to construct
910        /// an [`Struct`]. When all of the fields have been materialized, steps out.
911        /// The reader MUST be positioned over a struct when this is called.
912        fn materialize_struct(&mut self) -> IonResult<Struct> {
913            let mut child_elements = Vec::new();
914            self.reader.step_in()?;
915            while let StreamItem::Value(_) | StreamItem::Null(_) = self.reader.next()? {
916                let field_name = self.reader.field_name()?;
917                let value = self
918                    .materialize_current()?
919                    .expect("materialize_current() returned None for user data");
920                child_elements.push((field_name, value));
921            }
922            self.reader.step_out()?;
923            Ok(Struct::from_iter(child_elements))
924        }
925    }
926}