pub fn merge_dataframes(
left: &DataFrame,
right: &DataFrame,
on: &str,
join_type: JoinType,
) -> Result<MergedDataFrame, JoinError>Expand description
Merge two DataFrames on a single key column.
Examples found in repository?
examples/dense_join_bench.rs (line 88)
84fn golden(n: usize) {
85 let span = span_for(n);
86 let left = build_frame("lv", n, span, 7);
87 let right = build_frame("rv", n, span, 13);
88 let merged = merge_dataframes(&left, &right, "id", JoinType::Inner).expect("join");
89 let out = DataFrame::new_with_column_order(merged.index, merged.columns, merged.column_order)
90 .expect("rebuild merged frame");
91
92 // Order-sensitive digest of the merged output: fold every cell of every
93 // column (in column-order, row-order) into a rolling FNV-1a hash so any
94 // change in row order, row count, or values flips the digest.
95 let mut h: u64 = 0xcbf2_9ce4_8422_2325;
96 let mut mix = |x: u64| {
97 h ^= x;
98 h = h.wrapping_mul(0x0000_0100_0000_01b3);
99 };
100 mix(out.len() as u64);
101 for name in out.column_names() {
102 for s in name.bytes() {
103 mix(u64::from(s));
104 }
105 let col = out.column(name).expect("col");
106 for v in col.values().iter() {
107 match v {
108 Scalar::Int64(i) => mix(*i as u64),
109 Scalar::Null(_) => mix(0xDEAD_BEEF),
110 other => mix(format!("{other:?}")
111 .bytes()
112 .fold(0u64, |a, b| a.wrapping_mul(131).wrapping_add(u64::from(b)))),
113 }
114 }
115 }
116 println!("rows={} digest={:016x}", out.len(), h);
117}
118
119fn mk_card(n: usize) -> i64 {
120 // card^2 ~= n/4 so the two-key inner self-join fans out to ~4n rows.
121 ((n as f64 / 4.0).sqrt() as i64).max(2)
122}
123
124fn mku_card(n: usize) -> i64 {
125 // card^2 ~= n so the two-key combos are ~all distinct (near-1:1 inner join,
126 // output ~= n). With a single payload column the build (composite key hash
127 // vs packed dense) dominates — isolating dense_packed_int64_inner_positions.
128 ((n as f64).sqrt() as i64 + 1).max(2)
129}
130
131fn run_mk(n: usize, card: i64, n_val: usize, golden_mode: bool, iters: usize) {
132 let left = build_mk_frame("lv", n, card, 7, n_val);
133 let right = build_mk_frame("rv", n, card, 13, n_val);
134 if golden_mode {
135 let merged =
136 merge_dataframes_on(&left, &right, &["id1", "id2"], JoinType::Inner).expect("mk join");
137 let out =
138 DataFrame::new_with_column_order(merged.index, merged.columns, merged.column_order)
139 .expect("rebuild mk frame");
140 let mut h: u64 = 0xcbf2_9ce4_8422_2325;
141 let mut mix = |x: u64| {
142 h ^= x;
143 h = h.wrapping_mul(0x0000_0100_0000_01b3);
144 };
145 mix(out.len() as u64);
146 for name in out.column_names() {
147 for s in name.bytes() {
148 mix(u64::from(s));
149 }
150 let col = out.column(name).expect("col");
151 for v in col.values().iter() {
152 match v {
153 Scalar::Int64(i) => mix(*i as u64),
154 Scalar::Null(_) => mix(0xDEAD_BEEF),
155 other => mix(format!("{other:?}")
156 .bytes()
157 .fold(0u64, |a, b| a.wrapping_mul(131).wrapping_add(u64::from(b)))),
158 }
159 }
160 }
161 println!("rows={} digest={:016x}", out.len(), h);
162 return;
163 }
164 let start = Instant::now();
165 let mut sink: usize = 0;
166 for _ in 0..iters {
167 let out =
168 merge_dataframes_on(&left, &right, &["id1", "id2"], JoinType::Inner).expect("mk join");
169 sink = sink.wrapping_add(out.index.len());
170 }
171 let elapsed = start.elapsed();
172 eprintln!(
173 "dense_join_bench: n={n} card={card} n_val={n_val} iters={iters} {:.3}s ({:.3} ms/iter), sink={sink}",
174 elapsed.as_secs_f64(),
175 elapsed.as_secs_f64() * 1000.0 / iters as f64,
176 );
177}
178
179fn golden_mk(n: usize) {
180 let card = mk_card(n);
181 let left = build_mk_frame("lv", n, card, 7, 4);
182 let right = build_mk_frame("rv", n, card, 13, 4);
183 let merged =
184 merge_dataframes_on(&left, &right, &["id1", "id2"], JoinType::Inner).expect("mk join");
185 let out = DataFrame::new_with_column_order(merged.index, merged.columns, merged.column_order)
186 .expect("rebuild mk frame");
187 let mut h: u64 = 0xcbf2_9ce4_8422_2325;
188 let mut mix = |x: u64| {
189 h ^= x;
190 h = h.wrapping_mul(0x0000_0100_0000_01b3);
191 };
192 mix(out.len() as u64);
193 for name in out.column_names() {
194 for s in name.bytes() {
195 mix(u64::from(s));
196 }
197 let col = out.column(name).expect("col");
198 for v in col.values().iter() {
199 match v {
200 Scalar::Int64(i) => mix(*i as u64),
201 Scalar::Null(_) => mix(0xDEAD_BEEF),
202 other => mix(format!("{other:?}")
203 .bytes()
204 .fold(0u64, |a, b| a.wrapping_mul(131).wrapping_add(u64::from(b)))),
205 }
206 }
207 }
208 println!("rows={} digest={:016x}", out.len(), h);
209}
210
211fn main() {
212 let args: Vec<String> = std::env::args().collect();
213 if args.get(1).map(String::as_str) == Some("golden") {
214 let n: usize = args.get(2).and_then(|s| s.parse().ok()).unwrap_or(5_000);
215 golden(n);
216 return;
217 }
218 if args.get(1).map(String::as_str) == Some("golden_mk") {
219 let n: usize = args.get(2).and_then(|s| s.parse().ok()).unwrap_or(5_000);
220 golden_mk(n);
221 return;
222 }
223 if args.get(1).map(String::as_str) == Some("golden_mku") {
224 let n: usize = args.get(2).and_then(|s| s.parse().ok()).unwrap_or(5_000);
225 run_mk(n, mku_card(n), 1, true, 0);
226 return;
227 }
228 if args.get(1).map(String::as_str) == Some("mku") {
229 let n: usize = args.get(2).and_then(|s| s.parse().ok()).unwrap_or(200_000);
230 let iters: usize = args.get(3).and_then(|s| s.parse().ok()).unwrap_or(200);
231 run_mk(n, mku_card(n), 1, false, iters);
232 return;
233 }
234 if args.get(1).map(String::as_str) == Some("mk") {
235 let n: usize = args.get(2).and_then(|s| s.parse().ok()).unwrap_or(50_000);
236 let iters: usize = args.get(3).and_then(|s| s.parse().ok()).unwrap_or(200);
237 let card = mk_card(n);
238 let left = build_mk_frame("lv", n, card, 7, 4);
239 let right = build_mk_frame("rv", n, card, 13, 4);
240 let start = Instant::now();
241 let mut sink: usize = 0;
242 for _ in 0..iters {
243 let out = merge_dataframes_on(&left, &right, &["id1", "id2"], JoinType::Inner)
244 .expect("mk join");
245 sink = sink.wrapping_add(out.index.len());
246 }
247 let elapsed = start.elapsed();
248 eprintln!(
249 "dense_join_bench mk: n={n} card={card} iters={iters} {:.3}s ({:.3} ms/iter), sink={sink}",
250 elapsed.as_secs_f64(),
251 elapsed.as_secs_f64() * 1000.0 / iters as f64,
252 );
253 return;
254 }
255
256 let n: usize = args.get(1).and_then(|s| s.parse().ok()).unwrap_or(200_000);
257 let iters: usize = args.get(2).and_then(|s| s.parse().ok()).unwrap_or(200);
258 let span = span_for(n);
259 let left = build_frame("lv", n, span, 7);
260 let right = build_frame("rv", n, span, 13);
261
262 let start = Instant::now();
263 let mut sink: usize = 0;
264 for _ in 0..iters {
265 let out = merge_dataframes(&left, &right, "id", JoinType::Inner).expect("join");
266 sink = sink.wrapping_add(out.index.len());
267 }
268 let elapsed = start.elapsed();
269 eprintln!(
270 "dense_join_bench: n={n} iters={iters} {:.3}s ({:.3} ms/iter), sink={sink}",
271 elapsed.as_secs_f64(),
272 elapsed.as_secs_f64() * 1000.0 / iters as f64,
273 );
274}