use std::{collections::HashMap, sync::Arc};
use reifydb_sdk::{
config::Config,
connector::{
sink::{FFISink, FFISinkMetadata},
source::{FFISource, FFISourceMetadata},
},
error::{Result as SdkResult, SdkError},
};
/// Shared factory closure that builds a boxed [`FFISource`] from a [`Config`].
///
/// Wrapped in an `Arc` and bounded by `Send + Sync`.
type SourceFactory = Arc<dyn Fn(&Config) -> SdkResult<Box<dyn FFISource>> + Send + Sync>;
/// Shared factory closure that builds a boxed [`FFISink`] from a [`Config`].
///
/// Wrapped in an `Arc` and bounded by `Send + Sync`.
type SinkFactory = Arc<dyn Fn(&Config) -> SdkResult<Box<dyn FFISink>> + Send + Sync>;
/// Name-indexed registry of connector factories.
///
/// Sources and sinks live in separate maps, so a source and a sink may be
/// registered under the same name without colliding.
pub struct ConnectorRegistry {
/// Source factories keyed by connector name.
sources: HashMap<String, SourceFactory>,
/// Sink factories keyed by connector name.
sinks: HashMap<String, SinkFactory>,
}
impl ConnectorRegistry {
pub fn new() -> Self {
Self {
sources: HashMap::new(),
sinks: HashMap::new(),
}
}
pub fn register_source<S: FFISource + FFISourceMetadata>(&mut self) {
let name = S::NAME.to_string();
self.sources.insert(
name,
Arc::new(|config: &Config| {
let source = S::new(config)?;
Ok(Box::new(source) as Box<dyn FFISource>)
}),
);
}
pub fn register_sink<S: FFISink + FFISinkMetadata>(&mut self) {
let name = S::NAME.to_string();
self.sinks.insert(
name,
Arc::new(|config: &Config| {
let sink = S::new(config)?;
Ok(Box::new(sink) as Box<dyn FFISink>)
}),
);
}
pub fn create_source(&self, name: &str, config: &Config) -> SdkResult<Box<dyn FFISource>> {
let factory = self
.sources
.get(name)
.ok_or_else(|| SdkError::Configuration(format!("unknown source connector: {}", name)))?;
factory(config)
}
pub fn create_sink(&self, name: &str, config: &Config) -> SdkResult<Box<dyn FFISink>> {
let factory = self
.sinks
.get(name)
.ok_or_else(|| SdkError::Configuration(format!("unknown sink connector: {}", name)))?;
factory(config)
}
pub fn has_source(&self, name: &str) -> bool {
self.sources.contains_key(name)
}
pub fn has_sink(&self, name: &str) -> bool {
self.sinks.contains_key(name)
}
}
impl Default for ConnectorRegistry {
fn default() -> Self {
Self::new()
}
}
#[cfg(test)]
mod tests {
    use reifydb_sdk::{
        connector::{
            sink::SinkRecord,
            source::{SourceBatch, SourceEmitter, SourceMode},
        },
        error::Result,
        operator::column::operator::OperatorColumn,
    };

    use super::*;

    /// Stateless pull-mode source stub registered under the name "mock".
    struct MockSource;

    impl FFISourceMetadata for MockSource {
        const NAME: &'static str = "mock";
        const VERSION: &'static str = "0.1.0";
        const DESCRIPTION: &'static str = "Mock source for testing";
        const MODE: SourceMode = SourceMode::Pull;
        const OUTPUT_COLUMNS: &'static [OperatorColumn] = &[];
    }

    impl FFISource for MockSource {
        fn new(_config: &Config) -> Result<Self> {
            Ok(Self)
        }

        fn poll(&mut self, _checkpoint: Option<&[u8]>) -> Result<SourceBatch> {
            Ok(SourceBatch::empty())
        }

        // The registry tests never drive the mock in push mode.
        fn run(&mut self, _checkpoint: Option<&[u8]>, _emitter: SourceEmitter) -> Result<()> {
            unimplemented!("mock source is pull-only")
        }

        fn shutdown(&mut self) -> Result<()> {
            Ok(())
        }
    }

    /// Stateless sink stub registered under the name "mock"; accepts and drops every write.
    struct MockSink;

    impl FFISinkMetadata for MockSink {
        const NAME: &'static str = "mock";
        const VERSION: &'static str = "0.1.0";
        const DESCRIPTION: &'static str = "Mock sink for testing";
        const INPUT_COLUMNS: &'static [OperatorColumn] = &[];
    }

    impl FFISink for MockSink {
        fn new(_config: &Config) -> Result<Self> {
            Ok(Self)
        }

        fn write(&mut self, _records: &[SinkRecord]) -> Result<()> {
            Ok(())
        }

        fn shutdown(&mut self) -> Result<()> {
            Ok(())
        }
    }

    /// A registered source is reported by `has_source` and can be instantiated.
    #[test]
    fn test_register_and_create_source() {
        let mut registry = ConnectorRegistry::new();
        registry.register_source::<MockSource>();

        assert!(!registry.has_source("nonexistent"));
        assert!(registry.has_source("mock"));

        let config = Config::new("mock", Default::default());
        assert!(registry.create_source("mock", &config).is_ok());
    }

    /// A registered sink is reported by `has_sink` and can be instantiated.
    #[test]
    fn test_register_and_create_sink() {
        let mut registry = ConnectorRegistry::new();
        registry.register_sink::<MockSink>();

        assert!(!registry.has_sink("nonexistent"));
        assert!(registry.has_sink("mock"));

        let config = Config::new("mock", Default::default());
        assert!(registry.create_sink("mock", &config).is_ok());
    }

    /// Creating a connector that was never registered yields an error for both kinds.
    #[test]
    fn test_unknown_connector_error() {
        let registry = ConnectorRegistry::new();
        let config = Config::new("nonexistent", Default::default());

        assert!(registry.create_source("nonexistent", &config).is_err());
        assert!(registry.create_sink("nonexistent", &config).is_err());
    }
}