use crate::RS2Stream;
use async_stream::stream;
use futures_util::StreamExt;
use std::future::Future;
use std::pin::Pin;
use tokio::sync::broadcast;
#[derive(Debug)]
pub enum PipelineError {
NoSource,
NoSink,
InvalidPipeline(String),
RuntimeError(Box<dyn std::error::Error + Send + Sync>),
}
impl std::fmt::Display for PipelineError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
PipelineError::NoSource => write!(f, "Pipeline has no source"),
PipelineError::NoSink => write!(f, "Pipeline has no sink"),
PipelineError::InvalidPipeline(msg) => write!(f, "Invalid pipeline: {}", msg),
PipelineError::RuntimeError(e) => write!(f, "Runtime error: {}", e),
}
}
}
impl std::error::Error for PipelineError {}
pub type PipelineResult<T> = Result<T, PipelineError>;
pub enum PipelineNode<T> {
Source {
name: String,
func: Box<dyn Fn() -> RS2Stream<T> + Send + Sync>,
},
Transform {
name: String,
func: Box<dyn Fn(RS2Stream<T>) -> RS2Stream<T> + Send + Sync>,
},
Sink {
name: String,
func: Box<dyn Fn(RS2Stream<T>) -> Pin<Box<dyn Future<Output = ()> + Send>> + Send + Sync>,
},
Branch {
name: String,
sinks: Vec<
Box<dyn Fn(RS2Stream<T>) -> Pin<Box<dyn Future<Output = ()> + Send>> + Send + Sync>,
>,
},
}
#[derive(Debug)]
pub struct PipelineConfig {
pub name: String,
pub buffer_size: usize,
pub enable_metrics: bool,
}
impl Default for PipelineConfig {
fn default() -> Self {
Self {
name: "unnamed-pipeline".to_string(),
buffer_size: 1000,
enable_metrics: false,
}
}
}
pub struct Pipeline<T> {
config: PipelineConfig,
nodes: Vec<PipelineNode<T>>,
}
impl<T: Send + Clone + 'static> Pipeline<T> {
pub fn new() -> Self {
Self {
config: PipelineConfig::default(),
nodes: vec![],
}
}
pub fn with_config(mut self, config: PipelineConfig) -> Self {
self.config = config;
self
}
pub fn named_source<F>(mut self, name: &str, f: F) -> Self
where
F: Fn() -> RS2Stream<T> + Send + Sync + 'static,
{
self.nodes.push(PipelineNode::Source {
name: name.to_string(),
func: Box::new(f),
});
self
}
pub fn source<F>(self, f: F) -> Self
where
F: Fn() -> RS2Stream<T> + Send + Sync + 'static,
{
self.named_source("source", f)
}
pub fn named_transform<F>(mut self, name: &str, f: F) -> Self
where
F: Fn(RS2Stream<T>) -> RS2Stream<T> + Send + Sync + 'static,
{
self.nodes.push(PipelineNode::Transform {
name: name.to_string(),
func: Box::new(f),
});
self
}
pub fn transform<F>(self, f: F) -> Self
where
F: Fn(RS2Stream<T>) -> RS2Stream<T> + Send + Sync + 'static,
{
self.named_transform("transform", f)
}
pub fn named_sink<F>(mut self, name: &str, f: F) -> Self
where
F: Fn(RS2Stream<T>) -> Pin<Box<dyn Future<Output = ()> + Send>> + Send + Sync + 'static,
{
self.nodes.push(PipelineNode::Sink {
name: name.to_string(),
func: Box::new(f),
});
self
}
pub fn sink<F>(self, f: F) -> Self
where
F: Fn(RS2Stream<T>) -> Pin<Box<dyn Future<Output = ()> + Send>> + Send + Sync + 'static,
{
self.named_sink("sink", f)
}
pub fn branch<F1, F2>(mut self, name: &str, f1: F1, f2: F2) -> Self
where
F1: Fn(RS2Stream<T>) -> Pin<Box<dyn Future<Output = ()> + Send>> + Send + Sync + 'static,
F2: Fn(RS2Stream<T>) -> Pin<Box<dyn Future<Output = ()> + Send>> + Send + Sync + 'static,
{
self.nodes.push(PipelineNode::Branch {
name: name.to_string(),
sinks: vec![Box::new(f1), Box::new(f2)],
});
self
}
pub fn validate(&self) -> PipelineResult<()> {
if self.nodes.is_empty() {
return Err(PipelineError::InvalidPipeline("Empty pipeline".to_string()));
}
let mut has_source = false;
let mut has_sink_or_branch = false;
for node in &self.nodes {
match node {
PipelineNode::Source { .. } => has_source = true,
PipelineNode::Sink { .. } | PipelineNode::Branch { .. } => {
has_sink_or_branch = true
}
_ => {}
}
}
if !has_source {
return Err(PipelineError::NoSource);
}
if !has_sink_or_branch {
return Err(PipelineError::NoSink);
}
Ok(())
}
pub async fn run(self) -> PipelineResult<()> {
self.validate()?;
let mut stream = None;
for node in self.nodes {
match node {
PipelineNode::Source { name: _name, func } => {
stream = Some(func());
}
PipelineNode::Transform { name: _name, func } => {
if let Some(s) = stream.take() {
stream = Some(func(s));
}
}
PipelineNode::Sink { name: _name, func } => {
if let Some(s) = stream.take() {
func(s).await;
}
}
PipelineNode::Branch { name: _name, sinks } => {
if let Some(s) = stream.take() {
let (tx, _) = broadcast::channel(self.config.buffer_size);
let tx_clone = tx.clone();
tokio::spawn(async move {
let mut stream = s;
while let Some(item) = stream.next().await {
if tx_clone.send(item).is_err() {
break; }
}
});
let mut handles = Vec::new();
for sink_func in sinks {
let mut rx = tx.subscribe();
let sink_stream = stream! {
while let Ok(item) = rx.recv().await {
yield item;
}
}
.boxed();
handles.push(tokio::spawn(async move {
sink_func(sink_stream).await;
}));
}
for handle in handles {
if let Err(e) = handle.await {
return Err(PipelineError::RuntimeError(Box::new(e)));
}
}
}
}
}
}
Ok(())
}
}
impl<T: Send + Clone + 'static> Default for Pipeline<T> {
fn default() -> Self {
Self::new()
}
}