use std::collections::HashMap;
use std::sync::Arc;
use crate::core::error::{Error, Result};
use crate::dataframe::DataFrame;
use super::registry::PluginRegistry;
/// One stage of a plugin pipeline.
///
/// Every variant carries the same two pieces of data: the name under which
/// the plugin is looked up in the registry, and a string-to-string option map
/// that is handed to the plugin when the step runs.
#[derive(Debug, Clone)]
pub enum PipelineStep {
    /// Stage resolved through the registry's source-plugin lookup.
    Source {
        /// Registry name of the plugin.
        plugin_name: String,
        /// Options forwarded to the plugin.
        options: HashMap<String, String>,
    },
    /// Stage resolved through the registry's transform-plugin lookup.
    Transform {
        /// Registry name of the plugin.
        plugin_name: String,
        /// Options forwarded to the plugin.
        options: HashMap<String, String>,
    },
    /// Stage resolved through the registry's validator-plugin lookup.
    Validate {
        /// Registry name of the plugin.
        plugin_name: String,
        /// Options forwarded to the plugin.
        options: HashMap<String, String>,
    },
    /// Stage resolved through the registry's sink-plugin lookup.
    Sink {
        /// Registry name of the plugin.
        plugin_name: String,
        /// Options forwarded to the plugin.
        options: HashMap<String, String>,
    },
}

impl PipelineStep {
    /// Returns the plugin name stored in this step, regardless of variant.
    pub fn plugin_name(&self) -> &str {
        // All variants share the field, so a single or-pattern arm covers
        // them while keeping the match exhaustive.
        match self {
            Self::Source { plugin_name, .. }
            | Self::Transform { plugin_name, .. }
            | Self::Validate { plugin_name, .. }
            | Self::Sink { plugin_name, .. } => plugin_name.as_str(),
        }
    }
}
pub struct PluginPipeline {
registry: Arc<PluginRegistry>,
steps: Vec<PipelineStep>,
}
impl PluginPipeline {
pub fn new(registry: Arc<PluginRegistry>) -> Self {
PluginPipeline {
registry,
steps: Vec::new(),
}
}
pub fn source(mut self, plugin_name: &str, options: HashMap<String, String>) -> Self {
self.steps.push(PipelineStep::Source {
plugin_name: plugin_name.to_string(),
options,
});
self
}
pub fn transform(mut self, plugin_name: &str, options: HashMap<String, String>) -> Self {
self.steps.push(PipelineStep::Transform {
plugin_name: plugin_name.to_string(),
options,
});
self
}
pub fn validate(mut self, plugin_name: &str, options: HashMap<String, String>) -> Self {
self.steps.push(PipelineStep::Validate {
plugin_name: plugin_name.to_string(),
options,
});
self
}
pub fn sink(mut self, plugin_name: &str, options: HashMap<String, String>) -> Self {
self.steps.push(PipelineStep::Sink {
plugin_name: plugin_name.to_string(),
options,
});
self
}
pub fn execute(&self) -> Result<Option<DataFrame>> {
if self.steps.is_empty() {
return Err(Error::InvalidOperation("Pipeline has no steps".to_string()));
}
let first = self
.steps
.first()
.ok_or_else(|| Error::InvalidOperation("Pipeline has no steps".to_string()))?;
let mut current_df = match first {
PipelineStep::Source {
plugin_name,
options,
} => {
let source = self.registry.get_source(plugin_name).ok_or_else(|| {
Error::InvalidInput(format!(
"Pipeline: source plugin '{}' is not registered",
plugin_name
))
})?;
source.read(options)?
}
other => {
return Err(Error::InvalidOperation(format!(
"Pipeline must start with a Source step, found {:?}",
other.plugin_name()
)))
}
};
for step in self.steps.iter().skip(1) {
match step {
PipelineStep::Source { .. } => {
return Err(Error::InvalidOperation(
"Pipeline cannot have a Source step after the first step".to_string(),
));
}
PipelineStep::Transform {
plugin_name,
options,
} => {
let transform = self.registry.get_transform(plugin_name).ok_or_else(|| {
Error::InvalidInput(format!(
"Pipeline: transform plugin '{}' is not registered",
plugin_name
))
})?;
current_df = transform.transform(current_df, options)?;
}
PipelineStep::Validate {
plugin_name,
options,
} => {
let validator = self.registry.get_validator(plugin_name).ok_or_else(|| {
Error::InvalidInput(format!(
"Pipeline: validator plugin '{}' is not registered",
plugin_name
))
})?;
let issues = validator.validate(¤t_df, options)?;
for issue in &issues {
if issue.severity == crate::plugins::traits::IssueSeverity::Error {
return Err(Error::InvalidOperation(format!(
"Validation failed: {}",
issue.message
)));
}
}
}
PipelineStep::Sink {
plugin_name,
options,
} => {
let sink = self.registry.get_sink(plugin_name).ok_or_else(|| {
Error::InvalidInput(format!(
"Pipeline: sink plugin '{}' is not registered",
plugin_name
))
})?;
sink.write(¤t_df, options)?;
return Ok(None);
}
}
}
Ok(Some(current_df))
}
pub fn step_count(&self) -> usize {
self.steps.len()
}
pub fn steps(&self) -> &[PipelineStep] {
&self.steps
}
}