Function polars_core::utils::split_ca

source ·
pub fn split_ca<T>(
    ca: &ChunkedArray<T>,
    n: usize
) -> PolarsResult<Vec<ChunkedArray<T>>>where
    T: PolarsDataType,
Examples found in repository:
src/series/mod.rs (line 494)
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
    /// Filter this `Series` by `filter`, splitting both the data and the mask into
    /// `POOL.current_num_threads()` partitions that are filtered in parallel.
    ///
    /// The filtered partitions are combined with `finish_take_threaded`, which receives
    /// `rechunk` unchanged.
    ///
    /// # Errors
    ///
    /// Returns an error if splitting the mask or the series fails, or if filtering any
    /// partition fails.
    pub fn filter_threaded(&self, filter: &BooleanChunked, rechunk: bool) -> PolarsResult<Series> {
        // A length-1 mask is a broadcasting filter: it cannot be split over threads
        // (and it is a no-op anyway), so defer to the standard filter.
        if filter.len() == 1 {
            return self.filter(filter);
        }
        let n_threads = POOL.current_num_threads();
        // Propagate split failures to the caller instead of panicking.
        let filters = split_ca(filter, n_threads)?;
        let series = split_series(self, n_threads)?;

        let series: PolarsResult<Vec<_>> = POOL.install(|| {
            filters
                .par_iter()
                .zip(series)
                .map(|(filter, s)| s.filter(filter))
                .collect()
        });

        Ok(self.finish_take_threaded(series?, rechunk))
    }
More examples
Hide additional examples
src/frame/mod.rs (line 1545)
1542
1543
1544
1545
1546
1547
1548
1549
1550
1551
1552
1553
1554
1555
1556
1557
1558
1559
1560
1561
1562
1563
1564
1565
1566
1567
1568
1569
1570
1571
1572
1573
1574
1575
1576
1577
1578
1579
1580
1581
1582
1583
1584
1585
1586
1587
1588
1589
1590
1591
1592
1593
1594
1595
1596
1597
1598
1599
1600
1601
1602
1603
1604
1605
1606
1607
1608
1609
1610
1611
1612
1613
1614
1615
1616
1617
1618
1619
1620
1621
1622
1623
1624
1625
1626
1627
1628
1629
1630
1631
1632
1633
1634
1635
1636
1637
1638
1639
1640
1641
1642
1643
1644
1645
1646
1647
1648
1649
1650
1651
1652
1653
1654
1655
1656
1657
1658
1659
1660
1661
1662
1663
1664
1665
1666
1667
1668
1669
1670
1671
1672
1673
1674
1675
1676
1677
1678
1679
1680
1681
1682
1683
1684
1685
1686
1687
1688
1689
1690
1691
1692
1693
1694
1695
1696
1697
1698
1699
1700
1701
1702
1703
1704
1705
1706
1707
1708
1709
1710
1711
1712
1713
1714
1715
1716
1717
1718
1719
1720
1721
1722
1723
1724
1725
1726
1727
1728
1729
1730
1731
1732
1733
1734
1735
1736
1737
1738
1739
1740
1741
1742
1743
1744
1745
1746
1747
1748
1749
1750
1751
1752
1753
1754
1755
1756
1757
1758
1759
1760
1761
1762
1763
1764
1765
1766
1767
1768
1769
1770
1771
1772
1773
1774
1775
1776
1777
1778
1779
1780
1781
1782
    fn filter_vertical(&mut self, mask: &BooleanChunked) -> PolarsResult<Self> {
        let n_threads = POOL.current_num_threads();

        let masks = split_ca(mask, n_threads).unwrap();
        let dfs = split_df(self, n_threads).unwrap();
        let dfs: PolarsResult<Vec<_>> = POOL.install(|| {
            masks
                .par_iter()
                .zip(dfs)
                .map(|(mask, df)| {
                    let cols = df
                        .columns
                        .iter()
                        .map(|s| s.filter(mask))
                        .collect::<PolarsResult<_>>()?;
                    Ok(DataFrame::new_no_checks(cols))
                })
                .collect()
        });

        let mut iter = dfs?.into_iter();
        let first = iter.next().unwrap();
        Ok(iter.fold(first, |mut acc, df| {
            acc.vstack_mut(&df).unwrap();
            acc
        }))
    }

    /// Take the `DataFrame` rows by a boolean mask.
    ///
    /// # Example
    ///
    /// ```
    /// # use polars_core::prelude::*;
    /// fn example(df: &DataFrame) -> PolarsResult<DataFrame> {
    ///     let mask = df.column("sepal.width")?.is_not_null();
    ///     df.filter(&mask)
    /// }
    /// ```
    pub fn filter(&self, mask: &BooleanChunked) -> PolarsResult<Self> {
        if std::env::var("POLARS_VERT_PAR").is_ok() {
            return self.clone().filter_vertical(mask);
        }
        let new_col = self.try_apply_columns_par(&|s| match s.dtype() {
            DataType::Utf8 => s.filter_threaded(mask, true),
            _ => s.filter(mask),
        })?;
        Ok(DataFrame::new_no_checks(new_col))
    }

    /// Same as `filter` but does not parallelize: each column is filtered by `mask` one
    /// after another via `try_apply_columns`.
    pub fn _filter_seq(&self, mask: &BooleanChunked) -> PolarsResult<Self> {
        self.try_apply_columns(&|s| s.filter(mask))
            .map(DataFrame::new_no_checks)
    }

    /// Take `DataFrame` value by indexes from an iterator.
    ///
    /// # Example
    ///
    /// ```
    /// # use polars_core::prelude::*;
    /// fn example(df: &DataFrame) -> PolarsResult<DataFrame> {
    ///     let iterator = (0..9).into_iter();
    ///     df.take_iter(iterator)
    /// }
    /// ```
    pub fn take_iter<I>(&self, iter: I) -> PolarsResult<Self>
    where
        I: Iterator<Item = usize> + Clone + Sync + TrustedLen,
    {
        let new_col = self.try_apply_columns_par(&|s| {
            let mut i = iter.clone();
            s.take_iter(&mut i)
        })?;

        Ok(DataFrame::new_no_checks(new_col))
    }

    /// Take `DataFrame` values by indexes from an iterator.
    ///
    /// Strategy selection:
    /// * `POLARS_VERT_PAR` set: the indices are collected into an `IdxCa` and gathered with
    ///   `take_unchecked_vectical`.
    /// * a single chunk with more than one column, or any `Utf8` column: the indices are
    ///   collected into an `IdxCa` and gathered with `take_unchecked`.
    /// * otherwise the iterator is applied per column: directly for a single column, or in
    ///   parallel with one clone of the iterator per column.
    ///
    /// # Safety
    ///
    /// This doesn't do any bound checking but checks null validity.
    #[must_use]
    pub unsafe fn take_iter_unchecked<I>(&self, mut iter: I) -> Self
    where
        I: Iterator<Item = usize> + Clone + Sync + TrustedLen,
    {
        if std::env::var("POLARS_VERT_PAR").is_ok() {
            let indices: NoNull<IdxCa> = iter.map(|i| i as IdxSize).collect();
            return self.take_unchecked_vectical(&indices.into_inner());
        }

        let single_chunk_multi_column = self.n_chunks() == 1 && self.width() > 1;
        let contains_utf8 = self
            .columns
            .iter()
            .any(|s| matches!(s.dtype(), DataType::Utf8));

        if single_chunk_multi_column || contains_utf8 {
            let indices: NoNull<IdxCa> = iter.map(|i| i as IdxSize).collect();
            return self.take_unchecked(&indices.into_inner());
        }

        let gathered = match self.width() {
            1 => self
                .columns
                .iter()
                .map(|s| s.take_iter_unchecked(&mut iter))
                .collect::<Vec<_>>(),
            _ => self.apply_columns_par(&|s| {
                let mut own_iter = iter.clone();
                s.take_iter_unchecked(&mut own_iter)
            }),
        };
        DataFrame::new_no_checks(gathered)
    }

    /// Take `DataFrame` values by indexes from an iterator that may contain None values.
    ///
    /// Uses the same strategy selection as `take_iter_unchecked`:
    /// * `POLARS_VERT_PAR` set: `take_unchecked_vectical`;
    /// * a single chunk with more than one column, or any `Utf8` column: `take_unchecked`
    ///   on a collected `IdxCa`;
    /// * otherwise the iterator is applied per column.
    ///
    /// # Safety
    ///
    /// This doesn't do any bound checking. Out of bounds may access uninitialized memory.
    /// Null validity is checked
    #[must_use]
    pub unsafe fn take_opt_iter_unchecked<I>(&self, mut iter: I) -> Self
    where
        I: Iterator<Item = Option<usize>> + Clone + Sync + TrustedLen,
    {
        if std::env::var("POLARS_VERT_PAR").is_ok() {
            let indices: IdxCa = iter.map(|opt| opt.map(|i| i as IdxSize)).collect();
            return self.take_unchecked_vectical(&indices);
        }

        let single_chunk_multi_column = self.n_chunks() == 1 && self.width() > 1;
        let contains_utf8 = self
            .columns
            .iter()
            .any(|s| matches!(s.dtype(), DataType::Utf8));

        if single_chunk_multi_column || contains_utf8 {
            let indices: IdxCa = iter.map(|opt| opt.map(|i| i as IdxSize)).collect();
            return self.take_unchecked(&indices);
        }

        let gathered = match self.width() {
            1 => self
                .columns
                .iter()
                .map(|s| s.take_opt_iter_unchecked(&mut iter))
                .collect::<Vec<_>>(),
            _ => self.apply_columns_par(&|s| {
                let mut own_iter = iter.clone();
                s.take_opt_iter_unchecked(&mut own_iter)
            }),
        };

        DataFrame::new_no_checks(gathered)
    }

    /// Take `DataFrame` rows by index values.
    ///
    /// Multi-chunk `indices` are rechunked once and then shared by all columns. Columns
    /// are gathered in parallel on `POOL`, and `Utf8` columns use `take_threaded`.
    ///
    /// # Example
    ///
    /// ```
    /// # use polars_core::prelude::*;
    /// fn example(df: &DataFrame) -> PolarsResult<DataFrame> {
    ///     let idx = IdxCa::new("idx", &[0, 1, 9]);
    ///     df.take(&idx)
    /// }
    /// ```
    pub fn take(&self, indices: &IdxCa) -> PolarsResult<Self> {
        let indices = match indices.chunks.len() {
            0 | 1 => Cow::Borrowed(indices),
            _ => Cow::Owned(indices.rechunk()),
        };

        let gathered = POOL.install(|| {
            self.try_apply_columns_par(&|s| {
                if matches!(s.dtype(), DataType::Utf8) {
                    s.take_threaded(&indices, true)
                } else {
                    s.take(&indices)
                }
            })
        })?;

        Ok(DataFrame::new_no_checks(gathered))
    }

    /// Gather the rows at `idx` without bounds checks, with per-column work allowed to
    /// run on the thread pool (forwards to `take_unchecked_impl`).
    ///
    /// # Safety
    ///
    /// NOTE(review): the caller presumably must guarantee every non-null index is in
    /// bounds for all columns — confirm against `Series::take_unchecked`.
    pub(crate) unsafe fn take_unchecked(&self, idx: &IdxCa) -> Self {
        let allow_threads = true;
        self.take_unchecked_impl(idx, allow_threads)
    }

    /// Gather the rows at `idx` from every column without bounds checks.
    ///
    /// When `allow_threads` is set, columns are processed in parallel on `POOL`, and
    /// `Utf8` columns use `take_unchecked_threaded`. Otherwise columns are processed
    /// sequentially. Any error returned by a column's take panics via `unwrap`.
    ///
    /// # Safety
    ///
    /// NOTE(review): the caller presumably must guarantee every non-null index is in
    /// bounds — confirm against `Series::take_unchecked`.
    unsafe fn take_unchecked_impl(&self, idx: &IdxCa, allow_threads: bool) -> Self {
        let cols = if !allow_threads {
            self.columns
                .iter()
                .map(|s| s.take_unchecked(idx).unwrap())
                .collect()
        } else {
            POOL.install(|| {
                self.apply_columns_par(&|s| {
                    let taken = if matches!(s.dtype(), DataType::Utf8) {
                        s.take_unchecked_threaded(idx, true)
                    } else {
                        s.take_unchecked(idx)
                    };
                    taken.unwrap()
                })
            })
        };
        DataFrame::new_no_checks(cols)
    }

    /// Gather rows by splitting `indices` into `POOL.current_num_threads()` partitions.
    /// Every column is gathered per partition in parallel, and the partial frames are then
    /// stacked in partition order.
    ///
    /// The name looks like a typo of "vertical"; it is kept because callers use it.
    ///
    /// # Safety
    ///
    /// NOTE(review): like `take_unchecked`, this presumably requires all non-null indices
    /// to be in bounds — confirm. Split, take and vstack failures panic via `unwrap`.
    unsafe fn take_unchecked_vectical(&self, indices: &IdxCa) -> Self {
        let n_threads = POOL.current_num_threads();
        let idx_parts = split_ca(indices, n_threads).unwrap();

        let gather_part = |idx: &IdxCa| {
            let cols = self
                .columns
                .iter()
                .map(|s| s.take_unchecked(idx).unwrap())
                .collect();
            DataFrame::new_no_checks(cols)
        };
        let parts: Vec<_> = POOL.install(|| idx_parts.par_iter().map(gather_part).collect());

        let mut parts = parts.into_iter();
        let mut out = parts.next().unwrap();
        for part in parts {
            out.vstack_mut(&part).unwrap();
        }
        out
    }
src/frame/hash_join/single_keys_dispatch.rs (line 173)
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
/// Inner hash join on integer keys.
///
/// The operands are ordered by `det_hash_prone_order!` and split into
/// `POOL.current_num_threads()` partitions. Keys are then passed to
/// `hash_join_tuples_inner` in one of three ways:
/// * contiguous slices when neither side has nulls and both are single-chunk;
/// * per-chunk slices when there are no nulls but more chunks;
/// * optional values otherwise.
///
/// Returns the join tuples together with `!swap`.
fn num_group_join_inner<T>(
    left: &ChunkedArray<T>,
    right: &ChunkedArray<T>,
) -> ((Vec<IdxSize>, Vec<IdxSize>), bool)
where
    T: PolarsIntegerType,
    T::Native: Hash + Eq + Send + AsU64 + Copy,
    Option<T::Native>: AsU64,
{
    let n_threads = POOL.current_num_threads();
    let (a, b, swap) = det_hash_prone_order!(left, right);
    let parts_a = split_ca(a, n_threads).unwrap();
    let parts_b = split_ca(b, n_threads).unwrap();

    let no_nulls = left.null_count() == 0 && right.null_count() == 0;
    let single_chunk = left.chunks.len() == 1 && right.chunks.len() == 1;

    let tuples = if !no_nulls {
        hash_join_tuples_inner(
            splitted_to_opt_vec(&parts_a),
            splitted_to_opt_vec(&parts_b),
            swap,
        )
    } else if single_chunk {
        hash_join_tuples_inner(
            splitted_to_slice(&parts_a),
            splitted_to_slice(&parts_b),
            swap,
        )
    } else {
        hash_join_tuples_inner(
            splitted_by_chunks(&parts_a),
            splitted_by_chunks(&parts_b),
            swap,
        )
    };
    (tuples, !swap)
}

/// Build chunk-index mappings for the left and right chunk lists, in parallel via
/// `POOL.join`.
///
/// A side only gets a mapping (`Some`) when it has more than one chunk; single-chunk
/// sides yield `None`.
#[cfg(feature = "chunked_ids")]
fn create_mappings(
    chunks_left: &[ArrayRef],
    chunks_right: &[ArrayRef],
    left_len: usize,
    right_len: usize,
) -> (Option<Vec<ChunkId>>, Option<Vec<ChunkId>>) {
    let mapping_for = |chunks: &[ArrayRef], len: usize| {
        (chunks.len() > 1).then(|| create_chunked_index_mapping(chunks, len))
    };

    POOL.join(
        || mapping_for(chunks_left, left_len),
        || mapping_for(chunks_right, right_len),
    )
}

/// Fallback used without the `chunked_ids` feature: no chunk-index mappings are ever
/// built, so both sides are `None`.
#[cfg(not(feature = "chunked_ids"))]
fn create_mappings(
    _chunks_left: &[ArrayRef],
    _chunks_right: &[ArrayRef],
    _left_len: usize,
    _right_len: usize,
) -> (Option<Vec<ChunkId>>, Option<Vec<ChunkId>>) {
    Default::default()
}

/// Left hash join on integer keys.
///
/// Both sides are split into `POOL.current_num_threads()` partitions (operand order is
/// kept). If neither side has nulls and both are single-chunk, contiguous slices are joined
/// without chunk mappings. Otherwise per-chunk slices (no nulls) or optional values are
/// joined, with the mappings from `create_mappings`.
fn num_group_join_left<T>(left: &ChunkedArray<T>, right: &ChunkedArray<T>) -> LeftJoinIds
where
    T: PolarsIntegerType,
    T::Native: Hash + Eq + Send + AsU64,
    Option<T::Native>: AsU64,
{
    let n_threads = POOL.current_num_threads();
    let parts_a = split_ca(left, n_threads).unwrap();
    let parts_b = split_ca(right, n_threads).unwrap();

    let no_nulls = left.null_count() == 0 && right.null_count() == 0;
    let single_chunk = left.chunks.len() == 1 && right.chunks.len() == 1;
    let mappings = || create_mappings(left.chunks(), right.chunks(), left.len(), right.len());

    if no_nulls && single_chunk {
        return hash_join_tuples_left(
            splitted_to_slice(&parts_a),
            splitted_to_slice(&parts_b),
            None,
            None,
        );
    }

    if no_nulls {
        let keys_a = splitted_by_chunks(&parts_a);
        let keys_b = splitted_by_chunks(&parts_b);
        let (map_left, map_right) = mappings();
        hash_join_tuples_left(keys_a, keys_b, map_left.as_deref(), map_right.as_deref())
    } else {
        let keys_a = splitted_to_opt_vec(&parts_a);
        let keys_b = splitted_to_opt_vec(&parts_b);
        let (map_left, map_right) = mappings();
        hash_join_tuples_left(keys_a, keys_b, map_left.as_deref(), map_right.as_deref())
    }
}

impl<T> ChunkedArray<T>
where
    T: PolarsIntegerType + Sync,
    T::Native: Eq + Hash + num::NumCast,
{
    /// Outer hash join of `self` against `other`, producing pairs of optional row indices.
    ///
    /// The operands are ordered by `det_hash_prone_order!` and split into
    /// `_set_partition_size()` partitions, and the resulting `swap` flag is passed to
    /// `hash_join_tuples_outer`. Non-null iterators are used when neither side has nulls;
    /// optional-value iterators are used otherwise.
    fn hash_join_outer(&self, other: &ChunkedArray<T>) -> Vec<(Option<IdxSize>, Option<IdxSize>)> {
        let (a, b, swap) = det_hash_prone_order!(self, other);

        let n_partitions = _set_partition_size();
        let parts_a = split_ca(a, n_partitions).unwrap();
        let parts_b = split_ca(b, n_partitions).unwrap();

        let has_nulls = a.null_count() != 0 || b.null_count() != 0;
        if has_nulls {
            let iters_a: Vec<_> = parts_a.iter().map(|ca| ca.into_iter()).collect();
            let iters_b: Vec<_> = parts_b.iter().map(|ca| ca.into_iter()).collect();
            hash_join_tuples_outer(iters_a, iters_b, swap)
        } else {
            let iters_a: Vec<_> = parts_a.iter().map(|ca| ca.into_no_null_iter()).collect();
            let iters_b: Vec<_> = parts_b.iter().map(|ca| ca.into_no_null_iter()).collect();
            hash_join_tuples_outer(iters_a, iters_b, swap)
        }
    }
}

/// Hash every (optional) string of every partition with a hasher built from `hb`.
///
/// Partitions are processed in parallel on `POOL`. The output keeps the partition order
/// and the row order within each partition.
pub(crate) fn prepare_strs<'a>(
    been_split: &'a [Utf8Chunked],
    hb: &RandomState,
) -> Vec<Vec<BytesHash<'a>>> {
    let hash_one = |opt_s: Option<&'a str>| {
        let mut state = hb.build_hasher();
        opt_s.hash(&mut state);
        BytesHash::new_from_str(opt_s, state.finish())
    };

    POOL.install(|| {
        been_split
            .par_iter()
            .map(|ca| ca.into_iter().map(&hash_one).collect::<Vec<_>>())
            .collect()
    })
}

impl Utf8Chunked {
    /// Split `self` and `other` into `POOL.current_num_threads()` partitions each, and
    /// create a fresh `RandomState` to hash them with.
    ///
    /// With `swapped == true`, the operands may be reordered by `det_hash_prone_order!`,
    /// and the returned `bool` reports whether they were. With `false`, `self` stays first
    /// and the returned `bool` is `false`.
    fn prepare(
        &self,
        other: &Utf8Chunked,
        swapped: bool,
    ) -> (Vec<Self>, Vec<Self>, bool, RandomState) {
        let n_threads = POOL.current_num_threads();

        let (first, second, swap) = match swapped {
            true => det_hash_prone_order!(self, other),
            false => (self, other, false),
        };

        let hasher_state = RandomState::default();
        let parts_first = split_ca(first, n_threads).unwrap();
        let parts_second = split_ca(second, n_threads).unwrap();

        (parts_first, parts_second, swap, hasher_state)
    }

    /// Inner hash join; returns the join tuples and whether or not the lhs tuples are
    /// sorted (`!swap`).
    fn hash_join_inner(&self, other: &Utf8Chunked) -> ((Vec<IdxSize>, Vec<IdxSize>), bool) {
        let (parts_a, parts_b, swap, hasher_state) = self.prepare(other, true);
        let hashed_a = prepare_strs(&parts_a, &hasher_state);
        let hashed_b = prepare_strs(&parts_b, &hasher_state);
        let tuples = hash_join_tuples_inner(hashed_a, hashed_b, swap);
        (tuples, !swap)
    }

    /// Left hash join. The operand order is kept, and chunk-index mappings come from
    /// `create_mappings`.
    fn hash_join_left(&self, other: &Utf8Chunked) -> LeftJoinIds {
        let (parts_a, parts_b, _, hasher_state) = self.prepare(other, false);
        let hashed_a = prepare_strs(&parts_a, &hasher_state);
        let hashed_b = prepare_strs(&parts_b, &hasher_state);

        let (map_left, map_right) =
            create_mappings(self.chunks(), other.chunks(), self.len(), other.len());
        hash_join_tuples_left(
            hashed_a,
            hashed_b,
            map_left.as_deref(),
            map_right.as_deref(),
        )
    }

    /// Left anti join (`anti == true`) or left semi join (`anti == false`); returns the
    /// selected left row indices.
    #[cfg(feature = "semi_anti_join")]
    fn hash_join_semi_anti(&self, other: &Utf8Chunked, anti: bool) -> Vec<IdxSize> {
        let (parts_a, parts_b, _, hasher_state) = self.prepare(other, false);
        let hashed_a = prepare_strs(&parts_a, &hasher_state);
        let hashed_b = prepare_strs(&parts_b, &hasher_state);
        match anti {
            true => hash_join_tuples_left_anti(hashed_a, hashed_b),
            false => hash_join_tuples_left_semi(hashed_a, hashed_b),
        }
    }

    /// Outer hash join over `_set_partition_size()` partitions. Non-null iterators are
    /// used when neither side has a validity buffer.
    fn hash_join_outer(&self, other: &Utf8Chunked) -> Vec<(Option<IdxSize>, Option<IdxSize>)> {
        let (a, b, swap) = det_hash_prone_order!(self, other);

        let n_partitions = _set_partition_size();
        let parts_a = split_ca(a, n_partitions).unwrap();
        let parts_b = split_ca(b, n_partitions).unwrap();

        if a.has_validity() || b.has_validity() {
            let iters_a: Vec<_> = parts_a.iter().map(|ca| ca.into_iter()).collect();
            let iters_b: Vec<_> = parts_b.iter().map(|ca| ca.into_iter()).collect();
            hash_join_tuples_outer(iters_a, iters_b, swap)
        } else {
            let iters_a: Vec<_> = parts_a.iter().map(|ca| ca.into_no_null_iter()).collect();
            let iters_b: Vec<_> = parts_b.iter().map(|ca| ca.into_no_null_iter()).collect();
            hash_join_tuples_outer(iters_a, iters_b, swap)
        }
    }
}

/// Hash every (optional) byte string of every partition with a hasher built from `hb`.
///
/// Partitions are processed in parallel on `POOL`. The output keeps the partition order
/// and the row order within each partition.
#[cfg(feature = "dtype-binary")]
pub(crate) fn prepare_bytes<'a>(
    been_split: &'a [BinaryChunked],
    hb: &RandomState,
) -> Vec<Vec<BytesHash<'a>>> {
    let hash_one = |opt_b: Option<&'a [u8]>| {
        let mut state = hb.build_hasher();
        opt_b.hash(&mut state);
        BytesHash::new(opt_b, state.finish())
    };

    POOL.install(|| {
        been_split
            .par_iter()
            .map(|ca| ca.into_iter().map(&hash_one).collect::<Vec<_>>())
            .collect()
    })
}

#[cfg(feature = "dtype-binary")]
impl BinaryChunked {
    /// Split `self` and `other` into `POOL.current_num_threads()` partitions each, and
    /// create a fresh `RandomState` to hash them with.
    ///
    /// With `swapped == true`, the operands may be reordered by `det_hash_prone_order!`,
    /// and the returned `bool` reports whether they were. With `false`, `self` stays first
    /// and the returned `bool` is `false`.
    fn prepare(
        &self,
        other: &BinaryChunked,
        swapped: bool,
    ) -> (Vec<Self>, Vec<Self>, bool, RandomState) {
        let n_threads = POOL.current_num_threads();

        let (first, second, swap) = match swapped {
            true => det_hash_prone_order!(self, other),
            false => (self, other, false),
        };

        let hasher_state = RandomState::default();
        let parts_first = split_ca(first, n_threads).unwrap();
        let parts_second = split_ca(second, n_threads).unwrap();

        (parts_first, parts_second, swap, hasher_state)
    }

    /// Inner hash join; returns the join tuples and whether or not the lhs tuples are
    /// sorted (`!swap`).
    fn hash_join_inner(&self, other: &BinaryChunked) -> ((Vec<IdxSize>, Vec<IdxSize>), bool) {
        let (parts_a, parts_b, swap, hasher_state) = self.prepare(other, true);
        let byte_hashes_a = prepare_bytes(&parts_a, &hasher_state);
        let byte_hashes_b = prepare_bytes(&parts_b, &hasher_state);
        let tuples = hash_join_tuples_inner(byte_hashes_a, byte_hashes_b, swap);
        (tuples, !swap)
    }

    /// Left hash join. The operand order is kept, and chunk-index mappings come from
    /// `create_mappings`.
    fn hash_join_left(&self, other: &BinaryChunked) -> LeftJoinIds {
        let (parts_a, parts_b, _, hasher_state) = self.prepare(other, false);
        let byte_hashes_a = prepare_bytes(&parts_a, &hasher_state);
        let byte_hashes_b = prepare_bytes(&parts_b, &hasher_state);

        let (map_left, map_right) =
            create_mappings(self.chunks(), other.chunks(), self.len(), other.len());
        hash_join_tuples_left(
            byte_hashes_a,
            byte_hashes_b,
            map_left.as_deref(),
            map_right.as_deref(),
        )
    }

    /// Left anti join (`anti == true`) or left semi join (`anti == false`); returns the
    /// selected left row indices.
    #[cfg(feature = "semi_anti_join")]
    fn hash_join_semi_anti(&self, other: &BinaryChunked, anti: bool) -> Vec<IdxSize> {
        let (parts_a, parts_b, _, hasher_state) = self.prepare(other, false);
        let byte_hashes_a = prepare_bytes(&parts_a, &hasher_state);
        let byte_hashes_b = prepare_bytes(&parts_b, &hasher_state);
        match anti {
            true => hash_join_tuples_left_anti(byte_hashes_a, byte_hashes_b),
            false => hash_join_tuples_left_semi(byte_hashes_a, byte_hashes_b),
        }
    }

    /// Outer hash join over `_set_partition_size()` partitions. Non-null iterators are
    /// used when neither side has a validity buffer.
    fn hash_join_outer(&self, other: &BinaryChunked) -> Vec<(Option<IdxSize>, Option<IdxSize>)> {
        let (a, b, swap) = det_hash_prone_order!(self, other);

        let n_partitions = _set_partition_size();
        let parts_a = split_ca(a, n_partitions).unwrap();
        let parts_b = split_ca(b, n_partitions).unwrap();

        if a.has_validity() || b.has_validity() {
            let iters_a: Vec<_> = parts_a.iter().map(|ca| ca.into_iter()).collect();
            let iters_b: Vec<_> = parts_b.iter().map(|ca| ca.into_iter()).collect();
            hash_join_tuples_outer(iters_a, iters_b, swap)
        } else {
            let iters_a: Vec<_> = parts_a.iter().map(|ca| ca.into_no_null_iter()).collect();
            let iters_b: Vec<_> = parts_b.iter().map(|ca| ca.into_no_null_iter()).collect();
            hash_join_tuples_outer(iters_a, iters_b, swap)
        }
    }
}

/// Left anti join (`anti == true`) or left semi join (`anti == false`) on integer keys.
///
/// Both sides are split into `POOL.current_num_threads()` partitions (order kept). Keys
/// are passed as contiguous slices (no nulls, single chunk), per-chunk slices (no nulls),
/// or optional values otherwise.
#[cfg(feature = "semi_anti_join")]
fn num_group_join_anti_semi<T>(
    left: &ChunkedArray<T>,
    right: &ChunkedArray<T>,
    anti: bool,
) -> Vec<IdxSize>
where
    T: PolarsIntegerType,
    T::Native: Hash + Eq + Send + AsU64,
    Option<T::Native>: AsU64,
{
    let n_threads = POOL.current_num_threads();
    let parts_a = split_ca(left, n_threads).unwrap();
    let parts_b = split_ca(right, n_threads).unwrap();

    let no_nulls = left.null_count() == 0 && right.null_count() == 0;
    let single_chunk = left.chunks.len() == 1 && right.chunks.len() == 1;

    if no_nulls && single_chunk {
        let (keys_a, keys_b) = (splitted_to_slice(&parts_a), splitted_to_slice(&parts_b));
        match anti {
            true => hash_join_tuples_left_anti(keys_a, keys_b),
            false => hash_join_tuples_left_semi(keys_a, keys_b),
        }
    } else if no_nulls {
        let (keys_a, keys_b) = (splitted_by_chunks(&parts_a), splitted_by_chunks(&parts_b));
        match anti {
            true => hash_join_tuples_left_anti(keys_a, keys_b),
            false => hash_join_tuples_left_semi(keys_a, keys_b),
        }
    } else {
        let (keys_a, keys_b) = (splitted_to_opt_vec(&parts_a), splitted_to_opt_vec(&parts_b));
        match anti {
            true => hash_join_tuples_left_anti(keys_a, keys_b),
            false => hash_join_tuples_left_semi(keys_a, keys_b),
        }
    }
}
src/frame/asof_join/groups.rs (line 248)
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
/// As-of join grouped by numeric `by` keys.
///
/// A probe table is built from `by_right` with `create_probe_table`. Each row of `by_left`
/// is then looked up in it. On a match, `process_group` runs the as-of search selected
/// by `strategy` (and optional `tolerance`) over `left_asof` / `right_asof`. Rows whose key
/// is absent from `by_right` produce `None`. Left partitions are processed in parallel and
/// their results are concatenated in partition order.
///
/// # Errors
///
/// Returns a `ComputeError` if the as-of columns or the `by` keys contain null values, or
/// if `tolerance` cannot be extracted as `T::Native`.
fn asof_join_by_numeric<T, S>(
    by_left: &ChunkedArray<S>,
    by_right: &ChunkedArray<S>,
    left_asof: &ChunkedArray<T>,
    right_asof: &ChunkedArray<T>,
    tolerance: Option<AnyValue<'static>>,
    strategy: AsofStrategy,
) -> PolarsResult<Vec<Option<IdxSize>>>
where
    T: PolarsNumericType,
    S: PolarsNumericType,
    S::Native: Hash + Eq + AsU64,
{
    // Report an unrepresentable tolerance as an error instead of panicking.
    let tolerance_err = || {
        PolarsError::ComputeError(
            "Could not extract the asof join tolerance as the data type of the asof key.".into(),
        )
    };

    #[allow(clippy::type_complexity)]
    let (join_asof_fn, tolerance, forward): (
        unsafe fn(T::Native, &[T::Native], &[IdxSize], T::Native) -> (Option<IdxSize>, usize),
        _,
        _,
    ) = match (tolerance, strategy) {
        (Some(tolerance), AsofStrategy::Backward) => {
            let tol = tolerance
                .extract::<T::Native>()
                .ok_or_else(tolerance_err)?;
            (
                join_asof_backward_with_indirection_and_tolerance,
                tol,
                false,
            )
        }
        (None, AsofStrategy::Backward) => (
            join_asof_backward_with_indirection,
            T::Native::zero(),
            false,
        ),
        (Some(tolerance), AsofStrategy::Forward) => {
            let tol = tolerance
                .extract::<T::Native>()
                .ok_or_else(tolerance_err)?;
            (join_asof_forward_with_indirection_and_tolerance, tol, true)
        }
        (None, AsofStrategy::Forward) => {
            (join_asof_forward_with_indirection, T::Native::zero(), true)
        }
    };

    let err = |_: PolarsError| {
        PolarsError::ComputeError("Keys are not allowed to have null values in asof join.".into())
    };

    let left_asof = left_asof.rechunk();
    let left_asof = left_asof.cont_slice().map_err(err)?;

    let right_asof = right_asof.rechunk();
    let right_asof = right_asof.cont_slice().map_err(err)?;

    // Rechunk the `by` keys the same way as the as-of columns above, and report a failed
    // `cont_slice` (e.g. null keys) as an error instead of panicking.
    let by_left = by_left.rechunk();
    let by_right = by_right.rechunk();

    let n_threads = POOL.current_num_threads();
    let splitted_left = split_ca(&by_left, n_threads)?;
    let splitted_right = split_ca(&by_right, n_threads)?;

    let vals_left = splitted_left
        .iter()
        .map(|ca| ca.cont_slice().map_err(err))
        .collect::<PolarsResult<Vec<_>>>()?;
    let vals_right = splitted_right
        .iter()
        .map(|ca| ca.cont_slice().map_err(err))
        .collect::<PolarsResult<Vec<_>>>()?;

    let hash_tbls = create_probe_table(vals_right);

    // we determine the offset so that we later know which index to store in the join tuples
    let offsets = vals_left
        .iter()
        .map(|ph| ph.len())
        .scan(0, |state, val| {
            let out = *state;
            *state += val;
            Some(out)
        })
        .collect::<Vec<_>>();

    let n_tables = hash_tbls.len() as u64;
    debug_assert!(n_tables.is_power_of_two());

    // next we probe the right relation with the left keys
    Ok(POOL.install(|| {
        vals_left
            .into_par_iter()
            .zip(offsets)
            // vals_left: the `by` keys of one left partition
            // offset: global row index of that partition's first key
            .map(|(vals_left, offset)| {
                // local reference
                let hash_tbls = &hash_tbls;

                // one result is pushed per processed left key in the `None` branch;
                // `process_group` handles the matching branch.
                let mut results = Vec::with_capacity(vals_left.len());

                let mut right_tbl_offsets = PlHashMap::with_capacity(HASHMAP_INIT_SIZE);

                vals_left.iter().enumerate().for_each(|(idx_a, k)| {
                    let idx_a = (idx_a + offset) as IdxSize;
                    // probe table that contains the hashed value
                    let current_probe_table = unsafe {
                        get_hash_tbl_threaded_join_partitioned(k.as_u64(), hash_tbls, n_tables)
                    };

                    // look up the right-hand rows that share this key
                    let value = current_probe_table.get(k);

                    match value {
                        // left and right matches
                        Some(indexes_b) => {
                            process_group(
                                *k,
                                idx_a,
                                tolerance,
                                indexes_b,
                                &mut right_tbl_offsets,
                                join_asof_fn,
                                left_asof,
                                right_asof,
                                &mut results,
                                forward,
                            );
                        }
                        // only left values, right = null
                        None => results.push(None),
                    }
                });
                results
            })
            .flatten()
            .collect()
    }))
}

/// As-of join in which rows are first matched on a utf8 "by" key.
///
/// How it works:
/// - Both key columns are split into `POOL.current_num_threads()` pieces.
/// - The pieces are hashed with a single shared `RandomState`.
/// - The right-hand hashes are turned into partitioned probe tables via `create_probe_table`.
/// - Every left key is then probed in parallel.
///   - A miss yields `None`.
///   - A hit hands the candidate right-hand indices to `process_group`. That function applies
///     the as-of kernel selected by `strategy` (and `tolerance`, when given) to the
///     `left_asof` / `right_asof` values.
///
/// Per-piece results are concatenated in left-piece order. NOTE(review): presumably this
/// yields exactly one entry per row of `by_left`, which assumes `process_group` pushes one
/// value per call — confirm.
///
/// # Panics
///
/// - if `tolerance` is `Some` but cannot be extracted as `T::Native`;
/// - if `cont_slice` fails on the rechunked `left_asof` / `right_asof` (NOTE(review):
///   presumably when they contain nulls — confirm);
/// - if `split_ca` returns an error for either key column.
fn asof_join_by_utf8<T>(
    by_left: &Utf8Chunked,
    by_right: &Utf8Chunked,
    left_asof: &ChunkedArray<T>,
    right_asof: &ChunkedArray<T>,
    tolerance: Option<AnyValue<'static>>,
    strategy: AsofStrategy,
) -> Vec<Option<IdxSize>>
where
    T: PolarsNumericType,
{
    // The kernels take the tolerance as a native value, so convert it up front.
    let tol = tolerance.map(|av| av.extract::<T::Native>().unwrap());

    // The explicit fn-pointer annotation lets the four distinct kernel fn items coerce to one
    // common type. Clippy considers that signature complex, but it cannot be avoided here.
    // When there is no tolerance, `T::Native::zero()` only fills the argument slot that is
    // passed on to `process_group`.
    #[allow(clippy::type_complexity)]
    let (join_asof_fn, tolerance, forward): (
        unsafe fn(T::Native, &[T::Native], &[IdxSize], T::Native) -> (Option<IdxSize>, usize),
        _,
        _,
    ) = match strategy {
        AsofStrategy::Backward => match tol {
            Some(t) => (join_asof_backward_with_indirection_and_tolerance, t, false),
            None => (join_asof_backward_with_indirection, T::Native::zero(), false),
        },
        AsofStrategy::Forward => match tol {
            Some(t) => (join_asof_forward_with_indirection_and_tolerance, t, true),
            None => (join_asof_forward_with_indirection, T::Native::zero(), true),
        },
    };

    // The kernels read contiguous slices, so each asof column is flattened to one chunk first.
    // The owned rechunked arrays are kept in locals so the borrowed slices stay valid.
    let left_ca = left_asof.rechunk();
    let left_vals = left_ca.cont_slice().unwrap();
    let right_ca = right_asof.rechunk();
    let right_vals = right_ca.cont_slice().unwrap();

    // Split the work into one piece per pool thread. Both sides share a single hasher so that
    // equal keys produce equal hashes.
    let n_threads = POOL.current_num_threads();
    let left_parts = split_ca(by_left, n_threads).unwrap();
    let right_parts = split_ca(by_right, n_threads).unwrap();

    let hasher = RandomState::default();
    let vals_left = prepare_strs(&left_parts, &hasher);
    let vals_right = prepare_strs(&right_parts, &hasher);

    let hash_tbls = create_probe_table(vals_right);

    // Starting row of every left piece (an exclusive prefix sum of the piece lengths). It is
    // used to turn a position inside a piece back into a row index of `by_left`.
    let mut offsets = Vec::with_capacity(vals_left.len());
    let mut running = 0usize;
    for part in vals_left.iter() {
        offsets.push(running);
        running += part.len();
    }

    let n_tables = hash_tbls.len() as u64;
    // NOTE(review): presumably the partition lookup depends on a power-of-two table count —
    // confirm against `get_hash_tbl_threaded_join_partitioned`.
    debug_assert!(n_tables.is_power_of_two());

    // Probe the right-hand tables with every left key, one left piece per rayon task.
    POOL.install(|| {
        vals_left
            .into_par_iter()
            .zip(offsets)
            .map(|(keys, start)| {
                // Pre-size the buffer for one result per key in this piece.
                let mut out = Vec::with_capacity(keys.len());
                // Scratch map passed to `process_group`; one per task.
                let mut right_tbl_offsets = PlHashMap::with_capacity(HASHMAP_INIT_SIZE);

                for (local_idx, key) in keys.iter().enumerate() {
                    let idx_a = (local_idx + start) as IdxSize;
                    // SAFETY: NOTE(review) — `n_tables` equals `hash_tbls.len()`. Presumably
                    // this is what keeps the partition picked from `key.as_u64()` in bounds;
                    // confirm against `get_hash_tbl_threaded_join_partitioned`.
                    let probe_tbl = unsafe {
                        get_hash_tbl_threaded_join_partitioned(key.as_u64(), &hash_tbls, n_tables)
                    };

                    // The keys were already hashed by `prepare_strs`, so no rehash happens here.
                    if let Some(indexes_b) = probe_tbl.get(key) {
                        process_group(
                            *key,
                            idx_a,
                            tolerance,
                            indexes_b,
                            &mut right_tbl_offsets,
                            join_asof_fn,
                            left_vals,
                            right_vals,
                            &mut out,
                            forward,
                        );
                    } else {
                        // The key does not exist on the right, so this left row gets no match.
                        out.push(None);
                    }
                }
                out
            })
            .flatten()
            .collect()
    })
}