use crate::queue::ValueAt;
use crate::types::*;
use anyhow::anyhow;
use std::cmp::Ordering;
/// Peekable, boxed iterator of timestamped values; `peek()` exposes the next
/// item without consuming it.
type Peeker<T> = std::iter::Peekable<Box<dyn Iterator<Item = ValueAt<T>>>>;
/// Source node that replays an iterator of `ValueAt` items, producing on each
/// cycle a `Burst` holding every item whose timestamp equals the current graph
/// time.
pub struct IteratorStream<T: Element> {
/// Remaining source items; peeked to decide what is due now and when to
/// request the next callback.
peekable: Peeker<T>,
/// Output burst; cleared and refilled at the start of every cycle.
value: Burst<T>,
}
/// Requests a graph callback at the timestamp of the next pending item, if any.
///
/// The iterator is only peeked; no item is consumed.
///
/// Returns `Ok(true)` when a callback was registered and `Ok(false)` when the
/// iterator has no further items. This function itself never produces `Err`;
/// the `Result` return mirrors `try_add_callback` so both can be used as the
/// tail expression of `cycle`.
fn add_callback<T>(peekable: &mut Peeker<T>, state: &mut GraphState) -> anyhow::Result<bool> {
    // Copy the timestamp out so the borrow of `peekable` ends before `state` is touched.
    let next_time = peekable.peek().map(|upcoming| upcoming.time);
    if let Some(time) = next_time {
        state.add_callback(time);
    }
    Ok(next_time.is_some())
}
#[node(output = value: Burst<T>)]
impl<T: Element> MutableNode for IteratorStream<T> {
    /// Rebuilds the output burst from every pending item stamped with the
    /// current graph time, then requests a callback for the next pending item.
    ///
    /// Returns `Ok(true)` when another callback was registered and `Ok(false)`
    /// once the source iterator is exhausted.
    fn cycle(&mut self, state: &mut GraphState) -> anyhow::Result<bool> {
        self.value.clear();
        // `next_if` consumes an item only while its timestamp matches the graph
        // time, and hands back the owned item so the value is moved rather than
        // cloned.
        while let Some(value_at) = self
            .peekable
            .next_if(|pending| pending.time == state.time())
        {
            self.value.push(value_at.value);
        }
        add_callback(&mut self.peekable, state)
    }

    /// Requests the first callback at the timestamp of the first item, if the
    /// source has one.
    fn start(&mut self, state: &mut GraphState) -> anyhow::Result<()> {
        add_callback(&mut self.peekable, state)?;
        Ok(())
    }
}
impl<T: Element + 'static> IteratorStream<T> {
    /// Creates a stream that replays `it`, starting with an empty output burst.
    ///
    /// The iterator is wrapped in a `Peekable` so the node can look at the next
    /// item's timestamp without consuming it.
    pub fn new(it: Box<dyn Iterator<Item = ValueAt<T>>>) -> Self {
        let peekable = it.peekable();
        let value = Burst::new();
        Self { peekable, value }
    }
}
/// Peekable, boxed iterator whose items are fallible: each is either a
/// timestamped value or an `anyhow` error.
type TryPeeker<T> = std::iter::Peekable<Box<dyn Iterator<Item = anyhow::Result<ValueAt<T>>>>>;
/// Fallible counterpart of `IteratorStream`: replays an iterator of
/// `anyhow::Result<ValueAt<T>>`, grouping `Ok` items with the current graph
/// time into a `Burst` and surfacing the first `Err` item as the node's error.
pub struct TryIteratorStream<T: Element> {
/// Remaining source items; peeked to decide what is due now and when to
/// request the next callback.
peekable: TryPeeker<T>,
/// Output burst; cleared and refilled at the start of every cycle.
value: Burst<T>,
}
/// Requests a graph callback for the next pending item of a fallible source.
///
/// Returns `Ok(true)` when the next item is `Ok` and a callback was registered
/// at its timestamp, and `Ok(false)` when the iterator has no further items.
///
/// # Errors
///
/// If the next item is an `Err`, that item is consumed from the iterator and
/// its error is returned.
fn try_add_callback<T: Element>(
    peekable: &mut TryPeeker<T>,
    state: &mut GraphState,
) -> anyhow::Result<bool> {
    // Copy the timestamp out first so the borrow from `peek()` has ended
    // before the error path calls `next()`.
    let upcoming = match peekable.peek() {
        None => return Ok(false),
        Some(item) => item.as_ref().ok().map(|value_at| value_at.time),
    };
    if let Some(time) = upcoming {
        state.add_callback(time);
        return Ok(true);
    }
    // The peeked item was an `Err`: take ownership of it and hand it to the caller.
    let failed = peekable.next().expect("peek() just returned Some");
    Err(failed.expect_err("peek() just returned Err"))
}
#[node(output = value: Burst<T>)]
impl<T: Element> MutableNode for TryIteratorStream<T> {
    /// Rebuilds the output burst from the leading run of `Ok` items stamped
    /// with the current graph time, then requests a callback for whatever
    /// comes next.
    ///
    /// # Errors
    ///
    /// Collection stops at the first `Err` item; `try_add_callback` then
    /// consumes it and its error becomes the result of this cycle.
    fn cycle(&mut self, state: &mut GraphState) -> anyhow::Result<bool> {
        self.value.clear();
        loop {
            // An exhausted source, an `Err` item, or a different timestamp all
            // end the burst; only a matching `Ok` item is consumed here.
            let is_due = matches!(
                self.peekable.peek(),
                Some(Ok(pending)) if pending.time == state.time()
            );
            if !is_due {
                break;
            }
            let item = self.peekable.next().expect("peek() just returned Some");
            let ready = item.expect("peek() just returned Ok");
            self.value.push(ready.value);
        }
        try_add_callback(&mut self.peekable, state)
    }

    /// Requests the first callback, or fails immediately if the very first
    /// item of the source is an `Err`.
    fn start(&mut self, state: &mut GraphState) -> anyhow::Result<()> {
        try_add_callback(&mut self.peekable, state).map(|_| ())
    }
}
impl<T: Element + 'static> TryIteratorStream<T> {
    /// Creates a stream that replays the fallible iterator `it`, starting with
    /// an empty output burst.
    pub fn new(it: Box<dyn Iterator<Item = anyhow::Result<ValueAt<T>>>>) -> Self {
        let peekable = it.peekable();
        let value = Burst::new();
        Self { peekable, value }
    }
}
/// Source node that replays an iterator of `ValueAt` items one value per
/// cycle. Consecutive items with equal or descending timestamps are reported
/// as errors by `cycle`.
pub struct SimpleIteratorStream<T: Element> {
/// Remaining source items; peeked to validate ordering and to request the
/// next callback.
peekable: Peeker<T>,
/// Most recently emitted value; starts as `T::default()`.
value: T,
}
#[node(output = value: T)]
impl<T: Element> MutableNode for SimpleIteratorStream<T> {
    /// Emits the next item's value and requests a callback for the item after
    /// it.
    ///
    /// Returns `Ok(true)` when another callback was registered and `Ok(false)`
    /// once the source iterator is exhausted.
    ///
    /// # Errors
    ///
    /// - the node was cycled although the source has no item left;
    /// - the following item has an earlier timestamp than the one just emitted;
    /// - the following item has the same timestamp as the one just emitted
    ///   (this node emits a single value per tick).
    fn cycle(&mut self, state: &mut GraphState) -> anyhow::Result<bool> {
        // Report an unexpected cycle as an error, like the other malformed-source
        // conditions below, instead of panicking inside the graph run.
        let current = self
            .peekable
            .next()
            .ok_or_else(|| anyhow!("SimpleIteratorStream cycled with no upcoming item"))?;
        let current_time = current.time;
        self.value = current.value;
        if let Some(upcoming) = self.peekable.peek() {
            match current_time.cmp(&upcoming.time) {
                Ordering::Greater => {
                    return Err(anyhow!("source time was descending!"));
                }
                Ordering::Equal => {
                    return Err(anyhow!(
                        "source produced multiple ticks for same time, use IteratorStream instead"
                    ));
                }
                Ordering::Less => {}
            }
        }
        add_callback(&mut self.peekable, state)
    }

    /// Requests the first callback at the timestamp of the first item, if the
    /// source has one.
    fn start(&mut self, state: &mut GraphState) -> anyhow::Result<()> {
        add_callback(&mut self.peekable, state)?;
        Ok(())
    }
}
impl<T: Element + 'static> SimpleIteratorStream<T> {
    /// Creates a stream that replays `it` one value per cycle.
    ///
    /// Until the first cycle the output holds `T::default()`.
    pub fn new(it: Box<dyn Iterator<Item = ValueAt<T>>>) -> SimpleIteratorStream<T> {
        let peekable = it.peekable();
        let value = T::default();
        Self { peekable, value }
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::graph::*;
    use crate::nodes::*;
    use crate::types::IntoStream;

    /// Builds timestamped items from `(value, timestamp)` pairs; the timestamp
    /// is passed to `NanoTime::new`.
    fn value_ats(pairs: &[(u64, u64)]) -> Vec<ValueAt<u64>> {
        let mut items = Vec::with_capacity(pairs.len());
        for &(value, stamp) in pairs {
            items.push(ValueAt {
                value,
                time: NanoTime::new(stamp),
            });
        }
        items
    }

    #[test]
    fn simple_iterator_emits_in_order() {
        // The assertion below expects the final item (value 0 at 300) not to
        // appear in the collected output.
        // NOTE(review): presumably the run ends before that last callback is
        // processed — confirm against the graph runner.
        let source = value_ats(&[(10, 0), (20, 100), (30, 200), (0, 300)]);
        let collected = SimpleIteratorStream::new(Box::new(source.into_iter()))
            .into_stream()
            .collect();
        collected
            .run(RunMode::HistoricalFrom(NanoTime::ZERO), RunFor::Forever)
            .unwrap();
        let seen: Vec<u64> = collected
            .peek_value()
            .iter()
            .map(|tick| tick.value)
            .collect();
        assert_eq!(seen, vec![10, 20, 30]);
    }

    #[test]
    fn iterator_stream_groups_same_timestamp_into_burst() {
        // Two items share timestamp 0 and must land in the same burst.
        let source = value_ats(&[(1, 0), (2, 0), (3, 100), (0, 200)]);
        let collected = IteratorStream::new(Box::new(source.into_iter()))
            .into_stream()
            .collect();
        collected
            .run(RunMode::HistoricalFrom(NanoTime::ZERO), RunFor::Forever)
            .unwrap();
        let bursts = collected.peek_value();
        assert_eq!(bursts.len(), 2);
        assert_eq!(bursts[0].value.as_slice(), &[1u64, 2u64]);
        assert_eq!(bursts[1].value.as_slice(), &[3u64]);
    }

    #[test]
    fn try_iterator_groups_ok_items_into_burst() {
        // Same scenario as the infallible stream, with every item wrapped in `Ok`.
        let mut source: Vec<anyhow::Result<ValueAt<u64>>> = Vec::new();
        for item in value_ats(&[(1, 0), (2, 0), (3, 100), (0, 200)]) {
            source.push(Ok(item));
        }
        let collected = TryIteratorStream::new(Box::new(source.into_iter()))
            .into_stream()
            .collect();
        collected
            .run(RunMode::HistoricalFrom(NanoTime::ZERO), RunFor::Forever)
            .unwrap();
        let bursts = collected.peek_value();
        assert_eq!(bursts.len(), 2);
        assert_eq!(bursts[0].value.as_slice(), &[1u64, 2u64]);
        assert_eq!(bursts[1].value.as_slice(), &[3u64]);
    }

    #[test]
    fn try_iterator_surfaces_item_error() {
        // An `Err` item in the middle of the source must fail the run with its message.
        let source: Vec<anyhow::Result<ValueAt<u64>>> = vec![
            Ok(ValueAt {
                value: 1,
                time: NanoTime::new(0),
            }),
            Err(anyhow!("bad row")),
            Ok(ValueAt {
                value: 3,
                time: NanoTime::new(200),
            }),
        ];
        let outcome = TryIteratorStream::new(Box::new(source.into_iter()))
            .into_stream()
            .run(RunMode::HistoricalFrom(NanoTime::ZERO), RunFor::Forever);
        let err = outcome.expect_err("expected the item error to surface");
        let rendered = format!("{err:#}");
        assert!(rendered.contains("bad row"), "got: {err:#}");
    }

    #[test]
    fn simple_iterator_errors_on_duplicate_timestamps() {
        let source = value_ats(&[(1, 0), (2, 0)]);
        let outcome = SimpleIteratorStream::new(Box::new(source.into_iter()))
            .into_stream()
            .run(RunMode::HistoricalFrom(NanoTime::ZERO), RunFor::Forever);
        assert!(outcome.is_err());
    }

    #[test]
    fn simple_iterator_errors_on_descending_timestamps() {
        let source = value_ats(&[(1, 100), (2, 50)]);
        let outcome = SimpleIteratorStream::new(Box::new(source.into_iter()))
            .into_stream()
            .run(RunMode::HistoricalFrom(NanoTime::ZERO), RunFor::Forever);
        assert!(outcome.is_err());
    }
}