use std::borrow::Borrow;
use std::fmt::{Debug, Display, Formatter};
use std::hash::{BuildHasher, Hash, RandomState};
use std::sync::{Arc, Mutex, MutexGuard, PoisonError};
use papaya::{HashMap, ResizeMode};
use tokio::sync::Notify;
#[derive(Debug)]
pub struct UnregisteredTask<K>(K);
impl<K: Display> Display for UnregisteredTask<K> {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
write!(f, "Attempted to wait on an unregistered task: {}", self.0)
}
}
impl<K: Debug + Display> std::error::Error for UnregisteredTask<K> {}
pub struct OnceMap<K, V, S = RandomState> {
items: HashMap<K, Value<V>, S>,
}
impl<K: Eq + Hash + Debug, V: Debug, S: BuildHasher + Clone> Debug for OnceMap<K, V, S> {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
Debug::fmt(&self.items, f)
}
}
impl<K: Eq + Hash + Clone, V: Clone, H: BuildHasher + Clone> OnceMap<K, V, H> {
pub fn register(&self, key: K) -> bool {
self.items
.pin()
.try_insert(key, Value::Waiting(Arc::new(Notify::new())))
.is_ok()
}
pub async fn register_or_wait(&self, key: &K) -> Option<V> {
let notify = {
let items = self.items.pin();
match items.try_insert_with(key.clone(), || Value::Waiting(Arc::new(Notify::new()))) {
Ok(_) => return None,
Err(value) => match value {
Value::Filled(_) => return value.get(),
Value::Waiting(notify) => notify.clone(),
},
}
};
let notification = notify.notified();
if let Some(value) = self.items.pin().get(key).expect("map is append-only").get() {
return Some(value);
}
notification.await;
let items = self.items.pin();
let value = items.get(key).expect("map is append-only");
match value {
Value::Filled(_) => value.get(),
Value::Waiting(_) => unreachable!("notify was called"),
}
}
pub fn done(&self, key: K, value: V) {
if let Some(Value::Waiting(notify)) = self.items.pin().insert(key, Value::filled(value)) {
notify.notify_waiters();
}
}
pub async fn wait(&self, key: &K) -> Result<V, UnregisteredTask<K>> {
self.register_or_wait(key)
.await
.ok_or_else(|| UnregisteredTask(key.clone()))
}
pub fn wait_blocking(&self, key: &K) -> Result<V, UnregisteredTask<K>> {
futures::executor::block_on(self.register_or_wait(key))
.ok_or_else(|| UnregisteredTask(key.clone()))
}
pub fn get<Q: ?Sized + Hash + Eq>(&self, key: &Q) -> Option<V>
where
K: Borrow<Q>,
{
let items = self.items.pin();
items.get(key)?.get()
}
pub fn remove<Q: ?Sized + Hash + Eq>(&self, key: &Q) -> Option<V>
where
K: Borrow<Q>,
{
let items = self.items.pin();
items.remove(key)?.take()
}
}
impl<K: Eq + Hash + Clone, V, H: Default + BuildHasher + Clone> Default for OnceMap<K, V, H> {
fn default() -> Self {
Self {
items: HashMap::builder()
.hasher(H::default())
.resize_mode(ResizeMode::Blocking)
.build(),
}
}
}
impl<K, V, H> FromIterator<(K, V)> for OnceMap<K, V, H>
where
K: Eq + Hash,
H: Default + Clone + BuildHasher,
{
fn from_iter<T: IntoIterator<Item = (K, V)>>(iter: T) -> Self {
Self {
items: iter
.into_iter()
.map(|(k, v)| (k, Value::filled(v)))
.collect(),
}
}
}
#[derive(Debug)]
enum Value<V> {
Waiting(Arc<Notify>),
Filled(Mutex<Option<V>>),
}
impl<V> Value<V> {
fn filled(value: V) -> Self {
Self::Filled(Mutex::new(Some(value)))
}
fn lock(value: &Mutex<Option<V>>) -> MutexGuard<'_, Option<V>> {
value.lock().unwrap_or_else(PoisonError::into_inner)
}
fn take(&self) -> Option<V> {
match self {
Self::Filled(value) => Self::lock(value).take(),
Self::Waiting(_) => None,
}
}
}
impl<V: Clone> Value<V> {
fn get(&self) -> Option<V> {
match self {
Self::Filled(value) => Self::lock(value).clone(),
Self::Waiting(_) => None,
}
}
}