use std::sync::{Arc, RwLock};
use async_trait::async_trait;
use crate::frames::FrameDirection;
use crate::error::Result;
use crate::frames::Frame;
use crate::frames::{FrameHandler, FrameProcessor, WeakFrameProcessor};
pub(crate) struct PipelineSourceHandler {
outer: Arc<RwLock<Option<WeakFrameProcessor>>>,
}
#[async_trait]
impl FrameHandler for PipelineSourceHandler {
async fn on_process_frame(
&self,
processor: &FrameProcessor,
frame: Frame,
direction: FrameDirection,
) -> Result<()> {
match direction {
FrameDirection::Downstream => {
processor.push_frame(frame, FrameDirection::Downstream).await
}
FrameDirection::Upstream => {
let outer_opt = {
let g = self.outer.read().unwrap();
g.as_ref().and_then(|w| w.upgrade())
};
if let Some(outer) = outer_opt {
outer.push_frame(frame, FrameDirection::Upstream).await?;
}
Ok(())
}
}
}
}
pub(crate) struct PipelineSinkHandler {
outer: Arc<RwLock<Option<WeakFrameProcessor>>>,
}
#[async_trait]
impl FrameHandler for PipelineSinkHandler {
async fn on_process_frame(
&self,
processor: &FrameProcessor,
frame: Frame,
direction: FrameDirection,
) -> Result<()> {
match direction {
FrameDirection::Upstream => {
processor.push_frame(frame, FrameDirection::Upstream).await
}
FrameDirection::Downstream => {
let outer_opt = {
let g = self.outer.read().unwrap();
g.as_ref().and_then(|w| w.upgrade())
};
if let Some(outer) = outer_opt {
outer.push_frame(frame, FrameDirection::Downstream).await?;
}
Ok(())
}
}
}
}
struct PipelineHandler {
source: FrameProcessor,
sink: FrameProcessor,
}
#[async_trait]
impl FrameHandler for PipelineHandler {
async fn on_process_frame(
&self,
_processor: &FrameProcessor,
frame: Frame,
direction: FrameDirection,
) -> Result<()> {
match direction {
FrameDirection::Downstream => {
self.source
.queue_frame(frame, FrameDirection::Downstream, None)
.await
}
FrameDirection::Upstream => {
self.sink
.queue_frame(frame, FrameDirection::Upstream, None)
.await
}
}
}
}
pub struct Pipeline;
impl Pipeline {
pub fn new(processors: Vec<FrameProcessor>) -> FrameProcessor {
let outer_slot: Arc<RwLock<Option<WeakFrameProcessor>>> =
Arc::new(RwLock::new(None));
let source = FrameProcessor::new(
"PipelineSource",
Box::new(PipelineSourceHandler {
outer: outer_slot.clone(),
}),
true, );
let sink = FrameProcessor::new(
"PipelineSink",
Box::new(PipelineSinkHandler {
outer: outer_slot.clone(),
}),
true, );
let all: Vec<FrameProcessor> = std::iter::once(source.clone())
.chain(processors.iter().cloned())
.chain(std::iter::once(sink.clone()))
.collect();
for window in all.windows(2) {
window[0].link(&window[1]);
}
let outer = FrameProcessor::new(
"Pipeline",
Box::new(PipelineHandler {
source: source.clone(),
sink: sink.clone(),
}),
false, );
outer.set_sub_processors(all.clone());
outer.set_entry_processors(vec![source.clone()]);
*outer_slot.write().unwrap() = Some(outer.downgrade());
outer
}
}