use dashmap::DashMap;
use std::{collections::HashSet, hash::Hash, sync::Arc};
use tokio::sync::watch;
use tracing::debug;
use crate::status::StatusType;
#[derive(Clone)]
pub(crate) struct Broadcast<K, V> {
state: Arc<DashMap<K, V>>,
notifier: watch::Sender<StatusType>,
}
impl<K: Eq + Hash + Clone, V: Clone> Broadcast<K, V> {
pub(crate) fn new(capacity: Option<usize>, shard_amount: Option<usize>) -> Self {
let (notifier, _) = watch::channel(StatusType::Spawning);
let state = match (capacity, shard_amount) {
(Some(capacity), Some(shard_amount)) => Arc::new(
DashMap::with_capacity_and_shard_amount(capacity, shard_amount),
),
(Some(capacity), None) => Arc::new(DashMap::with_capacity(capacity)),
(None, Some(shard_amount)) => Arc::new(DashMap::with_shard_amount(shard_amount)),
(None, None) => Arc::new(DashMap::new()),
};
Self { state, notifier }
}
#[allow(dead_code)]
pub(crate) fn insert(&self, key: K, value: V, status: Option<StatusType>) {
self.state.insert(key, value);
if let Some(status) = status {
let _ = self.notifier.send(status);
}
}
#[allow(dead_code)]
pub(crate) fn insert_set(&self, set: Vec<(K, V)>, status: StatusType) {
for (key, value) in set {
self.state.insert(key, value);
}
let _ = self.notifier.send(status);
}
pub(crate) fn insert_filtered_set(&self, set: Vec<(K, V)>, status: StatusType) {
for (key, value) in set {
if self.state.get(&key).is_none() {
self.state.insert(key, value);
}
}
let _ = self.notifier.send(status);
}
#[allow(dead_code)]
pub(crate) fn remove(&self, key: &K, status: Option<StatusType>) {
self.state.remove(key);
if let Some(status) = status {
let _ = self.notifier.send(status);
}
}
#[allow(dead_code)]
pub(crate) fn get(&self, key: &K) -> Option<Arc<V>> {
self.state
.get(key)
.map(|entry| Arc::new((*entry.value()).clone()))
}
#[allow(dead_code)]
pub(crate) fn get_set(&self, keys: &[K]) -> Vec<(K, Arc<V>)> {
keys.iter()
.filter_map(|key| {
self.state
.get(key)
.map(|entry| (key.clone(), Arc::new((*entry.value()).clone())))
})
.collect()
}
#[allow(dead_code)]
pub(crate) fn contains_key(&self, key: &K) -> bool {
self.state.contains_key(key)
}
pub(crate) fn subscribe(&self) -> watch::Receiver<StatusType> {
self.notifier.subscribe()
}
pub(crate) fn subscriber(&self) -> BroadcastSubscriber<K, V> {
BroadcastSubscriber {
state: self.get_state(),
notifier: self.subscribe(),
}
}
pub(crate) fn get_state(&self) -> Arc<DashMap<K, V>> {
Arc::clone(&self.state)
}
#[allow(dead_code)]
pub(crate) fn get_filtered_state(&self, ignore_list: &HashSet<K>) -> Vec<(K, V)> {
self.state
.iter()
.filter(|entry| !ignore_list.contains(entry.key()))
.map(|entry| (entry.key().clone(), entry.value().clone()))
.collect()
}
pub(crate) fn clear(&self) {
self.state.clear();
}
#[allow(dead_code)]
pub(crate) fn len(&self) -> usize {
self.state.len()
}
#[allow(dead_code)]
pub(crate) fn is_empty(&self) -> bool {
self.state.is_empty()
}
pub(crate) fn notify(&self, status: StatusType) {
if self.notifier.send(status).is_err() {
debug!("No subscribers are currently listening for updates.");
}
}
}
impl<K: Eq + Hash + Clone, V: Clone> Default for Broadcast<K, V> {
fn default() -> Self {
Self::new(None, None)
}
}
impl<K: Eq + Hash + Clone + std::fmt::Debug, V: Clone + std::fmt::Debug> std::fmt::Debug
for Broadcast<K, V>
{
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
let state_contents: Vec<_> = self
.state
.iter()
.map(|entry| (entry.key().clone(), entry.value().clone()))
.collect();
f.debug_struct("Broadcast")
.field("state", &state_contents)
.field("notifier", &"watch::Sender<StatusType>")
.finish()
}
}
#[derive(Clone)]
pub(crate) struct BroadcastSubscriber<K, V> {
state: Arc<DashMap<K, V>>,
notifier: watch::Receiver<StatusType>,
}
impl<K: Eq + Hash + Clone, V: Clone> BroadcastSubscriber<K, V> {
pub(crate) async fn wait_on_notifier(&mut self) -> Result<StatusType, watch::error::RecvError> {
self.notifier.changed().await?;
let status = *self.notifier.borrow();
Ok(status)
}
#[allow(dead_code)]
pub(crate) fn get(&self, key: &K) -> Option<Arc<V>> {
self.state
.get(key)
.map(|entry| Arc::new((*entry.value()).clone()))
}
#[allow(dead_code)]
pub(crate) fn get_set(&self, keys: &[K]) -> Vec<(K, Arc<V>)> {
keys.iter()
.filter_map(|key| {
self.state
.get(key)
.map(|entry| (key.clone(), Arc::new((*entry.value()).clone())))
})
.collect()
}
#[allow(dead_code)]
pub(crate) fn contains_key(&self, key: &K) -> bool {
self.state.contains_key(key)
}
pub(crate) fn get_filtered_state(&self, ignore_list: &HashSet<K>) -> Vec<(K, V)> {
self.state
.iter()
.filter(|entry| !ignore_list.contains(entry.key()))
.map(|entry| (entry.key().clone(), entry.value().clone()))
.collect()
}
#[allow(dead_code)]
pub(crate) fn len(&self) -> usize {
self.state.len()
}
#[allow(dead_code)]
pub(crate) fn is_empty(&self) -> bool {
self.state.is_empty()
}
}
impl<K: Eq + Hash + Clone + std::fmt::Debug, V: Clone + std::fmt::Debug> std::fmt::Debug
for BroadcastSubscriber<K, V>
{
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
let state_contents: Vec<_> = self
.state
.iter()
.map(|entry| (entry.key().clone(), entry.value().clone()))
.collect();
f.debug_struct("Broadcast")
.field("state", &state_contents)
.field("notifier", &"watch::Sender<StatusType>")
.finish()
}
}