Skip to main content

merge_dataframes

Function merge_dataframes 

Source
pub fn merge_dataframes(
    left: &DataFrame,
    right: &DataFrame,
    on: &str,
    join_type: JoinType,
) -> Result<MergedDataFrame, JoinError>
Expand description

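Merge two DataFrames on the single key column named by `on`, using the join semantics selected by `join_type`. Returns the merged result, or a `JoinError` if the join cannot be performed.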

Examples found in repository?
examples/dense_join_bench.rs (line 88)
84fn golden(n: usize) {
85    let span = span_for(n);
86    let left = build_frame("lv", n, span, 7);
87    let right = build_frame("rv", n, span, 13);
88    let merged = merge_dataframes(&left, &right, "id", JoinType::Inner).expect("join");
89    let out = DataFrame::new_with_column_order(merged.index, merged.columns, merged.column_order)
90        .expect("rebuild merged frame");
91
92    // Order-sensitive digest of the merged output: fold every cell of every
93    // column (in column-order, row-order) into a rolling FNV-1a hash so any
94    // change in row order, row count, or values flips the digest.
95    let mut h: u64 = 0xcbf2_9ce4_8422_2325;
96    let mut mix = |x: u64| {
97        h ^= x;
98        h = h.wrapping_mul(0x0000_0100_0000_01b3);
99    };
100    mix(out.len() as u64);
101    for name in out.column_names() {
102        for s in name.bytes() {
103            mix(u64::from(s));
104        }
105        let col = out.column(name).expect("col");
106        for v in col.values().iter() {
107            match v {
108                Scalar::Int64(i) => mix(*i as u64),
109                Scalar::Null(_) => mix(0xDEAD_BEEF),
110                other => mix(format!("{other:?}")
111                    .bytes()
112                    .fold(0u64, |a, b| a.wrapping_mul(131).wrapping_add(u64::from(b)))),
113            }
114        }
115    }
116    println!("rows={} digest={:016x}", out.len(), h);
117}
118
/// Key cardinality for the fan-out two-key benchmark (`mk` / `golden_mk`).
///
/// Returns `floor(sqrt(n / 4))`, never less than 2, so `card * card ≈ n / 4`
/// distinct `(id1, id2)` combinations exist. Assuming `build_mk_frame` spreads
/// keys evenly over that range (not verified here), each combination appears about
/// four times per frame. The inner join of the two frames then fans out to roughly
/// `4 * n` rows.
fn mk_card(n: usize) -> i64 {
    let combos_target = n as f64 / 4.0;
    let card = combos_target.sqrt() as i64;
    card.max(2)
}

/// Key cardinality for the near-1:1 two-key benchmark (`mku` / `golden_mku`).
///
/// Returns `floor(sqrt(n)) + 1`, never less than 2, so `card * card > n` distinct
/// `(id1, id2)` combinations exist. Assuming `build_mk_frame` draws keys across that
/// range, the combinations are nearly all distinct and the inner join yields about
/// `n` rows. With a single payload column, the join's build phase (composite-key
/// hashing vs. packed dense keys) then dominates. That is what this mode aims to
/// isolate (`dense_packed_int64_inner_positions`).
fn mku_card(n: usize) -> i64 {
    let root = (n as f64).sqrt() as i64;
    i64::max(root + 1, 2)
}

131fn run_mk(n: usize, card: i64, n_val: usize, golden_mode: bool, iters: usize) {
132    let left = build_mk_frame("lv", n, card, 7, n_val);
133    let right = build_mk_frame("rv", n, card, 13, n_val);
134    if golden_mode {
135        let merged =
136            merge_dataframes_on(&left, &right, &["id1", "id2"], JoinType::Inner).expect("mk join");
137        let out =
138            DataFrame::new_with_column_order(merged.index, merged.columns, merged.column_order)
139                .expect("rebuild mk frame");
140        let mut h: u64 = 0xcbf2_9ce4_8422_2325;
141        let mut mix = |x: u64| {
142            h ^= x;
143            h = h.wrapping_mul(0x0000_0100_0000_01b3);
144        };
145        mix(out.len() as u64);
146        for name in out.column_names() {
147            for s in name.bytes() {
148                mix(u64::from(s));
149            }
150            let col = out.column(name).expect("col");
151            for v in col.values().iter() {
152                match v {
153                    Scalar::Int64(i) => mix(*i as u64),
154                    Scalar::Null(_) => mix(0xDEAD_BEEF),
155                    other => mix(format!("{other:?}")
156                        .bytes()
157                        .fold(0u64, |a, b| a.wrapping_mul(131).wrapping_add(u64::from(b)))),
158                }
159            }
160        }
161        println!("rows={} digest={:016x}", out.len(), h);
162        return;
163    }
164    let start = Instant::now();
165    let mut sink: usize = 0;
166    for _ in 0..iters {
167        let out =
168            merge_dataframes_on(&left, &right, &["id1", "id2"], JoinType::Inner).expect("mk join");
169        sink = sink.wrapping_add(out.index.len());
170    }
171    let elapsed = start.elapsed();
172    eprintln!(
173        "dense_join_bench: n={n} card={card} n_val={n_val} iters={iters} {:.3}s ({:.3} ms/iter), sink={sink}",
174        elapsed.as_secs_f64(),
175        elapsed.as_secs_f64() * 1000.0 / iters as f64,
176    );
177}
178
179fn golden_mk(n: usize) {
180    let card = mk_card(n);
181    let left = build_mk_frame("lv", n, card, 7, 4);
182    let right = build_mk_frame("rv", n, card, 13, 4);
183    let merged =
184        merge_dataframes_on(&left, &right, &["id1", "id2"], JoinType::Inner).expect("mk join");
185    let out = DataFrame::new_with_column_order(merged.index, merged.columns, merged.column_order)
186        .expect("rebuild mk frame");
187    let mut h: u64 = 0xcbf2_9ce4_8422_2325;
188    let mut mix = |x: u64| {
189        h ^= x;
190        h = h.wrapping_mul(0x0000_0100_0000_01b3);
191    };
192    mix(out.len() as u64);
193    for name in out.column_names() {
194        for s in name.bytes() {
195            mix(u64::from(s));
196        }
197        let col = out.column(name).expect("col");
198        for v in col.values().iter() {
199            match v {
200                Scalar::Int64(i) => mix(*i as u64),
201                Scalar::Null(_) => mix(0xDEAD_BEEF),
202                other => mix(format!("{other:?}")
203                    .bytes()
204                    .fold(0u64, |a, b| a.wrapping_mul(131).wrapping_add(u64::from(b)))),
205            }
206        }
207    }
208    println!("rows={} digest={:016x}", out.len(), h);
209}
210
211fn main() {
212    let args: Vec<String> = std::env::args().collect();
213    if args.get(1).map(String::as_str) == Some("golden") {
214        let n: usize = args.get(2).and_then(|s| s.parse().ok()).unwrap_or(5_000);
215        golden(n);
216        return;
217    }
218    if args.get(1).map(String::as_str) == Some("golden_mk") {
219        let n: usize = args.get(2).and_then(|s| s.parse().ok()).unwrap_or(5_000);
220        golden_mk(n);
221        return;
222    }
223    if args.get(1).map(String::as_str) == Some("golden_mku") {
224        let n: usize = args.get(2).and_then(|s| s.parse().ok()).unwrap_or(5_000);
225        run_mk(n, mku_card(n), 1, true, 0);
226        return;
227    }
228    if args.get(1).map(String::as_str) == Some("mku") {
229        let n: usize = args.get(2).and_then(|s| s.parse().ok()).unwrap_or(200_000);
230        let iters: usize = args.get(3).and_then(|s| s.parse().ok()).unwrap_or(200);
231        run_mk(n, mku_card(n), 1, false, iters);
232        return;
233    }
234    if args.get(1).map(String::as_str) == Some("mk") {
235        let n: usize = args.get(2).and_then(|s| s.parse().ok()).unwrap_or(50_000);
236        let iters: usize = args.get(3).and_then(|s| s.parse().ok()).unwrap_or(200);
237        let card = mk_card(n);
238        let left = build_mk_frame("lv", n, card, 7, 4);
239        let right = build_mk_frame("rv", n, card, 13, 4);
240        let start = Instant::now();
241        let mut sink: usize = 0;
242        for _ in 0..iters {
243            let out = merge_dataframes_on(&left, &right, &["id1", "id2"], JoinType::Inner)
244                .expect("mk join");
245            sink = sink.wrapping_add(out.index.len());
246        }
247        let elapsed = start.elapsed();
248        eprintln!(
249            "dense_join_bench mk: n={n} card={card} iters={iters} {:.3}s ({:.3} ms/iter), sink={sink}",
250            elapsed.as_secs_f64(),
251            elapsed.as_secs_f64() * 1000.0 / iters as f64,
252        );
253        return;
254    }
255
256    let n: usize = args.get(1).and_then(|s| s.parse().ok()).unwrap_or(200_000);
257    let iters: usize = args.get(2).and_then(|s| s.parse().ok()).unwrap_or(200);
258    let span = span_for(n);
259    let left = build_frame("lv", n, span, 7);
260    let right = build_frame("rv", n, span, 13);
261
262    let start = Instant::now();
263    let mut sink: usize = 0;
264    for _ in 0..iters {
265        let out = merge_dataframes(&left, &right, "id", JoinType::Inner).expect("join");
266        sink = sink.wrapping_add(out.index.len());
267    }
268    let elapsed = start.elapsed();
269    eprintln!(
270        "dense_join_bench: n={n} iters={iters} {:.3}s ({:.3} ms/iter), sink={sink}",
271        elapsed.as_secs_f64(),
272        elapsed.as_secs_f64() * 1000.0 / iters as f64,
273    );
274}