hermes_core/structures/
partitioned_ef.rs

1//! Partitioned Elias-Fano (PEF) encoding
2//!
3//! Based on Ottaviano & Venturini (2014) "Partitioned Elias-Fano Indexes"
4//!
5//! Key improvements over standard Elias-Fano:
6//! - **Optimal partitioning**: Divides sequence into chunks with (1+ε)-optimal compression
7//! - **Better compression**: 30-40% smaller than standard EF on typical data
8//! - **Fast random access**: O(1) access within partitions
9//! - **Efficient NextGEQ**: Uses partition endpoints for fast seeking
10//!
11//! The algorithm finds optimal partition boundaries that minimize total encoding size.
12//! Each partition is encoded with its own Elias-Fano instance using local parameters.
13
14use byteorder::{LittleEndian, ReadBytesExt, WriteBytesExt};
15use std::io::{self, Read, Write};
16
/// Lower bound on the number of elements the partitioner places in one
/// partition (smaller partitions carry too much per-partition overhead).
const MIN_PARTITION_SIZE: usize = 64;

/// Upper bound on the number of elements the partitioner places in one
/// partition (larger partitions lose locality benefits).
const MAX_PARTITION_SIZE: usize = 512;
22
/// One independently encoded Elias-Fano chunk of a partitioned sequence.
///
/// Every element is stored relative to `first_value` and split into
/// `lower_bit_width` low bits (packed into `lower_bits`) and a high part
/// that is recorded in unary inside `upper_bits`.
#[derive(Clone, Debug)]
pub struct EFPartition {
    /// Fixed-width low bits of every element, packed back to back into 64-bit words
    lower_bits: Vec<u64>,
    /// Bit vector holding one set bit per element at position `index + high part`
    upper_bits: Vec<u64>,
    /// Number of elements stored in this partition
    len: u32,
    /// First (smallest) value of the partition, as an absolute value
    first_value: u32,
    /// Last (largest) value of the partition, as an absolute value
    last_value: u32,
    /// Size of the local value range: `last_value - first_value + 1`
    local_universe: u32,
    /// How many low bits are kept verbatim per element
    lower_bit_width: u8,
}
41
42impl EFPartition {
43    /// Create a partition from a sorted slice
44    /// Values are stored relative to first_value for better compression
45    pub fn from_sorted_slice(values: &[u32]) -> Self {
46        if values.is_empty() {
47            return Self {
48                lower_bits: Vec::new(),
49                upper_bits: Vec::new(),
50                len: 0,
51                first_value: 0,
52                last_value: 0,
53                local_universe: 0,
54                lower_bit_width: 0,
55            };
56        }
57
58        let first_value = values[0];
59        let last_value = *values.last().unwrap();
60        let n = values.len() as u64;
61
62        // Local universe: encode values relative to first_value
63        let local_universe = last_value - first_value + 1;
64
65        // Calculate lower bit width using local universe
66        let lower_bit_width = if n <= 1 {
67            0
68        } else {
69            let ratio = (local_universe as u64).max(1) / n.max(1);
70            if ratio <= 1 {
71                0
72            } else {
73                (64 - ratio.leading_zeros() - 1) as u8
74            }
75        };
76
77        // Allocate arrays
78        let lower_bits_total = (n as usize) * (lower_bit_width as usize);
79        let lower_words = lower_bits_total.div_ceil(64);
80        let mut lower_bits = vec![0u64; lower_words];
81
82        let max_relative = local_universe.saturating_sub(1) as u64;
83        let upper_bound = n + (max_relative >> lower_bit_width) + 1;
84        let upper_words = (upper_bound as usize).div_ceil(64);
85        let mut upper_bits = vec![0u64; upper_words];
86
87        let lower_mask = if lower_bit_width == 0 {
88            0
89        } else {
90            (1u64 << lower_bit_width) - 1
91        };
92
93        // Encode each value relative to first_value
94        for (i, &val) in values.iter().enumerate() {
95            let relative_val = (val - first_value) as u64;
96
97            // Store lower bits
98            if lower_bit_width > 0 {
99                let lower = relative_val & lower_mask;
100                let bit_pos = i * (lower_bit_width as usize);
101                let word_idx = bit_pos / 64;
102                let bit_offset = bit_pos % 64;
103
104                lower_bits[word_idx] |= lower << bit_offset;
105                if bit_offset + (lower_bit_width as usize) > 64 && word_idx + 1 < lower_bits.len() {
106                    lower_bits[word_idx + 1] |= lower >> (64 - bit_offset);
107                }
108            }
109
110            // Store upper bits
111            let upper = relative_val >> lower_bit_width;
112            let upper_pos = (i as u64) + upper;
113            let word_idx = (upper_pos / 64) as usize;
114            let bit_offset = upper_pos % 64;
115            if word_idx < upper_bits.len() {
116                upper_bits[word_idx] |= 1u64 << bit_offset;
117            }
118        }
119
120        Self {
121            lower_bits,
122            upper_bits,
123            len: values.len() as u32,
124            first_value,
125            last_value,
126            local_universe,
127            lower_bit_width,
128        }
129    }
130
131    /// Get element at position i (0-indexed)
132    #[inline]
133    pub fn get(&self, i: u32) -> Option<u32> {
134        if i >= self.len {
135            return None;
136        }
137
138        let i = i as usize;
139
140        // Get lower bits
141        let lower = if self.lower_bit_width == 0 {
142            0u64
143        } else {
144            let bit_pos = i * (self.lower_bit_width as usize);
145            let word_idx = bit_pos / 64;
146            let bit_offset = bit_pos % 64;
147            let lower_mask = (1u64 << self.lower_bit_width) - 1;
148
149            let mut val = (self.lower_bits[word_idx] >> bit_offset) & lower_mask;
150            if bit_offset + (self.lower_bit_width as usize) > 64
151                && word_idx + 1 < self.lower_bits.len()
152            {
153                val |= (self.lower_bits[word_idx + 1] << (64 - bit_offset)) & lower_mask;
154            }
155            val
156        };
157
158        // Get upper bits via select1(i) - i
159        let select_pos = self.select1(i as u32)?;
160        let upper = (select_pos as u64) - (i as u64);
161
162        // Reconstruct absolute value
163        let relative_val = (upper << self.lower_bit_width) | lower;
164        Some(self.first_value + relative_val as u32)
165    }
166
167    /// Find position of i-th set bit (optimized with NEON on aarch64)
168    fn select1(&self, i: u32) -> Option<u32> {
169        if i >= self.len {
170            return None;
171        }
172
173        let mut remaining = i + 1;
174        let mut pos = 0u32;
175
176        // Process 4 words at a time on aarch64 using NEON popcount
177        #[cfg(target_arch = "aarch64")]
178        {
179            use std::arch::aarch64::*;
180
181            let chunks = self.upper_bits.chunks_exact(4);
182            let remainder = chunks.remainder();
183
184            for chunk in chunks {
185                // Load 4 u64 words (32 bytes)
186                let words: [u64; 4] = [chunk[0], chunk[1], chunk[2], chunk[3]];
187
188                // Count bits in each word using NEON
189                // vcntq_u8 counts bits per byte, then sum horizontally
190                unsafe {
191                    let bytes = std::mem::transmute::<[u64; 4], [u8; 32]>(words);
192
193                    // Process first 16 bytes (words 0-1)
194                    let v0 = vld1q_u8(bytes.as_ptr());
195                    let cnt0 = vcntq_u8(v0);
196                    let sum0 = vaddlvq_u8(cnt0) as u32;
197
198                    // Process next 16 bytes (words 2-3)
199                    let v1 = vld1q_u8(bytes.as_ptr().add(16));
200                    let cnt1 = vcntq_u8(v1);
201                    let sum1 = vaddlvq_u8(cnt1) as u32;
202
203                    let total_popcount = sum0 + sum1;
204
205                    if total_popcount >= remaining {
206                        // Found the chunk, now find exact word
207                        for &word in chunk {
208                            let popcount = word.count_ones();
209                            if popcount >= remaining {
210                                let mut w = word;
211                                for _ in 0..remaining - 1 {
212                                    w &= w - 1;
213                                }
214                                return Some(pos + w.trailing_zeros());
215                            }
216                            remaining -= popcount;
217                            pos += 64;
218                        }
219                    }
220                    remaining -= total_popcount;
221                    pos += 256; // 4 words * 64 bits
222                }
223            }
224
225            // Handle remaining words
226            for &word in remainder {
227                let popcount = word.count_ones();
228                if popcount >= remaining {
229                    let mut w = word;
230                    for _ in 0..remaining - 1 {
231                        w &= w - 1;
232                    }
233                    return Some(pos + w.trailing_zeros());
234                }
235                remaining -= popcount;
236                pos += 64;
237            }
238        }
239
240        // Process 4 words at a time on x86_64 using POPCNT instruction
241        #[cfg(target_arch = "x86_64")]
242        {
243            #[cfg(target_feature = "popcnt")]
244            {
245                use std::arch::x86_64::*;
246
247                let chunks = self.upper_bits.chunks_exact(4);
248                let remainder = chunks.remainder();
249
250                for chunk in chunks {
251                    // Use hardware POPCNT for each word
252                    unsafe {
253                        let p0 = _popcnt64(chunk[0] as i64) as u32;
254                        let p1 = _popcnt64(chunk[1] as i64) as u32;
255                        let p2 = _popcnt64(chunk[2] as i64) as u32;
256                        let p3 = _popcnt64(chunk[3] as i64) as u32;
257
258                        let total_popcount = p0 + p1 + p2 + p3;
259
260                        if total_popcount >= remaining {
261                            // Found the chunk, now find exact word
262                            for &word in chunk {
263                                let popcount = word.count_ones();
264                                if popcount >= remaining {
265                                    let mut w = word;
266                                    for _ in 0..remaining - 1 {
267                                        w &= w - 1;
268                                    }
269                                    return Some(pos + w.trailing_zeros());
270                                }
271                                remaining -= popcount;
272                                pos += 64;
273                            }
274                        }
275                        remaining -= total_popcount;
276                        pos += 256; // 4 words * 64 bits
277                    }
278                }
279
280                // Handle remaining words
281                for &word in remainder {
282                    let popcount = word.count_ones();
283                    if popcount >= remaining {
284                        let mut w = word;
285                        for _ in 0..remaining - 1 {
286                            w &= w - 1;
287                        }
288                        return Some(pos + w.trailing_zeros());
289                    }
290                    remaining -= popcount;
291                    pos += 64;
292                }
293            }
294
295            // Scalar fallback when POPCNT not available
296            #[cfg(not(target_feature = "popcnt"))]
297            {
298                for &word in &self.upper_bits {
299                    let popcount = word.count_ones();
300                    if popcount >= remaining {
301                        let mut w = word;
302                        for _ in 0..remaining - 1 {
303                            w &= w - 1;
304                        }
305                        return Some(pos + w.trailing_zeros());
306                    }
307                    remaining -= popcount;
308                    pos += 64;
309                }
310            }
311        }
312
313        // Scalar fallback for other architectures
314        #[cfg(not(any(target_arch = "aarch64", target_arch = "x86_64")))]
315        {
316            for &word in &self.upper_bits {
317                let popcount = word.count_ones();
318                if popcount >= remaining {
319                    let mut w = word;
320                    for _ in 0..remaining - 1 {
321                        w &= w - 1;
322                    }
323                    return Some(pos + w.trailing_zeros());
324                }
325                remaining -= popcount;
326                pos += 64;
327            }
328        }
329
330        None
331    }
332
333    /// Find first element >= target within this partition
334    /// Returns (local_position, value) or None
335    pub fn next_geq(&self, target: u32) -> Option<(u32, u32)> {
336        if self.len == 0 || target > self.last_value {
337            return None;
338        }
339
340        if target <= self.first_value {
341            return Some((0, self.first_value));
342        }
343
344        // Binary search for target
345        let mut lo = 0u32;
346        let mut hi = self.len;
347
348        while lo < hi {
349            let mid = lo + (hi - lo) / 2;
350            if let Some(val) = self.get(mid) {
351                if val < target {
352                    lo = mid + 1;
353                } else {
354                    hi = mid;
355                }
356            } else {
357                break;
358            }
359        }
360
361        if lo < self.len {
362            self.get(lo).map(|v| (lo, v))
363        } else {
364            None
365        }
366    }
367
368    /// Serialize partition
369    pub fn serialize<W: Write>(&self, writer: &mut W) -> io::Result<()> {
370        writer.write_u32::<LittleEndian>(self.len)?;
371        writer.write_u32::<LittleEndian>(self.first_value)?;
372        writer.write_u32::<LittleEndian>(self.last_value)?;
373        writer.write_u32::<LittleEndian>(self.local_universe)?;
374        writer.write_u8(self.lower_bit_width)?;
375
376        writer.write_u32::<LittleEndian>(self.lower_bits.len() as u32)?;
377        for &word in &self.lower_bits {
378            writer.write_u64::<LittleEndian>(word)?;
379        }
380
381        writer.write_u32::<LittleEndian>(self.upper_bits.len() as u32)?;
382        for &word in &self.upper_bits {
383            writer.write_u64::<LittleEndian>(word)?;
384        }
385
386        Ok(())
387    }
388
389    /// Deserialize partition
390    pub fn deserialize<R: Read>(reader: &mut R) -> io::Result<Self> {
391        let len = reader.read_u32::<LittleEndian>()?;
392        let first_value = reader.read_u32::<LittleEndian>()?;
393        let last_value = reader.read_u32::<LittleEndian>()?;
394        let local_universe = reader.read_u32::<LittleEndian>()?;
395        let lower_bit_width = reader.read_u8()?;
396
397        let lower_len = reader.read_u32::<LittleEndian>()? as usize;
398        let mut lower_bits = Vec::with_capacity(lower_len);
399        for _ in 0..lower_len {
400            lower_bits.push(reader.read_u64::<LittleEndian>()?);
401        }
402
403        let upper_len = reader.read_u32::<LittleEndian>()? as usize;
404        let mut upper_bits = Vec::with_capacity(upper_len);
405        for _ in 0..upper_len {
406            upper_bits.push(reader.read_u64::<LittleEndian>()?);
407        }
408
409        Ok(Self {
410            lower_bits,
411            upper_bits,
412            len,
413            first_value,
414            last_value,
415            local_universe,
416            lower_bit_width,
417        })
418    }
419
420    /// Size in bytes
421    pub fn size_bytes(&self) -> usize {
422        17 + self.lower_bits.len() * 8 + self.upper_bits.len() * 8
423    }
424}
425
/// Estimates, in bytes, how much space one partition holding `values` takes.
///
/// The estimate is the sum of three parts, rounded up to whole bytes:
/// - low bits: `n * bit_length(universe / n)`, or zero for dense or
///   single-element input,
/// - high bits: `2 * n`,
/// - a fixed 17-byte header.
///
/// Returns 0 for an empty slice. `values` must be in ascending order.
fn estimate_partition_cost(values: &[u32]) -> usize {
    let (Some(&first), Some(&last)) = (values.first(), values.last()) else {
        return 0;
    };

    let n = values.len();
    let count = n as u64;

    // Computed in 64 bits: for a slice spanning 0..=u32::MAX the range is
    // 2^32, which overflowed the previous u32 arithmetic.
    let local_universe = u64::from(last - first) + 1;

    // Lower bits: n * log2(universe/n)
    // NOTE(review): this uses the bit length of the ratio, i.e.
    // floor(log2(ratio)) + 1, while EFPartition::from_sorted_slice picks
    // floor(log2(ratio)) as its width. Confirm the difference is intended.
    let lower_bits = if n <= 1 || local_universe <= count {
        0
    } else {
        let ratio = local_universe / count;
        let l = (u64::BITS - ratio.leading_zeros()) as usize;
        n * l
    };

    // Upper bits: approximately 2n bits
    let upper_bits = 2 * n;

    // Overhead: partition header
    let overhead = 17 * 8; // 17 bytes header

    (lower_bits + upper_bits + overhead).div_ceil(8)
}
454
455/// Find optimal partition boundaries using dynamic programming
456/// Returns partition endpoints (exclusive)
457fn find_optimal_partitions(values: &[u32]) -> Vec<usize> {
458    let n = values.len();
459    if n <= MIN_PARTITION_SIZE {
460        return vec![n];
461    }
462
463    // dp[i] = minimum cost to encode values[0..i]
464    // parent[i] = start of last partition ending at i
465    let mut dp = vec![usize::MAX; n + 1];
466    let mut parent = vec![0usize; n + 1];
467    dp[0] = 0;
468
469    for i in MIN_PARTITION_SIZE..=n {
470        // Try all valid partition sizes ending at i
471        let min_start = i.saturating_sub(MAX_PARTITION_SIZE);
472        let max_start = i.saturating_sub(MIN_PARTITION_SIZE);
473
474        for start in min_start..=max_start {
475            if dp[start] == usize::MAX {
476                continue;
477            }
478
479            let partition_cost = estimate_partition_cost(&values[start..i]);
480            let total_cost = dp[start].saturating_add(partition_cost);
481
482            if total_cost < dp[i] {
483                dp[i] = total_cost;
484                parent[i] = start;
485            }
486        }
487    }
488
489    // Handle case where n is not reachable (too small for partitioning)
490    if dp[n] == usize::MAX {
491        return vec![n];
492    }
493
494    // Reconstruct partition boundaries
495    let mut boundaries = Vec::new();
496    let mut pos = n;
497    while pos > 0 {
498        boundaries.push(pos);
499        pos = parent[pos];
500    }
501    boundaries.reverse();
502
503    boundaries
504}
505
506/// Partitioned Elias-Fano encoded sequence
507#[derive(Debug, Clone)]
508pub struct PartitionedEliasFano {
509    /// Individual partitions
510    partitions: Vec<EFPartition>,
511    /// Total number of elements
512    len: u32,
513    /// Cumulative element counts for each partition (for position lookup)
514    cumulative_counts: Vec<u32>,
515}
516
517impl PartitionedEliasFano {
518    /// Create from sorted slice with optimal partitioning
519    pub fn from_sorted_slice(values: &[u32]) -> Self {
520        if values.is_empty() {
521            return Self {
522                partitions: Vec::new(),
523                len: 0,
524                cumulative_counts: Vec::new(),
525            };
526        }
527
528        let boundaries = find_optimal_partitions(values);
529        let mut partitions = Vec::with_capacity(boundaries.len());
530        let mut cumulative_counts = Vec::with_capacity(boundaries.len());
531
532        let mut start = 0;
533        let mut cumulative = 0u32;
534
535        for &end in &boundaries {
536            let partition = EFPartition::from_sorted_slice(&values[start..end]);
537            cumulative += partition.len;
538            cumulative_counts.push(cumulative);
539            partitions.push(partition);
540            start = end;
541        }
542
543        Self {
544            partitions,
545            len: values.len() as u32,
546            cumulative_counts,
547        }
548    }
549
550    /// Number of elements
551    #[inline]
552    pub fn len(&self) -> u32 {
553        self.len
554    }
555
556    /// Check if empty
557    #[inline]
558    pub fn is_empty(&self) -> bool {
559        self.len == 0
560    }
561
562    /// Number of partitions
563    pub fn num_partitions(&self) -> usize {
564        self.partitions.len()
565    }
566
567    /// Get element at global position
568    pub fn get(&self, pos: u32) -> Option<u32> {
569        if pos >= self.len {
570            return None;
571        }
572
573        // Find partition containing this position
574        let partition_idx = self
575            .cumulative_counts
576            .binary_search(&(pos + 1))
577            .unwrap_or_else(|x| x);
578
579        if partition_idx >= self.partitions.len() {
580            return None;
581        }
582
583        let local_pos = if partition_idx == 0 {
584            pos
585        } else {
586            pos - self.cumulative_counts[partition_idx - 1]
587        };
588
589        self.partitions[partition_idx].get(local_pos)
590    }
591
592    /// Find first element >= target
593    /// Returns (global_position, value) or None
594    pub fn next_geq(&self, target: u32) -> Option<(u32, u32)> {
595        if self.partitions.is_empty() {
596            return None;
597        }
598
599        // Binary search for partition containing target
600        let partition_idx = self
601            .partitions
602            .binary_search_by(|p| {
603                if p.last_value < target {
604                    std::cmp::Ordering::Less
605                } else if p.first_value > target {
606                    std::cmp::Ordering::Greater
607                } else {
608                    std::cmp::Ordering::Equal
609                }
610            })
611            .unwrap_or_else(|x| x);
612
613        // Search from this partition onwards
614        for (i, partition) in self.partitions[partition_idx..].iter().enumerate() {
615            let actual_idx = partition_idx + i;
616
617            if let Some((local_pos, val)) = partition.next_geq(target) {
618                let global_pos = if actual_idx == 0 {
619                    local_pos
620                } else {
621                    self.cumulative_counts[actual_idx - 1] + local_pos
622                };
623                return Some((global_pos, val));
624            }
625        }
626
627        None
628    }
629
630    /// Serialize
631    pub fn serialize<W: Write>(&self, writer: &mut W) -> io::Result<()> {
632        writer.write_u32::<LittleEndian>(self.len)?;
633        writer.write_u32::<LittleEndian>(self.partitions.len() as u32)?;
634
635        for &count in &self.cumulative_counts {
636            writer.write_u32::<LittleEndian>(count)?;
637        }
638
639        for partition in &self.partitions {
640            partition.serialize(writer)?;
641        }
642
643        Ok(())
644    }
645
646    /// Deserialize
647    pub fn deserialize<R: Read>(reader: &mut R) -> io::Result<Self> {
648        let len = reader.read_u32::<LittleEndian>()?;
649        let num_partitions = reader.read_u32::<LittleEndian>()? as usize;
650
651        let mut cumulative_counts = Vec::with_capacity(num_partitions);
652        for _ in 0..num_partitions {
653            cumulative_counts.push(reader.read_u32::<LittleEndian>()?);
654        }
655
656        let mut partitions = Vec::with_capacity(num_partitions);
657        for _ in 0..num_partitions {
658            partitions.push(EFPartition::deserialize(reader)?);
659        }
660
661        Ok(Self {
662            partitions,
663            len,
664            cumulative_counts,
665        })
666    }
667
668    /// Total size in bytes
669    pub fn size_bytes(&self) -> usize {
670        let mut size = 8 + self.cumulative_counts.len() * 4;
671        for p in &self.partitions {
672            size += p.size_bytes();
673        }
674        size
675    }
676
677    /// Create iterator
678    pub fn iter(&self) -> PartitionedEFIterator<'_> {
679        PartitionedEFIterator {
680            pef: self,
681            partition_idx: 0,
682            local_pos: 0,
683        }
684    }
685}
686
687/// Iterator over Partitioned Elias-Fano
688pub struct PartitionedEFIterator<'a> {
689    pef: &'a PartitionedEliasFano,
690    partition_idx: usize,
691    local_pos: u32,
692}
693
694impl<'a> Iterator for PartitionedEFIterator<'a> {
695    type Item = u32;
696
697    fn next(&mut self) -> Option<Self::Item> {
698        if self.partition_idx >= self.pef.partitions.len() {
699            return None;
700        }
701
702        let partition = &self.pef.partitions[self.partition_idx];
703        if let Some(val) = partition.get(self.local_pos) {
704            self.local_pos += 1;
705            if self.local_pos >= partition.len {
706                self.partition_idx += 1;
707                self.local_pos = 0;
708            }
709            Some(val)
710        } else {
711            None
712        }
713    }
714
715    fn size_hint(&self) -> (usize, Option<usize>) {
716        let current_global = if self.partition_idx == 0 {
717            self.local_pos
718        } else if self.partition_idx < self.pef.cumulative_counts.len() {
719            self.pef.cumulative_counts[self.partition_idx - 1] + self.local_pos
720        } else {
721            self.pef.len
722        };
723        let remaining = (self.pef.len - current_global) as usize;
724        (remaining, Some(remaining))
725    }
726}
727
728/// Block metadata for BlockMax WAND
729#[derive(Debug, Clone)]
730pub struct PEFBlockInfo {
731    /// First document ID in block
732    pub first_doc_id: u32,
733    /// Last document ID in block
734    pub last_doc_id: u32,
735    /// Maximum term frequency in block
736    pub max_tf: u32,
737    /// Maximum BM25 score upper bound
738    pub max_block_score: f32,
739    /// Starting partition index
740    pub partition_idx: u16,
741    /// Starting position within partition
742    pub local_start: u32,
743    /// Number of documents in block
744    pub num_docs: u16,
745}
746
747impl PEFBlockInfo {
748    pub fn serialize<W: Write>(&self, writer: &mut W) -> io::Result<()> {
749        writer.write_u32::<LittleEndian>(self.first_doc_id)?;
750        writer.write_u32::<LittleEndian>(self.last_doc_id)?;
751        writer.write_u32::<LittleEndian>(self.max_tf)?;
752        writer.write_f32::<LittleEndian>(self.max_block_score)?;
753        writer.write_u16::<LittleEndian>(self.partition_idx)?;
754        writer.write_u32::<LittleEndian>(self.local_start)?;
755        writer.write_u16::<LittleEndian>(self.num_docs)?;
756        Ok(())
757    }
758
759    pub fn deserialize<R: Read>(reader: &mut R) -> io::Result<Self> {
760        Ok(Self {
761            first_doc_id: reader.read_u32::<LittleEndian>()?,
762            last_doc_id: reader.read_u32::<LittleEndian>()?,
763            max_tf: reader.read_u32::<LittleEndian>()?,
764            max_block_score: reader.read_f32::<LittleEndian>()?,
765            partition_idx: reader.read_u16::<LittleEndian>()?,
766            local_start: reader.read_u32::<LittleEndian>()?,
767            num_docs: reader.read_u16::<LittleEndian>()?,
768        })
769    }
770}
771
/// Number of postings summarised by one BlockMax block (matches other formats)
pub const PEF_BLOCK_SIZE: usize = 128;
774
775/// Partitioned Elias-Fano posting list with term frequencies and BlockMax
776#[derive(Debug, Clone)]
777pub struct PartitionedEFPostingList {
778    /// Document IDs (Partitioned Elias-Fano encoded)
779    pub doc_ids: PartitionedEliasFano,
780    /// Term frequencies (packed)
781    pub term_freqs: Vec<u8>,
782    /// Bits per term frequency
783    pub tf_bits: u8,
784    /// Maximum term frequency
785    pub max_tf: u32,
786    /// Block metadata for BlockMax WAND
787    pub blocks: Vec<PEFBlockInfo>,
788    /// Global maximum score
789    pub max_score: f32,
790}
791
792impl PartitionedEFPostingList {
793    const K1: f32 = 1.2;
794    const B: f32 = 0.75;
795
796    #[inline]
797    pub fn compute_bm25_upper_bound(max_tf: u32, idf: f32) -> f32 {
798        let tf = max_tf as f32;
799        let min_length_norm = 1.0 - Self::B;
800        let tf_norm = (tf * (Self::K1 + 1.0)) / (tf + Self::K1 * min_length_norm);
801        idf * tf_norm
802    }
803
804    /// Create from postings
805    pub fn from_postings(doc_ids: &[u32], term_freqs: &[u32]) -> Self {
806        Self::from_postings_with_idf(doc_ids, term_freqs, 1.0)
807    }
808
809    /// Create from postings with IDF for block-max scores
810    pub fn from_postings_with_idf(doc_ids: &[u32], term_freqs: &[u32], idf: f32) -> Self {
811        assert_eq!(doc_ids.len(), term_freqs.len());
812
813        if doc_ids.is_empty() {
814            return Self {
815                doc_ids: PartitionedEliasFano::from_sorted_slice(&[]),
816                term_freqs: Vec::new(),
817                tf_bits: 0,
818                max_tf: 0,
819                blocks: Vec::new(),
820                max_score: 0.0,
821            };
822        }
823
824        let pef_doc_ids = PartitionedEliasFano::from_sorted_slice(doc_ids);
825
826        // Find max TF
827        let max_tf = *term_freqs.iter().max().unwrap();
828        let tf_bits = if max_tf == 0 {
829            0
830        } else {
831            (32 - max_tf.leading_zeros()) as u8
832        };
833
834        // Pack term frequencies
835        let total_bits = doc_ids.len() * (tf_bits as usize);
836        let total_bytes = total_bits.div_ceil(8);
837        let mut packed_tfs = vec![0u8; total_bytes];
838
839        if tf_bits > 0 {
840            for (i, &tf) in term_freqs.iter().enumerate() {
841                let bit_pos = i * (tf_bits as usize);
842                let byte_idx = bit_pos / 8;
843                let bit_offset = bit_pos % 8;
844
845                let val = tf.saturating_sub(1);
846
847                packed_tfs[byte_idx] |= (val as u8) << bit_offset;
848                if bit_offset + (tf_bits as usize) > 8 && byte_idx + 1 < packed_tfs.len() {
849                    packed_tfs[byte_idx + 1] |= (val >> (8 - bit_offset)) as u8;
850                }
851                if bit_offset + (tf_bits as usize) > 16 && byte_idx + 2 < packed_tfs.len() {
852                    packed_tfs[byte_idx + 2] |= (val >> (16 - bit_offset)) as u8;
853                }
854            }
855        }
856
857        // Build block metadata
858        let mut blocks = Vec::new();
859        let mut max_score = 0.0f32;
860        let mut i = 0;
861
862        while i < doc_ids.len() {
863            let block_end = (i + PEF_BLOCK_SIZE).min(doc_ids.len());
864            let block_doc_ids = &doc_ids[i..block_end];
865            let block_tfs = &term_freqs[i..block_end];
866
867            let block_max_tf = *block_tfs.iter().max().unwrap_or(&1);
868            let block_score = Self::compute_bm25_upper_bound(block_max_tf, idf);
869            max_score = max_score.max(block_score);
870
871            // Find partition info for this block start
872            let (partition_idx, local_start) =
873                Self::find_partition_position(&pef_doc_ids, i as u32);
874
875            blocks.push(PEFBlockInfo {
876                first_doc_id: block_doc_ids[0],
877                last_doc_id: *block_doc_ids.last().unwrap(),
878                max_tf: block_max_tf,
879                max_block_score: block_score,
880                partition_idx: partition_idx as u16,
881                local_start,
882                num_docs: (block_end - i) as u16,
883            });
884
885            i = block_end;
886        }
887
888        Self {
889            doc_ids: pef_doc_ids,
890            term_freqs: packed_tfs,
891            tf_bits,
892            max_tf,
893            blocks,
894            max_score,
895        }
896    }
897
898    fn find_partition_position(pef: &PartitionedEliasFano, global_pos: u32) -> (usize, u32) {
899        let partition_idx = pef
900            .cumulative_counts
901            .binary_search(&(global_pos + 1))
902            .unwrap_or_else(|x| x);
903
904        let local_pos = if partition_idx == 0 {
905            global_pos
906        } else {
907            global_pos - pef.cumulative_counts[partition_idx - 1]
908        };
909
910        (partition_idx, local_pos)
911    }
912
913    /// Get term frequency at position
914    pub fn get_tf(&self, pos: u32) -> u32 {
915        if self.tf_bits == 0 || pos >= self.doc_ids.len() {
916            return 1;
917        }
918
919        let bit_pos = (pos as usize) * (self.tf_bits as usize);
920        let byte_idx = bit_pos / 8;
921        let bit_offset = bit_pos % 8;
922        let mask = (1u32 << self.tf_bits) - 1;
923
924        let mut val = (self.term_freqs[byte_idx] >> bit_offset) as u32;
925        if bit_offset + (self.tf_bits as usize) > 8 && byte_idx + 1 < self.term_freqs.len() {
926            val |= (self.term_freqs[byte_idx + 1] as u32) << (8 - bit_offset);
927        }
928        if bit_offset + (self.tf_bits as usize) > 16 && byte_idx + 2 < self.term_freqs.len() {
929            val |= (self.term_freqs[byte_idx + 2] as u32) << (16 - bit_offset);
930        }
931
932        (val & mask) + 1
933    }
934
    /// Number of postings in the list, as reported by the encoded doc-ID sequence.
    pub fn len(&self) -> u32 {
        self.doc_ids.len()
    }
939
    /// Returns `true` when the encoded doc-ID sequence reports itself as empty.
    pub fn is_empty(&self) -> bool {
        self.doc_ids.is_empty()
    }
944
945    /// Serialize
946    pub fn serialize<W: Write>(&self, writer: &mut W) -> io::Result<()> {
947        self.doc_ids.serialize(writer)?;
948        writer.write_u8(self.tf_bits)?;
949        writer.write_u32::<LittleEndian>(self.max_tf)?;
950        writer.write_f32::<LittleEndian>(self.max_score)?;
951        writer.write_u32::<LittleEndian>(self.term_freqs.len() as u32)?;
952        writer.write_all(&self.term_freqs)?;
953
954        writer.write_u32::<LittleEndian>(self.blocks.len() as u32)?;
955        for block in &self.blocks {
956            block.serialize(writer)?;
957        }
958
959        Ok(())
960    }
961
962    /// Deserialize
963    pub fn deserialize<R: Read>(reader: &mut R) -> io::Result<Self> {
964        let doc_ids = PartitionedEliasFano::deserialize(reader)?;
965        let tf_bits = reader.read_u8()?;
966        let max_tf = reader.read_u32::<LittleEndian>()?;
967        let max_score = reader.read_f32::<LittleEndian>()?;
968        let tf_len = reader.read_u32::<LittleEndian>()? as usize;
969        let mut term_freqs = vec![0u8; tf_len];
970        reader.read_exact(&mut term_freqs)?;
971
972        let num_blocks = reader.read_u32::<LittleEndian>()? as usize;
973        let mut blocks = Vec::with_capacity(num_blocks);
974        for _ in 0..num_blocks {
975            blocks.push(PEFBlockInfo::deserialize(reader)?);
976        }
977
978        Ok(Self {
979            doc_ids,
980            term_freqs,
981            tf_bits,
982            max_tf,
983            blocks,
984            max_score,
985        })
986    }
987
    /// Creates a cursor over this list, starting at position 0 in block 0.
    ///
    /// The returned iterator borrows the list, so the list must outlive it.
    pub fn iterator(&self) -> PartitionedEFPostingIterator<'_> {
        PartitionedEFPostingIterator {
            list: self,
            pos: 0,
            current_block: 0,
        }
    }
996
997    /// Get compression ratio compared to standard EF
998    pub fn compression_info(&self) -> (usize, usize) {
999        let pef_size = self.doc_ids.size_bytes();
1000        // Estimate standard EF size
1001        let n = self.len() as usize;
1002        let max_val = if let Some(last_block) = self.blocks.last() {
1003            last_block.last_doc_id
1004        } else {
1005            0
1006        };
1007        let ef_size = if n > 0 {
1008            let l = if n <= 1 {
1009                0
1010            } else {
1011                let ratio = (max_val as usize + 1) / n;
1012                if ratio <= 1 {
1013                    0
1014                } else {
1015                    (usize::BITS - ratio.leading_zeros()) as usize
1016                }
1017            };
1018            (n * l + 2 * n).div_ceil(8) + 16
1019        } else {
1020            0
1021        };
1022        (pef_size, ef_size)
1023    }
1024}
1025
/// Cursor over a [`PartitionedEFPostingList`].
///
/// This is a hand-driven cursor: the methods in its `impl` block (`doc`, `advance`,
/// `seek`, ...) move it and read from it. Block-level metadata of the underlying
/// list is exposed for block-max style pruning.
pub struct PartitionedEFPostingIterator<'a> {
    /// Posting list being traversed.
    list: &'a PartitionedEFPostingList,
    /// Index of the current posting; a value at or beyond `list.len()` means exhausted.
    pos: u32,
    /// Index into `list.blocks` of the block used for the block-max queries.
    current_block: usize,
}
1032
1033impl<'a> PartitionedEFPostingIterator<'a> {
1034    /// Current document ID
1035    pub fn doc(&self) -> u32 {
1036        self.list.doc_ids.get(self.pos).unwrap_or(u32::MAX)
1037    }
1038
1039    /// Current term frequency
1040    pub fn term_freq(&self) -> u32 {
1041        self.list.get_tf(self.pos)
1042    }
1043
1044    /// Advance to next document
1045    pub fn advance(&mut self) -> u32 {
1046        self.pos += 1;
1047        if !self.list.blocks.is_empty() {
1048            let new_block = (self.pos as usize) / PEF_BLOCK_SIZE;
1049            if new_block < self.list.blocks.len() {
1050                self.current_block = new_block;
1051            }
1052        }
1053        self.doc()
1054    }
1055
1056    /// Seek to first doc >= target
1057    pub fn seek(&mut self, target: u32) -> u32 {
1058        // Use block skip list
1059        if !self.list.blocks.is_empty() {
1060            let block_idx = self.list.blocks[self.current_block..].binary_search_by(|b| {
1061                if b.last_doc_id < target {
1062                    std::cmp::Ordering::Less
1063                } else if b.first_doc_id > target {
1064                    std::cmp::Ordering::Greater
1065                } else {
1066                    std::cmp::Ordering::Equal
1067                }
1068            });
1069
1070            let target_block = match block_idx {
1071                Ok(idx) => self.current_block + idx,
1072                Err(idx) => {
1073                    let abs_idx = self.current_block + idx;
1074                    if abs_idx >= self.list.blocks.len() {
1075                        self.pos = self.list.len();
1076                        return u32::MAX;
1077                    }
1078                    abs_idx
1079                }
1080            };
1081
1082            if target_block > self.current_block {
1083                self.current_block = target_block;
1084                self.pos = (target_block * PEF_BLOCK_SIZE) as u32;
1085            }
1086        }
1087
1088        // Use PEF's next_geq
1089        if let Some((pos, val)) = self.list.doc_ids.next_geq(target)
1090            && pos >= self.pos
1091        {
1092            self.pos = pos;
1093            if !self.list.blocks.is_empty() {
1094                self.current_block = (pos as usize) / PEF_BLOCK_SIZE;
1095            }
1096            return val;
1097        }
1098
1099        // Linear scan fallback
1100        while self.pos < self.list.len() {
1101            let doc = self.doc();
1102            if doc >= target {
1103                return doc;
1104            }
1105            self.pos += 1;
1106        }
1107
1108        u32::MAX
1109    }
1110
1111    /// Check if exhausted
1112    pub fn is_exhausted(&self) -> bool {
1113        self.pos >= self.list.len()
1114    }
1115
1116    /// Current block's max score
1117    pub fn current_block_max_score(&self) -> f32 {
1118        if self.is_exhausted() || self.list.blocks.is_empty() {
1119            return 0.0;
1120        }
1121        if self.current_block < self.list.blocks.len() {
1122            self.list.blocks[self.current_block].max_block_score
1123        } else {
1124            0.0
1125        }
1126    }
1127
1128    /// Current block's max TF
1129    pub fn current_block_max_tf(&self) -> u32 {
1130        if self.is_exhausted() || self.list.blocks.is_empty() {
1131            return 0;
1132        }
1133        if self.current_block < self.list.blocks.len() {
1134            self.list.blocks[self.current_block].max_tf
1135        } else {
1136            0
1137        }
1138    }
1139
1140    /// Max score for remaining blocks
1141    pub fn max_remaining_score(&self) -> f32 {
1142        if self.is_exhausted() || self.list.blocks.is_empty() {
1143            return 0.0;
1144        }
1145        self.list.blocks[self.current_block..]
1146            .iter()
1147            .map(|b| b.max_block_score)
1148            .fold(0.0f32, |a, b| a.max(b))
1149    }
1150
1151    /// Skip to next block containing doc >= target (for BlockWAND)
1152    /// Returns (first_doc_in_block, block_max_score) or None if exhausted
1153    pub fn skip_to_block_with_doc(&mut self, target: u32) -> Option<(u32, f32)> {
1154        if self.list.blocks.is_empty() {
1155            return None;
1156        }
1157
1158        while self.current_block < self.list.blocks.len() {
1159            let block = &self.list.blocks[self.current_block];
1160            if block.last_doc_id >= target {
1161                self.pos = (self.current_block * PEF_BLOCK_SIZE) as u32;
1162                return Some((block.first_doc_id, block.max_block_score));
1163            }
1164            self.current_block += 1;
1165        }
1166
1167        self.pos = self.list.len();
1168        None
1169    }
1170}
1171
#[cfg(test)]
mod tests {
    use super::*;

    /// A single partition reports its length and bounds and returns every value by index.
    #[test]
    fn test_ef_partition_basic() {
        let input: [u32; 5] = [10, 20, 30, 40, 50];
        let part = EFPartition::from_sorted_slice(&input);

        assert_eq!(part.len, 5);
        assert_eq!(part.first_value, 10);
        assert_eq!(part.last_value, 50);

        for (idx, want) in (0u32..).zip(input) {
            assert_eq!(part.get(idx), Some(want));
        }
    }

    /// `next_geq` on a partition: below range, exact hits, in-between, and past the end.
    #[test]
    fn test_ef_partition_next_geq() {
        let part = EFPartition::from_sorted_slice(&[10, 20, 30, 100, 200, 300]);

        let cases = [
            (5, Some((0, 10))),
            (10, Some((0, 10))),
            (15, Some((1, 20))),
            (100, Some((3, 100))),
            (301, None),
        ];
        for (target, want) in cases {
            assert_eq!(part.next_geq(target), want);
        }
    }

    /// Random access over a partitioned sequence returns every input value.
    #[test]
    fn test_partitioned_ef_basic() {
        // 0, 2, 4, ..., 998
        let evens: Vec<u32> = (0..1000).step_by(2).collect();
        let pef = PartitionedEliasFano::from_sorted_slice(&evens);

        assert_eq!(pef.len(), 500);
        assert!(pef.num_partitions() >= 1);

        for (idx, want) in evens.iter().copied().enumerate() {
            assert_eq!(pef.get(idx as u32), Some(want), "Mismatch at {}", idx);
        }
    }

    /// `next_geq` across the whole partitioned sequence.
    #[test]
    fn test_partitioned_ef_next_geq() {
        // 0, 3, 6, ..., 2997
        let multiples: Vec<u32> = (0..3000).step_by(3).collect();
        let pef = PartitionedEliasFano::from_sorted_slice(&multiples);

        assert_eq!(pef.next_geq(0), Some((0, 0)));
        // 100 is not a multiple of 3; the next one is 34 * 3 = 102.
        assert_eq!(pef.next_geq(100), Some((34, 102)));
        assert_eq!(pef.next_geq(1500), Some((500, 1500)));
        assert_eq!(pef.next_geq(3000), None);
    }

    /// Serializing and deserializing yields a sequence with identical contents.
    #[test]
    fn test_partitioned_ef_serialization() {
        // 0, 5, 10, ..., 2495
        let source: Vec<u32> = (0..2500).step_by(5).collect();
        let original = PartitionedEliasFano::from_sorted_slice(&source);

        let mut bytes = Vec::new();
        original.serialize(&mut bytes).unwrap();

        let mut input = bytes.as_slice();
        let restored = PartitionedEliasFano::deserialize(&mut input).unwrap();

        assert_eq!(restored.len(), original.len());
        assert_eq!(restored.num_partitions(), original.num_partitions());

        for pos in 0..original.len() {
            assert_eq!(restored.get(pos), original.get(pos));
        }
    }

    /// Walking a posting list with `advance` yields every doc ID and term frequency.
    #[test]
    fn test_partitioned_ef_posting_list() {
        let doc_ids: Vec<u32> = (0..600).step_by(2).collect();
        let term_freqs: Vec<u32> = (0..300).map(|i| i % 10 + 1).collect();

        let list = PartitionedEFPostingList::from_postings(&doc_ids, &term_freqs);

        assert_eq!(list.len(), 300);

        let mut cursor = list.iterator();
        for (i, (doc, tf)) in doc_ids.iter().zip(&term_freqs).enumerate() {
            assert_eq!(cursor.doc(), *doc, "Doc mismatch at {}", i);
            assert_eq!(cursor.term_freq(), *tf, "TF mismatch at {}", i);
            cursor.advance();
        }
    }

    /// `seek` lands on the next doc ID at or after the target, or reports exhaustion.
    #[test]
    fn test_partitioned_ef_seek() {
        let doc_ids: Vec<u32> = (0..1500).step_by(3).collect();
        let term_freqs = vec![1u32; 500];

        let list = PartitionedEFPostingList::from_postings(&doc_ids, &term_freqs);
        let mut cursor = list.iterator();

        // 100 is not a multiple of 3; the next one is 34 * 3 = 102.
        assert_eq!(cursor.seek(100), 102);
        assert_eq!(cursor.seek(300), 300);
        assert_eq!(cursor.seek(1500), u32::MAX);
    }

    /// Mixed-density input is split into several partitions and still decodes correctly.
    #[test]
    fn test_compression_improvement() {
        // First half dense (step 2), second half sparse (step 100, starting at 10000).
        let dense = (0..5000u32).map(|i| i * 2);
        let sparse = (0..5000u32).map(|i| 10000 + i * 100);
        let values: Vec<u32> = dense.chain(sparse).collect();

        let pef = PartitionedEliasFano::from_sorted_slice(&values);

        // Data of varying density should not end up in a single partition.
        assert!(
            pef.num_partitions() > 1,
            "Expected multiple partitions, got {}",
            pef.num_partitions()
        );

        // Every value must still be retrievable.
        for (idx, want) in values.iter().copied().enumerate() {
            assert_eq!(pef.get(idx as u32), Some(want));
        }
    }

    /// Block metadata (max TF, max score) and the iterator's block-max accessors.
    #[test]
    fn test_partitioned_ef_block_max() {
        // 500 postings span four blocks of up to 128 postings each.
        let doc_ids: Vec<u32> = (0..1000).step_by(2).collect();
        // Give every block its own maximum term frequency: 1, 5, 10, 3.
        let term_freqs: Vec<u32> = (0..500usize)
            .map(|i| match i / 128 {
                0 => 1,
                1 => 5,
                2 => 10,
                _ => 3,
            })
            .collect();

        let list = PartitionedEFPostingList::from_postings_with_idf(&doc_ids, &term_freqs, 2.0);

        // 500 docs at 128 per block -> 4 blocks, each with its expected max_tf.
        assert_eq!(list.blocks.len(), 4);
        let block_max_tfs: Vec<u32> = list.blocks.iter().map(|b| b.max_tf).collect();
        assert_eq!(block_max_tfs, [1, 5, 10, 3]);

        // Block 2 has the highest max_tf, so it must carry the highest score ...
        let peak = list.blocks[2].max_block_score;
        for other in [0, 1, 3] {
            assert!(peak > list.blocks[other].max_block_score);
        }

        // ... which is also the list-wide maximum.
        assert_eq!(list.max_score, peak);

        // Block-max accessors follow the cursor from block to block.
        let mut cursor = list.iterator();
        assert_eq!(cursor.current_block_max_tf(), 1); // block 0

        cursor.seek(256); // first doc of block 1
        assert_eq!(cursor.current_block_max_tf(), 5);

        cursor.seek(512); // first doc of block 2
        assert_eq!(cursor.current_block_max_tf(), 10);

        // skip_to_block_with_doc returns the block that can contain the target.
        let mut skipper = list.iterator();
        let skipped = skipper.skip_to_block_with_doc(300);
        assert!(skipped.is_some());
        let (first_doc, score) = skipped.unwrap();
        assert!(first_doc <= 300);
        assert!(score > 0.0);

        // From the start, the remaining maximum is the global maximum.
        let mut tail = list.iterator();
        let max_score = tail.max_remaining_score();
        assert_eq!(max_score, list.max_score);

        // Once block 2 is behind the cursor, the remaining maximum must drop.
        tail.seek(768); // first doc of block 3
        let remaining = tail.max_remaining_score();
        assert!(remaining < max_score);
    }
}