Skip to main content

dbn_cli/
encode.rs

1use std::{io, path::Path};
2
3use dbn::{
4    decode::{DbnMetadata, DecodeRecordRef},
5    encode::{
6        json, DbnEncodable, DbnRecordEncoder, DynEncoder, DynWriter, EncodeDbn, EncodeRecord,
7        EncodeRecordRef, EncodeRecordTextExt, NoSchemaBehavior, SchemaSplitter, SplitEncoder,
8        Splitter, SymbolSplitter, TimeSplitter,
9    },
10    rtype_dispatch, Compression, Encoding, Metadata, MetadataBuilder, SType, Schema, SymbolIndex,
11    TsSymbolMap,
12};
13
14use crate::{infer_encoding, output_from_args, Args, InferredEncoding, SplitBy};
15
16pub fn silence_broken_pipe(err: anyhow::Error) -> anyhow::Result<()> {
17    // Handle broken pipe as a non-error.
18    if let Some(err) = err.downcast_ref::<dbn::Error>() {
19        if matches!(err, dbn::Error::Io { source, .. } if source.kind() == std::io::ErrorKind::BrokenPipe)
20        {
21            return Ok(());
22        }
23    }
24    Err(err)
25}
26
27pub fn encode_from_dbn<D>(args: &Args, mut decoder: D) -> anyhow::Result<()>
28where
29    D: DecodeRecordRef + DbnMetadata,
30{
31    let writer = output_from_args(args)?;
32    let InferredEncoding {
33        encoding,
34        is_fragment,
35        delimiter,
36        compression,
37    } = infer_encoding(args)?;
38    if args.should_output_metadata {
39        if encoding != Encoding::Json {
40            return Err(anyhow::format_err!(
41                "Metadata flag is only valid with JSON encoding"
42            ));
43        }
44        json::Encoder::new(
45            writer,
46            args.should_pretty_print,
47            args.should_pretty_print,
48            args.should_pretty_print,
49        )
50        .encode_metadata(decoder.metadata())?;
51    } else if is_fragment {
52        encode_fragment(decoder, writer, compression)?;
53    } else {
54        let mut encoder = DynEncoder::builder(writer, encoding, compression, decoder.metadata())
55            .delimiter(delimiter)
56            .write_header(args.write_header)
57            .all_pretty(args.should_pretty_print)
58            .with_symbol(args.map_symbols)
59            .build()?;
60        if args.map_symbols {
61            let symbol_map = decoder.metadata().symbol_map()?;
62            let ts_out = decoder.metadata().ts_out;
63            while let Some(rec) = decoder.decode_record_ref()? {
64                let sym = symbol_map.get_for_rec(&rec).map(String::as_str);
65                // SAFETY: `ts_out` is accurate because it's sourced from the metadata
66                unsafe {
67                    encoder.encode_ref_ts_out_with_sym(rec, ts_out, sym)?;
68                }
69            }
70        } else {
71            encoder.encode_decoded(decoder)?;
72        }
73    }
74    Ok(())
75}
76
77pub fn split_encode_from_dbn<D>(
78    args: &Args,
79    split_by: SplitBy,
80    output_pattern: &str,
81    decoder: D,
82) -> anyhow::Result<()>
83where
84    D: DecodeRecordRef + DbnMetadata,
85{
86    let InferredEncoding {
87        encoding,
88        compression,
89        delimiter,
90        is_fragment: is_output_fragment,
91    } = infer_encoding(args)?;
92    let open_output = |path: &str| {
93        crate::output(Some(Path::new(path)), args.force)
94            .map_err(|e| dbn::Error::io(io::Error::other(e), format!("opening output file {path}")))
95    };
96    if is_output_fragment {
97        let build_encoder = |path: &str, _metadata: Option<Metadata>| -> dbn::Result<_> {
98            Ok(DbnRecordEncoder::new(DynWriter::new(
99                open_output(path)?,
100                compression,
101            )?))
102        };
103        split_by_encode_fragment(decoder, split_by, output_pattern, build_encoder)
104    } else {
105        let build_encoder = |path: &str, metadata: Option<Metadata>| -> dbn::Result<_> {
106            DynEncoder::builder(
107                open_output(path)?,
108                encoding,
109                compression,
110                &metadata.unwrap(),
111            )
112            .delimiter(delimiter)
113            .write_header(args.write_header)
114            .all_pretty(args.should_pretty_print)
115            .with_symbol(args.map_symbols)
116            .build()
117        };
118        split_by_encode(
119            decoder,
120            split_by,
121            output_pattern,
122            build_encoder,
123            args.map_symbols,
124        )
125    }
126}
127
128fn split_by_encode<D, E, F>(
129    decoder: D,
130    split_by: SplitBy,
131    output_pattern: &str,
132    build_encoder: F,
133    map_symbols: bool,
134) -> anyhow::Result<()>
135where
136    D: DecodeRecordRef + DbnMetadata,
137    E: EncodeRecordTextExt,
138    F: Fn(&str, Option<Metadata>) -> dbn::Result<E>,
139{
140    let symbol_map = decoder.metadata().symbol_map()?;
141    match split_by {
142        SplitBy::Symbol => {
143            // TODO: detect live data and split on live symbol mapping msgs
144            let splitter = SymbolSplitter::new(
145                |symbol: &str, metadata| {
146                    build_encoder(&output_pattern.replace("{symbol}", symbol), metadata)
147                },
148                symbol_map.clone(),
149            );
150            split_encode_impl(decoder, map_symbols, splitter, Some(symbol_map))
151        }
152        SplitBy::Schema => {
153            let splitter = SchemaSplitter::new(
154                |schema: Schema, metadata| {
155                    build_encoder(
156                        &output_pattern.replace("{schema}", schema.as_str()),
157                        metadata,
158                    )
159                },
160                // TODO: support other behaviors
161                NoSchemaBehavior::default(),
162            );
163            split_encode_impl(decoder, map_symbols, splitter, Some(symbol_map))
164        }
165        SplitBy::Day | SplitBy::Week | SplitBy::Month => {
166            let splitter = TimeSplitter::new(
167                |date: time::Date, metadata| {
168                    build_encoder(
169                        &output_pattern.replace("{date}", &date.to_string()),
170                        metadata,
171                    )
172                },
173                split_by.duration().unwrap(),
174            );
175            split_encode_impl(decoder, map_symbols, splitter, Some(symbol_map))
176        }
177    }
178}
179
180fn split_by_encode_fragment<D, E, F>(
181    decoder: D,
182    split_by: SplitBy,
183    output_pattern: &str,
184    build_encoder: F,
185) -> anyhow::Result<()>
186where
187    D: DecodeRecordRef + DbnMetadata,
188    E: EncodeRecord + EncodeRecordRef,
189    F: Fn(&str, Option<Metadata>) -> dbn::Result<E>,
190{
191    match split_by {
192        SplitBy::Symbol => {
193            let symbol_map = decoder.metadata().symbol_map()?;
194            let splitter = SymbolSplitter::new(
195                |symbol: &str, metadata| {
196                    build_encoder(&output_pattern.replace("{symbol}", symbol), metadata)
197                },
198                symbol_map,
199            );
200            split_encode_fragment_impl(decoder, splitter)
201        }
202        SplitBy::Schema => {
203            let splitter = SchemaSplitter::new(
204                |schema: Schema, metadata| {
205                    build_encoder(
206                        &output_pattern.replace("{schema}", schema.as_str()),
207                        metadata,
208                    )
209                },
210                // TODO: support other behaviors
211                NoSchemaBehavior::default(),
212            );
213            split_encode_fragment_impl(decoder, splitter)
214        }
215        SplitBy::Day | SplitBy::Week | SplitBy::Month => {
216            let splitter = TimeSplitter::new(
217                |date: time::Date, metadata| {
218                    build_encoder(
219                        &output_pattern.replace("{date}", &date.to_string()),
220                        metadata,
221                    )
222                },
223                split_by.duration().unwrap(),
224            );
225            split_encode_fragment_impl(decoder, splitter)
226        }
227    }
228}
229
230fn split_encode_impl<D, S, E>(
231    mut decoder: D,
232    map_symbols: bool,
233    splitter: S,
234    symbol_map: Option<TsSymbolMap>,
235) -> anyhow::Result<()>
236where
237    D: DecodeRecordRef + DbnMetadata,
238    S: Splitter<E>,
239    E: EncodeRecordTextExt,
240{
241    let mut encoder = SplitEncoder::with_metadata(splitter, decoder.metadata().clone());
242    if map_symbols {
243        let symbol_map = if let Some(symbol_map) = symbol_map {
244            symbol_map
245        } else {
246            decoder.metadata().symbol_map()?
247        };
248        let ts_out = decoder.metadata().ts_out;
249        while let Some(rec) = decoder.decode_record_ref()? {
250            let sym = symbol_map.get_for_rec(&rec).map(String::as_str);
251            // SAFETY: `ts_out` is accurate because it's sourced from the metadata
252            unsafe {
253                encoder.encode_ref_ts_out_with_sym(rec, ts_out, sym)?;
254            }
255        }
256    } else {
257        encoder.encode_decoded(decoder)?;
258    }
259    Ok(())
260}
261
262fn split_encode_fragment_impl<D, S, E>(mut decoder: D, splitter: S) -> anyhow::Result<()>
263where
264    D: DecodeRecordRef,
265    S: Splitter<E>,
266    E: EncodeRecord + EncodeRecordRef,
267{
268    let mut encoder = SplitEncoder::records_only(splitter);
269    while let Some(rec) = decoder.decode_record_ref()? {
270        encoder.encode_record_ref(rec)?;
271    }
272    encoder.flush()?;
273    Ok(())
274}
275
276pub fn encode_from_frag<D>(args: &Args, mut decoder: D) -> anyhow::Result<()>
277where
278    D: DecodeRecordRef,
279{
280    let writer = output_from_args(args)?;
281    let InferredEncoding {
282        encoding,
283        compression,
284        delimiter,
285        is_fragment,
286    } = infer_encoding(args)?;
287    if is_fragment {
288        encode_fragment(decoder, writer, compression)?;
289        return Ok(());
290    }
291    assert!(!args.should_output_metadata);
292
293    let mut encoder = DynEncoder::builder(
294        writer,
295        encoding,
296        compression,
297        // dummy metadata won't be encoded
298        &dummy_metadata(),
299    )
300    .delimiter(delimiter)
301    // Can't write header until we know the record type
302    .write_header(false)
303    .all_pretty(args.should_pretty_print)
304    .build()?;
305    let mut has_written_header = (encoding != Encoding::Csv) || !args.write_header;
306    fn write_header<T: DbnEncodable>(
307        _record: &T,
308        encoder: &mut DynEncoder<Box<dyn io::Write>>,
309    ) -> dbn::Result<()> {
310        encoder.encode_header::<T>(false)
311    }
312    while let Some(record) = decoder.decode_record_ref()? {
313        if !has_written_header {
314            rtype_dispatch!(record, write_header(&mut encoder))??;
315            has_written_header = true;
316        }
317        encoder.encode_record_ref(record)?;
318    }
319    Ok(())
320}
321
322fn dummy_metadata() -> Metadata {
323    MetadataBuilder::new()
324        .dataset(String::new())
325        .schema(None)
326        .start(0)
327        .stype_in(None)
328        .stype_out(SType::InstrumentId)
329        .build()
330}
331
332fn encode_fragment<D: DecodeRecordRef>(
333    mut decoder: D,
334    writer: Box<dyn io::Write>,
335    compression: Compression,
336) -> dbn::Result<()> {
337    let mut encoder = DbnRecordEncoder::new(DynWriter::new(writer, compression)?);
338    while let Some(record) = decoder.decode_record_ref()? {
339        encoder.encode_record_ref(record)?;
340    }
341    Ok(())
342}
343
344/// Split encode from a fragment input (no metadata).
345///
346/// Only supports time-based and schema-based splitting. Symbol splitting requires
347/// a symbol map which is not available in fragment inputs.
348pub fn split_encode_from_frag<D>(
349    args: &Args,
350    split_by: SplitBy,
351    output_pattern: &str,
352    decoder: D,
353) -> anyhow::Result<()>
354where
355    D: DecodeRecordRef,
356{
357    if matches!(split_by, SplitBy::Symbol) {
358        return Err(anyhow::anyhow!(
359            "Cannot split by symbol when input is a fragment: no symbol map available"
360        ));
361    }
362    let InferredEncoding {
363        encoding,
364        compression,
365        delimiter,
366        is_fragment,
367    } = infer_encoding(args)?;
368    let open_output = |path: &str| {
369        crate::output(Some(Path::new(path)), args.force)
370            .map_err(|e| dbn::Error::io(io::Error::other(e), format!("opening output file {path}")))
371    };
372    if is_fragment {
373        let build_encoder = |path: &str| -> dbn::Result<_> {
374            Ok(DbnRecordEncoder::new(DynWriter::new(
375                open_output(path)?,
376                compression,
377            )?))
378        };
379        match split_by {
380            SplitBy::Symbol => unreachable!("handled above"),
381            SplitBy::Schema => {
382                let splitter = SchemaSplitter::new(
383                    |schema: Schema, _metadata| {
384                        build_encoder(&output_pattern.replace("{schema}", schema.as_str()))
385                    },
386                    NoSchemaBehavior::default(),
387                );
388                split_encode_fragment_impl(decoder, splitter)
389            }
390            SplitBy::Day | SplitBy::Week | SplitBy::Month => {
391                let splitter = TimeSplitter::new(
392                    |date: time::Date, _metadata| {
393                        build_encoder(&output_pattern.replace("{date}", &date.to_string()))
394                    },
395                    split_by.duration().unwrap(),
396                );
397                split_encode_fragment_impl(decoder, splitter)
398            }
399        }
400    } else {
401        let metadata = dummy_metadata();
402        let build_encoder = |path: &str| -> dbn::Result<_> {
403            DynEncoder::builder(open_output(path)?, encoding, compression, &metadata)
404                .delimiter(delimiter)
405                .write_header(args.write_header)
406                .all_pretty(args.should_pretty_print)
407                .build()
408        };
409        match split_by {
410            SplitBy::Symbol => unreachable!("handled above"),
411            SplitBy::Schema => {
412                let splitter = SchemaSplitter::new(
413                    |schema: Schema, _metadata| {
414                        build_encoder(&output_pattern.replace("{schema}", schema.as_str()))
415                    },
416                    NoSchemaBehavior::default(),
417                );
418                split_encode_fragment_impl(decoder, splitter)
419            }
420            SplitBy::Day | SplitBy::Week | SplitBy::Month => {
421                let splitter = TimeSplitter::new(
422                    |date: time::Date, _metadata| {
423                        build_encoder(&output_pattern.replace("{date}", &date.to_string()))
424                    },
425                    split_by.duration().unwrap(),
426                );
427                split_encode_fragment_impl(decoder, splitter)
428            }
429        }
430    }
431}