Skip to main content

hermes_core/structures/postings/
positions.rs

1//! Position-aware posting list for phrase queries and multi-field element tracking
2//!
3//! Positions are encoded as: (element_ordinal << 20) | token_position
4//! This allows up to 4096 elements per field and ~1M tokens per element.
5//!
6//! ## Block Format
7//!
8//! Uses a block-based format with skip list for efficient binary search by doc_id:
9//! - Skip list enables O(log n) lookup by doc_id
10//! - Blocks of up to 128 documents for cache efficiency
11//! - Delta encoding within blocks for compression
12//! - Stackable for fast merge (just adjust doc_id_base per block)
13//!
14//! Format:
15//! ```text
16//! Header:
17//!   - doc_count: u32
18//!   - num_blocks: u32
19//!   - skip_list: [(base_doc_id, last_doc_id, byte_offset)] per block
20//!   - data_len: u32
21//! Data:
22//!   - blocks: each block contains delta-encoded postings with positions
23//! ```
24
25use byteorder::{LittleEndian, ReadBytesExt, WriteBytesExt};
26use std::io::{self, Read, Write};
27
28use crate::DocId;
29
30/// Block size for position posting list (same as regular posting list)
31pub const POSITION_BLOCK_SIZE: usize = 128;
32
33/// Maximum token position within an element (20 bits = 1,048,575)
34pub const MAX_TOKEN_POSITION: u32 = (1 << 20) - 1;
35
36/// Maximum element ordinal (12 bits = 4095)
37pub const MAX_ELEMENT_ORDINAL: u32 = (1 << 12) - 1;
38
39/// Encode element ordinal and token position into a single u32
40#[inline]
41pub fn encode_position(element_ordinal: u32, token_position: u32) -> u32 {
42    debug_assert!(
43        element_ordinal <= MAX_ELEMENT_ORDINAL,
44        "Element ordinal {} exceeds maximum {}",
45        element_ordinal,
46        MAX_ELEMENT_ORDINAL
47    );
48    debug_assert!(
49        token_position <= MAX_TOKEN_POSITION,
50        "Token position {} exceeds maximum {}",
51        token_position,
52        MAX_TOKEN_POSITION
53    );
54    (element_ordinal << 20) | (token_position & MAX_TOKEN_POSITION)
55}
56
57/// Decode element ordinal from encoded position
58#[inline]
59pub fn decode_element_ordinal(position: u32) -> u32 {
60    position >> 20
61}
62
63/// Decode token position from encoded position
64#[inline]
65pub fn decode_token_position(position: u32) -> u32 {
66    position & MAX_TOKEN_POSITION
67}
68
69/// A posting entry with positions (used during building)
70#[derive(Debug, Clone, PartialEq, Eq)]
71pub struct PostingWithPositions {
72    pub doc_id: DocId,
73    pub term_freq: u32,
74    /// Encoded positions: (element_ordinal << 20) | token_position
75    pub positions: Vec<u32>,
76}
77
78/// Block-based position posting list with skip list for O(log n) doc_id lookup
79///
80/// Similar to BlockPostingList but stores positions per document.
81/// Uses binary search on skip list to find the right block, then linear scan within block.
82#[derive(Debug, Clone)]
83pub struct PositionPostingList {
84    /// Skip list: (base_doc_id, last_doc_id, byte_offset)
85    /// Enables binary search to find the right block
86    skip_list: Vec<(DocId, DocId, u32)>,
87    /// Compressed block data
88    data: Vec<u8>,
89    /// Total document count
90    doc_count: u32,
91}
92
93impl Default for PositionPostingList {
94    fn default() -> Self {
95        Self::new()
96    }
97}
98
99impl PositionPostingList {
100    pub fn new() -> Self {
101        Self {
102            skip_list: Vec::new(),
103            data: Vec::new(),
104            doc_count: 0,
105        }
106    }
107
108    pub fn with_capacity(capacity: usize) -> Self {
109        Self {
110            skip_list: Vec::with_capacity(capacity / POSITION_BLOCK_SIZE + 1),
111            data: Vec::with_capacity(capacity * 8), // rough estimate
112            doc_count: 0,
113        }
114    }
115
116    /// Build from a list of postings with positions
117    pub fn from_postings(postings: &[PostingWithPositions]) -> io::Result<Self> {
118        if postings.is_empty() {
119            return Ok(Self::new());
120        }
121
122        let mut skip_list = Vec::new();
123        let mut data = Vec::new();
124        let mut i = 0;
125
126        while i < postings.len() {
127            let block_start = data.len() as u32;
128            let block_end = (i + POSITION_BLOCK_SIZE).min(postings.len());
129            let block = &postings[i..block_end];
130
131            // Record skip entry
132            let base_doc_id = block.first().unwrap().doc_id;
133            let last_doc_id = block.last().unwrap().doc_id;
134            skip_list.push((base_doc_id, last_doc_id, block_start));
135
136            // Write block: fixed u32 count + first_doc (8-byte prefix), then vint deltas
137            data.write_u32::<LittleEndian>(block.len() as u32)?;
138            data.write_u32::<LittleEndian>(base_doc_id)?;
139
140            let mut prev_doc_id = base_doc_id;
141            for (j, posting) in block.iter().enumerate() {
142                if j > 0 {
143                    let delta = posting.doc_id - prev_doc_id;
144                    write_vint(&mut data, delta as u64)?;
145                }
146                prev_doc_id = posting.doc_id;
147
148                // Write positions count and positions (absolute - delta bad for ordinal<<20)
149                write_vint(&mut data, posting.positions.len() as u64)?;
150                for &pos in &posting.positions {
151                    write_vint(&mut data, pos as u64)?;
152                }
153            }
154
155            i = block_end;
156        }
157
158        Ok(Self {
159            skip_list,
160            data,
161            doc_count: postings.len() as u32,
162        })
163    }
164
165    /// Add a posting with positions (for building - converts to block format on serialize)
166    pub fn push(&mut self, doc_id: DocId, positions: Vec<u32>) {
167        // For compatibility: build in-memory, convert to blocks on serialize
168        // This is a simplified approach - we rebuild skip_list on serialize
169        let posting = PostingWithPositions {
170            doc_id,
171            term_freq: positions.len() as u32,
172            positions,
173        };
174
175        // Serialize this posting to data buffer
176        let block_start = self.data.len() as u32;
177
178        // If this is first posting or we need a new block
179        let need_new_block =
180            self.skip_list.is_empty() || self.doc_count.is_multiple_of(POSITION_BLOCK_SIZE as u32);
181
182        if need_new_block {
183            // Start new block: fixed u32 count + first_doc (8-byte prefix)
184            self.skip_list.push((doc_id, doc_id, block_start));
185            self.data.write_u32::<LittleEndian>(1u32).unwrap();
186            self.data.write_u32::<LittleEndian>(doc_id).unwrap();
187        } else {
188            // Add to existing block — update count in-place + add delta
189            let last_block = self.skip_list.last_mut().unwrap();
190            let prev_doc_id = last_block.1;
191            last_block.1 = doc_id;
192
193            // Patch count u32 at block start
194            let count_offset = last_block.2 as usize;
195            let old_count = u32::from_le_bytes(
196                self.data[count_offset..count_offset + 4]
197                    .try_into()
198                    .unwrap(),
199            );
200            self.data[count_offset..count_offset + 4]
201                .copy_from_slice(&(old_count + 1).to_le_bytes());
202
203            let delta = doc_id - prev_doc_id;
204            write_vint(&mut self.data, delta as u64).unwrap();
205        }
206
207        // Write positions (absolute - delta encoding bad for ordinal<<20)
208        write_vint(&mut self.data, posting.positions.len() as u64).unwrap();
209        for &pos in &posting.positions {
210            write_vint(&mut self.data, pos as u64).unwrap();
211        }
212
213        self.doc_count += 1;
214    }
215
216    pub fn doc_count(&self) -> u32 {
217        self.doc_count
218    }
219
220    pub fn len(&self) -> usize {
221        self.doc_count as usize
222    }
223
224    pub fn is_empty(&self) -> bool {
225        self.doc_count == 0
226    }
227
228    /// Get positions for a specific document using binary search on skip list
229    pub fn get_positions(&self, target_doc_id: DocId) -> Option<Vec<u32>> {
230        if self.skip_list.is_empty() {
231            return None;
232        }
233
234        // Binary search on skip list to find the right block
235        let block_idx = match self.skip_list.binary_search_by(|&(base, last, _)| {
236            if target_doc_id < base {
237                std::cmp::Ordering::Greater
238            } else if target_doc_id > last {
239                std::cmp::Ordering::Less
240            } else {
241                std::cmp::Ordering::Equal
242            }
243        }) {
244            Ok(idx) => idx,
245            Err(_) => return None, // doc_id not in any block range
246        };
247
248        // Decode block and search for doc_id
249        let offset = self.skip_list[block_idx].2 as usize;
250        let mut reader = &self.data[offset..];
251
252        // Fixed 8-byte prefix: count(u32) + first_doc(u32)
253        let count = reader.read_u32::<LittleEndian>().ok()? as usize;
254        let first_doc = reader.read_u32::<LittleEndian>().ok()?;
255        let mut prev_doc_id = first_doc;
256
257        for i in 0..count {
258            let doc_id = if i == 0 {
259                first_doc
260            } else {
261                let delta = read_vint(&mut reader).ok()? as u32;
262                prev_doc_id + delta
263            };
264            prev_doc_id = doc_id;
265
266            let num_positions = read_vint(&mut reader).ok()? as usize;
267
268            if doc_id == target_doc_id {
269                // Found it! Read positions (stored absolute)
270                let mut positions = Vec::with_capacity(num_positions);
271                for _ in 0..num_positions {
272                    let pos = read_vint(&mut reader).ok()? as u32;
273                    positions.push(pos);
274                }
275                return Some(positions);
276            } else {
277                // Skip positions
278                for _ in 0..num_positions {
279                    let _ = read_vint(&mut reader);
280                }
281            }
282        }
283
284        None
285    }
286
287    /// Serialize to bytes (footer-based: data first).
288    ///
289    /// Format:
290    /// ```text
291    /// [block data: data_len bytes]
292    /// [skip entries: N × 12 bytes (base_doc, last_doc, offset)]
293    /// [footer: data_len(4) + skip_count(4) + doc_count(4) = 12 bytes]
294    /// ```
295    pub fn serialize<W: Write>(&self, writer: &mut W) -> io::Result<()> {
296        // Data first
297        writer.write_all(&self.data)?;
298
299        // Skip list
300        for (base_doc_id, last_doc_id, offset) in &self.skip_list {
301            writer.write_u32::<LittleEndian>(*base_doc_id)?;
302            writer.write_u32::<LittleEndian>(*last_doc_id)?;
303            writer.write_u32::<LittleEndian>(*offset)?;
304        }
305
306        // Footer
307        writer.write_u32::<LittleEndian>(self.data.len() as u32)?;
308        writer.write_u32::<LittleEndian>(self.skip_list.len() as u32)?;
309        writer.write_u32::<LittleEndian>(self.doc_count)?;
310
311        Ok(())
312    }
313
314    /// Deserialize from a byte slice (footer-based format).
315    pub fn deserialize(raw: &[u8]) -> io::Result<Self> {
316        if raw.len() < 12 {
317            return Err(io::Error::new(
318                io::ErrorKind::InvalidData,
319                "position data too short",
320            ));
321        }
322
323        // Parse footer (last 12 bytes)
324        let f = raw.len() - 12;
325        let data_len = u32::from_le_bytes(raw[f..f + 4].try_into().unwrap()) as usize;
326        let skip_count = u32::from_le_bytes(raw[f + 4..f + 8].try_into().unwrap()) as usize;
327        let doc_count = u32::from_le_bytes(raw[f + 8..f + 12].try_into().unwrap());
328
329        // Parse skip list (between data and footer)
330        let mut skip_list = Vec::with_capacity(skip_count);
331        let mut pos = data_len;
332        for _ in 0..skip_count {
333            let base = u32::from_le_bytes(raw[pos..pos + 4].try_into().unwrap());
334            let last = u32::from_le_bytes(raw[pos + 4..pos + 8].try_into().unwrap());
335            let offset = u32::from_le_bytes(raw[pos + 8..pos + 12].try_into().unwrap());
336            skip_list.push((base, last, offset));
337            pos += 12;
338        }
339
340        let data = raw[..data_len].to_vec();
341
342        Ok(Self {
343            skip_list,
344            data,
345            doc_count,
346        })
347    }
348
349    /// Concatenate blocks from multiple position lists with doc_id remapping (for merge)
350    pub fn concatenate_blocks(sources: &[(PositionPostingList, u32)]) -> io::Result<Self> {
351        let mut skip_list = Vec::new();
352        let mut data = Vec::new();
353        let mut total_docs = 0u32;
354
355        for (source, doc_offset) in sources {
356            for block_idx in 0..source.skip_list.len() {
357                let (base, last, src_offset) = source.skip_list[block_idx];
358                let next_offset = if block_idx + 1 < source.skip_list.len() {
359                    source.skip_list[block_idx + 1].2 as usize
360                } else {
361                    source.data.len()
362                };
363
364                let new_base = base + doc_offset;
365                let new_last = last + doc_offset;
366                let new_offset = data.len() as u32;
367
368                // Copy and adjust block data
369                let block_bytes = &source.data[src_offset as usize..next_offset];
370
371                // Fixed 8-byte prefix: count(u32) + first_doc(u32)
372                let count = u32::from_le_bytes(block_bytes[0..4].try_into().unwrap());
373                let first_doc = u32::from_le_bytes(block_bytes[4..8].try_into().unwrap());
374
375                // Write patched prefix + copy rest verbatim
376                data.write_u32::<LittleEndian>(count)?;
377                data.write_u32::<LittleEndian>(first_doc + doc_offset)?;
378                data.extend_from_slice(&block_bytes[8..]);
379
380                skip_list.push((new_base, new_last, new_offset));
381                total_docs += count;
382            }
383        }
384
385        Ok(Self {
386            skip_list,
387            data,
388            doc_count: total_docs,
389        })
390    }
391
392    /// Streaming merge: write blocks directly to output writer (bounded memory).
393    ///
394    /// Parses only footer + skip_list from each source (no data copy),
395    /// streams block data with patched 8-byte prefixes directly to `writer`,
396    /// then appends merged skip_list + footer.
397    ///
398    /// Memory per term: O(total_blocks × 12) for skip entries only.
399    ///
400    /// Returns `(doc_count, bytes_written)`.
401    pub fn concatenate_streaming<W: Write>(
402        sources: &[(&[u8], u32)],
403        writer: &mut W,
404    ) -> io::Result<(u32, usize)> {
405        struct RawSource<'a> {
406            skip_list: Vec<(u32, u32, u32)>,
407            data: &'a [u8],
408            doc_count: u32,
409            doc_offset: u32,
410        }
411
412        let mut parsed: Vec<RawSource<'_>> = Vec::with_capacity(sources.len());
413        for (raw, doc_offset) in sources {
414            if raw.len() < 12 {
415                continue;
416            }
417            let f = raw.len() - 12;
418            let data_len = u32::from_le_bytes(raw[f..f + 4].try_into().unwrap()) as usize;
419            let skip_count = u32::from_le_bytes(raw[f + 4..f + 8].try_into().unwrap()) as usize;
420            let doc_count = u32::from_le_bytes(raw[f + 8..f + 12].try_into().unwrap());
421
422            let mut skip_list = Vec::with_capacity(skip_count);
423            let mut pos = data_len;
424            for _ in 0..skip_count {
425                let base = u32::from_le_bytes(raw[pos..pos + 4].try_into().unwrap());
426                let last = u32::from_le_bytes(raw[pos + 4..pos + 8].try_into().unwrap());
427                let offset = u32::from_le_bytes(raw[pos + 8..pos + 12].try_into().unwrap());
428                skip_list.push((base, last, offset));
429                pos += 12;
430            }
431            parsed.push(RawSource {
432                skip_list,
433                data: &raw[..data_len],
434                doc_count,
435                doc_offset: *doc_offset,
436            });
437        }
438
439        let total_docs: u32 = parsed.iter().map(|s| s.doc_count).sum();
440
441        // Phase 1: Stream block data with patched first_doc
442        let mut merged_skip: Vec<(u32, u32, u32)> = Vec::new();
443        let mut data_written = 0u32;
444        let mut patch_buf = [0u8; 8];
445
446        for src in &parsed {
447            for (i, &(base, last, offset)) in src.skip_list.iter().enumerate() {
448                let start = offset as usize;
449                let end = if i + 1 < src.skip_list.len() {
450                    src.skip_list[i + 1].2 as usize
451                } else {
452                    src.data.len()
453                };
454                let block = &src.data[start..end];
455
456                merged_skip.push((base + src.doc_offset, last + src.doc_offset, data_written));
457
458                patch_buf[0..4].copy_from_slice(&block[0..4]);
459                let first_doc = u32::from_le_bytes(block[4..8].try_into().unwrap());
460                patch_buf[4..8].copy_from_slice(&(first_doc + src.doc_offset).to_le_bytes());
461                writer.write_all(&patch_buf)?;
462                writer.write_all(&block[8..])?;
463
464                data_written += block.len() as u32;
465            }
466        }
467
468        // Phase 2: Write skip_list + footer
469        for (base, last, offset) in &merged_skip {
470            writer.write_u32::<LittleEndian>(*base)?;
471            writer.write_u32::<LittleEndian>(*last)?;
472            writer.write_u32::<LittleEndian>(*offset)?;
473        }
474
475        writer.write_u32::<LittleEndian>(data_written)?;
476        writer.write_u32::<LittleEndian>(merged_skip.len() as u32)?;
477        writer.write_u32::<LittleEndian>(total_docs)?;
478
479        let total_bytes = data_written as usize + merged_skip.len() * 12 + 12;
480        Ok((total_docs, total_bytes))
481    }
482
483    /// Get iterator over all postings (for phrase queries)
484    pub fn iter(&self) -> PositionPostingIterator<'_> {
485        PositionPostingIterator::new(self)
486    }
487}
488
489/// Write variable-length integer (same as posting.rs)
490fn write_vint<W: Write>(writer: &mut W, mut value: u64) -> io::Result<()> {
491    loop {
492        let byte = (value & 0x7F) as u8;
493        value >>= 7;
494        if value == 0 {
495            writer.write_u8(byte)?;
496            break;
497        } else {
498            writer.write_u8(byte | 0x80)?;
499        }
500    }
501    Ok(())
502}
503
504/// Read variable-length integer (same as posting.rs)
505fn read_vint<R: Read>(reader: &mut R) -> io::Result<u64> {
506    let mut result = 0u64;
507    let mut shift = 0;
508    loop {
509        let byte = reader.read_u8()?;
510        result |= ((byte & 0x7F) as u64) << shift;
511        if byte & 0x80 == 0 {
512            break;
513        }
514        shift += 7;
515    }
516    Ok(result)
517}
518
519/// Iterator over block-based position posting list
520pub struct PositionPostingIterator<'a> {
521    list: &'a PositionPostingList,
522    current_block: usize,
523    position_in_block: usize,
524    block_postings: Vec<PostingWithPositions>,
525    exhausted: bool,
526}
527
528impl<'a> PositionPostingIterator<'a> {
529    pub fn new(list: &'a PositionPostingList) -> Self {
530        let exhausted = list.skip_list.is_empty();
531        let mut iter = Self {
532            list,
533            current_block: 0,
534            position_in_block: 0,
535            block_postings: Vec::new(),
536            exhausted,
537        };
538        if !iter.exhausted {
539            iter.load_block(0);
540        }
541        iter
542    }
543
544    fn load_block(&mut self, block_idx: usize) {
545        if block_idx >= self.list.skip_list.len() {
546            self.exhausted = true;
547            return;
548        }
549
550        self.current_block = block_idx;
551        self.position_in_block = 0;
552
553        let offset = self.list.skip_list[block_idx].2 as usize;
554        let mut reader = &self.list.data[offset..];
555
556        // Fixed 8-byte prefix: count(u32) + first_doc(u32)
557        let count = reader.read_u32::<LittleEndian>().unwrap_or(0) as usize;
558        let first_doc = reader.read_u32::<LittleEndian>().unwrap_or(0);
559        self.block_postings.clear();
560        self.block_postings.reserve(count);
561
562        let mut prev_doc_id = first_doc;
563
564        for i in 0..count {
565            let doc_id = if i == 0 {
566                first_doc
567            } else {
568                let delta = read_vint(&mut reader).unwrap_or(0) as u32;
569                prev_doc_id + delta
570            };
571            prev_doc_id = doc_id;
572
573            let num_positions = read_vint(&mut reader).unwrap_or(0) as usize;
574            let mut positions = Vec::with_capacity(num_positions);
575            for _ in 0..num_positions {
576                let pos = read_vint(&mut reader).unwrap_or(0) as u32;
577                positions.push(pos);
578            }
579
580            self.block_postings.push(PostingWithPositions {
581                doc_id,
582                term_freq: num_positions as u32,
583                positions,
584            });
585        }
586    }
587
588    pub fn doc(&self) -> DocId {
589        if self.exhausted || self.position_in_block >= self.block_postings.len() {
590            u32::MAX
591        } else {
592            self.block_postings[self.position_in_block].doc_id
593        }
594    }
595
596    pub fn term_freq(&self) -> u32 {
597        if self.exhausted || self.position_in_block >= self.block_postings.len() {
598            0
599        } else {
600            self.block_postings[self.position_in_block].term_freq
601        }
602    }
603
604    pub fn positions(&self) -> &[u32] {
605        if self.exhausted || self.position_in_block >= self.block_postings.len() {
606            &[]
607        } else {
608            &self.block_postings[self.position_in_block].positions
609        }
610    }
611
612    pub fn advance(&mut self) {
613        if self.exhausted {
614            return;
615        }
616
617        self.position_in_block += 1;
618        if self.position_in_block >= self.block_postings.len() {
619            self.load_block(self.current_block + 1);
620        }
621    }
622
623    pub fn seek(&mut self, target: DocId) {
624        if self.exhausted {
625            return;
626        }
627
628        // Check if target is in current block
629        if let Some((_, last, _)) = self.list.skip_list.get(self.current_block)
630            && target <= *last
631        {
632            // Target might be in current block, scan forward
633            while self.position_in_block < self.block_postings.len()
634                && self.block_postings[self.position_in_block].doc_id < target
635            {
636                self.position_in_block += 1;
637            }
638            if self.position_in_block >= self.block_postings.len() {
639                self.load_block(self.current_block + 1);
640                self.seek(target); // Continue seeking in next block
641            }
642            return;
643        }
644
645        // Binary search on skip list to find the right block
646        let block_idx = match self.list.skip_list.binary_search_by(|&(base, last, _)| {
647            if target < base {
648                std::cmp::Ordering::Greater
649            } else if target > last {
650                std::cmp::Ordering::Less
651            } else {
652                std::cmp::Ordering::Equal
653            }
654        }) {
655            Ok(idx) => idx,
656            Err(idx) => idx, // Use the next block if not found exactly
657        };
658
659        if block_idx >= self.list.skip_list.len() {
660            self.exhausted = true;
661            return;
662        }
663
664        self.load_block(block_idx);
665
666        // Linear scan within block
667        while self.position_in_block < self.block_postings.len()
668            && self.block_postings[self.position_in_block].doc_id < target
669        {
670            self.position_in_block += 1;
671        }
672
673        if self.position_in_block >= self.block_postings.len() {
674            self.load_block(self.current_block + 1);
675        }
676    }
677}
678
679#[cfg(test)]
680mod tests {
681    use super::*;
682
683    #[test]
684    fn test_position_encoding() {
685        // Element 0, position 5
686        let pos = encode_position(0, 5);
687        assert_eq!(decode_element_ordinal(pos), 0);
688        assert_eq!(decode_token_position(pos), 5);
689
690        // Element 3, position 100
691        let pos = encode_position(3, 100);
692        assert_eq!(decode_element_ordinal(pos), 3);
693        assert_eq!(decode_token_position(pos), 100);
694
695        // Max values
696        let pos = encode_position(MAX_ELEMENT_ORDINAL, MAX_TOKEN_POSITION);
697        assert_eq!(decode_element_ordinal(pos), MAX_ELEMENT_ORDINAL);
698        assert_eq!(decode_token_position(pos), MAX_TOKEN_POSITION);
699    }
700
701    #[test]
702    fn test_position_posting_list_build() {
703        // Build from postings
704        let postings = vec![
705            PostingWithPositions {
706                doc_id: 1,
707                term_freq: 2,
708                positions: vec![encode_position(0, 0), encode_position(0, 2)],
709            },
710            PostingWithPositions {
711                doc_id: 3,
712                term_freq: 1,
713                positions: vec![encode_position(1, 0)],
714            },
715        ];
716
717        let list = PositionPostingList::from_postings(&postings).unwrap();
718        assert_eq!(list.doc_count(), 2);
719
720        // Test binary search
721        let pos = list.get_positions(1).unwrap();
722        assert_eq!(pos.len(), 2);
723
724        let pos = list.get_positions(3).unwrap();
725        assert_eq!(pos.len(), 1);
726
727        // Not found
728        assert!(list.get_positions(2).is_none());
729        assert!(list.get_positions(99).is_none());
730    }
731
732    #[test]
733    fn test_serialization_roundtrip() {
734        let postings = vec![
735            PostingWithPositions {
736                doc_id: 1,
737                term_freq: 2,
738                positions: vec![encode_position(0, 0), encode_position(0, 5)],
739            },
740            PostingWithPositions {
741                doc_id: 3,
742                term_freq: 1,
743                positions: vec![encode_position(1, 0)],
744            },
745            PostingWithPositions {
746                doc_id: 5,
747                term_freq: 1,
748                positions: vec![encode_position(0, 10)],
749            },
750        ];
751
752        let list = PositionPostingList::from_postings(&postings).unwrap();
753
754        let mut bytes = Vec::new();
755        list.serialize(&mut bytes).unwrap();
756
757        let deserialized = PositionPostingList::deserialize(&bytes).unwrap();
758
759        assert_eq!(list.doc_count(), deserialized.doc_count());
760
761        // Verify binary search works on deserialized
762        let pos = deserialized.get_positions(1).unwrap();
763        assert_eq!(pos, vec![encode_position(0, 0), encode_position(0, 5)]);
764
765        let pos = deserialized.get_positions(3).unwrap();
766        assert_eq!(pos, vec![encode_position(1, 0)]);
767    }
768
769    #[test]
770    fn test_binary_search_many_blocks() {
771        // Create enough postings to span multiple blocks (128 per block)
772        let mut postings = Vec::new();
773        for i in 0..300 {
774            postings.push(PostingWithPositions {
775                doc_id: i * 2, // doc_ids: 0, 2, 4, 6, ...
776                term_freq: 1,
777                positions: vec![encode_position(0, i)],
778            });
779        }
780
781        let list = PositionPostingList::from_postings(&postings).unwrap();
782        assert_eq!(list.doc_count(), 300);
783
784        // Should have 3 blocks (128 + 128 + 44)
785        assert_eq!(list.skip_list.len(), 3);
786
787        // Test binary search across blocks
788        let pos = list.get_positions(0).unwrap();
789        assert_eq!(pos, vec![encode_position(0, 0)]);
790
791        let pos = list.get_positions(256).unwrap(); // doc 128 in block 1
792        assert_eq!(pos, vec![encode_position(0, 128)]);
793
794        let pos = list.get_positions(598).unwrap(); // last doc
795        assert_eq!(pos, vec![encode_position(0, 299)]);
796
797        // Not found (odd numbers don't exist)
798        assert!(list.get_positions(1).is_none());
799        assert!(list.get_positions(257).is_none());
800    }
801
802    #[test]
803    fn test_concatenate_blocks_merge() {
804        // Test efficient merge by concatenating blocks
805        let postings1 = vec![
806            PostingWithPositions {
807                doc_id: 0,
808                term_freq: 1,
809                positions: vec![0],
810            },
811            PostingWithPositions {
812                doc_id: 1,
813                term_freq: 1,
814                positions: vec![5],
815            },
816            PostingWithPositions {
817                doc_id: 2,
818                term_freq: 1,
819                positions: vec![10],
820            },
821        ];
822        let list1 = PositionPostingList::from_postings(&postings1).unwrap();
823
824        let postings2 = vec![
825            PostingWithPositions {
826                doc_id: 0,
827                term_freq: 1,
828                positions: vec![100],
829            },
830            PostingWithPositions {
831                doc_id: 1,
832                term_freq: 1,
833                positions: vec![105],
834            },
835        ];
836        let list2 = PositionPostingList::from_postings(&postings2).unwrap();
837
838        // Merge with doc_id offset
839        let combined = PositionPostingList::concatenate_blocks(&[
840            (list1, 0), // no offset
841            (list2, 3), // offset by 3 (list1 has 3 docs)
842        ])
843        .unwrap();
844
845        assert_eq!(combined.doc_count(), 5);
846
847        // Check doc_ids are correctly remapped
848        assert!(combined.get_positions(0).is_some());
849        assert!(combined.get_positions(1).is_some());
850        assert!(combined.get_positions(2).is_some());
851        assert!(combined.get_positions(3).is_some()); // was 0 in list2
852        assert!(combined.get_positions(4).is_some()); // was 1 in list2
853    }
854
855    #[test]
856    fn test_iterator() {
857        let postings = vec![
858            PostingWithPositions {
859                doc_id: 1,
860                term_freq: 2,
861                positions: vec![0, 5],
862            },
863            PostingWithPositions {
864                doc_id: 3,
865                term_freq: 1,
866                positions: vec![10],
867            },
868            PostingWithPositions {
869                doc_id: 5,
870                term_freq: 1,
871                positions: vec![15],
872            },
873        ];
874
875        let list = PositionPostingList::from_postings(&postings).unwrap();
876        let mut iter = list.iter();
877
878        assert_eq!(iter.doc(), 1);
879        assert_eq!(iter.positions(), &[0, 5]);
880
881        iter.advance();
882        assert_eq!(iter.doc(), 3);
883
884        iter.seek(5);
885        assert_eq!(iter.doc(), 5);
886        assert_eq!(iter.positions(), &[15]);
887
888        iter.advance();
889        assert_eq!(iter.doc(), u32::MAX); // exhausted
890    }
891
892    /// Helper: build a PositionPostingList from (doc_id, positions) pairs
893    fn build_ppl(entries: &[(u32, Vec<u32>)]) -> PositionPostingList {
894        let postings: Vec<PostingWithPositions> = entries
895            .iter()
896            .map(|(doc_id, positions)| PostingWithPositions {
897                doc_id: *doc_id,
898                term_freq: positions.len() as u32,
899                positions: positions.clone(),
900            })
901            .collect();
902        PositionPostingList::from_postings(&postings).unwrap()
903    }
904
905    /// Helper: serialize a PositionPostingList to bytes
906    fn serialize_ppl(ppl: &PositionPostingList) -> Vec<u8> {
907        let mut buf = Vec::new();
908        ppl.serialize(&mut buf).unwrap();
909        buf
910    }
911
912    /// Helper: collect all (doc_id, positions) from a PositionPostingIterator
913    fn collect_positions(ppl: &PositionPostingList) -> Vec<(u32, Vec<u32>)> {
914        let mut result = Vec::new();
915        let mut it = ppl.iter();
916        while it.doc() != u32::MAX {
917            result.push((it.doc(), it.positions().to_vec()));
918            it.advance();
919        }
920        result
921    }
922
923    #[test]
924    fn test_concatenate_streaming_matches_blocks() {
925        // Build 3 segments with positions
926        let seg_a: Vec<(u32, Vec<u32>)> = (0..150)
927            .map(|i| (i * 2, vec![i * 10, i * 10 + 3]))
928            .collect();
929        let seg_b: Vec<(u32, Vec<u32>)> = (0..100).map(|i| (i * 5, vec![i * 7])).collect();
930        let seg_c: Vec<(u32, Vec<u32>)> = (0..80).map(|i| (i * 3, vec![i, i + 1, i + 2])).collect();
931
932        let ppl_a = build_ppl(&seg_a);
933        let ppl_b = build_ppl(&seg_b);
934        let ppl_c = build_ppl(&seg_c);
935
936        let offset_b = 500u32;
937        let offset_c = 1000u32;
938
939        // Method 1: concatenate_blocks (reference)
940        let ref_merged = PositionPostingList::concatenate_blocks(&[
941            (ppl_a.clone(), 0),
942            (ppl_b.clone(), offset_b),
943            (ppl_c.clone(), offset_c),
944        ])
945        .unwrap();
946        let mut ref_buf = Vec::new();
947        ref_merged.serialize(&mut ref_buf).unwrap();
948
949        // Method 2: concatenate_streaming
950        let bytes_a = serialize_ppl(&ppl_a);
951        let bytes_b = serialize_ppl(&ppl_b);
952        let bytes_c = serialize_ppl(&ppl_c);
953
954        let sources: Vec<(&[u8], u32)> =
955            vec![(&bytes_a, 0), (&bytes_b, offset_b), (&bytes_c, offset_c)];
956        let mut stream_buf = Vec::new();
957        let (doc_count, bytes_written) =
958            PositionPostingList::concatenate_streaming(&sources, &mut stream_buf).unwrap();
959
960        assert_eq!(doc_count, 330);
961        assert_eq!(bytes_written, stream_buf.len());
962
963        // Deserialize both and compare all postings + positions
964        let ref_posts = collect_positions(&PositionPostingList::deserialize(&ref_buf).unwrap());
965        let stream_posts =
966            collect_positions(&PositionPostingList::deserialize(&stream_buf).unwrap());
967
968        assert_eq!(ref_posts.len(), stream_posts.len());
969        for (i, (r, s)) in ref_posts.iter().zip(stream_posts.iter()).enumerate() {
970            assert_eq!(r.0, s.0, "doc_id mismatch at {}", i);
971            assert_eq!(r.1, s.1, "positions mismatch at doc {}", r.0);
972        }
973    }
974
975    #[test]
976    fn test_positions_multi_round_merge() {
977        // Round 0: 4 segments with distinct positions
978        let segments: Vec<Vec<(u32, Vec<u32>)>> = (0..4)
979            .map(|seg| {
980                (0..200)
981                    .map(|i| {
982                        let pos_count = (i % 3) + 1;
983                        let positions: Vec<u32> = (0..pos_count)
984                            .map(|p| (seg * 1000 + i * 10 + p) as u32)
985                            .collect();
986                        (i as u32 * 3, positions)
987                    })
988                    .collect()
989            })
990            .collect();
991
992        let ppls: Vec<PositionPostingList> = segments.iter().map(|s| build_ppl(s)).collect();
993        let serialized: Vec<Vec<u8>> = ppls.iter().map(serialize_ppl).collect();
994
995        // Round 1: merge pairs
996        let mut merged_01 = Vec::new();
997        let sources_01: Vec<(&[u8], u32)> = vec![(&serialized[0], 0), (&serialized[1], 600)];
998        let (dc_01, _) =
999            PositionPostingList::concatenate_streaming(&sources_01, &mut merged_01).unwrap();
1000        assert_eq!(dc_01, 400);
1001
1002        let mut merged_23 = Vec::new();
1003        let sources_23: Vec<(&[u8], u32)> = vec![(&serialized[2], 0), (&serialized[3], 600)];
1004        let (dc_23, _) =
1005            PositionPostingList::concatenate_streaming(&sources_23, &mut merged_23).unwrap();
1006        assert_eq!(dc_23, 400);
1007
1008        // Round 2: merge intermediate results
1009        let mut final_merged = Vec::new();
1010        let sources_final: Vec<(&[u8], u32)> = vec![(&merged_01, 0), (&merged_23, 1200)];
1011        let (dc_final, _) =
1012            PositionPostingList::concatenate_streaming(&sources_final, &mut final_merged).unwrap();
1013        assert_eq!(dc_final, 800);
1014
1015        // Verify all positions survived two rounds of merging
1016        let final_ppl = PositionPostingList::deserialize(&final_merged).unwrap();
1017        let all = collect_positions(&final_ppl);
1018        assert_eq!(all.len(), 800);
1019
1020        // Spot-check: first doc of each original segment
1021        // Seg0: doc_id=0, positions from seg=0
1022        assert_eq!(all[0].0, 0);
1023        assert_eq!(all[0].1, vec![0]); // seg=0, i=0, pos_count=1, pos=0*1000+0*10+0=0
1024
1025        // Seg1: doc_id=600, positions from seg=1
1026        assert_eq!(all[200].0, 600);
1027        assert_eq!(all[200].1, vec![1000]); // seg=1, i=0, pos_count=1, pos=1*1000+0*10+0=1000
1028
1029        // Seg2: doc_id=1200, positions from seg=2
1030        assert_eq!(all[400].0, 1200);
1031        assert_eq!(all[400].1, vec![2000]); // seg=2, i=0, pos_count=1, pos=2*1000+0*10+0=2000
1032
1033        // Verify seek + positions on the final merged list
1034        let mut it = final_ppl.iter();
1035        it.seek(1200);
1036        assert_eq!(it.doc(), 1200);
1037        assert_eq!(it.positions(), &[2000]);
1038
1039        // Verify get_positions (binary search path)
1040        let pos = final_ppl.get_positions(600).unwrap();
1041        assert_eq!(pos, vec![1000]);
1042    }
1043
1044    #[test]
1045    fn test_positions_large_scale_merge() {
1046        // 5 segments × 500 docs, each doc has 1-4 positions
1047        let num_segments = 5usize;
1048        let docs_per_segment = 500usize;
1049
1050        let segments: Vec<Vec<(u32, Vec<u32>)>> = (0..num_segments)
1051            .map(|seg| {
1052                (0..docs_per_segment)
1053                    .map(|i| {
1054                        let n_pos = (i % 4) + 1;
1055                        let positions: Vec<u32> =
1056                            (0..n_pos).map(|p| (p * 5 + seg) as u32).collect();
1057                        (i as u32 * 2, positions)
1058                    })
1059                    .collect()
1060            })
1061            .collect();
1062
1063        let ppls: Vec<PositionPostingList> = segments.iter().map(|s| build_ppl(s)).collect();
1064        let serialized: Vec<Vec<u8>> = ppls.iter().map(serialize_ppl).collect();
1065
1066        let max_doc = (docs_per_segment as u32 - 1) * 2;
1067        let offsets: Vec<u32> = (0..num_segments)
1068            .map(|i| i as u32 * (max_doc + 1))
1069            .collect();
1070
1071        let sources: Vec<(&[u8], u32)> = serialized
1072            .iter()
1073            .zip(offsets.iter())
1074            .map(|(b, o)| (b.as_slice(), *o))
1075            .collect();
1076
1077        let mut merged = Vec::new();
1078        let (doc_count, _) =
1079            PositionPostingList::concatenate_streaming(&sources, &mut merged).unwrap();
1080        assert_eq!(doc_count, (num_segments * docs_per_segment) as u32);
1081
1082        // Deserialize and verify all positions
1083        let merged_ppl = PositionPostingList::deserialize(&merged).unwrap();
1084        let all = collect_positions(&merged_ppl);
1085        assert_eq!(all.len(), num_segments * docs_per_segment);
1086
1087        // Verify positions for each segment
1088        for (seg, &offset) in offsets.iter().enumerate() {
1089            for i in 0..docs_per_segment {
1090                let idx = seg * docs_per_segment + i;
1091                let expected_doc = i as u32 * 2 + offset;
1092                assert_eq!(all[idx].0, expected_doc, "seg={} i={}", seg, i);
1093
1094                let n_pos = (i % 4) + 1;
1095                let expected_positions: Vec<u32> =
1096                    (0..n_pos).map(|p| (p * 5 + seg) as u32).collect();
1097                assert_eq!(
1098                    all[idx].1, expected_positions,
1099                    "positions mismatch seg={} i={}",
1100                    seg, i
1101                );
1102            }
1103        }
1104    }
1105
1106    #[test]
1107    fn test_positions_streaming_single_source() {
1108        let entries: Vec<(u32, Vec<u32>)> =
1109            (0..300).map(|i| (i * 4, vec![i * 2, i * 2 + 1])).collect();
1110        let ppl = build_ppl(&entries);
1111        let direct = serialize_ppl(&ppl);
1112
1113        let sources: Vec<(&[u8], u32)> = vec![(&direct, 0)];
1114        let mut streamed = Vec::new();
1115        PositionPostingList::concatenate_streaming(&sources, &mut streamed).unwrap();
1116
1117        let p1 = collect_positions(&PositionPostingList::deserialize(&direct).unwrap());
1118        let p2 = collect_positions(&PositionPostingList::deserialize(&streamed).unwrap());
1119        assert_eq!(p1, p2);
1120    }
1121
1122    #[test]
1123    fn test_positions_edge_single_doc() {
1124        let ppl_a = build_ppl(&[(0, vec![42, 43, 44])]);
1125        let ppl_b = build_ppl(&[(0, vec![100])]);
1126
1127        let merged = PositionPostingList::concatenate_blocks(&[(ppl_a, 0), (ppl_b, 1)]).unwrap();
1128
1129        assert_eq!(merged.doc_count(), 2);
1130        assert_eq!(merged.get_positions(0).unwrap(), vec![42, 43, 44]);
1131        assert_eq!(merged.get_positions(1).unwrap(), vec![100]);
1132    }
1133}