1use crate::core::*;
2use futures::stream::{BoxStream, StreamExt};
3use std::sync::Arc;
4
5type StreamInner<R, E, A> = dyn Fn(EnvRef<R>, Ctx) -> BoxStream<'static, Exit<E, A>> + Send + Sync;
11
12pub struct EffectStream<R, E, A> {
13 pub(crate) inner: Arc<StreamInner<R, E, A>>,
14}
15
16impl<R, E, A> Clone for EffectStream<R, E, A> {
17 fn clone(&self) -> Self {
18 Self {
19 inner: self.inner.clone(),
20 }
21 }
22}
23
24impl<R, E, A> EffectStream<R, E, A>
25where
26 R: 'static + Send + Sync + Clone,
27 E: 'static + Send + Sync + Clone,
28 A: 'static + Send + Sync + Clone,
29{
30 #[allow(clippy::should_implement_trait)]
32 pub fn from_iter<I>(iter: I) -> Self
33 where
34 I: IntoIterator<Item = A> + Send + Sync + 'static,
35 I::IntoIter: Send,
36 {
37 let items: Vec<A> = iter.into_iter().collect();
40
41 Self {
42 inner: Arc::new(move |_, _| {
43 let items = items.clone();
44 futures::stream::iter(items.into_iter().map(Exit::Success)).boxed()
45 }),
46 }
47 }
48
49 pub fn map<B, F>(self, f: F) -> EffectStream<R, E, B>
51 where
52 F: Fn(A) -> B + Send + Sync + 'static + Clone,
53 B: Send + Sync + Clone + 'static,
54 {
55 EffectStream {
56 inner: Arc::new(move |env, ctx| {
57 let stream = (self.inner)(env, ctx);
58 let f = f.clone();
59 stream
60 .map(move |exit| match exit {
61 Exit::Success(a) => Exit::Success(f(a)),
62 Exit::Failure(c) => Exit::Failure(c),
63 })
64 .boxed()
65 }),
66 }
67 }
68
69 pub fn run_collect(self) -> Effect<R, E, Vec<A>> {
71 Effect {
72 inner: Arc::new(move |env, ctx| {
73 let stream = (self.inner)(env, ctx);
74 Box::pin(async move {
75 let mut results = Vec::new();
76 let mut stream = stream;
77 while let Some(exit) = stream.next().await {
78 match exit {
79 Exit::Success(a) => results.push(a),
80 Exit::Failure(c) => return Exit::Failure(c),
81 }
82 }
83 Exit::Success(results)
84 })
85 }),
86 }
87 }
88 pub fn merge(self, other: EffectStream<R, E, A>) -> EffectStream<R, E, A> {
89 EffectStream {
90 inner: Arc::new(move |env, ctx| {
91 let s1 = (self.inner)(env.clone(), ctx.clone());
92 let s2 = (other.inner)(env, ctx);
93 futures::stream::select(s1, s2).boxed()
94 }),
95 }
96 }
97
98 pub fn map_par<B, F>(self, concurrency: usize, f: F) -> EffectStream<R, E, B>
99 where
100 F: Fn(A) -> Effect<R, E, B> + Send + Sync + 'static + Clone,
101 B: Send + Sync + Clone + 'static,
102 R: 'static + Send + Sync + Clone,
103 E: 'static + Send + Sync + Clone,
104 {
105 EffectStream {
106 inner: Arc::new(move |env, ctx| {
107 let stream = (self.inner)(env.clone(), ctx.clone());
108 let f = f.clone();
109 let env = env.clone();
110 let ctx = ctx.clone();
111
112 stream
113 .map(move |exit| {
114 let f = f.clone();
115 let env = env.clone();
116 let ctx = ctx.clone();
117 async move {
118 match exit {
119 Exit::Success(a) => {
120 let eff = f(a);
121 (eff.inner)(env, ctx).await
123 }
124 Exit::Failure(c) => Exit::Failure(c),
125 }
126 }
127 })
128 .buffer_unordered(concurrency)
129 .boxed()
130 }),
131 }
132 }
133
134 pub fn buffer(self, capacity: usize) -> EffectStream<R, E, A> {
137 EffectStream {
138 inner: Arc::new(move |env, ctx| {
139 let mut stream = (self.inner)(env, ctx);
140 let (tx, rx) = tokio::sync::mpsc::channel(capacity);
141
142 tokio::spawn(async move {
144 while let Some(item) = stream.next().await {
145 if tx.send(item).await.is_err() {
146 break; }
148 }
149 });
150
151 tokio_stream::wrappers::ReceiverStream::new(rx).boxed()
154 }),
155 }
156 }
157
158 pub fn take(self, n: usize) -> EffectStream<R, E, A> {
159 EffectStream {
160 inner: Arc::new(move |env, ctx| {
161 let stream = (self.inner)(env, ctx);
162 stream.take(n).boxed()
163 }),
164 }
165 }
166
167 pub fn filter<F>(self, f: F) -> EffectStream<R, E, A>
168 where
169 F: Fn(&A) -> bool + Send + Sync + 'static + Clone,
170 {
171 EffectStream {
172 inner: Arc::new(move |env, ctx| {
173 let stream = (self.inner)(env, ctx);
174 let f = f.clone();
175 stream
176 .filter_map(move |exit| {
177 let f = f.clone();
178 async move {
179 match exit {
180 Exit::Success(a) => {
181 if f(&a) {
182 Some(Exit::Success(a))
183 } else {
184 None
185 }
186 }
187 Exit::Failure(c) => Some(Exit::Failure(c)), }
189 }
190 })
191 .boxed()
192 }),
193 }
194 }
195}