fx_rs/stream/
stream.rs

1use crate::{Fx, State};
2
/// Result of applying a fold step to a single stream element.
///
/// The stream fold in this module inspects the variant to decide whether it
/// keeps consuming the stream or returns immediately.
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum Item<T> {
    /// Keep folding, using the wrapped value as the new accumulator.
    Next(T),
    /// Stop folding; the wrapped value is the final result.
    Done(T),
}
8
9#[derive(Clone)]
10pub enum Stream<'f, I: Clone, S: Clone> {
11    Nil,
12    Cons(I, Box<StreamFx<'f, I, S>>),
13}
14
15pub type StreamFx<'f, I, S> = Fx<'f, S, Stream<'f, I, S>>;
16
17impl<'f, I: Clone, S: Clone> Stream<'f, I, S> {
18    pub fn empty() -> StreamFx<'f, I, S> {
19        Fx::value(Self::Nil)
20    }
21
22    pub fn cons(head: I, tail: StreamFx<'f, I, S>) -> StreamFx<'f, I, S> {
23        Fx::value(Self::Cons(head, Box::new(tail)))
24    }
25
26    pub fn single(i: I) -> StreamFx<'f, I, S> {
27        Fx::value(Self::Cons(i, Box::new(Self::empty())))
28    }
29
30    pub fn concat(left: StreamFx<'f, I, S>, right: StreamFx<'f, I, S>) -> StreamFx<'f, I, S> {
31        left.concat(right)
32    }
33}
34
35impl<'f, I: Clone, S: Clone> StreamFx<'f, I, S> {
36    pub fn concat(self, tail: Self) -> Self {
37        self.map_m(|s| match s {
38            Stream::Nil => tail,
39            Stream::Cons(head, fx) => Stream::cons(head, Self::concat(*fx, tail)),
40        })
41    }
42
43    pub fn fold_stream<A, F>(f: F) -> Fx<'f, (Self, (A, S)), A>
44    where
45        A: Clone + 'f,
46        F: FnOnce(A, I) -> Fx<'f, S, Item<A>> + Clone + 'f,
47    {
48        State::get()
49            .flat_map(move |(stream, initial): (Self, A)| {
50                Self::fold_stream_rec(initial, stream, f.clone())
51            })
52            .contra_map(|(s, (a, r))| ((s, a), r), |_, ((s, a), r)| (s, (a, r)))
53    }
54
55    pub fn fold<A, F>(self, f: F) -> Fx<'f, (A, S), A>
56    where
57        A: Clone + 'f,
58        F: FnOnce(A, I) -> Fx<'f, S, Item<A>> + Clone + 'f,
59    {
60        State::get()
61            .flat_map(move |initial: A| Self::fold_stream_rec(initial, self.clone(), f.clone()))
62    }
63
64    fn fold_stream_rec<A, F>(current: A, stream: Self, f: F) -> Fx<'f, S, A>
65    where
66        A: Clone + 'f,
67        F: FnOnce(A, I) -> Fx<'f, S, Item<A>> + Clone + 'f,
68    {
69        stream.map_m(move |step| {
70            match step {
71                Stream::Nil => Fx::value(current.clone()),
72                Stream::Cons(head, tail) => {
73                    let f0 = f.clone();
74                    let acc = f(current.clone(), head);
75                    acc.map_m(move |acc| match acc {
76                        Item::Done(a) => Fx::value(a), // TODO: stop stream producer
77                        Item::Next(a) => Self::fold_stream_rec(a, (&*tail).clone(), f0),
78                    })
79                }
80            }
81        })
82    }
83}