Skip to main content

polkadot_node_core_candidate_validation/
lib.rs

1// Copyright (C) Parity Technologies (UK) Ltd.
2// This file is part of Polkadot.
3
4// Polkadot is free software: you can redistribute it and/or modify
5// it under the terms of the GNU General Public License as published by
6// the Free Software Foundation, either version 3 of the License, or
7// (at your option) any later version.
8
9// Polkadot is distributed in the hope that it will be useful,
10// but WITHOUT ANY WARRANTY; without even the implied warranty of
11// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12// GNU General Public License for more details.
13
14// You should have received a copy of the GNU General Public License
15// along with Polkadot.  If not, see <http://www.gnu.org/licenses/>.
16
17//! The Candidate Validation subsystem.
18//!
19//! This handles incoming requests from other subsystems to validate candidates
20//! according to a validation function. This delegates validation to an underlying
21//! pool of processes used for execution of the Wasm.
22
23#![deny(unused_crate_dependencies, unused_results)]
24#![warn(missing_docs)]
25
26use polkadot_node_core_pvf::{
27	InternalValidationError, InvalidCandidate as WasmInvalidCandidate, PossiblyInvalidError,
28	PrepareError, PrepareJobKind, PvfPrepData, ValidationError, ValidationHost,
29};
30use polkadot_node_core_pvf_common::execute::ValidationContext;
31use polkadot_node_primitives::{InvalidCandidate, PoV, ValidationResult};
32use polkadot_node_subsystem::{
33	errors::RuntimeApiError,
34	messages::{
35		CandidateValidationMessage, ChainApiMessage, PreCheckOutcome, PvfExecKind,
36		RuntimeApiMessage, RuntimeApiRequest, ValidationFailed,
37	},
38	overseer, FromOrchestra, OverseerSignal, SpawnedSubsystem, SubsystemError, SubsystemResult,
39	SubsystemSender,
40};
41use polkadot_node_subsystem_util::{
42	self as util, request_node_features,
43	runtime::{fetch_scheduling_lookahead, ClaimQueueSnapshot},
44};
45use polkadot_overseer::{ActivatedLeaf, ActiveLeavesUpdate};
46use polkadot_parachain_primitives::primitives::ValidationResult as WasmValidationResult;
47use polkadot_primitives::{
48	executor_params::{
49		DEFAULT_APPROVAL_EXECUTION_TIMEOUT, DEFAULT_BACKING_EXECUTION_TIMEOUT,
50		DEFAULT_LENIENT_PREPARATION_TIMEOUT, DEFAULT_PRECHECK_PREPARATION_TIMEOUT,
51	},
52	node_features::FeatureIndex,
53	transpose_claim_queue, AuthorityDiscoveryId, CandidateCommitments,
54	CandidateDescriptorV2 as CandidateDescriptor, CandidateEvent,
55	CandidateReceiptV2 as CandidateReceipt,
56	CommittedCandidateReceiptV2 as CommittedCandidateReceipt, ExecutorParams, Hash,
57	PersistedValidationData, PvfExecKind as RuntimePvfExecKind, PvfPrepKind, SessionIndex,
58	ValidationCode, ValidationCodeHash, ValidatorId,
59};
60use sp_application_crypto::{AppCrypto, ByteArray};
61use sp_keystore::KeystorePtr;
62
63use codec::Encode;
64
65use futures::{channel::oneshot, prelude::*, stream::FuturesUnordered};
66
67use std::{
68	collections::HashSet,
69	path::PathBuf,
70	pin::Pin,
71	sync::Arc,
72	time::{Duration, Instant},
73};
74
75use async_trait::async_trait;
76
77mod metrics;
78use self::metrics::Metrics;
79
80#[cfg(test)]
81mod tests;
82
// Logging target for this subsystem. The `'static` lifetime is implied for
// const references (clippy: `redundant_static_lifetimes`).
const LOG_TARGET: &str = "parachain::candidate-validation";
84
/// The amount of time to wait before retrying after a retry-able approval validation error. We use
/// a higher value for the approval case since we have more time, and if we wait longer it is more
/// likely that transient conditions will resolve.
#[cfg(not(test))]
const PVF_APPROVAL_EXECUTION_RETRY_DELAY: Duration = Duration::from_secs(3);
// A much shorter delay under `cfg(test)` so retry paths can be exercised quickly in unit tests.
#[cfg(test)]
const PVF_APPROVAL_EXECUTION_RETRY_DELAY: Duration = Duration::from_millis(200);

// The task queue size is chosen to be somewhat bigger than the PVF host incoming queue size
// to allow exhaustive validation messages to fall through in case the tasks are clogged.
const TASK_LIMIT: usize = 30;
96
/// Configuration for the candidate validation subsystem.
#[derive(Clone, Default)]
pub struct Config {
	/// The path where candidate validation can store compiled artifacts for PVFs.
	pub artifacts_cache_path: PathBuf,
	/// The version of the node. `None` can be passed to skip the version check (only for tests).
	pub node_version: Option<String>,
	/// Whether the node is attempting to run as a secure validator.
	pub secure_validator_mode: bool,
	/// Path to the preparation worker binary.
	pub prep_worker_path: PathBuf,
	/// Path to the execution worker binary.
	pub exec_worker_path: PathBuf,
	/// The maximum number of pvf execution workers.
	pub pvf_execute_workers_max_num: usize,
	/// The maximum number of pvf workers that can be spawned in the pvf prepare pool for tasks
	/// with the priority below critical.
	pub pvf_prepare_workers_soft_max_num: usize,
	/// The absolute number of pvf workers that can be spawned in the pvf prepare pool.
	pub pvf_prepare_workers_hard_max_num: usize,
}
118
/// The candidate validation subsystem.
pub struct CandidateValidationSubsystem {
	// Keystore; used to check whether this node holds authority/validator keys
	// (see `check_next_session_authority`).
	keystore: KeystorePtr,
	#[allow(missing_docs)]
	pub metrics: Metrics,
	#[allow(missing_docs)]
	pub pvf_metrics: polkadot_node_core_pvf::Metrics,
	// `None` disables the subsystem: `start` then spawns a `DummySubsystem` instead.
	config: Option<Config>,
}
128
129impl CandidateValidationSubsystem {
130	/// Create a new `CandidateValidationSubsystem`.
131	pub fn with_config(
132		config: Option<Config>,
133		keystore: KeystorePtr,
134		metrics: Metrics,
135		pvf_metrics: polkadot_node_core_pvf::Metrics,
136	) -> Self {
137		CandidateValidationSubsystem { keystore, config, metrics, pvf_metrics }
138	}
139}
140
#[overseer::subsystem(CandidateValidation, error=SubsystemError, prefix=self::overseer)]
impl<Context> CandidateValidationSubsystem {
	// Start the subsystem's main task, or a no-op `DummySubsystem` when no `Config`
	// was supplied.
	fn start(self, ctx: Context) -> SpawnedSubsystem {
		if let Some(config) = self.config {
			let future = run(ctx, self.keystore, self.metrics, self.pvf_metrics, config)
				.map_err(|e| SubsystemError::with_origin("candidate-validation", e))
				.boxed();
			SpawnedSubsystem { name: "candidate-validation-subsystem", future }
		} else {
			polkadot_overseer::DummySubsystem.start(ctx)
		}
	}
}
154
155// Returns the claim queue at relay parent and logs a warning if it is not available.
156async fn claim_queue<Sender>(relay_parent: Hash, sender: &mut Sender) -> Option<ClaimQueueSnapshot>
157where
158	Sender: SubsystemSender<RuntimeApiMessage>,
159{
160	match util::runtime::fetch_claim_queue(sender, relay_parent).await {
161		Ok(cq) => Some(cq),
162		Err(err) => {
163			gum::warn!(
164				target: LOG_TARGET,
165				?relay_parent,
166				?err,
167				"Claim queue not available"
168			);
169			None
170		},
171	}
172}
173
174/// Fetch the validation code bomb limit for a candidate.
175///
176/// NOTE: This method is fetching state from the scheduling parent. Fetching state for the
177/// scheduling or relay parent of a candidate is not sound in disputes! This is necessary as of now
178/// though, as the provided runtime API does not allow fetching for older sessions. For the time
179/// being, we at least use the scheduling parent as this is more likely to still be around than the
180/// relay parent.
181///
182/// For what session to pick (to be fetched via an active leaf, not scheduling nor relay parent): In
183/// principle both the scheduling session and the execution session would be sensible choices here
184/// for fetching the limit, all that matters is that we have consensus among validators. For
185/// parachain block confidence, decreasing the value would be problematic in both cases. For
186/// increased values, all that matters is consensus.
187async fn fetch_bomb_limit<Sender>(
188	candidate_descriptor: &CandidateDescriptor,
189	v3_ever_seen: bool,
190	sender: &mut Sender,
191) -> Result<u32, String>
192where
193	Sender: SubsystemSender<RuntimeApiMessage>,
194{
195	// NOTE: As noted above, even looking at the scheduling parent in disputes context should be
196	// suspicious normally!
197	let scheduling_parent =
198		candidate_descriptor.scheduling_parent_for_candidate_validation(v3_ever_seen);
199
200	let scheduling_session =
201		match candidate_descriptor.scheduling_session_for_candidate_validation(v3_ever_seen) {
202			Some(session) => session,
203			None => {
204				// NOTE: This is depending on scheduling parent state to still be around!
205				let Some(session) = get_session_index(sender, scheduling_parent).await else {
206					return Err("Cannot fetch session index from the runtime".into());
207				};
208				session
209			},
210		};
211
212	// Returns a default value if the runtime API is not available for this session,
213	// but errors on unexpected runtime API failures.
214	// NOTE: This is depending on scheduling parent state to still be around!
215	util::runtime::fetch_validation_code_bomb_limit(scheduling_parent, scheduling_session, sender)
216		.await
217		.map_err(|_| "Cannot fetch validation code bomb limit from the runtime".into())
218}
219
/// Output of [`pre_validate_candidate`]: data needed by PVF execution and
/// post-validation.
struct PreValidationOutput {
	/// Validation code bomb limit for PVF preparation, fetched from the runtime.
	validation_code_bomb_limit: u32,
	/// Claim queue for backing-only UMP signal post-validation. `None` for
	/// approval/dispute.
	claim_queue: Option<ClaimQueueSnapshot>,
}
229
/// Errors from [`pre_validate_candidate`].
enum PreValidationError {
	/// The candidate is definitively invalid — reported to the requester as a
	/// successful (negative) result: `Ok(ValidationResult::Invalid(_))`.
	Invalid(InvalidCandidate),
	/// A runtime API call failed — cannot determine validity; reported as
	/// `Err(ValidationFailed(_))`.
	RuntimeError(String),
}
237
/// Pre-validate a candidate before PVF execution.
///
/// Performs all checks that don't require running the PVF:
/// - Fetch validation code bomb limit (fetched from runtime)
/// - Basic checks: PoV size, PoV hash, validation code hash
/// - Backing-only (skipped for approval/dispute):
///   - Scheduling session matches runtime
///   - Relay parent valid in claimed session (via `check_relay_parent_session` utility)
///   - Claim queue fetch
///
/// Backing-only checks are skipped for approval/dispute because the runtime
/// validates them at backing time and the chain state they depend on may not
/// be available in disputes.
///
/// Returns [`PreValidationOutput`] on success, [`PreValidationError::Invalid`] for a
/// definitively invalid candidate, and [`PreValidationError::RuntimeError`] when
/// validity could not be determined.
async fn pre_validate_candidate<Sender>(
	sender: &mut Sender,
	candidate_receipt: &CandidateReceipt,
	persisted_validation_data: &PersistedValidationData,
	pov: &PoV,
	validation_code_hash: &ValidationCodeHash,
	exec_kind: PvfExecKind,
	v3_ever_seen: bool,
) -> Result<PreValidationOutput, PreValidationError>
where
	Sender: SubsystemSender<RuntimeApiMessage>,
{
	// Needed later for PVF preparation; fetching it first also fails fast on
	// runtime API trouble.
	let validation_code_bomb_limit =
		fetch_bomb_limit(&candidate_receipt.descriptor, v3_ever_seen, sender)
			.await
			.map_err(PreValidationError::RuntimeError)?;

	// Cheap structural checks (PoV size/hash, validation code hash) — no runtime
	// API calls involved.
	if let Err(e) = perform_basic_checks(
		&candidate_receipt.descriptor,
		persisted_validation_data.max_pov_size,
		pov,
		validation_code_hash,
	) {
		return Err(PreValidationError::Invalid(e));
	}

	let claim_queue = match exec_kind {
		PvfExecKind::Backing(_) | PvfExecKind::BackingSystemParas(_) => {
			let scheduling_parent = candidate_receipt
				.descriptor
				.scheduling_parent_for_candidate_validation(v3_ever_seen);

			// Verify scheduling session.
			let expected_scheduling_session =
				get_session_index(sender, scheduling_parent).await.ok_or_else(|| {
					PreValidationError::RuntimeError(
						"Scheduling session index not found".to_string(),
					)
				})?;

			// Only enforced when the descriptor actually carries a scheduling session.
			if let Some(scheduling_session) = candidate_receipt
				.descriptor
				.scheduling_session_for_candidate_validation(v3_ever_seen)
			{
				if scheduling_session != expected_scheduling_session {
					return Err(PreValidationError::Invalid(
						InvalidCandidate::InvalidSchedulingSession,
					));
				}
			}

			// Verify relay parent is valid in the claimed session.
			// Uses the node-side utility which handles both the self-query case
			// (scheduling_parent == relay_parent, V2) and ancestor queries (V3).
			if let Some(session_index) = candidate_receipt
				.descriptor
				.session_index_for_candidate_validation(v3_ever_seen)
			{
				let relay_parent = candidate_receipt.descriptor.relay_parent();
				match util::check_relay_parent_session(
					sender,
					scheduling_parent,
					session_index,
					relay_parent,
				)
				.await
				{
					util::CheckRelayParentSessionResult::Valid => {},
					// Safe to skip: on old runtimes cross-session relay parents don't
					// exist, and the scheduling session check above already covers the
					// relay parent session (scheduling_parent == relay_parent).
					util::CheckRelayParentSessionResult::NotSupported => {},
					util::CheckRelayParentSessionResult::NotFound => {
						return Err(PreValidationError::Invalid(
							InvalidCandidate::InvalidRelayParentSession,
						))
					},
					util::CheckRelayParentSessionResult::RuntimeError(err) => {
						return Err(PreValidationError::RuntimeError(err))
					},
				}
			}

			// The claim queue is only needed for backing-time post-validation.
			let cq = claim_queue(scheduling_parent, sender).await.ok_or_else(|| {
				PreValidationError::RuntimeError("Claim queue not available".to_string())
			})?;

			Some(cq)
		},
		// Approval/dispute: no claim queue needed.
		_ => None,
	};

	Ok(PreValidationOutput { validation_code_bomb_limit, claim_queue })
}
345
/// Convert a single [`CandidateValidationMessage`] into a boxed, self-contained task
/// future, which is driven to completion by the main loop in `run`.
///
/// Results are reported back through the `response_sender` embedded in the message;
/// a dropped receiver is deliberately ignored (`let _ = …`).
fn handle_validation_message<S, V>(
	mut sender: S,
	validation_host: V,
	metrics: Metrics,
	v3_ever_seen: bool,
	msg: CandidateValidationMessage,
) -> Pin<Box<dyn Future<Output = ()> + Send>>
where
	S: SubsystemSender<RuntimeApiMessage>,
	V: ValidationBackend + Clone + Send + 'static,
{
	match msg {
		CandidateValidationMessage::ValidateFromExhaustive {
			validation_data,
			validation_code,
			candidate_receipt,
			pov,
			executor_params,
			exec_kind,
			response_sender,
			..
		} => async move {
			let _timer = metrics.time_validate_from_exhaustive();

			// Phase 1: Pre-validation — cheap checks, fail fast before PVF.
			let pre = match pre_validate_candidate(
				&mut sender,
				&candidate_receipt,
				&validation_data,
				&pov,
				&validation_code.hash(),
				exec_kind,
				v3_ever_seen,
			)
			.await
			{
				Ok(pre) => pre,
				// Definitive invalidity is a successful (negative) validation result.
				Err(PreValidationError::Invalid(e)) => {
					let _ = response_sender.send(Ok(ValidationResult::Invalid(e)));
					return;
				},
				// Runtime trouble: validity could not be determined.
				Err(PreValidationError::RuntimeError(err)) => {
					let _ = response_sender.send(Err(ValidationFailed(err)));
					return;
				},
			};

			// Phase 2: PVF execution + output validation.
			let res = validate_candidate(
				validation_host,
				validation_data,
				validation_code,
				candidate_receipt,
				pov,
				executor_params,
				exec_kind,
				&metrics,
				v3_ever_seen,
				pre,
			)
			.await;

			metrics.on_validation_event(&res);
			let _ = response_sender.send(res);
		}
		.boxed(),
		CandidateValidationMessage::PreCheck {
			relay_parent,
			validation_code_hash,
			response_sender,
			..
		} => async move {
			// The session index is required for the bomb limit query below.
			let Some(session_index) = get_session_index(&mut sender, relay_parent).await else {
				let error = "cannot fetch session index from the runtime";
				gum::warn!(
					target: LOG_TARGET,
					?relay_parent,
					error,
				);

				let _ = response_sender.send(PreCheckOutcome::Failed);
				return;
			};

			// This will return a default value for the limit if runtime API is not available.
			// however we still error out if there is a weird runtime API error.
			let Ok(validation_code_bomb_limit) = util::runtime::fetch_validation_code_bomb_limit(
				relay_parent,
				session_index,
				&mut sender,
			)
			.await
			else {
				let error = "cannot fetch validation code bomb limit from the runtime";
				gum::warn!(
					target: LOG_TARGET,
					?relay_parent,
					error,
				);

				let _ = response_sender.send(PreCheckOutcome::Failed);
				return;
			};

			let precheck_result = precheck_pvf(
				&mut sender,
				validation_host,
				relay_parent,
				validation_code_hash,
				validation_code_bomb_limit,
			)
			.await;

			let _ = response_sender.send(precheck_result);
		}
		.boxed(),
	}
}
464
/// Main task of the subsystem.
///
/// Starts the PVF host, then loops: overseer signals are handled inline while each
/// incoming validation request is turned into an independent task on `tasks`. Once
/// `TASK_LIMIT` tasks are in flight, message intake is paused (only signals are
/// processed) until the backlog drains below the limit — providing backpressure.
#[overseer::contextbounds(CandidateValidation, prefix = self::overseer)]
async fn run<Context>(
	mut ctx: Context,
	keystore: KeystorePtr,
	metrics: Metrics,
	pvf_metrics: polkadot_node_core_pvf::Metrics,
	Config {
		artifacts_cache_path,
		node_version,
		secure_validator_mode,
		prep_worker_path,
		exec_worker_path,
		pvf_execute_workers_max_num,
		pvf_prepare_workers_soft_max_num,
		pvf_prepare_workers_hard_max_num,
	}: Config,
) -> SubsystemResult<()> {
	let (mut validation_host, task) = polkadot_node_core_pvf::start(
		polkadot_node_core_pvf::Config::new(
			artifacts_cache_path,
			node_version,
			secure_validator_mode,
			prep_worker_path,
			exec_worker_path,
			pvf_execute_workers_max_num,
			pvf_prepare_workers_soft_max_num,
			pvf_prepare_workers_hard_max_num,
		),
		pvf_metrics,
	)
	.await?;
	ctx.spawn_blocking("pvf-validation-host", task.boxed())?;

	let mut tasks = FuturesUnordered::new();
	let mut state = State::default();

	loop {
		// Normal operation: accept both signals and new validation requests.
		loop {
			futures::select! {
				comm = ctx.recv().fuse() => {
					match comm {
						Ok(FromOrchestra::Signal(OverseerSignal::ActiveLeaves(update))) => {
							handle_active_leaves_update(
								ctx.sender(),
								keystore.clone(),
								&mut validation_host,
								update,
								&mut state,
							).await
						},
						Ok(FromOrchestra::Signal(OverseerSignal::BlockFinalized(..))) => {},
						Ok(FromOrchestra::Signal(OverseerSignal::Conclude)) => return Ok(()),
						Ok(FromOrchestra::Communication { msg }) => {
							let task = handle_validation_message(ctx.sender().clone(), validation_host.clone(), metrics.clone(), state.v3_ever_seen, msg);
							tasks.push(task);
							// Saturated — switch to the drain-only loop below.
							if tasks.len() >= TASK_LIMIT {
								break
							}
						},
						Err(e) => return Err(SubsystemError::from(e)),
					}
				},
				_ = tasks.select_next_some() => ()
			}
		}

		gum::debug!(target: LOG_TARGET, "Validation task limit hit");

		// Saturated: only process signals and drain tasks until below the limit again.
		loop {
			futures::select! {
				signal = ctx.recv_signal().fuse() => {
					match signal {
						Ok(OverseerSignal::ActiveLeaves(_)) => {},
						Ok(OverseerSignal::BlockFinalized(..)) => {},
						Ok(OverseerSignal::Conclude) => return Ok(()),
						Err(e) => return Err(SubsystemError::from(e)),
					}
				},
				_ = tasks.select_next_some() => {
					if tasks.len() < TASK_LIMIT {
						break
					}
				}
			}
		}
	}
}
552
/// Top-level subsystem state, owning session tracking, V3 transition detection,
/// and PVF preparation bookkeeping.
struct State {
	/// Current session index, tracked across active leaf updates. Only ever moves
	/// forward (see `handle_active_leaves_update`).
	session_index: Option<SessionIndex>,
	/// Monotonic flag: set to `true` once any activated leaf has the V3 candidate
	/// descriptor node feature enabled. Once set, never unset.
	/// Used to determine whether approval/dispute validation should trust
	/// `version()` (V3-capable) or fall back to `version_old_rules()`.
	/// See `CandidateDescriptorV2::version_for_candidate_validation` for the safety argument.
	v3_ever_seen: bool,
	/// PVF preparation state (proactive pre-compilation for next session).
	pvf_prep: PvfPrepState,
}
567
568impl Default for State {
569	fn default() -> Self {
570		Self { session_index: None, v3_ever_seen: false, pvf_prep: PvfPrepState::default() }
571	}
572}
573
/// State for proactive PVF preparation.
///
/// Tracks whether we're a next-session authority and which code hashes we've already
/// sent to the PVF host.
struct PvfPrepState {
	// Refreshed on every session change via `check_next_session_authority`.
	is_next_session_authority: bool,
	// PVF host won't prepare the same code hash twice, so here we just avoid extra communication
	already_prepared_code_hashes: HashSet<ValidationCodeHash>,
	// How many PVFs per block we take to prepare themselves for the next session validation
	per_block_limit: usize,
}
585
586impl Default for PvfPrepState {
587	fn default() -> Self {
588		Self {
589			is_next_session_authority: false,
590			already_prepared_code_hashes: HashSet::new(),
591			per_block_limit: 1,
592		}
593	}
594}
595
596/// Check if the V3 candidate descriptor node feature is enabled at the given
597/// session. Returns `true` if the feature is set.
598async fn check_v3_feature<Sender>(
599	sender: &mut Sender,
600	relay_parent: Hash,
601	session_index: SessionIndex,
602) -> bool
603where
604	Sender: SubsystemSender<RuntimeApiMessage>,
605{
606	if let Ok(Ok(features)) = request_node_features(relay_parent, session_index, sender).await.await
607	{
608		if FeatureIndex::CandidateReceiptV3.is_set(&features) {
609			gum::info!(
610				target: LOG_TARGET,
611				?session_index,
612				"CandidateReceiptV3 node feature detected, \
613				 switching to V3-aware approval/dispute validation",
614			);
615			return true;
616		}
617	}
618	false
619}
620
/// Handle an `ActiveLeaves` signal: forward the update to the PVF host, track
/// session changes, run V3 feature detection on session change, and trigger
/// proactive PVF preparation for the activated leaf.
async fn handle_active_leaves_update<Sender>(
	sender: &mut Sender,
	keystore: KeystorePtr,
	validation_host: &mut impl ValidationBackend,
	update: ActiveLeavesUpdate,
	state: &mut State,
) where
	Sender: SubsystemSender<ChainApiMessage> + SubsystemSender<RuntimeApiMessage>,
{
	update_active_leaves_validation_backend(sender, validation_host, update.clone()).await;

	let Some(activated) = update.activated else { return };
	let maybe_session_index = get_session_index(sender, activated.hash).await;

	// Detect session change: `Some(new)` only when we move to a strictly newer
	// session, or learn the session index for the first time.
	let new_session = match (state.session_index, maybe_session_index) {
		(Some(old), Some(new)) => (new > old).then_some(new),
		(None, Some(new)) => Some(new),
		_ => None,
	};

	// Keep the previous value when the session didn't change or couldn't be fetched.
	state.session_index = new_session.or(state.session_index);

	// V3 feature detection on session change. The flag is monotonic: once set it is
	// never rechecked nor unset.
	if !state.v3_ever_seen {
		if let Some(session_index) = new_session {
			state.v3_ever_seen = check_v3_feature(sender, activated.hash, session_index).await;
		}
	}

	// Proactive PVF preparation
	maybe_prepare_validation(
		sender,
		keystore.clone(),
		validation_host,
		activated,
		&mut state.pvf_prep,
		new_session,
	)
	.await;
}
662
/// On a session change, reset PVF-preparation bookkeeping and re-evaluate whether
/// this node is a next-session authority; then, on every activated leaf, kick off
/// preparation of PVFs for freshly backed candidates when we are one.
async fn maybe_prepare_validation<Sender>(
	sender: &mut Sender,
	keystore: KeystorePtr,
	validation_backend: &mut impl ValidationBackend,
	leaf: ActivatedLeaf,
	pvf_prep: &mut PvfPrepState,
	new_session: Option<SessionIndex>,
) where
	Sender: SubsystemSender<RuntimeApiMessage>,
{
	if let Some(new_session_index) = new_session {
		// Reset the prepared-hashes cache on session change.
		pvf_prep.already_prepared_code_hashes.clear();
		pvf_prep.is_next_session_authority =
			check_next_session_authority(sender, keystore, leaf.hash, new_session_index).await;
	}

	// On every active leaf check candidates and prepare PVFs our node doesn't have yet.
	if pvf_prep.is_next_session_authority {
		let code_hashes = prepare_pvfs_for_backed_candidates(
			sender,
			validation_backend,
			leaf.hash,
			&pvf_prep.already_prepared_code_hashes,
			pvf_prep.per_block_limit,
		)
		.await;
		// `None` means nothing was prepared; treat it as empty.
		pvf_prep.already_prepared_code_hashes.extend(code_hashes.unwrap_or_default());
	}
}
692
693async fn get_session_index<Sender>(sender: &mut Sender, relay_parent: Hash) -> Option<SessionIndex>
694where
695	Sender: SubsystemSender<RuntimeApiMessage>,
696{
697	let Ok(Ok(session_index)) =
698		util::request_session_index_for_child(relay_parent, sender).await.await
699	else {
700		gum::warn!(
701			target: LOG_TARGET,
702			?relay_parent,
703			"cannot fetch session index from runtime API",
704		);
705		return None;
706	};
707
708	Some(session_index)
709}
710
// Returns true if the node is an authority in the next session.
async fn check_next_session_authority<Sender>(
	sender: &mut Sender,
	keystore: KeystorePtr,
	relay_parent: Hash,
	session_index: SessionIndex,
) -> bool
where
	Sender: SubsystemSender<RuntimeApiMessage>,
{
	// In spite of the function name, this requests past, present and future authorities.
	// It's ok to still prepare PVFs in other cases, but better to request only future ones.
	let Ok(Ok(authorities)) = util::request_authorities(relay_parent, sender).await.await else {
		gum::warn!(
			target: LOG_TARGET,
			?relay_parent,
			"cannot fetch authorities from runtime API",
		);
		return false;
	};

	// We need to exclude at least the current session authorities from the previous request.
	let Ok(Ok(Some(session_info))) =
		util::request_session_info(relay_parent, session_index, sender).await.await
	else {
		gum::warn!(
			target: LOG_TARGET,
			?relay_parent,
			"cannot fetch session info from runtime API",
		);
		return false;
	};

	// Do we hold the authority-discovery key of any past/present/future authority?
	let is_past_present_or_future_authority = authorities
		.iter()
		.any(|v| keystore.has_keys(&[(v.to_raw_vec(), AuthorityDiscoveryId::ID)]));

	// We could've checked discovery_keys but on Kusama validators.len() < discovery_keys.len().
	let is_present_validator = session_info
		.validators
		.iter()
		.any(|v| keystore.has_keys(&[(v.to_raw_vec(), ValidatorId::ID)]));

	// There is still a chance to be a previous session authority, but this extra work does not
	// affect the finalization.
	is_past_present_or_future_authority && !is_present_validator
}
758
759// Sends PVF with unknown code hashes to the validation host returning the list of code hashes sent.
760async fn prepare_pvfs_for_backed_candidates<Sender>(
761	sender: &mut Sender,
762	validation_backend: &mut impl ValidationBackend,
763	relay_parent: Hash,
764	already_prepared: &HashSet<ValidationCodeHash>,
765	per_block_limit: usize,
766) -> Option<Vec<ValidationCodeHash>>
767where
768	Sender: SubsystemSender<RuntimeApiMessage>,
769{
770	let Ok(Ok(events)) = util::request_candidate_events(relay_parent, sender).await.await else {
771		gum::warn!(
772			target: LOG_TARGET,
773			?relay_parent,
774			"cannot fetch candidate events from runtime API",
775		);
776		return None;
777	};
778	let code_hashes = events
779		.into_iter()
780		.filter_map(|e| match e {
781			CandidateEvent::CandidateBacked(receipt, ..) => {
782				let h = receipt.descriptor.validation_code_hash();
783				if already_prepared.contains(&h) {
784					None
785				} else {
786					Some(h)
787				}
788			},
789			_ => None,
790		})
791		.take(per_block_limit)
792		.collect::<Vec<_>>();
793
794	let Ok(executor_params) = util::executor_params_at_relay_parent(relay_parent, sender).await
795	else {
796		gum::warn!(
797			target: LOG_TARGET,
798			?relay_parent,
799			"cannot fetch executor params for the session",
800		);
801		return None;
802	};
803	let timeout = pvf_prep_timeout(&executor_params, PvfPrepKind::Prepare);
804
805	let mut active_pvfs = vec![];
806	let mut processed_code_hashes = vec![];
807	for code_hash in code_hashes {
808		let Ok(Ok(Some(validation_code))) =
809			util::request_validation_code_by_hash(relay_parent, code_hash, sender)
810				.await
811				.await
812		else {
813			gum::warn!(
814				target: LOG_TARGET,
815				?relay_parent,
816				?code_hash,
817				"cannot fetch validation code hash from runtime API",
818			);
819			continue;
820		};
821
822		let Some(session_index) = get_session_index(sender, relay_parent).await else { continue };
823
824		let validation_code_bomb_limit = match util::runtime::fetch_validation_code_bomb_limit(
825			relay_parent,
826			session_index,
827			sender,
828		)
829		.await
830		{
831			Ok(limit) => limit,
832			Err(err) => {
833				gum::warn!(
834					target: LOG_TARGET,
835					?relay_parent,
836					?err,
837					"cannot fetch validation code bomb limit from runtime API",
838				);
839				continue;
840			},
841		};
842
843		let pvf = PvfPrepData::from_code(
844			validation_code.0,
845			executor_params.clone(),
846			timeout,
847			PrepareJobKind::Prechecking,
848			validation_code_bomb_limit,
849		);
850
851		active_pvfs.push(pvf);
852		processed_code_hashes.push(code_hash);
853	}
854
855	if active_pvfs.is_empty() {
856		return None;
857	}
858
859	if let Err(err) = validation_backend.heads_up(active_pvfs).await {
860		gum::warn!(
861			target: LOG_TARGET,
862			?relay_parent,
863			?err,
864			"cannot prepare PVF for the next session",
865		);
866		return None;
867	};
868
869	gum::debug!(
870		target: LOG_TARGET,
871		?relay_parent,
872		?processed_code_hashes,
873		"Prepared PVF for the next session",
874	);
875
876	Some(processed_code_hashes)
877}
878
879async fn update_active_leaves_validation_backend<Sender>(
880	sender: &mut Sender,
881	validation_backend: &mut impl ValidationBackend,
882	update: ActiveLeavesUpdate,
883) where
884	Sender: SubsystemSender<ChainApiMessage> + SubsystemSender<RuntimeApiMessage>,
885{
886	let ancestors = if let Some(ref activated) = update.activated {
887		get_block_ancestors(sender, activated.hash).await
888	} else {
889		vec![]
890	};
891	if let Err(err) = validation_backend.update_active_leaves(update, ancestors).await {
892		gum::warn!(
893			target: LOG_TARGET,
894			?err,
895			"cannot update active leaves in validation backend",
896		);
897	};
898}
899
/// Get list of still valid scheduling parents for the given leaf.
///
/// Returns an empty vector on any failure (logged as a warning) rather than erroring.
///
/// TODO: This function does not take into account session boundaries, which leads to wasted effort:
/// https://github.com/paritytech/polkadot-sdk/issues/11301
async fn get_block_ancestors<Sender>(sender: &mut Sender, leaf: Hash) -> Vec<Hash>
where
	Sender: SubsystemSender<ChainApiMessage> + SubsystemSender<RuntimeApiMessage>,
{
	let Some(session_index) = get_session_index(sender, leaf).await else {
		gum::warn!(target: LOG_TARGET, ?leaf, "Failed to request session index for leaf.");
		return vec![];
	};
	// The scheduling lookahead bounds how many ancestors can still serve as
	// scheduling parents.
	let scheduling_lookahead = match fetch_scheduling_lookahead(leaf, session_index, sender).await {
		Ok(scheduling_lookahead) => scheduling_lookahead,
		// Binds the `Err` case; logged with the full result for context.
		res => {
			gum::warn!(target: LOG_TARGET, ?res, "Failed to request scheduling lookahead");
			return vec![];
		},
	};

	let (tx, rx) = oneshot::channel();
	sender
		.send_message(ChainApiMessage::Ancestors {
			hash: leaf,
			// Subtract 1 from the claim queue length, as it includes current `scheduling_parent`.
			k: scheduling_lookahead.saturating_sub(1) as usize,
			response_channel: tx,
		})
		.await;
	match rx.await {
		Ok(Ok(x)) => x,
		// Covers both a dropped channel and a Chain API error.
		res => {
			gum::warn!(target: LOG_TARGET, ?res, "cannot request ancestors");
			vec![]
		},
	}
}
937
/// Marker error: a runtime API request failed (dropped channel or API error).
/// Details are logged at the request site, see [`runtime_api_request`].
struct RuntimeRequestFailed;
939
940async fn runtime_api_request<T, Sender>(
941	sender: &mut Sender,
942	relay_parent: Hash,
943	request: RuntimeApiRequest,
944	receiver: oneshot::Receiver<Result<T, RuntimeApiError>>,
945) -> Result<T, RuntimeRequestFailed>
946where
947	Sender: SubsystemSender<RuntimeApiMessage>,
948{
949	sender
950		.send_message(RuntimeApiMessage::Request(relay_parent, request).into())
951		.await;
952
953	receiver
954		.await
955		.map_err(|_| {
956			gum::debug!(target: LOG_TARGET, ?relay_parent, "Runtime API request dropped");
957
958			RuntimeRequestFailed
959		})
960		.and_then(|res| {
961			res.map_err(|e| {
962				gum::debug!(
963					target: LOG_TARGET,
964					?relay_parent,
965					err = ?e,
966					"Runtime API request internal error"
967				);
968
969				RuntimeRequestFailed
970			})
971		})
972}
973
974async fn request_validation_code_by_hash<Sender>(
975	sender: &mut Sender,
976	relay_parent: Hash,
977	validation_code_hash: ValidationCodeHash,
978) -> Result<Option<ValidationCode>, RuntimeRequestFailed>
979where
980	Sender: SubsystemSender<RuntimeApiMessage>,
981{
982	let (tx, rx) = oneshot::channel();
983	runtime_api_request(
984		sender,
985		relay_parent,
986		RuntimeApiRequest::ValidationCodeByHash(validation_code_hash, tx),
987		rx,
988	)
989	.await
990}
991
992async fn precheck_pvf<Sender>(
993	sender: &mut Sender,
994	mut validation_backend: impl ValidationBackend,
995	relay_parent: Hash,
996	validation_code_hash: ValidationCodeHash,
997	validation_code_bomb_limit: u32,
998) -> PreCheckOutcome
999where
1000	Sender: SubsystemSender<RuntimeApiMessage>,
1001{
1002	let validation_code =
1003		match request_validation_code_by_hash(sender, relay_parent, validation_code_hash).await {
1004			Ok(Some(code)) => code,
1005			_ => {
1006				// The reasoning why this is "failed" and not invalid is because we assume that
1007				// during pre-checking voting the relay-chain will pin the code. In case the code
1008				// actually is not there, we issue failed since this looks more like a bug.
1009				gum::warn!(
1010					target: LOG_TARGET,
1011					?relay_parent,
1012					?validation_code_hash,
1013					"precheck: requested validation code is not found on-chain!",
1014				);
1015				return PreCheckOutcome::Failed;
1016			},
1017		};
1018
1019	let executor_params = if let Ok(executor_params) =
1020		util::executor_params_at_relay_parent(relay_parent, sender).await
1021	{
1022		gum::debug!(
1023			target: LOG_TARGET,
1024			?relay_parent,
1025			?validation_code_hash,
1026			"precheck: acquired executor params for the session: {:?}",
1027			executor_params,
1028		);
1029		executor_params
1030	} else {
1031		gum::warn!(
1032			target: LOG_TARGET,
1033			?relay_parent,
1034			?validation_code_hash,
1035			"precheck: failed to acquire executor params for the session, thus voting against.",
1036		);
1037		return PreCheckOutcome::Invalid;
1038	};
1039
1040	let timeout = pvf_prep_timeout(&executor_params, PvfPrepKind::Precheck);
1041
1042	let pvf = PvfPrepData::from_code(
1043		validation_code.0,
1044		executor_params,
1045		timeout,
1046		PrepareJobKind::Prechecking,
1047		validation_code_bomb_limit,
1048	);
1049
1050	match validation_backend.precheck_pvf(pvf).await {
1051		Ok(_) => PreCheckOutcome::Valid,
1052		Err(prepare_err) => {
1053			if prepare_err.is_deterministic() {
1054				PreCheckOutcome::Invalid
1055			} else {
1056				PreCheckOutcome::Failed
1057			}
1058		},
1059	}
1060}
1061
/// Execute a PVF and validate the candidate's output.
///
/// Assumes all pre-validation ([`pre_validate_candidate`]) has already passed.
/// Handles:
/// 1. PVF execution (backing: single attempt; approval/dispute: with retry)
/// 2. Post-validation: para_head hash, commitments hash
/// 3. Backing-only post-validation: UMP signal validation against claim queue
///
/// Returns `Err(ValidationFailed)` only for internal/environmental failures where the validator
/// should abstain; problems with the candidate itself are reported as
/// `Ok(ValidationResult::Invalid(..))`.
async fn validate_candidate(
	mut validation_backend: impl ValidationBackend + Send,
	persisted_validation_data: PersistedValidationData,
	validation_code: ValidationCode,
	candidate_receipt: CandidateReceipt,
	pov: Arc<PoV>,
	executor_params: ExecutorParams,
	exec_kind: PvfExecKind,
	metrics: &Metrics,
	// NOTE(review): forwarded verbatim into `ValidationContext`; presumably flags that a v3
	// candidate descriptor has been observed — confirm against the PVF host's usage.
	v3_seen: bool,
	pre: PreValidationOutput,
) -> Result<ValidationResult, ValidationFailed> {
	let _timer = metrics.time_validate_candidate_exhaustive();
	let para_id = candidate_receipt.descriptor.para_id();
	let candidate_hash = candidate_receipt.hash();

	gum::debug!(
		target: LOG_TARGET,
		?candidate_hash,
		?para_id,
		"About to validate a candidate.",
	);

	// `Arc` so the validation context and the final return value can share it without copying.
	let persisted_validation_data = Arc::new(persisted_validation_data);

	// Create the validation context shared by both backing and approval/dispute paths
	let validation_context = ValidationContext {
		candidate_receipt: candidate_receipt.clone(),
		pvd: persisted_validation_data.clone(),
		pov: pov.clone(),
		executor_params: executor_params.clone(),
		exec_timeout: pvf_exec_timeout(&executor_params, exec_kind.into()),
		v3_seen,
	};

	let result = match exec_kind {
		// Retry is disabled to reduce the chance of nondeterministic blocks getting backed and
		// honest backers getting slashed.
		PvfExecKind::Backing(_) | PvfExecKind::BackingSystemParas(_) => {
			let prep_timeout = pvf_prep_timeout(&executor_params, PvfPrepKind::Prepare);
			let pvf = PvfPrepData::from_code(
				validation_code.0,
				executor_params,
				prep_timeout,
				PrepareJobKind::Compilation,
				pre.validation_code_bomb_limit,
			);

			validation_backend.validate_candidate(pvf, validation_context, exec_kind).await
		},
		// Approval/dispute execution goes through the retrying path (see
		// `validate_candidate_with_retry`).
		PvfExecKind::Approval | PvfExecKind::Dispute => {
			validation_backend
				.validate_candidate_with_retry(
					validation_code.0,
					validation_context,
					PVF_APPROVAL_EXECUTION_RETRY_DELAY,
					exec_kind,
					pre.validation_code_bomb_limit,
				)
				.await
		},
	};

	if let Err(ref error) = result {
		gum::info!(target: LOG_TARGET, ?para_id, ?candidate_hash, ?error, "Failed to validate candidate");
	}

	// Map backend errors into either a definitive invalidity verdict or an abstention
	// (`ValidationFailed`); on success, run the post-validation checks.
	match result {
		Err(ValidationError::Internal(e)) => {
			gum::warn!(
				target: LOG_TARGET,
				?para_id,
				?candidate_hash,
				?e,
				"An internal error occurred during validation, will abstain from voting",
			);
			Err(ValidationFailed(e.to_string()))
		},
		Err(ValidationError::Invalid(WasmInvalidCandidate::HardTimeout)) => {
			Ok(ValidationResult::Invalid(InvalidCandidate::Timeout))
		},
		Err(ValidationError::Invalid(WasmInvalidCandidate::WorkerReportedInvalid(e))) => {
			Ok(ValidationResult::Invalid(InvalidCandidate::ExecutionError(e)))
		},
		Err(ValidationError::Invalid(WasmInvalidCandidate::PoVDecompressionFailure)) => {
			Ok(ValidationResult::Invalid(InvalidCandidate::PoVDecompressionFailure))
		},
		// Possibly-invalid errors are treated as invalid here; the retrying path has already
		// attempted re-execution where applicable.
		Err(ValidationError::PossiblyInvalid(PossiblyInvalidError::AmbiguousWorkerDeath)) => {
			Ok(ValidationResult::Invalid(InvalidCandidate::ExecutionError(
				"ambiguous worker death".to_string(),
			)))
		},
		Err(ValidationError::PossiblyInvalid(PossiblyInvalidError::JobError(err))) => {
			Ok(ValidationResult::Invalid(InvalidCandidate::ExecutionError(err)))
		},
		Err(ValidationError::PossiblyInvalid(PossiblyInvalidError::RuntimeConstruction(err))) => {
			Ok(ValidationResult::Invalid(InvalidCandidate::ExecutionError(err)))
		},
		Err(ValidationError::PossiblyInvalid(err @ PossiblyInvalidError::CorruptedArtifact)) => {
			Ok(ValidationResult::Invalid(InvalidCandidate::ExecutionError(err.to_string())))
		},

		Err(ValidationError::PossiblyInvalid(PossiblyInvalidError::AmbiguousJobDeath(err))) => {
			Ok(ValidationResult::Invalid(InvalidCandidate::ExecutionError(format!(
				"ambiguous job death: {err}"
			))))
		},
		Err(ValidationError::Preparation(e)) => {
			gum::warn!(
				target: LOG_TARGET,
				?para_id,
				?e,
				"Deterministic error occurred during preparation (should have been ruled out by pre-checking phase)",
			);
			Err(ValidationFailed(e.to_string()))
		},
		Err(e @ ValidationError::ExecutionDeadline) => {
			gum::warn!(
				target: LOG_TARGET,
				?para_id,
				?e,
				"Job assigned too late, execution queue probably overloaded",
			);
			Err(ValidationFailed(e.to_string()))
		},
		Ok(res) => {
			// Post-validation check 1: the produced head data must hash to the para head claimed
			// in the candidate descriptor.
			if res.head_data.hash() != candidate_receipt.descriptor.para_head() {
				gum::info!(target: LOG_TARGET, ?para_id, "Invalid candidate (para_head)");
				Ok(ValidationResult::Invalid(InvalidCandidate::ParaHeadHashMismatch))
			} else {
				let committed_candidate_receipt = CommittedCandidateReceipt {
					descriptor: candidate_receipt.descriptor.clone(),
					commitments: CandidateCommitments {
						head_data: res.head_data,
						upward_messages: res.upward_messages,
						horizontal_messages: res.horizontal_messages,
						new_validation_code: res.new_validation_code,
						processed_downward_messages: res.processed_downward_messages,
						hrmp_watermark: res.hrmp_watermark,
					},
				};

				// Post-validation check 2: the commitments produced by execution must match the
				// commitments hash in the receipt.
				if candidate_receipt.commitments_hash !=
					committed_candidate_receipt.commitments.hash()
				{
					gum::info!(
						target: LOG_TARGET,
						?para_id,
						?candidate_hash,
						"Invalid candidate (commitments hash)"
					);

					gum::trace!(
						target: LOG_TARGET,
						?para_id,
						?candidate_hash,
						produced_commitments = ?committed_candidate_receipt.commitments,
						"Invalid candidate commitments"
					);

					// If validation produced a new set of commitments, we treat the candidate as
					// invalid.
					Ok(ValidationResult::Invalid(InvalidCandidate::CommitmentsHashMismatch))
				} else {
					// Backing-only: validate UMP signals against the claim queue.
					// (`pre.claim_queue` is only populated for the backing path.)
					if let Some(claim_queue) = &pre.claim_queue {
						if let Err(err) = committed_candidate_receipt
							.parse_ump_signals(&transpose_claim_queue(claim_queue.0.clone()))
						{
							gum::warn!(
								target: LOG_TARGET,
								candidate_hash = ?candidate_receipt.hash(),
								"Invalid UMP signals: {}",
								err
							);
							return Ok(ValidationResult::Invalid(
								InvalidCandidate::InvalidUMPSignals(err),
							));
						}
					}

					Ok(ValidationResult::Valid(
						committed_candidate_receipt.commitments,
						(*persisted_validation_data).clone(),
					))
				}
			}
		},
	}
}
1259
#[async_trait]
trait ValidationBackend {
	/// Tries executing a PVF a single time (no retries).
	async fn validate_candidate(
		&mut self,
		pvf: PvfPrepData,
		validation_context: ValidationContext,
		exec_kind: PvfExecKind,
	) -> Result<WasmValidationResult, ValidationError>;

	/// Tries executing a PVF. Will retry once if an error is encountered that may have
	/// been transient.
	///
	/// NOTE: Should retry only on errors that are a result of execution itself, and not of
	/// preparation.
	async fn validate_candidate_with_retry(
		&mut self,
		code: Vec<u8>,
		validation_context: ValidationContext,
		retry_delay: Duration,
		exec_kind: PvfExecKind,
		validation_code_bomb_limit: u32,
	) -> Result<WasmValidationResult, ValidationError> {
		let exec_timeout = validation_context.exec_timeout;
		let executor_params = validation_context.executor_params.clone();
		let prep_timeout = pvf_prep_timeout(&executor_params, PvfPrepKind::Prepare);
		// Construct the PVF a single time, since it is an expensive operation. Cloning it is cheap.
		let pvf = PvfPrepData::from_code(
			code,
			executor_params,
			prep_timeout,
			PrepareJobKind::Compilation,
			validation_code_bomb_limit,
		);
		// We keep track of the total time that has passed and stop retrying if we are taking too
		// long.
		let total_time_start = Instant::now();

		// First attempt; the retry loop below only runs if this fails.
		let mut validation_result = self
			.validate_candidate(pvf.clone(), validation_context.clone(), exec_kind)
			.await;
		if validation_result.is_ok() {
			return validation_result;
		}

		// Decrements the given retry counter, or breaks out of the retry loop once it is
		// exhausted.
		macro_rules! break_if_no_retries_left {
			($counter:ident) => {
				if $counter > 0 {
					$counter -= 1;
				} else {
					break;
				}
			};
		}

		// Allow limited retries for each kind of error.
		let mut num_death_retries_left = 1;
		let mut num_job_error_retries_left = 1;
		let mut num_internal_retries_left = 1;
		let mut num_execution_error_retries_left = 1;
		loop {
			// Stop retrying if we exceeded the timeout.
			if total_time_start.elapsed() + retry_delay > exec_timeout {
				break;
			}
			let mut retry_immediately = false;
			// Decide whether the last error warrants another attempt; deterministic outcomes
			// (success, invalid, preparation error, deadline) never do.
			match validation_result {
				Err(ValidationError::PossiblyInvalid(
					PossiblyInvalidError::AmbiguousWorkerDeath |
					PossiblyInvalidError::AmbiguousJobDeath(_),
				)) => break_if_no_retries_left!(num_death_retries_left),

				Err(ValidationError::PossiblyInvalid(PossiblyInvalidError::JobError(_))) => {
					break_if_no_retries_left!(num_job_error_retries_left)
				},

				Err(ValidationError::Internal(_)) => {
					break_if_no_retries_left!(num_internal_retries_left)
				},

				Err(ValidationError::PossiblyInvalid(
					PossiblyInvalidError::RuntimeConstruction(_) |
					PossiblyInvalidError::CorruptedArtifact,
				)) => {
					break_if_no_retries_left!(num_execution_error_retries_left);
					// A preparation error here is propagated via `?` and aborts further retries.
					self.precheck_pvf(pvf.clone()).await?;
					// In this case the error is deterministic
					// And a retry forces the ValidationBackend
					// to re-prepare the artifact so
					// there is no need to wait before the retry
					retry_immediately = true;
				},

				Ok(_) |
				Err(
					ValidationError::Invalid(_) |
					ValidationError::Preparation(_) |
					ValidationError::ExecutionDeadline,
				) => break,
			}

			// If we got a possibly transient error, retry once after a brief delay, on the
			// assumption that the conditions that caused this error may have resolved on their own.
			{
				// In case of many transient errors it is necessary to wait a little bit
				// for the error to be probably resolved
				if !retry_immediately {
					futures_timer::Delay::new(retry_delay).await;
				}

				// Shrink the execution timeout by the time already spent, so retries cannot
				// exceed the overall budget.
				let new_timeout = exec_timeout.saturating_sub(total_time_start.elapsed());

				gum::warn!(
					target: LOG_TARGET,
					?pvf,
					?new_timeout,
					"Re-trying failed candidate validation due to possible transient error: {:?}",
					validation_result
				);

				// Update the validation context with the new timeout
				let mut retry_context = validation_context.clone();
				retry_context.exec_timeout = new_timeout;

				validation_result =
					self.validate_candidate(pvf.clone(), retry_context, exec_kind).await;
			}
		}

		validation_result
	}

	/// Runs preparation pre-checking of the given PVF, without executing it.
	async fn precheck_pvf(&mut self, pvf: PvfPrepData) -> Result<(), PrepareError>;

	/// Informs the backend about the set of currently active PVFs.
	async fn heads_up(&mut self, active_pvfs: Vec<PvfPrepData>) -> Result<(), String>;

	/// Inform the backend about active leaf changes
	///
	/// Ancestors provided should match the still valid scheduling parents (implicit view) as of the
	/// activated leaf. This is used for pruning queued jobs which became obsolete.
	async fn update_active_leaves(
		&mut self,
		update: ActiveLeavesUpdate,
		ancestors: Vec<Hash>,
	) -> Result<(), String>;
}
1406
1407#[async_trait]
1408impl ValidationBackend for ValidationHost {
1409	/// Tries executing a PVF a single time (no retries).
1410	async fn validate_candidate(
1411		&mut self,
1412		pvf: PvfPrepData,
1413		validation_context: ValidationContext,
1414		exec_kind: PvfExecKind,
1415	) -> Result<WasmValidationResult, ValidationError> {
1416		let (tx, rx) = oneshot::channel();
1417		if let Err(err) =
1418			self.execute_pvf(pvf, validation_context, exec_kind.into(), exec_kind, tx).await
1419		{
1420			return Err(InternalValidationError::HostCommunication(format!(
1421				"cannot send pvf to the validation host, it might have shut down: {:?}",
1422				err
1423			))
1424			.into());
1425		}
1426
1427		rx.await.map_err(|_| {
1428			ValidationError::from(InternalValidationError::HostCommunication(
1429				"validation was cancelled".into(),
1430			))
1431		})?
1432	}
1433
1434	async fn precheck_pvf(&mut self, pvf: PvfPrepData) -> Result<(), PrepareError> {
1435		let (tx, rx) = oneshot::channel();
1436		if let Err(err) = self.precheck_pvf(pvf, tx).await {
1437			// Return an IO error if there was an error communicating with the host.
1438			return Err(PrepareError::IoErr(err));
1439		}
1440
1441		let precheck_result = rx.await.map_err(|err| PrepareError::IoErr(err.to_string()))?;
1442
1443		precheck_result
1444	}
1445
1446	async fn heads_up(&mut self, active_pvfs: Vec<PvfPrepData>) -> Result<(), String> {
1447		self.heads_up(active_pvfs).await
1448	}
1449
1450	async fn update_active_leaves(
1451		&mut self,
1452		update: ActiveLeavesUpdate,
1453		ancestors: Vec<Hash>,
1454	) -> Result<(), String> {
1455		self.update_active_leaves(update, ancestors).await
1456	}
1457}
1458
1459/// Does basic checks of a candidate. Provide the encoded PoV-block. Returns `Ok` if basic checks
1460/// are passed, `Err` otherwise.
1461fn perform_basic_checks(
1462	candidate: &CandidateDescriptor,
1463	max_pov_size: u32,
1464	pov: &PoV,
1465	validation_code_hash: &ValidationCodeHash,
1466) -> Result<(), InvalidCandidate> {
1467	let pov_hash = pov.hash();
1468
1469	let encoded_pov_size = pov.encoded_size();
1470	if encoded_pov_size > max_pov_size as usize {
1471		return Err(InvalidCandidate::ParamsTooLarge(encoded_pov_size as u64));
1472	}
1473
1474	if pov_hash != candidate.pov_hash() {
1475		return Err(InvalidCandidate::PoVHashMismatch);
1476	}
1477
1478	if *validation_code_hash != candidate.validation_code_hash() {
1479		return Err(InvalidCandidate::CodeHashMismatch);
1480	}
1481
1482	Ok(())
1483}
1484
1485/// To determine the amount of timeout time for the pvf execution.
1486///
1487/// Precheck
1488/// 	The time period after which the preparation worker is considered
1489/// unresponsive and will be killed.
1490///
1491/// Prepare
1492/// The time period after which the preparation worker is considered
1493/// unresponsive and will be killed.
1494fn pvf_prep_timeout(executor_params: &ExecutorParams, kind: PvfPrepKind) -> Duration {
1495	if let Some(timeout) = executor_params.pvf_prep_timeout(kind) {
1496		return timeout;
1497	}
1498	match kind {
1499		PvfPrepKind::Precheck => DEFAULT_PRECHECK_PREPARATION_TIMEOUT,
1500		PvfPrepKind::Prepare => DEFAULT_LENIENT_PREPARATION_TIMEOUT,
1501	}
1502}
1503
1504/// To determine the amount of timeout time for the pvf execution.
1505///
1506/// Backing subsystem
1507/// The amount of time to spend on execution during backing.
1508///
1509/// Approval subsystem
1510/// The amount of time to spend on execution during approval or disputes.
1511/// This should be much longer than the backing execution timeout to ensure that in the
1512/// absence of extremely large disparities between hardware, blocks that pass backing are
1513/// considered executable by approval checkers or dispute participants.
1514fn pvf_exec_timeout(executor_params: &ExecutorParams, kind: RuntimePvfExecKind) -> Duration {
1515	if let Some(timeout) = executor_params.pvf_exec_timeout(kind) {
1516		return timeout;
1517	}
1518	match kind {
1519		RuntimePvfExecKind::Backing => DEFAULT_BACKING_EXECUTION_TIMEOUT,
1520		RuntimePvfExecKind::Approval => DEFAULT_APPROVAL_EXECUTION_TIMEOUT,
1521	}
1522}