reifydb_engine/table_virtual/
registry.rs

1// Copyright (c) reifydb.com 2025
2// This file is licensed under the AGPL-3.0-or-later, see license.md file
3
4//! Registry for user-defined virtual table factories.
5
6use std::{
7	collections::HashMap,
8	sync::{Arc, RwLock},
9};
10
11use reifydb_core::interface::{NamespaceId, TableVirtualDef, TableVirtualId};
12
13use super::{
14	VirtualTableFactory,
15	adapter::TableVirtualUserAdapter,
16	user::{TableVirtualUser, TableVirtualUserIterator},
17};
18
/// Registry for user-defined virtual table factories.
///
/// This registry stores the runtime factories that create virtual table instances.
/// It works in conjunction with `MaterializedCatalog` which stores the definitions.
///
/// The state lives behind an `Arc<RwLock<..>>`, so the derived `Clone` yields
/// another handle to the same underlying registry rather than a copy of it.
#[derive(Clone)]
pub struct TableVirtualUserRegistry {
	/// Shared, lock-protected registry state.
	inner: Arc<RwLock<TableVirtualUserRegistryInner>>,
}
27
/// Lock-protected state backing `TableVirtualUserRegistry`.
///
/// The two maps index the same set of factories under different keys.
struct TableVirtualUserRegistryInner {
	/// Factories keyed by (namespace_id, table_name)
	factories: HashMap<(NamespaceId, String), Arc<dyn VirtualTableFactory>>,
	/// Factories by ID for fast lookup
	factories_by_id: HashMap<TableVirtualId, Arc<dyn VirtualTableFactory>>,
	/// Next ID to assign (starts at 1000 to leave room for system tables)
	next_id: u64,
}
36
37impl Default for TableVirtualUserRegistry {
38	fn default() -> Self {
39		Self::new()
40	}
41}
42
43impl TableVirtualUserRegistry {
44	/// Create a new empty registry.
45	pub fn new() -> Self {
46		Self {
47			inner: Arc::new(RwLock::new(TableVirtualUserRegistryInner {
48				factories: HashMap::new(),
49				factories_by_id: HashMap::new(),
50				next_id: 1000,
51			})),
52		}
53	}
54
55	/// Allocate a new table virtual ID.
56	pub fn allocate_id(&self) -> TableVirtualId {
57		let mut inner = self.inner.write().unwrap();
58		let id = TableVirtualId(inner.next_id);
59		inner.next_id += 1;
60		id
61	}
62
63	/// Register a factory for a user virtual table.
64	///
65	/// The definition should already be registered in `MaterializedCatalog`.
66	pub fn register(&self, namespace: NamespaceId, name: String, factory: Arc<dyn VirtualTableFactory>) {
67		let mut inner = self.inner.write().unwrap();
68		let id = factory.definition().id;
69		inner.factories.insert((namespace, name), factory.clone());
70		inner.factories_by_id.insert(id, factory);
71	}
72
73	/// Unregister a user virtual table.
74	pub fn unregister(&self, namespace: NamespaceId, name: &str) -> Option<Arc<dyn VirtualTableFactory>> {
75		let mut inner = self.inner.write().unwrap();
76		if let Some(factory) = inner.factories.remove(&(namespace, name.to_string())) {
77			let id = factory.definition().id;
78			inner.factories_by_id.remove(&id);
79			Some(factory)
80		} else {
81			None
82		}
83	}
84
85	/// Find a factory by namespace and name.
86	pub fn find_by_name(&self, namespace: NamespaceId, name: &str) -> Option<Arc<dyn VirtualTableFactory>> {
87		let inner = self.inner.read().unwrap();
88		inner.factories.get(&(namespace, name.to_string())).cloned()
89	}
90
91	/// Find a factory by ID.
92	pub fn find_by_id(&self, id: TableVirtualId) -> Option<Arc<dyn VirtualTableFactory>> {
93		let inner = self.inner.read().unwrap();
94		inner.factories_by_id.get(&id).cloned()
95	}
96
97	/// List all registered factories.
98	pub fn list_all(&self) -> Vec<Arc<dyn VirtualTableFactory>> {
99		let inner = self.inner.read().unwrap();
100		inner.factories.values().cloned().collect()
101	}
102}
103
104/// Factory implementation for `TableVirtualUser` types.
105pub struct SimpleVirtualTableFactory<T: TableVirtualUser + Clone> {
106	template: T,
107	definition: Arc<TableVirtualDef>,
108}
109
110impl<T: TableVirtualUser + Clone> SimpleVirtualTableFactory<T> {
111	/// Create a new factory for the given user table.
112	pub fn new(template: T, definition: Arc<TableVirtualDef>) -> Self {
113		Self {
114			template,
115			definition,
116		}
117	}
118}
119
120impl<T: TableVirtualUser + Clone> VirtualTableFactory for SimpleVirtualTableFactory<T> {
121	fn create_boxed(&self) -> Box<dyn super::TableVirtual<'static> + Send + Sync> {
122		Box::new(TableVirtualUserAdapter::new(self.template.clone(), self.definition.clone()))
123	}
124
125	fn definition(&self) -> Arc<TableVirtualDef> {
126		self.definition.clone()
127	}
128}
129
/// Factory implementation for `TableVirtualUserIterator` types.
///
/// This factory creates fresh iterator instances for each query.
pub struct IteratorVirtualTableFactory<F>
where
	F: Fn() -> Box<dyn TableVirtualUserIterator> + Send + Sync + 'static,
{
	/// Closure that produces a new boxed iterator each time it is called.
	creator: F,
	/// Shared definition of the virtual table.
	definition: Arc<TableVirtualDef>,
}
140
141impl<F> IteratorVirtualTableFactory<F>
142where
143	F: Fn() -> Box<dyn TableVirtualUserIterator> + Send + Sync + 'static,
144{
145	/// Create a new factory with the given creator function.
146	pub fn new(creator: F, definition: Arc<TableVirtualDef>) -> Self {
147		Self {
148			creator,
149			definition,
150		}
151	}
152}
153
154impl<F> VirtualTableFactory for IteratorVirtualTableFactory<F>
155where
156	F: Fn() -> Box<dyn TableVirtualUserIterator> + Send + Sync + 'static,
157{
158	fn create_boxed(&self) -> Box<dyn super::TableVirtual<'static> + Send + Sync> {
159		let iter = (self.creator)();
160		Box::new(IteratorAdapter {
161			inner: iter,
162			definition: self.definition.clone(),
163			initialized: false,
164			batch_size: 1000,
165		})
166	}
167
168	fn definition(&self) -> Arc<TableVirtualDef> {
169		self.definition.clone()
170	}
171}
172
/// Internal adapter for boxed iterators.
///
/// Wraps a boxed `TableVirtualUserIterator` so it can be driven through the
/// `TableVirtual` trait.
struct IteratorAdapter {
	/// The wrapped user iterator that batches are pulled from.
	inner: Box<dyn TableVirtualUserIterator>,
	/// Definition handed out by `definition()`.
	definition: Arc<TableVirtualDef>,
	/// Set to `true` after `initialize` succeeds; `next` returns `None` until then.
	initialized: bool,
	/// Value passed to `inner.next_batch` on every `next` call
	/// (presumably the maximum number of rows per batch — confirm against the trait).
	batch_size: usize,
}
180
181impl<'a> super::TableVirtual<'a> for IteratorAdapter {
182	fn initialize(
183		&mut self,
184		_txn: &mut crate::transaction::StandardTransaction<'a>,
185		ctx: super::TableVirtualContext<'a>,
186	) -> crate::Result<()> {
187		use super::user::TableVirtualUserPushdownContext;
188
189		let user_ctx = match ctx {
190			super::TableVirtualContext::Basic {
191				..
192			} => None,
193			super::TableVirtualContext::PushDown {
194				limit,
195				..
196			} => Some(TableVirtualUserPushdownContext {
197				limit,
198			}),
199		};
200
201		self.inner.initialize(user_ctx.as_ref())?;
202		self.initialized = true;
203		Ok(())
204	}
205
206	fn next(
207		&mut self,
208		_txn: &mut crate::transaction::StandardTransaction<'a>,
209	) -> crate::Result<Option<crate::execute::Batch<'a>>> {
210		use reifydb_core::value::column::Columns;
211
212		if !self.initialized {
213			return Ok(None);
214		}
215
216		let user_columns = self.inner.columns();
217		let user_rows = self.inner.next_batch(self.batch_size)?;
218
219		match user_rows {
220			None => Ok(None),
221			Some(rows) if rows.is_empty() => Ok(None),
222			Some(rows) => {
223				let columns = super::adapter::convert_rows_to_columns(&user_columns, rows);
224				Ok(Some(crate::execute::Batch {
225					columns: Columns::new(columns),
226				}))
227			}
228		}
229	}
230
231	fn definition(&self) -> &TableVirtualDef {
232		&self.definition
233	}
234}
235
// Manually mark `IteratorAdapter` as `Send + Sync` so it can be boxed as
// `dyn TableVirtual<'static> + Send + Sync`.
//
// SAFETY: NOTE(review): these impls are only sound if every
// `Box<dyn TableVirtualUserIterator>` stored in `inner` is itself safe to send
// to and share between threads. Nothing visible here enforces that — confirm
// the trait requires `Send + Sync` (in which case these impls are redundant)
// or add that bound.
unsafe impl Send for IteratorAdapter {}
unsafe impl Sync for IteratorAdapter {}