another_rxrust/operators/
skip.rs1use crate::internals::stream_controller::*;
2use crate::prelude::*;
3use std::{
4 marker::PhantomData,
5 sync::{Arc, RwLock},
6};
7
8#[derive(Clone)]
9pub struct Skip<Item>
10where
11 Item: Clone + Send + Sync,
12{
13 count: usize,
14 _item: PhantomData<Item>,
15}
16
17impl<'a, Item> Skip<Item>
18where
19 Item: Clone + Send + Sync,
20{
21 pub fn new(count: usize) -> Skip<Item> {
22 Skip { count, _item: PhantomData }
23 }
24 pub fn execute(&self, source: Observable<'a, Item>) -> Observable<'a, Item> {
25 let count = self.count;
26
27 Observable::<Item>::create(move |s| {
28 let n = Arc::new(RwLock::new(0));
29
30 let sctl = StreamController::new(s);
31 let sctl_next = sctl.clone();
32 let sctl_error = sctl.clone();
33 let sctl_complete = sctl.clone();
34
35 source.inner_subscribe(sctl.new_observer(
36 move |_, x| {
37 let emit = {
38 let mut n = n.write().unwrap();
39 let nn = *n;
40 *n += 1;
41 nn >= count
42 };
43 if emit {
44 sctl_next.sink_next(x);
45 }
46 },
47 move |_, e| {
48 sctl_error.sink_error(e);
49 },
50 move |serial| sctl_complete.sink_complete(&serial),
51 ));
52 })
53 }
54}
55
56impl<'a, Item> Observable<'a, Item>
57where
58 Item: Clone + Send + Sync,
59{
60 pub fn skip(&self, count: usize) -> Observable<'a, Item> {
61 Skip::new(count).execute(self.clone())
62 }
63}
64
65#[cfg(test)]
66mod test {
67 use crate::prelude::*;
68 use std::{thread, time};
69
70 #[test]
71 fn basic() {
72 let o = Observable::create(|s| {
73 for n in 0..5 {
74 s.next(n);
75 }
76 s.complete();
77 });
78
79 o.skip(2).subscribe(
80 print_next_fmt!("{}"),
81 print_error!(),
82 print_complete!(),
83 );
84 }
85
86 #[test]
87 fn thread() {
88 let o = Observable::create(|s| {
89 for n in 0..5 {
90 if !s.is_subscribed() {
91 println!("break!");
92 break;
93 }
94 println!("emit {}", n);
95 s.next(n);
96 thread::sleep(time::Duration::from_millis(100));
97 }
98 if s.is_subscribed() {
99 s.complete();
100 }
101 });
102
103 o.skip(2).subscribe(
104 print_next_fmt!("{}"),
105 print_error!(),
106 print_complete!(),
107 );
108 thread::sleep(time::Duration::from_millis(1000));
109 }
110}