Skip to main content

hermes_core/structures/postings/
posting.rs

1//! Posting list implementation with compact representation
2//!
3//! Text blocks use SIMD-friendly packed bit-width encoding:
4//! - Doc IDs: delta-encoded, packed at rounded bit width (0/8/16/32)
5//! - Term frequencies: packed at rounded bit width
6//! - Same SIMD primitives as sparse blocks (`simd::pack_rounded` / `unpack_rounded`)
7
8use byteorder::{LittleEndian, WriteBytesExt};
9use std::io::{self, Read, Write};
10
11use super::posting_common::{read_vint, write_vint};
12use crate::DocId;
13use crate::directories::OwnedBytes;
14use crate::structures::simd;
15
16/// A posting entry containing doc_id and term frequency
17#[derive(Debug, Clone, Copy, PartialEq, Eq)]
18pub struct Posting {
19    pub doc_id: DocId,
20    pub term_freq: u32,
21}
22
23/// Compact posting list with delta encoding
24#[derive(Debug, Clone, Default)]
25pub struct PostingList {
26    postings: Vec<Posting>,
27}
28
29impl PostingList {
30    pub fn new() -> Self {
31        Self::default()
32    }
33
34    pub fn with_capacity(capacity: usize) -> Self {
35        Self {
36            postings: Vec::with_capacity(capacity),
37        }
38    }
39
40    /// Add a posting (must be added in doc_id order)
41    pub fn push(&mut self, doc_id: DocId, term_freq: u32) {
42        debug_assert!(
43            self.postings.is_empty() || self.postings.last().unwrap().doc_id < doc_id,
44            "Postings must be added in sorted order"
45        );
46        self.postings.push(Posting { doc_id, term_freq });
47    }
48
49    /// Add a posting, incrementing term_freq if doc already exists
50    pub fn add(&mut self, doc_id: DocId, term_freq: u32) {
51        if let Some(last) = self.postings.last_mut()
52            && last.doc_id == doc_id
53        {
54            last.term_freq += term_freq;
55            return;
56        }
57        self.postings.push(Posting { doc_id, term_freq });
58    }
59
60    /// Get document count
61    pub fn doc_count(&self) -> u32 {
62        self.postings.len() as u32
63    }
64
65    pub fn len(&self) -> usize {
66        self.postings.len()
67    }
68
69    pub fn is_empty(&self) -> bool {
70        self.postings.is_empty()
71    }
72
73    pub fn iter(&self) -> impl Iterator<Item = &Posting> {
74        self.postings.iter()
75    }
76
77    /// Serialize to bytes using delta encoding and varint
78    pub fn serialize<W: Write>(&self, writer: &mut W) -> io::Result<()> {
79        // Write number of postings
80        write_vint(writer, self.postings.len() as u64)?;
81
82        let mut prev_doc_id = 0u32;
83        for posting in &self.postings {
84            // Delta encode doc_id
85            let delta = posting.doc_id - prev_doc_id;
86            write_vint(writer, delta as u64)?;
87            write_vint(writer, posting.term_freq as u64)?;
88            prev_doc_id = posting.doc_id;
89        }
90
91        Ok(())
92    }
93
94    /// Deserialize from bytes
95    pub fn deserialize<R: Read>(reader: &mut R) -> io::Result<Self> {
96        let count = read_vint(reader)? as usize;
97        let mut postings = Vec::with_capacity(count);
98
99        let mut prev_doc_id = 0u32;
100        for _ in 0..count {
101            let delta = read_vint(reader)? as u32;
102            let term_freq = read_vint(reader)? as u32;
103            let doc_id = prev_doc_id + delta;
104            postings.push(Posting { doc_id, term_freq });
105            prev_doc_id = doc_id;
106        }
107
108        Ok(Self { postings })
109    }
110}
111
112/// Iterator over posting list that supports seeking
113pub struct PostingListIterator<'a> {
114    postings: &'a [Posting],
115    position: usize,
116}
117
118impl<'a> PostingListIterator<'a> {
119    pub fn new(posting_list: &'a PostingList) -> Self {
120        Self {
121            postings: &posting_list.postings,
122            position: 0,
123        }
124    }
125
126    /// Current document ID, or TERMINATED if exhausted
127    pub fn doc(&self) -> DocId {
128        if self.position < self.postings.len() {
129            self.postings[self.position].doc_id
130        } else {
131            TERMINATED
132        }
133    }
134
135    /// Current term frequency
136    pub fn term_freq(&self) -> u32 {
137        if self.position < self.postings.len() {
138            self.postings[self.position].term_freq
139        } else {
140            0
141        }
142    }
143
144    /// Advance to next posting, returns new doc_id or TERMINATED
145    pub fn advance(&mut self) -> DocId {
146        self.position += 1;
147        self.doc()
148    }
149
150    /// Seek to first doc_id >= target (binary search on remaining postings)
151    pub fn seek(&mut self, target: DocId) -> DocId {
152        let remaining = &self.postings[self.position..];
153        let offset = remaining.partition_point(|p| p.doc_id < target);
154        self.position += offset;
155        self.doc()
156    }
157
158    /// Size hint for remaining elements
159    pub fn size_hint(&self) -> usize {
160        self.postings.len().saturating_sub(self.position)
161    }
162}
163
164/// Sentinel value indicating iterator is exhausted
165pub const TERMINATED: DocId = DocId::MAX;
166
167/// Block-based posting list with 2-level skip index.
168///
169/// Each block contains up to `BLOCK_SIZE` postings encoded as packed bit-width arrays.
170/// Skip entries use a compact 2-level structure for cache-friendly seeking:
171/// - **Level-0** (16 bytes/block): `first_doc`, `last_doc`, `offset`, `max_weight`
172/// - **Level-1** (4 bytes/group): `last_doc` per `L1_INTERVAL` blocks
173///
174/// Seek algorithm: binary search L1, then linear scan ≤`L1_INTERVAL` L0 entries.
175pub const BLOCK_SIZE: usize = 128;
176
177/// Number of L0 blocks per L1 skip entry.
178const L1_INTERVAL: usize = 8;
179
180/// Compact level-0 skip entry — 16 bytes.
181/// `length` is omitted: computable from the block's 8-byte header.
182const L0_SIZE: usize = 16;
183
184/// Level-1 skip entry — 4 bytes (just `last_doc`).
185const L1_SIZE: usize = 4;
186
187/// Footer: stream_len(8) + l0_count(4) + l1_count(4) + doc_count(4) + max_tf(4) = 24 bytes.
188const FOOTER_SIZE: usize = 24;
189
190/// Read a compact L0 entry from raw bytes at the given index.
191///
192/// Uses a single bounds check (`[..L0_SIZE]`) instead of 4× `try_into().unwrap()`.
193#[inline]
194fn read_l0(bytes: &[u8], idx: usize) -> (u32, u32, u32, f32) {
195    let b = &bytes[idx * L0_SIZE..][..L0_SIZE];
196    let first_doc = u32::from_le_bytes([b[0], b[1], b[2], b[3]]);
197    let last_doc = u32::from_le_bytes([b[4], b[5], b[6], b[7]]);
198    let offset = u32::from_le_bytes([b[8], b[9], b[10], b[11]]);
199    let max_weight = f32::from_le_bytes([b[12], b[13], b[14], b[15]]);
200    (first_doc, last_doc, offset, max_weight)
201}
202
203/// Write a compact L0 entry.
204#[inline]
205fn write_l0(buf: &mut Vec<u8>, first_doc: u32, last_doc: u32, offset: u32, max_weight: f32) {
206    buf.extend_from_slice(&first_doc.to_le_bytes());
207    buf.extend_from_slice(&last_doc.to_le_bytes());
208    buf.extend_from_slice(&offset.to_le_bytes());
209    buf.extend_from_slice(&max_weight.to_le_bytes());
210}
211
212/// Compute block data size from the 8-byte header at `stream[pos..]`.
213///
214/// Header: `[count: u16][first_doc: u32][doc_id_bits: u8][tf_bits: u8]`
215/// Data size = 8 + (count-1) × bytes_per_value(doc_id_bits) + count × bytes_per_value(tf_bits)
216#[inline]
217fn block_data_size(stream: &[u8], pos: usize) -> usize {
218    let count = u16::from_le_bytes(stream[pos..pos + 2].try_into().unwrap()) as usize;
219    let doc_rounded = simd::RoundedBitWidth::from_u8(stream[pos + 6]);
220    let tf_rounded = simd::RoundedBitWidth::from_u8(stream[pos + 7]);
221    let delta_bytes = if count > 1 {
222        (count - 1) * doc_rounded.bytes_per_value()
223    } else {
224        0
225    };
226    8 + delta_bytes + count * tf_rounded.bytes_per_value()
227}
228
229#[derive(Debug, Clone)]
230pub struct BlockPostingList {
231    /// Block data stream (packed blocks laid out sequentially).
232    stream: OwnedBytes,
233    /// Level-0 skip entries: `(first_doc, last_doc, offset, max_weight)` × `l0_count`.
234    /// 16 bytes per entry. Supports O(1) random access by block index.
235    l0_bytes: OwnedBytes,
236    /// Number of blocks (= number of L0 entries).
237    l0_count: usize,
238    /// Level-1 skip `last_doc` values — one per `L1_INTERVAL` blocks.
239    /// Stored as `Vec<u32>` for direct SIMD-accelerated `find_first_ge_u32`.
240    l1_docs: Vec<u32>,
241    /// Total posting count.
242    doc_count: u32,
243    /// Max TF across all blocks.
244    max_tf: u32,
245}
246
247impl BlockPostingList {
248    /// Read L0 entry by block index. Returns `(first_doc, last_doc, offset, max_weight)`.
249    #[inline]
250    fn read_l0_entry(&self, idx: usize) -> (u32, u32, u32, f32) {
251        read_l0(&self.l0_bytes, idx)
252    }
253
254    /// Build from a posting list.
255    ///
256    /// Block format (8-byte header + packed arrays):
257    /// ```text
258    /// [count: u16][first_doc: u32][doc_id_bits: u8][tf_bits: u8]
259    /// [packed doc_id deltas: (count-1) × bytes_per_value(doc_id_bits)]
260    /// [packed tfs: count × bytes_per_value(tf_bits)]
261    /// ```
262    pub fn from_posting_list(list: &PostingList) -> io::Result<Self> {
263        let mut stream: Vec<u8> = Vec::new();
264        let mut l0_buf: Vec<u8> = Vec::new();
265        let mut l1_docs: Vec<u32> = Vec::new();
266        let mut l0_count = 0usize;
267        let mut max_tf = 0u32;
268
269        let postings = &list.postings;
270        let mut i = 0;
271
272        // Temp buffers reused across blocks
273        let mut deltas = Vec::with_capacity(BLOCK_SIZE);
274        let mut tf_buf = Vec::with_capacity(BLOCK_SIZE);
275
276        while i < postings.len() {
277            if stream.len() > u32::MAX as usize {
278                return Err(io::Error::new(
279                    io::ErrorKind::InvalidData,
280                    "posting list stream exceeds u32::MAX bytes",
281                ));
282            }
283            let block_start = stream.len() as u32;
284            let block_end = (i + BLOCK_SIZE).min(postings.len());
285            let block = &postings[i..block_end];
286            let count = block.len();
287
288            // Compute block's max term frequency for block-max pruning
289            let block_max_tf = block.iter().map(|p| p.term_freq).max().unwrap_or(0);
290            max_tf = max_tf.max(block_max_tf);
291
292            let base_doc_id = block.first().unwrap().doc_id;
293            let last_doc_id = block.last().unwrap().doc_id;
294
295            // Delta-encode doc IDs (skip first — stored in header)
296            deltas.clear();
297            let mut prev = base_doc_id;
298            for posting in block.iter().skip(1) {
299                deltas.push(posting.doc_id - prev);
300                prev = posting.doc_id;
301            }
302            let max_delta = deltas.iter().copied().max().unwrap_or(0);
303            let doc_id_bits = simd::round_bit_width(simd::bits_needed(max_delta));
304
305            // Collect TFs
306            tf_buf.clear();
307            tf_buf.extend(block.iter().map(|p| p.term_freq));
308            let tf_bits = simd::round_bit_width(simd::bits_needed(block_max_tf));
309
310            // Write 8-byte header: [count: u16][first_doc: u32][doc_id_bits: u8][tf_bits: u8]
311            stream.write_u16::<LittleEndian>(count as u16)?;
312            stream.write_u32::<LittleEndian>(base_doc_id)?;
313            stream.push(doc_id_bits);
314            stream.push(tf_bits);
315
316            // Write packed doc_id deltas ((count-1) values)
317            if count > 1 {
318                let rounded = simd::RoundedBitWidth::from_u8(doc_id_bits);
319                let byte_count = (count - 1) * rounded.bytes_per_value();
320                let start = stream.len();
321                stream.resize(start + byte_count, 0);
322                simd::pack_rounded(&deltas, rounded, &mut stream[start..]);
323            }
324
325            // Write packed TFs (count values)
326            {
327                let rounded = simd::RoundedBitWidth::from_u8(tf_bits);
328                let byte_count = count * rounded.bytes_per_value();
329                let start = stream.len();
330                stream.resize(start + byte_count, 0);
331                simd::pack_rounded(&tf_buf, rounded, &mut stream[start..]);
332            }
333
334            // L0 skip entry
335            write_l0(
336                &mut l0_buf,
337                base_doc_id,
338                last_doc_id,
339                block_start,
340                block_max_tf as f32,
341            );
342            l0_count += 1;
343
344            // L1 entry at the end of each L1_INTERVAL group
345            if l0_count.is_multiple_of(L1_INTERVAL) {
346                l1_docs.push(last_doc_id);
347            }
348
349            i = block_end;
350        }
351
352        // Final L1 entry for partial group
353        if !l0_count.is_multiple_of(L1_INTERVAL) && l0_count > 0 {
354            let (_, last_doc, _, _) = read_l0(&l0_buf, l0_count - 1);
355            l1_docs.push(last_doc);
356        }
357
358        Ok(Self {
359            stream: OwnedBytes::new(stream),
360            l0_bytes: OwnedBytes::new(l0_buf),
361            l0_count,
362            l1_docs,
363            doc_count: postings.len() as u32,
364            max_tf,
365        })
366    }
367
368    /// Serialize the block posting list (footer-based: stream first).
369    ///
370    /// Format:
371    /// ```text
372    /// [stream: block data]
373    /// [L0 entries: l0_count × 16 bytes (first_doc, last_doc, offset, max_weight)]
374    /// [L1 entries: l1_count × 4 bytes (last_doc)]
375    /// [footer: stream_len(8) + l0_count(4) + l1_count(4) + doc_count(4) + max_tf(4) = 24 bytes]
376    /// ```
377    pub fn serialize<W: Write>(&self, writer: &mut W) -> io::Result<()> {
378        writer.write_all(&self.stream)?;
379        writer.write_all(&self.l0_bytes)?;
380        for &doc in &self.l1_docs {
381            writer.write_u32::<LittleEndian>(doc)?;
382        }
383
384        // Footer (24 bytes)
385        writer.write_u64::<LittleEndian>(self.stream.len() as u64)?;
386        writer.write_u32::<LittleEndian>(self.l0_count as u32)?;
387        writer.write_u32::<LittleEndian>(self.l1_docs.len() as u32)?;
388        writer.write_u32::<LittleEndian>(self.doc_count)?;
389        writer.write_u32::<LittleEndian>(self.max_tf)?;
390
391        Ok(())
392    }
393
394    /// Deserialize from a byte slice (footer-based format).
395    pub fn deserialize(raw: &[u8]) -> io::Result<Self> {
396        if raw.len() < FOOTER_SIZE {
397            return Err(io::Error::new(
398                io::ErrorKind::InvalidData,
399                "posting data too short",
400            ));
401        }
402
403        let f = raw.len() - FOOTER_SIZE;
404        let stream_len = u64::from_le_bytes(raw[f..f + 8].try_into().unwrap()) as usize;
405        let l0_count = u32::from_le_bytes(raw[f + 8..f + 12].try_into().unwrap()) as usize;
406        let l1_count = u32::from_le_bytes(raw[f + 12..f + 16].try_into().unwrap()) as usize;
407        let doc_count = u32::from_le_bytes(raw[f + 16..f + 20].try_into().unwrap());
408        let max_tf = u32::from_le_bytes(raw[f + 20..f + 24].try_into().unwrap());
409
410        let l0_start = stream_len;
411        let l0_end = l0_start + l0_count * L0_SIZE;
412        let l1_start = l0_end;
413
414        let l1_docs = Self::extract_l1_docs(&raw[l1_start..], l1_count);
415
416        Ok(Self {
417            stream: OwnedBytes::new(raw[..stream_len].to_vec()),
418            l0_bytes: OwnedBytes::new(raw[l0_start..l0_end].to_vec()),
419            l0_count,
420            l1_docs,
421            doc_count,
422            max_tf,
423        })
424    }
425
426    /// Zero-copy deserialization from OwnedBytes.
427    /// Stream and L0 are sliced from the source without copying.
428    /// L1 is extracted into a `Vec<u32>` for SIMD-friendly access (tiny: ≤ N/8 entries).
429    pub fn deserialize_zero_copy(raw: OwnedBytes) -> io::Result<Self> {
430        if raw.len() < FOOTER_SIZE {
431            return Err(io::Error::new(
432                io::ErrorKind::InvalidData,
433                "posting data too short",
434            ));
435        }
436
437        let f = raw.len() - FOOTER_SIZE;
438        let stream_len = u64::from_le_bytes(raw[f..f + 8].try_into().unwrap()) as usize;
439        let l0_count = u32::from_le_bytes(raw[f + 8..f + 12].try_into().unwrap()) as usize;
440        let l1_count = u32::from_le_bytes(raw[f + 12..f + 16].try_into().unwrap()) as usize;
441        let doc_count = u32::from_le_bytes(raw[f + 16..f + 20].try_into().unwrap());
442        let max_tf = u32::from_le_bytes(raw[f + 20..f + 24].try_into().unwrap());
443
444        let l0_start = stream_len;
445        let l0_end = l0_start + l0_count * L0_SIZE;
446        let l1_start = l0_end;
447
448        let l1_docs = Self::extract_l1_docs(&raw[l1_start..], l1_count);
449
450        Ok(Self {
451            stream: raw.slice(0..stream_len),
452            l0_bytes: raw.slice(l0_start..l0_end),
453            l0_count,
454            l1_docs,
455            doc_count,
456            max_tf,
457        })
458    }
459
460    /// Extract L1 last_doc values from raw LE bytes into a Vec<u32>.
461    fn extract_l1_docs(bytes: &[u8], count: usize) -> Vec<u32> {
462        let mut docs = Vec::with_capacity(count);
463        for i in 0..count {
464            let p = i * L1_SIZE;
465            docs.push(u32::from_le_bytes(bytes[p..p + 4].try_into().unwrap()));
466        }
467        docs
468    }
469
470    pub fn doc_count(&self) -> u32 {
471        self.doc_count
472    }
473
474    /// Get maximum term frequency (for MaxScore upper bound computation)
475    pub fn max_tf(&self) -> u32 {
476        self.max_tf
477    }
478
479    /// Get number of blocks
480    pub fn num_blocks(&self) -> usize {
481        self.l0_count
482    }
483
484    /// Get block's max term frequency for block-max pruning
485    pub fn block_max_tf(&self, block_idx: usize) -> Option<u32> {
486        if block_idx >= self.l0_count {
487            return None;
488        }
489        let (_, _, _, max_weight) = self.read_l0_entry(block_idx);
490        Some(max_weight as u32)
491    }
492
493    /// Concatenate blocks from multiple posting lists with doc_id remapping.
494    /// This is O(num_blocks) instead of O(num_postings).
495    pub fn concatenate_blocks(sources: &[(BlockPostingList, u32)]) -> io::Result<Self> {
496        let mut stream: Vec<u8> = Vec::new();
497        let mut l0_buf: Vec<u8> = Vec::new();
498        let mut l1_docs: Vec<u32> = Vec::new();
499        let mut l0_count = 0usize;
500        let mut total_docs = 0u32;
501        let mut max_tf = 0u32;
502
503        for (source, doc_offset) in sources {
504            max_tf = max_tf.max(source.max_tf);
505            for block_idx in 0..source.num_blocks() {
506                let (first_doc, last_doc, offset, max_weight) = source.read_l0_entry(block_idx);
507                let blk_size = block_data_size(&source.stream, offset as usize);
508                let block_bytes = &source.stream[offset as usize..offset as usize + blk_size];
509
510                let count = u16::from_le_bytes(block_bytes[0..2].try_into().unwrap());
511                if stream.len() > u32::MAX as usize {
512                    return Err(io::Error::new(
513                        io::ErrorKind::InvalidData,
514                        "posting list stream exceeds u32::MAX bytes during concatenation",
515                    ));
516                }
517                let new_offset = stream.len() as u32;
518
519                // Write patched header + copy packed arrays verbatim
520                stream.write_u16::<LittleEndian>(count)?;
521                stream.write_u32::<LittleEndian>(first_doc + doc_offset)?;
522                stream.extend_from_slice(&block_bytes[6..]);
523
524                let new_last = last_doc + doc_offset;
525                write_l0(
526                    &mut l0_buf,
527                    first_doc + doc_offset,
528                    new_last,
529                    new_offset,
530                    max_weight,
531                );
532                l0_count += 1;
533                total_docs += count as u32;
534
535                if l0_count.is_multiple_of(L1_INTERVAL) {
536                    l1_docs.push(new_last);
537                }
538            }
539        }
540
541        // Final L1 entry for partial group
542        if !l0_count.is_multiple_of(L1_INTERVAL) && l0_count > 0 {
543            let (_, last_doc, _, _) = read_l0(&l0_buf, l0_count - 1);
544            l1_docs.push(last_doc);
545        }
546
547        Ok(Self {
548            stream: OwnedBytes::new(stream),
549            l0_bytes: OwnedBytes::new(l0_buf),
550            l0_count,
551            l1_docs,
552            doc_count: total_docs,
553            max_tf,
554        })
555    }
556
557    /// Streaming merge: write blocks directly to output writer (bounded memory).
558    ///
559    /// **Zero-materializing**: reads L0 entries directly from source bytes
560    /// (mmap or &[u8]) without parsing into Vecs. Block sizes computed from
561    /// the 8-byte header (deterministic with packed encoding).
562    ///
563    /// Output L0 + L1 are buffered (bounded O(total_blocks × 16 + total_blocks/8 × 4)).
564    /// Block data flows source → output writer without intermediate buffering.
565    ///
566    /// Returns `(doc_count, bytes_written)`.
567    pub fn concatenate_streaming<W: Write>(
568        sources: &[(&[u8], u32)], // (serialized_bytes, doc_offset)
569        writer: &mut W,
570    ) -> io::Result<(u32, usize)> {
571        struct SourceMeta {
572            stream_len: usize,
573            l0_count: usize,
574        }
575
576        let mut metas: Vec<SourceMeta> = Vec::with_capacity(sources.len());
577        let mut total_docs = 0u32;
578        let mut merged_max_tf = 0u32;
579
580        for (raw, _) in sources {
581            if raw.len() < FOOTER_SIZE {
582                continue;
583            }
584            let f = raw.len() - FOOTER_SIZE;
585            let stream_len = u64::from_le_bytes(raw[f..f + 8].try_into().unwrap()) as usize;
586            let l0_count = u32::from_le_bytes(raw[f + 8..f + 12].try_into().unwrap()) as usize;
587            // l1_count not needed — we rebuild L1
588            let doc_count = u32::from_le_bytes(raw[f + 16..f + 20].try_into().unwrap());
589            let max_tf = u32::from_le_bytes(raw[f + 20..f + 24].try_into().unwrap());
590            total_docs += doc_count;
591            merged_max_tf = merged_max_tf.max(max_tf);
592            metas.push(SourceMeta {
593                stream_len,
594                l0_count,
595            });
596        }
597
598        // Phase 1: Stream block data, reading L0 entries on-the-fly.
599        // Accumulate output L0 + L1 (bounded).
600        let mut out_l0: Vec<u8> = Vec::new();
601        let mut out_l1_docs: Vec<u32> = Vec::new();
602        let mut out_l0_count = 0usize;
603        let mut stream_written = 0u64;
604        let mut patch_buf = [0u8; 8];
605
606        for (src_idx, meta) in metas.iter().enumerate() {
607            let (raw, doc_offset) = &sources[src_idx];
608            let l0_base = meta.stream_len; // L0 entries start right after stream
609            let src_stream = &raw[..meta.stream_len];
610
611            for i in 0..meta.l0_count {
612                // Read source L0 entry directly from raw bytes
613                let (first_doc, last_doc, offset, max_weight) = read_l0(&raw[l0_base..], i);
614
615                // Compute block size from header
616                let blk_size = block_data_size(src_stream, offset as usize);
617                let block = &src_stream[offset as usize..offset as usize + blk_size];
618
619                // Write output L0 entry
620                let new_last = last_doc + doc_offset;
621                if stream_written > u32::MAX as u64 {
622                    return Err(io::Error::new(
623                        io::ErrorKind::InvalidData,
624                        "posting list stream exceeds u32::MAX bytes during streaming merge",
625                    ));
626                }
627                write_l0(
628                    &mut out_l0,
629                    first_doc + doc_offset,
630                    new_last,
631                    stream_written as u32,
632                    max_weight,
633                );
634                out_l0_count += 1;
635
636                // L1 entry at group boundary
637                if out_l0_count.is_multiple_of(L1_INTERVAL) {
638                    out_l1_docs.push(new_last);
639                }
640
641                // Patch 8-byte header: [count: u16][first_doc: u32][bits: 2 bytes]
642                patch_buf.copy_from_slice(&block[0..8]);
643                let blk_first = u32::from_le_bytes(patch_buf[2..6].try_into().unwrap());
644                patch_buf[2..6].copy_from_slice(&(blk_first + doc_offset).to_le_bytes());
645                writer.write_all(&patch_buf)?;
646                writer.write_all(&block[8..])?;
647
648                stream_written += blk_size as u64;
649            }
650        }
651
652        // Final L1 entry for partial group
653        if !out_l0_count.is_multiple_of(L1_INTERVAL) && out_l0_count > 0 {
654            let (_, last_doc, _, _) = read_l0(&out_l0, out_l0_count - 1);
655            out_l1_docs.push(last_doc);
656        }
657
658        // Phase 2: Write L0 + L1 + footer
659        writer.write_all(&out_l0)?;
660        for &doc in &out_l1_docs {
661            writer.write_u32::<LittleEndian>(doc)?;
662        }
663
664        writer.write_u64::<LittleEndian>(stream_written)?;
665        writer.write_u32::<LittleEndian>(out_l0_count as u32)?;
666        writer.write_u32::<LittleEndian>(out_l1_docs.len() as u32)?;
667        writer.write_u32::<LittleEndian>(total_docs)?;
668        writer.write_u32::<LittleEndian>(merged_max_tf)?;
669
670        let l1_bytes_len = out_l1_docs.len() * L1_SIZE;
671        let total_bytes = stream_written as usize + out_l0.len() + l1_bytes_len + FOOTER_SIZE;
672        Ok((total_docs, total_bytes))
673    }
674
675    /// Decode a specific block into caller-provided buffers.
676    ///
677    /// Returns `true` if the block was decoded, `false` if `block_idx` is out of range.
678    /// Reuses `doc_ids` and `tfs` buffers (cleared before filling).
679    ///
680    /// Uses SIMD-accelerated unpack for 8/16/32-bit packed arrays.
681    pub fn decode_block_into(
682        &self,
683        block_idx: usize,
684        doc_ids: &mut Vec<u32>,
685        tfs: &mut Vec<u32>,
686    ) -> bool {
687        if block_idx >= self.l0_count {
688            return false;
689        }
690
691        let (_, _, offset, _) = self.read_l0_entry(block_idx);
692        let pos = offset as usize;
693        let blk_size = block_data_size(&self.stream, pos);
694        let block_data = &self.stream[pos..pos + blk_size];
695
696        // 8-byte header: [count: u16][first_doc: u32][doc_id_bits: u8][tf_bits: u8]
697        let count = u16::from_le_bytes(block_data[0..2].try_into().unwrap()) as usize;
698        let first_doc = u32::from_le_bytes(block_data[2..6].try_into().unwrap());
699        let doc_id_bits = block_data[6];
700        let tf_bits = block_data[7];
701
702        // Decode doc IDs: unpack deltas + prefix sum
703        doc_ids.clear();
704        doc_ids.resize(count, 0);
705        doc_ids[0] = first_doc;
706
707        let doc_rounded = simd::RoundedBitWidth::from_u8(doc_id_bits);
708        let deltas_bytes = if count > 1 {
709            (count - 1) * doc_rounded.bytes_per_value()
710        } else {
711            0
712        };
713
714        if count > 1 {
715            simd::unpack_rounded(
716                &block_data[8..8 + deltas_bytes],
717                doc_rounded,
718                &mut doc_ids[1..],
719                count - 1,
720            );
721            for i in 1..count {
722                doc_ids[i] += doc_ids[i - 1];
723            }
724        }
725
726        // Decode TFs
727        tfs.clear();
728        tfs.resize(count, 0);
729        let tf_rounded = simd::RoundedBitWidth::from_u8(tf_bits);
730        let tfs_start = 8 + deltas_bytes;
731        simd::unpack_rounded(
732            &block_data[tfs_start..tfs_start + count * tf_rounded.bytes_per_value()],
733            tf_rounded,
734            tfs,
735            count,
736        );
737
738        true
739    }
740
741    /// First doc_id of a block (from L0 skip entry). Returns `None` if out of range.
742    #[inline]
743    pub fn block_first_doc(&self, block_idx: usize) -> Option<DocId> {
744        if block_idx >= self.l0_count {
745            return None;
746        }
747        let (first_doc, _, _, _) = self.read_l0_entry(block_idx);
748        Some(first_doc)
749    }
750
751    /// Last doc_id of a block (from L0 skip entry). Returns `None` if out of range.
752    #[inline]
753    pub fn block_last_doc(&self, block_idx: usize) -> Option<DocId> {
754        if block_idx >= self.l0_count {
755            return None;
756        }
757        let (_, last_doc, _, _) = self.read_l0_entry(block_idx);
758        Some(last_doc)
759    }
760
761    /// Find the first block whose `last_doc >= target`, starting from `from_block`.
762    ///
763    /// Uses SIMD-accelerated linear scan:
764    /// 1. `find_first_ge_u32` on the contiguous L1 `last_doc` array
765    /// 2. Extract ≤`L1_INTERVAL` L0 `last_doc` values into a stack buffer → `find_first_ge_u32`
766    ///
767    /// Returns `None` if no block contains `target`.
768    pub fn seek_block(&self, target: DocId, from_block: usize) -> Option<usize> {
769        if from_block >= self.l0_count {
770            return None;
771        }
772
773        let from_l1 = from_block / L1_INTERVAL;
774
775        // SIMD scan L1 to find the group containing target
776        let l1_idx = if !self.l1_docs.is_empty() {
777            let idx = from_l1 + simd::find_first_ge_u32(&self.l1_docs[from_l1..], target);
778            if idx >= self.l1_docs.len() {
779                return None;
780            }
781            idx
782        } else {
783            return None;
784        };
785
786        // Extract L0 last_doc values within the group into a stack buffer for SIMD scan
787        let start = (l1_idx * L1_INTERVAL).max(from_block);
788        let end = ((l1_idx + 1) * L1_INTERVAL).min(self.l0_count);
789        let count = end - start;
790
791        let mut last_docs = [u32::MAX; L1_INTERVAL];
792        for (j, idx) in (start..end).enumerate() {
793            let (_, ld, _, _) = read_l0(&self.l0_bytes, idx);
794            last_docs[j] = ld;
795        }
796        let within = simd::find_first_ge_u32(&last_docs[..count], target);
797        let block_idx = start + within;
798
799        if block_idx < self.l0_count {
800            Some(block_idx)
801        } else {
802            None
803        }
804    }
805
806    /// Create an iterator with skip support
807    pub fn iterator(&self) -> BlockPostingIterator<'_> {
808        BlockPostingIterator::new(self)
809    }
810
811    /// Create an owned iterator that doesn't borrow self
812    pub fn into_iterator(self) -> BlockPostingIterator<'static> {
813        BlockPostingIterator::owned(self)
814    }
815}
816
817/// Iterator over block posting list with skip support
818/// Can be either borrowed or owned via Cow
819///
820/// Uses struct-of-arrays layout: separate Vec<u32> for doc_ids and term_freqs.
821/// This is more cache-friendly for SIMD seek (contiguous doc_ids) and halves
822/// memory vs the previous AoS + separate doc_ids approach.
823pub struct BlockPostingIterator<'a> {
824    block_list: std::borrow::Cow<'a, BlockPostingList>,
825    current_block: usize,
826    block_doc_ids: Vec<u32>,
827    block_tfs: Vec<u32>,
828    position_in_block: usize,
829    exhausted: bool,
830}
831
832impl<'a> BlockPostingIterator<'a> {
833    fn new(block_list: &'a BlockPostingList) -> Self {
834        let exhausted = block_list.l0_count == 0;
835        let mut iter = Self {
836            block_list: std::borrow::Cow::Borrowed(block_list),
837            current_block: 0,
838            block_doc_ids: Vec::with_capacity(BLOCK_SIZE),
839            block_tfs: Vec::with_capacity(BLOCK_SIZE),
840            position_in_block: 0,
841            exhausted,
842        };
843        if !iter.exhausted {
844            iter.load_block(0);
845        }
846        iter
847    }
848
849    fn owned(block_list: BlockPostingList) -> BlockPostingIterator<'static> {
850        let exhausted = block_list.l0_count == 0;
851        let mut iter = BlockPostingIterator {
852            block_list: std::borrow::Cow::Owned(block_list),
853            current_block: 0,
854            block_doc_ids: Vec::with_capacity(BLOCK_SIZE),
855            block_tfs: Vec::with_capacity(BLOCK_SIZE),
856            position_in_block: 0,
857            exhausted,
858        };
859        if !iter.exhausted {
860            iter.load_block(0);
861        }
862        iter
863    }
864
865    fn load_block(&mut self, block_idx: usize) {
866        if block_idx >= self.block_list.l0_count {
867            self.exhausted = true;
868            return;
869        }
870
871        self.current_block = block_idx;
872        self.position_in_block = 0;
873
874        self.block_list
875            .decode_block_into(block_idx, &mut self.block_doc_ids, &mut self.block_tfs);
876    }
877
878    pub fn doc(&self) -> DocId {
879        if self.exhausted {
880            TERMINATED
881        } else if self.position_in_block < self.block_doc_ids.len() {
882            self.block_doc_ids[self.position_in_block]
883        } else {
884            TERMINATED
885        }
886    }
887
888    pub fn term_freq(&self) -> u32 {
889        if self.exhausted || self.position_in_block >= self.block_tfs.len() {
890            0
891        } else {
892            self.block_tfs[self.position_in_block]
893        }
894    }
895
896    pub fn advance(&mut self) -> DocId {
897        if self.exhausted {
898            return TERMINATED;
899        }
900
901        self.position_in_block += 1;
902        if self.position_in_block >= self.block_doc_ids.len() {
903            self.load_block(self.current_block + 1);
904        }
905        self.doc()
906    }
907
908    pub fn seek(&mut self, target: DocId) -> DocId {
909        if self.exhausted {
910            return TERMINATED;
911        }
912
913        // SIMD-accelerated 2-level seek (forward from current block)
914        let block_idx = match self.block_list.seek_block(target, self.current_block) {
915            Some(idx) => idx,
916            None => {
917                self.exhausted = true;
918                return TERMINATED;
919            }
920        };
921
922        if block_idx != self.current_block {
923            self.load_block(block_idx);
924        }
925
926        // SIMD linear scan within block on cached doc_ids
927        let remaining = &self.block_doc_ids[self.position_in_block..];
928        let pos = crate::structures::simd::find_first_ge_u32(remaining, target);
929        self.position_in_block += pos;
930
931        if self.position_in_block >= self.block_doc_ids.len() {
932            self.load_block(self.current_block + 1);
933        }
934        self.doc()
935    }
936
937    /// Skip to the next block, returning the first doc_id in the new block
938    /// This is used for block-max pruning when the current block's
939    /// max score can't beat the threshold.
940    pub fn skip_to_next_block(&mut self) -> DocId {
941        if self.exhausted {
942            return TERMINATED;
943        }
944        self.load_block(self.current_block + 1);
945        self.doc()
946    }
947
948    /// Get the current block index
949    #[inline]
950    pub fn current_block_idx(&self) -> usize {
951        self.current_block
952    }
953
954    /// Get total number of blocks
955    #[inline]
956    pub fn num_blocks(&self) -> usize {
957        self.block_list.l0_count
958    }
959
960    /// Get the current block's max term frequency for block-max pruning
961    #[inline]
962    pub fn current_block_max_tf(&self) -> u32 {
963        if self.exhausted || self.current_block >= self.block_list.l0_count {
964            0
965        } else {
966            let (_, _, _, max_weight) = self.block_list.read_l0_entry(self.current_block);
967            max_weight as u32
968        }
969    }
970}
971
972#[cfg(test)]
973mod tests {
974    use super::*;
975
976    #[test]
977    fn test_posting_list_basic() {
978        let mut list = PostingList::new();
979        list.push(1, 2);
980        list.push(5, 1);
981        list.push(10, 3);
982
983        assert_eq!(list.len(), 3);
984
985        let mut iter = PostingListIterator::new(&list);
986        assert_eq!(iter.doc(), 1);
987        assert_eq!(iter.term_freq(), 2);
988
989        assert_eq!(iter.advance(), 5);
990        assert_eq!(iter.term_freq(), 1);
991
992        assert_eq!(iter.advance(), 10);
993        assert_eq!(iter.term_freq(), 3);
994
995        assert_eq!(iter.advance(), TERMINATED);
996    }
997
998    #[test]
999    fn test_posting_list_serialization() {
1000        let mut list = PostingList::new();
1001        for i in 0..100 {
1002            list.push(i * 3, (i % 5) + 1);
1003        }
1004
1005        let mut buffer = Vec::new();
1006        list.serialize(&mut buffer).unwrap();
1007
1008        let deserialized = PostingList::deserialize(&mut &buffer[..]).unwrap();
1009        assert_eq!(deserialized.len(), list.len());
1010
1011        for (a, b) in list.iter().zip(deserialized.iter()) {
1012            assert_eq!(a, b);
1013        }
1014    }
1015
1016    #[test]
1017    fn test_posting_list_seek() {
1018        let mut list = PostingList::new();
1019        for i in 0..100 {
1020            list.push(i * 2, 1);
1021        }
1022
1023        let mut iter = PostingListIterator::new(&list);
1024
1025        assert_eq!(iter.seek(50), 50);
1026        assert_eq!(iter.seek(51), 52);
1027        assert_eq!(iter.seek(200), TERMINATED);
1028    }
1029
1030    #[test]
1031    fn test_block_posting_list() {
1032        let mut list = PostingList::new();
1033        for i in 0..500 {
1034            list.push(i * 2, (i % 10) + 1);
1035        }
1036
1037        let block_list = BlockPostingList::from_posting_list(&list).unwrap();
1038        assert_eq!(block_list.doc_count(), 500);
1039
1040        let mut iter = block_list.iterator();
1041        assert_eq!(iter.doc(), 0);
1042        assert_eq!(iter.term_freq(), 1);
1043
1044        // Test seek across blocks
1045        assert_eq!(iter.seek(500), 500);
1046        assert_eq!(iter.seek(998), 998);
1047        assert_eq!(iter.seek(1000), TERMINATED);
1048    }
1049
1050    #[test]
1051    fn test_block_posting_list_serialization() {
1052        let mut list = PostingList::new();
1053        for i in 0..300 {
1054            list.push(i * 3, i + 1);
1055        }
1056
1057        let block_list = BlockPostingList::from_posting_list(&list).unwrap();
1058
1059        let mut buffer = Vec::new();
1060        block_list.serialize(&mut buffer).unwrap();
1061
1062        let deserialized = BlockPostingList::deserialize(&buffer[..]).unwrap();
1063        assert_eq!(deserialized.doc_count(), block_list.doc_count());
1064
1065        // Verify iteration produces same results
1066        let mut iter1 = block_list.iterator();
1067        let mut iter2 = deserialized.iterator();
1068
1069        while iter1.doc() != TERMINATED {
1070            assert_eq!(iter1.doc(), iter2.doc());
1071            assert_eq!(iter1.term_freq(), iter2.term_freq());
1072            iter1.advance();
1073            iter2.advance();
1074        }
1075        assert_eq!(iter2.doc(), TERMINATED);
1076    }
1077
1078    /// Helper: collect all (doc_id, tf) from a BlockPostingIterator
1079    fn collect_postings(bpl: &BlockPostingList) -> Vec<(u32, u32)> {
1080        let mut result = Vec::new();
1081        let mut it = bpl.iterator();
1082        while it.doc() != TERMINATED {
1083            result.push((it.doc(), it.term_freq()));
1084            it.advance();
1085        }
1086        result
1087    }
1088
1089    /// Helper: build a BlockPostingList from (doc_id, tf) pairs
1090    fn build_bpl(postings: &[(u32, u32)]) -> BlockPostingList {
1091        let mut pl = PostingList::new();
1092        for &(doc_id, tf) in postings {
1093            pl.push(doc_id, tf);
1094        }
1095        BlockPostingList::from_posting_list(&pl).unwrap()
1096    }
1097
1098    /// Helper: serialize a BlockPostingList to bytes
1099    fn serialize_bpl(bpl: &BlockPostingList) -> Vec<u8> {
1100        let mut buf = Vec::new();
1101        bpl.serialize(&mut buf).unwrap();
1102        buf
1103    }
1104
1105    #[test]
1106    fn test_concatenate_blocks_two_segments() {
1107        // Segment A: docs 0,2,4,...,198 (100 docs, tf=1..100)
1108        let a: Vec<(u32, u32)> = (0..100).map(|i| (i * 2, i + 1)).collect();
1109        let bpl_a = build_bpl(&a);
1110
1111        // Segment B: docs 0,3,6,...,297 (100 docs, tf=2..101)
1112        let b: Vec<(u32, u32)> = (0..100).map(|i| (i * 3, i + 2)).collect();
1113        let bpl_b = build_bpl(&b);
1114
1115        // Merge: segment B starts at doc_offset=200
1116        let merged =
1117            BlockPostingList::concatenate_blocks(&[(bpl_a.clone(), 0), (bpl_b.clone(), 200)])
1118                .unwrap();
1119
1120        assert_eq!(merged.doc_count(), 200);
1121
1122        let postings = collect_postings(&merged);
1123        assert_eq!(postings.len(), 200);
1124
1125        // First 100 from A (unchanged)
1126        for (i, p) in postings.iter().enumerate().take(100) {
1127            assert_eq!(*p, (i as u32 * 2, i as u32 + 1));
1128        }
1129        // Next 100 from B (doc_id += 200)
1130        for i in 0..100 {
1131            assert_eq!(postings[100 + i], (i as u32 * 3 + 200, i as u32 + 2));
1132        }
1133    }
1134
1135    #[test]
1136    fn test_concatenate_streaming_matches_blocks() {
1137        // Build 3 segments with different doc distributions
1138        let seg_a: Vec<(u32, u32)> = (0..250).map(|i| (i * 2, (i % 7) + 1)).collect();
1139        let seg_b: Vec<(u32, u32)> = (0..180).map(|i| (i * 5, (i % 3) + 1)).collect();
1140        let seg_c: Vec<(u32, u32)> = (0..90).map(|i| (i * 10, (i % 11) + 1)).collect();
1141
1142        let bpl_a = build_bpl(&seg_a);
1143        let bpl_b = build_bpl(&seg_b);
1144        let bpl_c = build_bpl(&seg_c);
1145
1146        let offset_b = 1000u32;
1147        let offset_c = 2000u32;
1148
1149        // Method 1: concatenate_blocks (in-memory reference)
1150        let ref_merged = BlockPostingList::concatenate_blocks(&[
1151            (bpl_a.clone(), 0),
1152            (bpl_b.clone(), offset_b),
1153            (bpl_c.clone(), offset_c),
1154        ])
1155        .unwrap();
1156        let mut ref_buf = Vec::new();
1157        ref_merged.serialize(&mut ref_buf).unwrap();
1158
1159        // Method 2: concatenate_streaming (footer-based, writes to output)
1160        let bytes_a = serialize_bpl(&bpl_a);
1161        let bytes_b = serialize_bpl(&bpl_b);
1162        let bytes_c = serialize_bpl(&bpl_c);
1163
1164        let sources: Vec<(&[u8], u32)> =
1165            vec![(&bytes_a, 0), (&bytes_b, offset_b), (&bytes_c, offset_c)];
1166        let mut stream_buf = Vec::new();
1167        let (doc_count, bytes_written) =
1168            BlockPostingList::concatenate_streaming(&sources, &mut stream_buf).unwrap();
1169
1170        assert_eq!(doc_count, 520); // 250 + 180 + 90
1171        assert_eq!(bytes_written, stream_buf.len());
1172
1173        // Deserialize both and verify identical postings
1174        let ref_postings = collect_postings(&BlockPostingList::deserialize(&ref_buf).unwrap());
1175        let stream_postings =
1176            collect_postings(&BlockPostingList::deserialize(&stream_buf).unwrap());
1177
1178        assert_eq!(ref_postings.len(), stream_postings.len());
1179        for (i, (r, s)) in ref_postings.iter().zip(stream_postings.iter()).enumerate() {
1180            assert_eq!(r, s, "mismatch at posting {}", i);
1181        }
1182    }
1183
1184    #[test]
1185    fn test_multi_round_merge() {
1186        // Simulate 3 rounds of merging (like tiered merge policy)
1187        //
1188        // Round 0: 4 small segments built independently
1189        // Round 1: merge pairs → 2 medium segments
1190        // Round 2: merge those → 1 large segment
1191
1192        let segments: Vec<Vec<(u32, u32)>> = (0..4)
1193            .map(|seg| (0..200).map(|i| (i * 3, (i + seg * 7) % 10 + 1)).collect())
1194            .collect();
1195
1196        let bpls: Vec<BlockPostingList> = segments.iter().map(|s| build_bpl(s)).collect();
1197        let serialized: Vec<Vec<u8>> = bpls.iter().map(serialize_bpl).collect();
1198
1199        // Round 1: merge seg0+seg1 (offset=0,600), seg2+seg3 (offset=0,600)
1200        let mut merged_01 = Vec::new();
1201        let sources_01: Vec<(&[u8], u32)> = vec![(&serialized[0], 0), (&serialized[1], 600)];
1202        let (dc_01, _) =
1203            BlockPostingList::concatenate_streaming(&sources_01, &mut merged_01).unwrap();
1204        assert_eq!(dc_01, 400);
1205
1206        let mut merged_23 = Vec::new();
1207        let sources_23: Vec<(&[u8], u32)> = vec![(&serialized[2], 0), (&serialized[3], 600)];
1208        let (dc_23, _) =
1209            BlockPostingList::concatenate_streaming(&sources_23, &mut merged_23).unwrap();
1210        assert_eq!(dc_23, 400);
1211
1212        // Round 2: merge the two intermediate results (offset=0, 1200)
1213        let mut final_merged = Vec::new();
1214        let sources_final: Vec<(&[u8], u32)> = vec![(&merged_01, 0), (&merged_23, 1200)];
1215        let (dc_final, _) =
1216            BlockPostingList::concatenate_streaming(&sources_final, &mut final_merged).unwrap();
1217        assert_eq!(dc_final, 800);
1218
1219        // Verify final result has all 800 postings with correct doc_ids
1220        let final_bpl = BlockPostingList::deserialize(&final_merged).unwrap();
1221        let postings = collect_postings(&final_bpl);
1222        assert_eq!(postings.len(), 800);
1223
1224        // Verify doc_id ordering (must be monotonically non-decreasing within segments,
1225        // and segment boundaries at 0, 600, 1200, 1800)
1226        // Seg0: 0..597, Seg1: 600..1197, Seg2: 1200..1797, Seg3: 1800..2397
1227        assert_eq!(postings[0].0, 0); // first doc of seg0
1228        assert_eq!(postings[199].0, 597); // last doc of seg0 (199*3)
1229        assert_eq!(postings[200].0, 600); // first doc of seg1 (0+600)
1230        assert_eq!(postings[399].0, 1197); // last doc of seg1 (597+600)
1231        assert_eq!(postings[400].0, 1200); // first doc of seg2
1232        assert_eq!(postings[799].0, 2397); // last doc of seg3
1233
1234        // Verify TFs preserved through two rounds of merging
1235        // Creation formula: tf = (i + seg * 7) % 10 + 1
1236        for seg in 0u32..4 {
1237            for i in 0u32..200 {
1238                let idx = (seg * 200 + i) as usize;
1239                assert_eq!(
1240                    postings[idx].1,
1241                    (i + seg * 7) % 10 + 1,
1242                    "seg{} tf[{}]",
1243                    seg,
1244                    i
1245                );
1246            }
1247        }
1248
1249        // Verify seek works on final merged result
1250        let mut it = final_bpl.iterator();
1251        assert_eq!(it.seek(600), 600);
1252        assert_eq!(it.seek(1200), 1200);
1253        assert_eq!(it.seek(2397), 2397);
1254        assert_eq!(it.seek(2398), TERMINATED);
1255    }
1256
1257    #[test]
1258    fn test_large_scale_merge() {
1259        // 5 segments × 2000 docs each = 10,000 total docs
1260        // Each segment has 16 blocks (2000/128 = 15.6 → 16 blocks)
1261        let num_segments = 5;
1262        let docs_per_segment = 2000;
1263        let docs_gap = 3; // doc_ids: 0, 3, 6, ...
1264
1265        let segments: Vec<Vec<(u32, u32)>> = (0..num_segments)
1266            .map(|seg| {
1267                (0..docs_per_segment)
1268                    .map(|i| (i as u32 * docs_gap, (i as u32 + seg as u32) % 20 + 1))
1269                    .collect()
1270            })
1271            .collect();
1272
1273        let bpls: Vec<BlockPostingList> = segments.iter().map(|s| build_bpl(s)).collect();
1274
1275        // Verify each segment has multiple blocks
1276        for bpl in &bpls {
1277            assert!(
1278                bpl.num_blocks() >= 15,
1279                "expected >=15 blocks, got {}",
1280                bpl.num_blocks()
1281            );
1282        }
1283
1284        let serialized: Vec<Vec<u8>> = bpls.iter().map(serialize_bpl).collect();
1285
1286        // Compute offsets: each segment occupies max_doc+1 doc_id space
1287        let max_doc_per_seg = (docs_per_segment as u32 - 1) * docs_gap;
1288        let offsets: Vec<u32> = (0..num_segments)
1289            .map(|i| i as u32 * (max_doc_per_seg + 1))
1290            .collect();
1291
1292        let sources: Vec<(&[u8], u32)> = serialized
1293            .iter()
1294            .zip(offsets.iter())
1295            .map(|(b, o)| (b.as_slice(), *o))
1296            .collect();
1297
1298        let mut merged = Vec::new();
1299        let (doc_count, _) =
1300            BlockPostingList::concatenate_streaming(&sources, &mut merged).unwrap();
1301        assert_eq!(doc_count, (num_segments * docs_per_segment) as u32);
1302
1303        // Deserialize and verify
1304        let merged_bpl = BlockPostingList::deserialize(&merged).unwrap();
1305        let postings = collect_postings(&merged_bpl);
1306        assert_eq!(postings.len(), num_segments * docs_per_segment);
1307
1308        // Verify all doc_ids are strictly monotonically increasing across segment boundaries
1309        for i in 1..postings.len() {
1310            assert!(
1311                postings[i].0 > postings[i - 1].0 || (i % docs_per_segment == 0), // new segment can have lower absolute ID
1312                "doc_id not increasing at {}: {} vs {}",
1313                i,
1314                postings[i - 1].0,
1315                postings[i].0,
1316            );
1317        }
1318
1319        // Verify seek across all block boundaries
1320        let mut it = merged_bpl.iterator();
1321        for (seg, &expected_first) in offsets.iter().enumerate() {
1322            assert_eq!(
1323                it.seek(expected_first),
1324                expected_first,
1325                "seek to segment {} start",
1326                seg
1327            );
1328        }
1329    }
1330
1331    #[test]
1332    fn test_merge_edge_cases() {
1333        // Single doc per segment
1334        let bpl_a = build_bpl(&[(0, 5)]);
1335        let bpl_b = build_bpl(&[(0, 3)]);
1336
1337        let merged =
1338            BlockPostingList::concatenate_blocks(&[(bpl_a.clone(), 0), (bpl_b.clone(), 1)])
1339                .unwrap();
1340        assert_eq!(merged.doc_count(), 2);
1341        let p = collect_postings(&merged);
1342        assert_eq!(p, vec![(0, 5), (1, 3)]);
1343
1344        // Exactly BLOCK_SIZE docs (single full block)
1345        let exact_block: Vec<(u32, u32)> = (0..BLOCK_SIZE as u32).map(|i| (i, i % 5 + 1)).collect();
1346        let bpl_exact = build_bpl(&exact_block);
1347        assert_eq!(bpl_exact.num_blocks(), 1);
1348
1349        let bytes = serialize_bpl(&bpl_exact);
1350        let mut out = Vec::new();
1351        let sources: Vec<(&[u8], u32)> = vec![(&bytes, 0), (&bytes, BLOCK_SIZE as u32)];
1352        let (dc, _) = BlockPostingList::concatenate_streaming(&sources, &mut out).unwrap();
1353        assert_eq!(dc, BLOCK_SIZE as u32 * 2);
1354
1355        let merged = BlockPostingList::deserialize(&out).unwrap();
1356        let postings = collect_postings(&merged);
1357        assert_eq!(postings.len(), BLOCK_SIZE * 2);
1358        // Second segment's docs offset by BLOCK_SIZE
1359        assert_eq!(postings[BLOCK_SIZE].0, BLOCK_SIZE as u32);
1360
1361        // BLOCK_SIZE + 1 docs (two blocks: 128 + 1)
1362        let over_block: Vec<(u32, u32)> = (0..BLOCK_SIZE as u32 + 1).map(|i| (i * 2, 1)).collect();
1363        let bpl_over = build_bpl(&over_block);
1364        assert_eq!(bpl_over.num_blocks(), 2);
1365    }
1366
1367    #[test]
1368    fn test_streaming_roundtrip_single_source() {
1369        // Streaming merge with a single source should produce equivalent output to serialize
1370        let docs: Vec<(u32, u32)> = (0..500).map(|i| (i * 7, i % 15 + 1)).collect();
1371        let bpl = build_bpl(&docs);
1372        let direct = serialize_bpl(&bpl);
1373
1374        let sources: Vec<(&[u8], u32)> = vec![(&direct, 0)];
1375        let mut streamed = Vec::new();
1376        BlockPostingList::concatenate_streaming(&sources, &mut streamed).unwrap();
1377
1378        // Both should deserialize to identical postings
1379        let p1 = collect_postings(&BlockPostingList::deserialize(&direct).unwrap());
1380        let p2 = collect_postings(&BlockPostingList::deserialize(&streamed).unwrap());
1381        assert_eq!(p1, p2);
1382    }
1383
1384    #[test]
1385    fn test_max_tf_preserved_through_merge() {
1386        // Segment A: max_tf = 50
1387        let mut a = Vec::new();
1388        for i in 0..200 {
1389            a.push((i * 2, if i == 100 { 50 } else { 1 }));
1390        }
1391        let bpl_a = build_bpl(&a);
1392        assert_eq!(bpl_a.max_tf(), 50);
1393
1394        // Segment B: max_tf = 30
1395        let mut b = Vec::new();
1396        for i in 0..200 {
1397            b.push((i * 2, if i == 50 { 30 } else { 2 }));
1398        }
1399        let bpl_b = build_bpl(&b);
1400        assert_eq!(bpl_b.max_tf(), 30);
1401
1402        // After merge, max_tf should be max(50, 30) = 50
1403        let bytes_a = serialize_bpl(&bpl_a);
1404        let bytes_b = serialize_bpl(&bpl_b);
1405        let sources: Vec<(&[u8], u32)> = vec![(&bytes_a, 0), (&bytes_b, 1000)];
1406        let mut out = Vec::new();
1407        BlockPostingList::concatenate_streaming(&sources, &mut out).unwrap();
1408
1409        let merged = BlockPostingList::deserialize(&out).unwrap();
1410        assert_eq!(merged.max_tf(), 50);
1411        assert_eq!(merged.doc_count(), 400);
1412    }
1413
1414    // ── 2-level skip list format tests ──────────────────────────────────
1415
1416    #[test]
1417    fn test_l0_l1_counts() {
1418        // 1 block (< L1_INTERVAL) → 1 L1 entry (partial group)
1419        let bpl = build_bpl(&(0..50u32).map(|i| (i, 1)).collect::<Vec<_>>());
1420        assert_eq!(bpl.num_blocks(), 1);
1421        assert_eq!(bpl.l1_docs.len(), 1);
1422
1423        // Exactly L1_INTERVAL blocks → 1 L1 entry (full group)
1424        let n = BLOCK_SIZE * L1_INTERVAL;
1425        let bpl = build_bpl(&(0..n as u32).map(|i| (i * 2, 1)).collect::<Vec<_>>());
1426        assert_eq!(bpl.num_blocks(), L1_INTERVAL);
1427        assert_eq!(bpl.l1_docs.len(), 1);
1428
1429        // L1_INTERVAL + 1 blocks → 2 L1 entries
1430        let n = BLOCK_SIZE * L1_INTERVAL + 1;
1431        let bpl = build_bpl(&(0..n as u32).map(|i| (i * 2, 1)).collect::<Vec<_>>());
1432        assert_eq!(bpl.num_blocks(), L1_INTERVAL + 1);
1433        assert_eq!(bpl.l1_docs.len(), 2);
1434
1435        // 3 × L1_INTERVAL blocks → 3 L1 entries (all full groups)
1436        let n = BLOCK_SIZE * L1_INTERVAL * 3;
1437        let bpl = build_bpl(&(0..n as u32).map(|i| (i, 1)).collect::<Vec<_>>());
1438        assert_eq!(bpl.num_blocks(), L1_INTERVAL * 3);
1439        assert_eq!(bpl.l1_docs.len(), 3);
1440    }
1441
1442    #[test]
1443    fn test_l1_last_doc_values() {
1444        // 20 blocks: 2 full L1 groups (8+8) + 1 partial (4) → 3 L1 entries
1445        let n = BLOCK_SIZE * 20;
1446        let docs: Vec<(u32, u32)> = (0..n as u32).map(|i| (i * 3, 1)).collect();
1447        let bpl = build_bpl(&docs);
1448        assert_eq!(bpl.num_blocks(), 20);
1449        assert_eq!(bpl.l1_docs.len(), 3); // ceil(20/8) = 3
1450
1451        // L1[0] = last_doc of block 7 (end of first group)
1452        let expected_l1_0 = bpl.block_last_doc(7).unwrap();
1453        assert_eq!(bpl.l1_docs[0], expected_l1_0);
1454
1455        // L1[1] = last_doc of block 15 (end of second group)
1456        let expected_l1_1 = bpl.block_last_doc(15).unwrap();
1457        assert_eq!(bpl.l1_docs[1], expected_l1_1);
1458
1459        // L1[2] = last_doc of block 19 (end of partial group)
1460        let expected_l1_2 = bpl.block_last_doc(19).unwrap();
1461        assert_eq!(bpl.l1_docs[2], expected_l1_2);
1462    }
1463
1464    #[test]
1465    fn test_seek_block_basic() {
1466        // 20 blocks spanning large doc ID range
1467        let n = BLOCK_SIZE * 20;
1468        let docs: Vec<(u32, u32)> = (0..n as u32).map(|i| (i * 10, 1)).collect();
1469        let bpl = build_bpl(&docs);
1470
1471        // Seek to doc 0 → block 0
1472        assert_eq!(bpl.seek_block(0, 0), Some(0));
1473
1474        // Seek to the first doc of each block
1475        for blk in 0..20 {
1476            let first = bpl.block_first_doc(blk).unwrap();
1477            assert_eq!(
1478                bpl.seek_block(first, 0),
1479                Some(blk),
1480                "seek to block {} first_doc",
1481                blk
1482            );
1483        }
1484
1485        // Seek to the last doc of each block
1486        for blk in 0..20 {
1487            let last = bpl.block_last_doc(blk).unwrap();
1488            assert_eq!(
1489                bpl.seek_block(last, 0),
1490                Some(blk),
1491                "seek to block {} last_doc",
1492                blk
1493            );
1494        }
1495
1496        // Seek past all docs
1497        let max_doc = bpl.block_last_doc(19).unwrap();
1498        assert_eq!(bpl.seek_block(max_doc + 1, 0), None);
1499
1500        // Seek with from_block > 0 (skip early blocks)
1501        let mid_doc = bpl.block_first_doc(10).unwrap();
1502        assert_eq!(bpl.seek_block(mid_doc, 10), Some(10));
1503        assert_eq!(
1504            bpl.seek_block(mid_doc, 11),
1505            Some(11).or(bpl.seek_block(mid_doc, 11))
1506        );
1507    }
1508
1509    #[test]
1510    fn test_seek_block_across_l1_boundaries() {
1511        // 24 blocks = 3 L1 groups of 8
1512        let n = BLOCK_SIZE * 24;
1513        let docs: Vec<(u32, u32)> = (0..n as u32).map(|i| (i * 5, 1)).collect();
1514        let bpl = build_bpl(&docs);
1515        assert_eq!(bpl.l1_docs.len(), 3);
1516
1517        // Seek into each L1 group
1518        for group in 0..3 {
1519            let blk = group * L1_INTERVAL;
1520            let target = bpl.block_first_doc(blk).unwrap();
1521            assert_eq!(
1522                bpl.seek_block(target, 0),
1523                Some(blk),
1524                "seek to group {} block {}",
1525                group,
1526                blk
1527            );
1528        }
1529
1530        // Seek to doc in the middle of group 2 (block 20)
1531        let target = bpl.block_first_doc(20).unwrap() + 1;
1532        assert_eq!(bpl.seek_block(target, 0), Some(20));
1533    }
1534
1535    #[test]
1536    fn test_block_data_size_helper() {
1537        // Build a posting list and verify block_data_size matches actual block sizes
1538        let docs: Vec<(u32, u32)> = (0..500u32).map(|i| (i * 7, (i % 20) + 1)).collect();
1539        let bpl = build_bpl(&docs);
1540
1541        for blk in 0..bpl.num_blocks() {
1542            let (_, _, offset, _) = bpl.read_l0_entry(blk);
1543            let computed_size = block_data_size(&bpl.stream, offset as usize);
1544
1545            // Verify: next block's offset - this block's offset should equal computed_size
1546            // (for all but last block)
1547            if blk + 1 < bpl.num_blocks() {
1548                let (_, _, next_offset, _) = bpl.read_l0_entry(blk + 1);
1549                assert_eq!(
1550                    computed_size,
1551                    (next_offset - offset) as usize,
1552                    "block_data_size mismatch at block {}",
1553                    blk
1554                );
1555            } else {
1556                // Last block: offset + size should equal stream length
1557                assert_eq!(
1558                    offset as usize + computed_size,
1559                    bpl.stream.len(),
1560                    "last block size mismatch"
1561                );
1562            }
1563        }
1564    }
1565
1566    #[test]
1567    fn test_l0_entry_roundtrip() {
1568        // Verify L0 entries survive serialize → deserialize
1569        let docs: Vec<(u32, u32)> = (0..1000u32).map(|i| (i * 3, (i % 10) + 1)).collect();
1570        let bpl = build_bpl(&docs);
1571
1572        let bytes = serialize_bpl(&bpl);
1573        let bpl2 = BlockPostingList::deserialize(&bytes).unwrap();
1574
1575        assert_eq!(bpl.num_blocks(), bpl2.num_blocks());
1576        for blk in 0..bpl.num_blocks() {
1577            assert_eq!(
1578                bpl.read_l0_entry(blk),
1579                bpl2.read_l0_entry(blk),
1580                "L0 entry mismatch at block {}",
1581                blk
1582            );
1583        }
1584
1585        // Verify L1 docs match
1586        assert_eq!(bpl.l1_docs, bpl2.l1_docs);
1587    }
1588
1589    #[test]
1590    fn test_zero_copy_deserialize_matches() {
1591        let docs: Vec<(u32, u32)> = (0..2000u32).map(|i| (i * 2, (i % 5) + 1)).collect();
1592        let bpl = build_bpl(&docs);
1593        let bytes = serialize_bpl(&bpl);
1594
1595        let copied = BlockPostingList::deserialize(&bytes).unwrap();
1596        let zero_copy =
1597            BlockPostingList::deserialize_zero_copy(OwnedBytes::new(bytes.clone())).unwrap();
1598
1599        // Same structure
1600        assert_eq!(copied.l0_count, zero_copy.l0_count);
1601        assert_eq!(copied.l1_docs, zero_copy.l1_docs);
1602        assert_eq!(copied.doc_count, zero_copy.doc_count);
1603        assert_eq!(copied.max_tf, zero_copy.max_tf);
1604
1605        // Same iteration
1606        let p1 = collect_postings(&copied);
1607        let p2 = collect_postings(&zero_copy);
1608        assert_eq!(p1, p2);
1609    }
1610
1611    #[test]
1612    fn test_l1_preserved_through_streaming_merge() {
1613        // Merge 3 segments, verify L1 is correctly rebuilt
1614        let seg_a = build_bpl(&(0..1000u32).map(|i| (i * 2, 1)).collect::<Vec<_>>());
1615        let seg_b = build_bpl(&(0..800u32).map(|i| (i * 3, 2)).collect::<Vec<_>>());
1616        let seg_c = build_bpl(&(0..500u32).map(|i| (i * 5, 3)).collect::<Vec<_>>());
1617
1618        let bytes_a = serialize_bpl(&seg_a);
1619        let bytes_b = serialize_bpl(&seg_b);
1620        let bytes_c = serialize_bpl(&seg_c);
1621
1622        let sources: Vec<(&[u8], u32)> = vec![(&bytes_a, 0), (&bytes_b, 10000), (&bytes_c, 20000)];
1623        let mut out = Vec::new();
1624        BlockPostingList::concatenate_streaming(&sources, &mut out).unwrap();
1625
1626        let merged = BlockPostingList::deserialize(&out).unwrap();
1627        let expected_l1_count = merged.num_blocks().div_ceil(L1_INTERVAL);
1628        assert_eq!(merged.l1_docs.len(), expected_l1_count);
1629
1630        // Verify L1 values are correct
1631        for (i, &l1_doc) in merged.l1_docs.iter().enumerate() {
1632            let last_block_in_group = ((i + 1) * L1_INTERVAL - 1).min(merged.num_blocks() - 1);
1633            let expected = merged.block_last_doc(last_block_in_group).unwrap();
1634            assert_eq!(l1_doc, expected, "L1[{}] mismatch", i);
1635        }
1636
1637        // Verify seek_block works on merged result
1638        for blk in 0..merged.num_blocks() {
1639            let first = merged.block_first_doc(blk).unwrap();
1640            assert_eq!(merged.seek_block(first, 0), Some(blk));
1641        }
1642    }
1643
1644    #[test]
1645    fn test_seek_block_single_block() {
1646        // Edge case: single block (< L1_INTERVAL)
1647        let bpl = build_bpl(&[(0, 1), (10, 2), (20, 3)]);
1648        assert_eq!(bpl.num_blocks(), 1);
1649        assert_eq!(bpl.l1_docs.len(), 1);
1650
1651        assert_eq!(bpl.seek_block(0, 0), Some(0));
1652        assert_eq!(bpl.seek_block(10, 0), Some(0));
1653        assert_eq!(bpl.seek_block(20, 0), Some(0));
1654        assert_eq!(bpl.seek_block(21, 0), None);
1655    }
1656
1657    #[test]
1658    fn test_footer_size() {
1659        // Verify serialized size = stream + L0 + L1 + FOOTER_SIZE
1660        let docs: Vec<(u32, u32)> = (0..500u32).map(|i| (i * 2, 1)).collect();
1661        let bpl = build_bpl(&docs);
1662        let bytes = serialize_bpl(&bpl);
1663
1664        let expected =
1665            bpl.stream.len() + bpl.l0_count * L0_SIZE + bpl.l1_docs.len() * L1_SIZE + FOOTER_SIZE;
1666        assert_eq!(bytes.len(), expected);
1667    }
1668
1669    #[test]
1670    fn test_seek_block_from_block_skips_earlier() {
1671        // 16 blocks: seek with from_block should skip earlier blocks
1672        let n = BLOCK_SIZE * 16;
1673        let docs: Vec<(u32, u32)> = (0..n as u32).map(|i| (i * 3, 1)).collect();
1674        let bpl = build_bpl(&docs);
1675
1676        // Target is in block 5, but from_block=8 → should find block >= 8
1677        let target_in_5 = bpl.block_first_doc(5).unwrap() + 1;
1678        // from_block=8 means we only look at blocks 8+
1679        // target_in_5 < last_doc of block 8, so seek_block(target, 8) should return 8
1680        let result = bpl.seek_block(target_in_5, 8);
1681        assert!(result.is_some());
1682        assert!(result.unwrap() >= 8);
1683    }
1684}