use std::io;
use std::io::Read;
use std::ops::Range;
use delegate::delegate;
use crate::binary::constants::v1_0::IVM;
use crate::constants::v1_0::system_symbol_ids;
use crate::data_source::ToIonDataSource;
use crate::raw_reader::{RawReader, RawStreamItem};
use crate::raw_symbol_token::RawSymbolToken;
use crate::result::{decoding_error, decoding_error_raw, IonResult};
use crate::stream_reader::IonReader;
use crate::symbol::Symbol;
use crate::symbol_table::SymbolTable;
use crate::types::decimal::Decimal;
use crate::types::integer::Integer;
use crate::types::timestamp::Timestamp;
use crate::{IonType, RawBinaryReader, RawTextReader};
use std::fmt::{Display, Formatter};
/// Configures and constructs a `Reader`.
///
/// The builder currently carries no settings of its own; its `build` method inspects the
/// first bytes of the input to decide whether a binary or a text raw reader should back the
/// resulting `Reader`.
pub struct ReaderBuilder {}
impl ReaderBuilder {
    /// Constructs a `ReaderBuilder`.
    pub fn new() -> ReaderBuilder {
        ReaderBuilder {}
    }

    /// Consumes the builder and produces a `Reader` over `input`.
    ///
    /// Up to four bytes are read from the input and compared against the binary Ion version
    /// marker:
    /// * `E0 01 00 EA` selects a binary reader.
    /// * `E0 xx yy EA` with any other version bytes is rejected.
    /// * Anything else, including input that ends before four bytes are available, selects a
    ///   text reader.
    ///
    /// The bytes consumed while sniffing are handed to the chosen reader ahead of the
    /// remaining input, so no data is lost.
    ///
    /// # Errors
    ///
    /// Returns an error if reading from the input fails, if the input starts with a binary
    /// version marker for a version other than 1.0, or if the text reader cannot be
    /// constructed.
    pub fn build<'a, I: 'a + ToIonDataSource>(self, input: I) -> IonResult<Reader<'a>> {
        let mut input = input.to_ion_data_source();
        let mut header: [u8; 4] = [0u8; 4];

        // `read_exact` cannot be used here because an input shorter than four bytes is not an
        // error; it just means the stream is treated as text.
        let mut total_bytes_read = 0usize;
        while total_bytes_read < IVM.len() {
            let bytes_read = match input.read(&mut header[total_bytes_read..]) {
                Ok(bytes_read) => bytes_read,
                // An interrupted read is not a failure; the operation should be retried.
                Err(error) if error.kind() == io::ErrorKind::Interrupted => continue,
                Err(error) => return Err(error.into()),
            };
            if bytes_read == 0 {
                // End of input before a full header was read. `header` is only partially
                // filled, so copy the populated prefix into an owned buffer for the reader.
                let owned_header = Vec::from(&header[..total_bytes_read]);
                return Self::make_text_reader(owned_header);
            }
            total_bytes_read += bytes_read;
        }

        match header {
            [0xe0, 0x01, 0x00, 0xea] => {
                // Put the sniffed bytes back in front of the rest of the input.
                let full_input = io::Cursor::new(header).chain(input);
                Ok(Self::make_binary_reader(full_input))
            }
            [0xe0, major, minor, 0xea] => decoding_error(format!(
                "cannot read Ion v{}.{}; only v1.0 is supported",
                major, minor
            )),
            _ => {
                // Put the sniffed bytes back in front of the rest of the input.
                let full_input = io::Cursor::new(header).chain(input);
                Self::make_text_reader(full_input)
            }
        }
    }

    /// Builds a `Reader` backed by a `RawTextReader` over `data`, with a new symbol table.
    fn make_text_reader<'a, I: 'a + ToIonDataSource>(data: I) -> IonResult<Reader<'a>> {
        let raw_reader = Box::new(RawTextReader::new(data)?);
        Ok(Reader {
            raw_reader,
            symbol_table: SymbolTable::new(),
        })
    }

    /// Builds a `Reader` backed by a `RawBinaryReader` over `data`, with a new symbol table.
    fn make_binary_reader<'a, I: 'a + ToIonDataSource>(data: I) -> Reader<'a> {
        let raw_reader = Box::new(RawBinaryReader::new(data.to_ion_data_source()));
        Reader {
            raw_reader,
            symbol_table: SymbolTable::new(),
        }
    }
}
impl Default for ReaderBuilder {
fn default() -> Self {
ReaderBuilder::new()
}
}
/// A `UserReader` whose underlying raw reader is a boxed `RawReader` trait object.
///
/// This is the type produced by `ReaderBuilder::build`, which picks the concrete raw reader
/// (binary or text) at runtime.
pub type Reader<'a> = UserReader<Box<dyn RawReader + 'a>>;
/// A reader that pairs a `RawReader` with a `SymbolTable`.
///
/// Its `IonReader` implementation resolves symbol IDs reported by the raw reader to symbols
/// from the table, and handles version markers and top-level symbol table structs itself
/// instead of returning them to the caller.
pub struct UserReader<R: RawReader> {
/// The raw reader that stream items, field names and annotations are read from.
raw_reader: R,
/// The symbol table used to resolve symbol IDs reported by `raw_reader`.
symbol_table: SymbolTable,
}
impl<R: RawReader> UserReader<R> {
    /// Wraps `raw_reader` in a `UserReader` that starts out with a newly constructed
    /// symbol table.
    pub(crate) fn new(raw_reader: R) -> UserReader<R> {
        let symbol_table = SymbolTable::new();
        Self {
            raw_reader,
            symbol_table,
        }
    }
}
/// The items a `UserReader` reports to its caller when advancing through a stream.
#[derive(Eq, PartialEq, Debug)]
pub enum StreamItem {
/// A non-null value of the given Ion type.
Value(IonType),
/// A null of the given Ion type.
Null(IonType),
/// No value is available; reported when the raw reader itself has nothing to report
/// (presumably the end of the current container or of the stream).
Nothing,
}
impl StreamItem {
    /// Builds the `StreamItem` for a value of type `ion_type`: `StreamItem::Null` when
    /// `is_null` is `true`, `StreamItem::Value` otherwise.
    pub fn nullable_value(ion_type: IonType, is_null: bool) -> StreamItem {
        if !is_null {
            return StreamItem::Value(ion_type);
        }
        StreamItem::Null(ion_type)
    }
}
impl Display for StreamItem {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
use StreamItem::*;
match self {
Value(ion_type) => write!(f, "{}", ion_type),
Null(ion_type) => write!(f, "null.{}", ion_type),
Nothing => Ok(()),
}
}
}
impl<R: RawReader> UserReader<R> {
    /// Reads the current value as a symbol and returns the raw token reported by the
    /// underlying raw reader, without resolving it against the symbol table.
    pub fn read_raw_symbol(&mut self) -> IonResult<RawSymbolToken> {
        self.raw_reader.read_symbol()
    }

    /// Returns the current field name as the raw token reported by the underlying raw
    /// reader, without resolving it against the symbol table.
    pub fn raw_field_name_token(&mut self) -> IonResult<RawSymbolToken> {
        self.raw_reader.field_name()
    }

    /// Processes the struct the raw reader is positioned on as a local symbol table and
    /// applies it to `self.symbol_table`.
    ///
    /// The recognized fields are:
    /// * `imports` with a symbol value: if the symbol is `$ion_symbol_table`, the new symbols
    ///   are appended to the current table; any other symbol is ignored.
    /// * `symbols` with a list value: each string is collected as symbol text; every other
    ///   element (including nulls) is collected as a placeholder with no text.
    ///
    /// Fields with a null value, and any field not covered above, are skipped rather than
    /// treated as an error. Unless the table is an append, the current symbol table is reset
    /// before the collected symbols are added.
    ///
    /// # Errors
    ///
    /// Returns an error if the raw reader fails, if a version marker is found inside the
    /// struct or the `symbols` list, if `imports` is a list (shared symbol table imports are
    /// not supported), or if `imports` (as a symbol) or `symbols` (as a list) appears more
    /// than once.
    fn read_symbol_table(&mut self) -> IonResult<()> {
        self.raw_reader.step_in()?;

        let mut is_append = false;
        let mut new_symbols = vec![];
        let mut has_found_symbols_field = false;
        let mut has_found_imports_field = false;

        loop {
            let ion_type = match self.raw_reader.next()? {
                RawStreamItem::Value(ion_type) => ion_type,
                // A null field contributes nothing to the table.
                RawStreamItem::Null(_) => continue,
                RawStreamItem::Nothing => break,
                RawStreamItem::VersionMarker(major, minor) => {
                    return decoding_error(format!(
                        "encountered Ion version marker for v{}.{} in symbol table",
                        major, minor
                    ))
                }
            };

            // Propagate a failure to read the field name instead of panicking on it.
            let field_id = self.raw_reader.field_name()?;

            match (field_id, ion_type) {
                // `imports: [...]` would import shared symbol tables.
                (symbol, IonType::List)
                    if symbol.matches(system_symbol_ids::IMPORTS, "imports") =>
                {
                    return decoding_error("importing shared symbol tables is not yet supported");
                }
                // `imports: <symbol>`
                (symbol, IonType::Symbol)
                    if symbol.matches(system_symbol_ids::IMPORTS, "imports") =>
                {
                    if has_found_imports_field {
                        return decoding_error("symbol table had multiple 'imports' fields");
                    }
                    has_found_imports_field = true;
                    let import_symbol = self.raw_reader.read_symbol()?;
                    // Only `$ion_symbol_table` requests an append; other symbols are ignored.
                    if import_symbol
                        .matches(system_symbol_ids::ION_SYMBOL_TABLE, "$ion_symbol_table")
                    {
                        is_append = true;
                    }
                }
                // `symbols: [...]`
                (symbol, IonType::List)
                    if symbol.matches(system_symbol_ids::SYMBOLS, "symbols") =>
                {
                    if has_found_symbols_field {
                        return decoding_error("symbol table had multiple 'symbols' fields");
                    }
                    has_found_symbols_field = true;
                    self.raw_reader.step_in()?;
                    loop {
                        use RawStreamItem::*;
                        match self.raw_reader.next()? {
                            Value(IonType::String) => {
                                new_symbols.push(Some(self.raw_reader.read_string()?));
                            }
                            // Non-string and null entries still occupy a symbol ID slot.
                            Value(_) | Null(_) => {
                                new_symbols.push(None);
                            }
                            VersionMarker(_, _) => {
                                return decoding_error("Found IVM in symbol table.")
                            }
                            Nothing => break,
                        }
                    }
                    self.raw_reader.step_out()?;
                }
                // Any other field (for example `name` or `version`), or a known field with
                // an unexpected type, is ignored; the next call to `next()` moves past it.
                _ => {}
            }
        }

        if !is_append {
            // Without an `imports: $ion_symbol_table` field the new symbols replace the
            // current table instead of extending it.
            self.symbol_table.reset();
        }
        for maybe_text in new_symbols {
            let _sid = self.symbol_table.intern_or_add_placeholder(maybe_text);
        }

        self.raw_reader.step_out()?;
        Ok(())
    }

    /// Iterates over the current value's annotations as raw, unresolved tokens.
    ///
    /// # Panics
    ///
    /// The returned iterator panics if the raw reader reports an error for an annotation.
    fn raw_annotations(&mut self) -> impl Iterator<Item = RawSymbolToken> + '_ {
        self.raw_reader.annotations().map(|a| a.unwrap())
    }

    /// Returns a reference to the symbol table currently used to resolve symbol IDs.
    pub fn symbol_table(&self) -> &SymbolTable {
        &self.symbol_table
    }
}
impl<R: RawReader> IonReader for UserReader<R> {
type Item = StreamItem;
type Symbol = Symbol;
fn current(&self) -> Self::Item {
if let Some(ion_type) = self.ion_type() {
return if self.is_null() {
StreamItem::Null(ion_type)
} else {
StreamItem::Value(ion_type)
};
}
StreamItem::Nothing
}
#[allow(clippy::should_implement_trait)]
fn next(&mut self) -> IonResult<Self::Item> {
use RawStreamItem::*;
loop {
match self.raw_reader.next()? {
VersionMarker(1, 0) => {
self.symbol_table.reset();
}
VersionMarker(major, minor) => {
return decoding_error(format!(
"Encountered a version marker for v{}.{}, but only v1.0 is supported.",
major, minor
));
}
Value(IonType::Struct) => {
if self.raw_reader.depth() == 0 {
let is_symtab = match self.raw_reader.annotations().next() {
Some(Err(error)) => return Err(error),
Some(Ok(symbol))
if symbol.matches(
system_symbol_ids::ION_SYMBOL_TABLE,
"$ion_symbol_table",
) =>
{
true
}
_ => false,
};
if is_symtab {
self.read_symbol_table()?;
continue;
}
}
return Ok(StreamItem::Value(IonType::Struct));
}
Value(ion_type) => return Ok(StreamItem::Value(ion_type)),
Null(ion_type) => return Ok(StreamItem::Null(ion_type)),
Nothing => return Ok(StreamItem::Nothing),
}
}
}
fn field_name(&self) -> IonResult<Self::Symbol> {
match self.raw_reader.field_name()? {
RawSymbolToken::SymbolId(sid) => {
self.symbol_table.symbol_for(sid).cloned().ok_or_else(|| {
decoding_error_raw(format!("encountered field ID with unknown text: ${}", sid))
})
}
RawSymbolToken::Text(text) => Ok(Symbol::owned(text)),
}
}
fn annotations<'a>(&'a self) -> Box<dyn Iterator<Item = IonResult<Self::Symbol>> + 'a> {
let iterator = self
.raw_reader
.annotations()
.map(move |raw_token| match raw_token? {
RawSymbolToken::SymbolId(sid) => {
self.symbol_table.symbol_for(sid).cloned().ok_or_else(|| {
decoding_error_raw(format!(
"found annotation ID with unknown text: ${}",
sid
))
})
}
RawSymbolToken::Text(text) => Ok(Symbol::owned(text)),
});
Box::new(iterator)
}
fn read_symbol(&mut self) -> IonResult<Self::Symbol> {
match self.raw_reader.read_symbol()? {
RawSymbolToken::SymbolId(symbol_id) => {
if let Some(symbol) = self.symbol_table.symbol_for(symbol_id) {
Ok(symbol.clone())
} else {
decoding_error(format!(
"Found symbol ID ${}, which is not defined.",
symbol_id
))
}
}
RawSymbolToken::Text(text) => Ok(Symbol::owned(text)),
}
}
delegate! {
to self.raw_reader {
fn is_null(&self) -> bool;
fn ion_version(&self) -> (u8, u8);
fn ion_type(&self) -> Option<IonType>;
fn read_null(&mut self) -> IonResult<IonType>;
fn read_bool(&mut self) -> IonResult<bool>;
fn read_integer(&mut self) -> IonResult<Integer>;
fn read_i64(&mut self) -> IonResult<i64>;
fn read_f32(&mut self) -> IonResult<f32>;
fn read_f64(&mut self) -> IonResult<f64>;
fn read_decimal(&mut self) -> IonResult<Decimal>;
fn read_string(&mut self) -> IonResult<String>;
fn map_string<F, U>(&mut self, f: F) -> IonResult<U> where F: FnOnce(&str) -> U;
fn map_string_bytes<F, U>(&mut self, f: F) -> IonResult<U> where F: FnOnce(&[u8]) -> U;
fn read_blob(&mut self) -> IonResult<Vec<u8>>;
fn map_blob<F, U>(&mut self, f: F) -> IonResult<U> where F: FnOnce(&[u8]) -> U;
fn read_clob(&mut self) -> IonResult<Vec<u8>>;
fn map_clob<F, U>(&mut self, f: F) -> IonResult<U> where F: FnOnce(&[u8]) -> U;
fn read_timestamp(&mut self) -> IonResult<Timestamp>;
fn step_in(&mut self) -> IonResult<()>;
fn step_out(&mut self) -> IonResult<()>;
fn parent_type(&self) -> Option<IonType>;
fn depth(&self) -> usize;
}
}
}
/// Accessors that expose the raw reader's byte-level view of the current value.
///
/// These are only available when the `UserReader` wraps a `RawBinaryReader` over an
/// `io::Cursor<T>`. Every method forwards to the method of the same name on the raw reader.
impl<T: AsRef<[u8]>> UserReader<RawBinaryReader<io::Cursor<T>>> {
delegate! {
to self.raw_reader {
// Byte slices reported by the raw reader for the current value and its parts.
pub fn raw_bytes(&self) -> Option<&[u8]>;
pub fn raw_field_id_bytes(&self) -> Option<&[u8]>;
pub fn raw_header_bytes(&self) -> Option<&[u8]>;
pub fn raw_value_bytes(&self) -> Option<&[u8]>;
pub fn raw_annotations_bytes(&self) -> Option<&[u8]>;
// Length, offset and range of the field ID; optional in the raw reader's API.
pub fn field_id_length(&self) -> Option<usize>;
pub fn field_id_offset(&self) -> Option<usize>;
pub fn field_id_range(&self) -> Option<Range<usize>>;
// Length, offset and range of the annotations; optional in the raw reader's API.
pub fn annotations_length(&self) -> Option<usize>;
pub fn annotations_offset(&self) -> Option<usize>;
pub fn annotations_range(&self) -> Option<Range<usize>>;
// Length, offset and range of the header.
pub fn header_length(&self) -> usize;
pub fn header_offset(&self) -> usize;
pub fn header_range(&self) -> Range<usize>;
// Length, offset and range of the value.
pub fn value_length(&self) -> usize;
pub fn value_offset(&self) -> usize;
pub fn value_range(&self) -> Range<usize>;
}
}
}
#[cfg(test)]
mod tests {
    use std::io;

    use super::*;
    use crate::binary::constants::v1_0::IVM;
    use crate::binary::raw_binary_reader::RawBinaryReader;
    use crate::result::IonResult;
    use crate::types::IonType;
    use crate::StreamItem::Value;

    type TestDataSource = io::Cursor<Vec<u8>>;

    /// Returns `bytes` prefixed with the binary Ion version marker.
    fn ion_data(bytes: &[u8]) -> Vec<u8> {
        IVM.iter().chain(bytes.iter()).copied().collect()
    }

    /// Wraps the version-marker-prefixed form of `bytes` in a cursor.
    fn data_source_for(bytes: &[u8]) -> TestDataSource {
        io::Cursor::new(ion_data(bytes))
    }

    /// Creates a raw binary reader over `bytes` and advances it past the version marker.
    fn raw_binary_reader_for(bytes: &[u8]) -> RawBinaryReader<TestDataSource> {
        let source = data_source_for(bytes);
        let mut reader = RawBinaryReader::new(source);
        assert_eq!(reader.ion_type(), None);
        assert_eq!(reader.next(), Ok(RawStreamItem::VersionMarker(1, 0)));
        assert_eq!(reader.ion_version(), (1u8, 0u8));
        reader
    }

    /// Builds a `Reader` over the version-marker-prefixed form of `bytes`.
    fn ion_reader_for(bytes: &[u8]) -> Reader {
        let stream = ion_data(bytes);
        ReaderBuilder::new().build(stream).unwrap()
    }

    /// A binary stream body that `test_read_struct` reads as a single struct with integer
    /// fields named `foo`, `bar` and `baz`.
    const EXAMPLE_STREAM: &[u8] = &[
        0xEE, 0x92, 0x81, 0x83, 0xDE, 0x8E, 0x87, 0xBC,
        0x83, 0x66, 0x6f, 0x6f, // 0x66 0x6f 0x6f is ASCII "foo"
        0x83, 0x62, 0x61, 0x72, // 0x62 0x61 0x72 is ASCII "bar"
        0x83, 0x62, 0x61, 0x7a, // 0x62 0x61 0x7a is ASCII "baz"
        0xD9, 0x8A, 0x21, 0x01, 0x8B, 0x21, 0x02, 0x8C, 0x21, 0x03,
    ];

    #[test]
    fn test_read_struct() -> IonResult<()> {
        let mut reader = ion_reader_for(EXAMPLE_STREAM);

        // The first item the caller sees is the user-level struct.
        assert_eq!(Value(IonType::Struct), reader.next()?);
        reader.step_in()?;

        for expected_name in &["foo", "bar", "baz"] {
            assert_eq!(reader.next()?, Value(IonType::Integer));
            assert_eq!(reader.field_name()?, *expected_name);
        }
        Ok(())
    }
}