#![forbid(unsafe_code)]
#![warn(missing_docs)]
pub mod predicate;
mod reader;
mod writer;
pub mod partition;
pub mod streaming;
pub use arrow::array::{
Array, ArrayRef, BooleanArray, Float32Array, Float64Array, Int32Array, Int64Array,
LargeStringArray, StringArray, UInt32Array, UInt64Array,
};
pub use arrow::compute;
pub use arrow::datatypes::{DataType, Field, Schema};
pub use arrow::record_batch::RecordBatch;
pub use partition::{PartitionPredicate, PartitionedDataset};
pub use predicate::{CmpOp, Predicate, Scalar};
pub use streaming::{ColumnarStreamReader, ColumnarStreamWriter};
pub use writer::WriterConfig;
/// Common interface over a store of Arrow record batches that share a schema.
///
/// Within this file the trait is implemented by `ColumnarTable`, whose
/// implementation forwards every method to the inherent method (or field) of
/// the same name.
pub trait ColumnarStore {
/// Returns the shared schema handle of the store.
fn schema(&self) -> &Arc<Schema>;
/// Returns the record batches currently held by the store.
fn batches(&self) -> &[RecordBatch];
/// Returns the compression mode configured for the store.
fn compression(&self) -> CompressionMode;
/// Returns the number of rows in the store.
fn row_count(&self) -> usize;
/// Appends `batch` to the store.
///
/// The `ColumnarTable` implementation rejects a batch whose schema differs
/// from the table's with `ColumnarError::SchemaMismatch`.
fn push(&mut self, batch: RecordBatch) -> Result<(), ColumnarError>;
/// Appends `batch` infallibly; the `ColumnarTable` implementation performs
/// no schema comparison here.
fn push_unchecked(&mut self, batch: RecordBatch);
/// Produces a table restricted to the named `columns`.
fn project(&self, columns: &[&str]) -> Result<ColumnarTable, ColumnarError>;
/// Produces a table ordered by `column_name`, ascending or descending.
fn sort_by(&self, column_name: &str, ascending: bool) -> Result<ColumnarTable, ColumnarError>;
/// Produces a table containing only the rows selected by `predicate`.
fn filter(&self, predicate: &Predicate) -> Result<ColumnarTable, ColumnarError>;
/// Serialises the store into an in-memory byte buffer.
fn write_to_bytes(&self) -> Result<Vec<u8>, ColumnarError>;
/// Serialises the store to the file at `path`.
fn write_to(&self, path: &std::path::Path) -> Result<(), ColumnarError>;
/// Like `write_to_bytes`, but passes `config` to the writer.
fn write_to_bytes_with_config(&self, config: &WriterConfig) -> Result<Vec<u8>, ColumnarError>;
/// Like `write_to`, but passes `config` to the writer.
fn write_to_with_config(
&self,
path: &std::path::Path,
config: &WriterConfig,
) -> Result<(), ColumnarError>;
}
impl ColumnarStore for ColumnarTable {
fn schema(&self) -> &Arc<Schema> {
&self.schema
}
fn batches(&self) -> &[RecordBatch] {
&self.batches
}
fn compression(&self) -> CompressionMode {
self.compression
}
fn row_count(&self) -> usize {
self.row_count()
}
fn push(&mut self, batch: RecordBatch) -> Result<(), ColumnarError> {
self.push(batch)
}
fn push_unchecked(&mut self, batch: RecordBatch) {
self.push_unchecked(batch);
}
fn project(&self, columns: &[&str]) -> Result<ColumnarTable, ColumnarError> {
self.project(columns)
}
fn sort_by(&self, column_name: &str, ascending: bool) -> Result<ColumnarTable, ColumnarError> {
self.sort_by(column_name, ascending)
}
fn filter(&self, predicate: &Predicate) -> Result<ColumnarTable, ColumnarError> {
self.filter(predicate)
}
fn write_to_bytes(&self) -> Result<Vec<u8>, ColumnarError> {
self.write_to_bytes()
}
fn write_to(&self, path: &std::path::Path) -> Result<(), ColumnarError> {
self.write_to(path)
}
fn write_to_bytes_with_config(&self, config: &WriterConfig) -> Result<Vec<u8>, ColumnarError> {
self.write_to_bytes_with_config(config)
}
fn write_to_with_config(
&self,
path: &std::path::Path,
config: &WriterConfig,
) -> Result<(), ColumnarError> {
self.write_to_with_config(path, config)
}
}
/// Summary counters for a serialised file held in memory.
///
/// Returned by `read_metadata_from_bytes` and `ColumnarTable::metadata_from_bytes`;
/// the values are filled in by the `reader` module. Unlike `ParquetFileMetadata`
/// this summary carries no schema.
#[derive(Debug, Clone)]
pub struct ParquetFileMetaInfo {
/// Row count reported for the file.
pub num_rows: i64,
/// Number of row groups reported for the file.
pub num_row_groups: usize,
/// Number of columns reported for the file.
pub num_columns: usize,
/// Size of the file in bytes, as reported by the reader.
pub file_size: u64,
}
/// Extracts summary metadata from a serialised file held in `data`.
///
/// Thin wrapper over `reader::read_metadata_from_bytes`; errors are whatever
/// that function reports.
pub fn read_metadata_from_bytes(data: &[u8]) -> Result<ParquetFileMetaInfo, ColumnarError> {
reader::read_metadata_from_bytes(data)
}
use std::path::Path;
use std::sync::Arc;
/// Four-byte marker that `compress_payload` writes in front of compressed
/// output and that `ColumnarTable::read_from_bytes` looks for to decide
/// whether the input must be decompressed first.
const OXIA_MAGIC: &[u8; 4] = b"OXIA";
/// Error type returned by the fallible operations in this file.
///
/// The `Io`, `Arrow` and `Parquet` variants wrap an underlying error and expose
/// it through `std::error::Error::source`; the remaining variants carry only a
/// message.
#[derive(Debug)]
pub enum ColumnarError {
/// An underlying `std::io::Error`.
Io(std::io::Error),
/// An error reported by the Arrow library.
Arrow(arrow::error::ArrowError),
/// An error reported by the Parquet library.
Parquet(parquet::errors::ParquetError),
/// Two schemas that had to be equal were not (raised by `ColumnarTable::push`
/// and `ColumnarTable::merge` in this file).
SchemaMismatch(String),
/// Compression or decompression failed, or the `compress` feature is disabled.
Compress(String),
/// Displayed as "unsupported Arrow type"; not raised within this file.
UnsupportedType(String),
/// Displayed as "partition manifest error"; not raised within this file.
Manifest(String),
}
impl std::fmt::Display for ColumnarError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
ColumnarError::Io(e) => write!(f, "columnar I/O error: {e}"),
ColumnarError::Arrow(e) => write!(f, "Arrow error: {e}"),
ColumnarError::Parquet(e) => write!(f, "Parquet error: {e}"),
ColumnarError::SchemaMismatch(msg) => write!(f, "schema mismatch: {msg}"),
ColumnarError::Compress(msg) => write!(f, "compression error: {msg}"),
ColumnarError::UnsupportedType(msg) => write!(f, "unsupported Arrow type: {msg}"),
ColumnarError::Manifest(msg) => write!(f, "partition manifest error: {msg}"),
}
}
}
impl std::error::Error for ColumnarError {
fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
match self {
ColumnarError::Io(e) => Some(e),
ColumnarError::Arrow(e) => Some(e),
ColumnarError::Parquet(e) => Some(e),
ColumnarError::SchemaMismatch(_)
| ColumnarError::Compress(_)
| ColumnarError::UnsupportedType(_)
| ColumnarError::Manifest(_) => None,
}
}
}
impl From<std::io::Error> for ColumnarError {
fn from(e: std::io::Error) -> Self {
ColumnarError::Io(e)
}
}
impl From<arrow::error::ArrowError> for ColumnarError {
fn from(e: arrow::error::ArrowError) -> Self {
ColumnarError::Arrow(e)
}
}
/// Wraps a Parquet failure as [`ColumnarError::Parquet`], so `?` works on
/// Parquet results.
impl From<parquet::errors::ParquetError> for ColumnarError {
    fn from(err: parquet::errors::ParquetError) -> Self {
        Self::Parquet(err)
    }
}
/// How the byte-serialisation methods of `ColumnarTable` post-process the
/// writer's output.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum CompressionMode {
/// The writer's output is returned unchanged (the default).
#[default]
None,
/// The writer's output is passed through `compress_payload`.
OxiArc {
/// Level handed to `compress_payload`. `ColumnarTable::with_compression`
/// and `ColumnarTableBuilder::compression` clamp it to at most 9.
level: u8,
},
}
/// Writes `batches` with the given `schema` to the file at `path`.
///
/// Thin wrapper over `writer::write_batches`; errors are whatever that
/// function reports.
pub fn write_batches(
path: &Path,
schema: Arc<Schema>,
batches: &[RecordBatch],
) -> Result<(), ColumnarError> {
writer::write_batches(path, schema, batches)
}
/// Reads all record batches from the file at `path`.
///
/// Thin wrapper over `reader::read_batches`; errors are whatever that
/// function reports.
pub fn read_batches(path: &Path) -> Result<Vec<RecordBatch>, ColumnarError> {
reader::read_batches(path)
}
/// Serialises `batches` with the given `schema` into an in-memory buffer.
///
/// Thin wrapper over `writer::write_batches_to_bytes`. No `OXIA` compression
/// framing is applied by this function.
pub fn write_batches_to_bytes(
schema: Arc<Schema>,
batches: &[RecordBatch],
) -> Result<Vec<u8>, ColumnarError> {
writer::write_batches_to_bytes(schema, batches)
}
/// Decodes `data` with `reader::read_batches_from_bytes` and returns only the
/// record batches of the resulting table.
///
/// `data` is handed to the reader as-is; this function does not look for the
/// `OXIA` compression prefix.
///
/// # Errors
/// Propagates any error reported by the reader.
pub fn read_batches_from_bytes(data: &[u8]) -> Result<Vec<RecordBatch>, ColumnarError> {
    let table = reader::read_batches_from_bytes(data)?;
    Ok(table.batches)
}
/// Reads record batches from the file at `path`, selecting columns by index.
///
/// Thin wrapper over `reader::read_batches_with_projection`; how
/// `column_indices` is interpreted (ordering, out-of-range values) is defined
/// by that function.
pub fn read_batches_with_projection(
path: &Path,
column_indices: &[usize],
) -> Result<Vec<RecordBatch>, ColumnarError> {
reader::read_batches_with_projection(path, column_indices)
}
/// Reads the schema and summary counters of the file at `path`.
///
/// Thin wrapper over `reader::read_metadata`; errors are whatever that
/// function reports.
pub fn read_metadata(path: &Path) -> Result<ParquetFileMetadata, ColumnarError> {
reader::read_metadata(path)
}
/// Loads the whole file at `path` into memory, decodes it through
/// `reader::read_with_predicate_from_bytes` with `predicate`, and returns the
/// resulting record batches.
///
/// # Errors
/// Returns [`ColumnarError::Io`] if the file cannot be read, otherwise any
/// error reported by the reader.
pub fn read_with_predicate(
    path: &Path,
    predicate: &predicate::Predicate,
) -> Result<Vec<RecordBatch>, ColumnarError> {
    let file_contents = std::fs::read(path)?;
    reader::read_with_predicate_from_bytes(&file_contents, predicate).map(|table| table.batches)
}
/// Loads the whole file at `path` into memory, decodes it through
/// `reader::read_with_projection_and_predicate_from_bytes` with the named
/// `projection` columns and `predicate`, and returns the resulting record
/// batches.
///
/// # Errors
/// Returns [`ColumnarError::Io`] if the file cannot be read, otherwise any
/// error reported by the reader.
pub fn read_with_projection_and_predicate(
    path: &Path,
    projection: &[&str],
    predicate: &predicate::Predicate,
) -> Result<Vec<RecordBatch>, ColumnarError> {
    let file_contents = std::fs::read(path)?;
    reader::read_with_projection_and_predicate_from_bytes(&file_contents, projection, predicate)
        .map(|table| table.batches)
}
/// Schema plus summary counters for a file on disk.
///
/// Returned by `read_metadata`; the values are filled in by the `reader`
/// module.
#[derive(Debug, Clone)]
pub struct ParquetFileMetadata {
/// Arrow schema reported for the file.
pub schema: Arc<Schema>,
/// Row count reported for the file.
pub num_rows: i64,
/// Number of row groups reported for the file.
pub num_row_groups: usize,
/// Number of columns reported for the file.
pub num_columns: usize,
/// Size of the file in bytes, as reported by the reader.
pub file_size: u64,
}
/// An in-memory table: a schema, the record batches appended so far, and the
/// compression mode used when the table is serialised to bytes.
///
/// `Debug` and `Clone` are derived so the table can be printed with `{:?}`,
/// used with `Result::unwrap_err`/`expect_err`, and duplicated like the other
/// public types in this file. Cloning copies the `Arc` schema handle and the
/// batch vector.
#[derive(Debug, Clone)]
pub struct ColumnarTable {
    /// Schema that batches are compared against by `push` and `merge`.
    pub schema: Arc<Schema>,
    /// Record batches in insertion order.
    pub batches: Vec<RecordBatch>,
    /// Compression applied by `write_to_bytes` and `write_to_bytes_with_config`.
    pub compression: CompressionMode,
}
impl ColumnarTable {
    /// Creates an empty table for `schema` with compression disabled.
    #[must_use]
    pub fn new(schema: Arc<Schema>) -> Self {
        ColumnarTable {
            schema,
            batches: Vec::new(),
            compression: CompressionMode::None,
        }
    }

    /// Switches the table to [`CompressionMode::OxiArc`] and returns it.
    ///
    /// `level` is clamped to at most 9.
    #[must_use]
    pub fn with_compression(mut self, level: u8) -> Self {
        self.compression = CompressionMode::OxiArc {
            level: level.min(9),
        };
        self
    }

    /// Appends `batch` after checking that its schema equals the table's.
    ///
    /// # Errors
    /// Returns [`ColumnarError::SchemaMismatch`] (and leaves the table
    /// untouched) when the schemas differ.
    pub fn push(&mut self, batch: RecordBatch) -> Result<(), ColumnarError> {
        // `schema_ref` borrows the batch's schema instead of cloning the `Arc`.
        if batch.schema_ref() != &self.schema {
            return Err(ColumnarError::SchemaMismatch(format!(
                "expected schema {:?}, got {:?}",
                self.schema,
                batch.schema_ref()
            )));
        }
        self.batches.push(batch);
        Ok(())
    }

    /// Appends `batch` without comparing its schema to the table's.
    pub fn push_unchecked(&mut self, batch: RecordBatch) {
        self.batches.push(batch);
    }

    /// Returns the total number of rows across all batches.
    #[must_use]
    pub fn row_count(&self) -> usize {
        self.batches.iter().map(RecordBatch::num_rows).sum()
    }

    /// Writes the table to the file at `path` via `writer::write_batches`.
    ///
    /// The table's `compression` setting is not consulted on this path; only
    /// the `write_to_bytes*` methods apply it.
    ///
    /// # Errors
    /// Propagates any error reported by the writer.
    pub fn write_to(&self, path: &Path) -> Result<(), ColumnarError> {
        writer::write_batches(path, Arc::clone(&self.schema), &self.batches)
    }

    /// Reads a table from the file at `path`.
    ///
    /// The schema is taken from the first batch. When the file yields no
    /// batches the schema is taken from `reader::read_metadata` instead, so a
    /// table without rows keeps the schema reported for the file rather than
    /// an empty one. The returned table has compression disabled.
    ///
    /// # Errors
    /// Propagates any error reported by the reader.
    pub fn read_from(path: &Path) -> Result<Self, ColumnarError> {
        let batches = reader::read_batches(path)?;
        let schema = match batches.first() {
            Some(first) => Arc::clone(first.schema_ref()),
            // No batch to copy the schema from: ask the reader for the file's
            // schema instead of substituting an empty one.
            None => reader::read_metadata(path)?.schema,
        };
        Ok(ColumnarTable {
            schema,
            batches,
            compression: CompressionMode::None,
        })
    }

    /// Returns a new table containing only the named `columns`, in the order
    /// given.
    ///
    /// Names that are not present in the schema are silently skipped. If no
    /// name matches, the result is an empty table with an empty schema (built
    /// with [`ColumnarTable::new`], so its compression is `None`); otherwise
    /// the result inherits this table's compression.
    ///
    /// # Errors
    /// Returns [`ColumnarError::Arrow`] if a projected batch cannot be built.
    pub fn project(&self, columns: &[&str]) -> Result<Self, ColumnarError> {
        let indices: Vec<usize> = columns
            .iter()
            .filter_map(|name| self.schema.index_of(name).ok())
            .collect();
        if indices.is_empty() {
            return Ok(ColumnarTable::new(Arc::new(Schema::empty())));
        }

        let new_fields: Vec<Arc<Field>> = indices
            .iter()
            .map(|&i| Arc::new(self.schema.field(i).clone()))
            .collect();
        let new_schema = Arc::new(Schema::new(new_fields));

        let new_batches = self
            .batches
            .iter()
            .map(|batch| {
                let cols: Vec<ArrayRef> = indices
                    .iter()
                    .map(|&i| Arc::clone(batch.column(i)))
                    .collect();
                RecordBatch::try_new(Arc::clone(&new_schema), cols)
            })
            .collect::<Result<Vec<_>, _>>()?;

        Ok(ColumnarTable {
            schema: new_schema,
            batches: new_batches,
            compression: self.compression,
        })
    }

    /// Appends clones of all of `other`'s batches to this table.
    ///
    /// # Errors
    /// Returns [`ColumnarError::SchemaMismatch`] (and leaves the table
    /// untouched) when the two schemas differ.
    pub fn merge(&mut self, other: &ColumnarTable) -> Result<(), ColumnarError> {
        if self.schema != other.schema {
            return Err(ColumnarError::SchemaMismatch(format!(
                "cannot merge: schemas differ ({:?} vs {:?})",
                self.schema, other.schema
            )));
        }
        self.batches.extend(other.batches.iter().cloned());
        Ok(())
    }

    /// Returns a new single-batch table with all rows ordered by `column_name`.
    ///
    /// All batches are concatenated first, then every column is reordered by
    /// the sort permutation of the key column. The sort options set
    /// `nulls_first` to `true` for both directions. The result inherits this
    /// table's compression.
    ///
    /// # Errors
    /// Returns [`ColumnarError::Arrow`] if `column_name` is not in the schema
    /// or if an Arrow compute call fails.
    pub fn sort_by(&self, column_name: &str, ascending: bool) -> Result<Self, ColumnarError> {
        let idx = self.schema.index_of(column_name)?;
        let combined = arrow::compute::concat_batches(&self.schema, &self.batches)?;
        let sort_options = arrow::compute::SortOptions {
            descending: !ascending,
            nulls_first: true,
        };
        let sort_indices =
            arrow::compute::sort_to_indices(combined.column(idx), Some(sort_options), None)?;
        let sorted_cols: Vec<ArrayRef> = combined
            .columns()
            .iter()
            .map(|col| arrow::compute::take(col.as_ref(), &sort_indices, None))
            .collect::<Result<Vec<_>, _>>()?;
        let sorted_batch = RecordBatch::try_new(Arc::clone(&self.schema), sorted_cols)?;
        Ok(ColumnarTable {
            schema: Arc::clone(&self.schema),
            batches: vec![sorted_batch],
            compression: self.compression,
        })
    }

    /// Serialises the table into a byte buffer, applying the table's
    /// compression mode to the writer's output.
    ///
    /// # Errors
    /// Propagates writer errors, and [`ColumnarError::Compress`] from
    /// `compress_payload` when compression is enabled.
    pub fn write_to_bytes(&self) -> Result<Vec<u8>, ColumnarError> {
        let raw = writer::write_batches_to_bytes(Arc::clone(&self.schema), &self.batches)?;
        self.apply_compression_mode(raw)
    }

    /// Reads a table from `data`, transparently handling the `OXIA` framing.
    ///
    /// If `data` starts with `OXIA_MAGIC`, the remainder is decompressed with
    /// `decompress_payload` before decoding; otherwise `data` is decoded
    /// directly, without copying it first.
    ///
    /// # Errors
    /// Returns [`ColumnarError::Compress`] if decompression fails, otherwise
    /// any error reported by the reader.
    pub fn read_from_bytes(data: &[u8]) -> Result<Self, ColumnarError> {
        match data.strip_prefix(&OXIA_MAGIC[..]) {
            Some(payload) => {
                let parquet_bytes = decompress_payload(payload)?;
                reader::read_batches_from_bytes(&parquet_bytes)
            }
            None => reader::read_batches_from_bytes(data),
        }
    }

    /// Reads only the named `columns` from `bytes` via
    /// `reader::read_columns_from_bytes`.
    ///
    /// `bytes` is passed to the reader as-is; no `OXIA` prefix check is done
    /// here.
    pub fn read_columns(bytes: &[u8], columns: &[&str]) -> Result<Self, ColumnarError> {
        reader::read_columns_from_bytes(bytes, columns)
    }

    /// Reads the rows of `bytes` selected by `pred` via
    /// `reader::read_with_predicate_from_bytes`.
    ///
    /// `bytes` is passed to the reader as-is; no `OXIA` prefix check is done
    /// here.
    pub fn read_with_predicate(
        bytes: &[u8],
        pred: &predicate::Predicate,
    ) -> Result<Self, ColumnarError> {
        reader::read_with_predicate_from_bytes(bytes, pred)
    }

    /// Reads the named `columns` of the rows selected by `pred` via
    /// `reader::read_with_projection_and_predicate_from_bytes`.
    ///
    /// `bytes` is passed to the reader as-is; no `OXIA` prefix check is done
    /// here.
    pub fn read_with_projection_and_predicate(
        bytes: &[u8],
        columns: &[&str],
        pred: &predicate::Predicate,
    ) -> Result<Self, ColumnarError> {
        reader::read_with_projection_and_predicate_from_bytes(bytes, columns, pred)
    }

    /// Reads `bytes` against the `target` schema via
    /// `reader::read_with_schema_from_bytes`; how the file's schema is
    /// reconciled with `target` is defined by that function.
    pub fn read_with_schema(bytes: &[u8], target: &Arc<Schema>) -> Result<Self, ColumnarError> {
        reader::read_with_schema_from_bytes(bytes, target)
    }

    /// Like [`ColumnarTable::write_to_bytes`], but passes `config` to the
    /// writer.
    ///
    /// # Errors
    /// Propagates writer errors, and [`ColumnarError::Compress`] from
    /// `compress_payload` when compression is enabled.
    pub fn write_to_bytes_with_config(
        &self,
        config: &WriterConfig,
    ) -> Result<Vec<u8>, ColumnarError> {
        let raw = writer::write_batches_to_bytes_with_config(
            Arc::clone(&self.schema),
            &self.batches,
            config,
        )?;
        self.apply_compression_mode(raw)
    }

    /// Like [`ColumnarTable::write_to`], but passes `config` to the writer.
    ///
    /// As with `write_to`, the table's `compression` setting is not consulted.
    pub fn write_to_with_config(
        &self,
        path: &Path,
        config: &WriterConfig,
    ) -> Result<(), ColumnarError> {
        writer::write_batches_with_config(path, Arc::clone(&self.schema), &self.batches, config)
    }

    /// Post-processes serialised bytes according to `self.compression`:
    /// returned unchanged for `None`, passed through `compress_payload` for
    /// `OxiArc`.
    fn apply_compression_mode(&self, raw: Vec<u8>) -> Result<Vec<u8>, ColumnarError> {
        match self.compression {
            CompressionMode::None => Ok(raw),
            CompressionMode::OxiArc { level } => compress_payload(&raw, level),
        }
    }
}
/// Compresses `raw` with `oxiarc_deflate::deflate` at `level` and returns the
/// result prefixed with `OXIA_MAGIC`.
///
/// Any compressor error is converted to its string form and reported as
/// [`ColumnarError::Compress`].
#[cfg(feature = "compress")]
fn compress_payload(raw: &[u8], level: u8) -> Result<Vec<u8>, ColumnarError> {
    let deflated = match oxiarc_deflate::deflate(raw, level) {
        Ok(bytes) => bytes,
        Err(err) => return Err(ColumnarError::Compress(err.to_string())),
    };
    // Frame layout: magic marker first, compressed stream immediately after.
    let mut framed = Vec::with_capacity(OXIA_MAGIC.len() + deflated.len());
    framed.extend_from_slice(OXIA_MAGIC);
    framed.extend_from_slice(&deflated);
    Ok(framed)
}
/// Stand-in used when the `compress` feature is disabled: always fails with
/// [`ColumnarError::Compress`], ignoring both arguments.
#[cfg(not(feature = "compress"))]
fn compress_payload(_raw: &[u8], _level: u8) -> Result<Vec<u8>, ColumnarError> {
    let reason = String::from("OxiARC compression requires the `compress` feature");
    Err(ColumnarError::Compress(reason))
}
/// Inflates `compressed` with `oxiarc_deflate::inflate`.
///
/// The caller is expected to have removed the `OXIA_MAGIC` prefix already.
/// Any decompressor error is converted to its string form and reported as
/// [`ColumnarError::Compress`].
#[cfg(feature = "compress")]
fn decompress_payload(compressed: &[u8]) -> Result<Vec<u8>, ColumnarError> {
    match oxiarc_deflate::inflate(compressed) {
        Ok(inflated) => Ok(inflated),
        Err(err) => Err(ColumnarError::Compress(err.to_string())),
    }
}
/// Stand-in used when the `compress` feature is disabled: always fails with
/// [`ColumnarError::Compress`], ignoring its argument.
#[cfg(not(feature = "compress"))]
fn decompress_payload(_compressed: &[u8]) -> Result<Vec<u8>, ColumnarError> {
    let reason = String::from("OxiARC decompression requires the `compress` feature");
    Err(ColumnarError::Compress(reason))
}
impl ColumnarTable {
    /// Extracts summary metadata from a serialised file held in `data`.
    ///
    /// Forwards to `reader::read_metadata_from_bytes`.
    pub fn metadata_from_bytes(data: &[u8]) -> Result<ParquetFileMetaInfo, ColumnarError> {
        reader::read_metadata_from_bytes(data)
    }

    /// Returns a new table holding only the rows for which `predicate` holds.
    ///
    /// The predicate is evaluated batch by batch into a mask, the mask is
    /// applied with Arrow's `filter_record_batch`, and batches left with zero
    /// rows are dropped. Schema and compression are carried over unchanged.
    ///
    /// # Errors
    /// Propagates predicate evaluation errors, and returns
    /// [`ColumnarError::Arrow`] if applying a mask fails.
    pub fn filter(&self, predicate: &Predicate) -> Result<ColumnarTable, ColumnarError> {
        let mut kept: Vec<RecordBatch> = Vec::with_capacity(self.batches.len());
        for batch in self.batches.iter() {
            let mask = predicate.evaluate_batch(batch)?;
            let surviving = match arrow::compute::filter_record_batch(batch, &mask) {
                Ok(rows) => rows,
                Err(err) => return Err(ColumnarError::Arrow(err)),
            };
            if surviving.num_rows() == 0 {
                // Nothing matched in this batch; do not keep an empty batch.
                continue;
            }
            kept.push(surviving);
        }
        Ok(ColumnarTable {
            schema: Arc::clone(&self.schema),
            batches: kept,
            compression: self.compression,
        })
    }
}
/// Builder that collects a schema, an optional row-group size and a
/// compression mode, and turns them into an empty `ColumnarTable` (and,
/// optionally, a matching `WriterConfig`).
#[derive(Debug, Clone)]
pub struct ColumnarTableBuilder {
// Schema handed to the built table.
schema: Arc<Schema>,
// Becomes `WriterConfig::max_row_group_size` in `build_with_config`;
// not used by `build`.
row_group_size: Option<usize>,
// Compression mode handed to the built table.
compression: CompressionMode,
}
impl ColumnarTableBuilder {
    /// Starts a builder for `schema` with no row-group size and compression
    /// disabled.
    #[must_use]
    pub fn new(schema: Arc<Schema>) -> Self {
        let row_group_size = None;
        let compression = CompressionMode::None;
        Self {
            schema,
            row_group_size,
            compression,
        }
    }

    /// Records `size` as the row-group size to place in the writer config.
    #[must_use]
    pub fn row_group_size(self, size: usize) -> Self {
        Self {
            row_group_size: Some(size),
            ..self
        }
    }

    /// Selects [`CompressionMode::OxiArc`]; `level` is clamped to at most 9.
    #[must_use]
    pub fn compression(self, level: u8) -> Self {
        let level = level.min(9);
        Self {
            compression: CompressionMode::OxiArc { level },
            ..self
        }
    }

    /// Returns the row-group size recorded so far, if any.
    #[must_use]
    pub fn row_group_size_hint(&self) -> Option<usize> {
        self.row_group_size
    }

    /// Builds an empty table with the configured schema and compression.
    ///
    /// The row-group size is not part of the table and is discarded here; use
    /// [`ColumnarTableBuilder::build_with_config`] to obtain it as a
    /// `WriterConfig`.
    #[must_use]
    pub fn build(self) -> ColumnarTable {
        let Self {
            schema,
            compression,
            row_group_size: _,
        } = self;
        ColumnarTable {
            schema,
            batches: Vec::new(),
            compression,
        }
    }

    /// Builds the same empty table as [`ColumnarTableBuilder::build`], paired
    /// with a `WriterConfig` whose `max_row_group_size` is the recorded
    /// row-group size.
    #[must_use]
    pub fn build_with_config(self) -> (ColumnarTable, WriterConfig) {
        // Copy the size out before `build` consumes the builder.
        let max_row_group_size = self.row_group_size;
        (self.build(), WriterConfig { max_row_group_size })
    }
}
/// One-line summary of the table: column count of the schema, number of
/// batches, and total row count.
impl std::fmt::Display for ColumnarTable {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let column_count = self.schema.fields().len();
        let batch_count = self.batches.len();
        let total_rows = self.row_count();
        write!(
            f,
            "ColumnarTable(schema={} cols, {} batches, {} rows)",
            column_count, batch_count, total_rows
        )
    }
}