use std::sync::Arc;
use crate::error::Result;
/// Strategy used to assign rows to partitions.
///
/// NOTE(review): the concrete behavior of each strategy lives in the
/// `Partitioner` implementations, not here — variant docs below reflect
/// the conventional meaning of the names; confirm against implementors.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum PartitionStrategy {
    /// Rows are dealt out to partitions in turn (the default).
    RoundRobin,
    /// Rows are placed by hashing one or more key columns.
    Hash,
    /// Rows are placed by contiguous key ranges.
    Range,
}
impl Default for PartitionStrategy {
fn default() -> Self {
Self::RoundRobin
}
}
/// One horizontal slice of a dataset.
///
/// The Arrow payload exists only when the `distributed` feature is
/// enabled; otherwise a partition carries metadata alone.
#[derive(Debug)]
pub struct Partition {
    // Identifier of this partition within its set.
    id: usize,
    // Locally-held rows; `None` after `take_data` or for metadata-only partitions.
    #[cfg(feature = "distributed")]
    data: Option<arrow::record_batch::RecordBatch>,
    // Summary (row count, memory estimate, column names, statistics).
    metadata: PartitionMetadata,
}
/// Lightweight summary of a partition's contents, available even when
/// the underlying data is not materialized locally.
#[derive(Debug, Clone)]
pub struct PartitionMetadata {
    // Number of rows in the partition.
    row_count: usize,
    // Estimated memory footprint in bytes.
    memory_usage: usize,
    // Column names, in schema order.
    column_names: Vec<String>,
    // Per-column statistics; `None` when not computed.
    statistics: Option<PartitionStatistics>,
}
/// Collection of per-column statistics for one partition.
///
/// NOTE(review): no constructor or accessors are visible in this file;
/// presumably they live elsewhere — verify before extending.
#[derive(Debug, Clone)]
pub struct PartitionStatistics {
    column_statistics: Vec<ColumnStatistics>,
}
/// Statistics describing a single column within a partition.
#[derive(Debug, Clone)]
pub struct ColumnStatistics {
    // Column name.
    name: String,
    // Data type rendered as a string — presumably to avoid exposing
    // Arrow types in this struct; confirm the expected format.
    data_type: String,
    // Minimum value rendered as a string; `None` when unknown.
    min_value: Option<String>,
    // Maximum value rendered as a string; `None` when unknown.
    max_value: Option<String>,
    // Number of null entries.
    null_count: usize,
    // Number of distinct values; `None` when unknown.
    distinct_count: Option<usize>,
}
impl Partition {
    /// Builds a partition around an Arrow batch, deriving the metadata
    /// from the batch itself.
    #[cfg(feature = "distributed")]
    pub fn new(id: usize, data: arrow::record_batch::RecordBatch) -> Self {
        Self {
            id,
            metadata: PartitionMetadata::from_record_batch(&data),
            data: Some(data),
        }
    }

    /// Builds a partition that carries only metadata and no local data.
    pub fn new_metadata_only(id: usize, metadata: PartitionMetadata) -> Self {
        Self {
            id,
            #[cfg(feature = "distributed")]
            data: None,
            metadata,
        }
    }

    /// Identifier of this partition within its set.
    pub fn id(&self) -> usize {
        self.id
    }

    /// Borrows the Arrow batch, if it is still held locally.
    #[cfg(feature = "distributed")]
    pub fn data(&self) -> Option<&arrow::record_batch::RecordBatch> {
        self.data.as_ref()
    }

    /// Moves the Arrow batch out of the partition, leaving `None` behind.
    #[cfg(feature = "distributed")]
    pub fn take_data(&mut self) -> Option<arrow::record_batch::RecordBatch> {
        std::mem::take(&mut self.data)
    }

    /// Summary information about this partition.
    pub fn metadata(&self) -> &PartitionMetadata {
        &self.metadata
    }
}
impl PartitionMetadata {
    /// Derives metadata (row count, estimated memory, column names)
    /// directly from an Arrow batch. Statistics are not computed here.
    #[cfg(feature = "distributed")]
    pub fn from_record_batch(batch: &arrow::record_batch::RecordBatch) -> Self {
        let schema = batch.schema();
        Self {
            row_count: batch.num_rows(),
            memory_usage: estimate_batch_memory_usage(batch),
            column_names: schema.fields().iter().map(|f| f.name().clone()).collect(),
            statistics: None,
        }
    }

    /// Creates metadata from pre-computed values.
    pub fn new(
        row_count: usize,
        memory_usage: usize,
        column_names: Vec<String>,
        statistics: Option<PartitionStatistics>,
    ) -> Self {
        Self {
            row_count,
            memory_usage,
            column_names,
            statistics,
        }
    }

    /// Number of rows in the partition.
    pub fn row_count(&self) -> usize {
        self.row_count
    }

    /// Estimated memory footprint in bytes.
    pub fn memory_usage(&self) -> usize {
        self.memory_usage
    }

    /// Column names in schema order.
    pub fn column_names(&self) -> &[String] {
        self.column_names.as_slice()
    }

    /// Per-column statistics, when available.
    pub fn statistics(&self) -> Option<&PartitionStatistics> {
        self.statistics.as_ref()
    }
}
/// An ordered collection of partitions making up one dataset.
///
/// Cloning is cheap: partitions are shared via `Arc`.
#[derive(Debug, Clone)]
pub struct PartitionSet {
    // Shared handles to the individual partitions.
    partitions: Vec<Arc<Partition>>,
    // Common Arrow schema of the partitions; `None` when built via `empty()`.
    #[cfg(feature = "distributed")]
    schema: Option<arrow::datatypes::SchemaRef>,
}
impl PartitionSet {
    /// Creates a set from existing partitions and their shared schema.
    #[cfg(feature = "distributed")]
    pub fn new(partitions: Vec<Arc<Partition>>, schema: arrow::datatypes::SchemaRef) -> Self {
        Self {
            partitions,
            schema: Some(schema),
        }
    }

    /// Creates a set with no partitions (and, under the `distributed`
    /// feature, no schema).
    pub fn empty() -> Self {
        Self {
            partitions: Vec::new(),
            #[cfg(feature = "distributed")]
            schema: None,
        }
    }

    /// All partitions in this set, in order.
    pub fn partitions(&self) -> &[Arc<Partition>] {
        &self.partitions
    }

    /// Number of partitions in the set.
    pub fn len(&self) -> usize {
        self.partitions.len()
    }

    /// Returns `true` when the set holds no partitions.
    pub fn is_empty(&self) -> bool {
        self.partitions.is_empty()
    }

    /// Shared schema of the partitions, if one has been recorded.
    #[cfg(feature = "distributed")]
    pub fn schema(&self) -> Option<&arrow::datatypes::SchemaRef> {
        self.schema.as_ref()
    }

    /// Appends a partition to the set.
    ///
    /// NOTE(review): under the `distributed` feature this does not verify
    /// that the partition's schema matches `self.schema` — confirm whether
    /// callers guarantee that invariant.
    pub fn add_partition(&mut self, partition: Arc<Partition>) {
        self.partitions.push(partition);
    }

    /// Total row count across all partitions, taken from metadata.
    pub fn total_rows(&self) -> usize {
        self.partitions
            .iter()
            .map(|p| p.metadata().row_count())
            .sum()
    }

    /// Total estimated memory usage across all partitions, from metadata.
    pub fn total_memory_usage(&self) -> usize {
        self.partitions
            .iter()
            .map(|p| p.metadata().memory_usage())
            .sum()
    }
}

impl Default for PartitionSet {
    /// The default set is empty, matching [`PartitionSet::empty`].
    fn default() -> Self {
        Self::empty()
    }
}
/// Splits a `DataFrame` into a [`PartitionSet`].
///
/// NOTE(review): without the `distributed` feature this trait compiles to
/// an empty trait — implementors gain `partition` only when the feature
/// is enabled.
pub trait Partitioner {
    /// Partitions `df`, returning the resulting set or an error.
    #[cfg(feature = "distributed")]
    fn partition(&self, df: &crate::dataframe::DataFrame) -> Result<PartitionSet>;
}
/// Rough estimate of a record batch's in-memory size, in bytes.
///
/// Sums the length of every value buffer, adds one validity bitmap per
/// column that actually contains nulls, and a flat per-column overhead
/// for array bookkeeping. This is an estimate only — nested (child)
/// array data is not counted.
#[cfg(feature = "distributed")]
fn estimate_batch_memory_usage(batch: &arrow::record_batch::RecordBatch) -> usize {
    // Flat bookkeeping overhead assumed per column (pointers, lengths, ...).
    const PER_COLUMN_OVERHEAD: usize = 100;
    // A validity bitmap packs one bit per row, rounded up to whole bytes.
    let bitmap_bytes = (batch.num_rows() + 7) / 8;
    let data_bytes: usize = batch
        .columns()
        .iter()
        .map(|column| {
            let buffers: usize = column.to_data().buffers().iter().map(|b| b.len()).sum();
            if column.null_count() > 0 {
                buffers + bitmap_bytes
            } else {
                buffers
            }
        })
        .sum();
    data_bytes + PER_COLUMN_OVERHEAD * batch.num_columns()
}