use byteorder::{LittleEndian, ReadBytesExt, WriteBytesExt};
use std::io::{self, Write};

use super::posting_common::{read_vint, write_vint};
use crate::DocId;

/// Number of postings per block; each block gets one skip-list entry.
pub const POSITION_BLOCK_SIZE: usize = 128;

/// Token positions occupy the low 20 bits of an encoded position.
pub const MAX_TOKEN_POSITION: u32 = (1 << 20) - 1;

/// Element ordinals occupy the high 12 bits of an encoded position.
pub const MAX_ELEMENT_ORDINAL: u32 = (1 << 12) - 1;

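/// Packs an element ordinal into the high 12 bits and a token position into
/// the low 20 bits of a single `u32`. For example, `encode_position(3, 100)`
/// yields `(3 << 20) | 100`, and the decode functions recover `3` and `100`.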
#[inline]
pub fn encode_position(element_ordinal: u32, token_position: u32) -> u32 {
    debug_assert!(
        element_ordinal <= MAX_ELEMENT_ORDINAL,
        "Element ordinal {} exceeds maximum {}",
        element_ordinal,
        MAX_ELEMENT_ORDINAL
    );
    debug_assert!(
        token_position <= MAX_TOKEN_POSITION,
        "Token position {} exceeds maximum {}",
        token_position,
        MAX_TOKEN_POSITION
    );
    (element_ordinal << 20) | (token_position & MAX_TOKEN_POSITION)
}

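/// Extracts the element ordinal from the high 12 bits of an encoded position.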
#[inline]
pub fn decode_element_ordinal(position: u32) -> u32 {
    position >> 20
}

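/// Extracts the token position from the low 20 bits of an encoded position.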
#[inline]
pub fn decode_token_position(position: u32) -> u32 {
    position & MAX_TOKEN_POSITION
}

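/// A single document's posting: its doc ID, term frequency, and encoded
/// positions (see [`encode_position`]).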
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct PostingWithPositions {
    pub doc_id: DocId,
    pub term_freq: u32,
    pub positions: Vec<u32>,
}

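/// A block-encoded posting list with positions.
///
/// Postings are grouped into blocks of [`POSITION_BLOCK_SIZE`] documents. Each
/// block is laid out as `[count: u32][base doc ID: u32]` followed by
/// vint-encoded doc-ID deltas and, per document, a vint position count and the
/// positions themselves. The skip list carries one `(first, last, offset)`
/// entry per block so lookups can jump straight to the containing block.
///
/// Illustrative usage (a sketch, not taken from the original sources):
/// ```ignore
/// let mut list = PositionPostingList::new();
/// list.push(1, vec![encode_position(0, 0), encode_position(0, 2)]);
/// assert_eq!(list.get_positions(1).unwrap().len(), 2);
/// ```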
#[derive(Debug, Clone)]
pub struct PositionPostingList {
    /// One entry per block: (first doc ID, last doc ID, byte offset into `data`).
    skip_list: Vec<(DocId, DocId, u64)>,
    /// Block-encoded posting data.
    data: Vec<u8>,
    /// Total number of documents across all blocks.
    doc_count: u32,
}

impl Default for PositionPostingList {
    fn default() -> Self {
        Self::new()
    }
}

impl PositionPostingList {
    pub fn new() -> Self {
        Self {
            skip_list: Vec::new(),
            data: Vec::new(),
            doc_count: 0,
        }
    }

    pub fn with_capacity(capacity: usize) -> Self {
        Self {
            skip_list: Vec::with_capacity(capacity / POSITION_BLOCK_SIZE + 1),
            data: Vec::with_capacity(capacity * 8),
            doc_count: 0,
        }
    }

    /// Builds a block-encoded list from postings sorted by ascending doc ID.
    pub fn from_postings(postings: &[PostingWithPositions]) -> io::Result<Self> {
        if postings.is_empty() {
            return Ok(Self::new());
        }

        let mut skip_list = Vec::new();
        let mut data = Vec::new();
        let mut i = 0;

        while i < postings.len() {
            let block_start = data.len() as u64;
            let block_end = (i + POSITION_BLOCK_SIZE).min(postings.len());
            let block = &postings[i..block_end];

            let base_doc_id = block.first().unwrap().doc_id;
            let last_doc_id = block.last().unwrap().doc_id;
            skip_list.push((base_doc_id, last_doc_id, block_start));

            // Block header: posting count, then the first (base) doc ID.
            data.write_u32::<LittleEndian>(block.len() as u32)?;
            data.write_u32::<LittleEndian>(base_doc_id)?;

            let mut prev_doc_id = base_doc_id;
            for (j, posting) in block.iter().enumerate() {
                if j > 0 {
                    let delta = posting.doc_id - prev_doc_id;
                    write_vint(&mut data, delta as u64)?;
                }
                prev_doc_id = posting.doc_id;

                write_vint(&mut data, posting.positions.len() as u64)?;
                for &pos in &posting.positions {
                    write_vint(&mut data, pos as u64)?;
                }
            }

            i = block_end;
        }

        Ok(Self {
            skip_list,
            data,
            doc_count: postings.len() as u32,
        })
    }

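    /// Appends a posting; doc IDs must arrive in ascending order. A new block
    /// (and skip entry) is started every [`POSITION_BLOCK_SIZE`] documents;
    /// otherwise the current block's count is patched in place and the doc-ID
    /// delta is appended.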
    pub fn push(&mut self, doc_id: DocId, positions: Vec<u32>) {
        let posting = PostingWithPositions {
            doc_id,
            term_freq: positions.len() as u32,
            positions,
        };

        let block_start = self.data.len() as u64;

        let need_new_block =
            self.skip_list.is_empty() || self.doc_count.is_multiple_of(POSITION_BLOCK_SIZE as u32);

        if need_new_block {
            self.skip_list.push((doc_id, doc_id, block_start));
            self.data.write_u32::<LittleEndian>(1u32).unwrap();
            self.data.write_u32::<LittleEndian>(doc_id).unwrap();
        } else {
            let last_block = self.skip_list.last_mut().unwrap();
            let prev_doc_id = last_block.1;
            last_block.1 = doc_id;

            // Patch the count stored at the start of the current block.
            let count_offset = last_block.2 as usize;
            let old_count = u32::from_le_bytes(
                self.data[count_offset..count_offset + 4]
                    .try_into()
                    .unwrap(),
            );
            self.data[count_offset..count_offset + 4]
                .copy_from_slice(&(old_count + 1).to_le_bytes());

            let delta = doc_id - prev_doc_id;
            write_vint(&mut self.data, delta as u64).unwrap();
        }

        write_vint(&mut self.data, posting.positions.len() as u64).unwrap();
        for &pos in &posting.positions {
            write_vint(&mut self.data, pos as u64).unwrap();
        }

        self.doc_count += 1;
    }

    pub fn doc_count(&self) -> u32 {
        self.doc_count
    }

    pub fn len(&self) -> usize {
        self.doc_count as usize
    }

    pub fn is_empty(&self) -> bool {
        self.doc_count == 0
    }

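    /// Returns the positions recorded for `target_doc_id`, or `None` if the
    /// document is absent. Binary-searches the skip list for the containing
    /// block, then scans that block linearly.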
    pub fn get_positions(&self, target_doc_id: DocId) -> Option<Vec<u32>> {
        if self.skip_list.is_empty() {
            return None;
        }

        let block_idx = match self.skip_list.binary_search_by(|&(base, last, _)| {
            if target_doc_id < base {
                std::cmp::Ordering::Greater
            } else if target_doc_id > last {
                std::cmp::Ordering::Less
            } else {
                std::cmp::Ordering::Equal
            }
        }) {
            Ok(idx) => idx,
            Err(_) => return None,
        };

        let offset = self.skip_list[block_idx].2 as usize;
        let mut reader = &self.data[offset..];

        let count = reader.read_u32::<LittleEndian>().ok()? as usize;
        let first_doc = reader.read_u32::<LittleEndian>().ok()?;
        let mut prev_doc_id = first_doc;

        for i in 0..count {
            let doc_id = if i == 0 {
                first_doc
            } else {
                let delta = read_vint(&mut reader).ok()? as u32;
                prev_doc_id + delta
            };
            prev_doc_id = doc_id;

            let num_positions = read_vint(&mut reader).ok()? as usize;

            if doc_id == target_doc_id {
                let mut positions = Vec::with_capacity(num_positions);
                for _ in 0..num_positions {
                    let pos = read_vint(&mut reader).ok()? as u32;
                    positions.push(pos);
                }
                return Some(positions);
            } else {
                // Skip over this document's positions.
                for _ in 0..num_positions {
                    let _ = read_vint(&mut reader);
                }
            }
        }

        None
    }

    /// Bytes per serialized skip entry: base (4) + last (4) + offset (8) + length (4).
    const SKIP_ENTRY_SIZE: usize = 20;

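    // On-disk layout (written by `serialize`, consumed by `deserialize`):
    //   [block data][skip entries: base u32, last u32, offset u64, length u32]*
    //   [footer: data_len u64, skip_count u32, doc_count u32]  -- 16 bytes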
    pub fn serialize<W: Write>(&self, writer: &mut W) -> io::Result<()> {
        writer.write_all(&self.data)?;

        for (i, (base_doc_id, last_doc_id, offset)) in self.skip_list.iter().enumerate() {
            let next_offset = if i + 1 < self.skip_list.len() {
                self.skip_list[i + 1].2
            } else {
                self.data.len() as u64
            };
            let length = (next_offset - offset) as u32;
            writer.write_u32::<LittleEndian>(*base_doc_id)?;
            writer.write_u32::<LittleEndian>(*last_doc_id)?;
            writer.write_u64::<LittleEndian>(*offset)?;
            writer.write_u32::<LittleEndian>(length)?;
        }

        writer.write_u64::<LittleEndian>(self.data.len() as u64)?;
        writer.write_u32::<LittleEndian>(self.skip_list.len() as u32)?;
        writer.write_u32::<LittleEndian>(self.doc_count)?;

        Ok(())
    }

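    /// Reconstructs a list from bytes produced by [`Self::serialize`], reading
    /// the 16-byte footer first to locate the data and skip-entry regions.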
    pub fn deserialize(raw: &[u8]) -> io::Result<Self> {
        if raw.len() < 16 {
            return Err(io::Error::new(
                io::ErrorKind::InvalidData,
                "position data too short",
            ));
        }

        let f = raw.len() - 16;
        let data_len = u64::from_le_bytes(raw[f..f + 8].try_into().unwrap()) as usize;
        let skip_count = u32::from_le_bytes(raw[f + 8..f + 12].try_into().unwrap()) as usize;
        let doc_count = u32::from_le_bytes(raw[f + 12..f + 16].try_into().unwrap());

        let mut skip_list = Vec::with_capacity(skip_count);
        let mut pos = data_len;
        for _ in 0..skip_count {
            let base = u32::from_le_bytes(raw[pos..pos + 4].try_into().unwrap());
            let last = u32::from_le_bytes(raw[pos + 4..pos + 8].try_into().unwrap());
            let offset = u64::from_le_bytes(raw[pos + 8..pos + 16].try_into().unwrap());
            skip_list.push((base, last, offset));
            pos += Self::SKIP_ENTRY_SIZE;
        }

        let data = raw[..data_len].to_vec();

        Ok(Self {
            skip_list,
            data,
            doc_count,
        })
    }

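    /// Merges already-built lists by copying their blocks and rebasing doc IDs
    /// by each source's offset. Only the block header's base doc ID needs
    /// patching; intra-block deltas are offset-independent.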
    pub fn concatenate_blocks(sources: &[(PositionPostingList, u32)]) -> io::Result<Self> {
        let mut skip_list = Vec::new();
        let mut data = Vec::new();
        let mut total_docs = 0u32;

        for (source, doc_offset) in sources {
            for block_idx in 0..source.skip_list.len() {
                let (base, last, src_offset) = source.skip_list[block_idx];
                let next_offset = if block_idx + 1 < source.skip_list.len() {
                    source.skip_list[block_idx + 1].2 as usize
                } else {
                    source.data.len()
                };

                let new_base = base + doc_offset;
                let new_last = last + doc_offset;
                let new_offset = data.len() as u64;

                let block_bytes = &source.data[src_offset as usize..next_offset];

                let count = u32::from_le_bytes(block_bytes[0..4].try_into().unwrap());
                let first_doc = u32::from_le_bytes(block_bytes[4..8].try_into().unwrap());

                data.write_u32::<LittleEndian>(count)?;
                data.write_u32::<LittleEndian>(first_doc + doc_offset)?;
                data.extend_from_slice(&block_bytes[8..]);

                skip_list.push((new_base, new_last, new_offset));
                total_docs += count;
            }
        }

        Ok(Self {
            skip_list,
            data,
            doc_count: total_docs,
        })
    }

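    /// Streaming variant of [`Self::concatenate_blocks`]: merges serialized
    /// lists directly from their raw bytes into `writer` without building an
    /// in-memory list. Returns the merged doc count and total bytes written.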
    pub fn concatenate_streaming<W: Write>(
        sources: &[(&[u8], u32)],
        writer: &mut W,
    ) -> io::Result<(u32, usize)> {
        struct SourceMeta {
            src_idx: usize,
            data_len: usize,
            skip_count: usize,
        }

        let mut metas: Vec<SourceMeta> = Vec::with_capacity(sources.len());
        let mut total_docs = 0u32;

        for (src_idx, (raw, _)) in sources.iter().enumerate() {
            if raw.len() < 16 {
                continue;
            }
            let f = raw.len() - 16;
            let data_len = u64::from_le_bytes(raw[f..f + 8].try_into().unwrap()) as usize;
            let skip_count = u32::from_le_bytes(raw[f + 8..f + 12].try_into().unwrap()) as usize;
            let doc_count = u32::from_le_bytes(raw[f + 12..f + 16].try_into().unwrap());
            total_docs += doc_count;
            metas.push(SourceMeta {
                src_idx,
                data_len,
                skip_count,
            });
        }

        let mut out_skip: Vec<u8> = Vec::new();
        let mut out_skip_count = 0u32;
        let mut data_written = 0u64;
        let mut patch_buf = [0u8; 8];
        let es = Self::SKIP_ENTRY_SIZE;

        for meta in &metas {
            // Index by the recorded source index so that skipped (too-short)
            // sources cannot misalign `metas` against `sources`.
            let (raw, doc_offset) = &sources[meta.src_idx];
            let skip_base = meta.data_len;
            let data = &raw[..meta.data_len];

            for i in 0..meta.skip_count {
                let p = skip_base + i * es;
                let base = u32::from_le_bytes(raw[p..p + 4].try_into().unwrap());
                let last = u32::from_le_bytes(raw[p + 4..p + 8].try_into().unwrap());
                let offset = u64::from_le_bytes(raw[p + 8..p + 16].try_into().unwrap());
                let length = u32::from_le_bytes(raw[p + 16..p + 20].try_into().unwrap());

                let block = &data[offset as usize..(offset as usize + length as usize)];

                out_skip.extend_from_slice(&(base + doc_offset).to_le_bytes());
                out_skip.extend_from_slice(&(last + doc_offset).to_le_bytes());
                out_skip.extend_from_slice(&data_written.to_le_bytes());
                out_skip.extend_from_slice(&length.to_le_bytes());
                out_skip_count += 1;

                // Copy the 8-byte block header, rebasing its first doc ID.
                patch_buf[0..4].copy_from_slice(&block[0..4]);
                let first_doc = u32::from_le_bytes(block[4..8].try_into().unwrap());
                patch_buf[4..8].copy_from_slice(&(first_doc + doc_offset).to_le_bytes());
                writer.write_all(&patch_buf)?;
                writer.write_all(&block[8..])?;

                data_written += block.len() as u64;
            }
        }

        writer.write_all(&out_skip)?;

        writer.write_u64::<LittleEndian>(data_written)?;
        writer.write_u32::<LittleEndian>(out_skip_count)?;
        writer.write_u32::<LittleEndian>(total_docs)?;

        let total_bytes = data_written as usize + out_skip.len() + 16;
        Ok((total_docs, total_bytes))
    }

    pub fn iter(&self) -> PositionPostingIterator<'_> {
        PositionPostingIterator::new(self)
    }
}

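/// Cursor over a [`PositionPostingList`] that decodes one block at a time.
/// `doc()` returns `u32::MAX` once the iterator is exhausted.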
pub struct PositionPostingIterator<'a> {
    list: &'a PositionPostingList,
    current_block: usize,
    position_in_block: usize,
    block_postings: Vec<PostingWithPositions>,
    exhausted: bool,
}

impl<'a> PositionPostingIterator<'a> {
    pub fn new(list: &'a PositionPostingList) -> Self {
        let exhausted = list.skip_list.is_empty();
        let mut iter = Self {
            list,
            current_block: 0,
            position_in_block: 0,
            block_postings: Vec::new(),
            exhausted,
        };
        if !iter.exhausted {
            iter.load_block(0);
        }
        iter
    }

    fn load_block(&mut self, block_idx: usize) {
        if block_idx >= self.list.skip_list.len() {
            self.exhausted = true;
            return;
        }

        self.current_block = block_idx;
        self.position_in_block = 0;

        let offset = self.list.skip_list[block_idx].2 as usize;
        let mut reader = &self.list.data[offset..];

        let count = reader.read_u32::<LittleEndian>().unwrap_or(0) as usize;
        let first_doc = reader.read_u32::<LittleEndian>().unwrap_or(0);
        self.block_postings.clear();
        self.block_postings.reserve(count);

        let mut prev_doc_id = first_doc;

        for i in 0..count {
            let doc_id = if i == 0 {
                first_doc
            } else {
                let delta = read_vint(&mut reader).unwrap_or(0) as u32;
                prev_doc_id + delta
            };
            prev_doc_id = doc_id;

            let num_positions = read_vint(&mut reader).unwrap_or(0) as usize;
            let mut positions = Vec::with_capacity(num_positions);
            for _ in 0..num_positions {
                let pos = read_vint(&mut reader).unwrap_or(0) as u32;
                positions.push(pos);
            }

            self.block_postings.push(PostingWithPositions {
                doc_id,
                term_freq: num_positions as u32,
                positions,
            });
        }
    }

    pub fn doc(&self) -> DocId {
        if self.exhausted || self.position_in_block >= self.block_postings.len() {
            u32::MAX
        } else {
            self.block_postings[self.position_in_block].doc_id
        }
    }

    pub fn term_freq(&self) -> u32 {
        if self.exhausted || self.position_in_block >= self.block_postings.len() {
            0
        } else {
            self.block_postings[self.position_in_block].term_freq
        }
    }

    pub fn positions(&self) -> &[u32] {
        if self.exhausted || self.position_in_block >= self.block_postings.len() {
            &[]
        } else {
            &self.block_postings[self.position_in_block].positions
        }
    }

    pub fn advance(&mut self) {
        if self.exhausted {
            return;
        }

        self.position_in_block += 1;
        if self.position_in_block >= self.block_postings.len() {
            self.load_block(self.current_block + 1);
        }
    }

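    /// Positions the cursor at the first document with ID >= `target`.
    /// Stays within the current block when possible; otherwise binary-searches
    /// the skip list and reloads the containing (or following) block.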
    pub fn seek(&mut self, target: DocId) {
        if self.exhausted {
            return;
        }

        // Fast path: the target falls within the current block.
        if let Some((_, last, _)) = self.list.skip_list.get(self.current_block)
            && target <= *last
        {
            while self.position_in_block < self.block_postings.len()
                && self.block_postings[self.position_in_block].doc_id < target
            {
                self.position_in_block += 1;
            }
            if self.position_in_block >= self.block_postings.len() {
                self.load_block(self.current_block + 1);
                self.seek(target);
            }
            return;
        }

        let block_idx = match self.list.skip_list.binary_search_by(|&(base, last, _)| {
            if target < base {
                std::cmp::Ordering::Greater
            } else if target > last {
                std::cmp::Ordering::Less
            } else {
                std::cmp::Ordering::Equal
            }
        }) {
            Ok(idx) => idx,
            // Target falls between blocks: idx is the first block whose base
            // exceeds the target.
            Err(idx) => idx,
        };

        if block_idx >= self.list.skip_list.len() {
            self.exhausted = true;
            return;
        }

        self.load_block(block_idx);

        while self.position_in_block < self.block_postings.len()
            && self.block_postings[self.position_in_block].doc_id < target
        {
            self.position_in_block += 1;
        }

        if self.position_in_block >= self.block_postings.len() {
            self.load_block(self.current_block + 1);
        }
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_position_encoding() {
        let pos = encode_position(0, 5);
        assert_eq!(decode_element_ordinal(pos), 0);
        assert_eq!(decode_token_position(pos), 5);

        let pos = encode_position(3, 100);
        assert_eq!(decode_element_ordinal(pos), 3);
        assert_eq!(decode_token_position(pos), 100);

        let pos = encode_position(MAX_ELEMENT_ORDINAL, MAX_TOKEN_POSITION);
        assert_eq!(decode_element_ordinal(pos), MAX_ELEMENT_ORDINAL);
        assert_eq!(decode_token_position(pos), MAX_TOKEN_POSITION);
    }

    #[test]
    fn test_position_posting_list_build() {
        let postings = vec![
            PostingWithPositions {
                doc_id: 1,
                term_freq: 2,
                positions: vec![encode_position(0, 0), encode_position(0, 2)],
            },
            PostingWithPositions {
                doc_id: 3,
                term_freq: 1,
                positions: vec![encode_position(1, 0)],
            },
        ];

        let list = PositionPostingList::from_postings(&postings).unwrap();
        assert_eq!(list.doc_count(), 2);

        let pos = list.get_positions(1).unwrap();
        assert_eq!(pos.len(), 2);

        let pos = list.get_positions(3).unwrap();
        assert_eq!(pos.len(), 1);

        assert!(list.get_positions(2).is_none());
        assert!(list.get_positions(99).is_none());
    }

    #[test]
    fn test_serialization_roundtrip() {
        let postings = vec![
            PostingWithPositions {
                doc_id: 1,
                term_freq: 2,
                positions: vec![encode_position(0, 0), encode_position(0, 5)],
            },
            PostingWithPositions {
                doc_id: 3,
                term_freq: 1,
                positions: vec![encode_position(1, 0)],
            },
            PostingWithPositions {
                doc_id: 5,
                term_freq: 1,
                positions: vec![encode_position(0, 10)],
            },
        ];

        let list = PositionPostingList::from_postings(&postings).unwrap();

        let mut bytes = Vec::new();
        list.serialize(&mut bytes).unwrap();

        let deserialized = PositionPostingList::deserialize(&bytes).unwrap();

        assert_eq!(list.doc_count(), deserialized.doc_count());

        let pos = deserialized.get_positions(1).unwrap();
        assert_eq!(pos, vec![encode_position(0, 0), encode_position(0, 5)]);

        let pos = deserialized.get_positions(3).unwrap();
        assert_eq!(pos, vec![encode_position(1, 0)]);
    }

    #[test]
    fn test_binary_search_many_blocks() {
        let mut postings = Vec::new();
        for i in 0..300 {
            postings.push(PostingWithPositions {
                doc_id: i * 2,
                term_freq: 1,
                positions: vec![encode_position(0, i)],
            });
        }

        let list = PositionPostingList::from_postings(&postings).unwrap();
        assert_eq!(list.doc_count(), 300);

        // 300 docs at 128 per block -> 3 blocks.
        assert_eq!(list.skip_list.len(), 3);

        let pos = list.get_positions(0).unwrap();
        assert_eq!(pos, vec![encode_position(0, 0)]);

        let pos = list.get_positions(256).unwrap();
        assert_eq!(pos, vec![encode_position(0, 128)]);

        let pos = list.get_positions(598).unwrap();
        assert_eq!(pos, vec![encode_position(0, 299)]);

        assert!(list.get_positions(1).is_none());
        assert!(list.get_positions(257).is_none());
    }

    #[test]
    fn test_concatenate_blocks_merge() {
        let postings1 = vec![
            PostingWithPositions {
                doc_id: 0,
                term_freq: 1,
                positions: vec![0],
            },
            PostingWithPositions {
                doc_id: 1,
                term_freq: 1,
                positions: vec![5],
            },
            PostingWithPositions {
                doc_id: 2,
                term_freq: 1,
                positions: vec![10],
            },
        ];
        let list1 = PositionPostingList::from_postings(&postings1).unwrap();

        let postings2 = vec![
            PostingWithPositions {
                doc_id: 0,
                term_freq: 1,
                positions: vec![100],
            },
            PostingWithPositions {
                doc_id: 1,
                term_freq: 1,
                positions: vec![105],
            },
        ];
        let list2 = PositionPostingList::from_postings(&postings2).unwrap();

        let combined =
            PositionPostingList::concatenate_blocks(&[(list1, 0), (list2, 3)]).unwrap();

        assert_eq!(combined.doc_count(), 5);

        assert!(combined.get_positions(0).is_some());
        assert!(combined.get_positions(1).is_some());
        assert!(combined.get_positions(2).is_some());
        assert!(combined.get_positions(3).is_some());
        assert!(combined.get_positions(4).is_some());
    }

    #[test]
    fn test_iterator() {
        let postings = vec![
            PostingWithPositions {
                doc_id: 1,
                term_freq: 2,
                positions: vec![0, 5],
            },
            PostingWithPositions {
                doc_id: 3,
                term_freq: 1,
                positions: vec![10],
            },
            PostingWithPositions {
                doc_id: 5,
                term_freq: 1,
                positions: vec![15],
            },
        ];

        let list = PositionPostingList::from_postings(&postings).unwrap();
        let mut iter = list.iter();

        assert_eq!(iter.doc(), 1);
        assert_eq!(iter.positions(), &[0, 5]);

        iter.advance();
        assert_eq!(iter.doc(), 3);

        iter.seek(5);
        assert_eq!(iter.doc(), 5);
        assert_eq!(iter.positions(), &[15]);

        iter.advance();
        assert_eq!(iter.doc(), u32::MAX);
    }

    fn build_ppl(entries: &[(u32, Vec<u32>)]) -> PositionPostingList {
        let postings: Vec<PostingWithPositions> = entries
            .iter()
            .map(|(doc_id, positions)| PostingWithPositions {
                doc_id: *doc_id,
                term_freq: positions.len() as u32,
                positions: positions.clone(),
            })
            .collect();
        PositionPostingList::from_postings(&postings).unwrap()
    }

    fn serialize_ppl(ppl: &PositionPostingList) -> Vec<u8> {
        let mut buf = Vec::new();
        ppl.serialize(&mut buf).unwrap();
        buf
    }

    fn collect_positions(ppl: &PositionPostingList) -> Vec<(u32, Vec<u32>)> {
        let mut result = Vec::new();
        let mut it = ppl.iter();
        while it.doc() != u32::MAX {
            result.push((it.doc(), it.positions().to_vec()));
            it.advance();
        }
        result
    }

    #[test]
    fn test_concatenate_streaming_matches_blocks() {
        let seg_a: Vec<(u32, Vec<u32>)> = (0..150)
            .map(|i| (i * 2, vec![i * 10, i * 10 + 3]))
            .collect();
        let seg_b: Vec<(u32, Vec<u32>)> = (0..100).map(|i| (i * 5, vec![i * 7])).collect();
        let seg_c: Vec<(u32, Vec<u32>)> = (0..80).map(|i| (i * 3, vec![i, i + 1, i + 2])).collect();

        let ppl_a = build_ppl(&seg_a);
        let ppl_b = build_ppl(&seg_b);
        let ppl_c = build_ppl(&seg_c);

        let offset_b = 500u32;
        let offset_c = 1000u32;

        let ref_merged = PositionPostingList::concatenate_blocks(&[
            (ppl_a.clone(), 0),
            (ppl_b.clone(), offset_b),
            (ppl_c.clone(), offset_c),
        ])
        .unwrap();
        let mut ref_buf = Vec::new();
        ref_merged.serialize(&mut ref_buf).unwrap();

        let bytes_a = serialize_ppl(&ppl_a);
        let bytes_b = serialize_ppl(&ppl_b);
        let bytes_c = serialize_ppl(&ppl_c);

        let sources: Vec<(&[u8], u32)> =
            vec![(&bytes_a, 0), (&bytes_b, offset_b), (&bytes_c, offset_c)];
        let mut stream_buf = Vec::new();
        let (doc_count, bytes_written) =
            PositionPostingList::concatenate_streaming(&sources, &mut stream_buf).unwrap();

        assert_eq!(doc_count, 330);
        assert_eq!(bytes_written, stream_buf.len());

        let ref_posts = collect_positions(&PositionPostingList::deserialize(&ref_buf).unwrap());
        let stream_posts =
            collect_positions(&PositionPostingList::deserialize(&stream_buf).unwrap());

        assert_eq!(ref_posts.len(), stream_posts.len());
        for (i, (r, s)) in ref_posts.iter().zip(stream_posts.iter()).enumerate() {
            assert_eq!(r.0, s.0, "doc_id mismatch at {}", i);
            assert_eq!(r.1, s.1, "positions mismatch at doc {}", r.0);
        }
    }

    #[test]
    fn test_positions_multi_round_merge() {
        let segments: Vec<Vec<(u32, Vec<u32>)>> = (0..4)
            .map(|seg| {
                (0..200)
                    .map(|i| {
                        let pos_count = (i % 3) + 1;
                        let positions: Vec<u32> = (0..pos_count)
                            .map(|p| (seg * 1000 + i * 10 + p) as u32)
                            .collect();
                        (i as u32 * 3, positions)
                    })
                    .collect()
            })
            .collect();

        let ppls: Vec<PositionPostingList> = segments.iter().map(|s| build_ppl(s)).collect();
        let serialized: Vec<Vec<u8>> = ppls.iter().map(serialize_ppl).collect();

        let mut merged_01 = Vec::new();
        let sources_01: Vec<(&[u8], u32)> = vec![(&serialized[0], 0), (&serialized[1], 600)];
        let (dc_01, _) =
            PositionPostingList::concatenate_streaming(&sources_01, &mut merged_01).unwrap();
        assert_eq!(dc_01, 400);

        let mut merged_23 = Vec::new();
        let sources_23: Vec<(&[u8], u32)> = vec![(&serialized[2], 0), (&serialized[3], 600)];
        let (dc_23, _) =
            PositionPostingList::concatenate_streaming(&sources_23, &mut merged_23).unwrap();
        assert_eq!(dc_23, 400);

        let mut final_merged = Vec::new();
        let sources_final: Vec<(&[u8], u32)> = vec![(&merged_01, 0), (&merged_23, 1200)];
        let (dc_final, _) =
            PositionPostingList::concatenate_streaming(&sources_final, &mut final_merged).unwrap();
        assert_eq!(dc_final, 800);

        let final_ppl = PositionPostingList::deserialize(&final_merged).unwrap();
        let all = collect_positions(&final_ppl);
        assert_eq!(all.len(), 800);

        assert_eq!(all[0].0, 0);
        assert_eq!(all[0].1, vec![0]);
        assert_eq!(all[200].0, 600);
        assert_eq!(all[200].1, vec![1000]);
        assert_eq!(all[400].0, 1200);
        assert_eq!(all[400].1, vec![2000]);

        let mut it = final_ppl.iter();
        it.seek(1200);
        assert_eq!(it.doc(), 1200);
        assert_eq!(it.positions(), &[2000]);

        let pos = final_ppl.get_positions(600).unwrap();
        assert_eq!(pos, vec![1000]);
    }

    #[test]
    fn test_positions_large_scale_merge() {
        let num_segments = 5usize;
        let docs_per_segment = 500usize;

        let segments: Vec<Vec<(u32, Vec<u32>)>> = (0..num_segments)
            .map(|seg| {
                (0..docs_per_segment)
                    .map(|i| {
                        let n_pos = (i % 4) + 1;
                        let positions: Vec<u32> =
                            (0..n_pos).map(|p| (p * 5 + seg) as u32).collect();
                        (i as u32 * 2, positions)
                    })
                    .collect()
            })
            .collect();

        let ppls: Vec<PositionPostingList> = segments.iter().map(|s| build_ppl(s)).collect();
        let serialized: Vec<Vec<u8>> = ppls.iter().map(serialize_ppl).collect();

        let max_doc = (docs_per_segment as u32 - 1) * 2;
        let offsets: Vec<u32> = (0..num_segments)
            .map(|i| i as u32 * (max_doc + 1))
            .collect();

        let sources: Vec<(&[u8], u32)> = serialized
            .iter()
            .zip(offsets.iter())
            .map(|(b, o)| (b.as_slice(), *o))
            .collect();

        let mut merged = Vec::new();
        let (doc_count, _) =
            PositionPostingList::concatenate_streaming(&sources, &mut merged).unwrap();
        assert_eq!(doc_count, (num_segments * docs_per_segment) as u32);

        let merged_ppl = PositionPostingList::deserialize(&merged).unwrap();
        let all = collect_positions(&merged_ppl);
        assert_eq!(all.len(), num_segments * docs_per_segment);

        for (seg, &offset) in offsets.iter().enumerate() {
            for i in 0..docs_per_segment {
                let idx = seg * docs_per_segment + i;
                let expected_doc = i as u32 * 2 + offset;
                assert_eq!(all[idx].0, expected_doc, "seg={} i={}", seg, i);

                let n_pos = (i % 4) + 1;
                let expected_positions: Vec<u32> =
                    (0..n_pos).map(|p| (p * 5 + seg) as u32).collect();
                assert_eq!(
                    all[idx].1, expected_positions,
                    "positions mismatch seg={} i={}",
                    seg, i
                );
            }
        }
    }

    #[test]
    fn test_positions_streaming_single_source() {
        let entries: Vec<(u32, Vec<u32>)> =
            (0..300).map(|i| (i * 4, vec![i * 2, i * 2 + 1])).collect();
        let ppl = build_ppl(&entries);
        let direct = serialize_ppl(&ppl);

        let sources: Vec<(&[u8], u32)> = vec![(&direct, 0)];
        let mut streamed = Vec::new();
        PositionPostingList::concatenate_streaming(&sources, &mut streamed).unwrap();

        let p1 = collect_positions(&PositionPostingList::deserialize(&direct).unwrap());
        let p2 = collect_positions(&PositionPostingList::deserialize(&streamed).unwrap());
        assert_eq!(p1, p2);
    }

    #[test]
    fn test_positions_edge_single_doc() {
        let ppl_a = build_ppl(&[(0, vec![42, 43, 44])]);
        let ppl_b = build_ppl(&[(0, vec![100])]);

        let merged = PositionPostingList::concatenate_blocks(&[(ppl_a, 0), (ppl_b, 1)]).unwrap();

        assert_eq!(merged.doc_count(), 2);
        assert_eq!(merged.get_positions(0).unwrap(), vec![42, 43, 44]);
        assert_eq!(merged.get_positions(1).unwrap(), vec![100]);
    }
}
1115}