use std::sync::Arc;
use abi_stable::StableAbi;
use abi_stable::std_types::RVec;
use datafusion_physical_expr::Partitioning;
use datafusion_physical_expr_common::physical_expr::PhysicalExpr;
use crate::physical_expr::FFI_PhysicalExpr;
/// FFI-friendly mirror of [`Partitioning`].
///
/// The enum is `#[repr(C)]` and derives `StableAbi`, and every payload is
/// either a `usize` or an `abi_stable` container of [`FFI_PhysicalExpr`].
/// Conversions to and from [`Partitioning`] are provided by the `From`
/// implementations in this module.
#[repr(C)]
#[derive(Debug, StableAbi)]
pub enum FFI_Partitioning {
/// Mirror of `Partitioning::RoundRobinBatch`; holds the same `usize` value.
RoundRobinBatch(usize),
/// Mirror of `Partitioning::Hash`; holds the hash expressions wrapped as
/// [`FFI_PhysicalExpr`] together with the same `usize` value.
Hash(RVec<FFI_PhysicalExpr>, usize),
/// Mirror of `Partitioning::UnknownPartitioning`; holds the same `usize` value.
UnknownPartitioning(usize),
}
impl From<&Partitioning> for FFI_Partitioning {
fn from(value: &Partitioning) -> Self {
match value {
Partitioning::RoundRobinBatch(size) => Self::RoundRobinBatch(*size),
Partitioning::Hash(exprs, size) => {
let exprs = exprs
.iter()
.map(Arc::clone)
.map(FFI_PhysicalExpr::from)
.collect();
Self::Hash(exprs, *size)
}
Partitioning::UnknownPartitioning(size) => Self::UnknownPartitioning(*size),
}
}
}
/// Rebuilds a native [`Partitioning`] from a borrowed [`FFI_Partitioning`].
///
/// The `usize` payload of each variant is copied as-is. For
/// `FFI_Partitioning::Hash`, each [`FFI_PhysicalExpr`] is converted by
/// reference into an `Arc<dyn PhysicalExpr>`.
impl From<&FFI_Partitioning> for Partitioning {
    fn from(value: &FFI_Partitioning) -> Self {
        match value {
            FFI_Partitioning::RoundRobinBatch(partition_count) => {
                Partitioning::RoundRobinBatch(*partition_count)
            }
            FFI_Partitioning::Hash(ffi_exprs, partition_count) => {
                let mut physical_exprs: Vec<Arc<dyn PhysicalExpr>> =
                    Vec::with_capacity(ffi_exprs.len());
                for ffi_expr in ffi_exprs.iter() {
                    physical_exprs.push(<Arc<dyn PhysicalExpr>>::from(ffi_expr));
                }
                Partitioning::Hash(physical_exprs, *partition_count)
            }
            FFI_Partitioning::UnknownPartitioning(partition_count) => {
                Partitioning::UnknownPartitioning(*partition_count)
            }
        }
    }
}
#[cfg(test)]
mod tests {
    use datafusion_physical_expr::Partitioning;
    use datafusion_physical_expr::expressions::lit;
    use crate::physical_expr::partitioning::FFI_Partitioning;

    /// Converts each `Partitioning` variant to `FFI_Partitioning` and back,
    /// then checks that the result matches the input.
    #[test]
    fn round_trip_ffi_partitioning() {
        let cases = [
            Partitioning::RoundRobinBatch(10),
            Partitioning::Hash(vec![lit(1)], 10),
            Partitioning::UnknownPartitioning(10),
        ];

        for partitioning in cases {
            let ffi_partitioning = FFI_Partitioning::from(&partitioning);
            let returned = Partitioning::from(&ffi_partitioning);

            match returned {
                // The unknown variant is checked by comparing the payloads
                // directly rather than by comparing the whole enum values.
                Partitioning::UnknownPartitioning(return_size) => {
                    let Partitioning::UnknownPartitioning(original_size) = partitioning
                    else {
                        panic!("Expected unknown partitioning")
                    };
                    assert_eq!(return_size, original_size);
                }
                other => assert_eq!(partitioning, other),
            }
        }
    }
}