use crate::aggregation::{Aggregation, AggregationKind};
use crate::column::Column;
use crate::error::{CudfError, Result};
use crate::table::Table;
use crate::types::checked_i32;
/// Builder for a group-by aggregation.
///
/// Borrows the key table and collects `(column index, aggregation)` requests;
/// nothing is sent to the bridge until one of the `execute*` methods is called.
pub struct GroupBy<'a> {
/// Key table passed to the bridge's `groupby_new` at execution time.
keys: &'a Table,
/// Pending requests in insertion order: a column index paired with the
/// aggregation to apply. NOTE(review): the index presumably refers to a column
/// of the `values` table given to `execute*` — confirm against the bridge.
requests: Vec<(usize, Aggregation)>,
}
impl<'a> GroupBy<'a> {
pub fn new(keys: &'a Table) -> Self {
Self {
keys,
requests: Vec::new(),
}
}
pub fn agg(mut self, column: usize, kind: AggregationKind) -> Self {
self.requests.push((column, Aggregation::new(kind)));
self
}
pub fn agg_with(mut self, column: usize, aggregation: Aggregation) -> Self {
self.requests.push((column, aggregation));
self
}
pub fn execute(self, values: &Table) -> Result<Table> {
if self.requests.is_empty() {
return Err(CudfError::InvalidArgument(
"groupby requires at least one aggregation request".to_string(),
));
}
let mut builder = cudf_cxx::groupby::ffi::groupby_new(&self.keys.inner);
for (col_idx, agg) in self.requests {
cudf_cxx::groupby::ffi::groupby_add_request(
builder.pin_mut(),
checked_i32(col_idx)?,
agg.inner,
);
}
let raw = cudf_cxx::groupby::ffi::groupby_execute(builder.pin_mut(), &values.inner)
.map_err(CudfError::from_cxx)?;
Ok(Table { inner: raw })
}
pub fn execute_keys(self, values: &Table) -> Result<Table> {
if self.requests.is_empty() {
return Err(CudfError::InvalidArgument(
"groupby requires at least one aggregation request".to_string(),
));
}
let mut builder = cudf_cxx::groupby::ffi::groupby_new(&self.keys.inner);
for (col_idx, agg) in self.requests {
cudf_cxx::groupby::ffi::groupby_add_request(
builder.pin_mut(),
checked_i32(col_idx)?,
agg.inner,
);
}
let raw = cudf_cxx::groupby::ffi::groupby_execute_keys(builder.pin_mut(), &values.inner)
.map_err(CudfError::from_cxx)?;
Ok(Table { inner: raw })
}
pub fn execute_values(self, values: &Table) -> Result<Table> {
if self.requests.is_empty() {
return Err(CudfError::InvalidArgument(
"groupby requires at least one aggregation request".to_string(),
));
}
let mut builder = cudf_cxx::groupby::ffi::groupby_new(&self.keys.inner);
for (col_idx, agg) in self.requests {
cudf_cxx::groupby::ffi::groupby_add_request(
builder.pin_mut(),
checked_i32(col_idx)?,
agg.inner,
);
}
let raw = cudf_cxx::groupby::ffi::groupby_execute_values(builder.pin_mut(), &values.inner)
.map_err(CudfError::from_cxx)?;
Ok(Table { inner: raw })
}
}
/// Operation that can be requested through `GroupByScan::scan`.
///
/// The explicit discriminants are sent to the bridge as `i32` (`op as i32` in
/// `GroupByScan::execute`), so they are part of the FFI contract and must not
/// be renumbered.
/// NOTE(review): the numeric codes presumably match what the C++ bridge
/// expects for each operation — confirm before adding or changing variants.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum GroupByScanOp {
Sum = 0,
Min = 2,
Max = 3,
Count = 11,
Rank = 12,
}
/// Builder for a group-by scan.
///
/// Borrows the key table and collects `(column index, operation)` requests;
/// nothing is sent to the bridge until `execute` is called.
pub struct GroupByScan<'a> {
/// Key table passed to the bridge's `groupby_scan_new` at execution time.
keys: &'a Table,
/// Pending requests in insertion order: a column index paired with the scan
/// operation to apply. NOTE(review): the index presumably refers to a column
/// of the `values` table given to `execute` — confirm against the bridge.
requests: Vec<(usize, GroupByScanOp)>,
}
impl<'a> GroupByScan<'a> {
    /// Starts a scan builder over `keys` with an empty request list.
    pub fn new(keys: &'a Table) -> Self {
        let requests = Vec::new();
        Self { keys, requests }
    }

    /// Appends a scan of kind `op` on `column` and hands the builder back for
    /// chaining.
    pub fn scan(mut self, column: usize, op: GroupByScanOp) -> Self {
        let request = (column, op);
        self.requests.push(request);
        self
    }

    /// Sends every queued request to a fresh bridge scan builder, executes it
    /// against `values`, and wraps the table the bridge returns.
    ///
    /// # Errors
    ///
    /// * `CudfError::InvalidArgument` when no request has been queued.
    /// * Whatever `checked_i32` reports when a column index cannot be converted.
    /// * The bridge exception, converted with `CudfError::from_cxx`.
    pub fn execute(self, values: &Table) -> Result<Table> {
        use cudf_cxx::groupby::ffi;

        let Self { keys, requests } = self;
        if requests.is_empty() {
            return Err(CudfError::InvalidArgument(
                "groupby scan requires at least one request".to_string(),
            ));
        }

        let mut scan_builder = ffi::groupby_scan_new(&keys.inner);
        for (column, scan_op) in requests {
            // The operation crosses the FFI boundary as its i32 discriminant.
            ffi::groupby_scan_add_request(
                scan_builder.pin_mut(),
                checked_i32(column)?,
                scan_op as i32,
            );
        }

        let inner = ffi::groupby_scan_execute(scan_builder.pin_mut(), &values.inner)
            .map_err(CudfError::from_cxx)?;
        Ok(Table { inner })
    }
}
/// Result of `Table::groupby_get_groups` and
/// `Table::groupby_get_groups_with_values`.
pub struct GroupByGroups {
/// Table taken from the bridge result via `groupby_groups_take_keys`.
pub keys: Table,
/// Column taken from the bridge result via `groupby_groups_take_offsets`.
/// NOTE(review): presumably the start offsets of each group — confirm.
pub offsets: Column,
/// Table taken via `groupby_groups_take_values`; `None` when the groups were
/// produced by `groupby_get_groups`, which is not given a values table.
pub values: Option<Table>,
}
/// Per-column policy accepted by `Table::groupby_replace_nulls`.
///
/// Each policy is converted with `as i32` before being handed to the bridge,
/// so the explicit discriminants are part of the FFI contract.
/// NOTE(review): presumably `Forward` fills a null from the preceding value in
/// its group and `Backward` from the following one — confirm against the C++
/// side.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum GroupByReplacePolicy {
Forward = 0,
Backward = 1,
}
impl Table {
pub fn groupby_get_groups(&self) -> Result<GroupByGroups> {
let mut raw =
cudf_cxx::groupby::ffi::groupby_get_groups(&self.inner).map_err(CudfError::from_cxx)?;
let keys_raw = cudf_cxx::groupby::ffi::groupby_groups_take_keys(raw.pin_mut())
.map_err(CudfError::from_cxx)?;
let offsets_raw = cudf_cxx::groupby::ffi::groupby_groups_take_offsets(raw.pin_mut())
.map_err(CudfError::from_cxx)?;
Ok(GroupByGroups {
keys: Table { inner: keys_raw },
offsets: Column { inner: offsets_raw },
values: None,
})
}
pub fn groupby_get_groups_with_values(&self, values: &Table) -> Result<GroupByGroups> {
let mut raw =
cudf_cxx::groupby::ffi::groupby_get_groups_with_values(&self.inner, &values.inner)
.map_err(CudfError::from_cxx)?;
let keys_raw = cudf_cxx::groupby::ffi::groupby_groups_take_keys(raw.pin_mut())
.map_err(CudfError::from_cxx)?;
let offsets_raw = cudf_cxx::groupby::ffi::groupby_groups_take_offsets(raw.pin_mut())
.map_err(CudfError::from_cxx)?;
let values_raw = cudf_cxx::groupby::ffi::groupby_groups_take_values(raw.pin_mut())
.map_err(CudfError::from_cxx)?;
Ok(GroupByGroups {
keys: Table { inner: keys_raw },
offsets: Column { inner: offsets_raw },
values: Some(Table { inner: values_raw }),
})
}
pub fn groupby_replace_nulls(
&self,
values: &Table,
policies: &[GroupByReplacePolicy],
) -> Result<Table> {
let p: Vec<i32> = policies.iter().map(|&p| p as i32).collect();
let raw = cudf_cxx::groupby::ffi::groupby_replace_nulls(&self.inner, &values.inner, &p)
.map_err(CudfError::from_cxx)?;
Ok(Table { inner: raw })
}
}