use crate::order::TotalOrder;
use crate::progress::timestamp::Timestamp;
use crate::dataflow::operators::generic::operator::source;
use crate::dataflow::operators::probe::Handle;
use crate::dataflow::{StreamVec, Scope};
pub struct IteratorSourceInput<T: Clone, D: 'static, DI: IntoIterator<Item=D>, I: IntoIterator<Item=(T, DI)>> {
pub lower_bound: T,
pub data: I,
pub target: T,
}
pub fn iterator_source<
'scope,
T: Timestamp,
D: 'static,
DI: IntoIterator<Item=D>,
I: IntoIterator<Item=(T, DI)>,
F: FnMut(&T)->Option<IteratorSourceInput<T, D, DI, I>>+'static>(
scope: Scope<'scope, T>,
name: &str,
mut input_f: F,
probe: Handle<T>,
) -> StreamVec<'scope, T, D> where T: TotalOrder {
let mut target = T::minimum();
source(scope, name, |cap, info| {
let mut cap = Some(cap);
let activator = scope.activator_for(info.address);
move |output| {
cap = cap.take().and_then(|mut cap| {
loop {
if !probe.less_than(&target) {
if let Some(IteratorSourceInput {
lower_bound,
data,
target: new_target,
}) = input_f(cap.time()) {
target = new_target;
let mut has_data = false;
for (t, ds) in data.into_iter() {
cap = if cap.time() != &t { cap.delayed(&t) } else { cap };
let mut session = output.session(&cap);
session.give_iterator(ds.into_iter());
has_data = true;
}
cap = if cap.time().less_than(&lower_bound) { cap.delayed(&lower_bound) } else { cap };
if !has_data {
break Some(cap);
}
} else {
break None;
}
} else {
break Some(cap);
}
}
});
if cap.is_some() {
activator.activate();
}
}
})
}