dbn_cli/
encode.rs

1use std::io;
2
3use dbn::{
4    decode::{DbnMetadata, DecodeRecordRef},
5    encode::{
6        json, DbnEncodable, DbnRecordEncoder, DynEncoder, DynWriter, EncodeDbn, EncodeRecordRef,
7        EncodeRecordTextExt,
8    },
9    rtype_dispatch, Compression, Encoding, MetadataBuilder, SType, SymbolIndex,
10};
11
12use crate::{infer_encoding, output_from_args, Args};
13
14pub fn silence_broken_pipe(err: anyhow::Error) -> anyhow::Result<()> {
15    // Handle broken pipe as a non-error.
16    if let Some(err) = err.downcast_ref::<dbn::Error>() {
17        if matches!(err, dbn::Error::Io { source, .. } if source.kind() == std::io::ErrorKind::BrokenPipe)
18        {
19            return Ok(());
20        }
21    }
22    Err(err)
23}
24
25pub fn encode_from_dbn<D>(args: &Args, mut decoder: D) -> anyhow::Result<()>
26where
27    D: DecodeRecordRef + DbnMetadata,
28{
29    let writer = output_from_args(args)?;
30    let (encoding, compression, delimiter) = infer_encoding(args)?;
31    if args.should_output_metadata {
32        if encoding != Encoding::Json {
33            return Err(anyhow::format_err!(
34                "Metadata flag is only valid with JSON encoding"
35            ));
36        }
37        json::Encoder::new(
38            writer,
39            args.should_pretty_print,
40            args.should_pretty_print,
41            args.should_pretty_print,
42        )
43        .encode_metadata(decoder.metadata())?;
44    } else if args.fragment {
45        encode_fragment(decoder, writer, compression)?;
46    } else {
47        let mut encoder = DynEncoder::builder(writer, encoding, compression, decoder.metadata())
48            .delimiter(delimiter)
49            .write_header(args.write_header)
50            .all_pretty(args.should_pretty_print)
51            .with_symbol(args.map_symbols)
52            .build()?;
53        if args.map_symbols {
54            let symbol_map = decoder.metadata().symbol_map()?;
55            let ts_out = decoder.metadata().ts_out;
56            while let Some(rec) = decoder.decode_record_ref()? {
57                let sym = symbol_map.get_for_rec(&rec).map(String::as_str);
58                // SAFETY: `ts_out` is accurate because it's sourced from the metadata
59                unsafe {
60                    encoder.encode_ref_ts_out_with_sym(rec, ts_out, sym)?;
61                }
62            }
63        } else {
64            encoder.encode_decoded(decoder)?;
65        }
66    }
67    Ok(())
68}
69
70pub fn encode_from_frag<D>(args: &Args, mut decoder: D) -> anyhow::Result<()>
71where
72    D: DecodeRecordRef,
73{
74    let writer = output_from_args(args)?;
75    let (encoding, compression, delimiter) = infer_encoding(args)?;
76    if args.fragment {
77        encode_fragment(decoder, writer, compression)?;
78        return Ok(());
79    }
80    assert!(!args.should_output_metadata);
81
82    let mut encoder = DynEncoder::builder(
83        writer,
84        encoding,
85        compression,
86        // dummy metadata won't be encoded
87        &MetadataBuilder::new()
88            .dataset(String::new())
89            .schema(None)
90            .start(0)
91            .stype_in(None)
92            .stype_out(SType::InstrumentId)
93            .build(),
94    )
95    .delimiter(delimiter)
96    // Can't write header until we know the record type
97    .write_header(false)
98    .all_pretty(args.should_pretty_print)
99    .build()?;
100    let mut has_written_header = (encoding != Encoding::Csv) || !args.write_header;
101    fn write_header<T: DbnEncodable>(
102        _record: &T,
103        encoder: &mut DynEncoder<Box<dyn io::Write>>,
104    ) -> dbn::Result<()> {
105        encoder.encode_header::<T>(false)
106    }
107    while let Some(record) = decoder.decode_record_ref()? {
108        if !has_written_header {
109            rtype_dispatch!(record, write_header(&mut encoder))??;
110            has_written_header = true;
111        }
112        encoder.encode_record_ref(record)?;
113    }
114    Ok(())
115}
116
117fn encode_fragment<D: DecodeRecordRef>(
118    mut decoder: D,
119    writer: Box<dyn io::Write>,
120    compression: Compression,
121) -> dbn::Result<()> {
122    let mut encoder = DbnRecordEncoder::new(DynWriter::new(writer, compression)?);
123    while let Some(record) = decoder.decode_record_ref()? {
124        encoder.encode_record_ref(record)?;
125    }
126    Ok(())
127}