Skip to main content

reddb_server/storage/fdw/
mod.rs

1//! Foreign Data Wrapper framework (Phase 3.2 PG parity).
2//!
3//! Allows RedDB to expose external data sources (CSV files, remote
4//! PostgreSQL / MySQL databases, S3 Parquet, etc.) as "foreign tables"
5//! that queries can reference in SELECT / JOIN / subquery positions
6//! alongside native RedDB collections.
7//!
8//! # Architecture
9//!
//! 1. `ForeignDataWrapper` — a trait implemented per data-source kind.
//!    Each wrapper exposes `scan(server_state, table_options) ->
//!    Result<Vec<UnifiedRecord>, FdwError>`. Phase 3.2 wrappers are
//!    read-only: the trait defines no `insert` / `delete` methods.
13//! 2. `ForeignServer` — a named instance of a wrapper, configured with
14//!    options (connection string, base path, credentials). Created via
15//!    `CREATE SERVER name FOREIGN DATA WRAPPER kind OPTIONS (...)`.
16//! 3. `ForeignTable` — a named logical table backed by a server. Created
17//!    via `CREATE FOREIGN TABLE name (cols) SERVER srv OPTIONS (...)`.
18//!    When a query references the table, the runtime's rewriter swaps
19//!    the reference for the wrapper's scan result.
20//! 4. `ForeignTableRegistry` — in-memory catalog of servers + tables.
21//!    Phase 3.2 keeps definitions in memory only; persistence across
22//!    restarts is a 3.2.2 follow-up that mirrors the view registry.
23//!
24//! # Wrappers shipped in Phase 3.2
25//!
26//! * `csv` — read a CSV file on local disk. Reuses the RFC-4180 parser
27//!   from `storage::import::csv`.
28//!
29//! Additional wrappers (`postgres_fdw`, `mysql_fdw`, `s3_parquet_fdw`)
30//! live behind feature flags to avoid pulling their client libraries
31//! into every build.
32
33use std::collections::HashMap;
34use std::sync::Arc;
35
36use crate::storage::query::unified::UnifiedRecord;
37
38pub mod csv;
39
40pub use csv::CsvForeignWrapper;
41
/// Generic key/value option set, mirroring PostgreSQL's generic-options
/// model. It is filled from `CREATE SERVER ... OPTIONS (key 'value', ...)`
/// and `CREATE FOREIGN TABLE ... OPTIONS (key 'value', ...)`.
#[derive(Clone, Debug, Default)]
pub struct FdwOptions {
    /// Option values keyed by option name; both sides are plain strings.
    pub values: HashMap<String, String>,
}
49
50impl FdwOptions {
51    pub fn new() -> Self {
52        Self::default()
53    }
54
55    pub fn with(mut self, key: &str, value: &str) -> Self {
56        self.values.insert(key.to_string(), value.to_string());
57        self
58    }
59
60    pub fn get(&self, key: &str) -> Option<&str> {
61        self.values.get(key).map(|s| s.as_str())
62    }
63
64    /// Fetch an option, returning a structured error when absent.
65    /// Wrappers call this at scan time so callers get a clear message
66    /// instead of a silent `None`.
67    pub fn require(&self, key: &str) -> Result<&str, FdwError> {
68        self.get(key)
69            .ok_or_else(|| FdwError::MissingOption(key.to_string()))
70    }
71}
72
/// Error type shared by every foreign data wrapper and by the registry.
///
/// Wrapper-specific failures that fit no other variant are reported as
/// `Custom`, so the runtime can surface every FDW failure the same way.
#[derive(Debug)]
pub enum FdwError {
    /// A required option was set on neither the server nor the foreign table.
    MissingOption(String),
    /// `CREATE SERVER` named a wrapper kind that has not been registered.
    UnknownWrapper(String),
    /// A referenced foreign table or server does not exist.
    NotFound(String),
    /// I/O or transport failure (file read, HTTP fetch, database call).
    Io(String),
    /// Any other wrapper-specific error.
    Custom(String),
}

impl std::fmt::Display for FdwError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            Self::MissingOption(key) => write!(f, "FDW option '{}' is required", key),
            Self::UnknownWrapper(kind) => write!(f, "FDW wrapper '{}' is not registered", kind),
            Self::NotFound(what) => write!(f, "FDW object '{}' not found", what),
            Self::Io(detail) => write!(f, "FDW I/O error: {}", detail),
            Self::Custom(detail) => write!(f, "FDW error: {}", detail),
        }
    }
}

impl std::error::Error for FdwError {}
102
/// Per-server state that a wrapper builds once, when the server is created,
/// and that is cached on `ForeignServer::wrapper_state`. Scans reuse it
/// instead of redoing setup work (e.g. re-opening file handles or
/// re-parsing certificates).
///
/// The `Send + Sync` bound allows the cached `Arc` to be shared by
/// concurrent scans.
pub trait WrapperState: Send + Sync {
    /// Expose the state as `Any` so the owning wrapper can downcast it back
    /// to its concrete type.
    fn as_any(&self) -> &dyn std::any::Any;
}
109
110/// Core trait every FDW implements.
111///
112/// Phase 3.2 is read-only — `scan` is mandatory; `insert` / `delete`
113/// are defaulted to "not supported" so wrappers can opt-in. Filter
114/// pushdown is also opt-in via `supports_pushdown`; the runtime
115/// checks that before handing the wrapper the predicate AST.
116pub trait ForeignDataWrapper: Send + Sync {
117    /// Unique identifier for the wrapper kind (e.g. "csv", "postgres").
118    /// Matches the `FOREIGN DATA WRAPPER <name>` clause in DDL.
119    fn kind(&self) -> &'static str;
120
121    /// Validate + materialise a server's options. Called once on
122    /// `CREATE SERVER` and cached on `ForeignServer`. Wrappers that
123    /// don't need per-server state return `Ok(None)`.
124    fn build_server_state(
125        &self,
126        _options: &FdwOptions,
127    ) -> Result<Option<Arc<dyn WrapperState>>, FdwError> {
128        Ok(None)
129    }
130
131    /// Stream rows from the foreign table. `table_options` merges the
132    /// server's options with the table's options (table takes priority
133    /// on conflicts). `filter` is opaque to the wrapper unless it
134    /// advertises pushdown via `supports_pushdown`.
135    fn scan(
136        &self,
137        server_state: Option<&Arc<dyn WrapperState>>,
138        table_options: &FdwOptions,
139    ) -> Result<Vec<UnifiedRecord>, FdwError>;
140
141    /// Whether the wrapper can evaluate a SQL WHERE predicate natively.
142    /// When true, the planner will hand the AST to `scan_with_filter`;
143    /// when false, the runtime applies the filter after the scan.
144    fn supports_pushdown(&self) -> bool {
145        false
146    }
147
148    /// Estimated row count — drives planner cost. `None` means the
149    /// wrapper has no cheap way to estimate and the planner falls back
150    /// to its default assumption.
151    fn estimated_row_count(
152        &self,
153        _server_state: Option<&Arc<dyn WrapperState>>,
154        _table_options: &FdwOptions,
155    ) -> Option<usize> {
156        None
157    }
158}
159
160/// Registered CREATE SERVER instance.
161#[derive(Clone)]
162pub struct ForeignServer {
163    pub name: String,
164    pub wrapper_kind: String,
165    pub options: FdwOptions,
166    pub wrapper: Arc<dyn ForeignDataWrapper>,
167    pub wrapper_state: Option<Arc<dyn WrapperState>>,
168}
169
170/// Registered CREATE FOREIGN TABLE instance.
171#[derive(Clone)]
172pub struct ForeignTable {
173    pub name: String,
174    pub server_name: String,
175    pub columns: Vec<ForeignColumn>,
176    pub options: FdwOptions,
177}
178
/// One column declared in `CREATE FOREIGN TABLE name (cols) ...`.
#[derive(Clone, Debug)]
pub struct ForeignColumn {
    /// Column name as written in the DDL.
    pub name: String,
    /// Declared SQL type, kept as the raw DDL string. NOTE(review): values
    /// are meant to be coerced to it opportunistically at scan time, but
    /// `ForeignDataWrapper::scan` does not receive the column list —
    /// confirm where that coercion happens.
    pub data_type: String,
    /// Whether the column was declared `NOT NULL`.
    pub not_null: bool,
}
186
187/// Central registry: maps wrapper kinds to implementations, server names
188/// to server records, foreign table names to their definitions.
189pub struct ForeignTableRegistry {
190    wrappers: parking_lot::RwLock<HashMap<String, Arc<dyn ForeignDataWrapper>>>,
191    servers: parking_lot::RwLock<HashMap<String, ForeignServer>>,
192    tables: parking_lot::RwLock<HashMap<String, ForeignTable>>,
193}
194
195impl ForeignTableRegistry {
196    /// Create an empty registry populated with Phase 3.2 built-in wrappers.
197    pub fn with_builtins() -> Self {
198        let reg = Self {
199            wrappers: parking_lot::RwLock::new(HashMap::new()),
200            servers: parking_lot::RwLock::new(HashMap::new()),
201            tables: parking_lot::RwLock::new(HashMap::new()),
202        };
203        // CSV is the only built-in shipped in Phase 3.2 — external
204        // wrappers (postgres_fdw, mysql_fdw, s3_parquet_fdw) live behind
205        // optional cargo features to avoid pulling client libraries in.
206        reg.register_wrapper(Arc::new(CsvForeignWrapper));
207        reg
208    }
209
210    pub fn register_wrapper(&self, wrapper: Arc<dyn ForeignDataWrapper>) {
211        self.wrappers
212            .write()
213            .insert(wrapper.kind().to_string(), wrapper);
214    }
215
216    pub fn create_server(
217        &self,
218        name: &str,
219        wrapper_kind: &str,
220        options: FdwOptions,
221    ) -> Result<(), FdwError> {
222        let wrapper = self
223            .wrappers
224            .read()
225            .get(wrapper_kind)
226            .cloned()
227            .ok_or_else(|| FdwError::UnknownWrapper(wrapper_kind.to_string()))?;
228
229        let wrapper_state = wrapper.build_server_state(&options)?;
230        let server = ForeignServer {
231            name: name.to_string(),
232            wrapper_kind: wrapper_kind.to_string(),
233            options,
234            wrapper,
235            wrapper_state,
236        };
237        self.servers.write().insert(name.to_string(), server);
238        Ok(())
239    }
240
241    pub fn drop_server(&self, name: &str) -> bool {
242        // Cascade: drop every foreign table pointing at this server.
243        let mut tables = self.tables.write();
244        tables.retain(|_, t| t.server_name != name);
245        drop(tables);
246        self.servers.write().remove(name).is_some()
247    }
248
249    pub fn create_foreign_table(&self, table: ForeignTable) -> Result<(), FdwError> {
250        // Validate server exists.
251        if !self.servers.read().contains_key(&table.server_name) {
252            return Err(FdwError::NotFound(format!(
253                "server '{}'",
254                table.server_name
255            )));
256        }
257        self.tables.write().insert(table.name.clone(), table);
258        Ok(())
259    }
260
261    pub fn drop_foreign_table(&self, name: &str) -> bool {
262        self.tables.write().remove(name).is_some()
263    }
264
265    pub fn is_foreign_table(&self, name: &str) -> bool {
266        self.tables.read().contains_key(name)
267    }
268
269    pub fn foreign_table(&self, name: &str) -> Option<ForeignTable> {
270        self.tables.read().get(name).cloned()
271    }
272
273    pub fn server(&self, name: &str) -> Option<ForeignServer> {
274        self.servers.read().get(name).cloned()
275    }
276
277    /// Scan a foreign table — called by the runtime when a SELECT
278    /// references the table by name.
279    pub fn scan(&self, table_name: &str) -> Result<Vec<UnifiedRecord>, FdwError> {
280        let table = self
281            .foreign_table(table_name)
282            .ok_or_else(|| FdwError::NotFound(format!("foreign table '{table_name}'")))?;
283        let server = self
284            .server(&table.server_name)
285            .ok_or_else(|| FdwError::NotFound(format!("server '{}'", table.server_name)))?;
286
287        // Merge server options with table options (table wins).
288        let mut merged = server.options.clone();
289        for (k, v) in &table.options.values {
290            merged.values.insert(k.clone(), v.clone());
291        }
292
293        server.wrapper.scan(server.wrapper_state.as_ref(), &merged)
294    }
295
296    pub fn list_servers(&self) -> Vec<String> {
297        self.servers.read().keys().cloned().collect()
298    }
299
300    pub fn list_foreign_tables(&self) -> Vec<String> {
301        self.tables.read().keys().cloned().collect()
302    }
303}
304
305impl Default for ForeignTableRegistry {
306    fn default() -> Self {
307        Self::with_builtins()
308    }
309}
310
#[cfg(test)]
mod tests {
    use super::*;

    /// Foreign table definition on `server` with no columns or options.
    fn table_on(name: &str, server: &str) -> ForeignTable {
        ForeignTable {
            name: name.to_string(),
            server_name: server.to_string(),
            columns: Vec::new(),
            options: FdwOptions::new(),
        }
    }

    /// Test-only wrapper that reports the options it was scanned with as a
    /// `Custom` error (sorted `k=v` pairs joined by commas), letting tests
    /// observe option merging without constructing any records.
    struct EchoOptionsWrapper;

    impl ForeignDataWrapper for EchoOptionsWrapper {
        fn kind(&self) -> &'static str {
            "echo_options"
        }

        fn scan(
            &self,
            _server_state: Option<&Arc<dyn WrapperState>>,
            table_options: &FdwOptions,
        ) -> Result<Vec<UnifiedRecord>, FdwError> {
            let mut pairs: Vec<String> = table_options
                .values
                .iter()
                .map(|(k, v)| format!("{k}={v}"))
                .collect();
            pairs.sort();
            Err(FdwError::Custom(pairs.join(",")))
        }
    }

    #[test]
    fn builtins_register_csv() {
        let reg = ForeignTableRegistry::with_builtins();
        reg.create_server("s1", "csv", FdwOptions::new().with("base_path", "/tmp"))
            .expect("csv server create");
        assert!(reg.server("s1").is_some());
    }

    #[test]
    fn unknown_wrapper_rejected() {
        let reg = ForeignTableRegistry::with_builtins();
        let err = reg
            .create_server("s1", "imaginary", FdwOptions::new())
            .unwrap_err();
        assert!(matches!(err, FdwError::UnknownWrapper(_)));
    }

    #[test]
    fn foreign_table_needs_existing_server() {
        let reg = ForeignTableRegistry::with_builtins();
        let err = reg
            .create_foreign_table(table_on("t1", "nonexistent"))
            .unwrap_err();
        assert!(matches!(err, FdwError::NotFound(_)));
    }

    #[test]
    fn drop_server_cascades_tables() {
        let reg = ForeignTableRegistry::with_builtins();
        reg.create_server("s1", "csv", FdwOptions::new())
            .expect("server create");
        reg.create_foreign_table(table_on("t1", "s1"))
            .expect("table create");
        assert!(reg.is_foreign_table("t1"));
        assert!(reg.drop_server("s1"));
        assert!(!reg.is_foreign_table("t1"));
    }

    #[test]
    fn drop_unknown_server_returns_false() {
        let reg = ForeignTableRegistry::with_builtins();
        assert!(!reg.drop_server("nope"));
    }

    #[test]
    fn drop_foreign_table_removes_only_that_table() {
        let reg = ForeignTableRegistry::with_builtins();
        reg.create_server("s1", "csv", FdwOptions::new())
            .expect("server create");
        for name in ["t1", "t2"] {
            reg.create_foreign_table(table_on(name, "s1"))
                .expect("table create");
        }
        assert!(reg.drop_foreign_table("t1"));
        assert!(!reg.drop_foreign_table("t1"));
        assert!(!reg.is_foreign_table("t1"));
        assert!(reg.is_foreign_table("t2"));
    }

    #[test]
    fn table_on_dropped_server_is_rejected() {
        let reg = ForeignTableRegistry::with_builtins();
        reg.create_server("s1", "csv", FdwOptions::new())
            .expect("server create");
        assert!(reg.drop_server("s1"));
        let err = reg.create_foreign_table(table_on("t1", "s1")).unwrap_err();
        assert!(matches!(err, FdwError::NotFound(_)));
        assert!(!reg.is_foreign_table("t1"));
    }

    #[test]
    fn scan_unknown_table_is_not_found() {
        let reg = ForeignTableRegistry::with_builtins();
        assert!(matches!(reg.scan("missing"), Err(FdwError::NotFound(_))));
    }

    #[test]
    fn scan_merges_options_with_table_priority() {
        let reg = ForeignTableRegistry::with_builtins();
        reg.register_wrapper(Arc::new(EchoOptionsWrapper));
        reg.create_server(
            "s1",
            "echo_options",
            FdwOptions::new().with("path", "server").with("delimiter", ";"),
        )
        .expect("server create");
        let mut table = table_on("t1", "s1");
        table.options = FdwOptions::new().with("path", "table");
        reg.create_foreign_table(table).expect("table create");

        match reg.scan("t1") {
            Err(FdwError::Custom(seen)) => assert_eq!(seen, "delimiter=;,path=table"),
            other => panic!("unexpected scan result: {:?}", other.err()),
        }
    }
}