use super::executor::PipelineExecutor;
use super::stage::PipelineStage;
use crate::error::{Result, StreamingError};
use std::sync::Arc;
/// Settings that a [`PipelineBuilder`] collects and passes to
/// `PipelineExecutor::new` when the pipeline is built.
///
/// The executor is defined outside this file, so the per-field notes below
/// describe intent as suggested by the names; confirm exact semantics there.
#[derive(Debug, Clone)]
pub struct PipelineConfig {
    /// Limit on how many stages may be active at once
    /// (presumably enforced by the executor — TODO confirm).
    pub max_concurrent_stages: usize,

    /// Capacity of the buffers used by the pipeline
    /// (presumably per inter-stage queue — TODO confirm unit and scope).
    pub buffer_size: usize,

    /// Switch for metrics collection.
    pub enable_metrics: bool,

    /// Switch for profiling.
    pub enable_profiling: bool,
}
impl Default for PipelineConfig {
fn default() -> Self {
Self {
max_concurrent_stages: num_cpus::get(),
buffer_size: 100,
enable_metrics: true,
enable_profiling: false,
}
}
}
/// Fluent builder that gathers a [`PipelineConfig`] and an ordered list of
/// stages, then turns them into a `PipelineExecutor`.
pub struct PipelineBuilder {
    /// Configuration handed to the executor when the pipeline is built.
    config: PipelineConfig,

    /// Stages in the order they were added; shared via `Arc` so callers may
    /// keep their own handles.
    stages: Vec<Arc<dyn PipelineStage>>,
}
impl PipelineBuilder {
    /// Starts a builder that uses [`PipelineConfig::default`] and has no stages.
    pub fn new() -> Self {
        Self::with_config(PipelineConfig::default())
    }

    /// Starts a builder from a caller-supplied `config` with no stages.
    pub fn with_config(config: PipelineConfig) -> Self {
        let stages = Vec::new();
        Self { config, stages }
    }

    /// Appends `stage` after any stages already registered and returns the
    /// builder for further chaining.
    pub fn add_stage(self, stage: Arc<dyn PipelineStage>) -> Self {
        let Self { config, mut stages } = self;
        stages.push(stage);
        Self { config, stages }
    }

    /// Overrides `max_concurrent_stages` in the stored configuration.
    ///
    /// NOTE(review): the value is stored as-is (zero included); confirm the
    /// executor copes with every value it can receive.
    pub fn max_concurrent_stages(self, max: usize) -> Self {
        self.update_config(|cfg| cfg.max_concurrent_stages = max)
    }

    /// Overrides `buffer_size` in the stored configuration.
    ///
    /// NOTE(review): the value is stored as-is (zero included); confirm the
    /// executor copes with every value it can receive.
    pub fn buffer_size(self, size: usize) -> Self {
        self.update_config(|cfg| cfg.buffer_size = size)
    }

    /// Sets the `enable_metrics` flag in the stored configuration.
    pub fn enable_metrics(self, enable: bool) -> Self {
        self.update_config(|cfg| cfg.enable_metrics = enable)
    }

    /// Sets the `enable_profiling` flag in the stored configuration.
    pub fn enable_profiling(self, enable: bool) -> Self {
        self.update_config(|cfg| cfg.enable_profiling = enable)
    }

    /// Consumes the builder and constructs the executor.
    ///
    /// # Errors
    ///
    /// Returns `StreamingError::ConfigError` when no stage has been added.
    /// Otherwise the result of `PipelineExecutor::new` is returned unchanged,
    /// including any error it reports.
    pub async fn build(self) -> Result<PipelineExecutor> {
        let Self { config, stages } = self;

        // Reject an empty pipeline before the executor is ever constructed.
        if stages.is_empty() {
            Err(StreamingError::ConfigError(
                "Pipeline must have at least one stage".to_string(),
            ))
        } else {
            PipelineExecutor::new(config, stages).await
        }
    }

    /// Applies `edit` to the stored configuration and hands the builder back,
    /// so every setter shares one mutate-and-return path.
    fn update_config(mut self, edit: impl FnOnce(&mut PipelineConfig)) -> Self {
        edit(&mut self.config);
        self
    }
}
impl Default for PipelineBuilder {
fn default() -> Self {
Self::new()
}
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Each fluent setter must write its argument through to the builder's
    /// stored configuration.
    #[test]
    fn test_pipeline_builder() {
        let builder = PipelineBuilder::new()
            .max_concurrent_stages(4)
            .buffer_size(50)
            .enable_metrics(true)
            .enable_profiling(true);

        assert_eq!(builder.config.max_concurrent_stages, 4);
        assert_eq!(builder.config.buffer_size, 50);
        assert!(builder.config.enable_metrics);
        // Profiling defaults to `false`, so this proves the setter took effect.
        assert!(builder.config.enable_profiling);
    }

    /// Building without any stage must fail with a configuration error,
    /// not merely with "some" error.
    #[tokio::test]
    async fn test_empty_pipeline() {
        let result = PipelineBuilder::new().build().await;

        // `matches!` avoids requiring `Debug` on the executor type.
        assert!(matches!(result, Err(StreamingError::ConfigError(_))));
    }
}