another_rxrust/operators/
distinct_until_changed.rs

1use crate::internals::stream_controller::*;
2use crate::prelude::*;
3use std::{
4  marker::PhantomData,
5  sync::{Arc, RwLock},
6};
7
/// Operator type behind `distinct_until_changed`.
///
/// The struct stores no data: its only field is a zero-sized marker that
/// ties the operator to the item type it works on.
#[derive(Clone)]
pub struct DistinctUntilChanged<Item: Clone + Send + Sync + PartialEq> {
  // Marker only; lets the struct be generic over `Item` without holding one.
  _item: PhantomData<Item>,
}
15
16impl<'a, Item> DistinctUntilChanged<Item>
17where
18  Item: Clone + Send + Sync + PartialEq,
19{
20  pub fn new() -> DistinctUntilChanged<Item> {
21    DistinctUntilChanged { _item: PhantomData }
22  }
23  pub fn execute(&self, source: Observable<'a, Item>) -> Observable<'a, Item> {
24    Observable::<Item>::create(move |s| {
25      let last = Arc::new(RwLock::new(Option::<Item>::None));
26
27      let sctl = StreamController::new(s);
28      let sctl_next = sctl.clone();
29      let sctl_error = sctl.clone();
30      let sctl_complete = sctl.clone();
31
32      source.inner_subscribe(sctl.new_observer(
33        move |_, x: Item| {
34          let last_x = {
35            if let Some(x) = &*last.read().unwrap() {
36              Some(x.clone())
37            } else {
38              None
39            }
40          };
41          if let Some(last_x) = last_x {
42            if last_x != x {
43              *last.write().unwrap() = Some(x.clone());
44              sctl_next.sink_next(x);
45            }
46          } else {
47            *last.write().unwrap() = Some(x.clone());
48            sctl_next.sink_next(x);
49          }
50        },
51        move |_, e| {
52          sctl_error.sink_error(e);
53        },
54        move |serial| sctl_complete.sink_complete(&serial),
55      ));
56    })
57  }
58}
59
60impl<'a, Item> Observable<'a, Item>
61where
62  Item: Clone + Send + Sync + PartialEq,
63{
64  pub fn distinct_until_changed(&self) -> Observable<'a, Item> {
65    DistinctUntilChanged::new().execute(self.clone())
66  }
67}
68
#[cfg(test)]
mod test {
  use crate::prelude::*;

  /// Smoke test: emits runs of repeated integers followed by completion and
  /// prints whatever `distinct_until_changed` lets through. Nothing is
  /// asserted; the output is for manual inspection.
  #[test]
  fn basic() {
    let source = Observable::create(|emitter| {
      // Same sequence as before: 0 0 1 1 2 2 2 2 3, then complete.
      for value in [0, 0, 1, 1, 2, 2, 2, 2, 3] {
        emitter.next(value);
      }
      emitter.complete();
    });

    let deduped = source.distinct_until_changed();
    deduped.subscribe(print_next_fmt!("{}"), print_error!(), print_complete!());
  }
}