use byteorder::{LittleEndian, ReadBytesExt, WriteBytesExt};
use std::io::{self, Write};

use super::posting_common::{read_vint, write_vint};
use crate::DocId;

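/// Number of postings stored per block; each block gets one skip-list entry.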
pub const POSITION_BLOCK_SIZE: usize = 128;

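/// Largest token position that fits in the low 20 bits of a packed position.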
pub const MAX_TOKEN_POSITION: u32 = (1 << 20) - 1;

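/// Largest element ordinal that fits in the high 12 bits of a packed position.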
pub const MAX_ELEMENT_ORDINAL: u32 = (1 << 12) - 1;

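/// Packs an element ordinal and a token position into a single `u32`: the
/// ordinal occupies bits 20..32 and the token position bits 0..20, so
/// `encode_position(3, 100)` yields `(3 << 20) | 100`.
///
/// Out-of-range inputs trip the `debug_assert!`s in debug builds; in release
/// builds the token position is masked to 20 bits while the ordinal is
/// shifted as-is.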
#[inline]
pub fn encode_position(element_ordinal: u32, token_position: u32) -> u32 {
    debug_assert!(
        element_ordinal <= MAX_ELEMENT_ORDINAL,
        "Element ordinal {} exceeds maximum {}",
        element_ordinal,
        MAX_ELEMENT_ORDINAL
    );
    debug_assert!(
        token_position <= MAX_TOKEN_POSITION,
        "Token position {} exceeds maximum {}",
        token_position,
        MAX_TOKEN_POSITION
    );
    (element_ordinal << 20) | (token_position & MAX_TOKEN_POSITION)
}

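/// Extracts the element ordinal (bits 20..32) from a packed position.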
#[inline]
pub fn decode_element_ordinal(position: u32) -> u32 {
    position >> 20
}

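/// Extracts the token position (bits 0..20) from a packed position.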
#[inline]
pub fn decode_token_position(position: u32) -> u32 {
    position & MAX_TOKEN_POSITION
}

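/// One decoded posting: a document id, its term frequency, and the packed
/// positions at which the term occurs in that document.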
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct PostingWithPositions {
    pub doc_id: DocId,
    pub term_freq: u32,
    pub positions: Vec<u32>,
}

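/// A block-compressed posting list with per-document positions.
///
/// Postings are grouped into blocks of up to [`POSITION_BLOCK_SIZE`]
/// documents. Each block is laid out in `data` as a `u32` doc count and a
/// `u32` base doc id, followed by vint-encoded doc-id deltas and position
/// lists. `skip_list` keeps one `(base_doc_id, last_doc_id, byte_offset)`
/// entry per block, so point lookups binary-search the skip list and decode
/// a single block.
///
/// A minimal usage sketch (illustrative only, not run as a doctest):
///
/// ```ignore
/// let mut list = PositionPostingList::new();
/// list.push(1, vec![encode_position(0, 0), encode_position(0, 2)]);
/// list.push(3, vec![encode_position(1, 0)]);
/// assert_eq!(list.get_positions(3), Some(vec![encode_position(1, 0)]));
/// ```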
#[derive(Debug, Clone)]
pub struct PositionPostingList {
    skip_list: Vec<(DocId, DocId, u32)>,
    data: Vec<u8>,
    doc_count: u32,
}

impl Default for PositionPostingList {
    fn default() -> Self {
        Self::new()
    }
}

impl PositionPostingList {
    pub fn new() -> Self {
        Self {
            skip_list: Vec::new(),
            data: Vec::new(),
            doc_count: 0,
        }
    }

    pub fn with_capacity(capacity: usize) -> Self {
        Self {
            skip_list: Vec::with_capacity(capacity / POSITION_BLOCK_SIZE + 1),
            data: Vec::with_capacity(capacity * 8),
            doc_count: 0,
        }
    }

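    /// Builds a list from postings that are assumed to be sorted by
    /// ascending `doc_id` (doc-id deltas are encoded unsigned), emitting one
    /// block per [`POSITION_BLOCK_SIZE`] documents.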
    pub fn from_postings(postings: &[PostingWithPositions]) -> io::Result<Self> {
        if postings.is_empty() {
            return Ok(Self::new());
        }

        let mut skip_list = Vec::new();
        let mut data = Vec::new();
        let mut i = 0;

        while i < postings.len() {
            let block_start = data.len() as u32;
            let block_end = (i + POSITION_BLOCK_SIZE).min(postings.len());
            let block = &postings[i..block_end];

            let base_doc_id = block.first().unwrap().doc_id;
            let last_doc_id = block.last().unwrap().doc_id;
            skip_list.push((base_doc_id, last_doc_id, block_start));

            data.write_u32::<LittleEndian>(block.len() as u32)?;
            data.write_u32::<LittleEndian>(base_doc_id)?;

            let mut prev_doc_id = base_doc_id;
            for (j, posting) in block.iter().enumerate() {
                if j > 0 {
                    let delta = posting.doc_id - prev_doc_id;
                    write_vint(&mut data, delta as u64)?;
                }
                prev_doc_id = posting.doc_id;

                write_vint(&mut data, posting.positions.len() as u64)?;
                for &pos in &posting.positions {
                    write_vint(&mut data, pos as u64)?;
                }
            }

            i = block_end;
        }

        Ok(Self {
            skip_list,
            data,
            doc_count: postings.len() as u32,
        })
    }

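    /// Appends one document's positions; callers are expected to push doc
    /// ids in ascending order. Every [`POSITION_BLOCK_SIZE`] documents a new
    /// block is started; otherwise the current block's fixed-width doc-count
    /// header is patched in place and a doc-id delta is appended.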
    pub fn push(&mut self, doc_id: DocId, positions: Vec<u32>) {
        let posting = PostingWithPositions {
            doc_id,
            term_freq: positions.len() as u32,
            positions,
        };

        let block_start = self.data.len() as u32;

        let need_new_block =
            self.skip_list.is_empty() || self.doc_count.is_multiple_of(POSITION_BLOCK_SIZE as u32);

        if need_new_block {
            self.skip_list.push((doc_id, doc_id, block_start));
            self.data.write_u32::<LittleEndian>(1u32).unwrap();
            self.data.write_u32::<LittleEndian>(doc_id).unwrap();
        } else {
            let last_block = self.skip_list.last_mut().unwrap();
            let prev_doc_id = last_block.1;
            last_block.1 = doc_id;

            let count_offset = last_block.2 as usize;
            let old_count = u32::from_le_bytes(
                self.data[count_offset..count_offset + 4]
                    .try_into()
                    .unwrap(),
            );
            self.data[count_offset..count_offset + 4]
                .copy_from_slice(&(old_count + 1).to_le_bytes());

            let delta = doc_id - prev_doc_id;
            write_vint(&mut self.data, delta as u64).unwrap();
        }

        write_vint(&mut self.data, posting.positions.len() as u64).unwrap();
        for &pos in &posting.positions {
            write_vint(&mut self.data, pos as u64).unwrap();
        }

        self.doc_count += 1;
    }

    pub fn doc_count(&self) -> u32 {
        self.doc_count
    }

    pub fn len(&self) -> usize {
        self.doc_count as usize
    }

    pub fn is_empty(&self) -> bool {
        self.doc_count == 0
    }

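    /// Returns the positions recorded for `target_doc_id`, or `None` if the
    /// document is absent: binary-search the skip list for the candidate
    /// block, then scan that block linearly.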
    pub fn get_positions(&self, target_doc_id: DocId) -> Option<Vec<u32>> {
        if self.skip_list.is_empty() {
            return None;
        }

        let block_idx = match self.skip_list.binary_search_by(|&(base, last, _)| {
            if target_doc_id < base {
                std::cmp::Ordering::Greater
            } else if target_doc_id > last {
                std::cmp::Ordering::Less
            } else {
                std::cmp::Ordering::Equal
            }
        }) {
            Ok(idx) => idx,
            Err(_) => return None,
        };

        let offset = self.skip_list[block_idx].2 as usize;
        let mut reader = &self.data[offset..];

        let count = reader.read_u32::<LittleEndian>().ok()? as usize;
        let first_doc = reader.read_u32::<LittleEndian>().ok()?;
        let mut prev_doc_id = first_doc;

        for i in 0..count {
            let doc_id = if i == 0 {
                first_doc
            } else {
                let delta = read_vint(&mut reader).ok()? as u32;
                prev_doc_id + delta
            };
            prev_doc_id = doc_id;

            let num_positions = read_vint(&mut reader).ok()? as usize;

            if doc_id == target_doc_id {
                let mut positions = Vec::with_capacity(num_positions);
                for _ in 0..num_positions {
                    let pos = read_vint(&mut reader).ok()? as u32;
                    positions.push(pos);
                }
                return Some(positions);
            } else {
                for _ in 0..num_positions {
                    let _ = read_vint(&mut reader);
                }
            }
        }

        None
    }

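    /// Writes the list as `[block data][skip entries][footer]`. Each skip
    /// entry is three little-endian `u32`s (`base_doc_id`, `last_doc_id`,
    /// `offset`); the 12-byte footer stores the data length, skip-entry
    /// count, and doc count so a reader can locate every section from the
    /// end of the buffer.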
    pub fn serialize<W: Write>(&self, writer: &mut W) -> io::Result<()> {
        writer.write_all(&self.data)?;

        for (base_doc_id, last_doc_id, offset) in &self.skip_list {
            writer.write_u32::<LittleEndian>(*base_doc_id)?;
            writer.write_u32::<LittleEndian>(*last_doc_id)?;
            writer.write_u32::<LittleEndian>(*offset)?;
        }

        writer.write_u32::<LittleEndian>(self.data.len() as u32)?;
        writer.write_u32::<LittleEndian>(self.skip_list.len() as u32)?;
        writer.write_u32::<LittleEndian>(self.doc_count)?;

        Ok(())
    }

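    /// Rebuilds a list from bytes produced by [`Self::serialize`], reading
    /// the 12-byte footer first to locate the block data and skip entries.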
    pub fn deserialize(raw: &[u8]) -> io::Result<Self> {
        if raw.len() < 12 {
            return Err(io::Error::new(
                io::ErrorKind::InvalidData,
                "position data too short",
            ));
        }

        let f = raw.len() - 12;
        let data_len = u32::from_le_bytes(raw[f..f + 4].try_into().unwrap()) as usize;
        let skip_count = u32::from_le_bytes(raw[f + 4..f + 8].try_into().unwrap()) as usize;
        let doc_count = u32::from_le_bytes(raw[f + 8..f + 12].try_into().unwrap());

        let mut skip_list = Vec::with_capacity(skip_count);
        let mut pos = data_len;
        for _ in 0..skip_count {
            let base = u32::from_le_bytes(raw[pos..pos + 4].try_into().unwrap());
            let last = u32::from_le_bytes(raw[pos + 4..pos + 8].try_into().unwrap());
            let offset = u32::from_le_bytes(raw[pos + 8..pos + 12].try_into().unwrap());
            skip_list.push((base, last, offset));
            pos += 12;
        }

        let data = raw[..data_len].to_vec();

        Ok(Self {
            skip_list,
            data,
            doc_count,
        })
    }

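    /// Merges decoded lists by rebasing each source's doc ids by its
    /// `doc_offset` (sources are assumed not to overlap after rebasing).
    /// Only the fixed-width block header (count, base doc id) is rewritten;
    /// intra-block deltas are unchanged by a uniform shift, so position data
    /// is copied verbatim.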
    pub fn concatenate_blocks(sources: &[(PositionPostingList, u32)]) -> io::Result<Self> {
        let mut skip_list = Vec::new();
        let mut data = Vec::new();
        let mut total_docs = 0u32;

        for (source, doc_offset) in sources {
            for block_idx in 0..source.skip_list.len() {
                let (base, last, src_offset) = source.skip_list[block_idx];
                let next_offset = if block_idx + 1 < source.skip_list.len() {
                    source.skip_list[block_idx + 1].2 as usize
                } else {
                    source.data.len()
                };

                let new_base = base + doc_offset;
                let new_last = last + doc_offset;
                let new_offset = data.len() as u32;

                let block_bytes = &source.data[src_offset as usize..next_offset];

                let count = u32::from_le_bytes(block_bytes[0..4].try_into().unwrap());
                let first_doc = u32::from_le_bytes(block_bytes[4..8].try_into().unwrap());

                data.write_u32::<LittleEndian>(count)?;
                data.write_u32::<LittleEndian>(first_doc + doc_offset)?;
                data.extend_from_slice(&block_bytes[8..]);

                skip_list.push((new_base, new_last, new_offset));
                total_docs += count;
            }
        }

        Ok(Self {
            skip_list,
            data,
            doc_count: total_docs,
        })
    }

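    /// Streaming variant of [`Self::concatenate_blocks`] that operates
    /// directly on serialized buffers: parses each source's footer and skip
    /// list, streams its blocks to `writer` with rebased headers, then
    /// appends the merged skip list and footer. Sources shorter than the
    /// 12-byte footer are skipped. Returns `(merged doc count, total bytes
    /// written)`.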
    pub fn concatenate_streaming<W: Write>(
        sources: &[(&[u8], u32)],
        writer: &mut W,
    ) -> io::Result<(u32, usize)> {
        struct RawSource<'a> {
            skip_list: Vec<(u32, u32, u32)>,
            data: &'a [u8],
            doc_count: u32,
            doc_offset: u32,
        }

        let mut parsed: Vec<RawSource<'_>> = Vec::with_capacity(sources.len());
        for (raw, doc_offset) in sources {
            if raw.len() < 12 {
                continue;
            }
            let f = raw.len() - 12;
            let data_len = u32::from_le_bytes(raw[f..f + 4].try_into().unwrap()) as usize;
            let skip_count = u32::from_le_bytes(raw[f + 4..f + 8].try_into().unwrap()) as usize;
            let doc_count = u32::from_le_bytes(raw[f + 8..f + 12].try_into().unwrap());

            let mut skip_list = Vec::with_capacity(skip_count);
            let mut pos = data_len;
            for _ in 0..skip_count {
                let base = u32::from_le_bytes(raw[pos..pos + 4].try_into().unwrap());
                let last = u32::from_le_bytes(raw[pos + 4..pos + 8].try_into().unwrap());
                let offset = u32::from_le_bytes(raw[pos + 8..pos + 12].try_into().unwrap());
                skip_list.push((base, last, offset));
                pos += 12;
            }
            parsed.push(RawSource {
                skip_list,
                data: &raw[..data_len],
                doc_count,
                doc_offset: *doc_offset,
            });
        }

        let total_docs: u32 = parsed.iter().map(|s| s.doc_count).sum();

        let mut merged_skip: Vec<(u32, u32, u32)> = Vec::new();
        let mut data_written = 0u32;
        let mut patch_buf = [0u8; 8];

        for src in &parsed {
            for (i, &(base, last, offset)) in src.skip_list.iter().enumerate() {
                let start = offset as usize;
                let end = if i + 1 < src.skip_list.len() {
                    src.skip_list[i + 1].2 as usize
                } else {
                    src.data.len()
                };
                let block = &src.data[start..end];

                merged_skip.push((base + src.doc_offset, last + src.doc_offset, data_written));

                patch_buf[0..4].copy_from_slice(&block[0..4]);
                let first_doc = u32::from_le_bytes(block[4..8].try_into().unwrap());
                patch_buf[4..8].copy_from_slice(&(first_doc + src.doc_offset).to_le_bytes());
                writer.write_all(&patch_buf)?;
                writer.write_all(&block[8..])?;

                data_written += block.len() as u32;
            }
        }

        for (base, last, offset) in &merged_skip {
            writer.write_u32::<LittleEndian>(*base)?;
            writer.write_u32::<LittleEndian>(*last)?;
            writer.write_u32::<LittleEndian>(*offset)?;
        }

        writer.write_u32::<LittleEndian>(data_written)?;
        writer.write_u32::<LittleEndian>(merged_skip.len() as u32)?;
        writer.write_u32::<LittleEndian>(total_docs)?;

        let total_bytes = data_written as usize + merged_skip.len() * 12 + 12;
        Ok((total_docs, total_bytes))
    }

    pub fn iter(&self) -> PositionPostingIterator<'_> {
        PositionPostingIterator::new(self)
    }
}

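/// A cursor over a [`PositionPostingList`] that decodes one block at a
/// time; [`PositionPostingIterator::doc`] returns `u32::MAX` once the
/// cursor is exhausted.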
pub struct PositionPostingIterator<'a> {
    list: &'a PositionPostingList,
    current_block: usize,
    position_in_block: usize,
    block_postings: Vec<PostingWithPositions>,
    exhausted: bool,
}

impl<'a> PositionPostingIterator<'a> {
    pub fn new(list: &'a PositionPostingList) -> Self {
        let exhausted = list.skip_list.is_empty();
        let mut iter = Self {
            list,
            current_block: 0,
            position_in_block: 0,
            block_postings: Vec::new(),
            exhausted,
        };
        if !iter.exhausted {
            iter.load_block(0);
        }
        iter
    }

    fn load_block(&mut self, block_idx: usize) {
        if block_idx >= self.list.skip_list.len() {
            self.exhausted = true;
            return;
        }

        self.current_block = block_idx;
        self.position_in_block = 0;

        let offset = self.list.skip_list[block_idx].2 as usize;
        let mut reader = &self.list.data[offset..];

        let count = reader.read_u32::<LittleEndian>().unwrap_or(0) as usize;
        let first_doc = reader.read_u32::<LittleEndian>().unwrap_or(0);
        self.block_postings.clear();
        self.block_postings.reserve(count);

        let mut prev_doc_id = first_doc;

        for i in 0..count {
            let doc_id = if i == 0 {
                first_doc
            } else {
                let delta = read_vint(&mut reader).unwrap_or(0) as u32;
                prev_doc_id + delta
            };
            prev_doc_id = doc_id;

            let num_positions = read_vint(&mut reader).unwrap_or(0) as usize;
            let mut positions = Vec::with_capacity(num_positions);
            for _ in 0..num_positions {
                let pos = read_vint(&mut reader).unwrap_or(0) as u32;
                positions.push(pos);
            }

            self.block_postings.push(PostingWithPositions {
                doc_id,
                term_freq: num_positions as u32,
                positions,
            });
        }
    }

    pub fn doc(&self) -> DocId {
        if self.exhausted || self.position_in_block >= self.block_postings.len() {
            u32::MAX
        } else {
            self.block_postings[self.position_in_block].doc_id
        }
    }

    pub fn term_freq(&self) -> u32 {
        if self.exhausted || self.position_in_block >= self.block_postings.len() {
            0
        } else {
            self.block_postings[self.position_in_block].term_freq
        }
    }

    pub fn positions(&self) -> &[u32] {
        if self.exhausted || self.position_in_block >= self.block_postings.len() {
            &[]
        } else {
            &self.block_postings[self.position_in_block].positions
        }
    }

    pub fn advance(&mut self) {
        if self.exhausted {
            return;
        }

        self.position_in_block += 1;
        if self.position_in_block >= self.block_postings.len() {
            self.load_block(self.current_block + 1);
        }
    }

    pub fn seek(&mut self, target: DocId) {
        if self.exhausted {
            return;
        }

        // Fast path: the target falls within the currently loaded block.
        if let Some((_, last, _)) = self.list.skip_list.get(self.current_block)
            && target <= *last
        {
            while self.position_in_block < self.block_postings.len()
                && self.block_postings[self.position_in_block].doc_id < target
            {
                self.position_in_block += 1;
            }
            if self.position_in_block >= self.block_postings.len() {
                self.load_block(self.current_block + 1);
                self.seek(target);
            }
            return;
        }

        // Otherwise binary-search the skip list for the first candidate block.
        let block_idx = match self.list.skip_list.binary_search_by(|&(base, last, _)| {
            if target < base {
                std::cmp::Ordering::Greater
            } else if target > last {
                std::cmp::Ordering::Less
            } else {
                std::cmp::Ordering::Equal
            }
        }) {
            Ok(idx) => idx,
            Err(idx) => idx,
        };

        if block_idx >= self.list.skip_list.len() {
            self.exhausted = true;
            return;
        }

        self.load_block(block_idx);

        while self.position_in_block < self.block_postings.len()
            && self.block_postings[self.position_in_block].doc_id < target
        {
            self.position_in_block += 1;
        }

        if self.position_in_block >= self.block_postings.len() {
            self.load_block(self.current_block + 1);
        }
    }
}

649
650#[cfg(test)]
651mod tests {
652 use super::*;
653
654 #[test]
655 fn test_position_encoding() {
656 let pos = encode_position(0, 5);
658 assert_eq!(decode_element_ordinal(pos), 0);
659 assert_eq!(decode_token_position(pos), 5);
660
661 let pos = encode_position(3, 100);
663 assert_eq!(decode_element_ordinal(pos), 3);
664 assert_eq!(decode_token_position(pos), 100);
665
666 let pos = encode_position(MAX_ELEMENT_ORDINAL, MAX_TOKEN_POSITION);
668 assert_eq!(decode_element_ordinal(pos), MAX_ELEMENT_ORDINAL);
669 assert_eq!(decode_token_position(pos), MAX_TOKEN_POSITION);
670 }
671
    #[test]
    fn test_position_posting_list_build() {
        let postings = vec![
            PostingWithPositions {
                doc_id: 1,
                term_freq: 2,
                positions: vec![encode_position(0, 0), encode_position(0, 2)],
            },
            PostingWithPositions {
                doc_id: 3,
                term_freq: 1,
                positions: vec![encode_position(1, 0)],
            },
        ];

        let list = PositionPostingList::from_postings(&postings).unwrap();
        assert_eq!(list.doc_count(), 2);

        let pos = list.get_positions(1).unwrap();
        assert_eq!(pos.len(), 2);

        let pos = list.get_positions(3).unwrap();
        assert_eq!(pos.len(), 1);

        assert!(list.get_positions(2).is_none());
        assert!(list.get_positions(99).is_none());
    }

    #[test]
    fn test_serialization_roundtrip() {
        let postings = vec![
            PostingWithPositions {
                doc_id: 1,
                term_freq: 2,
                positions: vec![encode_position(0, 0), encode_position(0, 5)],
            },
            PostingWithPositions {
                doc_id: 3,
                term_freq: 1,
                positions: vec![encode_position(1, 0)],
            },
            PostingWithPositions {
                doc_id: 5,
                term_freq: 1,
                positions: vec![encode_position(0, 10)],
            },
        ];

        let list = PositionPostingList::from_postings(&postings).unwrap();

        let mut bytes = Vec::new();
        list.serialize(&mut bytes).unwrap();

        let deserialized = PositionPostingList::deserialize(&bytes).unwrap();

        assert_eq!(list.doc_count(), deserialized.doc_count());

        let pos = deserialized.get_positions(1).unwrap();
        assert_eq!(pos, vec![encode_position(0, 0), encode_position(0, 5)]);

        let pos = deserialized.get_positions(3).unwrap();
        assert_eq!(pos, vec![encode_position(1, 0)]);
    }

    #[test]
    fn test_binary_search_many_blocks() {
        let mut postings = Vec::new();
        for i in 0..300 {
            postings.push(PostingWithPositions {
                doc_id: i * 2,
                term_freq: 1,
                positions: vec![encode_position(0, i)],
            });
        }

        let list = PositionPostingList::from_postings(&postings).unwrap();
        assert_eq!(list.doc_count(), 300);

        // 300 docs at 128 per block -> 3 blocks.
        assert_eq!(list.skip_list.len(), 3);

        let pos = list.get_positions(0).unwrap();
        assert_eq!(pos, vec![encode_position(0, 0)]);

        let pos = list.get_positions(256).unwrap();
        assert_eq!(pos, vec![encode_position(0, 128)]);

        let pos = list.get_positions(598).unwrap();
        assert_eq!(pos, vec![encode_position(0, 299)]);

        assert!(list.get_positions(1).is_none());
        assert!(list.get_positions(257).is_none());
    }

    #[test]
    fn test_concatenate_blocks_merge() {
        let postings1 = vec![
            PostingWithPositions {
                doc_id: 0,
                term_freq: 1,
                positions: vec![0],
            },
            PostingWithPositions {
                doc_id: 1,
                term_freq: 1,
                positions: vec![5],
            },
            PostingWithPositions {
                doc_id: 2,
                term_freq: 1,
                positions: vec![10],
            },
        ];
        let list1 = PositionPostingList::from_postings(&postings1).unwrap();

        let postings2 = vec![
            PostingWithPositions {
                doc_id: 0,
                term_freq: 1,
                positions: vec![100],
            },
            PostingWithPositions {
                doc_id: 1,
                term_freq: 1,
                positions: vec![105],
            },
        ];
        let list2 = PositionPostingList::from_postings(&postings2).unwrap();

        // list2's doc ids are rebased by 3 so they follow list1's.
        let combined =
            PositionPostingList::concatenate_blocks(&[(list1, 0), (list2, 3)]).unwrap();

        assert_eq!(combined.doc_count(), 5);

        assert!(combined.get_positions(0).is_some());
        assert!(combined.get_positions(1).is_some());
        assert!(combined.get_positions(2).is_some());
        assert!(combined.get_positions(3).is_some());
        assert!(combined.get_positions(4).is_some());
    }

    #[test]
    fn test_iterator() {
        let postings = vec![
            PostingWithPositions {
                doc_id: 1,
                term_freq: 2,
                positions: vec![0, 5],
            },
            PostingWithPositions {
                doc_id: 3,
                term_freq: 1,
                positions: vec![10],
            },
            PostingWithPositions {
                doc_id: 5,
                term_freq: 1,
                positions: vec![15],
            },
        ];

        let list = PositionPostingList::from_postings(&postings).unwrap();
        let mut iter = list.iter();

        assert_eq!(iter.doc(), 1);
        assert_eq!(iter.positions(), &[0, 5]);

        iter.advance();
        assert_eq!(iter.doc(), 3);

        iter.seek(5);
        assert_eq!(iter.doc(), 5);
        assert_eq!(iter.positions(), &[15]);

        iter.advance();
        assert_eq!(iter.doc(), u32::MAX);
    }

    /// Builds a `PositionPostingList` from `(doc_id, positions)` pairs.
    fn build_ppl(entries: &[(u32, Vec<u32>)]) -> PositionPostingList {
        let postings: Vec<PostingWithPositions> = entries
            .iter()
            .map(|(doc_id, positions)| PostingWithPositions {
                doc_id: *doc_id,
                term_freq: positions.len() as u32,
                positions: positions.clone(),
            })
            .collect();
        PositionPostingList::from_postings(&postings).unwrap()
    }

    /// Serializes a list into a fresh byte buffer.
    fn serialize_ppl(ppl: &PositionPostingList) -> Vec<u8> {
        let mut buf = Vec::new();
        ppl.serialize(&mut buf).unwrap();
        buf
    }

    /// Drains a list through its iterator into `(doc_id, positions)` pairs.
    fn collect_positions(ppl: &PositionPostingList) -> Vec<(u32, Vec<u32>)> {
        let mut result = Vec::new();
        let mut it = ppl.iter();
        while it.doc() != u32::MAX {
            result.push((it.doc(), it.positions().to_vec()));
            it.advance();
        }
        result
    }

    #[test]
    fn test_concatenate_streaming_matches_blocks() {
        let seg_a: Vec<(u32, Vec<u32>)> = (0..150)
            .map(|i| (i * 2, vec![i * 10, i * 10 + 3]))
            .collect();
        let seg_b: Vec<(u32, Vec<u32>)> = (0..100).map(|i| (i * 5, vec![i * 7])).collect();
        let seg_c: Vec<(u32, Vec<u32>)> = (0..80).map(|i| (i * 3, vec![i, i + 1, i + 2])).collect();

        let ppl_a = build_ppl(&seg_a);
        let ppl_b = build_ppl(&seg_b);
        let ppl_c = build_ppl(&seg_c);

        let offset_b = 500u32;
        let offset_c = 1000u32;

        let ref_merged = PositionPostingList::concatenate_blocks(&[
            (ppl_a.clone(), 0),
            (ppl_b.clone(), offset_b),
            (ppl_c.clone(), offset_c),
        ])
        .unwrap();
        let mut ref_buf = Vec::new();
        ref_merged.serialize(&mut ref_buf).unwrap();

        let bytes_a = serialize_ppl(&ppl_a);
        let bytes_b = serialize_ppl(&ppl_b);
        let bytes_c = serialize_ppl(&ppl_c);

        let sources: Vec<(&[u8], u32)> =
            vec![(&bytes_a, 0), (&bytes_b, offset_b), (&bytes_c, offset_c)];
        let mut stream_buf = Vec::new();
        let (doc_count, bytes_written) =
            PositionPostingList::concatenate_streaming(&sources, &mut stream_buf).unwrap();

        assert_eq!(doc_count, 330);
        assert_eq!(bytes_written, stream_buf.len());

        let ref_posts = collect_positions(&PositionPostingList::deserialize(&ref_buf).unwrap());
        let stream_posts =
            collect_positions(&PositionPostingList::deserialize(&stream_buf).unwrap());

        assert_eq!(ref_posts.len(), stream_posts.len());
        for (i, (r, s)) in ref_posts.iter().zip(stream_posts.iter()).enumerate() {
            assert_eq!(r.0, s.0, "doc_id mismatch at {}", i);
            assert_eq!(r.1, s.1, "positions mismatch at doc {}", r.0);
        }
    }

    #[test]
    fn test_positions_multi_round_merge() {
        let segments: Vec<Vec<(u32, Vec<u32>)>> = (0..4)
            .map(|seg| {
                (0..200)
                    .map(|i| {
                        let pos_count = (i % 3) + 1;
                        let positions: Vec<u32> = (0..pos_count)
                            .map(|p| (seg * 1000 + i * 10 + p) as u32)
                            .collect();
                        (i as u32 * 3, positions)
                    })
                    .collect()
            })
            .collect();

        let ppls: Vec<PositionPostingList> = segments.iter().map(|s| build_ppl(s)).collect();
        let serialized: Vec<Vec<u8>> = ppls.iter().map(serialize_ppl).collect();

        let mut merged_01 = Vec::new();
        let sources_01: Vec<(&[u8], u32)> = vec![(&serialized[0], 0), (&serialized[1], 600)];
        let (dc_01, _) =
            PositionPostingList::concatenate_streaming(&sources_01, &mut merged_01).unwrap();
        assert_eq!(dc_01, 400);

        let mut merged_23 = Vec::new();
        let sources_23: Vec<(&[u8], u32)> = vec![(&serialized[2], 0), (&serialized[3], 600)];
        let (dc_23, _) =
            PositionPostingList::concatenate_streaming(&sources_23, &mut merged_23).unwrap();
        assert_eq!(dc_23, 400);

        let mut final_merged = Vec::new();
        let sources_final: Vec<(&[u8], u32)> = vec![(&merged_01, 0), (&merged_23, 1200)];
        let (dc_final, _) =
            PositionPostingList::concatenate_streaming(&sources_final, &mut final_merged).unwrap();
        assert_eq!(dc_final, 800);

        let final_ppl = PositionPostingList::deserialize(&final_merged).unwrap();
        let all = collect_positions(&final_ppl);
        assert_eq!(all.len(), 800);

        assert_eq!(all[0].0, 0);
        assert_eq!(all[0].1, vec![0]);

        assert_eq!(all[200].0, 600);
        assert_eq!(all[200].1, vec![1000]);

        assert_eq!(all[400].0, 1200);
        assert_eq!(all[400].1, vec![2000]);

        let mut it = final_ppl.iter();
        it.seek(1200);
        assert_eq!(it.doc(), 1200);
        assert_eq!(it.positions(), &[2000]);

        let pos = final_ppl.get_positions(600).unwrap();
        assert_eq!(pos, vec![1000]);
    }

    #[test]
    fn test_positions_large_scale_merge() {
        let num_segments = 5usize;
        let docs_per_segment = 500usize;

        let segments: Vec<Vec<(u32, Vec<u32>)>> = (0..num_segments)
            .map(|seg| {
                (0..docs_per_segment)
                    .map(|i| {
                        let n_pos = (i % 4) + 1;
                        let positions: Vec<u32> =
                            (0..n_pos).map(|p| (p * 5 + seg) as u32).collect();
                        (i as u32 * 2, positions)
                    })
                    .collect()
            })
            .collect();

        let ppls: Vec<PositionPostingList> = segments.iter().map(|s| build_ppl(s)).collect();
        let serialized: Vec<Vec<u8>> = ppls.iter().map(serialize_ppl).collect();

        let max_doc = (docs_per_segment as u32 - 1) * 2;
        let offsets: Vec<u32> = (0..num_segments)
            .map(|i| i as u32 * (max_doc + 1))
            .collect();

        let sources: Vec<(&[u8], u32)> = serialized
            .iter()
            .zip(offsets.iter())
            .map(|(b, o)| (b.as_slice(), *o))
            .collect();

        let mut merged = Vec::new();
        let (doc_count, _) =
            PositionPostingList::concatenate_streaming(&sources, &mut merged).unwrap();
        assert_eq!(doc_count, (num_segments * docs_per_segment) as u32);

        let merged_ppl = PositionPostingList::deserialize(&merged).unwrap();
        let all = collect_positions(&merged_ppl);
        assert_eq!(all.len(), num_segments * docs_per_segment);

        for (seg, &offset) in offsets.iter().enumerate() {
            for i in 0..docs_per_segment {
                let idx = seg * docs_per_segment + i;
                let expected_doc = i as u32 * 2 + offset;
                assert_eq!(all[idx].0, expected_doc, "seg={} i={}", seg, i);

                let n_pos = (i % 4) + 1;
                let expected_positions: Vec<u32> =
                    (0..n_pos).map(|p| (p * 5 + seg) as u32).collect();
                assert_eq!(
                    all[idx].1, expected_positions,
                    "positions mismatch seg={} i={}",
                    seg, i
                );
            }
        }
    }

    #[test]
    fn test_positions_streaming_single_source() {
        let entries: Vec<(u32, Vec<u32>)> =
            (0..300).map(|i| (i * 4, vec![i * 2, i * 2 + 1])).collect();
        let ppl = build_ppl(&entries);
        let direct = serialize_ppl(&ppl);

        let sources: Vec<(&[u8], u32)> = vec![(&direct, 0)];
        let mut streamed = Vec::new();
        PositionPostingList::concatenate_streaming(&sources, &mut streamed).unwrap();

        let p1 = collect_positions(&PositionPostingList::deserialize(&direct).unwrap());
        let p2 = collect_positions(&PositionPostingList::deserialize(&streamed).unwrap());
        assert_eq!(p1, p2);
    }

    #[test]
    fn test_positions_edge_single_doc() {
        let ppl_a = build_ppl(&[(0, vec![42, 43, 44])]);
        let ppl_b = build_ppl(&[(0, vec![100])]);

        let merged = PositionPostingList::concatenate_blocks(&[(ppl_a, 0), (ppl_b, 1)]).unwrap();

        assert_eq!(merged.doc_count(), 2);
        assert_eq!(merged.get_positions(0).unwrap(), vec![42, 43, 44]);
        assert_eq!(merged.get_positions(1).unwrap(), vec![100]);
    }
}