Skip to main content

kv_index/
string_tree.rs

1use std::{
2    cmp::Reverse,
3    collections::{BinaryHeap, HashMap},
4    hash::{BuildHasherDefault, Hasher},
5    sync::{
6        atomic::{AtomicU64, Ordering},
7        Arc, Weak,
8    },
9};
10
11use dashmap::{mapref::entry::Entry, DashMap};
12use once_cell::sync::Lazy;
13use parking_lot::RwLock;
14use tracing::debug;
15
16use super::{
17    common::{MatchResult, TenantId},
18    RadixTree,
19};
20
/// Shared, reference-counted handle to a tree [`Node`].
type NodeRef = Arc<Node>;
22
/// Shard counts for DashMaps to balance concurrency vs allocation overhead.
/// Default DashMap uses num_cpus * 4 shards (e.g., 256 on 64-core machines).
///
/// Root node uses higher shard count since ALL requests pass through it.
/// Other nodes use lower count as traffic diverges through the tree.
///
/// This reduces memory by ~90% vs default while maintaining good concurrency.
///
/// NOTE(review): DashMap documents that an explicit shard amount must be a
/// power of two; keep both values powers of two when tuning them.
///
/// Shard count used for the root node's maps and for `Tree::tenant_char_count`.
const ROOT_SHARD_COUNT: usize = 32;
/// Shard count used for the maps of every non-root node.
const NODE_SHARD_COUNT: usize = 8;
32
33/// Create a children DashMap for non-root nodes
34#[inline]
35fn new_children_map() -> DashMap<char, NodeRef, CharHasherBuilder> {
36    DashMap::with_hasher_and_shard_amount(CharHasherBuilder::default(), NODE_SHARD_COUNT)
37}
38
/// Create a tenant access time DashMap for non-root nodes.
///
/// The map goes from tenant ID to that tenant's last-access epoch on the
/// node and is sharded `NODE_SHARD_COUNT` ways (the root uses
/// `ROOT_SHARD_COUNT` instead, see `Tree::new`).
#[inline]
fn new_tenant_map() -> DashMap<TenantId, u64> {
    DashMap::with_shard_amount(NODE_SHARD_COUNT)
}
44
/// Result of a prefix match operation, including char counts to avoid recomputation.
///
/// All counts are in characters (Unicode scalar values), not bytes.
#[derive(Debug, Clone)]
pub struct PrefixMatchResult {
    /// The tenant that owns the matched prefix (zero-copy)
    pub tenant: TenantId,
    /// Number of characters matched
    pub matched_char_count: usize,
    /// Total number of characters in the input text
    pub input_char_count: usize,
}

/// Exposes the three fields of [`PrefixMatchResult`] through the shared
/// [`MatchResult`] trait; each accessor returns the matching field unchanged.
impl MatchResult for PrefixMatchResult {
    /// Borrows the tenant resolved for the match.
    fn tenant(&self) -> &TenantId {
        &self.tenant
    }

    /// Number of input characters that matched (`matched_char_count`).
    fn matched_count(&self) -> usize {
        self.matched_char_count
    }

    /// Total number of characters in the input (`input_char_count`).
    fn input_count(&self) -> usize {
        self.input_char_count
    }
}
69
/// Cheap hasher for the single-`char` keys of the children `DashMap`.
///
/// A `char` key reaches the hasher as one `u32`, so the hash is simply the
/// code point multiplied by a golden-ratio constant. Arbitrary byte input is
/// still supported through a multiply-and-add fallback.
#[derive(Default)]
struct CharHasher(u64);

impl CharHasher {
    /// 64-bit golden-ratio multiplier used to spread code points.
    const GOLDEN_RATIO: u64 = 0x9E3779B97F4A7C15;
    /// Multiplier for the byte-wise fallback path.
    const FALLBACK_MULTIPLIER: u64 = 0x100000001b3;

    /// Map a 32-bit value to its 64-bit hash.
    #[inline(always)]
    fn scatter(code: u32) -> u64 {
        u64::from(code).wrapping_mul(Self::GOLDEN_RATIO)
    }
}

impl Hasher for CharHasher {
    #[inline(always)]
    fn finish(&self) -> u64 {
        self.0
    }

    #[inline(always)]
    fn write(&mut self, bytes: &[u8]) {
        self.0 = match *bytes {
            // Exactly four bytes: treat as a native-endian u32 (a char) and
            // overwrite the state, with no loop.
            [b0, b1, b2, b3] => Self::scatter(u32::from_ne_bytes([b0, b1, b2, b3])),
            // Any other length (not expected for char keys): fold each byte
            // into the running state.
            _ => bytes.iter().fold(self.0, |state, &byte| {
                state
                    .wrapping_mul(Self::FALLBACK_MULTIPLIER)
                    .wrapping_add(u64::from(byte))
            }),
        };
    }

    #[inline(always)]
    fn write_u32(&mut self, i: u32) {
        // This is the path a `char` key takes.
        self.0 = Self::scatter(i);
    }
}

/// `BuildHasher` that produces a fresh [`CharHasher`] per key.
type CharHasherBuilder = BuildHasherDefault<CharHasher>;
104
/// Skip the first `n` characters of `s` and return what is left.
///
/// Yields `""` when `s` has `n` or fewer characters. An all-ASCII prefix is
/// skipped with a plain byte slice; otherwise the string is walked character
/// by character to find the cut point.
#[inline]
fn advance_by_chars(s: &str, n: usize) -> &str {
    match n {
        0 => s,
        // At least as many chars requested as there are bytes: nothing remains.
        _ if n >= s.len() => "",
        // ASCII prefix: byte offset n is char offset n and a valid boundary.
        _ if s.as_bytes()[..n].is_ascii() => &s[n..],
        // Multi-byte characters in the prefix: locate the n-th char boundary.
        _ => s
            .char_indices()
            .nth(n)
            .map_or("", |(offset, _)| &s[offset..]),
    }
}
128
/// Copy the first `n` characters of `s` into a new `String`.
///
/// When `s` has fewer than `n` characters the whole string is returned.
#[inline]
fn take_chars(s: &str, n: usize) -> String {
    let end = match n {
        0 => 0,
        // Byte offset where character `n` starts, or the full length if the
        // string is shorter than `n` characters.
        _ => s.char_indices().nth(n).map_or(s.len(), |(offset, _)| offset),
    };
    s[..end].to_owned()
}
141
/// A node's edge text together with its length in characters.
///
/// The character count is computed once at construction, so later code can
/// read it without re-scanning the string with `chars().count()`.
#[derive(Debug, Clone)]
struct NodeText {
    /// The actual text stored in this node
    text: String,
    /// Cached character count (UTF-8 chars, not bytes)
    char_count: usize,
}

impl NodeText {
    /// Wrap `text`, counting its characters once.
    #[inline]
    fn new(text: String) -> Self {
        Self {
            char_count: text.chars().count(),
            text,
        }
    }

    /// The empty text (zero characters).
    #[inline]
    fn empty() -> Self {
        Self {
            text: String::new(),
            char_count: 0,
        }
    }

    /// Cached number of characters in the text.
    #[inline]
    fn char_count(&self) -> usize {
        self.char_count
    }

    /// Borrow the text as a string slice.
    #[inline]
    fn as_str(&self) -> &str {
        &self.text
    }

    /// First character of the text, or `None` when it is empty.
    #[inline]
    fn first_char(&self) -> Option<char> {
        self.text.chars().next()
    }

    /// Split the text after `char_idx` characters into `(prefix, suffix)`.
    ///
    /// Both halves are freshly allocated and carry correct cached counts.
    /// `char_idx == 0` yields an empty prefix; `char_idx >= char_count`
    /// yields an empty suffix.
    #[inline]
    fn split_at_char(&self, char_idx: usize) -> (NodeText, NodeText) {
        match char_idx {
            0 => (NodeText::empty(), self.clone_text()),
            idx if idx >= self.char_count => (self.clone_text(), NodeText::empty()),
            idx => {
                // Translate the character index into a byte offset.
                let cut = self
                    .text
                    .char_indices()
                    .nth(idx)
                    .map_or(self.text.len(), |(offset, _)| offset);
                let (head, tail) = self.text.split_at(cut);
                (
                    NodeText {
                        text: head.to_owned(),
                        char_count: idx,
                    },
                    NodeText {
                        text: tail.to_owned(),
                        char_count: self.char_count - idx,
                    },
                )
            }
        }
    }

    /// Deep copy of the text and its cached count (same as `clone`).
    #[inline]
    fn clone_text(&self) -> NodeText {
        self.clone()
    }
}
225
/// Global tenant string intern pool to avoid repeated allocations.
/// Uses DashMap for concurrent access with minimal contention.
///
/// The map is used as a set: the `Arc<str>` key is the interned value and the
/// `()` value carries no data. It is created lazily on first use.
///
/// NOTE(review): nothing in the visible part of this file removes entries, so
/// the pool appears to grow with the number of distinct tenants — confirm
/// that this is bounded in practice.
static TENANT_INTERN_POOL: Lazy<DashMap<Arc<str>, ()>> = Lazy::new(DashMap::new);
229
/// Global epoch counter for LRU ordering.
/// Uses a simple incrementing counter instead of wall clock time.
///
/// Benefits:
/// - No syscall overhead (vs SystemTime::now())
/// - Smaller memory footprint (u64 vs u128)
/// - Perfectly monotonic (no clock skew issues)
///
/// For LRU eviction, relative ordering is all that matters.
///
/// The counter is process-wide (shared by every `Tree`) and starts at 0.
///
/// NOTE(review): the insert paths also store a literal `0` as the epoch for
/// root/intermediate tenant attachments, and the very first epoch handed out
/// is `0` as well — confirm that nothing relies on `0` meaning "never
/// accessed".
static EPOCH_COUNTER: AtomicU64 = AtomicU64::new(0);
240
/// Get the next epoch value for LRU timestamp ordering.
/// Uses fetch_add for lock-free, monotonically increasing values.
/// Relaxed ordering is sufficient since we only need eventual consistency
/// for approximate LRU behavior.
///
/// Returns the counter value from *before* the increment, so the first call
/// yields `0` and every call consumes one epoch.
#[inline]
fn get_epoch() -> u64 {
    EPOCH_COUNTER.fetch_add(1, Ordering::Relaxed)
}
249
/// One node of the radix tree: an edge label, the children reachable from it,
/// and per-tenant access bookkeeping.
///
/// Every field is individually synchronized (`DashMap` or `RwLock`), so a
/// node can be shared between threads through a [`NodeRef`].
#[derive(Debug)]
struct Node {
    /// Children nodes indexed by first character.
    /// Using custom hasher optimized for char keys.
    children: DashMap<char, NodeRef, CharHasherBuilder>,
    /// Node text with cached character count
    text: RwLock<NodeText>,
    /// Per-tenant last access epoch for LRU ordering. Using TenantId (Arc<str>) for cheap cloning.
    tenant_last_access_time: DashMap<TenantId, u64>,
    /// Parent pointer for upward traversal during timestamp updates.
    /// Uses Weak to avoid Arc reference cycles (parent -> child -> parent).
    /// `None` for the root.
    parent: RwLock<Option<Weak<Node>>>,
    /// Cached last-accessed tenant for O(1) lookup during prefix match.
    /// Avoids O(shards) DashMap iteration in the common case.
    /// Only trusted while the tenant is still present in
    /// `tenant_last_access_time`; otherwise readers fall back to iteration.
    last_tenant: RwLock<Option<TenantId>>,
}
266
/// Thread-safe multi-tenant radix tree over strings.
///
/// Texts from several tenants share one tree; each node records which
/// tenants own it and when they last touched it.
#[derive(Debug)]
pub struct Tree {
    /// Root node; it has empty text and no parent.
    root: NodeRef,
    /// Per-tenant character count for size tracking. Using TenantId for consistency.
    pub tenant_char_count: DashMap<TenantId, usize>,
}
273
274// For the heap
275
276struct EvictionEntry {
277    timestamp: u64,
278    tenant: TenantId,
279    node: NodeRef,
280}
281
282impl Eq for EvictionEntry {}
283
284#[expect(clippy::non_canonical_partial_ord_impl)]
285impl PartialOrd for EvictionEntry {
286    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
287        Some(self.timestamp.cmp(&other.timestamp))
288    }
289}
290
291impl Ord for EvictionEntry {
292    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
293        self.timestamp.cmp(&other.timestamp)
294    }
295}
296
297impl PartialEq for EvictionEntry {
298    fn eq(&self, other: &Self) -> bool {
299        self.timestamp == other.timestamp
300    }
301}
302
// For char operations
// Note: in Rust, `.len()` and slice indexing operate on bytes, not characters. This matters for
// UTF-8 text, where a single character may occupy several bytes.
// https://en.wikipedia.org/wiki/UTF-8
306
/// Number of leading characters that `a` and `b` have in common.
///
/// The common *byte* prefix is measured first. If that prefix is pure ASCII,
/// its byte length is also the character count; otherwise the comparison is
/// redone character by character so multi-byte sequences are counted once.
#[inline]
fn shared_prefix_count(a: &str, b: &str) -> usize {
    let (lhs, rhs) = (a.as_bytes(), b.as_bytes());

    // Length of the run of identical leading bytes (stops at the shorter input).
    let byte_run = lhs.iter().zip(rhs).take_while(|(x, y)| x == y).count();

    if lhs[..byte_run].is_ascii() {
        byte_run
    } else {
        shared_prefix_count_chars(a, b)
    }
}

/// Character-wise common-prefix length, used when the shared prefix contains
/// non-ASCII bytes.
#[inline]
fn shared_prefix_count_chars(a: &str, b: &str) -> usize {
    let mut pairs = a.chars().zip(b.chars());
    let mut shared = 0;
    while let Some((left, right)) = pairs.next() {
        if left != right {
            break;
        }
        shared += 1;
    }
    shared
}
339
340/// Intern tenant ID to avoid repeated allocations.
341/// Returns cached Arc<str> if tenant was seen before.
342#[inline]
343fn intern_tenant(tenant: &str) -> TenantId {
344    // Fast path: check if already interned
345    if let Some(entry) = TENANT_INTERN_POOL.get(tenant) {
346        return Arc::clone(entry.key());
347    }
348
349    // Slow path: intern new tenant
350    let interned: Arc<str> = Arc::from(tenant);
351    TENANT_INTERN_POOL.insert(Arc::clone(&interned), ());
352    interned
353}
354
/// `Tree::default()` is the same as [`Tree::new`].
impl Default for Tree {
    fn default() -> Self {
        Self::new()
    }
}
360
361impl Tree {
362    /*
363    Thread-safe multi tenant radix tree
364
365    1. Storing data for multiple tenants (the overlap of multiple radix tree)
366    2. Node-level lock to enable concurrent access on nodes
367    3. Leaf LRU eviction based on tenant access time
368
369    Optimizations:
370    - Cached character counts in NodeText to avoid O(n) chars().count() calls
371    - Interned tenant IDs (Arc<str>) for cheap cloning and comparison
    - Epoch-counter timestamps (no clock syscalls), with match-time updates sampled (about 1 in 8)
373    - Custom hasher for char keys in children DashMap
374    */
375
376    pub fn new() -> Self {
377        Tree {
378            // Root uses higher shard count since ALL requests pass through it
379            root: Arc::new(Node {
380                children: DashMap::with_hasher_and_shard_amount(
381                    CharHasherBuilder::default(),
382                    ROOT_SHARD_COUNT,
383                ),
384                text: RwLock::new(NodeText::empty()),
385                tenant_last_access_time: DashMap::with_shard_amount(ROOT_SHARD_COUNT),
386                parent: RwLock::new(None),
387                last_tenant: RwLock::new(None),
388            }),
389            tenant_char_count: DashMap::with_shard_amount(ROOT_SHARD_COUNT),
390        }
391    }
392
393    pub fn insert_text(&self, text: &str, tenant: &str) {
394        // Insert text into tree with given tenant
395        // Use slice-based traversal to avoid Vec<char> allocation
396
397        // Intern the tenant ID once for reuse
398        let tenant_id = intern_tenant(tenant);
399
400        // Ensure tenant exists at root (don't update timestamp - root is never evicted)
401        self.root
402            .tenant_last_access_time
403            .entry(Arc::clone(&tenant_id))
404            .or_insert(0);
405
406        self.tenant_char_count
407            .entry(Arc::clone(&tenant_id))
408            .or_insert(0);
409
410        // Descend from the root inserting the whole text.
411        self.insert_from(Arc::clone(&self.root), text, tenant_id);
412    }
413
    /// Insert `remaining` for `tenant_id` starting the descent at `start`
    /// (which is treated as the parent of the first edge), reusing the exact
    /// node-split / tenant-attach / leaf-timestamp logic of [`Self::insert_text`].
    ///
    /// `insert_text` is `insert_from(root, text, tenant_id)` after the root
    /// bookkeeping. [`Self::match_and_insert_with`] reuses it to splice only the
    /// *unmatched suffix* at the fall-off node, so the already-matched prefix is
    /// never re-walked. The caller is responsible for the root bookkeeping
    /// (`tenant_last_access_time` / `tenant_char_count` entries) and, when
    /// resuming below the root, for attaching `tenant_id` to the ancestor nodes
    /// on the matched path (which this method does not touch).
    ///
    /// Per iteration, keyed on the first character of `remaining`:
    /// - no child: a new leaf holding all of `remaining` is created, stamped
    ///   with a fresh epoch, and the method returns;
    /// - child whose text is only partially shared: the child is split into a
    ///   new prefix node (which takes the child's place) and the shortened
    ///   child, then the descent continues from the prefix node;
    /// - child whose text is fully shared: the tenant is attached if missing
    ///   and the descent continues from the child.
    ///
    /// `tenant_char_count` grows only by the text length of nodes the tenant
    /// was not yet attached to. When `remaining` runs out, the node reached
    /// last gets a fresh epoch for `tenant_id`.
    fn insert_from(&self, start: NodeRef, mut remaining: &str, tenant_id: TenantId) {
        let mut prev = start;

        // Result type to carry state out of the match block
        // This allows the entry guard to be dropped before we update prev
        enum InsertStep {
            // A new leaf was created; nothing left to insert.
            Done,
            // Keep descending from `next_prev` after consuming `advance_chars` characters.
            Continue {
                next_prev: NodeRef,
                advance_chars: usize,
            },
        }

        while let Some(first_char) = remaining.chars().next() {
            // Use entry API for atomic check-and-insert semantics (required for thread safety)
            // The entry guard stays alive for the whole match arm below.
            let step = match prev.children.entry(first_char) {
                Entry::Vacant(entry) => {
                    // No match - create new node with remaining text (this is the leaf)
                    // Compute remaining char count lazily - only here when creating leaf
                    let remaining_char_count = remaining.chars().count();
                    let epoch = get_epoch();

                    let new_node = Arc::new(Node {
                        children: new_children_map(),
                        text: RwLock::new(NodeText::new(remaining.to_string())),
                        tenant_last_access_time: new_tenant_map(),
                        parent: RwLock::new(Some(Arc::downgrade(&prev))),
                        last_tenant: RwLock::new(Some(Arc::clone(&tenant_id))),
                    });

                    // Attach tenant to the new leaf node with timestamp
                    self.tenant_char_count
                        .entry(Arc::clone(&tenant_id))
                        .and_modify(|count| *count += remaining_char_count)
                        .or_insert(remaining_char_count);
                    new_node
                        .tenant_last_access_time
                        .insert(Arc::clone(&tenant_id), epoch);

                    // Publish the fully initialised leaf under its first character.
                    entry.insert(new_node);
                    InsertStep::Done
                }

                Entry::Occupied(mut entry) => {
                    let matched_node = entry.get().clone();

                    let matched_node_text = matched_node.text.read();
                    let matched_node_text_count = matched_node_text.char_count();
                    let matched_node_text_str = matched_node_text.as_str();

                    // Use slice-based comparison - no allocation
                    let shared_count = shared_prefix_count(remaining, matched_node_text_str);

                    if shared_count < matched_node_text_count {
                        // Split the matched node
                        // `matched_text` is the shared prefix, `contracted_text` the
                        // suffix that stays on the existing node.
                        let (matched_text, contracted_text) =
                            matched_node_text.split_at_char(shared_count);
                        let matched_text_count = shared_count;

                        // Drop read lock before creating new node
                        // (the write lock on the same text is taken further down).
                        drop(matched_node_text);

                        // The prefix node starts with a copy of the existing node's
                        // tenant map and cached tenant, and an empty child table.
                        let new_node = Arc::new(Node {
                            text: RwLock::new(matched_text),
                            children: new_children_map(),
                            parent: RwLock::new(Some(Arc::downgrade(&prev))),
                            tenant_last_access_time: matched_node.tenant_last_access_time.clone(),
                            last_tenant: RwLock::new(matched_node.last_tenant.read().clone()),
                        });

                        let Some(first_new_char) = contracted_text.first_char() else {
                            // split_at_char with shared_count < char_count guarantees non-empty suffix
                            // Bailing out here leaves the tree untouched: `new_node`
                            // has not been linked in yet.
                            return;
                        };
                        // The existing node becomes the only child of the prefix node.
                        new_node
                            .children
                            .insert(first_new_char, Arc::clone(&matched_node));

                        // Swap the prefix node into the parent's slot for this character.
                        entry.insert(Arc::clone(&new_node));

                        // Shrink the existing node to the suffix and re-parent it.
                        *matched_node.text.write() = contracted_text;
                        *matched_node.parent.write() = Some(Arc::downgrade(&new_node));

                        // Attach tenant to the new split node (intermediate - no timestamp update)
                        // The cloned DashMap may already contain the tenant; only count the
                        // prefix characters (and attach with epoch 0) when it does not.
                        if !new_node
                            .tenant_last_access_time
                            .contains_key(tenant_id.as_ref())
                        {
                            self.tenant_char_count
                                .entry(Arc::clone(&tenant_id))
                                .and_modify(|count| *count += matched_text_count)
                                .or_insert(matched_text_count);
                            new_node
                                .tenant_last_access_time
                                .insert(Arc::clone(&tenant_id), 0);
                        }

                        InsertStep::Continue {
                            next_prev: new_node,
                            advance_chars: shared_count,
                        }
                    } else {
                        // Full match - move to next node (intermediate - no timestamp update)
                        drop(matched_node_text);

                        // Ensure tenant exists at this intermediate node
                        // (epoch 0; the node's characters are counted once per tenant).
                        if !matched_node
                            .tenant_last_access_time
                            .contains_key(tenant_id.as_ref())
                        {
                            self.tenant_char_count
                                .entry(Arc::clone(&tenant_id))
                                .and_modify(|count| *count += matched_node_text_count)
                                .or_insert(matched_node_text_count);
                            matched_node
                                .tenant_last_access_time
                                .insert(Arc::clone(&tenant_id), 0);
                        }

                        InsertStep::Continue {
                            next_prev: matched_node,
                            advance_chars: shared_count,
                        }
                    }
                }
            };

            // Entry guard is now dropped - safe to update prev
            match step {
                InsertStep::Done => return, // New leaf created with timestamp, we're done
                InsertStep::Continue {
                    next_prev,
                    advance_chars,
                } => {
                    prev = next_prev;
                    remaining = advance_by_chars(remaining, advance_chars);
                }
            }
        }

        // Loop exited normally (remaining empty) - prev is the leaf node
        // Update its timestamp for LRU ordering
        // (if `remaining` was empty on entry, `prev` is still `start`).
        let epoch = get_epoch();
        prev.tenant_last_access_time
            .insert(Arc::clone(&tenant_id), epoch);
    }
572
573    /// Performs prefix matching and returns detailed result with char counts.
574    /// Optimized: no string allocations, deferred char counting.
575    pub fn match_prefix_with_counts(&self, text: &str) -> PrefixMatchResult {
576        let mut remaining = text;
577        let mut matched_chars = 0;
578        let mut prev = Arc::clone(&self.root);
579
580        while let Some(first_char) = remaining.chars().next() {
581            let child_node = prev.children.get(&first_char).map(|e| e.value().clone());
582
583            if let Some(matched_node) = child_node {
584                let matched_text_guard = matched_node.text.read();
585                let matched_node_text_count = matched_text_guard.char_count();
586
587                // Use slice-based comparison - no allocation
588                let shared_count = shared_prefix_count(remaining, matched_text_guard.as_str());
589                drop(matched_text_guard);
590
591                if shared_count == matched_node_text_count {
592                    // Full match with current node's text, continue to next node
593                    matched_chars += shared_count;
594                    remaining = advance_by_chars(remaining, shared_count);
595                    prev = matched_node;
596                } else {
597                    // Partial match - still use this node for tenant selection
598                    matched_chars += shared_count;
599                    prev = matched_node;
600                    break;
601                }
602            } else {
603                // No match found, stop here
604                break;
605            }
606        }
607
608        let curr = prev;
609
610        // Try cached tenant first (O(1)) before falling back to O(shards) DashMap iteration.
611        // The cache is valid if the tenant still exists in tenant_last_access_time.
612        let tenant: TenantId = {
613            let cached = curr.last_tenant.read();
614            if let Some(ref t) = *cached {
615                if curr.tenant_last_access_time.contains_key(t.as_ref()) {
616                    Arc::clone(t)
617                } else {
618                    drop(cached);
619                    // Cache stale, fall back to iteration and update cache
620                    let t = curr
621                        .tenant_last_access_time
622                        .iter()
623                        .next()
624                        .map(|kv| Arc::clone(kv.key()))
625                        .unwrap_or_else(|| intern_tenant("empty"));
626                    *curr.last_tenant.write() = Some(Arc::clone(&t));
627                    t
628                }
629            } else {
630                drop(cached);
631                // No cache, iterate and populate cache
632                let t = curr
633                    .tenant_last_access_time
634                    .iter()
635                    .next()
636                    .map(|kv| Arc::clone(kv.key()))
637                    .unwrap_or_else(|| intern_tenant("empty"));
638                *curr.last_tenant.write() = Some(Arc::clone(&t));
639                t
640            }
641        };
642
643        // Update timestamp probabilistically (1 in 8 matches) to reduce DashMap contention.
644        // LRU eviction doesn't need perfect accuracy - approximate timestamps suffice.
645        // Skip the update for the synthetic "empty" tenant to avoid polluting the tree
646        // with a tenant that was never inserted via insert_text.
647        let epoch = get_epoch();
648        if epoch & 0x7 == 0 && tenant.as_ref() != "empty" {
649            curr.tenant_last_access_time
650                .insert(Arc::clone(&tenant), epoch);
651        }
652
653        // Compute input char count directly from input text.
654        // This is equivalent to matched_chars + remaining.chars().count() but avoids
655        // needing to track remaining precisely through the traversal.
656        let input_char_count = text.chars().count();
657
658        PrefixMatchResult {
659            tenant,
660            matched_char_count: matched_chars,
661            input_char_count,
662        }
663    }
664
665    /// Read-only resolution of a node's "owning" tenant, mirroring the tenant
666    /// pick in [`Self::match_prefix_with_counts`] **without** the cache-populate
667    /// or probabilistic timestamp side effects. Returns the tenant plus whether
668    /// the slow (iteration) path was taken — the caller replays the original's
669    /// single deferred side effect on the final node.
670    ///
671    /// Used by [`Self::match_and_insert`] so the match tenant is resolved from a
672    /// node's state **before** the fused insert adds the inserting tenant to it,
673    /// reproducing the original "match runs fully before insert" ordering for the
674    /// (otherwise non-deterministic) slow-path tenant pick.
675    #[expect(
676        clippy::unused_self,
677        reason = "method logically belongs to the tree; mirrors match_prefix_with_counts resolution"
678    )]
679    fn resolve_tenant_readonly(&self, node: &NodeRef) -> (TenantId, bool) {
680        let cached = node.last_tenant.read();
681        if let Some(ref t) = *cached {
682            if node.tenant_last_access_time.contains_key(t.as_ref()) {
683                return (Arc::clone(t), false);
684            }
685        }
686        drop(cached);
687        let t = node
688            .tenant_last_access_time
689            .iter()
690            .next()
691            .map(|kv| Arc::clone(kv.key()))
692            .unwrap_or_else(|| intern_tenant("empty"));
693        (t, true)
694    }
695
    /// Combined match + insert for a known `tenant` in a single descent.
    ///
    /// Thin wrapper over [`Self::match_and_insert_with`] for callers that already
    /// know the tenant to insert for before matching (e.g. the imbalanced
    /// min-load path, which routes by worker load, not cache affinity). Returns
    /// the match result for any cache bookkeeping the caller needs.
    ///
    /// The selector passed down ignores the match result and always yields
    /// `Some(tenant)`, so the insert is never skipped.
    pub fn match_and_insert(&self, text: &str, tenant: &str) -> PrefixMatchResult {
        self.match_and_insert_with(text, move |_| Some(tenant))
    }
705
706    /// Match in a single descent, then choose the insert tenant from the match
707    /// result and insert for it — still a SINGLE walk of the matched prefix.
708    ///
709    /// String analogue of `TokenTree::match_and_insert_with`. The cache-aware
710    /// router picks the worker it inserts for *from* the match outcome, so the
711    /// tenant is unknown until the match finishes. `select` runs once, after the
712    /// match, with the [`PrefixMatchResult`]; `Some(tenant)` inserts `text` for
713    /// that tenant, `None` skips the insert (the router's "no worker selected"
714    /// branch).
715    ///
716    /// # How the single descent is achieved
717    ///
718    /// The match phase walks the prefix exactly like
719    /// [`Self::match_prefix_with_counts`] (including its single deferred,
720    /// probabilistic timestamp touch on the resolved node — done via
721    /// [`Self::finish_match_and_insert`]), while recording the chain of
722    /// full-match nodes it descended through. After `select` yields the tenant we
723    /// re-attach it to those recorded ancestor nodes directly (the epoch-0
724    /// intermediate attach `insert_text` performs while descending) and splice
725    /// only the *unmatched suffix* at the fall-off node via
726    /// [`Self::insert_from`]. The matched prefix is therefore compared once, not
727    /// twice.
728    ///
729    /// # Preserved semantics
730    ///
731    /// Identical to `match_prefix_with_counts` followed by
732    /// `insert_text(text, tenant)`: same matched/-input char counts, same
733    /// resolved tenant and probabilistic touch, same node splits, same epoch-0
734    /// ancestor attaches, same final-leaf real timestamp, and same
735    /// `tenant_char_count` accounting. Because the match phase runs fully before
736    /// any insert mutation (just like the two separate calls), the tenant pick
737    /// observes the un-polluted tree; only the exact monotonic timestamp values
738    /// of insert's writes shift by a few ticks, which is immaterial to LRU.
    pub fn match_and_insert_with<'t, F>(&self, text: &str, select: F) -> PrefixMatchResult
    where
        F: FnOnce(&PrefixMatchResult) -> Option<&'t str>,
    {
        // ---- Phase 1: MATCH descent (mirrors match_prefix_with_counts) ----
        // Record the full-match nodes (for ancestor re-attach) and the fall-off
        // point (node + remaining slice) for the suffix splice. No mutation here
        // beyond match's own deferred touch below, so the tenant pick sees the
        // un-polluted tree exactly like the standalone match.
        //
        // `remaining` is the not-yet-matched tail of `text`; `current` is the
        // node whose children are probed next (the insert attachment point).
        let mut remaining = text;
        let mut matched_chars = 0usize;
        let mut current = Arc::clone(&self.root);
        // The node match resolves its tenant on (its final `curr`): the deepest
        // full-match node, the partial child, or the root if nothing matched.
        let mut match_curr = Arc::clone(&self.root);
        // (node, char_count) for each full-match edge, in root-to-leaf order.
        // Starts with room for 16 entries so short paths never reallocate.
        let mut path: Vec<(NodeRef, usize)> = Vec::with_capacity(16);

        while let Some(first_char) = remaining.chars().next() {
            // Children are keyed by the first char of their edge text. Clone the
            // Arc out so the map guard is released right away.
            let child_node = current.children.get(&first_char).map(|e| e.value().clone());

            let Some(matched_node) = child_node else {
                // No child for this char: match stops at `current`.
                break;
            };

            // Compare under the edge's read lock, then release the lock before
            // anything else is touched.
            let matched_text_guard = matched_node.text.read();
            let matched_node_text_count = matched_text_guard.char_count();
            let shared_count = shared_prefix_count(remaining, matched_text_guard.as_str());
            drop(matched_text_guard);

            if shared_count == matched_node_text_count {
                // Full match -> continue. Record for ancestor re-attach.
                matched_chars += shared_count;
                path.push((Arc::clone(&matched_node), matched_node_text_count));
                remaining = advance_by_chars(remaining, shared_count);
                current = Arc::clone(&matched_node);
                match_curr = matched_node;
            } else {
                // Partial match: match stops, resolving on this child node.
                matched_chars += shared_count;
                match_curr = matched_node;
                // `current` stays the parent — the splice re-probes it and
                // splits the partial child exactly like `insert_text`.
                // `remaining` is deliberately NOT advanced here for the same
                // reason: the splice must see the whole edge-sharing suffix.
                break;
            }
        }

        // ---- Match side effect + result (verbatim match_prefix_with_counts) ----
        let (match_tenant, match_slow) = self.resolve_tenant_readonly(&match_curr);
        let result = self.finish_match_and_insert(
            &match_curr,
            &match_tenant,
            match_slow,
            matched_chars,
            text,
        );

        // ---- Decide the insert tenant from the match result ----
        // `None` means the caller declined to insert: return the match result
        // with no Phase 2 mutation.
        let Some(tenant) = select(&result) else {
            return result;
        };
        // Intern through the shared pool so the stored Arc dedups (matches
        // insert_text's interning).
        let tenant_id = intern_tenant(tenant);

        // ---- Phase 2: INSERT for `tenant_id` without re-walking the prefix ----
        // Root bookkeeping (mirrors insert_text): make sure the tenant has an
        // entry at the root and in the global counter; existing values are kept.
        self.root
            .tenant_last_access_time
            .entry(Arc::clone(&tenant_id))
            .or_insert(0);
        self.tenant_char_count
            .entry(Arc::clone(&tenant_id))
            .or_insert(0);

        // Re-attach the inserting tenant to every full-match ancestor node, the
        // epoch-0 intermediate attach `insert_text` performs while descending.
        // The edge's char count is charged to the tenant only when the tenant
        // was not already present on that node.
        for (node, char_count) in &path {
            if !node
                .tenant_last_access_time
                .contains_key(tenant_id.as_ref())
            {
                self.tenant_char_count
                    .entry(Arc::clone(&tenant_id))
                    .and_modify(|count| *count += *char_count)
                    .or_insert(*char_count);
                node.tenant_last_access_time
                    .insert(Arc::clone(&tenant_id), 0);
            }
        }

        if remaining.is_empty() {
            // Loop-end: `current` is the final leaf; give it the real timestamp
            // (insert_text's tail). It already received the epoch-0 attach above
            // if it is on the path.
            let epoch = get_epoch();
            current
                .tenant_last_access_time
                .insert(Arc::clone(&tenant_id), epoch);
        } else {
            // Fall-off: splice only the unmatched suffix below `current`,
            // reusing insert_text's exact loop (handles split-continue + the
            // final-leaf real timestamp). The matched prefix is not re-walked.
            self.insert_from(current, remaining, tenant_id);
        }

        // The returned result describes the match as observed before Phase 2.
        result
    }
849
850    /// Replay `match_prefix_with_counts`'s single deferred side effect (populate
851    /// the `last_tenant` cache on the slow path, then a probabilistic 1/8
852    /// timestamp touch) on the resolved final node, and build the match result.
853    /// Factored out so [`Self::match_and_insert_with`] applies it identically to
854    /// the standalone `match_prefix_with_counts` tail.
855    #[expect(
856        clippy::unused_self,
857        reason = "method logically belongs to the tree; mirrors match_prefix_with_counts tail"
858    )]
859    fn finish_match_and_insert(
860        &self,
861        match_node: &NodeRef,
862        tenant: &TenantId,
863        took_slow_path: bool,
864        matched_chars: usize,
865        text: &str,
866    ) -> PrefixMatchResult {
867        // On the slow path the original resolution populates the cache.
868        if took_slow_path {
869            *match_node.last_tenant.write() = Some(Arc::clone(tenant));
870        }
871
872        // Probabilistic (1 in 8) timestamp touch on the resolved tenant, skipping
873        // the synthetic "empty" tenant — identical to match_prefix_with_counts.
874        let epoch = get_epoch();
875        if epoch & 0x7 == 0 && tenant.as_ref() != "empty" {
876            match_node
877                .tenant_last_access_time
878                .insert(Arc::clone(tenant), epoch);
879        }
880
881        let input_char_count = text.chars().count();
882
883        PrefixMatchResult {
884            tenant: Arc::clone(tenant),
885            matched_char_count: matched_chars,
886            input_char_count,
887        }
888    }
889
890    /// Legacy prefix_match API for backward compatibility.
891    /// Note: This computes matched_text which has allocation overhead.
892    pub fn prefix_match_legacy(&self, text: &str) -> (String, String) {
893        let result = self.match_prefix_with_counts(text);
894        let matched_text = take_chars(text, result.matched_char_count);
895        (matched_text, result.tenant.to_string())
896    }
897
898    pub fn prefix_match_tenant(&self, text: &str, tenant: &str) -> String {
899        // Use slice-based traversal - no Vec<char> allocation
900
901        // Intern tenant ID once for efficient lookups
902        let tenant_id = intern_tenant(tenant);
903
904        let mut remaining = text;
905        let mut matched_chars = 0;
906        let mut prev = Arc::clone(&self.root);
907
908        while let Some(first_char) = remaining.chars().next() {
909            let child_node = prev.children.get(&first_char).map(|e| e.value().clone());
910
911            if let Some(matched_node) = child_node {
912                // Only continue matching if this node belongs to the specified tenant
913                if !matched_node
914                    .tenant_last_access_time
915                    .contains_key(tenant_id.as_ref())
916                {
917                    break;
918                }
919
920                let matched_text_guard = matched_node.text.read();
921                let matched_node_text_count = matched_text_guard.char_count();
922
923                // Use slice-based comparison - no allocation
924                let shared_count = shared_prefix_count(remaining, matched_text_guard.as_str());
925                drop(matched_text_guard);
926
927                if shared_count == matched_node_text_count {
928                    // Full match with current node's text, continue to next node
929                    matched_chars += shared_count;
930                    remaining = advance_by_chars(remaining, shared_count);
931                    prev = matched_node;
932                } else {
933                    // Partial match - still use this node for timestamp update
934                    matched_chars += shared_count;
935                    prev = matched_node;
936                    break;
937                }
938            } else {
939                // No match found, stop here
940                break;
941            }
942        }
943
944        let curr = prev;
945
946        // Only update timestamp if we found a match for the specified tenant.
947        // Update matched node only - ancestor propagation is unnecessary.
948        if curr
949            .tenant_last_access_time
950            .contains_key(tenant_id.as_ref())
951        {
952            let epoch = get_epoch();
953            curr.tenant_last_access_time
954                .insert(Arc::clone(&tenant_id), epoch);
955        }
956
957        // Build result from original input using char count
958        take_chars(text, matched_chars)
959    }
960
961    /// Return the list of tenants for which this node is a leaf.
962    /// A tenant is a leaf at this node if no children have that tenant.
963    fn leaf_of(node: &NodeRef) -> Vec<TenantId> {
964        let mut candidates: HashMap<TenantId, bool> = node
965            .tenant_last_access_time
966            .iter()
967            .map(|entry| (Arc::clone(entry.key()), true))
968            .collect();
969
970        for child in &node.children {
971            for tenant in &child.value().tenant_last_access_time {
972                // Mark as non-leaf if any child has this tenant
973                candidates.insert(Arc::clone(tenant.key()), false);
974            }
975        }
976
977        candidates
978            .into_iter()
979            .filter(|(_, is_leaf)| *is_leaf)
980            .map(|(tenant, _)| tenant)
981            .collect()
982    }
983
    /// Evicts leaf entries, lowest heap priority first, until every tenant's
    /// tracked char count is at most `max_size`.
    ///
    /// The tree is walked once to seed a priority queue with one
    /// `(timestamp, tenant, node)` entry per tenant leaf. Entries are then
    /// popped and, if the tenant is still over budget and the node is still a
    /// leaf for it, the tenant is removed from that node and its char count is
    /// reduced by the node's edge length. Removing a leaf may expose the parent
    /// as a new leaf, in which case the parent is queued as well.
    pub fn evict_tenant_by_size(&self, max_size: usize) {
        // Calculate used size and collect leaves
        let mut stack = vec![Arc::clone(&self.root)];
        // Wrapped in `Reverse` so the std max-heap pops the smallest
        // `EvictionEntry` first (presumably the oldest timestamp — depends on
        // `EvictionEntry`'s `Ord`, defined elsewhere).
        let mut pq = BinaryHeap::new();

        while let Some(curr) = stack.pop() {
            for child in &curr.children {
                stack.push(Arc::clone(child.value()));
            }

            // Add leaves to priority queue
            for tenant in Tree::leaf_of(&curr) {
                // The tenant may have been removed since `leaf_of` ran; skip it
                // if its timestamp is gone.
                if let Some(timestamp) = curr.tenant_last_access_time.get(tenant.as_ref()) {
                    pq.push(Reverse(EvictionEntry {
                        timestamp: *timestamp,
                        tenant: Arc::clone(&tenant),
                        node: Arc::clone(&curr),
                    }));
                }
            }
        }

        debug!("Before eviction - Used size per tenant:");
        for entry in &self.tenant_char_count {
            debug!("Tenant: {}, Size: {}", entry.key(), entry.value());
        }

        // Process eviction
        while let Some(Reverse(entry)) = pq.pop() {
            let EvictionEntry { tenant, node, .. } = entry;

            // Tenants already within budget are left alone. A tenant with no
            // counter entry falls through and is still evicted.
            if let Some(used_size) = self.tenant_char_count.get(tenant.as_ref()) {
                if *used_size <= max_size {
                    continue;
                }
            }

            // Verify this node is still a leaf for this tenant (may have changed)
            // A node is a leaf for a tenant if no children have that tenant
            let is_still_leaf = node.tenant_last_access_time.contains_key(tenant.as_ref())
                && !node.children.iter().any(|child| {
                    child
                        .value()
                        .tenant_last_access_time
                        .contains_key(tenant.as_ref())
                });
            if !is_still_leaf {
                continue;
            }

            // Decrement when removing tenant from node
            // (`and_modify` only: a missing counter entry is not created.)
            let node_len = node.text.read().char_count();
            self.tenant_char_count
                .entry(Arc::clone(&tenant))
                .and_modify(|count| {
                    *count = count.saturating_sub(node_len);
                });

            // Remove tenant from node
            node.tenant_last_access_time.remove(tenant.as_ref());

            // Get parent reference outside of the borrow scope
            // Use Weak::upgrade() to get a strong reference if the parent still exists
            let parent_opt = node.parent.read().as_ref().and_then(Weak::upgrade);

            // Remove empty nodes
            // A node with no children and no tenants is unlinked from its
            // parent, keyed by the first char of its edge text.
            if node.children.is_empty() && node.tenant_last_access_time.is_empty() {
                if let Some(ref parent) = parent_opt {
                    if let Some(fc) = node.text.read().first_char() {
                        parent.children.remove(&fc);
                    }
                }
            }

            // If parent has this tenant and no other children have it,
            // parent becomes a new leaf - add to priority queue
            if let Some(ref parent) = parent_opt {
                if parent.tenant_last_access_time.contains_key(tenant.as_ref()) {
                    let has_child_with_tenant = parent.children.iter().any(|child| {
                        child
                            .value()
                            .tenant_last_access_time
                            .contains_key(tenant.as_ref())
                    });

                    if !has_child_with_tenant {
                        // Add parent to priority queue as new leaf
                        if let Some(timestamp) = parent.tenant_last_access_time.get(tenant.as_ref())
                        {
                            pq.push(Reverse(EvictionEntry {
                                timestamp: *timestamp,
                                tenant: Arc::clone(&tenant),
                                node: Arc::clone(parent),
                            }));
                        }
                    }
                }
            }
        }

        debug!("After eviction - Used size per tenant:");
        for entry in &self.tenant_char_count {
            debug!("Tenant: {}, Size: {}", entry.key(), entry.value());
        }
    }
1089
1090    // TODO: Implement efficient remove_tenant with reverse index.
1091    // See lib.rs for design options. Current naive O(n) traversal removed.
1092    // For now, stale entries are cleaned up by LRU eviction.
1093
1094    pub fn get_tenant_char_count(&self) -> HashMap<String, usize> {
1095        self.tenant_char_count
1096            .iter()
1097            .map(|entry| (entry.key().to_string(), *entry.value()))
1098            .collect()
1099    }
1100
1101    pub fn get_used_size_per_tenant(&self) -> HashMap<String, usize> {
1102        // perform a DFS to traverse all nodes and calculate the total size used by each tenant
1103
1104        let mut used_size_per_tenant: HashMap<String, usize> = HashMap::new();
1105        let mut stack = vec![Arc::clone(&self.root)];
1106
1107        while let Some(curr) = stack.pop() {
1108            // Use cached char count instead of chars().count()
1109            let text_count = curr.text.read().char_count();
1110
1111            for tenant in &curr.tenant_last_access_time {
1112                let size = used_size_per_tenant
1113                    .entry(tenant.key().to_string())
1114                    .or_insert(0);
1115                *size += text_count;
1116            }
1117
1118            for child in &curr.children {
1119                stack.push(Arc::clone(child.value()));
1120            }
1121        }
1122
1123        used_size_per_tenant
1124    }
1125
1126    /// Evict entries for a specific tenant to reduce to max_chars.
1127    pub fn evict_by_tenant(&self, tenant: &TenantId, max_chars: usize) {
1128        let current_count = self.tenant_char_count.get(tenant).map(|v| *v).unwrap_or(0);
1129
1130        if current_count <= max_chars {
1131            return;
1132        }
1133
1134        // Use existing eviction logic by temporarily setting a target
1135        // We need to evict (current_count - max_chars) chars
1136        self.evict_tenant_entries(tenant, current_count - max_chars);
1137    }
1138
1139    /// Remove a tenant from all nodes in the tree, including root.
1140    /// Used for mesh eviction propagation — when a remote node reports
1141    /// that a worker evicted all its cached prefixes.
1142    pub fn remove_tenant_all(&self, tenant_id: &TenantId) {
1143        // collect_tenant_nodes skips root (root is never LRU-evicted),
1144        // but global removal must include it.
1145        self.remove_tenant_from_node(&self.root, tenant_id);
1146
1147        let mut nodes: Vec<(NodeRef, u64)> = Vec::new();
1148        self.collect_tenant_nodes(&self.root, tenant_id, &mut nodes);
1149        for (node, _) in &nodes {
1150            self.remove_tenant_from_node(node, tenant_id);
1151        }
1152        self.tenant_char_count.remove(tenant_id);
1153    }
1154
1155    /// Evict a specific number of chars for a tenant using LRU ordering.
1156    fn evict_tenant_entries(&self, tenant_id: &TenantId, chars_to_evict: usize) {
1157        if chars_to_evict == 0 {
1158            return;
1159        }
1160
1161        let mut evicted = 0;
1162
1163        // Collect nodes with timestamps for LRU eviction
1164        let mut nodes_with_time: Vec<(NodeRef, u64)> = Vec::new();
1165        self.collect_tenant_nodes(&self.root, tenant_id, &mut nodes_with_time);
1166
1167        // Sort by timestamp (oldest first)
1168        nodes_with_time.sort_by_key(|(_, ts)| *ts);
1169
1170        for (node, _) in nodes_with_time {
1171            if evicted >= chars_to_evict {
1172                break;
1173            }
1174
1175            let node_chars = node.text.read().char_count();
1176            if self.remove_tenant_from_node(&node, tenant_id) {
1177                evicted += node_chars;
1178            }
1179        }
1180
1181        // Update tenant char count
1182        self.tenant_char_count
1183            .entry(tenant_id.clone())
1184            .and_modify(|count| *count = count.saturating_sub(evicted));
1185    }
1186
1187    fn collect_tenant_nodes(
1188        &self,
1189        node: &NodeRef,
1190        tenant_id: &TenantId,
1191        result: &mut Vec<(NodeRef, u64)>,
1192    ) {
1193        // Skip root node as it should never be evicted
1194        if !Arc::ptr_eq(node, &self.root) {
1195            if let Some(ts) = node.tenant_last_access_time.get(tenant_id) {
1196                result.push((Arc::clone(node), *ts));
1197            }
1198        }
1199
1200        for child in &node.children {
1201            self.collect_tenant_nodes(child.value(), tenant_id, result);
1202        }
1203    }
1204
1205    #[expect(
1206        clippy::unused_self,
1207        reason = "method logically belongs to the tree instance; keeps API consistent with collect_tenant_nodes"
1208    )]
1209    fn remove_tenant_from_node(&self, node: &NodeRef, tenant_id: &TenantId) -> bool {
1210        node.tenant_last_access_time.remove(tenant_id).is_some()
1211    }
1212
1213    /// Get the char count for a specific tenant.
1214    pub fn tenant_char_size(&self, tenant: &TenantId) -> usize {
1215        self.tenant_char_count.get(tenant).map(|v| *v).unwrap_or(0)
1216    }
1217
1218    /// Clear the tree to empty state.
1219    pub fn clear(&self) {
1220        // Clear root's children
1221        self.root.children.clear();
1222        // Clear root's tenant timestamps (except keep structure)
1223        self.root.tenant_last_access_time.clear();
1224        // Clear tenant char counts
1225        self.tenant_char_count.clear();
1226        // Reset root text
1227        *self.root.text.write() = NodeText::new(String::new());
1228    }
1229
1230    fn node_to_string(node: &NodeRef, prefix: &str, is_last: bool) -> String {
1231        let mut result = String::new();
1232
1233        // Add prefix and branch character
1234        result.push_str(prefix);
1235        result.push_str(if is_last { "└── " } else { "├── " });
1236
1237        // Add node text
1238        let node_text = node.text.read();
1239        result.push_str(&format!("'{}' [", node_text.as_str()));
1240
1241        // Add tenant information with epoch values
1242        let mut tenant_info = Vec::new();
1243        for entry in &node.tenant_last_access_time {
1244            let tenant_id = entry.key();
1245            let epoch = entry.value();
1246            tenant_info.push(format!("{tenant_id} | epoch:{epoch}"));
1247        }
1248
1249        result.push_str(&tenant_info.join(", "));
1250        result.push_str("]\n");
1251
1252        // Process children
1253        let children: Vec<_> = node.children.iter().collect();
1254        let child_count = children.len();
1255
1256        for (i, entry) in children.iter().enumerate() {
1257            let is_last_child = i == child_count - 1;
1258            let new_prefix = format!("{}{}", prefix, if is_last { "    " } else { "│   " });
1259
1260            result.push_str(&Tree::node_to_string(
1261                entry.value(),
1262                &new_prefix,
1263                is_last_child,
1264            ));
1265        }
1266
1267        result
1268    }
1269
1270    #[expect(
1271        clippy::print_stdout,
1272        reason = "diagnostic method intended for debugging and test output"
1273    )]
1274    pub fn pretty_print(&self) {
1275        if self.root.children.is_empty() {
1276            return;
1277        }
1278
1279        let mut result = String::new();
1280        let children: Vec<_> = self.root.children.iter().collect();
1281        let child_count = children.len();
1282
1283        for (i, entry) in children.iter().enumerate() {
1284            let is_last = i == child_count - 1;
1285            result.push_str(&Tree::node_to_string(entry.value(), "", is_last));
1286        }
1287
1288        println!("{result}");
1289    }
1290
1291    /// Create a compact snapshot of the tree for mesh synchronization.
1292    ///
1293    /// Walks the tree depth-first (pre-order) and emits each node's edge
1294    /// label + tenant list.  Shared prefixes are stored once — a tree
1295    /// with 2048 entries sharing 80% prefixes serializes to ~2-4 MB
1296    /// instead of ~40 MB as flat insert operations.
1297    ///
1298    /// # Concurrency
1299    ///
1300    /// This traversal is NOT atomic — concurrent `insert_text` calls may
1301    /// split or replace nodes during the walk.  The per-node DashMap and
1302    /// RwLock guards ensure individual reads are safe, but the snapshot
1303    /// may reflect a mix of pre-split and post-split state.  This is
1304    /// acceptable for mesh sync (eventual consistency).
1305    pub fn snapshot(&self) -> crate::snapshot::TreeSnapshot {
1306        let mut nodes = Vec::new();
1307        Self::snapshot_node(&self.root, &mut nodes);
1308        crate::snapshot::TreeSnapshot { nodes }
1309    }
1310
1311    fn snapshot_node(node: &Node, out: &mut Vec<crate::snapshot::SnapshotNode>) {
1312        let text = node.text.read();
1313        let edge = text.as_str().to_string();
1314        drop(text);
1315
1316        // Collect tenants with their epochs
1317        let tenants: Vec<(String, u64)> = node
1318            .tenant_last_access_time
1319            .iter()
1320            .map(|entry| (entry.key().to_string(), *entry.value()))
1321            .collect();
1322
1323        // Capture children list BEFORE writing child_count to ensure
1324        // the count matches the actual children we visit.
1325        let mut children: Vec<(char, NodeRef)> = node
1326            .children
1327            .iter()
1328            .map(|entry| (*entry.key(), entry.value().clone()))
1329            .collect();
1330        children.sort_by_key(|(c, _)| *c);
1331
1332        out.push(crate::snapshot::SnapshotNode {
1333            edge,
1334            tenants,
1335            child_count: children.len() as u32,
1336        });
1337
1338        for (_, child) in children {
1339            Self::snapshot_node(&child, out);
1340        }
1341    }
1342
    /// Returns a lazy pre-order iterator over the tree's entries.
    ///
    /// Each item is a `(prefix, [(tenant, epoch)])` pair, where `prefix` is
    /// the concatenation of edge texts from the root to the node. The tree is
    /// never materialized as a single buffer. Nodes without tenants are
    /// skipped (they yield no item) but are still descended into.
    /// Children are visited in deterministic char-sorted order so
    /// callers paging the iterator can resume by re-running and
    /// skipping the first N items if the tree is unchanged.
    ///
    /// Like [`Tree::snapshot`], the walk is not atomic — concurrent
    /// `insert_text` may split or replace nodes mid-walk; the
    /// iterator may then reflect a mix of pre- and post-split
    /// state. Acceptable for mesh sync (eventual consistency).
    pub fn iter_entries(&self) -> EntriesIter {
        EntriesIter::new(self)
    }
1358}
1359
/// Iterator returned by [`Tree::iter_entries`].
///
/// It holds `Arc<Node>` clones for the children it still has to visit, so it
/// does not borrow the `Tree` it was created from and can outlive any
/// temporary release of `&Tree` (including across awaits).
pub struct EntriesIter {
    /// DFS frame stack, one frame per node on the current root-to-node path.
    /// The top frame's `remaining_children` is the queue of children still
    /// to visit at the current depth. Frames are pushed when descending and
    /// popped when their children are exhausted.
    stack: Vec<EntryFrame>,
    /// Accumulated edge text from the root to the current node. An edge is
    /// appended when descending into a child and cut off again when that
    /// child's frame is popped.
    path: String,
    /// Pre-staged emission. Set at construction when the root itself has
    /// tenants, and handed out by the first `next()` call before the walk
    /// proper begins.
    pending: Option<(String, Vec<(TenantId, u64)>)>,
}
1377
/// One level of the [`EntriesIter`] depth-first walk: a visited node's
/// unvisited children plus the size of the edge that led to it.
struct EntryFrame {
    /// Children of this frame's node that have not been descended into yet,
    /// in visiting order.
    remaining_children: std::vec::IntoIter<NodeRef>,
    /// Byte length of this frame's edge text — used to truncate
    /// the path buffer in O(1) when popping. Each descent pushes
    /// a complete `&str` of this length, so popping the same
    /// byte count always lands on a UTF-8 char boundary.
    /// The root frame uses `0`.
    edge_byte_len: usize,
}
1386
1387impl EntriesIter {
1388    fn new(tree: &Tree) -> Self {
1389        let root_tenants = snapshot_tenants(&tree.root);
1390        let pending = (!root_tenants.is_empty()).then(|| (String::new(), root_tenants));
1391        Self {
1392            stack: vec![EntryFrame {
1393                remaining_children: collect_sorted_children(&tree.root).into_iter(),
1394                edge_byte_len: 0,
1395            }],
1396            path: String::new(),
1397            pending,
1398        }
1399    }
1400}
1401
1402impl Iterator for EntriesIter {
1403    type Item = (String, Vec<(TenantId, u64)>);
1404
1405    fn next(&mut self) -> Option<Self::Item> {
1406        if let Some(entry) = self.pending.take() {
1407            return Some(entry);
1408        }
1409        loop {
1410            let frame = self.stack.last_mut()?;
1411            if let Some(child) = frame.remaining_children.next() {
1412                // Push the edge directly through the lock guard —
1413                // no intermediate clone — and capture its byte
1414                // length so we can truncate atomically on pop.
1415                let edge_byte_len = {
1416                    let guard = child.text.read();
1417                    let s = guard.as_str();
1418                    self.path.push_str(s);
1419                    s.len()
1420                };
1421
1422                let tenants = snapshot_tenants(&child);
1423                self.stack.push(EntryFrame {
1424                    remaining_children: collect_sorted_children(&child).into_iter(),
1425                    edge_byte_len,
1426                });
1427                if !tenants.is_empty() {
1428                    return Some((self.path.clone(), tenants));
1429                }
1430                // Empty tenants → loop continues, descending into
1431                // this child's children on the next iteration.
1432            } else {
1433                let edge_byte_len = frame.edge_byte_len;
1434                self.stack.pop();
1435                let new_len = self.path.len().saturating_sub(edge_byte_len);
1436                self.path.truncate(new_len);
1437            }
1438        }
1439    }
1440}
1441
1442fn snapshot_tenants(node: &NodeRef) -> Vec<(TenantId, u64)> {
1443    let mut tenants: Vec<(TenantId, u64)> = node
1444        .tenant_last_access_time
1445        .iter()
1446        .map(|entry| (Arc::clone(entry.key()), *entry.value()))
1447        .collect();
1448    tenants.sort_by(|a, b| a.0.cmp(&b.0));
1449    tenants
1450}
1451
1452fn collect_sorted_children(node: &NodeRef) -> Vec<NodeRef> {
1453    let mut children: Vec<(char, NodeRef)> = node
1454        .children
1455        .iter()
1456        .map(|entry| (*entry.key(), entry.value().clone()))
1457        .collect();
1458    children.sort_by_key(|(c, _)| *c);
1459    children.into_iter().map(|(_, n)| n).collect()
1460}
1461
1462impl Tree {
1463    /// Reconstruct a tree from a snapshot.
1464    ///
1465    /// The snapshot must be in pre-order (parent before children) with
1466    /// `child_count` fields indicating tree structure.
1467    pub fn from_snapshot(snapshot: &crate::snapshot::TreeSnapshot) -> Self {
1468        let tree = Tree::new();
1469        if snapshot.nodes.is_empty() {
1470            return tree;
1471        }
1472
1473        let mut idx = 0;
1474        Self::restore_node(
1475            &tree.root,
1476            &snapshot.nodes,
1477            &mut idx,
1478            &tree.tenant_char_count,
1479        );
1480        tree
1481    }
1482
1483    fn restore_node(
1484        target: &NodeRef,
1485        nodes: &[crate::snapshot::SnapshotNode],
1486        idx: &mut usize,
1487        tenant_counts: &DashMap<TenantId, usize>,
1488    ) {
1489        if *idx >= nodes.len() {
1490            return;
1491        }
1492
1493        let snap_node = &nodes[*idx];
1494        *idx += 1;
1495
1496        // Set edge text
1497        *target.text.write() = NodeText::new(snap_node.edge.clone());
1498
1499        // Set tenants
1500        for (tenant_str, epoch) in &snap_node.tenants {
1501            let tenant_id = intern_tenant(tenant_str);
1502            target
1503                .tenant_last_access_time
1504                .insert(Arc::clone(&tenant_id), *epoch);
1505
1506            // Track char counts
1507            let edge_chars = snap_node.edge.chars().count();
1508            tenant_counts
1509                .entry(tenant_id)
1510                .and_modify(|c| *c += edge_chars)
1511                .or_insert(edge_chars);
1512        }
1513
1514        // Restore children
1515        for _ in 0..snap_node.child_count {
1516            if *idx >= nodes.len() {
1517                break;
1518            }
1519            let child_snap = &nodes[*idx];
1520            let Some(first_char) = child_snap.edge.chars().next() else {
1521                // Empty edge for a child node — skip it. Advance idx
1522                // past this node and all its descendants.
1523                fn skip(nodes: &[crate::snapshot::SnapshotNode], idx: &mut usize) {
1524                    if *idx >= nodes.len() {
1525                        return;
1526                    }
1527                    let cc = nodes[*idx].child_count;
1528                    *idx += 1;
1529                    for _ in 0..cc {
1530                        skip(nodes, idx);
1531                    }
1532                }
1533                skip(nodes, idx);
1534                continue;
1535            };
1536
1537            let child_node = Arc::new(Node {
1538                children: new_children_map(),
1539                text: RwLock::new(NodeText::empty()),
1540                tenant_last_access_time: new_tenant_map(),
1541                parent: RwLock::new(Some(Arc::downgrade(target))),
1542                last_tenant: RwLock::new(None),
1543            });
1544
1545            Self::restore_node(&child_node, nodes, idx, tenant_counts);
1546            target.children.insert(first_char, child_node);
1547        }
1548    }
1549
1550    /// Merge a remote snapshot into this tree.
1551    ///
1552    /// Reconstructs the remote tree from the snapshot, then walks both
1553    /// trees recursively, handling three cases for each child:
1554    /// 1. Exact edge match → recurse into both subtrees
1555    /// 2. Local edge is prefix of remote → descend into local, continue with remainder
1556    /// 3. Shared prefix shorter than both → split local node, attach remote remainder
1557    pub fn merge_snapshot(&self, snapshot: &crate::snapshot::TreeSnapshot) {
1558        if snapshot.nodes.is_empty() {
1559            return;
1560        }
1561        // Reconstruct the remote tree, then merge tree-to-tree
1562        let remote_tree = Tree::from_snapshot(snapshot);
1563        self.merge_tree(&remote_tree);
1564    }
1565
1566    /// Merge another tree into this tree.
1567    ///
1568    /// Walks both trees recursively. At each node, merges tenant maps
1569    /// (remote wins on newer epoch) and reconciles children using the
1570    /// three-case edge comparison.
1571    pub fn merge_tree(&self, remote: &Tree) {
1572        Self::merge_nodes(&self.root, &remote.root, &self.tenant_char_count);
1573    }
1574
1575    fn merge_nodes(local: &NodeRef, remote: &NodeRef, tenant_counts: &DashMap<TenantId, usize>) {
1576        // Merge tenants at this node
1577        for entry in &remote.tenant_last_access_time {
1578            let tenant_id = Arc::clone(entry.key());
1579            let remote_epoch = *entry.value();
1580
1581            let is_new = !local
1582                .tenant_last_access_time
1583                .contains_key(tenant_id.as_ref());
1584
1585            let should_update = local
1586                .tenant_last_access_time
1587                .get(tenant_id.as_ref())
1588                .map(|local_epoch| remote_epoch > *local_epoch)
1589                .unwrap_or(true);
1590
1591            if should_update {
1592                local
1593                    .tenant_last_access_time
1594                    .insert(Arc::clone(&tenant_id), remote_epoch);
1595
1596                if is_new {
1597                    let edge_chars = local.text.read().char_count();
1598                    tenant_counts
1599                        .entry(tenant_id)
1600                        .and_modify(|c| *c += edge_chars)
1601                        .or_insert(edge_chars);
1602                }
1603            }
1604        }
1605
1606        // Merge children
1607        for remote_entry in &remote.children {
1608            let rc = *remote_entry.key();
1609            let remote_child = remote_entry.value().clone();
1610            let remote_edge = remote_child.text.read().as_str().to_string();
1611
1612            if let Some(local_entry) = local.children.get(&rc) {
1613                let local_child = local_entry.value().clone();
1614                drop(local_entry); // release DashMap guard before mutating
1615
1616                let local_edge = local_child.text.read().as_str().to_string();
1617                let shared = shared_prefix_count(&local_edge, &remote_edge);
1618
1619                if shared == local_edge.chars().count() && shared == remote_edge.chars().count() {
1620                    // Case 1: exact match — recurse
1621                    Self::merge_nodes(&local_child, &remote_child, tenant_counts);
1622                } else if shared == local_edge.chars().count() {
1623                    // Case 2: local edge is a prefix of remote edge.
1624                    // Descend into local child. The remote child's edge
1625                    // has a remainder that maps to a deeper level.
1626                    //
1627                    // First, merge remote tenants into local_child (the prefix node).
1628                    // The remote tenant owns the full path including this prefix.
1629                    for entry in &remote_child.tenant_last_access_time {
1630                        let tid = Arc::clone(entry.key());
1631                        let epoch = *entry.value();
1632                        let is_new = !local_child
1633                            .tenant_last_access_time
1634                            .contains_key(tid.as_ref());
1635                        let should_update = local_child
1636                            .tenant_last_access_time
1637                            .get(tid.as_ref())
1638                            .map(|e| epoch > *e)
1639                            .unwrap_or(true);
1640                        if should_update {
1641                            local_child
1642                                .tenant_last_access_time
1643                                .insert(Arc::clone(&tid), epoch);
1644                            if is_new {
1645                                let edge_chars = local_edge.chars().count();
1646                                tenant_counts
1647                                    .entry(tid)
1648                                    .and_modify(|c| *c += edge_chars)
1649                                    .or_insert(edge_chars);
1650                            }
1651                        }
1652                    }
1653
1654                    let remote_remainder = advance_by_chars(&remote_edge, shared);
1655                    let Some(rem_first) = remote_remainder.chars().next() else {
1656                        continue;
1657                    };
1658
1659                    // Create a virtual remote node with the trimmed edge.
1660                    // Use clone_subtree for children to rewrite parent pointers.
1661                    let trimmed_remote = Arc::new(Node {
1662                        children: new_children_map(),
1663                        text: RwLock::new(NodeText::new(remote_remainder.to_string())),
1664                        tenant_last_access_time: remote_child.tenant_last_access_time.clone(),
1665                        parent: RwLock::new(None),
1666                        last_tenant: RwLock::new(remote_child.last_tenant.read().clone()),
1667                    });
1668                    for child_entry in &remote_child.children {
1669                        let child_clone =
1670                            Self::clone_subtree(child_entry.value(), Some(&trimmed_remote));
1671                        trimmed_remote
1672                            .children
1673                            .insert(*child_entry.key(), child_clone);
1674                    }
1675
1676                    if let Some(deeper_local) = local_child.children.get(&rem_first) {
1677                        let deeper_local = deeper_local.value().clone();
1678                        // Recurse: merge trimmed remote into the deeper local child
1679                        Self::merge_nodes(&deeper_local, &trimmed_remote, tenant_counts);
1680                    } else {
1681                        // No local child at this position — graft remote subtree
1682                        *trimmed_remote.parent.write() = Some(Arc::downgrade(&local_child));
1683                        Self::accumulate_tenant_counts(&trimmed_remote, tenant_counts);
1684                        local_child.children.insert(rem_first, trimmed_remote);
1685                    }
1686                } else {
1687                    // Case 3: shared prefix shorter than at least one edge.
1688                    // Covers both "shorter than both" and "remote is prefix of local".
1689                    // Split local node at the shared prefix point,
1690                    // then attach remote remainder as a sibling.
1691
1692                    // Split local edge: "Hello world" at shared=6 →
1693                    //   split_node edge: "Hello "
1694                    //   local_child edge becomes: "world"
1695                    let (split_text, local_remainder_text) = {
1696                        let text = local_child.text.read();
1697                        text.split_at_char(shared)
1698                    };
1699
1700                    // Case 3 guarantees shared < local_edge, so remainder is non-empty.
1701                    let Some(local_remainder_first) = local_remainder_text.first_char() else {
1702                        continue;
1703                    };
1704
1705                    // Create the split node (intermediate).
1706                    // Tenants: union of local and remote at the shared prefix.
1707                    let split_node = Arc::new(Node {
1708                        children: new_children_map(),
1709                        text: RwLock::new(split_text),
1710                        tenant_last_access_time: local_child.tenant_last_access_time.clone(),
1711                        parent: RwLock::new(Some(Arc::downgrade(local))),
1712                        last_tenant: RwLock::new(local_child.last_tenant.read().clone()),
1713                    });
1714                    // Union remote tenants into the split node, updating
1715                    // tenant_char_count for newly added tenants.
1716                    let split_chars = shared;
1717                    for entry in &remote_child.tenant_last_access_time {
1718                        let tid = Arc::clone(entry.key());
1719                        let epoch = *entry.value();
1720                        let is_new = !split_node
1721                            .tenant_last_access_time
1722                            .contains_key(tid.as_ref());
1723                        let should_update = split_node
1724                            .tenant_last_access_time
1725                            .get(tid.as_ref())
1726                            .map(|e| epoch > *e)
1727                            .unwrap_or(true);
1728                        if should_update {
1729                            split_node
1730                                .tenant_last_access_time
1731                                .insert(Arc::clone(&tid), epoch);
1732                            if is_new {
1733                                tenant_counts
1734                                    .entry(tid)
1735                                    .and_modify(|c| *c += split_chars)
1736                                    .or_insert(split_chars);
1737                            }
1738                        }
1739                    }
1740
1741                    // Push local child down as child of split node
1742                    *local_child.text.write() = local_remainder_text;
1743                    *local_child.parent.write() = Some(Arc::downgrade(&split_node));
1744                    split_node
1745                        .children
1746                        .insert(local_remainder_first, Arc::clone(&local_child));
1747
1748                    // Create remote remainder as another child of split node
1749                    // Use clone_subtree to rewrite parent pointers correctly
1750                    let remote_remainder = advance_by_chars(&remote_edge, shared);
1751                    if let Some(rem_first) = remote_remainder.chars().next() {
1752                        let remote_subtree = Arc::new(Node {
1753                            children: new_children_map(),
1754                            text: RwLock::new(NodeText::new(remote_remainder.to_string())),
1755                            tenant_last_access_time: remote_child.tenant_last_access_time.clone(),
1756                            parent: RwLock::new(Some(Arc::downgrade(&split_node))),
1757                            last_tenant: RwLock::new(remote_child.last_tenant.read().clone()),
1758                        });
1759                        // Clone remote children with correct parent pointers
1760                        for child_entry in &remote_child.children {
1761                            let child_clone =
1762                                Self::clone_subtree(child_entry.value(), Some(&remote_subtree));
1763                            remote_subtree
1764                                .children
1765                                .insert(*child_entry.key(), child_clone);
1766                        }
1767                        Self::accumulate_tenant_counts(&remote_subtree, tenant_counts);
1768                        split_node.children.insert(rem_first, remote_subtree);
1769                    }
1770
1771                    // Replace local's child with the split node
1772                    local.children.insert(rc, split_node);
1773                }
1774            } else {
1775                // No local child at this char — copy entire remote subtree
1776                let cloned = Self::clone_subtree(&remote_child, Some(local));
1777                Self::accumulate_tenant_counts(&cloned, tenant_counts);
1778                local.children.insert(rc, cloned);
1779            }
1780        }
1781    }
1782
1783    /// Walk a subtree and accumulate tenant char counts into the
1784    /// tree-level `tenant_char_count` map.  Called after grafting a
1785    /// remote subtree into the local tree so that size tracking and
1786    /// eviction remain correct.
1787    fn accumulate_tenant_counts(node: &NodeRef, tenant_counts: &DashMap<TenantId, usize>) {
1788        let edge_chars = node.text.read().char_count();
1789        for entry in &node.tenant_last_access_time {
1790            let tid = Arc::clone(entry.key());
1791            tenant_counts
1792                .entry(tid)
1793                .and_modify(|c| *c += edge_chars)
1794                .or_insert(edge_chars);
1795        }
1796        for child_entry in &node.children {
1797            Self::accumulate_tenant_counts(child_entry.value(), tenant_counts);
1798        }
1799    }
1800
1801    /// Deep clone a subtree, setting parent pointers correctly.
1802    fn clone_subtree(node: &NodeRef, parent: Option<&NodeRef>) -> NodeRef {
1803        let new_node = Arc::new(Node {
1804            children: new_children_map(),
1805            text: RwLock::new(node.text.read().clone_text()),
1806            tenant_last_access_time: node.tenant_last_access_time.clone(),
1807            parent: RwLock::new(parent.map(Arc::downgrade)),
1808            last_tenant: RwLock::new(node.last_tenant.read().clone()),
1809        });
1810
1811        for entry in &node.children {
1812            let child_clone = Self::clone_subtree(entry.value(), Some(&new_node));
1813            new_node.children.insert(*entry.key(), child_clone);
1814        }
1815
1816        new_node
1817    }
1818}
1819
1820impl RadixTree for Tree {
1821    type Key = str;
1822    type MatchResult = PrefixMatchResult;
1823
1824    fn insert(&self, key: &Self::Key, tenant: &str) {
1825        self.insert_text(key, tenant);
1826    }
1827
1828    fn prefix_match(&self, key: &Self::Key) -> Option<TenantId> {
1829        let result = self.match_prefix_with_counts(key);
1830        if result.matched_char_count > 0 {
1831            Some(result.tenant)
1832        } else {
1833            None
1834        }
1835    }
1836
1837    fn prefix_match_with_counts(&self, key: &Self::Key) -> Self::MatchResult {
1838        self.match_prefix_with_counts(key)
1839    }
1840
1841    fn evict(&self, tenant: &TenantId, max_units: usize) {
1842        self.evict_by_tenant(tenant, max_units);
1843    }
1844
1845    fn tenant_size(&self, tenant: &TenantId) -> usize {
1846        self.tenant_char_size(tenant)
1847    }
1848
1849    fn reset(&self) {
1850        self.clear();
1851    }
1852}
1853
1854//  Unit tests
1855#[cfg(test)]
1856#[expect(clippy::print_stdout, reason = "test diagnostics")]
1857mod tests {
1858    use std::{
1859        thread,
1860        time::{Duration, Instant},
1861    };
1862
1863    use rand::{
1864        distr::{Alphanumeric, SampleString},
1865        rng as thread_rng, Rng,
1866    };
1867
1868    use super::*;
1869
1870    /// Helper to convert tenant_char_count to HashMap<String, usize> for comparison
1871    fn get_maintained_counts(tree: &Tree) -> HashMap<String, usize> {
1872        tree.tenant_char_count
1873            .iter()
1874            .map(|entry| (entry.key().to_string(), *entry.value()))
1875            .collect()
1876    }
1877
1878    #[test]
1879    fn test_tenant_char_count() {
1880        let tree = Tree::new();
1881
1882        tree.insert_text("apple", "tenant1");
1883        tree.insert_text("apricot", "tenant1");
1884        tree.insert_text("banana", "tenant1");
1885        tree.insert_text("amplify", "tenant2");
1886        tree.insert_text("application", "tenant2");
1887
1888        let computed_sizes = tree.get_used_size_per_tenant();
1889        let maintained_counts = get_maintained_counts(&tree);
1890
1891        println!("Phase 1 - Maintained vs Computed counts:");
1892        println!("Maintained: {maintained_counts:?}\nComputed: {computed_sizes:?}");
1893        assert_eq!(
1894            maintained_counts, computed_sizes,
1895            "Phase 1: Initial insertions"
1896        );
1897
1898        tree.insert_text("apartment", "tenant1");
1899        tree.insert_text("appetite", "tenant2");
1900        tree.insert_text("ball", "tenant1");
1901        tree.insert_text("box", "tenant2");
1902
1903        let computed_sizes = tree.get_used_size_per_tenant();
1904        let maintained_counts = get_maintained_counts(&tree);
1905
1906        println!("Phase 2 - Maintained vs Computed counts:");
1907        println!("Maintained: {maintained_counts:?}\nComputed: {computed_sizes:?}");
1908        assert_eq!(
1909            maintained_counts, computed_sizes,
1910            "Phase 2: Additional insertions"
1911        );
1912
1913        tree.insert_text("zebra", "tenant1");
1914        tree.insert_text("zebra", "tenant2");
1915        tree.insert_text("zero", "tenant1");
1916        tree.insert_text("zero", "tenant2");
1917
1918        let computed_sizes = tree.get_used_size_per_tenant();
1919        let maintained_counts = get_maintained_counts(&tree);
1920
1921        println!("Phase 3 - Maintained vs Computed counts:");
1922        println!("Maintained: {maintained_counts:?}\nComputed: {computed_sizes:?}");
1923        assert_eq!(
1924            maintained_counts, computed_sizes,
1925            "Phase 3: Overlapping insertions"
1926        );
1927
1928        tree.evict_tenant_by_size(10);
1929
1930        let computed_sizes = tree.get_used_size_per_tenant();
1931        let maintained_counts = get_maintained_counts(&tree);
1932
1933        println!("Phase 4 - Maintained vs Computed counts:");
1934        println!("Maintained: {maintained_counts:?}\nComputed: {computed_sizes:?}");
1935        assert_eq!(maintained_counts, computed_sizes, "Phase 4: After eviction");
1936    }
1937
1938    fn random_string(len: usize) -> String {
1939        Alphanumeric.sample_string(&mut thread_rng(), len)
1940    }
1941
1942    #[test]
1943    fn test_cold_start() {
1944        let tree = Tree::new();
1945
1946        let (matched_text, tenant) = tree.prefix_match_legacy("hello");
1947
1948        assert_eq!(matched_text, "");
1949        assert_eq!(tenant, "empty");
1950    }
1951
1952    #[test]
1953    fn test_exact_match_seq() {
1954        let tree = Tree::new();
1955        tree.insert_text("hello", "tenant1");
1956        tree.pretty_print();
1957        tree.insert_text("apple", "tenant2");
1958        tree.pretty_print();
1959        tree.insert_text("banana", "tenant3");
1960        tree.pretty_print();
1961
1962        let (matched_text, tenant) = tree.prefix_match_legacy("hello");
1963        assert_eq!(matched_text, "hello");
1964        assert_eq!(tenant, "tenant1");
1965
1966        let (matched_text, tenant) = tree.prefix_match_legacy("apple");
1967        assert_eq!(matched_text, "apple");
1968        assert_eq!(tenant, "tenant2");
1969
1970        let (matched_text, tenant) = tree.prefix_match_legacy("banana");
1971        assert_eq!(matched_text, "banana");
1972        assert_eq!(tenant, "tenant3");
1973    }
1974
1975    #[test]
1976    fn test_exact_match_concurrent() {
1977        let tree = Arc::new(Tree::new());
1978
1979        // spawn 3 threads for insert
1980        let tree_clone = Arc::clone(&tree);
1981
1982        let texts = ["hello", "apple", "banana"];
1983        let tenants = ["tenant1", "tenant2", "tenant3"];
1984
1985        let mut handles = vec![];
1986
1987        for i in 0..3 {
1988            let tree_clone = Arc::clone(&tree_clone);
1989            let text = texts[i];
1990            let tenant = tenants[i];
1991
1992            let handle = thread::spawn(move || {
1993                tree_clone.insert_text(text, tenant);
1994            });
1995
1996            handles.push(handle);
1997        }
1998
1999        // wait
2000        for handle in handles {
2001            handle.join().unwrap();
2002        }
2003
2004        // spawn 3 threads for match
2005        let mut handles = vec![];
2006
2007        let tree_clone = Arc::clone(&tree);
2008
2009        for i in 0..3 {
2010            let tree_clone = Arc::clone(&tree_clone);
2011            let text = texts[i];
2012            let tenant = tenants[i];
2013
2014            let handle = thread::spawn(move || {
2015                let (matched_text, matched_tenant) = tree_clone.prefix_match_legacy(text);
2016                assert_eq!(matched_text, text);
2017                assert_eq!(matched_tenant, tenant);
2018            });
2019
2020            handles.push(handle);
2021        }
2022
2023        // wait
2024        for handle in handles {
2025            handle.join().unwrap();
2026        }
2027    }
2028
2029    #[test]
2030    fn test_partial_match_concurrent() {
2031        let tree = Arc::new(Tree::new());
2032
2033        // spawn 3 threads for insert
2034        let tree_clone = Arc::clone(&tree);
2035
2036        static TEXTS: [&str; 3] = ["apple", "apabc", "acbdeds"];
2037
2038        let mut handles = vec![];
2039
2040        for text in &TEXTS {
2041            let tree_clone = Arc::clone(&tree_clone);
2042            let tenant = "tenant0";
2043
2044            let handle = thread::spawn(move || {
2045                tree_clone.insert_text(text, tenant);
2046            });
2047
2048            handles.push(handle);
2049        }
2050
2051        // wait
2052        for handle in handles {
2053            handle.join().unwrap();
2054        }
2055
2056        // spawn 3 threads for match
2057        let mut handles = vec![];
2058
2059        let tree_clone = Arc::clone(&tree);
2060
2061        for text in &TEXTS {
2062            let tree_clone = Arc::clone(&tree_clone);
2063            let tenant = "tenant0";
2064
2065            let handle = thread::spawn(move || {
2066                let (matched_text, matched_tenant) = tree_clone.prefix_match_legacy(text);
2067                assert_eq!(matched_text, *text);
2068                assert_eq!(matched_tenant, tenant);
2069            });
2070
2071            handles.push(handle);
2072        }
2073
2074        // wait
2075        for handle in handles {
2076            handle.join().unwrap();
2077        }
2078    }
2079
2080    #[test]
2081    fn test_group_prefix_insert_match_concurrent() {
2082        static PREFIXES: [&str; 4] = [
2083            "Clock strikes midnight, I'm still wide awake",
2084            "Got dreams bigger than these city lights",
2085            "Time waits for no one, gotta make my move",
2086            "Started from the bottom, that's no metaphor",
2087        ];
2088        let suffixes = [
2089            "Got too much to prove, ain't got time to lose",
2090            "History in the making, yeah, you can't erase this",
2091        ];
2092        let tree = Arc::new(Tree::new());
2093
2094        let mut handles = vec![];
2095
2096        for (i, prefix) in PREFIXES.iter().enumerate() {
2097            for suffix in &suffixes {
2098                let tree_clone = Arc::clone(&tree);
2099                let text = format!("{prefix} {suffix}");
2100                let tenant = format!("tenant{i}");
2101
2102                let handle = thread::spawn(move || {
2103                    tree_clone.insert_text(&text, &tenant);
2104                });
2105
2106                handles.push(handle);
2107            }
2108        }
2109
2110        // wait
2111        for handle in handles {
2112            handle.join().unwrap();
2113        }
2114
2115        tree.pretty_print();
2116
2117        // check matching using multi threads
2118        let mut handles = vec![];
2119
2120        for (i, prefix) in PREFIXES.iter().enumerate() {
2121            let tree_clone = Arc::clone(&tree);
2122
2123            let handle = thread::spawn(move || {
2124                let (matched_text, matched_tenant) = tree_clone.prefix_match_legacy(prefix);
2125                let tenant = format!("tenant{i}");
2126                assert_eq!(matched_text, *prefix);
2127                assert_eq!(matched_tenant, tenant);
2128            });
2129
2130            handles.push(handle);
2131        }
2132
2133        // wait
2134        for handle in handles {
2135            handle.join().unwrap();
2136        }
2137    }
2138
2139    #[test]
2140    fn test_mixed_concurrent_insert_match() {
2141        // ensure it does not deadlock instead of doing correctness check
2142
2143        static PREFIXES: [&str; 4] = [
2144            "Clock strikes midnight, I'm still wide awake",
2145            "Got dreams bigger than these city lights",
2146            "Time waits for no one, gotta make my move",
2147            "Started from the bottom, that's no metaphor",
2148        ];
2149        let suffixes = [
2150            "Got too much to prove, ain't got time to lose",
2151            "History in the making, yeah, you can't erase this",
2152        ];
2153        let tree = Arc::new(Tree::new());
2154
2155        let mut handles = vec![];
2156
2157        for (i, prefix) in PREFIXES.iter().enumerate() {
2158            for suffix in &suffixes {
2159                let tree_clone = Arc::clone(&tree);
2160                let text = format!("{prefix} {suffix}");
2161                let tenant = format!("tenant{i}");
2162
2163                let handle = thread::spawn(move || {
2164                    tree_clone.insert_text(&text, &tenant);
2165                });
2166
2167                handles.push(handle);
2168            }
2169        }
2170
2171        // check matching using multi threads
2172        for prefix in &PREFIXES {
2173            let tree_clone = Arc::clone(&tree);
2174
2175            let handle = thread::spawn(move || {
2176                let (_matched_text, _matched_tenant) = tree_clone.prefix_match_legacy(prefix);
2177            });
2178
2179            handles.push(handle);
2180        }
2181
2182        // wait
2183        for handle in handles {
2184            handle.join().unwrap();
2185        }
2186    }
2187
2188    #[test]
2189    fn test_utf8_split_seq() {
2190        // The string should be indexed and split by a utf-8 value basis instead of byte basis
2191        // use .chars() to get the iterator of the utf-8 value
2192        let tree = Arc::new(Tree::new());
2193
2194        static TEST_PAIRS: [(&str, &str); 3] = [
2195            ("你好嗎", "tenant1"),
2196            ("你好喔", "tenant2"),
2197            ("你心情好嗎", "tenant3"),
2198        ];
2199
2200        // Insert sequentially
2201        for (text, tenant) in &TEST_PAIRS {
2202            tree.insert_text(text, tenant);
2203        }
2204
2205        tree.pretty_print();
2206
2207        for (text, tenant) in &TEST_PAIRS {
2208            let (matched_text, matched_tenant) = tree.prefix_match_legacy(text);
2209            assert_eq!(matched_text, *text);
2210            assert_eq!(matched_tenant, *tenant);
2211        }
2212    }
2213
2214    #[test]
2215    fn test_utf8_split_concurrent() {
2216        let tree = Arc::new(Tree::new());
2217
2218        static TEST_PAIRS: [(&str, &str); 3] = [
2219            ("你好嗎", "tenant1"),
2220            ("你好喔", "tenant2"),
2221            ("你心情好嗎", "tenant3"),
2222        ];
2223
2224        // Create multiple threads for insertion
2225        let mut handles = vec![];
2226
2227        for (text, tenant) in &TEST_PAIRS {
2228            let tree_clone = Arc::clone(&tree);
2229
2230            let handle = thread::spawn(move || {
2231                tree_clone.insert_text(text, tenant);
2232            });
2233
2234            handles.push(handle);
2235        }
2236
2237        // Wait for all insertions to complete
2238        for handle in handles {
2239            handle.join().unwrap();
2240        }
2241
2242        tree.pretty_print();
2243
2244        // Create multiple threads for matching
2245        let mut handles = vec![];
2246
2247        for (text, tenant) in &TEST_PAIRS {
2248            let tree_clone = Arc::clone(&tree);
2249
2250            let handle = thread::spawn(move || {
2251                let (matched_text, matched_tenant) = tree_clone.prefix_match_legacy(text);
2252                assert_eq!(matched_text, *text);
2253                assert_eq!(matched_tenant, *tenant);
2254            });
2255
2256            handles.push(handle);
2257        }
2258
2259        // Wait for all matches to complete
2260        for handle in handles {
2261            handle.join().unwrap();
2262        }
2263    }
2264
2265    #[test]
2266    fn test_simple_eviction() {
2267        let tree = Tree::new();
2268        let max_size = 5;
2269
2270        // Insert strings for both tenants
2271        tree.insert_text("hello", "tenant1"); // size 5
2272
2273        tree.insert_text("hello", "tenant2"); // size 5
2274        thread::sleep(Duration::from_millis(10));
2275        tree.insert_text("world", "tenant2"); // size 5, total for tenant2 = 10
2276
2277        tree.pretty_print();
2278
2279        let sizes_before = tree.get_used_size_per_tenant();
2280        assert_eq!(sizes_before.get("tenant1").unwrap(), &5); // "hello" = 5
2281        assert_eq!(sizes_before.get("tenant2").unwrap(), &10); // "hello" + "world" = 10
2282
2283        // Evict - should remove "hello" from tenant2 as it's the oldest
2284        tree.evict_tenant_by_size(max_size);
2285
2286        tree.pretty_print();
2287
2288        let sizes_after = tree.get_used_size_per_tenant();
2289        assert_eq!(sizes_after.get("tenant1").unwrap(), &5); // Should be unchanged
2290        assert_eq!(sizes_after.get("tenant2").unwrap(), &5); // Only "world" remains
2291
2292        let (matched, tenant) = tree.prefix_match_legacy("world");
2293        assert_eq!(matched, "world");
2294        assert_eq!(tenant, "tenant2");
2295    }
2296
2297    #[test]
2298    fn test_advanced_eviction() {
2299        let tree = Tree::new();
2300
2301        // Set limits for each tenant
2302        let max_size: usize = 100;
2303
2304        // Define prefixes
2305        let prefixes = ["aqwefcisdf", "iajsdfkmade", "kjnzxcvewqe", "iejksduqasd"];
2306
2307        // Insert strings with shared prefixes
2308        for _i in 0..100 {
2309            for (j, prefix) in prefixes.iter().enumerate() {
2310                let random_suffix = random_string(10);
2311                let text = format!("{prefix}{random_suffix}");
2312                let tenant = format!("tenant{}", j + 1);
2313                tree.insert_text(&text, &tenant);
2314            }
2315        }
2316
2317        // Perform eviction
2318        tree.evict_tenant_by_size(max_size);
2319
2320        // Check sizes after eviction
2321        let sizes_after = tree.get_used_size_per_tenant();
2322        for (tenant, &size) in &sizes_after {
2323            assert!(
2324                size <= max_size,
2325                "Tenant {tenant} exceeds size limit. Current size: {size}, Limit: {max_size}"
2326            );
2327        }
2328    }
2329
2330    #[test]
2331    fn test_concurrent_operations_with_eviction() {
2332        // Ensure eviction works fine with concurrent insert and match operations for a given period
2333
2334        let tree = Arc::new(Tree::new());
2335        let mut handles = vec![];
2336        let test_duration = Duration::from_secs(10);
2337        let start_time = Instant::now();
2338        let max_size = 100; // Single max size for all tenants
2339
2340        // Spawn eviction thread
2341        {
2342            let tree = Arc::clone(&tree);
2343            let handle = thread::spawn(move || {
2344                while start_time.elapsed() < test_duration {
2345                    // Run eviction
2346                    tree.evict_tenant_by_size(max_size);
2347
2348                    // Sleep for 5 seconds
2349                    thread::sleep(Duration::from_secs(5));
2350                }
2351            });
2352            handles.push(handle);
2353        }
2354
2355        // Spawn 4 worker threads
2356        for thread_id in 0..4 {
2357            let tree = Arc::clone(&tree);
2358            let handle = thread::spawn(move || {
2359                let mut rng = rand::rng();
2360                let tenant = format!("tenant{}", thread_id + 1);
2361                let prefix = format!("prefix{thread_id}");
2362
2363                while start_time.elapsed() < test_duration {
2364                    // Random decision: match or insert (70% match, 30% insert)
2365                    if rng.random_bool(0.7) {
2366                        // Perform match operation
2367                        let random_len = rng.random_range(3..10);
2368                        let search_str = format!("{prefix}{}", random_string(random_len));
2369                        let (_matched, _) = tree.prefix_match_legacy(&search_str);
2370                    } else {
2371                        // Perform insert operation
2372                        let random_len = rng.random_range(5..15);
2373                        let insert_str = format!("{prefix}{}", random_string(random_len));
2374                        tree.insert_text(&insert_str, &tenant);
2375                        // println!("Thread {} inserted: {}", thread_id, insert_str);
2376                    }
2377
2378                    // Small random sleep to vary timing
2379                    thread::sleep(Duration::from_millis(rng.random_range(10..100)));
2380                }
2381            });
2382            handles.push(handle);
2383        }
2384
2385        // Wait for all threads to complete
2386        for handle in handles {
2387            handle.join().unwrap();
2388        }
2389
2390        // final eviction
2391        tree.evict_tenant_by_size(max_size);
2392
2393        // Final size check
2394        let final_sizes = tree.get_used_size_per_tenant();
2395        println!("Final sizes after test completion: {final_sizes:?}");
2396
2397        for &size in final_sizes.values() {
2398            assert!(
2399                size <= max_size,
2400                "Tenant exceeds size limit. Final size: {size}, Limit: {max_size}"
2401            );
2402        }
2403    }
2404
2405    #[test]
2406    fn test_leaf_of() {
2407        let tree = Tree::new();
2408
2409        // Helper to convert leaves to strings for easier assertion
2410        let leaves_as_strings =
2411            |leaves: &[TenantId]| -> Vec<String> { leaves.iter().map(|t| t.to_string()).collect() };
2412
2413        // Single node
2414        tree.insert_text("hello", "tenant1");
2415        let leaves = Tree::leaf_of(&tree.root.children.get(&'h').unwrap());
2416        assert_eq!(leaves_as_strings(&leaves), vec!["tenant1"]);
2417
2418        // Node with multiple tenants
2419        tree.insert_text("hello", "tenant2");
2420        let leaves = Tree::leaf_of(&tree.root.children.get(&'h').unwrap());
2421        let leaves_str = leaves_as_strings(&leaves);
2422        assert_eq!(leaves_str.len(), 2);
2423        assert!(leaves_str.contains(&"tenant1".to_string()));
2424        assert!(leaves_str.contains(&"tenant2".to_string()));
2425
2426        // Non-leaf node
2427        tree.insert_text("hi", "tenant1");
2428        let leaves = Tree::leaf_of(&tree.root.children.get(&'h').unwrap());
2429        assert!(leaves.is_empty());
2430    }
2431
2432    #[test]
2433    fn test_get_used_size_per_tenant() {
2434        let tree = Tree::new();
2435
2436        // Single tenant
2437        tree.insert_text("hello", "tenant1");
2438        tree.insert_text("world", "tenant1");
2439        let sizes = tree.get_used_size_per_tenant();
2440
2441        tree.pretty_print();
2442        println!("{sizes:?}");
2443        assert_eq!(sizes.get("tenant1").unwrap(), &10); // "hello" + "world"
2444
2445        // Multiple tenants sharing nodes
2446        tree.insert_text("hello", "tenant2");
2447        tree.insert_text("help", "tenant2");
2448        let sizes = tree.get_used_size_per_tenant();
2449
2450        tree.pretty_print();
2451        println!("{sizes:?}");
2452        assert_eq!(sizes.get("tenant1").unwrap(), &10);
2453        assert_eq!(sizes.get("tenant2").unwrap(), &6); // "hello" + "p"
2454
2455        // UTF-8 characters
2456        tree.insert_text("你好", "tenant3");
2457        let sizes = tree.get_used_size_per_tenant();
2458        tree.pretty_print();
2459        println!("{sizes:?}");
2460        assert_eq!(sizes.get("tenant3").unwrap(), &2); // 2 Chinese characters
2461
2462        tree.pretty_print();
2463    }
2464
2465    #[test]
2466    fn test_prefix_match_tenant() {
2467        let tree = Tree::new();
2468
2469        // Insert overlapping prefixes for different tenants
2470        tree.insert_text("hello", "tenant1"); // tenant1: hello
2471        tree.insert_text("hello", "tenant2"); // tenant2: hello
2472        tree.insert_text("hello world", "tenant2"); // tenant2: hello -> world
2473        tree.insert_text("help", "tenant1"); // tenant1: hel -> p
2474        tree.insert_text("helicopter", "tenant2"); // tenant2: hel -> icopter
2475
2476        assert_eq!(tree.prefix_match_tenant("hello", "tenant1"), "hello"); // Full match for tenant1
2477        assert_eq!(tree.prefix_match_tenant("help", "tenant1"), "help"); // Exclusive to tenant1
2478        assert_eq!(tree.prefix_match_tenant("hel", "tenant1"), "hel"); // Shared prefix
2479        assert_eq!(tree.prefix_match_tenant("hello world", "tenant1"), "hello"); // Should stop at tenant1's boundary
2480        assert_eq!(tree.prefix_match_tenant("helicopter", "tenant1"), "hel"); // Should stop at tenant1's boundary
2481
2482        assert_eq!(tree.prefix_match_tenant("hello", "tenant2"), "hello"); // Full match for tenant2
2483        assert_eq!(
2484            tree.prefix_match_tenant("hello world", "tenant2"),
2485            "hello world"
2486        ); // Exclusive to tenant2
2487        assert_eq!(
2488            tree.prefix_match_tenant("helicopter", "tenant2"),
2489            "helicopter"
2490        ); // Exclusive to tenant2
2491        assert_eq!(tree.prefix_match_tenant("hel", "tenant2"), "hel"); // Shared prefix
2492        assert_eq!(tree.prefix_match_tenant("help", "tenant2"), "hel"); // Should stop at tenant2's boundary
2493
2494        assert_eq!(tree.prefix_match_tenant("hello", "tenant3"), ""); // Non-existent tenant
2495        assert_eq!(tree.prefix_match_tenant("help", "tenant3"), ""); // Non-existent tenant
2496    }
2497
2498    // NOTE: test_simple_tenant_eviction and test_complex_tenant_eviction removed
2499    // because remove_tenant was removed (inefficient O(n) implementation).
2500    // TODO: Re-add these tests when efficient remove_tenant is implemented.
2501
2502    // ==================== Edge Case Tests ====================
2503
2504    #[test]
2505    fn test_empty_string_input() {
2506        let tree = Tree::new();
2507
2508        // Insert empty string
2509        tree.insert_text("", "tenant1");
2510
2511        // Match empty string
2512        let (matched, tenant) = tree.prefix_match_legacy("");
2513        assert_eq!(matched, "");
2514        assert_eq!(tenant, "tenant1");
2515
2516        // Insert non-empty, then match empty
2517        tree.insert_text("hello", "tenant2");
2518        let (matched, tenant) = tree.prefix_match_legacy("");
2519        assert_eq!(matched, "");
2520        assert_eq!(tenant, "tenant1");
2521    }
2522
2523    #[test]
2524    fn test_single_character_operations() {
2525        let tree = Tree::new();
2526
2527        // Insert single characters
2528        tree.insert_text("a", "tenant1");
2529        tree.insert_text("b", "tenant2");
2530        tree.insert_text("c", "tenant1");
2531
2532        let (matched, tenant) = tree.prefix_match_legacy("a");
2533        assert_eq!(matched, "a");
2534        assert_eq!(tenant, "tenant1");
2535
2536        let (matched, tenant) = tree.prefix_match_legacy("b");
2537        assert_eq!(matched, "b");
2538        assert_eq!(tenant, "tenant2");
2539
2540        // Match with longer string starting with single char
2541        let (matched, tenant) = tree.prefix_match_legacy("abc");
2542        assert_eq!(matched, "a");
2543        assert_eq!(tenant, "tenant1");
2544    }
2545
2546    #[test]
2547    fn test_prefix_is_subset_of_existing() {
2548        let tree = Tree::new();
2549
2550        // Insert longer string first
2551        tree.insert_text("application", "tenant1");
2552
2553        // Now insert prefix of existing
2554        tree.insert_text("app", "tenant2");
2555
2556        // Match the prefix - both tenants own "app" node
2557        let (matched, tenant) = tree.prefix_match_legacy("app");
2558        assert_eq!(matched, "app");
2559        assert!(tenant == "tenant1" || tenant == "tenant2");
2560
2561        // Match longer string
2562        let (matched, tenant) = tree.prefix_match_legacy("application");
2563        assert_eq!(matched, "application");
2564        assert_eq!(tenant, "tenant1");
2565
2566        // Match "apple" - matches "app" + "l" from the child node = "appl"
2567        // Then 'e' doesn't match 'i' in the remaining suffix, so stops at 4 chars
2568        let (matched, _tenant) = tree.prefix_match_legacy("apple");
2569        assert_eq!(matched, "appl");
2570    }
2571
2572    #[test]
2573    fn test_existing_is_prefix_of_new() {
2574        let tree = Tree::new();
2575
2576        // Insert shorter string first
2577        tree.insert_text("app", "tenant1");
2578
2579        // Now insert longer string with same prefix
2580        tree.insert_text("application", "tenant2");
2581
2582        let (matched, tenant) = tree.prefix_match_legacy("app");
2583        assert_eq!(matched, "app");
2584        assert!(tenant == "tenant1" || tenant == "tenant2");
2585
2586        let (matched, tenant) = tree.prefix_match_legacy("application");
2587        assert_eq!(matched, "application");
2588        assert_eq!(tenant, "tenant2");
2589
2590        // "applesauce" matches "app" + "l" from the child node = "appl"
2591        // Then 'e' in "esauce" doesn't match 'i' in the suffix, so matching stops
2592        let (matched, _tenant) = tree.prefix_match_legacy("applesauce");
2593        assert_eq!(matched, "appl");
2594    }
2595
2596    // ==================== prefix_match_with_counts Tests ====================
2597
2598    #[test]
2599    fn test_prefix_match_with_counts_accuracy() {
2600        let tree = Tree::new();
2601
2602        tree.insert_text("hello world", "tenant1");
2603
2604        // Exact match
2605        let result = tree.match_prefix_with_counts("hello world");
2606        assert_eq!(result.matched_char_count, 11);
2607        assert_eq!(result.input_char_count, 11);
2608        assert_eq!(&*result.tenant, "tenant1");
2609
2610        // Partial match
2611        let result = tree.match_prefix_with_counts("hello");
2612        assert_eq!(result.matched_char_count, 5);
2613        assert_eq!(result.input_char_count, 5);
2614
2615        // Extended match
2616        let result = tree.match_prefix_with_counts("hello world and more");
2617        assert_eq!(result.matched_char_count, 11);
2618        assert_eq!(result.input_char_count, 20);
2619
2620        // No match
2621        let result = tree.match_prefix_with_counts("goodbye");
2622        assert_eq!(result.matched_char_count, 0);
2623        assert_eq!(result.input_char_count, 7);
2624    }
2625
2626    #[test]
2627    fn test_prefix_match_with_counts_utf8() {
2628        let tree = Tree::new();
2629
2630        // UTF-8 string: 5 characters, more bytes
2631        tree.insert_text("你好世界呀", "tenant1");
2632
2633        let result = tree.match_prefix_with_counts("你好世界呀");
2634        assert_eq!(result.matched_char_count, 5);
2635        assert_eq!(result.input_char_count, 5);
2636
2637        let result = tree.match_prefix_with_counts("你好");
2638        assert_eq!(result.matched_char_count, 2);
2639        assert_eq!(result.input_char_count, 2);
2640
2641        // Mixed ASCII and UTF-8
2642        tree.insert_text("hello你好", "tenant2");
2643        let result = tree.match_prefix_with_counts("hello你好世界");
2644        assert_eq!(result.matched_char_count, 7); // "hello你好" = 7 chars
2645        assert_eq!(result.input_char_count, 9); // "hello你好世界" = 9 chars
2646    }
2647
2648    // ==================== Node Splitting Edge Cases ====================
2649
2650    #[test]
2651    fn test_split_at_first_character() {
2652        let tree = Tree::new();
2653
2654        // Insert "abc"
2655        tree.insert_text("abc", "tenant1");
2656
2657        // Insert "aXX" - should split at first char
2658        tree.insert_text("aXX", "tenant2");
2659
2660        let (matched, tenant) = tree.prefix_match_legacy("abc");
2661        assert_eq!(matched, "abc");
2662        assert_eq!(tenant, "tenant1");
2663
2664        let (matched, tenant) = tree.prefix_match_legacy("aXX");
2665        assert_eq!(matched, "aXX");
2666        assert_eq!(tenant, "tenant2");
2667
2668        let (matched, _) = tree.prefix_match_legacy("a");
2669        assert_eq!(matched, "a");
2670    }
2671
2672    #[test]
2673    fn test_split_at_last_character() {
2674        let tree = Tree::new();
2675
2676        // Insert "abcd"
2677        tree.insert_text("abcd", "tenant1");
2678
2679        // Insert "abcX" - should split at last char of shared prefix
2680        tree.insert_text("abcX", "tenant2");
2681
2682        let (matched, tenant) = tree.prefix_match_legacy("abcd");
2683        assert_eq!(matched, "abcd");
2684        assert_eq!(tenant, "tenant1");
2685
2686        let (matched, tenant) = tree.prefix_match_legacy("abcX");
2687        assert_eq!(matched, "abcX");
2688        assert_eq!(tenant, "tenant2");
2689
2690        let (matched, _) = tree.prefix_match_legacy("abc");
2691        assert_eq!(matched, "abc");
2692    }
2693
2694    #[test]
2695    fn test_multiple_splits_same_path() {
2696        let tree = Tree::new();
2697
2698        // Create a chain of splits
2699        tree.insert_text("abcdefgh", "tenant1");
2700        tree.insert_text("abcdef", "tenant2");
2701        tree.insert_text("abcd", "tenant3");
2702        tree.insert_text("ab", "tenant4");
2703
2704        // Verify all paths work
2705        assert_eq!(tree.prefix_match_legacy("abcdefgh").0, "abcdefgh");
2706        assert_eq!(tree.prefix_match_legacy("abcdef").0, "abcdef");
2707        assert_eq!(tree.prefix_match_legacy("abcd").0, "abcd");
2708        assert_eq!(tree.prefix_match_legacy("ab").0, "ab");
2709        assert_eq!(tree.prefix_match_legacy("a").0, "a");
2710    }
2711
2712    // ==================== High Contention Stress Tests ====================
2713
2714    #[test]
2715    fn test_high_contention_same_prefix() {
2716        let tree = Arc::new(Tree::new());
2717        let num_threads = 16;
2718        let ops_per_thread = 100;
2719        let mut handles = vec![];
2720
2721        // All threads operate on strings with same prefix
2722        for thread_id in 0..num_threads {
2723            let tree = Arc::clone(&tree);
2724            let handle = thread::spawn(move || {
2725                let tenant = format!("tenant{thread_id}");
2726                for i in 0..ops_per_thread {
2727                    let text = format!("shared_prefix_{i}");
2728                    tree.insert_text(&text, &tenant);
2729
2730                    // Immediately try to match
2731                    let (matched, _) = tree.prefix_match_legacy(&text);
2732                    assert!(
2733                        matched.starts_with("shared_prefix_"),
2734                        "Match should start with shared_prefix_"
2735                    );
2736                }
2737            });
2738            handles.push(handle);
2739        }
2740
2741        for handle in handles {
2742            handle.join().expect("Thread panicked");
2743        }
2744
2745        // Verify tree is still consistent
2746        let sizes = tree.get_used_size_per_tenant();
2747        assert!(!sizes.is_empty(), "Tree should have entries");
2748    }
2749
2750    // NOTE: test_rapid_insert_remove_cycles removed because remove_tenant was removed.
2751    // TODO: Re-add this test when efficient remove_tenant is implemented.
2752
2753    // ==================== ASCII/UTF-8 Consistency Tests ====================
2754
2755    #[test]
2756    fn test_ascii_utf8_consistency() {
2757        let tree = Tree::new();
2758
2759        // Insert ASCII
2760        tree.insert_text("hello", "tenant1");
2761
2762        // Insert UTF-8 with same logical prefix (none)
2763        tree.insert_text("你好", "tenant2");
2764
2765        // Insert mixed
2766        tree.insert_text("hello你好", "tenant3");
2767
2768        // All should be retrievable
2769        assert_eq!(tree.prefix_match_legacy("hello").0, "hello");
2770        assert_eq!(tree.prefix_match_legacy("你好").0, "你好");
2771        assert_eq!(tree.prefix_match_legacy("hello你好").0, "hello你好");
2772
2773        // Counts should be correct
2774        let result = tree.match_prefix_with_counts("hello");
2775        assert_eq!(result.matched_char_count, 5);
2776        assert_eq!(result.input_char_count, 5);
2777
2778        let result = tree.match_prefix_with_counts("你好");
2779        assert_eq!(result.matched_char_count, 2);
2780        assert_eq!(result.input_char_count, 2);
2781
2782        let result = tree.match_prefix_with_counts("hello你好");
2783        assert_eq!(result.matched_char_count, 7);
2784        assert_eq!(result.input_char_count, 7);
2785    }
2786
2787    #[test]
2788    fn test_emoji_handling() {
2789        let tree = Tree::new();
2790
2791        // Emoji are multi-byte UTF-8
2792        tree.insert_text("hello 👋", "tenant1");
2793        tree.insert_text("hello 👋🌍", "tenant2");
2794
2795        let (matched, tenant) = tree.prefix_match_legacy("hello 👋");
2796        assert_eq!(matched, "hello 👋");
2797        assert_eq!(tenant, "tenant1");
2798
2799        let (matched, tenant) = tree.prefix_match_legacy("hello 👋🌍");
2800        assert_eq!(matched, "hello 👋🌍");
2801        assert_eq!(tenant, "tenant2");
2802
2803        // Verify char count (not byte count)
2804        let result = tree.match_prefix_with_counts("hello 👋");
2805        assert_eq!(result.matched_char_count, 7);
2806        assert_eq!(result.input_char_count, 7); // h-e-l-l-o-space-emoji
2807    }
2808
2809    // ==================== Eviction Edge Cases ====================
2810
2811    #[test]
2812    fn test_eviction_empty_tree() {
2813        let tree = Tree::new();
2814
2815        // Should not panic on empty tree
2816        tree.evict_tenant_by_size(100);
2817
2818        let sizes = tree.get_used_size_per_tenant();
2819        assert!(sizes.is_empty());
2820    }
2821
2822    #[test]
2823    fn test_eviction_zero_max_size() {
2824        let tree = Tree::new();
2825
2826        tree.insert_text("hello", "tenant1");
2827        tree.insert_text("world", "tenant1");
2828
2829        // Evict with max_size = 0 should remove everything
2830        tree.evict_tenant_by_size(0);
2831
2832        let sizes = tree.get_used_size_per_tenant();
2833        assert!(
2834            sizes.is_empty() || sizes.values().all(|&v| v == 0),
2835            "All tenants should be evicted or have zero size"
2836        );
2837    }
2838
2839    #[test]
2840    fn test_eviction_single_tenant_all_entries() {
2841        let tree = Tree::new();
2842
2843        // Insert many entries for single tenant
2844        for i in 0..100 {
2845            let text = format!("entry{i:03}");
2846            tree.insert_text(&text, "tenant1");
2847        }
2848
2849        let initial_size = *tree.get_used_size_per_tenant().get("tenant1").unwrap();
2850        assert!(initial_size > 50, "Should have significant size");
2851
2852        // Evict to small size
2853        tree.evict_tenant_by_size(50);
2854
2855        let final_size = *tree.get_used_size_per_tenant().get("tenant1").unwrap_or(&0);
2856        assert!(
2857            final_size <= 50,
2858            "Size {final_size} should be <= 50 after eviction"
2859        );
2860    }
2861
2862    // ==================== Last Tenant Cache Tests ====================
2863
2864    #[test]
2865    fn test_last_tenant_cache_update() {
2866        let tree = Tree::new();
2867
2868        // Insert for tenant1
2869        tree.insert_text("hello", "tenant1");
2870
2871        // First match should return tenant1
2872        let (_, tenant) = tree.prefix_match_legacy("hello");
2873        assert_eq!(tenant, "tenant1");
2874
2875        // Insert for tenant2 on same path
2876        tree.insert_text("hello", "tenant2");
2877
2878        // Match again - should still work (cache or iteration)
2879        let (matched, _) = tree.prefix_match_legacy("hello");
2880        assert_eq!(matched, "hello");
2881    }
2882
2883    // NOTE: test_stale_cache_after_tenant_removal removed because remove_tenant was removed.
2884    // TODO: Re-add this test when efficient remove_tenant is implemented.
2885
2886    // ==================== Consistency Verification Tests ====================
2887
2888    #[test]
2889    fn test_char_count_consistency_after_operations() {
2890        let tree = Tree::new();
2891
2892        // Helper to verify consistency
2893        let verify_consistency = |tree: &Tree| {
2894            let maintained = get_maintained_counts(tree);
2895            let computed = tree.get_used_size_per_tenant();
2896            assert_eq!(
2897                maintained, computed,
2898                "Maintained counts should match computed counts"
2899            );
2900        };
2901
2902        // Insert phase
2903        for i in 0..50 {
2904            tree.insert_text(&format!("prefix{i}"), "tenant1");
2905            tree.insert_text(&format!("other{i}"), "tenant2");
2906        }
2907        verify_consistency(&tree);
2908
2909        // Overlapping inserts
2910        for i in 0..25 {
2911            tree.insert_text(&format!("prefix{i}"), "tenant2");
2912        }
2913        verify_consistency(&tree);
2914
2915        // Eviction
2916        tree.evict_tenant_by_size(100);
2917        verify_consistency(&tree);
2918
2919        // NOTE: Tenant removal test removed because remove_tenant was removed.
2920        // TODO: Re-add when efficient remove_tenant is implemented.
2921    }
2922
2923    #[test]
2924    fn test_tree_structure_integrity_after_stress() {
2925        let tree = Arc::new(Tree::new());
2926        let num_threads = 8;
2927        let mut handles = vec![];
2928
2929        for thread_id in 0..num_threads {
2930            let tree = Arc::clone(&tree);
2931            let handle = thread::spawn(move || {
2932                let mut rng = rand::rng();
2933                let tenant = format!("tenant{thread_id}");
2934
2935                for _ in 0..200 {
2936                    let op: u8 = rng.random_range(0..10);
2937                    let key = format!("key{}", rng.random_range(0..50));
2938
2939                    match op {
2940                        0..=6 => {
2941                            // Insert (70%)
2942                            tree.insert_text(&key, &tenant);
2943                        }
2944                        7..=8 => {
2945                            // Match (20%)
2946                            let _ = tree.prefix_match_legacy(&key);
2947                        }
2948                        _ => {
2949                            // Match with counts (10%)
2950                            let _ = tree.match_prefix_with_counts(&key);
2951                        }
2952                    }
2953                }
2954            });
2955            handles.push(handle);
2956        }
2957
2958        for handle in handles {
2959            handle.join().expect("Thread panicked during stress test");
2960        }
2961
2962        // Verify tree is still functional
2963        let sizes = tree.get_used_size_per_tenant();
2964        for (tenant, size) in &sizes {
2965            assert!(*size > 0, "Tenant {tenant} should have positive size");
2966        }
2967
2968        // Verify char count consistency
2969        let maintained = get_maintained_counts(&tree);
2970        let computed = tree.get_used_size_per_tenant();
2971        assert_eq!(
2972            maintained, computed,
2973            "Counts should be consistent after stress test"
2974        );
2975    }
2976
2977    // ==================== Boundary Condition Tests ====================
2978
2979    #[test]
2980    fn test_very_long_strings() {
2981        let tree = Tree::new();
2982
2983        // Create a very long string (10KB)
2984        let long_string: String = (0..10000)
2985            .map(|i| ((i % 26) as u8 + b'a') as char)
2986            .collect();
2987
2988        tree.insert_text(&long_string, "tenant1");
2989
2990        let (matched, tenant) = tree.prefix_match_legacy(&long_string);
2991        assert_eq!(matched.len(), long_string.len());
2992        assert_eq!(tenant, "tenant1");
2993
2994        // Partial match of long string
2995        let partial = &long_string[..5000];
2996        let (matched, _) = tree.prefix_match_legacy(partial);
2997        assert_eq!(matched, partial);
2998    }
2999
3000    #[test]
3001    fn test_many_tenants_same_path() {
3002        let tree = Tree::new();
3003
3004        // 100 tenants all insert same string
3005        for i in 0..100 {
3006            tree.insert_text("shared_path", &format!("tenant{i}"));
3007        }
3008
3009        // Match should return one of them
3010        let (matched, _) = tree.prefix_match_legacy("shared_path");
3011        assert_eq!(matched, "shared_path");
3012
3013        // Verify all tenants are tracked
3014        let sizes = tree.get_used_size_per_tenant();
3015        assert_eq!(sizes.len(), 100, "Should have 100 tenants");
3016    }
3017
3018    #[test]
3019    fn test_special_characters() {
3020        let tree = Tree::new();
3021
3022        // Various special characters
3023        let test_cases = vec![
3024            ("hello\nworld", "tenant1"),      // newline
3025            ("hello\tworld", "tenant2"),      // tab
3026            ("hello\0world", "tenant3"),      // null byte
3027            ("hello\u{A0}world", "tenant4"),  // non-breaking space
3028            ("path/to/file", "tenant5"),      // slashes
3029            ("query?param=value", "tenant6"), // URL-like
3030        ];
3031
3032        for (text, tenant) in &test_cases {
3033            tree.insert_text(text, tenant);
3034        }
3035
3036        for (text, tenant) in &test_cases {
3037            let (matched, matched_tenant) = tree.prefix_match_legacy(text);
3038            assert_eq!(matched, *text, "Failed for: {text:?}");
3039            assert_eq!(matched_tenant, *tenant);
3040        }
3041    }
3042
3043    // ── Snapshot tests ──────────────────────────────────────────────
3044
3045    #[test]
3046    fn test_snapshot_empty_tree() {
3047        let tree = Tree::new();
3048        let snap = tree.snapshot();
3049        // Root node always present
3050        assert_eq!(snap.node_count(), 1);
3051    }
3052
3053    #[test]
3054    fn test_snapshot_round_trip_single_entry() {
3055        let tree = Tree::new();
3056        tree.insert_text("Hello world", "worker-1");
3057
3058        let snap = tree.snapshot();
3059        assert!(snap.node_count() > 0);
3060
3061        let restored = Tree::from_snapshot(&snap);
3062        let result = restored.match_prefix_with_counts("Hello world");
3063        assert_eq!(result.matched_char_count, 11);
3064        assert_eq!(result.tenant.as_ref(), "worker-1");
3065    }
3066
3067    #[test]
3068    fn test_snapshot_round_trip_shared_prefixes() {
3069        let tree = Tree::new();
3070        tree.insert_text("Hello world", "worker-1");
3071        tree.insert_text("Hello there", "worker-2");
3072        tree.insert_text("Goodbye", "worker-3");
3073
3074        let snap = tree.snapshot();
3075
3076        let restored = Tree::from_snapshot(&snap);
3077
3078        // "Hello " is shared prefix — both "world" and "there" branch from it
3079        let r1 = restored.match_prefix_with_counts("Hello world");
3080        assert_eq!(r1.matched_char_count, 11);
3081        assert_eq!(r1.tenant.as_ref(), "worker-1");
3082
3083        let r2 = restored.match_prefix_with_counts("Hello there");
3084        assert_eq!(r2.matched_char_count, 11);
3085        assert_eq!(r2.tenant.as_ref(), "worker-2");
3086
3087        let r3 = restored.match_prefix_with_counts("Goodbye");
3088        assert_eq!(r3.matched_char_count, 7);
3089        assert_eq!(r3.tenant.as_ref(), "worker-3");
3090    }
3091
3092    #[test]
3093    fn test_snapshot_size_vs_flat_ops() {
3094        let tree = Tree::new();
3095        // Insert 100 entries sharing a long prefix
3096        let prefix = "A".repeat(10000);
3097        for i in 0..100 {
3098            let text = format!("{prefix}_{i}");
3099            tree.insert_text(&text, &format!("worker-{i}"));
3100        }
3101
3102        let snap = tree.snapshot();
3103        let snap_bytes = snap.to_bytes().unwrap();
3104
3105        // Flat ops would be 100 × 10000 chars = ~1 MB
3106        // Snapshot should be much smaller (shared prefix stored once)
3107        let flat_size: usize = (0..100).map(|i| format!("{prefix}_{i}").len()).sum();
3108
3109        assert!(
3110            snap_bytes.len() < flat_size / 2,
3111            "Snapshot ({} bytes) should be at least 2x smaller than flat ops ({} bytes)",
3112            snap_bytes.len(),
3113            flat_size
3114        );
3115    }
3116
3117    #[test]
3118    fn test_snapshot_bincode_round_trip() {
3119        let tree = Tree::new();
3120        tree.insert_text("Hello world", "worker-1");
3121        tree.insert_text("Hello there", "worker-2");
3122
3123        let snap = tree.snapshot();
3124        let bytes = snap.to_bytes().unwrap();
3125        let restored_snap = crate::snapshot::TreeSnapshot::from_bytes(&bytes).unwrap();
3126
3127        let restored = Tree::from_snapshot(&restored_snap);
3128        let r = restored.match_prefix_with_counts("Hello world");
3129        assert_eq!(r.matched_char_count, 11);
3130    }
3131
3132    #[test]
3133    fn test_merge_disjoint_trees() {
3134        let tree1 = Tree::new();
3135        tree1.insert_text("Hello", "worker-1");
3136
3137        let tree2 = Tree::new();
3138        tree2.insert_text("Goodbye", "worker-2");
3139
3140        let snap2 = tree2.snapshot();
3141        tree1.merge_snapshot(&snap2);
3142
3143        // tree1 should now have both entries
3144        let r1 = tree1.match_prefix_with_counts("Hello");
3145        assert_eq!(r1.matched_char_count, 5);
3146
3147        let r2 = tree1.match_prefix_with_counts("Goodbye");
3148        assert_eq!(r2.matched_char_count, 7);
3149        assert_eq!(r2.tenant.as_ref(), "worker-2");
3150    }
3151
3152    #[test]
3153    fn test_merge_overlapping_trees() {
3154        let tree1 = Tree::new();
3155        tree1.insert_text("Hello world", "worker-1");
3156
3157        let tree2 = Tree::new();
3158        tree2.insert_text("Hello there", "worker-2");
3159
3160        let snap2 = tree2.snapshot();
3161        tree1.merge_snapshot(&snap2);
3162
3163        // tree1 should have both branches under "Hello "
3164        let r1 = tree1.match_prefix_with_counts("Hello world");
3165        assert_eq!(r1.matched_char_count, 11);
3166
3167        let r2 = tree1.match_prefix_with_counts("Hello there");
3168        assert_eq!(r2.matched_char_count, 11);
3169        assert_eq!(r2.tenant.as_ref(), "worker-2");
3170    }
3171
3172    #[test]
3173    fn test_iter_entries_empty_tree() {
3174        let tree = Tree::new();
3175        let entries: Vec<_> = tree.iter_entries().collect();
3176        assert!(entries.is_empty());
3177    }
3178
3179    #[test]
3180    fn test_iter_entries_single_insert() {
3181        let tree = Tree::new();
3182        tree.insert_text("hello", "worker-1");
3183        let entries: Vec<_> = tree
3184            .iter_entries()
3185            .map(|(p, ts)| (p, ts.into_iter().map(|(t, _)| t).collect::<Vec<_>>()))
3186            .collect();
3187        // Worker registration would also tag root with the tenant,
3188        // but we only call insert_text. Root has no tenants here;
3189        // the leaf "hello" carries `worker-1`.
3190        let leaf = entries
3191            .iter()
3192            .find(|(p, _)| p == "hello")
3193            .expect("leaf with tenants");
3194        assert_eq!(leaf.1.len(), 1);
3195        assert_eq!(leaf.1[0].as_ref(), "worker-1");
3196    }
3197
3198    #[test]
3199    fn test_iter_entries_pre_order_and_split_nodes() {
3200        // Two inserts that share a prefix produce a split node.
3201        // The radix tree registers tenants at root and copies the
3202        // tenant map to intermediate split nodes, so every level
3203        // along the shared path emits an entry. Pre-order means
3204        // "" before "Hello " before its leaves; deterministic
3205        // char order gives "Hello there" before "Hello world"
3206        // (t < w).
3207        let tree = Tree::new();
3208        tree.insert_text("Hello world", "w1");
3209        tree.insert_text("Hello there", "w2");
3210
3211        let paths: Vec<String> = tree.iter_entries().map(|(p, _)| p).collect();
3212        assert_eq!(paths, vec!["", "Hello ", "Hello there", "Hello world"],);
3213    }
3214
3215    #[test]
3216    fn test_iter_entries_round_trips_via_snapshot() {
3217        // The walker yields the same `(path, tenants)` set that
3218        // building a tree from a snapshot would round-trip.
3219        let tree = Tree::new();
3220        tree.insert_text("alpha", "w1");
3221        tree.insert_text("alpha-beta", "w2");
3222        tree.insert_text("zulu", "w3");
3223
3224        let mut from_iter: Vec<(String, Vec<String>)> = tree
3225            .iter_entries()
3226            .map(|(p, ts)| (p, ts.into_iter().map(|(t, _)| t.to_string()).collect()))
3227            .collect();
3228        from_iter.sort();
3229
3230        let restored = Tree::from_snapshot(&tree.snapshot());
3231        let mut from_restored: Vec<(String, Vec<String>)> = restored
3232            .iter_entries()
3233            .map(|(p, ts)| (p, ts.into_iter().map(|(t, _)| t.to_string()).collect()))
3234            .collect();
3235        from_restored.sort();
3236
3237        assert_eq!(from_iter, from_restored);
3238    }
3239
3240    #[test]
3241    fn test_iter_entries_preserves_utf8_paths() {
3242        // Multi-byte chars must not produce torn paths when the
3243        // walker pops a frame. Path is truncated by byte length
3244        // — safe because each descent pushes a complete `&str`,
3245        // so popping the same byte count always lands on a UTF-8
3246        // char boundary (`String::truncate` would panic if not).
3247        // Also exercises the split-node case where the shared
3248        // "你好" prefix node emits with a clean multi-byte path.
3249        let tree = Tree::new();
3250        tree.insert_text("你好世界", "w1");
3251        tree.insert_text("你好朋友", "w2");
3252
3253        let mut paths: Vec<String> = tree.iter_entries().map(|(p, _)| p).collect();
3254        paths.sort();
3255        assert_eq!(paths, vec!["", "你好", "你好世界", "你好朋友"]);
3256    }
3257
3258    /// Run the same op sequence two ways — `match_prefix_with_counts` +
3259    /// `insert_text` (legacy pair) vs the fused `match_and_insert` — and assert
3260    /// the match counts and resulting per-tenant char counts agree. Timestamps
3261    /// and the probabilistic tenant cache are not compared.
3262    fn assert_fused_matches_pair(ops: &[(&str, &str)]) {
3263        let pair = Tree::new();
3264        let fused = Tree::new();
3265        for (text, tenant) in ops {
3266            let r_pair = pair.match_prefix_with_counts(text);
3267            pair.insert_text(text, tenant);
3268
3269            let r_fused = fused.match_and_insert(text, tenant);
3270
3271            assert_eq!(
3272                r_pair.matched_char_count, r_fused.matched_char_count,
3273                "matched_char_count mismatch for tenant {tenant}"
3274            );
3275            assert_eq!(
3276                r_pair.input_char_count, r_fused.input_char_count,
3277                "input_char_count mismatch"
3278            );
3279            assert_eq!(
3280                r_pair.tenant.as_ref(),
3281                r_fused.tenant.as_ref(),
3282                "matched tenant mismatch for input {text:?}"
3283            );
3284        }
3285        assert_eq!(
3286            pair.get_tenant_char_count(),
3287            fused.get_tenant_char_count(),
3288            "tenant char counts diverged between pair and fused"
3289        );
3290    }
3291
3292    #[test]
3293    fn test_string_match_and_insert_equiv_basic() {
3294        // Note: every query here either misses on an empty tree (resolves to the
3295        // synthetic "empty") or falls off on a node whose `last_tenant` is
3296        // single-valued, so the (otherwise approximate) tenant pick is
3297        // deterministic and comparable between the two construction paths.
3298        assert_fused_matches_pair(&[
3299            ("hello world", "w1"), // fresh leaf
3300            ("hello world", "w1"), // exact re-insert (full match, loop-end)
3301            ("hello there", "w2"), // shared "hello " prefix -> split
3302            ("hello", "w3"),       // prefix of "hello " split node
3303        ]);
3304    }
3305
3306    #[test]
3307    fn test_string_match_and_insert_equiv_deep_chain() {
3308        // Build a multi-node chain ("abc" -> {"def","xyz"}), then a query that
3309        // FULL-matches several nodes before falling off — exercising the fused
3310        // path's ancestor re-attach (`path`) plus a deep `insert_from` splice.
3311        assert_fused_matches_pair(&[
3312            ("abcdef", "w1"),    // leaf "abcdef"
3313            ("abcxyz", "w2"),    // split -> "abc" + "def"/"xyz"
3314            ("abcdefghi", "w1"), // full-match "abc"+"def", then append "ghi"
3315            ("abcdefghi", "w3"), // full-match whole chain for a new tenant
3316        ]);
3317    }
3318
3319    #[test]
3320    fn test_string_match_and_insert_equiv_empty_and_unicode() {
3321        assert_fused_matches_pair(&[
3322            ("", "w1"),         // empty text -> tenant attached at root
3323            ("你好世界", "w1"), // multi-byte fresh
3324            ("你好朋友", "w2"), // multi-byte shared-prefix split
3325        ]);
3326    }
3327
3328    #[test]
3329    fn test_string_match_and_insert_with_select_and_skip() {
3330        let text = "the quick brown fox";
3331
3332        let via_with = Tree::new();
3333        via_with.match_and_insert_with(text, |_| Some("w1"));
3334        let via_plain = Tree::new();
3335        via_plain.match_and_insert(text, "w1");
3336        assert_eq!(
3337            via_with.get_tenant_char_count(),
3338            via_plain.get_tenant_char_count()
3339        );
3340
3341        // None selection must leave the tree untouched.
3342        let tree = Tree::new();
3343        tree.insert_text(text, "w1");
3344        let before = tree.get_tenant_char_count();
3345        let r = tree.match_and_insert_with(text, |_| None);
3346        assert_eq!(r.matched_char_count, text.chars().count());
3347        assert_eq!(
3348            tree.get_tenant_char_count(),
3349            before,
3350            "None selection must not insert"
3351        );
3352    }
3353}