another_rxrust/operators/
skip_while.rs1use crate::internals::{function_wrapper::*, stream_controller::*};
2use crate::prelude::*;
3use std::sync::{Arc, RwLock};
4
5#[derive(Clone)]
6pub struct SkipWhile<'a, Item>
7where
8 Item: Clone + Send + Sync,
9{
10 predicate_f: FunctionWrapper<'a, Item, bool>,
11}
12
13impl<'a, Item> SkipWhile<'a, Item>
14where
15 Item: Clone + Send + Sync,
16{
17 pub fn new<F>(f: F) -> SkipWhile<'a, Item>
18 where
19 F: Fn(Item) -> bool + Send + Sync + 'a,
20 {
21 SkipWhile { predicate_f: FunctionWrapper::new(f) }
22 }
23 pub fn execute(&self, source: Observable<'a, Item>) -> Observable<'a, Item> {
24 let f = self.predicate_f.clone();
25
26 Observable::<Item>::create(move |s| {
27 let enable = Arc::new(RwLock::new(false));
28
29 let f = f.clone();
30
31 let sctl = StreamController::new(s);
32 let sctl_next = sctl.clone();
33 let sctl_error = sctl.clone();
34 let sctl_complete = sctl.clone();
35
36 source.inner_subscribe(sctl.new_observer(
37 move |_, x: Item| {
38 if *enable.read().unwrap() {
39 sctl_next.sink_next(x);
40 } else {
41 if f.call(x.clone()) {
42 sctl_next.sink_next(x);
43 *enable.write().unwrap() = true;
44 }
45 }
46 },
47 move |_, e| {
48 sctl_error.sink_error(e);
49 },
50 move |serial| sctl_complete.sink_complete(&serial),
51 ));
52 })
53 }
54}
55
56impl<'a, Item> Observable<'a, Item>
57where
58 Item: Clone + Send + Sync,
59{
60 pub fn skip_while<F>(&self, f: F) -> Observable<'a, Item>
61 where
62 F: Fn(Item) -> bool + Send + Sync + 'a,
63 {
64 SkipWhile::new(f).execute(self.clone())
65 }
66}
67
68#[cfg(test)]
69mod test {
70 use crate::prelude::*;
71
72 #[test]
73 fn basic() {
74 let o = Observable::create(|s| {
75 for n in 0..10 {
76 s.next(n);
77 }
78 s.complete();
79 });
80
81 o.skip_while(|x| x > 5).subscribe(
82 print_next_fmt!("{}"),
83 print_error!(),
84 print_complete!(),
85 );
86 }
87}