Skip to main content

kv_index/
event_tree.rs

1//! Positional indexer for cache-aware routing.
2//!
3//! Uses a single `DashMap<(usize, ContentHash), SeqEntry>` keyed by (position, content_hash).
4//! No capacity limit — the map grows unboundedly as blocks are stored.
5//! Jump search skips positions in strides, yielding amortized O(D/J + W) complexity.
6//!
7//! **Dual-hash scheme**: backends send a position-aware `block_hash` (SequenceHash)
8//! and raw `token_ids` per block. The router computes a position-independent
9//! ContentHash (XXH3) from token_ids, then a rolling prefix hash (also XXH3) from
10//! the ContentHash sequence. SeqEntry is keyed by the router's prefix hash for
11//! precise disambiguation at query time. The backend's SequenceHash is stored in
12//! worker_blocks only, used for `apply_removed` reverse lookup.
13//!
14//! **Performance**: Internal u32 worker IDs eliminate Arc<str> hashing and atomic
15//! refcount bouncing in the hot query loop. Caller-owned `WorkerBlockMap` gives
16//! direct HashMap access (~5ns) on the write path — no DashMap hash+shard locking.
17//! Atomic tree_sizes provide O(1) size queries.
18//!
19//! Thread safety: the shared `index` DashMap is internally synchronized via sharding.
20//! `WorkerBlockMap` is caller-owned (one per tokio task), so no cross-thread
21//! synchronization is needed on the write path.
22
23use std::{
24    fmt,
25    sync::{
26        atomic::{AtomicU32, AtomicUsize, Ordering},
27        Arc,
28    },
29};
30
31use dashmap::{mapref::entry::Entry, DashMap};
32use rustc_hash::{FxBuildHasher, FxHashMap, FxHashSet};
33
/// Seed passed to every XXH3 invocation in this module (per-block content hashes
/// and the rolling prefix hash).
pub const XXH3_SEED: u64 = 1337;

/// Shard count for the main index DashMap.
/// Tuned iteratively — higher values reduce per-shard contention under concurrent
/// reads+writes at the cost of more memory for shard locks.
const INDEX_SHARD_COUNT: usize = 1024;

/// Shard count for the `worker_to_id` DashMap.
/// That map holds one entry per interned worker, so 8 shards is sufficient.
const WORKER_SHARD_COUNT: usize = 8;

/// Maximum number of workers supported. tree_sizes is a flat Vec indexed by worker_id,
/// giving lock-free reads on the query hot path (array index vs DashMap hash+lock+probe).
const MAX_WORKERS: usize = 2048;
49
/// Hash of the tokens inside one block, independent of where the block sits.
///
/// Produced by [`compute_content_hash`] (XXH3-64 over the token IDs), so equal
/// token runs map to equal hashes at any position in a sequence.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct ContentHash(pub u64);
55
/// Position-aware block hash (sequence hash).
///
/// Carries the backend's `block_hash` from the KvBlock proto; the proto field is an
/// `i64`, which is reinterpreted bit-for-bit as `u64`. Unlike [`ContentHash`], the
/// value encodes the full prefix history of the block.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct SequenceHash(pub u64);

impl From<i64> for SequenceHash {
    /// Reinterprets the signed proto value as unsigned without changing any bits.
    fn from(value: i64) -> Self {
        Self(u64::from_ne_bytes(value.to_ne_bytes()))
    }
}

impl From<u64> for SequenceHash {
    /// Wraps an already-unsigned hash value.
    fn from(value: u64) -> Self {
        SequenceHash(value)
    }
}
73
/// Internal worker identifier used as the key type in [`OverlapScores`].
///
/// IDs are handed out by [`PositionalIndexer::intern_worker`]; consumers translate a
/// worker URL into its ID with [`PositionalIndexer::worker_id`].
pub type WorkerId = u32;
78
/// A block from a store event, carrying both hash representations.
///
/// `seq_hash` is the key of the caller-owned [`WorkerBlockMap`]; `content_hash`
/// is half of the `(position, content_hash)` key of the shared index.
#[derive(Debug, Clone, Copy)]
pub struct StoredBlock {
    /// Position-aware hash from the backend proto (`block_hash` field).
    pub seq_hash: SequenceHash,
    /// Position-independent hash computed from token IDs via XXH3.
    pub content_hash: ContentHash,
}
87
/// Reasons [`PositionalIndexer::apply_stored`] can reject a store event.
#[derive(Debug)]
pub enum ApplyError {
    /// A parent hash was supplied but the worker's reverse lookup is empty, so
    /// there is nothing to resolve the parent against.
    WorkerNotTracked,
    /// The supplied `parent_seq_hash` is absent from this worker's reverse lookup.
    ParentBlockNotFound,
}

impl fmt::Display for ApplyError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let message = match self {
            ApplyError::WorkerNotTracked => "worker not tracked in index",
            ApplyError::ParentBlockNotFound => "parent block hash not found for worker",
        };
        f.write_str(message)
    }
}

impl std::error::Error for ApplyError {}
107
/// Overlap scores: how many consecutive blocks each worker has cached.
///
/// Keys are internal `u32` worker IDs. Use [`PositionalIndexer::worker_id`] to
/// map a worker URL to its internal ID for lookups.
///
/// Both maps are filled by [`PositionalIndexer::find_matches`] and share the same
/// key set; the `Default` value (two empty maps) is returned when nothing matches.
#[derive(Debug, Default)]
pub struct OverlapScores {
    /// internal_worker_id → number of matching prefix blocks (depth in indexer)
    pub scores: FxHashMap<u32, u32>,
    /// internal_worker_id → total blocks cached by this worker
    /// (snapshot of the indexer's per-worker counter taken at query time)
    pub tree_sizes: FxHashMap<u32, usize>,
}
119
120/// Compute content hash from token IDs (position-independent).
121/// Uses XXH3-64 streaming hasher with standard seed — avoids intermediate allocation.
122pub fn compute_content_hash(token_ids: &[u32]) -> ContentHash {
123    use std::hash::Hasher;
124    let mut hasher = xxhash_rust::xxh3::Xxh3::with_seed(XXH3_SEED);
125    for &t in token_ids {
126        hasher.write(&t.to_le_bytes());
127    }
128    ContentHash(hasher.finish())
129}
130
131/// Chunk request tokens by block size and compute a [`ContentHash`] per full block.
132///
133/// This is the entry point for the **query path**: given a request's token IDs and
134/// the backend's block size, produce the content-hash sequence that
135/// [`PositionalIndexer::find_matches`] expects.
136///
137/// Partial trailing chunks (fewer tokens than `block_size`) are discarded because
138/// backends only cache full blocks.
139///
140/// Returns an empty `Vec` if `block_size` is 0.
141pub fn compute_request_content_hashes(tokens: &[u32], block_size: usize) -> Vec<ContentHash> {
142    if block_size == 0 {
143        tracing::warn!("compute_request_content_hashes called with block_size=0, returning empty");
144        return Vec::new();
145    }
146    tokens
147        .chunks(block_size)
148        .filter(|chunk| chunk.len() == block_size)
149        .map(compute_content_hash)
150        .collect()
151}
152
153// ---------------------------------------------------------------------------
154// SeqEntry: optimizes for the common case (one seq_hash per position+content)
155// ---------------------------------------------------------------------------
156
/// Entry for the innermost level of the index.
///
/// Optimizes for the common case where there's only one sequence hash
/// at a given (position, content_hash) pair, avoiding HashMap allocation.
///
/// The `SequenceHash` stored here is the router-computed rolling prefix hash that
/// `apply_stored` passes in, not the backend's `block_hash`.
#[derive(Debug, Clone)]
enum SeqEntry {
    /// Single seq_hash → workers mapping (common case, no HashMap allocation).
    Single(SequenceHash, FxHashSet<u32>),
    /// Multiple seq_hash → workers mappings (rare: different prefixes with same content).
    Multi(FxHashMap<SequenceHash, FxHashSet<u32>>),
}
168
169impl SeqEntry {
170    fn new(seq_hash: SequenceHash, worker_id: u32) -> Self {
171        let mut workers = FxHashSet::default();
172        workers.insert(worker_id);
173        Self::Single(seq_hash, workers)
174    }
175
176    /// Insert a worker for a given seq_hash, upgrading to Multi if needed.
177    fn insert(&mut self, seq_hash: SequenceHash, worker_id: u32) {
178        match self {
179            Self::Single(existing_hash, workers) if *existing_hash == seq_hash => {
180                workers.insert(worker_id);
181            }
182            Self::Single(existing_hash, existing_workers) => {
183                let mut map = FxHashMap::with_capacity_and_hasher(2, FxBuildHasher);
184                map.insert(*existing_hash, std::mem::take(existing_workers));
185                map.entry(seq_hash).or_default().insert(worker_id);
186                *self = Self::Multi(map);
187            }
188            Self::Multi(map) => {
189                map.entry(seq_hash).or_default().insert(worker_id);
190            }
191        }
192    }
193
194    /// Remove a worker from a given seq_hash.
195    /// Returns true if the entry is now completely empty and should be removed.
196    fn remove(&mut self, seq_hash: SequenceHash, worker_id: u32) -> bool {
197        match self {
198            Self::Single(existing_hash, workers) if *existing_hash == seq_hash => {
199                workers.remove(&worker_id);
200                workers.is_empty()
201            }
202            Self::Single(_, _) => false,
203            Self::Multi(map) => {
204                if let Some(workers) = map.get_mut(&seq_hash) {
205                    workers.remove(&worker_id);
206                    if workers.is_empty() {
207                        map.remove(&seq_hash);
208                    }
209                }
210                map.is_empty()
211            }
212        }
213    }
214
215    /// Get workers for a specific prefix hash (used in query path and event processing).
216    fn get(&self, seq_hash: SequenceHash) -> Option<&FxHashSet<u32>> {
217        match self {
218            Self::Single(existing_hash, workers) if *existing_hash == seq_hash => Some(workers),
219            Self::Single(_, _) => None,
220            Self::Multi(map) => map.get(&seq_hash),
221        }
222    }
223
224    /// For Single entries, return the worker set directly without prefix hash check.
225    /// Content hash collisions at 64-bit XXH3 are practically impossible (~2^-64),
226    /// so a matching content_hash at the same position is unambiguous — the rolling
227    /// hash computation can be skipped entirely.
228    /// Returns None for Multi entries — caller must compute prefix hash to disambiguate.
229    #[inline]
230    fn workers_if_single(&self) -> Option<&FxHashSet<u32>> {
231        match self {
232            Self::Single(_, workers) => Some(workers),
233            Self::Multi(_) => None,
234        }
235    }
236}
237
238// ---------------------------------------------------------------------------
239// PositionalIndexer
240// ---------------------------------------------------------------------------
241
/// Per-worker reverse lookup: backend_seq_hash → (position, content_hash, prefix_hash).
/// The `prefix_hash` is the router-computed rolling hash used as the SeqEntry key.
///
/// Callers own one `WorkerBlockMap` per worker and pass it to write-path methods
/// (`apply_stored`, `apply_removed`, `apply_cleared`, `remove_worker`).
pub type WorkerBlockMap = FxHashMap<SequenceHash, (usize, ContentHash, SequenceHash)>;
247
/// Positional indexer for cache-aware routing.
///
/// Uses a single `DashMap<(usize, ContentHash), SeqEntry>` — keyed by
/// (position, content_hash). Grows unboundedly (no capacity limit).
/// Jump search gives amortized O(D/J + W) matching complexity.
///
/// Write-path methods take a caller-owned `&mut WorkerBlockMap` (one per worker).
/// This gives direct HashMap access (~5ns) instead of DashMap hash+shard locking
/// (~25ns), achieving zero-contention for single-writer-per-worker.
pub struct PositionalIndexer {
    /// Single flat index: (position, content_hash) → SeqEntry.
    /// No capacity limit — grows as blocks are stored.
    index: DashMap<(usize, ContentHash), SeqEntry, FxBuildHasher>,
    /// Per-worker block counts, tracked atomically for O(1) reads during queries.
    /// Flat Vec indexed by worker_id — lock-free reads on the query hot path
    /// (array index ~1ns vs DashMap hash+lock+probe ~25ns per access).
    /// Allocated once in `new` with `MAX_WORKERS` slots.
    tree_sizes: Vec<AtomicUsize>,
    /// Worker URL → internal u32 ID (fast path: DashMap shard read).
    worker_to_id: DashMap<Arc<str>, u32, FxBuildHasher>,
    /// Monotonic counter for assigning new worker IDs.
    next_worker_id: AtomicU32,
    /// Jump size for search optimization (the `Default` impl uses 32).
    jump_size: usize,
}
272
273impl PositionalIndexer {
274    /// Create a new PositionalIndexer with the given jump size.
275    ///
276    /// `jump_size` controls how many positions the search algorithm skips at a time.
277    /// Larger values reduce lookups on long matching prefixes but increase scan range
278    /// when workers drain. Default: 64.
279    pub fn new(jump_size: usize) -> Self {
280        assert!(jump_size > 0, "jump_size must be greater than 0");
281        Self {
282            index: DashMap::with_hasher_and_shard_amount(FxBuildHasher, INDEX_SHARD_COUNT),
283            tree_sizes: (0..MAX_WORKERS).map(|_| AtomicUsize::new(0)).collect(),
284            worker_to_id: DashMap::with_hasher_and_shard_amount(FxBuildHasher, WORKER_SHARD_COUNT),
285            next_worker_id: AtomicU32::new(0),
286            jump_size,
287        }
288    }
289
290    /// Get the internal u32 ID for a worker URL, if it has been interned.
291    ///
292    /// Used by consumers to look up scores in [`OverlapScores`] by worker URL.
293    /// Returns `None` if the worker has never been seen by this indexer.
294    pub fn worker_id(&self, worker: &str) -> Option<u32> {
295        self.worker_to_id.get(worker).map(|entry| *entry.value())
296    }
297
298    /// Apply a "blocks stored" event for a worker.
299    ///
300    /// `worker_id`: internal ID from [`intern_worker`].
301    /// `blocks`: ordered sequence of stored blocks (each with seq_hash + content_hash).
302    /// `parent_seq_hash`: if Some, the sequence extends from the parent's position + 1.
303    ///   If None, the sequence starts from position 0.
304    /// `worker_blocks`: caller-owned reverse lookup for this worker.
305    pub fn apply_stored(
306        &self,
307        worker_id: u32,
308        blocks: &[StoredBlock],
309        parent_seq_hash: Option<SequenceHash>,
310        worker_blocks: &mut WorkerBlockMap,
311    ) -> Result<(), ApplyError> {
312        if blocks.is_empty() {
313            return Ok(());
314        }
315
316        // Determine starting position and parent's router prefix hash.
317        let (start_pos, parent_prefix) = match parent_seq_hash {
318            Some(parent_hash) => {
319                if worker_blocks.is_empty() {
320                    return Err(ApplyError::WorkerNotTracked);
321                }
322                let Some(&(parent_pos, _, parent_pfx)) = worker_blocks.get(&parent_hash) else {
323                    return Err(ApplyError::ParentBlockNotFound);
324                };
325                (parent_pos + 1, Some(parent_pfx))
326            }
327            None => (0, None),
328        };
329
330        let mut prev_prefix = parent_prefix;
331        let mut num_new_blocks = 0usize;
332        for (i, block) in blocks.iter().enumerate() {
333            let position = start_pos + i;
334            let content_hash = block.content_hash;
335
336            // Compute router prefix hash (rolling XXH3 of content hashes).
337            // This is the SeqEntry key — consistent between store and query paths.
338            let prefix_hash = match prev_prefix {
339                Some(prev) => SequenceHash(Self::compute_next_seq_hash(prev.0, content_hash.0)),
340                // Position 0: prefix_hash == content_hash (no parent to chain from).
341                None => SequenceHash(content_hash.0),
342            };
343
344            self.index
345                .entry((position, content_hash))
346                .and_modify(|entry| entry.insert(prefix_hash, worker_id))
347                .or_insert_with(|| SeqEntry::new(prefix_hash, worker_id));
348
349            // Only count genuinely new blocks — duplicate store events must not
350            // inflate tree_sizes, which would cause incorrect routing decisions.
351            if worker_blocks
352                .insert(block.seq_hash, (position, content_hash, prefix_hash))
353                .is_none()
354            {
355                num_new_blocks += 1;
356            }
357            prev_prefix = Some(prefix_hash);
358        }
359
360        // Atomically update tree_sizes — lock-free array index.
361        if num_new_blocks > 0 {
362            self.tree_sizes[worker_id as usize].fetch_add(num_new_blocks, Ordering::Relaxed);
363        }
364
365        Ok(())
366    }
367
368    /// Apply a "blocks removed" event for a worker.
369    ///
370    /// `worker_id`: internal ID from [`intern_worker`].
371    /// `seq_hashes`: position-aware block hashes to remove (from proto).
372    /// `worker_blocks`: caller-owned reverse lookup for this worker.
373    ///
374    /// **Note on orphaned entries**: Removing a block at position N does not cascade to
375    /// blocks at positions > N. Those entries become orphaned — they remain in the index
376    /// but won't match queries because the rolling prefix hash chain is broken at the gap.
377    /// This is harmless: orphaned entries waste a small amount of memory and are cleaned up
378    /// when the worker is cleared or removed. Backends typically evict from the tail (LRU),
379    /// so mid-sequence gaps are rare in practice.
380    pub fn apply_removed(
381        &self,
382        worker_id: u32,
383        seq_hashes: &[SequenceHash],
384        worker_blocks: &mut WorkerBlockMap,
385    ) {
386        let mut num_removed = 0usize;
387        for &seq_hash in seq_hashes {
388            let Some((position, content_hash, prefix_hash)) = worker_blocks.remove(&seq_hash)
389            else {
390                continue;
391            };
392
393            if let Entry::Occupied(mut occupied) = self.index.entry((position, content_hash)) {
394                if occupied.get_mut().remove(prefix_hash, worker_id) {
395                    occupied.remove();
396                }
397            }
398            num_removed += 1;
399        }
400
401        if num_removed > 0 {
402            self.tree_sizes[worker_id as usize].fetch_sub(num_removed, Ordering::Relaxed);
403        }
404    }
405
406    /// Apply a "cache cleared" event — drain blocks, clean index, caller keeps the empty map.
407    ///
408    /// `worker_id`: internal ID from [`intern_worker`].
409    /// `worker_blocks`: caller-owned reverse lookup — drained and left empty.
410    pub fn apply_cleared(&self, worker_id: u32, worker_blocks: &mut WorkerBlockMap) {
411        let drained = std::mem::take(worker_blocks);
412        for &(position, content_hash, prefix_hash) in drained.values() {
413            if let Entry::Occupied(mut occ) = self.index.entry((position, content_hash)) {
414                if occ.get_mut().remove(prefix_hash, worker_id) {
415                    occ.remove();
416                }
417            }
418        }
419        self.tree_sizes[worker_id as usize].store(0, Ordering::Relaxed);
420    }
421
422    /// Remove a worker entirely — takes ownership of blocks, cleans index, worker is gone.
423    ///
424    /// `worker_id`: internal ID from [`intern_worker`].
425    /// `worker_blocks`: caller-owned reverse lookup — consumed.
426    pub fn remove_worker(&self, worker_id: u32, worker_blocks: WorkerBlockMap) {
427        for &(position, content_hash, prefix_hash) in worker_blocks.values() {
428            if let Entry::Occupied(mut occ) = self.index.entry((position, content_hash)) {
429                if occ.get_mut().remove(prefix_hash, worker_id) {
430                    occ.remove();
431                }
432            }
433        }
434        self.tree_sizes[worker_id as usize].store(0, Ordering::Relaxed);
435    }
436
437    /// Get total number of blocks across all workers.
438    pub fn current_size(&self) -> usize {
439        let n = self.next_worker_id.load(Ordering::Relaxed) as usize;
440        self.tree_sizes[..n]
441            .iter()
442            .map(|size| size.load(Ordering::Relaxed))
443            .sum()
444    }
445
    /// Find overlap scores for a request's content hash sequence.
    ///
    /// Uses jump search: strides by `jump_size` positions, only scanning
    /// intermediate positions when workers drain (stop matching).
    /// Complexity: amortized O(D/J + W) where D=depth, J=jump_size, W=workers.
    ///
    /// When `early_exit` is true, returns immediately after finding any match
    /// at position 0 (score = 1 for all matching workers). Useful when the caller
    /// only needs to know whether any worker has cached data for this sequence.
    ///
    /// Returns an empty [`OverlapScores`] when `content_hashes` is empty or no
    /// worker matches at position 0.
    ///
    /// **Assumption**: Block sequences are prefix-closed — if a worker has a block at
    /// position N, it has blocks at all positions 0..N. This holds when backends evict
    /// from the tail (LRU). If `apply_removed` creates a mid-sequence gap, the rolling
    /// prefix hash detects it (the chain breaks at the gap), but the jump heuristic may
    /// over-count if it lands past the gap. In practice, backends only evict tail blocks.
    pub fn find_matches(&self, content_hashes: &[ContentHash], early_exit: bool) -> OverlapScores {
        // Thin public wrapper; the whole algorithm lives in `jump_search_matches`.
        self.jump_search_matches(content_hashes, early_exit)
    }
464
465    // -----------------------------------------------------------------------
466    // Internal: router prefix hash + jump search
467    //
468    // The router computes its own rolling hash from ContentHashes (XXH3).
469    // This hash is stored in SeqEntry during apply_stored and recomputed
470    // at query time for precise filtering.
471    // The backend's SequenceHash (from proto block_hash) stays in
472    // worker_blocks only, used for apply_removed reverse lookup.
473    // -----------------------------------------------------------------------
474
475    /// Compute rolling prefix hash: XXH3(prev || current).
476    #[inline]
477    fn compute_next_seq_hash(prev_seq_hash: u64, current_content_hash: u64) -> u64 {
478        let mut bytes = [0u8; 16];
479        bytes[..8].copy_from_slice(&prev_seq_hash.to_le_bytes());
480        bytes[8..].copy_from_slice(&current_content_hash.to_le_bytes());
481        xxhash_rust::xxh3::xxh3_64_with_seed(&bytes, XXH3_SEED)
482    }
483
484    /// Lazily compute prefix hashes up to `target_pos`.
485    #[inline]
486    fn ensure_seq_hash_computed(
487        seq_hashes: &mut Vec<SequenceHash>,
488        target_pos: usize,
489        sequence: &[ContentHash],
490    ) {
491        while seq_hashes.len() <= target_pos {
492            let pos = seq_hashes.len();
493            if pos == 0 {
494                seq_hashes.push(SequenceHash(sequence[0].0));
495            } else {
496                let prev = seq_hashes[pos - 1].0;
497                let current = sequence[pos].0;
498                seq_hashes.push(SequenceHash(Self::compute_next_seq_hash(prev, current)));
499            }
500        }
501    }
502
503    // -----------------------------------------------------------------------
504    // Internal: worker interning (u32 IDs)
505    // -----------------------------------------------------------------------
506
507    /// Intern a worker URL to an internal u32 ID.
508    /// Fast path: DashMap shard read (no lock). Slow path: DashMap entry API (once per worker).
509    pub fn intern_worker(&self, worker: &str) -> u32 {
510        // Fast path: already interned
511        if let Some(entry) = self.worker_to_id.get(worker) {
512            return *entry.value();
513        }
514        // Slow path: DashMap entry API handles the race — or_insert_with runs at most once.
515        let id = *self
516            .worker_to_id
517            .entry(Arc::from(worker))
518            .or_insert_with(|| self.next_worker_id.fetch_add(1, Ordering::Relaxed))
519            .value();
520        assert!(
521            (id as usize) < MAX_WORKERS,
522            "worker count {id} exceeds MAX_WORKERS ({MAX_WORKERS})"
523        );
524        id
525    }
526
527    // -----------------------------------------------------------------------
528    // Internal: query helpers
529    // -----------------------------------------------------------------------
530
531    /// Get workers at a position matching content_hash (and prefix_hash for Multi).
532    /// Copies worker IDs into a Vec — used only once at position 0 to initialize `active`.
533    /// Skips rolling hash computation for Single entries (unambiguous match).
534    fn get_workers_lazy(
535        index: &DashMap<(usize, ContentHash), SeqEntry, FxBuildHasher>,
536        position: usize,
537        content_hash: ContentHash,
538        seq_hashes: &mut Vec<SequenceHash>,
539        sequence: &[ContentHash],
540    ) -> Option<Vec<u32>> {
541        let entry = index.get(&(position, content_hash))?;
542        if let Some(workers) = entry.value().workers_if_single() {
543            return Some(workers.iter().copied().collect());
544        }
545        // Multi: need rolling hash to disambiguate
546        Self::ensure_seq_hash_computed(seq_hashes, position, sequence);
547        entry
548            .value()
549            .get(seq_hashes[position])
550            .map(|workers| workers.iter().copied().collect())
551    }
552
553    /// Count workers at a position matching the prefix_hash (no set materialization).
554    /// Skips rolling hash computation for Single entries (unambiguous match).
555    fn count_workers_at(
556        index: &DashMap<(usize, ContentHash), SeqEntry, FxBuildHasher>,
557        position: usize,
558        content_hash: ContentHash,
559        seq_hashes: &mut Vec<SequenceHash>,
560        sequence: &[ContentHash],
561    ) -> usize {
562        let Some(entry) = index.get(&(position, content_hash)) else {
563            return 0;
564        };
565        if let Some(workers) = entry.value().workers_if_single() {
566            return workers.len();
567        }
568        // Multi: need rolling hash to disambiguate
569        Self::ensure_seq_hash_computed(seq_hashes, position, sequence);
570        entry
571            .get(seq_hashes[position])
572            .map(|workers| workers.len())
573            .unwrap_or(0)
574    }
575
576    /// Scan positions sequentially, draining workers that stop matching.
577    /// Accesses DashMap entries directly — no set cloning.
578    /// Skips rolling hash computation for Single entries (unambiguous match).
579    /// Uses retain guard: skips retain when workers.len() >= active.len()
580    /// (all active workers are still present, no work to do).
581    #[expect(clippy::too_many_arguments)]
582    fn linear_scan_drain(
583        index: &DashMap<(usize, ContentHash), SeqEntry, FxBuildHasher>,
584        sequence: &[ContentHash],
585        seq_hashes: &mut Vec<SequenceHash>,
586        active: &mut Vec<u32>,
587        internal_scores: &mut FxHashMap<u32, u32>,
588        lo: usize,
589        hi: usize,
590        early_exit: bool,
591    ) {
592        for (offset, &content_hash) in sequence[lo..hi].iter().enumerate() {
593            if active.is_empty() {
594                break;
595            }
596            let pos = lo + offset;
597
598            let Some(entry) = index.get(&(pos, content_hash)) else {
599                for &w in active.iter() {
600                    internal_scores.insert(w, pos as u32);
601                }
602                active.clear();
603                break;
604            };
605
606            // Fast path: Single entry — skip rolling hash, use workers directly.
607            if let Some(workers) = entry.value().workers_if_single() {
608                // Retain guard: only retain when some workers
609                // have dropped off. When workers.len() >= active.len(), all active
610                // workers are still present — skip the O(active) iteration.
611                if workers.len() < active.len() {
612                    let mut i = 0;
613                    while i < active.len() {
614                        if workers.contains(&active[i]) {
615                            i += 1;
616                        } else {
617                            internal_scores.insert(active[i], pos as u32);
618                            active.swap_remove(i);
619                        }
620                    }
621                }
622                if early_exit && !active.is_empty() {
623                    break;
624                }
625                continue;
626            }
627
628            // Multi: need rolling hash to disambiguate.
629            Self::ensure_seq_hash_computed(seq_hashes, pos, sequence);
630            let seq_hash = seq_hashes[pos];
631
632            let Some(workers) = entry.get(seq_hash) else {
633                for &w in active.iter() {
634                    internal_scores.insert(w, pos as u32);
635                }
636                active.clear();
637                break;
638            };
639
640            // Retain guard: only iterate when some workers dropped off.
641            if workers.len() < active.len() {
642                let mut i = 0;
643                while i < active.len() {
644                    if workers.contains(&active[i]) {
645                        i += 1;
646                    } else {
647                        internal_scores.insert(active[i], pos as u32);
648                        active.swap_remove(i);
649                    }
650                }
651            }
652
653            if early_exit && !active.is_empty() {
654                break;
655            }
656        }
657    }
658
659    fn jump_search_matches(
660        &self,
661        content_hashes: &[ContentHash],
662        early_exit: bool,
663    ) -> OverlapScores {
664        let mut scores = OverlapScores::default();
665
666        if content_hashes.is_empty() {
667            return scores;
668        }
669
670        let mut seq_hashes = Vec::with_capacity(content_hashes.len());
671
672        let Some(initial_workers) = Self::get_workers_lazy(
673            &self.index,
674            0,
675            content_hashes[0],
676            &mut seq_hashes,
677            content_hashes,
678        ) else {
679            return scores;
680        };
681
682        let mut active = initial_workers;
683        if active.is_empty() {
684            return scores;
685        }
686
687        let len = content_hashes.len();
688        let mut internal_scores: FxHashMap<u32, u32> = FxHashMap::default();
689
690        // Early exit: just record that workers matched at position 0.
691        if early_exit {
692            for &w in &active {
693                internal_scores.insert(w, 1);
694            }
695            scores.scores = internal_scores;
696            for &int_id in scores.scores.keys() {
697                scores.tree_sizes.insert(
698                    int_id,
699                    self.tree_sizes[int_id as usize].load(Ordering::Relaxed),
700                );
701            }
702            return scores;
703        }
704
705        let mut current_pos = 0;
706
707        while current_pos < len - 1 && !active.is_empty() {
708            let next_pos = (current_pos + self.jump_size).min(len - 1);
709
710            let count = Self::count_workers_at(
711                &self.index,
712                next_pos,
713                content_hashes[next_pos],
714                &mut seq_hashes,
715                content_hashes,
716            );
717
718            // If the worker count at the jump destination matches the active set size,
719            // all active workers are still present — safe to skip intermediate positions.
720            if count == active.len() {
721                current_pos = next_pos;
722            } else {
723                Self::linear_scan_drain(
724                    &self.index,
725                    content_hashes,
726                    &mut seq_hashes,
727                    &mut active,
728                    &mut internal_scores,
729                    current_pos + 1,
730                    next_pos + 1,
731                    false,
732                );
733                current_pos = next_pos;
734            }
735        }
736
737        let final_score = len as u32;
738        for &w in &active {
739            internal_scores.insert(w, final_score);
740        }
741
742        scores.scores = internal_scores;
743
744        // Populate tree_sizes from atomic counters — lock-free array index.
745        for &int_id in scores.scores.keys() {
746            scores.tree_sizes.insert(
747                int_id,
748                self.tree_sizes[int_id as usize].load(Ordering::Relaxed),
749            );
750        }
751
752        scores
753    }
754}
755
756impl Default for PositionalIndexer {
757    fn default() -> Self {
758        Self::new(32)
759    }
760}
761
762impl fmt::Debug for PositionalIndexer {
763    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
764        f.debug_struct("PositionalIndexer")
765            .field("entries", &self.index.len())
766            .field("jump_size", &self.jump_size)
767            .field("workers", &self.next_worker_id.load(Ordering::Relaxed))
768            .finish()
769    }
770}
771
772#[cfg(test)]
773mod tests {
774    use super::*;
775
776    /// Helper: create a sequence of StoredBlocks with distinct seq_hashes and content_hashes.
777    fn make_blocks(content_hashes: &[u64]) -> Vec<StoredBlock> {
778        // Generate seq_hashes as rolling hashes of content
779        let mut blocks = Vec::new();
780        let mut prev_seq: u64 = 0;
781        for (i, &ch) in content_hashes.iter().enumerate() {
782            let seq = if i == 0 {
783                ch
784            } else {
785                PositionalIndexer::compute_next_seq_hash(prev_seq, ch)
786            };
787            prev_seq = seq;
788            blocks.push(StoredBlock {
789                seq_hash: SequenceHash(seq),
790                content_hash: ContentHash(ch),
791            });
792        }
793        blocks
794    }
795
796    /// Helper: create ContentHash sequence for find_matches.
797    fn hashes(values: &[u64]) -> Vec<ContentHash> {
798        values.iter().map(|&v| ContentHash(v)).collect()
799    }
800
801    #[test]
802    fn test_new_indexer_is_empty() {
803        let indexer = PositionalIndexer::default();
804        let scores = indexer.find_matches(&hashes(&[1, 2, 3]), false);
805        assert!(scores.scores.is_empty());
806        assert_eq!(indexer.current_size(), 0);
807    }
808
809    #[test]
810    fn test_store_and_find_single_worker() {
811        let indexer = PositionalIndexer::new(64);
812        let blocks = make_blocks(&[10, 20, 30]);
813        let w1 = indexer.intern_worker("http://w1:8000");
814        let mut wb1 = WorkerBlockMap::default();
815        indexer.apply_stored(w1, &blocks, None, &mut wb1).unwrap();
816
817        let scores = indexer.find_matches(&hashes(&[10, 20, 30]), false);
818        assert_eq!(scores.scores.get(&w1), Some(&3));
819        assert_eq!(scores.tree_sizes.get(&w1), Some(&3));
820    }
821
822    #[test]
823    fn test_store_partial_prefix_match() {
824        let indexer = PositionalIndexer::new(64);
825        let blocks = make_blocks(&[10, 20, 30]);
826        let w1 = indexer.intern_worker("http://w1:8000");
827        let mut wb1 = WorkerBlockMap::default();
828        indexer.apply_stored(w1, &blocks, None, &mut wb1).unwrap();
829
830        // Request has longer sequence — only first 3 match
831        let scores = indexer.find_matches(&hashes(&[10, 20, 30, 40, 50]), false);
832        assert_eq!(scores.scores.get(&w1), Some(&3));
833    }
834
835    #[test]
836    fn test_store_no_match() {
837        let indexer = PositionalIndexer::new(64);
838        let blocks = make_blocks(&[10, 20, 30]);
839        let w1 = indexer.intern_worker("http://w1:8000");
840        let mut wb1 = WorkerBlockMap::default();
841        indexer.apply_stored(w1, &blocks, None, &mut wb1).unwrap();
842
843        let scores = indexer.find_matches(&hashes(&[99, 88, 77]), false);
844        assert!(scores.scores.is_empty());
845    }
846
847    #[test]
848    fn test_two_workers_different_depths() {
849        let indexer = PositionalIndexer::new(64);
850        let blocks_w1 = make_blocks(&[10, 20, 30]);
851        let blocks_w2 = make_blocks(&[10, 20]);
852        let w1 = indexer.intern_worker("http://w1:8000");
853        let w2 = indexer.intern_worker("http://w2:8000");
854        let mut wb1 = WorkerBlockMap::default();
855        let mut wb2 = WorkerBlockMap::default();
856        indexer
857            .apply_stored(w1, &blocks_w1, None, &mut wb1)
858            .unwrap();
859        indexer
860            .apply_stored(w2, &blocks_w2, None, &mut wb2)
861            .unwrap();
862
863        let scores = indexer.find_matches(&hashes(&[10, 20, 30, 40]), false);
864        assert_eq!(scores.scores.get(&w1), Some(&3));
865        assert_eq!(scores.scores.get(&w2), Some(&2));
866    }
867
868    #[test]
869    fn test_remove_blocks() {
870        let indexer = PositionalIndexer::new(64);
871        let blocks = make_blocks(&[10, 20, 30]);
872        let seq_hash_of_30 = blocks[2].seq_hash;
873        let w1 = indexer.intern_worker("http://w1:8000");
874        let mut wb1 = WorkerBlockMap::default();
875        indexer.apply_stored(w1, &blocks, None, &mut wb1).unwrap();
876        indexer.apply_removed(w1, &[seq_hash_of_30], &mut wb1);
877
878        // After removing block at position 2, w1 should only match 2 blocks
879        let scores = indexer.find_matches(&hashes(&[10, 20, 30]), false);
880        assert_eq!(scores.scores.get(&w1), Some(&2));
881        assert_eq!(scores.tree_sizes.get(&w1), Some(&2));
882    }
883
884    #[test]
885    fn test_clear_worker() {
886        let indexer = PositionalIndexer::new(64);
887        let blocks_w1 = make_blocks(&[10, 20, 30]);
888        let blocks_w2 = make_blocks(&[10, 20]);
889        let w1 = indexer.intern_worker("http://w1:8000");
890        let w2 = indexer.intern_worker("http://w2:8000");
891        let mut wb1 = WorkerBlockMap::default();
892        let mut wb2 = WorkerBlockMap::default();
893        indexer
894            .apply_stored(w1, &blocks_w1, None, &mut wb1)
895            .unwrap();
896        indexer
897            .apply_stored(w2, &blocks_w2, None, &mut wb2)
898            .unwrap();
899
900        indexer.apply_cleared(w1, &mut wb1);
901
902        let scores = indexer.find_matches(&hashes(&[10, 20, 30]), false);
903        assert!(!scores.scores.contains_key(&w1));
904        assert_eq!(scores.scores.get(&w2), Some(&2));
905    }
906
907    #[test]
908    fn test_tree_sizes() {
909        let indexer = PositionalIndexer::new(64);
910        let blocks_w1 = make_blocks(&[10, 20, 30]);
911        let blocks_w2 = make_blocks(&[10, 20]);
912        let w1 = indexer.intern_worker("http://w1:8000");
913        let w2 = indexer.intern_worker("http://w2:8000");
914        let mut wb1 = WorkerBlockMap::default();
915        let mut wb2 = WorkerBlockMap::default();
916        indexer
917            .apply_stored(w1, &blocks_w1, None, &mut wb1)
918            .unwrap();
919        indexer
920            .apply_stored(w2, &blocks_w2, None, &mut wb2)
921            .unwrap();
922
923        let scores = indexer.find_matches(&hashes(&[10]), false);
924        assert_eq!(scores.tree_sizes.get(&w1), Some(&3));
925        assert_eq!(scores.tree_sizes.get(&w2), Some(&2));
926    }
927
928    #[test]
929    fn test_store_with_parent_hash() {
930        let indexer = PositionalIndexer::new(64);
931        // First store: blocks at positions 0, 1
932        let blocks1 = make_blocks(&[10, 20]);
933        let parent_seq_hash = blocks1[1].seq_hash;
934        let w1 = indexer.intern_worker("http://w1:8000");
935        let mut wb1 = WorkerBlockMap::default();
936        indexer.apply_stored(w1, &blocks1, None, &mut wb1).unwrap();
937
938        // Second store: blocks at positions 2, 3 (extending from parent)
939        let blocks2 = vec![
940            StoredBlock {
941                seq_hash: SequenceHash(300),
942                content_hash: ContentHash(30),
943            },
944            StoredBlock {
945                seq_hash: SequenceHash(400),
946                content_hash: ContentHash(40),
947            },
948        ];
949        indexer
950            .apply_stored(w1, &blocks2, Some(parent_seq_hash), &mut wb1)
951            .unwrap();
952
953        let scores = indexer.find_matches(&hashes(&[10, 20, 30, 40]), false);
954        assert_eq!(scores.scores.get(&w1), Some(&4));
955        assert_eq!(scores.tree_sizes.get(&w1), Some(&4));
956    }
957
958    #[test]
959    fn test_store_with_parent_error_worker_not_tracked() {
960        let indexer = PositionalIndexer::new(64);
961        let blocks = make_blocks(&[10, 20]);
962        let w1 = indexer.intern_worker("http://w1:8000");
963        let mut wb1 = WorkerBlockMap::default();
964        let result = indexer.apply_stored(w1, &blocks, Some(SequenceHash(999)), &mut wb1);
965        assert!(matches!(result, Err(ApplyError::WorkerNotTracked)));
966    }
967
968    #[test]
969    fn test_store_with_parent_error_parent_not_found() {
970        let indexer = PositionalIndexer::new(64);
971        let blocks1 = make_blocks(&[10, 20]);
972        let w1 = indexer.intern_worker("http://w1:8000");
973        let mut wb1 = WorkerBlockMap::default();
974        indexer.apply_stored(w1, &blocks1, None, &mut wb1).unwrap();
975
976        let blocks2 = make_blocks(&[30]);
977        let result = indexer.apply_stored(w1, &blocks2, Some(SequenceHash(999_999)), &mut wb1);
978        assert!(matches!(result, Err(ApplyError::ParentBlockNotFound)));
979    }
980
981    #[test]
982    fn test_remove_missing_block_is_noop() {
983        let indexer = PositionalIndexer::new(64);
984        let blocks = make_blocks(&[10, 20, 30]);
985        let w1 = indexer.intern_worker("http://w1:8000");
986        let mut wb1 = WorkerBlockMap::default();
987        indexer.apply_stored(w1, &blocks, None, &mut wb1).unwrap();
988
989        indexer.apply_removed(w1, &[SequenceHash(999)], &mut wb1);
990        assert_eq!(indexer.current_size(), 3);
991    }
992
993    #[test]
994    fn test_remove_unknown_worker_is_noop() {
995        let indexer = PositionalIndexer::new(64);
996        let w1 = indexer.intern_worker("http://unknown:8000");
997        let mut wb1 = WorkerBlockMap::default();
998        indexer.apply_removed(w1, &[SequenceHash(1)], &mut wb1);
999    }
1000
1001    #[test]
1002    fn test_remove_worker() {
1003        let indexer = PositionalIndexer::new(64);
1004        let blocks = make_blocks(&[10, 20, 30]);
1005        let w1 = indexer.intern_worker("http://w1:8000");
1006        let mut wb1 = WorkerBlockMap::default();
1007        indexer.apply_stored(w1, &blocks, None, &mut wb1).unwrap();
1008        indexer.remove_worker(w1, wb1);
1009
1010        let scores = indexer.find_matches(&hashes(&[10, 20, 30]), false);
1011        assert!(scores.scores.is_empty());
1012        assert_eq!(indexer.current_size(), 0);
1013    }
1014
1015    #[test]
1016    fn test_multiple_workers_same_position() {
1017        let indexer = PositionalIndexer::new(64);
1018        let w1 = indexer.intern_worker("http://w1:8000");
1019        let w2 = indexer.intern_worker("http://w2:8000");
1020        let w3 = indexer.intern_worker("http://w3:8000");
1021        let mut wb1 = WorkerBlockMap::default();
1022        let mut wb2 = WorkerBlockMap::default();
1023        let mut wb3 = WorkerBlockMap::default();
1024        indexer
1025            .apply_stored(w1, &make_blocks(&[10]), None, &mut wb1)
1026            .unwrap();
1027        indexer
1028            .apply_stored(w2, &make_blocks(&[10]), None, &mut wb2)
1029            .unwrap();
1030        indexer
1031            .apply_stored(w3, &make_blocks(&[10]), None, &mut wb3)
1032            .unwrap();
1033
1034        let scores = indexer.find_matches(&hashes(&[10]), false);
1035        assert_eq!(scores.scores.get(&w1), Some(&1));
1036        assert_eq!(scores.scores.get(&w2), Some(&1));
1037        assert_eq!(scores.scores.get(&w3), Some(&1));
1038    }
1039
1040    #[test]
1041    fn test_empty_blocks_is_noop() {
1042        let indexer = PositionalIndexer::new(64);
1043        let w1 = indexer.intern_worker("http://w1:8000");
1044        let mut wb1 = WorkerBlockMap::default();
1045        indexer.apply_stored(w1, &[], None, &mut wb1).unwrap();
1046        assert_eq!(indexer.current_size(), 0);
1047    }
1048
1049    #[test]
1050    fn test_single_block_sequence() {
1051        let indexer = PositionalIndexer::new(64);
1052        let blocks = make_blocks(&[42]);
1053        let w1 = indexer.intern_worker("http://w1:8000");
1054        let mut wb1 = WorkerBlockMap::default();
1055        indexer.apply_stored(w1, &blocks, None, &mut wb1).unwrap();
1056
1057        let scores = indexer.find_matches(&hashes(&[42]), false);
1058        assert_eq!(scores.scores.get(&w1), Some(&1));
1059    }
1060
1061    #[test]
1062    fn test_request_content_hash_chunking() {
1063        let hashes = compute_request_content_hashes(&[1, 2, 3, 4, 5, 6, 7, 8], 4);
1064        assert_eq!(hashes.len(), 2);
1065        assert_eq!(hashes[0], compute_content_hash(&[1, 2, 3, 4]));
1066        assert_eq!(hashes[1], compute_content_hash(&[5, 6, 7, 8]));
1067    }
1068
1069    #[test]
1070    fn test_request_content_hash_zero_block_size() {
1071        let hashes = compute_request_content_hashes(&[1, 2, 3], 0);
1072        assert!(hashes.is_empty());
1073    }
1074
1075    // -----------------------------------------------------------------------
1076    // Jump search edge cases
1077    // -----------------------------------------------------------------------
1078
1079    #[test]
1080    fn test_jump_search_long_prefix() {
1081        let indexer = PositionalIndexer::new(4); // small jump_size to exercise jump logic
1082        let values: Vec<u64> = (1..=20).collect();
1083        let blocks = make_blocks(&values);
1084        let w1 = indexer.intern_worker("http://w1:8000");
1085        let mut wb1 = WorkerBlockMap::default();
1086        indexer.apply_stored(w1, &blocks, None, &mut wb1).unwrap();
1087
1088        let scores = indexer.find_matches(&hashes(&values), false);
1089        assert_eq!(scores.scores.get(&w1), Some(&20));
1090    }
1091
1092    #[test]
1093    fn test_jump_search_worker_drains_mid_jump() {
1094        let indexer = PositionalIndexer::new(4);
1095        // w1 has 10 blocks, w2 has 6
1096        let values_w1: Vec<u64> = (1..=10).collect();
1097        let values_w2: Vec<u64> = (1..=6).collect();
1098        let w1 = indexer.intern_worker("http://w1:8000");
1099        let w2 = indexer.intern_worker("http://w2:8000");
1100        let mut wb1 = WorkerBlockMap::default();
1101        let mut wb2 = WorkerBlockMap::default();
1102        indexer
1103            .apply_stored(w1, &make_blocks(&values_w1), None, &mut wb1)
1104            .unwrap();
1105        indexer
1106            .apply_stored(w2, &make_blocks(&values_w2), None, &mut wb2)
1107            .unwrap();
1108
1109        let query: Vec<u64> = (1..=10).collect();
1110        let scores = indexer.find_matches(&hashes(&query), false);
1111        assert_eq!(scores.scores.get(&w1), Some(&10));
1112        assert_eq!(scores.scores.get(&w2), Some(&6));
1113    }
1114
1115    #[test]
1116    fn test_jump_search_multiple_drains() {
1117        let indexer = PositionalIndexer::new(3);
1118        // w1: 12, w2: 7, w3: 4
1119        let v1: Vec<u64> = (1..=12).collect();
1120        let v2: Vec<u64> = (1..=7).collect();
1121        let v3: Vec<u64> = (1..=4).collect();
1122        let w1 = indexer.intern_worker("http://w1:8000");
1123        let w2 = indexer.intern_worker("http://w2:8000");
1124        let w3 = indexer.intern_worker("http://w3:8000");
1125        let mut wb1 = WorkerBlockMap::default();
1126        let mut wb2 = WorkerBlockMap::default();
1127        let mut wb3 = WorkerBlockMap::default();
1128        indexer
1129            .apply_stored(w1, &make_blocks(&v1), None, &mut wb1)
1130            .unwrap();
1131        indexer
1132            .apply_stored(w2, &make_blocks(&v2), None, &mut wb2)
1133            .unwrap();
1134        indexer
1135            .apply_stored(w3, &make_blocks(&v3), None, &mut wb3)
1136            .unwrap();
1137
1138        let query: Vec<u64> = (1..=12).collect();
1139        let scores = indexer.find_matches(&hashes(&query), false);
1140        assert_eq!(scores.scores.get(&w1), Some(&12));
1141        assert_eq!(scores.scores.get(&w2), Some(&7));
1142        assert_eq!(scores.scores.get(&w3), Some(&4));
1143    }
1144
1145    #[test]
1146    fn test_concurrent_store_and_match() {
1147        use std::{sync::Arc, thread};
1148
1149        let indexer = Arc::new(PositionalIndexer::new(64));
1150        let indexer_writer = Arc::clone(&indexer);
1151
1152        let writer = thread::spawn(move || {
1153            for i in 0..100u64 {
1154                let blocks = make_blocks(&[i * 10, i * 10 + 1, i * 10 + 2]);
1155                let wid = indexer_writer.intern_worker(&format!("http://w{i}:8000"));
1156                let mut wb = WorkerBlockMap::default();
1157                let _ = indexer_writer.apply_stored(wid, &blocks, None, &mut wb);
1158            }
1159        });
1160
1161        let reader = thread::spawn({
1162            let indexer = Arc::clone(&indexer);
1163            move || {
1164                for _ in 0..1000 {
1165                    let _ = indexer.find_matches(&hashes(&[0, 1, 2, 3, 4]), false);
1166                }
1167            }
1168        });
1169
1170        writer.join().unwrap();
1171        reader.join().unwrap();
1172    }
1173
1174    #[test]
1175    fn test_seq_entry_single_to_multi_upgrade() {
1176        let indexer = PositionalIndexer::new(64);
1177
1178        // Two workers with same content hashes but different rolling prefixes
1179        // Worker 1: blocks at position 0 with content_hash=10
1180        let blocks_w1 = vec![StoredBlock {
1181            seq_hash: SequenceHash(100),
1182            content_hash: ContentHash(10),
1183        }];
1184        let w1 = indexer.intern_worker("http://w1:8000");
1185        let w2 = indexer.intern_worker("http://w2:8000");
1186        let mut wb1 = WorkerBlockMap::default();
1187        let mut wb2 = WorkerBlockMap::default();
1188        indexer
1189            .apply_stored(w1, &blocks_w1, None, &mut wb1)
1190            .unwrap();
1191
1192        // Worker 2: same content_hash but different seq_hash
1193        // Both start at position 0, so prefix_hash == content_hash.0 for both
1194        // This means they share the same prefix_hash → Single entry, both workers in set
1195        let blocks_w2 = vec![StoredBlock {
1196            seq_hash: SequenceHash(200),
1197            content_hash: ContentHash(10),
1198        }];
1199        indexer
1200            .apply_stored(w2, &blocks_w2, None, &mut wb2)
1201            .unwrap();
1202
1203        let scores = indexer.find_matches(&hashes(&[10]), false);
1204        assert_eq!(scores.scores.get(&w1), Some(&1));
1205        assert_eq!(scores.scores.get(&w2), Some(&1));
1206    }
1207
1208    #[test]
1209    fn test_seq_entry_distinct_prefix_same_content() {
1210        let indexer = PositionalIndexer::new(64);
1211
1212        // Worker 1: position 0 = content 10, position 1 = content 99
1213        // Prefix at pos 1 = XXH3(10 || 99)
1214        let blocks_w1 = make_blocks(&[10, 99]);
1215        let w1 = indexer.intern_worker("http://w1:8000");
1216        let w2 = indexer.intern_worker("http://w2:8000");
1217        let mut wb1 = WorkerBlockMap::default();
1218        let mut wb2 = WorkerBlockMap::default();
1219        indexer
1220            .apply_stored(w1, &blocks_w1, None, &mut wb1)
1221            .unwrap();
1222
1223        // Worker 2: position 0 = content 20, position 1 = content 99
1224        // Prefix at pos 1 = XXH3(20 || 99) ← different because position 0 differs
1225        let blocks_w2 = make_blocks(&[20, 99]);
1226        indexer
1227            .apply_stored(w2, &blocks_w2, None, &mut wb2)
1228            .unwrap();
1229
1230        // Query [10, 99] should only match w1
1231        let scores = indexer.find_matches(&hashes(&[10, 99]), false);
1232        assert_eq!(scores.scores.get(&w1), Some(&2));
1233        // w2 has a different prefix at position 0, so it won't be in initial active set
1234
1235        // Query [20, 99] should only match w2
1236        let scores = indexer.find_matches(&hashes(&[20, 99]), false);
1237        assert_eq!(scores.scores.get(&w2), Some(&2));
1238    }
1239
1240    // -----------------------------------------------------------------------
1241    // early_exit tests
1242    // -----------------------------------------------------------------------
1243
1244    #[test]
1245    fn test_early_exit_returns_score_one() {
1246        let indexer = PositionalIndexer::new(64);
1247        let blocks = make_blocks(&[10, 20, 30]);
1248        let w1 = indexer.intern_worker("http://w1:8000");
1249        let mut wb1 = WorkerBlockMap::default();
1250        indexer.apply_stored(w1, &blocks, None, &mut wb1).unwrap();
1251
1252        let scores = indexer.find_matches(&hashes(&[10, 20, 30]), true);
1253        // early_exit: score is 1 (matched at position 0), not full depth
1254        assert_eq!(scores.scores.get(&w1), Some(&1));
1255        // tree_sizes still populated
1256        assert_eq!(scores.tree_sizes.get(&w1), Some(&3));
1257    }
1258
1259    #[test]
1260    fn test_early_exit_no_match() {
1261        let indexer = PositionalIndexer::new(64);
1262        let blocks = make_blocks(&[10, 20, 30]);
1263        let w1 = indexer.intern_worker("http://w1:8000");
1264        let mut wb1 = WorkerBlockMap::default();
1265        indexer.apply_stored(w1, &blocks, None, &mut wb1).unwrap();
1266
1267        let scores = indexer.find_matches(&hashes(&[99, 88]), true);
1268        assert!(scores.scores.is_empty());
1269    }
1270
1271    // -----------------------------------------------------------------------
1272    // worker_id tests
1273    // -----------------------------------------------------------------------
1274
1275    #[test]
1276    fn test_worker_id_unknown() {
1277        let indexer = PositionalIndexer::default();
1278        assert!(indexer.worker_id("http://unknown:8000").is_none());
1279    }
1280
1281    #[test]
1282    fn test_worker_id_after_store() {
1283        let indexer = PositionalIndexer::default();
1284        let w1 = indexer.intern_worker("http://w1:8000");
1285        let mut wb1 = WorkerBlockMap::default();
1286        indexer
1287            .apply_stored(w1, &make_blocks(&[10]), None, &mut wb1)
1288            .unwrap();
1289        assert!(indexer.worker_id("http://w1:8000").is_some());
1290    }
1291
1292    // -----------------------------------------------------------------------
1293    // Atomic tree_sizes consistency
1294    // -----------------------------------------------------------------------
1295
1296    #[test]
1297    fn test_tree_sizes_after_store_and_remove() {
1298        let indexer = PositionalIndexer::new(64);
1299        let blocks = make_blocks(&[10, 20, 30, 40, 50]);
1300        let w1 = indexer.intern_worker("http://w1:8000");
1301        let mut wb1 = WorkerBlockMap::default();
1302        indexer.apply_stored(w1, &blocks, None, &mut wb1).unwrap();
1303        assert_eq!(indexer.current_size(), 5);
1304
1305        // Remove 2 blocks
1306        indexer.apply_removed(w1, &[blocks[3].seq_hash, blocks[4].seq_hash], &mut wb1);
1307        assert_eq!(indexer.current_size(), 3);
1308
1309        // Verify tree_sizes in query results
1310        let scores = indexer.find_matches(&hashes(&[10, 20, 30]), false);
1311        assert_eq!(scores.tree_sizes.get(&w1), Some(&3));
1312    }
1313
1314    #[test]
1315    fn test_duplicate_store_does_not_inflate_tree_size() {
1316        let indexer = PositionalIndexer::new(64);
1317        let blocks = make_blocks(&[10, 20, 30]);
1318        let w1 = indexer.intern_worker("http://w1:8000");
1319        let mut wb1 = WorkerBlockMap::default();
1320
1321        // First store: 3 new blocks
1322        indexer.apply_stored(w1, &blocks, None, &mut wb1).unwrap();
1323        let scores = indexer.find_matches(&hashes(&[10, 20, 30]), false);
1324        assert_eq!(scores.tree_sizes.get(&w1), Some(&3));
1325
1326        // Replay the same store event — tree_size must not change
1327        indexer.apply_stored(w1, &blocks, None, &mut wb1).unwrap();
1328        let scores = indexer.find_matches(&hashes(&[10, 20, 30]), false);
1329        assert_eq!(
1330            scores.tree_sizes.get(&w1),
1331            Some(&3),
1332            "Duplicate store event must not inflate tree_size"
1333        );
1334
1335        // Overlap scores should also be unchanged
1336        assert_eq!(scores.scores.get(&w1), Some(&3));
1337    }
1338
1339    #[test]
1340    fn test_remove_worker_nonexistent_is_noop() {
1341        let indexer = PositionalIndexer::default();
1342        let w = indexer.intern_worker("http://ghost:8000");
1343        indexer.remove_worker(w, WorkerBlockMap::default()); // no-op, no panic
1344        assert_eq!(indexer.current_size(), 0);
1345    }
1346
1347    #[test]
1348    fn test_concurrent_read_write() {
1349        let indexer = Arc::new(PositionalIndexer::new(4));
1350        let content: Vec<u64> = (1..=20).collect();
1351        let blocks = make_blocks(&content);
1352        let w1 = indexer.intern_worker("http://w1:8000");
1353        let mut wb1 = WorkerBlockMap::default();
1354        indexer.apply_stored(w1, &blocks, None, &mut wb1).unwrap();
1355
1356        let mut handles = Vec::new();
1357
1358        // Spawn readers
1359        for _ in 0..4 {
1360            let idx = Arc::clone(&indexer);
1361            let ch = hashes(&content);
1362            handles.push(std::thread::spawn(move || {
1363                for _ in 0..100 {
1364                    let scores = idx.find_matches(&ch, false);
1365                    let w1 = idx.worker_id("http://w1:8000").unwrap();
1366                    assert!(scores.scores.contains_key(&w1));
1367                }
1368            }));
1369        }
1370
1371        // Spawn writers (add new workers concurrently)
1372        for i in 0..4 {
1373            let idx = Arc::clone(&indexer);
1374            let worker_content: Vec<u64> = (1..=5).collect();
1375            handles.push(std::thread::spawn(move || {
1376                let worker = format!("http://writer{i}:8000");
1377                let wid = idx.intern_worker(&worker);
1378                let mut wb = WorkerBlockMap::default();
1379                let blks = make_blocks(&worker_content);
1380                for _ in 0..50 {
1381                    idx.apply_stored(wid, &blks, None, &mut wb).unwrap();
1382                }
1383            }));
1384        }
1385
1386        for handle in handles {
1387            handle.join().unwrap();
1388        }
1389
1390        // w1 should still be matchable
1391        let scores = indexer.find_matches(&hashes(&content), false);
1392        assert_eq!(scores.scores.get(&w1), Some(&20));
1393    }
1394
1395    #[test]
1396    fn test_dashmap_cleanup_no_memory_leak() {
1397        let indexer = PositionalIndexer::default();
1398        let blocks = make_blocks(&[10, 20, 30]);
1399        let w1 = indexer.intern_worker("http://w1:8000");
1400        let w2 = indexer.intern_worker("http://w2:8000");
1401        let mut wb1 = WorkerBlockMap::default();
1402        let mut wb2 = WorkerBlockMap::default();
1403        indexer.apply_stored(w1, &blocks, None, &mut wb1).unwrap();
1404        indexer.apply_stored(w2, &blocks, None, &mut wb2).unwrap();
1405
1406        assert!(!indexer.index.is_empty());
1407
1408        indexer.remove_worker(w1, wb1);
1409        assert!(!indexer.index.is_empty());
1410
1411        indexer.remove_worker(w2, wb2);
1412        assert_eq!(indexer.index.len(), 0);
1413    }
1414
1415    #[test]
1416    fn test_compute_content_hash_empty_tokens() {
1417        let hash = compute_content_hash(&[]);
1418        let hash2 = compute_content_hash(&[]);
1419        assert_eq!(hash, hash2);
1420    }
1421
1422    #[test]
1423    fn test_compute_content_hash_single_token() {
1424        let hash = compute_content_hash(&[42]);
1425        assert_ne!(hash, compute_content_hash(&[43]));
1426    }
1427
1428    #[test]
1429    fn test_seq_hash_rolling_correctness() {
1430        let content = vec![10u64, 20, 30, 40, 50];
1431        let blocks = make_blocks(&content);
1432        let content_hashes = hashes(&content);
1433
1434        let mut seq_hashes: Vec<SequenceHash> = Vec::new();
1435        PositionalIndexer::ensure_seq_hash_computed(&mut seq_hashes, 4, &content_hashes);
1436
1437        for (i, block) in blocks.iter().enumerate() {
1438            assert_eq!(
1439                seq_hashes[i], block.seq_hash,
1440                "seq_hash mismatch at position {i}"
1441            );
1442        }
1443    }
1444
1445    #[test]
1446    fn test_query_prefix_of_stored() {
1447        let indexer = PositionalIndexer::default();
1448        let blocks = make_blocks(&[10, 20, 30, 40, 50]);
1449        let w1 = indexer.intern_worker("http://w1:8000");
1450        let mut wb1 = WorkerBlockMap::default();
1451        indexer.apply_stored(w1, &blocks, None, &mut wb1).unwrap();
1452
1453        let scores = indexer.find_matches(&hashes(&[10, 20]), false);
1454        assert_eq!(scores.scores.get(&w1), Some(&2));
1455        assert_eq!(scores.tree_sizes.get(&w1), Some(&5));
1456    }
1457
1458    #[test]
1459    fn test_disjoint_workers_no_shared_prefix() {
1460        let indexer = PositionalIndexer::default();
1461        let blocks_w1 = make_blocks(&[10, 20, 30]);
1462        let blocks_w2 = make_blocks(&[99, 88, 77]);
1463        let w1 = indexer.intern_worker("http://w1:8000");
1464        let w2 = indexer.intern_worker("http://w2:8000");
1465        let mut wb1 = WorkerBlockMap::default();
1466        let mut wb2 = WorkerBlockMap::default();
1467        indexer
1468            .apply_stored(w1, &blocks_w1, None, &mut wb1)
1469            .unwrap();
1470        indexer
1471            .apply_stored(w2, &blocks_w2, None, &mut wb2)
1472            .unwrap();
1473
1474        let scores = indexer.find_matches(&hashes(&[10, 20, 30]), false);
1475        assert_eq!(scores.scores.get(&w1), Some(&3));
1476        assert!(!scores.scores.contains_key(&w2));
1477
1478        let scores = indexer.find_matches(&hashes(&[99, 88, 77]), false);
1479        assert!(!scores.scores.contains_key(&w1));
1480        assert_eq!(scores.scores.get(&w2), Some(&3));
1481    }
1482
1483    #[test]
1484    #[should_panic(expected = "jump_size must be greater than 0")]
1485    fn test_zero_jump_size_panics() {
1486        let _ = PositionalIndexer::new(0);
1487    }
1488
1489    #[test]
1490    fn test_current_size_across_operations() {
1491        let indexer = PositionalIndexer::default();
1492        assert_eq!(indexer.current_size(), 0);
1493
1494        let blocks = make_blocks(&[10, 20, 30]);
1495        let w1 = indexer.intern_worker("http://w1:8000");
1496        let w2 = indexer.intern_worker("http://w2:8000");
1497        let mut wb1 = WorkerBlockMap::default();
1498        let mut wb2 = WorkerBlockMap::default();
1499        indexer.apply_stored(w1, &blocks, None, &mut wb1).unwrap();
1500        assert_eq!(indexer.current_size(), 3);
1501
1502        indexer.apply_stored(w2, &blocks, None, &mut wb2).unwrap();
1503        assert_eq!(indexer.current_size(), 6);
1504
1505        indexer.apply_removed(w1, &[blocks[2].seq_hash], &mut wb1);
1506        assert_eq!(indexer.current_size(), 5);
1507
1508        indexer.apply_cleared(w2, &mut wb2);
1509        assert_eq!(indexer.current_size(), 2);
1510
1511        indexer.remove_worker(w1, wb1);
1512        assert_eq!(indexer.current_size(), 0);
1513    }
1514
1515    // -----------------------------------------------------------------------
1516    // compute_request_content_hashes tests
1517    // -----------------------------------------------------------------------
1518
1519    #[test]
1520    fn test_request_hashes_basic() {
1521        let tokens: Vec<u32> = (1..=8).collect();
1522        let hashes = compute_request_content_hashes(&tokens, 4);
1523        assert_eq!(hashes.len(), 2);
1524        assert_eq!(hashes[0], compute_content_hash(&[1, 2, 3, 4]));
1525        assert_eq!(hashes[1], compute_content_hash(&[5, 6, 7, 8]));
1526    }
1527
1528    #[test]
1529    fn test_request_hashes_partial_trailing_chunk_discarded() {
1530        let tokens: Vec<u32> = (1..=10).collect();
1531        let hashes = compute_request_content_hashes(&tokens, 4);
1532        assert_eq!(hashes.len(), 2);
1533    }
1534
1535    #[test]
1536    fn test_request_hashes_fewer_than_block_size() {
1537        let hashes = compute_request_content_hashes(&[1, 2, 3], 4);
1538        assert!(hashes.is_empty());
1539    }
1540
1541    #[test]
1542    fn test_request_hashes_empty_tokens() {
1543        let hashes = compute_request_content_hashes(&[], 16);
1544        assert!(hashes.is_empty());
1545    }
1546
1547    #[test]
1548    fn test_request_hashes_exact_multiple() {
1549        let tokens: Vec<u32> = (1..=6).collect();
1550        let hashes = compute_request_content_hashes(&tokens, 2);
1551        assert_eq!(hashes.len(), 3);
1552    }
1553
1554    #[test]
1555    fn test_request_hashes_zero_block_size_returns_empty() {
1556        let hashes = compute_request_content_hashes(&[1, 2, 3], 0);
1557        assert!(hashes.is_empty());
1558    }
1559
1560    #[test]
1561    fn test_request_hashes_block_size_1() {
1562        let tokens = vec![10u32, 20, 30];
1563        let hashes = compute_request_content_hashes(&tokens, 1);
1564        assert_eq!(hashes.len(), 3);
1565        assert_eq!(hashes[0], compute_content_hash(&[10]));
1566        assert_eq!(hashes[1], compute_content_hash(&[20]));
1567        assert_eq!(hashes[2], compute_content_hash(&[30]));
1568    }
1569
1570    // -----------------------------------------------------------------------
1571    // End-to-end: store events → query with compute_request_content_hashes
1572    // -----------------------------------------------------------------------
1573
1574    #[test]
1575    fn test_end_to_end_store_and_query() {
1576        let indexer = PositionalIndexer::default();
1577        let block_size = 4;
1578        let tokens: Vec<u32> = (1..=16).collect();
1579
1580        let content_hashes: Vec<ContentHash> = tokens
1581            .chunks(block_size)
1582            .map(compute_content_hash)
1583            .collect();
1584
1585        let blocks: Vec<StoredBlock> = content_hashes
1586            .iter()
1587            .enumerate()
1588            .map(|(i, &ch)| StoredBlock {
1589                seq_hash: SequenceHash(0xBEEF_0000 + i as u64),
1590                content_hash: ch,
1591            })
1592            .collect();
1593
1594        let w1 = indexer.intern_worker("http://w1:8000");
1595        let mut wb1 = WorkerBlockMap::default();
1596        indexer.apply_stored(w1, &blocks, None, &mut wb1).unwrap();
1597
1598        let query_hashes = compute_request_content_hashes(&tokens, block_size);
1599        let scores = indexer.find_matches(&query_hashes, false);
1600        assert_eq!(scores.scores.get(&w1), Some(&4));
1601    }
1602
1603    #[test]
1604    fn test_end_to_end_partial_overlap() {
1605        let indexer = PositionalIndexer::default();
1606        let block_size = 4;
1607
1608        let cached_tokens: Vec<u32> = (1..=8).collect();
1609        let blocks: Vec<StoredBlock> = cached_tokens
1610            .chunks(block_size)
1611            .enumerate()
1612            .map(|(i, chunk)| StoredBlock {
1613                seq_hash: SequenceHash(i as u64 + 1),
1614                content_hash: compute_content_hash(chunk),
1615            })
1616            .collect();
1617        let w1 = indexer.intern_worker("http://w1:8000");
1618        let mut wb1 = WorkerBlockMap::default();
1619        indexer.apply_stored(w1, &blocks, None, &mut wb1).unwrap();
1620
1621        let query_tokens: Vec<u32> = (1..=16).collect();
1622        let query_hashes = compute_request_content_hashes(&query_tokens, block_size);
1623        let scores = indexer.find_matches(&query_hashes, false);
1624        assert_eq!(scores.scores.get(&w1), Some(&2));
1625        assert_eq!(scores.tree_sizes.get(&w1), Some(&2));
1626    }
1627
1628    #[test]
1629    fn test_end_to_end_different_backends_same_content() {
1630        let indexer = PositionalIndexer::new(4);
1631        let block_size = 4;
1632        let tokens: Vec<u32> = (1..=8).collect();
1633        let content_hashes: Vec<ContentHash> = tokens
1634            .chunks(block_size)
1635            .map(compute_content_hash)
1636            .collect();
1637
1638        let blocks_w1: Vec<StoredBlock> = content_hashes
1639            .iter()
1640            .enumerate()
1641            .map(|(i, &ch)| StoredBlock {
1642                seq_hash: SequenceHash(0xAAAA_0000 + i as u64),
1643                content_hash: ch,
1644            })
1645            .collect();
1646
1647        let blocks_w2: Vec<StoredBlock> = content_hashes
1648            .iter()
1649            .enumerate()
1650            .map(|(i, &ch)| StoredBlock {
1651                seq_hash: SequenceHash(0xBBBB_0000 + i as u64),
1652                content_hash: ch,
1653            })
1654            .collect();
1655
1656        let sglang = indexer.intern_worker("http://sglang:8000");
1657        let vllm = indexer.intern_worker("http://vllm:8000");
1658        let mut wb_sg = WorkerBlockMap::default();
1659        let mut wb_vl = WorkerBlockMap::default();
1660        indexer
1661            .apply_stored(sglang, &blocks_w1, None, &mut wb_sg)
1662            .unwrap();
1663        indexer
1664            .apply_stored(vllm, &blocks_w2, None, &mut wb_vl)
1665            .unwrap();
1666
1667        let query_hashes = compute_request_content_hashes(&tokens, block_size);
1668        let scores = indexer.find_matches(&query_hashes, false);
1669        assert_eq!(scores.scores.get(&sglang), Some(&2));
1670        assert_eq!(scores.scores.get(&vllm), Some(&2));
1671    }
1672
1673    // -----------------------------------------------------------------------
1674    // Jump boundary tests
1675    // -----------------------------------------------------------------------
1676
1677    /// Helper: store a sequence for a worker via chained continuations of `chunk_size` blocks.
1678    fn store_via_continuations(
1679        indexer: &PositionalIndexer,
1680        worker: &str,
1681        content: &[u64],
1682        chunk_size: usize,
1683        worker_blocks: &mut WorkerBlockMap,
1684    ) {
1685        let worker_id = indexer.intern_worker(worker);
1686        let all_blocks = make_blocks(content);
1687        let mut offset = 0;
1688        let mut parent: Option<SequenceHash> = None;
1689        while offset < all_blocks.len() {
1690            let end = (offset + chunk_size).min(all_blocks.len());
1691            let chunk = &all_blocks[offset..end];
1692            indexer
1693                .apply_stored(worker_id, chunk, parent, worker_blocks)
1694                .unwrap();
1695            parent = Some(chunk.last().unwrap().seq_hash);
1696            offset = end;
1697        }
1698    }
1699
1700    #[test]
1701    fn test_divergence_at_jump_boundaries() {
1702        let indexer = PositionalIndexer::new(32);
1703        let full: Vec<u64> = (1..=128).collect();
1704        let full_blocks = make_blocks(&full);
1705        let full_id = indexer.intern_worker("http://full:8000");
1706        let mut wb_full = WorkerBlockMap::default();
1707        indexer
1708            .apply_stored(full_id, &full_blocks, None, &mut wb_full)
1709            .unwrap();
1710
1711        for &depth in &[31, 32, 33] {
1712            let partial_blocks = make_blocks(&full[..depth]);
1713            let worker = format!("http://depth{depth}:8000");
1714            let wid = indexer.intern_worker(&worker);
1715            let mut wb = WorkerBlockMap::default();
1716            indexer
1717                .apply_stored(wid, &partial_blocks, None, &mut wb)
1718                .unwrap();
1719        }
1720
1721        for &depth in &[63, 64, 65] {
1722            let partial_blocks = make_blocks(&full[..depth]);
1723            let worker = format!("http://depth{depth}:8000");
1724            let wid = indexer.intern_worker(&worker);
1725            let mut wb = WorkerBlockMap::default();
1726            indexer
1727                .apply_stored(wid, &partial_blocks, None, &mut wb)
1728                .unwrap();
1729        }
1730
1731        let scores = indexer.find_matches(&hashes(&full), false);
1732        assert_eq!(scores.scores.get(&full_id), Some(&128));
1733        for &depth in &[31u64, 32, 33, 63, 64, 65] {
1734            let worker = format!("http://depth{depth}:8000");
1735            let wid = indexer.worker_id(&worker).unwrap();
1736            assert_eq!(scores.scores.get(&wid), Some(&(depth as u32)));
1737        }
1738    }
1739
1740    #[test]
1741    fn test_exact_jump_size_sequences() {
1742        let indexer = PositionalIndexer::new(32);
1743
1744        for &len in &[32, 64, 96] {
1745            let content: Vec<u64> = (1..=len as u64).collect();
1746            let blocks = make_blocks(&content);
1747            let worker = format!("http://len{len}:8000");
1748            let wid = indexer.intern_worker(&worker);
1749            let mut wb = WorkerBlockMap::default();
1750            indexer.apply_stored(wid, &blocks, None, &mut wb).unwrap();
1751
1752            let scores = indexer.find_matches(&hashes(&content), false);
1753            assert_eq!(
1754                scores.scores.get(&wid),
1755                Some(&(len as u32)),
1756                "exact match failed for sequence length {len}"
1757            );
1758        }
1759    }
1760
1761    #[test]
1762    fn test_off_by_one_jump_boundaries() {
1763        let indexer = PositionalIndexer::new(32);
1764        let full: Vec<u64> = (1..=128).collect();
1765
1766        for &len in &[31, 33, 63, 65, 95, 97] {
1767            let content = &full[..len];
1768            let blocks = make_blocks(content);
1769            let worker = format!("http://len{len}:8000");
1770            let wid = indexer.intern_worker(&worker);
1771            let mut wb = WorkerBlockMap::default();
1772            indexer.apply_stored(wid, &blocks, None, &mut wb).unwrap();
1773
1774            let scores = indexer.find_matches(&hashes(content), false);
1775            assert_eq!(
1776                scores.scores.get(&wid),
1777                Some(&(len as u32)),
1778                "exact match failed for sequence length {len}"
1779            );
1780        }
1781    }
1782
1783    #[test]
1784    fn test_staggered_workers_across_jump_boundaries() {
1785        let indexer = PositionalIndexer::new(32);
1786        let full: Vec<u64> = (1..=100).collect();
1787
1788        let depths = [10, 20, 35, 64, 100];
1789        for &depth in &depths {
1790            let blocks = make_blocks(&full[..depth]);
1791            let worker = format!("http://w{depth}:8000");
1792            let wid = indexer.intern_worker(&worker);
1793            let mut wb = WorkerBlockMap::default();
1794            indexer.apply_stored(wid, &blocks, None, &mut wb).unwrap();
1795        }
1796
1797        let scores = indexer.find_matches(&hashes(&full), false);
1798        for &depth in &depths {
1799            let worker = format!("http://w{depth}:8000");
1800            let wid = indexer.worker_id(&worker).unwrap();
1801            assert_eq!(
1802                scores.scores.get(&wid),
1803                Some(&(depth as u32)),
1804                "worker at depth {depth} has wrong score"
1805            );
1806        }
1807    }
1808
1809    #[test]
1810    fn test_shared_prefix_diverge_at_jump_boundary() {
1811        let indexer = PositionalIndexer::new(32);
1812        let shared: Vec<u64> = (1..=40).collect();
1813
1814        let mut content_w1 = shared.clone();
1815        content_w1.extend(1001..=1060);
1816        let blocks_w1 = make_blocks(&content_w1);
1817        let w1 = indexer.intern_worker("http://w1:8000");
1818        let w2 = indexer.intern_worker("http://w2:8000");
1819        let w3 = indexer.intern_worker("http://w3:8000");
1820        let mut wb1 = WorkerBlockMap::default();
1821        let mut wb2 = WorkerBlockMap::default();
1822        let mut wb3 = WorkerBlockMap::default();
1823        indexer
1824            .apply_stored(w1, &blocks_w1, None, &mut wb1)
1825            .unwrap();
1826
1827        let mut content_w2 = shared.clone();
1828        content_w2.extend(2001..=2020);
1829        let blocks_w2 = make_blocks(&content_w2);
1830        indexer
1831            .apply_stored(w2, &blocks_w2, None, &mut wb2)
1832            .unwrap();
1833
1834        let blocks_w3 = make_blocks(&shared);
1835        indexer
1836            .apply_stored(w3, &blocks_w3, None, &mut wb3)
1837            .unwrap();
1838
1839        let scores = indexer.find_matches(&hashes(&content_w1), false);
1840        assert_eq!(scores.scores.get(&w1), Some(&100));
1841        assert_eq!(scores.scores.get(&w2), Some(&40));
1842        assert_eq!(scores.scores.get(&w3), Some(&40));
1843    }
1844
1845    #[test]
1846    fn test_very_long_sequence() {
1847        let indexer = PositionalIndexer::new(64);
1848        let content: Vec<u64> = (1..=1000).collect();
1849        let blocks = make_blocks(&content);
1850        let w1 = indexer.intern_worker("http://w1:8000");
1851        let mut wb1 = WorkerBlockMap::default();
1852        indexer.apply_stored(w1, &blocks, None, &mut wb1).unwrap();
1853
1854        let scores = indexer.find_matches(&hashes(&content), false);
1855        assert_eq!(scores.scores.get(&w1), Some(&1000));
1856
1857        let scores = indexer.find_matches(&hashes(&content[..500]), false);
1858        assert_eq!(scores.scores.get(&w1), Some(&500));
1859
1860        let mut divergent = content[..499].to_vec();
1861        divergent.push(999999);
1862        let scores = indexer.find_matches(&hashes(&divergent), false);
1863        assert_eq!(scores.scores.get(&w1), Some(&499));
1864    }
1865
1866    // -----------------------------------------------------------------------
1867    // Deep continuation chain tests
1868    // -----------------------------------------------------------------------
1869
1870    #[test]
1871    fn test_deep_continuation_chain() {
1872        let indexer = PositionalIndexer::new(64);
1873        let content: Vec<u64> = (1..=200).collect();
1874        let mut wb1 = WorkerBlockMap::default();
1875        store_via_continuations(&indexer, "http://w1:8000", &content, 10, &mut wb1);
1876
1877        assert_eq!(indexer.current_size(), 200);
1878
1879        let w1 = indexer.worker_id("http://w1:8000").unwrap();
1880        let scores = indexer.find_matches(&hashes(&content), false);
1881        assert_eq!(scores.scores.get(&w1), Some(&200));
1882
1883        let scores = indexer.find_matches(&hashes(&content[..150]), false);
1884        assert_eq!(scores.scores.get(&w1), Some(&150));
1885    }
1886
1887    #[test]
1888    fn test_continuation_chain_with_multiple_workers() {
1889        let indexer = PositionalIndexer::new(32);
1890        let content: Vec<u64> = (1..=100).collect();
1891
1892        let mut wb1 = WorkerBlockMap::default();
1893        let mut wb2 = WorkerBlockMap::default();
1894        store_via_continuations(&indexer, "http://w1:8000", &content, 10, &mut wb1);
1895        store_via_continuations(&indexer, "http://w2:8000", &content[..50], 10, &mut wb2);
1896
1897        let w1 = indexer.worker_id("http://w1:8000").unwrap();
1898        let w2 = indexer.worker_id("http://w2:8000").unwrap();
1899        let scores = indexer.find_matches(&hashes(&content), false);
1900        assert_eq!(scores.scores.get(&w1), Some(&100));
1901        assert_eq!(scores.scores.get(&w2), Some(&50));
1902    }
1903
1904    #[test]
1905    fn test_multiple_disjoint_sequences_per_worker() {
1906        let indexer = PositionalIndexer::new(64);
1907        let w1 = indexer.intern_worker("http://w1:8000");
1908        let mut wb1 = WorkerBlockMap::default();
1909
1910        let blocks1 = make_blocks(&[10, 20, 30]);
1911        indexer.apply_stored(w1, &blocks1, None, &mut wb1).unwrap();
1912
1913        let blocks2 = make_blocks(&[100, 200, 300, 400]);
1914        indexer.apply_stored(w1, &blocks2, None, &mut wb1).unwrap();
1915
1916        let scores = indexer.find_matches(&hashes(&[100, 200, 300, 400]), false);
1917        assert_eq!(scores.scores.get(&w1), Some(&4));
1918
1919        let scores = indexer.find_matches(&hashes(&[10, 20, 30]), false);
1920        assert_eq!(scores.scores.get(&w1), Some(&3));
1921    }
1922
1923    // -----------------------------------------------------------------------
1924    // Long sequence partial removal and stale entry tests
1925    // -----------------------------------------------------------------------
1926
1927    #[test]
1928    fn test_long_sequence_partial_removal() {
1929        let indexer = PositionalIndexer::new(32);
1930        let content: Vec<u64> = (1..=100).collect();
1931        let blocks = make_blocks(&content);
1932        let w1 = indexer.intern_worker("http://w1:8000");
1933        let mut wb1 = WorkerBlockMap::default();
1934        indexer.apply_stored(w1, &blocks, None, &mut wb1).unwrap();
1935
1936        let to_remove: Vec<SequenceHash> = blocks[80..].iter().map(|b| b.seq_hash).collect();
1937        indexer.apply_removed(w1, &to_remove, &mut wb1);
1938
1939        assert_eq!(indexer.current_size(), 80);
1940
1941        let scores = indexer.find_matches(&hashes(&content), false);
1942        assert_eq!(scores.scores.get(&w1), Some(&80));
1943
1944        let scores = indexer.find_matches(&hashes(&content[..80]), false);
1945        assert_eq!(scores.scores.get(&w1), Some(&80));
1946    }
1947
1948    #[test]
1949    fn test_remove_parent_does_not_cascade() {
1950        let indexer = PositionalIndexer::new(1);
1951        let blocks = make_blocks(&[10, 20, 30, 40, 50]);
1952        let w1 = indexer.intern_worker("http://w1:8000");
1953        let mut wb1 = WorkerBlockMap::default();
1954        indexer.apply_stored(w1, &blocks, None, &mut wb1).unwrap();
1955
1956        indexer.apply_removed(w1, &[blocks[1].seq_hash], &mut wb1);
1957
1958        assert_eq!(indexer.current_size(), 4);
1959
1960        let scores = indexer.find_matches(&hashes(&[10, 20, 30, 40, 50]), false);
1961        assert_eq!(scores.scores.get(&w1), Some(&1));
1962    }
1963
1964    #[test]
1965    fn test_long_sequence_clear_and_rebuild() {
1966        let indexer = PositionalIndexer::new(32);
1967        let w1 = indexer.intern_worker("http://w1:8000");
1968        let mut wb1 = WorkerBlockMap::default();
1969
1970        let original: Vec<u64> = (1..=100).collect();
1971        let blocks = make_blocks(&original);
1972        indexer.apply_stored(w1, &blocks, None, &mut wb1).unwrap();
1973
1974        indexer.apply_cleared(w1, &mut wb1);
1975        assert_eq!(indexer.current_size(), 0);
1976
1977        let replacement: Vec<u64> = (1001..=1100).collect();
1978        let new_blocks = make_blocks(&replacement);
1979        indexer
1980            .apply_stored(w1, &new_blocks, None, &mut wb1)
1981            .unwrap();
1982
1983        let scores = indexer.find_matches(&hashes(&original), false);
1984        assert!(!scores.scores.contains_key(&w1));
1985
1986        let scores = indexer.find_matches(&hashes(&replacement), false);
1987        assert_eq!(scores.scores.get(&w1), Some(&100));
1988    }
1989
1990    #[test]
1991    fn test_interleaved_long_sequences() {
1992        let indexer = PositionalIndexer::new(32);
1993        let content: Vec<u64> = (1..=100).collect();
1994
1995        let depths = [25, 50, 75, 100];
1996        for &depth in &depths {
1997            let blocks = make_blocks(&content[..depth]);
1998            let worker = format!("http://w{depth}:8000");
1999            let wid = indexer.intern_worker(&worker);
2000            let mut wb = WorkerBlockMap::default();
2001            indexer.apply_stored(wid, &blocks, None, &mut wb).unwrap();
2002        }
2003
2004        let scores = indexer.find_matches(&hashes(&content), false);
2005        for &depth in &depths {
2006            let worker = format!("http://w{depth}:8000");
2007            let wid = indexer.worker_id(&worker).unwrap();
2008            assert_eq!(
2009                scores.scores.get(&wid),
2010                Some(&(depth as u32)),
2011                "worker at depth {depth} has wrong score"
2012            );
2013            assert_eq!(
2014                scores.tree_sizes.get(&wid),
2015                Some(&depth),
2016                "worker at depth {depth} has wrong tree_size"
2017            );
2018        }
2019    }
2020}