1use dashmap::DashMap;
37use serde::{Deserialize, Serialize};
38use std::collections::HashMap;
39use std::sync::Arc;
40use std::sync::atomic::{AtomicU32, AtomicU64, Ordering};
41
/// Identifier assigned to a column when its path is inserted into a trie.
///
/// Both `PathTrie::insert` and `ConcurrentPathTrie::insert` hand IDs out
/// sequentially, starting from 0.
pub type ColumnId = u32;
44
/// A node of a [`PathTrie`]; each edge is labelled with one dot-separated path segment.
///
/// A node terminates a registered path when `column_id` is set. Such a node may
/// still have children, because a longer path can share it as a prefix.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TrieNode {
    /// Column registered for the path ending at this node, if any.
    pub column_id: Option<ColumnId>,
    /// Type of that column; `PathTrie::insert` sets it together with `column_id`.
    pub column_type: Option<ColumnType>,
    /// Child nodes keyed by the next path segment.
    pub children: HashMap<String, Box<TrieNode>>,
}
55
56impl TrieNode {
57 pub fn new() -> Self {
59 Self {
60 column_id: None,
61 column_type: None,
62 children: HashMap::new(),
63 }
64 }
65
66 pub fn leaf(column_id: ColumnId, column_type: ColumnType) -> Self {
68 Self {
69 column_id: Some(column_id),
70 column_type: Some(column_type),
71 children: HashMap::new(),
72 }
73 }
74
75 pub fn is_leaf(&self) -> bool {
77 self.column_id.is_some()
78 }
79
80 pub fn count_nodes(&self) -> usize {
82 1 + self
83 .children
84 .values()
85 .map(|c| c.count_nodes())
86 .sum::<usize>()
87 }
88}
89
90impl Default for TrieNode {
91 fn default() -> Self {
92 Self::new()
93 }
94}
95
/// Type of a column registered in a trie.
///
/// NOTE(review): variants are serde-serialized; formats that encode the variant
/// index may depend on declaration order, so avoid reordering.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub enum ColumnType {
    /// Boolean; 1 byte per value according to `ColumnType::fixed_size`.
    Bool,
    /// 64-bit signed integer; 8 bytes per value.
    Int64,
    /// 64-bit unsigned integer; 8 bytes per value.
    UInt64,
    /// 64-bit float; 8 bytes per value.
    Float64,
    /// Text; not fixed-size.
    Text,
    /// Binary data; not fixed-size.
    Binary,
    /// Timestamp; 8 bytes per value (unit/encoding not defined here — TODO confirm).
    Timestamp,
    /// Nested value; not fixed-size. Exact semantics not visible here — confirm with callers.
    Nested,
    /// Array value; not fixed-size.
    Array,
}
118
119impl ColumnType {
120 pub fn is_fixed_size(&self) -> bool {
122 matches!(
123 self,
124 ColumnType::Bool
125 | ColumnType::Int64
126 | ColumnType::UInt64
127 | ColumnType::Float64
128 | ColumnType::Timestamp
129 )
130 }
131
132 pub fn fixed_size(&self) -> Option<usize> {
134 match self {
135 ColumnType::Bool => Some(1),
136 ColumnType::Int64
137 | ColumnType::UInt64
138 | ColumnType::Float64
139 | ColumnType::Timestamp => Some(8),
140 _ => None,
141 }
142 }
143}
144
/// Single-threaded trie mapping dot-separated paths (e.g. `"users.profile.email"`)
/// to column IDs.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PathTrie {
    /// Root node; its children are keyed by the first path segment.
    root: TrieNode,
    /// Number of `insert` calls so far (re-inserting a path is counted again).
    total_columns: u32,
    /// ID that the next `insert` will hand out.
    next_column_id: ColumnId,
}
155
156impl PathTrie {
157 pub fn new() -> Self {
159 Self {
160 root: TrieNode::new(),
161 total_columns: 0,
162 next_column_id: 0,
163 }
164 }
165
166 pub fn insert(&mut self, path: &str, column_type: ColumnType) -> ColumnId {
171 let segments: Vec<&str> = path.split('.').collect();
172 let column_id = self.next_column_id;
173 self.next_column_id += 1;
174
175 let mut current = &mut self.root;
176
177 for (i, segment) in segments.iter().enumerate() {
178 let is_last = i == segments.len() - 1;
179
180 current = current
181 .children
182 .entry(segment.to_string())
183 .or_insert_with(|| Box::new(TrieNode::new()));
184
185 if is_last {
186 current.column_id = Some(column_id);
187 current.column_type = Some(column_type);
188 }
189 }
190
191 self.total_columns += 1;
192 column_id
193 }
194
195 pub fn resolve(&self, path: &str) -> Option<ColumnId> {
199 let segments: Vec<&str> = path.split('.').collect();
200 let mut current = &self.root;
201
202 for segment in segments {
203 current = current.children.get(segment)?;
204 }
205
206 current.column_id
207 }
208
209 pub fn resolve_with_type(&self, path: &str) -> Option<(ColumnId, ColumnType)> {
211 let segments: Vec<&str> = path.split('.').collect();
212 let mut current = &self.root;
213
214 for segment in segments {
215 current = current.children.get(segment)?;
216 }
217
218 Some((current.column_id?, current.column_type?))
219 }
220
221 pub fn prefix_match(&self, prefix: &str) -> Vec<(String, ColumnId)> {
225 let mut results = Vec::new();
226
227 let segments: Vec<&str> = if prefix.is_empty() {
229 vec![]
230 } else {
231 prefix.split('.').collect()
232 };
233
234 let mut current = &self.root;
235 for segment in &segments {
236 if let Some(child) = current.children.get(*segment) {
237 current = child;
238 } else {
239 return results;
240 }
241 }
242
243 self.collect_paths(current, prefix.to_string(), &mut results);
245 results
246 }
247
248 #[allow(clippy::only_used_in_recursion)]
249 fn collect_paths(&self, node: &TrieNode, path: String, results: &mut Vec<(String, ColumnId)>) {
250 if let Some(col_id) = node.column_id {
251 results.push((path.clone(), col_id));
252 }
253
254 for (segment, child) in &node.children {
255 let child_path = if path.is_empty() {
256 segment.clone()
257 } else {
258 format!("{}.{}", path, segment)
259 };
260 self.collect_paths(child, child_path, results);
261 }
262 }
263
264 pub fn total_columns(&self) -> u32 {
266 self.total_columns
267 }
268
269 pub fn total_nodes(&self) -> usize {
271 self.root.count_nodes()
272 }
273
274 pub fn memory_bytes(&self) -> usize {
276 self.total_nodes() * 100
278 }
279}
280
281impl Default for PathTrie {
282 fn default() -> Self {
283 Self::new()
284 }
285}
286
/// Per-column statistics used to group columns: by type (`simd_groups` returns the
/// fixed-size groups), by access frequency, and by null density.
#[derive(Debug, Clone)]
pub struct ColumnGroupAffinity {
    /// Column IDs grouped by their [`ColumnType`].
    pub type_groups: HashMap<ColumnType, Vec<ColumnId>>,
    /// Number of recorded accesses per column (incremented by `record_access`).
    pub access_frequency: HashMap<ColumnId, u64>,
    /// Last reported `null_count / total_count` per column (set by `update_null_density`).
    pub null_density: HashMap<ColumnId, f64>,
}
302
303impl ColumnGroupAffinity {
304 pub fn from_trie(trie: &PathTrie) -> Self {
306 let mut type_groups: HashMap<ColumnType, Vec<ColumnId>> = HashMap::new();
307
308 fn collect_columns(node: &TrieNode, groups: &mut HashMap<ColumnType, Vec<ColumnId>>) {
309 if let (Some(col_id), Some(col_type)) = (node.column_id, node.column_type) {
310 groups.entry(col_type).or_default().push(col_id);
311 }
312 for child in node.children.values() {
313 collect_columns(child, groups);
314 }
315 }
316
317 collect_columns(&trie.root, &mut type_groups);
318
319 Self {
320 type_groups,
321 access_frequency: HashMap::new(),
322 null_density: HashMap::new(),
323 }
324 }
325
326 pub fn record_access(&mut self, column_id: ColumnId) {
328 *self.access_frequency.entry(column_id).or_insert(0) += 1;
329 }
330
331 pub fn update_null_density(&mut self, column_id: ColumnId, null_count: u64, total_count: u64) {
333 if total_count > 0 {
334 self.null_density
335 .insert(column_id, null_count as f64 / total_count as f64);
336 }
337 }
338
339 pub fn simd_groups(&self) -> Vec<(ColumnType, Vec<ColumnId>)> {
341 self.type_groups
342 .iter()
343 .filter(|(t, _)| t.is_fixed_size())
344 .map(|(t, cols)| (*t, cols.clone()))
345 .collect()
346 }
347
348 pub fn sparse_columns(&self, threshold: f64) -> Vec<ColumnId> {
350 self.null_density
351 .iter()
352 .filter(|(_, density)| **density > threshold)
353 .map(|(col_id, _)| *col_id)
354 .collect()
355 }
356
357 pub fn hot_columns(&self, n: usize) -> Vec<ColumnId> {
359 let mut sorted: Vec<_> = self.access_frequency.iter().collect();
360 sorted.sort_by(|a, b| b.1.cmp(a.1));
361 sorted
362 .into_iter()
363 .take(n)
364 .map(|(col_id, _)| *col_id)
365 .collect()
366 }
367}
368
/// A node of a [`ConcurrentPathTrie`], shared between threads through `Arc`.
///
/// Only `children` can change after a node is shared (it is a `DashMap`). The
/// column fields are fixed at construction, so re-registering a path replaces
/// the node instead of editing it.
#[derive(Debug)]
pub struct ConcurrentTrieNode {
    /// Column registered for the path ending at this node, if any.
    pub column_id: Option<ColumnId>,
    /// Type of that column, set together with `column_id` by `ConcurrentTrieNode::leaf`.
    pub column_type: Option<ColumnType>,
    /// Child nodes keyed by the next path segment.
    pub children: DashMap<String, Arc<ConcurrentTrieNode>>,
    /// Epoch passed in when the node was created; not read by any code visible here.
    pub created_epoch: u64,
}
385
386impl ConcurrentTrieNode {
387 pub fn new(epoch: u64) -> Self {
389 Self {
390 column_id: None,
391 column_type: None,
392 children: DashMap::new(),
393 created_epoch: epoch,
394 }
395 }
396
397 pub fn leaf(column_id: ColumnId, column_type: ColumnType, epoch: u64) -> Self {
399 Self {
400 column_id: Some(column_id),
401 column_type: Some(column_type),
402 children: DashMap::new(),
403 created_epoch: epoch,
404 }
405 }
406
407 pub fn is_leaf(&self) -> bool {
409 self.column_id.is_some()
410 }
411
412 pub fn count_nodes(&self) -> usize {
414 1 + self
415 .children
416 .iter()
417 .map(|r| r.value().count_nodes())
418 .sum::<usize>()
419 }
420}
421
/// Path-to-column trie that supports `insert` and lookups through `&self` from
/// several threads. It is built on `DashMap` child maps and atomic counters.
///
/// It also tracks a monotonically increasing epoch. Readers can register at the
/// current epoch via `begin_read`; `update_min_reader_epoch` computes the oldest
/// epoch that still has registered readers.
#[derive(Debug)]
pub struct ConcurrentPathTrie {
    /// Root node; only its `children` map is ever modified.
    root: Arc<ConcurrentTrieNode>,
    /// Number of `insert` calls so far.
    total_columns: AtomicU32,
    /// ID that the next `insert` will hand out.
    next_column_id: AtomicU32,
    /// Current epoch; starts at 1 and is bumped by `advance_epoch`.
    current_epoch: AtomicU64,
    /// Value last computed by `update_min_reader_epoch` (0 until first computed).
    min_reader_epoch: AtomicU64,
    /// Epoch -> number of live `ReadGuard`s that registered at that epoch.
    reader_epochs: DashMap<u64, AtomicU32>,
}
449
450impl ConcurrentPathTrie {
451 pub fn new() -> Self {
453 Self {
454 root: Arc::new(ConcurrentTrieNode::new(0)),
455 total_columns: AtomicU32::new(0),
456 next_column_id: AtomicU32::new(0),
457 current_epoch: AtomicU64::new(1),
458 min_reader_epoch: AtomicU64::new(0),
459 reader_epochs: DashMap::new(),
460 }
461 }
462
463 pub fn current_epoch(&self) -> u64 {
465 self.current_epoch.load(Ordering::Acquire)
466 }
467
468 pub fn advance_epoch(&self) -> u64 {
470 self.current_epoch.fetch_add(1, Ordering::AcqRel)
471 }
472
473 pub fn begin_read(&self) -> ReadGuard<'_> {
476 let epoch = self.current_epoch.load(Ordering::Acquire);
477
478 self.reader_epochs
480 .entry(epoch)
481 .or_insert_with(|| AtomicU32::new(0))
482 .fetch_add(1, Ordering::Relaxed);
483
484 ReadGuard { trie: self, epoch }
485 }
486
487 pub fn insert(&self, path: &str, column_type: ColumnType) -> ColumnId {
492 let segments: Vec<&str> = path.split('.').collect();
493 let column_id = self.next_column_id.fetch_add(1, Ordering::Relaxed);
494 let epoch = self.current_epoch.load(Ordering::Acquire);
495
496 let mut current = self.root.clone();
497
498 for (i, segment) in segments.iter().enumerate() {
499 let is_last = i == segments.len() - 1;
500
501 if is_last {
502 let leaf = Arc::new(ConcurrentTrieNode::leaf(column_id, column_type, epoch));
504 current.children.insert(segment.to_string(), leaf);
505 } else {
506 let next = current
508 .children
509 .entry(segment.to_string())
510 .or_insert_with(|| Arc::new(ConcurrentTrieNode::new(epoch)))
511 .clone();
512 current = next;
513 }
514 }
515
516 self.total_columns.fetch_add(1, Ordering::Relaxed);
517 column_id
518 }
519
520 pub fn resolve(&self, path: &str) -> Option<ColumnId> {
524 let segments: Vec<&str> = path.split('.').collect();
525 let mut current = self.root.clone();
526
527 for segment in segments {
528 let next = current.children.get(segment)?.clone();
529 current = next;
530 }
531
532 current.column_id
533 }
534
535 pub fn resolve_with_type(&self, path: &str) -> Option<(ColumnId, ColumnType)> {
537 let segments: Vec<&str> = path.split('.').collect();
538 let mut current = self.root.clone();
539
540 for segment in segments {
541 let next = current.children.get(segment)?.clone();
542 current = next;
543 }
544
545 Some((current.column_id?, current.column_type?))
546 }
547
548 pub fn prefix_match(&self, prefix: &str) -> Vec<(String, ColumnId)> {
550 let mut results = Vec::new();
551
552 let segments: Vec<&str> = if prefix.is_empty() {
553 vec![]
554 } else {
555 prefix.split('.').collect()
556 };
557
558 let mut current = self.root.clone();
559 for segment in &segments {
560 let next = match current.children.get(*segment) {
561 Some(child) => child.clone(),
562 None => return results,
563 };
564 current = next;
565 }
566
567 self.collect_paths(¤t, prefix.to_string(), &mut results);
568 results
569 }
570
571 #[allow(clippy::only_used_in_recursion)]
572 fn collect_paths(
573 &self,
574 node: &ConcurrentTrieNode,
575 path: String,
576 results: &mut Vec<(String, ColumnId)>,
577 ) {
578 if let Some(col_id) = node.column_id {
579 results.push((path.clone(), col_id));
580 }
581
582 for entry in node.children.iter() {
583 let child_path = if path.is_empty() {
584 entry.key().clone()
585 } else {
586 format!("{}.{}", path, entry.key())
587 };
588 self.collect_paths(entry.value(), child_path, results);
589 }
590 }
591
592 pub fn total_columns(&self) -> u32 {
594 self.total_columns.load(Ordering::Relaxed)
595 }
596
597 pub fn total_nodes(&self) -> usize {
599 self.root.count_nodes()
600 }
601
602 pub fn update_min_reader_epoch(&self) {
604 let mut min_epoch = self.current_epoch.load(Ordering::Acquire);
605
606 for entry in self.reader_epochs.iter() {
608 if entry.value().load(Ordering::Relaxed) > 0 && *entry.key() < min_epoch {
609 min_epoch = *entry.key();
610 }
611 }
612
613 self.min_reader_epoch.store(min_epoch, Ordering::Release);
614
615 let threshold = min_epoch.saturating_sub(10);
617 self.reader_epochs
618 .retain(|epoch, count| *epoch >= threshold || count.load(Ordering::Relaxed) > 0);
619 }
620
621 pub fn min_reader_epoch(&self) -> u64 {
623 self.min_reader_epoch.load(Ordering::Acquire)
624 }
625}
626
627impl Default for ConcurrentPathTrie {
628 fn default() -> Self {
629 Self::new()
630 }
631}
632
633unsafe impl Send for ConcurrentPathTrie {}
635unsafe impl Sync for ConcurrentPathTrie {}
636
637pub struct ReadGuard<'a> {
640 trie: &'a ConcurrentPathTrie,
641 epoch: u64,
642}
643
644impl<'a> ReadGuard<'a> {
645 pub fn epoch(&self) -> u64 {
647 self.epoch
648 }
649}
650
651impl<'a> Drop for ReadGuard<'a> {
652 fn drop(&mut self) {
653 if let Some(count) = self.trie.reader_epochs.get(&self.epoch) {
654 count.fetch_sub(1, Ordering::Relaxed);
655 }
656 }
657}
658
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_path_trie_insert_resolve() {
        let mut trie = PathTrie::new();

        let id1 = trie.insert("users.id", ColumnType::UInt64);
        let id2 = trie.insert("users.name", ColumnType::Text);
        let id3 = trie.insert("users.profile.email", ColumnType::Text);
        let id4 = trie.insert("users.profile.settings.theme", ColumnType::Text);

        assert_eq!(trie.resolve("users.id"), Some(id1));
        assert_eq!(trie.resolve("users.name"), Some(id2));
        assert_eq!(trie.resolve("users.profile.email"), Some(id3));
        assert_eq!(trie.resolve("users.profile.settings.theme"), Some(id4));
        assert_eq!(trie.resolve("nonexistent"), None);
        // Intermediate nodes are not columns.
        assert_eq!(trie.resolve("users.profile"), None);
        assert_eq!(trie.total_columns(), 4);
    }

    #[test]
    fn test_path_trie_prefix_match() {
        let mut trie = PathTrie::new();

        trie.insert("users.id", ColumnType::UInt64);
        trie.insert("users.name", ColumnType::Text);
        trie.insert("users.profile.email", ColumnType::Text);
        trie.insert("orders.id", ColumnType::UInt64);

        let user_cols = trie.prefix_match("users");
        assert_eq!(user_cols.len(), 3);

        let profile_cols = trie.prefix_match("users.profile");
        assert_eq!(profile_cols.len(), 1);

        let all_cols = trie.prefix_match("");
        assert_eq!(all_cols.len(), 4);
    }

    #[test]
    fn test_resolve_with_type() {
        let mut trie = PathTrie::new();

        trie.insert("score", ColumnType::Float64);
        trie.insert("name", ColumnType::Text);

        let (id, col_type) = trie.resolve_with_type("score").unwrap();
        assert_eq!(id, 0);
        assert_eq!(col_type, ColumnType::Float64);

        let (id, col_type) = trie.resolve_with_type("name").unwrap();
        assert_eq!(id, 1);
        assert_eq!(col_type, ColumnType::Text);
    }

    #[test]
    fn test_column_group_affinity() {
        let mut trie = PathTrie::new();

        trie.insert("id", ColumnType::UInt64);
        trie.insert("score", ColumnType::Float64);
        trie.insert("age", ColumnType::Int64);
        trie.insert("name", ColumnType::Text);
        trie.insert("timestamp", ColumnType::Timestamp);

        let mut affinity = ColumnGroupAffinity::from_trie(&trie);

        let simd = affinity.simd_groups();
        assert!(!simd.is_empty());

        affinity.record_access(0);
        affinity.record_access(0);
        affinity.record_access(1);

        let hot = affinity.hot_columns(2);
        assert_eq!(hot.len(), 2);
        assert_eq!(hot[0], 0);

        affinity.update_null_density(3, 90, 100);
        let sparse = affinity.sparse_columns(0.5);
        assert_eq!(sparse, vec![3]);
    }

    #[test]
    fn test_memory_estimate() {
        let mut trie = PathTrie::new();

        for i in 0..100 {
            trie.insert(
                &format!("table{}.column{}", i / 10, i % 10),
                ColumnType::UInt64,
            );
        }

        // root + 10 tables + 100 columns
        assert!(trie.total_nodes() > 100);
        assert!(trie.memory_bytes() > 10000);
    }

    #[test]
    fn test_concurrent_trie_basic() {
        let trie = ConcurrentPathTrie::new();

        let id1 = trie.insert("users.id", ColumnType::UInt64);
        let id2 = trie.insert("users.name", ColumnType::Text);
        let id3 = trie.insert("users.profile.email", ColumnType::Text);

        assert_eq!(trie.resolve("users.id"), Some(id1));
        assert_eq!(trie.resolve("users.name"), Some(id2));
        assert_eq!(trie.resolve("users.profile.email"), Some(id3));
        assert_eq!(trie.resolve("nonexistent"), None);

        assert_eq!(trie.total_columns(), 3);
    }

    #[test]
    fn test_concurrent_trie_resolve_with_type() {
        let trie = ConcurrentPathTrie::new();

        trie.insert("score", ColumnType::Float64);
        trie.insert("name", ColumnType::Text);

        let (id, col_type) = trie.resolve_with_type("score").unwrap();
        assert_eq!(id, 0);
        assert_eq!(col_type, ColumnType::Float64);

        let (id, col_type) = trie.resolve_with_type("name").unwrap();
        assert_eq!(id, 1);
        assert_eq!(col_type, ColumnType::Text);
    }

    #[test]
    fn test_concurrent_trie_prefix_match() {
        let trie = ConcurrentPathTrie::new();

        trie.insert("users.id", ColumnType::UInt64);
        trie.insert("users.name", ColumnType::Text);
        trie.insert("users.profile.email", ColumnType::Text);
        trie.insert("orders.id", ColumnType::UInt64);

        let user_cols = trie.prefix_match("users");
        assert_eq!(user_cols.len(), 3);

        let all_cols = trie.prefix_match("");
        assert_eq!(all_cols.len(), 4);
    }

    #[test]
    fn test_concurrent_trie_epoch_management() {
        let trie = ConcurrentPathTrie::new();

        assert_eq!(trie.current_epoch(), 1);

        // `advance_epoch` returns the epoch before the increment.
        let old = trie.advance_epoch();
        assert_eq!(old, 1);
        assert_eq!(trie.current_epoch(), 2);

        let guard = trie.begin_read();
        assert_eq!(guard.epoch(), 2);

        trie.advance_epoch();
        assert_eq!(trie.current_epoch(), 3);

        // The guard keeps the epoch it registered at.
        assert_eq!(guard.epoch(), 2);

        // A live reader at epoch 2 holds the minimum down.
        trie.update_min_reader_epoch();
        assert_eq!(trie.min_reader_epoch(), 2);

        drop(guard);

        // With no live readers the minimum catches up to the current epoch.
        trie.update_min_reader_epoch();
        assert_eq!(trie.min_reader_epoch(), 3);
    }

    #[test]
    fn test_concurrent_trie_multithreaded() {
        use std::sync::Arc;
        use std::thread;

        let trie = Arc::new(ConcurrentPathTrie::new());
        let mut handles = vec![];

        // Writers: 4 threads x 25 distinct paths.
        for i in 0..4 {
            let trie = Arc::clone(&trie);
            let handle = thread::spawn(move || {
                for j in 0..25 {
                    let path = format!("table{}.column{}", i, j);
                    trie.insert(&path, ColumnType::UInt64);
                }
            });
            handles.push(handle);
        }

        // Readers racing with the writers.
        for _ in 0..4 {
            let trie = Arc::clone(&trie);
            let handle = thread::spawn(move || {
                let _guard = trie.begin_read();
                for i in 0..4 {
                    for j in 0..25 {
                        let path = format!("table{}.column{}", i, j);
                        let _ = trie.resolve(&path);
                    }
                }
            });
            handles.push(handle);
        }

        for handle in handles {
            handle.join().unwrap();
        }

        assert_eq!(trie.total_columns(), 100);

        for i in 0..4 {
            for j in 0..25 {
                let path = format!("table{}.column{}", i, j);
                assert!(trie.resolve(&path).is_some(), "Missing path: {}", path);
            }
        }
    }

    #[test]
    fn test_concurrent_trie_read_guard() {
        let trie = ConcurrentPathTrie::new();

        trie.insert("test.path", ColumnType::UInt64);

        {
            let guard1 = trie.begin_read();
            let guard2 = trie.begin_read();

            assert_eq!(trie.resolve("test.path"), Some(0));
            assert_eq!(guard1.epoch(), guard2.epoch());
        }

        // Both guards are dropped, so no reader holds the minimum below the current epoch.
        trie.update_min_reader_epoch();
        assert_eq!(trie.min_reader_epoch(), trie.current_epoch());
    }
}