polkadot_approval_distribution/
lib.rs

1// Copyright (C) Parity Technologies (UK) Ltd.
2// This file is part of Polkadot.
3
4// Polkadot is free software: you can redistribute it and/or modify
5// it under the terms of the GNU General Public License as published by
6// the Free Software Foundation, either version 3 of the License, or
7// (at your option) any later version.
8
9// Polkadot is distributed in the hope that it will be useful,
10// but WITHOUT ANY WARRANTY; without even the implied warranty of
11// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12// GNU General Public License for more details.
13
14// You should have received a copy of the GNU General Public License
15// along with Polkadot.  If not, see <http://www.gnu.org/licenses/>.
16
17//! [`ApprovalDistribution`] implementation.
18//!
19//! See the documentation on [approval distribution][approval-distribution-page] in the
20//! implementers' guide.
21//!
22//! [approval-distribution-page]: https://paritytech.github.io/polkadot-sdk/book/node/approval/approval-distribution.html
23
24#![warn(missing_docs)]
25
26use self::metrics::Metrics;
27use futures::{select, FutureExt as _};
28use itertools::Itertools;
29use net_protocol::peer_set::{ProtocolVersion, ValidationVersion};
30use polkadot_node_network_protocol::{
31	self as net_protocol, filter_by_peer_version,
32	grid_topology::{RandomRouting, RequiredRouting, SessionGridTopologies, SessionGridTopology},
33	peer_set::MAX_NOTIFICATION_SIZE,
34	v1 as protocol_v1, v2 as protocol_v2, v3 as protocol_v3, PeerId,
35	UnifiedReputationChange as Rep, Versioned, View,
36};
37use polkadot_node_primitives::{
38	approval::{
39		criteria::{AssignmentCriteria, InvalidAssignment},
40		time::{Clock, ClockExt, SystemClock, TICK_TOO_FAR_IN_FUTURE},
41		v1::{
42			AssignmentCertKind, BlockApprovalMeta, DelayTranche, IndirectAssignmentCert,
43			IndirectSignedApprovalVote, RelayVRFStory,
44		},
45		v2::{
46			AsBitIndex, AssignmentCertKindV2, CandidateBitfield, IndirectAssignmentCertV2,
47			IndirectSignedApprovalVoteV2,
48		},
49	},
50	DISPUTE_WINDOW,
51};
52use polkadot_node_subsystem::{
53	messages::{
54		ApprovalDistributionMessage, ApprovalVotingMessage, CheckedIndirectAssignment,
55		CheckedIndirectSignedApprovalVote, NetworkBridgeEvent, NetworkBridgeTxMessage,
56		RuntimeApiMessage,
57	},
58	overseer, FromOrchestra, OverseerSignal, SpawnedSubsystem, SubsystemError,
59};
60use polkadot_node_subsystem_util::{
61	reputation::{ReputationAggregator, REPUTATION_CHANGE_INTERVAL},
62	runtime::{Config as RuntimeInfoConfig, ExtendedSessionInfo, RuntimeInfo},
63};
64use polkadot_primitives::{
65	BlockNumber, CandidateHash, CandidateIndex, CoreIndex, DisputeStatement, GroupIndex, Hash,
66	SessionIndex, Slot, ValidDisputeStatementKind, ValidatorIndex, ValidatorSignature,
67};
68use rand::{CryptoRng, Rng, SeedableRng};
69use std::{
70	collections::{hash_map, BTreeMap, HashMap, HashSet, VecDeque},
71	sync::Arc,
72	time::Duration,
73};
74
75/// Approval distribution metrics.
76pub mod metrics;
77
78#[cfg(test)]
79mod tests;
80
const LOG_TARGET: &str = "parachain::approval-distribution";

// Reputation cost for a message referencing a block outside our (and the peer's) view.
const COST_UNEXPECTED_MESSAGE: Rep =
	Rep::CostMinor("Peer sent an out-of-view assignment or approval");
// Reputation cost for re-sending a message we already received from the same peer.
const COST_DUPLICATE_MESSAGE: Rep = Rep::CostMinorRepeated("Peer sent identical messages");
// Reputation cost for an assignment that checks out but is scheduled too far ahead.
const COST_ASSIGNMENT_TOO_FAR_IN_THE_FUTURE: Rep =
	Rep::CostMinor("The vote was valid but too far in the future");
// Reputation cost for a message that failed validation.
const COST_INVALID_MESSAGE: Rep = Rep::CostMajor("The vote was bad");
// Reputation cost for a candidate bitfield exceeding `MAX_BITFIELD_SIZE`.
const COST_OVERSIZED_BITFIELD: Rep = Rep::CostMajor("Oversized certificate or candidate bitfield");

// Reputation benefit for a valid message we already knew about.
const BENEFIT_VALID_MESSAGE: Rep = Rep::BenefitMinor("Peer sent a valid message");
// Reputation benefit for a valid message that carried new information.
const BENEFIT_VALID_MESSAGE_FIRST: Rep =
	Rep::BenefitMinorFirst("Valid message with new information");

// Maximum valid size for the `CandidateBitfield` in the assignment messages.
const MAX_BITFIELD_SIZE: usize = 500;
97
98/// The Approval Distribution subsystem.
pub struct ApprovalDistribution {
	// Subsystem metrics.
	metrics: Metrics,
	// Slot duration in milliseconds; forwarded into `State` (see `State::with_config`).
	slot_duration_millis: u64,
	// Clock abstraction used by the subsystem.
	clock: Arc<dyn Clock + Send + Sync>,
	// Criteria implementation used to check assignment certificates.
	assignment_criteria: Arc<dyn AssignmentCriteria + Send + Sync>,
}
105
/// Contains hashes of recently finalized blocks,
/// or of blocks pruned due to finalization.
#[derive(Default)]
struct RecentlyOutdated {
	// Bounded FIFO buffer of outdated block hashes; capped in `note_outdated`.
	buf: VecDeque<Hash>,
}
112
113impl RecentlyOutdated {
114	fn note_outdated(&mut self, hash: Hash) {
115		const MAX_BUF_LEN: usize = 20;
116
117		self.buf.push_back(hash);
118
119		while self.buf.len() > MAX_BUF_LEN {
120			let _ = self.buf.pop_front();
121		}
122	}
123
124	fn is_recent_outdated(&self, hash: &Hash) -> bool {
125		self.buf.contains(hash)
126	}
127}
128
// Contains topology routing information for assignments and approvals.
struct ApprovalRouting {
	// The grid-topology routing requirement for this message.
	required_routing: RequiredRouting,
	// Whether the message originated locally.
	local: bool,
	// State of the random-gossip component of routing.
	random_routing: RandomRouting,
	// Peers this message has already been randomly routed to.
	peers_randomly_routed: Vec<PeerId>,
}
136
impl ApprovalRouting {
	// Record that the message was randomly routed to `peer`: bump the sent
	// counter and remember the peer so follow-up messages can reach it too.
	fn mark_randomly_sent(&mut self, peer: PeerId) {
		self.random_routing.inc_sent();
		self.peers_randomly_routed.push(peer);
	}
}
143
// This struct is responsible for tracking the full state of an assignment and grid routing
// information.
struct ApprovalEntry {
	// The assignment certificate.
	assignment: IndirectAssignmentCertV2,
	// The candidates claimed by the certificate. A mapping between bit index and candidate index.
	assignment_claimed_candidates: CandidateBitfield,
	// The approval signatures for each `CandidateIndex` claimed by the assignment certificate,
	// keyed by the candidate bitfield of the approval vote.
	approvals: HashMap<CandidateBitfield, IndirectSignedApprovalVoteV2>,
	// The validator index of the assignment signer.
	validator_index: ValidatorIndex,
	// Information required for gossiping to other peers using the grid topology.
	routing_info: ApprovalRouting,
}
158
// Errors that can occur while recording approvals against an `ApprovalEntry`.
#[derive(Debug)]
enum ApprovalEntryError {
	// The approval's validator index does not match the assignment's.
	InvalidValidatorIndex,
	// The approval claims a candidate index beyond the block's candidate count.
	CandidateIndexOutOfBounds,
	// The approval covers no candidate claimed by the assignment.
	InvalidCandidateIndex,
	// An approval with the same candidate bitfield was already recorded.
	DuplicateApproval,
	// No assignment entry was found covering the approval.
	UnknownAssignment,
}
167
168impl ApprovalEntry {
169	pub fn new(
170		assignment: IndirectAssignmentCertV2,
171		candidates: CandidateBitfield,
172		routing_info: ApprovalRouting,
173	) -> ApprovalEntry {
174		Self {
175			validator_index: assignment.validator,
176			assignment,
177			approvals: HashMap::new(),
178			assignment_claimed_candidates: candidates,
179			routing_info,
180		}
181	}
182
183	// Create a `MessageSubject` to reference the assignment.
184	pub fn create_assignment_knowledge(&self, block_hash: Hash) -> (MessageSubject, MessageKind) {
185		(
186			MessageSubject(
187				block_hash,
188				self.assignment_claimed_candidates.clone(),
189				self.validator_index,
190			),
191			MessageKind::Assignment,
192		)
193	}
194
195	// Updates routing information and returns the previous information if any.
196	pub fn routing_info_mut(&mut self) -> &mut ApprovalRouting {
197		&mut self.routing_info
198	}
199
200	// Get the routing information.
201	pub fn routing_info(&self) -> &ApprovalRouting {
202		&self.routing_info
203	}
204
205	// Update routing information.
206	pub fn update_required_routing(&mut self, required_routing: RequiredRouting) {
207		self.routing_info.required_routing = required_routing;
208	}
209
210	// Tells if this entry assignment covers at least one candidate in the approval
211	pub fn includes_approval_candidates(&self, approval: &IndirectSignedApprovalVoteV2) -> bool {
212		for candidate_index in approval.candidate_indices.iter_ones() {
213			if self.assignment_claimed_candidates.bit_at((candidate_index).as_bit_index()) {
214				return true
215			}
216		}
217		return false
218	}
219
220	// Records a new approval. Returns error if the claimed candidate is not found or we already
221	// have received the approval.
222	pub fn note_approval(
223		&mut self,
224		approval: IndirectSignedApprovalVoteV2,
225	) -> Result<(), ApprovalEntryError> {
226		// First do some sanity checks:
227		// - check validator index matches
228		// - check claimed candidate
229		// - check for duplicate approval
230		if self.validator_index != approval.validator {
231			return Err(ApprovalEntryError::InvalidValidatorIndex)
232		}
233
234		// We need at least one of the candidates in the approval to be in this assignment
235		if !self.includes_approval_candidates(&approval) {
236			return Err(ApprovalEntryError::InvalidCandidateIndex)
237		}
238
239		if self.approvals.contains_key(&approval.candidate_indices) {
240			return Err(ApprovalEntryError::DuplicateApproval)
241		}
242
243		self.approvals.insert(approval.candidate_indices.clone(), approval.clone());
244		Ok(())
245	}
246
247	// Get the assignment certificate and claimed candidates.
248	pub fn assignment(&self) -> (IndirectAssignmentCertV2, CandidateBitfield) {
249		(self.assignment.clone(), self.assignment_claimed_candidates.clone())
250	}
251
252	// Get all approvals for all candidates claimed by the assignment.
253	pub fn approvals(&self) -> Vec<IndirectSignedApprovalVoteV2> {
254		self.approvals.values().cloned().collect::<Vec<_>>()
255	}
256
257	// Get validator index.
258	pub fn validator_index(&self) -> ValidatorIndex {
259		self.validator_index
260	}
261}
262
// We keep track of each peer view and protocol version using this struct.
struct PeerEntry {
	// The peer's current view of the chain.
	pub view: View,
	// The validation protocol version the peer connected with.
	pub version: ProtocolVersion,
}
268
// In case the original grid topology mechanisms don't work on their own, we need to trade
// bandwidth for protocol liveness by introducing aggression.
//
// Aggression has 3 levels:
//
//  * Aggression Level 0: The basic behaviors described above.
//  * Aggression Level 1: The originator of a message sends to all peers. Other peers follow the
//    rules above.
//  * Aggression Level 2: All peers send all messages to all their row and column neighbors. This
//    means that each validator will, on average, receive each message approximately `2*sqrt(n)`
//    times.
// The aggression level of messages pertaining to a block increases when that block is unfinalized
// and is a child of the finalized block.
// This means that only one block at a time has its messages propagated with aggression > 0.
//
// A note on aggression thresholds: changes in propagation apply only to blocks which are the
// _direct descendants_ of the finalized block which are older than the given threshold,
// not to all blocks older than the threshold. Most likely, a few assignments struggle to
// be propagated in a single block and this holds up all of its descendant blocks.
// Accordingly, we only step on the gas for the block which is most obviously holding up finality.
/// Aggression configuration representation
#[derive(Clone)]
struct AggressionConfig {
	/// Aggression level 1: all validators send all their own messages to all peers.
	l1_threshold: Option<BlockNumber>,
	/// Aggression level 2: level 1 + all validators send all messages to all peers in the X and Y
	/// dimensions.
	l2_threshold: Option<BlockNumber>,
	/// How often to re-send messages to all targeted recipients.
	/// This applies to all unfinalized blocks.
	resend_unfinalized_period: Option<BlockNumber>,
}
301
302impl AggressionConfig {
303	/// Returns `true` if age is past threshold depending on the aggression level
304	fn should_trigger_aggression(&self, age: BlockNumber) -> bool {
305		if let Some(t) = self.l1_threshold {
306			age >= t
307		} else if let Some(t) = self.resend_unfinalized_period {
308			age > 0 && age % t == 0
309		} else {
310			false
311		}
312	}
313}
314
315impl Default for AggressionConfig {
316	fn default() -> Self {
317		AggressionConfig {
318			l1_threshold: Some(16),
319			l2_threshold: Some(64),
320			resend_unfinalized_period: Some(8),
321		}
322	}
323}
324
// Whether messages should be re-sent to peers during a propagation pass.
#[derive(PartialEq)]
enum Resend {
	Yes,
	No,
}
330
/// The [`State`] struct is responsible for tracking the overall state of the subsystem.
///
/// It tracks metadata about our view of the unfinalized chain,
/// which assignments and approvals we have seen, and our peers' views.
#[derive(Default)]
pub struct State {
	/// These two fields are used in conjunction to construct a view over the unfinalized chain.
	blocks_by_number: BTreeMap<BlockNumber, Vec<Hash>>,
	/// Block entries indexed by hash; `blocks_by_number` provides the number-to-hash index.
	blocks: HashMap<Hash, BlockEntry>,

	/// Our view updates to our peers can race with `NewBlocks` updates. We store messages received
	/// against the directly mentioned blocks in our view in this map until `NewBlocks` is
	/// received.
	///
	/// As long as the parent is already in the `blocks` map and `NewBlocks` messages aren't
	/// delayed by more than a block length, this strategy will work well for mitigating the race.
	/// This is also a race that occurs typically on local networks.
	pending_known: HashMap<Hash, Vec<(PeerId, PendingMessage)>>,

	/// Peer data is partially stored here, and partially inline within the [`BlockEntry`]s
	peer_views: HashMap<PeerId, PeerEntry>,

	/// Keeps a topology for various different sessions.
	topologies: SessionGridTopologies,

	/// Tracks recently finalized blocks.
	recent_outdated_blocks: RecentlyOutdated,

	/// Aggression configuration.
	aggression_config: AggressionConfig,

	/// Current approval checking finality lag.
	approval_checking_lag: BlockNumber,

	/// Aggregated reputation change
	reputation: ReputationAggregator,

	/// Slot duration in millis
	slot_duration_millis: u64,
}
371
// The kind of a tracked message: an assignment certificate or an approval vote.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum MessageKind {
	Assignment,
	Approval,
}
377
// Utility structure to identify assignments and approvals for specific candidates.
// Assignments can span multiple candidates, while approvals refer to only one candidate.
// Tuple fields: block hash, claimed candidate bitfield, validator index.
#[derive(Debug, Clone, Hash, PartialEq, Eq)]
struct MessageSubject(Hash, pub CandidateBitfield, ValidatorIndex);
383
// Tracks which messages (assignments/approvals) are known, either by us or by a peer.
#[derive(Debug, Clone, Default)]
struct Knowledge {
	// When there is no entry, this means the message is unknown
	// When there is an entry with `MessageKind::Assignment`, the assignment is known.
	// When there is an entry with `MessageKind::Approval`, the assignment and approval are known.
	known_messages: HashMap<MessageSubject, MessageKind>,
}
391
impl Knowledge {
	// Whether a message is known at the given level. Since an `Approval` entry
	// implies knowledge of the assignment too, an `Assignment` query matches any
	// entry, while an `Approval` query requires an `Approval` entry.
	fn contains(&self, message: &MessageSubject, kind: MessageKind) -> bool {
		match (kind, self.known_messages.get(message)) {
			(_, None) => false,
			(MessageKind::Assignment, Some(_)) => true,
			(MessageKind::Approval, Some(MessageKind::Assignment)) => false,
			(MessageKind::Approval, Some(MessageKind::Approval)) => true,
		}
	}

	// Record knowledge of a message. Returns `true` when knowledge was upgraded:
	// a brand-new entry, or an `Assignment` entry raised to `Approval`.
	fn insert(&mut self, message: MessageSubject, kind: MessageKind) -> bool {
		let mut success = match self.known_messages.entry(message.clone()) {
			hash_map::Entry::Vacant(vacant) => {
				vacant.insert(kind);
				// If there are multiple candidates assigned in the message, create
				// separate entries for each one.
				true
			},
			// An existing entry only upgrades from `Assignment` to `Approval`;
			// every other combination adds no new information.
			hash_map::Entry::Occupied(mut occupied) => match (*occupied.get(), kind) {
				(MessageKind::Assignment, MessageKind::Assignment) => false,
				(MessageKind::Approval, MessageKind::Approval) => false,
				(MessageKind::Approval, MessageKind::Assignment) => false,
				(MessageKind::Assignment, MessageKind::Approval) => {
					*occupied.get_mut() = MessageKind::Approval;
					true
				},
			},
		};

		// In case of successful insertion of multiple candidate assignments create additional
		// entries for each assigned candidate. This fakes knowledge of individual assignments, but
		// we need to share the same `MessageSubject` with the followup approval candidate index.
		// NOTE(review): `&&` short-circuits, so once one per-candidate insert reports
		// no new information the remaining recursive inserts are skipped.
		if kind == MessageKind::Assignment && success && message.1.count_ones() > 1 {
			for candidate_index in message.1.iter_ones() {
				success = success &&
					self.insert(
						MessageSubject(
							message.0,
							vec![candidate_index as u32].try_into().expect("Non-empty vec; qed"),
							message.2,
						),
						kind,
					);
			}
		}
		success
	}
}
440
/// Information that has been circulated to and from a peer.
#[derive(Debug, Clone, Default)]
struct PeerKnowledge {
	/// The knowledge we've sent to the peer.
	sent: Knowledge,
	/// The knowledge we've received from the peer.
	received: Knowledge,
}
449
450impl PeerKnowledge {
451	fn contains(&self, message: &MessageSubject, kind: MessageKind) -> bool {
452		self.sent.contains(message, kind) || self.received.contains(message, kind)
453	}
454
455	// Generate the knowledge keys for querying if all assignments of an approval are known
456	// by this peer.
457	fn generate_assignments_keys(
458		approval: &IndirectSignedApprovalVoteV2,
459	) -> Vec<(MessageSubject, MessageKind)> {
460		approval
461			.candidate_indices
462			.iter_ones()
463			.map(|candidate_index| {
464				(
465					MessageSubject(
466						approval.block_hash,
467						(candidate_index as CandidateIndex).into(),
468						approval.validator,
469					),
470					MessageKind::Assignment,
471				)
472			})
473			.collect_vec()
474	}
475
476	// Generate the knowledge keys for querying if an approval is known by peer.
477	fn generate_approval_key(
478		approval: &IndirectSignedApprovalVoteV2,
479	) -> (MessageSubject, MessageKind) {
480		(
481			MessageSubject(
482				approval.block_hash,
483				approval.candidate_indices.clone(),
484				approval.validator,
485			),
486			MessageKind::Approval,
487		)
488	}
489}
490
/// Information about blocks in our current view as well as whether peers know of them.
struct BlockEntry {
	/// Peers who we know are aware of this block and thus, the candidates within it.
	/// This maps to their knowledge of messages.
	known_by: HashMap<PeerId, PeerKnowledge>,
	/// The number of the block.
	number: BlockNumber,
	/// The parent hash of the block.
	parent_hash: Hash,
	/// Our knowledge of messages.
	knowledge: Knowledge,
	/// A votes entry for each candidate indexed by [`CandidateIndex`].
	candidates: Vec<CandidateEntry>,
	/// Information about candidate metadata.
	candidates_metadata: Vec<(CandidateHash, CoreIndex, GroupIndex)>,
	/// The session index of this block.
	session: SessionIndex,
	/// Approval entries for whole block. These also contain all approvals in the case of multiple
	/// candidates being claimed by assignments.
	approval_entries: HashMap<(ValidatorIndex, CandidateBitfield), ApprovalEntry>,
	/// The block vrf story.
	vrf_story: RelayVRFStory,
	/// The block slot.
	slot: Slot,
	/// Backing off from re-sending messages to peers: the block number at which we
	/// last re-sent, if any.
	last_resent_at_block_number: Option<u32>,
}
518
519impl BlockEntry {
520	// Returns the peer which currently know this block.
521	pub fn known_by(&self) -> Vec<PeerId> {
522		self.known_by.keys().cloned().collect::<Vec<_>>()
523	}
524
525	pub fn insert_approval_entry(&mut self, entry: ApprovalEntry) -> &mut ApprovalEntry {
526		// First map one entry per candidate to the same key we will use in `approval_entries`.
527		// Key is (Validator_index, CandidateBitfield) that links the `ApprovalEntry` to the (K,V)
528		// entry in `candidate_entry.messages`.
529		for claimed_candidate_index in entry.assignment_claimed_candidates.iter_ones() {
530			match self.candidates.get_mut(claimed_candidate_index) {
531				Some(candidate_entry) => {
532					candidate_entry
533						.assignments
534						.entry(entry.validator_index())
535						.or_insert(entry.assignment_claimed_candidates.clone());
536				},
537				None => {
538					// This should never happen, but if it happens, it means the subsystem is
539					// broken.
540					gum::warn!(
541						target: LOG_TARGET,
542						hash = ?entry.assignment.block_hash,
543						?claimed_candidate_index,
544						"Missing candidate entry on `import_and_circulate_assignment`",
545					);
546				},
547			};
548		}
549
550		self.approval_entries
551			.entry((entry.validator_index, entry.assignment_claimed_candidates.clone()))
552			.or_insert(entry)
553	}
554
555	// Tels if all candidate_indices are valid candidates
556	pub fn contains_candidates(&self, candidate_indices: &CandidateBitfield) -> bool {
557		candidate_indices
558			.iter_ones()
559			.all(|candidate_index| self.candidates.get(candidate_index as usize).is_some())
560	}
561
562	// Saves the given approval in all ApprovalEntries that contain an assignment for any of the
563	// candidates in the approval.
564	//
565	// Returns the required routing needed for this approval and the lit of random peers the
566	// covering assignments were sent.
567	pub fn note_approval(
568		&mut self,
569		approval: IndirectSignedApprovalVoteV2,
570	) -> Result<(RequiredRouting, HashSet<PeerId>), ApprovalEntryError> {
571		let mut required_routing: Option<RequiredRouting> = None;
572		let mut peers_randomly_routed_to = HashSet::new();
573
574		if self.candidates.len() < approval.candidate_indices.len() as usize {
575			return Err(ApprovalEntryError::CandidateIndexOutOfBounds)
576		}
577
578		// First determine all assignments bitfields that might be covered by this approval
579		let covered_assignments_bitfields: HashSet<CandidateBitfield> = approval
580			.candidate_indices
581			.iter_ones()
582			.filter_map(|candidate_index| {
583				self.candidates.get_mut(candidate_index).map_or(None, |candidate_entry| {
584					candidate_entry.assignments.get(&approval.validator).cloned()
585				})
586			})
587			.collect();
588
589		// Mark the vote in all approval entries
590		for assignment_bitfield in covered_assignments_bitfields {
591			if let Some(approval_entry) =
592				self.approval_entries.get_mut(&(approval.validator, assignment_bitfield))
593			{
594				approval_entry.note_approval(approval.clone())?;
595				peers_randomly_routed_to
596					.extend(approval_entry.routing_info().peers_randomly_routed.iter());
597
598				if let Some(current_required_routing) = required_routing {
599					required_routing = Some(
600						current_required_routing
601							.combine(approval_entry.routing_info().required_routing),
602					);
603				} else {
604					required_routing = Some(approval_entry.routing_info().required_routing)
605				}
606			}
607		}
608
609		if let Some(required_routing) = required_routing {
610			Ok((required_routing, peers_randomly_routed_to))
611		} else {
612			Err(ApprovalEntryError::UnknownAssignment)
613		}
614	}
615
616	/// Returns the list of approval votes covering this candidate
617	pub fn approval_votes(
618		&self,
619		candidate_index: CandidateIndex,
620	) -> Vec<IndirectSignedApprovalVoteV2> {
621		let result: Option<
622			HashMap<(ValidatorIndex, CandidateBitfield), IndirectSignedApprovalVoteV2>,
623		> = self.candidates.get(candidate_index as usize).map(|candidate_entry| {
624			candidate_entry
625				.assignments
626				.iter()
627				.filter_map(|(validator, assignment_bitfield)| {
628					self.approval_entries.get(&(*validator, assignment_bitfield.clone()))
629				})
630				.flat_map(|approval_entry| {
631					approval_entry
632						.approvals
633						.clone()
634						.into_iter()
635						.filter(|(approved_candidates, _)| {
636							approved_candidates.bit_at(candidate_index.as_bit_index())
637						})
638						.map(|(approved_candidates, vote)| {
639							((approval_entry.validator_index, approved_candidates), vote)
640						})
641				})
642				.collect()
643		});
644
645		result.map(|result| result.into_values().collect_vec()).unwrap_or_default()
646	}
647}
648
// Information about candidates in the context of a particular block they are included in.
// In other words, multiple `CandidateEntry`s may exist for the same candidate,
// if it is included by multiple blocks - this is likely the case when there are forks.
#[derive(Debug, Default)]
struct CandidateEntry {
	// The value represents part of the lookup key in `approval_entries` to fetch the assignment
	// and existing votes.
	assignments: HashMap<ValidatorIndex, CandidateBitfield>,
}
658
// The origin of an assignment or approval: a remote peer, or this node itself.
#[derive(Debug, Clone, PartialEq)]
enum MessageSource {
	Peer(PeerId),
	Local,
}
664
// Encountered error while validating an assignment.
#[derive(Debug)]
enum InvalidAssignmentError {
	// The vrf check for the assignment failed.
	#[allow(dead_code)]
	CryptoCheckFailed(InvalidAssignment),
	// The assignment did not claim any valid candidate.
	NoClaimedCandidates,
	// Claimed invalid candidate.
	#[allow(dead_code)]
	ClaimedInvalidCandidateIndex {
		claimed_index: usize,
		max_index: usize,
	},
	// The assignment claims more candidates than the maximum allowed.
	OversizedClaimedBitfield,
	// `SessionInfo` was not found for the block hash in the assignment.
	#[allow(dead_code)]
	SessionInfoNotFound(polkadot_node_subsystem_util::runtime::Error),
}
685
// Encountered error while validating an approval vote.
#[derive(Debug)]
enum InvalidVoteError {
	// The candidate index was out of bounds.
	CandidateIndexOutOfBounds,
	// The validator index was out of bounds.
	ValidatorIndexOutOfBounds,
	// The signature of the vote was invalid.
	InvalidSignature,
	// `SessionInfo` was not found for the block hash in the approval.
	#[allow(dead_code)]
	SessionInfoNotFound(polkadot_node_subsystem_util::runtime::Error),
}
699
700impl MessageSource {
701	fn peer_id(&self) -> Option<PeerId> {
702		match self {
703			Self::Peer(id) => Some(*id),
704			Self::Local => None,
705		}
706	}
707}
708
// Messages buffered in `State::pending_known` until their block becomes known via `NewBlocks`.
enum PendingMessage {
	Assignment(IndirectAssignmentCertV2, CandidateBitfield),
	Approval(IndirectSignedApprovalVoteV2),
}
713
714#[overseer::contextbounds(ApprovalDistribution, prefix = self::overseer)]
715impl State {
716	/// Build State with specified slot duration.
717	pub fn with_config(slot_duration_millis: u64) -> Self {
718		Self { slot_duration_millis, ..Default::default() }
719	}
720
	// Dispatch a single network-bridge event: peer lifecycle, topology changes,
	// view changes and incoming peer messages.
	async fn handle_network_msg<
		N: overseer::SubsystemSender<NetworkBridgeTxMessage>,
		A: overseer::SubsystemSender<ApprovalVotingMessage>,
		RA: overseer::SubsystemSender<RuntimeApiMessage>,
	>(
		&mut self,
		approval_voting_sender: &mut A,
		network_sender: &mut N,
		runtime_api_sender: &mut RA,
		metrics: &Metrics,
		event: NetworkBridgeEvent<net_protocol::ApprovalDistributionMessage>,
		rng: &mut (impl CryptoRng + Rng),
		assignment_criteria: &(impl AssignmentCriteria + ?Sized),
		clock: &(impl Clock + ?Sized),
		session_info_provider: &mut RuntimeInfo,
	) {
		match event {
			// Track the peer's authority ids (if known) and start with an empty view.
			NetworkBridgeEvent::PeerConnected(peer_id, role, version, authority_ids) => {
				gum::trace!(target: LOG_TARGET, ?peer_id, ?role, ?authority_ids, "Peer connected");
				if let Some(authority_ids) = authority_ids {
					self.topologies.update_authority_ids(peer_id, &authority_ids);
				}
				// insert a blank view if none already present
				self.peer_views
					.entry(peer_id)
					.or_insert(PeerEntry { view: Default::default(), version });
			},
			// Forget the peer's view and per-block knowledge.
			NetworkBridgeEvent::PeerDisconnected(peer_id) => {
				gum::trace!(target: LOG_TARGET, ?peer_id, "Peer disconnected");
				self.peer_views.remove(&peer_id);
				self.blocks.iter_mut().for_each(|(_hash, entry)| {
					entry.known_by.remove(&peer_id);
				})
			},
			NetworkBridgeEvent::NewGossipTopology(topology) => {
				self.handle_new_session_topology(
					network_sender,
					topology.session,
					topology.topology,
					topology.local_index,
				)
				.await;
			},
			NetworkBridgeEvent::PeerViewChange(peer_id, view) => {
				self.handle_peer_view_change(network_sender, metrics, peer_id, view, rng).await;
			},
			// Our own view changed: set up pending-message buffers for heads we don't
			// know yet, and drop buffers for blocks no longer in view.
			NetworkBridgeEvent::OurViewChange(view) => {
				gum::trace!(target: LOG_TARGET, ?view, "Own view change");
				for head in view.iter() {
					if !self.blocks.contains_key(head) {
						self.pending_known.entry(*head).or_default();
					}
				}

				self.pending_known.retain(|h, _| {
					let live = view.contains(h);
					if !live {
						gum::trace!(
							target: LOG_TARGET,
							block_hash = ?h,
							"Cleaning up stale pending messages",
						);
					}
					live
				});
			},
			NetworkBridgeEvent::PeerMessage(peer_id, message) => {
				self.process_incoming_peer_message(
					approval_voting_sender,
					network_sender,
					runtime_api_sender,
					metrics,
					peer_id,
					message,
					rng,
					assignment_criteria,
					clock,
					session_info_provider,
				)
				.await;
			},
			NetworkBridgeEvent::UpdatedAuthorityIds(peer_id, authority_ids) => {
				gum::debug!(target: LOG_TARGET, ?peer_id, ?authority_ids, "Update Authority Ids");
				// If we learn about a new PeerId for an authority ids we need to try to route the
				// messages that should have sent to that validator according to the topology.
				if self.topologies.update_authority_ids(peer_id, &authority_ids) {
					if let Some(PeerEntry { view, version }) = self.peer_views.get(&peer_id) {
						// Restrict unification to unfinalized blocks the peer already knows about.
						let intersection = self
							.blocks_by_number
							.iter()
							.filter(|(block_number, _)| *block_number > &view.finalized_number)
							.flat_map(|(_, hashes)| {
								hashes.iter().filter(|hash| {
									self.blocks
										.get(&hash)
										.map(|block| block.known_by.get(&peer_id).is_some())
										.unwrap_or_default()
								})
							});
						let view_intersection =
							View::new(intersection.cloned(), view.finalized_number);
						Self::unify_with_peer(
							network_sender,
							metrics,
							&mut self.blocks,
							&self.topologies,
							self.peer_views.len(),
							peer_id,
							*version,
							view_intersection,
							rng,
							true,
						)
						.await;
					}
				}
			},
		}
	}
840
	// Imports a batch of newly known relay-chain blocks.
	//
	// For each previously unseen hash this creates a fresh `BlockEntry`, then
	// (1) sends each connected peer whatever it is missing for the freshly imported
	// blocks in its view and (2) replays any assignments/approvals that were parked
	// in `pending_known` while the block was not yet imported.
	async fn handle_new_blocks<
		N: overseer::SubsystemSender<NetworkBridgeTxMessage>,
		A: overseer::SubsystemSender<ApprovalVotingMessage>,
		RA: overseer::SubsystemSender<RuntimeApiMessage>,
	>(
		&mut self,
		approval_voting_sender: &mut A,
		network_sender: &mut N,
		runtime_api_sender: &mut RA,
		metrics: &Metrics,
		metas: Vec<BlockApprovalMeta>,
		rng: &mut (impl CryptoRng + Rng),
		assignment_criteria: &(impl AssignmentCriteria + ?Sized),
		clock: &(impl Clock + ?Sized),
		session_info_provider: &mut RuntimeInfo,
	) {
		// Hashes that were actually inserted in this call (duplicates excluded).
		let mut new_hashes = HashSet::new();

		gum::debug!(
			target: LOG_TARGET,
			"Got new blocks {:?}",
			metas.iter().map(|m| (m.hash, m.number)).collect::<Vec<_>>(),
		);

		for meta in metas {
			match self.blocks.entry(meta.hash) {
				hash_map::Entry::Vacant(entry) => {
					// One default (empty) candidate slot per candidate included in
					// the block.
					let candidates_count = meta.candidates.len();
					let mut candidates = Vec::with_capacity(candidates_count);
					candidates.resize_with(candidates_count, Default::default);

					entry.insert(BlockEntry {
						known_by: HashMap::new(),
						number: meta.number,
						parent_hash: meta.parent_hash,
						knowledge: Knowledge::default(),
						candidates,
						session: meta.session,
						approval_entries: HashMap::new(),
						candidates_metadata: meta.candidates,
						vrf_story: meta.vrf_story,
						slot: meta.slot,
						last_resent_at_block_number: None,
					});

					// Keep the session topology alive for as long as blocks of this
					// session are tracked.
					self.topologies.inc_session_refs(meta.session);

					new_hashes.insert(meta.hash);

					// In case there are duplicates, we should only set this if the entry
					// was vacant.
					self.blocks_by_number.entry(meta.number).or_default().push(meta.hash);
				},
				_ => continue,
			}
		}

		{
			// For every connected peer, intersect its view with the freshly imported
			// hashes and send it the messages it is missing for those blocks.
			for (peer_id, PeerEntry { view, version }) in self.peer_views.iter() {
				let intersection = view.iter().filter(|h| new_hashes.contains(h));
				let view_intersection = View::new(intersection.cloned(), view.finalized_number);
				Self::unify_with_peer(
					network_sender,
					metrics,
					&mut self.blocks,
					&self.topologies,
					self.peer_views.len(),
					*peer_id,
					*version,
					view_intersection,
					rng,
					false,
				)
				.await;
			}

			// Messages parked for blocks that have now been imported can be processed.
			let pending_now_known = self
				.pending_known
				.keys()
				.filter(|k| self.blocks.contains_key(k))
				.copied()
				.collect::<Vec<_>>();

			let to_import = pending_now_known
				.into_iter()
				.inspect(|h| {
					gum::trace!(
						target: LOG_TARGET,
						block_hash = ?h,
						"Extracting pending messages for new block"
					)
				})
				.filter_map(|k| self.pending_known.remove(&k))
				.flatten()
				.collect::<Vec<_>>();

			if !to_import.is_empty() {
				gum::debug!(
					target: LOG_TARGET,
					num = to_import.len(),
					"Processing pending assignment/approvals",
				);

				let _timer = metrics.time_import_pending_now_known();

				// Replay the parked messages through the regular import paths, still
				// attributed to the peer that originally sent them.
				for (peer_id, message) in to_import {
					match message {
						PendingMessage::Assignment(assignment, claimed_indices) => {
							self.import_and_circulate_assignment(
								approval_voting_sender,
								network_sender,
								runtime_api_sender,
								metrics,
								MessageSource::Peer(peer_id),
								assignment,
								claimed_indices,
								rng,
								assignment_criteria,
								clock,
								session_info_provider,
							)
							.await;
						},
						PendingMessage::Approval(approval_vote) => {
							self.import_and_circulate_approval(
								approval_voting_sender,
								network_sender,
								runtime_api_sender,
								metrics,
								MessageSource::Peer(peer_id),
								approval_vote,
								session_info_provider,
							)
							.await;
						},
					}
				}
			}
		}

		// Chain state changed, so re-evaluate aggression; `Resend::Yes` allows
		// messages to be re-sent if aggression deems it necessary.
		self.enable_aggression(network_sender, Resend::Yes, metrics).await;
	}
983
984	async fn handle_new_session_topology<N: overseer::SubsystemSender<NetworkBridgeTxMessage>>(
985		&mut self,
986		network_sender: &mut N,
987		session: SessionIndex,
988		topology: SessionGridTopology,
989		local_index: Option<ValidatorIndex>,
990	) {
991		if local_index.is_none() {
992			// this subsystem only matters to validators.
993			return
994		}
995
996		self.topologies.insert_topology(session, topology, local_index);
997		let topology = self.topologies.get_topology(session).expect("just inserted above; qed");
998
999		adjust_required_routing_and_propagate(
1000			network_sender,
1001			&mut self.blocks,
1002			&self.topologies,
1003			|block_entry| block_entry.session == session,
1004			|required_routing, local, validator_index| {
1005				if required_routing == &RequiredRouting::PendingTopology {
1006					topology
1007						.local_grid_neighbors()
1008						.required_routing_by_index(*validator_index, local)
1009				} else {
1010					*required_routing
1011				}
1012			},
1013			&self.peer_views,
1014		)
1015		.await;
1016	}
1017
1018	async fn process_incoming_assignments<A, N, R, RA>(
1019		&mut self,
1020		approval_voting_sender: &mut A,
1021		network_sender: &mut N,
1022		runtime_api_sender: &mut RA,
1023		metrics: &Metrics,
1024		peer_id: PeerId,
1025		assignments: Vec<(IndirectAssignmentCertV2, CandidateBitfield)>,
1026		rng: &mut R,
1027		assignment_criteria: &(impl AssignmentCriteria + ?Sized),
1028		clock: &(impl Clock + ?Sized),
1029		session_info_provider: &mut RuntimeInfo,
1030	) where
1031		A: overseer::SubsystemSender<ApprovalVotingMessage>,
1032		N: overseer::SubsystemSender<NetworkBridgeTxMessage>,
1033		RA: overseer::SubsystemSender<RuntimeApiMessage>,
1034		R: CryptoRng + Rng,
1035	{
1036		for (assignment, claimed_indices) in assignments {
1037			if let Some(pending) = self.pending_known.get_mut(&assignment.block_hash) {
1038				let block_hash = &assignment.block_hash;
1039				let validator_index = assignment.validator;
1040
1041				gum::trace!(
1042					target: LOG_TARGET,
1043					%peer_id,
1044					?block_hash,
1045					?claimed_indices,
1046					?validator_index,
1047					"Pending assignment",
1048				);
1049
1050				pending.push((peer_id, PendingMessage::Assignment(assignment, claimed_indices)));
1051
1052				continue
1053			}
1054
1055			self.import_and_circulate_assignment(
1056				approval_voting_sender,
1057				network_sender,
1058				runtime_api_sender,
1059				metrics,
1060				MessageSource::Peer(peer_id),
1061				assignment,
1062				claimed_indices,
1063				rng,
1064				assignment_criteria,
1065				clock,
1066				session_info_provider,
1067			)
1068			.await;
1069		}
1070	}
1071
1072	// Entry point for processing an approval coming from a peer.
1073	async fn process_incoming_approvals<
1074		N: overseer::SubsystemSender<NetworkBridgeTxMessage>,
1075		A: overseer::SubsystemSender<ApprovalVotingMessage>,
1076		RA: overseer::SubsystemSender<RuntimeApiMessage>,
1077	>(
1078		&mut self,
1079		approval_voting_sender: &mut A,
1080		network_sender: &mut N,
1081		runtime_api_sender: &mut RA,
1082		metrics: &Metrics,
1083		peer_id: PeerId,
1084		approvals: Vec<IndirectSignedApprovalVoteV2>,
1085		session_info_provider: &mut RuntimeInfo,
1086	) {
1087		gum::trace!(
1088			target: LOG_TARGET,
1089			peer_id = %peer_id,
1090			num = approvals.len(),
1091			"Processing approvals from a peer",
1092		);
1093		for approval_vote in approvals.into_iter() {
1094			if let Some(pending) = self.pending_known.get_mut(&approval_vote.block_hash) {
1095				let block_hash = approval_vote.block_hash;
1096				let validator_index = approval_vote.validator;
1097
1098				gum::trace!(
1099					target: LOG_TARGET,
1100					%peer_id,
1101					?block_hash,
1102					?validator_index,
1103					"Pending assignment candidates {:?}",
1104					approval_vote.candidate_indices,
1105				);
1106
1107				pending.push((peer_id, PendingMessage::Approval(approval_vote)));
1108
1109				continue
1110			}
1111
1112			self.import_and_circulate_approval(
1113				approval_voting_sender,
1114				network_sender,
1115				runtime_api_sender,
1116				metrics,
1117				MessageSource::Peer(peer_id),
1118				approval_vote,
1119				session_info_provider,
1120			)
1121			.await;
1122		}
1123	}
1124
	// Dispatches a versioned network-protocol message from a peer.
	//
	// Each protocol version's payload is first sanitized (bitfield/shape checks
	// against the peer, which may be punished for malformed data) and then handed
	// to the shared assignment/approval processing paths. V1 and V2 share wire
	// formats for these messages and therefore share arms.
	async fn process_incoming_peer_message<A, N, RA, R>(
		&mut self,
		approval_voting_sender: &mut A,
		network_sender: &mut N,
		runtime_api_sender: &mut RA,
		metrics: &Metrics,
		peer_id: PeerId,
		msg: Versioned<
			protocol_v1::ApprovalDistributionMessage,
			protocol_v2::ApprovalDistributionMessage,
			protocol_v3::ApprovalDistributionMessage,
		>,
		rng: &mut R,
		assignment_criteria: &(impl AssignmentCriteria + ?Sized),
		clock: &(impl Clock + ?Sized),
		session_info_provider: &mut RuntimeInfo,
	) where
		A: overseer::SubsystemSender<ApprovalVotingMessage>,
		N: overseer::SubsystemSender<NetworkBridgeTxMessage>,
		RA: overseer::SubsystemSender<RuntimeApiMessage>,
		R: CryptoRng + Rng,
	{
		match msg {
			// V3 assignments: sanitized via the v2-cert path.
			Versioned::V3(protocol_v3::ApprovalDistributionMessage::Assignments(assignments)) => {
				gum::trace!(
					target: LOG_TARGET,
					peer_id = %peer_id,
					num = assignments.len(),
					"Processing assignments from a peer",
				);
				let sanitized_assignments =
					self.sanitize_v2_assignments(peer_id, network_sender, assignments).await;

				self.process_incoming_assignments(
					approval_voting_sender,
					network_sender,
					runtime_api_sender,
					metrics,
					peer_id,
					sanitized_assignments,
					rng,
					assignment_criteria,
					clock,
					session_info_provider,
				)
				.await;
			},
			// V1/V2 assignments share a wire format and the v1 sanitizer.
			Versioned::V1(protocol_v1::ApprovalDistributionMessage::Assignments(assignments)) |
			Versioned::V2(protocol_v2::ApprovalDistributionMessage::Assignments(assignments)) => {
				gum::trace!(
					target: LOG_TARGET,
					peer_id = %peer_id,
					num = assignments.len(),
					"Processing assignments from a peer",
				);

				let sanitized_assignments =
					self.sanitize_v1_assignments(peer_id, network_sender, assignments).await;

				self.process_incoming_assignments(
					approval_voting_sender,
					network_sender,
					runtime_api_sender,
					metrics,
					peer_id,
					sanitized_assignments,
					rng,
					assignment_criteria,
					clock,
					session_info_provider,
				)
				.await;
			},
			// V3 approvals: sanitized via the v2-vote path.
			Versioned::V3(protocol_v3::ApprovalDistributionMessage::Approvals(approvals)) => {
				let sanitized_approvals =
					self.sanitize_v2_approvals(peer_id, network_sender, approvals).await;
				self.process_incoming_approvals(
					approval_voting_sender,
					network_sender,
					runtime_api_sender,
					metrics,
					peer_id,
					sanitized_approvals,
					session_info_provider,
				)
				.await;
			},
			// V1/V2 approvals share a wire format and the v1 sanitizer.
			Versioned::V1(protocol_v1::ApprovalDistributionMessage::Approvals(approvals)) |
			Versioned::V2(protocol_v2::ApprovalDistributionMessage::Approvals(approvals)) => {
				let sanitized_approvals =
					self.sanitize_v1_approvals(peer_id, network_sender, approvals).await;
				self.process_incoming_approvals(
					approval_voting_sender,
					network_sender,
					runtime_api_sender,
					metrics,
					peer_id,
					sanitized_approvals,
					session_info_provider,
				)
				.await;
			},
		}
	}
1229
1230	// handle a peer view change: requires that the peer is already connected
1231	// and has an entry in the `PeerData` struct.
	async fn handle_peer_view_change<N: overseer::SubsystemSender<NetworkBridgeTxMessage>, R>(
		&mut self,
		network_sender: &mut N,
		metrics: &Metrics,
		peer_id: PeerId,
		view: View,
		rng: &mut R,
	) where
		R: CryptoRng + Rng,
	{
		gum::trace!(target: LOG_TARGET, ?view, "Peer view change");
		let finalized_number = view.finalized_number;

		// Swap in the new view, remembering the old one so we can prune the
		// peer-knowledge for blocks that fell below its finality threshold.
		let (old_view, protocol_version) =
			if let Some(peer_entry) = self.peer_views.get_mut(&peer_id) {
				(Some(std::mem::replace(&mut peer_entry.view, view.clone())), peer_entry.version)
			} else {
				// This shouldn't happen, but if it does we assume protocol version 1.
				gum::warn!(
					target: LOG_TARGET,
					?peer_id,
					?view,
					"Peer view change for missing `peer_entry`"
				);

				(None, ValidationVersion::V1.into())
			};

		// A peer we have no previous view for is treated as starting from finalized
		// number 0, so the whole finalized prefix gets pruned below.
		let old_finalized_number = old_view.map(|v| v.finalized_number).unwrap_or(0);

		// we want to prune every block known_by peer up to (including) view.finalized_number
		let blocks = &mut self.blocks;
		// the `BTreeMap::range` is constrained by stored keys
		// so the loop won't take ages if the new finalized_number skyrockets
		// but we need to make sure the range is not empty, otherwise it will panic
		// it shouldn't be, we make sure of this in the network bridge
		let range = old_finalized_number..=finalized_number;
		if !range.is_empty() && !blocks.is_empty() {
			self.blocks_by_number
				.range(range)
				.flat_map(|(_number, hashes)| hashes)
				.for_each(|hash| {
					if let Some(entry) = blocks.get_mut(hash) {
						entry.known_by.remove(&peer_id);
					}
				});
		}

		// Send the peer whatever it is missing for the blocks in its new view.
		Self::unify_with_peer(
			network_sender,
			metrics,
			&mut self.blocks,
			&self.topologies,
			self.peer_views.len(),
			peer_id,
			protocol_version,
			view,
			rng,
			false,
		)
		.await;
	}
1294
1295	async fn handle_block_finalized<N: overseer::SubsystemSender<NetworkBridgeTxMessage>>(
1296		&mut self,
1297		network_sender: &mut N,
1298		metrics: &Metrics,
1299		finalized_number: BlockNumber,
1300	) {
1301		// we want to prune every block up to (including) finalized_number
1302		// why +1 here?
1303		// split_off returns everything after the given key, including the key
1304		let split_point = finalized_number.saturating_add(1);
1305		let mut old_blocks = self.blocks_by_number.split_off(&split_point);
1306
1307		// after split_off old_blocks actually contains new blocks, we need to swap
1308		std::mem::swap(&mut self.blocks_by_number, &mut old_blocks);
1309
1310		// now that we pruned `self.blocks_by_number`, let's clean up `self.blocks` too
1311		old_blocks.values().flatten().for_each(|relay_block| {
1312			self.recent_outdated_blocks.note_outdated(*relay_block);
1313			if let Some(block_entry) = self.blocks.remove(relay_block) {
1314				self.topologies.dec_session_refs(block_entry.session);
1315			}
1316		});
1317
1318		// If a block was finalized, this means we may need to move our aggression
1319		// forward to the now oldest block(s).
1320		self.enable_aggression(network_sender, Resend::No, metrics).await;
1321	}
1322
1323	// When finality is lagging as a last resort nodes start sending the messages they have
1324	// multiples times. This means it is safe to accept duplicate messages without punishing the
1325	// peer and reduce the reputation and can end up banning the Peer, which in turn will create
1326	// more no-shows.
1327	fn accept_duplicates_from_validators(
1328		blocks_by_number: &BTreeMap<BlockNumber, Vec<Hash>>,
1329		topologies: &SessionGridTopologies,
1330		aggression_config: &AggressionConfig,
1331		entry: &BlockEntry,
1332		peer: PeerId,
1333	) -> bool {
1334		let topology = topologies.get_topology(entry.session);
1335		let min_age = blocks_by_number.iter().next().map(|(num, _)| num);
1336		let max_age = blocks_by_number.iter().rev().next().map(|(num, _)| num);
1337
1338		// Return if we don't have at least 1 block.
1339		let (min_age, max_age) = match (min_age, max_age) {
1340			(Some(min), Some(max)) => (*min, *max),
1341			_ => return false,
1342		};
1343
1344		let age = max_age.saturating_sub(min_age);
1345
1346		aggression_config.should_trigger_aggression(age) &&
1347			topology.map(|topology| topology.is_validator(&peer)).unwrap_or(false)
1348	}
1349
	// Imports a single assignment (from a peer or produced locally) and circulates
	// it to the relevant peers.
	//
	// For peer-sourced assignments this performs the full knowledge/duplicate
	// bookkeeping, adjusts the peer's reputation accordingly, validates the
	// assignment cert and forwards it to approval-voting before distributing.
	// Locally-sourced assignments skip validation and reputation handling and go
	// straight to distribution.
	async fn import_and_circulate_assignment<A, N, RA, R>(
		&mut self,
		approval_voting_sender: &mut A,
		network_sender: &mut N,
		runtime_api_sender: &mut RA,
		metrics: &Metrics,
		source: MessageSource,
		assignment: IndirectAssignmentCertV2,
		claimed_candidate_indices: CandidateBitfield,
		rng: &mut R,
		assignment_criteria: &(impl AssignmentCriteria + ?Sized),
		clock: &(impl Clock + ?Sized),
		session_info_provider: &mut RuntimeInfo,
	) where
		A: overseer::SubsystemSender<ApprovalVotingMessage>,
		N: overseer::SubsystemSender<NetworkBridgeTxMessage>,
		RA: overseer::SubsystemSender<RuntimeApiMessage>,
		R: CryptoRng + Rng,
	{
		let block_hash = assignment.block_hash;
		let validator_index = assignment.validator;

		// Unknown block: punish the sender unless the block was only recently
		// pruned by finality (in which case the message is stale, not malicious).
		let entry = match self.blocks.get_mut(&block_hash) {
			Some(entry) => entry,
			None => {
				if let Some(peer_id) = source.peer_id() {
					gum::trace!(
						target: LOG_TARGET,
						?peer_id,
						hash = ?block_hash,
						?validator_index,
						"Unexpected assignment",
					);
					if !self.recent_outdated_blocks.is_recent_outdated(&block_hash) {
						modify_reputation(
							&mut self.reputation,
							network_sender,
							peer_id,
							COST_UNEXPECTED_MESSAGE,
						)
						.await;
						gum::debug!(target: LOG_TARGET, "Received assignment for invalid block");
						metrics.on_assignment_recent_outdated();
					}
				}
				metrics.on_assignment_invalid_block();
				return
			},
		};

		// Compute metadata on the assignment.
		let (message_subject, message_kind) = (
			MessageSubject(block_hash, claimed_candidate_indices.clone(), validator_index),
			MessageKind::Assignment,
		);

		if let Some(peer_id) = source.peer_id() {
			// check if our knowledge of the peer already contains this assignment
			match entry.known_by.entry(peer_id) {
				hash_map::Entry::Occupied(mut peer_knowledge) => {
					let peer_knowledge = peer_knowledge.get_mut();
					if peer_knowledge.contains(&message_subject, message_kind) {
						// `received.insert` returns false if the peer already sent us
						// this assignment before — i.e. a true duplicate.
						if !peer_knowledge.received.insert(message_subject.clone(), message_kind) {
							// Duplicates are tolerated (no punishment) from validators
							// while aggression is active, since re-sends are expected
							// then.
							if !Self::accept_duplicates_from_validators(
								&self.blocks_by_number,
								&self.topologies,
								&self.aggression_config,
								entry,
								peer_id,
							) {
								gum::debug!(
									target: LOG_TARGET,
									?peer_id,
									?message_subject,
									"Duplicate assignment",
								);

								modify_reputation(
									&mut self.reputation,
									network_sender,
									peer_id,
									COST_DUPLICATE_MESSAGE,
								)
								.await;
							}

							metrics.on_assignment_duplicate();
						} else {
							gum::trace!(
								target: LOG_TARGET,
								?peer_id,
								hash = ?block_hash,
								?validator_index,
								?message_subject,
								"We sent the message to the peer while peer was sending it to us. Known race condition.",
							);
						}
						return
					}
				},
				hash_map::Entry::Vacant(_) => {
					// The peer sent us an assignment for a block it should not know
					// we track for it — out of its declared view.
					gum::debug!(
						target: LOG_TARGET,
						?peer_id,
						?message_subject,
						"Assignment from a peer is out of view",
					);
					modify_reputation(
						&mut self.reputation,
						network_sender,
						peer_id,
						COST_UNEXPECTED_MESSAGE,
					)
					.await;
					metrics.on_assignment_out_of_view();
				},
			}

			// if the assignment is known to be valid, reward the peer
			if entry.knowledge.contains(&message_subject, message_kind) {
				modify_reputation(
					&mut self.reputation,
					network_sender,
					peer_id,
					BENEFIT_VALID_MESSAGE,
				)
				.await;
				if let Some(peer_knowledge) = entry.known_by.get_mut(&peer_id) {
					gum::trace!(target: LOG_TARGET, ?peer_id, ?message_subject, "Known assignment");
					peer_knowledge.received.insert(message_subject, message_kind);
				}
				metrics.on_assignment_good_known();
				return
			}

			// First time we see this assignment: run the full cert validation.
			let result = Self::check_assignment_valid(
				assignment_criteria,
				&entry,
				&assignment,
				&claimed_candidate_indices,
				session_info_provider,
				runtime_api_sender,
			)
			.await;

			match result {
				Ok(checked_assignment) => {
					// Reject assignments claiming a tranche too far ahead of the
					// current one — a peer should not be sending those yet.
					let current_tranche = clock.tranche_now(self.slot_duration_millis, entry.slot);
					let too_far_in_future =
						current_tranche + TICK_TOO_FAR_IN_FUTURE as DelayTranche;

					if checked_assignment.tranche() >= too_far_in_future {
						gum::debug!(
							target: LOG_TARGET,
							hash = ?block_hash,
							?peer_id,
							"Got an assignment too far in the future",
						);
						modify_reputation(
							&mut self.reputation,
							network_sender,
							peer_id,
							COST_ASSIGNMENT_TOO_FAR_IN_THE_FUTURE,
						)
						.await;
						metrics.on_assignment_far();

						return
					}

					// Hand the validated assignment to approval-voting, reward the
					// peer for being first, and record it in our/its knowledge.
					approval_voting_sender
						.send_message(ApprovalVotingMessage::ImportAssignment(
							checked_assignment,
							None,
						))
						.await;
					modify_reputation(
						&mut self.reputation,
						network_sender,
						peer_id,
						BENEFIT_VALID_MESSAGE_FIRST,
					)
					.await;
					entry.knowledge.insert(message_subject.clone(), message_kind);
					if let Some(peer_knowledge) = entry.known_by.get_mut(&peer_id) {
						peer_knowledge.received.insert(message_subject.clone(), message_kind);
					}
				},
				Err(error) => {
					gum::info!(
						target: LOG_TARGET,
						hash = ?block_hash,
						?peer_id,
						?error,
						"Got a bad assignment from peer",
					);
					modify_reputation(
						&mut self.reputation,
						network_sender,
						peer_id,
						COST_INVALID_MESSAGE,
					)
					.await;
					metrics.on_assignment_bad();
					return
				},
			}
		} else {
			if !entry.knowledge.insert(message_subject.clone(), message_kind) {
				// if we already imported an assignment, there is no need to distribute it again
				gum::warn!(
					target: LOG_TARGET,
					?message_subject,
					"Importing locally an already known assignment",
				);
				return
			} else {
				gum::debug!(
					target: LOG_TARGET,
					?message_subject,
					"Importing locally a new assignment",
				);
			}
		}

		// Invariant: to our knowledge, none of the peers except for the `source` know about the
		// assignment.
		metrics.on_assignment_imported(&assignment.cert.kind);

		let topology = self.topologies.get_topology(entry.session);
		let local = source == MessageSource::Local;

		let required_routing = topology.map_or(RequiredRouting::PendingTopology, |t| {
			t.local_grid_neighbors().required_routing_by_index(validator_index, local)
		});
		// Peers that we will send the assignment to.
		let mut peers = HashSet::new();

		// Start from the grid-routing set, restricted to peers that actually know
		// about this relay-chain block.
		let peers_to_route_to = topology
			.as_ref()
			.map(|t| t.peers_to_route(required_routing))
			.unwrap_or_default();

		for peer in peers_to_route_to {
			if !entry.known_by.contains_key(&peer) {
				continue
			}

			peers.insert(peer);
		}

		// All the peers that know the relay chain block.
		let peers_to_filter = entry.known_by();

		// Record the routing decision so later messages (approvals, topology
		// updates) can reuse it.
		let approval_entry = entry.insert_approval_entry(ApprovalEntry::new(
			assignment.clone(),
			claimed_candidate_indices.clone(),
			ApprovalRouting {
				required_routing,
				local,
				random_routing: Default::default(),
				peers_randomly_routed: Default::default(),
			},
		));

		// Dispatch the message to all peers in the routing set which
		// know the block.
		//
		// If the topology isn't known yet (race with networking subsystems)
		// then messages will be sent when we get it.

		let assignments = vec![(assignment, claimed_candidate_indices.clone())];
		let n_peers_total = self.peer_views.len();
		let source_peer = source.peer_id();

		// Filter destination peers
		for peer in peers_to_filter.into_iter() {
			// Never echo back to the sender, re-add grid peers, or randomly route
			// to non-validators.
			if Some(peer) == source_peer {
				continue
			}

			if peers.contains(&peer) {
				continue
			}

			if !topology.map(|topology| topology.is_validator(&peer)).unwrap_or(false) {
				continue
			}

			// Note: at this point, we haven't received the message from any peers
			// other than the source peer, and we just got it, so we haven't sent it
			// to any peers either.
			let route_random =
				approval_entry.routing_info().random_routing.sample(n_peers_total, rng);

			if route_random {
				approval_entry.routing_info_mut().mark_randomly_sent(peer);
				peers.insert(peer);
			}

			if approval_entry.routing_info().random_routing.is_complete() {
				break
			}
		}

		// Add the metadata of the assignment to the knowledge of each peer.
		for peer in peers.iter() {
			// we already filtered peers above, so this should always be Some
			if let Some(peer_knowledge) = entry.known_by.get_mut(peer) {
				peer_knowledge.sent.insert(message_subject.clone(), message_kind);
			}
		}

		if !peers.is_empty() {
			gum::trace!(
				target: LOG_TARGET,
				?block_hash,
				?claimed_candidate_indices,
				local = source.peer_id().is_none(),
				num_peers = peers.len(),
				"Sending an assignment to peers",
			);

			// Pair each destination with its negotiated protocol version so the
			// batched sender can encode per-version messages.
			let peers = peers
				.iter()
				.filter_map(|peer_id| {
					self.peer_views.get(peer_id).map(|peer_entry| (*peer_id, peer_entry.version))
				})
				.collect::<Vec<_>>();

			send_assignments_batched(network_sender, assignments, &peers).await;
		}
	}
1684
1685	async fn check_assignment_valid<RA: overseer::SubsystemSender<RuntimeApiMessage>>(
1686		assignment_criteria: &(impl AssignmentCriteria + ?Sized),
1687		entry: &BlockEntry,
1688		assignment: &IndirectAssignmentCertV2,
1689		claimed_candidate_indices: &CandidateBitfield,
1690		runtime_info: &mut RuntimeInfo,
1691		runtime_api_sender: &mut RA,
1692	) -> Result<CheckedIndirectAssignment, InvalidAssignmentError> {
1693		let ExtendedSessionInfo { ref session_info, .. } = runtime_info
1694			.get_session_info_by_index(runtime_api_sender, assignment.block_hash, entry.session)
1695			.await
1696			.map_err(|err| InvalidAssignmentError::SessionInfoNotFound(err))?;
1697
1698		if claimed_candidate_indices.len() > session_info.n_cores as usize {
1699			return Err(InvalidAssignmentError::OversizedClaimedBitfield)
1700		}
1701
1702		let claimed_cores: Vec<CoreIndex> = claimed_candidate_indices
1703			.iter_ones()
1704			.map(|candidate_index| {
1705				entry.candidates_metadata.get(candidate_index).map(|(_, core, _)| *core).ok_or(
1706					InvalidAssignmentError::ClaimedInvalidCandidateIndex {
1707						claimed_index: candidate_index,
1708						max_index: entry.candidates_metadata.len(),
1709					},
1710				)
1711			})
1712			.collect::<Result<Vec<_>, InvalidAssignmentError>>()?;
1713
1714		let Ok(claimed_cores) = claimed_cores.try_into() else {
1715			return Err(InvalidAssignmentError::NoClaimedCandidates)
1716		};
1717
1718		let backing_groups = claimed_candidate_indices
1719			.iter_ones()
1720			.flat_map(|candidate_index| {
1721				entry.candidates_metadata.get(candidate_index).map(|(_, _, group)| *group)
1722			})
1723			.collect::<Vec<_>>();
1724
1725		assignment_criteria
1726			.check_assignment_cert(
1727				claimed_cores,
1728				assignment.validator,
1729				&polkadot_node_primitives::approval::criteria::Config::from(session_info),
1730				entry.vrf_story.clone(),
1731				&assignment.cert,
1732				backing_groups,
1733			)
1734			.map_err(|err| InvalidAssignmentError::CryptoCheckFailed(err))
1735			.map(|tranche| {
1736				CheckedIndirectAssignment::from_checked(
1737					assignment.clone(),
1738					claimed_candidate_indices.clone(),
1739					tranche,
1740				)
1741			})
1742	}
1743	// Checks if an approval can be processed.
1744	// Returns true if we can continue with processing the approval and false otherwise.
	async fn check_approval_can_be_processed<
		N: overseer::SubsystemSender<NetworkBridgeTxMessage>,
	>(
		network_sender: &mut N,
		assignments_knowledge_key: &Vec<(MessageSubject, MessageKind)>,
		approval_knowledge_key: &(MessageSubject, MessageKind),
		entry: &mut BlockEntry,
		blocks_by_number: &BTreeMap<BlockNumber, Vec<Hash>>,
		topologies: &SessionGridTopologies,
		aggression_config: &AggressionConfig,
		reputation: &mut ReputationAggregator,
		peer_id: PeerId,
		metrics: &Metrics,
	) -> bool {
		// An approval is only acceptable if we already know every assignment it
		// covers; otherwise the peer is sending it out of order — punish it.
		for message_subject in assignments_knowledge_key {
			if !entry.knowledge.contains(&message_subject.0, message_subject.1) {
				gum::trace!(
					target: LOG_TARGET,
					?peer_id,
					?message_subject,
					"Unknown approval assignment",
				);
				modify_reputation(reputation, network_sender, peer_id, COST_UNEXPECTED_MESSAGE)
					.await;
				metrics.on_approval_unknown_assignment();
				return false
			}
		}

		// check if our knowledge of the peer already contains this approval
		match entry.known_by.entry(peer_id) {
			hash_map::Entry::Occupied(mut knowledge) => {
				let peer_knowledge = knowledge.get_mut();
				if peer_knowledge.contains(&approval_knowledge_key.0, approval_knowledge_key.1) {
					// `received.insert` returns false if the peer already sent us
					// this approval — a true duplicate.
					if !peer_knowledge
						.received
						.insert(approval_knowledge_key.0.clone(), approval_knowledge_key.1)
					{
						// Duplicates are tolerated (no punishment) from validators
						// while aggression is active, since re-sends are expected then.
						if !Self::accept_duplicates_from_validators(
							blocks_by_number,
							topologies,
							aggression_config,
							entry,
							peer_id,
						) {
							gum::trace!(
								target: LOG_TARGET,
								?peer_id,
								?approval_knowledge_key,
								"Duplicate approval",
							);
							modify_reputation(
								reputation,
								network_sender,
								peer_id,
								COST_DUPLICATE_MESSAGE,
							)
							.await;
						}
						metrics.on_approval_duplicate();
					}
					return false
				}
			},
			hash_map::Entry::Vacant(_) => {
				// The peer sent an approval for a block outside its declared view.
				gum::debug!(
					target: LOG_TARGET,
					?peer_id,
					?approval_knowledge_key,
					"Approval from a peer is out of view",
				);
				modify_reputation(reputation, network_sender, peer_id, COST_UNEXPECTED_MESSAGE)
					.await;
				metrics.on_approval_out_of_view();
			},
		}

		// Already-imported approvals are rewarded (the message is valid) but need
		// no further processing; only genuinely new ones return true.
		if entry.knowledge.contains(&approval_knowledge_key.0, approval_knowledge_key.1) {
			if let Some(peer_knowledge) = entry.known_by.get_mut(&peer_id) {
				peer_knowledge
					.received
					.insert(approval_knowledge_key.0.clone(), approval_knowledge_key.1);
			}

			// We already processed this approval no need to continue.
			gum::trace!(target: LOG_TARGET, ?peer_id, ?approval_knowledge_key, "Known approval");
			metrics.on_approval_good_known();
			modify_reputation(reputation, network_sender, peer_id, BENEFIT_VALID_MESSAGE).await;
			false
		} else {
			true
		}
	}
1838
1839	async fn import_and_circulate_approval<
1840		N: overseer::SubsystemSender<NetworkBridgeTxMessage>,
1841		A: overseer::SubsystemSender<ApprovalVotingMessage>,
1842		RA: overseer::SubsystemSender<RuntimeApiMessage>,
1843	>(
1844		&mut self,
1845		approval_voting_sender: &mut A,
1846		network_sender: &mut N,
1847		runtime_api_sender: &mut RA,
1848		metrics: &Metrics,
1849		source: MessageSource,
1850		vote: IndirectSignedApprovalVoteV2,
1851		session_info_provider: &mut RuntimeInfo,
1852	) {
1853		let block_hash = vote.block_hash;
1854		let validator_index = vote.validator;
1855		let candidate_indices = &vote.candidate_indices;
1856		let entry = match self.blocks.get_mut(&block_hash) {
1857			Some(entry) if entry.contains_candidates(&vote.candidate_indices) => entry,
1858			_ => {
1859				if let Some(peer_id) = source.peer_id() {
1860					if !self.recent_outdated_blocks.is_recent_outdated(&block_hash) {
1861						gum::debug!(
1862							target: LOG_TARGET,
1863							?peer_id,
1864							?block_hash,
1865							?validator_index,
1866							?candidate_indices,
1867							"Approval from a peer is out of view",
1868						);
1869						modify_reputation(
1870							&mut self.reputation,
1871							network_sender,
1872							peer_id,
1873							COST_UNEXPECTED_MESSAGE,
1874						)
1875						.await;
1876						metrics.on_approval_invalid_block();
1877					} else {
1878						metrics.on_approval_recent_outdated();
1879					}
1880				}
1881				return
1882			},
1883		};
1884
1885		// compute metadata on the assignment.
1886		let assignments_knowledge_keys = PeerKnowledge::generate_assignments_keys(&vote);
1887		let approval_knwowledge_key = PeerKnowledge::generate_approval_key(&vote);
1888
1889		if let Some(peer_id) = source.peer_id() {
1890			if !Self::check_approval_can_be_processed(
1891				network_sender,
1892				&assignments_knowledge_keys,
1893				&approval_knwowledge_key,
1894				entry,
1895				&self.blocks_by_number,
1896				&self.topologies,
1897				&self.aggression_config,
1898				&mut self.reputation,
1899				peer_id,
1900				metrics,
1901			)
1902			.await
1903			{
1904				return
1905			}
1906
1907			let result =
1908				Self::check_vote_valid(&vote, &entry, session_info_provider, runtime_api_sender)
1909					.await;
1910
1911			match result {
1912				Ok(vote) => {
1913					approval_voting_sender
1914						.send_message(ApprovalVotingMessage::ImportApproval(vote, None))
1915						.await;
1916
1917					modify_reputation(
1918						&mut self.reputation,
1919						network_sender,
1920						peer_id,
1921						BENEFIT_VALID_MESSAGE_FIRST,
1922					)
1923					.await;
1924
1925					entry
1926						.knowledge
1927						.insert(approval_knwowledge_key.0.clone(), approval_knwowledge_key.1);
1928					if let Some(peer_knowledge) = entry.known_by.get_mut(&peer_id) {
1929						peer_knowledge
1930							.received
1931							.insert(approval_knwowledge_key.0.clone(), approval_knwowledge_key.1);
1932					}
1933				},
1934				Err(err) => {
1935					modify_reputation(
1936						&mut self.reputation,
1937						network_sender,
1938						peer_id,
1939						COST_INVALID_MESSAGE,
1940					)
1941					.await;
1942
1943					gum::info!(
1944						target: LOG_TARGET,
1945						?peer_id,
1946						?err,
1947						"Got a bad approval from peer",
1948					);
1949					metrics.on_approval_bad();
1950					return
1951				},
1952			}
1953		} else {
1954			if !entry
1955				.knowledge
1956				.insert(approval_knwowledge_key.0.clone(), approval_knwowledge_key.1)
1957			{
1958				// if we already imported all approvals, there is no need to distribute it again
1959				gum::warn!(
1960					target: LOG_TARGET,
1961					"Importing locally an already known approval",
1962				);
1963				return
1964			} else {
1965				gum::debug!(
1966					target: LOG_TARGET,
1967					"Importing locally a new approval",
1968				);
1969			}
1970		}
1971
1972		let (required_routing, peers_randomly_routed_to) = match entry.note_approval(vote.clone()) {
1973			Ok(required_routing) => required_routing,
1974			Err(err) => {
1975				gum::warn!(
1976					target: LOG_TARGET,
1977					hash = ?block_hash,
1978					validator_index = ?vote.validator,
1979					candidate_bitfield = ?vote.candidate_indices,
1980					?err,
1981					"Possible bug: Vote import failed",
1982				);
1983				metrics.on_approval_bug();
1984				return
1985			},
1986		};
1987
1988		// Invariant: to our knowledge, none of the peers except for the `source` know about the
1989		// approval.
1990		metrics.on_approval_imported();
1991
1992		// Dispatch a ApprovalDistributionV1Message::Approval(vote)
1993		// to all peers required by the topology, with the exception of the source peer.
1994		let topology = self.topologies.get_topology(entry.session);
1995		let source_peer = source.peer_id();
1996
1997		let peer_filter = move |peer| {
1998			if Some(peer) == source_peer.as_ref() {
1999				return false
2000			}
2001
2002			// Here we're leaning on a few behaviors of assignment propagation:
2003			//   1. At this point, the only peer we're aware of which has the approval message is
2004			//      the source peer.
2005			//   2. We have sent the assignment message to every peer in the required routing which
2006			//      is aware of this block _unless_ the peer we originally received the assignment
2007			//      from was part of the required routing. In that case, we've sent the assignment
2008			//      to all aware peers in the required routing _except_ the original source of the
2009			//      assignment. Hence the `in_topology_check`.
2010			//   3. Any randomly selected peers have been sent the assignment already.
2011			let in_topology = topology
2012				.map_or(false, |t| t.local_grid_neighbors().route_to_peer(required_routing, peer));
2013			in_topology || peers_randomly_routed_to.contains(peer)
2014		};
2015
2016		let peers = entry
2017			.known_by
2018			.iter()
2019			.filter(|(p, _)| peer_filter(p))
2020			.filter_map(|(p, _)| self.peer_views.get(p).map(|entry| (*p, entry.version)))
2021			.collect::<Vec<_>>();
2022
2023		// Add the metadata of the assignment to the knowledge of each peer.
2024		for peer in peers.iter() {
2025			// we already filtered peers above, so this should always be Some
2026			if let Some(entry) = entry.known_by.get_mut(&peer.0) {
2027				entry.sent.insert(approval_knwowledge_key.0.clone(), approval_knwowledge_key.1);
2028			}
2029		}
2030
2031		if !peers.is_empty() {
2032			let approvals = vec![vote];
2033			gum::trace!(
2034				target: LOG_TARGET,
2035				?block_hash,
2036				local = source.peer_id().is_none(),
2037				num_peers = peers.len(),
2038				"Sending an approval to peers",
2039			);
2040			send_approvals_batched(network_sender, approvals, &peers).await;
2041		}
2042	}
2043
2044	// Checks if the approval vote is valid.
2045	async fn check_vote_valid<RA: overseer::SubsystemSender<RuntimeApiMessage>>(
2046		vote: &IndirectSignedApprovalVoteV2,
2047		entry: &BlockEntry,
2048		runtime_info: &mut RuntimeInfo,
2049		runtime_api_sender: &mut RA,
2050	) -> Result<CheckedIndirectSignedApprovalVote, InvalidVoteError> {
2051		if vote.candidate_indices.len() > entry.candidates_metadata.len() {
2052			return Err(InvalidVoteError::CandidateIndexOutOfBounds)
2053		}
2054
2055		let candidate_hashes = vote
2056			.candidate_indices
2057			.iter_ones()
2058			.flat_map(|candidate_index| {
2059				entry
2060					.candidates_metadata
2061					.get(candidate_index)
2062					.map(|(candidate_hash, _, _)| *candidate_hash)
2063			})
2064			.collect::<Vec<_>>();
2065
2066		let ExtendedSessionInfo { ref session_info, .. } = runtime_info
2067			.get_session_info_by_index(runtime_api_sender, vote.block_hash, entry.session)
2068			.await
2069			.map_err(|err| InvalidVoteError::SessionInfoNotFound(err))?;
2070
2071		let pubkey = session_info
2072			.validators
2073			.get(vote.validator)
2074			.ok_or(InvalidVoteError::ValidatorIndexOutOfBounds)?;
2075		DisputeStatement::Valid(ValidDisputeStatementKind::ApprovalCheckingMultipleCandidates(
2076			candidate_hashes.clone(),
2077		))
2078		.check_signature(
2079			&pubkey,
2080			*candidate_hashes.first().unwrap(),
2081			entry.session,
2082			&vote.signature,
2083		)
2084		.map_err(|_| InvalidVoteError::InvalidSignature)
2085		.map(|_| CheckedIndirectSignedApprovalVote::from_checked(vote.clone()))
2086	}
2087
2088	/// Retrieve approval signatures from state for the given relay block/indices:
2089	fn get_approval_signatures(
2090		&mut self,
2091		indices: HashSet<(Hash, CandidateIndex)>,
2092	) -> HashMap<ValidatorIndex, (Hash, Vec<CandidateIndex>, ValidatorSignature)> {
2093		let mut all_sigs = HashMap::new();
2094		for (hash, index) in indices {
2095			let block_entry = match self.blocks.get(&hash) {
2096				None => {
2097					gum::debug!(
2098						target: LOG_TARGET,
2099						?hash,
2100						"`get_approval_signatures`: could not find block entry for given hash!"
2101					);
2102					continue
2103				},
2104				Some(e) => e,
2105			};
2106
2107			let sigs = block_entry.approval_votes(index).into_iter().map(|approval| {
2108				(
2109					approval.validator,
2110					(
2111						hash,
2112						approval
2113							.candidate_indices
2114							.iter_ones()
2115							.map(|val| val as CandidateIndex)
2116							.collect_vec(),
2117						approval.signature,
2118					),
2119				)
2120			});
2121			all_sigs.extend(sigs);
2122		}
2123		all_sigs
2124	}
2125
	/// Bring `peer_id` up to date for every block in its `view`.
	///
	/// Each head in the view is walked back towards the peer's finalized number;
	/// for every block along the way, all assignments (and their linked approval
	/// votes) the peer should receive — per grid topology or random routing — are
	/// batched up and sent at the end. `retry_known_blocks` forces re-sending for
	/// blocks the peer is already registered for (used when we only just learnt
	/// the peer's authority-id mapping).
	async fn unify_with_peer(
		sender: &mut impl overseer::SubsystemSender<NetworkBridgeTxMessage>,
		metrics: &Metrics,
		entries: &mut HashMap<Hash, BlockEntry>,
		topologies: &SessionGridTopologies,
		total_peers: usize,
		peer_id: PeerId,
		protocol_version: ProtocolVersion,
		view: View,
		rng: &mut (impl CryptoRng + Rng),
		retry_known_blocks: bool,
	) {
		metrics.on_unify_with_peer();
		let _timer = metrics.time_unify_with_peer();

		// Accumulated across all blocks and sent in batches at the end.
		let mut assignments_to_send = Vec::new();
		let mut approvals_to_send = Vec::new();

		let view_finalized_number = view.finalized_number;
		for head in view.into_iter() {
			let mut block = head;

			// Walk the chain back to last finalized block of the peer view.
			loop {
				let entry = match entries.get_mut(&block) {
					Some(entry) if entry.number > view_finalized_number => entry,
					_ => break,
				};

				// Any peer which is in the `known_by` set and we know its peer_id authority id
				// mapping has already been sent all messages it's meant to get for that block and
				// all in-scope prior blocks. In case we just learnt about its peer_id
				// authority-id mapping we have to retry sending the messages that should be sent
				// to it for all un-finalized blocks.
				if entry.known_by.contains_key(&peer_id) && !retry_known_blocks {
					break
				}

				let peer_knowledge = entry.known_by.entry(peer_id).or_default();
				let topology = topologies.get_topology(entry.session);

				// We want to iterate the `approval_entries` of the block entry as these contain
				// all assignments that also link all approval votes.
				for approval_entry in entry.approval_entries.values_mut() {
					// Propagate the message to all peers in the required routing set OR
					// randomly sample peers.
					{
						let required_routing = approval_entry.routing_info().required_routing;
						let routing_info = &mut approval_entry.routing_info_mut();
						let rng = &mut *rng;
						// Decide per assignment: send if the peer is in the grid routing
						// set; otherwise, for validator peers only, fall back to random
						// sampling (recording the peer as randomly routed to on success).
						let mut peer_filter = move |peer_id| {
							let in_topology = topology.as_ref().map_or(false, |t| {
								t.local_grid_neighbors().route_to_peer(required_routing, peer_id)
							});
							in_topology || {
								// Random routing is reserved for validator peers.
								if !topology
									.map(|topology| topology.is_validator(peer_id))
									.unwrap_or(false)
								{
									return false
								}

								let route_random =
									routing_info.random_routing.sample(total_peers, rng);
								if route_random {
									routing_info.mark_randomly_sent(*peer_id);
								}

								route_random
							}
						};

						if !peer_filter(&peer_id) {
							continue
						}
					}

					let assignment_message = approval_entry.assignment();
					let approval_messages = approval_entry.approvals();
					let (assignment_knowledge, message_kind) =
						approval_entry.create_assignment_knowledge(block);

					// Only send stuff a peer doesn't know in the context of a relay chain
					// block.
					if !peer_knowledge.contains(&assignment_knowledge, message_kind) {
						peer_knowledge.sent.insert(assignment_knowledge, message_kind);
						assignments_to_send.push(assignment_message);
					}

					// Filter approval votes.
					for approval_message in approval_messages {
						let approval_knowledge =
							PeerKnowledge::generate_approval_key(&approval_message);

						if !peer_knowledge.contains(&approval_knowledge.0, approval_knowledge.1) {
							approvals_to_send.push(approval_message);
							peer_knowledge.sent.insert(approval_knowledge.0, approval_knowledge.1);
						}
					}
				}

				// Continue the walk towards the finalized block.
				block = entry.parent_hash;
			}
		}

		if !assignments_to_send.is_empty() {
			gum::trace!(
				target: LOG_TARGET,
				?peer_id,
				?protocol_version,
				num = assignments_to_send.len(),
				"Sending assignments to unified peer",
			);

			send_assignments_batched(
				sender,
				assignments_to_send,
				&vec![(peer_id, protocol_version)],
			)
			.await;
		}

		if !approvals_to_send.is_empty() {
			gum::trace!(
				target: LOG_TARGET,
				?peer_id,
				?protocol_version,
				num = approvals_to_send.len(),
				"Sending approvals to unified peer",
			);

			send_approvals_batched(sender, approvals_to_send, &vec![(peer_id, protocol_version)])
				.await;
		}
	}
2261
	// It is very important that aggression starts with the oldest unfinalized block, rather
	// than the oldest unapproved block. Using the gossip approach to distribute potentially
	// missing votes to validators requires that we always trigger on finality lag, even if
	// we have the approval lag value. The reason for this is to avoid a finality stall
	// when more than 1/3 of the nodes go offline for a period of time. When they come back
	// they wouldn't get any of the approvals, since the on-line nodes would never trigger
	// aggression as they have approved all the candidates and don't detect any approval lag.
	//
	// In order to switch to using approval lag as a trigger we need a request/response protocol
	// to fetch votes from validators rather than use gossip.
	async fn enable_aggression<N: overseer::SubsystemSender<NetworkBridgeTxMessage>>(
		&mut self,
		network_sender: &mut N,
		resend: Resend,
		metrics: &Metrics,
	) {
		let config = self.aggression_config.clone();
		// `blocks_by_number` is ordered by block number, so the first/last entries
		// give the oldest/newest unfinalized block numbers we track.
		let min_age = self.blocks_by_number.iter().next().map(|(num, _)| num);
		let max_age = self.blocks_by_number.iter().rev().next().map(|(num, _)| num);

		// Return if we don't have at least 1 block.
		let (min_age, max_age) = match (min_age, max_age) {
			(Some(min), Some(max)) => (*min, *max),
			_ => return, // empty.
		};

		// Finality lag in blocks, as observed by this subsystem.
		let age = max_age.saturating_sub(min_age);

		// Trigger on approval checking lag.
		if !self.aggression_config.should_trigger_aggression(age) {
			gum::trace!(
				target: LOG_TARGET,
				approval_checking_lag = self.approval_checking_lag,
				age,
				"Aggression not enabled",
			);
			return
		}
		gum::debug!(target: LOG_TARGET, min_age, max_age, "Aggression enabled",);

		// First pass: optionally resend everything for the oldest block, clearing
		// each peer's `sent` knowledge so the messages are re-gossiped.
		adjust_required_routing_and_propagate(
			network_sender,
			&mut self.blocks,
			&self.topologies,
			|block_entry| {
				let block_age = max_age - block_entry.number;
				// We want to resend only for blocks of min_age, there is no point in
				// resending for blocks newer than that, because we are just going to create load
				// and not gain anything.
				let diff_from_min_age = block_entry.number - min_age;

				// We want to back-off on resending for blocks that have been resent recently, to
				// give time for nodes to process all the extra messages, if we still have not
				// finalized we are going to resend again after unfinalized_period * 2 since the
				// last resend.
				let blocks_since_last_sent = block_entry
					.last_resent_at_block_number
					.map(|last_resent_at_block_number| max_age - last_resent_at_block_number);

				let can_resend_at_this_age = blocks_since_last_sent
					.zip(config.resend_unfinalized_period)
					.map(|(blocks_since_last_sent, unfinalized_period)| {
						blocks_since_last_sent >= unfinalized_period * 2
					})
					.unwrap_or(true);

				if resend == Resend::Yes &&
					config.resend_unfinalized_period.as_ref().map_or(false, |p| {
						block_age > 0 &&
							block_age % p == 0 && diff_from_min_age == 0 &&
							can_resend_at_this_age
					}) {
					// Retry sending to all peers.
					for (_, knowledge) in block_entry.known_by.iter_mut() {
						knowledge.sent = Knowledge::default();
					}
					block_entry.last_resent_at_block_number = Some(max_age);
					gum::debug!(
						target: LOG_TARGET,
						block_number = ?block_entry.number,
						?max_age,
						"Aggression enabled with resend for block",
					);
					true
				} else {
					false
				}
			},
			// Routing unchanged in this pass.
			|required_routing, _, _| *required_routing,
			&self.peer_views,
		)
		.await;

		// Second pass: widen the required routing for the oldest block(s) according
		// to the configured aggression levels.
		adjust_required_routing_and_propagate(
			network_sender,
			&mut self.blocks,
			&self.topologies,
			|block_entry| {
				// Ramp up aggression only for the very oldest block(s).
				// Approval voting can get stuck on a single block preventing
				// its descendants from being finalized. Waste minimal bandwidth
				// this way. Also, disputes might prevent finality - again, nothing
				// to waste bandwidth on newer blocks for.
				block_entry.number == min_age
			},
			|required_routing, local, _| {
				// It's a bit surprising not to have a topology at this age.
				if *required_routing == RequiredRouting::PendingTopology {
					gum::debug!(
						target: LOG_TARGET,
						lag = ?self.approval_checking_lag,
						"Encountered old block pending gossip topology",
					);
					return *required_routing
				}

				let mut new_required_routing = *required_routing;

				if config.l1_threshold.as_ref().map_or(false, |t| &age >= t) {
					// Message originator sends to everyone.
					if local && new_required_routing != RequiredRouting::All {
						metrics.on_aggression_l1();
						new_required_routing = RequiredRouting::All;
					}
				}

				if config.l2_threshold.as_ref().map_or(false, |t| &age >= t) {
					// Message originator sends to everyone. Everyone else sends to XY.
					if !local && new_required_routing != RequiredRouting::GridXY {
						metrics.on_aggression_l2();
						new_required_routing = RequiredRouting::GridXY;
					}
				}
				new_required_routing
			},
			&self.peer_views,
		)
		.await;
	}
2401
2402	// Filter out invalid candidate index and certificate core bitfields.
2403	// For each invalid assignment we also punish the peer.
2404	async fn sanitize_v1_assignments(
2405		&mut self,
2406		peer_id: PeerId,
2407		sender: &mut impl overseer::SubsystemSender<NetworkBridgeTxMessage>,
2408		assignments: Vec<(IndirectAssignmentCert, CandidateIndex)>,
2409	) -> Vec<(IndirectAssignmentCertV2, CandidateBitfield)> {
2410		let mut sanitized_assignments = Vec::new();
2411		for (cert, candidate_index) in assignments.into_iter() {
2412			let cert_bitfield_bits = match cert.cert.kind {
2413				AssignmentCertKind::RelayVRFDelay { core_index } => core_index.0 as usize + 1,
2414				// We don't want to run the VRF yet, but the output is always bounded by `n_cores`.
2415				// We assume `candidate_bitfield` length for the core bitfield and we just check
2416				// against `MAX_BITFIELD_SIZE` later.
2417				AssignmentCertKind::RelayVRFModulo { .. } => candidate_index as usize + 1,
2418			};
2419
2420			let candidate_bitfield_bits = candidate_index as usize + 1;
2421
2422			// Ensure bitfields length under hard limit.
2423			if cert_bitfield_bits > MAX_BITFIELD_SIZE || candidate_bitfield_bits > MAX_BITFIELD_SIZE
2424			{
2425				// Punish the peer for the invalid message.
2426				modify_reputation(&mut self.reputation, sender, peer_id, COST_OVERSIZED_BITFIELD)
2427					.await;
2428				gum::debug!(target: LOG_TARGET, block_hash = ?cert.block_hash, ?candidate_index, validator_index = ?cert.validator, kind = ?cert.cert.kind, "Bad assignment v1, invalid candidate index");
2429			} else {
2430				sanitized_assignments.push((cert.into(), candidate_index.into()))
2431			}
2432		}
2433
2434		sanitized_assignments
2435	}
2436
2437	// Filter out oversized candidate and certificate core bitfields.
2438	// For each invalid assignment we also punish the peer.
2439	async fn sanitize_v2_assignments(
2440		&mut self,
2441		peer_id: PeerId,
2442		sender: &mut impl overseer::SubsystemSender<NetworkBridgeTxMessage>,
2443		assignments: Vec<(IndirectAssignmentCertV2, CandidateBitfield)>,
2444	) -> Vec<(IndirectAssignmentCertV2, CandidateBitfield)> {
2445		let mut sanitized_assignments = Vec::new();
2446		for (cert, candidate_bitfield) in assignments.into_iter() {
2447			let cert_bitfield_bits = match &cert.cert.kind {
2448				AssignmentCertKindV2::RelayVRFDelay { core_index } => core_index.0 as usize + 1,
2449				// We don't want to run the VRF yet, but the output is always bounded by `n_cores`.
2450				// We assume `candidate_bitfield` length for the core bitfield and we just check
2451				// against `MAX_BITFIELD_SIZE` later.
2452				AssignmentCertKindV2::RelayVRFModulo { .. } => candidate_bitfield.len(),
2453				AssignmentCertKindV2::RelayVRFModuloCompact { core_bitfield } =>
2454					core_bitfield.len(),
2455			};
2456
2457			let candidate_bitfield_bits = candidate_bitfield.len();
2458
2459			// Our bitfield has `Lsb0`.
2460			let msb = candidate_bitfield_bits - 1;
2461
2462			// Ensure bitfields length under hard limit.
2463			if cert_bitfield_bits > MAX_BITFIELD_SIZE
2464				|| candidate_bitfield_bits > MAX_BITFIELD_SIZE
2465				// Ensure minimum bitfield size - MSB needs to be one.
2466				|| !candidate_bitfield.bit_at(msb.as_bit_index())
2467			{
2468				// Punish the peer for the invalid message.
2469				modify_reputation(&mut self.reputation, sender, peer_id, COST_OVERSIZED_BITFIELD)
2470					.await;
2471				for candidate_index in candidate_bitfield.iter_ones() {
2472					gum::debug!(target: LOG_TARGET, block_hash = ?cert.block_hash, ?candidate_index, validator_index = ?cert.validator, "Bad assignment v2, oversized bitfield");
2473				}
2474			} else {
2475				sanitized_assignments.push((cert, candidate_bitfield))
2476			}
2477		}
2478
2479		sanitized_assignments
2480	}
2481
2482	// Filter out obviously invalid candidate indices.
2483	async fn sanitize_v1_approvals(
2484		&mut self,
2485		peer_id: PeerId,
2486		sender: &mut impl overseer::SubsystemSender<NetworkBridgeTxMessage>,
2487		approval: Vec<IndirectSignedApprovalVote>,
2488	) -> Vec<IndirectSignedApprovalVoteV2> {
2489		let mut sanitized_approvals = Vec::new();
2490		for approval in approval.into_iter() {
2491			if approval.candidate_index as usize > MAX_BITFIELD_SIZE {
2492				// Punish the peer for the invalid message.
2493				modify_reputation(&mut self.reputation, sender, peer_id, COST_OVERSIZED_BITFIELD)
2494					.await;
2495				gum::debug!(
2496					target: LOG_TARGET,
2497					block_hash = ?approval.block_hash,
2498					candidate_index = ?approval.candidate_index,
2499					"Bad approval v1, invalid candidate index"
2500				);
2501			} else {
2502				sanitized_approvals.push(approval.into())
2503			}
2504		}
2505
2506		sanitized_approvals
2507	}
2508
2509	// Filter out obviously invalid candidate indices.
2510	async fn sanitize_v2_approvals(
2511		&mut self,
2512		peer_id: PeerId,
2513		sender: &mut impl overseer::SubsystemSender<NetworkBridgeTxMessage>,
2514		approval: Vec<IndirectSignedApprovalVoteV2>,
2515	) -> Vec<IndirectSignedApprovalVoteV2> {
2516		let mut sanitized_approvals = Vec::new();
2517		for approval in approval.into_iter() {
2518			if approval.candidate_indices.len() as usize > MAX_BITFIELD_SIZE {
2519				// Punish the peer for the invalid message.
2520				modify_reputation(&mut self.reputation, sender, peer_id, COST_OVERSIZED_BITFIELD)
2521					.await;
2522				gum::debug!(
2523					target: LOG_TARGET,
2524					block_hash = ?approval.block_hash,
2525					candidate_indices_len = ?approval.candidate_indices.len(),
2526					"Bad approval v2, invalid candidate indices size"
2527				);
2528			} else {
2529				sanitized_approvals.push(approval)
2530			}
2531		}
2532
2533		sanitized_approvals
2534	}
2535}
2536
// This adjusts the required routing of messages in blocks that pass the block filter
// according to the modifier function given.
//
// The modifier accepts as inputs the current required-routing state, whether
// the message is locally originating, and the validator index of the message issuer.
//
// Then, if the topology is known, this propagates messages to all peers in the required
// routing set which are aware of the block. Peers which are unaware of the block
// will have the message sent when it enters their view in `unify_with_peer`.
//
// Note that the required routing of a message can be modified even if the
// topology is unknown yet.
#[overseer::contextbounds(ApprovalDistribution, prefix = self::overseer)]
async fn adjust_required_routing_and_propagate<
	N: overseer::SubsystemSender<NetworkBridgeTxMessage>,
	BlockFilter,
	RoutingModifier,
>(
	network_sender: &mut N,
	blocks: &mut HashMap<Hash, BlockEntry>,
	topologies: &SessionGridTopologies,
	block_filter: BlockFilter,
	routing_modifier: RoutingModifier,
	peer_views: &HashMap<PeerId, PeerEntry>,
) where
	BlockFilter: Fn(&mut BlockEntry) -> bool,
	RoutingModifier: Fn(&RequiredRouting, bool, &ValidatorIndex) -> RequiredRouting,
{
	// Per-peer batches, accumulated across all blocks and sent at the end.
	let mut peer_assignments = HashMap::new();
	let mut peer_approvals = HashMap::new();

	// Iterate all blocks in the session, producing payloads
	// for each connected peer.
	for (block_hash, block_entry) in blocks {
		if !block_filter(block_entry) {
			continue
		}

		// Without a known topology we cannot propagate; such blocks are handled
		// once the session topology arrives.
		let topology = match topologies.get_topology(block_entry.session) {
			Some(t) => t,
			None => continue,
		};

		// We just need to iterate the `approval_entries` of the block entry as these contain all
		// assignments that also link all approval votes.
		for approval_entry in block_entry.approval_entries.values_mut() {
			let new_required_routing = routing_modifier(
				&approval_entry.routing_info().required_routing,
				approval_entry.routing_info().local,
				&approval_entry.validator_index(),
			);

			// Persist the (possibly unchanged) routing even when nothing is sent.
			approval_entry.update_required_routing(new_required_routing);

			if approval_entry.routing_info().required_routing.is_empty() {
				continue
			}

			let assignment_message = approval_entry.assignment();
			let approval_messages = approval_entry.approvals();
			let (assignment_knowledge, message_kind) =
				approval_entry.create_assignment_knowledge(*block_hash);

			for (peer, peer_knowledge) in &mut block_entry.known_by {
				// Skip peers outside the (possibly just-updated) required routing.
				if !topology
					.local_grid_neighbors()
					.route_to_peer(approval_entry.routing_info().required_routing, peer)
				{
					continue
				}

				// Only send stuff a peer doesn't know in the context of a relay chain block.
				if !peer_knowledge.contains(&assignment_knowledge, message_kind) {
					peer_knowledge.sent.insert(assignment_knowledge.clone(), message_kind);
					peer_assignments
						.entry(*peer)
						.or_insert_with(Vec::new)
						.push(assignment_message.clone());
				}

				// Filter approval votes.
				for approval_message in &approval_messages {
					let approval_knowledge = PeerKnowledge::generate_approval_key(approval_message);

					if !peer_knowledge.contains(&approval_knowledge.0, approval_knowledge.1) {
						peer_knowledge.sent.insert(approval_knowledge.0, approval_knowledge.1);
						peer_approvals
							.entry(*peer)
							.or_insert_with(Vec::new)
							.push(approval_message.clone());
					}
				}
			}
		}
	}

	// Send messages in accumulated packets, assignments preceding approvals.
	for (peer, assignments_packet) in peer_assignments {
		if let Some(peer_view) = peer_views.get(&peer) {
			send_assignments_batched(
				network_sender,
				assignments_packet,
				&vec![(peer, peer_view.version)],
			)
			.await;
		} else {
			// This should never happen.
			gum::warn!(target: LOG_TARGET, ?peer, "Unknown protocol version for peer",);
		}
	}

	for (peer, approvals_packet) in peer_approvals {
		if let Some(peer_view) = peer_views.get(&peer) {
			send_approvals_batched(
				network_sender,
				approvals_packet,
				&vec![(peer, peer_view.version)],
			)
			.await;
		} else {
			// This should never happen.
			gum::warn!(target: LOG_TARGET, ?peer, "Unknown protocol version for peer",);
		}
	}
}
2662
2663/// Modify the reputation of a peer based on its behavior.
2664async fn modify_reputation(
2665	reputation: &mut ReputationAggregator,
2666	sender: &mut impl overseer::SubsystemSender<NetworkBridgeTxMessage>,
2667	peer_id: PeerId,
2668	rep: Rep,
2669) {
2670	gum::trace!(
2671		target: LOG_TARGET,
2672		reputation = ?rep,
2673		?peer_id,
2674		"Reputation change for peer",
2675	);
2676	reputation.modify(sender, peer_id, rep).await;
2677}
2678
2679#[overseer::contextbounds(ApprovalDistribution, prefix = self::overseer)]
2680impl ApprovalDistribution {
2681	/// Create a new instance of the [`ApprovalDistribution`] subsystem.
2682	pub fn new(
2683		metrics: Metrics,
2684		slot_duration_millis: u64,
2685		assignment_criteria: Arc<dyn AssignmentCriteria + Send + Sync>,
2686	) -> Self {
2687		Self::new_with_clock(
2688			metrics,
2689			slot_duration_millis,
2690			Arc::new(SystemClock),
2691			assignment_criteria,
2692		)
2693	}
2694
2695	/// Create a new instance of the [`ApprovalDistribution`] subsystem, with a custom clock.
2696	pub fn new_with_clock(
2697		metrics: Metrics,
2698		slot_duration_millis: u64,
2699		clock: Arc<dyn Clock + Send + Sync>,
2700		assignment_criteria: Arc<dyn AssignmentCriteria + Send + Sync>,
2701	) -> Self {
2702		Self { metrics, slot_duration_millis, clock, assignment_criteria }
2703	}
2704
2705	async fn run<Context>(self, ctx: Context) {
2706		let mut state =
2707			State { slot_duration_millis: self.slot_duration_millis, ..Default::default() };
2708		// According to the docs of `rand`, this is a ChaCha12 RNG in practice
2709		// and will always be chosen for strong performance and security properties.
2710		let mut rng = rand::rngs::StdRng::from_entropy();
2711		let mut session_info_provider = RuntimeInfo::new_with_config(RuntimeInfoConfig {
2712			keystore: None,
2713			session_cache_lru_size: DISPUTE_WINDOW.get(),
2714		});
2715
2716		self.run_inner(
2717			ctx,
2718			&mut state,
2719			REPUTATION_CHANGE_INTERVAL,
2720			&mut rng,
2721			&mut session_info_provider,
2722		)
2723		.await
2724	}
2725
	/// Used for testing.
	///
	/// Main subsystem loop: alternates between flushing accumulated reputation
	/// changes on a fixed interval and processing messages from the overseer.
	/// Returns when the overseer channel closes or a `Conclude` signal is
	/// received.
	async fn run_inner<Context>(
		self,
		mut ctx: Context,
		state: &mut State,
		reputation_interval: Duration,
		rng: &mut (impl CryptoRng + Rng),
		session_info_provider: &mut RuntimeInfo,
	) {
		let new_reputation_delay = || futures_timer::Delay::new(reputation_interval).fuse();
		let mut reputation_delay = new_reputation_delay();
		// Independent clones of the overseer sender, one per downstream subsystem.
		let mut approval_voting_sender = ctx.sender().clone();
		let mut network_sender = ctx.sender().clone();
		let mut runtime_api_sender = ctx.sender().clone();

		loop {
			select! {
				// Periodically flush batched reputation changes.
				_ = reputation_delay => {
					state.reputation.send(ctx.sender()).await;
					reputation_delay = new_reputation_delay();
				},
				message = ctx.recv().fuse() => {
					let message = match message {
						Ok(message) => message,
						Err(e) => {
							gum::debug!(target: LOG_TARGET, err = ?e, "Failed to receive a message from Overseer, exiting");
							return
						},
					};

					// `true` means a `Conclude` signal was received.
					if self.handle_from_orchestra(message, &mut approval_voting_sender, &mut network_sender, &mut runtime_api_sender, state, rng, session_info_provider).await {
						return;
					}

				},
			}
		}
	}
2764
2765	/// Handles a from orchestra message received by approval distribution subystem.
2766	///
2767	/// Returns `true` if the subsystem should be stopped.
2768	pub async fn handle_from_orchestra<
2769		N: overseer::SubsystemSender<NetworkBridgeTxMessage>,
2770		A: overseer::SubsystemSender<ApprovalVotingMessage>,
2771		RA: overseer::SubsystemSender<RuntimeApiMessage>,
2772	>(
2773		&self,
2774		message: FromOrchestra<ApprovalDistributionMessage>,
2775		approval_voting_sender: &mut A,
2776		network_sender: &mut N,
2777		runtime_api_sender: &mut RA,
2778		state: &mut State,
2779		rng: &mut (impl CryptoRng + Rng),
2780		session_info_provider: &mut RuntimeInfo,
2781	) -> bool {
2782		match message {
2783			FromOrchestra::Communication { msg } =>
2784				Self::handle_incoming(
2785					approval_voting_sender,
2786					network_sender,
2787					runtime_api_sender,
2788					state,
2789					msg,
2790					&self.metrics,
2791					rng,
2792					self.assignment_criteria.as_ref(),
2793					self.clock.as_ref(),
2794					session_info_provider,
2795				)
2796				.await,
2797			FromOrchestra::Signal(OverseerSignal::ActiveLeaves(_update)) => {
2798				gum::trace!(target: LOG_TARGET, "active leaves signal (ignored)");
2799				// the relay chain blocks relevant to the approval subsystems
2800				// are those that are available, but not finalized yet
2801				// activated and deactivated heads hence are irrelevant to this subsystem, other
2802				// than for tracing purposes.
2803			},
2804			FromOrchestra::Signal(OverseerSignal::BlockFinalized(_hash, number)) => {
2805				gum::trace!(target: LOG_TARGET, number = %number, "finalized signal");
2806				state.handle_block_finalized(network_sender, &self.metrics, number).await;
2807			},
2808			FromOrchestra::Signal(OverseerSignal::Conclude) => return true,
2809		}
2810		false
2811	}
2812
2813	async fn handle_incoming<
2814		N: overseer::SubsystemSender<NetworkBridgeTxMessage>,
2815		A: overseer::SubsystemSender<ApprovalVotingMessage>,
2816		RA: overseer::SubsystemSender<RuntimeApiMessage>,
2817	>(
2818		approval_voting_sender: &mut A,
2819		network_sender: &mut N,
2820		runtime_api_sender: &mut RA,
2821		state: &mut State,
2822		msg: ApprovalDistributionMessage,
2823		metrics: &Metrics,
2824		rng: &mut (impl CryptoRng + Rng),
2825		assignment_criteria: &(impl AssignmentCriteria + ?Sized),
2826		clock: &(impl Clock + ?Sized),
2827		session_info_provider: &mut RuntimeInfo,
2828	) {
2829		match msg {
2830			ApprovalDistributionMessage::NetworkBridgeUpdate(event) => {
2831				state
2832					.handle_network_msg(
2833						approval_voting_sender,
2834						network_sender,
2835						runtime_api_sender,
2836						metrics,
2837						event,
2838						rng,
2839						assignment_criteria,
2840						clock,
2841						session_info_provider,
2842					)
2843					.await;
2844			},
2845			ApprovalDistributionMessage::NewBlocks(metas) => {
2846				state
2847					.handle_new_blocks(
2848						approval_voting_sender,
2849						network_sender,
2850						runtime_api_sender,
2851						metrics,
2852						metas,
2853						rng,
2854						assignment_criteria,
2855						clock,
2856						session_info_provider,
2857					)
2858					.await;
2859			},
2860			ApprovalDistributionMessage::DistributeAssignment(cert, candidate_indices) => {
2861				gum::debug!(
2862					target: LOG_TARGET,
2863					?candidate_indices,
2864					block_hash = ?cert.block_hash,
2865					assignment_kind = ?cert.cert.kind,
2866					"Distributing our assignment on candidates",
2867				);
2868
2869				state
2870					.import_and_circulate_assignment(
2871						approval_voting_sender,
2872						network_sender,
2873						runtime_api_sender,
2874						&metrics,
2875						MessageSource::Local,
2876						cert,
2877						candidate_indices,
2878						rng,
2879						assignment_criteria,
2880						clock,
2881						session_info_provider,
2882					)
2883					.await;
2884			},
2885			ApprovalDistributionMessage::DistributeApproval(vote) => {
2886				gum::debug!(
2887					target: LOG_TARGET,
2888					"Distributing our approval vote on candidate (block={}, index={:?})",
2889					vote.block_hash,
2890					vote.candidate_indices,
2891				);
2892
2893				state
2894					.import_and_circulate_approval(
2895						approval_voting_sender,
2896						network_sender,
2897						runtime_api_sender,
2898						metrics,
2899						MessageSource::Local,
2900						vote,
2901						session_info_provider,
2902					)
2903					.await;
2904			},
2905			ApprovalDistributionMessage::GetApprovalSignatures(indices, tx) => {
2906				let sigs = state.get_approval_signatures(indices);
2907				if let Err(_) = tx.send(sigs) {
2908					gum::debug!(
2909						target: LOG_TARGET,
2910						"Sending back approval signatures failed, oneshot got closed"
2911					);
2912				}
2913			},
2914			ApprovalDistributionMessage::ApprovalCheckingLagUpdate(lag) => {
2915				gum::debug!(target: LOG_TARGET, lag, "Received `ApprovalCheckingLagUpdate`");
2916				state.approval_checking_lag = lag;
2917			},
2918		}
2919	}
2920}
2921
2922#[overseer::subsystem(ApprovalDistribution, error=SubsystemError, prefix=self::overseer)]
2923impl<Context> ApprovalDistribution {
2924	fn start(self, ctx: Context) -> SpawnedSubsystem {
2925		let future = self.run(ctx).map(|_| Ok(())).boxed();
2926
2927		SpawnedSubsystem { name: "approval-distribution-subsystem", future }
2928	}
2929}
2930
/// Ensures the batch size is always at least 1 element.
///
/// Evaluated at compile time via the batch-size constants below; a zero size
/// aborts compilation rather than silently producing empty batches.
const fn ensure_size_not_zero(size: usize) -> usize {
	match size {
		0 => panic!("Batch size must be at least 1 (MAX_NOTIFICATION_SIZE constant is too low)"),
		non_zero => non_zero,
	}
}
2939
/// The maximum amount of assignments per batch is 33% of maximum allowed by protocol.
/// This is an arbitrary value. Bumping this up increases the maximum amount of approvals or
/// assignments we send in a single message to peers. Exceeding `MAX_NOTIFICATION_SIZE` will violate
/// the protocol configuration.
///
/// NOTE(review): the divisor uses `std::mem::size_of`, i.e. the in-memory size of the
/// tuple rather than its encoded wire size — presumably a conservative stand-in; confirm
/// against the encoded size if `MAX_NOTIFICATION_SIZE` is ever tightened.
pub const MAX_ASSIGNMENT_BATCH_SIZE: usize = ensure_size_not_zero(
	MAX_NOTIFICATION_SIZE as usize /
		std::mem::size_of::<(IndirectAssignmentCertV2, CandidateIndex)>() /
		3,
);

/// The maximum amount of approvals per batch is 33% of maximum allowed by protocol.
/// Computed the same way as [`MAX_ASSIGNMENT_BATCH_SIZE`]: a third of how many
/// in-memory-sized votes fit into one notification.
pub const MAX_APPROVAL_BATCH_SIZE: usize = ensure_size_not_zero(
	MAX_NOTIFICATION_SIZE as usize / std::mem::size_of::<IndirectSignedApprovalVoteV2>() / 3,
);
2954
2955// Low level helper for sending assignments.
2956async fn send_assignments_batched_inner(
2957	sender: &mut impl overseer::SubsystemSender<NetworkBridgeTxMessage>,
2958	batch: impl IntoIterator<Item = (IndirectAssignmentCertV2, CandidateBitfield)>,
2959	peers: Vec<PeerId>,
2960	peer_version: ValidationVersion,
2961) {
2962	if peer_version == ValidationVersion::V3 {
2963		sender
2964			.send_message(NetworkBridgeTxMessage::SendValidationMessage(
2965				peers,
2966				Versioned::V3(protocol_v3::ValidationProtocol::ApprovalDistribution(
2967					protocol_v3::ApprovalDistributionMessage::Assignments(
2968						batch.into_iter().collect(),
2969					),
2970				)),
2971			))
2972			.await;
2973	} else {
2974		// Create a batch of v1 assignments from v2 assignments that are compatible with v1.
2975		// `IndirectAssignmentCertV2` -> `IndirectAssignmentCert`
2976		let batch = batch
2977			.into_iter()
2978			.filter_map(|(cert, candidates)| {
2979				cert.try_into().ok().map(|cert| {
2980					(
2981						cert,
2982						// First 1 bit index is the candidate index.
2983						candidates
2984							.first_one()
2985							.map(|index| index as CandidateIndex)
2986							.expect("Assignment was checked for not being empty; qed"),
2987					)
2988				})
2989			})
2990			.collect();
2991		let message = if peer_version == ValidationVersion::V1 {
2992			Versioned::V1(protocol_v1::ValidationProtocol::ApprovalDistribution(
2993				protocol_v1::ApprovalDistributionMessage::Assignments(batch),
2994			))
2995		} else {
2996			Versioned::V2(protocol_v2::ValidationProtocol::ApprovalDistribution(
2997				protocol_v2::ApprovalDistributionMessage::Assignments(batch),
2998			))
2999		};
3000		sender
3001			.send_message(NetworkBridgeTxMessage::SendValidationMessage(peers, message))
3002			.await;
3003	}
3004}
3005
3006/// Send assignments while honoring the `max_notification_size` of the protocol.
3007///
3008/// Splitting the messages into multiple notifications allows more granular processing at the
3009/// destination, such that the subsystem doesn't get stuck for long processing a batch
3010/// of assignments and can `select!` other tasks.
3011pub(crate) async fn send_assignments_batched(
3012	network_sender: &mut impl overseer::SubsystemSender<NetworkBridgeTxMessage>,
3013	v2_assignments: impl IntoIterator<Item = (IndirectAssignmentCertV2, CandidateBitfield)> + Clone,
3014	peers: &[(PeerId, ProtocolVersion)],
3015) {
3016	let v1_peers = filter_by_peer_version(peers, ValidationVersion::V1.into());
3017	let v2_peers = filter_by_peer_version(peers, ValidationVersion::V2.into());
3018	let v3_peers = filter_by_peer_version(peers, ValidationVersion::V3.into());
3019
3020	// V1 and V2 validation protocol do not have any changes with regard to
3021	// ApprovalDistributionMessage so they can be treated the same.
3022	if !v1_peers.is_empty() || !v2_peers.is_empty() {
3023		// Older peers(v1) do not understand `AssignmentsV2` messages, so we have to filter these
3024		// out.
3025		let v1_assignments = v2_assignments
3026			.clone()
3027			.into_iter()
3028			.filter(|(_, candidates)| candidates.count_ones() == 1);
3029
3030		let mut v1_batches = v1_assignments.peekable();
3031
3032		while v1_batches.peek().is_some() {
3033			let batch: Vec<_> = v1_batches.by_ref().take(MAX_ASSIGNMENT_BATCH_SIZE).collect();
3034			if !v1_peers.is_empty() {
3035				send_assignments_batched_inner(
3036					network_sender,
3037					batch.clone(),
3038					v1_peers.clone(),
3039					ValidationVersion::V1,
3040				)
3041				.await;
3042			}
3043
3044			if !v2_peers.is_empty() {
3045				send_assignments_batched_inner(
3046					network_sender,
3047					batch,
3048					v2_peers.clone(),
3049					ValidationVersion::V2,
3050				)
3051				.await;
3052			}
3053		}
3054	}
3055
3056	if !v3_peers.is_empty() {
3057		let mut v3 = v2_assignments.into_iter().peekable();
3058
3059		while v3.peek().is_some() {
3060			let batch = v3.by_ref().take(MAX_ASSIGNMENT_BATCH_SIZE).collect::<Vec<_>>();
3061			send_assignments_batched_inner(
3062				network_sender,
3063				batch,
3064				v3_peers.clone(),
3065				ValidationVersion::V3,
3066			)
3067			.await;
3068		}
3069	}
3070}
3071
3072/// Send approvals while honoring the `max_notification_size` of the protocol and peer version.
3073pub(crate) async fn send_approvals_batched(
3074	sender: &mut impl overseer::SubsystemSender<NetworkBridgeTxMessage>,
3075	approvals: impl IntoIterator<Item = IndirectSignedApprovalVoteV2> + Clone,
3076	peers: &[(PeerId, ProtocolVersion)],
3077) {
3078	let v1_peers = filter_by_peer_version(peers, ValidationVersion::V1.into());
3079	let v2_peers = filter_by_peer_version(peers, ValidationVersion::V2.into());
3080	let v3_peers = filter_by_peer_version(peers, ValidationVersion::V3.into());
3081
3082	if !v1_peers.is_empty() || !v2_peers.is_empty() {
3083		let mut batches = approvals
3084			.clone()
3085			.into_iter()
3086			.filter(|approval| approval.candidate_indices.count_ones() == 1)
3087			.filter_map(|val| val.try_into().ok())
3088			.peekable();
3089
3090		while batches.peek().is_some() {
3091			let batch: Vec<_> = batches.by_ref().take(MAX_APPROVAL_BATCH_SIZE).collect();
3092
3093			if !v1_peers.is_empty() {
3094				sender
3095					.send_message(NetworkBridgeTxMessage::SendValidationMessage(
3096						v1_peers.clone(),
3097						Versioned::V1(protocol_v1::ValidationProtocol::ApprovalDistribution(
3098							protocol_v1::ApprovalDistributionMessage::Approvals(batch.clone()),
3099						)),
3100					))
3101					.await;
3102			}
3103
3104			if !v2_peers.is_empty() {
3105				sender
3106					.send_message(NetworkBridgeTxMessage::SendValidationMessage(
3107						v2_peers.clone(),
3108						Versioned::V2(protocol_v2::ValidationProtocol::ApprovalDistribution(
3109							protocol_v2::ApprovalDistributionMessage::Approvals(batch),
3110						)),
3111					))
3112					.await;
3113			}
3114		}
3115	}
3116
3117	if !v3_peers.is_empty() {
3118		let mut batches = approvals.into_iter().peekable();
3119
3120		while batches.peek().is_some() {
3121			let batch: Vec<_> = batches.by_ref().take(MAX_APPROVAL_BATCH_SIZE).collect();
3122
3123			sender
3124				.send_message(NetworkBridgeTxMessage::SendValidationMessage(
3125					v3_peers.clone(),
3126					Versioned::V3(protocol_v3::ValidationProtocol::ApprovalDistribution(
3127						protocol_v3::ApprovalDistributionMessage::Approvals(batch),
3128					)),
3129				))
3130				.await;
3131		}
3132	}
3133}