llkv_runtime/runtime_storage_namespace/
namespace.rs1use std::any::Any;
10use std::sync::{Arc, RwLock};
11
12use llkv_executor::ExecutorTable;
13use llkv_plan::{
14 AlterTablePlan, CreateIndexPlan, CreateTablePlan, DropIndexPlan, DropTablePlan, PlanOperation,
15 RenameTablePlan,
16};
17use llkv_result::Error;
18use llkv_storage::pager::Pager;
19use llkv_transaction::TransactionResult;
20use simd_r_drive_entry_handle::EntryHandle;
21
22use crate::{RuntimeContext, RuntimeStatementResult};
23use llkv_table::{CatalogDdl, SingleColumnIndexDescriptor};
24
25pub type RuntimeNamespaceId = String;
27
28pub const PERSISTENT_NAMESPACE_ID: &str = "main";
30pub const TEMPORARY_NAMESPACE_ID: &str = "temp";
32
33pub trait RuntimeStorageNamespace: Send + Sync + 'static {
40 type Pager: Pager<Blob = EntryHandle> + Send + Sync + 'static;
41
42 fn namespace_id(&self) -> &RuntimeNamespaceId;
44
45 fn context(&self) -> Arc<RuntimeContext<Self::Pager>>;
47
48 fn create_table(
50 &self,
51 plan: CreateTablePlan,
52 ) -> crate::Result<RuntimeStatementResult<Self::Pager>>;
53
54 fn drop_table(&self, plan: DropTablePlan) -> crate::Result<()>;
56
57 fn rename_table(&self, plan: RenameTablePlan) -> crate::Result<()>;
59
60 fn alter_table(
62 &self,
63 plan: AlterTablePlan,
64 ) -> crate::Result<RuntimeStatementResult<Self::Pager>>;
65
66 fn create_index(
68 &self,
69 plan: CreateIndexPlan,
70 ) -> crate::Result<RuntimeStatementResult<Self::Pager>>;
71
72 fn drop_index(&self, plan: DropIndexPlan)
74 -> crate::Result<Option<SingleColumnIndexDescriptor>>;
75
76 fn execute_operation(
79 &self,
80 operation: PlanOperation,
81 ) -> crate::Result<TransactionResult<Self::Pager>> {
82 let _ = operation;
83 Err(Error::Internal(format!(
84 "runtime namespace '{}' does not yet support execute_operation",
85 self.namespace_id()
86 )))
87 }
88
89 fn lookup_table(&self, canonical: &str) -> crate::Result<Arc<ExecutorTable<Self::Pager>>>;
91
92 fn list_tables(&self) -> Vec<String>;
94}
95
96pub trait RuntimeStorageNamespaceOps: Send + Sync {
99 fn namespace_id(&self) -> &RuntimeNamespaceId;
100 fn list_tables(&self) -> Vec<String>;
101 fn as_any(&self) -> &dyn Any;
102}
103
104impl<T> RuntimeStorageNamespaceOps for T
105where
106 T: RuntimeStorageNamespace,
107{
108 fn namespace_id(&self) -> &RuntimeNamespaceId {
109 RuntimeStorageNamespace::namespace_id(self)
110 }
111
112 fn list_tables(&self) -> Vec<String> {
113 RuntimeStorageNamespace::list_tables(self)
114 }
115
116 fn as_any(&self) -> &dyn Any {
117 self
118 }
119}
120
121#[derive(Clone)]
123pub struct PersistentRuntimeNamespace<P>
124where
125 P: Pager<Blob = EntryHandle> + Send + Sync + 'static,
126{
127 id: RuntimeNamespaceId,
128 context: Arc<RuntimeContext<P>>,
129}
130
131impl<P> PersistentRuntimeNamespace<P>
132where
133 P: Pager<Blob = EntryHandle> + Send + Sync + 'static,
134{
135 pub fn new(id: RuntimeNamespaceId, context: Arc<RuntimeContext<P>>) -> Self {
136 Self { id, context }
137 }
138}
139
140impl<P> RuntimeStorageNamespace for PersistentRuntimeNamespace<P>
141where
142 P: Pager<Blob = EntryHandle> + Send + Sync + 'static,
143{
144 type Pager = P;
145
146 fn namespace_id(&self) -> &RuntimeNamespaceId {
147 &self.id
148 }
149
150 fn context(&self) -> Arc<RuntimeContext<Self::Pager>> {
151 Arc::clone(&self.context)
152 }
153
154 fn create_table(
155 &self,
156 plan: CreateTablePlan,
157 ) -> crate::Result<RuntimeStatementResult<Self::Pager>> {
158 CatalogDdl::create_table(self.context.as_ref(), plan)
159 }
160
161 fn drop_table(&self, plan: DropTablePlan) -> crate::Result<()> {
162 CatalogDdl::drop_table(self.context.as_ref(), plan)
163 }
164
165 fn rename_table(&self, plan: RenameTablePlan) -> crate::Result<()> {
166 CatalogDdl::rename_table(self.context.as_ref(), plan)
167 }
168
169 fn alter_table(
170 &self,
171 plan: AlterTablePlan,
172 ) -> crate::Result<RuntimeStatementResult<Self::Pager>> {
173 CatalogDdl::alter_table(self.context.as_ref(), plan)
174 }
175
176 fn create_index(
177 &self,
178 plan: CreateIndexPlan,
179 ) -> crate::Result<RuntimeStatementResult<Self::Pager>> {
180 CatalogDdl::create_index(self.context.as_ref(), plan)
181 }
182
183 fn drop_index(
184 &self,
185 plan: DropIndexPlan,
186 ) -> crate::Result<Option<SingleColumnIndexDescriptor>> {
187 CatalogDdl::drop_index(self.context.as_ref(), plan)
188 }
189
190 fn lookup_table(&self, canonical: &str) -> crate::Result<Arc<ExecutorTable<Self::Pager>>> {
191 self.context.lookup_table(canonical)
192 }
193
194 fn list_tables(&self) -> Vec<String> {
195 self.context.table_names()
196 }
197}
198
199#[derive(Clone)]
201pub struct TemporaryRuntimeNamespace<P>
202where
203 P: Pager<Blob = EntryHandle> + Send + Sync + 'static,
204{
205 id: RuntimeNamespaceId,
206 context: Arc<RwLock<Arc<RuntimeContext<P>>>>,
207}
208
209impl<P> TemporaryRuntimeNamespace<P>
210where
211 P: Pager<Blob = EntryHandle> + Send + Sync + 'static,
212{
213 pub fn new(id: RuntimeNamespaceId, context: Arc<RuntimeContext<P>>) -> Self {
214 Self {
215 id,
216 context: Arc::new(RwLock::new(context)),
217 }
218 }
219
220 pub fn replace_context(&self, context: Arc<RuntimeContext<P>>) {
221 let mut guard = self
222 .context
223 .write()
224 .expect("temporary context lock poisoned");
225 *guard = context;
226 }
227
228 pub fn clear_tables<I>(&self, canonical_names: I)
229 where
230 I: IntoIterator<Item = String>,
231 {
232 let canonical_names: Vec<String> = canonical_names.into_iter().collect();
233 if canonical_names.is_empty() {
234 return;
235 }
236
237 let catalog = self.context().table_catalog();
238 for canonical in canonical_names {
239 if let Some(table_id) = catalog.table_id(&canonical)
240 && catalog.unregister_table(table_id)
241 {
242 tracing::debug!(
243 "[TEMP] Unregistered temporary table '{}' from shared catalog",
244 canonical
245 );
246 }
247 }
248 }
249
250 pub fn context(&self) -> Arc<RuntimeContext<P>> {
251 Arc::clone(
252 &self
253 .context
254 .read()
255 .expect("temporary context lock poisoned"),
256 )
257 }
258}
259
260impl<P> RuntimeStorageNamespace for TemporaryRuntimeNamespace<P>
261where
262 P: Pager<Blob = EntryHandle> + Send + Sync + 'static,
263{
264 type Pager = P;
265
266 fn namespace_id(&self) -> &RuntimeNamespaceId {
267 &self.id
268 }
269
270 fn context(&self) -> Arc<RuntimeContext<Self::Pager>> {
271 self.context()
272 }
273
274 fn create_table(
275 &self,
276 plan: CreateTablePlan,
277 ) -> crate::Result<RuntimeStatementResult<Self::Pager>> {
278 let context = self.context();
279 let result = CatalogDdl::create_table(context.as_ref(), plan)?;
280 Ok(result)
281 }
282
283 fn drop_table(&self, plan: DropTablePlan) -> crate::Result<()> {
284 let context = self.context();
285 CatalogDdl::drop_table(context.as_ref(), plan)?;
286 Ok(())
287 }
288
289 fn rename_table(&self, plan: RenameTablePlan) -> crate::Result<()> {
290 let context = self.context();
291 CatalogDdl::rename_table(context.as_ref(), plan)?;
292 Ok(())
293 }
294
295 fn alter_table(
296 &self,
297 plan: AlterTablePlan,
298 ) -> crate::Result<RuntimeStatementResult<Self::Pager>> {
299 let context = self.context();
300 CatalogDdl::alter_table(context.as_ref(), plan)
301 }
302
303 fn create_index(
304 &self,
305 plan: CreateIndexPlan,
306 ) -> crate::Result<RuntimeStatementResult<Self::Pager>> {
307 let context = self.context();
308 CatalogDdl::create_index(context.as_ref(), plan)
309 }
310
311 fn drop_index(
312 &self,
313 plan: DropIndexPlan,
314 ) -> crate::Result<Option<SingleColumnIndexDescriptor>> {
315 let context = self.context();
316 CatalogDdl::drop_index(context.as_ref(), plan)
317 }
318
319 fn lookup_table(&self, canonical: &str) -> crate::Result<Arc<ExecutorTable<Self::Pager>>> {
320 self.context().lookup_table(canonical)
321 }
322
323 fn list_tables(&self) -> Vec<String> {
324 self.context().table_names()
325 }
326}