1use std::{collections::HashMap, marker::PhantomData, sync::Arc};
20
21use codec::Decode;
22use log::debug;
23use parking_lot::Mutex;
24
25use sc_client_api::{backend::Backend, utils::is_descendent_of};
26use sc_consensus::{
27 shared_data::{SharedDataLocked, SharedDataLockedUpgradable},
28 BlockCheckParams, BlockImport, BlockImportParams, ImportResult, JustificationImport,
29};
30use sc_telemetry::TelemetryHandle;
31use sc_utils::mpsc::TracingUnboundedSender;
32use sp_api::{Core, RuntimeApiInfo};
33use sp_blockchain::BlockStatus;
34use sp_consensus::{BlockOrigin, Error as ConsensusError, SelectChain};
35use sp_consensus_grandpa::{ConsensusLog, GrandpaApi, ScheduledChange, SetId, GRANDPA_ENGINE_ID};
36use sp_runtime::{
37 generic::OpaqueDigestItemId,
38 traits::{Block as BlockT, Header as HeaderT, NumberFor, Zero},
39 Justification,
40};
41
42use crate::{
43 authorities::{AuthoritySet, DelayKind, PendingChange, SharedAuthoritySet},
44 environment,
45 justification::GrandpaJustification,
46 notification::GrandpaJustificationSender,
47 AuthoritySetChanges, ClientForGrandpa, CommandOrError, Error, NewAuthoritySet, VoterCommand,
48 LOG_TARGET,
49};
50
51pub struct GrandpaBlockImport<Backend, Block: BlockT, Client, SC> {
61 inner: Arc<Client>,
62 justification_import_period: u32,
63 select_chain: SC,
64 authority_set: SharedAuthoritySet<Block::Hash, NumberFor<Block>>,
65 send_voter_commands: TracingUnboundedSender<VoterCommand<Block::Hash, NumberFor<Block>>>,
66 authority_set_hard_forks:
67 Mutex<HashMap<Block::Hash, PendingChange<Block::Hash, NumberFor<Block>>>>,
68 justification_sender: GrandpaJustificationSender<Block>,
69 telemetry: Option<TelemetryHandle>,
70 _phantom: PhantomData<Backend>,
71}
72
73impl<Backend, Block: BlockT, Client, SC: Clone> Clone
74 for GrandpaBlockImport<Backend, Block, Client, SC>
75{
76 fn clone(&self) -> Self {
77 GrandpaBlockImport {
78 inner: self.inner.clone(),
79 justification_import_period: self.justification_import_period,
80 select_chain: self.select_chain.clone(),
81 authority_set: self.authority_set.clone(),
82 send_voter_commands: self.send_voter_commands.clone(),
83 authority_set_hard_forks: Mutex::new(self.authority_set_hard_forks.lock().clone()),
84 justification_sender: self.justification_sender.clone(),
85 telemetry: self.telemetry.clone(),
86 _phantom: PhantomData,
87 }
88 }
89}
90
91#[async_trait::async_trait]
92impl<BE, Block: BlockT, Client, SC> JustificationImport<Block>
93 for GrandpaBlockImport<BE, Block, Client, SC>
94where
95 NumberFor<Block>: finality_grandpa::BlockNumberOps,
96 BE: Backend<Block>,
97 Client: ClientForGrandpa<Block, BE>,
98 SC: SelectChain<Block>,
99{
100 type Error = ConsensusError;
101
102 async fn on_start(&mut self) -> Vec<(Block::Hash, NumberFor<Block>)> {
103 let mut out = Vec::new();
104 let chain_info = self.inner.info();
105
106 let pending_changes: Vec<_> =
109 self.authority_set.inner().pending_changes().cloned().collect();
110
111 for pending_change in pending_changes {
112 if pending_change.delay_kind == DelayKind::Finalized &&
113 pending_change.effective_number() > chain_info.finalized_number &&
114 pending_change.effective_number() <= chain_info.best_number
115 {
116 let effective_block_hash = if !pending_change.delay.is_zero() {
117 self.select_chain
118 .finality_target(
119 pending_change.canon_hash,
120 Some(pending_change.effective_number()),
121 )
122 .await
123 } else {
124 Ok(pending_change.canon_hash)
125 };
126
127 if let Ok(hash) = effective_block_hash {
128 if let Ok(Some(header)) = self.inner.header(hash) {
129 if *header.number() == pending_change.effective_number() {
130 out.push((header.hash(), *header.number()));
131 }
132 }
133 }
134 }
135 }
136
137 out
138 }
139
140 async fn import_justification(
141 &mut self,
142 hash: Block::Hash,
143 number: NumberFor<Block>,
144 justification: Justification,
145 ) -> Result<(), Self::Error> {
146 GrandpaBlockImport::import_justification(self, hash, number, justification, false, false)
152 }
153}
154
155enum AppliedChanges<H, N> {
156 Standard(bool), Forced(NewAuthoritySet<H, N>),
158 None,
159}
160
161impl<H, N> AppliedChanges<H, N> {
162 fn needs_justification(&self) -> bool {
163 match *self {
164 AppliedChanges::Standard(_) => true,
165 AppliedChanges::Forced(_) | AppliedChanges::None => false,
166 }
167 }
168}
169
170struct PendingSetChanges<Block: BlockT> {
171 just_in_case: Option<(
172 AuthoritySet<Block::Hash, NumberFor<Block>>,
173 SharedDataLockedUpgradable<AuthoritySet<Block::Hash, NumberFor<Block>>>,
174 )>,
175 applied_changes: AppliedChanges<Block::Hash, NumberFor<Block>>,
176 do_pause: bool,
177}
178
179impl<Block: BlockT> PendingSetChanges<Block> {
180 fn revert(self) {}
182
183 fn defuse(mut self) -> (AppliedChanges<Block::Hash, NumberFor<Block>>, bool) {
184 self.just_in_case = None;
185 let applied_changes = std::mem::replace(&mut self.applied_changes, AppliedChanges::None);
186 (applied_changes, self.do_pause)
187 }
188}
189
190impl<Block: BlockT> Drop for PendingSetChanges<Block> {
191 fn drop(&mut self) {
192 if let Some((old_set, mut authorities)) = self.just_in_case.take() {
193 *authorities.upgrade() = old_set;
194 }
195 }
196}
197
198pub fn find_scheduled_change<B: BlockT>(
201 header: &B::Header,
202) -> Option<ScheduledChange<NumberFor<B>>> {
203 let id = OpaqueDigestItemId::Consensus(&GRANDPA_ENGINE_ID);
204
205 let filter_log = |log: ConsensusLog<NumberFor<B>>| match log {
206 ConsensusLog::ScheduledChange(change) => Some(change),
207 _ => None,
208 };
209
210 header.digest().convert_first(|l| l.try_to(id).and_then(filter_log))
213}
214
215pub fn find_forced_change<B: BlockT>(
218 header: &B::Header,
219) -> Option<(NumberFor<B>, ScheduledChange<NumberFor<B>>)> {
220 let id = OpaqueDigestItemId::Consensus(&GRANDPA_ENGINE_ID);
221
222 let filter_log = |log: ConsensusLog<NumberFor<B>>| match log {
223 ConsensusLog::ForcedChange(delay, change) => Some((delay, change)),
224 _ => None,
225 };
226
227 header.digest().convert_first(|l| l.try_to(id).and_then(filter_log))
230}
231
232impl<BE, Block: BlockT, Client, SC> GrandpaBlockImport<BE, Block, Client, SC>
233where
234 NumberFor<Block>: finality_grandpa::BlockNumberOps,
235 BE: Backend<Block>,
236 Client: ClientForGrandpa<Block, BE>,
237 Client::Api: GrandpaApi<Block>,
238 for<'a> &'a Client: BlockImport<Block, Error = ConsensusError>,
239{
240 fn check_new_change(
242 &self,
243 header: &Block::Header,
244 hash: Block::Hash,
245 ) -> Option<PendingChange<Block::Hash, NumberFor<Block>>> {
246 if let Some(change) = self.authority_set_hard_forks.lock().get(&hash) {
248 return Some(change.clone());
249 }
250
251 if let Some((median_last_finalized, change)) = find_forced_change::<Block>(header) {
253 return Some(PendingChange {
254 next_authorities: change.next_authorities,
255 delay: change.delay,
256 canon_height: *header.number(),
257 canon_hash: hash,
258 delay_kind: DelayKind::Best { median_last_finalized },
259 });
260 }
261
262 let change = find_scheduled_change::<Block>(header)?;
264 Some(PendingChange {
265 next_authorities: change.next_authorities,
266 delay: change.delay,
267 canon_height: *header.number(),
268 canon_hash: hash,
269 delay_kind: DelayKind::Finalized,
270 })
271 }
272
273 fn make_authorities_changes(
274 &self,
275 block: &mut BlockImportParams<Block>,
276 hash: Block::Hash,
277 initial_sync: bool,
278 ) -> Result<PendingSetChanges<Block>, ConsensusError> {
279 if block.origin == BlockOrigin::WarpSync {
283 return Ok(PendingSetChanges {
284 just_in_case: None,
285 applied_changes: AppliedChanges::None,
286 do_pause: false,
287 });
288 }
289
290 struct InnerGuard<'a, H, N> {
294 old: Option<AuthoritySet<H, N>>,
295 guard: Option<SharedDataLocked<'a, AuthoritySet<H, N>>>,
296 }
297
298 impl<'a, H, N> InnerGuard<'a, H, N> {
299 fn as_mut(&mut self) -> &mut AuthoritySet<H, N> {
300 self.guard.as_mut().expect("only taken on deconstruction; qed")
301 }
302
303 fn set_old(&mut self, old: AuthoritySet<H, N>) {
304 if self.old.is_none() {
305 self.old = Some(old);
307 }
308 }
309
310 fn consume(
311 mut self,
312 ) -> Option<(AuthoritySet<H, N>, SharedDataLocked<'a, AuthoritySet<H, N>>)> {
313 self.old
314 .take()
315 .map(|old| (old, self.guard.take().expect("only taken on deconstruction; qed")))
316 }
317 }
318
319 impl<'a, H, N> Drop for InnerGuard<'a, H, N> {
320 fn drop(&mut self) {
321 if let (Some(mut guard), Some(old)) = (self.guard.take(), self.old.take()) {
322 *guard = old;
323 }
324 }
325 }
326
327 let number = *(block.header.number());
328 let maybe_change = self.check_new_change(&block.header, hash);
329
330 let parent_hash = *block.header.parent_hash();
333 let is_descendent_of = is_descendent_of(&*self.inner, Some((hash, parent_hash)));
334
335 let mut guard = InnerGuard { guard: Some(self.authority_set.inner_locked()), old: None };
336
337 let mut do_pause = false;
340
341 if let Some(change) = maybe_change {
343 let old = guard.as_mut().clone();
344 guard.set_old(old);
345
346 if let DelayKind::Best { .. } = change.delay_kind {
347 do_pause = true;
348 }
349
350 guard
351 .as_mut()
352 .add_pending_change(change, &is_descendent_of)
353 .map_err(|e| ConsensusError::ClientImport(e.to_string()))?;
354 }
355
356 let applied_changes = {
357 let forced_change_set = guard
358 .as_mut()
359 .apply_forced_changes(
360 hash,
361 number,
362 &is_descendent_of,
363 initial_sync,
364 self.telemetry.clone(),
365 )
366 .map_err(|e| ConsensusError::ClientImport(e.to_string()))
367 .map_err(ConsensusError::from)?;
368
369 if let Some((median_last_finalized_number, new_set)) = forced_change_set {
370 let new_authorities = {
371 let (set_id, new_authorities) = new_set.current();
372
373 let best_finalized_number = self.inner.info().finalized_number;
378 let canon_number = best_finalized_number.min(median_last_finalized_number);
379 let canon_hash = self.inner.hash(canon_number)
380 .map_err(|e| ConsensusError::ClientImport(e.to_string()))?
381 .expect(
382 "the given block number is less or equal than the current best finalized number; \
383 current best finalized number must exist in chain; qed."
384 );
385
386 NewAuthoritySet {
387 canon_number,
388 canon_hash,
389 set_id,
390 authorities: new_authorities.to_vec(),
391 }
392 };
393 let old = ::std::mem::replace(guard.as_mut(), new_set);
394 guard.set_old(old);
395
396 AppliedChanges::Forced(new_authorities)
397 } else {
398 let did_standard = guard
399 .as_mut()
400 .enacts_standard_change(hash, number, &is_descendent_of)
401 .map_err(|e| ConsensusError::ClientImport(e.to_string()))
402 .map_err(ConsensusError::from)?;
403
404 if let Some(root) = did_standard {
405 AppliedChanges::Standard(root)
406 } else {
407 AppliedChanges::None
408 }
409 }
410 };
411
412 let just_in_case = guard.consume();
414 if let Some((_, ref authorities)) = just_in_case {
415 let authorities_change = match applied_changes {
416 AppliedChanges::Forced(ref new) => Some(new),
417 AppliedChanges::Standard(_) => None, AppliedChanges::None => None,
419 };
420
421 crate::aux_schema::update_authority_set::<Block, _, _>(
422 authorities,
423 authorities_change,
424 |insert| {
425 block
426 .auxiliary
427 .extend(insert.iter().map(|(k, v)| (k.to_vec(), Some(v.to_vec()))))
428 },
429 );
430 }
431
432 let just_in_case = just_in_case.map(|(o, i)| (o, i.release_mutex()));
433
434 Ok(PendingSetChanges { just_in_case, applied_changes, do_pause })
435 }
436
437 fn current_set_id(&self, hash: Block::Hash) -> Result<SetId, ConsensusError> {
439 let runtime_version = self.inner.runtime_api().version(hash).map_err(|e| {
440 ConsensusError::ClientImport(format!(
441 "Unable to retrieve current runtime version. {}",
442 e
443 ))
444 })?;
445
446 if runtime_version
447 .api_version(&<dyn GrandpaApi<Block>>::ID)
448 .map_or(false, |v| v < 3)
449 {
450 for prefix in ["GrandpaFinality", "Grandpa"] {
453 let k = [
454 sp_crypto_hashing::twox_128(prefix.as_bytes()),
455 sp_crypto_hashing::twox_128(b"CurrentSetId"),
456 ]
457 .concat();
458 if let Ok(Some(id)) =
459 self.inner.storage(hash, &sc_client_api::StorageKey(k.to_vec()))
460 {
461 if let Ok(id) = SetId::decode(&mut id.0.as_ref()) {
462 return Ok(id);
463 }
464 }
465 }
466 Err(ConsensusError::ClientImport("Unable to retrieve current set id.".into()))
467 } else {
468 self.inner
469 .runtime_api()
470 .current_set_id(hash)
471 .map_err(|e| ConsensusError::ClientImport(e.to_string()))
472 }
473 }
474
475 async fn import_state(
477 &self,
478 mut block: BlockImportParams<Block>,
479 ) -> Result<ImportResult, ConsensusError> {
480 let hash = block.post_hash();
481 let number = *block.header.number();
482 block.finalized = true;
484 let import_result = (&*self.inner).import_block(block).await;
485 match import_result {
486 Ok(ImportResult::Imported(aux)) => {
487 self.authority_set_hard_forks.lock().clear();
491 let authorities = self
492 .inner
493 .runtime_api()
494 .grandpa_authorities(hash)
495 .map_err(|e| ConsensusError::ClientImport(e.to_string()))?;
496 let set_id = self.current_set_id(hash)?;
497 let authority_set = AuthoritySet::new(
498 authorities.clone(),
499 set_id,
500 fork_tree::ForkTree::new(),
501 Vec::new(),
502 AuthoritySetChanges::empty(),
503 )
504 .ok_or_else(|| ConsensusError::ClientImport("Invalid authority list".into()))?;
505 *self.authority_set.inner_locked() = authority_set.clone();
506
507 crate::aux_schema::update_authority_set::<Block, _, _>(
508 &authority_set,
509 None,
510 |insert| self.inner.insert_aux(insert, []),
511 )
512 .map_err(|e| ConsensusError::ClientImport(e.to_string()))?;
513 let new_set =
514 NewAuthoritySet { canon_number: number, canon_hash: hash, set_id, authorities };
515 let _ = self
516 .send_voter_commands
517 .unbounded_send(VoterCommand::ChangeAuthorities(new_set));
518 Ok(ImportResult::Imported(aux))
519 },
520 Ok(r) => Ok(r),
521 Err(e) => Err(ConsensusError::ClientImport(e.to_string())),
522 }
523 }
524}
525
526#[async_trait::async_trait]
527impl<BE, Block: BlockT, Client, SC> BlockImport<Block> for GrandpaBlockImport<BE, Block, Client, SC>
528where
529 NumberFor<Block>: finality_grandpa::BlockNumberOps,
530 BE: Backend<Block>,
531 Client: ClientForGrandpa<Block, BE>,
532 Client::Api: GrandpaApi<Block>,
533 for<'a> &'a Client: BlockImport<Block, Error = ConsensusError>,
534 SC: Send + Sync,
535{
536 type Error = ConsensusError;
537
538 async fn import_block(
539 &self,
540 mut block: BlockImportParams<Block>,
541 ) -> Result<ImportResult, Self::Error> {
542 let hash = block.post_hash();
543 let number = *block.header.number();
544
545 match self.inner.status(hash) {
548 Ok(BlockStatus::InChain) => {
549 let _justifications = block.justifications.take();
551 return (&*self.inner).import_block(block).await;
552 },
553 Ok(BlockStatus::Unknown) => {},
554 Err(e) => return Err(ConsensusError::ClientImport(e.to_string())),
555 }
556
557 if block.with_state() {
558 return self.import_state(block).await;
559 }
560
561 if number <= self.inner.info().finalized_number {
562 if self.check_new_change(&block.header, hash).is_some() {
564 if block.justifications.is_none() {
565 return Err(ConsensusError::ClientImport(
566 "Justification required when importing \
567 an old block with authority set change."
568 .into(),
569 ));
570 }
571 let mut authority_set = self.authority_set.inner_locked();
572 authority_set.authority_set_changes.insert(number);
573 crate::aux_schema::update_authority_set::<Block, _, _>(
574 &authority_set,
575 None,
576 |insert| {
577 block
578 .auxiliary
579 .extend(insert.iter().map(|(k, v)| (k.to_vec(), Some(v.to_vec()))))
580 },
581 );
582 }
583 return (&*self.inner).import_block(block).await;
584 }
585
586 let initial_sync = block.origin == BlockOrigin::NetworkInitialSync;
588
589 let pending_changes = self.make_authorities_changes(&mut block, hash, initial_sync)?;
590
591 let mut justifications = block.justifications.take();
593 let import_result = (&*self.inner).import_block(block).await;
594
595 let mut imported_aux = {
596 match import_result {
597 Ok(ImportResult::Imported(aux)) => aux,
598 Ok(r) => {
599 debug!(
600 target: LOG_TARGET,
601 "Restoring old authority set after block import result: {:?}", r,
602 );
603 pending_changes.revert();
604 return Ok(r);
605 },
606 Err(e) => {
607 debug!(
608 target: LOG_TARGET,
609 "Restoring old authority set after block import error: {}", e,
610 );
611 pending_changes.revert();
612 return Err(ConsensusError::ClientImport(e.to_string()));
613 },
614 }
615 };
616
617 let (applied_changes, do_pause) = pending_changes.defuse();
618
619 if do_pause {
621 let _ = self.send_voter_commands.unbounded_send(VoterCommand::Pause(
622 "Forced change scheduled after inactivity".to_string(),
623 ));
624 }
625
626 let needs_justification = applied_changes.needs_justification();
627
628 match applied_changes {
629 AppliedChanges::Forced(new) => {
630 let _ =
642 self.send_voter_commands.unbounded_send(VoterCommand::ChangeAuthorities(new));
643
644 imported_aux.clear_justification_requests = true;
647 },
648 AppliedChanges::Standard(false) => {
649 justifications.take();
654 },
655 _ => {},
656 }
657
658 let grandpa_justification =
659 justifications.and_then(|just| just.into_justification(GRANDPA_ENGINE_ID));
660
661 match grandpa_justification {
662 Some(justification) => {
663 if environment::should_process_justification(
664 &*self.inner,
665 self.justification_import_period,
666 number,
667 needs_justification,
668 ) {
669 let import_res = self.import_justification(
670 hash,
671 number,
672 (GRANDPA_ENGINE_ID, justification),
673 needs_justification,
674 initial_sync,
675 );
676
677 import_res.unwrap_or_else(|err| {
678 if needs_justification {
679 debug!(
680 target: LOG_TARGET,
681 "Requesting justification from peers due to imported block #{} that enacts authority set change with invalid justification: {}",
682 number,
683 err
684 );
685 imported_aux.bad_justification = true;
686 imported_aux.needs_justification = true;
687 }
688 });
689 } else {
690 debug!(
691 target: LOG_TARGET,
692 "Ignoring unnecessary justification for block #{}",
693 number,
694 );
695 }
696 },
697 None => {
698 if needs_justification {
699 debug!(
700 target: LOG_TARGET,
701 "Imported unjustified block #{} that enacts authority set change, waiting for finality for enactment.",
702 number,
703 );
704
705 imported_aux.needs_justification = true;
706 }
707 },
708 }
709
710 Ok(ImportResult::Imported(imported_aux))
711 }
712
713 async fn check_block(
714 &self,
715 block: BlockCheckParams<Block>,
716 ) -> Result<ImportResult, Self::Error> {
717 self.inner.check_block(block).await
718 }
719}
720
721impl<Backend, Block: BlockT, Client, SC> GrandpaBlockImport<Backend, Block, Client, SC> {
722 pub(crate) fn new(
723 inner: Arc<Client>,
724 justification_import_period: u32,
725 select_chain: SC,
726 authority_set: SharedAuthoritySet<Block::Hash, NumberFor<Block>>,
727 send_voter_commands: TracingUnboundedSender<VoterCommand<Block::Hash, NumberFor<Block>>>,
728 authority_set_hard_forks: Vec<(SetId, PendingChange<Block::Hash, NumberFor<Block>>)>,
729 justification_sender: GrandpaJustificationSender<Block>,
730 telemetry: Option<TelemetryHandle>,
731 ) -> GrandpaBlockImport<Backend, Block, Client, SC> {
732 if let Some((_, change)) = authority_set_hard_forks
735 .iter()
736 .find(|(set_id, _)| *set_id == authority_set.set_id())
737 {
738 authority_set.inner().current_authorities = change.next_authorities.clone();
739 }
740
741 let authority_set_hard_forks = authority_set_hard_forks
745 .into_iter()
746 .map(|(_, change)| (change.canon_hash, change))
747 .collect::<HashMap<_, _>>();
748
749 {
753 let mut authority_set = authority_set.inner();
754
755 authority_set.pending_standard_changes =
756 authority_set.pending_standard_changes.clone().map(&mut |hash, _, original| {
757 authority_set_hard_forks.get(hash).cloned().unwrap_or(original)
758 });
759 }
760
761 GrandpaBlockImport {
762 inner,
763 justification_import_period,
764 select_chain,
765 authority_set,
766 send_voter_commands,
767 authority_set_hard_forks: Mutex::new(authority_set_hard_forks),
768 justification_sender,
769 telemetry,
770 _phantom: PhantomData,
771 }
772 }
773}
774
775impl<BE, Block: BlockT, Client, SC> GrandpaBlockImport<BE, Block, Client, SC>
776where
777 BE: Backend<Block>,
778 Client: ClientForGrandpa<Block, BE>,
779 NumberFor<Block>: finality_grandpa::BlockNumberOps,
780{
781 fn import_justification(
786 &self,
787 hash: Block::Hash,
788 number: NumberFor<Block>,
789 justification: Justification,
790 enacts_change: bool,
791 initial_sync: bool,
792 ) -> Result<(), ConsensusError> {
793 if justification.0 != GRANDPA_ENGINE_ID {
794 return Ok(());
800 }
801
802 let justification = GrandpaJustification::decode_and_verify_finalizes(
803 &justification.1,
804 (hash, number),
805 self.authority_set.set_id(),
806 &self.authority_set.current_authorities(),
807 );
808
809 let justification = match justification {
810 Err(e) => {
811 return match e {
812 sp_blockchain::Error::OutdatedJustification => {
813 Err(ConsensusError::OutdatedJustification)
814 },
815 _ => Err(ConsensusError::ClientImport(e.to_string())),
816 };
817 },
818 Ok(justification) => justification,
819 };
820
821 let result = environment::finalize_block(
822 self.inner.clone(),
823 &self.authority_set,
824 None,
825 hash,
826 number,
827 justification.into(),
828 initial_sync,
829 Some(&self.justification_sender),
830 self.telemetry.clone(),
831 );
832
833 match result {
834 Err(CommandOrError::VoterCommand(command)) => {
835 grandpa_log!(
836 initial_sync,
837 "👴 Imported justification for block #{} that triggers \
838 command {}, signaling voter.",
839 number,
840 command,
841 );
842
843 let _ = self.send_voter_commands.unbounded_send(command);
845 },
846 Err(CommandOrError::Error(e)) => {
847 return Err(match e {
848 Error::Grandpa(error) => ConsensusError::ClientImport(error.to_string()),
849 Error::Network(error) => ConsensusError::ClientImport(error),
850 Error::Blockchain(error) => ConsensusError::ClientImport(error),
851 Error::Client(error) => ConsensusError::ClientImport(error.to_string()),
852 Error::Safety(error) => ConsensusError::ClientImport(error),
853 Error::Signing(error) => ConsensusError::ClientImport(error),
854 Error::Timer(error) => ConsensusError::ClientImport(error.to_string()),
855 Error::RuntimeApi(error) => ConsensusError::ClientImport(error.to_string()),
856 })
857 },
858 Ok(_) => {
859 assert!(
860 !enacts_change,
861 "returns Ok when no authority set change should be enacted; qed;"
862 );
863 },
864 }
865
866 Ok(())
867 }
868}