1#![deny(unused_crate_dependencies)]
67
68use std::{
69 collections::{HashMap, HashSet},
70 sync::Arc,
71};
72
73use bitvec::vec::BitVec;
74use futures::{
75 channel::{mpsc, oneshot},
76 future::BoxFuture,
77 stream::FuturesOrdered,
78 FutureExt, SinkExt, StreamExt, TryFutureExt,
79};
80use schnellru::{ByLength, LruMap};
81
82use error::{Error, FatalResult};
83use pezkuwi_node_subsystem::{
84 messages::{
85 AvailabilityDistributionMessage, AvailabilityStoreMessage, CanSecondRequest,
86 CandidateBackingMessage, CandidateValidationMessage, CollatorProtocolMessage,
87 HypotheticalCandidate, HypotheticalMembershipRequest, IntroduceSecondedCandidateRequest,
88 ProspectiveTeyrchainsMessage, ProvisionableData, ProvisionerMessage, PvfExecKind,
89 RuntimeApiMessage, RuntimeApiRequest, StatementDistributionMessage,
90 StoreAvailableDataError,
91 },
92 overseer, ActiveLeavesUpdate, FromOrchestra, OverseerSignal, RuntimeApiError, SpawnedSubsystem,
93 SubsystemError,
94};
95use pezkuwi_node_subsystem_util::{
96 self as util,
97 backing_implicit_view::View as ImplicitView,
98 request_claim_queue, request_disabled_validators, request_min_backing_votes,
99 request_node_features, request_session_executor_params, request_session_index_for_child,
100 request_validator_groups, request_validators,
101 runtime::{self, ClaimQueueSnapshot},
102 Validator,
103};
104use pezkuwi_pez_node_primitives::{
105 AvailableData, InvalidCandidate, PoV, SignedFullStatementWithPVD, StatementWithPVD,
106 ValidationResult,
107};
108use pezkuwi_primitives::{
109 BackedCandidate, CandidateCommitments, CandidateHash, CandidateReceiptV2 as CandidateReceipt,
110 CommittedCandidateReceiptV2 as CommittedCandidateReceipt, CoreIndex, ExecutorParams,
111 GroupIndex, GroupRotationInfo, Hash, Id as ParaId, IndexedVec, NodeFeatures,
112 PersistedValidationData, SessionIndex, SigningContext, ValidationCode, ValidatorId,
113 ValidatorIndex, ValidatorSignature, ValidityAttestation,
114};
115use pezkuwi_statement_table::{
116 generic::AttestedCandidate as TableAttestedCandidate,
117 v2::{
118 SignedStatement as TableSignedStatement, Statement as TableStatement,
119 Summary as TableSummary,
120 },
121 Context as TableContextTrait, Table,
122};
123use pezkuwi_teyrchain_primitives::primitives::IsSystem;
124use pezsp_keystore::KeystorePtr;
125
126mod error;
127
128mod metrics;
129use self::metrics::Metrics;
130
131#[cfg(test)]
132mod tests;
133
134const LOG_TARGET: &str = "teyrchain::candidate-backing";
135
136enum PoVData {
138 Ready(Arc<PoV>),
140 FetchFromValidator {
142 from_validator: ValidatorIndex,
143 candidate_hash: CandidateHash,
144 pov_hash: Hash,
145 },
146}
147
148enum ValidatedCandidateCommand {
149 Second(BackgroundValidationResult),
151 Attest(BackgroundValidationResult),
153 AttestNoPoV(CandidateHash),
155}
156
157impl std::fmt::Debug for ValidatedCandidateCommand {
158 fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
159 let candidate_hash = self.candidate_hash();
160 match *self {
161 ValidatedCandidateCommand::Second(_) => write!(f, "Second({})", candidate_hash),
162 ValidatedCandidateCommand::Attest(_) => write!(f, "Attest({})", candidate_hash),
163 ValidatedCandidateCommand::AttestNoPoV(_) => write!(f, "Attest({})", candidate_hash),
164 }
165 }
166}
167
168impl ValidatedCandidateCommand {
169 fn candidate_hash(&self) -> CandidateHash {
170 match *self {
171 ValidatedCandidateCommand::Second(Ok(ref outputs)) => outputs.candidate.hash(),
172 ValidatedCandidateCommand::Second(Err(ref candidate)) => candidate.hash(),
173 ValidatedCandidateCommand::Attest(Ok(ref outputs)) => outputs.candidate.hash(),
174 ValidatedCandidateCommand::Attest(Err(ref candidate)) => candidate.hash(),
175 ValidatedCandidateCommand::AttestNoPoV(candidate_hash) => candidate_hash,
176 }
177 }
178}
179
180pub struct CandidateBackingSubsystem {
182 keystore: KeystorePtr,
183 metrics: Metrics,
184}
185
186impl CandidateBackingSubsystem {
187 pub fn new(keystore: KeystorePtr, metrics: Metrics) -> Self {
189 Self { keystore, metrics }
190 }
191}
192
193#[overseer::subsystem(CandidateBacking, error = SubsystemError, prefix = self::overseer)]
194impl<Context> CandidateBackingSubsystem
195where
196 Context: Send + Sync,
197{
198 fn start(self, ctx: Context) -> SpawnedSubsystem {
199 let future = async move {
200 run(ctx, self.keystore, self.metrics)
201 .await
202 .map_err(|e| SubsystemError::with_origin("candidate-backing", e))
203 }
204 .boxed();
205
206 SpawnedSubsystem { name: "candidate-backing-subsystem", future }
207 }
208}
209
210struct PerRelayParentState {
211 parent: Hash,
213 node_features: NodeFeatures,
215 executor_params: Arc<ExecutorParams>,
217 assigned_core: Option<CoreIndex>,
219 backed: HashSet<CandidateHash>,
221 table: Table<TableContext>,
223 table_context: TableContext,
225 issued_statements: HashSet<CandidateHash>,
227 awaiting_validation: HashSet<CandidateHash>,
229 fallbacks: HashMap<CandidateHash, AttestingData>,
231 minimum_backing_votes: u32,
233 n_cores: u32,
235 claim_queue: ClaimQueueSnapshot,
238 validator_to_group: Arc<IndexedVec<ValidatorIndex, Option<GroupIndex>>>,
240 group_rotation_info: GroupRotationInfo,
242}
243
244struct PerCandidateState {
245 persisted_validation_data: PersistedValidationData,
246 seconded_locally: bool,
247 relay_parent: Hash,
248}
249
250struct PerSessionCache {
253 validators_cache: LruMap<SessionIndex, Arc<Vec<ValidatorId>>>,
255 node_features_cache: LruMap<SessionIndex, NodeFeatures>,
257 executor_params_cache: LruMap<SessionIndex, Arc<ExecutorParams>>,
259 minimum_backing_votes_cache: LruMap<SessionIndex, u32>,
261 validator_to_group_cache:
263 LruMap<SessionIndex, Arc<IndexedVec<ValidatorIndex, Option<GroupIndex>>>>,
264}
265
266impl Default for PerSessionCache {
267 fn default() -> Self {
269 Self::new(2)
270 }
271}
272
273impl PerSessionCache {
274 fn new(capacity: u32) -> Self {
276 PerSessionCache {
277 validators_cache: LruMap::new(ByLength::new(capacity)),
278 node_features_cache: LruMap::new(ByLength::new(capacity)),
279 executor_params_cache: LruMap::new(ByLength::new(capacity)),
280 minimum_backing_votes_cache: LruMap::new(ByLength::new(capacity)),
281 validator_to_group_cache: LruMap::new(ByLength::new(capacity)),
282 }
283 }
284
285 async fn validators(
287 &mut self,
288 session_index: SessionIndex,
289 parent: Hash,
290 sender: &mut impl overseer::SubsystemSender<RuntimeApiMessage>,
291 ) -> Result<Arc<Vec<ValidatorId>>, RuntimeApiError> {
292 if let Some(validators) = self.validators_cache.get(&session_index) {
294 return Ok(Arc::clone(validators));
295 }
296
297 let validators: Vec<ValidatorId> =
299 request_validators(parent, sender).await.await.map_err(|err| {
300 RuntimeApiError::Execution { runtime_api_name: "Validators", source: Arc::new(err) }
301 })??;
302
303 let validators = Arc::new(validators);
305
306 self.validators_cache.insert(session_index, Arc::clone(&validators));
308
309 Ok(validators)
310 }
311
312 async fn node_features(
314 &mut self,
315 session_index: SessionIndex,
316 parent: Hash,
317 sender: &mut impl overseer::SubsystemSender<RuntimeApiMessage>,
318 ) -> Result<NodeFeatures, RuntimeApiError> {
319 if let Some(node_features) = self.node_features_cache.get(&session_index) {
321 return Ok(node_features.clone());
322 }
323
324 let node_features = request_node_features(parent, session_index, sender)
326 .await
327 .await
328 .map_err(|err| RuntimeApiError::Execution {
329 runtime_api_name: "NodeFeatures",
330 source: Arc::new(err),
331 })??;
332
333 self.node_features_cache.insert(session_index, node_features.clone());
335
336 Ok(node_features)
337 }
338
339 async fn executor_params(
342 &mut self,
343 session_index: SessionIndex,
344 parent: Hash,
345 sender: &mut impl overseer::SubsystemSender<RuntimeApiMessage>,
346 ) -> Result<Arc<ExecutorParams>, RuntimeApiError> {
347 if let Some(executor_params) = self.executor_params_cache.get(&session_index) {
349 return Ok(Arc::clone(executor_params));
350 }
351
352 let executor_params = request_session_executor_params(parent, session_index, sender)
354 .await
355 .await
356 .map_err(|err| RuntimeApiError::Execution {
357 runtime_api_name: "SessionExecutorParams",
358 source: Arc::new(err),
359 })??
360 .ok_or_else(|| RuntimeApiError::Execution {
361 runtime_api_name: "SessionExecutorParams",
362 source: Arc::new(Error::MissingExecutorParams),
363 })?;
364
365 let executor_params = Arc::new(executor_params);
367
368 self.executor_params_cache.insert(session_index, Arc::clone(&executor_params));
370
371 Ok(executor_params)
372 }
373
374 async fn minimum_backing_votes(
377 &mut self,
378 session_index: SessionIndex,
379 parent: Hash,
380 sender: &mut impl overseer::SubsystemSender<RuntimeApiMessage>,
381 ) -> Result<u32, RuntimeApiError> {
382 if let Some(minimum_backing_votes) = self.minimum_backing_votes_cache.get(&session_index) {
384 return Ok(*minimum_backing_votes);
385 }
386
387 let minimum_backing_votes = request_min_backing_votes(parent, session_index, sender)
389 .await
390 .await
391 .map_err(|err| RuntimeApiError::Execution {
392 runtime_api_name: "MinimumBackingVotes",
393 source: Arc::new(err),
394 })??;
395
396 self.minimum_backing_votes_cache.insert(session_index, minimum_backing_votes);
398
399 Ok(minimum_backing_votes)
400 }
401
402 fn validator_to_group(
404 &mut self,
405 session_index: SessionIndex,
406 validators: &[ValidatorId],
407 validator_groups: &[Vec<ValidatorIndex>],
408 ) -> Arc<IndexedVec<ValidatorIndex, Option<GroupIndex>>> {
409 let validator_to_group = self
410 .validator_to_group_cache
411 .get_or_insert(session_index, || {
412 let mut vector = vec![None; validators.len()];
413
414 for (group_idx, validator_group) in validator_groups.iter().enumerate() {
415 for validator in validator_group {
416 vector[validator.0 as usize] = Some(GroupIndex(group_idx as u32));
417 }
418 }
419
420 Arc::new(IndexedVec::<_, _>::from(vector))
421 })
422 .expect("Just inserted");
423
424 Arc::clone(validator_to_group)
425 }
426}
427
428struct State {
430 implicit_view: ImplicitView,
432 per_relay_parent: HashMap<Hash, PerRelayParentState>,
435 per_candidate: HashMap<CandidateHash, PerCandidateState>,
440 per_session_cache: PerSessionCache,
443 background_validation_tx: mpsc::Sender<(Hash, ValidatedCandidateCommand)>,
446 keystore: KeystorePtr,
448}
449
450impl State {
451 fn new(
452 background_validation_tx: mpsc::Sender<(Hash, ValidatedCandidateCommand)>,
453 keystore: KeystorePtr,
454 ) -> Self {
455 State {
456 implicit_view: ImplicitView::default(),
457 per_relay_parent: HashMap::default(),
458 per_candidate: HashMap::new(),
459 per_session_cache: PerSessionCache::default(),
460 background_validation_tx,
461 keystore,
462 }
463 }
464}
465
466#[overseer::contextbounds(CandidateBacking, prefix = self::overseer)]
467async fn run<Context>(
468 mut ctx: Context,
469 keystore: KeystorePtr,
470 metrics: Metrics,
471) -> FatalResult<()> {
472 let (background_validation_tx, mut background_validation_rx) = mpsc::channel(16);
473 let mut state = State::new(background_validation_tx, keystore);
474
475 loop {
476 let res =
477 run_iteration(&mut ctx, &mut state, &metrics, &mut background_validation_rx).await;
478
479 match res {
480 Ok(()) => break,
481 Err(e) => crate::error::log_error(Err(e))?,
482 }
483 }
484
485 Ok(())
486}
487
488#[overseer::contextbounds(CandidateBacking, prefix = self::overseer)]
489async fn run_iteration<Context>(
490 ctx: &mut Context,
491 state: &mut State,
492 metrics: &Metrics,
493 background_validation_rx: &mut mpsc::Receiver<(Hash, ValidatedCandidateCommand)>,
494) -> Result<(), Error> {
495 loop {
496 futures::select!(
497 validated_command = background_validation_rx.next().fuse() => {
498 if let Some((relay_parent, command)) = validated_command {
499 handle_validated_candidate_command(
500 &mut *ctx,
501 state,
502 relay_parent,
503 command,
504 metrics,
505 ).await?;
506 } else {
507 panic!("background_validation_tx always alive at this point; qed");
508 }
509 }
510 from_overseer = ctx.recv().fuse() => {
511 match from_overseer.map_err(Error::OverseerExited)? {
512 FromOrchestra::Signal(OverseerSignal::ActiveLeaves(update)) => {
513 handle_active_leaves_update(
514 &mut *ctx,
515 update,
516 state,
517 ).await?;
518 }
519 FromOrchestra::Signal(OverseerSignal::BlockFinalized(..)) => {}
520 FromOrchestra::Signal(OverseerSignal::Conclude) => return Ok(()),
521 FromOrchestra::Communication { msg } => {
522 handle_communication(&mut *ctx, state, msg, metrics).await?;
523 }
524 }
525 }
526 )
527 }
528}
529
530#[derive(Clone)]
536struct AttestingData {
537 candidate: CandidateReceipt,
539 pov_hash: Hash,
541 from_validator: ValidatorIndex,
543 backing: Vec<ValidatorIndex>,
545}
546
547#[derive(Default, Debug)]
548struct TableContext {
549 validator: Option<Validator>,
550 groups: HashMap<CoreIndex, Vec<ValidatorIndex>>,
551 validators: Vec<ValidatorId>,
552 disabled_validators: Vec<ValidatorIndex>,
553}
554
555impl TableContext {
556 pub fn validator_is_disabled(&self, validator_idx: &ValidatorIndex) -> bool {
558 self.disabled_validators
559 .iter()
560 .any(|disabled_val_idx| *disabled_val_idx == *validator_idx)
561 }
562
563 pub fn local_validator_is_disabled(&self) -> Option<bool> {
565 self.validator.as_ref().map(|v| v.disabled())
566 }
567}
568
569impl TableContextTrait for TableContext {
570 type AuthorityId = ValidatorIndex;
571 type Digest = CandidateHash;
572 type GroupId = CoreIndex;
573 type Signature = ValidatorSignature;
574 type Candidate = CommittedCandidateReceipt;
575
576 fn candidate_digest(candidate: &CommittedCandidateReceipt) -> CandidateHash {
577 candidate.hash()
578 }
579
580 fn is_member_of(&self, authority: &ValidatorIndex, core: &CoreIndex) -> bool {
581 self.groups.get(core).map_or(false, |g| g.iter().any(|a| a == authority))
582 }
583
584 fn get_group_size(&self, group: &CoreIndex) -> Option<usize> {
585 self.groups.get(group).map(|g| g.len())
586 }
587}
588
589fn primitive_statement_to_table(s: &SignedFullStatementWithPVD) -> TableSignedStatement {
592 let statement = match s.payload() {
593 StatementWithPVD::Seconded(c, _) => TableStatement::Seconded(c.clone()),
594 StatementWithPVD::Valid(h) => TableStatement::Valid(*h),
595 };
596
597 TableSignedStatement {
598 statement,
599 signature: s.signature().clone(),
600 sender: s.validator_index(),
601 }
602}
603
604fn table_attested_to_backed(
605 attested: TableAttestedCandidate<
606 CoreIndex,
607 CommittedCandidateReceipt,
608 ValidatorIndex,
609 ValidatorSignature,
610 >,
611 table_context: &TableContext,
612) -> Option<BackedCandidate> {
613 let TableAttestedCandidate { candidate, validity_votes, group_id: core_index } = attested;
614
615 let (ids, validity_votes): (Vec<_>, Vec<ValidityAttestation>) =
616 validity_votes.into_iter().map(|(id, vote)| (id, vote.into())).unzip();
617
618 let group = table_context.groups.get(&core_index)?;
619
620 let mut validator_indices = BitVec::with_capacity(group.len());
621
622 validator_indices.resize(group.len(), false);
623
624 let mut vote_positions = Vec::with_capacity(validity_votes.len());
628 for (orig_idx, id) in ids.iter().enumerate() {
629 if let Some(position) = group.iter().position(|x| x == id) {
630 validator_indices.set(position, true);
631 vote_positions.push((orig_idx, position));
632 } else {
633 gum::warn!(
634 target: LOG_TARGET,
635 "Logic error: Validity vote from table does not correspond to group",
636 );
637
638 return None;
639 }
640 }
641 vote_positions.sort_by_key(|(_orig, pos_in_group)| *pos_in_group);
642
643 Some(BackedCandidate::new(
644 candidate,
645 vote_positions
646 .into_iter()
647 .map(|(pos_in_votes, _pos_in_group)| validity_votes[pos_in_votes].clone())
648 .collect(),
649 validator_indices,
650 core_index,
651 ))
652}
653
654async fn store_available_data(
655 sender: &mut impl overseer::CandidateBackingSenderTrait,
656 n_validators: u32,
657 candidate_hash: CandidateHash,
658 available_data: AvailableData,
659 expected_erasure_root: Hash,
660 core_index: CoreIndex,
661 node_features: NodeFeatures,
662) -> Result<(), Error> {
663 let (tx, rx) = oneshot::channel();
664 sender
669 .send_message(AvailabilityStoreMessage::StoreAvailableData {
670 candidate_hash,
671 n_validators,
672 available_data,
673 expected_erasure_root,
674 core_index,
675 node_features,
676 tx,
677 })
678 .await;
679
680 rx.await
681 .map_err(Error::StoreAvailableDataChannel)?
682 .map_err(Error::StoreAvailableData)
683}
684
685async fn make_pov_available(
693 sender: &mut impl overseer::CandidateBackingSenderTrait,
694 n_validators: usize,
695 pov: Arc<PoV>,
696 candidate_hash: CandidateHash,
697 validation_data: PersistedValidationData,
698 expected_erasure_root: Hash,
699 core_index: CoreIndex,
700 node_features: NodeFeatures,
701) -> Result<(), Error> {
702 store_available_data(
703 sender,
704 n_validators as u32,
705 candidate_hash,
706 AvailableData { pov, validation_data },
707 expected_erasure_root,
708 core_index,
709 node_features,
710 )
711 .await
712}
713
714async fn request_pov(
715 sender: &mut impl overseer::CandidateBackingSenderTrait,
716 relay_parent: Hash,
717 from_validator: ValidatorIndex,
718 para_id: ParaId,
719 candidate_hash: CandidateHash,
720 pov_hash: Hash,
721) -> Result<Arc<PoV>, Error> {
722 let (tx, rx) = oneshot::channel();
723 sender
724 .send_message(AvailabilityDistributionMessage::FetchPoV {
725 relay_parent,
726 from_validator,
727 para_id,
728 candidate_hash,
729 pov_hash,
730 tx,
731 })
732 .await;
733
734 let pov = rx.await.map_err(|_| Error::FetchPoV)?;
735 Ok(Arc::new(pov))
736}
737
738async fn request_candidate_validation(
739 sender: &mut impl overseer::CandidateBackingSenderTrait,
740 validation_data: PersistedValidationData,
741 validation_code: ValidationCode,
742 candidate_receipt: CandidateReceipt,
743 pov: Arc<PoV>,
744 executor_params: ExecutorParams,
745) -> Result<ValidationResult, Error> {
746 let (tx, rx) = oneshot::channel();
747 let is_system = candidate_receipt.descriptor.para_id().is_system();
748 let relay_parent = candidate_receipt.descriptor.relay_parent();
749
750 sender
751 .send_message(CandidateValidationMessage::ValidateFromExhaustive {
752 validation_data,
753 validation_code,
754 candidate_receipt,
755 pov,
756 executor_params,
757 exec_kind: if is_system {
758 PvfExecKind::BackingSystemParas(relay_parent)
759 } else {
760 PvfExecKind::Backing(relay_parent)
761 },
762 response_sender: tx,
763 })
764 .await;
765
766 match rx.await {
767 Ok(Ok(validation_result)) => Ok(validation_result),
768 Ok(Err(err)) => Err(Error::ValidationFailed(err)),
769 Err(err) => Err(Error::ValidateFromExhaustive(err)),
770 }
771}
772
773struct BackgroundValidationOutputs {
774 candidate: CandidateReceipt,
775 commitments: CandidateCommitments,
776 persisted_validation_data: PersistedValidationData,
777}
778
779type BackgroundValidationResult = Result<BackgroundValidationOutputs, CandidateReceipt>;
780
781struct BackgroundValidationParams<S: overseer::CandidateBackingSenderTrait, F> {
782 sender: S,
783 tx_command: mpsc::Sender<(Hash, ValidatedCandidateCommand)>,
784 candidate: CandidateReceipt,
785 relay_parent: Hash,
786 node_features: NodeFeatures,
787 executor_params: Arc<ExecutorParams>,
788 persisted_validation_data: PersistedValidationData,
789 pov: PoVData,
790 n_validators: usize,
791 make_command: F,
792}
793
794async fn validate_and_make_available(
795 params: BackgroundValidationParams<
796 impl overseer::CandidateBackingSenderTrait,
797 impl Fn(BackgroundValidationResult) -> ValidatedCandidateCommand + Sync,
798 >,
799 core_index: CoreIndex,
800) -> Result<(), Error> {
801 let BackgroundValidationParams {
802 mut sender,
803 mut tx_command,
804 candidate,
805 relay_parent,
806 node_features,
807 executor_params,
808 persisted_validation_data,
809 pov,
810 n_validators,
811 make_command,
812 } = params;
813
814 let validation_code = {
815 let validation_code_hash = candidate.descriptor().validation_code_hash();
816 let (tx, rx) = oneshot::channel();
817 sender
818 .send_message(RuntimeApiMessage::Request(
819 relay_parent,
820 RuntimeApiRequest::ValidationCodeByHash(validation_code_hash, tx),
821 ))
822 .await;
823
824 let code = rx.await.map_err(Error::RuntimeApiUnavailable)?;
825 match code {
826 Err(e) => return Err(Error::FetchValidationCode(validation_code_hash, e)),
827 Ok(None) => return Err(Error::NoValidationCode(validation_code_hash)),
828 Ok(Some(c)) => c,
829 }
830 };
831
832 let pov = match pov {
833 PoVData::Ready(pov) => pov,
834 PoVData::FetchFromValidator { from_validator, candidate_hash, pov_hash } => {
835 match request_pov(
836 &mut sender,
837 relay_parent,
838 from_validator,
839 candidate.descriptor.para_id(),
840 candidate_hash,
841 pov_hash,
842 )
843 .await
844 {
845 Err(Error::FetchPoV) => {
846 tx_command
847 .send((
848 relay_parent,
849 ValidatedCandidateCommand::AttestNoPoV(candidate.hash()),
850 ))
851 .await
852 .map_err(Error::BackgroundValidationMpsc)?;
853 return Ok(());
854 },
855 Err(err) => return Err(err),
856 Ok(pov) => pov,
857 }
858 },
859 };
860
861 let v = {
862 request_candidate_validation(
863 &mut sender,
864 persisted_validation_data,
865 validation_code,
866 candidate.clone(),
867 pov.clone(),
868 executor_params.as_ref().clone(),
869 )
870 .await?
871 };
872
873 let res = match v {
874 ValidationResult::Valid(commitments, validation_data) => {
875 gum::debug!(
876 target: LOG_TARGET,
877 candidate_hash = ?candidate.hash(),
878 "Validation successful",
879 );
880
881 let erasure_valid = make_pov_available(
882 &mut sender,
883 n_validators,
884 pov.clone(),
885 candidate.hash(),
886 validation_data.clone(),
887 candidate.descriptor.erasure_root(),
888 core_index,
889 node_features,
890 )
891 .await;
892
893 match erasure_valid {
894 Ok(()) => Ok(BackgroundValidationOutputs {
895 candidate,
896 commitments,
897 persisted_validation_data: validation_data,
898 }),
899 Err(Error::StoreAvailableData(StoreAvailableDataError::InvalidErasureRoot)) => {
900 gum::debug!(
901 target: LOG_TARGET,
902 candidate_hash = ?candidate.hash(),
903 actual_commitments = ?commitments,
904 "Erasure root doesn't match the announced by the candidate receipt",
905 );
906 Err(candidate)
907 },
908 Err(e) => return Err(e),
910 }
911 },
912 ValidationResult::Invalid(InvalidCandidate::CommitmentsHashMismatch) => {
913 gum::warn!(
915 target: LOG_TARGET,
916 candidate_hash = ?candidate.hash(),
917 "Validation yielded different commitments",
918 );
919 Err(candidate)
920 },
921 ValidationResult::Invalid(reason) => {
922 gum::warn!(
923 target: LOG_TARGET,
924 candidate_hash = ?candidate.hash(),
925 reason = ?reason,
926 "Validation yielded an invalid candidate",
927 );
928 Err(candidate)
929 },
930 };
931
932 tx_command.send((relay_parent, make_command(res))).await.map_err(Into::into)
933}
934
935#[overseer::contextbounds(CandidateBacking, prefix = self::overseer)]
936async fn handle_communication<Context>(
937 ctx: &mut Context,
938 state: &mut State,
939 message: CandidateBackingMessage,
940 metrics: &Metrics,
941) -> Result<(), Error> {
942 match message {
943 CandidateBackingMessage::Second(_relay_parent, candidate, pvd, pov) => {
944 handle_second_message(ctx, state, candidate, pvd, pov, metrics).await?;
945 },
946 CandidateBackingMessage::Statement(relay_parent, statement) => {
947 handle_statement_message(ctx, state, relay_parent, statement, metrics).await?;
948 },
949 CandidateBackingMessage::GetBackableCandidates(requested_candidates, tx) => {
950 handle_get_backable_candidates_message(state, requested_candidates, tx, metrics)?
951 },
952 CandidateBackingMessage::CanSecond(request, tx) => {
953 handle_can_second_request(ctx, state, request, tx).await
954 },
955 }
956
957 Ok(())
958}
959
960#[overseer::contextbounds(CandidateBacking, prefix = self::overseer)]
961async fn handle_active_leaves_update<Context>(
962 ctx: &mut Context,
963 update: ActiveLeavesUpdate,
964 state: &mut State,
965) -> Result<(), Error> {
966 let res = if let Some(leaf) = update.activated {
969 let leaf_hash = leaf.hash;
970 Some((leaf, state.implicit_view.activate_leaf(ctx.sender(), leaf_hash).await.map(|_| ())))
971 } else {
972 None
973 };
974
975 for deactivated in update.deactivated {
976 state.implicit_view.deactivate_leaf(deactivated);
977 }
978
979 {
983 let remaining: HashSet<_> = state.implicit_view.all_allowed_relay_parents().collect();
984
985 state.per_relay_parent.retain(|r, _| remaining.contains(&r));
986 }
987
988 state
991 .per_candidate
992 .retain(|_, pc| state.per_relay_parent.contains_key(&pc.relay_parent));
993
994 let fresh_relay_parents = match res {
997 None => return Ok(()),
998 Some((leaf, Ok(_))) => {
999 let fresh_relay_parents =
1000 state.implicit_view.known_allowed_relay_parents_under(&leaf.hash, None);
1001
1002 let fresh_relay_parent = match fresh_relay_parents {
1003 Some(f) => f.to_vec(),
1004 None => {
1005 gum::warn!(
1006 target: LOG_TARGET,
1007 leaf_hash = ?leaf.hash,
1008 "Implicit view gave no relay-parents"
1009 );
1010
1011 vec![leaf.hash]
1012 },
1013 };
1014 fresh_relay_parent
1015 },
1016 Some((leaf, Err(e))) => {
1017 gum::debug!(
1018 target: LOG_TARGET,
1019 leaf_hash = ?leaf.hash,
1020 err = ?e,
1021 "Failed to load implicit view for leaf."
1022 );
1023
1024 return Ok(());
1025 },
1026 };
1027
1028 for maybe_new in fresh_relay_parents {
1030 if state.per_relay_parent.contains_key(&maybe_new) {
1031 continue;
1032 }
1033
1034 let per = construct_per_relay_parent_state(
1037 ctx,
1038 maybe_new,
1039 &state.keystore,
1040 &mut state.per_session_cache,
1041 )
1042 .await?;
1043
1044 if let Some(per) = per {
1045 state.per_relay_parent.insert(maybe_new, per);
1046 }
1047 }
1048
1049 Ok(())
1050}
1051
1052macro_rules! try_runtime_api {
1053 ($x: expr) => {
1054 match $x {
1055 Ok(x) => x,
1056 Err(err) => {
1057 error::log_error(Err(Into::<runtime::Error>::into(err).into()))?;
1059
1060 return Ok(None);
1064 },
1065 }
1066 };
1067}
1068
1069fn core_index_from_statement(
1070 validator_to_group: &IndexedVec<ValidatorIndex, Option<GroupIndex>>,
1071 group_rotation_info: &GroupRotationInfo,
1072 n_cores: u32,
1073 claim_queue: &ClaimQueueSnapshot,
1074 statement: &SignedFullStatementWithPVD,
1075) -> Option<CoreIndex> {
1076 let compact_statement = statement.as_unchecked();
1077 let candidate_hash = CandidateHash(*compact_statement.unchecked_payload().candidate_hash());
1078
1079 gum::trace!(
1080 target:LOG_TARGET,
1081 ?group_rotation_info,
1082 ?statement,
1083 ?validator_to_group,
1084 n_cores,
1085 ?candidate_hash,
1086 "Extracting core index from statement"
1087 );
1088
1089 let statement_validator_index = statement.validator_index();
1090 let Some(Some(group_index)) = validator_to_group.get(statement_validator_index) else {
1091 gum::debug!(
1092 target: LOG_TARGET,
1093 ?group_rotation_info,
1094 ?statement,
1095 ?validator_to_group,
1096 n_cores,
1097 ?candidate_hash,
1098 "Invalid validator index: {:?}",
1099 statement_validator_index
1100 );
1101 return None;
1102 };
1103
1104 let core_index = group_rotation_info.core_for_group(*group_index, n_cores as _);
1106
1107 if core_index.0 > n_cores {
1108 gum::warn!(target: LOG_TARGET, ?candidate_hash, ?core_index, n_cores, "Invalid CoreIndex");
1109 return None;
1110 }
1111
1112 if let StatementWithPVD::Seconded(candidate, _pvd) = statement.payload() {
1113 let candidate_para_id = candidate.descriptor.para_id();
1114 let mut assigned_paras = claim_queue.iter_claims_for_core(&core_index);
1115
1116 if !assigned_paras.any(|id| id == &candidate_para_id) {
1117 gum::debug!(
1118 target: LOG_TARGET,
1119 ?candidate_hash,
1120 ?core_index,
1121 assigned_paras = ?claim_queue.iter_claims_for_core(&core_index).collect::<Vec<_>>(),
1122 ?candidate_para_id,
1123 "Invalid CoreIndex, core is not assigned to this para_id"
1124 );
1125 return None;
1126 }
1127 return Some(core_index);
1128 } else {
1129 return Some(core_index);
1130 }
1131}
1132
1133#[overseer::contextbounds(CandidateBacking, prefix = self::overseer)]
1135async fn construct_per_relay_parent_state<Context>(
1136 ctx: &mut Context,
1137 relay_parent: Hash,
1138 keystore: &KeystorePtr,
1139 per_session_cache: &mut PerSessionCache,
1140) -> Result<Option<PerRelayParentState>, Error> {
1141 let parent = relay_parent;
1142
1143 let (session_index, groups, claim_queue, disabled_validators) = futures::try_join!(
1144 request_session_index_for_child(parent, ctx.sender()).await,
1145 request_validator_groups(parent, ctx.sender()).await,
1146 request_claim_queue(parent, ctx.sender()).await,
1147 request_disabled_validators(parent, ctx.sender()).await,
1148 )
1149 .map_err(Error::JoinMultiple)?;
1150
1151 let session_index = try_runtime_api!(session_index);
1152
1153 let validators = per_session_cache.validators(session_index, parent, ctx.sender()).await;
1154 let validators = try_runtime_api!(validators);
1155
1156 let node_features = per_session_cache.node_features(session_index, parent, ctx.sender()).await;
1157 let node_features = try_runtime_api!(node_features);
1158
1159 let executor_params =
1160 per_session_cache.executor_params(session_index, parent, ctx.sender()).await;
1161 let executor_params = try_runtime_api!(executor_params);
1162
1163 gum::debug!(target: LOG_TARGET, ?parent, "New state");
1164
1165 let (validator_groups, group_rotation_info) = try_runtime_api!(groups);
1166
1167 let minimum_backing_votes = per_session_cache
1168 .minimum_backing_votes(session_index, parent, ctx.sender())
1169 .await;
1170 let minimum_backing_votes = try_runtime_api!(minimum_backing_votes);
1171 let claim_queue = try_runtime_api!(claim_queue);
1172 let disabled_validators = try_runtime_api!(disabled_validators);
1173
1174 let signing_context = SigningContext { parent_hash: parent, session_index };
1175 let validator = match Validator::construct(
1176 &validators,
1177 &disabled_validators,
1178 signing_context.clone(),
1179 keystore.clone(),
1180 ) {
1181 Ok(v) => Some(v),
1182 Err(util::Error::NotAValidator) => None,
1183 Err(e) => {
1184 gum::warn!(
1185 target: LOG_TARGET,
1186 err = ?e,
1187 "Cannot participate in candidate backing",
1188 );
1189
1190 return Ok(None);
1191 },
1192 };
1193
1194 let n_cores = validator_groups.len();
1195
1196 let mut groups = HashMap::<CoreIndex, Vec<ValidatorIndex>>::new();
1197 let mut assigned_core = None;
1198
1199 for idx in 0..n_cores {
1200 let core_index = CoreIndex(idx as _);
1201
1202 if !claim_queue.contains_key(&core_index) {
1203 continue;
1204 }
1205
1206 let group_index = group_rotation_info.group_for_core(core_index, n_cores);
1207 if let Some(g) = validator_groups.get(group_index.0 as usize) {
1208 if validator.as_ref().map_or(false, |v| g.contains(&v.index())) {
1209 assigned_core = Some(core_index);
1210 }
1211 groups.insert(core_index, g.clone());
1212 }
1213 }
1214 gum::debug!(target: LOG_TARGET, ?groups, "TableContext");
1215
1216 let validator_to_group =
1217 per_session_cache.validator_to_group(session_index, &validators, &validator_groups);
1218
1219 let table_context =
1220 TableContext { validator, groups, validators: validators.to_vec(), disabled_validators };
1221
1222 Ok(Some(PerRelayParentState {
1223 parent,
1224 node_features,
1225 executor_params,
1226 assigned_core,
1227 backed: HashSet::new(),
1228 table: Table::new(),
1229 table_context,
1230 issued_statements: HashSet::new(),
1231 awaiting_validation: HashSet::new(),
1232 fallbacks: HashMap::new(),
1233 minimum_backing_votes,
1234 n_cores: validator_groups.len() as u32,
1235 claim_queue: ClaimQueueSnapshot::from(claim_queue),
1236 validator_to_group,
1237 group_rotation_info,
1238 }))
1239}
1240
1241enum SecondingAllowed {
1242 No,
1243 Yes(Vec<Hash>),
1245}
1246
1247#[overseer::contextbounds(CandidateBacking, prefix = self::overseer)]
1250async fn seconding_sanity_check<Context>(
1251 ctx: &mut Context,
1252 implicit_view: &ImplicitView,
1253 hypothetical_candidate: HypotheticalCandidate,
1254) -> SecondingAllowed {
1255 let mut leaves_for_seconding = Vec::new();
1256 let mut responses = FuturesOrdered::<BoxFuture<'_, Result<_, oneshot::Canceled>>>::new();
1257
1258 let candidate_para = hypothetical_candidate.candidate_para();
1259 let candidate_relay_parent = hypothetical_candidate.relay_parent();
1260 let candidate_hash = hypothetical_candidate.candidate_hash();
1261
1262 for head in implicit_view.leaves() {
1263 let allowed_parents_for_para =
1266 implicit_view.known_allowed_relay_parents_under(head, Some(candidate_para));
1267 if !allowed_parents_for_para.unwrap_or_default().contains(&candidate_relay_parent) {
1268 continue;
1269 }
1270
1271 let (tx, rx) = oneshot::channel();
1272 ctx.send_message(ProspectiveTeyrchainsMessage::GetHypotheticalMembership(
1273 HypotheticalMembershipRequest {
1274 candidates: vec![hypothetical_candidate.clone()],
1275 fragment_chain_relay_parent: Some(*head),
1276 },
1277 tx,
1278 ))
1279 .await;
1280 let response = rx.map_ok(move |candidate_memberships| {
1281 let is_member_or_potential = candidate_memberships
1282 .into_iter()
1283 .find_map(|(candidate, leaves)| {
1284 (candidate.candidate_hash() == candidate_hash).then_some(leaves)
1285 })
1286 .and_then(|leaves| leaves.into_iter().find(|leaf| leaf == head))
1287 .is_some();
1288
1289 (is_member_or_potential, head)
1290 });
1291 responses.push_back(response.boxed());
1292 }
1293
1294 if responses.is_empty() {
1295 return SecondingAllowed::No;
1296 }
1297
1298 while let Some(response) = responses.next().await {
1299 match response {
1300 Err(oneshot::Canceled) => {
1301 gum::warn!(
1302 target: LOG_TARGET,
1303 "Failed to reach prospective teyrchains subsystem for hypothetical membership",
1304 );
1305
1306 return SecondingAllowed::No;
1307 },
1308 Ok((is_member_or_potential, head)) => match is_member_or_potential {
1309 false => {
1310 gum::debug!(
1311 target: LOG_TARGET,
1312 ?candidate_hash,
1313 leaf_hash = ?head,
1314 "Refusing to second candidate at leaf. Is not a potential member.",
1315 );
1316 },
1317 true => {
1318 leaves_for_seconding.push(*head);
1319 },
1320 },
1321 }
1322 }
1323
1324 if leaves_for_seconding.is_empty() {
1325 SecondingAllowed::No
1326 } else {
1327 SecondingAllowed::Yes(leaves_for_seconding)
1328 }
1329}
1330
1331#[overseer::contextbounds(CandidateBacking, prefix = self::overseer)]
1333async fn handle_can_second_request<Context>(
1334 ctx: &mut Context,
1335 state: &State,
1336 request: CanSecondRequest,
1337 tx: oneshot::Sender<bool>,
1338) {
1339 let relay_parent = request.candidate_relay_parent;
1340 let response = if state.per_relay_parent.get(&relay_parent).is_some() {
1341 let hypothetical_candidate = HypotheticalCandidate::Incomplete {
1342 candidate_hash: request.candidate_hash,
1343 candidate_para: request.candidate_para_id,
1344 parent_head_data_hash: request.parent_head_data_hash,
1345 candidate_relay_parent: relay_parent,
1346 };
1347
1348 let result =
1349 seconding_sanity_check(ctx, &state.implicit_view, hypothetical_candidate).await;
1350
1351 match result {
1352 SecondingAllowed::No => false,
1353 SecondingAllowed::Yes(leaves) => !leaves.is_empty(),
1354 }
1355 } else {
1356 false
1358 };
1359
1360 let _ = tx.send(response);
1361}
1362
1363#[overseer::contextbounds(CandidateBacking, prefix = self::overseer)]
1364async fn handle_validated_candidate_command<Context>(
1365 ctx: &mut Context,
1366 state: &mut State,
1367 relay_parent: Hash,
1368 command: ValidatedCandidateCommand,
1369 metrics: &Metrics,
1370) -> Result<(), Error> {
1371 match state.per_relay_parent.get_mut(&relay_parent) {
1372 Some(rp_state) => {
1373 let candidate_hash = command.candidate_hash();
1374 rp_state.awaiting_validation.remove(&candidate_hash);
1375
1376 match command {
1377 ValidatedCandidateCommand::Second(res) => match res {
1378 Ok(outputs) => {
1379 let BackgroundValidationOutputs {
1380 candidate,
1381 commitments,
1382 persisted_validation_data,
1383 } = outputs;
1384
1385 if rp_state.issued_statements.contains(&candidate_hash) {
1386 return Ok(());
1387 }
1388
1389 let receipt = CommittedCandidateReceipt {
1390 descriptor: candidate.descriptor.clone(),
1391 commitments,
1392 };
1393
1394 let hypothetical_candidate = HypotheticalCandidate::Complete {
1395 candidate_hash,
1396 receipt: Arc::new(receipt.clone()),
1397 persisted_validation_data: persisted_validation_data.clone(),
1398 };
1399 if let SecondingAllowed::No = seconding_sanity_check(
1403 ctx,
1404 &state.implicit_view,
1405 hypothetical_candidate,
1406 )
1407 .await
1408 {
1409 return Ok(());
1410 };
1411
1412 let statement =
1413 StatementWithPVD::Seconded(receipt, persisted_validation_data);
1414
1415 let res = sign_import_and_distribute_statement(
1419 ctx,
1420 rp_state,
1421 &mut state.per_candidate,
1422 statement,
1423 state.keystore.clone(),
1424 metrics,
1425 )
1426 .await;
1427
1428 if let Err(Error::RejectedByProspectiveTeyrchains) = res {
1429 let candidate_hash = candidate.hash();
1430 gum::debug!(
1431 target: LOG_TARGET,
1432 relay_parent = ?candidate.descriptor().relay_parent(),
1433 ?candidate_hash,
1434 "Attempted to second candidate but was rejected by prospective teyrchains",
1435 );
1436
1437 ctx.send_message(CollatorProtocolMessage::Invalid(
1439 candidate.descriptor().relay_parent(),
1440 candidate,
1441 ))
1442 .await;
1443
1444 return Ok(());
1445 }
1446
1447 if let Some(stmt) = res? {
1448 match state.per_candidate.get_mut(&candidate_hash) {
1449 None => {
1450 gum::warn!(
1451 target: LOG_TARGET,
1452 ?candidate_hash,
1453 "Missing `per_candidate` for seconded candidate.",
1454 );
1455 },
1456 Some(p) => p.seconded_locally = true,
1457 }
1458
1459 rp_state.issued_statements.insert(candidate_hash);
1460
1461 metrics.on_candidate_seconded();
1462 ctx.send_message(CollatorProtocolMessage::Seconded(
1463 rp_state.parent,
1464 StatementWithPVD::drop_pvd_from_signed(stmt),
1465 ))
1466 .await;
1467 }
1468 },
1469 Err(candidate) => {
1470 ctx.send_message(CollatorProtocolMessage::Invalid(
1471 rp_state.parent,
1472 candidate,
1473 ))
1474 .await;
1475 },
1476 },
1477 ValidatedCandidateCommand::Attest(res) => {
1478 rp_state.fallbacks.remove(&candidate_hash);
1480 if !rp_state.issued_statements.contains(&candidate_hash) {
1482 if res.is_ok() {
1483 let statement = StatementWithPVD::Valid(candidate_hash);
1484
1485 sign_import_and_distribute_statement(
1486 ctx,
1487 rp_state,
1488 &mut state.per_candidate,
1489 statement,
1490 state.keystore.clone(),
1491 metrics,
1492 )
1493 .await?;
1494 }
1495 rp_state.issued_statements.insert(candidate_hash);
1496 }
1497 },
1498 ValidatedCandidateCommand::AttestNoPoV(candidate_hash) => {
1499 if let Some(attesting) = rp_state.fallbacks.get_mut(&candidate_hash) {
1500 if let Some(index) = attesting.backing.pop() {
1501 attesting.from_validator = index;
1502 let attesting = attesting.clone();
1503
1504 if let Some(pvd) = state
1510 .per_candidate
1511 .get(&candidate_hash)
1512 .map(|pc| pc.persisted_validation_data.clone())
1513 {
1514 kick_off_validation_work(
1515 ctx,
1516 rp_state,
1517 pvd,
1518 &state.background_validation_tx,
1519 attesting,
1520 )
1521 .await?;
1522 }
1523 }
1524 } else {
1525 gum::warn!(
1526 target: LOG_TARGET,
1527 "AttestNoPoV was triggered without fallback being available."
1528 );
1529 debug_assert!(false);
1530 }
1531 },
1532 }
1533 },
1534 None => {
1535 },
1538 }
1539
1540 Ok(())
1541}
1542
1543fn sign_statement(
1544 rp_state: &PerRelayParentState,
1545 statement: StatementWithPVD,
1546 keystore: KeystorePtr,
1547 metrics: &Metrics,
1548) -> Option<SignedFullStatementWithPVD> {
1549 let signed = rp_state
1550 .table_context
1551 .validator
1552 .as_ref()?
1553 .sign(keystore, statement)
1554 .ok()
1555 .flatten()?;
1556 metrics.on_statement_signed();
1557 Some(signed)
1558}
1559
1560#[overseer::contextbounds(CandidateBacking, prefix = self::overseer)]
1568async fn import_statement<Context>(
1569 ctx: &mut Context,
1570 rp_state: &mut PerRelayParentState,
1571 per_candidate: &mut HashMap<CandidateHash, PerCandidateState>,
1572 statement: &SignedFullStatementWithPVD,
1573) -> Result<Option<TableSummary>, Error> {
1574 let candidate_hash = statement.payload().candidate_hash();
1575
1576 gum::debug!(
1577 target: LOG_TARGET,
1578 statement = ?statement.payload().to_compact(),
1579 validator_index = statement.validator_index().0,
1580 ?candidate_hash,
1581 "Importing statement",
1582 );
1583
1584 if let StatementWithPVD::Seconded(candidate, pvd) = statement.payload() {
1597 if !per_candidate.contains_key(&candidate_hash) {
1598 let (tx, rx) = oneshot::channel();
1599 ctx.send_message(ProspectiveTeyrchainsMessage::IntroduceSecondedCandidate(
1600 IntroduceSecondedCandidateRequest {
1601 candidate_para: candidate.descriptor.para_id(),
1602 candidate_receipt: candidate.clone(),
1603 persisted_validation_data: pvd.clone(),
1604 },
1605 tx,
1606 ))
1607 .await;
1608
1609 match rx.await {
1610 Err(oneshot::Canceled) => {
1611 gum::warn!(
1612 target: LOG_TARGET,
1613 "Could not reach the Prospective Teyrchains subsystem."
1614 );
1615
1616 return Err(Error::RejectedByProspectiveTeyrchains);
1617 },
1618 Ok(false) => return Err(Error::RejectedByProspectiveTeyrchains),
1619 Ok(true) => {},
1620 }
1621
1622 per_candidate.insert(
1624 candidate_hash,
1625 PerCandidateState {
1626 persisted_validation_data: pvd.clone(),
1627 seconded_locally: false,
1629 relay_parent: candidate.descriptor.relay_parent(),
1630 },
1631 );
1632 }
1633 }
1634
1635 let stmt = primitive_statement_to_table(statement);
1636
1637 let core = core_index_from_statement(
1638 &rp_state.validator_to_group,
1639 &rp_state.group_rotation_info,
1640 rp_state.n_cores,
1641 &rp_state.claim_queue,
1642 statement,
1643 )
1644 .ok_or(Error::CoreIndexUnavailable)?;
1645
1646 Ok(rp_state.table.import_statement(&rp_state.table_context, core, stmt))
1647}
1648
1649#[overseer::contextbounds(CandidateBacking, prefix = self::overseer)]
1652async fn post_import_statement_actions<Context>(
1653 ctx: &mut Context,
1654 rp_state: &mut PerRelayParentState,
1655 summary: Option<&TableSummary>,
1656) {
1657 if let Some(attested) = summary.as_ref().and_then(|s| {
1658 rp_state.table.attested_candidate(
1659 &s.candidate,
1660 &rp_state.table_context,
1661 rp_state.minimum_backing_votes,
1662 )
1663 }) {
1664 let candidate_hash = attested.candidate.hash();
1665
1666 if rp_state.backed.insert(candidate_hash) {
1668 if let Some(backed) = table_attested_to_backed(attested, &rp_state.table_context) {
1669 let para_id = backed.candidate().descriptor.para_id();
1670 gum::debug!(
1671 target: LOG_TARGET,
1672 candidate_hash = ?candidate_hash,
1673 relay_parent = ?rp_state.parent,
1674 %para_id,
1675 "Candidate backed",
1676 );
1677
1678 ctx.send_message(ProspectiveTeyrchainsMessage::CandidateBacked(
1681 para_id,
1682 candidate_hash,
1683 ))
1684 .await;
1685 ctx.send_message(StatementDistributionMessage::Backed(candidate_hash)).await;
1687 } else {
1688 gum::debug!(target: LOG_TARGET, ?candidate_hash, "Cannot get BackedCandidate");
1689 }
1690 } else {
1691 gum::debug!(target: LOG_TARGET, ?candidate_hash, "Candidate already known");
1692 }
1693 } else {
1694 gum::debug!(target: LOG_TARGET, "No attested candidate");
1695 }
1696
1697 issue_new_misbehaviors(ctx, rp_state.parent, &mut rp_state.table);
1698}
1699
1700#[overseer::contextbounds(CandidateBacking, prefix = self::overseer)]
1702fn issue_new_misbehaviors<Context>(
1703 ctx: &mut Context,
1704 relay_parent: Hash,
1705 table: &mut Table<TableContext>,
1706) {
1707 let misbehaviors: Vec<_> = table.drain_misbehaviors().collect();
1709 for (validator_id, report) in misbehaviors {
1710 ctx.send_unbounded_message(ProvisionerMessage::ProvisionableData(
1716 relay_parent,
1717 ProvisionableData::MisbehaviorReport(relay_parent, validator_id, report),
1718 ));
1719 }
1720}
1721
1722#[overseer::contextbounds(CandidateBacking, prefix = self::overseer)]
1724async fn sign_import_and_distribute_statement<Context>(
1725 ctx: &mut Context,
1726 rp_state: &mut PerRelayParentState,
1727 per_candidate: &mut HashMap<CandidateHash, PerCandidateState>,
1728 statement: StatementWithPVD,
1729 keystore: KeystorePtr,
1730 metrics: &Metrics,
1731) -> Result<Option<SignedFullStatementWithPVD>, Error> {
1732 if let Some(signed_statement) = sign_statement(&*rp_state, statement, keystore, metrics) {
1733 let summary = import_statement(ctx, rp_state, per_candidate, &signed_statement).await?;
1734
1735 let smsg = StatementDistributionMessage::Share(rp_state.parent, signed_statement.clone());
1738 ctx.send_unbounded_message(smsg);
1739
1740 post_import_statement_actions(ctx, rp_state, summary.as_ref()).await;
1741
1742 Ok(Some(signed_statement))
1743 } else {
1744 Ok(None)
1745 }
1746}
1747
1748#[overseer::contextbounds(CandidateBacking, prefix = self::overseer)]
1749async fn background_validate_and_make_available<Context>(
1750 ctx: &mut Context,
1751 rp_state: &mut PerRelayParentState,
1752 params: BackgroundValidationParams<
1753 impl overseer::CandidateBackingSenderTrait,
1754 impl Fn(BackgroundValidationResult) -> ValidatedCandidateCommand + Send + 'static + Sync,
1755 >,
1756) -> Result<(), Error> {
1757 let candidate_hash = params.candidate.hash();
1758 let Some(core_index) = rp_state.assigned_core else { return Ok(()) };
1759 if rp_state.awaiting_validation.insert(candidate_hash) {
1760 let bg = async move {
1762 if let Err(error) = validate_and_make_available(params, core_index).await {
1763 if let Error::BackgroundValidationMpsc(error) = error {
1764 gum::debug!(
1765 target: LOG_TARGET,
1766 ?candidate_hash,
1767 ?error,
1768 "Mpsc background validation mpsc died during validation- leaf no longer active?"
1769 );
1770 } else {
1771 gum::error!(
1772 target: LOG_TARGET,
1773 ?candidate_hash,
1774 ?error,
1775 "Failed to validate and make available",
1776 );
1777 }
1778 }
1779 };
1780
1781 ctx.spawn("backing-validation", bg.boxed())
1782 .map_err(|_| Error::FailedToSpawnBackgroundTask)?;
1783 }
1784
1785 Ok(())
1786}
1787
1788#[overseer::contextbounds(CandidateBacking, prefix = self::overseer)]
1790async fn kick_off_validation_work<Context>(
1791 ctx: &mut Context,
1792 rp_state: &mut PerRelayParentState,
1793 persisted_validation_data: PersistedValidationData,
1794 background_validation_tx: &mpsc::Sender<(Hash, ValidatedCandidateCommand)>,
1795 attesting: AttestingData,
1796) -> Result<(), Error> {
1797 match rp_state.table_context.local_validator_is_disabled() {
1799 Some(true) => {
1800 gum::info!(target: LOG_TARGET, "We are disabled - don't kick off validation");
1801 return Ok(());
1802 },
1803 Some(false) => {}, None => {
1805 gum::debug!(target: LOG_TARGET, "We are not a validator - don't kick off validation");
1806 return Ok(());
1807 },
1808 }
1809
1810 let candidate_hash = attesting.candidate.hash();
1811 if rp_state.issued_statements.contains(&candidate_hash) {
1812 return Ok(());
1813 }
1814
1815 gum::debug!(
1816 target: LOG_TARGET,
1817 candidate_hash = ?candidate_hash,
1818 candidate_receipt = ?attesting.candidate,
1819 "Kicking off validation",
1820 );
1821
1822 let bg_sender = ctx.sender().clone();
1823 let pov = PoVData::FetchFromValidator {
1824 from_validator: attesting.from_validator,
1825 candidate_hash,
1826 pov_hash: attesting.pov_hash,
1827 };
1828
1829 background_validate_and_make_available(
1830 ctx,
1831 rp_state,
1832 BackgroundValidationParams {
1833 sender: bg_sender,
1834 tx_command: background_validation_tx.clone(),
1835 candidate: attesting.candidate,
1836 relay_parent: rp_state.parent,
1837 node_features: rp_state.node_features.clone(),
1838 executor_params: Arc::clone(&rp_state.executor_params),
1839 persisted_validation_data,
1840 pov,
1841 n_validators: rp_state.table_context.validators.len(),
1842 make_command: ValidatedCandidateCommand::Attest,
1843 },
1844 )
1845 .await
1846}
1847
1848#[overseer::contextbounds(CandidateBacking, prefix = self::overseer)]
1850async fn maybe_validate_and_import<Context>(
1851 ctx: &mut Context,
1852 state: &mut State,
1853 relay_parent: Hash,
1854 statement: SignedFullStatementWithPVD,
1855) -> Result<(), Error> {
1856 let rp_state = match state.per_relay_parent.get_mut(&relay_parent) {
1857 Some(r) => r,
1858 None => {
1859 gum::trace!(
1860 target: LOG_TARGET,
1861 ?relay_parent,
1862 "Received statement for unknown relay-parent"
1863 );
1864
1865 return Ok(());
1866 },
1867 };
1868
1869 if rp_state.table_context.validator_is_disabled(&statement.validator_index()) {
1871 gum::debug!(
1872 target: LOG_TARGET,
1873 sender_validator_idx = ?statement.validator_index(),
1874 "Not importing statement because the sender is disabled"
1875 );
1876 return Ok(());
1877 }
1878
1879 let res = import_statement(ctx, rp_state, &mut state.per_candidate, &statement).await;
1880
1881 if let Err(Error::RejectedByProspectiveTeyrchains) = res {
1884 gum::debug!(
1885 target: LOG_TARGET,
1886 ?relay_parent,
1887 "Statement rejected by prospective teyrchains."
1888 );
1889
1890 return Ok(());
1891 }
1892
1893 let summary = res?;
1894 post_import_statement_actions(ctx, rp_state, summary.as_ref()).await;
1895
1896 if let Some(summary) = summary {
1897 let candidate_hash = summary.candidate;
1902
1903 if Some(summary.group_id) != rp_state.assigned_core {
1904 return Ok(());
1905 }
1906
1907 let attesting = match statement.payload() {
1908 StatementWithPVD::Seconded(receipt, _) => {
1909 let attesting = AttestingData {
1910 candidate: rp_state
1911 .table
1912 .get_candidate(&candidate_hash)
1913 .ok_or(Error::CandidateNotFound)?
1914 .to_plain(),
1915 pov_hash: receipt.descriptor.pov_hash(),
1916 from_validator: statement.validator_index(),
1917 backing: Vec::new(),
1918 };
1919 rp_state.fallbacks.insert(summary.candidate, attesting.clone());
1920 attesting
1921 },
1922 StatementWithPVD::Valid(candidate_hash) => {
1923 if let Some(attesting) = rp_state.fallbacks.get_mut(candidate_hash) {
1924 let our_index = rp_state.table_context.validator.as_ref().map(|v| v.index());
1925 if our_index == Some(statement.validator_index()) {
1926 return Ok(());
1927 }
1928
1929 if rp_state.awaiting_validation.contains(candidate_hash) {
1930 attesting.backing.push(statement.validator_index());
1932 return Ok(());
1933 } else {
1934 attesting.from_validator = statement.validator_index();
1936 attesting.clone()
1937 }
1938 } else {
1939 return Ok(());
1940 }
1941 },
1942 };
1943
1944 if let Some(pvd) = state
1947 .per_candidate
1948 .get(&candidate_hash)
1949 .map(|pc| pc.persisted_validation_data.clone())
1950 {
1951 kick_off_validation_work(
1952 ctx,
1953 rp_state,
1954 pvd,
1955 &state.background_validation_tx,
1956 attesting,
1957 )
1958 .await?;
1959 }
1960 }
1961 Ok(())
1962}
1963
1964#[overseer::contextbounds(CandidateBacking, prefix = self::overseer)]
1966async fn validate_and_second<Context>(
1967 ctx: &mut Context,
1968 rp_state: &mut PerRelayParentState,
1969 persisted_validation_data: PersistedValidationData,
1970 candidate: &CandidateReceipt,
1971 pov: Arc<PoV>,
1972 background_validation_tx: &mpsc::Sender<(Hash, ValidatedCandidateCommand)>,
1973) -> Result<(), Error> {
1974 let candidate_hash = candidate.hash();
1975
1976 gum::debug!(
1977 target: LOG_TARGET,
1978 candidate_hash = ?candidate_hash,
1979 candidate_receipt = ?candidate,
1980 "Validate and second candidate",
1981 );
1982
1983 let bg_sender = ctx.sender().clone();
1984 background_validate_and_make_available(
1985 ctx,
1986 rp_state,
1987 BackgroundValidationParams {
1988 sender: bg_sender,
1989 tx_command: background_validation_tx.clone(),
1990 candidate: candidate.clone(),
1991 relay_parent: rp_state.parent,
1992 node_features: rp_state.node_features.clone(),
1993 executor_params: Arc::clone(&rp_state.executor_params),
1994 persisted_validation_data,
1995 pov: PoVData::Ready(pov),
1996 n_validators: rp_state.table_context.validators.len(),
1997 make_command: ValidatedCandidateCommand::Second,
1998 },
1999 )
2000 .await?;
2001
2002 Ok(())
2003}
2004
2005#[overseer::contextbounds(CandidateBacking, prefix = self::overseer)]
2006async fn handle_second_message<Context>(
2007 ctx: &mut Context,
2008 state: &mut State,
2009 candidate: CandidateReceipt,
2010 persisted_validation_data: PersistedValidationData,
2011 pov: PoV,
2012 metrics: &Metrics,
2013) -> Result<(), Error> {
2014 let _timer = metrics.time_process_second();
2015
2016 let candidate_hash = candidate.hash();
2017 let relay_parent = candidate.descriptor().relay_parent();
2018
2019 if candidate.descriptor().persisted_validation_data_hash() != persisted_validation_data.hash() {
2020 gum::warn!(
2021 target: LOG_TARGET,
2022 ?candidate_hash,
2023 "Candidate backing was asked to second candidate with wrong PVD",
2024 );
2025
2026 return Ok(());
2027 }
2028
2029 let rp_state = match state.per_relay_parent.get_mut(&relay_parent) {
2030 None => {
2031 gum::trace!(
2032 target: LOG_TARGET,
2033 ?relay_parent,
2034 ?candidate_hash,
2035 "We were asked to second a candidate outside of our view."
2036 );
2037
2038 return Ok(());
2039 },
2040 Some(r) => r,
2041 };
2042
2043 if rp_state.table_context.local_validator_is_disabled().unwrap_or(false) {
2046 gum::warn!(target: LOG_TARGET, "Local validator is disabled. Don't validate and second");
2047 return Ok(());
2048 }
2049
2050 let assigned_paras = rp_state.assigned_core.and_then(|core| rp_state.claim_queue.0.get(&core));
2051
2052 if !matches!(assigned_paras, Some(paras) if paras.contains(&candidate.descriptor().para_id())) {
2054 gum::debug!(
2055 target: LOG_TARGET,
2056 our_assignment_core = ?rp_state.assigned_core,
2057 our_assignment_paras = ?assigned_paras,
2058 collation = ?candidate.descriptor().para_id(),
2059 "Subsystem asked to second for para outside of our assignment",
2060 );
2061 return Ok(());
2062 }
2063
2064 gum::debug!(
2065 target: LOG_TARGET,
2066 our_assignment_core = ?rp_state.assigned_core,
2067 our_assignment_paras = ?assigned_paras,
2068 collation = ?candidate.descriptor().para_id(),
2069 "Current assignments vs collation",
2070 );
2071
2072 if !rp_state.issued_statements.contains(&candidate_hash) {
2080 let pov = Arc::new(pov);
2081
2082 validate_and_second(
2083 ctx,
2084 rp_state,
2085 persisted_validation_data,
2086 &candidate,
2087 pov,
2088 &state.background_validation_tx,
2089 )
2090 .await?;
2091 }
2092
2093 Ok(())
2094}
2095
2096#[overseer::contextbounds(CandidateBacking, prefix = self::overseer)]
2097async fn handle_statement_message<Context>(
2098 ctx: &mut Context,
2099 state: &mut State,
2100 relay_parent: Hash,
2101 statement: SignedFullStatementWithPVD,
2102 metrics: &Metrics,
2103) -> Result<(), Error> {
2104 let _timer = metrics.time_process_statement();
2105
2106 match maybe_validate_and_import(ctx, state, relay_parent, statement).await {
2108 Err(Error::ValidationFailed(_)) => Ok(()),
2109 Err(e) => Err(e),
2110 Ok(()) => Ok(()),
2111 }
2112}
2113
2114fn handle_get_backable_candidates_message(
2115 state: &State,
2116 requested_candidates: HashMap<ParaId, Vec<(CandidateHash, Hash)>>,
2117 tx: oneshot::Sender<HashMap<ParaId, Vec<BackedCandidate>>>,
2118 metrics: &Metrics,
2119) -> Result<(), Error> {
2120 let _timer = metrics.time_get_backed_candidates();
2121
2122 let mut backed = HashMap::with_capacity(requested_candidates.len());
2123
2124 for (para_id, para_candidates) in requested_candidates {
2125 for (candidate_hash, relay_parent) in para_candidates.iter() {
2126 let rp_state = match state.per_relay_parent.get(&relay_parent) {
2127 Some(rp_state) => rp_state,
2128 None => {
2129 gum::debug!(
2130 target: LOG_TARGET,
2131 ?relay_parent,
2132 ?candidate_hash,
2133 "Requested candidate's relay parent is out of view",
2134 );
2135 break;
2136 },
2137 };
2138 let maybe_backed_candidate = rp_state
2139 .table
2140 .attested_candidate(
2141 candidate_hash,
2142 &rp_state.table_context,
2143 rp_state.minimum_backing_votes,
2144 )
2145 .and_then(|attested| table_attested_to_backed(attested, &rp_state.table_context));
2146
2147 if let Some(backed_candidate) = maybe_backed_candidate {
2148 backed
2149 .entry(para_id)
2150 .or_insert_with(|| Vec::with_capacity(para_candidates.len()))
2151 .push(backed_candidate);
2152 } else {
2153 break;
2154 }
2155 }
2156 }
2157
2158 tx.send(backed).map_err(|data| Error::Send(data))?;
2159 Ok(())
2160}