sodium_rust/stream.rs
1use crate::cell::Cell;
2use crate::impl_::dep::Dep;
3use crate::impl_::lambda::{lambda1, lambda2};
4use crate::impl_::lambda::{IsLambda1, IsLambda2, IsLambda3, IsLambda4, IsLambda5, IsLambda6};
5use crate::impl_::stream::Stream as StreamImpl;
6use crate::listener::Listener;
7use crate::sodium_ctx::SodiumCtx;
8use crate::Lazy;
9
/// Represents a stream of discrete events/firings, each carrying a
/// value of type `A`.
///
/// Other FRP systems call the same concept an _event_ (made up of
/// _event occurrences_), an _event stream_, an _observable_, or a
/// _signal_.
pub struct Stream<A> {
    /// Underlying implementation object; the methods on `Stream`
    /// delegate to it.
    pub impl_: StreamImpl<A>,
}
19
20impl<A> Clone for Stream<A> {
21 fn clone(&self) -> Self {
22 Stream {
23 impl_: self.impl_.clone(),
24 }
25 }
26}
27
28impl<A: Clone + Send + 'static> Stream<Option<A>> {
29 /// Return a `Stream` that only outputs events that have present
30 /// values, removing the `Option` wrapper and discarding empty
31 /// values.
32 pub fn filter_option(&self) -> Stream<A> {
33 self.filter(|a: &Option<A>| a.is_some())
34 .map(|a: &Option<A>| a.clone().unwrap())
35 }
36}
37
38impl<
39 A: Clone + Send + Sync + 'static,
40 COLLECTION: IntoIterator<Item = A> + Clone + Send + 'static,
41 > Stream<COLLECTION>
42{
43 /// Flatten a `Stream` of a collection of `A` into a `Stream` of
44 /// single `A`s.
45 pub fn split(&self) -> Stream<A> {
46 Stream {
47 impl_: self.impl_.split(),
48 }
49 }
50}
51
52impl<A: Clone + Send + 'static> Stream<A> {
53 /// Create a `Stream` that will never fire.
54 pub fn new(sodium_ctx: &SodiumCtx) -> Stream<A> {
55 Stream {
56 impl_: StreamImpl::new(&sodium_ctx.impl_),
57 }
58 }
59
60 #[doc(hidden)]
61 // use as dependency to lambda1, lambda2, etc.
62 pub fn to_dep(&self) -> Dep {
63 self.impl_.to_dep()
64 }
65
66 /// Return a stream whose events are the result of the combination
67 /// of the event value and the current value of the cell using the
68 /// specified function.
69 ///
70 /// Note that there is an implicit delay: state updates caused by
71 /// event firings being held with [`Stream::hold`] don't become
72 /// visible as the cell's current value until the following
73 /// transaction. To put this another way, `snapshot` always sees
74 /// the value of a cell as it wass before any state changes from
75 /// the current transaction.
76 pub fn snapshot<
77 B: Clone + Send + 'static,
78 C: Clone + Send + 'static,
79 FN: IsLambda2<A, B, C> + Send + Sync + 'static,
80 >(
81 &self,
82 cb: &Cell<B>,
83 f: FN,
84 ) -> Stream<C> {
85 Stream {
86 impl_: self.impl_.snapshot(&cb.impl_, f),
87 }
88 }
89
90 /// A variant of [`snapshot`][Stream::snapshot] that captures the
91 /// cell's value at the time of the event firing, ignoring the
92 /// stream's value.
93 pub fn snapshot1<B: Send + Clone + 'static>(&self, cb: &Cell<B>) -> Stream<B> {
94 self.snapshot(cb, |_a: &A, b: &B| b.clone())
95 }
96
97 /// A variant of [`snapshot`][Stream::snapshot] that captures the
98 /// value of two cells.
99 pub fn snapshot3<
100 B: Send + Clone + 'static,
101 C: Send + Clone + 'static,
102 D: Send + Clone + 'static,
103 FN: IsLambda3<A, B, C, D> + Send + Sync + 'static,
104 >(
105 &self,
106 cb: &Cell<B>,
107 cc: &Cell<C>,
108 mut f: FN,
109 ) -> Stream<D> {
110 let mut deps = if let Some(deps2) = f.deps_op() {
111 deps2.clone()
112 } else {
113 Vec::new()
114 };
115 let cc = cc.clone();
116 deps.push(cc.to_dep());
117 self.snapshot(
118 cb,
119 lambda2(move |a: &A, b: &B| f.call(a, b, &cc.sample()), deps),
120 )
121 }
122
123 /// A variant of [`snapshot`][Stream::snapshot] that captures the
124 /// value of three cells.
125 pub fn snapshot4<
126 B: Send + Clone + 'static,
127 C: Send + Clone + 'static,
128 D: Send + Clone + 'static,
129 E: Send + Clone + 'static,
130 FN: IsLambda4<A, B, C, D, E> + Send + Sync + 'static,
131 >(
132 &self,
133 cb: &Cell<B>,
134 cc: &Cell<C>,
135 cd: &Cell<D>,
136 mut f: FN,
137 ) -> Stream<E> {
138 let mut deps = if let Some(deps2) = f.deps_op() {
139 deps2.clone()
140 } else {
141 Vec::new()
142 };
143 let cc = cc.clone();
144 let cd = cd.clone();
145 deps.push(cc.to_dep());
146 deps.push(cd.to_dep());
147 self.snapshot(
148 cb,
149 lambda2(
150 move |a: &A, b: &B| f.call(a, b, &cc.sample(), &cd.sample()),
151 deps,
152 ),
153 )
154 }
155
156 /// A variant of [`snapshot`][Stream::snapshot] that captures the
157 /// value of four cells.
158 pub fn snapshot5<
159 B: Send + Clone + 'static,
160 C: Send + Clone + 'static,
161 D: Send + Clone + 'static,
162 E: Send + Clone + 'static,
163 F: Send + Clone + 'static,
164 FN: IsLambda5<A, B, C, D, E, F> + Send + Sync + 'static,
165 >(
166 &self,
167 cb: &Cell<B>,
168 cc: &Cell<C>,
169 cd: &Cell<D>,
170 ce: &Cell<E>,
171 mut f: FN,
172 ) -> Stream<F> {
173 let mut deps = if let Some(deps2) = f.deps_op() {
174 deps2.clone()
175 } else {
176 Vec::new()
177 };
178 let cc = cc.clone();
179 let cd = cd.clone();
180 let ce = ce.clone();
181 deps.push(cc.to_dep());
182 deps.push(cd.to_dep());
183 deps.push(ce.to_dep());
184 self.snapshot(
185 cb,
186 lambda2(
187 move |a: &A, b: &B| f.call(a, b, &cc.sample(), &cd.sample(), &ce.sample()),
188 deps,
189 ),
190 )
191 }
192
193 /// A variant of [`snapshot`][Stream::snapshot] that captures the
194 /// value of five cells.
195 pub fn snapshot6<
196 B: Send + Clone + 'static,
197 C: Send + Clone + 'static,
198 D: Send + Clone + 'static,
199 E: Send + Clone + 'static,
200 F: Send + Clone + 'static,
201 G: Send + Clone + 'static,
202 FN: IsLambda6<A, B, C, D, E, F, G> + Send + Sync + 'static,
203 >(
204 &self,
205 cb: &Cell<B>,
206 cc: &Cell<C>,
207 cd: &Cell<D>,
208 ce: &Cell<E>,
209 cf: &Cell<F>,
210 mut f: FN,
211 ) -> Stream<G> {
212 let mut deps = if let Some(deps2) = f.deps_op() {
213 deps2.clone()
214 } else {
215 Vec::new()
216 };
217 let cc = cc.clone();
218 let cd = cd.clone();
219 let ce = ce.clone();
220 let cf = cf.clone();
221 deps.push(cc.to_dep());
222 deps.push(cd.to_dep());
223 deps.push(ce.to_dep());
224 deps.push(cf.to_dep());
225 self.snapshot(
226 cb,
227 lambda2(
228 move |a: &A, b: &B| {
229 f.call(a, b, &cc.sample(), &cd.sample(), &ce.sample(), &cf.sample())
230 },
231 deps,
232 ),
233 )
234 }
235
236 /// Transform this `Stream`'s event values with the supplied
237 /// function.
238 ///
239 /// The supplied function may construct FRP logic or use
240 /// [`Cell::sample`], in which case it's equivalent to
241 /// [`snapshot`][Stream::snapshot]ing the cell. In addition, the
242 /// function must be referentially transparent.
243 pub fn map<B: Send + Clone + 'static, FN: IsLambda1<A, B> + Send + Sync + 'static>(
244 &self,
245 f: FN,
246 ) -> Stream<B> {
247 Stream {
248 impl_: self.impl_.map(f),
249 }
250 }
251
252 /// Transform this `Stream`'s event values into the specified constant value.
253 pub fn map_to<B: Send + Sync + Clone + 'static>(&self, b: B) -> Stream<B> {
254 self.map(move |_: &A| b.clone())
255 }
256
257 /// Return a `Stream` that only outputs events for which the predicate returns `true`.
258 pub fn filter<PRED: IsLambda1<A, bool> + Send + Sync + 'static>(
259 &self,
260 pred: PRED,
261 ) -> Stream<A> {
262 Stream {
263 impl_: self.impl_.filter(pred),
264 }
265 }
266
267 /// Variant of [`merge`][Stream::merge] that merges two streams.
268 ///
269 /// In the case where two events are simultaneous (both in the
270 /// same transaction), the event taken from `self` takes
271 /// precedenc, and the event from `s2` will be dropped.
272 ///
273 /// If you want to specify your own combining function use
274 /// [`merge`][Stream::merge]. This function is equivalent to
275 /// `s1.merge(s2, |l, _r| l)`. The name `or_else` is used instead
276 /// of `merge` to make it clear that care should be taken because
277 /// events can be dropped.
278 pub fn or_else(&self, s2: &Stream<A>) -> Stream<A> {
279 self.merge(s2, |lhs: &A, _rhs: &A| lhs.clone())
280 }
281
282 /// Merge two streams of the same type into one, so that events on
283 /// either input appear on the returned stream.
284 ///
285 /// If the events are simultaneous (that is, one event from `self`
286 /// and one from `s2` occur in the same transaction), combine them
287 /// into one using the specified combining function so that the
288 /// returned stream is guaranteed only ever to have one event per
289 /// transaction. The event from `self` will appear at the left
290 /// input of the combining function, and the event from `s2` will
291 /// appear at the right.
292 pub fn merge<FN: IsLambda2<A, A, A> + Send + Sync + 'static>(
293 &self,
294 s2: &Stream<A>,
295 f: FN,
296 ) -> Stream<A> {
297 Stream {
298 impl_: self.impl_.merge(&s2.impl_, f),
299 }
300 }
301
302 /// Returns a cell with the specified initial value, which is
303 /// updated by this stream's event values.
304 pub fn hold(&self, a: A) -> Cell<A> {
305 Cell {
306 impl_: self.impl_.hold(a),
307 }
308 }
309
310 /// A variant of [`hold`][Stream::hold] that uses an initial value
311 /// returned by [`Cell::sample_lazy`].
312 pub fn hold_lazy(&self, a: Lazy<A>) -> Cell<A> {
313 Cell {
314 impl_: self.impl_.hold_lazy(a),
315 }
316 }
317
318 /// Return a stream that only outputs events from the input stream
319 /// when the specified cell's value is true.
320 pub fn gate(&self, cpred: &Cell<bool>) -> Stream<A> {
321 let cpred = cpred.clone();
322 let cpred_dep = cpred.to_dep();
323 self.filter(lambda1(move |_: &A| cpred.sample(), vec![cpred_dep]))
324 }
325
326 /// Return a stream that outputs only one value, which is the next
327 /// event of the input stream, starting from the transaction in
328 /// `once` was invoked.
329 pub fn once(&self) -> Stream<A> {
330 Stream {
331 impl_: self.impl_.once(),
332 }
333 }
334
335 /// Transform an event with a generalized state loop (a Mealy
336 /// machine). The function is passed the input and the old state
337 /// and returns the new state and output value.
338 pub fn collect<B, S, F>(&self, init_state: S, f: F) -> Stream<B>
339 where
340 B: Send + Clone + 'static,
341 S: Send + Clone + 'static,
342 F: IsLambda2<A, S, (B, S)> + Send + Sync + 'static,
343 {
344 self.collect_lazy(Lazy::new(move || init_state.clone()), f)
345 }
346
347 /// A variant of [`collect`][Stream::collect] that takes an
348 /// initial state that is returned by [`Cell::sample_lazy`].
349 pub fn collect_lazy<B, S, F>(&self, init_state: Lazy<S>, f: F) -> Stream<B>
350 where
351 B: Send + Clone + 'static,
352 S: Send + Clone + 'static,
353 F: IsLambda2<A, S, (B, S)> + Send + Sync + 'static,
354 {
355 Stream {
356 impl_: self.impl_.collect_lazy(init_state, f),
357 }
358 }
359
360 /// Accumulate on an input event, outputting the new state each time.
361 ///
362 /// As each event is received, the accumulating function `f` is
363 /// called with the current state and the new event value. The
364 /// accumulating function may construct FRP logic or use
365 /// [`Cell::sample`], in which case it's equivalent to
366 /// [`snapshot`][Stream::snapshot]ing the cell. In additon, the
367 /// function must be referentially transparent.
368 pub fn accum<S, F>(&self, init_state: S, f: F) -> Cell<S>
369 where
370 S: Send + Clone + 'static,
371 F: IsLambda2<A, S, S> + Send + Sync + 'static,
372 {
373 self.accum_lazy(Lazy::new(move || init_state.clone()), f)
374 }
375
376 /// A variant of [`accum`][Stream::accum] that takes an initial
377 /// state returned by [`Cell::sample_lazy`].
378 pub fn accum_lazy<S, F>(&self, init_state: Lazy<S>, f: F) -> Cell<S>
379 where
380 S: Send + Clone + 'static,
381 F: IsLambda2<A, S, S> + Send + Sync + 'static,
382 {
383 Cell {
384 impl_: self.impl_.accum_lazy(init_state, f),
385 }
386 }
387
388 /// A variant of [`listen`][Stream::listen] that will deregister
389 /// the listener automatically if the listener is
390 /// garbage-collected.
391 ///
392 /// With [`listen`][Stream::listen] the listener is only
393 /// deregistered if [`Listener::unlisten`] is called explicitly.
394 pub fn listen_weak<K: IsLambda1<A, ()> + Send + Sync + 'static>(&self, k: K) -> Listener {
395 Listener {
396 impl_: self.impl_.listen_weak(k),
397 }
398 }
399
400 /// Listen for events/firings on this stream.
401 ///
402 /// This is the observer pattern. The returned [`Listener`] has an
403 /// [`unlisten`][Listener::unlisten] method to cause the listener
404 /// to be removed. This is an operational mechanism for
405 /// interfacing between the world of I/O and FRP.
406 ///
407 /// The handler function for this listener should make no
408 /// assumptions about what thread it will be called on, and the
409 /// handler should not block. It also is not allowed to use
410 /// [`CellSink::send`][crate::CellSink::send] or
411 /// [`StreamSink::send`][crate::StreamSink::send] in the handler.
412 pub fn listen<K: IsLambda1<A, ()> + Send + Sync + 'static>(&self, k: K) -> Listener {
413 Listener {
414 impl_: self.impl_.listen(k),
415 }
416 }
417}