use {
crate::protocol::jsonrpc::{
Id,
Response,
},
dashmap::DashMap,
std::{
fmt::{
self,
Debug,
Formatter,
},
sync::Arc,
},
};
pub struct ResponsePending(Arc<DashMap<Id, async_channel::Sender<Response>>>);
impl ResponsePending {
pub fn new() -> Self {
ResponsePending(Arc::new(DashMap::new()))
}
pub fn wait(&self, id: Id) -> async_channel::Receiver<Response> {
let (tx, rx) = async_channel::bounded(1);
self.0.insert(id, tx);
rx
}
pub fn insert(&self, response: Response) {
if let Some((_, tx)) = self.0.remove(response.id()) {
let id = response.id().clone();
if let Err(e) = tx.try_send(response) {
otel::error!(
"response_delivery_failed",
format!(
"CRITICAL: Failed to deliver response for request {}: {:?}. \
Request will hang indefinitely.",
id, e
)
);
}
}
}
pub fn close_all(&self) {
for entry in self.0.iter() {
entry.value().close();
}
self.0.clear();
}
}
impl Debug for ResponsePending {
fn fmt(&self, f: &mut Formatter) -> fmt::Result {
f.debug_set()
.entries(self.0.iter().map(|entry| entry.key().clone()))
.finish()
}
}