use anyhow::Result;
use clap::Parser;
use rusqlite::Connection;
use sqlite2parquet::*;
use std::collections::HashMap;
use std::io::Write;
use std::path::{Path, PathBuf};
#[derive(Parser)]
pub struct Opts {
pub sqlite: PathBuf,
pub out_dir: PathBuf,
#[structopt(long)]
pub config: Option<PathBuf>,
#[structopt(long, short)]
pub table: Vec<String>,
#[structopt(long, short, default_value = "1000000")]
pub group_size: usize,
#[structopt(long)]
pub include_schema: bool,
}
fn main() -> anyhow::Result<()> {
let opts = Opts::parse();
tracing_subscriber::fmt::init();
let mut config: HashMap<String, Vec<Column>> = if let Some(path) = opts.config {
serde_yaml::from_reader(std::fs::File::open(path)?)?
} else {
HashMap::default()
};
let conn = rusqlite::Connection::open(&opts.sqlite)?;
let mut tables: Vec<String> = if !opts.table.is_empty() {
opts.table
} else if !config.is_empty() {
config.keys().cloned().collect()
} else {
let mut table_info = conn.prepare(
"SELECT name
FROM sqlite_schema
WHERE type = 'table'
AND name NOT LIKE 'sqlite_%'",
)?;
let x = table_info
.query_map([], |row| row.get::<_, String>(0))?
.collect::<rusqlite::Result<_>>()?;
x
};
if opts.include_schema {
tables.push("sqlite_schema".to_string());
}
std::fs::create_dir_all(&opts.out_dir)?;
for table in tables {
let out = opts.out_dir.join(format!("{}.parquet", &table));
let config = config.remove(&table);
mk_table(&conn, &table, &out, config, opts.group_size)?;
}
Ok(())
}
/// Heading line printed above the per-column schema listing.
const COLUMN_HEADER: &str = "Column Physical type Encoding Logical type SQL";
fn mk_table(
conn: &Connection,
table: &str,
outpath: &Path,
config: Option<Vec<Column>>,
group_size: usize,
) -> Result<()> {
print!("Counting rows...");
std::io::stdout().flush()?;
let n_rows: u64 = if let Some(config) = config.as_ref() {
conn.query_row(
&format!("SELECT COUNT(1) FROM ({})", config[0].query),
[],
|row| row.get(0),
)?
} else {
conn.query_row(&format!("SELECT COUNT(1) FROM {}", table), [], |row| {
row.get(0)
})?
};
println!(" {n_rows}");
let cols: Vec<Column> = if let Some(cols) = config {
println!(" {}", COLUMN_HEADER);
for col in &cols {
println!(" {}", col);
}
cols
} else {
println!("Inferring schema for {table}...");
println!(" {}", COLUMN_HEADER);
let t_start = std::time::Instant::now();
let cols = sqlite2parquet::infer_schema(conn, table)?
.inspect(|col| {
if let Ok(col) = col {
println!(" {}", col)
}
})
.collect::<Result<Vec<_>>>()?;
println!("Inferred schema in {:?}", t_start.elapsed());
cols
};
let total = Progress {
n_cols: cols.len() as u64,
n_rows,
n_groups: (n_rows + group_size as u64 - 1) / group_size as u64,
};
let group_size = group_size.max(1);
println!("Group size: {}", group_size);
let outfile = std::fs::File::create(outpath)?;
let t_start = std::time::Instant::now();
let metadata = sqlite2parquet::write_table_with_progress(
conn,
table,
&cols,
outfile,
group_size,
|written| print_progress(written, total, group_size, t_start.elapsed(), false),
)?;
let final_prog = Progress {
n_cols: total.n_cols,
n_rows: metadata.num_rows as u64,
n_groups: metadata.row_groups.len() as u64,
};
print_progress(final_prog, final_prog, group_size, t_start.elapsed(), true)?;
summarize(&cols, metadata);
Ok(())
}
/// Redraws the single-line progress indicator on stderr.
///
/// * `written` - progress so far. NOTE(review): `written.n_cols` is treated as
///   the number of columns already written for the row group in flight —
///   confirm against `sqlite2parquet`.
/// * `total` - expected totals, used as the denominator.
/// * `group_size` - row group size, used to estimate the rows in flight.
/// * `time` - time elapsed since writing started.
/// * `finished` - when true, omits the "of N" part and ends the line with a
///   newline instead of "...".
///
/// # Errors
///
/// Returns an error if writing to stderr fails.
fn print_progress(
    written: Progress,
    total: Progress,
    group_size: usize,
    time: std::time::Duration,
    finished: bool,
) -> Result<()> {
    use crossterm::*;

    /// Percentage of the work done, counting the row group in flight in
    /// proportion to the columns already written for it.
    fn percent_complete(
        rows_done: u64,
        cols_done: u64,
        rows_total: u64,
        cols_total: u64,
        group_size: usize,
    ) -> f64 {
        // Nothing to write means nothing is outstanding; avoids 0/0 = NaN.
        if rows_total == 0 {
            return 100.0;
        }
        // The actual row count can exceed the up-front estimate, so the
        // subtraction must not underflow.
        let in_flight = rows_total.saturating_sub(rows_done).min(group_size as u64) as f64;
        let partial = if cols_total == 0 {
            0.0
        } else {
            in_flight * cols_done as f64 / cols_total as f64
        };
        ((rows_done as f64 + partial) / rows_total as f64 * 100.0).min(100.0)
    }

    let pc = percent_complete(
        written.n_rows,
        written.n_cols,
        total.n_rows,
        total.n_cols,
        group_size,
    );
    let of_total = if finished {
        String::new()
    } else {
        format!(" of {}", total.n_rows)
    };
    let plural = if written.n_groups == 1 { "" } else { "s" };
    let tail = if finished { "\n" } else { "..." };
    let line = format!(
        "[{:.2}%] Wrote {}{} rows as {} group{} in {:.1?}{}",
        pc, written.n_rows, of_total, written.n_groups, plural, time, tail,
    );

    let out = std::io::stderr();
    let mut out = out.lock();
    // Return to the start of the line and overwrite the previous status.
    out.queue(cursor::MoveToColumn(1))?
        .queue(terminal::Clear(terminal::ClearType::CurrentLine))?
        .queue(style::Print(line))?
        .flush()?;
    Ok(())
}
/// Prints a size summary of the written file to stdout: one total line,
/// then one line per column with its size and share of the total.
///
/// * `cols` - the exported columns, in the order they were written.
/// * `metadata` - file metadata returned by the Parquet writer.
fn summarize(cols: &[Column], metadata: parquet::format::FileMetaData) {
    /// Formats a byte count as right-aligned whole KiB with thousands separators.
    fn fmt_bytes(bytes: i64) -> String {
        use thousands::Separable;
        format!("{:>9} KiB", (bytes / 1024).separate_with_commas())
    }

    // NOTE(review): the Parquet spec describes `RowGroup.total_byte_size` as the
    // uncompressed size, while the per-column figures below are compressed
    // sizes — confirm whether the two are meant to be compared.
    let mut total_bytes: i64 = 0;
    let mut by_col_bytes = vec![0i64; cols.len()];
    for group in &metadata.row_groups {
        total_bytes += group.total_byte_size;
        // Column chunks are paired with `cols` by position.
        for (chunk, col_bytes) in group.columns.iter().zip(&mut by_col_bytes) {
            if let Some(meta) = &chunk.meta_data {
                *col_bytes += meta.total_compressed_size;
            }
        }
    }

    println!("Total {}", fmt_bytes(total_bytes));
    for (col, col_bytes) in cols.iter().zip(by_col_bytes) {
        // An empty file has a zero total; report 0% rather than NaN.
        let share = if total_bytes == 0 {
            0.0
        } else {
            col_bytes as f64 / total_bytes as f64 * 100.0
        };
        println!(
            " {:20} {} ({:>2.0}%)",
            col.name,
            fmt_bytes(col_bytes),
            share,
        );
    }
}