sochdb_core/
path_trie.rs

1// Copyright 2025 Sushanth (https://github.com/sushanthpy)
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Path Trie for TOON Document Path Resolution (Task 4)
16//!
17//! Implements O(|path|) path resolution for nested TOON documents,
18//! regardless of schema size.
19//!
20//! ## Performance Analysis
21//!
22//! ```text
23//! Traditional BTreeMap: O(d × log F) where d=depth, F=fields
24//! Path Trie:           O(d) where d=path depth
25//!
26//! For path="users.profile.settings.theme" (d=4) with F=100 fields:
27//! - BTreeMap: 4 × log(100) ≈ 26.6 comparisons
28//! - PathTrie: 4 lookups = 4 hash lookups
29//! ```
30//!
31//! ## Memory Model
32//!
33//! Memory: O(N × avg_path_length) where N = total columns
34//! For 100 tables × 50 cols × 3 levels = 15,000 nodes × ~100 bytes = 1.5MB
35
36use dashmap::DashMap;
37use serde::{Deserialize, Serialize};
38use std::collections::HashMap;
39use std::sync::Arc;
40use std::sync::atomic::{AtomicU32, AtomicU64, Ordering};
41
42/// Column identifier (matches storage layer)
43pub type ColumnId = u32;
44
45/// A node in the path trie
46#[derive(Debug, Clone, Serialize, Deserialize)]
47pub struct TrieNode {
48    /// Column ID if this path is a leaf (terminal column)
49    pub column_id: Option<ColumnId>,
50    /// Column type for leaf nodes
51    pub column_type: Option<ColumnType>,
52    /// Children nodes keyed by path segment
53    pub children: HashMap<String, Box<TrieNode>>,
54}
55
56impl TrieNode {
57    /// Create an empty node
58    pub fn new() -> Self {
59        Self {
60            column_id: None,
61            column_type: None,
62            children: HashMap::new(),
63        }
64    }
65
66    /// Create a leaf node with column information
67    pub fn leaf(column_id: ColumnId, column_type: ColumnType) -> Self {
68        Self {
69            column_id: Some(column_id),
70            column_type: Some(column_type),
71            children: HashMap::new(),
72        }
73    }
74
75    /// Check if this is a leaf node
76    pub fn is_leaf(&self) -> bool {
77        self.column_id.is_some()
78    }
79
80    /// Count total nodes in subtree
81    pub fn count_nodes(&self) -> usize {
82        1 + self
83            .children
84            .values()
85            .map(|c| c.count_nodes())
86            .sum::<usize>()
87    }
88}
89
90impl Default for TrieNode {
91    fn default() -> Self {
92        Self::new()
93    }
94}
95
96/// Column type for physical storage optimization
97#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
98pub enum ColumnType {
99    /// Boolean (1 byte, SIMD groupable)
100    Bool,
101    /// Signed 64-bit integer (8 bytes, SIMD groupable)
102    Int64,
103    /// Unsigned 64-bit integer (8 bytes, SIMD groupable)
104    UInt64,
105    /// 64-bit float (8 bytes, SIMD groupable)
106    Float64,
107    /// Variable-length text
108    Text,
109    /// Variable-length binary
110    Binary,
111    /// Timestamp (8 bytes, SIMD groupable)
112    Timestamp,
113    /// Nested TOON document
114    Nested,
115    /// Array of values
116    Array,
117}
118
119impl ColumnType {
120    /// Check if type is fixed-size (for SIMD optimization)
121    pub fn is_fixed_size(&self) -> bool {
122        matches!(
123            self,
124            ColumnType::Bool
125                | ColumnType::Int64
126                | ColumnType::UInt64
127                | ColumnType::Float64
128                | ColumnType::Timestamp
129        )
130    }
131
132    /// Get fixed size in bytes (None for variable-length)
133    pub fn fixed_size(&self) -> Option<usize> {
134        match self {
135            ColumnType::Bool => Some(1),
136            ColumnType::Int64
137            | ColumnType::UInt64
138            | ColumnType::Float64
139            | ColumnType::Timestamp => Some(8),
140            _ => None,
141        }
142    }
143}
144
145/// Path Trie for O(|path|) TOON path resolution
146#[derive(Debug, Clone, Serialize, Deserialize)]
147pub struct PathTrie {
148    /// Root node
149    root: TrieNode,
150    /// Total number of columns registered
151    total_columns: u32,
152    /// Next column ID to assign
153    next_column_id: ColumnId,
154}
155
156impl PathTrie {
157    /// Create a new empty trie
158    pub fn new() -> Self {
159        Self {
160            root: TrieNode::new(),
161            total_columns: 0,
162            next_column_id: 0,
163        }
164    }
165
166    /// Insert a path and get its column ID
167    ///
168    /// Path format: "users.profile.settings.theme"
169    /// Returns the assigned column ID
170    pub fn insert(&mut self, path: &str, column_type: ColumnType) -> ColumnId {
171        let segments: Vec<&str> = path.split('.').collect();
172        let column_id = self.next_column_id;
173        self.next_column_id += 1;
174
175        let mut current = &mut self.root;
176
177        for (i, segment) in segments.iter().enumerate() {
178            let is_last = i == segments.len() - 1;
179
180            current = current
181                .children
182                .entry(segment.to_string())
183                .or_insert_with(|| Box::new(TrieNode::new()));
184
185            if is_last {
186                current.column_id = Some(column_id);
187                current.column_type = Some(column_type);
188            }
189        }
190
191        self.total_columns += 1;
192        column_id
193    }
194
195    /// Resolve a path to its column ID in O(|path|) time
196    ///
197    /// Returns None if path doesn't exist
198    pub fn resolve(&self, path: &str) -> Option<ColumnId> {
199        let segments: Vec<&str> = path.split('.').collect();
200        let mut current = &self.root;
201
202        for segment in segments {
203            current = current.children.get(segment)?;
204        }
205
206        current.column_id
207    }
208
209    /// Resolve with type information
210    pub fn resolve_with_type(&self, path: &str) -> Option<(ColumnId, ColumnType)> {
211        let segments: Vec<&str> = path.split('.').collect();
212        let mut current = &self.root;
213
214        for segment in segments {
215            current = current.children.get(segment)?;
216        }
217
218        Some((current.column_id?, current.column_type?))
219    }
220
221    /// Get all paths that start with a prefix
222    ///
223    /// Useful for wildcard queries like "users.profile.*"
224    pub fn prefix_match(&self, prefix: &str) -> Vec<(String, ColumnId)> {
225        let mut results = Vec::new();
226
227        // Navigate to prefix node
228        let segments: Vec<&str> = if prefix.is_empty() {
229            vec![]
230        } else {
231            prefix.split('.').collect()
232        };
233
234        let mut current = &self.root;
235        for segment in &segments {
236            if let Some(child) = current.children.get(*segment) {
237                current = child;
238            } else {
239                return results;
240            }
241        }
242
243        // Collect all paths under this node
244        self.collect_paths(current, prefix.to_string(), &mut results);
245        results
246    }
247
248    #[allow(clippy::only_used_in_recursion)]
249    fn collect_paths(&self, node: &TrieNode, path: String, results: &mut Vec<(String, ColumnId)>) {
250        if let Some(col_id) = node.column_id {
251            results.push((path.clone(), col_id));
252        }
253
254        for (segment, child) in &node.children {
255            let child_path = if path.is_empty() {
256                segment.clone()
257            } else {
258                format!("{}.{}", path, segment)
259            };
260            self.collect_paths(child, child_path, results);
261        }
262    }
263
264    /// Get total number of columns
265    pub fn total_columns(&self) -> u32 {
266        self.total_columns
267    }
268
269    /// Get total number of nodes (memory usage indicator)
270    pub fn total_nodes(&self) -> usize {
271        self.root.count_nodes()
272    }
273
274    /// Estimate memory usage in bytes
275    pub fn memory_bytes(&self) -> usize {
276        // Rough estimate: ~100 bytes per node (HashMap overhead + strings)
277        self.total_nodes() * 100
278    }
279}
280
281impl Default for PathTrie {
282    fn default() -> Self {
283        Self::new()
284    }
285}
286
287/// Column group affinity for compression optimization
288///
289/// Groups columns by:
290/// 1. Type compatibility (all Int64 together for SIMD)
291/// 2. Access correlation (co-accessed columns together)
292/// 3. Null density (sparse columns separate)
293#[derive(Debug, Clone)]
294pub struct ColumnGroupAffinity {
295    /// Columns grouped by type for SIMD optimization
296    pub type_groups: HashMap<ColumnType, Vec<ColumnId>>,
297    /// Access frequency per column (for hot/cold separation)
298    pub access_frequency: HashMap<ColumnId, u64>,
299    /// Null density per column (0.0 = never null, 1.0 = always null)
300    pub null_density: HashMap<ColumnId, f64>,
301}
302
303impl ColumnGroupAffinity {
304    /// Create from a path trie
305    pub fn from_trie(trie: &PathTrie) -> Self {
306        let mut type_groups: HashMap<ColumnType, Vec<ColumnId>> = HashMap::new();
307
308        fn collect_columns(node: &TrieNode, groups: &mut HashMap<ColumnType, Vec<ColumnId>>) {
309            if let (Some(col_id), Some(col_type)) = (node.column_id, node.column_type) {
310                groups.entry(col_type).or_default().push(col_id);
311            }
312            for child in node.children.values() {
313                collect_columns(child, groups);
314            }
315        }
316
317        collect_columns(&trie.root, &mut type_groups);
318
319        Self {
320            type_groups,
321            access_frequency: HashMap::new(),
322            null_density: HashMap::new(),
323        }
324    }
325
326    /// Record a column access (for access correlation)
327    pub fn record_access(&mut self, column_id: ColumnId) {
328        *self.access_frequency.entry(column_id).or_insert(0) += 1;
329    }
330
331    /// Update null density for a column
332    pub fn update_null_density(&mut self, column_id: ColumnId, null_count: u64, total_count: u64) {
333        if total_count > 0 {
334            self.null_density
335                .insert(column_id, null_count as f64 / total_count as f64);
336        }
337    }
338
339    /// Get columns suitable for SIMD processing (fixed-size, same type)
340    pub fn simd_groups(&self) -> Vec<(ColumnType, Vec<ColumnId>)> {
341        self.type_groups
342            .iter()
343            .filter(|(t, _)| t.is_fixed_size())
344            .map(|(t, cols)| (*t, cols.clone()))
345            .collect()
346    }
347
348    /// Get sparse columns (null_density > threshold)
349    pub fn sparse_columns(&self, threshold: f64) -> Vec<ColumnId> {
350        self.null_density
351            .iter()
352            .filter(|(_, density)| **density > threshold)
353            .map(|(col_id, _)| *col_id)
354            .collect()
355    }
356
357    /// Get hot columns (top N by access frequency)
358    pub fn hot_columns(&self, n: usize) -> Vec<ColumnId> {
359        let mut sorted: Vec<_> = self.access_frequency.iter().collect();
360        sorted.sort_by(|a, b| b.1.cmp(a.1));
361        sorted
362            .into_iter()
363            .take(n)
364            .map(|(col_id, _)| *col_id)
365            .collect()
366    }
367}
368
369// ============================================================================
370// CONCURRENT PATH TRIE WITH EPOCH-BASED RECLAMATION
371// ============================================================================
372
373/// A node in the concurrent path trie using epoch-based reclamation
374#[derive(Debug)]
375pub struct ConcurrentTrieNode {
376    /// Column ID if this path is a leaf (terminal column)
377    pub column_id: Option<ColumnId>,
378    /// Column type for leaf nodes
379    pub column_type: Option<ColumnType>,
380    /// Children nodes keyed by path segment (lock-free via DashMap)
381    pub children: DashMap<String, Arc<ConcurrentTrieNode>>,
382    /// Epoch when this node was created (for reclamation)
383    pub created_epoch: u64,
384}
385
386impl ConcurrentTrieNode {
387    /// Create an empty node
388    pub fn new(epoch: u64) -> Self {
389        Self {
390            column_id: None,
391            column_type: None,
392            children: DashMap::new(),
393            created_epoch: epoch,
394        }
395    }
396
397    /// Create a leaf node with column information
398    pub fn leaf(column_id: ColumnId, column_type: ColumnType, epoch: u64) -> Self {
399        Self {
400            column_id: Some(column_id),
401            column_type: Some(column_type),
402            children: DashMap::new(),
403            created_epoch: epoch,
404        }
405    }
406
407    /// Check if this is a leaf node
408    pub fn is_leaf(&self) -> bool {
409        self.column_id.is_some()
410    }
411
412    /// Count total nodes in subtree
413    pub fn count_nodes(&self) -> usize {
414        1 + self
415            .children
416            .iter()
417            .map(|r| r.value().count_nodes())
418            .sum::<usize>()
419    }
420}
421
422/// Concurrent Path Trie with lock-free reads and epoch-based reclamation
423///
424/// This trie supports:
425/// - Lock-free concurrent reads via DashMap
426/// - Concurrent inserts with fine-grained locking
427/// - Epoch-based garbage collection for safe memory reclamation
428///
429/// ## Performance
430///
431/// - Read: O(|path|) with no locking (DashMap provides lock-free reads)
432/// - Insert: O(|path|) with minimal lock contention
433/// - Memory reclamation: Deferred until no readers access old nodes
434#[derive(Debug)]
435pub struct ConcurrentPathTrie {
436    /// Root node (never changes, only children do)
437    root: Arc<ConcurrentTrieNode>,
438    /// Total number of columns registered
439    total_columns: AtomicU32,
440    /// Next column ID to assign
441    next_column_id: AtomicU32,
442    /// Current epoch for versioning
443    current_epoch: AtomicU64,
444    /// Minimum active reader epoch (for GC)
445    min_reader_epoch: AtomicU64,
446    /// Reader count per epoch (for tracking when to collect)
447    reader_epochs: DashMap<u64, AtomicU32>,
448}
449
450impl ConcurrentPathTrie {
451    /// Create a new empty concurrent trie
452    pub fn new() -> Self {
453        Self {
454            root: Arc::new(ConcurrentTrieNode::new(0)),
455            total_columns: AtomicU32::new(0),
456            next_column_id: AtomicU32::new(0),
457            current_epoch: AtomicU64::new(1),
458            min_reader_epoch: AtomicU64::new(0),
459            reader_epochs: DashMap::new(),
460        }
461    }
462
463    /// Get current epoch
464    pub fn current_epoch(&self) -> u64 {
465        self.current_epoch.load(Ordering::Acquire)
466    }
467
468    /// Advance epoch (call periodically to allow GC)
469    pub fn advance_epoch(&self) -> u64 {
470        self.current_epoch.fetch_add(1, Ordering::AcqRel)
471    }
472
473    /// Begin a read operation (returns epoch guard)
474    /// The guard must be held for the duration of the read
475    pub fn begin_read(&self) -> ReadGuard<'_> {
476        let epoch = self.current_epoch.load(Ordering::Acquire);
477
478        // Increment reader count for this epoch
479        self.reader_epochs
480            .entry(epoch)
481            .or_insert_with(|| AtomicU32::new(0))
482            .fetch_add(1, Ordering::Relaxed);
483
484        ReadGuard { trie: self, epoch }
485    }
486
487    /// Insert a path and get its column ID (thread-safe)
488    ///
489    /// Path format: "users.profile.settings.theme"
490    /// Returns the assigned column ID
491    pub fn insert(&self, path: &str, column_type: ColumnType) -> ColumnId {
492        let segments: Vec<&str> = path.split('.').collect();
493        let column_id = self.next_column_id.fetch_add(1, Ordering::Relaxed);
494        let epoch = self.current_epoch.load(Ordering::Acquire);
495
496        let mut current = self.root.clone();
497
498        for (i, segment) in segments.iter().enumerate() {
499            let is_last = i == segments.len() - 1;
500
501            if is_last {
502                // Create or update leaf node
503                let leaf = Arc::new(ConcurrentTrieNode::leaf(column_id, column_type, epoch));
504                current.children.insert(segment.to_string(), leaf);
505            } else {
506                // Get or create intermediate node
507                let next = current
508                    .children
509                    .entry(segment.to_string())
510                    .or_insert_with(|| Arc::new(ConcurrentTrieNode::new(epoch)))
511                    .clone();
512                current = next;
513            }
514        }
515
516        self.total_columns.fetch_add(1, Ordering::Relaxed);
517        column_id
518    }
519
520    /// Resolve a path to its column ID in O(|path|) time (lock-free)
521    ///
522    /// Returns None if path doesn't exist
523    pub fn resolve(&self, path: &str) -> Option<ColumnId> {
524        let segments: Vec<&str> = path.split('.').collect();
525        let mut current = self.root.clone();
526
527        for segment in segments {
528            let next = current.children.get(segment)?.clone();
529            current = next;
530        }
531
532        current.column_id
533    }
534
535    /// Resolve with type information (lock-free)
536    pub fn resolve_with_type(&self, path: &str) -> Option<(ColumnId, ColumnType)> {
537        let segments: Vec<&str> = path.split('.').collect();
538        let mut current = self.root.clone();
539
540        for segment in segments {
541            let next = current.children.get(segment)?.clone();
542            current = next;
543        }
544
545        Some((current.column_id?, current.column_type?))
546    }
547
548    /// Get all paths that start with a prefix (lock-free)
549    pub fn prefix_match(&self, prefix: &str) -> Vec<(String, ColumnId)> {
550        let mut results = Vec::new();
551
552        let segments: Vec<&str> = if prefix.is_empty() {
553            vec![]
554        } else {
555            prefix.split('.').collect()
556        };
557
558        let mut current = self.root.clone();
559        for segment in &segments {
560            let next = match current.children.get(*segment) {
561                Some(child) => child.clone(),
562                None => return results,
563            };
564            current = next;
565        }
566
567        self.collect_paths(&current, prefix.to_string(), &mut results);
568        results
569    }
570
571    #[allow(clippy::only_used_in_recursion)]
572    fn collect_paths(
573        &self,
574        node: &ConcurrentTrieNode,
575        path: String,
576        results: &mut Vec<(String, ColumnId)>,
577    ) {
578        if let Some(col_id) = node.column_id {
579            results.push((path.clone(), col_id));
580        }
581
582        for entry in node.children.iter() {
583            let child_path = if path.is_empty() {
584                entry.key().clone()
585            } else {
586                format!("{}.{}", path, entry.key())
587            };
588            self.collect_paths(entry.value(), child_path, results);
589        }
590    }
591
592    /// Get total number of columns
593    pub fn total_columns(&self) -> u32 {
594        self.total_columns.load(Ordering::Relaxed)
595    }
596
597    /// Get total number of nodes (memory usage indicator)
598    pub fn total_nodes(&self) -> usize {
599        self.root.count_nodes()
600    }
601
602    /// Update minimum reader epoch (call after readers complete)
603    pub fn update_min_reader_epoch(&self) {
604        let mut min_epoch = self.current_epoch.load(Ordering::Acquire);
605
606        // Find minimum epoch with active readers
607        for entry in self.reader_epochs.iter() {
608            if entry.value().load(Ordering::Relaxed) > 0 && *entry.key() < min_epoch {
609                min_epoch = *entry.key();
610            }
611        }
612
613        self.min_reader_epoch.store(min_epoch, Ordering::Release);
614
615        // Clean up old epoch entries
616        let threshold = min_epoch.saturating_sub(10);
617        self.reader_epochs
618            .retain(|epoch, count| *epoch >= threshold || count.load(Ordering::Relaxed) > 0);
619    }
620
621    /// Get minimum reader epoch (nodes older than this can be reclaimed)
622    pub fn min_reader_epoch(&self) -> u64 {
623        self.min_reader_epoch.load(Ordering::Acquire)
624    }
625}
626
627impl Default for ConcurrentPathTrie {
628    fn default() -> Self {
629        Self::new()
630    }
631}
632
633// Make ConcurrentPathTrie Send + Sync
634unsafe impl Send for ConcurrentPathTrie {}
635unsafe impl Sync for ConcurrentPathTrie {}
636
637/// RAII guard for read operations
638/// Decrements reader count when dropped
639pub struct ReadGuard<'a> {
640    trie: &'a ConcurrentPathTrie,
641    epoch: u64,
642}
643
644impl<'a> ReadGuard<'a> {
645    /// Get the epoch this read is pinned to
646    pub fn epoch(&self) -> u64 {
647        self.epoch
648    }
649}
650
651impl<'a> Drop for ReadGuard<'a> {
652    fn drop(&mut self) {
653        if let Some(count) = self.trie.reader_epochs.get(&self.epoch) {
654            count.fetch_sub(1, Ordering::Relaxed);
655        }
656    }
657}
658
659#[cfg(test)]
660mod tests {
661    use super::*;
662
663    #[test]
664    fn test_path_trie_insert_resolve() {
665        let mut trie = PathTrie::new();
666
667        let id1 = trie.insert("users.id", ColumnType::UInt64);
668        let id2 = trie.insert("users.name", ColumnType::Text);
669        let id3 = trie.insert("users.profile.email", ColumnType::Text);
670        let id4 = trie.insert("users.profile.settings.theme", ColumnType::Text);
671
672        assert_eq!(trie.resolve("users.id"), Some(id1));
673        assert_eq!(trie.resolve("users.name"), Some(id2));
674        assert_eq!(trie.resolve("users.profile.email"), Some(id3));
675        assert_eq!(trie.resolve("users.profile.settings.theme"), Some(id4));
676        assert_eq!(trie.resolve("nonexistent"), None);
677        assert_eq!(trie.resolve("users.profile"), None); // Not a leaf
678
679        assert_eq!(trie.total_columns(), 4);
680    }
681
682    #[test]
683    fn test_path_trie_prefix_match() {
684        let mut trie = PathTrie::new();
685
686        trie.insert("users.id", ColumnType::UInt64);
687        trie.insert("users.name", ColumnType::Text);
688        trie.insert("users.profile.email", ColumnType::Text);
689        trie.insert("orders.id", ColumnType::UInt64);
690
691        let user_cols = trie.prefix_match("users");
692        assert_eq!(user_cols.len(), 3);
693
694        let profile_cols = trie.prefix_match("users.profile");
695        assert_eq!(profile_cols.len(), 1);
696
697        let all_cols = trie.prefix_match("");
698        assert_eq!(all_cols.len(), 4);
699    }
700
701    #[test]
702    fn test_resolve_with_type() {
703        let mut trie = PathTrie::new();
704
705        trie.insert("score", ColumnType::Float64);
706        trie.insert("name", ColumnType::Text);
707
708        let (id, col_type) = trie.resolve_with_type("score").unwrap();
709        assert_eq!(id, 0);
710        assert_eq!(col_type, ColumnType::Float64);
711
712        let (id, col_type) = trie.resolve_with_type("name").unwrap();
713        assert_eq!(id, 1);
714        assert_eq!(col_type, ColumnType::Text);
715    }
716
717    #[test]
718    fn test_column_group_affinity() {
719        let mut trie = PathTrie::new();
720
721        trie.insert("id", ColumnType::UInt64);
722        trie.insert("score", ColumnType::Float64);
723        trie.insert("age", ColumnType::Int64);
724        trie.insert("name", ColumnType::Text);
725        trie.insert("timestamp", ColumnType::Timestamp);
726
727        let mut affinity = ColumnGroupAffinity::from_trie(&trie);
728
729        // Check SIMD groups
730        let simd = affinity.simd_groups();
731        assert!(!simd.is_empty());
732
733        // Record accesses
734        affinity.record_access(0);
735        affinity.record_access(0);
736        affinity.record_access(1);
737
738        let hot = affinity.hot_columns(2);
739        assert_eq!(hot.len(), 2);
740        assert_eq!(hot[0], 0); // Most accessed
741
742        // Check sparse columns
743        affinity.update_null_density(3, 90, 100);
744        let sparse = affinity.sparse_columns(0.5);
745        assert_eq!(sparse, vec![3]);
746    }
747
748    #[test]
749    fn test_memory_estimate() {
750        let mut trie = PathTrie::new();
751
752        for i in 0..100 {
753            trie.insert(
754                &format!("table{}.column{}", i / 10, i % 10),
755                ColumnType::UInt64,
756            );
757        }
758
759        // ~10 tables × ~10 columns × 2-3 levels = ~300 nodes
760        assert!(trie.total_nodes() > 100);
761        assert!(trie.memory_bytes() > 10000); // At least 10KB
762    }
763
764    // ========================================================================
765    // CONCURRENT PATH TRIE TESTS
766    // ========================================================================
767
768    #[test]
769    fn test_concurrent_trie_basic() {
770        let trie = ConcurrentPathTrie::new();
771
772        let id1 = trie.insert("users.id", ColumnType::UInt64);
773        let id2 = trie.insert("users.name", ColumnType::Text);
774        let id3 = trie.insert("users.profile.email", ColumnType::Text);
775
776        assert_eq!(trie.resolve("users.id"), Some(id1));
777        assert_eq!(trie.resolve("users.name"), Some(id2));
778        assert_eq!(trie.resolve("users.profile.email"), Some(id3));
779        assert_eq!(trie.resolve("nonexistent"), None);
780
781        assert_eq!(trie.total_columns(), 3);
782    }
783
784    #[test]
785    fn test_concurrent_trie_resolve_with_type() {
786        let trie = ConcurrentPathTrie::new();
787
788        trie.insert("score", ColumnType::Float64);
789        trie.insert("name", ColumnType::Text);
790
791        let (id, col_type) = trie.resolve_with_type("score").unwrap();
792        assert_eq!(id, 0);
793        assert_eq!(col_type, ColumnType::Float64);
794
795        let (id, col_type) = trie.resolve_with_type("name").unwrap();
796        assert_eq!(id, 1);
797        assert_eq!(col_type, ColumnType::Text);
798    }
799
800    #[test]
801    fn test_concurrent_trie_prefix_match() {
802        let trie = ConcurrentPathTrie::new();
803
804        trie.insert("users.id", ColumnType::UInt64);
805        trie.insert("users.name", ColumnType::Text);
806        trie.insert("users.profile.email", ColumnType::Text);
807        trie.insert("orders.id", ColumnType::UInt64);
808
809        let user_cols = trie.prefix_match("users");
810        assert_eq!(user_cols.len(), 3);
811
812        let all_cols = trie.prefix_match("");
813        assert_eq!(all_cols.len(), 4);
814    }
815
816    #[test]
817    fn test_concurrent_trie_epoch_management() {
818        let trie = ConcurrentPathTrie::new();
819
820        assert_eq!(trie.current_epoch(), 1);
821
822        // Advance epoch
823        let old = trie.advance_epoch();
824        assert_eq!(old, 1);
825        assert_eq!(trie.current_epoch(), 2);
826
827        // Begin read - should pin to current epoch
828        let guard = trie.begin_read();
829        assert_eq!(guard.epoch(), 2);
830
831        // Advance epoch while read is active
832        trie.advance_epoch();
833        assert_eq!(trie.current_epoch(), 3);
834
835        // Reader still at epoch 2
836        assert_eq!(guard.epoch(), 2);
837
838        // Drop guard and update min epoch
839        drop(guard);
840        trie.update_min_reader_epoch();
841    }
842
843    #[test]
844    fn test_concurrent_trie_multithreaded() {
845        use std::sync::Arc;
846        use std::thread;
847
848        let trie = Arc::new(ConcurrentPathTrie::new());
849        let mut handles = vec![];
850
851        // Spawn writer threads
852        for i in 0..4 {
853            let trie = Arc::clone(&trie);
854            let handle = thread::spawn(move || {
855                for j in 0..25 {
856                    let path = format!("table{}.column{}", i, j);
857                    trie.insert(&path, ColumnType::UInt64);
858                }
859            });
860            handles.push(handle);
861        }
862
863        // Spawn reader threads
864        for _ in 0..4 {
865            let trie = Arc::clone(&trie);
866            let handle = thread::spawn(move || {
867                let _guard = trie.begin_read();
868                // Read some paths (may or may not exist yet)
869                for i in 0..4 {
870                    for j in 0..25 {
871                        let path = format!("table{}.column{}", i, j);
872                        let _ = trie.resolve(&path);
873                    }
874                }
875            });
876            handles.push(handle);
877        }
878
879        for handle in handles {
880            handle.join().unwrap();
881        }
882
883        // All 100 columns should be inserted
884        assert_eq!(trie.total_columns(), 100);
885
886        // Verify all can be resolved
887        for i in 0..4 {
888            for j in 0..25 {
889                let path = format!("table{}.column{}", i, j);
890                assert!(trie.resolve(&path).is_some(), "Missing path: {}", path);
891            }
892        }
893    }
894
895    #[test]
896    fn test_concurrent_trie_read_guard() {
897        let trie = ConcurrentPathTrie::new();
898
899        // Insert some data
900        trie.insert("test.path", ColumnType::UInt64);
901
902        // Multiple concurrent reads
903        {
904            let guard1 = trie.begin_read();
905            let guard2 = trie.begin_read();
906
907            assert_eq!(trie.resolve("test.path"), Some(0));
908            assert_eq!(guard1.epoch(), guard2.epoch());
909
910            // Guards dropped here
911        }
912
913        // Epoch tracking should work
914        trie.update_min_reader_epoch();
915    }
916}