Skip to main content

soil_pow/
worker.rs

1// This file is part of Soil.
2
3// Copyright (C) Soil contributors.
4// Copyright (C) Parity Technologies (UK) Ltd.
5// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0
6
7use futures::{
8	prelude::*,
9	task::{Context, Poll},
10};
11use futures_timer::Delay;
12use log::*;
13use parking_lot::Mutex;
14use soil_client::client_api::ImportNotifications;
15use soil_client::consensus::{BlockOrigin, Proposal};
16use soil_client::import::{BlockImportParams, BoxBlockImport, StateAction, StorageChanges};
17use std::{
18	pin::Pin,
19	sync::{
20		atomic::{AtomicUsize, Ordering},
21		Arc,
22	},
23	time::Duration,
24};
25use subsoil::runtime::{
26	generic::BlockId,
27	traits::{Block as BlockT, Header as HeaderT},
28	DigestItem,
29};
30
31use crate::{PowAlgorithm, PowIntermediate, Seal, INTERMEDIATE_KEY, LOG_TARGET, POW_ENGINE_ID};
32
33/// Mining metadata. This is the information needed to start an actual mining loop.
34#[derive(Clone, Eq, PartialEq)]
35pub struct MiningMetadata<H, D> {
36	/// Currently known best hash which the pre-hash is built on.
37	pub best_hash: H,
38	/// Mining pre-hash.
39	pub pre_hash: H,
40	/// Pre-runtime digest item.
41	pub pre_runtime: Option<Vec<u8>>,
42	/// Mining target difficulty.
43	pub difficulty: D,
44}
45
46/// A build of mining, containing the metadata and the block proposal.
47pub struct MiningBuild<Block: BlockT, Algorithm: PowAlgorithm<Block>> {
48	/// Mining metadata.
49	pub metadata: MiningMetadata<Block::Hash, Algorithm::Difficulty>,
50	/// Mining proposal.
51	pub proposal: Proposal<Block>,
52}
53
54/// Version of the mining worker.
55#[derive(Eq, PartialEq, Clone, Copy)]
56pub struct Version(usize);
57
58/// Mining worker that exposes structs to query the current mining build and submit mined blocks.
59pub struct MiningHandle<
60	Block: BlockT,
61	Algorithm: PowAlgorithm<Block>,
62	L: soil_client::import::JustificationSyncLink<Block>,
63> {
64	version: Arc<AtomicUsize>,
65	algorithm: Arc<Algorithm>,
66	justification_sync_link: Arc<L>,
67	build: Arc<Mutex<Option<MiningBuild<Block, Algorithm>>>>,
68	block_import: Arc<Mutex<BoxBlockImport<Block>>>,
69}
70
71impl<Block, Algorithm, L> MiningHandle<Block, Algorithm, L>
72where
73	Block: BlockT,
74	Algorithm: PowAlgorithm<Block>,
75	Algorithm::Difficulty: 'static + Send,
76	L: soil_client::import::JustificationSyncLink<Block>,
77{
78	fn increment_version(&self) {
79		self.version.fetch_add(1, Ordering::SeqCst);
80	}
81
82	pub(crate) fn new(
83		algorithm: Algorithm,
84		block_import: BoxBlockImport<Block>,
85		justification_sync_link: L,
86	) -> Self {
87		Self {
88			version: Arc::new(AtomicUsize::new(0)),
89			algorithm: Arc::new(algorithm),
90			justification_sync_link: Arc::new(justification_sync_link),
91			build: Arc::new(Mutex::new(None)),
92			block_import: Arc::new(Mutex::new(block_import)),
93		}
94	}
95
96	pub(crate) fn on_major_syncing(&self) {
97		let mut build = self.build.lock();
98		*build = None;
99		self.increment_version();
100	}
101
102	pub(crate) fn on_build(&self, value: MiningBuild<Block, Algorithm>) {
103		let mut build = self.build.lock();
104		*build = Some(value);
105		self.increment_version();
106	}
107
108	/// Get the version of the mining worker.
109	///
110	/// This returns type `Version` which can only compare equality. If `Version` is unchanged, then
111	/// it can be certain that `best_hash` and `metadata` were not changed.
112	pub fn version(&self) -> Version {
113		Version(self.version.load(Ordering::SeqCst))
114	}
115
116	/// Get the current best hash. `None` if the worker has just started or the client is doing
117	/// major syncing.
118	pub fn best_hash(&self) -> Option<Block::Hash> {
119		self.build.lock().as_ref().map(|b| b.metadata.best_hash)
120	}
121
122	/// Get a copy of the current mining metadata, if available.
123	pub fn metadata(&self) -> Option<MiningMetadata<Block::Hash, Algorithm::Difficulty>> {
124		self.build.lock().as_ref().map(|b| b.metadata.clone())
125	}
126
127	/// Submit a mined seal. The seal will be validated again. Returns true if the submission is
128	/// successful.
129	pub async fn submit(&self, seal: Seal) -> bool {
130		if let Some(metadata) = self.metadata() {
131			match self.algorithm.verify(
132				&BlockId::Hash(metadata.best_hash),
133				&metadata.pre_hash,
134				metadata.pre_runtime.as_ref().map(|v| &v[..]),
135				&seal,
136				metadata.difficulty,
137			) {
138				Ok(true) => (),
139				Ok(false) => {
140					warn!(target: LOG_TARGET, "Unable to import mined block: seal is invalid",);
141					return false;
142				},
143				Err(err) => {
144					warn!(target: LOG_TARGET, "Unable to import mined block: {}", err,);
145					return false;
146				},
147			}
148		} else {
149			warn!(target: LOG_TARGET, "Unable to import mined block: metadata does not exist",);
150			return false;
151		}
152
153		let build = if let Some(build) = {
154			let mut build = self.build.lock();
155			let value = build.take();
156			if value.is_some() {
157				self.increment_version();
158			}
159			value
160		} {
161			build
162		} else {
163			warn!(target: LOG_TARGET, "Unable to import mined block: build does not exist",);
164			return false;
165		};
166
167		let seal = DigestItem::Seal(POW_ENGINE_ID, seal);
168		let (header, body) = build.proposal.block.deconstruct();
169
170		let mut import_block = BlockImportParams::new(BlockOrigin::Own, header);
171		import_block.post_digests.push(seal);
172		import_block.body = Some(body);
173		import_block.state_action =
174			StateAction::ApplyChanges(StorageChanges::Changes(build.proposal.storage_changes));
175
176		let intermediate = PowIntermediate::<Algorithm::Difficulty> {
177			difficulty: Some(build.metadata.difficulty),
178		};
179		import_block.insert_intermediate(INTERMEDIATE_KEY, intermediate);
180
181		let header = import_block.post_header();
182		let block_import = self.block_import.lock();
183
184		match block_import.import_block(import_block).await {
185			Ok(res) => {
186				res.handle_justification(
187					&header.hash(),
188					*header.number(),
189					&self.justification_sync_link,
190				);
191
192				info!(
193					target: LOG_TARGET,
194					"✅ Successfully mined block on top of: {}", build.metadata.best_hash
195				);
196				true
197			},
198			Err(err) => {
199				warn!(target: LOG_TARGET, "Unable to import mined block: {}", err,);
200				false
201			},
202		}
203	}
204}
205
206impl<Block, Algorithm, L> Clone for MiningHandle<Block, Algorithm, L>
207where
208	Block: BlockT,
209	Algorithm: PowAlgorithm<Block>,
210	L: soil_client::import::JustificationSyncLink<Block>,
211{
212	fn clone(&self) -> Self {
213		Self {
214			version: self.version.clone(),
215			algorithm: self.algorithm.clone(),
216			justification_sync_link: self.justification_sync_link.clone(),
217			build: self.build.clone(),
218			block_import: self.block_import.clone(),
219		}
220	}
221}
222
223/// A stream that waits for a block import or timeout.
224pub struct UntilImportedOrTimeout<Block: BlockT> {
225	import_notifications: ImportNotifications<Block>,
226	timeout: Duration,
227	inner_delay: Option<Delay>,
228}
229
230impl<Block: BlockT> UntilImportedOrTimeout<Block> {
231	/// Create a new stream using the given import notification and timeout duration.
232	pub fn new(import_notifications: ImportNotifications<Block>, timeout: Duration) -> Self {
233		Self { import_notifications, timeout, inner_delay: None }
234	}
235}
236
237impl<Block: BlockT> Stream for UntilImportedOrTimeout<Block> {
238	type Item = ();
239
240	fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<()>> {
241		let mut fire = false;
242
243		loop {
244			match Stream::poll_next(Pin::new(&mut self.import_notifications), cx) {
245				Poll::Pending => break,
246				Poll::Ready(Some(_)) => {
247					fire = true;
248				},
249				Poll::Ready(None) => return Poll::Ready(None),
250			}
251		}
252
253		let timeout = self.timeout;
254		let inner_delay = self.inner_delay.get_or_insert_with(|| Delay::new(timeout));
255
256		match Future::poll(Pin::new(inner_delay), cx) {
257			Poll::Pending => (),
258			Poll::Ready(()) => {
259				fire = true;
260			},
261		}
262
263		if fire {
264			self.inner_delay = None;
265			Poll::Ready(Some(()))
266		} else {
267			Poll::Pending
268		}
269	}
270}