another_rxrust/operators/
skip_last.rs1use crate::internals::stream_controller::*;
2use crate::prelude::*;
3use std::{
4 collections::VecDeque,
5 marker::PhantomData,
6 sync::{Arc, RwLock},
7};
8
/// Operator state for `skip_last`: forwards a stream's items while holding
/// back the trailing `count` of them.
#[derive(Clone)]
pub struct SkipLast<Item: Clone + Send + Sync> {
  /// Number of most-recent items kept buffered instead of being emitted.
  count: usize,
  /// Zero-sized marker binding the operator to its item type; no `Item` is stored.
  _item: PhantomData<Item>,
}
17
18impl<'a, Item> SkipLast<Item>
19where
20 Item: Clone + Send + Sync,
21{
22 pub fn new(count: usize) -> SkipLast<Item> {
23 SkipLast { count, _item: PhantomData }
24 }
25 pub fn execute(&self, source: Observable<'a, Item>) -> Observable<'a, Item> {
26 let count = self.count;
27 Observable::<Item>::create(move |s| {
28 let items = Arc::new(RwLock::new(VecDeque::new()));
29 let items_next = Arc::clone(&items);
30 let sctl = StreamController::new(s);
31 let sctl_next = sctl.clone();
32 let sctl_error = sctl.clone();
33 let sctl_complete = sctl.clone();
34
35 source.inner_subscribe(sctl.new_observer(
36 move |_, x| {
37 let emit = {
38 let mut items = items_next.write().unwrap();
39 items.push_back(x);
40 if items.len() > count {
41 items.pop_front()
42 } else {
43 None
44 }
45 };
46 if let Some(x) = emit {
47 sctl_next.sink_next(x);
48 }
49 },
50 move |_, e| {
51 sctl_error.sink_error(e);
52 },
53 move |serial| sctl_complete.sink_complete(&serial),
54 ));
55 })
56 }
57}
58
59impl<'a, Item> Observable<'a, Item>
60where
61 Item: Clone + Send + Sync,
62{
63 pub fn skip_last(&self, count: usize) -> Observable<'a, Item> {
64 SkipLast::new(count).execute(self.clone())
65 }
66}
67
#[cfg(test)]
mod test {
  use crate::prelude::*;
  use std::{thread, time};

  /// Emits 0..10 and completes, then subscribes through `skip_last(5)` with
  /// the printing observers. Output is printed only; nothing is asserted.
  #[test]
  fn basic() {
    let source = Observable::create(|s| {
      (0..10).for_each(|n| {
        s.next(n);
      });
      s.complete();
    });

    let trimmed = source.skip_last(5);
    trimmed.subscribe(
      print_next_fmt!("{}"),
      print_error!(),
      print_complete!(),
    );
  }

  /// Emits 0..5 with a 100 ms pause after each item, stopping early if the
  /// subscriber has gone away, and subscribes through `skip_last(2)`.
  /// Output is printed only; nothing is asserted.
  #[test]
  fn thread() {
    let source = Observable::create(|s| {
      let pause = time::Duration::from_millis(100);
      for n in 0..5 {
        if s.is_subscribed() {
          println!("emit {}", n);
          s.next(n);
          thread::sleep(pause);
        } else {
          println!("break!");
          break;
        }
      }
      if s.is_subscribed() {
        s.complete();
      }
    });

    let trimmed = source.skip_last(2);
    trimmed.subscribe(
      print_next_fmt!("{}"),
      print_error!(),
      print_complete!(),
    );
    // Wait before the test returns, as the original does.
    thread::sleep(time::Duration::from_millis(1000));
  }
}
113}