rx_rust/operators/conditional_boolean/
skip_while.rs

1use crate::utils::types::NecessarySendSync;
2use crate::{
3    disposable::subscription::Subscription,
4    observable::Observable,
5    observer::{Observer, Termination},
6};
7use educe::Educe;
8
/// Discards items emitted by an Observable until a specified condition becomes false.
/// See <https://reactivex.io/documentation/operators/skipwhile.html>
///
/// The predicate receives each item by reference. While it returns `true` the item is
/// dropped. The first item for which it returns `false` is forwarded, and every later
/// item is forwarded without calling the predicate again. Terminations coming from the
/// source are handed to the downstream observer unchanged.
///
/// # Examples
/// ```rust
/// use rx_rust::{
///     observable::observable_ext::ObservableExt,
///     observer::Termination,
///     operators::{
///         conditional_boolean::skip_while::SkipWhile,
///         creating::from_iter::FromIter,
///     },
/// };
///
/// let mut values = Vec::new();
/// let mut terminations = Vec::new();
///
/// let observable = SkipWhile::new(FromIter::new(vec![1, 2, 3, 4]), |value| *value < 3);
/// observable.subscribe_with_callback(
///     |value| values.push(value),
///     |termination| terminations.push(termination),
/// );
///
/// assert_eq!(values, vec![3, 4]);
/// assert_eq!(terminations, vec![Termination::Completed]);
/// ```
#[derive(Educe)]
#[educe(Debug, Clone)]
pub struct SkipWhile<OE, F> {
    /// Upstream observable whose leading items may be skipped.
    source: OE,
    /// Predicate that keeps the operator skipping for as long as it returns `true`.
    callback: F,
}
41
42impl<OE, F> SkipWhile<OE, F> {
43    pub fn new<'or, 'sub, T, E>(source: OE, callback: F) -> Self
44    where
45        OE: Observable<'or, 'sub, T, E>,
46        F: FnMut(&T) -> bool,
47    {
48        Self { source, callback }
49    }
50}
51
52impl<'or, 'sub, T, E, OE, F> Observable<'or, 'sub, T, E> for SkipWhile<OE, F>
53where
54    OE: Observable<'or, 'sub, T, E>,
55    F: FnMut(&T) -> bool + NecessarySendSync + 'or,
56{
57    fn subscribe(
58        self,
59        observer: impl Observer<T, E> + NecessarySendSync + 'or,
60    ) -> Subscription<'sub> {
61        let observer = SkipWhileObserver {
62            observer,
63            callback: self.callback,
64            skip: true,
65        };
66        self.source.subscribe(observer)
67    }
68}
69
/// Observer adapter that `SkipWhile::subscribe` puts in front of the downstream observer.
struct SkipWhileObserver<OR, F> {
    /// Downstream observer that receives the items which are not skipped.
    observer: OR,
    /// Predicate evaluated on each incoming item while `skip` is still `true`.
    callback: F,
    /// `true` until the predicate first returns `false`; it is not set back to `true` afterwards.
    skip: bool,
}
75
76impl<T, E, OR, F> Observer<T, E> for SkipWhileObserver<OR, F>
77where
78    OR: Observer<T, E>,
79    F: FnMut(&T) -> bool,
80{
81    fn on_next(&mut self, value: T) {
82        if !self.skip {
83            self.observer.on_next(value);
84        } else {
85            self.skip = (self.callback)(&value);
86            if !self.skip {
87                self.observer.on_next(value);
88            }
89        }
90    }
91
92    fn on_termination(self, termination: Termination<E>) {
93        self.observer.on_termination(termination)
94    }
95}