rs_avro_transcode/
enc.rs

1use std::io;
2
3use std::io::BufReader;
4use std::io::Read;
5
6use std::io::BufWriter;
7use std::io::Write;
8
9use apache_avro::types::Value;
10use apache_avro::Writer;
11
12use apache_avro::Codec;
13use apache_avro::Reader;
14use apache_avro::Schema;
15
16pub fn val2wtr<I, W>(values: I, mut wtr: Writer<W>) -> Result<(), io::Error>
17where
18    I: Iterator<Item = Result<Value, io::Error>>,
19    W: Write,
20{
21    for rval in values {
22        let val: Value = rval?;
23        wtr.append(val).map_err(io::Error::other)?;
24    }
25    wtr.flush().map_err(io::Error::other)?;
26    Ok(())
27}
28
29pub fn rdr2wtr<R, W>(r: R, c: Codec, mut w: W) -> Result<(), io::Error>
30where
31    R: Read,
32    W: Write,
33{
34    let reader: Reader<R> = Reader::new(r).map_err(io::Error::other)?;
35    let s: Schema = reader.writer_schema().clone();
36    let wtr: Writer<_> = Writer::with_codec(&s, &mut w, c);
37    let values = reader.map(|rslt| rslt.map_err(io::Error::other));
38    val2wtr(values, wtr)?;
39    w.flush()?;
40    Ok(())
41}
42
43pub fn stdin2stdout(codec_string: &str) -> Result<(), io::Error> {
44    let c: Codec = str::parse(codec_string)
45        .map_err(|_| io::Error::other(format!("wrong codec: {codec_string}")))?;
46
47    let i = io::stdin();
48    let il = i.lock();
49    let br = BufReader::new(il);
50
51    let o = io::stdout();
52    let mut ol = o.lock();
53    let bw = BufWriter::new(&mut ol);
54    rdr2wtr(br, c, bw)?;
55    ol.flush()
56}