simd-lookup 0.1.0

High-performance SIMD utilities for fast table lookups, compression and data processing
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
1001
1002
1003
1004
1005
1006
1007
1008
1009
1010
1011
1012
1013
1014
1015
1016
1017
1018
1019
1020
1021
1022
1023
1024
1025
1026
1027
1028
1029
1030
1031
1032
1033
1034
1035
1036
1037
1038
1039
1040
1041
1042
1043
1044
1045
1046
1047
1048
1049
1050
1051
1052
1053
1054
1055
1056
1057
1058
1059
1060
1061
1062
1063
1064
1065
1066
1067
1068
1069
1070
1071
1072
1073
1074
1075
1076
1077
1078
1079
1080
1081
1082
1083
1084
1085
1086
1087
1088
1089
1090
1091
1092
1093
1094
1095
1096
1097
1098
1099
1100
1101
1102
1103
1104
1105
1106
1107
1108
1109
1110
1111
1112
1113
1114
1115
1116
1117
1118
1119
1120
1121
1122
1123
1124
1125
1126
1127
1128
1129
1130
1131
1132
1133
1134
1135
1136
1137
1138
1139
1140
1141
1142
1143
1144
1145
1146
1147
1148
1149
1150
1151
1152
1153
1154
1155
1156
1157
1158
1159
1160
1161
1162
1163
1164
1165
1166
1167
1168
1169
1170
1171
1172
1173
1174
1175
1176
1177
1178
1179
1180
1181
1182
1183
1184
1185
1186
1187
1188
1189
1190
1191
1192
1193
1194
1195
1196
1197
1198
1199
1200
1201
1202
1203
1204
1205
1206
1207
1208
1209
1210
1211
1212
1213
1214
1215
1216
1217
1218
1219
1220
1221
1222
1223
1224
1225
1226
1227
1228
1229
1230
1231
1232
1233
1234
1235
1236
1237
1238
1239
1240
1241
1242
1243
1244
1245
1246
1247
1248
1249
1250
1251
1252
1253
1254
1255
1256
1257
1258
1259
1260
1261
1262
1263
1264
1265
1266
1267
1268
1269
1270
1271
1272
1273
1274
1275
1276
1277
1278
1279
1280
1281
1282
1283
1284
1285
1286
1287
1288
1289
1290
1291
1292
1293
1294
1295
1296
1297
1298
1299
1300
1301
1302
1303
1304
1305
1306
1307
1308
1309
1310
1311
1312
1313
1314
1315
1316
1317
1318
1319
1320
1321
1322
1323
1324
1325
1326
1327
1328
1329
1330
1331
1332
1333
1334
1335
1336
1337
1338
1339
1340
1341
1342
1343
1344
1345
1346
1347
1348
1349
1350
1351
1352
1353
1354
1355
1356
1357
1358
1359
1360
1361
1362
1363
1364
1365
1366
1367
1368
1369
1370
1371
1372
1373
1374
1375
1376
1377
1378
1379
1380
1381
1382
1383
1384
1385
1386
1387
1388
1389
1390
1391
1392
1393
1394
1395
1396
1397
1398
1399
1400
1401
1402
1403
1404
1405
1406
1407
1408
1409
1410
1411
1412
1413
1414
1415
1416
1417
1418
1419
1420
1421
1422
1423
1424
1425
1426
1427
1428
1429
1430
1431
1432
1433
1434
1435
1436
1437
1438
1439
1440
1441
1442
1443
1444
1445
1446
1447
1448
1449
1450
1451
1452
1453
1454
1455
1456
1457
1458
1459
1460
1461
1462
1463
1464
1465
1466
1467
1468
1469
1470
1471
1472
1473
1474
1475
1476
1477
1478
1479
1480
1481
1482
1483
1484
1485
1486
1487
1488
1489
1490
1491
1492
1493
1494
1495
1496
1497
1498
1499
1500
1501
1502
1503
1504
1505
1506
1507
1508
1509
1510
1511
1512
1513
1514
1515
1516
1517
1518
1519
1520
1521
1522
1523
1524
1525
1526
1527
1528
1529
1530
1531
1532
1533
1534
1535
1536
1537
1538
1539
1540
1541
1542
1543
1544
1545
1546
1547
1548
1549
1550
1551
1552
1553
1554
1555
1556
1557
1558
1559
1560
1561
1562
1563
1564
1565
1566
1567
1568
1569
1570
1571
1572
1573
1574
1575
1576
1577
1578
1579
1580
1581
1582
1583
1584
1585
1586
1587
1588
1589
1590
1591
1592
1593
1594
1595
1596
1597
1598
1599
1600
1601
1602
1603
1604
1605
1606
1607
1608
1609
1610
1611
1612
1613
1614
1615
1616
1617
1618
1619
1620
1621
1622
1623
1624
1625
1626
1627
1628
1629
1630
1631
1632
1633
1634
1635
1636
1637
1638
1639
1640
1641
1642
1643
1644
1645
1646
1647
1648
1649
1650
1651
1652
1653
1654
1655
1656
1657
1658
1659
1660
1661
1662
1663
1664
1665
1666
1667
1668
1669
1670
1671
1672
1673
1674
1675
1676
1677
1678
1679
1680
1681
1682
1683
1684
1685
1686
1687
1688
1689
1690
1691
1692
1693
1694
1695
1696
1697
1698
1699
1700
1701
1702
1703
1704
1705
1706
1707
1708
1709
//! Arrow-style "lookup kernel" similar to arrow-select::take::take kernel.
//! There are "columnar style" which does table lookups for one table first, and then cascading to another table,
//! and the Cascading kernel uses SIMD extensively.
//! Other kernels just do scalar lookups which are often as fast as SIMD GATHER, but all allow SIMD functions to
//! operate on looked up values.
//!
//! Anecdotally, this is not really faster than SIMD gather.
//!
//! ----------------- SIMD based lookup kernels - "Arrow" style ------------------------------------
//! These operate by leveraging constant-sized array reads from a slice with SIMD operations on looked up values,
//! especially fast for multiple table lookups and combinations thereof.
//! It turns out these don't significantly improve on a series of scalar lookups.
//!
//! # CPU Feature Requirements
//!
//! ## Intel x86_64 (AVX-512)
//!
//! ### Cascading Lookup Kernel (`SimdCascadingTableU32U8Lookup`)
//!
//! The cascading kernel is heavily optimized for AVX-512 and provides significant performance
//! improvements (40-50% speedup) over scalar implementations on large tables.
//!
//! **Required CPU features:**
//! - **AVX512F** + **AVX512VL** + **AVX512VBMI2** (for `compress_store_u8x16`)
//! - **AVX512F** (for `compress_store_u32x16`)
//! - **AVX512F** + **AVX512BW** (for `gather_u32index_u8` via `simd_gather` module)
//!
//! **Summary**: Requires **AVX512F**, **AVX512VL**, **AVX512BW**, and **AVX512VBMI2**
//!
//! **Available on**: Intel Ice Lake, Tiger Lake, and later (not available on Skylake-X)
//!
//! ## ARM aarch64 (Apple Silicon M1/M2/M3)
//!
//! On ARM processors, the compress operations (`compress_store_u8x16`, `compress_store_u32x8`, etc.)
//! use NEON-optimized implementations that provide significant speedups over scalar fallbacks:
//!
//! - **`compress_store_u8x16`**: Uses NEON `TBL` instruction with precomputed shuffle indices
//!   - Eliminates 16 conditional branches from the scalar fallback
//!   - Single `vqtbl1q_u8` instruction performs all 16-byte shuffle
//!
//! - **`compress_store_u32x8`**: Uses NEON `TBL2` instruction with byte-level indices
//!   - Precomputed 256×32 byte index table avoids runtime index conversion
//!   - Uses `vqtbl2q_u8` for efficient 32-byte permutation
//!
//! - **Bitmask expansion**: Uses NEON parallel bit operations (`vceq`, `vmovl`)
//!   - Converts 8-bit mask to vector mask without scalar loops
//!
//! These optimizations are automatically enabled on ARM and require no user configuration.
//!
//! ## Fallback Behavior
//!
//! All kernels fall back to scalar/shuffle implementations when architecture-specific
//! features are not available. The fallback works on all architectures.
//!
//! ## Other Kernels
//!
//! - **`SimdSingleTableU32U8Lookup`**: Uses scalar lookups (works on all architectures)
//! - **`SimdDualTableU32U8Lookup`**: Uses scalar lookups (works on all architectures)
//! - **`PipelinedSingleTableU32U8Lookup`**: Uses scalar lookups with software prefetching

use wide::{u8x16, u32x16};

use rustc_hash::FxHashMap;

use crate::{
    bulk_vec_extender::{BulkVecExtender, SliceU8SIMDExtender}, compress_store_u8x16, compress_store_u32x16, gather_u32index_u32, gather_u32index_u8, prefetch::{L3, prefetch_eight_offsets}
};

/// Safe lookup helper: returns the value at the given offset, or 0 if out of bounds
#[inline(always)]
fn safe_lookup(lookup_table: &[u8], offset: u32) -> u8 {
    lookup_table.get(offset as usize).copied().unwrap_or_default()
}

/// Lookup values from lookup table using u32 offsets
/// Returns the looked up u8 values as a [u8; 16] array
/// This centralizes the lookup logic and avoids code duplication
/// Out-of-bounds offsets return 0 instead of panicking
#[inline]
fn lookup_from_offsets(lookup_table: &[u8], offsets: &[u32; 16]) -> [u8; 16] {
    [
        safe_lookup(lookup_table, offsets[0]),
        safe_lookup(lookup_table, offsets[1]),
        safe_lookup(lookup_table, offsets[2]),
        safe_lookup(lookup_table, offsets[3]),
        safe_lookup(lookup_table, offsets[4]),
        safe_lookup(lookup_table, offsets[5]),
        safe_lookup(lookup_table, offsets[6]),
        safe_lookup(lookup_table, offsets[7]),
        safe_lookup(lookup_table, offsets[8]),
        safe_lookup(lookup_table, offsets[9]),
        safe_lookup(lookup_table, offsets[10]),
        safe_lookup(lookup_table, offsets[11]),
        safe_lookup(lookup_table, offsets[12]),
        safe_lookup(lookup_table, offsets[13]),
        safe_lookup(lookup_table, offsets[14]),
        safe_lookup(lookup_table, offsets[15]),
    ]
}

/// Single table lookup kernel with SIMD function - u32 to u8 lookup table kernel.
/// The user is responsible for generating the lookup table - so this can be used for different use cases, including
/// CASE..WHEN and bitmasking/filtering.
///
/// It allows for SIMD operations on looked up values, but SIMD isn't actually used in the lookups themselves as
/// there aren't major advantages for SIMD in terms of lookup for huge tables with random indices.
/// However, we look up 16 values at a time for efficiency.  This kernel makes sense to call on hundreds or thousands
/// of values at a time, columnar style.
///
/// Note: for general purpose expressions where the lookup can be any type, instead just use `arrow::compute::take()`
/// to do a very efficient lookup where the lookup table can be any type, but then you pay the cost of write memory
/// I/O.  These kernels here allow user to operate on each looked up u8x16 and do something.
///
#[derive(Debug, Clone)]
pub struct SimdSingleTableU32U8Lookup<'a> {
    lookup_table: &'a [u8],
}

impl<'a> SimdSingleTableU32U8Lookup<'a> {
    #[inline]
    pub fn new(lookup_table: &'a [u8]) -> Self {
        Self { lookup_table }
    }

    /// Given a slice of u32 values, looks up each one and calls the user given function on an assembled u8x16 (16
    /// looked up values) at a time.
    ///
    /// The user function is passed (lookedup_values: u8x16, num_bytes: usize),
    /// where num_bytes is 16 other than the last/remainder chunk, where it may be less than that.
    ///
    /// If the slice does not divide evenly into 16-item chunks, the rest is handled by filling missing values in the
    /// u8x16 with zeroes.  Thus, the lookup assumes the zero is basically a NOP.
    #[inline]
    pub fn lookup_func<F>(&self, values: &[u32], f: &mut F)
    where
        F: FnMut(u8x16, usize),
    {
        let (chunks, rest) = values.as_chunks::<16>();
        for chunk in chunks {
            // Try prefetching lookup table entries with L3 cache level
            // NOTE: the below compiles down to nothing in release mode, as Rust can prove the below is statically
            //       safe with no need for bounds checking.
            // let first_half: &[u32; 8] = chunk[..8].try_into().unwrap();
            // let second_half: &[u32; 8] = chunk[8..].try_into().unwrap();

            // prefetch_eight_offsets::<_, L3>(&self.lookup_table[0], first_half);
            // prefetch_eight_offsets::<_, L3>(&self.lookup_table[0], second_half);

            // Get looked up values using centralized function
            let values = lookup_from_offsets(&self.lookup_table, chunk);
            (f)(u8x16::from(values), 16);
        }

        // Handle the rest... just loop and do a lookup, feed to user function with 0's for items not in the slice.
        if !rest.is_empty() {
            let mut values = [0u8; 16];
            for i in 0..rest.len() {
                values[i] = self.lookup_table[rest[i] as usize];
            }
            (f)(u8x16::from(values), rest.len());
        }
    }

    /// Convenience function which does lookup and writes the results into a Vec of the same length as the input slice.
    /// Does not transform the looked up values.  Actually, extends a mutable Vec of u8.
    #[inline]
    pub fn lookup_into_vec(&self, values: &[u32], buffer: &mut Vec<u8>) {
        let mut write_guard = buffer.bulk_extend_guard(values.len());
        let mut write_slice = write_guard.as_mut_slice();
        let mut num_written = 0;
        self.lookup_func(values, &mut |lookedup_values, num_bytes| {
            write_slice.write_u8x16(num_written, lookedup_values, num_bytes);
            num_written += num_bytes;
        });
    }

    /// Version of lookup_into_vec which writes into a mutable u8x16 buffer, for cascaded lookups
    #[inline]
    pub fn lookup_into_u8x16_buffer(&self, values: &[u32], buffer: &mut [u8x16]) {
        assert!(
            (buffer.len() * 16) >= values.len(),
            "Buffer must be at least as long as the input values"
        );
        let mut idx = 0;
        self.lookup_func(values, &mut |lookedup_values, _num_bytes| {
            buffer[idx] = lookedup_values;
            idx += 1;
        });
    }

    /// Prepares a Vec of u8x16 for lookup_into_u8x16_buffer by setting the length and preparing.
    /// The Vec is extended by the amount necessary to hold the results.
    ///
    /// ## Safety
    /// - We unsafe set the length because we know we will overwrite every element.
    ///
    #[inline]
    pub fn lookup_extend_u8x16_vec(&self, values: &[u32], vec: &mut Vec<u8x16>) {
        let needed = values.len().div_ceil(16);
        let mut guard = vec.bulk_extend_guard(needed);
        self.lookup_into_u8x16_buffer(values, guard.as_mut_slice());
        // guard drops here, automatically finalizes to correct length
    }

    /// Convenience function which compresses and extends two Vecs:
    /// - `nonzero_results` - Vec<u8> of nonzero looked up u8 results
    /// - `indices` - Vec<u32> of indices of the nonzero results
    ///
    /// This method is intended to be used with the cascading SIMD kernels which extend lookup into two or more
    /// tables by leveraging the nonzero output to do packed lookups into the second table.
    ///
    /// ## Arguments
    /// - `values` - &[u32] of indices to lookup
    /// - `nonzero_results` - &mut Vec<u8> to store the nonzero looked up u8 results
    /// - `indices` - &mut Vec<u32> to store the indices of the nonzero results
    /// - `base_index` - base index value for the indices output.
    ///
    /// For example, if you wanted to extend empty Vecs (reusing them as temporary buffers), then
    /// pass `base_index = 0` and the indices will be 0, 16, 32, etc.  Also pass empty Vecs, and clear them
    /// every time before calling.
    ///
    /// ## Performance and Architecture
    ///
    /// The lookup function is heavily optimized for Intel AVX512, using VCOMPRESS kernel (simd_compress.rs).
    /// Using VCOMPRESS this is nearly as fast as lookup_into_vec() which does nothing but copy the results!
    /// On other platforms, it falls back to a scalar approach which will be potentially much slower.
    ///
    #[inline]
    pub fn lookup_compress_into_nonzeroes(&self, values: &[u32], nonzero_results: &mut Vec<u8>, indices: &mut Vec<u32>, base_index: u32) {
        // First, set up a SIMD vector of indices starting at the base index, representing indices of elements
        let mut indices_simd = u32x16::from([0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15]);
        indices_simd = indices_simd + u32x16::splat(base_index);

        let sixteen = u32x16::splat(16);
        let zeroes = u8x16::splat(0);

        // Use BulkVecExtender for bulk mutable slices - enables VCOMPRESS and efficient writes
        // Ensure at least 16 elements for compress_store functions
        // Allocate extra space to ensure remainder always has 16 elements available
        let min_len = (values.len() + 16).max(16);
        let mut result_guard = nonzero_results.bulk_extend_guard(min_len);
        let result_slice = result_guard.as_mut_slice();
        let mut indices_guard = indices.bulk_extend_guard(min_len);
        let indices_slice = indices_guard.as_mut_slice();
        let mut num_written = 0;

        self.lookup_func(values, &mut |lookedup_values, num_bytes| {
            // Check which values are nonzero and convert to bitmask
            // simd_eq returns 0xFF where equal to zero, so invert to get nonzero mask
            let eq_mask = lookedup_values.simd_eq(zeroes).to_bitmask();
            let mut nonzero_mask = !eq_mask as u16;

            // For remainder chunks (< 16 elements), mask out invalid elements
            if num_bytes < 16 {
                nonzero_mask &= (1u16 << num_bytes) - 1;
            }

            // Compress nonzero values into result_slice
            // We always have at least 16 elements available because we allocated values.len().max(16)
            // and process inputs sequentially (each chunk produces at most 16 outputs)
            let written = compress_store_u8x16(lookedup_values, nonzero_mask, &mut result_slice[num_written..]);
            let _ = compress_store_u32x16(indices_simd, nonzero_mask, &mut indices_slice[num_written..]);
            num_written += written;

            // Update indices based on num_bytes
            if num_bytes < 16 {
                indices_simd = indices_simd + u32x16::splat(num_bytes as u32);
            } else {
                indices_simd = indices_simd + sixteen;
            }
        });
        result_guard.set_written(num_written);
        indices_guard.set_written(num_written);
    }
}

/// Pipelined single table lookup kernel - u32 to u8 lookup table kernel with prefetch pipelining
///
/// This version pipelines prefetch operations with the actual lookup work to hide memory latency.
/// The algorithm works as follows:
/// 1. Read values from current chunk addresses
/// 2. Prefetch next chunk addresses while processing current values
/// 3. Call SIMD lookup function on current values
/// 4. Loop to next chunk
///
/// This pipelining allows memory prefetch latency to be hidden behind computation work.
///
/// Results: End Nov 2025: On Intel boxes this gives slight advantage - maybe up to 5%.  Regular scalar reads
/// though, even with large random Lookup tables, are done well enough that this doesn't help much.
#[derive(Debug, Clone)]
pub struct PipelinedSingleTableU32U8Lookup<'a> {
    lookup_table: &'a [u8],
}

impl<'a> PipelinedSingleTableU32U8Lookup<'a> {
    #[inline]
    pub fn new(lookup_table: &'a [u8]) -> Self {
        Self { lookup_table }
    }

    /// Pipelined lookup function that prefetches the next chunk while processing the current one
    ///
    /// The pipelining strategy:
    /// - Process chunks of 16 u32 values at a time
    /// - For each chunk: prefetch next chunk addresses, then process current chunk
    /// - This hides prefetch latency behind the lookup computation work
    #[inline]
    pub fn lookup_func<F>(&self, values: &[u32], f: &mut F)
    where
        F: FnMut(u8x16, usize),
    {
        let (chunks, rest) = values.as_chunks::<16>();

        if chunks.is_empty() {
            // Handle case where we have fewer than 16 values total
            if !rest.is_empty() {
                self.process_remainder(rest, f);
            }
            return;
        }

        // Deep prefetch pipeline: prefetch PREFETCH_DISTANCE chunks ahead (64 values)
        // This hides DRAM latency (~200-400 cycles) by having multiple memory requests in flight
        const PREFETCH_DISTANCE: usize = 4; // 4 chunks = 64 values ahead

        // Helper to prefetch a chunk
        let prefetch_chunk = |chunk: &[u32; 16]| {
            let first_half: &[u32; 8] = chunk[..8].try_into().unwrap();
            let second_half: &[u32; 8] = chunk[8..].try_into().unwrap();
            prefetch_eight_offsets::<_, L3>(&self.lookup_table[0], first_half);
            prefetch_eight_offsets::<_, L3>(&self.lookup_table[0], second_half);
        };

        // For small number of chunks, fall back to simple processing
        if chunks.len() <= PREFETCH_DISTANCE {
            for chunk in chunks {
                let values = lookup_from_offsets(&self.lookup_table, chunk);
                (f)(u8x16::from(values), 16);
            }
            if !rest.is_empty() {
                self.process_remainder(rest, f);
            }
            return;
        }

        // Prime the prefetch pipeline: prefetch first PREFETCH_DISTANCE chunks
        for i in 0..PREFETCH_DISTANCE {
            prefetch_chunk(&chunks[i]);
        }

        // Main loop: process chunk i while prefetching chunk i+PREFETCH_DISTANCE
        for i in 0..chunks.len() {
            // Prefetch chunk i+PREFETCH_DISTANCE (if it exists)
            if i + PREFETCH_DISTANCE < chunks.len() {
                prefetch_chunk(&chunks[i + PREFETCH_DISTANCE]);
            }

            // Process chunk i (which was prefetched PREFETCH_DISTANCE iterations ago)
            let values = lookup_from_offsets(&self.lookup_table, &chunks[i]);
            (f)(u8x16::from(values), 16);
        }

        // Handle remainder
        if !rest.is_empty() {
            self.process_remainder(rest, f);
        }
    }

    /// Process remainder elements (< 16 elements)
    #[inline]
    fn process_remainder<F>(&self, rest: &[u32], f: &mut F)
    where
        F: FnMut(u8x16, usize),
    {
        let mut values = [0u8; 16];
        for i in 0..rest.len() {
            values[i] = self.lookup_table[rest[i] as usize];
        }
        (f)(u8x16::from(values), rest.len());
    }

    /// Convenience function which does lookup and writes the results into a Vec
    #[inline]
    pub fn lookup_into_vec(&self, values: &[u32], buffer: &mut Vec<u8>) {
        let mut write_guard = buffer.bulk_extend_guard(values.len());
        let write_slice = write_guard.as_mut_slice();
        let mut num_written = 0;
        self.lookup_func(values, &mut |lookedup_values, num_bytes| {
            let target_slice = &mut write_slice[num_written..num_written + num_bytes];
            target_slice.copy_from_slice(&lookedup_values.to_array()[..num_bytes]);
            num_written += num_bytes;
        });
        // guard drops here, automatically finalizes to correct length
    }
}

/// Dual lookup table kernel - u32 to u8 lookup table kernel with custom SIMD function for combining the results.
/// Second lookup table is only looked up if the first lookup table returns a non-zero value.
/// The lookup functions are scalar and other than the SIMD combining function, no SIMD is used - and the code is
/// very simple, so this function is a good fit for non-AVX architectures (like Apple M*) where faster SIMD
/// instructions like VCOMPRESS are not available.
/// OTOH, for Intel/AVX512+ architectures, the [CascadingTableU32U8Lookup] kernel is faster.
///
/// The user is responsible for generating the lookup tables - so this can be used for different use cases, including
/// CASE..WHEN and bitmasking/filtering.
#[derive(Debug, Clone)]
pub struct SimdDualTableU32U8Lookup<'a> {
    lookup_table1: &'a [u8],
    lookup_table2: &'a [u8],
}

impl<'a> SimdDualTableU32U8Lookup<'a> {
    /// Creates a new dual table lookup kernel with the given lookup tables.
    #[inline]
    pub fn new(lookup_table1: &'a [u8], lookup_table2: &'a [u8]) -> Self {
        Self {
            lookup_table1,
            lookup_table2,
        }
    }

    /// Given two slices of equal length &[u32] indices, looks up each one and calls the user given function
    /// on assembled u8x16 results.
    /// - lookup_table1 is used for the first slice, lookup_table2 is used for the second slice.
    /// - The user function is passed (lookedup_values1: u8x16, lookedup_values2: u8x16, num_bytes), where
    ///   num_bytes is 16 other than the last/remainder chunk, where it may be less than that.
    /// - If the slices do not divide evenly into 16-item chunks, the rest is handled by filling missing values in the
    ///   u8x16 with zeroes.  Thus, the lookup assumes the zero is basically a NOP.
    #[inline]
    pub fn lookup_func<F>(&self, values1: &[u32], values2: &[u32], f: &mut F)
    where
        F: FnMut(u8x16, u8x16, usize),
    {
        assert!(
            values1.len() == values2.len(),
            "Values1 and values2 must have the same length"
        );
        let (chunks1, rest1) = values1.as_chunks::<16>();
        let (chunks2, rest2) = values2.as_chunks::<16>();

        for (chunk1, chunk2) in chunks1.iter().zip(chunks2.iter()) {
            let values1 = lookup_from_offsets(self.lookup_table1, chunk1);

            // Conditional lookup for table2 - only where table1 is nonzero
            let mut values2 = [0u8; 16];
            for i in 0..16 {
                if values1[i] != 0 {
                    values2[i] = self.lookup_table2[chunk2[i] as usize];
                }
            }

            (f)(u8x16::from(values1), u8x16::from(values2), 16);
        }

        // Handle the rest... just loop and do a lookup, feed to user function with 0's for items not in the slice.
        if !rest1.is_empty() {
            let mut values1 = [0u8; 16];
            let mut values2 = [0u8; 16];
            for i in 0..rest1.len() {
                values1[i] = self.lookup_table1[rest1[i] as usize];
                if values1[i] != 0 {
                    values2[i] = self.lookup_table2[rest2[i] as usize];
                }
            }
            (f)(u8x16::from(values1), u8x16::from(values2), rest1.len());
        }
    }

    /// Convenience function which does dual lookup, combines the results using a user-defined combiner function,
    /// and extends the combined results into a Vec (pushing all combined results)
    ///
    /// The combiner function `f` takes two u8x16 values (looked up from table1 and table2) and returns a combined u8x16.
    /// Unlike the single table version, this dual table version requires a combiner function.
    #[inline]
    pub fn lookup_into_vec<F>(
        &self,
        values1: &[u32],
        values2: &[u32],
        output: &mut Vec<u8>,
        f: &mut F,
    ) where
        F: FnMut(u8x16, u8x16) -> u8x16,
    {
        assert!(
            values1.len() == values2.len(),
            "Values1 and values2 must have the same length"
        );

        let mut write_guard = output.bulk_extend_guard(values1.len());
        let mut write_slice = write_guard.as_mut_slice();
        let mut num_written = 0;
        self.lookup_func(
            values1,
            values2,
            &mut |lookedup_values1, lookedup_values2, num_bytes| {
                let combined = (f)(lookedup_values1, lookedup_values2);
                write_slice.write_u8x16(num_written, combined, num_bytes);
                num_written += num_bytes;
            },
        );
        // write_guard drops here, automatically finalizes to correct length.  We don't have to set final
        // number of bytes since it is the same as the input, which is the default
    }
}

// =================================================================================================
// REMOVED: SimdJoinedDualTableU32U8Lookup
// =================================================================================================
//
// This kernel was an experiment testing whether TLB swaps or non-colocated lookup tables could be
// a performance factor. The design used a single concatenated table (`joined_table = table1 ++ table2`)
// with an offset to access table2 entries: `joined_table[table2_offset + index2]`.
//
// **Design:**
// - Instead of two separate `&[u8]` lookup tables, use one contiguous allocation
// - Table1 occupies bytes `[0..table1.len()]`
// - Table2 occupies bytes `[table1.len()..table1.len() + table2.len()]`
// - Lookups into table2 add the offset: `joined_table[(table2_offset + index) as usize]`
//
// **Hypothesis:**
// Colocating tables might reduce TLB misses when alternating between table1 and table2 lookups,
// since both tables would share TLB entries for the same large allocation.
//
// **Benchmark Results:**
// In testing on both Intel (AVX-512) and ARM (Apple Silicon) platforms, the joined table design
// showed NO appreciable performance wins compared to the non-joined `SimdDualTableU32U8Lookup`.
// The TLB hypothesis did not hold in practice - modern CPUs handle multiple large allocations
// efficiently, and the offset arithmetic adds minor overhead that offsets any potential gains.
//
// The non-joined design is simpler and equally performant, so this kernel was removed.
// =================================================================================================

/// Dual table lookup kernel - u32 to u8 lookup table kernel with custom SIMD function for combining the results.
/// It tries to eliminate thrashing by using internally the single table kernel to write results out first to
/// a local temporary buffer, which is saved, then it looks up the second table, only if the first table returns
/// nonzero results - thus minimizing the number of reads from the second table.
/// By sequencing in this order, we hope to minimize the cache thrashing.
///
/// The user is responsible for generating the lookup tables - so this can be used for different use cases, including
/// CASE..WHEN and bitmasking/filtering.
/// NOTE: this is not as efficient as the newer CascadingTableU32U8Lookup kernel.
/// TODO: deprecate and remove this kernel.  It doesn't really have advantages over the others.
#[derive(Debug, Clone)]
pub struct SimdDualTableU32U8LookupV2<'a> {
    lookup1: SimdSingleTableU32U8Lookup<'a>,
    lookup2: &'a [u8],
    temp_buffer: Vec<u8x16>,
}

impl<'a> SimdDualTableU32U8LookupV2<'a> {
    #[inline]
    pub fn new(lookup_table1: &'a [u8], lookup_table2: &'a [u8]) -> Self {
        Self {
            lookup1: SimdSingleTableU32U8Lookup::new(lookup_table1),
            lookup2: lookup_table2,
            temp_buffer: Vec::with_capacity(128),
        }
    }

    /// Given two slices of equal length &[u32] indices, looks up each one and calls the user given function
    /// on assembled u8x16 results.
    /// - lookup_table1 is used for the first slice, lookup_table2 is used for the second slice.
    /// - Only if the u8 from the first lookup table is nonzero, will the second lookup table be read.
    /// - The user function is passed (lookedup_values1: u8x16, lookedup_values2: u8x16, start_idx: usize), where
    ///   start_idx is 0 for the first chunk call, 16 for the next one, etc.
    /// - If the slices do not divide evenly into 16-item chunks, the rest is handled by filling missing values in the
    ///   u8x16 with zeroes.  Thus, the lookup assumes the zero is basically a NOP.
    ///
    /// The lookup function is passed these arguments: (lookedup_values1: u8x16, lookedup_values2: u8x16, num_bytes)
    /// - num_bytes: usually 16, but may be less for the last/remainder chunk.
    #[inline]
    pub fn lookup_func<F>(&mut self, values1: &[u32], values2: &[u32], f: &mut F)
    where
        F: FnMut(u8x16, u8x16, usize),
    {
        assert!(
            values1.len() == values2.len(),
            "Values1 and values2 must have the same length"
        );

        // Clear temp_buffer for reuse
        self.temp_buffer.clear();

        // First read the first table into the temporary buffer
        self.lookup1
            .lookup_extend_u8x16_vec(values1, &mut self.temp_buffer);

        let (chunks2, rest2) = values2.as_chunks::<16>();

        // Process full chunks
        for (i, chunk2) in chunks2.iter().enumerate() {
            let table1_result = self.temp_buffer[i];
            let table1_array = table1_result.to_array();

            // Only do lookup2 for positions where table1_result is nonzero
            // Use two u64 loops, somehow it's faster than writing to [u8; 16] directly.
            let local_chunk = *chunk2;

            // Process high 8 bytes (8-15) into first u64
            let mut result_high = 0u64;
            for j in (8..16).rev() {
                result_high <<= 8;
                if table1_array[j] != 0 {
                    result_high += self.lookup2[local_chunk[j] as usize] as u64;
                }
            }

            // Process low 8 bytes (0-7) into second u64
            let mut result_low = 0u64;
            for j in (0..8).rev() {
                result_low <<= 8;
                if table1_array[j] != 0 {
                    result_low += self.lookup2[local_chunk[j] as usize] as u64;
                }
            }

            // Combine into u128 for conversion to u8x16
            let result = ((result_high as u128) << 64) | (result_low as u128);

            // Call user function with both u8x16 results
            (f)(table1_result, u8x16::from(result.to_le_bytes()), 16);
        }

        // Handle the remainder
        if !rest2.is_empty() {
            // The remainder for table1 is already in temp_buffer (lookup_extend_u8x16_vec handles it)
            let table1_result = self.temp_buffer[chunks2.len()];
            let table1_array = table1_result.to_array();
            let mut table2_result = [0u8; 16];

            for i in 0..rest2.len() {
                if table1_array[i] != 0 {
                    // Only lookup if the first table returned nonzero
                    table2_result[i] = self.lookup2[rest2[i] as usize];
                }
            }
            (f)(table1_result, u8x16::from(table2_result), rest2.len());
        }
    }
}

/// Dual table lookup kernel using FxHashMap for table2.
///
/// This kernel is optimized for the case where table2 has a very small number of entries
/// (sparse), making a hash map lookup more memory-efficient than a full lookup table.
/// NOTE: We also tried writing a kernel with PHF-based EntropyMap, but it is slower than the HashMap version.
///
/// - Table1: Standard `&[u8]` lookup table (can be large)
/// - Table2: `FxHashMap<u32, u8>` (optimized for sparse data with fast hashing)
///
/// The lookup behavior is the same as `SimdDualTableU32U8Lookup`:
/// - Table2 is only looked up if table1 returns a non-zero value
/// - Results are passed to the user function as (u8x16, u8x16, num_bytes)
///
/// UPDATE: Testing on Intel Xeon shows that this kernel is maybe 20% slower than the V2 kernel with an optimized writer.
pub struct SimdDualTableWithHashLookup<'a> {
    lookup_table1: &'a [u8],
    lookup_table2: &'a FxHashMap<u32, u8>,
}

impl<'a> SimdDualTableWithHashLookup<'a> {
    /// Creates a new dual table lookup kernel with table1 as a slice and table2 as FxHashMap.
    #[inline]
    pub fn new(lookup_table1: &'a [u8], lookup_table2: &'a FxHashMap<u32, u8>) -> Self {
        Self {
            lookup_table1,
            lookup_table2,
        }
    }

    /// Given two slices of equal length &[u32] indices, looks up each one and calls the user given function
    /// on assembled u8x16 results.
    /// - lookup_table1 (slice) is used for the first slice
    /// - lookup_table2 (FxHashMap) is used for the second slice
    /// - Table2 is only looked up if table1 returns a non-zero value
    /// - The user function is passed (lookedup_values1: u8x16, lookedup_values2: u8x16, num_bytes)
    #[inline]
    pub fn lookup_func<F>(&self, values1: &[u32], values2: &[u32], f: &mut F)
    where
        F: FnMut(u8x16, u8x16, usize),
    {
        assert!(
            values1.len() == values2.len(),
            "Values1 and values2 must have the same length"
        );
        let (chunks1, rest1) = values1.as_chunks::<16>();
        let (chunks2, rest2) = values2.as_chunks::<16>();

        for (chunk1, chunk2) in chunks1.iter().zip(chunks2.iter()) {
            let values1 = lookup_from_offsets(self.lookup_table1, chunk1);

            // Conditional hash lookup for table2 - only where table1 is nonzero
            let mut values2 = [0u8; 16];
            for i in 0..16 {
                if values1[i] != 0 {
                    values2[i] = self.lookup_table2.get(&chunk2[i]).copied().unwrap_or(0);
                }
            }

            (f)(u8x16::from(values1), u8x16::from(values2), 16);
        }

        // Handle the rest
        if !rest1.is_empty() {
            let mut values1 = [0u8; 16];
            let mut values2 = [0u8; 16];
            for i in 0..rest1.len() {
                values1[i] = self.lookup_table1[rest1[i] as usize];
                if values1[i] != 0 {
                    values2[i] = self.lookup_table2.get(&rest2[i]).copied().unwrap_or(0);
                }
            }
            (f)(u8x16::from(values1), u8x16::from(values2), rest1.len());
        }
    }

    /// Convenience function which does dual lookup, combines the results using a user-defined combiner function,
    /// and extends the combined results into a Vec (pushing all combined results)
    #[inline]
    pub fn lookup_into_vec<F>(
        &self,
        values1: &[u32],
        values2: &[u32],
        output: &mut Vec<u8>,
        f: &mut F,
    ) where
        F: FnMut(u8x16, u8x16) -> u8x16,
    {
        assert!(
            values1.len() == values2.len(),
            "Values1 and values2 must have the same length"
        );

        let mut write_guard = output.bulk_extend_guard(values1.len());
        let mut write_slice = write_guard.as_mut_slice();
        let mut num_written = 0;
        self.lookup_func(
            values1,
            values2,
            &mut |lookedup_values1, lookedup_values2, num_bytes| {
                let combined = (f)(lookedup_values1, lookedup_values2);
                write_slice.write_u8x16(num_written, combined, num_bytes);
                num_written += num_bytes;
            },
        );
    }
}

/// SIMD "Cascading" 2nd/3rd Table Lookup Kernel
///
/// This kernel is designed to "cascade" and build on top of the primary SingleTable kernel to efficiently look up
/// secondary or additional tables.  How does this work?
/// - First call [SimdSingleTableU32U8Lookup] to look up the primary table, using the
///   `lookup_compress_into_nonzeroes()` method.  This returns compressed results and indices of the nonzero results.
/// - Now feed these Vecs into this kernel, which uses compressed output to do a packed lookup into the second
///   table.  This is faster than having to filter all the results from the first kernel.
/// - The lookup function is called for nonzero table1 results and looked up second table lookups, and should
///   return results for all 16 values in the u8x16.
/// - Then, this kernel will COMPRESS the results and again output nonzero results and indices, filtered from the
///   input.
///
/// Basically, this kernel can be cascaded for additional tables.
///
/// The theory is that this cascading and packed lookup approach allows us to come closest to kernels where
/// even with multiple tables, the runtime is roughly O(num_nonzero_lookups).
/// UPDATE 12/2/2025: Intel Xeon results show that, even at huge (15M) tables, this results in a 40% speedup over
///   the V2 kernel.  The speedups increase for smaller table sizes - 4M shows over 50% increase, and even bigger
///   for smaller tables - which shows that this design inherently scales well.
#[derive(Debug, Clone)]
pub struct SimdCascadingTableU32U8Lookup<'a> {
    lookup_table: &'a [u8],
}

impl<'a> SimdCascadingTableU32U8Lookup<'a> {
    #[inline]
    pub fn new(lookup_table: &'a [u8]) -> Self {
        Self { lookup_table }
    }

    /// Given a slice of u32 values, looks up each one.
    /// Designed to work in cascading mode.  One needs to pass in the nonzero_results and indices output from
    /// [SimdSingleTableU32U8Lookup]::lookup_compress_into_nonzeroes(), along with the values (which are the keys
    /// for the lookup table in this struct).
    ///
    /// For this to be efficient, the length of values probably should be at least hundreds or thousands of values.
    ///
    /// ## Arguments
    /// - `values` - &[u32] of indices to lookup.  NOTE: these are ORIGINAL values, NOT filtered, thus
    ///   its length should be the same length as the values fed into [SimdSingleTableU32U8Lookup] kernel.
    ///   In other words, the length of values will probably be larger than in_nonzero_results.
    /// - `in_nonzero_results` - &[u8] of nonzero results from [SimdSingleTableU32U8Lookup]::lookup_compress_into_nonzeroes()
    /// - `in_indices` - &[u32] of indices from [SimdSingleTableU32U8Lookup]::lookup_compress_into_nonzeroes()
    ///   These indices should be indices into the values array.
    /// - `f` - function to mix the results from nonzero_results and the looked up values from this lookup table.
    ///         The results (u8x16) returned from this function, will be zero-compressed along with indices to
    ///         generate more nonzero output.
    /// - `out_results` - &mut Vec<u8> to store the nonzero results from the lookup function f
    /// - `out_indices` - &mut Vec<u32>, basically same as input indices but with nonzeroes compressed out
    #[inline]
    pub fn cascading_lookup<F>(&self,
        values: &[u32],
        in_nonzero_results: &[u8],
        in_indices: &[u32],
        f: F,
        out_results: &mut Vec<u8>,
        out_indices: &mut Vec<u32>)
    where
        F: Fn(u8x16, u8x16) -> u8x16,
    {
        // Use BulkVecExtender for bulk mutable slices - enables VCOMPRESS and efficient writes
        // Ensure at least 16 elements for compress_store functions
        // Allocate extra space to ensure remainder always has 16 elements available
        let min_len = (in_nonzero_results.len() + 16).max(16);
        let mut result_guard = out_results.bulk_extend_guard(min_len);
        let result_slice = result_guard.as_mut_slice();
        let mut indices_guard = out_indices.bulk_extend_guard(min_len);
        let indices_slice = indices_guard.as_mut_slice();
        let mut num_written = 0;

        let zeroes = u8x16::splat(0);

        let (in_nonzero_chunks, in_nonzero_rest) = in_nonzero_results.as_chunks::<16>();
        let (in_indices_chunks, in_indices_rest) = in_indices.as_chunks::<16>();

        for (nonzero_chunk, indices_chunk) in in_nonzero_chunks.iter().zip(in_indices_chunks.iter()) {
            let in_results = u8x16::from(*nonzero_chunk);
            let in_indices_simd = u32x16::from(*indices_chunk);

            // Gather the lookup keys from the values array
            // NOTE: This is a great use for SIMD GATHER, since the indices should be very very close together
            //       in memory, so it should benefit from caching
            // Scale is 4 bytes per u32 element
            let lookup_keys = gather_u32index_u32(in_indices_simd, values, 4);

            // Lookup the values from the lookup table.  We hope this is much faster than a branch-based lookup.
            // TODO: switch this back to scalar lookup if this turns out to be slow
            let lookedup_values = gather_u32index_u8(lookup_keys, self.lookup_table, 1);

            // Mix the results and compress the output
            let mixed_results = f(in_results, lookedup_values);

            // Check which values are nonzero and convert to bitmask
            // simd_eq returns 0xFF where equal to zero, so invert to get nonzero mask
            let eq_mask = mixed_results.simd_eq(zeroes).to_bitmask();
            let nonzero_mask = !eq_mask as u16;

            // Compress nonzero values into result_slice
            // We always have at least 16 elements available because we allocated (len + 16)
            let num_nonzeroes = compress_store_u8x16(mixed_results, nonzero_mask, &mut result_slice[num_written..]);
            let _ = compress_store_u32x16(in_indices_simd, nonzero_mask, &mut indices_slice[num_written..]);
            num_written += num_nonzeroes;
        }

        // Handle the "rest" of the inputs using scalar loop
        // Build up arrays of remaining elements to pass through the mix function
        if !in_nonzero_rest.is_empty() {
            let mut in_results_arr = [0u8; 16];
            let mut lookedup_arr = [0u8; 16];
            let mut indices_arr = [0u32; 16];

            for (i, (&in_result, &in_idx)) in in_nonzero_rest.iter().zip(in_indices_rest.iter()).enumerate() {
                // Look up the key from the original values array
                let lookup_key = values[in_idx as usize];
                // Look up the value from the lookup table
                let lookedup_value = self.lookup_table[lookup_key as usize];

                in_results_arr[i] = in_result;
                lookedup_arr[i] = lookedup_value;
                indices_arr[i] = in_idx;
            }

            // Call the mix function on the padded arrays
            let mixed = f(u8x16::from(in_results_arr), u8x16::from(lookedup_arr));
            let mixed_arr = mixed.to_array();

            // Write nonzero results only for the valid elements
            for i in 0..in_nonzero_rest.len() {
                if mixed_arr[i] != 0 {
                    result_slice[num_written] = mixed_arr[i];
                    indices_slice[num_written] = indices_arr[i];
                    num_written += 1;
                }
            }
        }

        result_guard.set_written(num_written);
        indices_guard.set_written(num_written);
    }
}

// =================================================================================================
// DESIGN SKETCH: Bitmap-Based Cascading Lookup (Alternative to Index-Based Design)
// =================================================================================================
//
// The current `SimdCascadingTableU32U8Lookup` outputs explicit `Vec<u32>` indices alongside results.
// An alternative design could output **bitmaps** instead of indices, which may be advantageous in
// certain scenarios.
//
// ## Current Design (Index-Based)
//
// ```text
// First stage output:
//   nonzero_results: [x, y, z, w, v]     (packed u8 values)
//   indices:         [1, 2, 4, 7, 9]     (explicit u32 positions, 4 bytes each)
//
// Cascading stage:
//   - VGATHER from values2 using indices
//   - Combine and VCOMPRESS results + indices
//   - Output: filtered results + filtered indices
// ```
//
// ## Alternative Design (Bitmap-Based)
//
// ```text
// First stage output:
//   nonzero_results: [x, y, z, w, v]     (packed u8 values)
//   bitmap:          0b10_1001_0110      (bits set at original positions 1,2,4,7,9)
//
// Cascading stage:
//   - Process values2 sequentially in chunks aligned with bitmap
//   - For each 16-element chunk: VCOMPRESS using bitmap bits → packed lookup keys
//   - Combine results
//   - Use PDEP to map result mask back to original coordinates
//   - Output: filtered results + filtered bitmap (in original row coordinates)
// ```
//
// ## Memory Footprint Comparison
//
// For N input elements with K nonzero results:
// - Index-based:  K bytes (results) + 4K bytes (indices) = 5K bytes
// - Bitmap-based: K bytes (results) + N/8 bytes (bitmap) ≈ K + N/8 bytes
//
// Example: N=5000, 20% density (K=1000)
// - Index-based:  5000 bytes
// - Bitmap-based: 1625 bytes (~3x smaller intermediate data)
//
// ## The PDEP Trick for Bitmap Coordinate Transformation
//
// After the combine function, we have a `result_mask` in **compressed coordinates** (which of the
// packed elements survived). We need to transform this back to **original row coordinates**.
//
// ```text
// input_bitmap:   0b10_1001_0110    (original positions 1,2,4,7,9 were active)
// packed results: [x', 0, z', w', 0] (after combine: positions 0,2,3 in compressed space survived)
// result_mask:    0b01101            (compressed coordinates: bits 0,2,3 set)
//
// PDEP(result_mask, input_bitmap) → 0b00_1001_0010
//   - Deposits result_mask bits at positions specified by input_bitmap
//   - Result: bits 1,4,7 set in original coordinates
// ```
//
// The BMI2 `PDEP` instruction (available on Intel Haswell+ and AMD Zen+) enables this transformation
// efficiently, operating on 64 bits at a time.
//
// ## When Bitmap Design Makes Sense
//
// **Use bitmap-based design when:**
// 1. The final consumer needs a **bitmap output** (not numeric indices)
//    - Example: Combining multiple filter results with AND/OR operations
//    - Example: Feeding into another bitmap-based operation
// 2. The bitmap **propagates through multiple cascade stages** without index materialization
// 3. **Memory bandwidth is the primary bottleneck** and 3x smaller intermediates matter
// 4. You only need **counts** (popcount on final bitmap) not actual indices
//
// **Use index-based design (current) when:**
// 1. The final output requires explicit `(result, row_index)` pairs
// 2. Indices have good **locality** (close together) - VGATHER benefits from caching
// 3. You want **simpler code** without coordinate transformation complexity
// 4. Density is moderate (10-30%) - most bitmap chunks are non-empty anyway
//
// ## Implementation Considerations
//
// A bitmap-based kernel would require:
//
// 1. **First stage**: `lookup_compress_into_bitmap()` that outputs:
//    - `nonzero_results: Vec<u8>` (same as today)
//    - `bitmap: Vec<u64>` (64 bits per u64, representing which rows had nonzero results)
//
// 2. **Cascading stage**: `cascading_lookup_bitmap()` that:
//    - Processes values2 in chunks aligned with bitmap u64 words
//    - Uses bitmap bits as VCOMPRESS mask to pack lookup keys
//    - After combine, uses PDEP to transform result_mask to original coordinates
//    - Accumulates final bitmap in original row coordinates
//
// 3. **Index materialization** (if needed): `bitmap_to_indices()` that:
//    - Iterates through final bitmap, extracting set bit positions
//    - Only called if the consumer actually needs numeric indices
//
// ## Tradeoffs Summary
//
// | Aspect                     | Index-Based (Current)      | Bitmap-Based               |
// |----------------------------|----------------------------|----------------------------|
// | Intermediate memory        | 5K bytes per K nonzero     | K + N/8 bytes              |
// | Coordinate system          | Always in original coords  | Requires PDEP transform    |
// | Final index generation     | Indices flow through       | Must iterate bitmap bits   |
// | Code complexity            | Simpler                    | More complex               |
// | Best for                   | Sparse results needed      | Bitmap operations/counts   |
// | VGATHER vs VCOMPRESS       | VGATHER (scattered read)   | VCOMPRESS (sequential read)|
//
// The current index-based design is preferred when explicit indices are needed in the output.
// The bitmap-based design would be advantageous only when the downstream consumer operates on
// bitmaps directly, avoiding the index materialization step entirely.
// =================================================================================================


#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_pipelined_single_table_lookup_basic() {
        // Create a simple lookup table: index -> index as u8
        let lookup_table: Vec<u8> = (0..256).map(|i| i as u8).collect();
        let pipelined_lookup = PipelinedSingleTableU32U8Lookup::new(&lookup_table);

        // Test with exactly 16 values (one chunk)
        let values = vec![
            10u32, 20, 30, 40, 50, 60, 70, 80, 90, 100, 110, 120, 130, 140, 150, 160,
        ];
        let mut results = Vec::new();
        pipelined_lookup.lookup_func(&values, &mut |lookedup_values, num_bytes| {
            let array = lookedup_values.to_array();
            results.extend_from_slice(&array[..num_bytes]);
        });

        let expected: Vec<u8> = values.iter().map(|&v| v as u8).collect();
        assert_eq!(results, expected);
    }

    #[test]
    fn test_pipelined_single_table_lookup_multiple_chunks() {
        // Create a simple lookup table
        let lookup_table: Vec<u8> = (0..256).map(|i| i as u8).collect();
        let pipelined_lookup = PipelinedSingleTableU32U8Lookup::new(&lookup_table);

        // Test with 35 values (2 full chunks + 3 remainder)
        let values: Vec<u32> = (1..36).collect();
        let mut results = Vec::new();
        pipelined_lookup.lookup_func(&values, &mut |lookedup_values, num_bytes| {
            let array = lookedup_values.to_array();
            results.extend_from_slice(&array[..num_bytes]);
        });

        let expected: Vec<u8> = values.iter().map(|&v| v as u8).collect();
        assert_eq!(results, expected);
    }

    #[test]
    fn test_pipelined_single_table_lookup_into_vec() {
        // Create a lookup table where each index maps to its double (mod 256)
        let lookup_table: Vec<u8> = (0..256).map(|i| ((i * 2) % 256) as u8).collect();
        let pipelined_lookup = PipelinedSingleTableU32U8Lookup::new(&lookup_table);

        let values = vec![1u32, 2, 3, 4, 5, 100, 150, 200];
        let mut buffer = Vec::new();
        pipelined_lookup.lookup_into_vec(&values, &mut buffer);

        let expected: Vec<u8> = values.iter().map(|&v| ((v * 2) % 256) as u8).collect();
        assert_eq!(buffer, expected);
    }

    #[test]
    fn test_pipelined_vs_original_consistency() {
        // Test that pipelined version produces same results as original
        let lookup_table: Vec<u8> = (0..256).map(|i| (i ^ 0xAA) as u8).collect();

        let original_lookup = SimdSingleTableU32U8Lookup::new(&lookup_table);
        let pipelined_lookup = PipelinedSingleTableU32U8Lookup::new(&lookup_table);

        // Test with various sizes
        for size in [5, 16, 17, 32, 33, 100] {
            let values: Vec<u32> = (0..size).map(|i| (i * 7) % 256).collect();

            let mut original_results = Vec::new();
            original_lookup.lookup_into_vec(&values, &mut original_results);

            let mut pipelined_results = Vec::new();
            pipelined_lookup.lookup_into_vec(&values, &mut pipelined_results);

            assert_eq!(
                original_results, pipelined_results,
                "Results differ for size {}",
                size
            );
        }
    }

    #[test]
    fn test_single_table_lookup_into_vec() {
        // Create a simple lookup table
        let lookup_table = vec![0u8, 10, 20, 30, 40];
        let lookup = SimdSingleTableU32U8Lookup::new(&lookup_table);

        // Test with values that are less than lookup table size
        let values = vec![0u32, 1, 2, 3, 4, 1, 2, 3];
        let mut result = Vec::new();
        lookup.lookup_into_vec(&values, &mut result);

        assert_eq!(result.len(), values.len());
        assert_eq!(result[0], 0);
        assert_eq!(result[1], 10);
        assert_eq!(result[2], 20);
        assert_eq!(result[3], 30);
        assert_eq!(result[4], 40);
        assert_eq!(result[5], 10);
        assert_eq!(result[6], 20);
        assert_eq!(result[7], 30);
    }

    #[test]
    fn test_single_table_lookup_compress_into_nonzeroes_small_input() {
        // Test lookup_compress_into_nonzeroes with < 16 elements
        let lookup_table: Vec<u8> = (0..256).map(|i| if i % 2 == 0 { i as u8 } else { 0 }).collect();
        let lookup = SimdSingleTableU32U8Lookup::new(&lookup_table);

        // Test with 5 values (less than 16)
        let values = vec![0u32, 1, 2, 3, 4];
        let mut nonzero_results: Vec<u8> = Vec::new();
        let mut indices: Vec<u32> = Vec::new();

        lookup.lookup_compress_into_nonzeroes(&values, &mut nonzero_results, &mut indices, 0);

        // Expected: indices 2, 4 are nonzero (lookup_table[0]=0, [1]=0, [2]=2, [3]=0, [4]=4)
        assert_eq!(nonzero_results.len(), 2);
        assert_eq!(nonzero_results, vec![2, 4]);
        assert_eq!(indices, vec![2, 4]);
    }

    #[test]
    fn test_cascading_lookup_small_input() {
        // Test cascading_lookup with < 16 elements in input
        let lookup_table1: Vec<u8> = (0..256).map(|i| i as u8).collect();
        let lookup_table2: Vec<u8> = (0..256).map(|i| i as u8).collect();

        let single_table = SimdSingleTableU32U8Lookup::new(&lookup_table1);
        let cascading_table = SimdCascadingTableU32U8Lookup::new(&lookup_table2);

        // Test with 3 values (less than 16)
        let values1: Vec<u32> = vec![1, 2, 3];
        let values2: Vec<u32> = vec![10, 20, 30];

        let mut nonzero_results: Vec<u8> = Vec::new();
        let mut indices: Vec<u32> = Vec::new();
        single_table.lookup_compress_into_nonzeroes(&values1, &mut nonzero_results, &mut indices, 0);

        // All 3 values are nonzero
        assert_eq!(nonzero_results.len(), 3);
        assert_eq!(indices, vec![0, 1, 2]);

        // Cascading lookup
        let mut out_results: Vec<u8> = Vec::new();
        let mut out_indices: Vec<u32> = Vec::new();

        cascading_table.cascading_lookup(
            &values2,
            &nonzero_results,
            &indices,
            |v1, v2| v1 & v2,
            &mut out_results,
            &mut out_indices,
        );

        // Expected mixed results:
        //   v1=1, v2=10, mixed=1&10=0
        //   v1=2, v2=20, mixed=2&20=0
        //   v1=3, v2=30, mixed=3&30=2
        // Only index 2 has nonzero result
        assert_eq!(out_results.len(), 1);
        assert_eq!(out_results[0], 2);  // 3 & 30 = 2
        assert_eq!(out_indices[0], 2);
    }

    #[test]
    fn test_dual_table_lookup_into_vec() {
        // Create two simple lookup tables
        let lookup_table1 = vec![0u8, 1, 2, 3, 4];
        let lookup_table2 = vec![0u8, 10, 20, 30, 40];
        let lookup = SimdDualTableU32U8Lookup::new(&lookup_table1, &lookup_table2);

        // Test with values that are less than lookup table size
        let values1 = vec![0u32, 1, 2, 3, 4, 1, 2, 3];
        let values2 = vec![0u32, 1, 2, 3, 4, 1, 2, 3];

        // Use bitwise OR as the combiner function
        let mut result = Vec::new();
        lookup.lookup_into_vec(&values1, &values2, &mut result, &mut |v1, v2| v1 | v2);

        assert_eq!(result.len(), values1.len());
        assert_eq!(result[0], 0); // 0
        assert_eq!(result[1], 1 | 10); // 11
        assert_eq!(result[2], 2 | 20); // 22
        assert_eq!(result[3], 3 | 30); // 31
        assert_eq!(result[4], 4 | 40); // 44
        assert_eq!(result[5], 1 | 10); // 11
        assert_eq!(result[6], 2 | 20); // 22
        assert_eq!(result[7], 3 | 30); // 31
    }

    #[test]
    fn test_dual_table_lookup_into_vec_large() {
        // Test with a larger dataset that spans multiple u8x16 chunks
        let lookup_table1 = vec![1u8; 100];
        let lookup_table2 = vec![2u8; 100];
        let lookup = SimdDualTableU32U8Lookup::new(&lookup_table1, &lookup_table2);

        // Create 50 values (more than 16, so it tests multiple chunks)
        let values1 = vec![0u32; 50];
        let values2 = vec![0u32; 50];

        // Use addition as the combiner function
        let mut result = Vec::new();
        lookup.lookup_into_vec(&values1, &values2, &mut result, &mut |v1, v2| v1 + v2);

        assert_eq!(result.len(), 50);
        // All results should be 1 + 2 = 3
        for &val in &result {
            assert_eq!(val, 3);
        }
    }

    #[test]
    fn test_dual_table_v2_lookup_func_basic() {
        // Create two simple lookup tables
        let lookup_table1 = vec![0u8, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10];
        let lookup_table2 = vec![0u8, 10, 20, 30, 40, 50, 60, 70, 80, 90, 100];
        let mut lookup = SimdDualTableU32U8LookupV2::new(&lookup_table1, &lookup_table2);

        // Test with values that are less than lookup table size
        let values1 = vec![0u32, 1, 2, 3, 4];
        let values2 = vec![1u32, 2, 3, 4, 5];

        let mut table1_results = Vec::new();
        let mut table2_results = Vec::new();
        let mut num_bytes_list = Vec::new();

        lookup.lookup_func(&values1, &values2, &mut |v1, v2, num_bytes| {
            table1_results.push(v1);
            table2_results.push(v2);
            num_bytes_list.push(num_bytes);
        });

        // 5 values = 0 full chunks + 1 remainder of 5
        assert_eq!(num_bytes_list.len(), 1);
        assert_eq!(num_bytes_list[0], 5);

        // Check table1 results
        let v1_array = table1_results[0].as_array();
        assert_eq!(v1_array[0], 0);
        assert_eq!(v1_array[1], 1);
        assert_eq!(v1_array[2], 2);
        assert_eq!(v1_array[3], 3);
        assert_eq!(v1_array[4], 4);

        // Check table2 results - should only be looked up where table1 is nonzero
        let v2_array = table2_results[0].as_array();
        assert_eq!(v2_array[0], 0); // table1[0] == 0, so table2[0] should be 0 (not looked up)
        assert_eq!(v2_array[1], 20); // table1[1] == 1 (nonzero), so table2[1] == lookup_table2[values2[1]] == lookup_table2[2] == 20
        assert_eq!(v2_array[2], 30); // table1[2] == 2 (nonzero), so table2[2] == lookup_table2[values2[2]] == lookup_table2[3] == 30
        assert_eq!(v2_array[3], 40); // table1[3] == 3 (nonzero), so table2[3] == lookup_table2[values2[3]] == lookup_table2[4] == 40
        assert_eq!(v2_array[4], 50); // table1[4] == 4 (nonzero), so table2[4] == lookup_table2[values2[4]] == lookup_table2[5] == 50
    }

    #[test]
    fn test_dual_table_v2_lookup_func_remainder() {
        // Test remainder handling - values that don't divide evenly into 16
        let lookup_table1 = vec![0u8, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10];
        let lookup_table2 = vec![0u8, 10, 20, 30, 40, 50, 60, 70, 80, 90, 100];
        let mut lookup = SimdDualTableU32U8LookupV2::new(&lookup_table1, &lookup_table2);

        // Create 25 values (16 + 9 remainder)
        let values1: Vec<u32> = (0..25).map(|i| (i % 5) as u32).collect();
        let values2: Vec<u32> = (0..25).map(|i| ((i % 5) + 1) as u32).collect();

        let mut table1_results = Vec::new();
        let mut table2_results = Vec::new();
        let mut num_bytes_list = Vec::new();

        lookup.lookup_func(&values1, &values2, &mut |v1, v2, num_bytes| {
            table1_results.push(v1);
            table2_results.push(v2);
            num_bytes_list.push(num_bytes);
        });

        // Should have 2 chunks: one full (16) and one remainder (9)
        assert_eq!(num_bytes_list.len(), 2);
        assert_eq!(num_bytes_list[0], 16);
        assert_eq!(num_bytes_list[1], 9);

        // Check first chunk (full 16)
        let v1_chunk0 = table1_results[0].as_array();
        let v2_chunk0 = table2_results[0].as_array();
        for i in 0..16 {
            let expected_v1 = lookup_table1[values1[i] as usize];
            assert_eq!(v1_chunk0[i], expected_v1);
            if expected_v1 != 0 {
                assert_eq!(v2_chunk0[i], lookup_table2[values2[i] as usize]);
            } else {
                assert_eq!(v2_chunk0[i], 0);
            }
        }

        // Check remainder chunk (9 elements)
        let v1_remainder = table1_results[1].as_array();
        let v2_remainder = table2_results[1].as_array();
        for i in 0..9 {
            let expected_v1 = lookup_table1[values1[16 + i] as usize];
            assert_eq!(v1_remainder[i], expected_v1);
            if expected_v1 != 0 {
                assert_eq!(v2_remainder[i], lookup_table2[values2[16 + i] as usize]);
            } else {
                assert_eq!(v2_remainder[i], 0);
            }
        }
        // Remaining positions in the u8x16 should be zero
        for i in 9..16 {
            assert_eq!(v1_remainder[i], 0);
            assert_eq!(v2_remainder[i], 0);
        }
    }

    #[test]
    fn test_dual_table_v2_lookup_func_zero_filtering() {
        // Test that lookup2 is only performed when table1 is nonzero
        let lookup_table1 = vec![0u8, 0, 0, 5, 0, 0, 0, 10, 0, 0, 0];
        let lookup_table2 = vec![0u8, 100, 200, 50, 150, 250, 60, 70, 80, 90, 100];
        let mut lookup = SimdDualTableU32U8LookupV2::new(&lookup_table1, &lookup_table2);

        // values1 will map to indices that are mostly zero in lookup_table1
        let values1 = vec![0u32, 1, 2, 3, 4, 5, 6, 7];
        let values2 = vec![1u32, 2, 3, 4, 5, 6, 7, 8]; // These would normally map to 100, 200, etc.

        let mut table1_results = Vec::new();
        let mut table2_results = Vec::new();

        lookup.lookup_func(&values1, &values2, &mut |v1, v2, _num_bytes| {
            table1_results.push(v1);
            table2_results.push(v2);
        });

        let v1_array = table1_results[0].as_array();
        let v2_array = table2_results[0].as_array();

        // Check that table2 is only looked up where table1 is nonzero
        assert_eq!(v1_array[0], 0);
        assert_eq!(v2_array[0], 0); // table1[0] == 0, so table2 not looked up

        assert_eq!(v1_array[1], 0);
        assert_eq!(v2_array[1], 0); // table1[1] == 0, so table2 not looked up

        assert_eq!(v1_array[2], 0);
        assert_eq!(v2_array[2], 0); // table1[2] == 0, so table2 not looked up

        assert_eq!(v1_array[3], 5);
        assert_eq!(v2_array[3], 150); // table1[3] == 5 (nonzero), so table2[3] == lookup_table2[values2[3]] == lookup_table2[4] == 150

        assert_eq!(v1_array[4], 0);
        assert_eq!(v2_array[4], 0); // table1[4] == 0, so table2 not looked up

        assert_eq!(v1_array[5], 0);
        assert_eq!(v2_array[5], 0); // table1[5] == 0, so table2 not looked up

        assert_eq!(v1_array[6], 0);
        assert_eq!(v2_array[6], 0); // table1[6] == 0, so table2 not looked up

        assert_eq!(v1_array[7], 10);
        assert_eq!(v2_array[7], 80); // table1[7] == 10 (nonzero), so table2[7] == lookup_table2[values2[7]] == lookup_table2[8] == 80
    }

    #[test]
    fn test_dual_table_v2_lookup_func_multiple_chunks() {
        // Test with multiple full chunks
        let lookup_table1 = vec![1u8; 100];
        let lookup_table2 = vec![2u8; 100];
        let mut lookup = SimdDualTableU32U8LookupV2::new(&lookup_table1, &lookup_table2);

        // Create 50 values (3 full chunks: 16 + 16 + 16 + 2 remainder)
        let values1: Vec<u32> = (0..50).map(|i| (i % 10) as u32).collect();
        let values2: Vec<u32> = (0..50).map(|i| ((i % 10) + 1) as u32).collect();

        let mut table1_results = Vec::new();
        let mut table2_results = Vec::new();
        let mut num_bytes_list = Vec::new();

        lookup.lookup_func(&values1, &values2, &mut |v1, v2, num_bytes| {
            table1_results.push(v1);
            table2_results.push(v2);
            num_bytes_list.push(num_bytes);
        });

        // Should have 4 chunks: 3 full (16 each) + 1 remainder (2)
        assert_eq!(num_bytes_list.len(), 4);
        assert_eq!(num_bytes_list[0], 16);
        assert_eq!(num_bytes_list[1], 16);
        assert_eq!(num_bytes_list[2], 16);
        assert_eq!(num_bytes_list[3], 2);

        // Verify all chunks
        let mut global_idx = 0;
        for chunk_idx in 0..4 {
            let v1_chunk = table1_results[chunk_idx].as_array();
            let v2_chunk = table2_results[chunk_idx].as_array();
            let chunk_len = num_bytes_list[chunk_idx];

            for i in 0..chunk_len {
                let expected_v1 = lookup_table1[values1[global_idx] as usize];
                assert_eq!(v1_chunk[i], expected_v1);
                // Since table1 is always nonzero (lookup_table1 is all 1s), table2 should always be looked up
                assert_eq!(v2_chunk[i], lookup_table2[values2[global_idx] as usize]);
                global_idx += 1;
            }
        }
    }

    #[test]
    fn test_dual_table_v2_lookup_func_exact_multiple_of_16() {
        // Test with exactly 32 values (exactly 2 chunks, no remainder)
        let lookup_table1 = vec![0u8, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10];
        let lookup_table2 = vec![0u8, 10, 20, 30, 40, 50, 60, 70, 80, 90, 100];
        let mut lookup = SimdDualTableU32U8LookupV2::new(&lookup_table1, &lookup_table2);

        let values1: Vec<u32> = (0..32).map(|i| (i % 5) as u32).collect();
        let values2: Vec<u32> = (0..32).map(|i| ((i % 5) + 1) as u32).collect();

        let mut table1_results = Vec::new();
        let mut table2_results = Vec::new();
        let mut num_bytes_list = Vec::new();

        lookup.lookup_func(&values1, &values2, &mut |v1, v2, num_bytes| {
            table1_results.push(v1);
            table2_results.push(v2);
            num_bytes_list.push(num_bytes);
        });

        // Should have exactly 2 chunks, no remainder
        assert_eq!(num_bytes_list.len(), 2);
        assert_eq!(num_bytes_list[0], 16);
        assert_eq!(num_bytes_list[1], 16);

        // Verify both chunks
        let mut global_idx = 0;
        for chunk_idx in 0..2 {
            let v1_chunk = table1_results[chunk_idx].as_array();
            let v2_chunk = table2_results[chunk_idx].as_array();

            for i in 0..16 {
                let expected_v1 = lookup_table1[values1[global_idx] as usize];
                assert_eq!(v1_chunk[i], expected_v1);
                if expected_v1 != 0 {
                    assert_eq!(v2_chunk[i], lookup_table2[values2[global_idx] as usize]);
                } else {
                    assert_eq!(v2_chunk[i], 0);
                }
                global_idx += 1;
            }
        }
    }

    #[test]
    fn test_dual_table_with_hash_lookup_basic() {
        // Create table1 as a regular lookup table
        let lookup_table1: Vec<u8> =
            (0..256).map(|i| if i % 3 == 0 { 0 } else { i as u8 }).collect();

        // Create table2 as a FxHashMap with sparse entries
        let mut hash_table2: FxHashMap<u32, u8> = FxHashMap::default();
        hash_table2.insert(0, 100);
        hash_table2.insert(5, 105);
        hash_table2.insert(10, 110);
        hash_table2.insert(15, 115);
        hash_table2.insert(20, 120);
        hash_table2.insert(100, 200);

        let lookup = SimdDualTableWithHashLookup::new(&lookup_table1, &hash_table2);

        // Test values
        let values1: Vec<u32> = vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16];
        let values2: Vec<u32> = vec![0, 5, 10, 15, 20, 100, 0, 5, 10, 15, 20, 100, 0, 5, 10, 15];

        let mut table1_results = Vec::new();
        let mut table2_results = Vec::new();

        lookup.lookup_func(&values1, &values2, &mut |v1, v2, _num_bytes| {
            table1_results.push(v1);
            table2_results.push(v2);
        });

        assert_eq!(table1_results.len(), 1); // One chunk

        let v1_array = table1_results[0].as_array();
        let v2_array = table2_results[0].as_array();

        // Check that table2 is only looked up where table1 is nonzero
        for i in 0..16 {
            let expected_v1 = lookup_table1[values1[i] as usize];
            assert_eq!(v1_array[i], expected_v1, "v1 mismatch at index {}", i);

            if expected_v1 != 0 {
                let expected_v2 = hash_table2.get(&values2[i]).copied().unwrap_or(0);
                assert_eq!(v2_array[i], expected_v2, "v2 mismatch at index {}", i);
            } else {
                assert_eq!(v2_array[i], 0, "v2 should be 0 where v1 is 0 at index {}", i);
            }
        }
    }

    #[test]
    fn test_dual_table_with_hash_lookup_into_vec() {
        // Create table1 as a regular lookup table (all nonzero)
        let lookup_table1: Vec<u8> = (0..256).map(|i| (i + 1) as u8).collect();

        // Create table2 as a FxHashMap
        let mut hash_table2: FxHashMap<u32, u8> = FxHashMap::default();
        hash_table2.insert(0, 10);
        hash_table2.insert(1, 20);
        hash_table2.insert(2, 30);
        hash_table2.insert(3, 40);
        hash_table2.insert(4, 50);

        let lookup = SimdDualTableWithHashLookup::new(&lookup_table1, &hash_table2);

        let values1: Vec<u32> = vec![0, 1, 2, 3, 4, 5, 6, 7];
        let values2: Vec<u32> = vec![0, 1, 2, 3, 4, 0, 1, 2];

        let mut result = Vec::new();
        lookup.lookup_into_vec(&values1, &values2, &mut result, &mut |v1, v2| v1 & v2);

        // Result should be v1 & v2
        assert_eq!(result.len(), 8);
        // v1[0] = 1, v2[0] = 10, 1 & 10 = 0
        assert_eq!(result[0], 1 & 10);
        // v1[1] = 2, v2[1] = 20, 2 & 20 = 0
        assert_eq!(result[1], 2 & 20);
    }

    #[test]
    fn test_cascading_lookup_basic() {
        // Create lookup tables
        // Table 1: returns the index value (modulo 256) for indices 0-255
        let lookup_table1: Vec<u8> = (0..256).map(|i| i as u8).collect();
        // Table 2: returns 2 * index (modulo 256) for indices 0-127
        let lookup_table2: Vec<u8> = (0..128).map(|i| ((i * 2) % 256) as u8).collect();

        // Create kernels
        let single_table = SimdSingleTableU32U8Lookup::new(&lookup_table1);
        let cascading_table = SimdCascadingTableU32U8Lookup::new(&lookup_table2);

        // Create test values for table1 (indices into lookup_table1)
        // Some will be 0, some nonzero
        let values1: Vec<u32> = vec![0, 1, 2, 3, 0, 5, 0, 7, 8, 9, 0, 11, 12, 0, 14, 15];

        // Create test values for table2 (indices into lookup_table2)
        // These should be looked up only where table1 is nonzero
        let values2: Vec<u32> = vec![10, 20, 30, 40, 50, 60, 70, 80, 90, 100, 110, 120, 1, 2, 3, 4];

        // Step 1: Call SingleTable::lookup_compress_into_nonzeroes
        let mut nonzero_results: Vec<u8> = Vec::new();
        let mut indices: Vec<u32> = Vec::new();
        single_table.lookup_compress_into_nonzeroes(&values1, &mut nonzero_results, &mut indices, 0);

        // Verify the nonzero results from step 1
        // Values 0, 4, 6, 10, 13 are zero in lookup_table1
        // Nonzero: 1, 2, 3, 5, 7, 8, 9, 11, 12, 14, 15 (11 values)
        assert_eq!(nonzero_results.len(), 11, "Expected 11 nonzero results from table1");
        assert_eq!(indices.len(), 11, "Expected 11 indices");

        // Verify the indices are correct (positions of nonzero values)
        let expected_indices: Vec<u32> = vec![1, 2, 3, 5, 7, 8, 9, 11, 12, 14, 15];
        assert_eq!(indices, expected_indices);

        // Verify the nonzero results
        let expected_nonzero: Vec<u8> = vec![1, 2, 3, 5, 7, 8, 9, 11, 12, 14, 15];
        assert_eq!(nonzero_results, expected_nonzero);

        // Step 2: Call CascadingTable::cascading_lookup
        let mut out_results: Vec<u8> = Vec::new();
        let mut out_indices: Vec<u32> = Vec::new();

        // Mix function: bitwise AND
        cascading_table.cascading_lookup(
            &values2,
            &nonzero_results,
            &indices,
            |v1, v2| v1 & v2,
            &mut out_results,
            &mut out_indices,
        );

        // Verify cascading results
        // For each nonzero index i:
        //   - in_result = lookup_table1[values1[i]] = values1[i] (since lookup_table1[x] = x)
        //   - lookup_key = values2[i]
        //   - lookedup_value = lookup_table2[lookup_key] = 2 * lookup_key
        //   - mixed = in_result & lookedup_value
        // Expected (for indices 1,2,3,5,7,8,9,11,12,14,15):
        //   index 1: v1=1, lookup_key=20, v2=40, mixed=1&40=0
        //   index 2: v1=2, lookup_key=30, v2=60, mixed=2&60=0
        //   index 3: v1=3, lookup_key=40, v2=80, mixed=3&80=0
        //   index 5: v1=5, lookup_key=60, v2=120, mixed=5&120=0
        //   index 7: v1=7, lookup_key=80, v2=160%256=160, mixed=7&160=0
        //   index 8: v1=8, lookup_key=90, v2=180%256=180, mixed=8&180=0
        //   index 9: v1=9, lookup_key=100, v2=200%256=200, mixed=9&200=8
        //   index 11: v1=11, lookup_key=120, v2=240%256=240, mixed=11&240=0
        //   index 12: v1=12, lookup_key=1, v2=2, mixed=12&2=0
        //   index 14: v1=14, lookup_key=3, v2=6, mixed=14&6=6
        //   index 15: v1=15, lookup_key=4, v2=8, mixed=15&8=8

        // Only indices 9, 14, 15 have nonzero mixed results
        assert_eq!(out_results.len(), 3, "Expected 3 nonzero cascading results, got {:?}", out_results);
        assert_eq!(out_indices.len(), 3, "Expected 3 output indices");

        // Check values
        assert_eq!(out_results[0], 8);  // index 9: 9 & 200 = 8
        assert_eq!(out_results[1], 6);  // index 14: 14 & 6 = 6
        assert_eq!(out_results[2], 8);  // index 15: 15 & 8 = 8

        assert_eq!(out_indices[0], 9);
        assert_eq!(out_indices[1], 14);
        assert_eq!(out_indices[2], 15);
    }

    #[test]
    fn test_cascading_lookup_remainder() {
        // Test the remainder handling (< 16 elements)
        let lookup_table1: Vec<u8> = (0..256).map(|i| i as u8).collect();
        let lookup_table2: Vec<u8> = (0..256).map(|i| i as u8).collect();

        let single_table = SimdSingleTableU32U8Lookup::new(&lookup_table1);
        let cascading_table = SimdCascadingTableU32U8Lookup::new(&lookup_table2);

        // Create 5 values (less than 16, so remainder only)
        let values1: Vec<u32> = vec![1, 2, 3, 4, 5];
        let values2: Vec<u32> = vec![10, 20, 30, 40, 50];

        let mut nonzero_results: Vec<u8> = Vec::new();
        let mut indices: Vec<u32> = Vec::new();
        single_table.lookup_compress_into_nonzeroes(&values1, &mut nonzero_results, &mut indices, 0);

        // All 5 values are nonzero
        assert_eq!(nonzero_results.len(), 5);
        assert_eq!(indices, vec![0, 1, 2, 3, 4]);

        // Cascading lookup
        let mut out_results: Vec<u8> = Vec::new();
        let mut out_indices: Vec<u32> = Vec::new();

        cascading_table.cascading_lookup(
            &values2,
            &nonzero_results,
            &indices,
            |v1, v2| v1 & v2,
            &mut out_results,
            &mut out_indices,
        );

        // Expected mixed results:
        //   v1=1, v2=10, mixed=1&10=0
        //   v1=2, v2=20, mixed=2&20=0
        //   v1=3, v2=30, mixed=3&30=2
        //   v1=4, v2=40, mixed=4&40=0
        //   v1=5, v2=50, mixed=5&50=0
        // Only index 2 has nonzero result
        assert_eq!(out_results.len(), 1);
        assert_eq!(out_results[0], 2);  // 3 & 30 = 2
        assert_eq!(out_indices[0], 2);
    }

    #[test]
    fn test_cascading_lookup_multiple_chunks() {
        // Test with multiple chunks (> 16 nonzero results)
        let lookup_table1: Vec<u8> = (0..256).map(|i| ((i + 1) % 256) as u8).collect(); // All nonzero except 255
        let lookup_table2: Vec<u8> = (0..256).map(|i| i as u8).collect();

        let single_table = SimdSingleTableU32U8Lookup::new(&lookup_table1);
        let cascading_table = SimdCascadingTableU32U8Lookup::new(&lookup_table2);

        // Create 35 values (2 full chunks of 16 + 3 remainder)
        let values1: Vec<u32> = (0..35).collect();
        let values2: Vec<u32> = (0..35).collect();

        let mut nonzero_results: Vec<u8> = Vec::new();
        let mut indices: Vec<u32> = Vec::new();
        single_table.lookup_compress_into_nonzeroes(&values1, &mut nonzero_results, &mut indices, 0);

        // All 35 values should be nonzero (lookup_table1[i] = (i+1) % 256)
        assert_eq!(nonzero_results.len(), 35);

        // Cascading lookup with identity mix function (just return v2)
        let mut out_results: Vec<u8> = Vec::new();
        let mut out_indices: Vec<u32> = Vec::new();

        cascading_table.cascading_lookup(
            &values2,
            &nonzero_results,
            &indices,
            |_v1, v2| v2,
            &mut out_results,
            &mut out_indices,
        );

        // v2 = lookup_table2[values2[i]] = values2[i] = i
        // Nonzero for i > 0
        assert_eq!(out_results.len(), 34, "Expected 34 nonzero results (all except index 0)");

        // Verify all results
        for (i, &result) in out_results.iter().enumerate() {
            assert_eq!(result, (i + 1) as u8, "Result at position {} should be {}", i, i + 1);
        }
    }
}