1use std::{collections::HashMap, marker::PhantomData, sync::Arc};
8
9use codec::Decode;
10use log::debug;
11use parking_lot::Mutex;
12
13use soil_client::blockchain::BlockStatus;
14use soil_client::client_api::{backend::Backend, utils::is_descendent_of};
15use soil_client::consensus::{BlockOrigin, Error as ConsensusError, SelectChain};
16use soil_client::import::{
17 BlockCheckParams, BlockImport, BlockImportParams, ImportResult, JustificationImport,
18};
19use soil_client::utils::mpsc::TracingUnboundedSender;
20use soil_consensus::shared_data::{SharedDataLocked, SharedDataLockedUpgradable};
21use soil_telemetry::TelemetryHandle;
22use subsoil::api::{Core, RuntimeApiInfo};
23use subsoil::consensus::grandpa::{
24 ConsensusLog, GrandpaApi, ScheduledChange, SetId, GRANDPA_ENGINE_ID,
25};
26use subsoil::runtime::{
27 generic::OpaqueDigestItemId,
28 traits::{Block as BlockT, Header as HeaderT, NumberFor, Zero},
29 Justification,
30};
31
32use crate::{
33 authorities::{AuthoritySet, DelayKind, PendingChange, SharedAuthoritySet},
34 environment,
35 justification::GrandpaJustification,
36 notification::GrandpaJustificationSender,
37 AuthoritySetChanges, ClientForGrandpa, CommandOrError, Error, NewAuthoritySet, VoterCommand,
38 LOG_TARGET,
39};
40
41pub struct GrandpaBlockImport<Backend, Block: BlockT, Client, SC> {
51 inner: Arc<Client>,
52 justification_import_period: u32,
53 select_chain: SC,
54 authority_set: SharedAuthoritySet<Block::Hash, NumberFor<Block>>,
55 send_voter_commands: TracingUnboundedSender<VoterCommand<Block::Hash, NumberFor<Block>>>,
56 authority_set_hard_forks:
57 Mutex<HashMap<Block::Hash, PendingChange<Block::Hash, NumberFor<Block>>>>,
58 justification_sender: GrandpaJustificationSender<Block>,
59 telemetry: Option<TelemetryHandle>,
60 _phantom: PhantomData<Backend>,
61}
62
63impl<Backend, Block: BlockT, Client, SC: Clone> Clone
64 for GrandpaBlockImport<Backend, Block, Client, SC>
65{
66 fn clone(&self) -> Self {
67 GrandpaBlockImport {
68 inner: self.inner.clone(),
69 justification_import_period: self.justification_import_period,
70 select_chain: self.select_chain.clone(),
71 authority_set: self.authority_set.clone(),
72 send_voter_commands: self.send_voter_commands.clone(),
73 authority_set_hard_forks: Mutex::new(self.authority_set_hard_forks.lock().clone()),
74 justification_sender: self.justification_sender.clone(),
75 telemetry: self.telemetry.clone(),
76 _phantom: PhantomData,
77 }
78 }
79}
80
81#[async_trait::async_trait]
82impl<BE, Block: BlockT, Client, SC> JustificationImport<Block>
83 for GrandpaBlockImport<BE, Block, Client, SC>
84where
85 NumberFor<Block>: finality_grandpa::BlockNumberOps,
86 BE: Backend<Block>,
87 Client: ClientForGrandpa<Block, BE>,
88 SC: SelectChain<Block>,
89{
90 type Error = ConsensusError;
91
92 async fn on_start(&mut self) -> Vec<(Block::Hash, NumberFor<Block>)> {
93 let mut out = Vec::new();
94 let chain_info = self.inner.info();
95
96 let pending_changes: Vec<_> =
99 self.authority_set.inner().pending_changes().cloned().collect();
100
101 for pending_change in pending_changes {
102 if pending_change.delay_kind == DelayKind::Finalized
103 && pending_change.effective_number() > chain_info.finalized_number
104 && pending_change.effective_number() <= chain_info.best_number
105 {
106 let effective_block_hash = if !pending_change.delay.is_zero() {
107 self.select_chain
108 .finality_target(
109 pending_change.canon_hash,
110 Some(pending_change.effective_number()),
111 )
112 .await
113 } else {
114 Ok(pending_change.canon_hash)
115 };
116
117 if let Ok(hash) = effective_block_hash {
118 if let Ok(Some(header)) = self.inner.header(hash) {
119 if *header.number() == pending_change.effective_number() {
120 out.push((header.hash(), *header.number()));
121 }
122 }
123 }
124 }
125 }
126
127 out
128 }
129
130 async fn import_justification(
131 &mut self,
132 hash: Block::Hash,
133 number: NumberFor<Block>,
134 justification: Justification,
135 ) -> Result<(), Self::Error> {
136 GrandpaBlockImport::import_justification(self, hash, number, justification, false, false)
142 }
143}
144
145enum AppliedChanges<H, N> {
146 Standard(bool), Forced(NewAuthoritySet<H, N>),
148 None,
149}
150
151impl<H, N> AppliedChanges<H, N> {
152 fn needs_justification(&self) -> bool {
153 match *self {
154 AppliedChanges::Standard(_) => true,
155 AppliedChanges::Forced(_) | AppliedChanges::None => false,
156 }
157 }
158}
159
160struct PendingSetChanges<Block: BlockT> {
161 just_in_case: Option<(
162 AuthoritySet<Block::Hash, NumberFor<Block>>,
163 SharedDataLockedUpgradable<AuthoritySet<Block::Hash, NumberFor<Block>>>,
164 )>,
165 applied_changes: AppliedChanges<Block::Hash, NumberFor<Block>>,
166 do_pause: bool,
167}
168
169impl<Block: BlockT> PendingSetChanges<Block> {
170 fn revert(self) {}
172
173 fn defuse(mut self) -> (AppliedChanges<Block::Hash, NumberFor<Block>>, bool) {
174 self.just_in_case = None;
175 let applied_changes = std::mem::replace(&mut self.applied_changes, AppliedChanges::None);
176 (applied_changes, self.do_pause)
177 }
178}
179
180impl<Block: BlockT> Drop for PendingSetChanges<Block> {
181 fn drop(&mut self) {
182 if let Some((old_set, mut authorities)) = self.just_in_case.take() {
183 *authorities.upgrade() = old_set;
184 }
185 }
186}
187
188pub fn find_scheduled_change<B: BlockT>(
191 header: &B::Header,
192) -> Option<ScheduledChange<NumberFor<B>>> {
193 let id = OpaqueDigestItemId::Consensus(&GRANDPA_ENGINE_ID);
194
195 let filter_log = |log: ConsensusLog<NumberFor<B>>| match log {
196 ConsensusLog::ScheduledChange(change) => Some(change),
197 _ => None,
198 };
199
200 header.digest().convert_first(|l| l.try_to(id).and_then(filter_log))
203}
204
205pub fn find_forced_change<B: BlockT>(
208 header: &B::Header,
209) -> Option<(NumberFor<B>, ScheduledChange<NumberFor<B>>)> {
210 let id = OpaqueDigestItemId::Consensus(&GRANDPA_ENGINE_ID);
211
212 let filter_log = |log: ConsensusLog<NumberFor<B>>| match log {
213 ConsensusLog::ForcedChange(delay, change) => Some((delay, change)),
214 _ => None,
215 };
216
217 header.digest().convert_first(|l| l.try_to(id).and_then(filter_log))
220}
221
222impl<BE, Block: BlockT, Client, SC> GrandpaBlockImport<BE, Block, Client, SC>
223where
224 NumberFor<Block>: finality_grandpa::BlockNumberOps,
225 BE: Backend<Block>,
226 Client: ClientForGrandpa<Block, BE>,
227 Client::Api: GrandpaApi<Block>,
228 for<'a> &'a Client: BlockImport<Block, Error = ConsensusError>,
229{
230 fn check_new_change(
232 &self,
233 header: &Block::Header,
234 hash: Block::Hash,
235 ) -> Option<PendingChange<Block::Hash, NumberFor<Block>>> {
236 if let Some(change) = self.authority_set_hard_forks.lock().get(&hash) {
238 return Some(change.clone());
239 }
240
241 if let Some((median_last_finalized, change)) = find_forced_change::<Block>(header) {
243 return Some(PendingChange {
244 next_authorities: change.next_authorities,
245 delay: change.delay,
246 canon_height: *header.number(),
247 canon_hash: hash,
248 delay_kind: DelayKind::Best { median_last_finalized },
249 });
250 }
251
252 let change = find_scheduled_change::<Block>(header)?;
254 Some(PendingChange {
255 next_authorities: change.next_authorities,
256 delay: change.delay,
257 canon_height: *header.number(),
258 canon_hash: hash,
259 delay_kind: DelayKind::Finalized,
260 })
261 }
262
263 fn make_authorities_changes(
264 &self,
265 block: &mut BlockImportParams<Block>,
266 hash: Block::Hash,
267 initial_sync: bool,
268 ) -> Result<PendingSetChanges<Block>, ConsensusError> {
269 if block.origin == BlockOrigin::WarpSync {
273 return Ok(PendingSetChanges {
274 just_in_case: None,
275 applied_changes: AppliedChanges::None,
276 do_pause: false,
277 });
278 }
279
280 struct InnerGuard<'a, H, N> {
284 old: Option<AuthoritySet<H, N>>,
285 guard: Option<SharedDataLocked<'a, AuthoritySet<H, N>>>,
286 }
287
288 impl<'a, H, N> InnerGuard<'a, H, N> {
289 fn as_mut(&mut self) -> &mut AuthoritySet<H, N> {
290 self.guard.as_mut().expect("only taken on deconstruction; qed")
291 }
292
293 fn set_old(&mut self, old: AuthoritySet<H, N>) {
294 if self.old.is_none() {
295 self.old = Some(old);
297 }
298 }
299
300 fn consume(
301 mut self,
302 ) -> Option<(AuthoritySet<H, N>, SharedDataLocked<'a, AuthoritySet<H, N>>)> {
303 self.old
304 .take()
305 .map(|old| (old, self.guard.take().expect("only taken on deconstruction; qed")))
306 }
307 }
308
309 impl<'a, H, N> Drop for InnerGuard<'a, H, N> {
310 fn drop(&mut self) {
311 if let (Some(mut guard), Some(old)) = (self.guard.take(), self.old.take()) {
312 *guard = old;
313 }
314 }
315 }
316
317 let number = *(block.header.number());
318 let maybe_change = self.check_new_change(&block.header, hash);
319
320 let parent_hash = *block.header.parent_hash();
323 let is_descendent_of = is_descendent_of(&*self.inner, Some((hash, parent_hash)));
324
325 let mut guard = InnerGuard { guard: Some(self.authority_set.inner_locked()), old: None };
326
327 let mut do_pause = false;
330
331 if let Some(change) = maybe_change {
333 let old = guard.as_mut().clone();
334 guard.set_old(old);
335
336 if let DelayKind::Best { .. } = change.delay_kind {
337 do_pause = true;
338 }
339
340 guard
341 .as_mut()
342 .add_pending_change(change, &is_descendent_of)
343 .map_err(|e| ConsensusError::ClientImport(e.to_string()))?;
344 }
345
346 let applied_changes = {
347 let forced_change_set = guard
348 .as_mut()
349 .apply_forced_changes(
350 hash,
351 number,
352 &is_descendent_of,
353 initial_sync,
354 self.telemetry.clone(),
355 )
356 .map_err(|e| ConsensusError::ClientImport(e.to_string()))
357 .map_err(ConsensusError::from)?;
358
359 if let Some((median_last_finalized_number, new_set)) = forced_change_set {
360 let new_authorities = {
361 let (set_id, new_authorities) = new_set.current();
362
363 let best_finalized_number = self.inner.info().finalized_number;
368 let canon_number = best_finalized_number.min(median_last_finalized_number);
369 let canon_hash = self.inner.hash(canon_number)
370 .map_err(|e| ConsensusError::ClientImport(e.to_string()))?
371 .expect(
372 "the given block number is less or equal than the current best finalized number; \
373 current best finalized number must exist in chain; qed."
374 );
375
376 NewAuthoritySet {
377 canon_number,
378 canon_hash,
379 set_id,
380 authorities: new_authorities.to_vec(),
381 }
382 };
383 let old = ::std::mem::replace(guard.as_mut(), new_set);
384 guard.set_old(old);
385
386 AppliedChanges::Forced(new_authorities)
387 } else {
388 let did_standard = guard
389 .as_mut()
390 .enacts_standard_change(hash, number, &is_descendent_of)
391 .map_err(|e| ConsensusError::ClientImport(e.to_string()))
392 .map_err(ConsensusError::from)?;
393
394 if let Some(root) = did_standard {
395 AppliedChanges::Standard(root)
396 } else {
397 AppliedChanges::None
398 }
399 }
400 };
401
402 let just_in_case = guard.consume();
404 if let Some((_, ref authorities)) = just_in_case {
405 let authorities_change = match applied_changes {
406 AppliedChanges::Forced(ref new) => Some(new),
407 AppliedChanges::Standard(_) => None, AppliedChanges::None => None,
409 };
410
411 crate::aux_schema::update_authority_set::<Block, _, _>(
412 authorities,
413 authorities_change,
414 |insert| {
415 block
416 .auxiliary
417 .extend(insert.iter().map(|(k, v)| (k.to_vec(), Some(v.to_vec()))))
418 },
419 );
420 }
421
422 let just_in_case = just_in_case.map(|(o, i)| (o, i.release_mutex()));
423
424 Ok(PendingSetChanges { just_in_case, applied_changes, do_pause })
425 }
426
427 fn current_set_id(&self, hash: Block::Hash) -> Result<SetId, ConsensusError> {
429 let runtime_version = self.inner.runtime_api().version(hash).map_err(|e| {
430 ConsensusError::ClientImport(format!(
431 "Unable to retrieve current runtime version. {}",
432 e
433 ))
434 })?;
435
436 if runtime_version
437 .api_version(&<dyn GrandpaApi<Block>>::ID)
438 .map_or(false, |v| v < 3)
439 {
440 for prefix in ["GrandpaFinality", "Grandpa"] {
443 let k = [
444 subsoil_crypto_hashing::twox_128(prefix.as_bytes()),
445 subsoil_crypto_hashing::twox_128(b"CurrentSetId"),
446 ]
447 .concat();
448 if let Ok(Some(id)) =
449 self.inner.storage(hash, &soil_client::client_api::StorageKey(k.to_vec()))
450 {
451 if let Ok(id) = SetId::decode(&mut id.0.as_ref()) {
452 return Ok(id);
453 }
454 }
455 }
456 Err(ConsensusError::ClientImport("Unable to retrieve current set id.".into()))
457 } else {
458 self.inner
459 .runtime_api()
460 .current_set_id(hash)
461 .map_err(|e| ConsensusError::ClientImport(e.to_string()))
462 }
463 }
464
465 async fn import_state(
467 &self,
468 mut block: BlockImportParams<Block>,
469 ) -> Result<ImportResult, ConsensusError> {
470 let hash = block.post_hash();
471 let number = *block.header.number();
472 block.finalized = true;
474 let import_result = (&*self.inner).import_block(block).await;
475 match import_result {
476 Ok(ImportResult::Imported(aux)) => {
477 self.authority_set_hard_forks.lock().clear();
481 let authorities = self
482 .inner
483 .runtime_api()
484 .grandpa_authorities(hash)
485 .map_err(|e| ConsensusError::ClientImport(e.to_string()))?;
486 let set_id = self.current_set_id(hash)?;
487 let authority_set = AuthoritySet::new(
488 authorities.clone(),
489 set_id,
490 soil_fork_tree::ForkTree::new(),
491 Vec::new(),
492 AuthoritySetChanges::empty(),
493 )
494 .ok_or_else(|| ConsensusError::ClientImport("Invalid authority list".into()))?;
495 *self.authority_set.inner_locked() = authority_set.clone();
496
497 crate::aux_schema::update_authority_set::<Block, _, _>(
498 &authority_set,
499 None,
500 |insert| self.inner.insert_aux(insert, []),
501 )
502 .map_err(|e| ConsensusError::ClientImport(e.to_string()))?;
503 let new_set =
504 NewAuthoritySet { canon_number: number, canon_hash: hash, set_id, authorities };
505 let _ = self
506 .send_voter_commands
507 .unbounded_send(VoterCommand::ChangeAuthorities(new_set));
508 Ok(ImportResult::Imported(aux))
509 },
510 Ok(r) => Ok(r),
511 Err(e) => Err(ConsensusError::ClientImport(e.to_string())),
512 }
513 }
514}
515
516#[async_trait::async_trait]
517impl<BE, Block: BlockT, Client, SC> BlockImport<Block> for GrandpaBlockImport<BE, Block, Client, SC>
518where
519 NumberFor<Block>: finality_grandpa::BlockNumberOps,
520 BE: Backend<Block>,
521 Client: ClientForGrandpa<Block, BE>,
522 Client::Api: GrandpaApi<Block>,
523 for<'a> &'a Client: BlockImport<Block, Error = ConsensusError>,
524 SC: Send + Sync,
525{
526 type Error = ConsensusError;
527
528 async fn import_block(
529 &self,
530 mut block: BlockImportParams<Block>,
531 ) -> Result<ImportResult, Self::Error> {
532 let hash = block.post_hash();
533 let number = *block.header.number();
534
535 match self.inner.status(hash) {
538 Ok(BlockStatus::InChain) => {
539 let _justifications = block.justifications.take();
541 return (&*self.inner).import_block(block).await;
542 },
543 Ok(BlockStatus::Unknown) => {},
544 Err(e) => return Err(ConsensusError::ClientImport(e.to_string())),
545 }
546
547 if block.with_state() {
548 return self.import_state(block).await;
549 }
550
551 if number <= self.inner.info().finalized_number {
552 if self.check_new_change(&block.header, hash).is_some() {
554 if block.justifications.is_none() {
555 return Err(ConsensusError::ClientImport(
556 "Justification required when importing \
557 an old block with authority set change."
558 .into(),
559 ));
560 }
561 let mut authority_set = self.authority_set.inner_locked();
562 authority_set.authority_set_changes.insert(number);
563 crate::aux_schema::update_authority_set::<Block, _, _>(
564 &authority_set,
565 None,
566 |insert| {
567 block
568 .auxiliary
569 .extend(insert.iter().map(|(k, v)| (k.to_vec(), Some(v.to_vec()))))
570 },
571 );
572 }
573 return (&*self.inner).import_block(block).await;
574 }
575
576 let initial_sync = block.origin == BlockOrigin::NetworkInitialSync;
578
579 let pending_changes = self.make_authorities_changes(&mut block, hash, initial_sync)?;
580
581 let mut justifications = block.justifications.take();
583 let import_result = (&*self.inner).import_block(block).await;
584
585 let mut imported_aux = {
586 match import_result {
587 Ok(ImportResult::Imported(aux)) => aux,
588 Ok(r) => {
589 debug!(
590 target: LOG_TARGET,
591 "Restoring old authority set after block import result: {:?}", r,
592 );
593 pending_changes.revert();
594 return Ok(r);
595 },
596 Err(e) => {
597 debug!(
598 target: LOG_TARGET,
599 "Restoring old authority set after block import error: {}", e,
600 );
601 pending_changes.revert();
602 return Err(ConsensusError::ClientImport(e.to_string()));
603 },
604 }
605 };
606
607 let (applied_changes, do_pause) = pending_changes.defuse();
608
609 if do_pause {
611 let _ = self.send_voter_commands.unbounded_send(VoterCommand::Pause(
612 "Forced change scheduled after inactivity".to_string(),
613 ));
614 }
615
616 let needs_justification = applied_changes.needs_justification();
617
618 match applied_changes {
619 AppliedChanges::Forced(new) => {
620 let _ =
632 self.send_voter_commands.unbounded_send(VoterCommand::ChangeAuthorities(new));
633
634 imported_aux.clear_justification_requests = true;
637 },
638 AppliedChanges::Standard(false) => {
639 justifications.take();
644 },
645 _ => {},
646 }
647
648 let grandpa_justification =
649 justifications.and_then(|just| just.into_justification(GRANDPA_ENGINE_ID));
650
651 match grandpa_justification {
652 Some(justification) => {
653 if environment::should_process_justification(
654 &*self.inner,
655 self.justification_import_period,
656 number,
657 needs_justification,
658 ) {
659 let import_res = self.import_justification(
660 hash,
661 number,
662 (GRANDPA_ENGINE_ID, justification),
663 needs_justification,
664 initial_sync,
665 );
666
667 import_res.unwrap_or_else(|err| {
668 if needs_justification {
669 debug!(
670 target: LOG_TARGET,
671 "Requesting justification from peers due to imported block #{} that enacts authority set change with invalid justification: {}",
672 number,
673 err
674 );
675 imported_aux.bad_justification = true;
676 imported_aux.needs_justification = true;
677 }
678 });
679 } else {
680 debug!(
681 target: LOG_TARGET,
682 "Ignoring unnecessary justification for block #{}",
683 number,
684 );
685 }
686 },
687 None => {
688 if needs_justification {
689 debug!(
690 target: LOG_TARGET,
691 "Imported unjustified block #{} that enacts authority set change, waiting for finality for enactment.",
692 number,
693 );
694
695 imported_aux.needs_justification = true;
696 }
697 },
698 }
699
700 Ok(ImportResult::Imported(imported_aux))
701 }
702
703 async fn check_block(
704 &self,
705 block: BlockCheckParams<Block>,
706 ) -> Result<ImportResult, Self::Error> {
707 self.inner.check_block(block).await
708 }
709}
710
711impl<Backend, Block: BlockT, Client, SC> GrandpaBlockImport<Backend, Block, Client, SC> {
712 pub(crate) fn new(
713 inner: Arc<Client>,
714 justification_import_period: u32,
715 select_chain: SC,
716 authority_set: SharedAuthoritySet<Block::Hash, NumberFor<Block>>,
717 send_voter_commands: TracingUnboundedSender<VoterCommand<Block::Hash, NumberFor<Block>>>,
718 authority_set_hard_forks: Vec<(SetId, PendingChange<Block::Hash, NumberFor<Block>>)>,
719 justification_sender: GrandpaJustificationSender<Block>,
720 telemetry: Option<TelemetryHandle>,
721 ) -> GrandpaBlockImport<Backend, Block, Client, SC> {
722 if let Some((_, change)) = authority_set_hard_forks
725 .iter()
726 .find(|(set_id, _)| *set_id == authority_set.set_id())
727 {
728 authority_set.inner().current_authorities = change.next_authorities.clone();
729 }
730
731 let authority_set_hard_forks = authority_set_hard_forks
735 .into_iter()
736 .map(|(_, change)| (change.canon_hash, change))
737 .collect::<HashMap<_, _>>();
738
739 {
743 let mut authority_set = authority_set.inner();
744
745 authority_set.pending_standard_changes =
746 authority_set.pending_standard_changes.clone().map(&mut |hash, _, original| {
747 authority_set_hard_forks.get(hash).cloned().unwrap_or(original)
748 });
749 }
750
751 GrandpaBlockImport {
752 inner,
753 justification_import_period,
754 select_chain,
755 authority_set,
756 send_voter_commands,
757 authority_set_hard_forks: Mutex::new(authority_set_hard_forks),
758 justification_sender,
759 telemetry,
760 _phantom: PhantomData,
761 }
762 }
763}
764
765impl<BE, Block: BlockT, Client, SC> GrandpaBlockImport<BE, Block, Client, SC>
766where
767 BE: Backend<Block>,
768 Client: ClientForGrandpa<Block, BE>,
769 NumberFor<Block>: finality_grandpa::BlockNumberOps,
770{
771 fn import_justification(
776 &self,
777 hash: Block::Hash,
778 number: NumberFor<Block>,
779 justification: Justification,
780 enacts_change: bool,
781 initial_sync: bool,
782 ) -> Result<(), ConsensusError> {
783 if justification.0 != GRANDPA_ENGINE_ID {
784 return Ok(());
790 }
791
792 let justification = GrandpaJustification::decode_and_verify_finalizes(
793 &justification.1,
794 (hash, number),
795 self.authority_set.set_id(),
796 &self.authority_set.current_authorities(),
797 );
798
799 let justification = match justification {
800 Err(e) => {
801 return match e {
802 soil_client::blockchain::Error::OutdatedJustification => {
803 Err(ConsensusError::OutdatedJustification)
804 },
805 _ => Err(ConsensusError::ClientImport(e.to_string())),
806 };
807 },
808 Ok(justification) => justification,
809 };
810
811 let result = environment::finalize_block(
812 self.inner.clone(),
813 &self.authority_set,
814 None,
815 hash,
816 number,
817 justification.into(),
818 initial_sync,
819 Some(&self.justification_sender),
820 self.telemetry.clone(),
821 );
822
823 match result {
824 Err(CommandOrError::VoterCommand(command)) => {
825 grandpa_log!(
826 initial_sync,
827 "👴 Imported justification for block #{} that triggers \
828 command {}, signaling voter.",
829 number,
830 command,
831 );
832
833 let _ = self.send_voter_commands.unbounded_send(command);
835 },
836 Err(CommandOrError::Error(e)) => {
837 return Err(match e {
838 Error::Grandpa(error) => ConsensusError::ClientImport(error.to_string()),
839 Error::Network(error) => ConsensusError::ClientImport(error),
840 Error::Blockchain(error) => ConsensusError::ClientImport(error),
841 Error::Client(error) => ConsensusError::ClientImport(error.to_string()),
842 Error::Safety(error) => ConsensusError::ClientImport(error),
843 Error::Signing(error) => ConsensusError::ClientImport(error),
844 Error::Timer(error) => ConsensusError::ClientImport(error.to_string()),
845 Error::RuntimeApi(error) => ConsensusError::ClientImport(error.to_string()),
846 })
847 },
848 Ok(_) => {
849 assert!(
850 !enacts_change,
851 "returns Ok when no authority set change should be enacted; qed;"
852 );
853 },
854 }
855
856 Ok(())
857 }
858}