1use std::{
2 fs::File,
3 io::{self, BufWriter},
4 num::NonZeroU64,
5 path::{Path, PathBuf},
6};
7
8use anyhow::{anyhow, Context};
9use clap::{ArgAction, Parser, ValueEnum};
10
11use dbn::{
12 encode::SplitDuration,
13 enums::{Compression, Encoding},
14 Schema, VersionUpgradePolicy,
15};
16
17pub mod encode;
18pub mod filter;
19
/// How the output of the `dbn` command will be encoded, as selected by the
/// mutually exclusive encoding flags (see `Args::output_encoding`).
#[derive(Clone, Copy, Debug, ValueEnum)]
pub enum OutputEncoding {
    // No encoding flag was passed: `infer_encoding` derives the encoding from
    // the suffix of the output path or output pattern.
    Infer,
    // DBN with its metadata header.
    Dbn,
    // Comma-separated values.
    Csv,
    // Tab-separated values (encoded with the CSV encoder and a `\t` delimiter).
    Tsv,
    // JSON lines.
    Json,
    // DBN records without the metadata header.
    DbnFragment,
}
31
/// How to optionally split the output across files (`--split-by`).
#[derive(Clone, Copy, Debug, ValueEnum)]
pub enum SplitBy {
    // Variant notes use plain `//` comments: this enum is a clap value type,
    // so `///` doc comments here would become `--help` text.
    // Split the output by symbol.
    Symbol,
    // Split the output by schema.
    Schema,
    // Time-based splits; `SplitBy::duration` maps these to `SplitDuration`.
    Day,
    Week,
    Month,
}
41
42impl SplitBy {
43 pub fn duration(self) -> Option<SplitDuration> {
44 match self {
45 SplitBy::Day => Some(SplitDuration::Day),
46 SplitBy::Week => Some(SplitDuration::Week),
47 SplitBy::Month => Some(SplitDuration::Month),
48 SplitBy::Symbol | SplitBy::Schema => None,
49 }
50 }
51}
52
/// The fully resolved output format produced by `infer_encoding`.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub struct InferredEncoding {
    // Output encoding; TSV is represented as `Encoding::Csv` with a tab delimiter.
    pub encoding: Encoding,
    // Whether to compress the output.
    pub compression: Compression,
    // Field separator for CSV-family output (`b','` or `b'\t'`); `0` for
    // encodings that don't use one.
    pub delimiter: u8,
    // `true` when the output is a DBN fragment, i.e. records without metadata.
    pub is_fragment: bool,
}
60
61#[derive(Debug, Parser)]
62#[clap(name = "dbn", version, about)]
63#[cfg_attr(test, derive(Default))]
64pub struct Args {
65 #[clap(
66 help = "One or more DBN or legacy DBZ files to decode. Passing multiple files will result in a merge. Pass '-' to read from standard input",
67 value_name = "FILE...",
68 value_delimiter = ' ',
69 num_args = 1..,
70 required = true,
71 )]
72 pub input: Vec<PathBuf>,
73 #[clap(
74 short,
75 long,
76 help = "Saves the result to FILE. If no path is specified, the output will be written to standard output",
77 value_name = "FILE"
78 )]
79 pub output: Option<PathBuf>,
80 #[clap(
81 short = 'O',
82 long,
83 help = "Saves the result of file splitting to paths according to PATTERN",
84 requires = "split_by",
85 conflicts_with = "output",
86 value_name = "PATTERN"
87 )]
88 pub output_pattern: Option<String>,
89 #[clap(
90 short = 'S',
91 long,
92 help = "How to optionally split the output across files",
93 requires = "output_pattern",
94 value_name = "SPLIT_BY"
95 )]
96 pub split_by: Option<SplitBy>,
97 #[clap(
98 short = 'J',
99 long,
100 action = ArgAction::SetTrue,
101 default_value = "false",
102 group = "output_encoding",
103 help = "Output the result as JSON lines"
104 )]
105 pub json: bool,
106 #[clap(
107 short = 'C',
108 long,
109 action = ArgAction::SetTrue,
110 default_value = "false",
111 group = "output_encoding",
112 help = "Output the result as CSV"
113 )]
114 pub csv: bool,
115 #[clap(
116 short = 'T',
117 long,
118 action = ArgAction::SetTrue,
119 default_value = "false",
120 group = "output_encoding",
121 help = "Output the result as tab-separated values (TSV)"
122 )]
123 pub tsv: bool,
124 #[clap(
125 short = 'D',
126 long,
127 action = ArgAction::SetTrue,
128 default_value = "false",
129 group = "output_encoding",
130 help = "Output the result as DBN"
131 )]
132 pub dbn: bool,
133 #[clap(
134 short = 'F',
135 long,
136 action = ArgAction::SetTrue,
137 default_value = "false",
138 group = "output_encoding",
139 help = "Output the result as a DBN fragment (no metadata)"
140 )]
141 pub fragment: bool,
142 #[clap(short, long, action = ArgAction::SetTrue, default_value = "false", help = "Zstd compress the output")]
143 pub zstd: bool,
144 #[clap(
145 short = 'u',
146 long = "upgrade",
147 default_value = "false",
148 action = ArgAction::SetTrue,
149 help = "Upgrade data when decoding previous DBN versions. By default data is decoded as-is."
150 )]
151 pub should_upgrade: bool,
152 #[clap(
153 short,
154 long,
155 action = ArgAction::SetTrue,
156 default_value = "false",
157 help = "Allow overwriting of existing files, such as the output file"
158 )]
159 pub force: bool,
160 #[clap(
161 short = 'm',
162 long = "metadata",
163 action = ArgAction::SetTrue,
164 default_value = "false",
165 conflicts_with_all = ["csv", "dbn", "fragment"],
166 help = "Output the metadata section instead of the body of the DBN file. Only valid for JSON output encoding"
167 )]
168 pub should_output_metadata: bool,
169 #[clap(
170 short = 'p',
171 long = "pretty",
172 action = ArgAction::SetTrue,
173 default_value = "false",
174 conflicts_with_all = ["dbn", "fragment"],
175 help ="Make the CSV or JSON output easier to read by converting timestamps to ISO 8601 and prices to decimals"
176 )]
177 pub should_pretty_print: bool,
178 #[clap(
179 short = 's',
180 long = "map-symbols",
181 action = ArgAction::SetTrue,
182 default_value = "false",
183 conflicts_with_all = ["input_fragment", "dbn", "fragment"],
184 help ="Use symbology mappings from the metadata to create a 'symbol' field mapping the instrument ID to its requested symbol."
185 )]
186 pub map_symbols: bool,
187 #[clap(
188 short = 'l',
189 long = "limit",
190 value_name = "NUM_RECORDS",
191 help = "Limit the number of records in the output to the specified number"
192 )]
193 pub limit: Option<NonZeroU64>,
194 #[clap(
196 long = "input-fragment",
197 action = ArgAction::SetTrue,
198 default_value = "false",
199 group = "input_fragment",
200 conflicts_with_all = ["is_input_zstd_fragment", "should_output_metadata", "dbn"],
201 help = "Interpret the input as an uncompressed DBN fragment, i.e. records without metadata. Only valid with text output encodings"
202 )]
203 pub is_input_fragment: bool,
204 #[clap(
205 long = "input-zstd-fragment",
206 action = ArgAction::SetTrue,
207 default_value = "false",
208 group = "input_fragment",
209 conflicts_with_all = ["should_output_metadata", "dbn"],
210 help = "Interpret the input as a Zstd-compressed DBN fragment, i.e. records without metadata. Only valid with text output encodings"
211 )]
212 pub is_input_zstd_fragment: bool,
213 #[clap(
214 long = "input-dbn-version",
215 help = "Specify the DBN version of the fragment. By default the fragment is assumed to be of the current version",
216 value_name = "DBN_VERSION",
217 value_parser = clap::value_parser!(u8).range(1..=3),
218 requires = "input_fragment"
219 )]
220 pub input_dbn_version_override: Option<u8>,
221 #[clap(
222 long = "schema",
223 help = "Only encode records of this schema. This is particularly useful for transcoding mixed-schema DBN to CSV, which doesn't support mixing schemas",
224 value_name = "SCHEMA"
225 )]
226 pub schema_filter: Option<Schema>,
227 #[clap(
228 long = "omit-header",
229 action = ArgAction::SetFalse,
230 default_value = "true",
231 conflicts_with_all = ["json", "dbn", "fragment"],
232 help = "Skip encoding the header. Only valid when encoding CSV or TSV."
233 )]
234 pub write_header: bool,
235}
236
237impl Args {
238 pub fn output_encoding(&self) -> OutputEncoding {
240 if self.json {
241 OutputEncoding::Json
242 } else if self.csv {
243 OutputEncoding::Csv
244 } else if self.tsv {
245 OutputEncoding::Tsv
246 } else if self.dbn {
247 OutputEncoding::Dbn
248 } else if self.fragment {
249 OutputEncoding::DbnFragment
250 } else {
251 OutputEncoding::Infer
252 }
253 }
254
255 pub fn upgrade_policy(&self) -> VersionUpgradePolicy {
256 if self.should_upgrade {
257 VersionUpgradePolicy::UpgradeToV3
258 } else {
259 VersionUpgradePolicy::AsIs
260 }
261 }
262
263 pub fn input_version(&self) -> u8 {
264 self.input_dbn_version_override.unwrap_or(dbn::DBN_VERSION)
265 }
266}
267
268pub fn infer_encoding(args: &Args) -> anyhow::Result<InferredEncoding> {
269 let compression = if args.zstd {
270 Compression::Zstd
271 } else {
272 Compression::None
273 };
274 match args.output_encoding() {
275 OutputEncoding::DbnFragment => Ok(InferredEncoding {
276 encoding: Encoding::Dbn,
277 compression,
278 delimiter: 0,
279 is_fragment: true,
280 }),
281 OutputEncoding::Dbn => Ok(InferredEncoding {
282 encoding: Encoding::Dbn,
283 compression,
284 delimiter: 0,
285 is_fragment: false,
286 }),
287 OutputEncoding::Csv => Ok(InferredEncoding {
288 encoding: Encoding::Csv,
289 compression,
290 delimiter: b',',
291 is_fragment: false,
292 }),
293 OutputEncoding::Tsv => Ok(InferredEncoding {
294 encoding: Encoding::Csv,
295 compression,
296 delimiter: b'\t',
297 is_fragment: false,
298 }),
299 OutputEncoding::Json => Ok(InferredEncoding {
300 encoding: Encoding::Json,
301 compression,
302 delimiter: 0,
303 is_fragment: false,
304 }),
305 OutputEncoding::Infer => {
306 let output = args
307 .output
308 .as_ref()
309 .map(|p| p.to_string_lossy().into_owned())
310 .or_else(|| args.output_pattern.clone());
311 if let Some(output) = output {
312 if output.ends_with(".dbn.frag.zst") {
313 Ok(InferredEncoding {
314 encoding: Encoding::Dbn,
315 compression: Compression::Zstd,
316 delimiter: 0,
317 is_fragment: true,
318 })
319 } else if output.ends_with(".dbn.frag") {
320 Ok(InferredEncoding {
321 encoding: Encoding::Dbn,
322 compression: Compression::None,
323 delimiter: 0,
324 is_fragment: true,
325 })
326 } else if output.ends_with(".dbn.zst") {
327 Ok(InferredEncoding {
328 encoding: Encoding::Dbn,
329 compression: Compression::Zstd,
330 delimiter: 0,
331 is_fragment: false,
332 })
333 } else if output.ends_with(".dbn") {
334 Ok(InferredEncoding {
335 encoding: Encoding::Dbn,
336 compression: Compression::None,
337 delimiter: 0,
338 is_fragment: false,
339 })
340 } else if output.ends_with(".csv.zst") {
341 Ok(InferredEncoding {
342 encoding: Encoding::Csv,
343 compression: Compression::Zstd,
344 delimiter: b',',
345 is_fragment: false,
346 })
347 } else if output.ends_with(".csv") {
348 Ok(InferredEncoding {
349 encoding: Encoding::Csv,
350 compression: Compression::None,
351 delimiter: b',',
352 is_fragment: false,
353 })
354 } else if output.ends_with(".tsv.zst") || output.ends_with(".xls.zst") {
355 Ok(InferredEncoding {
356 encoding: Encoding::Csv,
357 compression: Compression::Zstd,
358 delimiter: b'\t',
359 is_fragment: false,
360 })
361 } else if output.ends_with(".tsv") || output.ends_with(".xls") {
362 Ok(InferredEncoding {
363 encoding: Encoding::Csv,
364 compression: Compression::None,
365 delimiter: b'\t',
366 is_fragment: false,
367 })
368 } else if output.ends_with(".json.zst") {
369 Ok(InferredEncoding {
370 encoding: Encoding::Json,
371 compression: Compression::Zstd,
372 delimiter: 0,
373 is_fragment: false,
374 })
375 } else if output.ends_with(".json") {
376 Ok(InferredEncoding {
377 encoding: Encoding::Json,
378 compression: Compression::None,
379 delimiter: 0,
380 is_fragment: false,
381 })
382 } else {
383 Err(anyhow!(
384 "Unable to infer output encoding from output path '{output}'",
385 ))
386 }
387 } else {
388 Err(anyhow!(
389 "Unable to infer output encoding when no output was specified"
390 ))
391 }
392 }
393 }
394}
395
396pub fn output_from_args(args: &Args) -> anyhow::Result<Box<dyn io::Write>> {
398 output(args.output.as_deref(), args.force)
399}
400
401pub fn output(output: Option<&Path>, force: bool) -> anyhow::Result<Box<dyn io::Write>> {
402 if let Some(output) = output {
403 let output_file = open_output_file(output, force)?;
404 Ok(Box::new(BufWriter::new(output_file)))
405 } else {
406 Ok(Box::new(io::stdout().lock()))
407 }
408}
409
410fn open_output_file(path: &Path, force: bool) -> anyhow::Result<File> {
411 let mut options = File::options();
412 options.write(true).truncate(true);
413 if force {
414 options.create(true);
415 } else if path.exists() {
416 return Err(anyhow!(
417 "Output file exists. Pass --force flag to overwrite the existing file."
418 ));
419 } else {
420 options.create_new(true);
421 }
422 options
423 .open(path)
424 .with_context(|| format!("Unable to open output file '{}'", path.display()))
425}
426
#[cfg(test)]
mod tests {
    // rstest passes each `#[case]` value as a separate function argument, so
    // the table-driven tests exceed clippy's default argument limit.
    #![allow(clippy::too_many_arguments)]

    use rstest::*;

    use super::*;

    #[rstest]
    #[case(true, false, false, false, false, Encoding::Json, Compression::None, 0)]
    #[case(
        false,
        true,
        false,
        false,
        false,
        Encoding::Csv,
        Compression::None,
        b','
    )]
    #[case(
        false,
        false,
        true,
        false,
        false,
        Encoding::Csv,
        Compression::None,
        b'\t'
    )]
    #[case(false, false, false, true, false, Encoding::Dbn, Compression::None, 0)]
    #[case(true, false, false, false, true, Encoding::Json, Compression::Zstd, 0)]
    #[case(
        false,
        true,
        false,
        false,
        true,
        Encoding::Csv,
        Compression::Zstd,
        b','
    )]
    #[case(
        false,
        false,
        true,
        false,
        true,
        Encoding::Csv,
        Compression::Zstd,
        b'\t'
    )]
    #[case(false, false, false, true, true, Encoding::Dbn, Compression::Zstd, 0)]
    fn test_infer_encoding_and_compression_explicit(
        #[case] json: bool,
        #[case] csv: bool,
        #[case] tsv: bool,
        #[case] dbn: bool,
        #[case] zstd: bool,
        #[case] exp_enc: Encoding,
        #[case] exp_comp: Compression,
        #[case] exp_sep: u8,
    ) {
        let args = Args {
            json,
            csv,
            tsv,
            dbn,
            zstd,
            ..Default::default()
        };
        assert_eq!(
            infer_encoding(&args).unwrap(),
            InferredEncoding {
                encoding: exp_enc,
                compression: exp_comp,
                delimiter: exp_sep,
                is_fragment: false,
            }
        );
    }

    #[rstest]
    #[case(false, Compression::None)]
    #[case(true, Compression::Zstd)]
    fn test_infer_encoding_explicit_fragment(#[case] zstd: bool, #[case] exp_comp: Compression) {
        let args = Args {
            fragment: true,
            zstd,
            ..Default::default()
        };
        assert_eq!(
            infer_encoding(&args).unwrap(),
            InferredEncoding {
                encoding: Encoding::Dbn,
                compression: exp_comp,
                delimiter: 0,
                is_fragment: true,
            }
        );
    }

    #[rstest]
    #[case("out.json", Encoding::Json, Compression::None, 0)]
    #[case("out.csv", Encoding::Csv, Compression::None, b',')]
    #[case("out.tsv", Encoding::Csv, Compression::None, b'\t')]
    #[case("out.xls", Encoding::Csv, Compression::None, b'\t')]
    #[case("out.dbn", Encoding::Dbn, Compression::None, 0)]
    #[case("out.json.zst", Encoding::Json, Compression::Zstd, 0)]
    #[case("out.csv.zst", Encoding::Csv, Compression::Zstd, b',')]
    #[case("out.tsv.zst", Encoding::Csv, Compression::Zstd, b'\t')]
    #[case("out.xls.zst", Encoding::Csv, Compression::Zstd, b'\t')]
    #[case("out.dbn.zst", Encoding::Dbn, Compression::Zstd, 0)]
    fn test_infer_encoding_and_compression_inference(
        #[case] output: &str,
        #[case] exp_enc: Encoding,
        #[case] exp_comp: Compression,
        #[case] exp_sep: u8,
    ) {
        let args = Args {
            output: Some(PathBuf::from(output)),
            ..Default::default()
        };
        assert_eq!(
            infer_encoding(&args).unwrap(),
            InferredEncoding {
                encoding: exp_enc,
                compression: exp_comp,
                delimiter: exp_sep,
                is_fragment: false,
            }
        );
    }

    #[rstest]
    #[case("out.dbn.frag", Compression::None)]
    #[case("out.dbn.frag.zst", Compression::Zstd)]
    fn test_infer_encoding_fragment_inference(
        #[case] output: &str,
        #[case] exp_comp: Compression,
    ) {
        let args = Args {
            output: Some(PathBuf::from(output)),
            ..Default::default()
        };
        assert_eq!(
            infer_encoding(&args).unwrap(),
            InferredEncoding {
                encoding: Encoding::Dbn,
                compression: exp_comp,
                delimiter: 0,
                is_fragment: true,
            }
        );
    }

    #[test]
    fn test_infer_encoding_from_output_pattern() {
        let args = Args {
            output_pattern: Some("{symbol}.csv.zst".to_owned()),
            ..Default::default()
        };
        assert_eq!(
            infer_encoding(&args).unwrap(),
            InferredEncoding {
                encoding: Encoding::Csv,
                compression: Compression::Zstd,
                delimiter: b',',
                is_fragment: false,
            }
        );
    }

    #[test]
    fn test_infer_encoding_and_compression_bad() {
        let args = Args {
            output: Some(PathBuf::from("out.pb")),
            ..Default::default()
        };
        assert!(
            matches!(infer_encoding(&args), Err(e) if e.to_string().starts_with("Unable to infer"))
        );
    }

    #[test]
    fn test_infer_encoding_no_output() {
        let args = Args::default();
        assert!(matches!(
            infer_encoding(&args),
            Err(e) if e.to_string().contains("no output was specified")
        ));
    }
}