reifydb_engine/table_virtual/
registry.rs

1// Copyright (c) reifydb.com 2025
2// This file is licensed under the AGPL-3.0-or-later, see license.md file
3
4//! Registry for user-defined virtual table factories.
5
6use std::{
7	collections::HashMap,
8	sync::{Arc, RwLock},
9};
10
11use async_trait::async_trait;
12use reifydb_core::interface::{NamespaceId, TableVirtualDef, TableVirtualId};
13
14use super::{
15	TableVirtual, VirtualTableFactory,
16	adapter::TableVirtualUserAdapter,
17	user::{TableVirtualUser, TableVirtualUserIterator},
18};
19use crate::transaction::StandardTransaction;
20
21/// Registry for user-defined virtual table factories.
22///
23/// This registry stores the runtime factories that create virtual table instances.
24/// It works in conjunction with `MaterializedCatalog` which stores the definitions.
25#[derive(Clone)]
26pub struct TableVirtualUserRegistry {
27	inner: Arc<RwLock<TableVirtualUserRegistryInner>>,
28}
29
30struct TableVirtualUserRegistryInner {
31	/// Factories keyed by (namespace_id, table_name)
32	factories: HashMap<(NamespaceId, String), Arc<dyn VirtualTableFactory>>,
33	/// Factories by ID for fast lookup
34	factories_by_id: HashMap<TableVirtualId, Arc<dyn VirtualTableFactory>>,
35	/// Next ID to assign (starts at 1000 to leave room for system tables)
36	next_id: u64,
37}
38
39impl Default for TableVirtualUserRegistry {
40	fn default() -> Self {
41		Self::new()
42	}
43}
44
45impl TableVirtualUserRegistry {
46	/// Create a new empty registry.
47	pub fn new() -> Self {
48		Self {
49			inner: Arc::new(RwLock::new(TableVirtualUserRegistryInner {
50				factories: HashMap::new(),
51				factories_by_id: HashMap::new(),
52				next_id: 1000,
53			})),
54		}
55	}
56
57	/// Allocate a new table virtual ID.
58	pub fn allocate_id(&self) -> TableVirtualId {
59		let mut inner = self.inner.write().unwrap();
60		let id = TableVirtualId(inner.next_id);
61		inner.next_id += 1;
62		id
63	}
64
65	/// Register a factory for a user virtual table.
66	///
67	/// The definition should already be registered in `MaterializedCatalog`.
68	pub fn register(&self, namespace: NamespaceId, name: String, factory: Arc<dyn VirtualTableFactory>) {
69		let mut inner = self.inner.write().unwrap();
70		let id = factory.definition().id;
71		inner.factories.insert((namespace, name), factory.clone());
72		inner.factories_by_id.insert(id, factory);
73	}
74
75	/// Unregister a user virtual table.
76	pub fn unregister(&self, namespace: NamespaceId, name: &str) -> Option<Arc<dyn VirtualTableFactory>> {
77		let mut inner = self.inner.write().unwrap();
78		if let Some(factory) = inner.factories.remove(&(namespace, name.to_string())) {
79			let id = factory.definition().id;
80			inner.factories_by_id.remove(&id);
81			Some(factory)
82		} else {
83			None
84		}
85	}
86
87	/// Find a factory by namespace and name.
88	pub fn find_by_name(&self, namespace: NamespaceId, name: &str) -> Option<Arc<dyn VirtualTableFactory>> {
89		let inner = self.inner.read().unwrap();
90		inner.factories.get(&(namespace, name.to_string())).cloned()
91	}
92
93	/// Find a factory by ID.
94	pub fn find_by_id(&self, id: TableVirtualId) -> Option<Arc<dyn VirtualTableFactory>> {
95		let inner = self.inner.read().unwrap();
96		inner.factories_by_id.get(&id).cloned()
97	}
98
99	/// List all registered factories.
100	pub fn list_all(&self) -> Vec<Arc<dyn VirtualTableFactory>> {
101		let inner = self.inner.read().unwrap();
102		inner.factories.values().cloned().collect()
103	}
104}
105
106/// Factory implementation for `TableVirtualUser` types.
107pub struct SimpleVirtualTableFactory<T: TableVirtualUser + Clone> {
108	template: T,
109	definition: Arc<TableVirtualDef>,
110}
111
112impl<T: TableVirtualUser + Clone> SimpleVirtualTableFactory<T> {
113	/// Create a new factory for the given user table.
114	pub fn new(template: T, definition: Arc<TableVirtualDef>) -> Self {
115		Self {
116			template,
117			definition,
118		}
119	}
120}
121
122impl<T: TableVirtualUser + Clone> VirtualTableFactory for SimpleVirtualTableFactory<T> {
123	fn create_boxed(&self) -> Box<dyn TableVirtual + Send + Sync> {
124		Box::new(TableVirtualUserAdapter::new(self.template.clone(), self.definition.clone()))
125	}
126
127	fn definition(&self) -> Arc<TableVirtualDef> {
128		self.definition.clone()
129	}
130}
131
132/// Factory implementation for `TableVirtualUserIterator` types.
133///
134/// This factory creates fresh iterator instances for each query.
135pub struct IteratorVirtualTableFactory<F>
136where
137	F: Fn() -> Box<dyn TableVirtualUserIterator> + Send + Sync + 'static,
138{
139	creator: F,
140	definition: Arc<TableVirtualDef>,
141}
142
143impl<F> IteratorVirtualTableFactory<F>
144where
145	F: Fn() -> Box<dyn TableVirtualUserIterator> + Send + Sync + 'static,
146{
147	/// Create a new factory with the given creator function.
148	pub fn new(creator: F, definition: Arc<TableVirtualDef>) -> Self {
149		Self {
150			creator,
151			definition,
152		}
153	}
154}
155
156impl<F> VirtualTableFactory for IteratorVirtualTableFactory<F>
157where
158	F: Fn() -> Box<dyn TableVirtualUserIterator> + Send + Sync + 'static,
159{
160	fn create_boxed(&self) -> Box<dyn TableVirtual + Send + Sync> {
161		let iter = (self.creator)();
162		Box::new(IteratorAdapter {
163			inner: iter,
164			definition: self.definition.clone(),
165			initialized: false,
166			batch_size: 1000,
167		})
168	}
169
170	fn definition(&self) -> Arc<TableVirtualDef> {
171		self.definition.clone()
172	}
173}
174
175/// Internal adapter for boxed iterators.
176struct IteratorAdapter {
177	inner: Box<dyn TableVirtualUserIterator>,
178	definition: Arc<TableVirtualDef>,
179	initialized: bool,
180	batch_size: usize,
181}
182
183#[async_trait]
184impl TableVirtual for IteratorAdapter {
185	async fn initialize<'a>(
186		&mut self,
187		_txn: &mut StandardTransaction<'a>,
188		ctx: super::TableVirtualContext,
189	) -> crate::Result<()> {
190		use super::user::TableVirtualUserPushdownContext;
191
192		let user_ctx = match ctx {
193			super::TableVirtualContext::Basic {
194				..
195			} => None,
196			super::TableVirtualContext::PushDown {
197				limit,
198				..
199			} => Some(TableVirtualUserPushdownContext {
200				limit,
201			}),
202		};
203
204		self.inner.initialize(user_ctx.as_ref()).await?;
205		self.initialized = true;
206		Ok(())
207	}
208
209	async fn next<'a>(
210		&mut self,
211		_txn: &mut StandardTransaction<'a>,
212	) -> crate::Result<Option<crate::execute::Batch>> {
213		use reifydb_core::value::column::Columns;
214
215		if !self.initialized {
216			return Ok(None);
217		}
218
219		let user_columns = self.inner.columns();
220		let user_rows = self.inner.next_batch(self.batch_size).await?;
221
222		match user_rows {
223			None => Ok(None),
224			Some(rows) if rows.is_empty() => Ok(None),
225			Some(rows) => {
226				let columns = super::adapter::convert_rows_to_columns(&user_columns, rows);
227				Ok(Some(crate::execute::Batch {
228					columns: Columns::new(columns),
229				}))
230			}
231		}
232	}
233
234	fn definition(&self) -> &TableVirtualDef {
235		&self.definition
236	}
237}
238
239// Make IteratorAdapter Send + Sync
240unsafe impl Send for IteratorAdapter {}
241unsafe impl Sync for IteratorAdapter {}