another_rxrust/operators/
skip.rs

1use crate::internals::stream_controller::*;
2use crate::prelude::*;
3use std::{
4  marker::PhantomData,
5  sync::{Arc, RwLock},
6};
7
/// Operator state for `skip`: suppresses the first `count` items of a source
/// observable and forwards everything after them.
#[derive(Clone, Debug)]
pub struct Skip<Item>
where
  Item: Clone + Send + Sync,
{
  /// Number of leading items to suppress.
  count: usize,
  /// Ties the operator to its item type without storing a value of it.
  _item: PhantomData<Item>,
}
16
17impl<'a, Item> Skip<Item>
18where
19  Item: Clone + Send + Sync,
20{
21  pub fn new(count: usize) -> Skip<Item> {
22    Skip { count, _item: PhantomData }
23  }
24  pub fn execute(&self, source: Observable<'a, Item>) -> Observable<'a, Item> {
25    let count = self.count;
26
27    Observable::<Item>::create(move |s| {
28      let n = Arc::new(RwLock::new(0));
29
30      let sctl = StreamController::new(s);
31      let sctl_next = sctl.clone();
32      let sctl_error = sctl.clone();
33      let sctl_complete = sctl.clone();
34
35      source.inner_subscribe(sctl.new_observer(
36        move |_, x| {
37          let emit = {
38            let mut n = n.write().unwrap();
39            let nn = *n;
40            *n += 1;
41            nn >= count
42          };
43          if emit {
44            sctl_next.sink_next(x);
45          }
46        },
47        move |_, e| {
48          sctl_error.sink_error(e);
49        },
50        move |serial| sctl_complete.sink_complete(&serial),
51      ));
52    })
53  }
54}
55
56impl<'a, Item> Observable<'a, Item>
57where
58  Item: Clone + Send + Sync,
59{
60  pub fn skip(&self, count: usize) -> Observable<'a, Item> {
61    Skip::new(count).execute(self.clone())
62  }
63}
64
#[cfg(test)]
mod test {
  use crate::prelude::*;
  use std::{thread, time};

  // Source emits 0..5 and completes; with `skip(2)` only 2, 3 and 4 are
  // expected to reach the printing subscriber.
  #[test]
  fn basic() {
    let o = Observable::create(|s| {
      for n in 0..5 {
        s.next(n);
      }
      s.complete();
    });

    o.skip(2).subscribe(
      print_next_fmt!("{}"),
      print_error!(),
      print_complete!(),
    );
  }

  // Same pipeline with a slow source that checks `is_subscribed()` before each
  // emission and before completing, pausing 100 ms between items.
  #[test]
  fn thread() {
    let o = Observable::create(|s| {
      for n in 0..5 {
        if !s.is_subscribed() {
          println!("break!");
          break;
        }
        println!("emit {}", n);
        s.next(n);
        thread::sleep(time::Duration::from_millis(100));
      }
      if s.is_subscribed() {
        s.complete();
      }
    });

    o.skip(2).subscribe(
      print_next_fmt!("{}"),
      print_error!(),
      print_complete!(),
    );
    // NOTE(review): presumably leaves time for any outstanding emissions to
    // finish before the test returns; confirm whether this wait is needed.
    thread::sleep(time::Duration::from_millis(1000));
  }
}