Skip to main content

dbn_cli/
lib.rs

1use std::{
2    fs::File,
3    io::{self, BufWriter},
4    num::NonZeroU64,
5    path::{Path, PathBuf},
6};
7
8use anyhow::{anyhow, Context};
9use clap::{ArgAction, Parser, ValueEnum};
10
11use dbn::{
12    encode::SplitDuration,
13    enums::{Compression, Encoding},
14    Schema, VersionUpgradePolicy,
15};
16
17pub mod encode;
18pub mod filter;
19
/// How the output of the `dbn` command will be encoded.
///
/// Produced by [`Args::output_encoding`] from the output flags, which share the clap group
/// `output_encoding` and so are mutually exclusive.
#[derive(Clone, Copy, Debug, ValueEnum)]
pub enum OutputEncoding {
    /// `dbn` will infer based on the extension of the specified output file
    Infer,
    /// DBN including the metadata header (`--dbn`).
    Dbn,
    /// Comma-separated values (`--csv`).
    Csv,
    /// Tab-separated values (`--tsv`); encoded with the CSV encoder using a tab delimiter.
    Tsv,
    /// JSON lines (`--json`).
    Json,
    /// DBN records without the metadata header (`--fragment`).
    DbnFragment,
}
31
/// How to split a DBN file
// NOTE: variant comments below deliberately use `//` instead of `///`: clap's `ValueEnum`
// derive turns variant doc comments into possible-value help, which would change the
// `--split-by` section of `--help`.
#[derive(Clone, Copy, Debug, ValueEnum)]
pub enum SplitBy {
    // Presumably one output per symbol; the splitting itself lives outside this file.
    Symbol,
    // Presumably one output per schema; the splitting itself lives outside this file.
    Schema,
    // Time-based splits: these three map to a `SplitDuration` via `SplitBy::duration`.
    Day,
    Week,
    Month,
}
41
42impl SplitBy {
43    pub fn duration(self) -> Option<SplitDuration> {
44        match self {
45            SplitBy::Day => Some(SplitDuration::Day),
46            SplitBy::Week => Some(SplitDuration::Week),
47            SplitBy::Month => Some(SplitDuration::Month),
48            SplitBy::Symbol | SplitBy::Schema => None,
49        }
50    }
51}
52
/// The fully resolved output format returned by [`infer_encoding`].
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub struct InferredEncoding {
    /// The encoding to write records in. TSV output is represented as [`Encoding::Csv`]
    /// with a tab `delimiter`.
    pub encoding: Encoding,
    /// The compression applied to the encoded output.
    pub compression: Compression,
    /// Field delimiter for CSV-style output: `b','` for CSV, `b'\t'` for TSV. [`infer_encoding`]
    /// sets it to `0` for encodings without a delimiter (DBN and JSON).
    pub delimiter: u8,
    /// Whether DBN output is a fragment, i.e. records without the metadata header.
    pub is_fragment: bool,
}
60
61#[derive(Debug, Parser)]
62#[clap(name = "dbn", version, about)]
63#[cfg_attr(test, derive(Default))]
64pub struct Args {
65    #[clap(
66        help = "One or more DBN or legacy DBZ files to decode. Passing multiple files will result in a merge. Pass '-' to read from standard input",
67        value_name = "FILE...",
68        value_delimiter = ' ',
69        num_args = 1..,
70        required = true,
71    )]
72    pub input: Vec<PathBuf>,
73    #[clap(
74        short,
75        long,
76        help = "Saves the result to FILE. If no path is specified, the output will be written to standard output",
77        value_name = "FILE"
78    )]
79    pub output: Option<PathBuf>,
80    #[clap(
81        short = 'O',
82        long,
83        help = "Saves the result of file splitting to paths according to PATTERN",
84        requires = "split_by",
85        conflicts_with = "output",
86        value_name = "PATTERN"
87    )]
88    pub output_pattern: Option<String>,
89    #[clap(
90        short = 'S',
91        long,
92        help = "How to optionally split the output across files",
93        requires = "output_pattern",
94        value_name = "SPLIT_BY"
95    )]
96    pub split_by: Option<SplitBy>,
97    #[clap(
98        short = 'J',
99        long,
100        action = ArgAction::SetTrue,
101        default_value = "false",
102        group = "output_encoding",
103        help = "Output the result as JSON lines"
104    )]
105    pub json: bool,
106    #[clap(
107        short = 'C',
108        long,
109        action = ArgAction::SetTrue,
110        default_value = "false",
111        group = "output_encoding",
112        help = "Output the result as CSV"
113    )]
114    pub csv: bool,
115    #[clap(
116        short = 'T',
117        long,
118        action = ArgAction::SetTrue,
119        default_value = "false",
120        group = "output_encoding",
121        help = "Output the result as tab-separated values (TSV)"
122    )]
123    pub tsv: bool,
124    #[clap(
125        short = 'D',
126        long,
127        action = ArgAction::SetTrue,
128        default_value = "false",
129        group = "output_encoding",
130        help = "Output the result as DBN"
131    )]
132    pub dbn: bool,
133    #[clap(
134        short = 'F',
135        long,
136        action = ArgAction::SetTrue,
137        default_value = "false",
138        group = "output_encoding",
139        help = "Output the result as a DBN fragment (no metadata)"
140    )]
141    pub fragment: bool,
142    #[clap(short, long, action = ArgAction::SetTrue, default_value = "false", help = "Zstd compress the output")]
143    pub zstd: bool,
144    #[clap(
145        short = 'u',
146        long = "upgrade",
147        default_value = "false",
148        action = ArgAction::SetTrue,
149        help = "Upgrade data when decoding previous DBN versions. By default data is decoded as-is."
150    )]
151    pub should_upgrade: bool,
152    #[clap(
153        short,
154        long,
155        action = ArgAction::SetTrue,
156        default_value = "false",
157        help = "Allow overwriting of existing files, such as the output file"
158    )]
159    pub force: bool,
160    #[clap(
161        short = 'm',
162        long = "metadata",
163        action = ArgAction::SetTrue,
164        default_value = "false",
165        conflicts_with_all = ["csv", "dbn", "fragment"],
166        help = "Output the metadata section instead of the body of the DBN file. Only valid for JSON output encoding"
167    )]
168    pub should_output_metadata: bool,
169    #[clap(
170         short = 'p',
171         long = "pretty",
172         action = ArgAction::SetTrue,
173         default_value = "false",
174         conflicts_with_all = ["dbn", "fragment"],
175         help ="Make the CSV or JSON output easier to read by converting timestamps to ISO 8601 and prices to decimals"
176    )]
177    pub should_pretty_print: bool,
178    #[clap(
179         short = 's',
180         long = "map-symbols",
181         action = ArgAction::SetTrue,
182         default_value = "false",
183         conflicts_with_all = ["input_fragment", "dbn", "fragment"],
184         help ="Use symbology mappings from the metadata to create a 'symbol' field mapping the instrument ID to its requested symbol."
185    )]
186    pub map_symbols: bool,
187    #[clap(
188        short = 'l',
189        long = "limit",
190        value_name = "NUM_RECORDS",
191        help = "Limit the number of records in the output to the specified number"
192    )]
193    pub limit: Option<NonZeroU64>,
194    // Fragment arguments
195    #[clap(
196        long = "input-fragment",
197        action = ArgAction::SetTrue,
198        default_value = "false",
199        group = "input_fragment",
200        conflicts_with_all = ["is_input_zstd_fragment", "should_output_metadata", "dbn"],
201        help = "Interpret the input as an uncompressed DBN fragment, i.e. records without metadata. Only valid with text output encodings"
202    )]
203    pub is_input_fragment: bool,
204    #[clap(
205        long = "input-zstd-fragment",
206        action = ArgAction::SetTrue,
207        default_value = "false",
208        group = "input_fragment",
209        conflicts_with_all = ["should_output_metadata", "dbn"],
210        help = "Interpret the input as a Zstd-compressed DBN fragment, i.e. records without metadata. Only valid with text output encodings"
211    )]
212    pub is_input_zstd_fragment: bool,
213    #[clap(
214        long = "input-dbn-version",
215        help = "Specify the DBN version of the fragment. By default the fragment is assumed to be of the current version",
216        value_name = "DBN_VERSION",
217        value_parser = clap::value_parser!(u8).range(1..=3),
218        requires = "input_fragment"
219    )]
220    pub input_dbn_version_override: Option<u8>,
221    #[clap(
222        long = "schema",
223        help = "Only encode records of this schema. This is particularly useful for transcoding mixed-schema DBN to CSV, which doesn't support mixing schemas",
224        value_name = "SCHEMA"
225    )]
226    pub schema_filter: Option<Schema>,
227    #[clap(
228        long = "omit-header",
229        action = ArgAction::SetFalse,
230        default_value = "true",
231        conflicts_with_all = ["json", "dbn", "fragment"],
232        help = "Skip encoding the header. Only valid when encoding CSV or TSV."
233    )]
234    pub write_header: bool,
235}
236
237impl Args {
238    /// Consolidates the several output flag booleans into a single enum.
239    pub fn output_encoding(&self) -> OutputEncoding {
240        if self.json {
241            OutputEncoding::Json
242        } else if self.csv {
243            OutputEncoding::Csv
244        } else if self.tsv {
245            OutputEncoding::Tsv
246        } else if self.dbn {
247            OutputEncoding::Dbn
248        } else if self.fragment {
249            OutputEncoding::DbnFragment
250        } else {
251            OutputEncoding::Infer
252        }
253    }
254
255    pub fn upgrade_policy(&self) -> VersionUpgradePolicy {
256        if self.should_upgrade {
257            VersionUpgradePolicy::UpgradeToV3
258        } else {
259            VersionUpgradePolicy::AsIs
260        }
261    }
262
263    pub fn input_version(&self) -> u8 {
264        self.input_dbn_version_override.unwrap_or(dbn::DBN_VERSION)
265    }
266}
267
268pub fn infer_encoding(args: &Args) -> anyhow::Result<InferredEncoding> {
269    let compression = if args.zstd {
270        Compression::Zstd
271    } else {
272        Compression::None
273    };
274    match args.output_encoding() {
275        OutputEncoding::DbnFragment => Ok(InferredEncoding {
276            encoding: Encoding::Dbn,
277            compression,
278            delimiter: 0,
279            is_fragment: true,
280        }),
281        OutputEncoding::Dbn => Ok(InferredEncoding {
282            encoding: Encoding::Dbn,
283            compression,
284            delimiter: 0,
285            is_fragment: false,
286        }),
287        OutputEncoding::Csv => Ok(InferredEncoding {
288            encoding: Encoding::Csv,
289            compression,
290            delimiter: b',',
291            is_fragment: false,
292        }),
293        OutputEncoding::Tsv => Ok(InferredEncoding {
294            encoding: Encoding::Csv,
295            compression,
296            delimiter: b'\t',
297            is_fragment: false,
298        }),
299        OutputEncoding::Json => Ok(InferredEncoding {
300            encoding: Encoding::Json,
301            compression,
302            delimiter: 0,
303            is_fragment: false,
304        }),
305        OutputEncoding::Infer => {
306            let output = args
307                .output
308                .as_ref()
309                .map(|p| p.to_string_lossy().into_owned())
310                .or_else(|| args.output_pattern.clone());
311            if let Some(output) = output {
312                if output.ends_with(".dbn.frag.zst") {
313                    Ok(InferredEncoding {
314                        encoding: Encoding::Dbn,
315                        compression: Compression::Zstd,
316                        delimiter: 0,
317                        is_fragment: true,
318                    })
319                } else if output.ends_with(".dbn.frag") {
320                    Ok(InferredEncoding {
321                        encoding: Encoding::Dbn,
322                        compression: Compression::None,
323                        delimiter: 0,
324                        is_fragment: true,
325                    })
326                } else if output.ends_with(".dbn.zst") {
327                    Ok(InferredEncoding {
328                        encoding: Encoding::Dbn,
329                        compression: Compression::Zstd,
330                        delimiter: 0,
331                        is_fragment: false,
332                    })
333                } else if output.ends_with(".dbn") {
334                    Ok(InferredEncoding {
335                        encoding: Encoding::Dbn,
336                        compression: Compression::None,
337                        delimiter: 0,
338                        is_fragment: false,
339                    })
340                } else if output.ends_with(".csv.zst") {
341                    Ok(InferredEncoding {
342                        encoding: Encoding::Csv,
343                        compression: Compression::Zstd,
344                        delimiter: b',',
345                        is_fragment: false,
346                    })
347                } else if output.ends_with(".csv") {
348                    Ok(InferredEncoding {
349                        encoding: Encoding::Csv,
350                        compression: Compression::None,
351                        delimiter: b',',
352                        is_fragment: false,
353                    })
354                } else if output.ends_with(".tsv.zst") || output.ends_with(".xls.zst") {
355                    Ok(InferredEncoding {
356                        encoding: Encoding::Csv,
357                        compression: Compression::Zstd,
358                        delimiter: b'\t',
359                        is_fragment: false,
360                    })
361                } else if output.ends_with(".tsv") || output.ends_with(".xls") {
362                    Ok(InferredEncoding {
363                        encoding: Encoding::Csv,
364                        compression: Compression::None,
365                        delimiter: b'\t',
366                        is_fragment: false,
367                    })
368                } else if output.ends_with(".json.zst") {
369                    Ok(InferredEncoding {
370                        encoding: Encoding::Json,
371                        compression: Compression::Zstd,
372                        delimiter: 0,
373                        is_fragment: false,
374                    })
375                } else if output.ends_with(".json") {
376                    Ok(InferredEncoding {
377                        encoding: Encoding::Json,
378                        compression: Compression::None,
379                        delimiter: 0,
380                        is_fragment: false,
381                    })
382                } else {
383                    Err(anyhow!(
384                        "Unable to infer output encoding from output path '{output}'",
385                    ))
386                }
387            } else {
388                Err(anyhow!(
389                    "Unable to infer output encoding when no output was specified"
390                ))
391            }
392        }
393    }
394}
395
396/// Returns a writeable object where the `dbn` output will be directed.
397pub fn output_from_args(args: &Args) -> anyhow::Result<Box<dyn io::Write>> {
398    output(args.output.as_deref(), args.force)
399}
400
401pub fn output(output: Option<&Path>, force: bool) -> anyhow::Result<Box<dyn io::Write>> {
402    if let Some(output) = output {
403        let output_file = open_output_file(output, force)?;
404        Ok(Box::new(BufWriter::new(output_file)))
405    } else {
406        Ok(Box::new(io::stdout().lock()))
407    }
408}
409
410fn open_output_file(path: &Path, force: bool) -> anyhow::Result<File> {
411    let mut options = File::options();
412    options.write(true).truncate(true);
413    if force {
414        options.create(true);
415    } else if path.exists() {
416        return Err(anyhow!(
417            "Output file exists. Pass --force flag to overwrite the existing file."
418        ));
419    } else {
420        options.create_new(true);
421    }
422    options
423        .open(path)
424        .with_context(|| format!("Unable to open output file '{}'", path.display()))
425}
426
#[cfg(test)]
mod tests {
    // rstest case functions below take many parameters by design.
    #![allow(clippy::too_many_arguments)]

    use clap::CommandFactory;
    use rstest::*;

    use super::*;

    /// clap's recommended sanity check for derived CLIs: it validates ids referenced by
    /// `requires`, `conflicts_with*` and `group`, and catches duplicate flags.
    #[test]
    fn test_cli_definition_is_valid() {
        Args::command().debug_assert();
    }

    #[rstest]
    #[case(true, false, false, false, false, Encoding::Json, Compression::None, 0)]
    #[case(
        false,
        true,
        false,
        false,
        false,
        Encoding::Csv,
        Compression::None,
        b','
    )]
    #[case(
        false,
        false,
        true,
        false,
        false,
        Encoding::Csv,
        Compression::None,
        b'\t'
    )]
    #[case(false, false, false, true, false, Encoding::Dbn, Compression::None, 0)]
    #[case(true, false, false, false, true, Encoding::Json, Compression::Zstd, 0)]
    #[case(
        false,
        true,
        false,
        false,
        true,
        Encoding::Csv,
        Compression::Zstd,
        b','
    )]
    #[case(
        false,
        false,
        true,
        false,
        true,
        Encoding::Csv,
        Compression::Zstd,
        b'\t'
    )]
    #[case(false, false, false, true, true, Encoding::Dbn, Compression::Zstd, 0)]
    fn test_infer_encoding_and_compression_explicit(
        #[case] json: bool,
        #[case] csv: bool,
        #[case] tsv: bool,
        #[case] dbn: bool,
        #[case] zstd: bool,
        #[case] exp_enc: Encoding,
        #[case] exp_comp: Compression,
        #[case] exp_sep: u8,
    ) {
        let args = Args {
            json,
            csv,
            tsv,
            dbn,
            zstd,
            ..Default::default()
        };
        assert_eq!(
            infer_encoding(&args).unwrap(),
            InferredEncoding {
                encoding: exp_enc,
                compression: exp_comp,
                delimiter: exp_sep,
                is_fragment: false,
            }
        );
    }

    #[rstest]
    #[case(false, Compression::None)]
    #[case(true, Compression::Zstd)]
    fn test_infer_encoding_explicit_fragment(#[case] zstd: bool, #[case] exp_comp: Compression) {
        let args = Args {
            fragment: true,
            zstd,
            ..Default::default()
        };
        assert_eq!(
            infer_encoding(&args).unwrap(),
            InferredEncoding {
                encoding: Encoding::Dbn,
                compression: exp_comp,
                delimiter: 0,
                is_fragment: true,
            }
        );
    }

    #[rstest]
    #[case("out.json", Encoding::Json, Compression::None, 0, false)]
    #[case("out.csv", Encoding::Csv, Compression::None, b',', false)]
    #[case("out.tsv", Encoding::Csv, Compression::None, b'\t', false)]
    #[case("out.xls", Encoding::Csv, Compression::None, b'\t', false)]
    #[case("out.dbn", Encoding::Dbn, Compression::None, 0, false)]
    #[case("out.dbn.frag", Encoding::Dbn, Compression::None, 0, true)]
    #[case("out.json.zst", Encoding::Json, Compression::Zstd, 0, false)]
    #[case("out.csv.zst", Encoding::Csv, Compression::Zstd, b',', false)]
    #[case("out.tsv.zst", Encoding::Csv, Compression::Zstd, b'\t', false)]
    #[case("out.xls.zst", Encoding::Csv, Compression::Zstd, b'\t', false)]
    #[case("out.dbn.zst", Encoding::Dbn, Compression::Zstd, 0, false)]
    #[case("out.dbn.frag.zst", Encoding::Dbn, Compression::Zstd, 0, true)]
    fn test_infer_encoding_and_compression_inference(
        #[case] output: &str,
        #[case] exp_enc: Encoding,
        #[case] exp_comp: Compression,
        #[case] exp_sep: u8,
        #[case] exp_fragment: bool,
    ) {
        let args = Args {
            output: Some(PathBuf::from(output)),
            ..Default::default()
        };
        assert_eq!(
            infer_encoding(&args).unwrap(),
            InferredEncoding {
                encoding: exp_enc,
                compression: exp_comp,
                delimiter: exp_sep,
                is_fragment: exp_fragment,
            }
        );
    }

    #[test]
    fn test_infer_encoding_from_output_pattern() {
        let args = Args {
            output_pattern: Some("{symbol}.csv.zst".to_owned()),
            split_by: Some(SplitBy::Symbol),
            ..Default::default()
        };
        assert_eq!(
            infer_encoding(&args).unwrap(),
            InferredEncoding {
                encoding: Encoding::Csv,
                compression: Compression::Zstd,
                delimiter: b',',
                is_fragment: false,
            }
        );
    }

    #[test]
    fn test_infer_encoding_and_compression_bad() {
        let args = Args {
            output: Some(PathBuf::from("out.pb")),
            ..Default::default()
        };
        assert!(
            matches!(infer_encoding(&args), Err(e) if e.to_string().starts_with("Unable to infer"))
        );
    }

    #[test]
    fn test_infer_encoding_without_output() {
        let err = infer_encoding(&Args::default()).unwrap_err();
        assert!(err
            .to_string()
            .starts_with("Unable to infer output encoding when no output was specified"));
    }

    #[test]
    fn test_split_by_duration() {
        assert!(SplitBy::Symbol.duration().is_none());
        assert!(SplitBy::Schema.duration().is_none());
        assert!(matches!(SplitBy::Day.duration(), Some(SplitDuration::Day)));
        assert!(matches!(SplitBy::Week.duration(), Some(SplitDuration::Week)));
        assert!(matches!(SplitBy::Month.duration(), Some(SplitDuration::Month)));
    }

    #[test]
    fn test_open_output_file_respects_force() {
        let path = std::env::temp_dir().join(format!(
            "dbn-cli-open-output-file-{}.csv",
            std::process::id()
        ));
        std::fs::write(&path, b"existing").unwrap();

        let err = open_output_file(&path, false).unwrap_err();
        assert!(err.to_string().starts_with("Output file exists"));
        // The refused open must not have touched the existing contents.
        assert_eq!(std::fs::read(&path).unwrap(), b"existing");

        // With `force`, the existing file is truncated.
        drop(open_output_file(&path, true).unwrap());
        assert_eq!(std::fs::metadata(&path).unwrap().len(), 0);

        std::fs::remove_file(&path).unwrap();
    }
}