pezkuwi_availability_recovery/
lib.rs

1// Copyright (C) Parity Technologies (UK) Ltd. and Dijital Kurdistan Tech Institute
2// This file is part of Pezkuwi.
3
4// Pezkuwi is free software: you can redistribute it and/or modify
5// it under the terms of the GNU General Public License as published by
6// the Free Software Foundation, either version 3 of the License, or
7// (at your option) any later version.
8
9// Pezkuwi is distributed in the hope that it will be useful,
10// but WITHOUT ANY WARRANTY; without even the implied warranty of
11// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12// GNU General Public License for more details.
13
14// You should have received a copy of the GNU General Public License
15// along with Pezkuwi.  If not, see <http://www.gnu.org/licenses/>.
16
17//! Availability Recovery Subsystem of Pezkuwi.
18
19#![warn(missing_docs)]
20
21use std::{
22	collections::{BTreeMap, VecDeque},
23	iter::Iterator,
24	num::NonZeroUsize,
25	pin::Pin,
26};
27
28use futures::{
29	channel::oneshot,
30	future::{Future, FutureExt, RemoteHandle},
31	pin_mut,
32	prelude::*,
33	sink::SinkExt,
34	stream::{FuturesUnordered, StreamExt},
35	task::{Context, Poll},
36};
37use pezsc_network::ProtocolName;
38use schnellru::{ByLength, LruMap};
39use task::{
40	FetchChunks, FetchChunksParams, FetchFull, FetchFullParams, FetchSystematicChunks,
41	FetchSystematicChunksParams,
42};
43
44use pezkuwi_erasure_coding::{
45	branches, obtain_chunks_v1, recovery_threshold, systematic_recovery_threshold,
46	Error as ErasureEncodingError,
47};
48use task::{RecoveryParams, RecoveryStrategy, RecoveryTask};
49
50use error::{log_error, Error, FatalError, Result};
51use pezkuwi_node_network_protocol::{
52	request_response::{
53		v1 as request_v1, v2 as request_v2, IncomingRequestReceiver, IsRequest, ReqProtocolNames,
54	},
55	UnifiedReputationChange as Rep,
56};
57use pezkuwi_node_subsystem::{
58	errors::RecoveryError,
59	messages::{AvailabilityRecoveryMessage, AvailabilityStoreMessage},
60	overseer, ActiveLeavesUpdate, FromOrchestra, OverseerSignal, SpawnedSubsystem,
61	SubsystemContext, SubsystemError,
62};
63use pezkuwi_node_subsystem_util::{
64	availability_chunks::availability_chunk_indices,
65	runtime::{ExtendedSessionInfo, RuntimeInfo},
66};
67use pezkuwi_pez_node_primitives::AvailableData;
68use pezkuwi_primitives::{
69	node_features, BlockNumber, CandidateHash, CandidateReceiptV2 as CandidateReceipt, ChunkIndex,
70	CoreIndex, GroupIndex, Hash, SessionIndex, ValidatorIndex,
71};
72
73mod error;
74mod futures_undead;
75mod metrics;
76mod task;
77pub use metrics::Metrics;
78
79#[cfg(test)]
80mod tests;
81
82type RecoveryResult = std::result::Result<AvailableData, RecoveryError>;
83
84const LOG_TARGET: &str = "teyrchain::availability-recovery";
85
86// Size of the LRU cache where we keep recovered data.
87const LRU_SIZE: u32 = 16;
88
89const COST_INVALID_REQUEST: Rep = Rep::CostMajor("Peer sent unparsable request");
90
/// Estimated PoV size in bytes below which we prefer fetching the full data from backers
/// (conservative, Pezkuwi for now).
pub(crate) const CONSERVATIVE_FETCH_CHUNKS_THRESHOLD: usize = 1024 * 1024;
/// Estimated PoV size in bytes below which we prefer fetching the full data from backers
/// (Kusama and all testnets).
pub const FETCH_CHUNKS_THRESHOLD: usize = 4 * 1024 * 1024;
95
/// The strategy we use to recover the PoV.
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum RecoveryStrategyKind {
	/// We try the backing group first if the estimated PoV size is lower than the given
	/// threshold (in bytes), then fall back to validator chunks.
	BackersFirstIfSizeLower(usize),
	/// We try the backing group first if the estimated PoV size is lower than the given
	/// threshold (in bytes), then fall back to systematic chunks. Regular chunk recovery is used
	/// as a last resort.
	BackersFirstIfSizeLowerThenSystematicChunks(usize),

	// The variants below are only helpful for integration tests. They are only selected through
	// test/benchmark-only constructors, hence the `dead_code` allowances.
	/// We always try the backing group first, then fall back to validator chunks.
	///
	/// Only helpful for integration tests.
	#[allow(dead_code)]
	BackersFirstAlways,
	/// We always recover using validator chunks.
	///
	/// Only helpful for integration tests.
	#[allow(dead_code)]
	ChunksAlways,
	/// First try the backing group. Then systematic chunks.
	///
	/// Only helpful for integration tests.
	#[allow(dead_code)]
	BackersThenSystematicChunks,
	/// Always recover using systematic chunks, fall back to regular chunks.
	///
	/// Only helpful for integration tests.
	#[allow(dead_code)]
	SystematicChunks,
}
121
122/// The Availability Recovery Subsystem.
123pub struct AvailabilityRecoverySubsystem {
124	/// PoV recovery strategy to use.
125	recovery_strategy_kind: RecoveryStrategyKind,
126	// If this is true, do not request data from the availability store.
127	/// This is the useful for nodes where the
128	/// availability-store subsystem is not expected to run,
129	/// such as collators.
130	bypass_availability_store: bool,
131	/// Receiver for available data requests.
132	req_receiver: IncomingRequestReceiver<request_v1::AvailableDataFetchingRequest>,
133	/// Metrics for this subsystem.
134	metrics: Metrics,
135	/// The type of check to perform after available data was recovered.
136	post_recovery_check: PostRecoveryCheck,
137	/// Full protocol name for ChunkFetchingV1.
138	req_v1_protocol_name: ProtocolName,
139	/// Full protocol name for ChunkFetchingV2.
140	req_v2_protocol_name: ProtocolName,
141}
142
/// Verification applied to `AvailableData` once it has been recovered.
#[derive(Debug, Clone, PartialEq)]
enum PostRecoveryCheck {
	/// Re-encode the data and compare against the erasure root (used by validators).
	Reencode,
	/// Only compare the PoV hash (used by collators only).
	PovHash,
}
151
152/// Expensive erasure coding computations that we want to run on a blocking thread.
153enum ErasureTask {
154	/// Reconstructs `AvailableData` from chunks given `n_validators`.
155	Reconstruct(
156		usize,
157		BTreeMap<ChunkIndex, Vec<u8>>,
158		oneshot::Sender<std::result::Result<AvailableData, ErasureEncodingError>>,
159	),
160	/// Re-encode `AvailableData` into erasure chunks in order to verify the provided root hash of
161	/// the Merkle tree.
162	Reencode(usize, Hash, AvailableData, oneshot::Sender<Option<AvailableData>>),
163}
164
165/// Re-encode the data into erasure chunks in order to verify
166/// the root hash of the provided Merkle tree, which is built
167/// on-top of the encoded chunks.
168///
169/// This (expensive) check is necessary, as otherwise we can't be sure that some chunks won't have
170/// been tampered with by the backers, which would result in some validators considering the data
171/// valid and some invalid as having fetched different set of chunks. The checking of the Merkle
172/// proof for individual chunks only gives us guarantees, that we have fetched a chunk belonging to
173/// a set the backers have committed to.
174///
175/// NOTE: It is fine to do this check with already decoded data, because if the decoding failed for
176/// some validators, we can be sure that chunks have been tampered with (by the backers) or the
177/// data was invalid to begin with. In the former case, validators fetching valid chunks will see
178/// invalid data as well, because the root won't match. In the latter case the situation is the
179/// same for anyone anyways.
180fn reconstructed_data_matches_root(
181	n_validators: usize,
182	expected_root: &Hash,
183	data: &AvailableData,
184	metrics: &Metrics,
185) -> bool {
186	let _timer = metrics.time_reencode_chunks();
187
188	let chunks = match obtain_chunks_v1(n_validators, data) {
189		Ok(chunks) => chunks,
190		Err(e) => {
191			gum::debug!(
192				target: LOG_TARGET,
193				err = ?e,
194				"Failed to obtain chunks",
195			);
196			return false;
197		},
198	};
199
200	let branches = branches(&chunks);
201
202	branches.root() == *expected_root
203}
204
205/// Accumulate all awaiting sides for some particular `AvailableData`.
206struct RecoveryHandle {
207	candidate_hash: CandidateHash,
208	remote: RemoteHandle<RecoveryResult>,
209	awaiting: Vec<oneshot::Sender<RecoveryResult>>,
210}
211
212impl Future for RecoveryHandle {
213	type Output = Option<(CandidateHash, RecoveryResult)>;
214
215	fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
216		let mut indices_to_remove = Vec::new();
217		for (i, awaiting) in self.awaiting.iter_mut().enumerate().rev() {
218			if let Poll::Ready(()) = awaiting.poll_canceled(cx) {
219				indices_to_remove.push(i);
220			}
221		}
222
223		// these are reverse order, so remove is fine.
224		for index in indices_to_remove {
225			gum::debug!(
226				target: LOG_TARGET,
227				candidate_hash = ?self.candidate_hash,
228				"Receiver for available data dropped.",
229			);
230
231			self.awaiting.swap_remove(index);
232		}
233
234		if self.awaiting.is_empty() {
235			gum::debug!(
236				target: LOG_TARGET,
237				candidate_hash = ?self.candidate_hash,
238				"All receivers for available data dropped.",
239			);
240
241			return Poll::Ready(None);
242		}
243
244		let remote = &mut self.remote;
245		futures::pin_mut!(remote);
246		let result = futures::ready!(remote.poll(cx));
247
248		for awaiting in self.awaiting.drain(..) {
249			let _ = awaiting.send(result.clone());
250		}
251
252		Poll::Ready(Some((self.candidate_hash, result)))
253	}
254}
255
256/// Cached result of an availability recovery operation.
257#[derive(Debug, Clone)]
258enum CachedRecovery {
259	/// Availability was successfully retrieved before.
260	Valid(AvailableData),
261	/// Availability was successfully retrieved before, but was found to be invalid.
262	Invalid,
263}
264
265impl CachedRecovery {
266	/// Convert back to	`Result` to deliver responses.
267	fn into_result(self) -> RecoveryResult {
268		match self {
269			Self::Valid(d) => Ok(d),
270			Self::Invalid => Err(RecoveryError::Invalid),
271		}
272	}
273}
274
275impl TryFrom<RecoveryResult> for CachedRecovery {
276	type Error = ();
277	fn try_from(o: RecoveryResult) -> std::result::Result<CachedRecovery, Self::Error> {
278		match o {
279			Ok(d) => Ok(Self::Valid(d)),
280			Err(RecoveryError::Invalid) => Ok(Self::Invalid),
281			// We don't want to cache unavailable state, as that state might change, so if
282			// requested again we want to try again!
283			Err(RecoveryError::Unavailable) => Err(()),
284			Err(RecoveryError::ChannelClosed) => Err(()),
285		}
286	}
287}
288
289struct State {
290	/// Each recovery task is implemented as its own async task,
291	/// and these handles are for communicating with them.
292	ongoing_recoveries: FuturesUnordered<RecoveryHandle>,
293
294	/// A recent block hash for which state should be available.
295	live_block: (BlockNumber, Hash),
296
297	/// An LRU cache of recently recovered data.
298	availability_lru: LruMap<CandidateHash, CachedRecovery>,
299
300	/// Cached runtime info.
301	runtime_info: RuntimeInfo,
302}
303
304impl Default for State {
305	fn default() -> Self {
306		Self {
307			ongoing_recoveries: FuturesUnordered::new(),
308			live_block: (0, Hash::default()),
309			availability_lru: LruMap::new(ByLength::new(LRU_SIZE)),
310			runtime_info: RuntimeInfo::new(None),
311		}
312	}
313}
314
315#[overseer::subsystem(AvailabilityRecovery, error=SubsystemError, prefix=self::overseer)]
316impl<Context> AvailabilityRecoverySubsystem {
317	fn start(self, ctx: Context) -> SpawnedSubsystem {
318		let future = self
319			.run(ctx)
320			.map_err(|e| SubsystemError::with_origin("availability-recovery", e))
321			.boxed();
322		SpawnedSubsystem { name: "availability-recovery-subsystem", future }
323	}
324}
325
326/// Handles a signal from the overseer.
327/// Returns true if subsystem receives a deadly signal.
328async fn handle_signal(state: &mut State, signal: OverseerSignal) -> bool {
329	match signal {
330		OverseerSignal::Conclude => true,
331		OverseerSignal::ActiveLeaves(ActiveLeavesUpdate { activated, .. }) => {
332			// if activated is non-empty, set state.live_block to the highest block in `activated`
333			if let Some(activated) = activated {
334				if activated.number > state.live_block.0 {
335					state.live_block = (activated.number, activated.hash)
336				}
337			}
338
339			false
340		},
341		OverseerSignal::BlockFinalized(_, _) => false,
342	}
343}
344
345/// Machinery around launching recovery tasks into the background.
346#[overseer::contextbounds(AvailabilityRecovery, prefix = self::overseer)]
347async fn launch_recovery_task<Context>(
348	state: &mut State,
349	ctx: &mut Context,
350	response_sender: oneshot::Sender<RecoveryResult>,
351	recovery_strategies: VecDeque<Box<dyn RecoveryStrategy<<Context as SubsystemContext>::Sender>>>,
352	params: RecoveryParams,
353) -> Result<()> {
354	let candidate_hash = params.candidate_hash;
355	let recovery_task = RecoveryTask::new(ctx.sender().clone(), params, recovery_strategies);
356
357	let (remote, remote_handle) = recovery_task.run().remote_handle();
358
359	state.ongoing_recoveries.push(RecoveryHandle {
360		candidate_hash,
361		remote: remote_handle,
362		awaiting: vec![response_sender],
363	});
364
365	ctx.spawn("recovery-task", Box::pin(remote))
366		.map_err(|err| Error::SpawnTask(err))
367}
368
369/// Handles an availability recovery request.
370#[overseer::contextbounds(AvailabilityRecovery, prefix = self::overseer)]
371async fn handle_recover<Context>(
372	state: &mut State,
373	ctx: &mut Context,
374	receipt: CandidateReceipt,
375	session_index: SessionIndex,
376	backing_group: Option<GroupIndex>,
377	response_sender: oneshot::Sender<RecoveryResult>,
378	metrics: &Metrics,
379	erasure_task_tx: futures::channel::mpsc::Sender<ErasureTask>,
380	recovery_strategy_kind: RecoveryStrategyKind,
381	bypass_availability_store: bool,
382	post_recovery_check: PostRecoveryCheck,
383	maybe_core_index: Option<CoreIndex>,
384	req_v1_protocol_name: ProtocolName,
385	req_v2_protocol_name: ProtocolName,
386) -> Result<()> {
387	let candidate_hash = receipt.hash();
388
389	if let Some(result) =
390		state.availability_lru.get(&candidate_hash).cloned().map(|v| v.into_result())
391	{
392		return response_sender.send(result).map_err(|_| Error::CanceledResponseSender);
393	}
394
395	if let Some(i) =
396		state.ongoing_recoveries.iter_mut().find(|i| i.candidate_hash == candidate_hash)
397	{
398		i.awaiting.push(response_sender);
399		return Ok(());
400	}
401
402	let session_info_res = state
403		.runtime_info
404		.get_session_info_by_index(ctx.sender(), state.live_block.1, session_index)
405		.await;
406
407	match session_info_res {
408		Ok(ExtendedSessionInfo { session_info, node_features, .. }) => {
409			let mut backer_group = None;
410			let n_validators = session_info.validators.len();
411			let systematic_threshold = systematic_recovery_threshold(n_validators)?;
412			let mut recovery_strategies: VecDeque<
413				Box<dyn RecoveryStrategy<<Context as SubsystemContext>::Sender>>,
414			> = VecDeque::with_capacity(3);
415
416			if let Some(backing_group) = backing_group {
417				if let Some(backing_validators) = session_info.validator_groups.get(backing_group) {
418					let mut small_pov_size = true;
419
420					match recovery_strategy_kind {
421						RecoveryStrategyKind::BackersFirstIfSizeLower(fetch_chunks_threshold)
422						| RecoveryStrategyKind::BackersFirstIfSizeLowerThenSystematicChunks(
423							fetch_chunks_threshold,
424						) => {
425							// Get our own chunk size to get an estimate of the PoV size.
426							let chunk_size: Result<Option<usize>> =
427								query_chunk_size(ctx, candidate_hash).await;
428							if let Ok(Some(chunk_size)) = chunk_size {
429								let pov_size_estimate = chunk_size * systematic_threshold;
430								small_pov_size = pov_size_estimate < fetch_chunks_threshold;
431
432								if small_pov_size {
433									gum::trace!(
434										target: LOG_TARGET,
435										?candidate_hash,
436										pov_size_estimate,
437										fetch_chunks_threshold,
438										"Prefer fetch from backing group",
439									);
440								}
441							} else {
442								// we have a POV limit but were not able to query the chunk size, so
443								// don't use the backing group.
444								small_pov_size = false;
445							}
446						},
447						_ => {},
448					};
449
450					match (&recovery_strategy_kind, small_pov_size) {
451						(RecoveryStrategyKind::BackersFirstAlways, _)
452						| (RecoveryStrategyKind::BackersFirstIfSizeLower(_), true)
453						| (
454							RecoveryStrategyKind::BackersFirstIfSizeLowerThenSystematicChunks(_),
455							true,
456						)
457						| (RecoveryStrategyKind::BackersThenSystematicChunks, _) => {
458							recovery_strategies.push_back(Box::new(FetchFull::new(
459								FetchFullParams { validators: backing_validators.to_vec() },
460							)))
461						},
462						_ => {},
463					};
464
465					backer_group = Some(backing_validators);
466				}
467			}
468
469			let chunk_mapping_enabled = if let Some(&true) = node_features
470				.get(usize::from(node_features::FeatureIndex::AvailabilityChunkMapping as u8))
471				.as_deref()
472			{
473				true
474			} else {
475				false
476			};
477
478			// We can only attempt systematic recovery if we received the core index of the
479			// candidate and chunk mapping is enabled.
480			if let Some(core_index) = maybe_core_index {
481				if matches!(
482					recovery_strategy_kind,
483					RecoveryStrategyKind::BackersThenSystematicChunks
484						| RecoveryStrategyKind::SystematicChunks
485						| RecoveryStrategyKind::BackersFirstIfSizeLowerThenSystematicChunks(_)
486				) && chunk_mapping_enabled
487				{
488					let chunk_indices =
489						availability_chunk_indices(node_features, n_validators, core_index)?;
490
491					let chunk_indices: VecDeque<_> = chunk_indices
492						.iter()
493						.enumerate()
494						.map(|(v_index, c_index)| {
495							(
496								*c_index,
497								ValidatorIndex(
498									u32::try_from(v_index)
499										.expect("validator count should not exceed u32"),
500								),
501							)
502						})
503						.collect();
504
505					// Only get the validators according to the threshold.
506					let validators = chunk_indices
507						.clone()
508						.into_iter()
509						.filter(|(c_index, _)| {
510							usize::try_from(c_index.0)
511								.expect("usize is at least u32 bytes on all modern targets.")
512								< systematic_threshold
513						})
514						.collect();
515
516					recovery_strategies.push_back(Box::new(FetchSystematicChunks::new(
517						FetchSystematicChunksParams {
518							validators,
519							backers: backer_group.map(|v| v.to_vec()).unwrap_or_else(|| vec![]),
520						},
521					)));
522				}
523			}
524
525			recovery_strategies.push_back(Box::new(FetchChunks::new(FetchChunksParams {
526				n_validators: session_info.validators.len(),
527			})));
528
529			let session_info = session_info.clone();
530
531			let n_validators = session_info.validators.len();
532
533			launch_recovery_task(
534				state,
535				ctx,
536				response_sender,
537				recovery_strategies,
538				RecoveryParams {
539					validator_authority_keys: session_info.discovery_keys.clone(),
540					n_validators,
541					threshold: recovery_threshold(n_validators)?,
542					systematic_threshold,
543					candidate_hash,
544					erasure_root: receipt.descriptor.erasure_root(),
545					metrics: metrics.clone(),
546					bypass_availability_store,
547					post_recovery_check,
548					pov_hash: receipt.descriptor.pov_hash(),
549					req_v1_protocol_name,
550					req_v2_protocol_name,
551					chunk_mapping_enabled,
552					erasure_task_tx,
553				},
554			)
555			.await
556		},
557		Err(_) => {
558			response_sender
559				.send(Err(RecoveryError::Unavailable))
560				.map_err(|_| Error::CanceledResponseSender)?;
561
562			Err(Error::SessionInfoUnavailable(state.live_block.1))
563		},
564	}
565}
566
567/// Queries the full `AvailableData` from av-store.
568#[overseer::contextbounds(AvailabilityRecovery, prefix = self::overseer)]
569async fn query_full_data<Context>(
570	ctx: &mut Context,
571	candidate_hash: CandidateHash,
572) -> Result<Option<AvailableData>> {
573	let (tx, rx) = oneshot::channel();
574	ctx.send_message(AvailabilityStoreMessage::QueryAvailableData(candidate_hash, tx))
575		.await;
576
577	rx.await.map_err(Error::CanceledQueryFullData)
578}
579
580/// Queries a chunk from av-store.
581#[overseer::contextbounds(AvailabilityRecovery, prefix = self::overseer)]
582async fn query_chunk_size<Context>(
583	ctx: &mut Context,
584	candidate_hash: CandidateHash,
585) -> Result<Option<usize>> {
586	let (tx, rx) = oneshot::channel();
587	ctx.send_message(AvailabilityStoreMessage::QueryChunkSize(candidate_hash, tx))
588		.await;
589
590	rx.await.map_err(Error::CanceledQueryFullData)
591}
592
593#[overseer::contextbounds(AvailabilityRecovery, prefix = self::overseer)]
594impl AvailabilityRecoverySubsystem {
595	/// Create a new instance of `AvailabilityRecoverySubsystem` suitable for collator nodes,
596	/// which never requests the `AvailabilityStoreSubsystem` subsystem and only checks the POV hash
597	/// instead of reencoding the available data.
598	pub fn for_collator(
599		fetch_chunks_threshold: Option<usize>,
600		req_receiver: IncomingRequestReceiver<request_v1::AvailableDataFetchingRequest>,
601		req_protocol_names: &ReqProtocolNames,
602		metrics: Metrics,
603	) -> Self {
604		Self {
605			recovery_strategy_kind: RecoveryStrategyKind::BackersFirstIfSizeLower(
606				fetch_chunks_threshold.unwrap_or(CONSERVATIVE_FETCH_CHUNKS_THRESHOLD),
607			),
608			bypass_availability_store: true,
609			post_recovery_check: PostRecoveryCheck::PovHash,
610			req_receiver,
611			metrics,
612			req_v1_protocol_name: req_protocol_names
613				.get_name(request_v1::ChunkFetchingRequest::PROTOCOL),
614			req_v2_protocol_name: req_protocol_names
615				.get_name(request_v2::ChunkFetchingRequest::PROTOCOL),
616		}
617	}
618
619	/// Create an optimised new instance of `AvailabilityRecoverySubsystem` suitable for validator
620	/// nodes, which:
621	/// - for small POVs (over the `fetch_chunks_threshold` or the
622	///   `CONSERVATIVE_FETCH_CHUNKS_THRESHOLD`), it attempts full recovery from backers, if backing
623	///   group supplied.
624	/// - for large POVs, attempts systematic recovery, if core_index supplied and
625	///   AvailabilityChunkMapping node feature is enabled.
626	/// - as a last resort, attempt regular chunk recovery from all validators.
627	pub fn for_validator(
628		fetch_chunks_threshold: Option<usize>,
629		req_receiver: IncomingRequestReceiver<request_v1::AvailableDataFetchingRequest>,
630		req_protocol_names: &ReqProtocolNames,
631		metrics: Metrics,
632	) -> Self {
633		Self {
634			recovery_strategy_kind:
635				RecoveryStrategyKind::BackersFirstIfSizeLowerThenSystematicChunks(
636					fetch_chunks_threshold.unwrap_or(CONSERVATIVE_FETCH_CHUNKS_THRESHOLD),
637				),
638			bypass_availability_store: false,
639			post_recovery_check: PostRecoveryCheck::Reencode,
640			req_receiver,
641			metrics,
642			req_v1_protocol_name: req_protocol_names
643				.get_name(request_v1::ChunkFetchingRequest::PROTOCOL),
644			req_v2_protocol_name: req_protocol_names
645				.get_name(request_v2::ChunkFetchingRequest::PROTOCOL),
646		}
647	}
648
649	/// Customise the recovery strategy kind
650	/// Currently only useful for tests.
651	#[cfg(any(test, feature = "subsystem-benchmarks"))]
652	pub fn with_recovery_strategy_kind(
653		req_receiver: IncomingRequestReceiver<request_v1::AvailableDataFetchingRequest>,
654		req_protocol_names: &ReqProtocolNames,
655		metrics: Metrics,
656		recovery_strategy_kind: RecoveryStrategyKind,
657	) -> Self {
658		Self {
659			recovery_strategy_kind,
660			bypass_availability_store: false,
661			post_recovery_check: PostRecoveryCheck::Reencode,
662			req_receiver,
663			metrics,
664			req_v1_protocol_name: req_protocol_names
665				.get_name(request_v1::ChunkFetchingRequest::PROTOCOL),
666			req_v2_protocol_name: req_protocol_names
667				.get_name(request_v2::ChunkFetchingRequest::PROTOCOL),
668		}
669	}
670
671	/// Starts the inner subsystem loop.
672	pub async fn run<Context>(self, mut ctx: Context) -> std::result::Result<(), FatalError> {
673		let mut state = State::default();
674		let Self {
675			mut req_receiver,
676			metrics,
677			recovery_strategy_kind,
678			bypass_availability_store,
679			post_recovery_check,
680			req_v1_protocol_name,
681			req_v2_protocol_name,
682		} = self;
683
684		let (erasure_task_tx, erasure_task_rx) = futures::channel::mpsc::channel(16);
685		let mut erasure_task_rx = erasure_task_rx.fuse();
686
687		// `ThreadPoolBuilder` spawns the tasks using `spawn_blocking`. For each worker there will
688		// be a `mpsc` channel created. Each of these workers take the `Receiver` and poll it in an
689		// infinite loop. All of the sender ends of the channel are sent as a vec which we then use
690		// to create a `Cycle` iterator. We use this iterator to assign work in a round-robin
691		// fashion to the workers in the pool.
692		//
693		// How work is dispatched to the pool from the recovery tasks:
694		// - Once a recovery task finishes retrieving the availability data, it needs to reconstruct
695		//   from chunks and/or
696		// re-encode the data which are heavy CPU computations.
697		// To do so it sends an `ErasureTask` to the main loop via the `erasure_task` channel, and
698		// waits for the results over a `oneshot` channel.
699		// - In the subsystem main loop we poll the `erasure_task_rx` receiver.
700		// - We forward the received `ErasureTask` to the `next()` sender yielded by the `Cycle`
701		//   iterator.
702		// - Some worker thread handles it and sends the response over the `oneshot` channel.
703
704		// Create a thread pool with 2 workers.
705		let mut to_pool = ThreadPoolBuilder::build(
706			// Pool is guaranteed to have at least 1 worker thread.
707			NonZeroUsize::new(2).expect("There are 2 threads; qed"),
708			metrics.clone(),
709			&mut ctx,
710		)
711		.into_iter()
712		.cycle();
713
714		loop {
715			let recv_req = req_receiver.recv(|| vec![COST_INVALID_REQUEST]).fuse();
716			pin_mut!(recv_req);
717			let res = futures::select! {
718				erasure_task = erasure_task_rx.next() => {
719					match erasure_task {
720						Some(task) => {
721							to_pool
722								.next()
723								.expect("Pool size is `NonZeroUsize`; qed")
724								.send(task)
725								.await
726								.map_err(|_| RecoveryError::ChannelClosed)
727						},
728						None => {
729							Err(RecoveryError::ChannelClosed)
730						}
731					}.map_err(Into::into)
732				}
733				signal = ctx.recv().fuse() => {
734					match signal {
735						Ok(signal) => {
736							match signal {
737								FromOrchestra::Signal(signal) => if handle_signal(
738									&mut state,
739									signal,
740								).await {
741									gum::debug!(target: LOG_TARGET, "subsystem concluded");
742									return Ok(());
743								} else {
744									Ok(())
745								},
746								FromOrchestra::Communication {
747									msg: AvailabilityRecoveryMessage::RecoverAvailableData(
748										receipt,
749										session_index,
750										maybe_backing_group,
751										maybe_core_index,
752										response_sender,
753									)
754								} => handle_recover(
755										&mut state,
756										&mut ctx,
757										receipt,
758										session_index,
759										maybe_backing_group,
760										response_sender,
761										&metrics,
762										erasure_task_tx.clone(),
763										recovery_strategy_kind.clone(),
764										bypass_availability_store,
765										post_recovery_check.clone(),
766										maybe_core_index,
767										req_v1_protocol_name.clone(),
768										req_v2_protocol_name.clone(),
769									).await
770							}
771						},
772						Err(e) => Err(Error::SubsystemReceive(e))
773					}
774				}
775				in_req = recv_req => {
776					match in_req {
777						Ok(req) => {
778							if bypass_availability_store {
779								gum::debug!(
780									target: LOG_TARGET,
781									"Skipping request to availability-store.",
782								);
783								let _ = req.send_response(None.into());
784								Ok(())
785							} else {
786								match query_full_data(&mut ctx, req.payload.candidate_hash).await {
787									Ok(res) => {
788										let _ = req.send_response(res.into());
789										Ok(())
790									}
791									Err(e) => {
792										let _ = req.send_response(None.into());
793										Err(e)
794									}
795								}
796							}
797						}
798						Err(e) => Err(Error::IncomingRequest(e))
799					}
800				}
801				output = state.ongoing_recoveries.select_next_some() => {
802					let mut res = Ok(());
803					if let Some((candidate_hash, result)) = output {
804						if let Err(ref e) = result {
805							res = Err(Error::Recovery(e.clone()));
806						}
807
808						if let Ok(recovery) = CachedRecovery::try_from(result) {
809							state.availability_lru.insert(candidate_hash, recovery);
810						}
811					}
812
813					res
814				}
815			};
816
817			// Only bubble up fatal errors, but log all of them.
818			if let Err(e) = res {
819				log_error(Err(e))?;
820			}
821		}
822	}
823}
824
825// A simple thread pool implementation using `spawn_blocking` threads.
826struct ThreadPoolBuilder;
827
828const MAX_THREADS: NonZeroUsize = match NonZeroUsize::new(4) {
829	Some(max_threads) => max_threads,
830	None => panic!("MAX_THREADS must be non-zero"),
831};
832
833impl ThreadPoolBuilder {
834	// Creates a pool of `size` workers, where 1 <= `size` <= `MAX_THREADS`.
835	//
836	// Each worker is created by `spawn_blocking` and takes the receiver side of a channel
837	// while all of the senders are returned to the caller. Each worker runs `erasure_task_thread`
838	// that polls the `Receiver` for an `ErasureTask` which is expected to be CPU intensive. The
839	// larger the input (more or larger chunks/availability data), the more CPU cycles will be
840	// spent.
841	//
842	// For example, for 32KB PoVs, we'd expect re-encode to eat as much as 90ms and 500ms for
843	// 2.5MiB.
844	//
845	// After executing such a task, the worker sends the response via a provided `oneshot` sender.
846	//
847	// The caller is responsible for routing work to the workers.
848	#[overseer::contextbounds(AvailabilityRecovery, prefix = self::overseer)]
849	pub fn build<Context>(
850		size: NonZeroUsize,
851		metrics: Metrics,
852		ctx: &mut Context,
853	) -> Vec<futures::channel::mpsc::Sender<ErasureTask>> {
854		// At least 1 task, at most `MAX_THREADS.
855		let size = std::cmp::min(size, MAX_THREADS);
856		let mut senders = Vec::new();
857
858		for index in 0..size.into() {
859			let (tx, rx) = futures::channel::mpsc::channel(8);
860			senders.push(tx);
861
862			if let Err(e) = ctx
863				.spawn_blocking("erasure-task", Box::pin(erasure_task_thread(metrics.clone(), rx)))
864			{
865				gum::warn!(
866					target: LOG_TARGET,
867					err = ?e,
868					index,
869					"Failed to spawn a erasure task",
870				);
871			}
872		}
873		senders
874	}
875}
876
877// Handles CPU intensive operation on a dedicated blocking thread.
878async fn erasure_task_thread(
879	metrics: Metrics,
880	mut ingress: futures::channel::mpsc::Receiver<ErasureTask>,
881) {
882	loop {
883		match ingress.next().await {
884			Some(ErasureTask::Reconstruct(n_validators, chunks, sender)) => {
885				let _ = sender.send(pezkuwi_erasure_coding::reconstruct_v1(
886					n_validators,
887					chunks.iter().map(|(c_index, chunk)| {
888						(
889							&chunk[..],
890							usize::try_from(c_index.0)
891								.expect("usize is at least u32 bytes on all modern targets."),
892						)
893					}),
894				));
895			},
896			Some(ErasureTask::Reencode(n_validators, root, available_data, sender)) => {
897				let metrics = metrics.clone();
898
899				let maybe_data = if reconstructed_data_matches_root(
900					n_validators,
901					&root,
902					&available_data,
903					&metrics,
904				) {
905					Some(available_data)
906				} else {
907					None
908				};
909
910				let _ = sender.send(maybe_data);
911			},
912			None => {
913				gum::trace!(
914					target: LOG_TARGET,
915					"Erasure task channel closed. Node shutting down ?",
916				);
917				break;
918			},
919		}
920
921		// In benchmarks this is a very hot loop not yielding at all.
922		// To update CPU metrics for the task we need to yield.
923		#[cfg(feature = "subsystem-benchmarks")]
924		tokio::task::yield_now().await;
925	}
926}