reactive_rs/stream.rs
1use std::borrow::Borrow;
2use std::cell::RefCell;
3use std::iter::Iterator;
4use std::rc::Rc;
5
6#[cfg(any(test, feature = "slice-deque"))]
7use slice_deque::SliceDeque;
8
9/// A stream of context/value pairs that can be subscribed to.
10///
11/// Note: in order to use stream trait methods, this trait
12/// must imported into the current scope.
13pub trait Stream<'a>: Sized {
14 /// The type of the context attached to emitted elements.
15 ///
16 /// Can be set to `()` to ignore the context part of the stream.
17 type Context: ?Sized;
18
19 /// The type of the elements being emitted.
20 type Item: ?Sized;
21
22 /// Attaches an observer (a user-provided mutable closure) to the
23 /// stream, which consumes the stream object.
24 ///
25 /// A stream with no observer attached is essentially just
26 /// a function of an observer; it will not react to incoming events
27 /// until it is subscribed to.
28 ///
29 /// # Examples
30 ///
31 /// ```
32 /// # use reactive_rs::*; use std::cell::RefCell;
33 /// let out = RefCell::new(Vec::new());
34 /// let stream = SimpleBroadcast::<i64>::new();
35 /// stream
36 /// .clone()
37 /// .filter(|x| x % 2 != 0)
38 /// .subscribe(|x| out.borrow_mut().push(*x));
39 /// stream.feed(0..=5);
40 /// assert_eq!(&*out.borrow(), &[1, 3, 5]);
41 /// ```
42 fn subscribe<O>(self, mut observer: O)
43 where
44 O: 'a + FnMut(&Self::Item),
45 {
46 self.subscribe_ctx(move |_ctx, item| observer(item))
47 }
48
49 /// Same as `subscribe()`, but the closure receives two arguments
50 /// (context/value), by reference.
51 ///
52 /// # Examples
53 ///
54 /// ```
55 /// # use reactive_rs::*; use std::cell::Cell;
56 /// let result = Cell::new((0, 0.));
57 /// let stream = Broadcast::<i32, f64>::new();
58 /// stream
59 /// .clone()
60 /// .map_ctx(|c, x| (*c as f64) + *x)
61 /// .subscribe_ctx(|c, x| result.set((*c, *x)));
62 /// stream.send_ctx(3, 7.5);
63 /// assert_eq!(result.get(), (3, 10.5));
64 /// ```
65 fn subscribe_ctx<O>(self, observer: O)
66 where
67 O: 'a + FnMut(&Self::Context, &Self::Item);
68
69 /// Create a broadcast from a stream, enabling multiple observers. This is the only
70 /// `Stream` trait method that incurs a slight runtime cost, due to the broadcast
71 /// object having to store observers as boxed trait objects in a reference-counted
72 /// container; all other methods can be inlined.
73 ///
74 /// Note: this is equivalent to creating a broadcast via
75 /// [`from_stream()`](struct.Broadcast.html#provided-methods)
76 /// constructor.
77 fn broadcast(self) -> Broadcast<'a, Self::Context, Self::Item>
78 where
79 Self: 'a,
80 {
81 Broadcast::from_stream(self)
82 }
83
84 /// Convenience method to extract the context into a separate stream.
85 ///
86 /// # Examples
87 ///
88 /// ```
89 /// # use reactive_rs::*;
90 /// let stream = Broadcast::<i32, String>::new();
91 /// let double_ctx = stream.ctx().map(|x| x * 2);
92 /// ```
93 ///
94 /// # Notes
95 ///
96 /// - Resulting stream's context/value will reference the same object
97 /// (original stream's context).
98 /// - The return value is a `Stream` object (both context/value types
99 /// are the original stream's context type).
100 fn ctx(self) -> Context<Self> {
101 Context { stream: self }
102 }
103
104 /// Set the context to a fixed constant value.
105 ///
106 /// # Examples
107 ///
108 /// ```
109 /// # use reactive_rs::*;
110 /// let stream = SimpleBroadcast::<String>::new();
111 /// let stream_with_ctx = stream.with_ctx(42);
112 /// ```
113 ///
114 /// # Notes
115 ///
116 /// - The value is passed down unchanged.
117 /// - The return value is a `Stream` object (context type is the type
118 /// of the provided value; value type is unchanged).
119 fn with_ctx<T>(self, ctx: T) -> WithContext<Self, T> {
120 WithContext { stream: self, ctx }
121 }
122
123 /// Creates a new stream which calls a closure on each context/value and uses
124 /// that as the context.
125 ///
126 /// # Examples
127 ///
128 /// ```
129 /// # use reactive_rs::*;
130 /// let stream = SimpleBroadcast::<String>::new();
131 /// let string_and_len = stream.with_ctx_map(|_, s| s.len());
132 /// ```
133 ///
134 /// # Notes
135 ///
136 /// - The value is passed down unchanged.
137 /// - The closure receives all of its arguments by reference.
138 /// - The return value is a `Stream` object (context type is the return
139 /// type of the closure; value type is unchanged).
140 fn with_ctx_map<F, T>(self, func: F) -> WithContextMap<Self, F>
141 where
142 F: 'a + FnMut(&Self::Context, &Self::Item) -> T,
143 {
144 WithContextMap { stream: self, func }
145 }
146
147 /// Creates a new stream which calls a closure on each element and uses
148 /// that as the value.
149 ///
150 /// # Examples
151 ///
152 /// ```
153 /// # use reactive_rs::*;
154 /// let stream = SimpleBroadcast::<String>::new();
155 /// let contains_foo = stream.map(|s| s.contains("foo"));
156 /// ```
157 ///
158 /// # Notes
159 ///
160 /// - The context is passed down unchanged.
161 /// - The closure receives its argument by reference.
162 /// - The return value is a `Stream` object (context type is unchanged;
163 /// value type is the return type of the closure).
164 fn map<F, T>(self, func: F) -> Map<Self, NoContext<F>>
165 where
166 F: 'a + FnMut(&Self::Item) -> T,
167 {
168 Map { stream: self, func: NoContext(func) }
169 }
170
171 /// Same as `map()`, but the closure receives two arguments
172 /// (context/value), by reference.
173 ///
174 /// # Examples
175 ///
176 /// ```
177 /// # use reactive_rs::*;
178 /// let stream = Broadcast::<bool, i32>::new();
179 /// let div2 = stream.map_ctx(|c, x| (*x % 2 == 0) == *c);
180 /// ```
181 fn map_ctx<F, T>(self, func: F) -> Map<Self, F>
182 where
183 F: 'a + FnMut(&Self::Context, &Self::Item) -> T,
184 {
185 Map { stream: self, func }
186 }
187
188 /// Same as `map()`, but the closure is expected to return a `(context, value)`
189 /// tuple, so that both the context and the value can be changed at the same time.
190 ///
191 /// # Examples
192 ///
193 /// ```
194 /// # use reactive_rs::*; type A = i32; type B = u32;
195 /// let stream = SimpleBroadcast::<i32>::new();
196 /// let string_context = stream.map_both(|x| (x.to_string(), *x));
197 /// ```
198 ///
199 /// # Notes
200 ///
201 /// - The context and the value are changed simultaneously.
202 /// - The closure receives its argument by reference.
203 /// - The return value is a `Stream` object (context type and value type
204 /// depend on the return type of the closure).
205 fn map_both<F, C, T>(self, func: F) -> MapBoth<Self, NoContext<F>>
206 where
207 F: 'a + FnMut(&Self::Item) -> (C, T),
208 {
209 MapBoth { stream: self, func: NoContext(func) }
210 }
211
212 /// Same as `map_both()`, but the closure receives two arguments
213 /// (context/value), by reference.
214 ///
215 /// # Examples
216 ///
217 /// ```
218 /// # use reactive_rs::*; type A = i32; type B = u32;
219 /// let stream = Broadcast::<A, B>::new();
220 /// let swapped = stream.map_both_ctx(|a, b| (*a, *b));
221 /// ```
222 fn map_both_ctx<F, C, T>(self, func: F) -> MapBoth<Self, F>
223 where
224 F: 'a + FnMut(&Self::Context, &Self::Item) -> (C, T),
225 {
226 MapBoth { stream: self, func }
227 }
228
229 /// Creates a stream which uses a closure to determine if an element should be
230 /// yielded.
231 ///
232 /// The closure must return `true` or `false` and is called on each element of the
233 /// original stream. If `true` is returned, the element is passed downstream.
234 ///
235 /// # Examples
236 ///
237 /// ```
238 /// # use reactive_rs::*;
239 /// let stream = SimpleBroadcast::<Vec<i32>>::new();
240 /// let non_empty = stream.filter(|v| !v.is_empty());
241 /// ```
242 ///
243 /// # Notes
244 ///
245 /// - The context is passed down unchanged.
246 /// - The closure receives its argument by reference.
247 /// - The return value is a `Stream` object (same context type and
248 /// value type as the original stream).
249 fn filter<F>(self, func: F) -> Filter<Self, NoContext<F>>
250 where
251 F: 'a + FnMut(&Self::Item) -> bool,
252 {
253 Filter { stream: self, func: NoContext(func) }
254 }
255
256 /// Same as `filter()`, but the closure receives two arguments
257 /// (context/value), by reference.
258 ///
259 /// # Examples
260 ///
261 /// ```
262 /// # use reactive_rs::*;
263 /// let stream = Broadcast::<usize, Vec<i32>>::new();
264 /// let filter_len = stream.filter_ctx(|ctx, v| v.len() == *ctx);
265 /// ```
266 fn filter_ctx<F>(self, func: F) -> Filter<Self, F>
267 where
268 F: 'a + FnMut(&Self::Context, &Self::Item) -> bool,
269 {
270 Filter { stream: self, func }
271 }
272
273 /// Creates a stream that both filters and maps.
274 ///
275 /// The closure must return an `Option<T>`. If it returns `Some(element)`, then
276 /// that element is returned; otherwise it is skipped.
277 ///
278 /// # Examples
279 ///
280 /// ```
281 /// # use reactive_rs::*;
282 /// let stream = SimpleBroadcast::<String>::new();
283 /// let valid_ints = stream.filter_map(|s| s.parse::<i64>().ok());
284 /// ```
285 ///
286 /// # Notes
287 ///
288 /// - The context is passed down unchanged.
289 /// - The closure receives its argument by reference.
290 /// - The return value is a `Stream` object (context type is unchanged;
291 /// value type is the is `T` if the return type of the closure is `Option<T>`).
292 fn filter_map<F, T>(self, func: F) -> FilterMap<Self, NoContext<F>>
293 where
294 F: 'a + FnMut(&Self::Item) -> Option<T>,
295 {
296 FilterMap { stream: self, func: NoContext(func) }
297 }
298
299 /// Same as `filter_map()`, but the closure receives two arguments
300 /// (context/value), by reference.
301 ///
302 /// # Examples
303 ///
304 /// ```
305 /// # use reactive_rs::*;
306 /// let stream = Broadcast::<Option<i64>, String>::new();
307 /// let int_or_ctx = stream.filter_map_ctx(|c, s| s.parse().ok().or(*c));
308 /// ```
309 fn filter_map_ctx<F, T>(self, func: F) -> FilterMap<Self, F>
310 where
311 F: 'a + FnMut(&Self::Context, &Self::Item) -> Option<T>,
312 {
313 FilterMap { stream: self, func }
314 }
315
316 /// 'Reduce' operation on streams.
317 ///
318 /// This method takes two arguments: an initial value, and a closure with
319 /// two arguments: an accumulator and an element. The closure returns the
320 /// value that the accumulator should have for the next iteration; the
321 /// initial value is the value the accumulator will have on the first call.
322 ///
323 /// # Examples
324 ///
325 /// ```
326 /// # use reactive_rs::*;
327 /// let stream = SimpleBroadcast::<i32>::new();
328 /// let cum_sum = stream.fold(0, |acc, x| acc + x);
329 /// ```
330 ///
331 /// # Notes
332 ///
333 /// - The context is passed down unchanged.
334 /// - The closure receives all of its arguments by reference.
335 /// - The return value is a `Stream` object (context type is unchanged;
336 /// value type is the accumulator type).
337 fn fold<F, T: 'a>(self, init: T, func: F) -> Fold<Self, NoContext<F>, T>
338 where
339 F: 'a + FnMut(&T, &Self::Item) -> T,
340 {
341 Fold { stream: self, init, func: NoContext(func) }
342 }
343
344 /// Same as `fold()`, but the closure receives three arguments
345 /// (context/accumulator/value), by reference.
346 ///
347 /// # Examples
348 ///
349 /// ```
350 /// # use reactive_rs::*;
351 /// let stream = Broadcast::<i32, i32>::new();
352 /// let bnd_sum = stream.fold_ctx(0, |c, acc, x| *c.min(&(acc + x)));
353 /// ```
354 fn fold_ctx<F, T: 'a>(self, init: T, func: F) -> Fold<Self, F, T>
355 where
356 F: 'a + FnMut(&Self::Context, &T, &Self::Item) -> T,
357 {
358 Fold { stream: self, init, func }
359 }
360
361 /// Do something with each element of a stream, passing the value on.
362 ///
363 /// The closure will only be called if the stream is actually
364 /// subscribed to (just calling `inspect()` does nothing on its own).
365 ///
366 /// # Examples
367 ///
368 /// ```
369 /// # use reactive_rs::*;
370 /// let stream = SimpleBroadcast::<String>::new();
371 /// let stream = stream.inspect(|x| println!("{:?}", x));
372 /// ```
373 ///
374 /// # Notes
375 ///
376 /// - Both context/value are passed down unchanged.
377 /// - The closure receives its argument by reference.
378 /// - The return value is a `Stream` object (same context type and
379 /// value type as the original stream).
380 fn inspect<F>(self, func: F) -> Inspect<Self, NoContext<F>>
381 where
382 F: 'a + FnMut(&Self::Item),
383 {
384 Inspect { stream: self, func: NoContext(func) }
385 }
386
387 /// Same as `inspect()`, but the closure receives two arguments
388 /// (context/value), by reference.
389 ///
390 /// # Examples
391 ///
392 /// ```
393 /// # use reactive_rs::*;
394 /// let stream = Broadcast::<i32, String>::new();
395 /// let stream = stream.inspect_ctx(|c, x| println!("{} {}", c, x));
396 /// ```
397 fn inspect_ctx<F>(self, func: F) -> Inspect<Self, F>
398 where
399 F: 'a + FnMut(&Self::Context, &Self::Item),
400 {
401 Inspect { stream: self, func }
402 }
403
404 /// Creates a stream that caches up to `n` last elements.
405 ///
406 /// The elements are stored in a contiguous double-ended
407 /// queue provided via [`slice-deque`](https://crates.io/crates/slice-deque)
408 /// crate. The output stream yields slice views into this queue.
409 ///
410 /// # Examples
411 ///
412 /// ```
413 /// # use reactive_rs::*;
414 /// let stream = SimpleBroadcast::<i64>::new();
415 /// let last_3 = stream.last_n(3);
416 /// ```
417 ///
418 /// # Notes
419 ///
420 /// - The context is passed down unchanged (only values are cached).
421 /// - The return value is a `Stream` object (context type is unchanged;
422 /// value type is `[T]` where `T` is the original value type).
423 /// - Slices may contain less than `n` elements (while the queue
424 /// is being filled up initially).
425 /// - The value type of the original stream must implement `Clone`.
426 /// - This method is only present if `feature = "slice-deque"` is
427 /// enabled (on by default).
428 #[cfg(any(test, feature = "slice-deque"))]
429 fn last_n(self, count: usize) -> LastN<Self, Self::Item>
430 where
431 Self::Item: 'a + Clone + Sized,
432 {
433 LastN { count, stream: self, data: Rc::new(RefCell::new(SliceDeque::with_capacity(count))) }
434 }
435}
436
/// A callable that is invoked with a context and an item, both by reference.
///
/// Stream adaptors are bounded on this trait instead of on `FnMut` directly,
/// so that one adaptor type can hold either a two-argument closure or a
/// value-only closure wrapped in `NoContext`.
pub trait ContextFn<C: ?Sized, T: ?Sized> {
    /// Result type of one invocation.
    type Output;

    /// Calls the underlying function with the given context and item.
    fn call_mut(&mut self, ctx: &C, item: &T) -> Self::Output;
}

/// Any `FnMut(&C, &T) -> V` closure is a `ContextFn` producing `V`.
impl<C, T, V, F> ContextFn<C, T> for F
where
    C: ?Sized,
    T: ?Sized,
    F: FnMut(&C, &T) -> V,
{
    type Output = V;

    #[inline(always)]
    fn call_mut(&mut self, ctx: &C, item: &T) -> V {
        (*self)(ctx, item)
    }
}
454
/// A callable used for folding: it is invoked with a context, the current
/// accumulator and an item, all by reference.
///
/// It plays the same role for `Fold` that `ContextFn` plays for the other
/// adaptors.
pub trait ContextFoldFn<C: ?Sized, T: ?Sized, V> {
    /// Result type of one invocation.
    type Output;

    /// Calls the underlying function with the context, accumulator and item.
    fn call_mut(&mut self, ctx: &C, acc: &V, item: &T) -> Self::Output;
}

/// Any `FnMut(&C, &V, &T) -> V` closure is a `ContextFoldFn` producing `V`.
impl<C, T, V, F> ContextFoldFn<C, T, V> for F
where
    C: ?Sized,
    T: ?Sized,
    F: FnMut(&C, &V, &T) -> V,
{
    type Output = V;

    #[inline(always)]
    fn call_mut(&mut self, ctx: &C, acc: &V, item: &T) -> V {
        (*self)(ctx, acc, item)
    }
}
472
473pub struct NoContext<F>(F);
474
475impl<F, C: ?Sized, T: ?Sized, V> ContextFn<C, T> for NoContext<F>
476where
477 F: FnMut(&T) -> V,
478{
479 type Output = V;
480
481 #[inline(always)]
482 fn call_mut(&mut self, _ctx: &C, item: &T) -> Self::Output {
483 (self.0)(item)
484 }
485}
486
487impl<F, C: ?Sized, T: ?Sized, V> ContextFoldFn<C, T, V> for NoContext<F>
488where
489 F: FnMut(&V, &T) -> V,
490{
491 type Output = V;
492
493 #[inline(always)]
494 fn call_mut(&mut self, _ctx: &C, acc: &Self::Output, item: &T) -> Self::Output {
495 (self.0)(acc, item)
496 }
497}
498
/// Boxed observer stored by a `Broadcast`: a mutable closure receiving the
/// context and the value by reference, valid for the lifetime `'a`.
///
/// Written with an explicit `dyn`: the bare `Box<'a + FnMut(..)>` form is
/// deprecated and is rejected outright starting with the 2021 edition.
type Callback<'a, C, T> = Box<dyn FnMut(&C, &T) + 'a>;
500
501/// Event source that transmits context/value pairs to multiple observers.
502///
503/// In order to "fork" the broadcast (creating a new stream that will
504/// be subscribed to it), the broadcast object can be simply cloned
505/// via the `Clone` trait. Note that cloning the broadcast only
506/// increases its reference count; no values are being cloned or copied.
507///
508/// A broadcast may receive a value in one of two ways. First, the user
509/// may explicitly call one of its methods: `send()`, `send_ctx()`,
510/// `feed()`, `feed_ctx()`. Second, the broadcast may be created
511/// from a parent stream via [`broadcast()`](trait.Stream.html#provided-methods)
512/// method of the stream object. Either way, each context/value pair received
513/// is passed on to each of the subscribed observers, by reference.
514///
515/// # Examples
516///
517/// ```
518/// # use reactive_rs::*; use std::cell::RefCell; use std::rc::Rc;
519/// let out = RefCell::new(Vec::new());
520/// let stream = SimpleBroadcast::<i32>::new();
521/// let child1 = stream
522/// .clone()
523/// .subscribe(|x| out.borrow_mut().push(*x + 1));
524/// let child2 = stream
525/// .clone()
526/// .subscribe(|x| out.borrow_mut().push(*x + 7));
527/// stream.feed(1..=3);
528/// assert_eq!(&*out.borrow(), &[2, 8, 3, 9, 4, 10]);
529/// ```
530pub struct Broadcast<'a, C: ?Sized, T: ?Sized> {
531 observers: Rc<RefCell<Vec<Callback<'a, C, T>>>>,
532}
533
534impl<'a, C: 'a + ?Sized, T: 'a + ?Sized> Broadcast<'a, C, T> {
535 /// Creates a new broadcast with specified context and item types.
536 pub fn new() -> Self {
537 Self { observers: Rc::new(RefCell::new(Vec::new())) }
538 }
539
540 /// Create a broadcast from a stream, enabling multiple observers
541 /// ("fork" the stream).
542 ///
543 /// Note: this is equivalent to calling
544 /// [`broadcast()`](trait.Stream.html#provided-methods) on the stream object.
545 pub fn from_stream<S>(stream: S) -> Self
546 where
547 S: Stream<'a, Context = C, Item = T>,
548 {
549 let broadcast = Self::new();
550 let clone = broadcast.clone();
551 stream.subscribe_ctx(move |ctx, x| clone.send_ctx(ctx, x));
552 broadcast
553 }
554
555 fn push<F>(&self, func: F)
556 where
557 F: FnMut(&C, &T) + 'a,
558 {
559 self.observers.borrow_mut().push(Box::new(func));
560 }
561
562 /// Send a value along with context to all observers of the broadcast.
563 pub fn send_ctx<K, B>(&self, ctx: K, value: B)
564 where
565 K: Borrow<C>,
566 B: Borrow<T>,
567 {
568 let ctx = ctx.borrow();
569 let value = value.borrow();
570 for observer in self.observers.borrow_mut().iter_mut() {
571 observer(ctx, value);
572 }
573 }
574
575 /// Similar to `send_ctx()`, but the context is set to the type's default value.
576 pub fn send<B>(&self, value: B)
577 where
578 B: Borrow<T>,
579 C: Default,
580 {
581 let ctx = C::default();
582 self.send_ctx(&ctx, value);
583 }
584
585 /// Convenience method to feed an iterator of values to all observers of the
586 /// broadcast, along with a given context.
587 pub fn feed_ctx<K, B, I>(&self, ctx: K, iter: I)
588 where
589 K: Borrow<C>,
590 I: Iterator<Item = B>,
591 B: Borrow<T>,
592 {
593 let ctx = ctx.borrow();
594 for value in iter {
595 self.send_ctx(ctx, value);
596 }
597 }
598
599 /// Similar to `feed_ctx()`, but the context is set to the type's default value.
600 pub fn feed<B, I>(&self, iter: I)
601 where
602 I: Iterator<Item = B>,
603 B: Borrow<T>,
604 C: Default,
605 {
606 let ctx = C::default();
607 self.feed_ctx(&ctx, iter);
608 }
609}
610
/// Simplified broadcast that only transmits values without context.
///
/// This is a `Broadcast` whose context type is fixed to the unit type `()`.
pub type SimpleBroadcast<'a, T> = Broadcast<'a, (), T>;
613
614impl<'a, C: 'a + ?Sized, T: 'a + ?Sized> Default for Broadcast<'a, C, T> {
615 fn default() -> Self {
616 Self::new()
617 }
618}
619
620impl<'a, C: 'a + ?Sized, T: 'a + ?Sized> Clone for Broadcast<'a, C, T> {
621 fn clone(&self) -> Self {
622 Self { observers: self.observers.clone() }
623 }
624}
625
626impl<'a, C: 'a + ?Sized, T: 'a + ?Sized> Stream<'a> for Broadcast<'a, C, T> {
627 type Context = C;
628 type Item = T;
629
630 fn subscribe_ctx<O>(self, observer: O)
631 where
632 O: FnMut(&Self::Context, &Self::Item) + 'a,
633 {
634 self.push(observer);
635 }
636}
637
638pub struct WithContext<S, T> {
639 stream: S,
640 ctx: T,
641}
642
643impl<'a, S, T: 'a> Stream<'a> for WithContext<S, T>
644where
645 S: Stream<'a>,
646{
647 type Context = T;
648 type Item = S::Item;
649
650 fn subscribe_ctx<O>(self, mut observer: O)
651 where
652 O: FnMut(&Self::Context, &Self::Item) + 'a,
653 {
654 let ctx = self.ctx;
655 self.stream.subscribe_ctx(move |_ctx, x| {
656 observer(&ctx, x);
657 })
658 }
659}
660
661pub struct WithContextMap<S, F> {
662 stream: S,
663 func: F,
664}
665
666impl<'a, S, F, T> Stream<'a> for WithContextMap<S, F>
667where
668 S: Stream<'a>,
669 F: 'a + FnMut(&S::Context, &S::Item) -> T,
670{
671 type Context = T;
672 type Item = S::Item;
673
674 fn subscribe_ctx<O>(self, mut observer: O)
675 where
676 O: FnMut(&Self::Context, &Self::Item) + 'a,
677 {
678 let mut func = self.func;
679 self.stream.subscribe_ctx(move |ctx, x| {
680 observer(&func(ctx, x), x);
681 })
682 }
683}
684
685pub struct Context<S> {
686 stream: S,
687}
688
689impl<'a, S> Stream<'a> for Context<S>
690where
691 S: Stream<'a>,
692{
693 type Context = S::Context;
694 type Item = S::Context;
695
696 fn subscribe_ctx<O>(self, mut observer: O)
697 where
698 O: FnMut(&Self::Context, &Self::Item) + 'a,
699 {
700 self.stream.subscribe_ctx(move |ctx, _x| {
701 observer(ctx, ctx);
702 })
703 }
704}
705
706pub struct Map<S, F> {
707 stream: S,
708 func: F,
709}
710
711impl<'a, S, F> Stream<'a> for Map<S, F>
712where
713 S: Stream<'a>,
714 F: 'a + ContextFn<S::Context, S::Item>,
715{
716 type Context = S::Context;
717 type Item = F::Output;
718
719 fn subscribe_ctx<O>(self, mut observer: O)
720 where
721 O: FnMut(&Self::Context, &Self::Item) + 'a,
722 {
723 let mut func = self.func;
724 self.stream.subscribe_ctx(move |ctx, x| observer(ctx, &func.call_mut(ctx, x)))
725 }
726}
727
728pub struct MapBoth<S, F> {
729 stream: S,
730 func: F,
731}
732
733impl<'a, S, F, C, T> Stream<'a> for MapBoth<S, F>
734where
735 S: Stream<'a>,
736 F: 'a + ContextFn<S::Context, S::Item, Output = (C, T)>,
737{
738 type Context = C;
739 type Item = T;
740
741 fn subscribe_ctx<O>(self, mut observer: O)
742 where
743 O: FnMut(&Self::Context, &Self::Item) + 'a,
744 {
745 let mut func = self.func;
746 self.stream.subscribe_ctx(move |ctx, x| {
747 let (ctx, x) = func.call_mut(ctx, x);
748 observer(&ctx, &x);
749 })
750 }
751}
752
753pub struct Filter<S, F> {
754 stream: S,
755 func: F,
756}
757
758impl<'a, S, F> Stream<'a> for Filter<S, F>
759where
760 S: Stream<'a>,
761 F: 'a + ContextFn<S::Context, S::Item, Output = bool>,
762{
763 type Context = S::Context;
764 type Item = S::Item;
765
766 fn subscribe_ctx<O>(self, mut observer: O)
767 where
768 O: 'a + FnMut(&Self::Context, &Self::Item),
769 {
770 let mut func = self.func;
771 self.stream.subscribe_ctx(move |ctx, x| {
772 if func.call_mut(ctx, x) {
773 observer(ctx, x);
774 }
775 });
776 }
777}
778
779pub struct FilterMap<S, F> {
780 stream: S,
781 func: F,
782}
783
784impl<'a, S, F, T> Stream<'a> for FilterMap<S, F>
785where
786 S: Stream<'a>,
787 F: 'a + ContextFn<S::Context, S::Item, Output = Option<T>>,
788{
789 type Context = S::Context;
790 type Item = T;
791
792 fn subscribe_ctx<O>(self, mut observer: O)
793 where
794 O: 'a + FnMut(&Self::Context, &Self::Item),
795 {
796 let mut func = self.func;
797 self.stream.subscribe_ctx(move |ctx, x| {
798 if let Some(x) = func.call_mut(ctx, x) {
799 observer(ctx, &x);
800 }
801 });
802 }
803}
804
805pub struct Fold<S, F, T> {
806 stream: S,
807 init: T,
808 func: F,
809}
810
811impl<'a, S, F, T: 'a> Stream<'a> for Fold<S, F, T>
812where
813 S: Stream<'a>,
814 F: 'a + ContextFoldFn<S::Context, S::Item, T, Output = T>,
815{
816 type Context = S::Context;
817 type Item = T;
818
819 fn subscribe_ctx<O>(self, mut observer: O)
820 where
821 O: FnMut(&Self::Context, &Self::Item) + 'a,
822 {
823 let mut value = self.init;
824 let mut func = self.func;
825 self.stream.subscribe_ctx(move |ctx, x| {
826 value = func.call_mut(ctx, &value, x);
827 observer(ctx, &value);
828 })
829 }
830}
831
832pub struct Inspect<S, F> {
833 stream: S,
834 func: F,
835}
836
837impl<'a, S, F> Stream<'a> for Inspect<S, F>
838where
839 S: Stream<'a>,
840 F: 'a + ContextFn<S::Context, S::Item, Output = ()>,
841{
842 type Context = S::Context;
843 type Item = S::Item;
844
845 fn subscribe_ctx<O>(self, mut observer: O)
846 where
847 O: FnMut(&Self::Context, &Self::Item) + 'a,
848 {
849 let mut func = self.func;
850 self.stream.subscribe_ctx(move |ctx, x| {
851 func.call_mut(ctx, x);
852 observer(ctx, x);
853 })
854 }
855}
856
/// Stream adaptor that caches the most recent values and emits a slice view
/// of the cache after every upstream element, leaving the context unchanged.
///
/// Constructed by `Stream::last_n()`.
#[cfg(any(test, feature = "slice-deque"))]
pub struct LastN<S, T: Sized> {
    // Maximum number of values kept in `data`.
    count: usize,
    stream: S,
    data: Rc<RefCell<SliceDeque<T>>>,
}

#[cfg(any(test, feature = "slice-deque"))]
impl<'a, S, T> Stream<'a> for LastN<S, T>
where
    S: Stream<'a, Item = T>,
    T: 'a + Clone + Sized,
{
    type Context = S::Context;
    type Item = [T];

    /// Subscribes to the upstream; each incoming value is cloned into the
    /// queue (evicting the oldest one when the queue is full) and the
    /// observer then receives the whole queue as a slice.
    ///
    /// With `count == 0` nothing is stored and the observer always receives
    /// an empty slice.
    fn subscribe_ctx<O>(self, mut observer: O)
    where
        O: 'a + FnMut(&Self::Context, &Self::Item),
    {
        let LastN { count, stream, data } = self;
        stream.subscribe_ctx(move |ctx, x| {
            // A zero-sized window must never store anything: a plain
            // `len == count` check would match only for the first element
            // and let the queue grow without bound afterwards.
            if count > 0 {
                let mut queue = data.borrow_mut();
                if queue.len() >= count {
                    queue.pop_front();
                }
                queue.push_back(x.clone());
                // `queue` is released at the end of this block, which is
                // required before the shared borrow taken below.
            }
            // `as_ref()` first, so that `.borrow()` resolves to
            // `RefCell::borrow` and not to the imported `std::borrow::Borrow`.
            observer(ctx, &*data.as_ref().borrow());
        })
    }
}