use std::{
marker::PhantomData,
sync::{Arc, Mutex},
};
use super::CellValue;
use crate::{
pipeline::{Empty, MaterializeEmpty, Pipeline, PipelineInstall, PipelineSeed, Seedness},
signal::Signal,
subscription::SubscriptionGuard,
};
pub struct PairwisePipeline<S, T, Sd = crate::pipeline::Definite> {
source: S,
_t: PhantomData<fn(T)>,
_sd: PhantomData<fn(Sd)>,
}
impl<S, T, Sd> PipelineInstall<(T, T)> for PairwisePipeline<S, T, Sd>
where
S: PipelineInstall<T> + PipelineSeed<T> + Send + Sync + 'static,
Sd: Seedness,
T: CellValue,
{
fn install(&self, callback: Arc<dyn Fn(&Signal<(T, T)>) + Send + Sync>) -> SubscriptionGuard {
let last: Arc<Mutex<T>> = Arc::new(Mutex::new(self.source.seed()));
let saw_first = Arc::new(std::sync::atomic::AtomicBool::new(false));
let wrapped: Arc<dyn Fn(&Signal<T>) + Send + Sync> = Arc::new(move |signal: &Signal<T>| {
match signal {
Signal::Value(v) => {
if !saw_first.swap(true, std::sync::atomic::Ordering::SeqCst) {
*last.lock().expect("pairwise poisoned") = (**v).clone();
return;
}
let prev = {
let mut guard = last.lock().expect("pairwise poisoned");
let prev = guard.clone();
*guard = (**v).clone();
prev
};
callback(&Signal::value((prev, v.as_ref().clone())));
}
Signal::Complete => callback(&Signal::Complete),
Signal::Error(e) => callback(&Signal::Error(e.clone())),
}
});
self.source.install(wrapped)
}
}
#[allow(private_bounds)]
impl<S, T, Sd> Pipeline<(T, T), Empty> for PairwisePipeline<S, T, Sd>
where
S: Pipeline<T, Sd> + PipelineSeed<T>,
Sd: Seedness,
T: CellValue,
{
}
impl<S, T, Sd> MaterializeEmpty<(T, T)> for PairwisePipeline<S, T, Sd>
where
S: Pipeline<T, Sd> + PipelineSeed<T>,
Sd: Seedness,
T: CellValue,
{
}
#[allow(private_bounds)]
pub trait PairwiseExt<T: CellValue, S: Seedness>: Pipeline<T, S> + PipelineSeed<T> {
#[track_caller]
fn pairwise(self) -> PairwisePipeline<Self, T, S> {
PairwisePipeline {
source: self,
_t: PhantomData,
_sd: PhantomData,
}
}
}
impl<T: CellValue, S: Seedness, P: Pipeline<T, S> + PipelineSeed<T>> PairwiseExt<T, S> for P {}
#[cfg(test)]
mod tests {
use super::*;
use crate::{Cell, Gettable, MaterializeEmpty, Mutable};
#[test]
fn test_pairwise_emits_pairs() {
let source = Cell::new(1u64);
let pairs = source.clone().pairwise().materialize();
assert_eq!(pairs.get(), None);
source.set(2);
assert_eq!(pairs.get(), Some((1, 2)));
source.set(3);
assert_eq!(pairs.get(), Some((2, 3)));
}
}