use crate::Data;
use crate::order::{PartialOrder, TotalOrder};
use crate::progress::timestamp::Timestamp;
use crate::dataflow::operators::generic::operator::source;
use crate::dataflow::operators::probe::Handle;
use crate::dataflow::{Stream, Scope};
pub struct IteratorSourceInput<T: Clone, D: Data, DI: IntoIterator<Item=D>, I: IntoIterator<Item=(T, DI)>> {
pub lower_bound: T,
pub data: I,
pub target: T,
}
pub fn iterator_source<
G: Scope,
D: Data,
DI: IntoIterator<Item=D>,
I: IntoIterator<Item=(G::Timestamp, DI)>,
F: FnMut(&G::Timestamp)->Option<IteratorSourceInput<G::Timestamp, D, DI, I>>+'static>(
scope: &G,
name: &str,
mut input_f: F,
probe: Handle<G::Timestamp>,
) -> Stream<G, D> where G::Timestamp: TotalOrder {
let mut target = G::Timestamp::minimum();
source(scope, name, |cap, info| {
let mut cap = Some(cap);
let activator = scope.activator_for(&info.address[..]);
move |output| {
cap = cap.take().and_then(|mut cap| {
loop {
if !probe.less_than(&target) {
if let Some(IteratorSourceInput {
lower_bound,
data,
target: new_target,
}) = input_f(cap.time()) {
target = new_target;
let mut has_data = false;
for (t, ds) in data.into_iter() {
cap = if cap.time() != &t { cap.delayed(&t) } else { cap };
let mut session = output.session(&cap);
session.give_iterator(ds.into_iter());
has_data = true;
}
cap = if cap.time().less_than(&lower_bound) { cap.delayed(&lower_bound) } else { cap };
if !has_data {
break Some(cap);
}
} else {
break None;
}
} else {
break Some(cap);
}
}
});
if cap.is_some() {
activator.activate();
}
}
})
}