llkv_runtime/runtime_storage_namespace/
registry.rs

1//! Registry for coordinating runtime storage namespaces.
2//!
3//! The registry keeps track of logical namespace instances (persistent,
4//! temporary, etc.) and provides lookup utilities for runtime sessions. It does
5//! **not** allocate storage or manage pager lifecycles; it simply records which
6//! runtime namespace currently owns a canonical table name or schema prefix.
7
8use std::sync::Arc;
9
10use rustc_hash::{FxHashMap, FxHashSet};
11
12use super::namespace::{
13    RuntimeNamespaceId, RuntimeStorageNamespace, RuntimeStorageNamespaceOps, TEMPORARY_NAMESPACE_ID,
14};
15
16struct NamespaceEntry {
17    ops: Arc<dyn RuntimeStorageNamespaceOps>,
18    erased: Arc<dyn std::any::Any + Send + Sync>,
19}
20
21impl NamespaceEntry {
22    fn new<N>(namespace: Arc<N>) -> Self
23    where
24        N: RuntimeStorageNamespace + 'static,
25    {
26        let ops: Arc<dyn RuntimeStorageNamespaceOps> = namespace.clone();
27        let erased: Arc<dyn std::any::Any + Send + Sync> = namespace;
28        Self { ops, erased }
29    }
30
31    fn ops(&self) -> Arc<dyn RuntimeStorageNamespaceOps> {
32        Arc::clone(&self.ops)
33    }
34
35    fn as_typed<N>(&self) -> Option<Arc<N>>
36    where
37        N: RuntimeStorageNamespace + 'static,
38    {
39        self.erased.clone().downcast::<N>().ok()
40    }
41}
42
43/// Tracks runtime storage namespaces and their table ownership.
44pub struct RuntimeStorageNamespaceRegistry {
45    persistent_id: RuntimeNamespaceId,
46    namespaces: FxHashMap<RuntimeNamespaceId, NamespaceEntry>,
47    schema_map: FxHashMap<String, RuntimeNamespaceId>,
48    priority: Vec<RuntimeNamespaceId>,
49    table_map: FxHashMap<String, RuntimeNamespaceId>,
50    namespace_tables: FxHashMap<RuntimeNamespaceId, FxHashSet<String>>,
51}
52
53impl RuntimeStorageNamespaceRegistry {
54    pub fn new(persistent_id: RuntimeNamespaceId) -> Self {
55        Self {
56            persistent_id: persistent_id.clone(),
57            namespaces: FxHashMap::default(),
58            schema_map: FxHashMap::default(),
59            priority: vec![persistent_id],
60            table_map: FxHashMap::default(),
61            namespace_tables: FxHashMap::default(),
62        }
63    }
64
65    pub fn persistent_id(&self) -> &str {
66        &self.persistent_id
67    }
68
69    pub fn register_namespace<N, I>(
70        &mut self,
71        namespace: Arc<N>,
72        schemas: I,
73        prefer_unqualified: bool,
74    ) where
75        N: RuntimeStorageNamespace + 'static,
76        I: IntoIterator<Item = String>,
77    {
78        let id = namespace.namespace_id().to_string();
79        let entry = NamespaceEntry::new(namespace);
80        for schema in schemas {
81            self.schema_map.insert(schema, id.clone());
82        }
83        if prefer_unqualified {
84            if !self.priority.iter().any(|existing| existing == &id) {
85                self.priority.insert(0, id.clone());
86            }
87        } else if !self.priority.iter().any(|existing| existing == &id) {
88            self.priority.push(id.clone());
89        }
90        self.namespace_tables.entry(id.clone()).or_default();
91        self.namespaces.insert(id, entry);
92    }
93
94    pub fn register_schema(&mut self, schema: impl Into<String>, namespace: RuntimeNamespaceId) {
95        self.schema_map.insert(schema.into(), namespace);
96    }
97
98    pub fn set_unqualified_priority(&mut self, order: Vec<RuntimeNamespaceId>) {
99        self.priority = order;
100    }
101
102    pub fn register_table(&mut self, namespace: &RuntimeNamespaceId, canonical: String) {
103        let namespace_id = namespace.clone();
104        self.table_map
105            .insert(canonical.clone(), namespace_id.clone());
106        self.namespace_tables
107            .entry(namespace_id)
108            .or_default()
109            .insert(canonical);
110    }
111
112    pub fn unregister_table(&mut self, canonical: &str) -> Option<RuntimeNamespaceId> {
113        if let Some(namespace_id) = self.table_map.remove(canonical) {
114            if let Some(tables) = self.namespace_tables.get_mut(&namespace_id) {
115                tables.remove(canonical);
116            }
117            Some(namespace_id)
118        } else {
119            None
120        }
121    }
122
123    pub fn drain_namespace_tables(&mut self, namespace: &RuntimeNamespaceId) -> Vec<String> {
124        let tables = self.namespace_tables.entry(namespace.clone()).or_default();
125        let drained: Vec<String> = tables.drain().collect();
126        for canonical in &drained {
127            self.table_map.remove(canonical);
128        }
129        drained
130    }
131
132    pub fn namespace_for_schema(&self, schema: &str) -> Option<&RuntimeNamespaceId> {
133        self.schema_map.get(schema)
134    }
135
136    pub fn namespace_ops(&self, id: &str) -> Option<Arc<dyn RuntimeStorageNamespaceOps>> {
137        self.namespaces.get(id).map(NamespaceEntry::ops)
138    }
139
140    pub fn namespace<N>(&self, id: &str) -> Option<Arc<N>>
141    where
142        N: RuntimeStorageNamespace + 'static,
143    {
144        self.namespaces
145            .get(id)
146            .and_then(|entry| entry.as_typed::<N>())
147    }
148
149    pub fn unqualified_order(&self) -> Box<dyn Iterator<Item = &RuntimeNamespaceId> + '_> {
150        if self.priority.is_empty() {
151            Box::new(std::iter::once(&self.persistent_id))
152        } else {
153            Box::new(self.priority.iter())
154        }
155    }
156
157    pub fn is_temporary_table(&self, canonical: &str) -> bool {
158        matches!(
159            self.table_map.get(canonical),
160            Some(namespace_id) if namespace_id == TEMPORARY_NAMESPACE_ID
161        )
162    }
163
164    pub fn namespace_for_table(&self, canonical: &str) -> RuntimeNamespaceId {
165        self.table_map
166            .get(canonical)
167            .cloned()
168            .unwrap_or_else(|| self.persistent_id.clone())
169    }
170}