use anyhow::{bail, Error};
use log::info;
use odbc_api::{buffers::TextRowSet, Connection, Cursor, Environment, IntoParameter};
use std::{
fs::File,
io::{stdin, stdout, Read, Write},
path::PathBuf,
};
use structopt::{clap::ArgGroup, StructOpt};
// Top-level command line interface: a global verbosity switch plus one subcommand.
//
// NOTE(review): this type-level comment is a plain `//` comment on purpose. structopt turns
// `///` doc comments into help/about text, so a doc comment here would presumably replace the
// `about` text currently shown by `--help` -- confirm before converting it.
#[derive(StructOpt)]
struct Cli {
    /// Increase logging verbosity. May be repeated (-v, -vv, -vvv, ...)
    #[structopt(short = "v", long, parse(from_occurrences))]
    verbose: usize,
    // The subcommand selected by the user; dispatched on in `main`.
    #[structopt(subcommand)]
    command: Command,
}
// Subcommands of the tool. The `///` comments on the variants become the description of the
// respective subcommand in `--help`; the comment on the enum itself is deliberately a plain
// `//` comment so that it does not feed into the generated help.
#[derive(StructOpt)]
enum Command {
    /// Query a data source and write the result as CSV
    Query {
        #[structopt(flatten)]
        query_opt: QueryOpt,
    },
    /// Read CSV data and insert it into a table of the data source
    Insert {
        #[structopt(flatten)]
        insert_opt: InsertOpt,
    },
    /// List the ODBC drivers known to the environment
    ListDrivers,
}
// Options describing how to connect to the ODBC data source. Flattened into the options of
// every subcommand which needs a connection. Either `dsn` or `connection_string` has to be
// set; `open_connection` prefers `dsn` if it is present.
//
// NOTE(review): plain `//` comment on purpose. This struct is flattened into other commands,
// and a `///` doc comment here may end up as their `about` text -- confirm before converting.
#[derive(StructOpt)]
struct ConnectOpts {
    /// ODBC connection string used to connect to the data source. Alternative to --dsn
    #[structopt(long, short = "c")]
    connection_string: Option<String>,
    /// ODBC data source name (DSN) to connect to. Can not be combined with --connection-string
    #[structopt(long, conflicts_with = "connection-string")]
    dsn: Option<String>,
    /// User name used to log in if connecting via --dsn. An empty user name is used if omitted
    #[structopt(long, short = "u", env = "ODBC_USER")]
    user: Option<String>,
    /// Password used to log in if connecting via --dsn. An empty password is used if omitted
    #[structopt(long, short = "p", env = "ODBC_PASSWORD", hide_env_values = true)]
    password: Option<String>,
}
// Arguments of the `query` subcommand.
//
// NOTE(review): plain `//` comment on purpose, see the reasoning about flattened structs and
// structopt's use of `///` comments as help text -- confirm before converting.
#[derive(StructOpt)]
struct QueryOpt {
    #[structopt(flatten)]
    connect_opts: ConnectOpts,
    /// Number of rows fetched from the data source in one batch
    #[structopt(long, default_value = "5000")]
    batch_size: u32,
    /// Path of the CSV file to write. The result is written to standard output if omitted
    #[structopt(long, short = "o")]
    output: Option<PathBuf>,
    /// Text of the query to execute against the data source
    query: String,
    /// Values passed, in the order given, as parameters of the query
    parameters: Vec<String>,
}
// Arguments of the `insert` subcommand.
//
// NOTE(review): plain `//` comment on purpose, see the reasoning about flattened structs and
// structopt's use of `///` comments as help text -- confirm before converting.
#[derive(StructOpt)]
struct InsertOpt {
    #[structopt(flatten)]
    connect_opts: ConnectOpts,
    /// Maximum number of rows sent to the data source in one batch. Must be at least 1
    #[structopt(long, default_value = "5000")]
    batch_size: u32,
    /// Path of the CSV file to read. The data is read from standard input if omitted
    #[structopt(long, short = "i")]
    input: Option<PathBuf>,
    /// Name of the table to insert into. The column names are taken from the CSV header
    table: String,
}
fn main() -> Result<(), Error> {
Cli::clap().group(
ArgGroup::with_name("source")
.required(true)
.args(&["dsn", "connection-string"]),
);
let opt = Cli::from_args();
stderrlog::new()
.module(module_path!())
.module("odbc_api")
.quiet(false)
.verbosity(opt.verbose)
.timestamp(stderrlog::Timestamp::Second)
.init()?;
let mut environment = unsafe { Environment::new() }?;
match opt.command {
Command::Query { query_opt } => {
query(&environment, &query_opt)?;
}
Command::Insert { insert_opt } => {
if insert_opt.batch_size == 0 {
bail!("batch size, must be at least 1");
}
insert(&environment, &insert_opt)?;
}
Command::ListDrivers => {
for driver_info in environment.drivers()? {
println!("{:#?}", driver_info);
}
}
}
Ok(())
}
/// Opens a connection to the data source described by `opt`.
///
/// If a DSN is given it is used together with the user name and password, each of which
/// defaults to the empty string. Otherwise the connection string is used.
///
/// # Panics
///
/// Panics if `opt` holds neither a DSN nor a connection string.
///
/// # Errors
///
/// Forwards the error of the underlying `odbc_api` connect call.
fn open_connection<'e>(
    environment: &'e Environment,
    opt: &ConnectOpts,
) -> Result<Connection<'e>, odbc_api::Error> {
    match opt.dsn.as_deref() {
        Some(data_source_name) => {
            let user = opt.user.as_deref().unwrap_or("");
            let password = opt.password.as_deref().unwrap_or("");
            environment.connect(data_source_name, user, password)
        }
        None => {
            let connection_string = opt
                .connection_string
                .as_deref()
                .expect("Connection string must be specified, if dsn is not.");
            environment.connect_with_connection_string(connection_string)
        }
    }
}
/// Executes the query described by `opt` and writes its result set as CSV.
///
/// The CSV is written to `opt.output` if set, otherwise to standard output. The first record
/// holds the column names, each following record one row of the result set. Cells for which
/// the fetch buffer holds no value (presumably SQL `NULL`) are written as empty fields.
///
/// Note that the output file is created before the connection is opened, so it exists (empty)
/// even if connecting or executing the query fails.
///
/// # Errors
///
/// Fails if the output file can not be created, the connection can not be opened, executing
/// the query or fetching from it fails, or writing or flushing the CSV output fails.
fn query(environment: &Environment, opt: &QueryOpt) -> Result<(), Error> {
    let QueryOpt {
        connect_opts,
        output,
        parameters,
        query,
        batch_size,
    } = opt;

    // `hold_stdout` is declared outside of the `if` so that it outlives the lock taken on it.
    let hold_stdout;
    let out: Box<dyn Write> = if let Some(path) = output {
        Box::new(File::create(path)?)
    } else {
        hold_stdout = stdout();
        Box::new(hold_stdout.lock())
    };
    let mut writer = csv::Writer::from_writer(out);

    let connection = open_connection(environment, connect_opts)?;

    // Convert the command line arguments into parameters for the query.
    let params: Vec<_> = parameters
        .iter()
        .map(|param| param.into_parameter())
        .collect();

    match connection.execute(query, params.as_slice())? {
        Some(cursor) => {
            // Header line: the names of the columns in the result set.
            let headline: Vec<String> = cursor.column_names()?.collect::<Result<_, _>>()?;
            writer.write_record(headline)?;

            // Rows are fetched in batches of up to `batch_size` rows into a text buffer.
            let mut buffers = TextRowSet::for_cursor(*batch_size, &cursor)?;
            let mut row_set_cursor = cursor.bind_buffer(&mut buffers)?;

            let mut num_batch = 0;
            while let Some(buffer) = row_set_cursor.fetch()? {
                num_batch += 1;
                info!(
                    "Fetched batch {} with {} rows.",
                    num_batch,
                    buffer.num_rows()
                );
                for row_index in 0..buffer.num_rows() {
                    // Missing values are represented by an empty field.
                    let record = (0..buffer.num_cols())
                        .map(|col_index| buffer.at(col_index, row_index).unwrap_or(&[]));
                    writer.write_record(record)?;
                }
            }
        }
        None => {
            eprintln!("Query came back empty (not even a schema has been returned). No output has been created.");
        }
    };

    // Flush explicitly: relying on the writer being dropped would silently discard any I/O
    // error raised while writing out the remaining buffered data.
    writer.flush()?;
    Ok(())
}
/// Reads CSV data and inserts it into the table named in `insert_opt`.
///
/// The CSV is read from `insert_opt.input` if set, otherwise from standard input. The header
/// line determines the names of the columns to insert into. Rows are sent to the data source
/// in batches of at most `insert_opt.batch_size` rows; empty CSV fields are passed on as
/// missing values (`None`) rather than as empty strings.
///
/// NOTE: the table name and the column names are interpolated verbatim into the statement
/// text, since identifiers can not be bound as parameters. Only use trusted input for them.
///
/// # Errors
///
/// Fails if the input can not be opened or parsed, a column name is not valid UTF-8, the
/// connection can not be opened, or preparing or executing the insert statement fails.
fn insert(environment: &Environment, insert_opt: &InsertOpt) -> Result<(), Error> {
    let InsertOpt {
        input,
        connect_opts,
        table,
        batch_size,
    } = insert_opt;

    // `hold_stdin` is declared outside of the `if` so that it outlives the lock taken on it.
    let hold_stdin;
    let input: Box<dyn Read> = if let Some(path) = input {
        Box::new(File::open(path)?)
    } else {
        hold_stdin = stdin();
        Box::new(hold_stdin.lock())
    };
    let mut reader = csv::Reader::from_reader(input);

    let connection = open_connection(environment, connect_opts)?;

    // Build `INSERT INTO <table> (<col>, ...) VALUES (?, ...);` with one placeholder per
    // column named in the CSV header.
    let headline = reader.byte_headers()?;
    let column_names: Vec<&str> = headline
        .iter()
        .map(std::str::from_utf8)
        .collect::<Result<_, _>>()?;
    let columns = column_names.join(", ");
    let values = vec!["?"; column_names.len()].join(", ");
    let statement_text = format!("INSERT INTO {} ({}) VALUES ({});", table, columns, values);
    info!("Insert statement Text: {}", statement_text);

    let mut statement = connection.prepare(&statement_text)?;

    // Log how the data source describes each parameter. Parameter numbers start at 1.
    // NOTE(review): the `as u16` cast would truncate for more than `u16::MAX` columns.
    for parameter_number in 1..=headline.len() {
        let desc = statement.describe_param(parameter_number as u16)?;
        info!("Column {} identified as: {:?}", parameter_number, desc);
    }

    // Text buffer holding up to `batch_size` rows; every column starts with a maximum string
    // length of zero.
    let mut buffer = TextRowSet::new(*batch_size, (0..headline.len()).map(|_| 0));

    for try_record in reader.into_byte_records() {
        // Send the batch to the data source once the buffer is full, then start a new one.
        if buffer.num_rows() == *batch_size as usize {
            statement.execute(&buffer)?;
            buffer.clear();
        }

        let record = try_record?;
        buffer.append(
            record
                .iter()
                .map(|field| if field.is_empty() { None } else { Some(field) }),
        );
    }

    // Send the remaining rows. If the input held no records at all the buffer is empty and
    // there is nothing to execute.
    if buffer.num_rows() > 0 {
        statement.execute(&buffer)?;
    }
    Ok(())
}