Skip to main content

hermes_core/structures/
partitioned_ef.rs

1//! Partitioned Elias-Fano (PEF) encoding
2//!
3//! Based on Ottaviano & Venturini (2014) "Partitioned Elias-Fano Indexes"
4//!
5//! Key improvements over standard Elias-Fano:
6//! - **Optimal partitioning**: Divides sequence into chunks with (1+ε)-optimal compression
7//! - **Better compression**: 30-40% smaller than standard EF on typical data
8//! - **Fast random access**: O(1) access within partitions
9//! - **Efficient NextGEQ**: Uses partition endpoints for fast seeking
10//!
11//! The algorithm finds optimal partition boundaries that minimize total encoding size.
12//! Each partition is encoded with its own Elias-Fano instance using local parameters.
13
14use byteorder::{LittleEndian, ReadBytesExt, WriteBytesExt};
15use std::io::{self, Read, Write};
16
/// Minimum number of elements per partition tried by the partitioner
/// (smaller partitions have too much overhead). Inputs with at most this
/// many elements are kept as a single partition.
const MIN_PARTITION_SIZE: usize = 64;

/// Maximum number of elements per partition tried by the partitioner
/// (larger partitions lose locality benefits).
const MAX_PARTITION_SIZE: usize = 512;
22
/// A single partition in the PEF structure.
///
/// Values are stored relative to `first_value`. Each relative value is split
/// into `lower_bit_width` low bits, packed back to back in `lower_bits`, and a
/// high part that is recorded in `upper_bits` by setting bit `i + high` for
/// the element at index `i`.
#[derive(Debug, Clone)]
pub struct EFPartition {
    /// Lower bits array (`lower_bit_width` bits per element, little-endian within words)
    lower_bits: Vec<u64>,
    /// Upper bits array (unary encoded)
    upper_bits: Vec<u64>,
    /// Number of elements in this partition
    len: u32,
    /// First value in partition (absolute)
    first_value: u32,
    /// Last value in partition (absolute)
    last_value: u32,
    /// Local universe size (last_value - first_value + 1)
    local_universe: u32,
    /// Number of lower bits per element
    lower_bit_width: u8,
}
41
42impl EFPartition {
43    /// Create a partition from a sorted slice
44    /// Values are stored relative to first_value for better compression
45    pub fn from_sorted_slice(values: &[u32]) -> Self {
46        if values.is_empty() {
47            return Self {
48                lower_bits: Vec::new(),
49                upper_bits: Vec::new(),
50                len: 0,
51                first_value: 0,
52                last_value: 0,
53                local_universe: 0,
54                lower_bit_width: 0,
55            };
56        }
57
58        let first_value = values[0];
59        let last_value = *values.last().unwrap();
60        let n = values.len() as u64;
61
62        // Local universe: encode values relative to first_value
63        let local_universe = last_value - first_value + 1;
64
65        // Calculate lower bit width using local universe
66        let lower_bit_width = if n <= 1 {
67            0
68        } else {
69            let ratio = (local_universe as u64).max(1) / n.max(1);
70            if ratio <= 1 {
71                0
72            } else {
73                (64 - ratio.leading_zeros() - 1) as u8
74            }
75        };
76
77        // Allocate arrays
78        let lower_bits_total = (n as usize) * (lower_bit_width as usize);
79        let lower_words = lower_bits_total.div_ceil(64);
80        let mut lower_bits = vec![0u64; lower_words];
81
82        let max_relative = local_universe.saturating_sub(1) as u64;
83        let upper_bound = n + (max_relative >> lower_bit_width) + 1;
84        let upper_words = (upper_bound as usize).div_ceil(64);
85        let mut upper_bits = vec![0u64; upper_words];
86
87        let lower_mask = if lower_bit_width == 0 {
88            0
89        } else {
90            (1u64 << lower_bit_width) - 1
91        };
92
93        // Encode each value relative to first_value
94        for (i, &val) in values.iter().enumerate() {
95            let relative_val = (val - first_value) as u64;
96
97            // Store lower bits
98            if lower_bit_width > 0 {
99                let lower = relative_val & lower_mask;
100                let bit_pos = i * (lower_bit_width as usize);
101                let word_idx = bit_pos / 64;
102                let bit_offset = bit_pos % 64;
103
104                lower_bits[word_idx] |= lower << bit_offset;
105                if bit_offset + (lower_bit_width as usize) > 64 && word_idx + 1 < lower_bits.len() {
106                    lower_bits[word_idx + 1] |= lower >> (64 - bit_offset);
107                }
108            }
109
110            // Store upper bits
111            let upper = relative_val >> lower_bit_width;
112            let upper_pos = (i as u64) + upper;
113            let word_idx = (upper_pos / 64) as usize;
114            let bit_offset = upper_pos % 64;
115            if word_idx < upper_bits.len() {
116                upper_bits[word_idx] |= 1u64 << bit_offset;
117            }
118        }
119
120        Self {
121            lower_bits,
122            upper_bits,
123            len: values.len() as u32,
124            first_value,
125            last_value,
126            local_universe,
127            lower_bit_width,
128        }
129    }
130
131    /// Get element at position i (0-indexed)
132    #[inline]
133    pub fn get(&self, i: u32) -> Option<u32> {
134        if i >= self.len {
135            return None;
136        }
137
138        let i = i as usize;
139
140        // Get lower bits
141        let lower = if self.lower_bit_width == 0 {
142            0u64
143        } else {
144            let bit_pos = i * (self.lower_bit_width as usize);
145            let word_idx = bit_pos / 64;
146            let bit_offset = bit_pos % 64;
147            let lower_mask = (1u64 << self.lower_bit_width) - 1;
148
149            let mut val = (self.lower_bits[word_idx] >> bit_offset) & lower_mask;
150            if bit_offset + (self.lower_bit_width as usize) > 64
151                && word_idx + 1 < self.lower_bits.len()
152            {
153                val |= (self.lower_bits[word_idx + 1] << (64 - bit_offset)) & lower_mask;
154            }
155            val
156        };
157
158        // Get upper bits via select1(i) - i
159        let select_pos = self.select1(i as u32)?;
160        let upper = (select_pos as u64) - (i as u64);
161
162        // Reconstruct absolute value
163        let relative_val = (upper << self.lower_bit_width) | lower;
164        Some(self.first_value + relative_val as u32)
165    }
166
167    /// Find position of i-th set bit (optimized with NEON on aarch64)
168    fn select1(&self, i: u32) -> Option<u32> {
169        if i >= self.len {
170            return None;
171        }
172
173        let mut remaining = i + 1;
174        let mut pos = 0u32;
175
176        // Process 4 words at a time on aarch64 using NEON popcount
177        #[cfg(target_arch = "aarch64")]
178        {
179            use std::arch::aarch64::*;
180
181            let chunks = self.upper_bits.chunks_exact(4);
182            let remainder = chunks.remainder();
183
184            for chunk in chunks {
185                // Load 4 u64 words (32 bytes)
186                let words: [u64; 4] = [chunk[0], chunk[1], chunk[2], chunk[3]];
187
188                // Count bits in each word using NEON
189                // vcntq_u8 counts bits per byte, then sum horizontally
190                unsafe {
191                    let bytes = std::mem::transmute::<[u64; 4], [u8; 32]>(words);
192
193                    // Process first 16 bytes (words 0-1)
194                    let v0 = vld1q_u8(bytes.as_ptr());
195                    let cnt0 = vcntq_u8(v0);
196                    let sum0 = vaddlvq_u8(cnt0) as u32;
197
198                    // Process next 16 bytes (words 2-3)
199                    let v1 = vld1q_u8(bytes.as_ptr().add(16));
200                    let cnt1 = vcntq_u8(v1);
201                    let sum1 = vaddlvq_u8(cnt1) as u32;
202
203                    let total_popcount = sum0 + sum1;
204
205                    if total_popcount >= remaining {
206                        // Found the chunk, now find exact word
207                        for &word in chunk {
208                            let popcount = word.count_ones();
209                            if popcount >= remaining {
210                                let mut w = word;
211                                for _ in 0..remaining - 1 {
212                                    w &= w - 1;
213                                }
214                                return Some(pos + w.trailing_zeros());
215                            }
216                            remaining -= popcount;
217                            pos += 64;
218                        }
219                    }
220                    remaining -= total_popcount;
221                    pos += 256; // 4 words * 64 bits
222                }
223            }
224
225            // Handle remaining words
226            for &word in remainder {
227                let popcount = word.count_ones();
228                if popcount >= remaining {
229                    let mut w = word;
230                    for _ in 0..remaining - 1 {
231                        w &= w - 1;
232                    }
233                    return Some(pos + w.trailing_zeros());
234                }
235                remaining -= popcount;
236                pos += 64;
237            }
238        }
239
240        // Scalar fallback for non-aarch64
241        #[cfg(not(target_arch = "aarch64"))]
242        {
243            for &word in &self.upper_bits {
244                let popcount = word.count_ones();
245                if popcount >= remaining {
246                    let mut w = word;
247                    for _ in 0..remaining - 1 {
248                        w &= w - 1;
249                    }
250                    return Some(pos + w.trailing_zeros());
251                }
252                remaining -= popcount;
253                pos += 64;
254            }
255        }
256
257        None
258    }
259
260    /// Find first element >= target within this partition
261    /// Returns (local_position, value) or None
262    pub fn next_geq(&self, target: u32) -> Option<(u32, u32)> {
263        if self.len == 0 || target > self.last_value {
264            return None;
265        }
266
267        if target <= self.first_value {
268            return Some((0, self.first_value));
269        }
270
271        // Binary search for target
272        let mut lo = 0u32;
273        let mut hi = self.len;
274
275        while lo < hi {
276            let mid = lo + (hi - lo) / 2;
277            if let Some(val) = self.get(mid) {
278                if val < target {
279                    lo = mid + 1;
280                } else {
281                    hi = mid;
282                }
283            } else {
284                break;
285            }
286        }
287
288        if lo < self.len {
289            self.get(lo).map(|v| (lo, v))
290        } else {
291            None
292        }
293    }
294
295    /// Serialize partition
296    pub fn serialize<W: Write>(&self, writer: &mut W) -> io::Result<()> {
297        writer.write_u32::<LittleEndian>(self.len)?;
298        writer.write_u32::<LittleEndian>(self.first_value)?;
299        writer.write_u32::<LittleEndian>(self.last_value)?;
300        writer.write_u32::<LittleEndian>(self.local_universe)?;
301        writer.write_u8(self.lower_bit_width)?;
302
303        writer.write_u32::<LittleEndian>(self.lower_bits.len() as u32)?;
304        for &word in &self.lower_bits {
305            writer.write_u64::<LittleEndian>(word)?;
306        }
307
308        writer.write_u32::<LittleEndian>(self.upper_bits.len() as u32)?;
309        for &word in &self.upper_bits {
310            writer.write_u64::<LittleEndian>(word)?;
311        }
312
313        Ok(())
314    }
315
316    /// Deserialize partition
317    pub fn deserialize<R: Read>(reader: &mut R) -> io::Result<Self> {
318        let len = reader.read_u32::<LittleEndian>()?;
319        let first_value = reader.read_u32::<LittleEndian>()?;
320        let last_value = reader.read_u32::<LittleEndian>()?;
321        let local_universe = reader.read_u32::<LittleEndian>()?;
322        let lower_bit_width = reader.read_u8()?;
323
324        let lower_len = reader.read_u32::<LittleEndian>()? as usize;
325        let mut lower_bits = Vec::with_capacity(lower_len);
326        for _ in 0..lower_len {
327            lower_bits.push(reader.read_u64::<LittleEndian>()?);
328        }
329
330        let upper_len = reader.read_u32::<LittleEndian>()? as usize;
331        let mut upper_bits = Vec::with_capacity(upper_len);
332        for _ in 0..upper_len {
333            upper_bits.push(reader.read_u64::<LittleEndian>()?);
334        }
335
336        Ok(Self {
337            lower_bits,
338            upper_bits,
339            len,
340            first_value,
341            last_value,
342            local_universe,
343            lower_bit_width,
344        })
345    }
346
    /// Size in bytes: 17 bytes for the fixed header fields (four `u32`s and
    /// one `u8`) plus 8 bytes per word of `lower_bits` and `upper_bits`.
    ///
    /// NOTE(review): the two `u32` word-count prefixes written by `serialize`
    /// are not included in this figure — confirm whether that is intended.
    pub fn size_bytes(&self) -> usize {
        17 + self.lower_bits.len() * 8 + self.upper_bits.len() * 8
    }
351}
352
/// Estimate the encoded size in bytes of a partition holding `values`.
///
/// The model is `n * l` lower bits, about `2n` upper bits and a 17-byte
/// header, where `l = floor(log2(span / n))` (0 when that ratio is at most 1)
/// is the same lower bit width the partition encoder picks. The span is
/// computed in `u64` so that a slice covering `0..=u32::MAX` does not
/// overflow. An empty slice costs 0.
fn estimate_partition_cost(values: &[u32]) -> usize {
    let (Some(&first), Some(&last)) = (values.first(), values.last()) else {
        return 0;
    };

    let n = values.len();
    let span = u64::from(last - first) + 1;

    // Lower bits: n * floor(log2(span / n))
    let ratio = span / n as u64;
    let lower_bits = if n <= 1 || ratio <= 1 {
        0
    } else {
        n * ratio.ilog2() as usize
    };

    // Upper bits: approximately 2n bits
    let upper_bits = 2 * n;

    // Overhead: 17-byte partition header
    let overhead = 17 * 8;

    (lower_bits + upper_bits + overhead).div_ceil(8)
}
381
382/// Find optimal partition boundaries using dynamic programming
383/// Returns partition endpoints (exclusive)
384fn find_optimal_partitions(values: &[u32]) -> Vec<usize> {
385    let n = values.len();
386    if n <= MIN_PARTITION_SIZE {
387        return vec![n];
388    }
389
390    // dp[i] = minimum cost to encode values[0..i]
391    // parent[i] = start of last partition ending at i
392    let mut dp = vec![usize::MAX; n + 1];
393    let mut parent = vec![0usize; n + 1];
394    dp[0] = 0;
395
396    for i in MIN_PARTITION_SIZE..=n {
397        // Try all valid partition sizes ending at i
398        let min_start = i.saturating_sub(MAX_PARTITION_SIZE);
399        let max_start = i.saturating_sub(MIN_PARTITION_SIZE);
400
401        for start in min_start..=max_start {
402            if dp[start] == usize::MAX {
403                continue;
404            }
405
406            let partition_cost = estimate_partition_cost(&values[start..i]);
407            let total_cost = dp[start].saturating_add(partition_cost);
408
409            if total_cost < dp[i] {
410                dp[i] = total_cost;
411                parent[i] = start;
412            }
413        }
414    }
415
416    // Handle case where n is not reachable (too small for partitioning)
417    if dp[n] == usize::MAX {
418        return vec![n];
419    }
420
421    // Reconstruct partition boundaries
422    let mut boundaries = Vec::new();
423    let mut pos = n;
424    while pos > 0 {
425        boundaries.push(pos);
426        pos = parent[pos];
427    }
428    boundaries.reverse();
429
430    boundaries
431}
432
/// Partitioned Elias-Fano encoded sequence: a list of independently encoded
/// partitions plus the running element counts used to map a global position
/// to a partition.
#[derive(Debug, Clone)]
pub struct PartitionedEliasFano {
    /// Individual partitions, in sequence order
    partitions: Vec<EFPartition>,
    /// Total number of elements
    len: u32,
    /// Cumulative element counts: entry `k` is the number of elements in
    /// partitions `0..=k` (for position lookup)
    cumulative_counts: Vec<u32>,
}
443
444impl PartitionedEliasFano {
445    /// Create from sorted slice with optimal partitioning
446    pub fn from_sorted_slice(values: &[u32]) -> Self {
447        if values.is_empty() {
448            return Self {
449                partitions: Vec::new(),
450                len: 0,
451                cumulative_counts: Vec::new(),
452            };
453        }
454
455        let boundaries = find_optimal_partitions(values);
456        let mut partitions = Vec::with_capacity(boundaries.len());
457        let mut cumulative_counts = Vec::with_capacity(boundaries.len());
458
459        let mut start = 0;
460        let mut cumulative = 0u32;
461
462        for &end in &boundaries {
463            let partition = EFPartition::from_sorted_slice(&values[start..end]);
464            cumulative += partition.len;
465            cumulative_counts.push(cumulative);
466            partitions.push(partition);
467            start = end;
468        }
469
470        Self {
471            partitions,
472            len: values.len() as u32,
473            cumulative_counts,
474        }
475    }
476
    /// Total number of elements across all partitions
    #[inline]
    pub fn len(&self) -> u32 {
        self.len
    }
482
    /// Returns `true` when the sequence holds no elements
    #[inline]
    pub fn is_empty(&self) -> bool {
        self.len == 0
    }
488
    /// Number of partitions the sequence was split into
    pub fn num_partitions(&self) -> usize {
        self.partitions.len()
    }
493
494    /// Get element at global position
495    pub fn get(&self, pos: u32) -> Option<u32> {
496        if pos >= self.len {
497            return None;
498        }
499
500        // Find partition containing this position
501        let partition_idx = self
502            .cumulative_counts
503            .binary_search(&(pos + 1))
504            .unwrap_or_else(|x| x);
505
506        if partition_idx >= self.partitions.len() {
507            return None;
508        }
509
510        let local_pos = if partition_idx == 0 {
511            pos
512        } else {
513            pos - self.cumulative_counts[partition_idx - 1]
514        };
515
516        self.partitions[partition_idx].get(local_pos)
517    }
518
519    /// Find first element >= target
520    /// Returns (global_position, value) or None
521    pub fn next_geq(&self, target: u32) -> Option<(u32, u32)> {
522        if self.partitions.is_empty() {
523            return None;
524        }
525
526        // Binary search for partition containing target
527        let partition_idx = self
528            .partitions
529            .binary_search_by(|p| {
530                if p.last_value < target {
531                    std::cmp::Ordering::Less
532                } else if p.first_value > target {
533                    std::cmp::Ordering::Greater
534                } else {
535                    std::cmp::Ordering::Equal
536                }
537            })
538            .unwrap_or_else(|x| x);
539
540        // Search from this partition onwards
541        for (i, partition) in self.partitions[partition_idx..].iter().enumerate() {
542            let actual_idx = partition_idx + i;
543
544            if let Some((local_pos, val)) = partition.next_geq(target) {
545                let global_pos = if actual_idx == 0 {
546                    local_pos
547                } else {
548                    self.cumulative_counts[actual_idx - 1] + local_pos
549                };
550                return Some((global_pos, val));
551            }
552        }
553
554        None
555    }
556
557    /// Serialize
558    pub fn serialize<W: Write>(&self, writer: &mut W) -> io::Result<()> {
559        writer.write_u32::<LittleEndian>(self.len)?;
560        writer.write_u32::<LittleEndian>(self.partitions.len() as u32)?;
561
562        for &count in &self.cumulative_counts {
563            writer.write_u32::<LittleEndian>(count)?;
564        }
565
566        for partition in &self.partitions {
567            partition.serialize(writer)?;
568        }
569
570        Ok(())
571    }
572
573    /// Deserialize
574    pub fn deserialize<R: Read>(reader: &mut R) -> io::Result<Self> {
575        let len = reader.read_u32::<LittleEndian>()?;
576        let num_partitions = reader.read_u32::<LittleEndian>()? as usize;
577
578        let mut cumulative_counts = Vec::with_capacity(num_partitions);
579        for _ in 0..num_partitions {
580            cumulative_counts.push(reader.read_u32::<LittleEndian>()?);
581        }
582
583        let mut partitions = Vec::with_capacity(num_partitions);
584        for _ in 0..num_partitions {
585            partitions.push(EFPartition::deserialize(reader)?);
586        }
587
588        Ok(Self {
589            partitions,
590            len,
591            cumulative_counts,
592        })
593    }
594
595    /// Total size in bytes
596    pub fn size_bytes(&self) -> usize {
597        let mut size = 8 + self.cumulative_counts.len() * 4;
598        for p in &self.partitions {
599            size += p.size_bytes();
600        }
601        size
602    }
603
    /// Create an iterator over all values, starting at the first element of
    /// the first partition
    pub fn iter(&self) -> PartitionedEFIterator<'_> {
        PartitionedEFIterator {
            pef: self,
            partition_idx: 0,
            local_pos: 0,
        }
    }
612}
613
/// Iterator over Partitioned Elias-Fano, yielding the stored values in order
pub struct PartitionedEFIterator<'a> {
    /// Sequence being iterated
    pef: &'a PartitionedEliasFano,
    /// Index of the partition the next value comes from
    partition_idx: usize,
    /// Position of the next value inside that partition
    local_pos: u32,
}
620
621impl<'a> Iterator for PartitionedEFIterator<'a> {
622    type Item = u32;
623
624    fn next(&mut self) -> Option<Self::Item> {
625        if self.partition_idx >= self.pef.partitions.len() {
626            return None;
627        }
628
629        let partition = &self.pef.partitions[self.partition_idx];
630        if let Some(val) = partition.get(self.local_pos) {
631            self.local_pos += 1;
632            if self.local_pos >= partition.len {
633                self.partition_idx += 1;
634                self.local_pos = 0;
635            }
636            Some(val)
637        } else {
638            None
639        }
640    }
641
642    fn size_hint(&self) -> (usize, Option<usize>) {
643        let current_global = if self.partition_idx == 0 {
644            self.local_pos
645        } else if self.partition_idx < self.pef.cumulative_counts.len() {
646            self.pef.cumulative_counts[self.partition_idx - 1] + self.local_pos
647        } else {
648            self.pef.len
649        };
650        let remaining = (self.pef.len - current_global) as usize;
651        (remaining, Some(remaining))
652    }
653}
654
/// Block metadata for BlockMax WAND
#[derive(Debug, Clone)]
pub struct PEFBlockInfo {
    /// First document ID in block
    pub first_doc_id: u32,
    /// Last document ID in block
    pub last_doc_id: u32,
    /// Maximum term frequency in block
    pub max_tf: u32,
    /// Maximum BM25 score upper bound
    pub max_block_score: f32,
    /// Starting partition index.
    /// NOTE(review): stored as `u16`; partition indices above 65535 cannot be
    /// represented — confirm how producers and consumers handle that.
    pub partition_idx: u16,
    /// Starting position within partition
    pub local_start: u32,
    /// Number of documents in block
    pub num_docs: u16,
}
673
674impl PEFBlockInfo {
675    pub fn serialize<W: Write>(&self, writer: &mut W) -> io::Result<()> {
676        writer.write_u32::<LittleEndian>(self.first_doc_id)?;
677        writer.write_u32::<LittleEndian>(self.last_doc_id)?;
678        writer.write_u32::<LittleEndian>(self.max_tf)?;
679        writer.write_f32::<LittleEndian>(self.max_block_score)?;
680        writer.write_u16::<LittleEndian>(self.partition_idx)?;
681        writer.write_u32::<LittleEndian>(self.local_start)?;
682        writer.write_u16::<LittleEndian>(self.num_docs)?;
683        Ok(())
684    }
685
686    pub fn deserialize<R: Read>(reader: &mut R) -> io::Result<Self> {
687        Ok(Self {
688            first_doc_id: reader.read_u32::<LittleEndian>()?,
689            last_doc_id: reader.read_u32::<LittleEndian>()?,
690            max_tf: reader.read_u32::<LittleEndian>()?,
691            max_block_score: reader.read_f32::<LittleEndian>()?,
692            partition_idx: reader.read_u16::<LittleEndian>()?,
693            local_start: reader.read_u32::<LittleEndian>()?,
694            num_docs: reader.read_u16::<LittleEndian>()?,
695        })
696    }
697}
698
/// Number of postings per BlockMax block (matches other formats)
pub const PEF_BLOCK_SIZE: usize = 128;
701
/// Partitioned Elias-Fano posting list with term frequencies and BlockMax
#[derive(Debug, Clone)]
pub struct PartitionedEFPostingList {
    /// Document IDs (Partitioned Elias-Fano encoded)
    pub doc_ids: PartitionedEliasFano,
    /// Term frequencies, bit-packed at `tf_bits` bits per posting; each entry
    /// stores `tf - 1`
    pub term_freqs: Vec<u8>,
    /// Bits per term frequency (0 means nothing is stored and every TF reads as 1)
    pub tf_bits: u8,
    /// Maximum term frequency
    pub max_tf: u32,
    /// Block metadata for BlockMax WAND
    pub blocks: Vec<PEFBlockInfo>,
    /// Global maximum score (largest block score upper bound)
    pub max_score: f32,
}
718
719impl PartitionedEFPostingList {
    /// `k1` constant used by `compute_bm25_upper_bound`
    const K1: f32 = 1.2;
    /// `b` constant used by `compute_bm25_upper_bound`
    const B: f32 = 0.75;
722
723    #[inline]
724    pub fn compute_bm25_upper_bound(max_tf: u32, idf: f32) -> f32 {
725        let tf = max_tf as f32;
726        let min_length_norm = 1.0 - Self::B;
727        let tf_norm = (tf * (Self::K1 + 1.0)) / (tf + Self::K1 * min_length_norm);
728        idf * tf_norm
729    }
730
    /// Create from postings, using an IDF of 1.0 for the block-max scores.
    ///
    /// Panics if `doc_ids` and `term_freqs` differ in length.
    pub fn from_postings(doc_ids: &[u32], term_freqs: &[u32]) -> Self {
        Self::from_postings_with_idf(doc_ids, term_freqs, 1.0)
    }
735
736    /// Create from postings with IDF for block-max scores
737    pub fn from_postings_with_idf(doc_ids: &[u32], term_freqs: &[u32], idf: f32) -> Self {
738        assert_eq!(doc_ids.len(), term_freqs.len());
739
740        if doc_ids.is_empty() {
741            return Self {
742                doc_ids: PartitionedEliasFano::from_sorted_slice(&[]),
743                term_freqs: Vec::new(),
744                tf_bits: 0,
745                max_tf: 0,
746                blocks: Vec::new(),
747                max_score: 0.0,
748            };
749        }
750
751        let pef_doc_ids = PartitionedEliasFano::from_sorted_slice(doc_ids);
752
753        // Find max TF
754        let max_tf = *term_freqs.iter().max().unwrap();
755        let tf_bits = if max_tf == 0 {
756            0
757        } else {
758            (32 - max_tf.leading_zeros()) as u8
759        };
760
761        // Pack term frequencies
762        let total_bits = doc_ids.len() * (tf_bits as usize);
763        let total_bytes = total_bits.div_ceil(8);
764        let mut packed_tfs = vec![0u8; total_bytes];
765
766        if tf_bits > 0 {
767            for (i, &tf) in term_freqs.iter().enumerate() {
768                let bit_pos = i * (tf_bits as usize);
769                let byte_idx = bit_pos / 8;
770                let bit_offset = bit_pos % 8;
771
772                let val = tf.saturating_sub(1);
773
774                packed_tfs[byte_idx] |= (val as u8) << bit_offset;
775                if bit_offset + (tf_bits as usize) > 8 && byte_idx + 1 < packed_tfs.len() {
776                    packed_tfs[byte_idx + 1] |= (val >> (8 - bit_offset)) as u8;
777                }
778                if bit_offset + (tf_bits as usize) > 16 && byte_idx + 2 < packed_tfs.len() {
779                    packed_tfs[byte_idx + 2] |= (val >> (16 - bit_offset)) as u8;
780                }
781            }
782        }
783
784        // Build block metadata
785        let mut blocks = Vec::new();
786        let mut max_score = 0.0f32;
787        let mut i = 0;
788
789        while i < doc_ids.len() {
790            let block_end = (i + PEF_BLOCK_SIZE).min(doc_ids.len());
791            let block_doc_ids = &doc_ids[i..block_end];
792            let block_tfs = &term_freqs[i..block_end];
793
794            let block_max_tf = *block_tfs.iter().max().unwrap_or(&1);
795            let block_score = Self::compute_bm25_upper_bound(block_max_tf, idf);
796            max_score = max_score.max(block_score);
797
798            // Find partition info for this block start
799            let (partition_idx, local_start) =
800                Self::find_partition_position(&pef_doc_ids, i as u32);
801
802            blocks.push(PEFBlockInfo {
803                first_doc_id: block_doc_ids[0],
804                last_doc_id: *block_doc_ids.last().unwrap(),
805                max_tf: block_max_tf,
806                max_block_score: block_score,
807                partition_idx: partition_idx as u16,
808                local_start,
809                num_docs: (block_end - i) as u16,
810            });
811
812            i = block_end;
813        }
814
815        Self {
816            doc_ids: pef_doc_ids,
817            term_freqs: packed_tfs,
818            tf_bits,
819            max_tf,
820            blocks,
821            max_score,
822        }
823    }
824
825    fn find_partition_position(pef: &PartitionedEliasFano, global_pos: u32) -> (usize, u32) {
826        let partition_idx = pef
827            .cumulative_counts
828            .binary_search(&(global_pos + 1))
829            .unwrap_or_else(|x| x);
830
831        let local_pos = if partition_idx == 0 {
832            global_pos
833        } else {
834            global_pos - pef.cumulative_counts[partition_idx - 1]
835        };
836
837        (partition_idx, local_pos)
838    }
839
840    /// Get term frequency at position
841    pub fn get_tf(&self, pos: u32) -> u32 {
842        if self.tf_bits == 0 || pos >= self.doc_ids.len() {
843            return 1;
844        }
845
846        let bit_pos = (pos as usize) * (self.tf_bits as usize);
847        let byte_idx = bit_pos / 8;
848        let bit_offset = bit_pos % 8;
849        let mask = (1u32 << self.tf_bits) - 1;
850
851        let mut val = (self.term_freqs[byte_idx] >> bit_offset) as u32;
852        if bit_offset + (self.tf_bits as usize) > 8 && byte_idx + 1 < self.term_freqs.len() {
853            val |= (self.term_freqs[byte_idx + 1] as u32) << (8 - bit_offset);
854        }
855        if bit_offset + (self.tf_bits as usize) > 16 && byte_idx + 2 < self.term_freqs.len() {
856            val |= (self.term_freqs[byte_idx + 2] as u32) << (16 - bit_offset);
857        }
858
859        (val & mask) + 1
860    }
861
    /// Number of documents (postings) in the list
    pub fn len(&self) -> u32 {
        self.doc_ids.len()
    }
866
    /// Returns `true` when the list holds no postings
    pub fn is_empty(&self) -> bool {
        self.doc_ids.is_empty()
    }
871
872    /// Serialize
873    pub fn serialize<W: Write>(&self, writer: &mut W) -> io::Result<()> {
874        self.doc_ids.serialize(writer)?;
875        writer.write_u8(self.tf_bits)?;
876        writer.write_u32::<LittleEndian>(self.max_tf)?;
877        writer.write_f32::<LittleEndian>(self.max_score)?;
878        writer.write_u32::<LittleEndian>(self.term_freqs.len() as u32)?;
879        writer.write_all(&self.term_freqs)?;
880
881        writer.write_u32::<LittleEndian>(self.blocks.len() as u32)?;
882        for block in &self.blocks {
883            block.serialize(writer)?;
884        }
885
886        Ok(())
887    }
888
889    /// Deserialize
890    pub fn deserialize<R: Read>(reader: &mut R) -> io::Result<Self> {
891        let doc_ids = PartitionedEliasFano::deserialize(reader)?;
892        let tf_bits = reader.read_u8()?;
893        let max_tf = reader.read_u32::<LittleEndian>()?;
894        let max_score = reader.read_f32::<LittleEndian>()?;
895        let tf_len = reader.read_u32::<LittleEndian>()? as usize;
896        let mut term_freqs = vec![0u8; tf_len];
897        reader.read_exact(&mut term_freqs)?;
898
899        let num_blocks = reader.read_u32::<LittleEndian>()? as usize;
900        let mut blocks = Vec::with_capacity(num_blocks);
901        for _ in 0..num_blocks {
902            blocks.push(PEFBlockInfo::deserialize(reader)?);
903        }
904
905        Ok(Self {
906            doc_ids,
907            term_freqs,
908            tf_bits,
909            max_tf,
910            blocks,
911            max_score,
912        })
913    }
914
    /// Create an iterator positioned on the first posting (position 0, block 0)
    pub fn iterator(&self) -> PartitionedEFPostingIterator<'_> {
        PartitionedEFPostingIterator {
            list: self,
            pos: 0,
            current_block: 0,
        }
    }
923
924    /// Get compression ratio compared to standard EF
925    pub fn compression_info(&self) -> (usize, usize) {
926        let pef_size = self.doc_ids.size_bytes();
927        // Estimate standard EF size
928        let n = self.len() as usize;
929        let max_val = if let Some(last_block) = self.blocks.last() {
930            last_block.last_doc_id
931        } else {
932            0
933        };
934        let ef_size = if n > 0 {
935            let l = if n <= 1 {
936                0
937            } else {
938                let ratio = (max_val as usize + 1) / n;
939                if ratio <= 1 {
940                    0
941                } else {
942                    (usize::BITS - ratio.leading_zeros()) as usize
943                }
944            };
945            (n * l + 2 * n).div_ceil(8) + 16
946        } else {
947            0
948        };
949        (pef_size, ef_size)
950    }
951}
952
953/// Iterator over Partitioned EF posting list
954pub struct PartitionedEFPostingIterator<'a> {
955    list: &'a PartitionedEFPostingList,
956    pos: u32,
957    current_block: usize,
958}
959
960impl<'a> PartitionedEFPostingIterator<'a> {
961    /// Current document ID
962    pub fn doc(&self) -> u32 {
963        self.list.doc_ids.get(self.pos).unwrap_or(u32::MAX)
964    }
965
966    /// Current term frequency
967    pub fn term_freq(&self) -> u32 {
968        self.list.get_tf(self.pos)
969    }
970
971    /// Advance to next document
972    pub fn advance(&mut self) -> u32 {
973        self.pos += 1;
974        if !self.list.blocks.is_empty() {
975            let new_block = (self.pos as usize) / PEF_BLOCK_SIZE;
976            if new_block < self.list.blocks.len() {
977                self.current_block = new_block;
978            }
979        }
980        self.doc()
981    }
982
983    /// Seek to first doc >= target
984    pub fn seek(&mut self, target: u32) -> u32 {
985        // Use block skip list
986        if !self.list.blocks.is_empty() {
987            let block_idx = self.list.blocks[self.current_block..].binary_search_by(|b| {
988                if b.last_doc_id < target {
989                    std::cmp::Ordering::Less
990                } else if b.first_doc_id > target {
991                    std::cmp::Ordering::Greater
992                } else {
993                    std::cmp::Ordering::Equal
994                }
995            });
996
997            let target_block = match block_idx {
998                Ok(idx) => self.current_block + idx,
999                Err(idx) => {
1000                    let abs_idx = self.current_block + idx;
1001                    if abs_idx >= self.list.blocks.len() {
1002                        self.pos = self.list.len();
1003                        return u32::MAX;
1004                    }
1005                    abs_idx
1006                }
1007            };
1008
1009            if target_block > self.current_block {
1010                self.current_block = target_block;
1011                self.pos = (target_block * PEF_BLOCK_SIZE) as u32;
1012            }
1013        }
1014
1015        // Use PEF's next_geq
1016        if let Some((pos, val)) = self.list.doc_ids.next_geq(target)
1017            && pos >= self.pos
1018        {
1019            self.pos = pos;
1020            if !self.list.blocks.is_empty() {
1021                self.current_block = (pos as usize) / PEF_BLOCK_SIZE;
1022            }
1023            return val;
1024        }
1025
1026        // Linear scan fallback
1027        while self.pos < self.list.len() {
1028            let doc = self.doc();
1029            if doc >= target {
1030                return doc;
1031            }
1032            self.pos += 1;
1033        }
1034
1035        u32::MAX
1036    }
1037
1038    /// Check if exhausted
1039    pub fn is_exhausted(&self) -> bool {
1040        self.pos >= self.list.len()
1041    }
1042
1043    /// Current block's max score
1044    pub fn current_block_max_score(&self) -> f32 {
1045        if self.is_exhausted() || self.list.blocks.is_empty() {
1046            return 0.0;
1047        }
1048        if self.current_block < self.list.blocks.len() {
1049            self.list.blocks[self.current_block].max_block_score
1050        } else {
1051            0.0
1052        }
1053    }
1054
1055    /// Current block's max TF
1056    pub fn current_block_max_tf(&self) -> u32 {
1057        if self.is_exhausted() || self.list.blocks.is_empty() {
1058            return 0;
1059        }
1060        if self.current_block < self.list.blocks.len() {
1061            self.list.blocks[self.current_block].max_tf
1062        } else {
1063            0
1064        }
1065    }
1066
1067    /// Max score for remaining blocks
1068    pub fn max_remaining_score(&self) -> f32 {
1069        if self.is_exhausted() || self.list.blocks.is_empty() {
1070            return 0.0;
1071        }
1072        self.list.blocks[self.current_block..]
1073            .iter()
1074            .map(|b| b.max_block_score)
1075            .fold(0.0f32, |a, b| a.max(b))
1076    }
1077
1078    /// Skip to next block containing doc >= target (for BlockWAND)
1079    /// Returns (first_doc_in_block, block_max_score) or None if exhausted
1080    pub fn skip_to_block_with_doc(&mut self, target: u32) -> Option<(u32, f32)> {
1081        if self.list.blocks.is_empty() {
1082            return None;
1083        }
1084
1085        while self.current_block < self.list.blocks.len() {
1086            let block = &self.list.blocks[self.current_block];
1087            if block.last_doc_id >= target {
1088                self.pos = (self.current_block * PEF_BLOCK_SIZE) as u32;
1089                return Some((block.first_doc_id, block.max_block_score));
1090            }
1091            self.current_block += 1;
1092        }
1093
1094        self.pos = self.list.len();
1095        None
1096    }
1097}
1098
#[cfg(test)]
mod tests {
    use super::*;

    /// A small partition reports its length and bounds and returns every
    /// value by index.
    #[test]
    fn test_ef_partition_basic() {
        let values = [10u32, 20, 30, 40, 50];
        let partition = EFPartition::from_sorted_slice(&values);

        assert_eq!(partition.len, 5);
        assert_eq!(partition.first_value, 10);
        assert_eq!(partition.last_value, 50);

        for (idx, expected) in values.iter().copied().enumerate() {
            assert_eq!(partition.get(idx as u32), Some(expected));
        }
    }

    /// `next_geq` on a single partition: below range, exact hits, between
    /// values, and past the end.
    #[test]
    fn test_ef_partition_next_geq() {
        let values = [10u32, 20, 30, 100, 200, 300];
        let partition = EFPartition::from_sorted_slice(&values);

        let cases = [
            (5, Some((0, 10))),
            (10, Some((0, 10))),
            (15, Some((1, 20))),
            (100, Some((3, 100))),
            (301, None),
        ];
        for &(target, expected) in &cases {
            assert_eq!(partition.next_geq(target), expected);
        }
    }

    /// Random access over the partitioned structure returns every input value.
    #[test]
    fn test_partitioned_ef_basic() {
        let values: Vec<u32> = (0..500).map(|i| i * 2).collect();
        let pef = PartitionedEliasFano::from_sorted_slice(&values);

        assert_eq!(pef.len(), 500);
        assert!(pef.num_partitions() >= 1);

        for (i, expected) in values.iter().copied().enumerate() {
            assert_eq!(pef.get(i as u32), Some(expected), "Mismatch at {}", i);
        }
    }

    /// `next_geq` on the partitioned structure, including a target beyond the
    /// last value.
    #[test]
    fn test_partitioned_ef_next_geq() {
        let values: Vec<u32> = (0..1000).map(|i| i * 3).collect();
        let pef = PartitionedEliasFano::from_sorted_slice(&values);

        let cases = [
            (0, Some((0, 0))),
            // 100 / 3 = 33.33, so the next multiple of 3 is 34 * 3 = 102.
            (100, Some((34, 102))),
            (1500, Some((500, 1500))),
            (3000, None),
        ];
        for &(target, expected) in &cases {
            assert_eq!(pef.next_geq(target), expected);
        }
    }

    /// A serialize/deserialize round trip preserves length, partition count
    /// and every element.
    #[test]
    fn test_partitioned_ef_serialization() {
        let values: Vec<u32> = (0..500).map(|i| i * 5).collect();
        let pef = PartitionedEliasFano::from_sorted_slice(&values);

        let mut buffer = Vec::new();
        pef.serialize(&mut buffer).unwrap();

        let mut cursor: &[u8] = &buffer;
        let restored = PartitionedEliasFano::deserialize(&mut cursor).unwrap();

        assert_eq!(restored.len(), pef.len());
        assert_eq!(restored.num_partitions(), pef.num_partitions());

        for i in 0..pef.len() {
            assert_eq!(restored.get(i), pef.get(i));
        }
    }

    /// Walking a posting list with `advance` yields every (doc, tf) pair in
    /// order.
    #[test]
    fn test_partitioned_ef_posting_list() {
        let doc_ids: Vec<u32> = (0..300).map(|i| i * 2).collect();
        let term_freqs: Vec<u32> = (0..300).map(|i| (i % 10) + 1).collect();

        let list = PartitionedEFPostingList::from_postings(&doc_ids, &term_freqs);
        assert_eq!(list.len(), 300);

        let mut iter = list.iterator();
        let expected_postings = doc_ids.iter().copied().zip(term_freqs.iter().copied());
        for (i, (expected_doc, expected_tf)) in expected_postings.enumerate() {
            assert_eq!(iter.doc(), expected_doc, "Doc mismatch at {}", i);
            assert_eq!(iter.term_freq(), expected_tf, "TF mismatch at {}", i);
            iter.advance();
        }
    }

    /// `seek` lands on the first doc at or after the target and reports
    /// `u32::MAX` past the end.
    #[test]
    fn test_partitioned_ef_seek() {
        let doc_ids: Vec<u32> = (0..500).map(|i| i * 3).collect();
        let term_freqs = vec![1u32; 500];

        let list = PartitionedEFPostingList::from_postings(&doc_ids, &term_freqs);
        let mut iter = list.iterator();

        // 100 / 3 = 33.33, so the next multiple of 3 is 34 * 3 = 102.
        assert_eq!(iter.seek(100), 102);
        assert_eq!(iter.seek(300), 300);
        assert_eq!(iter.seek(1500), u32::MAX);
    }

    /// Mixed-density input is split into more than one partition and still
    /// decodes correctly.
    #[test]
    fn test_compression_improvement() {
        // Dense region (step 2) followed by a sparse region (step 100).
        let dense = (0..5000).map(|i| i * 2);
        let sparse = (5000..10000).map(|i| 10000 + (i - 5000) * 100);
        let values: Vec<u32> = dense.chain(sparse).collect();

        let pef = PartitionedEliasFano::from_sorted_slice(&values);

        // PEF should use multiple partitions for this mixed-density data.
        assert!(
            pef.num_partitions() > 1,
            "Expected multiple partitions, got {}",
            pef.num_partitions()
        );

        // Verify correctness.
        for (i, expected) in values.iter().copied().enumerate() {
            assert_eq!(pef.get(i as u32), Some(expected));
        }
    }

    /// Block-max metadata and the iterator methods built on it.
    #[test]
    fn test_partitioned_ef_block_max() {
        // A posting list large enough to span several blocks.
        let doc_ids: Vec<u32> = (0..500).map(|i| i * 2).collect();
        // One term frequency per 128-posting block so each block has a
        // distinct max_tf: 1, 5, 10, 3.
        let term_freqs: Vec<u32> = (0..500)
            .map(|i| match i {
                0..=127 => 1,
                128..=255 => 5,
                256..=383 => 10,
                _ => 3,
            })
            .collect();

        let list = PartitionedEFPostingList::from_postings_with_idf(&doc_ids, &term_freqs, 2.0);

        // Should have 4 blocks (500 docs / 128 per block).
        assert_eq!(list.blocks.len(), 4);
        let block_max_tfs: Vec<u32> = list.blocks.iter().map(|b| b.max_tf).collect();
        assert_eq!(block_max_tfs[0], 1);
        assert_eq!(block_max_tfs[1], 5);
        assert_eq!(block_max_tfs[2], 10);
        assert_eq!(block_max_tfs[3], 3);

        // Block 2 should have the highest score (max_tf = 10).
        let top_score = list.blocks[2].max_block_score;
        assert!(top_score > list.blocks[0].max_block_score);
        assert!(top_score > list.blocks[1].max_block_score);
        assert!(top_score > list.blocks[3].max_block_score);

        // Global max_score should equal block 2's score.
        assert_eq!(list.max_score, list.blocks[2].max_block_score);

        // Iterator block-max methods.
        let mut iter = list.iterator();
        assert_eq!(iter.current_block_max_tf(), 1); // Block 0

        iter.seek(256); // first doc in block 1
        assert_eq!(iter.current_block_max_tf(), 5);

        iter.seek(512); // first doc in block 2
        assert_eq!(iter.current_block_max_tf(), 10);

        // skip_to_block_with_doc
        let mut iter2 = list.iterator();
        let result = iter2.skip_to_block_with_doc(300);
        assert!(result.is_some());
        let (first_doc, score) = result.unwrap();
        assert!(first_doc <= 300);
        assert!(score > 0.0);

        // max_remaining_score
        let mut iter3 = list.iterator();
        let max_score = iter3.max_remaining_score();
        assert_eq!(max_score, list.max_score);

        // After seeking past block 2, the remaining maximum should be lower.
        iter3.seek(768); // Block 3
        let remaining = iter3.max_remaining_score();
        assert!(remaining < max_score);
    }
}