reifydb_catalog/materialized/
mod.rs1mod dictionary;
5mod flow;
6pub mod load;
7mod namespace;
8mod operator_retention_policy;
9mod primary_key;
10mod source_retention_policy;
11mod table;
12mod view;
13
14use std::sync::Arc;
15
16use crossbeam_skiplist::SkipMap;
17use reifydb_core::{
18 interface::{
19 DictionaryDef, DictionaryId, FlowDef, FlowId, FlowNodeId, NamespaceDef, NamespaceId, PrimaryKeyDef,
20 PrimaryKeyId, SourceId, TableDef, TableId, TableVirtualDef, TableVirtualId, ViewDef, ViewId,
21 },
22 retention::RetentionPolicy,
23 util::MultiVersionContainer,
24};
25
26use crate::system::SystemCatalog;
27
/// Alias for `MultiVersionContainer<NamespaceDef>`; the value type of the catalog's `namespaces` map.
pub type MultiVersionNamespaceDef = MultiVersionContainer<NamespaceDef>;
/// Alias for `MultiVersionContainer<TableDef>`; the value type of the catalog's `tables` map.
pub type MultiVersionTableDef = MultiVersionContainer<TableDef>;
/// Alias for `MultiVersionContainer<ViewDef>`; the value type of the catalog's `views` map.
pub type MultiVersionViewDef = MultiVersionContainer<ViewDef>;
/// Alias for `MultiVersionContainer<FlowDef>`; the value type of the catalog's `flows` map.
pub type MultiVersionFlowDef = MultiVersionContainer<FlowDef>;
/// Alias for `MultiVersionContainer<PrimaryKeyDef>`; the value type of the catalog's `primary_keys` map.
pub type MultiVersionPrimaryKeyDef = MultiVersionContainer<PrimaryKeyDef>;
/// Alias for `MultiVersionContainer<RetentionPolicy>`; used as the value type of both the
/// source and the operator retention-policy maps.
pub type MultiVersionRetentionPolicy = MultiVersionContainer<RetentionPolicy>;
/// Alias for `MultiVersionContainer<DictionaryDef>`; the value type of the catalog's `dictionaries` map.
pub type MultiVersionDictionaryDef = MultiVersionContainer<DictionaryDef>;
35
36#[derive(Debug, Clone)]
40pub struct MaterializedCatalog(Arc<MaterializedCatalogInner>);
41
42#[derive(Debug)]
43pub struct MaterializedCatalogInner {
44 pub(crate) namespaces: SkipMap<NamespaceId, MultiVersionNamespaceDef>,
46 pub(crate) namespaces_by_name: SkipMap<String, NamespaceId>,
48
49 pub(crate) tables: SkipMap<TableId, MultiVersionTableDef>,
51 pub(crate) tables_by_name: SkipMap<(NamespaceId, String), TableId>,
54
55 pub(crate) views: SkipMap<ViewId, MultiVersionViewDef>,
57 pub(crate) views_by_name: SkipMap<(NamespaceId, String), ViewId>,
60
61 pub(crate) flows: SkipMap<FlowId, MultiVersionFlowDef>,
63 pub(crate) flows_by_name: SkipMap<(NamespaceId, String), FlowId>,
66
67 pub(crate) primary_keys: SkipMap<PrimaryKeyId, MultiVersionPrimaryKeyDef>,
69
70 pub(crate) source_retention_policies: SkipMap<SourceId, MultiVersionRetentionPolicy>,
72
73 pub(crate) operator_retention_policies: SkipMap<FlowNodeId, MultiVersionRetentionPolicy>,
75
76 pub(crate) dictionaries: SkipMap<DictionaryId, MultiVersionDictionaryDef>,
78
79 pub(crate) dictionaries_by_name: SkipMap<(NamespaceId, String), DictionaryId>,
81
82 pub(crate) table_virtual_user: SkipMap<TableVirtualId, Arc<TableVirtualDef>>,
84 pub(crate) table_virtual_user_by_name: SkipMap<(NamespaceId, String), TableVirtualId>,
86
87 pub(crate) system_catalog: Option<SystemCatalog>,
89}
90
91impl std::ops::Deref for MaterializedCatalog {
92 type Target = MaterializedCatalogInner;
93
94 fn deref(&self) -> &Self::Target {
95 &self.0
96 }
97}
98
99impl Default for MaterializedCatalog {
100 fn default() -> Self {
101 Self::new()
102 }
103}
104
105impl MaterializedCatalog {
106 pub fn new() -> Self {
107 let system_namespace = NamespaceDef::system();
108 let system_namespace_id = system_namespace.id;
109
110 let namespaces = SkipMap::new();
111 let container = MultiVersionContainer::new();
112 container.insert(1, system_namespace);
113 namespaces.insert(system_namespace_id, container);
114
115 let namespaces_by_name = SkipMap::new();
116 namespaces_by_name.insert("system".to_string(), system_namespace_id);
117
118 Self(Arc::new(MaterializedCatalogInner {
119 namespaces,
120 namespaces_by_name,
121 tables: SkipMap::new(),
122 tables_by_name: SkipMap::new(),
123 views: SkipMap::new(),
124 views_by_name: SkipMap::new(),
125 flows: SkipMap::new(),
126 flows_by_name: SkipMap::new(),
127 primary_keys: SkipMap::new(),
128 source_retention_policies: SkipMap::new(),
129 operator_retention_policies: SkipMap::new(),
130 dictionaries: SkipMap::new(),
131 dictionaries_by_name: SkipMap::new(),
132 table_virtual_user: SkipMap::new(),
133 table_virtual_user_by_name: SkipMap::new(),
134 system_catalog: None,
135 }))
136 }
137
138 pub fn set_system_catalog(&self, catalog: SystemCatalog) {
140 unsafe {
143 let inner = Arc::as_ptr(&self.0) as *mut MaterializedCatalogInner;
144 (*inner).system_catalog = Some(catalog);
145 }
146 }
147
148 pub fn system_catalog(&self) -> Option<&SystemCatalog> {
150 self.0.system_catalog.as_ref()
151 }
152
153 pub fn register_table_virtual_user(&self, def: Arc<TableVirtualDef>) -> crate::Result<()> {
157 let key = (def.namespace, def.name.clone());
158
159 if self.table_virtual_user_by_name.contains_key(&key) {
161 let ns_name = self
163 .namespaces
164 .get(&def.namespace)
165 .map(|e| e.value().get_latest().map(|n| n.name.clone()).unwrap_or_default())
166 .unwrap_or_else(|| format!("{}", def.namespace.0));
167 return Err(reifydb_type::Error(
168 reifydb_type::diagnostic::catalog::virtual_table_already_exists(&ns_name, &def.name),
169 ));
170 }
171
172 self.table_virtual_user.insert(def.id, def.clone());
173 self.table_virtual_user_by_name.insert(key, def.id);
174 Ok(())
175 }
176
177 pub fn unregister_table_virtual_user(&self, namespace: NamespaceId, name: &str) -> crate::Result<()> {
179 let key = (namespace, name.to_string());
180
181 if let Some(entry) = self.table_virtual_user_by_name.remove(&key) {
182 self.table_virtual_user.remove(entry.value());
183 Ok(())
184 } else {
185 let ns_name = self
187 .namespaces
188 .get(&namespace)
189 .map(|e| e.value().get_latest().map(|n| n.name.clone()).unwrap_or_default())
190 .unwrap_or_else(|| format!("{}", namespace.0));
191 Err(reifydb_type::Error(reifydb_type::diagnostic::catalog::virtual_table_not_found(
192 &ns_name, name,
193 )))
194 }
195 }
196
197 pub fn find_table_virtual_user_by_name(
199 &self,
200 namespace: NamespaceId,
201 name: &str,
202 ) -> Option<Arc<TableVirtualDef>> {
203 let key = (namespace, name.to_string());
204 self.table_virtual_user_by_name
205 .get(&key)
206 .and_then(|entry| self.table_virtual_user.get(entry.value()).map(|e| e.value().clone()))
207 }
208
209 pub fn find_table_virtual_user(&self, id: TableVirtualId) -> Option<Arc<TableVirtualDef>> {
211 self.table_virtual_user.get(&id).map(|e| e.value().clone())
212 }
213
214 pub fn list_table_virtual_user_in_namespace(&self, namespace: NamespaceId) -> Vec<Arc<TableVirtualDef>> {
216 self.table_virtual_user
217 .iter()
218 .filter(|e| e.value().namespace == namespace)
219 .map(|e| e.value().clone())
220 .collect()
221 }
222
223 pub fn list_table_virtual_user_all(&self) -> Vec<Arc<TableVirtualDef>> {
225 self.table_virtual_user.iter().map(|e| e.value().clone()).collect()
226 }
227}