1use dashmap::DashMap;
40use serde::{Deserialize, Serialize};
41use std::collections::HashMap;
42use std::sync::Arc;
43use std::sync::atomic::{AtomicU32, AtomicU64, Ordering};
44
/// Numeric identifier handed out to a column when its path is inserted into a trie.
pub type ColumnId = u32;
47
/// One node of a [`PathTrie`]; the trie holds one node per dot-separated path segment.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TrieNode {
    /// Column id stored on this node, or `None` when no inserted path ends here.
    pub column_id: Option<ColumnId>,
    /// Column type stored together with `column_id`.
    pub column_type: Option<ColumnType>,
    /// Child nodes keyed by the next path segment.
    pub children: HashMap<String, Box<TrieNode>>,
}
58
59impl TrieNode {
60 pub fn new() -> Self {
62 Self {
63 column_id: None,
64 column_type: None,
65 children: HashMap::new(),
66 }
67 }
68
69 pub fn leaf(column_id: ColumnId, column_type: ColumnType) -> Self {
71 Self {
72 column_id: Some(column_id),
73 column_type: Some(column_type),
74 children: HashMap::new(),
75 }
76 }
77
78 pub fn is_leaf(&self) -> bool {
80 self.column_id.is_some()
81 }
82
83 pub fn count_nodes(&self) -> usize {
85 1 + self
86 .children
87 .values()
88 .map(|c| c.count_nodes())
89 .sum::<usize>()
90 }
91}
92
93impl Default for TrieNode {
94 fn default() -> Self {
95 Self::new()
96 }
97}
98
/// Type tag attached to a column.
///
/// `Bool`, `Int64`, `UInt64`, `Float64` and `Timestamp` are the variants that
/// `ColumnType::is_fixed_size` reports as fixed-size; the rest are not.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub enum ColumnType {
    /// Fixed-size; `fixed_size()` reports 1.
    Bool,
    /// Fixed-size; `fixed_size()` reports 8.
    Int64,
    /// Fixed-size; `fixed_size()` reports 8.
    UInt64,
    /// Fixed-size; `fixed_size()` reports 8.
    Float64,
    /// Not fixed-size.
    Text,
    /// Not fixed-size.
    Binary,
    /// Fixed-size; `fixed_size()` reports 8.
    Timestamp,
    /// Not fixed-size.
    Nested,
    /// Not fixed-size.
    Array,
}
121
122impl ColumnType {
123 pub fn is_fixed_size(&self) -> bool {
125 matches!(
126 self,
127 ColumnType::Bool
128 | ColumnType::Int64
129 | ColumnType::UInt64
130 | ColumnType::Float64
131 | ColumnType::Timestamp
132 )
133 }
134
135 pub fn fixed_size(&self) -> Option<usize> {
137 match self {
138 ColumnType::Bool => Some(1),
139 ColumnType::Int64
140 | ColumnType::UInt64
141 | ColumnType::Float64
142 | ColumnType::Timestamp => Some(8),
143 _ => None,
144 }
145 }
146}
147
/// Trie that maps dot-separated paths (for example `"users.profile.email"`) to column ids.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PathTrie {
    /// Root node; it corresponds to the empty path and carries no column itself.
    root: TrieNode,
    /// Column counter maintained by `insert` and reported by `total_columns()`.
    total_columns: u32,
    /// Id that the next `insert` call will hand out.
    next_column_id: ColumnId,
}
158
159impl PathTrie {
160 pub fn new() -> Self {
162 Self {
163 root: TrieNode::new(),
164 total_columns: 0,
165 next_column_id: 0,
166 }
167 }
168
169 pub fn insert(&mut self, path: &str, column_type: ColumnType) -> ColumnId {
174 let segments: Vec<&str> = path.split('.').collect();
175 let column_id = self.next_column_id;
176 self.next_column_id += 1;
177
178 let mut current = &mut self.root;
179
180 for (i, segment) in segments.iter().enumerate() {
181 let is_last = i == segments.len() - 1;
182
183 current = current
184 .children
185 .entry(segment.to_string())
186 .or_insert_with(|| Box::new(TrieNode::new()));
187
188 if is_last {
189 current.column_id = Some(column_id);
190 current.column_type = Some(column_type);
191 }
192 }
193
194 self.total_columns += 1;
195 column_id
196 }
197
198 pub fn resolve(&self, path: &str) -> Option<ColumnId> {
202 let segments: Vec<&str> = path.split('.').collect();
203 let mut current = &self.root;
204
205 for segment in segments {
206 current = current.children.get(segment)?;
207 }
208
209 current.column_id
210 }
211
212 pub fn resolve_with_type(&self, path: &str) -> Option<(ColumnId, ColumnType)> {
214 let segments: Vec<&str> = path.split('.').collect();
215 let mut current = &self.root;
216
217 for segment in segments {
218 current = current.children.get(segment)?;
219 }
220
221 Some((current.column_id?, current.column_type?))
222 }
223
224 pub fn prefix_match(&self, prefix: &str) -> Vec<(String, ColumnId)> {
228 let mut results = Vec::new();
229
230 let segments: Vec<&str> = if prefix.is_empty() {
232 vec![]
233 } else {
234 prefix.split('.').collect()
235 };
236
237 let mut current = &self.root;
238 for segment in &segments {
239 if let Some(child) = current.children.get(*segment) {
240 current = child;
241 } else {
242 return results;
243 }
244 }
245
246 self.collect_paths(current, prefix.to_string(), &mut results);
248 results
249 }
250
251 #[allow(clippy::only_used_in_recursion)]
252 fn collect_paths(&self, node: &TrieNode, path: String, results: &mut Vec<(String, ColumnId)>) {
253 if let Some(col_id) = node.column_id {
254 results.push((path.clone(), col_id));
255 }
256
257 for (segment, child) in &node.children {
258 let child_path = if path.is_empty() {
259 segment.clone()
260 } else {
261 format!("{}.{}", path, segment)
262 };
263 self.collect_paths(child, child_path, results);
264 }
265 }
266
267 pub fn total_columns(&self) -> u32 {
269 self.total_columns
270 }
271
272 pub fn total_nodes(&self) -> usize {
274 self.root.count_nodes()
275 }
276
277 pub fn memory_bytes(&self) -> usize {
279 self.total_nodes() * 100
281 }
282}
283
284impl Default for PathTrie {
285 fn default() -> Self {
286 Self::new()
287 }
288}
289
/// Per-column statistics used to group and rank the columns of a [`PathTrie`].
#[derive(Debug, Clone)]
pub struct ColumnGroupAffinity {
    /// Column ids grouped by their column type, as collected by `from_trie`.
    pub type_groups: HashMap<ColumnType, Vec<ColumnId>>,
    /// Number of accesses recorded per column through `record_access`.
    pub access_frequency: HashMap<ColumnId, u64>,
    /// Fraction `null_count / total_count` per column, set by `update_null_density`.
    pub null_density: HashMap<ColumnId, f64>,
}
305
306impl ColumnGroupAffinity {
307 pub fn from_trie(trie: &PathTrie) -> Self {
309 let mut type_groups: HashMap<ColumnType, Vec<ColumnId>> = HashMap::new();
310
311 fn collect_columns(node: &TrieNode, groups: &mut HashMap<ColumnType, Vec<ColumnId>>) {
312 if let (Some(col_id), Some(col_type)) = (node.column_id, node.column_type) {
313 groups.entry(col_type).or_default().push(col_id);
314 }
315 for child in node.children.values() {
316 collect_columns(child, groups);
317 }
318 }
319
320 collect_columns(&trie.root, &mut type_groups);
321
322 Self {
323 type_groups,
324 access_frequency: HashMap::new(),
325 null_density: HashMap::new(),
326 }
327 }
328
329 pub fn record_access(&mut self, column_id: ColumnId) {
331 *self.access_frequency.entry(column_id).or_insert(0) += 1;
332 }
333
334 pub fn update_null_density(&mut self, column_id: ColumnId, null_count: u64, total_count: u64) {
336 if total_count > 0 {
337 self.null_density
338 .insert(column_id, null_count as f64 / total_count as f64);
339 }
340 }
341
342 pub fn simd_groups(&self) -> Vec<(ColumnType, Vec<ColumnId>)> {
344 self.type_groups
345 .iter()
346 .filter(|(t, _)| t.is_fixed_size())
347 .map(|(t, cols)| (*t, cols.clone()))
348 .collect()
349 }
350
351 pub fn sparse_columns(&self, threshold: f64) -> Vec<ColumnId> {
353 self.null_density
354 .iter()
355 .filter(|(_, density)| **density > threshold)
356 .map(|(col_id, _)| *col_id)
357 .collect()
358 }
359
360 pub fn hot_columns(&self, n: usize) -> Vec<ColumnId> {
362 let mut sorted: Vec<_> = self.access_frequency.iter().collect();
363 sorted.sort_by(|a, b| b.1.cmp(a.1));
364 sorted
365 .into_iter()
366 .take(n)
367 .map(|(col_id, _)| *col_id)
368 .collect()
369 }
370}
371
/// One node of a [`ConcurrentPathTrie`].
///
/// The column fields are plain values set at construction; only `children`
/// can be modified through a shared reference.
#[derive(Debug)]
pub struct ConcurrentTrieNode {
    /// Column id stored on this node, or `None` for a node created without one.
    pub column_id: Option<ColumnId>,
    /// Column type stored together with `column_id`.
    pub column_type: Option<ColumnType>,
    /// Child nodes keyed by the next path segment.
    pub children: DashMap<String, Arc<ConcurrentTrieNode>>,
    /// Epoch value that was passed to the constructor of this node.
    pub created_epoch: u64,
}
388
389impl ConcurrentTrieNode {
390 pub fn new(epoch: u64) -> Self {
392 Self {
393 column_id: None,
394 column_type: None,
395 children: DashMap::new(),
396 created_epoch: epoch,
397 }
398 }
399
400 pub fn leaf(column_id: ColumnId, column_type: ColumnType, epoch: u64) -> Self {
402 Self {
403 column_id: Some(column_id),
404 column_type: Some(column_type),
405 children: DashMap::new(),
406 created_epoch: epoch,
407 }
408 }
409
410 pub fn is_leaf(&self) -> bool {
412 self.column_id.is_some()
413 }
414
415 pub fn count_nodes(&self) -> usize {
417 1 + self
418 .children
419 .iter()
420 .map(|r| r.value().count_nodes())
421 .sum::<usize>()
422 }
423}
424
/// Path trie that can be read and written through a shared reference.
///
/// Nodes live behind `Arc` and child maps are `DashMap`s; counters and epochs
/// are atomics. Readers can register the epoch they started in via
/// `begin_read`.
#[derive(Debug)]
pub struct ConcurrentPathTrie {
    /// Root node; created with epoch 0 and no column.
    root: Arc<ConcurrentTrieNode>,
    /// Column counter incremented by `insert`.
    total_columns: AtomicU32,
    /// Source of column ids; each `insert` takes the next value.
    next_column_id: AtomicU32,
    /// Current epoch; starts at 1 and is bumped by `advance_epoch`.
    current_epoch: AtomicU64,
    /// Value most recently computed by `update_min_reader_epoch`; starts at 0.
    min_reader_epoch: AtomicU64,
    /// Number of live `ReadGuard`s per epoch they were created in.
    reader_epochs: DashMap<u64, AtomicU32>,
}
452
453impl ConcurrentPathTrie {
454 pub fn new() -> Self {
456 Self {
457 root: Arc::new(ConcurrentTrieNode::new(0)),
458 total_columns: AtomicU32::new(0),
459 next_column_id: AtomicU32::new(0),
460 current_epoch: AtomicU64::new(1),
461 min_reader_epoch: AtomicU64::new(0),
462 reader_epochs: DashMap::new(),
463 }
464 }
465
466 pub fn current_epoch(&self) -> u64 {
468 self.current_epoch.load(Ordering::Acquire)
469 }
470
471 pub fn advance_epoch(&self) -> u64 {
473 self.current_epoch.fetch_add(1, Ordering::AcqRel)
474 }
475
476 pub fn begin_read(&self) -> ReadGuard<'_> {
479 let epoch = self.current_epoch.load(Ordering::Acquire);
480
481 self.reader_epochs
483 .entry(epoch)
484 .or_insert_with(|| AtomicU32::new(0))
485 .fetch_add(1, Ordering::Relaxed);
486
487 ReadGuard { trie: self, epoch }
488 }
489
490 pub fn insert(&self, path: &str, column_type: ColumnType) -> ColumnId {
495 let segments: Vec<&str> = path.split('.').collect();
496 let column_id = self.next_column_id.fetch_add(1, Ordering::Relaxed);
497 let epoch = self.current_epoch.load(Ordering::Acquire);
498
499 let mut current = self.root.clone();
500
501 for (i, segment) in segments.iter().enumerate() {
502 let is_last = i == segments.len() - 1;
503
504 if is_last {
505 let leaf = Arc::new(ConcurrentTrieNode::leaf(column_id, column_type, epoch));
507 current.children.insert(segment.to_string(), leaf);
508 } else {
509 let next = current
511 .children
512 .entry(segment.to_string())
513 .or_insert_with(|| Arc::new(ConcurrentTrieNode::new(epoch)))
514 .clone();
515 current = next;
516 }
517 }
518
519 self.total_columns.fetch_add(1, Ordering::Relaxed);
520 column_id
521 }
522
523 pub fn resolve(&self, path: &str) -> Option<ColumnId> {
527 let segments: Vec<&str> = path.split('.').collect();
528 let mut current = self.root.clone();
529
530 for segment in segments {
531 let next = current.children.get(segment)?.clone();
532 current = next;
533 }
534
535 current.column_id
536 }
537
538 pub fn resolve_with_type(&self, path: &str) -> Option<(ColumnId, ColumnType)> {
540 let segments: Vec<&str> = path.split('.').collect();
541 let mut current = self.root.clone();
542
543 for segment in segments {
544 let next = current.children.get(segment)?.clone();
545 current = next;
546 }
547
548 Some((current.column_id?, current.column_type?))
549 }
550
551 pub fn prefix_match(&self, prefix: &str) -> Vec<(String, ColumnId)> {
553 let mut results = Vec::new();
554
555 let segments: Vec<&str> = if prefix.is_empty() {
556 vec![]
557 } else {
558 prefix.split('.').collect()
559 };
560
561 let mut current = self.root.clone();
562 for segment in &segments {
563 let next = match current.children.get(*segment) {
564 Some(child) => child.clone(),
565 None => return results,
566 };
567 current = next;
568 }
569
570 self.collect_paths(¤t, prefix.to_string(), &mut results);
571 results
572 }
573
574 #[allow(clippy::only_used_in_recursion)]
575 fn collect_paths(
576 &self,
577 node: &ConcurrentTrieNode,
578 path: String,
579 results: &mut Vec<(String, ColumnId)>,
580 ) {
581 if let Some(col_id) = node.column_id {
582 results.push((path.clone(), col_id));
583 }
584
585 for entry in node.children.iter() {
586 let child_path = if path.is_empty() {
587 entry.key().clone()
588 } else {
589 format!("{}.{}", path, entry.key())
590 };
591 self.collect_paths(entry.value(), child_path, results);
592 }
593 }
594
595 pub fn total_columns(&self) -> u32 {
597 self.total_columns.load(Ordering::Relaxed)
598 }
599
600 pub fn total_nodes(&self) -> usize {
602 self.root.count_nodes()
603 }
604
605 pub fn update_min_reader_epoch(&self) {
607 let mut min_epoch = self.current_epoch.load(Ordering::Acquire);
608
609 for entry in self.reader_epochs.iter() {
611 if entry.value().load(Ordering::Relaxed) > 0 && *entry.key() < min_epoch {
612 min_epoch = *entry.key();
613 }
614 }
615
616 self.min_reader_epoch.store(min_epoch, Ordering::Release);
617
618 let threshold = min_epoch.saturating_sub(10);
620 self.reader_epochs
621 .retain(|epoch, count| *epoch >= threshold || count.load(Ordering::Relaxed) > 0);
622 }
623
624 pub fn min_reader_epoch(&self) -> u64 {
626 self.min_reader_epoch.load(Ordering::Acquire)
627 }
628}
629
630impl Default for ConcurrentPathTrie {
631 fn default() -> Self {
632 Self::new()
633 }
634}
635
636unsafe impl Send for ConcurrentPathTrie {}
638unsafe impl Sync for ConcurrentPathTrie {}
639
640pub struct ReadGuard<'a> {
643 trie: &'a ConcurrentPathTrie,
644 epoch: u64,
645}
646
647impl<'a> ReadGuard<'a> {
648 pub fn epoch(&self) -> u64 {
650 self.epoch
651 }
652}
653
654impl<'a> Drop for ReadGuard<'a> {
655 fn drop(&mut self) {
656 if let Some(count) = self.trie.reader_epochs.get(&self.epoch) {
657 count.fetch_sub(1, Ordering::Relaxed);
658 }
659 }
660}
661
#[cfg(test)]
mod tests {
    use super::*;

    /// Inserted paths resolve to their ids; unknown and interior paths do not resolve.
    #[test]
    fn test_path_trie_insert_resolve() {
        let mut trie = PathTrie::new();

        let id_col = trie.insert("users.id", ColumnType::UInt64);
        let name_col = trie.insert("users.name", ColumnType::Text);
        let email_col = trie.insert("users.profile.email", ColumnType::Text);
        let theme_col = trie.insert("users.profile.settings.theme", ColumnType::Text);

        let expectations = [
            ("users.id", Some(id_col)),
            ("users.name", Some(name_col)),
            ("users.profile.email", Some(email_col)),
            ("users.profile.settings.theme", Some(theme_col)),
            ("nonexistent", None),
            ("users.profile", None),
        ];
        for (path, expected) in expectations.iter() {
            assert_eq!(trie.resolve(path), *expected);
        }

        assert_eq!(trie.total_columns(), 4);
    }

    /// Prefix queries return the number of columns below the prefix.
    #[test]
    fn test_path_trie_prefix_match() {
        let mut trie = PathTrie::new();

        let columns = [
            ("users.id", ColumnType::UInt64),
            ("users.name", ColumnType::Text),
            ("users.profile.email", ColumnType::Text),
            ("orders.id", ColumnType::UInt64),
        ];
        for (path, column_type) in columns.iter() {
            trie.insert(path, *column_type);
        }

        let expected_counts = [("users", 3usize), ("users.profile", 1), ("", 4)];
        for (prefix, count) in expected_counts.iter() {
            let matched = trie.prefix_match(prefix);
            assert_eq!(matched.len(), *count);
        }
    }

    /// `resolve_with_type` returns both the id and the type of a column.
    #[test]
    fn test_resolve_with_type() {
        let mut trie = PathTrie::new();

        trie.insert("score", ColumnType::Float64);
        trie.insert("name", ColumnType::Text);

        let score = trie.resolve_with_type("score").unwrap();
        assert_eq!(score.0, 0);
        assert_eq!(score.1, ColumnType::Float64);

        let name = trie.resolve_with_type("name").unwrap();
        assert_eq!(name.0, 1);
        assert_eq!(name.1, ColumnType::Text);
    }

    /// Grouping, access ranking and sparsity detection on a small trie.
    #[test]
    fn test_column_group_affinity() {
        let mut trie = PathTrie::new();

        let columns = [
            ("id", ColumnType::UInt64),
            ("score", ColumnType::Float64),
            ("age", ColumnType::Int64),
            ("name", ColumnType::Text),
            ("timestamp", ColumnType::Timestamp),
        ];
        for (path, column_type) in columns.iter() {
            trie.insert(path, *column_type);
        }

        let mut affinity = ColumnGroupAffinity::from_trie(&trie);

        let fixed_size_groups = affinity.simd_groups();
        assert!(!fixed_size_groups.is_empty());

        // Column 0 is accessed twice, column 1 once.
        for accessed in [0, 0, 1].iter() {
            affinity.record_access(*accessed);
        }

        let hottest = affinity.hot_columns(2);
        assert_eq!(hottest.len(), 2);
        assert_eq!(hottest[0], 0);

        affinity.update_null_density(3, 90, 100);
        let sparse = affinity.sparse_columns(0.5);
        assert_eq!(sparse, vec![3]);
    }

    /// Node count and memory estimate for 10 tables with 10 columns each.
    #[test]
    fn test_memory_estimate() {
        let mut trie = PathTrie::new();

        (0..100).for_each(|i| {
            let path = format!("table{}.column{}", i / 10, i % 10);
            trie.insert(&path, ColumnType::UInt64);
        });

        assert!(trie.total_nodes() > 100);
        assert!(trie.memory_bytes() > 10000);
    }

    /// Basic insert and resolve on the concurrent trie.
    #[test]
    fn test_concurrent_trie_basic() {
        let trie = ConcurrentPathTrie::new();

        let id_col = trie.insert("users.id", ColumnType::UInt64);
        let name_col = trie.insert("users.name", ColumnType::Text);
        let email_col = trie.insert("users.profile.email", ColumnType::Text);

        let expectations = [
            ("users.id", Some(id_col)),
            ("users.name", Some(name_col)),
            ("users.profile.email", Some(email_col)),
            ("nonexistent", None),
        ];
        for (path, expected) in expectations.iter() {
            assert_eq!(trie.resolve(path), *expected);
        }

        assert_eq!(trie.total_columns(), 3);
    }

    /// `resolve_with_type` on the concurrent trie.
    #[test]
    fn test_concurrent_trie_resolve_with_type() {
        let trie = ConcurrentPathTrie::new();

        trie.insert("score", ColumnType::Float64);
        trie.insert("name", ColumnType::Text);

        let score = trie.resolve_with_type("score").unwrap();
        assert_eq!(score.0, 0);
        assert_eq!(score.1, ColumnType::Float64);

        let name = trie.resolve_with_type("name").unwrap();
        assert_eq!(name.0, 1);
        assert_eq!(name.1, ColumnType::Text);
    }

    /// Prefix queries on the concurrent trie.
    #[test]
    fn test_concurrent_trie_prefix_match() {
        let trie = ConcurrentPathTrie::new();

        let columns = [
            ("users.id", ColumnType::UInt64),
            ("users.name", ColumnType::Text),
            ("users.profile.email", ColumnType::Text),
            ("orders.id", ColumnType::UInt64),
        ];
        for (path, column_type) in columns.iter() {
            trie.insert(path, *column_type);
        }

        let under_users = trie.prefix_match("users");
        assert_eq!(under_users.len(), 3);

        let everything = trie.prefix_match("");
        assert_eq!(everything.len(), 4);
    }

    /// Epochs advance monotonically and a guard keeps the epoch it started in.
    #[test]
    fn test_concurrent_trie_epoch_management() {
        let trie = ConcurrentPathTrie::new();

        assert_eq!(trie.current_epoch(), 1);

        // `advance_epoch` hands back the epoch that was current before the bump.
        let previous = trie.advance_epoch();
        assert_eq!(previous, 1);
        assert_eq!(trie.current_epoch(), 2);

        let reader = trie.begin_read();
        assert_eq!(reader.epoch(), 2);

        trie.advance_epoch();
        assert_eq!(trie.current_epoch(), 3);

        // The guard still reports the epoch it was created in.
        assert_eq!(reader.epoch(), 2);

        drop(reader);
        trie.update_min_reader_epoch();
    }

    /// Four writer threads and four reader threads share one trie.
    #[test]
    fn test_concurrent_trie_multithreaded() {
        use std::sync::Arc;
        use std::thread;

        let trie = Arc::new(ConcurrentPathTrie::new());

        // Writers: each thread fills its own table with 25 columns.
        let mut workers: Vec<thread::JoinHandle<()>> = (0..4)
            .map(|table| {
                let shared = Arc::clone(&trie);
                thread::spawn(move || {
                    for column in 0..25 {
                        let path = format!("table{}.column{}", table, column);
                        shared.insert(&path, ColumnType::UInt64);
                    }
                })
            })
            .collect();

        // Readers: resolve every path while the writers may still be running.
        workers.extend((0..4).map(|_| {
            let shared = Arc::clone(&trie);
            thread::spawn(move || {
                let _guard = shared.begin_read();
                for table in 0..4 {
                    for column in 0..25 {
                        let path = format!("table{}.column{}", table, column);
                        let _ = shared.resolve(&path);
                    }
                }
            })
        }));

        for worker in workers {
            worker.join().unwrap();
        }

        assert_eq!(trie.total_columns(), 100);

        for table in 0..4 {
            for column in 0..25 {
                let path = format!("table{}.column{}", table, column);
                assert!(trie.resolve(&path).is_some(), "Missing path: {}", path);
            }
        }
    }

    /// Two guards taken in the same epoch report the same epoch.
    #[test]
    fn test_concurrent_trie_read_guard() {
        let trie = ConcurrentPathTrie::new();

        trie.insert("test.path", ColumnType::UInt64);

        {
            let first_reader = trie.begin_read();
            let second_reader = trie.begin_read();

            assert_eq!(trie.resolve("test.path"), Some(0));
            assert_eq!(first_reader.epoch(), second_reader.epoch());
        }

        trie.update_min_reader_epoch();
    }
}