llkv_runtime/
storage_namespace.rs

1use std::any::Any;
2use std::sync::{Arc, RwLock};
3
4use llkv_executor::ExecutorTable;
5use llkv_plan::{CreateIndexPlan, CreateTablePlan, PlanOperation};
6use llkv_result::Error;
7use llkv_storage::pager::Pager;
8use llkv_transaction::TransactionResult;
9use rustc_hash::{FxHashMap, FxHashSet};
10use simd_r_drive_entry_handle::EntryHandle;
11
12use crate::{RuntimeContext, RuntimeStatementResult};
13use llkv_table::canonical_table_name;
14
/// Identifier naming a storage namespace (for example `"main"` or `"temp"`).
pub type NamespaceId = String;

// TODO: Extract to constants module
/// Identifier string for the persistent namespace.
pub const PERSISTENT_NAMESPACE_ID: &str = "main";
/// Identifier string for the temporary namespace.
pub const TEMPORARY_NAMESPACE_ID: &str = "temp";
20
21// TODO: Rename to `RuntimeStorageNamespace`?
22/// Trait implemented by all runtime storage namespaces.
23///
24/// Each namespace encapsulates a single storage backend (pager + catalog + table cache).
25/// Namespaces expose a minimal API so higher layers (RuntimeSession, SQL engine) can route
26/// operations without hard-coding per-namespace logic.
27pub trait StorageNamespace: Send + Sync + 'static {
28    type Pager: Pager<Blob = EntryHandle> + Send + Sync + 'static;
29
30    /// Identifier used when resolving schemas (e.g. "main", "temp").
31    fn namespace_id(&self) -> &NamespaceId;
32
33    /// Returns the runtime context bound to this namespace.
34    fn context(&self) -> Arc<RuntimeContext<Self::Pager>>;
35
36    /// Create a table inside this namespace.
37    fn create_table(
38        &self,
39        plan: CreateTablePlan,
40    ) -> crate::Result<RuntimeStatementResult<Self::Pager>>;
41
42    /// Drop a table from this namespace.
43    fn drop_table(&self, name: &str, if_exists: bool) -> crate::Result<()>;
44
45    /// Create an index inside this namespace.
46    fn create_index(
47        &self,
48        plan: CreateIndexPlan,
49    ) -> crate::Result<RuntimeStatementResult<Self::Pager>>;
50
51    /// Execute a generic plan operation. Namespaces that do not yet support this entry point
52    /// should rely on the default error implementation.
53    fn execute_operation(
54        &self,
55        operation: PlanOperation,
56    ) -> crate::Result<TransactionResult<Self::Pager>> {
57        let _ = operation;
58        Err(Error::Internal(format!(
59            "namespace '{}' does not yet support execute_operation",
60            self.namespace_id()
61        )))
62    }
63
64    /// Lookup a table by canonical name.
65    fn lookup_table(&self, canonical: &str) -> crate::Result<Arc<ExecutorTable<Self::Pager>>>;
66
67    /// List tables visible to this namespace.
68    fn list_tables(&self) -> Vec<String>;
69
70    /// Returns true if the namespace owns (and should answer for) the canonical table name.
71    fn owns_table(&self, canonical: &str) -> bool {
72        let _ = canonical;
73        false
74    }
75}
76
77/// Helper trait for storing namespaces behind trait objects with basic inspection support.
78pub trait StorageNamespaceOps: Send + Sync {
79    fn namespace_id(&self) -> &NamespaceId;
80    fn list_tables(&self) -> Vec<String>;
81    fn owns_table(&self, canonical: &str) -> bool;
82    fn as_any(&self) -> &dyn Any;
83}
84
85impl<T> StorageNamespaceOps for T
86where
87    T: StorageNamespace,
88{
89    fn namespace_id(&self) -> &NamespaceId {
90        StorageNamespace::namespace_id(self)
91    }
92
93    fn list_tables(&self) -> Vec<String> {
94        StorageNamespace::list_tables(self)
95    }
96
97    fn owns_table(&self, canonical: &str) -> bool {
98        StorageNamespace::owns_table(self, canonical)
99    }
100
101    fn as_any(&self) -> &dyn Any {
102        self
103    }
104}
105
106#[derive(Clone)]
107pub struct PersistentNamespace<P>
108where
109    P: Pager<Blob = EntryHandle> + Send + Sync + 'static,
110{
111    id: NamespaceId,
112    context: Arc<RuntimeContext<P>>,
113}
114
115impl<P> PersistentNamespace<P>
116where
117    P: Pager<Blob = EntryHandle> + Send + Sync + 'static,
118{
119    pub fn new(id: NamespaceId, context: Arc<RuntimeContext<P>>) -> Self {
120        Self { id, context }
121    }
122}
123
124impl<P> StorageNamespace for PersistentNamespace<P>
125where
126    P: Pager<Blob = EntryHandle> + Send + Sync + 'static,
127{
128    type Pager = P;
129
130    fn namespace_id(&self) -> &NamespaceId {
131        &self.id
132    }
133
134    fn context(&self) -> Arc<RuntimeContext<Self::Pager>> {
135        Arc::clone(&self.context)
136    }
137
138    fn create_table(
139        &self,
140        plan: CreateTablePlan,
141    ) -> crate::Result<RuntimeStatementResult<Self::Pager>> {
142        self.context.create_table_plan(plan)
143    }
144
145    fn drop_table(&self, name: &str, if_exists: bool) -> crate::Result<()> {
146        self.context.drop_table_immediate(name, if_exists)
147    }
148
149    fn create_index(
150        &self,
151        plan: CreateIndexPlan,
152    ) -> crate::Result<RuntimeStatementResult<Self::Pager>> {
153        self.context.create_index(plan)
154    }
155
156    fn lookup_table(&self, canonical: &str) -> crate::Result<Arc<ExecutorTable<Self::Pager>>> {
157        self.context.lookup_table(canonical)
158    }
159
160    fn list_tables(&self) -> Vec<String> {
161        self.context.table_names()
162    }
163}
164
165#[derive(Clone)]
166pub struct TemporaryNamespace<P>
167where
168    P: Pager<Blob = EntryHandle> + Send + Sync + 'static,
169{
170    id: NamespaceId,
171    context: Arc<RwLock<Arc<RuntimeContext<P>>>>,
172    tables: Arc<RwLock<FxHashSet<String>>>,
173}
174
175impl<P> TemporaryNamespace<P>
176where
177    P: Pager<Blob = EntryHandle> + Send + Sync + 'static,
178{
179    pub fn new(id: NamespaceId, context: Arc<RuntimeContext<P>>) -> Self {
180        Self {
181            id,
182            context: Arc::new(RwLock::new(context)),
183            tables: Arc::new(RwLock::new(FxHashSet::default())),
184        }
185    }
186
187    pub fn replace_context(&self, context: Arc<RuntimeContext<P>>) {
188        let mut guard = self
189            .context
190            .write()
191            .expect("temporary context lock poisoned");
192        *guard = context;
193    }
194
195    pub fn register_table(&self, canonical: String) {
196        self.tables
197            .write()
198            .expect("temporary table registry poisoned")
199            .insert(canonical);
200    }
201
202    pub fn unregister_table(&self, canonical: &str) {
203        self.tables
204            .write()
205            .expect("temporary table registry poisoned")
206            .remove(canonical);
207    }
208
209    pub fn clear_tables(&self) {
210        let canonical_names: Vec<String> = {
211            let mut guard = self
212                .tables
213                .write()
214                .expect("temporary table registry poisoned");
215            guard.drain().collect()
216        };
217
218        if canonical_names.is_empty() {
219            return;
220        }
221
222        let catalog = self.context().table_catalog();
223        for canonical in canonical_names {
224            if let Some(table_id) = catalog.table_id(&canonical)
225                && catalog.unregister_table(table_id)
226            {
227                tracing::debug!(
228                    "[TEMP] Unregistered temporary table '{}' from shared catalog",
229                    canonical
230                );
231            }
232        }
233    }
234
235    pub fn has_table(&self, canonical: &str) -> bool {
236        self.tables
237            .read()
238            .expect("temporary table registry poisoned")
239            .contains(canonical)
240    }
241
242    pub fn context(&self) -> Arc<RuntimeContext<P>> {
243        Arc::clone(
244            &self
245                .context
246                .read()
247                .expect("temporary context lock poisoned"),
248        )
249    }
250}
251
252impl<P> StorageNamespace for TemporaryNamespace<P>
253where
254    P: Pager<Blob = EntryHandle> + Send + Sync + 'static,
255{
256    type Pager = P;
257
258    fn namespace_id(&self) -> &NamespaceId {
259        &self.id
260    }
261
262    fn context(&self) -> Arc<RuntimeContext<Self::Pager>> {
263        self.context()
264    }
265
266    fn create_table(
267        &self,
268        plan: CreateTablePlan,
269    ) -> crate::Result<RuntimeStatementResult<Self::Pager>> {
270        let (_, canonical) = canonical_table_name(&plan.name)?;
271        let context = self.context();
272        let result = context.create_table_plan(plan)?;
273        if !self.has_table(&canonical) {
274            self.register_table(canonical);
275        }
276        Ok(result)
277    }
278
279    fn drop_table(&self, name: &str, if_exists: bool) -> crate::Result<()> {
280        let (_, canonical) = canonical_table_name(name)?;
281        let context = self.context();
282        context.drop_table_immediate(name, if_exists)?;
283        self.unregister_table(&canonical);
284        Ok(())
285    }
286
287    fn create_index(
288        &self,
289        plan: CreateIndexPlan,
290    ) -> crate::Result<RuntimeStatementResult<Self::Pager>> {
291        self.context().create_index(plan)
292    }
293
294    fn lookup_table(&self, canonical: &str) -> crate::Result<Arc<ExecutorTable<Self::Pager>>> {
295        self.context().lookup_table(canonical)
296    }
297
298    fn list_tables(&self) -> Vec<String> {
299        let mut names = self.context().table_names();
300        let tracked = self
301            .tables
302            .read()
303            .expect("temporary table registry poisoned")
304            .clone();
305        for canonical in tracked {
306            if !names.iter().any(|existing| existing == &canonical) {
307                names.push(canonical);
308            }
309        }
310        names
311    }
312
313    fn owns_table(&self, canonical: &str) -> bool {
314        self.has_table(canonical)
315    }
316}
317
318#[derive(Clone)]
319struct NamespaceEntry {
320    ops: Arc<dyn StorageNamespaceOps>,
321    erased: Arc<dyn Any + Send + Sync>,
322}
323
324impl NamespaceEntry {
325    fn new<N>(namespace: Arc<N>) -> Self
326    where
327        N: StorageNamespace + 'static,
328    {
329        let ops: Arc<dyn StorageNamespaceOps> = namespace.clone();
330        let erased: Arc<dyn Any + Send + Sync> = namespace;
331        Self { ops, erased }
332    }
333
334    fn as_typed<N>(&self) -> Option<Arc<N>>
335    where
336        N: StorageNamespace + 'static,
337    {
338        self.erased.clone().downcast::<N>().ok()
339    }
340}
341
342pub struct StorageNamespaceRegistry {
343    persistent_id: NamespaceId,
344    namespaces: FxHashMap<NamespaceId, NamespaceEntry>,
345    schema_map: FxHashMap<String, NamespaceId>,
346    priority: Vec<NamespaceId>,
347}
348
349impl StorageNamespaceRegistry {
350    pub fn new(persistent_id: NamespaceId) -> Self {
351        Self {
352            persistent_id: persistent_id.clone(),
353            namespaces: FxHashMap::default(),
354            schema_map: FxHashMap::default(),
355            priority: vec![persistent_id],
356        }
357    }
358
359    pub fn persistent_id(&self) -> &str {
360        &self.persistent_id
361    }
362
363    pub fn register_namespace<N, I>(
364        &mut self,
365        namespace: Arc<N>,
366        schemas: I,
367        prefer_unqualified: bool,
368    ) where
369        N: StorageNamespace + 'static,
370        I: IntoIterator<Item = String>,
371    {
372        let id = namespace.namespace_id().to_string();
373        let entry = NamespaceEntry::new(namespace);
374        for schema in schemas {
375            self.schema_map.insert(schema, id.clone());
376        }
377        if prefer_unqualified {
378            if !self.priority.iter().any(|existing| existing == &id) {
379                self.priority.insert(0, id.clone());
380            }
381        } else if !self.priority.iter().any(|existing| existing == &id) {
382            self.priority.push(id.clone());
383        }
384        self.namespaces.insert(id, entry);
385    }
386
387    pub fn register_schema(&mut self, schema: impl Into<String>, namespace: NamespaceId) {
388        self.schema_map.insert(schema.into(), namespace);
389    }
390
391    pub fn set_unqualified_priority(&mut self, order: Vec<NamespaceId>) {
392        self.priority = order;
393    }
394
395    pub fn namespace_for_schema(&self, schema: &str) -> Option<&NamespaceId> {
396        self.schema_map.get(schema)
397    }
398
399    pub fn namespace_ops(&self, id: &str) -> Option<Arc<dyn StorageNamespaceOps>> {
400        self.namespaces.get(id).map(|entry| Arc::clone(&entry.ops))
401    }
402
403    pub fn namespace<N>(&self, id: &str) -> Option<Arc<N>>
404    where
405        N: StorageNamespace + 'static,
406    {
407        self.namespaces
408            .get(id)
409            .and_then(|entry| entry.as_typed::<N>())
410    }
411
412    pub fn unqualified_order(&self) -> Box<dyn Iterator<Item = &NamespaceId> + '_> {
413        if self.priority.is_empty() {
414            Box::new(std::iter::once(&self.persistent_id))
415        } else {
416            Box::new(self.priority.iter())
417        }
418    }
419
420    pub fn is_temporary_table(&self, canonical: &str) -> bool {
421        self.namespaces
422            .values()
423            .any(|entry| entry.ops.owns_table(canonical))
424    }
425
426    pub fn namespace_for_table(&self, canonical: &str) -> NamespaceId {
427        if let Some(namespace_id) = self
428            .priority
429            .iter()
430            .filter_map(|namespace_id| self.namespaces.get(namespace_id))
431            .find(|entry| entry.ops.owns_table(canonical))
432            .map(|entry| entry.ops.namespace_id().clone())
433        {
434            return namespace_id;
435        }
436
437        self.persistent_id.clone()
438    }
439}