effect_rs/
stream.rs

1use crate::core::*;
2use futures::stream::{BoxStream, StreamExt};
3use std::sync::Arc;
4
5/// A stream of effects.
6/// R: Environment
7/// E: Error
8/// A: Item type
9// Type alias for inner stream function
10type StreamInner<R, E, A> = dyn Fn(EnvRef<R>, Ctx) -> BoxStream<'static, Exit<E, A>> + Send + Sync;
11
12pub struct EffectStream<R, E, A> {
13    pub(crate) inner: Arc<StreamInner<R, E, A>>,
14}
15
16impl<R, E, A> Clone for EffectStream<R, E, A> {
17    fn clone(&self) -> Self {
18        Self {
19            inner: self.inner.clone(),
20        }
21    }
22}
23
24impl<R, E, A> EffectStream<R, E, A>
25where
26    R: 'static + Send + Sync + Clone,
27    E: 'static + Send + Sync + Clone,
28    A: 'static + Send + Sync + Clone,
29{
30    /// Creates a stream from an iterator.
31    #[allow(clippy::should_implement_trait)]
32    pub fn from_iter<I>(iter: I) -> Self
33    where
34        I: IntoIterator<Item = A> + Send + Sync + 'static,
35        I::IntoIter: Send,
36    {
37        // Capture iterator. Note: Iterator must be Send.
38        // For simplicity, we convert to Vec first in this MVP to avoid lifetime issues with IntoIter in BoxStream
39        let items: Vec<A> = iter.into_iter().collect();
40
41        Self {
42            inner: Arc::new(move |_, _| {
43                let items = items.clone();
44                futures::stream::iter(items.into_iter().map(Exit::Success)).boxed()
45            }),
46        }
47    }
48
49    /// Maps over elements of the stream.
50    pub fn map<B, F>(self, f: F) -> EffectStream<R, E, B>
51    where
52        F: Fn(A) -> B + Send + Sync + 'static + Clone,
53        B: Send + Sync + Clone + 'static,
54    {
55        EffectStream {
56            inner: Arc::new(move |env, ctx| {
57                let stream = (self.inner)(env, ctx);
58                let f = f.clone();
59                stream
60                    .map(move |exit| match exit {
61                        Exit::Success(a) => Exit::Success(f(a)),
62                        Exit::Failure(c) => Exit::Failure(c),
63                    })
64                    .boxed()
65            }),
66        }
67    }
68
69    /// Runs the stream and collects all values into a Vec.
70    pub fn run_collect(self) -> Effect<R, E, Vec<A>> {
71        Effect {
72            inner: Arc::new(move |env, ctx| {
73                let stream = (self.inner)(env, ctx);
74                Box::pin(async move {
75                    let mut results = Vec::new();
76                    let mut stream = stream;
77                    while let Some(exit) = stream.next().await {
78                        match exit {
79                            Exit::Success(a) => results.push(a),
80                            Exit::Failure(c) => return Exit::Failure(c),
81                        }
82                    }
83                    Exit::Success(results)
84                })
85            }),
86        }
87    }
88    pub fn merge(self, other: EffectStream<R, E, A>) -> EffectStream<R, E, A> {
89        EffectStream {
90            inner: Arc::new(move |env, ctx| {
91                let s1 = (self.inner)(env.clone(), ctx.clone());
92                let s2 = (other.inner)(env, ctx);
93                futures::stream::select(s1, s2).boxed()
94            }),
95        }
96    }
97
98    pub fn map_par<B, F>(self, concurrency: usize, f: F) -> EffectStream<R, E, B>
99    where
100        F: Fn(A) -> Effect<R, E, B> + Send + Sync + 'static + Clone,
101        B: Send + Sync + Clone + 'static,
102        R: 'static + Send + Sync + Clone,
103        E: 'static + Send + Sync + Clone,
104    {
105        EffectStream {
106            inner: Arc::new(move |env, ctx| {
107                let stream = (self.inner)(env.clone(), ctx.clone());
108                let f = f.clone();
109                let env = env.clone();
110                let ctx = ctx.clone();
111
112                stream
113                    .map(move |exit| {
114                        let f = f.clone();
115                        let env = env.clone();
116                        let ctx = ctx.clone();
117                        async move {
118                            match exit {
119                                Exit::Success(a) => {
120                                    let eff = f(a);
121                                    // Now we can run it!
122                                    (eff.inner)(env, ctx).await
123                                }
124                                Exit::Failure(c) => Exit::Failure(c),
125                            }
126                        }
127                    })
128                    .buffer_unordered(concurrency)
129                    .boxed()
130            }),
131        }
132    }
133
134    // Attempting Ctx refactor later. For now, let's implement combinators that don't need Ctx.
135
136    pub fn buffer(self, capacity: usize) -> EffectStream<R, E, A> {
137        EffectStream {
138            inner: Arc::new(move |env, ctx| {
139                let mut stream = (self.inner)(env, ctx);
140                let (tx, rx) = tokio::sync::mpsc::channel(capacity);
141
142                // Spawn producer in background
143                tokio::spawn(async move {
144                    while let Some(item) = stream.next().await {
145                        if tx.send(item).await.is_err() {
146                            break; // Receiver dropped
147                        }
148                    }
149                });
150
151                // Receiver stream
152                // We need to convert Receiver to Stream
153                tokio_stream::wrappers::ReceiverStream::new(rx).boxed()
154            }),
155        }
156    }
157
158    pub fn take(self, n: usize) -> EffectStream<R, E, A> {
159        EffectStream {
160            inner: Arc::new(move |env, ctx| {
161                let stream = (self.inner)(env, ctx);
162                stream.take(n).boxed()
163            }),
164        }
165    }
166
167    pub fn filter<F>(self, f: F) -> EffectStream<R, E, A>
168    where
169        F: Fn(&A) -> bool + Send + Sync + 'static + Clone,
170    {
171        EffectStream {
172            inner: Arc::new(move |env, ctx| {
173                let stream = (self.inner)(env, ctx);
174                let f = f.clone();
175                stream
176                    .filter_map(move |exit| {
177                        let f = f.clone();
178                        async move {
179                            match exit {
180                                Exit::Success(a) => {
181                                    if f(&a) {
182                                        Some(Exit::Success(a))
183                                    } else {
184                                        None
185                                    }
186                                }
187                                Exit::Failure(c) => Some(Exit::Failure(c)), // Failure propagates
188                            }
189                        }
190                    })
191                    .boxed()
192            }),
193        }
194    }
195}