Skip to main content

hermes_core/structures/postings/
positions.rs

1//! Position-aware posting list for phrase queries and multi-field element tracking
2//!
3//! Positions are encoded as: (element_ordinal << 20) | token_position
4//! This allows up to 4096 elements per field and ~1M tokens per element.
5//!
6//! ## Block Format
7//!
8//! Uses a block-based format with skip list for efficient binary search by doc_id:
9//! - Skip list enables O(log n) lookup by doc_id
10//! - Blocks of up to 128 documents for cache efficiency
11//! - Delta encoding within blocks for compression
12//! - Stackable for fast merge (just adjust doc_id_base per block)
13//!
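//! Format (footer-based: block data is written first, metadata last):
//! ```text
//! Data:
//!   - blocks: each block is count (u32) + first_doc_id (u32), followed by
//!     vint doc_id deltas and absolute vint positions per posting
//! Skip list:
//!   - one 20-byte entry per block:
//!     (base_doc_id: u32, last_doc_id: u32, byte_offset: u64, length: u32)
//! Footer (16 bytes):
//!   - data_len: u64
//!   - num_blocks: u32
//!   - doc_count: u32
//! ```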
24
25use byteorder::{LittleEndian, ReadBytesExt, WriteBytesExt};
26use std::io::{self, Write};
27
28use super::posting_common::{read_vint, write_vint};
29use crate::DocId;
30
/// Maximum number of documents stored in one block of a position posting list.
pub const POSITION_BLOCK_SIZE: usize = 1 << 7;

/// Largest token position representable in the low 20 bits (1,048,575).
pub const MAX_TOKEN_POSITION: u32 = u32::MAX >> 12;

/// Largest element ordinal representable in the high 12 bits (4095).
pub const MAX_ELEMENT_ORDINAL: u32 = u32::MAX >> 20;
39
40/// Encode element ordinal and token position into a single u32
41#[inline]
42pub fn encode_position(element_ordinal: u32, token_position: u32) -> u32 {
43    debug_assert!(
44        element_ordinal <= MAX_ELEMENT_ORDINAL,
45        "Element ordinal {} exceeds maximum {}",
46        element_ordinal,
47        MAX_ELEMENT_ORDINAL
48    );
49    debug_assert!(
50        token_position <= MAX_TOKEN_POSITION,
51        "Token position {} exceeds maximum {}",
52        token_position,
53        MAX_TOKEN_POSITION
54    );
55    (element_ordinal << 20) | (token_position & MAX_TOKEN_POSITION)
56}
57
/// Extract the element ordinal (the bits above the 20-bit token position)
/// from an encoded position.
#[inline]
pub fn decode_element_ordinal(position: u32) -> u32 {
    const TOKEN_POSITION_BITS: u32 = 20;
    position >> TOKEN_POSITION_BITS
}
63
64/// Decode token position from encoded position
65#[inline]
66pub fn decode_token_position(position: u32) -> u32 {
67    position & MAX_TOKEN_POSITION
68}
69
70/// A posting entry with positions (used during building)
71#[derive(Debug, Clone, PartialEq, Eq)]
72pub struct PostingWithPositions {
73    pub doc_id: DocId,
74    pub term_freq: u32,
75    /// Encoded positions: (element_ordinal << 20) | token_position
76    pub positions: Vec<u32>,
77}
78
79/// Block-based position posting list with skip list for O(log n) doc_id lookup
80///
81/// Similar to BlockPostingList but stores positions per document.
82/// Uses binary search on skip list to find the right block, then linear scan within block.
83#[derive(Debug, Clone)]
84pub struct PositionPostingList {
85    /// Skip list: (base_doc_id, last_doc_id, byte_offset)
86    /// Enables binary search to find the right block
87    skip_list: Vec<(DocId, DocId, u64)>,
88    /// Compressed block data
89    data: Vec<u8>,
90    /// Total document count
91    doc_count: u32,
92}
93
94impl Default for PositionPostingList {
95    fn default() -> Self {
96        Self::new()
97    }
98}
99
100impl PositionPostingList {
101    pub fn new() -> Self {
102        Self {
103            skip_list: Vec::new(),
104            data: Vec::new(),
105            doc_count: 0,
106        }
107    }
108
109    pub fn with_capacity(capacity: usize) -> Self {
110        Self {
111            skip_list: Vec::with_capacity(capacity / POSITION_BLOCK_SIZE + 1),
112            data: Vec::with_capacity(capacity * 8), // rough estimate
113            doc_count: 0,
114        }
115    }
116
117    /// Build from a list of postings with positions
118    pub fn from_postings(postings: &[PostingWithPositions]) -> io::Result<Self> {
119        if postings.is_empty() {
120            return Ok(Self::new());
121        }
122
123        let mut skip_list = Vec::new();
124        let mut data = Vec::new();
125        let mut i = 0;
126
127        while i < postings.len() {
128            let block_start = data.len() as u64;
129            let block_end = (i + POSITION_BLOCK_SIZE).min(postings.len());
130            let block = &postings[i..block_end];
131
132            // Record skip entry
133            let base_doc_id = block.first().unwrap().doc_id;
134            let last_doc_id = block.last().unwrap().doc_id;
135            skip_list.push((base_doc_id, last_doc_id, block_start));
136
137            // Write block: fixed u32 count + first_doc (8-byte prefix), then vint deltas
138            data.write_u32::<LittleEndian>(block.len() as u32)?;
139            data.write_u32::<LittleEndian>(base_doc_id)?;
140
141            let mut prev_doc_id = base_doc_id;
142            for (j, posting) in block.iter().enumerate() {
143                if j > 0 {
144                    let delta = posting.doc_id - prev_doc_id;
145                    write_vint(&mut data, delta as u64)?;
146                }
147                prev_doc_id = posting.doc_id;
148
149                // Write positions count and positions (absolute - delta bad for ordinal<<20)
150                write_vint(&mut data, posting.positions.len() as u64)?;
151                for &pos in &posting.positions {
152                    write_vint(&mut data, pos as u64)?;
153                }
154            }
155
156            i = block_end;
157        }
158
159        Ok(Self {
160            skip_list,
161            data,
162            doc_count: postings.len() as u32,
163        })
164    }
165
166    /// Add a posting with positions (for building - converts to block format on serialize)
167    pub fn push(&mut self, doc_id: DocId, positions: Vec<u32>) {
168        // For compatibility: build in-memory, convert to blocks on serialize
169        // This is a simplified approach - we rebuild skip_list on serialize
170        let posting = PostingWithPositions {
171            doc_id,
172            term_freq: positions.len() as u32,
173            positions,
174        };
175
176        // Serialize this posting to data buffer
177        let block_start = self.data.len() as u64;
178
179        // If this is first posting or we need a new block
180        let need_new_block =
181            self.skip_list.is_empty() || self.doc_count.is_multiple_of(POSITION_BLOCK_SIZE as u32);
182
183        if need_new_block {
184            // Start new block: fixed u32 count + first_doc (8-byte prefix)
185            self.skip_list.push((doc_id, doc_id, block_start));
186            self.data.write_u32::<LittleEndian>(1u32).unwrap();
187            self.data.write_u32::<LittleEndian>(doc_id).unwrap();
188        } else {
189            // Add to existing block — update count in-place + add delta
190            let last_block = self.skip_list.last_mut().unwrap();
191            let prev_doc_id = last_block.1;
192            last_block.1 = doc_id;
193
194            // Patch count u32 at block start
195            let count_offset = last_block.2 as usize;
196            let old_count = u32::from_le_bytes(
197                self.data[count_offset..count_offset + 4]
198                    .try_into()
199                    .unwrap(),
200            );
201            self.data[count_offset..count_offset + 4]
202                .copy_from_slice(&(old_count + 1).to_le_bytes());
203
204            let delta = doc_id - prev_doc_id;
205            write_vint(&mut self.data, delta as u64).unwrap();
206        }
207
208        // Write positions (absolute - delta encoding bad for ordinal<<20)
209        write_vint(&mut self.data, posting.positions.len() as u64).unwrap();
210        for &pos in &posting.positions {
211            write_vint(&mut self.data, pos as u64).unwrap();
212        }
213
214        self.doc_count += 1;
215    }
216
217    pub fn doc_count(&self) -> u32 {
218        self.doc_count
219    }
220
221    pub fn len(&self) -> usize {
222        self.doc_count as usize
223    }
224
225    pub fn is_empty(&self) -> bool {
226        self.doc_count == 0
227    }
228
229    /// Get positions for a specific document into a reusable buffer (zero allocation).
230    /// Returns true if the document was found.
231    pub fn get_positions_into(&self, target_doc_id: DocId, out: &mut Vec<u32>) -> bool {
232        out.clear();
233        self.get_positions_impl(target_doc_id, Some(out)).is_some()
234    }
235
236    /// Get positions for a specific document using binary search on skip list
237    pub fn get_positions(&self, target_doc_id: DocId) -> Option<Vec<u32>> {
238        self.get_positions_impl(target_doc_id, None)
239    }
240
241    fn get_positions_impl(
242        &self,
243        target_doc_id: DocId,
244        mut out: Option<&mut Vec<u32>>,
245    ) -> Option<Vec<u32>> {
246        if self.skip_list.is_empty() {
247            return None;
248        }
249
250        // Binary search on skip list to find the right block
251        let block_idx = match self.skip_list.binary_search_by(|&(base, last, _)| {
252            if target_doc_id < base {
253                std::cmp::Ordering::Greater
254            } else if target_doc_id > last {
255                std::cmp::Ordering::Less
256            } else {
257                std::cmp::Ordering::Equal
258            }
259        }) {
260            Ok(idx) => idx,
261            Err(_) => return None, // doc_id not in any block range
262        };
263
264        // Decode block and search for doc_id
265        let offset = self.skip_list[block_idx].2 as usize;
266        let mut reader = &self.data[offset..];
267
268        // Fixed 8-byte prefix: count(u32) + first_doc(u32)
269        let count = reader.read_u32::<LittleEndian>().ok()? as usize;
270        let first_doc = reader.read_u32::<LittleEndian>().ok()?;
271        let mut prev_doc_id = first_doc;
272
273        for i in 0..count {
274            let doc_id = if i == 0 {
275                first_doc
276            } else {
277                let delta = read_vint(&mut reader).ok()? as u32;
278                prev_doc_id + delta
279            };
280            prev_doc_id = doc_id;
281
282            let num_positions = read_vint(&mut reader).ok()? as usize;
283
284            if doc_id == target_doc_id {
285                // Found it! Read positions (stored absolute)
286                if let Some(buf) = &mut out {
287                    // Buffer-based path: write into caller's reusable buffer
288                    buf.reserve(num_positions);
289                    for _ in 0..num_positions {
290                        let pos = read_vint(&mut reader).ok()? as u32;
291                        buf.push(pos);
292                    }
293                    return Some(Vec::new()); // sentinel: actual data is in `out`
294                }
295                let mut positions = Vec::with_capacity(num_positions);
296                for _ in 0..num_positions {
297                    let pos = read_vint(&mut reader).ok()? as u32;
298                    positions.push(pos);
299                }
300                return Some(positions);
301            } else {
302                // Skip positions
303                for _ in 0..num_positions {
304                    if read_vint(&mut reader).is_err() {
305                        return None;
306                    }
307                }
308            }
309        }
310
311        None
312    }
313
/// Byte size of one serialized skip entry:
/// first_doc (4) + last_doc (4) + offset (8) + length (4).
const SKIP_ENTRY_SIZE: usize = 4 + 4 + 8 + 4;
317
318    /// Serialize to bytes (footer-based: data first).
319    ///
320    /// Format:
321    /// ```text
322    /// [block data: data_len bytes]
323    /// [skip entries: N × 20 bytes (base_doc, last_doc, offset, length)]
324    /// [footer: data_len(8) + skip_count(4) + doc_count(4) = 16 bytes]
325    /// ```
326    pub fn serialize<W: Write>(&self, writer: &mut W) -> io::Result<()> {
327        // Data first
328        writer.write_all(&self.data)?;
329
330        // Skip list — compute length from adjacent entries
331        for (i, (base_doc_id, last_doc_id, offset)) in self.skip_list.iter().enumerate() {
332            let next_offset = if i + 1 < self.skip_list.len() {
333                self.skip_list[i + 1].2
334            } else {
335                self.data.len() as u64
336            };
337            let length = (next_offset - offset) as u32;
338            writer.write_u32::<LittleEndian>(*base_doc_id)?;
339            writer.write_u32::<LittleEndian>(*last_doc_id)?;
340            writer.write_u64::<LittleEndian>(*offset)?;
341            writer.write_u32::<LittleEndian>(length)?;
342        }
343
344        // Footer
345        writer.write_u64::<LittleEndian>(self.data.len() as u64)?;
346        writer.write_u32::<LittleEndian>(self.skip_list.len() as u32)?;
347        writer.write_u32::<LittleEndian>(self.doc_count)?;
348
349        Ok(())
350    }
351
352    /// Deserialize from a byte slice (footer-based format).
353    pub fn deserialize(raw: &[u8]) -> io::Result<Self> {
354        if raw.len() < 16 {
355            return Err(io::Error::new(
356                io::ErrorKind::InvalidData,
357                "position data too short",
358            ));
359        }
360
361        // Parse footer (last 16 bytes)
362        let f = raw.len() - 16;
363        let data_len = u64::from_le_bytes(raw[f..f + 8].try_into().unwrap()) as usize;
364        let skip_count = u32::from_le_bytes(raw[f + 8..f + 12].try_into().unwrap()) as usize;
365        let doc_count = u32::from_le_bytes(raw[f + 12..f + 16].try_into().unwrap());
366
367        // Parse skip list (20-byte entries; length field not stored in-memory)
368        let mut skip_list = Vec::with_capacity(skip_count);
369        let mut pos = data_len;
370        for _ in 0..skip_count {
371            let base = u32::from_le_bytes(raw[pos..pos + 4].try_into().unwrap());
372            let last = u32::from_le_bytes(raw[pos + 4..pos + 8].try_into().unwrap());
373            let offset = u64::from_le_bytes(raw[pos + 8..pos + 16].try_into().unwrap());
374            // pos + 16..pos + 20 = length (not needed in-memory; adjacent entries suffice)
375            skip_list.push((base, last, offset));
376            pos += Self::SKIP_ENTRY_SIZE;
377        }
378
379        let data = raw[..data_len].to_vec();
380
381        Ok(Self {
382            skip_list,
383            data,
384            doc_count,
385        })
386    }
387
388    /// Concatenate blocks from multiple position lists with doc_id remapping (for merge)
389    pub fn concatenate_blocks(sources: &[(PositionPostingList, u32)]) -> io::Result<Self> {
390        let mut skip_list = Vec::new();
391        let mut data = Vec::new();
392        let mut total_docs = 0u32;
393
394        for (source, doc_offset) in sources {
395            for block_idx in 0..source.skip_list.len() {
396                let (base, last, src_offset) = source.skip_list[block_idx];
397                let next_offset = if block_idx + 1 < source.skip_list.len() {
398                    source.skip_list[block_idx + 1].2 as usize
399                } else {
400                    source.data.len()
401                };
402
403                let new_base = base + doc_offset;
404                let new_last = last + doc_offset;
405                let new_offset = data.len() as u64;
406
407                // Copy and adjust block data
408                let block_bytes = &source.data[src_offset as usize..next_offset];
409
410                // Fixed 8-byte prefix: count(u32) + first_doc(u32)
411                let count = u32::from_le_bytes(block_bytes[0..4].try_into().unwrap());
412                let first_doc = u32::from_le_bytes(block_bytes[4..8].try_into().unwrap());
413
414                // Write patched prefix + copy rest verbatim
415                data.write_u32::<LittleEndian>(count)?;
416                data.write_u32::<LittleEndian>(first_doc + doc_offset)?;
417                data.extend_from_slice(&block_bytes[8..]);
418
419                skip_list.push((new_base, new_last, new_offset));
420                total_docs += count;
421            }
422        }
423
424        Ok(Self {
425            skip_list,
426            data,
427            doc_count: total_docs,
428        })
429    }
430
431    /// Streaming merge: write blocks directly to output writer (bounded memory).
432    ///
433    /// **Zero-materializing**: reads skip entries directly from source bytes
434    /// without parsing into Vecs. Explicit `length` field in each 20-byte
435    /// entry eliminates adjacent-entry lookups.
436    ///
437    /// Only output skip bytes are buffered (bounded O(total_blocks × 20)).
438    /// Block data flows source → output writer without intermediate buffering.
439    ///
440    /// Returns `(doc_count, bytes_written)`.
441    pub fn concatenate_streaming<W: Write>(
442        sources: &[(&[u8], u32)],
443        writer: &mut W,
444    ) -> io::Result<(u32, usize)> {
445        // Parse only footers (16 bytes each) — no skip entries materialized
446        struct SourceMeta {
447            data_len: usize,
448            skip_count: usize,
449        }
450
451        let mut metas: Vec<SourceMeta> = Vec::with_capacity(sources.len());
452        let mut total_docs = 0u32;
453
454        for (raw, _) in sources {
455            if raw.len() < 16 {
456                continue;
457            }
458            let f = raw.len() - 16;
459            let data_len = u64::from_le_bytes(raw[f..f + 8].try_into().unwrap()) as usize;
460            let skip_count = u32::from_le_bytes(raw[f + 8..f + 12].try_into().unwrap()) as usize;
461            let doc_count = u32::from_le_bytes(raw[f + 12..f + 16].try_into().unwrap());
462            total_docs += doc_count;
463            metas.push(SourceMeta {
464                data_len,
465                skip_count,
466            });
467        }
468
469        // Phase 1: Stream block data, reading skip entries on-the-fly.
470        // Accumulate output skip bytes (bounded: 20 bytes × total_blocks).
471        let mut out_skip: Vec<u8> = Vec::new();
472        let mut out_skip_count = 0u32;
473        let mut data_written = 0u64;
474        let mut patch_buf = [0u8; 8];
475        let es = Self::SKIP_ENTRY_SIZE;
476
477        for (src_idx, meta) in metas.iter().enumerate() {
478            let (raw, doc_offset) = &sources[src_idx];
479            let skip_base = meta.data_len;
480            let data = &raw[..meta.data_len];
481
482            for i in 0..meta.skip_count {
483                // Read source skip entry directly from raw bytes
484                let p = skip_base + i * es;
485                let base = u32::from_le_bytes(raw[p..p + 4].try_into().unwrap());
486                let last = u32::from_le_bytes(raw[p + 4..p + 8].try_into().unwrap());
487                let offset = u64::from_le_bytes(raw[p + 8..p + 16].try_into().unwrap());
488                let length = u32::from_le_bytes(raw[p + 16..p + 20].try_into().unwrap());
489
490                let block = &data[offset as usize..(offset as usize + length as usize)];
491
492                // Write output skip entry
493                out_skip.extend_from_slice(&(base + doc_offset).to_le_bytes());
494                out_skip.extend_from_slice(&(last + doc_offset).to_le_bytes());
495                out_skip.extend_from_slice(&data_written.to_le_bytes());
496                out_skip.extend_from_slice(&length.to_le_bytes());
497                out_skip_count += 1;
498
499                // Write patched 8-byte prefix + rest of block verbatim
500                patch_buf[0..4].copy_from_slice(&block[0..4]);
501                let first_doc = u32::from_le_bytes(block[4..8].try_into().unwrap());
502                patch_buf[4..8].copy_from_slice(&(first_doc + doc_offset).to_le_bytes());
503                writer.write_all(&patch_buf)?;
504                writer.write_all(&block[8..])?;
505
506                data_written += block.len() as u64;
507            }
508        }
509
510        // Phase 2: Write skip entries + footer
511        writer.write_all(&out_skip)?;
512
513        writer.write_u64::<LittleEndian>(data_written)?;
514        writer.write_u32::<LittleEndian>(out_skip_count)?;
515        writer.write_u32::<LittleEndian>(total_docs)?;
516
517        let total_bytes = data_written as usize + out_skip.len() + 16;
518        Ok((total_docs, total_bytes))
519    }
520
521    /// Get iterator over all postings (for phrase queries)
522    pub fn iter(&self) -> PositionPostingIterator<'_> {
523        PositionPostingIterator::new(self)
524    }
525}
526
527/// Iterator over block-based position posting list
528///
529/// Uses flat buffers instead of per-posting `Vec<u32>` to avoid heap
530/// allocations on every block load. Positions are concatenated into a
531/// single `Vec<u32>` with offset/length pairs for O(1) slicing.
532pub struct PositionPostingIterator<'a> {
533    list: &'a PositionPostingList,
534    current_block: usize,
535    position_in_block: usize,
536    /// Number of postings in the current block
537    block_count: usize,
538    /// Doc IDs for each posting in the block
539    block_doc_ids: Vec<DocId>,
540    /// Term frequency for each posting in the block
541    block_term_freqs: Vec<u32>,
542    /// All positions concatenated (flat buffer, reused across blocks)
543    block_positions: Vec<u32>,
544    /// Start offset into block_positions for each posting
545    block_pos_offsets: Vec<usize>,
546    exhausted: bool,
547}
548
549impl<'a> PositionPostingIterator<'a> {
550    pub fn new(list: &'a PositionPostingList) -> Self {
551        let exhausted = list.skip_list.is_empty();
552        let mut iter = Self {
553            list,
554            current_block: 0,
555            position_in_block: 0,
556            block_count: 0,
557            block_doc_ids: Vec::with_capacity(POSITION_BLOCK_SIZE),
558            block_term_freqs: Vec::with_capacity(POSITION_BLOCK_SIZE),
559            block_positions: Vec::new(),
560            block_pos_offsets: Vec::with_capacity(POSITION_BLOCK_SIZE + 1),
561            exhausted,
562        };
563        if !iter.exhausted {
564            iter.load_block(0);
565        }
566        iter
567    }
568
569    fn load_block(&mut self, block_idx: usize) {
570        if block_idx >= self.list.skip_list.len() {
571            self.exhausted = true;
572            return;
573        }
574
575        self.current_block = block_idx;
576        self.position_in_block = 0;
577
578        let offset = self.list.skip_list[block_idx].2 as usize;
579        let mut reader = &self.list.data[offset..];
580
581        // Fixed 8-byte prefix: count(u32) + first_doc(u32)
582        let count = reader.read_u32::<LittleEndian>().unwrap_or(0) as usize;
583        let first_doc = reader.read_u32::<LittleEndian>().unwrap_or(0);
584
585        self.block_count = count;
586        self.block_doc_ids.clear();
587        self.block_term_freqs.clear();
588        self.block_positions.clear();
589        self.block_pos_offsets.clear();
590
591        let mut prev_doc_id = first_doc;
592
593        for i in 0..count {
594            let doc_id = if i == 0 {
595                first_doc
596            } else {
597                let delta = read_vint(&mut reader).unwrap_or(0) as u32;
598                prev_doc_id + delta
599            };
600            prev_doc_id = doc_id;
601
602            let num_positions = read_vint(&mut reader).unwrap_or(0) as usize;
603            self.block_doc_ids.push(doc_id);
604            self.block_term_freqs.push(num_positions as u32);
605            self.block_pos_offsets.push(self.block_positions.len());
606            for _ in 0..num_positions {
607                let pos = read_vint(&mut reader).unwrap_or(0) as u32;
608                self.block_positions.push(pos);
609            }
610        }
611        // Sentinel offset for the last posting's end
612        self.block_pos_offsets.push(self.block_positions.len());
613    }
614
615    pub fn doc(&self) -> DocId {
616        if self.exhausted || self.position_in_block >= self.block_count {
617            u32::MAX
618        } else {
619            self.block_doc_ids[self.position_in_block]
620        }
621    }
622
623    pub fn term_freq(&self) -> u32 {
624        if self.exhausted || self.position_in_block >= self.block_count {
625            0
626        } else {
627            self.block_term_freqs[self.position_in_block]
628        }
629    }
630
631    pub fn positions(&self) -> &[u32] {
632        if self.exhausted || self.position_in_block >= self.block_count {
633            &[]
634        } else {
635            let start = self.block_pos_offsets[self.position_in_block];
636            let end = self.block_pos_offsets[self.position_in_block + 1];
637            &self.block_positions[start..end]
638        }
639    }
640
641    pub fn advance(&mut self) {
642        if self.exhausted {
643            return;
644        }
645
646        self.position_in_block += 1;
647        if self.position_in_block >= self.block_count {
648            self.load_block(self.current_block + 1);
649        }
650    }
651
652    pub fn seek(&mut self, target: DocId) {
653        if self.exhausted {
654            return;
655        }
656
657        // Check if target is in current block
658        if let Some((_, last, _)) = self.list.skip_list.get(self.current_block)
659            && target <= *last
660        {
661            // Target might be in current block, scan forward
662            while self.position_in_block < self.block_count
663                && self.block_doc_ids[self.position_in_block] < target
664            {
665                self.position_in_block += 1;
666            }
667            if self.position_in_block >= self.block_count {
668                self.load_block(self.current_block + 1);
669                self.seek(target); // Continue seeking in next block
670            }
671            return;
672        }
673
674        // Binary search on skip list to find the right block
675        let block_idx = match self.list.skip_list.binary_search_by(|&(base, last, _)| {
676            if target < base {
677                std::cmp::Ordering::Greater
678            } else if target > last {
679                std::cmp::Ordering::Less
680            } else {
681                std::cmp::Ordering::Equal
682            }
683        }) {
684            Ok(idx) => idx,
685            Err(idx) => idx, // Use the next block if not found exactly
686        };
687
688        if block_idx >= self.list.skip_list.len() {
689            self.exhausted = true;
690            return;
691        }
692
693        self.load_block(block_idx);
694
695        // Linear scan within block
696        while self.position_in_block < self.block_count
697            && self.block_doc_ids[self.position_in_block] < target
698        {
699            self.position_in_block += 1;
700        }
701
702        if self.position_in_block >= self.block_count {
703            self.load_block(self.current_block + 1);
704        }
705    }
706}
707
708#[cfg(test)]
709mod tests {
710    use super::*;
711
712    #[test]
713    fn test_position_encoding() {
714        // Element 0, position 5
715        let pos = encode_position(0, 5);
716        assert_eq!(decode_element_ordinal(pos), 0);
717        assert_eq!(decode_token_position(pos), 5);
718
719        // Element 3, position 100
720        let pos = encode_position(3, 100);
721        assert_eq!(decode_element_ordinal(pos), 3);
722        assert_eq!(decode_token_position(pos), 100);
723
724        // Max values
725        let pos = encode_position(MAX_ELEMENT_ORDINAL, MAX_TOKEN_POSITION);
726        assert_eq!(decode_element_ordinal(pos), MAX_ELEMENT_ORDINAL);
727        assert_eq!(decode_token_position(pos), MAX_TOKEN_POSITION);
728    }
729
730    #[test]
731    fn test_position_posting_list_build() {
732        // Build from postings
733        let postings = vec![
734            PostingWithPositions {
735                doc_id: 1,
736                term_freq: 2,
737                positions: vec![encode_position(0, 0), encode_position(0, 2)],
738            },
739            PostingWithPositions {
740                doc_id: 3,
741                term_freq: 1,
742                positions: vec![encode_position(1, 0)],
743            },
744        ];
745
746        let list = PositionPostingList::from_postings(&postings).unwrap();
747        assert_eq!(list.doc_count(), 2);
748
749        // Test binary search
750        let pos = list.get_positions(1).unwrap();
751        assert_eq!(pos.len(), 2);
752
753        let pos = list.get_positions(3).unwrap();
754        assert_eq!(pos.len(), 1);
755
756        // Not found
757        assert!(list.get_positions(2).is_none());
758        assert!(list.get_positions(99).is_none());
759    }
760
761    #[test]
762    fn test_serialization_roundtrip() {
763        let postings = vec![
764            PostingWithPositions {
765                doc_id: 1,
766                term_freq: 2,
767                positions: vec![encode_position(0, 0), encode_position(0, 5)],
768            },
769            PostingWithPositions {
770                doc_id: 3,
771                term_freq: 1,
772                positions: vec![encode_position(1, 0)],
773            },
774            PostingWithPositions {
775                doc_id: 5,
776                term_freq: 1,
777                positions: vec![encode_position(0, 10)],
778            },
779        ];
780
781        let list = PositionPostingList::from_postings(&postings).unwrap();
782
783        let mut bytes = Vec::new();
784        list.serialize(&mut bytes).unwrap();
785
786        let deserialized = PositionPostingList::deserialize(&bytes).unwrap();
787
788        assert_eq!(list.doc_count(), deserialized.doc_count());
789
790        // Verify binary search works on deserialized
791        let pos = deserialized.get_positions(1).unwrap();
792        assert_eq!(pos, vec![encode_position(0, 0), encode_position(0, 5)]);
793
794        let pos = deserialized.get_positions(3).unwrap();
795        assert_eq!(pos, vec![encode_position(1, 0)]);
796    }
797
798    #[test]
799    fn test_binary_search_many_blocks() {
800        // Create enough postings to span multiple blocks (128 per block)
801        let mut postings = Vec::new();
802        for i in 0..300 {
803            postings.push(PostingWithPositions {
804                doc_id: i * 2, // doc_ids: 0, 2, 4, 6, ...
805                term_freq: 1,
806                positions: vec![encode_position(0, i)],
807            });
808        }
809
810        let list = PositionPostingList::from_postings(&postings).unwrap();
811        assert_eq!(list.doc_count(), 300);
812
813        // Should have 3 blocks (128 + 128 + 44)
814        assert_eq!(list.skip_list.len(), 3);
815
816        // Test binary search across blocks
817        let pos = list.get_positions(0).unwrap();
818        assert_eq!(pos, vec![encode_position(0, 0)]);
819
820        let pos = list.get_positions(256).unwrap(); // doc 128 in block 1
821        assert_eq!(pos, vec![encode_position(0, 128)]);
822
823        let pos = list.get_positions(598).unwrap(); // last doc
824        assert_eq!(pos, vec![encode_position(0, 299)]);
825
826        // Not found (odd numbers don't exist)
827        assert!(list.get_positions(1).is_none());
828        assert!(list.get_positions(257).is_none());
829    }
830
/// Concatenating two lists with doc_id offsets must keep every posting
/// reachable under its remapped doc_id, with its positions intact.
#[test]
fn test_concatenate_blocks_merge() {
    // Every posting in this test has exactly one position.
    let single = |doc_id: u32, position: u32| PostingWithPositions {
        doc_id,
        term_freq: 1,
        positions: vec![position],
    };

    let postings1 = vec![single(0, 0), single(1, 5), single(2, 10)];
    let list1 = PositionPostingList::from_postings(&postings1).unwrap();

    let postings2 = vec![single(0, 100), single(1, 105)];
    let list2 = PositionPostingList::from_postings(&postings2).unwrap();

    // Merge with doc_id offset
    let combined = PositionPostingList::concatenate_blocks(&[
        (list1, 0), // no offset
        (list2, 3), // offset by 3 (list1 has 3 docs)
    ])
    .unwrap();

    assert_eq!(combined.doc_count(), 5);

    // Check doc_ids are correctly remapped. Comparing the positions (not just
    // presence) catches a merge that attaches data to the wrong doc_id:
    // docs 3 and 4 were docs 0 and 1 of list2.
    let expected: [(u32, u32); 5] = [(0, 0), (1, 5), (2, 10), (3, 100), (4, 105)];
    for &(doc_id, position) in expected.iter() {
        let found = combined
            .get_positions(doc_id)
            .unwrap_or_else(|| panic!("doc {} missing after merge", doc_id));
        assert_eq!(found, vec![position], "wrong positions for doc {}", doc_id);
    }
}
883
/// Walks a three-document list with the iterator, mixing `advance` and `seek`.
#[test]
fn test_iterator() {
    // (doc_id, positions) pairs; `build_ppl` sets each term_freq to the
    // number of positions, i.e. 2, 1 and 1 here.
    let list = build_ppl(&[(1, vec![0, 5]), (3, vec![10]), (5, vec![15])]);
    let mut cursor = list.iter();

    // A fresh iterator already sits on the first posting.
    assert_eq!(cursor.doc(), 1);
    assert_eq!(cursor.positions(), &[0, 5]);

    // Step to the next posting.
    cursor.advance();
    assert_eq!(cursor.doc(), 3);

    // Seek to a doc_id that is present.
    cursor.seek(5);
    assert_eq!(cursor.doc(), 5);
    assert_eq!(cursor.positions(), &[15]);

    // Advancing past the last posting reports u32::MAX as the doc (exhausted).
    cursor.advance();
    assert_eq!(cursor.doc(), u32::MAX);
}
920
921    /// Helper: build a PositionPostingList from (doc_id, positions) pairs
922    fn build_ppl(entries: &[(u32, Vec<u32>)]) -> PositionPostingList {
923        let postings: Vec<PostingWithPositions> = entries
924            .iter()
925            .map(|(doc_id, positions)| PostingWithPositions {
926                doc_id: *doc_id,
927                term_freq: positions.len() as u32,
928                positions: positions.clone(),
929            })
930            .collect();
931        PositionPostingList::from_postings(&postings).unwrap()
932    }
933
934    /// Helper: serialize a PositionPostingList to bytes
935    fn serialize_ppl(ppl: &PositionPostingList) -> Vec<u8> {
936        let mut buf = Vec::new();
937        ppl.serialize(&mut buf).unwrap();
938        buf
939    }
940
941    /// Helper: collect all (doc_id, positions) from a PositionPostingIterator
942    fn collect_positions(ppl: &PositionPostingList) -> Vec<(u32, Vec<u32>)> {
943        let mut result = Vec::new();
944        let mut it = ppl.iter();
945        while it.doc() != u32::MAX {
946            result.push((it.doc(), it.positions().to_vec()));
947            it.advance();
948        }
949        result
950    }
951
/// `concatenate_streaming` (byte-level merge) must yield the same postings and
/// positions as `concatenate_blocks` (in-memory merge) for the same inputs.
#[test]
fn test_concatenate_streaming_matches_blocks() {
    // Three segments with different sizes, doc_id strides and position counts
    let seg_a: Vec<(u32, Vec<u32>)> = (0..150)
        .map(|i| (i * 2, vec![i * 10, i * 10 + 3]))
        .collect();
    let seg_b: Vec<(u32, Vec<u32>)> = (0..100).map(|i| (i * 5, vec![i * 7])).collect();
    let seg_c: Vec<(u32, Vec<u32>)> = (0..80).map(|i| (i * 3, vec![i, i + 1, i + 2])).collect();

    let ppl_a = build_ppl(&seg_a);
    let ppl_b = build_ppl(&seg_b);
    let ppl_c = build_ppl(&seg_c);

    let (offset_b, offset_c) = (500u32, 1000u32);

    // Serialize the inputs for the streaming path up front, so the lists
    // themselves can be handed to concatenate_blocks afterwards.
    let bytes_a = serialize_ppl(&ppl_a);
    let bytes_b = serialize_ppl(&ppl_b);
    let bytes_c = serialize_ppl(&ppl_c);

    // Reference result: concatenate_blocks, then serialize
    let reference = PositionPostingList::concatenate_blocks(&[
        (ppl_a, 0),
        (ppl_b, offset_b),
        (ppl_c, offset_c),
    ])
    .unwrap();
    let ref_buf = serialize_ppl(&reference);

    // Result under test: concatenate_streaming over the serialized inputs
    let sources: Vec<(&[u8], u32)> = vec![
        (bytes_a.as_slice(), 0),
        (bytes_b.as_slice(), offset_b),
        (bytes_c.as_slice(), offset_c),
    ];
    let mut stream_buf = Vec::new();
    let (doc_count, bytes_written) =
        PositionPostingList::concatenate_streaming(&sources, &mut stream_buf).unwrap();

    // 150 + 100 + 80 docs, and the reported size must match what was written
    assert_eq!(doc_count, 330);
    assert_eq!(bytes_written, stream_buf.len());

    // Round-trip both outputs and compare posting by posting
    let ref_posts = collect_positions(&PositionPostingList::deserialize(&ref_buf).unwrap());
    let stream_posts = collect_positions(&PositionPostingList::deserialize(&stream_buf).unwrap());

    assert_eq!(ref_posts.len(), stream_posts.len());
    let pairs = ref_posts.iter().zip(stream_posts.iter());
    for (i, ((ref_doc, ref_pos), (stream_doc, stream_pos))) in pairs.enumerate() {
        assert_eq!(ref_doc, stream_doc, "doc_id mismatch at {}", i);
        assert_eq!(ref_pos, stream_pos, "positions mismatch at doc {}", ref_doc);
    }
}
1003
/// Positions must survive two successive rounds of streaming concatenation:
/// 4 segments -> 2 intermediate lists -> 1 final list.
#[test]
fn test_positions_multi_round_merge() {
    // Round 0: 4 segments x 200 docs. Doc `i` of segment `seg` has
    // doc_id = i * 3 and (i % 3) + 1 consecutive positions starting at
    // seg * 1000 + i * 10, so every position identifies its origin.
    let segments: Vec<Vec<(u32, Vec<u32>)>> = (0..4u32)
        .map(|seg| {
            (0..200u32)
                .map(|i| {
                    let pos_count = (i % 3) + 1;
                    let positions: Vec<u32> =
                        (0..pos_count).map(|p| seg * 1000 + i * 10 + p).collect();
                    (i * 3, positions)
                })
                .collect()
        })
        .collect();

    let ppls: Vec<PositionPostingList> = segments.iter().map(|s| build_ppl(s)).collect();
    let serialized: Vec<Vec<u8>> = ppls.iter().map(serialize_ppl).collect();

    // Round 1: merge pairs. The largest doc_id in a segment is 199 * 3 = 597,
    // so an offset of 600 keeps the second segment strictly after the first.
    let mut merged_01 = Vec::new();
    let sources_01: Vec<(&[u8], u32)> = vec![(&serialized[0], 0), (&serialized[1], 600)];
    let (dc_01, _) =
        PositionPostingList::concatenate_streaming(&sources_01, &mut merged_01).unwrap();
    assert_eq!(dc_01, 400);

    let mut merged_23 = Vec::new();
    let sources_23: Vec<(&[u8], u32)> = vec![(&serialized[2], 0), (&serialized[3], 600)];
    let (dc_23, _) =
        PositionPostingList::concatenate_streaming(&sources_23, &mut merged_23).unwrap();
    assert_eq!(dc_23, 400);

    // Round 2: merge intermediate results (each spans doc_ids below 1200)
    let mut final_merged = Vec::new();
    let sources_final: Vec<(&[u8], u32)> = vec![(&merged_01, 0), (&merged_23, 1200)];
    let (dc_final, _) =
        PositionPostingList::concatenate_streaming(&sources_final, &mut final_merged).unwrap();
    assert_eq!(dc_final, 800);

    // Verify all positions survived two rounds of merging
    let final_ppl = PositionPostingList::deserialize(&final_merged).unwrap();
    let all = collect_positions(&final_ppl);
    assert_eq!(all.len(), 800);

    // Spot-check the first doc of each original segment, including the last
    // one. After both rounds segment `seg` starts at doc_id seg * 600
    // (0, 600, 1200, 600 + 1200), and its first doc (i = 0) has the single
    // position seg * 1000.
    for seg in 0..4u32 {
        let (doc_id, positions) = &all[seg as usize * 200];
        assert_eq!(*doc_id, seg * 600, "first doc_id of segment {}", seg);
        assert_eq!(
            *positions,
            vec![seg * 1000],
            "first positions of segment {}",
            seg
        );
    }

    // Verify seek + positions on the final merged list
    let mut it = final_ppl.iter();
    it.seek(1200);
    assert_eq!(it.doc(), 1200);
    assert_eq!(it.positions(), &[2000]);

    // Seek forward again into the segment that went through both offsets
    it.seek(1800);
    assert_eq!(it.doc(), 1800);
    assert_eq!(it.positions(), &[3000]);

    // Verify get_positions (binary search path)
    let pos = final_ppl.get_positions(600).unwrap();
    assert_eq!(pos, vec![1000]);

    let pos = final_ppl.get_positions(1800).unwrap();
    assert_eq!(pos, vec![3000]);
}
1072
/// Streams 5 segments x 500 docs into one list and checks every doc_id and
/// every position of the merged result.
#[test]
fn test_positions_large_scale_merge() {
    let num_segments = 5usize;
    let docs_per_segment = 500usize;

    // Doc `i` of segment `seg` carries (i % 4) + 1 positions:
    // seg, seg + 5, seg + 10, ...
    let positions_for = |seg: usize, i: usize| -> Vec<u32> {
        let n_pos = (i % 4) + 1;
        (0..n_pos).map(|p| (p * 5 + seg) as u32).collect()
    };

    // Build and serialize each segment; doc_ids within a segment are 0, 2, 4, ...
    let mut serialized: Vec<Vec<u8>> = Vec::with_capacity(num_segments);
    for seg in 0..num_segments {
        let entries: Vec<(u32, Vec<u32>)> = (0..docs_per_segment)
            .map(|i| (i as u32 * 2, positions_for(seg, i)))
            .collect();
        serialized.push(serialize_ppl(&build_ppl(&entries)));
    }

    // Each segment is shifted just past the largest doc_id of the previous one
    let max_doc = (docs_per_segment as u32 - 1) * 2;
    let stride = max_doc + 1;
    let offsets: Vec<u32> = (0..num_segments).map(|seg| seg as u32 * stride).collect();

    let mut sources: Vec<(&[u8], u32)> = Vec::with_capacity(num_segments);
    for (bytes, offset) in serialized.iter().zip(offsets.iter()) {
        sources.push((bytes.as_slice(), *offset));
    }

    let mut merged = Vec::new();
    let (doc_count, _) =
        PositionPostingList::concatenate_streaming(&sources, &mut merged).unwrap();
    assert_eq!(doc_count, (num_segments * docs_per_segment) as u32);

    // Deserialize and verify all positions
    let merged_ppl = PositionPostingList::deserialize(&merged).unwrap();
    let all = collect_positions(&merged_ppl);
    assert_eq!(all.len(), num_segments * docs_per_segment);

    // Entry `idx` of the merged list is doc `idx % docs_per_segment` of
    // segment `idx / docs_per_segment`.
    for (idx, (doc_id, positions)) in all.iter().enumerate() {
        let seg = idx / docs_per_segment;
        let i = idx % docs_per_segment;

        let expected_doc = i as u32 * 2 + offsets[seg];
        assert_eq!(*doc_id, expected_doc, "seg={} i={}", seg, i);

        let expected_positions = positions_for(seg, i);
        assert_eq!(
            *positions, expected_positions,
            "positions mismatch seg={} i={}",
            seg, i
        );
    }
}
1134
/// Streaming a single source with offset 0 must reproduce the postings of the
/// directly serialized list and report consistent counts.
#[test]
fn test_positions_streaming_single_source() {
    // 300 docs on doc_ids 0, 4, 8, ... with two positions each
    let entries: Vec<(u32, Vec<u32>)> =
        (0..300).map(|i| (i * 4, vec![i * 2, i * 2 + 1])).collect();
    let ppl = build_ppl(&entries);
    let direct = serialize_ppl(&ppl);

    let sources: Vec<(&[u8], u32)> = vec![(&direct, 0)];
    let mut streamed = Vec::new();
    let (doc_count, bytes_written) =
        PositionPostingList::concatenate_streaming(&sources, &mut streamed).unwrap();

    // The returned summary must agree with the input size and the output
    // buffer, matching what the multi-source streaming tests assert.
    assert_eq!(doc_count, 300);
    assert_eq!(bytes_written, streamed.len());

    // Compare decoded postings rather than raw bytes
    let p1 = collect_positions(&PositionPostingList::deserialize(&direct).unwrap());
    let p2 = collect_positions(&PositionPostingList::deserialize(&streamed).unwrap());
    assert_eq!(p1, p2);
}
1150
/// Edge case: concatenating two lists that each hold exactly one document.
#[test]
fn test_positions_edge_single_doc() {
    let first = build_ppl(&[(0, vec![42, 43, 44])]);
    let second = build_ppl(&[(0, vec![100])]);

    // Offset 1 moves the second list's only doc (doc_id 0) to doc_id 1
    let merged = PositionPostingList::concatenate_blocks(&[(first, 0), (second, 1)]).unwrap();
    assert_eq!(merged.doc_count(), 2);

    let doc0_positions = merged.get_positions(0).unwrap();
    assert_eq!(doc0_positions, vec![42, 43, 44]);

    let doc1_positions = merged.get_positions(1).unwrap();
    assert_eq!(doc1_positions, vec![100]);
}
1162}