use crate::{
context::Context,
observable::{CoreObservable, ObservableType},
observer::Observer,
prelude::Subscription,
};
/// Observable adapter that wraps a source `S` (stored in the public tuple field).
///
/// On subscription the downstream observer is wrapped in a
/// [`DistinctUntilChangedObserver`], which forwards a value only when it is not equal to
/// the value it forwarded immediately before.
#[derive(Clone)]
pub struct DistinctUntilChanged<S>(pub S);
/// The adapter never changes what is emitted, only how often, so both associated types
/// are taken straight from the wrapped source.
impl<S: ObservableType> ObservableType for DistinctUntilChanged<S> {
  type Item<'a> = S::Item<'a> where Self: 'a;
  type Err = S::Err;
}
impl<S, C, Unsub> CoreObservable<C> for DistinctUntilChanged<S>
where
C: Context,
S: for<'a> CoreObservable<
C::With<DistinctUntilChangedObserver<C::Inner, <S as ObservableType>::Item<'a>>>,
Unsub = Unsub,
>,
Unsub: Subscription,
{
type Unsub = Unsub;
fn subscribe(self, context: C) -> Self::Unsub {
let observer = context.transform(DistinctUntilChangedObserver::new);
self.0.subscribe(observer)
}
}
/// Observer adapter that remembers the most recently forwarded value so that an
/// immediately repeated, equal value can be dropped.
pub struct DistinctUntilChangedObserver<O, Item> {
/// Downstream observer that receives every value that is not dropped.
observer: O,
/// Clone of the last value forwarded downstream; `None` until a first value is forwarded.
last: Option<Item>,
}
impl<O, Item> DistinctUntilChangedObserver<O, Item> {
  /// Wraps `observer` in an adapter that has not yet remembered any value, so the first
  /// value it receives has nothing to be compared against.
  pub fn new(observer: O) -> Self {
    let last = None;
    Self { observer, last }
  }
}
/// Forwards a value only when it differs from the value forwarded immediately before it.
/// Errors, completion and the closed state are delegated to the wrapped observer.
impl<O, Item, Err> Observer<Item, Err> for DistinctUntilChangedObserver<O, Item>
where
  O: Observer<Item, Err>,
  Item: PartialEq + Clone,
{
  fn next(&mut self, value: Item) {
    // A value equal to the remembered one is a consecutive repeat: drop it.
    if matches!(&self.last, Some(previous) if *previous == value) {
      return;
    }
    // Keep a clone for the next comparison; the value itself moves downstream.
    self.last = Some(value.clone());
    self.observer.next(value);
  }

  fn error(self, err: Err) {
    let Self { observer, .. } = self;
    observer.error(err);
  }

  fn complete(self) {
    let Self { observer, .. } = self;
    observer.complete();
  }

  fn is_closed(&self) -> bool {
    let downstream = &self.observer;
    downstream.is_closed()
  }
}
/// Observable adapter pairing a source with a key-selector function.
///
/// On subscription the downstream observer is wrapped in a
/// [`DistinctUntilKeyChangedObserver`], which forwards a value only when the key computed
/// from it is not equal to the key of the value it forwarded immediately before.
#[derive(Clone)]
pub struct DistinctUntilKeyChanged<S, F> {
/// Source that is subscribed with the wrapped observer.
pub(crate) source: S,
/// Function applied to a reference to each item to obtain its comparison key.
pub(crate) key_selector: F,
}
/// Items are filtered by key but passed through unmodified, so the item and error types
/// are exactly those of the wrapped source.
impl<S: ObservableType, F> ObservableType for DistinctUntilKeyChanged<S, F> {
  type Item<'a> = S::Item<'a> where Self: 'a;
  type Err = S::Err;
}
/// Subscribing moves the key selector into a [`DistinctUntilKeyChangedObserver`] built
/// around the context's inner value, then subscribes the source with the transformed
/// context. The source's own subscription handle is returned unchanged.
impl<S, F, C, Key, Unsub> CoreObservable<C> for DistinctUntilKeyChanged<S, F>
where
  C: Context,
  S: for<'a> CoreObservable<
    C::With<DistinctUntilKeyChangedObserver<C::Inner, F, Key>>,
    Unsub = Unsub,
  >,
  Unsub: Subscription,
  F: for<'a> Fn(&<S as ObservableType>::Item<'a>) -> Key,
{
  type Unsub = Unsub;

  fn subscribe(self, context: C) -> Self::Unsub {
    // Bind the selector to a local first so the closure captures only it, leaving
    // `self.source` free to be moved afterwards.
    let selector = self.key_selector;
    let wrapped =
      context.transform(|inner| DistinctUntilKeyChangedObserver::new(inner, selector));
    self.source.subscribe(wrapped)
  }
}
/// Observer adapter that remembers the key of the most recently forwarded value so that
/// an immediately following value with an equal key can be dropped.
pub struct DistinctUntilKeyChangedObserver<O, F, Key> {
/// Downstream observer that receives every value that is not dropped.
observer: O,
/// Function called with a reference to each incoming value to compute its key.
key_selector: F,
/// Key of the last value forwarded downstream; `None` until a first value is forwarded.
last_key: Option<Key>,
}
impl<O, F, Key> DistinctUntilKeyChangedObserver<O, F, Key> {
  /// Wraps `observer` together with `key_selector`. No key is remembered yet, so the
  /// first value received has nothing to be compared against.
  pub fn new(observer: O, key_selector: F) -> Self {
    let last_key = None;
    Self { observer, key_selector, last_key }
  }
}
/// Forwards a value only when its key differs from the key of the value forwarded
/// immediately before it. Errors, completion and the closed state are delegated to the
/// wrapped observer.
impl<O, F, Key, Item, Err> Observer<Item, Err> for DistinctUntilKeyChangedObserver<O, F, Key>
where
  O: Observer<Item, Err>,
  F: Fn(&Item) -> Key,
  Key: PartialEq,
{
  fn next(&mut self, value: Item) {
    // The selector runs for every incoming value, including ones that end up dropped.
    let key = (self.key_selector)(&value);
    let same_key = matches!(self.last_key.as_ref(), Some(previous) if *previous == key);
    if same_key {
      return;
    }
    // Only the key is stored; the value itself moves downstream without being cloned.
    self.last_key = Some(key);
    self.observer.next(value);
  }

  fn error(self, err: Err) {
    let Self { observer, .. } = self;
    observer.error(err);
  }

  fn complete(self) {
    let Self { observer, .. } = self;
    observer.complete();
  }

  fn is_closed(&self) -> bool {
    let downstream = &self.observer;
    downstream.is_closed()
  }
}
#[cfg(test)]
mod tests {
  use std::{cell::RefCell, rc::Rc};

  use crate::prelude::*;

  // Equal neighbours collapse into a single emission, but a value may appear again once
  // a different value has been emitted in between.
  #[rxrust_macro::test]
  fn smoke_distinct_until_changed() {
    let received = Rc::new(RefCell::new(Vec::new()));
    let sink = Rc::clone(&received);

    Local::from_iter(&[1, 2, 2, 1, 2, 3])
      .map(|v| v % 5)
      .distinct_until_changed()
      .subscribe(move |v| sink.borrow_mut().push(v));

    assert_eq!(&*received.borrow(), &[1, 2, 1, 2, 3]);
  }

  // Only the first tuple field is the key: `(2, 1)` is dropped because it follows
  // `(2, 2)`, even though the tuples themselves differ.
  #[rxrust_macro::test]
  fn smoke_distinct_until_key_changed() {
    let received = Rc::new(RefCell::new(Vec::new()));
    let sink = Rc::clone(&received);

    Local::from_iter(vec![(1, 2), (2, 2), (2, 1), (1, 1), (2, 2), (3, 2)])
      .distinct_until_key_changed(|tup: &(i32, i32)| tup.0)
      .subscribe(move |v| sink.borrow_mut().push(v));

    assert_eq!(&*received.borrow(), &[(1, 2), (2, 2), (1, 1), (2, 2), (3, 2)]);
  }

  // The keyed variant stores keys rather than items, so the item type does not need to
  // implement `Clone`.
  #[rxrust_macro::test]
  fn distinct_until_key_changed_no_clone() {
    #[derive(PartialEq, Debug)]
    struct NoClone(i32);

    let received = Rc::new(RefCell::new(Vec::new()));
    let sink = Rc::clone(&received);

    let items = vec![NoClone(1), NoClone(2), NoClone(2), NoClone(3)];
    Local::from_iter(items)
      .distinct_until_key_changed(|v: &NoClone| v.0)
      .subscribe(move |v| sink.borrow_mut().push(v));

    assert_eq!(&*received.borrow(), &[NoClone(1), NoClone(2), NoClone(3)]);
  }
}