pub mod node;
use node::{Node, OutNode};
pub struct Pipeline<TOut, TCollected, TNext>
where
TOut: Send + 'static,
TNext: Node<TOut, TCollected> + Send + Sync + 'static,
{
first_block: Option<OutNode<TOut, TCollected, TNext>>,
}
impl<TOut, TCollected, TNext> Pipeline<TOut, TCollected, TNext>
where
TOut: Send + 'static,
TNext: Node<TOut, TCollected> + Send + Sync + 'static,
{
pub fn new(source: OutNode<TOut, TCollected, TNext>) -> Pipeline<TOut, TCollected, TNext> {
Pipeline {
first_block: Some(source),
}
}
pub fn start(&mut self) {
match &mut self.first_block {
Some(block) => {
block.start();
}
None => panic!("Error: Cannot start the pipeline!"),
}
}
pub fn wait_end(mut self) -> Option<TCollected> {
match &mut self.first_block {
Some(_block) => {
let block = self.first_block.take();
match block {
Some(block) => Node::<TOut, TCollected>::collect(block),
None => None,
}
}
None => None,
}
}
pub fn start_and_wait_end(mut self) -> Option<TCollected> {
self.start();
self.wait_end()
}
}
impl<TOut, TCollected, TNext> Drop for Pipeline<TOut, TCollected, TNext>
where
TOut: Send + 'static,
TNext: Node<TOut, TCollected> + Send + Sync + 'static,
{
fn drop(&mut self) {
match &mut self.first_block {
Some(_block) => {
let block = self.first_block.take();
if let Some(block) = block {
block.terminate();
}
}
None => (),
}
}
}
#[macro_export]
macro_rules! propagate {
($s1:expr) => {
{
let block = InNode::new(Box::new($s1), get_global_orchestrator());
block
}
};
($s1:expr $(, $tail:expr)*) => {
{
let node = ($s1);
let block = InOutNode::new(Box::new(node),
propagate!($($tail),*),
get_global_orchestrator());
block
}
};
}
#[macro_export]
macro_rules! pipeline {
($s1:expr $(, $tail:expr)*) => {
{
let orchestrator = get_global_orchestrator();
let block = OutNode::new(Box::new($s1),
propagate!($($tail),*), orchestrator);
let pipeline = Pipeline::new(block);
pipeline
}
};
}