reifydb_catalog/materialized/
mod.rs

1// Copyright (c) reifydb.com 2025
2// This file is licensed under the AGPL-3.0-or-later, see license.md file
3
4mod dictionary;
5mod flow;
6pub mod load;
7mod namespace;
8mod operator_retention_policy;
9mod primary_key;
10mod source_retention_policy;
11mod table;
12mod view;
13
14use std::sync::Arc;
15
16use crossbeam_skiplist::SkipMap;
17use reifydb_core::{
18	interface::{
19		DictionaryDef, DictionaryId, FlowDef, FlowId, FlowNodeId, NamespaceDef, NamespaceId, PrimaryKeyDef,
20		PrimaryKeyId, SourceId, TableDef, TableId, TableVirtualDef, TableVirtualId, ViewDef, ViewId,
21	},
22	retention::RetentionPolicy,
23	util::MultiVersionContainer,
24};
25
26use crate::system::SystemCatalog;
27
28pub type MultiVersionNamespaceDef = MultiVersionContainer<NamespaceDef>;
29pub type MultiVersionTableDef = MultiVersionContainer<TableDef>;
30pub type MultiVersionViewDef = MultiVersionContainer<ViewDef>;
31pub type MultiVersionFlowDef = MultiVersionContainer<FlowDef>;
32pub type MultiVersionPrimaryKeyDef = MultiVersionContainer<PrimaryKeyDef>;
33pub type MultiVersionRetentionPolicy = MultiVersionContainer<RetentionPolicy>;
34pub type MultiVersionDictionaryDef = MultiVersionContainer<DictionaryDef>;
35
36/// A materialized catalog that stores multi namespace, store::table, and view
37/// definitions. This provides fast O(1) lookups for catalog metadata without
38/// hitting storage.
39#[derive(Debug, Clone)]
40pub struct MaterializedCatalog(Arc<MaterializedCatalogInner>);
41
42#[derive(Debug)]
43pub struct MaterializedCatalogInner {
44	/// MultiVersion namespace definitions indexed by namespace ID
45	pub(crate) namespaces: SkipMap<NamespaceId, MultiVersionNamespaceDef>,
46	/// Index from namespace name to namespace ID for fast name lookups
47	pub(crate) namespaces_by_name: SkipMap<String, NamespaceId>,
48
49	/// MultiVersion table definitions indexed by table ID
50	pub(crate) tables: SkipMap<TableId, MultiVersionTableDef>,
51	/// Index from (namespace_id, table_name) to table ID for fast name
52	/// lookups
53	pub(crate) tables_by_name: SkipMap<(NamespaceId, String), TableId>,
54
55	/// MultiVersion view definitions indexed by view ID
56	pub(crate) views: SkipMap<ViewId, MultiVersionViewDef>,
57	/// Index from (namespace_id, view_name) to view ID for fast name
58	/// lookups
59	pub(crate) views_by_name: SkipMap<(NamespaceId, String), ViewId>,
60
61	/// MultiVersion flow definitions indexed by flow ID
62	pub(crate) flows: SkipMap<FlowId, MultiVersionFlowDef>,
63	/// Index from (namespace_id, flow_name) to flow ID for fast name
64	/// lookups
65	pub(crate) flows_by_name: SkipMap<(NamespaceId, String), FlowId>,
66
67	/// MultiVersion primary key definitions indexed by primary key ID
68	pub(crate) primary_keys: SkipMap<PrimaryKeyId, MultiVersionPrimaryKeyDef>,
69
70	/// MultiVersion source retention policies indexed by source ID
71	pub(crate) source_retention_policies: SkipMap<SourceId, MultiVersionRetentionPolicy>,
72
73	/// MultiVersion operator retention policies indexed by operator ID
74	pub(crate) operator_retention_policies: SkipMap<FlowNodeId, MultiVersionRetentionPolicy>,
75
76	/// MultiVersion dictionary definitions indexed by dictionary ID
77	pub(crate) dictionaries: SkipMap<DictionaryId, MultiVersionDictionaryDef>,
78
79	/// Index from (namespace_id, dictionary_name) to dictionary ID for fast name lookups
80	pub(crate) dictionaries_by_name: SkipMap<(NamespaceId, String), DictionaryId>,
81
82	/// User-defined virtual table definitions indexed by ID
83	pub(crate) table_virtual_user: SkipMap<TableVirtualId, Arc<TableVirtualDef>>,
84	/// Index from (namespace_id, table_name) to virtual table ID for fast name lookups
85	pub(crate) table_virtual_user_by_name: SkipMap<(NamespaceId, String), TableVirtualId>,
86
87	/// System catalog with version information (None until initialized)
88	pub(crate) system_catalog: Option<SystemCatalog>,
89}
90
91impl std::ops::Deref for MaterializedCatalog {
92	type Target = MaterializedCatalogInner;
93
94	fn deref(&self) -> &Self::Target {
95		&self.0
96	}
97}
98
99impl Default for MaterializedCatalog {
100	fn default() -> Self {
101		Self::new()
102	}
103}
104
105impl MaterializedCatalog {
106	pub fn new() -> Self {
107		let system_namespace = NamespaceDef::system();
108		let system_namespace_id = system_namespace.id;
109
110		let namespaces = SkipMap::new();
111		let container = MultiVersionContainer::new();
112		container.insert(1, system_namespace);
113		namespaces.insert(system_namespace_id, container);
114
115		let namespaces_by_name = SkipMap::new();
116		namespaces_by_name.insert("system".to_string(), system_namespace_id);
117
118		Self(Arc::new(MaterializedCatalogInner {
119			namespaces,
120			namespaces_by_name,
121			tables: SkipMap::new(),
122			tables_by_name: SkipMap::new(),
123			views: SkipMap::new(),
124			views_by_name: SkipMap::new(),
125			flows: SkipMap::new(),
126			flows_by_name: SkipMap::new(),
127			primary_keys: SkipMap::new(),
128			source_retention_policies: SkipMap::new(),
129			operator_retention_policies: SkipMap::new(),
130			dictionaries: SkipMap::new(),
131			dictionaries_by_name: SkipMap::new(),
132			table_virtual_user: SkipMap::new(),
133			table_virtual_user_by_name: SkipMap::new(),
134			system_catalog: None,
135		}))
136	}
137
138	/// Set the system catalog (called once during database initialization)
139	pub fn set_system_catalog(&self, catalog: SystemCatalog) {
140		// Use unsafe to mutate through Arc (safe because only called
141		// once during init)
142		unsafe {
143			let inner = Arc::as_ptr(&self.0) as *mut MaterializedCatalogInner;
144			(*inner).system_catalog = Some(catalog);
145		}
146	}
147
148	/// Get the system catalog
149	pub fn system_catalog(&self) -> Option<&SystemCatalog> {
150		self.0.system_catalog.as_ref()
151	}
152
153	/// Register a user-defined virtual table
154	///
155	/// Returns an error if a virtual table with the same name already exists in the namespace.
156	pub fn register_table_virtual_user(&self, def: Arc<TableVirtualDef>) -> crate::Result<()> {
157		let key = (def.namespace, def.name.clone());
158
159		// Check if already exists
160		if self.table_virtual_user_by_name.contains_key(&key) {
161			// Get namespace name for error message
162			let ns_name = self
163				.namespaces
164				.get(&def.namespace)
165				.map(|e| e.value().get_latest().map(|n| n.name.clone()).unwrap_or_default())
166				.unwrap_or_else(|| format!("{}", def.namespace.0));
167			return Err(reifydb_type::Error(
168				reifydb_type::diagnostic::catalog::virtual_table_already_exists(&ns_name, &def.name),
169			));
170		}
171
172		self.table_virtual_user.insert(def.id, def.clone());
173		self.table_virtual_user_by_name.insert(key, def.id);
174		Ok(())
175	}
176
177	/// Unregister a user-defined virtual table by namespace and name
178	pub fn unregister_table_virtual_user(&self, namespace: NamespaceId, name: &str) -> crate::Result<()> {
179		let key = (namespace, name.to_string());
180
181		if let Some(entry) = self.table_virtual_user_by_name.remove(&key) {
182			self.table_virtual_user.remove(entry.value());
183			Ok(())
184		} else {
185			// Get namespace name for error message
186			let ns_name = self
187				.namespaces
188				.get(&namespace)
189				.map(|e| e.value().get_latest().map(|n| n.name.clone()).unwrap_or_default())
190				.unwrap_or_else(|| format!("{}", namespace.0));
191			Err(reifydb_type::Error(reifydb_type::diagnostic::catalog::virtual_table_not_found(
192				&ns_name, name,
193			)))
194		}
195	}
196
197	/// Find a user-defined virtual table by namespace and name
198	pub fn find_table_virtual_user_by_name(
199		&self,
200		namespace: NamespaceId,
201		name: &str,
202	) -> Option<Arc<TableVirtualDef>> {
203		let key = (namespace, name.to_string());
204		self.table_virtual_user_by_name
205			.get(&key)
206			.and_then(|entry| self.table_virtual_user.get(entry.value()).map(|e| e.value().clone()))
207	}
208
209	/// Find a user-defined virtual table by ID
210	pub fn find_table_virtual_user(&self, id: TableVirtualId) -> Option<Arc<TableVirtualDef>> {
211		self.table_virtual_user.get(&id).map(|e| e.value().clone())
212	}
213
214	/// List all user-defined virtual tables in a namespace
215	pub fn list_table_virtual_user_in_namespace(&self, namespace: NamespaceId) -> Vec<Arc<TableVirtualDef>> {
216		self.table_virtual_user
217			.iter()
218			.filter(|e| e.value().namespace == namespace)
219			.map(|e| e.value().clone())
220			.collect()
221	}
222
223	/// List all user-defined virtual tables
224	pub fn list_table_virtual_user_all(&self) -> Vec<Arc<TableVirtualDef>> {
225		self.table_virtual_user.iter().map(|e| e.value().clone()).collect()
226	}
227}