another_rxrust/operators/
take_last.rs1use crate::internals::stream_controller::*;
2use crate::prelude::*;
3use std::{
4 collections::VecDeque,
5 marker::PhantomData,
6 sync::{Arc, RwLock},
7};
8
9#[derive(Clone)]
10pub struct TakeLast<Item>
11where
12 Item: Clone + Send + Sync,
13{
14 count: usize,
15 _item: PhantomData<Item>,
16}
17
18impl<'a, Item> TakeLast<Item>
19where
20 Item: Clone + Send + Sync,
21{
22 pub fn new(count: usize) -> TakeLast<Item> {
23 TakeLast { count, _item: PhantomData }
24 }
25 pub fn execute(&self, source: Observable<'a, Item>) -> Observable<'a, Item> {
26 let count = self.count;
27
28 Observable::<Item>::create(move |s| {
29 let items = Arc::new(RwLock::new(VecDeque::new()));
30 let items_next = Arc::clone(&items);
31 let items_complete = Arc::clone(&items);
32 let sctl = StreamController::new(s);
33 let sctl_error = sctl.clone();
34 let sctl_complete = sctl.clone();
35
36 source.inner_subscribe(sctl.new_observer(
37 move |_, x: Item| {
38 let mut items = items_next.write().unwrap();
39 items.push_back(x);
40 if items.len() > count {
41 items.pop_front();
42 }
43 },
44 move |_, e| {
45 sctl_error.sink_error(e);
46 },
47 move |serial| {
48 for x in items_complete.read().unwrap().iter() {
49 if !sctl_complete.is_subscribed() {
50 break;
51 }
52 sctl_complete.sink_next(x.clone());
53 }
54 sctl_complete.sink_complete(&serial)
55 },
56 ));
57 })
58 }
59}
60
61impl<'a, Item> Observable<'a, Item>
62where
63 Item: Clone + Send + Sync,
64{
65 pub fn take_last(&self, count: usize) -> Observable<'a, Item> {
66 TakeLast::new(count).execute(self.clone())
67 }
68}
69
70#[cfg(test)]
71mod test {
72 use crate::prelude::*;
73 use std::{thread, time};
74
75 #[test]
76 fn basic() {
77 let o = Observable::create(|s| {
78 for n in 0..10 {
79 s.next(n);
80 }
81 s.complete();
82 });
83
84 o.take_last(2).subscribe(
85 print_next_fmt!("{}"),
86 print_error!(),
87 print_complete!(),
88 );
89 }
90
91 #[test]
92 fn thread() {
93 let o = Observable::create(|s| {
94 for n in 0..5 {
95 if !s.is_subscribed() {
96 println!("break!");
97 break;
98 }
99 println!("emit {}", n);
100 s.next(n);
101 thread::sleep(time::Duration::from_millis(100));
102 }
103 if s.is_subscribed() {
104 s.complete();
105 }
106 });
107
108 o.take_last(2).subscribe(
109 print_next_fmt!("{}"),
110 print_error!(),
111 print_complete!(),
112 );
113 thread::sleep(time::Duration::from_millis(1000));
114 }
115}