use std::collections::HashMap;
use std::fs;
use std::io::{BufRead, BufReader, Write as IoWrite};
use std::path::{Path, PathBuf};
use std::sync::Arc;
use arrow::array::StringArray;
use arrow::datatypes::Schema;
use arrow::record_batch::RecordBatch;
use crate::{ColumnarError, ColumnarTable, CompressionMode};
/// One row of `manifest.tsv`: a single partition data file and what it holds.
struct ManifestEntry {
    /// Raw (un-encoded) partition values, one per partition column, in column order.
    partition_values: Vec<String>,
    /// Location of the data file relative to the dataset root, using `/` separators.
    rel_path: String,
    /// Number of rows written to the data file.
    row_count: usize,
}
/// Turns a raw partition value into a fragment that is safe to use in a directory name.
///
/// Characters accepted by `char::is_alphanumeric` (Unicode-aware) and the punctuation
/// `-`, `.`, `+`, `_` are kept; every other character is replaced by `_`.
///
/// The mapping is lossy: distinct inputs such as `"a b"` and `"a/b"` both become `"a_b"`,
/// so the result must not be treated as a unique key for a partition.
fn encode_partition_value(value: &str) -> String {
    let is_safe = |ch: char| ch.is_alphanumeric() || "-.+_".contains(ch);
    let mut encoded = String::with_capacity(value.len());
    for ch in value.chars() {
        encoded.push(if is_safe(ch) { ch } else { '_' });
    }
    encoded
}
/// A filter over partition values. `PartitionedDataset::read_partitioned` evaluates it
/// against the manifest, so partitions that do not match are never opened.
///
/// All comparisons use the raw (un-encoded) partition strings. A non-`And` predicate used
/// at the top level is applied to the value of the first partition column.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum PartitionPredicate {
    /// Matches when the value equals the given string exactly.
    Eq(String),
    /// Matches when the value equals any of the given strings.
    In(Vec<String>),
    /// Matches values in the half-open range `lo <= value < hi`.
    ///
    /// Ordering is lexicographic on the string bytes, so `"10" < "9"`. Zero-pad numeric
    /// partition values if numeric ordering is required.
    Range {
        lo: String,
        hi: String,
    },
    /// Conjunction of `(column_name, predicate)` pairs; every pair must match.
    ///
    /// A pair naming a column that is not a partition column never matches. A nested `And`
    /// is evaluated against the whole partition tuple, and its paired column name is
    /// ignored. An empty `And` matches every partition.
    And(Vec<(String, PartitionPredicate)>),
}

impl PartitionPredicate {
    /// Evaluates a non-`And` predicate against one partition value.
    ///
    /// `And` needs column context, so [`Self::matches_multi`] handles it and never routes
    /// one here. A direct call with an `And` returns `false` instead of panicking.
    fn matches_single(&self, value: &str) -> bool {
        match self {
            Self::Eq(v) => value == v.as_str(),
            Self::In(vs) => vs.iter().any(|v| v.as_str() == value),
            Self::Range { lo, hi } => value >= lo.as_str() && value < hi.as_str(),
            Self::And(_) => false,
        }
    }

    /// Evaluates the predicate against one partition tuple.
    ///
    /// `column_names[i]` names the column whose value is `values[i]`. A missing value is
    /// treated as the empty string.
    fn matches_multi(&self, column_names: &[String], values: &[String]) -> bool {
        match self {
            Self::And(pairs) => pairs.iter().all(|(col_name, sub_pred)| match sub_pred {
                // A nested conjunction carries its own column names; evaluating it through
                // `matches_single` used to panic.
                Self::And(_) => sub_pred.matches_multi(column_names, values),
                _ => match column_names.iter().position(|c| c == col_name) {
                    Some(i) => {
                        sub_pred.matches_single(values.get(i).map_or("", String::as_str))
                    }
                    None => false,
                },
            }),
            other => other.matches_single(values.first().map_or("", String::as_str)),
        }
    }
}
/// A dataset laid out as one directory per partition-value tuple beneath `root`, indexed by
/// `root/manifest.tsv`.
pub struct PartitionedDataset {
    /// Directory that holds the partition directories and the manifest.
    root: PathBuf,
    /// Partition column names, outermost directory level first.
    partition_columns: Vec<String>,
    /// Compression applied when partition files are written.
    compression: CompressionMode,
}
impl PartitionedDataset {
    /// Creates a dataset rooted at `root`, partitioned by `partition_columns` (outermost
    /// directory level first), with compression disabled.
    #[must_use]
    pub fn new(root: PathBuf, partition_columns: Vec<String>) -> Self {
        Self {
            root,
            partition_columns,
            compression: CompressionMode::None,
        }
    }

    /// Creates a dataset partitioned by the single column `col`.
    #[must_use]
    pub fn new_single_column(root: PathBuf, col: impl Into<String>) -> Self {
        Self::new(root, vec![col.into()])
    }

    /// Returns `self` configured to write partition files with `compression`.
    #[must_use]
    pub fn with_compression(mut self, compression: CompressionMode) -> Self {
        self.compression = compression;
        self
    }

    /// The dataset root, which holds the partition directories and `manifest.tsv`.
    #[must_use]
    pub fn root(&self) -> &Path {
        &self.root
    }

    /// The partition column names, outermost directory level first.
    #[must_use]
    pub fn partition_columns(&self) -> &[String] {
        &self.partition_columns
    }

    /// Splits `batches` by partition-value tuple and writes each tuple's rows to its own
    /// file under `root/<col>=<encoded value>/...`. It then rewrites `root/manifest.tsv`.
    ///
    /// Directory names use a lossy encoding, so two distinct tuples may share a directory.
    /// Each tuple still gets its own `part-NNNN.parquet` file. Files are numbered from
    /// `0000` within each directory, in sorted tuple order, and the manifest records each
    /// file's exact path and raw values. With no partition columns, all rows go to a single
    /// file directly under `root`.
    ///
    /// # Errors
    ///
    /// - [`ColumnarError::SchemaMismatch`] if a partition column is missing from a batch or
    ///   is not Utf8.
    /// - [`ColumnarError::Manifest`] if a partition value contains a tab or newline, which
    ///   the tab-separated manifest cannot represent. This is checked before anything is
    ///   written to disk.
    /// - Any Arrow or I/O error raised while filtering batches or writing files.
    pub fn write_partitioned(&self, batches: &[RecordBatch]) -> Result<(), ColumnarError> {
        let groups = self.group_by_partition(batches)?;

        // Such a value would write "successfully" but leave a manifest that no longer parses.
        for (tuple, _) in &groups {
            if let Some(bad) = tuple
                .iter()
                .find(|v| v.contains(|c: char| matches!(c, '\t' | '\n')))
            {
                return Err(ColumnarError::Manifest(format!(
                    "partition value {bad:?} contains a tab or newline, which manifest.tsv cannot represent"
                )));
            }
        }

        fs::create_dir_all(&self.root)?;
        // Next free part-file index for each relative directory.
        let mut next_part: HashMap<String, usize> = HashMap::new();
        let mut manifest_entries: Vec<ManifestEntry> = Vec::with_capacity(groups.len());
        for (tuple, group_batches) in groups {
            let dir_rel = self
                .partition_columns
                .iter()
                .zip(&tuple)
                .map(|(col, val)| format!("{col}={}", encode_partition_value(val)))
                .collect::<Vec<_>>()
                .join("/");
            let dir_path = self.root.join(&dir_rel);
            fs::create_dir_all(&dir_path)?;

            // Tuples whose encoded directory names collide get distinct files; a fixed
            // file name would let the later tuple silently overwrite the earlier one's data.
            let counter = next_part.entry(dir_rel.clone()).or_default();
            let file_name = format!("part-{:04}.parquet", *counter);
            *counter += 1;

            // With no partition columns `dir_rel` is empty. "/part-..." would be a rooted
            // path, and `root.join` would then discard `root` when reading it back.
            let rel_path = if dir_rel.is_empty() {
                file_name.clone()
            } else {
                format!("{dir_rel}/{file_name}")
            };

            let schema: Arc<Schema> = group_batches
                .first()
                .map(RecordBatch::schema)
                .unwrap_or_else(|| Arc::new(Schema::empty()));
            let row_count = write_group_to_file(
                &dir_path.join(&file_name),
                schema,
                &group_batches,
                self.compression,
            )?;
            manifest_entries.push(ManifestEntry {
                partition_values: tuple,
                rel_path,
                row_count,
            });
        }
        self.write_manifest(&manifest_entries)
    }

    /// Reads every manifest entry whose partition values satisfy `predicate` (every entry
    /// when `predicate` is `None`) and returns their batches in manifest order.
    ///
    /// # Errors
    ///
    /// Returns an error if the manifest is missing or malformed, or if a selected data file
    /// cannot be read.
    pub fn read_partitioned(
        &self,
        predicate: Option<&PartitionPredicate>,
    ) -> Result<Vec<RecordBatch>, ColumnarError> {
        let (column_names, entries) = self.read_manifest()?;
        let mut all_batches: Vec<RecordBatch> = Vec::new();
        for entry in &entries {
            // Pruning uses manifest values only, so skipped partitions are never opened.
            if let Some(pred) = predicate {
                if !pred.matches_multi(&column_names, &entry.partition_values) {
                    continue;
                }
            }
            all_batches.extend(read_group_from_file(&self.root.join(&entry.rel_path))?);
        }
        Ok(all_batches)
    }

    /// Lists every manifest entry as `(partition_values, rel_path, row_count)`, in manifest
    /// order.
    ///
    /// # Errors
    ///
    /// Returns an error if the manifest is missing or malformed.
    pub fn list_partitions(&self) -> Result<Vec<(Vec<String>, String, usize)>, ColumnarError> {
        let (_column_names, entries) = self.read_manifest()?;
        Ok(entries
            .into_iter()
            .map(|e| (e.partition_values, e.rel_path, e.row_count))
            .collect())
    }

    /// Groups the rows of `batches` by partition-value tuple.
    ///
    /// Returns `(tuple, batches)` pairs sorted by tuple, so file numbering and manifest
    /// order are deterministic across runs. Within a tuple, batches keep their input order.
    ///
    /// NOTE(review): values are read with `StringArray::value` without checking validity,
    /// so null partition values are not distinguished from non-null ones. Confirm that this
    /// is intended.
    fn group_by_partition(
        &self,
        batches: &[RecordBatch],
    ) -> Result<Vec<(Vec<String>, Vec<RecordBatch>)>, ColumnarError> {
        let mut groups: HashMap<Vec<String>, Vec<RecordBatch>> = HashMap::new();
        for batch in batches {
            let str_cols = self.partition_string_columns(batch)?;
            let num_rows = batch.num_rows();
            // Record row indices per tuple and build only one boolean mask at a time.
            // Keeping a full-length mask alive for every tuple costs O(tuples x rows) memory.
            let mut rows_by_tuple: HashMap<Vec<String>, Vec<usize>> = HashMap::new();
            for row in 0..num_rows {
                let tuple: Vec<String> = str_cols
                    .iter()
                    .map(|col| col.value(row).to_string())
                    .collect();
                rows_by_tuple.entry(tuple).or_default().push(row);
            }
            for (tuple, rows) in rows_by_tuple {
                let mut mask = vec![false; num_rows];
                for row in rows {
                    mask[row] = true;
                }
                let mask = arrow::array::BooleanArray::from(mask);
                let filtered = arrow::compute::filter_record_batch(batch, &mask)?;
                groups.entry(tuple).or_default().push(filtered);
            }
        }
        let mut groups: Vec<_> = groups.into_iter().collect();
        groups.sort_unstable_by(|a, b| a.0.cmp(&b.0));
        Ok(groups)
    }

    /// Resolves this dataset's partition columns in `batch` to Utf8 arrays, in
    /// `partition_columns` order.
    ///
    /// All columns are looked up before any type is checked, so a missing column is
    /// reported ahead of a mistyped one.
    fn partition_string_columns<'b>(
        &self,
        batch: &'b RecordBatch,
    ) -> Result<Vec<&'b StringArray>, ColumnarError> {
        let schema = batch.schema();
        let col_indices = self
            .partition_columns
            .iter()
            .map(|col_name| {
                schema.index_of(col_name).map_err(|_| {
                    ColumnarError::SchemaMismatch(format!(
                        "partition column '{col_name}' not found in schema"
                    ))
                })
            })
            .collect::<Result<Vec<usize>, _>>()?;
        col_indices
            .iter()
            .zip(&self.partition_columns)
            .map(|(&idx, col_name)| {
                let col = batch.column(idx);
                col.as_any().downcast_ref::<StringArray>().ok_or_else(|| {
                    ColumnarError::SchemaMismatch(format!(
                        "partition column '{col_name}' must be Utf8 (String), got {:?}",
                        col.data_type()
                    ))
                })
            })
            .collect()
    }

    /// Writes `root/manifest.tsv`, truncating any previous manifest.
    ///
    /// Exactly one partition column uses the v1 layout: the header
    /// `partition_value\trel_path\trow_count`, then one line per entry. Any other number
    /// of columns, including zero, uses v2:
    /// - a `manifest_version=2` line;
    /// - a header naming each partition column, followed by `rel_path` and `row_count`;
    /// - one line per entry.
    ///
    /// Fields are tab-separated and written verbatim. Values therefore must not contain
    /// tabs or newlines; `write_partitioned` rejects such values before calling this.
    fn write_manifest(&self, entries: &[ManifestEntry]) -> Result<(), ColumnarError> {
        let path = self.root.join("manifest.tsv");
        // Buffer the writes: unbuffered `writeln!` on a `File` costs one syscall per call.
        let mut f = std::io::BufWriter::new(fs::File::create(&path)?);
        if self.partition_columns.len() == 1 {
            writeln!(f, "partition_value\trel_path\trow_count")?;
            for e in entries {
                let val = e.partition_values.first().map_or("", String::as_str);
                writeln!(f, "{val}\t{}\t{}", e.rel_path, e.row_count)?;
            }
        } else {
            writeln!(f, "manifest_version=2")?;
            let header = self
                .partition_columns
                .iter()
                .map(String::as_str)
                .chain(["rel_path", "row_count"])
                .collect::<Vec<_>>()
                .join("\t");
            writeln!(f, "{header}")?;
            for e in entries {
                // Join every field at once. Writing "{values}\t..." would emit a spurious
                // empty leading field when there are zero partition columns.
                let row_count = e.row_count.to_string();
                let line = e
                    .partition_values
                    .iter()
                    .map(String::as_str)
                    .chain([e.rel_path.as_str(), row_count.as_str()])
                    .collect::<Vec<_>>()
                    .join("\t");
                writeln!(f, "{line}")?;
            }
        }
        // Flush explicitly: dropping a `BufWriter` silently ignores write errors.
        f.flush()?;
        Ok(())
    }

    /// Loads `root/manifest.tsv` and returns the partition column names it describes,
    /// together with its entries in file order.
    ///
    /// A first line starting with `manifest_version=` selects the versioned format; only
    /// version 2 is supported. Any other first line is treated as the v1 header.
    ///
    /// # Errors
    ///
    /// Returns I/O errors from opening or reading the file. Returns
    /// [`ColumnarError::Manifest`] for an empty file, an unsupported version, or a
    /// malformed line.
    fn read_manifest(&self) -> Result<(Vec<String>, Vec<ManifestEntry>), ColumnarError> {
        let path = self.root.join("manifest.tsv");
        let mut lines = BufReader::new(fs::File::open(&path)?).lines();
        let first_line = lines
            .next()
            .ok_or_else(|| ColumnarError::Manifest("manifest.tsv is empty".to_string()))??;
        match first_line.strip_prefix("manifest_version=").map(str::trim) {
            Some("2") => Self::parse_v2_manifest(lines),
            // Refuse an unknown layout instead of silently misparsing it as v2.
            Some(version) => Err(ColumnarError::Manifest(format!(
                "unsupported manifest version '{version}'"
            ))),
            None => self.parse_v1_manifest(lines),
        }
    }

    /// Parses the rest of a v2 manifest: the header line, then the data lines. The
    /// version line has already been consumed, so data starts at file line 3.
    fn parse_v2_manifest(
        mut lines: impl Iterator<Item = std::io::Result<String>>,
    ) -> Result<(Vec<String>, Vec<ManifestEntry>), ColumnarError> {
        let header_line = lines.next().ok_or_else(|| {
            ColumnarError::Manifest("v2 manifest has no header line".to_string())
        })??;
        let headers: Vec<&str> = header_line.split('\t').collect();
        // The header always ends with `rel_path` and `row_count`. Zero partition columns
        // (a 2-field header) is a valid layout that `write_manifest` produces.
        if headers.len() < 2 {
            return Err(ColumnarError::Manifest(format!(
                "v2 manifest header has only {} fields (minimum 2)",
                headers.len()
            )));
        }
        let n_cols = headers.len() - 2;
        let col_names: Vec<String> = headers[..n_cols]
            .iter()
            .map(|s| (*s).to_string())
            .collect();
        let mut entries: Vec<ManifestEntry> = Vec::new();
        for (line_no, line_result) in (3_usize..).zip(lines) {
            let line = line_result?;
            // Limit the split to n_cols + 3 pieces so extra trailing fields are tolerated.
            let parts: Vec<&str> = line.splitn(n_cols + 3, '\t').collect();
            if parts.len() < n_cols + 2 {
                return Err(ColumnarError::Manifest(format!(
                    "v2 manifest line {}: expected {} tab-separated fields, got {}",
                    line_no,
                    n_cols + 2,
                    parts.len()
                )));
            }
            let partition_values: Vec<String> =
                parts[..n_cols].iter().map(|s| (*s).to_string()).collect();
            let rel_path = parts[n_cols].to_string();
            let row_count = parts[n_cols + 1].parse::<usize>().map_err(|_| {
                ColumnarError::Manifest(format!(
                    "v2 manifest line {}: invalid row_count '{}'",
                    line_no,
                    parts[n_cols + 1]
                ))
            })?;
            entries.push(ManifestEntry {
                partition_values,
                rel_path,
                row_count,
            });
        }
        Ok((col_names, entries))
    }

    /// Parses the data lines of a v1 manifest. The header has already been consumed, so
    /// data starts at file line 2.
    ///
    /// v1 does not store the column name. It is taken from this dataset's first partition
    /// column, or is `partition_value` when there is none.
    fn parse_v1_manifest(
        &self,
        lines: impl Iterator<Item = std::io::Result<String>>,
    ) -> Result<(Vec<String>, Vec<ManifestEntry>), ColumnarError> {
        let col_name = self
            .partition_columns
            .first()
            .cloned()
            .unwrap_or_else(|| "partition_value".to_string());
        let mut entries: Vec<ManifestEntry> = Vec::new();
        for (line_no, line_result) in (2_usize..).zip(lines) {
            let line = line_result?;
            let parts: Vec<&str> = line.splitn(3, '\t').collect();
            if parts.len() != 3 {
                return Err(ColumnarError::Manifest(format!(
                    "manifest line {}: expected 3 tab-separated fields, got {}",
                    line_no,
                    parts.len()
                )));
            }
            let row_count = parts[2].parse::<usize>().map_err(|_| {
                ColumnarError::Manifest(format!(
                    "manifest line {}: invalid row_count '{}'",
                    line_no, parts[2]
                ))
            })?;
            entries.push(ManifestEntry {
                partition_values: vec![parts[0].to_string()],
                rel_path: parts[1].to_string(),
                row_count,
            });
        }
        Ok((vec![col_name], entries))
    }
}
fn write_group_to_file(
path: &Path,
schema: Arc<Schema>,
batches: &[RecordBatch],
compression: CompressionMode,
) -> Result<usize, ColumnarError> {
let total_rows: usize = batches.iter().map(|b| b.num_rows()).sum();
match compression {
CompressionMode::None => {
crate::write_batches(path, schema, batches)?;
}
CompressionMode::OxiArc { level } => {
let mut table = ColumnarTable::new(schema);
for batch in batches {
table.push_unchecked(batch.clone());
}
let bytes = table.with_compression(level).write_to_bytes()?;
fs::write(path, bytes)?;
}
}
Ok(total_rows)
}
/// Reads the record batches stored in the partition file at `path`.
///
/// A file that begins with the `OXIA` magic bytes is decoded as a `ColumnarTable`. Any
/// other file is passed to `crate::read_batches`.
///
/// # Errors
///
/// Propagates I/O errors from opening or reading `path`, and decoding errors from either
/// reader.
fn read_group_from_file(path: &Path) -> Result<Vec<RecordBatch>, ColumnarError> {
    // Sniff only the first four bytes. `crate::read_batches` receives just the path and
    // reads the file itself, so slurping the whole file here (as `fs::read` did) doubled
    // the I/O and peak memory for every non-OXIA partition.
    let is_oxia = {
        let mut magic = Vec::with_capacity(4);
        let mut head = std::io::Read::take(fs::File::open(path)?, 4);
        std::io::Read::read_to_end(&mut head, &mut magic)?;
        magic.starts_with(b"OXIA")
    };
    if is_oxia {
        let bytes = fs::read(path)?;
        let table = ColumnarTable::read_from_bytes(&bytes)?;
        Ok(table.batches)
    } else {
        crate::read_batches(path)
    }
}