mod helpers;
mod output;
use std::collections::HashMap;
use clap::Args;
use crate::catalog::register::{DataFileInput, RegisterOptions};
use crate::cli::catalog::CatalogConfig;
use crate::cli::output::{print, OutputFormat};
use crate::io::{get_filename, is_local_path};
use crate::spec::{NamespaceIdent, PartitionSpec, Schema, TableIdent};
use helpers::{
determine_partition_values, expand_glob, format_partition_key, generate_upload_path,
introspect_file, upload_local_file,
};
use output::{CommitPlanOutput, CommitResultOutput, PartitionSummary, SchemaMismatch};
pub use helpers::{
build_partition_spec, check_schema_compatibility, parse_partition_spec,
parse_partition_values_arg, parse_type_str,
};
// CLI arguments for the `commit` subcommand: register existing Parquet files
// into a catalog table, optionally creating the table first.
//
// Plain `//` comments are used here (not `///`) so clap's derived --help
// text is left exactly as it was.
#[derive(Debug, Args)]
pub struct CommitArgs {
    // Positional glob pattern selecting the Parquet files to commit.
    pub pattern: String,
    // Target namespace (-n / --namespace).
    #[arg(long, short)]
    pub namespace: String,
    // Target table name (-t / --table).
    #[arg(long, short)]
    pub table: String,
    // Optional path to the file whose schema is used as the table schema;
    // defaults to the first matched file (see `execute`).
    #[arg(long)]
    pub exemplar: Option<String>,
    // Create the table if it does not already exist.
    #[arg(long)]
    pub create: bool,
    // Partition spec string; clap enforces it is only valid with --create.
    #[arg(long, requires = "create")]
    pub partition: Option<String>,
    // Explicit partition values applied to every file (parsed by
    // `parse_partition_values_arg`).
    #[arg(long)]
    pub partition_values: Option<String>,
    // Print the commit plan without uploading or committing anything.
    #[arg(long)]
    pub dry_run: bool,
}
// Result of `resolve_table_location`: where the table's data lives, and
// whether the table had to be created up-front to obtain that location.
struct TableLocationResult {
    // Storage location (e.g. an s3:// URI) used as the upload destination.
    location: String,
    // True only when `resolve_table_location` itself created the table;
    // `execute` uses this to avoid asking the register step to create it again.
    table_was_pre_created: bool,
}
/// Determine the storage location of the target table.
///
/// * Existing table: load it and return its location.
/// * Dry run on a missing table: return a placeholder location without
///   touching the catalog.
/// * Otherwise: create the table (with the given schema and optional
///   partition spec) and return the freshly assigned location, flagging
///   `table_was_pre_created`.
async fn resolve_table_location(
    catalog: &dyn crate::catalog::Catalog,
    namespace: &NamespaceIdent,
    table_ident: &TableIdent,
    schema: &Schema,
    partition_spec: Option<&PartitionSpec>,
    table_exists: bool,
    dry_run: bool,
) -> Result<TableLocationResult, String> {
    // Existing table: nothing to create, just read its location.
    if table_exists {
        let existing = catalog
            .load_table(table_ident)
            .await
            .map_err(|e| format!("Failed to load table: {}", e))?;
        return Ok(TableLocationResult {
            location: existing.location().to_string(),
            table_was_pre_created: false,
        });
    }

    let ns_label = namespace.as_ref().first().map(String::as_str).unwrap_or("");
    let tbl_name = table_ident.name();

    // Dry run: report a placeholder so the plan can be shown without
    // creating anything.
    if dry_run {
        return Ok(TableLocationResult {
            location: format!("s3://<bucket>/{}/{}", ns_label, tbl_name),
            table_was_pre_created: false,
        });
    }

    // Create the table now so the real location is available for uploads.
    let mut builder = crate::spec::TableCreation::builder()
        .with_name(tbl_name.to_string())
        .with_schema(schema.clone());
    if let Some(spec) = partition_spec {
        builder = builder.with_partition_spec(spec.clone());
    }
    let creation = builder
        .build()
        .map_err(|e| format!("Failed to build table creation: {}", e))?;
    let created = catalog
        .create_table(namespace, creation)
        .await
        .map_err(|e| format!("Failed to create table: {}", e))?;
    println!("Created table: {}.{}", ns_label, tbl_name);
    Ok(TableLocationResult {
        location: created.location().to_string(),
        table_was_pre_created: true,
    })
}
// Accumulated results of introspecting and validating the input files.
struct ProcessedFiles {
    // Files that passed the schema check, ready to register.
    data_files: Vec<DataFileInput>,
    // (local_path, remote_path) pairs still to be uploaded.
    uploads: Vec<(String, String)>,
    // Files rejected by `check_schema_compatibility`, with the reason.
    schema_mismatches: Vec<SchemaMismatch>,
    // partition key -> (file count, row count) aggregates.
    partition_summaries: HashMap<String, (usize, i64)>,
    // Total size in bytes across accepted files.
    total_bytes: u64,
    // Total row count across accepted files.
    total_rows: i64,
}
/// Introspect every input file, validate it against the table schema, and
/// accumulate the data-file records, pending uploads, and per-partition
/// statistics into a `ProcessedFiles`.
///
/// Incompatible files are recorded and skipped rather than aborting, so a
/// dry run can report every mismatch at once.
async fn process_input_files(
    files: &[String],
    file_io: &crate::io::FileIO,
    schema: &Schema,
    explicit_partition_values: &Option<HashMap<String, String>>,
    partition_spec: Option<&PartitionSpec>,
    table_location: &str,
) -> Result<ProcessedFiles, String> {
    let mut result = ProcessedFiles {
        data_files: Vec::new(),
        uploads: Vec::new(),
        schema_mismatches: Vec::new(),
        partition_summaries: HashMap::new(),
        total_bytes: 0,
        total_rows: 0,
    };
    for path in files {
        let info = introspect_file(path, file_io).await?;
        // Record the mismatch and move on; the caller decides whether
        // mismatches are fatal.
        if let Err(reason) = check_schema_compatibility(schema, &info.schema) {
            result.schema_mismatches.push(SchemaMismatch {
                file_path: path.clone(),
                reason,
            });
            continue;
        }
        let values =
            determine_partition_values(path, explicit_partition_values, partition_spec, schema)?;
        let key = format_partition_key(&values);
        // Aggregate per-partition (file count, row count).
        let summary = result.partition_summaries.entry(key).or_insert((0, 0));
        summary.0 += 1;
        summary.1 += info.data_file.record_count;
        result.total_bytes += info.data_file.file_size_in_bytes as u64;
        result.total_rows += info.data_file.record_count;
        let mut data_file = info.data_file;
        data_file.partition_values = values;
        // Local files get their path rewritten to the eventual remote
        // location; the actual copy happens later in `execute_uploads`.
        if is_local_path(path) {
            let remote = generate_upload_path(table_location, path);
            result.uploads.push((path.clone(), remote.clone()));
            data_file.file_path = remote;
        }
        result.data_files.push(data_file);
    }
    Ok(result)
}
/// Copy each staged local file to its remote destination, printing progress.
/// A no-op when there is nothing to upload.
async fn execute_uploads(
    uploads: &[(String, String)],
    file_io: &crate::io::FileIO,
) -> Result<(), String> {
    if !uploads.is_empty() {
        println!("Uploading {} local files...", uploads.len());
        for (src, dst) in uploads {
            println!(" {} -> {}", get_filename(src), dst);
            upload_local_file(src, dst, file_io).await?;
        }
        println!("Upload complete");
    }
    Ok(())
}
/// Convert the per-partition `(file_count, row_count)` aggregates into
/// display summaries.
///
/// The result is sorted by partition value so that output is deterministic;
/// previously it followed `HashMap` iteration order, which varies from run
/// to run and made the printed plan unstable.
fn build_partition_summaries(
    partition_summaries: HashMap<String, (usize, i64)>,
) -> Vec<PartitionSummary> {
    let mut summaries: Vec<PartitionSummary> = partition_summaries
        .into_iter()
        .map(|(key, (file_count, row_count))| PartitionSummary {
            // An empty key means the file had no partition values.
            partition_value: if key.is_empty() {
                "(unpartitioned)".to_string()
            } else {
                key
            },
            file_count,
            row_count,
        })
        .collect();
    summaries.sort_by(|a, b| a.partition_value.cmp(&b.partition_value));
    summaries
}
/// Entry point for the `commit` subcommand.
///
/// Expands the file pattern, infers the table schema from an exemplar file,
/// optionally creates the table, uploads any local files into table storage,
/// and registers the data files with the catalog. With `--dry-run`, prints
/// the commit plan instead of performing any writes.
///
/// Fixes over the previous version:
/// * empty glob matches now produce an error instead of panicking on
///   `files[0]`;
/// * namespace/table validation happens before any catalog or file I/O.
pub async fn execute(
    args: CommitArgs,
    config: &CatalogConfig,
    format: OutputFormat,
) -> Result<(), String> {
    // Fail fast on invalid identifiers before touching the catalog or files.
    if args.namespace.is_empty() {
        return Err("Namespace cannot be empty".to_string());
    }
    if args.table.is_empty() {
        return Err("Table name cannot be empty".to_string());
    }
    let files = expand_glob(&args.pattern)?;
    // Guard: indexing files[0] below would panic on an empty match.
    if files.is_empty() {
        return Err(format!("No files matched pattern: {}", args.pattern));
    }
    println!("Found {} Parquet files", files.len());
    let has_local_files = files.iter().any(|f| is_local_path(f));
    if has_local_files {
        println!("Detected local files - will upload to table storage");
    }
    let catalog = config.create_catalog().await?;
    let file_io = catalog.file_io();
    // The table schema comes from --exemplar if given, else the first match.
    let exemplar_path = args.exemplar.as_ref().unwrap_or(&files[0]);
    let exemplar = introspect_file(exemplar_path, file_io).await?;
    let schema = exemplar.schema.clone();
    println!("Schema from: {}", exemplar_path);
    let partition_spec = args
        .partition
        .as_ref()
        .map(|s| build_partition_spec(s, &schema))
        .transpose()?;
    let explicit_partition_values = args
        .partition_values
        .as_ref()
        .map(|pv| parse_partition_values_arg(pv))
        .transpose()?;
    let namespace = NamespaceIdent::from_strs(&[args.namespace.as_str()]);
    let table_ident = TableIdent::from_strs(&[args.namespace.as_str()], &args.table);
    let table_exists = catalog
        .table_exists(&table_ident)
        .await
        .map_err(|e| format!("Failed to check if table exists: {}", e))?;
    if !table_exists && !args.create {
        return Err(format!(
            "Table {}.{} does not exist. Use --create to create it.",
            args.namespace, args.table
        ));
    }
    // A concrete table location is only needed when local files must be
    // uploaded into the table's storage.
    let (table_location, table_was_pre_created) = if has_local_files {
        let result = resolve_table_location(
            catalog.as_ref(),
            &namespace,
            &table_ident,
            &schema,
            partition_spec.as_ref(),
            table_exists,
            args.dry_run,
        )
        .await?;
        (result.location, result.table_was_pre_created)
    } else {
        (String::new(), false)
    };
    let processed = process_input_files(
        &files,
        file_io,
        &schema,
        &explicit_partition_values,
        partition_spec.as_ref(),
        &table_location,
    )
    .await?;
    // On a real run any mismatch aborts; a dry run reports them in the plan.
    if !processed.schema_mismatches.is_empty() && !args.dry_run {
        return Err(format!(
            "{} files have schema mismatches. Run with --dry-run to see details.",
            processed.schema_mismatches.len()
        ));
    }
    let partitions = build_partition_summaries(processed.partition_summaries);
    if args.dry_run {
        let plan = CommitPlanOutput {
            schema_source: exemplar_path.clone(),
            target_table: format!("{}.{}", args.namespace, args.table),
            will_create_table: !table_exists,
            partition_columns: partition_spec
                .as_ref()
                .map(|s| s.fields().iter().map(|f| f.name().to_string()).collect())
                .unwrap_or_default(),
            files_to_commit: processed.data_files.len(),
            files_to_upload: processed.uploads.len(),
            total_rows: processed.total_rows,
            total_bytes: processed.total_bytes,
            partitions,
            schema_mismatches: processed.schema_mismatches,
        };
        print(&plan, format);
        return Ok(());
    }
    execute_uploads(&processed.uploads, file_io).await?;
    // Only ask the register step to create the table if it still does not
    // exist (resolve_table_location may already have created it for uploads).
    let options = if args.create && !table_exists && !table_was_pre_created {
        let mut opts = RegisterOptions::new().allow_create_with_schema(schema.clone());
        if let Some(spec) = partition_spec {
            opts = opts.with_partition_spec(spec);
        }
        opts.allow_noop(true)
    } else {
        RegisterOptions::new().allow_noop(true)
    };
    // Drop the per-file source schemas: compatibility was already checked
    // above, and the register step does not need them.
    let data_files: Vec<DataFileInput> = processed
        .data_files
        .into_iter()
        .map(|mut f| {
            f.source_schema = None;
            f
        })
        .collect();
    let result = crate::catalog::register::register_data_files(
        catalog.as_ref(),
        namespace,
        table_ident,
        data_files,
        options,
    )
    .await
    .map_err(|e| format!("Commit failed: {}", e))?;
    let output = CommitResultOutput {
        target_table: format!("{}.{}", args.namespace, args.table),
        table_created: result.table_was_created || table_was_pre_created,
        files_committed: result.added_files,
        rows_committed: result.added_records,
        files_skipped: result.skipped_files.len(),
        snapshot_id: result.snapshot_id,
    };
    print(&output, format);
    Ok(())
}