comp_cat_rs/effect/
stream.rs1use std::sync::Arc;
13
14use super::io::Io;
15use crate::foundation::Kind;
16
/// Type-level tag standing for the `Stream<E, _>` type constructor with the
/// error type `E` fixed.
///
/// The only field is a `PhantomData<E>`, so the tag stores no data of its
/// own; it exists to carry the [`Kind`] implementation.
pub struct StreamK<E> {
    // Ties the otherwise-unused parameter `E` to the struct.
    _phantom: core::marker::PhantomData<E>,
}
21
22impl<E: 'static> Kind for StreamK<E> {
23 type F<A> = Stream<E, A>;
24}
25
26type Step<E, A> = Io<E, Option<(A, Stream<E, A>)>>;
28
29type UnfoldFn<E, A, S> = Arc<dyn Fn(S) -> Io<E, Option<(A, S)>> + Send + Sync>;
31
32pub struct Stream<E, A> {
37 step: Box<dyn FnOnce() -> Step<E, A> + Send>,
38}
39
40impl<E: Send + 'static, A: Send + 'static> Stream<E, A> {
41 #[must_use]
43 pub fn empty() -> Self {
44 Self {
45 step: Box::new(|| Io::pure(None)),
46 }
47 }
48
49 #[must_use]
51 pub fn emit(a: A) -> Self {
52 Self {
53 step: Box::new(move || Io::pure(Some((a, Self::empty())))),
54 }
55 }
56
57 #[must_use]
59 pub fn from_vec(items: Vec<A>) -> Self {
60 items
61 .into_iter()
62 .rev()
63 .fold(Self::empty(), |rest, item| Self {
64 step: Box::new(move || Io::pure(Some((item, rest)))),
65 })
66 }
67
68 #[must_use]
73 pub fn unfold<S: Send + 'static>(
74 init: S,
75 step: UnfoldFn<E, A, S>,
76 ) -> Self {
77 let step_clone = Arc::clone(&step);
78 Self {
79 step: Box::new(move || {
80 step(init).map(move |opt| {
81 opt.map(|(a, next_state)| (a, Self::unfold(next_state, step_clone)))
82 })
83 }),
84 }
85 }
86
87 #[must_use]
89 pub fn from_io(io: Io<E, A>) -> Self {
90 Self {
91 step: Box::new(move || io.map(|a| Some((a, Self::empty())))),
92 }
93 }
94
95 #[must_use]
97 pub fn map<B: Send + 'static>(self, f: Arc<dyn Fn(A) -> B + Send + Sync>) -> Stream<E, B> {
98 let f_clone = Arc::clone(&f);
99 Stream {
100 step: Box::new(move || {
101 self.pull().map(move |opt| {
102 opt.map(|(a, rest)| (f(a), rest.map(f_clone)))
103 })
104 }),
105 }
106 }
107
108 #[must_use]
110 pub fn concat(self, other: Stream<E, A>) -> Self {
111 Self {
112 step: Box::new(move || {
113 self.pull().flat_map(move |opt| match opt {
114 Some((a, rest)) => Io::pure(Some((a, rest.concat(other)))),
115 None => other.pull(),
116 })
117 }),
118 }
119 }
120
121 #[must_use]
123 pub fn take(self, n: usize) -> Self {
124 match n {
125 0 => Self::empty(),
126 _ => Self {
127 step: Box::new(move || {
128 self.pull().map(move |opt| {
129 opt.map(|(a, rest)| (a, rest.take(n - 1)))
130 })
131 }),
132 },
133 }
134 }
135
136 #[must_use]
140 pub fn fold<B: Send + 'static>(self, init: B, f: Arc<dyn Fn(B, A) -> B + Send + Sync>) -> Io<E, B> {
141 self.pull().flat_map(move |opt| match opt {
142 None => Io::pure(init),
143 Some((a, rest)) => {
144 let next = f(init, a);
145 rest.fold(next, f)
146 }
147 })
148 }
149
150 #[must_use]
152 pub fn collect(self) -> Io<E, Vec<A>> {
153 self.fold(Vec::new(), Arc::new(|acc, a| {
154 acc.into_iter().chain(std::iter::once(a)).collect()
155 }))
156 }
157
158 fn pull(self) -> Step<E, A> {
162 (self.step)()
163 }
164}