use futures::Stream;
use soil_client::transaction_pool::TransactionStatus;
use soil_client::utils::mpsc::{
tracing_unbounded, TracingUnboundedReceiver, TracingUnboundedSender,
};
/// Receiving half of a transaction status subscription.
///
/// Instances are built by [`Sender::new_watcher`], which pairs the receiving end of a freshly
/// created channel with the hash passed to it.
#[derive(Debug)]
pub struct Watcher<H, BH> {
/// Channel end on which [`TransactionStatus`] updates arrive.
receiver: TracingUnboundedReceiver<TransactionStatus<H, BH>>,
/// Hash handed to [`Sender::new_watcher`]; presumably identifies the watched transaction
/// (TODO confirm against callers).
hash: H,
}
impl<H, BH> Watcher<H, BH> {
    /// Borrows the hash this watcher was created with.
    pub fn hash(&self) -> &H {
        let Self { hash, .. } = self;
        hash
    }

    /// Consumes the watcher and exposes its receiver as a stream of status updates.
    ///
    /// The stored hash is dropped; only the receiving end of the channel is returned.
    pub fn into_stream(self) -> impl Stream<Item = TransactionStatus<H, BH>> {
        let Self { receiver, .. } = self;
        receiver
    }
}
/// Sending half shared by every [`Watcher`] created through [`Sender::new_watcher`].
#[derive(Debug)]
pub struct Sender<H, BH> {
/// One channel sender per created watcher; an entry is removed as soon as sending a status
/// through it fails.
receivers: Vec<TracingUnboundedSender<TransactionStatus<H, BH>>>,
/// Set to `true` by the methods that report `Usurped`, `Finalized`, `FinalityTimeout`,
/// `Invalid` or `Dropped`; once set, `is_done` returns `true`.
is_finalized: bool,
}
impl<H, BH> Default for Sender<H, BH> {
fn default() -> Self {
Sender { receivers: Default::default(), is_finalized: false }
}
}
impl<H: Clone, BH: Clone> Sender<H, BH> {
    /// Creates a new [`Watcher`] carrying `hash` and subscribes it to every status sent from
    /// now on.
    pub fn new_watcher(&mut self, hash: H) -> Watcher<H, BH> {
        // NOTE(review): the numeric argument is presumably the queue-size warning threshold of
        // `tracing_unbounded` — confirm against its documentation.
        let (sender, receiver) = tracing_unbounded("mpsc_txpool_watcher", 100_000);
        self.receivers.push(sender);
        Watcher { receiver, hash }
    }

    /// Notifies all watchers with `TransactionStatus::Ready`.
    pub fn ready(&mut self) {
        self.send(TransactionStatus::Ready);
    }

    /// Notifies all watchers with `TransactionStatus::Future`.
    pub fn future(&mut self) {
        self.send(TransactionStatus::Future);
    }

    /// Notifies all watchers with `TransactionStatus::Usurped(hash)` and marks the sender as
    /// finalized.
    pub fn usurped(&mut self, hash: H) {
        self.send_and_finalize(TransactionStatus::Usurped(hash));
    }

    /// Notifies all watchers with `TransactionStatus::InBlock((hash, index))`.
    pub fn in_block(&mut self, hash: BH, index: usize) {
        let status = TransactionStatus::InBlock((hash, index));
        self.send(status);
    }

    /// Notifies all watchers with `TransactionStatus::Finalized((hash, index))` and marks the
    /// sender as finalized.
    pub fn finalized(&mut self, hash: BH, index: usize) {
        let status = TransactionStatus::Finalized((hash, index));
        self.send_and_finalize(status);
    }

    /// Notifies all watchers with `TransactionStatus::FinalityTimeout(hash)` and marks the
    /// sender as finalized.
    pub fn finality_timeout(&mut self, hash: BH) {
        self.send_and_finalize(TransactionStatus::FinalityTimeout(hash));
    }

    /// Notifies all watchers with `TransactionStatus::Retracted(hash)`.
    pub fn retracted(&mut self, hash: BH) {
        let status = TransactionStatus::Retracted(hash);
        self.send(status);
    }

    /// Notifies all watchers with `TransactionStatus::Invalid` and marks the sender as
    /// finalized.
    pub fn invalid(&mut self) {
        self.send_and_finalize(TransactionStatus::Invalid);
    }

    /// Notifies all watchers with `TransactionStatus::Dropped` and marks the sender as
    /// finalized.
    ///
    /// Watchers receive the same status as from [`Sender::dropped`], so they cannot tell the
    /// two apart.
    pub fn limit_enforced(&mut self) {
        self.send_and_finalize(TransactionStatus::Dropped);
    }

    /// Notifies all watchers with `TransactionStatus::Dropped` and marks the sender as
    /// finalized.
    pub fn dropped(&mut self) {
        self.send_and_finalize(TransactionStatus::Dropped);
    }

    /// Notifies all watchers with `TransactionStatus::Broadcast(peers)`.
    pub fn broadcast(&mut self, peers: Vec<String>) {
        let status = TransactionStatus::Broadcast(peers);
        self.send(status);
    }

    /// Returns `true` once the sender has been finalized or no registered receivers remain.
    pub fn is_done(&self) -> bool {
        if self.is_finalized {
            return true;
        }
        self.receivers.is_empty()
    }

    /// Sends `status` to every watcher, then records that the sender is finalized.
    fn send_and_finalize(&mut self, status: TransactionStatus<H, BH>) {
        self.send(status);
        self.is_finalized = true;
    }

    /// Sends a clone of `status` through every registered channel and forgets the channels on
    /// which sending failed.
    fn send(&mut self, status: TransactionStatus<H, BH>) {
        self.receivers.retain(|tx| {
            let delivered = tx.unbounded_send(status.clone());
            delivered.is_ok()
        });
    }
}