1use std::io;
2
3use dbn::{
4 decode::{DbnMetadata, DecodeRecordRef},
5 encode::{
6 json, DbnEncodable, DbnRecordEncoder, DynEncoder, DynWriter, EncodeDbn, EncodeRecordRef,
7 EncodeRecordTextExt,
8 },
9 rtype_dispatch, Compression, Encoding, MetadataBuilder, SType, SymbolIndex,
10};
11
12use crate::{infer_encoding, output_from_args, Args};
13
14pub fn silence_broken_pipe(err: anyhow::Error) -> anyhow::Result<()> {
15 if let Some(err) = err.downcast_ref::<dbn::Error>() {
17 if matches!(err, dbn::Error::Io { source, .. } if source.kind() == std::io::ErrorKind::BrokenPipe)
18 {
19 return Ok(());
20 }
21 }
22 Err(err)
23}
24
25pub fn encode_from_dbn<D>(args: &Args, mut decoder: D) -> anyhow::Result<()>
26where
27 D: DecodeRecordRef + DbnMetadata,
28{
29 let writer = output_from_args(args)?;
30 let (encoding, compression, delimiter) = infer_encoding(args)?;
31 if args.should_output_metadata {
32 if encoding != Encoding::Json {
33 return Err(anyhow::format_err!(
34 "Metadata flag is only valid with JSON encoding"
35 ));
36 }
37 json::Encoder::new(
38 writer,
39 args.should_pretty_print,
40 args.should_pretty_print,
41 args.should_pretty_print,
42 )
43 .encode_metadata(decoder.metadata())?;
44 } else if args.fragment {
45 encode_fragment(decoder, writer, compression)?;
46 } else {
47 let mut encoder = DynEncoder::builder(writer, encoding, compression, decoder.metadata())
48 .delimiter(delimiter)
49 .write_header(args.write_header)
50 .all_pretty(args.should_pretty_print)
51 .with_symbol(args.map_symbols)
52 .build()?;
53 if args.map_symbols {
54 let symbol_map = decoder.metadata().symbol_map()?;
55 let ts_out = decoder.metadata().ts_out;
56 while let Some(rec) = decoder.decode_record_ref()? {
57 let sym = symbol_map.get_for_rec(&rec).map(String::as_str);
58 unsafe {
60 encoder.encode_ref_ts_out_with_sym(rec, ts_out, sym)?;
61 }
62 }
63 } else {
64 encoder.encode_decoded(decoder)?;
65 }
66 }
67 Ok(())
68}
69
70pub fn encode_from_frag<D>(args: &Args, mut decoder: D) -> anyhow::Result<()>
71where
72 D: DecodeRecordRef,
73{
74 let writer = output_from_args(args)?;
75 let (encoding, compression, delimiter) = infer_encoding(args)?;
76 if args.fragment {
77 encode_fragment(decoder, writer, compression)?;
78 return Ok(());
79 }
80 assert!(!args.should_output_metadata);
81
82 let mut encoder = DynEncoder::builder(
83 writer,
84 encoding,
85 compression,
86 &MetadataBuilder::new()
88 .dataset(String::new())
89 .schema(None)
90 .start(0)
91 .stype_in(None)
92 .stype_out(SType::InstrumentId)
93 .build(),
94 )
95 .delimiter(delimiter)
96 .write_header(false)
98 .all_pretty(args.should_pretty_print)
99 .build()?;
100 let mut has_written_header = (encoding != Encoding::Csv) || !args.write_header;
101 fn write_header<T: DbnEncodable>(
102 _record: &T,
103 encoder: &mut DynEncoder<Box<dyn io::Write>>,
104 ) -> dbn::Result<()> {
105 encoder.encode_header::<T>(false)
106 }
107 while let Some(record) = decoder.decode_record_ref()? {
108 if !has_written_header {
109 rtype_dispatch!(record, write_header(&mut encoder))??;
110 has_written_header = true;
111 }
112 encoder.encode_record_ref(record)?;
113 }
114 Ok(())
115}
116
117fn encode_fragment<D: DecodeRecordRef>(
118 mut decoder: D,
119 writer: Box<dyn io::Write>,
120 compression: Compression,
121) -> dbn::Result<()> {
122 let mut encoder = DbnRecordEncoder::new(DynWriter::new(writer, compression)?);
123 while let Some(record) = decoder.decode_record_ref()? {
124 encoder.encode_record_ref(record)?;
125 }
126 Ok(())
127}