llkv_runtime/
storage_namespace.rs

1//! Runtime storage namespace abstraction.
2//!
3//! Namespaces wrap pager-backed contexts so sessions can route operations based
4//! on table ownership (persistent vs. temporary).
5
6use std::any::Any;
7use std::sync::{Arc, RwLock};
8
9use llkv_executor::ExecutorTable;
10use llkv_plan::{CreateIndexPlan, CreateTablePlan, PlanOperation};
11use llkv_result::Error;
12use llkv_storage::pager::Pager;
13use llkv_transaction::TransactionResult;
14use rustc_hash::{FxHashMap, FxHashSet};
15use simd_r_drive_entry_handle::EntryHandle;
16
17use crate::{RuntimeContext, RuntimeStatementResult};
18use llkv_table::canonical_table_name;
19
/// Identifier of a storage namespace; a plain owned string such as `"main"` or `"temp"`.
pub type NamespaceId = String;

// TODO: Extract to constants module
// NOTE: Keep string identifiers here until the runtime constants module is formalized.
/// Namespace identifier string for the persistent namespace.
pub const PERSISTENT_NAMESPACE_ID: &str = "main";
/// Namespace identifier string for the temporary namespace.
pub const TEMPORARY_NAMESPACE_ID: &str = "temp";
26
27// TODO: Rename to `RuntimeStorageNamespace`?
28// NOTE: Trait name mirrors the broader storage namespace concept used across crates.
29/// Trait implemented by all runtime storage namespaces.
30///
31/// Each namespace encapsulates a single storage backend (pager + catalog + table cache).
32/// Namespaces expose a minimal API so higher layers (RuntimeSession, SQL engine) can route
33/// operations without hard-coding per-namespace logic.
34pub trait StorageNamespace: Send + Sync + 'static {
35    type Pager: Pager<Blob = EntryHandle> + Send + Sync + 'static;
36
37    /// Identifier used when resolving schemas (e.g. "main", "temp").
38    fn namespace_id(&self) -> &NamespaceId;
39
40    /// Returns the runtime context bound to this namespace.
41    fn context(&self) -> Arc<RuntimeContext<Self::Pager>>;
42
43    /// Create a table inside this namespace.
44    fn create_table(
45        &self,
46        plan: CreateTablePlan,
47    ) -> crate::Result<RuntimeStatementResult<Self::Pager>>;
48
49    /// Drop a table from this namespace.
50    fn drop_table(&self, name: &str, if_exists: bool) -> crate::Result<()>;
51
52    /// Create an index inside this namespace.
53    fn create_index(
54        &self,
55        plan: CreateIndexPlan,
56    ) -> crate::Result<RuntimeStatementResult<Self::Pager>>;
57
58    /// Execute a generic plan operation. Namespaces that do not yet support this entry point
59    /// should rely on the default error implementation.
60    fn execute_operation(
61        &self,
62        operation: PlanOperation,
63    ) -> crate::Result<TransactionResult<Self::Pager>> {
64        let _ = operation;
65        Err(Error::Internal(format!(
66            "namespace '{}' does not yet support execute_operation",
67            self.namespace_id()
68        )))
69    }
70
71    /// Lookup a table by canonical name.
72    fn lookup_table(&self, canonical: &str) -> crate::Result<Arc<ExecutorTable<Self::Pager>>>;
73
74    /// List tables visible to this namespace.
75    fn list_tables(&self) -> Vec<String>;
76
77    /// Returns true if the namespace owns (and should answer for) the canonical table name.
78    fn owns_table(&self, canonical: &str) -> bool {
79        let _ = canonical;
80        false
81    }
82}
83
/// Helper trait for storing namespaces behind trait objects with basic inspection support.
///
/// Unlike [`StorageNamespace`] it declares no associated type, so it can be used as
/// `dyn StorageNamespaceOps`.
pub trait StorageNamespaceOps: Send + Sync {
    /// Identifier of the underlying namespace.
    fn namespace_id(&self) -> &NamespaceId;
    /// Names of the tables the namespace reports.
    fn list_tables(&self) -> Vec<String>;
    /// Whether the namespace claims the given canonical table name.
    fn owns_table(&self, canonical: &str) -> bool;
    /// Exposes the value as [`Any`] so callers can attempt a downcast to the concrete type.
    fn as_any(&self) -> &dyn Any;
}
91
92impl<T> StorageNamespaceOps for T
93where
94    T: StorageNamespace,
95{
96    fn namespace_id(&self) -> &NamespaceId {
97        StorageNamespace::namespace_id(self)
98    }
99
100    fn list_tables(&self) -> Vec<String> {
101        StorageNamespace::list_tables(self)
102    }
103
104    fn owns_table(&self, canonical: &str) -> bool {
105        StorageNamespace::owns_table(self, canonical)
106    }
107
108    fn as_any(&self) -> &dyn Any {
109        self
110    }
111}
112
113#[derive(Clone)]
114pub struct PersistentNamespace<P>
115where
116    P: Pager<Blob = EntryHandle> + Send + Sync + 'static,
117{
118    id: NamespaceId,
119    context: Arc<RuntimeContext<P>>,
120}
121
122impl<P> PersistentNamespace<P>
123where
124    P: Pager<Blob = EntryHandle> + Send + Sync + 'static,
125{
126    pub fn new(id: NamespaceId, context: Arc<RuntimeContext<P>>) -> Self {
127        Self { id, context }
128    }
129}
130
131impl<P> StorageNamespace for PersistentNamespace<P>
132where
133    P: Pager<Blob = EntryHandle> + Send + Sync + 'static,
134{
135    type Pager = P;
136
137    fn namespace_id(&self) -> &NamespaceId {
138        &self.id
139    }
140
141    fn context(&self) -> Arc<RuntimeContext<Self::Pager>> {
142        Arc::clone(&self.context)
143    }
144
145    fn create_table(
146        &self,
147        plan: CreateTablePlan,
148    ) -> crate::Result<RuntimeStatementResult<Self::Pager>> {
149        self.context.create_table_plan(plan)
150    }
151
152    fn drop_table(&self, name: &str, if_exists: bool) -> crate::Result<()> {
153        self.context.drop_table_immediate(name, if_exists)
154    }
155
156    fn create_index(
157        &self,
158        plan: CreateIndexPlan,
159    ) -> crate::Result<RuntimeStatementResult<Self::Pager>> {
160        self.context.create_index(plan)
161    }
162
163    fn lookup_table(&self, canonical: &str) -> crate::Result<Arc<ExecutorTable<Self::Pager>>> {
164        self.context.lookup_table(canonical)
165    }
166
167    fn list_tables(&self) -> Vec<String> {
168        self.context.table_names()
169    }
170}
171
172#[derive(Clone)]
173pub struct TemporaryNamespace<P>
174where
175    P: Pager<Blob = EntryHandle> + Send + Sync + 'static,
176{
177    id: NamespaceId,
178    context: Arc<RwLock<Arc<RuntimeContext<P>>>>,
179    tables: Arc<RwLock<FxHashSet<String>>>,
180}
181
182impl<P> TemporaryNamespace<P>
183where
184    P: Pager<Blob = EntryHandle> + Send + Sync + 'static,
185{
186    pub fn new(id: NamespaceId, context: Arc<RuntimeContext<P>>) -> Self {
187        Self {
188            id,
189            context: Arc::new(RwLock::new(context)),
190            tables: Arc::new(RwLock::new(FxHashSet::default())),
191        }
192    }
193
194    pub fn replace_context(&self, context: Arc<RuntimeContext<P>>) {
195        let mut guard = self
196            .context
197            .write()
198            .expect("temporary context lock poisoned");
199        *guard = context;
200    }
201
202    pub fn register_table(&self, canonical: String) {
203        self.tables
204            .write()
205            .expect("temporary table registry poisoned")
206            .insert(canonical);
207    }
208
209    pub fn unregister_table(&self, canonical: &str) {
210        self.tables
211            .write()
212            .expect("temporary table registry poisoned")
213            .remove(canonical);
214    }
215
216    pub fn clear_tables(&self) {
217        let canonical_names: Vec<String> = {
218            let mut guard = self
219                .tables
220                .write()
221                .expect("temporary table registry poisoned");
222            guard.drain().collect()
223        };
224
225        if canonical_names.is_empty() {
226            return;
227        }
228
229        let catalog = self.context().table_catalog();
230        for canonical in canonical_names {
231            if let Some(table_id) = catalog.table_id(&canonical)
232                && catalog.unregister_table(table_id)
233            {
234                tracing::debug!(
235                    "[TEMP] Unregistered temporary table '{}' from shared catalog",
236                    canonical
237                );
238            }
239        }
240    }
241
242    pub fn has_table(&self, canonical: &str) -> bool {
243        self.tables
244            .read()
245            .expect("temporary table registry poisoned")
246            .contains(canonical)
247    }
248
249    pub fn context(&self) -> Arc<RuntimeContext<P>> {
250        Arc::clone(
251            &self
252                .context
253                .read()
254                .expect("temporary context lock poisoned"),
255        )
256    }
257}
258
259impl<P> StorageNamespace for TemporaryNamespace<P>
260where
261    P: Pager<Blob = EntryHandle> + Send + Sync + 'static,
262{
263    type Pager = P;
264
265    fn namespace_id(&self) -> &NamespaceId {
266        &self.id
267    }
268
269    fn context(&self) -> Arc<RuntimeContext<Self::Pager>> {
270        self.context()
271    }
272
273    fn create_table(
274        &self,
275        plan: CreateTablePlan,
276    ) -> crate::Result<RuntimeStatementResult<Self::Pager>> {
277        let (_, canonical) = canonical_table_name(&plan.name)?;
278        let context = self.context();
279        let result = context.create_table_plan(plan)?;
280        if !self.has_table(&canonical) {
281            self.register_table(canonical);
282        }
283        Ok(result)
284    }
285
286    fn drop_table(&self, name: &str, if_exists: bool) -> crate::Result<()> {
287        let (_, canonical) = canonical_table_name(name)?;
288        let context = self.context();
289        context.drop_table_immediate(name, if_exists)?;
290        self.unregister_table(&canonical);
291        Ok(())
292    }
293
294    fn create_index(
295        &self,
296        plan: CreateIndexPlan,
297    ) -> crate::Result<RuntimeStatementResult<Self::Pager>> {
298        self.context().create_index(plan)
299    }
300
301    fn lookup_table(&self, canonical: &str) -> crate::Result<Arc<ExecutorTable<Self::Pager>>> {
302        self.context().lookup_table(canonical)
303    }
304
305    fn list_tables(&self) -> Vec<String> {
306        let mut names = self.context().table_names();
307        let tracked = self
308            .tables
309            .read()
310            .expect("temporary table registry poisoned")
311            .clone();
312        for canonical in tracked {
313            if !names.iter().any(|existing| existing == &canonical) {
314                names.push(canonical);
315            }
316        }
317        names
318    }
319
320    fn owns_table(&self, canonical: &str) -> bool {
321        self.has_table(canonical)
322    }
323}
324
/// Registry slot holding two handles to one namespace: a trait-object view for
/// inspection and a type-erased view used to recover the concrete type.
#[derive(Clone)]
struct NamespaceEntry {
    /// Inspection interface (`namespace_id`, `list_tables`, `owns_table`).
    ops: Arc<dyn StorageNamespaceOps>,
    /// Same allocation as `ops`, kept as `Any` so it can be downcast in `as_typed`.
    erased: Arc<dyn Any + Send + Sync>,
}
330
331impl NamespaceEntry {
332    fn new<N>(namespace: Arc<N>) -> Self
333    where
334        N: StorageNamespace + 'static,
335    {
336        let ops: Arc<dyn StorageNamespaceOps> = namespace.clone();
337        let erased: Arc<dyn Any + Send + Sync> = namespace;
338        Self { ops, erased }
339    }
340
341    fn as_typed<N>(&self) -> Option<Arc<N>>
342    where
343        N: StorageNamespace + 'static,
344    {
345        self.erased.clone().downcast::<N>().ok()
346    }
347}
348
/// Registry of storage namespaces, their schema names, and the order in which
/// namespaces are consulted for table ownership.
pub struct StorageNamespaceRegistry {
    /// Namespace id used as the fallback in `namespace_for_table` and `unqualified_order`.
    persistent_id: NamespaceId,
    /// Registered namespaces keyed by their id.
    namespaces: FxHashMap<NamespaceId, NamespaceEntry>,
    /// Schema name -> id of the namespace registered for it.
    schema_map: FxHashMap<String, NamespaceId>,
    /// Namespace ids in the order they are consulted by `namespace_for_table`
    /// and yielded by `unqualified_order`.
    priority: Vec<NamespaceId>,
}
355
356impl StorageNamespaceRegistry {
357    pub fn new(persistent_id: NamespaceId) -> Self {
358        Self {
359            persistent_id: persistent_id.clone(),
360            namespaces: FxHashMap::default(),
361            schema_map: FxHashMap::default(),
362            priority: vec![persistent_id],
363        }
364    }
365
366    pub fn persistent_id(&self) -> &str {
367        &self.persistent_id
368    }
369
370    pub fn register_namespace<N, I>(
371        &mut self,
372        namespace: Arc<N>,
373        schemas: I,
374        prefer_unqualified: bool,
375    ) where
376        N: StorageNamespace + 'static,
377        I: IntoIterator<Item = String>,
378    {
379        let id = namespace.namespace_id().to_string();
380        let entry = NamespaceEntry::new(namespace);
381        for schema in schemas {
382            self.schema_map.insert(schema, id.clone());
383        }
384        if prefer_unqualified {
385            if !self.priority.iter().any(|existing| existing == &id) {
386                self.priority.insert(0, id.clone());
387            }
388        } else if !self.priority.iter().any(|existing| existing == &id) {
389            self.priority.push(id.clone());
390        }
391        self.namespaces.insert(id, entry);
392    }
393
394    pub fn register_schema(&mut self, schema: impl Into<String>, namespace: NamespaceId) {
395        self.schema_map.insert(schema.into(), namespace);
396    }
397
398    pub fn set_unqualified_priority(&mut self, order: Vec<NamespaceId>) {
399        self.priority = order;
400    }
401
402    pub fn namespace_for_schema(&self, schema: &str) -> Option<&NamespaceId> {
403        self.schema_map.get(schema)
404    }
405
406    pub fn namespace_ops(&self, id: &str) -> Option<Arc<dyn StorageNamespaceOps>> {
407        self.namespaces.get(id).map(|entry| Arc::clone(&entry.ops))
408    }
409
410    pub fn namespace<N>(&self, id: &str) -> Option<Arc<N>>
411    where
412        N: StorageNamespace + 'static,
413    {
414        self.namespaces
415            .get(id)
416            .and_then(|entry| entry.as_typed::<N>())
417    }
418
419    pub fn unqualified_order(&self) -> Box<dyn Iterator<Item = &NamespaceId> + '_> {
420        if self.priority.is_empty() {
421            Box::new(std::iter::once(&self.persistent_id))
422        } else {
423            Box::new(self.priority.iter())
424        }
425    }
426
427    pub fn is_temporary_table(&self, canonical: &str) -> bool {
428        self.namespaces
429            .values()
430            .any(|entry| entry.ops.owns_table(canonical))
431    }
432
433    pub fn namespace_for_table(&self, canonical: &str) -> NamespaceId {
434        if let Some(namespace_id) = self
435            .priority
436            .iter()
437            .filter_map(|namespace_id| self.namespaces.get(namespace_id))
438            .find(|entry| entry.ops.owns_table(canonical))
439            .map(|entry| entry.ops.namespace_id().clone())
440        {
441            return namespace_id;
442        }
443
444        self.persistent_id.clone()
445    }
446}