1use crate::error;
2use crate::value;
3use avro_rs;
4use std;
5use std::fmt;
6use std::io;
7
8pub struct Source<'a, R>(avro_rs::Reader<'a, R>)
9where
10 R: io::Read;
11
12pub struct Sink<'a, W>(avro_rs::Writer<'a, W>)
13where
14 W: io::Write;
15
16#[inline]
17pub fn source<'a, R>(r: R) -> error::Result<Source<'a, R>>
18where
19 R: io::Read,
20{
21 Ok(Source(avro_rs::Reader::new(r).map_err(|e| {
22 error::Error::Avro(error::Avro::downcast(e))
23 })?))
24}
25
26#[inline]
27pub fn sink<W>(schema: &avro_rs::Schema, w: W, codec: avro_rs::Codec) -> error::Result<Sink<W>>
28where
29 W: io::Write,
30{
31 Ok(Sink(avro_rs::Writer::with_codec(schema, w, codec)))
32}
33
34impl<'a, R> value::Source for Source<'a, R>
35where
36 R: io::Read,
37{
38 #[inline]
39 fn read(&mut self) -> error::Result<Option<value::Value>> {
40 match self.0.next() {
41 Some(Ok(v)) => Ok(Some(value_from_avro(v))),
42 Some(Err(e)) => Err(error::Error::Avro(error::Avro::downcast(e))),
43 None => Ok(None),
44 }
45 }
46}
47
48fn value_from_avro(value: avro_rs::types::Value) -> value::Value {
49 use avro_rs::types::Value;
50 match value {
51 Value::Null => value::Value::Unit,
52 Value::Boolean(v) => value::Value::Bool(v),
53 Value::Int(v) => value::Value::I32(v),
54 Value::Long(v) => value::Value::I64(v),
55 Value::Float(v) => value::Value::from_f32(v),
56 Value::Double(v) => value::Value::from_f64(v),
57 Value::Bytes(v) | Value::Fixed(_, v) => value::Value::Bytes(v),
58 Value::String(v) | Value::Enum(_, v) => value::Value::String(v),
59 Value::Union(boxed) => value_from_avro(*boxed),
60 Value::Array(v) => value::Value::Sequence(v.into_iter().map(value_from_avro).collect()),
61 Value::Map(v) => value::Value::Map(
62 v.into_iter()
63 .map(|(k, v)| (value::Value::String(k), value_from_avro(v)))
64 .collect(),
65 ),
66 Value::Record(v) => value::Value::Map(
67 v.into_iter()
68 .map(|(k, v)| (value::Value::String(k), value_from_avro(v)))
69 .collect(),
70 ),
71 }
72}
73
74impl<'a, W> value::Sink for Sink<'a, W>
75where
76 W: io::Write,
77{
78 #[inline]
79 fn write(&mut self, value: value::Value) -> error::Result<()> {
80 self.0
81 .append(value_to_avro(value)?)
82 .map_err(|e| error::Error::Avro(error::Avro::downcast(e)))?;
83 Ok(())
84 }
85}
86
87fn value_to_avro(value: value::Value) -> error::Result<avro_rs::types::Value> {
88 use avro_rs::types::Value;
89 use std::convert::TryFrom;
90 match value {
91 value::Value::Unit => Ok(Value::Null),
92 value::Value::Bool(v) => Ok(Value::Boolean(v)),
93
94 value::Value::I8(v) => Ok(Value::Int(i32::from(v))),
95 value::Value::I16(v) => Ok(Value::Int(i32::from(v))),
96 value::Value::I32(v) => Ok(Value::Int(v)),
97 value::Value::I64(v) => Ok(Value::Long(v)),
98
99 value::Value::U8(v) => Ok(Value::Int(i32::from(v))),
100 value::Value::U16(v) => Ok(Value::Int(i32::from(v))),
101 value::Value::U32(v) => Ok(Value::Long(i64::from(v))),
102 value::Value::U64(v) => {
103 if let Ok(v) = i64::try_from(v) {
104 Ok(Value::Long(v))
105 } else {
106 Err(error::Error::Format {
107 msg: format!(
108 "Avro output does not support unsigned 64 bit integer: {}",
109 v
110 ),
111 })
112 }
113 }
114
115 value::Value::F32(ordered_float::OrderedFloat(v)) => Ok(Value::Float(v)),
116 value::Value::F64(ordered_float::OrderedFloat(v)) => Ok(Value::Double(v)),
117
118 value::Value::Char(v) => Ok(Value::String(format!("{}", v))),
119 value::Value::String(v) => Ok(Value::String(v)),
120 value::Value::Bytes(v) => Ok(Value::Bytes(v)),
121
122 value::Value::Sequence(v) => Ok(Value::Array(
123 v.into_iter()
124 .map(value_to_avro)
125 .collect::<error::Result<Vec<_>>>()?,
126 )),
127 value::Value::Map(v) => Ok(Value::Record(
128 v.into_iter()
129 .map(|(k, v)| match (value_to_string(k), value_to_avro(v)) {
130 (Ok(k), Ok(v)) => Ok((k, v)),
131 (Ok(_), Err(e)) | (Err(e), Ok(_)) | (Err(_), Err(e)) => Err(e),
132 })
133 .collect::<error::Result<Vec<_>>>()?,
134 )),
135 }
136}
137
138fn value_to_string(value: value::Value) -> error::Result<String> {
139 match value {
140 value::Value::Char(v) => Ok(format!("{}", v)),
141 value::Value::String(v) => Ok(v),
142 x => Err(error::Error::Format {
143 msg: format!("Avro can only output string keys, got: {:?}", x),
144 }),
145 }
146}
147
148impl<'a, R> fmt::Debug for Source<'a, R>
149where
150 R: io::Read,
151{
152 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
153 f.debug_struct("AvroSource").finish()
154 }
155}
156
157impl<'a, W> fmt::Debug for Sink<'a, W>
158where
159 W: io::Write,
160{
161 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
162 f.debug_struct("AvroSink").finish()
163 }
164}
165
166impl<'a, W> Drop for Sink<'a, W>
167where
168 W: io::Write,
169{
170 fn drop(&mut self) {
171 match self.0.flush() {
172 Ok(_) => (),
173 Err(error) => panic!("{}", error),
174 }
175 }
176}