use super::StreamProcessor;
use alloc::boxed::Box;
/// Combines two stream processors into one that switches between them every time an element is
/// put.
///
/// - `sp1` is the stream processor in control first.
/// - `sp2` is the stream processor that takes over once `sp1` has put an element.
///
/// While the processor in control is reading input it stays in control; as soon as it puts an
/// element, that element is emitted and the two processors swap roles.
pub fn alternate<'a, A, B: 'a>(
    sp1: StreamProcessor<'a, A, B>,
    sp2: StreamProcessor<'a, A, B>,
) -> StreamProcessor<'a, A, B> {
    match sp1 {
        // An element was produced: emit it and let the other processor lead from now on.
        StreamProcessor::Put(output, rest) => {
            StreamProcessor::Put(output, Box::new(move || alternate(sp2, rest())))
        }
        // Input is still required: feed it to the leading processor without swapping roles.
        StreamProcessor::Get(step) => {
            StreamProcessor::Get(Box::new(move |input| alternate(step(input), sp2)))
        }
    }
}
/// Runs `sp` until it puts its first element and then continues as the stream processor that
/// `f` builds from that element.
///
/// - `sp` is the stream processor whose first put element is awaited.
/// - `f` turns that element into the stream processor to continue with.
///
/// Only the first element put by `sp` is used; whatever `sp` would have done afterwards is
/// never evaluated.
pub fn bind<'a, X, A: 'a, B, F>(sp: StreamProcessor<'a, X, A>, f: F) -> StreamProcessor<'a, X, B>
where
    F: FnOnce(A) -> StreamProcessor<'a, X, B> + 'a,
{
    match sp {
        // The awaited element is available: hand it to `f` and drop the rest of `sp`.
        StreamProcessor::Put(first, _) => f(first),
        // `sp` needs more input before it can put anything: keep waiting.
        StreamProcessor::Get(step) => {
            StreamProcessor::Get(Box::new(move |input| bind(step(input), f)))
        }
    }
}
/// Chains two stream processors so that everything `sp1` puts becomes the input of `sp2`.
///
/// - `sp1` is the stream processor applied to the outer input.
/// - `sp2` is the stream processor applied to the elements put by `sp1`.
///
/// The combination asks for outer input whenever `sp1` wants to read, regardless of the state of
/// `sp2`. Once `sp1` has put an element, that element is either fed to a reading `sp2` right
/// away, or held back while an element already put by `sp2` is emitted first.
pub fn compose<'a, A, B, C: 'a>(
    mut sp1: StreamProcessor<'a, A, B>,
    mut sp2: StreamProcessor<'a, B, C>,
) -> StreamProcessor<'a, A, C> {
    loop {
        match (sp1, sp2) {
            // The first stage needs input: read it before looking at the second stage.
            (StreamProcessor::Get(producer_step), consumer) => {
                return StreamProcessor::Get(Box::new(move |input| {
                    compose(producer_step(input), consumer)
                }));
            }
            // The first stage produced exactly what the second stage is waiting for: pass it on
            // internally and keep going without touching the outer input or output.
            (StreamProcessor::Put(item, producer_rest), StreamProcessor::Get(consumer_step)) => {
                sp1 = producer_rest();
                sp2 = consumer_step(item);
            }
            // Both stages have an element ready: emit the second stage's element and keep the
            // first stage's element pending for later.
            (
                StreamProcessor::Put(item, producer_rest),
                StreamProcessor::Put(output, consumer_rest),
            ) => {
                return StreamProcessor::Put(
                    output,
                    Box::new(move || {
                        compose(StreamProcessor::Put(item, producer_rest), consumer_rest())
                    }),
                );
            }
        }
    }
}
/// Builds a stream processor that passes on exactly those input elements satisfying a predicate.
///
/// - `p` is the predicate deciding whether an element is kept.
///
/// Elements for which `p` returns `false` are consumed without producing any output.
pub fn filter<'a, A, P>(p: P) -> StreamProcessor<'a, A, A>
where
    P: Fn(&A) -> bool + 'a,
{
    StreamProcessor::Get(Box::new(move |candidate: A| {
        // Rejected elements are swallowed: go straight back to reading.
        if !p(&candidate) {
            return filter(p);
        }
        StreamProcessor::Put(candidate, Box::new(move || filter(p)))
    }))
}
/// Builds a stream processor that never reads input and instead unfolds its output from a state.
///
/// - `body` maps the current state to the element to put and the state to continue with.
/// - `state` is the state the unfolding starts from.
///
/// `body` is applied once immediately to obtain the first element; every later application
/// happens only when the remainder of the stream processor is forced.
pub fn generate<'a, A, B, S: 'a, F>(body: F, state: S) -> StreamProcessor<'a, A, B>
where
    F: Fn(S) -> (B, S) + 'a,
{
    let (output, next_state) = body(state);
    StreamProcessor::Put(output, Box::new(move || generate(body, next_state)))
}
/// Builds a stream processor that applies a function to every input element.
///
/// - `f` is the function applied to each element read; its result is the element put.
///
/// The stream processor strictly alternates between reading one element and putting one element.
pub fn map<'a, A, B, F>(f: F) -> StreamProcessor<'a, A, B>
where
    F: Fn(A) -> B + 'a,
{
    StreamProcessor::Get(Box::new(move |input: A| {
        let output = f(input);
        StreamProcessor::Put(output, Box::new(move || map(f)))
    }))
}
// Unit tests for the combinators above; only compiled for test builds with the `std` feature.
//
// Every test follows the same recipe: build a stream processor, create a channel-backed input
// stream, enqueue some elements, evaluate the processor on the stream and check the first
// elements of the result.
//
// NOTE(review): the expected values only add up if the stream returned by
// `OvereagerReceiver::channel(_, 0)` already starts with a leading `0` ahead of the enqueued
// elements — presumably the second argument is that initial element; confirm against
// `OvereagerReceiver::channel`.
#[cfg(feature = "std")]
#[cfg(test)]
mod tests {
use super::*;
use crate::streams::overeager_receivers::OvereagerReceiver;
use crate::streams::Stream;
use crate::assert_head_eq;
use crate::assert_tail_starts_with;
use crate::enqueue;
// `alternate` switches between a positive-only and a negative-only filter after every element
// that gets through, so the expected output alternates in sign.
#[test]
fn test_alternate() {
let is_greater_zero = |n: &i8| *n > 0;
let is_less_zero = |n: &i8| *n < 0;
let sp = alternate(filter(is_greater_zero), filter(is_less_zero));
let (tx, stream) = OvereagerReceiver::channel(0, 0);
enqueue!(tx, [1, 2, -1, -2, 1]);
let mut result = sp.eval(stream);
assert_head_eq!(result, 1);
assert_tail_starts_with!(result, [-1, 1]);
}
// `bind` is nested here: each level inspects one mapped element (is it zero?) and picks the
// stream processor to continue with based on that answer.
#[test]
fn test_bind() {
let is_zero = |n: usize| n == 0;
let sp = bind(map(is_zero), |b: bool| {
if b {
bind(map(is_zero), |b: bool| {
if b {
map(|n| n + 2)
} else {
map(|n| n + 1)
}
})
} else {
filter(|n| *n > 0)
}
});
let (tx, stream) = OvereagerReceiver::channel(0, 0);
enqueue!(tx, [1, 0, 1, 2]);
let mut result = sp.eval(stream);
assert_head_eq!(result, 1);
assert_tail_starts_with!(result, [2, 3]);
}
// Composing two `+ 1` maps adds two to every element. The trailing `10` is not checked as
// output; `compose` reads one more input before it emits a pending element, so it is needed
// for the last asserted value to appear.
#[test]
fn test_compose() {
let plus_one = |n: usize| n + 1;
let sp = compose(map(plus_one), map(plus_one));
let (tx, stream) = OvereagerReceiver::channel(10, 0);
enqueue!(tx, [1, 2, 10]);
let mut result = sp.eval(stream);
assert_head_eq!(result, 2);
assert_tail_starts_with!(result, [3, 4]);
}
// Only strictly positive elements are expected to survive the filter.
#[test]
fn test_filter() {
let is_greater_zero = |n: &usize| *n > 0;
let sp = filter(is_greater_zero);
let (tx, stream) = OvereagerReceiver::channel(0, 0);
enqueue!(tx, [1, 0, 2]);
let mut result = sp.eval(stream);
assert_head_eq!(result, 1);
assert_tail_starts_with!(result, [2]);
}
// `generate` never reads its input, so the output is determined by the start state 10 alone.
#[test]
fn test_generate() {
let ascending = |n: usize| (n, n + 1);
let sp = generate(ascending, 10);
let (tx, stream) = OvereagerReceiver::channel(0, 0);
enqueue!(tx, [0]);
let mut result = sp.eval(stream);
assert_head_eq!(result, 10);
assert_tail_starts_with!(result, [11]);
}
// Every element is expected to come out incremented by one.
#[test]
fn test_map() {
let plus_one = |n: usize| n + 1;
let sp = map(plus_one);
let (tx, stream) = OvereagerReceiver::channel(10, 0);
enqueue!(tx, [1]);
let mut result = sp.eval(stream);
assert_head_eq!(result, 1);
assert_tail_starts_with!(result, [2]);
}
}