sc_consensus/import_queue/
buffered_link.rs

1// This file is part of Substrate.
2
3// Copyright (C) Parity Technologies (UK) Ltd.
4// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0
5
6// This program is free software: you can redistribute it and/or modify
7// it under the terms of the GNU General Public License as published by
8// the Free Software Foundation, either version 3 of the License, or
9// (at your option) any later version.
10
11// This program is distributed in the hope that it will be useful,
12// but WITHOUT ANY WARRANTY; without even the implied warranty of
13// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14// GNU General Public License for more details.
15
16// You should have received a copy of the GNU General Public License
17// along with this program. If not, see <https://www.gnu.org/licenses/>.
18
19//! Provides the `buffered_link` utility.
20//!
21//! The buffered link is a channel that allows buffering the method calls on `Link`.
22//!
23//! # Example
24//!
25//! ```
26//! use sc_consensus::import_queue::Link;
27//! # use sc_consensus::import_queue::buffered_link::buffered_link;
28//! # use sp_test_primitives::Block;
29//! # struct DummyLink; impl Link<Block> for DummyLink {}
30//! # let my_link = DummyLink;
31//! let (mut tx, mut rx) = buffered_link::<Block>(100_000);
32//! tx.blocks_processed(0, 0, vec![]);
33//!
34//! // Calls `my_link.blocks_processed(0, 0, vec![])` when polled.
35//! let _fut = futures::future::poll_fn(move |cx| {
36//! 	rx.poll_actions(cx, &my_link).unwrap();
37//! 	std::task::Poll::Pending::<()>
38//! });
39//! ```
40
41use crate::import_queue::{Link, RuntimeOrigin};
42use futures::prelude::*;
43use sc_utils::mpsc::{tracing_unbounded, TracingUnboundedReceiver, TracingUnboundedSender};
44use sp_runtime::traits::{Block as BlockT, NumberFor};
45use std::{
46	pin::Pin,
47	task::{Context, Poll},
48};
49
50use super::BlockImportResult;
51
52/// Wraps around an unbounded channel from the `futures` crate. The sender implements `Link` and
53/// can be used to buffer commands, and the receiver can be used to poll said commands and transfer
54/// them to another link. `queue_size_warning` sets the warning threshold of the channel queue size.
55pub fn buffered_link<B: BlockT>(
56	queue_size_warning: usize,
57) -> (BufferedLinkSender<B>, BufferedLinkReceiver<B>) {
58	let (tx, rx) = tracing_unbounded("mpsc_buffered_link", queue_size_warning);
59	let tx = BufferedLinkSender { tx };
60	let rx = BufferedLinkReceiver { rx: rx.fuse() };
61	(tx, rx)
62}
63
64/// See [`buffered_link`].
65pub struct BufferedLinkSender<B: BlockT> {
66	tx: TracingUnboundedSender<BlockImportWorkerMsg<B>>,
67}
68
69impl<B: BlockT> BufferedLinkSender<B> {
70	/// Returns true if the sender points to nowhere.
71	///
72	/// Once `true` is returned, it is pointless to use the sender anymore.
73	pub fn is_closed(&self) -> bool {
74		self.tx.is_closed()
75	}
76}
77
78impl<B: BlockT> Clone for BufferedLinkSender<B> {
79	fn clone(&self) -> Self {
80		BufferedLinkSender { tx: self.tx.clone() }
81	}
82}
83
84/// Internal buffered message.
85pub enum BlockImportWorkerMsg<B: BlockT> {
86	BlocksProcessed(usize, usize, Vec<(BlockImportResult<B>, B::Hash)>),
87	JustificationImported(RuntimeOrigin, B::Hash, NumberFor<B>, bool),
88	RequestJustification(B::Hash, NumberFor<B>),
89}
90
91impl<B: BlockT> Link<B> for BufferedLinkSender<B> {
92	fn blocks_processed(
93		&self,
94		imported: usize,
95		count: usize,
96		results: Vec<(BlockImportResult<B>, B::Hash)>,
97	) {
98		let _ = self
99			.tx
100			.unbounded_send(BlockImportWorkerMsg::BlocksProcessed(imported, count, results));
101	}
102
103	fn justification_imported(
104		&self,
105		who: RuntimeOrigin,
106		hash: &B::Hash,
107		number: NumberFor<B>,
108		success: bool,
109	) {
110		let msg = BlockImportWorkerMsg::JustificationImported(who, *hash, number, success);
111		let _ = self.tx.unbounded_send(msg);
112	}
113
114	fn request_justification(&self, hash: &B::Hash, number: NumberFor<B>) {
115		let _ = self
116			.tx
117			.unbounded_send(BlockImportWorkerMsg::RequestJustification(*hash, number));
118	}
119}
120
121/// See [`buffered_link`].
122pub struct BufferedLinkReceiver<B: BlockT> {
123	rx: stream::Fuse<TracingUnboundedReceiver<BlockImportWorkerMsg<B>>>,
124}
125
126impl<B: BlockT> BufferedLinkReceiver<B> {
127	/// Send action for the synchronization to perform.
128	pub fn send_actions(&mut self, msg: BlockImportWorkerMsg<B>, link: &dyn Link<B>) {
129		match msg {
130			BlockImportWorkerMsg::BlocksProcessed(imported, count, results) =>
131				link.blocks_processed(imported, count, results),
132			BlockImportWorkerMsg::JustificationImported(who, hash, number, success) =>
133				link.justification_imported(who, &hash, number, success),
134			BlockImportWorkerMsg::RequestJustification(hash, number) =>
135				link.request_justification(&hash, number),
136		}
137	}
138
139	/// Polls for the buffered link actions. Any enqueued action will be propagated to the link
140	/// passed as parameter.
141	///
142	/// This method should behave in a way similar to `Future::poll`. It can register the current
143	/// task and notify later when more actions are ready to be polled. To continue the comparison,
144	/// it is as if this method always returned `Poll::Pending`.
145	///
146	/// Returns an error if the corresponding [`BufferedLinkSender`] has been closed.
147	pub fn poll_actions(&mut self, cx: &mut Context, link: &dyn Link<B>) -> Result<(), ()> {
148		loop {
149			let msg = match Stream::poll_next(Pin::new(&mut self.rx), cx) {
150				Poll::Ready(Some(msg)) => msg,
151				Poll::Ready(None) => break Err(()),
152				Poll::Pending => break Ok(()),
153			};
154
155			self.send_actions(msg, link);
156		}
157	}
158
159	/// Poll next element from import queue and send the corresponding action command over the link.
160	pub async fn next_action(&mut self, link: &dyn Link<B>) -> Result<(), ()> {
161		if let Some(msg) = self.rx.next().await {
162			self.send_actions(msg, link);
163			return Ok(())
164		}
165		Err(())
166	}
167
168	/// Close the channel.
169	pub fn close(&mut self) -> bool {
170		self.rx.get_mut().close()
171	}
172}
173
174#[cfg(test)]
175mod tests {
176	use sp_test_primitives::Block;
177
178	#[test]
179	fn is_closed() {
180		let (tx, rx) = super::buffered_link::<Block>(1);
181		assert!(!tx.is_closed());
182		drop(rx);
183		assert!(tx.is_closed());
184	}
185}