use std::{
marker::PhantomData,
sync::{Arc, Mutex},
};
use super::CellValue;
use crate::{
pipeline::{
Definite, Empty, MaterializeDefinite, MaterializeEmpty, Pipeline, PipelineInstall,
PipelineSeed, Seedness,
},
signal::Signal,
subscription::SubscriptionGuard,
};
/// Pipeline stage that forwards a value from `source` only when `comparator`
/// reports it as different from the last value that was forwarded.
///
/// Instances are built by `DistinctUntilChangedByExt::distinct_until_changed_by`.
/// The `Sd` parameter carries the seedness of the stage and defaults to
/// `Definite`.
pub struct DistinctUntilChangedByPipeline<S, T, F, Sd = Definite> {
/// Upstream pipeline whose signals are filtered.
source: S,
/// Equality predicate; a `true` result suppresses the incoming value.
comparator: Arc<F>,
// Zero-sized markers: they tie the otherwise unused `T` and `Sd` type
// parameters to the struct without storing a value of either type.
_t: PhantomData<fn(T)>,
_sd: PhantomData<fn(Sd)>,
}
impl<S, T, F, Sd> PipelineInstall<T> for DistinctUntilChangedByPipeline<S, T, F, Sd>
where
S: PipelineInstall<T> + PipelineSeed<T> + Send + Sync + 'static,
Sd: Seedness,
T: CellValue,
F: Fn(&T, &T) -> bool + Send + Sync + 'static,
{
fn install(&self, callback: Arc<dyn Fn(&Signal<T>) + Send + Sync>) -> SubscriptionGuard {
let comparator = Arc::clone(&self.comparator);
let last_value: Arc<Mutex<T>> = Arc::new(Mutex::new(self.source.seed()));
let wrapped: Arc<dyn Fn(&Signal<T>) + Send + Sync> =
Arc::new(move |signal: &Signal<T>| match signal {
Signal::Value(v) => {
let mut last = last_value
.lock()
.expect("distinct_until_changed_by poisoned");
if !(comparator)(v.as_ref(), &*last) {
*last = (**v).clone();
drop(last);
callback(signal);
}
}
Signal::Complete => callback(&Signal::Complete),
Signal::Error(e) => callback(&Signal::Error(e.clone())),
});
self.source.install(wrapped)
}
}
impl<S, T, F> PipelineSeed<T> for DistinctUntilChangedByPipeline<S, T, F, Definite>
where
    T: CellValue,
    S: PipelineSeed<T>,
    F: Fn(&T, &T) -> bool + Send + Sync + 'static,
{
    /// Returns the seed of the wrapped source unchanged; the comparator is
    /// not consulted for the seed.
    fn seed(&self) -> T {
        <S as PipelineSeed<T>>::seed(&self.source)
    }
}
/// Marks the stage as a [`Pipeline`] with the same seedness `Sd` as its
/// source. The impl has no items of its own.
// NOTE(review): `private_bounds` is presumably silenced because one of the
// bound traits is less visible than this type — confirm before removing.
#[allow(private_bounds)]
impl<S, T, F, Sd> Pipeline<T, Sd> for DistinctUntilChangedByPipeline<S, T, F, Sd>
where
    T: CellValue,
    Sd: Seedness,
    S: Pipeline<T, Sd> + PipelineSeed<T>,
    F: Fn(&T, &T) -> bool + Send + Sync + 'static,
{
}
/// Opts the `Definite` flavour of the stage into [`MaterializeDefinite`].
/// The impl has no items of its own.
impl<S, T, F> MaterializeDefinite<T> for DistinctUntilChangedByPipeline<S, T, F, Definite>
where
    T: CellValue,
    S: Pipeline<T, Definite> + PipelineSeed<T>,
    F: Fn(&T, &T) -> bool + Send + Sync + 'static,
{
}
/// Opts the `Empty` flavour of the stage into [`MaterializeEmpty`].
/// The impl has no items of its own.
impl<S, T, F> MaterializeEmpty<T> for DistinctUntilChangedByPipeline<S, T, F, Empty>
where
    T: CellValue,
    S: Pipeline<T, Empty> + PipelineSeed<T>,
    F: Fn(&T, &T) -> bool + Send + Sync + 'static,
{
}
/// Extension trait that adds `distinct_until_changed_by` to every pipeline
/// that also exposes a seed.
// NOTE(review): `private_bounds` is presumably silenced because a supertrait
// is less visible than this trait — confirm before removing.
#[allow(private_bounds)]
pub trait DistinctUntilChangedByExt<T: CellValue, S: Seedness>:
    Pipeline<T, S> + PipelineSeed<T>
{
    /// Wraps `self` in a [`DistinctUntilChangedByPipeline`] that uses
    /// `comparator` to decide whether two values count as equal.
    ///
    /// `comparator` must return `true` when the two values should be treated
    /// as the same, in which case the newer one is not forwarded.
    #[track_caller]
    fn distinct_until_changed_by<F>(
        self,
        comparator: F,
    ) -> DistinctUntilChangedByPipeline<Self, T, F, S>
    where
        F: Fn(&T, &T) -> bool + Send + Sync + 'static,
    {
        // The comparator is reference-counted so each installation can take
        // its own handle to it.
        let comparator = Arc::new(comparator);
        DistinctUntilChangedByPipeline {
            source: self,
            comparator,
            _t: PhantomData,
            _sd: PhantomData,
        }
    }
}
/// Blanket impl: every type that is both a [`Pipeline`] and a
/// [`PipelineSeed`] gets `distinct_until_changed_by` automatically.
impl<T, S, P> DistinctUntilChangedByExt<T, S> for P
where
    T: CellValue,
    S: Seedness,
    P: Pipeline<T, S> + PipelineSeed<T>,
{
}
#[cfg(test)]
mod tests {
    use std::sync::{
        Arc,
        atomic::{AtomicU64, Ordering},
    };

    use super::*;
    use crate::{Cell, MaterializeDefinite, Mutable, traits::Watchable};

    /// Fixture: `id` is what the comparator looks at, `name` varies freely.
    #[derive(Clone, Debug, PartialEq)]
    struct User {
        id: u32,
        #[allow(dead_code)]
        name: String,
    }

    /// Shorthand constructor for the fixture.
    fn user(id: u32, name: &str) -> User {
        User {
            id,
            name: name.into(),
        }
    }

    /// Only updates that change `id` should reach the subscriber.
    #[test]
    fn test_distinct_until_changed_by() {
        let source = Cell::new(user(1, "Alice"));
        let by_id = source
            .clone()
            .distinct_until_changed_by(|a, b| a.id == b.id)
            .materialize();

        let count = Arc::new(AtomicU64::new(0));
        let counter = Arc::clone(&count);
        let _guard = by_id.subscribe(move |_| {
            counter.fetch_add(1, Ordering::SeqCst);
        });
        // Subscribing alone produces exactly one notification.
        assert_eq!(count.load(Ordering::SeqCst), 1);

        // Each step: the value to set and the notification total expected afterwards.
        let steps = [
            (user(1, "Alicia"), 1), // same id -> suppressed
            (user(2, "Bob"), 2),    // new id -> forwarded
            (user(2, "Robert"), 2), // same id again -> suppressed
        ];
        for (next, expected) in steps {
            source.set(next);
            assert_eq!(count.load(Ordering::SeqCst), expected);
        }
    }
}