use std::{
marker::PhantomData,
sync::{
Arc, Mutex,
atomic::{AtomicBool, Ordering as AtomicOrdering},
},
};
use super::CellValue;
use crate::{
pipeline::{Empty, MaterializeEmpty, Pipeline, PipelineInstall, Seedness},
signal::Signal,
subscription::SubscriptionGuard,
};
pub struct LastPipeline<S, T, Sd = crate::pipeline::Definite> {
source: S,
_t: PhantomData<fn(T)>,
_sd: PhantomData<fn(Sd)>,
}
impl<S, T, Sd> PipelineInstall<T> for LastPipeline<S, T, Sd>
where
S: PipelineInstall<T> + Send + Sync + 'static,
Sd: Seedness,
T: CellValue,
{
fn install(&self, callback: Arc<dyn Fn(&Signal<T>) + Send + Sync>) -> SubscriptionGuard {
let last_value: Arc<Mutex<Option<T>>> = Arc::new(Mutex::new(None));
let first = Arc::new(AtomicBool::new(true));
let wrapped: Arc<dyn Fn(&Signal<T>) + Send + Sync> =
Arc::new(move |signal: &Signal<T>| match signal {
Signal::Value(v) => {
if first.swap(false, AtomicOrdering::SeqCst) {
return;
}
*last_value.lock().expect("last poisoned") = Some(v.as_ref().clone());
}
Signal::Complete => {
let val = last_value.lock().expect("last poisoned").clone();
if let Some(v) = val {
callback(&Signal::value(v));
}
callback(&Signal::Complete);
}
Signal::Error(e) => callback(&Signal::Error(e.clone())),
});
self.source.install(wrapped)
}
}
#[allow(private_bounds)]
impl<S, T, Sd> Pipeline<T, Empty> for LastPipeline<S, T, Sd>
where
S: Pipeline<T, Sd>,
Sd: Seedness,
T: CellValue,
{
}
impl<S, T, Sd> MaterializeEmpty<T> for LastPipeline<S, T, Sd>
where
S: Pipeline<T, Sd>,
Sd: Seedness,
T: CellValue,
{
}
pub struct LastOrPipeline<S, T, Sd = crate::pipeline::Definite> {
source: S,
default: T,
_sd: PhantomData<fn(Sd)>,
}
impl<S, T, Sd> PipelineInstall<T> for LastOrPipeline<S, T, Sd>
where
S: PipelineInstall<T> + Send + Sync + 'static,
Sd: Seedness,
T: CellValue,
{
fn install(&self, callback: Arc<dyn Fn(&Signal<T>) + Send + Sync>) -> SubscriptionGuard {
let last_value: Arc<Mutex<Option<T>>> = Arc::new(Mutex::new(None));
let default = self.default.clone();
let first = Arc::new(AtomicBool::new(true));
let wrapped: Arc<dyn Fn(&Signal<T>) + Send + Sync> =
Arc::new(move |signal: &Signal<T>| match signal {
Signal::Value(v) => {
if first.swap(false, AtomicOrdering::SeqCst) {
return;
}
*last_value.lock().expect("last_or poisoned") = Some(v.as_ref().clone());
}
Signal::Complete => {
let val = last_value.lock().expect("last_or poisoned").clone();
let emit = match val {
Some(v) => v,
None => default.clone(),
};
callback(&Signal::value(emit));
callback(&Signal::Complete);
}
Signal::Error(e) => callback(&Signal::Error(e.clone())),
});
self.source.install(wrapped)
}
}
#[allow(private_bounds)]
impl<S, T, Sd> Pipeline<T, Empty> for LastOrPipeline<S, T, Sd>
where
S: Pipeline<T, Sd>,
Sd: Seedness,
T: CellValue,
{
}
impl<S, T, Sd> MaterializeEmpty<T> for LastOrPipeline<S, T, Sd>
where
S: Pipeline<T, Sd>,
Sd: Seedness,
T: CellValue,
{
}
#[allow(private_bounds)]
pub trait LastExt<T: CellValue, S: Seedness>: Pipeline<T, S> {
#[track_caller]
fn last(self) -> LastPipeline<Self, T, S> {
LastPipeline {
source: self,
_t: PhantomData,
_sd: PhantomData,
}
}
#[track_caller]
fn last_or(self, default: T) -> LastOrPipeline<Self, T, S> {
LastOrPipeline {
source: self,
default,
_sd: PhantomData,
}
}
}
impl<T: CellValue, S: Seedness, P: Pipeline<T, S>> LastExt<T, S> for P {}
#[cfg(test)]
mod tests {
use std::sync::atomic::{AtomicU32, Ordering};
use super::*;
use crate::{Cell, Gettable, MaterializeEmpty, Mutable, traits::Watchable};
#[test]
fn test_last() {
let source = Cell::new(0);
let last = source.clone().last().materialize();
let value_emissions = Arc::new(AtomicU32::new(0));
let ve = value_emissions.clone();
let _guard = last.subscribe(move |signal| {
if let Signal::Value(_) = signal {
ve.fetch_add(1, Ordering::SeqCst);
}
});
assert_eq!(value_emissions.load(Ordering::SeqCst), 1);
source.set(1);
source.set(2);
source.set(3);
assert_eq!(value_emissions.load(Ordering::SeqCst), 1);
source.complete();
assert_eq!(value_emissions.load(Ordering::SeqCst), 2);
assert_eq!(last.get(), Some(3));
}
#[test]
fn test_last_or_with_values() {
let source = Cell::new(0);
let last = source.clone().last_or(999).materialize();
source.set(1);
source.set(2);
source.complete();
assert_eq!(last.get(), Some(2));
}
#[test]
fn test_last_or_without_values() {
let source = Cell::new(0);
let last = source.clone().last_or(999).materialize();
source.complete();
assert_eq!(last.get(), Some(999));
}
}