use crate::Container;
use crate::progress::Timestamp;
use crate::container::{DrainContainer, SizableContainer, PushInto};
use crate::dataflow::channels::pact::Pipeline;
use crate::dataflow::operators::generic::builder_rc::OperatorBuilder;
use crate::dataflow::operators::generic::OutputBuilder;
use crate::dataflow::Stream;
/// Extension trait that splits a stream into two streams according to a
/// `Result`-returning closure.
///
/// This file implements it for `Stream<'scope, T, C>` where `C` is a container
/// that can be drained.
pub trait OkErr<'scope, T: Timestamp, C: DrainContainer> {
/// Consumes the stream and routes each of its items to one of two output streams.
///
/// `logic` is called with every item of the input. When it returns `Ok(x)`,
/// `x` is pushed into the first returned stream (container type `C1`); when it
/// returns `Err(e)`, `e` is pushed into the second returned stream (container
/// type `C2`). Outputs are produced at the same time as the input they came from.
///
/// `logic` must be `'static` because it is moved into the operator that is built.
fn ok_err<C1, D1, C2, D2, L>(
self,
logic: L,
) -> (Stream<'scope, T, C1>, Stream<'scope, T, C2>)
where
C1: Container + SizableContainer + PushInto<D1>,
C2: Container + SizableContainer + PushInto<D2>,
L: FnMut(C::Item<'_>) -> Result<D1,D2>+'static
;
}
impl<'scope, T: Timestamp, C: Container + DrainContainer> OkErr<'scope, T, C> for Stream<'scope, T, C> {
    // Builds a single-input, two-output operator named "OkErr". Every item
    // drained from the input is passed to `logic`; `Ok` payloads go to the
    // first output and `Err` payloads to the second, both at the input's time.
    fn ok_err<C1, D1, C2, D2, L>(
        self,
        mut logic: L,
    ) -> (Stream<'scope, T, C1>, Stream<'scope, T, C2>)
    where
        C1: Container + SizableContainer + PushInto<D1>,
        C2: Container + SizableContainer + PushInto<D2>,
        L: FnMut(C::Item<'_>) -> Result<D1, D2> + 'static,
    {
        let mut op = OperatorBuilder::new(String::from("OkErr"), self.scope());

        // The input must be registered before the notification setting for
        // input 0 is applied, and before the outputs, to keep port order intact.
        let mut source = op.new_input(self, Pipeline);
        // The operator body below ignores its frontier argument, so input 0
        // is registered with `FrontierInterest::Never`.
        op.set_notify_for(0, crate::progress::operate::FrontierInterest::Never);

        // First output carries the `Ok` values, second output the `Err` values.
        let (ok_port, ok_stream) = op.new_output();
        let (err_port, err_stream) = op.new_output();
        let mut ok_port = OutputBuilder::from(ok_port);
        let mut err_port = OutputBuilder::from(err_port);

        op.build(move |_| {
            move |_frontiers| {
                let mut ok_handle = ok_port.activate();
                let mut err_handle = err_port.activate();

                source.for_each_time(|time, data| {
                    // One session per output, both opened at the input's time.
                    let mut ok_session = ok_handle.session(&time);
                    let mut err_session = err_handle.session(&time);

                    // Flatten all containers delivered for this time into a
                    // single sequence of drained items.
                    let records = data.flat_map(|batch| batch.drain());
                    for record in records {
                        match logic(record) {
                            Ok(value) => ok_session.give(value),
                            Err(error) => err_session.give(error),
                        }
                    }
                });
            }
        });

        (ok_stream, err_stream)
    }
}