use std::collections::HashMap;
use std::sync::Arc;
use crate::storage::query::unified::UnifiedRecord;
pub mod csv;
pub use csv::CsvForeignWrapper;
#[derive(Debug, Clone, Default)]
pub struct FdwOptions {
pub values: HashMap<String, String>,
}
impl FdwOptions {
pub fn new() -> Self {
Self::default()
}
pub fn with(mut self, key: &str, value: &str) -> Self {
self.values.insert(key.to_string(), value.to_string());
self
}
pub fn get(&self, key: &str) -> Option<&str> {
self.values.get(key).map(|s| s.as_str())
}
pub fn require(&self, key: &str) -> Result<&str, FdwError> {
self.get(key)
.ok_or_else(|| FdwError::MissingOption(key.to_string()))
}
}
#[derive(Debug)]
pub enum FdwError {
MissingOption(String),
UnknownWrapper(String),
NotFound(String),
Io(String),
Custom(String),
}
impl std::fmt::Display for FdwError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
FdwError::MissingOption(k) => write!(f, "FDW option '{k}' is required"),
FdwError::UnknownWrapper(k) => write!(f, "FDW wrapper '{k}' is not registered"),
FdwError::NotFound(n) => write!(f, "FDW object '{n}' not found"),
FdwError::Io(m) => write!(f, "FDW I/O error: {m}"),
FdwError::Custom(m) => write!(f, "FDW error: {m}"),
}
}
}
impl std::error::Error for FdwError {}
pub trait WrapperState: Send + Sync {
fn as_any(&self) -> &dyn std::any::Any;
}
pub trait ForeignDataWrapper: Send + Sync {
fn kind(&self) -> &'static str;
fn build_server_state(
&self,
_options: &FdwOptions,
) -> Result<Option<Arc<dyn WrapperState>>, FdwError> {
Ok(None)
}
fn scan(
&self,
server_state: Option<&Arc<dyn WrapperState>>,
table_options: &FdwOptions,
) -> Result<Vec<UnifiedRecord>, FdwError>;
fn supports_pushdown(&self) -> bool {
false
}
fn estimated_row_count(
&self,
_server_state: Option<&Arc<dyn WrapperState>>,
_table_options: &FdwOptions,
) -> Option<usize> {
None
}
}
#[derive(Clone)]
pub struct ForeignServer {
pub name: String,
pub wrapper_kind: String,
pub options: FdwOptions,
pub wrapper: Arc<dyn ForeignDataWrapper>,
pub wrapper_state: Option<Arc<dyn WrapperState>>,
}
#[derive(Clone)]
pub struct ForeignTable {
pub name: String,
pub server_name: String,
pub columns: Vec<ForeignColumn>,
pub options: FdwOptions,
}
#[derive(Debug, Clone)]
pub struct ForeignColumn {
pub name: String,
pub data_type: String,
pub not_null: bool,
}
pub struct ForeignTableRegistry {
wrappers: parking_lot::RwLock<HashMap<String, Arc<dyn ForeignDataWrapper>>>,
servers: parking_lot::RwLock<HashMap<String, ForeignServer>>,
tables: parking_lot::RwLock<HashMap<String, ForeignTable>>,
}
impl ForeignTableRegistry {
pub fn with_builtins() -> Self {
let reg = Self {
wrappers: parking_lot::RwLock::new(HashMap::new()),
servers: parking_lot::RwLock::new(HashMap::new()),
tables: parking_lot::RwLock::new(HashMap::new()),
};
reg.register_wrapper(Arc::new(CsvForeignWrapper));
reg
}
pub fn register_wrapper(&self, wrapper: Arc<dyn ForeignDataWrapper>) {
self.wrappers
.write()
.insert(wrapper.kind().to_string(), wrapper);
}
pub fn create_server(
&self,
name: &str,
wrapper_kind: &str,
options: FdwOptions,
) -> Result<(), FdwError> {
let wrapper = self
.wrappers
.read()
.get(wrapper_kind)
.cloned()
.ok_or_else(|| FdwError::UnknownWrapper(wrapper_kind.to_string()))?;
let wrapper_state = wrapper.build_server_state(&options)?;
let server = ForeignServer {
name: name.to_string(),
wrapper_kind: wrapper_kind.to_string(),
options,
wrapper,
wrapper_state,
};
self.servers.write().insert(name.to_string(), server);
Ok(())
}
pub fn drop_server(&self, name: &str) -> bool {
let mut tables = self.tables.write();
tables.retain(|_, t| t.server_name != name);
drop(tables);
self.servers.write().remove(name).is_some()
}
pub fn create_foreign_table(&self, table: ForeignTable) -> Result<(), FdwError> {
if !self.servers.read().contains_key(&table.server_name) {
return Err(FdwError::NotFound(format!(
"server '{}'",
table.server_name
)));
}
self.tables.write().insert(table.name.clone(), table);
Ok(())
}
pub fn drop_foreign_table(&self, name: &str) -> bool {
self.tables.write().remove(name).is_some()
}
pub fn is_foreign_table(&self, name: &str) -> bool {
self.tables.read().contains_key(name)
}
pub fn foreign_table(&self, name: &str) -> Option<ForeignTable> {
self.tables.read().get(name).cloned()
}
pub fn server(&self, name: &str) -> Option<ForeignServer> {
self.servers.read().get(name).cloned()
}
pub fn scan(&self, table_name: &str) -> Result<Vec<UnifiedRecord>, FdwError> {
let table = self
.foreign_table(table_name)
.ok_or_else(|| FdwError::NotFound(format!("foreign table '{table_name}'")))?;
let server = self
.server(&table.server_name)
.ok_or_else(|| FdwError::NotFound(format!("server '{}'", table.server_name)))?;
let mut merged = server.options.clone();
for (k, v) in &table.options.values {
merged.values.insert(k.clone(), v.clone());
}
server.wrapper.scan(server.wrapper_state.as_ref(), &merged)
}
pub fn list_servers(&self) -> Vec<String> {
self.servers.read().keys().cloned().collect()
}
pub fn list_foreign_tables(&self) -> Vec<String> {
self.tables.read().keys().cloned().collect()
}
}
impl Default for ForeignTableRegistry {
fn default() -> Self {
Self::with_builtins()
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn builtins_register_csv() {
let reg = ForeignTableRegistry::with_builtins();
reg.create_server("s1", "csv", FdwOptions::new().with("base_path", "/tmp"))
.expect("csv server create");
assert!(reg.server("s1").is_some());
}
#[test]
fn unknown_wrapper_rejected() {
let reg = ForeignTableRegistry::with_builtins();
let err = reg
.create_server("s1", "imaginary", FdwOptions::new())
.unwrap_err();
assert!(matches!(err, FdwError::UnknownWrapper(_)));
}
#[test]
fn foreign_table_needs_existing_server() {
let reg = ForeignTableRegistry::with_builtins();
let table = ForeignTable {
name: "t1".to_string(),
server_name: "nonexistent".to_string(),
columns: Vec::new(),
options: FdwOptions::new(),
};
let err = reg.create_foreign_table(table).unwrap_err();
assert!(matches!(err, FdwError::NotFound(_)));
}
#[test]
fn drop_server_cascades_tables() {
let reg = ForeignTableRegistry::with_builtins();
reg.create_server("s1", "csv", FdwOptions::new())
.expect("server create");
reg.create_foreign_table(ForeignTable {
name: "t1".to_string(),
server_name: "s1".to_string(),
columns: Vec::new(),
options: FdwOptions::new(),
})
.expect("table create");
assert!(reg.is_foreign_table("t1"));
assert!(reg.drop_server("s1"));
assert!(!reg.is_foreign_table("t1"));
}
}