use std::{iter, rc::Rc, sync::Mutex};
use super::operator::{AppendableOperator, BuildableOperator, OperatorBuilder};
use crate::{
channels::operator_io::{link, Input},
types::{Data, MaybeKey, MaybeTime},
worker::{union, InnerRuntimeBuilder},
};
/// Builder for a stream whose current tail carries items typed by `K`, `V`
/// and `T`.
///
/// Operators appended to the stream are collected locally and handed to the
/// shared runtime builder when the stream is finished or dropped.
pub struct StreamBuilder<K, V, T> {
/// Operators appended to this stream that have not yet been passed to the
/// runtime builder.
operators: Vec<Box<dyn BuildableOperator>>,
/// Input end currently at the tail of the stream; the next appended
/// operator takes it over as its own input.
tail: Input<K, V, T>,
/// Shared handle to the runtime builder that ultimately receives the
/// collected operators.
runtime: Rc<Mutex<InnerRuntimeBuilder>>,
}
impl<K, V, T> StreamBuilder<K, V, T> {
    /// Returns a new shared handle to the runtime builder this stream
    /// belongs to.
    ///
    /// Only the reference-counted pointer is cloned; the runtime builder
    /// itself is not copied.
    pub(crate) fn get_runtime(&self) -> Rc<Mutex<InnerRuntimeBuilder>> {
        self.runtime.clone()
    }
}
impl<K: MaybeKey, V: Data, T: MaybeTime> StreamBuilder<K, V, T> {
    /// Creates a stream builder that starts at `receiver` and has no
    /// operators yet.
    ///
    /// `runtime` is the shared runtime builder that will receive the
    /// operators later appended to this stream.
    pub(crate) fn from_receiver(
        receiver: Input<K, V, T>,
        runtime: Rc<Mutex<InnerRuntimeBuilder>>,
    ) -> Self {
        Self {
            tail: receiver,
            runtime,
            operators: Vec::new(),
        }
    }
}
impl<K, V, T> StreamBuilder<K, V, T>
where
    K: MaybeKey,
    V: Data,
    T: MaybeTime,
{
    /// Appends `operator` to the end of this stream and returns a builder
    /// positioned at the operator's output.
    ///
    /// The operator takes over this stream's current tail as its input. A
    /// fresh, unlinked input is linked to the operator's output and becomes
    /// the tail of the returned builder. All operators collected so far,
    /// including `operator`, move to the returned builder.
    pub fn then<KO: MaybeKey, VO: Data, TO: MaybeTime>(
        mut self,
        mut operator: OperatorBuilder<K, V, T, KO, VO, TO>,
    ) -> StreamBuilder<KO, VO, TO> {
        // Hand our tail to the operator as its input; `self.tail` receives
        // whatever input the operator held before.
        std::mem::swap(&mut self.tail, operator.get_input_mut());

        let mut new_tail = Input::new_unlinked();
        link(operator.get_output_mut(), &mut new_tail);

        self.operators.push(Box::new(operator).into_buildable());

        StreamBuilder {
            tail: new_tail,
            // `StreamBuilder` implements `Drop`, so the field cannot be moved
            // out directly. `take` moves the existing buffer without
            // reallocating and leaves an empty `Vec` for the drop of `self`.
            operators: std::mem::take(&mut self.operators),
            runtime: Rc::clone(&self.runtime),
        }
    }

    /// Finishes this stream: passes all collected operators to the runtime
    /// builder and returns the stream's tail input.
    ///
    /// # Panics
    ///
    /// Panics if the runtime builder's mutex is poisoned.
    pub(crate) fn finish_pop_tail(mut self) -> Input<K, V, T> {
        // A poisoned lock means an earlier panic occurred while the runtime
        // builder was being modified; this is treated as unrecoverable.
        #[allow(clippy::unwrap_used)]
        self.runtime
            .lock()
            .unwrap()
            .add_operators(self.operators.drain(..));

        // `self` is dropped on return, so leave an unlinked input behind and
        // give the real tail to the caller.
        std::mem::replace(&mut self.tail, Input::new_unlinked())
    }

    /// Combines this stream with `others` into a single stream.
    ///
    /// This delegates to the `union` function imported from the `worker`
    /// module, passing this stream first, followed by `others` in iteration
    /// order, together with this stream's runtime handle.
    pub fn union(
        self,
        others: impl IntoIterator<Item = StreamBuilder<K, V, T>>,
    ) -> StreamBuilder<K, V, T> {
        // Take the handle before `self` is moved into the iterator.
        let runtime = Rc::clone(&self.runtime);
        union(runtime, iter::once(self).chain(others))
    }
}
impl<K, V, T> Drop for StreamBuilder<K, V, T> {
    /// Passes any operators still held by this builder to the runtime
    /// builder.
    fn drop(&mut self) {
        // Never panic here. A poisoned mutex means a panic happened while the
        // lock was held, so this drop is most likely running during
        // unwinding, and a second panic would abort the process and hide the
        // original failure. Recover the guard and register the operators.
        self.runtime
            .lock()
            .unwrap_or_else(std::sync::PoisonError::into_inner)
            .add_operators(self.operators.drain(..));
    }
}