Skip to main content

sochdb_core/
path_trie.rs

1// SPDX-License-Identifier: AGPL-3.0-or-later
2// SochDB - LLM-Optimized Embedded Database
3// Copyright (C) 2026 Sushanth Reddy Vanagala (https://github.com/sushanthpy)
4//
5// This program is free software: you can redistribute it and/or modify
6// it under the terms of the GNU Affero General Public License as published by
7// the Free Software Foundation, either version 3 of the License, or
8// (at your option) any later version.
9//
10// This program is distributed in the hope that it will be useful,
11// but WITHOUT ANY WARRANTY; without even the implied warranty of
12// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13// GNU Affero General Public License for more details.
14//
15// You should have received a copy of the GNU Affero General Public License
16// along with this program. If not, see <https://www.gnu.org/licenses/>.
17
18//! Path Trie for TOON Document Path Resolution (Task 4)
19//!
20//! Implements O(|path|) path resolution for nested TOON documents,
21//! regardless of schema size.
22//!
23//! ## Performance Analysis
24//!
25//! ```text
26//! Traditional BTreeMap: O(d × log F) where d=depth, F=fields
27//! Path Trie:           O(d) where d=path depth
28//!
29//! For path="users.profile.settings.theme" (d=4) with F=100 fields:
30//! - BTreeMap: 4 × log(100) ≈ 26.6 comparisons
31//! - PathTrie: 4 lookups = 4 hash lookups
32//! ```
33//!
34//! ## Memory Model
35//!
36//! Memory: O(N × avg_path_length) where N = total columns
37//! For 100 tables × 50 cols × 3 levels = 15,000 nodes × ~100 bytes = 1.5MB
38
39use dashmap::DashMap;
40use serde::{Deserialize, Serialize};
41use std::collections::HashMap;
42use std::sync::Arc;
43use std::sync::atomic::{AtomicU32, AtomicU64, Ordering};
44
45/// Column identifier (matches storage layer)
46pub type ColumnId = u32;
47
48/// A node in the path trie
49#[derive(Debug, Clone, Serialize, Deserialize)]
50pub struct TrieNode {
51    /// Column ID if this path is a leaf (terminal column)
52    pub column_id: Option<ColumnId>,
53    /// Column type for leaf nodes
54    pub column_type: Option<ColumnType>,
55    /// Children nodes keyed by path segment
56    pub children: HashMap<String, Box<TrieNode>>,
57}
58
59impl TrieNode {
60    /// Create an empty node
61    pub fn new() -> Self {
62        Self {
63            column_id: None,
64            column_type: None,
65            children: HashMap::new(),
66        }
67    }
68
69    /// Create a leaf node with column information
70    pub fn leaf(column_id: ColumnId, column_type: ColumnType) -> Self {
71        Self {
72            column_id: Some(column_id),
73            column_type: Some(column_type),
74            children: HashMap::new(),
75        }
76    }
77
78    /// Check if this is a leaf node
79    pub fn is_leaf(&self) -> bool {
80        self.column_id.is_some()
81    }
82
83    /// Count total nodes in subtree
84    pub fn count_nodes(&self) -> usize {
85        1 + self
86            .children
87            .values()
88            .map(|c| c.count_nodes())
89            .sum::<usize>()
90    }
91}
92
93impl Default for TrieNode {
94    fn default() -> Self {
95        Self::new()
96    }
97}
98
99/// Column type for physical storage optimization
100#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
101pub enum ColumnType {
102    /// Boolean (1 byte, SIMD groupable)
103    Bool,
104    /// Signed 64-bit integer (8 bytes, SIMD groupable)
105    Int64,
106    /// Unsigned 64-bit integer (8 bytes, SIMD groupable)
107    UInt64,
108    /// 64-bit float (8 bytes, SIMD groupable)
109    Float64,
110    /// Variable-length text
111    Text,
112    /// Variable-length binary
113    Binary,
114    /// Timestamp (8 bytes, SIMD groupable)
115    Timestamp,
116    /// Nested TOON document
117    Nested,
118    /// Array of values
119    Array,
120}
121
122impl ColumnType {
123    /// Check if type is fixed-size (for SIMD optimization)
124    pub fn is_fixed_size(&self) -> bool {
125        matches!(
126            self,
127            ColumnType::Bool
128                | ColumnType::Int64
129                | ColumnType::UInt64
130                | ColumnType::Float64
131                | ColumnType::Timestamp
132        )
133    }
134
135    /// Get fixed size in bytes (None for variable-length)
136    pub fn fixed_size(&self) -> Option<usize> {
137        match self {
138            ColumnType::Bool => Some(1),
139            ColumnType::Int64
140            | ColumnType::UInt64
141            | ColumnType::Float64
142            | ColumnType::Timestamp => Some(8),
143            _ => None,
144        }
145    }
146}
147
148/// Path Trie for O(|path|) TOON path resolution
149#[derive(Debug, Clone, Serialize, Deserialize)]
150pub struct PathTrie {
151    /// Root node
152    root: TrieNode,
153    /// Total number of columns registered
154    total_columns: u32,
155    /// Next column ID to assign
156    next_column_id: ColumnId,
157}
158
159impl PathTrie {
160    /// Create a new empty trie
161    pub fn new() -> Self {
162        Self {
163            root: TrieNode::new(),
164            total_columns: 0,
165            next_column_id: 0,
166        }
167    }
168
169    /// Insert a path and get its column ID
170    ///
171    /// Path format: "users.profile.settings.theme"
172    /// Returns the assigned column ID
173    pub fn insert(&mut self, path: &str, column_type: ColumnType) -> ColumnId {
174        let segments: Vec<&str> = path.split('.').collect();
175        let column_id = self.next_column_id;
176        self.next_column_id += 1;
177
178        let mut current = &mut self.root;
179
180        for (i, segment) in segments.iter().enumerate() {
181            let is_last = i == segments.len() - 1;
182
183            current = current
184                .children
185                .entry(segment.to_string())
186                .or_insert_with(|| Box::new(TrieNode::new()));
187
188            if is_last {
189                current.column_id = Some(column_id);
190                current.column_type = Some(column_type);
191            }
192        }
193
194        self.total_columns += 1;
195        column_id
196    }
197
198    /// Resolve a path to its column ID in O(|path|) time
199    ///
200    /// Returns None if path doesn't exist
201    pub fn resolve(&self, path: &str) -> Option<ColumnId> {
202        let segments: Vec<&str> = path.split('.').collect();
203        let mut current = &self.root;
204
205        for segment in segments {
206            current = current.children.get(segment)?;
207        }
208
209        current.column_id
210    }
211
212    /// Resolve with type information
213    pub fn resolve_with_type(&self, path: &str) -> Option<(ColumnId, ColumnType)> {
214        let segments: Vec<&str> = path.split('.').collect();
215        let mut current = &self.root;
216
217        for segment in segments {
218            current = current.children.get(segment)?;
219        }
220
221        Some((current.column_id?, current.column_type?))
222    }
223
224    /// Get all paths that start with a prefix
225    ///
226    /// Useful for wildcard queries like "users.profile.*"
227    pub fn prefix_match(&self, prefix: &str) -> Vec<(String, ColumnId)> {
228        let mut results = Vec::new();
229
230        // Navigate to prefix node
231        let segments: Vec<&str> = if prefix.is_empty() {
232            vec![]
233        } else {
234            prefix.split('.').collect()
235        };
236
237        let mut current = &self.root;
238        for segment in &segments {
239            if let Some(child) = current.children.get(*segment) {
240                current = child;
241            } else {
242                return results;
243            }
244        }
245
246        // Collect all paths under this node
247        self.collect_paths(current, prefix.to_string(), &mut results);
248        results
249    }
250
251    #[allow(clippy::only_used_in_recursion)]
252    fn collect_paths(&self, node: &TrieNode, path: String, results: &mut Vec<(String, ColumnId)>) {
253        if let Some(col_id) = node.column_id {
254            results.push((path.clone(), col_id));
255        }
256
257        for (segment, child) in &node.children {
258            let child_path = if path.is_empty() {
259                segment.clone()
260            } else {
261                format!("{}.{}", path, segment)
262            };
263            self.collect_paths(child, child_path, results);
264        }
265    }
266
267    /// Get total number of columns
268    pub fn total_columns(&self) -> u32 {
269        self.total_columns
270    }
271
272    /// Get total number of nodes (memory usage indicator)
273    pub fn total_nodes(&self) -> usize {
274        self.root.count_nodes()
275    }
276
277    /// Estimate memory usage in bytes
278    pub fn memory_bytes(&self) -> usize {
279        // Rough estimate: ~100 bytes per node (HashMap overhead + strings)
280        self.total_nodes() * 100
281    }
282}
283
284impl Default for PathTrie {
285    fn default() -> Self {
286        Self::new()
287    }
288}
289
290/// Column group affinity for compression optimization
291///
292/// Groups columns by:
293/// 1. Type compatibility (all Int64 together for SIMD)
294/// 2. Access correlation (co-accessed columns together)
295/// 3. Null density (sparse columns separate)
296#[derive(Debug, Clone)]
297pub struct ColumnGroupAffinity {
298    /// Columns grouped by type for SIMD optimization
299    pub type_groups: HashMap<ColumnType, Vec<ColumnId>>,
300    /// Access frequency per column (for hot/cold separation)
301    pub access_frequency: HashMap<ColumnId, u64>,
302    /// Null density per column (0.0 = never null, 1.0 = always null)
303    pub null_density: HashMap<ColumnId, f64>,
304}
305
306impl ColumnGroupAffinity {
307    /// Create from a path trie
308    pub fn from_trie(trie: &PathTrie) -> Self {
309        let mut type_groups: HashMap<ColumnType, Vec<ColumnId>> = HashMap::new();
310
311        fn collect_columns(node: &TrieNode, groups: &mut HashMap<ColumnType, Vec<ColumnId>>) {
312            if let (Some(col_id), Some(col_type)) = (node.column_id, node.column_type) {
313                groups.entry(col_type).or_default().push(col_id);
314            }
315            for child in node.children.values() {
316                collect_columns(child, groups);
317            }
318        }
319
320        collect_columns(&trie.root, &mut type_groups);
321
322        Self {
323            type_groups,
324            access_frequency: HashMap::new(),
325            null_density: HashMap::new(),
326        }
327    }
328
329    /// Record a column access (for access correlation)
330    pub fn record_access(&mut self, column_id: ColumnId) {
331        *self.access_frequency.entry(column_id).or_insert(0) += 1;
332    }
333
334    /// Update null density for a column
335    pub fn update_null_density(&mut self, column_id: ColumnId, null_count: u64, total_count: u64) {
336        if total_count > 0 {
337            self.null_density
338                .insert(column_id, null_count as f64 / total_count as f64);
339        }
340    }
341
342    /// Get columns suitable for SIMD processing (fixed-size, same type)
343    pub fn simd_groups(&self) -> Vec<(ColumnType, Vec<ColumnId>)> {
344        self.type_groups
345            .iter()
346            .filter(|(t, _)| t.is_fixed_size())
347            .map(|(t, cols)| (*t, cols.clone()))
348            .collect()
349    }
350
351    /// Get sparse columns (null_density > threshold)
352    pub fn sparse_columns(&self, threshold: f64) -> Vec<ColumnId> {
353        self.null_density
354            .iter()
355            .filter(|(_, density)| **density > threshold)
356            .map(|(col_id, _)| *col_id)
357            .collect()
358    }
359
360    /// Get hot columns (top N by access frequency)
361    pub fn hot_columns(&self, n: usize) -> Vec<ColumnId> {
362        let mut sorted: Vec<_> = self.access_frequency.iter().collect();
363        sorted.sort_by(|a, b| b.1.cmp(a.1));
364        sorted
365            .into_iter()
366            .take(n)
367            .map(|(col_id, _)| *col_id)
368            .collect()
369    }
370}
371
372// ============================================================================
373// CONCURRENT PATH TRIE WITH EPOCH-BASED RECLAMATION
374// ============================================================================
375
376/// A node in the concurrent path trie using epoch-based reclamation
377#[derive(Debug)]
378pub struct ConcurrentTrieNode {
379    /// Column ID if this path is a leaf (terminal column)
380    pub column_id: Option<ColumnId>,
381    /// Column type for leaf nodes
382    pub column_type: Option<ColumnType>,
383    /// Children nodes keyed by path segment (lock-free via DashMap)
384    pub children: DashMap<String, Arc<ConcurrentTrieNode>>,
385    /// Epoch when this node was created (for reclamation)
386    pub created_epoch: u64,
387}
388
389impl ConcurrentTrieNode {
390    /// Create an empty node
391    pub fn new(epoch: u64) -> Self {
392        Self {
393            column_id: None,
394            column_type: None,
395            children: DashMap::new(),
396            created_epoch: epoch,
397        }
398    }
399
400    /// Create a leaf node with column information
401    pub fn leaf(column_id: ColumnId, column_type: ColumnType, epoch: u64) -> Self {
402        Self {
403            column_id: Some(column_id),
404            column_type: Some(column_type),
405            children: DashMap::new(),
406            created_epoch: epoch,
407        }
408    }
409
410    /// Check if this is a leaf node
411    pub fn is_leaf(&self) -> bool {
412        self.column_id.is_some()
413    }
414
415    /// Count total nodes in subtree
416    pub fn count_nodes(&self) -> usize {
417        1 + self
418            .children
419            .iter()
420            .map(|r| r.value().count_nodes())
421            .sum::<usize>()
422    }
423}
424
425/// Concurrent Path Trie with lock-free reads and epoch-based reclamation
426///
427/// This trie supports:
428/// - Lock-free concurrent reads via DashMap
429/// - Concurrent inserts with fine-grained locking
430/// - Epoch-based garbage collection for safe memory reclamation
431///
432/// ## Performance
433///
434/// - Read: O(|path|) with no locking (DashMap provides lock-free reads)
435/// - Insert: O(|path|) with minimal lock contention
436/// - Memory reclamation: Deferred until no readers access old nodes
437#[derive(Debug)]
438pub struct ConcurrentPathTrie {
439    /// Root node (never changes, only children do)
440    root: Arc<ConcurrentTrieNode>,
441    /// Total number of columns registered
442    total_columns: AtomicU32,
443    /// Next column ID to assign
444    next_column_id: AtomicU32,
445    /// Current epoch for versioning
446    current_epoch: AtomicU64,
447    /// Minimum active reader epoch (for GC)
448    min_reader_epoch: AtomicU64,
449    /// Reader count per epoch (for tracking when to collect)
450    reader_epochs: DashMap<u64, AtomicU32>,
451}
452
453impl ConcurrentPathTrie {
454    /// Create a new empty concurrent trie
455    pub fn new() -> Self {
456        Self {
457            root: Arc::new(ConcurrentTrieNode::new(0)),
458            total_columns: AtomicU32::new(0),
459            next_column_id: AtomicU32::new(0),
460            current_epoch: AtomicU64::new(1),
461            min_reader_epoch: AtomicU64::new(0),
462            reader_epochs: DashMap::new(),
463        }
464    }
465
466    /// Get current epoch
467    pub fn current_epoch(&self) -> u64 {
468        self.current_epoch.load(Ordering::Acquire)
469    }
470
471    /// Advance epoch (call periodically to allow GC)
472    pub fn advance_epoch(&self) -> u64 {
473        self.current_epoch.fetch_add(1, Ordering::AcqRel)
474    }
475
476    /// Begin a read operation (returns epoch guard)
477    /// The guard must be held for the duration of the read
478    pub fn begin_read(&self) -> ReadGuard<'_> {
479        let epoch = self.current_epoch.load(Ordering::Acquire);
480
481        // Increment reader count for this epoch
482        self.reader_epochs
483            .entry(epoch)
484            .or_insert_with(|| AtomicU32::new(0))
485            .fetch_add(1, Ordering::Relaxed);
486
487        ReadGuard { trie: self, epoch }
488    }
489
490    /// Insert a path and get its column ID (thread-safe)
491    ///
492    /// Path format: "users.profile.settings.theme"
493    /// Returns the assigned column ID
494    pub fn insert(&self, path: &str, column_type: ColumnType) -> ColumnId {
495        let segments: Vec<&str> = path.split('.').collect();
496        let column_id = self.next_column_id.fetch_add(1, Ordering::Relaxed);
497        let epoch = self.current_epoch.load(Ordering::Acquire);
498
499        let mut current = self.root.clone();
500
501        for (i, segment) in segments.iter().enumerate() {
502            let is_last = i == segments.len() - 1;
503
504            if is_last {
505                // Create or update leaf node
506                let leaf = Arc::new(ConcurrentTrieNode::leaf(column_id, column_type, epoch));
507                current.children.insert(segment.to_string(), leaf);
508            } else {
509                // Get or create intermediate node
510                let next = current
511                    .children
512                    .entry(segment.to_string())
513                    .or_insert_with(|| Arc::new(ConcurrentTrieNode::new(epoch)))
514                    .clone();
515                current = next;
516            }
517        }
518
519        self.total_columns.fetch_add(1, Ordering::Relaxed);
520        column_id
521    }
522
523    /// Resolve a path to its column ID in O(|path|) time (lock-free)
524    ///
525    /// Returns None if path doesn't exist
526    pub fn resolve(&self, path: &str) -> Option<ColumnId> {
527        let segments: Vec<&str> = path.split('.').collect();
528        let mut current = self.root.clone();
529
530        for segment in segments {
531            let next = current.children.get(segment)?.clone();
532            current = next;
533        }
534
535        current.column_id
536    }
537
538    /// Resolve with type information (lock-free)
539    pub fn resolve_with_type(&self, path: &str) -> Option<(ColumnId, ColumnType)> {
540        let segments: Vec<&str> = path.split('.').collect();
541        let mut current = self.root.clone();
542
543        for segment in segments {
544            let next = current.children.get(segment)?.clone();
545            current = next;
546        }
547
548        Some((current.column_id?, current.column_type?))
549    }
550
551    /// Get all paths that start with a prefix (lock-free)
552    pub fn prefix_match(&self, prefix: &str) -> Vec<(String, ColumnId)> {
553        let mut results = Vec::new();
554
555        let segments: Vec<&str> = if prefix.is_empty() {
556            vec![]
557        } else {
558            prefix.split('.').collect()
559        };
560
561        let mut current = self.root.clone();
562        for segment in &segments {
563            let next = match current.children.get(*segment) {
564                Some(child) => child.clone(),
565                None => return results,
566            };
567            current = next;
568        }
569
570        self.collect_paths(&current, prefix.to_string(), &mut results);
571        results
572    }
573
574    #[allow(clippy::only_used_in_recursion)]
575    fn collect_paths(
576        &self,
577        node: &ConcurrentTrieNode,
578        path: String,
579        results: &mut Vec<(String, ColumnId)>,
580    ) {
581        if let Some(col_id) = node.column_id {
582            results.push((path.clone(), col_id));
583        }
584
585        for entry in node.children.iter() {
586            let child_path = if path.is_empty() {
587                entry.key().clone()
588            } else {
589                format!("{}.{}", path, entry.key())
590            };
591            self.collect_paths(entry.value(), child_path, results);
592        }
593    }
594
595    /// Get total number of columns
596    pub fn total_columns(&self) -> u32 {
597        self.total_columns.load(Ordering::Relaxed)
598    }
599
600    /// Get total number of nodes (memory usage indicator)
601    pub fn total_nodes(&self) -> usize {
602        self.root.count_nodes()
603    }
604
605    /// Update minimum reader epoch (call after readers complete)
606    pub fn update_min_reader_epoch(&self) {
607        let mut min_epoch = self.current_epoch.load(Ordering::Acquire);
608
609        // Find minimum epoch with active readers
610        for entry in self.reader_epochs.iter() {
611            if entry.value().load(Ordering::Relaxed) > 0 && *entry.key() < min_epoch {
612                min_epoch = *entry.key();
613            }
614        }
615
616        self.min_reader_epoch.store(min_epoch, Ordering::Release);
617
618        // Clean up old epoch entries
619        let threshold = min_epoch.saturating_sub(10);
620        self.reader_epochs
621            .retain(|epoch, count| *epoch >= threshold || count.load(Ordering::Relaxed) > 0);
622    }
623
624    /// Get minimum reader epoch (nodes older than this can be reclaimed)
625    pub fn min_reader_epoch(&self) -> u64 {
626        self.min_reader_epoch.load(Ordering::Acquire)
627    }
628}
629
630impl Default for ConcurrentPathTrie {
631    fn default() -> Self {
632        Self::new()
633    }
634}
635
636// Make ConcurrentPathTrie Send + Sync
637unsafe impl Send for ConcurrentPathTrie {}
638unsafe impl Sync for ConcurrentPathTrie {}
639
640/// RAII guard for read operations
641/// Decrements reader count when dropped
642pub struct ReadGuard<'a> {
643    trie: &'a ConcurrentPathTrie,
644    epoch: u64,
645}
646
647impl<'a> ReadGuard<'a> {
648    /// Get the epoch this read is pinned to
649    pub fn epoch(&self) -> u64 {
650        self.epoch
651    }
652}
653
654impl<'a> Drop for ReadGuard<'a> {
655    fn drop(&mut self) {
656        if let Some(count) = self.trie.reader_epochs.get(&self.epoch) {
657            count.fetch_sub(1, Ordering::Relaxed);
658        }
659    }
660}
661
662#[cfg(test)]
663mod tests {
664    use super::*;
665
666    #[test]
667    fn test_path_trie_insert_resolve() {
668        let mut trie = PathTrie::new();
669
670        let id1 = trie.insert("users.id", ColumnType::UInt64);
671        let id2 = trie.insert("users.name", ColumnType::Text);
672        let id3 = trie.insert("users.profile.email", ColumnType::Text);
673        let id4 = trie.insert("users.profile.settings.theme", ColumnType::Text);
674
675        assert_eq!(trie.resolve("users.id"), Some(id1));
676        assert_eq!(trie.resolve("users.name"), Some(id2));
677        assert_eq!(trie.resolve("users.profile.email"), Some(id3));
678        assert_eq!(trie.resolve("users.profile.settings.theme"), Some(id4));
679        assert_eq!(trie.resolve("nonexistent"), None);
680        assert_eq!(trie.resolve("users.profile"), None); // Not a leaf
681
682        assert_eq!(trie.total_columns(), 4);
683    }
684
685    #[test]
686    fn test_path_trie_prefix_match() {
687        let mut trie = PathTrie::new();
688
689        trie.insert("users.id", ColumnType::UInt64);
690        trie.insert("users.name", ColumnType::Text);
691        trie.insert("users.profile.email", ColumnType::Text);
692        trie.insert("orders.id", ColumnType::UInt64);
693
694        let user_cols = trie.prefix_match("users");
695        assert_eq!(user_cols.len(), 3);
696
697        let profile_cols = trie.prefix_match("users.profile");
698        assert_eq!(profile_cols.len(), 1);
699
700        let all_cols = trie.prefix_match("");
701        assert_eq!(all_cols.len(), 4);
702    }
703
704    #[test]
705    fn test_resolve_with_type() {
706        let mut trie = PathTrie::new();
707
708        trie.insert("score", ColumnType::Float64);
709        trie.insert("name", ColumnType::Text);
710
711        let (id, col_type) = trie.resolve_with_type("score").unwrap();
712        assert_eq!(id, 0);
713        assert_eq!(col_type, ColumnType::Float64);
714
715        let (id, col_type) = trie.resolve_with_type("name").unwrap();
716        assert_eq!(id, 1);
717        assert_eq!(col_type, ColumnType::Text);
718    }
719
720    #[test]
721    fn test_column_group_affinity() {
722        let mut trie = PathTrie::new();
723
724        trie.insert("id", ColumnType::UInt64);
725        trie.insert("score", ColumnType::Float64);
726        trie.insert("age", ColumnType::Int64);
727        trie.insert("name", ColumnType::Text);
728        trie.insert("timestamp", ColumnType::Timestamp);
729
730        let mut affinity = ColumnGroupAffinity::from_trie(&trie);
731
732        // Check SIMD groups
733        let simd = affinity.simd_groups();
734        assert!(!simd.is_empty());
735
736        // Record accesses
737        affinity.record_access(0);
738        affinity.record_access(0);
739        affinity.record_access(1);
740
741        let hot = affinity.hot_columns(2);
742        assert_eq!(hot.len(), 2);
743        assert_eq!(hot[0], 0); // Most accessed
744
745        // Check sparse columns
746        affinity.update_null_density(3, 90, 100);
747        let sparse = affinity.sparse_columns(0.5);
748        assert_eq!(sparse, vec![3]);
749    }
750
751    #[test]
752    fn test_memory_estimate() {
753        let mut trie = PathTrie::new();
754
755        for i in 0..100 {
756            trie.insert(
757                &format!("table{}.column{}", i / 10, i % 10),
758                ColumnType::UInt64,
759            );
760        }
761
762        // ~10 tables × ~10 columns × 2-3 levels = ~300 nodes
763        assert!(trie.total_nodes() > 100);
764        assert!(trie.memory_bytes() > 10000); // At least 10KB
765    }
766
767    // ========================================================================
768    // CONCURRENT PATH TRIE TESTS
769    // ========================================================================
770
771    #[test]
772    fn test_concurrent_trie_basic() {
773        let trie = ConcurrentPathTrie::new();
774
775        let id1 = trie.insert("users.id", ColumnType::UInt64);
776        let id2 = trie.insert("users.name", ColumnType::Text);
777        let id3 = trie.insert("users.profile.email", ColumnType::Text);
778
779        assert_eq!(trie.resolve("users.id"), Some(id1));
780        assert_eq!(trie.resolve("users.name"), Some(id2));
781        assert_eq!(trie.resolve("users.profile.email"), Some(id3));
782        assert_eq!(trie.resolve("nonexistent"), None);
783
784        assert_eq!(trie.total_columns(), 3);
785    }
786
787    #[test]
788    fn test_concurrent_trie_resolve_with_type() {
789        let trie = ConcurrentPathTrie::new();
790
791        trie.insert("score", ColumnType::Float64);
792        trie.insert("name", ColumnType::Text);
793
794        let (id, col_type) = trie.resolve_with_type("score").unwrap();
795        assert_eq!(id, 0);
796        assert_eq!(col_type, ColumnType::Float64);
797
798        let (id, col_type) = trie.resolve_with_type("name").unwrap();
799        assert_eq!(id, 1);
800        assert_eq!(col_type, ColumnType::Text);
801    }
802
803    #[test]
804    fn test_concurrent_trie_prefix_match() {
805        let trie = ConcurrentPathTrie::new();
806
807        trie.insert("users.id", ColumnType::UInt64);
808        trie.insert("users.name", ColumnType::Text);
809        trie.insert("users.profile.email", ColumnType::Text);
810        trie.insert("orders.id", ColumnType::UInt64);
811
812        let user_cols = trie.prefix_match("users");
813        assert_eq!(user_cols.len(), 3);
814
815        let all_cols = trie.prefix_match("");
816        assert_eq!(all_cols.len(), 4);
817    }
818
819    #[test]
820    fn test_concurrent_trie_epoch_management() {
821        let trie = ConcurrentPathTrie::new();
822
823        assert_eq!(trie.current_epoch(), 1);
824
825        // Advance epoch
826        let old = trie.advance_epoch();
827        assert_eq!(old, 1);
828        assert_eq!(trie.current_epoch(), 2);
829
830        // Begin read - should pin to current epoch
831        let guard = trie.begin_read();
832        assert_eq!(guard.epoch(), 2);
833
834        // Advance epoch while read is active
835        trie.advance_epoch();
836        assert_eq!(trie.current_epoch(), 3);
837
838        // Reader still at epoch 2
839        assert_eq!(guard.epoch(), 2);
840
841        // Drop guard and update min epoch
842        drop(guard);
843        trie.update_min_reader_epoch();
844    }
845
846    #[test]
847    fn test_concurrent_trie_multithreaded() {
848        use std::sync::Arc;
849        use std::thread;
850
851        let trie = Arc::new(ConcurrentPathTrie::new());
852        let mut handles = vec![];
853
854        // Spawn writer threads
855        for i in 0..4 {
856            let trie = Arc::clone(&trie);
857            let handle = thread::spawn(move || {
858                for j in 0..25 {
859                    let path = format!("table{}.column{}", i, j);
860                    trie.insert(&path, ColumnType::UInt64);
861                }
862            });
863            handles.push(handle);
864        }
865
866        // Spawn reader threads
867        for _ in 0..4 {
868            let trie = Arc::clone(&trie);
869            let handle = thread::spawn(move || {
870                let _guard = trie.begin_read();
871                // Read some paths (may or may not exist yet)
872                for i in 0..4 {
873                    for j in 0..25 {
874                        let path = format!("table{}.column{}", i, j);
875                        let _ = trie.resolve(&path);
876                    }
877                }
878            });
879            handles.push(handle);
880        }
881
882        for handle in handles {
883            handle.join().unwrap();
884        }
885
886        // All 100 columns should be inserted
887        assert_eq!(trie.total_columns(), 100);
888
889        // Verify all can be resolved
890        for i in 0..4 {
891            for j in 0..25 {
892                let path = format!("table{}.column{}", i, j);
893                assert!(trie.resolve(&path).is_some(), "Missing path: {}", path);
894            }
895        }
896    }
897
898    #[test]
899    fn test_concurrent_trie_read_guard() {
900        let trie = ConcurrentPathTrie::new();
901
902        // Insert some data
903        trie.insert("test.path", ColumnType::UInt64);
904
905        // Multiple concurrent reads
906        {
907            let guard1 = trie.begin_read();
908            let guard2 = trie.begin_read();
909
910            assert_eq!(trie.resolve("test.path"), Some(0));
911            assert_eq!(guard1.epoch(), guard2.epoch());
912
913            // Guards dropped here
914        }
915
916        // Epoch tracking should work
917        trie.update_min_reader_epoch();
918    }
919}