use crate::channel::{self, Publisher, Subscribable, Subscriber};
use crate::pod::Pod;
use crate::wait::WaitStrategy;
use super::fan_out::FanOutBuilder;
use super::pipeline::Pipeline;
use super::{spawn_stage, SharedState, DEFAULT_CAPACITY};
pub struct PipelineBuilder {
capacity: usize,
}
impl Default for PipelineBuilder {
fn default() -> Self {
Self::new()
}
}
impl PipelineBuilder {
pub fn new() -> Self {
PipelineBuilder {
capacity: DEFAULT_CAPACITY,
}
}
pub fn capacity(mut self, cap: usize) -> Self {
self.capacity = cap;
self
}
pub fn input<T: Pod>(self) -> (Publisher<T>, StageBuilder<T>) {
let (pub_, subs) = channel::channel::<T>(self.capacity);
let subscriber = subs.subscribe();
(
pub_,
StageBuilder {
subscriber,
subscribable: subs,
capacity: self.capacity,
state: SharedState::new(),
},
)
}
}
pub struct StageBuilder<T: Pod> {
pub(super) subscriber: Subscriber<T>,
pub(super) subscribable: Subscribable<T>,
pub(super) capacity: usize,
pub(super) state: SharedState,
}
impl<T: Pod> StageBuilder<T> {
pub fn then<U: Pod>(self, f: impl Fn(T) -> U + Send + 'static) -> StageBuilder<U> {
self.then_with(f, WaitStrategy::default())
}
pub fn then_with<U: Pod>(
mut self,
f: impl Fn(T) -> U + Send + 'static,
strategy: WaitStrategy,
) -> StageBuilder<U> {
let (next_pub, next_subs) = channel::channel::<U>(self.capacity);
let next_sub = next_subs.subscribe();
let (status, handle) = spawn_stage(
self.subscriber,
next_pub,
self.state.shutdown.clone(),
f,
strategy,
);
self.state.handles.push(handle);
self.state.statuses.push(status);
StageBuilder {
subscriber: next_sub,
subscribable: next_subs,
capacity: self.capacity,
state: self.state,
}
}
pub fn fan_out<A, B>(
mut self,
fa: impl Fn(T) -> A + Send + 'static,
fb: impl Fn(T) -> B + Send + 'static,
) -> FanOutBuilder<A, B>
where
A: Pod,
B: Pod,
{
let (pub_a, subs_a) = channel::channel::<A>(self.capacity);
let (pub_b, subs_b) = channel::channel::<B>(self.capacity);
let sub_a_out = subs_a.subscribe();
let sub_b_out = subs_b.subscribe();
let input_a = self.subscriber;
let input_b = self.subscribable.subscribe();
let default_strategy = WaitStrategy::default();
let (status_a, handle_a) = spawn_stage(
input_a,
pub_a,
self.state.shutdown.clone(),
fa,
default_strategy,
);
let (status_b, handle_b) = spawn_stage(
input_b,
pub_b,
self.state.shutdown.clone(),
fb,
default_strategy,
);
self.state.handles.push(handle_a);
self.state.handles.push(handle_b);
self.state.statuses.push(status_a);
self.state.statuses.push(status_b);
FanOutBuilder {
sub_a: sub_a_out,
subs_a,
sub_b: sub_b_out,
subs_b,
capacity: self.capacity,
state: self.state,
}
}
pub fn build(self) -> (Subscriber<T>, Pipeline) {
(
self.subscriber,
Pipeline {
handles: self.state.handles,
shutdown: self.state.shutdown,
statuses: self.state.statuses,
},
)
}
}