1use crate::{decode::decode, schema::Schema, types::Value, util, AvroResult, Codec, Error};
3use serde_json::from_slice;
4use std::{
5 io::{ErrorKind, Read},
6 str::FromStr,
7};
8
9#[derive(Debug, Clone)]
11struct Block<R> {
12 reader: R,
13 buf: Vec<u8>,
15 buf_idx: usize,
16 message_count: usize,
18 marker: [u8; 16],
19 codec: Codec,
20 writer_schema: Schema,
21}
22
23impl<R: Read> Block<R> {
24 fn new(reader: R) -> AvroResult<Block<R>> {
25 let mut block = Block {
26 reader,
27 codec: Codec::Null,
28 writer_schema: Schema::Null,
29 buf: vec![],
30 buf_idx: 0,
31 message_count: 0,
32 marker: [0; 16],
33 };
34
35 block.read_header()?;
36 Ok(block)
37 }
38
39 fn read_header(&mut self) -> AvroResult<()> {
42 let meta_schema = Schema::Map(Box::new(Schema::Bytes));
43
44 let mut buf = [0u8; 4];
45 self.reader
46 .read_exact(&mut buf)
47 .map_err(Error::ReadHeader)?;
48
49 if buf != [b'O', b'b', b'j', 1u8] {
50 return Err(Error::HeaderMagic);
51 }
52
53 if let Value::Map(meta) = decode(&meta_schema, &mut self.reader)? {
54 let json = meta
56 .get("avro.schema")
57 .and_then(|bytes| {
58 if let Value::Bytes(ref bytes) = *bytes {
59 from_slice(bytes.as_ref()).ok()
60 } else {
61 None
62 }
63 })
64 .ok_or(Error::GetAvroSchemaFromMap)?;
65 self.writer_schema = Schema::parse(&json)?;
66
67 if let Some(codec) = meta
68 .get("avro.codec")
69 .and_then(|codec| {
70 if let Value::Bytes(ref bytes) = *codec {
71 std::str::from_utf8(bytes.as_ref()).ok()
72 } else {
73 None
74 }
75 })
76 .and_then(|codec| Codec::from_str(codec).ok())
77 {
78 self.codec = codec;
79 }
80 } else {
81 return Err(Error::GetHeaderMetadata);
82 }
83
84 self.reader
85 .read_exact(&mut self.marker)
86 .map_err(Error::ReadMarker)
87 }
88
89 fn fill_buf(&mut self, n: usize) -> AvroResult<()> {
90 self.buf.resize(n, 0);
102 self.reader
103 .read_exact(&mut self.buf)
104 .map_err(Error::ReadIntoBuf)?;
105 self.buf_idx = 0;
106 Ok(())
107 }
108
109 fn read_block_next(&mut self) -> AvroResult<()> {
112 assert!(self.is_empty(), "Expected self to be empty!");
113 match util::read_long(&mut self.reader) {
114 Ok(block_len) => {
115 self.message_count = block_len as usize;
116 let block_bytes = util::read_long(&mut self.reader)?;
117 self.fill_buf(block_bytes as usize)?;
118 let mut marker = [0u8; 16];
119 self.reader
120 .read_exact(&mut marker)
121 .map_err(Error::ReadBlockMarker)?;
122
123 if marker != self.marker {
124 return Err(Error::GetBlockMarker);
125 }
126
127 self.codec.decompress(&mut self.buf)
134 }
135 Err(Error::ReadVariableIntegerBytes(io_err)) => {
136 if let ErrorKind::UnexpectedEof = io_err.kind() {
137 Ok(())
139 } else {
140 Err(Error::ReadVariableIntegerBytes(io_err))
141 }
142 }
143 Err(e) => Err(e),
144 }
145 }
146
147 fn len(&self) -> usize {
148 self.message_count
149 }
150
151 fn is_empty(&self) -> bool {
152 self.len() == 0
153 }
154
155 fn read_next(&mut self, read_schema: Option<&Schema>) -> AvroResult<Option<Value>> {
156 if self.is_empty() {
157 self.read_block_next()?;
158 if self.is_empty() {
159 return Ok(None);
160 }
161 }
162
163 let mut block_bytes = &self.buf[self.buf_idx..];
164 let b_original = block_bytes.len();
165 let item = from_avro_datum(&self.writer_schema, &mut block_bytes, read_schema)?;
166 self.buf_idx += b_original - block_bytes.len();
167 self.message_count -= 1;
168 Ok(Some(item))
169 }
170}
171
172pub struct Reader<'a, R> {
188 block: Block<R>,
189 reader_schema: Option<&'a Schema>,
190 errored: bool,
191 should_resolve_schema: bool,
192}
193
194impl<'a, R: Read> Reader<'a, R> {
195 pub fn new(reader: R) -> AvroResult<Reader<'a, R>> {
200 let block = Block::new(reader)?;
201 let reader = Reader {
202 block,
203 reader_schema: None,
204 errored: false,
205 should_resolve_schema: false,
206 };
207 Ok(reader)
208 }
209
210 pub fn with_schema(schema: &'a Schema, reader: R) -> AvroResult<Reader<'a, R>> {
215 let block = Block::new(reader)?;
216 let mut reader = Reader {
217 block,
218 reader_schema: Some(schema),
219 errored: false,
220 should_resolve_schema: false,
221 };
222 reader.should_resolve_schema = reader.writer_schema() != schema;
224 Ok(reader)
225 }
226
227 pub fn writer_schema(&self) -> &Schema {
229 &self.block.writer_schema
230 }
231
232 pub fn reader_schema(&self) -> Option<&Schema> {
234 self.reader_schema
235 }
236
237 #[inline]
238 fn read_next(&mut self) -> AvroResult<Option<Value>> {
239 let read_schema = if self.should_resolve_schema {
240 self.reader_schema
241 } else {
242 None
243 };
244
245 self.block.read_next(read_schema)
246 }
247}
248
249impl<'a, R: Read> Iterator for Reader<'a, R> {
250 type Item = AvroResult<Value>;
251
252 fn next(&mut self) -> Option<Self::Item> {
253 if self.errored {
255 return None;
256 };
257 match self.read_next() {
258 Ok(opt) => opt.map(Ok),
259 Err(e) => {
260 self.errored = true;
261 Some(Err(e))
262 }
263 }
264 }
265}
266
267pub fn from_avro_datum<R: Read>(
276 writer_schema: &Schema,
277 reader: &mut R,
278 reader_schema: Option<&Schema>,
279) -> AvroResult<Value> {
280 let value = decode(writer_schema, reader)?;
281 match reader_schema {
282 Some(ref schema) => value.resolve(schema),
283 None => Ok(value),
284 }
285}
286
#[cfg(test)]
mod tests {
    use super::*;
    use crate::{types::Record, Reader};
    use std::io::Cursor;

    const SCHEMA: &str = r#"
    {
      "type": "record",
      "name": "test",
      "fields": [
        {
          "name": "a",
          "type": "long",
          "default": 42
        },
        {
          "name": "b",
          "type": "string"
        }
      ]
    }
    "#;
    const UNION_SCHEMA: &str = r#"["null", "long"]"#;
    /// Container file holding two `test` records in a single `null`-codec block.
    const ENCODED: &[u8] = &[
        79u8, 98u8, 106u8, 1u8, 4u8, 22u8, 97u8, 118u8, 114u8, 111u8, 46u8, 115u8, 99u8, 104u8,
        101u8, 109u8, 97u8, 222u8, 1u8, 123u8, 34u8, 116u8, 121u8, 112u8, 101u8, 34u8, 58u8, 34u8,
        114u8, 101u8, 99u8, 111u8, 114u8, 100u8, 34u8, 44u8, 34u8, 110u8, 97u8, 109u8, 101u8, 34u8,
        58u8, 34u8, 116u8, 101u8, 115u8, 116u8, 34u8, 44u8, 34u8, 102u8, 105u8, 101u8, 108u8,
        100u8, 115u8, 34u8, 58u8, 91u8, 123u8, 34u8, 110u8, 97u8, 109u8, 101u8, 34u8, 58u8, 34u8,
        97u8, 34u8, 44u8, 34u8, 116u8, 121u8, 112u8, 101u8, 34u8, 58u8, 34u8, 108u8, 111u8, 110u8,
        103u8, 34u8, 44u8, 34u8, 100u8, 101u8, 102u8, 97u8, 117u8, 108u8, 116u8, 34u8, 58u8, 52u8,
        50u8, 125u8, 44u8, 123u8, 34u8, 110u8, 97u8, 109u8, 101u8, 34u8, 58u8, 34u8, 98u8, 34u8,
        44u8, 34u8, 116u8, 121u8, 112u8, 101u8, 34u8, 58u8, 34u8, 115u8, 116u8, 114u8, 105u8,
        110u8, 103u8, 34u8, 125u8, 93u8, 125u8, 20u8, 97u8, 118u8, 114u8, 111u8, 46u8, 99u8, 111u8,
        100u8, 101u8, 99u8, 8u8, 110u8, 117u8, 108u8, 108u8, 0u8, 94u8, 61u8, 54u8, 221u8, 190u8,
        207u8, 108u8, 180u8, 158u8, 57u8, 114u8, 40u8, 173u8, 199u8, 228u8, 239u8, 4u8, 20u8, 54u8,
        6u8, 102u8, 111u8, 111u8, 84u8, 6u8, 98u8, 97u8, 114u8, 94u8, 61u8, 54u8, 221u8, 190u8,
        207u8, 108u8, 180u8, 158u8, 57u8, 114u8, 40u8, 173u8, 199u8, 228u8, 239u8,
    ];
    /// Length of the header at the start of `ENCODED`: 4 magic bytes, the metadata
    /// map (`avro.schema` and `avro.codec`) and the 16-byte sync marker.
    const HEADER_LEN: usize = 163;

    #[test]
    fn test_from_avro_datum() {
        let schema = Schema::parse_str(SCHEMA).unwrap();
        let mut encoded: &'static [u8] = &[54, 6, 102, 111, 111];

        let mut record = Record::new(&schema).unwrap();
        record.put("a", 27i64);
        record.put("b", "foo");
        let expected = record.into();

        assert_eq!(
            from_avro_datum(&schema, &mut encoded, None).unwrap(),
            expected
        );
    }

    #[test]
    fn test_null_union() {
        let schema = Schema::parse_str(UNION_SCHEMA).unwrap();
        let mut encoded: &'static [u8] = &[2, 0];

        assert_eq!(
            from_avro_datum(&schema, &mut encoded, None).unwrap(),
            Value::Union(Box::new(Value::Long(0)))
        );
    }

    #[test]
    fn test_reader_iterator() {
        let schema = Schema::parse_str(SCHEMA).unwrap();
        let reader = Reader::with_schema(&schema, ENCODED).unwrap();

        let mut record1 = Record::new(&schema).unwrap();
        record1.put("a", 27i64);
        record1.put("b", "foo");

        let mut record2 = Record::new(&schema).unwrap();
        record2.put("a", 42i64);
        record2.put("b", "bar");

        let expected: Vec<Value> = vec![record1.into(), record2.into()];
        // Collecting (rather than looping) also fails if values are missing or extra.
        let actual = reader.collect::<AvroResult<Vec<Value>>>().unwrap();
        assert_eq!(actual, expected);
    }

    #[test]
    fn test_reader_invalid_header() {
        let schema = Schema::parse_str(SCHEMA).unwrap();
        let invalid = &ENCODED[1..];
        assert!(Reader::with_schema(&schema, invalid).is_err());
    }

    #[test]
    fn test_reader_invalid_block() {
        let schema = Schema::parse_str(SCHEMA).unwrap();
        // Drop the trailing sync marker (16 bytes) and the last 3 bytes of the block body.
        let invalid = &ENCODED[..ENCODED.len() - 19];
        let reader = Reader::with_schema(&schema, invalid).unwrap();
        let values: Vec<_> = reader.collect();
        assert_eq!(values.len(), 1);
        assert!(values[0].is_err());
    }

    #[test]
    fn test_reader_empty_buffer() {
        let empty = Cursor::new(Vec::new());
        assert!(Reader::new(empty).is_err());
    }

    #[test]
    fn test_reader_only_header() {
        let header_only = &ENCODED[..HEADER_LEN];
        let reader = Reader::new(header_only).unwrap();
        assert_eq!(reader.count(), 0);
    }

    #[test]
    fn test_reader_truncated_block() {
        // Header plus the block's object count and byte size, but no block body.
        let truncated = &ENCODED[..HEADER_LEN + 2];
        let values: Vec<_> = Reader::new(truncated).unwrap().collect();
        assert_eq!(values.len(), 1);
        assert!(values[0].is_err());
    }
}