use cudf::aggregation::AggregationKind;
use cudf::groupby::GroupBy;
use cudf::sorting::{NullOrder, SortOrder};
use cudf::stream_compaction::{DuplicateKeepOption, NullEquality};
use cudf::{Column as GpuColumn, Table as GpuTable};
use polars_core::prelude::*;
use polars_error::{PolarsResult, polars_err};
use crate::convert;
use crate::error::gpu_result;
/// A frame whose column data lives in a cudf [`GpuTable`], paired with the column
/// names polars needs.
///
/// Invariant: `names[i]` is the name of column `i` of `table`. The fallible
/// constructors and [`GpuDataFrame::groupby`] check that the two lengths agree;
/// [`GpuDataFrame::from_table`] trusts its caller.
pub struct GpuDataFrame {
    /// Column data.
    table: GpuTable,
    /// One name per column of `table`, in column order.
    names: Vec<String>,
}

impl GpuDataFrame {
    /// Wraps an existing table without validating `names` against its width.
    ///
    /// Callers must supply exactly one name per column; use
    /// [`GpuDataFrame::from_columns`] when that is not already guaranteed.
    pub fn from_table(table: GpuTable, names: Vec<String>) -> Self {
        Self { table, names }
    }

    /// Builds a frame from individual columns.
    ///
    /// # Errors
    /// `ComputeError` if `columns` and `names` differ in length; otherwise any
    /// error reported while assembling the table.
    pub fn from_columns(columns: Vec<GpuColumn>, names: Vec<String>) -> PolarsResult<Self> {
        Self::check_names(columns.len(), &names)?;
        let table = gpu_result(GpuTable::new(columns))?;
        Ok(Self { table, names })
    }

    /// Number of rows in the underlying table.
    pub fn height(&self) -> usize {
        self.table.num_rows()
    }

    /// Number of columns in the underlying table.
    pub fn width(&self) -> usize {
        self.table.num_columns()
    }

    /// Returns the column at position `index`.
    ///
    /// # Errors
    /// Propagates the table's error, e.g. for an out-of-range index.
    pub fn column(&self, index: usize) -> PolarsResult<GpuColumn> {
        gpu_result(self.table.column(index))
    }

    /// Position of the first column called `name`.
    ///
    /// # Errors
    /// `ColumnNotFound` if no column has that name.
    pub fn column_index(&self, name: &str) -> PolarsResult<usize> {
        self.names
            .iter()
            .position(|n| n == name)
            .ok_or_else(|| polars_err!(ColumnNotFound: "{}", name))
    }

    /// Returns the first column called `name`.
    ///
    /// # Errors
    /// `ColumnNotFound` if no column has that name, or a table error.
    pub fn column_by_name(&self, name: &str) -> PolarsResult<GpuColumn> {
        let idx = self.column_index(name)?;
        self.column(idx)
    }

    /// Column names, in column order.
    pub fn names(&self) -> &[String] {
        &self.names
    }

    /// Projects the frame onto `col_names`, in the order given.
    ///
    /// # Errors
    /// `ColumnNotFound` for the first name that does not exist.
    pub fn select_columns(&self, col_names: &[&str]) -> PolarsResult<Self> {
        let columns = col_names
            .iter()
            .map(|&name| self.column_by_name(name))
            .collect::<PolarsResult<Vec<_>>>()?;
        let names = col_names.iter().map(|&name| name.to_owned()).collect();
        Self::from_columns(columns, names)
    }

    /// Keeps the rows selected by `mask`, using the table's `apply_boolean_mask`.
    ///
    /// NOTE(review): the handling of null mask entries is whatever the underlying
    /// cudf call does (presumably they drop the row) — confirm.
    pub fn apply_boolean_mask(&self, mask: &GpuColumn) -> PolarsResult<Self> {
        let filtered = gpu_result(self.table.apply_boolean_mask(mask))?;
        Ok(self.with_same_names(filtered))
    }

    /// Returns up to `length` rows starting at `offset`.
    ///
    /// A negative `offset` counts back from the end. If it reaches before the
    /// first row, the slice starts at row 0 and `length` is not shortened. An
    /// offset at or past the end yields an empty frame. `length` is clamped to the
    /// rows available, so very large values (e.g. `usize::MAX`) are safe.
    pub fn slice(&self, offset: i64, length: usize) -> PolarsResult<Self> {
        let height = self.height();
        // Work in u64 so no offset can overflow (including i64::MIN) or truncate on
        // 32-bit targets. Both branches yield a value <= height, so the cast back
        // to usize is lossless.
        let rows = height as u64;
        let begin = if offset >= 0 {
            (offset as u64).min(rows) as usize
        } else {
            rows.saturating_sub(offset.unsigned_abs()) as usize
        };
        let end = begin.saturating_add(length).min(height);
        let sliced = gpu_result(self.table.slice(begin, end))?;
        Ok(self.with_same_names(sliced))
    }

    /// Reorders every row of the frame by the given key columns.
    ///
    /// NOTE(review): `orders` and `null_orders` presumably need one entry per key
    /// column; that is enforced (if at all) by the underlying cudf call.
    pub fn sort_by_key(
        &self,
        key_columns: Vec<GpuColumn>,
        orders: &[SortOrder],
        null_orders: &[NullOrder],
    ) -> PolarsResult<Self> {
        let keys_table = gpu_result(GpuTable::new(key_columns))?;
        let sorted = gpu_result(self.table.sort_by_key(&keys_table, orders, null_orders))?;
        Ok(self.with_same_names(sorted))
    }

    /// Groups `value_columns` by `key_columns` and runs the requested aggregations.
    ///
    /// Each `(col_idx, kind)` in `agg_requests` aggregates `value_columns[col_idx]`.
    /// The result's columns are named `key_names` followed by `agg_names`. With
    /// `maintain_order`, groups appear in the order of their first row in the
    /// input; otherwise the group order is whatever cudf produces.
    ///
    /// # Errors
    /// Any cudf error. Also `ComputeError` if the number of names does not match
    /// the number of result columns.
    pub fn groupby(
        &self,
        key_columns: Vec<GpuColumn>,
        key_names: Vec<String>,
        value_columns: Vec<GpuColumn>,
        agg_requests: Vec<(usize, AggregationKind)>,
        agg_names: Vec<String>,
        maintain_order: bool,
    ) -> PolarsResult<Self> {
        let keys_table = gpu_result(GpuTable::new(key_columns))?;
        let table = if maintain_order {
            Self::groupby_first_seen_order(&keys_table, value_columns, agg_requests)?
        } else {
            let values_table = gpu_result(GpuTable::new(value_columns))?;
            let mut gb = GroupBy::new(&keys_table);
            for (col_idx, kind) in agg_requests {
                gb = gb.agg(col_idx, kind);
            }
            gpu_result(gb.execute(&values_table))?
        };
        let mut names = key_names;
        names.extend(agg_names);
        // Without this, a names/width mismatch would silently mislabel columns.
        Self::check_names(table.num_columns(), &names)?;
        Ok(Self { table, names })
    }

    /// Runs the aggregations with groups ordered by their first row in the input.
    ///
    /// A row-index column is appended to the values and aggregated with `Min`,
    /// which gives each group the index of its first row. The result is sorted on
    /// that column, and the column is then dropped.
    fn groupby_first_seen_order(
        keys_table: &GpuTable,
        value_columns: Vec<GpuColumn>,
        agg_requests: Vec<(usize, AggregationKind)>,
    ) -> PolarsResult<GpuTable> {
        let height = keys_table.num_rows();
        let row_idx = gpu_result(cudf::filling::sequence_i64(height, 0, 1))?;
        // Appended after every caller column, so the caller's indices stay valid.
        let row_idx_col_idx = value_columns.len();
        let mut all_value_cols = value_columns;
        all_value_cols.push(row_idx);
        let values_with_idx = gpu_result(GpuTable::new(all_value_cols))?;

        let mut gb = GroupBy::new(keys_table);
        for (col_idx, kind) in agg_requests {
            gb = gb.agg(col_idx, kind);
        }
        // Requested last, so its output is the last result column.
        gb = gb.agg(row_idx_col_idx, AggregationKind::Min);
        let grouped = gpu_result(gb.execute(&values_with_idx))?;

        // At least the `Min` column exists, so this cannot underflow.
        let first_row_col = grouped.num_columns() - 1;
        let first_rows = gpu_result(grouped.column(first_row_col))?;
        let sort_keys = gpu_result(GpuTable::new(vec![first_rows]))?;
        let sorted = gpu_result(grouped.sort_by_key(
            &sort_keys,
            &[SortOrder::Ascending],
            &[NullOrder::After],
        ))?;
        let kept = (0..first_row_col)
            .map(|i| gpu_result(sorted.column(i)))
            .collect::<PolarsResult<Vec<_>>>()?;
        gpu_result(GpuTable::new(kept))
    }

    /// Drops duplicate rows, comparing only the `subset` columns (all columns when
    /// `None`).
    ///
    /// Nulls compare equal to each other (`NullEquality::Equal`). With
    /// `maintain_order` the table's `stable_distinct` is used, presumably keeping
    /// the input row order — confirm against the cudf binding.
    ///
    /// # Errors
    /// `ColumnNotFound` for an unknown subset name, or a cudf error.
    pub fn distinct(
        &self,
        subset: Option<&[&str]>,
        keep: DuplicateKeepOption,
        maintain_order: bool,
    ) -> PolarsResult<Self> {
        let key_indices: Vec<usize> = match subset {
            Some(cols) => cols
                .iter()
                .map(|&name| self.column_index(name))
                .collect::<PolarsResult<_>>()?,
            None => (0..self.width()).collect(),
        };
        let result = if maintain_order {
            self.table
                .stable_distinct(&key_indices, keep, NullEquality::Equal)
        } else {
            self.table.distinct(&key_indices, keep, NullEquality::Equal)
        };
        Ok(self.with_same_names(gpu_result(result)?))
    }

    /// Borrows the underlying cudf table.
    pub fn inner_table(&self) -> &GpuTable {
        &self.table
    }

    /// Consumes the frame, returning its columns and their names.
    pub fn into_parts(self) -> PolarsResult<(Vec<GpuColumn>, Vec<String>)> {
        let cols = gpu_result(self.table.into_columns())?;
        Ok((cols, self.names))
    }

    /// Consumes the frame and converts it to a polars [`DataFrame`].
    pub fn to_polars(self) -> PolarsResult<DataFrame> {
        convert::gpu_to_dataframe(self.table, &self.names)
    }

    /// Converts a polars [`DataFrame`] into a GPU frame.
    pub fn from_polars(df: &DataFrame) -> PolarsResult<Self> {
        let (table, names) = convert::dataframe_to_gpu(df)?;
        Ok(Self { table, names })
    }

    /// Wraps `table`, which has the same column layout as `self`, and reuses
    /// `self`'s names.
    fn with_same_names(&self, table: GpuTable) -> Self {
        Self {
            table,
            names: self.names.clone(),
        }
    }

    /// Fails with `ComputeError` unless `names` has exactly `width` entries.
    fn check_names(width: usize, names: &[String]) -> PolarsResult<()> {
        if width != names.len() {
            return Err(polars_err!(ComputeError:
                "GpuDataFrame: {} columns but {} names", width, names.len()));
        }
        Ok(())
    }
}