use std::sync::{Arc, RwLock};
use lazy_static::lazy_static;
use crate::core::error::{Error, Result};
use super::builtins::{
CsvSinkPlugin, CsvSourcePlugin, FillNaPlugin, FilterTransformPlugin, JsonSinkPlugin,
JsonSourcePlugin, NormalizePlugin, SelectColumnsPlugin,
};
use super::registry::PluginRegistry;
use super::traits::{
AggregatorPlugin, DataSinkPlugin, DataSourcePlugin, TransformPlugin, ValidatorPlugin,
};
lazy_static! {
// Single registry instance shared by every function in this module.
// `lazy_static!` runs the initializer on first access; the `RwLock` lets the
// read helpers take `.read()` while the registration helpers take `.write()`.
static ref GLOBAL_REGISTRY: RwLock<PluginRegistry> = RwLock::new(PluginRegistry::new());
}
pub fn register_builtin_plugins() -> Result<()> {
let mut registry = GLOBAL_REGISTRY.write().map_err(|_| Error::LockPoisoned {
context: "global registry write lock".to_string(),
})?;
if !registry.has_source("csv_source") {
registry.register_source(CsvSourcePlugin::arc())?;
}
if !registry.has_source("json_source") {
registry.register_source(JsonSourcePlugin::arc())?;
}
if !registry.has_sink("csv_sink") {
registry.register_sink(CsvSinkPlugin::arc())?;
}
if !registry.has_sink("json_sink") {
registry.register_sink(JsonSinkPlugin::arc())?;
}
if !registry.has_transform("filter") {
registry.register_transform(FilterTransformPlugin::arc())?;
}
if !registry.has_transform("select_columns") {
registry.register_transform(SelectColumnsPlugin::arc())?;
}
if !registry.has_transform("normalize") {
registry.register_transform(NormalizePlugin::arc())?;
}
if !registry.has_transform("fill_na") {
registry.register_transform(FillNaPlugin::arc())?;
}
Ok(())
}
/// Builds a snapshot of the global registry and returns it behind an [`Arc`].
///
/// While the global read lock is held, a fresh [`PluginRegistry`] is created
/// and filled with every source, sink, transform, aggregator and validator
/// that can be looked up under a name reported by `list_plugins()`. The
/// returned registry is a separate value, so registrations made on the global
/// registry afterwards do not show up in it.
///
/// # Errors
///
/// Returns `Error::LockPoisoned` if the global read lock is poisoned. Errors
/// from registering into the snapshot are intentionally ignored (see below).
pub fn global_registry() -> Result<Arc<PluginRegistry>> {
    // `with_global_registry` already acquires the read lock and reports
    // poisoning, so no separate probe of the lock is needed here.
    with_global_registry(|r| {
        let mut snapshot = PluginRegistry::new();
        for meta in r.list_plugins().iter() {
            let name = &meta.name;
            // Best effort: every category is probed for every listed name, so
            // the same entry may be offered to the snapshot more than once.
            // A failed registration is therefore discarded rather than
            // aborting the whole snapshot.
            if let Some(p) = r.get_source(name) {
                let _ = snapshot.register_source(p);
            }
            if let Some(p) = r.get_sink(name) {
                let _ = snapshot.register_sink(p);
            }
            if let Some(p) = r.get_transform(name) {
                let _ = snapshot.register_transform(p);
            }
            if let Some(p) = r.get_aggregator(name) {
                let _ = snapshot.register_aggregator(p);
            }
            if let Some(p) = r.get_validator(name) {
                let _ = snapshot.register_validator(p);
            }
        }
        Ok(Arc::new(snapshot))
    })
}
/// Runs `f` with shared, read-only access to the global plugin registry.
///
/// The read lock is acquired before `f` is called and released when this
/// function returns, so it is held for the whole duration of `f`.
///
/// `f` should not try to take the global write lock (for example through
/// `with_global_registry_mut` or the `register_*` functions in this module):
/// `std::sync::RwLock` may deadlock or panic when a thread that already holds
/// the lock tries to acquire it again.
///
/// # Errors
///
/// Returns `Error::LockPoisoned` if the read lock is poisoned; otherwise
/// returns whatever `f` returns.
pub fn with_global_registry<F, T>(f: F) -> Result<T>
where
    F: FnOnce(&PluginRegistry) -> Result<T>,
{
    let registry = GLOBAL_REGISTRY.read().map_err(|_| Error::LockPoisoned {
        context: "global registry read lock".to_string(),
    })?;
    // The guard dereferences to `PluginRegistry`; `&registry` coerces to
    // `&PluginRegistry` at the call.
    f(&registry)
}
pub fn with_global_registry_mut<F, T>(f: F) -> Result<T>
where
F: FnOnce(&mut PluginRegistry) -> Result<T>,
{
let mut registry = GLOBAL_REGISTRY.write().map_err(|_| Error::LockPoisoned {
context: "global registry write lock".to_string(),
})?;
f(&mut registry)
}
/// Registers a data-source plugin in the global registry.
///
/// Forwards `plugin` to `PluginRegistry::register_source` while holding the
/// global write lock (via `with_global_registry_mut`).
///
/// # Errors
///
/// Returns the error from `with_global_registry_mut` if the lock cannot be
/// taken, or the error returned by the registry's `register_source`.
pub fn register_source(plugin: Arc<dyn DataSourcePlugin>) -> Result<()> {
    with_global_registry_mut(move |registry| {
        registry.register_source(plugin)
    })
}
/// Registers a data-sink plugin in the global registry.
///
/// Forwards `plugin` to `PluginRegistry::register_sink` while holding the
/// global write lock (via `with_global_registry_mut`).
///
/// # Errors
///
/// Returns the error from `with_global_registry_mut` if the lock cannot be
/// taken, or the error returned by the registry's `register_sink`.
pub fn register_sink(plugin: Arc<dyn DataSinkPlugin>) -> Result<()> {
    with_global_registry_mut(move |registry| {
        registry.register_sink(plugin)
    })
}
/// Registers a transform plugin in the global registry.
///
/// Forwards `plugin` to `PluginRegistry::register_transform` while holding
/// the global write lock (via `with_global_registry_mut`).
///
/// # Errors
///
/// Returns the error from `with_global_registry_mut` if the lock cannot be
/// taken, or the error returned by the registry's `register_transform`.
pub fn register_transform(plugin: Arc<dyn TransformPlugin>) -> Result<()> {
    with_global_registry_mut(move |registry| {
        registry.register_transform(plugin)
    })
}
/// Registers an aggregator plugin in the global registry.
///
/// Forwards `plugin` to `PluginRegistry::register_aggregator` while holding
/// the global write lock (via `with_global_registry_mut`).
///
/// # Errors
///
/// Returns the error from `with_global_registry_mut` if the lock cannot be
/// taken, or the error returned by the registry's `register_aggregator`.
pub fn register_aggregator(plugin: Arc<dyn AggregatorPlugin>) -> Result<()> {
    with_global_registry_mut(move |registry| {
        registry.register_aggregator(plugin)
    })
}
/// Registers a validator plugin in the global registry.
///
/// Forwards `plugin` to `PluginRegistry::register_validator` while holding
/// the global write lock (via `with_global_registry_mut`).
///
/// # Errors
///
/// Returns the error from `with_global_registry_mut` if the lock cannot be
/// taken, or the error returned by the registry's `register_validator`.
pub fn register_validator(plugin: Arc<dyn ValidatorPlugin>) -> Result<()> {
    with_global_registry_mut(move |registry| {
        registry.register_validator(plugin)
    })
}