Skip to main content

hermes_core/structures/postings/
partitioned_ef.rs

1//! Partitioned Elias-Fano (PEF) encoding
2//!
3//! Based on Ottaviano & Venturini (2014) "Partitioned Elias-Fano Indexes"
4//!
5//! Key improvements over standard Elias-Fano:
6//! - **Optimal partitioning**: Divides sequence into chunks with (1+ε)-optimal compression
7//! - **Better compression**: 30-40% smaller than standard EF on typical data
8//! - **Fast random access**: O(1) access within partitions
9//! - **Efficient NextGEQ**: Uses partition endpoints for fast seeking
10//!
11//! The algorithm finds optimal partition boundaries that minimize total encoding size.
12//! Each partition is encoded with its own Elias-Fano instance using local parameters.
13
14use byteorder::{LittleEndian, ReadBytesExt, WriteBytesExt};
15use std::io::{self, Read, Write};
16
/// Minimum partition size (smaller partitions have too much overhead).
///
/// Lower bound on the number of elements per partition considered by
/// `find_optimal_partitions`; inputs with at most this many elements are
/// kept as a single partition.
const MIN_PARTITION_SIZE: usize = 64;

/// Maximum partition size (larger partitions lose locality benefits).
///
/// Upper bound on the number of elements per partition considered by
/// `find_optimal_partitions`.
const MAX_PARTITION_SIZE: usize = 512;
22
/// One Elias-Fano encoded chunk of a partitioned sequence.
///
/// Every element is stored relative to `first_value` and split into a
/// fixed-width low part (`lower_bits`) and a unary-coded high part
/// (`upper_bits`).
#[derive(Clone, Debug)]
pub struct EFPartition {
    /// Low `lower_bit_width` bits of each relative value, packed back to back
    /// (least-significant bit first) into 64-bit words.
    lower_bits: Vec<u64>,
    /// High bits in unary form: element `i` sets bit
    /// `i + (relative_value >> lower_bit_width)`.
    upper_bits: Vec<u64>,
    /// Number of elements stored in this partition.
    len: u32,
    /// Smallest (first) value in the partition, as an absolute value.
    first_value: u32,
    /// Largest (last) value in the partition, as an absolute value.
    last_value: u32,
    /// How many low bits of each relative value live in `lower_bits`.
    lower_bit_width: u8,
}
39
40impl EFPartition {
41    /// Create a partition from a sorted slice
42    /// Values are stored relative to first_value for better compression
43    pub fn from_sorted_slice(values: &[u32]) -> Self {
44        if values.is_empty() {
45            return Self {
46                lower_bits: Vec::new(),
47                upper_bits: Vec::new(),
48                len: 0,
49                first_value: 0,
50                last_value: 0,
51                lower_bit_width: 0,
52            };
53        }
54
55        let first_value = values[0];
56        let last_value = *values.last().unwrap();
57        let n = values.len() as u64;
58
59        // Local universe: encode values relative to first_value
60        let local_universe = last_value - first_value + 1;
61
62        // Calculate lower bit width using local universe
63        let lower_bit_width = if n <= 1 {
64            0
65        } else {
66            let ratio = (local_universe as u64).max(1) / n.max(1);
67            if ratio <= 1 {
68                0
69            } else {
70                (64 - ratio.leading_zeros() - 1) as u8
71            }
72        };
73
74        // Allocate arrays
75        let lower_bits_total = (n as usize) * (lower_bit_width as usize);
76        let lower_words = lower_bits_total.div_ceil(64);
77        let mut lower_bits = vec![0u64; lower_words];
78
79        let max_relative = local_universe.saturating_sub(1) as u64;
80        let upper_bound = n + (max_relative >> lower_bit_width) + 1;
81        let upper_words = (upper_bound as usize).div_ceil(64);
82        let mut upper_bits = vec![0u64; upper_words];
83
84        let lower_mask = if lower_bit_width == 0 {
85            0
86        } else {
87            (1u64 << lower_bit_width) - 1
88        };
89
90        // Encode each value relative to first_value
91        for (i, &val) in values.iter().enumerate() {
92            let relative_val = (val - first_value) as u64;
93
94            // Store lower bits
95            if lower_bit_width > 0 {
96                let lower = relative_val & lower_mask;
97                let bit_pos = i * (lower_bit_width as usize);
98                let word_idx = bit_pos / 64;
99                let bit_offset = bit_pos % 64;
100
101                lower_bits[word_idx] |= lower << bit_offset;
102                if bit_offset + (lower_bit_width as usize) > 64 && word_idx + 1 < lower_bits.len() {
103                    lower_bits[word_idx + 1] |= lower >> (64 - bit_offset);
104                }
105            }
106
107            // Store upper bits
108            let upper = relative_val >> lower_bit_width;
109            let upper_pos = (i as u64) + upper;
110            let word_idx = (upper_pos / 64) as usize;
111            let bit_offset = upper_pos % 64;
112            if word_idx < upper_bits.len() {
113                upper_bits[word_idx] |= 1u64 << bit_offset;
114            }
115        }
116
117        Self {
118            lower_bits,
119            upper_bits,
120            len: values.len() as u32,
121            first_value,
122            last_value,
123            lower_bit_width,
124        }
125    }
126
127    /// Get element at position i (0-indexed)
128    #[inline]
129    pub fn get(&self, i: u32) -> Option<u32> {
130        if i >= self.len {
131            return None;
132        }
133
134        let i = i as usize;
135
136        // Get lower bits
137        let lower = if self.lower_bit_width == 0 {
138            0u64
139        } else {
140            let bit_pos = i * (self.lower_bit_width as usize);
141            let word_idx = bit_pos / 64;
142            let bit_offset = bit_pos % 64;
143            let lower_mask = (1u64 << self.lower_bit_width) - 1;
144
145            let mut val = (self.lower_bits[word_idx] >> bit_offset) & lower_mask;
146            if bit_offset + (self.lower_bit_width as usize) > 64
147                && word_idx + 1 < self.lower_bits.len()
148            {
149                val |= (self.lower_bits[word_idx + 1] << (64 - bit_offset)) & lower_mask;
150            }
151            val
152        };
153
154        // Get upper bits via select1(i) - i
155        let select_pos = self.select1(i as u32)?;
156        let upper = (select_pos as u64) - (i as u64);
157
158        // Reconstruct absolute value
159        let relative_val = (upper << self.lower_bit_width) | lower;
160        Some(self.first_value + relative_val as u32)
161    }
162
163    /// Find position of i-th set bit (optimized with NEON on aarch64)
164    fn select1(&self, i: u32) -> Option<u32> {
165        if i >= self.len {
166            return None;
167        }
168
169        let mut remaining = i + 1;
170        let mut pos = 0u32;
171
172        // Process 4 words at a time on aarch64 using NEON popcount
173        #[cfg(target_arch = "aarch64")]
174        {
175            use std::arch::aarch64::*;
176
177            let chunks = self.upper_bits.chunks_exact(4);
178            let remainder = chunks.remainder();
179
180            for chunk in chunks {
181                // Load 4 u64 words (32 bytes)
182                let words: [u64; 4] = [chunk[0], chunk[1], chunk[2], chunk[3]];
183
184                // Count bits in each word using NEON
185                // vcntq_u8 counts bits per byte, then sum horizontally
186                unsafe {
187                    let bytes = std::mem::transmute::<[u64; 4], [u8; 32]>(words);
188
189                    // Process first 16 bytes (words 0-1)
190                    let v0 = vld1q_u8(bytes.as_ptr());
191                    let cnt0 = vcntq_u8(v0);
192                    let sum0 = vaddlvq_u8(cnt0) as u32;
193
194                    // Process next 16 bytes (words 2-3)
195                    let v1 = vld1q_u8(bytes.as_ptr().add(16));
196                    let cnt1 = vcntq_u8(v1);
197                    let sum1 = vaddlvq_u8(cnt1) as u32;
198
199                    let total_popcount = sum0 + sum1;
200
201                    if total_popcount >= remaining {
202                        // Found the chunk, now find exact word
203                        for &word in chunk {
204                            let popcount = word.count_ones();
205                            if popcount >= remaining {
206                                let mut w = word;
207                                for _ in 0..remaining - 1 {
208                                    w &= w - 1;
209                                }
210                                return Some(pos + w.trailing_zeros());
211                            }
212                            remaining -= popcount;
213                            pos += 64;
214                        }
215                    }
216                    remaining -= total_popcount;
217                    pos += 256; // 4 words * 64 bits
218                }
219            }
220
221            // Handle remaining words
222            for &word in remainder {
223                let popcount = word.count_ones();
224                if popcount >= remaining {
225                    let mut w = word;
226                    for _ in 0..remaining - 1 {
227                        w &= w - 1;
228                    }
229                    return Some(pos + w.trailing_zeros());
230                }
231                remaining -= popcount;
232                pos += 64;
233            }
234        }
235
236        // Process 4 words at a time on x86_64 using POPCNT instruction
237        #[cfg(target_arch = "x86_64")]
238        {
239            #[cfg(target_feature = "popcnt")]
240            {
241                use std::arch::x86_64::*;
242
243                let chunks = self.upper_bits.chunks_exact(4);
244                let remainder = chunks.remainder();
245
246                for chunk in chunks {
247                    // Use hardware POPCNT for each word
248                    unsafe {
249                        let p0 = _popcnt64(chunk[0] as i64) as u32;
250                        let p1 = _popcnt64(chunk[1] as i64) as u32;
251                        let p2 = _popcnt64(chunk[2] as i64) as u32;
252                        let p3 = _popcnt64(chunk[3] as i64) as u32;
253
254                        let total_popcount = p0 + p1 + p2 + p3;
255
256                        if total_popcount >= remaining {
257                            // Found the chunk, now find exact word
258                            for &word in chunk {
259                                let popcount = word.count_ones();
260                                if popcount >= remaining {
261                                    let mut w = word;
262                                    for _ in 0..remaining - 1 {
263                                        w &= w - 1;
264                                    }
265                                    return Some(pos + w.trailing_zeros());
266                                }
267                                remaining -= popcount;
268                                pos += 64;
269                            }
270                        }
271                        remaining -= total_popcount;
272                        pos += 256; // 4 words * 64 bits
273                    }
274                }
275
276                // Handle remaining words
277                for &word in remainder {
278                    let popcount = word.count_ones();
279                    if popcount >= remaining {
280                        let mut w = word;
281                        for _ in 0..remaining - 1 {
282                            w &= w - 1;
283                        }
284                        return Some(pos + w.trailing_zeros());
285                    }
286                    remaining -= popcount;
287                    pos += 64;
288                }
289            }
290
291            // Scalar fallback when POPCNT not available
292            #[cfg(not(target_feature = "popcnt"))]
293            {
294                for &word in &self.upper_bits {
295                    let popcount = word.count_ones();
296                    if popcount >= remaining {
297                        let mut w = word;
298                        for _ in 0..remaining - 1 {
299                            w &= w - 1;
300                        }
301                        return Some(pos + w.trailing_zeros());
302                    }
303                    remaining -= popcount;
304                    pos += 64;
305                }
306            }
307        }
308
309        // Scalar fallback for other architectures
310        #[cfg(not(any(target_arch = "aarch64", target_arch = "x86_64")))]
311        {
312            for &word in &self.upper_bits {
313                let popcount = word.count_ones();
314                if popcount >= remaining {
315                    let mut w = word;
316                    for _ in 0..remaining - 1 {
317                        w &= w - 1;
318                    }
319                    return Some(pos + w.trailing_zeros());
320                }
321                remaining -= popcount;
322                pos += 64;
323            }
324        }
325
326        None
327    }
328
329    /// Find first element >= target within this partition
330    /// Returns (local_position, value) or None
331    pub fn next_geq(&self, target: u32) -> Option<(u32, u32)> {
332        if self.len == 0 || target > self.last_value {
333            return None;
334        }
335
336        if target <= self.first_value {
337            return Some((0, self.first_value));
338        }
339
340        // Binary search for target
341        let mut lo = 0u32;
342        let mut hi = self.len;
343
344        while lo < hi {
345            let mid = lo + (hi - lo) / 2;
346            if let Some(val) = self.get(mid) {
347                if val < target {
348                    lo = mid + 1;
349                } else {
350                    hi = mid;
351                }
352            } else {
353                break;
354            }
355        }
356
357        if lo < self.len {
358            self.get(lo).map(|v| (lo, v))
359        } else {
360            None
361        }
362    }
363
364    /// Serialize partition
365    pub fn serialize<W: Write>(&self, writer: &mut W) -> io::Result<()> {
366        writer.write_u32::<LittleEndian>(self.len)?;
367        writer.write_u32::<LittleEndian>(self.first_value)?;
368        writer.write_u32::<LittleEndian>(self.last_value)?;
369        // local_universe removed - computed as last_value - first_value + 1
370        writer.write_u8(self.lower_bit_width)?;
371
372        writer.write_u32::<LittleEndian>(self.lower_bits.len() as u32)?;
373        for &word in &self.lower_bits {
374            writer.write_u64::<LittleEndian>(word)?;
375        }
376
377        writer.write_u32::<LittleEndian>(self.upper_bits.len() as u32)?;
378        for &word in &self.upper_bits {
379            writer.write_u64::<LittleEndian>(word)?;
380        }
381
382        Ok(())
383    }
384
385    /// Deserialize partition
386    pub fn deserialize<R: Read>(reader: &mut R) -> io::Result<Self> {
387        let len = reader.read_u32::<LittleEndian>()?;
388        let first_value = reader.read_u32::<LittleEndian>()?;
389        let last_value = reader.read_u32::<LittleEndian>()?;
390        // local_universe removed - computed as last_value - first_value + 1
391        let lower_bit_width = reader.read_u8()?;
392
393        let lower_len = reader.read_u32::<LittleEndian>()? as usize;
394        let mut lower_bits = Vec::with_capacity(lower_len);
395        for _ in 0..lower_len {
396            lower_bits.push(reader.read_u64::<LittleEndian>()?);
397        }
398
399        let upper_len = reader.read_u32::<LittleEndian>()? as usize;
400        let mut upper_bits = Vec::with_capacity(upper_len);
401        for _ in 0..upper_len {
402            upper_bits.push(reader.read_u64::<LittleEndian>()?);
403        }
404
405        Ok(Self {
406            lower_bits,
407            upper_bits,
408            len,
409            first_value,
410            last_value,
411            lower_bit_width,
412        })
413    }
414
415    /// Size in bytes (13 bytes header + arrays)
416    pub fn size_bytes(&self) -> usize {
417        13 + self.lower_bits.len() * 8 + self.upper_bits.len() * 8
418    }
419}
420
/// Estimate the encoded size, in bytes, of a partition holding `values`.
///
/// The estimate is the sum of
/// - low bits: `n * bit_length(universe / n)` (0 for dense or single-element
///   partitions),
/// - high bits: roughly `2 * n`,
/// - a fixed 13-byte partition header,
///
/// rounded up to whole bytes. An empty slice costs 0.
///
/// `values` must be sorted in non-decreasing order; only the first and last
/// elements are inspected.
fn estimate_partition_cost(values: &[u32]) -> usize {
    let (Some(&first), Some(&last)) = (values.first(), values.last()) else {
        return 0;
    };

    let n = values.len();
    let n_wide = n as u64;

    // Kept in u64: the span is 2^32 when the slice covers the whole u32
    // range, which overflows u32 arithmetic.
    let local_universe = u64::from(last - first) + 1;

    // Lower bits: n * log2(universe / n)
    let lower_bits = if n <= 1 || local_universe <= n_wide {
        0
    } else {
        let ratio = local_universe / n_wide;
        let l = (u64::BITS - ratio.leading_zeros()) as usize;
        n * l
    };

    // Upper bits: approximately 2n bits
    let upper_bits = 2 * n;

    // Overhead: 13-byte partition header, expressed in bits
    let overhead = 13 * 8;

    (lower_bits + upper_bits + overhead).div_ceil(8)
}
449
450/// Find optimal partition boundaries using dynamic programming
451/// Returns partition endpoints (exclusive)
452fn find_optimal_partitions(values: &[u32]) -> Vec<usize> {
453    let n = values.len();
454    if n <= MIN_PARTITION_SIZE {
455        return vec![n];
456    }
457
458    // dp[i] = minimum cost to encode values[0..i]
459    // parent[i] = start of last partition ending at i
460    let mut dp = vec![usize::MAX; n + 1];
461    let mut parent = vec![0usize; n + 1];
462    dp[0] = 0;
463
464    for i in MIN_PARTITION_SIZE..=n {
465        // Try all valid partition sizes ending at i
466        let min_start = i.saturating_sub(MAX_PARTITION_SIZE);
467        let max_start = i.saturating_sub(MIN_PARTITION_SIZE);
468
469        for start in min_start..=max_start {
470            if dp[start] == usize::MAX {
471                continue;
472            }
473
474            let partition_cost = estimate_partition_cost(&values[start..i]);
475            let total_cost = dp[start].saturating_add(partition_cost);
476
477            if total_cost < dp[i] {
478                dp[i] = total_cost;
479                parent[i] = start;
480            }
481        }
482    }
483
484    // Handle case where n is not reachable (too small for partitioning)
485    if dp[n] == usize::MAX {
486        return vec![n];
487    }
488
489    // Reconstruct partition boundaries
490    let mut boundaries = Vec::new();
491    let mut pos = n;
492    while pos > 0 {
493        boundaries.push(pos);
494        pos = parent[pos];
495    }
496    boundaries.reverse();
497
498    boundaries
499}
500
501/// Partitioned Elias-Fano encoded sequence
502#[derive(Debug, Clone)]
503pub struct PartitionedEliasFano {
504    /// Individual partitions
505    partitions: Vec<EFPartition>,
506    /// Total number of elements
507    len: u32,
508    /// Cumulative element counts for each partition (for position lookup)
509    cumulative_counts: Vec<u32>,
510}
511
512impl PartitionedEliasFano {
513    /// Create from sorted slice with optimal partitioning
514    pub fn from_sorted_slice(values: &[u32]) -> Self {
515        if values.is_empty() {
516            return Self {
517                partitions: Vec::new(),
518                len: 0,
519                cumulative_counts: Vec::new(),
520            };
521        }
522
523        let boundaries = find_optimal_partitions(values);
524        let mut partitions = Vec::with_capacity(boundaries.len());
525        let mut cumulative_counts = Vec::with_capacity(boundaries.len());
526
527        let mut start = 0;
528        let mut cumulative = 0u32;
529
530        for &end in &boundaries {
531            let partition = EFPartition::from_sorted_slice(&values[start..end]);
532            cumulative += partition.len;
533            cumulative_counts.push(cumulative);
534            partitions.push(partition);
535            start = end;
536        }
537
538        Self {
539            partitions,
540            len: values.len() as u32,
541            cumulative_counts,
542        }
543    }
544
    /// Total number of elements stored across all partitions.
    #[inline]
    pub fn len(&self) -> u32 {
        self.len
    }
550
    /// Returns `true` when the sequence holds no elements.
    #[inline]
    pub fn is_empty(&self) -> bool {
        self.len == 0
    }
556
    /// Number of partitions the sequence was split into.
    pub fn num_partitions(&self) -> usize {
        self.partitions.len()
    }
561
562    /// Get element at global position
563    pub fn get(&self, pos: u32) -> Option<u32> {
564        if pos >= self.len {
565            return None;
566        }
567
568        // Find partition containing this position
569        let partition_idx = self
570            .cumulative_counts
571            .binary_search(&(pos + 1))
572            .unwrap_or_else(|x| x);
573
574        if partition_idx >= self.partitions.len() {
575            return None;
576        }
577
578        let local_pos = if partition_idx == 0 {
579            pos
580        } else {
581            pos - self.cumulative_counts[partition_idx - 1]
582        };
583
584        self.partitions[partition_idx].get(local_pos)
585    }
586
587    /// Find first element >= target
588    /// Returns (global_position, value) or None
589    pub fn next_geq(&self, target: u32) -> Option<(u32, u32)> {
590        if self.partitions.is_empty() {
591            return None;
592        }
593
594        // Binary search for partition containing target
595        let partition_idx = self
596            .partitions
597            .binary_search_by(|p| {
598                if p.last_value < target {
599                    std::cmp::Ordering::Less
600                } else if p.first_value > target {
601                    std::cmp::Ordering::Greater
602                } else {
603                    std::cmp::Ordering::Equal
604                }
605            })
606            .unwrap_or_else(|x| x);
607
608        // Search from this partition onwards
609        for (i, partition) in self.partitions[partition_idx..].iter().enumerate() {
610            let actual_idx = partition_idx + i;
611
612            if let Some((local_pos, val)) = partition.next_geq(target) {
613                let global_pos = if actual_idx == 0 {
614                    local_pos
615                } else {
616                    self.cumulative_counts[actual_idx - 1] + local_pos
617                };
618                return Some((global_pos, val));
619            }
620        }
621
622        None
623    }
624
625    /// Serialize
626    pub fn serialize<W: Write>(&self, writer: &mut W) -> io::Result<()> {
627        writer.write_u32::<LittleEndian>(self.len)?;
628        writer.write_u32::<LittleEndian>(self.partitions.len() as u32)?;
629
630        for &count in &self.cumulative_counts {
631            writer.write_u32::<LittleEndian>(count)?;
632        }
633
634        for partition in &self.partitions {
635            partition.serialize(writer)?;
636        }
637
638        Ok(())
639    }
640
641    /// Deserialize
642    pub fn deserialize<R: Read>(reader: &mut R) -> io::Result<Self> {
643        let len = reader.read_u32::<LittleEndian>()?;
644        let num_partitions = reader.read_u32::<LittleEndian>()? as usize;
645
646        let mut cumulative_counts = Vec::with_capacity(num_partitions);
647        for _ in 0..num_partitions {
648            cumulative_counts.push(reader.read_u32::<LittleEndian>()?);
649        }
650
651        let mut partitions = Vec::with_capacity(num_partitions);
652        for _ in 0..num_partitions {
653            partitions.push(EFPartition::deserialize(reader)?);
654        }
655
656        Ok(Self {
657            partitions,
658            len,
659            cumulative_counts,
660        })
661    }
662
663    /// Total size in bytes
664    pub fn size_bytes(&self) -> usize {
665        let mut size = 8 + self.cumulative_counts.len() * 4;
666        for p in &self.partitions {
667            size += p.size_bytes();
668        }
669        size
670    }
671
    /// Create an iterator over all values, starting at the first element of
    /// the first partition.
    pub fn iter(&self) -> PartitionedEFIterator<'_> {
        PartitionedEFIterator {
            pef: self,
            partition_idx: 0,
            local_pos: 0,
        }
    }
680}
681
682/// Iterator over Partitioned Elias-Fano
683pub struct PartitionedEFIterator<'a> {
684    pef: &'a PartitionedEliasFano,
685    partition_idx: usize,
686    local_pos: u32,
687}
688
689impl<'a> Iterator for PartitionedEFIterator<'a> {
690    type Item = u32;
691
692    fn next(&mut self) -> Option<Self::Item> {
693        if self.partition_idx >= self.pef.partitions.len() {
694            return None;
695        }
696
697        let partition = &self.pef.partitions[self.partition_idx];
698        if let Some(val) = partition.get(self.local_pos) {
699            self.local_pos += 1;
700            if self.local_pos >= partition.len {
701                self.partition_idx += 1;
702                self.local_pos = 0;
703            }
704            Some(val)
705        } else {
706            None
707        }
708    }
709
710    fn size_hint(&self) -> (usize, Option<usize>) {
711        let current_global = if self.partition_idx == 0 {
712            self.local_pos
713        } else if self.partition_idx < self.pef.cumulative_counts.len() {
714            self.pef.cumulative_counts[self.partition_idx - 1] + self.local_pos
715        } else {
716            self.pef.len
717        };
718        let remaining = (self.pef.len - current_global) as usize;
719        (remaining, Some(remaining))
720    }
721}
722
/// Per-block metadata used by BlockMax WAND.
#[derive(Clone, Debug)]
pub struct PEFBlockInfo {
    /// Document ID of the first posting in the block.
    pub first_doc_id: u32,
    /// Document ID of the last posting in the block.
    pub last_doc_id: u32,
    /// Largest term frequency found in the block.
    pub max_tf: u32,
    /// Upper bound on the BM25 score of any posting in the block.
    pub max_block_score: f32,
    /// Index of the partition that holds the block's first posting.
    pub partition_idx: u16,
    /// Position of the block's first posting inside that partition.
    pub local_start: u32,
    /// Number of postings in the block.
    pub num_docs: u16,
}
741
742impl PEFBlockInfo {
743    pub fn serialize<W: Write>(&self, writer: &mut W) -> io::Result<()> {
744        writer.write_u32::<LittleEndian>(self.first_doc_id)?;
745        writer.write_u32::<LittleEndian>(self.last_doc_id)?;
746        writer.write_u32::<LittleEndian>(self.max_tf)?;
747        writer.write_f32::<LittleEndian>(self.max_block_score)?;
748        writer.write_u16::<LittleEndian>(self.partition_idx)?;
749        writer.write_u32::<LittleEndian>(self.local_start)?;
750        writer.write_u16::<LittleEndian>(self.num_docs)?;
751        Ok(())
752    }
753
754    pub fn deserialize<R: Read>(reader: &mut R) -> io::Result<Self> {
755        Ok(Self {
756            first_doc_id: reader.read_u32::<LittleEndian>()?,
757            last_doc_id: reader.read_u32::<LittleEndian>()?,
758            max_tf: reader.read_u32::<LittleEndian>()?,
759            max_block_score: reader.read_f32::<LittleEndian>()?,
760            partition_idx: reader.read_u16::<LittleEndian>()?,
761            local_start: reader.read_u32::<LittleEndian>()?,
762            num_docs: reader.read_u16::<LittleEndian>()?,
763        })
764    }
765}
766
/// Block size for BlockMax (matches other formats).
///
/// Maximum number of postings covered by one `PEFBlockInfo`; only the last
/// block of a posting list may hold fewer.
pub const PEF_BLOCK_SIZE: usize = 128;
769
770/// Partitioned Elias-Fano posting list with term frequencies and BlockMax
771#[derive(Debug, Clone)]
772pub struct PartitionedEFPostingList {
773    /// Document IDs (Partitioned Elias-Fano encoded)
774    pub doc_ids: PartitionedEliasFano,
775    /// Term frequencies (packed)
776    pub term_freqs: Vec<u8>,
777    /// Bits per term frequency
778    pub tf_bits: u8,
779    /// Maximum term frequency
780    pub max_tf: u32,
781    /// Block metadata for BlockMax WAND
782    pub blocks: Vec<PEFBlockInfo>,
783    /// Global maximum score
784    pub max_score: f32,
785}
786
787impl PartitionedEFPostingList {
    /// Create from postings, using an IDF of 1.0 for the block-max scores.
    ///
    /// Thin wrapper around `from_postings_with_idf`; `doc_ids` and
    /// `term_freqs` must have the same length (asserted there).
    pub fn from_postings(doc_ids: &[u32], term_freqs: &[u32]) -> Self {
        Self::from_postings_with_idf(doc_ids, term_freqs, 1.0)
    }
792
793    /// Create from postings with IDF for block-max scores
794    pub fn from_postings_with_idf(doc_ids: &[u32], term_freqs: &[u32], idf: f32) -> Self {
795        assert_eq!(doc_ids.len(), term_freqs.len());
796
797        if doc_ids.is_empty() {
798            return Self {
799                doc_ids: PartitionedEliasFano::from_sorted_slice(&[]),
800                term_freqs: Vec::new(),
801                tf_bits: 0,
802                max_tf: 0,
803                blocks: Vec::new(),
804                max_score: 0.0,
805            };
806        }
807
808        let pef_doc_ids = PartitionedEliasFano::from_sorted_slice(doc_ids);
809
810        // Find max TF
811        let max_tf = *term_freqs.iter().max().unwrap();
812        let tf_bits = if max_tf == 0 {
813            0
814        } else {
815            (32 - max_tf.leading_zeros()) as u8
816        };
817
818        // Pack term frequencies
819        let total_bits = doc_ids.len() * (tf_bits as usize);
820        let total_bytes = total_bits.div_ceil(8);
821        let mut packed_tfs = vec![0u8; total_bytes];
822
823        if tf_bits > 0 {
824            for (i, &tf) in term_freqs.iter().enumerate() {
825                let bit_pos = i * (tf_bits as usize);
826                let byte_idx = bit_pos / 8;
827                let bit_offset = bit_pos % 8;
828
829                let val = tf.saturating_sub(1);
830
831                packed_tfs[byte_idx] |= (val as u8) << bit_offset;
832                if bit_offset + (tf_bits as usize) > 8 && byte_idx + 1 < packed_tfs.len() {
833                    packed_tfs[byte_idx + 1] |= (val >> (8 - bit_offset)) as u8;
834                }
835                if bit_offset + (tf_bits as usize) > 16 && byte_idx + 2 < packed_tfs.len() {
836                    packed_tfs[byte_idx + 2] |= (val >> (16 - bit_offset)) as u8;
837                }
838            }
839        }
840
841        // Build block metadata
842        let mut blocks = Vec::new();
843        let mut max_score = 0.0f32;
844        let mut i = 0;
845
846        while i < doc_ids.len() {
847            let block_end = (i + PEF_BLOCK_SIZE).min(doc_ids.len());
848            let block_doc_ids = &doc_ids[i..block_end];
849            let block_tfs = &term_freqs[i..block_end];
850
851            let block_max_tf = *block_tfs.iter().max().unwrap_or(&1);
852            let block_score = crate::query::bm25_upper_bound(block_max_tf as f32, idf);
853            max_score = max_score.max(block_score);
854
855            // Find partition info for this block start
856            let (partition_idx, local_start) =
857                Self::find_partition_position(&pef_doc_ids, i as u32);
858
859            blocks.push(PEFBlockInfo {
860                first_doc_id: block_doc_ids[0],
861                last_doc_id: *block_doc_ids.last().unwrap(),
862                max_tf: block_max_tf,
863                max_block_score: block_score,
864                partition_idx: partition_idx as u16,
865                local_start,
866                num_docs: (block_end - i) as u16,
867            });
868
869            i = block_end;
870        }
871
872        Self {
873            doc_ids: pef_doc_ids,
874            term_freqs: packed_tfs,
875            tf_bits,
876            max_tf,
877            blocks,
878            max_score,
879        }
880    }
881
882    fn find_partition_position(pef: &PartitionedEliasFano, global_pos: u32) -> (usize, u32) {
883        let partition_idx = pef
884            .cumulative_counts
885            .binary_search(&(global_pos + 1))
886            .unwrap_or_else(|x| x);
887
888        let local_pos = if partition_idx == 0 {
889            global_pos
890        } else {
891            global_pos - pef.cumulative_counts[partition_idx - 1]
892        };
893
894        (partition_idx, local_pos)
895    }
896
897    /// Get term frequency at position
898    pub fn get_tf(&self, pos: u32) -> u32 {
899        if self.tf_bits == 0 || pos >= self.doc_ids.len() {
900            return 1;
901        }
902
903        let bit_pos = (pos as usize) * (self.tf_bits as usize);
904        let byte_idx = bit_pos / 8;
905        let bit_offset = bit_pos % 8;
906        let mask = (1u32 << self.tf_bits) - 1;
907
908        let mut val = (self.term_freqs[byte_idx] >> bit_offset) as u32;
909        if bit_offset + (self.tf_bits as usize) > 8 && byte_idx + 1 < self.term_freqs.len() {
910            val |= (self.term_freqs[byte_idx + 1] as u32) << (8 - bit_offset);
911        }
912        if bit_offset + (self.tf_bits as usize) > 16 && byte_idx + 2 < self.term_freqs.len() {
913            val |= (self.term_freqs[byte_idx + 2] as u32) << (16 - bit_offset);
914        }
915
916        (val & mask) + 1
917    }
918
919    /// Number of documents
920    pub fn len(&self) -> u32 {
921        self.doc_ids.len()
922    }
923
924    /// Check if empty
925    pub fn is_empty(&self) -> bool {
926        self.doc_ids.is_empty()
927    }
928
929    /// Serialize
930    pub fn serialize<W: Write>(&self, writer: &mut W) -> io::Result<()> {
931        self.doc_ids.serialize(writer)?;
932        writer.write_u8(self.tf_bits)?;
933        writer.write_u32::<LittleEndian>(self.max_tf)?;
934        writer.write_f32::<LittleEndian>(self.max_score)?;
935        writer.write_u32::<LittleEndian>(self.term_freqs.len() as u32)?;
936        writer.write_all(&self.term_freqs)?;
937
938        writer.write_u32::<LittleEndian>(self.blocks.len() as u32)?;
939        for block in &self.blocks {
940            block.serialize(writer)?;
941        }
942
943        Ok(())
944    }
945
946    /// Deserialize
947    pub fn deserialize<R: Read>(reader: &mut R) -> io::Result<Self> {
948        let doc_ids = PartitionedEliasFano::deserialize(reader)?;
949        let tf_bits = reader.read_u8()?;
950        let max_tf = reader.read_u32::<LittleEndian>()?;
951        let max_score = reader.read_f32::<LittleEndian>()?;
952        let tf_len = reader.read_u32::<LittleEndian>()? as usize;
953        let mut term_freqs = vec![0u8; tf_len];
954        reader.read_exact(&mut term_freqs)?;
955
956        let num_blocks = reader.read_u32::<LittleEndian>()? as usize;
957        let mut blocks = Vec::with_capacity(num_blocks);
958        for _ in 0..num_blocks {
959            blocks.push(PEFBlockInfo::deserialize(reader)?);
960        }
961
962        Ok(Self {
963            doc_ids,
964            term_freqs,
965            tf_bits,
966            max_tf,
967            blocks,
968            max_score,
969        })
970    }
971
972    /// Create iterator
973    pub fn iterator(&self) -> PartitionedEFPostingIterator<'_> {
974        PartitionedEFPostingIterator {
975            list: self,
976            pos: 0,
977            current_block: 0,
978        }
979    }
980
981    /// Get compression ratio compared to standard EF
982    pub fn compression_info(&self) -> (usize, usize) {
983        let pef_size = self.doc_ids.size_bytes();
984        // Estimate standard EF size
985        let n = self.len() as usize;
986        let max_val = if let Some(last_block) = self.blocks.last() {
987            last_block.last_doc_id
988        } else {
989            0
990        };
991        let ef_size = if n > 0 {
992            let l = if n <= 1 {
993                0
994            } else {
995                let ratio = (max_val as usize + 1) / n;
996                if ratio <= 1 {
997                    0
998                } else {
999                    (usize::BITS - ratio.leading_zeros()) as usize
1000                }
1001            };
1002            (n * l + 2 * n).div_ceil(8) + 16
1003        } else {
1004            0
1005        };
1006        (pef_size, ef_size)
1007    }
1008}
1009
1010/// Iterator over Partitioned EF posting list
1011pub struct PartitionedEFPostingIterator<'a> {
1012    list: &'a PartitionedEFPostingList,
1013    pos: u32,
1014    current_block: usize,
1015}
1016
1017impl<'a> PartitionedEFPostingIterator<'a> {
1018    /// Current document ID
1019    pub fn doc(&self) -> u32 {
1020        self.list.doc_ids.get(self.pos).unwrap_or(u32::MAX)
1021    }
1022
1023    /// Current term frequency
1024    pub fn term_freq(&self) -> u32 {
1025        self.list.get_tf(self.pos)
1026    }
1027
1028    /// Advance to next document
1029    pub fn advance(&mut self) -> u32 {
1030        self.pos += 1;
1031        if !self.list.blocks.is_empty() {
1032            let new_block = (self.pos as usize) / PEF_BLOCK_SIZE;
1033            if new_block < self.list.blocks.len() {
1034                self.current_block = new_block;
1035            }
1036        }
1037        self.doc()
1038    }
1039
1040    /// Seek to first doc >= target
1041    pub fn seek(&mut self, target: u32) -> u32 {
1042        // Use block skip list
1043        if !self.list.blocks.is_empty() {
1044            let block_idx = self.list.blocks[self.current_block..].binary_search_by(|b| {
1045                if b.last_doc_id < target {
1046                    std::cmp::Ordering::Less
1047                } else if b.first_doc_id > target {
1048                    std::cmp::Ordering::Greater
1049                } else {
1050                    std::cmp::Ordering::Equal
1051                }
1052            });
1053
1054            let target_block = match block_idx {
1055                Ok(idx) => self.current_block + idx,
1056                Err(idx) => {
1057                    let abs_idx = self.current_block + idx;
1058                    if abs_idx >= self.list.blocks.len() {
1059                        self.pos = self.list.len();
1060                        return u32::MAX;
1061                    }
1062                    abs_idx
1063                }
1064            };
1065
1066            if target_block > self.current_block {
1067                self.current_block = target_block;
1068                self.pos = (target_block * PEF_BLOCK_SIZE) as u32;
1069            }
1070        }
1071
1072        // Use PEF's next_geq
1073        if let Some((pos, val)) = self.list.doc_ids.next_geq(target)
1074            && pos >= self.pos
1075        {
1076            self.pos = pos;
1077            if !self.list.blocks.is_empty() {
1078                self.current_block = (pos as usize) / PEF_BLOCK_SIZE;
1079            }
1080            return val;
1081        }
1082
1083        // Linear scan fallback
1084        while self.pos < self.list.len() {
1085            let doc = self.doc();
1086            if doc >= target {
1087                return doc;
1088            }
1089            self.pos += 1;
1090        }
1091
1092        u32::MAX
1093    }
1094
1095    /// Check if exhausted
1096    pub fn is_exhausted(&self) -> bool {
1097        self.pos >= self.list.len()
1098    }
1099
1100    /// Current block's max score
1101    pub fn current_block_max_score(&self) -> f32 {
1102        if self.is_exhausted() || self.list.blocks.is_empty() {
1103            return 0.0;
1104        }
1105        if self.current_block < self.list.blocks.len() {
1106            self.list.blocks[self.current_block].max_block_score
1107        } else {
1108            0.0
1109        }
1110    }
1111
1112    /// Current block's max TF
1113    pub fn current_block_max_tf(&self) -> u32 {
1114        if self.is_exhausted() || self.list.blocks.is_empty() {
1115            return 0;
1116        }
1117        if self.current_block < self.list.blocks.len() {
1118            self.list.blocks[self.current_block].max_tf
1119        } else {
1120            0
1121        }
1122    }
1123
1124    /// Max score for remaining blocks
1125    pub fn max_remaining_score(&self) -> f32 {
1126        if self.is_exhausted() || self.list.blocks.is_empty() {
1127            return 0.0;
1128        }
1129        self.list.blocks[self.current_block..]
1130            .iter()
1131            .map(|b| b.max_block_score)
1132            .fold(0.0f32, |a, b| a.max(b))
1133    }
1134
1135    /// Skip to next block containing doc >= target (for BlockWAND)
1136    /// Returns (first_doc_in_block, block_max_score) or None if exhausted
1137    pub fn skip_to_block_with_doc(&mut self, target: u32) -> Option<(u32, f32)> {
1138        if self.list.blocks.is_empty() {
1139            return None;
1140        }
1141
1142        while self.current_block < self.list.blocks.len() {
1143            let block = &self.list.blocks[self.current_block];
1144            if block.last_doc_id >= target {
1145                self.pos = (self.current_block * PEF_BLOCK_SIZE) as u32;
1146                return Some((block.first_doc_id, block.max_block_score));
1147            }
1148            self.current_block += 1;
1149        }
1150
1151        self.pos = self.list.len();
1152        None
1153    }
1154}
1155
#[cfg(test)]
mod tests {
    use super::*;

    /// A single partition reports its bounds and returns every input value by index.
    #[test]
    fn test_ef_partition_basic() {
        let input = vec![10, 20, 30, 40, 50];
        let part = EFPartition::from_sorted_slice(&input);

        assert_eq!(part.len, 5);
        assert_eq!(part.first_value, 10);
        assert_eq!(part.last_value, 50);

        for (idx, want) in input.iter().copied().enumerate() {
            assert_eq!(part.get(idx as u32), Some(want));
        }
    }

    /// `next_geq` on a single partition: before, on, between and past the stored values.
    #[test]
    fn test_ef_partition_next_geq() {
        let part = EFPartition::from_sorted_slice(&[10, 20, 30, 100, 200, 300]);

        let cases = [
            (5, Some((0, 10))),
            (10, Some((0, 10))),
            (15, Some((1, 20))),
            (100, Some((3, 100))),
            (301, None),
        ];
        for (target, want) in cases {
            assert_eq!(part.next_geq(target), want);
        }
    }

    /// Random access over the partitioned structure returns every input value.
    #[test]
    fn test_partitioned_ef_basic() {
        // 0, 2, 4, ..., 998
        let input: Vec<u32> = (0..1000).step_by(2).collect();
        let pef = PartitionedEliasFano::from_sorted_slice(&input);

        assert_eq!(pef.len(), 500);
        assert!(pef.num_partitions() >= 1);

        for (i, want) in input.iter().copied().enumerate() {
            assert_eq!(pef.get(i as u32), Some(want), "Mismatch at {}", i);
        }
    }

    /// `next_geq` across the partitioned structure.
    #[test]
    fn test_partitioned_ef_next_geq() {
        // 0, 3, 6, ..., 2997
        let input: Vec<u32> = (0..3000).step_by(3).collect();
        let pef = PartitionedEliasFano::from_sorted_slice(&input);

        let first = pef.next_geq(0);
        let between = pef.next_geq(100); // 100/3 = 33.33, next is 34*3=102
        let exact = pef.next_geq(1500);
        let past_end = pef.next_geq(3000);

        assert_eq!(first, Some((0, 0)));
        assert_eq!(between, Some((34, 102)));
        assert_eq!(exact, Some((500, 1500)));
        assert_eq!(past_end, None);
    }

    /// A serialize/deserialize round trip preserves length, partitioning and values.
    #[test]
    fn test_partitioned_ef_serialization() {
        // 0, 5, 10, ..., 2495
        let input: Vec<u32> = (0..2500).step_by(5).collect();
        let pef = PartitionedEliasFano::from_sorted_slice(&input);

        let mut bytes = Vec::new();
        pef.serialize(&mut bytes).unwrap();

        let mut cursor: &[u8] = &bytes;
        let restored = PartitionedEliasFano::deserialize(&mut cursor).unwrap();

        assert_eq!(restored.len(), pef.len());
        assert_eq!(restored.num_partitions(), pef.num_partitions());

        (0..pef.len()).for_each(|i| assert_eq!(restored.get(i), pef.get(i)));
    }

    /// Walking the posting list yields every doc ID together with its term frequency.
    #[test]
    fn test_partitioned_ef_posting_list() {
        // 0, 2, 4, ..., 598
        let doc_ids: Vec<u32> = (0..600).step_by(2).collect();
        let term_freqs: Vec<u32> = (0..300).map(|i| i % 10 + 1).collect();

        let list = PartitionedEFPostingList::from_postings(&doc_ids, &term_freqs);
        assert_eq!(list.len(), 300);

        let mut cursor = list.iterator();
        let expected = doc_ids.iter().copied().zip(term_freqs.iter().copied());
        for (i, (want_doc, want_tf)) in expected.enumerate() {
            assert_eq!(cursor.doc(), want_doc, "Doc mismatch at {}", i);
            assert_eq!(cursor.term_freq(), want_tf, "TF mismatch at {}", i);
            cursor.advance();
        }
    }

    /// Seeking between, onto and past stored doc IDs.
    #[test]
    fn test_partitioned_ef_seek() {
        // 0, 3, 6, ..., 1497
        let doc_ids: Vec<u32> = (0..1500).step_by(3).collect();
        let term_freqs: Vec<u32> = vec![1; 500];

        let list = PartitionedEFPostingList::from_postings(&doc_ids, &term_freqs);
        let mut cursor = list.iterator();

        let between = cursor.seek(100); // 100/3 = 33.33, next is 34*3=102
        assert_eq!(between, 102);
        let exact = cursor.seek(300);
        assert_eq!(exact, 300);
        let past_end = cursor.seek(1500);
        assert_eq!(past_end, u32::MAX);
    }

    /// Mixed-density input is split into several partitions and still decodes correctly.
    #[test]
    fn test_compression_improvement() {
        // Dense region (gap 2) followed by a sparse region (gap 100).
        let dense = (0..5000).map(|i| i * 2);
        let sparse = (0..5000).map(|j| 10000 + j * 100);
        let input: Vec<u32> = dense.chain(sparse).collect();

        let pef = PartitionedEliasFano::from_sorted_slice(&input);

        // Data with two very different densities should not end up in one partition.
        let partitions = pef.num_partitions();
        assert!(
            partitions > 1,
            "Expected multiple partitions, got {}",
            pef.num_partitions()
        );

        // Every value must still be retrievable by index.
        for (i, want) in input.iter().copied().enumerate() {
            assert_eq!(pef.get(i as u32), Some(want));
        }
    }

    /// Block-level maxima are recorded per block and exposed through the iterator.
    #[test]
    fn test_partitioned_ef_block_max() {
        // 500 postings -> four blocks of up to 128 postings each.
        let doc_ids: Vec<u32> = (0..1000).step_by(2).collect();
        // One distinct maximum term frequency per block.
        let term_freqs: Vec<u32> = (0..500)
            .map(|i| match i {
                0..=127 => 1,    // Block 0
                128..=255 => 5,  // Block 1
                256..=383 => 10, // Block 2
                _ => 3,          // Block 3
            })
            .collect();

        let list = PartitionedEFPostingList::from_postings_with_idf(&doc_ids, &term_freqs, 2.0);

        assert_eq!(list.blocks.len(), 4);
        let block_max_tfs: Vec<u32> = list.blocks.iter().map(|b| b.max_tf).collect();
        assert_eq!(block_max_tfs, [1, 5, 10, 3]);

        // Block 2 holds the largest term frequency, so it must carry the largest score.
        let best = list.blocks[2].max_block_score;
        for other in [0, 1, 3] {
            assert!(best > list.blocks[other].max_block_score);
        }

        // The list-wide maximum is the score of block 2.
        assert_eq!(list.max_score, best);

        // Block maxima as seen through the iterator.
        let mut cursor = list.iterator();
        assert_eq!(cursor.current_block_max_tf(), 1); // Block 0

        cursor.seek(256); // first doc in block 1
        assert_eq!(cursor.current_block_max_tf(), 5);

        cursor.seek(512); // first doc in block 2
        assert_eq!(cursor.current_block_max_tf(), 10);

        // skip_to_block_with_doc
        let mut skipper = list.iterator();
        let outcome = skipper.skip_to_block_with_doc(300);
        assert!(outcome.is_some());
        let (first_doc, score) = outcome.unwrap();
        assert!(first_doc <= 300);
        assert!(score > 0.0);

        // max_remaining_score
        let mut scorer = list.iterator();
        let overall = scorer.max_remaining_score();
        assert_eq!(overall, list.max_score);

        // Once block 2 is behind the cursor, the remaining maximum must drop.
        scorer.seek(768); // Block 3
        let remaining = scorer.max_remaining_score();
        assert!(remaining < overall);
    }
}