reifydb_engine/table_virtual/
registry.rs1use std::{
7 collections::HashMap,
8 sync::{Arc, RwLock},
9};
10
11use async_trait::async_trait;
12use reifydb_core::interface::{NamespaceId, TableVirtualDef, TableVirtualId};
13
14use super::{
15 TableVirtual, VirtualTableFactory,
16 adapter::TableVirtualUserAdapter,
17 user::{TableVirtualUser, TableVirtualUserIterator},
18};
19use crate::transaction::StandardTransaction;
20
21#[derive(Clone)]
26pub struct TableVirtualUserRegistry {
27 inner: Arc<RwLock<TableVirtualUserRegistryInner>>,
28}
29
30struct TableVirtualUserRegistryInner {
31 factories: HashMap<(NamespaceId, String), Arc<dyn VirtualTableFactory>>,
33 factories_by_id: HashMap<TableVirtualId, Arc<dyn VirtualTableFactory>>,
35 next_id: u64,
37}
38
39impl Default for TableVirtualUserRegistry {
40 fn default() -> Self {
41 Self::new()
42 }
43}
44
45impl TableVirtualUserRegistry {
46 pub fn new() -> Self {
48 Self {
49 inner: Arc::new(RwLock::new(TableVirtualUserRegistryInner {
50 factories: HashMap::new(),
51 factories_by_id: HashMap::new(),
52 next_id: 1000,
53 })),
54 }
55 }
56
57 pub fn allocate_id(&self) -> TableVirtualId {
59 let mut inner = self.inner.write().unwrap();
60 let id = TableVirtualId(inner.next_id);
61 inner.next_id += 1;
62 id
63 }
64
65 pub fn register(&self, namespace: NamespaceId, name: String, factory: Arc<dyn VirtualTableFactory>) {
69 let mut inner = self.inner.write().unwrap();
70 let id = factory.definition().id;
71 inner.factories.insert((namespace, name), factory.clone());
72 inner.factories_by_id.insert(id, factory);
73 }
74
75 pub fn unregister(&self, namespace: NamespaceId, name: &str) -> Option<Arc<dyn VirtualTableFactory>> {
77 let mut inner = self.inner.write().unwrap();
78 if let Some(factory) = inner.factories.remove(&(namespace, name.to_string())) {
79 let id = factory.definition().id;
80 inner.factories_by_id.remove(&id);
81 Some(factory)
82 } else {
83 None
84 }
85 }
86
87 pub fn find_by_name(&self, namespace: NamespaceId, name: &str) -> Option<Arc<dyn VirtualTableFactory>> {
89 let inner = self.inner.read().unwrap();
90 inner.factories.get(&(namespace, name.to_string())).cloned()
91 }
92
93 pub fn find_by_id(&self, id: TableVirtualId) -> Option<Arc<dyn VirtualTableFactory>> {
95 let inner = self.inner.read().unwrap();
96 inner.factories_by_id.get(&id).cloned()
97 }
98
99 pub fn list_all(&self) -> Vec<Arc<dyn VirtualTableFactory>> {
101 let inner = self.inner.read().unwrap();
102 inner.factories.values().cloned().collect()
103 }
104}
105
106pub struct SimpleVirtualTableFactory<T: TableVirtualUser + Clone> {
108 template: T,
109 definition: Arc<TableVirtualDef>,
110}
111
112impl<T: TableVirtualUser + Clone> SimpleVirtualTableFactory<T> {
113 pub fn new(template: T, definition: Arc<TableVirtualDef>) -> Self {
115 Self {
116 template,
117 definition,
118 }
119 }
120}
121
122impl<T: TableVirtualUser + Clone> VirtualTableFactory for SimpleVirtualTableFactory<T> {
123 fn create_boxed(&self) -> Box<dyn TableVirtual + Send + Sync> {
124 Box::new(TableVirtualUserAdapter::new(self.template.clone(), self.definition.clone()))
125 }
126
127 fn definition(&self) -> Arc<TableVirtualDef> {
128 self.definition.clone()
129 }
130}
131
132pub struct IteratorVirtualTableFactory<F>
136where
137 F: Fn() -> Box<dyn TableVirtualUserIterator> + Send + Sync + 'static,
138{
139 creator: F,
140 definition: Arc<TableVirtualDef>,
141}
142
143impl<F> IteratorVirtualTableFactory<F>
144where
145 F: Fn() -> Box<dyn TableVirtualUserIterator> + Send + Sync + 'static,
146{
147 pub fn new(creator: F, definition: Arc<TableVirtualDef>) -> Self {
149 Self {
150 creator,
151 definition,
152 }
153 }
154}
155
156impl<F> VirtualTableFactory for IteratorVirtualTableFactory<F>
157where
158 F: Fn() -> Box<dyn TableVirtualUserIterator> + Send + Sync + 'static,
159{
160 fn create_boxed(&self) -> Box<dyn TableVirtual + Send + Sync> {
161 let iter = (self.creator)();
162 Box::new(IteratorAdapter {
163 inner: iter,
164 definition: self.definition.clone(),
165 initialized: false,
166 batch_size: 1000,
167 })
168 }
169
170 fn definition(&self) -> Arc<TableVirtualDef> {
171 self.definition.clone()
172 }
173}
174
175struct IteratorAdapter {
177 inner: Box<dyn TableVirtualUserIterator>,
178 definition: Arc<TableVirtualDef>,
179 initialized: bool,
180 batch_size: usize,
181}
182
183#[async_trait]
184impl TableVirtual for IteratorAdapter {
185 async fn initialize<'a>(
186 &mut self,
187 _txn: &mut StandardTransaction<'a>,
188 ctx: super::TableVirtualContext,
189 ) -> crate::Result<()> {
190 use super::user::TableVirtualUserPushdownContext;
191
192 let user_ctx = match ctx {
193 super::TableVirtualContext::Basic {
194 ..
195 } => None,
196 super::TableVirtualContext::PushDown {
197 limit,
198 ..
199 } => Some(TableVirtualUserPushdownContext {
200 limit,
201 }),
202 };
203
204 self.inner.initialize(user_ctx.as_ref()).await?;
205 self.initialized = true;
206 Ok(())
207 }
208
209 async fn next<'a>(
210 &mut self,
211 _txn: &mut StandardTransaction<'a>,
212 ) -> crate::Result<Option<crate::execute::Batch>> {
213 use reifydb_core::value::column::Columns;
214
215 if !self.initialized {
216 return Ok(None);
217 }
218
219 let user_columns = self.inner.columns();
220 let user_rows = self.inner.next_batch(self.batch_size).await?;
221
222 match user_rows {
223 None => Ok(None),
224 Some(rows) if rows.is_empty() => Ok(None),
225 Some(rows) => {
226 let columns = super::adapter::convert_rows_to_columns(&user_columns, rows);
227 Ok(Some(crate::execute::Batch {
228 columns: Columns::new(columns),
229 }))
230 }
231 }
232 }
233
234 fn definition(&self) -> &TableVirtualDef {
235 &self.definition
236 }
237}
238
239unsafe impl Send for IteratorAdapter {}
241unsafe impl Sync for IteratorAdapter {}