1use std::any::Any;
7use std::sync::{Arc, RwLock};
8
9use llkv_executor::ExecutorTable;
10use llkv_plan::{CreateIndexPlan, CreateTablePlan, PlanOperation};
11use llkv_result::Error;
12use llkv_storage::pager::Pager;
13use llkv_transaction::TransactionResult;
14use rustc_hash::{FxHashMap, FxHashSet};
15use simd_r_drive_entry_handle::EntryHandle;
16
17use crate::{RuntimeContext, RuntimeStatementResult};
18use llkv_table::canonical_table_name;
19
/// Identifier of a storage namespace.
///
/// This is a plain `String` alias; `StorageNamespaceRegistry` uses it as the
/// key of its namespace map and as the element type of its priority list.
pub type NamespaceId = String;

/// Conventional identifier for the persistent namespace.
pub const PERSISTENT_NAMESPACE_ID: &str = "main";

/// Conventional identifier for the temporary namespace.
pub const TEMPORARY_NAMESPACE_ID: &str = "temp";
26
27pub trait StorageNamespace: Send + Sync + 'static {
35 type Pager: Pager<Blob = EntryHandle> + Send + Sync + 'static;
36
37 fn namespace_id(&self) -> &NamespaceId;
39
40 fn context(&self) -> Arc<RuntimeContext<Self::Pager>>;
42
43 fn create_table(
45 &self,
46 plan: CreateTablePlan,
47 ) -> crate::Result<RuntimeStatementResult<Self::Pager>>;
48
49 fn drop_table(&self, name: &str, if_exists: bool) -> crate::Result<()>;
51
52 fn create_index(
54 &self,
55 plan: CreateIndexPlan,
56 ) -> crate::Result<RuntimeStatementResult<Self::Pager>>;
57
58 fn execute_operation(
61 &self,
62 operation: PlanOperation,
63 ) -> crate::Result<TransactionResult<Self::Pager>> {
64 let _ = operation;
65 Err(Error::Internal(format!(
66 "namespace '{}' does not yet support execute_operation",
67 self.namespace_id()
68 )))
69 }
70
71 fn lookup_table(&self, canonical: &str) -> crate::Result<Arc<ExecutorTable<Self::Pager>>>;
73
74 fn list_tables(&self) -> Vec<String>;
76
77 fn owns_table(&self, canonical: &str) -> bool {
79 let _ = canonical;
80 false
81 }
82}
83
84pub trait StorageNamespaceOps: Send + Sync {
86 fn namespace_id(&self) -> &NamespaceId;
87 fn list_tables(&self) -> Vec<String>;
88 fn owns_table(&self, canonical: &str) -> bool;
89 fn as_any(&self) -> &dyn Any;
90}
91
92impl<T> StorageNamespaceOps for T
93where
94 T: StorageNamespace,
95{
96 fn namespace_id(&self) -> &NamespaceId {
97 StorageNamespace::namespace_id(self)
98 }
99
100 fn list_tables(&self) -> Vec<String> {
101 StorageNamespace::list_tables(self)
102 }
103
104 fn owns_table(&self, canonical: &str) -> bool {
105 StorageNamespace::owns_table(self, canonical)
106 }
107
108 fn as_any(&self) -> &dyn Any {
109 self
110 }
111}
112
113#[derive(Clone)]
114pub struct PersistentNamespace<P>
115where
116 P: Pager<Blob = EntryHandle> + Send + Sync + 'static,
117{
118 id: NamespaceId,
119 context: Arc<RuntimeContext<P>>,
120}
121
122impl<P> PersistentNamespace<P>
123where
124 P: Pager<Blob = EntryHandle> + Send + Sync + 'static,
125{
126 pub fn new(id: NamespaceId, context: Arc<RuntimeContext<P>>) -> Self {
127 Self { id, context }
128 }
129}
130
131impl<P> StorageNamespace for PersistentNamespace<P>
132where
133 P: Pager<Blob = EntryHandle> + Send + Sync + 'static,
134{
135 type Pager = P;
136
137 fn namespace_id(&self) -> &NamespaceId {
138 &self.id
139 }
140
141 fn context(&self) -> Arc<RuntimeContext<Self::Pager>> {
142 Arc::clone(&self.context)
143 }
144
145 fn create_table(
146 &self,
147 plan: CreateTablePlan,
148 ) -> crate::Result<RuntimeStatementResult<Self::Pager>> {
149 self.context.create_table_plan(plan)
150 }
151
152 fn drop_table(&self, name: &str, if_exists: bool) -> crate::Result<()> {
153 self.context.drop_table_immediate(name, if_exists)
154 }
155
156 fn create_index(
157 &self,
158 plan: CreateIndexPlan,
159 ) -> crate::Result<RuntimeStatementResult<Self::Pager>> {
160 self.context.create_index(plan)
161 }
162
163 fn lookup_table(&self, canonical: &str) -> crate::Result<Arc<ExecutorTable<Self::Pager>>> {
164 self.context.lookup_table(canonical)
165 }
166
167 fn list_tables(&self) -> Vec<String> {
168 self.context.table_names()
169 }
170}
171
172#[derive(Clone)]
173pub struct TemporaryNamespace<P>
174where
175 P: Pager<Blob = EntryHandle> + Send + Sync + 'static,
176{
177 id: NamespaceId,
178 context: Arc<RwLock<Arc<RuntimeContext<P>>>>,
179 tables: Arc<RwLock<FxHashSet<String>>>,
180}
181
182impl<P> TemporaryNamespace<P>
183where
184 P: Pager<Blob = EntryHandle> + Send + Sync + 'static,
185{
186 pub fn new(id: NamespaceId, context: Arc<RuntimeContext<P>>) -> Self {
187 Self {
188 id,
189 context: Arc::new(RwLock::new(context)),
190 tables: Arc::new(RwLock::new(FxHashSet::default())),
191 }
192 }
193
194 pub fn replace_context(&self, context: Arc<RuntimeContext<P>>) {
195 let mut guard = self
196 .context
197 .write()
198 .expect("temporary context lock poisoned");
199 *guard = context;
200 }
201
202 pub fn register_table(&self, canonical: String) {
203 self.tables
204 .write()
205 .expect("temporary table registry poisoned")
206 .insert(canonical);
207 }
208
209 pub fn unregister_table(&self, canonical: &str) {
210 self.tables
211 .write()
212 .expect("temporary table registry poisoned")
213 .remove(canonical);
214 }
215
216 pub fn clear_tables(&self) {
217 let canonical_names: Vec<String> = {
218 let mut guard = self
219 .tables
220 .write()
221 .expect("temporary table registry poisoned");
222 guard.drain().collect()
223 };
224
225 if canonical_names.is_empty() {
226 return;
227 }
228
229 let catalog = self.context().table_catalog();
230 for canonical in canonical_names {
231 if let Some(table_id) = catalog.table_id(&canonical)
232 && catalog.unregister_table(table_id)
233 {
234 tracing::debug!(
235 "[TEMP] Unregistered temporary table '{}' from shared catalog",
236 canonical
237 );
238 }
239 }
240 }
241
242 pub fn has_table(&self, canonical: &str) -> bool {
243 self.tables
244 .read()
245 .expect("temporary table registry poisoned")
246 .contains(canonical)
247 }
248
249 pub fn context(&self) -> Arc<RuntimeContext<P>> {
250 Arc::clone(
251 &self
252 .context
253 .read()
254 .expect("temporary context lock poisoned"),
255 )
256 }
257}
258
259impl<P> StorageNamespace for TemporaryNamespace<P>
260where
261 P: Pager<Blob = EntryHandle> + Send + Sync + 'static,
262{
263 type Pager = P;
264
265 fn namespace_id(&self) -> &NamespaceId {
266 &self.id
267 }
268
269 fn context(&self) -> Arc<RuntimeContext<Self::Pager>> {
270 self.context()
271 }
272
273 fn create_table(
274 &self,
275 plan: CreateTablePlan,
276 ) -> crate::Result<RuntimeStatementResult<Self::Pager>> {
277 let (_, canonical) = canonical_table_name(&plan.name)?;
278 let context = self.context();
279 let result = context.create_table_plan(plan)?;
280 if !self.has_table(&canonical) {
281 self.register_table(canonical);
282 }
283 Ok(result)
284 }
285
286 fn drop_table(&self, name: &str, if_exists: bool) -> crate::Result<()> {
287 let (_, canonical) = canonical_table_name(name)?;
288 let context = self.context();
289 context.drop_table_immediate(name, if_exists)?;
290 self.unregister_table(&canonical);
291 Ok(())
292 }
293
294 fn create_index(
295 &self,
296 plan: CreateIndexPlan,
297 ) -> crate::Result<RuntimeStatementResult<Self::Pager>> {
298 self.context().create_index(plan)
299 }
300
301 fn lookup_table(&self, canonical: &str) -> crate::Result<Arc<ExecutorTable<Self::Pager>>> {
302 self.context().lookup_table(canonical)
303 }
304
305 fn list_tables(&self) -> Vec<String> {
306 let mut names = self.context().table_names();
307 let tracked = self
308 .tables
309 .read()
310 .expect("temporary table registry poisoned")
311 .clone();
312 for canonical in tracked {
313 if !names.iter().any(|existing| existing == &canonical) {
314 names.push(canonical);
315 }
316 }
317 names
318 }
319
320 fn owns_table(&self, canonical: &str) -> bool {
321 self.has_table(canonical)
322 }
323}
324
325#[derive(Clone)]
326struct NamespaceEntry {
327 ops: Arc<dyn StorageNamespaceOps>,
328 erased: Arc<dyn Any + Send + Sync>,
329}
330
331impl NamespaceEntry {
332 fn new<N>(namespace: Arc<N>) -> Self
333 where
334 N: StorageNamespace + 'static,
335 {
336 let ops: Arc<dyn StorageNamespaceOps> = namespace.clone();
337 let erased: Arc<dyn Any + Send + Sync> = namespace;
338 Self { ops, erased }
339 }
340
341 fn as_typed<N>(&self) -> Option<Arc<N>>
342 where
343 N: StorageNamespace + 'static,
344 {
345 self.erased.clone().downcast::<N>().ok()
346 }
347}
348
349pub struct StorageNamespaceRegistry {
350 persistent_id: NamespaceId,
351 namespaces: FxHashMap<NamespaceId, NamespaceEntry>,
352 schema_map: FxHashMap<String, NamespaceId>,
353 priority: Vec<NamespaceId>,
354}
355
356impl StorageNamespaceRegistry {
357 pub fn new(persistent_id: NamespaceId) -> Self {
358 Self {
359 persistent_id: persistent_id.clone(),
360 namespaces: FxHashMap::default(),
361 schema_map: FxHashMap::default(),
362 priority: vec![persistent_id],
363 }
364 }
365
366 pub fn persistent_id(&self) -> &str {
367 &self.persistent_id
368 }
369
370 pub fn register_namespace<N, I>(
371 &mut self,
372 namespace: Arc<N>,
373 schemas: I,
374 prefer_unqualified: bool,
375 ) where
376 N: StorageNamespace + 'static,
377 I: IntoIterator<Item = String>,
378 {
379 let id = namespace.namespace_id().to_string();
380 let entry = NamespaceEntry::new(namespace);
381 for schema in schemas {
382 self.schema_map.insert(schema, id.clone());
383 }
384 if prefer_unqualified {
385 if !self.priority.iter().any(|existing| existing == &id) {
386 self.priority.insert(0, id.clone());
387 }
388 } else if !self.priority.iter().any(|existing| existing == &id) {
389 self.priority.push(id.clone());
390 }
391 self.namespaces.insert(id, entry);
392 }
393
394 pub fn register_schema(&mut self, schema: impl Into<String>, namespace: NamespaceId) {
395 self.schema_map.insert(schema.into(), namespace);
396 }
397
398 pub fn set_unqualified_priority(&mut self, order: Vec<NamespaceId>) {
399 self.priority = order;
400 }
401
402 pub fn namespace_for_schema(&self, schema: &str) -> Option<&NamespaceId> {
403 self.schema_map.get(schema)
404 }
405
406 pub fn namespace_ops(&self, id: &str) -> Option<Arc<dyn StorageNamespaceOps>> {
407 self.namespaces.get(id).map(|entry| Arc::clone(&entry.ops))
408 }
409
410 pub fn namespace<N>(&self, id: &str) -> Option<Arc<N>>
411 where
412 N: StorageNamespace + 'static,
413 {
414 self.namespaces
415 .get(id)
416 .and_then(|entry| entry.as_typed::<N>())
417 }
418
419 pub fn unqualified_order(&self) -> Box<dyn Iterator<Item = &NamespaceId> + '_> {
420 if self.priority.is_empty() {
421 Box::new(std::iter::once(&self.persistent_id))
422 } else {
423 Box::new(self.priority.iter())
424 }
425 }
426
427 pub fn is_temporary_table(&self, canonical: &str) -> bool {
428 self.namespaces
429 .values()
430 .any(|entry| entry.ops.owns_table(canonical))
431 }
432
433 pub fn namespace_for_table(&self, canonical: &str) -> NamespaceId {
434 if let Some(namespace_id) = self
435 .priority
436 .iter()
437 .filter_map(|namespace_id| self.namespaces.get(namespace_id))
438 .find(|entry| entry.ops.owns_table(canonical))
439 .map(|entry| entry.ops.namespace_id().clone())
440 {
441 return namespace_id;
442 }
443
444 self.persistent_id.clone()
445 }
446}