Skip to main content

hermes_core/structures/postings/
posting.rs

1//! Posting list implementation with compact representation
2//!
3//! Text blocks use SIMD-friendly packed bit-width encoding:
4//! - Doc IDs: delta-encoded, packed at rounded bit width (0/8/16/32)
5//! - Term frequencies: packed at rounded bit width
6//! - Same SIMD primitives as sparse blocks (`simd::pack_rounded` / `unpack_rounded`)
7
8use byteorder::{LittleEndian, WriteBytesExt};
9use std::io::{self, Read, Write};
10
11use super::posting_common::{read_vint, write_vint};
12use crate::DocId;
13use crate::directories::OwnedBytes;
14use crate::structures::simd;
15
16/// A posting entry containing doc_id and term frequency
17#[derive(Debug, Clone, Copy, PartialEq, Eq)]
18pub struct Posting {
19    pub doc_id: DocId,
20    pub term_freq: u32,
21}
22
23/// Compact posting list with delta encoding
24#[derive(Debug, Clone, Default)]
25pub struct PostingList {
26    postings: Vec<Posting>,
27}
28
29impl PostingList {
30    pub fn new() -> Self {
31        Self::default()
32    }
33
34    pub fn with_capacity(capacity: usize) -> Self {
35        Self {
36            postings: Vec::with_capacity(capacity),
37        }
38    }
39
40    /// Add a posting (must be added in doc_id order)
41    pub fn push(&mut self, doc_id: DocId, term_freq: u32) {
42        debug_assert!(
43            self.postings.is_empty() || self.postings.last().unwrap().doc_id < doc_id,
44            "Postings must be added in sorted order"
45        );
46        self.postings.push(Posting { doc_id, term_freq });
47    }
48
49    /// Add a posting, incrementing term_freq if doc already exists
50    pub fn add(&mut self, doc_id: DocId, term_freq: u32) {
51        if let Some(last) = self.postings.last_mut()
52            && last.doc_id == doc_id
53        {
54            last.term_freq += term_freq;
55            return;
56        }
57        self.postings.push(Posting { doc_id, term_freq });
58    }
59
60    /// Get document count
61    pub fn doc_count(&self) -> u32 {
62        self.postings.len() as u32
63    }
64
65    pub fn len(&self) -> usize {
66        self.postings.len()
67    }
68
69    pub fn is_empty(&self) -> bool {
70        self.postings.is_empty()
71    }
72
73    pub fn iter(&self) -> impl Iterator<Item = &Posting> {
74        self.postings.iter()
75    }
76
77    /// Serialize to bytes using delta encoding and varint
78    pub fn serialize<W: Write>(&self, writer: &mut W) -> io::Result<()> {
79        // Write number of postings
80        write_vint(writer, self.postings.len() as u64)?;
81
82        let mut prev_doc_id = 0u32;
83        for posting in &self.postings {
84            // Delta encode doc_id
85            let delta = posting.doc_id - prev_doc_id;
86            write_vint(writer, delta as u64)?;
87            write_vint(writer, posting.term_freq as u64)?;
88            prev_doc_id = posting.doc_id;
89        }
90
91        Ok(())
92    }
93
94    /// Deserialize from bytes
95    pub fn deserialize<R: Read>(reader: &mut R) -> io::Result<Self> {
96        let count = read_vint(reader)? as usize;
97        let mut postings = Vec::with_capacity(count);
98
99        let mut prev_doc_id = 0u32;
100        for _ in 0..count {
101            let delta = read_vint(reader)? as u32;
102            let term_freq = read_vint(reader)? as u32;
103            let doc_id = prev_doc_id + delta;
104            postings.push(Posting { doc_id, term_freq });
105            prev_doc_id = doc_id;
106        }
107
108        Ok(Self { postings })
109    }
110}
111
112/// Iterator over posting list that supports seeking
113pub struct PostingListIterator<'a> {
114    postings: &'a [Posting],
115    position: usize,
116}
117
118impl<'a> PostingListIterator<'a> {
119    pub fn new(posting_list: &'a PostingList) -> Self {
120        Self {
121            postings: &posting_list.postings,
122            position: 0,
123        }
124    }
125
126    /// Current document ID, or TERMINATED if exhausted
127    pub fn doc(&self) -> DocId {
128        if self.position < self.postings.len() {
129            self.postings[self.position].doc_id
130        } else {
131            TERMINATED
132        }
133    }
134
135    /// Current term frequency
136    pub fn term_freq(&self) -> u32 {
137        if self.position < self.postings.len() {
138            self.postings[self.position].term_freq
139        } else {
140            0
141        }
142    }
143
144    /// Advance to next posting, returns new doc_id or TERMINATED
145    pub fn advance(&mut self) -> DocId {
146        self.position += 1;
147        self.doc()
148    }
149
150    /// Seek to first doc_id >= target (binary search on remaining postings)
151    pub fn seek(&mut self, target: DocId) -> DocId {
152        let remaining = &self.postings[self.position..];
153        let offset = remaining.partition_point(|p| p.doc_id < target);
154        self.position += offset;
155        self.doc()
156    }
157
158    /// Size hint for remaining elements
159    pub fn size_hint(&self) -> usize {
160        self.postings.len().saturating_sub(self.position)
161    }
162}
163
164/// Sentinel value indicating iterator is exhausted
165pub const TERMINATED: DocId = DocId::MAX;
166
167/// Block-based posting list with 2-level skip index.
168///
169/// Each block contains up to `BLOCK_SIZE` postings encoded as packed bit-width arrays.
170/// Skip entries use a compact 2-level structure for cache-friendly seeking:
171/// - **Level-0** (16 bytes/block): `first_doc`, `last_doc`, `offset`, `max_weight`
172/// - **Level-1** (4 bytes/group): `last_doc` per `L1_INTERVAL` blocks
173///
174/// Seek algorithm: binary search L1, then linear scan ≤`L1_INTERVAL` L0 entries.
175pub const BLOCK_SIZE: usize = 128;
176
177/// Number of L0 blocks per L1 skip entry.
178const L1_INTERVAL: usize = 8;
179
180/// Compact level-0 skip entry — 16 bytes.
181/// `length` is omitted: computable from the block's 8-byte header.
182const L0_SIZE: usize = 16;
183
184/// Level-1 skip entry — 4 bytes (just `last_doc`).
185const L1_SIZE: usize = 4;
186
187/// Footer: stream_len(8) + l0_count(4) + l1_count(4) + doc_count(4) + max_tf(4) = 24 bytes.
188const FOOTER_SIZE: usize = 24;
189
190/// Read a compact L0 entry from raw bytes at the given index.
191#[inline]
192fn read_l0(bytes: &[u8], idx: usize) -> (u32, u32, u32, f32) {
193    let p = idx * L0_SIZE;
194    let first_doc = u32::from_le_bytes(bytes[p..p + 4].try_into().unwrap());
195    let last_doc = u32::from_le_bytes(bytes[p + 4..p + 8].try_into().unwrap());
196    let offset = u32::from_le_bytes(bytes[p + 8..p + 12].try_into().unwrap());
197    let max_weight = f32::from_le_bytes(bytes[p + 12..p + 16].try_into().unwrap());
198    (first_doc, last_doc, offset, max_weight)
199}
200
201/// Write a compact L0 entry.
202#[inline]
203fn write_l0(buf: &mut Vec<u8>, first_doc: u32, last_doc: u32, offset: u32, max_weight: f32) {
204    buf.extend_from_slice(&first_doc.to_le_bytes());
205    buf.extend_from_slice(&last_doc.to_le_bytes());
206    buf.extend_from_slice(&offset.to_le_bytes());
207    buf.extend_from_slice(&max_weight.to_le_bytes());
208}
209
210/// Compute block data size from the 8-byte header at `stream[pos..]`.
211///
212/// Header: `[count: u16][first_doc: u32][doc_id_bits: u8][tf_bits: u8]`
213/// Data size = 8 + (count-1) × bytes_per_value(doc_id_bits) + count × bytes_per_value(tf_bits)
214#[inline]
215fn block_data_size(stream: &[u8], pos: usize) -> usize {
216    let count = u16::from_le_bytes(stream[pos..pos + 2].try_into().unwrap()) as usize;
217    let doc_rounded = simd::RoundedBitWidth::from_u8(stream[pos + 6]);
218    let tf_rounded = simd::RoundedBitWidth::from_u8(stream[pos + 7]);
219    let delta_bytes = if count > 1 {
220        (count - 1) * doc_rounded.bytes_per_value()
221    } else {
222        0
223    };
224    8 + delta_bytes + count * tf_rounded.bytes_per_value()
225}
226
227#[derive(Debug, Clone)]
228pub struct BlockPostingList {
229    /// Block data stream (packed blocks laid out sequentially).
230    stream: OwnedBytes,
231    /// Level-0 skip entries: `(first_doc, last_doc, offset, max_weight)` × `l0_count`.
232    /// 16 bytes per entry. Supports O(1) random access by block index.
233    l0_bytes: OwnedBytes,
234    /// Number of blocks (= number of L0 entries).
235    l0_count: usize,
236    /// Level-1 skip `last_doc` values — one per `L1_INTERVAL` blocks.
237    /// Stored as `Vec<u32>` for direct SIMD-accelerated `find_first_ge_u32`.
238    l1_docs: Vec<u32>,
239    /// Total posting count.
240    doc_count: u32,
241    /// Max TF across all blocks.
242    max_tf: u32,
243}
244
245impl BlockPostingList {
246    /// Read L0 entry by block index. Returns `(first_doc, last_doc, offset, max_weight)`.
247    #[inline]
248    fn read_l0_entry(&self, idx: usize) -> (u32, u32, u32, f32) {
249        read_l0(&self.l0_bytes, idx)
250    }
251
252    /// Build from a posting list.
253    ///
254    /// Block format (8-byte header + packed arrays):
255    /// ```text
256    /// [count: u16][first_doc: u32][doc_id_bits: u8][tf_bits: u8]
257    /// [packed doc_id deltas: (count-1) × bytes_per_value(doc_id_bits)]
258    /// [packed tfs: count × bytes_per_value(tf_bits)]
259    /// ```
260    pub fn from_posting_list(list: &PostingList) -> io::Result<Self> {
261        let mut stream: Vec<u8> = Vec::new();
262        let mut l0_buf: Vec<u8> = Vec::new();
263        let mut l1_docs: Vec<u32> = Vec::new();
264        let mut l0_count = 0usize;
265        let mut max_tf = 0u32;
266
267        let postings = &list.postings;
268        let mut i = 0;
269
270        // Temp buffers reused across blocks
271        let mut deltas = Vec::with_capacity(BLOCK_SIZE);
272        let mut tf_buf = Vec::with_capacity(BLOCK_SIZE);
273
274        while i < postings.len() {
275            if stream.len() > u32::MAX as usize {
276                return Err(io::Error::new(
277                    io::ErrorKind::InvalidData,
278                    "posting list stream exceeds u32::MAX bytes",
279                ));
280            }
281            let block_start = stream.len() as u32;
282            let block_end = (i + BLOCK_SIZE).min(postings.len());
283            let block = &postings[i..block_end];
284            let count = block.len();
285
286            // Compute block's max term frequency for block-max pruning
287            let block_max_tf = block.iter().map(|p| p.term_freq).max().unwrap_or(0);
288            max_tf = max_tf.max(block_max_tf);
289
290            let base_doc_id = block.first().unwrap().doc_id;
291            let last_doc_id = block.last().unwrap().doc_id;
292
293            // Delta-encode doc IDs (skip first — stored in header)
294            deltas.clear();
295            let mut prev = base_doc_id;
296            for posting in block.iter().skip(1) {
297                deltas.push(posting.doc_id - prev);
298                prev = posting.doc_id;
299            }
300            let max_delta = deltas.iter().copied().max().unwrap_or(0);
301            let doc_id_bits = simd::round_bit_width(simd::bits_needed(max_delta));
302
303            // Collect TFs
304            tf_buf.clear();
305            tf_buf.extend(block.iter().map(|p| p.term_freq));
306            let tf_bits = simd::round_bit_width(simd::bits_needed(block_max_tf));
307
308            // Write 8-byte header: [count: u16][first_doc: u32][doc_id_bits: u8][tf_bits: u8]
309            stream.write_u16::<LittleEndian>(count as u16)?;
310            stream.write_u32::<LittleEndian>(base_doc_id)?;
311            stream.push(doc_id_bits);
312            stream.push(tf_bits);
313
314            // Write packed doc_id deltas ((count-1) values)
315            if count > 1 {
316                let rounded = simd::RoundedBitWidth::from_u8(doc_id_bits);
317                let byte_count = (count - 1) * rounded.bytes_per_value();
318                let start = stream.len();
319                stream.resize(start + byte_count, 0);
320                simd::pack_rounded(&deltas, rounded, &mut stream[start..]);
321            }
322
323            // Write packed TFs (count values)
324            {
325                let rounded = simd::RoundedBitWidth::from_u8(tf_bits);
326                let byte_count = count * rounded.bytes_per_value();
327                let start = stream.len();
328                stream.resize(start + byte_count, 0);
329                simd::pack_rounded(&tf_buf, rounded, &mut stream[start..]);
330            }
331
332            // L0 skip entry
333            write_l0(
334                &mut l0_buf,
335                base_doc_id,
336                last_doc_id,
337                block_start,
338                block_max_tf as f32,
339            );
340            l0_count += 1;
341
342            // L1 entry at the end of each L1_INTERVAL group
343            if l0_count.is_multiple_of(L1_INTERVAL) {
344                l1_docs.push(last_doc_id);
345            }
346
347            i = block_end;
348        }
349
350        // Final L1 entry for partial group
351        if !l0_count.is_multiple_of(L1_INTERVAL) && l0_count > 0 {
352            let (_, last_doc, _, _) = read_l0(&l0_buf, l0_count - 1);
353            l1_docs.push(last_doc);
354        }
355
356        Ok(Self {
357            stream: OwnedBytes::new(stream),
358            l0_bytes: OwnedBytes::new(l0_buf),
359            l0_count,
360            l1_docs,
361            doc_count: postings.len() as u32,
362            max_tf,
363        })
364    }
365
366    /// Serialize the block posting list (footer-based: stream first).
367    ///
368    /// Format:
369    /// ```text
370    /// [stream: block data]
371    /// [L0 entries: l0_count × 16 bytes (first_doc, last_doc, offset, max_weight)]
372    /// [L1 entries: l1_count × 4 bytes (last_doc)]
373    /// [footer: stream_len(8) + l0_count(4) + l1_count(4) + doc_count(4) + max_tf(4) = 24 bytes]
374    /// ```
375    pub fn serialize<W: Write>(&self, writer: &mut W) -> io::Result<()> {
376        writer.write_all(&self.stream)?;
377        writer.write_all(&self.l0_bytes)?;
378        for &doc in &self.l1_docs {
379            writer.write_u32::<LittleEndian>(doc)?;
380        }
381
382        // Footer (24 bytes)
383        writer.write_u64::<LittleEndian>(self.stream.len() as u64)?;
384        writer.write_u32::<LittleEndian>(self.l0_count as u32)?;
385        writer.write_u32::<LittleEndian>(self.l1_docs.len() as u32)?;
386        writer.write_u32::<LittleEndian>(self.doc_count)?;
387        writer.write_u32::<LittleEndian>(self.max_tf)?;
388
389        Ok(())
390    }
391
392    /// Deserialize from a byte slice (footer-based format).
393    pub fn deserialize(raw: &[u8]) -> io::Result<Self> {
394        if raw.len() < FOOTER_SIZE {
395            return Err(io::Error::new(
396                io::ErrorKind::InvalidData,
397                "posting data too short",
398            ));
399        }
400
401        let f = raw.len() - FOOTER_SIZE;
402        let stream_len = u64::from_le_bytes(raw[f..f + 8].try_into().unwrap()) as usize;
403        let l0_count = u32::from_le_bytes(raw[f + 8..f + 12].try_into().unwrap()) as usize;
404        let l1_count = u32::from_le_bytes(raw[f + 12..f + 16].try_into().unwrap()) as usize;
405        let doc_count = u32::from_le_bytes(raw[f + 16..f + 20].try_into().unwrap());
406        let max_tf = u32::from_le_bytes(raw[f + 20..f + 24].try_into().unwrap());
407
408        let l0_start = stream_len;
409        let l0_end = l0_start + l0_count * L0_SIZE;
410        let l1_start = l0_end;
411
412        let l1_docs = Self::extract_l1_docs(&raw[l1_start..], l1_count);
413
414        Ok(Self {
415            stream: OwnedBytes::new(raw[..stream_len].to_vec()),
416            l0_bytes: OwnedBytes::new(raw[l0_start..l0_end].to_vec()),
417            l0_count,
418            l1_docs,
419            doc_count,
420            max_tf,
421        })
422    }
423
424    /// Zero-copy deserialization from OwnedBytes.
425    /// Stream and L0 are sliced from the source without copying.
426    /// L1 is extracted into a `Vec<u32>` for SIMD-friendly access (tiny: ≤ N/8 entries).
427    pub fn deserialize_zero_copy(raw: OwnedBytes) -> io::Result<Self> {
428        if raw.len() < FOOTER_SIZE {
429            return Err(io::Error::new(
430                io::ErrorKind::InvalidData,
431                "posting data too short",
432            ));
433        }
434
435        let f = raw.len() - FOOTER_SIZE;
436        let stream_len = u64::from_le_bytes(raw[f..f + 8].try_into().unwrap()) as usize;
437        let l0_count = u32::from_le_bytes(raw[f + 8..f + 12].try_into().unwrap()) as usize;
438        let l1_count = u32::from_le_bytes(raw[f + 12..f + 16].try_into().unwrap()) as usize;
439        let doc_count = u32::from_le_bytes(raw[f + 16..f + 20].try_into().unwrap());
440        let max_tf = u32::from_le_bytes(raw[f + 20..f + 24].try_into().unwrap());
441
442        let l0_start = stream_len;
443        let l0_end = l0_start + l0_count * L0_SIZE;
444        let l1_start = l0_end;
445
446        let l1_docs = Self::extract_l1_docs(&raw[l1_start..], l1_count);
447
448        Ok(Self {
449            stream: raw.slice(0..stream_len),
450            l0_bytes: raw.slice(l0_start..l0_end),
451            l0_count,
452            l1_docs,
453            doc_count,
454            max_tf,
455        })
456    }
457
458    /// Extract L1 last_doc values from raw LE bytes into a Vec<u32>.
459    fn extract_l1_docs(bytes: &[u8], count: usize) -> Vec<u32> {
460        let mut docs = Vec::with_capacity(count);
461        for i in 0..count {
462            let p = i * L1_SIZE;
463            docs.push(u32::from_le_bytes(bytes[p..p + 4].try_into().unwrap()));
464        }
465        docs
466    }
467
468    pub fn doc_count(&self) -> u32 {
469        self.doc_count
470    }
471
472    /// Get maximum term frequency (for MaxScore upper bound computation)
473    pub fn max_tf(&self) -> u32 {
474        self.max_tf
475    }
476
477    /// Get number of blocks
478    pub fn num_blocks(&self) -> usize {
479        self.l0_count
480    }
481
482    /// Get block's max term frequency for block-max pruning
483    pub fn block_max_tf(&self, block_idx: usize) -> Option<u32> {
484        if block_idx >= self.l0_count {
485            return None;
486        }
487        let (_, _, _, max_weight) = self.read_l0_entry(block_idx);
488        Some(max_weight as u32)
489    }
490
491    /// Concatenate blocks from multiple posting lists with doc_id remapping.
492    /// This is O(num_blocks) instead of O(num_postings).
493    pub fn concatenate_blocks(sources: &[(BlockPostingList, u32)]) -> io::Result<Self> {
494        let mut stream: Vec<u8> = Vec::new();
495        let mut l0_buf: Vec<u8> = Vec::new();
496        let mut l1_docs: Vec<u32> = Vec::new();
497        let mut l0_count = 0usize;
498        let mut total_docs = 0u32;
499        let mut max_tf = 0u32;
500
501        for (source, doc_offset) in sources {
502            max_tf = max_tf.max(source.max_tf);
503            for block_idx in 0..source.num_blocks() {
504                let (first_doc, last_doc, offset, max_weight) = source.read_l0_entry(block_idx);
505                let blk_size = block_data_size(&source.stream, offset as usize);
506                let block_bytes = &source.stream[offset as usize..offset as usize + blk_size];
507
508                let count = u16::from_le_bytes(block_bytes[0..2].try_into().unwrap());
509                if stream.len() > u32::MAX as usize {
510                    return Err(io::Error::new(
511                        io::ErrorKind::InvalidData,
512                        "posting list stream exceeds u32::MAX bytes during concatenation",
513                    ));
514                }
515                let new_offset = stream.len() as u32;
516
517                // Write patched header + copy packed arrays verbatim
518                stream.write_u16::<LittleEndian>(count)?;
519                stream.write_u32::<LittleEndian>(first_doc + doc_offset)?;
520                stream.extend_from_slice(&block_bytes[6..]);
521
522                let new_last = last_doc + doc_offset;
523                write_l0(
524                    &mut l0_buf,
525                    first_doc + doc_offset,
526                    new_last,
527                    new_offset,
528                    max_weight,
529                );
530                l0_count += 1;
531                total_docs += count as u32;
532
533                if l0_count.is_multiple_of(L1_INTERVAL) {
534                    l1_docs.push(new_last);
535                }
536            }
537        }
538
539        // Final L1 entry for partial group
540        if !l0_count.is_multiple_of(L1_INTERVAL) && l0_count > 0 {
541            let (_, last_doc, _, _) = read_l0(&l0_buf, l0_count - 1);
542            l1_docs.push(last_doc);
543        }
544
545        Ok(Self {
546            stream: OwnedBytes::new(stream),
547            l0_bytes: OwnedBytes::new(l0_buf),
548            l0_count,
549            l1_docs,
550            doc_count: total_docs,
551            max_tf,
552        })
553    }
554
555    /// Streaming merge: write blocks directly to output writer (bounded memory).
556    ///
557    /// **Zero-materializing**: reads L0 entries directly from source bytes
558    /// (mmap or &[u8]) without parsing into Vecs. Block sizes computed from
559    /// the 8-byte header (deterministic with packed encoding).
560    ///
561    /// Output L0 + L1 are buffered (bounded O(total_blocks × 16 + total_blocks/8 × 4)).
562    /// Block data flows source → output writer without intermediate buffering.
563    ///
564    /// Returns `(doc_count, bytes_written)`.
565    pub fn concatenate_streaming<W: Write>(
566        sources: &[(&[u8], u32)], // (serialized_bytes, doc_offset)
567        writer: &mut W,
568    ) -> io::Result<(u32, usize)> {
569        struct SourceMeta {
570            stream_len: usize,
571            l0_count: usize,
572        }
573
574        let mut metas: Vec<SourceMeta> = Vec::with_capacity(sources.len());
575        let mut total_docs = 0u32;
576        let mut merged_max_tf = 0u32;
577
578        for (raw, _) in sources {
579            if raw.len() < FOOTER_SIZE {
580                continue;
581            }
582            let f = raw.len() - FOOTER_SIZE;
583            let stream_len = u64::from_le_bytes(raw[f..f + 8].try_into().unwrap()) as usize;
584            let l0_count = u32::from_le_bytes(raw[f + 8..f + 12].try_into().unwrap()) as usize;
585            // l1_count not needed — we rebuild L1
586            let doc_count = u32::from_le_bytes(raw[f + 16..f + 20].try_into().unwrap());
587            let max_tf = u32::from_le_bytes(raw[f + 20..f + 24].try_into().unwrap());
588            total_docs += doc_count;
589            merged_max_tf = merged_max_tf.max(max_tf);
590            metas.push(SourceMeta {
591                stream_len,
592                l0_count,
593            });
594        }
595
596        // Phase 1: Stream block data, reading L0 entries on-the-fly.
597        // Accumulate output L0 + L1 (bounded).
598        let mut out_l0: Vec<u8> = Vec::new();
599        let mut out_l1_docs: Vec<u32> = Vec::new();
600        let mut out_l0_count = 0usize;
601        let mut stream_written = 0u64;
602        let mut patch_buf = [0u8; 8];
603
604        for (src_idx, meta) in metas.iter().enumerate() {
605            let (raw, doc_offset) = &sources[src_idx];
606            let l0_base = meta.stream_len; // L0 entries start right after stream
607            let src_stream = &raw[..meta.stream_len];
608
609            for i in 0..meta.l0_count {
610                // Read source L0 entry directly from raw bytes
611                let (first_doc, last_doc, offset, max_weight) = read_l0(&raw[l0_base..], i);
612
613                // Compute block size from header
614                let blk_size = block_data_size(src_stream, offset as usize);
615                let block = &src_stream[offset as usize..offset as usize + blk_size];
616
617                // Write output L0 entry
618                let new_last = last_doc + doc_offset;
619                if stream_written > u32::MAX as u64 {
620                    return Err(io::Error::new(
621                        io::ErrorKind::InvalidData,
622                        "posting list stream exceeds u32::MAX bytes during streaming merge",
623                    ));
624                }
625                write_l0(
626                    &mut out_l0,
627                    first_doc + doc_offset,
628                    new_last,
629                    stream_written as u32,
630                    max_weight,
631                );
632                out_l0_count += 1;
633
634                // L1 entry at group boundary
635                if out_l0_count.is_multiple_of(L1_INTERVAL) {
636                    out_l1_docs.push(new_last);
637                }
638
639                // Patch 8-byte header: [count: u16][first_doc: u32][bits: 2 bytes]
640                patch_buf.copy_from_slice(&block[0..8]);
641                let blk_first = u32::from_le_bytes(patch_buf[2..6].try_into().unwrap());
642                patch_buf[2..6].copy_from_slice(&(blk_first + doc_offset).to_le_bytes());
643                writer.write_all(&patch_buf)?;
644                writer.write_all(&block[8..])?;
645
646                stream_written += blk_size as u64;
647            }
648        }
649
650        // Final L1 entry for partial group
651        if !out_l0_count.is_multiple_of(L1_INTERVAL) && out_l0_count > 0 {
652            let (_, last_doc, _, _) = read_l0(&out_l0, out_l0_count - 1);
653            out_l1_docs.push(last_doc);
654        }
655
656        // Phase 2: Write L0 + L1 + footer
657        writer.write_all(&out_l0)?;
658        for &doc in &out_l1_docs {
659            writer.write_u32::<LittleEndian>(doc)?;
660        }
661
662        writer.write_u64::<LittleEndian>(stream_written)?;
663        writer.write_u32::<LittleEndian>(out_l0_count as u32)?;
664        writer.write_u32::<LittleEndian>(out_l1_docs.len() as u32)?;
665        writer.write_u32::<LittleEndian>(total_docs)?;
666        writer.write_u32::<LittleEndian>(merged_max_tf)?;
667
668        let l1_bytes_len = out_l1_docs.len() * L1_SIZE;
669        let total_bytes = stream_written as usize + out_l0.len() + l1_bytes_len + FOOTER_SIZE;
670        Ok((total_docs, total_bytes))
671    }
672
673    /// Decode a specific block into caller-provided buffers.
674    ///
675    /// Returns `true` if the block was decoded, `false` if `block_idx` is out of range.
676    /// Reuses `doc_ids` and `tfs` buffers (cleared before filling).
677    ///
678    /// Uses SIMD-accelerated unpack for 8/16/32-bit packed arrays.
679    pub fn decode_block_into(
680        &self,
681        block_idx: usize,
682        doc_ids: &mut Vec<u32>,
683        tfs: &mut Vec<u32>,
684    ) -> bool {
685        if block_idx >= self.l0_count {
686            return false;
687        }
688
689        let (_, _, offset, _) = self.read_l0_entry(block_idx);
690        let pos = offset as usize;
691        let blk_size = block_data_size(&self.stream, pos);
692        let block_data = &self.stream[pos..pos + blk_size];
693
694        // 8-byte header: [count: u16][first_doc: u32][doc_id_bits: u8][tf_bits: u8]
695        let count = u16::from_le_bytes(block_data[0..2].try_into().unwrap()) as usize;
696        let first_doc = u32::from_le_bytes(block_data[2..6].try_into().unwrap());
697        let doc_id_bits = block_data[6];
698        let tf_bits = block_data[7];
699
700        // Decode doc IDs: unpack deltas + prefix sum
701        doc_ids.clear();
702        doc_ids.resize(count, 0);
703        doc_ids[0] = first_doc;
704
705        let doc_rounded = simd::RoundedBitWidth::from_u8(doc_id_bits);
706        let deltas_bytes = if count > 1 {
707            (count - 1) * doc_rounded.bytes_per_value()
708        } else {
709            0
710        };
711
712        if count > 1 {
713            simd::unpack_rounded(
714                &block_data[8..8 + deltas_bytes],
715                doc_rounded,
716                &mut doc_ids[1..],
717                count - 1,
718            );
719            for i in 1..count {
720                doc_ids[i] += doc_ids[i - 1];
721            }
722        }
723
724        // Decode TFs
725        tfs.clear();
726        tfs.resize(count, 0);
727        let tf_rounded = simd::RoundedBitWidth::from_u8(tf_bits);
728        let tfs_start = 8 + deltas_bytes;
729        simd::unpack_rounded(
730            &block_data[tfs_start..tfs_start + count * tf_rounded.bytes_per_value()],
731            tf_rounded,
732            tfs,
733            count,
734        );
735
736        true
737    }
738
739    /// First doc_id of a block (from L0 skip entry). Returns `None` if out of range.
740    #[inline]
741    pub fn block_first_doc(&self, block_idx: usize) -> Option<DocId> {
742        if block_idx >= self.l0_count {
743            return None;
744        }
745        let (first_doc, _, _, _) = self.read_l0_entry(block_idx);
746        Some(first_doc)
747    }
748
749    /// Last doc_id of a block (from L0 skip entry). Returns `None` if out of range.
750    #[inline]
751    pub fn block_last_doc(&self, block_idx: usize) -> Option<DocId> {
752        if block_idx >= self.l0_count {
753            return None;
754        }
755        let (_, last_doc, _, _) = self.read_l0_entry(block_idx);
756        Some(last_doc)
757    }
758
759    /// Find the first block whose `last_doc >= target`, starting from `from_block`.
760    ///
761    /// Uses SIMD-accelerated linear scan:
762    /// 1. `find_first_ge_u32` on the contiguous L1 `last_doc` array
763    /// 2. Extract ≤`L1_INTERVAL` L0 `last_doc` values into a stack buffer → `find_first_ge_u32`
764    ///
765    /// Returns `None` if no block contains `target`.
766    pub fn seek_block(&self, target: DocId, from_block: usize) -> Option<usize> {
767        if from_block >= self.l0_count {
768            return None;
769        }
770
771        let from_l1 = from_block / L1_INTERVAL;
772
773        // SIMD scan L1 to find the group containing target
774        let l1_idx = if !self.l1_docs.is_empty() {
775            let idx = from_l1 + simd::find_first_ge_u32(&self.l1_docs[from_l1..], target);
776            if idx >= self.l1_docs.len() {
777                return None;
778            }
779            idx
780        } else {
781            return None;
782        };
783
784        // Extract L0 last_doc values within the group into a stack buffer for SIMD scan
785        let start = (l1_idx * L1_INTERVAL).max(from_block);
786        let end = ((l1_idx + 1) * L1_INTERVAL).min(self.l0_count);
787        let count = end - start;
788
789        let mut last_docs = [u32::MAX; L1_INTERVAL];
790        for (j, idx) in (start..end).enumerate() {
791            let (_, ld, _, _) = read_l0(&self.l0_bytes, idx);
792            last_docs[j] = ld;
793        }
794        let within = simd::find_first_ge_u32(&last_docs[..count], target);
795        let block_idx = start + within;
796
797        if block_idx < self.l0_count {
798            Some(block_idx)
799        } else {
800            None
801        }
802    }
803
804    /// Create an iterator with skip support
805    pub fn iterator(&self) -> BlockPostingIterator<'_> {
806        BlockPostingIterator::new(self)
807    }
808
809    /// Create an owned iterator that doesn't borrow self
810    pub fn into_iterator(self) -> BlockPostingIterator<'static> {
811        BlockPostingIterator::owned(self)
812    }
813}
814
815/// Iterator over block posting list with skip support
816/// Can be either borrowed or owned via Cow
817///
818/// Uses struct-of-arrays layout: separate Vec<u32> for doc_ids and term_freqs.
819/// This is more cache-friendly for SIMD seek (contiguous doc_ids) and halves
820/// memory vs the previous AoS + separate doc_ids approach.
821pub struct BlockPostingIterator<'a> {
822    block_list: std::borrow::Cow<'a, BlockPostingList>,
823    current_block: usize,
824    block_doc_ids: Vec<u32>,
825    block_tfs: Vec<u32>,
826    position_in_block: usize,
827    exhausted: bool,
828}
829
830impl<'a> BlockPostingIterator<'a> {
831    fn new(block_list: &'a BlockPostingList) -> Self {
832        let exhausted = block_list.l0_count == 0;
833        let mut iter = Self {
834            block_list: std::borrow::Cow::Borrowed(block_list),
835            current_block: 0,
836            block_doc_ids: Vec::with_capacity(BLOCK_SIZE),
837            block_tfs: Vec::with_capacity(BLOCK_SIZE),
838            position_in_block: 0,
839            exhausted,
840        };
841        if !iter.exhausted {
842            iter.load_block(0);
843        }
844        iter
845    }
846
847    fn owned(block_list: BlockPostingList) -> BlockPostingIterator<'static> {
848        let exhausted = block_list.l0_count == 0;
849        let mut iter = BlockPostingIterator {
850            block_list: std::borrow::Cow::Owned(block_list),
851            current_block: 0,
852            block_doc_ids: Vec::with_capacity(BLOCK_SIZE),
853            block_tfs: Vec::with_capacity(BLOCK_SIZE),
854            position_in_block: 0,
855            exhausted,
856        };
857        if !iter.exhausted {
858            iter.load_block(0);
859        }
860        iter
861    }
862
863    fn load_block(&mut self, block_idx: usize) {
864        if block_idx >= self.block_list.l0_count {
865            self.exhausted = true;
866            return;
867        }
868
869        self.current_block = block_idx;
870        self.position_in_block = 0;
871
872        self.block_list
873            .decode_block_into(block_idx, &mut self.block_doc_ids, &mut self.block_tfs);
874    }
875
876    pub fn doc(&self) -> DocId {
877        if self.exhausted {
878            TERMINATED
879        } else if self.position_in_block < self.block_doc_ids.len() {
880            self.block_doc_ids[self.position_in_block]
881        } else {
882            TERMINATED
883        }
884    }
885
886    pub fn term_freq(&self) -> u32 {
887        if self.exhausted || self.position_in_block >= self.block_tfs.len() {
888            0
889        } else {
890            self.block_tfs[self.position_in_block]
891        }
892    }
893
894    pub fn advance(&mut self) -> DocId {
895        if self.exhausted {
896            return TERMINATED;
897        }
898
899        self.position_in_block += 1;
900        if self.position_in_block >= self.block_doc_ids.len() {
901            self.load_block(self.current_block + 1);
902        }
903        self.doc()
904    }
905
906    pub fn seek(&mut self, target: DocId) -> DocId {
907        if self.exhausted {
908            return TERMINATED;
909        }
910
911        // SIMD-accelerated 2-level seek (forward from current block)
912        let block_idx = match self.block_list.seek_block(target, self.current_block) {
913            Some(idx) => idx,
914            None => {
915                self.exhausted = true;
916                return TERMINATED;
917            }
918        };
919
920        if block_idx != self.current_block {
921            self.load_block(block_idx);
922        }
923
924        // SIMD linear scan within block on cached doc_ids
925        let remaining = &self.block_doc_ids[self.position_in_block..];
926        let pos = crate::structures::simd::find_first_ge_u32(remaining, target);
927        self.position_in_block += pos;
928
929        if self.position_in_block >= self.block_doc_ids.len() {
930            self.load_block(self.current_block + 1);
931        }
932        self.doc()
933    }
934
935    /// Skip to the next block, returning the first doc_id in the new block
936    /// This is used for block-max pruning when the current block's
937    /// max score can't beat the threshold.
938    pub fn skip_to_next_block(&mut self) -> DocId {
939        if self.exhausted {
940            return TERMINATED;
941        }
942        self.load_block(self.current_block + 1);
943        self.doc()
944    }
945
946    /// Get the current block index
947    #[inline]
948    pub fn current_block_idx(&self) -> usize {
949        self.current_block
950    }
951
952    /// Get total number of blocks
953    #[inline]
954    pub fn num_blocks(&self) -> usize {
955        self.block_list.l0_count
956    }
957
958    /// Get the current block's max term frequency for block-max pruning
959    #[inline]
960    pub fn current_block_max_tf(&self) -> u32 {
961        if self.exhausted || self.current_block >= self.block_list.l0_count {
962            0
963        } else {
964            let (_, _, _, max_weight) = self.block_list.read_l0_entry(self.current_block);
965            max_weight as u32
966        }
967    }
968}
969
970#[cfg(test)]
971mod tests {
972    use super::*;
973
974    #[test]
975    fn test_posting_list_basic() {
976        let mut list = PostingList::new();
977        list.push(1, 2);
978        list.push(5, 1);
979        list.push(10, 3);
980
981        assert_eq!(list.len(), 3);
982
983        let mut iter = PostingListIterator::new(&list);
984        assert_eq!(iter.doc(), 1);
985        assert_eq!(iter.term_freq(), 2);
986
987        assert_eq!(iter.advance(), 5);
988        assert_eq!(iter.term_freq(), 1);
989
990        assert_eq!(iter.advance(), 10);
991        assert_eq!(iter.term_freq(), 3);
992
993        assert_eq!(iter.advance(), TERMINATED);
994    }
995
996    #[test]
997    fn test_posting_list_serialization() {
998        let mut list = PostingList::new();
999        for i in 0..100 {
1000            list.push(i * 3, (i % 5) + 1);
1001        }
1002
1003        let mut buffer = Vec::new();
1004        list.serialize(&mut buffer).unwrap();
1005
1006        let deserialized = PostingList::deserialize(&mut &buffer[..]).unwrap();
1007        assert_eq!(deserialized.len(), list.len());
1008
1009        for (a, b) in list.iter().zip(deserialized.iter()) {
1010            assert_eq!(a, b);
1011        }
1012    }
1013
1014    #[test]
1015    fn test_posting_list_seek() {
1016        let mut list = PostingList::new();
1017        for i in 0..100 {
1018            list.push(i * 2, 1);
1019        }
1020
1021        let mut iter = PostingListIterator::new(&list);
1022
1023        assert_eq!(iter.seek(50), 50);
1024        assert_eq!(iter.seek(51), 52);
1025        assert_eq!(iter.seek(200), TERMINATED);
1026    }
1027
1028    #[test]
1029    fn test_block_posting_list() {
1030        let mut list = PostingList::new();
1031        for i in 0..500 {
1032            list.push(i * 2, (i % 10) + 1);
1033        }
1034
1035        let block_list = BlockPostingList::from_posting_list(&list).unwrap();
1036        assert_eq!(block_list.doc_count(), 500);
1037
1038        let mut iter = block_list.iterator();
1039        assert_eq!(iter.doc(), 0);
1040        assert_eq!(iter.term_freq(), 1);
1041
1042        // Test seek across blocks
1043        assert_eq!(iter.seek(500), 500);
1044        assert_eq!(iter.seek(998), 998);
1045        assert_eq!(iter.seek(1000), TERMINATED);
1046    }
1047
1048    #[test]
1049    fn test_block_posting_list_serialization() {
1050        let mut list = PostingList::new();
1051        for i in 0..300 {
1052            list.push(i * 3, i + 1);
1053        }
1054
1055        let block_list = BlockPostingList::from_posting_list(&list).unwrap();
1056
1057        let mut buffer = Vec::new();
1058        block_list.serialize(&mut buffer).unwrap();
1059
1060        let deserialized = BlockPostingList::deserialize(&buffer[..]).unwrap();
1061        assert_eq!(deserialized.doc_count(), block_list.doc_count());
1062
1063        // Verify iteration produces same results
1064        let mut iter1 = block_list.iterator();
1065        let mut iter2 = deserialized.iterator();
1066
1067        while iter1.doc() != TERMINATED {
1068            assert_eq!(iter1.doc(), iter2.doc());
1069            assert_eq!(iter1.term_freq(), iter2.term_freq());
1070            iter1.advance();
1071            iter2.advance();
1072        }
1073        assert_eq!(iter2.doc(), TERMINATED);
1074    }
1075
1076    /// Helper: collect all (doc_id, tf) from a BlockPostingIterator
1077    fn collect_postings(bpl: &BlockPostingList) -> Vec<(u32, u32)> {
1078        let mut result = Vec::new();
1079        let mut it = bpl.iterator();
1080        while it.doc() != TERMINATED {
1081            result.push((it.doc(), it.term_freq()));
1082            it.advance();
1083        }
1084        result
1085    }
1086
1087    /// Helper: build a BlockPostingList from (doc_id, tf) pairs
1088    fn build_bpl(postings: &[(u32, u32)]) -> BlockPostingList {
1089        let mut pl = PostingList::new();
1090        for &(doc_id, tf) in postings {
1091            pl.push(doc_id, tf);
1092        }
1093        BlockPostingList::from_posting_list(&pl).unwrap()
1094    }
1095
1096    /// Helper: serialize a BlockPostingList to bytes
1097    fn serialize_bpl(bpl: &BlockPostingList) -> Vec<u8> {
1098        let mut buf = Vec::new();
1099        bpl.serialize(&mut buf).unwrap();
1100        buf
1101    }
1102
1103    #[test]
1104    fn test_concatenate_blocks_two_segments() {
1105        // Segment A: docs 0,2,4,...,198 (100 docs, tf=1..100)
1106        let a: Vec<(u32, u32)> = (0..100).map(|i| (i * 2, i + 1)).collect();
1107        let bpl_a = build_bpl(&a);
1108
1109        // Segment B: docs 0,3,6,...,297 (100 docs, tf=2..101)
1110        let b: Vec<(u32, u32)> = (0..100).map(|i| (i * 3, i + 2)).collect();
1111        let bpl_b = build_bpl(&b);
1112
1113        // Merge: segment B starts at doc_offset=200
1114        let merged =
1115            BlockPostingList::concatenate_blocks(&[(bpl_a.clone(), 0), (bpl_b.clone(), 200)])
1116                .unwrap();
1117
1118        assert_eq!(merged.doc_count(), 200);
1119
1120        let postings = collect_postings(&merged);
1121        assert_eq!(postings.len(), 200);
1122
1123        // First 100 from A (unchanged)
1124        for (i, p) in postings.iter().enumerate().take(100) {
1125            assert_eq!(*p, (i as u32 * 2, i as u32 + 1));
1126        }
1127        // Next 100 from B (doc_id += 200)
1128        for i in 0..100 {
1129            assert_eq!(postings[100 + i], (i as u32 * 3 + 200, i as u32 + 2));
1130        }
1131    }
1132
1133    #[test]
1134    fn test_concatenate_streaming_matches_blocks() {
1135        // Build 3 segments with different doc distributions
1136        let seg_a: Vec<(u32, u32)> = (0..250).map(|i| (i * 2, (i % 7) + 1)).collect();
1137        let seg_b: Vec<(u32, u32)> = (0..180).map(|i| (i * 5, (i % 3) + 1)).collect();
1138        let seg_c: Vec<(u32, u32)> = (0..90).map(|i| (i * 10, (i % 11) + 1)).collect();
1139
1140        let bpl_a = build_bpl(&seg_a);
1141        let bpl_b = build_bpl(&seg_b);
1142        let bpl_c = build_bpl(&seg_c);
1143
1144        let offset_b = 1000u32;
1145        let offset_c = 2000u32;
1146
1147        // Method 1: concatenate_blocks (in-memory reference)
1148        let ref_merged = BlockPostingList::concatenate_blocks(&[
1149            (bpl_a.clone(), 0),
1150            (bpl_b.clone(), offset_b),
1151            (bpl_c.clone(), offset_c),
1152        ])
1153        .unwrap();
1154        let mut ref_buf = Vec::new();
1155        ref_merged.serialize(&mut ref_buf).unwrap();
1156
1157        // Method 2: concatenate_streaming (footer-based, writes to output)
1158        let bytes_a = serialize_bpl(&bpl_a);
1159        let bytes_b = serialize_bpl(&bpl_b);
1160        let bytes_c = serialize_bpl(&bpl_c);
1161
1162        let sources: Vec<(&[u8], u32)> =
1163            vec![(&bytes_a, 0), (&bytes_b, offset_b), (&bytes_c, offset_c)];
1164        let mut stream_buf = Vec::new();
1165        let (doc_count, bytes_written) =
1166            BlockPostingList::concatenate_streaming(&sources, &mut stream_buf).unwrap();
1167
1168        assert_eq!(doc_count, 520); // 250 + 180 + 90
1169        assert_eq!(bytes_written, stream_buf.len());
1170
1171        // Deserialize both and verify identical postings
1172        let ref_postings = collect_postings(&BlockPostingList::deserialize(&ref_buf).unwrap());
1173        let stream_postings =
1174            collect_postings(&BlockPostingList::deserialize(&stream_buf).unwrap());
1175
1176        assert_eq!(ref_postings.len(), stream_postings.len());
1177        for (i, (r, s)) in ref_postings.iter().zip(stream_postings.iter()).enumerate() {
1178            assert_eq!(r, s, "mismatch at posting {}", i);
1179        }
1180    }
1181
1182    #[test]
1183    fn test_multi_round_merge() {
1184        // Simulate 3 rounds of merging (like tiered merge policy)
1185        //
1186        // Round 0: 4 small segments built independently
1187        // Round 1: merge pairs → 2 medium segments
1188        // Round 2: merge those → 1 large segment
1189
1190        let segments: Vec<Vec<(u32, u32)>> = (0..4)
1191            .map(|seg| (0..200).map(|i| (i * 3, (i + seg * 7) % 10 + 1)).collect())
1192            .collect();
1193
1194        let bpls: Vec<BlockPostingList> = segments.iter().map(|s| build_bpl(s)).collect();
1195        let serialized: Vec<Vec<u8>> = bpls.iter().map(serialize_bpl).collect();
1196
1197        // Round 1: merge seg0+seg1 (offset=0,600), seg2+seg3 (offset=0,600)
1198        let mut merged_01 = Vec::new();
1199        let sources_01: Vec<(&[u8], u32)> = vec![(&serialized[0], 0), (&serialized[1], 600)];
1200        let (dc_01, _) =
1201            BlockPostingList::concatenate_streaming(&sources_01, &mut merged_01).unwrap();
1202        assert_eq!(dc_01, 400);
1203
1204        let mut merged_23 = Vec::new();
1205        let sources_23: Vec<(&[u8], u32)> = vec![(&serialized[2], 0), (&serialized[3], 600)];
1206        let (dc_23, _) =
1207            BlockPostingList::concatenate_streaming(&sources_23, &mut merged_23).unwrap();
1208        assert_eq!(dc_23, 400);
1209
1210        // Round 2: merge the two intermediate results (offset=0, 1200)
1211        let mut final_merged = Vec::new();
1212        let sources_final: Vec<(&[u8], u32)> = vec![(&merged_01, 0), (&merged_23, 1200)];
1213        let (dc_final, _) =
1214            BlockPostingList::concatenate_streaming(&sources_final, &mut final_merged).unwrap();
1215        assert_eq!(dc_final, 800);
1216
1217        // Verify final result has all 800 postings with correct doc_ids
1218        let final_bpl = BlockPostingList::deserialize(&final_merged).unwrap();
1219        let postings = collect_postings(&final_bpl);
1220        assert_eq!(postings.len(), 800);
1221
1222        // Verify doc_id ordering (must be monotonically non-decreasing within segments,
1223        // and segment boundaries at 0, 600, 1200, 1800)
1224        // Seg0: 0..597, Seg1: 600..1197, Seg2: 1200..1797, Seg3: 1800..2397
1225        assert_eq!(postings[0].0, 0); // first doc of seg0
1226        assert_eq!(postings[199].0, 597); // last doc of seg0 (199*3)
1227        assert_eq!(postings[200].0, 600); // first doc of seg1 (0+600)
1228        assert_eq!(postings[399].0, 1197); // last doc of seg1 (597+600)
1229        assert_eq!(postings[400].0, 1200); // first doc of seg2
1230        assert_eq!(postings[799].0, 2397); // last doc of seg3
1231
1232        // Verify TFs preserved through two rounds of merging
1233        // Creation formula: tf = (i + seg * 7) % 10 + 1
1234        for seg in 0u32..4 {
1235            for i in 0u32..200 {
1236                let idx = (seg * 200 + i) as usize;
1237                assert_eq!(
1238                    postings[idx].1,
1239                    (i + seg * 7) % 10 + 1,
1240                    "seg{} tf[{}]",
1241                    seg,
1242                    i
1243                );
1244            }
1245        }
1246
1247        // Verify seek works on final merged result
1248        let mut it = final_bpl.iterator();
1249        assert_eq!(it.seek(600), 600);
1250        assert_eq!(it.seek(1200), 1200);
1251        assert_eq!(it.seek(2397), 2397);
1252        assert_eq!(it.seek(2398), TERMINATED);
1253    }
1254
1255    #[test]
1256    fn test_large_scale_merge() {
1257        // 5 segments × 2000 docs each = 10,000 total docs
1258        // Each segment has 16 blocks (2000/128 = 15.6 → 16 blocks)
1259        let num_segments = 5;
1260        let docs_per_segment = 2000;
1261        let docs_gap = 3; // doc_ids: 0, 3, 6, ...
1262
1263        let segments: Vec<Vec<(u32, u32)>> = (0..num_segments)
1264            .map(|seg| {
1265                (0..docs_per_segment)
1266                    .map(|i| (i as u32 * docs_gap, (i as u32 + seg as u32) % 20 + 1))
1267                    .collect()
1268            })
1269            .collect();
1270
1271        let bpls: Vec<BlockPostingList> = segments.iter().map(|s| build_bpl(s)).collect();
1272
1273        // Verify each segment has multiple blocks
1274        for bpl in &bpls {
1275            assert!(
1276                bpl.num_blocks() >= 15,
1277                "expected >=15 blocks, got {}",
1278                bpl.num_blocks()
1279            );
1280        }
1281
1282        let serialized: Vec<Vec<u8>> = bpls.iter().map(serialize_bpl).collect();
1283
1284        // Compute offsets: each segment occupies max_doc+1 doc_id space
1285        let max_doc_per_seg = (docs_per_segment as u32 - 1) * docs_gap;
1286        let offsets: Vec<u32> = (0..num_segments)
1287            .map(|i| i as u32 * (max_doc_per_seg + 1))
1288            .collect();
1289
1290        let sources: Vec<(&[u8], u32)> = serialized
1291            .iter()
1292            .zip(offsets.iter())
1293            .map(|(b, o)| (b.as_slice(), *o))
1294            .collect();
1295
1296        let mut merged = Vec::new();
1297        let (doc_count, _) =
1298            BlockPostingList::concatenate_streaming(&sources, &mut merged).unwrap();
1299        assert_eq!(doc_count, (num_segments * docs_per_segment) as u32);
1300
1301        // Deserialize and verify
1302        let merged_bpl = BlockPostingList::deserialize(&merged).unwrap();
1303        let postings = collect_postings(&merged_bpl);
1304        assert_eq!(postings.len(), num_segments * docs_per_segment);
1305
1306        // Verify all doc_ids are strictly monotonically increasing across segment boundaries
1307        for i in 1..postings.len() {
1308            assert!(
1309                postings[i].0 > postings[i - 1].0 || (i % docs_per_segment == 0), // new segment can have lower absolute ID
1310                "doc_id not increasing at {}: {} vs {}",
1311                i,
1312                postings[i - 1].0,
1313                postings[i].0,
1314            );
1315        }
1316
1317        // Verify seek across all block boundaries
1318        let mut it = merged_bpl.iterator();
1319        for (seg, &expected_first) in offsets.iter().enumerate() {
1320            assert_eq!(
1321                it.seek(expected_first),
1322                expected_first,
1323                "seek to segment {} start",
1324                seg
1325            );
1326        }
1327    }
1328
1329    #[test]
1330    fn test_merge_edge_cases() {
1331        // Single doc per segment
1332        let bpl_a = build_bpl(&[(0, 5)]);
1333        let bpl_b = build_bpl(&[(0, 3)]);
1334
1335        let merged =
1336            BlockPostingList::concatenate_blocks(&[(bpl_a.clone(), 0), (bpl_b.clone(), 1)])
1337                .unwrap();
1338        assert_eq!(merged.doc_count(), 2);
1339        let p = collect_postings(&merged);
1340        assert_eq!(p, vec![(0, 5), (1, 3)]);
1341
1342        // Exactly BLOCK_SIZE docs (single full block)
1343        let exact_block: Vec<(u32, u32)> = (0..BLOCK_SIZE as u32).map(|i| (i, i % 5 + 1)).collect();
1344        let bpl_exact = build_bpl(&exact_block);
1345        assert_eq!(bpl_exact.num_blocks(), 1);
1346
1347        let bytes = serialize_bpl(&bpl_exact);
1348        let mut out = Vec::new();
1349        let sources: Vec<(&[u8], u32)> = vec![(&bytes, 0), (&bytes, BLOCK_SIZE as u32)];
1350        let (dc, _) = BlockPostingList::concatenate_streaming(&sources, &mut out).unwrap();
1351        assert_eq!(dc, BLOCK_SIZE as u32 * 2);
1352
1353        let merged = BlockPostingList::deserialize(&out).unwrap();
1354        let postings = collect_postings(&merged);
1355        assert_eq!(postings.len(), BLOCK_SIZE * 2);
1356        // Second segment's docs offset by BLOCK_SIZE
1357        assert_eq!(postings[BLOCK_SIZE].0, BLOCK_SIZE as u32);
1358
1359        // BLOCK_SIZE + 1 docs (two blocks: 128 + 1)
1360        let over_block: Vec<(u32, u32)> = (0..BLOCK_SIZE as u32 + 1).map(|i| (i * 2, 1)).collect();
1361        let bpl_over = build_bpl(&over_block);
1362        assert_eq!(bpl_over.num_blocks(), 2);
1363    }
1364
1365    #[test]
1366    fn test_streaming_roundtrip_single_source() {
1367        // Streaming merge with a single source should produce equivalent output to serialize
1368        let docs: Vec<(u32, u32)> = (0..500).map(|i| (i * 7, i % 15 + 1)).collect();
1369        let bpl = build_bpl(&docs);
1370        let direct = serialize_bpl(&bpl);
1371
1372        let sources: Vec<(&[u8], u32)> = vec![(&direct, 0)];
1373        let mut streamed = Vec::new();
1374        BlockPostingList::concatenate_streaming(&sources, &mut streamed).unwrap();
1375
1376        // Both should deserialize to identical postings
1377        let p1 = collect_postings(&BlockPostingList::deserialize(&direct).unwrap());
1378        let p2 = collect_postings(&BlockPostingList::deserialize(&streamed).unwrap());
1379        assert_eq!(p1, p2);
1380    }
1381
1382    #[test]
1383    fn test_max_tf_preserved_through_merge() {
1384        // Segment A: max_tf = 50
1385        let mut a = Vec::new();
1386        for i in 0..200 {
1387            a.push((i * 2, if i == 100 { 50 } else { 1 }));
1388        }
1389        let bpl_a = build_bpl(&a);
1390        assert_eq!(bpl_a.max_tf(), 50);
1391
1392        // Segment B: max_tf = 30
1393        let mut b = Vec::new();
1394        for i in 0..200 {
1395            b.push((i * 2, if i == 50 { 30 } else { 2 }));
1396        }
1397        let bpl_b = build_bpl(&b);
1398        assert_eq!(bpl_b.max_tf(), 30);
1399
1400        // After merge, max_tf should be max(50, 30) = 50
1401        let bytes_a = serialize_bpl(&bpl_a);
1402        let bytes_b = serialize_bpl(&bpl_b);
1403        let sources: Vec<(&[u8], u32)> = vec![(&bytes_a, 0), (&bytes_b, 1000)];
1404        let mut out = Vec::new();
1405        BlockPostingList::concatenate_streaming(&sources, &mut out).unwrap();
1406
1407        let merged = BlockPostingList::deserialize(&out).unwrap();
1408        assert_eq!(merged.max_tf(), 50);
1409        assert_eq!(merged.doc_count(), 400);
1410    }
1411
1412    // ── 2-level skip list format tests ──────────────────────────────────
1413
1414    #[test]
1415    fn test_l0_l1_counts() {
1416        // 1 block (< L1_INTERVAL) → 1 L1 entry (partial group)
1417        let bpl = build_bpl(&(0..50u32).map(|i| (i, 1)).collect::<Vec<_>>());
1418        assert_eq!(bpl.num_blocks(), 1);
1419        assert_eq!(bpl.l1_docs.len(), 1);
1420
1421        // Exactly L1_INTERVAL blocks → 1 L1 entry (full group)
1422        let n = BLOCK_SIZE * L1_INTERVAL;
1423        let bpl = build_bpl(&(0..n as u32).map(|i| (i * 2, 1)).collect::<Vec<_>>());
1424        assert_eq!(bpl.num_blocks(), L1_INTERVAL);
1425        assert_eq!(bpl.l1_docs.len(), 1);
1426
1427        // L1_INTERVAL + 1 blocks → 2 L1 entries
1428        let n = BLOCK_SIZE * L1_INTERVAL + 1;
1429        let bpl = build_bpl(&(0..n as u32).map(|i| (i * 2, 1)).collect::<Vec<_>>());
1430        assert_eq!(bpl.num_blocks(), L1_INTERVAL + 1);
1431        assert_eq!(bpl.l1_docs.len(), 2);
1432
1433        // 3 × L1_INTERVAL blocks → 3 L1 entries (all full groups)
1434        let n = BLOCK_SIZE * L1_INTERVAL * 3;
1435        let bpl = build_bpl(&(0..n as u32).map(|i| (i, 1)).collect::<Vec<_>>());
1436        assert_eq!(bpl.num_blocks(), L1_INTERVAL * 3);
1437        assert_eq!(bpl.l1_docs.len(), 3);
1438    }
1439
1440    #[test]
1441    fn test_l1_last_doc_values() {
1442        // 20 blocks: 2 full L1 groups (8+8) + 1 partial (4) → 3 L1 entries
1443        let n = BLOCK_SIZE * 20;
1444        let docs: Vec<(u32, u32)> = (0..n as u32).map(|i| (i * 3, 1)).collect();
1445        let bpl = build_bpl(&docs);
1446        assert_eq!(bpl.num_blocks(), 20);
1447        assert_eq!(bpl.l1_docs.len(), 3); // ceil(20/8) = 3
1448
1449        // L1[0] = last_doc of block 7 (end of first group)
1450        let expected_l1_0 = bpl.block_last_doc(7).unwrap();
1451        assert_eq!(bpl.l1_docs[0], expected_l1_0);
1452
1453        // L1[1] = last_doc of block 15 (end of second group)
1454        let expected_l1_1 = bpl.block_last_doc(15).unwrap();
1455        assert_eq!(bpl.l1_docs[1], expected_l1_1);
1456
1457        // L1[2] = last_doc of block 19 (end of partial group)
1458        let expected_l1_2 = bpl.block_last_doc(19).unwrap();
1459        assert_eq!(bpl.l1_docs[2], expected_l1_2);
1460    }
1461
1462    #[test]
1463    fn test_seek_block_basic() {
1464        // 20 blocks spanning large doc ID range
1465        let n = BLOCK_SIZE * 20;
1466        let docs: Vec<(u32, u32)> = (0..n as u32).map(|i| (i * 10, 1)).collect();
1467        let bpl = build_bpl(&docs);
1468
1469        // Seek to doc 0 → block 0
1470        assert_eq!(bpl.seek_block(0, 0), Some(0));
1471
1472        // Seek to the first doc of each block
1473        for blk in 0..20 {
1474            let first = bpl.block_first_doc(blk).unwrap();
1475            assert_eq!(
1476                bpl.seek_block(first, 0),
1477                Some(blk),
1478                "seek to block {} first_doc",
1479                blk
1480            );
1481        }
1482
1483        // Seek to the last doc of each block
1484        for blk in 0..20 {
1485            let last = bpl.block_last_doc(blk).unwrap();
1486            assert_eq!(
1487                bpl.seek_block(last, 0),
1488                Some(blk),
1489                "seek to block {} last_doc",
1490                blk
1491            );
1492        }
1493
1494        // Seek past all docs
1495        let max_doc = bpl.block_last_doc(19).unwrap();
1496        assert_eq!(bpl.seek_block(max_doc + 1, 0), None);
1497
1498        // Seek with from_block > 0 (skip early blocks)
1499        let mid_doc = bpl.block_first_doc(10).unwrap();
1500        assert_eq!(bpl.seek_block(mid_doc, 10), Some(10));
1501        assert_eq!(
1502            bpl.seek_block(mid_doc, 11),
1503            Some(11).or(bpl.seek_block(mid_doc, 11))
1504        );
1505    }
1506
1507    #[test]
1508    fn test_seek_block_across_l1_boundaries() {
1509        // 24 blocks = 3 L1 groups of 8
1510        let n = BLOCK_SIZE * 24;
1511        let docs: Vec<(u32, u32)> = (0..n as u32).map(|i| (i * 5, 1)).collect();
1512        let bpl = build_bpl(&docs);
1513        assert_eq!(bpl.l1_docs.len(), 3);
1514
1515        // Seek into each L1 group
1516        for group in 0..3 {
1517            let blk = group * L1_INTERVAL;
1518            let target = bpl.block_first_doc(blk).unwrap();
1519            assert_eq!(
1520                bpl.seek_block(target, 0),
1521                Some(blk),
1522                "seek to group {} block {}",
1523                group,
1524                blk
1525            );
1526        }
1527
1528        // Seek to doc in the middle of group 2 (block 20)
1529        let target = bpl.block_first_doc(20).unwrap() + 1;
1530        assert_eq!(bpl.seek_block(target, 0), Some(20));
1531    }
1532
1533    #[test]
1534    fn test_block_data_size_helper() {
1535        // Build a posting list and verify block_data_size matches actual block sizes
1536        let docs: Vec<(u32, u32)> = (0..500u32).map(|i| (i * 7, (i % 20) + 1)).collect();
1537        let bpl = build_bpl(&docs);
1538
1539        for blk in 0..bpl.num_blocks() {
1540            let (_, _, offset, _) = bpl.read_l0_entry(blk);
1541            let computed_size = block_data_size(&bpl.stream, offset as usize);
1542
1543            // Verify: next block's offset - this block's offset should equal computed_size
1544            // (for all but last block)
1545            if blk + 1 < bpl.num_blocks() {
1546                let (_, _, next_offset, _) = bpl.read_l0_entry(blk + 1);
1547                assert_eq!(
1548                    computed_size,
1549                    (next_offset - offset) as usize,
1550                    "block_data_size mismatch at block {}",
1551                    blk
1552                );
1553            } else {
1554                // Last block: offset + size should equal stream length
1555                assert_eq!(
1556                    offset as usize + computed_size,
1557                    bpl.stream.len(),
1558                    "last block size mismatch"
1559                );
1560            }
1561        }
1562    }
1563
1564    #[test]
1565    fn test_l0_entry_roundtrip() {
1566        // Verify L0 entries survive serialize → deserialize
1567        let docs: Vec<(u32, u32)> = (0..1000u32).map(|i| (i * 3, (i % 10) + 1)).collect();
1568        let bpl = build_bpl(&docs);
1569
1570        let bytes = serialize_bpl(&bpl);
1571        let bpl2 = BlockPostingList::deserialize(&bytes).unwrap();
1572
1573        assert_eq!(bpl.num_blocks(), bpl2.num_blocks());
1574        for blk in 0..bpl.num_blocks() {
1575            assert_eq!(
1576                bpl.read_l0_entry(blk),
1577                bpl2.read_l0_entry(blk),
1578                "L0 entry mismatch at block {}",
1579                blk
1580            );
1581        }
1582
1583        // Verify L1 docs match
1584        assert_eq!(bpl.l1_docs, bpl2.l1_docs);
1585    }
1586
1587    #[test]
1588    fn test_zero_copy_deserialize_matches() {
1589        let docs: Vec<(u32, u32)> = (0..2000u32).map(|i| (i * 2, (i % 5) + 1)).collect();
1590        let bpl = build_bpl(&docs);
1591        let bytes = serialize_bpl(&bpl);
1592
1593        let copied = BlockPostingList::deserialize(&bytes).unwrap();
1594        let zero_copy =
1595            BlockPostingList::deserialize_zero_copy(OwnedBytes::new(bytes.clone())).unwrap();
1596
1597        // Same structure
1598        assert_eq!(copied.l0_count, zero_copy.l0_count);
1599        assert_eq!(copied.l1_docs, zero_copy.l1_docs);
1600        assert_eq!(copied.doc_count, zero_copy.doc_count);
1601        assert_eq!(copied.max_tf, zero_copy.max_tf);
1602
1603        // Same iteration
1604        let p1 = collect_postings(&copied);
1605        let p2 = collect_postings(&zero_copy);
1606        assert_eq!(p1, p2);
1607    }
1608
1609    #[test]
1610    fn test_l1_preserved_through_streaming_merge() {
1611        // Merge 3 segments, verify L1 is correctly rebuilt
1612        let seg_a = build_bpl(&(0..1000u32).map(|i| (i * 2, 1)).collect::<Vec<_>>());
1613        let seg_b = build_bpl(&(0..800u32).map(|i| (i * 3, 2)).collect::<Vec<_>>());
1614        let seg_c = build_bpl(&(0..500u32).map(|i| (i * 5, 3)).collect::<Vec<_>>());
1615
1616        let bytes_a = serialize_bpl(&seg_a);
1617        let bytes_b = serialize_bpl(&seg_b);
1618        let bytes_c = serialize_bpl(&seg_c);
1619
1620        let sources: Vec<(&[u8], u32)> = vec![(&bytes_a, 0), (&bytes_b, 10000), (&bytes_c, 20000)];
1621        let mut out = Vec::new();
1622        BlockPostingList::concatenate_streaming(&sources, &mut out).unwrap();
1623
1624        let merged = BlockPostingList::deserialize(&out).unwrap();
1625        let expected_l1_count = merged.num_blocks().div_ceil(L1_INTERVAL);
1626        assert_eq!(merged.l1_docs.len(), expected_l1_count);
1627
1628        // Verify L1 values are correct
1629        for (i, &l1_doc) in merged.l1_docs.iter().enumerate() {
1630            let last_block_in_group = ((i + 1) * L1_INTERVAL - 1).min(merged.num_blocks() - 1);
1631            let expected = merged.block_last_doc(last_block_in_group).unwrap();
1632            assert_eq!(l1_doc, expected, "L1[{}] mismatch", i);
1633        }
1634
1635        // Verify seek_block works on merged result
1636        for blk in 0..merged.num_blocks() {
1637            let first = merged.block_first_doc(blk).unwrap();
1638            assert_eq!(merged.seek_block(first, 0), Some(blk));
1639        }
1640    }
1641
1642    #[test]
1643    fn test_seek_block_single_block() {
1644        // Edge case: single block (< L1_INTERVAL)
1645        let bpl = build_bpl(&[(0, 1), (10, 2), (20, 3)]);
1646        assert_eq!(bpl.num_blocks(), 1);
1647        assert_eq!(bpl.l1_docs.len(), 1);
1648
1649        assert_eq!(bpl.seek_block(0, 0), Some(0));
1650        assert_eq!(bpl.seek_block(10, 0), Some(0));
1651        assert_eq!(bpl.seek_block(20, 0), Some(0));
1652        assert_eq!(bpl.seek_block(21, 0), None);
1653    }
1654
1655    #[test]
1656    fn test_footer_size() {
1657        // Verify serialized size = stream + L0 + L1 + FOOTER_SIZE
1658        let docs: Vec<(u32, u32)> = (0..500u32).map(|i| (i * 2, 1)).collect();
1659        let bpl = build_bpl(&docs);
1660        let bytes = serialize_bpl(&bpl);
1661
1662        let expected =
1663            bpl.stream.len() + bpl.l0_count * L0_SIZE + bpl.l1_docs.len() * L1_SIZE + FOOTER_SIZE;
1664        assert_eq!(bytes.len(), expected);
1665    }
1666
1667    #[test]
1668    fn test_seek_block_from_block_skips_earlier() {
1669        // 16 blocks: seek with from_block should skip earlier blocks
1670        let n = BLOCK_SIZE * 16;
1671        let docs: Vec<(u32, u32)> = (0..n as u32).map(|i| (i * 3, 1)).collect();
1672        let bpl = build_bpl(&docs);
1673
1674        // Target is in block 5, but from_block=8 → should find block >= 8
1675        let target_in_5 = bpl.block_first_doc(5).unwrap() + 1;
1676        // from_block=8 means we only look at blocks 8+
1677        // target_in_5 < last_doc of block 8, so seek_block(target, 8) should return 8
1678        let result = bpl.seek_block(target_in_5, 8);
1679        assert!(result.is_some());
1680        assert!(result.unwrap() >= 8);
1681    }
1682}