1#![deny(unused_crate_dependencies, unused_results)]
24#![warn(missing_docs)]
25
26use polkadot_node_core_pvf::{
27 InternalValidationError, InvalidCandidate as WasmInvalidCandidate, PossiblyInvalidError,
28 PrepareError, PrepareJobKind, PvfPrepData, ValidationError, ValidationHost,
29};
30use polkadot_node_core_pvf_common::execute::ValidationContext;
31use polkadot_node_primitives::{InvalidCandidate, PoV, ValidationResult};
32use polkadot_node_subsystem::{
33 errors::RuntimeApiError,
34 messages::{
35 CandidateValidationMessage, ChainApiMessage, PreCheckOutcome, PvfExecKind,
36 RuntimeApiMessage, RuntimeApiRequest, ValidationFailed,
37 },
38 overseer, FromOrchestra, OverseerSignal, SpawnedSubsystem, SubsystemError, SubsystemResult,
39 SubsystemSender,
40};
41use polkadot_node_subsystem_util::{
42 self as util, request_node_features,
43 runtime::{fetch_scheduling_lookahead, ClaimQueueSnapshot},
44};
45use polkadot_overseer::{ActivatedLeaf, ActiveLeavesUpdate};
46use polkadot_parachain_primitives::primitives::ValidationResult as WasmValidationResult;
47use polkadot_primitives::{
48 executor_params::{
49 DEFAULT_APPROVAL_EXECUTION_TIMEOUT, DEFAULT_BACKING_EXECUTION_TIMEOUT,
50 DEFAULT_LENIENT_PREPARATION_TIMEOUT, DEFAULT_PRECHECK_PREPARATION_TIMEOUT,
51 },
52 node_features::FeatureIndex,
53 transpose_claim_queue, AuthorityDiscoveryId, CandidateCommitments,
54 CandidateDescriptorV2 as CandidateDescriptor, CandidateEvent,
55 CandidateReceiptV2 as CandidateReceipt,
56 CommittedCandidateReceiptV2 as CommittedCandidateReceipt, ExecutorParams, Hash,
57 PersistedValidationData, PvfExecKind as RuntimePvfExecKind, PvfPrepKind, SessionIndex,
58 ValidationCode, ValidationCodeHash, ValidatorId,
59};
60use sp_application_crypto::{AppCrypto, ByteArray};
61use sp_keystore::KeystorePtr;
62
63use codec::Encode;
64
65use futures::{channel::oneshot, prelude::*, stream::FuturesUnordered};
66
67use std::{
68 collections::HashSet,
69 path::PathBuf,
70 pin::Pin,
71 sync::Arc,
72 time::{Duration, Instant},
73};
74
75use async_trait::async_trait;
76
77mod metrics;
78use self::metrics::Metrics;
79
80#[cfg(test)]
81mod tests;
82
/// Log target attached to every `gum` log statement emitted by this subsystem.
const LOG_TARGET: &'static str = "parachain::candidate-validation";

/// Retry delay handed to `validate_candidate_with_retry` when validating candidates for
/// approval or disputes.
#[cfg(not(test))]
const PVF_APPROVAL_EXECUTION_RETRY_DELAY: Duration = Duration::from_secs(3);
/// Shorter retry delay that replaces the production value when compiling for tests.
#[cfg(test)]
const PVF_APPROVAL_EXECUTION_RETRY_DELAY: Duration = Duration::from_millis(200);

/// Number of concurrently running validation tasks at which `run` stops accepting new
/// messages; it resumes once the number of pending tasks drops below this value again.
const TASK_LIMIT: usize = 30;
96
/// Configuration for the candidate validation subsystem.
///
/// Every field is passed through unchanged to `polkadot_node_core_pvf::Config::new` when
/// the PVF validation host is started.
#[derive(Clone, Default)]
pub struct Config {
    /// Filesystem path handed to the PVF host for its artifacts cache.
    pub artifacts_cache_path: PathBuf,
    /// Optional node version string handed to the PVF host.
    pub node_version: Option<String>,
    /// Flag handed to the PVF host that selects secure validator mode.
    pub secure_validator_mode: bool,
    /// Path of the preparation worker (presumably the worker binary; confirm against the
    /// PVF host documentation).
    pub prep_worker_path: PathBuf,
    /// Path of the execution worker (presumably the worker binary; confirm against the
    /// PVF host documentation).
    pub exec_worker_path: PathBuf,
    /// Maximum number of PVF execution workers.
    pub pvf_execute_workers_max_num: usize,
    /// Soft maximum for the number of PVF preparation workers.
    pub pvf_prepare_workers_soft_max_num: usize,
    /// Hard maximum for the number of PVF preparation workers.
    pub pvf_prepare_workers_hard_max_num: usize,
}
118
/// The candidate validation subsystem.
///
/// When `config` is `None`, starting the subsystem delegates to
/// `polkadot_overseer::DummySubsystem` instead of running the validation loop.
pub struct CandidateValidationSubsystem {
    /// Keystore queried for locally held authority-discovery and validator keys.
    keystore: KeystorePtr,
    #[allow(missing_docs)]
    pub metrics: Metrics,
    #[allow(missing_docs)]
    pub pvf_metrics: polkadot_node_core_pvf::Metrics,
    /// Configuration for the PVF host; `None` disables the real subsystem.
    config: Option<Config>,
}
128
129impl CandidateValidationSubsystem {
130 pub fn with_config(
132 config: Option<Config>,
133 keystore: KeystorePtr,
134 metrics: Metrics,
135 pvf_metrics: polkadot_node_core_pvf::Metrics,
136 ) -> Self {
137 CandidateValidationSubsystem { keystore, config, metrics, pvf_metrics }
138 }
139}
140
141#[overseer::subsystem(CandidateValidation, error=SubsystemError, prefix=self::overseer)]
142impl<Context> CandidateValidationSubsystem {
143 fn start(self, ctx: Context) -> SpawnedSubsystem {
144 if let Some(config) = self.config {
145 let future = run(ctx, self.keystore, self.metrics, self.pvf_metrics, config)
146 .map_err(|e| SubsystemError::with_origin("candidate-validation", e))
147 .boxed();
148 SpawnedSubsystem { name: "candidate-validation-subsystem", future }
149 } else {
150 polkadot_overseer::DummySubsystem.start(ctx)
151 }
152 }
153}
154
155async fn claim_queue<Sender>(relay_parent: Hash, sender: &mut Sender) -> Option<ClaimQueueSnapshot>
157where
158 Sender: SubsystemSender<RuntimeApiMessage>,
159{
160 match util::runtime::fetch_claim_queue(sender, relay_parent).await {
161 Ok(cq) => Some(cq),
162 Err(err) => {
163 gum::warn!(
164 target: LOG_TARGET,
165 ?relay_parent,
166 ?err,
167 "Claim queue not available"
168 );
169 None
170 },
171 }
172}
173
174async fn fetch_bomb_limit<Sender>(
188 candidate_descriptor: &CandidateDescriptor,
189 v3_ever_seen: bool,
190 sender: &mut Sender,
191) -> Result<u32, String>
192where
193 Sender: SubsystemSender<RuntimeApiMessage>,
194{
195 let scheduling_parent =
198 candidate_descriptor.scheduling_parent_for_candidate_validation(v3_ever_seen);
199
200 let scheduling_session =
201 match candidate_descriptor.scheduling_session_for_candidate_validation(v3_ever_seen) {
202 Some(session) => session,
203 None => {
204 let Some(session) = get_session_index(sender, scheduling_parent).await else {
206 return Err("Cannot fetch session index from the runtime".into());
207 };
208 session
209 },
210 };
211
212 util::runtime::fetch_validation_code_bomb_limit(scheduling_parent, scheduling_session, sender)
216 .await
217 .map_err(|_| "Cannot fetch validation code bomb limit from the runtime".into())
218}
219
/// Data gathered by `pre_validate_candidate` and consumed by `validate_candidate`.
struct PreValidationOutput {
    /// Validation code bomb limit fetched from the runtime for this candidate.
    validation_code_bomb_limit: u32,
    /// Claim queue snapshot at the scheduling parent; only populated for the backing
    /// execution kinds, `None` otherwise.
    claim_queue: Option<ClaimQueueSnapshot>,
}
229
/// Reasons why `pre_validate_candidate` can stop before the PVF is executed.
enum PreValidationError {
    /// The candidate failed a check and is reported to the requester as invalid.
    Invalid(InvalidCandidate),
    /// A runtime request failed; reported to the requester as `ValidationFailed` with the
    /// contained message.
    RuntimeError(String),
}
237
238async fn pre_validate_candidate<Sender>(
252 sender: &mut Sender,
253 candidate_receipt: &CandidateReceipt,
254 persisted_validation_data: &PersistedValidationData,
255 pov: &PoV,
256 validation_code_hash: &ValidationCodeHash,
257 exec_kind: PvfExecKind,
258 v3_ever_seen: bool,
259) -> Result<PreValidationOutput, PreValidationError>
260where
261 Sender: SubsystemSender<RuntimeApiMessage>,
262{
263 let validation_code_bomb_limit =
264 fetch_bomb_limit(&candidate_receipt.descriptor, v3_ever_seen, sender)
265 .await
266 .map_err(PreValidationError::RuntimeError)?;
267
268 if let Err(e) = perform_basic_checks(
269 &candidate_receipt.descriptor,
270 persisted_validation_data.max_pov_size,
271 pov,
272 validation_code_hash,
273 ) {
274 return Err(PreValidationError::Invalid(e));
275 }
276
277 let claim_queue = match exec_kind {
278 PvfExecKind::Backing(_) | PvfExecKind::BackingSystemParas(_) => {
279 let scheduling_parent = candidate_receipt
280 .descriptor
281 .scheduling_parent_for_candidate_validation(v3_ever_seen);
282
283 let expected_scheduling_session =
285 get_session_index(sender, scheduling_parent).await.ok_or_else(|| {
286 PreValidationError::RuntimeError(
287 "Scheduling session index not found".to_string(),
288 )
289 })?;
290
291 if let Some(scheduling_session) = candidate_receipt
292 .descriptor
293 .scheduling_session_for_candidate_validation(v3_ever_seen)
294 {
295 if scheduling_session != expected_scheduling_session {
296 return Err(PreValidationError::Invalid(
297 InvalidCandidate::InvalidSchedulingSession,
298 ));
299 }
300 }
301
302 if let Some(session_index) = candidate_receipt
306 .descriptor
307 .session_index_for_candidate_validation(v3_ever_seen)
308 {
309 let relay_parent = candidate_receipt.descriptor.relay_parent();
310 match util::check_relay_parent_session(
311 sender,
312 scheduling_parent,
313 session_index,
314 relay_parent,
315 )
316 .await
317 {
318 util::CheckRelayParentSessionResult::Valid => {},
319 util::CheckRelayParentSessionResult::NotSupported => {},
323 util::CheckRelayParentSessionResult::NotFound => {
324 return Err(PreValidationError::Invalid(
325 InvalidCandidate::InvalidRelayParentSession,
326 ))
327 },
328 util::CheckRelayParentSessionResult::RuntimeError(err) => {
329 return Err(PreValidationError::RuntimeError(err))
330 },
331 }
332 }
333
334 let cq = claim_queue(scheduling_parent, sender).await.ok_or_else(|| {
335 PreValidationError::RuntimeError("Claim queue not available".to_string())
336 })?;
337
338 Some(cq)
339 },
340 _ => None,
341 };
342
343 Ok(PreValidationOutput { validation_code_bomb_limit, claim_queue })
344}
345
346fn handle_validation_message<S, V>(
347 mut sender: S,
348 validation_host: V,
349 metrics: Metrics,
350 v3_ever_seen: bool,
351 msg: CandidateValidationMessage,
352) -> Pin<Box<dyn Future<Output = ()> + Send>>
353where
354 S: SubsystemSender<RuntimeApiMessage>,
355 V: ValidationBackend + Clone + Send + 'static,
356{
357 match msg {
358 CandidateValidationMessage::ValidateFromExhaustive {
359 validation_data,
360 validation_code,
361 candidate_receipt,
362 pov,
363 executor_params,
364 exec_kind,
365 response_sender,
366 ..
367 } => async move {
368 let _timer = metrics.time_validate_from_exhaustive();
369
370 let pre = match pre_validate_candidate(
372 &mut sender,
373 &candidate_receipt,
374 &validation_data,
375 &pov,
376 &validation_code.hash(),
377 exec_kind,
378 v3_ever_seen,
379 )
380 .await
381 {
382 Ok(pre) => pre,
383 Err(PreValidationError::Invalid(e)) => {
384 let _ = response_sender.send(Ok(ValidationResult::Invalid(e)));
385 return;
386 },
387 Err(PreValidationError::RuntimeError(err)) => {
388 let _ = response_sender.send(Err(ValidationFailed(err)));
389 return;
390 },
391 };
392
393 let res = validate_candidate(
395 validation_host,
396 validation_data,
397 validation_code,
398 candidate_receipt,
399 pov,
400 executor_params,
401 exec_kind,
402 &metrics,
403 v3_ever_seen,
404 pre,
405 )
406 .await;
407
408 metrics.on_validation_event(&res);
409 let _ = response_sender.send(res);
410 }
411 .boxed(),
412 CandidateValidationMessage::PreCheck {
413 relay_parent,
414 validation_code_hash,
415 response_sender,
416 ..
417 } => async move {
418 let Some(session_index) = get_session_index(&mut sender, relay_parent).await else {
419 let error = "cannot fetch session index from the runtime";
420 gum::warn!(
421 target: LOG_TARGET,
422 ?relay_parent,
423 error,
424 );
425
426 let _ = response_sender.send(PreCheckOutcome::Failed);
427 return;
428 };
429
430 let Ok(validation_code_bomb_limit) = util::runtime::fetch_validation_code_bomb_limit(
433 relay_parent,
434 session_index,
435 &mut sender,
436 )
437 .await
438 else {
439 let error = "cannot fetch validation code bomb limit from the runtime";
440 gum::warn!(
441 target: LOG_TARGET,
442 ?relay_parent,
443 error,
444 );
445
446 let _ = response_sender.send(PreCheckOutcome::Failed);
447 return;
448 };
449
450 let precheck_result = precheck_pvf(
451 &mut sender,
452 validation_host,
453 relay_parent,
454 validation_code_hash,
455 validation_code_bomb_limit,
456 )
457 .await;
458
459 let _ = response_sender.send(precheck_result);
460 }
461 .boxed(),
462 }
463}
464
/// Main loop of the candidate validation subsystem.
///
/// Starts the PVF validation host from the given configuration, spawns its driver task as
/// a blocking task named "pvf-validation-host" and then alternates between two phases:
///
/// 1. **Accepting**: overseer signals and messages are received. Every message becomes a
///    validation task pushed onto `tasks`; finished tasks are reaped concurrently. The
///    phase ends once `TASK_LIMIT` tasks are pending.
/// 2. **Saturated**: only overseer signals are received, no new messages are accepted.
///    The phase ends as soon as a task completes and fewer than `TASK_LIMIT` remain.
///
/// Returns `Ok(())` on `OverseerSignal::Conclude`; an error while receiving from the
/// overseer, starting the host or spawning its task is returned to the caller.
#[overseer::contextbounds(CandidateValidation, prefix = self::overseer)]
async fn run<Context>(
    mut ctx: Context,
    keystore: KeystorePtr,
    metrics: Metrics,
    pvf_metrics: polkadot_node_core_pvf::Metrics,
    Config {
        artifacts_cache_path,
        node_version,
        secure_validator_mode,
        prep_worker_path,
        exec_worker_path,
        pvf_execute_workers_max_num,
        pvf_prepare_workers_soft_max_num,
        pvf_prepare_workers_hard_max_num,
    }: Config,
) -> SubsystemResult<()> {
    let (mut validation_host, task) = polkadot_node_core_pvf::start(
        polkadot_node_core_pvf::Config::new(
            artifacts_cache_path,
            node_version,
            secure_validator_mode,
            prep_worker_path,
            exec_worker_path,
            pvf_execute_workers_max_num,
            pvf_prepare_workers_soft_max_num,
            pvf_prepare_workers_hard_max_num,
        ),
        pvf_metrics,
    )
    .await?;
    ctx.spawn_blocking("pvf-validation-host", task.boxed())?;

    // In-flight validation tasks, polled concurrently with the overseer channel.
    let mut tasks = FuturesUnordered::new();
    let mut state = State::default();

    loop {
        // Accepting phase: take signals and messages until the task limit is reached.
        loop {
            futures::select! {
                comm = ctx.recv().fuse() => {
                    match comm {
                        Ok(FromOrchestra::Signal(OverseerSignal::ActiveLeaves(update))) => {
                            handle_active_leaves_update(
                                ctx.sender(),
                                keystore.clone(),
                                &mut validation_host,
                                update,
                                &mut state,
                            ).await
                        },
                        Ok(FromOrchestra::Signal(OverseerSignal::BlockFinalized(..))) => {},
                        Ok(FromOrchestra::Signal(OverseerSignal::Conclude)) => return Ok(()),
                        Ok(FromOrchestra::Communication { msg }) => {
                            let task = handle_validation_message(
                                ctx.sender().clone(),
                                validation_host.clone(),
                                metrics.clone(),
                                state.v3_ever_seen,
                                msg,
                            );
                            tasks.push(task);
                            if tasks.len() >= TASK_LIMIT {
                                break
                            }
                        },
                        Err(e) => return Err(SubsystemError::from(e)),
                    }
                },
                // Reap a finished task; its result was already sent to the requester.
                _ = tasks.select_next_some() => ()
            }
        }

        gum::debug!(target: LOG_TARGET, "Validation task limit hit");

        // Saturated phase: only signals are taken, so no new tasks are created here.
        loop {
            futures::select! {
                signal = ctx.recv_signal().fuse() => {
                    match signal {
                        // NOTE(review): leaf updates arriving in this phase are discarded
                        // without calling `handle_active_leaves_update` - confirm this is
                        // intended.
                        Ok(OverseerSignal::ActiveLeaves(_)) => {},
                        Ok(OverseerSignal::BlockFinalized(..)) => {},
                        Ok(OverseerSignal::Conclude) => return Ok(()),
                        Err(e) => return Err(SubsystemError::from(e)),
                    }
                },
                _ = tasks.select_next_some() => {
                    // Capacity is available again, go back to accepting messages.
                    if tasks.len() < TASK_LIMIT {
                        break
                    }
                }
            }
        }
    }
}
552
553struct State {
556 session_index: Option<SessionIndex>,
558 v3_ever_seen: bool,
564 pvf_prep: PvfPrepState,
566}
567
568impl Default for State {
569 fn default() -> Self {
570 Self { session_index: None, v3_ever_seen: false, pvf_prep: PvfPrepState::default() }
571 }
572}
573
/// Bookkeeping for preparing PVFs ahead of the next session.
struct PvfPrepState {
    /// Result of the last `check_next_session_authority` call, refreshed on every new
    /// session.
    is_next_session_authority: bool,
    /// Code hashes already handed to the validation backend in the current session;
    /// cleared on every new session.
    already_prepared_code_hashes: HashSet<ValidationCodeHash>,
    /// Maximum number of code hashes picked up per activated leaf.
    per_block_limit: usize,
}

impl Default for PvfPrepState {
    /// Starts as a non-authority with nothing prepared and a limit of one PVF per block.
    fn default() -> Self {
        Self {
            is_next_session_authority: false,
            already_prepared_code_hashes: HashSet::new(),
            per_block_limit: 1,
        }
    }
}
595
596async fn check_v3_feature<Sender>(
599 sender: &mut Sender,
600 relay_parent: Hash,
601 session_index: SessionIndex,
602) -> bool
603where
604 Sender: SubsystemSender<RuntimeApiMessage>,
605{
606 if let Ok(Ok(features)) = request_node_features(relay_parent, session_index, sender).await.await
607 {
608 if FeatureIndex::CandidateReceiptV3.is_set(&features) {
609 gum::info!(
610 target: LOG_TARGET,
611 ?session_index,
612 "CandidateReceiptV3 node feature detected, \
613 switching to V3-aware approval/dispute validation",
614 );
615 return true;
616 }
617 }
618 false
619}
620
621async fn handle_active_leaves_update<Sender>(
622 sender: &mut Sender,
623 keystore: KeystorePtr,
624 validation_host: &mut impl ValidationBackend,
625 update: ActiveLeavesUpdate,
626 state: &mut State,
627) where
628 Sender: SubsystemSender<ChainApiMessage> + SubsystemSender<RuntimeApiMessage>,
629{
630 update_active_leaves_validation_backend(sender, validation_host, update.clone()).await;
631
632 let Some(activated) = update.activated else { return };
633 let maybe_session_index = get_session_index(sender, activated.hash).await;
634
635 let new_session = match (state.session_index, maybe_session_index) {
637 (Some(old), Some(new)) => (new > old).then_some(new),
638 (None, Some(new)) => Some(new),
639 _ => None,
640 };
641
642 state.session_index = new_session.or(state.session_index);
643
644 if !state.v3_ever_seen {
646 if let Some(session_index) = new_session {
647 state.v3_ever_seen = check_v3_feature(sender, activated.hash, session_index).await;
648 }
649 }
650
651 maybe_prepare_validation(
653 sender,
654 keystore.clone(),
655 validation_host,
656 activated,
657 &mut state.pvf_prep,
658 new_session,
659 )
660 .await;
661}
662
663async fn maybe_prepare_validation<Sender>(
664 sender: &mut Sender,
665 keystore: KeystorePtr,
666 validation_backend: &mut impl ValidationBackend,
667 leaf: ActivatedLeaf,
668 pvf_prep: &mut PvfPrepState,
669 new_session: Option<SessionIndex>,
670) where
671 Sender: SubsystemSender<RuntimeApiMessage>,
672{
673 if let Some(new_session_index) = new_session {
674 pvf_prep.already_prepared_code_hashes.clear();
675 pvf_prep.is_next_session_authority =
676 check_next_session_authority(sender, keystore, leaf.hash, new_session_index).await;
677 }
678
679 if pvf_prep.is_next_session_authority {
681 let code_hashes = prepare_pvfs_for_backed_candidates(
682 sender,
683 validation_backend,
684 leaf.hash,
685 &pvf_prep.already_prepared_code_hashes,
686 pvf_prep.per_block_limit,
687 )
688 .await;
689 pvf_prep.already_prepared_code_hashes.extend(code_hashes.unwrap_or_default());
690 }
691}
692
693async fn get_session_index<Sender>(sender: &mut Sender, relay_parent: Hash) -> Option<SessionIndex>
694where
695 Sender: SubsystemSender<RuntimeApiMessage>,
696{
697 let Ok(Ok(session_index)) =
698 util::request_session_index_for_child(relay_parent, sender).await.await
699 else {
700 gum::warn!(
701 target: LOG_TARGET,
702 ?relay_parent,
703 "cannot fetch session index from runtime API",
704 );
705 return None;
706 };
707
708 Some(session_index)
709}
710
711async fn check_next_session_authority<Sender>(
713 sender: &mut Sender,
714 keystore: KeystorePtr,
715 relay_parent: Hash,
716 session_index: SessionIndex,
717) -> bool
718where
719 Sender: SubsystemSender<RuntimeApiMessage>,
720{
721 let Ok(Ok(authorities)) = util::request_authorities(relay_parent, sender).await.await else {
724 gum::warn!(
725 target: LOG_TARGET,
726 ?relay_parent,
727 "cannot fetch authorities from runtime API",
728 );
729 return false;
730 };
731
732 let Ok(Ok(Some(session_info))) =
734 util::request_session_info(relay_parent, session_index, sender).await.await
735 else {
736 gum::warn!(
737 target: LOG_TARGET,
738 ?relay_parent,
739 "cannot fetch session info from runtime API",
740 );
741 return false;
742 };
743
744 let is_past_present_or_future_authority = authorities
745 .iter()
746 .any(|v| keystore.has_keys(&[(v.to_raw_vec(), AuthorityDiscoveryId::ID)]));
747
748 let is_present_validator = session_info
750 .validators
751 .iter()
752 .any(|v| keystore.has_keys(&[(v.to_raw_vec(), ValidatorId::ID)]));
753
754 is_past_present_or_future_authority && !is_present_validator
757}
758
759async fn prepare_pvfs_for_backed_candidates<Sender>(
761 sender: &mut Sender,
762 validation_backend: &mut impl ValidationBackend,
763 relay_parent: Hash,
764 already_prepared: &HashSet<ValidationCodeHash>,
765 per_block_limit: usize,
766) -> Option<Vec<ValidationCodeHash>>
767where
768 Sender: SubsystemSender<RuntimeApiMessage>,
769{
770 let Ok(Ok(events)) = util::request_candidate_events(relay_parent, sender).await.await else {
771 gum::warn!(
772 target: LOG_TARGET,
773 ?relay_parent,
774 "cannot fetch candidate events from runtime API",
775 );
776 return None;
777 };
778 let code_hashes = events
779 .into_iter()
780 .filter_map(|e| match e {
781 CandidateEvent::CandidateBacked(receipt, ..) => {
782 let h = receipt.descriptor.validation_code_hash();
783 if already_prepared.contains(&h) {
784 None
785 } else {
786 Some(h)
787 }
788 },
789 _ => None,
790 })
791 .take(per_block_limit)
792 .collect::<Vec<_>>();
793
794 let Ok(executor_params) = util::executor_params_at_relay_parent(relay_parent, sender).await
795 else {
796 gum::warn!(
797 target: LOG_TARGET,
798 ?relay_parent,
799 "cannot fetch executor params for the session",
800 );
801 return None;
802 };
803 let timeout = pvf_prep_timeout(&executor_params, PvfPrepKind::Prepare);
804
805 let mut active_pvfs = vec![];
806 let mut processed_code_hashes = vec![];
807 for code_hash in code_hashes {
808 let Ok(Ok(Some(validation_code))) =
809 util::request_validation_code_by_hash(relay_parent, code_hash, sender)
810 .await
811 .await
812 else {
813 gum::warn!(
814 target: LOG_TARGET,
815 ?relay_parent,
816 ?code_hash,
817 "cannot fetch validation code hash from runtime API",
818 );
819 continue;
820 };
821
822 let Some(session_index) = get_session_index(sender, relay_parent).await else { continue };
823
824 let validation_code_bomb_limit = match util::runtime::fetch_validation_code_bomb_limit(
825 relay_parent,
826 session_index,
827 sender,
828 )
829 .await
830 {
831 Ok(limit) => limit,
832 Err(err) => {
833 gum::warn!(
834 target: LOG_TARGET,
835 ?relay_parent,
836 ?err,
837 "cannot fetch validation code bomb limit from runtime API",
838 );
839 continue;
840 },
841 };
842
843 let pvf = PvfPrepData::from_code(
844 validation_code.0,
845 executor_params.clone(),
846 timeout,
847 PrepareJobKind::Prechecking,
848 validation_code_bomb_limit,
849 );
850
851 active_pvfs.push(pvf);
852 processed_code_hashes.push(code_hash);
853 }
854
855 if active_pvfs.is_empty() {
856 return None;
857 }
858
859 if let Err(err) = validation_backend.heads_up(active_pvfs).await {
860 gum::warn!(
861 target: LOG_TARGET,
862 ?relay_parent,
863 ?err,
864 "cannot prepare PVF for the next session",
865 );
866 return None;
867 };
868
869 gum::debug!(
870 target: LOG_TARGET,
871 ?relay_parent,
872 ?processed_code_hashes,
873 "Prepared PVF for the next session",
874 );
875
876 Some(processed_code_hashes)
877}
878
879async fn update_active_leaves_validation_backend<Sender>(
880 sender: &mut Sender,
881 validation_backend: &mut impl ValidationBackend,
882 update: ActiveLeavesUpdate,
883) where
884 Sender: SubsystemSender<ChainApiMessage> + SubsystemSender<RuntimeApiMessage>,
885{
886 let ancestors = if let Some(ref activated) = update.activated {
887 get_block_ancestors(sender, activated.hash).await
888 } else {
889 vec![]
890 };
891 if let Err(err) = validation_backend.update_active_leaves(update, ancestors).await {
892 gum::warn!(
893 target: LOG_TARGET,
894 ?err,
895 "cannot update active leaves in validation backend",
896 );
897 };
898}
899
900async fn get_block_ancestors<Sender>(sender: &mut Sender, leaf: Hash) -> Vec<Hash>
905where
906 Sender: SubsystemSender<ChainApiMessage> + SubsystemSender<RuntimeApiMessage>,
907{
908 let Some(session_index) = get_session_index(sender, leaf).await else {
909 gum::warn!(target: LOG_TARGET, ?leaf, "Failed to request session index for leaf.");
910 return vec![];
911 };
912 let scheduling_lookahead = match fetch_scheduling_lookahead(leaf, session_index, sender).await {
913 Ok(scheduling_lookahead) => scheduling_lookahead,
914 res => {
915 gum::warn!(target: LOG_TARGET, ?res, "Failed to request scheduling lookahead");
916 return vec![];
917 },
918 };
919
920 let (tx, rx) = oneshot::channel();
921 sender
922 .send_message(ChainApiMessage::Ancestors {
923 hash: leaf,
924 k: scheduling_lookahead.saturating_sub(1) as usize,
926 response_channel: tx,
927 })
928 .await;
929 match rx.await {
930 Ok(Ok(x)) => x,
931 res => {
932 gum::warn!(target: LOG_TARGET, ?res, "cannot request ancestors");
933 vec![]
934 },
935 }
936}
937
/// Marker error returned by `runtime_api_request` when the response channel was dropped
/// or the runtime API answered with an error; the details are only logged.
struct RuntimeRequestFailed;
939
940async fn runtime_api_request<T, Sender>(
941 sender: &mut Sender,
942 relay_parent: Hash,
943 request: RuntimeApiRequest,
944 receiver: oneshot::Receiver<Result<T, RuntimeApiError>>,
945) -> Result<T, RuntimeRequestFailed>
946where
947 Sender: SubsystemSender<RuntimeApiMessage>,
948{
949 sender
950 .send_message(RuntimeApiMessage::Request(relay_parent, request).into())
951 .await;
952
953 receiver
954 .await
955 .map_err(|_| {
956 gum::debug!(target: LOG_TARGET, ?relay_parent, "Runtime API request dropped");
957
958 RuntimeRequestFailed
959 })
960 .and_then(|res| {
961 res.map_err(|e| {
962 gum::debug!(
963 target: LOG_TARGET,
964 ?relay_parent,
965 err = ?e,
966 "Runtime API request internal error"
967 );
968
969 RuntimeRequestFailed
970 })
971 })
972}
973
974async fn request_validation_code_by_hash<Sender>(
975 sender: &mut Sender,
976 relay_parent: Hash,
977 validation_code_hash: ValidationCodeHash,
978) -> Result<Option<ValidationCode>, RuntimeRequestFailed>
979where
980 Sender: SubsystemSender<RuntimeApiMessage>,
981{
982 let (tx, rx) = oneshot::channel();
983 runtime_api_request(
984 sender,
985 relay_parent,
986 RuntimeApiRequest::ValidationCodeByHash(validation_code_hash, tx),
987 rx,
988 )
989 .await
990}
991
992async fn precheck_pvf<Sender>(
993 sender: &mut Sender,
994 mut validation_backend: impl ValidationBackend,
995 relay_parent: Hash,
996 validation_code_hash: ValidationCodeHash,
997 validation_code_bomb_limit: u32,
998) -> PreCheckOutcome
999where
1000 Sender: SubsystemSender<RuntimeApiMessage>,
1001{
1002 let validation_code =
1003 match request_validation_code_by_hash(sender, relay_parent, validation_code_hash).await {
1004 Ok(Some(code)) => code,
1005 _ => {
1006 gum::warn!(
1010 target: LOG_TARGET,
1011 ?relay_parent,
1012 ?validation_code_hash,
1013 "precheck: requested validation code is not found on-chain!",
1014 );
1015 return PreCheckOutcome::Failed;
1016 },
1017 };
1018
1019 let executor_params = if let Ok(executor_params) =
1020 util::executor_params_at_relay_parent(relay_parent, sender).await
1021 {
1022 gum::debug!(
1023 target: LOG_TARGET,
1024 ?relay_parent,
1025 ?validation_code_hash,
1026 "precheck: acquired executor params for the session: {:?}",
1027 executor_params,
1028 );
1029 executor_params
1030 } else {
1031 gum::warn!(
1032 target: LOG_TARGET,
1033 ?relay_parent,
1034 ?validation_code_hash,
1035 "precheck: failed to acquire executor params for the session, thus voting against.",
1036 );
1037 return PreCheckOutcome::Invalid;
1038 };
1039
1040 let timeout = pvf_prep_timeout(&executor_params, PvfPrepKind::Precheck);
1041
1042 let pvf = PvfPrepData::from_code(
1043 validation_code.0,
1044 executor_params,
1045 timeout,
1046 PrepareJobKind::Prechecking,
1047 validation_code_bomb_limit,
1048 );
1049
1050 match validation_backend.precheck_pvf(pvf).await {
1051 Ok(_) => PreCheckOutcome::Valid,
1052 Err(prepare_err) => {
1053 if prepare_err.is_deterministic() {
1054 PreCheckOutcome::Invalid
1055 } else {
1056 PreCheckOutcome::Failed
1057 }
1058 },
1059 }
1060}
1061
1062async fn validate_candidate(
1070 mut validation_backend: impl ValidationBackend + Send,
1071 persisted_validation_data: PersistedValidationData,
1072 validation_code: ValidationCode,
1073 candidate_receipt: CandidateReceipt,
1074 pov: Arc<PoV>,
1075 executor_params: ExecutorParams,
1076 exec_kind: PvfExecKind,
1077 metrics: &Metrics,
1078 v3_seen: bool,
1079 pre: PreValidationOutput,
1080) -> Result<ValidationResult, ValidationFailed> {
1081 let _timer = metrics.time_validate_candidate_exhaustive();
1082 let para_id = candidate_receipt.descriptor.para_id();
1083 let candidate_hash = candidate_receipt.hash();
1084
1085 gum::debug!(
1086 target: LOG_TARGET,
1087 ?candidate_hash,
1088 ?para_id,
1089 "About to validate a candidate.",
1090 );
1091
1092 let persisted_validation_data = Arc::new(persisted_validation_data);
1093
1094 let validation_context = ValidationContext {
1096 candidate_receipt: candidate_receipt.clone(),
1097 pvd: persisted_validation_data.clone(),
1098 pov: pov.clone(),
1099 executor_params: executor_params.clone(),
1100 exec_timeout: pvf_exec_timeout(&executor_params, exec_kind.into()),
1101 v3_seen,
1102 };
1103
1104 let result = match exec_kind {
1105 PvfExecKind::Backing(_) | PvfExecKind::BackingSystemParas(_) => {
1108 let prep_timeout = pvf_prep_timeout(&executor_params, PvfPrepKind::Prepare);
1109 let pvf = PvfPrepData::from_code(
1110 validation_code.0,
1111 executor_params,
1112 prep_timeout,
1113 PrepareJobKind::Compilation,
1114 pre.validation_code_bomb_limit,
1115 );
1116
1117 validation_backend.validate_candidate(pvf, validation_context, exec_kind).await
1118 },
1119 PvfExecKind::Approval | PvfExecKind::Dispute => {
1120 validation_backend
1121 .validate_candidate_with_retry(
1122 validation_code.0,
1123 validation_context,
1124 PVF_APPROVAL_EXECUTION_RETRY_DELAY,
1125 exec_kind,
1126 pre.validation_code_bomb_limit,
1127 )
1128 .await
1129 },
1130 };
1131
1132 if let Err(ref error) = result {
1133 gum::info!(target: LOG_TARGET, ?para_id, ?candidate_hash, ?error, "Failed to validate candidate");
1134 }
1135
1136 match result {
1137 Err(ValidationError::Internal(e)) => {
1138 gum::warn!(
1139 target: LOG_TARGET,
1140 ?para_id,
1141 ?candidate_hash,
1142 ?e,
1143 "An internal error occurred during validation, will abstain from voting",
1144 );
1145 Err(ValidationFailed(e.to_string()))
1146 },
1147 Err(ValidationError::Invalid(WasmInvalidCandidate::HardTimeout)) => {
1148 Ok(ValidationResult::Invalid(InvalidCandidate::Timeout))
1149 },
1150 Err(ValidationError::Invalid(WasmInvalidCandidate::WorkerReportedInvalid(e))) => {
1151 Ok(ValidationResult::Invalid(InvalidCandidate::ExecutionError(e)))
1152 },
1153 Err(ValidationError::Invalid(WasmInvalidCandidate::PoVDecompressionFailure)) => {
1154 Ok(ValidationResult::Invalid(InvalidCandidate::PoVDecompressionFailure))
1155 },
1156 Err(ValidationError::PossiblyInvalid(PossiblyInvalidError::AmbiguousWorkerDeath)) => {
1157 Ok(ValidationResult::Invalid(InvalidCandidate::ExecutionError(
1158 "ambiguous worker death".to_string(),
1159 )))
1160 },
1161 Err(ValidationError::PossiblyInvalid(PossiblyInvalidError::JobError(err))) => {
1162 Ok(ValidationResult::Invalid(InvalidCandidate::ExecutionError(err)))
1163 },
1164 Err(ValidationError::PossiblyInvalid(PossiblyInvalidError::RuntimeConstruction(err))) => {
1165 Ok(ValidationResult::Invalid(InvalidCandidate::ExecutionError(err)))
1166 },
1167 Err(ValidationError::PossiblyInvalid(err @ PossiblyInvalidError::CorruptedArtifact)) => {
1168 Ok(ValidationResult::Invalid(InvalidCandidate::ExecutionError(err.to_string())))
1169 },
1170
1171 Err(ValidationError::PossiblyInvalid(PossiblyInvalidError::AmbiguousJobDeath(err))) => {
1172 Ok(ValidationResult::Invalid(InvalidCandidate::ExecutionError(format!(
1173 "ambiguous job death: {err}"
1174 ))))
1175 },
1176 Err(ValidationError::Preparation(e)) => {
1177 gum::warn!(
1178 target: LOG_TARGET,
1179 ?para_id,
1180 ?e,
1181 "Deterministic error occurred during preparation (should have been ruled out by pre-checking phase)",
1182 );
1183 Err(ValidationFailed(e.to_string()))
1184 },
1185 Err(e @ ValidationError::ExecutionDeadline) => {
1186 gum::warn!(
1187 target: LOG_TARGET,
1188 ?para_id,
1189 ?e,
1190 "Job assigned too late, execution queue probably overloaded",
1191 );
1192 Err(ValidationFailed(e.to_string()))
1193 },
1194 Ok(res) => {
1195 if res.head_data.hash() != candidate_receipt.descriptor.para_head() {
1196 gum::info!(target: LOG_TARGET, ?para_id, "Invalid candidate (para_head)");
1197 Ok(ValidationResult::Invalid(InvalidCandidate::ParaHeadHashMismatch))
1198 } else {
1199 let committed_candidate_receipt = CommittedCandidateReceipt {
1200 descriptor: candidate_receipt.descriptor.clone(),
1201 commitments: CandidateCommitments {
1202 head_data: res.head_data,
1203 upward_messages: res.upward_messages,
1204 horizontal_messages: res.horizontal_messages,
1205 new_validation_code: res.new_validation_code,
1206 processed_downward_messages: res.processed_downward_messages,
1207 hrmp_watermark: res.hrmp_watermark,
1208 },
1209 };
1210
1211 if candidate_receipt.commitments_hash !=
1212 committed_candidate_receipt.commitments.hash()
1213 {
1214 gum::info!(
1215 target: LOG_TARGET,
1216 ?para_id,
1217 ?candidate_hash,
1218 "Invalid candidate (commitments hash)"
1219 );
1220
1221 gum::trace!(
1222 target: LOG_TARGET,
1223 ?para_id,
1224 ?candidate_hash,
1225 produced_commitments = ?committed_candidate_receipt.commitments,
1226 "Invalid candidate commitments"
1227 );
1228
1229 Ok(ValidationResult::Invalid(InvalidCandidate::CommitmentsHashMismatch))
1232 } else {
1233 if let Some(claim_queue) = &pre.claim_queue {
1235 if let Err(err) = committed_candidate_receipt
1236 .parse_ump_signals(&transpose_claim_queue(claim_queue.0.clone()))
1237 {
1238 gum::warn!(
1239 target: LOG_TARGET,
1240 candidate_hash = ?candidate_receipt.hash(),
1241 "Invalid UMP signals: {}",
1242 err
1243 );
1244 return Ok(ValidationResult::Invalid(
1245 InvalidCandidate::InvalidUMPSignals(err),
1246 ));
1247 }
1248 }
1249
1250 Ok(ValidationResult::Valid(
1251 committed_candidate_receipt.commitments,
1252 (*persisted_validation_data).clone(),
1253 ))
1254 }
1255 }
1256 },
1257 }
1258}
1259
1260#[async_trait]
1261trait ValidationBackend {
1262 async fn validate_candidate(
1264 &mut self,
1265 pvf: PvfPrepData,
1266 validation_context: ValidationContext,
1267 exec_kind: PvfExecKind,
1268 ) -> Result<WasmValidationResult, ValidationError>;
1269
1270 async fn validate_candidate_with_retry(
1276 &mut self,
1277 code: Vec<u8>,
1278 validation_context: ValidationContext,
1279 retry_delay: Duration,
1280 exec_kind: PvfExecKind,
1281 validation_code_bomb_limit: u32,
1282 ) -> Result<WasmValidationResult, ValidationError> {
1283 let exec_timeout = validation_context.exec_timeout;
1284 let executor_params = validation_context.executor_params.clone();
1285 let prep_timeout = pvf_prep_timeout(&executor_params, PvfPrepKind::Prepare);
1286 let pvf = PvfPrepData::from_code(
1288 code,
1289 executor_params,
1290 prep_timeout,
1291 PrepareJobKind::Compilation,
1292 validation_code_bomb_limit,
1293 );
1294 let total_time_start = Instant::now();
1297
1298 let mut validation_result = self
1299 .validate_candidate(pvf.clone(), validation_context.clone(), exec_kind)
1300 .await;
1301 if validation_result.is_ok() {
1302 return validation_result;
1303 }
1304
1305 macro_rules! break_if_no_retries_left {
1306 ($counter:ident) => {
1307 if $counter > 0 {
1308 $counter -= 1;
1309 } else {
1310 break;
1311 }
1312 };
1313 }
1314
1315 let mut num_death_retries_left = 1;
1317 let mut num_job_error_retries_left = 1;
1318 let mut num_internal_retries_left = 1;
1319 let mut num_execution_error_retries_left = 1;
1320 loop {
1321 if total_time_start.elapsed() + retry_delay > exec_timeout {
1323 break;
1324 }
1325 let mut retry_immediately = false;
1326 match validation_result {
1327 Err(ValidationError::PossiblyInvalid(
1328 PossiblyInvalidError::AmbiguousWorkerDeath |
1329 PossiblyInvalidError::AmbiguousJobDeath(_),
1330 )) => break_if_no_retries_left!(num_death_retries_left),
1331
1332 Err(ValidationError::PossiblyInvalid(PossiblyInvalidError::JobError(_))) => {
1333 break_if_no_retries_left!(num_job_error_retries_left)
1334 },
1335
1336 Err(ValidationError::Internal(_)) => {
1337 break_if_no_retries_left!(num_internal_retries_left)
1338 },
1339
1340 Err(ValidationError::PossiblyInvalid(
1341 PossiblyInvalidError::RuntimeConstruction(_) |
1342 PossiblyInvalidError::CorruptedArtifact,
1343 )) => {
1344 break_if_no_retries_left!(num_execution_error_retries_left);
1345 self.precheck_pvf(pvf.clone()).await?;
1346 retry_immediately = true;
1351 },
1352
1353 Ok(_) |
1354 Err(
1355 ValidationError::Invalid(_) |
1356 ValidationError::Preparation(_) |
1357 ValidationError::ExecutionDeadline,
1358 ) => break,
1359 }
1360
1361 {
1364 if !retry_immediately {
1367 futures_timer::Delay::new(retry_delay).await;
1368 }
1369
1370 let new_timeout = exec_timeout.saturating_sub(total_time_start.elapsed());
1371
1372 gum::warn!(
1373 target: LOG_TARGET,
1374 ?pvf,
1375 ?new_timeout,
1376 "Re-trying failed candidate validation due to possible transient error: {:?}",
1377 validation_result
1378 );
1379
1380 let mut retry_context = validation_context.clone();
1382 retry_context.exec_timeout = new_timeout;
1383
1384 validation_result =
1385 self.validate_candidate(pvf.clone(), retry_context, exec_kind).await;
1386 }
1387 }
1388
1389 validation_result
1390 }
1391
1392 async fn precheck_pvf(&mut self, pvf: PvfPrepData) -> Result<(), PrepareError>;
1393
1394 async fn heads_up(&mut self, active_pvfs: Vec<PvfPrepData>) -> Result<(), String>;
1395
1396 async fn update_active_leaves(
1401 &mut self,
1402 update: ActiveLeavesUpdate,
1403 ancestors: Vec<Hash>,
1404 ) -> Result<(), String>;
1405}
1406
1407#[async_trait]
1408impl ValidationBackend for ValidationHost {
1409 async fn validate_candidate(
1411 &mut self,
1412 pvf: PvfPrepData,
1413 validation_context: ValidationContext,
1414 exec_kind: PvfExecKind,
1415 ) -> Result<WasmValidationResult, ValidationError> {
1416 let (tx, rx) = oneshot::channel();
1417 if let Err(err) =
1418 self.execute_pvf(pvf, validation_context, exec_kind.into(), exec_kind, tx).await
1419 {
1420 return Err(InternalValidationError::HostCommunication(format!(
1421 "cannot send pvf to the validation host, it might have shut down: {:?}",
1422 err
1423 ))
1424 .into());
1425 }
1426
1427 rx.await.map_err(|_| {
1428 ValidationError::from(InternalValidationError::HostCommunication(
1429 "validation was cancelled".into(),
1430 ))
1431 })?
1432 }
1433
1434 async fn precheck_pvf(&mut self, pvf: PvfPrepData) -> Result<(), PrepareError> {
1435 let (tx, rx) = oneshot::channel();
1436 if let Err(err) = self.precheck_pvf(pvf, tx).await {
1437 return Err(PrepareError::IoErr(err));
1439 }
1440
1441 let precheck_result = rx.await.map_err(|err| PrepareError::IoErr(err.to_string()))?;
1442
1443 precheck_result
1444 }
1445
1446 async fn heads_up(&mut self, active_pvfs: Vec<PvfPrepData>) -> Result<(), String> {
1447 self.heads_up(active_pvfs).await
1448 }
1449
1450 async fn update_active_leaves(
1451 &mut self,
1452 update: ActiveLeavesUpdate,
1453 ancestors: Vec<Hash>,
1454 ) -> Result<(), String> {
1455 self.update_active_leaves(update, ancestors).await
1456 }
1457}
1458
1459fn perform_basic_checks(
1462 candidate: &CandidateDescriptor,
1463 max_pov_size: u32,
1464 pov: &PoV,
1465 validation_code_hash: &ValidationCodeHash,
1466) -> Result<(), InvalidCandidate> {
1467 let pov_hash = pov.hash();
1468
1469 let encoded_pov_size = pov.encoded_size();
1470 if encoded_pov_size > max_pov_size as usize {
1471 return Err(InvalidCandidate::ParamsTooLarge(encoded_pov_size as u64));
1472 }
1473
1474 if pov_hash != candidate.pov_hash() {
1475 return Err(InvalidCandidate::PoVHashMismatch);
1476 }
1477
1478 if *validation_code_hash != candidate.validation_code_hash() {
1479 return Err(InvalidCandidate::CodeHashMismatch);
1480 }
1481
1482 Ok(())
1483}
1484
1485fn pvf_prep_timeout(executor_params: &ExecutorParams, kind: PvfPrepKind) -> Duration {
1495 if let Some(timeout) = executor_params.pvf_prep_timeout(kind) {
1496 return timeout;
1497 }
1498 match kind {
1499 PvfPrepKind::Precheck => DEFAULT_PRECHECK_PREPARATION_TIMEOUT,
1500 PvfPrepKind::Prepare => DEFAULT_LENIENT_PREPARATION_TIMEOUT,
1501 }
1502}
1503
1504fn pvf_exec_timeout(executor_params: &ExecutorParams, kind: RuntimePvfExecKind) -> Duration {
1515 if let Some(timeout) = executor_params.pvf_exec_timeout(kind) {
1516 return timeout;
1517 }
1518 match kind {
1519 RuntimePvfExecKind::Backing => DEFAULT_BACKING_EXECUTION_TIMEOUT,
1520 RuntimePvfExecKind::Approval => DEFAULT_APPROVAL_EXECUTION_TIMEOUT,
1521 }
1522}