1#![deny(unused_crate_dependencies)]
67
68use std::{
69 collections::{HashMap, HashSet},
70 sync::Arc,
71};
72
73use bitvec::vec::BitVec;
74use futures::{
75 channel::{mpsc, oneshot},
76 future::BoxFuture,
77 stream::FuturesOrdered,
78 FutureExt, SinkExt, StreamExt, TryFutureExt,
79};
80use schnellru::{ByLength, LruMap};
81
82use error::{Error, FatalResult};
83use polkadot_node_primitives::{
84 AvailableData, InvalidCandidate, PoV, SignedFullStatementWithPVD, StatementWithPVD,
85 ValidationResult,
86};
87use polkadot_node_subsystem::{
88 messages::{
89 AvailabilityDistributionMessage, AvailabilityStoreMessage, CanSecondRequest,
90 CandidateBackingMessage, CandidateValidationMessage, CollatorProtocolMessage,
91 HypotheticalCandidate, HypotheticalMembershipRequest, IntroduceSecondedCandidateRequest,
92 ProspectiveParachainsMessage, ProvisionableData, ProvisionerMessage, PvfExecKind,
93 RuntimeApiMessage, RuntimeApiRequest, StatementDistributionMessage,
94 StoreAvailableDataError,
95 },
96 overseer, ActiveLeavesUpdate, FromOrchestra, OverseerSignal, SpawnedSubsystem, SubsystemError,
97};
98use polkadot_node_subsystem_util::{
99 self as util,
100 backing_implicit_view::{FetchError as ImplicitViewFetchError, View as ImplicitView},
101 executor_params_at_relay_parent, request_from_runtime, request_session_index_for_child,
102 request_validator_groups, request_validators,
103 runtime::{
104 self, fetch_claim_queue, prospective_parachains_mode, request_min_backing_votes,
105 ClaimQueueSnapshot, ProspectiveParachainsMode,
106 },
107 Validator,
108};
109use polkadot_parachain_primitives::primitives::IsSystem;
110use polkadot_primitives::{
111 node_features::FeatureIndex,
112 vstaging::{
113 BackedCandidate, CandidateReceiptV2 as CandidateReceipt,
114 CommittedCandidateReceiptV2 as CommittedCandidateReceipt, CoreState,
115 },
116 CandidateCommitments, CandidateHash, CoreIndex, ExecutorParams, GroupIndex, GroupRotationInfo,
117 Hash, Id as ParaId, IndexedVec, NodeFeatures, PersistedValidationData, SessionIndex,
118 SigningContext, ValidationCode, ValidatorId, ValidatorIndex, ValidatorSignature,
119 ValidityAttestation,
120};
121use polkadot_statement_table::{
122 generic::AttestedCandidate as TableAttestedCandidate,
123 v2::{
124 SignedStatement as TableSignedStatement, Statement as TableStatement,
125 Summary as TableSummary,
126 },
127 Config as TableConfig, Context as TableContextTrait, Table,
128};
129use sp_keystore::KeystorePtr;
130use util::runtime::{get_disabled_validators_with_fallback, request_node_features};
131
132mod error;
133
134mod metrics;
135use self::metrics::Metrics;
136
137#[cfg(test)]
138mod tests;
139
/// Target passed to the `gum` logging macros used in this module.
const LOG_TARGET: &str = "parachain::candidate-backing";
141
/// Where the PoV of a candidate comes from when the candidate is handed to
/// `validate_and_make_available`.
enum PoVData {
	/// The PoV is already at hand and is used as-is.
	Ready(Arc<PoV>),
	/// The PoV still has to be requested (via `request_pov`) before validation can start.
	FetchFromValidator {
		/// Validator the PoV is requested from.
		from_validator: ValidatorIndex,
		/// Hash of the candidate the PoV belongs to.
		candidate_hash: CandidateHash,
		/// PoV hash sent along with the fetch request.
		pov_hash: Hash,
	},
}
153
/// Command produced by a background validation task and sent to the main loop over the
/// background-validation channel (received in `run_iteration`).
enum ValidatedCandidateCommand {
	/// Validation outcome for a candidate.
	/// NOTE(review): presumably a candidate we intend to second — confirm in
	/// `handle_validated_candidate_command`.
	Second(BackgroundValidationResult),
	/// Validation outcome for a candidate.
	/// NOTE(review): presumably a candidate we intend to attest as valid — confirm in
	/// `handle_validated_candidate_command`.
	Attest(BackgroundValidationResult),
	/// The PoV could not be fetched, so no validation took place. Carries the candidate hash.
	AttestNoPoV(CandidateHash),
}
162
163impl std::fmt::Debug for ValidatedCandidateCommand {
164 fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
165 let candidate_hash = self.candidate_hash();
166 match *self {
167 ValidatedCandidateCommand::Second(_) => write!(f, "Second({})", candidate_hash),
168 ValidatedCandidateCommand::Attest(_) => write!(f, "Attest({})", candidate_hash),
169 ValidatedCandidateCommand::AttestNoPoV(_) => write!(f, "Attest({})", candidate_hash),
170 }
171 }
172}
173
174impl ValidatedCandidateCommand {
175 fn candidate_hash(&self) -> CandidateHash {
176 match *self {
177 ValidatedCandidateCommand::Second(Ok(ref outputs)) => outputs.candidate.hash(),
178 ValidatedCandidateCommand::Second(Err(ref candidate)) => candidate.hash(),
179 ValidatedCandidateCommand::Attest(Ok(ref outputs)) => outputs.candidate.hash(),
180 ValidatedCandidateCommand::Attest(Err(ref candidate)) => candidate.hash(),
181 ValidatedCandidateCommand::AttestNoPoV(candidate_hash) => candidate_hash,
182 }
183 }
184}
185
/// The candidate backing subsystem.
pub struct CandidateBackingSubsystem {
	/// Keystore handed to `run` when the subsystem is started.
	keystore: KeystorePtr,
	/// Metrics handle handed to `run` when the subsystem is started.
	metrics: Metrics,
}
191
192impl CandidateBackingSubsystem {
193 pub fn new(keystore: KeystorePtr, metrics: Metrics) -> Self {
195 Self { keystore, metrics }
196 }
197}
198
199#[overseer::subsystem(CandidateBacking, error = SubsystemError, prefix = self::overseer)]
200impl<Context> CandidateBackingSubsystem
201where
202 Context: Send + Sync,
203{
204 fn start(self, ctx: Context) -> SpawnedSubsystem {
205 let future = async move {
206 run(ctx, self.keystore, self.metrics)
207 .await
208 .map_err(|e| SubsystemError::with_origin("candidate-backing", e))
209 }
210 .boxed();
211
212 SpawnedSubsystem { name: "candidate-backing-subsystem", future }
213 }
214}
215
/// State kept for each tracked relay parent, as assembled by
/// `construct_per_relay_parent_state`.
struct PerRelayParentState {
	/// Prospective-parachains mode this state was constructed with.
	prospective_parachains_mode: ProspectiveParachainsMode,
	/// Hash of the relay parent this state belongs to.
	parent: Hash,
	/// Session index obtained through `request_session_index_for_child` for `parent`.
	session_index: SessionIndex,
	/// Core whose validator group contains the local validator, if there is one.
	assigned_core: Option<CoreIndex>,
	/// Starts out empty.
	/// NOTE(review): presumably the candidates already reported as backed — confirm at the use
	/// sites.
	backed: HashSet<CandidateHash>,
	/// Statement table; created with `allow_multiple_seconded` set exactly when prospective
	/// parachains are enabled.
	table: Table<TableContext>,
	/// Context (local validator, per-core groups, validator set, disabled validators) that goes
	/// with `table`.
	table_context: TableContext,
	/// Starts out empty.
	/// NOTE(review): presumably candidates we already issued a statement for — confirm.
	issued_statements: HashSet<CandidateHash>,
	/// Starts out empty.
	/// NOTE(review): presumably candidates whose background validation is in flight — confirm.
	awaiting_validation: HashSet<CandidateHash>,
	/// Starts out empty.
	/// NOTE(review): presumably per-candidate data for retrying attestation — confirm.
	fallbacks: HashMap<CandidateHash, AttestingData>,
	/// Value returned by `request_min_backing_votes` for this relay parent and session.
	minimum_backing_votes: u32,
	/// Whether the `ElasticScalingMVP` node feature bit is set. When `true`,
	/// `table_attested_to_backed` passes the core index on to the backed candidate.
	inject_core_index: bool,
	/// Number of availability cores reported by the runtime at `parent`.
	n_cores: u32,
	/// Claim queue snapshot: the one fetched from the runtime when available, otherwise one
	/// built from the availability core states.
	claim_queue: ClaimQueueSnapshot,
	/// Maps every validator index to the group it belongs to (if any); shared with the
	/// per-session cache in `State`.
	validator_to_group: Arc<IndexedVec<ValidatorIndex, Option<GroupIndex>>>,
	/// Group rotation info returned by the runtime together with the validator groups.
	group_rotation_info: GroupRotationInfo,
}
251
/// State kept per candidate in `State::per_candidate`.
struct PerCandidateState {
	/// Persisted validation data associated with the candidate.
	persisted_validation_data: PersistedValidationData,
	/// NOTE(review): presumably whether the local validator seconded this candidate — confirm
	/// at the use sites.
	seconded_locally: bool,
	/// Relay parent of the candidate; the entry is dropped once this relay parent is no longer
	/// present in `State::per_relay_parent`.
	relay_parent: Hash,
}
257
/// Per-leaf state; its shape depends on whether prospective parachains are enabled at the leaf.
enum ActiveLeafState {
	/// Prospective parachains are disabled. `seconded` holds the paras recorded through
	/// `add_seconded_candidate` for this leaf.
	ProspectiveParachainsDisabled { seconded: HashSet<ParaId> },
	/// Prospective parachains are enabled; the fields mirror those of
	/// `ProspectiveParachainsMode::Enabled`.
	ProspectiveParachainsEnabled { max_candidate_depth: usize, allowed_ancestry_len: usize },
}
264
265impl ActiveLeafState {
266 fn new(mode: ProspectiveParachainsMode) -> Self {
267 match mode {
268 ProspectiveParachainsMode::Disabled =>
269 Self::ProspectiveParachainsDisabled { seconded: HashSet::new() },
270 ProspectiveParachainsMode::Enabled { max_candidate_depth, allowed_ancestry_len } =>
271 Self::ProspectiveParachainsEnabled { max_candidate_depth, allowed_ancestry_len },
272 }
273 }
274
275 fn add_seconded_candidate(&mut self, para_id: ParaId) {
276 if let Self::ProspectiveParachainsDisabled { seconded } = self {
277 seconded.insert(para_id);
278 }
279 }
280}
281
282impl From<&ActiveLeafState> for ProspectiveParachainsMode {
283 fn from(state: &ActiveLeafState) -> Self {
284 match *state {
285 ActiveLeafState::ProspectiveParachainsDisabled { .. } =>
286 ProspectiveParachainsMode::Disabled,
287 ActiveLeafState::ProspectiveParachainsEnabled {
288 max_candidate_depth,
289 allowed_ancestry_len,
290 } => ProspectiveParachainsMode::Enabled { max_candidate_depth, allowed_ancestry_len },
291 }
292 }
293}
294
/// The state of the subsystem, passed to every handler.
struct State {
	/// Implicit view of relay parents. Leaves are activated in it when prospective parachains
	/// are enabled and deactivated on every active-leaves update.
	implicit_view: ImplicitView,
	/// State of each active leaf, keyed by leaf hash.
	per_leaf: HashMap<Hash, ActiveLeafState>,
	/// State of each tracked relay parent. On every active-leaves update only entries that are
	/// active leaves or allowed by the implicit view are retained.
	per_relay_parent: HashMap<Hash, PerRelayParentState>,
	/// State per candidate. Entries are dropped once their relay parent is gone from
	/// `per_relay_parent`.
	per_candidate: HashMap<CandidateHash, PerCandidateState>,
	/// Cache of the validator-index-to-group mapping, keyed by session index and bounded to two
	/// entries (see `State::new`).
	validator_to_group_cache:
		LruMap<SessionIndex, Arc<IndexedVec<ValidatorIndex, Option<GroupIndex>>>>,
	/// Sending half of the channel on which validated-candidate commands arrive; the receiving
	/// half is polled in `run_iteration`.
	background_validation_tx: mpsc::Sender<(Hash, ValidatedCandidateCommand)>,
	/// Keystore used to construct the local `Validator` for each relay parent.
	keystore: KeystorePtr,
}
334
335impl State {
336 fn new(
337 background_validation_tx: mpsc::Sender<(Hash, ValidatedCandidateCommand)>,
338 keystore: KeystorePtr,
339 ) -> Self {
340 State {
341 implicit_view: ImplicitView::default(),
342 per_leaf: HashMap::default(),
343 per_relay_parent: HashMap::default(),
344 per_candidate: HashMap::new(),
345 validator_to_group_cache: LruMap::new(ByLength::new(2)),
346 background_validation_tx,
347 keystore,
348 }
349 }
350}
351
352#[overseer::contextbounds(CandidateBacking, prefix = self::overseer)]
353async fn run<Context>(
354 mut ctx: Context,
355 keystore: KeystorePtr,
356 metrics: Metrics,
357) -> FatalResult<()> {
358 let (background_validation_tx, mut background_validation_rx) = mpsc::channel(16);
359 let mut state = State::new(background_validation_tx, keystore);
360
361 loop {
362 let res =
363 run_iteration(&mut ctx, &mut state, &metrics, &mut background_validation_rx).await;
364
365 match res {
366 Ok(()) => break,
367 Err(e) => crate::error::log_error(Err(e))?,
368 }
369 }
370
371 Ok(())
372}
373
374#[overseer::contextbounds(CandidateBacking, prefix = self::overseer)]
375async fn run_iteration<Context>(
376 ctx: &mut Context,
377 state: &mut State,
378 metrics: &Metrics,
379 background_validation_rx: &mut mpsc::Receiver<(Hash, ValidatedCandidateCommand)>,
380) -> Result<(), Error> {
381 loop {
382 futures::select!(
383 validated_command = background_validation_rx.next().fuse() => {
384 if let Some((relay_parent, command)) = validated_command {
385 handle_validated_candidate_command(
386 &mut *ctx,
387 state,
388 relay_parent,
389 command,
390 metrics,
391 ).await?;
392 } else {
393 panic!("background_validation_tx always alive at this point; qed");
394 }
395 }
396 from_overseer = ctx.recv().fuse() => {
397 match from_overseer.map_err(Error::OverseerExited)? {
398 FromOrchestra::Signal(OverseerSignal::ActiveLeaves(update)) => {
399 handle_active_leaves_update(
400 &mut *ctx,
401 update,
402 state,
403 ).await?;
404 }
405 FromOrchestra::Signal(OverseerSignal::BlockFinalized(..)) => {}
406 FromOrchestra::Signal(OverseerSignal::Conclude) => return Ok(()),
407 FromOrchestra::Communication { msg } => {
408 handle_communication(&mut *ctx, state, msg, metrics).await?;
409 }
410 }
411 }
412 )
413 }
414}
415
/// Data kept per candidate in `PerRelayParentState::fallbacks`.
/// NOTE(review): presumably everything needed to (re)try attesting a candidate seconded by
/// another validator — the use sites are outside this part of the file; confirm.
#[derive(Clone)]
struct AttestingData {
	/// Receipt of the candidate in question.
	candidate: CandidateReceipt,
	/// Hash of the candidate's PoV.
	pov_hash: Hash,
	/// NOTE(review): presumably the validator currently asked for the PoV — confirm.
	from_validator: ValidatorIndex,
	/// NOTE(review): presumably other validators backing the candidate — confirm.
	backing: Vec<ValidatorIndex>,
}
432
/// Context the statement table works with for one relay parent.
#[derive(Default, Debug)]
struct TableContext {
	/// The local validator, or `None` when this node is not a validator at the relay parent.
	validator: Option<Validator>,
	/// Validator group assigned to each core, keyed by core index.
	groups: HashMap<CoreIndex, Vec<ValidatorIndex>>,
	/// Validator set as returned by the runtime for the relay parent.
	validators: Vec<ValidatorId>,
	/// Indices of the validators that are disabled at the relay parent.
	disabled_validators: Vec<ValidatorIndex>,
}
440
441impl TableContext {
442 pub fn validator_is_disabled(&self, validator_idx: &ValidatorIndex) -> bool {
444 self.disabled_validators
445 .iter()
446 .any(|disabled_val_idx| *disabled_val_idx == *validator_idx)
447 }
448
449 pub fn local_validator_is_disabled(&self) -> Option<bool> {
451 self.validator.as_ref().map(|v| v.disabled())
452 }
453}
454
455impl TableContextTrait for TableContext {
456 type AuthorityId = ValidatorIndex;
457 type Digest = CandidateHash;
458 type GroupId = CoreIndex;
459 type Signature = ValidatorSignature;
460 type Candidate = CommittedCandidateReceipt;
461
462 fn candidate_digest(candidate: &CommittedCandidateReceipt) -> CandidateHash {
463 candidate.hash()
464 }
465
466 fn is_member_of(&self, authority: &ValidatorIndex, core: &CoreIndex) -> bool {
467 self.groups.get(core).map_or(false, |g| g.iter().any(|a| a == authority))
468 }
469
470 fn get_group_size(&self, group: &CoreIndex) -> Option<usize> {
471 self.groups.get(group).map(|g| g.len())
472 }
473}
474
475fn primitive_statement_to_table(s: &SignedFullStatementWithPVD) -> TableSignedStatement {
478 let statement = match s.payload() {
479 StatementWithPVD::Seconded(c, _) => TableStatement::Seconded(c.clone()),
480 StatementWithPVD::Valid(h) => TableStatement::Valid(*h),
481 };
482
483 TableSignedStatement {
484 statement,
485 signature: s.signature().clone(),
486 sender: s.validator_index(),
487 }
488}
489
490fn table_attested_to_backed(
491 attested: TableAttestedCandidate<
492 CoreIndex,
493 CommittedCandidateReceipt,
494 ValidatorIndex,
495 ValidatorSignature,
496 >,
497 table_context: &TableContext,
498 inject_core_index: bool,
499) -> Option<BackedCandidate> {
500 let TableAttestedCandidate { candidate, validity_votes, group_id: core_index } = attested;
501
502 let (ids, validity_votes): (Vec<_>, Vec<ValidityAttestation>) =
503 validity_votes.into_iter().map(|(id, vote)| (id, vote.into())).unzip();
504
505 let group = table_context.groups.get(&core_index)?;
506
507 let mut validator_indices = BitVec::with_capacity(group.len());
508
509 validator_indices.resize(group.len(), false);
510
511 let mut vote_positions = Vec::with_capacity(validity_votes.len());
515 for (orig_idx, id) in ids.iter().enumerate() {
516 if let Some(position) = group.iter().position(|x| x == id) {
517 validator_indices.set(position, true);
518 vote_positions.push((orig_idx, position));
519 } else {
520 gum::warn!(
521 target: LOG_TARGET,
522 "Logic error: Validity vote from table does not correspond to group",
523 );
524
525 return None
526 }
527 }
528 vote_positions.sort_by_key(|(_orig, pos_in_group)| *pos_in_group);
529
530 Some(BackedCandidate::new(
531 candidate,
532 vote_positions
533 .into_iter()
534 .map(|(pos_in_votes, _pos_in_group)| validity_votes[pos_in_votes].clone())
535 .collect(),
536 validator_indices,
537 inject_core_index.then_some(core_index),
538 ))
539}
540
541async fn store_available_data(
542 sender: &mut impl overseer::CandidateBackingSenderTrait,
543 n_validators: u32,
544 candidate_hash: CandidateHash,
545 available_data: AvailableData,
546 expected_erasure_root: Hash,
547 core_index: CoreIndex,
548 node_features: NodeFeatures,
549) -> Result<(), Error> {
550 let (tx, rx) = oneshot::channel();
551 sender
556 .send_message(AvailabilityStoreMessage::StoreAvailableData {
557 candidate_hash,
558 n_validators,
559 available_data,
560 expected_erasure_root,
561 core_index,
562 node_features,
563 tx,
564 })
565 .await;
566
567 rx.await
568 .map_err(Error::StoreAvailableDataChannel)?
569 .map_err(Error::StoreAvailableData)
570}
571
572async fn make_pov_available(
580 sender: &mut impl overseer::CandidateBackingSenderTrait,
581 n_validators: usize,
582 pov: Arc<PoV>,
583 candidate_hash: CandidateHash,
584 validation_data: PersistedValidationData,
585 expected_erasure_root: Hash,
586 core_index: CoreIndex,
587 node_features: NodeFeatures,
588) -> Result<(), Error> {
589 store_available_data(
590 sender,
591 n_validators as u32,
592 candidate_hash,
593 AvailableData { pov, validation_data },
594 expected_erasure_root,
595 core_index,
596 node_features,
597 )
598 .await
599}
600
601async fn request_pov(
602 sender: &mut impl overseer::CandidateBackingSenderTrait,
603 relay_parent: Hash,
604 from_validator: ValidatorIndex,
605 para_id: ParaId,
606 candidate_hash: CandidateHash,
607 pov_hash: Hash,
608) -> Result<Arc<PoV>, Error> {
609 let (tx, rx) = oneshot::channel();
610 sender
611 .send_message(AvailabilityDistributionMessage::FetchPoV {
612 relay_parent,
613 from_validator,
614 para_id,
615 candidate_hash,
616 pov_hash,
617 tx,
618 })
619 .await;
620
621 let pov = rx.await.map_err(|_| Error::FetchPoV)?;
622 Ok(Arc::new(pov))
623}
624
625async fn request_candidate_validation(
626 sender: &mut impl overseer::CandidateBackingSenderTrait,
627 validation_data: PersistedValidationData,
628 validation_code: ValidationCode,
629 candidate_receipt: CandidateReceipt,
630 pov: Arc<PoV>,
631 executor_params: ExecutorParams,
632) -> Result<ValidationResult, Error> {
633 let (tx, rx) = oneshot::channel();
634 let is_system = candidate_receipt.descriptor.para_id().is_system();
635 let relay_parent = candidate_receipt.descriptor.relay_parent();
636
637 sender
638 .send_message(CandidateValidationMessage::ValidateFromExhaustive {
639 validation_data,
640 validation_code,
641 candidate_receipt,
642 pov,
643 executor_params,
644 exec_kind: if is_system {
645 PvfExecKind::BackingSystemParas(relay_parent)
646 } else {
647 PvfExecKind::Backing(relay_parent)
648 },
649 response_sender: tx,
650 })
651 .await;
652
653 match rx.await {
654 Ok(Ok(validation_result)) => Ok(validation_result),
655 Ok(Err(err)) => Err(Error::ValidationFailed(err)),
656 Err(err) => Err(Error::ValidateFromExhaustive(err)),
657 }
658}
659
/// What a successful background validation produces.
struct BackgroundValidationOutputs {
	/// Receipt of the candidate that was validated.
	candidate: CandidateReceipt,
	/// Commitments returned by candidate validation.
	commitments: CandidateCommitments,
	/// Persisted validation data returned by candidate validation.
	persisted_validation_data: PersistedValidationData,
}
665
/// Outcome of background validation: the outputs on success, or the receipt of the candidate
/// that turned out to be invalid.
type BackgroundValidationResult = Result<BackgroundValidationOutputs, CandidateReceipt>;
667
/// Everything `validate_and_make_available` needs to validate one candidate.
struct BackgroundValidationParams<S: overseer::CandidateBackingSenderTrait, F> {
	/// Sender used to message other subsystems.
	sender: S,
	/// Channel on which the resulting command is reported back.
	tx_command: mpsc::Sender<(Hash, ValidatedCandidateCommand)>,
	/// Receipt of the candidate to validate.
	candidate: CandidateReceipt,
	/// Relay parent used for the runtime requests and reported alongside the command.
	relay_parent: Hash,
	/// Session index used when requesting the node features.
	session_index: SessionIndex,
	/// Persisted validation data passed to candidate validation.
	persisted_validation_data: PersistedValidationData,
	/// The PoV, or instructions on where to fetch it from.
	pov: PoVData,
	/// Number of validators, forwarded to the availability store.
	n_validators: usize,
	/// Wraps the validation result into the command that is sent over `tx_command`.
	make_command: F,
}
679
680async fn validate_and_make_available(
681 params: BackgroundValidationParams<
682 impl overseer::CandidateBackingSenderTrait,
683 impl Fn(BackgroundValidationResult) -> ValidatedCandidateCommand + Sync,
684 >,
685 core_index: CoreIndex,
686) -> Result<(), Error> {
687 let BackgroundValidationParams {
688 mut sender,
689 mut tx_command,
690 candidate,
691 relay_parent,
692 session_index,
693 persisted_validation_data,
694 pov,
695 n_validators,
696 make_command,
697 } = params;
698
699 let validation_code = {
700 let validation_code_hash = candidate.descriptor().validation_code_hash();
701 let (tx, rx) = oneshot::channel();
702 sender
703 .send_message(RuntimeApiMessage::Request(
704 relay_parent,
705 RuntimeApiRequest::ValidationCodeByHash(validation_code_hash, tx),
706 ))
707 .await;
708
709 let code = rx.await.map_err(Error::RuntimeApiUnavailable)?;
710 match code {
711 Err(e) => return Err(Error::FetchValidationCode(validation_code_hash, e)),
712 Ok(None) => return Err(Error::NoValidationCode(validation_code_hash)),
713 Ok(Some(c)) => c,
714 }
715 };
716
717 let executor_params = match executor_params_at_relay_parent(relay_parent, &mut sender).await {
718 Ok(ep) => ep,
719 Err(e) => return Err(Error::UtilError(e)),
720 };
721
722 let node_features = request_node_features(relay_parent, session_index, &mut sender)
723 .await?
724 .unwrap_or(NodeFeatures::EMPTY);
725
726 let pov = match pov {
727 PoVData::Ready(pov) => pov,
728 PoVData::FetchFromValidator { from_validator, candidate_hash, pov_hash } =>
729 match request_pov(
730 &mut sender,
731 relay_parent,
732 from_validator,
733 candidate.descriptor.para_id(),
734 candidate_hash,
735 pov_hash,
736 )
737 .await
738 {
739 Err(Error::FetchPoV) => {
740 tx_command
741 .send((
742 relay_parent,
743 ValidatedCandidateCommand::AttestNoPoV(candidate.hash()),
744 ))
745 .await
746 .map_err(Error::BackgroundValidationMpsc)?;
747 return Ok(())
748 },
749 Err(err) => return Err(err),
750 Ok(pov) => pov,
751 },
752 };
753
754 let v = {
755 request_candidate_validation(
756 &mut sender,
757 persisted_validation_data,
758 validation_code,
759 candidate.clone(),
760 pov.clone(),
761 executor_params,
762 )
763 .await?
764 };
765
766 let res = match v {
767 ValidationResult::Valid(commitments, validation_data) => {
768 gum::debug!(
769 target: LOG_TARGET,
770 candidate_hash = ?candidate.hash(),
771 "Validation successful",
772 );
773
774 let erasure_valid = make_pov_available(
775 &mut sender,
776 n_validators,
777 pov.clone(),
778 candidate.hash(),
779 validation_data.clone(),
780 candidate.descriptor.erasure_root(),
781 core_index,
782 node_features,
783 )
784 .await;
785
786 match erasure_valid {
787 Ok(()) => Ok(BackgroundValidationOutputs {
788 candidate,
789 commitments,
790 persisted_validation_data: validation_data,
791 }),
792 Err(Error::StoreAvailableData(StoreAvailableDataError::InvalidErasureRoot)) => {
793 gum::debug!(
794 target: LOG_TARGET,
795 candidate_hash = ?candidate.hash(),
796 actual_commitments = ?commitments,
797 "Erasure root doesn't match the announced by the candidate receipt",
798 );
799 Err(candidate)
800 },
801 Err(e) => return Err(e),
803 }
804 },
805 ValidationResult::Invalid(InvalidCandidate::CommitmentsHashMismatch) => {
806 gum::warn!(
808 target: LOG_TARGET,
809 candidate_hash = ?candidate.hash(),
810 "Validation yielded different commitments",
811 );
812 Err(candidate)
813 },
814 ValidationResult::Invalid(reason) => {
815 gum::warn!(
816 target: LOG_TARGET,
817 candidate_hash = ?candidate.hash(),
818 reason = ?reason,
819 "Validation yielded an invalid candidate",
820 );
821 Err(candidate)
822 },
823 };
824
825 tx_command.send((relay_parent, make_command(res))).await.map_err(Into::into)
826}
827
828#[overseer::contextbounds(CandidateBacking, prefix = self::overseer)]
829async fn handle_communication<Context>(
830 ctx: &mut Context,
831 state: &mut State,
832 message: CandidateBackingMessage,
833 metrics: &Metrics,
834) -> Result<(), Error> {
835 match message {
836 CandidateBackingMessage::Second(_relay_parent, candidate, pvd, pov) => {
837 handle_second_message(ctx, state, candidate, pvd, pov, metrics).await?;
838 },
839 CandidateBackingMessage::Statement(relay_parent, statement) => {
840 handle_statement_message(ctx, state, relay_parent, statement, metrics).await?;
841 },
842 CandidateBackingMessage::GetBackableCandidates(requested_candidates, tx) =>
843 handle_get_backable_candidates_message(state, requested_candidates, tx, metrics)?,
844 CandidateBackingMessage::CanSecond(request, tx) =>
845 handle_can_second_request(ctx, state, request, tx).await,
846 }
847
848 Ok(())
849}
850
/// Applies an `ActiveLeavesUpdate` to the subsystem state.
///
/// In order:
/// 1. if a leaf was activated, query its prospective-parachains mode and, when enabled,
///    activate it in the implicit view;
/// 2. drop all deactivated leaves from `per_leaf` and from the implicit view;
/// 3. prune `per_relay_parent` to the relay parents that are still an active leaf or allowed
///    by the implicit view, then prune `per_candidate` accordingly;
/// 4. register the new leaf in `per_leaf` and build a `PerRelayParentState` for every fresh
///    relay parent that is not tracked yet.
///
/// Failure to load the implicit view for the new leaf is logged and otherwise ignored. Errors
/// from querying the mode or from building relay-parent state are returned.
#[overseer::contextbounds(CandidateBacking, prefix = self::overseer)]
async fn handle_active_leaves_update<Context>(
	ctx: &mut Context,
	update: ActiveLeavesUpdate,
	state: &mut State,
) -> Result<(), Error> {
	// Outcome of activating the new leaf, split by prospective-parachains mode.
	enum LeafHasProspectiveParachains {
		// Mode is enabled; carries the result of activating the leaf in the implicit view.
		Enabled(Result<ProspectiveParachainsMode, ImplicitViewFetchError>),
		Disabled,
	}

	// The new leaf is activated in the implicit view before deactivated leaves are removed
	// from it below.
	let res = if let Some(leaf) = update.activated {
		let mode = prospective_parachains_mode(ctx.sender(), leaf.hash).await?;

		let leaf_hash = leaf.hash;
		Some((
			leaf,
			match mode {
				ProspectiveParachainsMode::Disabled => LeafHasProspectiveParachains::Disabled,
				ProspectiveParachainsMode::Enabled { .. } => LeafHasProspectiveParachains::Enabled(
					state.implicit_view.activate_leaf(ctx.sender(), leaf_hash).await.map(|_| mode),
				),
			},
		))
	} else {
		None
	};

	for deactivated in update.deactivated {
		state.per_leaf.remove(&deactivated);
		state.implicit_view.deactivate_leaf(deactivated);
	}

	// Keep only relay parents that are an active leaf or still allowed by the implicit view.
	// The block limits how long `remaining` borrows from `state`.
	{
		let remaining: HashSet<_> = state
			.per_leaf
			.keys()
			.chain(state.implicit_view.all_allowed_relay_parents())
			.collect();

		state.per_relay_parent.retain(|r, _| remaining.contains(&r));
	}

	// Candidates whose relay parent was just pruned are dropped as well.
	state
		.per_candidate
		.retain(|_, pc| state.per_relay_parent.contains_key(&pc.relay_parent));

	// Determine which relay parents may need fresh state, and the mode of the new leaf.
	let (fresh_relay_parents, leaf_mode) = match res {
		// No leaf was activated: nothing to add.
		None => return Ok(()),
		Some((leaf, LeafHasProspectiveParachains::Disabled)) => {
			// The leaf is already known; leave its state untouched.
			if state.per_leaf.contains_key(&leaf.hash) {
				return Ok(())
			}

			state
				.per_leaf
				.insert(leaf.hash, ActiveLeafState::new(ProspectiveParachainsMode::Disabled));

			// With prospective parachains disabled only the leaf itself is a relay parent.
			(vec![leaf.hash], ProspectiveParachainsMode::Disabled)
		},
		Some((leaf, LeafHasProspectiveParachains::Enabled(Ok(prospective_parachains_mode)))) => {
			let fresh_relay_parents =
				state.implicit_view.known_allowed_relay_parents_under(&leaf.hash, None);

			let active_leaf_state = ActiveLeafState::new(prospective_parachains_mode);

			state.per_leaf.insert(leaf.hash, active_leaf_state);

			let fresh_relay_parent = match fresh_relay_parents {
				Some(f) => f.to_vec(),
				None => {
					gum::warn!(
						target: LOG_TARGET,
						leaf_hash = ?leaf.hash,
						"Implicit view gave no relay-parents"
					);

					// Fall back to the leaf alone.
					vec![leaf.hash]
				},
			};
			(fresh_relay_parent, prospective_parachains_mode)
		},
		Some((leaf, LeafHasProspectiveParachains::Enabled(Err(e)))) => {
			gum::debug!(
				target: LOG_TARGET,
				leaf_hash = ?leaf.hash,
				err = ?e,
				"Failed to load implicit view for leaf."
			);

			return Ok(())
		},
	};

	// Build state for every relay parent that is not tracked yet.
	for maybe_new in fresh_relay_parents {
		if state.per_relay_parent.contains_key(&maybe_new) {
			continue
		}

		// A relay parent that is itself an active leaf uses that leaf's own mode; any other
		// relay parent uses the mode of the newly activated leaf.
		let mode = match state.per_leaf.get(&maybe_new) {
			None => {
				leaf_mode
			},
			Some(l) => l.into(),
		};

		let per = construct_per_relay_parent_state(
			ctx,
			maybe_new,
			&state.keystore,
			&mut state.validator_to_group_cache,
			mode,
		)
		.await?;

		// `None` means no state could be built for this relay parent; it is left untracked.
		if let Some(per) = per {
			state.per_relay_parent.insert(maybe_new, per);
		}
	}

	Ok(())
}
1000
/// Unwraps the result of a runtime API call inside a function that returns
/// `Result<Option<_>, _>`.
///
/// `Ok(x)` evaluates to `x`. On `Err(err)` the error is converted into a `runtime::Error` and
/// handed to `error::log_error`; an error returned by `log_error` is propagated with `?`,
/// otherwise the enclosing function returns `Ok(None)`.
macro_rules! try_runtime_api {
	($x: expr) => {
		match $x {
			Ok(x) => x,
			Err(err) => {
				// May return early with the error via `?`.
				error::log_error(Err(Into::<runtime::Error>::into(err).into()))?;

				// The error was only logged: give up on this relay parent without failing.
				return Ok(None)
			},
		}
	};
}
1017
1018fn core_index_from_statement(
1019 validator_to_group: &IndexedVec<ValidatorIndex, Option<GroupIndex>>,
1020 group_rotation_info: &GroupRotationInfo,
1021 n_cores: u32,
1022 claim_queue: &ClaimQueueSnapshot,
1023 statement: &SignedFullStatementWithPVD,
1024) -> Option<CoreIndex> {
1025 let compact_statement = statement.as_unchecked();
1026 let candidate_hash = CandidateHash(*compact_statement.unchecked_payload().candidate_hash());
1027
1028 gum::trace!(
1029 target:LOG_TARGET,
1030 ?group_rotation_info,
1031 ?statement,
1032 ?validator_to_group,
1033 n_cores,
1034 ?candidate_hash,
1035 "Extracting core index from statement"
1036 );
1037
1038 let statement_validator_index = statement.validator_index();
1039 let Some(Some(group_index)) = validator_to_group.get(statement_validator_index) else {
1040 gum::debug!(
1041 target: LOG_TARGET,
1042 ?group_rotation_info,
1043 ?statement,
1044 ?validator_to_group,
1045 n_cores,
1046 ?candidate_hash,
1047 "Invalid validator index: {:?}",
1048 statement_validator_index
1049 );
1050 return None
1051 };
1052
1053 let core_index = group_rotation_info.core_for_group(*group_index, n_cores as _);
1055
1056 if core_index.0 > n_cores {
1057 gum::warn!(target: LOG_TARGET, ?candidate_hash, ?core_index, n_cores, "Invalid CoreIndex");
1058 return None
1059 }
1060
1061 if let StatementWithPVD::Seconded(candidate, _pvd) = statement.payload() {
1062 let candidate_para_id = candidate.descriptor.para_id();
1063 let mut assigned_paras = claim_queue.iter_claims_for_core(&core_index);
1064
1065 if !assigned_paras.any(|id| id == &candidate_para_id) {
1066 gum::debug!(
1067 target: LOG_TARGET,
1068 ?candidate_hash,
1069 ?core_index,
1070 assigned_paras = ?claim_queue.iter_claims_for_core(&core_index).collect::<Vec<_>>(),
1071 ?candidate_para_id,
1072 "Invalid CoreIndex, core is not assigned to this para_id"
1073 );
1074 return None
1075 }
1076 return Some(core_index)
1077 } else {
1078 return Some(core_index)
1079 }
1080}
1081
/// Loads everything needed to track `relay_parent` and assembles its `PerRelayParentState`.
///
/// Queries the runtime for the session index, validators, validator groups, availability
/// cores, node features, minimum backing votes, disabled validators and the claim queue, then
/// derives the per-core validator groups and the core assigned to the local validator.
///
/// Returns `Ok(None)` when a runtime API call wrapped in `try_runtime_api!` fails with an error
/// that `error::log_error` only logs, or when the local `Validator` cannot be constructed for a
/// reason other than not being a validator. `validator_to_group_cache` is filled for the
/// session if it has no entry yet.
#[overseer::contextbounds(CandidateBacking, prefix = self::overseer)]
async fn construct_per_relay_parent_state<Context>(
	ctx: &mut Context,
	relay_parent: Hash,
	keystore: &KeystorePtr,
	validator_to_group_cache: &mut LruMap<
		SessionIndex,
		Arc<IndexedVec<ValidatorIndex, Option<GroupIndex>>>,
	>,
	mode: ProspectiveParachainsMode,
) -> Result<Option<PerRelayParentState>, Error> {
	let parent = relay_parent;

	// Each request is sent first (the inner `.await`); the responses are then awaited together.
	let (session_index, validators, groups, cores) = futures::try_join!(
		request_session_index_for_child(parent, ctx.sender()).await,
		request_validators(parent, ctx.sender()).await,
		request_validator_groups(parent, ctx.sender()).await,
		request_from_runtime(parent, ctx.sender(), |tx| {
			RuntimeApiRequest::AvailabilityCores(tx)
		},)
		.await,
	)
	.map_err(Error::JoinMultiple)?;

	let session_index = try_runtime_api!(session_index);

	// The core index is attached to backed candidates only if the `ElasticScalingMVP` feature
	// bit is set; missing node features or a missing bit count as "not set".
	let inject_core_index = request_node_features(parent, session_index, ctx.sender())
		.await?
		.unwrap_or(NodeFeatures::EMPTY)
		.get(FeatureIndex::ElasticScalingMVP as usize)
		.map(|b| *b)
		.unwrap_or(false);

	gum::debug!(target: LOG_TARGET, inject_core_index, ?parent, "New state");

	let validators: Vec<_> = try_runtime_api!(validators);
	let (validator_groups, group_rotation_info) = try_runtime_api!(groups);
	let cores = try_runtime_api!(cores);
	let minimum_backing_votes =
		try_runtime_api!(request_min_backing_votes(parent, session_index, ctx.sender()).await);

	let disabled_validators =
		get_disabled_validators_with_fallback(ctx.sender(), parent).await.map_err(|e| {
			Error::UtilError(TryFrom::try_from(e).expect("the conversion is infallible; qed"))
		})?;

	let maybe_claim_queue = try_runtime_api!(fetch_claim_queue(ctx.sender(), parent).await);

	let signing_context = SigningContext { parent_hash: parent, session_index };
	let validator = match Validator::construct(
		&validators,
		&disabled_validators,
		signing_context.clone(),
		keystore.clone(),
	) {
		Ok(v) => Some(v),
		// Not being a validator is fine: state is still built, just without a local validator.
		Err(util::Error::NotAValidator) => None,
		Err(e) => {
			gum::warn!(
				target: LOG_TARGET,
				err = ?e,
				"Cannot participate in candidate backing",
			);

			return Ok(None)
		},
	};

	let n_cores = cores.len();

	let mut groups = HashMap::<CoreIndex, Vec<ValidatorIndex>>::new();
	let mut assigned_core = None;

	// Without a claim queue from the runtime, an equivalent one is built from the core states
	// in the loop below.
	let has_claim_queue = maybe_claim_queue.is_some();
	let mut claim_queue = maybe_claim_queue.unwrap_or_default().0;

	for (idx, core) in cores.iter().enumerate() {
		let core_index = CoreIndex(idx as _);

		if !has_claim_queue {
			match core {
				CoreState::Scheduled(scheduled) =>
					claim_queue.insert(core_index, [scheduled.para_id].into_iter().collect()),
				// An occupied core only counts when prospective parachains are enabled and a
				// para is next up on it.
				CoreState::Occupied(occupied) if mode.is_enabled() => {
					if let Some(next) = &occupied.next_up_on_available {
						claim_queue.insert(core_index, [next.para_id].into_iter().collect())
					} else {
						continue
					}
				},
				_ => continue,
			};
		} else if !claim_queue.contains_key(&core_index) {
			// Cores without claims get no group entry.
			continue
		}

		let group_index = group_rotation_info.group_for_core(core_index, n_cores);
		if let Some(g) = validator_groups.get(group_index.0 as usize) {
			if validator.as_ref().map_or(false, |v| g.contains(&v.index())) {
				assigned_core = Some(core_index);
			}
			groups.insert(core_index, g.clone());
		}
	}
	gum::debug!(target: LOG_TARGET, ?groups, "TableContext");

	// The validator-to-group mapping is computed once per session and shared through the cache.
	let validator_to_group = validator_to_group_cache
		.get_or_insert(session_index, || {
			let mut vector = vec![None; validators.len()];

			for (group_idx, validator_group) in validator_groups.iter().enumerate() {
				for validator in validator_group {
					vector[validator.0 as usize] = Some(GroupIndex(group_idx as u32));
				}
			}

			Arc::new(IndexedVec::<_, _>::from(vector))
		})
		.expect("Just inserted");

	let table_context = TableContext { validator, groups, validators, disabled_validators };
	let table_config = TableConfig {
		// Multiple `Seconded` statements are allowed only with prospective parachains enabled.
		allow_multiple_seconded: match mode {
			ProspectiveParachainsMode::Enabled { .. } => true,
			ProspectiveParachainsMode::Disabled => false,
		},
	};

	Ok(Some(PerRelayParentState {
		prospective_parachains_mode: mode,
		parent,
		session_index,
		assigned_core,
		backed: HashSet::new(),
		table: Table::new(table_config),
		table_context,
		issued_statements: HashSet::new(),
		awaiting_validation: HashSet::new(),
		fallbacks: HashMap::new(),
		minimum_backing_votes,
		inject_core_index,
		n_cores: cores.len() as u32,
		claim_queue: ClaimQueueSnapshot::from(claim_queue),
		validator_to_group: validator_to_group.clone(),
		group_rotation_info,
	}))
}
1236
/// Verdict of `seconding_sanity_check`.
enum SecondingAllowed {
	/// Seconding the candidate is not allowed.
	No,
	/// Seconding is allowed.
	/// NOTE(review): the hashes are presumably the active leaves under which the candidate may
	/// be seconded — confirm against the end of `seconding_sanity_check`.
	Yes(Vec<Hash>),
}
1242
1243#[overseer::contextbounds(CandidateBacking, prefix = self::overseer)]
1246async fn seconding_sanity_check<Context>(
1247 ctx: &mut Context,
1248 active_leaves: &HashMap<Hash, ActiveLeafState>,
1249 implicit_view: &ImplicitView,
1250 hypothetical_candidate: HypotheticalCandidate,
1251) -> SecondingAllowed {
1252 let mut leaves_for_seconding = Vec::new();
1253 let mut responses = FuturesOrdered::<BoxFuture<'_, Result<_, oneshot::Canceled>>>::new();
1254
1255 let candidate_para = hypothetical_candidate.candidate_para();
1256 let candidate_relay_parent = hypothetical_candidate.relay_parent();
1257 let candidate_hash = hypothetical_candidate.candidate_hash();
1258
1259 for (head, leaf_state) in active_leaves {
1260 if ProspectiveParachainsMode::from(leaf_state).is_enabled() {
1261 let allowed_parents_for_para =
1264 implicit_view.known_allowed_relay_parents_under(head, Some(candidate_para));
1265 if !allowed_parents_for_para.unwrap_or_default().contains(&candidate_relay_parent) {
1266 continue
1267 }
1268
1269 let (tx, rx) = oneshot::channel();
1270 ctx.send_message(ProspectiveParachainsMessage::GetHypotheticalMembership(
1271 HypotheticalMembershipRequest {
1272 candidates: vec![hypothetical_candidate.clone()],
1273 fragment_chain_relay_parent: Some(*head),
1274 },
1275 tx,
1276 ))
1277 .await;
1278 let response = rx.map_ok(move |candidate_memberships| {
1279 let is_member_or_potential = candidate_memberships
1280 .into_iter()
1281 .find_map(|(candidate, leaves)| {
1282 (candidate.candidate_hash() == candidate_hash).then_some(leaves)
1283 })
1284 .and_then(|leaves| leaves.into_iter().find(|leaf| leaf == head))
1285 .is_some();
1286
1287 (is_member_or_potential, head)
1288 });
1289 responses.push_back(response.boxed());
1290 } else {
1291 if *head == candidate_relay_parent {
1292 if let ActiveLeafState::ProspectiveParachainsDisabled { seconded } = leaf_state {
1293 if seconded.contains(&candidate_para) {
1294 return SecondingAllowed::No
1297 }
1298 }
1299 responses.push_back(futures::future::ok((true, head)).boxed());
1300 }
1301 }
1302 }
1303
1304 if responses.is_empty() {
1305 return SecondingAllowed::No
1306 }
1307
1308 while let Some(response) = responses.next().await {
1309 match response {
1310 Err(oneshot::Canceled) => {
1311 gum::warn!(
1312 target: LOG_TARGET,
1313 "Failed to reach prospective parachains subsystem for hypothetical membership",
1314 );
1315
1316 return SecondingAllowed::No
1317 },
1318 Ok((is_member_or_potential, head)) => match is_member_or_potential {
1319 false => {
1320 gum::debug!(
1321 target: LOG_TARGET,
1322 ?candidate_hash,
1323 leaf_hash = ?head,
1324 "Refusing to second candidate at leaf. Is not a potential member.",
1325 );
1326 },
1327 true => {
1328 leaves_for_seconding.push(*head);
1329 },
1330 },
1331 }
1332 }
1333
1334 if leaves_for_seconding.is_empty() {
1335 SecondingAllowed::No
1336 } else {
1337 SecondingAllowed::Yes(leaves_for_seconding)
1338 }
1339}
1340
1341#[overseer::contextbounds(CandidateBacking, prefix = self::overseer)]
1343async fn handle_can_second_request<Context>(
1344 ctx: &mut Context,
1345 state: &State,
1346 request: CanSecondRequest,
1347 tx: oneshot::Sender<bool>,
1348) {
1349 let relay_parent = request.candidate_relay_parent;
1350 let response = if state
1351 .per_relay_parent
1352 .get(&relay_parent)
1353 .map_or(false, |pr_state| pr_state.prospective_parachains_mode.is_enabled())
1354 {
1355 let hypothetical_candidate = HypotheticalCandidate::Incomplete {
1356 candidate_hash: request.candidate_hash,
1357 candidate_para: request.candidate_para_id,
1358 parent_head_data_hash: request.parent_head_data_hash,
1359 candidate_relay_parent: relay_parent,
1360 };
1361
1362 let result = seconding_sanity_check(
1363 ctx,
1364 &state.per_leaf,
1365 &state.implicit_view,
1366 hypothetical_candidate,
1367 )
1368 .await;
1369
1370 match result {
1371 SecondingAllowed::No => false,
1372 SecondingAllowed::Yes(leaves) => !leaves.is_empty(),
1373 }
1374 } else {
1375 false
1377 };
1378
1379 let _ = tx.send(response);
1380}
1381
1382#[overseer::contextbounds(CandidateBacking, prefix = self::overseer)]
1383async fn handle_validated_candidate_command<Context>(
1384 ctx: &mut Context,
1385 state: &mut State,
1386 relay_parent: Hash,
1387 command: ValidatedCandidateCommand,
1388 metrics: &Metrics,
1389) -> Result<(), Error> {
1390 match state.per_relay_parent.get_mut(&relay_parent) {
1391 Some(rp_state) => {
1392 let candidate_hash = command.candidate_hash();
1393 rp_state.awaiting_validation.remove(&candidate_hash);
1394
1395 match command {
1396 ValidatedCandidateCommand::Second(res) => match res {
1397 Ok(outputs) => {
1398 let BackgroundValidationOutputs {
1399 candidate,
1400 commitments,
1401 persisted_validation_data,
1402 } = outputs;
1403
1404 if rp_state.issued_statements.contains(&candidate_hash) {
1405 return Ok(())
1406 }
1407
1408 let receipt = CommittedCandidateReceipt {
1409 descriptor: candidate.descriptor.clone(),
1410 commitments,
1411 };
1412
1413 let hypothetical_candidate = HypotheticalCandidate::Complete {
1414 candidate_hash,
1415 receipt: Arc::new(receipt.clone()),
1416 persisted_validation_data: persisted_validation_data.clone(),
1417 };
1418 let hypothetical_membership = match seconding_sanity_check(
1422 ctx,
1423 &state.per_leaf,
1424 &state.implicit_view,
1425 hypothetical_candidate,
1426 )
1427 .await
1428 {
1429 SecondingAllowed::No => return Ok(()),
1430 SecondingAllowed::Yes(membership) => membership,
1431 };
1432
1433 let statement =
1434 StatementWithPVD::Seconded(receipt, persisted_validation_data);
1435
1436 let res = sign_import_and_distribute_statement(
1440 ctx,
1441 rp_state,
1442 &mut state.per_candidate,
1443 statement,
1444 state.keystore.clone(),
1445 metrics,
1446 )
1447 .await;
1448
1449 if let Err(Error::RejectedByProspectiveParachains) = res {
1450 let candidate_hash = candidate.hash();
1451 gum::debug!(
1452 target: LOG_TARGET,
1453 relay_parent = ?candidate.descriptor().relay_parent(),
1454 ?candidate_hash,
1455 "Attempted to second candidate but was rejected by prospective parachains",
1456 );
1457
1458 ctx.send_message(CollatorProtocolMessage::Invalid(
1460 candidate.descriptor().relay_parent(),
1461 candidate,
1462 ))
1463 .await;
1464
1465 return Ok(())
1466 }
1467
1468 if let Some(stmt) = res? {
1469 match state.per_candidate.get_mut(&candidate_hash) {
1470 None => {
1471 gum::warn!(
1472 target: LOG_TARGET,
1473 ?candidate_hash,
1474 "Missing `per_candidate` for seconded candidate.",
1475 );
1476 },
1477 Some(p) => p.seconded_locally = true,
1478 }
1479
1480 for leaf in hypothetical_membership {
1482 let leaf_data = match state.per_leaf.get_mut(&leaf) {
1483 None => {
1484 gum::warn!(
1485 target: LOG_TARGET,
1486 leaf_hash = ?leaf,
1487 "Missing `per_leaf` for known active leaf."
1488 );
1489
1490 continue
1491 },
1492 Some(d) => d,
1493 };
1494
1495 leaf_data.add_seconded_candidate(candidate.descriptor().para_id());
1496 }
1497
1498 rp_state.issued_statements.insert(candidate_hash);
1499
1500 metrics.on_candidate_seconded();
1501 ctx.send_message(CollatorProtocolMessage::Seconded(
1502 rp_state.parent,
1503 StatementWithPVD::drop_pvd_from_signed(stmt),
1504 ))
1505 .await;
1506 }
1507 },
1508 Err(candidate) => {
1509 ctx.send_message(CollatorProtocolMessage::Invalid(
1510 rp_state.parent,
1511 candidate,
1512 ))
1513 .await;
1514 },
1515 },
1516 ValidatedCandidateCommand::Attest(res) => {
1517 rp_state.fallbacks.remove(&candidate_hash);
1519 if !rp_state.issued_statements.contains(&candidate_hash) {
1521 if res.is_ok() {
1522 let statement = StatementWithPVD::Valid(candidate_hash);
1523
1524 sign_import_and_distribute_statement(
1525 ctx,
1526 rp_state,
1527 &mut state.per_candidate,
1528 statement,
1529 state.keystore.clone(),
1530 metrics,
1531 )
1532 .await?;
1533 }
1534 rp_state.issued_statements.insert(candidate_hash);
1535 }
1536 },
1537 ValidatedCandidateCommand::AttestNoPoV(candidate_hash) => {
1538 if let Some(attesting) = rp_state.fallbacks.get_mut(&candidate_hash) {
1539 if let Some(index) = attesting.backing.pop() {
1540 attesting.from_validator = index;
1541 let attesting = attesting.clone();
1542
1543 if let Some(pvd) = state
1549 .per_candidate
1550 .get(&candidate_hash)
1551 .map(|pc| pc.persisted_validation_data.clone())
1552 {
1553 kick_off_validation_work(
1554 ctx,
1555 rp_state,
1556 pvd,
1557 &state.background_validation_tx,
1558 attesting,
1559 )
1560 .await?;
1561 }
1562 }
1563 } else {
1564 gum::warn!(
1565 target: LOG_TARGET,
1566 "AttestNoPoV was triggered without fallback being available."
1567 );
1568 debug_assert!(false);
1569 }
1570 },
1571 }
1572 },
1573 None => {
1574 },
1577 }
1578
1579 Ok(())
1580}
1581
1582fn sign_statement(
1583 rp_state: &PerRelayParentState,
1584 statement: StatementWithPVD,
1585 keystore: KeystorePtr,
1586 metrics: &Metrics,
1587) -> Option<SignedFullStatementWithPVD> {
1588 let signed = rp_state
1589 .table_context
1590 .validator
1591 .as_ref()?
1592 .sign(keystore, statement)
1593 .ok()
1594 .flatten()?;
1595 metrics.on_statement_signed();
1596 Some(signed)
1597}
1598
1599#[overseer::contextbounds(CandidateBacking, prefix = self::overseer)]
1609async fn import_statement<Context>(
1610 ctx: &mut Context,
1611 rp_state: &mut PerRelayParentState,
1612 per_candidate: &mut HashMap<CandidateHash, PerCandidateState>,
1613 statement: &SignedFullStatementWithPVD,
1614) -> Result<Option<TableSummary>, Error> {
1615 let candidate_hash = statement.payload().candidate_hash();
1616
1617 gum::debug!(
1618 target: LOG_TARGET,
1619 statement = ?statement.payload().to_compact(),
1620 validator_index = statement.validator_index().0,
1621 ?candidate_hash,
1622 "Importing statement",
1623 );
1624
1625 if let StatementWithPVD::Seconded(candidate, pvd) = statement.payload() {
1639 if !per_candidate.contains_key(&candidate_hash) {
1640 if rp_state.prospective_parachains_mode.is_enabled() {
1641 let (tx, rx) = oneshot::channel();
1642 ctx.send_message(ProspectiveParachainsMessage::IntroduceSecondedCandidate(
1643 IntroduceSecondedCandidateRequest {
1644 candidate_para: candidate.descriptor.para_id(),
1645 candidate_receipt: candidate.clone(),
1646 persisted_validation_data: pvd.clone(),
1647 },
1648 tx,
1649 ))
1650 .await;
1651
1652 match rx.await {
1653 Err(oneshot::Canceled) => {
1654 gum::warn!(
1655 target: LOG_TARGET,
1656 "Could not reach the Prospective Parachains subsystem."
1657 );
1658
1659 return Err(Error::RejectedByProspectiveParachains)
1660 },
1661 Ok(false) => return Err(Error::RejectedByProspectiveParachains),
1662 Ok(true) => {},
1663 }
1664 }
1665
1666 per_candidate.insert(
1668 candidate_hash,
1669 PerCandidateState {
1670 persisted_validation_data: pvd.clone(),
1671 seconded_locally: false,
1673 relay_parent: candidate.descriptor.relay_parent(),
1674 },
1675 );
1676 }
1677 }
1678
1679 let stmt = primitive_statement_to_table(statement);
1680
1681 let core = core_index_from_statement(
1682 &rp_state.validator_to_group,
1683 &rp_state.group_rotation_info,
1684 rp_state.n_cores,
1685 &rp_state.claim_queue,
1686 statement,
1687 )
1688 .ok_or(Error::CoreIndexUnavailable)?;
1689
1690 Ok(rp_state.table.import_statement(&rp_state.table_context, core, stmt))
1691}
1692
1693#[overseer::contextbounds(CandidateBacking, prefix = self::overseer)]
1696async fn post_import_statement_actions<Context>(
1697 ctx: &mut Context,
1698 rp_state: &mut PerRelayParentState,
1699 summary: Option<&TableSummary>,
1700) {
1701 if let Some(attested) = summary.as_ref().and_then(|s| {
1702 rp_state.table.attested_candidate(
1703 &s.candidate,
1704 &rp_state.table_context,
1705 rp_state.minimum_backing_votes,
1706 )
1707 }) {
1708 let candidate_hash = attested.candidate.hash();
1709
1710 if rp_state.backed.insert(candidate_hash) {
1712 if let Some(backed) = table_attested_to_backed(
1713 attested,
1714 &rp_state.table_context,
1715 rp_state.inject_core_index,
1716 ) {
1717 let para_id = backed.candidate().descriptor.para_id();
1718 gum::debug!(
1719 target: LOG_TARGET,
1720 candidate_hash = ?candidate_hash,
1721 relay_parent = ?rp_state.parent,
1722 %para_id,
1723 "Candidate backed",
1724 );
1725
1726 if rp_state.prospective_parachains_mode.is_enabled() {
1727 ctx.send_message(ProspectiveParachainsMessage::CandidateBacked(
1730 para_id,
1731 candidate_hash,
1732 ))
1733 .await;
1734 ctx.send_message(StatementDistributionMessage::Backed(candidate_hash)).await;
1736 } else {
1737 let message = ProvisionerMessage::ProvisionableData(
1743 rp_state.parent,
1744 ProvisionableData::BackedCandidate(backed.receipt()),
1745 );
1746 ctx.send_unbounded_message(message);
1747 }
1748 } else {
1749 gum::debug!(target: LOG_TARGET, ?candidate_hash, "Cannot get BackedCandidate");
1750 }
1751 } else {
1752 gum::debug!(target: LOG_TARGET, ?candidate_hash, "Candidate already known");
1753 }
1754 } else {
1755 gum::debug!(target: LOG_TARGET, "No attested candidate");
1756 }
1757
1758 issue_new_misbehaviors(ctx, rp_state.parent, &mut rp_state.table);
1759}
1760
1761#[overseer::contextbounds(CandidateBacking, prefix = self::overseer)]
1763fn issue_new_misbehaviors<Context>(
1764 ctx: &mut Context,
1765 relay_parent: Hash,
1766 table: &mut Table<TableContext>,
1767) {
1768 let misbehaviors: Vec<_> = table.drain_misbehaviors().collect();
1770 for (validator_id, report) in misbehaviors {
1771 ctx.send_unbounded_message(ProvisionerMessage::ProvisionableData(
1777 relay_parent,
1778 ProvisionableData::MisbehaviorReport(relay_parent, validator_id, report),
1779 ));
1780 }
1781}
1782
1783#[overseer::contextbounds(CandidateBacking, prefix = self::overseer)]
1785async fn sign_import_and_distribute_statement<Context>(
1786 ctx: &mut Context,
1787 rp_state: &mut PerRelayParentState,
1788 per_candidate: &mut HashMap<CandidateHash, PerCandidateState>,
1789 statement: StatementWithPVD,
1790 keystore: KeystorePtr,
1791 metrics: &Metrics,
1792) -> Result<Option<SignedFullStatementWithPVD>, Error> {
1793 if let Some(signed_statement) = sign_statement(&*rp_state, statement, keystore, metrics) {
1794 let summary = import_statement(ctx, rp_state, per_candidate, &signed_statement).await?;
1795
1796 let smsg = StatementDistributionMessage::Share(rp_state.parent, signed_statement.clone());
1799 ctx.send_unbounded_message(smsg);
1800
1801 post_import_statement_actions(ctx, rp_state, summary.as_ref()).await;
1802
1803 Ok(Some(signed_statement))
1804 } else {
1805 Ok(None)
1806 }
1807}
1808
1809#[overseer::contextbounds(CandidateBacking, prefix = self::overseer)]
1810async fn background_validate_and_make_available<Context>(
1811 ctx: &mut Context,
1812 rp_state: &mut PerRelayParentState,
1813 params: BackgroundValidationParams<
1814 impl overseer::CandidateBackingSenderTrait,
1815 impl Fn(BackgroundValidationResult) -> ValidatedCandidateCommand + Send + 'static + Sync,
1816 >,
1817) -> Result<(), Error> {
1818 let candidate_hash = params.candidate.hash();
1819 let Some(core_index) = rp_state.assigned_core else { return Ok(()) };
1820 if rp_state.awaiting_validation.insert(candidate_hash) {
1821 let bg = async move {
1823 if let Err(error) = validate_and_make_available(params, core_index).await {
1824 if let Error::BackgroundValidationMpsc(error) = error {
1825 gum::debug!(
1826 target: LOG_TARGET,
1827 ?candidate_hash,
1828 ?error,
1829 "Mpsc background validation mpsc died during validation- leaf no longer active?"
1830 );
1831 } else {
1832 gum::error!(
1833 target: LOG_TARGET,
1834 ?candidate_hash,
1835 ?error,
1836 "Failed to validate and make available",
1837 );
1838 }
1839 }
1840 };
1841
1842 ctx.spawn("backing-validation", bg.boxed())
1843 .map_err(|_| Error::FailedToSpawnBackgroundTask)?;
1844 }
1845
1846 Ok(())
1847}
1848
1849#[overseer::contextbounds(CandidateBacking, prefix = self::overseer)]
1851async fn kick_off_validation_work<Context>(
1852 ctx: &mut Context,
1853 rp_state: &mut PerRelayParentState,
1854 persisted_validation_data: PersistedValidationData,
1855 background_validation_tx: &mpsc::Sender<(Hash, ValidatedCandidateCommand)>,
1856 attesting: AttestingData,
1857) -> Result<(), Error> {
1858 match rp_state.table_context.local_validator_is_disabled() {
1860 Some(true) => {
1861 gum::info!(target: LOG_TARGET, "We are disabled - don't kick off validation");
1862 return Ok(())
1863 },
1864 Some(false) => {}, None => {
1866 gum::debug!(target: LOG_TARGET, "We are not a validator - don't kick off validation");
1867 return Ok(())
1868 },
1869 }
1870
1871 let candidate_hash = attesting.candidate.hash();
1872 if rp_state.issued_statements.contains(&candidate_hash) {
1873 return Ok(())
1874 }
1875
1876 gum::debug!(
1877 target: LOG_TARGET,
1878 candidate_hash = ?candidate_hash,
1879 candidate_receipt = ?attesting.candidate,
1880 "Kicking off validation",
1881 );
1882
1883 let bg_sender = ctx.sender().clone();
1884 let pov = PoVData::FetchFromValidator {
1885 from_validator: attesting.from_validator,
1886 candidate_hash,
1887 pov_hash: attesting.pov_hash,
1888 };
1889
1890 background_validate_and_make_available(
1891 ctx,
1892 rp_state,
1893 BackgroundValidationParams {
1894 sender: bg_sender,
1895 tx_command: background_validation_tx.clone(),
1896 candidate: attesting.candidate,
1897 relay_parent: rp_state.parent,
1898 session_index: rp_state.session_index,
1899 persisted_validation_data,
1900 pov,
1901 n_validators: rp_state.table_context.validators.len(),
1902 make_command: ValidatedCandidateCommand::Attest,
1903 },
1904 )
1905 .await
1906}
1907
1908#[overseer::contextbounds(CandidateBacking, prefix = self::overseer)]
1910async fn maybe_validate_and_import<Context>(
1911 ctx: &mut Context,
1912 state: &mut State,
1913 relay_parent: Hash,
1914 statement: SignedFullStatementWithPVD,
1915) -> Result<(), Error> {
1916 let rp_state = match state.per_relay_parent.get_mut(&relay_parent) {
1917 Some(r) => r,
1918 None => {
1919 gum::trace!(
1920 target: LOG_TARGET,
1921 ?relay_parent,
1922 "Received statement for unknown relay-parent"
1923 );
1924
1925 return Ok(())
1926 },
1927 };
1928
1929 if rp_state.table_context.validator_is_disabled(&statement.validator_index()) {
1931 gum::debug!(
1932 target: LOG_TARGET,
1933 sender_validator_idx = ?statement.validator_index(),
1934 "Not importing statement because the sender is disabled"
1935 );
1936 return Ok(())
1937 }
1938
1939 let res = import_statement(ctx, rp_state, &mut state.per_candidate, &statement).await;
1940
1941 if let Err(Error::RejectedByProspectiveParachains) = res {
1944 gum::debug!(
1945 target: LOG_TARGET,
1946 ?relay_parent,
1947 "Statement rejected by prospective parachains."
1948 );
1949
1950 return Ok(())
1951 }
1952
1953 let summary = res?;
1954 post_import_statement_actions(ctx, rp_state, summary.as_ref()).await;
1955
1956 if let Some(summary) = summary {
1957 let candidate_hash = summary.candidate;
1962
1963 if Some(summary.group_id) != rp_state.assigned_core {
1964 return Ok(())
1965 }
1966
1967 let attesting = match statement.payload() {
1968 StatementWithPVD::Seconded(receipt, _) => {
1969 let attesting = AttestingData {
1970 candidate: rp_state
1971 .table
1972 .get_candidate(&candidate_hash)
1973 .ok_or(Error::CandidateNotFound)?
1974 .to_plain(),
1975 pov_hash: receipt.descriptor.pov_hash(),
1976 from_validator: statement.validator_index(),
1977 backing: Vec::new(),
1978 };
1979 rp_state.fallbacks.insert(summary.candidate, attesting.clone());
1980 attesting
1981 },
1982 StatementWithPVD::Valid(candidate_hash) => {
1983 if let Some(attesting) = rp_state.fallbacks.get_mut(candidate_hash) {
1984 let our_index = rp_state.table_context.validator.as_ref().map(|v| v.index());
1985 if our_index == Some(statement.validator_index()) {
1986 return Ok(())
1987 }
1988
1989 if rp_state.awaiting_validation.contains(candidate_hash) {
1990 attesting.backing.push(statement.validator_index());
1992 return Ok(())
1993 } else {
1994 attesting.from_validator = statement.validator_index();
1996 attesting.clone()
1997 }
1998 } else {
1999 return Ok(())
2000 }
2001 },
2002 };
2003
2004 if let Some(pvd) = state
2007 .per_candidate
2008 .get(&candidate_hash)
2009 .map(|pc| pc.persisted_validation_data.clone())
2010 {
2011 kick_off_validation_work(
2012 ctx,
2013 rp_state,
2014 pvd,
2015 &state.background_validation_tx,
2016 attesting,
2017 )
2018 .await?;
2019 }
2020 }
2021 Ok(())
2022}
2023
2024#[overseer::contextbounds(CandidateBacking, prefix = self::overseer)]
2026async fn validate_and_second<Context>(
2027 ctx: &mut Context,
2028 rp_state: &mut PerRelayParentState,
2029 persisted_validation_data: PersistedValidationData,
2030 candidate: &CandidateReceipt,
2031 pov: Arc<PoV>,
2032 background_validation_tx: &mpsc::Sender<(Hash, ValidatedCandidateCommand)>,
2033) -> Result<(), Error> {
2034 let candidate_hash = candidate.hash();
2035
2036 gum::debug!(
2037 target: LOG_TARGET,
2038 candidate_hash = ?candidate_hash,
2039 candidate_receipt = ?candidate,
2040 "Validate and second candidate",
2041 );
2042
2043 let bg_sender = ctx.sender().clone();
2044 background_validate_and_make_available(
2045 ctx,
2046 rp_state,
2047 BackgroundValidationParams {
2048 sender: bg_sender,
2049 tx_command: background_validation_tx.clone(),
2050 candidate: candidate.clone(),
2051 relay_parent: rp_state.parent,
2052 session_index: rp_state.session_index,
2053 persisted_validation_data,
2054 pov: PoVData::Ready(pov),
2055 n_validators: rp_state.table_context.validators.len(),
2056 make_command: ValidatedCandidateCommand::Second,
2057 },
2058 )
2059 .await?;
2060
2061 Ok(())
2062}
2063
2064#[overseer::contextbounds(CandidateBacking, prefix = self::overseer)]
2065async fn handle_second_message<Context>(
2066 ctx: &mut Context,
2067 state: &mut State,
2068 candidate: CandidateReceipt,
2069 persisted_validation_data: PersistedValidationData,
2070 pov: PoV,
2071 metrics: &Metrics,
2072) -> Result<(), Error> {
2073 let _timer = metrics.time_process_second();
2074
2075 let candidate_hash = candidate.hash();
2076 let relay_parent = candidate.descriptor().relay_parent();
2077
2078 if candidate.descriptor().persisted_validation_data_hash() != persisted_validation_data.hash() {
2079 gum::warn!(
2080 target: LOG_TARGET,
2081 ?candidate_hash,
2082 "Candidate backing was asked to second candidate with wrong PVD",
2083 );
2084
2085 return Ok(())
2086 }
2087
2088 let rp_state = match state.per_relay_parent.get_mut(&relay_parent) {
2089 None => {
2090 gum::trace!(
2091 target: LOG_TARGET,
2092 ?relay_parent,
2093 ?candidate_hash,
2094 "We were asked to second a candidate outside of our view."
2095 );
2096
2097 return Ok(())
2098 },
2099 Some(r) => r,
2100 };
2101
2102 if rp_state.table_context.local_validator_is_disabled().unwrap_or(false) {
2105 gum::warn!(target: LOG_TARGET, "Local validator is disabled. Don't validate and second");
2106 return Ok(())
2107 }
2108
2109 let assigned_paras = rp_state.assigned_core.and_then(|core| rp_state.claim_queue.0.get(&core));
2110
2111 if !matches!(assigned_paras, Some(paras) if paras.contains(&candidate.descriptor().para_id())) {
2113 gum::debug!(
2114 target: LOG_TARGET,
2115 our_assignment_core = ?rp_state.assigned_core,
2116 our_assignment_paras = ?assigned_paras,
2117 collation = ?candidate.descriptor().para_id(),
2118 "Subsystem asked to second for para outside of our assignment",
2119 );
2120 return Ok(());
2121 }
2122
2123 gum::debug!(
2124 target: LOG_TARGET,
2125 our_assignment_core = ?rp_state.assigned_core,
2126 our_assignment_paras = ?assigned_paras,
2127 collation = ?candidate.descriptor().para_id(),
2128 "Current assignments vs collation",
2129 );
2130
2131 if !rp_state.issued_statements.contains(&candidate_hash) {
2139 let pov = Arc::new(pov);
2140
2141 validate_and_second(
2142 ctx,
2143 rp_state,
2144 persisted_validation_data,
2145 &candidate,
2146 pov,
2147 &state.background_validation_tx,
2148 )
2149 .await?;
2150 }
2151
2152 Ok(())
2153}
2154
2155#[overseer::contextbounds(CandidateBacking, prefix = self::overseer)]
2156async fn handle_statement_message<Context>(
2157 ctx: &mut Context,
2158 state: &mut State,
2159 relay_parent: Hash,
2160 statement: SignedFullStatementWithPVD,
2161 metrics: &Metrics,
2162) -> Result<(), Error> {
2163 let _timer = metrics.time_process_statement();
2164
2165 match maybe_validate_and_import(ctx, state, relay_parent, statement).await {
2167 Err(Error::ValidationFailed(_)) => Ok(()),
2168 Err(e) => Err(e),
2169 Ok(()) => Ok(()),
2170 }
2171}
2172
2173fn handle_get_backable_candidates_message(
2174 state: &State,
2175 requested_candidates: HashMap<ParaId, Vec<(CandidateHash, Hash)>>,
2176 tx: oneshot::Sender<HashMap<ParaId, Vec<BackedCandidate>>>,
2177 metrics: &Metrics,
2178) -> Result<(), Error> {
2179 let _timer = metrics.time_get_backed_candidates();
2180
2181 let mut backed = HashMap::with_capacity(requested_candidates.len());
2182
2183 for (para_id, para_candidates) in requested_candidates {
2184 for (candidate_hash, relay_parent) in para_candidates.iter() {
2185 let rp_state = match state.per_relay_parent.get(&relay_parent) {
2186 Some(rp_state) => rp_state,
2187 None => {
2188 gum::debug!(
2189 target: LOG_TARGET,
2190 ?relay_parent,
2191 ?candidate_hash,
2192 "Requested candidate's relay parent is out of view",
2193 );
2194 break
2195 },
2196 };
2197 let maybe_backed_candidate = rp_state
2198 .table
2199 .attested_candidate(
2200 candidate_hash,
2201 &rp_state.table_context,
2202 rp_state.minimum_backing_votes,
2203 )
2204 .and_then(|attested| {
2205 table_attested_to_backed(
2206 attested,
2207 &rp_state.table_context,
2208 rp_state.inject_core_index,
2209 )
2210 });
2211
2212 if let Some(backed_candidate) = maybe_backed_candidate {
2213 backed
2214 .entry(para_id)
2215 .or_insert_with(|| Vec::with_capacity(para_candidates.len()))
2216 .push(backed_candidate);
2217 } else {
2218 break
2219 }
2220 }
2221 }
2222
2223 tx.send(backed).map_err(|data| Error::Send(data))?;
2224 Ok(())
2225}