use dashmap::DashMap;
use std::sync::Arc;
use crate::common::utils::{Mutex, YieldNow};
use crate::FeedDiscoveryKey;
use super::{Feed, FeedPersistence};
pub(crate) type FeedMap<U> = Arc<DashMap<FeedDiscoveryKey, Arc<Mutex<Feed<U>>>>>;
pub(crate) const SHUTDOWN_SIGNAL_NAME: &str = "shutdown";
pub(crate) async fn get_feed<U>(
feeds: &FeedMap<U>,
discovery_key: &FeedDiscoveryKey,
) -> Option<Arc<Mutex<Feed<U>>>>
where
U: FeedPersistence,
{
loop {
match feeds.try_get(discovery_key) {
dashmap::try_result::TryResult::Absent => {
return None;
}
dashmap::try_result::TryResult::Locked => {
YieldNow(false).await;
}
dashmap::try_result::TryResult::Present(value) => {
return Some(value.clone());
}
}
}
}
pub(crate) async fn insert_feed<U>(
feeds: &FeedMap<U>,
feed: Feed<U>,
discovery_key: &FeedDiscoveryKey,
) -> bool
where
U: FeedPersistence,
{
loop {
if let Some(entry) = feeds.try_entry(*discovery_key) {
match entry {
dashmap::mapref::entry::Entry::Occupied(_) => {
break false;
}
dashmap::mapref::entry::Entry::Vacant(vacant) => {
vacant.insert(Arc::new(Mutex::new(feed)));
break true;
}
}
} else {
YieldNow(false).await;
}
}
}
pub(crate) async fn remove_feed<U>(
feeds: &FeedMap<U>,
discovery_key: &FeedDiscoveryKey,
) -> Option<Arc<Mutex<Feed<U>>>>
where
U: FeedPersistence,
{
loop {
if let Some(entry) = feeds.try_entry(*discovery_key) {
match entry {
dashmap::mapref::entry::Entry::Occupied(value) => {
break Some(value.remove());
}
dashmap::mapref::entry::Entry::Vacant(_) => {
break None;
}
}
} else {
YieldNow(false).await;
}
}
}
pub(crate) async fn get_feed_discovery_keys<U>(feeds: &FeedMap<U>) -> Vec<[u8; 32]>
where
U: FeedPersistence,
{
feeds.iter().map(|multi| *multi.key()).collect()
}