another_rxrust/operators/
take_last.rs

1use crate::internals::stream_controller::*;
2use crate::prelude::*;
3use std::{
4  collections::VecDeque,
5  marker::PhantomData,
6  sync::{Arc, RwLock},
7};
8
/// Operator state for `take_last`: records how many trailing items of a
/// stream should be kept.
///
/// No `Item` value is stored here; the item type is carried only through
/// `PhantomData`. The bounds on `Item` are stated on the `impl` blocks that
/// need them rather than on the type itself.
#[derive(Clone)]
pub struct TakeLast<Item> {
  /// Maximum number of most-recent items to retain.
  count: usize,
  /// Zero-sized marker binding the operator to its item type.
  _item: PhantomData<Item>,
}
17
18impl<'a, Item> TakeLast<Item>
19where
20  Item: Clone + Send + Sync,
21{
22  pub fn new(count: usize) -> TakeLast<Item> {
23    TakeLast { count, _item: PhantomData }
24  }
25  pub fn execute(&self, source: Observable<'a, Item>) -> Observable<'a, Item> {
26    let count = self.count;
27
28    Observable::<Item>::create(move |s| {
29      let items = Arc::new(RwLock::new(VecDeque::new()));
30      let items_next = Arc::clone(&items);
31      let items_complete = Arc::clone(&items);
32      let sctl = StreamController::new(s);
33      let sctl_error = sctl.clone();
34      let sctl_complete = sctl.clone();
35
36      source.inner_subscribe(sctl.new_observer(
37        move |_, x: Item| {
38          let mut items = items_next.write().unwrap();
39          items.push_back(x);
40          if items.len() > count {
41            items.pop_front();
42          }
43        },
44        move |_, e| {
45          sctl_error.sink_error(e);
46        },
47        move |serial| {
48          for x in items_complete.read().unwrap().iter() {
49            if !sctl_complete.is_subscribed() {
50              break;
51            }
52            sctl_complete.sink_next(x.clone());
53          }
54          sctl_complete.sink_complete(&serial)
55        },
56      ));
57    })
58  }
59}
60
61impl<'a, Item> Observable<'a, Item>
62where
63  Item: Clone + Send + Sync,
64{
65  pub fn take_last(&self, count: usize) -> Observable<'a, Item> {
66    TakeLast::new(count).execute(self.clone())
67  }
68}
69
#[cfg(test)]
mod test {
  use crate::prelude::*;
  use std::{thread, time};

  // Emits 0..10 back-to-back, completes, and prints whatever `take_last(2)`
  // passes on to the subscriber.
  #[test]
  fn basic() {
    let source = Observable::create(|s| {
      (0..10).for_each(|value| {
        s.next(value);
      });
      s.complete();
    });

    source.take_last(2).subscribe(
      print_next_fmt!("{}"),
      print_error!(),
      print_complete!(),
    );
  }

  // Emits 0..5 with a 100 ms pause after each value, stopping early once the
  // subscriber is gone, then waits one second before the test returns.
  #[test]
  fn thread() {
    let source = Observable::create(|s| {
      for value in 0..5 {
        if s.is_subscribed() {
          println!("emit {}", value);
          s.next(value);
          thread::sleep(time::Duration::from_millis(100));
        } else {
          println!("break!");
          break;
        }
      }
      if s.is_subscribed() {
        s.complete();
      }
    });

    source.take_last(2).subscribe(
      print_next_fmt!("{}"),
      print_error!(),
      print_complete!(),
    );
    thread::sleep(time::Duration::from_millis(1000));
  }
}