another_rxrust/operators/
take_while.rs

1use crate::internals::{function_wrapper::*, stream_controller::*};
2use crate::prelude::*;
3
4#[derive(Clone)]
5pub struct TakeWhile<'a, Item>
6where
7  Item: Clone + Send + Sync,
8{
9  predicate_f: FunctionWrapper<'a, Item, bool>,
10}
11
12impl<'a, Item> TakeWhile<'a, Item>
13where
14  Item: Clone + Send + Sync,
15{
16  pub fn new<F>(f: F) -> TakeWhile<'a, Item>
17  where
18    F: Fn(Item) -> bool + Send + Sync + 'a,
19  {
20    TakeWhile { predicate_f: FunctionWrapper::new(f) }
21  }
22  pub fn execute(&self, source: Observable<'a, Item>) -> Observable<'a, Item> {
23    let f = self.predicate_f.clone();
24
25    Observable::<Item>::create(move |s| {
26      let f = f.clone();
27
28      let sctl = StreamController::new(s);
29      let sctl_next = sctl.clone();
30      let sctl_error = sctl.clone();
31      let sctl_complete = sctl.clone();
32
33      source.inner_subscribe(sctl.new_observer(
34        move |serial, x: Item| {
35          if f.call(x.clone()) {
36            sctl_next.sink_next(x);
37          } else {
38            sctl_next.sink_complete(&serial)
39          }
40        },
41        move |_, e| {
42          sctl_error.sink_error(e);
43        },
44        move |serial| sctl_complete.sink_complete(&serial),
45      ));
46    })
47  }
48}
49
50impl<'a, Item> Observable<'a, Item>
51where
52  Item: Clone + Send + Sync,
53{
54  pub fn take_while<F>(&self, f: F) -> Observable<'a, Item>
55  where
56    F: Fn(Item) -> bool + Send + Sync + 'a,
57  {
58    TakeWhile::new(f).execute(self.clone())
59  }
60}
61
#[cfg(test)]
mod test {
  use crate::prelude::*;

  /// Smoke test: feeds 0..10 through `take_while(x < 5)` and prints
  /// whatever the subscriber receives. Nothing is asserted; the test only
  /// checks that the pipeline runs without panicking.
  #[test]
  fn basic() {
    // Source emits 0 through 9, then completes.
    let source = Observable::create(|s| {
      (0..10).for_each(|n| {
        s.next(n);
      });
      s.complete();
    });

    // The predicate holds for 0 through 4; 5 is the first value failing it.
    let limited = source.take_while(|x| x < 5);

    limited.subscribe(
      print_next_fmt!("{}"),
      print_error!(),
      print_complete!(),
    );
  }
}