use std::sync::Arc;
use crate::error::Result;
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum PartitionStrategy {
RoundRobin,
Hash,
Range,
}
impl Default for PartitionStrategy {
fn default() -> Self {
Self::RoundRobin
}
}
#[derive(Debug)]
pub struct Partition {
id: usize,
#[cfg(feature = "distributed")]
data: Option<arrow::record_batch::RecordBatch>,
metadata: PartitionMetadata,
}
#[derive(Debug, Clone)]
pub struct PartitionMetadata {
row_count: usize,
memory_usage: usize,
column_names: Vec<String>,
statistics: Option<PartitionStatistics>,
}
#[derive(Debug, Clone)]
pub struct PartitionStatistics {
column_statistics: Vec<ColumnStatistics>,
}
impl PartitionStatistics {
#[cfg(feature = "distributed")]
pub fn from_record_batch(batch: &arrow::record_batch::RecordBatch) -> Self {
let mut column_statistics = Vec::new();
for (i, field) in batch.schema().fields().iter().enumerate() {
let column = batch.column(i);
let column_stats = ColumnStatistics::from_arrow_array(field.name(), &field.data_type(), column);
column_statistics.push(column_stats);
}
Self {
column_statistics,
}
}
pub fn column_statistics(&self) -> &[ColumnStatistics] {
&self.column_statistics
}
}
#[derive(Debug, Clone)]
pub struct ColumnStatistics {
name: String,
data_type: String,
min_value: Option<String>,
max_value: Option<String>,
null_count: usize,
distinct_count: Option<usize>,
}
impl ColumnStatistics {
#[cfg(feature = "distributed")]
pub fn from_arrow_array(
name: &str,
data_type: &arrow::datatypes::DataType,
array: &dyn arrow::array::Array
) -> Self {
use arrow::array::*;
use arrow::compute::{min, max};
let null_count = array.null_count();
let data_type_str = format!("{:?}", data_type);
let (min_value, max_value) = match data_type {
arrow::datatypes::DataType::Utf8 => {
if let Some(string_array) = array.as_any().downcast_ref::<StringArray>() {
let min_val = min(string_array).ok()
.and_then(|v| string_array.value(v as usize).to_string().into());
let max_val = max(string_array).ok()
.and_then(|v| string_array.value(v as usize).to_string().into());
(min_val, max_val)
} else {
(None, None)
}
}
arrow::datatypes::DataType::Int64 => {
if let Some(int_array) = array.as_any().downcast_ref::<Int64Array>() {
let min_val = min(int_array).ok().map(|v| v.to_string());
let max_val = max(int_array).ok().map(|v| v.to_string());
(min_val, max_val)
} else {
(None, None)
}
}
arrow::datatypes::DataType::Float64 => {
if let Some(float_array) = array.as_any().downcast_ref::<Float64Array>() {
let min_val = min(float_array).ok().map(|v| v.to_string());
let max_val = max(float_array).ok().map(|v| v.to_string());
(min_val, max_val)
} else {
(None, None)
}
}
_ => {
(None, None)
}
};
Self {
name: name.to_string(),
data_type: data_type_str,
min_value,
max_value,
null_count,
distinct_count: None, }
}
pub fn name(&self) -> &str {
&self.name
}
pub fn data_type(&self) -> &str {
&self.data_type
}
pub fn min_value(&self) -> Option<&str> {
self.min_value.as_deref()
}
pub fn max_value(&self) -> Option<&str> {
self.max_value.as_deref()
}
pub fn null_count(&self) -> usize {
self.null_count
}
pub fn distinct_count(&self) -> Option<usize> {
self.distinct_count
}
}
impl Partition {
#[cfg(feature = "distributed")]
pub fn new(
id: usize,
data: arrow::record_batch::RecordBatch,
) -> Self {
let metadata = PartitionMetadata::from_record_batch(&data);
Self {
id,
data: Some(data),
metadata,
}
}
pub fn new_metadata_only(
id: usize,
metadata: PartitionMetadata,
) -> Self {
Self {
id,
#[cfg(feature = "distributed")]
data: None,
metadata,
}
}
pub fn id(&self) -> usize {
self.id
}
#[cfg(feature = "distributed")]
pub fn data(&self) -> Option<&arrow::record_batch::RecordBatch> {
self.data.as_ref()
}
#[cfg(feature = "distributed")]
pub fn take_data(&mut self) -> Option<arrow::record_batch::RecordBatch> {
self.data.take()
}
pub fn metadata(&self) -> &PartitionMetadata {
&self.metadata
}
}
impl PartitionMetadata {
#[cfg(feature = "distributed")]
pub fn from_record_batch(batch: &arrow::record_batch::RecordBatch) -> Self {
let row_count = batch.num_rows();
let memory_usage = estimate_batch_memory_usage(batch);
let column_names = batch
.schema()
.fields()
.iter()
.map(|f| f.name().clone())
.collect();
let statistics = Some(PartitionStatistics::from_record_batch(batch));
Self {
row_count,
memory_usage,
column_names,
statistics,
}
}
pub fn new(
row_count: usize,
memory_usage: usize,
column_names: Vec<String>,
statistics: Option<PartitionStatistics>,
) -> Self {
Self {
row_count,
memory_usage,
column_names,
statistics,
}
}
pub fn row_count(&self) -> usize {
self.row_count
}
pub fn memory_usage(&self) -> usize {
self.memory_usage
}
pub fn column_names(&self) -> &[String] {
&self.column_names
}
pub fn statistics(&self) -> Option<&PartitionStatistics> {
self.statistics.as_ref()
}
}
#[derive(Debug, Clone)]
pub struct PartitionSet {
partitions: Vec<Arc<Partition>>,
#[cfg(feature = "distributed")]
schema: arrow::datatypes::SchemaRef,
}
impl PartitionSet {
#[cfg(feature = "distributed")]
pub fn new(
partitions: Vec<Arc<Partition>>,
schema: arrow::datatypes::SchemaRef,
) -> Self {
Self {
partitions,
schema,
}
}
#[cfg(feature = "distributed")]
pub fn empty(schema: arrow::datatypes::SchemaRef) -> Self {
Self {
partitions: Vec::new(),
schema,
}
}
pub fn partitions(&self) -> &[Arc<Partition>] {
&self.partitions
}
#[cfg(feature = "distributed")]
pub fn schema(&self) -> &arrow::datatypes::SchemaRef {
&self.schema
}
pub fn total_row_count(&self) -> usize {
self.partitions
.iter()
.map(|p| p.metadata().row_count())
.sum()
}
pub fn total_memory_usage(&self) -> usize {
self.partitions
.iter()
.map(|p| p.metadata().memory_usage())
.sum()
}
pub fn add_partition(&mut self, partition: Arc<Partition>) {
self.partitions.push(partition);
}
}
#[derive(Debug)]
pub struct Partitioner {
strategy: PartitionStrategy,
partition_columns: Option<Vec<String>>,
num_partitions: usize,
}
impl Partitioner {
pub fn new(
strategy: PartitionStrategy,
partition_columns: Option<Vec<String>>,
num_partitions: usize,
) -> Self {
Self {
strategy,
partition_columns,
num_partitions,
}
}
pub fn strategy(&self) -> PartitionStrategy {
self.strategy
}
pub fn partition_columns(&self) -> Option<&[String]> {
self.partition_columns.as_deref()
}
pub fn num_partitions(&self) -> usize {
self.num_partitions
}
#[cfg(feature = "distributed")]
pub fn partition_dataframe(
&self,
df: &crate::dataframe::DataFrame,
) -> Result<PartitionSet> {
use arrow::array::{Array, ArrayRef, Float64Array, Int64Array, StringArray};
use arrow::datatypes::{DataType, Field, Schema};
use arrow::record_batch::RecordBatch;
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};
if self.num_partitions == 0 {
return Err(crate::error::Error::InvalidValue(
"Number of partitions must be greater than 0".to_string()
));
}
if df.nrows() == 0 {
let fields = df.column_names()
.iter()
.map(|name| Field::new(name, DataType::Utf8, true))
.collect();
let schema = Arc::new(Schema::new(fields));
return Ok(PartitionSet::empty(schema));
}
let record_batch = self.dataframe_to_record_batch(df)?;
let schema = record_batch.schema();
let partitioned_batches = match self.strategy {
PartitionStrategy::RoundRobin => {
self.partition_round_robin(&record_batch)?
}
PartitionStrategy::Hash => {
self.partition_hash(&record_batch)?
}
PartitionStrategy::Range => {
self.partition_range(&record_batch)?
}
};
let partitions: Vec<Arc<Partition>> = partitioned_batches
.into_iter()
.enumerate()
.map(|(i, batch)| Arc::new(Partition::new(i, batch)))
.collect();
Ok(PartitionSet::new(partitions, schema))
}
#[cfg(feature = "distributed")]
fn dataframe_to_record_batch(&self, df: &crate::dataframe::DataFrame) -> Result<RecordBatch> {
use arrow::array::{Array, ArrayRef, Float64Array, Int64Array, StringArray};
use arrow::datatypes::{DataType, Field, Schema};
let column_names = df.column_names();
let mut fields = Vec::new();
let mut arrays: Vec<ArrayRef> = Vec::new();
for column_name in column_names {
let string_values = df.get_column_string_values(column_name)?;
let array: ArrayRef = Arc::new(StringArray::from(string_values));
arrays.push(array);
let field = Field::new(column_name, DataType::Utf8, true);
fields.push(field);
}
let schema = Arc::new(Schema::new(fields));
RecordBatch::try_new(schema, arrays)
.map_err(|e| crate::error::Error::InvalidValue(format!("Failed to create RecordBatch: {}", e)))
}
#[cfg(feature = "distributed")]
fn partition_round_robin(&self, batch: &RecordBatch) -> Result<Vec<RecordBatch>> {
use arrow::compute::take;
use arrow::array::UInt64Array;
let num_rows = batch.num_rows();
let mut partition_indices: Vec<Vec<u64>> = vec![Vec::new(); self.num_partitions];
for i in 0..num_rows {
let partition_id = i % self.num_partitions;
partition_indices[partition_id].push(i as u64);
}
self.create_partitions_from_indices(batch, partition_indices)
}
#[cfg(feature = "distributed")]
fn partition_hash(&self, batch: &RecordBatch) -> Result<Vec<RecordBatch>> {
let partition_columns = self.partition_columns.as_ref()
.ok_or_else(|| crate::error::Error::InvalidValue(
"Hash partitioning requires partition columns to be specified".to_string()
))?;
if partition_columns.is_empty() {
return Err(crate::error::Error::InvalidValue(
"Hash partitioning requires at least one partition column".to_string()
));
}
let column_name = &partition_columns[0];
let schema = batch.schema();
let column_index = schema.index_of(column_name)
.map_err(|_| crate::error::Error::ColumnNotFound(column_name.clone()))?;
let column_array = batch.column(column_index);
let num_rows = batch.num_rows();
let mut partition_indices: Vec<Vec<u64>> = vec![Vec::new(); self.num_partitions];
for i in 0..num_rows {
let mut hasher = DefaultHasher::new();
if let Some(string_array) = column_array.as_any().downcast_ref::<arrow::array::StringArray>() {
if let Some(value) = string_array.value(i) {
value.hash(&mut hasher);
}
} else {
format!("{:?}", column_array.slice(i, 1)).hash(&mut hasher);
}
let hash = hasher.finish();
let partition_id = (hash as usize) % self.num_partitions;
partition_indices[partition_id].push(i as u64);
}
self.create_partitions_from_indices(batch, partition_indices)
}
#[cfg(feature = "distributed")]
fn partition_range(&self, batch: &RecordBatch) -> Result<Vec<RecordBatch>> {
let num_rows = batch.num_rows();
let rows_per_partition = (num_rows + self.num_partitions - 1) / self.num_partitions;
let mut partition_indices: Vec<Vec<u64>> = vec![Vec::new(); self.num_partitions];
for i in 0..num_rows {
let partition_id = std::cmp::min(i / rows_per_partition, self.num_partitions - 1);
partition_indices[partition_id].push(i as u64);
}
self.create_partitions_from_indices(batch, partition_indices)
}
#[cfg(feature = "distributed")]
fn create_partitions_from_indices(
&self,
batch: &RecordBatch,
partition_indices: Vec<Vec<u64>>
) -> Result<Vec<RecordBatch>> {
use arrow::compute::take;
use arrow::array::UInt64Array;
let mut result = Vec::new();
for indices in partition_indices {
if indices.is_empty() {
let empty_arrays: Vec<ArrayRef> = batch.columns()
.iter()
.map(|col| arrow::compute::filter(col, &arrow::array::BooleanArray::from(vec![])).expect("operation should succeed"))
.collect();
let empty_batch = RecordBatch::try_new(batch.schema(), empty_arrays)
.map_err(|e| crate::error::Error::InvalidValue(format!("Failed to create empty batch: {}", e)))?;
result.push(empty_batch);
} else {
let indices_array = UInt64Array::from(indices);
let mut partition_arrays = Vec::new();
for column in batch.columns() {
let taken = take(column, &indices_array, None)
.map_err(|e| crate::error::Error::InvalidValue(format!("Failed to take rows: {}", e)))?;
partition_arrays.push(taken);
}
let partition_batch = RecordBatch::try_new(batch.schema(), partition_arrays)
.map_err(|e| crate::error::Error::InvalidValue(format!("Failed to create partition batch: {}", e)))?;
result.push(partition_batch);
}
}
Ok(result)
}
}
#[cfg(feature = "distributed")]
fn estimate_batch_memory_usage(batch: &arrow::record_batch::RecordBatch) -> usize {
let row_count = batch.num_rows();
let column_count = batch.num_columns();
let mut total_size = 0;
for i in 0..column_count {
let array = batch.column(i);
total_size += array.get_array_memory_size();
}
total_size + (row_count * column_count * std::mem::size_of::<usize>())
}