use anyhow::{Context, Result};
use bytes::Bytes;
use futures::stream::{self, StreamExt, TryStreamExt};
use parx_rs::ParxWriter;
use parx_rs::{ParxBundleReader, ParxBundleWriter, BUNDLE_FILENAME};
use std::time::{Duration, UNIX_EPOCH};
use tracing::info;
use super::storage;
/// Maximum number of in-flight concurrent storage operations per streamed batch.
const MAX_CONCURRENCY: usize = 10;

/// Controls whether Parquet page indexes are embedded in the bundle,
/// and the byte budgets that limit how much page-index data is stored.
#[derive(Debug, Clone, Copy)]
pub struct PageIndexPolicy {
    // When false, page indexes are never extracted or stored.
    pub enabled: bool,
    // Per-file cap: a file's page indexes larger than this are dropped.
    pub max_per_file_bytes: usize,
    // Bundle-wide cap on total stored page-index bytes.
    pub max_total_bytes: usize,
}
/// Build a bundle from every `.parquet` file under `input_dir` and write it
/// to `output` (default: `<input_dir>/BUNDLE_FILENAME`).
///
/// Footers — and, when `policy.enabled`, page indexes subject to the
/// per-file and bundle-wide byte caps — are extracted with up to
/// `MAX_CONCURRENCY` files in flight, then packed into one bundle.
pub async fn build(input_dir: &str, output: Option<&str>, policy: PageIndexPolicy) -> Result<()> {
    info!("Building bundle from directory: {}", input_dir);
    // Default the output path to the well-known bundle filename inside input_dir.
    let output_path = match output {
        Some(p) => p.to_string(),
        None => {
            let dir = input_dir.trim_end_matches('/');
            format!("{}/{}", dir, BUNDLE_FILENAME)
        }
    };
    let parquet_files = storage::list_files(input_dir, Some(".parquet"))
        .await
        .context("Failed to list Parquet files")?;
    if parquet_files.is_empty() {
        anyhow::bail!("No Parquet files found in {}", input_dir);
    }
    info!("Found {} Parquet files", parquet_files.len());
    let mut writer = ParxBundleWriter::new();
    // Extract footers (and optionally page indexes) concurrently.
    // `buffer_unordered` yields results in completion order, so `results`
    // is not in listing order.
    let results: Vec<(String, u64, Bytes, Option<Bytes>)> = stream::iter(parquet_files.iter())
        .map(|parquet_path| async move {
            let (footer_bytes, source_size) =
                storage::extract_parquet_footer(parquet_path)
                    .await
                    .with_context(|| format!("Failed to extract footer from {}", parquet_path))?;
            let relative_path = get_relative_path(input_dir, parquet_path);
            info!(
                "Extracted: {} ({} bytes footer)",
                relative_path,
                footer_bytes.len()
            );
            // Page indexes are fetched eagerly here; the size-cap policy is
            // applied later, once all results are collected.
            let page_index_bytes = if policy.enabled {
                let page_index_bytes =
                    storage::extract_parquet_page_indexes(parquet_path, &footer_bytes)
                        .await
                        .with_context(|| {
                            format!("Failed to extract page indexes from {}", parquet_path)
                        })?;
                Some(page_index_bytes)
            } else {
                None
            };
            Ok::<_, anyhow::Error>((relative_path, source_size, footer_bytes, page_index_bytes))
        })
        .buffer_unordered(MAX_CONCURRENCY)
        .try_collect()
        .await?;
    // Counters for the summary printed below.
    let mut total_footer_bytes = 0usize;
    let mut total_page_index_bytes = 0usize;
    let mut page_index_files_included = 0usize;
    let mut page_index_files_skipped_per_file = 0usize;
    let mut page_index_files_skipped_total_cap = 0usize;
    for (relative_path, source_size, footer_bytes, page_index_bytes) in results {
        total_footer_bytes += footer_bytes.len();
        // Apply the page-index policy: empty payloads and oversized per-file
        // payloads are dropped; payloads that would exceed the bundle-wide
        // cap are skipped (later, smaller payloads may still fit).
        // NOTE(review): because `results` is in completion order, exactly
        // which files get cut by the total cap can vary between runs —
        // confirm this nondeterminism is acceptable.
        let selected_page_indexes = if let Some(page_index_bytes) = page_index_bytes {
            if page_index_bytes.is_empty() {
                Bytes::new()
            } else if page_index_bytes.len() > policy.max_per_file_bytes {
                page_index_files_skipped_per_file += 1;
                Bytes::new()
            } else if total_page_index_bytes + page_index_bytes.len() > policy.max_total_bytes {
                page_index_files_skipped_total_cap += 1;
                Bytes::new()
            } else {
                total_page_index_bytes += page_index_bytes.len();
                page_index_files_included += 1;
                page_index_bytes
            }
        } else {
            Bytes::new()
        };
        writer.add_entry_with_page_indexes(
            &relative_path,
            source_size,
            footer_bytes,
            selected_page_indexes,
        );
    }
    let bundle_bytes = writer.finish();
    info!("Generated bundle: {} bytes", bundle_bytes.len());
    // For local (non-URI) outputs, ensure the parent directory exists first.
    if !output_path.contains("://") {
        if let Some(parent) = std::path::Path::new(&output_path).parent() {
            tokio::fs::create_dir_all(parent)
                .await
                .with_context(|| format!("Failed to create directory {}", parent.display()))?;
        }
    }
    storage::write_all(&output_path, Bytes::from(bundle_bytes))
        .await
        .context("Failed to write bundle file")?;
    // Human-readable summary of what was bundled.
    println!("Created: {}", output_path);
    println!(" Files: {}", parquet_files.len());
    println!(" Total footer bytes: {}", total_footer_bytes);
    if policy.enabled {
        println!(" Page index policy: enabled");
        println!(" Included files: {}", page_index_files_included);
        println!(" Total page index bytes: {}", total_page_index_bytes);
        println!(
            " Skipped (per-file cap {} bytes): {}",
            policy.max_per_file_bytes, page_index_files_skipped_per_file
        );
        println!(
            " Skipped (total cap {} bytes): {}",
            policy.max_total_bytes, page_index_files_skipped_total_cap
        );
    } else {
        println!(" Page index policy: disabled");
    }
    Ok(())
}
/// Print a human-readable summary of a bundle: header fields, creation
/// metadata (when recorded), and a per-entry size listing.
pub async fn inspect(input: &str) -> Result<()> {
    let bundle_bytes = storage::read_all(input)
        .await
        .context("Failed to read bundle file")?;
    let reader = ParxBundleReader::open(bundle_bytes).context("Failed to parse bundle file")?;

    let header = reader.header();
    println!("Bundle: {}", input);
    println!();
    println!("Header:");
    println!(
        " Version: {}.{}",
        header.version_major, header.version_minor
    );
    println!(" Flags: {:#06x}", header.flags);
    println!(" Entry count: {}", header.entry_count);
    println!();
    println!("Metadata:");
    let created_ms = reader.created_at_ms();
    if created_ms > 0 {
        // Round-trip through SystemTime to convert milliseconds-since-epoch
        // into whole seconds for display.
        let created = UNIX_EPOCH + Duration::from_millis(created_ms);
        if let Ok(since_epoch) = created.duration_since(UNIX_EPOCH) {
            println!(" Created: {} (unix timestamp)", since_epoch.as_secs());
        }
    }
    println!();
    println!("Entries ({}):", reader.entry_count());
    for entry in reader.iter_entries() {
        // Entries without stored page indexes report a size of zero.
        let page_index_size = match entry.page_index_bytes {
            Some(bytes) => bytes.len(),
            None => 0,
        };
        println!(
            " {} ({} bytes source, {} bytes footer, {} bytes page index)",
            entry.parquet_path,
            entry.source_size,
            entry.footer_bytes.len(),
            page_index_size
        );
    }
    Ok(())
}
/// Verify a bundle's internal integrity and, when `with_sources` is set,
/// confirm each entry's recorded source size still matches the live file.
///
/// Relative entry paths are resolved against the bundle's own directory;
/// absolute paths and URIs are used verbatim. Fails with an error when any
/// source is missing or its size changed.
pub async fn verify(input: &str, with_sources: bool) -> Result<()> {
    println!("Verifying: {}", input);
    let bundle_bytes = storage::read_all(input)
        .await
        .context("Failed to read bundle file")?;
    // `open` fails on bad header magic or manifest CRC32C, so reaching this
    // point means both checks passed.
    let reader = ParxBundleReader::open(bundle_bytes).context("Failed to parse bundle file")?;
    println!(" Header magic: OK");
    println!(" Manifest CRC32C: OK");
    // Validates every entry's stored checksums.
    reader.validate_all().context("Bundle validation failed")?;
    let page_index_entries = reader
        .iter_entries()
        .filter(|entry| entry.page_index_bytes.is_some())
        .count();
    println!(" All footer checksums: OK");
    if page_index_entries > 0 {
        println!(" All page index checksums: OK");
    } else {
        println!(" Page index checksums: N/A (no page indexes stored)");
    }
    println!(" Entry count: {}", reader.entry_count());
    if with_sources {
        println!();
        println!("Verifying against source files...");
        let bundle_dir = get_parent_dir(input);
        let entries: Vec<_> = reader.iter_entries().collect();
        // HEAD each source concurrently; only sizes are compared.
        let results: Vec<(String, u64, Result<u64>)> = stream::iter(entries.iter())
            .map(|entry| {
                let bundle_dir = bundle_dir.clone();
                async move {
                    // Absolute paths and URIs stand alone; anything else is
                    // taken as relative to the bundle's directory.
                    let source_path = if entry.parquet_path.starts_with('/')
                        || entry.parquet_path.contains("://")
                    {
                        entry.parquet_path.to_string()
                    } else {
                        format!("{}/{}", bundle_dir, entry.parquet_path)
                    };
                    // NOTE(review): assumes `meta.size` converts to u64 without
                    // truncation — confirm the type returned by storage::head.
                    let result = storage::head(&source_path)
                        .await
                        .map(|meta| meta.size as u64);
                    (entry.parquet_path.to_string(), entry.source_size, result)
                }
            })
            .buffer_unordered(MAX_CONCURRENCY)
            .collect()
            .await;
    let mut ok_count = 0;
    let mut mismatch_count = 0;
        for (parquet_path, expected_size, result) in results {
            match result {
                Ok(actual_size) => {
                    if expected_size == actual_size {
                        ok_count += 1;
                    } else {
                        println!(
                            " MISMATCH: {} (expected {} bytes, got {})",
                            parquet_path, expected_size, actual_size
                        );
                        mismatch_count += 1;
                    }
                }
                Err(e) => {
                    // A failed HEAD (missing file, permissions, …) counts as
                    // a mismatch and is reported with its underlying error.
                    println!(" NOT FOUND: {} ({})", parquet_path, e);
                    mismatch_count += 1;
                }
            }
        }
        println!();
        println!(" Sources OK: {}", ok_count);
        if mismatch_count > 0 {
            println!(" Sources MISMATCH: {}", mismatch_count);
            anyhow::bail!("Bundle is stale - {} source files changed", mismatch_count);
        }
    }
    println!();
    println!("Verification passed.");
    Ok(())
}
/// Directory portion of a slash-separated path (everything before the last
/// `/`); returns "." when the path contains no slash at all.
fn get_parent_dir(path: &str) -> String {
    match path.rfind('/') {
        Some(last_slash) => path[..last_slash].to_string(),
        None => ".".to_string(),
    }
}
/// Expand a bundle into one standalone `.parx` file per entry under
/// `output_dir`, mirroring each entry's relative path (with a `.parx`
/// suffix appended to the original parquet path).
pub async fn extract(input: &str, output_dir: &str) -> Result<()> {
    println!("Extracting: {}", input);
    let bundle_bytes = storage::read_all(input)
        .await
        .context("Failed to read bundle file")?;
    let reader = ParxBundleReader::open(bundle_bytes).context("Failed to parse bundle file")?;
    let entries: Vec<_> = reader.iter_entries().collect();
    // Write the per-entry .parx files with bounded concurrency.
    let results: Vec<String> = stream::iter(entries.iter())
        .map(|entry| {
            let output_dir = output_dir.to_string();
            async move {
                // Re-serialize this entry as a standalone .parx payload.
                let mut writer = ParxWriter::new();
                writer.set_source_uri(entry.parquet_path);
                writer.set_source_size(entry.source_size);
                writer.set_footer(entry.footer_bytes);
                if let Some(page_index_bytes) = entry.page_index_bytes {
                    writer.set_page_indexes(page_index_bytes);
                }
                let parx_bytes = writer.finish();
                // Guard against traversal/absolute segments in entry paths.
                let parx_rel = sanitize_bundle_entry_path(entry.parquet_path)?;
                let parx_filename = format!("{}.parx", parx_rel.to_string_lossy());
                let parx_path = std::path::Path::new(&parx_filename);
                // NOTE(review): build_output_path calls the blocking
                // std::fs::create_dir_all inside this async task — confirm
                // that's acceptable or move it to tokio::fs/spawn_blocking.
                let output_path = build_output_path(&output_dir, parx_path)?;
                storage::write_all(&output_path, Bytes::from(parx_bytes))
                    .await
                    .with_context(|| format!("Failed to write {}", output_path))?;
                Ok::<_, anyhow::Error>(output_path)
            }
        })
        .buffer_unordered(MAX_CONCURRENCY)
        .try_collect()
        .await?;
    for output_path in &results {
        println!(" Extracted: {}", output_path);
    }
    println!();
    println!("Extracted {} files to {}", results.len(), output_dir);
    Ok(())
}
/// Path of `full_path` relative to `base` (both slash-separated strings),
/// falling back to the final path component when `full_path` does not live
/// under `base`.
///
/// The prefix match is component-aware: a raw string prefix such as
/// `base = "/data"` against `"/database/x.parquet"` is NOT a match (the
/// previous implementation accepted it and produced the bogus relative
/// path "base/x.parquet").
fn get_relative_path(base: &str, full_path: &str) -> String {
    let base = base.trim_end_matches('/');
    if let Some(stripped) = full_path.strip_prefix(base) {
        // Accept the prefix only when it ends on a path-component boundary.
        if stripped.is_empty() || stripped.starts_with('/') {
            return stripped.trim_start_matches('/').to_string();
        }
    }
    // Not under `base`: fall back to the bare file name.
    full_path
        .rsplit('/')
        .next()
        .unwrap_or(full_path)
        .to_string()
}
/// Normalize a bundle entry path into a safe, relative `PathBuf`.
///
/// Rejects URIs, absolute paths, and any `..` / root / drive-prefix
/// component; drops no-op `.` segments. Errors when nothing remains after
/// cleaning.
fn sanitize_bundle_entry_path(path: &str) -> Result<std::path::PathBuf> {
    use std::path::{Component, Path};

    if path.contains("://") {
        anyhow::bail!("Bundle entry path must be relative: {}", path);
    }
    let raw = Path::new(path);
    if raw.is_absolute() {
        anyhow::bail!("Bundle entry path must be relative: {}", path);
    }

    let mut clean = std::path::PathBuf::new();
    for comp in raw.components() {
        match comp {
            // Real segments are kept verbatim.
            Component::Normal(part) => clean.push(part),
            // `.` contributes nothing.
            Component::CurDir => continue,
            // `..`, a root, or a Windows prefix could escape or re-root the
            // output path — refuse the whole entry.
            Component::ParentDir | Component::RootDir | Component::Prefix(_) => {
                anyhow::bail!("Bundle entry path contains invalid segment: {}", path)
            }
        }
    }

    if clean.as_os_str().is_empty() {
        anyhow::bail!("Bundle entry path is empty: {}", path);
    }
    Ok(clean)
}
/// Join `rel_path` onto `output_dir` and return the destination as a string.
///
/// Remote destinations (containing "://") are joined textually after
/// rejecting any ".." segment in the URI. Local destinations use
/// `Path::join`, and — as a side effect — the destination's parent
/// directory is created eagerly so the caller can write immediately.
fn build_output_path(output_dir: &str, rel_path: &std::path::Path) -> Result<String> {
    let is_remote = output_dir.contains("://");
    if is_remote {
        // Refuse URIs with a ".." path segment.
        let has_dotdot = output_dir.split('/').any(|seg| seg == "..");
        if has_dotdot {
            anyhow::bail!(
                "Output directory contains invalid path segment: {}",
                output_dir
            );
        }
        let base = output_dir.trim_end_matches('/');
        return Ok(format!("{}/{}", base, rel_path.to_string_lossy()));
    }

    let output_path = std::path::Path::new(output_dir).join(rel_path);
    // Ensure the destination directory exists before the caller writes.
    if let Some(parent) = output_path.parent() {
        std::fs::create_dir_all(parent)
            .with_context(|| format!("Failed to create {}", parent.display()))?;
    }
    Ok(output_path.to_string_lossy().into_owned())
}