mod connection;
mod enum_args;
mod execute;
mod input;
mod insert;
mod parquet_buffer;
mod query;
use crate::enum_args::{column_encoding_from_str, EncodingArgument};
use anyhow::{bail, Error};
use bytesize::ByteSize;
use connection::ConnectOpts;
use enum_args::CompressionVariants;
use io_arg::IoArg;
use odbc_api::environment;
use parquet::basic::Encoding;
use std::path::PathBuf;
use stderrlog::ColorChoice;
use clap::{ArgAction, Args, CommandFactory, Parser};
use clap_complete::{generate, Shell};
/// Query an ODBC data source and store the result in a Parquet file.
#[derive(Parser)]
#[clap(version)]
struct Cli {
/// Only print errors to standard error stream. Suppresses warnings and all other log levels
/// independent of the verbose mode.
#[arg(short = 'q', long)]
quiet: bool,
/// Verbose mode (-v, -vv, -vvv, etc)
///
/// 'v': Info level logging
/// 'vv': Debug level logging
/// 'vvv': Trace level logging
#[arg(short = 'v', long, action = ArgAction::Count)]
verbose: u8,
#[arg(long)]
/// Never emit colors.
///
/// Controls the colors of the log output. If specified the log output will never be colored.
/// If not specified the tool will try to emit colors, but not force it. If `TERM=dumb` or
/// `NO_COLOR` is defined, then colors will not be used.
no_color: bool,
#[command(subcommand)]
command: Command,
}
#[derive(Parser)]
enum Command {
/// Query a data source and write the result as parquet.
Query {
#[clap(flatten)]
query_opt: QueryOpt,
},
/// List available drivers and their attributes.
ListDrivers,
/// List preconfigured data sources. Useful for finding the data source name to connect to a database.
ListDataSources,
/// Read the content of a parquet and insert it into a table.
Insert {
#[clap(flatten)]
insert_opt: InsertOpt,
},
/// Generate shell completions
Completions {
#[arg(long, short = 'o', default_value = "-")]
/// Output file. Defaults to `-` which means standard output.
output: IoArg,
/// Name of the shell to generate completions for.
shell: Shell,
},
/// Executes an arbitrary SQL statement using the contents of a parquet file as input arrays.
Exec {
#[clap(flatten)]
exec_opt: ExecOpt,
},
}
#[derive(Args)]
pub struct QueryOpt {
#[clap(flatten)]
connect_opts: ConnectOpts,
/// Size of a single batch in rows. The content of the data source is written into the output
/// parquet files in batches. This way the content never needs to be materialized completely
/// in memory at once. If `--batch-size-memory` is not specified this value defaults to 65535.
/// This avoids issues with some ODBC drivers using 16Bit integers to represent batch sizes. If
/// `--batch-size-memory` is specified no other limit is applied by default. If both options are
/// specified the batch size is the largest possible one which satisfies both constraints.
#[arg(long)]
batch_size_row: Option<usize>,
/// Limits the size of a single batch. It does so by calculating the amount of memory each row
/// requires in the allocated buffers and then limits the maximum number of rows so that the
/// maximum buffer size comes as close as possible to, but does not exceed, the specified
/// amount. Default is 2GiB on 64 Bit platforms and 1GiB on 32 Bit platforms if
/// `--batch-size-row` is not specified. If `--batch-size-row` is specified no memory limit is
/// applied by default. If both options are specified the batch size is the largest possible one
/// which satisfies both constraints. This option controls the size of the buffers of data in
/// transit, and therefore the memory usage of this tool. It indirectly controls the size of the
/// row groups written to parquet (since each batch is written as one row group). It is hard to
/// make a generic statement about how much smaller the average row group will be.
/// This option allows you to specify the memory usage using SI units. So you can pass `2GiB`,
/// `600MB` and so on.
// (How the two limits combine into an effective batch size is sketched in a helper after this
// struct.)
#[arg(long)]
batch_size_memory: Option<ByteSize>,
/// Maximum number of batches in a single output parquet file. If this option is omitted or 0 a
/// single output file is produced. Otherwise each output file is closed after the maximum
/// number of batches has been written and a new one with the suffix `_n` is started, where n is
/// the index of the produced output file, starting at one for the first one. E.g. `out_01.par`,
/// `out_02.par`, ...
#[arg(long, default_value = "0")]
row_groups_per_file: u32,
/// Trade speed for memory. If set, only one fetch buffer is allocated. The fetch buffer usually
/// takes way more memory than the buffers required to write into parquet, since it contains the
/// data uncompressed and must be able to hold the largest possible value of each field, even if
/// the actual data is small. So using only one buffer instead of two usually halves the required
/// memory, yet it blocks fetching the next batch from the database until the contents of the
/// current one have been written. This can slow down the creation of parquet by up to a factor
/// of two in case writing to parquet takes just as much time as fetching from the database.
/// Usually IO to the database is the bottleneck, so the actual slowdown is likely lower, but
/// often still significant.
#[arg(long)]
sequential_fetching: bool,
/// When the size of the currently written parquet file goes beyond this threshold the current
/// row group will be finished and then the file will be closed. So the file will be somewhat
/// larger than the threshold. All further row groups will be written into new files to which
/// the threshold size limit is applied as well. If this option is not set, no size threshold is
/// applied. If the threshold is applied the first file name will have the suffix `_01`, the
/// second the suffix `_02` and so on. Therefore, the first resulting file will be called e.g.
/// `out_01.par`, if `out.par` has been specified as the output argument.
/// Also note that this option does not act as an upper bound. It acts as a lower bound for all
/// but the last file; no file, however, will be larger than this threshold by more than the
/// size of one row group. You can use the `batch_size_row` and `batch_size_memory` options to
/// control the size of the row groups. Do not expect `batch_size_memory` to be equal to the row
/// group size, though. The row group size depends on the actual data in the database and is
/// likely much smaller due to compression. Values of this option can be specified in SI units.
/// E.g. `--file-size-threshold 1GiB`.
#[arg(long)]
file_size_threshold: Option<ByteSize>,
/// You can use this to limit the transfer buffer size which is used for an individual variadic
/// sized column.
///
/// This is useful in situations where ODBC would require us to allocate a ridiculous amount of
/// memory for a single element of a row. Usually this is the case because the database schema
/// has been ill-defined (like choosing `TEXT` for a username, although a user's name is
/// unlikely to be several GB long). Another situation is that the ODBC driver is not good at
/// reporting the maximum length and therefore reports a really large value. The third option is
/// of course that your values are actually large. In this case you just need a ton of memory.
/// You can use the batch size limit though to retrieve less at once. For binary columns this is
/// a maximum element length in bytes. For text columns it depends on whether UTF-8 or UTF-16
/// encoding is used. See the documentation of the `encoding` option. In case of UTF-8 this is
/// the maximum length in bytes for an element. In case of UTF-16 the binary length is multiplied
/// by two. This allows domain experts to configure limits (roughly) in terms of how many letters
/// they expect in a column, rather than having to care about whether the command is executed on
/// Linux or Windows. The encoding of the column in the database does not matter for this setting
/// or for determining buffer sizes.
#[arg(long, default_value = "4096")]
column_length_limit: usize,
/// Default compression used by the parquet file writer.
#[arg(long, value_enum, default_value = "zstd")]
column_compression_default: CompressionVariants,
/// The `gzip`, `zstd` and `brotli` compression variants allow for specifying an explicit
/// compression level. If the selected compression variant does not support an explicit
/// compression level this option is ignored.
///
/// Default compression level for `zstd` is 3
#[arg(long)]
column_compression_level_default: Option<u32>,
/// Encoding used for character data requested from the data source.
///
/// `Utf16`: The tool will use 16Bit characters for requesting text from the data source,
/// implying the use of UTF-16 encoding. This should work well independent of the system
/// configuration, but implies additional work since text is always stored as UTF-8 in parquet.
///
/// `System`: The tool will use 8Bit characters for requesting text from the data source,
/// implying the use of the encoding from the system locale. This only works for non-ASCII
/// characters if the locale's character set is UTF-8.
///
/// `Auto`: Since on OS X and Linux the default locale's character set is always UTF-8, this
/// option is the same as `System` on non-Windows platforms. On Windows the default is `Utf16`.
#[arg(long, value_enum, default_value = "Auto", ignore_case = true)]
encoding: EncodingArgument,
/// Map `BINARY` SQL columns to `BYTE_ARRAY` instead of `FIXED_LEN_BYTE_ARRAY`. This flag has
/// been introduced in an effort to increase the compatibility of the output with Apache Spark.
#[clap(long)]
prefer_varbinary: bool,
/// Specify the fallback encoding of the parquet output column. You can pass multiple values
/// in the format `COLUMN:ENCODING`. `ENCODING` must be one of: `plain`, `delta-binary-packed`,
/// `delta-byte-array`, `delta-length-byte-array` or `rle`.
#[arg(
long,
value_parser=column_encoding_from_str,
action = ArgAction::Append
)]
parquet_column_encoding: Vec<(String, Encoding)>,
/// Tells odbc2parquet that the ODBC driver does not support binding 64-Bit integers (aka
/// S_C_BIGINT in ODBC speak). This will cause odbc2parquet to query large integers as text
/// instead and convert them to 64-Bit integers itself. Setting this flag will not affect the
/// output, but may incur a performance penalty. In case you are using an Oracle database it can
/// make queries work which did not work before, because the Oracle ODBC driver does not support
/// binding 64-Bit integers.
#[clap(long)]
driver_does_not_support_64bit_integers: bool,
/// Use this flag if you want to avoid the logical type DECIMAL in the produced output. E.g.
/// because you want to process it with polars which does not support DECIMAL. In case the scale
/// of the relational Decimal type is 0, the output will be mapped to either 32Bit or 64Bit
/// integers with logical type none. If the scale is not 0 the Decimal column will be fetched
/// as text.
#[clap(long)]
avoid_decimal: bool,
/// In case the fetched result gets split into multiple files, a suffix with a number will be
/// appended to each file name. The default suffix length is 2, leading to suffixes like `_03`.
/// In case you expect thousands of files in your output you may want to set this to, say, `4`,
/// so the zero padding produces 4 digit numbers and keeps the file names friendly for lexical
/// sorting.
#[clap(long, default_value = "2")]
suffix_length: usize,
/// In case the query comes back with a result set, but no rows, by default a file with only
/// schema information is still created. If you do not want to create any file in case the
/// result set is empty you can set this flag.
#[clap(long)]
no_empty_file: bool,
/// Name of the output parquet file. Use `-` to indicate that the output should be written to
/// standard out instead.
output: IoArg,
/// Query executed against the ODBC data source. Question marks (`?`) can be used as
/// placeholders for positional parameters. E.g. "SELECT Name FROM Employees WHERE salary > ?;".
/// Instead of passing a query verbatim, you may pass a plain dash (`-`), to indicate that the
/// query should be read from standard input. In this case the entire input until EOF will be
/// considered the query.
query: String,
/// For each placeholder question mark (`?`) in the query text one parameter must be passed at
/// the end of the command line.
parameters: Vec<String>,
}
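// A minimal, hedged sketch of how `--batch-size-row` and `--batch-size-memory` combine, as
// described in the doc comments above. This helper is illustrative only and is not used by the
// tool; the actual logic is implemented elsewhere. `bytes_per_row` stands for the per-row buffer
// size derived from the result set schema and is assumed to be non-zero.
#[allow(dead_code)]
fn effective_batch_size_sketch(
    batch_size_row: Option<usize>,
    batch_size_memory: Option<ByteSize>,
    bytes_per_row: usize,
) -> usize {
    // Documented defaults: 65535 rows, and 2GiB (64 Bit) or 1GiB (32 Bit) of buffer memory.
    let default_rows: usize = 65_535;
    let default_memory = if cfg!(target_pointer_width = "64") {
        ByteSize::gib(2)
    } else {
        ByteSize::gib(1)
    };
    // Largest number of rows whose buffers stay within the given memory limit (at least one row).
    let rows_within = |limit: ByteSize| (limit.as_u64() as usize / bytes_per_row).max(1);
    match (batch_size_row, batch_size_memory) {
        // Neither limit given: both documented defaults apply.
        (None, None) => default_rows.min(rows_within(default_memory)),
        // Only a row limit given: no memory limit is applied by default.
        (Some(rows), None) => rows,
        // Only a memory limit given: no row limit is applied by default.
        (None, Some(memory)) => rows_within(memory),
        // Both given: the largest batch size which satisfies both constraints.
        (Some(rows), Some(memory)) => rows.min(rows_within(memory)),
    }
}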
#[derive(Args)]
pub struct InsertOpt {
#[clap(flatten)]
connect_opts: ConnectOpts,
/// Encoding used for transferring character data to the database.
///
/// `Utf16`: Use 16Bit characters to send text to the database, which implies using UTF-16
/// encoding. This should work well independent of the system configuration, but requires
/// additional work since text is always stored as UTF-8 in parquet.
///
/// `System`: Use 8Bit characters to send text to the database, which implies using the
/// encoding defined by the system locale. This only works for non-ASCII characters if the
/// locale's character set is UTF-8.
///
/// `Auto`: Since on OS X and Linux the default locale's character set is always UTF-8, this
/// option is the same as `System` on non-Windows platforms. On Windows the default is `Utf16`.
#[arg(long, value_enum, default_value = "Auto", ignore_case = true)]
encoding: EncodingArgument,
/// Path to the input parquet file which is used to fill the database table with values.
input: PathBuf,
/// Name of the table to insert the values into. No precautions against SQL injection are
/// taken. The insert statement is created by the tool. It will only work if the column names
/// are the same in the parquet file and the database.
table: String,
}
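// A hedged sketch of the shape of statement the `insert` subcommand presumably generates from
// the parquet column names and the table name. `build_insert_sketch` is illustrative only and
// is not used by the tool. As the doc comment above states, names are interpolated verbatim,
// i.e. without any precautions against SQL injection.
#[allow(dead_code)]
fn build_insert_sketch(table: &str, column_names: &[&str]) -> String {
    let columns = column_names.join(", ");
    let placeholders = vec!["?"; column_names.len()].join(", ");
    // E.g. for table `t` and columns `a`, `b`: `INSERT INTO t (a, b) VALUES (?, ?)`
    format!("INSERT INTO {table} ({columns}) VALUES ({placeholders})")
}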
#[derive(Args)]
pub struct ExecOpt {
#[clap(flatten)]
connect_opts: ConnectOpts,
/// Encoding used for transferring character data to the database.
///
/// `Utf16`: Use 16Bit characters to send text to the database, which implies using UTF-16
/// encoding. This should work well independent of the system configuration, but requires
/// additional work since text is always stored as UTF-8 in parquet.
///
/// `System`: Use 8Bit characters to send text to the database, which implies using the
/// encoding defined by the system locale. This only works for non-ASCII characters if the
/// locale's character set is UTF-8.
///
/// `Auto`: Since on OS X and Linux the default locale's character set is always UTF-8, this
/// option is the same as `System` on non-Windows platforms. On Windows the default is `Utf16`.
#[arg(long, value_enum, default_value = "Auto", ignore_case = true)]
encoding: EncodingArgument,
/// Path to the input parquet file whose columns provide the arrays bound to the input
/// parameters of the statement.
/// SQL statement to execute. You can bind the columns of the parquet file to input parameters
/// of the statement. You can do this by using the column name of the parquet file surrounded by
/// question marks (`?`). E.g. `INSERT INTO table (col1, col2) VALUES (?col1?, ?col2?)`. In case
/// you want to use the `?` in a capacity different from a placeholder it must be escaped with a
/// backslash (`\?`). Backslashes must also be escaped with another backslash. Keep in mind that
/// your shell may also need escaping for backslashes. You may need four backslashes in total to
/// write a single backslash in e.g. a string literal (`\\\\`).
statement: String,
}
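// A hedged sketch (not the tool's actual parser, which lives in the `execute` module) of how a
// statement like `INSERT INTO t (a) VALUES (?a?)` could be split into SQL text with plain `?`
// placeholders plus the list of referenced parquet column names, honouring the `\?` and `\\`
// escapes described in the doc comment above. Error handling (e.g. unterminated placeholders)
// is omitted for brevity.
#[allow(dead_code)]
fn split_statement_sketch(statement: &str) -> (String, Vec<String>) {
    let mut sql = String::new();
    let mut columns = Vec::new();
    let mut chars = statement.chars();
    while let Some(c) = chars.next() {
        match c {
            // An escaped character is emitted verbatim (`\?` -> `?`, `\\` -> `\`).
            '\\' => {
                if let Some(escaped) = chars.next() {
                    sql.push(escaped);
                }
            }
            // `?name?` references a parquet column; emit a plain `?` placeholder for ODBC.
            '?' => {
                let name: String = chars.by_ref().take_while(|&c| c != '?').collect();
                columns.push(name);
                sql.push('?');
            }
            other => sql.push(other),
        }
    }
    (sql, columns)
}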
impl Cli {
/// Perform some validation logic, beyond what is possible (or sensible) to verify directly with
/// clap.
pub fn perform_extra_validation(&self) -> Result<(), Error> {
if let Command::Query { query_opt } = &self.command {
if !query_opt.output.is_file() {
if query_opt.file_size_threshold.is_some() {
bail!("file-size-threshold conflicts with specifying stdout ('-') as output.")
}
if query_opt.row_groups_per_file != 0 {
bail!("row-groups-per-file conflicts with specifying stdout ('-') as output.")
}
}
}
Ok(())
}
}
fn main() -> Result<(), Error> {
let opt = Cli::parse();
opt.perform_extra_validation()?;
let verbose = if opt.quiet {
// Log errors, but nothing else
0
} else {
// Log warnings and one additional log level for each `-v` passed in the command line.
opt.verbose as usize + 1
};
let color_choice = if opt.no_color {
ColorChoice::Never
} else {
ColorChoice::Auto
};
// Initialize logging
stderrlog::new()
.module(module_path!())
.module("odbc_api")
.quiet(false) // Even if `opt.quiet` is true, we still want to print errors
.verbosity(verbose)
.color(color_choice)
.timestamp(stderrlog::Timestamp::Second)
.init()
.unwrap();
// Initialize the ODBC environment used to create the connection to the database. We now use
// the singleton pattern with `environment`. This makes our life easier when using concurrent
// fetching, since it allows us to create an environment with a 'static lifetime. From this
// point forward in the application, we may assume that calls to `environment` are successful,
// since any error creating the environment must occur now.
let odbc_env = environment()?;
match opt.command {
Command::Query { query_opt } => {
query::query(query_opt)?;
}
Command::Insert { insert_opt } => {
insert::insert(&insert_opt)?;
}
Command::ListDrivers => {
for driver_info in odbc_env.drivers()? {
println!("{}", driver_info.description);
for (key, value) in &driver_info.attributes {
println!("\t{key}={value}");
}
println!()
}
}
Command::ListDataSources => {
let mut first = true;
for data_source_info in odbc_env.data_sources()? {
// After the first item, always place an additional newline in between.
if first {
first = false;
} else {
println!()
}
println!("Server name: {}", data_source_info.server_name);
println!("Driver: {}", data_source_info.driver);
}
}
Command::Completions { shell, output } => {
let output = output.open_as_output()?;
let mut output = output.into_write();
generate(shell, &mut Cli::command(), "odbc2parquet", &mut output);
}
Command::Exec { exec_opt } => {
execute::execute(&exec_opt)?;
}
}
Ok(())
}
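// A minimal test sketch for `Cli::perform_extra_validation`. It assumes that `ConnectOpts`
// (defined in the `connection` module) adds no required command line arguments, so that the
// argument lists below parse successfully; adjust them if that assumption does not hold.
#[cfg(test)]
mod tests {
    use super::*;
    use clap::Parser;

    #[test]
    fn file_size_threshold_conflicts_with_stdout_output() {
        // `-` selects standard output as the target, which conflicts with a size threshold.
        let cli = Cli::try_parse_from([
            "odbc2parquet",
            "query",
            "--file-size-threshold",
            "1GiB",
            "-",
            "SELECT 42",
        ])
        .expect("arguments should parse under the stated assumption");
        assert!(cli.perform_extra_validation().is_err());
    }

    #[test]
    fn row_groups_per_file_is_allowed_for_file_output() {
        let cli = Cli::try_parse_from([
            "odbc2parquet",
            "query",
            "--row-groups-per-file",
            "2",
            "out.par",
            "SELECT 42",
        ])
        .expect("arguments should parse under the stated assumption");
        assert!(cli.perform_extra_validation().is_ok());
    }
}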