use crate::{DataFrame, DataValue, JoinRelation, Key};
use data_value::Extract as _;
use numpy::PyArray2;
use pyo3::{exceptions::PyTypeError, prelude::*, types::PyList};
use tracing::trace;
impl DataFrame {
    /// Selects the requested columns as a two-dimensional array for the NumPy exporters.
    ///
    /// * `keys` - names of the columns to select; every key of the frame is used when `None`.
    /// * `transposed` - `Some(true)` returns the result of `select_transposed`;
    ///   `None` or `Some(false)` returns the result of `select`.
    ///
    /// # Errors
    /// Forwards the error reported by the underlying selection unchanged.
    fn select_data(
        &self,
        keys: Option<Vec<String>>,
        transposed: Option<bool>,
    ) -> Result<ndarray::Array2<DataValue>, crate::error::Error> {
        // Only build the full key list when the caller did not supply one.
        let keys = keys
            .unwrap_or_else(|| self.keys())
            .into_iter()
            .map(Key::from)
            .collect::<Vec<Key>>();
        // The flag must pick the selection it names: transposed => `select_transposed`.
        if transposed.unwrap_or(false) {
            self.select_transposed(Some(keys.as_slice()))
        } else {
            self.select(Some(keys.as_slice()))
        }
    }
}
#[pymethods]
impl DataFrame {
#[new]
pub fn init() -> Self {
Self::default()
}
/// Returns the name of every key reported by the underlying data frame,
/// in the order the frame yields them.
pub fn keys(&self) -> Vec<String> {
    let mut names = Vec::new();
    for key in self.dataframe.keys().iter() {
        names.push(key.name().to_string());
    }
    names
}
#[pyo3(signature = (keys=None, transposed=None))]
pub fn as_numpy_u32<'py>(
&self,
keys: Option<Vec<String>>,
transposed: Option<bool>,
py: Python<'py>,
) -> PyResult<Bound<'py, numpy::PyArray2<u32>>> {
let data = self
.select_data(keys, transposed)
.map_err(|e| PyErr::new::<PyTypeError, _>(format!("Cannot select data: {e}")))?;
Ok(PyArray2::from_array(py, &data.mapv(|x| u32::extract(&x))))
}
#[pyo3(signature = (keys=None, transposed=None))]
pub fn as_numpy_u64<'py>(
&self,
keys: Option<Vec<String>>,
transposed: Option<bool>,
py: Python<'py>,
) -> PyResult<Bound<'py, numpy::PyArray2<u64>>> {
let data = self
.select_data(keys, transposed)
.map_err(|e| PyErr::new::<PyTypeError, _>(format!("Cannot select data: {e}")))?;
Ok(PyArray2::from_array(py, &data.mapv(|x| u64::extract(&x))))
}
#[pyo3(signature = (keys=None, transposed=None))]
pub fn as_numpy_i32<'py>(
&self,
keys: Option<Vec<String>>,
transposed: Option<bool>,
py: Python<'py>,
) -> PyResult<Bound<'py, numpy::PyArray2<i32>>> {
let data = self
.select_data(keys, transposed)
.map_err(|e| PyErr::new::<PyTypeError, _>(format!("Cannot select data: {e}")))?;
Ok(PyArray2::from_array(py, &data.mapv(|x| i32::extract(&x))))
}
#[pyo3(signature = (keys=None, transposed=None))]
pub fn as_numpy_i64<'py>(
&self,
keys: Option<Vec<String>>,
transposed: Option<bool>,
py: Python<'py>,
) -> PyResult<Bound<'py, numpy::PyArray2<i64>>> {
let data = self
.select_data(keys, transposed)
.map_err(|e| PyErr::new::<PyTypeError, _>(format!("Cannot select data: {e}")))?;
Ok(PyArray2::from_array(py, &data.mapv(|x| i64::extract(&x))))
}
#[pyo3(signature = (keys=None, transposed=None))]
pub fn as_numpy_f32<'py>(
&self,
keys: Option<Vec<String>>,
transposed: Option<bool>,
py: Python<'py>,
) -> PyResult<Bound<'py, numpy::PyArray2<f32>>> {
let data = self
.select_data(keys, transposed)
.map_err(|e| PyErr::new::<PyTypeError, _>(format!("Cannot select data: {e}")))?;
Ok(PyArray2::from_array(py, &data.mapv(|x| f32::extract(&x))))
}
#[pyo3(signature = (keys=None, transposed=None))]
pub fn as_numpy_f64<'py>(
&self,
keys: Option<Vec<String>>,
transposed: Option<bool>,
py: Python<'py>,
) -> PyResult<Bound<'py, numpy::PyArray2<f64>>> {
let data = self
.select_data(keys, transposed)
.map_err(|e| PyErr::new::<PyTypeError, _>(format!("Cannot select data: {e}")))?;
Ok(PyArray2::from_array(py, &data.mapv(|x| f64::extract(&x))))
}
/// Exposed to Python as `shrink`: delegates to `shrink()` on the underlying data frame.
#[pyo3(name = "shrink")]
pub fn py_shrink(&mut self) {
    let frame = &mut self.dataframe;
    frame.shrink();
}
/// Exposed to Python as `add_metadata`: inserts `value` under `key` into the frame's metadata.
#[pyo3(name = "add_metadata")]
pub fn py_add_metadata(&mut self, key: String, value: DataValue) {
    let metadata = &mut self.metadata;
    metadata.insert(key, value);
}
/// Exposed to Python as `get_metadata`: returns a copy of the metadata value stored
/// under `key`, or `None` when there is no such entry.
#[pyo3(name = "get_metadata")]
pub fn py_get_metadata(&self, key: &str) -> Option<DataValue> {
    let stored = self.metadata.get(key)?;
    Some(stored.clone())
}
#[pyo3(name = "rename_key")]
pub fn py_rename_key(&mut self, key: &str, new_name: &str) -> Result<(), PyErr> {
self.dataframe
.rename_key(key, new_name.into())
.map_err(|e| PyErr::new::<PyTypeError, _>(format!("{e}")))
}
#[pyo3(name = "add_alias")]
pub fn py_add_alias(&mut self, key: &str, new_name: &str) -> Result<(), PyErr> {
self.dataframe
.add_alias(key, new_name)
.map_err(|e| PyErr::new::<PyTypeError, _>(format!("{e}")))
}
/// Exposed to Python as `select`: returns the selection as a list of lists,
/// one inner list per row of the array produced by `select`.
///
/// `keys` names the columns to select; every key of the frame is used when omitted.
///
/// Raises `TypeError` (`Cannot select data: ...`) when the selection fails, and
/// forwards any Python error raised while building the result lists.
#[pyo3(name = "select", signature = (keys=None))]
pub fn py_select<'py>(
    &self,
    py: Python<'py>,
    keys: Option<Vec<String>>,
) -> Result<Bound<'py, PyList>, PyErr> {
    // Only build the full key list when the caller did not supply one.
    let keys = keys
        .unwrap_or_else(|| self.keys())
        .into_iter()
        .map(Key::from)
        .collect::<Vec<Key>>();
    let selected = self
        .select(Some(keys.as_slice()))
        .map_err(|e| PyErr::new::<PyTypeError, _>(format!("Cannot select data: {e}")))?;
    let list = PyList::empty(py);
    for rows in selected.rows() {
        let row = PyList::empty(py);
        for value in rows.iter() {
            // Propagate append/conversion failures as Python exceptions rather than
            // panicking, matching `py_select_transposed`.
            row.append(value.clone())?;
        }
        list.append(row)?;
    }
    Ok(list)
}
/// Exposed to Python as `select_transposed`: returns the selection as a list of lists,
/// one inner list per row of the array produced by `select_transposed`.
///
/// `keys` names the columns to select; every key of the frame is used when omitted.
///
/// Raises `TypeError` (`Cannot select data: ...`) when the selection fails, and
/// forwards any Python error raised while building the result lists.
#[pyo3(name = "select_transposed", signature = (keys=None))]
pub fn py_select_transposed<'py>(
    &self,
    py: Python<'py>,
    keys: Option<Vec<String>>,
) -> Result<Bound<'py, PyList>, PyErr> {
    // Only build the full key list when the caller did not supply one.
    let keys = keys
        .unwrap_or_else(|| self.keys())
        .into_iter()
        .map(Key::from)
        .collect::<Vec<Key>>();
    let selected = self
        .select_transposed(Some(keys.as_slice()))
        .map_err(|e| PyErr::new::<PyTypeError, _>(format!("Cannot select data: {e}")))?;
    let list = PyList::empty(py);
    for rows in selected.rows() {
        let row = PyList::empty(py);
        for value in rows.iter() {
            row.append(value.clone())?;
        }
        list.append(row)?;
    }
    Ok(list)
}
#[pyo3(name = "select_column")]
pub fn py_select_column<'py>(
&self,
py: Python<'py>,
key: String,
) -> Result<Bound<'py, PyList>, PyErr> {
let selected = self
.select_column(Key::from(key))
.ok_or_else(|| PyErr::new::<PyTypeError, _>("Cannot select column"))?;
let list = PyList::empty(py);
for x in selected.to_vec().into_iter() {
list.append(x)?;
}
Ok(list)
}
#[pyo3(name = "join")]
pub fn py_join(&mut self, other: DataFrame, join_type: JoinRelation) -> Result<(), PyErr> {
self.dataframe
.join(other.dataframe, &join_type)
.map_err(|e| PyErr::new::<PyTypeError, _>(format!("Cannot join data: {e}")))?;
Ok(())
}
pub fn add_constant(&mut self, key: Key, feature: DataValue) -> Result<(), PyErr> {
self.constants.insert(key, feature);
Ok(())
}
/// Python `repr()`: the frame rendered through its `to_string` representation.
fn __repr__(&self) -> String {
    ToString::to_string(self)
}
/// Python `str()`: the frame rendered through its `Display` formatting.
fn __str__(&self) -> String {
    format!("{self}")
}
pub fn __iadd__(&mut self, object: Bound<'_, Self>) -> Result<(), PyErr> {
trace!("{object:?}");
let obj: Self = object.extract()?;
self.dataframe += obj.dataframe;
Ok(())
}
pub fn __isub__(&mut self, object: Bound<'_, Self>) -> Result<(), PyErr> {
trace!("{object:?}");
let obj: Self = object.extract()?;
self.dataframe -= obj.dataframe;
Ok(())
}
pub fn __imul__(&mut self, object: Bound<'_, Self>) -> Result<(), PyErr> {
trace!("{object:?}");
let obj: Self = object.extract()?;
self.dataframe *= obj.dataframe;
Ok(())
}
pub fn __itruediv__(&mut self, object: Bound<'_, Self>) -> Result<(), PyErr> {
trace!("{object:?}");
let obj: Self = object.extract()?;
self.dataframe /= obj.dataframe;
Ok(())
}
/// Backs Python's `len()`: returns the length reported by the underlying data frame.
///
/// Takes `&self` because reading the length does not modify the frame, so PyO3 only
/// needs a shared borrow of the object for this call.
pub fn __len__(&self) -> Result<usize, PyErr> {
    Ok(self.dataframe.len())
}
}
#[cfg(test)]
mod test {
use super::*;
use data_value::DataValue;
use halfbrown::hashmap;
use pyo3::ffi::c_str;
use rstest::*;
use tracing_test::traced_test;
/// Fixture: a frame with two rows over `key1`/`key2` — (1, 2) followed by (11, 21).
#[fixture]
fn df() -> DataFrame {
    let mut frame = DataFrame::init();
    for (first, second) in [(1, 2), (11, 21)] {
        let row = hashmap! {
            Key::from("key1") => DataValue::U32(first),
            Key::from("key2") => DataValue::U32(second),
        };
        assert!(frame.push(row).is_ok());
    }
    frame
}
/// `__iadd__` with a Python-wrapped copy must match `+=` applied to the inner frames.
#[rstest]
#[traced_test]
fn basic_ops_add(mut df: DataFrame) {
    let mut df_expect = df.clone();
    let df2 = df.clone();
    let exec = Python::with_gil(|py| -> PyResult<()> {
        let rhs = df.clone().into_pyobject(py)?;
        df.__iadd__(rhs)?;
        df_expect.dataframe += df2.dataframe;
        tracing::trace!("{} vs {}", df, df_expect);
        assert_eq!(df.dataframe, df_expect.dataframe);
        Ok(())
    });
    assert!(exec.is_ok(), "{:?}", exec);
}
/// `__isub__` with a Python-wrapped copy must match `-=` applied to the inner frames.
#[rstest]
#[traced_test]
fn basic_ops_sub(mut df: DataFrame) {
    let mut df_expect = df.clone();
    let df2 = df.clone();
    let exec = Python::with_gil(|py| -> PyResult<()> {
        let rhs = df.clone().into_pyobject(py)?;
        df.__isub__(rhs)?;
        df_expect.dataframe -= df2.dataframe;
        tracing::trace!("{} vs {}", df, df_expect);
        assert_eq!(df.dataframe, df_expect.dataframe);
        Ok(())
    });
    assert!(exec.is_ok(), "{:?}", exec);
}
/// `__imul__` with a Python-wrapped copy must match `*=` applied to the inner frames.
#[rstest]
#[traced_test]
fn basic_ops_mul(mut df: DataFrame) {
    let mut df_expect = df.clone();
    let df2 = df.clone();
    let exec = Python::with_gil(|py| -> PyResult<()> {
        let rhs = df.clone().into_pyobject(py)?;
        df.__imul__(rhs)?;
        df_expect.dataframe *= df2.dataframe;
        tracing::trace!("{} vs {}", df, df_expect);
        assert_eq!(df.dataframe, df_expect.dataframe);
        Ok(())
    });
    assert!(exec.is_ok(), "{:?}", exec);
}
/// `__itruediv__` with a Python-wrapped copy must match `/=` applied to the inner frames.
#[rstest]
#[traced_test]
fn basic_ops_div(mut df: DataFrame) {
    let mut df_expect = df.clone();
    let df2 = df.clone();
    let exec = Python::with_gil(|py| -> PyResult<()> {
        let rhs = df.clone().into_pyobject(py)?;
        df.__itruediv__(rhs)?;
        df_expect.dataframe /= df2.dataframe;
        tracing::trace!("{} vs {}", df, df_expect);
        assert_eq!(df.dataframe, df_expect.dataframe);
        Ok(())
    });
    assert!(exec.is_ok(), "{:?}", exec);
}
/// Calls every `as_numpy_*` exporter from an embedded Python helper.
///
/// The helper call is expected to succeed when `numpy` can be imported and to fail
/// otherwise. Independently of that outcome, joining the fixture with a copy of itself
/// must succeed.
///
/// NOTE(review): most exporter calls request `'key'`, which the `df` fixture does not
/// define (it defines `key1` and `key2`) — confirm this is intentional.
/// NOTE(review): the embedded script is whitespace-sensitive; verify the body of
/// `example` is indented under its `def` line in the checked-in file.
#[rstest]
#[traced_test]
fn test_numpy(mut df: DataFrame) {
    let exec = Python::with_gil(|py| -> PyResult<()> {
        let code = c_str!(
            r#"
def example(df):
import numpy as np
a_np = df.as_numpy_f32(['key1', 'key2'])
print(a_np)
b_np = df.as_numpy_u32(['key1', 'key'])
print(b_np)
b_np = df.as_numpy_i32(['key1', 'key'])
print(b_np)
b_np = df.as_numpy_i64(['key1', 'key'])
print(b_np)
b_np = df.as_numpy_u64(['key1', 'key'])
print(b_np)
b_np = df.as_numpy_f64(['key1', 'key'])
print(b_np)
b_np = df.as_numpy_f64(['key1', 'key'], transposed=True)
print(b_np)
return df
"#
        );
        let fun: Py<PyAny> = PyModule::from_code(py, code, c_str!(""), c_str!(""))?
            .getattr("example")?
            .into();
        let result = fun.call1(py, (df.clone(),));
        assert!(df.py_join(df.clone(), JoinRelation::default()).is_ok());
        // Without numpy the helper's import fails, so an error is the expected outcome.
        if py.import("numpy").is_ok() {
            assert!(result.is_ok(), "{:?}", result);
        } else {
            assert!(result.is_err(), "{:?}", result);
        }
        Ok(())
    });
    assert!(exec.is_ok(), "{:?}", exec);
}
/// Drives the Python-facing API (`shrink`, `len`, `add_alias`, `select`, `rename_key`,
/// metadata accessors, `select_transposed`, `select_column`) from an embedded script,
/// then checks that joining the fixture with a copy of itself succeeds.
#[rstest]
fn basic_python_dataframe(mut df: DataFrame) {
let exec = Python::with_gil(|py| -> PyResult<()> {
// NOTE(review): the script below compares against a name `e` that is never assigned
// within the script, and applies `+=`/`-=`/`*`/`/` to the values returned by `select`
// — confirm the tail of the script is still meant to run as written.
// NOTE(review): the script is whitespace-sensitive; verify the body of `example` is
// indented under its `def` line in the checked-in file.
let fun: Py<PyAny> = PyModule::from_code(
py,
c_str!(
"
def example(df):
print(df)
df.shrink()
assert len(df) == 2
df.add_alias('key1', 'key1-alias')
a = df.select(['key1', 'key2'])
print(a)
b = df.select(['key1-alias', 'key2'])
print(b)
df.rename_key('key1', 'key1new')
df.rename_key('key1new', 'key1')
assert a == [[1, 2], [11, 21]]
assert a == b
df.add_metadata('test', 1)
m = df.get_metadata('test')
assert m == 1
b = df.select_transposed(['key1', 'key2'])
print(b)
assert b == [[1, 11], [2, 21]]
c = df.select_column('key1')
print(c)
assert c == [1, 11]
a += b
print(a)
assert a == [[2, 13], [4, 23]]
a -= b
print(a)
assert e == a
f = e * b
print(f)
assert f == [[1, 22], [44, 441]]
g = f / b
print(g)
assert g == e
"
),
c_str!(""),
c_str!(""),
)?
.getattr("example")?
.into();
// NOTE(review): the outcome of the Python call is discarded, so an exception or a
// failing `assert` inside the script does not fail this test — confirm this is intended.
let _ = fun.call1(py, (df.clone(),));
assert!(df.py_join(df.clone(), JoinRelation::default()).is_ok());
Ok(())
});
assert!(exec.is_ok(), "{:?}", exec);
}
}