1use std::{
2 fs::File,
3 io::{self, BufWriter},
4 num::NonZeroU64,
5 path::PathBuf,
6};
7
8use anyhow::{anyhow, Context};
9use clap::{ArgAction, Parser, ValueEnum};
10
11use dbn::{
12 enums::{Compression, Encoding},
13 Schema, VersionUpgradePolicy,
14};
15
16pub mod encode;
17pub mod filter;
18
// The output encoding selected through the command-line flags, as reported by
// `Args::output_encoding`.
//
// NOTE(review): plain `//` comments are used here on purpose. clap's derive
// macros can pick up `///` doc comments as help text, so converting these
// should be checked against the generated `--help` output first.
#[derive(Clone, Copy, Debug, ValueEnum)]
pub enum OutputEncoding {
    // No explicit encoding flag was passed; `infer_encoding` then derives the
    // encoding from the extension of the output path.
    Infer,
    // Selected by `--dbn` / `-D`.
    Dbn,
    // Selected by `--csv` / `-C`.
    Csv,
    // Selected by `--tsv` / `-T`. `infer_encoding` maps this to the CSV
    // encoding with a tab delimiter.
    Tsv,
    // Selected by `--json` / `-J` (JSON lines, per the flag's help text).
    Json,
    // Selected by `--fragment` / `-F`: DBN without metadata, per the flag's
    // help text.
    DbnFragment,
}
30
31#[derive(Debug, Parser)]
32#[clap(version, about)]
33#[cfg_attr(test, derive(Default))]
34pub struct Args {
35 #[clap(
36 help = "One or more DBN or legacy DBZ files to decode. Passing multiple files will result in a merge. Pass '-' to read from standard input",
37 value_name = "FILE...",
38 value_delimiter = ' ',
39 num_args = 1..,
40 required = true,
41 )]
42 pub input: Vec<PathBuf>,
43 #[clap(
44 short,
45 long,
46 help = "Saves the result to FILE. If no path is specified, the output will be written to standard output",
47 value_name = "FILE"
48 )]
49 pub output: Option<PathBuf>,
50 #[clap(
51 short = 'J',
52 long,
53 action = ArgAction::SetTrue,
54 default_value = "false",
55 group = "output_encoding",
56 help = "Output the result as JSON lines"
57 )]
58 pub json: bool,
59 #[clap(
60 short = 'C',
61 long,
62 action = ArgAction::SetTrue,
63 default_value = "false",
64 group = "output_encoding",
65 help = "Output the result as CSV"
66 )]
67 pub csv: bool,
68 #[clap(
69 short = 'T',
70 long,
71 action = ArgAction::SetTrue,
72 default_value = "false",
73 group = "output_encoding",
74 help = "Output the result as tab-separated values (TSV)"
75 )]
76 pub tsv: bool,
77 #[clap(
78 short = 'D',
79 long,
80 action = ArgAction::SetTrue,
81 default_value = "false",
82 group = "output_encoding",
83 help = "Output the result as DBN"
84 )]
85 pub dbn: bool,
86 #[clap(
87 short = 'F',
88 long,
89 action = ArgAction::SetTrue,
90 default_value = "false",
91 group = "output_encoding",
92 help = "Output the result as a DBN fragment (no metadata)"
93 )]
94 pub fragment: bool,
95 #[clap(short, long, action = ArgAction::SetTrue, default_value = "false", help = "Zstd compress the output")]
96 pub zstd: bool,
97 #[clap(
98 short = 'u',
99 long = "upgrade",
100 default_value = "false",
101 action = ArgAction::SetTrue,
102 help = "Upgrade data when decoding previous DBN versions. By default data is decoded as-is."
103 )]
104 pub should_upgrade: bool,
105 #[clap(
106 short,
107 long,
108 action = ArgAction::SetTrue,
109 default_value = "false",
110 help = "Allow overwriting of existing files, such as the output file"
111 )]
112 pub force: bool,
113 #[clap(
114 short = 'm',
115 long = "metadata",
116 action = ArgAction::SetTrue,
117 default_value = "false",
118 conflicts_with_all = ["csv", "dbn", "fragment"],
119 help = "Output the metadata section instead of the body of the DBN file. Only valid for JSON output encoding"
120 )]
121 pub should_output_metadata: bool,
122 #[clap(
123 short = 'p',
124 long = "pretty",
125 action = ArgAction::SetTrue,
126 default_value = "false",
127 conflicts_with_all = ["dbn", "fragment"],
128 help ="Make the CSV or JSON output easier to read by converting timestamps to ISO 8601 and prices to decimals"
129 )]
130 pub should_pretty_print: bool,
131 #[clap(
132 short = 's',
133 long = "map-symbols",
134 action = ArgAction::SetTrue,
135 default_value = "false",
136 conflicts_with_all = ["input_fragment", "dbn", "fragment"],
137 help ="Use symbology mappings from the metadata to create a 'symbol' field mapping the instrument ID to its requested symbol."
138 )]
139 pub map_symbols: bool,
140 #[clap(
141 short = 'l',
142 long = "limit",
143 value_name = "NUM_RECORDS",
144 help = "Limit the number of records in the output to the specified number"
145 )]
146 pub limit: Option<NonZeroU64>,
147 #[clap(
149 long = "input-fragment",
150 action = ArgAction::SetTrue,
151 default_value = "false",
152 group = "input_fragment",
153 conflicts_with_all = ["is_input_zstd_fragment", "should_output_metadata", "dbn"],
154 help = "Interpret the input as an uncompressed DBN fragment, i.e. records without metadata. Only valid with text output encodings"
155 )]
156 pub is_input_fragment: bool,
157 #[clap(
158 long = "input-zstd-fragment",
159 action = ArgAction::SetTrue,
160 default_value = "false",
161 group = "input_fragment",
162 conflicts_with_all = ["should_output_metadata", "dbn"],
163 help = "Interpret the input as a Zstd-compressed DBN fragment, i.e. records without metadata. Only valid with text output encodings"
164 )]
165 pub is_input_zstd_fragment: bool,
166 #[clap(
167 long = "input-dbn-version",
168 help = "Specify the DBN version of the fragment. By default the fragment is assumed to be of the current version",
169 value_name = "DBN_VERSION",
170 value_parser = clap::value_parser!(u8).range(1..=3),
171 requires = "input_fragment"
172 )]
173 pub input_dbn_version_override: Option<u8>,
174 #[clap(
175 long = "schema",
176 help = "Only encode records of this schema. This is particularly useful for transcoding mixed-schema DBN to CSV, which doesn't support mixing schemas",
177 value_name = "SCHEMA"
178 )]
179 pub schema_filter: Option<Schema>,
180 #[clap(
181 long = "omit-header",
182 action = ArgAction::SetFalse,
183 default_value = "true",
184 conflicts_with_all = ["json", "dbn", "fragment"],
185 help = "Skip encoding the header. Only valid when encoding CSV or TSV."
186 )]
187 pub write_header: bool,
188}
189
190impl Args {
191 pub fn output_encoding(&self) -> OutputEncoding {
193 if self.json {
194 OutputEncoding::Json
195 } else if self.csv {
196 OutputEncoding::Csv
197 } else if self.tsv {
198 OutputEncoding::Tsv
199 } else if self.dbn {
200 OutputEncoding::Dbn
201 } else if self.fragment {
202 OutputEncoding::DbnFragment
203 } else {
204 OutputEncoding::Infer
205 }
206 }
207
208 pub fn upgrade_policy(&self) -> VersionUpgradePolicy {
209 if self.should_upgrade {
210 VersionUpgradePolicy::UpgradeToV3
211 } else {
212 VersionUpgradePolicy::AsIs
213 }
214 }
215
216 pub fn input_version(&self) -> u8 {
217 self.input_dbn_version_override.unwrap_or(dbn::DBN_VERSION)
218 }
219}
220
221pub fn infer_encoding(args: &Args) -> anyhow::Result<(Encoding, Compression, u8)> {
224 let compression = if args.zstd {
225 Compression::Zstd
226 } else {
227 Compression::None
228 };
229 match args.output_encoding() {
230 OutputEncoding::DbnFragment | OutputEncoding::Dbn => Ok((Encoding::Dbn, compression, 0)),
231 OutputEncoding::Csv => Ok((Encoding::Csv, compression, b',')),
232 OutputEncoding::Tsv => Ok((Encoding::Csv, compression, b'\t')),
233 OutputEncoding::Json => Ok((Encoding::Json, compression, 0)),
234 OutputEncoding::Infer => {
235 if let Some(output) = args.output.as_ref().map(|o| o.to_string_lossy()) {
236 if output.ends_with(".dbn.zst") {
237 Ok((Encoding::Dbn, Compression::Zstd, 0))
238 } else if output.ends_with(".dbn") {
239 Ok((Encoding::Dbn, Compression::None, 0))
240 } else if output.ends_with(".csv.zst") {
241 Ok((Encoding::Csv, Compression::Zstd, b','))
242 } else if output.ends_with(".csv") {
243 Ok((Encoding::Csv, Compression::None, b','))
244 } else if output.ends_with(".tsv.zst") || output.ends_with(".xls.zst") {
245 Ok((Encoding::Csv, Compression::Zstd, b'\t'))
246 } else if output.ends_with(".tsv") || output.ends_with(".xls") {
247 Ok((Encoding::Csv, Compression::None, b'\t'))
248 } else if output.ends_with(".json.zst") {
249 Ok((Encoding::Json, Compression::Zstd, 0))
250 } else if output.ends_with(".json") {
251 Ok((Encoding::Json, Compression::None, 0))
252 } else {
253 Err(anyhow!(
254 "Unable to infer output encoding from output path '{output}'",
255 ))
256 }
257 } else {
258 Err(anyhow!(
259 "Unable to infer output encoding when no output was specified"
260 ))
261 }
262 }
263 }
264}
265
266pub fn output_from_args(args: &Args) -> anyhow::Result<Box<dyn io::Write>> {
268 if let Some(output) = &args.output {
269 let output_file = open_output_file(output, args.force)?;
270 Ok(Box::new(BufWriter::new(output_file)))
271 } else {
272 Ok(Box::new(io::stdout().lock()))
273 }
274}
275
276fn open_output_file(path: &PathBuf, force: bool) -> anyhow::Result<File> {
277 let mut options = File::options();
278 options.write(true).truncate(true);
279 if force {
280 options.create(true);
281 } else if path.exists() {
282 return Err(anyhow!(
283 "Output file exists. Pass --force flag to overwrite the existing file."
284 ));
285 } else {
286 options.create_new(true);
287 }
288 options
289 .open(path)
290 .with_context(|| format!("Unable to open output file '{}'", path.display()))
291}
292
#[cfg(test)]
mod tests {
    // Each rstest case column becomes its own test-function parameter, which
    // pushes the parameterized tests past clippy's argument limit.
    #![allow(clippy::too_many_arguments)]

    use rstest::*;

    use super::*;

    /// An explicit encoding flag decides the encoding and delimiter, while
    /// `zstd` alone decides the compression.
    #[rstest]
    #[case(true, false, false, false, false, Encoding::Json, Compression::None, 0)]
    #[case(
        false,
        true,
        false,
        false,
        false,
        Encoding::Csv,
        Compression::None,
        b','
    )]
    #[case(
        false,
        false,
        true,
        false,
        false,
        Encoding::Csv,
        Compression::None,
        b'\t'
    )]
    #[case(false, false, false, true, false, Encoding::Dbn, Compression::None, 0)]
    #[case(true, false, false, false, true, Encoding::Json, Compression::Zstd, 0)]
    #[case(
        false,
        true,
        false,
        false,
        true,
        Encoding::Csv,
        Compression::Zstd,
        b','
    )]
    #[case(
        false,
        false,
        true,
        false,
        true,
        Encoding::Csv,
        Compression::Zstd,
        b'\t'
    )]
    #[case(false, false, false, true, true, Encoding::Dbn, Compression::Zstd, 0)]
    fn test_infer_encoding_and_compression_explicit(
        #[case] json: bool,
        #[case] csv: bool,
        #[case] tsv: bool,
        #[case] dbn: bool,
        #[case] zstd: bool,
        #[case] exp_enc: Encoding,
        #[case] exp_comp: Compression,
        #[case] exp_sep: u8,
    ) {
        let expected = (exp_enc, exp_comp, exp_sep);
        let actual = infer_encoding(&Args {
            json,
            csv,
            tsv,
            dbn,
            zstd,
            ..Default::default()
        })
        .unwrap();
        assert_eq!(actual, expected);
    }

    /// Without an explicit flag, everything is derived from the output path's
    /// extension.
    #[rstest]
    #[case("out.json", Encoding::Json, Compression::None, 0)]
    #[case("out.csv", Encoding::Csv, Compression::None, b',')]
    #[case("out.tsv", Encoding::Csv, Compression::None, b'\t')]
    #[case("out.xls", Encoding::Csv, Compression::None, b'\t')]
    #[case("out.dbn", Encoding::Dbn, Compression::None, 0)]
    #[case("out.json.zst", Encoding::Json, Compression::Zstd, 0)]
    #[case("out.csv.zst", Encoding::Csv, Compression::Zstd, b',')]
    #[case("out.tsv.zst", Encoding::Csv, Compression::Zstd, b'\t')]
    #[case("out.xls.zst", Encoding::Csv, Compression::Zstd, b'\t')]
    #[case("out.dbn.zst", Encoding::Dbn, Compression::Zstd, 0)]
    fn test_infer_encoding_and_compression_inference(
        #[case] output: &str,
        #[case] exp_enc: Encoding,
        #[case] exp_comp: Compression,
        #[case] exp_sep: u8,
    ) {
        let expected = (exp_enc, exp_comp, exp_sep);
        let actual = infer_encoding(&Args {
            output: Some(PathBuf::from(output)),
            ..Default::default()
        })
        .unwrap();
        assert_eq!(actual, expected);
    }

    /// An unrecognized extension must produce an "Unable to infer…" error.
    #[test]
    fn test_infer_encoding_and_compression_bad() {
        let args = Args {
            output: Some(PathBuf::from("out.pb")),
            ..Default::default()
        };
        let err = infer_encoding(&args).unwrap_err();
        assert!(err.to_string().starts_with("Unable to infer"));
    }
}