llkv_runtime/runtime_storage_namespace/
registry.rs1use std::sync::Arc;
9
10use rustc_hash::{FxHashMap, FxHashSet};
11
12use super::namespace::{
13 RuntimeNamespaceId, RuntimeStorageNamespace, RuntimeStorageNamespaceOps, TEMPORARY_NAMESPACE_ID,
14};
15
16struct NamespaceEntry {
17 ops: Arc<dyn RuntimeStorageNamespaceOps>,
18 erased: Arc<dyn std::any::Any + Send + Sync>,
19}
20
21impl NamespaceEntry {
22 fn new<N>(namespace: Arc<N>) -> Self
23 where
24 N: RuntimeStorageNamespace + 'static,
25 {
26 let ops: Arc<dyn RuntimeStorageNamespaceOps> = namespace.clone();
27 let erased: Arc<dyn std::any::Any + Send + Sync> = namespace;
28 Self { ops, erased }
29 }
30
31 fn ops(&self) -> Arc<dyn RuntimeStorageNamespaceOps> {
32 Arc::clone(&self.ops)
33 }
34
35 fn as_typed<N>(&self) -> Option<Arc<N>>
36 where
37 N: RuntimeStorageNamespace + 'static,
38 {
39 self.erased.clone().downcast::<N>().ok()
40 }
41}
42
43pub struct RuntimeStorageNamespaceRegistry {
45 persistent_id: RuntimeNamespaceId,
46 namespaces: FxHashMap<RuntimeNamespaceId, NamespaceEntry>,
47 schema_map: FxHashMap<String, RuntimeNamespaceId>,
48 priority: Vec<RuntimeNamespaceId>,
49 table_map: FxHashMap<String, RuntimeNamespaceId>,
50 namespace_tables: FxHashMap<RuntimeNamespaceId, FxHashSet<String>>,
51}
52
53impl RuntimeStorageNamespaceRegistry {
54 pub fn new(persistent_id: RuntimeNamespaceId) -> Self {
55 Self {
56 persistent_id: persistent_id.clone(),
57 namespaces: FxHashMap::default(),
58 schema_map: FxHashMap::default(),
59 priority: vec![persistent_id],
60 table_map: FxHashMap::default(),
61 namespace_tables: FxHashMap::default(),
62 }
63 }
64
65 pub fn persistent_id(&self) -> &str {
66 &self.persistent_id
67 }
68
69 pub fn register_namespace<N, I>(
70 &mut self,
71 namespace: Arc<N>,
72 schemas: I,
73 prefer_unqualified: bool,
74 ) where
75 N: RuntimeStorageNamespace + 'static,
76 I: IntoIterator<Item = String>,
77 {
78 let id = namespace.namespace_id().to_string();
79 let entry = NamespaceEntry::new(namespace);
80 for schema in schemas {
81 self.schema_map.insert(schema, id.clone());
82 }
83 if prefer_unqualified {
84 if !self.priority.iter().any(|existing| existing == &id) {
85 self.priority.insert(0, id.clone());
86 }
87 } else if !self.priority.iter().any(|existing| existing == &id) {
88 self.priority.push(id.clone());
89 }
90 self.namespace_tables.entry(id.clone()).or_default();
91 self.namespaces.insert(id, entry);
92 }
93
94 pub fn register_schema(&mut self, schema: impl Into<String>, namespace: RuntimeNamespaceId) {
95 self.schema_map.insert(schema.into(), namespace);
96 }
97
98 pub fn set_unqualified_priority(&mut self, order: Vec<RuntimeNamespaceId>) {
99 self.priority = order;
100 }
101
102 pub fn register_table(&mut self, namespace: &RuntimeNamespaceId, canonical: String) {
103 let namespace_id = namespace.clone();
104 self.table_map
105 .insert(canonical.clone(), namespace_id.clone());
106 self.namespace_tables
107 .entry(namespace_id)
108 .or_default()
109 .insert(canonical);
110 }
111
112 pub fn unregister_table(&mut self, canonical: &str) -> Option<RuntimeNamespaceId> {
113 if let Some(namespace_id) = self.table_map.remove(canonical) {
114 if let Some(tables) = self.namespace_tables.get_mut(&namespace_id) {
115 tables.remove(canonical);
116 }
117 Some(namespace_id)
118 } else {
119 None
120 }
121 }
122
123 pub fn drain_namespace_tables(&mut self, namespace: &RuntimeNamespaceId) -> Vec<String> {
124 let tables = self.namespace_tables.entry(namespace.clone()).or_default();
125 let drained: Vec<String> = tables.drain().collect();
126 for canonical in &drained {
127 self.table_map.remove(canonical);
128 }
129 drained
130 }
131
132 pub fn namespace_for_schema(&self, schema: &str) -> Option<&RuntimeNamespaceId> {
133 self.schema_map.get(schema)
134 }
135
136 pub fn namespace_ops(&self, id: &str) -> Option<Arc<dyn RuntimeStorageNamespaceOps>> {
137 self.namespaces.get(id).map(NamespaceEntry::ops)
138 }
139
140 pub fn namespace<N>(&self, id: &str) -> Option<Arc<N>>
141 where
142 N: RuntimeStorageNamespace + 'static,
143 {
144 self.namespaces
145 .get(id)
146 .and_then(|entry| entry.as_typed::<N>())
147 }
148
149 pub fn unqualified_order(&self) -> Box<dyn Iterator<Item = &RuntimeNamespaceId> + '_> {
150 if self.priority.is_empty() {
151 Box::new(std::iter::once(&self.persistent_id))
152 } else {
153 Box::new(self.priority.iter())
154 }
155 }
156
157 pub fn is_temporary_table(&self, canonical: &str) -> bool {
158 matches!(
159 self.table_map.get(canonical),
160 Some(namespace_id) if namespace_id == TEMPORARY_NAMESPACE_ID
161 )
162 }
163
164 pub fn namespace_for_table(&self, canonical: &str) -> RuntimeNamespaceId {
165 self.table_map
166 .get(canonical)
167 .cloned()
168 .unwrap_or_else(|| self.persistent_id.clone())
169 }
170}