mod batch_size_limit;
mod binary;
mod boolean;
mod column_strategy;
mod conversion_strategy;
mod current_file;
mod date;
mod decimal;
mod fetch_batch;
mod identical;
mod parquet_writer;
mod text;
mod time;
mod timestamp;
mod timestamp_precision;
mod timestamp_tz;
use anyhow::Error;
use fetch_batch::{fetch_strategy, FetchBatch};
use io_arg::IoArg;
use log::debug;
use odbc_api::{Cursor, IntoParameter};
use std::io::{stdin, Read};
use self::{
batch_size_limit::{BatchSizeLimit, FileSizeLimit},
column_strategy::{ColumnStrategy, MappingOptions},
conversion_strategy::ConversionStrategy,
parquet_writer::{parquet_output, ParquetWriterOptions},
};
use crate::{connection::open_connection, QueryOpt};
pub fn query(opt: QueryOpt) -> Result<(), Error> {
let QueryOpt {
connect_opts,
output,
parameters,
query,
batch_size_row,
batch_size_memory,
row_groups_per_file,
sequential_fetching,
file_size_threshold,
encoding,
prefer_varbinary,
column_compression_default,
column_compression_level_default,
parquet_column_encoding,
avoid_decimal,
driver_does_not_support_64bit_integers,
suffix_length,
no_empty_file,
column_length_limit,
} = opt;
let batch_size = BatchSizeLimit::new(batch_size_row, batch_size_memory);
let file_size = FileSizeLimit::new(row_groups_per_file, file_size_threshold);
let query = query_statement_text(query)?;
let params: Vec<_> = parameters
.iter()
.map(|param| param.as_str().into_parameter())
.collect();
let odbc_conn = open_connection(&connect_opts)?;
let db_name = odbc_conn.database_management_system_name()?;
debug!(name:display = db_name; "Database Management System");
let parquet_format_options = ParquetWriterOptions {
column_compression_default: column_compression_default
.to_compression(column_compression_level_default)?,
column_encodings: parquet_column_encoding,
file_size,
suffix_length,
no_empty_file,
};
let mapping_options = MappingOptions {
db_name: &db_name,
use_utf16: encoding.use_utf16(),
prefer_varbinary,
avoid_decimal,
driver_does_support_i64: !driver_does_not_support_64bit_integers,
column_length_limit,
};
if let Some(cursor) = odbc_conn
.into_cursor(&query, params.as_slice(), None)
.map_err(odbc_api::Error::from)?
{
cursor_to_parquet(
cursor,
output,
batch_size,
!sequential_fetching,
mapping_options,
parquet_format_options,
)?;
} else {
eprintln!(
"Query came back empty (not even a schema has been returned). No file has been created"
);
}
Ok(())
}
fn query_statement_text(query: String) -> Result<String, Error> {
Ok(if query == "-" {
let mut buf = String::new();
stdin().lock().read_to_string(&mut buf)?;
buf
} else {
query
})
}
fn cursor_to_parquet(
mut cursor: impl Cursor + Send + 'static,
path: IoArg,
batch_size: BatchSizeLimit,
concurrent_fetching: bool,
mapping_options: MappingOptions,
parquet_format_options: ParquetWriterOptions,
) -> Result<(), Error> {
let table_strategy = ConversionStrategy::new(&mut cursor, mapping_options)?;
let parquet_schema = table_strategy.parquet_schema();
let writer = parquet_output(path, parquet_schema.clone(), parquet_format_options)?;
let fetch_strategy: Box<dyn FetchBatch> =
fetch_strategy(concurrent_fetching, cursor, &table_strategy, batch_size)?;
table_strategy.block_cursor_to_parquet(fetch_strategy, writer)?;
Ok(())
}