Skip to main content

soil_grandpa/
import.rs

1// This file is part of Soil.
2
3// Copyright (C) Soil contributors.
4// Copyright (C) Parity Technologies (UK) Ltd.
5// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0
6
7use std::{collections::HashMap, marker::PhantomData, sync::Arc};
8
9use codec::Decode;
10use log::debug;
11use parking_lot::Mutex;
12
13use soil_client::blockchain::BlockStatus;
14use soil_client::client_api::{backend::Backend, utils::is_descendent_of};
15use soil_client::consensus::{BlockOrigin, Error as ConsensusError, SelectChain};
16use soil_client::import::{
17	BlockCheckParams, BlockImport, BlockImportParams, ImportResult, JustificationImport,
18};
19use soil_client::utils::mpsc::TracingUnboundedSender;
20use soil_consensus::shared_data::{SharedDataLocked, SharedDataLockedUpgradable};
21use soil_telemetry::TelemetryHandle;
22use subsoil::api::{Core, RuntimeApiInfo};
23use subsoil::consensus::grandpa::{
24	ConsensusLog, GrandpaApi, ScheduledChange, SetId, GRANDPA_ENGINE_ID,
25};
26use subsoil::runtime::{
27	generic::OpaqueDigestItemId,
28	traits::{Block as BlockT, Header as HeaderT, NumberFor, Zero},
29	Justification,
30};
31
32use crate::{
33	authorities::{AuthoritySet, DelayKind, PendingChange, SharedAuthoritySet},
34	environment,
35	justification::GrandpaJustification,
36	notification::GrandpaJustificationSender,
37	AuthoritySetChanges, ClientForGrandpa, CommandOrError, Error, NewAuthoritySet, VoterCommand,
38	LOG_TARGET,
39};
40
41/// A block-import handler for GRANDPA.
42///
43/// This scans each imported block for signals of changing authority set.
44/// If the block being imported enacts an authority set change then:
45/// - If the current authority set is still live: we import the block
46/// - Otherwise, the block must include a valid justification.
47///
48/// When using GRANDPA, the block import worker should be using this block import
49/// object.
50pub struct GrandpaBlockImport<Backend, Block: BlockT, Client, SC> {
51	inner: Arc<Client>,
52	justification_import_period: u32,
53	select_chain: SC,
54	authority_set: SharedAuthoritySet<Block::Hash, NumberFor<Block>>,
55	send_voter_commands: TracingUnboundedSender<VoterCommand<Block::Hash, NumberFor<Block>>>,
56	authority_set_hard_forks:
57		Mutex<HashMap<Block::Hash, PendingChange<Block::Hash, NumberFor<Block>>>>,
58	justification_sender: GrandpaJustificationSender<Block>,
59	telemetry: Option<TelemetryHandle>,
60	_phantom: PhantomData<Backend>,
61}
62
63impl<Backend, Block: BlockT, Client, SC: Clone> Clone
64	for GrandpaBlockImport<Backend, Block, Client, SC>
65{
66	fn clone(&self) -> Self {
67		GrandpaBlockImport {
68			inner: self.inner.clone(),
69			justification_import_period: self.justification_import_period,
70			select_chain: self.select_chain.clone(),
71			authority_set: self.authority_set.clone(),
72			send_voter_commands: self.send_voter_commands.clone(),
73			authority_set_hard_forks: Mutex::new(self.authority_set_hard_forks.lock().clone()),
74			justification_sender: self.justification_sender.clone(),
75			telemetry: self.telemetry.clone(),
76			_phantom: PhantomData,
77		}
78	}
79}
80
81#[async_trait::async_trait]
82impl<BE, Block: BlockT, Client, SC> JustificationImport<Block>
83	for GrandpaBlockImport<BE, Block, Client, SC>
84where
85	NumberFor<Block>: finality_grandpa::BlockNumberOps,
86	BE: Backend<Block>,
87	Client: ClientForGrandpa<Block, BE>,
88	SC: SelectChain<Block>,
89{
90	type Error = ConsensusError;
91
92	async fn on_start(&mut self) -> Vec<(Block::Hash, NumberFor<Block>)> {
93		let mut out = Vec::new();
94		let chain_info = self.inner.info();
95
96		// request justifications for all pending changes for which change blocks have already been
97		// imported
98		let pending_changes: Vec<_> =
99			self.authority_set.inner().pending_changes().cloned().collect();
100
101		for pending_change in pending_changes {
102			if pending_change.delay_kind == DelayKind::Finalized
103				&& pending_change.effective_number() > chain_info.finalized_number
104				&& pending_change.effective_number() <= chain_info.best_number
105			{
106				let effective_block_hash = if !pending_change.delay.is_zero() {
107					self.select_chain
108						.finality_target(
109							pending_change.canon_hash,
110							Some(pending_change.effective_number()),
111						)
112						.await
113				} else {
114					Ok(pending_change.canon_hash)
115				};
116
117				if let Ok(hash) = effective_block_hash {
118					if let Ok(Some(header)) = self.inner.header(hash) {
119						if *header.number() == pending_change.effective_number() {
120							out.push((header.hash(), *header.number()));
121						}
122					}
123				}
124			}
125		}
126
127		out
128	}
129
130	async fn import_justification(
131		&mut self,
132		hash: Block::Hash,
133		number: NumberFor<Block>,
134		justification: Justification,
135	) -> Result<(), Self::Error> {
136		// this justification was requested by the sync service, therefore we
137		// are not sure if it should enact a change or not. it could have been a
138		// request made as part of initial sync but that means the justification
139		// wasn't part of the block and was requested asynchronously, probably
140		// makes sense to log in that case.
141		GrandpaBlockImport::import_justification(self, hash, number, justification, false, false)
142	}
143}
144
145enum AppliedChanges<H, N> {
146	Standard(bool), // true if the change is ready to be applied (i.e. it's a root)
147	Forced(NewAuthoritySet<H, N>),
148	None,
149}
150
151impl<H, N> AppliedChanges<H, N> {
152	fn needs_justification(&self) -> bool {
153		match *self {
154			AppliedChanges::Standard(_) => true,
155			AppliedChanges::Forced(_) | AppliedChanges::None => false,
156		}
157	}
158}
159
160struct PendingSetChanges<Block: BlockT> {
161	just_in_case: Option<(
162		AuthoritySet<Block::Hash, NumberFor<Block>>,
163		SharedDataLockedUpgradable<AuthoritySet<Block::Hash, NumberFor<Block>>>,
164	)>,
165	applied_changes: AppliedChanges<Block::Hash, NumberFor<Block>>,
166	do_pause: bool,
167}
168
169impl<Block: BlockT> PendingSetChanges<Block> {
170	// revert the pending set change explicitly.
171	fn revert(self) {}
172
173	fn defuse(mut self) -> (AppliedChanges<Block::Hash, NumberFor<Block>>, bool) {
174		self.just_in_case = None;
175		let applied_changes = std::mem::replace(&mut self.applied_changes, AppliedChanges::None);
176		(applied_changes, self.do_pause)
177	}
178}
179
180impl<Block: BlockT> Drop for PendingSetChanges<Block> {
181	fn drop(&mut self) {
182		if let Some((old_set, mut authorities)) = self.just_in_case.take() {
183			*authorities.upgrade() = old_set;
184		}
185	}
186}
187
188/// Checks the given header for a consensus digest signalling a **standard** scheduled change and
189/// extracts it.
190pub fn find_scheduled_change<B: BlockT>(
191	header: &B::Header,
192) -> Option<ScheduledChange<NumberFor<B>>> {
193	let id = OpaqueDigestItemId::Consensus(&GRANDPA_ENGINE_ID);
194
195	let filter_log = |log: ConsensusLog<NumberFor<B>>| match log {
196		ConsensusLog::ScheduledChange(change) => Some(change),
197		_ => None,
198	};
199
200	// find the first consensus digest with the right ID which converts to
201	// the right kind of consensus log.
202	header.digest().convert_first(|l| l.try_to(id).and_then(filter_log))
203}
204
205/// Checks the given header for a consensus digest signalling a **forced** scheduled change and
206/// extracts it.
207pub fn find_forced_change<B: BlockT>(
208	header: &B::Header,
209) -> Option<(NumberFor<B>, ScheduledChange<NumberFor<B>>)> {
210	let id = OpaqueDigestItemId::Consensus(&GRANDPA_ENGINE_ID);
211
212	let filter_log = |log: ConsensusLog<NumberFor<B>>| match log {
213		ConsensusLog::ForcedChange(delay, change) => Some((delay, change)),
214		_ => None,
215	};
216
217	// find the first consensus digest with the right ID which converts to
218	// the right kind of consensus log.
219	header.digest().convert_first(|l| l.try_to(id).and_then(filter_log))
220}
221
222impl<BE, Block: BlockT, Client, SC> GrandpaBlockImport<BE, Block, Client, SC>
223where
224	NumberFor<Block>: finality_grandpa::BlockNumberOps,
225	BE: Backend<Block>,
226	Client: ClientForGrandpa<Block, BE>,
227	Client::Api: GrandpaApi<Block>,
228	for<'a> &'a Client: BlockImport<Block, Error = ConsensusError>,
229{
230	// check for a new authority set change.
231	fn check_new_change(
232		&self,
233		header: &Block::Header,
234		hash: Block::Hash,
235	) -> Option<PendingChange<Block::Hash, NumberFor<Block>>> {
236		// check for forced authority set hard forks
237		if let Some(change) = self.authority_set_hard_forks.lock().get(&hash) {
238			return Some(change.clone());
239		}
240
241		// check for forced change.
242		if let Some((median_last_finalized, change)) = find_forced_change::<Block>(header) {
243			return Some(PendingChange {
244				next_authorities: change.next_authorities,
245				delay: change.delay,
246				canon_height: *header.number(),
247				canon_hash: hash,
248				delay_kind: DelayKind::Best { median_last_finalized },
249			});
250		}
251
252		// check normal scheduled change.
253		let change = find_scheduled_change::<Block>(header)?;
254		Some(PendingChange {
255			next_authorities: change.next_authorities,
256			delay: change.delay,
257			canon_height: *header.number(),
258			canon_hash: hash,
259			delay_kind: DelayKind::Finalized,
260		})
261	}
262
263	fn make_authorities_changes(
264		&self,
265		block: &mut BlockImportParams<Block>,
266		hash: Block::Hash,
267		initial_sync: bool,
268	) -> Result<PendingSetChanges<Block>, ConsensusError> {
269		// For warp synced block we can skip authority set change tracking for warp synced blocks,
270		// because authority sets will be reconstructed after sync completes from the finalized
271		// state.
272		if block.origin == BlockOrigin::WarpSync {
273			return Ok(PendingSetChanges {
274				just_in_case: None,
275				applied_changes: AppliedChanges::None,
276				do_pause: false,
277			});
278		}
279
280		// when we update the authorities, we need to hold the lock
281		// until the block is written to prevent a race if we need to restore
282		// the old authority set on error or panic.
283		struct InnerGuard<'a, H, N> {
284			old: Option<AuthoritySet<H, N>>,
285			guard: Option<SharedDataLocked<'a, AuthoritySet<H, N>>>,
286		}
287
288		impl<'a, H, N> InnerGuard<'a, H, N> {
289			fn as_mut(&mut self) -> &mut AuthoritySet<H, N> {
290				self.guard.as_mut().expect("only taken on deconstruction; qed")
291			}
292
293			fn set_old(&mut self, old: AuthoritySet<H, N>) {
294				if self.old.is_none() {
295					// ignore "newer" old changes.
296					self.old = Some(old);
297				}
298			}
299
300			fn consume(
301				mut self,
302			) -> Option<(AuthoritySet<H, N>, SharedDataLocked<'a, AuthoritySet<H, N>>)> {
303				self.old
304					.take()
305					.map(|old| (old, self.guard.take().expect("only taken on deconstruction; qed")))
306			}
307		}
308
309		impl<'a, H, N> Drop for InnerGuard<'a, H, N> {
310			fn drop(&mut self) {
311				if let (Some(mut guard), Some(old)) = (self.guard.take(), self.old.take()) {
312					*guard = old;
313				}
314			}
315		}
316
317		let number = *(block.header.number());
318		let maybe_change = self.check_new_change(&block.header, hash);
319
320		// returns a function for checking whether a block is a descendent of another
321		// consistent with querying client directly after importing the block.
322		let parent_hash = *block.header.parent_hash();
323		let is_descendent_of = is_descendent_of(&*self.inner, Some((hash, parent_hash)));
324
325		let mut guard = InnerGuard { guard: Some(self.authority_set.inner_locked()), old: None };
326
327		// whether to pause the old authority set -- happens after import
328		// of a forced change block.
329		let mut do_pause = false;
330
331		// add any pending changes.
332		if let Some(change) = maybe_change {
333			let old = guard.as_mut().clone();
334			guard.set_old(old);
335
336			if let DelayKind::Best { .. } = change.delay_kind {
337				do_pause = true;
338			}
339
340			guard
341				.as_mut()
342				.add_pending_change(change, &is_descendent_of)
343				.map_err(|e| ConsensusError::ClientImport(e.to_string()))?;
344		}
345
346		let applied_changes = {
347			let forced_change_set = guard
348				.as_mut()
349				.apply_forced_changes(
350					hash,
351					number,
352					&is_descendent_of,
353					initial_sync,
354					self.telemetry.clone(),
355				)
356				.map_err(|e| ConsensusError::ClientImport(e.to_string()))
357				.map_err(ConsensusError::from)?;
358
359			if let Some((median_last_finalized_number, new_set)) = forced_change_set {
360				let new_authorities = {
361					let (set_id, new_authorities) = new_set.current();
362
363					// we will use the median last finalized number as a hint
364					// for the canon block the new authority set should start
365					// with. we use the minimum between the median and the local
366					// best finalized block.
367					let best_finalized_number = self.inner.info().finalized_number;
368					let canon_number = best_finalized_number.min(median_last_finalized_number);
369					let canon_hash = self.inner.hash(canon_number)
370							.map_err(|e| ConsensusError::ClientImport(e.to_string()))?
371							.expect(
372								"the given block number is less or equal than the current best finalized number; \
373								 current best finalized number must exist in chain; qed."
374							);
375
376					NewAuthoritySet {
377						canon_number,
378						canon_hash,
379						set_id,
380						authorities: new_authorities.to_vec(),
381					}
382				};
383				let old = ::std::mem::replace(guard.as_mut(), new_set);
384				guard.set_old(old);
385
386				AppliedChanges::Forced(new_authorities)
387			} else {
388				let did_standard = guard
389					.as_mut()
390					.enacts_standard_change(hash, number, &is_descendent_of)
391					.map_err(|e| ConsensusError::ClientImport(e.to_string()))
392					.map_err(ConsensusError::from)?;
393
394				if let Some(root) = did_standard {
395					AppliedChanges::Standard(root)
396				} else {
397					AppliedChanges::None
398				}
399			}
400		};
401
402		// consume the guard safely and write necessary changes.
403		let just_in_case = guard.consume();
404		if let Some((_, ref authorities)) = just_in_case {
405			let authorities_change = match applied_changes {
406				AppliedChanges::Forced(ref new) => Some(new),
407				AppliedChanges::Standard(_) => None, // the change isn't actually applied yet.
408				AppliedChanges::None => None,
409			};
410
411			crate::aux_schema::update_authority_set::<Block, _, _>(
412				authorities,
413				authorities_change,
414				|insert| {
415					block
416						.auxiliary
417						.extend(insert.iter().map(|(k, v)| (k.to_vec(), Some(v.to_vec()))))
418				},
419			);
420		}
421
422		let just_in_case = just_in_case.map(|(o, i)| (o, i.release_mutex()));
423
424		Ok(PendingSetChanges { just_in_case, applied_changes, do_pause })
425	}
426
427	/// Read current set id form a given state.
428	fn current_set_id(&self, hash: Block::Hash) -> Result<SetId, ConsensusError> {
429		let runtime_version = self.inner.runtime_api().version(hash).map_err(|e| {
430			ConsensusError::ClientImport(format!(
431				"Unable to retrieve current runtime version. {}",
432				e
433			))
434		})?;
435
436		if runtime_version
437			.api_version(&<dyn GrandpaApi<Block>>::ID)
438			.map_or(false, |v| v < 3)
439		{
440			// The new API is not supported in this runtime. Try reading directly from storage.
441			// This code may be removed once warp sync to an old runtime is no longer needed.
442			for prefix in ["GrandpaFinality", "Grandpa"] {
443				let k = [
444					subsoil_crypto_hashing::twox_128(prefix.as_bytes()),
445					subsoil_crypto_hashing::twox_128(b"CurrentSetId"),
446				]
447				.concat();
448				if let Ok(Some(id)) =
449					self.inner.storage(hash, &soil_client::client_api::StorageKey(k.to_vec()))
450				{
451					if let Ok(id) = SetId::decode(&mut id.0.as_ref()) {
452						return Ok(id);
453					}
454				}
455			}
456			Err(ConsensusError::ClientImport("Unable to retrieve current set id.".into()))
457		} else {
458			self.inner
459				.runtime_api()
460				.current_set_id(hash)
461				.map_err(|e| ConsensusError::ClientImport(e.to_string()))
462		}
463	}
464
465	/// Import whole new state and reset authority set.
466	async fn import_state(
467		&self,
468		mut block: BlockImportParams<Block>,
469	) -> Result<ImportResult, ConsensusError> {
470		let hash = block.post_hash();
471		let number = *block.header.number();
472		// Force imported state finality.
473		block.finalized = true;
474		let import_result = (&*self.inner).import_block(block).await;
475		match import_result {
476			Ok(ImportResult::Imported(aux)) => {
477				// We've just imported a new state. We trust the sync module has verified
478				// finality proofs and that the state is correct and final.
479				// So we can read the authority list and set id from the state.
480				self.authority_set_hard_forks.lock().clear();
481				let authorities = self
482					.inner
483					.runtime_api()
484					.grandpa_authorities(hash)
485					.map_err(|e| ConsensusError::ClientImport(e.to_string()))?;
486				let set_id = self.current_set_id(hash)?;
487				let authority_set = AuthoritySet::new(
488					authorities.clone(),
489					set_id,
490					soil_fork_tree::ForkTree::new(),
491					Vec::new(),
492					AuthoritySetChanges::empty(),
493				)
494				.ok_or_else(|| ConsensusError::ClientImport("Invalid authority list".into()))?;
495				*self.authority_set.inner_locked() = authority_set.clone();
496
497				crate::aux_schema::update_authority_set::<Block, _, _>(
498					&authority_set,
499					None,
500					|insert| self.inner.insert_aux(insert, []),
501				)
502				.map_err(|e| ConsensusError::ClientImport(e.to_string()))?;
503				let new_set =
504					NewAuthoritySet { canon_number: number, canon_hash: hash, set_id, authorities };
505				let _ = self
506					.send_voter_commands
507					.unbounded_send(VoterCommand::ChangeAuthorities(new_set));
508				Ok(ImportResult::Imported(aux))
509			},
510			Ok(r) => Ok(r),
511			Err(e) => Err(ConsensusError::ClientImport(e.to_string())),
512		}
513	}
514}
515
516#[async_trait::async_trait]
517impl<BE, Block: BlockT, Client, SC> BlockImport<Block> for GrandpaBlockImport<BE, Block, Client, SC>
518where
519	NumberFor<Block>: finality_grandpa::BlockNumberOps,
520	BE: Backend<Block>,
521	Client: ClientForGrandpa<Block, BE>,
522	Client::Api: GrandpaApi<Block>,
523	for<'a> &'a Client: BlockImport<Block, Error = ConsensusError>,
524	SC: Send + Sync,
525{
526	type Error = ConsensusError;
527
528	async fn import_block(
529		&self,
530		mut block: BlockImportParams<Block>,
531	) -> Result<ImportResult, Self::Error> {
532		let hash = block.post_hash();
533		let number = *block.header.number();
534
535		// early exit if block already in chain, otherwise the check for
536		// authority changes will error when trying to re-import a change block
537		match self.inner.status(hash) {
538			Ok(BlockStatus::InChain) => {
539				// Strip justifications when re-importing an existing block.
540				let _justifications = block.justifications.take();
541				return (&*self.inner).import_block(block).await;
542			},
543			Ok(BlockStatus::Unknown) => {},
544			Err(e) => return Err(ConsensusError::ClientImport(e.to_string())),
545		}
546
547		if block.with_state() {
548			return self.import_state(block).await;
549		}
550
551		if number <= self.inner.info().finalized_number {
552			// Importing an old block. Just save justifications and authority set changes
553			if self.check_new_change(&block.header, hash).is_some() {
554				if block.justifications.is_none() {
555					return Err(ConsensusError::ClientImport(
556						"Justification required when importing \
557							an old block with authority set change."
558							.into(),
559					));
560				}
561				let mut authority_set = self.authority_set.inner_locked();
562				authority_set.authority_set_changes.insert(number);
563				crate::aux_schema::update_authority_set::<Block, _, _>(
564					&authority_set,
565					None,
566					|insert| {
567						block
568							.auxiliary
569							.extend(insert.iter().map(|(k, v)| (k.to_vec(), Some(v.to_vec()))))
570					},
571				);
572			}
573			return (&*self.inner).import_block(block).await;
574		}
575
576		// on initial sync we will restrict logging under info to avoid spam.
577		let initial_sync = block.origin == BlockOrigin::NetworkInitialSync;
578
579		let pending_changes = self.make_authorities_changes(&mut block, hash, initial_sync)?;
580
581		// we don't want to finalize on `inner.import_block`
582		let mut justifications = block.justifications.take();
583		let import_result = (&*self.inner).import_block(block).await;
584
585		let mut imported_aux = {
586			match import_result {
587				Ok(ImportResult::Imported(aux)) => aux,
588				Ok(r) => {
589					debug!(
590						target: LOG_TARGET,
591						"Restoring old authority set after block import result: {:?}", r,
592					);
593					pending_changes.revert();
594					return Ok(r);
595				},
596				Err(e) => {
597					debug!(
598						target: LOG_TARGET,
599						"Restoring old authority set after block import error: {}", e,
600					);
601					pending_changes.revert();
602					return Err(ConsensusError::ClientImport(e.to_string()));
603				},
604			}
605		};
606
607		let (applied_changes, do_pause) = pending_changes.defuse();
608
609		// Send the pause signal after import but BEFORE sending a `ChangeAuthorities` message.
610		if do_pause {
611			let _ = self.send_voter_commands.unbounded_send(VoterCommand::Pause(
612				"Forced change scheduled after inactivity".to_string(),
613			));
614		}
615
616		let needs_justification = applied_changes.needs_justification();
617
618		match applied_changes {
619			AppliedChanges::Forced(new) => {
620				// NOTE: when we do a force change we are "discrediting" the old set so we
621				// ignore any justifications from them. this block may contain a justification
622				// which should be checked and imported below against the new authority
623				// triggered by this forced change. the new grandpa voter will start at the
624				// last median finalized block (which is before the block that enacts the
625				// change), full nodes syncing the chain will not be able to successfully
626				// import justifications for those blocks since their local authority set view
627				// is still of the set before the forced change was enacted, still after #1867
628				// they should import the block and discard the justification, and they will
629				// then request a justification from sync if it's necessary (which they should
630				// then be able to successfully validate).
631				let _ =
632					self.send_voter_commands.unbounded_send(VoterCommand::ChangeAuthorities(new));
633
634				// we must clear all pending justifications requests, presumably they won't be
635				// finalized hence why this forced changes was triggered
636				imported_aux.clear_justification_requests = true;
637			},
638			AppliedChanges::Standard(false) => {
639				// we can't apply this change yet since there are other dependent changes that we
640				// need to apply first, drop any justification that might have been provided with
641				// the block to make sure we request them from `sync` which will ensure they'll be
642				// applied in-order.
643				justifications.take();
644			},
645			_ => {},
646		}
647
648		let grandpa_justification =
649			justifications.and_then(|just| just.into_justification(GRANDPA_ENGINE_ID));
650
651		match grandpa_justification {
652			Some(justification) => {
653				if environment::should_process_justification(
654					&*self.inner,
655					self.justification_import_period,
656					number,
657					needs_justification,
658				) {
659					let import_res = self.import_justification(
660						hash,
661						number,
662						(GRANDPA_ENGINE_ID, justification),
663						needs_justification,
664						initial_sync,
665					);
666
667					import_res.unwrap_or_else(|err| {
668						if needs_justification {
669							debug!(
670								target: LOG_TARGET,
671								"Requesting justification from peers due to imported block #{} that enacts authority set change with invalid justification: {}",
672								number,
673								err
674							);
675							imported_aux.bad_justification = true;
676							imported_aux.needs_justification = true;
677						}
678					});
679				} else {
680					debug!(
681						target: LOG_TARGET,
682						"Ignoring unnecessary justification for block #{}",
683						number,
684					);
685				}
686			},
687			None => {
688				if needs_justification {
689					debug!(
690						target: LOG_TARGET,
691						"Imported unjustified block #{} that enacts authority set change, waiting for finality for enactment.",
692						number,
693					);
694
695					imported_aux.needs_justification = true;
696				}
697			},
698		}
699
700		Ok(ImportResult::Imported(imported_aux))
701	}
702
703	async fn check_block(
704		&self,
705		block: BlockCheckParams<Block>,
706	) -> Result<ImportResult, Self::Error> {
707		self.inner.check_block(block).await
708	}
709}
710
711impl<Backend, Block: BlockT, Client, SC> GrandpaBlockImport<Backend, Block, Client, SC> {
712	pub(crate) fn new(
713		inner: Arc<Client>,
714		justification_import_period: u32,
715		select_chain: SC,
716		authority_set: SharedAuthoritySet<Block::Hash, NumberFor<Block>>,
717		send_voter_commands: TracingUnboundedSender<VoterCommand<Block::Hash, NumberFor<Block>>>,
718		authority_set_hard_forks: Vec<(SetId, PendingChange<Block::Hash, NumberFor<Block>>)>,
719		justification_sender: GrandpaJustificationSender<Block>,
720		telemetry: Option<TelemetryHandle>,
721	) -> GrandpaBlockImport<Backend, Block, Client, SC> {
722		// check for and apply any forced authority set hard fork that applies
723		// to the *current* authority set.
724		if let Some((_, change)) = authority_set_hard_forks
725			.iter()
726			.find(|(set_id, _)| *set_id == authority_set.set_id())
727		{
728			authority_set.inner().current_authorities = change.next_authorities.clone();
729		}
730
731		// index authority set hard forks by block hash so that they can be used
732		// by any node syncing the chain and importing a block hard fork
733		// authority set changes.
734		let authority_set_hard_forks = authority_set_hard_forks
735			.into_iter()
736			.map(|(_, change)| (change.canon_hash, change))
737			.collect::<HashMap<_, _>>();
738
739		// check for and apply any forced authority set hard fork that apply to
740		// any *pending* standard changes, checking by the block hash at which
741		// they were announced.
742		{
743			let mut authority_set = authority_set.inner();
744
745			authority_set.pending_standard_changes =
746				authority_set.pending_standard_changes.clone().map(&mut |hash, _, original| {
747					authority_set_hard_forks.get(hash).cloned().unwrap_or(original)
748				});
749		}
750
751		GrandpaBlockImport {
752			inner,
753			justification_import_period,
754			select_chain,
755			authority_set,
756			send_voter_commands,
757			authority_set_hard_forks: Mutex::new(authority_set_hard_forks),
758			justification_sender,
759			telemetry,
760			_phantom: PhantomData,
761		}
762	}
763}
764
765impl<BE, Block: BlockT, Client, SC> GrandpaBlockImport<BE, Block, Client, SC>
766where
767	BE: Backend<Block>,
768	Client: ClientForGrandpa<Block, BE>,
769	NumberFor<Block>: finality_grandpa::BlockNumberOps,
770{
771	/// Import a block justification and finalize the block.
772	///
773	/// If `enacts_change` is set to true, then finalizing this block *must*
774	/// enact an authority set change, the function will panic otherwise.
775	fn import_justification(
776		&self,
777		hash: Block::Hash,
778		number: NumberFor<Block>,
779		justification: Justification,
780		enacts_change: bool,
781		initial_sync: bool,
782	) -> Result<(), ConsensusError> {
783		if justification.0 != GRANDPA_ENGINE_ID {
784			// TODO: the import queue needs to be refactored to be able dispatch to the correct
785			// `JustificationImport` instance based on `ConsensusEngineId`, or we need to build a
786			// justification import pipeline similar to what we do for `BlockImport`. In the
787			// meantime we'll just drop the justification, since this is only used for BEEFY which
788			// is still WIP.
789			return Ok(());
790		}
791
792		let justification = GrandpaJustification::decode_and_verify_finalizes(
793			&justification.1,
794			(hash, number),
795			self.authority_set.set_id(),
796			&self.authority_set.current_authorities(),
797		);
798
799		let justification = match justification {
800			Err(e) => {
801				return match e {
802					soil_client::blockchain::Error::OutdatedJustification => {
803						Err(ConsensusError::OutdatedJustification)
804					},
805					_ => Err(ConsensusError::ClientImport(e.to_string())),
806				};
807			},
808			Ok(justification) => justification,
809		};
810
811		let result = environment::finalize_block(
812			self.inner.clone(),
813			&self.authority_set,
814			None,
815			hash,
816			number,
817			justification.into(),
818			initial_sync,
819			Some(&self.justification_sender),
820			self.telemetry.clone(),
821		);
822
823		match result {
824			Err(CommandOrError::VoterCommand(command)) => {
825				grandpa_log!(
826					initial_sync,
827					"👴 Imported justification for block #{} that triggers \
828					command {}, signaling voter.",
829					number,
830					command,
831				);
832
833				// send the command to the voter
834				let _ = self.send_voter_commands.unbounded_send(command);
835			},
836			Err(CommandOrError::Error(e)) => {
837				return Err(match e {
838					Error::Grandpa(error) => ConsensusError::ClientImport(error.to_string()),
839					Error::Network(error) => ConsensusError::ClientImport(error),
840					Error::Blockchain(error) => ConsensusError::ClientImport(error),
841					Error::Client(error) => ConsensusError::ClientImport(error.to_string()),
842					Error::Safety(error) => ConsensusError::ClientImport(error),
843					Error::Signing(error) => ConsensusError::ClientImport(error),
844					Error::Timer(error) => ConsensusError::ClientImport(error.to_string()),
845					Error::RuntimeApi(error) => ConsensusError::ClientImport(error.to_string()),
846				})
847			},
848			Ok(_) => {
849				assert!(
850					!enacts_change,
851					"returns Ok when no authority set change should be enacted; qed;"
852				);
853			},
854		}
855
856		Ok(())
857	}
858}