1use super::EncodingError;
34use bytes::{Buf, BufMut, Bytes, BytesMut};
35use std::cmp::{Ordering, Reverse};
36use std::collections::{BinaryHeap, HashSet};
37
/// A live posting: a vector id paired with its vector.
///
/// Unlike [`PostingUpdate`], this type has no deletion form.
#[derive(Debug, Clone, PartialEq)]
pub(crate) struct Posting {
    id: u64,
    vector: Vec<f32>,
}

impl Posting {
    /// Creates a posting for `id` carrying `vector`.
    pub(crate) fn new(id: u64, vector: Vec<f32>) -> Self {
        Posting { id, vector }
    }

    /// The posting's id.
    pub(crate) fn id(&self) -> u64 {
        self.id
    }

    /// Borrows the posting's vector.
    pub(crate) fn vector(&self) -> &[f32] {
        &self.vector
    }

    /// Consumes the posting and returns its `(id, vector)` pair.
    pub(crate) fn unpack(self) -> (u64, Vec<f32>) {
        let Posting { id, vector } = self;
        (id, vector)
    }
}
61
62pub(crate) type PostingList = Vec<Posting>;
63
64impl From<PostingListValue> for PostingList {
65 fn from(value: PostingListValue) -> Self {
66 let mut seen = HashSet::new();
67 value
68 .postings
69 .into_iter()
70 .filter_map(|posting| {
71 assert!(seen.insert(posting.id()));
72 match posting {
73 PostingUpdate::Append { id, vector } => Some(Posting::new(id, vector)),
74 PostingUpdate::Delete { .. } => None,
75 }
76 })
77 .collect()
78 }
79}
80
/// Type tag that starts an encoded [`PostingUpdate::Append`].
pub(crate) const POSTING_UPDATE_TYPE_APPEND_BYTE: u8 = 0x00;

/// Type tag that starts an encoded [`PostingUpdate::Delete`].
pub(crate) const POSTING_UPDATE_TYPE_DELETE_BYTE: u8 = 0x01;
86
/// One change to a posting list: add (or replace) a vector, or remove it.
///
/// Encoded as a 1-byte type tag followed by the id as a little-endian `u64`. An `Append`
/// is followed by its components as little-endian `f32`s.
#[derive(Debug, Clone, PartialEq)]
pub enum PostingUpdate {
    /// Stores `vector` for `id`, superseding any older update for that id.
    Append {
        /// Identifier of the vector.
        id: u64,
        /// Vector components; decoding reads exactly `dimensions` of them.
        vector: Vec<f32>,
    },
    /// Removes whatever is stored for `id`.
    Delete {
        /// Identifier of the vector to remove.
        id: u64,
    },
}
105
106impl PostingUpdate {
107 pub fn append(id: u64, vector: Vec<f32>) -> Self {
109 Self::Append { id, vector }
110 }
111
112 pub fn delete(id: u64) -> Self {
114 Self::Delete { id }
115 }
116
117 pub fn is_append(&self) -> bool {
119 matches!(self, Self::Append { .. })
120 }
121
122 pub fn is_delete(&self) -> bool {
124 matches!(self, Self::Delete { .. })
125 }
126
127 pub fn id(&self) -> u64 {
129 match self {
130 Self::Append { id, .. } => *id,
131 Self::Delete { id } => *id,
132 }
133 }
134
135 pub fn vector(&self) -> Option<&[f32]> {
137 match self {
138 Self::Append { vector, .. } => Some(vector.as_slice()),
139 Self::Delete { .. } => None,
140 }
141 }
142
143 pub fn encode(&self, buf: &mut BytesMut) {
147 match self {
148 Self::Append { id, vector } => {
149 buf.put_u8(POSTING_UPDATE_TYPE_APPEND_BYTE);
150 buf.put_u64_le(*id);
151 for &val in vector {
152 buf.put_f32_le(val);
153 }
154 }
155 Self::Delete { id } => {
156 buf.put_u8(POSTING_UPDATE_TYPE_DELETE_BYTE);
157 buf.put_u64_le(*id);
158 }
159 }
160 }
161
162 pub fn decode(buf: &mut impl Buf, dimensions: usize) -> Result<Self, EncodingError> {
166 let min_size = 1 + 8; if buf.remaining() < min_size {
168 return Err(EncodingError {
169 message: format!(
170 "Buffer too short for PostingUpdate: expected at least {} bytes, got {}",
171 min_size,
172 buf.remaining()
173 ),
174 });
175 }
176
177 let posting_type = buf.get_u8();
178 let id = buf.get_u64_le();
179
180 if posting_type == POSTING_UPDATE_TYPE_APPEND_BYTE {
181 let vector_size = dimensions * 4;
182 if buf.remaining() < vector_size {
183 return Err(EncodingError {
184 message: format!(
185 "Buffer too short for Append PostingUpdate vector: expected {} bytes, got {}",
186 vector_size,
187 buf.remaining()
188 ),
189 });
190 }
191
192 let mut vector = Vec::with_capacity(dimensions);
193 for _ in 0..dimensions {
194 vector.push(buf.get_f32_le());
195 }
196
197 Ok(PostingUpdate::Append { id, vector })
198 } else if posting_type == POSTING_UPDATE_TYPE_DELETE_BYTE {
199 Ok(PostingUpdate::Delete { id })
200 } else {
201 Err(EncodingError {
202 message: format!("Invalid posting type: 0x{:02x}", posting_type),
203 })
204 }
205 }
206
207 pub fn encoded_size_append(dimensions: usize) -> usize {
209 1 + 8 + (dimensions * 4) }
211
212 pub fn encoded_size_delete() -> usize {
214 1 + 8 }
216
217 pub fn encoded_size(&self) -> usize {
219 match self {
220 PostingUpdate::Append { vector, .. } => 1 + 8 + (vector.len() * 4),
221 PostingUpdate::Delete { .. } => 1 + 8,
222 }
223 }
224}
225
226#[derive(Debug, Clone, PartialEq)]
248pub struct PostingListValue {
249 postings: Vec<PostingUpdate>,
251}
252
253impl PostingListValue {
254 pub fn new() -> Self {
256 Self {
257 postings: Vec::new(),
258 }
259 }
260
261 pub fn from_posting_updates(
269 posting_updates: Vec<PostingUpdate>,
270 ) -> Result<Self, EncodingError> {
271 let mut last_by_id = std::collections::HashMap::new();
275 for (idx, update) in posting_updates.iter().enumerate() {
276 last_by_id.insert(update.id(), idx);
277 }
278 let mut deduped: Vec<PostingUpdate> = last_by_id
279 .into_values()
280 .map(|idx| posting_updates[idx].clone())
281 .collect();
282 deduped.sort_by_key(|p| p.id());
283
284 Ok(Self { postings: deduped })
285 }
286
287 pub fn len(&self) -> usize {
289 self.postings.len()
290 }
291
292 pub fn is_empty(&self) -> bool {
294 self.postings.is_empty()
295 }
296
297 pub fn iter(&self) -> impl Iterator<Item = &PostingUpdate> {
299 self.postings.iter()
300 }
301
302 pub fn encode_to_bytes(&self) -> Bytes {
304 if self.postings.is_empty() {
305 return Bytes::new();
306 }
307
308 let total_size: usize = self.postings.iter().map(|p| p.encoded_size()).sum();
309 let mut buf = BytesMut::with_capacity(total_size);
310
311 for posting in &self.postings {
312 posting.encode(&mut buf);
313 }
314
315 buf.freeze()
316 }
317
318 pub fn decode_from_bytes(buf: &[u8], dimensions: usize) -> Result<Self, EncodingError> {
322 if buf.is_empty() {
323 return Ok(PostingListValue::new());
324 }
325
326 let mut buf = buf;
327 let mut postings = Vec::new();
328
329 while buf.has_remaining() {
330 let posting = PostingUpdate::decode(&mut buf, dimensions)?;
331 postings.push(posting);
332 }
333
334 PostingListValue::from_posting_updates(postings)
335 }
336}
337
338impl Default for PostingListValue {
339 fn default() -> Self {
340 Self::new()
341 }
342}
343
344pub(crate) fn merge_decoded_posting_lists(postings: Vec<PostingListValue>) -> PostingListValue {
345 struct E(PostingUpdate, usize, std::vec::IntoIter<PostingUpdate>);
346
347 impl PartialEq<Self> for E {
348 fn eq(&self, other: &Self) -> bool {
349 self.0 == other.0 && self.1 == other.1
350 }
351 }
352 impl Eq for E {}
353
354 impl PartialOrd for E {
355 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
356 Some(self.cmp(other))
357 }
358 }
359
360 impl Ord for E {
361 fn cmp(&self, other: &Self) -> Ordering {
362 match self.0.id().cmp(&other.0.id()) {
363 Ordering::Less => Ordering::Less,
364 Ordering::Equal => self.1.cmp(&other.1),
365 Ordering::Greater => Ordering::Greater,
366 }
367 }
368 }
369
370 let mut heap = BinaryHeap::new();
371 for (i, p) in postings.into_iter().enumerate() {
372 let mut p_iter = p.postings.into_iter();
373 let Some(next) = p_iter.next() else {
374 continue;
375 };
376 heap.push(Reverse(E(next, i, p_iter)));
377 }
378 let mut merged = Vec::new();
379 while let Some(Reverse(E(u, i, mut p_iter))) = heap.pop() {
380 let id = u.id();
381 merged.push(u);
382 if let Some(next) = p_iter.next() {
383 heap.push(Reverse(E(next, i, p_iter)));
384 };
385 loop {
386 if heap.peek().map(|e| e.0.0.id() == id).unwrap_or(false) {
387 let Reverse(E(_, i, mut p_iter)) = heap.pop().unwrap();
388 if let Some(next) = p_iter.next() {
389 heap.push(Reverse(E(next, i, p_iter)));
390 }
391 } else {
392 break;
393 }
394 }
395 }
396 PostingListValue::from_posting_updates(merged).expect("unexpected error")
397}
398
399pub(crate) fn merge_posting_list(
409 mut existing: Bytes,
410 mut new_value: Bytes,
411 dimensions: usize,
412) -> Bytes {
413 let vector_size = dimensions * 4;
414 let append_size = 1 + 8 + vector_size; let delete_size = 1 + 8; let peek_entry = |buf: &[u8]| -> Option<(u64, usize)> {
418 if buf.is_empty() {
419 return None;
420 }
421 assert!(buf.len() >= delete_size);
422 let entry_type = buf[0];
423 let id = u64::from_le_bytes([
424 buf[1], buf[2], buf[3], buf[4], buf[5], buf[6], buf[7], buf[8],
425 ]);
426 let entry_size = if entry_type == POSTING_UPDATE_TYPE_APPEND_BYTE {
427 append_size
428 } else {
429 delete_size
430 };
431 Some((id, entry_size))
432 };
433
434 let total_size = existing.len() + new_value.len();
436 let mut result = BytesMut::with_capacity(total_size);
437 let mut last_id = None;
438
439 loop {
440 let existing_entry = peek_entry(&existing);
441 let new_entry = peek_entry(&new_value);
442
443 let id = match (existing_entry, new_entry) {
444 (None, None) => break,
445 (Some((id, size)), None) => {
446 result.put_slice(&existing[..size]);
448 existing.advance(size);
449 id
450 }
451 (None, Some((id, size))) => {
452 result.put_slice(&new_value[..size]);
454 new_value.advance(size);
455 id
456 }
457 (Some((existing_id, existing_size)), Some((new_id, new_size))) => {
458 if existing_id < new_id {
459 result.put_slice(&existing[..existing_size]);
461 existing.advance(existing_size);
462 existing_id
463 } else if new_id < existing_id {
464 result.put_slice(&new_value[..new_size]);
466 new_value.advance(new_size);
467 new_id
468 } else {
469 result.put_slice(&new_value[..new_size]);
471 new_value.advance(new_size);
472 existing.advance(existing_size);
473 new_id
474 }
475 }
476 };
477 assert!(last_id.is_none_or(|last_id| last_id < id));
478 last_id = Some(id);
479 }
480
481 result.freeze()
482}
483
484pub(crate) fn merge_batch_posting_list(
492 existing: Option<Bytes>,
493 operands: &[Bytes],
494 dimensions: usize,
495) -> Bytes {
496 if operands.is_empty() {
498 return existing.unwrap_or_default();
499 }
500 if operands.len() == 1 && existing.is_none() {
501 return operands[0].clone();
502 }
503
504 let vector_size = dimensions * 4;
505 let append_size = 1 + 8 + vector_size;
506 let delete_size = 1 + 8;
507
508 struct Cursor {
511 buf: Bytes,
512 priority: usize,
513 }
514
515 let peek_entry = |buf: &[u8], append_size: usize, delete_size: usize| -> Option<(u64, usize)> {
516 if buf.is_empty() {
517 return None;
518 }
519 assert!(buf.len() >= delete_size);
520 let entry_type = buf[0];
521 let id = u64::from_le_bytes([
522 buf[1], buf[2], buf[3], buf[4], buf[5], buf[6], buf[7], buf[8],
523 ]);
524 let entry_size = if entry_type == POSTING_UPDATE_TYPE_APPEND_BYTE {
525 append_size
526 } else {
527 delete_size
528 };
529 Some((id, entry_size))
530 };
531
532 #[derive(Eq, PartialEq)]
536 struct HeapEntry {
537 id: u64,
538 priority: usize,
539 cursor_idx: usize,
540 entry_size: usize,
541 }
542
543 impl Ord for HeapEntry {
544 fn cmp(&self, other: &Self) -> Ordering {
545 other
547 .id
548 .cmp(&self.id)
549 .then_with(|| self.priority.cmp(&other.priority))
550 }
551 }
552
553 impl PartialOrd for HeapEntry {
554 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
555 Some(self.cmp(other))
556 }
557 }
558
559 let mut cursors: Vec<Cursor> = Vec::with_capacity(1 + operands.len());
560
561 if let Some(ex) = existing
562 && !ex.is_empty()
563 {
564 cursors.push(Cursor {
565 buf: ex,
566 priority: 0,
567 });
568 }
569 for (i, op) in operands.iter().enumerate() {
570 if !op.is_empty() {
571 cursors.push(Cursor {
572 buf: op.clone(),
573 priority: i + 1,
574 });
575 }
576 }
577
578 if cursors.is_empty() {
579 return Bytes::new();
580 }
581
582 let total_size: usize = cursors.iter().map(|c| c.buf.len()).sum();
583 let mut result = BytesMut::with_capacity(total_size);
584 let mut heap = BinaryHeap::with_capacity(cursors.len());
585
586 for (idx, cursor) in cursors.iter().enumerate() {
588 if let Some((id, entry_size)) = peek_entry(&cursor.buf, append_size, delete_size) {
589 heap.push(HeapEntry {
590 id,
591 priority: cursor.priority,
592 cursor_idx: idx,
593 entry_size,
594 });
595 }
596 }
597
598 while let Some(winner) = heap.pop() {
599 let id = winner.id;
600
601 let cursor = &mut cursors[winner.cursor_idx];
603 result.put_slice(&cursor.buf[..winner.entry_size]);
604 cursor.buf.advance(winner.entry_size);
605
606 if let Some((next_id, next_size)) = peek_entry(&cursor.buf, append_size, delete_size) {
608 heap.push(HeapEntry {
609 id: next_id,
610 priority: cursor.priority,
611 cursor_idx: winner.cursor_idx,
612 entry_size: next_size,
613 });
614 }
615
616 while heap.peek().is_some_and(|e| e.id == id) {
618 let loser = heap.pop().unwrap();
619 let cursor = &mut cursors[loser.cursor_idx];
620 cursor.buf.advance(loser.entry_size);
621
622 if let Some((next_id, next_size)) = peek_entry(&cursor.buf, append_size, delete_size) {
624 heap.push(HeapEntry {
625 id: next_id,
626 priority: cursor.priority,
627 cursor_idx: loser.cursor_idx,
628 entry_size: next_size,
629 });
630 }
631 }
632 }
633
634 result.freeze()
635}
636
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a sorted, id-unique posting list from `updates`.
    fn list_of(updates: Vec<PostingUpdate>) -> PostingListValue {
        PostingListValue::from_posting_updates(updates)
            .expect("unexpected error creating posting updates")
    }

    /// Builds a posting list from `updates` and returns its encoding.
    fn encoded(updates: Vec<PostingUpdate>) -> Bytes {
        list_of(updates).encode_to_bytes()
    }

    /// Decodes `bytes` as a posting list whose vectors have `dimensions` components.
    fn decode_list(bytes: &[u8], dimensions: usize) -> PostingListValue {
        PostingListValue::decode_from_bytes(bytes, dimensions).unwrap()
    }

    /// Ids of the updates in `value`, in stored order.
    fn ids(value: &PostingListValue) -> Vec<u64> {
        value.iter().map(PostingUpdate::id).collect()
    }

    #[test]
    fn should_encode_and_decode_empty_posting_list() {
        let bytes = PostingListValue::new().encode_to_bytes();

        let decoded = decode_list(&bytes, 3);

        assert!(decoded.is_empty());
    }

    #[test]
    fn should_encode_and_decode_posting_list_with_appends() {
        let value = list_of(vec![
            PostingUpdate::append(1, vec![1.0, 2.0, 3.0]),
            PostingUpdate::append(2, vec![4.0, 5.0, 6.0]),
        ]);

        let decoded = decode_list(&value.encode_to_bytes(), 3);

        assert_eq!(ids(&decoded), vec![1, 2]);
        assert!(decoded.postings[0].is_append());
        assert_eq!(decoded.postings[0].vector().unwrap(), &[1.0, 2.0, 3.0]);
        assert!(decoded.postings[1].is_append());
        assert_eq!(decoded.postings[1].vector().unwrap(), &[4.0, 5.0, 6.0]);
        assert_eq!(decoded, value);
    }

    #[test]
    fn should_encode_and_decode_posting_list_with_deletes() {
        let value = list_of(vec![PostingUpdate::delete(1), PostingUpdate::delete(2)]);

        let decoded = decode_list(&value.encode_to_bytes(), 3);

        assert_eq!(ids(&decoded), vec![1, 2]);
        assert!(decoded.postings.iter().all(PostingUpdate::is_delete));
        assert_eq!(decoded, value);
    }

    #[test]
    fn should_create_posting_update_append() {
        let update = PostingUpdate::append(42, vec![1.0, 2.0]);

        assert!(update.is_append());
        assert!(!update.is_delete());
        assert_eq!(update.id(), 42);
        assert_eq!(update.vector().unwrap(), &[1.0, 2.0]);
    }

    #[test]
    fn should_create_posting_update_delete() {
        let update = PostingUpdate::delete(42);

        assert!(!update.is_append());
        assert!(update.is_delete());
        assert_eq!(update.id(), 42);
        assert_eq!(update.vector(), None);
    }

    #[test]
    fn should_encode_and_decode_single_posting_update() {
        let update = PostingUpdate::append(12345, vec![1.5, 2.5, 3.5, 4.5]);
        let mut buf = BytesMut::new();
        update.encode(&mut buf);

        let mut buf_ref: &[u8] = &buf;
        let decoded = PostingUpdate::decode(&mut buf_ref, 4).unwrap();

        assert!(decoded.is_append());
        assert_eq!(decoded.id(), 12345);
        assert_eq!(decoded.vector().unwrap(), &[1.5, 2.5, 3.5, 4.5]);
        // Decoding consumes exactly one encoded update.
        assert!(buf_ref.is_empty());
    }

    #[test]
    fn should_calculate_correct_encoded_size() {
        // type tag (1) + id (8) + 4 bytes per component
        assert_eq!(PostingUpdate::encoded_size_append(3), 1 + 8 + 12);
        assert_eq!(PostingUpdate::encoded_size_append(128), 1 + 8 + 512);
        assert_eq!(PostingUpdate::encoded_size_delete(), 1 + 8);
    }

    #[test]
    fn should_report_encoded_size_matching_encoding() {
        for update in [
            PostingUpdate::append(1, vec![1.0, 2.0, 3.0]),
            PostingUpdate::delete(2),
        ] {
            let mut buf = BytesMut::new();
            update.encode(&mut buf);

            assert_eq!(buf.len(), update.encoded_size());
        }
    }

    #[test]
    fn should_reject_invalid_posting_type() {
        let mut buf = BytesMut::new();
        buf.put_u8(0xFF); // not a known type tag
        buf.put_u64_le(1);
        buf.put_f32_le(1.0);

        let mut buf_ref: &[u8] = &buf;
        let result = PostingUpdate::decode(&mut buf_ref, 1);

        assert!(result.is_err());
        assert!(result.unwrap_err().message.contains("Invalid posting type"));
    }

    #[test]
    fn should_reject_buffer_too_short() {
        // Shorter than the 9-byte header (type tag + id).
        let buf = [0u8; 5];
        let mut buf_ref: &[u8] = &buf;

        let result = PostingUpdate::decode(&mut buf_ref, 3);

        assert!(result.is_err());
        assert!(result.unwrap_err().message.contains("Buffer too short"));
    }

    #[test]
    fn should_reject_truncated_append_vector() {
        let mut buf = BytesMut::new();
        buf.put_u8(POSTING_UPDATE_TYPE_APPEND_BYTE);
        buf.put_u64_le(1);
        buf.put_f32_le(1.0); // only one of the three expected components

        let mut buf_ref: &[u8] = &buf;
        let result = PostingUpdate::decode(&mut buf_ref, 3);

        assert!(result.unwrap_err().message.contains("Buffer too short"));
    }

    #[test]
    fn should_convert_value_to_postings() {
        let value = list_of(vec![
            PostingUpdate::append(1, vec![1.0]),
            PostingUpdate::append(2, vec![2.0]),
            PostingUpdate::append(3, vec![3.0]),
        ]);

        let postings: PostingList = value.into();

        assert_eq!(
            postings,
            vec![
                Posting::new(1, vec![1.0]),
                Posting::new(2, vec![2.0]),
                Posting::new(3, vec![3.0])
            ]
        );
    }

    #[test]
    fn should_drop_deleted_posting_when_convert_value_to_postings() {
        let value = list_of(vec![
            PostingUpdate::append(1, vec![1.0]),
            PostingUpdate::delete(99),
            PostingUpdate::append(2, vec![2.0]),
        ]);

        let postings: PostingList = value.into();

        assert_eq!(
            postings,
            vec![Posting::new(1, vec![1.0]), Posting::new(2, vec![2.0])]
        );
    }

    #[test]
    fn should_merge_posting_list_sorted_by_id() {
        let existing = encoded(vec![
            PostingUpdate::append(1, vec![1.0, 2.0, 3.0]),
            PostingUpdate::append(3, vec![4.0, 5.0, 6.0]),
        ]);
        let new_value = encoded(vec![PostingUpdate::append(2, vec![7.0, 8.0, 9.0])]);

        let merged = merge_posting_list(existing, new_value, 3);

        assert_eq!(ids(&decode_list(&merged, 3)), vec![1, 2, 3]);
    }

    #[test]
    fn should_merge_with_delete_masking_old_append() {
        let existing = encoded(vec![
            PostingUpdate::append(1, vec![1.0, 2.0]),
            PostingUpdate::append(2, vec![3.0, 4.0]),
        ]);
        let new_value = encoded(vec![PostingUpdate::delete(1)]);

        let merged = merge_posting_list(existing, new_value, 2);
        let decoded = decode_list(&merged, 2);

        assert_eq!(ids(&decoded), vec![1, 2]);
        assert!(decoded.postings[0].is_delete());
        assert!(decoded.postings[1].is_append());
    }

    #[test]
    fn should_merge_with_append_masking_old_append() {
        let existing = encoded(vec![
            PostingUpdate::append(1, vec![1.0, 2.0]),
            PostingUpdate::append(2, vec![3.0, 4.0]),
        ]);
        let new_value = encoded(vec![PostingUpdate::append(1, vec![100.0, 200.0])]);

        let merged = merge_posting_list(existing, new_value, 2);
        let decoded = decode_list(&merged, 2);

        assert_eq!(ids(&decoded), vec![1, 2]);
        assert_eq!(decoded.postings[0].vector().unwrap(), &[100.0, 200.0]);
        assert_eq!(decoded.postings[1].vector().unwrap(), &[3.0, 4.0]);
    }

    #[test]
    fn should_merge_empty_existing_with_new() {
        let existing = PostingListValue::new().encode_to_bytes();
        let new_value = encoded(vec![
            PostingUpdate::append(2, vec![3.0, 4.0]),
            PostingUpdate::append(1, vec![1.0, 2.0]),
        ]);

        let merged = merge_posting_list(existing, new_value, 2);

        assert_eq!(ids(&decode_list(&merged, 2)), vec![1, 2]);
    }

    #[test]
    fn should_merge_existing_with_empty_new() {
        let existing = encoded(vec![
            PostingUpdate::append(1, vec![1.0, 2.0]),
            PostingUpdate::append(2, vec![3.0, 4.0]),
        ]);
        let new_value = PostingListValue::new().encode_to_bytes();

        let merged = merge_posting_list(existing, new_value, 2);

        assert_eq!(ids(&decode_list(&merged, 2)), vec![1, 2]);
    }

    #[test]
    fn should_merge_both_empty() {
        let existing = PostingListValue::new().encode_to_bytes();
        let new_value = PostingListValue::new().encode_to_bytes();

        let merged = merge_posting_list(existing, new_value, 2);

        assert!(decode_list(&merged, 2).is_empty());
    }

    #[test]
    fn should_merge_interleaved_ids_in_sorted_order() {
        let existing = encoded(vec![
            PostingUpdate::append(1, vec![1.0]),
            PostingUpdate::append(3, vec![3.0]),
            PostingUpdate::append(5, vec![5.0]),
        ]);
        let new_value = encoded(vec![
            PostingUpdate::append(2, vec![2.0]),
            PostingUpdate::append(4, vec![4.0]),
        ]);

        let merged = merge_posting_list(existing, new_value, 1);

        assert_eq!(ids(&decode_list(&merged, 1)), vec![1, 2, 3, 4, 5]);
    }

    #[test]
    fn should_sort_postings_by_id_in_from_posting_updates() {
        let value = list_of(vec![
            PostingUpdate::append(5, vec![5.0]),
            PostingUpdate::append(1, vec![1.0]),
            PostingUpdate::append(3, vec![3.0]),
            PostingUpdate::append(2, vec![2.0]),
            PostingUpdate::append(4, vec![4.0]),
        ]);

        assert_eq!(ids(&value), vec![1, 2, 3, 4, 5]);
    }

    #[test]
    fn should_keep_last_update_for_repeated_id_in_from_posting_updates() {
        let value = list_of(vec![
            PostingUpdate::append(1, vec![1.0]),
            PostingUpdate::delete(2),
            PostingUpdate::append(1, vec![10.0]),
            PostingUpdate::append(2, vec![2.0]),
        ]);

        assert_eq!(ids(&value), vec![1, 2]);
        assert_eq!(value.postings[0].vector().unwrap(), &[10.0]);
        assert_eq!(value.postings[1].vector().unwrap(), &[2.0]);
    }

    #[test]
    fn should_serialize_in_id_order() {
        let value = list_of(vec![
            PostingUpdate::append(3, vec![3.0, 3.0]),
            PostingUpdate::append(1, vec![1.0, 1.0]),
            PostingUpdate::append(2, vec![2.0, 2.0]),
        ]);

        let decoded = decode_list(&value.encode_to_bytes(), 2);

        assert_eq!(ids(&decoded), vec![1, 2, 3]);
    }

    #[test]
    fn should_maintain_id_order_after_merge() {
        let existing = encoded(vec![
            PostingUpdate::append(10, vec![10.0]),
            PostingUpdate::append(30, vec![30.0]),
            PostingUpdate::append(50, vec![50.0]),
        ]);
        let new_value = encoded(vec![
            PostingUpdate::append(20, vec![20.0]),
            // Overrides the existing update for id 30.
            PostingUpdate::append(30, vec![300.0]),
            PostingUpdate::append(40, vec![40.0]),
        ]);

        let merged = merge_posting_list(existing, new_value, 1);
        let decoded = decode_list(&merged, 1);

        assert_eq!(ids(&decoded), vec![10, 20, 30, 40, 50]);
        assert_eq!(decoded.postings[2].vector().unwrap(), &[300.0]);
    }

    #[test]
    fn should_merge_decoded_posting_lists_from_all_operands() {
        let p0 = list_of(vec![
            PostingUpdate::append(1, vec![1.0]),
            PostingUpdate::append(2, vec![2.0]),
        ]);
        let p1 = list_of(vec![
            PostingUpdate::append(3, vec![3.0]),
            PostingUpdate::append(4, vec![4.0]),
        ]);
        let p2 = list_of(vec![PostingUpdate::append(5, vec![5.0])]);

        let merged = merge_decoded_posting_lists(vec![p0, p1, p2]);

        assert_eq!(ids(&merged), vec![1, 2, 3, 4, 5]);
    }

    #[test]
    fn should_merge_decoded_posting_lists_newer_wins_on_duplicate_key() {
        let p0 = list_of(vec![PostingUpdate::append(2, vec![200.0])]);
        let p1 = list_of(vec![
            PostingUpdate::append(1, vec![1.0]),
            PostingUpdate::append(2, vec![2.0]),
            PostingUpdate::append(3, vec![3.0]),
        ]);

        let merged = merge_decoded_posting_lists(vec![p0, p1]);

        assert_eq!(ids(&merged), vec![1, 2, 3]);
        assert_eq!(merged.postings[1].vector().unwrap(), &[200.0]);
    }

    #[test]
    fn should_merge_decoded_posting_lists_newer_delete_wins_over_older_append() {
        let p0 = list_of(vec![PostingUpdate::delete(2)]);
        let p1 = list_of(vec![
            PostingUpdate::append(1, vec![1.0]),
            PostingUpdate::append(2, vec![2.0]),
            PostingUpdate::append(3, vec![3.0]),
        ]);

        let merged = merge_decoded_posting_lists(vec![p0, p1]);

        assert_eq!(ids(&merged), vec![1, 2, 3]);
        assert!(merged.postings[0].is_append());
        assert!(merged.postings[1].is_delete());
        assert!(merged.postings[2].is_append());
    }

    #[test]
    fn should_merge_decoded_posting_lists_three_way_duplicate() {
        let p0 = list_of(vec![PostingUpdate::append(5, vec![500.0])]);
        let p1 = list_of(vec![PostingUpdate::append(5, vec![50.0])]);
        let p2 = list_of(vec![PostingUpdate::append(5, vec![5.0])]);

        let merged = merge_decoded_posting_lists(vec![p0, p1, p2]);

        assert_eq!(ids(&merged), vec![5]);
        assert_eq!(merged.postings[0].vector().unwrap(), &[500.0]);
    }

    #[test]
    fn should_merge_decoded_posting_lists_with_empty_operand() {
        let p0 = PostingListValue::new();
        let p1 = list_of(vec![
            PostingUpdate::append(1, vec![1.0]),
            PostingUpdate::append(2, vec![2.0]),
        ]);

        let merged = merge_decoded_posting_lists(vec![p0, p1]);

        assert_eq!(ids(&merged), vec![1, 2]);
    }

    #[test]
    fn should_convert_to_posting_list_in_id_order() {
        let value = list_of(vec![
            PostingUpdate::append(5, vec![5.0]),
            PostingUpdate::delete(3),
            PostingUpdate::append(1, vec![1.0]),
            PostingUpdate::append(4, vec![4.0]),
            PostingUpdate::append(2, vec![2.0]),
        ]);

        let posting_list: PostingList = value.into();
        let posting_ids: Vec<u64> = posting_list.iter().map(Posting::id).collect();

        assert_eq!(posting_ids, vec![1, 2, 4, 5]);
    }

    #[test]
    fn should_merge_batch_posting_list_newer_operand_wins() {
        let op0 = encoded(vec![
            PostingUpdate::append(1, vec![10.0]),
            PostingUpdate::append(2, vec![20.0]),
        ]);
        let op1 = encoded(vec![
            PostingUpdate::append(2, vec![200.0]),
            PostingUpdate::append(3, vec![30.0]),
        ]);
        let op2 = encoded(vec![
            PostingUpdate::append(2, vec![2000.0]),
            PostingUpdate::append(4, vec![40.0]),
        ]);

        let merged = merge_batch_posting_list(None, &[op0, op1, op2], 1);
        let decoded = decode_list(&merged, 1);

        assert_eq!(ids(&decoded), vec![1, 2, 3, 4]);
        assert_eq!(decoded.postings[0].vector().unwrap(), &[10.0]);
        assert_eq!(decoded.postings[1].vector().unwrap(), &[2000.0]);
        assert_eq!(decoded.postings[2].vector().unwrap(), &[30.0]);
        assert_eq!(decoded.postings[3].vector().unwrap(), &[40.0]);
    }

    #[test]
    fn should_merge_batch_posting_list_with_existing_value() {
        let existing = encoded(vec![
            PostingUpdate::append(1, vec![1.0, 2.0]),
            PostingUpdate::append(3, vec![3.0, 4.0]),
        ]);
        let op0 = encoded(vec![PostingUpdate::append(2, vec![5.0, 6.0])]);
        let op1 = encoded(vec![PostingUpdate::append(4, vec![7.0, 8.0])]);

        let merged = merge_batch_posting_list(Some(existing), &[op0, op1], 2);

        assert_eq!(ids(&decode_list(&merged, 2)), vec![1, 2, 3, 4]);
    }

    #[test]
    fn should_merge_batch_posting_list_operand_overrides_existing() {
        let existing = encoded(vec![PostingUpdate::append(1, vec![1.0])]);
        let op = encoded(vec![PostingUpdate::delete(1)]);

        let merged = merge_batch_posting_list(Some(existing), &[op], 1);
        let decoded = decode_list(&merged, 1);

        assert_eq!(ids(&decoded), vec![1]);
        assert!(decoded.postings[0].is_delete());
    }

    #[test]
    fn should_merge_batch_posting_list_ignoring_empty_operands() {
        let op = encoded(vec![
            PostingUpdate::append(1, vec![1.0]),
            PostingUpdate::delete(2),
        ]);

        let merged = merge_batch_posting_list(None, &[Bytes::new(), op.clone(), Bytes::new()], 1);

        assert_eq!(merged, op);
    }

    #[test]
    fn should_merge_batch_posting_list_single_operand_no_existing() {
        let op = encoded(vec![
            PostingUpdate::append(1, vec![1.0]),
            PostingUpdate::append(2, vec![2.0]),
        ]);
        let original = op.clone();

        let merged = merge_batch_posting_list(None, &[op], 1);

        assert_eq!(merged, original);
    }

    #[test]
    fn should_merge_batch_posting_list_no_operands_returns_existing() {
        let existing = encoded(vec![
            PostingUpdate::append(1, vec![1.0]),
            PostingUpdate::append(2, vec![2.0]),
        ]);
        let original = existing.clone();

        let merged = merge_batch_posting_list(Some(existing), &[], 1);

        assert_eq!(merged, original);
    }

    #[test]
    fn should_merge_batch_posting_list_delete_in_newer_wins() {
        let op0 = encoded(vec![
            PostingUpdate::append(1, vec![1.0]),
            PostingUpdate::append(2, vec![2.0]),
            PostingUpdate::append(3, vec![3.0]),
        ]);
        let op1 = encoded(vec![PostingUpdate::delete(2)]);

        let merged = merge_batch_posting_list(None, &[op0, op1], 1);
        let decoded = decode_list(&merged, 1);

        assert_eq!(ids(&decoded), vec![1, 2, 3]);
        assert!(decoded.postings[0].is_append());
        assert!(decoded.postings[1].is_delete());
        assert!(decoded.postings[2].is_append());
    }
}