another_rxrust/operators/
skip_last.rs

1use crate::internals::stream_controller::*;
2use crate::prelude::*;
3use std::{
4  collections::VecDeque,
5  marker::PhantomData,
6  sync::{Arc, RwLock},
7};
8
/// Operator behind `skip_last`: remembers how many trailing items of a source
/// sequence must be withheld from downstream.
///
/// The operator itself never stores an `Item`; the type parameter is carried
/// purely at the type level through [`PhantomData`].
#[derive(Clone)]
pub struct SkipLast<Item>
where
  Item: Clone + Send + Sync,
{
  /// How many of the most recent items stay buffered and are never forwarded.
  count: usize,
  /// Zero-sized marker tying the operator to its item type.
  _item: PhantomData<Item>,
}
17
18impl<'a, Item> SkipLast<Item>
19where
20  Item: Clone + Send + Sync,
21{
22  pub fn new(count: usize) -> SkipLast<Item> {
23    SkipLast { count, _item: PhantomData }
24  }
25  pub fn execute(&self, source: Observable<'a, Item>) -> Observable<'a, Item> {
26    let count = self.count;
27    Observable::<Item>::create(move |s| {
28      let items = Arc::new(RwLock::new(VecDeque::new()));
29      let items_next = Arc::clone(&items);
30      let sctl = StreamController::new(s);
31      let sctl_next = sctl.clone();
32      let sctl_error = sctl.clone();
33      let sctl_complete = sctl.clone();
34
35      source.inner_subscribe(sctl.new_observer(
36        move |_, x| {
37          let emit = {
38            let mut items = items_next.write().unwrap();
39            items.push_back(x);
40            if items.len() > count {
41              items.pop_front()
42            } else {
43              None
44            }
45          };
46          if let Some(x) = emit {
47            sctl_next.sink_next(x);
48          }
49        },
50        move |_, e| {
51          sctl_error.sink_error(e);
52        },
53        move |serial| sctl_complete.sink_complete(&serial),
54      ));
55    })
56  }
57}
58
59impl<'a, Item> Observable<'a, Item>
60where
61  Item: Clone + Send + Sync,
62{
63  pub fn skip_last(&self, count: usize) -> Observable<'a, Item> {
64    SkipLast::new(count).execute(self.clone())
65  }
66}
67
#[cfg(test)]
mod test {
  use crate::prelude::*;
  use std::{thread, time};

  /// Feeds 0..10 through `skip_last(5)` and prints whatever comes out.
  #[test]
  fn basic() {
    let source = Observable::create(|s| {
      (0..10).for_each(|n| {
        s.next(n);
      });
      s.complete();
    });

    source.skip_last(5).subscribe(
      print_next_fmt!("{}"),
      print_error!(),
      print_complete!(),
    );
  }

  /// Emits 0..5 with a 100 ms pause after each item, stopping early if the
  /// subscriber has gone away, and prints the output of `skip_last(2)`.
  #[test]
  fn thread() {
    let source = Observable::create(|s| {
      let pause = time::Duration::from_millis(100);
      for n in 0..5 {
        if s.is_subscribed() {
          println!("emit {}", n);
          s.next(n);
          thread::sleep(pause);
        } else {
          println!("break!");
          break;
        }
      }
      if s.is_subscribed() {
        s.complete();
      }
    });

    source.skip_last(2).subscribe(
      print_next_fmt!("{}"),
      print_error!(),
      print_complete!(),
    );
    // NOTE(review): presumably a grace period for emissions still in flight;
    // confirm whether it is needed for this source.
    thread::sleep(time::Duration::from_millis(1000));
  }
}