use crate::import_queue::{JustificationImportResult, Link, RuntimeOrigin};
use futures::prelude::*;
use sc_utils::mpsc::{tracing_unbounded, TracingUnboundedReceiver, TracingUnboundedSender};
use sp_runtime::traits::{Block as BlockT, NumberFor};
use std::{
pin::Pin,
task::{Context, Poll},
};
use super::BlockImportResult;
/// Creates a connected [`BufferedLinkSender`] / [`BufferedLinkReceiver`] pair.
///
/// Both halves share a single channel created with `tracing_unbounded` under the
/// name `"mpsc_buffered_link"`. `queue_size_warning` is passed through unchanged
/// as that channel's second argument (presumably the queue length above which
/// the channel emits a warning — confirm against `sc_utils::mpsc`).
///
/// The receiving half is fused before being wrapped.
pub fn buffered_link<B: BlockT>(
    queue_size_warning: usize,
) -> (BufferedLinkSender<B>, BufferedLinkReceiver<B>) {
    let (sender, receiver) = tracing_unbounded("mpsc_buffered_link", queue_size_warning);
    (BufferedLinkSender { tx: sender }, BufferedLinkReceiver { rx: receiver.fuse() })
}
/// Sending half of the pair returned by [`buffered_link`].
///
/// Implements [`Link`]: each `Link` method call is converted into a
/// [`BlockImportWorkerMsg`] and pushed onto the channel instead of being
/// acted upon directly.
pub struct BufferedLinkSender<B: BlockT> {
    /// Channel endpoint the buffered messages are pushed into.
    tx: TracingUnboundedSender<BlockImportWorkerMsg<B>>,
}
impl<B: BlockT> BufferedLinkSender<B> {
    /// Returns whether the underlying channel sender reports itself as closed.
    ///
    /// As exercised by this file's unit test, this is `false` for a freshly
    /// created pair and `true` once the matching receiver has been dropped.
    pub fn is_closed(&self) -> bool {
        self.tx.is_closed()
    }
}
// Implemented by hand rather than derived: only the inner sender is cloned,
// so no `B: Clone` bound is required on the block type.
impl<B: BlockT> Clone for BufferedLinkSender<B> {
    /// Returns a new sender wrapping a clone of the inner channel sender.
    fn clone(&self) -> Self {
        let tx = self.tx.clone();
        Self { tx }
    }
}
/// A buffered call to one of the [`Link`] methods.
///
/// Each variant stores the arguments of the `Link` method it stands for, so the
/// call can be replayed later on another link (see
/// [`BufferedLinkReceiver::send_actions`]).
pub enum BlockImportWorkerMsg<B: BlockT> {
    /// Buffered `Link::blocks_processed(imported, count, results)` call.
    BlocksProcessed(usize, usize, Vec<(BlockImportResult<B>, B::Hash)>),
    /// Buffered `Link::justification_imported(who, &hash, number, import_result)` call.
    JustificationImported(RuntimeOrigin, B::Hash, NumberFor<B>, JustificationImportResult),
    /// Buffered `Link::request_justification(&hash, number)` call.
    RequestJustification(B::Hash, NumberFor<B>),
}
// Every method packages its arguments into the matching `BlockImportWorkerMsg`
// variant and pushes it onto the channel. The send result is deliberately
// discarded: a failed send is treated as a no-op.
impl<B: BlockT> Link<B> for BufferedLinkSender<B> {
    /// Buffers a `blocks_processed` notification.
    fn blocks_processed(
        &self,
        imported: usize,
        count: usize,
        results: Vec<(BlockImportResult<B>, B::Hash)>,
    ) {
        let msg = BlockImportWorkerMsg::BlocksProcessed(imported, count, results);
        let _ = self.tx.unbounded_send(msg);
    }

    /// Buffers a `justification_imported` notification; the hash is copied out
    /// of the reference so the message owns it.
    fn justification_imported(
        &self,
        who: RuntimeOrigin,
        hash: &B::Hash,
        number: NumberFor<B>,
        import_result: JustificationImportResult,
    ) {
        let _ = self.tx.unbounded_send(BlockImportWorkerMsg::JustificationImported(
            who,
            *hash,
            number,
            import_result,
        ));
    }

    /// Buffers a `request_justification` call; the hash is copied out of the
    /// reference so the message owns it.
    fn request_justification(&self, hash: &B::Hash, number: NumberFor<B>) {
        let msg = BlockImportWorkerMsg::RequestJustification(*hash, number);
        let _ = self.tx.unbounded_send(msg);
    }
}
/// Receiving half of the pair returned by [`buffered_link`].
///
/// Messages buffered by the matching [`BufferedLinkSender`] are pulled out with
/// `poll_actions` or `next_action` and replayed on a caller-supplied [`Link`].
pub struct BufferedLinkReceiver<B: BlockT> {
    /// Fused channel endpoint the buffered messages are read from.
    rx: stream::Fuse<TracingUnboundedReceiver<BlockImportWorkerMsg<B>>>,
}
impl<B: BlockT> BufferedLinkReceiver<B> {
    /// Replays a single buffered message on `link` by calling the `Link`
    /// method that corresponds to the message's variant.
    pub fn send_actions(&mut self, msg: BlockImportWorkerMsg<B>, link: &dyn Link<B>) {
        match msg {
            BlockImportWorkerMsg::BlocksProcessed(imported, total, outcomes) =>
                link.blocks_processed(imported, total, outcomes),
            BlockImportWorkerMsg::JustificationImported(origin, block_hash, block_number, outcome) =>
                link.justification_imported(origin, &block_hash, block_number, outcome),
            BlockImportWorkerMsg::RequestJustification(block_hash, block_number) =>
                link.request_justification(&block_hash, block_number),
        }
    }

    /// Drains every message that is immediately available and replays each one
    /// on `link`.
    ///
    /// Returns `Ok(())` once the channel reports `Pending` (nothing more to
    /// read right now), or `Err(())` once the channel has terminated, i.e. the
    /// stream yielded `None`.
    pub fn poll_actions(&mut self, cx: &mut Context, link: &dyn Link<B>) -> Result<(), ()> {
        while let Poll::Ready(next) = Stream::poll_next(Pin::new(&mut self.rx), cx) {
            match next {
                Some(msg) => self.send_actions(msg, link),
                // The stream is finished; no further message will ever arrive.
                None => return Err(()),
            }
        }
        // Reached only on `Poll::Pending`.
        Ok(())
    }

    /// Waits for the next buffered message and replays it on `link`.
    ///
    /// Returns `Ok(())` after one message has been forwarded, or `Err(())` if
    /// the channel terminated without yielding a message.
    pub async fn next_action(&mut self, link: &dyn Link<B>) -> Result<(), ()> {
        match self.rx.next().await {
            Some(msg) => {
                self.send_actions(msg, link);
                Ok(())
            },
            None => Err(()),
        }
    }

    /// Closes the underlying channel receiver and returns the value reported
    /// by its `close` method.
    pub fn close(&mut self) -> bool {
        // Reach through the `Fuse` wrapper to the actual channel receiver.
        let inner = self.rx.get_mut();
        inner.close()
    }
}
#[cfg(test)]
mod tests {
    use super::buffered_link;
    use sp_test_primitives::Block;

    /// The sender must report the channel as closed once the receiver is gone.
    #[test]
    fn is_closed() {
        let (sender, receiver) = buffered_link::<Block>(1);
        assert!(!sender.is_closed());

        drop(receiver);
        assert!(sender.is_closed());
    }
}