use anyhow::{Context, Result};
use bytes::Bytes;
use parx_rs::{Compression, ParxWriter};
use tracing::info;
use url::Url;
use super::storage;
/// Settings accepted by [`run_with_options`] when building a PARX sidecar.
#[derive(Debug, Clone)]
pub struct BuildOptions {
    /// How compression should be configured on the writer before the
    /// sidecar bytes are produced.
    pub compression: CompressionOption,
}
impl Default for BuildOptions {
fn default() -> Self {
Self {
compression: CompressionOption::None,
}
}
}
/// Compression choice requested by the caller.
///
/// The variants map onto what [`run_with_options`] does with the writer
/// before finishing it.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum CompressionOption {
    /// Do not touch the writer's compression configuration.
    None,
    /// Call the writer's `auto_compress()`; how the algorithm is picked is
    /// decided by the writer, not by this module.
    Auto,
    /// Pass this specific algorithm to the writer's `set_compression()`.
    Algorithm(Compression),
}
/// Parses a compression setting from text, ignoring letter case.
///
/// Recognized spellings:
/// - `none`, `off`, `false` → [`CompressionOption::None`]
/// - `auto` → [`CompressionOption::Auto`]
/// - `zstd`, `zstandard` → `Algorithm(Compression::Zstd)`
/// - `lz4` → `Algorithm(Compression::Lz4)`
/// - `gzip`, `gz` → `Algorithm(Compression::Gzip)`
///
/// Anything else yields an `Err` with a message that echoes the original
/// (not lowercased) input and lists the valid options.
impl std::str::FromStr for CompressionOption {
    type Err = String;

    fn from_str(s: &str) -> std::result::Result<Self, Self::Err> {
        // Lowercase once so the match below is case-insensitive.
        let lowered = s.to_lowercase();

        let parsed = match lowered.as_str() {
            "none" | "off" | "false" => Self::None,
            "auto" => Self::Auto,
            "zstd" | "zstandard" => Self::Algorithm(Compression::Zstd),
            "lz4" => Self::Algorithm(Compression::Lz4),
            "gzip" | "gz" => Self::Algorithm(Compression::Gzip),
            _ => {
                return Err(format!(
                    "unknown compression: {}. Valid options: none, auto, zstd, lz4, gzip",
                    s
                ));
            }
        };

        Ok(parsed)
    }
}
/// Builds a PARX sidecar for the Parquet file at `input` and writes it out.
///
/// * `input` - Location of the source Parquet file. When
///   [`is_remote_path`] accepts it, the footer is obtained through
///   `storage::extract_parquet_footer`; otherwise the file is read with
///   `ParxWriter::from_parquet_file`.
/// * `output` - Destination for the sidecar. When `None`, the destination
///   is `input` with `.parx` appended.
/// * `options` - Build settings; currently only the compression choice.
///
/// On success a short summary (destination, source size, footer size and,
/// if the writer reports one, the compression) is printed to stdout.
///
/// # Errors
///
/// Returns an error if the footer cannot be extracted, the local Parquet
/// file cannot be read, or the sidecar cannot be written.
pub async fn run_with_options(
    input: &str,
    output: Option<&str>,
    options: BuildOptions,
) -> Result<()> {
    info!("Building PARX sidecar for: {}", input);

    // Fall back to "<input>.parx" when the caller gave no destination.
    let destination = output.map_or_else(|| format!("{}.parx", input), str::to_owned);

    let (mut sidecar, source_size, footer_size) = if is_remote_path(input) {
        let (footer_bytes, total_len) = storage::extract_parquet_footer(input)
            .await
            .context("Failed to extract Parquet footer")?;
        // Measure the footer before ownership of the bytes moves into the writer.
        let footer_len = footer_bytes.len();

        let mut remote = ParxWriter::new();
        remote.set_source_uri(input);
        remote.set_source_size(total_len);
        remote.set_footer_owned(footer_bytes);
        (remote, total_len, footer_len)
    } else {
        let local = ParxWriter::from_parquet_file(input)
            .with_context(|| format!("Failed to read local Parquet file: {}", input))?;
        let total_len = local.source_size();
        let footer_len = local.footer_size();
        (local, total_len, footer_len)
    };

    info!(
        "Extracted footer: {} bytes from {} byte file",
        footer_size, source_size
    );

    // Apply the requested compression; `None` leaves the writer as it is.
    match options.compression {
        CompressionOption::Algorithm(algorithm) => {
            sidecar.set_compression(algorithm);
        }
        CompressionOption::Auto => {
            sidecar.auto_compress();
        }
        CompressionOption::None => {}
    }

    // Query the effective compression before `finish()` is called on the writer.
    let effective_compression = sidecar.compression();
    let encoded = sidecar.finish();
    info!("Generated PARX file: {} bytes", encoded.len());

    storage::write_all(&destination, Bytes::from(encoded))
        .await
        .context("Failed to write PARX file")?;

    println!("Created: {}", destination);
    println!(" Source size: {} bytes", source_size);
    println!(" Footer size: {} bytes", footer_size);

    if let Some(compression) = effective_compression {
        // Mark the algorithm as auto-selected when the caller asked for `Auto`.
        let label = if matches!(options.compression, CompressionOption::Auto) {
            format!("{} (auto)", compression)
        } else {
            compression.to_string()
        };
        println!(" Compression: {}", label);
    }

    Ok(())
}
/// Reports whether `path` should be treated as a remote location.
///
/// Returns `true` only when `path` parses as a URL and its scheme is longer
/// than one character; parse failures yield `false`.
fn is_remote_path(path: &str) -> bool {
    match Url::parse(path) {
        // NOTE(review): the length check presumably keeps Windows drive
        // letters such as `C:` from being taken for URL schemes — confirm.
        Ok(parsed) => parsed.scheme().len() > 1,
        Err(_) => false,
    }
}