Skip to main content

dbn_cli/
lib.rs

1use std::{
2    fs::File,
3    io::{self, BufWriter},
4    num::NonZeroU64,
5    path::{Path, PathBuf},
6};
7
8use anyhow::{anyhow, Context};
9use clap::{ArgAction, Parser, ValueEnum};
10
11use dbn::{
12    encode::SplitDuration,
13    enums::{Compression, Encoding},
14    Schema, VersionUpgradePolicy,
15};
16
17pub mod encode;
18pub mod filter;
19
20/// How the output of the `dbn` command will be encoded.
21#[derive(Clone, Copy, Debug, ValueEnum)]
22pub enum OutputEncoding {
23    /// `dbn` will infer based on the extension of the specified output file
24    Infer,
25    Dbn,
26    Csv,
27    Tsv,
28    Json,
29    DbnFragment,
30}
31
32/// How to split a DBN file
33#[derive(Clone, Copy, Debug, ValueEnum)]
34pub enum SplitBy {
35    Symbol,
36    Schema,
37    Day,
38    Week,
39    Month,
40    Year,
41}
42
43impl SplitBy {
44    pub fn duration(self) -> Option<SplitDuration> {
45        match self {
46            SplitBy::Day => Some(SplitDuration::Day),
47            SplitBy::Week => Some(SplitDuration::Week),
48            SplitBy::Month => Some(SplitDuration::Month),
49            SplitBy::Year => Some(SplitDuration::Year),
50            SplitBy::Symbol | SplitBy::Schema => None,
51        }
52    }
53}
54
55#[derive(Clone, Copy, Debug, PartialEq, Eq)]
56pub struct InferredEncoding {
57    pub encoding: Encoding,
58    pub compression: Compression,
59    pub delimiter: u8,
60    pub is_fragment: bool,
61}
62
63#[derive(Debug, Parser)]
64#[clap(name = "dbn", version, about)]
65#[cfg_attr(test, derive(Default))]
66pub struct Args {
67    #[clap(
68        help = "One or more DBN or legacy DBZ files to decode. Passing multiple files will result in a merge. Pass '-' to read from standard input",
69        value_name = "FILE...",
70        value_delimiter = ' ',
71        num_args = 1..,
72        required = true,
73    )]
74    pub input: Vec<PathBuf>,
75    #[clap(
76        short,
77        long,
78        help = "Saves the result to FILE. If no path is specified, the output will be written to standard output",
79        value_name = "FILE"
80    )]
81    pub output: Option<PathBuf>,
82    #[clap(
83        short = 'O',
84        long,
85        help = "Saves the result of file splitting to paths according to PATTERN",
86        requires = "split_by",
87        conflicts_with = "output",
88        value_name = "PATTERN"
89    )]
90    pub output_pattern: Option<String>,
91    #[clap(
92        short = 'S',
93        long,
94        help = "How to optionally split the output across files",
95        requires = "output_pattern",
96        value_name = "SPLIT_BY"
97    )]
98    pub split_by: Option<SplitBy>,
99    #[clap(
100        short = 'J',
101        long,
102        action = ArgAction::SetTrue,
103        default_value = "false",
104        group = "output_encoding",
105        help = "Output the result as JSON lines"
106    )]
107    pub json: bool,
108    #[clap(
109        short = 'C',
110        long,
111        action = ArgAction::SetTrue,
112        default_value = "false",
113        group = "output_encoding",
114        help = "Output the result as CSV"
115    )]
116    pub csv: bool,
117    #[clap(
118        short = 'T',
119        long,
120        action = ArgAction::SetTrue,
121        default_value = "false",
122        group = "output_encoding",
123        help = "Output the result as tab-separated values (TSV)"
124    )]
125    pub tsv: bool,
126    #[clap(
127        short = 'D',
128        long,
129        action = ArgAction::SetTrue,
130        default_value = "false",
131        group = "output_encoding",
132        help = "Output the result as DBN"
133    )]
134    pub dbn: bool,
135    #[clap(
136        short = 'F',
137        long,
138        action = ArgAction::SetTrue,
139        default_value = "false",
140        group = "output_encoding",
141        help = "Output the result as a DBN fragment (no metadata)"
142    )]
143    pub fragment: bool,
144    #[clap(short, long, action = ArgAction::SetTrue, default_value = "false", help = "Zstd compress the output")]
145    pub zstd: bool,
146    #[clap(
147        short = 'u',
148        long = "upgrade",
149        default_value = "false",
150        action = ArgAction::SetTrue,
151        help = "Upgrade data when decoding previous DBN versions. By default data is decoded as-is."
152    )]
153    pub should_upgrade: bool,
154    #[clap(
155        short,
156        long,
157        action = ArgAction::SetTrue,
158        default_value = "false",
159        help = "Allow overwriting of existing files, such as the output file"
160    )]
161    pub force: bool,
162    #[clap(
163        short = 'm',
164        long = "metadata",
165        action = ArgAction::SetTrue,
166        default_value = "false",
167        conflicts_with_all = ["csv", "dbn", "fragment"],
168        help = "Output the metadata section instead of the body of the DBN file. Only valid for JSON output encoding"
169    )]
170    pub should_output_metadata: bool,
171    #[clap(
172         short = 'p',
173         long = "pretty",
174         action = ArgAction::SetTrue,
175         default_value = "false",
176         conflicts_with_all = ["dbn", "fragment"],
177         help ="Make the CSV or JSON output easier to read by converting timestamps to ISO 8601 and prices to decimals"
178    )]
179    pub should_pretty_print: bool,
180    #[clap(
181         short = 's',
182         long = "map-symbols",
183         action = ArgAction::SetTrue,
184         default_value = "false",
185         conflicts_with_all = ["input_fragment", "dbn", "fragment"],
186         help ="Use symbology mappings from the metadata to create a 'symbol' field mapping the instrument ID to its requested symbol."
187    )]
188    pub map_symbols: bool,
189    #[clap(
190        short = 'l',
191        long = "limit",
192        value_name = "NUM_RECORDS",
193        help = "Limit the number of records in the output to the specified number"
194    )]
195    pub limit: Option<NonZeroU64>,
196    // Fragment arguments
197    #[clap(
198        long = "input-fragment",
199        action = ArgAction::SetTrue,
200        default_value = "false",
201        group = "input_fragment",
202        conflicts_with_all = ["is_input_zstd_fragment", "should_output_metadata", "dbn"],
203        help = "Interpret the input as an uncompressed DBN fragment, i.e. records without metadata. Only valid with text output encodings"
204    )]
205    pub is_input_fragment: bool,
206    #[clap(
207        long = "input-zstd-fragment",
208        action = ArgAction::SetTrue,
209        default_value = "false",
210        group = "input_fragment",
211        conflicts_with_all = ["should_output_metadata", "dbn"],
212        help = "Interpret the input as a Zstd-compressed DBN fragment, i.e. records without metadata. Only valid with text output encodings"
213    )]
214    pub is_input_zstd_fragment: bool,
215    #[clap(
216        long = "input-dbn-version",
217        help = "Specify the DBN version of the fragment. By default the fragment is assumed to be of the current version",
218        value_name = "DBN_VERSION",
219        value_parser = clap::value_parser!(u8).range(1..=3),
220        requires = "input_fragment"
221    )]
222    pub input_dbn_version_override: Option<u8>,
223    #[clap(
224        long = "schema",
225        help = "Only encode records of this schema. This is particularly useful for transcoding mixed-schema DBN to CSV, which doesn't support mixing schemas",
226        value_name = "SCHEMA"
227    )]
228    pub schema_filter: Option<Schema>,
229    #[clap(
230        long = "omit-header",
231        action = ArgAction::SetFalse,
232        default_value = "true",
233        conflicts_with_all = ["json", "dbn", "fragment"],
234        help = "Skip encoding the header. Only valid when encoding CSV or TSV."
235    )]
236    pub write_header: bool,
237}
238
239impl Args {
240    /// Consolidates the several output flag booleans into a single enum.
241    pub fn output_encoding(&self) -> OutputEncoding {
242        if self.json {
243            OutputEncoding::Json
244        } else if self.csv {
245            OutputEncoding::Csv
246        } else if self.tsv {
247            OutputEncoding::Tsv
248        } else if self.dbn {
249            OutputEncoding::Dbn
250        } else if self.fragment {
251            OutputEncoding::DbnFragment
252        } else {
253            OutputEncoding::Infer
254        }
255    }
256
257    pub fn upgrade_policy(&self) -> VersionUpgradePolicy {
258        if self.should_upgrade {
259            VersionUpgradePolicy::UpgradeToV3
260        } else {
261            VersionUpgradePolicy::AsIs
262        }
263    }
264
265    pub fn input_version(&self) -> u8 {
266        self.input_dbn_version_override.unwrap_or(dbn::DBN_VERSION)
267    }
268}
269
270pub fn infer_encoding(args: &Args) -> anyhow::Result<InferredEncoding> {
271    let compression = if args.zstd {
272        Compression::Zstd
273    } else {
274        Compression::None
275    };
276    match args.output_encoding() {
277        OutputEncoding::DbnFragment => Ok(InferredEncoding {
278            encoding: Encoding::Dbn,
279            compression,
280            delimiter: 0,
281            is_fragment: true,
282        }),
283        OutputEncoding::Dbn => Ok(InferredEncoding {
284            encoding: Encoding::Dbn,
285            compression,
286            delimiter: 0,
287            is_fragment: false,
288        }),
289        OutputEncoding::Csv => Ok(InferredEncoding {
290            encoding: Encoding::Csv,
291            compression,
292            delimiter: b',',
293            is_fragment: false,
294        }),
295        OutputEncoding::Tsv => Ok(InferredEncoding {
296            encoding: Encoding::Csv,
297            compression,
298            delimiter: b'\t',
299            is_fragment: false,
300        }),
301        OutputEncoding::Json => Ok(InferredEncoding {
302            encoding: Encoding::Json,
303            compression,
304            delimiter: 0,
305            is_fragment: false,
306        }),
307        OutputEncoding::Infer => {
308            let output = args
309                .output
310                .as_ref()
311                .map(|p| p.to_string_lossy().into_owned())
312                .or_else(|| args.output_pattern.clone());
313            if let Some(output) = output {
314                if output.ends_with(".dbn.frag.zst") {
315                    Ok(InferredEncoding {
316                        encoding: Encoding::Dbn,
317                        compression: Compression::Zstd,
318                        delimiter: 0,
319                        is_fragment: true,
320                    })
321                } else if output.ends_with(".dbn.frag") {
322                    Ok(InferredEncoding {
323                        encoding: Encoding::Dbn,
324                        compression: Compression::None,
325                        delimiter: 0,
326                        is_fragment: true,
327                    })
328                } else if output.ends_with(".dbn.zst") {
329                    Ok(InferredEncoding {
330                        encoding: Encoding::Dbn,
331                        compression: Compression::Zstd,
332                        delimiter: 0,
333                        is_fragment: false,
334                    })
335                } else if output.ends_with(".dbn") {
336                    Ok(InferredEncoding {
337                        encoding: Encoding::Dbn,
338                        compression: Compression::None,
339                        delimiter: 0,
340                        is_fragment: false,
341                    })
342                } else if output.ends_with(".csv.zst") {
343                    Ok(InferredEncoding {
344                        encoding: Encoding::Csv,
345                        compression: Compression::Zstd,
346                        delimiter: b',',
347                        is_fragment: false,
348                    })
349                } else if output.ends_with(".csv") {
350                    Ok(InferredEncoding {
351                        encoding: Encoding::Csv,
352                        compression: Compression::None,
353                        delimiter: b',',
354                        is_fragment: false,
355                    })
356                } else if output.ends_with(".tsv.zst") || output.ends_with(".xls.zst") {
357                    Ok(InferredEncoding {
358                        encoding: Encoding::Csv,
359                        compression: Compression::Zstd,
360                        delimiter: b'\t',
361                        is_fragment: false,
362                    })
363                } else if output.ends_with(".tsv") || output.ends_with(".xls") {
364                    Ok(InferredEncoding {
365                        encoding: Encoding::Csv,
366                        compression: Compression::None,
367                        delimiter: b'\t',
368                        is_fragment: false,
369                    })
370                } else if output.ends_with(".json.zst") {
371                    Ok(InferredEncoding {
372                        encoding: Encoding::Json,
373                        compression: Compression::Zstd,
374                        delimiter: 0,
375                        is_fragment: false,
376                    })
377                } else if output.ends_with(".json") {
378                    Ok(InferredEncoding {
379                        encoding: Encoding::Json,
380                        compression: Compression::None,
381                        delimiter: 0,
382                        is_fragment: false,
383                    })
384                } else {
385                    Err(anyhow!(
386                        "Unable to infer output encoding from output path '{output}'",
387                    ))
388                }
389            } else {
390                Err(anyhow!(
391                    "Unable to infer output encoding when no output was specified"
392                ))
393            }
394        }
395    }
396}
397
398/// Returns a writeable object where the `dbn` output will be directed.
399pub fn output_from_args(args: &Args) -> anyhow::Result<Box<dyn io::Write>> {
400    output(args.output.as_deref(), args.force)
401}
402
403pub fn output(output: Option<&Path>, force: bool) -> anyhow::Result<Box<dyn io::Write>> {
404    if let Some(output) = output {
405        let output_file = open_output_file(output, force)?;
406        Ok(Box::new(BufWriter::new(output_file)))
407    } else {
408        Ok(Box::new(io::stdout().lock()))
409    }
410}
411
412fn open_output_file(path: &Path, force: bool) -> anyhow::Result<File> {
413    let mut options = File::options();
414    options.write(true).truncate(true);
415    if force {
416        options.create(true);
417    } else if path.exists() {
418        return Err(anyhow!(
419            "Output file exists. Pass --force flag to overwrite the existing file."
420        ));
421    } else {
422        options.create_new(true);
423    }
424    options
425        .open(path)
426        .with_context(|| format!("Unable to open output file '{}'", path.display()))
427}
428
#[cfg(test)]
mod tests {
    // The parameterized cases below take one argument per flag and expected field.
    #![allow(clippy::too_many_arguments)]

    use rstest::*;

    use super::*;

    // An explicit encoding flag decides the encoding; `zstd` decides the compression.
    #[rstest]
    #[case(true, false, false, false, false, Encoding::Json, Compression::None, 0)]
    #[case(
        false,
        true,
        false,
        false,
        false,
        Encoding::Csv,
        Compression::None,
        b','
    )]
    #[case(
        false,
        false,
        true,
        false,
        false,
        Encoding::Csv,
        Compression::None,
        b'\t'
    )]
    #[case(false, false, false, true, false, Encoding::Dbn, Compression::None, 0)]
    #[case(true, false, false, false, true, Encoding::Json, Compression::Zstd, 0)]
    #[case(
        false,
        true,
        false,
        false,
        true,
        Encoding::Csv,
        Compression::Zstd,
        b','
    )]
    #[case(
        false,
        false,
        true,
        false,
        true,
        Encoding::Csv,
        Compression::Zstd,
        b'\t'
    )]
    #[case(false, false, false, true, true, Encoding::Dbn, Compression::Zstd, 0)]
    fn test_infer_encoding_and_compression_explicit(
        #[case] json: bool,
        #[case] csv: bool,
        #[case] tsv: bool,
        #[case] dbn: bool,
        #[case] zstd: bool,
        #[case] exp_enc: Encoding,
        #[case] exp_comp: Compression,
        #[case] exp_sep: u8,
    ) {
        let args = Args {
            json,
            csv,
            tsv,
            dbn,
            zstd,
            ..Default::default()
        };
        assert_eq!(
            infer_encoding(&args).unwrap(),
            InferredEncoding {
                encoding: exp_enc,
                compression: exp_comp,
                delimiter: exp_sep,
                is_fragment: false,
            }
        );
    }

    // The explicit fragment flag yields DBN with `is_fragment` set.
    #[rstest]
    #[case(false, Compression::None)]
    #[case(true, Compression::Zstd)]
    fn test_infer_encoding_and_compression_explicit_fragment(
        #[case] zstd: bool,
        #[case] exp_comp: Compression,
    ) {
        let args = Args {
            fragment: true,
            zstd,
            ..Default::default()
        };
        assert_eq!(
            infer_encoding(&args).unwrap(),
            InferredEncoding {
                encoding: Encoding::Dbn,
                compression: exp_comp,
                delimiter: 0,
                is_fragment: true,
            }
        );
    }

    // Without an encoding flag, everything is inferred from the output path.
    #[rstest]
    #[case("out.json", Encoding::Json, Compression::None, 0)]
    #[case("out.csv", Encoding::Csv, Compression::None, b',')]
    #[case("out.tsv", Encoding::Csv, Compression::None, b'\t')]
    #[case("out.xls", Encoding::Csv, Compression::None, b'\t')]
    #[case("out.dbn", Encoding::Dbn, Compression::None, 0)]
    #[case("out.json.zst", Encoding::Json, Compression::Zstd, 0)]
    #[case("out.csv.zst", Encoding::Csv, Compression::Zstd, b',')]
    #[case("out.tsv.zst", Encoding::Csv, Compression::Zstd, b'\t')]
    #[case("out.xls.zst", Encoding::Csv, Compression::Zstd, b'\t')]
    #[case("out.dbn.zst", Encoding::Dbn, Compression::Zstd, 0)]
    fn test_infer_encoding_and_compression_inference(
        #[case] output: &str,
        #[case] exp_enc: Encoding,
        #[case] exp_comp: Compression,
        #[case] exp_sep: u8,
    ) {
        let args = Args {
            output: Some(PathBuf::from(output)),
            ..Default::default()
        };
        assert_eq!(
            infer_encoding(&args).unwrap(),
            InferredEncoding {
                encoding: exp_enc,
                compression: exp_comp,
                delimiter: exp_sep,
                is_fragment: false,
            }
        );
    }

    // Fragment extensions are inferred as DBN with `is_fragment` set.
    #[rstest]
    #[case("out.dbn.frag", Compression::None)]
    #[case("out.dbn.frag.zst", Compression::Zstd)]
    fn test_infer_encoding_and_compression_fragment_inference(
        #[case] output: &str,
        #[case] exp_comp: Compression,
    ) {
        let args = Args {
            output: Some(PathBuf::from(output)),
            ..Default::default()
        };
        assert_eq!(
            infer_encoding(&args).unwrap(),
            InferredEncoding {
                encoding: Encoding::Dbn,
                compression: exp_comp,
                delimiter: 0,
                is_fragment: true,
            }
        );
    }

    // With no output path, the output pattern is used for inference.
    #[test]
    fn test_infer_encoding_and_compression_output_pattern() {
        let args = Args {
            output_pattern: Some("{symbol}.csv.zst".to_owned()),
            ..Default::default()
        };
        assert_eq!(
            infer_encoding(&args).unwrap(),
            InferredEncoding {
                encoding: Encoding::Csv,
                compression: Compression::Zstd,
                delimiter: b',',
                is_fragment: false,
            }
        );
    }

    #[test]
    fn test_infer_encoding_and_compression_bad() {
        let args = Args {
            output: Some(PathBuf::from("out.pb")),
            ..Default::default()
        };
        assert!(
            matches!(infer_encoding(&args), Err(e) if e.to_string().starts_with("Unable to infer"))
        );
    }

    // Neither an encoding flag nor any output to infer from.
    #[test]
    fn test_infer_encoding_and_compression_no_output() {
        let args = Args::default();
        assert!(
            matches!(infer_encoding(&args), Err(e) if e.to_string().starts_with("Unable to infer"))
        );
    }
}
553}