another_rxrust/operators/
skip_while.rs

1use crate::internals::{function_wrapper::*, stream_controller::*};
2use crate::prelude::*;
3use std::sync::{Arc, RwLock};
4
5#[derive(Clone)]
6pub struct SkipWhile<'a, Item>
7where
8  Item: Clone + Send + Sync,
9{
10  predicate_f: FunctionWrapper<'a, Item, bool>,
11}
12
13impl<'a, Item> SkipWhile<'a, Item>
14where
15  Item: Clone + Send + Sync,
16{
17  pub fn new<F>(f: F) -> SkipWhile<'a, Item>
18  where
19    F: Fn(Item) -> bool + Send + Sync + 'a,
20  {
21    SkipWhile { predicate_f: FunctionWrapper::new(f) }
22  }
23  pub fn execute(&self, source: Observable<'a, Item>) -> Observable<'a, Item> {
24    let f = self.predicate_f.clone();
25
26    Observable::<Item>::create(move |s| {
27      let enable = Arc::new(RwLock::new(false));
28
29      let f = f.clone();
30
31      let sctl = StreamController::new(s);
32      let sctl_next = sctl.clone();
33      let sctl_error = sctl.clone();
34      let sctl_complete = sctl.clone();
35
36      source.inner_subscribe(sctl.new_observer(
37        move |_, x: Item| {
38          if *enable.read().unwrap() {
39            sctl_next.sink_next(x);
40          } else {
41            if f.call(x.clone()) {
42              sctl_next.sink_next(x);
43              *enable.write().unwrap() = true;
44            }
45          }
46        },
47        move |_, e| {
48          sctl_error.sink_error(e);
49        },
50        move |serial| sctl_complete.sink_complete(&serial),
51      ));
52    })
53  }
54}
55
56impl<'a, Item> Observable<'a, Item>
57where
58  Item: Clone + Send + Sync,
59{
60  pub fn skip_while<F>(&self, f: F) -> Observable<'a, Item>
61  where
62    F: Fn(Item) -> bool + Send + Sync + 'a,
63  {
64    SkipWhile::new(f).execute(self.clone())
65  }
66}
67
#[cfg(test)]
mod test {
  use crate::prelude::*;

  /// Smoke test: feeds the integers 0..10 followed by completion through
  /// `skip_while` and hands the results to the print macros. Nothing is
  /// asserted.
  #[test]
  fn basic() {
    let numbers = Observable::create(|subscriber| {
      for value in 0..10 {
        subscriber.next(value);
      }
      subscriber.complete();
    });

    let skipped = numbers.skip_while(|x| x > 5);
    skipped.subscribe(
      print_next_fmt!("{}"),
      print_error!(),
      print_complete!(),
    );
  }
}