Skip to main content

hermes_core/structures/postings/
posting.rs

1//! Posting list implementation with compact representation
2//!
3//! Text blocks use SIMD-friendly packed bit-width encoding:
4//! - Doc IDs: delta-encoded, packed at rounded bit width (0/8/16/32)
5//! - Term frequencies: packed at rounded bit width
6//! - Same SIMD primitives as sparse blocks (`simd::pack_rounded` / `unpack_rounded`)
7
8use byteorder::{LittleEndian, WriteBytesExt};
9use std::io::{self, Read, Write};
10
11use super::posting_common::{read_vint, write_vint};
12use crate::DocId;
13use crate::directories::OwnedBytes;
14use crate::structures::simd;
15
/// A posting entry containing doc_id and term frequency.
///
/// One entry pairs a document with the frequency recorded for the term in it.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct Posting {
    /// Identifier of the document this entry refers to.
    pub doc_id: DocId,
    /// Term frequency recorded for `doc_id`.
    pub term_freq: u32,
}
22
23/// Compact posting list with delta encoding
24#[derive(Debug, Clone, Default)]
25pub struct PostingList {
26    postings: Vec<Posting>,
27}
28
29impl PostingList {
30    pub fn new() -> Self {
31        Self::default()
32    }
33
34    pub fn with_capacity(capacity: usize) -> Self {
35        Self {
36            postings: Vec::with_capacity(capacity),
37        }
38    }
39
40    /// Add a posting (must be added in doc_id order)
41    pub fn push(&mut self, doc_id: DocId, term_freq: u32) {
42        debug_assert!(
43            self.postings.is_empty() || self.postings.last().unwrap().doc_id < doc_id,
44            "Postings must be added in sorted order"
45        );
46        self.postings.push(Posting { doc_id, term_freq });
47    }
48
49    /// Add a posting, incrementing term_freq if doc already exists
50    pub fn add(&mut self, doc_id: DocId, term_freq: u32) {
51        if let Some(last) = self.postings.last_mut()
52            && last.doc_id == doc_id
53        {
54            last.term_freq += term_freq;
55            return;
56        }
57        self.postings.push(Posting { doc_id, term_freq });
58    }
59
60    /// Get document count
61    pub fn doc_count(&self) -> u32 {
62        self.postings.len() as u32
63    }
64
65    pub fn len(&self) -> usize {
66        self.postings.len()
67    }
68
69    pub fn is_empty(&self) -> bool {
70        self.postings.is_empty()
71    }
72
73    pub fn iter(&self) -> impl Iterator<Item = &Posting> {
74        self.postings.iter()
75    }
76
77    /// Serialize to bytes using delta encoding and varint
78    pub fn serialize<W: Write>(&self, writer: &mut W) -> io::Result<()> {
79        // Write number of postings
80        write_vint(writer, self.postings.len() as u64)?;
81
82        let mut prev_doc_id = 0u32;
83        for posting in &self.postings {
84            // Delta encode doc_id
85            let delta = posting.doc_id - prev_doc_id;
86            write_vint(writer, delta as u64)?;
87            write_vint(writer, posting.term_freq as u64)?;
88            prev_doc_id = posting.doc_id;
89        }
90
91        Ok(())
92    }
93
94    /// Deserialize from bytes
95    pub fn deserialize<R: Read>(reader: &mut R) -> io::Result<Self> {
96        let count = read_vint(reader)? as usize;
97        let mut postings = Vec::with_capacity(count);
98
99        let mut prev_doc_id = 0u32;
100        for _ in 0..count {
101            let delta = read_vint(reader)? as u32;
102            let term_freq = read_vint(reader)? as u32;
103            let doc_id = prev_doc_id + delta;
104            postings.push(Posting { doc_id, term_freq });
105            prev_doc_id = doc_id;
106        }
107
108        Ok(Self { postings })
109    }
110}
111
112/// Iterator over posting list that supports seeking
113pub struct PostingListIterator<'a> {
114    postings: &'a [Posting],
115    position: usize,
116}
117
118impl<'a> PostingListIterator<'a> {
119    pub fn new(posting_list: &'a PostingList) -> Self {
120        Self {
121            postings: &posting_list.postings,
122            position: 0,
123        }
124    }
125
126    /// Current document ID, or TERMINATED if exhausted
127    pub fn doc(&self) -> DocId {
128        if self.position < self.postings.len() {
129            self.postings[self.position].doc_id
130        } else {
131            TERMINATED
132        }
133    }
134
135    /// Current term frequency
136    pub fn term_freq(&self) -> u32 {
137        if self.position < self.postings.len() {
138            self.postings[self.position].term_freq
139        } else {
140            0
141        }
142    }
143
144    /// Advance to next posting, returns new doc_id or TERMINATED
145    pub fn advance(&mut self) -> DocId {
146        self.position += 1;
147        self.doc()
148    }
149
150    /// Seek to first doc_id >= target (binary search on remaining postings)
151    pub fn seek(&mut self, target: DocId) -> DocId {
152        let remaining = &self.postings[self.position..];
153        let offset = remaining.partition_point(|p| p.doc_id < target);
154        self.position += offset;
155        self.doc()
156    }
157
158    /// Size hint for remaining elements
159    pub fn size_hint(&self) -> usize {
160        self.postings.len().saturating_sub(self.position)
161    }
162}
163
164/// Sentinel value indicating iterator is exhausted
165pub const TERMINATED: DocId = DocId::MAX;
166
/// Maximum number of postings stored in one block of a `BlockPostingList`.
///
/// `BlockPostingList` is a block-based posting list with a 2-level skip index.
///
/// Each block contains up to `BLOCK_SIZE` postings encoded as packed bit-width arrays.
/// Skip entries use a compact 2-level structure for cache-friendly seeking:
/// - **Level-0** (16 bytes/block): `first_doc`, `last_doc`, `offset`, `max_weight`
/// - **Level-1** (4 bytes/group): `last_doc` per `L1_INTERVAL` blocks
///
/// Seek algorithm: scan L1 for the first group whose `last_doc >= target`
/// (`simd::find_first_ge_u32`), then scan the ≤`L1_INTERVAL` L0 entries of that group.
pub const BLOCK_SIZE: usize = 128;

/// Number of L0 blocks per L1 skip entry.
const L1_INTERVAL: usize = 8;

/// Size in bytes of one compact level-0 skip entry (four 4-byte fields).
/// `length` is omitted: computable from the block's 8-byte header.
const L0_SIZE: usize = 16;

/// Size in bytes of one level-1 skip entry (just `last_doc`).
const L1_SIZE: usize = 4;

/// Footer: stream_len(8) + l0_count(4) + l1_count(4) + doc_count(4) + max_tf(4) = 24 bytes.
const FOOTER_SIZE: usize = 24;
189
190/// Read a compact L0 entry from raw bytes at the given index.
191#[inline]
192fn read_l0(bytes: &[u8], idx: usize) -> (u32, u32, u32, f32) {
193    let p = idx * L0_SIZE;
194    let first_doc = u32::from_le_bytes(bytes[p..p + 4].try_into().unwrap());
195    let last_doc = u32::from_le_bytes(bytes[p + 4..p + 8].try_into().unwrap());
196    let offset = u32::from_le_bytes(bytes[p + 8..p + 12].try_into().unwrap());
197    let max_weight = f32::from_le_bytes(bytes[p + 12..p + 16].try_into().unwrap());
198    (first_doc, last_doc, offset, max_weight)
199}
200
/// Append one compact L0 skip entry (16 bytes) to `buf`.
///
/// Fields are written little-endian in the order
/// `first_doc`, `last_doc`, `offset`, `max_weight`.
#[inline]
fn write_l0(buf: &mut Vec<u8>, first_doc: u32, last_doc: u32, offset: u32, max_weight: f32) {
    let fields = [
        first_doc.to_le_bytes(),
        last_doc.to_le_bytes(),
        offset.to_le_bytes(),
        max_weight.to_le_bytes(),
    ];
    for field in &fields {
        buf.extend_from_slice(field);
    }
}
209
210/// Compute block data size from the 8-byte header at `stream[pos..]`.
211///
212/// Header: `[count: u16][first_doc: u32][doc_id_bits: u8][tf_bits: u8]`
213/// Data size = 8 + (count-1) × bytes_per_value(doc_id_bits) + count × bytes_per_value(tf_bits)
214#[inline]
215fn block_data_size(stream: &[u8], pos: usize) -> usize {
216    let count = u16::from_le_bytes(stream[pos..pos + 2].try_into().unwrap()) as usize;
217    let doc_rounded = simd::RoundedBitWidth::from_u8(stream[pos + 6]);
218    let tf_rounded = simd::RoundedBitWidth::from_u8(stream[pos + 7]);
219    let delta_bytes = if count > 1 {
220        (count - 1) * doc_rounded.bytes_per_value()
221    } else {
222        0
223    };
224    8 + delta_bytes + count * tf_rounded.bytes_per_value()
225}
226
/// Block-based posting list with a two-level skip index.
///
/// Holds the packed block stream together with the level-0 and level-1 skip
/// data needed to locate and decode individual blocks.
#[derive(Debug, Clone)]
pub struct BlockPostingList {
    /// Block data stream (packed blocks laid out sequentially).
    stream: OwnedBytes,
    /// Level-0 skip entries: `(first_doc, last_doc, offset, max_weight)` × `l0_count`.
    /// 16 bytes per entry. Supports O(1) random access by block index.
    l0_bytes: OwnedBytes,
    /// Number of blocks (= number of L0 entries).
    l0_count: usize,
    /// Level-1 skip `last_doc` values — one per `L1_INTERVAL` blocks.
    /// Stored as `Vec<u32>` for direct SIMD-accelerated `find_first_ge_u32`.
    l1_docs: Vec<u32>,
    /// Total posting count.
    doc_count: u32,
    /// Max TF across all blocks.
    max_tf: u32,
}
244
245impl BlockPostingList {
246    /// Read L0 entry by block index. Returns `(first_doc, last_doc, offset, max_weight)`.
247    #[inline]
248    fn read_l0_entry(&self, idx: usize) -> (u32, u32, u32, f32) {
249        read_l0(&self.l0_bytes, idx)
250    }
251
252    /// Build from a posting list.
253    ///
254    /// Block format (8-byte header + packed arrays):
255    /// ```text
256    /// [count: u16][first_doc: u32][doc_id_bits: u8][tf_bits: u8]
257    /// [packed doc_id deltas: (count-1) × bytes_per_value(doc_id_bits)]
258    /// [packed tfs: count × bytes_per_value(tf_bits)]
259    /// ```
260    pub fn from_posting_list(list: &PostingList) -> io::Result<Self> {
261        let mut stream: Vec<u8> = Vec::new();
262        let mut l0_buf: Vec<u8> = Vec::new();
263        let mut l1_docs: Vec<u32> = Vec::new();
264        let mut l0_count = 0usize;
265        let mut max_tf = 0u32;
266
267        let postings = &list.postings;
268        let mut i = 0;
269
270        // Temp buffers reused across blocks
271        let mut deltas = Vec::with_capacity(BLOCK_SIZE);
272        let mut tf_buf = Vec::with_capacity(BLOCK_SIZE);
273
274        while i < postings.len() {
275            let block_start = stream.len() as u32;
276            let block_end = (i + BLOCK_SIZE).min(postings.len());
277            let block = &postings[i..block_end];
278            let count = block.len();
279
280            // Compute block's max term frequency for block-max pruning
281            let block_max_tf = block.iter().map(|p| p.term_freq).max().unwrap_or(0);
282            max_tf = max_tf.max(block_max_tf);
283
284            let base_doc_id = block.first().unwrap().doc_id;
285            let last_doc_id = block.last().unwrap().doc_id;
286
287            // Delta-encode doc IDs (skip first — stored in header)
288            deltas.clear();
289            let mut prev = base_doc_id;
290            for posting in block.iter().skip(1) {
291                deltas.push(posting.doc_id - prev);
292                prev = posting.doc_id;
293            }
294            let max_delta = deltas.iter().copied().max().unwrap_or(0);
295            let doc_id_bits = simd::round_bit_width(simd::bits_needed(max_delta));
296
297            // Collect TFs
298            tf_buf.clear();
299            tf_buf.extend(block.iter().map(|p| p.term_freq));
300            let tf_bits = simd::round_bit_width(simd::bits_needed(block_max_tf));
301
302            // Write 8-byte header: [count: u16][first_doc: u32][doc_id_bits: u8][tf_bits: u8]
303            stream.write_u16::<LittleEndian>(count as u16)?;
304            stream.write_u32::<LittleEndian>(base_doc_id)?;
305            stream.push(doc_id_bits);
306            stream.push(tf_bits);
307
308            // Write packed doc_id deltas ((count-1) values)
309            if count > 1 {
310                let rounded = simd::RoundedBitWidth::from_u8(doc_id_bits);
311                let byte_count = (count - 1) * rounded.bytes_per_value();
312                let start = stream.len();
313                stream.resize(start + byte_count, 0);
314                simd::pack_rounded(&deltas, rounded, &mut stream[start..]);
315            }
316
317            // Write packed TFs (count values)
318            {
319                let rounded = simd::RoundedBitWidth::from_u8(tf_bits);
320                let byte_count = count * rounded.bytes_per_value();
321                let start = stream.len();
322                stream.resize(start + byte_count, 0);
323                simd::pack_rounded(&tf_buf, rounded, &mut stream[start..]);
324            }
325
326            // L0 skip entry
327            write_l0(
328                &mut l0_buf,
329                base_doc_id,
330                last_doc_id,
331                block_start,
332                block_max_tf as f32,
333            );
334            l0_count += 1;
335
336            // L1 entry at the end of each L1_INTERVAL group
337            if l0_count.is_multiple_of(L1_INTERVAL) {
338                l1_docs.push(last_doc_id);
339            }
340
341            i = block_end;
342        }
343
344        // Final L1 entry for partial group
345        if !l0_count.is_multiple_of(L1_INTERVAL) && l0_count > 0 {
346            let (_, last_doc, _, _) = read_l0(&l0_buf, l0_count - 1);
347            l1_docs.push(last_doc);
348        }
349
350        Ok(Self {
351            stream: OwnedBytes::new(stream),
352            l0_bytes: OwnedBytes::new(l0_buf),
353            l0_count,
354            l1_docs,
355            doc_count: postings.len() as u32,
356            max_tf,
357        })
358    }
359
360    /// Serialize the block posting list (footer-based: stream first).
361    ///
362    /// Format:
363    /// ```text
364    /// [stream: block data]
365    /// [L0 entries: l0_count × 16 bytes (first_doc, last_doc, offset, max_weight)]
366    /// [L1 entries: l1_count × 4 bytes (last_doc)]
367    /// [footer: stream_len(8) + l0_count(4) + l1_count(4) + doc_count(4) + max_tf(4) = 24 bytes]
368    /// ```
369    pub fn serialize<W: Write>(&self, writer: &mut W) -> io::Result<()> {
370        writer.write_all(&self.stream)?;
371        writer.write_all(&self.l0_bytes)?;
372        for &doc in &self.l1_docs {
373            writer.write_u32::<LittleEndian>(doc)?;
374        }
375
376        // Footer (24 bytes)
377        writer.write_u64::<LittleEndian>(self.stream.len() as u64)?;
378        writer.write_u32::<LittleEndian>(self.l0_count as u32)?;
379        writer.write_u32::<LittleEndian>(self.l1_docs.len() as u32)?;
380        writer.write_u32::<LittleEndian>(self.doc_count)?;
381        writer.write_u32::<LittleEndian>(self.max_tf)?;
382
383        Ok(())
384    }
385
386    /// Deserialize from a byte slice (footer-based format).
387    pub fn deserialize(raw: &[u8]) -> io::Result<Self> {
388        if raw.len() < FOOTER_SIZE {
389            return Err(io::Error::new(
390                io::ErrorKind::InvalidData,
391                "posting data too short",
392            ));
393        }
394
395        let f = raw.len() - FOOTER_SIZE;
396        let stream_len = u64::from_le_bytes(raw[f..f + 8].try_into().unwrap()) as usize;
397        let l0_count = u32::from_le_bytes(raw[f + 8..f + 12].try_into().unwrap()) as usize;
398        let l1_count = u32::from_le_bytes(raw[f + 12..f + 16].try_into().unwrap()) as usize;
399        let doc_count = u32::from_le_bytes(raw[f + 16..f + 20].try_into().unwrap());
400        let max_tf = u32::from_le_bytes(raw[f + 20..f + 24].try_into().unwrap());
401
402        let l0_start = stream_len;
403        let l0_end = l0_start + l0_count * L0_SIZE;
404        let l1_start = l0_end;
405
406        let l1_docs = Self::extract_l1_docs(&raw[l1_start..], l1_count);
407
408        Ok(Self {
409            stream: OwnedBytes::new(raw[..stream_len].to_vec()),
410            l0_bytes: OwnedBytes::new(raw[l0_start..l0_end].to_vec()),
411            l0_count,
412            l1_docs,
413            doc_count,
414            max_tf,
415        })
416    }
417
418    /// Zero-copy deserialization from OwnedBytes.
419    /// Stream and L0 are sliced from the source without copying.
420    /// L1 is extracted into a `Vec<u32>` for SIMD-friendly access (tiny: ≤ N/8 entries).
421    pub fn deserialize_zero_copy(raw: OwnedBytes) -> io::Result<Self> {
422        if raw.len() < FOOTER_SIZE {
423            return Err(io::Error::new(
424                io::ErrorKind::InvalidData,
425                "posting data too short",
426            ));
427        }
428
429        let f = raw.len() - FOOTER_SIZE;
430        let stream_len = u64::from_le_bytes(raw[f..f + 8].try_into().unwrap()) as usize;
431        let l0_count = u32::from_le_bytes(raw[f + 8..f + 12].try_into().unwrap()) as usize;
432        let l1_count = u32::from_le_bytes(raw[f + 12..f + 16].try_into().unwrap()) as usize;
433        let doc_count = u32::from_le_bytes(raw[f + 16..f + 20].try_into().unwrap());
434        let max_tf = u32::from_le_bytes(raw[f + 20..f + 24].try_into().unwrap());
435
436        let l0_start = stream_len;
437        let l0_end = l0_start + l0_count * L0_SIZE;
438        let l1_start = l0_end;
439
440        let l1_docs = Self::extract_l1_docs(&raw[l1_start..], l1_count);
441
442        Ok(Self {
443            stream: raw.slice(0..stream_len),
444            l0_bytes: raw.slice(l0_start..l0_end),
445            l0_count,
446            l1_docs,
447            doc_count,
448            max_tf,
449        })
450    }
451
452    /// Extract L1 last_doc values from raw LE bytes into a Vec<u32>.
453    fn extract_l1_docs(bytes: &[u8], count: usize) -> Vec<u32> {
454        let mut docs = Vec::with_capacity(count);
455        for i in 0..count {
456            let p = i * L1_SIZE;
457            docs.push(u32::from_le_bytes(bytes[p..p + 4].try_into().unwrap()));
458        }
459        docs
460    }
461
    /// Total posting count stored for this list.
    pub fn doc_count(&self) -> u32 {
        self.doc_count
    }
465
    /// Get maximum term frequency (for MaxScore upper bound computation).
    ///
    /// This is the maximum across all blocks of the list.
    pub fn max_tf(&self) -> u32 {
        self.max_tf
    }
470
    /// Get number of blocks (equal to the number of L0 skip entries).
    pub fn num_blocks(&self) -> usize {
        self.l0_count
    }
475
476    /// Get block's max term frequency for block-max pruning
477    pub fn block_max_tf(&self, block_idx: usize) -> Option<u32> {
478        if block_idx >= self.l0_count {
479            return None;
480        }
481        let (_, _, _, max_weight) = self.read_l0_entry(block_idx);
482        Some(max_weight as u32)
483    }
484
485    /// Concatenate blocks from multiple posting lists with doc_id remapping.
486    /// This is O(num_blocks) instead of O(num_postings).
487    pub fn concatenate_blocks(sources: &[(BlockPostingList, u32)]) -> io::Result<Self> {
488        let mut stream: Vec<u8> = Vec::new();
489        let mut l0_buf: Vec<u8> = Vec::new();
490        let mut l1_docs: Vec<u32> = Vec::new();
491        let mut l0_count = 0usize;
492        let mut total_docs = 0u32;
493        let mut max_tf = 0u32;
494
495        for (source, doc_offset) in sources {
496            max_tf = max_tf.max(source.max_tf);
497            for block_idx in 0..source.num_blocks() {
498                let (first_doc, last_doc, offset, max_weight) = source.read_l0_entry(block_idx);
499                let blk_size = block_data_size(&source.stream, offset as usize);
500                let block_bytes = &source.stream[offset as usize..offset as usize + blk_size];
501
502                let count = u16::from_le_bytes(block_bytes[0..2].try_into().unwrap());
503                let new_offset = stream.len() as u32;
504
505                // Write patched header + copy packed arrays verbatim
506                stream.write_u16::<LittleEndian>(count)?;
507                stream.write_u32::<LittleEndian>(first_doc + doc_offset)?;
508                stream.extend_from_slice(&block_bytes[6..]);
509
510                let new_last = last_doc + doc_offset;
511                write_l0(
512                    &mut l0_buf,
513                    first_doc + doc_offset,
514                    new_last,
515                    new_offset,
516                    max_weight,
517                );
518                l0_count += 1;
519                total_docs += count as u32;
520
521                if l0_count.is_multiple_of(L1_INTERVAL) {
522                    l1_docs.push(new_last);
523                }
524            }
525        }
526
527        // Final L1 entry for partial group
528        if !l0_count.is_multiple_of(L1_INTERVAL) && l0_count > 0 {
529            let (_, last_doc, _, _) = read_l0(&l0_buf, l0_count - 1);
530            l1_docs.push(last_doc);
531        }
532
533        Ok(Self {
534            stream: OwnedBytes::new(stream),
535            l0_bytes: OwnedBytes::new(l0_buf),
536            l0_count,
537            l1_docs,
538            doc_count: total_docs,
539            max_tf,
540        })
541    }
542
543    /// Streaming merge: write blocks directly to output writer (bounded memory).
544    ///
545    /// **Zero-materializing**: reads L0 entries directly from source bytes
546    /// (mmap or &[u8]) without parsing into Vecs. Block sizes computed from
547    /// the 8-byte header (deterministic with packed encoding).
548    ///
549    /// Output L0 + L1 are buffered (bounded O(total_blocks × 16 + total_blocks/8 × 4)).
550    /// Block data flows source → output writer without intermediate buffering.
551    ///
552    /// Returns `(doc_count, bytes_written)`.
553    pub fn concatenate_streaming<W: Write>(
554        sources: &[(&[u8], u32)], // (serialized_bytes, doc_offset)
555        writer: &mut W,
556    ) -> io::Result<(u32, usize)> {
557        struct SourceMeta {
558            stream_len: usize,
559            l0_count: usize,
560        }
561
562        let mut metas: Vec<SourceMeta> = Vec::with_capacity(sources.len());
563        let mut total_docs = 0u32;
564        let mut merged_max_tf = 0u32;
565
566        for (raw, _) in sources {
567            if raw.len() < FOOTER_SIZE {
568                continue;
569            }
570            let f = raw.len() - FOOTER_SIZE;
571            let stream_len = u64::from_le_bytes(raw[f..f + 8].try_into().unwrap()) as usize;
572            let l0_count = u32::from_le_bytes(raw[f + 8..f + 12].try_into().unwrap()) as usize;
573            // l1_count not needed — we rebuild L1
574            let doc_count = u32::from_le_bytes(raw[f + 16..f + 20].try_into().unwrap());
575            let max_tf = u32::from_le_bytes(raw[f + 20..f + 24].try_into().unwrap());
576            total_docs += doc_count;
577            merged_max_tf = merged_max_tf.max(max_tf);
578            metas.push(SourceMeta {
579                stream_len,
580                l0_count,
581            });
582        }
583
584        // Phase 1: Stream block data, reading L0 entries on-the-fly.
585        // Accumulate output L0 + L1 (bounded).
586        let mut out_l0: Vec<u8> = Vec::new();
587        let mut out_l1_docs: Vec<u32> = Vec::new();
588        let mut out_l0_count = 0usize;
589        let mut stream_written = 0u64;
590        let mut patch_buf = [0u8; 8];
591
592        for (src_idx, meta) in metas.iter().enumerate() {
593            let (raw, doc_offset) = &sources[src_idx];
594            let l0_base = meta.stream_len; // L0 entries start right after stream
595            let src_stream = &raw[..meta.stream_len];
596
597            for i in 0..meta.l0_count {
598                // Read source L0 entry directly from raw bytes
599                let (first_doc, last_doc, offset, max_weight) = read_l0(&raw[l0_base..], i);
600
601                // Compute block size from header
602                let blk_size = block_data_size(src_stream, offset as usize);
603                let block = &src_stream[offset as usize..offset as usize + blk_size];
604
605                // Write output L0 entry
606                let new_last = last_doc + doc_offset;
607                write_l0(
608                    &mut out_l0,
609                    first_doc + doc_offset,
610                    new_last,
611                    stream_written as u32,
612                    max_weight,
613                );
614                out_l0_count += 1;
615
616                // L1 entry at group boundary
617                if out_l0_count.is_multiple_of(L1_INTERVAL) {
618                    out_l1_docs.push(new_last);
619                }
620
621                // Patch 8-byte header: [count: u16][first_doc: u32][bits: 2 bytes]
622                patch_buf.copy_from_slice(&block[0..8]);
623                let blk_first = u32::from_le_bytes(patch_buf[2..6].try_into().unwrap());
624                patch_buf[2..6].copy_from_slice(&(blk_first + doc_offset).to_le_bytes());
625                writer.write_all(&patch_buf)?;
626                writer.write_all(&block[8..])?;
627
628                stream_written += blk_size as u64;
629            }
630        }
631
632        // Final L1 entry for partial group
633        if !out_l0_count.is_multiple_of(L1_INTERVAL) && out_l0_count > 0 {
634            let (_, last_doc, _, _) = read_l0(&out_l0, out_l0_count - 1);
635            out_l1_docs.push(last_doc);
636        }
637
638        // Phase 2: Write L0 + L1 + footer
639        writer.write_all(&out_l0)?;
640        for &doc in &out_l1_docs {
641            writer.write_u32::<LittleEndian>(doc)?;
642        }
643
644        writer.write_u64::<LittleEndian>(stream_written)?;
645        writer.write_u32::<LittleEndian>(out_l0_count as u32)?;
646        writer.write_u32::<LittleEndian>(out_l1_docs.len() as u32)?;
647        writer.write_u32::<LittleEndian>(total_docs)?;
648        writer.write_u32::<LittleEndian>(merged_max_tf)?;
649
650        let l1_bytes_len = out_l1_docs.len() * L1_SIZE;
651        let total_bytes = stream_written as usize + out_l0.len() + l1_bytes_len + FOOTER_SIZE;
652        Ok((total_docs, total_bytes))
653    }
654
655    /// Decode a specific block into caller-provided buffers.
656    ///
657    /// Returns `true` if the block was decoded, `false` if `block_idx` is out of range.
658    /// Reuses `doc_ids` and `tfs` buffers (cleared before filling).
659    ///
660    /// Uses SIMD-accelerated unpack for 8/16/32-bit packed arrays.
661    pub fn decode_block_into(
662        &self,
663        block_idx: usize,
664        doc_ids: &mut Vec<u32>,
665        tfs: &mut Vec<u32>,
666    ) -> bool {
667        if block_idx >= self.l0_count {
668            return false;
669        }
670
671        let (_, _, offset, _) = self.read_l0_entry(block_idx);
672        let pos = offset as usize;
673        let blk_size = block_data_size(&self.stream, pos);
674        let block_data = &self.stream[pos..pos + blk_size];
675
676        // 8-byte header: [count: u16][first_doc: u32][doc_id_bits: u8][tf_bits: u8]
677        let count = u16::from_le_bytes(block_data[0..2].try_into().unwrap()) as usize;
678        let first_doc = u32::from_le_bytes(block_data[2..6].try_into().unwrap());
679        let doc_id_bits = block_data[6];
680        let tf_bits = block_data[7];
681
682        // Decode doc IDs: unpack deltas + prefix sum
683        doc_ids.clear();
684        doc_ids.resize(count, 0);
685        doc_ids[0] = first_doc;
686
687        let doc_rounded = simd::RoundedBitWidth::from_u8(doc_id_bits);
688        let deltas_bytes = if count > 1 {
689            (count - 1) * doc_rounded.bytes_per_value()
690        } else {
691            0
692        };
693
694        if count > 1 {
695            simd::unpack_rounded(
696                &block_data[8..8 + deltas_bytes],
697                doc_rounded,
698                &mut doc_ids[1..],
699                count - 1,
700            );
701            for i in 1..count {
702                doc_ids[i] += doc_ids[i - 1];
703            }
704        }
705
706        // Decode TFs
707        tfs.clear();
708        tfs.resize(count, 0);
709        let tf_rounded = simd::RoundedBitWidth::from_u8(tf_bits);
710        let tfs_start = 8 + deltas_bytes;
711        simd::unpack_rounded(
712            &block_data[tfs_start..tfs_start + count * tf_rounded.bytes_per_value()],
713            tf_rounded,
714            tfs,
715            count,
716        );
717
718        true
719    }
720
721    /// First doc_id of a block (from L0 skip entry). Returns `None` if out of range.
722    #[inline]
723    pub fn block_first_doc(&self, block_idx: usize) -> Option<DocId> {
724        if block_idx >= self.l0_count {
725            return None;
726        }
727        let (first_doc, _, _, _) = self.read_l0_entry(block_idx);
728        Some(first_doc)
729    }
730
731    /// Last doc_id of a block (from L0 skip entry). Returns `None` if out of range.
732    #[inline]
733    pub fn block_last_doc(&self, block_idx: usize) -> Option<DocId> {
734        if block_idx >= self.l0_count {
735            return None;
736        }
737        let (_, last_doc, _, _) = self.read_l0_entry(block_idx);
738        Some(last_doc)
739    }
740
741    /// Find the first block whose `last_doc >= target`, starting from `from_block`.
742    ///
743    /// Uses SIMD-accelerated linear scan:
744    /// 1. `find_first_ge_u32` on the contiguous L1 `last_doc` array
745    /// 2. Extract ≤`L1_INTERVAL` L0 `last_doc` values into a stack buffer → `find_first_ge_u32`
746    ///
747    /// Returns `None` if no block contains `target`.
748    pub fn seek_block(&self, target: DocId, from_block: usize) -> Option<usize> {
749        if from_block >= self.l0_count {
750            return None;
751        }
752
753        let from_l1 = from_block / L1_INTERVAL;
754
755        // SIMD scan L1 to find the group containing target
756        let l1_idx = if !self.l1_docs.is_empty() {
757            let idx = from_l1 + simd::find_first_ge_u32(&self.l1_docs[from_l1..], target);
758            if idx >= self.l1_docs.len() {
759                return None;
760            }
761            idx
762        } else {
763            return None;
764        };
765
766        // Extract L0 last_doc values within the group into a stack buffer for SIMD scan
767        let start = (l1_idx * L1_INTERVAL).max(from_block);
768        let end = ((l1_idx + 1) * L1_INTERVAL).min(self.l0_count);
769        let count = end - start;
770
771        let mut last_docs = [u32::MAX; L1_INTERVAL];
772        for (j, idx) in (start..end).enumerate() {
773            let (_, ld, _, _) = read_l0(&self.l0_bytes, idx);
774            last_docs[j] = ld;
775        }
776        let within = simd::find_first_ge_u32(&last_docs[..count], target);
777        let block_idx = start + within;
778
779        if block_idx < self.l0_count {
780            Some(block_idx)
781        } else {
782            None
783        }
784    }
785
    /// Create an iterator with skip support that borrows this list.
    pub fn iterator(&self) -> BlockPostingIterator<'_> {
        BlockPostingIterator::new(self)
    }
790
    /// Create an owned iterator that doesn't borrow self.
    ///
    /// Consumes the list; the returned iterator owns it.
    pub fn into_iterator(self) -> BlockPostingIterator<'static> {
        BlockPostingIterator::owned(self)
    }
795}
796
/// Iterator over block posting list with skip support
/// Can be either borrowed or owned via Cow
///
/// Uses struct-of-arrays layout: separate Vec<u32> for doc_ids and term_freqs.
/// This is more cache-friendly for SIMD seek (contiguous doc_ids) and halves
/// memory vs the previous AoS + separate doc_ids approach.
pub struct BlockPostingIterator<'a> {
    /// The posting list being iterated, borrowed or owned.
    block_list: std::borrow::Cow<'a, BlockPostingList>,
    /// Index of the block currently decoded into the buffers below.
    current_block: usize,
    /// Decoded doc IDs of the current block.
    block_doc_ids: Vec<u32>,
    /// Decoded term frequencies of the current block (parallel to `block_doc_ids`).
    block_tfs: Vec<u32>,
    /// Cursor into the decoded buffers.
    position_in_block: usize,
    /// Set once the list has no (more) blocks to load.
    exhausted: bool,
}
811
812impl<'a> BlockPostingIterator<'a> {
813    fn new(block_list: &'a BlockPostingList) -> Self {
814        let exhausted = block_list.l0_count == 0;
815        let mut iter = Self {
816            block_list: std::borrow::Cow::Borrowed(block_list),
817            current_block: 0,
818            block_doc_ids: Vec::with_capacity(BLOCK_SIZE),
819            block_tfs: Vec::with_capacity(BLOCK_SIZE),
820            position_in_block: 0,
821            exhausted,
822        };
823        if !iter.exhausted {
824            iter.load_block(0);
825        }
826        iter
827    }
828
829    fn owned(block_list: BlockPostingList) -> BlockPostingIterator<'static> {
830        let exhausted = block_list.l0_count == 0;
831        let mut iter = BlockPostingIterator {
832            block_list: std::borrow::Cow::Owned(block_list),
833            current_block: 0,
834            block_doc_ids: Vec::with_capacity(BLOCK_SIZE),
835            block_tfs: Vec::with_capacity(BLOCK_SIZE),
836            position_in_block: 0,
837            exhausted,
838        };
839        if !iter.exhausted {
840            iter.load_block(0);
841        }
842        iter
843    }
844
845    fn load_block(&mut self, block_idx: usize) {
846        if block_idx >= self.block_list.l0_count {
847            self.exhausted = true;
848            return;
849        }
850
851        self.current_block = block_idx;
852        self.position_in_block = 0;
853
854        self.block_list
855            .decode_block_into(block_idx, &mut self.block_doc_ids, &mut self.block_tfs);
856    }
857
858    pub fn doc(&self) -> DocId {
859        if self.exhausted {
860            TERMINATED
861        } else if self.position_in_block < self.block_doc_ids.len() {
862            self.block_doc_ids[self.position_in_block]
863        } else {
864            TERMINATED
865        }
866    }
867
868    pub fn term_freq(&self) -> u32 {
869        if self.exhausted || self.position_in_block >= self.block_tfs.len() {
870            0
871        } else {
872            self.block_tfs[self.position_in_block]
873        }
874    }
875
876    pub fn advance(&mut self) -> DocId {
877        if self.exhausted {
878            return TERMINATED;
879        }
880
881        self.position_in_block += 1;
882        if self.position_in_block >= self.block_doc_ids.len() {
883            self.load_block(self.current_block + 1);
884        }
885        self.doc()
886    }
887
888    pub fn seek(&mut self, target: DocId) -> DocId {
889        if self.exhausted {
890            return TERMINATED;
891        }
892
893        // SIMD-accelerated 2-level seek (forward from current block)
894        let block_idx = match self.block_list.seek_block(target, self.current_block) {
895            Some(idx) => idx,
896            None => {
897                self.exhausted = true;
898                return TERMINATED;
899            }
900        };
901
902        if block_idx != self.current_block {
903            self.load_block(block_idx);
904        }
905
906        // SIMD linear scan within block on cached doc_ids
907        let remaining = &self.block_doc_ids[self.position_in_block..];
908        let pos = crate::structures::simd::find_first_ge_u32(remaining, target);
909        self.position_in_block += pos;
910
911        if self.position_in_block >= self.block_doc_ids.len() {
912            self.load_block(self.current_block + 1);
913        }
914        self.doc()
915    }
916
917    /// Skip to the next block, returning the first doc_id in the new block
918    /// This is used for block-max pruning when the current block's
919    /// max score can't beat the threshold.
920    pub fn skip_to_next_block(&mut self) -> DocId {
921        if self.exhausted {
922            return TERMINATED;
923        }
924        self.load_block(self.current_block + 1);
925        self.doc()
926    }
927
928    /// Get the current block index
929    #[inline]
930    pub fn current_block_idx(&self) -> usize {
931        self.current_block
932    }
933
934    /// Get total number of blocks
935    #[inline]
936    pub fn num_blocks(&self) -> usize {
937        self.block_list.l0_count
938    }
939
940    /// Get the current block's max term frequency for block-max pruning
941    #[inline]
942    pub fn current_block_max_tf(&self) -> u32 {
943        if self.exhausted || self.current_block >= self.block_list.l0_count {
944            0
945        } else {
946            let (_, _, _, max_weight) = self.block_list.read_l0_entry(self.current_block);
947            max_weight as u32
948        }
949    }
950}
951
952#[cfg(test)]
953mod tests {
954    use super::*;
955
956    #[test]
957    fn test_posting_list_basic() {
958        let mut list = PostingList::new();
959        list.push(1, 2);
960        list.push(5, 1);
961        list.push(10, 3);
962
963        assert_eq!(list.len(), 3);
964
965        let mut iter = PostingListIterator::new(&list);
966        assert_eq!(iter.doc(), 1);
967        assert_eq!(iter.term_freq(), 2);
968
969        assert_eq!(iter.advance(), 5);
970        assert_eq!(iter.term_freq(), 1);
971
972        assert_eq!(iter.advance(), 10);
973        assert_eq!(iter.term_freq(), 3);
974
975        assert_eq!(iter.advance(), TERMINATED);
976    }
977
978    #[test]
979    fn test_posting_list_serialization() {
980        let mut list = PostingList::new();
981        for i in 0..100 {
982            list.push(i * 3, (i % 5) + 1);
983        }
984
985        let mut buffer = Vec::new();
986        list.serialize(&mut buffer).unwrap();
987
988        let deserialized = PostingList::deserialize(&mut &buffer[..]).unwrap();
989        assert_eq!(deserialized.len(), list.len());
990
991        for (a, b) in list.iter().zip(deserialized.iter()) {
992            assert_eq!(a, b);
993        }
994    }
995
996    #[test]
997    fn test_posting_list_seek() {
998        let mut list = PostingList::new();
999        for i in 0..100 {
1000            list.push(i * 2, 1);
1001        }
1002
1003        let mut iter = PostingListIterator::new(&list);
1004
1005        assert_eq!(iter.seek(50), 50);
1006        assert_eq!(iter.seek(51), 52);
1007        assert_eq!(iter.seek(200), TERMINATED);
1008    }
1009
1010    #[test]
1011    fn test_block_posting_list() {
1012        let mut list = PostingList::new();
1013        for i in 0..500 {
1014            list.push(i * 2, (i % 10) + 1);
1015        }
1016
1017        let block_list = BlockPostingList::from_posting_list(&list).unwrap();
1018        assert_eq!(block_list.doc_count(), 500);
1019
1020        let mut iter = block_list.iterator();
1021        assert_eq!(iter.doc(), 0);
1022        assert_eq!(iter.term_freq(), 1);
1023
1024        // Test seek across blocks
1025        assert_eq!(iter.seek(500), 500);
1026        assert_eq!(iter.seek(998), 998);
1027        assert_eq!(iter.seek(1000), TERMINATED);
1028    }
1029
1030    #[test]
1031    fn test_block_posting_list_serialization() {
1032        let mut list = PostingList::new();
1033        for i in 0..300 {
1034            list.push(i * 3, i + 1);
1035        }
1036
1037        let block_list = BlockPostingList::from_posting_list(&list).unwrap();
1038
1039        let mut buffer = Vec::new();
1040        block_list.serialize(&mut buffer).unwrap();
1041
1042        let deserialized = BlockPostingList::deserialize(&buffer[..]).unwrap();
1043        assert_eq!(deserialized.doc_count(), block_list.doc_count());
1044
1045        // Verify iteration produces same results
1046        let mut iter1 = block_list.iterator();
1047        let mut iter2 = deserialized.iterator();
1048
1049        while iter1.doc() != TERMINATED {
1050            assert_eq!(iter1.doc(), iter2.doc());
1051            assert_eq!(iter1.term_freq(), iter2.term_freq());
1052            iter1.advance();
1053            iter2.advance();
1054        }
1055        assert_eq!(iter2.doc(), TERMINATED);
1056    }
1057
1058    /// Helper: collect all (doc_id, tf) from a BlockPostingIterator
1059    fn collect_postings(bpl: &BlockPostingList) -> Vec<(u32, u32)> {
1060        let mut result = Vec::new();
1061        let mut it = bpl.iterator();
1062        while it.doc() != TERMINATED {
1063            result.push((it.doc(), it.term_freq()));
1064            it.advance();
1065        }
1066        result
1067    }
1068
1069    /// Helper: build a BlockPostingList from (doc_id, tf) pairs
1070    fn build_bpl(postings: &[(u32, u32)]) -> BlockPostingList {
1071        let mut pl = PostingList::new();
1072        for &(doc_id, tf) in postings {
1073            pl.push(doc_id, tf);
1074        }
1075        BlockPostingList::from_posting_list(&pl).unwrap()
1076    }
1077
1078    /// Helper: serialize a BlockPostingList to bytes
1079    fn serialize_bpl(bpl: &BlockPostingList) -> Vec<u8> {
1080        let mut buf = Vec::new();
1081        bpl.serialize(&mut buf).unwrap();
1082        buf
1083    }
1084
1085    #[test]
1086    fn test_concatenate_blocks_two_segments() {
1087        // Segment A: docs 0,2,4,...,198 (100 docs, tf=1..100)
1088        let a: Vec<(u32, u32)> = (0..100).map(|i| (i * 2, i + 1)).collect();
1089        let bpl_a = build_bpl(&a);
1090
1091        // Segment B: docs 0,3,6,...,297 (100 docs, tf=2..101)
1092        let b: Vec<(u32, u32)> = (0..100).map(|i| (i * 3, i + 2)).collect();
1093        let bpl_b = build_bpl(&b);
1094
1095        // Merge: segment B starts at doc_offset=200
1096        let merged =
1097            BlockPostingList::concatenate_blocks(&[(bpl_a.clone(), 0), (bpl_b.clone(), 200)])
1098                .unwrap();
1099
1100        assert_eq!(merged.doc_count(), 200);
1101
1102        let postings = collect_postings(&merged);
1103        assert_eq!(postings.len(), 200);
1104
1105        // First 100 from A (unchanged)
1106        for (i, p) in postings.iter().enumerate().take(100) {
1107            assert_eq!(*p, (i as u32 * 2, i as u32 + 1));
1108        }
1109        // Next 100 from B (doc_id += 200)
1110        for i in 0..100 {
1111            assert_eq!(postings[100 + i], (i as u32 * 3 + 200, i as u32 + 2));
1112        }
1113    }
1114
1115    #[test]
1116    fn test_concatenate_streaming_matches_blocks() {
1117        // Build 3 segments with different doc distributions
1118        let seg_a: Vec<(u32, u32)> = (0..250).map(|i| (i * 2, (i % 7) + 1)).collect();
1119        let seg_b: Vec<(u32, u32)> = (0..180).map(|i| (i * 5, (i % 3) + 1)).collect();
1120        let seg_c: Vec<(u32, u32)> = (0..90).map(|i| (i * 10, (i % 11) + 1)).collect();
1121
1122        let bpl_a = build_bpl(&seg_a);
1123        let bpl_b = build_bpl(&seg_b);
1124        let bpl_c = build_bpl(&seg_c);
1125
1126        let offset_b = 1000u32;
1127        let offset_c = 2000u32;
1128
1129        // Method 1: concatenate_blocks (in-memory reference)
1130        let ref_merged = BlockPostingList::concatenate_blocks(&[
1131            (bpl_a.clone(), 0),
1132            (bpl_b.clone(), offset_b),
1133            (bpl_c.clone(), offset_c),
1134        ])
1135        .unwrap();
1136        let mut ref_buf = Vec::new();
1137        ref_merged.serialize(&mut ref_buf).unwrap();
1138
1139        // Method 2: concatenate_streaming (footer-based, writes to output)
1140        let bytes_a = serialize_bpl(&bpl_a);
1141        let bytes_b = serialize_bpl(&bpl_b);
1142        let bytes_c = serialize_bpl(&bpl_c);
1143
1144        let sources: Vec<(&[u8], u32)> =
1145            vec![(&bytes_a, 0), (&bytes_b, offset_b), (&bytes_c, offset_c)];
1146        let mut stream_buf = Vec::new();
1147        let (doc_count, bytes_written) =
1148            BlockPostingList::concatenate_streaming(&sources, &mut stream_buf).unwrap();
1149
1150        assert_eq!(doc_count, 520); // 250 + 180 + 90
1151        assert_eq!(bytes_written, stream_buf.len());
1152
1153        // Deserialize both and verify identical postings
1154        let ref_postings = collect_postings(&BlockPostingList::deserialize(&ref_buf).unwrap());
1155        let stream_postings =
1156            collect_postings(&BlockPostingList::deserialize(&stream_buf).unwrap());
1157
1158        assert_eq!(ref_postings.len(), stream_postings.len());
1159        for (i, (r, s)) in ref_postings.iter().zip(stream_postings.iter()).enumerate() {
1160            assert_eq!(r, s, "mismatch at posting {}", i);
1161        }
1162    }
1163
1164    #[test]
1165    fn test_multi_round_merge() {
1166        // Simulate 3 rounds of merging (like tiered merge policy)
1167        //
1168        // Round 0: 4 small segments built independently
1169        // Round 1: merge pairs → 2 medium segments
1170        // Round 2: merge those → 1 large segment
1171
1172        let segments: Vec<Vec<(u32, u32)>> = (0..4)
1173            .map(|seg| (0..200).map(|i| (i * 3, (i + seg * 7) % 10 + 1)).collect())
1174            .collect();
1175
1176        let bpls: Vec<BlockPostingList> = segments.iter().map(|s| build_bpl(s)).collect();
1177        let serialized: Vec<Vec<u8>> = bpls.iter().map(serialize_bpl).collect();
1178
1179        // Round 1: merge seg0+seg1 (offset=0,600), seg2+seg3 (offset=0,600)
1180        let mut merged_01 = Vec::new();
1181        let sources_01: Vec<(&[u8], u32)> = vec![(&serialized[0], 0), (&serialized[1], 600)];
1182        let (dc_01, _) =
1183            BlockPostingList::concatenate_streaming(&sources_01, &mut merged_01).unwrap();
1184        assert_eq!(dc_01, 400);
1185
1186        let mut merged_23 = Vec::new();
1187        let sources_23: Vec<(&[u8], u32)> = vec![(&serialized[2], 0), (&serialized[3], 600)];
1188        let (dc_23, _) =
1189            BlockPostingList::concatenate_streaming(&sources_23, &mut merged_23).unwrap();
1190        assert_eq!(dc_23, 400);
1191
1192        // Round 2: merge the two intermediate results (offset=0, 1200)
1193        let mut final_merged = Vec::new();
1194        let sources_final: Vec<(&[u8], u32)> = vec![(&merged_01, 0), (&merged_23, 1200)];
1195        let (dc_final, _) =
1196            BlockPostingList::concatenate_streaming(&sources_final, &mut final_merged).unwrap();
1197        assert_eq!(dc_final, 800);
1198
1199        // Verify final result has all 800 postings with correct doc_ids
1200        let final_bpl = BlockPostingList::deserialize(&final_merged).unwrap();
1201        let postings = collect_postings(&final_bpl);
1202        assert_eq!(postings.len(), 800);
1203
1204        // Verify doc_id ordering (must be monotonically non-decreasing within segments,
1205        // and segment boundaries at 0, 600, 1200, 1800)
1206        // Seg0: 0..597, Seg1: 600..1197, Seg2: 1200..1797, Seg3: 1800..2397
1207        assert_eq!(postings[0].0, 0); // first doc of seg0
1208        assert_eq!(postings[199].0, 597); // last doc of seg0 (199*3)
1209        assert_eq!(postings[200].0, 600); // first doc of seg1 (0+600)
1210        assert_eq!(postings[399].0, 1197); // last doc of seg1 (597+600)
1211        assert_eq!(postings[400].0, 1200); // first doc of seg2
1212        assert_eq!(postings[799].0, 2397); // last doc of seg3
1213
1214        // Verify TFs preserved through two rounds of merging
1215        // Creation formula: tf = (i + seg * 7) % 10 + 1
1216        for seg in 0u32..4 {
1217            for i in 0u32..200 {
1218                let idx = (seg * 200 + i) as usize;
1219                assert_eq!(
1220                    postings[idx].1,
1221                    (i + seg * 7) % 10 + 1,
1222                    "seg{} tf[{}]",
1223                    seg,
1224                    i
1225                );
1226            }
1227        }
1228
1229        // Verify seek works on final merged result
1230        let mut it = final_bpl.iterator();
1231        assert_eq!(it.seek(600), 600);
1232        assert_eq!(it.seek(1200), 1200);
1233        assert_eq!(it.seek(2397), 2397);
1234        assert_eq!(it.seek(2398), TERMINATED);
1235    }
1236
1237    #[test]
1238    fn test_large_scale_merge() {
1239        // 5 segments × 2000 docs each = 10,000 total docs
1240        // Each segment has 16 blocks (2000/128 = 15.6 → 16 blocks)
1241        let num_segments = 5;
1242        let docs_per_segment = 2000;
1243        let docs_gap = 3; // doc_ids: 0, 3, 6, ...
1244
1245        let segments: Vec<Vec<(u32, u32)>> = (0..num_segments)
1246            .map(|seg| {
1247                (0..docs_per_segment)
1248                    .map(|i| (i as u32 * docs_gap, (i as u32 + seg as u32) % 20 + 1))
1249                    .collect()
1250            })
1251            .collect();
1252
1253        let bpls: Vec<BlockPostingList> = segments.iter().map(|s| build_bpl(s)).collect();
1254
1255        // Verify each segment has multiple blocks
1256        for bpl in &bpls {
1257            assert!(
1258                bpl.num_blocks() >= 15,
1259                "expected >=15 blocks, got {}",
1260                bpl.num_blocks()
1261            );
1262        }
1263
1264        let serialized: Vec<Vec<u8>> = bpls.iter().map(serialize_bpl).collect();
1265
1266        // Compute offsets: each segment occupies max_doc+1 doc_id space
1267        let max_doc_per_seg = (docs_per_segment as u32 - 1) * docs_gap;
1268        let offsets: Vec<u32> = (0..num_segments)
1269            .map(|i| i as u32 * (max_doc_per_seg + 1))
1270            .collect();
1271
1272        let sources: Vec<(&[u8], u32)> = serialized
1273            .iter()
1274            .zip(offsets.iter())
1275            .map(|(b, o)| (b.as_slice(), *o))
1276            .collect();
1277
1278        let mut merged = Vec::new();
1279        let (doc_count, _) =
1280            BlockPostingList::concatenate_streaming(&sources, &mut merged).unwrap();
1281        assert_eq!(doc_count, (num_segments * docs_per_segment) as u32);
1282
1283        // Deserialize and verify
1284        let merged_bpl = BlockPostingList::deserialize(&merged).unwrap();
1285        let postings = collect_postings(&merged_bpl);
1286        assert_eq!(postings.len(), num_segments * docs_per_segment);
1287
1288        // Verify all doc_ids are strictly monotonically increasing across segment boundaries
1289        for i in 1..postings.len() {
1290            assert!(
1291                postings[i].0 > postings[i - 1].0 || (i % docs_per_segment == 0), // new segment can have lower absolute ID
1292                "doc_id not increasing at {}: {} vs {}",
1293                i,
1294                postings[i - 1].0,
1295                postings[i].0,
1296            );
1297        }
1298
1299        // Verify seek across all block boundaries
1300        let mut it = merged_bpl.iterator();
1301        for (seg, &expected_first) in offsets.iter().enumerate() {
1302            assert_eq!(
1303                it.seek(expected_first),
1304                expected_first,
1305                "seek to segment {} start",
1306                seg
1307            );
1308        }
1309    }
1310
1311    #[test]
1312    fn test_merge_edge_cases() {
1313        // Single doc per segment
1314        let bpl_a = build_bpl(&[(0, 5)]);
1315        let bpl_b = build_bpl(&[(0, 3)]);
1316
1317        let merged =
1318            BlockPostingList::concatenate_blocks(&[(bpl_a.clone(), 0), (bpl_b.clone(), 1)])
1319                .unwrap();
1320        assert_eq!(merged.doc_count(), 2);
1321        let p = collect_postings(&merged);
1322        assert_eq!(p, vec![(0, 5), (1, 3)]);
1323
1324        // Exactly BLOCK_SIZE docs (single full block)
1325        let exact_block: Vec<(u32, u32)> = (0..BLOCK_SIZE as u32).map(|i| (i, i % 5 + 1)).collect();
1326        let bpl_exact = build_bpl(&exact_block);
1327        assert_eq!(bpl_exact.num_blocks(), 1);
1328
1329        let bytes = serialize_bpl(&bpl_exact);
1330        let mut out = Vec::new();
1331        let sources: Vec<(&[u8], u32)> = vec![(&bytes, 0), (&bytes, BLOCK_SIZE as u32)];
1332        let (dc, _) = BlockPostingList::concatenate_streaming(&sources, &mut out).unwrap();
1333        assert_eq!(dc, BLOCK_SIZE as u32 * 2);
1334
1335        let merged = BlockPostingList::deserialize(&out).unwrap();
1336        let postings = collect_postings(&merged);
1337        assert_eq!(postings.len(), BLOCK_SIZE * 2);
1338        // Second segment's docs offset by BLOCK_SIZE
1339        assert_eq!(postings[BLOCK_SIZE].0, BLOCK_SIZE as u32);
1340
1341        // BLOCK_SIZE + 1 docs (two blocks: 128 + 1)
1342        let over_block: Vec<(u32, u32)> = (0..BLOCK_SIZE as u32 + 1).map(|i| (i * 2, 1)).collect();
1343        let bpl_over = build_bpl(&over_block);
1344        assert_eq!(bpl_over.num_blocks(), 2);
1345    }
1346
1347    #[test]
1348    fn test_streaming_roundtrip_single_source() {
1349        // Streaming merge with a single source should produce equivalent output to serialize
1350        let docs: Vec<(u32, u32)> = (0..500).map(|i| (i * 7, i % 15 + 1)).collect();
1351        let bpl = build_bpl(&docs);
1352        let direct = serialize_bpl(&bpl);
1353
1354        let sources: Vec<(&[u8], u32)> = vec![(&direct, 0)];
1355        let mut streamed = Vec::new();
1356        BlockPostingList::concatenate_streaming(&sources, &mut streamed).unwrap();
1357
1358        // Both should deserialize to identical postings
1359        let p1 = collect_postings(&BlockPostingList::deserialize(&direct).unwrap());
1360        let p2 = collect_postings(&BlockPostingList::deserialize(&streamed).unwrap());
1361        assert_eq!(p1, p2);
1362    }
1363
1364    #[test]
1365    fn test_max_tf_preserved_through_merge() {
1366        // Segment A: max_tf = 50
1367        let mut a = Vec::new();
1368        for i in 0..200 {
1369            a.push((i * 2, if i == 100 { 50 } else { 1 }));
1370        }
1371        let bpl_a = build_bpl(&a);
1372        assert_eq!(bpl_a.max_tf(), 50);
1373
1374        // Segment B: max_tf = 30
1375        let mut b = Vec::new();
1376        for i in 0..200 {
1377            b.push((i * 2, if i == 50 { 30 } else { 2 }));
1378        }
1379        let bpl_b = build_bpl(&b);
1380        assert_eq!(bpl_b.max_tf(), 30);
1381
1382        // After merge, max_tf should be max(50, 30) = 50
1383        let bytes_a = serialize_bpl(&bpl_a);
1384        let bytes_b = serialize_bpl(&bpl_b);
1385        let sources: Vec<(&[u8], u32)> = vec![(&bytes_a, 0), (&bytes_b, 1000)];
1386        let mut out = Vec::new();
1387        BlockPostingList::concatenate_streaming(&sources, &mut out).unwrap();
1388
1389        let merged = BlockPostingList::deserialize(&out).unwrap();
1390        assert_eq!(merged.max_tf(), 50);
1391        assert_eq!(merged.doc_count(), 400);
1392    }
1393
1394    // ── 2-level skip list format tests ──────────────────────────────────
1395
1396    #[test]
1397    fn test_l0_l1_counts() {
1398        // 1 block (< L1_INTERVAL) → 1 L1 entry (partial group)
1399        let bpl = build_bpl(&(0..50u32).map(|i| (i, 1)).collect::<Vec<_>>());
1400        assert_eq!(bpl.num_blocks(), 1);
1401        assert_eq!(bpl.l1_docs.len(), 1);
1402
1403        // Exactly L1_INTERVAL blocks → 1 L1 entry (full group)
1404        let n = BLOCK_SIZE * L1_INTERVAL;
1405        let bpl = build_bpl(&(0..n as u32).map(|i| (i * 2, 1)).collect::<Vec<_>>());
1406        assert_eq!(bpl.num_blocks(), L1_INTERVAL);
1407        assert_eq!(bpl.l1_docs.len(), 1);
1408
1409        // L1_INTERVAL + 1 blocks → 2 L1 entries
1410        let n = BLOCK_SIZE * L1_INTERVAL + 1;
1411        let bpl = build_bpl(&(0..n as u32).map(|i| (i * 2, 1)).collect::<Vec<_>>());
1412        assert_eq!(bpl.num_blocks(), L1_INTERVAL + 1);
1413        assert_eq!(bpl.l1_docs.len(), 2);
1414
1415        // 3 × L1_INTERVAL blocks → 3 L1 entries (all full groups)
1416        let n = BLOCK_SIZE * L1_INTERVAL * 3;
1417        let bpl = build_bpl(&(0..n as u32).map(|i| (i, 1)).collect::<Vec<_>>());
1418        assert_eq!(bpl.num_blocks(), L1_INTERVAL * 3);
1419        assert_eq!(bpl.l1_docs.len(), 3);
1420    }
1421
1422    #[test]
1423    fn test_l1_last_doc_values() {
1424        // 20 blocks: 2 full L1 groups (8+8) + 1 partial (4) → 3 L1 entries
1425        let n = BLOCK_SIZE * 20;
1426        let docs: Vec<(u32, u32)> = (0..n as u32).map(|i| (i * 3, 1)).collect();
1427        let bpl = build_bpl(&docs);
1428        assert_eq!(bpl.num_blocks(), 20);
1429        assert_eq!(bpl.l1_docs.len(), 3); // ceil(20/8) = 3
1430
1431        // L1[0] = last_doc of block 7 (end of first group)
1432        let expected_l1_0 = bpl.block_last_doc(7).unwrap();
1433        assert_eq!(bpl.l1_docs[0], expected_l1_0);
1434
1435        // L1[1] = last_doc of block 15 (end of second group)
1436        let expected_l1_1 = bpl.block_last_doc(15).unwrap();
1437        assert_eq!(bpl.l1_docs[1], expected_l1_1);
1438
1439        // L1[2] = last_doc of block 19 (end of partial group)
1440        let expected_l1_2 = bpl.block_last_doc(19).unwrap();
1441        assert_eq!(bpl.l1_docs[2], expected_l1_2);
1442    }
1443
1444    #[test]
1445    fn test_seek_block_basic() {
1446        // 20 blocks spanning large doc ID range
1447        let n = BLOCK_SIZE * 20;
1448        let docs: Vec<(u32, u32)> = (0..n as u32).map(|i| (i * 10, 1)).collect();
1449        let bpl = build_bpl(&docs);
1450
1451        // Seek to doc 0 → block 0
1452        assert_eq!(bpl.seek_block(0, 0), Some(0));
1453
1454        // Seek to the first doc of each block
1455        for blk in 0..20 {
1456            let first = bpl.block_first_doc(blk).unwrap();
1457            assert_eq!(
1458                bpl.seek_block(first, 0),
1459                Some(blk),
1460                "seek to block {} first_doc",
1461                blk
1462            );
1463        }
1464
1465        // Seek to the last doc of each block
1466        for blk in 0..20 {
1467            let last = bpl.block_last_doc(blk).unwrap();
1468            assert_eq!(
1469                bpl.seek_block(last, 0),
1470                Some(blk),
1471                "seek to block {} last_doc",
1472                blk
1473            );
1474        }
1475
1476        // Seek past all docs
1477        let max_doc = bpl.block_last_doc(19).unwrap();
1478        assert_eq!(bpl.seek_block(max_doc + 1, 0), None);
1479
1480        // Seek with from_block > 0 (skip early blocks)
1481        let mid_doc = bpl.block_first_doc(10).unwrap();
1482        assert_eq!(bpl.seek_block(mid_doc, 10), Some(10));
1483        assert_eq!(
1484            bpl.seek_block(mid_doc, 11),
1485            Some(11).or(bpl.seek_block(mid_doc, 11))
1486        );
1487    }
1488
1489    #[test]
1490    fn test_seek_block_across_l1_boundaries() {
1491        // 24 blocks = 3 L1 groups of 8
1492        let n = BLOCK_SIZE * 24;
1493        let docs: Vec<(u32, u32)> = (0..n as u32).map(|i| (i * 5, 1)).collect();
1494        let bpl = build_bpl(&docs);
1495        assert_eq!(bpl.l1_docs.len(), 3);
1496
1497        // Seek into each L1 group
1498        for group in 0..3 {
1499            let blk = group * L1_INTERVAL;
1500            let target = bpl.block_first_doc(blk).unwrap();
1501            assert_eq!(
1502                bpl.seek_block(target, 0),
1503                Some(blk),
1504                "seek to group {} block {}",
1505                group,
1506                blk
1507            );
1508        }
1509
1510        // Seek to doc in the middle of group 2 (block 20)
1511        let target = bpl.block_first_doc(20).unwrap() + 1;
1512        assert_eq!(bpl.seek_block(target, 0), Some(20));
1513    }
1514
1515    #[test]
1516    fn test_block_data_size_helper() {
1517        // Build a posting list and verify block_data_size matches actual block sizes
1518        let docs: Vec<(u32, u32)> = (0..500u32).map(|i| (i * 7, (i % 20) + 1)).collect();
1519        let bpl = build_bpl(&docs);
1520
1521        for blk in 0..bpl.num_blocks() {
1522            let (_, _, offset, _) = bpl.read_l0_entry(blk);
1523            let computed_size = block_data_size(&bpl.stream, offset as usize);
1524
1525            // Verify: next block's offset - this block's offset should equal computed_size
1526            // (for all but last block)
1527            if blk + 1 < bpl.num_blocks() {
1528                let (_, _, next_offset, _) = bpl.read_l0_entry(blk + 1);
1529                assert_eq!(
1530                    computed_size,
1531                    (next_offset - offset) as usize,
1532                    "block_data_size mismatch at block {}",
1533                    blk
1534                );
1535            } else {
1536                // Last block: offset + size should equal stream length
1537                assert_eq!(
1538                    offset as usize + computed_size,
1539                    bpl.stream.len(),
1540                    "last block size mismatch"
1541                );
1542            }
1543        }
1544    }
1545
1546    #[test]
1547    fn test_l0_entry_roundtrip() {
1548        // Verify L0 entries survive serialize → deserialize
1549        let docs: Vec<(u32, u32)> = (0..1000u32).map(|i| (i * 3, (i % 10) + 1)).collect();
1550        let bpl = build_bpl(&docs);
1551
1552        let bytes = serialize_bpl(&bpl);
1553        let bpl2 = BlockPostingList::deserialize(&bytes).unwrap();
1554
1555        assert_eq!(bpl.num_blocks(), bpl2.num_blocks());
1556        for blk in 0..bpl.num_blocks() {
1557            assert_eq!(
1558                bpl.read_l0_entry(blk),
1559                bpl2.read_l0_entry(blk),
1560                "L0 entry mismatch at block {}",
1561                blk
1562            );
1563        }
1564
1565        // Verify L1 docs match
1566        assert_eq!(bpl.l1_docs, bpl2.l1_docs);
1567    }
1568
1569    #[test]
1570    fn test_zero_copy_deserialize_matches() {
1571        let docs: Vec<(u32, u32)> = (0..2000u32).map(|i| (i * 2, (i % 5) + 1)).collect();
1572        let bpl = build_bpl(&docs);
1573        let bytes = serialize_bpl(&bpl);
1574
1575        let copied = BlockPostingList::deserialize(&bytes).unwrap();
1576        let zero_copy =
1577            BlockPostingList::deserialize_zero_copy(OwnedBytes::new(bytes.clone())).unwrap();
1578
1579        // Same structure
1580        assert_eq!(copied.l0_count, zero_copy.l0_count);
1581        assert_eq!(copied.l1_docs, zero_copy.l1_docs);
1582        assert_eq!(copied.doc_count, zero_copy.doc_count);
1583        assert_eq!(copied.max_tf, zero_copy.max_tf);
1584
1585        // Same iteration
1586        let p1 = collect_postings(&copied);
1587        let p2 = collect_postings(&zero_copy);
1588        assert_eq!(p1, p2);
1589    }
1590
1591    #[test]
1592    fn test_l1_preserved_through_streaming_merge() {
1593        // Merge 3 segments, verify L1 is correctly rebuilt
1594        let seg_a = build_bpl(&(0..1000u32).map(|i| (i * 2, 1)).collect::<Vec<_>>());
1595        let seg_b = build_bpl(&(0..800u32).map(|i| (i * 3, 2)).collect::<Vec<_>>());
1596        let seg_c = build_bpl(&(0..500u32).map(|i| (i * 5, 3)).collect::<Vec<_>>());
1597
1598        let bytes_a = serialize_bpl(&seg_a);
1599        let bytes_b = serialize_bpl(&seg_b);
1600        let bytes_c = serialize_bpl(&seg_c);
1601
1602        let sources: Vec<(&[u8], u32)> = vec![(&bytes_a, 0), (&bytes_b, 10000), (&bytes_c, 20000)];
1603        let mut out = Vec::new();
1604        BlockPostingList::concatenate_streaming(&sources, &mut out).unwrap();
1605
1606        let merged = BlockPostingList::deserialize(&out).unwrap();
1607        let expected_l1_count = merged.num_blocks().div_ceil(L1_INTERVAL);
1608        assert_eq!(merged.l1_docs.len(), expected_l1_count);
1609
1610        // Verify L1 values are correct
1611        for (i, &l1_doc) in merged.l1_docs.iter().enumerate() {
1612            let last_block_in_group = ((i + 1) * L1_INTERVAL - 1).min(merged.num_blocks() - 1);
1613            let expected = merged.block_last_doc(last_block_in_group).unwrap();
1614            assert_eq!(l1_doc, expected, "L1[{}] mismatch", i);
1615        }
1616
1617        // Verify seek_block works on merged result
1618        for blk in 0..merged.num_blocks() {
1619            let first = merged.block_first_doc(blk).unwrap();
1620            assert_eq!(merged.seek_block(first, 0), Some(blk));
1621        }
1622    }
1623
1624    #[test]
1625    fn test_seek_block_single_block() {
1626        // Edge case: single block (< L1_INTERVAL)
1627        let bpl = build_bpl(&[(0, 1), (10, 2), (20, 3)]);
1628        assert_eq!(bpl.num_blocks(), 1);
1629        assert_eq!(bpl.l1_docs.len(), 1);
1630
1631        assert_eq!(bpl.seek_block(0, 0), Some(0));
1632        assert_eq!(bpl.seek_block(10, 0), Some(0));
1633        assert_eq!(bpl.seek_block(20, 0), Some(0));
1634        assert_eq!(bpl.seek_block(21, 0), None);
1635    }
1636
1637    #[test]
1638    fn test_footer_size() {
1639        // Verify serialized size = stream + L0 + L1 + FOOTER_SIZE
1640        let docs: Vec<(u32, u32)> = (0..500u32).map(|i| (i * 2, 1)).collect();
1641        let bpl = build_bpl(&docs);
1642        let bytes = serialize_bpl(&bpl);
1643
1644        let expected =
1645            bpl.stream.len() + bpl.l0_count * L0_SIZE + bpl.l1_docs.len() * L1_SIZE + FOOTER_SIZE;
1646        assert_eq!(bytes.len(), expected);
1647    }
1648
1649    #[test]
1650    fn test_seek_block_from_block_skips_earlier() {
1651        // 16 blocks: seek with from_block should skip earlier blocks
1652        let n = BLOCK_SIZE * 16;
1653        let docs: Vec<(u32, u32)> = (0..n as u32).map(|i| (i * 3, 1)).collect();
1654        let bpl = build_bpl(&docs);
1655
1656        // Target is in block 5, but from_block=8 → should find block >= 8
1657        let target_in_5 = bpl.block_first_doc(5).unwrap() + 1;
1658        // from_block=8 means we only look at blocks 8+
1659        // target_in_5 < last_doc of block 8, so seek_block(target, 8) should return 8
1660        let result = bpl.seek_block(target_in_5, 8);
1661        assert!(result.is_some());
1662        assert!(result.unwrap() >= 8);
1663    }
1664}