use crate::channel::{self, Subscribable, Subscriber};
use crate::pod::Pod;
use crate::wait::WaitStrategy;
use super::pipeline::Pipeline;
use super::{spawn_stage, SharedState};
/// Builder for a pipeline that has been split into two branches, `A` and `B`.
///
/// Each branch can be extended independently with the `then_a*` / `then_b*`
/// methods; `build` hands back the subscriber at the end of each branch
/// together with the assembled [`Pipeline`].
pub struct FanOutBuilder<A, B>
where
    A: Pod,
    B: Pod,
{
    /// Subscriber currently at the tail of branch A; consumed by the next
    /// `then_a*` stage or returned by `build`.
    pub(super) sub_a: Subscriber<A>,
    /// Subscribable side paired with `sub_a`. After a `then_a*` call this is
    /// the handle `sub_a` was subscribed from (initial pairing is set by the
    /// constructor, which lives outside this file section).
    pub(super) subs_a: Subscribable<A>,
    /// Subscriber currently at the tail of branch B; consumed by the next
    /// `then_b*` stage or returned by `build`.
    pub(super) sub_b: Subscriber<B>,
    /// Subscribable side paired with `sub_b` (see `subs_a`).
    pub(super) subs_b: Subscribable<B>,
    /// Capacity passed to `channel::channel` for every channel created while
    /// extending either branch.
    pub(super) capacity: usize,
    /// Stage handles, stage statuses and the shutdown signal accumulated so
    /// far; moved into the [`Pipeline`] by `build`.
    pub(super) state: SharedState,
}
impl<A, B> FanOutBuilder<A, B>
where
    A: Pod,
    B: Pod,
{
    /// Finishes the builder.
    ///
    /// Returns the tail subscribers of branch A and branch B (in that order)
    /// and a [`Pipeline`] that takes ownership of the handles, shutdown
    /// signal and statuses collected while building.
    pub fn build(self) -> ((Subscriber<A>, Subscriber<B>), Pipeline) {
        let tails = (self.sub_a, self.sub_b);
        let pipeline = Pipeline {
            handles: self.state.handles,
            shutdown: self.state.shutdown,
            statuses: self.state.statuses,
        };
        (tails, pipeline)
    }

    /// Appends a stage applying `f` to branch A, using the default
    /// [`WaitStrategy`].
    ///
    /// Shorthand for [`Self::then_a_with`] with `WaitStrategy::default()`.
    pub fn then_a<A2: Pod>(self, f: impl Fn(A) -> A2 + Send + 'static) -> FanOutBuilder<A2, B> {
        let strategy = WaitStrategy::default();
        self.then_a_with(f, strategy)
    }

    /// Appends a stage applying `f` to branch A with an explicit wait
    /// `strategy`.
    ///
    /// A new channel of the builder's `capacity` is created; the stage is
    /// started through `spawn_stage`, reading from the current branch-A
    /// subscriber and publishing into the new channel. The stage's handle and
    /// status are recorded in the shared state, and the returned builder's
    /// branch A now yields `A2`. Branch B is carried over unchanged.
    pub fn then_a_with<A2: Pod>(
        mut self,
        f: impl Fn(A) -> A2 + Send + 'static,
        strategy: WaitStrategy,
    ) -> FanOutBuilder<A2, B> {
        let capacity = self.capacity;
        let (publisher, subscribable) = channel::channel::<A2>(capacity);
        // Subscribe before the stage starts so the new tail exists up front.
        let tail = subscribable.subscribe();

        let shutdown = self.state.shutdown.clone();
        let (status, handle) = spawn_stage(self.sub_a, publisher, shutdown, f, strategy);
        self.state.handles.push(handle);
        self.state.statuses.push(status);

        FanOutBuilder {
            sub_a: tail,
            subs_a: subscribable,
            sub_b: self.sub_b,
            subs_b: self.subs_b,
            capacity,
            state: self.state,
        }
    }

    /// Appends a stage applying `f` to branch B, using the default
    /// [`WaitStrategy`].
    ///
    /// Shorthand for [`Self::then_b_with`] with `WaitStrategy::default()`.
    pub fn then_b<B2: Pod>(self, f: impl Fn(B) -> B2 + Send + 'static) -> FanOutBuilder<A, B2> {
        let strategy = WaitStrategy::default();
        self.then_b_with(f, strategy)
    }

    /// Appends a stage applying `f` to branch B with an explicit wait
    /// `strategy`.
    ///
    /// Mirror image of [`Self::then_a_with`]: a new channel of the builder's
    /// `capacity` is created, the stage is started through `spawn_stage`
    /// reading from the current branch-B subscriber, its handle and status
    /// are recorded, and the returned builder's branch B now yields `B2`.
    /// Branch A is carried over unchanged.
    pub fn then_b_with<B2: Pod>(
        mut self,
        f: impl Fn(B) -> B2 + Send + 'static,
        strategy: WaitStrategy,
    ) -> FanOutBuilder<A, B2> {
        let capacity = self.capacity;
        let (publisher, subscribable) = channel::channel::<B2>(capacity);
        // Subscribe before the stage starts so the new tail exists up front.
        let tail = subscribable.subscribe();

        let shutdown = self.state.shutdown.clone();
        let (status, handle) = spawn_stage(self.sub_b, publisher, shutdown, f, strategy);
        self.state.handles.push(handle);
        self.state.statuses.push(status);

        FanOutBuilder {
            sub_a: self.sub_a,
            subs_a: self.subs_a,
            sub_b: tail,
            subs_b: subscribable,
            capacity,
            state: self.state,
        }
    }
}