use crate::frame::select::Selection;
use crate::prelude::*;
use arrow::datatypes::{Field, Schema};
use arrow::record_batch::RecordBatch;
use itertools::Itertools;
use rayon::prelude::*;
use std::marker::Sized;
use std::mem;
use std::sync::Arc;
pub mod explode;
pub mod group_by;
pub mod hash_join;
pub mod select;
pub mod ser;
mod upstream_traits;
/// Conversion of a value into a [`Series`].
///
/// Implemented below for `Series` itself and for every `ChunkedArray<T>`, which lets
/// constructors such as `DataFrame::new` accept either kind of column.
pub trait IntoSeries {
    /// Consume `self` and return it as a `Series`.
    fn into_series(self) -> Series
    where
        Self: Sized;
}
/// A `Series` is already a `Series`; the conversion is the identity.
impl IntoSeries for Series {
    fn into_series(self) -> Self {
        self
    }
}
/// Any typed `ChunkedArray` can be wrapped into a type-erased `Series`.
impl<T: PolarsDataType> IntoSeries for ChunkedArray<T> {
    // Delegates to `Series::from_chunked_array`, which takes ownership of the array.
    fn into_series(self) -> Series {
        Series::from_chunked_array(self)
    }
}
impl Default for DataFrame {
fn default() -> Self {
DataFrame::new_no_checks(Vec::with_capacity(0))
}
}
/// Reference-counted Arrow schema.
type DfSchema = Arc<Schema>;
/// The column type stored in a `DataFrame`.
type DfSeries = Series;
/// The ordered list of columns that make up a `DataFrame`.
type DfColumns = Vec<Series>;
/// A two-dimensional table made of named columns (`Series`).
///
/// `DataFrame::new` checks that all columns have the same length; the crate-internal
/// `new_no_checks` constructor trusts the caller to uphold that.
#[derive(Clone)]
pub struct DataFrame {
    // The columns, in column order.
    columns: DfColumns,
    // Flag set through `with_parallel` (only available with the `parallel` feature).
    parallel: bool,
}
impl DataFrame {
fn name_to_idx(&self, name: &str) -> Result<usize> {
let mut idx = 0;
for column in &self.columns {
if column.name() == name {
break;
}
idx += 1;
}
if idx == self.columns.len() {
Err(PolarsError::NotFound(name.into()))
} else {
Ok(idx)
}
}
pub fn new<S: IntoSeries>(columns: Vec<S>) -> Result<Self> {
let mut first_len = None;
let mut series_cols = Vec::with_capacity(columns.len());
for s in columns {
let series = s.into_series();
match first_len {
Some(len) => {
if series.len() != len {
return Err(PolarsError::ShapeMisMatch("Could not create a new DataFrame from Series. The Series have different lengths".into()));
}
}
None => first_len = Some(series.len()),
}
series_cols.push(series)
}
let mut df = DataFrame {
columns: series_cols,
parallel: false,
};
df.rechunk()?;
Ok(df)
}
#[cfg(feature = "parallel")]
pub fn with_parallel(&mut self, parallel: bool) -> &mut Self {
self.parallel = parallel;
self
}
pub(crate) fn new_no_checks(columns: Vec<Series>) -> DataFrame {
DataFrame {
columns,
parallel: false,
}
}
fn rechunk(&mut self) -> Result<&mut Self> {
let mut chunk_lens = Vec::with_capacity(self.columns.len());
let mut all_equal = true;
for series in &self.columns {
let current_len = series.len();
if chunk_lens.len() > 1 {
if current_len != chunk_lens[0] {
all_equal = false
}
}
chunk_lens.push(series.len())
}
if all_equal {
Ok(self)
} else {
let argmin = chunk_lens
.iter()
.position_min()
.ok_or(PolarsError::NoData("No data in rechunk operation".into()))?;
let min_chunks = chunk_lens[argmin];
let to_rechunk = chunk_lens
.into_iter()
.enumerate()
.filter_map(|(idx, len)| if len > min_chunks { Some(idx) } else { None })
.collect::<Vec<_>>();
let chunk_id = self.columns[argmin].chunk_lengths().clone();
for idx in to_rechunk {
let col = &self.columns[idx];
let new_col = col.rechunk(Some(&chunk_id))?;
self.columns[idx] = new_col;
}
Ok(self)
}
}
pub fn schema(&self) -> Schema {
let fields = Self::create_fields(&self.columns);
Schema::new(fields)
}
#[inline]
pub fn get_columns(&self) -> &DfColumns {
&self.columns
}
pub fn columns(&self) -> Vec<&str> {
self.columns.iter().map(|s| s.name()).collect()
}
pub fn dtypes(&self) -> Vec<ArrowDataType> {
self.columns.iter().map(|s| s.dtype().clone()).collect()
}
pub fn n_chunks(&self) -> Result<usize> {
Ok(self
.columns
.get(0)
.ok_or(PolarsError::NoData(
"Can not determine number of chunks if there is no data".into(),
))?
.chunks()
.len())
}
fn create_fields(columns: &DfColumns) -> Vec<Field> {
columns.iter().map(|s| s.field().clone()).collect()
}
fn register_mutation(&mut self) -> Result<()> {
self.rechunk()?;
Ok(())
}
pub fn fields(&self) -> Vec<Field> {
self.columns.iter().map(|s| s.field().clone()).collect()
}
pub fn shape(&self) -> (usize, usize) {
let columns = self.columns.len();
if columns > 0 {
(self.columns[0].len(), columns)
} else {
(0, 0)
}
}
pub fn width(&self) -> usize {
self.columns.len()
}
pub fn height(&self) -> usize {
self.shape().0
}
pub fn hstack(&mut self, columns: &[DfSeries]) -> Result<&mut Self> {
let height = self.height();
for col in columns {
if col.len() != height {
return Err(PolarsError::ShapeMisMatch(
format!("Could not horizontally stack Series. The Series length {} differs from the DataFrame height: {}", col.len(), height).into()));
} else {
self.columns.push(col.clone());
}
}
self.register_mutation()?;
Ok(self)
}
pub fn vstack(&mut self, df: &DataFrame) -> Result<&mut Self> {
if self.width() != df.width() {
return Err(PolarsError::ShapeMisMatch(
format!("Could not vertically stack DataFrame. The DataFrames appended width {} differs from the parent DataFrames width {}", self.width(), df.width()).into()
));
}
if self.dtypes() != df.dtypes() {
return Err(PolarsError::DataTypeMisMatch(
format!(
"cannot vstack: data types don't match of {:?} {:?}",
self.head(Some(2)),
df.head(Some(2))
)
.into(),
));
}
self.columns
.iter_mut()
.zip(df.columns.iter())
.for_each(|(left, right)| {
left.append(right).expect("should not fail");
});
self.register_mutation()?;
Ok(self)
}
pub fn drop_in_place(&mut self, name: &str) -> Result<DfSeries> {
let idx = self.name_to_idx(name)?;
let result = Ok(self.columns.remove(idx));
self.register_mutation()?;
result
}
pub fn drop(&self, name: &str) -> Result<Self> {
let idx = self.name_to_idx(name)?;
let mut new_cols = Vec::with_capacity(self.columns.len() - 1);
self.columns.iter().enumerate().for_each(|(i, s)| {
if i != idx {
new_cols.push(s.clone())
}
});
Ok(DataFrame::new_no_checks(new_cols))
}
pub fn insert_at_idx<S: IntoSeries>(&mut self, index: usize, column: S) -> Result<&mut Self> {
let series = column.into_series();
if series.len() == self.height() {
self.columns.insert(index, series);
Ok(self)
} else {
Err(PolarsError::ShapeMisMatch(
format!(
"Could add column. The Series length {} differs from the DataFrame height: {}",
series.len(),
self.height()
)
.into(),
))
}
}
pub fn add_column<S: IntoSeries>(&mut self, column: S) -> Result<&mut Self> {
let series = column.into_series();
if series.len() == self.height() {
self.columns.push(series);
Ok(self)
} else {
Err(PolarsError::ShapeMisMatch(
format!(
"Could add column. The Series length {} differs from the DataFrame height: {}",
series.len(),
self.height()
)
.into(),
))
}
}
pub fn with_column<S: IntoSeries>(&self, column: S) -> Result<Self> {
let mut df = self.clone();
df.add_column(column)?;
Ok(df)
}
pub fn get(&self, idx: usize) -> Option<Vec<AnyType>> {
match self.columns.get(0) {
Some(s) => {
if s.len() <= idx {
return None;
}
}
None => return None,
}
Some(self.columns.iter().map(|s| s.get(idx)).collect())
}
pub fn select_at_idx(&self, idx: usize) -> Option<&Series> {
self.columns.get(idx)
}
fn select_idx_mut(&mut self, idx: usize) -> Option<&mut Series> {
self.columns.get_mut(idx)
}
pub fn find_idx_by_name(&self, name: &str) -> Option<usize> {
self.columns
.iter()
.enumerate()
.filter(|(_idx, series)| series.name() == name)
.map(|(idx, _)| idx)
.next()
}
pub fn column(&self, name: &str) -> Result<&Series> {
let idx = self
.find_idx_by_name(name)
.ok_or(PolarsError::NotFound(name.into()))?;
Ok(self.select_at_idx(idx).unwrap())
}
pub fn select<'a, S, J>(&self, selection: S) -> Result<Self>
where
S: Selection<'a, J>,
{
let selected = self.select_series(selection)?;
let df = DataFrame::new_no_checks(selected);
Ok(df)
}
pub fn select_series<'a, S, J>(&self, selection: S) -> Result<Vec<Series>>
where
S: Selection<'a, J>,
{
let cols = selection.to_selection_vec();
let selected = cols
.iter()
.map(|c| self.column(c).map(|s| s.clone()))
.collect::<Result<Vec<_>>>()?;
Ok(selected)
}
fn select_mut(&mut self, name: &str) -> Option<&mut Series> {
let opt_idx = self.find_idx_by_name(name);
match opt_idx {
Some(idx) => self.select_idx_mut(idx),
None => None,
}
}
pub fn filter(&self, mask: &BooleanChunked) -> Result<Self> {
let new_col = self
.columns
.par_iter()
.map(|col| col.filter(mask))
.collect::<Result<Vec<_>>>()?;
Ok(DataFrame::new_no_checks(new_col))
}
pub fn take_iter<I>(&self, iter: I, capacity: Option<usize>) -> Result<Self>
where
I: Iterator<Item = usize> + Clone + Sync,
{
let new_col = self
.columns
.par_iter()
.map(|s| {
let mut i = iter.clone();
s.take_iter(&mut i, capacity)
})
.collect::<Result<Vec<_>>>()?;
Ok(DataFrame::new_no_checks(new_col))
}
pub unsafe fn take_iter_unchecked<I>(&self, iter: I, capacity: Option<usize>) -> Self
where
I: Iterator<Item = usize> + Clone + Sync,
{
let new_col = self
.columns
.par_iter()
.map(|s| {
let mut i = iter.clone();
s.take_iter_unchecked(&mut i, capacity)
})
.collect::<Vec<_>>();
DataFrame::new_no_checks(new_col)
}
pub fn take_opt_iter<I>(&self, iter: I, capacity: Option<usize>) -> Result<Self>
where
I: Iterator<Item = Option<usize>> + Clone + Sync,
{
let new_col = self
.columns
.par_iter()
.map(|s| {
let mut i = iter.clone();
s.take_opt_iter(&mut i, capacity)
})
.collect::<Result<Vec<_>>>()?;
Ok(DataFrame::new_no_checks(new_col))
}
pub unsafe fn take_opt_iter_unchecked<I>(&self, iter: I, capacity: Option<usize>) -> Self
where
I: Iterator<Item = Option<usize>> + Clone + Sync,
{
let new_col = self
.columns
.par_iter()
.map(|s| {
let mut i = iter.clone();
s.take_opt_iter_unchecked(&mut i, capacity)
})
.collect::<Vec<_>>();
DataFrame::new_no_checks(new_col)
}
pub fn take<T: AsTakeIndex + Sync>(&self, indices: &T) -> Result<Self> {
let new_col = self
.columns
.par_iter()
.map(|s| s.take(indices))
.collect::<Result<Vec<_>>>()?;
Ok(DataFrame::new_no_checks(new_col))
}
pub fn rename(&mut self, column: &str, name: &str) -> Result<&mut Self> {
self.select_mut(column)
.ok_or(PolarsError::NotFound(name.to_string()))
.map(|s| s.rename(name))?;
Ok(self)
}
pub fn sort_in_place(&mut self, by_column: &str, reverse: bool) -> Result<&mut Self> {
let s = self.column(by_column)?;
let take = s.argsort(reverse);
self.columns = self
.columns
.par_iter()
.map(|s| s.take(&take))
.collect::<Result<Vec<_>>>()?;
Ok(self)
}
pub fn sort(&self, by_column: &str, reverse: bool) -> Result<Self> {
let s = self.column(by_column)?;
let take = s.argsort(reverse);
self.take(&take)
}
pub fn replace<S: IntoSeries>(&mut self, column: &str, new_col: S) -> Result<&mut Self> {
self.apply(column, |_| new_col.into_series())
}
pub fn replace_at_idx<S: IntoSeries>(&mut self, idx: usize, new_col: S) -> Result<&mut Self> {
let mut new_column = new_col.into_series();
if new_column.len() != self.height() {
return Err(PolarsError::ShapeMisMatch(
format!("Cannot replace Series at index {}. The shape of Series {} does not match that of the DataFrame {}",
idx, new_column.len(), self.height()
).into()));
};
if idx >= self.width() {
return Err(PolarsError::OutOfBounds(
format!(
"Column index: {} outside of DataFrame with {} columns",
idx,
self.width()
)
.into(),
));
}
let old_col = &mut self.columns[idx];
mem::swap(old_col, &mut new_column);
Ok(self)
}
pub fn apply<F, S>(&mut self, column: &str, f: F) -> Result<&mut Self>
where
F: FnOnce(&Series) -> S,
S: IntoSeries,
{
let idx = self
.find_idx_by_name(column)
.ok_or(PolarsError::NotFound(column.to_string()))?;
self.apply_at_idx(idx, f)
}
pub fn apply_at_idx<F, S>(&mut self, idx: usize, f: F) -> Result<&mut Self>
where
F: FnOnce(&Series) -> S,
S: IntoSeries,
{
let width = self.width();
let col = self.columns.get_mut(idx).ok_or(PolarsError::OutOfBounds(
format!(
"Column index: {} outside of DataFrame with {} columns",
idx, width
)
.into(),
))?;
let name = col.name().to_string();
let _ = mem::replace(col, f(col).into_series());
unsafe {
let col = self.columns.get_unchecked_mut(idx);
col.rename(&name);
}
self.register_mutation()?;
Ok(self)
}
pub fn may_apply_at_idx<F, S>(&mut self, idx: usize, f: F) -> Result<&mut Self>
where
F: FnOnce(&Series) -> Result<S>,
S: IntoSeries,
{
let width = self.width();
let col = self.columns.get_mut(idx).ok_or(PolarsError::OutOfBounds(
format!(
"Column index: {} outside of DataFrame with {} columns",
idx, width
)
.into(),
))?;
let name = col.name().to_string();
let _ = mem::replace(col, f(col).map(|s| s.into_series())?);
unsafe {
let col = self.columns.get_unchecked_mut(idx);
col.rename(&name);
}
self.register_mutation()?;
Ok(self)
}
pub fn may_apply<F, S>(&mut self, column: &str, f: F) -> Result<&mut Self>
where
F: FnOnce(&Series) -> Result<S>,
S: IntoSeries,
{
let idx = self
.find_idx_by_name(column)
.ok_or(PolarsError::NotFound(column.to_string()))?;
self.may_apply_at_idx(idx, f)
}
pub fn slice(&self, offset: usize, length: usize) -> Result<Self> {
let col = self
.columns
.par_iter()
.map(|s| s.slice(offset, length))
.collect::<Result<Vec<_>>>()?;
Ok(DataFrame::new_no_checks(col))
}
pub fn head(&self, length: Option<usize>) -> Self {
let col = self
.columns
.iter()
.map(|s| s.head(length))
.collect::<Vec<_>>();
DataFrame::new_no_checks(col)
}
pub fn tail(&self, length: Option<usize>) -> Self {
let col = self
.columns
.iter()
.map(|s| s.tail(length))
.collect::<Vec<_>>();
DataFrame::new_no_checks(col)
}
pub fn as_record_batches(&self) -> Result<Vec<RecordBatch>> {
let n_chunks = self.n_chunks()?;
let width = self.width();
let schema = Arc::new(self.schema());
let mut record_batches = Vec::with_capacity(n_chunks);
for i in 0..n_chunks {
let mut rb_cols = Vec::with_capacity(width);
for col in &self.columns {
rb_cols.push(Arc::clone(&col.chunks()[i]))
}
let rb = RecordBatch::try_new(Arc::clone(&schema), rb_cols)?;
record_batches.push(rb)
}
Ok(record_batches)
}
pub fn iter_record_batches(
&mut self,
buffer_size: usize,
) -> impl Iterator<Item = RecordBatch> + '_ {
match self.n_chunks() {
Ok(1) => {}
Ok(_) => {
self.columns = self
.columns
.iter()
.map(|s| s.rechunk(None).unwrap())
.collect();
}
Err(_) => {}
}
RecordBatchIter {
columns: &self.columns,
schema: Arc::new(self.schema()),
buffer_size,
idx: 0,
len: self.height(),
}
}
pub fn reverse(&self) -> Self {
let col = self.columns.iter().map(|s| s.reverse()).collect::<Vec<_>>();
DataFrame::new_no_checks(col)
}
pub fn shift(&self, periods: i32) -> Result<Self> {
let col = self
.columns
.iter()
.map(|s| s.shift(periods))
.collect::<Result<Vec<_>>>()?;
Ok(DataFrame::new_no_checks(col))
}
pub fn fill_none(&self, strategy: FillNoneStrategy) -> Result<Self> {
let col = self
.columns
.iter()
.map(|s| s.fill_none(strategy))
.collect::<Result<Vec<_>>>()?;
Ok(DataFrame::new_no_checks(col))
}
pub fn max(&self) -> Self {
let columns = self.columns.par_iter().map(|s| s.max_as_series()).collect();
DataFrame::new_no_checks(columns)
}
pub fn min(&self) -> Self {
let columns = self.columns.par_iter().map(|s| s.min_as_series()).collect();
DataFrame::new_no_checks(columns)
}
pub fn sum(&self) -> Self {
let columns = self.columns.par_iter().map(|s| s.sum_as_series()).collect();
DataFrame::new_no_checks(columns)
}
pub fn mean(&self) -> Self {
let columns = self
.columns
.par_iter()
.map(|s| s.mean_as_series())
.collect();
DataFrame::new_no_checks(columns)
}
pub fn median(&self) -> Self {
let columns = self
.columns
.par_iter()
.map(|s| s.median_as_series())
.collect();
DataFrame::new_no_checks(columns)
}
pub fn quantile(&self, quantile: f64) -> Result<Self> {
let columns = self
.columns
.par_iter()
.map(|s| s.quantile_as_series(quantile))
.collect::<Result<Vec<_>>>()?;
Ok(DataFrame::new_no_checks(columns))
}
pub fn pipe<F, B>(self, f: F) -> Result<B>
where
F: Fn(DataFrame) -> Result<B>,
{
f(self)
}
pub fn pipe_mut<F, B>(&mut self, f: F) -> Result<B>
where
F: Fn(&mut DataFrame) -> Result<B>,
{
f(self)
}
pub fn pipe_with_args<F, B, Args>(self, f: F, args: Args) -> Result<B>
where
F: Fn(DataFrame, Args) -> Result<B>,
{
f(self, args)
}
}
/// Iterator over a frame's columns that yields Arrow `RecordBatch`es of at most
/// `buffer_size` rows each. Created by `DataFrame::iter_record_batches`.
pub struct RecordBatchIter<'a> {
    // Borrowed columns; a slice rather than `&Vec` since only read access is needed.
    columns: &'a [Series],
    // Schema shared by every produced batch.
    schema: Arc<Schema>,
    // Requested maximum number of rows per batch.
    buffer_size: usize,
    // Row offset of the next batch.
    idx: usize,
    // Total number of rows.
    len: usize,
}
impl<'a> Iterator for RecordBatchIter<'a> {
    type Item = RecordBatch;

    /// Slice the next `buffer_size` rows (fewer for the final batch) out of every column
    /// and wrap them in a `RecordBatch`.
    ///
    /// # Panics
    /// Panics if slicing a column or building the `RecordBatch` fails; neither is expected
    /// for in-bounds slices of equally long columns.
    fn next(&mut self) -> Option<Self::Item> {
        if self.idx >= self.len {
            return None;
        }
        // `min` against the remaining rows avoids the `idx + buffer_size` overflow, and a
        // zero buffer size is treated as one row so the iterator always makes progress
        // (otherwise it would yield empty batches forever).
        let length = self.buffer_size.max(1).min(self.len - self.idx);
        let offset = self.idx;
        let rb_cols: Vec<_> = self
            .columns
            .iter()
            .map(|s| {
                let slice = s
                    .slice(offset, length)
                    .expect("slice range lies within the column");
                Arc::clone(&slice.chunks()[0])
            })
            .collect();
        let rb = RecordBatch::try_new(Arc::clone(&self.schema), rb_cols)
            .expect("columns match the schema and have equal lengths");
        self.idx += length;
        Some(rb)
    }
}
#[cfg(test)]
mod test {
    use crate::prelude::*;

    /// Two-column fixture: `days` = [0, 1, 2], `temp` = [22.1, 19.9, 7.0].
    fn create_frame() -> DataFrame {
        let s0 = Series::new("days", [0, 1, 2].as_ref());
        let s1 = Series::new("temp", [22.1, 19.9, 7.].as_ref());
        DataFrame::new(vec![s0, s1]).unwrap()
    }

    #[test]
    fn test_select() {
        let df = create_frame();
        assert_eq!(df.column("days").unwrap().eq(1).sum(), Some(1));
    }

    #[test]
    fn test_filter() {
        let df = create_frame();
        let filtered = df.filter(&df.column("days").unwrap().eq(0)).unwrap();
        // Exactly one row has days == 0; both columns are kept.
        assert_eq!(filtered.shape(), (1, 2));
        assert_eq!(filtered.column("days").unwrap().eq(0).sum(), Some(1));
    }

    #[test]
    fn test_sort() {
        let mut df = create_frame();
        df.sort_in_place("temp", false).unwrap();
        // Ascending by temp: the smallest temperature (7.0) belongs to days == 2.
        let first = df.head(Some(1));
        assert_eq!(first.column("days").unwrap().eq(2).sum(), Some(1));
        assert_eq!(df.shape(), (3, 2));
    }

    #[test]
    fn slice() {
        let df = create_frame();
        let sliced_df = df.slice(0, 2).expect("slice");
        assert_eq!(sliced_df.shape(), (2, 2));
    }
}