sc_consensus/import_queue/
buffered_link.rs1use crate::import_queue::{JustificationImportResult, Link, RuntimeOrigin};
42use futures::prelude::*;
43use sc_utils::mpsc::{tracing_unbounded, TracingUnboundedReceiver, TracingUnboundedSender};
44use sp_runtime::traits::{Block as BlockT, NumberFor};
45use std::{
46 pin::Pin,
47 task::{Context, Poll},
48};
49
50use super::BlockImportResult;
51
52pub fn buffered_link<B: BlockT>(
56 queue_size_warning: usize,
57) -> (BufferedLinkSender<B>, BufferedLinkReceiver<B>) {
58 let (tx, rx) = tracing_unbounded("mpsc_buffered_link", queue_size_warning);
59 let tx = BufferedLinkSender { tx };
60 let rx = BufferedLinkReceiver { rx: rx.fuse() };
61 (tx, rx)
62}
63
64pub struct BufferedLinkSender<B: BlockT> {
66 tx: TracingUnboundedSender<BlockImportWorkerMsg<B>>,
67}
68
69impl<B: BlockT> BufferedLinkSender<B> {
70 pub fn is_closed(&self) -> bool {
74 self.tx.is_closed()
75 }
76}
77
78impl<B: BlockT> Clone for BufferedLinkSender<B> {
79 fn clone(&self) -> Self {
80 BufferedLinkSender { tx: self.tx.clone() }
81 }
82}
83
84pub enum BlockImportWorkerMsg<B: BlockT> {
86 BlocksProcessed(usize, usize, Vec<(BlockImportResult<B>, B::Hash)>),
87 JustificationImported(RuntimeOrigin, B::Hash, NumberFor<B>, JustificationImportResult),
88 RequestJustification(B::Hash, NumberFor<B>),
89}
90
91impl<B: BlockT> Link<B> for BufferedLinkSender<B> {
92 fn blocks_processed(
93 &self,
94 imported: usize,
95 count: usize,
96 results: Vec<(BlockImportResult<B>, B::Hash)>,
97 ) {
98 let _ = self
99 .tx
100 .unbounded_send(BlockImportWorkerMsg::BlocksProcessed(imported, count, results));
101 }
102
103 fn justification_imported(
104 &self,
105 who: RuntimeOrigin,
106 hash: &B::Hash,
107 number: NumberFor<B>,
108 import_result: JustificationImportResult,
109 ) {
110 let msg = BlockImportWorkerMsg::JustificationImported(who, *hash, number, import_result);
111 let _ = self.tx.unbounded_send(msg);
112 }
113
114 fn request_justification(&self, hash: &B::Hash, number: NumberFor<B>) {
115 let _ = self
116 .tx
117 .unbounded_send(BlockImportWorkerMsg::RequestJustification(*hash, number));
118 }
119}
120
121pub struct BufferedLinkReceiver<B: BlockT> {
123 rx: stream::Fuse<TracingUnboundedReceiver<BlockImportWorkerMsg<B>>>,
124}
125
126impl<B: BlockT> BufferedLinkReceiver<B> {
127 pub fn send_actions(&mut self, msg: BlockImportWorkerMsg<B>, link: &dyn Link<B>) {
129 match msg {
130 BlockImportWorkerMsg::BlocksProcessed(imported, count, results) => {
131 link.blocks_processed(imported, count, results)
132 },
133 BlockImportWorkerMsg::JustificationImported(who, hash, number, import_result) => {
134 link.justification_imported(who, &hash, number, import_result)
135 },
136 BlockImportWorkerMsg::RequestJustification(hash, number) => {
137 link.request_justification(&hash, number)
138 },
139 }
140 }
141
142 pub fn poll_actions(&mut self, cx: &mut Context, link: &dyn Link<B>) -> Result<(), ()> {
151 loop {
152 let msg = match Stream::poll_next(Pin::new(&mut self.rx), cx) {
153 Poll::Ready(Some(msg)) => msg,
154 Poll::Ready(None) => break Err(()),
155 Poll::Pending => break Ok(()),
156 };
157
158 self.send_actions(msg, link);
159 }
160 }
161
162 pub async fn next_action(&mut self, link: &dyn Link<B>) -> Result<(), ()> {
164 if let Some(msg) = self.rx.next().await {
165 self.send_actions(msg, link);
166 return Ok(());
167 }
168 Err(())
169 }
170
171 pub fn close(&mut self) -> bool {
173 self.rx.get_mut().close()
174 }
175}
176
177#[cfg(test)]
178mod tests {
179 use sp_test_primitives::Block;
180
181 #[test]
182 fn is_closed() {
183 let (tx, rx) = super::buffered_link::<Block>(1);
184 assert!(!tx.is_closed());
185 drop(rx);
186 assert!(tx.is_closed());
187 }
188}