Skip to main content

sc_consensus_grandpa/
import.rs

1// This file is part of Substrate.
2
3// Copyright (C) Parity Technologies (UK) Ltd.
4// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0
5
6// This program is free software: you can redistribute it and/or modify
7// it under the terms of the GNU General Public License as published by
8// the Free Software Foundation, either version 3 of the License, or
9// (at your option) any later version.
10
11// This program is distributed in the hope that it will be useful,
12// but WITHOUT ANY WARRANTY; without even the implied warranty of
13// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14// GNU General Public License for more details.
15
16// You should have received a copy of the GNU General Public License
17// along with this program. If not, see <https://www.gnu.org/licenses/>.
18
19use std::{collections::HashMap, marker::PhantomData, sync::Arc};
20
21use codec::Decode;
22use log::debug;
23use parking_lot::Mutex;
24
25use sc_client_api::{backend::Backend, utils::is_descendent_of};
26use sc_consensus::{
27	shared_data::{SharedDataLocked, SharedDataLockedUpgradable},
28	BlockCheckParams, BlockImport, BlockImportParams, ImportResult, JustificationImport,
29};
30use sc_telemetry::TelemetryHandle;
31use sc_utils::mpsc::TracingUnboundedSender;
32use sp_api::{Core, RuntimeApiInfo};
33use sp_blockchain::BlockStatus;
34use sp_consensus::{BlockOrigin, Error as ConsensusError, SelectChain};
35use sp_consensus_grandpa::{ConsensusLog, GrandpaApi, ScheduledChange, SetId, GRANDPA_ENGINE_ID};
36use sp_runtime::{
37	generic::OpaqueDigestItemId,
38	traits::{Block as BlockT, Header as HeaderT, NumberFor, Zero},
39	Justification,
40};
41
42use crate::{
43	authorities::{AuthoritySet, DelayKind, PendingChange, SharedAuthoritySet},
44	environment,
45	justification::GrandpaJustification,
46	notification::GrandpaJustificationSender,
47	AuthoritySetChanges, ClientForGrandpa, CommandOrError, Error, NewAuthoritySet, VoterCommand,
48	LOG_TARGET,
49};
50
51/// A block-import handler for GRANDPA.
52///
53/// This scans each imported block for signals of changing authority set.
54/// If the block being imported enacts an authority set change then:
55/// - If the current authority set is still live: we import the block
56/// - Otherwise, the block must include a valid justification.
57///
58/// When using GRANDPA, the block import worker should be using this block import
59/// object.
60pub struct GrandpaBlockImport<Backend, Block: BlockT, Client, SC> {
61	inner: Arc<Client>,
62	justification_import_period: u32,
63	select_chain: SC,
64	authority_set: SharedAuthoritySet<Block::Hash, NumberFor<Block>>,
65	send_voter_commands: TracingUnboundedSender<VoterCommand<Block::Hash, NumberFor<Block>>>,
66	authority_set_hard_forks:
67		Mutex<HashMap<Block::Hash, PendingChange<Block::Hash, NumberFor<Block>>>>,
68	justification_sender: GrandpaJustificationSender<Block>,
69	telemetry: Option<TelemetryHandle>,
70	_phantom: PhantomData<Backend>,
71}
72
73impl<Backend, Block: BlockT, Client, SC: Clone> Clone
74	for GrandpaBlockImport<Backend, Block, Client, SC>
75{
76	fn clone(&self) -> Self {
77		GrandpaBlockImport {
78			inner: self.inner.clone(),
79			justification_import_period: self.justification_import_period,
80			select_chain: self.select_chain.clone(),
81			authority_set: self.authority_set.clone(),
82			send_voter_commands: self.send_voter_commands.clone(),
83			authority_set_hard_forks: Mutex::new(self.authority_set_hard_forks.lock().clone()),
84			justification_sender: self.justification_sender.clone(),
85			telemetry: self.telemetry.clone(),
86			_phantom: PhantomData,
87		}
88	}
89}
90
91#[async_trait::async_trait]
92impl<BE, Block: BlockT, Client, SC> JustificationImport<Block>
93	for GrandpaBlockImport<BE, Block, Client, SC>
94where
95	NumberFor<Block>: finality_grandpa::BlockNumberOps,
96	BE: Backend<Block>,
97	Client: ClientForGrandpa<Block, BE>,
98	SC: SelectChain<Block>,
99{
100	type Error = ConsensusError;
101
102	async fn on_start(&mut self) -> Vec<(Block::Hash, NumberFor<Block>)> {
103		let mut out = Vec::new();
104		let chain_info = self.inner.info();
105
106		// request justifications for all pending changes for which change blocks have already been
107		// imported
108		let pending_changes: Vec<_> =
109			self.authority_set.inner().pending_changes().cloned().collect();
110
111		for pending_change in pending_changes {
112			if pending_change.delay_kind == DelayKind::Finalized &&
113				pending_change.effective_number() > chain_info.finalized_number &&
114				pending_change.effective_number() <= chain_info.best_number
115			{
116				let effective_block_hash = if !pending_change.delay.is_zero() {
117					self.select_chain
118						.finality_target(
119							pending_change.canon_hash,
120							Some(pending_change.effective_number()),
121						)
122						.await
123				} else {
124					Ok(pending_change.canon_hash)
125				};
126
127				if let Ok(hash) = effective_block_hash {
128					if let Ok(Some(header)) = self.inner.header(hash) {
129						if *header.number() == pending_change.effective_number() {
130							out.push((header.hash(), *header.number()));
131						}
132					}
133				}
134			}
135		}
136
137		out
138	}
139
140	async fn import_justification(
141		&mut self,
142		hash: Block::Hash,
143		number: NumberFor<Block>,
144		justification: Justification,
145	) -> Result<(), Self::Error> {
146		// this justification was requested by the sync service, therefore we
147		// are not sure if it should enact a change or not. it could have been a
148		// request made as part of initial sync but that means the justification
149		// wasn't part of the block and was requested asynchronously, probably
150		// makes sense to log in that case.
151		GrandpaBlockImport::import_justification(self, hash, number, justification, false, false)
152	}
153}
154
155enum AppliedChanges<H, N> {
156	Standard(bool), // true if the change is ready to be applied (i.e. it's a root)
157	Forced(NewAuthoritySet<H, N>),
158	None,
159}
160
161impl<H, N> AppliedChanges<H, N> {
162	fn needs_justification(&self) -> bool {
163		match *self {
164			AppliedChanges::Standard(_) => true,
165			AppliedChanges::Forced(_) | AppliedChanges::None => false,
166		}
167	}
168}
169
170struct PendingSetChanges<Block: BlockT> {
171	just_in_case: Option<(
172		AuthoritySet<Block::Hash, NumberFor<Block>>,
173		SharedDataLockedUpgradable<AuthoritySet<Block::Hash, NumberFor<Block>>>,
174	)>,
175	applied_changes: AppliedChanges<Block::Hash, NumberFor<Block>>,
176	do_pause: bool,
177}
178
179impl<Block: BlockT> PendingSetChanges<Block> {
180	// revert the pending set change explicitly.
181	fn revert(self) {}
182
183	fn defuse(mut self) -> (AppliedChanges<Block::Hash, NumberFor<Block>>, bool) {
184		self.just_in_case = None;
185		let applied_changes = std::mem::replace(&mut self.applied_changes, AppliedChanges::None);
186		(applied_changes, self.do_pause)
187	}
188}
189
190impl<Block: BlockT> Drop for PendingSetChanges<Block> {
191	fn drop(&mut self) {
192		if let Some((old_set, mut authorities)) = self.just_in_case.take() {
193			*authorities.upgrade() = old_set;
194		}
195	}
196}
197
198/// Checks the given header for a consensus digest signalling a **standard** scheduled change and
199/// extracts it.
200pub fn find_scheduled_change<B: BlockT>(
201	header: &B::Header,
202) -> Option<ScheduledChange<NumberFor<B>>> {
203	let id = OpaqueDigestItemId::Consensus(&GRANDPA_ENGINE_ID);
204
205	let filter_log = |log: ConsensusLog<NumberFor<B>>| match log {
206		ConsensusLog::ScheduledChange(change) => Some(change),
207		_ => None,
208	};
209
210	// find the first consensus digest with the right ID which converts to
211	// the right kind of consensus log.
212	header.digest().convert_first(|l| l.try_to(id).and_then(filter_log))
213}
214
215/// Checks the given header for a consensus digest signalling a **forced** scheduled change and
216/// extracts it.
217pub fn find_forced_change<B: BlockT>(
218	header: &B::Header,
219) -> Option<(NumberFor<B>, ScheduledChange<NumberFor<B>>)> {
220	let id = OpaqueDigestItemId::Consensus(&GRANDPA_ENGINE_ID);
221
222	let filter_log = |log: ConsensusLog<NumberFor<B>>| match log {
223		ConsensusLog::ForcedChange(delay, change) => Some((delay, change)),
224		_ => None,
225	};
226
227	// find the first consensus digest with the right ID which converts to
228	// the right kind of consensus log.
229	header.digest().convert_first(|l| l.try_to(id).and_then(filter_log))
230}
231
232impl<BE, Block: BlockT, Client, SC> GrandpaBlockImport<BE, Block, Client, SC>
233where
234	NumberFor<Block>: finality_grandpa::BlockNumberOps,
235	BE: Backend<Block>,
236	Client: ClientForGrandpa<Block, BE>,
237	Client::Api: GrandpaApi<Block>,
238	for<'a> &'a Client: BlockImport<Block, Error = ConsensusError>,
239{
240	// check for a new authority set change.
241	fn check_new_change(
242		&self,
243		header: &Block::Header,
244		hash: Block::Hash,
245	) -> Option<PendingChange<Block::Hash, NumberFor<Block>>> {
246		// check for forced authority set hard forks
247		if let Some(change) = self.authority_set_hard_forks.lock().get(&hash) {
248			return Some(change.clone());
249		}
250
251		// check for forced change.
252		if let Some((median_last_finalized, change)) = find_forced_change::<Block>(header) {
253			return Some(PendingChange {
254				next_authorities: change.next_authorities,
255				delay: change.delay,
256				canon_height: *header.number(),
257				canon_hash: hash,
258				delay_kind: DelayKind::Best { median_last_finalized },
259			});
260		}
261
262		// check normal scheduled change.
263		let change = find_scheduled_change::<Block>(header)?;
264		Some(PendingChange {
265			next_authorities: change.next_authorities,
266			delay: change.delay,
267			canon_height: *header.number(),
268			canon_hash: hash,
269			delay_kind: DelayKind::Finalized,
270		})
271	}
272
273	fn make_authorities_changes(
274		&self,
275		block: &mut BlockImportParams<Block>,
276		hash: Block::Hash,
277		initial_sync: bool,
278	) -> Result<PendingSetChanges<Block>, ConsensusError> {
279		// For warp synced block we can skip authority set change tracking for warp synced blocks,
280		// because authority sets will be reconstructed after sync completes from the finalized
281		// state.
282		if block.origin == BlockOrigin::WarpSync {
283			return Ok(PendingSetChanges {
284				just_in_case: None,
285				applied_changes: AppliedChanges::None,
286				do_pause: false,
287			});
288		}
289
290		// when we update the authorities, we need to hold the lock
291		// until the block is written to prevent a race if we need to restore
292		// the old authority set on error or panic.
293		struct InnerGuard<'a, H, N> {
294			old: Option<AuthoritySet<H, N>>,
295			guard: Option<SharedDataLocked<'a, AuthoritySet<H, N>>>,
296		}
297
298		impl<'a, H, N> InnerGuard<'a, H, N> {
299			fn as_mut(&mut self) -> &mut AuthoritySet<H, N> {
300				self.guard.as_mut().expect("only taken on deconstruction; qed")
301			}
302
303			fn set_old(&mut self, old: AuthoritySet<H, N>) {
304				if self.old.is_none() {
305					// ignore "newer" old changes.
306					self.old = Some(old);
307				}
308			}
309
310			fn consume(
311				mut self,
312			) -> Option<(AuthoritySet<H, N>, SharedDataLocked<'a, AuthoritySet<H, N>>)> {
313				self.old
314					.take()
315					.map(|old| (old, self.guard.take().expect("only taken on deconstruction; qed")))
316			}
317		}
318
319		impl<'a, H, N> Drop for InnerGuard<'a, H, N> {
320			fn drop(&mut self) {
321				if let (Some(mut guard), Some(old)) = (self.guard.take(), self.old.take()) {
322					*guard = old;
323				}
324			}
325		}
326
327		let number = *(block.header.number());
328		let maybe_change = self.check_new_change(&block.header, hash);
329
330		// returns a function for checking whether a block is a descendent of another
331		// consistent with querying client directly after importing the block.
332		let parent_hash = *block.header.parent_hash();
333		let is_descendent_of = is_descendent_of(&*self.inner, Some((hash, parent_hash)));
334
335		let mut guard = InnerGuard { guard: Some(self.authority_set.inner_locked()), old: None };
336
337		// whether to pause the old authority set -- happens after import
338		// of a forced change block.
339		let mut do_pause = false;
340
341		// add any pending changes.
342		if let Some(change) = maybe_change {
343			let old = guard.as_mut().clone();
344			guard.set_old(old);
345
346			if let DelayKind::Best { .. } = change.delay_kind {
347				do_pause = true;
348			}
349
350			guard
351				.as_mut()
352				.add_pending_change(change, &is_descendent_of)
353				.map_err(|e| ConsensusError::ClientImport(e.to_string()))?;
354		}
355
356		let applied_changes = {
357			let forced_change_set = guard
358				.as_mut()
359				.apply_forced_changes(
360					hash,
361					number,
362					&is_descendent_of,
363					initial_sync,
364					self.telemetry.clone(),
365				)
366				.map_err(|e| ConsensusError::ClientImport(e.to_string()))
367				.map_err(ConsensusError::from)?;
368
369			if let Some((median_last_finalized_number, new_set)) = forced_change_set {
370				let new_authorities = {
371					let (set_id, new_authorities) = new_set.current();
372
373					// we will use the median last finalized number as a hint
374					// for the canon block the new authority set should start
375					// with. we use the minimum between the median and the local
376					// best finalized block.
377					let best_finalized_number = self.inner.info().finalized_number;
378					let canon_number = best_finalized_number.min(median_last_finalized_number);
379					let canon_hash = self.inner.hash(canon_number)
380							.map_err(|e| ConsensusError::ClientImport(e.to_string()))?
381							.expect(
382								"the given block number is less or equal than the current best finalized number; \
383								 current best finalized number must exist in chain; qed."
384							);
385
386					NewAuthoritySet {
387						canon_number,
388						canon_hash,
389						set_id,
390						authorities: new_authorities.to_vec(),
391					}
392				};
393				let old = ::std::mem::replace(guard.as_mut(), new_set);
394				guard.set_old(old);
395
396				AppliedChanges::Forced(new_authorities)
397			} else {
398				let did_standard = guard
399					.as_mut()
400					.enacts_standard_change(hash, number, &is_descendent_of)
401					.map_err(|e| ConsensusError::ClientImport(e.to_string()))
402					.map_err(ConsensusError::from)?;
403
404				if let Some(root) = did_standard {
405					AppliedChanges::Standard(root)
406				} else {
407					AppliedChanges::None
408				}
409			}
410		};
411
412		// consume the guard safely and write necessary changes.
413		let just_in_case = guard.consume();
414		if let Some((_, ref authorities)) = just_in_case {
415			let authorities_change = match applied_changes {
416				AppliedChanges::Forced(ref new) => Some(new),
417				AppliedChanges::Standard(_) => None, // the change isn't actually applied yet.
418				AppliedChanges::None => None,
419			};
420
421			crate::aux_schema::update_authority_set::<Block, _, _>(
422				authorities,
423				authorities_change,
424				|insert| {
425					block
426						.auxiliary
427						.extend(insert.iter().map(|(k, v)| (k.to_vec(), Some(v.to_vec()))))
428				},
429			);
430		}
431
432		let just_in_case = just_in_case.map(|(o, i)| (o, i.release_mutex()));
433
434		Ok(PendingSetChanges { just_in_case, applied_changes, do_pause })
435	}
436
437	/// Read current set id form a given state.
438	fn current_set_id(&self, hash: Block::Hash) -> Result<SetId, ConsensusError> {
439		let runtime_version = self.inner.runtime_api().version(hash).map_err(|e| {
440			ConsensusError::ClientImport(format!(
441				"Unable to retrieve current runtime version. {}",
442				e
443			))
444		})?;
445
446		if runtime_version
447			.api_version(&<dyn GrandpaApi<Block>>::ID)
448			.map_or(false, |v| v < 3)
449		{
450			// The new API is not supported in this runtime. Try reading directly from storage.
451			// This code may be removed once warp sync to an old runtime is no longer needed.
452			for prefix in ["GrandpaFinality", "Grandpa"] {
453				let k = [
454					sp_crypto_hashing::twox_128(prefix.as_bytes()),
455					sp_crypto_hashing::twox_128(b"CurrentSetId"),
456				]
457				.concat();
458				if let Ok(Some(id)) =
459					self.inner.storage(hash, &sc_client_api::StorageKey(k.to_vec()))
460				{
461					if let Ok(id) = SetId::decode(&mut id.0.as_ref()) {
462						return Ok(id);
463					}
464				}
465			}
466			Err(ConsensusError::ClientImport("Unable to retrieve current set id.".into()))
467		} else {
468			self.inner
469				.runtime_api()
470				.current_set_id(hash)
471				.map_err(|e| ConsensusError::ClientImport(e.to_string()))
472		}
473	}
474
475	/// Import whole new state and reset authority set.
476	async fn import_state(
477		&self,
478		mut block: BlockImportParams<Block>,
479	) -> Result<ImportResult, ConsensusError> {
480		let hash = block.post_hash();
481		let number = *block.header.number();
482		// Force imported state finality.
483		block.finalized = true;
484		let import_result = (&*self.inner).import_block(block).await;
485		match import_result {
486			Ok(ImportResult::Imported(aux)) => {
487				// We've just imported a new state. We trust the sync module has verified
488				// finality proofs and that the state is correct and final.
489				// So we can read the authority list and set id from the state.
490				self.authority_set_hard_forks.lock().clear();
491				let authorities = self
492					.inner
493					.runtime_api()
494					.grandpa_authorities(hash)
495					.map_err(|e| ConsensusError::ClientImport(e.to_string()))?;
496				let set_id = self.current_set_id(hash)?;
497				let authority_set = AuthoritySet::new(
498					authorities.clone(),
499					set_id,
500					fork_tree::ForkTree::new(),
501					Vec::new(),
502					AuthoritySetChanges::empty(),
503				)
504				.ok_or_else(|| ConsensusError::ClientImport("Invalid authority list".into()))?;
505				*self.authority_set.inner_locked() = authority_set.clone();
506
507				crate::aux_schema::update_authority_set::<Block, _, _>(
508					&authority_set,
509					None,
510					|insert| self.inner.insert_aux(insert, []),
511				)
512				.map_err(|e| ConsensusError::ClientImport(e.to_string()))?;
513				let new_set =
514					NewAuthoritySet { canon_number: number, canon_hash: hash, set_id, authorities };
515				let _ = self
516					.send_voter_commands
517					.unbounded_send(VoterCommand::ChangeAuthorities(new_set));
518				Ok(ImportResult::Imported(aux))
519			},
520			Ok(r) => Ok(r),
521			Err(e) => Err(ConsensusError::ClientImport(e.to_string())),
522		}
523	}
524}
525
526#[async_trait::async_trait]
527impl<BE, Block: BlockT, Client, SC> BlockImport<Block> for GrandpaBlockImport<BE, Block, Client, SC>
528where
529	NumberFor<Block>: finality_grandpa::BlockNumberOps,
530	BE: Backend<Block>,
531	Client: ClientForGrandpa<Block, BE>,
532	Client::Api: GrandpaApi<Block>,
533	for<'a> &'a Client: BlockImport<Block, Error = ConsensusError>,
534	SC: Send + Sync,
535{
536	type Error = ConsensusError;
537
538	async fn import_block(
539		&self,
540		mut block: BlockImportParams<Block>,
541	) -> Result<ImportResult, Self::Error> {
542		let hash = block.post_hash();
543		let number = *block.header.number();
544
545		// early exit if block already in chain, otherwise the check for
546		// authority changes will error when trying to re-import a change block
547		match self.inner.status(hash) {
548			Ok(BlockStatus::InChain) => {
549				// Strip justifications when re-importing an existing block.
550				let _justifications = block.justifications.take();
551				return (&*self.inner).import_block(block).await;
552			},
553			Ok(BlockStatus::Unknown) => {},
554			Err(e) => return Err(ConsensusError::ClientImport(e.to_string())),
555		}
556
557		if block.with_state() {
558			return self.import_state(block).await;
559		}
560
561		if number <= self.inner.info().finalized_number {
562			// Importing an old block. Just save justifications and authority set changes
563			if self.check_new_change(&block.header, hash).is_some() {
564				if block.justifications.is_none() {
565					return Err(ConsensusError::ClientImport(
566						"Justification required when importing \
567							an old block with authority set change."
568							.into(),
569					));
570				}
571				let mut authority_set = self.authority_set.inner_locked();
572				authority_set.authority_set_changes.insert(number);
573				crate::aux_schema::update_authority_set::<Block, _, _>(
574					&authority_set,
575					None,
576					|insert| {
577						block
578							.auxiliary
579							.extend(insert.iter().map(|(k, v)| (k.to_vec(), Some(v.to_vec()))))
580					},
581				);
582			}
583			return (&*self.inner).import_block(block).await;
584		}
585
586		// on initial sync we will restrict logging under info to avoid spam.
587		let initial_sync = block.origin == BlockOrigin::NetworkInitialSync;
588
589		let pending_changes = self.make_authorities_changes(&mut block, hash, initial_sync)?;
590
591		// we don't want to finalize on `inner.import_block`
592		let mut justifications = block.justifications.take();
593		let import_result = (&*self.inner).import_block(block).await;
594
595		let mut imported_aux = {
596			match import_result {
597				Ok(ImportResult::Imported(aux)) => aux,
598				Ok(r) => {
599					debug!(
600						target: LOG_TARGET,
601						"Restoring old authority set after block import result: {:?}", r,
602					);
603					pending_changes.revert();
604					return Ok(r);
605				},
606				Err(e) => {
607					debug!(
608						target: LOG_TARGET,
609						"Restoring old authority set after block import error: {}", e,
610					);
611					pending_changes.revert();
612					return Err(ConsensusError::ClientImport(e.to_string()));
613				},
614			}
615		};
616
617		let (applied_changes, do_pause) = pending_changes.defuse();
618
619		// Send the pause signal after import but BEFORE sending a `ChangeAuthorities` message.
620		if do_pause {
621			let _ = self.send_voter_commands.unbounded_send(VoterCommand::Pause(
622				"Forced change scheduled after inactivity".to_string(),
623			));
624		}
625
626		let needs_justification = applied_changes.needs_justification();
627
628		match applied_changes {
629			AppliedChanges::Forced(new) => {
630				// NOTE: when we do a force change we are "discrediting" the old set so we
631				// ignore any justifications from them. this block may contain a justification
632				// which should be checked and imported below against the new authority
633				// triggered by this forced change. the new grandpa voter will start at the
634				// last median finalized block (which is before the block that enacts the
635				// change), full nodes syncing the chain will not be able to successfully
636				// import justifications for those blocks since their local authority set view
637				// is still of the set before the forced change was enacted, still after #1867
638				// they should import the block and discard the justification, and they will
639				// then request a justification from sync if it's necessary (which they should
640				// then be able to successfully validate).
641				let _ =
642					self.send_voter_commands.unbounded_send(VoterCommand::ChangeAuthorities(new));
643
644				// we must clear all pending justifications requests, presumably they won't be
645				// finalized hence why this forced changes was triggered
646				imported_aux.clear_justification_requests = true;
647			},
648			AppliedChanges::Standard(false) => {
649				// we can't apply this change yet since there are other dependent changes that we
650				// need to apply first, drop any justification that might have been provided with
651				// the block to make sure we request them from `sync` which will ensure they'll be
652				// applied in-order.
653				justifications.take();
654			},
655			_ => {},
656		}
657
658		let grandpa_justification =
659			justifications.and_then(|just| just.into_justification(GRANDPA_ENGINE_ID));
660
661		match grandpa_justification {
662			Some(justification) => {
663				if environment::should_process_justification(
664					&*self.inner,
665					self.justification_import_period,
666					number,
667					needs_justification,
668				) {
669					let import_res = self.import_justification(
670						hash,
671						number,
672						(GRANDPA_ENGINE_ID, justification),
673						needs_justification,
674						initial_sync,
675					);
676
677					import_res.unwrap_or_else(|err| {
678						if needs_justification {
679							debug!(
680								target: LOG_TARGET,
681								"Requesting justification from peers due to imported block #{} that enacts authority set change with invalid justification: {}",
682								number,
683								err
684							);
685							imported_aux.bad_justification = true;
686							imported_aux.needs_justification = true;
687						}
688					});
689				} else {
690					debug!(
691						target: LOG_TARGET,
692						"Ignoring unnecessary justification for block #{}",
693						number,
694					);
695				}
696			},
697			None => {
698				if needs_justification {
699					debug!(
700						target: LOG_TARGET,
701						"Imported unjustified block #{} that enacts authority set change, waiting for finality for enactment.",
702						number,
703					);
704
705					imported_aux.needs_justification = true;
706				}
707			},
708		}
709
710		Ok(ImportResult::Imported(imported_aux))
711	}
712
713	async fn check_block(
714		&self,
715		block: BlockCheckParams<Block>,
716	) -> Result<ImportResult, Self::Error> {
717		self.inner.check_block(block).await
718	}
719}
720
721impl<Backend, Block: BlockT, Client, SC> GrandpaBlockImport<Backend, Block, Client, SC> {
722	pub(crate) fn new(
723		inner: Arc<Client>,
724		justification_import_period: u32,
725		select_chain: SC,
726		authority_set: SharedAuthoritySet<Block::Hash, NumberFor<Block>>,
727		send_voter_commands: TracingUnboundedSender<VoterCommand<Block::Hash, NumberFor<Block>>>,
728		authority_set_hard_forks: Vec<(SetId, PendingChange<Block::Hash, NumberFor<Block>>)>,
729		justification_sender: GrandpaJustificationSender<Block>,
730		telemetry: Option<TelemetryHandle>,
731	) -> GrandpaBlockImport<Backend, Block, Client, SC> {
732		// check for and apply any forced authority set hard fork that applies
733		// to the *current* authority set.
734		if let Some((_, change)) = authority_set_hard_forks
735			.iter()
736			.find(|(set_id, _)| *set_id == authority_set.set_id())
737		{
738			authority_set.inner().current_authorities = change.next_authorities.clone();
739		}
740
741		// index authority set hard forks by block hash so that they can be used
742		// by any node syncing the chain and importing a block hard fork
743		// authority set changes.
744		let authority_set_hard_forks = authority_set_hard_forks
745			.into_iter()
746			.map(|(_, change)| (change.canon_hash, change))
747			.collect::<HashMap<_, _>>();
748
749		// check for and apply any forced authority set hard fork that apply to
750		// any *pending* standard changes, checking by the block hash at which
751		// they were announced.
752		{
753			let mut authority_set = authority_set.inner();
754
755			authority_set.pending_standard_changes =
756				authority_set.pending_standard_changes.clone().map(&mut |hash, _, original| {
757					authority_set_hard_forks.get(hash).cloned().unwrap_or(original)
758				});
759		}
760
761		GrandpaBlockImport {
762			inner,
763			justification_import_period,
764			select_chain,
765			authority_set,
766			send_voter_commands,
767			authority_set_hard_forks: Mutex::new(authority_set_hard_forks),
768			justification_sender,
769			telemetry,
770			_phantom: PhantomData,
771		}
772	}
773}
774
775impl<BE, Block: BlockT, Client, SC> GrandpaBlockImport<BE, Block, Client, SC>
776where
777	BE: Backend<Block>,
778	Client: ClientForGrandpa<Block, BE>,
779	NumberFor<Block>: finality_grandpa::BlockNumberOps,
780{
781	/// Import a block justification and finalize the block.
782	///
783	/// If `enacts_change` is set to true, then finalizing this block *must*
784	/// enact an authority set change, the function will panic otherwise.
785	fn import_justification(
786		&self,
787		hash: Block::Hash,
788		number: NumberFor<Block>,
789		justification: Justification,
790		enacts_change: bool,
791		initial_sync: bool,
792	) -> Result<(), ConsensusError> {
793		if justification.0 != GRANDPA_ENGINE_ID {
794			// TODO: the import queue needs to be refactored to be able dispatch to the correct
795			// `JustificationImport` instance based on `ConsensusEngineId`, or we need to build a
796			// justification import pipeline similar to what we do for `BlockImport`. In the
797			// meantime we'll just drop the justification, since this is only used for BEEFY which
798			// is still WIP.
799			return Ok(());
800		}
801
802		let justification = GrandpaJustification::decode_and_verify_finalizes(
803			&justification.1,
804			(hash, number),
805			self.authority_set.set_id(),
806			&self.authority_set.current_authorities(),
807		);
808
809		let justification = match justification {
810			Err(e) => {
811				return match e {
812					sp_blockchain::Error::OutdatedJustification => {
813						Err(ConsensusError::OutdatedJustification)
814					},
815					_ => Err(ConsensusError::ClientImport(e.to_string())),
816				};
817			},
818			Ok(justification) => justification,
819		};
820
821		let result = environment::finalize_block(
822			self.inner.clone(),
823			&self.authority_set,
824			None,
825			hash,
826			number,
827			justification.into(),
828			initial_sync,
829			Some(&self.justification_sender),
830			self.telemetry.clone(),
831		);
832
833		match result {
834			Err(CommandOrError::VoterCommand(command)) => {
835				grandpa_log!(
836					initial_sync,
837					"👴 Imported justification for block #{} that triggers \
838					command {}, signaling voter.",
839					number,
840					command,
841				);
842
843				// send the command to the voter
844				let _ = self.send_voter_commands.unbounded_send(command);
845			},
846			Err(CommandOrError::Error(e)) => {
847				return Err(match e {
848					Error::Grandpa(error) => ConsensusError::ClientImport(error.to_string()),
849					Error::Network(error) => ConsensusError::ClientImport(error),
850					Error::Blockchain(error) => ConsensusError::ClientImport(error),
851					Error::Client(error) => ConsensusError::ClientImport(error.to_string()),
852					Error::Safety(error) => ConsensusError::ClientImport(error),
853					Error::Signing(error) => ConsensusError::ClientImport(error),
854					Error::Timer(error) => ConsensusError::ClientImport(error.to_string()),
855					Error::RuntimeApi(error) => ConsensusError::ClientImport(error.to_string()),
856				})
857			},
858			Ok(_) => {
859				assert!(
860					!enacts_change,
861					"returns Ok when no authority set change should be enacted; qed;"
862				);
863			},
864		}
865
866		Ok(())
867	}
868}