use std::iter::{Enumerate, Peekable};
use crate::types::{Data, NoKey};
use super::{StatelessSourceImpl, StatelessSourcePartition};
/// Source backed by a single in-memory iterator.
///
/// The iterator is type-erased into a `Box<dyn Iterator>` and held inside an
/// `Option`, which lets it be moved out of the source later via `Option::take`.
pub struct SingleIteratorSource<T>(Option<Box<dyn Iterator<Item = T>>>);

impl<T> SingleIteratorSource<T> {
    /// Creates a source from anything that can be turned into an iterator over `T`.
    ///
    /// The produced iterator must be `'static` because it is stored as a boxed
    /// trait object.
    pub fn new<I>(iter: I) -> Self
    where
        I: IntoIterator<Item = T>,
        <I as IntoIterator>::IntoIter: 'static,
    {
        let inner = iter.into_iter();
        // Erase the concrete iterator type so the struct is generic only over the item.
        let erased: Box<dyn Iterator<Item = T>> = Box::new(inner);
        Self(Some(erased))
    }
}
impl<V> StatelessSourceImpl<V, usize> for SingleIteratorSource<V>
where
    V: Data,
{
    type Part = NoKey;
    type SourcePartition = SingleIteratorPartition<V>;

    /// Reports exactly one partition, identified by `NoKey`.
    fn list_parts(&self) -> Vec<Self::Part> {
        Vec::from([NoKey])
    }

    /// Moves the wrapped iterator out of the source and turns it into the partition.
    ///
    /// Each item is paired with its position via `enumerate`, and the result is
    /// made peekable so the partition can look ahead without consuming an item.
    ///
    /// # Panics
    ///
    /// Panics if called more than once: the first call takes the iterator and
    /// leaves `None` behind.
    fn build_part(&mut self, _part: &Self::Part) -> Self::SourcePartition {
        let items = self
            .0
            .take()
            .unwrap_or_else(|| unreachable!("SingleIteratorSource only has one part"));
        SingleIteratorPartition(items.enumerate().peekable())
    }
}
/// Partition built by `SingleIteratorSource::build_part`.
///
/// Wraps the boxed source iterator in `Enumerate` (pairing every item with its
/// zero-based position) and `Peekable` (allowing a look-ahead that does not
/// consume the next item).
pub struct SingleIteratorPartition<V>(Peekable<Enumerate<Box<dyn Iterator<Item = V>>>>);
impl<V> StatelessSourcePartition<V, usize> for SingleIteratorPartition<V> {
    /// Pulls the next item and returns it as `(value, index)`, where `index` is the
    /// item's zero-based position in the underlying iterator.
    ///
    /// Returns `None` once the iterator is exhausted.
    fn poll(&mut self) -> Option<(V, usize)> {
        // `Enumerate` yields `(index, value)`; callers expect the value first.
        let (index, value) = self.0.next()?;
        Some((value, index))
    }

    /// Returns `true` when peeking shows that no further item is available.
    fn is_finished(&mut self) -> bool {
        let upcoming = self.0.peek();
        upcoming.is_none()
    }
}
#[cfg(test)]
mod tests {
    use itertools::Itertools;

    use crate::{
        channels::operator_io::Input,
        operators::*,
        sinks::StatelessSink,
        sources::{SingleIteratorSource, StatelessSource},
        stream::OperatorBuilder,
        testing::get_test_rt,
        testing::VecSink,
        types::{Message, NoKey},
    };

    /// Every value of the input iterator reaches the sink, in the original order.
    #[test]
    fn emits_values() {
        let in_data: Vec<i32> = (0..100).collect();
        let collector = VecSink::new();

        let rt = get_test_rt(|provider| {
            let in_data = in_data.clone();
            provider
                .new_stream()
                .source(
                    "source",
                    StatelessSource::new(SingleIteratorSource::new(in_data)),
                )
                .sink("sink", StatelessSink::new(collector.clone()));
        });
        rt.execute().unwrap();

        let c = collector.into_iter().map(|x| x.value).collect_vec();
        assert_eq!(c, (0..100).collect_vec())
    }

    /// Timestamps are the item's position in the iterator (0..10), not its value (42..52).
    #[test]
    fn emits_timestamped_messages() {
        let sink = VecSink::new();

        let rt = get_test_rt(|provider| {
            provider
                .new_stream()
                .source(
                    "source",
                    StatelessSource::new(SingleIteratorSource::new(42..52)),
                )
                .sink("sink", StatelessSink::new(sink.clone()));
        });
        rt.execute().unwrap();

        let timestamps = sink.into_iter().map(|x| x.timestamp).collect_vec();
        let expected = (0..10).collect_vec();
        assert_eq!(expected, timestamps);
    }

    /// The last epoch observed downstream of the source is `usize::MAX`.
    #[test]
    fn emits_max_epoch() {
        let sink = VecSink::new();

        let rt = get_test_rt(|provider| {
            let sink = sink.clone();
            provider
                .new_stream()
                .source(
                    "source",
                    StatelessSource::new(SingleIteratorSource::new(0..10)),
                )
                .then(OperatorBuilder::direct(
                    "sink-epochs",
                    // Record every epoch that passes through, then forward all
                    // messages unchanged.
                    move |input: &mut Input<NoKey, i32, usize>, output, _ctx| match input.recv() {
                        Some(Message::Epoch(x)) => {
                            sink.give(x.clone());
                            output.send(Message::Epoch(x));
                        }
                        Some(msg) => output.send(msg),
                        None => (),
                    },
                ));
        });
        rt.execute().unwrap();

        let messages = sink.drain_vec(..);
        let last = messages.last().unwrap();
        assert_eq!(*last, usize::MAX);
    }
}