use std::{collections::hash_set::Iter, hash::Hash, marker::PhantomData};
use futures::stream::LocalBoxStream;
use crate::subscribers_store::{
common, progressable,
progressable::{AllProcessed, Processed},
SubscribersStore,
};
pub type ProgressableHashSet<T> =
HashSet<T, progressable::SubStore<T>, progressable::Guarded<T>>;
pub type ObservableHashSet<T> = HashSet<T, common::SubStore<T>, T>;
#[derive(Debug)]
pub struct HashSet<T, S: SubscribersStore<T, O>, O> {
store: std::collections::HashSet<T>,
on_insert_subs: S,
on_remove_subs: S,
_output: PhantomData<O>,
}
impl<T> ProgressableHashSet<T>
where
T: Clone + 'static,
{
#[inline]
pub fn when_insert_processed(&self) -> Processed<'static> {
self.on_insert_subs.when_all_processed()
}
#[inline]
pub fn when_remove_processed(&self) -> Processed<'static> {
self.on_remove_subs.when_all_processed()
}
#[inline]
pub fn when_all_processed(&self) -> AllProcessed<'static> {
crate::when_all_processed(vec![
self.when_remove_processed().into(),
self.when_insert_processed().into(),
])
}
}
impl<T, S: SubscribersStore<T, O>, O> HashSet<T, S, O> {
#[inline]
#[must_use]
pub fn new() -> Self {
Self::default()
}
#[inline]
pub fn iter(&self) -> impl Iterator<Item = &T> {
self.into_iter()
}
#[inline]
#[must_use]
pub fn on_insert(&self) -> LocalBoxStream<'static, O> {
self.on_insert_subs.subscribe()
}
#[inline]
#[must_use]
pub fn on_remove(&self) -> LocalBoxStream<'static, O> {
self.on_remove_subs.subscribe()
}
}
impl<T, S, O> HashSet<T, S, O>
where
T: Clone + 'static,
S: SubscribersStore<T, O>,
O: 'static,
{
#[inline]
pub fn replay_on_insert(&self) -> LocalBoxStream<'static, O> {
Box::pin(futures::stream::iter(
self.store
.clone()
.into_iter()
.map(|val| self.on_insert_subs.wrap(val))
.collect::<Vec<_>>(),
))
}
}
impl<T, S, O> HashSet<T, S, O>
where
T: Clone + Hash + Eq + 'static,
S: SubscribersStore<T, O>,
{
pub fn insert(&mut self, value: T) -> bool {
if self.store.insert(value.clone()) {
self.on_insert_subs.send_update(value);
true
} else {
false
}
}
pub fn remove(&mut self, value: &T) -> Option<T> {
let value = self.store.take(value);
if let Some(value) = &value {
self.on_remove_subs.send_update(value.clone());
}
value
}
pub fn update(&mut self, updated: std::collections::HashSet<T>) {
let removed_elems = self.store.difference(&updated);
let inserted_elems = updated.difference(&self.store);
for removed_elem in removed_elems {
self.on_remove_subs.send_update(removed_elem.clone());
}
for inserted_elem in inserted_elems {
self.on_insert_subs.send_update(inserted_elem.clone());
}
self.store = updated;
}
#[inline]
#[must_use]
pub fn contains(&self, value: &T) -> bool {
self.store.contains(value)
}
}
impl<T, S, O> Default for HashSet<T, S, O>
where
S: SubscribersStore<T, O>,
{
#[inline]
fn default() -> Self {
Self {
store: std::collections::HashSet::new(),
on_insert_subs: S::default(),
on_remove_subs: S::default(),
_output: PhantomData::default(),
}
}
}
impl<'a, T, S: SubscribersStore<T, O>, O> IntoIterator
for &'a HashSet<T, S, O>
{
type IntoIter = Iter<'a, T>;
type Item = &'a T;
#[inline]
fn into_iter(self) -> Self::IntoIter {
self.store.iter()
}
}
impl<T, S, O> Drop for HashSet<T, S, O>
where
S: SubscribersStore<T, O>,
{
fn drop(&mut self) {
let store = &mut self.store;
let on_remove_subs = &self.on_remove_subs;
store.drain().for_each(|value| {
on_remove_subs.send_update(value);
});
}
}
impl<T, S, O> From<std::collections::HashSet<T>> for HashSet<T, S, O>
where
S: SubscribersStore<T, O>,
{
#[inline]
fn from(from: std::collections::HashSet<T>) -> Self {
Self {
store: from,
on_insert_subs: S::default(),
on_remove_subs: S::default(),
_output: PhantomData::default(),
}
}
}