1#![deny(unused_crate_dependencies)]
67
68use std::{
69 collections::{HashMap, HashSet},
70 sync::Arc,
71};
72
73use bitvec::vec::BitVec;
74use futures::{
75 channel::{mpsc, oneshot},
76 future::BoxFuture,
77 stream::FuturesOrdered,
78 FutureExt, SinkExt, StreamExt, TryFutureExt,
79};
80use schnellru::{ByLength, LruMap};
81
82use error::{Error, FatalResult};
83use polkadot_node_primitives::{
84 AvailableData, InvalidCandidate, PoV, SignedFullStatementWithPVD, StatementWithPVD,
85 ValidationResult, DISPUTE_WINDOW,
86};
87use polkadot_node_subsystem::{
88 messages::{
89 AvailabilityDistributionMessage, AvailabilityStoreMessage, BackableCandidateRef,
90 CanSecondRequest, CandidateBackingMessage, CandidateValidationMessage,
91 CollatorProtocolMessage, HypotheticalCandidate, HypotheticalMembershipRequest,
92 IntroduceSecondedCandidateRequest, ProspectiveParachainsMessage, ProvisionableData,
93 ProvisionerMessage, PvfExecKind, RuntimeApiMessage, RuntimeApiRequest,
94 StatementDistributionMessage, StoreAvailableDataError,
95 },
96 overseer, ActiveLeavesUpdate, FromOrchestra, OverseerSignal, RuntimeApiError, SpawnedSubsystem,
97 SubsystemError,
98};
99use polkadot_node_subsystem_util::{
100 self as util,
101 backing_implicit_view::View as ImplicitView,
102 request_claim_queue, request_disabled_validators, request_min_backing_votes,
103 request_node_features, request_session_executor_params, request_session_index_for_child,
104 request_validator_groups, request_validators,
105 runtime::{self, ClaimQueueSnapshot},
106 Error as UtilError, Validator,
107};
108use polkadot_parachain_primitives::primitives::IsSystem;
109use polkadot_primitives::{
110 node_features::FeatureIndex, BackedCandidate, CandidateCommitments, CandidateDescriptorV2,
111 CandidateHash, CandidateReceiptV2 as CandidateReceipt,
112 CommittedCandidateReceiptV2 as CommittedCandidateReceipt, CoreIndex, ExecutorParams,
113 GroupIndex, GroupRotationInfo, Hash, Id as ParaId, IndexedVec, NodeFeatures,
114 PersistedValidationData, SessionIndex, SigningContext, ValidationCode, ValidatorId,
115 ValidatorIndex, ValidatorSignature, ValidityAttestation,
116};
117use polkadot_statement_table::{
118 generic::AttestedCandidate as TableAttestedCandidate,
119 v2::{
120 SignedStatement as TableSignedStatement, Statement as TableStatement,
121 Summary as TableSummary,
122 },
123 Context as TableContextTrait, Table,
124};
125use sp_keystore::KeystorePtr;
126
127mod error;
128
129mod metrics;
130use self::metrics::Metrics;
131
132#[cfg(test)]
133mod tests;
134
/// Log target handed as `target:` to the `gum` logging macros in this file.
const LOG_TARGET: &str = "parachain::candidate-backing";
136
/// Describes how the PoV for a candidate is obtained during background validation.
enum PoVData {
    /// The PoV is already at hand and is used as-is.
    Ready(Arc<PoV>),
    /// The PoV has to be requested first; the fields are forwarded to `request_pov`.
    FetchFromValidator {
        /// Validator the PoV is requested from.
        from_validator: ValidatorIndex,
        /// Hash of the candidate the PoV belongs to.
        candidate_hash: CandidateHash,
        /// Hash of the PoV being requested.
        pov_hash: Hash,
    },
}
148
/// Command sent over the background-validation channel once a validation job has
/// finished (or could not obtain its PoV).
enum ValidatedCandidateCommand {
    /// Carries the validation outcome of a candidate we may second.
    Second(BackgroundValidationResult),
    /// Carries the validation outcome of a candidate we may attest to.
    Attest(BackgroundValidationResult),
    /// Sent by `validate_and_make_available` when fetching the PoV failed with
    /// `Error::FetchPoV`; carries only the candidate hash.
    AttestNoPoV(CandidateHash),
}
157
158impl std::fmt::Debug for ValidatedCandidateCommand {
159 fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
160 let candidate_hash = self.candidate_hash();
161 match *self {
162 ValidatedCandidateCommand::Second(_) => write!(f, "Second({})", candidate_hash),
163 ValidatedCandidateCommand::Attest(_) => write!(f, "Attest({})", candidate_hash),
164 ValidatedCandidateCommand::AttestNoPoV(_) => write!(f, "Attest({})", candidate_hash),
165 }
166 }
167}
168
169impl ValidatedCandidateCommand {
170 fn candidate_hash(&self) -> CandidateHash {
171 match *self {
172 ValidatedCandidateCommand::Second(Ok(ref outputs)) => outputs.candidate.hash(),
173 ValidatedCandidateCommand::Second(Err(ref candidate)) => candidate.hash(),
174 ValidatedCandidateCommand::Attest(Ok(ref outputs)) => outputs.candidate.hash(),
175 ValidatedCandidateCommand::Attest(Err(ref candidate)) => candidate.hash(),
176 ValidatedCandidateCommand::AttestNoPoV(candidate_hash) => candidate_hash,
177 }
178 }
179}
180
/// The candidate backing subsystem; holds what `run` needs when it is started.
pub struct CandidateBackingSubsystem {
    /// Keystore handed to `run` and from there stored in `State`.
    keystore: KeystorePtr,
    /// Metrics handle handed to `run`.
    metrics: Metrics,
}
186
187impl CandidateBackingSubsystem {
188 pub fn new(keystore: KeystorePtr, metrics: Metrics) -> Self {
190 Self { keystore, metrics }
191 }
192}
193
194#[overseer::subsystem(CandidateBacking, error = SubsystemError, prefix = self::overseer)]
195impl<Context> CandidateBackingSubsystem
196where
197 Context: Send + Sync,
198{
199 fn start(self, ctx: Context) -> SpawnedSubsystem {
200 let future = async move {
201 run(ctx, self.keystore, self.metrics)
202 .await
203 .map_err(|e| SubsystemError::with_origin("candidate-backing", e))
204 }
205 .boxed();
206
207 SpawnedSubsystem { name: "candidate-backing-subsystem", future }
208 }
209}
210
/// State kept per scheduling parent, as assembled by
/// `construct_per_scheduling_parent_state`.
struct PerSchedulingParentState {
    /// Hash of the block this state was built for.
    parent: Hash,
    /// Node features fetched for `session_index`.
    node_features: NodeFeatures,
    /// Core whose validator group contains the local validator, if there is one.
    assigned_core: Option<CoreIndex>,
    /// NOTE(review): presumably the candidates already reported as backed; it is
    /// populated outside the visible part of the file — confirm.
    backed: HashSet<CandidateHash>,
    /// Statement table for this parent; starts out empty.
    table: Table<TableContext>,
    /// Context (local validator, groups, validators, disabled validators) that
    /// accompanies `table`.
    table_context: TableContext,
    /// NOTE(review): presumably candidates for which we already issued a
    /// statement — confirm against the code that fills it.
    issued_statements: HashSet<CandidateHash>,
    /// NOTE(review): presumably candidates with a validation job in flight — confirm.
    awaiting_validation: HashSet<CandidateHash>,
    /// Attesting data keyed by candidate hash. NOTE(review): usage is outside the
    /// visible part of the file — confirm semantics.
    fallbacks: HashMap<CandidateHash, AttestingData>,
    /// Minimum backing votes fetched for `session_index`.
    minimum_backing_votes: u32,
    /// Number of validator groups reported by the runtime; used as the core count.
    n_cores: u32,
    /// Snapshot of the claim queue requested at `parent`.
    claim_queue: ClaimQueueSnapshot,
    /// Group index per validator index (`None` for validators in no group).
    validator_to_group: Arc<IndexedVec<ValidatorIndex, Option<GroupIndex>>>,
    /// Session index for a child of `parent`.
    session_index: SessionIndex,
    /// Group rotation info returned together with the validator groups.
    group_rotation_info: GroupRotationInfo,
}
245
/// State kept per candidate hash in `State::per_candidate`.
struct PerCandidateState {
    /// Persisted validation data associated with the candidate.
    persisted_validation_data: PersistedValidationData,
    /// NOTE(review): presumably set when the local validator seconded this
    /// candidate — the writer is outside the visible part of the file; confirm.
    seconded_locally: bool,
    /// Scheduling parent of the candidate; entries whose scheduling parent is no
    /// longer tracked are pruned in `handle_active_leaves_update`.
    scheduling_parent: Hash,
}
251
/// LRU caches of runtime data keyed by session index, so that the data is
/// requested from the runtime only on a cache miss.
struct PerSessionCache {
    /// Validator set per session.
    validators_cache: LruMap<SessionIndex, Arc<Vec<ValidatorId>>>,
    /// Node features per session.
    node_features_cache: LruMap<SessionIndex, NodeFeatures>,
    /// Executor parameters per session.
    executor_params_cache: LruMap<SessionIndex, Arc<ExecutorParams>>,
    /// Minimum backing votes per session.
    minimum_backing_votes_cache: LruMap<SessionIndex, u32>,
    /// Validator-index to group-index mapping per session.
    validator_to_group_cache:
        LruMap<SessionIndex, Arc<IndexedVec<ValidatorIndex, Option<GroupIndex>>>>,
}
267
268impl Default for PerSessionCache {
269 fn default() -> Self {
274 Self::new(DISPUTE_WINDOW.get())
275 }
276}
277
278impl PerSessionCache {
279 fn new(capacity: u32) -> Self {
281 PerSessionCache {
282 validators_cache: LruMap::new(ByLength::new(capacity)),
283 node_features_cache: LruMap::new(ByLength::new(capacity)),
284 executor_params_cache: LruMap::new(ByLength::new(capacity)),
285 minimum_backing_votes_cache: LruMap::new(ByLength::new(capacity)),
286 validator_to_group_cache: LruMap::new(ByLength::new(capacity)),
287 }
288 }
289
290 async fn validators(
292 &mut self,
293 session_index: SessionIndex,
294 parent: Hash,
295 sender: &mut impl overseer::SubsystemSender<RuntimeApiMessage>,
296 ) -> Result<Arc<Vec<ValidatorId>>, RuntimeApiError> {
297 if let Some(validators) = self.validators_cache.get(&session_index) {
299 return Ok(Arc::clone(validators));
300 }
301
302 let validators: Vec<ValidatorId> =
304 request_validators(parent, sender).await.await.map_err(|err| {
305 RuntimeApiError::Execution { runtime_api_name: "Validators", source: Arc::new(err) }
306 })??;
307
308 let validators = Arc::new(validators);
310
311 self.validators_cache.insert(session_index, Arc::clone(&validators));
313
314 Ok(validators)
315 }
316
317 async fn node_features(
319 &mut self,
320 session_index: SessionIndex,
321 parent: Hash,
322 sender: &mut impl overseer::SubsystemSender<RuntimeApiMessage>,
323 ) -> Result<NodeFeatures, RuntimeApiError> {
324 if let Some(node_features) = self.node_features_cache.get(&session_index) {
326 return Ok(node_features.clone());
327 }
328
329 let node_features = request_node_features(parent, session_index, sender)
331 .await
332 .await
333 .map_err(|err| RuntimeApiError::Execution {
334 runtime_api_name: "NodeFeatures",
335 source: Arc::new(err),
336 })??;
337
338 self.node_features_cache.insert(session_index, node_features.clone());
340
341 Ok(node_features)
342 }
343
344 async fn executor_params(
347 &mut self,
348 session_index: SessionIndex,
349 parent: Hash,
350 sender: &mut impl overseer::SubsystemSender<RuntimeApiMessage>,
351 ) -> Result<Arc<ExecutorParams>, RuntimeApiError> {
352 if let Some(executor_params) = self.executor_params_cache.get(&session_index) {
354 return Ok(Arc::clone(executor_params));
355 }
356
357 let executor_params = request_session_executor_params(parent, session_index, sender)
359 .await
360 .await
361 .map_err(|err| RuntimeApiError::Execution {
362 runtime_api_name: "SessionExecutorParams",
363 source: Arc::new(err),
364 })??
365 .ok_or_else(|| RuntimeApiError::Execution {
366 runtime_api_name: "SessionExecutorParams",
367 source: Arc::new(Error::MissingExecutorParams),
368 })?;
369
370 let executor_params = Arc::new(executor_params);
372
373 self.executor_params_cache.insert(session_index, Arc::clone(&executor_params));
375
376 Ok(executor_params)
377 }
378
379 async fn minimum_backing_votes(
382 &mut self,
383 session_index: SessionIndex,
384 parent: Hash,
385 sender: &mut impl overseer::SubsystemSender<RuntimeApiMessage>,
386 ) -> Result<u32, RuntimeApiError> {
387 if let Some(minimum_backing_votes) = self.minimum_backing_votes_cache.get(&session_index) {
389 return Ok(*minimum_backing_votes);
390 }
391
392 let minimum_backing_votes = request_min_backing_votes(parent, session_index, sender)
394 .await
395 .await
396 .map_err(|err| RuntimeApiError::Execution {
397 runtime_api_name: "MinimumBackingVotes",
398 source: Arc::new(err),
399 })??;
400
401 self.minimum_backing_votes_cache.insert(session_index, minimum_backing_votes);
403
404 Ok(minimum_backing_votes)
405 }
406
407 fn validator_to_group(
409 &mut self,
410 session_index: SessionIndex,
411 validators: &[ValidatorId],
412 validator_groups: &[Vec<ValidatorIndex>],
413 ) -> Arc<IndexedVec<ValidatorIndex, Option<GroupIndex>>> {
414 let validator_to_group = self
415 .validator_to_group_cache
416 .get_or_insert(session_index, || {
417 let mut vector = vec![None; validators.len()];
418
419 for (group_idx, validator_group) in validator_groups.iter().enumerate() {
420 for validator in validator_group {
421 vector[validator.0 as usize] = Some(GroupIndex(group_idx as u32));
422 }
423 }
424
425 Arc::new(IndexedVec::<_, _>::from(vector))
426 })
427 .expect("Just inserted");
428
429 Arc::clone(validator_to_group)
430 }
431}
432
/// Top-level state of the subsystem, owned by `run`.
struct State {
    /// View of active leaves and the relay parents allowed under them.
    implicit_view: ImplicitView,
    /// State per scheduling parent; pruned to the relay parents still allowed by
    /// `implicit_view` on every active-leaves update.
    per_scheduling_parent: HashMap<Hash, PerSchedulingParentState>,
    /// State per candidate; entries whose scheduling parent is no longer tracked
    /// are dropped on every active-leaves update.
    per_candidate: HashMap<CandidateHash, PerCandidateState>,
    /// Per-session caches of runtime data.
    per_session_cache: PerSessionCache,
    /// Sending half of the channel over which background validation reports back.
    background_validation_tx: mpsc::Sender<(Hash, ValidatedCandidateCommand)>,
    /// Keystore used when constructing the local `Validator`.
    keystore: KeystorePtr,
    /// Set once the `CandidateReceiptV3` node feature has been observed at a
    /// finalized block; after that, finalized blocks are no longer checked.
    v3_ever_seen: bool,
}
465
466impl State {
467 fn new(
468 background_validation_tx: mpsc::Sender<(Hash, ValidatedCandidateCommand)>,
469 keystore: KeystorePtr,
470 ) -> Self {
471 State {
472 implicit_view: ImplicitView::default(),
473 per_scheduling_parent: HashMap::default(),
474 per_candidate: HashMap::new(),
475 per_session_cache: PerSessionCache::default(),
476 background_validation_tx,
477 keystore,
478 v3_ever_seen: false,
479 }
480 }
481}
482
483#[overseer::contextbounds(CandidateBacking, prefix = self::overseer)]
484async fn run<Context>(
485 mut ctx: Context,
486 keystore: KeystorePtr,
487 metrics: Metrics,
488) -> FatalResult<()> {
489 let (background_validation_tx, mut background_validation_rx) = mpsc::channel(16);
490 let mut state = State::new(background_validation_tx, keystore);
491
492 loop {
493 let res =
494 run_iteration(&mut ctx, &mut state, &metrics, &mut background_validation_rx).await;
495
496 match res {
497 Ok(()) => break,
498 Err(e) => crate::error::log_error(Err(e))?,
499 }
500 }
501
502 Ok(())
503}
504
505#[overseer::contextbounds(CandidateBacking, prefix = self::overseer)]
506async fn run_iteration<Context>(
507 ctx: &mut Context,
508 state: &mut State,
509 metrics: &Metrics,
510 background_validation_rx: &mut mpsc::Receiver<(Hash, ValidatedCandidateCommand)>,
511) -> Result<(), Error> {
512 loop {
513 futures::select!(
514 validated_command = background_validation_rx.next().fuse() => {
515 if let Some((scheduling_parent, command)) = validated_command {
516 handle_validated_candidate_command(
517 &mut *ctx,
518 state,
519 scheduling_parent,
520 command,
521 metrics,
522 ).await?;
523 } else {
524 panic!("background_validation_tx always alive at this point; qed");
525 }
526 }
527 from_overseer = ctx.recv().fuse() => {
528 match from_overseer.map_err(Error::OverseerExited)? {
529 FromOrchestra::Signal(OverseerSignal::ActiveLeaves(update)) => {
530 handle_active_leaves_update(
531 &mut *ctx,
532 update,
533 state,
534 ).await?;
535 }
536 FromOrchestra::Signal(OverseerSignal::BlockFinalized(hash, _number)) => {
537 if !state.v3_ever_seen {
538 check_v3_on_finalized(&mut *ctx, state, hash).await?;
539 }
540 }
541 FromOrchestra::Signal(OverseerSignal::Conclude) => return Ok(()),
542 FromOrchestra::Communication { msg } => {
543 handle_communication(&mut *ctx, state, msg, metrics).await?;
544 }
545 }
546 }
547 )
548 }
549}
550
/// Data stored per candidate in `PerSchedulingParentState::fallbacks`.
///
/// NOTE(review): the code that creates and consumes this type is outside the visible
/// part of the file; the field notes below are inferred from names and types —
/// confirm.
#[derive(Clone)]
struct AttestingData {
    /// Receipt of the candidate to attest to.
    candidate: CandidateReceipt,
    /// Hash of the candidate's PoV.
    pov_hash: Hash,
    /// Presumably the validator the PoV is currently being fetched from.
    from_validator: ValidatorIndex,
    /// Presumably further validators that backed the candidate.
    backing: Vec<ValidatorIndex>,
}
567
/// Context handed to the statement table; implements `TableContextTrait`.
#[derive(Default, Debug)]
struct TableContext {
    /// The local validator, or `None` when this node is not a validator.
    validator: Option<Validator>,
    /// Validator group per core, for the cores present in the claim queue.
    groups: HashMap<CoreIndex, Vec<ValidatorIndex>>,
    /// All validators of the session.
    validators: Vec<ValidatorId>,
    /// Indices of the validators reported as disabled by the runtime.
    disabled_validators: Vec<ValidatorIndex>,
}
575
576impl TableContext {
577 pub fn validator_is_disabled(&self, validator_idx: &ValidatorIndex) -> bool {
579 self.disabled_validators
580 .iter()
581 .any(|disabled_val_idx| *disabled_val_idx == *validator_idx)
582 }
583
584 pub fn local_validator_is_disabled(&self) -> Option<bool> {
586 self.validator.as_ref().map(|v| v.disabled())
587 }
588}
589
590impl TableContextTrait for TableContext {
591 type AuthorityId = ValidatorIndex;
592 type Digest = CandidateHash;
593 type GroupId = CoreIndex;
594 type Signature = ValidatorSignature;
595 type Candidate = CommittedCandidateReceipt;
596
597 fn candidate_digest(candidate: &CommittedCandidateReceipt) -> CandidateHash {
598 candidate.hash()
599 }
600
601 fn is_member_of(&self, authority: &ValidatorIndex, core: &CoreIndex) -> bool {
602 self.groups.get(core).map_or(false, |g| g.iter().any(|a| a == authority))
603 }
604
605 fn get_group_size(&self, group: &CoreIndex) -> Option<usize> {
606 self.groups.get(group).map(|g| g.len())
607 }
608}
609
610fn primitive_statement_to_table(s: &SignedFullStatementWithPVD) -> TableSignedStatement {
613 let statement = match s.payload() {
614 StatementWithPVD::Seconded(c, _) => TableStatement::Seconded(c.clone()),
615 StatementWithPVD::Valid(h) => TableStatement::Valid(*h),
616 };
617
618 TableSignedStatement {
619 statement,
620 signature: s.signature().clone(),
621 sender: s.validator_index(),
622 }
623}
624
625fn table_attested_to_backed(
626 attested: TableAttestedCandidate<
627 CoreIndex,
628 CommittedCandidateReceipt,
629 ValidatorIndex,
630 ValidatorSignature,
631 >,
632 table_context: &TableContext,
633) -> Option<BackedCandidate> {
634 let TableAttestedCandidate { candidate, validity_votes, group_id: core_index } = attested;
635
636 let (ids, validity_votes): (Vec<_>, Vec<ValidityAttestation>) =
637 validity_votes.into_iter().map(|(id, vote)| (id, vote.into())).unzip();
638
639 let group = table_context.groups.get(&core_index)?;
640
641 let mut validator_indices = BitVec::with_capacity(group.len());
642
643 validator_indices.resize(group.len(), false);
644
645 let mut vote_positions = Vec::with_capacity(validity_votes.len());
649 for (orig_idx, id) in ids.iter().enumerate() {
650 if let Some(position) = group.iter().position(|x| x == id) {
651 validator_indices.set(position, true);
652 vote_positions.push((orig_idx, position));
653 } else {
654 gum::warn!(
655 target: LOG_TARGET,
656 "Logic error: Validity vote from table does not correspond to group",
657 );
658
659 return None;
660 }
661 }
662 vote_positions.sort_by_key(|(_orig, pos_in_group)| *pos_in_group);
663
664 Some(BackedCandidate::new(
665 candidate,
666 vote_positions
667 .into_iter()
668 .map(|(pos_in_votes, _pos_in_group)| validity_votes[pos_in_votes].clone())
669 .collect(),
670 validator_indices,
671 core_index,
672 ))
673}
674
675async fn store_available_data(
676 sender: &mut impl overseer::CandidateBackingSenderTrait,
677 n_validators: u32,
678 candidate_hash: CandidateHash,
679 available_data: AvailableData,
680 expected_erasure_root: Hash,
681 core_index: CoreIndex,
682 node_features: NodeFeatures,
683) -> Result<(), Error> {
684 let (tx, rx) = oneshot::channel();
685 sender
690 .send_message(AvailabilityStoreMessage::StoreAvailableData {
691 candidate_hash,
692 n_validators,
693 available_data,
694 expected_erasure_root,
695 core_index,
696 node_features,
697 tx,
698 })
699 .await;
700
701 rx.await
702 .map_err(Error::StoreAvailableDataChannel)?
703 .map_err(Error::StoreAvailableData)
704}
705
706async fn make_pov_available(
714 sender: &mut impl overseer::CandidateBackingSenderTrait,
715 n_validators: usize,
716 pov: Arc<PoV>,
717 candidate_hash: CandidateHash,
718 validation_data: PersistedValidationData,
719 expected_erasure_root: Hash,
720 core_index: CoreIndex,
721 node_features: NodeFeatures,
722) -> Result<(), Error> {
723 store_available_data(
724 sender,
725 n_validators as u32,
726 candidate_hash,
727 AvailableData { pov, validation_data },
728 expected_erasure_root,
729 core_index,
730 node_features,
731 )
732 .await
733}
734
735async fn request_pov(
736 sender: &mut impl overseer::CandidateBackingSenderTrait,
737 relay_parent: Hash,
738 from_validator: ValidatorIndex,
739 para_id: ParaId,
740 candidate_hash: CandidateHash,
741 pov_hash: Hash,
742) -> Result<Arc<PoV>, Error> {
743 let (tx, rx) = oneshot::channel();
744 sender
745 .send_message(AvailabilityDistributionMessage::FetchPoV {
746 relay_parent,
747 from_validator,
748 para_id,
749 candidate_hash,
750 pov_hash,
751 tx,
752 })
753 .await;
754
755 let pov = rx.await.map_err(|_| Error::FetchPoV)?;
756 Ok(Arc::new(pov))
757}
758
759async fn request_candidate_validation(
760 sender: &mut impl overseer::CandidateBackingSenderTrait,
761 validation_data: PersistedValidationData,
762 validation_code: ValidationCode,
763 candidate_receipt: CandidateReceipt,
764 pov: Arc<PoV>,
765 executor_params: ExecutorParams,
766) -> Result<ValidationResult, Error> {
767 let (tx, rx) = oneshot::channel();
768 let is_system = candidate_receipt.descriptor.para_id().is_system();
769 let scheduling_parent = candidate_receipt.descriptor.scheduling_parent();
771
772 sender
773 .send_message(CandidateValidationMessage::ValidateFromExhaustive {
774 validation_data,
775 validation_code,
776 candidate_receipt,
777 pov,
778 executor_params,
779 exec_kind: if is_system {
780 PvfExecKind::BackingSystemParas(scheduling_parent)
781 } else {
782 PvfExecKind::Backing(scheduling_parent)
783 },
784 response_sender: tx,
785 })
786 .await;
787
788 match rx.await {
789 Ok(Ok(validation_result)) => Ok(validation_result),
790 Ok(Err(err)) => Err(Error::ValidationFailed(err)),
791 Err(err) => Err(Error::ValidateFromExhaustive(err)),
792 }
793}
794
/// What a successful background validation produces.
struct BackgroundValidationOutputs {
    /// Receipt of the validated candidate.
    candidate: CandidateReceipt,
    /// Commitments returned by validation.
    commitments: CandidateCommitments,
    /// Persisted validation data returned by validation.
    persisted_validation_data: PersistedValidationData,
}

/// Outcome of a background validation: the outputs on success, or the receipt of the
/// candidate that was found invalid.
type BackgroundValidationResult = Result<BackgroundValidationOutputs, CandidateReceipt>;
802
/// Everything `validate_and_make_available` needs to validate one candidate.
struct BackgroundValidationParams<S: overseer::CandidateBackingSenderTrait, F> {
    /// Sender used for all subsystem messages issued during validation.
    sender: S,
    /// Channel on which the resulting command is reported back.
    tx_command: mpsc::Sender<(Hash, ValidatedCandidateCommand)>,
    /// Receipt of the candidate to validate.
    candidate: CandidateReceipt,
    /// Scheduling parent: used for the validation-code lookup, the PoV request and
    /// as the key of the reported command.
    scheduling_parent: Hash,
    /// Node features forwarded to the availability store.
    node_features: NodeFeatures,
    /// Executor parameters forwarded to candidate validation.
    executor_params: Arc<ExecutorParams>,
    /// Persisted validation data forwarded to candidate validation.
    persisted_validation_data: PersistedValidationData,
    /// The PoV, or instructions for fetching it.
    pov: PoVData,
    /// Number of validators, forwarded to the availability store.
    n_validators: usize,
    /// Wraps the validation result into the command to report.
    make_command: F,
}
818
/// Validates a candidate in the background and, if it is valid, stores its available
/// data; the outcome is reported on `params.tx_command`.
///
/// Steps, in order:
/// 1. fetch the validation code by hash from the runtime at the scheduling parent,
/// 2. obtain the PoV (already present, or fetched from a validator),
/// 3. request candidate validation,
/// 4. on a valid result, store the available data,
/// 5. send `make_command(result)` keyed by the scheduling parent.
///
/// If fetching the PoV fails with `Error::FetchPoV`, an `AttestNoPoV` command is sent
/// instead and the function returns `Ok(())`.
///
/// # Errors
///
/// Returns an error if the runtime request, the validation request or the command
/// channel fails, if the validation code is unavailable, or if storing the available
/// data fails for any reason other than an invalid erasure root (that case is
/// reported as an invalid candidate rather than as an error).
async fn validate_and_make_available(
    params: BackgroundValidationParams<
        impl overseer::CandidateBackingSenderTrait,
        impl Fn(BackgroundValidationResult) -> ValidatedCandidateCommand + Sync,
    >,
    core_index: CoreIndex,
) -> Result<(), Error> {
    let BackgroundValidationParams {
        mut sender,
        mut tx_command,
        candidate,
        scheduling_parent,
        node_features,
        executor_params,
        persisted_validation_data,
        pov,
        n_validators,
        make_command,
    } = params;

    // Step 1: look up the validation code referenced by the candidate descriptor.
    let validation_code = {
        let validation_code_hash = candidate.descriptor().validation_code_hash();
        let (tx, rx) = oneshot::channel();
        sender
            .send_message(RuntimeApiMessage::Request(
                scheduling_parent,
                RuntimeApiRequest::ValidationCodeByHash(validation_code_hash, tx),
            ))
            .await;

        let code = rx.await.map_err(Error::RuntimeApiUnavailable)?;
        match code {
            Err(e) => return Err(Error::FetchValidationCode(validation_code_hash, e)),
            Ok(None) => return Err(Error::NoValidationCode(validation_code_hash)),
            Ok(Some(c)) => c,
        }
    };

    // Step 2: get hold of the PoV.
    let pov = match pov {
        PoVData::Ready(pov) => pov,
        PoVData::FetchFromValidator { from_validator, candidate_hash, pov_hash } => {
            match request_pov(
                &mut sender,
                scheduling_parent,
                from_validator,
                candidate.descriptor.para_id(),
                candidate_hash,
                pov_hash,
            )
            .await
            {
                // A failed fetch is not an error of this job: report it so the
                // receiver of the command can react, then stop.
                Err(Error::FetchPoV) => {
                    tx_command
                        .send((
                            scheduling_parent,
                            ValidatedCandidateCommand::AttestNoPoV(candidate.hash()),
                        ))
                        .await
                        .map_err(Error::BackgroundValidationMpsc)?;
                    return Ok(());
                },
                Err(err) => return Err(err),
                Ok(pov) => pov,
            }
        },
    };

    // Step 3: run the actual validation.
    let v = {
        request_candidate_validation(
            &mut sender,
            persisted_validation_data,
            validation_code,
            candidate.clone(),
            pov.clone(),
            executor_params.as_ref().clone(),
        )
        .await?
    };

    let res = match v {
        ValidationResult::Valid(commitments, validation_data) => {
            gum::debug!(
                target: LOG_TARGET,
                candidate_hash = ?candidate.hash(),
                "Validation successful",
            );

            // Step 4: storing the data also checks the erasure root announced in the
            // candidate descriptor.
            let erasure_valid = make_pov_available(
                &mut sender,
                n_validators,
                pov.clone(),
                candidate.hash(),
                validation_data.clone(),
                candidate.descriptor.erasure_root(),
                core_index,
                node_features,
            )
            .await;

            match erasure_valid {
                Ok(()) => Ok(BackgroundValidationOutputs {
                    candidate,
                    commitments,
                    persisted_validation_data: validation_data,
                }),
                // A mismatching erasure root makes the candidate invalid.
                Err(Error::StoreAvailableData(StoreAvailableDataError::InvalidErasureRoot)) => {
                    gum::debug!(
                        target: LOG_TARGET,
                        candidate_hash = ?candidate.hash(),
                        actual_commitments = ?commitments,
                        "Erasure root doesn't match the announced by the candidate receipt",
                    );
                    Err(candidate)
                },
                // Any other storage failure aborts the job with an error.
                Err(e) => return Err(e),
            }
        },
        ValidationResult::Invalid(InvalidCandidate::CommitmentsHashMismatch) => {
            gum::warn!(
                target: LOG_TARGET,
                candidate_hash = ?candidate.hash(),
                "Validation yielded different commitments",
            );
            Err(candidate)
        },
        ValidationResult::Invalid(reason) => {
            gum::warn!(
                target: LOG_TARGET,
                candidate_hash = ?candidate.hash(),
                reason = ?reason,
                "Validation yielded an invalid candidate",
            );
            Err(candidate)
        },
    };

    // Step 5: report the outcome.
    tx_command
        .send((scheduling_parent, make_command(res)))
        .await
        .map_err(Into::into)
}
962
963#[overseer::contextbounds(CandidateBacking, prefix = self::overseer)]
964async fn handle_communication<Context>(
965 ctx: &mut Context,
966 state: &mut State,
967 message: CandidateBackingMessage,
968 metrics: &Metrics,
969) -> Result<(), Error> {
970 match message {
971 CandidateBackingMessage::Second { scheduling_parent: _, candidate, pvd, pov } => {
972 handle_second_message(ctx, state, candidate, pvd, pov, metrics).await?;
973 },
974 CandidateBackingMessage::Statement { scheduling_parent, statement } => {
975 handle_statement_message(ctx, state, scheduling_parent, statement, metrics).await?;
976 },
977 CandidateBackingMessage::GetBackableCandidates { candidates, sender } => {
978 handle_get_backable_candidates_message(state, candidates, sender, metrics)?
979 },
980 CandidateBackingMessage::CanSecond(request, tx) => {
981 handle_can_second_request(ctx, state, request, tx).await
982 },
983 }
984
985 Ok(())
986}
987
988#[overseer::contextbounds(CandidateBacking, prefix = self::overseer)]
989async fn handle_active_leaves_update<Context>(
990 ctx: &mut Context,
991 update: ActiveLeavesUpdate,
992 state: &mut State,
993) -> Result<(), Error> {
994 let res = if let Some(leaf) = update.activated {
997 let leaf_hash = leaf.hash;
998 Some((leaf, state.implicit_view.activate_leaf(ctx.sender(), leaf_hash).await.map(|_| ())))
999 } else {
1000 None
1001 };
1002
1003 for deactivated in update.deactivated {
1004 state.implicit_view.deactivate_leaf(deactivated);
1005 }
1006
1007 {
1011 let remaining: HashSet<_> = state.implicit_view.all_allowed_relay_parents().collect();
1012
1013 state.per_scheduling_parent.retain(|r, _| remaining.contains(&r));
1014 }
1015
1016 state
1019 .per_candidate
1020 .retain(|_, pc| state.per_scheduling_parent.contains_key(&pc.scheduling_parent));
1021
1022 let fresh_relay_parents = match res {
1025 None => return Ok(()),
1026 Some((leaf, Ok(_))) => {
1027 let fresh_relay_parents =
1028 state.implicit_view.known_allowed_relay_parents_under(&leaf.hash);
1029
1030 let fresh_relay_parent = match fresh_relay_parents {
1031 Some(f) => f.to_vec(),
1032 None => {
1033 gum::warn!(
1034 target: LOG_TARGET,
1035 leaf_hash = ?leaf.hash,
1036 "Implicit view gave no relay-parents"
1037 );
1038
1039 vec![leaf.hash]
1040 },
1041 };
1042 fresh_relay_parent
1043 },
1044 Some((leaf, Err(e))) => {
1045 gum::debug!(
1046 target: LOG_TARGET,
1047 leaf_hash = ?leaf.hash,
1048 err = ?e,
1049 "Failed to load implicit view for leaf."
1050 );
1051
1052 return Ok(());
1053 },
1054 };
1055
1056 for maybe_new in fresh_relay_parents {
1058 if state.per_scheduling_parent.contains_key(&maybe_new) {
1059 continue;
1060 }
1061
1062 let per = construct_per_scheduling_parent_state(
1065 ctx,
1066 maybe_new,
1067 &state.keystore,
1068 &mut state.per_session_cache,
1069 )
1070 .await?;
1071
1072 if let Some(per) = per {
1073 state.per_scheduling_parent.insert(maybe_new, per);
1074 }
1075 }
1076
1077 Ok(())
1078}
1079
1080#[overseer::contextbounds(CandidateBacking, prefix = self::overseer)]
1083async fn check_v3_on_finalized<Context>(
1084 ctx: &mut Context,
1085 state: &mut State,
1086 finalized_hash: Hash,
1087) -> Result<(), Error> {
1088 let session_index = request_session_index_for_child(finalized_hash, ctx.sender())
1089 .await
1090 .await
1091 .map_err(runtime::Error::from)?
1092 .map_err(runtime::Error::from)?;
1093
1094 let node_features = state
1095 .per_session_cache
1096 .node_features(session_index, finalized_hash, ctx.sender())
1097 .await
1098 .map_err(runtime::Error::from)?;
1099
1100 if FeatureIndex::CandidateReceiptV3.is_set(&node_features) {
1101 gum::info!(
1102 target: LOG_TARGET,
1103 ?session_index,
1104 "CandidateReceiptV3 node feature detected in finalized block, \
1105 enabling V3 candidate support",
1106 );
1107 state.v3_ever_seen = true;
1108 }
1109
1110 Ok(())
1111}
1112
/// Unwraps the `Ok` value of a runtime API result inside a function that returns
/// `Result<Option<_>, _>`.
///
/// On `Err`, the error is converted into a `runtime::Error` and handed to
/// `error::log_error`; if that call returns an error it is propagated with `?`,
/// otherwise the enclosing function returns `Ok(None)`.
macro_rules! try_runtime_api {
    ($x: expr) => {
        match $x {
            Ok(x) => x,
            Err(err) => {
                error::log_error(Err(Into::<runtime::Error>::into(err).into()))?;

                return Ok(None);
            },
        }
    };
}
1129
1130fn core_index_from_statement(
1131 sp_state: &PerSchedulingParentState,
1132 statement: &SignedFullStatementWithPVD,
1133) -> Option<CoreIndex> {
1134 let compact_statement = statement.as_unchecked();
1135 let candidate_hash = CandidateHash(*compact_statement.unchecked_payload().candidate_hash());
1136
1137 gum::trace!(
1138 target: LOG_TARGET,
1139 group_rotation_info = ?sp_state.group_rotation_info,
1140 ?statement,
1141 validator_to_group = ?sp_state.validator_to_group,
1142 n_cores = sp_state.n_cores,
1143 ?candidate_hash,
1144 "Extracting core index from statement"
1145 );
1146
1147 let statement_validator_index = statement.validator_index();
1148 let Some(Some(group_index)) = sp_state.validator_to_group.get(statement_validator_index) else {
1149 gum::debug!(
1150 target: LOG_TARGET,
1151 group_rotation_info = ?sp_state.group_rotation_info,
1152 ?statement,
1153 validator_to_group = ?sp_state.validator_to_group,
1154 n_cores = sp_state.n_cores,
1155 ?candidate_hash,
1156 "Invalid validator index: {:?}",
1157 statement_validator_index
1158 );
1159 return None;
1160 };
1161
1162 let core_index =
1164 sp_state.group_rotation_info.core_for_group(*group_index, sp_state.n_cores as _);
1165
1166 if core_index.0 > sp_state.n_cores {
1167 gum::warn!(target: LOG_TARGET, ?candidate_hash, ?core_index, n_cores = sp_state.n_cores, "Invalid CoreIndex");
1168 return None;
1169 }
1170
1171 if let StatementWithPVD::Seconded(candidate, _pvd) = statement.payload() {
1172 let candidate_para_id = candidate.descriptor.para_id();
1173 let mut assigned_paras = sp_state.claim_queue.iter_claims_for_core(&core_index);
1174
1175 if !assigned_paras.any(|id| id == &candidate_para_id) {
1176 gum::debug!(
1177 target: LOG_TARGET,
1178 ?candidate_hash,
1179 ?core_index,
1180 assigned_paras = ?sp_state.claim_queue.iter_claims_for_core(&core_index).collect::<Vec<_>>(),
1181 ?candidate_para_id,
1182 "Invalid CoreIndex, core is not assigned to this para_id"
1183 );
1184 return None;
1185 }
1186 Some(core_index)
1187 } else {
1188 Some(core_index)
1189 }
1190}
1191
/// Builds the `PerSchedulingParentState` for `relay_parent`.
///
/// Session index, validator groups, claim queue and disabled validators are
/// requested from the runtime at `relay_parent`; validators, node features and the
/// minimum backing votes come from `per_session_cache`.
///
/// Returns `Ok(None)` when a runtime API call fails in a way that `error::log_error`
/// does not escalate (see `try_runtime_api!`), or when the local `Validator` cannot
/// be constructed for a reason other than not being a validator.
///
/// # Errors
///
/// Returns `Error::JoinMultiple` if one of the joined runtime requests is dropped,
/// and whatever `error::log_error` escalates from a failed runtime API call.
#[overseer::contextbounds(CandidateBacking, prefix = self::overseer)]
async fn construct_per_scheduling_parent_state<Context>(
    ctx: &mut Context,
    relay_parent: Hash,
    keystore: &KeystorePtr,
    per_session_cache: &mut PerSessionCache,
) -> Result<Option<PerSchedulingParentState>, Error> {
    let parent = relay_parent;

    // The four requests are sent one after another and then awaited together.
    let (session_index, groups, claim_queue, disabled_validators) = futures::try_join!(
        request_session_index_for_child(parent, ctx.sender()).await,
        request_validator_groups(parent, ctx.sender()).await,
        request_claim_queue(parent, ctx.sender()).await,
        request_disabled_validators(parent, ctx.sender()).await,
    )
    .map_err(Error::JoinMultiple)?;

    // The session index is unwrapped first because the cached lookups are keyed by it.
    let session_index = try_runtime_api!(session_index);

    let validators = per_session_cache.validators(session_index, parent, ctx.sender()).await;
    let validators = try_runtime_api!(validators);

    let node_features = per_session_cache.node_features(session_index, parent, ctx.sender()).await;
    let node_features = try_runtime_api!(node_features);

    gum::debug!(target: LOG_TARGET, ?parent, "New state");

    let (validator_groups, group_rotation_info) = try_runtime_api!(groups);

    let minimum_backing_votes = per_session_cache
        .minimum_backing_votes(session_index, parent, ctx.sender())
        .await;
    let minimum_backing_votes = try_runtime_api!(minimum_backing_votes);
    let claim_queue = try_runtime_api!(claim_queue);
    let disabled_validators = try_runtime_api!(disabled_validators);

    let signing_context = SigningContext { parent_hash: parent, session_index };
    let validator = match Validator::construct(
        &validators,
        &disabled_validators,
        signing_context.clone(),
        keystore.clone(),
    ) {
        Ok(v) => Some(v),
        // Not being a validator is fine: the state is still built, without a local
        // validator.
        Err(util::Error::NotAValidator) => None,
        Err(e) => {
            gum::warn!(
                target: LOG_TARGET,
                err = ?e,
                "Cannot participate in candidate backing",
            );

            return Ok(None);
        },
    };

    // The number of validator groups is used as the number of cores.
    let n_cores = validator_groups.len();

    let mut groups = HashMap::<CoreIndex, Vec<ValidatorIndex>>::new();
    let mut assigned_core = None;

    for idx in 0..n_cores {
        let core_index = CoreIndex(idx as _);

        // Cores without an entry in the claim queue get no group.
        if !claim_queue.contains_key(&core_index) {
            continue;
        }

        let group_index = group_rotation_info.group_for_core(core_index, n_cores);
        if let Some(g) = validator_groups.get(group_index.0 as usize) {
            // Remember the core whose group contains the local validator.
            if validator.as_ref().map_or(false, |v| g.contains(&v.index())) {
                assigned_core = Some(core_index);
            }
            groups.insert(core_index, g.clone());
        }
    }
    gum::debug!(target: LOG_TARGET, ?groups, "TableContext");

    let validator_to_group =
        per_session_cache.validator_to_group(session_index, &validators, &validator_groups);

    let table_context =
        TableContext { validator, groups, validators: validators.to_vec(), disabled_validators };

    Ok(Some(PerSchedulingParentState {
        parent,
        node_features,
        assigned_core,
        backed: HashSet::new(),
        table: Table::new(),
        table_context,
        issued_statements: HashSet::new(),
        awaiting_validation: HashSet::new(),
        fallbacks: HashMap::new(),
        minimum_backing_votes,
        n_cores: validator_groups.len() as u32,
        claim_queue: ClaimQueueSnapshot::from(claim_queue),
        validator_to_group,
        session_index,
        group_rotation_info,
    }))
}
1295
1296enum SecondingAllowed {
1297 No,
1298 Yes(Vec<Hash>),
1300}
1301
1302#[overseer::contextbounds(CandidateBacking, prefix = self::overseer)]
1305async fn seconding_sanity_check<Context>(
1306 ctx: &mut Context,
1307 implicit_view: &ImplicitView,
1308 hypothetical_candidate: HypotheticalCandidate,
1309) -> SecondingAllowed {
1310 let mut leaves_for_seconding = Vec::new();
1311 let mut responses = FuturesOrdered::<BoxFuture<'_, Result<_, oneshot::Canceled>>>::new();
1312
1313 let candidate_scheduling_parent = hypothetical_candidate.scheduling_parent();
1316 let candidate_hash = hypothetical_candidate.candidate_hash();
1317
1318 for head in implicit_view.leaves() {
1319 let allowed_parents_for_para = implicit_view.known_allowed_relay_parents_under(head);
1321 if !allowed_parents_for_para
1322 .unwrap_or_default()
1323 .contains(&candidate_scheduling_parent)
1324 {
1325 continue;
1326 }
1327
1328 let (tx, rx) = oneshot::channel();
1329 ctx.send_message(ProspectiveParachainsMessage::GetHypotheticalMembership(
1330 HypotheticalMembershipRequest {
1331 candidates: vec![hypothetical_candidate.clone()],
1332 fragment_chain_relay_parent: Some(*head),
1333 },
1334 tx,
1335 ))
1336 .await;
1337 let response = rx.map_ok(move |candidate_memberships| {
1338 let is_member_or_potential = candidate_memberships
1339 .into_iter()
1340 .find_map(|(candidate, leaves)| {
1341 (candidate.candidate_hash() == candidate_hash).then_some(leaves)
1342 })
1343 .and_then(|leaves| leaves.into_iter().find(|leaf| leaf == head))
1344 .is_some();
1345
1346 (is_member_or_potential, head)
1347 });
1348 responses.push_back(response.boxed());
1349 }
1350
1351 if responses.is_empty() {
1352 return SecondingAllowed::No;
1353 }
1354
1355 while let Some(response) = responses.next().await {
1356 match response {
1357 Err(oneshot::Canceled) => {
1358 gum::warn!(
1359 target: LOG_TARGET,
1360 "Failed to reach prospective parachains subsystem for hypothetical membership",
1361 );
1362
1363 return SecondingAllowed::No;
1364 },
1365 Ok((is_member_or_potential, head)) => match is_member_or_potential {
1366 false => {
1367 gum::debug!(
1368 target: LOG_TARGET,
1369 ?candidate_hash,
1370 leaf_hash = ?head,
1371 "Refusing to second candidate at leaf. Is not a potential member.",
1372 );
1373 },
1374 true => {
1375 leaves_for_seconding.push(*head);
1376 },
1377 },
1378 }
1379 }
1380
1381 if leaves_for_seconding.is_empty() {
1382 SecondingAllowed::No
1383 } else {
1384 SecondingAllowed::Yes(leaves_for_seconding)
1385 }
1386}
1387
1388#[overseer::contextbounds(CandidateBacking, prefix = self::overseer)]
1390async fn handle_can_second_request<Context>(
1391 ctx: &mut Context,
1392 state: &State,
1393 request: CanSecondRequest,
1394 tx: oneshot::Sender<bool>,
1395) {
1396 let scheduling_parent = request.candidate_scheduling_parent;
1397 let response = if state.per_scheduling_parent.get(&scheduling_parent).is_some() {
1398 let hypothetical_candidate = HypotheticalCandidate::Incomplete {
1399 candidate_hash: request.candidate_hash,
1400 candidate_para: request.candidate_para_id,
1401 parent_head_data_hash: request.parent_head_data_hash,
1402 candidate_scheduling_parent: scheduling_parent,
1403 };
1404
1405 let result =
1406 seconding_sanity_check(ctx, &state.implicit_view, hypothetical_candidate).await;
1407
1408 match result {
1409 SecondingAllowed::No => false,
1410 SecondingAllowed::Yes(leaves) => !leaves.is_empty(),
1411 }
1412 } else {
1413 false
1415 };
1416
1417 let _ = tx.send(response);
1418}
1419
1420async fn get_executor_params(
1432 per_session_cache: &mut PerSessionCache,
1433 descriptor: &CandidateDescriptorV2,
1434 sp_state: &PerSchedulingParentState,
1435 sender: &mut impl overseer::SubsystemSender<RuntimeApiMessage>,
1436) -> Result<Arc<ExecutorParams>, Error> {
1437 let session = descriptor.session_index().unwrap_or(sp_state.session_index);
1438 per_session_cache
1439 .executor_params(session, sp_state.parent, sender)
1440 .await
1441 .map_err(|e| Error::UtilError(UtilError::RuntimeApi(e)))
1442}
1443
1444#[overseer::contextbounds(CandidateBacking, prefix = self::overseer)]
1445async fn handle_validated_candidate_command<Context>(
1446 ctx: &mut Context,
1447 state: &mut State,
1448 scheduling_parent: Hash,
1449 command: ValidatedCandidateCommand,
1450 metrics: &Metrics,
1451) -> Result<(), Error> {
1452 match state.per_scheduling_parent.get_mut(&scheduling_parent) {
1453 Some(sp_state) => {
1454 let candidate_hash = command.candidate_hash();
1455 sp_state.awaiting_validation.remove(&candidate_hash);
1456
1457 match command {
1458 ValidatedCandidateCommand::Second(res) => match res {
1459 Ok(outputs) => {
1460 let BackgroundValidationOutputs {
1461 candidate,
1462 commitments,
1463 persisted_validation_data,
1464 } = outputs;
1465
1466 if sp_state.issued_statements.contains(&candidate_hash) {
1467 return Ok(());
1468 }
1469
1470 let receipt = CommittedCandidateReceipt {
1471 descriptor: candidate.descriptor.clone(),
1472 commitments,
1473 };
1474
1475 let hypothetical_candidate = HypotheticalCandidate::Complete {
1476 candidate_hash,
1477 receipt: Arc::new(receipt.clone()),
1478 persisted_validation_data: persisted_validation_data.clone(),
1479 };
1480 if let SecondingAllowed::No = seconding_sanity_check(
1484 ctx,
1485 &state.implicit_view,
1486 hypothetical_candidate,
1487 )
1488 .await
1489 {
1490 return Ok(());
1491 };
1492
1493 let statement =
1494 StatementWithPVD::Seconded(receipt, persisted_validation_data);
1495
1496 let res = sign_import_and_distribute_statement(
1500 ctx,
1501 sp_state,
1502 &mut state.per_candidate,
1503 statement,
1504 state.keystore.clone(),
1505 metrics,
1506 )
1507 .await;
1508
1509 if let Err(Error::RejectedByProspectiveParachains) = res {
1510 let candidate_hash = candidate.hash();
1511 gum::debug!(
1512 target: LOG_TARGET,
1513 scheduling_parent = ?candidate.descriptor().scheduling_parent(),
1514 ?candidate_hash,
1515 "Attempted to second candidate but was rejected by prospective parachains",
1516 );
1517
1518 ctx.send_message(CollatorProtocolMessage::Invalid(
1519 candidate.descriptor().scheduling_parent(),
1520 candidate,
1521 ))
1522 .await;
1523
1524 return Ok(());
1525 }
1526
1527 if let Some(stmt) = res? {
1528 match state.per_candidate.get_mut(&candidate_hash) {
1529 None => {
1530 gum::warn!(
1531 target: LOG_TARGET,
1532 ?candidate_hash,
1533 "Missing `per_candidate` for seconded candidate.",
1534 );
1535 },
1536 Some(p) => p.seconded_locally = true,
1537 }
1538
1539 sp_state.issued_statements.insert(candidate_hash);
1540
1541 metrics.on_candidate_seconded();
1542 ctx.send_message(CollatorProtocolMessage::Seconded(
1543 sp_state.parent,
1544 StatementWithPVD::drop_pvd_from_signed(stmt),
1545 ))
1546 .await;
1547 }
1548 },
1549 Err(candidate) => {
1550 ctx.send_message(CollatorProtocolMessage::Invalid(
1551 sp_state.parent,
1552 candidate,
1553 ))
1554 .await;
1555 },
1556 },
1557 ValidatedCandidateCommand::Attest(res) => {
1558 sp_state.fallbacks.remove(&candidate_hash);
1560 if !sp_state.issued_statements.contains(&candidate_hash) {
1562 if res.is_ok() {
1563 let statement = StatementWithPVD::Valid(candidate_hash);
1564
1565 sign_import_and_distribute_statement(
1566 ctx,
1567 sp_state,
1568 &mut state.per_candidate,
1569 statement,
1570 state.keystore.clone(),
1571 metrics,
1572 )
1573 .await?;
1574 }
1575 sp_state.issued_statements.insert(candidate_hash);
1576 }
1577 },
1578 ValidatedCandidateCommand::AttestNoPoV(candidate_hash) => {
1579 if let Some(attesting) = sp_state.fallbacks.get_mut(&candidate_hash) {
1580 if let Some(index) = attesting.backing.pop() {
1581 attesting.from_validator = index;
1582 let attesting = attesting.clone();
1583
1584 if let Some(pvd) = state
1590 .per_candidate
1591 .get(&candidate_hash)
1592 .map(|pc| pc.persisted_validation_data.clone())
1593 {
1594 let executor_params = get_executor_params(
1595 &mut state.per_session_cache,
1596 attesting.candidate.descriptor(),
1597 sp_state,
1598 ctx.sender(),
1599 )
1600 .await?;
1601
1602 kick_off_validation_work(
1603 ctx,
1604 sp_state,
1605 pvd,
1606 &state.background_validation_tx,
1607 attesting,
1608 executor_params,
1609 )
1610 .await?;
1611 }
1612 }
1613 } else {
1614 gum::warn!(
1615 target: LOG_TARGET,
1616 "AttestNoPoV was triggered without fallback being available."
1617 );
1618 debug_assert!(false);
1619 }
1620 },
1621 }
1622 },
1623 None => {
1624 },
1627 }
1628
1629 Ok(())
1630}
1631
1632fn sign_statement(
1633 sp_state: &PerSchedulingParentState,
1634 statement: StatementWithPVD,
1635 keystore: KeystorePtr,
1636 metrics: &Metrics,
1637) -> Option<SignedFullStatementWithPVD> {
1638 let signed = sp_state
1639 .table_context
1640 .validator
1641 .as_ref()?
1642 .sign(keystore, statement)
1643 .ok()
1644 .flatten()?;
1645 metrics.on_statement_signed();
1646 Some(signed)
1647}
1648
1649#[overseer::contextbounds(CandidateBacking, prefix = self::overseer)]
1657async fn import_statement<Context>(
1658 ctx: &mut Context,
1659 sp_state: &mut PerSchedulingParentState,
1660 per_candidate: &mut HashMap<CandidateHash, PerCandidateState>,
1661 statement: &SignedFullStatementWithPVD,
1662) -> Result<Option<TableSummary>, Error> {
1663 let candidate_hash = statement.payload().candidate_hash();
1664
1665 gum::debug!(
1666 target: LOG_TARGET,
1667 statement = ?statement.payload().to_compact(),
1668 validator_index = statement.validator_index().0,
1669 ?candidate_hash,
1670 "Importing statement",
1671 );
1672
1673 if let StatementWithPVD::Seconded(candidate, pvd) = statement.payload() {
1686 if !per_candidate.contains_key(&candidate_hash) {
1687 let (tx, rx) = oneshot::channel();
1688 ctx.send_message(ProspectiveParachainsMessage::IntroduceSecondedCandidate(
1689 IntroduceSecondedCandidateRequest {
1690 candidate_para: candidate.descriptor.para_id(),
1691 candidate_receipt: candidate.clone(),
1692 persisted_validation_data: pvd.clone(),
1693 },
1694 tx,
1695 ))
1696 .await;
1697
1698 match rx.await {
1699 Err(oneshot::Canceled) => {
1700 gum::warn!(
1701 target: LOG_TARGET,
1702 "Could not reach the Prospective Parachains subsystem."
1703 );
1704
1705 return Err(Error::RejectedByProspectiveParachains);
1706 },
1707 Ok(false) => return Err(Error::RejectedByProspectiveParachains),
1708 Ok(true) => {},
1709 }
1710
1711 per_candidate.insert(
1713 candidate_hash,
1714 PerCandidateState {
1715 persisted_validation_data: pvd.clone(),
1716 seconded_locally: false,
1718 scheduling_parent: candidate.descriptor.scheduling_parent(),
1720 },
1721 );
1722 }
1723 }
1724
1725 let stmt = primitive_statement_to_table(statement);
1726
1727 let core = core_index_from_statement(sp_state, statement).ok_or(Error::CoreIndexUnavailable)?;
1728
1729 Ok(sp_state.table.import_statement(&sp_state.table_context, core, stmt))
1730}
1731
1732#[overseer::contextbounds(CandidateBacking, prefix = self::overseer)]
1735async fn post_import_statement_actions<Context>(
1736 ctx: &mut Context,
1737 sp_state: &mut PerSchedulingParentState,
1738 summary: Option<&TableSummary>,
1739) {
1740 if let Some(attested) = summary.as_ref().and_then(|s| {
1741 sp_state.table.attested_candidate(
1742 &s.candidate,
1743 &sp_state.table_context,
1744 sp_state.minimum_backing_votes,
1745 )
1746 }) {
1747 let candidate_hash = attested.candidate.hash();
1748
1749 if sp_state.backed.insert(candidate_hash) {
1751 if let Some(backed) = table_attested_to_backed(attested, &sp_state.table_context) {
1752 let para_id = backed.candidate().descriptor.para_id();
1753 gum::debug!(
1754 target: LOG_TARGET,
1755 candidate_hash = ?candidate_hash,
1756 scheduling_parent = ?sp_state.parent,
1757 %para_id,
1758 "Candidate backed",
1759 );
1760
1761 ctx.send_message(ProspectiveParachainsMessage::CandidateBacked(
1764 para_id,
1765 candidate_hash,
1766 ))
1767 .await;
1768 ctx.send_message(StatementDistributionMessage::Backed(candidate_hash)).await;
1770 } else {
1771 gum::debug!(target: LOG_TARGET, ?candidate_hash, "Cannot get BackedCandidate");
1772 }
1773 } else {
1774 gum::debug!(target: LOG_TARGET, ?candidate_hash, "Candidate already known");
1775 }
1776 } else {
1777 gum::debug!(target: LOG_TARGET, "No attested candidate");
1778 }
1779
1780 issue_new_misbehaviors(ctx, sp_state.parent, &mut sp_state.table);
1781}
1782
1783#[overseer::contextbounds(CandidateBacking, prefix = self::overseer)]
1785fn issue_new_misbehaviors<Context>(
1786 ctx: &mut Context,
1787 relay_parent: Hash,
1788 table: &mut Table<TableContext>,
1789) {
1790 let misbehaviors: Vec<_> = table.drain_misbehaviors().collect();
1792 for (validator_id, report) in misbehaviors {
1793 ctx.send_unbounded_message(ProvisionerMessage::ProvisionableData(
1799 relay_parent,
1800 ProvisionableData::MisbehaviorReport(relay_parent, validator_id, report),
1801 ));
1802 }
1803}
1804
1805#[overseer::contextbounds(CandidateBacking, prefix = self::overseer)]
1807async fn sign_import_and_distribute_statement<Context>(
1808 ctx: &mut Context,
1809 sp_state: &mut PerSchedulingParentState,
1810 per_candidate: &mut HashMap<CandidateHash, PerCandidateState>,
1811 statement: StatementWithPVD,
1812 keystore: KeystorePtr,
1813 metrics: &Metrics,
1814) -> Result<Option<SignedFullStatementWithPVD>, Error> {
1815 if let Some(signed_statement) = sign_statement(&*sp_state, statement, keystore, metrics) {
1816 let summary = import_statement(ctx, sp_state, per_candidate, &signed_statement).await?;
1817
1818 let smsg = StatementDistributionMessage::Share(sp_state.parent, signed_statement.clone());
1821 ctx.send_unbounded_message(smsg);
1822
1823 post_import_statement_actions(ctx, sp_state, summary.as_ref()).await;
1824
1825 Ok(Some(signed_statement))
1826 } else {
1827 Ok(None)
1828 }
1829}
1830
1831#[overseer::contextbounds(CandidateBacking, prefix = self::overseer)]
1832async fn background_validate_and_make_available<Context>(
1833 ctx: &mut Context,
1834 sp_state: &mut PerSchedulingParentState,
1835 params: BackgroundValidationParams<
1836 impl overseer::CandidateBackingSenderTrait,
1837 impl Fn(BackgroundValidationResult) -> ValidatedCandidateCommand + Send + 'static + Sync,
1838 >,
1839) -> Result<(), Error> {
1840 let candidate_hash = params.candidate.hash();
1841 let Some(core_index) = sp_state.assigned_core else { return Ok(()) };
1842 if sp_state.awaiting_validation.insert(candidate_hash) {
1843 let bg = async move {
1845 if let Err(error) = validate_and_make_available(params, core_index).await {
1846 if let Error::BackgroundValidationMpsc(error) = error {
1847 gum::debug!(
1848 target: LOG_TARGET,
1849 ?candidate_hash,
1850 ?error,
1851 "Mpsc background validation mpsc died during validation- leaf no longer active?"
1852 );
1853 } else {
1854 gum::error!(
1855 target: LOG_TARGET,
1856 ?candidate_hash,
1857 ?error,
1858 "Failed to validate and make available",
1859 );
1860 }
1861 }
1862 };
1863
1864 ctx.spawn("backing-validation", bg.boxed())
1865 .map_err(|_| Error::FailedToSpawnBackgroundTask)?;
1866 }
1867
1868 Ok(())
1869}
1870
1871#[overseer::contextbounds(CandidateBacking, prefix = self::overseer)]
1873async fn kick_off_validation_work<Context>(
1874 ctx: &mut Context,
1875 sp_state: &mut PerSchedulingParentState,
1876 persisted_validation_data: PersistedValidationData,
1877 background_validation_tx: &mpsc::Sender<(Hash, ValidatedCandidateCommand)>,
1878 attesting: AttestingData,
1879 executor_params: Arc<ExecutorParams>,
1880) -> Result<(), Error> {
1881 match sp_state.table_context.local_validator_is_disabled() {
1883 Some(true) => {
1884 gum::info!(target: LOG_TARGET, "We are disabled - don't kick off validation");
1885 return Ok(());
1886 },
1887 Some(false) => {}, None => {
1889 gum::debug!(target: LOG_TARGET, "We are not a validator - don't kick off validation");
1890 return Ok(());
1891 },
1892 }
1893
1894 let candidate_hash = attesting.candidate.hash();
1895 if sp_state.issued_statements.contains(&candidate_hash) {
1896 return Ok(());
1897 }
1898
1899 gum::debug!(
1900 target: LOG_TARGET,
1901 candidate_hash = ?candidate_hash,
1902 candidate_receipt = ?attesting.candidate,
1903 "Kicking off validation",
1904 );
1905
1906 let bg_sender = ctx.sender().clone();
1907 let pov = PoVData::FetchFromValidator {
1908 from_validator: attesting.from_validator,
1909 candidate_hash,
1910 pov_hash: attesting.pov_hash,
1911 };
1912 let scheduling_parent = attesting.candidate.descriptor().scheduling_parent();
1913
1914 background_validate_and_make_available(
1915 ctx,
1916 sp_state,
1917 BackgroundValidationParams {
1918 sender: bg_sender,
1919 tx_command: background_validation_tx.clone(),
1920 candidate: attesting.candidate,
1921 scheduling_parent,
1922 node_features: sp_state.node_features.clone(),
1923 executor_params,
1924 persisted_validation_data,
1925 pov,
1926 n_validators: sp_state.table_context.validators.len(),
1927 make_command: ValidatedCandidateCommand::Attest,
1928 },
1929 )
1930 .await
1931}
1932
1933#[overseer::contextbounds(CandidateBacking, prefix = self::overseer)]
1935async fn maybe_validate_and_import<Context>(
1936 ctx: &mut Context,
1937 state: &mut State,
1938 scheduling_parent: Hash,
1939 statement: SignedFullStatementWithPVD,
1940) -> Result<(), Error> {
1941 let sp_state = match state.per_scheduling_parent.get_mut(&scheduling_parent) {
1942 Some(r) => r,
1943 None => {
1944 gum::trace!(
1945 target: LOG_TARGET,
1946 ?scheduling_parent,
1947 "Received statement for unknown scheduling parent"
1948 );
1949
1950 return Ok(());
1951 },
1952 };
1953
1954 if sp_state.table_context.validator_is_disabled(&statement.validator_index()) {
1956 gum::debug!(
1957 target: LOG_TARGET,
1958 sender_validator_idx = ?statement.validator_index(),
1959 "Not importing statement because the sender is disabled"
1960 );
1961 return Ok(());
1962 }
1963
1964 if let StatementWithPVD::Seconded(receipt, _) = statement.payload() {
1966 if let Err(reason) = receipt.descriptor.check_version_acceptance(state.v3_ever_seen) {
1967 gum::debug!(
1968 target: LOG_TARGET,
1969 ?scheduling_parent,
1970 "Not importing Seconded statement: {}",
1971 reason,
1972 );
1973 return Ok(());
1974 }
1975 }
1976
1977 let res = import_statement(ctx, sp_state, &mut state.per_candidate, &statement).await;
1978
1979 if let Err(Error::RejectedByProspectiveParachains) = res {
1982 gum::debug!(
1983 target: LOG_TARGET,
1984 ?scheduling_parent,
1985 "Statement rejected by prospective parachains."
1986 );
1987
1988 return Ok(());
1989 }
1990
1991 let summary = res?;
1992 post_import_statement_actions(ctx, sp_state, summary.as_ref()).await;
1993
1994 if let Some(summary) = summary {
1995 let candidate_hash = summary.candidate;
2000
2001 if Some(summary.group_id) != sp_state.assigned_core {
2002 return Ok(());
2003 }
2004
2005 let attesting = match statement.payload() {
2006 StatementWithPVD::Seconded(receipt, _) => {
2007 let attesting = AttestingData {
2008 candidate: sp_state
2009 .table
2010 .get_candidate(&candidate_hash)
2011 .ok_or(Error::CandidateNotFound)?
2012 .to_plain(),
2013 pov_hash: receipt.descriptor.pov_hash(),
2014 from_validator: statement.validator_index(),
2015 backing: Vec::new(),
2016 };
2017 sp_state.fallbacks.insert(summary.candidate, attesting.clone());
2018 attesting
2019 },
2020 StatementWithPVD::Valid(candidate_hash) => {
2021 if let Some(attesting) = sp_state.fallbacks.get_mut(candidate_hash) {
2022 let our_index = sp_state.table_context.validator.as_ref().map(|v| v.index());
2023 if our_index == Some(statement.validator_index()) {
2024 return Ok(());
2025 }
2026
2027 if sp_state.awaiting_validation.contains(candidate_hash) {
2028 attesting.backing.push(statement.validator_index());
2030 return Ok(());
2031 } else {
2032 attesting.from_validator = statement.validator_index();
2034 attesting.clone()
2035 }
2036 } else {
2037 return Ok(());
2038 }
2039 },
2040 };
2041
2042 match sp_state.table_context.local_validator_is_disabled() {
2045 Some(true) => return Ok(()),
2046 None => return Ok(()),
2047 Some(false) => {},
2048 }
2049
2050 if let Some(pvd) = state
2053 .per_candidate
2054 .get(&candidate_hash)
2055 .map(|pc| pc.persisted_validation_data.clone())
2056 {
2057 let executor_params = get_executor_params(
2058 &mut state.per_session_cache,
2059 attesting.candidate.descriptor(),
2060 sp_state,
2061 ctx.sender(),
2062 )
2063 .await?;
2064
2065 kick_off_validation_work(
2066 ctx,
2067 sp_state,
2068 pvd,
2069 &state.background_validation_tx,
2070 attesting,
2071 executor_params,
2072 )
2073 .await?;
2074 }
2075 }
2076 Ok(())
2077}
2078
2079#[overseer::contextbounds(CandidateBacking, prefix = self::overseer)]
2081async fn validate_and_second<Context>(
2082 ctx: &mut Context,
2083 sp_state: &mut PerSchedulingParentState,
2084 persisted_validation_data: PersistedValidationData,
2085 candidate: &CandidateReceipt,
2086 pov: Arc<PoV>,
2087 background_validation_tx: &mpsc::Sender<(Hash, ValidatedCandidateCommand)>,
2088 executor_params: Arc<ExecutorParams>,
2089) -> Result<(), Error> {
2090 let candidate_hash = candidate.hash();
2091
2092 gum::debug!(
2093 target: LOG_TARGET,
2094 candidate_hash = ?candidate_hash,
2095 candidate_receipt = ?candidate,
2096 "Validate and second candidate",
2097 );
2098
2099 let bg_sender = ctx.sender().clone();
2100 let scheduling_parent = candidate.descriptor.scheduling_parent();
2101 background_validate_and_make_available(
2102 ctx,
2103 sp_state,
2104 BackgroundValidationParams {
2105 sender: bg_sender,
2106 tx_command: background_validation_tx.clone(),
2107 candidate: candidate.clone(),
2108 scheduling_parent,
2109 node_features: sp_state.node_features.clone(),
2110 executor_params,
2111 persisted_validation_data,
2112 pov: PoVData::Ready(pov),
2113 n_validators: sp_state.table_context.validators.len(),
2114 make_command: ValidatedCandidateCommand::Second,
2115 },
2116 )
2117 .await?;
2118
2119 Ok(())
2120}
2121
2122#[overseer::contextbounds(CandidateBacking, prefix = self::overseer)]
2123async fn handle_second_message<Context>(
2124 ctx: &mut Context,
2125 state: &mut State,
2126 candidate: CandidateReceipt,
2127 persisted_validation_data: PersistedValidationData,
2128 pov: PoV,
2129 metrics: &Metrics,
2130) -> Result<(), Error> {
2131 let _timer = metrics.time_process_second();
2132
2133 let candidate_hash = candidate.hash();
2134
2135 if candidate.descriptor().persisted_validation_data_hash() != persisted_validation_data.hash() {
2136 gum::warn!(
2137 target: LOG_TARGET,
2138 ?candidate_hash,
2139 "Candidate backing was asked to second candidate with wrong PVD",
2140 );
2141
2142 return Ok(());
2143 }
2144
2145 if let Err(reason) = candidate.descriptor().check_version_acceptance(state.v3_ever_seen) {
2147 gum::debug!(
2148 target: LOG_TARGET,
2149 ?candidate_hash,
2150 "Not seconding candidate: {}",
2151 reason,
2152 );
2153 ctx.send_message(CollatorProtocolMessage::Invalid(
2154 candidate.descriptor().scheduling_parent(),
2155 candidate,
2156 ))
2157 .await;
2158 return Ok(());
2159 }
2160
2161 let scheduling_parent = candidate.descriptor().scheduling_parent();
2163
2164 let sp_state = match state.per_scheduling_parent.get_mut(&scheduling_parent) {
2166 None => {
2167 gum::trace!(
2168 target: LOG_TARGET,
2169 ?scheduling_parent,
2170 ?candidate_hash,
2171 "Candidate has scheduling_parent outside of our view."
2172 );
2173
2174 return Ok(());
2175 },
2176 Some(r) => r,
2177 };
2178
2179 let assigned_core = sp_state.assigned_core;
2181 let claim_queue = &sp_state.claim_queue;
2182
2183 if sp_state.table_context.local_validator_is_disabled().unwrap_or(false) {
2186 gum::warn!(target: LOG_TARGET, "Local validator is disabled. Don't validate and second");
2187 return Ok(());
2188 }
2189
2190 let assigned_paras = assigned_core.and_then(|core| claim_queue.0.get(&core));
2192
2193 if !matches!(assigned_paras, Some(paras) if paras.contains(&candidate.descriptor().para_id())) {
2195 gum::debug!(
2196 target: LOG_TARGET,
2197 our_assignment_core = ?assigned_core,
2198 our_assignment_paras = ?assigned_paras,
2199 collation = ?candidate.descriptor().para_id(),
2200 "Subsystem asked to second for para outside of our assignment",
2201 );
2202 return Ok(());
2203 }
2204
2205 gum::debug!(
2206 target: LOG_TARGET,
2207 our_assignment_core = ?assigned_core,
2208 our_assignment_paras = ?assigned_paras,
2209 collation = ?candidate.descriptor().para_id(),
2210 "Current assignments vs collation",
2211 );
2212
2213 if !sp_state.issued_statements.contains(&candidate_hash) {
2221 let pov = Arc::new(pov);
2222
2223 let executor_params = get_executor_params(
2224 &mut state.per_session_cache,
2225 candidate.descriptor(),
2226 sp_state,
2227 ctx.sender(),
2228 )
2229 .await?;
2230
2231 validate_and_second(
2232 ctx,
2233 sp_state,
2234 persisted_validation_data,
2235 &candidate,
2236 pov,
2237 &state.background_validation_tx,
2238 executor_params,
2239 )
2240 .await?;
2241 }
2242
2243 Ok(())
2244}
2245
2246#[overseer::contextbounds(CandidateBacking, prefix = self::overseer)]
2247async fn handle_statement_message<Context>(
2248 ctx: &mut Context,
2249 state: &mut State,
2250 scheduling_parent: Hash,
2251 statement: SignedFullStatementWithPVD,
2252 metrics: &Metrics,
2253) -> Result<(), Error> {
2254 let _timer = metrics.time_process_statement();
2255
2256 match maybe_validate_and_import(ctx, state, scheduling_parent, statement).await {
2258 Err(Error::ValidationFailed(_)) => Ok(()),
2259 Err(e) => Err(e),
2260 Ok(()) => Ok(()),
2261 }
2262}
2263
2264fn handle_get_backable_candidates_message(
2265 state: &State,
2266 requested_candidates: HashMap<ParaId, Vec<BackableCandidateRef>>,
2267 tx: oneshot::Sender<HashMap<ParaId, Vec<BackedCandidate>>>,
2268 metrics: &Metrics,
2269) -> Result<(), Error> {
2270 let _timer = metrics.time_get_backed_candidates();
2271
2272 let mut backed = HashMap::with_capacity(requested_candidates.len());
2273
2274 for (para_id, para_candidates) in requested_candidates {
2275 for candidate_ref in para_candidates.iter() {
2276 let candidate_hash = candidate_ref.candidate_hash;
2277 let scheduling_parent = candidate_ref.scheduling_parent;
2278
2279 let sp_state = match state.per_scheduling_parent.get(&scheduling_parent) {
2280 Some(sp_state) => sp_state,
2281 None => {
2282 gum::debug!(
2283 target: LOG_TARGET,
2284 ?scheduling_parent,
2285 ?candidate_hash,
2286 "Requested candidate's scheduling parent is out of view",
2287 );
2288 break;
2289 },
2290 };
2291 let maybe_backed_candidate = sp_state
2292 .table
2293 .attested_candidate(
2294 &candidate_hash,
2295 &sp_state.table_context,
2296 sp_state.minimum_backing_votes,
2297 )
2298 .and_then(|attested| table_attested_to_backed(attested, &sp_state.table_context));
2299
2300 if let Some(backed_candidate) = maybe_backed_candidate {
2301 backed
2302 .entry(para_id)
2303 .or_insert_with(|| Vec::with_capacity(para_candidates.len()))
2304 .push(backed_candidate);
2305 } else {
2306 break;
2307 }
2308 }
2309 }
2310
2311 tx.send(backed).map_err(|data| Error::Send(data))?;
2312 Ok(())
2313}