another_rxrust/operators/
take_while.rs1use crate::internals::{function_wrapper::*, stream_controller::*};
2use crate::prelude::*;
3
4#[derive(Clone)]
5pub struct TakeWhile<'a, Item>
6where
7 Item: Clone + Send + Sync,
8{
9 predicate_f: FunctionWrapper<'a, Item, bool>,
10}
11
12impl<'a, Item> TakeWhile<'a, Item>
13where
14 Item: Clone + Send + Sync,
15{
16 pub fn new<F>(f: F) -> TakeWhile<'a, Item>
17 where
18 F: Fn(Item) -> bool + Send + Sync + 'a,
19 {
20 TakeWhile { predicate_f: FunctionWrapper::new(f) }
21 }
22 pub fn execute(&self, source: Observable<'a, Item>) -> Observable<'a, Item> {
23 let f = self.predicate_f.clone();
24
25 Observable::<Item>::create(move |s| {
26 let f = f.clone();
27
28 let sctl = StreamController::new(s);
29 let sctl_next = sctl.clone();
30 let sctl_error = sctl.clone();
31 let sctl_complete = sctl.clone();
32
33 source.inner_subscribe(sctl.new_observer(
34 move |serial, x: Item| {
35 if f.call(x.clone()) {
36 sctl_next.sink_next(x);
37 } else {
38 sctl_next.sink_complete(&serial)
39 }
40 },
41 move |_, e| {
42 sctl_error.sink_error(e);
43 },
44 move |serial| sctl_complete.sink_complete(&serial),
45 ));
46 })
47 }
48}
49
50impl<'a, Item> Observable<'a, Item>
51where
52 Item: Clone + Send + Sync,
53{
54 pub fn take_while<F>(&self, f: F) -> Observable<'a, Item>
55 where
56 F: Fn(Item) -> bool + Send + Sync + 'a,
57 {
58 TakeWhile::new(f).execute(self.clone())
59 }
60}
61
#[cfg(test)]
mod test {
    use crate::prelude::*;

    /// Smoke test: a source emitting 0..10 is piped through `take_while` with
    /// the predicate `x < 5`, and every notification is printed.
    #[test]
    fn basic() {
        let numbers = Observable::create(|subscriber| {
            (0..10).for_each(|value| {
                subscriber.next(value);
            });
            subscriber.complete();
        });

        let below_five = numbers.take_while(|value| value < 5);
        below_five.subscribe(
            print_next_fmt!("{}"),
            print_error!(),
            print_complete!(),
        );
    }
}