dbn_cli/
lib.rs

1use std::{
2    fs::File,
3    io::{self, BufWriter},
4    num::NonZeroU64,
5    path::PathBuf,
6};
7
8use anyhow::{anyhow, Context};
9use clap::{ArgAction, Parser, ValueEnum};
10
11use dbn::{
12    enums::{Compression, Encoding},
13    Schema, VersionUpgradePolicy,
14};
15
16pub mod encode;
17pub mod filter;
18
19/// How the output of the `dbn` command will be encoded.
20#[derive(Clone, Copy, Debug, ValueEnum)]
21pub enum OutputEncoding {
22    /// `dbn` will infer based on the extension of the specified output file
23    Infer,
24    Dbn,
25    Csv,
26    Tsv,
27    Json,
28    DbnFragment,
29}
30
31#[derive(Debug, Parser)]
32#[clap(version, about)]
33#[cfg_attr(test, derive(Default))]
34pub struct Args {
35    #[clap(
36        help = "One or more DBN or legacy DBZ files to decode. Passing multiple files will result in a merge. Pass '-' to read from standard input",
37        value_name = "FILE...",
38        value_delimiter = ' ',
39        num_args = 1..,
40        required = true,
41    )]
42    pub input: Vec<PathBuf>,
43    #[clap(
44        short,
45        long,
46        help = "Saves the result to FILE. If no path is specified, the output will be written to standard output",
47        value_name = "FILE"
48    )]
49    pub output: Option<PathBuf>,
50    #[clap(
51        short = 'J',
52        long,
53        action = ArgAction::SetTrue,
54        default_value = "false",
55        group = "output_encoding",
56        help = "Output the result as JSON lines"
57    )]
58    pub json: bool,
59    #[clap(
60        short = 'C',
61        long,
62        action = ArgAction::SetTrue,
63        default_value = "false",
64        group = "output_encoding",
65        help = "Output the result as CSV"
66    )]
67    pub csv: bool,
68    #[clap(
69        short = 'T',
70        long,
71        action = ArgAction::SetTrue,
72        default_value = "false",
73        group = "output_encoding",
74        help = "Output the result as tab-separated values (TSV)"
75    )]
76    pub tsv: bool,
77    #[clap(
78        short = 'D',
79        long,
80        action = ArgAction::SetTrue,
81        default_value = "false",
82        group = "output_encoding",
83        help = "Output the result as DBN"
84    )]
85    pub dbn: bool,
86    #[clap(
87        short = 'F',
88        long,
89        action = ArgAction::SetTrue,
90        default_value = "false",
91        group = "output_encoding",
92        help = "Output the result as a DBN fragment (no metadata)"
93    )]
94    pub fragment: bool,
95    #[clap(short, long, action = ArgAction::SetTrue, default_value = "false", help = "Zstd compress the output")]
96    pub zstd: bool,
97    #[clap(
98        short = 'u',
99        long = "upgrade",
100        default_value = "false",
101        action = ArgAction::SetTrue,
102        help = "Upgrade data when decoding previous DBN versions. By default data is decoded as-is."
103    )]
104    pub should_upgrade: bool,
105    #[clap(
106        short,
107        long,
108        action = ArgAction::SetTrue,
109        default_value = "false",
110        help = "Allow overwriting of existing files, such as the output file"
111    )]
112    pub force: bool,
113    #[clap(
114        short = 'm',
115        long = "metadata",
116        action = ArgAction::SetTrue,
117        default_value = "false",
118        conflicts_with_all = ["csv", "dbn", "fragment"],
119        help = "Output the metadata section instead of the body of the DBN file. Only valid for JSON output encoding"
120    )]
121    pub should_output_metadata: bool,
122    #[clap(
123         short = 'p',
124         long = "pretty",
125         action = ArgAction::SetTrue,
126         default_value = "false",
127         conflicts_with_all = ["dbn", "fragment"],
128         help ="Make the CSV or JSON output easier to read by converting timestamps to ISO 8601 and prices to decimals"
129    )]
130    pub should_pretty_print: bool,
131    #[clap(
132         short = 's',
133         long = "map-symbols",
134         action = ArgAction::SetTrue,
135         default_value = "false",
136         conflicts_with_all = ["input_fragment", "dbn", "fragment"],
137         help ="Use symbology mappings from the metadata to create a 'symbol' field mapping the instrument ID to its requested symbol."
138    )]
139    pub map_symbols: bool,
140    #[clap(
141        short = 'l',
142        long = "limit",
143        value_name = "NUM_RECORDS",
144        help = "Limit the number of records in the output to the specified number"
145    )]
146    pub limit: Option<NonZeroU64>,
147    // Fragment arguments
148    #[clap(
149        long = "input-fragment",
150        action = ArgAction::SetTrue,
151        default_value = "false",
152        group = "input_fragment",
153        conflicts_with_all = ["is_input_zstd_fragment", "should_output_metadata", "dbn"],
154        help = "Interpret the input as an uncompressed DBN fragment, i.e. records without metadata. Only valid with text output encodings"
155    )]
156    pub is_input_fragment: bool,
157    #[clap(
158        long = "input-zstd-fragment",
159        action = ArgAction::SetTrue,
160        default_value = "false",
161        group = "input_fragment",
162        conflicts_with_all = ["should_output_metadata", "dbn"],
163        help = "Interpret the input as a Zstd-compressed DBN fragment, i.e. records without metadata. Only valid with text output encodings"
164    )]
165    pub is_input_zstd_fragment: bool,
166    #[clap(
167        long = "input-dbn-version",
168        help = "Specify the DBN version of the fragment. By default the fragment is assumed to be of the current version",
169        value_name = "DBN_VERSION",
170        value_parser = clap::value_parser!(u8).range(1..=3),
171        requires = "input_fragment"
172    )]
173    pub input_dbn_version_override: Option<u8>,
174    #[clap(
175        long = "schema",
176        help = "Only encode records of this schema. This is particularly useful for transcoding mixed-schema DBN to CSV, which doesn't support mixing schemas",
177        value_name = "SCHEMA"
178    )]
179    pub schema_filter: Option<Schema>,
180    #[clap(
181        long = "omit-header",
182        action = ArgAction::SetFalse,
183        default_value = "true",
184        conflicts_with_all = ["json", "dbn", "fragment"],
185        help = "Skip encoding the header. Only valid when encoding CSV or TSV."
186    )]
187    pub write_header: bool,
188}
189
190impl Args {
191    /// Consolidates the several output flag booleans into a single enum.
192    pub fn output_encoding(&self) -> OutputEncoding {
193        if self.json {
194            OutputEncoding::Json
195        } else if self.csv {
196            OutputEncoding::Csv
197        } else if self.tsv {
198            OutputEncoding::Tsv
199        } else if self.dbn {
200            OutputEncoding::Dbn
201        } else if self.fragment {
202            OutputEncoding::DbnFragment
203        } else {
204            OutputEncoding::Infer
205        }
206    }
207
208    pub fn upgrade_policy(&self) -> VersionUpgradePolicy {
209        if self.should_upgrade {
210            VersionUpgradePolicy::UpgradeToV3
211        } else {
212            VersionUpgradePolicy::AsIs
213        }
214    }
215
216    pub fn input_version(&self) -> u8 {
217        self.input_dbn_version_override.unwrap_or(dbn::DBN_VERSION)
218    }
219}
220
221/// Infer the [`Encoding`], [`Compression`], and delimiter (CSV/TSV) from `args` if they
222/// aren't already explicitly set.
223pub fn infer_encoding(args: &Args) -> anyhow::Result<(Encoding, Compression, u8)> {
224    let compression = if args.zstd {
225        Compression::Zstd
226    } else {
227        Compression::None
228    };
229    match args.output_encoding() {
230        OutputEncoding::DbnFragment | OutputEncoding::Dbn => Ok((Encoding::Dbn, compression, 0)),
231        OutputEncoding::Csv => Ok((Encoding::Csv, compression, b',')),
232        OutputEncoding::Tsv => Ok((Encoding::Csv, compression, b'\t')),
233        OutputEncoding::Json => Ok((Encoding::Json, compression, 0)),
234        OutputEncoding::Infer => {
235            if let Some(output) = args.output.as_ref().map(|o| o.to_string_lossy()) {
236                if output.ends_with(".dbn.zst") {
237                    Ok((Encoding::Dbn, Compression::Zstd, 0))
238                } else if output.ends_with(".dbn") {
239                    Ok((Encoding::Dbn, Compression::None, 0))
240                } else if output.ends_with(".csv.zst") {
241                    Ok((Encoding::Csv, Compression::Zstd, b','))
242                } else if output.ends_with(".csv") {
243                    Ok((Encoding::Csv, Compression::None, b','))
244                } else if output.ends_with(".tsv.zst") || output.ends_with(".xls.zst") {
245                    Ok((Encoding::Csv, Compression::Zstd, b'\t'))
246                } else if output.ends_with(".tsv") || output.ends_with(".xls") {
247                    Ok((Encoding::Csv, Compression::None, b'\t'))
248                } else if output.ends_with(".json.zst") {
249                    Ok((Encoding::Json, Compression::Zstd, 0))
250                } else if output.ends_with(".json") {
251                    Ok((Encoding::Json, Compression::None, 0))
252                } else {
253                    Err(anyhow!(
254                        "Unable to infer output encoding from output path '{output}'",
255                    ))
256                }
257            } else {
258                Err(anyhow!(
259                    "Unable to infer output encoding when no output was specified"
260                ))
261            }
262        }
263    }
264}
265
266/// Returns a writeable object where the `dbn` output will be directed.
267pub fn output_from_args(args: &Args) -> anyhow::Result<Box<dyn io::Write>> {
268    if let Some(output) = &args.output {
269        let output_file = open_output_file(output, args.force)?;
270        Ok(Box::new(BufWriter::new(output_file)))
271    } else {
272        Ok(Box::new(io::stdout().lock()))
273    }
274}
275
276fn open_output_file(path: &PathBuf, force: bool) -> anyhow::Result<File> {
277    let mut options = File::options();
278    options.write(true).truncate(true);
279    if force {
280        options.create(true);
281    } else if path.exists() {
282        return Err(anyhow!(
283            "Output file exists. Pass --force flag to overwrite the existing file."
284        ));
285    } else {
286        options.create_new(true);
287    }
288    options
289        .open(path)
290        .with_context(|| format!("Unable to open output file '{}'", path.display()))
291}
292
#[cfg(test)]
mod tests {
    // The rstest case functions below take one parameter per output flag plus the expected
    // results, which exceeds clippy's default argument-count limit.
    #![allow(clippy::too_many_arguments)]

    use clap::CommandFactory;
    use rstest::*;

    use super::*;

    #[rstest]
    #[case(true, false, false, false, false, Encoding::Json, Compression::None, 0)]
    #[case(
        false,
        true,
        false,
        false,
        false,
        Encoding::Csv,
        Compression::None,
        b','
    )]
    #[case(
        false,
        false,
        true,
        false,
        false,
        Encoding::Csv,
        Compression::None,
        b'\t'
    )]
    #[case(false, false, false, true, false, Encoding::Dbn, Compression::None, 0)]
    #[case(true, false, false, false, true, Encoding::Json, Compression::Zstd, 0)]
    #[case(
        false,
        true,
        false,
        false,
        true,
        Encoding::Csv,
        Compression::Zstd,
        b','
    )]
    #[case(
        false,
        false,
        true,
        false,
        true,
        Encoding::Csv,
        Compression::Zstd,
        b'\t'
    )]
    #[case(false, false, false, true, true, Encoding::Dbn, Compression::Zstd, 0)]
    fn test_infer_encoding_and_compression_explicit(
        #[case] json: bool,
        #[case] csv: bool,
        #[case] tsv: bool,
        #[case] dbn: bool,
        #[case] zstd: bool,
        #[case] exp_enc: Encoding,
        #[case] exp_comp: Compression,
        #[case] exp_sep: u8,
    ) {
        let args = Args {
            json,
            csv,
            tsv,
            dbn,
            zstd,
            ..Default::default()
        };
        assert_eq!(infer_encoding(&args).unwrap(), (exp_enc, exp_comp, exp_sep));
    }

    // `--fragment` output is DBN without metadata, so it shares the DBN encoding.
    #[rstest]
    #[case(false, Compression::None)]
    #[case(true, Compression::Zstd)]
    fn test_infer_encoding_fragment(#[case] zstd: bool, #[case] exp_comp: Compression) {
        let args = Args {
            fragment: true,
            zstd,
            ..Default::default()
        };
        assert_eq!(infer_encoding(&args).unwrap(), (Encoding::Dbn, exp_comp, 0));
    }

    #[rstest]
    #[case("out.json", Encoding::Json, Compression::None, 0)]
    #[case("out.csv", Encoding::Csv, Compression::None, b',')]
    #[case("out.tsv", Encoding::Csv, Compression::None, b'\t')]
    #[case("out.xls", Encoding::Csv, Compression::None, b'\t')]
    #[case("out.dbn", Encoding::Dbn, Compression::None, 0)]
    #[case("out.json.zst", Encoding::Json, Compression::Zstd, 0)]
    #[case("out.csv.zst", Encoding::Csv, Compression::Zstd, b',')]
    #[case("out.tsv.zst", Encoding::Csv, Compression::Zstd, b'\t')]
    #[case("out.xls.zst", Encoding::Csv, Compression::Zstd, b'\t')]
    #[case("out.dbn.zst", Encoding::Dbn, Compression::Zstd, 0)]
    fn test_infer_encoding_and_compression_inference(
        #[case] output: &str,
        #[case] exp_enc: Encoding,
        #[case] exp_comp: Compression,
        #[case] exp_sep: u8,
    ) {
        let args = Args {
            output: Some(PathBuf::from(output)),
            ..Default::default()
        };
        assert_eq!(infer_encoding(&args).unwrap(), (exp_enc, exp_comp, exp_sep));
    }

    #[test]
    fn test_infer_encoding_and_compression_bad() {
        let args = Args {
            output: Some(PathBuf::from("out.pb")),
            ..Default::default()
        };
        assert!(
            matches!(infer_encoding(&args), Err(e) if e.to_string().starts_with("Unable to infer"))
        );
    }

    // Writing to stdout without an explicit encoding flag leaves nothing to infer from.
    #[test]
    fn test_infer_encoding_no_output() {
        let args = Args::default();
        assert!(
            matches!(infer_encoding(&args), Err(e) if e.to_string().starts_with("Unable to infer"))
        );
    }

    #[test]
    fn test_open_output_file_respects_force() {
        let path = std::env::temp_dir().join(format!(
            "dbn_cli_test_open_output_file_{}.dbn",
            std::process::id()
        ));
        // Best-effort cleanup of a leftover from an earlier, aborted run.
        let _ = std::fs::remove_file(&path);
        // A fresh path can be opened without `--force`.
        open_output_file(&path, false).unwrap();
        // Once the file exists, `--force` is required to overwrite it.
        let err = open_output_file(&path, false).unwrap_err();
        assert!(err.to_string().starts_with("Output file exists"));
        open_output_file(&path, true).unwrap();
        std::fs::remove_file(&path).unwrap();
    }

    // Validates the clap derive configuration: argument and group names referenced by
    // `conflicts_with_all`/`requires`, duplicate short flags, default values, and so on.
    #[test]
    fn test_cli_definition() {
        Args::command().debug_assert();
    }
}