// simd_lookup/lookup_kernel.rs

1//! Arrow-style "lookup kernel" similar to arrow-select::take::take kernel.
//! These kernels are "columnar style": they look up against one table first and then cascade into another table;
//! the cascading kernel uses SIMD extensively.
4//! Other kernels just do scalar lookups which are often as fast as SIMD GATHER, but all allow SIMD functions to
5//! operate on looked up values.
6//!
7//! Anecdotally, this is not really faster than SIMD gather.
8//!
9//! ----------------- SIMD based lookup kernels - "Arrow" style ------------------------------------
10//! These operate by leveraging constant-sized array reads from a slice with SIMD operations on looked up values,
11//! especially fast for multiple table lookups and combinations thereof.
12//! It turns out these don't significantly improve on a series of scalar lookups.
13//!
14//! # CPU Feature Requirements
15//!
16//! ## Intel x86_64 (AVX-512)
17//!
18//! ### Cascading Lookup Kernel (`SimdCascadingTableU32U8Lookup`)
19//!
20//! The cascading kernel is heavily optimized for AVX-512 and provides significant performance
21//! improvements (40-50% speedup) over scalar implementations on large tables.
22//!
23//! **Required CPU features:**
24//! - **AVX512F** + **AVX512VL** + **AVX512VBMI2** (for `compress_store_u8x16`)
25//! - **AVX512F** (for `compress_store_u32x16`)
26//! - **AVX512F** + **AVX512BW** (for `gather_u32index_u8` via `simd_gather` module)
27//!
28//! **Summary**: Requires **AVX512F**, **AVX512VL**, **AVX512BW**, and **AVX512VBMI2**
29//!
30//! **Available on**: Intel Ice Lake, Tiger Lake, and later (not available on Skylake-X)
31//!
32//! ## ARM aarch64 (Apple Silicon M1/M2/M3)
33//!
34//! On ARM processors, the compress operations (`compress_store_u8x16`, `compress_store_u32x8`, etc.)
35//! use NEON-optimized implementations that provide significant speedups over scalar fallbacks:
36//!
37//! - **`compress_store_u8x16`**: Uses NEON `TBL` instruction with precomputed shuffle indices
38//!   - Eliminates 16 conditional branches from the scalar fallback
39//!   - Single `vqtbl1q_u8` instruction performs all 16-byte shuffle
40//!
41//! - **`compress_store_u32x8`**: Uses NEON `TBL2` instruction with byte-level indices
42//!   - Precomputed 256×32 byte index table avoids runtime index conversion
43//!   - Uses `vqtbl2q_u8` for efficient 32-byte permutation
44//!
45//! - **Bitmask expansion**: Uses NEON parallel bit operations (`vceq`, `vmovl`)
46//!   - Converts 8-bit mask to vector mask without scalar loops
47//!
48//! These optimizations are automatically enabled on ARM and require no user configuration.
49//!
50//! ## Fallback Behavior
51//!
52//! All kernels fall back to scalar/shuffle implementations when architecture-specific
53//! features are not available. The fallback works on all architectures.
54//!
55//! ## Other Kernels
56//!
57//! - **`SimdSingleTableU32U8Lookup`**: Uses scalar lookups (works on all architectures)
58//! - **`SimdDualTableU32U8Lookup`**: Uses scalar lookups (works on all architectures)
59//! - **`PipelinedSingleTableU32U8Lookup`**: Uses scalar lookups with software prefetching
60
61use wide::{u8x16, u32x16};
62
63use rustc_hash::FxHashMap;
64
65use crate::{
66    bulk_vec_extender::{BulkVecExtender, SliceU8SIMDExtender}, compress_store_u8x16, compress_store_u32x16, gather_u32index_u32, gather_u32index_u8, prefetch::{L3, prefetch_eight_offsets}
67};
68
/// Safe lookup helper: returns the value at the given offset, or 0 if out of bounds
#[inline(always)]
fn safe_lookup(lookup_table: &[u8], offset: u32) -> u8 {
    // `get` yields None for an out-of-range offset; the explicit match makes the
    // zero fallback for missing entries obvious at a glance.
    match lookup_table.get(offset as usize) {
        Some(&value) => value,
        None => 0,
    }
}
74
/// Lookup values from lookup table using u32 offsets
/// Returns the looked up u8 values as a [u8; 16] array
/// This centralizes the lookup logic and avoids code duplication
/// Out-of-bounds offsets return 0 instead of panicking
#[inline]
fn lookup_from_offsets(lookup_table: &[u8], offsets: &[u32; 16]) -> [u8; 16] {
    // Build the 16-lane result with `array::from_fn` instead of spelling out all
    // sixteen lanes by hand; each lane performs the same safe (OOB -> 0) lookup.
    std::array::from_fn(|lane| {
        lookup_table
            .get(offsets[lane] as usize)
            .copied()
            .unwrap_or(0)
    })
}
100
/// Single table lookup kernel with SIMD function - u32 to u8 lookup table kernel.
/// The user is responsible for generating the lookup table - so this can be used for different use cases, including
/// CASE..WHEN and bitmasking/filtering.
///
/// It allows for SIMD operations on looked up values, but SIMD isn't actually used in the lookups themselves as
/// there aren't major advantages for SIMD in terms of lookup for huge tables with random indices.
/// However, we look up 16 values at a time for efficiency.  This kernel makes sense to call on hundreds or thousands
/// of values at a time, columnar style.
///
/// Note: for general purpose expressions where the lookup can be any type, instead just use `arrow::compute::take()`
/// to do a very efficient lookup where the lookup table can be any type, but then you pay the cost of write memory
/// I/O.  These kernels here allow user to operate on each looked up u8x16 and do something.
///
#[derive(Debug, Clone)]
pub struct SimdSingleTableU32U8Lookup<'a> {
    // Borrowed lookup table: each input u32 indexes into this slice to produce a u8.
    lookup_table: &'a [u8],
}
118
119impl<'a> SimdSingleTableU32U8Lookup<'a> {
120    #[inline]
121    pub fn new(lookup_table: &'a [u8]) -> Self {
122        Self { lookup_table }
123    }
124
125    /// Given a slice of u32 values, looks up each one and calls the user given function on an assembled u8x16 (16
126    /// looked up values) at a time.
127    ///
128    /// The user function is passed (lookedup_values: u8x16, num_bytes: usize),
129    /// where num_bytes is 16 other than the last/remainder chunk, where it may be less than that.
130    ///
131    /// If the slice does not divide evenly into 16-item chunks, the rest is handled by filling missing values in the
132    /// u8x16 with zeroes.  Thus, the lookup assumes the zero is basically a NOP.
133    #[inline]
134    pub fn lookup_func<F>(&self, values: &[u32], f: &mut F)
135    where
136        F: FnMut(u8x16, usize),
137    {
138        let (chunks, rest) = values.as_chunks::<16>();
139        for chunk in chunks {
140            // Try prefetching lookup table entries with L3 cache level
141            // NOTE: the below compiles down to nothing in release mode, as Rust can prove the below is statically
142            //       safe with no need for bounds checking.
143            // let first_half: &[u32; 8] = chunk[..8].try_into().unwrap();
144            // let second_half: &[u32; 8] = chunk[8..].try_into().unwrap();
145
146            // prefetch_eight_offsets::<_, L3>(&self.lookup_table[0], first_half);
147            // prefetch_eight_offsets::<_, L3>(&self.lookup_table[0], second_half);
148
149            // Get looked up values using centralized function
150            let values = lookup_from_offsets(&self.lookup_table, chunk);
151            (f)(u8x16::from(values), 16);
152        }
153
154        // Handle the rest... just loop and do a lookup, feed to user function with 0's for items not in the slice.
155        if !rest.is_empty() {
156            let mut values = [0u8; 16];
157            for i in 0..rest.len() {
158                values[i] = self.lookup_table[rest[i] as usize];
159            }
160            (f)(u8x16::from(values), rest.len());
161        }
162    }
163
164    /// Convenience function which does lookup and writes the results into a Vec of the same length as the input slice.
165    /// Does not transform the looked up values.  Actually, extends a mutable Vec of u8.
166    #[inline]
167    pub fn lookup_into_vec(&self, values: &[u32], buffer: &mut Vec<u8>) {
168        let mut write_guard = buffer.bulk_extend_guard(values.len());
169        let mut write_slice = write_guard.as_mut_slice();
170        let mut num_written = 0;
171        self.lookup_func(values, &mut |lookedup_values, num_bytes| {
172            write_slice.write_u8x16(num_written, lookedup_values, num_bytes);
173            num_written += num_bytes;
174        });
175    }
176
177    /// Version of lookup_into_vec which writes into a mutable u8x16 buffer, for cascaded lookups
178    #[inline]
179    pub fn lookup_into_u8x16_buffer(&self, values: &[u32], buffer: &mut [u8x16]) {
180        assert!(
181            (buffer.len() * 16) >= values.len(),
182            "Buffer must be at least as long as the input values"
183        );
184        let mut idx = 0;
185        self.lookup_func(values, &mut |lookedup_values, _num_bytes| {
186            buffer[idx] = lookedup_values;
187            idx += 1;
188        });
189    }
190
191    /// Prepares a Vec of u8x16 for lookup_into_u8x16_buffer by setting the length and preparing.
192    /// The Vec is extended by the amount necessary to hold the results.
193    ///
194    /// ## Safety
195    /// - We unsafe set the length because we know we will overwrite every element.
196    ///
197    #[inline]
198    pub fn lookup_extend_u8x16_vec(&self, values: &[u32], vec: &mut Vec<u8x16>) {
199        let needed = values.len().div_ceil(16);
200        let mut guard = vec.bulk_extend_guard(needed);
201        self.lookup_into_u8x16_buffer(values, guard.as_mut_slice());
202        // guard drops here, automatically finalizes to correct length
203    }
204
205    /// Convenience function which compresses and extends two Vecs:
206    /// - `nonzero_results` - Vec<u8> of nonzero looked up u8 results
207    /// - `indices` - Vec<u32> of indices of the nonzero results
208    ///
209    /// This method is intended to be used with the cascading SIMD kernels which extend lookup into two or more
210    /// tables by leveraging the nonzero output to do packed lookups into the second table.
211    ///
212    /// ## Arguments
213    /// - `values` - &[u32] of indices to lookup
214    /// - `nonzero_results` - &mut Vec<u8> to store the nonzero looked up u8 results
215    /// - `indices` - &mut Vec<u32> to store the indices of the nonzero results
216    /// - `base_index` - base index value for the indices output.
217    ///
218    /// For example, if you wanted to extend empty Vecs (reusing them as temporary buffers), then
219    /// pass `base_index = 0` and the indices will be 0, 16, 32, etc.  Also pass empty Vecs, and clear them
220    /// every time before calling.
221    ///
222    /// ## Performance and Architecture
223    ///
224    /// The lookup function is heavily optimized for Intel AVX512, using VCOMPRESS kernel (simd_compress.rs).
225    /// Using VCOMPRESS this is nearly as fast as lookup_into_vec() which does nothing but copy the results!
226    /// On other platforms, it falls back to a scalar approach which will be potentially much slower.
227    ///
228    #[inline]
229    pub fn lookup_compress_into_nonzeroes(&self, values: &[u32], nonzero_results: &mut Vec<u8>, indices: &mut Vec<u32>, base_index: u32) {
230        // First, set up a SIMD vector of indices starting at the base index, representing indices of elements
231        let mut indices_simd = u32x16::from([0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15]);
232        indices_simd = indices_simd + u32x16::splat(base_index);
233
234        let sixteen = u32x16::splat(16);
235        let zeroes = u8x16::splat(0);
236
237        // Use BulkVecExtender for bulk mutable slices - enables VCOMPRESS and efficient writes
238        // Ensure at least 16 elements for compress_store functions
239        // Allocate extra space to ensure remainder always has 16 elements available
240        let min_len = (values.len() + 16).max(16);
241        let mut result_guard = nonzero_results.bulk_extend_guard(min_len);
242        let result_slice = result_guard.as_mut_slice();
243        let mut indices_guard = indices.bulk_extend_guard(min_len);
244        let indices_slice = indices_guard.as_mut_slice();
245        let mut num_written = 0;
246
247        self.lookup_func(values, &mut |lookedup_values, num_bytes| {
248            // Check which values are nonzero and convert to bitmask
249            // simd_eq returns 0xFF where equal to zero, so invert to get nonzero mask
250            let eq_mask = lookedup_values.simd_eq(zeroes).to_bitmask();
251            let mut nonzero_mask = !eq_mask as u16;
252
253            // For remainder chunks (< 16 elements), mask out invalid elements
254            if num_bytes < 16 {
255                nonzero_mask &= (1u16 << num_bytes) - 1;
256            }
257
258            // Compress nonzero values into result_slice
259            // We always have at least 16 elements available because we allocated values.len().max(16)
260            // and process inputs sequentially (each chunk produces at most 16 outputs)
261            let written = compress_store_u8x16(lookedup_values, nonzero_mask, &mut result_slice[num_written..]);
262            let _ = compress_store_u32x16(indices_simd, nonzero_mask, &mut indices_slice[num_written..]);
263            num_written += written;
264
265            // Update indices based on num_bytes
266            if num_bytes < 16 {
267                indices_simd = indices_simd + u32x16::splat(num_bytes as u32);
268            } else {
269                indices_simd = indices_simd + sixteen;
270            }
271        });
272        result_guard.set_written(num_written);
273        indices_guard.set_written(num_written);
274    }
275}
276
/// Pipelined single table lookup kernel - u32 to u8 lookup table kernel with prefetch pipelining
///
/// This version pipelines prefetch operations with the actual lookup work to hide memory latency.
/// The algorithm works as follows:
/// 1. Read values from current chunk addresses
/// 2. Prefetch next chunk addresses while processing current values
/// 3. Call SIMD lookup function on current values
/// 4. Loop to next chunk
///
/// This pipelining allows memory prefetch latency to be hidden behind computation work.
///
/// Results: End Nov 2025: On Intel boxes this gives slight advantage - maybe up to 5%.  Regular scalar reads
/// though, even with large random Lookup tables, are done well enough that this doesn't help much.
#[derive(Debug, Clone)]
pub struct PipelinedSingleTableU32U8Lookup<'a> {
    // Borrowed lookup table; entries are prefetched ahead of the scalar lookups.
    lookup_table: &'a [u8],
}
294
295impl<'a> PipelinedSingleTableU32U8Lookup<'a> {
296    #[inline]
297    pub fn new(lookup_table: &'a [u8]) -> Self {
298        Self { lookup_table }
299    }
300
301    /// Pipelined lookup function that prefetches the next chunk while processing the current one
302    ///
303    /// The pipelining strategy:
304    /// - Process chunks of 16 u32 values at a time
305    /// - For each chunk: prefetch next chunk addresses, then process current chunk
306    /// - This hides prefetch latency behind the lookup computation work
307    #[inline]
308    pub fn lookup_func<F>(&self, values: &[u32], f: &mut F)
309    where
310        F: FnMut(u8x16, usize),
311    {
312        let (chunks, rest) = values.as_chunks::<16>();
313
314        if chunks.is_empty() {
315            // Handle case where we have fewer than 16 values total
316            if !rest.is_empty() {
317                self.process_remainder(rest, f);
318            }
319            return;
320        }
321
322        // Deep prefetch pipeline: prefetch PREFETCH_DISTANCE chunks ahead (64 values)
323        // This hides DRAM latency (~200-400 cycles) by having multiple memory requests in flight
324        const PREFETCH_DISTANCE: usize = 4; // 4 chunks = 64 values ahead
325
326        // Helper to prefetch a chunk
327        let prefetch_chunk = |chunk: &[u32; 16]| {
328            let first_half: &[u32; 8] = chunk[..8].try_into().unwrap();
329            let second_half: &[u32; 8] = chunk[8..].try_into().unwrap();
330            prefetch_eight_offsets::<_, L3>(&self.lookup_table[0], first_half);
331            prefetch_eight_offsets::<_, L3>(&self.lookup_table[0], second_half);
332        };
333
334        // For small number of chunks, fall back to simple processing
335        if chunks.len() <= PREFETCH_DISTANCE {
336            for chunk in chunks {
337                let values = lookup_from_offsets(&self.lookup_table, chunk);
338                (f)(u8x16::from(values), 16);
339            }
340            if !rest.is_empty() {
341                self.process_remainder(rest, f);
342            }
343            return;
344        }
345
346        // Prime the prefetch pipeline: prefetch first PREFETCH_DISTANCE chunks
347        for i in 0..PREFETCH_DISTANCE {
348            prefetch_chunk(&chunks[i]);
349        }
350
351        // Main loop: process chunk i while prefetching chunk i+PREFETCH_DISTANCE
352        for i in 0..chunks.len() {
353            // Prefetch chunk i+PREFETCH_DISTANCE (if it exists)
354            if i + PREFETCH_DISTANCE < chunks.len() {
355                prefetch_chunk(&chunks[i + PREFETCH_DISTANCE]);
356            }
357
358            // Process chunk i (which was prefetched PREFETCH_DISTANCE iterations ago)
359            let values = lookup_from_offsets(&self.lookup_table, &chunks[i]);
360            (f)(u8x16::from(values), 16);
361        }
362
363        // Handle remainder
364        if !rest.is_empty() {
365            self.process_remainder(rest, f);
366        }
367    }
368
369    /// Process remainder elements (< 16 elements)
370    #[inline]
371    fn process_remainder<F>(&self, rest: &[u32], f: &mut F)
372    where
373        F: FnMut(u8x16, usize),
374    {
375        let mut values = [0u8; 16];
376        for i in 0..rest.len() {
377            values[i] = self.lookup_table[rest[i] as usize];
378        }
379        (f)(u8x16::from(values), rest.len());
380    }
381
382    /// Convenience function which does lookup and writes the results into a Vec
383    #[inline]
384    pub fn lookup_into_vec(&self, values: &[u32], buffer: &mut Vec<u8>) {
385        let mut write_guard = buffer.bulk_extend_guard(values.len());
386        let write_slice = write_guard.as_mut_slice();
387        let mut num_written = 0;
388        self.lookup_func(values, &mut |lookedup_values, num_bytes| {
389            let target_slice = &mut write_slice[num_written..num_written + num_bytes];
390            target_slice.copy_from_slice(&lookedup_values.to_array()[..num_bytes]);
391            num_written += num_bytes;
392        });
393        // guard drops here, automatically finalizes to correct length
394    }
395}
396
/// Dual lookup table kernel - u32 to u8 lookup table kernel with custom SIMD function for combining the results.
/// Second lookup table is only looked up if the first lookup table returns a non-zero value.
/// The lookup functions are scalar and other than the SIMD combining function, no SIMD is used - and the code is
/// very simple, so this function is a good fit for non-AVX architectures (like Apple M*) where faster SIMD
/// instructions like VCOMPRESS are not available.
/// OTOH, for Intel/AVX512+ architectures, the [SimdCascadingTableU32U8Lookup] kernel is faster.
///
/// The user is responsible for generating the lookup tables - so this can be used for different use cases, including
/// CASE..WHEN and bitmasking/filtering.
#[derive(Debug, Clone)]
pub struct SimdDualTableU32U8Lookup<'a> {
    // First-stage table; its result gates whether the second table is read at all.
    lookup_table1: &'a [u8],
    // Second-stage table; only consulted for lanes where table1 returned nonzero.
    lookup_table2: &'a [u8],
}
411
412impl<'a> SimdDualTableU32U8Lookup<'a> {
413    /// Creates a new dual table lookup kernel with the given lookup tables.
414    #[inline]
415    pub fn new(lookup_table1: &'a [u8], lookup_table2: &'a [u8]) -> Self {
416        Self {
417            lookup_table1,
418            lookup_table2,
419        }
420    }
421
422    /// Given two slices of equal length &[u32] indices, looks up each one and calls the user given function
423    /// on assembled u8x16 results.
424    /// - lookup_table1 is used for the first slice, lookup_table2 is used for the second slice.
425    /// - The user function is passed (lookedup_values1: u8x16, lookedup_values2: u8x16, num_bytes), where
426    ///   num_bytes is 16 other than the last/remainder chunk, where it may be less than that.
427    /// - If the slices do not divide evenly into 16-item chunks, the rest is handled by filling missing values in the
428    ///   u8x16 with zeroes.  Thus, the lookup assumes the zero is basically a NOP.
429    #[inline]
430    pub fn lookup_func<F>(&self, values1: &[u32], values2: &[u32], f: &mut F)
431    where
432        F: FnMut(u8x16, u8x16, usize),
433    {
434        assert!(
435            values1.len() == values2.len(),
436            "Values1 and values2 must have the same length"
437        );
438        let (chunks1, rest1) = values1.as_chunks::<16>();
439        let (chunks2, rest2) = values2.as_chunks::<16>();
440
441        for (chunk1, chunk2) in chunks1.iter().zip(chunks2.iter()) {
442            let values1 = lookup_from_offsets(self.lookup_table1, chunk1);
443
444            // Conditional lookup for table2 - only where table1 is nonzero
445            let mut values2 = [0u8; 16];
446            for i in 0..16 {
447                if values1[i] != 0 {
448                    values2[i] = self.lookup_table2[chunk2[i] as usize];
449                }
450            }
451
452            (f)(u8x16::from(values1), u8x16::from(values2), 16);
453        }
454
455        // Handle the rest... just loop and do a lookup, feed to user function with 0's for items not in the slice.
456        if !rest1.is_empty() {
457            let mut values1 = [0u8; 16];
458            let mut values2 = [0u8; 16];
459            for i in 0..rest1.len() {
460                values1[i] = self.lookup_table1[rest1[i] as usize];
461                if values1[i] != 0 {
462                    values2[i] = self.lookup_table2[rest2[i] as usize];
463                }
464            }
465            (f)(u8x16::from(values1), u8x16::from(values2), rest1.len());
466        }
467    }
468
469    /// Convenience function which does dual lookup, combines the results using a user-defined combiner function,
470    /// and extends the combined results into a Vec (pushing all combined results)
471    ///
472    /// The combiner function `f` takes two u8x16 values (looked up from table1 and table2) and returns a combined u8x16.
473    /// Unlike the single table version, this dual table version requires a combiner function.
474    #[inline]
475    pub fn lookup_into_vec<F>(
476        &self,
477        values1: &[u32],
478        values2: &[u32],
479        output: &mut Vec<u8>,
480        f: &mut F,
481    ) where
482        F: FnMut(u8x16, u8x16) -> u8x16,
483    {
484        assert!(
485            values1.len() == values2.len(),
486            "Values1 and values2 must have the same length"
487        );
488
489        let mut write_guard = output.bulk_extend_guard(values1.len());
490        let mut write_slice = write_guard.as_mut_slice();
491        let mut num_written = 0;
492        self.lookup_func(
493            values1,
494            values2,
495            &mut |lookedup_values1, lookedup_values2, num_bytes| {
496                let combined = (f)(lookedup_values1, lookedup_values2);
497                write_slice.write_u8x16(num_written, combined, num_bytes);
498                num_written += num_bytes;
499            },
500        );
501        // write_guard drops here, automatically finalizes to correct length.  We don't have to set final
502        // number of bytes since it is the same as the input, which is the default
503    }
504}
505
506// =================================================================================================
507// REMOVED: SimdJoinedDualTableU32U8Lookup
508// =================================================================================================
509//
510// This kernel was an experiment testing whether TLB swaps or non-colocated lookup tables could be
511// a performance factor. The design used a single concatenated table (`joined_table = table1 ++ table2`)
512// with an offset to access table2 entries: `joined_table[table2_offset + index2]`.
513//
514// **Design:**
515// - Instead of two separate `&[u8]` lookup tables, use one contiguous allocation
516// - Table1 occupies bytes `[0..table1.len()]`
517// - Table2 occupies bytes `[table1.len()..table1.len() + table2.len()]`
518// - Lookups into table2 add the offset: `joined_table[(table2_offset + index) as usize]`
519//
520// **Hypothesis:**
521// Colocating tables might reduce TLB misses when alternating between table1 and table2 lookups,
522// since both tables would share TLB entries for the same large allocation.
523//
524// **Benchmark Results:**
525// In testing on both Intel (AVX-512) and ARM (Apple Silicon) platforms, the joined table design
526// showed NO appreciable performance wins compared to the non-joined `SimdDualTableU32U8Lookup`.
527// The TLB hypothesis did not hold in practice - modern CPUs handle multiple large allocations
528// efficiently, and the offset arithmetic adds minor overhead that offsets any potential gains.
529//
530// The non-joined design is simpler and equally performant, so this kernel was removed.
531// =================================================================================================
532
/// Dual table lookup kernel - u32 to u8 lookup table kernel with custom SIMD function for combining the results.
/// It tries to eliminate thrashing by using internally the single table kernel to write results out first to
/// a local temporary buffer, which is saved, then it looks up the second table, only if the first table returns
/// nonzero results - thus minimizing the number of reads from the second table.
/// By sequencing in this order, we hope to minimize the cache thrashing.
///
/// The user is responsible for generating the lookup tables - so this can be used for different use cases, including
/// CASE..WHEN and bitmasking/filtering.
/// NOTE: this is not as efficient as the newer SimdCascadingTableU32U8Lookup kernel.
/// TODO: deprecate and remove this kernel.  It doesn't really have advantages over the others.
#[derive(Debug, Clone)]
pub struct SimdDualTableU32U8LookupV2<'a> {
    // First-stage lookup wrapped in the single-table kernel so results can be bulk-written.
    lookup1: SimdSingleTableU32U8Lookup<'a>,
    // Second-stage table, read directly (only for lanes where stage 1 was nonzero).
    lookup2: &'a [u8],
    // Reusable scratch buffer holding stage-1 results, one u8x16 per 16 inputs.
    temp_buffer: Vec<u8x16>,
}
549
impl<'a> SimdDualTableU32U8LookupV2<'a> {
    /// Creates the kernel; wraps table1 in the single-table kernel and preallocates the
    /// reusable temp buffer (128 u8x16 slots = room for 2048 stage-1 results before regrowth).
    #[inline]
    pub fn new(lookup_table1: &'a [u8], lookup_table2: &'a [u8]) -> Self {
        Self {
            lookup1: SimdSingleTableU32U8Lookup::new(lookup_table1),
            lookup2: lookup_table2,
            temp_buffer: Vec::with_capacity(128),
        }
    }

    /// Given two slices of equal length &[u32] indices, looks up each one and calls the user given function
    /// on assembled u8x16 results.
    /// - lookup_table1 is used for the first slice, lookup_table2 is used for the second slice.
    /// - Only if the u8 from the first lookup table is nonzero, will the second lookup table be read.
    /// - If the slices do not divide evenly into 16-item chunks, the rest is handled by filling missing values in the
    ///   u8x16 with zeroes.  Thus, the lookup assumes the zero is basically a NOP.
    ///
    /// The lookup function is passed these arguments: (lookedup_values1: u8x16, lookedup_values2: u8x16, num_bytes)
    /// - num_bytes: usually 16, but may be less for the last/remainder chunk.
    ///
    /// Takes `&mut self` because `temp_buffer` is reused across calls as scratch space.
    #[inline]
    pub fn lookup_func<F>(&mut self, values1: &[u32], values2: &[u32], f: &mut F)
    where
        F: FnMut(u8x16, u8x16, usize),
    {
        assert!(
            values1.len() == values2.len(),
            "Values1 and values2 must have the same length"
        );

        // Clear temp_buffer for reuse (capacity is retained, so no reallocation in steady state)
        self.temp_buffer.clear();

        // First read ALL of table1 into the temporary buffer, before touching table2 at all -
        // this sequencing is the whole point of the V2 design (minimize cache thrashing).
        self.lookup1
            .lookup_extend_u8x16_vec(values1, &mut self.temp_buffer);

        let (chunks2, rest2) = values2.as_chunks::<16>();

        // Process full chunks
        for (i, chunk2) in chunks2.iter().enumerate() {
            // temp_buffer[i] holds the 16 stage-1 results for this chunk (same chunk order).
            let table1_result = self.temp_buffer[i];
            let table1_array = table1_result.to_array();

            // Only do lookup2 for positions where table1_result is nonzero
            // Use two u64 loops, somehow it's faster than writing to [u8; 16] directly.
            // NOTE(review): these direct `self.lookup2[...]` indexes panic on out-of-range
            // offsets, unlike the safe_lookup-based kernels - presumably inputs are
            // pre-validated here; confirm before reusing with untrusted offsets.
            let local_chunk = *chunk2;

            // Process high 8 bytes (8-15) into first u64.
            // The reversed loop shifts first then adds, so byte 8 ends up in the least
            // significant byte of result_high and byte 15 in the most significant -
            // i.e. little-endian packing within the u64.
            let mut result_high = 0u64;
            for j in (8..16).rev() {
                result_high <<= 8;
                if table1_array[j] != 0 {
                    result_high += self.lookup2[local_chunk[j] as usize] as u64;
                }
            }

            // Process low 8 bytes (0-7) into second u64, same little-endian packing.
            let mut result_low = 0u64;
            for j in (0..8).rev() {
                result_low <<= 8;
                if table1_array[j] != 0 {
                    result_low += self.lookup2[local_chunk[j] as usize] as u64;
                }
            }

            // Combine into u128 for conversion to u8x16; to_le_bytes puts result_low's
            // LSB (lane 0's value) first, so lane order is preserved.
            let result = ((result_high as u128) << 64) | (result_low as u128);

            // Call user function with both u8x16 results
            (f)(table1_result, u8x16::from(result.to_le_bytes()), 16);
        }

        // Handle the remainder
        if !rest2.is_empty() {
            // The remainder for table1 is already in temp_buffer (lookup_extend_u8x16_vec
            // rounds up with div_ceil, so slot chunks2.len() exists whenever rest2 is nonempty)
            let table1_result = self.temp_buffer[chunks2.len()];
            let table1_array = table1_result.to_array();
            let mut table2_result = [0u8; 16];

            for i in 0..rest2.len() {
                if table1_array[i] != 0 {
                    // Only lookup if the first table returned nonzero
                    table2_result[i] = self.lookup2[rest2[i] as usize];
                }
            }
            (f)(table1_result, u8x16::from(table2_result), rest2.len());
        }
    }
}
641
642/// Dual table lookup kernel using FxHashMap for table2.
643///
644/// This kernel is optimized for the case where table2 has a very small number of entries
645/// (sparse), making a hash map lookup more memory-efficient than a full lookup table.
646/// NOTE: We also tried writing a kernel with PHF-based EntropyMap, but it is slower than the HashMap version.
647///
648/// - Table1: Standard `&[u8]` lookup table (can be large)
649/// - Table2: `FxHashMap<u32, u8>` (optimized for sparse data with fast hashing)
650///
651/// The lookup behavior is the same as `SimdDualTableU32U8Lookup`:
652/// - Table2 is only looked up if table1 returns a non-zero value
653/// - Results are passed to the user function as (u8x16, u8x16, num_bytes)
654///
655/// UPDATE: Testing on Intel Xeon shows that this kernel is maybe 20% slower than the V2 kernel with an optimized writer.
pub struct SimdDualTableWithHashLookup<'a> {
    // Primary table, indexed directly by the u32 key; may be large.
    lookup_table1: &'a [u8],
    // Sparse secondary table; only consulted for lanes where table1 returned nonzero.
    lookup_table2: &'a FxHashMap<u32, u8>,
}
660
661impl<'a> SimdDualTableWithHashLookup<'a> {
662    /// Creates a new dual table lookup kernel with table1 as a slice and table2 as FxHashMap.
663    #[inline]
664    pub fn new(lookup_table1: &'a [u8], lookup_table2: &'a FxHashMap<u32, u8>) -> Self {
665        Self {
666            lookup_table1,
667            lookup_table2,
668        }
669    }
670
671    /// Given two slices of equal length &[u32] indices, looks up each one and calls the user given function
672    /// on assembled u8x16 results.
673    /// - lookup_table1 (slice) is used for the first slice
674    /// - lookup_table2 (FxHashMap) is used for the second slice
675    /// - Table2 is only looked up if table1 returns a non-zero value
676    /// - The user function is passed (lookedup_values1: u8x16, lookedup_values2: u8x16, num_bytes)
677    #[inline]
678    pub fn lookup_func<F>(&self, values1: &[u32], values2: &[u32], f: &mut F)
679    where
680        F: FnMut(u8x16, u8x16, usize),
681    {
682        assert!(
683            values1.len() == values2.len(),
684            "Values1 and values2 must have the same length"
685        );
686        let (chunks1, rest1) = values1.as_chunks::<16>();
687        let (chunks2, rest2) = values2.as_chunks::<16>();
688
689        for (chunk1, chunk2) in chunks1.iter().zip(chunks2.iter()) {
690            let values1 = lookup_from_offsets(self.lookup_table1, chunk1);
691
692            // Conditional hash lookup for table2 - only where table1 is nonzero
693            let mut values2 = [0u8; 16];
694            for i in 0..16 {
695                if values1[i] != 0 {
696                    values2[i] = self.lookup_table2.get(&chunk2[i]).copied().unwrap_or(0);
697                }
698            }
699
700            (f)(u8x16::from(values1), u8x16::from(values2), 16);
701        }
702
703        // Handle the rest
704        if !rest1.is_empty() {
705            let mut values1 = [0u8; 16];
706            let mut values2 = [0u8; 16];
707            for i in 0..rest1.len() {
708                values1[i] = self.lookup_table1[rest1[i] as usize];
709                if values1[i] != 0 {
710                    values2[i] = self.lookup_table2.get(&rest2[i]).copied().unwrap_or(0);
711                }
712            }
713            (f)(u8x16::from(values1), u8x16::from(values2), rest1.len());
714        }
715    }
716
717    /// Convenience function which does dual lookup, combines the results using a user-defined combiner function,
718    /// and extends the combined results into a Vec (pushing all combined results)
719    #[inline]
720    pub fn lookup_into_vec<F>(
721        &self,
722        values1: &[u32],
723        values2: &[u32],
724        output: &mut Vec<u8>,
725        f: &mut F,
726    ) where
727        F: FnMut(u8x16, u8x16) -> u8x16,
728    {
729        assert!(
730            values1.len() == values2.len(),
731            "Values1 and values2 must have the same length"
732        );
733
734        let mut write_guard = output.bulk_extend_guard(values1.len());
735        let mut write_slice = write_guard.as_mut_slice();
736        let mut num_written = 0;
737        self.lookup_func(
738            values1,
739            values2,
740            &mut |lookedup_values1, lookedup_values2, num_bytes| {
741                let combined = (f)(lookedup_values1, lookedup_values2);
742                write_slice.write_u8x16(num_written, combined, num_bytes);
743                num_written += num_bytes;
744            },
745        );
746    }
747}
748
749/// SIMD "Cascading" 2nd/3rd Table Lookup Kernel
750///
751/// This kernel is designed to "cascade" and build on top of the primary SingleTable kernel to efficiently look up
752/// secondary or additional tables.  How does this work?
753/// - First call [SimdSingleTableU32U8Lookup] to look up the primary table, using the
754///   `lookup_compress_into_nonzeroes()` method.  This returns compressed results and indices of the nonzero results.
755/// - Now feed these Vecs into this kernel, which uses compressed output to do a packed lookup into the second
756///   table.  This is faster than having to filter all the results from the first kernel.
757/// - The lookup function is called for nonzero table1 results and looked up second table lookups, and should
758///   return results for all 16 values in the u8x16.
759/// - Then, this kernel will COMPRESS the results and again output nonzero results and indices, filtered from the
760///   input.
761///
762/// Basically, this kernel can be cascaded for additional tables.
763///
764/// The theory is that this cascading and packed lookup approach allows us to come closest to kernels where
765/// even with multiple tables, the runtime is roughly O(num_nonzero_lookups).
766/// UPDATE 12/2/2025: Intel Xeon results show that, even at huge (15M) tables, this results in a 40% speedup over
767///   the V2 kernel.  The speedups increase for smaller table sizes - 4M shows over 50% increase, and even bigger
768///   for smaller tables - which shows that this design inherently scales well.
#[derive(Debug, Clone)]
pub struct SimdCascadingTableU32U8Lookup<'a> {
    // Secondary (or further cascading) lookup table, indexed by u32 keys that are
    // gathered from the caller-supplied `values` slice during cascading_lookup().
    lookup_table: &'a [u8],
}
773
impl<'a> SimdCascadingTableU32U8Lookup<'a> {
    /// Wraps a borrowed secondary lookup table; no data is copied.
    #[inline]
    pub fn new(lookup_table: &'a [u8]) -> Self {
        Self { lookup_table }
    }

    /// Given a slice of u32 values, looks up each one.
    /// Designed to work in cascading mode.  One needs to pass in the nonzero_results and indices output from
    /// [SimdSingleTableU32U8Lookup]::lookup_compress_into_nonzeroes(), along with the values (which are the keys
    /// for the lookup table in this struct).
    ///
    /// For this to be efficient, the length of values probably should be at least hundreds or thousands of values.
    ///
    /// ## Arguments
    /// - `values` - &[u32] of indices to lookup.  NOTE: these are ORIGINAL values, NOT filtered, thus
    ///   its length should be the same length as the values fed into [SimdSingleTableU32U8Lookup] kernel.
    ///   In other words, the length of values will probably be larger than in_nonzero_results.
    /// - `in_nonzero_results` - &[u8] of nonzero results from [SimdSingleTableU32U8Lookup]::lookup_compress_into_nonzeroes()
    /// - `in_indices` - &[u32] of indices from [SimdSingleTableU32U8Lookup]::lookup_compress_into_nonzeroes()
    ///   These indices should be indices into the values array.
    /// - `f` - function to mix the results from nonzero_results and the looked up values from this lookup table.
    ///         The results (u8x16) returned from this function, will be zero-compressed along with indices to
    ///         generate more nonzero output.
    /// - `out_results` - &mut Vec<u8> to store the nonzero results from the lookup function f
    /// - `out_indices` - &mut Vec<u32>, basically same as input indices but with zeroes compressed out
    #[inline]
    pub fn cascading_lookup<F>(&self,
        values: &[u32],
        in_nonzero_results: &[u8],
        in_indices: &[u32],
        f: F,
        out_results: &mut Vec<u8>,
        out_indices: &mut Vec<u32>)
    where
        F: Fn(u8x16, u8x16) -> u8x16,
    {
        // Use BulkVecExtender for bulk mutable slices - enables VCOMPRESS and efficient writes
        // Ensure at least 16 elements for compress_store functions
        // Allocate extra space to ensure remainder always has 16 elements available
        // (the compressed output can never exceed the input count, so len + 16 always suffices)
        let min_len = (in_nonzero_results.len() + 16).max(16);
        let mut result_guard = out_results.bulk_extend_guard(min_len);
        let result_slice = result_guard.as_mut_slice();
        let mut indices_guard = out_indices.bulk_extend_guard(min_len);
        let indices_slice = indices_guard.as_mut_slice();
        // Running count of compressed (nonzero) outputs written into BOTH slices in lockstep.
        let mut num_written = 0;

        let zeroes = u8x16::splat(0);

        let (in_nonzero_chunks, in_nonzero_rest) = in_nonzero_results.as_chunks::<16>();
        let (in_indices_chunks, in_indices_rest) = in_indices.as_chunks::<16>();

        for (nonzero_chunk, indices_chunk) in in_nonzero_chunks.iter().zip(in_indices_chunks.iter()) {
            let in_results = u8x16::from(*nonzero_chunk);
            let in_indices_simd = u32x16::from(*indices_chunk);

            // Gather the lookup keys from the values array
            // NOTE: This is a great use for SIMD GATHER, since the indices should be very very close together
            //       in memory, so it should benefit from caching
            // Scale is 4 bytes per u32 element
            let lookup_keys = gather_u32index_u32(in_indices_simd, values, 4);

            // Lookup the values from the lookup table.  We hope this is much faster than a branch-based lookup.
            // TODO: switch this back to scalar lookup if this turns out to be slow
            let lookedup_values = gather_u32index_u8(lookup_keys, self.lookup_table, 1);

            // Mix the results and compress the output
            let mixed_results = f(in_results, lookedup_values);

            // Check which values are nonzero and convert to bitmask
            // simd_eq returns 0xFF where equal to zero, so invert to get nonzero mask
            // (unary `!` binds tighter than `as`, so the whole bitmask integer is inverted
            //  first and then truncated to the low 16 lane bits)
            let eq_mask = mixed_results.simd_eq(zeroes).to_bitmask();
            let nonzero_mask = !eq_mask as u16;

            // Compress nonzero values into result_slice
            // We always have at least 16 elements available because we allocated (len + 16)
            let num_nonzeroes = compress_store_u8x16(mixed_results, nonzero_mask, &mut result_slice[num_written..]);
            // Same mask, so the index compress writes the same count — its return value is redundant.
            let _ = compress_store_u32x16(in_indices_simd, nonzero_mask, &mut indices_slice[num_written..]);
            num_written += num_nonzeroes;
        }

        // Handle the "rest" of the inputs using scalar loop
        // Build up arrays of remaining elements to pass through the mix function
        if !in_nonzero_rest.is_empty() {
            // Lanes beyond in_nonzero_rest.len() stay zero-padded; they are never read back below.
            let mut in_results_arr = [0u8; 16];
            let mut lookedup_arr = [0u8; 16];
            let mut indices_arr = [0u32; 16];

            for (i, (&in_result, &in_idx)) in in_nonzero_rest.iter().zip(in_indices_rest.iter()).enumerate() {
                // Look up the key from the original values array
                let lookup_key = values[in_idx as usize];
                // Look up the value from the lookup table
                let lookedup_value = self.lookup_table[lookup_key as usize];

                in_results_arr[i] = in_result;
                lookedup_arr[i] = lookedup_value;
                indices_arr[i] = in_idx;
            }

            // Call the mix function on the padded arrays
            let mixed = f(u8x16::from(in_results_arr), u8x16::from(lookedup_arr));
            let mixed_arr = mixed.to_array();

            // Write nonzero results only for the valid elements
            for i in 0..in_nonzero_rest.len() {
                if mixed_arr[i] != 0 {
                    result_slice[num_written] = mixed_arr[i];
                    indices_slice[num_written] = indices_arr[i];
                    num_written += 1;
                }
            }
        }

        // Commit the compressed element count to both guards so the Vecs end up sized to
        // exactly the nonzero output (the extra slack reserved above is dropped).
        result_guard.set_written(num_written);
        indices_guard.set_written(num_written);
    }
}
890
891// =================================================================================================
892// DESIGN SKETCH: Bitmap-Based Cascading Lookup (Alternative to Index-Based Design)
893// =================================================================================================
894//
895// The current `SimdCascadingTableU32U8Lookup` outputs explicit `Vec<u32>` indices alongside results.
896// An alternative design could output **bitmaps** instead of indices, which may be advantageous in
897// certain scenarios.
898//
899// ## Current Design (Index-Based)
900//
901// ```text
902// First stage output:
903//   nonzero_results: [x, y, z, w, v]     (packed u8 values)
904//   indices:         [1, 2, 4, 7, 9]     (explicit u32 positions, 4 bytes each)
905//
906// Cascading stage:
907//   - VGATHER from values2 using indices
908//   - Combine and VCOMPRESS results + indices
909//   - Output: filtered results + filtered indices
910// ```
911//
912// ## Alternative Design (Bitmap-Based)
913//
914// ```text
915// First stage output:
916//   nonzero_results: [x, y, z, w, v]     (packed u8 values)
917//   bitmap:          0b10_1001_0110      (bits set at original positions 1,2,4,7,9)
918//
919// Cascading stage:
920//   - Process values2 sequentially in chunks aligned with bitmap
921//   - For each 16-element chunk: VCOMPRESS using bitmap bits → packed lookup keys
922//   - Combine results
923//   - Use PDEP to map result mask back to original coordinates
924//   - Output: filtered results + filtered bitmap (in original row coordinates)
925// ```
926//
927// ## Memory Footprint Comparison
928//
929// For N input elements with K nonzero results:
930// - Index-based:  K bytes (results) + 4K bytes (indices) = 5K bytes
931// - Bitmap-based: K bytes (results) + N/8 bytes (bitmap) ≈ K + N/8 bytes
932//
933// Example: N=5000, 20% density (K=1000)
934// - Index-based:  5000 bytes
935// - Bitmap-based: 1625 bytes (~3x smaller intermediate data)
936//
937// ## The PDEP Trick for Bitmap Coordinate Transformation
938//
939// After the combine function, we have a `result_mask` in **compressed coordinates** (which of the
940// packed elements survived). We need to transform this back to **original row coordinates**.
941//
942// ```text
943// input_bitmap:   0b10_1001_0110    (original positions 1,2,4,7,9 were active)
944// packed results: [x', 0, z', w', 0] (after combine: positions 0,2,3 in compressed space survived)
945// result_mask:    0b01101            (compressed coordinates: bits 0,2,3 set)
946//
947// PDEP(result_mask, input_bitmap) → 0b00_1001_0010
948//   - Deposits result_mask bits at positions specified by input_bitmap
949//   - Result: bits 1,4,7 set in original coordinates
950// ```
951//
952// The BMI2 `PDEP` instruction (available on Intel Haswell+ and AMD Zen+) enables this transformation
953// efficiently, operating on 64 bits at a time.
954//
955// ## When Bitmap Design Makes Sense
956//
957// **Use bitmap-based design when:**
958// 1. The final consumer needs a **bitmap output** (not numeric indices)
959//    - Example: Combining multiple filter results with AND/OR operations
960//    - Example: Feeding into another bitmap-based operation
961// 2. The bitmap **propagates through multiple cascade stages** without index materialization
962// 3. **Memory bandwidth is the primary bottleneck** and 3x smaller intermediates matter
963// 4. You only need **counts** (popcount on final bitmap) not actual indices
964//
965// **Use index-based design (current) when:**
966// 1. The final output requires explicit `(result, row_index)` pairs
967// 2. Indices have good **locality** (close together) - VGATHER benefits from caching
968// 3. You want **simpler code** without coordinate transformation complexity
969// 4. Density is moderate (10-30%) - most bitmap chunks are non-empty anyway
970//
971// ## Implementation Considerations
972//
973// A bitmap-based kernel would require:
974//
975// 1. **First stage**: `lookup_compress_into_bitmap()` that outputs:
976//    - `nonzero_results: Vec<u8>` (same as today)
977//    - `bitmap: Vec<u64>` (64 bits per u64, representing which rows had nonzero results)
978//
979// 2. **Cascading stage**: `cascading_lookup_bitmap()` that:
980//    - Processes values2 in chunks aligned with bitmap u64 words
981//    - Uses bitmap bits as VCOMPRESS mask to pack lookup keys
982//    - After combine, uses PDEP to transform result_mask to original coordinates
983//    - Accumulates final bitmap in original row coordinates
984//
985// 3. **Index materialization** (if needed): `bitmap_to_indices()` that:
986//    - Iterates through final bitmap, extracting set bit positions
987//    - Only called if the consumer actually needs numeric indices
988//
989// ## Tradeoffs Summary
990//
991// | Aspect                     | Index-Based (Current)      | Bitmap-Based               |
992// |----------------------------|----------------------------|----------------------------|
993// | Intermediate memory        | 5K bytes per K nonzero     | K + N/8 bytes              |
994// | Coordinate system          | Always in original coords  | Requires PDEP transform    |
995// | Final index generation     | Indices flow through       | Must iterate bitmap bits   |
996// | Code complexity            | Simpler                    | More complex               |
997// | Best for                   | Sparse results needed      | Bitmap operations/counts   |
998// | VGATHER vs VCOMPRESS       | VGATHER (scattered read)   | VCOMPRESS (sequential read)|
999//
1000// The current index-based design is preferred when explicit indices are needed in the output.
1001// The bitmap-based design would be advantageous only when the downstream consumer operates on
1002// bitmaps directly, avoiding the index materialization step entirely.
1003// =================================================================================================
1004
1005
1006#[cfg(test)]
1007mod tests {
1008    use super::*;
1009
1010    #[test]
1011    fn test_pipelined_single_table_lookup_basic() {
1012        // Create a simple lookup table: index -> index as u8
1013        let lookup_table: Vec<u8> = (0..256).map(|i| i as u8).collect();
1014        let pipelined_lookup = PipelinedSingleTableU32U8Lookup::new(&lookup_table);
1015
1016        // Test with exactly 16 values (one chunk)
1017        let values = vec![
1018            10u32, 20, 30, 40, 50, 60, 70, 80, 90, 100, 110, 120, 130, 140, 150, 160,
1019        ];
1020        let mut results = Vec::new();
1021        pipelined_lookup.lookup_func(&values, &mut |lookedup_values, num_bytes| {
1022            let array = lookedup_values.to_array();
1023            results.extend_from_slice(&array[..num_bytes]);
1024        });
1025
1026        let expected: Vec<u8> = values.iter().map(|&v| v as u8).collect();
1027        assert_eq!(results, expected);
1028    }
1029
1030    #[test]
1031    fn test_pipelined_single_table_lookup_multiple_chunks() {
1032        // Create a simple lookup table
1033        let lookup_table: Vec<u8> = (0..256).map(|i| i as u8).collect();
1034        let pipelined_lookup = PipelinedSingleTableU32U8Lookup::new(&lookup_table);
1035
1036        // Test with 35 values (2 full chunks + 3 remainder)
1037        let values: Vec<u32> = (1..36).collect();
1038        let mut results = Vec::new();
1039        pipelined_lookup.lookup_func(&values, &mut |lookedup_values, num_bytes| {
1040            let array = lookedup_values.to_array();
1041            results.extend_from_slice(&array[..num_bytes]);
1042        });
1043
1044        let expected: Vec<u8> = values.iter().map(|&v| v as u8).collect();
1045        assert_eq!(results, expected);
1046    }
1047
1048    #[test]
1049    fn test_pipelined_single_table_lookup_into_vec() {
1050        // Create a lookup table where each index maps to its double (mod 256)
1051        let lookup_table: Vec<u8> = (0..256).map(|i| ((i * 2) % 256) as u8).collect();
1052        let pipelined_lookup = PipelinedSingleTableU32U8Lookup::new(&lookup_table);
1053
1054        let values = vec![1u32, 2, 3, 4, 5, 100, 150, 200];
1055        let mut buffer = Vec::new();
1056        pipelined_lookup.lookup_into_vec(&values, &mut buffer);
1057
1058        let expected: Vec<u8> = values.iter().map(|&v| ((v * 2) % 256) as u8).collect();
1059        assert_eq!(buffer, expected);
1060    }
1061
1062    #[test]
1063    fn test_pipelined_vs_original_consistency() {
1064        // Test that pipelined version produces same results as original
1065        let lookup_table: Vec<u8> = (0..256).map(|i| (i ^ 0xAA) as u8).collect();
1066
1067        let original_lookup = SimdSingleTableU32U8Lookup::new(&lookup_table);
1068        let pipelined_lookup = PipelinedSingleTableU32U8Lookup::new(&lookup_table);
1069
1070        // Test with various sizes
1071        for size in [5, 16, 17, 32, 33, 100] {
1072            let values: Vec<u32> = (0..size).map(|i| (i * 7) % 256).collect();
1073
1074            let mut original_results = Vec::new();
1075            original_lookup.lookup_into_vec(&values, &mut original_results);
1076
1077            let mut pipelined_results = Vec::new();
1078            pipelined_lookup.lookup_into_vec(&values, &mut pipelined_results);
1079
1080            assert_eq!(
1081                original_results, pipelined_results,
1082                "Results differ for size {}",
1083                size
1084            );
1085        }
1086    }
1087
1088    #[test]
1089    fn test_single_table_lookup_into_vec() {
1090        // Create a simple lookup table
1091        let lookup_table = vec![0u8, 10, 20, 30, 40];
1092        let lookup = SimdSingleTableU32U8Lookup::new(&lookup_table);
1093
1094        // Test with values that are less than lookup table size
1095        let values = vec![0u32, 1, 2, 3, 4, 1, 2, 3];
1096        let mut result = Vec::new();
1097        lookup.lookup_into_vec(&values, &mut result);
1098
1099        assert_eq!(result.len(), values.len());
1100        assert_eq!(result[0], 0);
1101        assert_eq!(result[1], 10);
1102        assert_eq!(result[2], 20);
1103        assert_eq!(result[3], 30);
1104        assert_eq!(result[4], 40);
1105        assert_eq!(result[5], 10);
1106        assert_eq!(result[6], 20);
1107        assert_eq!(result[7], 30);
1108    }
1109
1110    #[test]
1111    fn test_single_table_lookup_compress_into_nonzeroes_small_input() {
1112        // Test lookup_compress_into_nonzeroes with < 16 elements
1113        let lookup_table: Vec<u8> = (0..256).map(|i| if i % 2 == 0 { i as u8 } else { 0 }).collect();
1114        let lookup = SimdSingleTableU32U8Lookup::new(&lookup_table);
1115
1116        // Test with 5 values (less than 16)
1117        let values = vec![0u32, 1, 2, 3, 4];
1118        let mut nonzero_results: Vec<u8> = Vec::new();
1119        let mut indices: Vec<u32> = Vec::new();
1120
1121        lookup.lookup_compress_into_nonzeroes(&values, &mut nonzero_results, &mut indices, 0);
1122
1123        // Expected: indices 2, 4 are nonzero (lookup_table[0]=0, [1]=0, [2]=2, [3]=0, [4]=4)
1124        assert_eq!(nonzero_results.len(), 2);
1125        assert_eq!(nonzero_results, vec![2, 4]);
1126        assert_eq!(indices, vec![2, 4]);
1127    }
1128
1129    #[test]
1130    fn test_cascading_lookup_small_input() {
1131        // Test cascading_lookup with < 16 elements in input
1132        let lookup_table1: Vec<u8> = (0..256).map(|i| i as u8).collect();
1133        let lookup_table2: Vec<u8> = (0..256).map(|i| i as u8).collect();
1134
1135        let single_table = SimdSingleTableU32U8Lookup::new(&lookup_table1);
1136        let cascading_table = SimdCascadingTableU32U8Lookup::new(&lookup_table2);
1137
1138        // Test with 3 values (less than 16)
1139        let values1: Vec<u32> = vec![1, 2, 3];
1140        let values2: Vec<u32> = vec![10, 20, 30];
1141
1142        let mut nonzero_results: Vec<u8> = Vec::new();
1143        let mut indices: Vec<u32> = Vec::new();
1144        single_table.lookup_compress_into_nonzeroes(&values1, &mut nonzero_results, &mut indices, 0);
1145
1146        // All 3 values are nonzero
1147        assert_eq!(nonzero_results.len(), 3);
1148        assert_eq!(indices, vec![0, 1, 2]);
1149
1150        // Cascading lookup
1151        let mut out_results: Vec<u8> = Vec::new();
1152        let mut out_indices: Vec<u32> = Vec::new();
1153
1154        cascading_table.cascading_lookup(
1155            &values2,
1156            &nonzero_results,
1157            &indices,
1158            |v1, v2| v1 & v2,
1159            &mut out_results,
1160            &mut out_indices,
1161        );
1162
1163        // Expected mixed results:
1164        //   v1=1, v2=10, mixed=1&10=0
1165        //   v1=2, v2=20, mixed=2&20=0
1166        //   v1=3, v2=30, mixed=3&30=2
1167        // Only index 2 has nonzero result
1168        assert_eq!(out_results.len(), 1);
1169        assert_eq!(out_results[0], 2);  // 3 & 30 = 2
1170        assert_eq!(out_indices[0], 2);
1171    }
1172
1173    #[test]
1174    fn test_dual_table_lookup_into_vec() {
1175        // Create two simple lookup tables
1176        let lookup_table1 = vec![0u8, 1, 2, 3, 4];
1177        let lookup_table2 = vec![0u8, 10, 20, 30, 40];
1178        let lookup = SimdDualTableU32U8Lookup::new(&lookup_table1, &lookup_table2);
1179
1180        // Test with values that are less than lookup table size
1181        let values1 = vec![0u32, 1, 2, 3, 4, 1, 2, 3];
1182        let values2 = vec![0u32, 1, 2, 3, 4, 1, 2, 3];
1183
1184        // Use bitwise OR as the combiner function
1185        let mut result = Vec::new();
1186        lookup.lookup_into_vec(&values1, &values2, &mut result, &mut |v1, v2| v1 | v2);
1187
1188        assert_eq!(result.len(), values1.len());
1189        assert_eq!(result[0], 0); // 0
1190        assert_eq!(result[1], 1 | 10); // 11
1191        assert_eq!(result[2], 2 | 20); // 22
1192        assert_eq!(result[3], 3 | 30); // 31
1193        assert_eq!(result[4], 4 | 40); // 44
1194        assert_eq!(result[5], 1 | 10); // 11
1195        assert_eq!(result[6], 2 | 20); // 22
1196        assert_eq!(result[7], 3 | 30); // 31
1197    }
1198
1199    #[test]
1200    fn test_dual_table_lookup_into_vec_large() {
1201        // Test with a larger dataset that spans multiple u8x16 chunks
1202        let lookup_table1 = vec![1u8; 100];
1203        let lookup_table2 = vec![2u8; 100];
1204        let lookup = SimdDualTableU32U8Lookup::new(&lookup_table1, &lookup_table2);
1205
1206        // Create 50 values (more than 16, so it tests multiple chunks)
1207        let values1 = vec![0u32; 50];
1208        let values2 = vec![0u32; 50];
1209
1210        // Use addition as the combiner function
1211        let mut result = Vec::new();
1212        lookup.lookup_into_vec(&values1, &values2, &mut result, &mut |v1, v2| v1 + v2);
1213
1214        assert_eq!(result.len(), 50);
1215        // All results should be 1 + 2 = 3
1216        for &val in &result {
1217            assert_eq!(val, 3);
1218        }
1219    }
1220
1221    #[test]
1222    fn test_dual_table_v2_lookup_func_basic() {
1223        // Create two simple lookup tables
1224        let lookup_table1 = vec![0u8, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10];
1225        let lookup_table2 = vec![0u8, 10, 20, 30, 40, 50, 60, 70, 80, 90, 100];
1226        let mut lookup = SimdDualTableU32U8LookupV2::new(&lookup_table1, &lookup_table2);
1227
1228        // Test with values that are less than lookup table size
1229        let values1 = vec![0u32, 1, 2, 3, 4];
1230        let values2 = vec![1u32, 2, 3, 4, 5];
1231
1232        let mut table1_results = Vec::new();
1233        let mut table2_results = Vec::new();
1234        let mut num_bytes_list = Vec::new();
1235
1236        lookup.lookup_func(&values1, &values2, &mut |v1, v2, num_bytes| {
1237            table1_results.push(v1);
1238            table2_results.push(v2);
1239            num_bytes_list.push(num_bytes);
1240        });
1241
1242        // 5 values = 0 full chunks + 1 remainder of 5
1243        assert_eq!(num_bytes_list.len(), 1);
1244        assert_eq!(num_bytes_list[0], 5);
1245
1246        // Check table1 results
1247        let v1_array = table1_results[0].as_array();
1248        assert_eq!(v1_array[0], 0);
1249        assert_eq!(v1_array[1], 1);
1250        assert_eq!(v1_array[2], 2);
1251        assert_eq!(v1_array[3], 3);
1252        assert_eq!(v1_array[4], 4);
1253
1254        // Check table2 results - should only be looked up where table1 is nonzero
1255        let v2_array = table2_results[0].as_array();
1256        assert_eq!(v2_array[0], 0); // table1[0] == 0, so table2[0] should be 0 (not looked up)
1257        assert_eq!(v2_array[1], 20); // table1[1] == 1 (nonzero), so table2[1] == lookup_table2[values2[1]] == lookup_table2[2] == 20
1258        assert_eq!(v2_array[2], 30); // table1[2] == 2 (nonzero), so table2[2] == lookup_table2[values2[2]] == lookup_table2[3] == 30
1259        assert_eq!(v2_array[3], 40); // table1[3] == 3 (nonzero), so table2[3] == lookup_table2[values2[3]] == lookup_table2[4] == 40
1260        assert_eq!(v2_array[4], 50); // table1[4] == 4 (nonzero), so table2[4] == lookup_table2[values2[4]] == lookup_table2[5] == 50
1261    }
1262
1263    #[test]
1264    fn test_dual_table_v2_lookup_func_remainder() {
1265        // Test remainder handling - values that don't divide evenly into 16
1266        let lookup_table1 = vec![0u8, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10];
1267        let lookup_table2 = vec![0u8, 10, 20, 30, 40, 50, 60, 70, 80, 90, 100];
1268        let mut lookup = SimdDualTableU32U8LookupV2::new(&lookup_table1, &lookup_table2);
1269
1270        // Create 25 values (16 + 9 remainder)
1271        let values1: Vec<u32> = (0..25).map(|i| (i % 5) as u32).collect();
1272        let values2: Vec<u32> = (0..25).map(|i| ((i % 5) + 1) as u32).collect();
1273
1274        let mut table1_results = Vec::new();
1275        let mut table2_results = Vec::new();
1276        let mut num_bytes_list = Vec::new();
1277
1278        lookup.lookup_func(&values1, &values2, &mut |v1, v2, num_bytes| {
1279            table1_results.push(v1);
1280            table2_results.push(v2);
1281            num_bytes_list.push(num_bytes);
1282        });
1283
1284        // Should have 2 chunks: one full (16) and one remainder (9)
1285        assert_eq!(num_bytes_list.len(), 2);
1286        assert_eq!(num_bytes_list[0], 16);
1287        assert_eq!(num_bytes_list[1], 9);
1288
1289        // Check first chunk (full 16)
1290        let v1_chunk0 = table1_results[0].as_array();
1291        let v2_chunk0 = table2_results[0].as_array();
1292        for i in 0..16 {
1293            let expected_v1 = lookup_table1[values1[i] as usize];
1294            assert_eq!(v1_chunk0[i], expected_v1);
1295            if expected_v1 != 0 {
1296                assert_eq!(v2_chunk0[i], lookup_table2[values2[i] as usize]);
1297            } else {
1298                assert_eq!(v2_chunk0[i], 0);
1299            }
1300        }
1301
1302        // Check remainder chunk (9 elements)
1303        let v1_remainder = table1_results[1].as_array();
1304        let v2_remainder = table2_results[1].as_array();
1305        for i in 0..9 {
1306            let expected_v1 = lookup_table1[values1[16 + i] as usize];
1307            assert_eq!(v1_remainder[i], expected_v1);
1308            if expected_v1 != 0 {
1309                assert_eq!(v2_remainder[i], lookup_table2[values2[16 + i] as usize]);
1310            } else {
1311                assert_eq!(v2_remainder[i], 0);
1312            }
1313        }
1314        // Remaining positions in the u8x16 should be zero
1315        for i in 9..16 {
1316            assert_eq!(v1_remainder[i], 0);
1317            assert_eq!(v2_remainder[i], 0);
1318        }
1319    }
1320
1321    #[test]
1322    fn test_dual_table_v2_lookup_func_zero_filtering() {
1323        // Test that lookup2 is only performed when table1 is nonzero
1324        let lookup_table1 = vec![0u8, 0, 0, 5, 0, 0, 0, 10, 0, 0, 0];
1325        let lookup_table2 = vec![0u8, 100, 200, 50, 150, 250, 60, 70, 80, 90, 100];
1326        let mut lookup = SimdDualTableU32U8LookupV2::new(&lookup_table1, &lookup_table2);
1327
1328        // values1 will map to indices that are mostly zero in lookup_table1
1329        let values1 = vec![0u32, 1, 2, 3, 4, 5, 6, 7];
1330        let values2 = vec![1u32, 2, 3, 4, 5, 6, 7, 8]; // These would normally map to 100, 200, etc.
1331
1332        let mut table1_results = Vec::new();
1333        let mut table2_results = Vec::new();
1334
1335        lookup.lookup_func(&values1, &values2, &mut |v1, v2, _num_bytes| {
1336            table1_results.push(v1);
1337            table2_results.push(v2);
1338        });
1339
1340        let v1_array = table1_results[0].as_array();
1341        let v2_array = table2_results[0].as_array();
1342
1343        // Check that table2 is only looked up where table1 is nonzero
1344        assert_eq!(v1_array[0], 0);
1345        assert_eq!(v2_array[0], 0); // table1[0] == 0, so table2 not looked up
1346
1347        assert_eq!(v1_array[1], 0);
1348        assert_eq!(v2_array[1], 0); // table1[1] == 0, so table2 not looked up
1349
1350        assert_eq!(v1_array[2], 0);
1351        assert_eq!(v2_array[2], 0); // table1[2] == 0, so table2 not looked up
1352
1353        assert_eq!(v1_array[3], 5);
1354        assert_eq!(v2_array[3], 150); // table1[3] == 5 (nonzero), so table2[3] == lookup_table2[values2[3]] == lookup_table2[4] == 150
1355
1356        assert_eq!(v1_array[4], 0);
1357        assert_eq!(v2_array[4], 0); // table1[4] == 0, so table2 not looked up
1358
1359        assert_eq!(v1_array[5], 0);
1360        assert_eq!(v2_array[5], 0); // table1[5] == 0, so table2 not looked up
1361
1362        assert_eq!(v1_array[6], 0);
1363        assert_eq!(v2_array[6], 0); // table1[6] == 0, so table2 not looked up
1364
1365        assert_eq!(v1_array[7], 10);
1366        assert_eq!(v2_array[7], 80); // table1[7] == 10 (nonzero), so table2[7] == lookup_table2[values2[7]] == lookup_table2[8] == 80
1367    }
1368
1369    #[test]
1370    fn test_dual_table_v2_lookup_func_multiple_chunks() {
1371        // Test with multiple full chunks
1372        let lookup_table1 = vec![1u8; 100];
1373        let lookup_table2 = vec![2u8; 100];
1374        let mut lookup = SimdDualTableU32U8LookupV2::new(&lookup_table1, &lookup_table2);
1375
1376        // Create 50 values (3 full chunks: 16 + 16 + 16 + 2 remainder)
1377        let values1: Vec<u32> = (0..50).map(|i| (i % 10) as u32).collect();
1378        let values2: Vec<u32> = (0..50).map(|i| ((i % 10) + 1) as u32).collect();
1379
1380        let mut table1_results = Vec::new();
1381        let mut table2_results = Vec::new();
1382        let mut num_bytes_list = Vec::new();
1383
1384        lookup.lookup_func(&values1, &values2, &mut |v1, v2, num_bytes| {
1385            table1_results.push(v1);
1386            table2_results.push(v2);
1387            num_bytes_list.push(num_bytes);
1388        });
1389
1390        // Should have 4 chunks: 3 full (16 each) + 1 remainder (2)
1391        assert_eq!(num_bytes_list.len(), 4);
1392        assert_eq!(num_bytes_list[0], 16);
1393        assert_eq!(num_bytes_list[1], 16);
1394        assert_eq!(num_bytes_list[2], 16);
1395        assert_eq!(num_bytes_list[3], 2);
1396
1397        // Verify all chunks
1398        let mut global_idx = 0;
1399        for chunk_idx in 0..4 {
1400            let v1_chunk = table1_results[chunk_idx].as_array();
1401            let v2_chunk = table2_results[chunk_idx].as_array();
1402            let chunk_len = num_bytes_list[chunk_idx];
1403
1404            for i in 0..chunk_len {
1405                let expected_v1 = lookup_table1[values1[global_idx] as usize];
1406                assert_eq!(v1_chunk[i], expected_v1);
1407                // Since table1 is always nonzero (lookup_table1 is all 1s), table2 should always be looked up
1408                assert_eq!(v2_chunk[i], lookup_table2[values2[global_idx] as usize]);
1409                global_idx += 1;
1410            }
1411        }
1412    }
1413
1414    #[test]
1415    fn test_dual_table_v2_lookup_func_exact_multiple_of_16() {
1416        // Test with exactly 32 values (exactly 2 chunks, no remainder)
1417        let lookup_table1 = vec![0u8, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10];
1418        let lookup_table2 = vec![0u8, 10, 20, 30, 40, 50, 60, 70, 80, 90, 100];
1419        let mut lookup = SimdDualTableU32U8LookupV2::new(&lookup_table1, &lookup_table2);
1420
1421        let values1: Vec<u32> = (0..32).map(|i| (i % 5) as u32).collect();
1422        let values2: Vec<u32> = (0..32).map(|i| ((i % 5) + 1) as u32).collect();
1423
1424        let mut table1_results = Vec::new();
1425        let mut table2_results = Vec::new();
1426        let mut num_bytes_list = Vec::new();
1427
1428        lookup.lookup_func(&values1, &values2, &mut |v1, v2, num_bytes| {
1429            table1_results.push(v1);
1430            table2_results.push(v2);
1431            num_bytes_list.push(num_bytes);
1432        });
1433
1434        // Should have exactly 2 chunks, no remainder
1435        assert_eq!(num_bytes_list.len(), 2);
1436        assert_eq!(num_bytes_list[0], 16);
1437        assert_eq!(num_bytes_list[1], 16);
1438
1439        // Verify both chunks
1440        let mut global_idx = 0;
1441        for chunk_idx in 0..2 {
1442            let v1_chunk = table1_results[chunk_idx].as_array();
1443            let v2_chunk = table2_results[chunk_idx].as_array();
1444
1445            for i in 0..16 {
1446                let expected_v1 = lookup_table1[values1[global_idx] as usize];
1447                assert_eq!(v1_chunk[i], expected_v1);
1448                if expected_v1 != 0 {
1449                    assert_eq!(v2_chunk[i], lookup_table2[values2[global_idx] as usize]);
1450                } else {
1451                    assert_eq!(v2_chunk[i], 0);
1452                }
1453                global_idx += 1;
1454            }
1455        }
1456    }
1457
1458    #[test]
1459    fn test_dual_table_with_hash_lookup_basic() {
1460        // Create table1 as a regular lookup table
1461        let lookup_table1: Vec<u8> =
1462            (0..256).map(|i| if i % 3 == 0 { 0 } else { i as u8 }).collect();
1463
1464        // Create table2 as a FxHashMap with sparse entries
1465        let mut hash_table2: FxHashMap<u32, u8> = FxHashMap::default();
1466        hash_table2.insert(0, 100);
1467        hash_table2.insert(5, 105);
1468        hash_table2.insert(10, 110);
1469        hash_table2.insert(15, 115);
1470        hash_table2.insert(20, 120);
1471        hash_table2.insert(100, 200);
1472
1473        let lookup = SimdDualTableWithHashLookup::new(&lookup_table1, &hash_table2);
1474
1475        // Test values
1476        let values1: Vec<u32> = vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16];
1477        let values2: Vec<u32> = vec![0, 5, 10, 15, 20, 100, 0, 5, 10, 15, 20, 100, 0, 5, 10, 15];
1478
1479        let mut table1_results = Vec::new();
1480        let mut table2_results = Vec::new();
1481
1482        lookup.lookup_func(&values1, &values2, &mut |v1, v2, _num_bytes| {
1483            table1_results.push(v1);
1484            table2_results.push(v2);
1485        });
1486
1487        assert_eq!(table1_results.len(), 1); // One chunk
1488
1489        let v1_array = table1_results[0].as_array();
1490        let v2_array = table2_results[0].as_array();
1491
1492        // Check that table2 is only looked up where table1 is nonzero
1493        for i in 0..16 {
1494            let expected_v1 = lookup_table1[values1[i] as usize];
1495            assert_eq!(v1_array[i], expected_v1, "v1 mismatch at index {}", i);
1496
1497            if expected_v1 != 0 {
1498                let expected_v2 = hash_table2.get(&values2[i]).copied().unwrap_or(0);
1499                assert_eq!(v2_array[i], expected_v2, "v2 mismatch at index {}", i);
1500            } else {
1501                assert_eq!(v2_array[i], 0, "v2 should be 0 where v1 is 0 at index {}", i);
1502            }
1503        }
1504    }
1505
1506    #[test]
1507    fn test_dual_table_with_hash_lookup_into_vec() {
1508        // Create table1 as a regular lookup table (all nonzero)
1509        let lookup_table1: Vec<u8> = (0..256).map(|i| (i + 1) as u8).collect();
1510
1511        // Create table2 as a FxHashMap
1512        let mut hash_table2: FxHashMap<u32, u8> = FxHashMap::default();
1513        hash_table2.insert(0, 10);
1514        hash_table2.insert(1, 20);
1515        hash_table2.insert(2, 30);
1516        hash_table2.insert(3, 40);
1517        hash_table2.insert(4, 50);
1518
1519        let lookup = SimdDualTableWithHashLookup::new(&lookup_table1, &hash_table2);
1520
1521        let values1: Vec<u32> = vec![0, 1, 2, 3, 4, 5, 6, 7];
1522        let values2: Vec<u32> = vec![0, 1, 2, 3, 4, 0, 1, 2];
1523
1524        let mut result = Vec::new();
1525        lookup.lookup_into_vec(&values1, &values2, &mut result, &mut |v1, v2| v1 & v2);
1526
1527        // Result should be v1 & v2
1528        assert_eq!(result.len(), 8);
1529        // v1[0] = 1, v2[0] = 10, 1 & 10 = 0
1530        assert_eq!(result[0], 1 & 10);
1531        // v1[1] = 2, v2[1] = 20, 2 & 20 = 0
1532        assert_eq!(result[1], 2 & 20);
1533    }
1534
1535    #[test]
1536    fn test_cascading_lookup_basic() {
1537        // Create lookup tables
1538        // Table 1: returns the index value (modulo 256) for indices 0-255
1539        let lookup_table1: Vec<u8> = (0..256).map(|i| i as u8).collect();
1540        // Table 2: returns 2 * index (modulo 256) for indices 0-127
1541        let lookup_table2: Vec<u8> = (0..128).map(|i| ((i * 2) % 256) as u8).collect();
1542
1543        // Create kernels
1544        let single_table = SimdSingleTableU32U8Lookup::new(&lookup_table1);
1545        let cascading_table = SimdCascadingTableU32U8Lookup::new(&lookup_table2);
1546
1547        // Create test values for table1 (indices into lookup_table1)
1548        // Some will be 0, some nonzero
1549        let values1: Vec<u32> = vec![0, 1, 2, 3, 0, 5, 0, 7, 8, 9, 0, 11, 12, 0, 14, 15];
1550
1551        // Create test values for table2 (indices into lookup_table2)
1552        // These should be looked up only where table1 is nonzero
1553        let values2: Vec<u32> = vec![10, 20, 30, 40, 50, 60, 70, 80, 90, 100, 110, 120, 1, 2, 3, 4];
1554
1555        // Step 1: Call SingleTable::lookup_compress_into_nonzeroes
1556        let mut nonzero_results: Vec<u8> = Vec::new();
1557        let mut indices: Vec<u32> = Vec::new();
1558        single_table.lookup_compress_into_nonzeroes(&values1, &mut nonzero_results, &mut indices, 0);
1559
1560        // Verify the nonzero results from step 1
1561        // Values 0, 4, 6, 10, 13 are zero in lookup_table1
1562        // Nonzero: 1, 2, 3, 5, 7, 8, 9, 11, 12, 14, 15 (11 values)
1563        assert_eq!(nonzero_results.len(), 11, "Expected 11 nonzero results from table1");
1564        assert_eq!(indices.len(), 11, "Expected 11 indices");
1565
1566        // Verify the indices are correct (positions of nonzero values)
1567        let expected_indices: Vec<u32> = vec![1, 2, 3, 5, 7, 8, 9, 11, 12, 14, 15];
1568        assert_eq!(indices, expected_indices);
1569
1570        // Verify the nonzero results
1571        let expected_nonzero: Vec<u8> = vec![1, 2, 3, 5, 7, 8, 9, 11, 12, 14, 15];
1572        assert_eq!(nonzero_results, expected_nonzero);
1573
1574        // Step 2: Call CascadingTable::cascading_lookup
1575        let mut out_results: Vec<u8> = Vec::new();
1576        let mut out_indices: Vec<u32> = Vec::new();
1577
1578        // Mix function: bitwise AND
1579        cascading_table.cascading_lookup(
1580            &values2,
1581            &nonzero_results,
1582            &indices,
1583            |v1, v2| v1 & v2,
1584            &mut out_results,
1585            &mut out_indices,
1586        );
1587
1588        // Verify cascading results
1589        // For each nonzero index i:
1590        //   - in_result = lookup_table1[values1[i]] = values1[i] (since lookup_table1[x] = x)
1591        //   - lookup_key = values2[i]
1592        //   - lookedup_value = lookup_table2[lookup_key] = 2 * lookup_key
1593        //   - mixed = in_result & lookedup_value
1594        // Expected (for indices 1,2,3,5,7,8,9,11,12,14,15):
1595        //   index 1: v1=1, lookup_key=20, v2=40, mixed=1&40=0
1596        //   index 2: v1=2, lookup_key=30, v2=60, mixed=2&60=0
1597        //   index 3: v1=3, lookup_key=40, v2=80, mixed=3&80=0
1598        //   index 5: v1=5, lookup_key=60, v2=120, mixed=5&120=0
1599        //   index 7: v1=7, lookup_key=80, v2=160%256=160, mixed=7&160=0
1600        //   index 8: v1=8, lookup_key=90, v2=180%256=180, mixed=8&180=0
1601        //   index 9: v1=9, lookup_key=100, v2=200%256=200, mixed=9&200=8
1602        //   index 11: v1=11, lookup_key=120, v2=240%256=240, mixed=11&240=0
1603        //   index 12: v1=12, lookup_key=1, v2=2, mixed=12&2=0
1604        //   index 14: v1=14, lookup_key=3, v2=6, mixed=14&6=6
1605        //   index 15: v1=15, lookup_key=4, v2=8, mixed=15&8=8
1606
1607        // Only indices 9, 14, 15 have nonzero mixed results
1608        assert_eq!(out_results.len(), 3, "Expected 3 nonzero cascading results, got {:?}", out_results);
1609        assert_eq!(out_indices.len(), 3, "Expected 3 output indices");
1610
1611        // Check values
1612        assert_eq!(out_results[0], 8);  // index 9: 9 & 200 = 8
1613        assert_eq!(out_results[1], 6);  // index 14: 14 & 6 = 6
1614        assert_eq!(out_results[2], 8);  // index 15: 15 & 8 = 8
1615
1616        assert_eq!(out_indices[0], 9);
1617        assert_eq!(out_indices[1], 14);
1618        assert_eq!(out_indices[2], 15);
1619    }
1620
1621    #[test]
1622    fn test_cascading_lookup_remainder() {
1623        // Test the remainder handling (< 16 elements)
1624        let lookup_table1: Vec<u8> = (0..256).map(|i| i as u8).collect();
1625        let lookup_table2: Vec<u8> = (0..256).map(|i| i as u8).collect();
1626
1627        let single_table = SimdSingleTableU32U8Lookup::new(&lookup_table1);
1628        let cascading_table = SimdCascadingTableU32U8Lookup::new(&lookup_table2);
1629
1630        // Create 5 values (less than 16, so remainder only)
1631        let values1: Vec<u32> = vec![1, 2, 3, 4, 5];
1632        let values2: Vec<u32> = vec![10, 20, 30, 40, 50];
1633
1634        let mut nonzero_results: Vec<u8> = Vec::new();
1635        let mut indices: Vec<u32> = Vec::new();
1636        single_table.lookup_compress_into_nonzeroes(&values1, &mut nonzero_results, &mut indices, 0);
1637
1638        // All 5 values are nonzero
1639        assert_eq!(nonzero_results.len(), 5);
1640        assert_eq!(indices, vec![0, 1, 2, 3, 4]);
1641
1642        // Cascading lookup
1643        let mut out_results: Vec<u8> = Vec::new();
1644        let mut out_indices: Vec<u32> = Vec::new();
1645
1646        cascading_table.cascading_lookup(
1647            &values2,
1648            &nonzero_results,
1649            &indices,
1650            |v1, v2| v1 & v2,
1651            &mut out_results,
1652            &mut out_indices,
1653        );
1654
1655        // Expected mixed results:
1656        //   v1=1, v2=10, mixed=1&10=0
1657        //   v1=2, v2=20, mixed=2&20=0
1658        //   v1=3, v2=30, mixed=3&30=2
1659        //   v1=4, v2=40, mixed=4&40=0
1660        //   v1=5, v2=50, mixed=5&50=0
1661        // Only index 2 has nonzero result
1662        assert_eq!(out_results.len(), 1);
1663        assert_eq!(out_results[0], 2);  // 3 & 30 = 2
1664        assert_eq!(out_indices[0], 2);
1665    }
1666
1667    #[test]
1668    fn test_cascading_lookup_multiple_chunks() {
1669        // Test with multiple chunks (> 16 nonzero results)
1670        let lookup_table1: Vec<u8> = (0..256).map(|i| ((i + 1) % 256) as u8).collect(); // All nonzero except 255
1671        let lookup_table2: Vec<u8> = (0..256).map(|i| i as u8).collect();
1672
1673        let single_table = SimdSingleTableU32U8Lookup::new(&lookup_table1);
1674        let cascading_table = SimdCascadingTableU32U8Lookup::new(&lookup_table2);
1675
1676        // Create 35 values (2 full chunks of 16 + 3 remainder)
1677        let values1: Vec<u32> = (0..35).collect();
1678        let values2: Vec<u32> = (0..35).collect();
1679
1680        let mut nonzero_results: Vec<u8> = Vec::new();
1681        let mut indices: Vec<u32> = Vec::new();
1682        single_table.lookup_compress_into_nonzeroes(&values1, &mut nonzero_results, &mut indices, 0);
1683
1684        // All 35 values should be nonzero (lookup_table1[i] = (i+1) % 256)
1685        assert_eq!(nonzero_results.len(), 35);
1686
1687        // Cascading lookup with identity mix function (just return v2)
1688        let mut out_results: Vec<u8> = Vec::new();
1689        let mut out_indices: Vec<u32> = Vec::new();
1690
1691        cascading_table.cascading_lookup(
1692            &values2,
1693            &nonzero_results,
1694            &indices,
1695            |_v1, v2| v2,
1696            &mut out_results,
1697            &mut out_indices,
1698        );
1699
1700        // v2 = lookup_table2[values2[i]] = values2[i] = i
1701        // Nonzero for i > 0
1702        assert_eq!(out_results.len(), 34, "Expected 34 nonzero results (all except index 0)");
1703
1704        // Verify all results
1705        for (i, &result) in out_results.iter().enumerate() {
1706            assert_eq!(result, (i + 1) as u8, "Result at position {} should be {}", i, i + 1);
1707        }
1708    }
1709}