1#![warn(missing_docs)]
20
21use std::{
22 collections::{BTreeMap, VecDeque},
23 iter::Iterator,
24 num::NonZeroUsize,
25 pin::Pin,
26};
27
28use futures::{
29 channel::oneshot,
30 future::{Future, FutureExt, RemoteHandle},
31 pin_mut,
32 prelude::*,
33 sink::SinkExt,
34 stream::{FuturesUnordered, StreamExt},
35 task::{Context, Poll},
36};
37use pezsc_network::ProtocolName;
38use schnellru::{ByLength, LruMap};
39use task::{
40 FetchChunks, FetchChunksParams, FetchFull, FetchFullParams, FetchSystematicChunks,
41 FetchSystematicChunksParams,
42};
43
44use pezkuwi_erasure_coding::{
45 branches, obtain_chunks_v1, recovery_threshold, systematic_recovery_threshold,
46 Error as ErasureEncodingError,
47};
48use task::{RecoveryParams, RecoveryStrategy, RecoveryTask};
49
50use error::{log_error, Error, FatalError, Result};
51use pezkuwi_node_network_protocol::{
52 request_response::{
53 v1 as request_v1, v2 as request_v2, IncomingRequestReceiver, IsRequest, ReqProtocolNames,
54 },
55 UnifiedReputationChange as Rep,
56};
57use pezkuwi_node_subsystem::{
58 errors::RecoveryError,
59 messages::{AvailabilityRecoveryMessage, AvailabilityStoreMessage},
60 overseer, ActiveLeavesUpdate, FromOrchestra, OverseerSignal, SpawnedSubsystem,
61 SubsystemContext, SubsystemError,
62};
63use pezkuwi_node_subsystem_util::{
64 availability_chunks::availability_chunk_indices,
65 runtime::{ExtendedSessionInfo, RuntimeInfo},
66};
67use pezkuwi_pez_node_primitives::AvailableData;
68use pezkuwi_primitives::{
69 node_features, BlockNumber, CandidateHash, CandidateReceiptV2 as CandidateReceipt, ChunkIndex,
70 CoreIndex, GroupIndex, Hash, SessionIndex, ValidatorIndex,
71};
72
73mod error;
74mod futures_undead;
75mod metrics;
76mod task;
77pub use metrics::Metrics;
78
79#[cfg(test)]
80mod tests;
81
/// Convenience alias for the outcome of a recovery attempt.
type RecoveryResult = std::result::Result<AvailableData, RecoveryError>;

const LOG_TARGET: &str = "teyrchain::availability-recovery";

// Size of the LRU cache of recovery outcomes, keyed by candidate hash.
const LRU_SIZE: u32 = 16;

const COST_INVALID_REQUEST: Rep = Rep::CostMajor("Peer sent unparsable request");

/// Conservative PoV-size threshold (1 MiB) under which fetching the full data
/// from backers is preferred over chunk recovery, used when no explicit
/// threshold was configured.
pub(crate) const CONSERVATIVE_FETCH_CHUNKS_THRESHOLD: usize = 1 * 1024 * 1024;
/// PoV-size threshold (4 MiB) above which chunk recovery is preferred.
pub const FETCH_CHUNKS_THRESHOLD: usize = 4 * 1024 * 1024;
95
/// The preferred order of recovery strategies to try for a candidate.
#[derive(Clone, PartialEq)]
pub enum RecoveryStrategyKind {
	/// Try fetching full data from backers first if the estimated PoV size is
	/// below the given threshold (in bytes), then fall back to regular chunks.
	BackersFirstIfSizeLower(usize),
	/// Like `BackersFirstIfSizeLower`, but try systematic chunks before
	/// falling back to regular chunk recovery.
	BackersFirstIfSizeLowerThenSystematicChunks(usize),

	/// Always try backers first, regardless of PoV size.
	#[allow(dead_code)]
	BackersFirstAlways,
	/// Only ever use regular chunk recovery.
	#[allow(dead_code)]
	ChunksAlways,
	/// Try backers first, then systematic chunks.
	#[allow(dead_code)]
	BackersThenSystematicChunks,
	/// Try systematic chunks (skipping backers).
	#[allow(dead_code)]
	SystematicChunks,
}
121
/// The availability-recovery subsystem: recovers `AvailableData` for candidates
/// on request and serves incoming full-data fetching requests.
pub struct AvailabilityRecoverySubsystem {
	/// Order and kind of recovery strategies to attempt.
	recovery_strategy_kind: RecoveryStrategyKind,
	/// If `true`, incoming full-data requests are answered with `None` without
	/// consulting the availability store (collator mode — see `for_collator`).
	bypass_availability_store: bool,
	/// Receiver for incoming `AvailableDataFetchingRequest`s from the network.
	req_receiver: IncomingRequestReceiver<request_v1::AvailableDataFetchingRequest>,
	/// Prometheus metrics handle.
	metrics: Metrics,
	/// Which integrity check to run on recovered data.
	post_recovery_check: PostRecoveryCheck,
	/// Protocol name of the v1 chunk-fetching request/response protocol.
	req_v1_protocol_name: ProtocolName,
	/// Protocol name of the v2 chunk-fetching request/response protocol.
	req_v2_protocol_name: ProtocolName,
}
142
/// The check performed on recovered data before accepting it as valid.
#[derive(Clone, PartialEq, Debug)]
enum PostRecoveryCheck {
	/// Re-encode the data into chunks and verify the resulting erasure root
	/// matches the one in the candidate receipt (full integrity check).
	Reencode,
	/// Only verify the PoV hash (cheaper; used when bypassing the store).
	PovHash,
}
151
/// CPU-heavy erasure-coding work item, shipped to a dedicated worker thread.
enum ErasureTask {
	/// Reconstruct `AvailableData` from chunks: (number of validators,
	/// chunk-index → chunk-data map, channel for the result).
	Reconstruct(
		usize,
		BTreeMap<ChunkIndex, Vec<u8>>,
		oneshot::Sender<std::result::Result<AvailableData, ErasureEncodingError>>,
	),
	/// Re-encode `AvailableData` into chunks and check the erasure root:
	/// (number of validators, expected root, data, channel — `Some(data)` if
	/// the root matches, `None` otherwise).
	Reencode(usize, Hash, AvailableData, oneshot::Sender<Option<AvailableData>>),
}
164
165fn reconstructed_data_matches_root(
181 n_validators: usize,
182 expected_root: &Hash,
183 data: &AvailableData,
184 metrics: &Metrics,
185) -> bool {
186 let _timer = metrics.time_reencode_chunks();
187
188 let chunks = match obtain_chunks_v1(n_validators, data) {
189 Ok(chunks) => chunks,
190 Err(e) => {
191 gum::debug!(
192 target: LOG_TARGET,
193 err = ?e,
194 "Failed to obtain chunks",
195 );
196 return false;
197 },
198 };
199
200 let branches = branches(&chunks);
201
202 branches.root() == *expected_root
203}
204
/// Handle to an in-flight recovery task, fanning its single result out to all
/// interested requesters.
struct RecoveryHandle {
	/// Hash of the candidate being recovered.
	candidate_hash: CandidateHash,
	/// Remote handle to the spawned recovery future's output.
	remote: RemoteHandle<RecoveryResult>,
	/// All requesters awaiting this candidate's recovery result.
	awaiting: Vec<oneshot::Sender<RecoveryResult>>,
}
211
impl Future for RecoveryHandle {
	// `None` means all requesters went away and the task result is irrelevant.
	type Output = Option<(CandidateHash, RecoveryResult)>;

	fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
		// First, prune requesters whose receiving end was dropped. Iterating in
		// reverse collects indices in descending order, which makes the
		// `swap_remove` calls below safe: removing a higher index never moves
		// an element at a lower, still-pending index.
		let mut indices_to_remove = Vec::new();
		for (i, awaiting) in self.awaiting.iter_mut().enumerate().rev() {
			if let Poll::Ready(()) = awaiting.poll_canceled(cx) {
				indices_to_remove.push(i);
			}
		}

		for index in indices_to_remove {
			gum::debug!(
				target: LOG_TARGET,
				candidate_hash = ?self.candidate_hash,
				"Receiver for available data dropped.",
			);

			self.awaiting.swap_remove(index);
		}

		// Nobody is listening anymore — resolve without waiting for the task.
		if self.awaiting.is_empty() {
			gum::debug!(
				target: LOG_TARGET,
				candidate_hash = ?self.candidate_hash,
				"All receivers for available data dropped.",
			);

			return Poll::Ready(None);
		}

		// Poll the underlying recovery task; suspend here until it completes.
		let remote = &mut self.remote;
		futures::pin_mut!(remote);
		let result = futures::ready!(remote.poll(cx));

		// Fan the (cloned) result out to every remaining requester; failures to
		// deliver are ignored since the requester may have just gone away.
		for awaiting in self.awaiting.drain(..) {
			let _ = awaiting.send(result.clone());
		}

		Poll::Ready(Some((self.candidate_hash, result)))
	}
}
255
/// A recovery outcome worth caching: either the recovered data, or the fact
/// that the candidate is known to be invalid (so we never retry it).
#[derive(Debug, Clone)]
enum CachedRecovery {
	/// Successfully recovered data.
	Valid(AvailableData),
	/// Recovery failed because the candidate was proven invalid.
	Invalid,
}
264
265impl CachedRecovery {
266 fn into_result(self) -> RecoveryResult {
268 match self {
269 Self::Valid(d) => Ok(d),
270 Self::Invalid => Err(RecoveryError::Invalid),
271 }
272 }
273}
274
275impl TryFrom<RecoveryResult> for CachedRecovery {
276 type Error = ();
277 fn try_from(o: RecoveryResult) -> std::result::Result<CachedRecovery, Self::Error> {
278 match o {
279 Ok(d) => Ok(Self::Valid(d)),
280 Err(RecoveryError::Invalid) => Ok(Self::Invalid),
281 Err(RecoveryError::Unavailable) => Err(()),
284 Err(RecoveryError::ChannelClosed) => Err(()),
285 }
286 }
287}
288
/// Mutable state of the subsystem's main loop.
struct State {
	/// All recoveries currently in flight, polled as one unordered set.
	ongoing_recoveries: FuturesUnordered<RecoveryHandle>,

	/// Highest activated leaf seen so far, used as the relay parent for
	/// runtime (session-info) queries.
	live_block: (BlockNumber, Hash),

	/// Small LRU cache of definitive recovery outcomes per candidate.
	availability_lru: LruMap<CandidateHash, CachedRecovery>,

	/// Cached runtime information (session info etc.).
	runtime_info: RuntimeInfo,
}
303
304impl Default for State {
305 fn default() -> Self {
306 Self {
307 ongoing_recoveries: FuturesUnordered::new(),
308 live_block: (0, Hash::default()),
309 availability_lru: LruMap::new(ByLength::new(LRU_SIZE)),
310 runtime_info: RuntimeInfo::new(None),
311 }
312 }
313}
314
315#[overseer::subsystem(AvailabilityRecovery, error=SubsystemError, prefix=self::overseer)]
316impl<Context> AvailabilityRecoverySubsystem {
317 fn start(self, ctx: Context) -> SpawnedSubsystem {
318 let future = self
319 .run(ctx)
320 .map_err(|e| SubsystemError::with_origin("availability-recovery", e))
321 .boxed();
322 SpawnedSubsystem { name: "availability-recovery-subsystem", future }
323 }
324}
325
326async fn handle_signal(state: &mut State, signal: OverseerSignal) -> bool {
329 match signal {
330 OverseerSignal::Conclude => true,
331 OverseerSignal::ActiveLeaves(ActiveLeavesUpdate { activated, .. }) => {
332 if let Some(activated) = activated {
334 if activated.number > state.live_block.0 {
335 state.live_block = (activated.number, activated.hash)
336 }
337 }
338
339 false
340 },
341 OverseerSignal::BlockFinalized(_, _) => false,
342 }
343}
344
345#[overseer::contextbounds(AvailabilityRecovery, prefix = self::overseer)]
347async fn launch_recovery_task<Context>(
348 state: &mut State,
349 ctx: &mut Context,
350 response_sender: oneshot::Sender<RecoveryResult>,
351 recovery_strategies: VecDeque<Box<dyn RecoveryStrategy<<Context as SubsystemContext>::Sender>>>,
352 params: RecoveryParams,
353) -> Result<()> {
354 let candidate_hash = params.candidate_hash;
355 let recovery_task = RecoveryTask::new(ctx.sender().clone(), params, recovery_strategies);
356
357 let (remote, remote_handle) = recovery_task.run().remote_handle();
358
359 state.ongoing_recoveries.push(RecoveryHandle {
360 candidate_hash,
361 remote: remote_handle,
362 awaiting: vec![response_sender],
363 });
364
365 ctx.spawn("recovery-task", Box::pin(remote))
366 .map_err(|err| Error::SpawnTask(err))
367}
368
#[overseer::contextbounds(AvailabilityRecovery, prefix = self::overseer)]
/// Handle a `RecoverAvailableData` request for a candidate.
///
/// Fast paths: a cached outcome is answered immediately; an already-running
/// recovery for the same candidate gets the new requester attached. Otherwise
/// a queue of recovery strategies is assembled according to
/// `recovery_strategy_kind` (full fetch from backers, systematic chunks,
/// regular chunks) and a new recovery task is launched.
///
/// Errors are returned to the caller; the requester is additionally answered
/// with `RecoveryError::Unavailable` if session info cannot be obtained.
async fn handle_recover<Context>(
	state: &mut State,
	ctx: &mut Context,
	receipt: CandidateReceipt,
	session_index: SessionIndex,
	backing_group: Option<GroupIndex>,
	response_sender: oneshot::Sender<RecoveryResult>,
	metrics: &Metrics,
	erasure_task_tx: futures::channel::mpsc::Sender<ErasureTask>,
	recovery_strategy_kind: RecoveryStrategyKind,
	bypass_availability_store: bool,
	post_recovery_check: PostRecoveryCheck,
	maybe_core_index: Option<CoreIndex>,
	req_v1_protocol_name: ProtocolName,
	req_v2_protocol_name: ProtocolName,
) -> Result<()> {
	let candidate_hash = receipt.hash();

	// Fast path 1: a definitive outcome for this candidate is cached.
	if let Some(result) =
		state.availability_lru.get(&candidate_hash).cloned().map(|v| v.into_result())
	{
		return response_sender.send(result).map_err(|_| Error::CanceledResponseSender);
	}

	// Fast path 2: a recovery for this candidate is already in flight — just
	// attach this requester to the existing handle.
	if let Some(i) =
		state.ongoing_recoveries.iter_mut().find(|i| i.candidate_hash == candidate_hash)
	{
		i.awaiting.push(response_sender);
		return Ok(());
	}

	let session_info_res = state
		.runtime_info
		.get_session_info_by_index(ctx.sender(), state.live_block.1, session_index)
		.await;

	match session_info_res {
		Ok(ExtendedSessionInfo { session_info, node_features, .. }) => {
			let mut backer_group = None;
			let n_validators = session_info.validators.len();
			let systematic_threshold = systematic_recovery_threshold(n_validators)?;
			// At most three strategies are queued: full fetch, systematic
			// chunks, regular chunks.
			let mut recovery_strategies: VecDeque<
				Box<dyn RecoveryStrategy<<Context as SubsystemContext>::Sender>>,
			> = VecDeque::with_capacity(3);

			if let Some(backing_group) = backing_group {
				if let Some(backing_validators) = session_info.validator_groups.get(backing_group) {
					let mut small_pov_size = true;

					match recovery_strategy_kind {
						RecoveryStrategyKind::BackersFirstIfSizeLower(fetch_chunks_threshold)
						| RecoveryStrategyKind::BackersFirstIfSizeLowerThenSystematicChunks(
							fetch_chunks_threshold,
						) => {
							// Estimate the PoV size from the locally stored chunk
							// size (chunk size * systematic threshold).
							let chunk_size: Result<Option<usize>> =
								query_chunk_size(ctx, candidate_hash).await;
							if let Ok(Some(chunk_size)) = chunk_size {
								let pov_size_estimate = chunk_size * systematic_threshold;
								small_pov_size = pov_size_estimate < fetch_chunks_threshold;

								if small_pov_size {
									gum::trace!(
										target: LOG_TARGET,
										?candidate_hash,
										pov_size_estimate,
										fetch_chunks_threshold,
										"Prefer fetch from backing group",
									);
								}
							} else {
								// Unknown chunk size — be conservative and do not
								// prefer full fetch from backers.
								small_pov_size = false;
							}
						},
						_ => {},
					};

					// Queue the full-data fetch from backers first whenever the
					// strategy kind (combined with the size estimate) calls for it.
					match (&recovery_strategy_kind, small_pov_size) {
						(RecoveryStrategyKind::BackersFirstAlways, _)
						| (RecoveryStrategyKind::BackersFirstIfSizeLower(_), true)
						| (
							RecoveryStrategyKind::BackersFirstIfSizeLowerThenSystematicChunks(_),
							true,
						)
						| (RecoveryStrategyKind::BackersThenSystematicChunks, _) => {
							recovery_strategies.push_back(Box::new(FetchFull::new(
								FetchFullParams { validators: backing_validators.to_vec() },
							)))
						},
						_ => {},
					};

					backer_group = Some(backing_validators);
				}
			}

			// Whether the availability-chunk-mapping node feature is enabled for
			// this session (required for systematic chunk recovery).
			let chunk_mapping_enabled = if let Some(&true) = node_features
				.get(usize::from(node_features::FeatureIndex::AvailabilityChunkMapping as u8))
				.as_deref()
			{
				true
			} else {
				false
			};

			// Systematic chunk recovery additionally needs the core index to
			// compute the chunk-index mapping.
			if let Some(core_index) = maybe_core_index {
				if matches!(
					recovery_strategy_kind,
					RecoveryStrategyKind::BackersThenSystematicChunks
						| RecoveryStrategyKind::SystematicChunks
						| RecoveryStrategyKind::BackersFirstIfSizeLowerThenSystematicChunks(_)
				) && chunk_mapping_enabled
				{
					let chunk_indices =
						availability_chunk_indices(node_features, n_validators, core_index)?;

					// Pair each chunk index with the validator that holds it
					// (position in the mapping is the validator index).
					let chunk_indices: VecDeque<_> = chunk_indices
						.iter()
						.enumerate()
						.map(|(v_index, c_index)| {
							(
								*c_index,
								ValidatorIndex(
									u32::try_from(v_index)
										.expect("validator count should not exceed u32"),
								),
							)
						})
						.collect();

					// Keep only holders of systematic chunks (indices below the
					// systematic threshold).
					let validators = chunk_indices
						.clone()
						.into_iter()
						.filter(|(c_index, _)| {
							usize::try_from(c_index.0)
								.expect("usize is at least u32 bytes on all modern targets.")
								< systematic_threshold
						})
						.collect();

					recovery_strategies.push_back(Box::new(FetchSystematicChunks::new(
						FetchSystematicChunksParams {
							validators,
							backers: backer_group.map(|v| v.to_vec()).unwrap_or_else(|| vec![]),
						},
					)));
				}
			}

			// Regular chunk recovery is always the last-resort strategy.
			recovery_strategies.push_back(Box::new(FetchChunks::new(FetchChunksParams {
				n_validators: session_info.validators.len(),
			})));

			let session_info = session_info.clone();

			let n_validators = session_info.validators.len();

			launch_recovery_task(
				state,
				ctx,
				response_sender,
				recovery_strategies,
				RecoveryParams {
					validator_authority_keys: session_info.discovery_keys.clone(),
					n_validators,
					threshold: recovery_threshold(n_validators)?,
					systematic_threshold,
					candidate_hash,
					erasure_root: receipt.descriptor.erasure_root(),
					metrics: metrics.clone(),
					bypass_availability_store,
					post_recovery_check,
					pov_hash: receipt.descriptor.pov_hash(),
					req_v1_protocol_name,
					req_v2_protocol_name,
					chunk_mapping_enabled,
					erasure_task_tx,
				},
			)
			.await
		},
		Err(_) => {
			// Without session info recovery cannot proceed: tell the requester
			// the data is unavailable, then surface the error to the caller.
			response_sender
				.send(Err(RecoveryError::Unavailable))
				.map_err(|_| Error::CanceledResponseSender)?;

			Err(Error::SessionInfoUnavailable(state.live_block.1))
		},
	}
}
566
567#[overseer::contextbounds(AvailabilityRecovery, prefix = self::overseer)]
569async fn query_full_data<Context>(
570 ctx: &mut Context,
571 candidate_hash: CandidateHash,
572) -> Result<Option<AvailableData>> {
573 let (tx, rx) = oneshot::channel();
574 ctx.send_message(AvailabilityStoreMessage::QueryAvailableData(candidate_hash, tx))
575 .await;
576
577 rx.await.map_err(Error::CanceledQueryFullData)
578}
579
580#[overseer::contextbounds(AvailabilityRecovery, prefix = self::overseer)]
582async fn query_chunk_size<Context>(
583 ctx: &mut Context,
584 candidate_hash: CandidateHash,
585) -> Result<Option<usize>> {
586 let (tx, rx) = oneshot::channel();
587 ctx.send_message(AvailabilityStoreMessage::QueryChunkSize(candidate_hash, tx))
588 .await;
589
590 rx.await.map_err(Error::CanceledQueryFullData)
591}
592
#[overseer::contextbounds(AvailabilityRecovery, prefix = self::overseer)]
impl AvailabilityRecoverySubsystem {
	/// Create a subsystem configured for a collator node.
	///
	/// Uses `BackersFirstIfSizeLower` with the given threshold (falling back to
	/// [`CONSERVATIVE_FETCH_CHUNKS_THRESHOLD`]), bypasses the availability
	/// store for incoming requests, and only checks the PoV hash after
	/// recovery.
	pub fn for_collator(
		fetch_chunks_threshold: Option<usize>,
		req_receiver: IncomingRequestReceiver<request_v1::AvailableDataFetchingRequest>,
		req_protocol_names: &ReqProtocolNames,
		metrics: Metrics,
	) -> Self {
		Self {
			recovery_strategy_kind: RecoveryStrategyKind::BackersFirstIfSizeLower(
				fetch_chunks_threshold.unwrap_or(CONSERVATIVE_FETCH_CHUNKS_THRESHOLD),
			),
			bypass_availability_store: true,
			post_recovery_check: PostRecoveryCheck::PovHash,
			req_receiver,
			metrics,
			req_v1_protocol_name: req_protocol_names
				.get_name(request_v1::ChunkFetchingRequest::PROTOCOL),
			req_v2_protocol_name: req_protocol_names
				.get_name(request_v2::ChunkFetchingRequest::PROTOCOL),
		}
	}

	/// Create a subsystem configured for a validator node.
	///
	/// Uses `BackersFirstIfSizeLowerThenSystematicChunks` with the given
	/// threshold (falling back to [`CONSERVATIVE_FETCH_CHUNKS_THRESHOLD`]),
	/// serves incoming requests from the availability store, and fully
	/// re-encodes recovered data to verify the erasure root.
	pub fn for_validator(
		fetch_chunks_threshold: Option<usize>,
		req_receiver: IncomingRequestReceiver<request_v1::AvailableDataFetchingRequest>,
		req_protocol_names: &ReqProtocolNames,
		metrics: Metrics,
	) -> Self {
		Self {
			recovery_strategy_kind:
				RecoveryStrategyKind::BackersFirstIfSizeLowerThenSystematicChunks(
					fetch_chunks_threshold.unwrap_or(CONSERVATIVE_FETCH_CHUNKS_THRESHOLD),
				),
			bypass_availability_store: false,
			post_recovery_check: PostRecoveryCheck::Reencode,
			req_receiver,
			metrics,
			req_v1_protocol_name: req_protocol_names
				.get_name(request_v1::ChunkFetchingRequest::PROTOCOL),
			req_v2_protocol_name: req_protocol_names
				.get_name(request_v2::ChunkFetchingRequest::PROTOCOL),
		}
	}

	/// Create a subsystem with an explicitly chosen recovery strategy kind —
	/// for tests and subsystem benchmarks only.
	#[cfg(any(test, feature = "subsystem-benchmarks"))]
	pub fn with_recovery_strategy_kind(
		req_receiver: IncomingRequestReceiver<request_v1::AvailableDataFetchingRequest>,
		req_protocol_names: &ReqProtocolNames,
		metrics: Metrics,
		recovery_strategy_kind: RecoveryStrategyKind,
	) -> Self {
		Self {
			recovery_strategy_kind,
			bypass_availability_store: false,
			post_recovery_check: PostRecoveryCheck::Reencode,
			req_receiver,
			metrics,
			req_v1_protocol_name: req_protocol_names
				.get_name(request_v1::ChunkFetchingRequest::PROTOCOL),
			req_v2_protocol_name: req_protocol_names
				.get_name(request_v2::ChunkFetchingRequest::PROTOCOL),
		}
	}

	/// Main loop of the subsystem.
	///
	/// Multiplexes between: dispatching erasure tasks to the worker pool,
	/// overseer signals/messages (recovery requests), incoming network
	/// full-data requests, and completed recoveries (whose definitive outcomes
	/// get cached). Runs until the overseer signals `Conclude`.
	pub async fn run<Context>(self, mut ctx: Context) -> std::result::Result<(), FatalError> {
		let mut state = State::default();
		let Self {
			mut req_receiver,
			metrics,
			recovery_strategy_kind,
			bypass_availability_store,
			post_recovery_check,
			req_v1_protocol_name,
			req_v2_protocol_name,
		} = self;

		let (erasure_task_tx, erasure_task_rx) = futures::channel::mpsc::channel(16);
		let mut erasure_task_rx = erasure_task_rx.fuse();

		// Worker threads for CPU-heavy erasure coding; `cycle()` gives simple
		// round-robin dispatch across the pool.
		let mut to_pool = ThreadPoolBuilder::build(
			NonZeroUsize::new(2).expect("There are 2 threads; qed"),
			metrics.clone(),
			&mut ctx,
		)
		.into_iter()
		.cycle();

		loop {
			let recv_req = req_receiver.recv(|| vec![COST_INVALID_REQUEST]).fuse();
			pin_mut!(recv_req);
			let res = futures::select! {
				// Forward queued erasure tasks to the next worker thread.
				erasure_task = erasure_task_rx.next() => {
					match erasure_task {
						Some(task) => {
							to_pool
								.next()
								.expect("Pool size is `NonZeroUsize`; qed")
								.send(task)
								.await
								.map_err(|_| RecoveryError::ChannelClosed)
						},
						None => {
							Err(RecoveryError::ChannelClosed)
						}
					}.map_err(Into::into)
				}
				// Overseer signals and recovery requests.
				signal = ctx.recv().fuse() => {
					match signal {
						Ok(signal) => {
							match signal {
								FromOrchestra::Signal(signal) => if handle_signal(
									&mut state,
									signal,
								).await {
									gum::debug!(target: LOG_TARGET, "subsystem concluded");
									return Ok(());
								} else {
									Ok(())
								},
								FromOrchestra::Communication {
									msg: AvailabilityRecoveryMessage::RecoverAvailableData(
										receipt,
										session_index,
										maybe_backing_group,
										maybe_core_index,
										response_sender,
									)
								} => handle_recover(
									&mut state,
									&mut ctx,
									receipt,
									session_index,
									maybe_backing_group,
									response_sender,
									&metrics,
									erasure_task_tx.clone(),
									recovery_strategy_kind.clone(),
									bypass_availability_store,
									post_recovery_check.clone(),
									maybe_core_index,
									req_v1_protocol_name.clone(),
									req_v2_protocol_name.clone(),
								).await
							}
						},
						Err(e) => Err(Error::SubsystemReceive(e))
					}
				}
				// Incoming network requests for full available data.
				in_req = recv_req => {
					match in_req {
						Ok(req) => {
							if bypass_availability_store {
								gum::debug!(
									target: LOG_TARGET,
									"Skipping request to availability-store.",
								);
								let _ = req.send_response(None.into());
								Ok(())
							} else {
								match query_full_data(&mut ctx, req.payload.candidate_hash).await {
									Ok(res) => {
										let _ = req.send_response(res.into());
										Ok(())
									}
									Err(e) => {
										// Still answer the peer (with `None`)
										// before propagating the error.
										let _ = req.send_response(None.into());
										Err(e)
									}
								}
							}
						}
						Err(e) => Err(Error::IncomingRequest(e))
					}
				}
				// A recovery task finished: cache definitive outcomes.
				output = state.ongoing_recoveries.select_next_some() => {
					let mut res = Ok(());
					if let Some((candidate_hash, result)) = output {
						if let Err(ref e) = result {
							res = Err(Error::Recovery(e.clone()));
						}

						// Only valid/invalid outcomes are cacheable; transient
						// failures are dropped by `try_from`.
						if let Ok(recovery) = CachedRecovery::try_from(result) {
							state.availability_lru.insert(candidate_hash, recovery);
						}
					}

					res
				}
			};

			// Log (and possibly escalate fatal) errors; non-fatal errors keep
			// the loop running.
			if let Err(e) = res {
				log_error(Err(e))?;
			}
		}
	}
}
824
/// Builder for the pool of erasure-coding worker threads.
struct ThreadPoolBuilder;

// Hard upper bound on the number of erasure worker threads.
const MAX_THREADS: NonZeroUsize = match NonZeroUsize::new(4) {
	Some(max_threads) => max_threads,
	None => panic!("MAX_THREADS must be non-zero"),
};
832
833impl ThreadPoolBuilder {
834 #[overseer::contextbounds(AvailabilityRecovery, prefix = self::overseer)]
849 pub fn build<Context>(
850 size: NonZeroUsize,
851 metrics: Metrics,
852 ctx: &mut Context,
853 ) -> Vec<futures::channel::mpsc::Sender<ErasureTask>> {
854 let size = std::cmp::min(size, MAX_THREADS);
856 let mut senders = Vec::new();
857
858 for index in 0..size.into() {
859 let (tx, rx) = futures::channel::mpsc::channel(8);
860 senders.push(tx);
861
862 if let Err(e) = ctx
863 .spawn_blocking("erasure-task", Box::pin(erasure_task_thread(metrics.clone(), rx)))
864 {
865 gum::warn!(
866 target: LOG_TARGET,
867 err = ?e,
868 index,
869 "Failed to spawn a erasure task",
870 );
871 }
872 }
873 senders
874 }
875}
876
877async fn erasure_task_thread(
879 metrics: Metrics,
880 mut ingress: futures::channel::mpsc::Receiver<ErasureTask>,
881) {
882 loop {
883 match ingress.next().await {
884 Some(ErasureTask::Reconstruct(n_validators, chunks, sender)) => {
885 let _ = sender.send(pezkuwi_erasure_coding::reconstruct_v1(
886 n_validators,
887 chunks.iter().map(|(c_index, chunk)| {
888 (
889 &chunk[..],
890 usize::try_from(c_index.0)
891 .expect("usize is at least u32 bytes on all modern targets."),
892 )
893 }),
894 ));
895 },
896 Some(ErasureTask::Reencode(n_validators, root, available_data, sender)) => {
897 let metrics = metrics.clone();
898
899 let maybe_data = if reconstructed_data_matches_root(
900 n_validators,
901 &root,
902 &available_data,
903 &metrics,
904 ) {
905 Some(available_data)
906 } else {
907 None
908 };
909
910 let _ = sender.send(maybe_data);
911 },
912 None => {
913 gum::trace!(
914 target: LOG_TARGET,
915 "Erasure task channel closed. Node shutting down ?",
916 );
917 break;
918 },
919 }
920
921 #[cfg(feature = "subsystem-benchmarks")]
924 tokio::task::yield_now().await;
925 }
926}