rx_rust/operators/conditional_boolean/
skip_while.rs

1use crate::utils::types::NecessarySend;
2use crate::{
3    disposable::subscription::Subscription,
4    observable::Observable,
5    observer::{Observer, Termination},
6};
7use educe::Educe;
8
9/// Discards items emitted by an Observable until a specified condition becomes false.
10/// See <https://reactivex.io/documentation/operators/skipwhile.html>
11///
12/// # Examples
13/// ```rust
14/// use rx_rust::{
15///     observable::observable_ext::ObservableExt,
16///     observer::Termination,
17///     operators::{
18///         conditional_boolean::skip_while::SkipWhile,
19///         creating::from_iter::FromIter,
20///     },
21/// };
22///
23/// let mut values = Vec::new();
24/// let mut terminations = Vec::new();
25///
26/// let observable = SkipWhile::new(FromIter::new(vec![1, 2, 3, 4]), |value| *value < 3);
27/// observable.subscribe_with_callback(
28///     |value| values.push(value),
29///     |termination| terminations.push(termination),
30/// );
31///
32/// assert_eq!(values, vec![3, 4]);
33/// assert_eq!(terminations, vec![Termination::Completed]);
34/// ```
35#[derive(Educe)]
36#[educe(Debug, Clone)]
37pub struct SkipWhile<OE, F> {
38    source: OE,
39    callback: F,
40}
41
42impl<OE, F> SkipWhile<OE, F> {
43    pub fn new<'or, 'sub, T, E>(source: OE, callback: F) -> Self
44    where
45        OE: Observable<'or, 'sub, T, E>,
46        F: FnMut(&T) -> bool,
47    {
48        Self { source, callback }
49    }
50}
51
52impl<'or, 'sub, T, E, OE, F> Observable<'or, 'sub, T, E> for SkipWhile<OE, F>
53where
54    OE: Observable<'or, 'sub, T, E>,
55    F: FnMut(&T) -> bool + NecessarySend + 'or,
56{
57    fn subscribe(self, observer: impl Observer<T, E> + NecessarySend + 'or) -> Subscription<'sub> {
58        let observer = SkipWhileObserver {
59            observer,
60            callback: self.callback,
61            skip: true,
62        };
63        self.source.subscribe(observer)
64    }
65}
66
/// Observer wrapper that carries the skipping state for one `SkipWhile` subscription.
struct SkipWhileObserver<OR, F> {
    /// Downstream observer that receives the items that are not skipped.
    observer: OR,
    /// Predicate consulted for each item while `skip` is still `true`.
    callback: F,
    /// `true` while items are being discarded; set to `false` by the first item
    /// for which the predicate returns `false`.
    skip: bool,
}
72
73impl<T, E, OR, F> Observer<T, E> for SkipWhileObserver<OR, F>
74where
75    OR: Observer<T, E>,
76    F: FnMut(&T) -> bool,
77{
78    fn on_next(&mut self, value: T) {
79        if !self.skip {
80            self.observer.on_next(value);
81        } else {
82            self.skip = (self.callback)(&value);
83            if !self.skip {
84                self.observer.on_next(value);
85            }
86        }
87    }
88
89    fn on_termination(self, termination: Termination<E>) {
90        self.observer.on_termination(termination)
91    }
92}