use crate::{
operators::IntoSource, stream::OperatorBuilder, types::{Data, DataMessage, Message, NoData, NoKey, NoTime}
};
pub struct SingleIteratorSource<T> {
iterator: Box<dyn Iterator<Item = T>>,
}
impl<T> SingleIteratorSource<T> {
pub fn new<I>(iter: I) -> Self
where
I: IntoIterator<Item = T>,
<I as IntoIterator>::IntoIter: 'static,
{
Self {
iterator: Box::new(iter.into_iter()),
}
}
}
impl <V> IntoSource<NoKey, V, usize> for SingleIteratorSource<V>
where
V: Data,
{
fn into_source(self) -> OperatorBuilder<NoKey, NoData, NoTime, NoKey, V, usize> {
let mut inner = self.iterator.enumerate();
let mut final_emitted = false;
OperatorBuilder::direct(move |input, output, _ctx| {
if !final_emitted {
if let Some(x) = inner.next() {
output.send(Message::Data(DataMessage::new(NoKey, x.1, x.0)));
} else {
output.send(Message::Epoch(usize::MAX));
final_emitted = true;
}
}
if let Some(msg) = input.recv() {
match msg {
Message::Data(_) => (),
Message::Epoch(_) => (),
Message::AbsBarrier(x) => output.send(Message::AbsBarrier(x)),
Message::Rescale(x) => output.send(Message::Rescale(x)),
Message::ShutdownMarker(x) => output.send(Message::ShutdownMarker(x)),
Message::Interrogate(x) => output.send(Message::Interrogate(x)),
Message::Collect(x) => output.send(Message::Collect(x)),
Message::Acquire(x) => output.send(Message::Acquire(x)),
Message::DropKey(x) => output.send(Message::DropKey(x)),
}
}
})
}
}
#[cfg(test)]
mod tests {
use itertools::Itertools;
use crate::{
operators::{Sink, Source}, sources::SingleIteratorSource, testing::{get_test_stream, VecSink}
};
#[test]
fn emits_values() {
let (builder, stream) = get_test_stream();
let in_data: Vec<i32> = (0..100).collect();
let collector = VecSink::new();
let stream = stream
.source(SingleIteratorSource::new(in_data))
.sink(collector.clone());
stream.finish();
builder.build().unwrap().execute();
let c = collector
.into_iter()
.map(|x| x.value)
.collect_vec();
assert_eq!(c, (0..100).collect_vec())
}
#[test]
fn emits_only_on_worker_0() {
todo!()
}
#[test]
fn emits_timestamped_messages() {
todo!()
}
#[test]
fn emits_max_epoch() {
todo!()
}
}