Skip to main content

kv_index/
token_tree.rs

1//! Token-based radix tree for gRPC router with pre-tokenized input.
2//!
3//! This implementation uses token IDs (`u32`) instead of characters,
4//! matching SGLang's Python scheduler which operates on token arrays.
5//!
6//! **Page-aligned design**: Following SGLang's radix cache, tokens are grouped
7//! into pages (default 16 tokens). Only page-aligned prefixes are cached.
8//! Sequences shorter than PAGE_SIZE get no cache benefit (matching engine behavior).
9//!
10//! Benefits:
11//! - O(1) page-key comparisons vs O(n) single-token lookups
12//! - Aligned with SGLang's internal KV cache page structure
13//! - Reduced hash table overhead (1 lookup per PAGE_SIZE tokens)
14
15use std::{
16    collections::HashMap,
17    hash::{BuildHasherDefault, Hasher},
18    sync::{
19        atomic::{AtomicI32, AtomicU64, Ordering},
20        Arc, Weak,
21    },
22};
23
24use dashmap::{mapref::entry::Entry, DashMap};
25use once_cell::sync::Lazy;
26use parking_lot::RwLock as ParkingLotRwLock;
27use tracing::debug;
28
29use super::{
30    common::{MatchResult, TenantId},
31    RadixTree,
32};
33
34/// Token ID type (matches SGLang's token representation)
35pub type TokenId = u32;
36
37/// Default page size for token grouping (matches SGLang's default radix cache page size).
38/// SGLang supports: 1, 16, 32, 64, 128 depending on attention backend.
39///
40/// Note: This is a compile-time constant used for the `TokenPageKey` type.
41/// The `TokenTree::with_config()` constructor accepts page_size as a parameter,
42/// but currently only page_size=16 is supported. Future versions may support
43/// other sizes via const generics or dynamic key types.
44pub const PAGE_SIZE: usize = 16;
45
46/// A page of tokens used as the children map key.
47/// Fixed-size array enables efficient hashing and comparison.
48pub type TokenPageKey = [TokenId; PAGE_SIZE];
49
50type NodeRef = Arc<Node>;
51
52/// Shard counts for DashMaps to balance concurrency vs allocation overhead.
53/// Root node has more shards due to higher contention.
54const ROOT_SHARD_COUNT: usize = 32;
55/// Child nodes typically have few entries, minimize shard overhead.
56const NODE_SHARD_COUNT: usize = 4;
57
58/// Eviction policy matching SGLang's scheduler options.
59/// The gateway should use the same policy as the backend worker to stay in sync.
60#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
61pub enum EvictionPolicy {
62    /// Least Recently Used - evict oldest by last access time (default)
63    #[default]
64    Lru,
65    /// Least Frequently Used - evict by (hit_count, last_access_time)
66    Lfu,
67    /// First In First Out - evict oldest by creation time
68    Fifo,
69    /// Most Recently Used - evict newest by last access time
70    Mru,
71    /// First In Last Out (stack) - evict newest by creation time
72    Filo,
73    /// Priority-based - evict by (priority, last_access_time), lower priority first
74    Priority,
75}
76
77/// Align token count to page boundary (truncate to nearest page).
78/// Matches SGLang's: `page_aligned_len = len(key) // page_size * page_size`
79#[inline]
80fn align_to_page(len: usize) -> usize {
81    (len / PAGE_SIZE) * PAGE_SIZE
82}
83
84/// Extract page key from token slice (first PAGE_SIZE tokens).
85/// Panics if tokens.len() < PAGE_SIZE.
86#[inline]
87fn make_page_key(tokens: &[TokenId]) -> TokenPageKey {
88    debug_assert!(tokens.len() >= PAGE_SIZE);
89    let mut key = [0u32; PAGE_SIZE];
90    key.copy_from_slice(&tokens[..PAGE_SIZE]);
91    key
92}
93
94/// A fast hasher for token page keys.
95/// Uses FxHash-style multiplication mixing for excellent distribution.
96#[derive(Default)]
97struct TokenPageHasher(u64);
98
99impl Hasher for TokenPageHasher {
100    #[inline(always)]
101    fn finish(&self) -> u64 {
102        self.0
103    }
104
105    #[inline(always)]
106    fn write(&mut self, bytes: &[u8]) {
107        // Process 4 bytes at a time (each token is u32)
108        for chunk in bytes.chunks(4) {
109            if chunk.len() == 4 {
110                let val = u32::from_ne_bytes([chunk[0], chunk[1], chunk[2], chunk[3]]);
111                // FxHash-style mixing: multiply by golden ratio prime
112                self.0 = self
113                    .0
114                    .wrapping_add(val as u64)
115                    .wrapping_mul(0x517cc1b727220a95);
116            }
117        }
118    }
119}
120
121type TokenPageHasherBuilder = BuildHasherDefault<TokenPageHasher>;
122
123/// Create a children DashMap with page-key lookup
124#[inline]
125fn new_children_map() -> DashMap<TokenPageKey, NodeRef, TokenPageHasherBuilder> {
126    DashMap::with_hasher_and_shard_amount(TokenPageHasherBuilder::default(), NODE_SHARD_COUNT)
127}
128
129/// Create a tenant access time DashMap
130#[inline]
131fn new_tenant_map() -> DashMap<TenantId, u64> {
132    DashMap::with_shard_amount(NODE_SHARD_COUNT)
133}
134
135/// Result of a prefix match operation with token counts.
136#[derive(Debug, Clone)]
137pub struct PrefixMatchResult {
138    /// The tenant that owns the matched prefix
139    pub tenant: TenantId,
140    /// Number of tokens matched
141    pub matched_token_count: usize,
142    /// Total number of tokens in the input
143    pub input_token_count: usize,
144}
145
146impl MatchResult for PrefixMatchResult {
147    fn tenant(&self) -> &TenantId {
148        &self.tenant
149    }
150
151    fn matched_count(&self) -> usize {
152        self.matched_token_count
153    }
154
155    fn input_count(&self) -> usize {
156        self.input_token_count
157    }
158}
159
160/// Global tenant string intern pool to avoid repeated allocations.
161/// Uses DashMap for concurrent access with minimal contention.
162static TENANT_INTERN_POOL: Lazy<DashMap<Arc<str>, ()>> = Lazy::new(DashMap::new);
163
164/// Intern tenant ID to avoid repeated allocations.
165/// Returns cached Arc<str> if tenant was seen before.
166fn intern_tenant(tenant: &str) -> TenantId {
167    // Fast path: check if already interned
168    if let Some(entry) = TENANT_INTERN_POOL.get(tenant) {
169        return Arc::clone(entry.key());
170    }
171
172    // Slow path: intern new tenant
173    let interned: Arc<str> = Arc::from(tenant);
174    TENANT_INTERN_POOL.insert(Arc::clone(&interned), ());
175    interned
176}
177
178/// Global timestamp counter for LRU ordering
179static GLOBAL_TIMESTAMP: AtomicU64 = AtomicU64::new(0);
180
181fn next_timestamp() -> u64 {
182    GLOBAL_TIMESTAMP.fetch_add(1, Ordering::Relaxed)
183}
184
185/// Node in the token-based radix tree.
186/// Uses parking_lot RwLock for better performance (no poisoning, smaller size).
187/// Parent pointers enable O(d) ancestor cleanup during eviction.
188struct Node {
189    /// Token sequence stored at this node (always page-aligned length, multiple of PAGE_SIZE)
190    tokens: ParkingLotRwLock<Vec<TokenId>>,
191    /// Children nodes keyed by first PAGE_SIZE tokens (page key)
192    children: DashMap<TokenPageKey, NodeRef, TokenPageHasherBuilder>,
193    /// Tenants that own this node with last access timestamps
194    tenant_last_access_time: DashMap<TenantId, u64>,
195    /// Cached last tenant for fast access (probabilistic update)
196    last_tenant: ParkingLotRwLock<Option<TenantId>>,
197    /// Parent node (Weak to avoid reference cycles). None for root.
198    parent: ParkingLotRwLock<Weak<Node>>,
199    /// This node's key in parent's children map (for O(1) removal)
200    page_key: ParkingLotRwLock<Option<TokenPageKey>>,
201    /// Hit count for LFU eviction policy (incremented on each access)
202    hit_count: AtomicU64,
203    /// Creation timestamp for FIFO/FILO eviction policies
204    creation_time: u64,
205    /// Priority for priority-based eviction (higher = less likely to evict)
206    priority: AtomicI32,
207}
208
209impl Node {
210    fn new(tokens: Vec<TokenId>) -> Self {
211        Self {
212            tokens: ParkingLotRwLock::new(tokens),
213            children: new_children_map(),
214            tenant_last_access_time: new_tenant_map(),
215            last_tenant: ParkingLotRwLock::new(None),
216            parent: ParkingLotRwLock::new(Weak::new()),
217            page_key: ParkingLotRwLock::new(None),
218            hit_count: AtomicU64::new(0),
219            creation_time: next_timestamp(),
220            priority: AtomicI32::new(0),
221        }
222    }
223
224    #[expect(dead_code)] // Reserved for priority-based eviction policy support
225    fn new_with_priority(tokens: Vec<TokenId>, priority: i32) -> Self {
226        Self {
227            tokens: ParkingLotRwLock::new(tokens),
228            children: new_children_map(),
229            tenant_last_access_time: new_tenant_map(),
230            last_tenant: ParkingLotRwLock::new(None),
231            parent: ParkingLotRwLock::new(Weak::new()),
232            page_key: ParkingLotRwLock::new(None),
233            hit_count: AtomicU64::new(0),
234            creation_time: next_timestamp(),
235            priority: AtomicI32::new(priority),
236        }
237    }
238
239    fn new_root() -> Self {
240        Self {
241            tokens: ParkingLotRwLock::new(Vec::new()),
242            children: DashMap::with_hasher_and_shard_amount(
243                TokenPageHasherBuilder::default(),
244                ROOT_SHARD_COUNT,
245            ),
246            tenant_last_access_time: DashMap::with_shard_amount(ROOT_SHARD_COUNT),
247            last_tenant: ParkingLotRwLock::new(None),
248            parent: ParkingLotRwLock::new(Weak::new()),
249            page_key: ParkingLotRwLock::new(None),
250            hit_count: AtomicU64::new(0),
251            creation_time: 0,                   // Root is always oldest
252            priority: AtomicI32::new(i32::MIN), // Root has minimum priority (matches SGLang)
253        }
254    }
255
256    /// Set parent pointer and page key for this node
257    fn set_parent(&self, parent: &NodeRef, key: TokenPageKey) {
258        *self.parent.write() = Arc::downgrade(parent);
259        *self.page_key.write() = Some(key);
260    }
261
262    /// Check if this node is empty (no tenants and no children)
263    fn is_empty(&self) -> bool {
264        self.tenant_last_access_time.is_empty() && self.children.is_empty()
265    }
266
267    /// Get any tenant that owns this node (for match results)
268    fn get_any_tenant(&self) -> Option<TenantId> {
269        // Fast path: check cached tenant (parking_lot has no poisoning)
270        let guard = self.last_tenant.read();
271        if let Some(ref tenant) = *guard {
272            // Use borrowed lookup to avoid Arc clone for validation
273            if self.tenant_last_access_time.contains_key(tenant.as_ref()) {
274                return Some(Arc::clone(tenant));
275            }
276        }
277        drop(guard);
278
279        // Slow path: iterate to find any tenant
280        self.tenant_last_access_time
281            .iter()
282            .next()
283            .map(|entry| Arc::clone(entry.key()))
284    }
285
286    /// Update tenant access and cache (with probabilistic update to reduce contention).
287    ///
288    /// # Arguments
289    /// * `tenant` - The tenant to touch
290    /// * `track_lfu` - If true, increment hit_count for LFU policy (only needed when eviction_policy == LFU)
291    fn touch_tenant(&self, tenant: &TenantId, track_lfu: bool) {
292        let ts = next_timestamp();
293
294        // Conditionally increment hit count (only for LFU policy to reduce contention)
295        if track_lfu {
296            self.hit_count.fetch_add(1, Ordering::Relaxed);
297        }
298
299        // Fast path: try to update existing entry without Arc clone
300        // DashMap supports Borrow<str> lookups, avoiding allocation
301        if let Some(mut entry) = self.tenant_last_access_time.get_mut(tenant.as_ref()) {
302            *entry = ts;
303        } else {
304            // Slow path: insert new entry (requires Arc clone)
305            self.tenant_last_access_time.insert(Arc::clone(tenant), ts);
306        }
307
308        // Probabilistic cache update (1/16 chance) to reduce write contention
309        if ts & 0xF == 0 {
310            if let Some(mut guard) = self.last_tenant.try_write() {
311                *guard = Some(Arc::clone(tenant));
312            }
313        }
314    }
315
316    /// Update priority (take max to propagate higher priority, matching SGLang)
317    #[expect(dead_code)] // Reserved for priority-based eviction policy support
318    fn update_priority(&self, new_priority: i32) {
319        // Use fetch_max for correct concurrent updates (avoids CAS race condition)
320        self.priority.fetch_max(new_priority, Ordering::Relaxed);
321    }
322}
323
324/// Token-based radix tree for cache-aware routing.
325pub struct TokenTree {
326    root: NodeRef,
327    /// Track total tokens per tenant for eviction decisions
328    tenant_token_count: DashMap<TenantId, usize>,
329    /// Eviction policy (should match the backend worker's policy)
330    eviction_policy: EvictionPolicy,
331    /// Page size for token grouping (should match the backend worker's page size)
332    page_size: usize,
333}
334
335impl std::fmt::Debug for TokenTree {
336    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
337        f.debug_struct("TokenTree")
338            .field("tenant_count", &self.tenant_token_count.len())
339            .field("eviction_policy", &self.eviction_policy)
340            .field("page_size", &self.page_size)
341            .finish_non_exhaustive()
342    }
343}
344
345impl Default for TokenTree {
346    fn default() -> Self {
347        Self::new()
348    }
349}
350
351impl TokenTree {
352    /// Create a new TokenTree with default settings (page_size=16, eviction_policy=LRU).
353    pub fn new() -> Self {
354        Self::with_config(PAGE_SIZE, EvictionPolicy::default())
355    }
356
357    /// Create a new TokenTree with a specific eviction policy (uses default page_size=16).
358    /// The policy should match the backend worker's policy to stay in sync.
359    pub fn with_policy(policy: EvictionPolicy) -> Self {
360        Self::with_config(PAGE_SIZE, policy)
361    }
362
363    /// Create a new TokenTree with specific page size and eviction policy.
364    ///
365    /// # Arguments
366    /// * `page_size` - Token page size for grouping (must match backend worker's page size).
367    ///   SGLang supports: 1, 16, 32, 64, 128 depending on attention backend.
368    ///   **Note**: Currently only page_size=16 is supported at compile time.
369    /// * `policy` - Eviction policy (should match the backend worker's policy).
370    ///
371    /// # Panics
372    /// Panics if `page_size` != 16 (compile-time limitation, future versions may support other sizes).
373    pub fn with_config(page_size: usize, policy: EvictionPolicy) -> Self {
374        assert_eq!(
375            page_size, PAGE_SIZE,
376            "TokenTree currently only supports page_size={PAGE_SIZE} (compile-time limitation). \
377             Got page_size={page_size}. Future versions may support configurable page sizes."
378        );
379        Self {
380            root: Arc::new(Node::new_root()),
381            tenant_token_count: DashMap::with_shard_amount(ROOT_SHARD_COUNT),
382            eviction_policy: policy,
383            page_size,
384        }
385    }
386
387    /// Get the current eviction policy.
388    pub fn eviction_policy(&self) -> EvictionPolicy {
389        self.eviction_policy
390    }
391
392    /// Get the page size used for token grouping.
393    pub fn page_size(&self) -> usize {
394        self.page_size
395    }
396
397    /// Insert a token sequence with associated tenant.
398    ///
399    /// **Page-aligned**: Input is aligned to PAGE_SIZE boundary.
400    /// Sequences shorter than PAGE_SIZE are skipped (no cache benefit).
401    pub fn insert_tokens(&self, tokens: &[TokenId], tenant: &str) {
402        // Align to page boundary (truncate to nearest page)
403        let aligned_len = align_to_page(tokens.len());
404        if aligned_len == 0 {
405            // Sequence too short for cache benefit (matches SGLang behavior)
406            return;
407        }
408        let tokens = &tokens[..aligned_len];
409
410        let tenant_id = intern_tenant(tenant);
411
412        // Ensure tenant exists at root
413        self.root
414            .tenant_last_access_time
415            .entry(Arc::clone(&tenant_id))
416            .or_insert(0);
417
418        self.tenant_token_count
419            .entry(Arc::clone(&tenant_id))
420            .or_insert(0);
421
422        // Compute once: only track hit counts for LFU policy
423        let track_lfu = self.eviction_policy == EvictionPolicy::Lfu;
424
425        // Descend from the root inserting the whole sequence.
426        let tokens_added = Self::insert_from(
427            Arc::clone(&self.root),
428            tokens,
429            Arc::clone(&tenant_id),
430            track_lfu,
431        );
432
433        // Update tenant token count
434        if tokens_added > 0 {
435            self.tenant_token_count
436                .entry(tenant_id)
437                .and_modify(|c| *c += tokens_added)
438                .or_insert(tokens_added);
439        }
440    }
441
442    /// Insert `remaining` for `tenant_id` starting the descent at `current`
443    /// (treated as the parent of the first edge), returning the number of new
444    /// tokens to add to the tenant's count. Holds the exact node-split /
445    /// tenant-attach / counting logic of [`Self::insert_tokens`]; the caller
446    /// owns root bookkeeping and folding the returned count into
447    /// `tenant_token_count`.
448    ///
449    /// [`Self::match_and_insert_with`] reuses this to splice only the *unmatched
450    /// suffix* at the fall-off node (after re-attaching the tenant to the matched
451    /// ancestor nodes), so the already-matched prefix is never re-walked.
452    fn insert_from(
453        mut current: NodeRef,
454        mut remaining: &[TokenId],
455        tenant_id: TenantId,
456        track_lfu: bool,
457    ) -> usize {
458        let mut tokens_added = 0usize;
459
460        // Result type to carry state out of the match block
461        // This allows the entry guard to be dropped before we update current
462        enum InsertStep {
463            Done(usize),
464            Continue { next: NodeRef, advance: usize },
465        }
466
467        while remaining.len() >= PAGE_SIZE {
468            // Use first PAGE_SIZE tokens as key for children lookup
469            let page_key = make_page_key(remaining);
470
471            let step = match current.children.entry(page_key) {
472                Entry::Vacant(entry) => {
473                    // No child with this page key - create new node
474                    let new_node = Arc::new(Node::new(remaining.to_vec()));
475                    new_node.set_parent(&current, page_key);
476                    new_node.touch_tenant(&tenant_id, track_lfu);
477                    entry.insert(new_node);
478                    InsertStep::Done(remaining.len())
479                }
480                Entry::Occupied(mut entry) => {
481                    let child = Arc::clone(entry.get());
482                    let child_tokens = child.tokens.read();
483                    let child_len = child_tokens.len();
484
485                    // Find common prefix length (page-aligned)
486                    let common_len = remaining
487                        .iter()
488                        .zip(child_tokens.iter())
489                        .take_while(|(a, b)| a == b)
490                        .count();
491                    // Align common length to page boundary
492                    let common_len = align_to_page(common_len);
493
494                    if common_len == 0 {
495                        // No page-aligned match despite same page key (shouldn't happen)
496                        drop(child_tokens);
497                        InsertStep::Done(0)
498                    } else if common_len == child_len {
499                        // Full match with child - continue traversal
500                        drop(child_tokens);
501                        child.touch_tenant(&tenant_id, track_lfu);
502                        InsertStep::Continue {
503                            next: child,
504                            advance: common_len,
505                        }
506                    } else if common_len >= remaining.len() {
507                        // Input is prefix of child - split child at page boundary
508                        // Strategy: Create NEW intermediate node with prefix tokens,
509                        // keep original child as suffix (preserving its children/tenants)
510                        let common_len = align_to_page(remaining.len());
511                        let prefix_tokens: Vec<TokenId> = child_tokens[..common_len].to_vec();
512                        let suffix_page_key = make_page_key(&child_tokens[common_len..]);
513
514                        // Check if tenant already owned the child (tokens already counted)
515                        let tenant_already_owned = child
516                            .tenant_last_access_time
517                            .contains_key(tenant_id.as_ref());
518                        drop(child_tokens);
519
520                        // Modify original child to hold only suffix tokens
521                        let mut child_tokens_write = child.tokens.write();
522                        let suffix_tokens: Vec<TokenId> = child_tokens_write[common_len..].to_vec();
523                        *child_tokens_write = suffix_tokens;
524                        drop(child_tokens_write);
525
526                        // Create intermediate node with prefix - clone tenant map (O(1))
527                        // Intermediate inherits metadata from child (represents same prefix)
528                        let intermediate_node = Arc::new(Node {
529                            tokens: ParkingLotRwLock::new(prefix_tokens),
530                            children: new_children_map(),
531                            tenant_last_access_time: child.tenant_last_access_time.clone(),
532                            last_tenant: ParkingLotRwLock::new(child.last_tenant.read().clone()),
533                            parent: ParkingLotRwLock::new(Arc::downgrade(&current)),
534                            page_key: ParkingLotRwLock::new(Some(page_key)),
535                            hit_count: AtomicU64::new(child.hit_count.load(Ordering::Relaxed)),
536                            creation_time: child.creation_time,
537                            priority: AtomicI32::new(child.priority.load(Ordering::Relaxed)),
538                        });
539
540                        // Add original child (now suffix) as child of intermediate
541                        // Update child's parent to point to intermediate
542                        child.set_parent(&intermediate_node, suffix_page_key);
543                        intermediate_node
544                            .children
545                            .insert(suffix_page_key, Arc::clone(&child));
546
547                        // Replace entry with intermediate node
548                        entry.insert(intermediate_node.clone());
549
550                        intermediate_node.touch_tenant(&tenant_id, track_lfu);
551
552                        // Only count new tokens if tenant didn't already own this path
553                        let new_tokens = if tenant_already_owned { 0 } else { common_len };
554                        InsertStep::Done(new_tokens)
555                    } else {
556                        // Partial match - need to split and add new branch at page boundary
557                        // Strategy: Create NEW intermediate node with common prefix,
558                        // keep original child as one suffix, create new node for other suffix
559                        let prefix_tokens: Vec<TokenId> = child_tokens[..common_len].to_vec();
560                        let child_suffix_page_key = make_page_key(&child_tokens[common_len..]);
561
562                        // Check if tenant already owned the child (common prefix already counted)
563                        let tenant_already_owned = child
564                            .tenant_last_access_time
565                            .contains_key(tenant_id.as_ref());
566                        drop(child_tokens);
567
568                        // Modify original child to hold only its suffix tokens
569                        let mut child_tokens_write = child.tokens.write();
570                        let child_suffix: Vec<TokenId> = child_tokens_write[common_len..].to_vec();
571                        *child_tokens_write = child_suffix;
572                        drop(child_tokens_write);
573
574                        // Create intermediate node with common prefix - clone tenant map (O(1))
575                        // Intermediate inherits metadata from child (represents same prefix)
576                        let intermediate_node = Arc::new(Node {
577                            tokens: ParkingLotRwLock::new(prefix_tokens),
578                            children: new_children_map(),
579                            tenant_last_access_time: child.tenant_last_access_time.clone(),
580                            last_tenant: ParkingLotRwLock::new(child.last_tenant.read().clone()),
581                            parent: ParkingLotRwLock::new(Arc::downgrade(&current)),
582                            page_key: ParkingLotRwLock::new(Some(page_key)),
583                            hit_count: AtomicU64::new(child.hit_count.load(Ordering::Relaxed)),
584                            creation_time: child.creation_time,
585                            priority: AtomicI32::new(child.priority.load(Ordering::Relaxed)),
586                        });
587
588                        // Add original child (now suffix) as child of intermediate
589                        // Update child's parent to point to intermediate
590                        child.set_parent(&intermediate_node, child_suffix_page_key);
591                        intermediate_node
592                            .children
593                            .insert(child_suffix_page_key, Arc::clone(&child));
594
595                        // Create new node for the remaining input suffix
596                        let new_remaining = &remaining[common_len..];
597                        let new_branch_tokens = if new_remaining.len() >= PAGE_SIZE {
598                            let new_node = Arc::new(Node::new(new_remaining.to_vec()));
599                            let new_page_key = make_page_key(new_remaining);
600                            new_node.set_parent(&intermediate_node, new_page_key);
601                            new_node.touch_tenant(&tenant_id, track_lfu);
602                            intermediate_node.children.insert(new_page_key, new_node);
603                            new_remaining.len()
604                        } else {
605                            0
606                        };
607
608                        // Replace entry with intermediate node
609                        entry.insert(intermediate_node.clone());
610
611                        intermediate_node.touch_tenant(&tenant_id, track_lfu);
612
613                        // Count: new branch tokens + common prefix only if tenant is new
614                        let common_tokens = if tenant_already_owned { 0 } else { common_len };
615                        InsertStep::Done(new_branch_tokens + common_tokens)
616                    }
617                }
618            };
619
620            match step {
621                InsertStep::Done(added) => {
622                    tokens_added += added;
623                    break;
624                }
625                InsertStep::Continue { next, advance } => {
626                    tokens_added += advance;
627                    remaining = &remaining[advance..];
628                    current = next;
629                }
630            }
631        }
632
633        // The caller folds this into `tenant_token_count` (once, after any
634        // matched-prefix re-attach in match_and_insert_with).
635        tokens_added
636    }
637
638    /// Find longest matching prefix with detailed counts.
639    ///
640    /// **Page-aligned**: Input is aligned to PAGE_SIZE boundary before lookup.
641    /// Sequences shorter than PAGE_SIZE return 0 matched tokens.
642    pub fn match_prefix_with_counts(&self, tokens: &[TokenId]) -> PrefixMatchResult {
643        let input_token_count = tokens.len();
644
645        // Align to page boundary (truncate to nearest page)
646        let aligned_len = align_to_page(tokens.len());
647        if aligned_len == 0 {
648            // Sequence too short for cache lookup (matches SGLang behavior)
649            return PrefixMatchResult {
650                tenant: self
651                    .root
652                    .get_any_tenant()
653                    .unwrap_or_else(|| intern_tenant("empty")),
654                matched_token_count: 0,
655                input_token_count,
656            };
657        }
658        let tokens = &tokens[..aligned_len];
659
660        let mut matched_tokens = 0;
661        let mut last_tenant: Option<TenantId> = None;
662        let mut remaining = tokens;
663        let mut current = Arc::clone(&self.root);
664
665        // Compute once: only track hit counts for LFU policy
666        let track_lfu = self.eviction_policy == EvictionPolicy::Lfu;
667
668        enum MatchStep {
669            Done,
670            Continue {
671                next: NodeRef,
672                advance: usize,
673                tenant: Option<TenantId>,
674            },
675            PartialMatch {
676                matched: usize,
677                tenant: Option<TenantId>,
678            },
679        }
680
681        while remaining.len() >= PAGE_SIZE {
682            // Use first PAGE_SIZE tokens as key for children lookup
683            let page_key = make_page_key(remaining);
684
685            let step = match current.children.get(&page_key) {
686                None => MatchStep::Done,
687                Some(child_ref) => {
688                    let child = Arc::clone(child_ref.value());
689                    drop(child_ref);
690
691                    let child_tokens = child.tokens.read();
692
693                    // Count matching tokens
694                    let match_len = remaining
695                        .iter()
696                        .zip(child_tokens.iter())
697                        .take_while(|(a, b)| a == b)
698                        .count();
699                    // Align match length to page boundary
700                    let match_len = align_to_page(match_len);
701
702                    if match_len == 0 {
703                        MatchStep::Done
704                    } else {
705                        let tenant = child.get_any_tenant();
706
707                        // If node has no tenants (all evicted), stop matching here
708                        // This ensures we only route to workers that still have the prefix cached
709                        if tenant.is_none() {
710                            MatchStep::Done
711                        } else {
712                            // Update timestamp on match to keep LRU in sync with backend
713                            // SGLang does: child.last_access_time = access_time
714                            if let Some(ref t) = tenant {
715                                child.touch_tenant(t, track_lfu);
716                            }
717
718                            if match_len < child_tokens.len() {
719                                // Partial match within node (at page boundary)
720                                MatchStep::PartialMatch {
721                                    matched: match_len,
722                                    tenant,
723                                }
724                            } else {
725                                // Full match - continue
726                                drop(child_tokens);
727                                MatchStep::Continue {
728                                    next: child,
729                                    advance: match_len,
730                                    tenant,
731                                }
732                            }
733                        }
734                    }
735                }
736            };
737
738            match step {
739                MatchStep::Done => break,
740                MatchStep::PartialMatch { matched, tenant } => {
741                    matched_tokens += matched;
742                    if let Some(t) = tenant {
743                        last_tenant = Some(t);
744                    }
745                    break;
746                }
747                MatchStep::Continue {
748                    next,
749                    advance,
750                    tenant,
751                } => {
752                    matched_tokens += advance;
753                    if let Some(t) = tenant {
754                        last_tenant = Some(t);
755                    }
756                    remaining = &remaining[advance..];
757                    current = next;
758                }
759            }
760        }
761
762        PrefixMatchResult {
763            tenant: last_tenant.unwrap_or_else(|| intern_tenant("empty")),
764            matched_token_count: matched_tokens,
765            input_token_count,
766        }
767    }
768
769    /// Combined match + insert in a SINGLE tree descent.
770    ///
771    /// Equivalent to calling [`Self::match_prefix_with_counts`] immediately
772    /// followed by [`Self::insert_tokens`] with the same `tokens`, but it
773    /// traverses the prefix only once. For long prefixes (e.g. 150K tokens)
774    /// this halves the number of page-key lookups, child-map probes, token
775    /// comparisons, and `touch_tenant` writes on the request hot path.
776    ///
777    /// # Why a single descent is correct
778    ///
779    /// `insert_tokens` descends through *full-match* children exactly the way
780    /// `match_prefix_with_counts` does (same page key, same page-aligned common
781    /// prefix, same "continue on full match" rule). Insert's descent is a
782    /// (depth-wise) superset of match's: match stops as soon as it hits a node
783    /// with no tenants (all evicted) or a partial match, whereas insert keeps
784    /// going on a full match and only stops when it must create or split a node.
785    /// So we drive the descent with insert's logic and *freeze* the match result
786    /// the first time match's stop condition is reached (tracked by
787    /// `match_frozen`). After freezing, deeper nodes only affect insert
788    /// accounting, never the returned match result — exactly as if the separate
789    /// `match` call had already returned.
790    ///
791    /// # Preserved semantics (identical to match-then-insert)
792    ///
793    /// * **Node split**: the vacant / prefix-of-child / diverge branches below
794    ///   are copied verbatim from `insert_tokens` (intermediate node inherits the
795    ///   child's tenant map, hit_count, creation_time, priority; child is demoted
796    ///   to the suffix; parent pointers / page keys rewritten the same way).
797    /// * **Per-tenant token counting**: `tokens_added` is accumulated with the
798    ///   same rules as `insert_tokens` (full-match Continue counts `common_len`;
799    ///   the split branches count `common_len` only when the tenant did not
800    ///   already own the path, plus any brand-new branch tokens) and folded into
801    ///   `tenant_token_count` once at the end.
802    /// * **`touch_tenant` / timestamps**: at every node we reproduce the exact
803    ///   touch sequence the two separate calls would have made, in the same
804    ///   order, on the same node identities:
805    ///   1. The match side reads `child.get_any_tenant()` *before* any insert
806    ///      mutation (so the all-evicted check and the routed tenant are computed
807    ///      against the pre-insert node), then touches that tenant — exactly what
808    ///      `match_prefix_with_counts` does, and before any split so the split's
809    ///      tenant-map clone inherits the fresh timestamp just like the original
810    ///      ordering (`match` ran fully before `insert`).
811    ///   2. The insert side then touches the inserting `tenant` on the node it
812    ///      would have (the continued child, the newly created leaf, or the new
813    ///      intermediate).
814    ///   We deliberately do NOT deduplicate the two touches even when they land
815    ///   on the same node for the same tenant: the original match-then-insert
816    ///   pair already touched twice in that case (e.g. a full-match continuation
817    ///   whose `get_any_tenant()` is the routed/inserting tenant), so keeping
818    ///   both touches reproduces the LFU `hit_count` and timestamp progression
819    ///   byte-for-byte. For the default LRU policy the second touch only advances
820    ///   the timestamp, which is immaterial to relative ordering.
821    pub fn match_and_insert(&self, tokens: &[TokenId], tenant: &str) -> PrefixMatchResult {
822        let input_token_count = tokens.len();
823
824        // Align to page boundary (truncate to nearest page). Mirrors both
825        // `match_prefix_with_counts` and `insert_tokens`.
826        let aligned_len = align_to_page(tokens.len());
827        if aligned_len == 0 {
828            // Too short to cache: `insert_tokens` is a no-op and
829            // `match_prefix_with_counts` returns 0 matched tokens with any
830            // root tenant. Reproduce the match result, skip the insert.
831            return PrefixMatchResult {
832                tenant: self
833                    .root
834                    .get_any_tenant()
835                    .unwrap_or_else(|| intern_tenant("empty")),
836                matched_token_count: 0,
837                input_token_count,
838            };
839        }
840        let tokens = &tokens[..aligned_len];
841
842        let tenant_id = intern_tenant(tenant);
843
844        // Ensure tenant exists at root (insert-side bookkeeping).
845        self.root
846            .tenant_last_access_time
847            .entry(Arc::clone(&tenant_id))
848            .or_insert(0);
849        self.tenant_token_count
850            .entry(Arc::clone(&tenant_id))
851            .or_insert(0);
852
853        let mut remaining = tokens;
854        let mut current = Arc::clone(&self.root);
855        let mut tokens_added = 0usize;
856
857        // Match-result accumulators.
858        let mut matched_tokens = 0usize;
859        let mut last_tenant: Option<TenantId> = None;
860        // Once the match descent would have stopped (empty node or partial
861        // match), stop updating the match result; insert keeps descending.
862        let mut match_frozen = false;
863
864        let track_lfu = self.eviction_policy == EvictionPolicy::Lfu;
865
866        // Carries state out of the entry match so the entry guard can be
867        // dropped before we advance `current` (same pattern as `insert_tokens`).
868        enum Step {
869            Done(usize),
870            Continue { next: NodeRef, advance: usize },
871        }
872
873        while remaining.len() >= PAGE_SIZE {
874            let page_key = make_page_key(remaining);
875
876            let step = match current.children.entry(page_key) {
877                Entry::Vacant(entry) => {
878                    // Match: child not found -> match stops (records nothing).
879                    // Insert: create a new leaf node holding the remainder.
880                    let new_node = Arc::new(Node::new(remaining.to_vec()));
881                    new_node.set_parent(&current, page_key);
882                    new_node.touch_tenant(&tenant_id, track_lfu);
883                    entry.insert(new_node);
884                    Step::Done(remaining.len())
885                }
886                Entry::Occupied(mut entry) => {
887                    let child = Arc::clone(entry.get());
888                    let child_tokens = child.tokens.read();
889                    let child_len = child_tokens.len();
890
891                    let common_len = remaining
892                        .iter()
893                        .zip(child_tokens.iter())
894                        .take_while(|(a, b)| a == b)
895                        .count();
896                    let common_len = align_to_page(common_len);
897
898                    if common_len == 0 {
899                        // Same page key but no aligned match (shouldn't happen).
900                        // Match stops; insert adds nothing.
901                        drop(child_tokens);
902                        Step::Done(0)
903                    } else if common_len == child_len {
904                        // Full match with child -> continue traversal.
905                        drop(child_tokens);
906
907                        // --- Match side (pre-insert node state) ---
908                        // Read the routed tenant BEFORE insert touches the node:
909                        // insert's touch would add `tenant_id` to the map and
910                        // corrupt both the all-evicted check and the routed
911                        // tenant. This reproduces `match_prefix_with_counts`'s
912                        // Continue arm exactly (it touches `get_any_tenant()`).
913                        if !match_frozen {
914                            match child.get_any_tenant() {
915                                None => {
916                                    // All tenants evicted: match stops here.
917                                    // Insert still continues (re-populates node).
918                                    match_frozen = true;
919                                }
920                                Some(t_match) => {
921                                    matched_tokens += common_len;
922                                    child.touch_tenant(&t_match, track_lfu);
923                                    last_tenant = Some(t_match);
924                                }
925                            }
926                        }
927
928                        // --- Insert side ---
929                        // `insert_tokens` always touches the inserting tenant on
930                        // a full-match continuation. When the match side above
931                        // already touched this same tenant, the original code
932                        // ALSO touched twice (match then insert), so we keep both
933                        // touches to preserve LFU hit_count / timestamp behavior
934                        // byte-for-byte.
935                        child.touch_tenant(&tenant_id, track_lfu);
936                        Step::Continue {
937                            next: child,
938                            advance: common_len,
939                        }
940                    } else if common_len >= remaining.len() {
941                        // Input is a prefix of the child -> split child at the
942                        // page boundary. (Verbatim from `insert_tokens`, with the
943                        // match-side touch interleaved before the split so the
944                        // intermediate's tenant-map clone inherits it — exactly
945                        // as match-then-insert ordered the writes.)
946
947                        // Match side first (touches the pre-split child).
948                        if !match_frozen {
949                            match child.get_any_tenant() {
950                                None => match_frozen = true,
951                                Some(t_match) => {
952                                    matched_tokens += common_len;
953                                    child.touch_tenant(&t_match, track_lfu);
954                                    last_tenant = Some(t_match);
955                                }
956                            }
957                        }
958
959                        let common_len = align_to_page(remaining.len());
960                        let prefix_tokens: Vec<TokenId> = child_tokens[..common_len].to_vec();
961                        let suffix_page_key = make_page_key(&child_tokens[common_len..]);
962
963                        let tenant_already_owned = child
964                            .tenant_last_access_time
965                            .contains_key(tenant_id.as_ref());
966                        drop(child_tokens);
967
968                        let mut child_tokens_write = child.tokens.write();
969                        let suffix_tokens: Vec<TokenId> = child_tokens_write[common_len..].to_vec();
970                        *child_tokens_write = suffix_tokens;
971                        drop(child_tokens_write);
972
973                        let intermediate_node = Arc::new(Node {
974                            tokens: ParkingLotRwLock::new(prefix_tokens),
975                            children: new_children_map(),
976                            tenant_last_access_time: child.tenant_last_access_time.clone(),
977                            last_tenant: ParkingLotRwLock::new(child.last_tenant.read().clone()),
978                            parent: ParkingLotRwLock::new(Arc::downgrade(&current)),
979                            page_key: ParkingLotRwLock::new(Some(page_key)),
980                            hit_count: AtomicU64::new(child.hit_count.load(Ordering::Relaxed)),
981                            creation_time: child.creation_time,
982                            priority: AtomicI32::new(child.priority.load(Ordering::Relaxed)),
983                        });
984
985                        child.set_parent(&intermediate_node, suffix_page_key);
986                        intermediate_node
987                            .children
988                            .insert(suffix_page_key, Arc::clone(&child));
989
990                        entry.insert(intermediate_node.clone());
991
992                        intermediate_node.touch_tenant(&tenant_id, track_lfu);
993
994                        let new_tokens = if tenant_already_owned { 0 } else { common_len };
995                        Step::Done(new_tokens)
996                    } else {
997                        // Partial match -> split and add a new branch at the page
998                        // boundary. (Verbatim from `insert_tokens`, with the
999                        // match-side touch interleaved before the split.)
1000
1001                        // Match side first (touches the pre-split child).
1002                        if !match_frozen {
1003                            match child.get_any_tenant() {
1004                                None => match_frozen = true,
1005                                Some(t_match) => {
1006                                    matched_tokens += common_len;
1007                                    child.touch_tenant(&t_match, track_lfu);
1008                                    last_tenant = Some(t_match);
1009                                }
1010                            }
1011                        }
1012
1013                        let prefix_tokens: Vec<TokenId> = child_tokens[..common_len].to_vec();
1014                        let child_suffix_page_key = make_page_key(&child_tokens[common_len..]);
1015
1016                        let tenant_already_owned = child
1017                            .tenant_last_access_time
1018                            .contains_key(tenant_id.as_ref());
1019                        drop(child_tokens);
1020
1021                        let mut child_tokens_write = child.tokens.write();
1022                        let child_suffix: Vec<TokenId> = child_tokens_write[common_len..].to_vec();
1023                        *child_tokens_write = child_suffix;
1024                        drop(child_tokens_write);
1025
1026                        let intermediate_node = Arc::new(Node {
1027                            tokens: ParkingLotRwLock::new(prefix_tokens),
1028                            children: new_children_map(),
1029                            tenant_last_access_time: child.tenant_last_access_time.clone(),
1030                            last_tenant: ParkingLotRwLock::new(child.last_tenant.read().clone()),
1031                            parent: ParkingLotRwLock::new(Arc::downgrade(&current)),
1032                            page_key: ParkingLotRwLock::new(Some(page_key)),
1033                            hit_count: AtomicU64::new(child.hit_count.load(Ordering::Relaxed)),
1034                            creation_time: child.creation_time,
1035                            priority: AtomicI32::new(child.priority.load(Ordering::Relaxed)),
1036                        });
1037
1038                        child.set_parent(&intermediate_node, child_suffix_page_key);
1039                        intermediate_node
1040                            .children
1041                            .insert(child_suffix_page_key, Arc::clone(&child));
1042
1043                        let new_remaining = &remaining[common_len..];
1044                        let new_branch_tokens = if new_remaining.len() >= PAGE_SIZE {
1045                            let new_node = Arc::new(Node::new(new_remaining.to_vec()));
1046                            let new_page_key = make_page_key(new_remaining);
1047                            new_node.set_parent(&intermediate_node, new_page_key);
1048                            new_node.touch_tenant(&tenant_id, track_lfu);
1049                            intermediate_node.children.insert(new_page_key, new_node);
1050                            new_remaining.len()
1051                        } else {
1052                            0
1053                        };
1054
1055                        entry.insert(intermediate_node.clone());
1056
1057                        intermediate_node.touch_tenant(&tenant_id, track_lfu);
1058
1059                        let common_tokens = if tenant_already_owned { 0 } else { common_len };
1060                        Step::Done(new_branch_tokens + common_tokens)
1061                    }
1062                }
1063            };
1064
1065            match step {
1066                Step::Done(added) => {
1067                    tokens_added += added;
1068                    break;
1069                }
1070                Step::Continue { next, advance } => {
1071                    tokens_added += advance;
1072                    remaining = &remaining[advance..];
1073                    current = next;
1074                }
1075            }
1076        }
1077
1078        // Fold insert's token count in once (insert-side bookkeeping).
1079        if tokens_added > 0 {
1080            self.tenant_token_count
1081                .entry(tenant_id)
1082                .and_modify(|c| *c += tokens_added)
1083                .or_insert(tokens_added);
1084        }
1085
1086        PrefixMatchResult {
1087            tenant: last_tenant.unwrap_or_else(|| intern_tenant("empty")),
1088            matched_token_count: matched_tokens,
1089            input_token_count,
1090        }
1091    }
1092
1093    /// Match in a single descent, then choose the insert tenant from the match
1094    /// result and insert for it — still a SINGLE top-to-bottom traversal of the
1095    /// prefix.
1096    ///
1097    /// This is the variant the cache-aware router needs: the worker it inserts
1098    /// for is *derived from* the match outcome (route to the matched worker on a
1099    /// cache hit, else to the least-loaded worker), so the inserting tenant is
1100    /// not known until the match completes. `select` is invoked exactly once,
1101    /// after the match, with the [`PrefixMatchResult`]; returning `Some(tenant)`
1102    /// inserts `tokens` for that tenant, and returning `None` skips the insert
1103    /// entirely (mirroring the router's "selected worker is gone" branch, which
1104    /// performs no insert).
1105    ///
1106    /// # How the single descent is achieved
1107    ///
1108    /// The match phase walks the prefix exactly like
1109    /// [`Self::match_prefix_with_counts`], additionally recording the chain of
1110    /// nodes it traverses as `path` (one `(node, advance)` per edge) and the
1111    /// node/`remaining` slice where the walk fell off. After `select` yields the
1112    /// tenant we *replay insert's per-node work directly on the recorded nodes*
1113    /// (no second tree navigation): `touch_tenant` on every traversed node plus
1114    /// the same `tokens_added` accounting, then splice the remainder at the
1115    /// fall-off point with the very branches `insert_tokens` uses (vacant leaf /
1116    /// prefix-of-child split / diverge split). The only tree lookups are the
1117    /// single descent and one `entry()` re-probe at the fall-off node for the
1118    /// splice — never a second full walk.
1119    ///
1120    /// # Preserved semantics
1121    ///
1122    /// Identical to `match_prefix_with_counts` followed by
1123    /// `insert_tokens(tokens, tenant)`:
1124    /// * match-side: same matched-token count, same routed tenant, same per-node
1125    ///   `touch_tenant(get_any_tenant())`, same "stop at an all-evicted node or a
1126    ///   partial match" rule;
1127    /// * insert-side: same node-split structure, same `tenant_token_count`
1128    ///   accounting (including the existing behavior of re-counting a fully
1129    ///   matched path), same `touch_tenant(tenant)` on every node on the path and
1130    ///   on freshly created/split nodes.
1131    ///
1132    /// The replay reorders insert's per-node touches to *after* the match phase
1133    /// instead of interleaving them, which only changes the exact monotonic
1134    /// timestamp values written (immaterial to relative LRU/LFU ordering); the
1135    /// set of touched (node, tenant) pairs and the LFU hit-count increments are
1136    /// unchanged.
1137    ///
1138    /// # Equivalence scope (single-threaded vs concurrent)
1139    ///
1140    /// The "identical" guarantees above hold exactly **single-threaded**
1141    /// (covered by `test_match_and_insert_equiv_*`). Under **concurrent** node
1142    /// splits the equivalence is on **routing and `tenant_token_count`**, not on
1143    /// per-node access *timestamps*: this path replays onto the nodes recorded
1144    /// during the match rather than re-walking, so if another thread splits one
1145    /// of those nodes mid-flight, a freshly-created intermediate can miss a
1146    /// timestamp bump — it is then evicted slightly earlier (a bounded,
1147    /// self-healing cache-hit-rate effect on an approximate tree). The token
1148    /// count stays exact because the recorded per-node `advance`s are frozen at
1149    /// match time; `test_concurrent_match_and_insert_count_and_route_integrity`
1150    /// proves both the exact count and route integrity under a concurrent-split
1151    /// stress.
1152    pub fn match_and_insert_with<'t, F>(&self, tokens: &[TokenId], select: F) -> PrefixMatchResult
1153    where
1154        F: FnOnce(&PrefixMatchResult) -> Option<&'t str>,
1155    {
1156        let input_token_count = tokens.len();
1157
1158        let aligned_len = align_to_page(tokens.len());
1159        if aligned_len == 0 {
1160            // Too short to cache: `insert_tokens` is a no-op regardless of the
1161            // selected tenant, so just resolve + return the match result. We
1162            // still invoke `select` so the caller's routing side effects (if
1163            // any) run, matching a separate match-then-(skipped)-insert.
1164            let result = PrefixMatchResult {
1165                tenant: self
1166                    .root
1167                    .get_any_tenant()
1168                    .unwrap_or_else(|| intern_tenant("empty")),
1169                matched_token_count: 0,
1170                input_token_count,
1171            };
1172            let _ = select(&result);
1173            return result;
1174        }
1175        let tokens = &tokens[..aligned_len];
1176
1177        let track_lfu = self.eviction_policy == EvictionPolicy::Lfu;
1178
1179        // ---- Phase 1: MATCH descent (mirrors match_prefix_with_counts) ----
1180        // Additionally record every traversed edge so insert can replay its
1181        // per-node work without re-walking, and capture the fall-off node +
1182        // remaining slice for the splice.
1183        let mut matched_tokens = 0usize;
1184        let mut last_tenant: Option<TenantId> = None;
1185        let mut remaining = tokens;
1186        let mut current = Arc::clone(&self.root);
1187        // (node, advance) for each edge we descended through, in order.
1188        // Pre-allocated; most matched paths are well under this depth.
1189        let mut path: Vec<(NodeRef, usize)> = Vec::with_capacity(16);
1190        // Once match would stop (all-evicted node / partial), freeze the match
1191        // result but keep descending for insert (insert's reach is a superset).
1192        let mut match_frozen = false;
1193
1194        enum MatchStep {
1195            Stop,
1196            Continue { next: NodeRef, advance: usize },
1197        }
1198
1199        while remaining.len() >= PAGE_SIZE {
1200            let page_key = make_page_key(remaining);
1201
1202            let step = match current.children.get(&page_key) {
1203                None => MatchStep::Stop,
1204                Some(child_ref) => {
1205                    let child = Arc::clone(child_ref.value());
1206                    drop(child_ref);
1207
1208                    let child_tokens = child.tokens.read();
1209                    let match_len = remaining
1210                        .iter()
1211                        .zip(child_tokens.iter())
1212                        .take_while(|(a, b)| a == b)
1213                        .count();
1214                    let match_len = align_to_page(match_len);
1215
1216                    if match_len == 0 {
1217                        drop(child_tokens);
1218                        MatchStep::Stop
1219                    } else if match_len < child_tokens.len() {
1220                        // Partial match within the node: match stops here.
1221                        if !match_frozen {
1222                            if let Some(t) = child.get_any_tenant() {
1223                                child.touch_tenant(&t, track_lfu);
1224                                matched_tokens += match_len;
1225                                last_tenant = Some(t);
1226                            }
1227                            // (If the node is all-evicted, match records nothing
1228                            // and simply stops — same as match_prefix_with_counts.)
1229                            match_frozen = true;
1230                        }
1231                        // Insert also stops descending here (it will split this
1232                        // node). Do NOT push to `path`; the splice handles it.
1233                        drop(child_tokens);
1234                        MatchStep::Stop
1235                    } else {
1236                        // Full match: match continues (if not frozen) and insert
1237                        // continues regardless.
1238                        drop(child_tokens);
1239                        if !match_frozen {
1240                            match child.get_any_tenant() {
1241                                None => {
1242                                    // All-evicted: match stops, insert continues.
1243                                    match_frozen = true;
1244                                }
1245                                Some(t) => {
1246                                    child.touch_tenant(&t, track_lfu);
1247                                    matched_tokens += match_len;
1248                                    last_tenant = Some(t);
1249                                }
1250                            }
1251                        }
1252                        MatchStep::Continue {
1253                            next: child,
1254                            advance: match_len,
1255                        }
1256                    }
1257                }
1258            };
1259
1260            match step {
1261                MatchStep::Stop => break,
1262                MatchStep::Continue { next, advance } => {
1263                    path.push((Arc::clone(&next), advance));
1264                    remaining = &remaining[advance..];
1265                    current = next;
1266                }
1267            }
1268        }
1269
1270        // ---- Decide the insert tenant from the match result ----
1271        let result = PrefixMatchResult {
1272            tenant: last_tenant.unwrap_or_else(|| intern_tenant("empty")),
1273            matched_token_count: matched_tokens,
1274            input_token_count,
1275        };
1276        let Some(tenant) = select(&result) else {
1277            // No insert (router selected no worker).
1278            return result;
1279        };
1280        // Intern through the shared pool so the stored Arc dedups with every
1281        // other call (matches insert_tokens' interning).
1282        let tenant_id = intern_tenant(tenant);
1283
1284        // ---- Phase 2: INSERT replay for `tenant_id` (no second walk) ----
1285        // Mirrors insert_tokens' root bookkeeping.
1286        self.root
1287            .tenant_last_access_time
1288            .entry(Arc::clone(&tenant_id))
1289            .or_insert(0);
1290        self.tenant_token_count
1291            .entry(Arc::clone(&tenant_id))
1292            .or_insert(0);
1293
1294        let mut tokens_added = 0usize;
1295
1296        // Replay insert's per-node work on every edge the match descended:
1297        // `insert_tokens` touches the inserting tenant on each full-match node
1298        // and counts its `advance` tokens.
1299        for (node, advance) in &path {
1300            node.touch_tenant(&tenant_id, track_lfu);
1301            tokens_added += *advance;
1302        }
1303
1304        // Splice only the unmatched suffix at the fall-off node (`current`),
1305        // reusing insert_tokens' exact descent loop. It re-probes `current`'s
1306        // children for `remaining` (a single child-map op, not a re-walk of the
1307        // matched prefix) and handles the vacant / split / — and, under a
1308        // concurrent split race, full-match-continue — cases identically to a
1309        // standalone `insert_tokens`. The matched prefix above `current` was
1310        // already re-attached by the loop above and is never re-walked.
1311        if remaining.len() >= PAGE_SIZE {
1312            tokens_added +=
1313                Self::insert_from(current, remaining, Arc::clone(&tenant_id), track_lfu);
1314        }
1315
1316        if tokens_added > 0 {
1317            self.tenant_token_count
1318                .entry(tenant_id)
1319                .and_modify(|c| *c += tokens_added)
1320                .or_insert(tokens_added);
1321        }
1322
1323        result
1324    }
1325
1326    /// Legacy prefix_match API returning (matched_tokens, tenant_string).
1327    pub fn prefix_match_legacy(&self, tokens: &[TokenId]) -> (Vec<TokenId>, String) {
1328        let result = self.match_prefix_with_counts(tokens);
1329        let matched: Vec<TokenId> = tokens[..result.matched_token_count].to_vec();
1330        (matched, result.tenant.to_string())
1331    }
1332
1333    /// Get token counts per tenant.
1334    pub fn get_tenant_token_counts(&self) -> HashMap<String, usize> {
1335        self.tenant_token_count
1336            .iter()
1337            .map(|entry| (entry.key().to_string(), *entry.value()))
1338            .collect()
1339    }
1340
1341    /// Compute eviction priority for a node based on the configured policy.
1342    /// Returns (primary_key, tiebreaker) where lower values are evicted first.
1343    ///
1344    /// Uses wrapping arithmetic for u64→i64 conversion. This preserves relative ordering
1345    /// for values within i64::MAX of each other, which is sufficient since our timestamps
1346    /// are monotonic counters starting from 0.
1347    fn compute_eviction_priority(&self, node: &Node, last_access_time: u64) -> (i64, u64) {
1348        match self.eviction_policy {
1349            EvictionPolicy::Lru => {
1350                // Lower last_access_time = older = evict first
1351                // Wrapping cast preserves ordering for nearby values
1352                (last_access_time as i64, 0)
1353            }
1354            EvictionPolicy::Lfu => {
1355                // Lower hit_count = less used = evict first, tiebreak by last_access_time
1356                let hit_count = node.hit_count.load(Ordering::Relaxed);
1357                (hit_count as i64, last_access_time)
1358            }
1359            EvictionPolicy::Fifo => {
1360                // Lower creation_time = older = evict first
1361                (node.creation_time as i64, 0)
1362            }
1363            EvictionPolicy::Mru => {
1364                // Higher last_access_time = newer = evict first (negate for min-heap)
1365                // wrapping_neg handles i64::MIN correctly (wraps to itself)
1366                ((last_access_time as i64).wrapping_neg(), 0)
1367            }
1368            EvictionPolicy::Filo => {
1369                // Higher creation_time = newer = evict first (negate for min-heap)
1370                ((node.creation_time as i64).wrapping_neg(), 0)
1371            }
1372            EvictionPolicy::Priority => {
1373                // Lower priority = less important = evict first, tiebreak by last_access_time
1374                let priority = node.priority.load(Ordering::Relaxed);
1375                (priority as i64, last_access_time)
1376            }
1377        }
1378    }
1379
1380    /// Evict entries for a tenant to reduce to max_tokens.
1381    /// Uses leaf-first eviction with the configured policy (matching SGLang's behavior).
1382    pub fn evict_tenant(&self, tenant: &TenantId, max_tokens: usize) {
1383        use std::{cmp::Reverse, collections::BinaryHeap};
1384
1385        let current_count = self
1386            .tenant_token_count
1387            .get(tenant.as_ref())
1388            .map(|v| *v)
1389            .unwrap_or(0);
1390
1391        if current_count <= max_tokens {
1392            return;
1393        }
1394
1395        let to_evict = current_count - max_tokens;
1396        let mut evicted = 0;
1397
1398        let mut leaves: Vec<(NodeRef, u64)> = Vec::new();
1399        self.collect_tenant_leaves(&self.root, tenant, &mut leaves);
1400
1401        // Min-heap by eviction priority (policy-dependent)
1402        let mut heap: BinaryHeap<Reverse<((i64, u64), usize)>> = BinaryHeap::new();
1403
1404        // Pre-allocate for initial leaves. Additional capacity for leaf promotions
1405        // (when a parent becomes a leaf after child eviction) will be handled by
1406        // Vec's amortized growth strategy.
1407        let mut leaf_data: Vec<NodeRef> = Vec::with_capacity(leaves.len());
1408
1409        for (node, ts) in leaves.drain(..) {
1410            let priority = self.compute_eviction_priority(&node, ts);
1411            let idx = leaf_data.len();
1412            leaf_data.push(node);
1413            heap.push(Reverse((priority, idx)));
1414        }
1415
1416        while evicted < to_evict {
1417            let Some(Reverse((_, idx))) = heap.pop() else {
1418                break;
1419            };
1420
1421            let node = &leaf_data[idx];
1422            let (node_tokens, parent_became_leaf) = self.remove_tenant_and_cleanup(node, tenant);
1423            if node_tokens > 0 {
1424                evicted += node_tokens;
1425
1426                // Incremental leaf promotion (matching SGLang's approach)
1427                if let Some((parent_node, parent_ts)) = parent_became_leaf {
1428                    let priority = self.compute_eviction_priority(&parent_node, parent_ts);
1429                    let new_idx = leaf_data.len();
1430                    leaf_data.push(parent_node);
1431                    heap.push(Reverse((priority, new_idx)));
1432                }
1433            }
1434        }
1435
1436        if let Some(mut count) = self.tenant_token_count.get_mut(tenant.as_ref()) {
1437            *count = count.saturating_sub(evicted);
1438        }
1439
1440        debug!(
1441            tenant = %tenant.as_ref(),
1442            evicted = evicted,
1443            remaining = current_count.saturating_sub(evicted),
1444            policy = ?self.eviction_policy,
1445            "Evicted tokens from tenant (leaf-first)"
1446        );
1447    }
1448
1449    /// Collect tenant-specific leaves (nodes where tenant exists but no children have tenant).
1450    fn collect_tenant_leaves(
1451        &self,
1452        node: &NodeRef,
1453        tenant_id: &TenantId,
1454        result: &mut Vec<(NodeRef, u64)>,
1455    ) {
1456        let is_root = Arc::ptr_eq(node, &self.root);
1457        let node_has_tenant = !is_root
1458            && node
1459                .tenant_last_access_time
1460                .contains_key(tenant_id.as_ref());
1461
1462        let mut any_child_has_tenant = false;
1463        for child_entry in &node.children {
1464            let child = child_entry.value();
1465            if child
1466                .tenant_last_access_time
1467                .contains_key(tenant_id.as_ref())
1468            {
1469                any_child_has_tenant = true;
1470            }
1471            self.collect_tenant_leaves(child, tenant_id, result);
1472        }
1473
1474        if node_has_tenant && !any_child_has_tenant {
1475            if let Some(ts) = node.tenant_last_access_time.get(tenant_id.as_ref()) {
1476                result.push((Arc::clone(node), *ts));
1477            }
1478        }
1479    }
1480
1481    #[expect(
1482        clippy::unused_self,
1483        reason = "method logically belongs to the tree instance; keeps API consistent with collect_tenant_leaves"
1484    )]
1485    fn is_tenant_leaf(&self, node: &NodeRef, tenant_id: &TenantId) -> bool {
1486        if !node
1487            .tenant_last_access_time
1488            .contains_key(tenant_id.as_ref())
1489        {
1490            return false;
1491        }
1492        for child_entry in &node.children {
1493            if child_entry
1494                .value()
1495                .tenant_last_access_time
1496                .contains_key(tenant_id.as_ref())
1497            {
1498                return false;
1499            }
1500        }
1501        true
1502    }
1503
1504    /// Remove tenant from node and clean up empty ancestors.
1505    /// Returns (tokens_removed, Option<(parent_node, ts)> if parent became a leaf).
1506    fn remove_tenant_and_cleanup(
1507        &self,
1508        node: &NodeRef,
1509        tenant_id: &TenantId,
1510    ) -> (usize, Option<(NodeRef, u64)>) {
1511        if node
1512            .tenant_last_access_time
1513            .remove(tenant_id.as_ref())
1514            .is_none()
1515        {
1516            return (0, None);
1517        }
1518
1519        let node_tokens = node.tokens.read().len();
1520        let parent_leaf_info = self.cleanup_empty_ancestors_with_parent(node, tenant_id);
1521
1522        (node_tokens, parent_leaf_info)
1523    }
1524
1525    /// Remove empty nodes walking up via parent pointers.
1526    /// Returns Some((parent_node, ts)) if a parent became a tenant-leaf.
1527    fn cleanup_empty_ancestors_with_parent(
1528        &self,
1529        node: &NodeRef,
1530        tenant_id: &TenantId,
1531    ) -> Option<(NodeRef, u64)> {
1532        let mut current = Arc::clone(node);
1533        let mut parent_leaf_info: Option<(NodeRef, u64)> = None;
1534
1535        loop {
1536            if !current.is_empty() {
1537                if parent_leaf_info.is_none() && self.is_tenant_leaf(&current, tenant_id) {
1538                    if let Some(ts) = current.tenant_last_access_time.get(tenant_id.as_ref()) {
1539                        parent_leaf_info = Some((Arc::clone(&current), *ts));
1540                    }
1541                }
1542                break;
1543            }
1544
1545            let parent_weak = current.parent.read();
1546            let Some(parent) = parent_weak.upgrade() else {
1547                break;
1548            };
1549            drop(parent_weak);
1550
1551            let page_key_guard = current.page_key.read();
1552            let Some(page_key) = *page_key_guard else {
1553                break;
1554            };
1555            drop(page_key_guard);
1556
1557            parent.children.remove(&page_key);
1558
1559            if parent_leaf_info.is_none() && self.is_tenant_leaf(&parent, tenant_id) {
1560                if let Some(ts) = parent.tenant_last_access_time.get(tenant_id.as_ref()) {
1561                    parent_leaf_info = Some((Arc::clone(&parent), *ts));
1562                }
1563            }
1564
1565            current = parent;
1566        }
1567
1568        parent_leaf_info
1569    }
1570
1571    /// Get the token count for a specific tenant.
1572    pub fn tenant_token_size(&self, tenant: &TenantId) -> usize {
1573        // Use borrowed lookup to avoid Arc hash overhead
1574        self.tenant_token_count
1575            .get(tenant.as_ref())
1576            .map(|v| *v)
1577            .unwrap_or(0)
1578    }
1579
1580    /// Clear the tree to empty state.
1581    pub fn clear(&self) {
1582        self.root.children.clear();
1583        self.root.tenant_last_access_time.clear();
1584        self.tenant_token_count.clear();
1585    }
1586
1587    // TODO: Implement efficient remove_tenant with reverse index.
1588    // See lib.rs for design options. Current naive O(n) traversal removed.
1589    // For now, stale entries are cleaned up by LRU eviction.
1590
1591    /// Evict cache entries by total token count to reduce memory usage.
1592    /// Convenience method matching the StringTree API.
1593    ///
1594    /// For each tenant, reduces their token usage to `max_size` if they exceed it.
1595    pub fn evict_tenant_by_size(&self, max_size: usize) {
1596        // Collect tenants that exceed the max size
1597        let tenants_to_evict: Vec<TenantId> = self
1598            .tenant_token_count
1599            .iter()
1600            .filter(|entry| *entry.value() > max_size)
1601            .map(|entry| Arc::clone(entry.key()))
1602            .collect();
1603
1604        // Evict each tenant
1605        for tenant_id in tenants_to_evict {
1606            self.evict_tenant(&tenant_id, max_size);
1607        }
1608    }
1609
1610    /// Lazily walk the tree in pre-order, yielding each node's
1611    /// `(prefix_tokens, [(tenant, epoch)])` pair without
1612    /// materializing the full tree as a single buffer.
1613    /// Empty-tenant nodes are skipped (they're routing
1614    /// intermediates, not real entries). Children are visited in
1615    /// lexicographic page-key order so callers paging the
1616    /// iterator can resume by re-running and skipping the first
1617    /// N items if the tree is unchanged.
1618    ///
1619    /// The walk is not atomic — concurrent `insert_tokens` may
1620    /// split or replace nodes mid-walk; the iterator may then
1621    /// reflect a mix of pre- and post-split state. Acceptable
1622    /// for mesh sync (eventual consistency).
1623    pub fn iter_entries(&self) -> EntriesIter {
1624        EntriesIter::new(self)
1625    }
1626}
1627
1628/// Iterator yielded by [`TokenTree::iter_entries`]. Owns
1629/// `Arc<Node>` clones for the path it's currently inside so it
1630/// survives across awaits or temporary releases of `&TokenTree`.
1631pub struct EntriesIter {
1632    /// DFS frame stack. The top frame's `remaining_children` is
1633    /// the queue of children we still have to visit at the
1634    /// current depth. Frames are pushed when descending, popped
1635    /// when their children exhaust.
1636    stack: Vec<EntryFrame>,
1637    /// Mutable accumulated path-from-root tokens. Pushed when
1638    /// descending into a child, truncated when popping a frame.
1639    path: Vec<TokenId>,
1640    /// Pre-staged emission. Set when a node with tenants is
1641    /// entered; consumed by the next `next()` call before
1642    /// continuing the walk.
1643    pending: Option<EntryItem>,
1644}
1645
1646type EntryItem = (Vec<TokenId>, Vec<(TenantId, u64)>);
1647
1648struct EntryFrame {
1649    remaining_children: std::vec::IntoIter<NodeRef>,
1650    /// Number of tokens contributed by this frame's edge — used
1651    /// to truncate the path buffer correctly when popping.
1652    edge_token_count: usize,
1653}
1654
1655impl EntriesIter {
1656    fn new(tree: &TokenTree) -> Self {
1657        let root_tenants = snapshot_tenants(&tree.root);
1658        let pending = (!root_tenants.is_empty()).then(|| (Vec::<TokenId>::new(), root_tenants));
1659        Self {
1660            stack: vec![EntryFrame {
1661                remaining_children: collect_sorted_children(&tree.root).into_iter(),
1662                edge_token_count: 0,
1663            }],
1664            path: Vec::new(),
1665            pending,
1666        }
1667    }
1668}
1669
1670impl Iterator for EntriesIter {
1671    type Item = EntryItem;
1672
1673    fn next(&mut self) -> Option<Self::Item> {
1674        if let Some(entry) = self.pending.take() {
1675            return Some(entry);
1676        }
1677        loop {
1678            let frame = self.stack.last_mut()?;
1679            if let Some(child) = frame.remaining_children.next() {
1680                // Extend the path directly through the lock guard
1681                // — no intermediate Vec<TokenId> clone.
1682                let edge_token_count = {
1683                    let guard = child.tokens.read();
1684                    self.path.extend_from_slice(&guard);
1685                    guard.len()
1686                };
1687
1688                let tenants = snapshot_tenants(&child);
1689                self.stack.push(EntryFrame {
1690                    remaining_children: collect_sorted_children(&child).into_iter(),
1691                    edge_token_count,
1692                });
1693                if !tenants.is_empty() {
1694                    return Some((self.path.clone(), tenants));
1695                }
1696                // Empty tenants → loop continues, descending into
1697                // this child's children on the next iteration.
1698            } else {
1699                let edge_token_count = frame.edge_token_count;
1700                self.stack.pop();
1701                let new_len = self.path.len().saturating_sub(edge_token_count);
1702                self.path.truncate(new_len);
1703            }
1704        }
1705    }
1706}
1707
1708fn snapshot_tenants(node: &NodeRef) -> Vec<(TenantId, u64)> {
1709    let mut tenants: Vec<(TenantId, u64)> = node
1710        .tenant_last_access_time
1711        .iter()
1712        .map(|entry| (Arc::clone(entry.key()), *entry.value()))
1713        .collect();
1714    tenants.sort_by(|a, b| a.0.cmp(&b.0));
1715    tenants
1716}
1717
1718fn collect_sorted_children(node: &NodeRef) -> Vec<NodeRef> {
1719    let mut children: Vec<(TokenPageKey, NodeRef)> = node
1720        .children
1721        .iter()
1722        .map(|entry| (*entry.key(), entry.value().clone()))
1723        .collect();
1724    // Lexicographic order over the page key — deterministic
1725    // traversal across hash-shard layouts.
1726    children.sort_by_key(|(k, _)| *k);
1727    children.into_iter().map(|(_, n)| n).collect()
1728}
1729
1730impl RadixTree for TokenTree {
1731    type Key = [TokenId];
1732    type MatchResult = PrefixMatchResult;
1733
1734    fn insert(&self, key: &Self::Key, tenant: &str) {
1735        self.insert_tokens(key, tenant);
1736    }
1737
1738    fn prefix_match(&self, key: &Self::Key) -> Option<TenantId> {
1739        let result = self.match_prefix_with_counts(key);
1740        if result.matched_token_count > 0 {
1741            Some(result.tenant)
1742        } else {
1743            None
1744        }
1745    }
1746
1747    fn prefix_match_with_counts(&self, key: &Self::Key) -> Self::MatchResult {
1748        self.match_prefix_with_counts(key)
1749    }
1750
1751    fn evict(&self, tenant: &TenantId, max_units: usize) {
1752        self.evict_tenant(tenant, max_units);
1753    }
1754
1755    fn tenant_size(&self, tenant: &TenantId) -> usize {
1756        self.tenant_token_size(tenant)
1757    }
1758
1759    fn reset(&self) {
1760        self.clear();
1761    }
1762}
1763
1764#[cfg(test)]
1765mod tests {
1766    use std::{sync::Arc, thread};
1767
1768    use super::*;
1769
1770    /// Helper to create a page-aligned token sequence starting from `base`.
1771    /// Creates `pages` full pages of tokens.
1772    fn make_tokens(base: u32, pages: usize) -> Vec<TokenId> {
1773        (0..(pages * PAGE_SIZE)).map(|i| base + i as u32).collect()
1774    }
1775
1776    #[test]
1777    fn test_basic_insert_match() {
1778        let tree = TokenTree::new();
1779
1780        // Insert 2 pages (32 tokens)
1781        let tokens = make_tokens(1, 2);
1782        tree.insert_tokens(&tokens, "tenant1");
1783
1784        // Exact match
1785        let result = tree.match_prefix_with_counts(&tokens);
1786        assert_eq!(result.matched_token_count, 32);
1787        assert_eq!(result.tenant.as_ref(), "tenant1");
1788
1789        // Match first page only
1790        let first_page = make_tokens(1, 1);
1791        let result = tree.match_prefix_with_counts(&first_page);
1792        assert_eq!(result.matched_token_count, PAGE_SIZE);
1793        assert_eq!(result.tenant.as_ref(), "tenant1");
1794
1795        // Match with extra tokens (truncated to page boundary)
1796        let mut extended = tokens.clone();
1797        extended.extend([100, 101, 102, 103, 104]);
1798        let result = tree.match_prefix_with_counts(&extended);
1799        assert_eq!(result.matched_token_count, 32);
1800    }
1801
1802    #[test]
1803    fn test_short_sequences_skipped() {
1804        let tree = TokenTree::new();
1805
1806        // Sequences shorter than PAGE_SIZE are skipped
1807        tree.insert_tokens(&[1, 2, 3, 4, 5], "tenant1");
1808
1809        // Should have no entries (too short)
1810        let counts = tree.get_tenant_token_counts();
1811        assert!(counts.is_empty() || counts.get("tenant1").copied().unwrap_or(0) == 0);
1812
1813        // Lookup also returns 0 for short sequences
1814        let result = tree.match_prefix_with_counts(&[1, 2, 3, 4, 5]);
1815        assert_eq!(result.matched_token_count, 0);
1816        assert_eq!(result.input_token_count, 5);
1817    }
1818
1819    #[test]
1820    fn test_multiple_tenants() {
1821        let tree = TokenTree::new();
1822
1823        let tokens = make_tokens(1, 1);
1824        tree.insert_tokens(&tokens, "tenant1");
1825        tree.insert_tokens(&tokens, "tenant2");
1826
1827        let result = tree.match_prefix_with_counts(&tokens);
1828        assert_eq!(result.matched_token_count, PAGE_SIZE);
1829        // Either tenant is valid
1830        assert!(result.tenant.as_ref() == "tenant1" || result.tenant.as_ref() == "tenant2");
1831    }
1832
1833    #[test]
1834    fn test_prefix_split() {
1835        let tree = TokenTree::new();
1836
1837        // Insert 3 pages first
1838        let long_tokens = make_tokens(1, 3);
1839        tree.insert_tokens(&long_tokens, "tenant1");
1840
1841        // Insert 1 page (causes split)
1842        let short_tokens = make_tokens(1, 1);
1843        tree.insert_tokens(&short_tokens, "tenant2");
1844
1845        // Short match
1846        let result = tree.match_prefix_with_counts(&short_tokens);
1847        assert_eq!(result.matched_token_count, PAGE_SIZE);
1848
1849        // Long match
1850        let result = tree.match_prefix_with_counts(&long_tokens);
1851        assert_eq!(result.matched_token_count, 3 * PAGE_SIZE);
1852        assert_eq!(result.tenant.as_ref(), "tenant1");
1853    }
1854
1855    #[test]
1856    fn test_empty_input() {
1857        let tree = TokenTree::new();
1858
1859        let tokens = make_tokens(1, 1);
1860        tree.insert_tokens(&tokens, "tenant1");
1861
1862        let result = tree.match_prefix_with_counts(&[]);
1863        assert_eq!(result.matched_token_count, 0);
1864        assert_eq!(result.input_token_count, 0);
1865    }
1866
1867    #[test]
1868    fn test_no_match() {
1869        let tree = TokenTree::new();
1870
1871        let tokens = make_tokens(1, 1);
1872        tree.insert_tokens(&tokens, "tenant1");
1873
1874        // Different page key
1875        let other = make_tokens(1000, 1);
1876        let result = tree.match_prefix_with_counts(&other);
1877        assert_eq!(result.matched_token_count, 0);
1878    }
1879
1880    #[test]
1881    fn test_eviction() {
1882        let tree = TokenTree::new();
1883
1884        let tokens1 = make_tokens(1, 2);
1885        let tokens2 = make_tokens(1, 3);
1886        tree.insert_tokens(&tokens1, "tenant1");
1887        tree.insert_tokens(&tokens2, "tenant1");
1888
1889        let counts = tree.get_tenant_token_counts();
1890        assert!(counts.get("tenant1").unwrap() > &0);
1891
1892        tree.evict(&TenantId::from("tenant1"), 0);
1893
1894        // After eviction, counts should be reduced
1895        let new_counts = tree.get_tenant_token_counts();
1896        let new_count = new_counts.get("tenant1").copied().unwrap_or(0);
1897        assert!(new_count < *counts.get("tenant1").unwrap());
1898    }
1899
1900    #[test]
1901    fn test_concurrent_insert_match() {
1902        let tree = Arc::new(TokenTree::new());
1903        let mut handles = vec![];
1904
1905        // Spawn inserters - use page-aligned sequences
1906        for i in 0..4 {
1907            let tree = Arc::clone(&tree);
1908            handles.push(thread::spawn(move || {
1909                for j in 0..100 {
1910                    let base = (i * 1000000 + j * 1000) as u32;
1911                    let tokens = make_tokens(base, 2);
1912                    tree.insert_tokens(&tokens, &format!("tenant{i}"));
1913                }
1914            }));
1915        }
1916
1917        // Spawn matchers
1918        for i in 0..4 {
1919            let tree = Arc::clone(&tree);
1920            handles.push(thread::spawn(move || {
1921                for j in 0..100 {
1922                    let base = (i * 1000000 + j * 1000) as u32;
1923                    let tokens = make_tokens(base, 2);
1924                    let _ = tree.match_prefix_with_counts(&tokens);
1925                }
1926            }));
1927        }
1928
1929        for handle in handles {
1930            handle.join().unwrap();
1931        }
1932    }
1933
1934    #[test]
1935    fn test_prefix_match_with_counts() {
1936        let tree = TokenTree::new();
1937
1938        let tokens = make_tokens(1, 2);
1939        tree.insert_tokens(&tokens, "tenant1");
1940
1941        // Exact match
1942        let result = tree.match_prefix_with_counts(&tokens);
1943        assert_eq!(result.matched_token_count, 2 * PAGE_SIZE);
1944        assert_eq!(result.input_token_count, 2 * PAGE_SIZE);
1945
1946        // Match first page
1947        let first_page = make_tokens(1, 1);
1948        let result = tree.match_prefix_with_counts(&first_page);
1949        assert_eq!(result.matched_token_count, PAGE_SIZE);
1950        assert_eq!(result.input_token_count, PAGE_SIZE);
1951
1952        // Extended input (aligned)
1953        let extended = make_tokens(1, 3);
1954        let result = tree.match_prefix_with_counts(&extended);
1955        assert_eq!(result.matched_token_count, 2 * PAGE_SIZE);
1956        assert_eq!(result.input_token_count, 3 * PAGE_SIZE);
1957    }
1958
1959    #[test]
1960    fn test_disjoint_paths() {
1961        let tree = TokenTree::new();
1962
1963        let tokens1 = make_tokens(1, 1);
1964        let tokens2 = make_tokens(1000, 1);
1965        let tokens3 = make_tokens(2000, 1);
1966
1967        tree.insert_tokens(&tokens1, "tenant1");
1968        tree.insert_tokens(&tokens2, "tenant2");
1969        tree.insert_tokens(&tokens3, "tenant3");
1970
1971        let result = tree.match_prefix_with_counts(&tokens1);
1972        assert_eq!(result.matched_token_count, PAGE_SIZE);
1973        assert_eq!(result.tenant.as_ref(), "tenant1");
1974
1975        let result = tree.match_prefix_with_counts(&tokens2);
1976        assert_eq!(result.matched_token_count, PAGE_SIZE);
1977        assert_eq!(result.tenant.as_ref(), "tenant2");
1978
1979        let result = tree.match_prefix_with_counts(&tokens3);
1980        assert_eq!(result.matched_token_count, PAGE_SIZE);
1981        assert_eq!(result.tenant.as_ref(), "tenant3");
1982    }
1983
1984    #[test]
1985    fn test_branching_paths() {
1986        let tree = TokenTree::new();
1987
1988        // Common first page, different second page
1989        let mut tokens1 = make_tokens(1, 1);
1990        tokens1.extend(make_tokens(100, 1));
1991
1992        let mut tokens2 = make_tokens(1, 1);
1993        tokens2.extend(make_tokens(200, 1));
1994
1995        let mut tokens3 = make_tokens(1, 1);
1996        tokens3.extend(make_tokens(300, 1));
1997
1998        tree.insert_tokens(&tokens1, "tenant1");
1999        tree.insert_tokens(&tokens2, "tenant2");
2000        tree.insert_tokens(&tokens3, "tenant3");
2001
2002        let result = tree.match_prefix_with_counts(&tokens1);
2003        assert_eq!(result.matched_token_count, 2 * PAGE_SIZE);
2004        assert_eq!(result.tenant.as_ref(), "tenant1");
2005
2006        let result = tree.match_prefix_with_counts(&tokens2);
2007        assert_eq!(result.matched_token_count, 2 * PAGE_SIZE);
2008        assert_eq!(result.tenant.as_ref(), "tenant2");
2009
2010        // Partial match at branch point
2011        let first_page = make_tokens(1, 1);
2012        let result = tree.match_prefix_with_counts(&first_page);
2013        assert_eq!(result.matched_token_count, PAGE_SIZE);
2014    }
2015
2016    #[test]
2017    fn test_radix_tree_trait() {
2018        let tree = TokenTree::new();
2019
2020        let tokens = make_tokens(1, 2);
2021        RadixTree::insert(&tree, &tokens, "tenant1");
2022
2023        let tenant = RadixTree::prefix_match(&tree, &tokens);
2024        assert!(tenant.is_some());
2025        assert_eq!(tenant.unwrap().as_ref(), "tenant1");
2026
2027        // Extended input - should match 2 pages (short sequences get 0)
2028        let extended = make_tokens(1, 3);
2029        let result = RadixTree::prefix_match_with_counts(&tree, &extended);
2030        assert_eq!(result.matched_count(), 2 * PAGE_SIZE);
2031        assert_eq!(result.input_count(), 3 * PAGE_SIZE);
2032
2033        assert!(RadixTree::tenant_size(&tree, &TenantId::from("tenant1")) > 0);
2034    }
2035
2036    #[test]
2037    fn test_clear() {
2038        let tree = TokenTree::new();
2039
2040        let tokens1 = make_tokens(1, 1);
2041        let tokens2 = make_tokens(1000, 1);
2042        tree.insert_tokens(&tokens1, "tenant1");
2043        tree.insert_tokens(&tokens2, "tenant2");
2044
2045        assert!(!tree.get_tenant_token_counts().is_empty());
2046
2047        tree.clear();
2048
2049        assert!(tree.get_tenant_token_counts().is_empty());
2050        let result = tree.match_prefix_with_counts(&tokens1);
2051        assert_eq!(result.matched_token_count, 0);
2052    }
2053
2054    #[test]
2055    fn test_tenant_token_count() {
2056        let tree = TokenTree::new();
2057
2058        let tokens1 = make_tokens(1, 2);
2059        let tokens2 = make_tokens(1, 3);
2060        let tokens3 = make_tokens(1000, 1);
2061        tree.insert_tokens(&tokens1, "tenant1");
2062        tree.insert_tokens(&tokens2, "tenant1");
2063        tree.insert_tokens(&tokens3, "tenant2");
2064
2065        let tenant1_id: TenantId = Arc::from("tenant1");
2066        let tenant2_id: TenantId = Arc::from("tenant2");
2067
2068        assert!(tree.tenant_token_size(&tenant1_id) >= PAGE_SIZE);
2069        assert!(tree.tenant_token_size(&tenant2_id) >= PAGE_SIZE);
2070
2071        let counts = tree.get_tenant_token_counts();
2072        assert!(counts.contains_key("tenant1"));
2073        assert!(counts.contains_key("tenant2"));
2074    }
2075
2076    #[test]
2077    fn test_cold_start() {
2078        let tree = TokenTree::new();
2079        // Short sequences return 0 (no cache benefit)
2080        let result = tree.match_prefix_with_counts(&[1, 2, 3, 4, 5]);
2081        assert_eq!(result.matched_token_count, 0);
2082        assert_eq!(result.input_token_count, 5);
2083
2084        // Page-sized sequences also return 0 on empty tree
2085        let tokens = make_tokens(1, 1);
2086        let result = tree.match_prefix_with_counts(&tokens);
2087        assert_eq!(result.matched_token_count, 0);
2088        assert_eq!(result.input_token_count, PAGE_SIZE);
2089    }
2090
2091    #[test]
2092    fn test_exact_match_seq() {
2093        let tree = TokenTree::new();
2094
2095        for i in 0..100 {
2096            let base = (i * 1000) as u32;
2097            let tokens = make_tokens(base, 2);
2098            tree.insert_tokens(&tokens, &format!("tenant{i}"));
2099        }
2100
2101        for i in 0..100 {
2102            let base = (i * 1000) as u32;
2103            let tokens = make_tokens(base, 2);
2104            let result = tree.match_prefix_with_counts(&tokens);
2105            assert_eq!(result.matched_token_count, 2 * PAGE_SIZE);
2106            assert_eq!(result.tenant.as_ref(), &format!("tenant{i}"));
2107        }
2108    }
2109
2110    #[test]
2111    fn test_exact_match_concurrent() {
2112        let tree = Arc::new(TokenTree::new());
2113        let num_threads = 8;
2114        let entries_per_thread = 100;
2115
2116        // Insert phase
2117        let mut handles = vec![];
2118        for t in 0..num_threads {
2119            let tree = Arc::clone(&tree);
2120            handles.push(thread::spawn(move || {
2121                for i in 0..entries_per_thread {
2122                    let base = (t * 1000000 + i * 1000) as u32;
2123                    let tokens = make_tokens(base, 2);
2124                    tree.insert_tokens(&tokens, &format!("tenant{t}"));
2125                }
2126            }));
2127        }
2128        for handle in handles {
2129            handle.join().unwrap();
2130        }
2131
2132        // Match phase
2133        let mut handles = vec![];
2134        for t in 0..num_threads {
2135            let tree = Arc::clone(&tree);
2136            handles.push(thread::spawn(move || {
2137                for i in 0..entries_per_thread {
2138                    let base = (t * 1000000 + i * 1000) as u32;
2139                    let tokens = make_tokens(base, 2);
2140                    let result = tree.match_prefix_with_counts(&tokens);
2141                    assert_eq!(result.matched_token_count, 2 * PAGE_SIZE);
2142                    assert_eq!(result.tenant.as_ref(), &format!("tenant{t}"));
2143                }
2144            }));
2145        }
2146        for handle in handles {
2147            handle.join().unwrap();
2148        }
2149    }
2150
2151    #[test]
2152    fn test_partial_match_concurrent() {
2153        let tree = Arc::new(TokenTree::new());
2154        let num_threads = 8;
2155        let entries_per_thread = 100;
2156
2157        // Insert full sequences (3 pages)
2158        let mut handles = vec![];
2159        for t in 0..num_threads {
2160            let tree = Arc::clone(&tree);
2161            handles.push(thread::spawn(move || {
2162                for i in 0..entries_per_thread {
2163                    let base = (t * 1000000 + i * 1000) as u32;
2164                    let tokens = make_tokens(base, 3);
2165                    tree.insert_tokens(&tokens, &format!("tenant{t}"));
2166                }
2167            }));
2168        }
2169        for handle in handles {
2170            handle.join().unwrap();
2171        }
2172
2173        // Match with partial (1 page)
2174        let mut handles = vec![];
2175        for t in 0..num_threads {
2176            let tree = Arc::clone(&tree);
2177            handles.push(thread::spawn(move || {
2178                for i in 0..entries_per_thread {
2179                    let base = (t * 1000000 + i * 1000) as u32;
2180                    let partial = make_tokens(base, 1);
2181                    let result = tree.match_prefix_with_counts(&partial);
2182                    assert_eq!(result.matched_token_count, PAGE_SIZE);
2183                }
2184            }));
2185        }
2186        for handle in handles {
2187            handle.join().unwrap();
2188        }
2189    }
2190
2191    #[test]
2192    fn test_group_prefix_insert_match_concurrent() {
2193        let tree = Arc::new(TokenTree::new());
2194        let num_threads = 8;
2195
2196        // All threads share the same prefix (1 page)
2197        let common_prefix = make_tokens(100, 1);
2198
2199        let mut handles = vec![];
2200        for t in 0..num_threads {
2201            let tree = Arc::clone(&tree);
2202            let prefix = common_prefix.clone();
2203            handles.push(thread::spawn(move || {
2204                for i in 0..50 {
2205                    let mut tokens = prefix.clone();
2206                    let suffix = make_tokens((t * 10000 + i * 100) as u32, 1);
2207                    tokens.extend(suffix);
2208                    tree.insert_tokens(&tokens, &format!("tenant{t}"));
2209                }
2210            }));
2211        }
2212        for handle in handles {
2213            handle.join().unwrap();
2214        }
2215
2216        // Verify prefix matching works
2217        let result = tree.match_prefix_with_counts(&common_prefix);
2218        assert_eq!(result.matched_token_count, PAGE_SIZE);
2219    }
2220
2221    #[test]
2222    fn test_mixed_concurrent_insert_match() {
2223        let tree = Arc::new(TokenTree::new());
2224        let num_threads = 4;
2225
2226        // Pre-populate some data
2227        for i in 0..100 {
2228            let base = (i * 1000) as u32;
2229            let tokens = make_tokens(base, 2);
2230            tree.insert_tokens(&tokens, &format!("initial{i}"));
2231        }
2232
2233        let mut handles = vec![];
2234
2235        // Concurrent inserters
2236        for t in 0..num_threads {
2237            let tree = Arc::clone(&tree);
2238            handles.push(thread::spawn(move || {
2239                for i in 0..100 {
2240                    let base = (10000000 + t * 100000 + i * 1000) as u32;
2241                    let tokens = make_tokens(base, 2);
2242                    tree.insert_tokens(&tokens, &format!("new_tenant{t}"));
2243                }
2244            }));
2245        }
2246
2247        // Concurrent matchers (matching existing data)
2248        for t in 0..num_threads {
2249            let tree = Arc::clone(&tree);
2250            handles.push(thread::spawn(move || {
2251                for _ in 0..100 {
2252                    let base = ((t * 10) * 1000) as u32;
2253                    let tokens = make_tokens(base, 2);
2254                    let result = tree.match_prefix_with_counts(&tokens);
2255                    assert!(result.matched_token_count > 0);
2256                }
2257            }));
2258        }
2259
2260        for handle in handles {
2261            handle.join().unwrap();
2262        }
2263    }
2264
2265    #[test]
2266    fn test_simple_eviction() {
2267        let tree = TokenTree::new();
2268
2269        let tokens1 = make_tokens(1, 2);
2270        let tokens2 = make_tokens(1000, 2);
2271        tree.insert_tokens(&tokens1, "tenant1");
2272        tree.insert_tokens(&tokens2, "tenant2");
2273
2274        let tenant1_id: TenantId = Arc::from("tenant1");
2275        tree.evict_tenant(&tenant1_id, 0);
2276
2277        // tenant2 should still work
2278        let result = tree.match_prefix_with_counts(&tokens2);
2279        assert_eq!(result.matched_token_count, 2 * PAGE_SIZE);
2280        assert_eq!(result.tenant.as_ref(), "tenant2");
2281    }
2282
2283    #[test]
2284    fn test_advanced_eviction() {
2285        let tree = TokenTree::new();
2286
2287        // Insert multiple paths for tenant1
2288        let mut tokens1 = make_tokens(1, 1);
2289        tokens1.extend(make_tokens(100, 1));
2290        let mut tokens2 = make_tokens(1, 1);
2291        tokens2.extend(make_tokens(200, 1));
2292        let mut tokens3 = make_tokens(1, 1);
2293        tokens3.extend(make_tokens(300, 1));
2294
2295        tree.insert_tokens(&tokens1, "tenant1");
2296        tree.insert_tokens(&tokens2, "tenant1");
2297        tree.insert_tokens(&tokens3, "tenant1");
2298
2299        let tenant1_id: TenantId = Arc::from("tenant1");
2300
2301        // Partial eviction
2302        let initial_size = tree.tenant_token_size(&tenant1_id);
2303        tree.evict_tenant(&tenant1_id, initial_size / 2);
2304        let after_size = tree.tenant_token_size(&tenant1_id);
2305
2306        assert!(after_size <= initial_size);
2307    }
2308
2309    #[test]
2310    fn test_concurrent_operations_with_eviction() {
2311        let tree = Arc::new(TokenTree::new());
2312        let num_threads = 4;
2313
2314        // Pre-populate
2315        for i in 0..100 {
2316            let base = (i * 1000) as u32;
2317            let tokens = make_tokens(base, 2);
2318            tree.insert_tokens(&tokens, &format!("tenant{}", i % 4));
2319        }
2320
2321        let mut handles = vec![];
2322
2323        // Inserters
2324        for t in 0..num_threads {
2325            let tree = Arc::clone(&tree);
2326            handles.push(thread::spawn(move || {
2327                for i in 0..50 {
2328                    let base = (10000000 + t * 100000 + i * 1000) as u32;
2329                    let tokens = make_tokens(base, 2);
2330                    tree.insert_tokens(&tokens, &format!("tenant{t}"));
2331                }
2332            }));
2333        }
2334
2335        // Evictors
2336        for t in 0..num_threads {
2337            let tree = Arc::clone(&tree);
2338            handles.push(thread::spawn(move || {
2339                let tenant_id: TenantId = Arc::from(format!("tenant{t}"));
2340                for _ in 0..10 {
2341                    tree.evict_tenant(&tenant_id, 50);
2342                    thread::sleep(std::time::Duration::from_millis(1));
2343                }
2344            }));
2345        }
2346
2347        // Matchers
2348        for _ in 0..num_threads {
2349            let tree = Arc::clone(&tree);
2350            handles.push(thread::spawn(move || {
2351                for i in 0..50 {
2352                    let base = (i * 1000) as u32;
2353                    let tokens = make_tokens(base, 1);
2354                    let _ = tree.match_prefix_with_counts(&tokens);
2355                }
2356            }));
2357        }
2358
2359        for handle in handles {
2360            handle.join().unwrap();
2361        }
2362    }
2363
2364    #[test]
2365    fn test_get_used_size_per_tenant() {
2366        let tree = TokenTree::new();
2367
2368        let tokens1 = make_tokens(1, 2);
2369        let tokens2 = make_tokens(1, 3);
2370        let tokens3 = make_tokens(1000, 1);
2371        tree.insert_tokens(&tokens1, "tenant1");
2372        tree.insert_tokens(&tokens2, "tenant1");
2373        tree.insert_tokens(&tokens3, "tenant2");
2374
2375        let counts = tree.get_tenant_token_counts();
2376
2377        assert!(counts.contains_key("tenant1"));
2378        assert!(counts.contains_key("tenant2"));
2379        assert!(*counts.get("tenant1").unwrap() >= PAGE_SIZE);
2380        assert!(*counts.get("tenant2").unwrap() >= PAGE_SIZE);
2381    }
2382
2383    #[test]
2384    fn test_prefix_match_tenant() {
2385        let tree = TokenTree::new();
2386
2387        let tokens = make_tokens(1, 2);
2388        tree.insert_tokens(&tokens, "tenant1");
2389        tree.insert_tokens(&tokens, "tenant2");
2390
2391        // Both tenants should have access time updated
2392        let result = tree.match_prefix_with_counts(&tokens);
2393        assert_eq!(result.matched_token_count, 2 * PAGE_SIZE);
2394        // tenant should be either tenant1 or tenant2 (last_tenant cache)
2395        assert!(result.tenant.as_ref() == "tenant1" || result.tenant.as_ref() == "tenant2");
2396    }
2397
2398    #[test]
2399    fn test_simple_tenant_eviction() {
2400        let tree = TokenTree::new();
2401
2402        let tokens1 = make_tokens(1, 2);
2403        let tokens2 = make_tokens(1000, 2);
2404        tree.insert_tokens(&tokens1, "tenant1");
2405        tree.insert_tokens(&tokens2, "tenant2");
2406
2407        let tenant1_id: TenantId = Arc::from("tenant1");
2408        tree.evict_tenant(&tenant1_id, 0);
2409
2410        // tenant2 should be unaffected
2411        let result = tree.match_prefix_with_counts(&tokens2);
2412        assert_eq!(result.matched_token_count, 2 * PAGE_SIZE);
2413        assert_eq!(result.tenant.as_ref(), "tenant2");
2414    }
2415
2416    #[test]
2417    fn test_complex_tenant_eviction() {
2418        let tree = TokenTree::new();
2419
2420        // Create overlapping paths (common first page, different second pages)
2421        let mut tokens1 = make_tokens(1, 1);
2422        tokens1.extend(make_tokens(100, 1));
2423        let mut tokens2 = make_tokens(1, 1);
2424        tokens2.extend(make_tokens(200, 1));
2425        let mut tokens3 = make_tokens(1, 1);
2426        tokens3.extend(make_tokens(300, 1));
2427
2428        tree.insert_tokens(&tokens1, "tenant1");
2429        tree.insert_tokens(&tokens2, "tenant2");
2430        tree.insert_tokens(&tokens3, "tenant1");
2431
2432        let tenant1_id: TenantId = Arc::from("tenant1");
2433        tree.evict_tenant(&tenant1_id, 0);
2434
2435        // tenant2's path should still work
2436        let result = tree.match_prefix_with_counts(&tokens2);
2437        assert_eq!(result.matched_token_count, 2 * PAGE_SIZE);
2438        assert_eq!(result.tenant.as_ref(), "tenant2");
2439    }
2440
2441    #[test]
2442    fn test_empty_node_cleanup_after_eviction() {
2443        // Test that empty nodes are removed from the tree after tenant eviction
2444        // This prevents memory bloat from orphaned nodes
2445        let tree = TokenTree::new();
2446
2447        // Insert a path only owned by tenant1
2448        let tokens1 = make_tokens(500, 2); // Unique path
2449        tree.insert_tokens(&tokens1, "tenant1");
2450
2451        // Verify tree has children before eviction
2452        assert!(
2453            !tree.root.children.is_empty(),
2454            "Tree should have children after insert"
2455        );
2456
2457        // Evict tenant1 completely
2458        let tenant1_id: TenantId = Arc::from("tenant1");
2459        tree.evict_tenant(&tenant1_id, 0);
2460
2461        // Verify empty nodes were cleaned up
2462        assert!(
2463            tree.root.children.is_empty(),
2464            "Empty nodes should be removed after tenant eviction"
2465        );
2466
2467        // Verify tenant count is zero
2468        assert_eq!(tree.tenant_token_size(&tenant1_id), 0);
2469    }
2470
2471    #[test]
2472    fn test_partial_cleanup_shared_nodes() {
2473        // Test that shared nodes are NOT cleaned up when still owned by other tenants
2474        let tree = TokenTree::new();
2475
2476        // Both tenants share the same path
2477        let tokens = make_tokens(100, 2);
2478        tree.insert_tokens(&tokens, "tenant1");
2479        tree.insert_tokens(&tokens, "tenant2");
2480
2481        // Evict tenant1
2482        let tenant1_id: TenantId = Arc::from("tenant1");
2483        tree.evict_tenant(&tenant1_id, 0);
2484
2485        // Tree should still have children (owned by tenant2)
2486        assert!(
2487            !tree.root.children.is_empty(),
2488            "Shared nodes should NOT be removed when other tenants exist"
2489        );
2490
2491        // tenant2 should still work
2492        let result = tree.match_prefix_with_counts(&tokens);
2493        assert_eq!(result.matched_token_count, 2 * PAGE_SIZE);
2494        assert_eq!(result.tenant.as_ref(), "tenant2");
2495    }
2496
2497    #[test]
2498    fn test_leaf_first_eviction() {
2499        // Test that eviction follows leaf-first LRU like SGLang's radix cache
2500        //
2501        // Tree structure for tenant1:
2502        //   root -> [A: page1] -> [B: page2] -> [C: page3]
2503        //
2504        // When we evict, C (leaf) should be evicted first, then B, then A.
2505        // This ensures shared prefixes (like system prompts near root) survive longer.
2506
2507        let tree = TokenTree::new();
2508
2509        // Insert a 3-page path: A -> B -> C
2510        let path_abc = make_tokens(1, 3); // 3 pages
2511        tree.insert_tokens(&path_abc, "tenant1");
2512
2513        // Also insert just the first page (shared prefix)
2514        let path_a = make_tokens(1, 1); // 1 page
2515        tree.insert_tokens(&path_a, "tenant1");
2516
2517        // Initial count should be 3 pages (A, B, C share with the single A)
2518        let initial_count = tree.tenant_token_size(&Arc::from("tenant1"));
2519        assert_eq!(initial_count, 3 * PAGE_SIZE);
2520
2521        // Evict to keep only 2 pages - should evict C (the leaf)
2522        let tenant1_id: TenantId = Arc::from("tenant1");
2523        tree.evict_tenant(&tenant1_id, 2 * PAGE_SIZE);
2524
2525        // After evicting C, we should still match A and B
2526        let result = tree.match_prefix_with_counts(&path_abc);
2527        // Should match 2 pages (A and B), not 3
2528        assert!(
2529            result.matched_token_count <= 2 * PAGE_SIZE,
2530            "Expected at most 2 pages matched after evicting leaf, got {}",
2531            result.matched_token_count / PAGE_SIZE
2532        );
2533
2534        // The prefix path should still work
2535        let result = tree.match_prefix_with_counts(&path_a);
2536        assert_eq!(
2537            result.matched_token_count, PAGE_SIZE,
2538            "Shared prefix A should still be cached"
2539        );
2540
2541        // Evict more - should evict B (now the new leaf)
2542        tree.evict_tenant(&tenant1_id, PAGE_SIZE);
2543
2544        // Now only A should remain
2545        let result = tree.match_prefix_with_counts(&path_abc);
2546        assert!(
2547            result.matched_token_count <= PAGE_SIZE,
2548            "Expected at most 1 page matched after evicting B"
2549        );
2550
2551        // A should still work
2552        let result = tree.match_prefix_with_counts(&path_a);
2553        assert_eq!(
2554            result.matched_token_count, PAGE_SIZE,
2555            "Root prefix A should survive longest"
2556        );
2557    }
2558
2559    #[test]
2560    fn test_single_page_operations() {
2561        let tree = TokenTree::new();
2562
2563        // Single page operations
2564        let tokens1 = make_tokens(1, 1);
2565        let tokens2 = make_tokens(1000, 1);
2566        let tokens3 = make_tokens(2000, 1);
2567
2568        tree.insert_tokens(&tokens1, "tenant1");
2569        tree.insert_tokens(&tokens2, "tenant2");
2570        tree.insert_tokens(&tokens3, "tenant3");
2571
2572        let result = tree.match_prefix_with_counts(&tokens1);
2573        assert_eq!(result.matched_token_count, PAGE_SIZE);
2574        assert_eq!(result.tenant.as_ref(), "tenant1");
2575
2576        let result = tree.match_prefix_with_counts(&tokens2);
2577        assert_eq!(result.matched_token_count, PAGE_SIZE);
2578        assert_eq!(result.tenant.as_ref(), "tenant2");
2579
2580        let result = tree.match_prefix_with_counts(&tokens3);
2581        assert_eq!(result.matched_token_count, PAGE_SIZE);
2582        assert_eq!(result.tenant.as_ref(), "tenant3");
2583    }
2584
2585    #[test]
2586    fn test_prefix_is_subset_of_existing() {
2587        let tree = TokenTree::new();
2588
2589        // Insert longer sequence first (3 pages)
2590        let long_tokens = make_tokens(1, 3);
2591        tree.insert_tokens(&long_tokens, "tenant1");
2592
2593        // Insert prefix (1 page)
2594        let short_tokens = make_tokens(1, 1);
2595        tree.insert_tokens(&short_tokens, "tenant2");
2596
2597        // Short match
2598        let result = tree.match_prefix_with_counts(&short_tokens);
2599        assert_eq!(result.matched_token_count, PAGE_SIZE);
2600
2601        // Long match
2602        let result = tree.match_prefix_with_counts(&long_tokens);
2603        assert_eq!(result.matched_token_count, 3 * PAGE_SIZE);
2604        assert_eq!(result.tenant.as_ref(), "tenant1");
2605    }
2606
2607    #[test]
2608    fn test_existing_is_prefix_of_new() {
2609        let tree = TokenTree::new();
2610
2611        // Insert shorter first (1 page)
2612        let short_tokens = make_tokens(1, 1);
2613        tree.insert_tokens(&short_tokens, "tenant1");
2614
2615        // Insert longer (3 pages)
2616        let long_tokens = make_tokens(1, 3);
2617        tree.insert_tokens(&long_tokens, "tenant2");
2618
2619        // Short match
2620        let result = tree.match_prefix_with_counts(&short_tokens);
2621        assert_eq!(result.matched_token_count, PAGE_SIZE);
2622
2623        // Long match
2624        let result = tree.match_prefix_with_counts(&long_tokens);
2625        assert_eq!(result.matched_token_count, 3 * PAGE_SIZE);
2626        assert_eq!(result.tenant.as_ref(), "tenant2");
2627    }
2628
2629    #[test]
2630    fn test_prefix_match_with_counts_accuracy() {
2631        let tree = TokenTree::new();
2632
2633        // Insert 4 pages
2634        let tokens = make_tokens(1, 4);
2635        tree.insert_tokens(&tokens, "tenant1");
2636
2637        // Exact match
2638        let result = tree.match_prefix_with_counts(&tokens);
2639        assert_eq!(result.matched_token_count, 4 * PAGE_SIZE);
2640        assert_eq!(result.input_token_count, 4 * PAGE_SIZE);
2641
2642        // Partial match (2 pages)
2643        let partial = make_tokens(1, 2);
2644        let result = tree.match_prefix_with_counts(&partial);
2645        assert_eq!(result.matched_token_count, 2 * PAGE_SIZE);
2646        assert_eq!(result.input_token_count, 2 * PAGE_SIZE);
2647
2648        // Extended match (input longer than inserted)
2649        let extended = make_tokens(1, 6);
2650        let result = tree.match_prefix_with_counts(&extended);
2651        assert_eq!(result.matched_token_count, 4 * PAGE_SIZE);
2652        assert_eq!(result.input_token_count, 6 * PAGE_SIZE);
2653    }
2654
2655    #[test]
2656    fn test_split_at_page_boundary() {
2657        let tree = TokenTree::new();
2658
2659        // Insert 3 pages
2660        let long_tokens = make_tokens(1, 3);
2661        tree.insert_tokens(&long_tokens, "tenant1");
2662
2663        // Insert 1 page (causes split at page boundary)
2664        let short_tokens = make_tokens(1, 1);
2665        tree.insert_tokens(&short_tokens, "tenant2");
2666
2667        // 1 page match
2668        let result = tree.match_prefix_with_counts(&short_tokens);
2669        assert_eq!(result.matched_token_count, PAGE_SIZE);
2670
2671        // 3 pages match
2672        let result = tree.match_prefix_with_counts(&long_tokens);
2673        assert_eq!(result.matched_token_count, 3 * PAGE_SIZE);
2674        assert_eq!(result.tenant.as_ref(), "tenant1");
2675    }
2676
2677    #[test]
2678    fn test_multiple_splits_same_path() {
2679        let tree = TokenTree::new();
2680
2681        // Insert 4 pages first
2682        let tokens4 = make_tokens(1, 4);
2683        tree.insert_tokens(&tokens4, "tenant1");
2684
2685        // Insert 3 pages
2686        let tokens3 = make_tokens(1, 3);
2687        tree.insert_tokens(&tokens3, "tenant2");
2688
2689        // Insert 2 pages
2690        let tokens2 = make_tokens(1, 2);
2691        tree.insert_tokens(&tokens2, "tenant3");
2692
2693        // Insert 1 page
2694        let tokens1 = make_tokens(1, 1);
2695        tree.insert_tokens(&tokens1, "tenant4");
2696
2697        // All should match correctly
2698        let result = tree.match_prefix_with_counts(&tokens1);
2699        assert_eq!(result.matched_token_count, PAGE_SIZE);
2700
2701        let result = tree.match_prefix_with_counts(&tokens2);
2702        assert_eq!(result.matched_token_count, 2 * PAGE_SIZE);
2703
2704        let result = tree.match_prefix_with_counts(&tokens3);
2705        assert_eq!(result.matched_token_count, 3 * PAGE_SIZE);
2706
2707        let result = tree.match_prefix_with_counts(&tokens4);
2708        assert_eq!(result.matched_token_count, 4 * PAGE_SIZE);
2709    }
2710
2711    #[test]
2712    fn test_high_contention_same_prefix() {
2713        let tree = Arc::new(TokenTree::new());
2714        let prefix = make_tokens(100, 1);
2715        let num_threads = 16;
2716
2717        let mut handles = vec![];
2718        for t in 0..num_threads {
2719            let tree = Arc::clone(&tree);
2720            let p = prefix.clone();
2721            handles.push(thread::spawn(move || {
2722                for i in 0..100 {
2723                    let mut tokens = p.clone();
2724                    let suffix = make_tokens((t * 10000 + i * 100) as u32, 1);
2725                    tokens.extend(suffix);
2726                    tree.insert_tokens(&tokens, &format!("tenant{t}"));
2727                }
2728            }));
2729        }
2730
2731        for handle in handles {
2732            handle.join().unwrap();
2733        }
2734
2735        // Verify prefix matching
2736        let result = tree.match_prefix_with_counts(&prefix);
2737        assert_eq!(result.matched_token_count, PAGE_SIZE);
2738    }
2739
2740    #[test]
2741    fn test_rapid_insert_remove_cycles() {
2742        let tree = Arc::new(TokenTree::new());
2743        let num_threads = 4;
2744
2745        let mut handles = vec![];
2746        for t in 0..num_threads {
2747            let tree = Arc::clone(&tree);
2748            handles.push(thread::spawn(move || {
2749                let tenant_id: TenantId = Arc::from(format!("tenant{t}"));
2750                for cycle in 0..10 {
2751                    // Insert
2752                    for i in 0..20 {
2753                        let base = (t * 10000000 + cycle * 100000 + i * 1000) as u32;
2754                        let tokens = make_tokens(base, 2);
2755                        tree.insert_tokens(&tokens, &format!("tenant{t}"));
2756                    }
2757                    // Evict
2758                    tree.evict_tenant(&tenant_id, 10);
2759                }
2760            }));
2761        }
2762
2763        for handle in handles {
2764            handle.join().unwrap();
2765        }
2766    }
2767
2768    #[test]
2769    fn test_eviction_empty_tree() {
2770        let tree = TokenTree::new();
2771        let tenant_id: TenantId = Arc::from("nonexistent");
2772
2773        // Should not panic
2774        tree.evict_tenant(&tenant_id, 0);
2775        tree.evict_tenant(&tenant_id, 100);
2776    }
2777
2778    #[test]
2779    fn test_eviction_zero_max_size() {
2780        let tree = TokenTree::new();
2781
2782        let tokens1 = make_tokens(1, 2);
2783        let tokens2 = make_tokens(1000, 2);
2784        tree.insert_tokens(&tokens1, "tenant1");
2785        tree.insert_tokens(&tokens2, "tenant1");
2786
2787        let tenant_id: TenantId = Arc::from("tenant1");
2788        tree.evict_tenant(&tenant_id, 0);
2789
2790        // Eviction with max_size=0 should remove entries
2791        let size = tree.tenant_token_size(&tenant_id);
2792        assert!(size < 4 * PAGE_SIZE);
2793    }
2794
2795    #[test]
2796    fn test_eviction_single_tenant_all_entries() {
2797        let tree = TokenTree::new();
2798
2799        // Insert multiple entries for one tenant
2800        for i in 0..10 {
2801            let base = (i * 1000) as u32;
2802            let tokens = make_tokens(base, 2);
2803            tree.insert_tokens(&tokens, "tenant1");
2804        }
2805
2806        let tenant_id: TenantId = Arc::from("tenant1");
2807        let initial_size = tree.tenant_token_size(&tenant_id);
2808        assert!(initial_size > 0);
2809
2810        tree.evict_tenant(&tenant_id, 0);
2811
2812        let final_size = tree.tenant_token_size(&tenant_id);
2813        assert!(final_size < initial_size);
2814    }
2815
2816    #[test]
2817    fn test_last_tenant_cache_update() {
2818        let tree = TokenTree::new();
2819
2820        let tokens = make_tokens(1, 1);
2821        tree.insert_tokens(&tokens, "tenant1");
2822        tree.insert_tokens(&tokens, "tenant2");
2823
2824        // First match
2825        let result1 = tree.match_prefix_with_counts(&tokens);
2826        let first_tenant = result1.tenant.clone();
2827
2828        // Match again - should get cached tenant
2829        let result2 = tree.match_prefix_with_counts(&tokens);
2830        assert_eq!(result2.tenant, first_tenant);
2831    }
2832
2833    #[test]
2834    fn test_stale_cache_after_tenant_removal() {
2835        let tree = TokenTree::new();
2836
2837        let tokens = make_tokens(1, 2);
2838        tree.insert_tokens(&tokens, "tenant1");
2839        tree.insert_tokens(&tokens, "tenant2");
2840
2841        // Match to populate cache
2842        let _ = tree.match_prefix_with_counts(&tokens);
2843
2844        // Evict one tenant
2845        let tenant1_id: TenantId = Arc::from("tenant1");
2846        tree.evict_tenant(&tenant1_id, 0);
2847
2848        // Match should still work (tenant2 or cache still valid)
2849        let result = tree.match_prefix_with_counts(&tokens);
2850        assert_eq!(result.matched_token_count, 2 * PAGE_SIZE);
2851    }
2852
2853    #[test]
2854    fn test_token_count_consistency_after_operations() {
2855        let tree = TokenTree::new();
2856
2857        // Insert
2858        let tokens1 = make_tokens(1, 3);
2859        let tokens2 = make_tokens(1, 5);
2860        tree.insert_tokens(&tokens1, "tenant1");
2861        tree.insert_tokens(&tokens2, "tenant1");
2862
2863        let tenant1_id: TenantId = Arc::from("tenant1");
2864        let count1 = tree.tenant_token_size(&tenant1_id);
2865        assert!(count1 >= PAGE_SIZE);
2866
2867        // Partial eviction
2868        tree.evict_tenant(&tenant1_id, count1 / 2);
2869        let count2 = tree.tenant_token_size(&tenant1_id);
2870        assert!(count2 <= count1);
2871
2872        // Insert more
2873        let tokens3 = make_tokens(2000, 2);
2874        tree.insert_tokens(&tokens3, "tenant1");
2875        let count3 = tree.tenant_token_size(&tenant1_id);
2876        assert!(count3 >= count2);
2877    }
2878
2879    #[test]
2880    fn test_tree_structure_integrity_after_stress() {
2881        let tree = Arc::new(TokenTree::new());
2882        let num_threads = 8;
2883
2884        let mut handles = vec![];
2885
2886        // Stress insert
2887        for t in 0..num_threads {
2888            let tree = Arc::clone(&tree);
2889            handles.push(thread::spawn(move || {
2890                for i in 0..200 {
2891                    let base = (t * 10000000 + i * 1000) as u32;
2892                    let tokens = make_tokens(base, 2);
2893                    tree.insert_tokens(&tokens, &format!("tenant{t}"));
2894                }
2895            }));
2896        }
2897
2898        for handle in handles {
2899            handle.join().unwrap();
2900        }
2901
2902        // Verify structure by matching
2903        for t in 0..num_threads {
2904            for i in 0..10 {
2905                let base = (t * 10000000 + i * 1000) as u32;
2906                let tokens = make_tokens(base, 2);
2907                let result = tree.match_prefix_with_counts(&tokens);
2908                assert_eq!(result.matched_token_count, 2 * PAGE_SIZE);
2909            }
2910        }
2911    }
2912
2913    #[test]
2914    fn test_very_long_sequences() {
2915        let tree = TokenTree::new();
2916
2917        // Insert a very long sequence (64 pages = 1024 tokens)
2918        let long_seq = make_tokens(1, 64);
2919        tree.insert_tokens(&long_seq, "tenant1");
2920
2921        // Match the full sequence
2922        let result = tree.match_prefix_with_counts(&long_seq);
2923        assert_eq!(result.matched_token_count, 64 * PAGE_SIZE);
2924        assert_eq!(result.tenant.as_ref(), "tenant1");
2925
2926        // Match a prefix (32 pages)
2927        let prefix = make_tokens(1, 32);
2928        let result = tree.match_prefix_with_counts(&prefix);
2929        assert_eq!(result.matched_token_count, 32 * PAGE_SIZE);
2930    }
2931
2932    #[test]
2933    fn test_many_tenants_same_path() {
2934        let tree = TokenTree::new();
2935
2936        let tokens = make_tokens(1, 2);
2937
2938        for i in 0..100 {
2939            tree.insert_tokens(&tokens, &format!("tenant{i}"));
2940        }
2941
2942        let result = tree.match_prefix_with_counts(&tokens);
2943        assert_eq!(result.matched_token_count, 2 * PAGE_SIZE);
2944
2945        // Some tenants should have registered
2946        let counts = tree.get_tenant_token_counts();
2947        assert!(!counts.is_empty()); // At least some tenants tracked
2948    }
2949
2950    #[test]
2951    fn test_token_id_edge_values() {
2952        let tree = TokenTree::new();
2953
2954        // Test with edge case token IDs in page-aligned sequences
2955        // Create page starting with 0
2956        let mut zeros_page: Vec<TokenId> = (0..PAGE_SIZE as u32).collect();
2957        zeros_page[0] = 0;
2958        tree.insert_tokens(&zeros_page, "tenant1");
2959
2960        // Create page starting with u32::MAX
2961        let mut max_page: Vec<TokenId> = (0..PAGE_SIZE as u32).collect();
2962        max_page[0] = u32::MAX;
2963        tree.insert_tokens(&max_page, "tenant2");
2964
2965        // Create page with mixed edge values
2966        let mut mixed_page: Vec<TokenId> = (0..PAGE_SIZE as u32).collect();
2967        mixed_page[0] = 0;
2968        mixed_page[1] = u32::MAX;
2969        tree.insert_tokens(&mixed_page, "tenant3");
2970
2971        let (matched, tenant) = tree.prefix_match_legacy(&zeros_page);
2972        assert_eq!(matched.len(), PAGE_SIZE);
2973        assert!(tenant == "tenant1" || tenant == "tenant3");
2974
2975        let (matched, tenant) = tree.prefix_match_legacy(&max_page);
2976        assert_eq!(matched.len(), PAGE_SIZE);
2977        assert_eq!(tenant, "tenant2");
2978    }
2979
2980    #[test]
2981    fn test_hit_ratio_calculation() {
2982        use crate::MatchResult;
2983
2984        let tree = TokenTree::new();
2985        let one_page = make_tokens(1, 1); // 1 page = PAGE_SIZE tokens
2986        tree.insert_tokens(&one_page, "tenant1");
2987
2988        // 100% hit ratio - query exactly the inserted sequence
2989        let result = tree.match_prefix_with_counts(&one_page);
2990        assert!((result.hit_ratio() - 1.0).abs() < 0.001);
2991
2992        // 50% hit ratio - query 2 pages, only 1 page cached
2993        let two_pages = make_tokens(1, 2);
2994        let result = tree.match_prefix_with_counts(&two_pages);
2995        assert!((result.hit_ratio() - 0.5).abs() < 0.001);
2996
2997        // 0% hit ratio - query non-existent tokens (but page-aligned)
2998        let no_match = make_tokens(1000, 1);
2999        let result = tree.match_prefix_with_counts(&no_match);
3000        assert_eq!(result.hit_ratio(), 0.0);
3001    }
3002
3003    #[test]
3004    fn test_reset_via_trait() {
3005        use crate::RadixTree;
3006
3007        let tree = TokenTree::new();
3008        tree.insert_tokens(&make_tokens(1, 1), "tenant1");
3009        tree.insert_tokens(&make_tokens(100, 1), "tenant2");
3010
3011        assert!(!tree.get_tenant_token_counts().is_empty());
3012
3013        RadixTree::reset(&tree);
3014
3015        assert!(tree.get_tenant_token_counts().is_empty());
3016    }
3017
3018    #[test]
3019    fn test_eviction_policy_lru() {
3020        // LRU: Evict oldest by last access time
3021        let tree = TokenTree::with_policy(EvictionPolicy::Lru);
3022
3023        // Insert 3 paths, access them in order (oldest first)
3024        let tokens1 = make_tokens(1, 1);
3025        let tokens2 = make_tokens(100, 1);
3026        let tokens3 = make_tokens(200, 1);
3027
3028        tree.insert_tokens(&tokens1, "tenant1"); // Oldest
3029        tree.insert_tokens(&tokens2, "tenant1");
3030        tree.insert_tokens(&tokens3, "tenant1"); // Newest
3031
3032        // Access tokens2 to make it newer
3033        let _ = tree.match_prefix_with_counts(&tokens2);
3034
3035        // Evict 1 page - should evict tokens1 (oldest by last_access_time)
3036        tree.evict_tenant(&Arc::from("tenant1"), 2 * PAGE_SIZE);
3037
3038        // tokens1 should be evicted, tokens2 and tokens3 should remain
3039        let result = tree.match_prefix_with_counts(&tokens1);
3040        assert_eq!(
3041            result.matched_token_count, 0,
3042            "tokens1 should be evicted (oldest)"
3043        );
3044
3045        let result = tree.match_prefix_with_counts(&tokens2);
3046        assert_eq!(
3047            result.matched_token_count, PAGE_SIZE,
3048            "tokens2 should remain"
3049        );
3050
3051        let result = tree.match_prefix_with_counts(&tokens3);
3052        assert_eq!(
3053            result.matched_token_count, PAGE_SIZE,
3054            "tokens3 should remain"
3055        );
3056    }
3057
3058    #[test]
3059    fn test_eviction_policy_mru() {
3060        // MRU: Evict newest by last access time (opposite of LRU)
3061        let tree = TokenTree::with_policy(EvictionPolicy::Mru);
3062
3063        let tokens1 = make_tokens(1, 1);
3064        let tokens2 = make_tokens(100, 1);
3065        let tokens3 = make_tokens(200, 1);
3066
3067        tree.insert_tokens(&tokens1, "tenant1"); // Oldest
3068        tree.insert_tokens(&tokens2, "tenant1");
3069        tree.insert_tokens(&tokens3, "tenant1"); // Newest
3070
3071        // Evict 1 page - should evict tokens3 (newest by last_access_time)
3072        tree.evict_tenant(&Arc::from("tenant1"), 2 * PAGE_SIZE);
3073
3074        // tokens3 should be evicted (newest), tokens1 and tokens2 should remain
3075        let result = tree.match_prefix_with_counts(&tokens3);
3076        assert_eq!(
3077            result.matched_token_count, 0,
3078            "tokens3 should be evicted (newest)"
3079        );
3080
3081        let result = tree.match_prefix_with_counts(&tokens1);
3082        assert_eq!(
3083            result.matched_token_count, PAGE_SIZE,
3084            "tokens1 should remain"
3085        );
3086    }
3087
3088    #[test]
3089    fn test_eviction_policy_fifo() {
3090        // FIFO: Evict oldest by creation time (not affected by access)
3091        let tree = TokenTree::with_policy(EvictionPolicy::Fifo);
3092
3093        let tokens1 = make_tokens(1, 1);
3094        let tokens2 = make_tokens(100, 1);
3095        let tokens3 = make_tokens(200, 1);
3096
3097        tree.insert_tokens(&tokens1, "tenant1"); // First created
3098        tree.insert_tokens(&tokens2, "tenant1");
3099        tree.insert_tokens(&tokens3, "tenant1"); // Last created
3100
3101        // Access tokens1 multiple times (shouldn't affect FIFO)
3102        for _ in 0..10 {
3103            let _ = tree.match_prefix_with_counts(&tokens1);
3104        }
3105
3106        // Evict 1 page - should evict tokens1 (oldest by creation_time)
3107        tree.evict_tenant(&Arc::from("tenant1"), 2 * PAGE_SIZE);
3108
3109        // tokens1 should be evicted despite being accessed most recently
3110        let result = tree.match_prefix_with_counts(&tokens1);
3111        assert_eq!(
3112            result.matched_token_count, 0,
3113            "tokens1 should be evicted (oldest creation)"
3114        );
3115    }
3116
3117    #[test]
3118    fn test_eviction_policy_filo() {
3119        // FILO: Evict newest by creation time (stack-like)
3120        let tree = TokenTree::with_policy(EvictionPolicy::Filo);
3121
3122        let tokens1 = make_tokens(1, 1);
3123        let tokens2 = make_tokens(100, 1);
3124        let tokens3 = make_tokens(200, 1);
3125
3126        tree.insert_tokens(&tokens1, "tenant1"); // First created
3127        tree.insert_tokens(&tokens2, "tenant1");
3128        tree.insert_tokens(&tokens3, "tenant1"); // Last created
3129
3130        // Evict 1 page - should evict tokens3 (newest by creation_time)
3131        tree.evict_tenant(&Arc::from("tenant1"), 2 * PAGE_SIZE);
3132
3133        // tokens3 should be evicted (newest creation)
3134        let result = tree.match_prefix_with_counts(&tokens3);
3135        assert_eq!(
3136            result.matched_token_count, 0,
3137            "tokens3 should be evicted (newest creation)"
3138        );
3139
3140        let result = tree.match_prefix_with_counts(&tokens1);
3141        assert_eq!(
3142            result.matched_token_count, PAGE_SIZE,
3143            "tokens1 should remain"
3144        );
3145    }
3146
3147    #[test]
3148    fn test_eviction_policy_lfu() {
3149        // LFU: Evict least frequently used (by hit_count)
3150        let tree = TokenTree::with_policy(EvictionPolicy::Lfu);
3151
3152        let tokens1 = make_tokens(1, 1);
3153        let tokens2 = make_tokens(100, 1);
3154        let tokens3 = make_tokens(200, 1);
3155
3156        tree.insert_tokens(&tokens1, "tenant1");
3157        tree.insert_tokens(&tokens2, "tenant1");
3158        tree.insert_tokens(&tokens3, "tenant1");
3159
3160        // Access tokens1 many times to increase its hit_count
3161        for _ in 0..20 {
3162            let _ = tree.match_prefix_with_counts(&tokens1);
3163        }
3164
3165        // Access tokens3 a few times
3166        for _ in 0..5 {
3167            let _ = tree.match_prefix_with_counts(&tokens3);
3168        }
3169
3170        // tokens2 has the lowest hit_count (only 1 from insert)
3171
3172        // Evict 1 page - should evict tokens2 (lowest hit_count)
3173        tree.evict_tenant(&Arc::from("tenant1"), 2 * PAGE_SIZE);
3174
3175        // tokens2 should be evicted (least frequently used)
3176        let result = tree.match_prefix_with_counts(&tokens2);
3177        assert_eq!(
3178            result.matched_token_count, 0,
3179            "tokens2 should be evicted (lowest hit count)"
3180        );
3181
3182        // tokens1 should remain (highest hit count)
3183        let result = tree.match_prefix_with_counts(&tokens1);
3184        assert_eq!(
3185            result.matched_token_count, PAGE_SIZE,
3186            "tokens1 should remain (high hit count)"
3187        );
3188    }
3189
3190    #[test]
3191    fn test_eviction_policy_enum_default() {
3192        // Default should be LRU
3193        assert_eq!(EvictionPolicy::default(), EvictionPolicy::Lru);
3194    }
3195
3196    #[test]
3197    fn test_tree_with_policy_getter() {
3198        let tree_lru = TokenTree::with_policy(EvictionPolicy::Lru);
3199        assert_eq!(tree_lru.eviction_policy(), EvictionPolicy::Lru);
3200
3201        let tree_lfu = TokenTree::with_policy(EvictionPolicy::Lfu);
3202        assert_eq!(tree_lfu.eviction_policy(), EvictionPolicy::Lfu);
3203
3204        let tree_fifo = TokenTree::with_policy(EvictionPolicy::Fifo);
3205        assert_eq!(tree_fifo.eviction_policy(), EvictionPolicy::Fifo);
3206    }
3207
3208    #[test]
3209    fn test_tree_with_config() {
3210        // Test with_config constructor with valid page_size
3211        let tree = TokenTree::with_config(PAGE_SIZE, EvictionPolicy::Lfu);
3212        assert_eq!(tree.page_size(), PAGE_SIZE);
3213        assert_eq!(tree.eviction_policy(), EvictionPolicy::Lfu);
3214
3215        // Verify defaults: new() should use PAGE_SIZE=16 and LRU
3216        let tree_default = TokenTree::new();
3217        assert_eq!(tree_default.page_size(), 16);
3218        assert_eq!(tree_default.eviction_policy(), EvictionPolicy::Lru);
3219
3220        // with_policy should use default page_size
3221        let tree_policy = TokenTree::with_policy(EvictionPolicy::Fifo);
3222        assert_eq!(tree_policy.page_size(), PAGE_SIZE);
3223        assert_eq!(tree_policy.eviction_policy(), EvictionPolicy::Fifo);
3224    }
3225
3226    #[test]
3227    #[should_panic(expected = "TokenTree currently only supports page_size=16")]
3228    fn test_tree_with_config_invalid_page_size() {
3229        // Test that invalid page_size panics
3230        let _tree = TokenTree::with_config(32, EvictionPolicy::Lru);
3231    }
3232
3233    #[test]
3234    fn test_iter_entries_empty_tree() {
3235        let tree = TokenTree::new();
3236        let entries: Vec<_> = tree.iter_entries().collect();
3237        assert!(entries.is_empty());
3238    }
3239
3240    #[test]
3241    fn test_iter_entries_single_insert() {
3242        let tree = TokenTree::new();
3243        let tokens: Vec<TokenId> = (0..16).collect();
3244        tree.insert_tokens(&tokens, "worker-1");
3245
3246        let entries: Vec<(Vec<TokenId>, Vec<String>)> = tree
3247            .iter_entries()
3248            .map(|(p, ts)| (p, ts.into_iter().map(|(t, _)| t.to_string()).collect()))
3249            .collect();
3250        let leaf = entries
3251            .iter()
3252            .find(|(p, _)| p == &tokens)
3253            .expect("leaf with the inserted tokens");
3254        assert_eq!(leaf.1, vec!["worker-1".to_string()]);
3255    }
3256
3257    #[test]
3258    fn test_iter_entries_pre_order_lex_sorted() {
3259        // Two inserts with a shared prefix produce a split node.
3260        // Children sort by `TokenPageKey` lexicographically, so
3261        // the smaller-page-key child emits first.
3262        let tree = TokenTree::new();
3263        let prefix: Vec<TokenId> = (0..16).collect();
3264        let mut a: Vec<TokenId> = prefix.clone();
3265        a.extend(0..16); // child page = [0..16]
3266        let mut b: Vec<TokenId> = prefix.clone();
3267        b.extend(100..116); // child page = [100..116]
3268        tree.insert_tokens(&a, "wa");
3269        tree.insert_tokens(&b, "wb");
3270
3271        let paths: Vec<Vec<TokenId>> = tree.iter_entries().map(|(p, _)| p).collect();
3272        // Order: shared "0..16" parent appears first via wa being
3273        // present at index 0 of children; the actual leaves come
3274        // through in lex order.
3275        assert!(paths.contains(&a));
3276        assert!(paths.contains(&b));
3277        let pos_a = paths.iter().position(|p| p == &a).unwrap();
3278        let pos_b = paths.iter().position(|p| p == &b).unwrap();
3279        assert!(pos_a < pos_b, "page-key 0..16 < 100..116");
3280    }
3281
3282    /// Build the same sequence of operations two ways — `match_prefix_with_counts`
3283    /// + `insert_tokens` (the legacy pair) vs the fused `match_and_insert` — and
3284    /// assert the returned match results and the resulting per-tenant token
3285    /// counts match step for step. Timestamps are intentionally not compared
3286    /// (the fused path may consume a different number of monotonic ticks).
3287    fn assert_fused_matches_pair(ops: &[(Vec<TokenId>, &str)]) {
3288        let pair = TokenTree::new();
3289        let fused = TokenTree::new();
3290        for (tokens, tenant) in ops {
3291            let r_pair = pair.match_prefix_with_counts(tokens);
3292            pair.insert_tokens(tokens, tenant);
3293
3294            let r_fused = fused.match_and_insert(tokens, tenant);
3295
3296            assert_eq!(
3297                r_pair.matched_token_count, r_fused.matched_token_count,
3298                "matched_token_count mismatch for tenant {tenant}"
3299            );
3300            assert_eq!(
3301                r_pair.input_token_count, r_fused.input_token_count,
3302                "input_token_count mismatch"
3303            );
3304            // Tenant is approximate (get_any_tenant), but for these
3305            // single-tenant-per-path scenarios it is deterministic.
3306            assert_eq!(
3307                r_pair.tenant.as_ref(),
3308                r_fused.tenant.as_ref(),
3309                "matched tenant mismatch"
3310            );
3311        }
3312        assert_eq!(
3313            pair.get_tenant_token_counts(),
3314            fused.get_tenant_token_counts(),
3315            "tenant token counts diverged between pair and fused"
3316        );
3317    }
3318
3319    #[test]
3320    fn test_match_and_insert_equiv_fresh_and_full_match() {
3321        let a = make_tokens(1, 3);
3322        assert_fused_matches_pair(&[
3323            (a.clone(), "w1"), // fresh insert (single node)
3324            (a.clone(), "w1"), // exact re-insert (full match continue)
3325            (a, "w2"),         // same path, different tenant (full match, adds w2)
3326        ]);
3327    }
3328
3329    #[test]
3330    fn test_match_and_insert_equiv_prefix_of_child_split() {
3331        let long = make_tokens(1, 3);
3332        let short = make_tokens(1, 1); // prefix of `long` -> splits the node
3333        assert_fused_matches_pair(&[(long, "w1"), (short, "w2")]);
3334    }
3335
3336    #[test]
3337    fn test_match_and_insert_equiv_diverge_split() {
3338        // Shared first page, diverging second page -> partial/diverge split.
3339        let mut a = make_tokens(1, 1);
3340        a.extend(make_tokens(100, 1));
3341        let mut b = make_tokens(1, 1);
3342        b.extend(make_tokens(200, 1));
3343        assert_fused_matches_pair(&[(a, "w1"), (b, "w2")]);
3344    }
3345
3346    #[test]
3347    fn test_match_and_insert_equiv_disjoint() {
3348        let a = make_tokens(1, 2);
3349        let b = make_tokens(1000, 2);
3350        assert_fused_matches_pair(&[(a, "w1"), (b, "w2")]);
3351    }
3352
3353    #[test]
3354    fn test_match_and_insert_equiv_short_sequence() {
3355        // Below PAGE_SIZE: insert is a no-op, match returns 0.
3356        assert_fused_matches_pair(&[(vec![1, 2, 3], "w1")]);
3357    }
3358
3359    /// The `_with` closure form must behave like `match_and_insert` when the
3360    /// closure always returns the same tenant, and must skip the insert (leaving
3361    /// the tree unchanged) when it returns `None`.
3362    #[test]
3363    fn test_match_and_insert_with_select_and_skip() {
3364        let tokens = make_tokens(1, 3);
3365
3366        // Always-insert closure == match_and_insert(tokens, "w1").
3367        let via_with = TokenTree::new();
3368        let r = via_with.match_and_insert_with(&tokens, |_| Some("w1"));
3369        let via_plain = TokenTree::new();
3370        let r2 = via_plain.match_and_insert(&tokens, "w1");
3371        assert_eq!(r.matched_token_count, r2.matched_token_count);
3372        assert_eq!(
3373            via_with.get_tenant_token_counts(),
3374            via_plain.get_tenant_token_counts()
3375        );
3376
3377        // Seed a tree, then a None-returning closure must NOT mutate it.
3378        let tree = TokenTree::new();
3379        tree.insert_tokens(&tokens, "w1");
3380        let before = tree.get_tenant_token_counts();
3381        let r = tree.match_and_insert_with(&tokens, |_| None);
3382        assert_eq!(r.matched_token_count, tokens.len()); // full match observed
3383        assert_eq!(
3384            tree.get_tenant_token_counts(),
3385            before,
3386            "None selection must not insert"
3387        );
3388    }
3389
3390    /// Concurrency stress test — settles whether the fused
3391    /// `match_and_insert` / `match_and_insert_with` path corrupts
3392    /// `tenant_token_count` or loses routes under concurrent node splits
3393    /// (the review concern).
3394    ///
3395    /// Invariant under test (exact, concurrency-independent): every insert
3396    /// adds `align_to_page(input_len)` to its tenant's count, no matter how
3397    /// nodes are split mid-flight — the per-node `advance`s are frozen at match
3398    /// time, and the suffix splice re-walks from the fall-off node. If a
3399    /// concurrent split inflated the count (the "Major" review claim), the
3400    /// exact-equality assert below would fail. There is no background eviction
3401    /// in a bare `TokenTree`, so the count is pure-cumulative here.
3402    ///
3403    /// Both variants run concurrently (inline + replay) on overlapping, nested,
3404    /// diverging prefixes chosen to force splits.
3405    #[test]
3406    fn test_concurrent_match_and_insert_count_and_route_integrity() {
3407        const N_PREFIXES: usize = 8;
3408        const N_THREADS: usize = 8;
3409        const ITERS: usize = 3000;
3410
3411        // Nested shared head [1,2,3,…] (a shorter prefix matching inside a
3412        // longer node forces a split) + a disjoint divergent tail per tenant.
3413        let prefixes: Vec<Vec<TokenId>> = (0..N_PREFIXES)
3414            .map(|j| {
3415                let mut v = make_tokens(1, j + 1);
3416                v.extend(make_tokens(10_000 + (j as u32) * 100, 1));
3417                v
3418            })
3419            .collect();
3420        let tenants: Vec<String> = (0..N_PREFIXES).map(|j| format!("t{j}")).collect();
3421        let lens: Vec<usize> = prefixes.iter().map(|p| align_to_page(p.len())).collect();
3422
3423        let tree = Arc::new(TokenTree::new());
3424        let prefixes = Arc::new(prefixes);
3425        let tenants = Arc::new(tenants);
3426
3427        let handles: Vec<_> = (0..N_THREADS)
3428            .map(|t| {
3429                let tree = Arc::clone(&tree);
3430                let prefixes = Arc::clone(&prefixes);
3431                let tenants = Arc::clone(&tenants);
3432                thread::spawn(move || {
3433                    let mut local = [0usize; N_PREFIXES];
3434                    for i in 0..ITERS {
3435                        let j = (t + i) % N_PREFIXES;
3436                        if t % 2 == 0 {
3437                            // replay variant (#1: count-drift concern)
3438                            let tn = tenants[j].as_str();
3439                            tree.match_and_insert_with(&prefixes[j], |_| Some(tn));
3440                        } else {
3441                            // inline variant (#2: touch-order concern)
3442                            tree.match_and_insert(&prefixes[j], tenants[j].as_str());
3443                        }
3444                        local[j] += 1;
3445                    }
3446                    local
3447                })
3448            })
3449            .collect();
3450
3451        let mut total = [0usize; N_PREFIXES];
3452        for h in handles {
3453            let local = h.join().expect("worker panicked (deadlock/corruption)");
3454            for j in 0..N_PREFIXES {
3455                total[j] += local[j];
3456            }
3457        }
3458
3459        // DEFINITIVE: exact, concurrency-independent count.
3460        let counts = tree.get_tenant_token_counts();
3461        for j in 0..N_PREFIXES {
3462            let expected = total[j] * lens[j];
3463            let actual = counts.get(&tenants[j]).copied().unwrap_or(0);
3464            assert_eq!(
3465                actual, expected,
3466                "tenant {} token count diverged under concurrency: expected {expected}, got {actual}",
3467                tenants[j]
3468            );
3469        }
3470
3471        // Route integrity: every prefix is still fully cached at the end.
3472        for j in 0..N_PREFIXES {
3473            let r = tree.match_prefix_with_counts(&prefixes[j]);
3474            assert_eq!(
3475                r.matched_token_count, lens[j],
3476                "prefix {j} not fully cached after concurrent stress (route loss)"
3477            );
3478        }
3479    }
3480}