use byteorder::{LittleEndian, ReadBytesExt, WriteBytesExt};
use std::io::{self, Read, Write};

use crate::DocId;

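/// Maximum number of postings per block; each block gets one skip-list entry.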
pub const POSITION_BLOCK_SIZE: usize = 128;

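/// Largest token position that fits in the low 20 bits of an encoded position.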
pub const MAX_TOKEN_POSITION: u32 = (1 << 20) - 1;

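/// Largest element ordinal that fits in the high 12 bits of an encoded position.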
pub const MAX_ELEMENT_ORDINAL: u32 = (1 << 12) - 1;

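/// Packs an element ordinal (high 12 bits) and a token position (low 20 bits)
/// into a single `u32`. Both inputs are range-checked in debug builds only.
///
/// Illustrative round trip (marked `ignore` since the doctest path depends on
/// the enclosing crate):
///
/// ```ignore
/// let pos = encode_position(3, 100);
/// assert_eq!(decode_element_ordinal(pos), 3);
/// assert_eq!(decode_token_position(pos), 100);
/// ```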
#[inline]
pub fn encode_position(element_ordinal: u32, token_position: u32) -> u32 {
    debug_assert!(
        element_ordinal <= MAX_ELEMENT_ORDINAL,
        "Element ordinal {} exceeds maximum {}",
        element_ordinal,
        MAX_ELEMENT_ORDINAL
    );
    debug_assert!(
        token_position <= MAX_TOKEN_POSITION,
        "Token position {} exceeds maximum {}",
        token_position,
        MAX_TOKEN_POSITION
    );
    (element_ordinal << 20) | (token_position & MAX_TOKEN_POSITION)
}

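/// Extracts the element ordinal from the high 12 bits of an encoded position.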
#[inline]
pub fn decode_element_ordinal(position: u32) -> u32 {
    position >> 20
}

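/// Extracts the token position from the low 20 bits of an encoded position.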
#[inline]
pub fn decode_token_position(position: u32) -> u32 {
    position & MAX_TOKEN_POSITION
}

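/// A single document's posting: doc id, term frequency, and the encoded
/// positions at which the term occurs.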
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct PostingWithPositions {
    pub doc_id: DocId,
    /// Number of occurrences; equals `positions.len()`.
    pub term_freq: u32,
    /// Encoded positions (see `encode_position`).
    pub positions: Vec<u32>,
}

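/// A block-compressed posting list with per-document positions.
///
/// Postings are grouped into blocks of up to `POSITION_BLOCK_SIZE` documents.
/// Each block is encoded in `data` as a `u32` posting count and a `u32` first
/// doc id (both little-endian), followed by varint doc-id deltas, each posting
/// carrying a varint position count and its varint positions. The skip list
/// keeps one `(first_doc_id, last_doc_id, byte_offset)` entry per block so
/// lookups can binary-search over blocks.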
#[derive(Debug, Clone)]
pub struct PositionPostingList {
    /// One `(first_doc_id, last_doc_id, byte_offset)` entry per block.
    skip_list: Vec<(DocId, DocId, u32)>,
    /// Block-encoded posting data.
    data: Vec<u8>,
    /// Total number of documents across all blocks.
    doc_count: u32,
}

impl Default for PositionPostingList {
    fn default() -> Self {
        Self::new()
    }
}

impl PositionPostingList {
    pub fn new() -> Self {
        Self {
            skip_list: Vec::new(),
            data: Vec::new(),
            doc_count: 0,
        }
    }

    pub fn with_capacity(capacity: usize) -> Self {
        Self {
            skip_list: Vec::with_capacity(capacity / POSITION_BLOCK_SIZE + 1),
            data: Vec::with_capacity(capacity * 8),
            doc_count: 0,
        }
    }

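    /// Builds a list from postings sorted by ascending doc id; the delta
    /// encoding assumes this ordering.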
    pub fn from_postings(postings: &[PostingWithPositions]) -> io::Result<Self> {
        if postings.is_empty() {
            return Ok(Self::new());
        }

        let mut skip_list = Vec::new();
        let mut data = Vec::new();
        let mut i = 0;

        while i < postings.len() {
            let block_start = data.len() as u32;
            let block_end = (i + POSITION_BLOCK_SIZE).min(postings.len());
            let block = &postings[i..block_end];

            let base_doc_id = block.first().unwrap().doc_id;
            let last_doc_id = block.last().unwrap().doc_id;
            skip_list.push((base_doc_id, last_doc_id, block_start));

            // Block header: posting count and first (absolute) doc id.
            data.write_u32::<LittleEndian>(block.len() as u32)?;
            data.write_u32::<LittleEndian>(base_doc_id)?;

            let mut prev_doc_id = base_doc_id;
            for (j, posting) in block.iter().enumerate() {
                if j > 0 {
                    let delta = posting.doc_id - prev_doc_id;
                    write_vint(&mut data, delta as u64)?;
                }
                prev_doc_id = posting.doc_id;

                // Position count, then the raw (non-delta) positions.
                write_vint(&mut data, posting.positions.len() as u64)?;
                for &pos in &posting.positions {
                    write_vint(&mut data, pos as u64)?;
                }
            }

            i = block_end;
        }

        Ok(Self {
            skip_list,
            data,
            doc_count: postings.len() as u32,
        })
    }

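    /// Appends one posting; doc ids must arrive in ascending order. A new
    /// block is opened every `POSITION_BLOCK_SIZE` documents, otherwise the
    /// current block's header count is patched in place.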
    pub fn push(&mut self, doc_id: DocId, positions: Vec<u32>) {
        let posting = PostingWithPositions {
            doc_id,
            term_freq: positions.len() as u32,
            positions,
        };

        let block_start = self.data.len() as u32;

        // Start a new block on the first posting and every
        // POSITION_BLOCK_SIZE postings thereafter.
        let need_new_block =
            self.skip_list.is_empty() || self.doc_count.is_multiple_of(POSITION_BLOCK_SIZE as u32);

        if need_new_block {
            self.skip_list.push((doc_id, doc_id, block_start));
            self.data.write_u32::<LittleEndian>(1u32).unwrap();
            self.data.write_u32::<LittleEndian>(doc_id).unwrap();
        } else {
            let last_block = self.skip_list.last_mut().unwrap();
            let prev_doc_id = last_block.1;
            last_block.1 = doc_id;

            // Bump the posting count in the current block's header in place.
            let count_offset = last_block.2 as usize;
            let old_count = u32::from_le_bytes(
                self.data[count_offset..count_offset + 4]
                    .try_into()
                    .unwrap(),
            );
            self.data[count_offset..count_offset + 4]
                .copy_from_slice(&(old_count + 1).to_le_bytes());

            let delta = doc_id - prev_doc_id;
            write_vint(&mut self.data, delta as u64).unwrap();
        }

        write_vint(&mut self.data, posting.positions.len() as u64).unwrap();
        for &pos in &posting.positions {
            write_vint(&mut self.data, pos as u64).unwrap();
        }

        self.doc_count += 1;
    }

    pub fn doc_count(&self) -> u32 {
        self.doc_count
    }

    pub fn len(&self) -> usize {
        self.doc_count as usize
    }

    pub fn is_empty(&self) -> bool {
        self.doc_count == 0
    }

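    /// Returns the positions for `target_doc_id`, or `None` if absent:
    /// binary-search the skip list, then linearly scan one block.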
    pub fn get_positions(&self, target_doc_id: DocId) -> Option<Vec<u32>> {
        if self.skip_list.is_empty() {
            return None;
        }

        // Locate the block whose [base, last] range covers the target.
        let block_idx = match self.skip_list.binary_search_by(|&(base, last, _)| {
            if target_doc_id < base {
                std::cmp::Ordering::Greater
            } else if target_doc_id > last {
                std::cmp::Ordering::Less
            } else {
                std::cmp::Ordering::Equal
            }
        }) {
            Ok(idx) => idx,
            Err(_) => return None,
        };

        let offset = self.skip_list[block_idx].2 as usize;
        let mut reader = &self.data[offset..];

        let count = reader.read_u32::<LittleEndian>().ok()? as usize;
        let first_doc = reader.read_u32::<LittleEndian>().ok()?;
        let mut prev_doc_id = first_doc;

        for i in 0..count {
            let doc_id = if i == 0 {
                first_doc
            } else {
                let delta = read_vint(&mut reader).ok()? as u32;
                prev_doc_id + delta
            };
            prev_doc_id = doc_id;

            let num_positions = read_vint(&mut reader).ok()? as usize;

            if doc_id == target_doc_id {
                let mut positions = Vec::with_capacity(num_positions);
                for _ in 0..num_positions {
                    let pos = read_vint(&mut reader).ok()? as u32;
                    positions.push(pos);
                }
                return Some(positions);
            } else {
                // Not the target: skip over this posting's positions.
                for _ in 0..num_positions {
                    let _ = read_vint(&mut reader);
                }
            }
        }

        None
    }

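    /// Writes the list in the footer-terminated format read back by
    /// [`Self::deserialize`].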
    pub fn serialize<W: Write>(&self, writer: &mut W) -> io::Result<()> {
        // Layout: [block data][skip entries: 3 x u32 each]
        // [footer: data_len, skip_count, doc_count], all little-endian.
        writer.write_all(&self.data)?;

        for (base_doc_id, last_doc_id, offset) in &self.skip_list {
            writer.write_u32::<LittleEndian>(*base_doc_id)?;
            writer.write_u32::<LittleEndian>(*last_doc_id)?;
            writer.write_u32::<LittleEndian>(*offset)?;
        }

        writer.write_u32::<LittleEndian>(self.data.len() as u32)?;
        writer.write_u32::<LittleEndian>(self.skip_list.len() as u32)?;
        writer.write_u32::<LittleEndian>(self.doc_count)?;

        Ok(())
    }

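    /// Reconstructs a list from bytes produced by [`Self::serialize`],
    /// reading the trailing footer first to locate the skip entries.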
    pub fn deserialize(raw: &[u8]) -> io::Result<Self> {
        if raw.len() < 12 {
            return Err(io::Error::new(
                io::ErrorKind::InvalidData,
                "position data too short",
            ));
        }

        // The 12-byte footer sits at the very end of the buffer.
        let f = raw.len() - 12;
        let data_len = u32::from_le_bytes(raw[f..f + 4].try_into().unwrap()) as usize;
        let skip_count = u32::from_le_bytes(raw[f + 4..f + 8].try_into().unwrap()) as usize;
        let doc_count = u32::from_le_bytes(raw[f + 8..f + 12].try_into().unwrap());

        let mut skip_list = Vec::with_capacity(skip_count);
        let mut pos = data_len;
        for _ in 0..skip_count {
            let base = u32::from_le_bytes(raw[pos..pos + 4].try_into().unwrap());
            let last = u32::from_le_bytes(raw[pos + 4..pos + 8].try_into().unwrap());
            let offset = u32::from_le_bytes(raw[pos + 8..pos + 12].try_into().unwrap());
            skip_list.push((base, last, offset));
            pos += 12;
        }

        let data = raw[..data_len].to_vec();

        Ok(Self {
            skip_list,
            data,
            doc_count,
        })
    }

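    /// Concatenates whole lists, rebasing each source's doc ids by its
    /// offset. Blocks are copied wholesale: only the absolute first doc id
    /// in each block header and the skip entries need patching, because
    /// intra-block doc ids are delta-encoded.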
    pub fn concatenate_blocks(sources: &[(PositionPostingList, u32)]) -> io::Result<Self> {
        let mut skip_list = Vec::new();
        let mut data = Vec::new();
        let mut total_docs = 0u32;

        for (source, doc_offset) in sources {
            for block_idx in 0..source.skip_list.len() {
                let (base, last, src_offset) = source.skip_list[block_idx];
                let next_offset = if block_idx + 1 < source.skip_list.len() {
                    source.skip_list[block_idx + 1].2 as usize
                } else {
                    source.data.len()
                };

                let new_base = base + doc_offset;
                let new_last = last + doc_offset;
                let new_offset = data.len() as u32;

                let block_bytes = &source.data[src_offset as usize..next_offset];

                let count = u32::from_le_bytes(block_bytes[0..4].try_into().unwrap());
                let first_doc = u32::from_le_bytes(block_bytes[4..8].try_into().unwrap());

                // Rewrite the header with the rebased first doc id; the
                // delta-encoded remainder of the block is offset-invariant.
                data.write_u32::<LittleEndian>(count)?;
                data.write_u32::<LittleEndian>(first_doc + doc_offset)?;
                data.extend_from_slice(&block_bytes[8..]);

                skip_list.push((new_base, new_last, new_offset));
                total_docs += count;
            }
        }

        Ok(Self {
            skip_list,
            data,
            doc_count: total_docs,
        })
    }

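    /// Streaming variant of [`Self::concatenate_blocks`]: merges serialized
    /// lists directly into `writer` without building an intermediate
    /// `PositionPostingList`, returning `(doc_count, bytes_written)`.
    ///
    /// Illustrative sketch (`ignore`d doctest; `bytes_a` and `bytes_b` stand
    /// for buffers produced by `serialize`):
    ///
    /// ```ignore
    /// let sources: Vec<(&[u8], u32)> = vec![(&bytes_a, 0), (&bytes_b, 1000)];
    /// let mut out = Vec::new();
    /// let (docs, len) = PositionPostingList::concatenate_streaming(&sources, &mut out)?;
    /// assert_eq!(len, out.len());
    /// ```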
    pub fn concatenate_streaming<W: Write>(
        sources: &[(&[u8], u32)],
        writer: &mut W,
    ) -> io::Result<(u32, usize)> {
        struct RawSource<'a> {
            skip_list: Vec<(u32, u32, u32)>,
            data: &'a [u8],
            doc_count: u32,
            doc_offset: u32,
        }

        // Parse each source's footer and skip list without copying its data.
        let mut parsed: Vec<RawSource<'_>> = Vec::with_capacity(sources.len());
        for (raw, doc_offset) in sources {
            if raw.len() < 12 {
                continue;
            }
            let f = raw.len() - 12;
            let data_len = u32::from_le_bytes(raw[f..f + 4].try_into().unwrap()) as usize;
            let skip_count = u32::from_le_bytes(raw[f + 4..f + 8].try_into().unwrap()) as usize;
            let doc_count = u32::from_le_bytes(raw[f + 8..f + 12].try_into().unwrap());

            let mut skip_list = Vec::with_capacity(skip_count);
            let mut pos = data_len;
            for _ in 0..skip_count {
                let base = u32::from_le_bytes(raw[pos..pos + 4].try_into().unwrap());
                let last = u32::from_le_bytes(raw[pos + 4..pos + 8].try_into().unwrap());
                let offset = u32::from_le_bytes(raw[pos + 8..pos + 12].try_into().unwrap());
                skip_list.push((base, last, offset));
                pos += 12;
            }
            parsed.push(RawSource {
                skip_list,
                data: &raw[..data_len],
                doc_count,
                doc_offset: *doc_offset,
            });
        }

        let total_docs: u32 = parsed.iter().map(|s| s.doc_count).sum();

        let mut merged_skip: Vec<(u32, u32, u32)> = Vec::new();
        let mut data_written = 0u32;
        let mut patch_buf = [0u8; 8];

        for src in &parsed {
            for (i, &(base, last, offset)) in src.skip_list.iter().enumerate() {
                let start = offset as usize;
                let end = if i + 1 < src.skip_list.len() {
                    src.skip_list[i + 1].2 as usize
                } else {
                    src.data.len()
                };
                let block = &src.data[start..end];

                merged_skip.push((base + src.doc_offset, last + src.doc_offset, data_written));

                // Patch only the 8-byte header (count + rebased first doc);
                // stream the delta-encoded remainder through unchanged.
                patch_buf[0..4].copy_from_slice(&block[0..4]);
                let first_doc = u32::from_le_bytes(block[4..8].try_into().unwrap());
                patch_buf[4..8].copy_from_slice(&(first_doc + src.doc_offset).to_le_bytes());
                writer.write_all(&patch_buf)?;
                writer.write_all(&block[8..])?;

                data_written += block.len() as u32;
            }
        }

        for (base, last, offset) in &merged_skip {
            writer.write_u32::<LittleEndian>(*base)?;
            writer.write_u32::<LittleEndian>(*last)?;
            writer.write_u32::<LittleEndian>(*offset)?;
        }

        writer.write_u32::<LittleEndian>(data_written)?;
        writer.write_u32::<LittleEndian>(merged_skip.len() as u32)?;
        writer.write_u32::<LittleEndian>(total_docs)?;

        let total_bytes = data_written as usize + merged_skip.len() * 12 + 12;
        Ok((total_docs, total_bytes))
    }

    pub fn iter(&self) -> PositionPostingIterator<'_> {
        PositionPostingIterator::new(self)
    }
}

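/// Writes `value` as a varint: seven data bits per byte, high bit set on
/// all but the final byte.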
fn write_vint<W: Write>(writer: &mut W, mut value: u64) -> io::Result<()> {
    loop {
        let byte = (value & 0x7F) as u8;
        value >>= 7;
        if value == 0 {
            writer.write_u8(byte)?;
            break;
        } else {
            writer.write_u8(byte | 0x80)?;
        }
    }
    Ok(())
}

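/// Reads a varint previously written by `write_vint`.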
fn read_vint<R: Read>(reader: &mut R) -> io::Result<u64> {
    let mut result = 0u64;
    let mut shift = 0;
    loop {
        let byte = reader.read_u8()?;
        result |= ((byte & 0x7F) as u64) << shift;
        if byte & 0x80 == 0 {
            break;
        }
        shift += 7;
    }
    Ok(result)
}

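/// A cursor over a `PositionPostingList` that decodes one block at a time.
/// `doc()` reports `u32::MAX` once the cursor is exhausted.
///
/// Illustrative walk over a list (`ignore`d doctest; `list` stands for any
/// `PositionPostingList`):
///
/// ```ignore
/// let mut it = list.iter();
/// while it.doc() != u32::MAX {
///     println!("doc {} -> {:?}", it.doc(), it.positions());
///     it.advance();
/// }
/// ```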
pub struct PositionPostingIterator<'a> {
    list: &'a PositionPostingList,
    current_block: usize,
    position_in_block: usize,
    block_postings: Vec<PostingWithPositions>,
    exhausted: bool,
}

impl<'a> PositionPostingIterator<'a> {
    pub fn new(list: &'a PositionPostingList) -> Self {
        let exhausted = list.skip_list.is_empty();
        let mut iter = Self {
            list,
            current_block: 0,
            position_in_block: 0,
            block_postings: Vec::new(),
            exhausted,
        };
        if !iter.exhausted {
            iter.load_block(0);
        }
        iter
    }

    // Decode the block at `block_idx` into `block_postings`.
    fn load_block(&mut self, block_idx: usize) {
        if block_idx >= self.list.skip_list.len() {
            self.exhausted = true;
            return;
        }

        self.current_block = block_idx;
        self.position_in_block = 0;

        let offset = self.list.skip_list[block_idx].2 as usize;
        let mut reader = &self.list.data[offset..];

        let count = reader.read_u32::<LittleEndian>().unwrap_or(0) as usize;
        let first_doc = reader.read_u32::<LittleEndian>().unwrap_or(0);
        self.block_postings.clear();
        self.block_postings.reserve(count);

        let mut prev_doc_id = first_doc;

        for i in 0..count {
            let doc_id = if i == 0 {
                first_doc
            } else {
                let delta = read_vint(&mut reader).unwrap_or(0) as u32;
                prev_doc_id + delta
            };
            prev_doc_id = doc_id;

            let num_positions = read_vint(&mut reader).unwrap_or(0) as usize;
            let mut positions = Vec::with_capacity(num_positions);
            for _ in 0..num_positions {
                let pos = read_vint(&mut reader).unwrap_or(0) as u32;
                positions.push(pos);
            }

            self.block_postings.push(PostingWithPositions {
                doc_id,
                term_freq: num_positions as u32,
                positions,
            });
        }
    }

    pub fn doc(&self) -> DocId {
        if self.exhausted || self.position_in_block >= self.block_postings.len() {
            u32::MAX
        } else {
            self.block_postings[self.position_in_block].doc_id
        }
    }

    pub fn term_freq(&self) -> u32 {
        if self.exhausted || self.position_in_block >= self.block_postings.len() {
            0
        } else {
            self.block_postings[self.position_in_block].term_freq
        }
    }

    pub fn positions(&self) -> &[u32] {
        if self.exhausted || self.position_in_block >= self.block_postings.len() {
            &[]
        } else {
            &self.block_postings[self.position_in_block].positions
        }
    }

    pub fn advance(&mut self) {
        if self.exhausted {
            return;
        }

        self.position_in_block += 1;
        if self.position_in_block >= self.block_postings.len() {
            self.load_block(self.current_block + 1);
        }
    }

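    /// Positions the cursor at the first posting with doc id `>= target`,
    /// staying inside the current block when possible and otherwise
    /// binary-searching the skip list.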
    pub fn seek(&mut self, target: DocId) {
        if self.exhausted {
            return;
        }

        // Fast path: the target may still be inside the current block.
        if let Some((_, last, _)) = self.list.skip_list.get(self.current_block)
            && target <= *last
        {
            while self.position_in_block < self.block_postings.len()
                && self.block_postings[self.position_in_block].doc_id < target
            {
                self.position_in_block += 1;
            }
            if self.position_in_block >= self.block_postings.len() {
                self.load_block(self.current_block + 1);
                self.seek(target);
            }
            return;
        }

        // Otherwise binary-search for the block covering (or following) target.
        let block_idx = match self.list.skip_list.binary_search_by(|&(base, last, _)| {
            if target < base {
                std::cmp::Ordering::Greater
            } else if target > last {
                std::cmp::Ordering::Less
            } else {
                std::cmp::Ordering::Equal
            }
        }) {
            Ok(idx) => idx,
            Err(idx) => idx,
        };

        if block_idx >= self.list.skip_list.len() {
            self.exhausted = true;
            return;
        }

        self.load_block(block_idx);

        while self.position_in_block < self.block_postings.len()
            && self.block_postings[self.position_in_block].doc_id < target
        {
            self.position_in_block += 1;
        }

        if self.position_in_block >= self.block_postings.len() {
            self.load_block(self.current_block + 1);
        }
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_position_encoding() {
        let pos = encode_position(0, 5);
        assert_eq!(decode_element_ordinal(pos), 0);
        assert_eq!(decode_token_position(pos), 5);

        let pos = encode_position(3, 100);
        assert_eq!(decode_element_ordinal(pos), 3);
        assert_eq!(decode_token_position(pos), 100);

        let pos = encode_position(MAX_ELEMENT_ORDINAL, MAX_TOKEN_POSITION);
        assert_eq!(decode_element_ordinal(pos), MAX_ELEMENT_ORDINAL);
        assert_eq!(decode_token_position(pos), MAX_TOKEN_POSITION);
    }

    #[test]
    fn test_position_posting_list_build() {
        let postings = vec![
            PostingWithPositions {
                doc_id: 1,
                term_freq: 2,
                positions: vec![encode_position(0, 0), encode_position(0, 2)],
            },
            PostingWithPositions {
                doc_id: 3,
                term_freq: 1,
                positions: vec![encode_position(1, 0)],
            },
        ];

        let list = PositionPostingList::from_postings(&postings).unwrap();
        assert_eq!(list.doc_count(), 2);

        let pos = list.get_positions(1).unwrap();
        assert_eq!(pos.len(), 2);

        let pos = list.get_positions(3).unwrap();
        assert_eq!(pos.len(), 1);

        assert!(list.get_positions(2).is_none());
        assert!(list.get_positions(99).is_none());
    }

    #[test]
    fn test_serialization_roundtrip() {
        let postings = vec![
            PostingWithPositions {
                doc_id: 1,
                term_freq: 2,
                positions: vec![encode_position(0, 0), encode_position(0, 5)],
            },
            PostingWithPositions {
                doc_id: 3,
                term_freq: 1,
                positions: vec![encode_position(1, 0)],
            },
            PostingWithPositions {
                doc_id: 5,
                term_freq: 1,
                positions: vec![encode_position(0, 10)],
            },
        ];

        let list = PositionPostingList::from_postings(&postings).unwrap();

        let mut bytes = Vec::new();
        list.serialize(&mut bytes).unwrap();

        let deserialized = PositionPostingList::deserialize(&bytes).unwrap();

        assert_eq!(list.doc_count(), deserialized.doc_count());

        let pos = deserialized.get_positions(1).unwrap();
        assert_eq!(pos, vec![encode_position(0, 0), encode_position(0, 5)]);

        let pos = deserialized.get_positions(3).unwrap();
        assert_eq!(pos, vec![encode_position(1, 0)]);
    }

    #[test]
    fn test_binary_search_many_blocks() {
        // 300 docs -> 3 blocks (128 + 128 + 44).
        let mut postings = Vec::new();
        for i in 0..300 {
            postings.push(PostingWithPositions {
                doc_id: i * 2,
                term_freq: 1,
                positions: vec![encode_position(0, i)],
            });
        }

        let list = PositionPostingList::from_postings(&postings).unwrap();
        assert_eq!(list.doc_count(), 300);
        assert_eq!(list.skip_list.len(), 3);

        let pos = list.get_positions(0).unwrap();
        assert_eq!(pos, vec![encode_position(0, 0)]);

        let pos = list.get_positions(256).unwrap();
        assert_eq!(pos, vec![encode_position(0, 128)]);

        let pos = list.get_positions(598).unwrap();
        assert_eq!(pos, vec![encode_position(0, 299)]);

        assert!(list.get_positions(1).is_none());
        assert!(list.get_positions(257).is_none());
    }

    #[test]
    fn test_concatenate_blocks_merge() {
        let postings1 = vec![
            PostingWithPositions {
                doc_id: 0,
                term_freq: 1,
                positions: vec![0],
            },
            PostingWithPositions {
                doc_id: 1,
                term_freq: 1,
                positions: vec![5],
            },
            PostingWithPositions {
                doc_id: 2,
                term_freq: 1,
                positions: vec![10],
            },
        ];
        let list1 = PositionPostingList::from_postings(&postings1).unwrap();

        let postings2 = vec![
            PostingWithPositions {
                doc_id: 0,
                term_freq: 1,
                positions: vec![100],
            },
            PostingWithPositions {
                doc_id: 1,
                term_freq: 1,
                positions: vec![105],
            },
        ];
        let list2 = PositionPostingList::from_postings(&postings2).unwrap();

        // Rebase the second list's doc ids by 3, past list1's docs 0..=2.
        let combined =
            PositionPostingList::concatenate_blocks(&[(list1, 0), (list2, 3)]).unwrap();

        assert_eq!(combined.doc_count(), 5);

        assert!(combined.get_positions(0).is_some());
        assert!(combined.get_positions(1).is_some());
        assert!(combined.get_positions(2).is_some());
        assert!(combined.get_positions(3).is_some());
        assert!(combined.get_positions(4).is_some());
    }

    #[test]
    fn test_iterator() {
        let postings = vec![
            PostingWithPositions {
                doc_id: 1,
                term_freq: 2,
                positions: vec![0, 5],
            },
            PostingWithPositions {
                doc_id: 3,
                term_freq: 1,
                positions: vec![10],
            },
            PostingWithPositions {
                doc_id: 5,
                term_freq: 1,
                positions: vec![15],
            },
        ];

        let list = PositionPostingList::from_postings(&postings).unwrap();
        let mut iter = list.iter();

        assert_eq!(iter.doc(), 1);
        assert_eq!(iter.positions(), &[0, 5]);

        iter.advance();
        assert_eq!(iter.doc(), 3);

        iter.seek(5);
        assert_eq!(iter.doc(), 5);
        assert_eq!(iter.positions(), &[15]);

        // Advancing past the last posting exhausts the iterator.
        iter.advance();
        assert_eq!(iter.doc(), u32::MAX);
    }

    // Helper: build a PositionPostingList from (doc_id, positions) pairs.
    fn build_ppl(entries: &[(u32, Vec<u32>)]) -> PositionPostingList {
        let postings: Vec<PostingWithPositions> = entries
            .iter()
            .map(|(doc_id, positions)| PostingWithPositions {
                doc_id: *doc_id,
                term_freq: positions.len() as u32,
                positions: positions.clone(),
            })
            .collect();
        PositionPostingList::from_postings(&postings).unwrap()
    }

    // Helper: serialize a list to a byte buffer.
    fn serialize_ppl(ppl: &PositionPostingList) -> Vec<u8> {
        let mut buf = Vec::new();
        ppl.serialize(&mut buf).unwrap();
        buf
    }

    // Helper: drain an iterator into (doc_id, positions) pairs.
    fn collect_positions(ppl: &PositionPostingList) -> Vec<(u32, Vec<u32>)> {
        let mut result = Vec::new();
        let mut it = ppl.iter();
        while it.doc() != u32::MAX {
            result.push((it.doc(), it.positions().to_vec()));
            it.advance();
        }
        result
    }

    #[test]
    fn test_concatenate_streaming_matches_blocks() {
        let seg_a: Vec<(u32, Vec<u32>)> = (0..150)
            .map(|i| (i * 2, vec![i * 10, i * 10 + 3]))
            .collect();
        let seg_b: Vec<(u32, Vec<u32>)> = (0..100).map(|i| (i * 5, vec![i * 7])).collect();
        let seg_c: Vec<(u32, Vec<u32>)> = (0..80).map(|i| (i * 3, vec![i, i + 1, i + 2])).collect();

        let ppl_a = build_ppl(&seg_a);
        let ppl_b = build_ppl(&seg_b);
        let ppl_c = build_ppl(&seg_c);

        let offset_b = 500u32;
        let offset_c = 1000u32;

        // Reference: in-memory merge, then serialize.
        let ref_merged = PositionPostingList::concatenate_blocks(&[
            (ppl_a.clone(), 0),
            (ppl_b.clone(), offset_b),
            (ppl_c.clone(), offset_c),
        ])
        .unwrap();
        let mut ref_buf = Vec::new();
        ref_merged.serialize(&mut ref_buf).unwrap();

        // Streaming merge over the serialized segments.
        let bytes_a = serialize_ppl(&ppl_a);
        let bytes_b = serialize_ppl(&ppl_b);
        let bytes_c = serialize_ppl(&ppl_c);

        let sources: Vec<(&[u8], u32)> =
            vec![(&bytes_a, 0), (&bytes_b, offset_b), (&bytes_c, offset_c)];
        let mut stream_buf = Vec::new();
        let (doc_count, bytes_written) =
            PositionPostingList::concatenate_streaming(&sources, &mut stream_buf).unwrap();

        assert_eq!(doc_count, 330);
        assert_eq!(bytes_written, stream_buf.len());

        let ref_posts = collect_positions(&PositionPostingList::deserialize(&ref_buf).unwrap());
        let stream_posts =
            collect_positions(&PositionPostingList::deserialize(&stream_buf).unwrap());

        assert_eq!(ref_posts.len(), stream_posts.len());
        for (i, (r, s)) in ref_posts.iter().zip(stream_posts.iter()).enumerate() {
            assert_eq!(r.0, s.0, "doc_id mismatch at {}", i);
            assert_eq!(r.1, s.1, "positions mismatch at doc {}", r.0);
        }
    }

    #[test]
    fn test_positions_multi_round_merge() {
        let segments: Vec<Vec<(u32, Vec<u32>)>> = (0..4)
            .map(|seg| {
                (0..200)
                    .map(|i| {
                        let pos_count = (i % 3) + 1;
                        let positions: Vec<u32> = (0..pos_count)
                            .map(|p| (seg * 1000 + i * 10 + p) as u32)
                            .collect();
                        (i as u32 * 3, positions)
                    })
                    .collect()
            })
            .collect();

        let ppls: Vec<PositionPostingList> = segments.iter().map(|s| build_ppl(s)).collect();
        let serialized: Vec<Vec<u8>> = ppls.iter().map(serialize_ppl).collect();

        // Round 1: merge segments 0+1 and 2+3 pairwise.
        let mut merged_01 = Vec::new();
        let sources_01: Vec<(&[u8], u32)> = vec![(&serialized[0], 0), (&serialized[1], 600)];
        let (dc_01, _) =
            PositionPostingList::concatenate_streaming(&sources_01, &mut merged_01).unwrap();
        assert_eq!(dc_01, 400);

        let mut merged_23 = Vec::new();
        let sources_23: Vec<(&[u8], u32)> = vec![(&serialized[2], 0), (&serialized[3], 600)];
        let (dc_23, _) =
            PositionPostingList::concatenate_streaming(&sources_23, &mut merged_23).unwrap();
        assert_eq!(dc_23, 400);

        // Round 2: merge the two intermediate results.
        let mut final_merged = Vec::new();
        let sources_final: Vec<(&[u8], u32)> = vec![(&merged_01, 0), (&merged_23, 1200)];
        let (dc_final, _) =
            PositionPostingList::concatenate_streaming(&sources_final, &mut final_merged).unwrap();
        assert_eq!(dc_final, 800);

        let final_ppl = PositionPostingList::deserialize(&final_merged).unwrap();
        let all = collect_positions(&final_ppl);
        assert_eq!(all.len(), 800);

        assert_eq!(all[0].0, 0);
        assert_eq!(all[0].1, vec![0]);
        assert_eq!(all[200].0, 600);
        assert_eq!(all[200].1, vec![1000]);
        assert_eq!(all[400].0, 1200);
        assert_eq!(all[400].1, vec![2000]);

        let mut it = final_ppl.iter();
        it.seek(1200);
        assert_eq!(it.doc(), 1200);
        assert_eq!(it.positions(), &[2000]);

        let pos = final_ppl.get_positions(600).unwrap();
        assert_eq!(pos, vec![1000]);
    }

    #[test]
    fn test_positions_large_scale_merge() {
        let num_segments = 5usize;
        let docs_per_segment = 500usize;

        let segments: Vec<Vec<(u32, Vec<u32>)>> = (0..num_segments)
            .map(|seg| {
                (0..docs_per_segment)
                    .map(|i| {
                        let n_pos = (i % 4) + 1;
                        let positions: Vec<u32> =
                            (0..n_pos).map(|p| (p * 5 + seg) as u32).collect();
                        (i as u32 * 2, positions)
                    })
                    .collect()
            })
            .collect();

        let ppls: Vec<PositionPostingList> = segments.iter().map(|s| build_ppl(s)).collect();
        let serialized: Vec<Vec<u8>> = ppls.iter().map(serialize_ppl).collect();

        let max_doc = (docs_per_segment as u32 - 1) * 2;
        let offsets: Vec<u32> = (0..num_segments)
            .map(|i| i as u32 * (max_doc + 1))
            .collect();

        let sources: Vec<(&[u8], u32)> = serialized
            .iter()
            .zip(offsets.iter())
            .map(|(b, o)| (b.as_slice(), *o))
            .collect();

        let mut merged = Vec::new();
        let (doc_count, _) =
            PositionPostingList::concatenate_streaming(&sources, &mut merged).unwrap();
        assert_eq!(doc_count, (num_segments * docs_per_segment) as u32);

        let merged_ppl = PositionPostingList::deserialize(&merged).unwrap();
        let all = collect_positions(&merged_ppl);
        assert_eq!(all.len(), num_segments * docs_per_segment);

        // Every doc id and position list must survive the merge intact.
        for (seg, &offset) in offsets.iter().enumerate() {
            for i in 0..docs_per_segment {
                let idx = seg * docs_per_segment + i;
                let expected_doc = i as u32 * 2 + offset;
                assert_eq!(all[idx].0, expected_doc, "seg={} i={}", seg, i);

                let n_pos = (i % 4) + 1;
                let expected_positions: Vec<u32> =
                    (0..n_pos).map(|p| (p * 5 + seg) as u32).collect();
                assert_eq!(
                    all[idx].1, expected_positions,
                    "positions mismatch seg={} i={}",
                    seg, i
                );
            }
        }
    }

    #[test]
    fn test_positions_streaming_single_source() {
        let entries: Vec<(u32, Vec<u32>)> =
            (0..300).map(|i| (i * 4, vec![i * 2, i * 2 + 1])).collect();
        let ppl = build_ppl(&entries);
        let direct = serialize_ppl(&ppl);

        // With a single zero-offset source, streaming is a pure passthrough.
        let sources: Vec<(&[u8], u32)> = vec![(&direct, 0)];
        let mut streamed = Vec::new();
        PositionPostingList::concatenate_streaming(&sources, &mut streamed).unwrap();

        let p1 = collect_positions(&PositionPostingList::deserialize(&direct).unwrap());
        let p2 = collect_positions(&PositionPostingList::deserialize(&streamed).unwrap());
        assert_eq!(p1, p2);
    }

    #[test]
    fn test_positions_edge_single_doc() {
        let ppl_a = build_ppl(&[(0, vec![42, 43, 44])]);
        let ppl_b = build_ppl(&[(0, vec![100])]);

        let merged = PositionPostingList::concatenate_blocks(&[(ppl_a, 0), (ppl_b, 1)]).unwrap();

        assert_eq!(merged.doc_count(), 2);
        assert_eq!(merged.get_positions(0).unwrap(), vec![42, 43, 44]);
        assert_eq!(merged.get_positions(1).unwrap(), vec![100]);
    }
}
1133}