use std::sync::Arc;
use crate::error::Result;
/// Strategy used to split data into partitions.
///
/// This type only names the strategy; the partitioning logic itself lives in
/// whatever `Partitioner` implementation consumes it (not visible here).
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum PartitionStrategy {
    /// Round-robin assignment (presumably rows are dealt to partitions in turn — confirm
    /// against the partitioner implementation). This is the default strategy.
    RoundRobin,
    /// Hash-based assignment (presumably on a key column — confirm).
    Hash,
    /// Range-based assignment (presumably on sorted key ranges — confirm).
    Range,
}

impl Default for PartitionStrategy {
    /// Returns `PartitionStrategy::RoundRobin`.
    fn default() -> Self {
        PartitionStrategy::RoundRobin
    }
}
/// One partition of a data set: a caller-supplied id, its metadata and — when the
/// `distributed` feature is enabled — an optional Arrow record batch holding the rows.
#[derive(Debug)]
pub struct Partition {
    /// Caller-supplied identifier of this partition.
    id: usize,
    /// Row data; `None` for metadata-only partitions or once the data has been taken.
    #[cfg(feature = "distributed")]
    data: Option<arrow::record_batch::RecordBatch>,
    /// Summary of the partition (row count, memory usage, column names, statistics).
    metadata: PartitionMetadata,
}
/// Summary of a partition's contents, usable with or without the row data itself.
#[derive(Clone, Debug)]
pub struct PartitionMetadata {
    /// Number of rows in the partition.
    row_count: usize,
    /// Memory footprint in bytes (an estimate when derived from a record batch).
    memory_usage: usize,
    /// Column names; in schema order when derived from a record batch.
    column_names: Vec<String>,
    /// Optional per-column statistics.
    statistics: Option<PartitionStatistics>,
}
/// Per-column statistics for one partition.
#[derive(Debug, Clone)]
pub struct PartitionStatistics {
    /// One entry per column; in schema order when computed from a record batch.
    column_statistics: Vec<ColumnStatistics>,
}

impl PartitionStatistics {
    /// Creates partition statistics from per-column entries.
    ///
    /// Lets code outside this module build statistics, e.g. to pass to
    /// `PartitionMetadata::new`.
    pub fn new(column_statistics: Vec<ColumnStatistics>) -> Self {
        Self { column_statistics }
    }

    /// Returns the statistics of every column.
    pub fn column_statistics(&self) -> &[ColumnStatistics] {
        &self.column_statistics
    }

    /// Returns the statistics of the first column named `name`, if there is one.
    pub fn column(&self, name: &str) -> Option<&ColumnStatistics> {
        self.column_statistics.iter().find(|stats| stats.name == name)
    }
}

/// Summary statistics for a single column of a partition.
#[derive(Debug, Clone)]
pub struct ColumnStatistics {
    /// Column name.
    name: String,
    /// Textual data type; the `Debug` rendering of the Arrow type (e.g. `"Int64"`) when
    /// computed from a record batch.
    data_type: String,
    /// Smallest non-null value rendered as text, if known.
    min_value: Option<String>,
    /// Largest non-null value rendered as text, if known.
    max_value: Option<String>,
    /// Number of null entries in the column.
    null_count: usize,
    /// Number of distinct values, if it was computed (batch statistics leave it `None`).
    distinct_count: Option<usize>,
}

impl ColumnStatistics {
    /// Creates statistics for one column.
    pub fn new(
        name: impl Into<String>,
        data_type: impl Into<String>,
        min_value: Option<String>,
        max_value: Option<String>,
        null_count: usize,
        distinct_count: Option<usize>,
    ) -> Self {
        Self {
            name: name.into(),
            data_type: data_type.into(),
            min_value,
            max_value,
            null_count,
            distinct_count,
        }
    }

    /// Returns the column name.
    pub fn name(&self) -> &str {
        &self.name
    }

    /// Returns the textual data type.
    pub fn data_type(&self) -> &str {
        &self.data_type
    }

    /// Returns the smallest non-null value as text, if known.
    pub fn min_value(&self) -> Option<&str> {
        self.min_value.as_deref()
    }

    /// Returns the largest non-null value as text, if known.
    pub fn max_value(&self) -> Option<&str> {
        self.max_value.as_deref()
    }

    /// Returns the number of null entries.
    pub fn null_count(&self) -> usize {
        self.null_count
    }

    /// Returns the number of distinct values, if computed.
    pub fn distinct_count(&self) -> Option<usize> {
        self.distinct_count
    }
}
impl Partition {
    /// Builds a partition that owns `data`, deriving its metadata from the batch.
    #[cfg(feature = "distributed")]
    pub fn new(id: usize, data: arrow::record_batch::RecordBatch) -> Self {
        // Fields are evaluated in the order written: metadata borrows `data`
        // before `data` is moved into the struct.
        Self {
            metadata: PartitionMetadata::from_record_batch(&data),
            id,
            data: Some(data),
        }
    }

    /// Builds a partition that carries metadata but no row data.
    pub fn new_metadata_only(id: usize, metadata: PartitionMetadata) -> Self {
        Self {
            id,
            metadata,
            #[cfg(feature = "distributed")]
            data: None,
        }
    }

    /// Returns the partition's id.
    pub fn id(&self) -> usize {
        self.id
    }

    /// Borrows the row data, if this partition still holds it.
    #[cfg(feature = "distributed")]
    pub fn data(&self) -> Option<&arrow::record_batch::RecordBatch> {
        self.data.as_ref()
    }

    /// Moves the row data out, leaving `None` behind.
    #[cfg(feature = "distributed")]
    pub fn take_data(&mut self) -> Option<arrow::record_batch::RecordBatch> {
        self.data.take()
    }

    /// Borrows the partition's metadata.
    pub fn metadata(&self) -> &PartitionMetadata {
        &self.metadata
    }
}
impl PartitionMetadata {
    /// Derives metadata from an Arrow record batch: its row count, an estimated memory
    /// footprint, the schema's column names, and per-column statistics.
    #[cfg(feature = "distributed")]
    pub fn from_record_batch(batch: &arrow::record_batch::RecordBatch) -> Self {
        let column_names: Vec<String> = batch
            .schema()
            .fields()
            .iter()
            .map(|field| field.name().to_owned())
            .collect();
        Self {
            row_count: batch.num_rows(),
            memory_usage: estimate_batch_memory_usage(batch),
            column_names,
            statistics: Some(compute_batch_statistics(batch)),
        }
    }

    /// Creates metadata from explicitly supplied values.
    pub fn new(
        row_count: usize,
        memory_usage: usize,
        column_names: Vec<String>,
        statistics: Option<PartitionStatistics>,
    ) -> Self {
        Self {
            row_count,
            memory_usage,
            column_names,
            statistics,
        }
    }

    /// Returns the number of rows.
    pub fn row_count(&self) -> usize {
        self.row_count
    }

    /// Returns the memory footprint in bytes.
    pub fn memory_usage(&self) -> usize {
        self.memory_usage
    }

    /// Borrows the column names.
    pub fn column_names(&self) -> &[String] {
        self.column_names.as_slice()
    }

    /// Borrows the per-column statistics, if present.
    pub fn statistics(&self) -> Option<&PartitionStatistics> {
        self.statistics.as_ref()
    }
}
/// An ordered collection of shared partitions, plus (with the `distributed` feature) an
/// optional Arrow schema for the set.
#[derive(Clone, Debug)]
pub struct PartitionSet {
    /// Partitions in insertion order; `Arc` makes cloning the set cheap.
    partitions: Vec<Arc<Partition>>,
    /// Schema recorded for the set; `None` for a set built with `empty()`.
    #[cfg(feature = "distributed")]
    schema: Option<arrow::datatypes::SchemaRef>,
}
impl PartitionSet {
    /// Creates a set from `partitions`, recording `schema` as the set's schema.
    #[cfg(feature = "distributed")]
    pub fn new(partitions: Vec<Arc<Partition>>, schema: arrow::datatypes::SchemaRef) -> Self {
        let schema = Some(schema);
        Self { partitions, schema }
    }

    /// Creates a set with no partitions (and, with `distributed`, no schema).
    pub fn empty() -> Self {
        Self {
            #[cfg(feature = "distributed")]
            schema: None,
            partitions: Vec::new(),
        }
    }

    /// Borrows the partitions in insertion order.
    pub fn partitions(&self) -> &[Arc<Partition>] {
        self.partitions.as_slice()
    }

    /// Borrows the set's schema, if one was recorded.
    #[cfg(feature = "distributed")]
    pub fn schema(&self) -> Option<&arrow::datatypes::SchemaRef> {
        self.schema.as_ref()
    }

    /// Appends `partition` to the end of the set.
    pub fn add_partition(&mut self, partition: Arc<Partition>) {
        self.partitions.push(partition);
    }

    /// Sums the metadata row counts of all partitions.
    pub fn total_rows(&self) -> usize {
        let mut rows = 0;
        for partition in &self.partitions {
            rows += partition.metadata().row_count();
        }
        rows
    }

    /// Sums the metadata memory usage (bytes) of all partitions.
    pub fn total_memory_usage(&self) -> usize {
        let mut bytes = 0;
        for partition in &self.partitions {
            bytes += partition.metadata().memory_usage();
        }
        bytes
    }
}
/// Splits a data frame into a [`PartitionSet`].
///
/// Without the `distributed` feature this trait has no required methods.
pub trait Partitioner {
    /// Partitions `df` and returns the resulting set.
    ///
    /// # Errors
    ///
    /// Returns the crate's error type when the implementation cannot partition `df`;
    /// the exact conditions are implementation-defined.
    #[cfg(feature = "distributed")]
    fn partition(&self, df: &crate::dataframe::DataFrame) -> Result<PartitionSet>;
}
/// Estimates the memory footprint of `batch` in bytes.
///
/// The estimate is the sum of, for every column and recursively for every child array
/// (struct fields, list values, dictionary values, ...):
/// * the byte length of each data buffer,
/// * an approximate validity bitmap of `ceil(len / 8)` bytes when the array has nulls,
///
/// plus a fixed per-column overhead. It is a heuristic, not an exact allocation count.
/// NOTE(review): buffers shared between columns (e.g. slices of one array) may be counted
/// more than once — confirm whether that matters for callers.
#[cfg(feature = "distributed")]
fn estimate_batch_memory_usage(batch: &arrow::record_batch::RecordBatch) -> usize {
    // Rough per-column bookkeeping overhead in bytes (array headers, field metadata).
    const PER_COLUMN_OVERHEAD: usize = 100;

    // Bytes held by `data`'s own buffers, its approximate validity bitmap, and all of its
    // children. Recursing matters: nested arrays keep most (or all) of their values in
    // `child_data`, so summing only top-level buffers badly under-counts them.
    fn array_data_bytes(data: &arrow::array::ArrayData) -> usize {
        let own_buffers: usize = data.buffers().iter().map(|buffer| buffer.len()).sum();
        let validity = if data.null_count() > 0 {
            (data.len() + 7) / 8
        } else {
            0
        };
        let children: usize = data.child_data().iter().map(array_data_bytes).sum();
        own_buffers + validity + children
    }

    let column_bytes: usize = batch
        .columns()
        .iter()
        .map(|column| array_data_bytes(&column.to_data()))
        .sum();
    column_bytes + PER_COLUMN_OVERHEAD * batch.num_columns()
}
/// Computes per-column statistics for `batch`, in schema order.
///
/// For every column this records its name, the `Debug` rendering of its Arrow data type,
/// and its null count. For signed/unsigned integer columns (8–64 bit) and `Float32`/`Float64`
/// columns it also records the smallest and largest non-null value, rendered as text. NaN
/// floats are ignored; infinities are valid bounds. Other types, and columns without any
/// usable value, get `None` for both bounds. `distinct_count` is never computed here.
#[cfg(feature = "distributed")]
fn compute_batch_statistics(batch: &arrow::record_batch::RecordBatch) -> PartitionStatistics {
    use arrow::array::{
        Array as ArrowArray, Float32Array, Float64Array, Int16Array, Int32Array, Int64Array,
        Int8Array, UInt16Array, UInt32Array, UInt64Array, UInt8Array,
    };
    use arrow::datatypes::DataType;

    // Returns the smallest and largest orderable element of `values` as strings, or
    // `(None, None)` when there is none. Tracking "seen any value" explicitly (rather than
    // testing whether the running min is finite) keeps -inf / +inf as legitimate bounds.
    // Runs in a single pass without collecting the values.
    fn min_max_strings<T, I>(values: I) -> (Option<String>, Option<String>)
    where
        T: PartialOrd + Copy + ToString,
        I: IntoIterator<Item = T>,
    {
        let mut bounds: Option<(T, T)> = None;
        for value in values {
            // NaN is unordered even with itself; skip it so it never becomes a bound.
            if value.partial_cmp(&value).is_none() {
                continue;
            }
            bounds = Some(match bounds {
                None => (value, value),
                Some((lo, hi)) => (
                    if value < lo { value } else { lo },
                    if value > hi { value } else { hi },
                ),
            });
        }
        match bounds {
            Some((lo, hi)) => (Some(lo.to_string()), Some(hi.to_string())),
            None => (None, None),
        }
    }

    // Downcasts `$array` to the concrete Arrow array type `$typed` and computes min/max over
    // its non-null values; a failed downcast yields `(None, None)`.
    macro_rules! typed_min_max {
        ($array:expr, $typed:ty) => {
            $array
                .as_any()
                .downcast_ref::<$typed>()
                .map_or((None, None), |typed| min_max_strings(typed.iter().flatten()))
        };
    }

    let schema = batch.schema();
    let column_statistics: Vec<ColumnStatistics> = schema
        .fields()
        .iter()
        .enumerate()
        .map(|(col_idx, field)| {
            let array = batch.column(col_idx);
            let (min_value, max_value) = match field.data_type() {
                DataType::Float64 => typed_min_max!(array, Float64Array),
                DataType::Float32 => typed_min_max!(array, Float32Array),
                DataType::Int64 => typed_min_max!(array, Int64Array),
                DataType::Int32 => typed_min_max!(array, Int32Array),
                DataType::Int16 => typed_min_max!(array, Int16Array),
                DataType::Int8 => typed_min_max!(array, Int8Array),
                DataType::UInt64 => typed_min_max!(array, UInt64Array),
                DataType::UInt32 => typed_min_max!(array, UInt32Array),
                DataType::UInt16 => typed_min_max!(array, UInt16Array),
                DataType::UInt8 => typed_min_max!(array, UInt8Array),
                _ => (None, None),
            };
            ColumnStatistics {
                name: field.name().clone(),
                data_type: format!("{:?}", field.data_type()),
                min_value,
                max_value,
                null_count: array.null_count(),
                // Not computed: an exact distinct count would need a full hash pass.
                distinct_count: None,
            }
        })
        .collect();
    PartitionStatistics { column_statistics }
}