record_query/value/
avro.rs

1use crate::error;
2use crate::value;
3use avro_rs;
4use std;
5use std::fmt;
6use std::io;
7
8pub struct Source<'a, R>(avro_rs::Reader<'a, R>)
9where
10    R: io::Read;
11
12pub struct Sink<'a, W>(avro_rs::Writer<'a, W>)
13where
14    W: io::Write;
15
16#[inline]
17pub fn source<'a, R>(r: R) -> error::Result<Source<'a, R>>
18where
19    R: io::Read,
20{
21    Ok(Source(avro_rs::Reader::new(r).map_err(|e| {
22        error::Error::Avro(error::Avro::downcast(e))
23    })?))
24}
25
26#[inline]
27pub fn sink<W>(schema: &avro_rs::Schema, w: W, codec: avro_rs::Codec) -> error::Result<Sink<W>>
28where
29    W: io::Write,
30{
31    Ok(Sink(avro_rs::Writer::with_codec(schema, w, codec)))
32}
33
34impl<'a, R> value::Source for Source<'a, R>
35where
36    R: io::Read,
37{
38    #[inline]
39    fn read(&mut self) -> error::Result<Option<value::Value>> {
40        match self.0.next() {
41            Some(Ok(v)) => Ok(Some(value_from_avro(v))),
42            Some(Err(e)) => Err(error::Error::Avro(error::Avro::downcast(e))),
43            None => Ok(None),
44        }
45    }
46}
47
48fn value_from_avro(value: avro_rs::types::Value) -> value::Value {
49    use avro_rs::types::Value;
50    match value {
51        Value::Null => value::Value::Unit,
52        Value::Boolean(v) => value::Value::Bool(v),
53        Value::Int(v) => value::Value::I32(v),
54        Value::Long(v) => value::Value::I64(v),
55        Value::Float(v) => value::Value::from_f32(v),
56        Value::Double(v) => value::Value::from_f64(v),
57        Value::Bytes(v) | Value::Fixed(_, v) => value::Value::Bytes(v),
58        Value::String(v) | Value::Enum(_, v) => value::Value::String(v),
59        Value::Union(boxed) => value_from_avro(*boxed),
60        Value::Array(v) => value::Value::Sequence(v.into_iter().map(value_from_avro).collect()),
61        Value::Map(v) => value::Value::Map(
62            v.into_iter()
63                .map(|(k, v)| (value::Value::String(k), value_from_avro(v)))
64                .collect(),
65        ),
66        Value::Record(v) => value::Value::Map(
67            v.into_iter()
68                .map(|(k, v)| (value::Value::String(k), value_from_avro(v)))
69                .collect(),
70        ),
71    }
72}
73
74impl<'a, W> value::Sink for Sink<'a, W>
75where
76    W: io::Write,
77{
78    #[inline]
79    fn write(&mut self, value: value::Value) -> error::Result<()> {
80        self.0
81            .append(value_to_avro(value)?)
82            .map_err(|e| error::Error::Avro(error::Avro::downcast(e)))?;
83        Ok(())
84    }
85}
86
87fn value_to_avro(value: value::Value) -> error::Result<avro_rs::types::Value> {
88    use avro_rs::types::Value;
89    use std::convert::TryFrom;
90    match value {
91        value::Value::Unit => Ok(Value::Null),
92        value::Value::Bool(v) => Ok(Value::Boolean(v)),
93
94        value::Value::I8(v) => Ok(Value::Int(i32::from(v))),
95        value::Value::I16(v) => Ok(Value::Int(i32::from(v))),
96        value::Value::I32(v) => Ok(Value::Int(v)),
97        value::Value::I64(v) => Ok(Value::Long(v)),
98
99        value::Value::U8(v) => Ok(Value::Int(i32::from(v))),
100        value::Value::U16(v) => Ok(Value::Int(i32::from(v))),
101        value::Value::U32(v) => Ok(Value::Long(i64::from(v))),
102        value::Value::U64(v) => {
103            if let Ok(v) = i64::try_from(v) {
104                Ok(Value::Long(v))
105            } else {
106                Err(error::Error::Format {
107                    msg: format!(
108                        "Avro output does not support unsigned 64 bit integer: {}",
109                        v
110                    ),
111                })
112            }
113        }
114
115        value::Value::F32(ordered_float::OrderedFloat(v)) => Ok(Value::Float(v)),
116        value::Value::F64(ordered_float::OrderedFloat(v)) => Ok(Value::Double(v)),
117
118        value::Value::Char(v) => Ok(Value::String(format!("{}", v))),
119        value::Value::String(v) => Ok(Value::String(v)),
120        value::Value::Bytes(v) => Ok(Value::Bytes(v)),
121
122        value::Value::Sequence(v) => Ok(Value::Array(
123            v.into_iter()
124                .map(value_to_avro)
125                .collect::<error::Result<Vec<_>>>()?,
126        )),
127        value::Value::Map(v) => Ok(Value::Record(
128            v.into_iter()
129                .map(|(k, v)| match (value_to_string(k), value_to_avro(v)) {
130                    (Ok(k), Ok(v)) => Ok((k, v)),
131                    (Ok(_), Err(e)) | (Err(e), Ok(_)) | (Err(_), Err(e)) => Err(e),
132                })
133                .collect::<error::Result<Vec<_>>>()?,
134        )),
135    }
136}
137
138fn value_to_string(value: value::Value) -> error::Result<String> {
139    match value {
140        value::Value::Char(v) => Ok(format!("{}", v)),
141        value::Value::String(v) => Ok(v),
142        x => Err(error::Error::Format {
143            msg: format!("Avro can only output string keys, got: {:?}", x),
144        }),
145    }
146}
147
148impl<'a, R> fmt::Debug for Source<'a, R>
149where
150    R: io::Read,
151{
152    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
153        f.debug_struct("AvroSource").finish()
154    }
155}
156
157impl<'a, W> fmt::Debug for Sink<'a, W>
158where
159    W: io::Write,
160{
161    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
162        f.debug_struct("AvroSink").finish()
163    }
164}
165
166impl<'a, W> Drop for Sink<'a, W>
167where
168    W: io::Write,
169{
170    fn drop(&mut self) {
171        match self.0.flush() {
172            Ok(_) => (),
173            Err(error) => panic!("{}", error),
174        }
175    }
176}