Skip to main content

sc_consensus/import_queue/
buffered_link.rs

1// This file is part of Substrate.
2
3// Copyright (C) Parity Technologies (UK) Ltd.
4// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0
5
6// This program is free software: you can redistribute it and/or modify
7// it under the terms of the GNU General Public License as published by
8// the Free Software Foundation, either version 3 of the License, or
9// (at your option) any later version.
10
11// This program is distributed in the hope that it will be useful,
12// but WITHOUT ANY WARRANTY; without even the implied warranty of
13// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14// GNU General Public License for more details.
15
16// You should have received a copy of the GNU General Public License
17// along with this program. If not, see <https://www.gnu.org/licenses/>.
18
19//! Provides the `buffered_link` utility.
20//!
21//! The buffered link is a channel that allows buffering the method calls on `Link`.
22//!
23//! # Example
24//!
25//! ```
26//! use sc_consensus::import_queue::Link;
27//! # use sc_consensus::import_queue::buffered_link::buffered_link;
28//! # use sp_test_primitives::Block;
29//! # struct DummyLink; impl Link<Block> for DummyLink {}
30//! # let my_link = DummyLink;
31//! let (mut tx, mut rx) = buffered_link::<Block>(100_000);
32//! tx.blocks_processed(0, 0, vec![]);
33//!
34//! // Calls `my_link.blocks_processed(0, 0, vec![])` when polled.
35//! let _fut = futures::future::poll_fn(move |cx| {
36//! 	rx.poll_actions(cx, &my_link).unwrap();
37//! 	std::task::Poll::Pending::<()>
38//! });
39//! ```
40
41use crate::import_queue::{JustificationImportResult, Link, RuntimeOrigin};
42use futures::prelude::*;
43use sc_utils::mpsc::{tracing_unbounded, TracingUnboundedReceiver, TracingUnboundedSender};
44use sp_runtime::traits::{Block as BlockT, NumberFor};
45use std::{
46	pin::Pin,
47	task::{Context, Poll},
48};
49
50use super::BlockImportResult;
51
52/// Wraps around an unbounded channel from the `futures` crate. The sender implements `Link` and
53/// can be used to buffer commands, and the receiver can be used to poll said commands and transfer
54/// them to another link. `queue_size_warning` sets the warning threshold of the channel queue size.
55pub fn buffered_link<B: BlockT>(
56	queue_size_warning: usize,
57) -> (BufferedLinkSender<B>, BufferedLinkReceiver<B>) {
58	let (tx, rx) = tracing_unbounded("mpsc_buffered_link", queue_size_warning);
59	let tx = BufferedLinkSender { tx };
60	let rx = BufferedLinkReceiver { rx: rx.fuse() };
61	(tx, rx)
62}
63
64/// See [`buffered_link`].
65pub struct BufferedLinkSender<B: BlockT> {
66	tx: TracingUnboundedSender<BlockImportWorkerMsg<B>>,
67}
68
69impl<B: BlockT> BufferedLinkSender<B> {
70	/// Returns true if the sender points to nowhere.
71	///
72	/// Once `true` is returned, it is pointless to use the sender anymore.
73	pub fn is_closed(&self) -> bool {
74		self.tx.is_closed()
75	}
76}
77
78impl<B: BlockT> Clone for BufferedLinkSender<B> {
79	fn clone(&self) -> Self {
80		BufferedLinkSender { tx: self.tx.clone() }
81	}
82}
83
84/// Internal buffered message.
85pub enum BlockImportWorkerMsg<B: BlockT> {
86	BlocksProcessed(usize, usize, Vec<(BlockImportResult<B>, B::Hash)>),
87	JustificationImported(RuntimeOrigin, B::Hash, NumberFor<B>, JustificationImportResult),
88	RequestJustification(B::Hash, NumberFor<B>),
89}
90
91impl<B: BlockT> Link<B> for BufferedLinkSender<B> {
92	fn blocks_processed(
93		&self,
94		imported: usize,
95		count: usize,
96		results: Vec<(BlockImportResult<B>, B::Hash)>,
97	) {
98		let _ = self
99			.tx
100			.unbounded_send(BlockImportWorkerMsg::BlocksProcessed(imported, count, results));
101	}
102
103	fn justification_imported(
104		&self,
105		who: RuntimeOrigin,
106		hash: &B::Hash,
107		number: NumberFor<B>,
108		import_result: JustificationImportResult,
109	) {
110		let msg = BlockImportWorkerMsg::JustificationImported(who, *hash, number, import_result);
111		let _ = self.tx.unbounded_send(msg);
112	}
113
114	fn request_justification(&self, hash: &B::Hash, number: NumberFor<B>) {
115		let _ = self
116			.tx
117			.unbounded_send(BlockImportWorkerMsg::RequestJustification(*hash, number));
118	}
119}
120
121/// See [`buffered_link`].
122pub struct BufferedLinkReceiver<B: BlockT> {
123	rx: stream::Fuse<TracingUnboundedReceiver<BlockImportWorkerMsg<B>>>,
124}
125
126impl<B: BlockT> BufferedLinkReceiver<B> {
127	/// Send action for the synchronization to perform.
128	pub fn send_actions(&mut self, msg: BlockImportWorkerMsg<B>, link: &dyn Link<B>) {
129		match msg {
130			BlockImportWorkerMsg::BlocksProcessed(imported, count, results) => {
131				link.blocks_processed(imported, count, results)
132			},
133			BlockImportWorkerMsg::JustificationImported(who, hash, number, import_result) => {
134				link.justification_imported(who, &hash, number, import_result)
135			},
136			BlockImportWorkerMsg::RequestJustification(hash, number) => {
137				link.request_justification(&hash, number)
138			},
139		}
140	}
141
142	/// Polls for the buffered link actions. Any enqueued action will be propagated to the link
143	/// passed as parameter.
144	///
145	/// This method should behave in a way similar to `Future::poll`. It can register the current
146	/// task and notify later when more actions are ready to be polled. To continue the comparison,
147	/// it is as if this method always returned `Poll::Pending`.
148	///
149	/// Returns an error if the corresponding [`BufferedLinkSender`] has been closed.
150	pub fn poll_actions(&mut self, cx: &mut Context, link: &dyn Link<B>) -> Result<(), ()> {
151		loop {
152			let msg = match Stream::poll_next(Pin::new(&mut self.rx), cx) {
153				Poll::Ready(Some(msg)) => msg,
154				Poll::Ready(None) => break Err(()),
155				Poll::Pending => break Ok(()),
156			};
157
158			self.send_actions(msg, link);
159		}
160	}
161
162	/// Poll next element from import queue and send the corresponding action command over the link.
163	pub async fn next_action(&mut self, link: &dyn Link<B>) -> Result<(), ()> {
164		if let Some(msg) = self.rx.next().await {
165			self.send_actions(msg, link);
166			return Ok(());
167		}
168		Err(())
169	}
170
171	/// Close the channel.
172	pub fn close(&mut self) -> bool {
173		self.rx.get_mut().close()
174	}
175}
176
177#[cfg(test)]
178mod tests {
179	use sp_test_primitives::Block;
180
181	#[test]
182	fn is_closed() {
183		let (tx, rx) = super::buffered_link::<Block>(1);
184		assert!(!tx.is_closed());
185		drop(rx);
186		assert!(tx.is_closed());
187	}
188}