// liquid_cache_datafusion/utils.rs
1use std::sync::Arc;
2
3use arrow::{
4    array::BooleanBufferBuilder,
5    buffer::{BooleanBuffer, MutableBuffer},
6};
7use datafusion::{
8    common::tree_node::{TreeNode, TreeNodeRecursion},
9    datasource::source::DataSourceExec,
10    physical_plan::{ExecutionPlan, metrics::MetricValue},
11};
12use liquid_cache_common::rpc::ExecutionMetricsResponse;
13use parquet::arrow::arrow_reader::RowSelector;
14
15use crate::cache::LiquidCacheParquetRef;
16
17fn boolean_buffer_and_then_fallback(left: &BooleanBuffer, right: &BooleanBuffer) -> BooleanBuffer {
18    debug_assert_eq!(
19        left.count_set_bits(),
20        right.len(),
21        "the right selection must have the same number of set bits as the left selection"
22    );
23
24    if left.len() == right.len() {
25        debug_assert_eq!(left.count_set_bits(), left.len());
26        return right.clone();
27    }
28
29    let mut buffer = MutableBuffer::from_len_zeroed(left.values().len());
30    buffer.copy_from_slice(left.values());
31    let mut builder = BooleanBufferBuilder::new_from_buffer(buffer, left.len());
32
33    let mut other_bits = right.iter();
34
35    for bit_idx in left.set_indices() {
36        let predicate = other_bits
37            .next()
38            .expect("Mismatch in set bits between self and other");
39        if !predicate {
40            builder.set_bit(bit_idx, false);
41        }
42    }
43
44    builder.finish()
45}
46
47/// Combines this [`BooleanBuffer`] with another using logical AND on the selected bits.
48///
49/// Unlike intersection, the `other` [`BooleanBuffer`] must have exactly as many **set bits** as `self`,
50/// i.e., self.count_set_bits() == other.len().
51///
52/// This method will keep only the bits in `self` that are also set in `other`
53/// at the positions corresponding to `self`'s set bits.
54/// For example:
55/// left:   NNYYYNNYYNYN
56/// right:    YNY  NY N
57/// result: NNYNYNNNYNNN
58///
59/// Optimized version of `boolean_buffer_and_then` using BMI2 PDEP instructions.
60/// This function performs the same operation but uses bit manipulation instructions
61/// for better performance on supported x86_64 CPUs.
62pub fn boolean_buffer_and_then(left: &BooleanBuffer, right: &BooleanBuffer) -> BooleanBuffer {
63    debug_assert_eq!(
64        left.count_set_bits(),
65        right.len(),
66        "the right selection must have the same number of set bits as the left selection"
67    );
68
69    if left.len() == right.len() {
70        debug_assert_eq!(left.count_set_bits(), left.len());
71        return right.clone();
72    }
73
74    // Fast path for BMI2 support on x86_64
75    #[cfg(target_arch = "x86_64")]
76    {
77        if is_x86_feature_detected!("bmi2") {
78            return unsafe { boolean_buffer_and_then_bmi2(left, right) };
79        }
80    }
81
82    boolean_buffer_and_then_fallback(left, right)
83}
84
/// Loads a little-endian `u64` from `base_ptr + offset`, treating any bytes
/// at or past `total_bytes` as zero. Never reads beyond `total_bytes`.
///
/// Callers must guarantee `base_ptr..base_ptr + total_bytes` is a valid,
/// readable byte range.
#[cfg(target_arch = "x86_64")]
#[inline(always)]
fn load_u64_zero_padded(base_ptr: *const u8, total_bytes: usize, offset: usize) -> u64 {
    let available = total_bytes.saturating_sub(offset);
    match available {
        // Nothing readable at this offset: the word is all zero padding.
        0 => 0,
        // Partial word: copy the valid bytes into a zeroed stack buffer.
        1..=7 => {
            let mut padded = [0u8; 8];
            // SAFETY: exactly `available` bytes starting at `offset` are in bounds.
            unsafe {
                core::ptr::copy_nonoverlapping(base_ptr.add(offset), padded.as_mut_ptr(), available);
            }
            u64::from_le_bytes(padded)
        }
        // SAFETY: at least 8 readable bytes remain at `offset`.
        _ => unsafe { core::ptr::read_unaligned(base_ptr.add(offset) as *const u64) },
    }
}
101
102#[cfg(target_arch = "x86_64")]
103#[target_feature(enable = "bmi2")]
104unsafe fn boolean_buffer_and_then_bmi2(
105    left: &BooleanBuffer,
106    right: &BooleanBuffer,
107) -> BooleanBuffer {
108    use core::arch::x86_64::_pdep_u64;
109
110    debug_assert_eq!(left.count_set_bits(), right.len());
111
112    let bit_len = left.len();
113    let byte_len = bit_len.div_ceil(8);
114    let left_ptr = left.values().as_ptr();
115    let right_bytes = right.len().div_ceil(8);
116    let right_ptr = right.values().as_ptr();
117
118    let mut out = MutableBuffer::from_len_zeroed(byte_len);
119    let out_ptr = out.as_mut_ptr();
120
121    let full_words = bit_len / 64;
122    let mut right_bit_idx = 0; // how many bits we have processed from right
123
124    for word_idx in 0..full_words {
125        let left_word =
126            unsafe { core::ptr::read_unaligned(left_ptr.add(word_idx * 8) as *const u64) };
127
128        if left_word == 0 {
129            continue;
130        }
131
132        let need = left_word.count_ones();
133
134        // Absolute byte & bit offset of the first needed bit inside `right`.
135        let rb_byte = right_bit_idx / 8;
136        let rb_bit = (right_bit_idx & 7) as u32;
137
138        let need_high = rb_bit != 0;
139        let safe16 = right_bytes.saturating_sub(16);
140        let mut r_bits;
141        if rb_byte <= safe16 {
142            let low = unsafe { core::ptr::read_unaligned(right_ptr.add(rb_byte) as *const u64) };
143            if need_high {
144                let high =
145                    unsafe { core::ptr::read_unaligned(right_ptr.add(rb_byte + 8) as *const u64) };
146                r_bits = (low >> rb_bit) | (high << (64 - rb_bit));
147            } else {
148                r_bits = low >> rb_bit;
149            }
150        } else {
151            let low = load_u64_zero_padded(right_ptr, right_bytes, rb_byte);
152            r_bits = low >> rb_bit;
153            if need_high {
154                let high = load_u64_zero_padded(right_ptr, right_bytes, rb_byte + 8);
155                r_bits |= high << (64 - rb_bit);
156            }
157        }
158
159        // Mask off the high garbage.
160        r_bits &= 1u64.unbounded_shl(need).wrapping_sub(1);
161
162        // The PDEP instruction: https://www.felixcloutier.com/x86/pdep
163        // It takes left_word as the mask, and deposit the packed bits into the sparse positions of `left_word`.
164        let result = _pdep_u64(r_bits, left_word);
165
166        unsafe {
167            core::ptr::write_unaligned(out_ptr.add(word_idx * 8) as *mut u64, result);
168        }
169
170        right_bit_idx += need as usize;
171    }
172
173    // Handle remaining bits that are less than 64 bits
174    let tail_bits = bit_len & 63;
175    if tail_bits != 0 {
176        // Build the mask from the remaining bytes in one bounded copy
177        let tail_bytes = tail_bits.div_ceil(8);
178        let base = unsafe { left_ptr.add(full_words * 8) };
179        let mut mask: u64;
180        if tail_bytes == 8 {
181            mask = unsafe { core::ptr::read_unaligned(base as *const u64) };
182        } else {
183            let mut buf = [0u8; 8];
184            unsafe {
185                core::ptr::copy_nonoverlapping(base, buf.as_mut_ptr(), tail_bytes);
186            }
187            mask = u64::from_le_bytes(buf);
188        }
189        // Clear any high bits beyond the actual tail length
190        mask &= (1u64 << tail_bits) - 1;
191
192        if mask != 0 {
193            let need = mask.count_ones();
194
195            let rb_byte = right_bit_idx / 8;
196            let rb_bit = (right_bit_idx & 7) as u32;
197
198            let need_high = rb_bit != 0;
199            let safe16 = right_bytes.saturating_sub(16);
200            let mut r_bits;
201            if rb_byte <= safe16 {
202                let low =
203                    unsafe { core::ptr::read_unaligned(right_ptr.add(rb_byte) as *const u64) };
204                if need_high {
205                    let high = unsafe {
206                        core::ptr::read_unaligned(right_ptr.add(rb_byte + 8) as *const u64)
207                    };
208                    r_bits = (low >> rb_bit) | (high << (64 - rb_bit));
209                } else {
210                    r_bits = low >> rb_bit;
211                }
212            } else {
213                let low = load_u64_zero_padded(right_ptr, right_bytes, rb_byte);
214                r_bits = low >> rb_bit;
215                if need_high {
216                    let high = load_u64_zero_padded(right_ptr, right_bytes, rb_byte + 8);
217                    r_bits |= high << (64 - rb_bit);
218                }
219            }
220
221            r_bits &= 1u64.unbounded_shl(need).wrapping_sub(1);
222
223            let result = _pdep_u64(r_bits, mask);
224
225            let tail_bytes = tail_bits.div_ceil(8);
226            let result_bytes = result.to_le_bytes();
227            let dst_off = full_words * 8;
228            unsafe {
229                let dst = core::slice::from_raw_parts_mut(out_ptr, byte_len);
230                dst[dst_off..dst_off + tail_bytes].copy_from_slice(&result_bytes[..tail_bytes]);
231            }
232        }
233    }
234
235    BooleanBuffer::new(out.into(), 0, bit_len)
236}
237
238pub(super) fn row_selector_to_boolean_buffer(selection: &[RowSelector]) -> BooleanBuffer {
239    let mut buffer = BooleanBufferBuilder::new(8192);
240    for selector in selection.iter() {
241        if selector.skip {
242            buffer.append_n(selector.row_count, false);
243        } else {
244            buffer.append_n(selector.row_count, true);
245        }
246    }
247    buffer.finish()
248}
249
250/// Extracts execution metrics from a physical execution plan.
251///
252/// This function traverses the plan tree to find all DataSourceExec nodes and aggregates
253/// their metrics, including processing time and bytes scanned. It can be used by both
254/// client-server and in-process benchmarks to get consistent metrics.
255///
256/// # Arguments
257///
258/// * `plan` - The execution plan to extract metrics from
259/// * `liquid_cache` - Optional reference to the liquid cache for memory usage calculation
260///
261/// # Returns
262///
263/// An `ExecutionMetricsResponse` containing:
264/// - `pushdown_eval_time`: Time spent processing data (in milliseconds)
265/// - `cache_memory_usage`: Total cache memory usage including bytes scanned
266/// - `liquid_cache_usage`: Memory usage of the liquid cache specifically
267pub fn extract_execution_metrics(
268    plan: &Arc<dyn ExecutionPlan>,
269    liquid_cache: Option<&LiquidCacheParquetRef>,
270) -> ExecutionMetricsResponse {
271    // Traverse the plan tree to find all DataSourceExec nodes and collect their metrics
272    let mut time_elapsed_processing_millis = 0;
273    let mut bytes_scanned = 0;
274
275    let _ = plan.apply(|node| {
276        let any_plan = node.as_any();
277        if let Some(data_source_exec) = any_plan.downcast_ref::<DataSourceExec>()
278            && let Some(metrics) = data_source_exec.metrics()
279        {
280            let aggregated_metrics = metrics
281                .aggregate_by_name()
282                .sorted_for_display()
283                .timestamps_removed();
284
285            for metric in aggregated_metrics.iter() {
286                if let MetricValue::Time { name, time } = metric.value()
287                    && name == "time_elapsed_processing"
288                {
289                    time_elapsed_processing_millis += time.value() / 1_000_000;
290                } else if let MetricValue::Count { name, count } = metric.value()
291                    && name == "bytes_scanned"
292                {
293                    bytes_scanned += count.value();
294                }
295            }
296        }
297        Ok(TreeNodeRecursion::Continue)
298    });
299
300    let liquid_cache_usage = liquid_cache
301        .map(|cache| cache.compute_memory_usage_bytes())
302        .unwrap_or(0);
303    let cache_memory_usage = liquid_cache_usage + bytes_scanned as u64;
304
305    ExecutionMetricsResponse {
306        pushdown_eval_time: time_elapsed_processing_millis as u64,
307        cache_memory_usage,
308        liquid_cache_usage,
309    }
310}
311
#[cfg(test)]
mod tests {
    use super::*;

    /// Compares the BMI2 path against the scalar fallback on a >64-bit buffer
    /// so both the full-word loop and the tail path are exercised.
    #[test]
    #[cfg(target_arch = "x86_64")]
    fn test_boolean_buffer_and_then_bmi2_large() {
        use super::boolean_buffer_and_then_bmi2;

        // Calling a #[target_feature(enable = "bmi2")] function on a CPU
        // without BMI2 is undefined behavior — skip when unsupported.
        if !is_x86_feature_detected!("bmi2") {
            return;
        }

        // Test with larger buffer (more than 64 bits)
        let size = 128;
        let mut left_builder = BooleanBufferBuilder::new(size);
        let mut right_bits = Vec::new();

        // Create a pattern where every 3rd bit is set in left
        for i in 0..size {
            let is_set = i.is_multiple_of(3);
            left_builder.append(is_set);
            if is_set {
                // For right buffer, alternate between true/false
                right_bits.push(right_bits.len().is_multiple_of(2));
            }
        }
        let left = left_builder.finish();

        let mut right_builder = BooleanBufferBuilder::new(right_bits.len());
        for bit in right_bits {
            right_builder.append(bit);
        }
        let right = right_builder.finish();

        // SAFETY: BMI2 support was verified above.
        let result_bmi2 = unsafe { boolean_buffer_and_then_bmi2(&left, &right) };
        let result_orig = boolean_buffer_and_then_fallback(&left, &right);

        assert_eq!(result_bmi2.len(), result_orig.len());
        assert_eq!(result_bmi2.len(), size);

        // Verify they produce the same result
        for i in 0..size {
            assert_eq!(
                result_bmi2.value(i),
                result_orig.value(i),
                "Mismatch at position {i}"
            );
        }
    }

    /// Edge cases: all-set `left` with alternating `right`, and all-clear
    /// `left` with an empty `right`.
    #[test]
    #[cfg(target_arch = "x86_64")]
    fn test_boolean_buffer_and_then_bmi2_edge_cases() {
        use super::boolean_buffer_and_then_bmi2;

        // Calling a #[target_feature(enable = "bmi2")] function on a CPU
        // without BMI2 is undefined behavior — skip when unsupported.
        if !is_x86_feature_detected!("bmi2") {
            return;
        }

        // Test case: all bits set in left, alternating pattern in right
        let mut left_builder = BooleanBufferBuilder::new(16);
        for _ in 0..16 {
            left_builder.append(true);
        }
        let left = left_builder.finish();

        let mut right_builder = BooleanBufferBuilder::new(16);
        for i in 0..16 {
            right_builder.append(i % 2 == 0);
        }
        let right = right_builder.finish();

        // SAFETY: BMI2 support was verified above.
        let result_bmi2 = unsafe { boolean_buffer_and_then_bmi2(&left, &right) };
        let result_orig = boolean_buffer_and_then_fallback(&left, &right);

        assert_eq!(result_bmi2.len(), result_orig.len());
        for i in 0..16 {
            assert_eq!(
                result_bmi2.value(i),
                result_orig.value(i),
                "Mismatch at position {i}"
            );
            // Should be true for even indices, false for odd
            assert_eq!(result_bmi2.value(i), i.is_multiple_of(2));
        }

        // Test case: no bits set in left
        let mut left_empty_builder = BooleanBufferBuilder::new(8);
        for _ in 0..8 {
            left_empty_builder.append(false);
        }
        let left_empty = left_empty_builder.finish();
        let right_empty = BooleanBufferBuilder::new(0).finish();

        // SAFETY: BMI2 support was verified above.
        let result_bmi2_empty = unsafe { boolean_buffer_and_then_bmi2(&left_empty, &right_empty) };
        let result_orig_empty = boolean_buffer_and_then_fallback(&left_empty, &right_empty);

        assert_eq!(result_bmi2_empty.len(), result_orig_empty.len());
        assert_eq!(result_bmi2_empty.len(), 8);
        for i in 0..8 {
            assert!(!result_bmi2_empty.value(i));
            assert!(!result_orig_empty.value(i));
        }
    }
}
409}