1use std::any::Any;
2use std::sync::{Arc, RwLock};
3
4use llkv_executor::ExecutorTable;
5use llkv_plan::{CreateIndexPlan, CreateTablePlan, PlanOperation};
6use llkv_result::Error;
7use llkv_storage::pager::Pager;
8use llkv_transaction::TransactionResult;
9use rustc_hash::{FxHashMap, FxHashSet};
10use simd_r_drive_entry_handle::EntryHandle;
11
12use crate::{RuntimeContext, RuntimeStatementResult};
13use llkv_table::canonical_table_name;
14
15pub type NamespaceId = String;
16
17pub const PERSISTENT_NAMESPACE_ID: &str = "main";
19pub const TEMPORARY_NAMESPACE_ID: &str = "temp";
20
21pub trait StorageNamespace: Send + Sync + 'static {
28 type Pager: Pager<Blob = EntryHandle> + Send + Sync + 'static;
29
30 fn namespace_id(&self) -> &NamespaceId;
32
33 fn context(&self) -> Arc<RuntimeContext<Self::Pager>>;
35
36 fn create_table(
38 &self,
39 plan: CreateTablePlan,
40 ) -> crate::Result<RuntimeStatementResult<Self::Pager>>;
41
42 fn drop_table(&self, name: &str, if_exists: bool) -> crate::Result<()>;
44
45 fn create_index(
47 &self,
48 plan: CreateIndexPlan,
49 ) -> crate::Result<RuntimeStatementResult<Self::Pager>>;
50
51 fn execute_operation(
54 &self,
55 operation: PlanOperation,
56 ) -> crate::Result<TransactionResult<Self::Pager>> {
57 let _ = operation;
58 Err(Error::Internal(format!(
59 "namespace '{}' does not yet support execute_operation",
60 self.namespace_id()
61 )))
62 }
63
64 fn lookup_table(&self, canonical: &str) -> crate::Result<Arc<ExecutorTable<Self::Pager>>>;
66
67 fn list_tables(&self) -> Vec<String>;
69
70 fn owns_table(&self, canonical: &str) -> bool {
72 let _ = canonical;
73 false
74 }
75}
76
77pub trait StorageNamespaceOps: Send + Sync {
79 fn namespace_id(&self) -> &NamespaceId;
80 fn list_tables(&self) -> Vec<String>;
81 fn owns_table(&self, canonical: &str) -> bool;
82 fn as_any(&self) -> &dyn Any;
83}
84
85impl<T> StorageNamespaceOps for T
86where
87 T: StorageNamespace,
88{
89 fn namespace_id(&self) -> &NamespaceId {
90 StorageNamespace::namespace_id(self)
91 }
92
93 fn list_tables(&self) -> Vec<String> {
94 StorageNamespace::list_tables(self)
95 }
96
97 fn owns_table(&self, canonical: &str) -> bool {
98 StorageNamespace::owns_table(self, canonical)
99 }
100
101 fn as_any(&self) -> &dyn Any {
102 self
103 }
104}
105
106#[derive(Clone)]
107pub struct PersistentNamespace<P>
108where
109 P: Pager<Blob = EntryHandle> + Send + Sync + 'static,
110{
111 id: NamespaceId,
112 context: Arc<RuntimeContext<P>>,
113}
114
115impl<P> PersistentNamespace<P>
116where
117 P: Pager<Blob = EntryHandle> + Send + Sync + 'static,
118{
119 pub fn new(id: NamespaceId, context: Arc<RuntimeContext<P>>) -> Self {
120 Self { id, context }
121 }
122}
123
124impl<P> StorageNamespace for PersistentNamespace<P>
125where
126 P: Pager<Blob = EntryHandle> + Send + Sync + 'static,
127{
128 type Pager = P;
129
130 fn namespace_id(&self) -> &NamespaceId {
131 &self.id
132 }
133
134 fn context(&self) -> Arc<RuntimeContext<Self::Pager>> {
135 Arc::clone(&self.context)
136 }
137
138 fn create_table(
139 &self,
140 plan: CreateTablePlan,
141 ) -> crate::Result<RuntimeStatementResult<Self::Pager>> {
142 self.context.create_table_plan(plan)
143 }
144
145 fn drop_table(&self, name: &str, if_exists: bool) -> crate::Result<()> {
146 self.context.drop_table_immediate(name, if_exists)
147 }
148
149 fn create_index(
150 &self,
151 plan: CreateIndexPlan,
152 ) -> crate::Result<RuntimeStatementResult<Self::Pager>> {
153 self.context.create_index(plan)
154 }
155
156 fn lookup_table(&self, canonical: &str) -> crate::Result<Arc<ExecutorTable<Self::Pager>>> {
157 self.context.lookup_table(canonical)
158 }
159
160 fn list_tables(&self) -> Vec<String> {
161 self.context.table_names()
162 }
163}
164
165#[derive(Clone)]
166pub struct TemporaryNamespace<P>
167where
168 P: Pager<Blob = EntryHandle> + Send + Sync + 'static,
169{
170 id: NamespaceId,
171 context: Arc<RwLock<Arc<RuntimeContext<P>>>>,
172 tables: Arc<RwLock<FxHashSet<String>>>,
173}
174
175impl<P> TemporaryNamespace<P>
176where
177 P: Pager<Blob = EntryHandle> + Send + Sync + 'static,
178{
179 pub fn new(id: NamespaceId, context: Arc<RuntimeContext<P>>) -> Self {
180 Self {
181 id,
182 context: Arc::new(RwLock::new(context)),
183 tables: Arc::new(RwLock::new(FxHashSet::default())),
184 }
185 }
186
187 pub fn replace_context(&self, context: Arc<RuntimeContext<P>>) {
188 let mut guard = self
189 .context
190 .write()
191 .expect("temporary context lock poisoned");
192 *guard = context;
193 }
194
195 pub fn register_table(&self, canonical: String) {
196 self.tables
197 .write()
198 .expect("temporary table registry poisoned")
199 .insert(canonical);
200 }
201
202 pub fn unregister_table(&self, canonical: &str) {
203 self.tables
204 .write()
205 .expect("temporary table registry poisoned")
206 .remove(canonical);
207 }
208
209 pub fn clear_tables(&self) {
210 let canonical_names: Vec<String> = {
211 let mut guard = self
212 .tables
213 .write()
214 .expect("temporary table registry poisoned");
215 guard.drain().collect()
216 };
217
218 if canonical_names.is_empty() {
219 return;
220 }
221
222 let catalog = self.context().table_catalog();
223 for canonical in canonical_names {
224 if let Some(table_id) = catalog.table_id(&canonical)
225 && catalog.unregister_table(table_id)
226 {
227 tracing::debug!(
228 "[TEMP] Unregistered temporary table '{}' from shared catalog",
229 canonical
230 );
231 }
232 }
233 }
234
235 pub fn has_table(&self, canonical: &str) -> bool {
236 self.tables
237 .read()
238 .expect("temporary table registry poisoned")
239 .contains(canonical)
240 }
241
242 pub fn context(&self) -> Arc<RuntimeContext<P>> {
243 Arc::clone(
244 &self
245 .context
246 .read()
247 .expect("temporary context lock poisoned"),
248 )
249 }
250}
251
252impl<P> StorageNamespace for TemporaryNamespace<P>
253where
254 P: Pager<Blob = EntryHandle> + Send + Sync + 'static,
255{
256 type Pager = P;
257
258 fn namespace_id(&self) -> &NamespaceId {
259 &self.id
260 }
261
262 fn context(&self) -> Arc<RuntimeContext<Self::Pager>> {
263 self.context()
264 }
265
266 fn create_table(
267 &self,
268 plan: CreateTablePlan,
269 ) -> crate::Result<RuntimeStatementResult<Self::Pager>> {
270 let (_, canonical) = canonical_table_name(&plan.name)?;
271 let context = self.context();
272 let result = context.create_table_plan(plan)?;
273 if !self.has_table(&canonical) {
274 self.register_table(canonical);
275 }
276 Ok(result)
277 }
278
279 fn drop_table(&self, name: &str, if_exists: bool) -> crate::Result<()> {
280 let (_, canonical) = canonical_table_name(name)?;
281 let context = self.context();
282 context.drop_table_immediate(name, if_exists)?;
283 self.unregister_table(&canonical);
284 Ok(())
285 }
286
287 fn create_index(
288 &self,
289 plan: CreateIndexPlan,
290 ) -> crate::Result<RuntimeStatementResult<Self::Pager>> {
291 self.context().create_index(plan)
292 }
293
294 fn lookup_table(&self, canonical: &str) -> crate::Result<Arc<ExecutorTable<Self::Pager>>> {
295 self.context().lookup_table(canonical)
296 }
297
298 fn list_tables(&self) -> Vec<String> {
299 let mut names = self.context().table_names();
300 let tracked = self
301 .tables
302 .read()
303 .expect("temporary table registry poisoned")
304 .clone();
305 for canonical in tracked {
306 if !names.iter().any(|existing| existing == &canonical) {
307 names.push(canonical);
308 }
309 }
310 names
311 }
312
313 fn owns_table(&self, canonical: &str) -> bool {
314 self.has_table(canonical)
315 }
316}
317
318#[derive(Clone)]
319struct NamespaceEntry {
320 ops: Arc<dyn StorageNamespaceOps>,
321 erased: Arc<dyn Any + Send + Sync>,
322}
323
324impl NamespaceEntry {
325 fn new<N>(namespace: Arc<N>) -> Self
326 where
327 N: StorageNamespace + 'static,
328 {
329 let ops: Arc<dyn StorageNamespaceOps> = namespace.clone();
330 let erased: Arc<dyn Any + Send + Sync> = namespace;
331 Self { ops, erased }
332 }
333
334 fn as_typed<N>(&self) -> Option<Arc<N>>
335 where
336 N: StorageNamespace + 'static,
337 {
338 self.erased.clone().downcast::<N>().ok()
339 }
340}
341
342pub struct StorageNamespaceRegistry {
343 persistent_id: NamespaceId,
344 namespaces: FxHashMap<NamespaceId, NamespaceEntry>,
345 schema_map: FxHashMap<String, NamespaceId>,
346 priority: Vec<NamespaceId>,
347}
348
349impl StorageNamespaceRegistry {
350 pub fn new(persistent_id: NamespaceId) -> Self {
351 Self {
352 persistent_id: persistent_id.clone(),
353 namespaces: FxHashMap::default(),
354 schema_map: FxHashMap::default(),
355 priority: vec![persistent_id],
356 }
357 }
358
359 pub fn persistent_id(&self) -> &str {
360 &self.persistent_id
361 }
362
363 pub fn register_namespace<N, I>(
364 &mut self,
365 namespace: Arc<N>,
366 schemas: I,
367 prefer_unqualified: bool,
368 ) where
369 N: StorageNamespace + 'static,
370 I: IntoIterator<Item = String>,
371 {
372 let id = namespace.namespace_id().to_string();
373 let entry = NamespaceEntry::new(namespace);
374 for schema in schemas {
375 self.schema_map.insert(schema, id.clone());
376 }
377 if prefer_unqualified {
378 if !self.priority.iter().any(|existing| existing == &id) {
379 self.priority.insert(0, id.clone());
380 }
381 } else if !self.priority.iter().any(|existing| existing == &id) {
382 self.priority.push(id.clone());
383 }
384 self.namespaces.insert(id, entry);
385 }
386
387 pub fn register_schema(&mut self, schema: impl Into<String>, namespace: NamespaceId) {
388 self.schema_map.insert(schema.into(), namespace);
389 }
390
391 pub fn set_unqualified_priority(&mut self, order: Vec<NamespaceId>) {
392 self.priority = order;
393 }
394
395 pub fn namespace_for_schema(&self, schema: &str) -> Option<&NamespaceId> {
396 self.schema_map.get(schema)
397 }
398
399 pub fn namespace_ops(&self, id: &str) -> Option<Arc<dyn StorageNamespaceOps>> {
400 self.namespaces.get(id).map(|entry| Arc::clone(&entry.ops))
401 }
402
403 pub fn namespace<N>(&self, id: &str) -> Option<Arc<N>>
404 where
405 N: StorageNamespace + 'static,
406 {
407 self.namespaces
408 .get(id)
409 .and_then(|entry| entry.as_typed::<N>())
410 }
411
412 pub fn unqualified_order(&self) -> Box<dyn Iterator<Item = &NamespaceId> + '_> {
413 if self.priority.is_empty() {
414 Box::new(std::iter::once(&self.persistent_id))
415 } else {
416 Box::new(self.priority.iter())
417 }
418 }
419
420 pub fn is_temporary_table(&self, canonical: &str) -> bool {
421 self.namespaces
422 .values()
423 .any(|entry| entry.ops.owns_table(canonical))
424 }
425
426 pub fn namespace_for_table(&self, canonical: &str) -> NamespaceId {
427 if let Some(namespace_id) = self
428 .priority
429 .iter()
430 .filter_map(|namespace_id| self.namespaces.get(namespace_id))
431 .find(|entry| entry.ops.owns_table(canonical))
432 .map(|entry| entry.ops.namespace_id().clone())
433 {
434 return namespace_id;
435 }
436
437 self.persistent_id.clone()
438 }
439}