use std::{hash::Hash, sync::Arc};
use dashmap::{DashMap, Entry};
use tokio::sync::Mutex;
use tokio::sync::broadcast::{Sender, channel};
use crate::types::throttled::Throttled;
use super::Resolver;
/// Concurrent map of in-flight resolutions for a resolver `R`.
///
/// Keys are the resolver's inputs. Each value is a Tokio broadcast [`Sender`]
/// (behind an `Arc<Mutex<..>>`) whose messages are `Option<R::Output>`; the
/// `Resolver` implementation in this file broadcasts `Some(output)` when the
/// inner resolution succeeds and `None` when it fails.
pub type SenderMap<R> =
DashMap<<R as Resolver>::Input, Arc<Mutex<Sender<Option<<R as Resolver>::Output>>>>>;
/// [`Throttled`] instantiated with `R` and a [`SenderMap<R>`] as its second
/// type parameter — the combination for which this file implements `Resolver`.
pub type ThrottledResolver<R> = Throttled<R, SenderMap<R>>;
impl<R> Resolver for Throttled<R, SenderMap<R>>
where
R: Resolver + Send + Sync + 'static,
R::Input: Clone + Hash + Eq + Send + Sync + 'static,
R::Output: Clone + Send + Sync + 'static,
{
type Input = R::Input;
type Output = Option<R::Output>;
type Error = R::Error;
async fn resolve(&self, input: &Self::Input) -> Result<Self::Output, Self::Error> {
match self.pending.entry(input.clone()) {
Entry::Occupied(occupied) => {
let tx = occupied.get().lock().await.clone();
drop(occupied);
Ok(tx.subscribe().recv().await.expect("recv"))
}
Entry::Vacant(vacant) => {
let (tx, _) = channel(1);
vacant.insert(Arc::new(Mutex::new(tx.clone())));
let result = self.inner.resolve(input).await;
tx.send(result.as_ref().ok().cloned()).ok();
self.pending.remove(input);
result.map(Some)
}
}
}
}