reifydb_engine/table_virtual/
registry.rs1use std::{
7 collections::HashMap,
8 sync::{Arc, RwLock},
9};
10
11use reifydb_core::interface::{NamespaceId, TableVirtualDef, TableVirtualId};
12
13use super::{
14 VirtualTableFactory,
15 adapter::TableVirtualUserAdapter,
16 user::{TableVirtualUser, TableVirtualUserIterator},
17};
18
19#[derive(Clone)]
24pub struct TableVirtualUserRegistry {
25 inner: Arc<RwLock<TableVirtualUserRegistryInner>>,
26}
27
28struct TableVirtualUserRegistryInner {
29 factories: HashMap<(NamespaceId, String), Arc<dyn VirtualTableFactory>>,
31 factories_by_id: HashMap<TableVirtualId, Arc<dyn VirtualTableFactory>>,
33 next_id: u64,
35}
36
37impl Default for TableVirtualUserRegistry {
38 fn default() -> Self {
39 Self::new()
40 }
41}
42
43impl TableVirtualUserRegistry {
44 pub fn new() -> Self {
46 Self {
47 inner: Arc::new(RwLock::new(TableVirtualUserRegistryInner {
48 factories: HashMap::new(),
49 factories_by_id: HashMap::new(),
50 next_id: 1000,
51 })),
52 }
53 }
54
55 pub fn allocate_id(&self) -> TableVirtualId {
57 let mut inner = self.inner.write().unwrap();
58 let id = TableVirtualId(inner.next_id);
59 inner.next_id += 1;
60 id
61 }
62
63 pub fn register(&self, namespace: NamespaceId, name: String, factory: Arc<dyn VirtualTableFactory>) {
67 let mut inner = self.inner.write().unwrap();
68 let id = factory.definition().id;
69 inner.factories.insert((namespace, name), factory.clone());
70 inner.factories_by_id.insert(id, factory);
71 }
72
73 pub fn unregister(&self, namespace: NamespaceId, name: &str) -> Option<Arc<dyn VirtualTableFactory>> {
75 let mut inner = self.inner.write().unwrap();
76 if let Some(factory) = inner.factories.remove(&(namespace, name.to_string())) {
77 let id = factory.definition().id;
78 inner.factories_by_id.remove(&id);
79 Some(factory)
80 } else {
81 None
82 }
83 }
84
85 pub fn find_by_name(&self, namespace: NamespaceId, name: &str) -> Option<Arc<dyn VirtualTableFactory>> {
87 let inner = self.inner.read().unwrap();
88 inner.factories.get(&(namespace, name.to_string())).cloned()
89 }
90
91 pub fn find_by_id(&self, id: TableVirtualId) -> Option<Arc<dyn VirtualTableFactory>> {
93 let inner = self.inner.read().unwrap();
94 inner.factories_by_id.get(&id).cloned()
95 }
96
97 pub fn list_all(&self) -> Vec<Arc<dyn VirtualTableFactory>> {
99 let inner = self.inner.read().unwrap();
100 inner.factories.values().cloned().collect()
101 }
102}
103
104pub struct SimpleVirtualTableFactory<T: TableVirtualUser + Clone> {
106 template: T,
107 definition: Arc<TableVirtualDef>,
108}
109
110impl<T: TableVirtualUser + Clone> SimpleVirtualTableFactory<T> {
111 pub fn new(template: T, definition: Arc<TableVirtualDef>) -> Self {
113 Self {
114 template,
115 definition,
116 }
117 }
118}
119
120impl<T: TableVirtualUser + Clone> VirtualTableFactory for SimpleVirtualTableFactory<T> {
121 fn create_boxed(&self) -> Box<dyn super::TableVirtual<'static> + Send + Sync> {
122 Box::new(TableVirtualUserAdapter::new(self.template.clone(), self.definition.clone()))
123 }
124
125 fn definition(&self) -> Arc<TableVirtualDef> {
126 self.definition.clone()
127 }
128}
129
130pub struct IteratorVirtualTableFactory<F>
134where
135 F: Fn() -> Box<dyn TableVirtualUserIterator> + Send + Sync + 'static,
136{
137 creator: F,
138 definition: Arc<TableVirtualDef>,
139}
140
141impl<F> IteratorVirtualTableFactory<F>
142where
143 F: Fn() -> Box<dyn TableVirtualUserIterator> + Send + Sync + 'static,
144{
145 pub fn new(creator: F, definition: Arc<TableVirtualDef>) -> Self {
147 Self {
148 creator,
149 definition,
150 }
151 }
152}
153
154impl<F> VirtualTableFactory for IteratorVirtualTableFactory<F>
155where
156 F: Fn() -> Box<dyn TableVirtualUserIterator> + Send + Sync + 'static,
157{
158 fn create_boxed(&self) -> Box<dyn super::TableVirtual<'static> + Send + Sync> {
159 let iter = (self.creator)();
160 Box::new(IteratorAdapter {
161 inner: iter,
162 definition: self.definition.clone(),
163 initialized: false,
164 batch_size: 1000,
165 })
166 }
167
168 fn definition(&self) -> Arc<TableVirtualDef> {
169 self.definition.clone()
170 }
171}
172
173struct IteratorAdapter {
175 inner: Box<dyn TableVirtualUserIterator>,
176 definition: Arc<TableVirtualDef>,
177 initialized: bool,
178 batch_size: usize,
179}
180
181impl<'a> super::TableVirtual<'a> for IteratorAdapter {
182 fn initialize(
183 &mut self,
184 _txn: &mut crate::transaction::StandardTransaction<'a>,
185 ctx: super::TableVirtualContext<'a>,
186 ) -> crate::Result<()> {
187 use super::user::TableVirtualUserPushdownContext;
188
189 let user_ctx = match ctx {
190 super::TableVirtualContext::Basic {
191 ..
192 } => None,
193 super::TableVirtualContext::PushDown {
194 limit,
195 ..
196 } => Some(TableVirtualUserPushdownContext {
197 limit,
198 }),
199 };
200
201 self.inner.initialize(user_ctx.as_ref())?;
202 self.initialized = true;
203 Ok(())
204 }
205
206 fn next(
207 &mut self,
208 _txn: &mut crate::transaction::StandardTransaction<'a>,
209 ) -> crate::Result<Option<crate::execute::Batch<'a>>> {
210 use reifydb_core::value::column::Columns;
211
212 if !self.initialized {
213 return Ok(None);
214 }
215
216 let user_columns = self.inner.columns();
217 let user_rows = self.inner.next_batch(self.batch_size)?;
218
219 match user_rows {
220 None => Ok(None),
221 Some(rows) if rows.is_empty() => Ok(None),
222 Some(rows) => {
223 let columns = super::adapter::convert_rows_to_columns(&user_columns, rows);
224 Ok(Some(crate::execute::Batch {
225 columns: Columns::new(columns),
226 }))
227 }
228 }
229 }
230
231 fn definition(&self) -> &TableVirtualDef {
232 &self.definition
233 }
234}
235
236unsafe impl Send for IteratorAdapter {}
238unsafe impl Sync for IteratorAdapter {}