reddb_server/storage/fdw/
mod.rs1use std::collections::HashMap;
34use std::sync::Arc;
35
36use crate::storage::query::unified::UnifiedRecord;
37
38pub mod csv;
39
40pub use csv::CsvForeignWrapper;
41
42#[derive(Debug, Clone, Default)]
46pub struct FdwOptions {
47 pub values: HashMap<String, String>,
48}
49
50impl FdwOptions {
51 pub fn new() -> Self {
52 Self::default()
53 }
54
55 pub fn with(mut self, key: &str, value: &str) -> Self {
56 self.values.insert(key.to_string(), value.to_string());
57 self
58 }
59
60 pub fn get(&self, key: &str) -> Option<&str> {
61 self.values.get(key).map(|s| s.as_str())
62 }
63
64 pub fn require(&self, key: &str) -> Result<&str, FdwError> {
68 self.get(key)
69 .ok_or_else(|| FdwError::MissingOption(key.to_string()))
70 }
71}
72
73#[derive(Debug)]
76pub enum FdwError {
77 MissingOption(String),
79 UnknownWrapper(String),
81 NotFound(String),
83 Io(String),
85 Custom(String),
87}
88
89impl std::fmt::Display for FdwError {
90 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
91 match self {
92 FdwError::MissingOption(k) => write!(f, "FDW option '{k}' is required"),
93 FdwError::UnknownWrapper(k) => write!(f, "FDW wrapper '{k}' is not registered"),
94 FdwError::NotFound(n) => write!(f, "FDW object '{n}' not found"),
95 FdwError::Io(m) => write!(f, "FDW I/O error: {m}"),
96 FdwError::Custom(m) => write!(f, "FDW error: {m}"),
97 }
98 }
99}
100
101impl std::error::Error for FdwError {}
102
103pub trait WrapperState: Send + Sync {
107 fn as_any(&self) -> &dyn std::any::Any;
108}
109
110pub trait ForeignDataWrapper: Send + Sync {
117 fn kind(&self) -> &'static str;
120
121 fn build_server_state(
125 &self,
126 _options: &FdwOptions,
127 ) -> Result<Option<Arc<dyn WrapperState>>, FdwError> {
128 Ok(None)
129 }
130
131 fn scan(
136 &self,
137 server_state: Option<&Arc<dyn WrapperState>>,
138 table_options: &FdwOptions,
139 ) -> Result<Vec<UnifiedRecord>, FdwError>;
140
141 fn supports_pushdown(&self) -> bool {
145 false
146 }
147
148 fn estimated_row_count(
152 &self,
153 _server_state: Option<&Arc<dyn WrapperState>>,
154 _table_options: &FdwOptions,
155 ) -> Option<usize> {
156 None
157 }
158}
159
160#[derive(Clone)]
162pub struct ForeignServer {
163 pub name: String,
164 pub wrapper_kind: String,
165 pub options: FdwOptions,
166 pub wrapper: Arc<dyn ForeignDataWrapper>,
167 pub wrapper_state: Option<Arc<dyn WrapperState>>,
168}
169
170#[derive(Clone)]
172pub struct ForeignTable {
173 pub name: String,
174 pub server_name: String,
175 pub columns: Vec<ForeignColumn>,
176 pub options: FdwOptions,
177}
178
179#[derive(Debug, Clone)]
180pub struct ForeignColumn {
181 pub name: String,
182 pub data_type: String,
184 pub not_null: bool,
185}
186
187pub struct ForeignTableRegistry {
190 wrappers: parking_lot::RwLock<HashMap<String, Arc<dyn ForeignDataWrapper>>>,
191 servers: parking_lot::RwLock<HashMap<String, ForeignServer>>,
192 tables: parking_lot::RwLock<HashMap<String, ForeignTable>>,
193}
194
195impl ForeignTableRegistry {
196 pub fn with_builtins() -> Self {
198 let reg = Self {
199 wrappers: parking_lot::RwLock::new(HashMap::new()),
200 servers: parking_lot::RwLock::new(HashMap::new()),
201 tables: parking_lot::RwLock::new(HashMap::new()),
202 };
203 reg.register_wrapper(Arc::new(CsvForeignWrapper));
207 reg
208 }
209
210 pub fn register_wrapper(&self, wrapper: Arc<dyn ForeignDataWrapper>) {
211 self.wrappers
212 .write()
213 .insert(wrapper.kind().to_string(), wrapper);
214 }
215
216 pub fn create_server(
217 &self,
218 name: &str,
219 wrapper_kind: &str,
220 options: FdwOptions,
221 ) -> Result<(), FdwError> {
222 let wrapper = self
223 .wrappers
224 .read()
225 .get(wrapper_kind)
226 .cloned()
227 .ok_or_else(|| FdwError::UnknownWrapper(wrapper_kind.to_string()))?;
228
229 let wrapper_state = wrapper.build_server_state(&options)?;
230 let server = ForeignServer {
231 name: name.to_string(),
232 wrapper_kind: wrapper_kind.to_string(),
233 options,
234 wrapper,
235 wrapper_state,
236 };
237 self.servers.write().insert(name.to_string(), server);
238 Ok(())
239 }
240
241 pub fn drop_server(&self, name: &str) -> bool {
242 let mut tables = self.tables.write();
244 tables.retain(|_, t| t.server_name != name);
245 drop(tables);
246 self.servers.write().remove(name).is_some()
247 }
248
249 pub fn create_foreign_table(&self, table: ForeignTable) -> Result<(), FdwError> {
250 if !self.servers.read().contains_key(&table.server_name) {
252 return Err(FdwError::NotFound(format!(
253 "server '{}'",
254 table.server_name
255 )));
256 }
257 self.tables.write().insert(table.name.clone(), table);
258 Ok(())
259 }
260
261 pub fn drop_foreign_table(&self, name: &str) -> bool {
262 self.tables.write().remove(name).is_some()
263 }
264
265 pub fn is_foreign_table(&self, name: &str) -> bool {
266 self.tables.read().contains_key(name)
267 }
268
269 pub fn foreign_table(&self, name: &str) -> Option<ForeignTable> {
270 self.tables.read().get(name).cloned()
271 }
272
273 pub fn server(&self, name: &str) -> Option<ForeignServer> {
274 self.servers.read().get(name).cloned()
275 }
276
277 pub fn scan(&self, table_name: &str) -> Result<Vec<UnifiedRecord>, FdwError> {
280 let table = self
281 .foreign_table(table_name)
282 .ok_or_else(|| FdwError::NotFound(format!("foreign table '{table_name}'")))?;
283 let server = self
284 .server(&table.server_name)
285 .ok_or_else(|| FdwError::NotFound(format!("server '{}'", table.server_name)))?;
286
287 let mut merged = server.options.clone();
289 for (k, v) in &table.options.values {
290 merged.values.insert(k.clone(), v.clone());
291 }
292
293 server.wrapper.scan(server.wrapper_state.as_ref(), &merged)
294 }
295
296 pub fn list_servers(&self) -> Vec<String> {
297 self.servers.read().keys().cloned().collect()
298 }
299
300 pub fn list_foreign_tables(&self) -> Vec<String> {
301 self.tables.read().keys().cloned().collect()
302 }
303}
304
305impl Default for ForeignTableRegistry {
306 fn default() -> Self {
307 Self::with_builtins()
308 }
309}
310
311#[cfg(test)]
312mod tests {
313 use super::*;
314
315 #[test]
316 fn builtins_register_csv() {
317 let reg = ForeignTableRegistry::with_builtins();
318 reg.create_server("s1", "csv", FdwOptions::new().with("base_path", "/tmp"))
319 .expect("csv server create");
320 assert!(reg.server("s1").is_some());
321 }
322
323 #[test]
324 fn unknown_wrapper_rejected() {
325 let reg = ForeignTableRegistry::with_builtins();
326 let err = reg
327 .create_server("s1", "imaginary", FdwOptions::new())
328 .unwrap_err();
329 assert!(matches!(err, FdwError::UnknownWrapper(_)));
330 }
331
332 #[test]
333 fn foreign_table_needs_existing_server() {
334 let reg = ForeignTableRegistry::with_builtins();
335 let table = ForeignTable {
336 name: "t1".to_string(),
337 server_name: "nonexistent".to_string(),
338 columns: Vec::new(),
339 options: FdwOptions::new(),
340 };
341 let err = reg.create_foreign_table(table).unwrap_err();
342 assert!(matches!(err, FdwError::NotFound(_)));
343 }
344
345 #[test]
346 fn drop_server_cascades_tables() {
347 let reg = ForeignTableRegistry::with_builtins();
348 reg.create_server("s1", "csv", FdwOptions::new())
349 .expect("server create");
350 reg.create_foreign_table(ForeignTable {
351 name: "t1".to_string(),
352 server_name: "s1".to_string(),
353 columns: Vec::new(),
354 options: FdwOptions::new(),
355 })
356 .expect("table create");
357 assert!(reg.is_foreign_table("t1"));
358 assert!(reg.drop_server("s1"));
359 assert!(!reg.is_foreign_table("t1"));
360 }
361}