1use std::{
24 fmt,
25 sync::{
26 atomic::{AtomicU32, AtomicUsize, Ordering},
27 Arc,
28 },
29};
30
31use dashmap::{mapref::entry::Entry, DashMap};
32use rustc_hash::{FxBuildHasher, FxHashMap, FxHashSet};
33
/// Seed passed to every XXH3 hash computed in this module, both per-block content
/// hashes and chained prefix hashes.
pub const XXH3_SEED: u64 = 1337;

/// Shard count requested when creating the `(position, content hash)` index map.
const INDEX_SHARD_COUNT: usize = 1024;

/// Shard count requested when creating the worker-name interning map.
const WORKER_SHARD_COUNT: usize = 8;

/// Number of per-worker size counters allocated up front. Interned worker ids are
/// used as indices into that table, so they must stay below this value.
const MAX_WORKERS: usize = 2048;
49
/// Hash of one block's token contents (see [`compute_content_hash`]).
///
/// Together with the block's position it forms the key of the index.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, Ord, PartialOrd)]
pub struct ContentHash(pub u64);
55
/// Hash identifying a block together with the chain of blocks leading up to it.
///
/// Used both for the identifiers carried by [`StoredBlock`] and for the prefix
/// hashes the indexer chains itself.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, Ord, PartialOrd)]
pub struct SequenceHash(pub u64);

impl From<i64> for SequenceHash {
    /// Reinterprets the signed value's bit pattern as an unsigned hash, so
    /// negative inputs map to the upper half of the `u64` range.
    fn from(signed: i64) -> Self {
        let bits = signed as u64;
        SequenceHash(bits)
    }
}

impl From<u64> for SequenceHash {
    /// Wraps the raw hash value unchanged.
    fn from(raw: u64) -> Self {
        SequenceHash(raw)
    }
}
73
/// Integer identifier for a worker.
pub type WorkerId = u32;
78
/// One block of a store event applied through [`PositionalIndexer::apply_stored`].
#[derive(Debug, Clone, Copy)]
pub struct StoredBlock {
    /// Key under which the worker's block map records this block; later removals
    /// refer to the block by this value.
    pub seq_hash: SequenceHash,
    /// Hash of the block's contents; combined with the block's position to form
    /// the index key.
    pub content_hash: ContentHash,
}
87
/// Reasons a store event that names a parent block can be rejected.
#[derive(Debug)]
pub enum ApplyError {
    /// A parent hash was supplied but the worker's block map is empty.
    WorkerNotTracked,
    /// The worker's block map has no entry for the supplied parent hash.
    ParentBlockNotFound,
}

impl fmt::Display for ApplyError {
    /// Writes a fixed, human-readable description of the variant.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let message = match self {
            Self::WorkerNotTracked => "worker not tracked in index",
            Self::ParentBlockNotFound => "parent block hash not found for worker",
        };
        f.write_str(message)
    }
}

impl std::error::Error for ApplyError {}
107
/// Result of a match query, keyed by worker id.
#[derive(Debug, Default)]
pub struct OverlapScores {
    /// Match depth recorded for each worker: the number of leading query blocks it
    /// was found to hold, or 1 for every worker hit when the query ran with early exit.
    pub scores: FxHashMap<u32, u32>,
    /// Value of each scored worker's block counter, read when the query finished.
    pub tree_sizes: FxHashMap<u32, usize>,
}
119
120pub fn compute_content_hash(token_ids: &[u32]) -> ContentHash {
123 use std::hash::Hasher;
124 let mut hasher = xxhash_rust::xxh3::Xxh3::with_seed(XXH3_SEED);
125 for &t in token_ids {
126 hasher.write(&t.to_le_bytes());
127 }
128 ContentHash(hasher.finish())
129}
130
131pub fn compute_request_content_hashes(tokens: &[u32], block_size: usize) -> Vec<ContentHash> {
142 if block_size == 0 {
143 tracing::warn!("compute_request_content_hashes called with block_size=0, returning empty");
144 return Vec::new();
145 }
146 tokens
147 .chunks(block_size)
148 .filter(|chunk| chunk.len() == block_size)
149 .map(compute_content_hash)
150 .collect()
151}
152
/// Workers recorded under one `(position, content hash)` key, grouped by prefix hash.
#[derive(Debug, Clone)]
enum SeqEntry {
    /// Only one prefix hash has been recorded for this key; holds that hash and
    /// its worker set.
    Single(SequenceHash, FxHashSet<u32>),
    /// Used once a second, different prefix hash is inserted; each prefix hash
    /// maps to its own worker set.
    Multi(FxHashMap<SequenceHash, FxHashSet<u32>>),
}
168
169impl SeqEntry {
170 fn new(seq_hash: SequenceHash, worker_id: u32) -> Self {
171 let mut workers = FxHashSet::default();
172 workers.insert(worker_id);
173 Self::Single(seq_hash, workers)
174 }
175
176 fn insert(&mut self, seq_hash: SequenceHash, worker_id: u32) {
178 match self {
179 Self::Single(existing_hash, workers) if *existing_hash == seq_hash => {
180 workers.insert(worker_id);
181 }
182 Self::Single(existing_hash, existing_workers) => {
183 let mut map = FxHashMap::with_capacity_and_hasher(2, FxBuildHasher);
184 map.insert(*existing_hash, std::mem::take(existing_workers));
185 map.entry(seq_hash).or_default().insert(worker_id);
186 *self = Self::Multi(map);
187 }
188 Self::Multi(map) => {
189 map.entry(seq_hash).or_default().insert(worker_id);
190 }
191 }
192 }
193
194 fn remove(&mut self, seq_hash: SequenceHash, worker_id: u32) -> bool {
197 match self {
198 Self::Single(existing_hash, workers) if *existing_hash == seq_hash => {
199 workers.remove(&worker_id);
200 workers.is_empty()
201 }
202 Self::Single(_, _) => false,
203 Self::Multi(map) => {
204 if let Some(workers) = map.get_mut(&seq_hash) {
205 workers.remove(&worker_id);
206 if workers.is_empty() {
207 map.remove(&seq_hash);
208 }
209 }
210 map.is_empty()
211 }
212 }
213 }
214
215 fn get(&self, seq_hash: SequenceHash) -> Option<&FxHashSet<u32>> {
217 match self {
218 Self::Single(existing_hash, workers) if *existing_hash == seq_hash => Some(workers),
219 Self::Single(_, _) => None,
220 Self::Multi(map) => map.get(&seq_hash),
221 }
222 }
223
224 #[inline]
230 fn workers_if_single(&self) -> Option<&FxHashSet<u32>> {
231 match self {
232 Self::Single(_, workers) => Some(workers),
233 Self::Multi(_) => None,
234 }
235 }
236}
237
/// Per-worker bookkeeping: a block's `seq_hash` mapped to the
/// `(position, content hash, prefix hash)` triple it was indexed under.
pub type WorkerBlockMap = FxHashMap<SequenceHash, (usize, ContentHash, SequenceHash)>;
247
/// Index from block position and content to the workers that hold that block.
pub struct PositionalIndexer {
    /// `(position, content hash)` -> workers, grouped by prefix hash.
    index: DashMap<(usize, ContentHash), SeqEntry, FxBuildHasher>,
    /// Per-worker block counters, indexed by worker id (`MAX_WORKERS` slots).
    tree_sizes: Vec<AtomicUsize>,
    /// Worker name -> interned id.
    worker_to_id: DashMap<Arc<str>, u32, FxBuildHasher>,
    /// Next id to hand out when a new worker name is interned.
    next_worker_id: AtomicU32,
    /// Stride used when probing ahead during a match query; must be non-zero.
    jump_size: usize,
}
272
273impl PositionalIndexer {
274 pub fn new(jump_size: usize) -> Self {
280 assert!(jump_size > 0, "jump_size must be greater than 0");
281 Self {
282 index: DashMap::with_hasher_and_shard_amount(FxBuildHasher, INDEX_SHARD_COUNT),
283 tree_sizes: (0..MAX_WORKERS).map(|_| AtomicUsize::new(0)).collect(),
284 worker_to_id: DashMap::with_hasher_and_shard_amount(FxBuildHasher, WORKER_SHARD_COUNT),
285 next_worker_id: AtomicU32::new(0),
286 jump_size,
287 }
288 }
289
290 pub fn worker_id(&self, worker: &str) -> Option<u32> {
295 self.worker_to_id.get(worker).map(|entry| *entry.value())
296 }
297
298 pub fn apply_stored(
306 &self,
307 worker_id: u32,
308 blocks: &[StoredBlock],
309 parent_seq_hash: Option<SequenceHash>,
310 worker_blocks: &mut WorkerBlockMap,
311 ) -> Result<(), ApplyError> {
312 if blocks.is_empty() {
313 return Ok(());
314 }
315
316 let (start_pos, parent_prefix) = match parent_seq_hash {
318 Some(parent_hash) => {
319 if worker_blocks.is_empty() {
320 return Err(ApplyError::WorkerNotTracked);
321 }
322 let Some(&(parent_pos, _, parent_pfx)) = worker_blocks.get(&parent_hash) else {
323 return Err(ApplyError::ParentBlockNotFound);
324 };
325 (parent_pos + 1, Some(parent_pfx))
326 }
327 None => (0, None),
328 };
329
330 let mut prev_prefix = parent_prefix;
331 let mut num_new_blocks = 0usize;
332 for (i, block) in blocks.iter().enumerate() {
333 let position = start_pos + i;
334 let content_hash = block.content_hash;
335
336 let prefix_hash = match prev_prefix {
339 Some(prev) => SequenceHash(Self::compute_next_seq_hash(prev.0, content_hash.0)),
340 None => SequenceHash(content_hash.0),
342 };
343
344 self.index
345 .entry((position, content_hash))
346 .and_modify(|entry| entry.insert(prefix_hash, worker_id))
347 .or_insert_with(|| SeqEntry::new(prefix_hash, worker_id));
348
349 if worker_blocks
352 .insert(block.seq_hash, (position, content_hash, prefix_hash))
353 .is_none()
354 {
355 num_new_blocks += 1;
356 }
357 prev_prefix = Some(prefix_hash);
358 }
359
360 if num_new_blocks > 0 {
362 self.tree_sizes[worker_id as usize].fetch_add(num_new_blocks, Ordering::Relaxed);
363 }
364
365 Ok(())
366 }
367
368 pub fn apply_removed(
381 &self,
382 worker_id: u32,
383 seq_hashes: &[SequenceHash],
384 worker_blocks: &mut WorkerBlockMap,
385 ) {
386 let mut num_removed = 0usize;
387 for &seq_hash in seq_hashes {
388 let Some((position, content_hash, prefix_hash)) = worker_blocks.remove(&seq_hash)
389 else {
390 continue;
391 };
392
393 if let Entry::Occupied(mut occupied) = self.index.entry((position, content_hash)) {
394 if occupied.get_mut().remove(prefix_hash, worker_id) {
395 occupied.remove();
396 }
397 }
398 num_removed += 1;
399 }
400
401 if num_removed > 0 {
402 self.tree_sizes[worker_id as usize].fetch_sub(num_removed, Ordering::Relaxed);
403 }
404 }
405
406 pub fn apply_cleared(&self, worker_id: u32, worker_blocks: &mut WorkerBlockMap) {
411 let drained = std::mem::take(worker_blocks);
412 for &(position, content_hash, prefix_hash) in drained.values() {
413 if let Entry::Occupied(mut occ) = self.index.entry((position, content_hash)) {
414 if occ.get_mut().remove(prefix_hash, worker_id) {
415 occ.remove();
416 }
417 }
418 }
419 self.tree_sizes[worker_id as usize].store(0, Ordering::Relaxed);
420 }
421
422 pub fn remove_worker(&self, worker_id: u32, worker_blocks: WorkerBlockMap) {
427 for &(position, content_hash, prefix_hash) in worker_blocks.values() {
428 if let Entry::Occupied(mut occ) = self.index.entry((position, content_hash)) {
429 if occ.get_mut().remove(prefix_hash, worker_id) {
430 occ.remove();
431 }
432 }
433 }
434 self.tree_sizes[worker_id as usize].store(0, Ordering::Relaxed);
435 }
436
437 pub fn current_size(&self) -> usize {
439 let n = self.next_worker_id.load(Ordering::Relaxed) as usize;
440 self.tree_sizes[..n]
441 .iter()
442 .map(|size| size.load(Ordering::Relaxed))
443 .sum()
444 }
445
446 pub fn find_matches(&self, content_hashes: &[ContentHash], early_exit: bool) -> OverlapScores {
462 self.jump_search_matches(content_hashes, early_exit)
463 }
464
465 #[inline]
477 fn compute_next_seq_hash(prev_seq_hash: u64, current_content_hash: u64) -> u64 {
478 let mut bytes = [0u8; 16];
479 bytes[..8].copy_from_slice(&prev_seq_hash.to_le_bytes());
480 bytes[8..].copy_from_slice(¤t_content_hash.to_le_bytes());
481 xxhash_rust::xxh3::xxh3_64_with_seed(&bytes, XXH3_SEED)
482 }
483
484 #[inline]
486 fn ensure_seq_hash_computed(
487 seq_hashes: &mut Vec<SequenceHash>,
488 target_pos: usize,
489 sequence: &[ContentHash],
490 ) {
491 while seq_hashes.len() <= target_pos {
492 let pos = seq_hashes.len();
493 if pos == 0 {
494 seq_hashes.push(SequenceHash(sequence[0].0));
495 } else {
496 let prev = seq_hashes[pos - 1].0;
497 let current = sequence[pos].0;
498 seq_hashes.push(SequenceHash(Self::compute_next_seq_hash(prev, current)));
499 }
500 }
501 }
502
503 pub fn intern_worker(&self, worker: &str) -> u32 {
510 if let Some(entry) = self.worker_to_id.get(worker) {
512 return *entry.value();
513 }
514 let id = *self
516 .worker_to_id
517 .entry(Arc::from(worker))
518 .or_insert_with(|| self.next_worker_id.fetch_add(1, Ordering::Relaxed))
519 .value();
520 assert!(
521 (id as usize) < MAX_WORKERS,
522 "worker count {id} exceeds MAX_WORKERS ({MAX_WORKERS})"
523 );
524 id
525 }
526
527 fn get_workers_lazy(
535 index: &DashMap<(usize, ContentHash), SeqEntry, FxBuildHasher>,
536 position: usize,
537 content_hash: ContentHash,
538 seq_hashes: &mut Vec<SequenceHash>,
539 sequence: &[ContentHash],
540 ) -> Option<Vec<u32>> {
541 let entry = index.get(&(position, content_hash))?;
542 if let Some(workers) = entry.value().workers_if_single() {
543 return Some(workers.iter().copied().collect());
544 }
545 Self::ensure_seq_hash_computed(seq_hashes, position, sequence);
547 entry
548 .value()
549 .get(seq_hashes[position])
550 .map(|workers| workers.iter().copied().collect())
551 }
552
553 fn count_workers_at(
556 index: &DashMap<(usize, ContentHash), SeqEntry, FxBuildHasher>,
557 position: usize,
558 content_hash: ContentHash,
559 seq_hashes: &mut Vec<SequenceHash>,
560 sequence: &[ContentHash],
561 ) -> usize {
562 let Some(entry) = index.get(&(position, content_hash)) else {
563 return 0;
564 };
565 if let Some(workers) = entry.value().workers_if_single() {
566 return workers.len();
567 }
568 Self::ensure_seq_hash_computed(seq_hashes, position, sequence);
570 entry
571 .get(seq_hashes[position])
572 .map(|workers| workers.len())
573 .unwrap_or(0)
574 }
575
576 #[expect(clippy::too_many_arguments)]
582 fn linear_scan_drain(
583 index: &DashMap<(usize, ContentHash), SeqEntry, FxBuildHasher>,
584 sequence: &[ContentHash],
585 seq_hashes: &mut Vec<SequenceHash>,
586 active: &mut Vec<u32>,
587 internal_scores: &mut FxHashMap<u32, u32>,
588 lo: usize,
589 hi: usize,
590 early_exit: bool,
591 ) {
592 for (offset, &content_hash) in sequence[lo..hi].iter().enumerate() {
593 if active.is_empty() {
594 break;
595 }
596 let pos = lo + offset;
597
598 let Some(entry) = index.get(&(pos, content_hash)) else {
599 for &w in active.iter() {
600 internal_scores.insert(w, pos as u32);
601 }
602 active.clear();
603 break;
604 };
605
606 if let Some(workers) = entry.value().workers_if_single() {
608 if workers.len() < active.len() {
612 let mut i = 0;
613 while i < active.len() {
614 if workers.contains(&active[i]) {
615 i += 1;
616 } else {
617 internal_scores.insert(active[i], pos as u32);
618 active.swap_remove(i);
619 }
620 }
621 }
622 if early_exit && !active.is_empty() {
623 break;
624 }
625 continue;
626 }
627
628 Self::ensure_seq_hash_computed(seq_hashes, pos, sequence);
630 let seq_hash = seq_hashes[pos];
631
632 let Some(workers) = entry.get(seq_hash) else {
633 for &w in active.iter() {
634 internal_scores.insert(w, pos as u32);
635 }
636 active.clear();
637 break;
638 };
639
640 if workers.len() < active.len() {
642 let mut i = 0;
643 while i < active.len() {
644 if workers.contains(&active[i]) {
645 i += 1;
646 } else {
647 internal_scores.insert(active[i], pos as u32);
648 active.swap_remove(i);
649 }
650 }
651 }
652
653 if early_exit && !active.is_empty() {
654 break;
655 }
656 }
657 }
658
659 fn jump_search_matches(
660 &self,
661 content_hashes: &[ContentHash],
662 early_exit: bool,
663 ) -> OverlapScores {
664 let mut scores = OverlapScores::default();
665
666 if content_hashes.is_empty() {
667 return scores;
668 }
669
670 let mut seq_hashes = Vec::with_capacity(content_hashes.len());
671
672 let Some(initial_workers) = Self::get_workers_lazy(
673 &self.index,
674 0,
675 content_hashes[0],
676 &mut seq_hashes,
677 content_hashes,
678 ) else {
679 return scores;
680 };
681
682 let mut active = initial_workers;
683 if active.is_empty() {
684 return scores;
685 }
686
687 let len = content_hashes.len();
688 let mut internal_scores: FxHashMap<u32, u32> = FxHashMap::default();
689
690 if early_exit {
692 for &w in &active {
693 internal_scores.insert(w, 1);
694 }
695 scores.scores = internal_scores;
696 for &int_id in scores.scores.keys() {
697 scores.tree_sizes.insert(
698 int_id,
699 self.tree_sizes[int_id as usize].load(Ordering::Relaxed),
700 );
701 }
702 return scores;
703 }
704
705 let mut current_pos = 0;
706
707 while current_pos < len - 1 && !active.is_empty() {
708 let next_pos = (current_pos + self.jump_size).min(len - 1);
709
710 let count = Self::count_workers_at(
711 &self.index,
712 next_pos,
713 content_hashes[next_pos],
714 &mut seq_hashes,
715 content_hashes,
716 );
717
718 if count == active.len() {
721 current_pos = next_pos;
722 } else {
723 Self::linear_scan_drain(
724 &self.index,
725 content_hashes,
726 &mut seq_hashes,
727 &mut active,
728 &mut internal_scores,
729 current_pos + 1,
730 next_pos + 1,
731 false,
732 );
733 current_pos = next_pos;
734 }
735 }
736
737 let final_score = len as u32;
738 for &w in &active {
739 internal_scores.insert(w, final_score);
740 }
741
742 scores.scores = internal_scores;
743
744 for &int_id in scores.scores.keys() {
746 scores.tree_sizes.insert(
747 int_id,
748 self.tree_sizes[int_id as usize].load(Ordering::Relaxed),
749 );
750 }
751
752 scores
753 }
754}
755
756impl Default for PositionalIndexer {
757 fn default() -> Self {
758 Self::new(32)
759 }
760}
761
762impl fmt::Debug for PositionalIndexer {
763 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
764 f.debug_struct("PositionalIndexer")
765 .field("entries", &self.index.len())
766 .field("jump_size", &self.jump_size)
767 .field("workers", &self.next_worker_id.load(Ordering::Relaxed))
768 .finish()
769 }
770}
771
772#[cfg(test)]
773mod tests {
774 use super::*;
775
776 fn make_blocks(content_hashes: &[u64]) -> Vec<StoredBlock> {
778 let mut blocks = Vec::new();
780 let mut prev_seq: u64 = 0;
781 for (i, &ch) in content_hashes.iter().enumerate() {
782 let seq = if i == 0 {
783 ch
784 } else {
785 PositionalIndexer::compute_next_seq_hash(prev_seq, ch)
786 };
787 prev_seq = seq;
788 blocks.push(StoredBlock {
789 seq_hash: SequenceHash(seq),
790 content_hash: ContentHash(ch),
791 });
792 }
793 blocks
794 }
795
796 fn hashes(values: &[u64]) -> Vec<ContentHash> {
798 values.iter().map(|&v| ContentHash(v)).collect()
799 }
800
801 #[test]
802 fn test_new_indexer_is_empty() {
803 let indexer = PositionalIndexer::default();
804 let scores = indexer.find_matches(&hashes(&[1, 2, 3]), false);
805 assert!(scores.scores.is_empty());
806 assert_eq!(indexer.current_size(), 0);
807 }
808
809 #[test]
810 fn test_store_and_find_single_worker() {
811 let indexer = PositionalIndexer::new(64);
812 let blocks = make_blocks(&[10, 20, 30]);
813 let w1 = indexer.intern_worker("http://w1:8000");
814 let mut wb1 = WorkerBlockMap::default();
815 indexer.apply_stored(w1, &blocks, None, &mut wb1).unwrap();
816
817 let scores = indexer.find_matches(&hashes(&[10, 20, 30]), false);
818 assert_eq!(scores.scores.get(&w1), Some(&3));
819 assert_eq!(scores.tree_sizes.get(&w1), Some(&3));
820 }
821
822 #[test]
823 fn test_store_partial_prefix_match() {
824 let indexer = PositionalIndexer::new(64);
825 let blocks = make_blocks(&[10, 20, 30]);
826 let w1 = indexer.intern_worker("http://w1:8000");
827 let mut wb1 = WorkerBlockMap::default();
828 indexer.apply_stored(w1, &blocks, None, &mut wb1).unwrap();
829
830 let scores = indexer.find_matches(&hashes(&[10, 20, 30, 40, 50]), false);
832 assert_eq!(scores.scores.get(&w1), Some(&3));
833 }
834
835 #[test]
836 fn test_store_no_match() {
837 let indexer = PositionalIndexer::new(64);
838 let blocks = make_blocks(&[10, 20, 30]);
839 let w1 = indexer.intern_worker("http://w1:8000");
840 let mut wb1 = WorkerBlockMap::default();
841 indexer.apply_stored(w1, &blocks, None, &mut wb1).unwrap();
842
843 let scores = indexer.find_matches(&hashes(&[99, 88, 77]), false);
844 assert!(scores.scores.is_empty());
845 }
846
847 #[test]
848 fn test_two_workers_different_depths() {
849 let indexer = PositionalIndexer::new(64);
850 let blocks_w1 = make_blocks(&[10, 20, 30]);
851 let blocks_w2 = make_blocks(&[10, 20]);
852 let w1 = indexer.intern_worker("http://w1:8000");
853 let w2 = indexer.intern_worker("http://w2:8000");
854 let mut wb1 = WorkerBlockMap::default();
855 let mut wb2 = WorkerBlockMap::default();
856 indexer
857 .apply_stored(w1, &blocks_w1, None, &mut wb1)
858 .unwrap();
859 indexer
860 .apply_stored(w2, &blocks_w2, None, &mut wb2)
861 .unwrap();
862
863 let scores = indexer.find_matches(&hashes(&[10, 20, 30, 40]), false);
864 assert_eq!(scores.scores.get(&w1), Some(&3));
865 assert_eq!(scores.scores.get(&w2), Some(&2));
866 }
867
868 #[test]
869 fn test_remove_blocks() {
870 let indexer = PositionalIndexer::new(64);
871 let blocks = make_blocks(&[10, 20, 30]);
872 let seq_hash_of_30 = blocks[2].seq_hash;
873 let w1 = indexer.intern_worker("http://w1:8000");
874 let mut wb1 = WorkerBlockMap::default();
875 indexer.apply_stored(w1, &blocks, None, &mut wb1).unwrap();
876 indexer.apply_removed(w1, &[seq_hash_of_30], &mut wb1);
877
878 let scores = indexer.find_matches(&hashes(&[10, 20, 30]), false);
880 assert_eq!(scores.scores.get(&w1), Some(&2));
881 assert_eq!(scores.tree_sizes.get(&w1), Some(&2));
882 }
883
884 #[test]
885 fn test_clear_worker() {
886 let indexer = PositionalIndexer::new(64);
887 let blocks_w1 = make_blocks(&[10, 20, 30]);
888 let blocks_w2 = make_blocks(&[10, 20]);
889 let w1 = indexer.intern_worker("http://w1:8000");
890 let w2 = indexer.intern_worker("http://w2:8000");
891 let mut wb1 = WorkerBlockMap::default();
892 let mut wb2 = WorkerBlockMap::default();
893 indexer
894 .apply_stored(w1, &blocks_w1, None, &mut wb1)
895 .unwrap();
896 indexer
897 .apply_stored(w2, &blocks_w2, None, &mut wb2)
898 .unwrap();
899
900 indexer.apply_cleared(w1, &mut wb1);
901
902 let scores = indexer.find_matches(&hashes(&[10, 20, 30]), false);
903 assert!(!scores.scores.contains_key(&w1));
904 assert_eq!(scores.scores.get(&w2), Some(&2));
905 }
906
907 #[test]
908 fn test_tree_sizes() {
909 let indexer = PositionalIndexer::new(64);
910 let blocks_w1 = make_blocks(&[10, 20, 30]);
911 let blocks_w2 = make_blocks(&[10, 20]);
912 let w1 = indexer.intern_worker("http://w1:8000");
913 let w2 = indexer.intern_worker("http://w2:8000");
914 let mut wb1 = WorkerBlockMap::default();
915 let mut wb2 = WorkerBlockMap::default();
916 indexer
917 .apply_stored(w1, &blocks_w1, None, &mut wb1)
918 .unwrap();
919 indexer
920 .apply_stored(w2, &blocks_w2, None, &mut wb2)
921 .unwrap();
922
923 let scores = indexer.find_matches(&hashes(&[10]), false);
924 assert_eq!(scores.tree_sizes.get(&w1), Some(&3));
925 assert_eq!(scores.tree_sizes.get(&w2), Some(&2));
926 }
927
928 #[test]
929 fn test_store_with_parent_hash() {
930 let indexer = PositionalIndexer::new(64);
931 let blocks1 = make_blocks(&[10, 20]);
933 let parent_seq_hash = blocks1[1].seq_hash;
934 let w1 = indexer.intern_worker("http://w1:8000");
935 let mut wb1 = WorkerBlockMap::default();
936 indexer.apply_stored(w1, &blocks1, None, &mut wb1).unwrap();
937
938 let blocks2 = vec![
940 StoredBlock {
941 seq_hash: SequenceHash(300),
942 content_hash: ContentHash(30),
943 },
944 StoredBlock {
945 seq_hash: SequenceHash(400),
946 content_hash: ContentHash(40),
947 },
948 ];
949 indexer
950 .apply_stored(w1, &blocks2, Some(parent_seq_hash), &mut wb1)
951 .unwrap();
952
953 let scores = indexer.find_matches(&hashes(&[10, 20, 30, 40]), false);
954 assert_eq!(scores.scores.get(&w1), Some(&4));
955 assert_eq!(scores.tree_sizes.get(&w1), Some(&4));
956 }
957
958 #[test]
959 fn test_store_with_parent_error_worker_not_tracked() {
960 let indexer = PositionalIndexer::new(64);
961 let blocks = make_blocks(&[10, 20]);
962 let w1 = indexer.intern_worker("http://w1:8000");
963 let mut wb1 = WorkerBlockMap::default();
964 let result = indexer.apply_stored(w1, &blocks, Some(SequenceHash(999)), &mut wb1);
965 assert!(matches!(result, Err(ApplyError::WorkerNotTracked)));
966 }
967
968 #[test]
969 fn test_store_with_parent_error_parent_not_found() {
970 let indexer = PositionalIndexer::new(64);
971 let blocks1 = make_blocks(&[10, 20]);
972 let w1 = indexer.intern_worker("http://w1:8000");
973 let mut wb1 = WorkerBlockMap::default();
974 indexer.apply_stored(w1, &blocks1, None, &mut wb1).unwrap();
975
976 let blocks2 = make_blocks(&[30]);
977 let result = indexer.apply_stored(w1, &blocks2, Some(SequenceHash(999_999)), &mut wb1);
978 assert!(matches!(result, Err(ApplyError::ParentBlockNotFound)));
979 }
980
981 #[test]
982 fn test_remove_missing_block_is_noop() {
983 let indexer = PositionalIndexer::new(64);
984 let blocks = make_blocks(&[10, 20, 30]);
985 let w1 = indexer.intern_worker("http://w1:8000");
986 let mut wb1 = WorkerBlockMap::default();
987 indexer.apply_stored(w1, &blocks, None, &mut wb1).unwrap();
988
989 indexer.apply_removed(w1, &[SequenceHash(999)], &mut wb1);
990 assert_eq!(indexer.current_size(), 3);
991 }
992
993 #[test]
994 fn test_remove_unknown_worker_is_noop() {
995 let indexer = PositionalIndexer::new(64);
996 let w1 = indexer.intern_worker("http://unknown:8000");
997 let mut wb1 = WorkerBlockMap::default();
998 indexer.apply_removed(w1, &[SequenceHash(1)], &mut wb1);
999 }
1000
1001 #[test]
1002 fn test_remove_worker() {
1003 let indexer = PositionalIndexer::new(64);
1004 let blocks = make_blocks(&[10, 20, 30]);
1005 let w1 = indexer.intern_worker("http://w1:8000");
1006 let mut wb1 = WorkerBlockMap::default();
1007 indexer.apply_stored(w1, &blocks, None, &mut wb1).unwrap();
1008 indexer.remove_worker(w1, wb1);
1009
1010 let scores = indexer.find_matches(&hashes(&[10, 20, 30]), false);
1011 assert!(scores.scores.is_empty());
1012 assert_eq!(indexer.current_size(), 0);
1013 }
1014
1015 #[test]
1016 fn test_multiple_workers_same_position() {
1017 let indexer = PositionalIndexer::new(64);
1018 let w1 = indexer.intern_worker("http://w1:8000");
1019 let w2 = indexer.intern_worker("http://w2:8000");
1020 let w3 = indexer.intern_worker("http://w3:8000");
1021 let mut wb1 = WorkerBlockMap::default();
1022 let mut wb2 = WorkerBlockMap::default();
1023 let mut wb3 = WorkerBlockMap::default();
1024 indexer
1025 .apply_stored(w1, &make_blocks(&[10]), None, &mut wb1)
1026 .unwrap();
1027 indexer
1028 .apply_stored(w2, &make_blocks(&[10]), None, &mut wb2)
1029 .unwrap();
1030 indexer
1031 .apply_stored(w3, &make_blocks(&[10]), None, &mut wb3)
1032 .unwrap();
1033
1034 let scores = indexer.find_matches(&hashes(&[10]), false);
1035 assert_eq!(scores.scores.get(&w1), Some(&1));
1036 assert_eq!(scores.scores.get(&w2), Some(&1));
1037 assert_eq!(scores.scores.get(&w3), Some(&1));
1038 }
1039
1040 #[test]
1041 fn test_empty_blocks_is_noop() {
1042 let indexer = PositionalIndexer::new(64);
1043 let w1 = indexer.intern_worker("http://w1:8000");
1044 let mut wb1 = WorkerBlockMap::default();
1045 indexer.apply_stored(w1, &[], None, &mut wb1).unwrap();
1046 assert_eq!(indexer.current_size(), 0);
1047 }
1048
1049 #[test]
1050 fn test_single_block_sequence() {
1051 let indexer = PositionalIndexer::new(64);
1052 let blocks = make_blocks(&[42]);
1053 let w1 = indexer.intern_worker("http://w1:8000");
1054 let mut wb1 = WorkerBlockMap::default();
1055 indexer.apply_stored(w1, &blocks, None, &mut wb1).unwrap();
1056
1057 let scores = indexer.find_matches(&hashes(&[42]), false);
1058 assert_eq!(scores.scores.get(&w1), Some(&1));
1059 }
1060
1061 #[test]
1062 fn test_request_content_hash_chunking() {
1063 let hashes = compute_request_content_hashes(&[1, 2, 3, 4, 5, 6, 7, 8], 4);
1064 assert_eq!(hashes.len(), 2);
1065 assert_eq!(hashes[0], compute_content_hash(&[1, 2, 3, 4]));
1066 assert_eq!(hashes[1], compute_content_hash(&[5, 6, 7, 8]));
1067 }
1068
1069 #[test]
1070 fn test_request_content_hash_zero_block_size() {
1071 let hashes = compute_request_content_hashes(&[1, 2, 3], 0);
1072 assert!(hashes.is_empty());
1073 }
1074
1075 #[test]
1080 fn test_jump_search_long_prefix() {
1081 let indexer = PositionalIndexer::new(4); let values: Vec<u64> = (1..=20).collect();
1083 let blocks = make_blocks(&values);
1084 let w1 = indexer.intern_worker("http://w1:8000");
1085 let mut wb1 = WorkerBlockMap::default();
1086 indexer.apply_stored(w1, &blocks, None, &mut wb1).unwrap();
1087
1088 let scores = indexer.find_matches(&hashes(&values), false);
1089 assert_eq!(scores.scores.get(&w1), Some(&20));
1090 }
1091
1092 #[test]
1093 fn test_jump_search_worker_drains_mid_jump() {
1094 let indexer = PositionalIndexer::new(4);
1095 let values_w1: Vec<u64> = (1..=10).collect();
1097 let values_w2: Vec<u64> = (1..=6).collect();
1098 let w1 = indexer.intern_worker("http://w1:8000");
1099 let w2 = indexer.intern_worker("http://w2:8000");
1100 let mut wb1 = WorkerBlockMap::default();
1101 let mut wb2 = WorkerBlockMap::default();
1102 indexer
1103 .apply_stored(w1, &make_blocks(&values_w1), None, &mut wb1)
1104 .unwrap();
1105 indexer
1106 .apply_stored(w2, &make_blocks(&values_w2), None, &mut wb2)
1107 .unwrap();
1108
1109 let query: Vec<u64> = (1..=10).collect();
1110 let scores = indexer.find_matches(&hashes(&query), false);
1111 assert_eq!(scores.scores.get(&w1), Some(&10));
1112 assert_eq!(scores.scores.get(&w2), Some(&6));
1113 }
1114
1115 #[test]
1116 fn test_jump_search_multiple_drains() {
1117 let indexer = PositionalIndexer::new(3);
1118 let v1: Vec<u64> = (1..=12).collect();
1120 let v2: Vec<u64> = (1..=7).collect();
1121 let v3: Vec<u64> = (1..=4).collect();
1122 let w1 = indexer.intern_worker("http://w1:8000");
1123 let w2 = indexer.intern_worker("http://w2:8000");
1124 let w3 = indexer.intern_worker("http://w3:8000");
1125 let mut wb1 = WorkerBlockMap::default();
1126 let mut wb2 = WorkerBlockMap::default();
1127 let mut wb3 = WorkerBlockMap::default();
1128 indexer
1129 .apply_stored(w1, &make_blocks(&v1), None, &mut wb1)
1130 .unwrap();
1131 indexer
1132 .apply_stored(w2, &make_blocks(&v2), None, &mut wb2)
1133 .unwrap();
1134 indexer
1135 .apply_stored(w3, &make_blocks(&v3), None, &mut wb3)
1136 .unwrap();
1137
1138 let query: Vec<u64> = (1..=12).collect();
1139 let scores = indexer.find_matches(&hashes(&query), false);
1140 assert_eq!(scores.scores.get(&w1), Some(&12));
1141 assert_eq!(scores.scores.get(&w2), Some(&7));
1142 assert_eq!(scores.scores.get(&w3), Some(&4));
1143 }
1144
1145 #[test]
1146 fn test_concurrent_store_and_match() {
1147 use std::{sync::Arc, thread};
1148
1149 let indexer = Arc::new(PositionalIndexer::new(64));
1150 let indexer_writer = Arc::clone(&indexer);
1151
1152 let writer = thread::spawn(move || {
1153 for i in 0..100u64 {
1154 let blocks = make_blocks(&[i * 10, i * 10 + 1, i * 10 + 2]);
1155 let wid = indexer_writer.intern_worker(&format!("http://w{i}:8000"));
1156 let mut wb = WorkerBlockMap::default();
1157 let _ = indexer_writer.apply_stored(wid, &blocks, None, &mut wb);
1158 }
1159 });
1160
1161 let reader = thread::spawn({
1162 let indexer = Arc::clone(&indexer);
1163 move || {
1164 for _ in 0..1000 {
1165 let _ = indexer.find_matches(&hashes(&[0, 1, 2, 3, 4]), false);
1166 }
1167 }
1168 });
1169
1170 writer.join().unwrap();
1171 reader.join().unwrap();
1172 }
1173
1174 #[test]
1175 fn test_seq_entry_single_to_multi_upgrade() {
1176 let indexer = PositionalIndexer::new(64);
1177
1178 let blocks_w1 = vec![StoredBlock {
1181 seq_hash: SequenceHash(100),
1182 content_hash: ContentHash(10),
1183 }];
1184 let w1 = indexer.intern_worker("http://w1:8000");
1185 let w2 = indexer.intern_worker("http://w2:8000");
1186 let mut wb1 = WorkerBlockMap::default();
1187 let mut wb2 = WorkerBlockMap::default();
1188 indexer
1189 .apply_stored(w1, &blocks_w1, None, &mut wb1)
1190 .unwrap();
1191
1192 let blocks_w2 = vec![StoredBlock {
1196 seq_hash: SequenceHash(200),
1197 content_hash: ContentHash(10),
1198 }];
1199 indexer
1200 .apply_stored(w2, &blocks_w2, None, &mut wb2)
1201 .unwrap();
1202
1203 let scores = indexer.find_matches(&hashes(&[10]), false);
1204 assert_eq!(scores.scores.get(&w1), Some(&1));
1205 assert_eq!(scores.scores.get(&w2), Some(&1));
1206 }
1207
1208 #[test]
1209 fn test_seq_entry_distinct_prefix_same_content() {
1210 let indexer = PositionalIndexer::new(64);
1211
1212 let blocks_w1 = make_blocks(&[10, 99]);
1215 let w1 = indexer.intern_worker("http://w1:8000");
1216 let w2 = indexer.intern_worker("http://w2:8000");
1217 let mut wb1 = WorkerBlockMap::default();
1218 let mut wb2 = WorkerBlockMap::default();
1219 indexer
1220 .apply_stored(w1, &blocks_w1, None, &mut wb1)
1221 .unwrap();
1222
1223 let blocks_w2 = make_blocks(&[20, 99]);
1226 indexer
1227 .apply_stored(w2, &blocks_w2, None, &mut wb2)
1228 .unwrap();
1229
1230 let scores = indexer.find_matches(&hashes(&[10, 99]), false);
1232 assert_eq!(scores.scores.get(&w1), Some(&2));
1233 let scores = indexer.find_matches(&hashes(&[20, 99]), false);
1237 assert_eq!(scores.scores.get(&w2), Some(&2));
1238 }
1239
1240 #[test]
1245 fn test_early_exit_returns_score_one() {
1246 let indexer = PositionalIndexer::new(64);
1247 let blocks = make_blocks(&[10, 20, 30]);
1248 let w1 = indexer.intern_worker("http://w1:8000");
1249 let mut wb1 = WorkerBlockMap::default();
1250 indexer.apply_stored(w1, &blocks, None, &mut wb1).unwrap();
1251
1252 let scores = indexer.find_matches(&hashes(&[10, 20, 30]), true);
1253 assert_eq!(scores.scores.get(&w1), Some(&1));
1255 assert_eq!(scores.tree_sizes.get(&w1), Some(&3));
1257 }
1258
1259 #[test]
1260 fn test_early_exit_no_match() {
1261 let indexer = PositionalIndexer::new(64);
1262 let blocks = make_blocks(&[10, 20, 30]);
1263 let w1 = indexer.intern_worker("http://w1:8000");
1264 let mut wb1 = WorkerBlockMap::default();
1265 indexer.apply_stored(w1, &blocks, None, &mut wb1).unwrap();
1266
1267 let scores = indexer.find_matches(&hashes(&[99, 88]), true);
1268 assert!(scores.scores.is_empty());
1269 }
1270
1271 #[test]
1276 fn test_worker_id_unknown() {
1277 let indexer = PositionalIndexer::default();
1278 assert!(indexer.worker_id("http://unknown:8000").is_none());
1279 }
1280
1281 #[test]
1282 fn test_worker_id_after_store() {
1283 let indexer = PositionalIndexer::default();
1284 let w1 = indexer.intern_worker("http://w1:8000");
1285 let mut wb1 = WorkerBlockMap::default();
1286 indexer
1287 .apply_stored(w1, &make_blocks(&[10]), None, &mut wb1)
1288 .unwrap();
1289 assert!(indexer.worker_id("http://w1:8000").is_some());
1290 }
1291
/// Stores five blocks, removes the last two, and checks that both
/// `current_size` and the per-worker tree size drop from 5 to 3.
#[test]
fn test_tree_sizes_after_store_and_remove() {
    let indexer = PositionalIndexer::new(64);
    let stored = make_blocks(&[10, 20, 30, 40, 50]);
    let worker = indexer.intern_worker("http://w1:8000");
    let mut worker_blocks = WorkerBlockMap::default();
    indexer
        .apply_stored(worker, &stored, None, &mut worker_blocks)
        .unwrap();
    assert_eq!(indexer.current_size(), 5);

    // Drop the two trailing blocks.
    let evicted = [stored[3].seq_hash, stored[4].seq_hash];
    indexer.apply_removed(worker, &evicted, &mut worker_blocks);
    assert_eq!(indexer.current_size(), 3);

    let query = hashes(&[10, 20, 30]);
    let result = indexer.find_matches(&query, false);
    assert_eq!(result.tree_sizes.get(&worker).copied(), Some(3));
}
1313
/// Applying the same store event twice must leave the worker's tree size and
/// overlap score at 3.
#[test]
fn test_duplicate_store_does_not_inflate_tree_size() {
    let indexer = PositionalIndexer::new(64);
    let stored = make_blocks(&[10, 20, 30]);
    let worker = indexer.intern_worker("http://w1:8000");
    let mut worker_blocks = WorkerBlockMap::default();

    // First store: baseline tree size.
    indexer
        .apply_stored(worker, &stored, None, &mut worker_blocks)
        .unwrap();
    let first = indexer.find_matches(&hashes(&[10, 20, 30]), false);
    assert_eq!(first.tree_sizes.get(&worker).copied(), Some(3));

    // Second, identical store: nothing may change.
    indexer
        .apply_stored(worker, &stored, None, &mut worker_blocks)
        .unwrap();
    let second = indexer.find_matches(&hashes(&[10, 20, 30]), false);
    assert_eq!(
        second.tree_sizes.get(&worker).copied(),
        Some(3),
        "Duplicate store event must not inflate tree_size"
    );

    assert_eq!(second.scores.get(&worker).copied(), Some(3));
}
1338
/// Removing a worker that never stored any blocks leaves the index size at 0.
#[test]
fn test_remove_worker_nonexistent_is_noop() {
    let indexer = PositionalIndexer::default();
    let ghost = indexer.intern_worker("http://ghost:8000");
    let empty_blocks = WorkerBlockMap::default();
    indexer.remove_worker(ghost, empty_blocks);
    assert_eq!(indexer.current_size(), 0);
}
1346
/// Runs four reader threads (100 queries each) alongside four writer threads
/// (50 repeated stores each, one distinct worker per thread) against a shared
/// indexer, then checks that the originally seeded worker still scores 20.
#[test]
fn test_concurrent_read_write() {
    let indexer = Arc::new(PositionalIndexer::new(4));
    let content: Vec<u64> = (1..=20).collect();
    let seeded = make_blocks(&content);
    let w1 = indexer.intern_worker("http://w1:8000");
    let mut wb1 = WorkerBlockMap::default();
    indexer.apply_stored(w1, &seeded, None, &mut wb1).unwrap();

    let mut handles = Vec::new();

    // Readers: each thread owns its own copy of the query hashes.
    handles.extend((0..4).map(|_| {
        let shared = Arc::clone(&indexer);
        let query = hashes(&content);
        std::thread::spawn(move || {
            for _ in 0..100 {
                let result = shared.find_matches(&query, false);
                let seeded_id = shared.worker_id("http://w1:8000").unwrap();
                assert!(result.scores.contains_key(&seeded_id));
            }
        })
    }));

    // Writers: each thread interns its own worker and re-applies the same blocks.
    handles.extend((0..4).map(|i| {
        let shared = Arc::clone(&indexer);
        let worker_content: Vec<u64> = (1..=5).collect();
        std::thread::spawn(move || {
            let url = format!("http://writer{i}:8000");
            let writer_id = shared.intern_worker(&url);
            let mut writer_blocks = WorkerBlockMap::default();
            let to_store = make_blocks(&worker_content);
            for _ in 0..50 {
                shared
                    .apply_stored(writer_id, &to_store, None, &mut writer_blocks)
                    .unwrap();
            }
        })
    }));

    handles
        .into_iter()
        .for_each(|handle| handle.join().unwrap());

    let final_query = hashes(&content);
    let result = indexer.find_matches(&final_query, false);
    assert_eq!(result.scores.get(&w1).copied(), Some(20));
}
1394
/// Two workers store the same blocks; the underlying `index` map stays
/// non-empty after the first worker is removed and has length 0 once the
/// second is removed as well.
#[test]
fn test_dashmap_cleanup_no_memory_leak() {
    let indexer = PositionalIndexer::default();
    let shared_blocks = make_blocks(&[10, 20, 30]);
    let first = indexer.intern_worker("http://w1:8000");
    let second = indexer.intern_worker("http://w2:8000");
    let mut first_blocks = WorkerBlockMap::default();
    let mut second_blocks = WorkerBlockMap::default();
    indexer
        .apply_stored(first, &shared_blocks, None, &mut first_blocks)
        .unwrap();
    indexer
        .apply_stored(second, &shared_blocks, None, &mut second_blocks)
        .unwrap();

    assert!(!indexer.index.is_empty());

    // One owner left: entries must survive.
    indexer.remove_worker(first, first_blocks);
    assert!(!indexer.index.is_empty());

    // Last owner gone: the map must be fully drained.
    indexer.remove_worker(second, second_blocks);
    assert_eq!(indexer.index.len(), 0);
}
1414
/// Hashing an empty token slice twice gives the same value.
#[test]
fn test_compute_content_hash_empty_tokens() {
    let first = compute_content_hash(&[]);
    let second = compute_content_hash(&[]);
    assert_eq!(first, second);
}
1421
/// Two different single-token inputs hash to different values.
#[test]
fn test_compute_content_hash_single_token() {
    let hash_of_42 = compute_content_hash(&[42]);
    let hash_of_43 = compute_content_hash(&[43]);
    assert_ne!(hash_of_42, hash_of_43);
}
1427
/// The sequence hashes produced by `ensure_seq_hash_computed` up to index 4
/// must equal, position by position, the ones `make_blocks` assigns.
#[test]
fn test_seq_hash_rolling_correctness() {
    let content = vec![10u64, 20, 30, 40, 50];
    let expected_blocks = make_blocks(&content);
    let content_hashes = hashes(&content);

    let mut computed: Vec<SequenceHash> = Vec::new();
    PositionalIndexer::ensure_seq_hash_computed(&mut computed, 4, &content_hashes);

    // Index into `computed` on purpose so a too-short result panics.
    let expected_hashes = expected_blocks.iter().map(|block| block.seq_hash);
    for (i, expected) in expected_hashes.enumerate() {
        assert_eq!(computed[i], expected, "seq_hash mismatch at position {i}");
    }
}
1444
/// Querying the first two of five stored blocks scores 2 while the tree size
/// reports all 5.
#[test]
fn test_query_prefix_of_stored() {
    let indexer = PositionalIndexer::default();
    let stored = make_blocks(&[10, 20, 30, 40, 50]);
    let worker = indexer.intern_worker("http://w1:8000");
    let mut worker_blocks = WorkerBlockMap::default();
    indexer
        .apply_stored(worker, &stored, None, &mut worker_blocks)
        .unwrap();

    let prefix = hashes(&[10, 20]);
    let result = indexer.find_matches(&prefix, false);
    assert_eq!(result.scores.get(&worker).copied(), Some(2));
    assert_eq!(result.tree_sizes.get(&worker).copied(), Some(5));
}
1457
/// Two workers holding unrelated content each match only their own query.
#[test]
fn test_disjoint_workers_no_shared_prefix() {
    let indexer = PositionalIndexer::default();
    let first_blocks = make_blocks(&[10, 20, 30]);
    let second_blocks = make_blocks(&[99, 88, 77]);
    let first = indexer.intern_worker("http://w1:8000");
    let second = indexer.intern_worker("http://w2:8000");
    let mut first_map = WorkerBlockMap::default();
    let mut second_map = WorkerBlockMap::default();
    indexer
        .apply_stored(first, &first_blocks, None, &mut first_map)
        .unwrap();
    indexer
        .apply_stored(second, &second_blocks, None, &mut second_map)
        .unwrap();

    // Query for the first worker's content.
    let for_first = indexer.find_matches(&hashes(&[10, 20, 30]), false);
    assert_eq!(for_first.scores.get(&first).copied(), Some(3));
    assert!(!for_first.scores.contains_key(&second));

    // Query for the second worker's content.
    let for_second = indexer.find_matches(&hashes(&[99, 88, 77]), false);
    assert!(!for_second.scores.contains_key(&first));
    assert_eq!(for_second.scores.get(&second).copied(), Some(3));
}
1482
/// Constructing an indexer with a jump size of 0 must panic with the
/// expected message.
#[test]
#[should_panic(expected = "jump_size must be greater than 0")]
fn test_zero_jump_size_panics() {
    let _indexer = PositionalIndexer::new(0);
}
1488
/// Tracks `current_size` through store, remove, clear and remove-worker
/// operations on two workers: 0 -> 3 -> 6 -> 5 -> 2 -> 0.
#[test]
fn test_current_size_across_operations() {
    let indexer = PositionalIndexer::default();
    assert_eq!(indexer.current_size(), 0);

    let shared_blocks = make_blocks(&[10, 20, 30]);
    let first = indexer.intern_worker("http://w1:8000");
    let second = indexer.intern_worker("http://w2:8000");
    let mut first_map = WorkerBlockMap::default();
    let mut second_map = WorkerBlockMap::default();

    indexer
        .apply_stored(first, &shared_blocks, None, &mut first_map)
        .unwrap();
    assert_eq!(indexer.current_size(), 3);

    indexer
        .apply_stored(second, &shared_blocks, None, &mut second_map)
        .unwrap();
    assert_eq!(indexer.current_size(), 6);

    let last_block = [shared_blocks[2].seq_hash];
    indexer.apply_removed(first, &last_block, &mut first_map);
    assert_eq!(indexer.current_size(), 5);

    indexer.apply_cleared(second, &mut second_map);
    assert_eq!(indexer.current_size(), 2);

    indexer.remove_worker(first, first_map);
    assert_eq!(indexer.current_size(), 0);
}
1514
/// Eight tokens with a block size of 4 give two hashes, each equal to the
/// content hash of its four-token chunk.
#[test]
fn test_request_hashes_basic() {
    let tokens: Vec<u32> = (1..=8).collect();
    let block_hashes = compute_request_content_hashes(&tokens, 4);
    assert_eq!(block_hashes.len(), 2);

    let first_expected = compute_content_hash(&[1, 2, 3, 4]);
    assert_eq!(block_hashes[0], first_expected);
    let second_expected = compute_content_hash(&[5, 6, 7, 8]);
    assert_eq!(block_hashes[1], second_expected);
}
1527
/// Ten tokens with a block size of 4 give only two hashes; the two leftover
/// tokens do not produce a third.
#[test]
fn test_request_hashes_partial_trailing_chunk_discarded() {
    let tokens: Vec<u32> = (1..=10).collect();
    let block_count = compute_request_content_hashes(&tokens, 4).len();
    assert_eq!(block_count, 2);
}
1534
/// Three tokens with a block size of 4 give no hashes.
#[test]
fn test_request_hashes_fewer_than_block_size() {
    let block_hashes = compute_request_content_hashes(&[1, 2, 3], 4);
    assert!(block_hashes.is_empty());
}
1540
/// An empty token slice gives no hashes.
#[test]
fn test_request_hashes_empty_tokens() {
    let block_hashes = compute_request_content_hashes(&[], 16);
    assert!(block_hashes.is_empty());
}
1546
/// Six tokens with a block size of 2 give exactly three hashes.
#[test]
fn test_request_hashes_exact_multiple() {
    let tokens: Vec<u32> = (1..=6).collect();
    let block_count = compute_request_content_hashes(&tokens, 2).len();
    assert_eq!(block_count, 3);
}
1553
/// A block size of 0 gives an empty result.
#[test]
fn test_request_hashes_zero_block_size_returns_empty() {
    let block_hashes = compute_request_content_hashes(&[1, 2, 3], 0);
    assert!(block_hashes.is_empty());
}
1559
/// With a block size of 1, every token gets its own hash, equal to the
/// content hash of that single token.
#[test]
fn test_request_hashes_block_size_1() {
    let tokens = vec![10u32, 20, 30];
    let block_hashes = compute_request_content_hashes(&tokens, 1);
    assert_eq!(block_hashes.len(), 3);

    let first_expected = compute_content_hash(&[10]);
    assert_eq!(block_hashes[0], first_expected);
    let second_expected = compute_content_hash(&[20]);
    assert_eq!(block_hashes[1], second_expected);
    let third_expected = compute_content_hash(&[30]);
    assert_eq!(block_hashes[2], third_expected);
}
1569
/// Stores four blocks whose content hashes come from 16 tokens (block size 4,
/// arbitrary sequence hashes), then queries with hashes computed from the
/// same tokens and expects a score of 4.
#[test]
fn test_end_to_end_store_and_query() {
    let indexer = PositionalIndexer::default();
    let block_size = 4;
    let tokens: Vec<u32> = (1..=16).collect();

    // Build the stored blocks straight from the token chunks.
    let stored: Vec<StoredBlock> = tokens
        .chunks(block_size)
        .enumerate()
        .map(|(position, chunk)| StoredBlock {
            seq_hash: SequenceHash(0xBEEF_0000 + position as u64),
            content_hash: compute_content_hash(chunk),
        })
        .collect();

    let worker = indexer.intern_worker("http://w1:8000");
    let mut worker_blocks = WorkerBlockMap::default();
    indexer
        .apply_stored(worker, &stored, None, &mut worker_blocks)
        .unwrap();

    let query = compute_request_content_hashes(&tokens, block_size);
    let result = indexer.find_matches(&query, false);
    assert_eq!(result.scores.get(&worker).copied(), Some(4));
}
1602
/// The worker caches the first 8 tokens (two blocks); a 16-token query that
/// starts with the same tokens scores 2, and the tree size is 2.
#[test]
fn test_end_to_end_partial_overlap() {
    let indexer = PositionalIndexer::default();
    let block_size = 4;

    let cached_tokens: Vec<u32> = (1..=8).collect();
    let mut stored: Vec<StoredBlock> = Vec::new();
    for (position, chunk) in cached_tokens.chunks(block_size).enumerate() {
        stored.push(StoredBlock {
            seq_hash: SequenceHash(position as u64 + 1),
            content_hash: compute_content_hash(chunk),
        });
    }
    let worker = indexer.intern_worker("http://w1:8000");
    let mut worker_blocks = WorkerBlockMap::default();
    indexer
        .apply_stored(worker, &stored, None, &mut worker_blocks)
        .unwrap();

    let query_tokens: Vec<u32> = (1..=16).collect();
    let query = compute_request_content_hashes(&query_tokens, block_size);
    let result = indexer.find_matches(&query, false);
    assert_eq!(result.scores.get(&worker).copied(), Some(2));
    assert_eq!(result.tree_sizes.get(&worker).copied(), Some(2));
}
1627
/// Two workers report the same content hashes under different sequence
/// hashes; a query on that content scores 2 for each of them.
#[test]
fn test_end_to_end_different_backends_same_content() {
    let indexer = PositionalIndexer::new(4);
    let block_size = 4;
    let tokens: Vec<u32> = (1..=8).collect();
    let content_hashes: Vec<ContentHash> = tokens
        .chunks(block_size)
        .map(compute_content_hash)
        .collect();

    // Same content, sequence hashes offset from a per-worker base.
    let blocks_with_base = |base: u64| -> Vec<StoredBlock> {
        content_hashes
            .iter()
            .enumerate()
            .map(|(position, &content_hash)| StoredBlock {
                seq_hash: SequenceHash(base + position as u64),
                content_hash,
            })
            .collect()
    };
    let sglang_blocks = blocks_with_base(0xAAAA_0000);
    let vllm_blocks = blocks_with_base(0xBBBB_0000);

    let sglang = indexer.intern_worker("http://sglang:8000");
    let vllm = indexer.intern_worker("http://vllm:8000");
    let mut sglang_map = WorkerBlockMap::default();
    let mut vllm_map = WorkerBlockMap::default();
    indexer
        .apply_stored(sglang, &sglang_blocks, None, &mut sglang_map)
        .unwrap();
    indexer
        .apply_stored(vllm, &vllm_blocks, None, &mut vllm_map)
        .unwrap();

    let query = compute_request_content_hashes(&tokens, block_size);
    let result = indexer.find_matches(&query, false);
    assert_eq!(result.scores.get(&sglang).copied(), Some(2));
    assert_eq!(result.scores.get(&vllm).copied(), Some(2));
}
1672
1673 fn store_via_continuations(
1679 indexer: &PositionalIndexer,
1680 worker: &str,
1681 content: &[u64],
1682 chunk_size: usize,
1683 worker_blocks: &mut WorkerBlockMap,
1684 ) {
1685 let worker_id = indexer.intern_worker(worker);
1686 let all_blocks = make_blocks(content);
1687 let mut offset = 0;
1688 let mut parent: Option<SequenceHash> = None;
1689 while offset < all_blocks.len() {
1690 let end = (offset + chunk_size).min(all_blocks.len());
1691 let chunk = &all_blocks[offset..end];
1692 indexer
1693 .apply_stored(worker_id, chunk, parent, worker_blocks)
1694 .unwrap();
1695 parent = Some(chunk.last().unwrap().seq_hash);
1696 offset = end;
1697 }
1698 }
1699
/// One worker stores a 128-block sequence; six more store prefixes of it at
/// depths around 32 and 64 (jump size 32). A full-length query must score
/// each worker at exactly its stored depth.
#[test]
fn test_divergence_at_jump_boundaries() {
    const DEPTHS: [usize; 6] = [31, 32, 33, 63, 64, 65];

    let indexer = PositionalIndexer::new(32);
    let full: Vec<u64> = (1..=128).collect();
    let full_blocks = make_blocks(&full);
    let full_id = indexer.intern_worker("http://full:8000");
    let mut full_map = WorkerBlockMap::default();
    indexer
        .apply_stored(full_id, &full_blocks, None, &mut full_map)
        .unwrap();

    // One prefix worker per depth, in ascending order.
    for depth in DEPTHS.iter().copied() {
        let prefix_blocks = make_blocks(&full[..depth]);
        let url = format!("http://depth{depth}:8000");
        let prefix_id = indexer.intern_worker(&url);
        let mut prefix_map = WorkerBlockMap::default();
        indexer
            .apply_stored(prefix_id, &prefix_blocks, None, &mut prefix_map)
            .unwrap();
    }

    let result = indexer.find_matches(&hashes(&full), false);
    assert_eq!(result.scores.get(&full_id).copied(), Some(128));
    for depth in DEPTHS.iter().copied() {
        let url = format!("http://depth{depth}:8000");
        let prefix_id = indexer.worker_id(&url).unwrap();
        assert_eq!(result.scores.get(&prefix_id).copied(), Some(depth as u32));
    }
}
1739
/// With a jump size of 32, sequences of length 32, 64 and 96 each match
/// their own query in full.
#[test]
fn test_exact_jump_size_sequences() {
    let indexer = PositionalIndexer::new(32);

    for len in [32u64, 64, 96].iter().copied() {
        let content: Vec<u64> = (1..=len).collect();
        let stored = make_blocks(&content);
        let url = format!("http://len{len}:8000");
        let worker = indexer.intern_worker(&url);
        let mut worker_blocks = WorkerBlockMap::default();
        indexer
            .apply_stored(worker, &stored, None, &mut worker_blocks)
            .unwrap();

        let result = indexer.find_matches(&hashes(&content), false);
        assert_eq!(
            result.scores.get(&worker).copied(),
            Some(len as u32),
            "exact match failed for sequence length {len}"
        );
    }
}
1760
/// With a jump size of 32, sequences one block shorter or longer than 32, 64
/// and 96 each match their own query in full.
#[test]
fn test_off_by_one_jump_boundaries() {
    let indexer = PositionalIndexer::new(32);
    let full: Vec<u64> = (1..=128).collect();

    let lengths: [usize; 6] = [31, 33, 63, 65, 95, 97];
    for len in lengths.iter().copied() {
        let content = &full[..len];
        let stored = make_blocks(content);
        let url = format!("http://len{len}:8000");
        let worker = indexer.intern_worker(&url);
        let mut worker_blocks = WorkerBlockMap::default();
        indexer
            .apply_stored(worker, &stored, None, &mut worker_blocks)
            .unwrap();

        let result = indexer.find_matches(&hashes(content), false);
        assert_eq!(
            result.scores.get(&worker).copied(),
            Some(len as u32),
            "exact match failed for sequence length {len}"
        );
    }
}
1782
/// Five workers store prefixes of one 100-block sequence at different depths
/// (jump size 32); a full query must score each at its own depth.
#[test]
fn test_staggered_workers_across_jump_boundaries() {
    let indexer = PositionalIndexer::new(32);
    let full: Vec<u64> = (1..=100).collect();

    let depths: [usize; 5] = [10, 20, 35, 64, 100];
    for depth in depths.iter().copied() {
        let stored = make_blocks(&full[..depth]);
        let url = format!("http://w{depth}:8000");
        let worker = indexer.intern_worker(&url);
        let mut worker_blocks = WorkerBlockMap::default();
        indexer
            .apply_stored(worker, &stored, None, &mut worker_blocks)
            .unwrap();
    }

    let result = indexer.find_matches(&hashes(&full), false);
    for depth in depths.iter().copied() {
        let url = format!("http://w{depth}:8000");
        let worker = indexer.worker_id(&url).unwrap();
        assert_eq!(
            result.scores.get(&worker).copied(),
            Some(depth as u32),
            "worker at depth {depth} has wrong score"
        );
    }
}
1808
/// Three workers share a 40-block prefix: w1 continues with 60 more blocks,
/// w2 with 20 different ones, and w3 stops at the prefix. Querying w1's full
/// content scores 100 for w1 and 40 for the other two.
#[test]
fn test_shared_prefix_diverge_at_jump_boundary() {
    let indexer = PositionalIndexer::new(32);
    let shared: Vec<u64> = (1..=40).collect();

    let content_w1: Vec<u64> = shared.iter().copied().chain(1001..=1060).collect();
    let blocks_w1 = make_blocks(&content_w1);
    let w1 = indexer.intern_worker("http://w1:8000");
    let w2 = indexer.intern_worker("http://w2:8000");
    let w3 = indexer.intern_worker("http://w3:8000");
    let mut map_w1 = WorkerBlockMap::default();
    let mut map_w2 = WorkerBlockMap::default();
    let mut map_w3 = WorkerBlockMap::default();
    indexer
        .apply_stored(w1, &blocks_w1, None, &mut map_w1)
        .unwrap();

    let content_w2: Vec<u64> = shared.iter().copied().chain(2001..=2020).collect();
    let blocks_w2 = make_blocks(&content_w2);
    indexer
        .apply_stored(w2, &blocks_w2, None, &mut map_w2)
        .unwrap();

    let blocks_w3 = make_blocks(&shared);
    indexer
        .apply_stored(w3, &blocks_w3, None, &mut map_w3)
        .unwrap();

    let result = indexer.find_matches(&hashes(&content_w1), false);
    assert_eq!(result.scores.get(&w1).copied(), Some(100));
    assert_eq!(result.scores.get(&w2).copied(), Some(40));
    assert_eq!(result.scores.get(&w3).copied(), Some(40));
}
1844
/// Stores 1000 blocks and checks three queries: the full sequence (1000), the
/// first half (500), and a 500-block query whose last block differs (499).
#[test]
fn test_very_long_sequence() {
    let indexer = PositionalIndexer::new(64);
    let content: Vec<u64> = (1..=1000).collect();
    let stored = make_blocks(&content);
    let worker = indexer.intern_worker("http://w1:8000");
    let mut worker_blocks = WorkerBlockMap::default();
    indexer
        .apply_stored(worker, &stored, None, &mut worker_blocks)
        .unwrap();

    let full = indexer.find_matches(&hashes(&content), false);
    assert_eq!(full.scores.get(&worker).copied(), Some(1000));

    let half = indexer.find_matches(&hashes(&content[..500]), false);
    assert_eq!(half.scores.get(&worker).copied(), Some(500));

    // 499 matching blocks followed by one that was never stored.
    let divergent: Vec<u64> = content[..499]
        .iter()
        .copied()
        .chain(std::iter::once(999999))
        .collect();
    let diverged = indexer.find_matches(&hashes(&divergent), false);
    assert_eq!(diverged.scores.get(&worker).copied(), Some(499));
}
1865
/// Stores 200 blocks as a chain of 10-block continuations and checks the
/// total size plus the full-length and 150-block prefix scores.
#[test]
fn test_deep_continuation_chain() {
    let indexer = PositionalIndexer::new(64);
    let content: Vec<u64> = (1..=200).collect();
    let mut worker_blocks = WorkerBlockMap::default();
    store_via_continuations(&indexer, "http://w1:8000", &content, 10, &mut worker_blocks);

    assert_eq!(indexer.current_size(), 200);

    let worker = indexer.worker_id("http://w1:8000").unwrap();
    let full = indexer.find_matches(&hashes(&content), false);
    assert_eq!(full.scores.get(&worker).copied(), Some(200));

    let prefix = indexer.find_matches(&hashes(&content[..150]), false);
    assert_eq!(prefix.scores.get(&worker).copied(), Some(150));
}
1886
/// Two workers store the same sequence through 10-block continuations, one
/// to depth 100 and one to depth 50; a full query scores them 100 and 50.
#[test]
fn test_continuation_chain_with_multiple_workers() {
    let indexer = PositionalIndexer::new(32);
    let content: Vec<u64> = (1..=100).collect();

    let mut deep_map = WorkerBlockMap::default();
    let mut shallow_map = WorkerBlockMap::default();
    store_via_continuations(&indexer, "http://w1:8000", &content, 10, &mut deep_map);
    store_via_continuations(&indexer, "http://w2:8000", &content[..50], 10, &mut shallow_map);

    let deep = indexer.worker_id("http://w1:8000").unwrap();
    let shallow = indexer.worker_id("http://w2:8000").unwrap();
    let result = indexer.find_matches(&hashes(&content), false);
    assert_eq!(result.scores.get(&deep).copied(), Some(100));
    assert_eq!(result.scores.get(&shallow).copied(), Some(50));
}
1903
/// A single worker stores two unrelated sequences; each one is matched in
/// full by its own query.
#[test]
fn test_multiple_disjoint_sequences_per_worker() {
    let indexer = PositionalIndexer::new(64);
    let worker = indexer.intern_worker("http://w1:8000");
    let mut worker_blocks = WorkerBlockMap::default();

    let short_sequence = make_blocks(&[10, 20, 30]);
    indexer
        .apply_stored(worker, &short_sequence, None, &mut worker_blocks)
        .unwrap();

    let long_sequence = make_blocks(&[100, 200, 300, 400]);
    indexer
        .apply_stored(worker, &long_sequence, None, &mut worker_blocks)
        .unwrap();

    let long_result = indexer.find_matches(&hashes(&[100, 200, 300, 400]), false);
    assert_eq!(long_result.scores.get(&worker).copied(), Some(4));

    let short_result = indexer.find_matches(&hashes(&[10, 20, 30]), false);
    assert_eq!(short_result.scores.get(&worker).copied(), Some(3));
}
1922
/// Stores 100 blocks, removes the last 20, and checks that the size and the
/// match score are both 80, for the full query and for the 80-block prefix.
#[test]
fn test_long_sequence_partial_removal() {
    let indexer = PositionalIndexer::new(32);
    let content: Vec<u64> = (1..=100).collect();
    let stored = make_blocks(&content);
    let worker = indexer.intern_worker("http://w1:8000");
    let mut worker_blocks = WorkerBlockMap::default();
    indexer
        .apply_stored(worker, &stored, None, &mut worker_blocks)
        .unwrap();

    // Evict everything from position 80 onward.
    let mut evicted: Vec<SequenceHash> = Vec::new();
    for block in &stored[80..] {
        evicted.push(block.seq_hash);
    }
    indexer.apply_removed(worker, &evicted, &mut worker_blocks);

    assert_eq!(indexer.current_size(), 80);

    let full = indexer.find_matches(&hashes(&content), false);
    assert_eq!(full.scores.get(&worker).copied(), Some(80));

    let prefix = indexer.find_matches(&hashes(&content[..80]), false);
    assert_eq!(prefix.scores.get(&worker).copied(), Some(80));
}
1947
/// Removing the second of five blocks removes only that block (size 4); a
/// full query then scores 1, the single block ahead of the gap.
#[test]
fn test_remove_parent_does_not_cascade() {
    let indexer = PositionalIndexer::new(1);
    let stored = make_blocks(&[10, 20, 30, 40, 50]);
    let worker = indexer.intern_worker("http://w1:8000");
    let mut worker_blocks = WorkerBlockMap::default();
    indexer
        .apply_stored(worker, &stored, None, &mut worker_blocks)
        .unwrap();

    let second_block = [stored[1].seq_hash];
    indexer.apply_removed(worker, &second_block, &mut worker_blocks);

    assert_eq!(indexer.current_size(), 4);

    let query = hashes(&[10, 20, 30, 40, 50]);
    let result = indexer.find_matches(&query, false);
    assert_eq!(result.scores.get(&worker).copied(), Some(1));
}
1963
/// Stores 100 blocks, clears the worker, then stores 100 different blocks:
/// the old content no longer matches and the new content matches in full.
#[test]
fn test_long_sequence_clear_and_rebuild() {
    let indexer = PositionalIndexer::new(32);
    let worker = indexer.intern_worker("http://w1:8000");
    let mut worker_blocks = WorkerBlockMap::default();

    let original: Vec<u64> = (1..=100).collect();
    let original_blocks = make_blocks(&original);
    indexer
        .apply_stored(worker, &original_blocks, None, &mut worker_blocks)
        .unwrap();

    indexer.apply_cleared(worker, &mut worker_blocks);
    assert_eq!(indexer.current_size(), 0);

    let replacement: Vec<u64> = (1001..=1100).collect();
    let replacement_blocks = make_blocks(&replacement);
    indexer
        .apply_stored(worker, &replacement_blocks, None, &mut worker_blocks)
        .unwrap();

    let stale = indexer.find_matches(&hashes(&original), false);
    assert!(!stale.scores.contains_key(&worker));

    let fresh = indexer.find_matches(&hashes(&replacement), false);
    assert_eq!(fresh.scores.get(&worker).copied(), Some(100));
}
1989
/// Four workers store prefixes of one 100-block sequence at depths 25, 50, 75
/// and 100; a full query must report each worker's depth as both its score
/// and its tree size.
#[test]
fn test_interleaved_long_sequences() {
    let indexer = PositionalIndexer::new(32);
    let content: Vec<u64> = (1..=100).collect();

    let depths: [usize; 4] = [25, 50, 75, 100];
    for depth in depths.iter().copied() {
        let stored = make_blocks(&content[..depth]);
        let url = format!("http://w{depth}:8000");
        let worker = indexer.intern_worker(&url);
        let mut worker_blocks = WorkerBlockMap::default();
        indexer
            .apply_stored(worker, &stored, None, &mut worker_blocks)
            .unwrap();
    }

    let result = indexer.find_matches(&hashes(&content), false);
    for depth in depths.iter().copied() {
        let url = format!("http://w{depth}:8000");
        let worker = indexer.worker_id(&url).unwrap();
        assert_eq!(
            result.scores.get(&worker).copied(),
            Some(depth as u32),
            "worker at depth {depth} has wrong score"
        );
        assert_eq!(
            result.tree_sizes.get(&worker).copied(),
            Some(depth),
            "worker at depth {depth} has wrong tree_size"
        );
    }
}
2020}