use futures::stream::StreamExt;
use tokio::sync::mpsc;
use crate::source::Source;
pub fn conflate<T, U, S, F>(src: Source<T>, mut seed: S, mut fold: F) -> Source<U>
where
T: Send + 'static,
U: Send + 'static,
S: FnMut(T) -> U + Send + 'static,
F: FnMut(U, T) -> U + Send + 'static,
{
let (tx, rx) = mpsc::unbounded_channel::<U>();
let mut inner = src.into_boxed();
tokio::spawn(async move {
let mut acc: Option<U> = None;
loop {
match inner.next().await {
Some(item) => {
acc = Some(match acc.take() {
None => seed(item),
Some(prev) => fold(prev, item),
});
if let Some(a) = acc.take() {
if tx.send(a).is_err() {
return;
}
}
}
None => {
if let Some(a) = acc.take() {
let _ = tx.send(a);
}
return;
}
}
}
});
Source::from_receiver(rx)
}
pub fn expand<T, F, I>(src: Source<T>, mut extrapolate: F) -> Source<T>
where
T: Clone + Send + 'static,
F: FnMut(&T) -> I + Send + 'static,
I: Iterator<Item = T> + Send + 'static,
{
let (tx, rx) = mpsc::unbounded_channel::<T>();
let mut inner = src.into_boxed();
tokio::spawn(async move {
let mut last: Option<T> = None;
loop {
match inner.next().await {
Some(item) => {
if tx.send(item.clone()).is_err() {
return;
}
last = Some(item);
}
None => {
if let Some(l) = last {
for synth in extrapolate(&l) {
if tx.send(synth).is_err() {
return;
}
}
}
return;
}
}
}
});
Source::from_receiver(rx)
}
#[cfg(test)]
mod tests {
use super::*;
use crate::sink::Sink;
#[tokio::test]
async fn conflate_passes_through_when_downstream_keeps_up() {
let s = Source::from_iter(vec![1u32, 2, 3]);
let out = Sink::collect(conflate(s, |t| t, |a, b| a + b)).await;
assert_eq!(out, vec![1, 2, 3]);
}
#[tokio::test]
async fn conflate_seed_initializes_accumulator() {
let s = Source::from_iter(vec![10u32]);
let out = Sink::collect(conflate(s, |t| t * 2, |a, b| a + b)).await;
assert_eq!(out, vec![20]);
}
#[tokio::test]
async fn expand_emits_extrapolated_values_after_upstream_close() {
let s = Source::from_iter(vec![5i32]);
let out = Sink::collect(expand(s, |last| {
let l = *last;
(0..3).map(move |i| l + i + 1)
}))
.await;
assert_eq!(out, vec![5, 6, 7, 8]);
}
#[tokio::test]
async fn expand_no_synthetics_when_iterator_empty() {
let s = Source::from_iter(vec![1i32, 2, 3]);
let out = Sink::collect(expand(s, |_last| std::iter::empty::<i32>())).await;
assert_eq!(out, vec![1, 2, 3]);
}
}