1#![deny(unused_crate_dependencies)]
67
68use std::{
69 collections::{HashMap, HashSet},
70 sync::Arc,
71};
72
73use bitvec::vec::BitVec;
74use futures::{
75 channel::{mpsc, oneshot},
76 future::BoxFuture,
77 stream::FuturesOrdered,
78 FutureExt, SinkExt, StreamExt, TryFutureExt,
79};
80use schnellru::{ByLength, LruMap};
81
82use error::{Error, FatalResult};
83use polkadot_node_primitives::{
84 AvailableData, InvalidCandidate, PoV, SignedFullStatementWithPVD, StatementWithPVD,
85 ValidationResult,
86};
87use polkadot_node_subsystem::{
88 messages::{
89 AvailabilityDistributionMessage, AvailabilityStoreMessage, CanSecondRequest,
90 CandidateBackingMessage, CandidateValidationMessage, CollatorProtocolMessage,
91 HypotheticalCandidate, HypotheticalMembershipRequest, IntroduceSecondedCandidateRequest,
92 ProspectiveParachainsMessage, ProvisionableData, ProvisionerMessage, PvfExecKind,
93 RuntimeApiMessage, RuntimeApiRequest, StatementDistributionMessage,
94 StoreAvailableDataError,
95 },
96 overseer, ActiveLeavesUpdate, FromOrchestra, OverseerSignal, RuntimeApiError, SpawnedSubsystem,
97 SubsystemError,
98};
99use polkadot_node_subsystem_util::{
100 self as util,
101 backing_implicit_view::View as ImplicitView,
102 request_claim_queue, request_disabled_validators, request_min_backing_votes,
103 request_node_features, request_session_executor_params, request_session_index_for_child,
104 request_validator_groups, request_validators,
105 runtime::{self, ClaimQueueSnapshot},
106 Validator,
107};
108use polkadot_parachain_primitives::primitives::IsSystem;
109use polkadot_primitives::{
110 vstaging::{
111 BackedCandidate, CandidateReceiptV2 as CandidateReceipt,
112 CommittedCandidateReceiptV2 as CommittedCandidateReceipt,
113 },
114 CandidateCommitments, CandidateHash, CoreIndex, ExecutorParams, GroupIndex, GroupRotationInfo,
115 Hash, Id as ParaId, IndexedVec, NodeFeatures, PersistedValidationData, SessionIndex,
116 SigningContext, ValidationCode, ValidatorId, ValidatorIndex, ValidatorSignature,
117 ValidityAttestation,
118};
119use polkadot_statement_table::{
120 generic::AttestedCandidate as TableAttestedCandidate,
121 v2::{
122 SignedStatement as TableSignedStatement, Statement as TableStatement,
123 Summary as TableSummary,
124 },
125 Context as TableContextTrait, Table,
126};
127use sp_keystore::KeystorePtr;
128
129mod error;
130
131mod metrics;
132use self::metrics::Metrics;
133
134#[cfg(test)]
135mod tests;
136
137const LOG_TARGET: &str = "parachain::candidate-backing";
138
139enum PoVData {
141 Ready(Arc<PoV>),
143 FetchFromValidator {
145 from_validator: ValidatorIndex,
146 candidate_hash: CandidateHash,
147 pov_hash: Hash,
148 },
149}
150
151enum ValidatedCandidateCommand {
152 Second(BackgroundValidationResult),
154 Attest(BackgroundValidationResult),
156 AttestNoPoV(CandidateHash),
158}
159
160impl std::fmt::Debug for ValidatedCandidateCommand {
161 fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
162 let candidate_hash = self.candidate_hash();
163 match *self {
164 ValidatedCandidateCommand::Second(_) => write!(f, "Second({})", candidate_hash),
165 ValidatedCandidateCommand::Attest(_) => write!(f, "Attest({})", candidate_hash),
166 ValidatedCandidateCommand::AttestNoPoV(_) => write!(f, "Attest({})", candidate_hash),
167 }
168 }
169}
170
171impl ValidatedCandidateCommand {
172 fn candidate_hash(&self) -> CandidateHash {
173 match *self {
174 ValidatedCandidateCommand::Second(Ok(ref outputs)) => outputs.candidate.hash(),
175 ValidatedCandidateCommand::Second(Err(ref candidate)) => candidate.hash(),
176 ValidatedCandidateCommand::Attest(Ok(ref outputs)) => outputs.candidate.hash(),
177 ValidatedCandidateCommand::Attest(Err(ref candidate)) => candidate.hash(),
178 ValidatedCandidateCommand::AttestNoPoV(candidate_hash) => candidate_hash,
179 }
180 }
181}
182
183pub struct CandidateBackingSubsystem {
185 keystore: KeystorePtr,
186 metrics: Metrics,
187}
188
189impl CandidateBackingSubsystem {
190 pub fn new(keystore: KeystorePtr, metrics: Metrics) -> Self {
192 Self { keystore, metrics }
193 }
194}
195
196#[overseer::subsystem(CandidateBacking, error = SubsystemError, prefix = self::overseer)]
197impl<Context> CandidateBackingSubsystem
198where
199 Context: Send + Sync,
200{
201 fn start(self, ctx: Context) -> SpawnedSubsystem {
202 let future = async move {
203 run(ctx, self.keystore, self.metrics)
204 .await
205 .map_err(|e| SubsystemError::with_origin("candidate-backing", e))
206 }
207 .boxed();
208
209 SpawnedSubsystem { name: "candidate-backing-subsystem", future }
210 }
211}
212
213struct PerRelayParentState {
214 parent: Hash,
216 node_features: NodeFeatures,
218 executor_params: Arc<ExecutorParams>,
220 assigned_core: Option<CoreIndex>,
222 backed: HashSet<CandidateHash>,
224 table: Table<TableContext>,
226 table_context: TableContext,
228 issued_statements: HashSet<CandidateHash>,
230 awaiting_validation: HashSet<CandidateHash>,
232 fallbacks: HashMap<CandidateHash, AttestingData>,
234 minimum_backing_votes: u32,
236 n_cores: u32,
238 claim_queue: ClaimQueueSnapshot,
241 validator_to_group: Arc<IndexedVec<ValidatorIndex, Option<GroupIndex>>>,
243 group_rotation_info: GroupRotationInfo,
245}
246
247struct PerCandidateState {
248 persisted_validation_data: PersistedValidationData,
249 seconded_locally: bool,
250 relay_parent: Hash,
251}
252
253struct PerSessionCache {
256 validators_cache: LruMap<SessionIndex, Arc<Vec<ValidatorId>>>,
258 node_features_cache: LruMap<SessionIndex, NodeFeatures>,
260 executor_params_cache: LruMap<SessionIndex, Arc<ExecutorParams>>,
262 minimum_backing_votes_cache: LruMap<SessionIndex, u32>,
264 validator_to_group_cache:
266 LruMap<SessionIndex, Arc<IndexedVec<ValidatorIndex, Option<GroupIndex>>>>,
267}
268
269impl Default for PerSessionCache {
270 fn default() -> Self {
272 Self::new(2)
273 }
274}
275
276impl PerSessionCache {
277 fn new(capacity: u32) -> Self {
279 PerSessionCache {
280 validators_cache: LruMap::new(ByLength::new(capacity)),
281 node_features_cache: LruMap::new(ByLength::new(capacity)),
282 executor_params_cache: LruMap::new(ByLength::new(capacity)),
283 minimum_backing_votes_cache: LruMap::new(ByLength::new(capacity)),
284 validator_to_group_cache: LruMap::new(ByLength::new(capacity)),
285 }
286 }
287
288 async fn validators(
290 &mut self,
291 session_index: SessionIndex,
292 parent: Hash,
293 sender: &mut impl overseer::SubsystemSender<RuntimeApiMessage>,
294 ) -> Result<Arc<Vec<ValidatorId>>, RuntimeApiError> {
295 if let Some(validators) = self.validators_cache.get(&session_index) {
297 return Ok(Arc::clone(validators));
298 }
299
300 let validators: Vec<ValidatorId> =
302 request_validators(parent, sender).await.await.map_err(|err| {
303 RuntimeApiError::Execution { runtime_api_name: "Validators", source: Arc::new(err) }
304 })??;
305
306 let validators = Arc::new(validators);
308
309 self.validators_cache.insert(session_index, Arc::clone(&validators));
311
312 Ok(validators)
313 }
314
315 async fn node_features(
317 &mut self,
318 session_index: SessionIndex,
319 parent: Hash,
320 sender: &mut impl overseer::SubsystemSender<RuntimeApiMessage>,
321 ) -> Result<NodeFeatures, RuntimeApiError> {
322 if let Some(node_features) = self.node_features_cache.get(&session_index) {
324 return Ok(node_features.clone());
325 }
326
327 let node_features = request_node_features(parent, session_index, sender)
329 .await
330 .await
331 .map_err(|err| RuntimeApiError::Execution {
332 runtime_api_name: "NodeFeatures",
333 source: Arc::new(err),
334 })??;
335
336 self.node_features_cache.insert(session_index, node_features.clone());
338
339 Ok(node_features)
340 }
341
342 async fn executor_params(
345 &mut self,
346 session_index: SessionIndex,
347 parent: Hash,
348 sender: &mut impl overseer::SubsystemSender<RuntimeApiMessage>,
349 ) -> Result<Arc<ExecutorParams>, RuntimeApiError> {
350 if let Some(executor_params) = self.executor_params_cache.get(&session_index) {
352 return Ok(Arc::clone(executor_params));
353 }
354
355 let executor_params = request_session_executor_params(parent, session_index, sender)
357 .await
358 .await
359 .map_err(|err| RuntimeApiError::Execution {
360 runtime_api_name: "SessionExecutorParams",
361 source: Arc::new(err),
362 })??
363 .ok_or_else(|| RuntimeApiError::Execution {
364 runtime_api_name: "SessionExecutorParams",
365 source: Arc::new(Error::MissingExecutorParams),
366 })?;
367
368 let executor_params = Arc::new(executor_params);
370
371 self.executor_params_cache.insert(session_index, Arc::clone(&executor_params));
373
374 Ok(executor_params)
375 }
376
377 async fn minimum_backing_votes(
380 &mut self,
381 session_index: SessionIndex,
382 parent: Hash,
383 sender: &mut impl overseer::SubsystemSender<RuntimeApiMessage>,
384 ) -> Result<u32, RuntimeApiError> {
385 if let Some(minimum_backing_votes) = self.minimum_backing_votes_cache.get(&session_index) {
387 return Ok(*minimum_backing_votes);
388 }
389
390 let minimum_backing_votes = request_min_backing_votes(parent, session_index, sender)
392 .await
393 .await
394 .map_err(|err| RuntimeApiError::Execution {
395 runtime_api_name: "MinimumBackingVotes",
396 source: Arc::new(err),
397 })??;
398
399 self.minimum_backing_votes_cache.insert(session_index, minimum_backing_votes);
401
402 Ok(minimum_backing_votes)
403 }
404
405 fn validator_to_group(
407 &mut self,
408 session_index: SessionIndex,
409 validators: &[ValidatorId],
410 validator_groups: &[Vec<ValidatorIndex>],
411 ) -> Arc<IndexedVec<ValidatorIndex, Option<GroupIndex>>> {
412 let validator_to_group = self
413 .validator_to_group_cache
414 .get_or_insert(session_index, || {
415 let mut vector = vec![None; validators.len()];
416
417 for (group_idx, validator_group) in validator_groups.iter().enumerate() {
418 for validator in validator_group {
419 vector[validator.0 as usize] = Some(GroupIndex(group_idx as u32));
420 }
421 }
422
423 Arc::new(IndexedVec::<_, _>::from(vector))
424 })
425 .expect("Just inserted");
426
427 Arc::clone(validator_to_group)
428 }
429}
430
431struct State {
433 implicit_view: ImplicitView,
435 per_relay_parent: HashMap<Hash, PerRelayParentState>,
438 per_candidate: HashMap<CandidateHash, PerCandidateState>,
443 per_session_cache: PerSessionCache,
446 background_validation_tx: mpsc::Sender<(Hash, ValidatedCandidateCommand)>,
449 keystore: KeystorePtr,
451}
452
453impl State {
454 fn new(
455 background_validation_tx: mpsc::Sender<(Hash, ValidatedCandidateCommand)>,
456 keystore: KeystorePtr,
457 ) -> Self {
458 State {
459 implicit_view: ImplicitView::default(),
460 per_relay_parent: HashMap::default(),
461 per_candidate: HashMap::new(),
462 per_session_cache: PerSessionCache::default(),
463 background_validation_tx,
464 keystore,
465 }
466 }
467}
468
469#[overseer::contextbounds(CandidateBacking, prefix = self::overseer)]
470async fn run<Context>(
471 mut ctx: Context,
472 keystore: KeystorePtr,
473 metrics: Metrics,
474) -> FatalResult<()> {
475 let (background_validation_tx, mut background_validation_rx) = mpsc::channel(16);
476 let mut state = State::new(background_validation_tx, keystore);
477
478 loop {
479 let res =
480 run_iteration(&mut ctx, &mut state, &metrics, &mut background_validation_rx).await;
481
482 match res {
483 Ok(()) => break,
484 Err(e) => crate::error::log_error(Err(e))?,
485 }
486 }
487
488 Ok(())
489}
490
491#[overseer::contextbounds(CandidateBacking, prefix = self::overseer)]
492async fn run_iteration<Context>(
493 ctx: &mut Context,
494 state: &mut State,
495 metrics: &Metrics,
496 background_validation_rx: &mut mpsc::Receiver<(Hash, ValidatedCandidateCommand)>,
497) -> Result<(), Error> {
498 loop {
499 futures::select!(
500 validated_command = background_validation_rx.next().fuse() => {
501 if let Some((relay_parent, command)) = validated_command {
502 handle_validated_candidate_command(
503 &mut *ctx,
504 state,
505 relay_parent,
506 command,
507 metrics,
508 ).await?;
509 } else {
510 panic!("background_validation_tx always alive at this point; qed");
511 }
512 }
513 from_overseer = ctx.recv().fuse() => {
514 match from_overseer.map_err(Error::OverseerExited)? {
515 FromOrchestra::Signal(OverseerSignal::ActiveLeaves(update)) => {
516 handle_active_leaves_update(
517 &mut *ctx,
518 update,
519 state,
520 ).await?;
521 }
522 FromOrchestra::Signal(OverseerSignal::BlockFinalized(..)) => {}
523 FromOrchestra::Signal(OverseerSignal::Conclude) => return Ok(()),
524 FromOrchestra::Communication { msg } => {
525 handle_communication(&mut *ctx, state, msg, metrics).await?;
526 }
527 }
528 }
529 )
530 }
531}
532
533#[derive(Clone)]
539struct AttestingData {
540 candidate: CandidateReceipt,
542 pov_hash: Hash,
544 from_validator: ValidatorIndex,
546 backing: Vec<ValidatorIndex>,
548}
549
550#[derive(Default, Debug)]
551struct TableContext {
552 validator: Option<Validator>,
553 groups: HashMap<CoreIndex, Vec<ValidatorIndex>>,
554 validators: Vec<ValidatorId>,
555 disabled_validators: Vec<ValidatorIndex>,
556}
557
558impl TableContext {
559 pub fn validator_is_disabled(&self, validator_idx: &ValidatorIndex) -> bool {
561 self.disabled_validators
562 .iter()
563 .any(|disabled_val_idx| *disabled_val_idx == *validator_idx)
564 }
565
566 pub fn local_validator_is_disabled(&self) -> Option<bool> {
568 self.validator.as_ref().map(|v| v.disabled())
569 }
570}
571
572impl TableContextTrait for TableContext {
573 type AuthorityId = ValidatorIndex;
574 type Digest = CandidateHash;
575 type GroupId = CoreIndex;
576 type Signature = ValidatorSignature;
577 type Candidate = CommittedCandidateReceipt;
578
579 fn candidate_digest(candidate: &CommittedCandidateReceipt) -> CandidateHash {
580 candidate.hash()
581 }
582
583 fn is_member_of(&self, authority: &ValidatorIndex, core: &CoreIndex) -> bool {
584 self.groups.get(core).map_or(false, |g| g.iter().any(|a| a == authority))
585 }
586
587 fn get_group_size(&self, group: &CoreIndex) -> Option<usize> {
588 self.groups.get(group).map(|g| g.len())
589 }
590}
591
592fn primitive_statement_to_table(s: &SignedFullStatementWithPVD) -> TableSignedStatement {
595 let statement = match s.payload() {
596 StatementWithPVD::Seconded(c, _) => TableStatement::Seconded(c.clone()),
597 StatementWithPVD::Valid(h) => TableStatement::Valid(*h),
598 };
599
600 TableSignedStatement {
601 statement,
602 signature: s.signature().clone(),
603 sender: s.validator_index(),
604 }
605}
606
607fn table_attested_to_backed(
608 attested: TableAttestedCandidate<
609 CoreIndex,
610 CommittedCandidateReceipt,
611 ValidatorIndex,
612 ValidatorSignature,
613 >,
614 table_context: &TableContext,
615) -> Option<BackedCandidate> {
616 let TableAttestedCandidate { candidate, validity_votes, group_id: core_index } = attested;
617
618 let (ids, validity_votes): (Vec<_>, Vec<ValidityAttestation>) =
619 validity_votes.into_iter().map(|(id, vote)| (id, vote.into())).unzip();
620
621 let group = table_context.groups.get(&core_index)?;
622
623 let mut validator_indices = BitVec::with_capacity(group.len());
624
625 validator_indices.resize(group.len(), false);
626
627 let mut vote_positions = Vec::with_capacity(validity_votes.len());
631 for (orig_idx, id) in ids.iter().enumerate() {
632 if let Some(position) = group.iter().position(|x| x == id) {
633 validator_indices.set(position, true);
634 vote_positions.push((orig_idx, position));
635 } else {
636 gum::warn!(
637 target: LOG_TARGET,
638 "Logic error: Validity vote from table does not correspond to group",
639 );
640
641 return None
642 }
643 }
644 vote_positions.sort_by_key(|(_orig, pos_in_group)| *pos_in_group);
645
646 Some(BackedCandidate::new(
647 candidate,
648 vote_positions
649 .into_iter()
650 .map(|(pos_in_votes, _pos_in_group)| validity_votes[pos_in_votes].clone())
651 .collect(),
652 validator_indices,
653 core_index,
654 ))
655}
656
657async fn store_available_data(
658 sender: &mut impl overseer::CandidateBackingSenderTrait,
659 n_validators: u32,
660 candidate_hash: CandidateHash,
661 available_data: AvailableData,
662 expected_erasure_root: Hash,
663 core_index: CoreIndex,
664 node_features: NodeFeatures,
665) -> Result<(), Error> {
666 let (tx, rx) = oneshot::channel();
667 sender
672 .send_message(AvailabilityStoreMessage::StoreAvailableData {
673 candidate_hash,
674 n_validators,
675 available_data,
676 expected_erasure_root,
677 core_index,
678 node_features,
679 tx,
680 })
681 .await;
682
683 rx.await
684 .map_err(Error::StoreAvailableDataChannel)?
685 .map_err(Error::StoreAvailableData)
686}
687
688async fn make_pov_available(
696 sender: &mut impl overseer::CandidateBackingSenderTrait,
697 n_validators: usize,
698 pov: Arc<PoV>,
699 candidate_hash: CandidateHash,
700 validation_data: PersistedValidationData,
701 expected_erasure_root: Hash,
702 core_index: CoreIndex,
703 node_features: NodeFeatures,
704) -> Result<(), Error> {
705 store_available_data(
706 sender,
707 n_validators as u32,
708 candidate_hash,
709 AvailableData { pov, validation_data },
710 expected_erasure_root,
711 core_index,
712 node_features,
713 )
714 .await
715}
716
717async fn request_pov(
718 sender: &mut impl overseer::CandidateBackingSenderTrait,
719 relay_parent: Hash,
720 from_validator: ValidatorIndex,
721 para_id: ParaId,
722 candidate_hash: CandidateHash,
723 pov_hash: Hash,
724) -> Result<Arc<PoV>, Error> {
725 let (tx, rx) = oneshot::channel();
726 sender
727 .send_message(AvailabilityDistributionMessage::FetchPoV {
728 relay_parent,
729 from_validator,
730 para_id,
731 candidate_hash,
732 pov_hash,
733 tx,
734 })
735 .await;
736
737 let pov = rx.await.map_err(|_| Error::FetchPoV)?;
738 Ok(Arc::new(pov))
739}
740
741async fn request_candidate_validation(
742 sender: &mut impl overseer::CandidateBackingSenderTrait,
743 validation_data: PersistedValidationData,
744 validation_code: ValidationCode,
745 candidate_receipt: CandidateReceipt,
746 pov: Arc<PoV>,
747 executor_params: ExecutorParams,
748) -> Result<ValidationResult, Error> {
749 let (tx, rx) = oneshot::channel();
750 let is_system = candidate_receipt.descriptor.para_id().is_system();
751 let relay_parent = candidate_receipt.descriptor.relay_parent();
752
753 sender
754 .send_message(CandidateValidationMessage::ValidateFromExhaustive {
755 validation_data,
756 validation_code,
757 candidate_receipt,
758 pov,
759 executor_params,
760 exec_kind: if is_system {
761 PvfExecKind::BackingSystemParas(relay_parent)
762 } else {
763 PvfExecKind::Backing(relay_parent)
764 },
765 response_sender: tx,
766 })
767 .await;
768
769 match rx.await {
770 Ok(Ok(validation_result)) => Ok(validation_result),
771 Ok(Err(err)) => Err(Error::ValidationFailed(err)),
772 Err(err) => Err(Error::ValidateFromExhaustive(err)),
773 }
774}
775
776struct BackgroundValidationOutputs {
777 candidate: CandidateReceipt,
778 commitments: CandidateCommitments,
779 persisted_validation_data: PersistedValidationData,
780}
781
782type BackgroundValidationResult = Result<BackgroundValidationOutputs, CandidateReceipt>;
783
784struct BackgroundValidationParams<S: overseer::CandidateBackingSenderTrait, F> {
785 sender: S,
786 tx_command: mpsc::Sender<(Hash, ValidatedCandidateCommand)>,
787 candidate: CandidateReceipt,
788 relay_parent: Hash,
789 node_features: NodeFeatures,
790 executor_params: Arc<ExecutorParams>,
791 persisted_validation_data: PersistedValidationData,
792 pov: PoVData,
793 n_validators: usize,
794 make_command: F,
795}
796
797async fn validate_and_make_available(
798 params: BackgroundValidationParams<
799 impl overseer::CandidateBackingSenderTrait,
800 impl Fn(BackgroundValidationResult) -> ValidatedCandidateCommand + Sync,
801 >,
802 core_index: CoreIndex,
803) -> Result<(), Error> {
804 let BackgroundValidationParams {
805 mut sender,
806 mut tx_command,
807 candidate,
808 relay_parent,
809 node_features,
810 executor_params,
811 persisted_validation_data,
812 pov,
813 n_validators,
814 make_command,
815 } = params;
816
817 let validation_code = {
818 let validation_code_hash = candidate.descriptor().validation_code_hash();
819 let (tx, rx) = oneshot::channel();
820 sender
821 .send_message(RuntimeApiMessage::Request(
822 relay_parent,
823 RuntimeApiRequest::ValidationCodeByHash(validation_code_hash, tx),
824 ))
825 .await;
826
827 let code = rx.await.map_err(Error::RuntimeApiUnavailable)?;
828 match code {
829 Err(e) => return Err(Error::FetchValidationCode(validation_code_hash, e)),
830 Ok(None) => return Err(Error::NoValidationCode(validation_code_hash)),
831 Ok(Some(c)) => c,
832 }
833 };
834
835 let pov = match pov {
836 PoVData::Ready(pov) => pov,
837 PoVData::FetchFromValidator { from_validator, candidate_hash, pov_hash } =>
838 match request_pov(
839 &mut sender,
840 relay_parent,
841 from_validator,
842 candidate.descriptor.para_id(),
843 candidate_hash,
844 pov_hash,
845 )
846 .await
847 {
848 Err(Error::FetchPoV) => {
849 tx_command
850 .send((
851 relay_parent,
852 ValidatedCandidateCommand::AttestNoPoV(candidate.hash()),
853 ))
854 .await
855 .map_err(Error::BackgroundValidationMpsc)?;
856 return Ok(())
857 },
858 Err(err) => return Err(err),
859 Ok(pov) => pov,
860 },
861 };
862
863 let v = {
864 request_candidate_validation(
865 &mut sender,
866 persisted_validation_data,
867 validation_code,
868 candidate.clone(),
869 pov.clone(),
870 executor_params.as_ref().clone(),
871 )
872 .await?
873 };
874
875 let res = match v {
876 ValidationResult::Valid(commitments, validation_data) => {
877 gum::debug!(
878 target: LOG_TARGET,
879 candidate_hash = ?candidate.hash(),
880 "Validation successful",
881 );
882
883 let erasure_valid = make_pov_available(
884 &mut sender,
885 n_validators,
886 pov.clone(),
887 candidate.hash(),
888 validation_data.clone(),
889 candidate.descriptor.erasure_root(),
890 core_index,
891 node_features,
892 )
893 .await;
894
895 match erasure_valid {
896 Ok(()) => Ok(BackgroundValidationOutputs {
897 candidate,
898 commitments,
899 persisted_validation_data: validation_data,
900 }),
901 Err(Error::StoreAvailableData(StoreAvailableDataError::InvalidErasureRoot)) => {
902 gum::debug!(
903 target: LOG_TARGET,
904 candidate_hash = ?candidate.hash(),
905 actual_commitments = ?commitments,
906 "Erasure root doesn't match the announced by the candidate receipt",
907 );
908 Err(candidate)
909 },
910 Err(e) => return Err(e),
912 }
913 },
914 ValidationResult::Invalid(InvalidCandidate::CommitmentsHashMismatch) => {
915 gum::warn!(
917 target: LOG_TARGET,
918 candidate_hash = ?candidate.hash(),
919 "Validation yielded different commitments",
920 );
921 Err(candidate)
922 },
923 ValidationResult::Invalid(reason) => {
924 gum::warn!(
925 target: LOG_TARGET,
926 candidate_hash = ?candidate.hash(),
927 reason = ?reason,
928 "Validation yielded an invalid candidate",
929 );
930 Err(candidate)
931 },
932 };
933
934 tx_command.send((relay_parent, make_command(res))).await.map_err(Into::into)
935}
936
937#[overseer::contextbounds(CandidateBacking, prefix = self::overseer)]
938async fn handle_communication<Context>(
939 ctx: &mut Context,
940 state: &mut State,
941 message: CandidateBackingMessage,
942 metrics: &Metrics,
943) -> Result<(), Error> {
944 match message {
945 CandidateBackingMessage::Second(_relay_parent, candidate, pvd, pov) => {
946 handle_second_message(ctx, state, candidate, pvd, pov, metrics).await?;
947 },
948 CandidateBackingMessage::Statement(relay_parent, statement) => {
949 handle_statement_message(ctx, state, relay_parent, statement, metrics).await?;
950 },
951 CandidateBackingMessage::GetBackableCandidates(requested_candidates, tx) =>
952 handle_get_backable_candidates_message(state, requested_candidates, tx, metrics)?,
953 CandidateBackingMessage::CanSecond(request, tx) =>
954 handle_can_second_request(ctx, state, request, tx).await,
955 }
956
957 Ok(())
958}
959
960#[overseer::contextbounds(CandidateBacking, prefix = self::overseer)]
961async fn handle_active_leaves_update<Context>(
962 ctx: &mut Context,
963 update: ActiveLeavesUpdate,
964 state: &mut State,
965) -> Result<(), Error> {
966 let res = if let Some(leaf) = update.activated {
969 let leaf_hash = leaf.hash;
970 Some((leaf, state.implicit_view.activate_leaf(ctx.sender(), leaf_hash).await.map(|_| ())))
971 } else {
972 None
973 };
974
975 for deactivated in update.deactivated {
976 state.implicit_view.deactivate_leaf(deactivated);
977 }
978
979 {
983 let remaining: HashSet<_> = state.implicit_view.all_allowed_relay_parents().collect();
984
985 state.per_relay_parent.retain(|r, _| remaining.contains(&r));
986 }
987
988 state
991 .per_candidate
992 .retain(|_, pc| state.per_relay_parent.contains_key(&pc.relay_parent));
993
994 let fresh_relay_parents = match res {
997 None => return Ok(()),
998 Some((leaf, Ok(_))) => {
999 let fresh_relay_parents =
1000 state.implicit_view.known_allowed_relay_parents_under(&leaf.hash, None);
1001
1002 let fresh_relay_parent = match fresh_relay_parents {
1003 Some(f) => f.to_vec(),
1004 None => {
1005 gum::warn!(
1006 target: LOG_TARGET,
1007 leaf_hash = ?leaf.hash,
1008 "Implicit view gave no relay-parents"
1009 );
1010
1011 vec![leaf.hash]
1012 },
1013 };
1014 fresh_relay_parent
1015 },
1016 Some((leaf, Err(e))) => {
1017 gum::debug!(
1018 target: LOG_TARGET,
1019 leaf_hash = ?leaf.hash,
1020 err = ?e,
1021 "Failed to load implicit view for leaf."
1022 );
1023
1024 return Ok(())
1025 },
1026 };
1027
1028 for maybe_new in fresh_relay_parents {
1030 if state.per_relay_parent.contains_key(&maybe_new) {
1031 continue
1032 }
1033
1034 let per = construct_per_relay_parent_state(
1037 ctx,
1038 maybe_new,
1039 &state.keystore,
1040 &mut state.per_session_cache,
1041 )
1042 .await?;
1043
1044 if let Some(per) = per {
1045 state.per_relay_parent.insert(maybe_new, per);
1046 }
1047 }
1048
1049 Ok(())
1050}
1051
1052macro_rules! try_runtime_api {
1053 ($x: expr) => {
1054 match $x {
1055 Ok(x) => x,
1056 Err(err) => {
1057 error::log_error(Err(Into::<runtime::Error>::into(err).into()))?;
1059
1060 return Ok(None)
1064 },
1065 }
1066 };
1067}
1068
1069fn core_index_from_statement(
1070 validator_to_group: &IndexedVec<ValidatorIndex, Option<GroupIndex>>,
1071 group_rotation_info: &GroupRotationInfo,
1072 n_cores: u32,
1073 claim_queue: &ClaimQueueSnapshot,
1074 statement: &SignedFullStatementWithPVD,
1075) -> Option<CoreIndex> {
1076 let compact_statement = statement.as_unchecked();
1077 let candidate_hash = CandidateHash(*compact_statement.unchecked_payload().candidate_hash());
1078
1079 gum::trace!(
1080 target:LOG_TARGET,
1081 ?group_rotation_info,
1082 ?statement,
1083 ?validator_to_group,
1084 n_cores,
1085 ?candidate_hash,
1086 "Extracting core index from statement"
1087 );
1088
1089 let statement_validator_index = statement.validator_index();
1090 let Some(Some(group_index)) = validator_to_group.get(statement_validator_index) else {
1091 gum::debug!(
1092 target: LOG_TARGET,
1093 ?group_rotation_info,
1094 ?statement,
1095 ?validator_to_group,
1096 n_cores,
1097 ?candidate_hash,
1098 "Invalid validator index: {:?}",
1099 statement_validator_index
1100 );
1101 return None
1102 };
1103
1104 let core_index = group_rotation_info.core_for_group(*group_index, n_cores as _);
1106
1107 if core_index.0 > n_cores {
1108 gum::warn!(target: LOG_TARGET, ?candidate_hash, ?core_index, n_cores, "Invalid CoreIndex");
1109 return None
1110 }
1111
1112 if let StatementWithPVD::Seconded(candidate, _pvd) = statement.payload() {
1113 let candidate_para_id = candidate.descriptor.para_id();
1114 let mut assigned_paras = claim_queue.iter_claims_for_core(&core_index);
1115
1116 if !assigned_paras.any(|id| id == &candidate_para_id) {
1117 gum::debug!(
1118 target: LOG_TARGET,
1119 ?candidate_hash,
1120 ?core_index,
1121 assigned_paras = ?claim_queue.iter_claims_for_core(&core_index).collect::<Vec<_>>(),
1122 ?candidate_para_id,
1123 "Invalid CoreIndex, core is not assigned to this para_id"
1124 );
1125 return None
1126 }
1127 return Some(core_index)
1128 } else {
1129 return Some(core_index)
1130 }
1131}
1132
1133#[overseer::contextbounds(CandidateBacking, prefix = self::overseer)]
1135async fn construct_per_relay_parent_state<Context>(
1136 ctx: &mut Context,
1137 relay_parent: Hash,
1138 keystore: &KeystorePtr,
1139 per_session_cache: &mut PerSessionCache,
1140) -> Result<Option<PerRelayParentState>, Error> {
1141 let parent = relay_parent;
1142
1143 let (session_index, groups, claim_queue, disabled_validators) = futures::try_join!(
1144 request_session_index_for_child(parent, ctx.sender()).await,
1145 request_validator_groups(parent, ctx.sender()).await,
1146 request_claim_queue(parent, ctx.sender()).await,
1147 request_disabled_validators(parent, ctx.sender()).await,
1148 )
1149 .map_err(Error::JoinMultiple)?;
1150
1151 let session_index = try_runtime_api!(session_index);
1152
1153 let validators = per_session_cache.validators(session_index, parent, ctx.sender()).await;
1154 let validators = try_runtime_api!(validators);
1155
1156 let node_features = per_session_cache.node_features(session_index, parent, ctx.sender()).await;
1157 let node_features = try_runtime_api!(node_features);
1158
1159 let executor_params =
1160 per_session_cache.executor_params(session_index, parent, ctx.sender()).await;
1161 let executor_params = try_runtime_api!(executor_params);
1162
1163 gum::debug!(target: LOG_TARGET, ?parent, "New state");
1164
1165 let (validator_groups, group_rotation_info) = try_runtime_api!(groups);
1166
1167 let minimum_backing_votes = per_session_cache
1168 .minimum_backing_votes(session_index, parent, ctx.sender())
1169 .await;
1170 let minimum_backing_votes = try_runtime_api!(minimum_backing_votes);
1171 let claim_queue = try_runtime_api!(claim_queue);
1172 let disabled_validators = try_runtime_api!(disabled_validators);
1173
1174 let signing_context = SigningContext { parent_hash: parent, session_index };
1175 let validator = match Validator::construct(
1176 &validators,
1177 &disabled_validators,
1178 signing_context.clone(),
1179 keystore.clone(),
1180 ) {
1181 Ok(v) => Some(v),
1182 Err(util::Error::NotAValidator) => None,
1183 Err(e) => {
1184 gum::warn!(
1185 target: LOG_TARGET,
1186 err = ?e,
1187 "Cannot participate in candidate backing",
1188 );
1189
1190 return Ok(None)
1191 },
1192 };
1193
1194 let n_cores = validator_groups.len();
1195
1196 let mut groups = HashMap::<CoreIndex, Vec<ValidatorIndex>>::new();
1197 let mut assigned_core = None;
1198
1199 for idx in 0..n_cores {
1200 let core_index = CoreIndex(idx as _);
1201
1202 if !claim_queue.contains_key(&core_index) {
1203 continue
1204 }
1205
1206 let group_index = group_rotation_info.group_for_core(core_index, n_cores);
1207 if let Some(g) = validator_groups.get(group_index.0 as usize) {
1208 if validator.as_ref().map_or(false, |v| g.contains(&v.index())) {
1209 assigned_core = Some(core_index);
1210 }
1211 groups.insert(core_index, g.clone());
1212 }
1213 }
1214 gum::debug!(target: LOG_TARGET, ?groups, "TableContext");
1215
1216 let validator_to_group =
1217 per_session_cache.validator_to_group(session_index, &validators, &validator_groups);
1218
1219 let table_context =
1220 TableContext { validator, groups, validators: validators.to_vec(), disabled_validators };
1221
1222 Ok(Some(PerRelayParentState {
1223 parent,
1224 node_features,
1225 executor_params,
1226 assigned_core,
1227 backed: HashSet::new(),
1228 table: Table::new(),
1229 table_context,
1230 issued_statements: HashSet::new(),
1231 awaiting_validation: HashSet::new(),
1232 fallbacks: HashMap::new(),
1233 minimum_backing_votes,
1234 n_cores: validator_groups.len() as u32,
1235 claim_queue: ClaimQueueSnapshot::from(claim_queue),
1236 validator_to_group,
1237 group_rotation_info,
1238 }))
1239}
1240
1241enum SecondingAllowed {
1242 No,
1243 Yes(Vec<Hash>),
1245}
1246
1247#[overseer::contextbounds(CandidateBacking, prefix = self::overseer)]
1250async fn seconding_sanity_check<Context>(
1251 ctx: &mut Context,
1252 implicit_view: &ImplicitView,
1253 hypothetical_candidate: HypotheticalCandidate,
1254) -> SecondingAllowed {
1255 let mut leaves_for_seconding = Vec::new();
1256 let mut responses = FuturesOrdered::<BoxFuture<'_, Result<_, oneshot::Canceled>>>::new();
1257
1258 let candidate_para = hypothetical_candidate.candidate_para();
1259 let candidate_relay_parent = hypothetical_candidate.relay_parent();
1260 let candidate_hash = hypothetical_candidate.candidate_hash();
1261
1262 for head in implicit_view.leaves() {
1263 let allowed_parents_for_para =
1266 implicit_view.known_allowed_relay_parents_under(head, Some(candidate_para));
1267 if !allowed_parents_for_para.unwrap_or_default().contains(&candidate_relay_parent) {
1268 continue
1269 }
1270
1271 let (tx, rx) = oneshot::channel();
1272 ctx.send_message(ProspectiveParachainsMessage::GetHypotheticalMembership(
1273 HypotheticalMembershipRequest {
1274 candidates: vec![hypothetical_candidate.clone()],
1275 fragment_chain_relay_parent: Some(*head),
1276 },
1277 tx,
1278 ))
1279 .await;
1280 let response = rx.map_ok(move |candidate_memberships| {
1281 let is_member_or_potential = candidate_memberships
1282 .into_iter()
1283 .find_map(|(candidate, leaves)| {
1284 (candidate.candidate_hash() == candidate_hash).then_some(leaves)
1285 })
1286 .and_then(|leaves| leaves.into_iter().find(|leaf| leaf == head))
1287 .is_some();
1288
1289 (is_member_or_potential, head)
1290 });
1291 responses.push_back(response.boxed());
1292 }
1293
1294 if responses.is_empty() {
1295 return SecondingAllowed::No
1296 }
1297
1298 while let Some(response) = responses.next().await {
1299 match response {
1300 Err(oneshot::Canceled) => {
1301 gum::warn!(
1302 target: LOG_TARGET,
1303 "Failed to reach prospective parachains subsystem for hypothetical membership",
1304 );
1305
1306 return SecondingAllowed::No
1307 },
1308 Ok((is_member_or_potential, head)) => match is_member_or_potential {
1309 false => {
1310 gum::debug!(
1311 target: LOG_TARGET,
1312 ?candidate_hash,
1313 leaf_hash = ?head,
1314 "Refusing to second candidate at leaf. Is not a potential member.",
1315 );
1316 },
1317 true => {
1318 leaves_for_seconding.push(*head);
1319 },
1320 },
1321 }
1322 }
1323
1324 if leaves_for_seconding.is_empty() {
1325 SecondingAllowed::No
1326 } else {
1327 SecondingAllowed::Yes(leaves_for_seconding)
1328 }
1329}
1330
1331#[overseer::contextbounds(CandidateBacking, prefix = self::overseer)]
1333async fn handle_can_second_request<Context>(
1334 ctx: &mut Context,
1335 state: &State,
1336 request: CanSecondRequest,
1337 tx: oneshot::Sender<bool>,
1338) {
1339 let relay_parent = request.candidate_relay_parent;
1340 let response = if state.per_relay_parent.get(&relay_parent).is_some() {
1341 let hypothetical_candidate = HypotheticalCandidate::Incomplete {
1342 candidate_hash: request.candidate_hash,
1343 candidate_para: request.candidate_para_id,
1344 parent_head_data_hash: request.parent_head_data_hash,
1345 candidate_relay_parent: relay_parent,
1346 };
1347
1348 let result =
1349 seconding_sanity_check(ctx, &state.implicit_view, hypothetical_candidate).await;
1350
1351 match result {
1352 SecondingAllowed::No => false,
1353 SecondingAllowed::Yes(leaves) => !leaves.is_empty(),
1354 }
1355 } else {
1356 false
1358 };
1359
1360 let _ = tx.send(response);
1361}
1362
1363#[overseer::contextbounds(CandidateBacking, prefix = self::overseer)]
1364async fn handle_validated_candidate_command<Context>(
1365 ctx: &mut Context,
1366 state: &mut State,
1367 relay_parent: Hash,
1368 command: ValidatedCandidateCommand,
1369 metrics: &Metrics,
1370) -> Result<(), Error> {
1371 match state.per_relay_parent.get_mut(&relay_parent) {
1372 Some(rp_state) => {
1373 let candidate_hash = command.candidate_hash();
1374 rp_state.awaiting_validation.remove(&candidate_hash);
1375
1376 match command {
1377 ValidatedCandidateCommand::Second(res) => match res {
1378 Ok(outputs) => {
1379 let BackgroundValidationOutputs {
1380 candidate,
1381 commitments,
1382 persisted_validation_data,
1383 } = outputs;
1384
1385 if rp_state.issued_statements.contains(&candidate_hash) {
1386 return Ok(())
1387 }
1388
1389 let receipt = CommittedCandidateReceipt {
1390 descriptor: candidate.descriptor.clone(),
1391 commitments,
1392 };
1393
1394 let hypothetical_candidate = HypotheticalCandidate::Complete {
1395 candidate_hash,
1396 receipt: Arc::new(receipt.clone()),
1397 persisted_validation_data: persisted_validation_data.clone(),
1398 };
1399 if let SecondingAllowed::No = seconding_sanity_check(
1403 ctx,
1404 &state.implicit_view,
1405 hypothetical_candidate,
1406 )
1407 .await
1408 {
1409 return Ok(())
1410 };
1411
1412 let statement =
1413 StatementWithPVD::Seconded(receipt, persisted_validation_data);
1414
1415 let res = sign_import_and_distribute_statement(
1419 ctx,
1420 rp_state,
1421 &mut state.per_candidate,
1422 statement,
1423 state.keystore.clone(),
1424 metrics,
1425 )
1426 .await;
1427
1428 if let Err(Error::RejectedByProspectiveParachains) = res {
1429 let candidate_hash = candidate.hash();
1430 gum::debug!(
1431 target: LOG_TARGET,
1432 relay_parent = ?candidate.descriptor().relay_parent(),
1433 ?candidate_hash,
1434 "Attempted to second candidate but was rejected by prospective parachains",
1435 );
1436
1437 ctx.send_message(CollatorProtocolMessage::Invalid(
1439 candidate.descriptor().relay_parent(),
1440 candidate,
1441 ))
1442 .await;
1443
1444 return Ok(())
1445 }
1446
1447 if let Some(stmt) = res? {
1448 match state.per_candidate.get_mut(&candidate_hash) {
1449 None => {
1450 gum::warn!(
1451 target: LOG_TARGET,
1452 ?candidate_hash,
1453 "Missing `per_candidate` for seconded candidate.",
1454 );
1455 },
1456 Some(p) => p.seconded_locally = true,
1457 }
1458
1459 rp_state.issued_statements.insert(candidate_hash);
1460
1461 metrics.on_candidate_seconded();
1462 ctx.send_message(CollatorProtocolMessage::Seconded(
1463 rp_state.parent,
1464 StatementWithPVD::drop_pvd_from_signed(stmt),
1465 ))
1466 .await;
1467 }
1468 },
1469 Err(candidate) => {
1470 ctx.send_message(CollatorProtocolMessage::Invalid(
1471 rp_state.parent,
1472 candidate,
1473 ))
1474 .await;
1475 },
1476 },
1477 ValidatedCandidateCommand::Attest(res) => {
1478 rp_state.fallbacks.remove(&candidate_hash);
1480 if !rp_state.issued_statements.contains(&candidate_hash) {
1482 if res.is_ok() {
1483 let statement = StatementWithPVD::Valid(candidate_hash);
1484
1485 sign_import_and_distribute_statement(
1486 ctx,
1487 rp_state,
1488 &mut state.per_candidate,
1489 statement,
1490 state.keystore.clone(),
1491 metrics,
1492 )
1493 .await?;
1494 }
1495 rp_state.issued_statements.insert(candidate_hash);
1496 }
1497 },
1498 ValidatedCandidateCommand::AttestNoPoV(candidate_hash) => {
1499 if let Some(attesting) = rp_state.fallbacks.get_mut(&candidate_hash) {
1500 if let Some(index) = attesting.backing.pop() {
1501 attesting.from_validator = index;
1502 let attesting = attesting.clone();
1503
1504 if let Some(pvd) = state
1510 .per_candidate
1511 .get(&candidate_hash)
1512 .map(|pc| pc.persisted_validation_data.clone())
1513 {
1514 kick_off_validation_work(
1515 ctx,
1516 rp_state,
1517 pvd,
1518 &state.background_validation_tx,
1519 attesting,
1520 )
1521 .await?;
1522 }
1523 }
1524 } else {
1525 gum::warn!(
1526 target: LOG_TARGET,
1527 "AttestNoPoV was triggered without fallback being available."
1528 );
1529 debug_assert!(false);
1530 }
1531 },
1532 }
1533 },
1534 None => {
1535 },
1538 }
1539
1540 Ok(())
1541}
1542
1543fn sign_statement(
1544 rp_state: &PerRelayParentState,
1545 statement: StatementWithPVD,
1546 keystore: KeystorePtr,
1547 metrics: &Metrics,
1548) -> Option<SignedFullStatementWithPVD> {
1549 let signed = rp_state
1550 .table_context
1551 .validator
1552 .as_ref()?
1553 .sign(keystore, statement)
1554 .ok()
1555 .flatten()?;
1556 metrics.on_statement_signed();
1557 Some(signed)
1558}
1559
1560#[overseer::contextbounds(CandidateBacking, prefix = self::overseer)]
1568async fn import_statement<Context>(
1569 ctx: &mut Context,
1570 rp_state: &mut PerRelayParentState,
1571 per_candidate: &mut HashMap<CandidateHash, PerCandidateState>,
1572 statement: &SignedFullStatementWithPVD,
1573) -> Result<Option<TableSummary>, Error> {
1574 let candidate_hash = statement.payload().candidate_hash();
1575
1576 gum::debug!(
1577 target: LOG_TARGET,
1578 statement = ?statement.payload().to_compact(),
1579 validator_index = statement.validator_index().0,
1580 ?candidate_hash,
1581 "Importing statement",
1582 );
1583
1584 if let StatementWithPVD::Seconded(candidate, pvd) = statement.payload() {
1597 if !per_candidate.contains_key(&candidate_hash) {
1598 let (tx, rx) = oneshot::channel();
1599 ctx.send_message(ProspectiveParachainsMessage::IntroduceSecondedCandidate(
1600 IntroduceSecondedCandidateRequest {
1601 candidate_para: candidate.descriptor.para_id(),
1602 candidate_receipt: candidate.clone(),
1603 persisted_validation_data: pvd.clone(),
1604 },
1605 tx,
1606 ))
1607 .await;
1608
1609 match rx.await {
1610 Err(oneshot::Canceled) => {
1611 gum::warn!(
1612 target: LOG_TARGET,
1613 "Could not reach the Prospective Parachains subsystem."
1614 );
1615
1616 return Err(Error::RejectedByProspectiveParachains)
1617 },
1618 Ok(false) => return Err(Error::RejectedByProspectiveParachains),
1619 Ok(true) => {},
1620 }
1621
1622 per_candidate.insert(
1624 candidate_hash,
1625 PerCandidateState {
1626 persisted_validation_data: pvd.clone(),
1627 seconded_locally: false,
1629 relay_parent: candidate.descriptor.relay_parent(),
1630 },
1631 );
1632 }
1633 }
1634
1635 let stmt = primitive_statement_to_table(statement);
1636
1637 let core = core_index_from_statement(
1638 &rp_state.validator_to_group,
1639 &rp_state.group_rotation_info,
1640 rp_state.n_cores,
1641 &rp_state.claim_queue,
1642 statement,
1643 )
1644 .ok_or(Error::CoreIndexUnavailable)?;
1645
1646 Ok(rp_state.table.import_statement(&rp_state.table_context, core, stmt))
1647}
1648
1649#[overseer::contextbounds(CandidateBacking, prefix = self::overseer)]
1652async fn post_import_statement_actions<Context>(
1653 ctx: &mut Context,
1654 rp_state: &mut PerRelayParentState,
1655 summary: Option<&TableSummary>,
1656) {
1657 if let Some(attested) = summary.as_ref().and_then(|s| {
1658 rp_state.table.attested_candidate(
1659 &s.candidate,
1660 &rp_state.table_context,
1661 rp_state.minimum_backing_votes,
1662 )
1663 }) {
1664 let candidate_hash = attested.candidate.hash();
1665
1666 if rp_state.backed.insert(candidate_hash) {
1668 if let Some(backed) = table_attested_to_backed(attested, &rp_state.table_context) {
1669 let para_id = backed.candidate().descriptor.para_id();
1670 gum::debug!(
1671 target: LOG_TARGET,
1672 candidate_hash = ?candidate_hash,
1673 relay_parent = ?rp_state.parent,
1674 %para_id,
1675 "Candidate backed",
1676 );
1677
1678 ctx.send_message(ProspectiveParachainsMessage::CandidateBacked(
1681 para_id,
1682 candidate_hash,
1683 ))
1684 .await;
1685 ctx.send_message(StatementDistributionMessage::Backed(candidate_hash)).await;
1687 } else {
1688 gum::debug!(target: LOG_TARGET, ?candidate_hash, "Cannot get BackedCandidate");
1689 }
1690 } else {
1691 gum::debug!(target: LOG_TARGET, ?candidate_hash, "Candidate already known");
1692 }
1693 } else {
1694 gum::debug!(target: LOG_TARGET, "No attested candidate");
1695 }
1696
1697 issue_new_misbehaviors(ctx, rp_state.parent, &mut rp_state.table);
1698}
1699
1700#[overseer::contextbounds(CandidateBacking, prefix = self::overseer)]
1702fn issue_new_misbehaviors<Context>(
1703 ctx: &mut Context,
1704 relay_parent: Hash,
1705 table: &mut Table<TableContext>,
1706) {
1707 let misbehaviors: Vec<_> = table.drain_misbehaviors().collect();
1709 for (validator_id, report) in misbehaviors {
1710 ctx.send_unbounded_message(ProvisionerMessage::ProvisionableData(
1716 relay_parent,
1717 ProvisionableData::MisbehaviorReport(relay_parent, validator_id, report),
1718 ));
1719 }
1720}
1721
1722#[overseer::contextbounds(CandidateBacking, prefix = self::overseer)]
1724async fn sign_import_and_distribute_statement<Context>(
1725 ctx: &mut Context,
1726 rp_state: &mut PerRelayParentState,
1727 per_candidate: &mut HashMap<CandidateHash, PerCandidateState>,
1728 statement: StatementWithPVD,
1729 keystore: KeystorePtr,
1730 metrics: &Metrics,
1731) -> Result<Option<SignedFullStatementWithPVD>, Error> {
1732 if let Some(signed_statement) = sign_statement(&*rp_state, statement, keystore, metrics) {
1733 let summary = import_statement(ctx, rp_state, per_candidate, &signed_statement).await?;
1734
1735 let smsg = StatementDistributionMessage::Share(rp_state.parent, signed_statement.clone());
1738 ctx.send_unbounded_message(smsg);
1739
1740 post_import_statement_actions(ctx, rp_state, summary.as_ref()).await;
1741
1742 Ok(Some(signed_statement))
1743 } else {
1744 Ok(None)
1745 }
1746}
1747
1748#[overseer::contextbounds(CandidateBacking, prefix = self::overseer)]
1749async fn background_validate_and_make_available<Context>(
1750 ctx: &mut Context,
1751 rp_state: &mut PerRelayParentState,
1752 params: BackgroundValidationParams<
1753 impl overseer::CandidateBackingSenderTrait,
1754 impl Fn(BackgroundValidationResult) -> ValidatedCandidateCommand + Send + 'static + Sync,
1755 >,
1756) -> Result<(), Error> {
1757 let candidate_hash = params.candidate.hash();
1758 let Some(core_index) = rp_state.assigned_core else { return Ok(()) };
1759 if rp_state.awaiting_validation.insert(candidate_hash) {
1760 let bg = async move {
1762 if let Err(error) = validate_and_make_available(params, core_index).await {
1763 if let Error::BackgroundValidationMpsc(error) = error {
1764 gum::debug!(
1765 target: LOG_TARGET,
1766 ?candidate_hash,
1767 ?error,
1768 "Mpsc background validation mpsc died during validation- leaf no longer active?"
1769 );
1770 } else {
1771 gum::error!(
1772 target: LOG_TARGET,
1773 ?candidate_hash,
1774 ?error,
1775 "Failed to validate and make available",
1776 );
1777 }
1778 }
1779 };
1780
1781 ctx.spawn("backing-validation", bg.boxed())
1782 .map_err(|_| Error::FailedToSpawnBackgroundTask)?;
1783 }
1784
1785 Ok(())
1786}
1787
1788#[overseer::contextbounds(CandidateBacking, prefix = self::overseer)]
1790async fn kick_off_validation_work<Context>(
1791 ctx: &mut Context,
1792 rp_state: &mut PerRelayParentState,
1793 persisted_validation_data: PersistedValidationData,
1794 background_validation_tx: &mpsc::Sender<(Hash, ValidatedCandidateCommand)>,
1795 attesting: AttestingData,
1796) -> Result<(), Error> {
1797 match rp_state.table_context.local_validator_is_disabled() {
1799 Some(true) => {
1800 gum::info!(target: LOG_TARGET, "We are disabled - don't kick off validation");
1801 return Ok(())
1802 },
1803 Some(false) => {}, None => {
1805 gum::debug!(target: LOG_TARGET, "We are not a validator - don't kick off validation");
1806 return Ok(())
1807 },
1808 }
1809
1810 let candidate_hash = attesting.candidate.hash();
1811 if rp_state.issued_statements.contains(&candidate_hash) {
1812 return Ok(())
1813 }
1814
1815 gum::debug!(
1816 target: LOG_TARGET,
1817 candidate_hash = ?candidate_hash,
1818 candidate_receipt = ?attesting.candidate,
1819 "Kicking off validation",
1820 );
1821
1822 let bg_sender = ctx.sender().clone();
1823 let pov = PoVData::FetchFromValidator {
1824 from_validator: attesting.from_validator,
1825 candidate_hash,
1826 pov_hash: attesting.pov_hash,
1827 };
1828
1829 background_validate_and_make_available(
1830 ctx,
1831 rp_state,
1832 BackgroundValidationParams {
1833 sender: bg_sender,
1834 tx_command: background_validation_tx.clone(),
1835 candidate: attesting.candidate,
1836 relay_parent: rp_state.parent,
1837 node_features: rp_state.node_features.clone(),
1838 executor_params: Arc::clone(&rp_state.executor_params),
1839 persisted_validation_data,
1840 pov,
1841 n_validators: rp_state.table_context.validators.len(),
1842 make_command: ValidatedCandidateCommand::Attest,
1843 },
1844 )
1845 .await
1846}
1847
1848#[overseer::contextbounds(CandidateBacking, prefix = self::overseer)]
1850async fn maybe_validate_and_import<Context>(
1851 ctx: &mut Context,
1852 state: &mut State,
1853 relay_parent: Hash,
1854 statement: SignedFullStatementWithPVD,
1855) -> Result<(), Error> {
1856 let rp_state = match state.per_relay_parent.get_mut(&relay_parent) {
1857 Some(r) => r,
1858 None => {
1859 gum::trace!(
1860 target: LOG_TARGET,
1861 ?relay_parent,
1862 "Received statement for unknown relay-parent"
1863 );
1864
1865 return Ok(())
1866 },
1867 };
1868
1869 if rp_state.table_context.validator_is_disabled(&statement.validator_index()) {
1871 gum::debug!(
1872 target: LOG_TARGET,
1873 sender_validator_idx = ?statement.validator_index(),
1874 "Not importing statement because the sender is disabled"
1875 );
1876 return Ok(())
1877 }
1878
1879 let res = import_statement(ctx, rp_state, &mut state.per_candidate, &statement).await;
1880
1881 if let Err(Error::RejectedByProspectiveParachains) = res {
1884 gum::debug!(
1885 target: LOG_TARGET,
1886 ?relay_parent,
1887 "Statement rejected by prospective parachains."
1888 );
1889
1890 return Ok(())
1891 }
1892
1893 let summary = res?;
1894 post_import_statement_actions(ctx, rp_state, summary.as_ref()).await;
1895
1896 if let Some(summary) = summary {
1897 let candidate_hash = summary.candidate;
1902
1903 if Some(summary.group_id) != rp_state.assigned_core {
1904 return Ok(())
1905 }
1906
1907 let attesting = match statement.payload() {
1908 StatementWithPVD::Seconded(receipt, _) => {
1909 let attesting = AttestingData {
1910 candidate: rp_state
1911 .table
1912 .get_candidate(&candidate_hash)
1913 .ok_or(Error::CandidateNotFound)?
1914 .to_plain(),
1915 pov_hash: receipt.descriptor.pov_hash(),
1916 from_validator: statement.validator_index(),
1917 backing: Vec::new(),
1918 };
1919 rp_state.fallbacks.insert(summary.candidate, attesting.clone());
1920 attesting
1921 },
1922 StatementWithPVD::Valid(candidate_hash) => {
1923 if let Some(attesting) = rp_state.fallbacks.get_mut(candidate_hash) {
1924 let our_index = rp_state.table_context.validator.as_ref().map(|v| v.index());
1925 if our_index == Some(statement.validator_index()) {
1926 return Ok(())
1927 }
1928
1929 if rp_state.awaiting_validation.contains(candidate_hash) {
1930 attesting.backing.push(statement.validator_index());
1932 return Ok(())
1933 } else {
1934 attesting.from_validator = statement.validator_index();
1936 attesting.clone()
1937 }
1938 } else {
1939 return Ok(())
1940 }
1941 },
1942 };
1943
1944 if let Some(pvd) = state
1947 .per_candidate
1948 .get(&candidate_hash)
1949 .map(|pc| pc.persisted_validation_data.clone())
1950 {
1951 kick_off_validation_work(
1952 ctx,
1953 rp_state,
1954 pvd,
1955 &state.background_validation_tx,
1956 attesting,
1957 )
1958 .await?;
1959 }
1960 }
1961 Ok(())
1962}
1963
1964#[overseer::contextbounds(CandidateBacking, prefix = self::overseer)]
1966async fn validate_and_second<Context>(
1967 ctx: &mut Context,
1968 rp_state: &mut PerRelayParentState,
1969 persisted_validation_data: PersistedValidationData,
1970 candidate: &CandidateReceipt,
1971 pov: Arc<PoV>,
1972 background_validation_tx: &mpsc::Sender<(Hash, ValidatedCandidateCommand)>,
1973) -> Result<(), Error> {
1974 let candidate_hash = candidate.hash();
1975
1976 gum::debug!(
1977 target: LOG_TARGET,
1978 candidate_hash = ?candidate_hash,
1979 candidate_receipt = ?candidate,
1980 "Validate and second candidate",
1981 );
1982
1983 let bg_sender = ctx.sender().clone();
1984 background_validate_and_make_available(
1985 ctx,
1986 rp_state,
1987 BackgroundValidationParams {
1988 sender: bg_sender,
1989 tx_command: background_validation_tx.clone(),
1990 candidate: candidate.clone(),
1991 relay_parent: rp_state.parent,
1992 node_features: rp_state.node_features.clone(),
1993 executor_params: Arc::clone(&rp_state.executor_params),
1994 persisted_validation_data,
1995 pov: PoVData::Ready(pov),
1996 n_validators: rp_state.table_context.validators.len(),
1997 make_command: ValidatedCandidateCommand::Second,
1998 },
1999 )
2000 .await?;
2001
2002 Ok(())
2003}
2004
2005#[overseer::contextbounds(CandidateBacking, prefix = self::overseer)]
2006async fn handle_second_message<Context>(
2007 ctx: &mut Context,
2008 state: &mut State,
2009 candidate: CandidateReceipt,
2010 persisted_validation_data: PersistedValidationData,
2011 pov: PoV,
2012 metrics: &Metrics,
2013) -> Result<(), Error> {
2014 let _timer = metrics.time_process_second();
2015
2016 let candidate_hash = candidate.hash();
2017 let relay_parent = candidate.descriptor().relay_parent();
2018
2019 if candidate.descriptor().persisted_validation_data_hash() != persisted_validation_data.hash() {
2020 gum::warn!(
2021 target: LOG_TARGET,
2022 ?candidate_hash,
2023 "Candidate backing was asked to second candidate with wrong PVD",
2024 );
2025
2026 return Ok(())
2027 }
2028
2029 let rp_state = match state.per_relay_parent.get_mut(&relay_parent) {
2030 None => {
2031 gum::trace!(
2032 target: LOG_TARGET,
2033 ?relay_parent,
2034 ?candidate_hash,
2035 "We were asked to second a candidate outside of our view."
2036 );
2037
2038 return Ok(())
2039 },
2040 Some(r) => r,
2041 };
2042
2043 if rp_state.table_context.local_validator_is_disabled().unwrap_or(false) {
2046 gum::warn!(target: LOG_TARGET, "Local validator is disabled. Don't validate and second");
2047 return Ok(())
2048 }
2049
2050 let assigned_paras = rp_state.assigned_core.and_then(|core| rp_state.claim_queue.0.get(&core));
2051
2052 if !matches!(assigned_paras, Some(paras) if paras.contains(&candidate.descriptor().para_id())) {
2054 gum::debug!(
2055 target: LOG_TARGET,
2056 our_assignment_core = ?rp_state.assigned_core,
2057 our_assignment_paras = ?assigned_paras,
2058 collation = ?candidate.descriptor().para_id(),
2059 "Subsystem asked to second for para outside of our assignment",
2060 );
2061 return Ok(());
2062 }
2063
2064 gum::debug!(
2065 target: LOG_TARGET,
2066 our_assignment_core = ?rp_state.assigned_core,
2067 our_assignment_paras = ?assigned_paras,
2068 collation = ?candidate.descriptor().para_id(),
2069 "Current assignments vs collation",
2070 );
2071
2072 if !rp_state.issued_statements.contains(&candidate_hash) {
2080 let pov = Arc::new(pov);
2081
2082 validate_and_second(
2083 ctx,
2084 rp_state,
2085 persisted_validation_data,
2086 &candidate,
2087 pov,
2088 &state.background_validation_tx,
2089 )
2090 .await?;
2091 }
2092
2093 Ok(())
2094}
2095
2096#[overseer::contextbounds(CandidateBacking, prefix = self::overseer)]
2097async fn handle_statement_message<Context>(
2098 ctx: &mut Context,
2099 state: &mut State,
2100 relay_parent: Hash,
2101 statement: SignedFullStatementWithPVD,
2102 metrics: &Metrics,
2103) -> Result<(), Error> {
2104 let _timer = metrics.time_process_statement();
2105
2106 match maybe_validate_and_import(ctx, state, relay_parent, statement).await {
2108 Err(Error::ValidationFailed(_)) => Ok(()),
2109 Err(e) => Err(e),
2110 Ok(()) => Ok(()),
2111 }
2112}
2113
2114fn handle_get_backable_candidates_message(
2115 state: &State,
2116 requested_candidates: HashMap<ParaId, Vec<(CandidateHash, Hash)>>,
2117 tx: oneshot::Sender<HashMap<ParaId, Vec<BackedCandidate>>>,
2118 metrics: &Metrics,
2119) -> Result<(), Error> {
2120 let _timer = metrics.time_get_backed_candidates();
2121
2122 let mut backed = HashMap::with_capacity(requested_candidates.len());
2123
2124 for (para_id, para_candidates) in requested_candidates {
2125 for (candidate_hash, relay_parent) in para_candidates.iter() {
2126 let rp_state = match state.per_relay_parent.get(&relay_parent) {
2127 Some(rp_state) => rp_state,
2128 None => {
2129 gum::debug!(
2130 target: LOG_TARGET,
2131 ?relay_parent,
2132 ?candidate_hash,
2133 "Requested candidate's relay parent is out of view",
2134 );
2135 break
2136 },
2137 };
2138 let maybe_backed_candidate = rp_state
2139 .table
2140 .attested_candidate(
2141 candidate_hash,
2142 &rp_state.table_context,
2143 rp_state.minimum_backing_votes,
2144 )
2145 .and_then(|attested| table_attested_to_backed(attested, &rp_state.table_context));
2146
2147 if let Some(backed_candidate) = maybe_backed_candidate {
2148 backed
2149 .entry(para_id)
2150 .or_insert_with(|| Vec::with_capacity(para_candidates.len()))
2151 .push(backed_candidate);
2152 } else {
2153 break
2154 }
2155 }
2156 }
2157
2158 tx.send(backed).map_err(|data| Error::Send(data))?;
2159 Ok(())
2160}