Skip to main content

soil_client/import/queue/
buffered_link.rs

1// This file is part of Soil.
2
3// Copyright (C) Soil contributors.
4// Copyright (C) Parity Technologies (UK) Ltd.
5// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0
6
7//! Provides the `buffered_link` utility.
8//!
9//! The buffered link is a channel that allows buffering the method calls on `Link`.
10//!
11//! # Example
12//!
13//! ```
14//! use soil_client::import::Link;
15//! # use soil_client::import::queue::buffered_link::buffered_link;
16//! # use soil_test::primitives::Block;
17//! # struct DummyLink; impl Link<Block> for DummyLink {}
18//! # let my_link = DummyLink;
19//! let (mut tx, mut rx) = buffered_link::<Block>(100_000);
20//! tx.blocks_processed(0, 0, vec![]);
21//!
22//! // Calls `my_link.blocks_processed(0, 0, vec![])` when polled.
23//! let _fut = futures::future::poll_fn(move |cx| {
24//! 	rx.poll_actions(cx, &my_link).unwrap();
25//! 	std::task::Poll::Pending::<()>
26//! });
27//! ```
28
29use super::{JustificationImportResult, Link, RuntimeOrigin};
30use crate::utils::mpsc::{tracing_unbounded, TracingUnboundedReceiver, TracingUnboundedSender};
31use futures::prelude::*;
32use std::{
33	pin::Pin,
34	task::{Context, Poll},
35};
36use subsoil::runtime::traits::{Block as BlockT, NumberFor};
37
38use super::BlockImportResult;
39
40/// Wraps around an unbounded channel from the `futures` crate. The sender implements `Link` and
41/// can be used to buffer commands, and the receiver can be used to poll said commands and transfer
42/// them to another link. `queue_size_warning` sets the warning threshold of the channel queue size.
43pub fn buffered_link<B: BlockT>(
44	queue_size_warning: usize,
45) -> (BufferedLinkSender<B>, BufferedLinkReceiver<B>) {
46	let (tx, rx) = tracing_unbounded("mpsc_buffered_link", queue_size_warning);
47	let tx = BufferedLinkSender { tx };
48	let rx = BufferedLinkReceiver { rx: rx.fuse() };
49	(tx, rx)
50}
51
52/// See [`buffered_link`].
53pub struct BufferedLinkSender<B: BlockT> {
54	tx: TracingUnboundedSender<BlockImportWorkerMsg<B>>,
55}
56
57impl<B: BlockT> BufferedLinkSender<B> {
58	/// Returns true if the sender points to nowhere.
59	///
60	/// Once `true` is returned, it is pointless to use the sender anymore.
61	pub fn is_closed(&self) -> bool {
62		self.tx.is_closed()
63	}
64}
65
66impl<B: BlockT> Clone for BufferedLinkSender<B> {
67	fn clone(&self) -> Self {
68		BufferedLinkSender { tx: self.tx.clone() }
69	}
70}
71
72/// Internal buffered message.
73pub enum BlockImportWorkerMsg<B: BlockT> {
74	BlocksProcessed(usize, usize, Vec<(BlockImportResult<B>, B::Hash)>),
75	JustificationImported(RuntimeOrigin, B::Hash, NumberFor<B>, JustificationImportResult),
76	RequestJustification(B::Hash, NumberFor<B>),
77}
78
79impl<B: BlockT> Link<B> for BufferedLinkSender<B> {
80	fn blocks_processed(
81		&self,
82		imported: usize,
83		count: usize,
84		results: Vec<(BlockImportResult<B>, B::Hash)>,
85	) {
86		let _ = self
87			.tx
88			.unbounded_send(BlockImportWorkerMsg::BlocksProcessed(imported, count, results));
89	}
90
91	fn justification_imported(
92		&self,
93		who: RuntimeOrigin,
94		hash: &B::Hash,
95		number: NumberFor<B>,
96		import_result: JustificationImportResult,
97	) {
98		let msg = BlockImportWorkerMsg::JustificationImported(who, *hash, number, import_result);
99		let _ = self.tx.unbounded_send(msg);
100	}
101
102	fn request_justification(&self, hash: &B::Hash, number: NumberFor<B>) {
103		let _ = self
104			.tx
105			.unbounded_send(BlockImportWorkerMsg::RequestJustification(*hash, number));
106	}
107}
108
109/// See [`buffered_link`].
110pub struct BufferedLinkReceiver<B: BlockT> {
111	rx: stream::Fuse<TracingUnboundedReceiver<BlockImportWorkerMsg<B>>>,
112}
113
114impl<B: BlockT> BufferedLinkReceiver<B> {
115	/// Send action for the synchronization to perform.
116	pub fn send_actions(&mut self, msg: BlockImportWorkerMsg<B>, link: &dyn Link<B>) {
117		match msg {
118			BlockImportWorkerMsg::BlocksProcessed(imported, count, results) => {
119				link.blocks_processed(imported, count, results)
120			},
121			BlockImportWorkerMsg::JustificationImported(who, hash, number, import_result) => {
122				link.justification_imported(who, &hash, number, import_result)
123			},
124			BlockImportWorkerMsg::RequestJustification(hash, number) => {
125				link.request_justification(&hash, number)
126			},
127		}
128	}
129
130	/// Polls for the buffered link actions. Any enqueued action will be propagated to the link
131	/// passed as parameter.
132	///
133	/// This method should behave in a way similar to `Future::poll`. It can register the current
134	/// task and notify later when more actions are ready to be polled. To continue the comparison,
135	/// it is as if this method always returned `Poll::Pending`.
136	///
137	/// Returns an error if the corresponding [`BufferedLinkSender`] has been closed.
138	pub fn poll_actions(&mut self, cx: &mut Context, link: &dyn Link<B>) -> Result<(), ()> {
139		loop {
140			let msg = match Stream::poll_next(Pin::new(&mut self.rx), cx) {
141				Poll::Ready(Some(msg)) => msg,
142				Poll::Ready(None) => break Err(()),
143				Poll::Pending => break Ok(()),
144			};
145
146			self.send_actions(msg, link);
147		}
148	}
149
150	/// Poll next element from import queue and send the corresponding action command over the link.
151	pub async fn next_action(&mut self, link: &dyn Link<B>) -> Result<(), ()> {
152		if let Some(msg) = self.rx.next().await {
153			self.send_actions(msg, link);
154			return Ok(());
155		}
156		Err(())
157	}
158
159	/// Close the channel.
160	pub fn close(&mut self) -> bool {
161		self.rx.get_mut().close()
162	}
163}
164
#[cfg(test)]
mod tests {
	use super::buffered_link;
	use soil_test::primitives::Block;

	/// The sender reports the channel as open while the receiver is alive and as closed once
	/// the receiver has been dropped.
	#[test]
	fn is_closed() {
		let (sender, receiver) = buffered_link::<Block>(1);
		assert!(!sender.is_closed());

		drop(receiver);
		assert!(sender.is_closed());
	}
}