#![doc(html_root_url = "https://docs.rs/refreshable/1")]
#![warn(clippy::all, missing_docs)]
use arc_swap::ArcSwap;
use parking_lot::Mutex;
use std::collections::HashMap;
use std::marker::PhantomData;
use std::ops::Deref;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
#[cfg(test)]
mod test;
// Marker trait with no methods. It exists so that a `Subscription<T, E>` can be stored behind a type-erased
// `Arc<dyn Cleanup + Sync + Send>` without naming `T` and `E`; the only thing done with such a value is dropping it.
trait Cleanup {}
impl<T, E> Cleanup for Subscription<T, E> {}
// A registered callback together with its bookkeeping state. `F` may be unsized so that the callback can be stored as
// a trait object.
struct RawCallback<F: ?Sized> {
// Never read. Filled from the owning `Refreshable`'s `cleanup` field at registration time so that the type-erased
// value stays alive for at least as long as this callback does.
_cleanup: Option<Arc<dyn Cleanup + Sync + Send>>,
// Whether the most recent invocation made during a refresh added no errors. Starts out `true`. A callback whose flag
// is `false` is invoked again on the next refresh even when the value did not change.
ok: bool,
// The callback itself. Kept as the last field so the struct can be unsized.
callback: F,
}
// Type-erased form of a registered callback: it receives the current value and a vector to which it appends any
// errors it wants to report.
type Callback<T, E> = RawCallback<dyn FnMut(&T, &mut Vec<E>) + Sync + Send>;
// State shared between a `Refreshable`, its `RefreshHandle`, and every `Subscription` created from it.
struct Shared<T, E> {
// The current value.
value: ArcSwap<T>,
// Held while a subscription reads the current value and registers its callback, and while a refresh stores a new value
// and snapshots the callback map, so that those two sequences cannot interleave.
update_lock: Mutex<()>,
// Registered callbacks keyed by subscription id. The map sits behind an `Arc` so that a refresh can take a cheap
// snapshot and run callbacks without holding this mutex; writers go through `Arc::make_mut`.
#[allow(clippy::type_complexity)]
callbacks: Mutex<Arc<HashMap<u64, Arc<Mutex<Callback<T, E>>>>>>,
}
/// A wrapper around a value that can be replaced over time and observed through subscriptions.
///
/// `T` is the type of the wrapped value and `E` is the error type that subscription callbacks may report.
pub struct Refreshable<T, E> {
// State shared with the refresh handle and with subscriptions.
shared: Arc<Shared<T, E>>,
// Source of ids for new subscriptions.
next_id: AtomicU64,
// `None` for a refreshable created by `new`. For a refreshable created by `map` this holds the subscription on the
// parent refreshable; a clone is also stored with every callback registered here, and the parent subscription is
// dropped once the last clone goes away.
cleanup: Option<Arc<dyn Cleanup + Sync + Send>>,
}
impl<T, E> Refreshable<T, E>
where
    T: PartialEq + 'static + Sync + Send,
    E: 'static,
{
    /// Creates a new `Refreshable` holding `value`.
    ///
    /// The refreshable is returned together with the `RefreshHandle` that is used to replace its value later on.
    pub fn new(value: T) -> (Refreshable<T, E>, RefreshHandle<T, E>) {
        let shared = Arc::new(Shared {
            value: ArcSwap::new(Arc::new(value)),
            update_lock: Mutex::new(()),
            callbacks: Mutex::new(Arc::new(HashMap::new())),
        });

        let refreshable = Refreshable {
            shared: Arc::clone(&shared),
            next_id: AtomicU64::new(0),
            cleanup: None,
        };
        let handle = RefreshHandle { shared };

        (refreshable, handle)
    }

    /// Returns a guard that dereferences to the value currently stored in the refreshable.
    #[inline]
    pub fn get(&self) -> Guard<'_, T> {
        let inner = self.shared.value.load();
        Guard {
            inner,
            _p: PhantomData,
        }
    }

    /// Subscribes a fallible callback to the refreshable.
    ///
    /// The callback is first invoked synchronously with the current value. If that invocation fails, its error is
    /// returned and the callback is not registered. Otherwise the callback is registered to be invoked by later
    /// refreshes, and a `Subscription` is returned which unregisters the callback when dropped.
    ///
    /// # Errors
    ///
    /// Returns the error produced by the initial invocation of `callback`.
    pub fn subscribe<F>(&self, mut callback: F) -> Result<Subscription<T, E>, E>
    where
        F: FnMut(&T) -> Result<(), E> + 'static + Sync + Send,
    {
        // Held until the callback is registered so that no refresh can slip in between the initial invocation and
        // the registration.
        let _update_guard = self.shared.update_lock.lock();

        let first_run = callback(&self.get());
        if let Err(error) = first_run {
            return Err(error);
        }

        let registered =
            self.subscribe_raw(move |value, errors| errors.extend(callback(value).err()));
        Ok(registered)
    }

    /// Subscribes an infallible callback to the refreshable.
    ///
    /// Behaves like `subscribe`, for callbacks that cannot fail.
    pub fn subscribe_ok<F>(&self, mut callback: F) -> Subscription<T, E>
    where
        F: FnMut(&T) + 'static + Sync + Send,
    {
        let outcome = self.subscribe(move |value| {
            callback(value);
            Ok(())
        });

        // The wrapper above always returns `Ok`, so `subscribe` cannot have failed.
        outcome.ok().unwrap()
    }

    // Registers `callback` under a fresh id without invoking it. Callers are expected to hold `update_lock`.
    fn subscribe_raw<F>(&self, callback: F) -> Subscription<T, E>
    where
        F: FnMut(&T, &mut Vec<E>) + 'static + Sync + Send,
    {
        let id = self.next_id.fetch_add(1, Ordering::Relaxed);
        let entry = Arc::new(Mutex::new(RawCallback {
            _cleanup: self.cleanup.clone(),
            ok: true,
            callback,
        }));

        {
            // `make_mut` copies the map if a refresh currently holds a snapshot of it.
            let mut registry = self.shared.callbacks.lock();
            Arc::make_mut(&mut *registry).insert(id, entry);
        }

        Subscription {
            shared: Arc::clone(&self.shared),
            id,
            live: true,
        }
    }

    /// Creates a new refreshable whose value is derived from this one by `map`.
    ///
    /// The mapping function is applied to the current value immediately and again on each callback invocation made
    /// by a refresh of this refreshable; the result is pushed into the returned refreshable, whose own subscribers
    /// are then handled as in any other refresh. The link to this refreshable is kept alive by the returned
    /// refreshable and by the callbacks registered on it.
    pub fn map<F, R>(&self, mut map: F) -> Refreshable<R, E>
    where
        F: FnMut(&T) -> R + 'static + Sync + Send,
        R: PartialEq + 'static + Sync + Send,
    {
        // Held so that the initial mapped value and the registration below observe the same parent value.
        let _update_guard = self.shared.update_lock.lock();

        let initial = map(&self.get());
        let (mut mapped, mut handle) = Refreshable::new(initial);

        let parent_subscription = self.subscribe_raw(move |value, errors| {
            let projected = map(value);
            handle.refresh_raw(projected, errors);
        });
        mapped.cleanup = Some(Arc::new(parent_subscription));

        mapped
    }
}
/// A handle to a callback registered on a `Refreshable`.
///
/// The callback is unregistered when this value is dropped, unless `leak` was called on it.
#[must_use = "the associated subscription is unregistered when this value is dropped"]
pub struct Subscription<T, E> {
// State of the refreshable the callback is registered on.
shared: Arc<Shared<T, E>>,
// Key of the callback in the shared callback map.
id: u64,
// Whether dropping this value should remove the callback. Cleared by `leak`.
live: bool,
}
impl<T, E> Drop for Subscription<T, E> {
    /// Removes the associated callback from the callback map, unless the subscription was leaked.
    fn drop(&mut self) {
        if !self.live {
            return;
        }

        // `make_mut` copies the map if a refresh currently holds a snapshot of it, so that snapshot is unaffected.
        let mut registry = self.shared.callbacks.lock();
        Arc::make_mut(&mut *registry).remove(&self.id);
    }
}
impl<T, E> Subscription<T, E> {
/// Consumes the subscription without unregistering its callback.
///
/// The callback stays in the refreshable's callback map after this call.
pub fn leak(mut self) {
// `self` is dropped at the end of this method; clearing the flag makes that drop skip the removal.
self.live = false;
}
}
/// A handle used to replace the value of the `Refreshable` it was created with.
pub struct RefreshHandle<T, E> {
// State shared with the refreshable and its subscriptions.
shared: Arc<Shared<T, E>>,
}
impl<T, E> RefreshHandle<T, E>
where
    T: PartialEq + 'static + Sync + Send,
{
    /// Replaces the refreshable's value with `new_value`.
    ///
    /// The new value is always stored. A registered callback is invoked if the new value differs from the previous
    /// one, or if that callback's most recent invocation by a refresh reported an error.
    ///
    /// # Errors
    ///
    /// Returns every error reported by the callbacks invoked during this refresh.
    pub fn refresh(&mut self, new_value: T) -> Result<(), Vec<E>> {
        let mut failures = Vec::new();
        self.refresh_raw(new_value, &mut failures);

        if failures.is_empty() {
            return Ok(());
        }
        Err(failures)
    }

    // Stores `new_value` and runs the callbacks that need to run, appending any errors they report to `errors`.
    fn refresh_raw(&mut self, new_value: T, errors: &mut Vec<E>) {
        // Compared before the store below replaces the old value.
        let value_changed = new_value != **self.shared.value.load();

        // The update lock covers only the store and the snapshot of the callback map; the callbacks themselves run
        // after it has been released.
        let (current, snapshot) = {
            let _update_guard = self.shared.update_lock.lock();
            self.shared.value.store(Arc::new(new_value));
            let current = self.shared.value.load();
            let snapshot = self.shared.callbacks.lock().clone();
            (current, snapshot)
        };

        for entry in snapshot.values() {
            let mut entry = entry.lock();

            // Callbacks that succeeded last time are skipped when the value is unchanged.
            let must_run = value_changed || !entry.ok;
            if !must_run {
                continue;
            }

            // A callback reports failure by growing the error vector.
            let errors_before = errors.len();
            (entry.callback)(&current, errors);
            entry.ok = errors.len() == errors_before;
        }
    }
}
/// A guard returned by `Refreshable::get` that dereferences to the refreshable's value.
pub struct Guard<'a, T> {
// The guard obtained from loading the underlying `ArcSwap`.
inner: arc_swap::Guard<Arc<T>>,
// Carries the `'a` lifetime, which `Refreshable::get` ties to the borrow of the refreshable.
_p: PhantomData<&'a ()>,
}
impl<T> Deref for Guard<'_, T> {
    type Target = T;

    /// Borrows the value held by the guard.
    #[inline]
    fn deref(&self) -> &T {
        // Deref coercion walks through the inner guard and the `Arc` down to the value itself.
        &self.inner
    }
}