sodium_rust/
stream.rs

1use crate::cell::Cell;
2use crate::impl_::dep::Dep;
3use crate::impl_::lambda::{lambda1, lambda2};
4use crate::impl_::lambda::{IsLambda1, IsLambda2, IsLambda3, IsLambda4, IsLambda5, IsLambda6};
5use crate::impl_::stream::Stream as StreamImpl;
6use crate::listener::Listener;
7use crate::sodium_ctx::SodiumCtx;
8use crate::Lazy;
9
10/// Represents a stream of discrete events/firings containing values
11/// of type `A`.
12///
13/// Also known in other FRP systems as an _event_ (which would contain
14/// _event occurrences_), an _event stream_, an _observable_, or a
15/// _signal_.
16pub struct Stream<A> {
17    pub impl_: StreamImpl<A>,
18}
19
20impl<A> Clone for Stream<A> {
21    fn clone(&self) -> Self {
22        Stream {
23            impl_: self.impl_.clone(),
24        }
25    }
26}
27
28impl<A: Clone + Send + 'static> Stream<Option<A>> {
29    /// Return a `Stream` that only outputs events that have present
30    /// values, removing the `Option` wrapper and discarding empty
31    /// values.
32    pub fn filter_option(&self) -> Stream<A> {
33        self.filter(|a: &Option<A>| a.is_some())
34            .map(|a: &Option<A>| a.clone().unwrap())
35    }
36}
37
38impl<
39        A: Clone + Send + Sync + 'static,
40        COLLECTION: IntoIterator<Item = A> + Clone + Send + 'static,
41    > Stream<COLLECTION>
42{
43    /// Flatten a `Stream` of a collection of `A` into a `Stream` of
44    /// single `A`s.
45    pub fn split(&self) -> Stream<A> {
46        Stream {
47            impl_: self.impl_.split(),
48        }
49    }
50}
51
52impl<A: Clone + Send + 'static> Stream<A> {
53    /// Create a `Stream` that will never fire.
54    pub fn new(sodium_ctx: &SodiumCtx) -> Stream<A> {
55        Stream {
56            impl_: StreamImpl::new(&sodium_ctx.impl_),
57        }
58    }
59
60    #[doc(hidden)]
61    // use as dependency to lambda1, lambda2, etc.
62    pub fn to_dep(&self) -> Dep {
63        self.impl_.to_dep()
64    }
65
66    /// Return a stream whose events are the result of the combination
67    /// of the event value and the current value of the cell using the
68    /// specified function.
69    ///
70    /// Note that there is an implicit delay: state updates caused by
71    /// event firings being held with [`Stream::hold`] don't become
72    /// visible as the cell's current value until the following
73    /// transaction. To put this another way, `snapshot` always sees
74    /// the value of a cell as it wass before any state changes from
75    /// the current transaction.
76    pub fn snapshot<
77        B: Clone + Send + 'static,
78        C: Clone + Send + 'static,
79        FN: IsLambda2<A, B, C> + Send + Sync + 'static,
80    >(
81        &self,
82        cb: &Cell<B>,
83        f: FN,
84    ) -> Stream<C> {
85        Stream {
86            impl_: self.impl_.snapshot(&cb.impl_, f),
87        }
88    }
89
90    /// A variant of [`snapshot`][Stream::snapshot] that captures the
91    /// cell's value at the time of the event firing, ignoring the
92    /// stream's value.
93    pub fn snapshot1<B: Send + Clone + 'static>(&self, cb: &Cell<B>) -> Stream<B> {
94        self.snapshot(cb, |_a: &A, b: &B| b.clone())
95    }
96
97    /// A variant of [`snapshot`][Stream::snapshot] that captures the
98    /// value of two cells.
99    pub fn snapshot3<
100        B: Send + Clone + 'static,
101        C: Send + Clone + 'static,
102        D: Send + Clone + 'static,
103        FN: IsLambda3<A, B, C, D> + Send + Sync + 'static,
104    >(
105        &self,
106        cb: &Cell<B>,
107        cc: &Cell<C>,
108        mut f: FN,
109    ) -> Stream<D> {
110        let mut deps = if let Some(deps2) = f.deps_op() {
111            deps2.clone()
112        } else {
113            Vec::new()
114        };
115        let cc = cc.clone();
116        deps.push(cc.to_dep());
117        self.snapshot(
118            cb,
119            lambda2(move |a: &A, b: &B| f.call(a, b, &cc.sample()), deps),
120        )
121    }
122
123    /// A variant of [`snapshot`][Stream::snapshot] that captures the
124    /// value of three cells.
125    pub fn snapshot4<
126        B: Send + Clone + 'static,
127        C: Send + Clone + 'static,
128        D: Send + Clone + 'static,
129        E: Send + Clone + 'static,
130        FN: IsLambda4<A, B, C, D, E> + Send + Sync + 'static,
131    >(
132        &self,
133        cb: &Cell<B>,
134        cc: &Cell<C>,
135        cd: &Cell<D>,
136        mut f: FN,
137    ) -> Stream<E> {
138        let mut deps = if let Some(deps2) = f.deps_op() {
139            deps2.clone()
140        } else {
141            Vec::new()
142        };
143        let cc = cc.clone();
144        let cd = cd.clone();
145        deps.push(cc.to_dep());
146        deps.push(cd.to_dep());
147        self.snapshot(
148            cb,
149            lambda2(
150                move |a: &A, b: &B| f.call(a, b, &cc.sample(), &cd.sample()),
151                deps,
152            ),
153        )
154    }
155
156    /// A variant of [`snapshot`][Stream::snapshot] that captures the
157    /// value of four cells.
158    pub fn snapshot5<
159        B: Send + Clone + 'static,
160        C: Send + Clone + 'static,
161        D: Send + Clone + 'static,
162        E: Send + Clone + 'static,
163        F: Send + Clone + 'static,
164        FN: IsLambda5<A, B, C, D, E, F> + Send + Sync + 'static,
165    >(
166        &self,
167        cb: &Cell<B>,
168        cc: &Cell<C>,
169        cd: &Cell<D>,
170        ce: &Cell<E>,
171        mut f: FN,
172    ) -> Stream<F> {
173        let mut deps = if let Some(deps2) = f.deps_op() {
174            deps2.clone()
175        } else {
176            Vec::new()
177        };
178        let cc = cc.clone();
179        let cd = cd.clone();
180        let ce = ce.clone();
181        deps.push(cc.to_dep());
182        deps.push(cd.to_dep());
183        deps.push(ce.to_dep());
184        self.snapshot(
185            cb,
186            lambda2(
187                move |a: &A, b: &B| f.call(a, b, &cc.sample(), &cd.sample(), &ce.sample()),
188                deps,
189            ),
190        )
191    }
192
193    /// A variant of [`snapshot`][Stream::snapshot] that captures the
194    /// value of five cells.
195    pub fn snapshot6<
196        B: Send + Clone + 'static,
197        C: Send + Clone + 'static,
198        D: Send + Clone + 'static,
199        E: Send + Clone + 'static,
200        F: Send + Clone + 'static,
201        G: Send + Clone + 'static,
202        FN: IsLambda6<A, B, C, D, E, F, G> + Send + Sync + 'static,
203    >(
204        &self,
205        cb: &Cell<B>,
206        cc: &Cell<C>,
207        cd: &Cell<D>,
208        ce: &Cell<E>,
209        cf: &Cell<F>,
210        mut f: FN,
211    ) -> Stream<G> {
212        let mut deps = if let Some(deps2) = f.deps_op() {
213            deps2.clone()
214        } else {
215            Vec::new()
216        };
217        let cc = cc.clone();
218        let cd = cd.clone();
219        let ce = ce.clone();
220        let cf = cf.clone();
221        deps.push(cc.to_dep());
222        deps.push(cd.to_dep());
223        deps.push(ce.to_dep());
224        deps.push(cf.to_dep());
225        self.snapshot(
226            cb,
227            lambda2(
228                move |a: &A, b: &B| {
229                    f.call(a, b, &cc.sample(), &cd.sample(), &ce.sample(), &cf.sample())
230                },
231                deps,
232            ),
233        )
234    }
235
236    /// Transform this `Stream`'s event values with the supplied
237    /// function.
238    ///
239    /// The supplied function may construct FRP logic or use
240    /// [`Cell::sample`], in which case it's equivalent to
241    /// [`snapshot`][Stream::snapshot]ing the cell. In addition, the
242    /// function must be referentially transparent.
243    pub fn map<B: Send + Clone + 'static, FN: IsLambda1<A, B> + Send + Sync + 'static>(
244        &self,
245        f: FN,
246    ) -> Stream<B> {
247        Stream {
248            impl_: self.impl_.map(f),
249        }
250    }
251
252    /// Transform this `Stream`'s event values into the specified constant value.
253    pub fn map_to<B: Send + Sync + Clone + 'static>(&self, b: B) -> Stream<B> {
254        self.map(move |_: &A| b.clone())
255    }
256
257    /// Return a `Stream` that only outputs events for which the predicate returns `true`.
258    pub fn filter<PRED: IsLambda1<A, bool> + Send + Sync + 'static>(
259        &self,
260        pred: PRED,
261    ) -> Stream<A> {
262        Stream {
263            impl_: self.impl_.filter(pred),
264        }
265    }
266
267    /// Variant of [`merge`][Stream::merge] that merges two streams.
268    ///
269    /// In the case where two events are simultaneous (both in the
270    /// same transaction), the event taken from `self` takes
271    /// precedenc, and the event from `s2` will be dropped.
272    ///
273    /// If you want to specify your own combining function use
274    /// [`merge`][Stream::merge]. This function is equivalent to
275    /// `s1.merge(s2, |l, _r| l)`. The name `or_else` is used instead
276    /// of `merge` to make it clear that care should be taken because
277    /// events can be dropped.
278    pub fn or_else(&self, s2: &Stream<A>) -> Stream<A> {
279        self.merge(s2, |lhs: &A, _rhs: &A| lhs.clone())
280    }
281
282    /// Merge two streams of the same type into one, so that events on
283    /// either input appear on the returned stream.
284    ///
285    /// If the events are simultaneous (that is, one event from `self`
286    /// and one from `s2` occur in the same transaction), combine them
287    /// into one using the specified combining function so that the
288    /// returned stream is guaranteed only ever to have one event per
289    /// transaction. The event from `self` will appear at the left
290    /// input of the combining function, and the event from `s2` will
291    /// appear at the right.
292    pub fn merge<FN: IsLambda2<A, A, A> + Send + Sync + 'static>(
293        &self,
294        s2: &Stream<A>,
295        f: FN,
296    ) -> Stream<A> {
297        Stream {
298            impl_: self.impl_.merge(&s2.impl_, f),
299        }
300    }
301
302    /// Returns a cell with the specified initial value, which is
303    /// updated by this stream's event values.
304    pub fn hold(&self, a: A) -> Cell<A> {
305        Cell {
306            impl_: self.impl_.hold(a),
307        }
308    }
309
310    /// A variant of [`hold`][Stream::hold] that uses an initial value
311    /// returned by [`Cell::sample_lazy`].
312    pub fn hold_lazy(&self, a: Lazy<A>) -> Cell<A> {
313        Cell {
314            impl_: self.impl_.hold_lazy(a),
315        }
316    }
317
318    /// Return a stream that only outputs events from the input stream
319    /// when the specified cell's value is true.
320    pub fn gate(&self, cpred: &Cell<bool>) -> Stream<A> {
321        let cpred = cpred.clone();
322        let cpred_dep = cpred.to_dep();
323        self.filter(lambda1(move |_: &A| cpred.sample(), vec![cpred_dep]))
324    }
325
326    /// Return a stream that outputs only one value, which is the next
327    /// event of the input stream, starting from the transaction in
328    /// `once` was invoked.
329    pub fn once(&self) -> Stream<A> {
330        Stream {
331            impl_: self.impl_.once(),
332        }
333    }
334
335    /// Transform an event with a generalized state loop (a Mealy
336    /// machine). The function is passed the input and the old state
337    /// and returns the new state and output value.
338    pub fn collect<B, S, F>(&self, init_state: S, f: F) -> Stream<B>
339    where
340        B: Send + Clone + 'static,
341        S: Send + Clone + 'static,
342        F: IsLambda2<A, S, (B, S)> + Send + Sync + 'static,
343    {
344        self.collect_lazy(Lazy::new(move || init_state.clone()), f)
345    }
346
347    /// A variant of [`collect`][Stream::collect] that takes an
348    /// initial state that is returned by [`Cell::sample_lazy`].
349    pub fn collect_lazy<B, S, F>(&self, init_state: Lazy<S>, f: F) -> Stream<B>
350    where
351        B: Send + Clone + 'static,
352        S: Send + Clone + 'static,
353        F: IsLambda2<A, S, (B, S)> + Send + Sync + 'static,
354    {
355        Stream {
356            impl_: self.impl_.collect_lazy(init_state, f),
357        }
358    }
359
360    /// Accumulate on an input event, outputting the new state each time.
361    ///
362    /// As each event is received, the accumulating function `f` is
363    /// called with the current state and the new event value. The
364    /// accumulating function may construct FRP logic or use
365    /// [`Cell::sample`], in which case it's equivalent to
366    /// [`snapshot`][Stream::snapshot]ing the cell. In additon, the
367    /// function must be referentially transparent.
368    pub fn accum<S, F>(&self, init_state: S, f: F) -> Cell<S>
369    where
370        S: Send + Clone + 'static,
371        F: IsLambda2<A, S, S> + Send + Sync + 'static,
372    {
373        self.accum_lazy(Lazy::new(move || init_state.clone()), f)
374    }
375
376    /// A variant of [`accum`][Stream::accum] that takes an initial
377    /// state returned by [`Cell::sample_lazy`].
378    pub fn accum_lazy<S, F>(&self, init_state: Lazy<S>, f: F) -> Cell<S>
379    where
380        S: Send + Clone + 'static,
381        F: IsLambda2<A, S, S> + Send + Sync + 'static,
382    {
383        Cell {
384            impl_: self.impl_.accum_lazy(init_state, f),
385        }
386    }
387
388    /// A variant of [`listen`][Stream::listen] that will deregister
389    /// the listener automatically if the listener is
390    /// garbage-collected.
391    ///
392    /// With [`listen`][Stream::listen] the listener is only
393    /// deregistered if [`Listener::unlisten`] is called explicitly.
394    pub fn listen_weak<K: IsLambda1<A, ()> + Send + Sync + 'static>(&self, k: K) -> Listener {
395        Listener {
396            impl_: self.impl_.listen_weak(k),
397        }
398    }
399
400    /// Listen for events/firings on this stream.
401    ///
402    /// This is the observer pattern. The returned [`Listener`] has an
403    /// [`unlisten`][Listener::unlisten] method to cause the listener
404    /// to be removed. This is an operational mechanism for
405    /// interfacing between the world of I/O and FRP.
406    ///
407    /// The handler function for this listener should make no
408    /// assumptions about what thread it will be called on, and the
409    /// handler should not block. It also is not allowed to use
410    /// [`CellSink::send`][crate::CellSink::send] or
411    /// [`StreamSink::send`][crate::StreamSink::send] in the handler.
412    pub fn listen<K: IsLambda1<A, ()> + Send + Sync + 'static>(&self, k: K) -> Listener {
413        Listener {
414            impl_: self.impl_.listen(k),
415        }
416    }
417}