use std::fs::File;
use std::io::{self, Read};
use std::num::NonZeroUsize;
use std::path::PathBuf;
use anyhow::{Context, Result};
use arrow::datatypes::DataType;
use bytes::Bytes;
use clap::Parser;
use orc_rust::projection::ProjectionMask;
use orc_rust::reader::metadata::read_metadata;
use orc_rust::reader::ChunkReader;
use orc_rust::ArrowReaderBuilder;
use crate::common::{create_csv_writer, create_json_writer, OutputFormat, OutputWriter};
/// Export ORC data to CSV or JSON format
// Command-line arguments for the export command, parsed by clap's derive API.
// Field order matters: `file` carries no `#[arg]` attribute, so it is the
// single positional argument; every other field is a named flag.
#[derive(Debug, Parser)]
#[command(about = "Export ORC data to CSV or JSON format")]
pub struct Args {
/// Path to the ORC file to export, or "-" to read the file from standard input
file: String,
/// Write the exported data to this file instead of standard output
#[arg(short, long)]
output: Option<PathBuf>,
/// Output format
#[arg(value_enum, short, long, default_value_t = OutputFormat::Csv)]
format: OutputFormat,
/// Export at most N rows (all rows when omitted)
// `NonZeroUsize` makes clap reject `--num-rows 0` at parse time.
#[arg(short, long, value_name = "N")]
num_rows: Option<NonZeroUsize>,
/// Comma-separated names of the top-level columns to export (all columns when omitted)
#[arg(short, long, value_delimiter = ',')]
columns: Option<Vec<String>>,
/// Number of rows requested from the ORC reader per batch
#[arg(long, default_value_t = 8192)]
batch_size: usize,
}
/// Runs the export described by `args`.
///
/// The ORC input is taken from the file named by `args.file`, or, when that
/// name is `-`, from standard input (which is read to the end into memory
/// before exporting). The exported rows go to `args.output` when it is set
/// and to standard output otherwise.
///
/// # Errors
///
/// Returns an error if the input cannot be opened or read, if the output file
/// cannot be created, or if the export itself fails.
pub fn run(args: Args) -> Result<()> {
    // The input is acquired *before* the output is opened: creating the output
    // file truncates it, and a bad input path must not destroy an existing file.
    if args.file == "-" {
        let mut buf = Vec::new();
        io::stdin().read_to_end(&mut buf).context("reading stdin")?;
        let output = open_output(args.output.as_deref())?;
        run_export(Bytes::from(buf), &args, output)
    } else {
        let file = File::open(&args.file).with_context(|| format!("opening {}", args.file))?;
        let output = open_output(args.output.as_deref())?;
        run_export(file, &args, output)
    }
}

/// Opens the export destination: the file at `path` (created or truncated), or
/// locked standard output when `path` is `None`.
fn open_output(path: Option<&std::path::Path>) -> Result<Box<dyn io::Write>> {
    let sink: Box<dyn io::Write> = match path {
        Some(path) => Box::new(
            File::create(path).with_context(|| format!("creating {}", path.display()))?,
        ),
        None => Box::new(io::stdout().lock()),
    };
    Ok(sink)
}
/// Streams the ORC data in `source` to `output` in the format selected by
/// `args.format`.
///
/// When `args.columns` is set, only the named top-level columns are read.
/// Among the selected columns, binary columns are never exported and decimal
/// columns are exported only for CSV output; such columns are dropped from the
/// projection without an error. When `args.columns` is not set, no projection
/// is applied. At most `args.num_rows` rows are written when that limit is set.
///
/// # Errors
///
/// Returns an error if `args.batch_size` is zero, if a requested column does
/// not exist in the file, if the ORC metadata or data cannot be read, or if
/// writing to or finishing the output fails.
fn run_export<R: ChunkReader>(
    mut source: R,
    args: &Args,
    output: Box<dyn io::Write>,
) -> Result<()> {
    // Batches of zero rows can never advance through the file, so reject the
    // setting up front instead of handing it to the reader.
    anyhow::ensure!(args.batch_size > 0, "batch size must be greater than zero");

    let projection = if let Some(selected) = &args.columns {
        let metadata = read_metadata(&mut source)?;
        let root_children = metadata.root_data_type().children();

        // Report every unknown name at once, sorted for a stable message.
        let mut missing: Vec<&str> = selected
            .iter()
            .filter(|name| !root_children.iter().any(|c| c.name() == name.as_str()))
            .map(|name| name.as_str())
            .collect();
        if !missing.is_empty() {
            missing.sort();
            anyhow::bail!("unknown column(s): {}", missing.join(", "));
        }

        // Keep the file's column order; skip selected columns whose type is
        // not exported for the chosen output format.
        let cols: Vec<usize> = root_children
            .iter()
            .filter(|nc| {
                let is_selected = selected.iter().any(|c| nc.name().eq(c));
                if !is_selected {
                    return false;
                }
                match nc.data_type().to_arrow_data_type() {
                    DataType::Binary => false,
                    DataType::Decimal128(_, _) | DataType::Decimal256(_, _) => {
                        matches!(args.format, OutputFormat::Csv)
                    }
                    _ => true,
                }
            })
            .map(|nc| nc.data_type().column_index())
            .collect();
        Some(ProjectionMask::roots(metadata.root_data_type(), cols))
    } else {
        None
    };

    let mut builder = ArrowReaderBuilder::try_new(source)?.with_batch_size(args.batch_size);
    if let Some(proj) = projection {
        builder = builder.with_projection(proj);
    }
    let reader = builder.build();

    let mut writer: OutputWriter<Box<dyn io::Write>, _> = match args.format {
        OutputFormat::Json => OutputWriter::Json(create_json_writer(output)),
        OutputFormat::Csv => OutputWriter::Csv(create_csv_writer(output)),
    };

    // `num_rows` is non-zero when present, so `remaining` always starts above zero.
    let mut remaining = args.num_rows.map(NonZeroUsize::get).unwrap_or(usize::MAX);
    for batch in reader {
        let mut batch = batch?;
        if remaining < batch.num_rows() {
            // Trim the final batch so that exactly the requested row count is written.
            batch = batch.slice(0, remaining);
        }
        writer.write(&batch)?;
        remaining = remaining.saturating_sub(batch.num_rows());
        // Stop as soon as the limit is met, before the reader decodes another
        // batch that would only be thrown away.
        if remaining == 0 {
            break;
        }
    }
    writer.finish().context("closing writer")?;
    Ok(())
}