polars_ops/frame/join/
general.rs

1use polars_utils::format_pl_smallstr;
2
3use super::*;
4use crate::series::coalesce_columns;
5
6pub fn _join_suffix_name(name: &str, suffix: &str) -> PlSmallStr {
7    format_pl_smallstr!("{name}{suffix}")
8}
9
10fn get_suffix(suffix: Option<PlSmallStr>) -> PlSmallStr {
11    suffix.unwrap_or_else(|| PlSmallStr::from_static("_right"))
12}
13
14/// Renames the columns on the right to not clash with the left using a specified or otherwise default suffix
15/// and then merges the right dataframe into the left
16#[doc(hidden)]
17pub fn _finish_join(
18    mut df_left: DataFrame,
19    mut df_right: DataFrame,
20    suffix: Option<PlSmallStr>,
21) -> PolarsResult<DataFrame> {
22    let mut left_names = PlHashSet::with_capacity(df_left.width());
23
24    df_left.get_columns().iter().for_each(|series| {
25        left_names.insert(series.name());
26    });
27
28    let mut rename_strs = Vec::with_capacity(df_right.width());
29    let right_names = df_right.schema();
30
31    for name in right_names.iter_names() {
32        if left_names.contains(name) {
33            rename_strs.push(name.clone())
34        }
35    }
36
37    let suffix = get_suffix(suffix);
38
39    for name in rename_strs {
40        let new_name = _join_suffix_name(name.as_str(), suffix.as_str());
41
42        df_right.rename(&name, new_name.clone()).map_err(|_| {
43            polars_err!(Duplicate: "column with name '{}' already exists\n\n\
44            You may want to try:\n\
45            - renaming the column prior to joining\n\
46            - using the `suffix` parameter to specify a suffix different to the default one ('_right')", new_name)
47        })?;
48    }
49
50    drop(left_names);
51    unsafe { df_left.hstack_mut_unchecked(df_right.get_columns()) };
52    Ok(df_left)
53}
54
55pub fn _coalesce_full_join(
56    mut df: DataFrame,
57    keys_left: &[PlSmallStr],
58    keys_right: &[PlSmallStr],
59    suffix: Option<PlSmallStr>,
60    df_left: &DataFrame,
61) -> DataFrame {
62    // No need to allocate the schema because we already
63    // know for certain that the column name for left is `name`
64    // and for right is `name + suffix`
65    let schema_left = if keys_left == keys_right {
66        Arc::new(Schema::default())
67    } else {
68        df_left.schema().clone()
69    };
70
71    let schema = df.schema().clone();
72    let mut to_remove = Vec::with_capacity(keys_right.len());
73
74    // SAFETY: we maintain invariants.
75    let columns = unsafe { df.get_columns_mut() };
76    let suffix = get_suffix(suffix);
77    for (l, r) in keys_left.iter().zip(keys_right.iter()) {
78        let pos_l = schema.get_full(l.as_str()).unwrap().0;
79
80        let r = if l == r || schema_left.contains(r.as_str()) {
81            _join_suffix_name(r.as_str(), suffix.as_str())
82        } else {
83            r.clone()
84        };
85        let pos_r = schema.get_full(&r).unwrap().0;
86
87        let l = columns[pos_l].clone();
88        let r = columns[pos_r].clone();
89
90        columns[pos_l] = coalesce_columns(&[l, r]).unwrap();
91        to_remove.push(pos_r);
92    }
93    // sort in reverse order, so the indexes remain correct if we remove.
94    to_remove.sort_by(|a, b| b.cmp(a));
95    for pos in to_remove {
96        let _ = columns.remove(pos);
97    }
98    df.clear_schema();
99    df
100}
101
/// Enumerates every element of `chunks`, in chunk order, as a [`ChunkId`] holding
/// `(chunk index, index within that chunk)`.
///
/// `len` only pre-sizes the output vector. NOTE(review): it presumably equals the summed chunk
/// lengths; confirm at call sites.
#[cfg(feature = "chunked_ids")]
pub(crate) fn create_chunked_index_mapping(chunks: &[ArrayRef], len: usize) -> Vec<ChunkId> {
    let mut mapping = Vec::with_capacity(len);
    mapping.extend(chunks.iter().enumerate().flat_map(|(chunk_idx, arr)| {
        (0..arr.len()).map(move |elem_idx| ChunkId::store(chunk_idx as IdxSize, elem_idx as IdxSize))
    }));
    mapping
}