sc_consensus/import_queue/
buffered_link.rs1use crate::import_queue::{Link, RuntimeOrigin};
42use futures::prelude::*;
43use sc_utils::mpsc::{tracing_unbounded, TracingUnboundedReceiver, TracingUnboundedSender};
44use sp_runtime::traits::{Block as BlockT, NumberFor};
45use std::{
46 pin::Pin,
47 task::{Context, Poll},
48};
49
50use super::BlockImportResult;
51
52pub fn buffered_link<B: BlockT>(
56 queue_size_warning: usize,
57) -> (BufferedLinkSender<B>, BufferedLinkReceiver<B>) {
58 let (tx, rx) = tracing_unbounded("mpsc_buffered_link", queue_size_warning);
59 let tx = BufferedLinkSender { tx };
60 let rx = BufferedLinkReceiver { rx: rx.fuse() };
61 (tx, rx)
62}
63
64pub struct BufferedLinkSender<B: BlockT> {
66 tx: TracingUnboundedSender<BlockImportWorkerMsg<B>>,
67}
68
69impl<B: BlockT> BufferedLinkSender<B> {
70 pub fn is_closed(&self) -> bool {
74 self.tx.is_closed()
75 }
76}
77
78impl<B: BlockT> Clone for BufferedLinkSender<B> {
79 fn clone(&self) -> Self {
80 BufferedLinkSender { tx: self.tx.clone() }
81 }
82}
83
84pub enum BlockImportWorkerMsg<B: BlockT> {
86 BlocksProcessed(usize, usize, Vec<(BlockImportResult<B>, B::Hash)>),
87 JustificationImported(RuntimeOrigin, B::Hash, NumberFor<B>, bool),
88 RequestJustification(B::Hash, NumberFor<B>),
89}
90
91impl<B: BlockT> Link<B> for BufferedLinkSender<B> {
92 fn blocks_processed(
93 &self,
94 imported: usize,
95 count: usize,
96 results: Vec<(BlockImportResult<B>, B::Hash)>,
97 ) {
98 let _ = self
99 .tx
100 .unbounded_send(BlockImportWorkerMsg::BlocksProcessed(imported, count, results));
101 }
102
103 fn justification_imported(
104 &self,
105 who: RuntimeOrigin,
106 hash: &B::Hash,
107 number: NumberFor<B>,
108 success: bool,
109 ) {
110 let msg = BlockImportWorkerMsg::JustificationImported(who, *hash, number, success);
111 let _ = self.tx.unbounded_send(msg);
112 }
113
114 fn request_justification(&self, hash: &B::Hash, number: NumberFor<B>) {
115 let _ = self
116 .tx
117 .unbounded_send(BlockImportWorkerMsg::RequestJustification(*hash, number));
118 }
119}
120
121pub struct BufferedLinkReceiver<B: BlockT> {
123 rx: stream::Fuse<TracingUnboundedReceiver<BlockImportWorkerMsg<B>>>,
124}
125
126impl<B: BlockT> BufferedLinkReceiver<B> {
127 pub fn send_actions(&mut self, msg: BlockImportWorkerMsg<B>, link: &dyn Link<B>) {
129 match msg {
130 BlockImportWorkerMsg::BlocksProcessed(imported, count, results) =>
131 link.blocks_processed(imported, count, results),
132 BlockImportWorkerMsg::JustificationImported(who, hash, number, success) =>
133 link.justification_imported(who, &hash, number, success),
134 BlockImportWorkerMsg::RequestJustification(hash, number) =>
135 link.request_justification(&hash, number),
136 }
137 }
138
139 pub fn poll_actions(&mut self, cx: &mut Context, link: &dyn Link<B>) -> Result<(), ()> {
148 loop {
149 let msg = match Stream::poll_next(Pin::new(&mut self.rx), cx) {
150 Poll::Ready(Some(msg)) => msg,
151 Poll::Ready(None) => break Err(()),
152 Poll::Pending => break Ok(()),
153 };
154
155 self.send_actions(msg, link);
156 }
157 }
158
159 pub async fn next_action(&mut self, link: &dyn Link<B>) -> Result<(), ()> {
161 if let Some(msg) = self.rx.next().await {
162 self.send_actions(msg, link);
163 return Ok(())
164 }
165 Err(())
166 }
167
168 pub fn close(&mut self) -> bool {
170 self.rx.get_mut().close()
171 }
172}
173
174#[cfg(test)]
175mod tests {
176 use sp_test_primitives::Block;
177
178 #[test]
179 fn is_closed() {
180 let (tx, rx) = super::buffered_link::<Block>(1);
181 assert!(!tx.is_closed());
182 drop(rx);
183 assert!(tx.is_closed());
184 }
185}