// liquid_cache_datafusion/utils.rs
use std::sync::Arc;
2
3use arrow::{
4 array::BooleanBufferBuilder,
5 buffer::{BooleanBuffer, MutableBuffer},
6};
7use datafusion::{
8 common::tree_node::{TreeNode, TreeNodeRecursion},
9 datasource::source::DataSourceExec,
10 physical_plan::{ExecutionPlan, metrics::MetricValue},
11};
12use liquid_cache_common::rpc::ExecutionMetricsResponse;
13use parquet::arrow::arrow_reader::RowSelector;
14
15use crate::cache::LiquidCacheParquetRef;
16
17fn boolean_buffer_and_then_fallback(left: &BooleanBuffer, right: &BooleanBuffer) -> BooleanBuffer {
18 debug_assert_eq!(
19 left.count_set_bits(),
20 right.len(),
21 "the right selection must have the same number of set bits as the left selection"
22 );
23
24 if left.len() == right.len() {
25 debug_assert_eq!(left.count_set_bits(), left.len());
26 return right.clone();
27 }
28
29 let mut buffer = MutableBuffer::from_len_zeroed(left.values().len());
30 buffer.copy_from_slice(left.values());
31 let mut builder = BooleanBufferBuilder::new_from_buffer(buffer, left.len());
32
33 let mut other_bits = right.iter();
34
35 for bit_idx in left.set_indices() {
36 let predicate = other_bits
37 .next()
38 .expect("Mismatch in set bits between self and other");
39 if !predicate {
40 builder.set_bit(bit_idx, false);
41 }
42 }
43
44 builder.finish()
45}
46
47pub fn boolean_buffer_and_then(left: &BooleanBuffer, right: &BooleanBuffer) -> BooleanBuffer {
63 debug_assert_eq!(
64 left.count_set_bits(),
65 right.len(),
66 "the right selection must have the same number of set bits as the left selection"
67 );
68
69 if left.len() == right.len() {
70 debug_assert_eq!(left.count_set_bits(), left.len());
71 return right.clone();
72 }
73
74 #[cfg(target_arch = "x86_64")]
76 {
77 if is_x86_feature_detected!("bmi2") {
78 return unsafe { boolean_buffer_and_then_bmi2(left, right) };
79 }
80 }
81
82 boolean_buffer_and_then_fallback(left, right)
83}
84
/// Reads up to 8 bytes at `base_ptr + offset` as a little-endian `u64`,
/// zero-padding any bytes that lie at or beyond `total_bytes`.
///
/// NOTE(review): although this fn is not marked `unsafe`, it dereferences
/// `base_ptr`; callers must guarantee `base_ptr` is valid for reads of
/// `total_bytes` bytes — confirm at all call sites.
#[cfg(target_arch = "x86_64")]
#[inline(always)]
fn load_u64_zero_padded(base_ptr: *const u8, total_bytes: usize, offset: usize) -> u64 {
    // Bytes still readable at `offset` (0 when offset is past the end).
    let remaining = total_bytes.saturating_sub(offset);
    if remaining >= 8 {
        // SAFETY: `remaining >= 8` means a full word lies inside the region
        // the caller vouched for; unaligned read is explicitly allowed.
        unsafe { core::ptr::read_unaligned(base_ptr.add(offset) as *const u64) }
    } else if remaining > 0 {
        // Partial word: copy the valid tail into a zeroed scratch array so the
        // high bytes read as zero.
        let mut tmp = [0u8; 8];
        unsafe {
            core::ptr::copy_nonoverlapping(base_ptr.add(offset), tmp.as_mut_ptr(), remaining);
        }
        u64::from_le_bytes(tmp)
    } else {
        // Entirely past the end: contributes only zero bits.
        0
    }
}
101
102#[cfg(target_arch = "x86_64")]
103#[target_feature(enable = "bmi2")]
104unsafe fn boolean_buffer_and_then_bmi2(
105 left: &BooleanBuffer,
106 right: &BooleanBuffer,
107) -> BooleanBuffer {
108 use core::arch::x86_64::_pdep_u64;
109
110 debug_assert_eq!(left.count_set_bits(), right.len());
111
112 let bit_len = left.len();
113 let byte_len = bit_len.div_ceil(8);
114 let left_ptr = left.values().as_ptr();
115 let right_bytes = right.len().div_ceil(8);
116 let right_ptr = right.values().as_ptr();
117
118 let mut out = MutableBuffer::from_len_zeroed(byte_len);
119 let out_ptr = out.as_mut_ptr();
120
121 let full_words = bit_len / 64;
122 let mut right_bit_idx = 0; for word_idx in 0..full_words {
125 let left_word =
126 unsafe { core::ptr::read_unaligned(left_ptr.add(word_idx * 8) as *const u64) };
127
128 if left_word == 0 {
129 continue;
130 }
131
132 let need = left_word.count_ones();
133
134 let rb_byte = right_bit_idx / 8;
136 let rb_bit = (right_bit_idx & 7) as u32;
137
138 let need_high = rb_bit != 0;
139 let safe16 = right_bytes.saturating_sub(16);
140 let mut r_bits;
141 if rb_byte <= safe16 {
142 let low = unsafe { core::ptr::read_unaligned(right_ptr.add(rb_byte) as *const u64) };
143 if need_high {
144 let high =
145 unsafe { core::ptr::read_unaligned(right_ptr.add(rb_byte + 8) as *const u64) };
146 r_bits = (low >> rb_bit) | (high << (64 - rb_bit));
147 } else {
148 r_bits = low >> rb_bit;
149 }
150 } else {
151 let low = load_u64_zero_padded(right_ptr, right_bytes, rb_byte);
152 r_bits = low >> rb_bit;
153 if need_high {
154 let high = load_u64_zero_padded(right_ptr, right_bytes, rb_byte + 8);
155 r_bits |= high << (64 - rb_bit);
156 }
157 }
158
159 r_bits &= 1u64.unbounded_shl(need).wrapping_sub(1);
161
162 let result = _pdep_u64(r_bits, left_word);
165
166 unsafe {
167 core::ptr::write_unaligned(out_ptr.add(word_idx * 8) as *mut u64, result);
168 }
169
170 right_bit_idx += need as usize;
171 }
172
173 let tail_bits = bit_len & 63;
175 if tail_bits != 0 {
176 let tail_bytes = tail_bits.div_ceil(8);
178 let base = unsafe { left_ptr.add(full_words * 8) };
179 let mut mask: u64;
180 if tail_bytes == 8 {
181 mask = unsafe { core::ptr::read_unaligned(base as *const u64) };
182 } else {
183 let mut buf = [0u8; 8];
184 unsafe {
185 core::ptr::copy_nonoverlapping(base, buf.as_mut_ptr(), tail_bytes);
186 }
187 mask = u64::from_le_bytes(buf);
188 }
189 mask &= (1u64 << tail_bits) - 1;
191
192 if mask != 0 {
193 let need = mask.count_ones();
194
195 let rb_byte = right_bit_idx / 8;
196 let rb_bit = (right_bit_idx & 7) as u32;
197
198 let need_high = rb_bit != 0;
199 let safe16 = right_bytes.saturating_sub(16);
200 let mut r_bits;
201 if rb_byte <= safe16 {
202 let low =
203 unsafe { core::ptr::read_unaligned(right_ptr.add(rb_byte) as *const u64) };
204 if need_high {
205 let high = unsafe {
206 core::ptr::read_unaligned(right_ptr.add(rb_byte + 8) as *const u64)
207 };
208 r_bits = (low >> rb_bit) | (high << (64 - rb_bit));
209 } else {
210 r_bits = low >> rb_bit;
211 }
212 } else {
213 let low = load_u64_zero_padded(right_ptr, right_bytes, rb_byte);
214 r_bits = low >> rb_bit;
215 if need_high {
216 let high = load_u64_zero_padded(right_ptr, right_bytes, rb_byte + 8);
217 r_bits |= high << (64 - rb_bit);
218 }
219 }
220
221 r_bits &= 1u64.unbounded_shl(need).wrapping_sub(1);
222
223 let result = _pdep_u64(r_bits, mask);
224
225 let tail_bytes = tail_bits.div_ceil(8);
226 let result_bytes = result.to_le_bytes();
227 let dst_off = full_words * 8;
228 unsafe {
229 let dst = core::slice::from_raw_parts_mut(out_ptr, byte_len);
230 dst[dst_off..dst_off + tail_bytes].copy_from_slice(&result_bytes[..tail_bytes]);
231 }
232 }
233 }
234
235 BooleanBuffer::new(out.into(), 0, bit_len)
236}
237
238pub(super) fn row_selector_to_boolean_buffer(selection: &[RowSelector]) -> BooleanBuffer {
239 let mut buffer = BooleanBufferBuilder::new(8192);
240 for selector in selection.iter() {
241 if selector.skip {
242 buffer.append_n(selector.row_count, false);
243 } else {
244 buffer.append_n(selector.row_count, true);
245 }
246 }
247 buffer.finish()
248}
249
250pub fn extract_execution_metrics(
268 plan: &Arc<dyn ExecutionPlan>,
269 liquid_cache: Option<&LiquidCacheParquetRef>,
270) -> ExecutionMetricsResponse {
271 let mut time_elapsed_processing_millis = 0;
273 let mut bytes_scanned = 0;
274
275 let _ = plan.apply(|node| {
276 let any_plan = node.as_any();
277 if let Some(data_source_exec) = any_plan.downcast_ref::<DataSourceExec>()
278 && let Some(metrics) = data_source_exec.metrics()
279 {
280 let aggregated_metrics = metrics
281 .aggregate_by_name()
282 .sorted_for_display()
283 .timestamps_removed();
284
285 for metric in aggregated_metrics.iter() {
286 if let MetricValue::Time { name, time } = metric.value()
287 && name == "time_elapsed_processing"
288 {
289 time_elapsed_processing_millis += time.value() / 1_000_000;
290 } else if let MetricValue::Count { name, count } = metric.value()
291 && name == "bytes_scanned"
292 {
293 bytes_scanned += count.value();
294 }
295 }
296 }
297 Ok(TreeNodeRecursion::Continue)
298 });
299
300 let liquid_cache_usage = liquid_cache
301 .map(|cache| cache.compute_memory_usage_bytes())
302 .unwrap_or(0);
303 let cache_memory_usage = liquid_cache_usage + bytes_scanned as u64;
304
305 ExecutionMetricsResponse {
306 pushdown_eval_time: time_elapsed_processing_millis as u64,
307 cache_memory_usage,
308 liquid_cache_usage,
309 }
310}
311
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    #[cfg(target_arch = "x86_64")]
    fn test_boolean_buffer_and_then_bmi2_large() {
        use super::boolean_buffer_and_then_bmi2;

        // Bug fix: calling a `#[target_feature(enable = "bmi2")]` function on
        // a CPU without BMI2 is undefined behavior, so skip the test instead
        // of invoking it unconditionally.
        if !is_x86_feature_detected!("bmi2") {
            eprintln!("skipping: bmi2 not detected on this CPU");
            return;
        }

        let size = 128;
        let mut left_builder = BooleanBufferBuilder::new(size);
        let mut right_bits = Vec::new();

        // Every third bit of `left` is set; the matching `right` bits
        // alternate true/false.
        for i in 0..size {
            let is_set = i.is_multiple_of(3);
            left_builder.append(is_set);
            if is_set {
                right_bits.push(right_bits.len().is_multiple_of(2));
            }
        }
        let left = left_builder.finish();

        let mut right_builder = BooleanBufferBuilder::new(right_bits.len());
        for bit in right_bits {
            right_builder.append(bit);
        }
        let right = right_builder.finish();

        // SAFETY: bmi2 availability checked above.
        let result_bmi2 = unsafe { boolean_buffer_and_then_bmi2(&left, &right) };
        let result_orig = boolean_buffer_and_then_fallback(&left, &right);

        assert_eq!(result_bmi2.len(), result_orig.len());
        assert_eq!(result_bmi2.len(), size);

        for i in 0..size {
            assert_eq!(
                result_bmi2.value(i),
                result_orig.value(i),
                "Mismatch at position {i}"
            );
        }
    }

    #[test]
    #[cfg(target_arch = "x86_64")]
    fn test_boolean_buffer_and_then_bmi2_edge_cases() {
        use super::boolean_buffer_and_then_bmi2;

        // Bug fix: see above — avoid UB when the CPU lacks BMI2.
        if !is_x86_feature_detected!("bmi2") {
            eprintln!("skipping: bmi2 not detected on this CPU");
            return;
        }

        // Case 1: all-set left with alternating right bits.
        let mut left_builder = BooleanBufferBuilder::new(16);
        for _ in 0..16 {
            left_builder.append(true);
        }
        let left = left_builder.finish();

        let mut right_builder = BooleanBufferBuilder::new(16);
        for i in 0..16 {
            right_builder.append(i % 2 == 0);
        }
        let right = right_builder.finish();

        // SAFETY: bmi2 availability checked above.
        let result_bmi2 = unsafe { boolean_buffer_and_then_bmi2(&left, &right) };
        let result_orig = boolean_buffer_and_then_fallback(&left, &right);

        assert_eq!(result_bmi2.len(), result_orig.len());
        for i in 0..16 {
            assert_eq!(
                result_bmi2.value(i),
                result_orig.value(i),
                "Mismatch at position {i}"
            );
            assert_eq!(result_bmi2.value(i), i.is_multiple_of(2));
        }

        // Case 2: all-clear left with an empty right selection.
        let mut left_empty_builder = BooleanBufferBuilder::new(8);
        for _ in 0..8 {
            left_empty_builder.append(false);
        }
        let left_empty = left_empty_builder.finish();
        let right_empty = BooleanBufferBuilder::new(0).finish();

        // SAFETY: bmi2 availability checked above.
        let result_bmi2_empty = unsafe { boolean_buffer_and_then_bmi2(&left_empty, &right_empty) };
        let result_orig_empty = boolean_buffer_and_then_fallback(&left_empty, &right_empty);

        assert_eq!(result_bmi2_empty.len(), result_orig_empty.len());
        assert_eq!(result_bmi2_empty.len(), 8);
        for i in 0..8 {
            assert!(!result_bmi2_empty.value(i));
            assert!(!result_orig_empty.value(i));
        }
    }
}
409}