Skip to main content

hermes_core/structures/postings/
positions.rs

1//! Position-aware posting list for phrase queries and multi-field element tracking
2//!
3//! Positions are encoded as: (element_ordinal << 20) | token_position
4//! This allows up to 4096 elements per field and ~1M tokens per element.
5//!
6//! ## Block Format
7//!
8//! Uses a block-based format with skip list for efficient binary search by doc_id:
9//! - Skip list enables O(log n) lookup by doc_id
10//! - Blocks of up to 128 documents for cache efficiency
11//! - Delta encoding within blocks for compression
12//! - Stackable for fast merge (just adjust doc_id_base per block)
13//!
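//! Serialized format (footer-based, little-endian; see `serialize`):
//! ```text
//! Data:
//!   - blocks: each block is count: u32, first_doc: u32, followed by
//!     vint-encoded doc-id deltas and absolute positions
//! Skip list (one 20-byte entry per block):
//!   - base_doc_id: u32, last_doc_id: u32, byte_offset: u64, length: u32
//! Footer (16 bytes):
//!   - data_len: u64
//!   - num_blocks: u32
//!   - doc_count: u32
//! ```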
24
25use byteorder::{LittleEndian, ReadBytesExt, WriteBytesExt};
26use std::io::{self, Write};
27
28use super::posting_common::{read_vint, write_vint};
29use crate::DocId;
30
/// Number of documents grouped into one block of a position posting list.
pub const POSITION_BLOCK_SIZE: usize = 128;

/// Largest token position that fits in the low 20 bits of an encoded
/// position (1,048,575).
pub const MAX_TOKEN_POSITION: u32 = 0x000F_FFFF;

/// Largest element ordinal that fits in the high 12 bits of an encoded
/// position (4095).
pub const MAX_ELEMENT_ORDINAL: u32 = 0x0FFF;
39
40/// Encode element ordinal and token position into a single u32
41#[inline]
42pub fn encode_position(element_ordinal: u32, token_position: u32) -> u32 {
43    debug_assert!(
44        element_ordinal <= MAX_ELEMENT_ORDINAL,
45        "Element ordinal {} exceeds maximum {}",
46        element_ordinal,
47        MAX_ELEMENT_ORDINAL
48    );
49    debug_assert!(
50        token_position <= MAX_TOKEN_POSITION,
51        "Token position {} exceeds maximum {}",
52        token_position,
53        MAX_TOKEN_POSITION
54    );
55    (element_ordinal << 20) | (token_position & MAX_TOKEN_POSITION)
56}
57
/// Extract the element ordinal (upper 12 bits) from an encoded position.
#[inline]
pub fn decode_element_ordinal(position: u32) -> u32 {
    // The token position occupies the low 20 bits; shift it away.
    const TOKEN_BITS: u32 = 20;
    position >> TOKEN_BITS
}
63
64/// Decode token position from encoded position
65#[inline]
66pub fn decode_token_position(position: u32) -> u32 {
67    position & MAX_TOKEN_POSITION
68}
69
70/// A posting entry with positions (used during building)
71#[derive(Debug, Clone, PartialEq, Eq)]
72pub struct PostingWithPositions {
73    pub doc_id: DocId,
74    pub term_freq: u32,
75    /// Encoded positions: (element_ordinal << 20) | token_position
76    pub positions: Vec<u32>,
77}
78
79/// Block-based position posting list with skip list for O(log n) doc_id lookup
80///
81/// Similar to BlockPostingList but stores positions per document.
82/// Uses binary search on skip list to find the right block, then linear scan within block.
83#[derive(Debug, Clone)]
84pub struct PositionPostingList {
85    /// Skip list: (base_doc_id, last_doc_id, byte_offset)
86    /// Enables binary search to find the right block
87    skip_list: Vec<(DocId, DocId, u64)>,
88    /// Compressed block data
89    data: Vec<u8>,
90    /// Total document count
91    doc_count: u32,
92}
93
94impl Default for PositionPostingList {
95    fn default() -> Self {
96        Self::new()
97    }
98}
99
100impl PositionPostingList {
101    pub fn new() -> Self {
102        Self {
103            skip_list: Vec::new(),
104            data: Vec::new(),
105            doc_count: 0,
106        }
107    }
108
109    pub fn with_capacity(capacity: usize) -> Self {
110        Self {
111            skip_list: Vec::with_capacity(capacity / POSITION_BLOCK_SIZE + 1),
112            data: Vec::with_capacity(capacity * 8), // rough estimate
113            doc_count: 0,
114        }
115    }
116
117    /// Build from a list of postings with positions
118    pub fn from_postings(postings: &[PostingWithPositions]) -> io::Result<Self> {
119        if postings.is_empty() {
120            return Ok(Self::new());
121        }
122
123        let mut skip_list = Vec::new();
124        let mut data = Vec::new();
125        let mut i = 0;
126
127        while i < postings.len() {
128            let block_start = data.len() as u64;
129            let block_end = (i + POSITION_BLOCK_SIZE).min(postings.len());
130            let block = &postings[i..block_end];
131
132            // Record skip entry
133            let base_doc_id = block.first().unwrap().doc_id;
134            let last_doc_id = block.last().unwrap().doc_id;
135            skip_list.push((base_doc_id, last_doc_id, block_start));
136
137            // Write block: fixed u32 count + first_doc (8-byte prefix), then vint deltas
138            data.write_u32::<LittleEndian>(block.len() as u32)?;
139            data.write_u32::<LittleEndian>(base_doc_id)?;
140
141            let mut prev_doc_id = base_doc_id;
142            for (j, posting) in block.iter().enumerate() {
143                if j > 0 {
144                    let delta = posting.doc_id - prev_doc_id;
145                    write_vint(&mut data, delta as u64)?;
146                }
147                prev_doc_id = posting.doc_id;
148
149                // Write positions count and positions (absolute - delta bad for ordinal<<20)
150                write_vint(&mut data, posting.positions.len() as u64)?;
151                for &pos in &posting.positions {
152                    write_vint(&mut data, pos as u64)?;
153                }
154            }
155
156            i = block_end;
157        }
158
159        Ok(Self {
160            skip_list,
161            data,
162            doc_count: postings.len() as u32,
163        })
164    }
165
166    /// Add a posting with positions (for building - converts to block format on serialize)
167    pub fn push(&mut self, doc_id: DocId, positions: Vec<u32>) {
168        // For compatibility: build in-memory, convert to blocks on serialize
169        // This is a simplified approach - we rebuild skip_list on serialize
170        let posting = PostingWithPositions {
171            doc_id,
172            term_freq: positions.len() as u32,
173            positions,
174        };
175
176        // Serialize this posting to data buffer
177        let block_start = self.data.len() as u64;
178
179        // If this is first posting or we need a new block
180        let need_new_block =
181            self.skip_list.is_empty() || self.doc_count.is_multiple_of(POSITION_BLOCK_SIZE as u32);
182
183        if need_new_block {
184            // Start new block: fixed u32 count + first_doc (8-byte prefix)
185            self.skip_list.push((doc_id, doc_id, block_start));
186            self.data.write_u32::<LittleEndian>(1u32).unwrap();
187            self.data.write_u32::<LittleEndian>(doc_id).unwrap();
188        } else {
189            // Add to existing block — update count in-place + add delta
190            let last_block = self.skip_list.last_mut().unwrap();
191            let prev_doc_id = last_block.1;
192            last_block.1 = doc_id;
193
194            // Patch count u32 at block start
195            let count_offset = last_block.2 as usize;
196            let old_count = u32::from_le_bytes(
197                self.data[count_offset..count_offset + 4]
198                    .try_into()
199                    .unwrap(),
200            );
201            self.data[count_offset..count_offset + 4]
202                .copy_from_slice(&(old_count + 1).to_le_bytes());
203
204            let delta = doc_id - prev_doc_id;
205            write_vint(&mut self.data, delta as u64).unwrap();
206        }
207
208        // Write positions (absolute - delta encoding bad for ordinal<<20)
209        write_vint(&mut self.data, posting.positions.len() as u64).unwrap();
210        for &pos in &posting.positions {
211            write_vint(&mut self.data, pos as u64).unwrap();
212        }
213
214        self.doc_count += 1;
215    }
216
217    pub fn doc_count(&self) -> u32 {
218        self.doc_count
219    }
220
221    pub fn len(&self) -> usize {
222        self.doc_count as usize
223    }
224
225    pub fn is_empty(&self) -> bool {
226        self.doc_count == 0
227    }
228
229    /// Get positions for a specific document using binary search on skip list
230    pub fn get_positions(&self, target_doc_id: DocId) -> Option<Vec<u32>> {
231        if self.skip_list.is_empty() {
232            return None;
233        }
234
235        // Binary search on skip list to find the right block
236        let block_idx = match self.skip_list.binary_search_by(|&(base, last, _)| {
237            if target_doc_id < base {
238                std::cmp::Ordering::Greater
239            } else if target_doc_id > last {
240                std::cmp::Ordering::Less
241            } else {
242                std::cmp::Ordering::Equal
243            }
244        }) {
245            Ok(idx) => idx,
246            Err(_) => return None, // doc_id not in any block range
247        };
248
249        // Decode block and search for doc_id
250        let offset = self.skip_list[block_idx].2 as usize;
251        let mut reader = &self.data[offset..];
252
253        // Fixed 8-byte prefix: count(u32) + first_doc(u32)
254        let count = reader.read_u32::<LittleEndian>().ok()? as usize;
255        let first_doc = reader.read_u32::<LittleEndian>().ok()?;
256        let mut prev_doc_id = first_doc;
257
258        for i in 0..count {
259            let doc_id = if i == 0 {
260                first_doc
261            } else {
262                let delta = read_vint(&mut reader).ok()? as u32;
263                prev_doc_id + delta
264            };
265            prev_doc_id = doc_id;
266
267            let num_positions = read_vint(&mut reader).ok()? as usize;
268
269            if doc_id == target_doc_id {
270                // Found it! Read positions (stored absolute)
271                let mut positions = Vec::with_capacity(num_positions);
272                for _ in 0..num_positions {
273                    let pos = read_vint(&mut reader).ok()? as u32;
274                    positions.push(pos);
275                }
276                return Some(positions);
277            } else {
278                // Skip positions
279                for _ in 0..num_positions {
280                    let _ = read_vint(&mut reader);
281                }
282            }
283        }
284
285        None
286    }
287
288    /// Size of one serialized skip entry:
289    /// first_doc(4) + last_doc(4) + offset(8) + length(4) = 20 bytes
290    const SKIP_ENTRY_SIZE: usize = 20;
291
292    /// Serialize to bytes (footer-based: data first).
293    ///
294    /// Format:
295    /// ```text
296    /// [block data: data_len bytes]
297    /// [skip entries: N × 20 bytes (base_doc, last_doc, offset, length)]
298    /// [footer: data_len(8) + skip_count(4) + doc_count(4) = 16 bytes]
299    /// ```
300    pub fn serialize<W: Write>(&self, writer: &mut W) -> io::Result<()> {
301        // Data first
302        writer.write_all(&self.data)?;
303
304        // Skip list — compute length from adjacent entries
305        for (i, (base_doc_id, last_doc_id, offset)) in self.skip_list.iter().enumerate() {
306            let next_offset = if i + 1 < self.skip_list.len() {
307                self.skip_list[i + 1].2
308            } else {
309                self.data.len() as u64
310            };
311            let length = (next_offset - offset) as u32;
312            writer.write_u32::<LittleEndian>(*base_doc_id)?;
313            writer.write_u32::<LittleEndian>(*last_doc_id)?;
314            writer.write_u64::<LittleEndian>(*offset)?;
315            writer.write_u32::<LittleEndian>(length)?;
316        }
317
318        // Footer
319        writer.write_u64::<LittleEndian>(self.data.len() as u64)?;
320        writer.write_u32::<LittleEndian>(self.skip_list.len() as u32)?;
321        writer.write_u32::<LittleEndian>(self.doc_count)?;
322
323        Ok(())
324    }
325
326    /// Deserialize from a byte slice (footer-based format).
327    pub fn deserialize(raw: &[u8]) -> io::Result<Self> {
328        if raw.len() < 16 {
329            return Err(io::Error::new(
330                io::ErrorKind::InvalidData,
331                "position data too short",
332            ));
333        }
334
335        // Parse footer (last 16 bytes)
336        let f = raw.len() - 16;
337        let data_len = u64::from_le_bytes(raw[f..f + 8].try_into().unwrap()) as usize;
338        let skip_count = u32::from_le_bytes(raw[f + 8..f + 12].try_into().unwrap()) as usize;
339        let doc_count = u32::from_le_bytes(raw[f + 12..f + 16].try_into().unwrap());
340
341        // Parse skip list (20-byte entries; length field not stored in-memory)
342        let mut skip_list = Vec::with_capacity(skip_count);
343        let mut pos = data_len;
344        for _ in 0..skip_count {
345            let base = u32::from_le_bytes(raw[pos..pos + 4].try_into().unwrap());
346            let last = u32::from_le_bytes(raw[pos + 4..pos + 8].try_into().unwrap());
347            let offset = u64::from_le_bytes(raw[pos + 8..pos + 16].try_into().unwrap());
348            // pos + 16..pos + 20 = length (not needed in-memory; adjacent entries suffice)
349            skip_list.push((base, last, offset));
350            pos += Self::SKIP_ENTRY_SIZE;
351        }
352
353        let data = raw[..data_len].to_vec();
354
355        Ok(Self {
356            skip_list,
357            data,
358            doc_count,
359        })
360    }
361
362    /// Concatenate blocks from multiple position lists with doc_id remapping (for merge)
363    pub fn concatenate_blocks(sources: &[(PositionPostingList, u32)]) -> io::Result<Self> {
364        let mut skip_list = Vec::new();
365        let mut data = Vec::new();
366        let mut total_docs = 0u32;
367
368        for (source, doc_offset) in sources {
369            for block_idx in 0..source.skip_list.len() {
370                let (base, last, src_offset) = source.skip_list[block_idx];
371                let next_offset = if block_idx + 1 < source.skip_list.len() {
372                    source.skip_list[block_idx + 1].2 as usize
373                } else {
374                    source.data.len()
375                };
376
377                let new_base = base + doc_offset;
378                let new_last = last + doc_offset;
379                let new_offset = data.len() as u64;
380
381                // Copy and adjust block data
382                let block_bytes = &source.data[src_offset as usize..next_offset];
383
384                // Fixed 8-byte prefix: count(u32) + first_doc(u32)
385                let count = u32::from_le_bytes(block_bytes[0..4].try_into().unwrap());
386                let first_doc = u32::from_le_bytes(block_bytes[4..8].try_into().unwrap());
387
388                // Write patched prefix + copy rest verbatim
389                data.write_u32::<LittleEndian>(count)?;
390                data.write_u32::<LittleEndian>(first_doc + doc_offset)?;
391                data.extend_from_slice(&block_bytes[8..]);
392
393                skip_list.push((new_base, new_last, new_offset));
394                total_docs += count;
395            }
396        }
397
398        Ok(Self {
399            skip_list,
400            data,
401            doc_count: total_docs,
402        })
403    }
404
405    /// Streaming merge: write blocks directly to output writer (bounded memory).
406    ///
407    /// **Zero-materializing**: reads skip entries directly from source bytes
408    /// without parsing into Vecs. Explicit `length` field in each 20-byte
409    /// entry eliminates adjacent-entry lookups.
410    ///
411    /// Only output skip bytes are buffered (bounded O(total_blocks × 20)).
412    /// Block data flows source → output writer without intermediate buffering.
413    ///
414    /// Returns `(doc_count, bytes_written)`.
415    pub fn concatenate_streaming<W: Write>(
416        sources: &[(&[u8], u32)],
417        writer: &mut W,
418    ) -> io::Result<(u32, usize)> {
419        // Parse only footers (16 bytes each) — no skip entries materialized
420        struct SourceMeta {
421            data_len: usize,
422            skip_count: usize,
423        }
424
425        let mut metas: Vec<SourceMeta> = Vec::with_capacity(sources.len());
426        let mut total_docs = 0u32;
427
428        for (raw, _) in sources {
429            if raw.len() < 16 {
430                continue;
431            }
432            let f = raw.len() - 16;
433            let data_len = u64::from_le_bytes(raw[f..f + 8].try_into().unwrap()) as usize;
434            let skip_count = u32::from_le_bytes(raw[f + 8..f + 12].try_into().unwrap()) as usize;
435            let doc_count = u32::from_le_bytes(raw[f + 12..f + 16].try_into().unwrap());
436            total_docs += doc_count;
437            metas.push(SourceMeta {
438                data_len,
439                skip_count,
440            });
441        }
442
443        // Phase 1: Stream block data, reading skip entries on-the-fly.
444        // Accumulate output skip bytes (bounded: 20 bytes × total_blocks).
445        let mut out_skip: Vec<u8> = Vec::new();
446        let mut out_skip_count = 0u32;
447        let mut data_written = 0u64;
448        let mut patch_buf = [0u8; 8];
449        let es = Self::SKIP_ENTRY_SIZE;
450
451        for (src_idx, meta) in metas.iter().enumerate() {
452            let (raw, doc_offset) = &sources[src_idx];
453            let skip_base = meta.data_len;
454            let data = &raw[..meta.data_len];
455
456            for i in 0..meta.skip_count {
457                // Read source skip entry directly from raw bytes
458                let p = skip_base + i * es;
459                let base = u32::from_le_bytes(raw[p..p + 4].try_into().unwrap());
460                let last = u32::from_le_bytes(raw[p + 4..p + 8].try_into().unwrap());
461                let offset = u64::from_le_bytes(raw[p + 8..p + 16].try_into().unwrap());
462                let length = u32::from_le_bytes(raw[p + 16..p + 20].try_into().unwrap());
463
464                let block = &data[offset as usize..(offset as usize + length as usize)];
465
466                // Write output skip entry
467                out_skip.extend_from_slice(&(base + doc_offset).to_le_bytes());
468                out_skip.extend_from_slice(&(last + doc_offset).to_le_bytes());
469                out_skip.extend_from_slice(&data_written.to_le_bytes());
470                out_skip.extend_from_slice(&length.to_le_bytes());
471                out_skip_count += 1;
472
473                // Write patched 8-byte prefix + rest of block verbatim
474                patch_buf[0..4].copy_from_slice(&block[0..4]);
475                let first_doc = u32::from_le_bytes(block[4..8].try_into().unwrap());
476                patch_buf[4..8].copy_from_slice(&(first_doc + doc_offset).to_le_bytes());
477                writer.write_all(&patch_buf)?;
478                writer.write_all(&block[8..])?;
479
480                data_written += block.len() as u64;
481            }
482        }
483
484        // Phase 2: Write skip entries + footer
485        writer.write_all(&out_skip)?;
486
487        writer.write_u64::<LittleEndian>(data_written)?;
488        writer.write_u32::<LittleEndian>(out_skip_count)?;
489        writer.write_u32::<LittleEndian>(total_docs)?;
490
491        let total_bytes = data_written as usize + out_skip.len() + 16;
492        Ok((total_docs, total_bytes))
493    }
494
495    /// Get iterator over all postings (for phrase queries)
496    pub fn iter(&self) -> PositionPostingIterator<'_> {
497        PositionPostingIterator::new(self)
498    }
499}
500
501/// Iterator over block-based position posting list
502pub struct PositionPostingIterator<'a> {
503    list: &'a PositionPostingList,
504    current_block: usize,
505    position_in_block: usize,
506    block_postings: Vec<PostingWithPositions>,
507    exhausted: bool,
508}
509
510impl<'a> PositionPostingIterator<'a> {
511    pub fn new(list: &'a PositionPostingList) -> Self {
512        let exhausted = list.skip_list.is_empty();
513        let mut iter = Self {
514            list,
515            current_block: 0,
516            position_in_block: 0,
517            block_postings: Vec::new(),
518            exhausted,
519        };
520        if !iter.exhausted {
521            iter.load_block(0);
522        }
523        iter
524    }
525
526    fn load_block(&mut self, block_idx: usize) {
527        if block_idx >= self.list.skip_list.len() {
528            self.exhausted = true;
529            return;
530        }
531
532        self.current_block = block_idx;
533        self.position_in_block = 0;
534
535        let offset = self.list.skip_list[block_idx].2 as usize;
536        let mut reader = &self.list.data[offset..];
537
538        // Fixed 8-byte prefix: count(u32) + first_doc(u32)
539        let count = reader.read_u32::<LittleEndian>().unwrap_or(0) as usize;
540        let first_doc = reader.read_u32::<LittleEndian>().unwrap_or(0);
541        self.block_postings.clear();
542        self.block_postings.reserve(count);
543
544        let mut prev_doc_id = first_doc;
545
546        for i in 0..count {
547            let doc_id = if i == 0 {
548                first_doc
549            } else {
550                let delta = read_vint(&mut reader).unwrap_or(0) as u32;
551                prev_doc_id + delta
552            };
553            prev_doc_id = doc_id;
554
555            let num_positions = read_vint(&mut reader).unwrap_or(0) as usize;
556            let mut positions = Vec::with_capacity(num_positions);
557            for _ in 0..num_positions {
558                let pos = read_vint(&mut reader).unwrap_or(0) as u32;
559                positions.push(pos);
560            }
561
562            self.block_postings.push(PostingWithPositions {
563                doc_id,
564                term_freq: num_positions as u32,
565                positions,
566            });
567        }
568    }
569
570    pub fn doc(&self) -> DocId {
571        if self.exhausted || self.position_in_block >= self.block_postings.len() {
572            u32::MAX
573        } else {
574            self.block_postings[self.position_in_block].doc_id
575        }
576    }
577
578    pub fn term_freq(&self) -> u32 {
579        if self.exhausted || self.position_in_block >= self.block_postings.len() {
580            0
581        } else {
582            self.block_postings[self.position_in_block].term_freq
583        }
584    }
585
586    pub fn positions(&self) -> &[u32] {
587        if self.exhausted || self.position_in_block >= self.block_postings.len() {
588            &[]
589        } else {
590            &self.block_postings[self.position_in_block].positions
591        }
592    }
593
594    pub fn advance(&mut self) {
595        if self.exhausted {
596            return;
597        }
598
599        self.position_in_block += 1;
600        if self.position_in_block >= self.block_postings.len() {
601            self.load_block(self.current_block + 1);
602        }
603    }
604
605    pub fn seek(&mut self, target: DocId) {
606        if self.exhausted {
607            return;
608        }
609
610        // Check if target is in current block
611        if let Some((_, last, _)) = self.list.skip_list.get(self.current_block)
612            && target <= *last
613        {
614            // Target might be in current block, scan forward
615            while self.position_in_block < self.block_postings.len()
616                && self.block_postings[self.position_in_block].doc_id < target
617            {
618                self.position_in_block += 1;
619            }
620            if self.position_in_block >= self.block_postings.len() {
621                self.load_block(self.current_block + 1);
622                self.seek(target); // Continue seeking in next block
623            }
624            return;
625        }
626
627        // Binary search on skip list to find the right block
628        let block_idx = match self.list.skip_list.binary_search_by(|&(base, last, _)| {
629            if target < base {
630                std::cmp::Ordering::Greater
631            } else if target > last {
632                std::cmp::Ordering::Less
633            } else {
634                std::cmp::Ordering::Equal
635            }
636        }) {
637            Ok(idx) => idx,
638            Err(idx) => idx, // Use the next block if not found exactly
639        };
640
641        if block_idx >= self.list.skip_list.len() {
642            self.exhausted = true;
643            return;
644        }
645
646        self.load_block(block_idx);
647
648        // Linear scan within block
649        while self.position_in_block < self.block_postings.len()
650            && self.block_postings[self.position_in_block].doc_id < target
651        {
652            self.position_in_block += 1;
653        }
654
655        if self.position_in_block >= self.block_postings.len() {
656            self.load_block(self.current_block + 1);
657        }
658    }
659}
660
661#[cfg(test)]
662mod tests {
663    use super::*;
664
665    #[test]
666    fn test_position_encoding() {
667        // Element 0, position 5
668        let pos = encode_position(0, 5);
669        assert_eq!(decode_element_ordinal(pos), 0);
670        assert_eq!(decode_token_position(pos), 5);
671
672        // Element 3, position 100
673        let pos = encode_position(3, 100);
674        assert_eq!(decode_element_ordinal(pos), 3);
675        assert_eq!(decode_token_position(pos), 100);
676
677        // Max values
678        let pos = encode_position(MAX_ELEMENT_ORDINAL, MAX_TOKEN_POSITION);
679        assert_eq!(decode_element_ordinal(pos), MAX_ELEMENT_ORDINAL);
680        assert_eq!(decode_token_position(pos), MAX_TOKEN_POSITION);
681    }
682
683    #[test]
684    fn test_position_posting_list_build() {
685        // Build from postings
686        let postings = vec![
687            PostingWithPositions {
688                doc_id: 1,
689                term_freq: 2,
690                positions: vec![encode_position(0, 0), encode_position(0, 2)],
691            },
692            PostingWithPositions {
693                doc_id: 3,
694                term_freq: 1,
695                positions: vec![encode_position(1, 0)],
696            },
697        ];
698
699        let list = PositionPostingList::from_postings(&postings).unwrap();
700        assert_eq!(list.doc_count(), 2);
701
702        // Test binary search
703        let pos = list.get_positions(1).unwrap();
704        assert_eq!(pos.len(), 2);
705
706        let pos = list.get_positions(3).unwrap();
707        assert_eq!(pos.len(), 1);
708
709        // Not found
710        assert!(list.get_positions(2).is_none());
711        assert!(list.get_positions(99).is_none());
712    }
713
714    #[test]
715    fn test_serialization_roundtrip() {
716        let postings = vec![
717            PostingWithPositions {
718                doc_id: 1,
719                term_freq: 2,
720                positions: vec![encode_position(0, 0), encode_position(0, 5)],
721            },
722            PostingWithPositions {
723                doc_id: 3,
724                term_freq: 1,
725                positions: vec![encode_position(1, 0)],
726            },
727            PostingWithPositions {
728                doc_id: 5,
729                term_freq: 1,
730                positions: vec![encode_position(0, 10)],
731            },
732        ];
733
734        let list = PositionPostingList::from_postings(&postings).unwrap();
735
736        let mut bytes = Vec::new();
737        list.serialize(&mut bytes).unwrap();
738
739        let deserialized = PositionPostingList::deserialize(&bytes).unwrap();
740
741        assert_eq!(list.doc_count(), deserialized.doc_count());
742
743        // Verify binary search works on deserialized
744        let pos = deserialized.get_positions(1).unwrap();
745        assert_eq!(pos, vec![encode_position(0, 0), encode_position(0, 5)]);
746
747        let pos = deserialized.get_positions(3).unwrap();
748        assert_eq!(pos, vec![encode_position(1, 0)]);
749    }
750
751    #[test]
752    fn test_binary_search_many_blocks() {
753        // Create enough postings to span multiple blocks (128 per block)
754        let mut postings = Vec::new();
755        for i in 0..300 {
756            postings.push(PostingWithPositions {
757                doc_id: i * 2, // doc_ids: 0, 2, 4, 6, ...
758                term_freq: 1,
759                positions: vec![encode_position(0, i)],
760            });
761        }
762
763        let list = PositionPostingList::from_postings(&postings).unwrap();
764        assert_eq!(list.doc_count(), 300);
765
766        // Should have 3 blocks (128 + 128 + 44)
767        assert_eq!(list.skip_list.len(), 3);
768
769        // Test binary search across blocks
770        let pos = list.get_positions(0).unwrap();
771        assert_eq!(pos, vec![encode_position(0, 0)]);
772
773        let pos = list.get_positions(256).unwrap(); // doc 128 in block 1
774        assert_eq!(pos, vec![encode_position(0, 128)]);
775
776        let pos = list.get_positions(598).unwrap(); // last doc
777        assert_eq!(pos, vec![encode_position(0, 299)]);
778
779        // Not found (odd numbers don't exist)
780        assert!(list.get_positions(1).is_none());
781        assert!(list.get_positions(257).is_none());
782    }
783
784    #[test]
785    fn test_concatenate_blocks_merge() {
786        // Test efficient merge by concatenating blocks
787        let postings1 = vec![
788            PostingWithPositions {
789                doc_id: 0,
790                term_freq: 1,
791                positions: vec![0],
792            },
793            PostingWithPositions {
794                doc_id: 1,
795                term_freq: 1,
796                positions: vec![5],
797            },
798            PostingWithPositions {
799                doc_id: 2,
800                term_freq: 1,
801                positions: vec![10],
802            },
803        ];
804        let list1 = PositionPostingList::from_postings(&postings1).unwrap();
805
806        let postings2 = vec![
807            PostingWithPositions {
808                doc_id: 0,
809                term_freq: 1,
810                positions: vec![100],
811            },
812            PostingWithPositions {
813                doc_id: 1,
814                term_freq: 1,
815                positions: vec![105],
816            },
817        ];
818        let list2 = PositionPostingList::from_postings(&postings2).unwrap();
819
820        // Merge with doc_id offset
821        let combined = PositionPostingList::concatenate_blocks(&[
822            (list1, 0), // no offset
823            (list2, 3), // offset by 3 (list1 has 3 docs)
824        ])
825        .unwrap();
826
827        assert_eq!(combined.doc_count(), 5);
828
829        // Check doc_ids are correctly remapped
830        assert!(combined.get_positions(0).is_some());
831        assert!(combined.get_positions(1).is_some());
832        assert!(combined.get_positions(2).is_some());
833        assert!(combined.get_positions(3).is_some()); // was 0 in list2
834        assert!(combined.get_positions(4).is_some()); // was 1 in list2
835    }
836
837    #[test]
838    fn test_iterator() {
839        let postings = vec![
840            PostingWithPositions {
841                doc_id: 1,
842                term_freq: 2,
843                positions: vec![0, 5],
844            },
845            PostingWithPositions {
846                doc_id: 3,
847                term_freq: 1,
848                positions: vec![10],
849            },
850            PostingWithPositions {
851                doc_id: 5,
852                term_freq: 1,
853                positions: vec![15],
854            },
855        ];
856
857        let list = PositionPostingList::from_postings(&postings).unwrap();
858        let mut iter = list.iter();
859
860        assert_eq!(iter.doc(), 1);
861        assert_eq!(iter.positions(), &[0, 5]);
862
863        iter.advance();
864        assert_eq!(iter.doc(), 3);
865
866        iter.seek(5);
867        assert_eq!(iter.doc(), 5);
868        assert_eq!(iter.positions(), &[15]);
869
870        iter.advance();
871        assert_eq!(iter.doc(), u32::MAX); // exhausted
872    }
873
874    /// Helper: build a PositionPostingList from (doc_id, positions) pairs
875    fn build_ppl(entries: &[(u32, Vec<u32>)]) -> PositionPostingList {
876        let postings: Vec<PostingWithPositions> = entries
877            .iter()
878            .map(|(doc_id, positions)| PostingWithPositions {
879                doc_id: *doc_id,
880                term_freq: positions.len() as u32,
881                positions: positions.clone(),
882            })
883            .collect();
884        PositionPostingList::from_postings(&postings).unwrap()
885    }
886
887    /// Helper: serialize a PositionPostingList to bytes
888    fn serialize_ppl(ppl: &PositionPostingList) -> Vec<u8> {
889        let mut buf = Vec::new();
890        ppl.serialize(&mut buf).unwrap();
891        buf
892    }
893
894    /// Helper: collect all (doc_id, positions) from a PositionPostingIterator
895    fn collect_positions(ppl: &PositionPostingList) -> Vec<(u32, Vec<u32>)> {
896        let mut result = Vec::new();
897        let mut it = ppl.iter();
898        while it.doc() != u32::MAX {
899            result.push((it.doc(), it.positions().to_vec()));
900            it.advance();
901        }
902        result
903    }
904
905    #[test]
906    fn test_concatenate_streaming_matches_blocks() {
907        // Build 3 segments with positions
908        let seg_a: Vec<(u32, Vec<u32>)> = (0..150)
909            .map(|i| (i * 2, vec![i * 10, i * 10 + 3]))
910            .collect();
911        let seg_b: Vec<(u32, Vec<u32>)> = (0..100).map(|i| (i * 5, vec![i * 7])).collect();
912        let seg_c: Vec<(u32, Vec<u32>)> = (0..80).map(|i| (i * 3, vec![i, i + 1, i + 2])).collect();
913
914        let ppl_a = build_ppl(&seg_a);
915        let ppl_b = build_ppl(&seg_b);
916        let ppl_c = build_ppl(&seg_c);
917
918        let offset_b = 500u32;
919        let offset_c = 1000u32;
920
921        // Method 1: concatenate_blocks (reference)
922        let ref_merged = PositionPostingList::concatenate_blocks(&[
923            (ppl_a.clone(), 0),
924            (ppl_b.clone(), offset_b),
925            (ppl_c.clone(), offset_c),
926        ])
927        .unwrap();
928        let mut ref_buf = Vec::new();
929        ref_merged.serialize(&mut ref_buf).unwrap();
930
931        // Method 2: concatenate_streaming
932        let bytes_a = serialize_ppl(&ppl_a);
933        let bytes_b = serialize_ppl(&ppl_b);
934        let bytes_c = serialize_ppl(&ppl_c);
935
936        let sources: Vec<(&[u8], u32)> =
937            vec![(&bytes_a, 0), (&bytes_b, offset_b), (&bytes_c, offset_c)];
938        let mut stream_buf = Vec::new();
939        let (doc_count, bytes_written) =
940            PositionPostingList::concatenate_streaming(&sources, &mut stream_buf).unwrap();
941
942        assert_eq!(doc_count, 330);
943        assert_eq!(bytes_written, stream_buf.len());
944
945        // Deserialize both and compare all postings + positions
946        let ref_posts = collect_positions(&PositionPostingList::deserialize(&ref_buf).unwrap());
947        let stream_posts =
948            collect_positions(&PositionPostingList::deserialize(&stream_buf).unwrap());
949
950        assert_eq!(ref_posts.len(), stream_posts.len());
951        for (i, (r, s)) in ref_posts.iter().zip(stream_posts.iter()).enumerate() {
952            assert_eq!(r.0, s.0, "doc_id mismatch at {}", i);
953            assert_eq!(r.1, s.1, "positions mismatch at doc {}", r.0);
954        }
955    }
956
957    #[test]
958    fn test_positions_multi_round_merge() {
959        // Round 0: 4 segments with distinct positions
960        let segments: Vec<Vec<(u32, Vec<u32>)>> = (0..4)
961            .map(|seg| {
962                (0..200)
963                    .map(|i| {
964                        let pos_count = (i % 3) + 1;
965                        let positions: Vec<u32> = (0..pos_count)
966                            .map(|p| (seg * 1000 + i * 10 + p) as u32)
967                            .collect();
968                        (i as u32 * 3, positions)
969                    })
970                    .collect()
971            })
972            .collect();
973
974        let ppls: Vec<PositionPostingList> = segments.iter().map(|s| build_ppl(s)).collect();
975        let serialized: Vec<Vec<u8>> = ppls.iter().map(serialize_ppl).collect();
976
977        // Round 1: merge pairs
978        let mut merged_01 = Vec::new();
979        let sources_01: Vec<(&[u8], u32)> = vec![(&serialized[0], 0), (&serialized[1], 600)];
980        let (dc_01, _) =
981            PositionPostingList::concatenate_streaming(&sources_01, &mut merged_01).unwrap();
982        assert_eq!(dc_01, 400);
983
984        let mut merged_23 = Vec::new();
985        let sources_23: Vec<(&[u8], u32)> = vec![(&serialized[2], 0), (&serialized[3], 600)];
986        let (dc_23, _) =
987            PositionPostingList::concatenate_streaming(&sources_23, &mut merged_23).unwrap();
988        assert_eq!(dc_23, 400);
989
990        // Round 2: merge intermediate results
991        let mut final_merged = Vec::new();
992        let sources_final: Vec<(&[u8], u32)> = vec![(&merged_01, 0), (&merged_23, 1200)];
993        let (dc_final, _) =
994            PositionPostingList::concatenate_streaming(&sources_final, &mut final_merged).unwrap();
995        assert_eq!(dc_final, 800);
996
997        // Verify all positions survived two rounds of merging
998        let final_ppl = PositionPostingList::deserialize(&final_merged).unwrap();
999        let all = collect_positions(&final_ppl);
1000        assert_eq!(all.len(), 800);
1001
1002        // Spot-check: first doc of each original segment
1003        // Seg0: doc_id=0, positions from seg=0
1004        assert_eq!(all[0].0, 0);
1005        assert_eq!(all[0].1, vec![0]); // seg=0, i=0, pos_count=1, pos=0*1000+0*10+0=0
1006
1007        // Seg1: doc_id=600, positions from seg=1
1008        assert_eq!(all[200].0, 600);
1009        assert_eq!(all[200].1, vec![1000]); // seg=1, i=0, pos_count=1, pos=1*1000+0*10+0=1000
1010
1011        // Seg2: doc_id=1200, positions from seg=2
1012        assert_eq!(all[400].0, 1200);
1013        assert_eq!(all[400].1, vec![2000]); // seg=2, i=0, pos_count=1, pos=2*1000+0*10+0=2000
1014
1015        // Verify seek + positions on the final merged list
1016        let mut it = final_ppl.iter();
1017        it.seek(1200);
1018        assert_eq!(it.doc(), 1200);
1019        assert_eq!(it.positions(), &[2000]);
1020
1021        // Verify get_positions (binary search path)
1022        let pos = final_ppl.get_positions(600).unwrap();
1023        assert_eq!(pos, vec![1000]);
1024    }
1025
1026    #[test]
1027    fn test_positions_large_scale_merge() {
1028        // 5 segments × 500 docs, each doc has 1-4 positions
1029        let num_segments = 5usize;
1030        let docs_per_segment = 500usize;
1031
1032        let segments: Vec<Vec<(u32, Vec<u32>)>> = (0..num_segments)
1033            .map(|seg| {
1034                (0..docs_per_segment)
1035                    .map(|i| {
1036                        let n_pos = (i % 4) + 1;
1037                        let positions: Vec<u32> =
1038                            (0..n_pos).map(|p| (p * 5 + seg) as u32).collect();
1039                        (i as u32 * 2, positions)
1040                    })
1041                    .collect()
1042            })
1043            .collect();
1044
1045        let ppls: Vec<PositionPostingList> = segments.iter().map(|s| build_ppl(s)).collect();
1046        let serialized: Vec<Vec<u8>> = ppls.iter().map(serialize_ppl).collect();
1047
1048        let max_doc = (docs_per_segment as u32 - 1) * 2;
1049        let offsets: Vec<u32> = (0..num_segments)
1050            .map(|i| i as u32 * (max_doc + 1))
1051            .collect();
1052
1053        let sources: Vec<(&[u8], u32)> = serialized
1054            .iter()
1055            .zip(offsets.iter())
1056            .map(|(b, o)| (b.as_slice(), *o))
1057            .collect();
1058
1059        let mut merged = Vec::new();
1060        let (doc_count, _) =
1061            PositionPostingList::concatenate_streaming(&sources, &mut merged).unwrap();
1062        assert_eq!(doc_count, (num_segments * docs_per_segment) as u32);
1063
1064        // Deserialize and verify all positions
1065        let merged_ppl = PositionPostingList::deserialize(&merged).unwrap();
1066        let all = collect_positions(&merged_ppl);
1067        assert_eq!(all.len(), num_segments * docs_per_segment);
1068
1069        // Verify positions for each segment
1070        for (seg, &offset) in offsets.iter().enumerate() {
1071            for i in 0..docs_per_segment {
1072                let idx = seg * docs_per_segment + i;
1073                let expected_doc = i as u32 * 2 + offset;
1074                assert_eq!(all[idx].0, expected_doc, "seg={} i={}", seg, i);
1075
1076                let n_pos = (i % 4) + 1;
1077                let expected_positions: Vec<u32> =
1078                    (0..n_pos).map(|p| (p * 5 + seg) as u32).collect();
1079                assert_eq!(
1080                    all[idx].1, expected_positions,
1081                    "positions mismatch seg={} i={}",
1082                    seg, i
1083                );
1084            }
1085        }
1086    }
1087
1088    #[test]
1089    fn test_positions_streaming_single_source() {
1090        let entries: Vec<(u32, Vec<u32>)> =
1091            (0..300).map(|i| (i * 4, vec![i * 2, i * 2 + 1])).collect();
1092        let ppl = build_ppl(&entries);
1093        let direct = serialize_ppl(&ppl);
1094
1095        let sources: Vec<(&[u8], u32)> = vec![(&direct, 0)];
1096        let mut streamed = Vec::new();
1097        PositionPostingList::concatenate_streaming(&sources, &mut streamed).unwrap();
1098
1099        let p1 = collect_positions(&PositionPostingList::deserialize(&direct).unwrap());
1100        let p2 = collect_positions(&PositionPostingList::deserialize(&streamed).unwrap());
1101        assert_eq!(p1, p2);
1102    }
1103
1104    #[test]
1105    fn test_positions_edge_single_doc() {
1106        let ppl_a = build_ppl(&[(0, vec![42, 43, 44])]);
1107        let ppl_b = build_ppl(&[(0, vec![100])]);
1108
1109        let merged = PositionPostingList::concatenate_blocks(&[(ppl_a, 0), (ppl_b, 1)]).unwrap();
1110
1111        assert_eq!(merged.doc_count(), 2);
1112        assert_eq!(merged.get_positions(0).unwrap(), vec![42, 43, 44]);
1113        assert_eq!(merged.get_positions(1).unwrap(), vec![100]);
1114    }
1115}