Skip to main content

comp_cat_rs/effect/
stream.rs

1//! Stream: effectful iteration.
2//!
3//! `Stream<E, A>` produces zero or more values of type `A`,
4//! where each step is an `Io<E, _>`.
5//!
6//! Categorically, a stream is a colimit (iterative construction),
7//! hence a left Kan extension by `collapse::monad_is_kan`.
8//!
9//! The representation is pull-based: each stream is a suspended
10//! `Io` that yields either `None` (done) or `Some((value, rest))`.
11
12use std::sync::Arc;
13
14use super::io::Io;
15use crate::foundation::Kind;
16
/// Type-level witness standing in for the `Stream<E, _>` type constructor.
///
/// The only field is a private `PhantomData<E>`, so the type stores no data;
/// it exists to carry the error type `E` at the type level.
pub struct StreamK<E> {
    /// Zero-sized marker that ties the otherwise unused parameter `E` to the type.
    _phantom: std::marker::PhantomData<E>,
}
21
22impl<E: 'static> Kind for StreamK<E> {
23    type F<A> = Stream<E, A>;
24}
25
26/// The result of pulling one element from a stream.
27type Step<E, A> = Io<E, Option<(A, Stream<E, A>)>>;
28
29/// The step function type for `Stream::unfold`.
30type UnfoldFn<E, A, S> = Arc<dyn Fn(S) -> Io<E, Option<(A, S)>> + Send + Sync>;
31
32/// An effectful stream producing values of type `A`.
33///
34/// Pull-based: each step is an `Io` that produces either
35/// the next value and the rest of the stream, or `None`.
36pub struct Stream<E, A> {
37    step: Box<dyn FnOnce() -> Step<E, A> + Send>,
38}
39
40impl<E: Send + 'static, A: Send + 'static> Stream<E, A> {
41    /// An empty stream.
42    #[must_use]
43    pub fn empty() -> Self {
44        Self {
45            step: Box::new(|| Io::pure(None)),
46        }
47    }
48
49    /// A stream that emits a single value.
50    #[must_use]
51    pub fn emit(a: A) -> Self {
52        Self {
53            step: Box::new(move || Io::pure(Some((a, Self::empty())))),
54        }
55    }
56
57    /// Build a stream from a `Vec` of values.
58    #[must_use]
59    pub fn from_vec(items: Vec<A>) -> Self {
60        items
61            .into_iter()
62            .rev()
63            .fold(Self::empty(), |rest, item| Self {
64                step: Box::new(move || Io::pure(Some((item, rest)))),
65            })
66    }
67
68    /// Build a stream by repeatedly applying a step function to state.
69    ///
70    /// The step function returns `None` to end the stream, or
71    /// `Some((value, next_state))` to emit and continue.
72    #[must_use]
73    pub fn unfold<S: Send + 'static>(
74        init: S,
75        step: UnfoldFn<E, A, S>,
76    ) -> Self {
77        let step_clone = Arc::clone(&step);
78        Self {
79            step: Box::new(move || {
80                step(init).map(move |opt| {
81                    opt.map(|(a, next_state)| (a, Self::unfold(next_state, step_clone)))
82                })
83            }),
84        }
85    }
86
87    /// Lift a single `Io` into a one-element stream.
88    #[must_use]
89    pub fn from_io(io: Io<E, A>) -> Self {
90        Self {
91            step: Box::new(move || io.map(|a| Some((a, Self::empty())))),
92        }
93    }
94
95    /// Apply a function to each element.
96    #[must_use]
97    pub fn map<B: Send + 'static>(self, f: Arc<dyn Fn(A) -> B + Send + Sync>) -> Stream<E, B> {
98        let f_clone = Arc::clone(&f);
99        Stream {
100            step: Box::new(move || {
101                self.pull().map(move |opt| {
102                    opt.map(|(a, rest)| (f(a), rest.map(f_clone)))
103                })
104            }),
105        }
106    }
107
108    /// Append another stream after this one.
109    #[must_use]
110    pub fn concat(self, other: Stream<E, A>) -> Self {
111        Self {
112            step: Box::new(move || {
113                self.pull().flat_map(move |opt| match opt {
114                    Some((a, rest)) => Io::pure(Some((a, rest.concat(other)))),
115                    None => other.pull(),
116                })
117            }),
118        }
119    }
120
121    /// Take at most `n` elements.
122    #[must_use]
123    pub fn take(self, n: usize) -> Self {
124        match n {
125            0 => Self::empty(),
126            _ => Self {
127                step: Box::new(move || {
128                    self.pull().map(move |opt| {
129                        opt.map(|(a, rest)| (a, rest.take(n - 1)))
130                    })
131                }),
132            },
133        }
134    }
135
136    /// Collapse the stream into a single value via a folding function.
137    ///
138    /// This is the primary way to "run" a stream, producing an `Io`.
139    #[must_use]
140    pub fn fold<B: Send + 'static>(self, init: B, f: Arc<dyn Fn(B, A) -> B + Send + Sync>) -> Io<E, B> {
141        self.pull().flat_map(move |opt| match opt {
142            None => Io::pure(init),
143            Some((a, rest)) => {
144                let next = f(init, a);
145                rest.fold(next, f)
146            }
147        })
148    }
149
150    /// Collect all stream elements into a `Vec`.
151    #[must_use]
152    pub fn collect(self) -> Io<E, Vec<A>> {
153        self.fold(Vec::new(), Arc::new(|acc, a| {
154            acc.into_iter().chain(std::iter::once(a)).collect()
155        }))
156    }
157
158    /// Pull the next element from the stream.
159    ///
160    /// Returns the `Io` that produces the next step.
161    fn pull(self) -> Step<E, A> {
162        (self.step)()
163    }
164}