another_rxrust/operators/
distinct_until_changed.rs1use crate::internals::stream_controller::*;
2use crate::prelude::*;
3use std::{
4 marker::PhantomData,
5 sync::{Arc, RwLock},
6};
7
/// Operator object for the `distinct_until_changed` transformation.
///
/// The struct stores no item: its only field is a `PhantomData` marker that
/// ties the operator to the element type `Item`.
#[derive(Clone)]
pub struct DistinctUntilChanged<Item: Clone + Send + Sync + PartialEq> {
    /// Zero-sized marker recording the element type.
    _item: PhantomData<Item>,
}
15
16impl<'a, Item> DistinctUntilChanged<Item>
17where
18 Item: Clone + Send + Sync + PartialEq,
19{
20 pub fn new() -> DistinctUntilChanged<Item> {
21 DistinctUntilChanged { _item: PhantomData }
22 }
23 pub fn execute(&self, source: Observable<'a, Item>) -> Observable<'a, Item> {
24 Observable::<Item>::create(move |s| {
25 let last = Arc::new(RwLock::new(Option::<Item>::None));
26
27 let sctl = StreamController::new(s);
28 let sctl_next = sctl.clone();
29 let sctl_error = sctl.clone();
30 let sctl_complete = sctl.clone();
31
32 source.inner_subscribe(sctl.new_observer(
33 move |_, x: Item| {
34 let last_x = {
35 if let Some(x) = &*last.read().unwrap() {
36 Some(x.clone())
37 } else {
38 None
39 }
40 };
41 if let Some(last_x) = last_x {
42 if last_x != x {
43 *last.write().unwrap() = Some(x.clone());
44 sctl_next.sink_next(x);
45 }
46 } else {
47 *last.write().unwrap() = Some(x.clone());
48 sctl_next.sink_next(x);
49 }
50 },
51 move |_, e| {
52 sctl_error.sink_error(e);
53 },
54 move |serial| sctl_complete.sink_complete(&serial),
55 ));
56 })
57 }
58}
59
60impl<'a, Item> Observable<'a, Item>
61where
62 Item: Clone + Send + Sync + PartialEq,
63{
64 pub fn distinct_until_changed(&self) -> Observable<'a, Item> {
65 DistinctUntilChanged::new().execute(self.clone())
66 }
67}
68
#[cfg(test)]
mod test {
    use crate::prelude::*;

    /// Pushes runs of repeated integers through `distinct_until_changed` and
    /// prints whatever the subscriber receives. The test makes no assertions;
    /// it only checks that the pipeline runs without panicking.
    #[test]
    fn basic() {
        let o = Observable::create(|s| {
            // Same emission sequence as before, expressed as data.
            for value in [0, 0, 1, 1, 2, 2, 2, 2, 3] {
                s.next(value);
            }
            s.complete();
        });

        o.distinct_until_changed().subscribe(
            print_next_fmt!("{}"),
            print_error!(),
            print_complete!(),
        );
    }
}