use std::{collections::HashMap, future::Future};
use tokio::sync::broadcast;
use tower::BoxError;
use zebra_chain::transparent;
use zebra_node_services::mempool::Response;
/// Tracker for requests that are waiting on a [`transparent::Output`].
///
/// Wraps a map from each awaited [`transparent::OutPoint`] to the
/// [`broadcast::Sender`] used to deliver the matching output to every
/// receiver subscribed for that outpoint.
#[derive(Debug, Default)]
pub struct PendingOutputs(HashMap<transparent::OutPoint, broadcast::Sender<transparent::Output>>);
impl PendingOutputs {
    /// Registers a waiter for the output referenced by `outpoint`.
    ///
    /// The returned future resolves to [`Response::UnspentOutput`] once an
    /// output is delivered through [`PendingOutputs::respond`] for the same
    /// outpoint.
    ///
    /// # Errors
    ///
    /// The future resolves to a [`BoxError`] wrapping the channel's receive
    /// error if receiving fails (for instance, when the sender is dropped
    /// before any output was sent).
    pub fn queue(
        &mut self,
        outpoint: transparent::OutPoint,
    ) -> impl Future<Output = Result<Response, BoxError>> {
        // All waiters for the same outpoint share one channel; create it on
        // first use. The receiver half returned by `channel` is discarded
        // because every waiter obtains its own receiver via `subscribe`.
        let sender = self
            .0
            .entry(outpoint)
            .or_insert_with(|| broadcast::channel(1).0);

        // Subscribe eagerly, before the future is returned, so the waiter is
        // registered on the channel even if the future is polled later.
        let mut subscription = sender.subscribe();

        async move {
            match subscription.recv().await {
                Ok(output) => Ok(Response::UnspentOutput(output)),
                Err(err) => Err(BoxError::from(err)),
            }
        }
    }

    /// Delivers `output` to every waiter queued for `outpoint`.
    ///
    /// The entry for `outpoint` is removed from the tracker, so each channel
    /// carries at most one value. Does nothing when no entry exists for
    /// `outpoint`.
    #[inline]
    pub fn respond(&mut self, outpoint: &transparent::OutPoint, output: transparent::Output) {
        let sender = match self.0.remove(outpoint) {
            Some(sender) => sender,
            None => return,
        };

        tracing::trace!(?outpoint, "found pending mempool output");

        // The send result is deliberately ignored: it only reports whether
        // any receiver was still subscribed.
        let _ = sender.send(output);
    }

    /// Drops every entry whose channel no longer has any subscribed receiver.
    pub fn prune(&mut self) {
        self.0.retain(|_, sender| sender.receiver_count() != 0);
    }

    /// Removes all entries from the tracker, dropping their senders.
    pub fn clear(&mut self) {
        self.0.clear();
    }
}