reactive_rs/
stream.rs

1use std::borrow::Borrow;
2use std::cell::RefCell;
3use std::iter::Iterator;
4use std::rc::Rc;
5
6#[cfg(any(test, feature = "slice-deque"))]
7use slice_deque::SliceDeque;
8
9/// A stream of context/value pairs that can be subscribed to.
10///
11/// Note: in order to use stream trait methods, this trait
12/// must imported into the current scope.
13pub trait Stream<'a>: Sized {
14    /// The type of the context attached to emitted elements.
15    ///
16    /// Can be set to `()` to ignore the context part of the stream.
17    type Context: ?Sized;
18
19    /// The type of the elements being emitted.
20    type Item: ?Sized;
21
22    /// Attaches an observer (a user-provided mutable closure) to the
23    /// stream, which consumes the stream object.
24    ///
25    /// A stream with no observer attached is essentially just
26    /// a function of an observer; it will not react to incoming events
27    /// until it is subscribed to.
28    ///
29    /// # Examples
30    ///
31    /// ```
32    /// # use reactive_rs::*; use std::cell::RefCell;
33    /// let out = RefCell::new(Vec::new());
34    /// let stream = SimpleBroadcast::<i64>::new();
35    /// stream
36    ///     .clone()
37    ///     .filter(|x| x % 2 != 0)
38    ///     .subscribe(|x| out.borrow_mut().push(*x));
39    /// stream.feed(0..=5);
40    /// assert_eq!(&*out.borrow(), &[1, 3, 5]);
41    /// ```
42    fn subscribe<O>(self, mut observer: O)
43    where
44        O: 'a + FnMut(&Self::Item),
45    {
46        self.subscribe_ctx(move |_ctx, item| observer(item))
47    }
48
49    /// Same as `subscribe()`, but the closure receives two arguments
50    /// (context/value), by reference.
51    ///
52    /// # Examples
53    ///
54    /// ```
55    /// # use reactive_rs::*; use std::cell::Cell;
56    /// let result = Cell::new((0, 0.));
57    /// let stream = Broadcast::<i32, f64>::new();
58    /// stream
59    ///     .clone()
60    ///     .map_ctx(|c, x| (*c as f64) + *x)
61    ///     .subscribe_ctx(|c, x| result.set((*c, *x)));
62    /// stream.send_ctx(3, 7.5);
63    /// assert_eq!(result.get(), (3, 10.5));
64    /// ```
65    fn subscribe_ctx<O>(self, observer: O)
66    where
67        O: 'a + FnMut(&Self::Context, &Self::Item);
68
69    /// Create a broadcast from a stream, enabling multiple observers. This is the only
70    /// `Stream` trait method that incurs a slight runtime cost, due to the broadcast
71    /// object having to store observers as boxed trait objects in a reference-counted
72    /// container; all other methods can be inlined.
73    ///
74    /// Note: this is equivalent to creating a broadcast via
75    /// [`from_stream()`](struct.Broadcast.html#provided-methods)
76    /// constructor.
77    fn broadcast(self) -> Broadcast<'a, Self::Context, Self::Item>
78    where
79        Self: 'a,
80    {
81        Broadcast::from_stream(self)
82    }
83
84    /// Convenience method to extract the context into a separate stream.
85    ///
86    /// # Examples
87    ///
88    /// ```
89    /// # use reactive_rs::*;
90    /// let stream = Broadcast::<i32, String>::new();
91    /// let double_ctx = stream.ctx().map(|x| x * 2);
92    /// ```
93    ///
94    /// # Notes
95    ///
96    /// - Resulting stream's context/value will reference the same object
97    ///   (original stream's context).
98    /// - The return value is a `Stream` object (both context/value types
99    ///   are the original stream's context type).
100    fn ctx(self) -> Context<Self> {
101        Context { stream: self }
102    }
103
104    /// Set the context to a fixed constant value.
105    ///
106    /// # Examples
107    ///
108    /// ```
109    /// # use reactive_rs::*;
110    /// let stream = SimpleBroadcast::<String>::new();
111    /// let stream_with_ctx = stream.with_ctx(42);
112    /// ```
113    ///
114    /// # Notes
115    ///
116    /// - The value is passed down unchanged.
117    /// - The return value is a `Stream` object (context type is the type
118    ///   of the provided value; value type is unchanged).
119    fn with_ctx<T>(self, ctx: T) -> WithContext<Self, T> {
120        WithContext { stream: self, ctx }
121    }
122
123    /// Creates a new stream which calls a closure on each context/value and uses
124    /// that as the context.
125    ///
126    /// # Examples
127    ///
128    /// ```
129    /// # use reactive_rs::*;
130    /// let stream = SimpleBroadcast::<String>::new();
131    /// let string_and_len = stream.with_ctx_map(|_, s| s.len());
132    /// ```
133    ///
134    /// # Notes
135    ///
136    /// - The value is passed down unchanged.
137    /// - The closure receives all of its arguments by reference.
138    /// - The return value is a `Stream` object (context type is the return
139    ///   type of the closure; value type is unchanged).
140    fn with_ctx_map<F, T>(self, func: F) -> WithContextMap<Self, F>
141    where
142        F: 'a + FnMut(&Self::Context, &Self::Item) -> T,
143    {
144        WithContextMap { stream: self, func }
145    }
146
147    /// Creates a new stream which calls a closure on each element and uses
148    /// that as the value.
149    ///
150    /// # Examples
151    ///
152    /// ```
153    /// # use reactive_rs::*;
154    /// let stream = SimpleBroadcast::<String>::new();
155    /// let contains_foo = stream.map(|s| s.contains("foo"));
156    /// ```
157    ///
158    /// # Notes
159    ///
160    /// - The context is passed down unchanged.
161    /// - The closure receives its argument by reference.
162    /// - The return value is a `Stream` object (context type is unchanged;
163    ///   value type is the return type of the closure).
164    fn map<F, T>(self, func: F) -> Map<Self, NoContext<F>>
165    where
166        F: 'a + FnMut(&Self::Item) -> T,
167    {
168        Map { stream: self, func: NoContext(func) }
169    }
170
171    /// Same as `map()`, but the closure receives two arguments
172    /// (context/value), by reference.
173    ///
174    /// # Examples
175    ///
176    /// ```
177    /// # use reactive_rs::*;
178    /// let stream = Broadcast::<bool, i32>::new();
179    /// let div2 = stream.map_ctx(|c, x| (*x % 2 == 0) == *c);
180    /// ```
181    fn map_ctx<F, T>(self, func: F) -> Map<Self, F>
182    where
183        F: 'a + FnMut(&Self::Context, &Self::Item) -> T,
184    {
185        Map { stream: self, func }
186    }
187
188    /// Same as `map()`, but the closure is expected to return a `(context, value)`
189    /// tuple, so that both the context and the value can be changed at the same time.
190    ///
191    /// # Examples
192    ///
193    /// ```
194    /// # use reactive_rs::*; type A = i32; type B = u32;
195    /// let stream = SimpleBroadcast::<i32>::new();
196    /// let string_context = stream.map_both(|x| (x.to_string(), *x));
197    /// ```
198    ///
199    /// # Notes
200    ///
201    /// - The context and the value are changed simultaneously.
202    /// - The closure receives its argument by reference.
203    /// - The return value is a `Stream` object (context type and value type
204    ///   depend on the return type of the closure).
205    fn map_both<F, C, T>(self, func: F) -> MapBoth<Self, NoContext<F>>
206    where
207        F: 'a + FnMut(&Self::Item) -> (C, T),
208    {
209        MapBoth { stream: self, func: NoContext(func) }
210    }
211
212    /// Same as `map_both()`, but the closure receives two arguments
213    /// (context/value), by reference.
214    ///
215    /// # Examples
216    ///
217    /// ```
218    /// # use reactive_rs::*; type A = i32; type B = u32;
219    /// let stream = Broadcast::<A, B>::new();
220    /// let swapped = stream.map_both_ctx(|a, b| (*a, *b));
221    /// ```
222    fn map_both_ctx<F, C, T>(self, func: F) -> MapBoth<Self, F>
223    where
224        F: 'a + FnMut(&Self::Context, &Self::Item) -> (C, T),
225    {
226        MapBoth { stream: self, func }
227    }
228
229    /// Creates a stream which uses a closure to determine if an element should be
230    /// yielded.
231    ///
232    /// The closure must return `true` or `false` and is called on each element of the
233    /// original stream. If `true` is returned, the element is passed downstream.
234    ///
235    /// # Examples
236    ///
237    /// ```
238    /// # use reactive_rs::*;
239    /// let stream = SimpleBroadcast::<Vec<i32>>::new();
240    /// let non_empty = stream.filter(|v| !v.is_empty());
241    /// ```
242    ///
243    /// # Notes
244    ///
245    /// - The context is passed down unchanged.
246    /// - The closure receives its argument by reference.
247    /// - The return value is a `Stream` object (same context type and
248    ///   value type as the original stream).
249    fn filter<F>(self, func: F) -> Filter<Self, NoContext<F>>
250    where
251        F: 'a + FnMut(&Self::Item) -> bool,
252    {
253        Filter { stream: self, func: NoContext(func) }
254    }
255
256    /// Same as `filter()`, but the closure receives two arguments
257    /// (context/value), by reference.
258    ///
259    /// # Examples
260    ///
261    /// ```
262    /// # use reactive_rs::*;
263    /// let stream = Broadcast::<usize, Vec<i32>>::new();
264    /// let filter_len = stream.filter_ctx(|ctx, v| v.len() == *ctx);
265    /// ```
266    fn filter_ctx<F>(self, func: F) -> Filter<Self, F>
267    where
268        F: 'a + FnMut(&Self::Context, &Self::Item) -> bool,
269    {
270        Filter { stream: self, func }
271    }
272
273    /// Creates a stream that both filters and maps.
274    ///
275    /// The closure must return an `Option<T>`. If it returns `Some(element)`, then
276    /// that element is returned; otherwise it is skipped.
277    ///
278    /// # Examples
279    ///
280    /// ```
281    /// # use reactive_rs::*;
282    /// let stream = SimpleBroadcast::<String>::new();
283    /// let valid_ints = stream.filter_map(|s| s.parse::<i64>().ok());
284    /// ```
285    ///
286    /// # Notes
287    ///
288    /// - The context is passed down unchanged.
289    /// - The closure receives its argument by reference.
290    /// - The return value is a `Stream` object (context type is unchanged;
291    ///   value type is the is `T` if the return type of the closure is `Option<T>`).
292    fn filter_map<F, T>(self, func: F) -> FilterMap<Self, NoContext<F>>
293    where
294        F: 'a + FnMut(&Self::Item) -> Option<T>,
295    {
296        FilterMap { stream: self, func: NoContext(func) }
297    }
298
299    /// Same as `filter_map()`, but the closure receives two arguments
300    /// (context/value), by reference.
301    ///
302    /// # Examples
303    ///
304    /// ```
305    /// # use reactive_rs::*;
306    /// let stream = Broadcast::<Option<i64>, String>::new();
307    /// let int_or_ctx = stream.filter_map_ctx(|c, s| s.parse().ok().or(*c));
308    /// ```
309    fn filter_map_ctx<F, T>(self, func: F) -> FilterMap<Self, F>
310    where
311        F: 'a + FnMut(&Self::Context, &Self::Item) -> Option<T>,
312    {
313        FilterMap { stream: self, func }
314    }
315
316    /// 'Reduce' operation on streams.
317    ///
318    /// This method takes two arguments: an initial value, and a closure with
319    /// two arguments: an accumulator and an element. The closure returns the
320    /// value that the accumulator should have for the next iteration; the
321    /// initial value is the value the accumulator will have on the first call.
322    ///
323    /// # Examples
324    ///
325    /// ```
326    /// # use reactive_rs::*;
327    /// let stream = SimpleBroadcast::<i32>::new();
328    /// let cum_sum = stream.fold(0, |acc, x| acc + x);
329    /// ```
330    ///
331    /// # Notes
332    ///
333    /// - The context is passed down unchanged.
334    /// - The closure receives all of its arguments by reference.
335    /// - The return value is a `Stream` object (context type is unchanged;
336    ///   value type is the accumulator type).
337    fn fold<F, T: 'a>(self, init: T, func: F) -> Fold<Self, NoContext<F>, T>
338    where
339        F: 'a + FnMut(&T, &Self::Item) -> T,
340    {
341        Fold { stream: self, init, func: NoContext(func) }
342    }
343
344    /// Same as `fold()`, but the closure receives three arguments
345    /// (context/accumulator/value), by reference.
346    ///
347    /// # Examples
348    ///
349    /// ```
350    /// # use reactive_rs::*;
351    /// let stream = Broadcast::<i32, i32>::new();
352    /// let bnd_sum = stream.fold_ctx(0, |c, acc, x| *c.min(&(acc + x)));
353    /// ```
354    fn fold_ctx<F, T: 'a>(self, init: T, func: F) -> Fold<Self, F, T>
355    where
356        F: 'a + FnMut(&Self::Context, &T, &Self::Item) -> T,
357    {
358        Fold { stream: self, init, func }
359    }
360
361    /// Do something with each element of a stream, passing the value on.
362    ///
363    /// The closure will only be called if the stream is actually
364    /// subscribed to (just calling `inspect()` does nothing on its own).
365    ///
366    /// # Examples
367    ///
368    /// ```
369    /// # use reactive_rs::*;
370    /// let stream = SimpleBroadcast::<String>::new();
371    /// let stream = stream.inspect(|x| println!("{:?}", x));
372    /// ```
373    ///
374    /// # Notes
375    ///
376    /// - Both context/value are passed down unchanged.
377    /// - The closure receives its argument by reference.
378    /// - The return value is a `Stream` object (same context type and
379    ///   value type as the original stream).
380    fn inspect<F>(self, func: F) -> Inspect<Self, NoContext<F>>
381    where
382        F: 'a + FnMut(&Self::Item),
383    {
384        Inspect { stream: self, func: NoContext(func) }
385    }
386
387    /// Same as `inspect()`, but the closure receives two arguments
388    /// (context/value), by reference.
389    ///
390    /// # Examples
391    ///
392    /// ```
393    /// # use reactive_rs::*;
394    /// let stream = Broadcast::<i32, String>::new();
395    /// let stream = stream.inspect_ctx(|c, x| println!("{} {}", c, x));
396    /// ```
397    fn inspect_ctx<F>(self, func: F) -> Inspect<Self, F>
398    where
399        F: 'a + FnMut(&Self::Context, &Self::Item),
400    {
401        Inspect { stream: self, func }
402    }
403
404    /// Creates a stream that caches up to `n` last elements.
405    ///
406    /// The elements are stored in a contiguous double-ended
407    /// queue provided via [`slice-deque`](https://crates.io/crates/slice-deque)
408    /// crate. The output stream yields slice views into this queue.
409    ///
410    /// # Examples
411    ///
412    /// ```
413    /// # use reactive_rs::*;
414    /// let stream = SimpleBroadcast::<i64>::new();
415    /// let last_3 = stream.last_n(3);
416    /// ```
417    ///
418    /// # Notes
419    ///
420    /// - The context is passed down unchanged (only values are cached).
421    /// - The return value is a `Stream` object (context type is unchanged;
422    ///   value type is `[T]` where `T` is the original value type).
423    /// - Slices may contain less than `n` elements (while the queue
424    ///   is being filled up initially).
425    /// - The value type of the original stream must implement `Clone`.
426    /// - This method is only present if `feature = "slice-deque"` is
427    ///   enabled (on by default).
428    #[cfg(any(test, feature = "slice-deque"))]
429    fn last_n(self, count: usize) -> LastN<Self, Self::Item>
430    where
431        Self::Item: 'a + Clone + Sized,
432    {
433        LastN { count, stream: self, data: Rc::new(RefCell::new(SliceDeque::with_capacity(count))) }
434    }
435}
436
437pub trait ContextFn<C: ?Sized, T: ?Sized> {
438    type Output;
439
440    fn call_mut(&mut self, ctx: &C, item: &T) -> Self::Output;
441}
442
443impl<C: ?Sized, T: ?Sized, V, F> ContextFn<C, T> for F
444where
445    F: FnMut(&C, &T) -> V,
446{
447    type Output = V;
448
449    #[inline(always)]
450    fn call_mut(&mut self, ctx: &C, item: &T) -> Self::Output {
451        self(ctx, item)
452    }
453}
454
455pub trait ContextFoldFn<C: ?Sized, T: ?Sized, V> {
456    type Output;
457
458    fn call_mut(&mut self, ctx: &C, acc: &V, item: &T) -> Self::Output;
459}
460
461impl<C: ?Sized, T: ?Sized, V, F> ContextFoldFn<C, T, V> for F
462where
463    F: FnMut(&C, &V, &T) -> V,
464{
465    type Output = V;
466
467    #[inline(always)]
468    fn call_mut(&mut self, ctx: &C, acc: &V, item: &T) -> Self::Output {
469        self(ctx, acc, item)
470    }
471}
472
473pub struct NoContext<F>(F);
474
475impl<F, C: ?Sized, T: ?Sized, V> ContextFn<C, T> for NoContext<F>
476where
477    F: FnMut(&T) -> V,
478{
479    type Output = V;
480
481    #[inline(always)]
482    fn call_mut(&mut self, _ctx: &C, item: &T) -> Self::Output {
483        (self.0)(item)
484    }
485}
486
487impl<F, C: ?Sized, T: ?Sized, V> ContextFoldFn<C, T, V> for NoContext<F>
488where
489    F: FnMut(&V, &T) -> V,
490{
491    type Output = V;
492
493    #[inline(always)]
494    fn call_mut(&mut self, _ctx: &C, acc: &Self::Output, item: &T) -> Self::Output {
495        (self.0)(acc, item)
496    }
497}
498
499type Callback<'a, C, T> = Box<'a + FnMut(&C, &T)>;
500
501/// Event source that transmits context/value pairs to multiple observers.
502///
503/// In order to "fork" the broadcast (creating a new stream that will
504/// be subscribed to it), the broadcast object can be simply cloned
505/// via the `Clone` trait. Note that cloning the broadcast only
506/// increases its reference count; no values are being cloned or copied.
507///
508/// A broadcast may receive a value in one of two ways. First, the user
509/// may explicitly call one of its methods: `send()`,  `send_ctx()`,
510/// `feed()`, `feed_ctx()`. Second, the broadcast may be created
511/// from a parent stream via [`broadcast()`](trait.Stream.html#provided-methods)
512/// method of the stream object. Either way, each context/value pair received
513/// is passed on to each of the subscribed observers, by reference.
514///
515/// # Examples
516///
517/// ```
518/// # use reactive_rs::*; use std::cell::RefCell; use std::rc::Rc;
519/// let out = RefCell::new(Vec::new());
520/// let stream = SimpleBroadcast::<i32>::new();
521/// let child1 = stream
522///     .clone()
523///     .subscribe(|x| out.borrow_mut().push(*x + 1));
524/// let child2 = stream
525///     .clone()
526///     .subscribe(|x| out.borrow_mut().push(*x + 7));
527/// stream.feed(1..=3);
528/// assert_eq!(&*out.borrow(), &[2, 8, 3, 9, 4, 10]);
529/// ```
530pub struct Broadcast<'a, C: ?Sized, T: ?Sized> {
531    observers: Rc<RefCell<Vec<Callback<'a, C, T>>>>,
532}
533
534impl<'a, C: 'a + ?Sized, T: 'a + ?Sized> Broadcast<'a, C, T> {
535    /// Creates a new broadcast with specified context and item types.
536    pub fn new() -> Self {
537        Self { observers: Rc::new(RefCell::new(Vec::new())) }
538    }
539
540    /// Create a broadcast from a stream, enabling multiple observers
541    /// ("fork" the stream).
542    ///
543    /// Note: this is equivalent to calling
544    /// [`broadcast()`](trait.Stream.html#provided-methods) on the stream object.
545    pub fn from_stream<S>(stream: S) -> Self
546    where
547        S: Stream<'a, Context = C, Item = T>,
548    {
549        let broadcast = Self::new();
550        let clone = broadcast.clone();
551        stream.subscribe_ctx(move |ctx, x| clone.send_ctx(ctx, x));
552        broadcast
553    }
554
555    fn push<F>(&self, func: F)
556    where
557        F: FnMut(&C, &T) + 'a,
558    {
559        self.observers.borrow_mut().push(Box::new(func));
560    }
561
562    /// Send a value along with context to all observers of the broadcast.
563    pub fn send_ctx<K, B>(&self, ctx: K, value: B)
564    where
565        K: Borrow<C>,
566        B: Borrow<T>,
567    {
568        let ctx = ctx.borrow();
569        let value = value.borrow();
570        for observer in self.observers.borrow_mut().iter_mut() {
571            observer(ctx, value);
572        }
573    }
574
575    /// Similar to `send_ctx()`, but the context is set to the type's default value.
576    pub fn send<B>(&self, value: B)
577    where
578        B: Borrow<T>,
579        C: Default,
580    {
581        let ctx = C::default();
582        self.send_ctx(&ctx, value);
583    }
584
585    /// Convenience method to feed an iterator of values to all observers of the
586    /// broadcast, along with a given context.
587    pub fn feed_ctx<K, B, I>(&self, ctx: K, iter: I)
588    where
589        K: Borrow<C>,
590        I: Iterator<Item = B>,
591        B: Borrow<T>,
592    {
593        let ctx = ctx.borrow();
594        for value in iter {
595            self.send_ctx(ctx, value);
596        }
597    }
598
599    /// Similar to `feed_ctx()`, but the context is set to the type's default value.
600    pub fn feed<B, I>(&self, iter: I)
601    where
602        I: Iterator<Item = B>,
603        B: Borrow<T>,
604        C: Default,
605    {
606        let ctx = C::default();
607        self.feed_ctx(&ctx, iter);
608    }
609}
610
611/// Simplified broadcast that only transmits values without context.
612pub type SimpleBroadcast<'a, T> = Broadcast<'a, (), T>;
613
614impl<'a, C: 'a + ?Sized, T: 'a + ?Sized> Default for Broadcast<'a, C, T> {
615    fn default() -> Self {
616        Self::new()
617    }
618}
619
620impl<'a, C: 'a + ?Sized, T: 'a + ?Sized> Clone for Broadcast<'a, C, T> {
621    fn clone(&self) -> Self {
622        Self { observers: self.observers.clone() }
623    }
624}
625
626impl<'a, C: 'a + ?Sized, T: 'a + ?Sized> Stream<'a> for Broadcast<'a, C, T> {
627    type Context = C;
628    type Item = T;
629
630    fn subscribe_ctx<O>(self, observer: O)
631    where
632        O: FnMut(&Self::Context, &Self::Item) + 'a,
633    {
634        self.push(observer);
635    }
636}
637
638pub struct WithContext<S, T> {
639    stream: S,
640    ctx: T,
641}
642
643impl<'a, S, T: 'a> Stream<'a> for WithContext<S, T>
644where
645    S: Stream<'a>,
646{
647    type Context = T;
648    type Item = S::Item;
649
650    fn subscribe_ctx<O>(self, mut observer: O)
651    where
652        O: FnMut(&Self::Context, &Self::Item) + 'a,
653    {
654        let ctx = self.ctx;
655        self.stream.subscribe_ctx(move |_ctx, x| {
656            observer(&ctx, x);
657        })
658    }
659}
660
661pub struct WithContextMap<S, F> {
662    stream: S,
663    func: F,
664}
665
666impl<'a, S, F, T> Stream<'a> for WithContextMap<S, F>
667where
668    S: Stream<'a>,
669    F: 'a + FnMut(&S::Context, &S::Item) -> T,
670{
671    type Context = T;
672    type Item = S::Item;
673
674    fn subscribe_ctx<O>(self, mut observer: O)
675    where
676        O: FnMut(&Self::Context, &Self::Item) + 'a,
677    {
678        let mut func = self.func;
679        self.stream.subscribe_ctx(move |ctx, x| {
680            observer(&func(ctx, x), x);
681        })
682    }
683}
684
685pub struct Context<S> {
686    stream: S,
687}
688
689impl<'a, S> Stream<'a> for Context<S>
690where
691    S: Stream<'a>,
692{
693    type Context = S::Context;
694    type Item = S::Context;
695
696    fn subscribe_ctx<O>(self, mut observer: O)
697    where
698        O: FnMut(&Self::Context, &Self::Item) + 'a,
699    {
700        self.stream.subscribe_ctx(move |ctx, _x| {
701            observer(ctx, ctx);
702        })
703    }
704}
705
706pub struct Map<S, F> {
707    stream: S,
708    func: F,
709}
710
711impl<'a, S, F> Stream<'a> for Map<S, F>
712where
713    S: Stream<'a>,
714    F: 'a + ContextFn<S::Context, S::Item>,
715{
716    type Context = S::Context;
717    type Item = F::Output;
718
719    fn subscribe_ctx<O>(self, mut observer: O)
720    where
721        O: FnMut(&Self::Context, &Self::Item) + 'a,
722    {
723        let mut func = self.func;
724        self.stream.subscribe_ctx(move |ctx, x| observer(ctx, &func.call_mut(ctx, x)))
725    }
726}
727
728pub struct MapBoth<S, F> {
729    stream: S,
730    func: F,
731}
732
733impl<'a, S, F, C, T> Stream<'a> for MapBoth<S, F>
734where
735    S: Stream<'a>,
736    F: 'a + ContextFn<S::Context, S::Item, Output = (C, T)>,
737{
738    type Context = C;
739    type Item = T;
740
741    fn subscribe_ctx<O>(self, mut observer: O)
742    where
743        O: FnMut(&Self::Context, &Self::Item) + 'a,
744    {
745        let mut func = self.func;
746        self.stream.subscribe_ctx(move |ctx, x| {
747            let (ctx, x) = func.call_mut(ctx, x);
748            observer(&ctx, &x);
749        })
750    }
751}
752
753pub struct Filter<S, F> {
754    stream: S,
755    func: F,
756}
757
758impl<'a, S, F> Stream<'a> for Filter<S, F>
759where
760    S: Stream<'a>,
761    F: 'a + ContextFn<S::Context, S::Item, Output = bool>,
762{
763    type Context = S::Context;
764    type Item = S::Item;
765
766    fn subscribe_ctx<O>(self, mut observer: O)
767    where
768        O: 'a + FnMut(&Self::Context, &Self::Item),
769    {
770        let mut func = self.func;
771        self.stream.subscribe_ctx(move |ctx, x| {
772            if func.call_mut(ctx, x) {
773                observer(ctx, x);
774            }
775        });
776    }
777}
778
779pub struct FilterMap<S, F> {
780    stream: S,
781    func: F,
782}
783
784impl<'a, S, F, T> Stream<'a> for FilterMap<S, F>
785where
786    S: Stream<'a>,
787    F: 'a + ContextFn<S::Context, S::Item, Output = Option<T>>,
788{
789    type Context = S::Context;
790    type Item = T;
791
792    fn subscribe_ctx<O>(self, mut observer: O)
793    where
794        O: 'a + FnMut(&Self::Context, &Self::Item),
795    {
796        let mut func = self.func;
797        self.stream.subscribe_ctx(move |ctx, x| {
798            if let Some(x) = func.call_mut(ctx, x) {
799                observer(ctx, &x);
800            }
801        });
802    }
803}
804
805pub struct Fold<S, F, T> {
806    stream: S,
807    init: T,
808    func: F,
809}
810
811impl<'a, S, F, T: 'a> Stream<'a> for Fold<S, F, T>
812where
813    S: Stream<'a>,
814    F: 'a + ContextFoldFn<S::Context, S::Item, T, Output = T>,
815{
816    type Context = S::Context;
817    type Item = T;
818
819    fn subscribe_ctx<O>(self, mut observer: O)
820    where
821        O: FnMut(&Self::Context, &Self::Item) + 'a,
822    {
823        let mut value = self.init;
824        let mut func = self.func;
825        self.stream.subscribe_ctx(move |ctx, x| {
826            value = func.call_mut(ctx, &value, x);
827            observer(ctx, &value);
828        })
829    }
830}
831
832pub struct Inspect<S, F> {
833    stream: S,
834    func: F,
835}
836
837impl<'a, S, F> Stream<'a> for Inspect<S, F>
838where
839    S: Stream<'a>,
840    F: 'a + ContextFn<S::Context, S::Item, Output = ()>,
841{
842    type Context = S::Context;
843    type Item = S::Item;
844
845    fn subscribe_ctx<O>(self, mut observer: O)
846    where
847        O: FnMut(&Self::Context, &Self::Item) + 'a,
848    {
849        let mut func = self.func;
850        self.stream.subscribe_ctx(move |ctx, x| {
851            func.call_mut(ctx, x);
852            observer(ctx, x);
853        })
854    }
855}
856
857#[cfg(any(test, feature = "slice-deque"))]
858pub struct LastN<S, T: Sized> {
859    count: usize,
860    stream: S,
861    data: Rc<RefCell<SliceDeque<T>>>,
862}
863
864#[cfg(any(test, feature = "slice-deque"))]
865impl<'a, S, T> Stream<'a> for LastN<S, T>
866where
867    S: Stream<'a, Item = T>,
868    T: 'a + Clone + Sized,
869{
870    type Context = S::Context;
871    type Item = [T];
872
873    fn subscribe_ctx<O>(self, mut observer: O)
874    where
875        O: 'a + FnMut(&Self::Context, &Self::Item),
876    {
877        let data = self.data.clone();
878        let count = self.count;
879        self.stream.subscribe_ctx(move |ctx, x| {
880            let mut queue = data.borrow_mut();
881            if queue.len() == count {
882                queue.pop_front();
883            }
884            queue.push_back(x.clone());
885            drop(queue); // this is important, in order to avoid multiple mutable borrows
886            observer(ctx, &*data.as_ref().borrow());
887        })
888    }
889}