use crate::prelude::*;
use crate::utils::Xob;
use ahash::RandomState;
use std::collections::{HashMap, HashSet};
use std::hash::Hash;
use unsafe_unwrap::UnsafeUnwrap;
/// Builds the lookup table used by the hash joins in this module.
///
/// Every item yielded by `b` is treated as a join key; the returned map associates each
/// distinct key with the positions (0-based, in iteration order) at which it was yielded.
///
/// The initial capacity is a tenth of the iterator's lower size hint, so the table may
/// still grow while it is being filled.
pub(crate) fn prepare_hashed_relation<T>(
    b: impl Iterator<Item = T>,
) -> HashMap<T, Vec<usize>, RandomState>
where
    T: Hash + Eq,
{
    let initial_capacity = b.size_hint().0 / 10;
    let mut positions_by_key: HashMap<T, Vec<usize>, RandomState> =
        HashMap::with_capacity_and_hasher(initial_capacity, RandomState::new());
    for (position, key) in b.enumerate() {
        positions_by_key.entry(key).or_default().push(position);
    }
    positions_by_key
}
/// Computes the index pairs of an inner join between the keys of `a` and `b`.
///
/// `b` is hashed, `a` is streamed against the table. For every key of `a` that also
/// occurs in `b`, one pair is produced per matching position in `b`; keys of `a` without
/// a match produce nothing.
///
/// With `swap == false` each pair is `(position in a, position in b)`; with
/// `swap == true` the two components are exchanged.
fn hash_join_tuples_inner<T>(
    a: impl Iterator<Item = T>,
    b: impl Iterator<Item = T>,
    swap: bool,
) -> Vec<(usize, usize)>
where
    T: Hash + Eq + Copy,
{
    let build_side = prepare_hashed_relation(b);
    let mut pairs = Vec::new();

    // The orientation test is kept outside the probe loops on purpose.
    if swap {
        for (probe_row, key) in a.enumerate() {
            if let Some(build_rows) = build_side.get(&key) {
                pairs.extend(build_rows.iter().map(|&build_row| (build_row, probe_row)));
            }
        }
    } else {
        for (probe_row, key) in a.enumerate() {
            if let Some(build_rows) = build_side.get(&key) {
                pairs.extend(build_rows.iter().map(|&build_row| (probe_row, build_row)));
            }
        }
    }
    pairs
}
/// Computes the index pairs of a left join between the keys of `a` (left) and `b` (right).
///
/// Every position of `a` appears in the output at least once: paired with
/// `Some(position in b)` for each matching key in `b`, or with `None` when the key does
/// not occur in `b` at all.
fn hash_join_tuples_left<T>(
    a: impl Iterator<Item = T>,
    b: impl Iterator<Item = T>,
) -> Vec<(usize, Option<usize>)>
where
    T: Hash + Eq + Copy,
{
    let build_side = prepare_hashed_relation(b);
    let mut pairs = Vec::new();
    for (left_row, key) in a.enumerate() {
        if let Some(right_rows) = build_side.get(&key) {
            pairs.extend(right_rows.iter().map(|&right_row| (left_row, Some(right_row))));
        } else {
            pairs.push((left_row, None));
        }
    }
    pairs
}
/// Computes the index pairs of a full outer join between the keys of `a` and `b`.
///
/// `b` is hashed and `a` is streamed against the table. The output contains
///
/// * one pair with both sides set for every (row of `a`, row of `b`) combination that
///   shares a key,
/// * one pair with only the `a` side set for every row of `a` whose key is absent from `b`,
/// * one pair with only the `b` side set for every row of `b` whose key was never probed.
///
/// With `swap == false` a pair is laid out as `(index in a, index in b)`; with
/// `swap == true` the components are exchanged. Rows that only exist in `b` are appended
/// last, in hash-table iteration order (i.e. unspecified order).
///
/// Entries stay in the table after a hit and are merely flagged as matched, so a key that
/// occurs several times in `a` matches on every occurrence. Removing the entry on the
/// first hit would report the later duplicates as unmatched.
fn hash_join_tuples_outer<T, I, J>(a: I, b: J, swap: bool) -> Vec<(Option<usize>, Option<usize>)>
where
    I: Iterator<Item = T>,
    J: Iterator<Item = T>,
    T: Hash + Eq + Copy + Sync,
{
    let mut results = Vec::with_capacity(a.size_hint().0 + b.size_hint().0);

    // key -> (has any row of `a` hit this key?, positions of the key in `b`).
    // Built locally because the per-key flag is specific to the outer join.
    let mut hash_tbl: HashMap<T, (bool, Vec<usize>), RandomState> =
        HashMap::with_capacity_and_hasher(b.size_hint().0 / 10, RandomState::new());
    for (idx_b, key) in b.enumerate() {
        hash_tbl
            .entry(key)
            .or_insert_with(|| (false, Vec::new()))
            .1
            .push(idx_b);
    }

    // Puts the two sides of a pair in the order requested by the caller.
    let orient = |idx_a: Option<usize>, idx_b: Option<usize>| {
        if swap {
            (idx_b, idx_a)
        } else {
            (idx_a, idx_b)
        }
    };

    for (idx_a, key) in a.enumerate() {
        match hash_tbl.get_mut(&key) {
            Some((matched, indexes_b)) => {
                *matched = true;
                results.extend(
                    indexes_b
                        .iter()
                        .map(|&idx_b| orient(Some(idx_a), Some(idx_b))),
                );
            }
            None => results.push(orient(Some(idx_a), None)),
        }
    }

    // Whatever was never probed exists only on the `b` side.
    for (matched, indexes_b) in hash_tbl.values() {
        if !*matched {
            results.extend(indexes_b.iter().map(|&idx_b| orient(None, Some(idx_b))));
        }
    }
    results
}
/// Hash-join entry points for a column type.
///
/// Each method pairs up row positions of `self` with row positions of `_other`.
/// All methods have a default body that panics via `unimplemented!()`; only the
/// implementations that override them can actually be joined on.
pub(crate) trait HashJoin<T> {
/// Inner join: index pairs for rows present on both sides.
/// The default body panics.
fn hash_join_inner(&self, _other: &ChunkedArray<T>) -> Vec<(usize, usize)> {
unimplemented!()
}
/// Left join: an index for the left side paired with an optional index for the right side.
/// The default body panics.
fn hash_join_left(&self, _other: &ChunkedArray<T>) -> Vec<(usize, Option<usize>)> {
unimplemented!()
}
/// Outer join: index pairs in which either side may be absent.
/// The default body panics.
fn hash_join_outer(&self, _other: &ChunkedArray<T>) -> Vec<(Option<usize>, Option<usize>)> {
unimplemented!()
}
}
// The float and list column types keep the default bodies, so calling any of the
// join methods on them panics with `unimplemented!()`.
impl HashJoin<Float64Type> for Float64Chunked {}
impl HashJoin<Float32Type> for Float32Chunked {}
impl HashJoin<ListType> for ListChunked {}
// Decides which of two join inputs is streamed and which one is hashed.
//
// Expands to `(a, b, swap)` where `a` is the strictly longer input and `b` the other one;
// on equal lengths `$other` becomes `a`. `swap` is `true` exactly when `$self` ended up
// as `b`, i.e. when the order of the inputs was exchanged.
macro_rules! det_hash_prone_order {
    ($self:expr, $other:expr) => {{
        if $self.len() > $other.len() {
            ($self, $other, false)
        } else {
            ($other, $self, true)
        }
    }};
}
/// Hash joins for integer columns.
///
/// Each method first asks both sides for `cont_slice()`. Where that succeeds the raw
/// values are used as keys; where it fails the side is iterated through `into_iter()`
/// instead. When only one side yields a slice, its values are wrapped in `Some` so both
/// key streams have the same item type.
impl<T> HashJoin<T> for ChunkedArray<T>
where
    T: PolarsIntegerType + Sync,
    T::Native: Eq + Hash,
{
    fn hash_join_inner(&self, other: &ChunkedArray<T>) -> Vec<(usize, usize)> {
        // The longer side is probed, the shorter one is hashed; `swap` restores the
        // (self, other) orientation of the resulting pairs.
        let (probe, build, swap) = det_hash_prone_order!(self, other);
        match (probe.cont_slice(), build.cont_slice()) {
            (Ok(probe_vals), Ok(build_vals)) => {
                hash_join_tuples_inner(probe_vals.iter(), build_vals.iter(), swap)
            }
            (Ok(probe_vals), Err(_)) => hash_join_tuples_inner(
                probe_vals.iter().copied().map(Some),
                build.into_iter(),
                swap,
            ),
            (Err(_), Ok(build_vals)) => hash_join_tuples_inner(
                probe.into_iter(),
                build_vals.iter().copied().map(Some),
                swap,
            ),
            (Err(_), Err(_)) => hash_join_tuples_inner(probe.into_iter(), build.into_iter(), swap),
        }
    }

    fn hash_join_left(&self, other: &ChunkedArray<T>) -> Vec<(usize, Option<usize>)> {
        // A left join is not symmetric, so the sides are never exchanged here.
        match (self.cont_slice(), other.cont_slice()) {
            (Ok(left_vals), Ok(right_vals)) => {
                hash_join_tuples_left(left_vals.iter(), right_vals.iter())
            }
            (Ok(left_vals), Err(_)) => {
                hash_join_tuples_left(left_vals.iter().copied().map(Some), other.into_iter())
            }
            (Err(_), Ok(right_vals)) => {
                hash_join_tuples_left(self.into_iter(), right_vals.iter().copied().map(Some))
            }
            (Err(_), Err(_)) => hash_join_tuples_left(self.into_iter(), other.into_iter()),
        }
    }

    fn hash_join_outer(&self, other: &ChunkedArray<T>) -> Vec<(Option<usize>, Option<usize>)> {
        let (probe, build, swap) = det_hash_prone_order!(self, other);
        match (probe.cont_slice(), build.cont_slice()) {
            (Ok(probe_vals), Ok(build_vals)) => {
                hash_join_tuples_outer(probe_vals.iter(), build_vals.iter(), swap)
            }
            (Ok(probe_vals), Err(_)) => hash_join_tuples_outer(
                probe_vals.iter().copied().map(Some),
                build.into_iter(),
                swap,
            ),
            (Err(_), Ok(build_vals)) => hash_join_tuples_outer(
                probe.into_iter(),
                build_vals.iter().copied().map(Some),
                swap,
            ),
            (Err(_), Err(_)) => hash_join_tuples_outer(probe.into_iter(), build.into_iter(), swap),
        }
    }
}
/// Hash joins for boolean columns.
///
/// When both sides report `is_optimal_aligned()` the keys are read through
/// `into_no_null_iter()`; in every other case both sides fall back to `into_iter()`.
impl HashJoin<BooleanType> for BooleanChunked {
    fn hash_join_inner(&self, other: &BooleanChunked) -> Vec<(usize, usize)> {
        let (probe, build, swap) = det_hash_prone_order!(self, other);
        let (probe_aligned, build_aligned) =
            (probe.is_optimal_aligned(), build.is_optimal_aligned());
        if probe_aligned && build_aligned {
            hash_join_tuples_inner(probe.into_no_null_iter(), build.into_no_null_iter(), swap)
        } else {
            hash_join_tuples_inner(probe.into_iter(), build.into_iter(), swap)
        }
    }

    fn hash_join_left(&self, other: &BooleanChunked) -> Vec<(usize, Option<usize>)> {
        let (left_aligned, right_aligned) =
            (self.is_optimal_aligned(), other.is_optimal_aligned());
        if left_aligned && right_aligned {
            hash_join_tuples_left(self.into_no_null_iter(), other.into_no_null_iter())
        } else {
            hash_join_tuples_left(self.into_iter(), other.into_iter())
        }
    }

    fn hash_join_outer(&self, other: &BooleanChunked) -> Vec<(Option<usize>, Option<usize>)> {
        let (probe, build, swap) = det_hash_prone_order!(self, other);
        let (probe_aligned, build_aligned) =
            (probe.is_optimal_aligned(), build.is_optimal_aligned());
        if probe_aligned && build_aligned {
            hash_join_tuples_outer(probe.into_no_null_iter(), build.into_no_null_iter(), swap)
        } else {
            hash_join_tuples_outer(probe.into_iter(), build.into_iter(), swap)
        }
    }
}
/// Hash joins for string columns.
///
/// When both sides report `is_optimal_aligned()` the keys are read through
/// `into_no_null_iter()`; in every other case both sides fall back to `into_iter()`.
impl HashJoin<Utf8Type> for Utf8Chunked {
    fn hash_join_inner(&self, other: &Utf8Chunked) -> Vec<(usize, usize)> {
        let (probe, build, swap) = det_hash_prone_order!(self, other);
        let (probe_aligned, build_aligned) =
            (probe.is_optimal_aligned(), build.is_optimal_aligned());
        if probe_aligned && build_aligned {
            hash_join_tuples_inner(probe.into_no_null_iter(), build.into_no_null_iter(), swap)
        } else {
            hash_join_tuples_inner(probe.into_iter(), build.into_iter(), swap)
        }
    }

    fn hash_join_left(&self, other: &Utf8Chunked) -> Vec<(usize, Option<usize>)> {
        let (left_aligned, right_aligned) =
            (self.is_optimal_aligned(), other.is_optimal_aligned());
        if left_aligned && right_aligned {
            hash_join_tuples_left(self.into_no_null_iter(), other.into_no_null_iter())
        } else {
            hash_join_tuples_left(self.into_iter(), other.into_iter())
        }
    }

    fn hash_join_outer(&self, other: &Utf8Chunked) -> Vec<(Option<usize>, Option<usize>)> {
        let (probe, build, swap) = det_hash_prone_order!(self, other);
        let (probe_aligned, build_aligned) =
            (probe.is_optimal_aligned(), build.is_optimal_aligned());
        if probe_aligned && build_aligned {
            hash_join_tuples_outer(probe.into_no_null_iter(), build.into_no_null_iter(), swap)
        } else {
            hash_join_tuples_outer(probe.into_iter(), build.into_iter(), swap)
        }
    }
}
/// Builds the single key column of an outer-join result from the key columns of both sides.
///
/// The implementations in this file walk `_opt_join_tuples` and, per tuple, read the value
/// from `self` when a left index is present and from `_right_column` otherwise.
pub trait ZipOuterJoinColumn {
/// Default body: panics via `unimplemented!()`. Types that support outer joins override it.
fn zip_outer_join_column(
&self,
_right_column: &Series,
_opt_join_tuples: &[(Option<usize>, Option<usize>)],
) -> Series {
unimplemented!()
}
}
/// Outer-join key column for integer columns.
impl<T> ZipOuterJoinColumn for ChunkedArray<T>
where
    T: PolarsIntegerType,
    ChunkedArray<T>: IntoSeries,
{
    /// Produces one value per join tuple: the value of `self` at the left index when there
    /// is one, otherwise the value of `right_column` at the right index.
    ///
    /// Panics (through `unwrap`) when `unpack_series_matching_type` rejects `right_column`.
    fn zip_outer_join_column(
        &self,
        right_column: &Series,
        opt_join_tuples: &[(Option<usize>, Option<usize>)],
    ) -> Series {
        let right_ca = self.unpack_series_matching_type(right_column).unwrap();
        let left_values = self.take_rand();
        let right_values = right_ca.take_rand();

        // NOTE(review): the unchecked accesses below rely on the caller passing tuples in
        // which at least one side is `Some` and every index is valid for its column --
        // nothing in this function verifies that.
        let merged: Xob<ChunkedArray<T>> = opt_join_tuples
            .iter()
            .map(|&(opt_left_idx, opt_right_idx)| match opt_left_idx {
                Some(left_idx) => unsafe { left_values.get_unchecked(left_idx) },
                None => unsafe { right_values.get_unchecked(opt_right_idx.unsafe_unwrap()) },
            })
            .collect();
        merged.into_inner().into_series()
    }
}
// These column types keep the default `zip_outer_join_column`, which panics via
// `unimplemented!()`.
impl ZipOuterJoinColumn for Float32Chunked {}
impl ZipOuterJoinColumn for Float64Chunked {}
impl ZipOuterJoinColumn for ListChunked {}
// Only compiled when the `object` feature is enabled; also uses the panicking default.
#[cfg(feature = "object")]
impl<T> ZipOuterJoinColumn for ObjectChunked<T> {}
// Generates `ZipOuterJoinColumn` for a concrete chunked-array type whose values can be
// collected straight back into that same type.
macro_rules! impl_zip_outer_join {
    ($chunkedtype:ident) => {
        impl ZipOuterJoinColumn for $chunkedtype {
            /// Produces one value per join tuple: the value of `self` at the left index
            /// when there is one, otherwise the value of `right_column` at the right index.
            ///
            /// Panics (through `unwrap`) when `unpack_series_matching_type` rejects
            /// `right_column`.
            fn zip_outer_join_column(
                &self,
                right_column: &Series,
                opt_join_tuples: &[(Option<usize>, Option<usize>)],
            ) -> Series {
                let right_ca = self.unpack_series_matching_type(right_column).unwrap();
                let left_values = self.take_rand();
                let right_values = right_ca.take_rand();

                // NOTE(review): the unchecked accesses rely on the caller passing tuples
                // in which at least one side is `Some` and every index is valid.
                let merged: $chunkedtype = opt_join_tuples
                    .iter()
                    .map(|&(opt_left_idx, opt_right_idx)| match opt_left_idx {
                        Some(left_idx) => unsafe { left_values.get_unchecked(left_idx) },
                        None => unsafe {
                            right_values.get_unchecked(opt_right_idx.unsafe_unwrap())
                        },
                    })
                    .collect();
                merged.into_series()
            }
        }
    };
}
impl_zip_outer_join!(BooleanChunked);
impl_zip_outer_join!(Utf8Chunked);
impl DataFrame {
    /// Glues the two halves of a join together.
    ///
    /// Every column of `df_right` whose name already occurs in `df_left` is renamed to
    /// `<name>_right`; afterwards all right columns are appended to `df_left`, which is
    /// returned. Errors from `rename` and `hstack_mut` are propagated.
    fn finish_join(&self, mut df_left: DataFrame, mut df_right: DataFrame) -> Result<DataFrame> {
        let mut left_names = HashSet::with_capacity_and_hasher(df_left.width(), RandomState::new());
        for series in df_left.columns.iter() {
            left_names.insert(series.name());
        }

        // Gather the clashing names as owned strings first: renaming needs `df_right`
        // mutably, which is not possible while its columns are still being iterated.
        let clashing: Vec<_> = df_right
            .columns
            .iter()
            .filter(|series| left_names.contains(series.name()))
            .map(|series| series.name().to_owned())
            .collect();
        for name in clashing {
            let suffixed = format!("{}_right", name);
            df_right.rename(&name, &suffixed)?;
        }

        df_left.hstack_mut(&df_right.columns)?;
        Ok(df_left)
    }

    /// Materialises the left half of a join: the rows of `self` selected by the first
    /// component of every join tuple, in tuple order.
    fn create_left_df<B: Sync>(&self, join_tuples: &[(usize, B)]) -> DataFrame {
        let left_rows = join_tuples.iter().map(|tuple| tuple.0);
        // NOTE(review): no bounds checks -- relies on every left index being a valid row
        // of `self`.
        unsafe { self.take_iter_unchecked_bounds(left_rows, Some(join_tuples.len())) }
    }

    /// Inner-joins `self` with `other`, matching column `left_on` of `self` against column
    /// `right_on` of `other`.
    ///
    /// Returns an error when either column cannot be looked up.
    pub fn inner_join(
        &self,
        other: &DataFrame,
        left_on: &str,
        right_on: &str,
    ) -> Result<DataFrame> {
        let s_left = self.column(left_on)?;
        let s_right = other.column(right_on)?;
        self.inner_join_from_series(other, s_left, s_right)
    }

    /// Inner join driven by explicit key series.
    ///
    /// The column named like `s_right` is dropped from `other` before its rows are taken;
    /// the `unwrap` on that `drop` panics if it fails. Both halves are materialised in
    /// parallel through `rayon::join`.
    pub(crate) fn inner_join_from_series(
        &self,
        other: &DataFrame,
        s_left: &Series,
        s_right: &Series,
    ) -> Result<DataFrame> {
        let join_tuples = s_left.hash_join_inner(s_right);
        let n_rows = join_tuples.len();

        let take_left = || self.create_left_df(&join_tuples);
        let take_right = || unsafe {
            other
                .drop(s_right.name())
                .unwrap()
                .take_iter_unchecked_bounds(join_tuples.iter().map(|tuple| tuple.1), Some(n_rows))
        };
        let (df_left, df_right) = rayon::join(take_left, take_right);
        self.finish_join(df_left, df_right)
    }

    /// Left-joins `self` with `other`, matching column `left_on` of `self` against column
    /// `right_on` of `other`.
    ///
    /// Returns an error when either column cannot be looked up.
    pub fn left_join(&self, other: &DataFrame, left_on: &str, right_on: &str) -> Result<DataFrame> {
        let s_left = self.column(left_on)?;
        let s_right = other.column(right_on)?;
        self.left_join_from_series(other, s_left, s_right)
    }

    /// Left join driven by explicit key series.
    ///
    /// The column named like `s_right` is dropped from `other` (panicking if that `drop`
    /// fails); right rows are taken through optional indices, so join tuples without a
    /// right index go through the `None` path of `take_opt_iter_unchecked_bounds`.
    pub(crate) fn left_join_from_series(
        &self,
        other: &DataFrame,
        s_left: &Series,
        s_right: &Series,
    ) -> Result<DataFrame> {
        let opt_join_tuples = s_left.hash_join_left(s_right);
        let n_rows = opt_join_tuples.len();

        let take_left = || self.create_left_df(&opt_join_tuples);
        let take_right = || unsafe {
            other
                .drop(s_right.name())
                .unwrap()
                .take_opt_iter_unchecked_bounds(
                    opt_join_tuples.iter().map(|tuple| tuple.1),
                    Some(n_rows),
                )
        };
        let (df_left, df_right) = rayon::join(take_left, take_right);
        self.finish_join(df_left, df_right)
    }

    /// Outer-joins `self` with `other`, matching column `left_on` of `self` against column
    /// `right_on` of `other`.
    ///
    /// Returns an error when either column cannot be looked up.
    pub fn outer_join(
        &self,
        other: &DataFrame,
        left_on: &str,
        right_on: &str,
    ) -> Result<DataFrame> {
        let s_left = self.column(left_on)?;
        let s_right = other.column(right_on)?;
        self.outer_join_from_series(other, s_left, s_right)
    }

    /// Outer join driven by explicit key series.
    ///
    /// The key columns are dropped from both frames (panicking if either `drop` fails)
    /// and the remaining columns are taken through optional indices. A single key column
    /// is then rebuilt with `zip_outer_join_column`, named after `s_left`, and appended
    /// after the other left columns before the halves are combined.
    pub(crate) fn outer_join_from_series(
        &self,
        other: &DataFrame,
        s_left: &Series,
        s_right: &Series,
    ) -> Result<DataFrame> {
        let opt_join_tuples = s_left.hash_join_outer(s_right);
        let n_rows = opt_join_tuples.len();

        let take_left = || unsafe {
            self.drop(s_left.name())
                .unwrap()
                .take_opt_iter_unchecked_bounds(
                    opt_join_tuples.iter().map(|tuple| tuple.0),
                    Some(n_rows),
                )
        };
        let take_right = || unsafe {
            other
                .drop(s_right.name())
                .unwrap()
                .take_opt_iter_unchecked_bounds(
                    opt_join_tuples.iter().map(|tuple| tuple.1),
                    Some(n_rows),
                )
        };
        let (mut df_left, df_right) = rayon::join(take_left, take_right);

        let mut key_column = s_left.zip_outer_join_column(s_right, &opt_join_tuples);
        key_column.rename(s_left.name());
        df_left.hstack_mut(&[key_column])?;
        self.finish_join(df_left, df_right)
    }
}
// Unit tests for the DataFrame join methods.
#[cfg(test)]
mod test {
use crate::prelude::*;
// Shared fixture: `temp` has the unique days 0..=2; `rain` has days [1, 2, 3, 1],
// i.e. a duplicated key (1) and a key that is missing from `temp` (3).
// Both frames carry a "rain" column, so joins must rename the right one.
fn create_frames() -> (DataFrame, DataFrame) {
let s0 = Series::new("days", &[0, 1, 2]);
let s1 = Series::new("temp", &[22.1, 19.9, 7.]);
let s2 = Series::new("rain", &[0.2, 0.1, 0.3]);
let temp = DataFrame::new(vec![s0, s1, s2]).unwrap();
let s0 = Series::new("days", &[1, 2, 3, 1]);
let s1 = Series::new("rain", &[0.1, 0.2, 0.3, 0.4]);
let rain = DataFrame::new(vec![s0, s1]).unwrap();
(temp, rain)
}
// Inner join: only days 1 and 2 occur on both sides; day 1 matches twice.
// The clashing right-hand "rain" column is expected as "rain_right".
#[test]
fn test_inner_join() {
let (temp, rain) = create_frames();
let joined = temp.inner_join(&rain, "days", "days").unwrap();
let join_col_days = Series::new("days", &[1, 2, 1]);
let join_col_temp = Series::new("temp", &[19.9, 7., 19.9]);
let join_col_rain = Series::new("rain", &[0.1, 0.3, 0.1]);
let join_col_rain_right = Series::new("rain_right", [0.1, 0.2, 0.4].as_ref());
let true_df = DataFrame::new(vec![
join_col_days,
join_col_temp,
join_col_rain,
join_col_rain_right,
])
.unwrap();
println!("{}", joined);
assert!(joined.frame_equal(&true_df));
}
// Left join, once on integer keys and once on string keys. In both cases two of the
// five left rows find a partner (rain 0.1 + 0.2) and the other three get a null.
#[test]
fn test_left_join() {
let s0 = Series::new("days", &[0, 1, 2, 3, 4]);
let s1 = Series::new("temp", &[22.1, 19.9, 7., 2., 3.]);
let temp = DataFrame::new(vec![s0, s1]).unwrap();
let s0 = Series::new("days", &[1, 2]);
let s1 = Series::new("rain", &[0.1, 0.2]);
let rain = DataFrame::new(vec![s0, s1]).unwrap();
let joined = temp.left_join(&rain, "days", "days").unwrap();
println!("{}", &joined);
// Scale and round before comparing to avoid an exact float comparison.
assert_eq!(
(joined.column("rain").unwrap().sum::<f32>().unwrap() * 10.).round(),
3.
);
assert_eq!(joined.column("rain").unwrap().null_count(), 3);
// Same scenario with string keys.
let s0 = Series::new("days", &["mo", "tue", "wed", "thu", "fri"]);
let s1 = Series::new("temp", &[22.1, 19.9, 7., 2., 3.]);
let temp = DataFrame::new(vec![s0, s1]).unwrap();
let s0 = Series::new("days", &["tue", "wed"]);
let s1 = Series::new("rain", &[0.1, 0.2]);
let rain = DataFrame::new(vec![s0, s1]).unwrap();
let joined = temp.left_join(&rain, "days", "days").unwrap();
println!("{}", &joined);
assert_eq!(
(joined.column("rain").unwrap().sum::<f32>().unwrap() * 10.).round(),
3.
);
assert_eq!(joined.column("rain").unwrap().null_count(), 3);
}
// Outer join on the shared fixture: five result rows, and the rebuilt "days" key
// column sums to 7. Only height and key sum are checked, not the payload columns.
#[test]
fn test_outer_join() {
let (temp, rain) = create_frames();
let joined = temp.outer_join(&rain, "days", "days").unwrap();
println!("{:?}", &joined);
assert_eq!(joined.height(), 5);
assert_eq!(joined.column("days").unwrap().sum::<i32>(), Some(7));
}
// Left join where the right payload column itself contains nulls: the last joined row
// (date 28) matches a right row whose `val2` is `None`.
#[test]
fn test_join_with_nulls() {
let dts = &[20, 21, 22, 23, 24, 25, 27, 28];
let vals = &[1.2, 2.4, 4.67, 5.8, 4.4, 3.6, 7.6, 6.5];
let df = DataFrame::new(vec![Series::new("date", dts), Series::new("val", vals)]).unwrap();
let vals2 = &[Some(1.1), None, Some(3.3), None, None];
let df2 = DataFrame::new(vec![
Series::new("date", &dts[3..]),
Series::new("val2", vals2),
])
.unwrap();
let joined = df.left_join(&df2, "date", "date").unwrap();
assert_eq!(
joined
.column("val2")
.unwrap()
.f64()
.unwrap()
.get(joined.height() - 1),
None
);
}
// Joins on a synthetic string key ("dummy") built by concatenating two columns per
// frame, then checks the order and multiplicity of the matched "ham" values.
#[test]
fn test_join_query() {
let df_a = df! {
"a" => &[1, 2, 1, 1],
"b" => &["a", "b", "c", "c"],
"c" => &[0, 1, 2, 3]
}
.unwrap();
let df_b = df! {
"foo" => &[1, 1, 1],
"bar" => &["a", "c", "c"],
"ham" => &["let", "var", "const"]
}
.unwrap();
// Left key: "a" cast to string, concatenated with "b".
let mut s = df_a
.column("a")
.unwrap()
.cast::<Utf8Type>()
.unwrap()
.utf8()
.unwrap()
+ df_a.column("b").unwrap().utf8().unwrap();
s.rename("dummy");
let df_a = df_a.with_column(s).unwrap();
// Right key: "foo" cast to string, concatenated with "bar".
let mut s = df_b
.column("foo")
.unwrap()
.cast::<Utf8Type>()
.unwrap()
.utf8()
.unwrap()
+ df_b.column("bar").unwrap().utf8().unwrap();
s.rename("dummy");
let df_b = df_b.with_column(s).unwrap();
let joined = df_a.left_join(&df_b, "dummy", "dummy").unwrap();
let ham_col = joined.column("ham").unwrap();
let ca = ham_col.utf8().unwrap();
// One match for the first left row, none for the second, and two matches for each of
// the last two left rows.
assert_eq!(
Vec::from(ca),
&[
Some("let"),
None,
Some("var"),
Some("const"),
Some("var"),
Some("const")
]
);
}
}