1use std::{
2 fs::File,
3 io::{self, BufWriter},
4 num::NonZeroU64,
5 path::{Path, PathBuf},
6};
7
8use anyhow::{anyhow, Context};
9use clap::{ArgAction, Parser, ValueEnum};
10
11use dbn::{
12 encode::SplitDuration,
13 enums::{Compression, Encoding},
14 Schema, VersionUpgradePolicy,
15};
16
17pub mod encode;
18pub mod filter;
19
/// The output encoding requested on the command line, as resolved by
/// [`Args::output_encoding`] from the mutually exclusive encoding flags.
// Variant notes below use `//` rather than `///` because clap's `ValueEnum`
// derive turns variant doc comments into possible-value help text.
#[derive(Clone, Copy, Debug, ValueEnum)]
pub enum OutputEncoding {
    // No encoding flag was passed; `infer_encoding` falls back to the suffix of
    // the output path or output pattern.
    Infer,
    // `--dbn`: DBN including metadata.
    Dbn,
    // `--csv`: comma-separated values.
    Csv,
    // `--tsv`: tab-separated values.
    Tsv,
    // `--json`: JSON lines.
    Json,
    // `--fragment`: DBN records without metadata.
    DbnFragment,
}
31
/// How to split the output across files, selected with `--split-by`.
// Variant notes below use `//` rather than `///` because clap's `ValueEnum`
// derive would otherwise add them to the `--split-by` help output.
#[derive(Clone, Copy, Debug, ValueEnum)]
pub enum SplitBy {
    // Split by symbol; not time-based, so `duration` returns `None`.
    Symbol,
    // Split by schema; not time-based, so `duration` returns `None`.
    Schema,
    // Time-based splits; `duration` maps each to the matching `SplitDuration`.
    Day,
    Week,
    Month,
    Year,
}
42
43impl SplitBy {
44 pub fn duration(self) -> Option<SplitDuration> {
45 match self {
46 SplitBy::Day => Some(SplitDuration::Day),
47 SplitBy::Week => Some(SplitDuration::Week),
48 SplitBy::Month => Some(SplitDuration::Month),
49 SplitBy::Year => Some(SplitDuration::Year),
50 SplitBy::Symbol | SplitBy::Schema => None,
51 }
52 }
53}
54
/// The fully resolved output format produced by [`infer_encoding`].
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub struct InferredEncoding {
    /// The encoding to write (DBN, CSV, or JSON; TSV is CSV with a tab delimiter).
    pub encoding: Encoding,
    /// Whether and how the output is compressed.
    pub compression: Compression,
    /// Field delimiter for CSV-family output: `b','` for CSV and `b'\t'` for TSV.
    /// [`infer_encoding`] sets it to `0` for DBN and JSON output, where it is unused.
    pub delimiter: u8,
    /// `true` when writing a DBN fragment, i.e. records without metadata.
    pub is_fragment: bool,
}
62
63#[derive(Debug, Parser)]
64#[clap(name = "dbn", version, about)]
65#[cfg_attr(test, derive(Default))]
66pub struct Args {
67 #[clap(
68 help = "One or more DBN or legacy DBZ files to decode. Passing multiple files will result in a merge. Pass '-' to read from standard input",
69 value_name = "FILE...",
70 value_delimiter = ' ',
71 num_args = 1..,
72 required = true,
73 )]
74 pub input: Vec<PathBuf>,
75 #[clap(
76 short,
77 long,
78 help = "Saves the result to FILE. If no path is specified, the output will be written to standard output",
79 value_name = "FILE"
80 )]
81 pub output: Option<PathBuf>,
82 #[clap(
83 short = 'O',
84 long,
85 help = "Saves the result of file splitting to paths according to PATTERN",
86 requires = "split_by",
87 conflicts_with = "output",
88 value_name = "PATTERN"
89 )]
90 pub output_pattern: Option<String>,
91 #[clap(
92 short = 'S',
93 long,
94 help = "How to optionally split the output across files",
95 requires = "output_pattern",
96 value_name = "SPLIT_BY"
97 )]
98 pub split_by: Option<SplitBy>,
99 #[clap(
100 short = 'J',
101 long,
102 action = ArgAction::SetTrue,
103 default_value = "false",
104 group = "output_encoding",
105 help = "Output the result as JSON lines"
106 )]
107 pub json: bool,
108 #[clap(
109 short = 'C',
110 long,
111 action = ArgAction::SetTrue,
112 default_value = "false",
113 group = "output_encoding",
114 help = "Output the result as CSV"
115 )]
116 pub csv: bool,
117 #[clap(
118 short = 'T',
119 long,
120 action = ArgAction::SetTrue,
121 default_value = "false",
122 group = "output_encoding",
123 help = "Output the result as tab-separated values (TSV)"
124 )]
125 pub tsv: bool,
126 #[clap(
127 short = 'D',
128 long,
129 action = ArgAction::SetTrue,
130 default_value = "false",
131 group = "output_encoding",
132 help = "Output the result as DBN"
133 )]
134 pub dbn: bool,
135 #[clap(
136 short = 'F',
137 long,
138 action = ArgAction::SetTrue,
139 default_value = "false",
140 group = "output_encoding",
141 help = "Output the result as a DBN fragment (no metadata)"
142 )]
143 pub fragment: bool,
144 #[clap(short, long, action = ArgAction::SetTrue, default_value = "false", help = "Zstd compress the output")]
145 pub zstd: bool,
146 #[clap(
147 short = 'u',
148 long = "upgrade",
149 default_value = "false",
150 action = ArgAction::SetTrue,
151 help = "Upgrade data when decoding previous DBN versions. By default data is decoded as-is."
152 )]
153 pub should_upgrade: bool,
154 #[clap(
155 short,
156 long,
157 action = ArgAction::SetTrue,
158 default_value = "false",
159 help = "Allow overwriting of existing files, such as the output file"
160 )]
161 pub force: bool,
162 #[clap(
163 short = 'm',
164 long = "metadata",
165 action = ArgAction::SetTrue,
166 default_value = "false",
167 conflicts_with_all = ["csv", "dbn", "fragment"],
168 help = "Output the metadata section instead of the body of the DBN file. Only valid for JSON output encoding"
169 )]
170 pub should_output_metadata: bool,
171 #[clap(
172 short = 'p',
173 long = "pretty",
174 action = ArgAction::SetTrue,
175 default_value = "false",
176 conflicts_with_all = ["dbn", "fragment"],
177 help ="Make the CSV or JSON output easier to read by converting timestamps to ISO 8601 and prices to decimals"
178 )]
179 pub should_pretty_print: bool,
180 #[clap(
181 short = 's',
182 long = "map-symbols",
183 action = ArgAction::SetTrue,
184 default_value = "false",
185 conflicts_with_all = ["input_fragment", "dbn", "fragment"],
186 help ="Use symbology mappings from the metadata to create a 'symbol' field mapping the instrument ID to its requested symbol."
187 )]
188 pub map_symbols: bool,
189 #[clap(
190 short = 'l',
191 long = "limit",
192 value_name = "NUM_RECORDS",
193 help = "Limit the number of records in the output to the specified number"
194 )]
195 pub limit: Option<NonZeroU64>,
196 #[clap(
198 long = "input-fragment",
199 action = ArgAction::SetTrue,
200 default_value = "false",
201 group = "input_fragment",
202 conflicts_with_all = ["is_input_zstd_fragment", "should_output_metadata", "dbn"],
203 help = "Interpret the input as an uncompressed DBN fragment, i.e. records without metadata. Only valid with text output encodings"
204 )]
205 pub is_input_fragment: bool,
206 #[clap(
207 long = "input-zstd-fragment",
208 action = ArgAction::SetTrue,
209 default_value = "false",
210 group = "input_fragment",
211 conflicts_with_all = ["should_output_metadata", "dbn"],
212 help = "Interpret the input as a Zstd-compressed DBN fragment, i.e. records without metadata. Only valid with text output encodings"
213 )]
214 pub is_input_zstd_fragment: bool,
215 #[clap(
216 long = "input-dbn-version",
217 help = "Specify the DBN version of the fragment. By default the fragment is assumed to be of the current version",
218 value_name = "DBN_VERSION",
219 value_parser = clap::value_parser!(u8).range(1..=3),
220 requires = "input_fragment"
221 )]
222 pub input_dbn_version_override: Option<u8>,
223 #[clap(
224 long = "schema",
225 help = "Only encode records of this schema. This is particularly useful for transcoding mixed-schema DBN to CSV, which doesn't support mixing schemas",
226 value_name = "SCHEMA"
227 )]
228 pub schema_filter: Option<Schema>,
229 #[clap(
230 long = "omit-header",
231 action = ArgAction::SetFalse,
232 default_value = "true",
233 conflicts_with_all = ["json", "dbn", "fragment"],
234 help = "Skip encoding the header. Only valid when encoding CSV or TSV."
235 )]
236 pub write_header: bool,
237}
238
239impl Args {
240 pub fn output_encoding(&self) -> OutputEncoding {
242 if self.json {
243 OutputEncoding::Json
244 } else if self.csv {
245 OutputEncoding::Csv
246 } else if self.tsv {
247 OutputEncoding::Tsv
248 } else if self.dbn {
249 OutputEncoding::Dbn
250 } else if self.fragment {
251 OutputEncoding::DbnFragment
252 } else {
253 OutputEncoding::Infer
254 }
255 }
256
257 pub fn upgrade_policy(&self) -> VersionUpgradePolicy {
258 if self.should_upgrade {
259 VersionUpgradePolicy::UpgradeToV3
260 } else {
261 VersionUpgradePolicy::AsIs
262 }
263 }
264
265 pub fn input_version(&self) -> u8 {
266 self.input_dbn_version_override.unwrap_or(dbn::DBN_VERSION)
267 }
268}
269
270pub fn infer_encoding(args: &Args) -> anyhow::Result<InferredEncoding> {
271 let compression = if args.zstd {
272 Compression::Zstd
273 } else {
274 Compression::None
275 };
276 match args.output_encoding() {
277 OutputEncoding::DbnFragment => Ok(InferredEncoding {
278 encoding: Encoding::Dbn,
279 compression,
280 delimiter: 0,
281 is_fragment: true,
282 }),
283 OutputEncoding::Dbn => Ok(InferredEncoding {
284 encoding: Encoding::Dbn,
285 compression,
286 delimiter: 0,
287 is_fragment: false,
288 }),
289 OutputEncoding::Csv => Ok(InferredEncoding {
290 encoding: Encoding::Csv,
291 compression,
292 delimiter: b',',
293 is_fragment: false,
294 }),
295 OutputEncoding::Tsv => Ok(InferredEncoding {
296 encoding: Encoding::Csv,
297 compression,
298 delimiter: b'\t',
299 is_fragment: false,
300 }),
301 OutputEncoding::Json => Ok(InferredEncoding {
302 encoding: Encoding::Json,
303 compression,
304 delimiter: 0,
305 is_fragment: false,
306 }),
307 OutputEncoding::Infer => {
308 let output = args
309 .output
310 .as_ref()
311 .map(|p| p.to_string_lossy().into_owned())
312 .or_else(|| args.output_pattern.clone());
313 if let Some(output) = output {
314 if output.ends_with(".dbn.frag.zst") {
315 Ok(InferredEncoding {
316 encoding: Encoding::Dbn,
317 compression: Compression::Zstd,
318 delimiter: 0,
319 is_fragment: true,
320 })
321 } else if output.ends_with(".dbn.frag") {
322 Ok(InferredEncoding {
323 encoding: Encoding::Dbn,
324 compression: Compression::None,
325 delimiter: 0,
326 is_fragment: true,
327 })
328 } else if output.ends_with(".dbn.zst") {
329 Ok(InferredEncoding {
330 encoding: Encoding::Dbn,
331 compression: Compression::Zstd,
332 delimiter: 0,
333 is_fragment: false,
334 })
335 } else if output.ends_with(".dbn") {
336 Ok(InferredEncoding {
337 encoding: Encoding::Dbn,
338 compression: Compression::None,
339 delimiter: 0,
340 is_fragment: false,
341 })
342 } else if output.ends_with(".csv.zst") {
343 Ok(InferredEncoding {
344 encoding: Encoding::Csv,
345 compression: Compression::Zstd,
346 delimiter: b',',
347 is_fragment: false,
348 })
349 } else if output.ends_with(".csv") {
350 Ok(InferredEncoding {
351 encoding: Encoding::Csv,
352 compression: Compression::None,
353 delimiter: b',',
354 is_fragment: false,
355 })
356 } else if output.ends_with(".tsv.zst") || output.ends_with(".xls.zst") {
357 Ok(InferredEncoding {
358 encoding: Encoding::Csv,
359 compression: Compression::Zstd,
360 delimiter: b'\t',
361 is_fragment: false,
362 })
363 } else if output.ends_with(".tsv") || output.ends_with(".xls") {
364 Ok(InferredEncoding {
365 encoding: Encoding::Csv,
366 compression: Compression::None,
367 delimiter: b'\t',
368 is_fragment: false,
369 })
370 } else if output.ends_with(".json.zst") {
371 Ok(InferredEncoding {
372 encoding: Encoding::Json,
373 compression: Compression::Zstd,
374 delimiter: 0,
375 is_fragment: false,
376 })
377 } else if output.ends_with(".json") {
378 Ok(InferredEncoding {
379 encoding: Encoding::Json,
380 compression: Compression::None,
381 delimiter: 0,
382 is_fragment: false,
383 })
384 } else {
385 Err(anyhow!(
386 "Unable to infer output encoding from output path '{output}'",
387 ))
388 }
389 } else {
390 Err(anyhow!(
391 "Unable to infer output encoding when no output was specified"
392 ))
393 }
394 }
395 }
396}
397
398pub fn output_from_args(args: &Args) -> anyhow::Result<Box<dyn io::Write>> {
400 output(args.output.as_deref(), args.force)
401}
402
403pub fn output(output: Option<&Path>, force: bool) -> anyhow::Result<Box<dyn io::Write>> {
404 if let Some(output) = output {
405 let output_file = open_output_file(output, force)?;
406 Ok(Box::new(BufWriter::new(output_file)))
407 } else {
408 Ok(Box::new(io::stdout().lock()))
409 }
410}
411
412fn open_output_file(path: &Path, force: bool) -> anyhow::Result<File> {
413 let mut options = File::options();
414 options.write(true).truncate(true);
415 if force {
416 options.create(true);
417 } else if path.exists() {
418 return Err(anyhow!(
419 "Output file exists. Pass --force flag to overwrite the existing file."
420 ));
421 } else {
422 options.create_new(true);
423 }
424 options
425 .open(path)
426 .with_context(|| format!("Unable to open output file '{}'", path.display()))
427}
428
#[cfg(test)]
mod tests {
    // rstest maps every `#[case]` column to a function argument, so the
    // table-driven tests below have long parameter lists.
    #![allow(clippy::too_many_arguments)]

    use rstest::*;

    use super::*;

    // Explicit encoding flags select the encoding directly; compression comes
    // from `--zstd`.
    #[rstest]
    #[case(true, false, false, false, false, Encoding::Json, Compression::None, 0)]
    #[case(
        false,
        true,
        false,
        false,
        false,
        Encoding::Csv,
        Compression::None,
        b','
    )]
    #[case(
        false,
        false,
        true,
        false,
        false,
        Encoding::Csv,
        Compression::None,
        b'\t'
    )]
    #[case(false, false, false, true, false, Encoding::Dbn, Compression::None, 0)]
    #[case(true, false, false, false, true, Encoding::Json, Compression::Zstd, 0)]
    #[case(
        false,
        true,
        false,
        false,
        true,
        Encoding::Csv,
        Compression::Zstd,
        b','
    )]
    #[case(
        false,
        false,
        true,
        false,
        true,
        Encoding::Csv,
        Compression::Zstd,
        b'\t'
    )]
    #[case(false, false, false, true, true, Encoding::Dbn, Compression::Zstd, 0)]
    fn test_infer_encoding_and_compression_explicit(
        #[case] json: bool,
        #[case] csv: bool,
        #[case] tsv: bool,
        #[case] dbn: bool,
        #[case] zstd: bool,
        #[case] exp_enc: Encoding,
        #[case] exp_comp: Compression,
        #[case] exp_sep: u8,
    ) {
        let args = Args {
            json,
            csv,
            tsv,
            dbn,
            zstd,
            ..Default::default()
        };
        assert_eq!(
            infer_encoding(&args).unwrap(),
            InferredEncoding {
                encoding: exp_enc,
                compression: exp_comp,
                delimiter: exp_sep,
                is_fragment: false,
            }
        );
    }

    // `--fragment` selects metadata-less DBN, compressed only with `--zstd`.
    #[rstest]
    #[case(false, Compression::None)]
    #[case(true, Compression::Zstd)]
    fn test_infer_encoding_and_compression_explicit_fragment(
        #[case] zstd: bool,
        #[case] exp_comp: Compression,
    ) {
        let args = Args {
            fragment: true,
            zstd,
            ..Default::default()
        };
        assert_eq!(
            infer_encoding(&args).unwrap(),
            InferredEncoding {
                encoding: Encoding::Dbn,
                compression: exp_comp,
                delimiter: 0,
                is_fragment: true,
            }
        );
    }

    // Without an encoding flag, every recognized suffix of `--output` maps to
    // its format.
    #[rstest]
    #[case("out.json", Encoding::Json, Compression::None, 0, false)]
    #[case("out.csv", Encoding::Csv, Compression::None, b',', false)]
    #[case("out.tsv", Encoding::Csv, Compression::None, b'\t', false)]
    #[case("out.xls", Encoding::Csv, Compression::None, b'\t', false)]
    #[case("out.dbn", Encoding::Dbn, Compression::None, 0, false)]
    #[case("out.dbn.frag", Encoding::Dbn, Compression::None, 0, true)]
    #[case("out.json.zst", Encoding::Json, Compression::Zstd, 0, false)]
    #[case("out.csv.zst", Encoding::Csv, Compression::Zstd, b',', false)]
    #[case("out.tsv.zst", Encoding::Csv, Compression::Zstd, b'\t', false)]
    #[case("out.xls.zst", Encoding::Csv, Compression::Zstd, b'\t', false)]
    #[case("out.dbn.zst", Encoding::Dbn, Compression::Zstd, 0, false)]
    #[case("out.dbn.frag.zst", Encoding::Dbn, Compression::Zstd, 0, true)]
    fn test_infer_encoding_and_compression_inference(
        #[case] output: &str,
        #[case] exp_enc: Encoding,
        #[case] exp_comp: Compression,
        #[case] exp_sep: u8,
        #[case] exp_frag: bool,
    ) {
        let args = Args {
            output: Some(PathBuf::from(output)),
            ..Default::default()
        };
        assert_eq!(
            infer_encoding(&args).unwrap(),
            InferredEncoding {
                encoding: exp_enc,
                compression: exp_comp,
                delimiter: exp_sep,
                is_fragment: exp_frag,
            }
        );
    }

    // With no `--output`, the suffix of `--output-pattern` drives inference.
    #[test]
    fn test_infer_encoding_from_output_pattern() {
        let args = Args {
            output_pattern: Some("{symbol}.csv.zst".to_owned()),
            ..Default::default()
        };
        assert_eq!(
            infer_encoding(&args).unwrap(),
            InferredEncoding {
                encoding: Encoding::Csv,
                compression: Compression::Zstd,
                delimiter: b',',
                is_fragment: false,
            }
        );
    }

    #[test]
    fn test_infer_encoding_and_compression_bad() {
        let args = Args {
            output: Some(PathBuf::from("out.pb")),
            ..Default::default()
        };
        assert!(
            matches!(infer_encoding(&args), Err(e) if e.to_string().starts_with("Unable to infer"))
        );
    }

    // Inference is impossible when neither an output path nor a pattern is given.
    #[test]
    fn test_infer_encoding_without_output() {
        let args = Args::default();
        assert!(
            matches!(infer_encoding(&args), Err(e) if e.to_string().starts_with("Unable to infer"))
        );
    }

    // An existing output file is only reopened when `force` is set.
    #[test]
    fn test_open_output_file_respects_force() {
        let path = std::env::temp_dir().join(format!(
            "dbn-cli-open-output-file-{}.tmp",
            std::process::id()
        ));
        File::create(&path).unwrap();
        let err = open_output_file(&path, false).unwrap_err();
        assert!(err.to_string().starts_with("Output file exists"));
        assert!(open_output_file(&path, true).is_ok());
        std::fs::remove_file(&path).unwrap();
    }
}