use std::{collections::HashSet, hash::Hash};
use crate::{
context::Context,
observable::{CoreObservable, ObservableType},
observer::Observer,
subscription::Subscription,
};
#[derive(Clone)]
pub struct Distinct<S>(pub S);
impl<S> ObservableType for Distinct<S>
where
S: ObservableType,
{
type Item<'a>
= S::Item<'a>
where
Self: 'a;
type Err = S::Err;
}
impl<S, C, Unsub> CoreObservable<C> for Distinct<S>
where
C: Context,
S: for<'a> CoreObservable<
C::With<DistinctObserver<C::Inner, <S as ObservableType>::Item<'a>>>,
Item<'a>: Eq + Hash + Clone,
Unsub = Unsub,
>,
Unsub: Subscription,
{
type Unsub = Unsub;
fn subscribe(self, context: C) -> Self::Unsub {
let observer = context.transform(DistinctObserver::new);
self.0.subscribe(observer)
}
}
pub struct DistinctObserver<O, Item> {
observer: O,
seen: HashSet<Item>,
}
impl<O, Item> DistinctObserver<O, Item> {
pub fn new(observer: O) -> Self { Self { observer, seen: HashSet::new() } }
}
impl<O, Item, Err> Observer<Item, Err> for DistinctObserver<O, Item>
where
O: Observer<Item, Err>,
Item: Eq + Hash + Clone,
{
fn next(&mut self, value: Item) {
if !self.seen.contains(&value) {
self.seen.insert(value.clone());
self.observer.next(value);
}
}
fn error(self, err: Err) { self.observer.error(err); }
fn complete(self) { self.observer.complete(); }
fn is_closed(&self) -> bool { self.observer.is_closed() }
}
#[derive(Clone)]
pub struct DistinctKey<S, F> {
pub(crate) source: S,
pub(crate) key_selector: F,
}
impl<S, F> ObservableType for DistinctKey<S, F>
where
S: ObservableType,
{
type Item<'a>
= S::Item<'a>
where
Self: 'a;
type Err = S::Err;
}
impl<S, F, C, Key> CoreObservable<C> for DistinctKey<S, F>
where
C: Context,
S: CoreObservable<C::With<DistinctKeyObserver<C::Inner, F, Key>>>,
F: for<'a> Fn(&S::Item<'a>) -> Key,
Key: Eq + Hash + Clone,
{
type Unsub = S::Unsub;
fn subscribe(self, context: C) -> Self::Unsub {
let DistinctKey { source, key_selector } = self;
let observer = context.transform(|observer| DistinctKeyObserver::new(observer, key_selector));
source.subscribe(observer)
}
}
pub struct DistinctKeyObserver<O, F, Key> {
observer: O,
key_selector: F,
seen: HashSet<Key>,
}
impl<O, F, Key> DistinctKeyObserver<O, F, Key> {
pub fn new(observer: O, key_selector: F) -> Self {
Self { observer, key_selector, seen: HashSet::new() }
}
}
impl<O, F, Key, Item, Err> Observer<Item, Err> for DistinctKeyObserver<O, F, Key>
where
O: Observer<Item, Err>,
F: Fn(&Item) -> Key,
Key: Eq + Hash + Clone,
{
fn next(&mut self, value: Item) {
let key = (self.key_selector)(&value);
if !self.seen.contains(&key) {
self.seen.insert(key);
self.observer.next(value);
}
}
fn error(self, err: Err) { self.observer.error(err); }
fn complete(self) { self.observer.complete(); }
fn is_closed(&self) -> bool { self.observer.is_closed() }
}
#[cfg(test)]
mod tests {
use std::{cell::RefCell, rc::Rc};
use crate::prelude::*;
#[rxrust_macro::test]
fn smoke() {
let x = Rc::new(RefCell::new(vec![]));
let x_c = x.clone();
Local::from_iter(0..20)
.map(|v| v % 5)
.distinct()
.subscribe(move |v| x_c.borrow_mut().push(v));
assert_eq!(&*x.borrow(), &[0, 1, 2, 3, 4]);
}
#[rxrust_macro::test]
fn distinct_key() {
let x = Rc::new(RefCell::new(vec![]));
let x_c = x.clone();
Local::from_iter(vec![(1, 2), (2, 2), (2, 1), (1, 1), (2, 2), (3, 2)])
.distinct_key(|tup: &(i32, i32)| tup.0)
.subscribe(move |v| x_c.borrow_mut().push(v));
assert_eq!(&*x.borrow(), &[(1, 2), (2, 2), (3, 2)]);
}
}