1use delegate::delegate;
2use ion_rs_old::{Decimal, Int, IonError, IonReader, IonType, StreamItem, Symbol};
3use once_cell::sync::Lazy;
4use partiql_value::{Bag, DateTime, EdgeSpec, Graph, List, SimpleGraph, Tuple, Value, Variant};
5use regex::RegexSet;
6use rust_decimal::prelude::ToPrimitive;
7use std::collections::HashSet;
8
9use crate::boxed_ion::BoxedIonType;
10use crate::common::{
11 Encoding, BAG_ANNOT, BOXED_ION_ANNOT, DATE_ANNOT, GRAPH_ANNOT, MISSING_ANNOT,
12 RE_SET_TIME_PARTS, TIME_ANNOT, TIME_PARTS_HOUR, TIME_PARTS_MINUTE, TIME_PARTS_SECOND,
13 TIME_PARTS_TZ_HOUR, TIME_PARTS_TZ_MINUTE,
14};
15use std::num::NonZeroU8;
16use std::rc::Rc;
17use std::str::FromStr;
18use thiserror::Error;
19use time::Duration;
20
/// Errors produced while decoding an Ion stream into PartiQL values.
#[derive(Error, Debug, Clone, PartialEq)]
#[non_exhaustive]
pub enum IonDecodeError {
    /// An error reported by the underlying Ion reader.
    #[error("Ion read error: `{}`", .0)]
    IonReaderError(IonError),

    /// The stream held a value of a type this decoder does not handle;
    /// the payload names that type.
    #[error("Ion read error: unsupported value of type `{}`", .0)]
    UnsupportedType(&'static str),

    /// A value was read but could not be converted to its target
    /// representation; the payload describes the failure.
    #[error("Ion read error: conversion error `{}`", .0)]
    ConversionError(String),

    /// A stream-level failure described by the payload.
    #[error("Ion read error: stream error `{}`", .0)]
    StreamError(String),

    /// An error with no further detail available.
    #[error("Ion read error: unknown error")]
    Unknown,
}
48
49impl From<IonError> for IonDecodeError {
50 fn from(value: IonError) -> Self {
51 IonDecodeError::IonReaderError(value)
52 }
53}
54
55impl From<rust_decimal::Error> for IonDecodeError {
56 fn from(value: rust_decimal::Error) -> Self {
57 IonDecodeError::ConversionError(format!("bad decimal conversion: `{value}`"))
58 }
59}
60
/// Outcome of decoding one Ion value: a PartiQL `Value` or an `IonDecodeError`.
pub type IonDecodeResult = Result<Value, IonDecodeError>;
63
/// Settings for an Ion decoder.
pub struct IonDecoderConfig {
    /// Which encoding the decoder should assume for the incoming Ion data.
    mode: Encoding,
}
68
69impl IonDecoderConfig {
70 #[must_use]
72 pub fn with_mode(mut self, mode: crate::Encoding) -> Self {
73 self.mode = mode;
74 self
75 }
76}
77
78impl Default for IonDecoderConfig {
79 fn default() -> Self {
80 IonDecoderConfig {
81 mode: crate::Encoding::Ion,
82 }
83 }
84}
85
/// Builder that turns an Ion reader into an iterator of decoded values.
pub struct IonDecoderBuilder {
    /// Configuration consulted by `build` to select the decoder.
    config: IonDecoderConfig,
}
90
91impl IonDecoderBuilder {
92 #[must_use]
94 pub fn new(config: IonDecoderConfig) -> Self {
95 Self { config }
96 }
97
98 pub fn build<'a>(
100 self,
101 reader: impl 'a + IonReader<Item = StreamItem, Symbol = Symbol>,
102 ) -> Result<IonValueIter<'a>, IonDecodeError> {
103 let decoder = SimpleIonValueDecoder {};
104 let inner: Box<dyn Iterator<Item = IonDecodeResult>> = match self.config.mode {
105 crate::Encoding::Ion => Box::new(IonValueIterInner { reader, decoder }),
106 crate::Encoding::PartiqlEncodedAsIon => {
107 let decoder = PartiqlEncodedIonValueDecoder { inner: decoder };
108 Box::new(IonValueIterInner { reader, decoder })
109 }
110 };
111
112 Ok(IonValueIter { inner })
113 }
114}
115
116impl Default for IonDecoderBuilder {
117 fn default() -> Self {
118 Self::new(Default::default())
119 }
120}
121
/// Iterator over the values decoded from an Ion stream.
pub struct IonValueIter<'a> {
    /// Type-erased iterator that does the actual reading and decoding.
    inner: Box<dyn Iterator<Item = IonDecodeResult> + 'a>,
}
126
127impl Iterator for IonValueIter<'_> {
128 type Item = IonDecodeResult;
129
130 #[inline]
131 fn next(&mut self) -> Option<Self::Item> {
132 self.inner.next()
133 }
134
135 #[inline]
136 fn size_hint(&self) -> (usize, Option<usize>) {
137 self.inner.size_hint()
138 }
139}
140
/// Pairs an Ion reader with the decoder used to convert each item it yields.
struct IonValueIterInner<D, R>
where
    D: IonValueDecoder<R>,
    R: IonReader<Item = StreamItem, Symbol = Symbol>,
{
    /// Source of Ion stream items.
    reader: R,
    /// Converts the reader's current item into a PartiQL value.
    decoder: D,
}
149
150impl<D, R> Iterator for IonValueIterInner<D, R>
151where
152 D: IonValueDecoder<R>,
153 R: IonReader<Item = StreamItem, Symbol = Symbol>,
154{
155 type Item = IonDecodeResult;
156
157 fn next(&mut self) -> Option<Self::Item> {
158 match self.reader.next() {
159 Ok(StreamItem::Value(typ)) => Some(self.decoder.decode_value(&mut self.reader, typ)),
160 Ok(StreamItem::Null(_)) => Some(self.decoder.decode_null(&mut self.reader)),
161 Ok(StreamItem::Nothing) => None,
162 Err(e) => Some(Err(e.into())),
163 }
164 }
165}
166#[inline]
167fn dispatch_decode_value<R, D>(decoder: &D, reader: &mut R, typ: IonType) -> IonDecodeResult
168where
169 R: IonReader<Item = StreamItem, Symbol = Symbol>,
170 D: IonValueDecoder<R> + ?Sized,
171{
172 match typ {
173 IonType::Null => decoder.decode_null(reader),
174 IonType::Bool => decoder.decode_bool(reader),
175 IonType::Int => decoder.decode_int(reader),
176 IonType::Float => decoder.decode_float(reader),
177 IonType::Decimal => decoder.decode_decimal(reader),
178 IonType::Timestamp => decoder.decode_timestamp(reader),
179 IonType::Symbol => decoder.decode_symbol(reader),
180 IonType::String => decoder.decode_string(reader),
181 IonType::Clob => decoder.decode_clob(reader),
182 IonType::Blob => decoder.decode_blob(reader),
183 IonType::List => decoder.decode_list(reader),
184 IonType::SExp => decoder.decode_sexp(reader),
185 IonType::Struct => decoder.decode_struct(reader),
186 }
187}
188
/// Strategy for converting the value an Ion reader is positioned on into a
/// PartiQL `Value`.
///
/// There is one `decode_*` method per Ion type; `decode_value` chooses among
/// them.
trait IonValueDecoder<R>
where
    R: IonReader<Item = StreamItem, Symbol = Symbol>,
{
    /// Decodes the reader's current value, whose Ion type is `typ`.
    ///
    /// The default implementation forwards to `dispatch_decode_value`.
    #[inline]
    fn decode_value(&self, reader: &mut R, typ: IonType) -> IonDecodeResult {
        dispatch_decode_value(self, reader, typ)
    }

    /// Decodes an Ion null.
    fn decode_null(&self, reader: &mut R) -> IonDecodeResult;
    /// Decodes an Ion bool.
    fn decode_bool(&self, reader: &mut R) -> IonDecodeResult;
    /// Decodes an Ion int.
    fn decode_int(&self, reader: &mut R) -> IonDecodeResult;
    /// Decodes an Ion float.
    fn decode_float(&self, reader: &mut R) -> IonDecodeResult;
    /// Decodes an Ion decimal.
    fn decode_decimal(&self, reader: &mut R) -> IonDecodeResult;
    /// Decodes an Ion timestamp.
    fn decode_timestamp(&self, reader: &mut R) -> IonDecodeResult;
    /// Decodes an Ion symbol.
    fn decode_symbol(&self, reader: &mut R) -> IonDecodeResult;
    /// Decodes an Ion string.
    fn decode_string(&self, reader: &mut R) -> IonDecodeResult;
    /// Decodes an Ion clob.
    fn decode_clob(&self, reader: &mut R) -> IonDecodeResult;
    /// Decodes an Ion blob.
    fn decode_blob(&self, reader: &mut R) -> IonDecodeResult;
    /// Decodes an Ion list.
    fn decode_list(&self, reader: &mut R) -> IonDecodeResult;
    /// Decodes an Ion s-expression.
    fn decode_sexp(&self, reader: &mut R) -> IonDecodeResult;
    /// Decodes an Ion struct.
    fn decode_struct(&self, reader: &mut R) -> IonDecodeResult;
}
212
213fn ion_decimal_to_decimal(ion_dec: Decimal) -> Result<rust_decimal::Decimal, rust_decimal::Error> {
214 let ion_dec_str = format!("{ion_dec}").replace('d', "e");
217 rust_decimal::Decimal::from_str(&ion_dec_str)
218 .or_else(|_| rust_decimal::Decimal::from_scientific(&ion_dec_str))
219}
220
/// Stateless decoder that maps Ion values to PartiQL values by Ion type alone.
struct SimpleIonValueDecoder {}
222
223impl<R> IonValueDecoder<R> for SimpleIonValueDecoder
224where
225 R: IonReader<Item = StreamItem, Symbol = Symbol>,
226{
227 #[inline]
228 fn decode_null(&self, _: &mut R) -> IonDecodeResult {
229 Ok(Value::Null)
230 }
231
232 #[inline]
233 fn decode_bool(&self, reader: &mut R) -> IonDecodeResult {
234 Ok(Value::Boolean(reader.read_bool()?))
235 }
236
237 #[inline]
238 fn decode_int(&self, reader: &mut R) -> IonDecodeResult {
239 match reader.read_int()? {
240 Int::I64(i) => Ok(Value::Integer(i)),
241 Int::BigInt(_) => Err(IonDecodeError::UnsupportedType("bigint")),
242 }
243 }
244
245 #[inline]
246 fn decode_float(&self, reader: &mut R) -> IonDecodeResult {
247 Ok(Value::Real(reader.read_f64()?.into()))
248 }
249
250 #[inline]
251 fn decode_decimal(&self, reader: &mut R) -> IonDecodeResult {
252 let dec = ion_decimal_to_decimal(reader.read_decimal()?);
253 Ok(Value::Decimal(Box::new(dec?)))
254 }
255
256 #[inline]
257 fn decode_timestamp(&self, reader: &mut R) -> IonDecodeResult {
258 let ts = reader.read_timestamp()?;
259 let offset = ts.offset();
260 let datetime = DateTime::from_ymdhms_nano_offset_minutes(
261 ts.year(),
262 NonZeroU8::new(ts.month() as u8).ok_or(IonDecodeError::ConversionError(
263 "month outside of range".into(),
264 ))?,
265 ts.day() as u8,
266 ts.hour() as u8,
267 ts.minute() as u8,
268 ts.second() as u8,
269 ts.nanoseconds(),
270 offset,
271 );
272 Ok(datetime.into())
273 }
274
275 #[inline]
276 fn decode_symbol(&self, reader: &mut R) -> IonDecodeResult {
277 Ok(Value::String(Box::new(
278 reader.read_symbol()?.text_or_error()?.to_string(),
279 )))
280 }
281
282 #[inline]
283 fn decode_string(&self, reader: &mut R) -> IonDecodeResult {
284 Ok(Value::String(Box::new(
285 reader.read_string()?.text().to_string(),
286 )))
287 }
288
289 #[inline]
290 fn decode_clob(&self, reader: &mut R) -> IonDecodeResult {
291 Ok(Value::Blob(Box::new(reader.read_clob()?.as_slice().into())))
292 }
293
294 #[inline]
295 fn decode_blob(&self, reader: &mut R) -> IonDecodeResult {
296 Ok(Value::Blob(Box::new(reader.read_blob()?.as_slice().into())))
297 }
298
299 #[inline]
300 fn decode_list(&self, reader: &mut R) -> IonDecodeResult {
301 decode_list(self, reader)
302 }
303
304 #[inline]
305 fn decode_sexp(&self, _: &mut R) -> IonDecodeResult {
306 Err(IonDecodeError::UnsupportedType("sexp"))
307 }
308
309 #[inline]
310 fn decode_struct(&self, reader: &mut R) -> IonDecodeResult {
311 decode_struct(self, reader)
312 }
313}
314
315#[inline]
316fn decode_list<R>(decoder: &impl IonValueDecoder<R>, reader: &mut R) -> IonDecodeResult
317where
318 R: IonReader<Item = StreamItem, Symbol = Symbol>,
319{
320 reader.step_in()?;
321 let mut values = vec![];
322 'values: loop {
323 let item = reader.next()?;
324 let val = match item {
325 StreamItem::Value(typ) => decoder.decode_value(reader, typ)?,
326 StreamItem::Null(_) => decoder.decode_null(reader)?,
327 StreamItem::Nothing => break 'values,
328 };
329 values.push(val);
330 }
331 reader.step_out()?;
332 Ok(List::from(values).into())
333}
334
335#[inline]
336fn decode_struct<R>(decoder: &impl IonValueDecoder<R>, reader: &mut R) -> IonDecodeResult
337where
338 R: IonReader<Item = StreamItem, Symbol = Symbol>,
339{
340 let mut tuple = Tuple::new();
341 reader.step_in()?;
342 'kv: loop {
343 let item = reader.next()?;
344 let (key, value) = match item {
345 StreamItem::Value(typ) => {
346 let field_name = reader.field_name()?;
347 (field_name, decoder.decode_value(reader, typ)?)
348 }
349 StreamItem::Null(_) => (reader.field_name()?, decoder.decode_null(reader)?),
350 StreamItem::Nothing => break 'kv,
351 };
352 tuple.insert(key.text_or_error()?, value);
353 }
354 reader.step_out()?;
355 Ok(tuple.into())
356}
357
/// Decoder for PartiQL values encoded as annotated Ion.
struct PartiqlEncodedIonValueDecoder {
    /// Plain decoder used for values that need no annotation handling.
    inner: SimpleIonValueDecoder,
}
361
362#[inline]
363fn has_annotation(
364 reader: &impl IonReader<Item = StreamItem, Symbol = Symbol>,
365 annot: &str,
366) -> bool {
367 reader.annotations().any(|a| a.is_ok_and(|a| a == annot))
368}
369
/// Regex set built once, on first use, from `RE_SET_TIME_PARTS`; used to
/// classify the field names of an encoded time struct.
static TIME_PARTS_PATTERN_SET: Lazy<RegexSet> =
    Lazy::new(|| RegexSet::new(RE_SET_TIME_PARTS).unwrap());

/// One decoded graph node: `(id, labels, payload)`.
type GNode = (String, HashSet<String>, Option<Value>);
/// All decoded graph nodes as parallel vectors: `(ids, labels, payloads)`.
type GNodes = (Vec<String>, Vec<HashSet<String>>, Vec<Option<Value>>);
/// One decoded graph edge: `(id, labels, (left, direction, right), payload)`.
#[allow(clippy::type_complexity)]
type GEdge = (
    String,
    HashSet<String>,
    (String, String, String),
    Option<Value>,
);
/// All decoded graph edges as parallel vectors: `(ids, labels, ends, payloads)`.
#[allow(clippy::type_complexity)]
type GEdges = (
    Vec<String>,
    Vec<HashSet<String>>,
    Vec<(String, String, String)>,
    Vec<Option<Value>>,
);
389
390impl PartiqlEncodedIonValueDecoder {
391 fn decode_date<R>(&self, reader: &mut R) -> IonDecodeResult
392 where
393 R: IonReader<Item = StreamItem, Symbol = Symbol>,
394 {
395 let ts = reader.read_timestamp()?;
396 let datetime = DateTime::from_ymd(
397 ts.year(),
398 NonZeroU8::new(ts.month() as u8).ok_or(IonDecodeError::ConversionError(
399 "month outside of range".into(),
400 ))?,
401 ts.day() as u8,
402 );
403 Ok(datetime.into())
404 }
405
406 fn decode_time<R>(&self, reader: &mut R) -> IonDecodeResult
407 where
408 R: IonReader<Item = StreamItem, Symbol = Symbol>,
409 {
410 fn expect_u8<R>(
411 reader: &mut R,
412 typ: Option<IonType>,
413 unit: &'static str,
414 ) -> Result<u8, IonDecodeError>
415 where
416 R: IonReader<Item = StreamItem, Symbol = Symbol>,
417 {
418 match typ {
419 Some(IonType::Int) => match reader.read_int()? {
420 Int::I64(i) => Ok(i as u8), Int::BigInt(_) => Err(IonDecodeError::ConversionError(format!(
422 "value for {unit} outside of range"
423 ))),
424 },
425 _ => Err(IonDecodeError::ConversionError(format!(
426 "value for {unit} unexpected type"
427 ))),
428 }
429 }
430 fn maybe_i8<R>(
431 reader: &mut R,
432 typ: Option<IonType>,
433 unit: &'static str,
434 ) -> Result<Option<i8>, IonDecodeError>
435 where
436 R: IonReader<Item = StreamItem, Symbol = Symbol>,
437 {
438 match typ {
439 Some(IonType::Int) => match reader.read_int()? {
440 Int::I64(i) => Ok(Some(i as i8)), Int::BigInt(_) => Err(IonDecodeError::ConversionError(format!(
442 "value for {unit} outside of range"
443 ))),
444 },
445 None => Ok(None),
446 Some(IonType::Null) => Ok(None),
447 _ => Err(IonDecodeError::ConversionError(format!(
448 "value for {unit} unexpected type {typ:?}"
449 ))),
450 }
451 }
452 fn expect_f64<R>(
453 reader: &mut R,
454 typ: Option<IonType>,
455 unit: &'static str,
456 ) -> Result<f64, IonDecodeError>
457 where
458 R: IonReader<Item = StreamItem, Symbol = Symbol>,
459 {
460 match typ {
461 Some(IonType::Decimal) => {
462 let dec = ion_decimal_to_decimal(reader.read_decimal()?);
463 Ok(dec?.to_f64().unwrap_or(0f64))
464 }
465 Some(IonType::Float) => Ok(reader.read_f64()?),
466 _ => Err(IonDecodeError::ConversionError(format!(
467 "value for {unit} unexpected type"
468 ))),
469 }
470 }
471
472 #[derive(Default)]
473 struct TimeParts {
474 pub hour: Option<u8>,
475 pub minute: Option<u8>,
476 pub second: Option<f64>,
477 pub tz_hour: Option<i8>,
478 pub tz_minute: Option<i8>,
479 }
480
481 let mut time = TimeParts::default();
482 let patterns: &RegexSet = &TIME_PARTS_PATTERN_SET;
483
484 reader.step_in()?;
485 #[allow(irrefutable_let_patterns)]
486 while let item = reader.next()? {
487 let (key, typ) = match item {
488 StreamItem::Value(typ) => (reader.field_name()?, Some(typ)),
489 StreamItem::Null(_) => (reader.field_name()?, None),
490 StreamItem::Nothing => break,
491 };
492 let matches = patterns.matches(key.text_or_error()?);
493 match matches.into_iter().next() {
494 Some(TIME_PARTS_HOUR) => time.hour = Some(expect_u8(reader, typ, "hour")?),
495 Some(TIME_PARTS_MINUTE) => time.minute = Some(expect_u8(reader, typ, "minute")?),
496 Some(TIME_PARTS_SECOND) => time.second = Some(expect_f64(reader, typ, "second")?),
497 Some(TIME_PARTS_TZ_HOUR) => time.tz_hour = maybe_i8(reader, typ, "tz_hour")?,
498 Some(TIME_PARTS_TZ_MINUTE) => time.tz_minute = maybe_i8(reader, typ, "tz_minute")?,
499 _ => {
500 return Err(IonDecodeError::ConversionError(
501 "unexpected field name for time".to_string(),
502 ))
503 }
504 }
505 }
506 reader.step_out()?;
507
508 let hour = time.hour.ok_or_else(|| {
509 IonDecodeError::ConversionError("expected `hour` key for DateTime".into())
510 })?;
511 let minute = time.minute.ok_or_else(|| {
512 IonDecodeError::ConversionError("expected `minute` key for DateTime".into())
513 })?;
514 let second = time.second.ok_or_else(|| {
515 IonDecodeError::ConversionError("expected `second` key for DateTime".into())
516 })?;
517 let seconds = Duration::seconds_f64(second);
518 let datetime = DateTime::from_hms_nano_tz(
519 hour,
520 minute,
521 seconds.whole_seconds() as u8,
522 seconds.subsec_nanoseconds() as u32,
523 time.tz_hour,
524 time.tz_minute,
525 );
526 Ok(datetime.into())
527 }
528
529 fn decode_boxed<R>(&self, reader: &mut R) -> IonDecodeResult
530 where
531 R: IonReader<Item = StreamItem, Symbol = Symbol>,
532 {
533 let annot: Vec<_> = reader
534 .annotations()
535 .skip(1) .filter_map(|s| s.ok().and_then(|s| s.text().map(|s| s.to_string())))
537 .collect();
538 let mut loader = ion_elt::ElementLoader::for_reader(reader);
539 let elt = loader.materialize_current()?.unwrap();
540 let elt = elt.with_annotations(annot);
541
542 let ion_ctor = Box::new(BoxedIonType {});
543 let contents = elt.to_string();
544 Ok(Value::from(
545 Variant::new(contents, ion_ctor)
546 .map_err(|e| IonDecodeError::StreamError(e.to_string()))?,
547 ))
548 }
549
550 fn decode_graph<R>(&self, reader: &mut R) -> IonDecodeResult
551 where
552 R: IonReader<Item = StreamItem, Symbol = Symbol>,
553 {
554 let err = || IonDecodeError::ConversionError("Invalid graph specified".into());
555 let mut nodes = None;
556 let mut edges = None;
557 reader.step_in()?;
558 'kv: loop {
559 match reader.next()? {
560 StreamItem::Value(typ) => match typ {
561 IonType::List => match reader.field_name()?.text_or_error()? {
562 "nodes" => nodes = Some(self.decode_nodes(reader)?),
563 "edges" => edges = Some(self.decode_edges(reader)?),
564 _ => return Err(err()),
565 },
566 _ => return Err(err()),
567 },
568 StreamItem::Null(_) => return Err(err()),
569 StreamItem::Nothing => break 'kv,
570 }
571 }
572 reader.step_out()?;
573
574 let nodes = nodes.ok_or_else(err)?;
575 let (ids, labels, ends, payloads) = edges.ok_or_else(err)?;
576 let edge_specs = ends
577 .into_iter()
578 .map(|(l, dir, r)| match dir.as_str() {
579 "->" => Ok(EdgeSpec::Directed(l, r)),
580 "<-" => Ok(EdgeSpec::Directed(r, l)),
581 "--" => Ok(EdgeSpec::Undirected(l, r)),
582 _ => Err(err()),
583 })
584 .collect::<Result<Vec<EdgeSpec>, _>>()?;
585 Ok(Value::Graph(Box::new(Graph::Simple(Rc::new(
586 SimpleGraph::from_spec(nodes, (ids, labels, edge_specs, payloads)),
587 )))))
588 }
589
590 fn decode_nodes<R>(&self, reader: &mut R) -> Result<GNodes, IonDecodeError>
591 where
592 R: IonReader<Item = StreamItem, Symbol = Symbol>,
593 {
594 let err = || IonDecodeError::ConversionError("Invalid graph specified".into());
595 reader.step_in()?;
596 let mut ids = vec![];
597 let mut labels = vec![];
598 let mut payloads = vec![];
599 'values: loop {
600 let item = reader.next()?;
601 match item {
602 StreamItem::Nothing => break 'values,
603 StreamItem::Value(IonType::Struct) => {
604 let (id, labelset, payload) = self.decode_node(reader)?;
605 ids.push(id);
606 labels.push(labelset);
607 payloads.push(payload);
608 }
609 _ => return Err(err()),
610 }
611 }
612 reader.step_out()?;
613 Ok((ids, labels, payloads))
614 }
615
616 fn decode_node<R>(&self, reader: &mut R) -> Result<GNode, IonDecodeError>
617 where
618 R: IonReader<Item = StreamItem, Symbol = Symbol>,
619 {
620 let err = || IonDecodeError::ConversionError("Invalid graph specified".into());
621 let mut id = None;
622 let mut labels = None;
623 let mut payload = None;
624 reader.step_in()?;
625 'kv: loop {
626 let item = reader.next()?;
627 if item == StreamItem::Nothing {
628 break 'kv;
629 }
630 let fname = reader.field_name()?;
631 let fname = fname.text_or_error()?;
632 match (fname, item) {
633 ("id", StreamItem::Value(IonType::Symbol)) => {
634 id = Some(reader.read_symbol()?.text_or_error()?.to_string());
635 }
636 ("labels", StreamItem::Value(IonType::List)) => {
637 let mut labelset = HashSet::new();
638 reader.step_in()?;
639 #[allow(irrefutable_let_patterns)]
640 while let item = reader.next()? {
641 match item {
642 StreamItem::Value(IonType::String) => {
643 labelset.insert(reader.read_string()?.text().to_string());
644 }
645 StreamItem::Nothing => break,
646 _ => return Err(err()),
647 }
648 }
649 reader.step_out()?;
650 labels = Some(labelset);
651 }
652 ("payload", StreamItem::Value(typ)) => {
653 payload = Some(self.decode_value(reader, typ)?);
654 }
655 _ => return Err(err()),
656 }
657 }
658 reader.step_out()?;
659
660 let id = id.ok_or_else(err)?;
661 let labels = labels.unwrap_or_else(Default::default);
662 Ok((id, labels, payload))
663 }
664
665 fn decode_edges<R>(&self, reader: &mut R) -> Result<GEdges, IonDecodeError>
666 where
667 R: IonReader<Item = StreamItem, Symbol = Symbol>,
668 {
669 let err = || IonDecodeError::ConversionError("Invalid graph specified".into());
670 reader.step_in()?;
671 let mut ids = vec![];
672 let mut labels = vec![];
673 let mut ends = vec![];
674 let mut payloads = vec![];
675 'values: loop {
676 let item = reader.next()?;
677 match item {
678 StreamItem::Nothing => break 'values,
679 StreamItem::Value(IonType::Struct) => {
680 let (id, labelset, end, payload) = self.decode_edge(reader)?;
681 ids.push(id);
682 labels.push(labelset);
683 ends.push(end);
684 payloads.push(payload);
685 }
686 _ => return Err(err()),
687 }
688 }
689 reader.step_out()?;
690 Ok((ids, labels, ends, payloads))
691 }
692
693 fn decode_edge<R>(&self, reader: &mut R) -> Result<GEdge, IonDecodeError>
694 where
695 R: IonReader<Item = StreamItem, Symbol = Symbol>,
696 {
697 let err = || IonDecodeError::ConversionError("Invalid graph specified".into());
698 let mut id = None;
699 let mut labels = None;
700 let mut ends = None;
701 let mut payload = None;
702 reader.step_in()?;
703 'kv: loop {
704 let item = reader.next()?;
705 if item == StreamItem::Nothing {
706 break 'kv;
707 }
708 let fname = reader.field_name()?;
709 let fname = fname.text_or_error()?;
710 match (fname, item) {
711 ("id", StreamItem::Value(IonType::Symbol)) => {
712 id = Some(reader.read_symbol()?.text_or_error()?.to_string());
713 }
714 ("labels", StreamItem::Value(IonType::List)) => {
715 let mut labelset = HashSet::new();
716 reader.step_in()?;
717 #[allow(irrefutable_let_patterns)]
718 while let item = reader.next()? {
719 match item {
720 StreamItem::Value(IonType::String) => {
721 labelset.insert(reader.read_string()?.text().to_string());
722 }
723 StreamItem::Nothing => break,
724 _ => return Err(err()),
725 }
726 }
727 reader.step_out()?;
728 labels = Some(labelset);
729 }
730 ("ends", StreamItem::Value(IonType::SExp)) => {
731 reader.step_in()?;
732 reader.next()?;
733 let l = reader.read_symbol()?.text_or_error()?.to_string();
734 reader.next()?;
735 let dir = reader.read_symbol()?.text_or_error()?.to_string();
736 reader.next()?;
737 let r = reader.read_symbol()?.text_or_error()?.to_string();
738 reader.step_out()?;
739 ends = Some((l, dir, r));
740 }
741 ("payload", StreamItem::Value(typ)) => {
742 payload = Some(self.decode_value(reader, typ)?);
743 }
744 _ => return Err(err()),
745 }
746 }
747 reader.step_out()?;
748
749 let id = id.ok_or_else(err)?;
750 let labels = labels.unwrap_or_else(Default::default);
751 let ends = ends.ok_or_else(err)?;
752 Ok((id, labels, ends, payload))
753 }
754}
755
756impl<R> IonValueDecoder<R> for PartiqlEncodedIonValueDecoder
757where
758 R: IonReader<Item = StreamItem, Symbol = Symbol>,
759{
760 fn decode_value(&self, reader: &mut R, typ: IonType) -> IonDecodeResult {
761 if has_annotation(reader, BOXED_ION_ANNOT) {
762 self.decode_boxed(reader)
763 } else {
764 dispatch_decode_value(self, reader, typ)
765 }
766 }
767
768 #[inline]
769 fn decode_null(&self, reader: &mut R) -> IonDecodeResult {
770 if has_annotation(reader, BOXED_ION_ANNOT) {
771 self.decode_boxed(reader)
772 } else if has_annotation(reader, MISSING_ANNOT) {
773 Ok(Value::Missing)
774 } else {
775 Ok(Value::Null)
776 }
777 }
778
779 #[inline]
780 fn decode_timestamp(&self, reader: &mut R) -> IonDecodeResult {
781 if has_annotation(reader, DATE_ANNOT) {
782 self.decode_date(reader)
783 } else {
784 self.inner.decode_timestamp(reader)
785 }
786 }
787
788 #[inline]
789 fn decode_list(&self, reader: &mut R) -> IonDecodeResult {
790 let is_bag = has_annotation(reader, BAG_ANNOT);
791 let list = decode_list(self, reader);
792 if is_bag {
793 Ok(Bag::from(list?.coerce_into_list()).into())
794 } else {
795 list
796 }
797 }
798
799 #[inline]
800 fn decode_struct(&self, reader: &mut R) -> IonDecodeResult {
801 if has_annotation(reader, TIME_ANNOT) {
802 self.decode_time(reader)
803 } else if has_annotation(reader, GRAPH_ANNOT) {
804 self.decode_graph(reader)
805 } else {
806 decode_struct(self, reader)
807 }
808 }
809
810 delegate! {
811 to self.inner {
812 fn decode_bool(&self, reader: &mut R) -> IonDecodeResult;
813 fn decode_int(&self, reader: &mut R) -> IonDecodeResult;
814 fn decode_float(&self, reader: &mut R) -> IonDecodeResult;
815 fn decode_decimal(&self, reader: &mut R) -> IonDecodeResult;
816 fn decode_symbol(&self, reader: &mut R) -> IonDecodeResult;
817 fn decode_string(&self, reader: &mut R) -> IonDecodeResult;
818 fn decode_clob(&self, reader: &mut R) -> IonDecodeResult;
819 fn decode_blob(&self, reader: &mut R) -> IonDecodeResult;
820 fn decode_sexp(&self, reader: &mut R) -> IonDecodeResult;
821 }
822 }
823}
824
825mod ion_elt {
828 use ion_rs_old::element::{Element, IntoAnnotatedElement, Sequence, Struct, Value};
829 use ion_rs_old::{IonReader, IonResult, StreamItem, Symbol};
830
831 pub(crate) struct ElementLoader<'a, R: ?Sized> {
834 reader: &'a mut R,
835 }
836
837 impl<'a, R: IonReader<Item = StreamItem, Symbol = Symbol> + ?Sized> ElementLoader<'a, R> {
838 pub(crate) fn for_reader(reader: &'a mut R) -> ElementLoader<'a, R> {
839 ElementLoader { reader }
840 }
841
842 pub(crate) fn materialize_next(&mut self) -> IonResult<Option<Element>> {
845 let _ = self.reader.next()?;
847 self.materialize_current()
848 }
849
850 pub(crate) fn materialize_current(&mut self) -> IonResult<Option<Element>> {
855 let mut annotations = Vec::new();
858 if self.reader.has_annotations() {
862 for annotation in self.reader.annotations() {
863 annotations.push(annotation?);
864 }
865 }
866
867 let value = match self.reader.current() {
868 StreamItem::Nothing => return Ok(None),
870 StreamItem::Null(ion_type) => Value::Null(ion_type),
872 StreamItem::Value(ion_type) => {
874 use ion_rs_old::IonType::*;
875 match ion_type {
876 Null => unreachable!("non-null value had IonType::Null"),
877 Bool => Value::Bool(self.reader.read_bool()?),
878 Int => Value::Int(self.reader.read_int()?),
879 Float => Value::Float(self.reader.read_f64()?),
880 Decimal => Value::Decimal(self.reader.read_decimal()?),
881 Timestamp => Value::Timestamp(self.reader.read_timestamp()?),
882 Symbol => Value::Symbol(self.reader.read_symbol()?),
883 String => Value::String(self.reader.read_string()?),
884 Clob => Value::Clob(self.reader.read_clob()?.into()),
885 Blob => Value::Blob(self.reader.read_blob()?.into()),
886 List => Value::List(self.materialize_sequence()?),
888 SExp => Value::SExp(self.materialize_sequence()?),
889 Struct => Value::Struct(self.materialize_struct()?),
890 }
891 }
892 };
893 Ok(Some(value.with_annotations(annotations)))
894 }
895
896 fn materialize_sequence(&mut self) -> IonResult<Sequence> {
900 let mut child_elements = Vec::new();
901 self.reader.step_in()?;
902 while let Some(element) = self.materialize_next()? {
903 child_elements.push(element);
904 }
905 self.reader.step_out()?;
906 Ok(child_elements.into())
907 }
908
909 fn materialize_struct(&mut self) -> IonResult<Struct> {
913 let mut child_elements = Vec::new();
914 self.reader.step_in()?;
915 while let StreamItem::Value(_) | StreamItem::Null(_) = self.reader.next()? {
916 let field_name = self.reader.field_name()?;
917 let value = self
918 .materialize_current()?
919 .expect("materialize_current() returned None for user data");
920 child_elements.push((field_name, value));
921 }
922 self.reader.step_out()?;
923 Ok(Struct::from_iter(child_elements))
924 }
925 }
926}