// Source: polkadot_approval_distribution/lib.rs
1// Copyright (C) Parity Technologies (UK) Ltd.
2// This file is part of Polkadot.
3
4// Polkadot is free software: you can redistribute it and/or modify
5// it under the terms of the GNU General Public License as published by
6// the Free Software Foundation, either version 3 of the License, or
7// (at your option) any later version.
8
9// Polkadot is distributed in the hope that it will be useful,
10// but WITHOUT ANY WARRANTY; without even the implied warranty of
11// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12// GNU General Public License for more details.
13
14// You should have received a copy of the GNU General Public License
15// along with Polkadot.  If not, see <http://www.gnu.org/licenses/>.
16
17//! [`ApprovalDistribution`] implementation.
18//!
19//! See the documentation on [approval distribution][approval-distribution-page] in the
20//! implementers' guide.
21//!
22//! [approval-distribution-page]: https://paritytech.github.io/polkadot-sdk/book/node/approval/approval-distribution.html
23
24#![warn(missing_docs)]
25
26use self::metrics::Metrics;
27use futures::{select, FutureExt as _};
28use itertools::Itertools;
29use net_protocol::peer_set::{ProtocolVersion, ValidationVersion};
30use polkadot_node_network_protocol::{
31	self as net_protocol, filter_by_peer_version,
32	grid_topology::{RandomRouting, RequiredRouting, SessionGridTopologies, SessionGridTopology},
33	peer_set::MAX_NOTIFICATION_SIZE,
34	v3 as protocol_v3, PeerId, UnifiedReputationChange as Rep, ValidationProtocols, View,
35};
36use polkadot_node_primitives::{
37	approval::{
38		criteria::{AssignmentCriteria, InvalidAssignment},
39		time::{Clock, ClockExt, SystemClock, TICK_TOO_FAR_IN_FUTURE},
40		v1::{BlockApprovalMeta, DelayTranche, RelayVRFStory},
41		v2::{
42			AsBitIndex, AssignmentCertKindV2, CandidateBitfield, IndirectAssignmentCertV2,
43			IndirectSignedApprovalVoteV2,
44		},
45	},
46	DISPUTE_WINDOW,
47};
48use polkadot_node_subsystem::{
49	messages::{
50		ApprovalDistributionMessage, ApprovalVotingMessage, CheckedIndirectAssignment,
51		CheckedIndirectSignedApprovalVote, NetworkBridgeEvent, NetworkBridgeTxMessage,
52		RuntimeApiMessage,
53	},
54	overseer, FromOrchestra, OverseerSignal, SpawnedSubsystem, SubsystemError,
55};
56use polkadot_node_subsystem_util::{
57	reputation::{ReputationAggregator, REPUTATION_CHANGE_INTERVAL},
58	runtime::{Config as RuntimeInfoConfig, ExtendedSessionInfo, RuntimeInfo},
59};
60use polkadot_primitives::{
61	BlockNumber, CandidateHash, CandidateIndex, CoreIndex, DisputeStatement, GroupIndex, Hash,
62	SessionIndex, Slot, ValidDisputeStatementKind, ValidatorIndex, ValidatorSignature,
63};
64use rand::{CryptoRng, Rng, SeedableRng};
65use std::{
66	collections::{hash_map, BTreeMap, HashMap, HashSet, VecDeque},
67	sync::Arc,
68	time::Duration,
69};
70
71/// Approval distribution metrics.
72pub mod metrics;
73
74#[cfg(test)]
75mod tests;
76
const LOG_TARGET: &str = "parachain::approval-distribution";

// Reputation changes applied to peers based on the messages they send us.
// Costs are penalties; benefits are rewards for useful messages.
const COST_UNEXPECTED_MESSAGE: Rep =
	Rep::CostMinor("Peer sent an out-of-view assignment or approval");
const COST_DUPLICATE_MESSAGE: Rep = Rep::CostMinorRepeated("Peer sent identical messages");
const COST_ASSIGNMENT_TOO_FAR_IN_THE_FUTURE: Rep =
	Rep::CostMinor("The vote was valid but too far in the future");
const COST_INVALID_MESSAGE: Rep = Rep::CostMajor("The vote was bad");
const COST_OVERSIZED_BITFIELD: Rep = Rep::CostMajor("Oversized certificate or candidate bitfield");

const BENEFIT_VALID_MESSAGE: Rep = Rep::BenefitMinor("Peer sent a valid message");
const BENEFIT_VALID_MESSAGE_FIRST: Rep =
	Rep::BenefitMinorFirst("Valid message with new information");

// Maximum valid size for the `CandidateBitfield` in the assignment messages.
const MAX_BITFIELD_SIZE: usize = 500;
93
/// The Approval Distribution subsystem.
pub struct ApprovalDistribution {
	/// Subsystem metrics.
	metrics: Metrics,
	/// Relay-chain slot duration, in milliseconds.
	slot_duration_millis: u64,
	/// Clock abstraction (trait object, so alternative implementations can be injected).
	clock: Arc<dyn Clock + Send + Sync>,
	/// Criteria implementation used for checking assignment certificates.
	assignment_criteria: Arc<dyn AssignmentCriteria + Send + Sync>,
}
101
/// Contains hashes of blocks that were recently finalized
/// or pruned due to finalization.
#[derive(Default)]
struct RecentlyOutdated {
	// Bounded FIFO of outdated block hashes (see `note_outdated` for the cap).
	buf: VecDeque<Hash>,
}
108
109impl RecentlyOutdated {
110	fn note_outdated(&mut self, hash: Hash) {
111		const MAX_BUF_LEN: usize = 20;
112
113		self.buf.push_back(hash);
114
115		while self.buf.len() > MAX_BUF_LEN {
116			let _ = self.buf.pop_front();
117		}
118	}
119
120	fn is_recent_outdated(&self, hash: &Hash) -> bool {
121		self.buf.contains(hash)
122	}
123}
124
// Contains topology routing information for assignments and approvals.
struct ApprovalRouting {
	// The grid-topology routing requirement for this message.
	required_routing: RequiredRouting,
	// Whether the message originated locally.
	local: bool,
	// State of the random gossip performed on top of the required routing.
	random_routing: RandomRouting,
	// Peers this message has already been randomly routed to.
	peers_randomly_routed: Vec<PeerId>,
}
132
133impl ApprovalRouting {
134	fn mark_randomly_sent(&mut self, peer: PeerId) {
135		self.random_routing.inc_sent();
136		self.peers_randomly_routed.push(peer);
137	}
138}
139
// This struct is responsible for tracking the full state of an assignment and grid routing
// information.
struct ApprovalEntry {
	// The assignment certificate.
	assignment: IndirectAssignmentCertV2,
	// The candidates claimed by the certificate. A mapping between bit index and candidate index.
	assignment_claimed_candidates: CandidateBitfield,
	// The approval signatures for each `CandidateIndex` claimed by the assignment certificate,
	// keyed by the candidate bitfield each vote covers.
	approvals: HashMap<CandidateBitfield, IndirectSignedApprovalVoteV2>,
	// The validator index of the assignment signer.
	validator_index: ValidatorIndex,
	// Information required for gossiping to other peers using the grid topology.
	routing_info: ApprovalRouting,
}
154
// Errors that can occur while noting approvals against an `ApprovalEntry`.
#[derive(Debug)]
enum ApprovalEntryError {
	// The approval's validator index does not match the assignment's.
	InvalidValidatorIndex,
	// The approval claims candidate indices beyond the block's candidate list.
	CandidateIndexOutOfBounds,
	// None of the approval's claimed candidates are covered by the assignment.
	InvalidCandidateIndex,
	// An approval for the same candidate bitfield was already recorded.
	DuplicateApproval,
	// No matching assignment entry was found for the approval.
	UnknownAssignment,
}
163
164impl ApprovalEntry {
165	pub fn new(
166		assignment: IndirectAssignmentCertV2,
167		candidates: CandidateBitfield,
168		routing_info: ApprovalRouting,
169	) -> ApprovalEntry {
170		Self {
171			validator_index: assignment.validator,
172			assignment,
173			approvals: HashMap::new(),
174			assignment_claimed_candidates: candidates,
175			routing_info,
176		}
177	}
178
179	// Create a `MessageSubject` to reference the assignment.
180	pub fn create_assignment_knowledge(&self, block_hash: Hash) -> (MessageSubject, MessageKind) {
181		(
182			MessageSubject(
183				block_hash,
184				self.assignment_claimed_candidates.clone(),
185				self.validator_index,
186			),
187			MessageKind::Assignment,
188		)
189	}
190
191	// Updates routing information and returns the previous information if any.
192	pub fn routing_info_mut(&mut self) -> &mut ApprovalRouting {
193		&mut self.routing_info
194	}
195
196	// Get the routing information.
197	pub fn routing_info(&self) -> &ApprovalRouting {
198		&self.routing_info
199	}
200
201	// Update routing information.
202	pub fn update_required_routing(&mut self, required_routing: RequiredRouting) {
203		self.routing_info.required_routing = required_routing;
204	}
205
206	// Tells if this entry assignment covers at least one candidate in the approval
207	pub fn includes_approval_candidates(&self, approval: &IndirectSignedApprovalVoteV2) -> bool {
208		for candidate_index in approval.candidate_indices.iter_ones() {
209			if self.assignment_claimed_candidates.bit_at((candidate_index).as_bit_index()) {
210				return true;
211			}
212		}
213		return false;
214	}
215
216	// Records a new approval. Returns error if the claimed candidate is not found or we already
217	// have received the approval.
218	pub fn note_approval(
219		&mut self,
220		approval: IndirectSignedApprovalVoteV2,
221	) -> Result<(), ApprovalEntryError> {
222		// First do some sanity checks:
223		// - check validator index matches
224		// - check claimed candidate
225		// - check for duplicate approval
226		if self.validator_index != approval.validator {
227			return Err(ApprovalEntryError::InvalidValidatorIndex);
228		}
229
230		// We need at least one of the candidates in the approval to be in this assignment
231		if !self.includes_approval_candidates(&approval) {
232			return Err(ApprovalEntryError::InvalidCandidateIndex);
233		}
234
235		if self.approvals.contains_key(&approval.candidate_indices) {
236			return Err(ApprovalEntryError::DuplicateApproval);
237		}
238
239		self.approvals.insert(approval.candidate_indices.clone(), approval.clone());
240		Ok(())
241	}
242
243	// Get the assignment certificate and claimed candidates.
244	pub fn assignment(&self) -> (IndirectAssignmentCertV2, CandidateBitfield) {
245		(self.assignment.clone(), self.assignment_claimed_candidates.clone())
246	}
247
248	// Get all approvals for all candidates claimed by the assignment.
249	pub fn approvals(&self) -> Vec<IndirectSignedApprovalVoteV2> {
250		self.approvals.values().cloned().collect::<Vec<_>>()
251	}
252
253	// Get validator index.
254	pub fn validator_index(&self) -> ValidatorIndex {
255		self.validator_index
256	}
257}
258
// We keep track of each peer's view and protocol version using this struct.
struct PeerEntry {
	/// The peer's advertised view of the chain.
	pub view: View,
	/// The validation protocol version the peer is connected with.
	pub version: ProtocolVersion,
}
264
265// In case the original grid topology mechanisms don't work on their own, we need to trade bandwidth
266// for protocol liveliness by introducing aggression.
267//
268// Aggression has 3 levels:
269//
270//  * Aggression Level 0: The basic behaviors described above.
271//  * Aggression Level 1: The originator of a message sends to all peers. Other peers follow the
272//    rules above.
273//  * Aggression Level 2: All peers send all messages to all their row and column neighbors. This
274//    means that each validator will, on average, receive each message approximately `2*sqrt(n)`
275//    times.
276// The aggression level of messages pertaining to a block increases when that block is unfinalized
277// and is a child of the finalized block.
278// This means that only one block at a time has its messages propagated with aggression > 0.
279//
280// A note on aggression thresholds: changes in propagation apply only to blocks which are the
281// _direct descendants_ of the finalized block which are older than the given threshold,
282// not to all blocks older than the threshold. Most likely, a few assignments struggle to
283// be propagated in a single block and this holds up all of its descendants blocks.
284// Accordingly, we only step on the gas for the block which is most obviously holding up finality.
/// Aggression configuration representation.
#[derive(Clone)]
struct AggressionConfig {
	/// Aggression level 1: all validators send all their own messages to all peers.
	l1_threshold: Option<BlockNumber>,
	/// Aggression level 2: level 1 + all validators send all messages to all peers in the X and Y
	/// dimensions.
	l2_threshold: Option<BlockNumber>,
	/// How often to re-send messages to all targeted recipients.
	/// This applies to all unfinalized blocks.
	resend_unfinalized_period: Option<BlockNumber>,
}
297
298impl AggressionConfig {
299	/// Returns `true` if age is past threshold depending on the aggression level
300	fn should_trigger_aggression(&self, age: BlockNumber) -> bool {
301		if let Some(t) = self.l1_threshold {
302			age >= t
303		} else if let Some(t) = self.resend_unfinalized_period {
304			age > 0 && age.is_multiple_of(t)
305		} else {
306			false
307		}
308	}
309}
310
311impl Default for AggressionConfig {
312	fn default() -> Self {
313		AggressionConfig {
314			l1_threshold: Some(16),
315			l2_threshold: Some(64),
316			resend_unfinalized_period: Some(8),
317		}
318	}
319}
320
// Whether messages should be re-sent. (Consumed by routines outside this chunk.)
#[derive(PartialEq)]
enum Resend {
	Yes,
	No,
}
326
/// The [`State`] struct is responsible for tracking the overall state of the subsystem.
///
/// It tracks metadata about our view of the unfinalized chain,
/// which assignments and approvals we have seen, and our peers' views.
#[derive(Default)]
pub struct State {
	/// These two fields are used in conjunction to construct a view over the unfinalized chain.
	blocks_by_number: BTreeMap<BlockNumber, Vec<Hash>>,
	/// Per-block state (knowledge, candidates, approval entries), keyed by block hash.
	blocks: HashMap<Hash, BlockEntry>,

	/// Our view updates to our peers can race with `NewBlocks` updates. We store messages received
	/// against the directly mentioned blocks in our view in this map until `NewBlocks` is
	/// received.
	///
	/// As long as the parent is already in the `blocks` map and `NewBlocks` messages aren't
	/// delayed by more than a block length, this strategy will work well for mitigating the race.
	/// This is also a race that occurs typically on local networks.
	pending_known: HashMap<Hash, Vec<(PeerId, PendingMessage)>>,

	/// Peer data is partially stored here, and partially inline within the [`BlockEntry`]s.
	peer_views: HashMap<PeerId, PeerEntry>,

	/// Keeps a topology for various different sessions.
	topologies: SessionGridTopologies,

	/// Tracks recently finalized blocks.
	recent_outdated_blocks: RecentlyOutdated,

	/// Aggression configuration.
	aggression_config: AggressionConfig,

	/// Current approval checking finality lag.
	approval_checking_lag: BlockNumber,

	/// Aggregated reputation change.
	reputation: ReputationAggregator,

	/// Slot duration in milliseconds.
	slot_duration_millis: u64,
}
367
// The kind of a gossiped message: an assignment certificate or an approval vote.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum MessageKind {
	Assignment,
	Approval,
}
373
// Utility structure to identify assignments and approvals for specific candidates.
// Assignments can span multiple candidates, while approvals refer to only one candidate.
// Fields: block hash, claimed candidate bitfield, validator index.
#[derive(Debug, Clone, Hash, PartialEq, Eq)]
struct MessageSubject(Hash, pub CandidateBitfield, ValidatorIndex);
379
// Tracks which messages (assignments/approvals) are known, for ourselves or for a peer.
#[derive(Debug, Clone, Default)]
struct Knowledge {
	// When there is no entry, this means the message is unknown.
	// When there is an entry with `MessageKind::Assignment`, the assignment is known.
	// When there is an entry with `MessageKind::Approval`, the assignment and approval are known.
	known_messages: HashMap<MessageSubject, MessageKind>,
}
387
impl Knowledge {
	// Whether the message is known at least at level `kind`. Knowing an approval
	// implies knowing its assignment, but not vice versa.
	fn contains(&self, message: &MessageSubject, kind: MessageKind) -> bool {
		match (kind, self.known_messages.get(message)) {
			(_, None) => false,
			(MessageKind::Assignment, Some(_)) => true,
			(MessageKind::Approval, Some(MessageKind::Assignment)) => false,
			(MessageKind::Approval, Some(MessageKind::Approval)) => true,
		}
	}

	// Insert a message at knowledge level `kind`. Returns `true` if this added new
	// information — i.e. the message was unknown, or an assignment entry was upgraded
	// to an approval.
	fn insert(&mut self, message: MessageSubject, kind: MessageKind) -> bool {
		let mut success = match self.known_messages.entry(message.clone()) {
			hash_map::Entry::Vacant(vacant) => {
				vacant.insert(kind);
				true
			},
			// Only an assignment -> approval upgrade counts as new information;
			// every other combination is a no-op.
			hash_map::Entry::Occupied(mut occupied) => match (*occupied.get(), kind) {
				(MessageKind::Assignment, MessageKind::Assignment) => false,
				(MessageKind::Approval, MessageKind::Approval) => false,
				(MessageKind::Approval, MessageKind::Assignment) => false,
				(MessageKind::Assignment, MessageKind::Approval) => {
					*occupied.get_mut() = MessageKind::Approval;
					true
				},
			},
		};

		// In case of successful insertion of multiple candidate assignments create additional
		// entries for each assigned candidate. This fakes knowledge of individual assignments, but
		// we need to share the same `MessageSubject` with the followup approval candidate index.
		if kind == MessageKind::Assignment && success && message.1.count_ones() > 1 {
			for candidate_index in message.1.iter_ones() {
				success = success &&
					self.insert(
						MessageSubject(
							message.0,
							vec![candidate_index as u32].try_into().expect("Non-empty vec; qed"),
							message.2,
						),
						kind,
					);
			}
		}
		success
	}
}
436
/// Information that has been circulated to and from a peer.
#[derive(Debug, Clone, Default)]
struct PeerKnowledge {
	/// The knowledge we've sent to the peer.
	sent: Knowledge,
	/// The knowledge we've received from the peer.
	received: Knowledge,
}
445
446impl PeerKnowledge {
447	fn contains(&self, message: &MessageSubject, kind: MessageKind) -> bool {
448		self.sent.contains(message, kind) || self.received.contains(message, kind)
449	}
450
451	// Generate the knowledge keys for querying if all assignments of an approval are known
452	// by this peer.
453	fn generate_assignments_keys(
454		approval: &IndirectSignedApprovalVoteV2,
455	) -> Vec<(MessageSubject, MessageKind)> {
456		approval
457			.candidate_indices
458			.iter_ones()
459			.map(|candidate_index| {
460				(
461					MessageSubject(
462						approval.block_hash,
463						(candidate_index as CandidateIndex).into(),
464						approval.validator,
465					),
466					MessageKind::Assignment,
467				)
468			})
469			.collect_vec()
470	}
471
472	// Generate the knowledge keys for querying if an approval is known by peer.
473	fn generate_approval_key(
474		approval: &IndirectSignedApprovalVoteV2,
475	) -> (MessageSubject, MessageKind) {
476		(
477			MessageSubject(
478				approval.block_hash,
479				approval.candidate_indices.clone(),
480				approval.validator,
481			),
482			MessageKind::Approval,
483		)
484	}
485}
486
/// Information about blocks in our current view as well as whether peers know of them.
struct BlockEntry {
	/// Peers who we know are aware of this block and thus, the candidates within it.
	/// This maps to their knowledge of messages.
	known_by: HashMap<PeerId, PeerKnowledge>,
	/// The number of the block.
	number: BlockNumber,
	/// The parent hash of the block.
	parent_hash: Hash,
	/// Our knowledge of messages.
	knowledge: Knowledge,
	/// A votes entry for each candidate, indexed by [`CandidateIndex`].
	candidates: Vec<CandidateEntry>,
	/// Information about candidate metadata.
	candidates_metadata: Vec<(CandidateHash, CoreIndex, GroupIndex)>,
	/// The session index of this block.
	session: SessionIndex,
	/// Approval entries for the whole block. These also contain all approvals in the case of
	/// multiple candidates being claimed by assignments.
	approval_entries: HashMap<(ValidatorIndex, CandidateBitfield), ApprovalEntry>,
	/// The block vrf story.
	vrf_story: RelayVRFStory,
	/// The block slot.
	slot: Slot,
	/// Backing off from re-sending messages to peers.
	last_resent_at_block_number: Option<u32>,
}
514
515impl BlockEntry {
516	// Returns the peer which currently know this block.
517	pub fn known_by(&self) -> Vec<PeerId> {
518		self.known_by.keys().cloned().collect::<Vec<_>>()
519	}
520
521	pub fn insert_approval_entry(&mut self, entry: ApprovalEntry) -> &mut ApprovalEntry {
522		// First map one entry per candidate to the same key we will use in `approval_entries`.
523		// Key is (Validator_index, CandidateBitfield) that links the `ApprovalEntry` to the (K,V)
524		// entry in `candidate_entry.messages`.
525		for claimed_candidate_index in entry.assignment_claimed_candidates.iter_ones() {
526			match self.candidates.get_mut(claimed_candidate_index) {
527				Some(candidate_entry) => {
528					candidate_entry
529						.assignments
530						.entry(entry.validator_index())
531						.or_insert(entry.assignment_claimed_candidates.clone());
532				},
533				None => {
534					// This should never happen, but if it happens, it means the subsystem is
535					// broken.
536					gum::warn!(
537						target: LOG_TARGET,
538						hash = ?entry.assignment.block_hash,
539						?claimed_candidate_index,
540						"Missing candidate entry on `import_and_circulate_assignment`",
541					);
542				},
543			};
544		}
545
546		self.approval_entries
547			.entry((entry.validator_index, entry.assignment_claimed_candidates.clone()))
548			.or_insert(entry)
549	}
550
551	// Tels if all candidate_indices are valid candidates
552	pub fn contains_candidates(&self, candidate_indices: &CandidateBitfield) -> bool {
553		candidate_indices
554			.iter_ones()
555			.all(|candidate_index| self.candidates.get(candidate_index as usize).is_some())
556	}
557
558	// Saves the given approval in all ApprovalEntries that contain an assignment for any of the
559	// candidates in the approval.
560	//
561	// Returns the required routing needed for this approval and the lit of random peers the
562	// covering assignments were sent.
563	pub fn note_approval(
564		&mut self,
565		approval: IndirectSignedApprovalVoteV2,
566	) -> Result<(RequiredRouting, HashSet<PeerId>), ApprovalEntryError> {
567		let mut required_routing: Option<RequiredRouting> = None;
568		let mut peers_randomly_routed_to = HashSet::new();
569
570		if self.candidates.len() < approval.candidate_indices.len() as usize {
571			return Err(ApprovalEntryError::CandidateIndexOutOfBounds);
572		}
573
574		// First determine all assignments bitfields that might be covered by this approval
575		let covered_assignments_bitfields: HashSet<CandidateBitfield> = approval
576			.candidate_indices
577			.iter_ones()
578			.filter_map(|candidate_index| {
579				self.candidates.get_mut(candidate_index).map_or(None, |candidate_entry| {
580					candidate_entry.assignments.get(&approval.validator).cloned()
581				})
582			})
583			.collect();
584
585		// Mark the vote in all approval entries
586		for assignment_bitfield in covered_assignments_bitfields {
587			if let Some(approval_entry) =
588				self.approval_entries.get_mut(&(approval.validator, assignment_bitfield))
589			{
590				approval_entry.note_approval(approval.clone())?;
591				peers_randomly_routed_to
592					.extend(approval_entry.routing_info().peers_randomly_routed.iter());
593
594				if let Some(current_required_routing) = required_routing {
595					required_routing = Some(
596						current_required_routing
597							.combine(approval_entry.routing_info().required_routing),
598					);
599				} else {
600					required_routing = Some(approval_entry.routing_info().required_routing)
601				}
602			}
603		}
604
605		if let Some(required_routing) = required_routing {
606			Ok((required_routing, peers_randomly_routed_to))
607		} else {
608			Err(ApprovalEntryError::UnknownAssignment)
609		}
610	}
611
612	/// Returns the list of approval votes covering this candidate
613	pub fn approval_votes(
614		&self,
615		candidate_index: CandidateIndex,
616	) -> Vec<IndirectSignedApprovalVoteV2> {
617		let result: Option<
618			HashMap<(ValidatorIndex, CandidateBitfield), IndirectSignedApprovalVoteV2>,
619		> = self.candidates.get(candidate_index as usize).map(|candidate_entry| {
620			candidate_entry
621				.assignments
622				.iter()
623				.filter_map(|(validator, assignment_bitfield)| {
624					self.approval_entries.get(&(*validator, assignment_bitfield.clone()))
625				})
626				.flat_map(|approval_entry| {
627					approval_entry
628						.approvals
629						.clone()
630						.into_iter()
631						.filter(|(approved_candidates, _)| {
632							approved_candidates.bit_at(candidate_index.as_bit_index())
633						})
634						.map(|(approved_candidates, vote)| {
635							((approval_entry.validator_index, approved_candidates), vote)
636						})
637				})
638				.collect()
639		});
640
641		result.map(|result| result.into_values().collect_vec()).unwrap_or_default()
642	}
643}
644
// Information about candidates in the context of a particular block they are included in.
// In other words, multiple `CandidateEntry`s may exist for the same candidate,
// if it is included by multiple blocks - this is likely the case when there are forks.
#[derive(Debug, Default)]
struct CandidateEntry {
	// The value represents part of the lookup key in `approval_entries` to fetch the assignment
	// and existing votes. Keyed by the validator that issued the assignment.
	assignments: HashMap<ValidatorIndex, CandidateBitfield>,
}
654
// Where a message originated: a remote peer or our own node.
#[derive(Debug, Clone, PartialEq)]
enum MessageSource {
	Peer(PeerId),
	Local,
}
660
// Encountered error while validating an assignment.
#[derive(Debug)]
enum InvalidAssignmentError {
	// The vrf check for the assignment failed.
	#[allow(dead_code)]
	CryptoCheckFailed(InvalidAssignment),
	// The assignment did not claim any valid candidate.
	NoClaimedCandidates,
	// Claimed an invalid candidate index (outside the block's candidate range).
	#[allow(dead_code)]
	ClaimedInvalidCandidateIndex {
		claimed_index: usize,
		max_index: usize,
	},
	// The assignment claims more candidates than the maximum allowed.
	OversizedClaimedBitfield,
	// `SessionInfo` was not found for the block hash in the assignment.
	#[allow(dead_code)]
	SessionInfoNotFound(polkadot_node_subsystem_util::runtime::Error),
}
681
// Encountered error while validating an approval vote.
#[derive(Debug)]
enum InvalidVoteError {
	// The candidate index was out of bounds.
	CandidateIndexOutOfBounds,
	// The candidate hash was not found in the block's candidate list.
	CandidateHashNotFound,
	// The validator index was out of bounds.
	ValidatorIndexOutOfBounds,
	// The signature of the vote was invalid.
	InvalidSignature,
	// `SessionInfo` was not found for the block hash in the approval.
	#[allow(dead_code)]
	SessionInfoNotFound(polkadot_node_subsystem_util::runtime::Error),
}
697
698impl MessageSource {
699	fn peer_id(&self) -> Option<PeerId> {
700		match self {
701			Self::Peer(id) => Some(*id),
702			Self::Local => None,
703		}
704	}
705}
706
// A message buffered in `State::pending_known` until its block arrives via `NewBlocks`.
enum PendingMessage {
	Assignment(IndirectAssignmentCertV2, CandidateBitfield),
	Approval(IndirectSignedApprovalVoteV2),
}
711
712#[overseer::contextbounds(ApprovalDistribution, prefix = self::overseer)]
713impl State {
714	/// Build State with specified slot duration.
715	pub fn with_config(slot_duration_millis: u64) -> Self {
716		Self { slot_duration_millis, ..Default::default() }
717	}
718
	/// Dispatch a single network bridge event: peer connect/disconnect, topology updates,
	/// view changes and incoming peer messages.
	async fn handle_network_msg<
		N: overseer::SubsystemSender<NetworkBridgeTxMessage>,
		A: overseer::SubsystemSender<ApprovalVotingMessage>,
		RA: overseer::SubsystemSender<RuntimeApiMessage>,
	>(
		&mut self,
		approval_voting_sender: &mut A,
		network_sender: &mut N,
		runtime_api_sender: &mut RA,
		metrics: &Metrics,
		event: NetworkBridgeEvent<net_protocol::ApprovalDistributionMessage>,
		rng: &mut (impl CryptoRng + Rng),
		assignment_criteria: &(impl AssignmentCriteria + ?Sized),
		clock: &(impl Clock + ?Sized),
		session_info_provider: &mut RuntimeInfo,
	) {
		match event {
			NetworkBridgeEvent::PeerConnected(peer_id, role, version, authority_ids) => {
				gum::trace!(target: LOG_TARGET, ?peer_id, ?role, ?authority_ids, "Peer connected");
				if let Some(authority_ids) = authority_ids {
					self.topologies.update_authority_ids(peer_id, &authority_ids);
				}
				// insert a blank view if none already present
				self.peer_views
					.entry(peer_id)
					.or_insert(PeerEntry { view: Default::default(), version });
			},
			NetworkBridgeEvent::PeerDisconnected(peer_id) => {
				gum::trace!(target: LOG_TARGET, ?peer_id, "Peer disconnected");
				// Drop all per-peer state, including per-block knowledge entries.
				self.peer_views.remove(&peer_id);
				self.blocks.iter_mut().for_each(|(_hash, entry)| {
					entry.known_by.remove(&peer_id);
				})
			},
			NetworkBridgeEvent::NewGossipTopology(topology) => {
				self.handle_new_session_topology(
					network_sender,
					topology.session,
					topology.topology,
					topology.local_index,
				)
				.await;
			},
			NetworkBridgeEvent::PeerViewChange(peer_id, view) => {
				self.handle_peer_view_change(network_sender, metrics, peer_id, view, rng).await;
			},
			NetworkBridgeEvent::OurViewChange(view) => {
				gum::trace!(target: LOG_TARGET, ?view, "Own view change");
				// Start buffering messages for heads we don't know yet; they are imported
				// once the corresponding `NewBlocks` arrives (see `pending_known`).
				for head in view.iter() {
					if !self.blocks.contains_key(head) {
						self.pending_known.entry(*head).or_default();
					}
				}

				// Drop buffered messages for heads that left our view.
				self.pending_known.retain(|h, _| {
					let live = view.contains(h);
					if !live {
						gum::trace!(
							target: LOG_TARGET,
							block_hash = ?h,
							"Cleaning up stale pending messages",
						);
					}
					live
				});
			},
			NetworkBridgeEvent::PeerMessage(peer_id, message) => {
				self.process_incoming_peer_message(
					approval_voting_sender,
					network_sender,
					runtime_api_sender,
					metrics,
					peer_id,
					message,
					rng,
					assignment_criteria,
					clock,
					session_info_provider,
				)
				.await;
			},
			NetworkBridgeEvent::UpdatedAuthorityIds(peer_id, authority_ids) => {
				gum::debug!(target: LOG_TARGET, ?peer_id, ?authority_ids, "Update Authority Ids");
				// If we learn about a new PeerId for an authority ids we need to try to route the
				// messages that should have sent to that validator according to the topology.
				if self.topologies.update_authority_ids(peer_id, &authority_ids) {
					if let Some(PeerEntry { view, version }) = self.peer_views.get(&peer_id) {
						// Restrict to unfinalized blocks (from the peer's perspective) that
						// the peer already knows about.
						let intersection = self
							.blocks_by_number
							.iter()
							.filter(|(block_number, _)| *block_number > &view.finalized_number)
							.flat_map(|(_, hashes)| {
								hashes.iter().filter(|hash| {
									self.blocks
										.get(&hash)
										.map(|block| block.known_by.get(&peer_id).is_some())
										.unwrap_or_default()
								})
							});
						let view_intersection =
							View::new(intersection.cloned(), view.finalized_number);
						Self::unify_with_peer(
							network_sender,
							metrics,
							&mut self.blocks,
							&self.topologies,
							self.peer_views.len(),
							peer_id,
							*version,
							view_intersection,
							rng,
							true,
						)
						.await;
					}
				}
			},
		}
	}
838
	/// Handles a batch of newly imported relay-chain blocks.
	///
	/// For each block not already tracked this creates a fresh `BlockEntry`,
	/// takes a reference on its session topology and indexes the hash by block
	/// number. It then sends each connected peer whatever subset of the new
	/// blocks intersects that peer's view, imports any assignments/approvals
	/// that were parked in `pending_known` waiting for these blocks, and
	/// finally re-evaluates aggression (with resending enabled).
	async fn handle_new_blocks<
		N: overseer::SubsystemSender<NetworkBridgeTxMessage>,
		A: overseer::SubsystemSender<ApprovalVotingMessage>,
		RA: overseer::SubsystemSender<RuntimeApiMessage>,
	>(
		&mut self,
		approval_voting_sender: &mut A,
		network_sender: &mut N,
		runtime_api_sender: &mut RA,
		metrics: &Metrics,
		metas: Vec<BlockApprovalMeta>,
		rng: &mut (impl CryptoRng + Rng),
		assignment_criteria: &(impl AssignmentCriteria + ?Sized),
		clock: &(impl Clock + ?Sized),
		session_info_provider: &mut RuntimeInfo,
	) {
		// Hashes of the blocks that were actually new to us.
		let mut new_hashes = HashSet::new();

		gum::debug!(
			target: LOG_TARGET,
			"Got new blocks {:?}",
			metas.iter().map(|m| (m.hash, m.number)).collect::<Vec<_>>(),
		);

		for meta in metas {
			match self.blocks.entry(meta.hash) {
				hash_map::Entry::Vacant(entry) => {
					// One default-initialized slot per candidate included in the block.
					let candidates_count = meta.candidates.len();
					let mut candidates = Vec::with_capacity(candidates_count);
					candidates.resize_with(candidates_count, Default::default);

					entry.insert(BlockEntry {
						known_by: HashMap::new(),
						number: meta.number,
						parent_hash: meta.parent_hash,
						knowledge: Knowledge::default(),
						candidates,
						session: meta.session,
						approval_entries: HashMap::new(),
						candidates_metadata: meta.candidates,
						vrf_story: meta.vrf_story,
						slot: meta.slot,
						last_resent_at_block_number: None,
					});

					// Keep the session topology alive while a block references it
					// (released again in `handle_block_finalized`).
					self.topologies.inc_session_refs(meta.session);

					new_hashes.insert(meta.hash);

					// In case there are duplicates, we should only set this if the entry
					// was vacant.
					self.blocks_by_number.entry(meta.number).or_default().push(meta.hash);
				},
				_ => continue,
			}
		}

		{
			// Bring each peer up to date on the subset of new blocks inside its view.
			for (peer_id, PeerEntry { view, version }) in self.peer_views.iter() {
				let intersection = view.iter().filter(|h| new_hashes.contains(h));
				let view_intersection = View::new(intersection.cloned(), view.finalized_number);
				Self::unify_with_peer(
					network_sender,
					metrics,
					&mut self.blocks,
					&self.topologies,
					self.peer_views.len(),
					*peer_id,
					*version,
					view_intersection,
					rng,
					false,
				)
				.await;
			}

			// Messages parked for blocks that just became known can now be imported.
			let pending_now_known = self
				.pending_known
				.keys()
				.filter(|k| self.blocks.contains_key(k))
				.copied()
				.collect::<Vec<_>>();

			let to_import = pending_now_known
				.into_iter()
				.inspect(|h| {
					gum::trace!(
						target: LOG_TARGET,
						block_hash = ?h,
						"Extracting pending messages for new block"
					)
				})
				.filter_map(|k| self.pending_known.remove(&k))
				.flatten()
				.collect::<Vec<_>>();

			if !to_import.is_empty() {
				gum::debug!(
					target: LOG_TARGET,
					num = to_import.len(),
					"Processing pending assignment/approvals",
				);

				let _timer = metrics.time_import_pending_now_known();

				// Replay the parked messages as if they had just arrived from
				// their original peers.
				for (peer_id, message) in to_import {
					match message {
						PendingMessage::Assignment(assignment, claimed_indices) => {
							self.import_and_circulate_assignment(
								approval_voting_sender,
								network_sender,
								runtime_api_sender,
								metrics,
								MessageSource::Peer(peer_id),
								assignment,
								claimed_indices,
								rng,
								assignment_criteria,
								clock,
								session_info_provider,
							)
							.await;
						},
						PendingMessage::Approval(approval_vote) => {
							self.import_and_circulate_approval(
								approval_voting_sender,
								network_sender,
								runtime_api_sender,
								metrics,
								MessageSource::Peer(peer_id),
								approval_vote,
								session_info_provider,
							)
							.await;
						},
					}
				}
			}
		}

		self.enable_aggression(network_sender, Resend::Yes, metrics).await;
	}
981
982	async fn handle_new_session_topology<N: overseer::SubsystemSender<NetworkBridgeTxMessage>>(
983		&mut self,
984		network_sender: &mut N,
985		session: SessionIndex,
986		topology: SessionGridTopology,
987		local_index: Option<ValidatorIndex>,
988	) {
989		if local_index.is_none() {
990			// this subsystem only matters to validators.
991			return;
992		}
993
994		self.topologies.insert_topology(session, topology, local_index);
995		let topology = self.topologies.get_topology(session).expect("just inserted above; qed");
996
997		adjust_required_routing_and_propagate(
998			network_sender,
999			&mut self.blocks,
1000			&self.topologies,
1001			|block_entry| block_entry.session == session,
1002			|required_routing, local, validator_index| {
1003				if required_routing == &RequiredRouting::PendingTopology {
1004					topology
1005						.local_grid_neighbors()
1006						.required_routing_by_index(*validator_index, local)
1007				} else {
1008					*required_routing
1009				}
1010			},
1011			&self.peer_views,
1012		)
1013		.await;
1014	}
1015
	/// Entry point for processing a batch of assignments coming from a peer.
	///
	/// Assignments whose block is not yet known to us but is expected (i.e.
	/// has an entry in `pending_known`) are parked until the block arrives;
	/// everything else is imported and circulated immediately.
	async fn process_incoming_assignments<A, N, R, RA>(
		&mut self,
		approval_voting_sender: &mut A,
		network_sender: &mut N,
		runtime_api_sender: &mut RA,
		metrics: &Metrics,
		peer_id: PeerId,
		assignments: Vec<(IndirectAssignmentCertV2, CandidateBitfield)>,
		rng: &mut R,
		assignment_criteria: &(impl AssignmentCriteria + ?Sized),
		clock: &(impl Clock + ?Sized),
		session_info_provider: &mut RuntimeInfo,
	) where
		A: overseer::SubsystemSender<ApprovalVotingMessage>,
		N: overseer::SubsystemSender<NetworkBridgeTxMessage>,
		RA: overseer::SubsystemSender<RuntimeApiMessage>,
		R: CryptoRng + Rng,
	{
		for (assignment, claimed_indices) in assignments {
			// Park the assignment if its block is expected but not yet imported;
			// it will be replayed by `handle_new_blocks`.
			if let Some(pending) = self.pending_known.get_mut(&assignment.block_hash) {
				let block_hash = &assignment.block_hash;
				let validator_index = assignment.validator;

				gum::trace!(
					target: LOG_TARGET,
					%peer_id,
					?block_hash,
					?claimed_indices,
					?validator_index,
					"Pending assignment",
				);

				pending.push((peer_id, PendingMessage::Assignment(assignment, claimed_indices)));

				continue;
			}

			self.import_and_circulate_assignment(
				approval_voting_sender,
				network_sender,
				runtime_api_sender,
				metrics,
				MessageSource::Peer(peer_id),
				assignment,
				claimed_indices,
				rng,
				assignment_criteria,
				clock,
				session_info_provider,
			)
			.await;
		}
	}
1069
1070	// Entry point for processing an approval coming from a peer.
1071	async fn process_incoming_approvals<
1072		N: overseer::SubsystemSender<NetworkBridgeTxMessage>,
1073		A: overseer::SubsystemSender<ApprovalVotingMessage>,
1074		RA: overseer::SubsystemSender<RuntimeApiMessage>,
1075	>(
1076		&mut self,
1077		approval_voting_sender: &mut A,
1078		network_sender: &mut N,
1079		runtime_api_sender: &mut RA,
1080		metrics: &Metrics,
1081		peer_id: PeerId,
1082		approvals: Vec<IndirectSignedApprovalVoteV2>,
1083		session_info_provider: &mut RuntimeInfo,
1084	) {
1085		gum::trace!(
1086			target: LOG_TARGET,
1087			peer_id = %peer_id,
1088			num = approvals.len(),
1089			"Processing approvals from a peer",
1090		);
1091		for approval_vote in approvals.into_iter() {
1092			if let Some(pending) = self.pending_known.get_mut(&approval_vote.block_hash) {
1093				let block_hash = approval_vote.block_hash;
1094				let validator_index = approval_vote.validator;
1095
1096				gum::trace!(
1097					target: LOG_TARGET,
1098					%peer_id,
1099					?block_hash,
1100					?validator_index,
1101					"Pending assignment candidates {:?}",
1102					approval_vote.candidate_indices,
1103				);
1104
1105				pending.push((peer_id, PendingMessage::Approval(approval_vote)));
1106
1107				continue;
1108			}
1109
1110			self.import_and_circulate_approval(
1111				approval_voting_sender,
1112				network_sender,
1113				runtime_api_sender,
1114				metrics,
1115				MessageSource::Peer(peer_id),
1116				approval_vote,
1117				session_info_provider,
1118			)
1119			.await;
1120		}
1121	}
1122
	/// Dispatches a V3 approval-distribution network message from a peer.
	///
	/// Incoming assignments and approvals are first sanitized (the sanitize
	/// helpers handle malformed content and its reputation consequences) and
	/// then handed to the corresponding processing entry point.
	async fn process_incoming_peer_message<A, N, RA, R>(
		&mut self,
		approval_voting_sender: &mut A,
		network_sender: &mut N,
		runtime_api_sender: &mut RA,
		metrics: &Metrics,
		peer_id: PeerId,
		msg: ValidationProtocols<protocol_v3::ApprovalDistributionMessage>,
		rng: &mut R,
		assignment_criteria: &(impl AssignmentCriteria + ?Sized),
		clock: &(impl Clock + ?Sized),
		session_info_provider: &mut RuntimeInfo,
	) where
		A: overseer::SubsystemSender<ApprovalVotingMessage>,
		N: overseer::SubsystemSender<NetworkBridgeTxMessage>,
		RA: overseer::SubsystemSender<RuntimeApiMessage>,
		R: CryptoRng + Rng,
	{
		match msg {
			ValidationProtocols::V3(protocol_v3::ApprovalDistributionMessage::Assignments(
				assignments,
			)) => {
				gum::trace!(
					target: LOG_TARGET,
					peer_id = %peer_id,
					num = assignments.len(),
					"Processing assignments from a peer",
				);
				let sanitized_assignments =
					self.sanitize_v2_assignments(peer_id, network_sender, assignments).await;

				self.process_incoming_assignments(
					approval_voting_sender,
					network_sender,
					runtime_api_sender,
					metrics,
					peer_id,
					sanitized_assignments,
					rng,
					assignment_criteria,
					clock,
					session_info_provider,
				)
				.await;
			},
			ValidationProtocols::V3(protocol_v3::ApprovalDistributionMessage::Approvals(
				approvals,
			)) => {
				let sanitized_approvals =
					self.sanitize_v2_approvals(peer_id, network_sender, approvals).await;
				self.process_incoming_approvals(
					approval_voting_sender,
					network_sender,
					runtime_api_sender,
					metrics,
					peer_id,
					sanitized_approvals,
					session_info_provider,
				)
				.await;
			},
		}
	}
1186
	/// Handles a peer view change: requires that the peer is already connected
	/// and has an entry in `peer_views`.
	///
	/// Prunes our record of what the peer knows for every block at or below
	/// the peer's new finalized number, then sends the peer anything it is
	/// missing for the blocks of its new view.
	async fn handle_peer_view_change<N: overseer::SubsystemSender<NetworkBridgeTxMessage>, R>(
		&mut self,
		network_sender: &mut N,
		metrics: &Metrics,
		peer_id: PeerId,
		view: View,
		rng: &mut R,
	) where
		R: CryptoRng + Rng,
	{
		gum::trace!(target: LOG_TARGET, ?view, "Peer view change");
		let finalized_number = view.finalized_number;

		let (old_view, protocol_version) =
			if let Some(peer_entry) = self.peer_views.get_mut(&peer_id) {
				(Some(std::mem::replace(&mut peer_entry.view, view.clone())), peer_entry.version)
			} else {
				// This shouldn't happen, but if it does we assume protocol version 3.
				gum::warn!(
					target: LOG_TARGET,
					?peer_id,
					?view,
					"Peer view change for missing `peer_entry`"
				);

				(None, ValidationVersion::V3.into())
			};

		// With no previous view, prune from block number 0 onward.
		let old_finalized_number = old_view.map(|v| v.finalized_number).unwrap_or(0);

		// we want to prune every block known_by peer up to (including) view.finalized_number
		let blocks = &mut self.blocks;
		// the `BTreeMap::range` is constrained by stored keys
		// so the loop won't take ages if the new finalized_number skyrockets
		// but we need to make sure the range is not empty, otherwise it will panic
		// it shouldn't be, we make sure of this in the network bridge
		let range = old_finalized_number..=finalized_number;
		if !range.is_empty() && !blocks.is_empty() {
			self.blocks_by_number
				.range(range)
				.flat_map(|(_number, hashes)| hashes)
				.for_each(|hash| {
					if let Some(entry) = blocks.get_mut(hash) {
						entry.known_by.remove(&peer_id);
					}
				});
		}

		// Send the peer whatever it is missing for the blocks in its new view.
		Self::unify_with_peer(
			network_sender,
			metrics,
			&mut self.blocks,
			&self.topologies,
			self.peer_views.len(),
			peer_id,
			protocol_version,
			view,
			rng,
			false,
		)
		.await;
	}
1251
1252	async fn handle_block_finalized<N: overseer::SubsystemSender<NetworkBridgeTxMessage>>(
1253		&mut self,
1254		network_sender: &mut N,
1255		metrics: &Metrics,
1256		finalized_number: BlockNumber,
1257	) {
1258		// we want to prune every block up to (including) finalized_number
1259		// why +1 here?
1260		// split_off returns everything after the given key, including the key
1261		let split_point = finalized_number.saturating_add(1);
1262		let mut old_blocks = self.blocks_by_number.split_off(&split_point);
1263
1264		// after split_off old_blocks actually contains new blocks, we need to swap
1265		std::mem::swap(&mut self.blocks_by_number, &mut old_blocks);
1266
1267		// now that we pruned `self.blocks_by_number`, let's clean up `self.blocks` too
1268		old_blocks.values().flatten().for_each(|relay_block| {
1269			self.recent_outdated_blocks.note_outdated(*relay_block);
1270			if let Some(block_entry) = self.blocks.remove(relay_block) {
1271				self.topologies.dec_session_refs(block_entry.session);
1272			}
1273		});
1274
1275		// If a block was finalized, this means we may need to move our aggression
1276		// forward to the now oldest block(s).
1277		self.enable_aggression(network_sender, Resend::No, metrics).await;
1278	}
1279
1280	// When finality is lagging as a last resort nodes start sending the messages they have
1281	// multiples times. This means it is safe to accept duplicate messages without punishing the
1282	// peer and reduce the reputation and can end up banning the Peer, which in turn will create
1283	// more no-shows.
1284	fn accept_duplicates_from_validators(
1285		blocks_by_number: &BTreeMap<BlockNumber, Vec<Hash>>,
1286		topologies: &SessionGridTopologies,
1287		aggression_config: &AggressionConfig,
1288		entry: &BlockEntry,
1289		peer: PeerId,
1290	) -> bool {
1291		let topology = topologies.get_topology(entry.session);
1292		let min_age = blocks_by_number.iter().next().map(|(num, _)| num);
1293		let max_age = blocks_by_number.iter().rev().next().map(|(num, _)| num);
1294
1295		// Return if we don't have at least 1 block.
1296		let (min_age, max_age) = match (min_age, max_age) {
1297			(Some(min), Some(max)) => (*min, *max),
1298			_ => return false,
1299		};
1300
1301		let age = max_age.saturating_sub(min_age);
1302
1303		aggression_config.should_trigger_aggression(age) &&
1304			topology.map(|topology| topology.is_validator(&peer)).unwrap_or(false)
1305	}
1306
	/// Imports an assignment — from a peer or from the local node — and
	/// circulates it onward.
	///
	/// Peer-sourced assignments go through the full reputation pipeline:
	/// unexpected, out-of-view and duplicate messages are punished (duplicates
	/// may be tolerated under aggression, see
	/// `accept_duplicates_from_validators`), already known-valid ones are
	/// rewarded, and new ones are checked against the assignment criteria and
	/// the tranche window before being forwarded to approval voting.
	/// Circulation targets the peers required by the grid topology plus a
	/// random sample of further validator peers that know the block.
	async fn import_and_circulate_assignment<A, N, RA, R>(
		&mut self,
		approval_voting_sender: &mut A,
		network_sender: &mut N,
		runtime_api_sender: &mut RA,
		metrics: &Metrics,
		source: MessageSource,
		assignment: IndirectAssignmentCertV2,
		claimed_candidate_indices: CandidateBitfield,
		rng: &mut R,
		assignment_criteria: &(impl AssignmentCriteria + ?Sized),
		clock: &(impl Clock + ?Sized),
		session_info_provider: &mut RuntimeInfo,
	) where
		A: overseer::SubsystemSender<ApprovalVotingMessage>,
		N: overseer::SubsystemSender<NetworkBridgeTxMessage>,
		RA: overseer::SubsystemSender<RuntimeApiMessage>,
		R: CryptoRng + Rng,
	{
		let block_hash = assignment.block_hash;
		let validator_index = assignment.validator;

		// An assignment for an untracked block: punish the sender unless the
		// block was only recently pruned by finality.
		let entry = match self.blocks.get_mut(&block_hash) {
			Some(entry) => entry,
			None => {
				if let Some(peer_id) = source.peer_id() {
					gum::trace!(
						target: LOG_TARGET,
						?peer_id,
						hash = ?block_hash,
						?validator_index,
						"Unexpected assignment",
					);
					if !self.recent_outdated_blocks.is_recent_outdated(&block_hash) {
						modify_reputation(
							&mut self.reputation,
							network_sender,
							peer_id,
							COST_UNEXPECTED_MESSAGE,
						)
						.await;
						gum::debug!(target: LOG_TARGET, "Received assignment for invalid block");
						// NOTE(review): this metric fires on the *not* recently-outdated
						// branch, while the approval path bumps its recent-outdated
						// metric on the opposite branch — confirm the intended placement.
						metrics.on_assignment_recent_outdated();
					}
				}
				metrics.on_assignment_invalid_block();
				return;
			},
		};

		// Compute metadata on the assignment.
		let (message_subject, message_kind) = (
			MessageSubject(block_hash, claimed_candidate_indices.clone(), validator_index),
			MessageKind::Assignment,
		);

		if let Some(peer_id) = source.peer_id() {
			// check if our knowledge of the peer already contains this assignment
			match entry.known_by.entry(peer_id) {
				hash_map::Entry::Occupied(mut peer_knowledge) => {
					let peer_knowledge = peer_knowledge.get_mut();
					if peer_knowledge.contains(&message_subject, message_kind) {
						// wasn't included before
						if !peer_knowledge.received.insert(message_subject.clone(), message_kind) {
							// A true duplicate: punish unless tolerated under aggression.
							if !Self::accept_duplicates_from_validators(
								&self.blocks_by_number,
								&self.topologies,
								&self.aggression_config,
								entry,
								peer_id,
							) {
								gum::debug!(
									target: LOG_TARGET,
									?peer_id,
									?message_subject,
									"Duplicate assignment",
								);

								modify_reputation(
									&mut self.reputation,
									network_sender,
									peer_id,
									COST_DUPLICATE_MESSAGE,
								)
								.await;
							}

							metrics.on_assignment_duplicate();
						} else {
							gum::trace!(
								target: LOG_TARGET,
								?peer_id,
								hash = ?block_hash,
								?validator_index,
								?message_subject,
								"We sent the message to the peer while peer was sending it to us. Known race condition.",
							);
						}
						return;
					}
				},
				hash_map::Entry::Vacant(_) => {
					// The peer sent us an assignment for a block it never told us
					// it has in its view.
					gum::debug!(
						target: LOG_TARGET,
						?peer_id,
						?message_subject,
						"Assignment from a peer is out of view",
					);
					modify_reputation(
						&mut self.reputation,
						network_sender,
						peer_id,
						COST_UNEXPECTED_MESSAGE,
					)
					.await;
					metrics.on_assignment_out_of_view();
				},
			}

			// if the assignment is known to be valid, reward the peer
			if entry.knowledge.contains(&message_subject, message_kind) {
				modify_reputation(
					&mut self.reputation,
					network_sender,
					peer_id,
					BENEFIT_VALID_MESSAGE,
				)
				.await;
				if let Some(peer_knowledge) = entry.known_by.get_mut(&peer_id) {
					gum::trace!(target: LOG_TARGET, ?peer_id, ?message_subject, "Known assignment");
					peer_knowledge.received.insert(message_subject, message_kind);
				}
				metrics.on_assignment_good_known();
				return;
			}

			// First time we see this assignment: run the full criteria check.
			let result = Self::check_assignment_valid(
				assignment_criteria,
				&entry,
				&assignment,
				&claimed_candidate_indices,
				session_info_provider,
				runtime_api_sender,
			)
			.await;

			match result {
				Ok(checked_assignment) => {
					// Reject assignments whose tranche lies beyond
					// `current_tranche + TICK_TOO_FAR_IN_FUTURE`.
					let current_tranche = clock.tranche_now(self.slot_duration_millis, entry.slot);
					let too_far_in_future =
						current_tranche + TICK_TOO_FAR_IN_FUTURE as DelayTranche;

					if checked_assignment.tranche() >= too_far_in_future {
						gum::debug!(
							target: LOG_TARGET,
							hash = ?block_hash,
							?peer_id,
							"Got an assignment too far in the future",
						);
						modify_reputation(
							&mut self.reputation,
							network_sender,
							peer_id,
							COST_ASSIGNMENT_TOO_FAR_IN_THE_FUTURE,
						)
						.await;
						metrics.on_assignment_far();

						return;
					}

					// Hand the validated assignment to approval voting, then record
					// it in our own and the sending peer's knowledge.
					approval_voting_sender
						.send_message(ApprovalVotingMessage::ImportAssignment(
							checked_assignment,
							None,
						))
						.await;
					modify_reputation(
						&mut self.reputation,
						network_sender,
						peer_id,
						BENEFIT_VALID_MESSAGE_FIRST,
					)
					.await;
					entry.knowledge.insert(message_subject.clone(), message_kind);
					if let Some(peer_knowledge) = entry.known_by.get_mut(&peer_id) {
						peer_knowledge.received.insert(message_subject.clone(), message_kind);
					}
				},
				Err(error) => {
					gum::info!(
						target: LOG_TARGET,
						hash = ?block_hash,
						?peer_id,
						?error,
						"Got a bad assignment from peer",
					);
					modify_reputation(
						&mut self.reputation,
						network_sender,
						peer_id,
						COST_INVALID_MESSAGE,
					)
					.await;
					metrics.on_assignment_bad();
					return;
				},
			}
		} else {
			if !entry.knowledge.insert(message_subject.clone(), message_kind) {
				// if we already imported an assignment, there is no need to distribute it again
				gum::warn!(
					target: LOG_TARGET,
					?message_subject,
					"Importing locally an already known assignment",
				);
				return;
			} else {
				gum::debug!(
					target: LOG_TARGET,
					?message_subject,
					"Importing locally a new assignment",
				);
			}
		}

		// Invariant: to our knowledge, none of the peers except for the `source` know about the
		// assignment.
		metrics.on_assignment_imported(&assignment.cert.kind);

		let topology = self.topologies.get_topology(entry.session);
		let local = source == MessageSource::Local;

		// Without a topology the routing stays `PendingTopology`; it is fixed up
		// later in `handle_new_session_topology`.
		let required_routing = topology.map_or(RequiredRouting::PendingTopology, |t| {
			t.local_grid_neighbors().required_routing_by_index(validator_index, local)
		});
		// Peers that we will send the assignment to.
		let mut peers = HashSet::new();

		let peers_to_route_to = topology
			.as_ref()
			.map(|t| t.peers_to_route(required_routing))
			.unwrap_or_default();

		for peer in peers_to_route_to {
			// Only send to grid peers that already know the relay block.
			if !entry.known_by.contains_key(&peer) {
				continue;
			}

			peers.insert(peer);
		}

		// All the peers that know the relay chain block.
		let peers_to_filter = entry.known_by();

		let approval_entry = entry.insert_approval_entry(ApprovalEntry::new(
			assignment.clone(),
			claimed_candidate_indices.clone(),
			ApprovalRouting {
				required_routing,
				local,
				random_routing: Default::default(),
				peers_randomly_routed: Default::default(),
			},
		));

		// Dispatch the message to all peers in the routing set which
		// know the block.
		//
		// If the topology isn't known yet (race with networking subsystems)
		// then messages will be sent when we get it.

		let assignments = vec![(assignment, claimed_candidate_indices.clone())];
		let n_peers_total = self.peer_views.len();
		let source_peer = source.peer_id();

		// Filter destination peers
		for peer in peers_to_filter.into_iter() {
			// Never send the message back to its source.
			if Some(peer) == source_peer {
				continue;
			}

			// Already selected by the grid routing above.
			if peers.contains(&peer) {
				continue;
			}

			// Random routing only considers validator peers.
			if !topology.map(|topology| topology.is_validator(&peer)).unwrap_or(false) {
				continue;
			}

			// Note: at this point, we haven't received the message from any peers
			// other than the source peer, and we just got it, so we haven't sent it
			// to any peers either.
			let route_random =
				approval_entry.routing_info().random_routing.sample(n_peers_total, rng);

			if route_random {
				approval_entry.routing_info_mut().mark_randomly_sent(peer);
				peers.insert(peer);
			}

			if approval_entry.routing_info().random_routing.is_complete() {
				break;
			}
		}

		// Add the metadata of the assignment to the knowledge of each peer.
		for peer in peers.iter() {
			// we already filtered peers above, so this should always be Some
			if let Some(peer_knowledge) = entry.known_by.get_mut(peer) {
				peer_knowledge.sent.insert(message_subject.clone(), message_kind);
			}
		}

		if !peers.is_empty() {
			gum::trace!(
				target: LOG_TARGET,
				?block_hash,
				?claimed_candidate_indices,
				local = source.peer_id().is_none(),
				num_peers = peers.len(),
				"Sending an assignment to peers",
			);

			let peers = peers
				.iter()
				.filter_map(|peer_id| {
					self.peer_views.get(peer_id).map(|peer_entry| (*peer_id, peer_entry.version))
				})
				.collect::<Vec<_>>();

			send_assignments_batched(network_sender, assignments, &peers).await;
		}
	}
1641
1642	async fn check_assignment_valid<RA: overseer::SubsystemSender<RuntimeApiMessage>>(
1643		assignment_criteria: &(impl AssignmentCriteria + ?Sized),
1644		entry: &BlockEntry,
1645		assignment: &IndirectAssignmentCertV2,
1646		claimed_candidate_indices: &CandidateBitfield,
1647		runtime_info: &mut RuntimeInfo,
1648		runtime_api_sender: &mut RA,
1649	) -> Result<CheckedIndirectAssignment, InvalidAssignmentError> {
1650		let ExtendedSessionInfo { ref session_info, .. } = runtime_info
1651			.get_session_info_by_index(runtime_api_sender, assignment.block_hash, entry.session)
1652			.await
1653			.map_err(|err| InvalidAssignmentError::SessionInfoNotFound(err))?;
1654
1655		if claimed_candidate_indices.len() > session_info.n_cores as usize {
1656			return Err(InvalidAssignmentError::OversizedClaimedBitfield);
1657		}
1658
1659		let claimed_cores: Vec<CoreIndex> = claimed_candidate_indices
1660			.iter_ones()
1661			.map(|candidate_index| {
1662				entry.candidates_metadata.get(candidate_index).map(|(_, core, _)| *core).ok_or(
1663					InvalidAssignmentError::ClaimedInvalidCandidateIndex {
1664						claimed_index: candidate_index,
1665						max_index: entry.candidates_metadata.len(),
1666					},
1667				)
1668			})
1669			.collect::<Result<Vec<_>, InvalidAssignmentError>>()?;
1670
1671		let Ok(claimed_cores) = claimed_cores.try_into() else {
1672			return Err(InvalidAssignmentError::NoClaimedCandidates);
1673		};
1674
1675		let backing_groups = claimed_candidate_indices
1676			.iter_ones()
1677			.flat_map(|candidate_index| {
1678				entry.candidates_metadata.get(candidate_index).map(|(_, _, group)| *group)
1679			})
1680			.collect::<Vec<_>>();
1681
1682		assignment_criteria
1683			.check_assignment_cert(
1684				claimed_cores,
1685				assignment.validator,
1686				&polkadot_node_primitives::approval::criteria::Config::from(session_info),
1687				entry.vrf_story.clone(),
1688				&assignment.cert,
1689				backing_groups,
1690			)
1691			.map_err(|err| InvalidAssignmentError::CryptoCheckFailed(err))
1692			.map(|tranche| {
1693				CheckedIndirectAssignment::from_checked(
1694					assignment.clone(),
1695					claimed_candidate_indices.clone(),
1696					tranche,
1697				)
1698			})
1699	}
	/// Checks if an approval can be processed.
	/// Returns true if we can continue with processing the approval and false otherwise.
	///
	/// Handles reputation along the way: approvals covering unknown
	/// assignments or arriving out of the peer's view are punished, duplicates
	/// are punished unless tolerated under aggression (see
	/// `accept_duplicates_from_validators`), and approvals we have already
	/// processed are rewarded without being processed again.
	async fn check_approval_can_be_processed<
		N: overseer::SubsystemSender<NetworkBridgeTxMessage>,
	>(
		network_sender: &mut N,
		assignments_knowledge_key: &Vec<(MessageSubject, MessageKind)>,
		approval_knowledge_key: &(MessageSubject, MessageKind),
		entry: &mut BlockEntry,
		blocks_by_number: &BTreeMap<BlockNumber, Vec<Hash>>,
		topologies: &SessionGridTopologies,
		aggression_config: &AggressionConfig,
		reputation: &mut ReputationAggregator,
		peer_id: PeerId,
		metrics: &Metrics,
	) -> bool {
		// An approval is only acceptable if we know every assignment it covers.
		for message_subject in assignments_knowledge_key {
			if !entry.knowledge.contains(&message_subject.0, message_subject.1) {
				gum::trace!(
					target: LOG_TARGET,
					?peer_id,
					?message_subject,
					"Unknown approval assignment",
				);
				modify_reputation(reputation, network_sender, peer_id, COST_UNEXPECTED_MESSAGE)
					.await;
				metrics.on_approval_unknown_assignment();
				return false;
			}
		}

		// check if our knowledge of the peer already contains this approval
		match entry.known_by.entry(peer_id) {
			hash_map::Entry::Occupied(mut knowledge) => {
				let peer_knowledge = knowledge.get_mut();
				if peer_knowledge.contains(&approval_knowledge_key.0, approval_knowledge_key.1) {
					// `insert` returning false means the peer already sent us this
					// approval before: a true duplicate.
					if !peer_knowledge
						.received
						.insert(approval_knowledge_key.0.clone(), approval_knowledge_key.1)
					{
						if !Self::accept_duplicates_from_validators(
							blocks_by_number,
							topologies,
							aggression_config,
							entry,
							peer_id,
						) {
							gum::trace!(
								target: LOG_TARGET,
								?peer_id,
								?approval_knowledge_key,
								"Duplicate approval",
							);
							modify_reputation(
								reputation,
								network_sender,
								peer_id,
								COST_DUPLICATE_MESSAGE,
							)
							.await;
						}
						metrics.on_approval_duplicate();
					}
					return false;
				}
			},
			hash_map::Entry::Vacant(_) => {
				// The peer sent us an approval for a block it never told us it has
				// in its view.
				gum::debug!(
					target: LOG_TARGET,
					?peer_id,
					?approval_knowledge_key,
					"Approval from a peer is out of view",
				);
				modify_reputation(reputation, network_sender, peer_id, COST_UNEXPECTED_MESSAGE)
					.await;
				metrics.on_approval_out_of_view();
			},
		}

		if entry.knowledge.contains(&approval_knowledge_key.0, approval_knowledge_key.1) {
			// Record that this peer now knows the approval as well.
			if let Some(peer_knowledge) = entry.known_by.get_mut(&peer_id) {
				peer_knowledge
					.received
					.insert(approval_knowledge_key.0.clone(), approval_knowledge_key.1);
			}

			// We already processed this approval no need to continue.
			gum::trace!(target: LOG_TARGET, ?peer_id, ?approval_knowledge_key, "Known approval");
			metrics.on_approval_good_known();
			modify_reputation(reputation, network_sender, peer_id, BENEFIT_VALID_MESSAGE).await;
			false
		} else {
			true
		}
	}
1795
1796	async fn import_and_circulate_approval<
1797		N: overseer::SubsystemSender<NetworkBridgeTxMessage>,
1798		A: overseer::SubsystemSender<ApprovalVotingMessage>,
1799		RA: overseer::SubsystemSender<RuntimeApiMessage>,
1800	>(
1801		&mut self,
1802		approval_voting_sender: &mut A,
1803		network_sender: &mut N,
1804		runtime_api_sender: &mut RA,
1805		metrics: &Metrics,
1806		source: MessageSource,
1807		vote: IndirectSignedApprovalVoteV2,
1808		session_info_provider: &mut RuntimeInfo,
1809	) {
1810		let block_hash = vote.block_hash;
1811		let validator_index = vote.validator;
1812		let candidate_indices = &vote.candidate_indices;
1813		let entry = match self.blocks.get_mut(&block_hash) {
1814			Some(entry) if entry.contains_candidates(&vote.candidate_indices) => entry,
1815			_ => {
1816				if let Some(peer_id) = source.peer_id() {
1817					if !self.recent_outdated_blocks.is_recent_outdated(&block_hash) {
1818						gum::debug!(
1819							target: LOG_TARGET,
1820							?peer_id,
1821							?block_hash,
1822							?validator_index,
1823							?candidate_indices,
1824							"Approval from a peer is out of view",
1825						);
1826						modify_reputation(
1827							&mut self.reputation,
1828							network_sender,
1829							peer_id,
1830							COST_UNEXPECTED_MESSAGE,
1831						)
1832						.await;
1833						metrics.on_approval_invalid_block();
1834					} else {
1835						metrics.on_approval_recent_outdated();
1836					}
1837				}
1838				return;
1839			},
1840		};
1841
1842		// compute metadata on the assignment.
1843		let assignments_knowledge_keys = PeerKnowledge::generate_assignments_keys(&vote);
1844		let approval_knwowledge_key = PeerKnowledge::generate_approval_key(&vote);
1845
1846		if let Some(peer_id) = source.peer_id() {
1847			if !Self::check_approval_can_be_processed(
1848				network_sender,
1849				&assignments_knowledge_keys,
1850				&approval_knwowledge_key,
1851				entry,
1852				&self.blocks_by_number,
1853				&self.topologies,
1854				&self.aggression_config,
1855				&mut self.reputation,
1856				peer_id,
1857				metrics,
1858			)
1859			.await
1860			{
1861				return;
1862			}
1863
1864			let result =
1865				Self::check_vote_valid(&vote, &entry, session_info_provider, runtime_api_sender)
1866					.await;
1867
1868			match result {
1869				Ok(vote) => {
1870					approval_voting_sender
1871						.send_message(ApprovalVotingMessage::ImportApproval(vote, None))
1872						.await;
1873
1874					modify_reputation(
1875						&mut self.reputation,
1876						network_sender,
1877						peer_id,
1878						BENEFIT_VALID_MESSAGE_FIRST,
1879					)
1880					.await;
1881
1882					entry
1883						.knowledge
1884						.insert(approval_knwowledge_key.0.clone(), approval_knwowledge_key.1);
1885					if let Some(peer_knowledge) = entry.known_by.get_mut(&peer_id) {
1886						peer_knowledge
1887							.received
1888							.insert(approval_knwowledge_key.0.clone(), approval_knwowledge_key.1);
1889					}
1890				},
1891				Err(err) => {
1892					modify_reputation(
1893						&mut self.reputation,
1894						network_sender,
1895						peer_id,
1896						COST_INVALID_MESSAGE,
1897					)
1898					.await;
1899
1900					gum::info!(
1901						target: LOG_TARGET,
1902						?peer_id,
1903						?err,
1904						"Got a bad approval from peer",
1905					);
1906					metrics.on_approval_bad();
1907					return;
1908				},
1909			}
1910		} else {
1911			if !entry
1912				.knowledge
1913				.insert(approval_knwowledge_key.0.clone(), approval_knwowledge_key.1)
1914			{
1915				// if we already imported all approvals, there is no need to distribute it again
1916				gum::warn!(
1917					target: LOG_TARGET,
1918					"Importing locally an already known approval",
1919				);
1920				return;
1921			} else {
1922				gum::debug!(
1923					target: LOG_TARGET,
1924					"Importing locally a new approval",
1925				);
1926			}
1927		}
1928
1929		let (required_routing, peers_randomly_routed_to) = match entry.note_approval(vote.clone()) {
1930			Ok(required_routing) => required_routing,
1931			Err(err) => {
1932				gum::warn!(
1933					target: LOG_TARGET,
1934					hash = ?block_hash,
1935					validator_index = ?vote.validator,
1936					candidate_bitfield = ?vote.candidate_indices,
1937					?err,
1938					"Possible bug: Vote import failed",
1939				);
1940				metrics.on_approval_bug();
1941				return;
1942			},
1943		};
1944
1945		// Invariant: to our knowledge, none of the peers except for the `source` know about the
1946		// approval.
1947		metrics.on_approval_imported();
1948
1949		// Dispatch a ApprovalDistributionV3Message::Approval(vote)
1950		// to all peers required by the topology, with the exception of the source peer.
1951		let topology = self.topologies.get_topology(entry.session);
1952		let source_peer = source.peer_id();
1953
1954		let peer_filter = move |peer| {
1955			if Some(peer) == source_peer.as_ref() {
1956				return false;
1957			}
1958
1959			// Here we're leaning on a few behaviors of assignment propagation:
1960			//   1. At this point, the only peer we're aware of which has the approval message is
1961			//      the source peer.
1962			//   2. We have sent the assignment message to every peer in the required routing which
1963			//      is aware of this block _unless_ the peer we originally received the assignment
1964			//      from was part of the required routing. In that case, we've sent the assignment
1965			//      to all aware peers in the required routing _except_ the original source of the
1966			//      assignment. Hence the `in_topology_check`.
1967			//   3. Any randomly selected peers have been sent the assignment already.
1968			let in_topology = topology
1969				.map_or(false, |t| t.local_grid_neighbors().route_to_peer(required_routing, peer));
1970			in_topology || peers_randomly_routed_to.contains(peer)
1971		};
1972
1973		let peers = entry
1974			.known_by
1975			.iter()
1976			.filter(|(p, _)| peer_filter(p))
1977			.filter_map(|(p, _)| self.peer_views.get(p).map(|entry| (*p, entry.version)))
1978			.collect::<Vec<_>>();
1979
1980		// Add the metadata of the assignment to the knowledge of each peer.
1981		for peer in peers.iter() {
1982			// we already filtered peers above, so this should always be Some
1983			if let Some(entry) = entry.known_by.get_mut(&peer.0) {
1984				entry.sent.insert(approval_knwowledge_key.0.clone(), approval_knwowledge_key.1);
1985			}
1986		}
1987
1988		if !peers.is_empty() {
1989			let approvals = vec![vote];
1990			gum::trace!(
1991				target: LOG_TARGET,
1992				?block_hash,
1993				local = source.peer_id().is_none(),
1994				num_peers = peers.len(),
1995				"Sending an approval to peers",
1996			);
1997			send_approvals_batched(network_sender, approvals, &peers).await;
1998		}
1999	}
2000
2001	// Checks if the approval vote is valid.
2002	async fn check_vote_valid<RA: overseer::SubsystemSender<RuntimeApiMessage>>(
2003		vote: &IndirectSignedApprovalVoteV2,
2004		entry: &BlockEntry,
2005		runtime_info: &mut RuntimeInfo,
2006		runtime_api_sender: &mut RA,
2007	) -> Result<CheckedIndirectSignedApprovalVote, InvalidVoteError> {
2008		if vote.candidate_indices.len() > entry.candidates_metadata.len() {
2009			return Err(InvalidVoteError::CandidateIndexOutOfBounds);
2010		}
2011
2012		let candidate_hashes = vote
2013			.candidate_indices
2014			.iter_ones()
2015			.flat_map(|candidate_index| {
2016				entry
2017					.candidates_metadata
2018					.get(candidate_index)
2019					.map(|(candidate_hash, _, _)| *candidate_hash)
2020			})
2021			.collect::<Vec<_>>();
2022
2023		let ExtendedSessionInfo { ref session_info, .. } = runtime_info
2024			.get_session_info_by_index(runtime_api_sender, vote.block_hash, entry.session)
2025			.await
2026			.map_err(|err| InvalidVoteError::SessionInfoNotFound(err))?;
2027
2028		let pubkey = session_info
2029			.validators
2030			.get(vote.validator)
2031			.ok_or(InvalidVoteError::ValidatorIndexOutOfBounds)?;
2032		DisputeStatement::Valid(ValidDisputeStatementKind::ApprovalCheckingMultipleCandidates(
2033			candidate_hashes.clone(),
2034		))
2035		.check_signature(
2036			&pubkey,
2037			*candidate_hashes.first().ok_or(InvalidVoteError::CandidateHashNotFound)?,
2038			entry.session,
2039			&vote.signature,
2040		)
2041		.map_err(|_| InvalidVoteError::InvalidSignature)
2042		.map(|_| CheckedIndirectSignedApprovalVote::from_checked(vote.clone()))
2043	}
2044
2045	/// Retrieve approval signatures from state for the given relay block/indices:
2046	fn get_approval_signatures(
2047		&mut self,
2048		indices: HashSet<(Hash, CandidateIndex)>,
2049	) -> HashMap<ValidatorIndex, (Hash, Vec<CandidateIndex>, ValidatorSignature)> {
2050		let mut all_sigs = HashMap::new();
2051		for (hash, index) in indices {
2052			let block_entry = match self.blocks.get(&hash) {
2053				None => {
2054					gum::debug!(
2055						target: LOG_TARGET,
2056						?hash,
2057						"`get_approval_signatures`: could not find block entry for given hash!"
2058					);
2059					continue;
2060				},
2061				Some(e) => e,
2062			};
2063
2064			let sigs = block_entry.approval_votes(index).into_iter().map(|approval| {
2065				(
2066					approval.validator,
2067					(
2068						hash,
2069						approval
2070							.candidate_indices
2071							.iter_ones()
2072							.map(|val| val as CandidateIndex)
2073							.collect_vec(),
2074						approval.signature,
2075					),
2076				)
2077			});
2078			all_sigs.extend(sigs);
2079		}
2080		all_sigs
2081	}
2082
	// Bring `peer_id` up to date: for every chain head in the peer's `view`, walk the
	// chain back to the peer's finalized block and queue every assignment and approval
	// the peer is routed for (grid or random) but has not been sent yet, then send the
	// queued messages in batches.
	async fn unify_with_peer(
		sender: &mut impl overseer::SubsystemSender<NetworkBridgeTxMessage>,
		metrics: &Metrics,
		entries: &mut HashMap<Hash, BlockEntry>,
		topologies: &SessionGridTopologies,
		total_peers: usize,
		peer_id: PeerId,
		protocol_version: ProtocolVersion,
		view: View,
		rng: &mut (impl CryptoRng + Rng),
		retry_known_blocks: bool,
	) {
		metrics.on_unify_with_peer();
		let _timer = metrics.time_unify_with_peer();

		// Accumulated across all blocks so everything can be sent batched at the end.
		let mut assignments_to_send = Vec::new();
		let mut approvals_to_send = Vec::new();

		let view_finalized_number = view.finalized_number;
		for head in view.into_iter() {
			let mut block = head;

			// Walk the chain back to last finalized block of the peer view.
			loop {
				let entry = match entries.get_mut(&block) {
					Some(entry) if entry.number > view_finalized_number => entry,
					_ => break,
				};

				// Any peer which is in the `known_by` set and we know its peer_id authority id
				// mapping has already been sent all messages it's meant to get for that block and
				// all in-scope prior blocks. In case, we just learnt about its peer_id
				// authority-id mapping we have to retry sending the messages that should be sent
				// to it for all un-finalized blocks.
				if entry.known_by.contains_key(&peer_id) && !retry_known_blocks {
					break;
				}

				let peer_knowledge = entry.known_by.entry(peer_id).or_default();
				let topology = topologies.get_topology(entry.session);

				// We want to iterate the `approval_entries` of the block entry as these contain
				// all assignments that also link all approval votes.
				for approval_entry in entry.approval_entries.values_mut() {
					// Propagate the message to all peers in the required routing set OR
					// randomly sample peers.
					{
						let required_routing = approval_entry.routing_info().required_routing;
						let routing_info = &mut approval_entry.routing_info_mut();
						let rng = &mut *rng;
						let mut peer_filter = move |peer_id| {
							let in_topology = topology.as_ref().map_or(false, |t| {
								t.local_grid_neighbors().route_to_peer(required_routing, peer_id)
							});
							in_topology || {
								// Peers the topology doesn't know as validators are never
								// selected for random routing.
								if !topology
									.map(|topology| topology.is_validator(peer_id))
									.unwrap_or(false)
								{
									return false;
								}

								let route_random =
									routing_info.random_routing.sample(total_peers, rng);
								if route_random {
									routing_info.mark_randomly_sent(*peer_id);
								}

								route_random
							}
						};

						if !peer_filter(&peer_id) {
							continue;
						}
					}

					let assignment_message = approval_entry.assignment();
					let approval_messages = approval_entry.approvals();
					let (assignment_knowledge, message_kind) =
						approval_entry.create_assignment_knowledge(block);

					// Only send stuff a peer doesn't know in the context of a relay chain
					// block.
					if !peer_knowledge.contains(&assignment_knowledge, message_kind) {
						peer_knowledge.sent.insert(assignment_knowledge, message_kind);
						assignments_to_send.push(assignment_message);
					}

					// Filter approval votes.
					for approval_message in approval_messages {
						let approval_knowledge =
							PeerKnowledge::generate_approval_key(&approval_message);

						if !peer_knowledge.contains(&approval_knowledge.0, approval_knowledge.1) {
							approvals_to_send.push(approval_message);
							peer_knowledge.sent.insert(approval_knowledge.0, approval_knowledge.1);
						}
					}
				}

				// Continue the walk with this block's parent.
				block = entry.parent_hash;
			}
		}

		if !assignments_to_send.is_empty() {
			gum::trace!(
				target: LOG_TARGET,
				?peer_id,
				?protocol_version,
				num = assignments_to_send.len(),
				"Sending assignments to unified peer",
			);

			send_assignments_batched(
				sender,
				assignments_to_send,
				&vec![(peer_id, protocol_version)],
			)
			.await;
		}

		if !approvals_to_send.is_empty() {
			gum::trace!(
				target: LOG_TARGET,
				?peer_id,
				?protocol_version,
				num = approvals_to_send.len(),
				"Sending approvals to unified peer",
			);

			send_approvals_batched(sender, approvals_to_send, &vec![(peer_id, protocol_version)])
				.await;
		}
	}
2218
	// It is very important that aggression starts with oldest unfinalized block, rather than oldest
	// unapproved block. Using the gossip approach to distribute potentially
	// missing votes to validators requires that we always trigger on finality lag, even if
	// we have the approval lag value. The reason for this, is to avoid finality stall
	// when more than 1/3 nodes go offline for a period of time. When they come back
	// they wouldn't get any of the approvals since the on-line nodes would never trigger
	// aggression as they have approved all the candidates and don't detect any approval lag.
	//
	// In order to switch to using approval lag as a trigger we need a request/response protocol
	// to fetch votes from validators rather than use gossip.
	async fn enable_aggression<N: overseer::SubsystemSender<NetworkBridgeTxMessage>>(
		&mut self,
		network_sender: &mut N,
		resend: Resend,
		metrics: &Metrics,
	) {
		let config = self.aggression_config.clone();
		// `blocks_by_number` is an ordered map, so its first/last entries give the
		// oldest and newest unfinalized block numbers we track.
		let min_age = self.blocks_by_number.iter().next().map(|(num, _)| num);
		let max_age = self.blocks_by_number.iter().rev().next().map(|(num, _)| num);

		// Return if we don't have at least 1 block.
		let (min_age, max_age) = match (min_age, max_age) {
			(Some(min), Some(max)) => (*min, *max),
			_ => return, // empty.
		};

		// Finality lag: distance between the oldest and newest unfinalized blocks.
		let age = max_age.saturating_sub(min_age);

		// Trigger on approval checking lag.
		if !self.aggression_config.should_trigger_aggression(age) {
			gum::trace!(
				target: LOG_TARGET,
				approval_checking_lag = self.approval_checking_lag,
				age,
				"Aggression not enabled",
			);
			return;
		}
		gum::debug!(target: LOG_TARGET, min_age, max_age, "Aggression enabled",);

		// First pass: optionally resend everything for the oldest block by clearing
		// per-peer `sent` knowledge. Routing itself is left unchanged here (the
		// modifier below returns the routing as-is).
		adjust_required_routing_and_propagate(
			network_sender,
			&mut self.blocks,
			&self.topologies,
			|block_entry| {
				let block_age = max_age - block_entry.number;
				// We want to resend only for blocks of min_age, there is no point in
				// resending for blocks newer than that, because we are just going to create load
				// and not gain anything.
				let diff_from_min_age = block_entry.number - min_age;

				// We want to back-off on resending for blocks that have been resent recently, to
				// give time for nodes to process all the extra messages, if we still have not
				// finalized we are going to resend again after unfinalized_period * 2 since the
				// last resend.
				let blocks_since_last_sent = block_entry
					.last_resent_at_block_number
					.map(|last_resent_at_block_number| max_age - last_resent_at_block_number);

				let can_resend_at_this_age = blocks_since_last_sent
					.zip(config.resend_unfinalized_period)
					.map(|(blocks_since_last_sent, unfinalized_period)| {
						blocks_since_last_sent >= unfinalized_period * 2
					})
					.unwrap_or(true);

				if resend == Resend::Yes &&
					config.resend_unfinalized_period.as_ref().map_or(false, |p| {
						block_age > 0 &&
							block_age % p == 0 && diff_from_min_age == 0 &&
							can_resend_at_this_age
					}) {
					// Retry sending to all peers.
					for (_, knowledge) in block_entry.known_by.iter_mut() {
						knowledge.sent = Knowledge::default();
					}
					block_entry.last_resent_at_block_number = Some(max_age);
					gum::debug!(
						target: LOG_TARGET,
						block_number = ?block_entry.number,
						?max_age,
						"Aggression enabled with resend for block",
					);
					true
				} else {
					false
				}
			},
			|required_routing, _, _| *required_routing,
			&self.peer_views,
		)
		.await;

		// Second pass: escalate required routing for the oldest unfinalized block
		// only — L1 widens the originator's routing, L2 widens everyone else's.
		adjust_required_routing_and_propagate(
			network_sender,
			&mut self.blocks,
			&self.topologies,
			|block_entry| {
				// Ramp up aggression only for the very oldest block(s).
				// Approval voting can get stuck on a single block preventing
				// its descendants from being finalized. Waste minimal bandwidth
				// this way. Also, disputes might prevent finality - again, nothing
				// to waste bandwidth on newer blocks for.
				block_entry.number == min_age
			},
			|required_routing, local, _| {
				// It's a bit surprising not to have a topology at this age.
				if *required_routing == RequiredRouting::PendingTopology {
					gum::debug!(
						target: LOG_TARGET,
						lag = ?self.approval_checking_lag,
						"Encountered old block pending gossip topology",
					);
					return *required_routing;
				}

				let mut new_required_routing = *required_routing;

				if config.l1_threshold.as_ref().map_or(false, |t| &age >= t) {
					// Message originator sends to everyone.
					if local && new_required_routing != RequiredRouting::All {
						metrics.on_aggression_l1();
						new_required_routing = RequiredRouting::All;
					}
				}

				if config.l2_threshold.as_ref().map_or(false, |t| &age >= t) {
					// Message originator sends to everyone. Everyone else sends to XY.
					if !local && new_required_routing != RequiredRouting::GridXY {
						metrics.on_aggression_l2();
						new_required_routing = RequiredRouting::GridXY;
					}
				}
				new_required_routing
			},
			&self.peer_views,
		)
		.await;
	}
2358
2359	// Filter out oversized candidate and certificate core bitfields.
2360	// For each invalid assignment we also punish the peer.
2361	async fn sanitize_v2_assignments(
2362		&mut self,
2363		peer_id: PeerId,
2364		sender: &mut impl overseer::SubsystemSender<NetworkBridgeTxMessage>,
2365		assignments: Vec<(IndirectAssignmentCertV2, CandidateBitfield)>,
2366	) -> Vec<(IndirectAssignmentCertV2, CandidateBitfield)> {
2367		let mut sanitized_assignments = Vec::new();
2368		for (cert, candidate_bitfield) in assignments.into_iter() {
2369			let cert_bitfield_bits = match &cert.cert.kind {
2370				AssignmentCertKindV2::RelayVRFDelay { core_index } => core_index.0 as usize + 1,
2371				// We don't want to run the VRF yet, but the output is always bounded by `n_cores`.
2372				// We assume `candidate_bitfield` length for the core bitfield and we just check
2373				// against `MAX_BITFIELD_SIZE` later.
2374				AssignmentCertKindV2::RelayVRFModulo { .. } => candidate_bitfield.len(),
2375				AssignmentCertKindV2::RelayVRFModuloCompact { core_bitfield } => {
2376					core_bitfield.len()
2377				},
2378			};
2379
2380			let candidate_bitfield_bits = candidate_bitfield.len();
2381
2382			// Our bitfield has `Lsb0`.
2383			let msb = candidate_bitfield_bits - 1;
2384
2385			// Ensure bitfields length under hard limit.
2386			if cert_bitfield_bits > MAX_BITFIELD_SIZE
2387				|| candidate_bitfield_bits > MAX_BITFIELD_SIZE
2388				// Ensure minimum bitfield size - MSB needs to be one.
2389				|| !candidate_bitfield.bit_at(msb.as_bit_index())
2390			{
2391				// Punish the peer for the invalid message.
2392				modify_reputation(&mut self.reputation, sender, peer_id, COST_OVERSIZED_BITFIELD)
2393					.await;
2394				for candidate_index in candidate_bitfield.iter_ones() {
2395					gum::debug!(target: LOG_TARGET, block_hash = ?cert.block_hash, ?candidate_index, validator_index = ?cert.validator, "Bad assignment v2, oversized bitfield");
2396				}
2397			} else {
2398				sanitized_assignments.push((cert, candidate_bitfield))
2399			}
2400		}
2401
2402		sanitized_assignments
2403	}
2404
2405	// Filter out obviously invalid candidate indices.
2406	async fn sanitize_v2_approvals(
2407		&mut self,
2408		peer_id: PeerId,
2409		sender: &mut impl overseer::SubsystemSender<NetworkBridgeTxMessage>,
2410		approval: Vec<IndirectSignedApprovalVoteV2>,
2411	) -> Vec<IndirectSignedApprovalVoteV2> {
2412		let mut sanitized_approvals = Vec::new();
2413		for approval in approval.into_iter() {
2414			let has_no_approved_candidates = approval.candidate_indices.first_one().is_none();
2415			if approval.candidate_indices.len() as usize > MAX_BITFIELD_SIZE ||
2416				has_no_approved_candidates
2417			{
2418				// Punish the peer for the invalid message.
2419				modify_reputation(
2420					&mut self.reputation,
2421					sender,
2422					peer_id,
2423					if has_no_approved_candidates {
2424						COST_INVALID_MESSAGE
2425					} else {
2426						COST_OVERSIZED_BITFIELD
2427					},
2428				)
2429				.await;
2430				gum::debug!(
2431					target: LOG_TARGET,
2432					block_hash = ?approval.block_hash,
2433					candidate_indices_len = ?approval.candidate_indices.len(),
2434					"Bad approval v2, invalid candidate indices size"
2435				);
2436			} else {
2437				sanitized_approvals.push(approval)
2438			}
2439		}
2440
2441		sanitized_approvals
2442	}
2443}
2444
// This adjusts the required routing of messages in blocks that pass the block filter
// according to the modifier function given.
//
// The modifier accepts as inputs the current required-routing state, whether
// the message is locally originating, and the validator index of the message issuer.
//
// Then, if the topology is known, this propagates messages to all peers in the required
// routing set which are aware of the block. Peers which are unaware of the block
// will have the message sent when it enters their view in `unify_with_peer`.
//
// Note that the required routing of a message can be modified even if the
// topology is unknown yet.
#[overseer::contextbounds(ApprovalDistribution, prefix = self::overseer)]
async fn adjust_required_routing_and_propagate<
	N: overseer::SubsystemSender<NetworkBridgeTxMessage>,
	BlockFilter,
	RoutingModifier,
>(
	network_sender: &mut N,
	blocks: &mut HashMap<Hash, BlockEntry>,
	topologies: &SessionGridTopologies,
	block_filter: BlockFilter,
	routing_modifier: RoutingModifier,
	peer_views: &HashMap<PeerId, PeerEntry>,
) where
	BlockFilter: Fn(&mut BlockEntry) -> bool,
	RoutingModifier: Fn(&RequiredRouting, bool, &ValidatorIndex) -> RequiredRouting,
{
	// Per-peer accumulators so each peer gets its messages batched below.
	let mut peer_assignments = HashMap::new();
	let mut peer_approvals = HashMap::new();

	// Iterate all blocks in the session, producing payloads
	// for each connected peer.
	for (block_hash, block_entry) in blocks {
		if !block_filter(block_entry) {
			continue;
		}

		// Without a topology for the block's session we cannot route anything; the
		// routing may still have been updated by `routing_modifier` on later passes.
		let topology = match topologies.get_topology(block_entry.session) {
			Some(t) => t,
			None => continue,
		};

		// We just need to iterate the `approval_entries` of the block entry as these contain all
		// assignments that also link all approval votes.
		for approval_entry in block_entry.approval_entries.values_mut() {
			let new_required_routing = routing_modifier(
				&approval_entry.routing_info().required_routing,
				approval_entry.routing_info().local,
				&approval_entry.validator_index(),
			);

			approval_entry.update_required_routing(new_required_routing);

			if approval_entry.routing_info().required_routing.is_empty() {
				continue;
			}

			let assignment_message = approval_entry.assignment();
			let approval_messages = approval_entry.approvals();
			let (assignment_knowledge, message_kind) =
				approval_entry.create_assignment_knowledge(*block_hash);

			for (peer, peer_knowledge) in &mut block_entry.known_by {
				// Only peers selected by the (possibly updated) required routing get
				// the messages here.
				if !topology
					.local_grid_neighbors()
					.route_to_peer(approval_entry.routing_info().required_routing, peer)
				{
					continue;
				}

				// Only send stuff a peer doesn't know in the context of a relay chain block.
				if !peer_knowledge.contains(&assignment_knowledge, message_kind) {
					peer_knowledge.sent.insert(assignment_knowledge.clone(), message_kind);
					peer_assignments
						.entry(*peer)
						.or_insert_with(Vec::new)
						.push(assignment_message.clone());
				}

				// Filter approval votes.
				for approval_message in &approval_messages {
					let approval_knowledge = PeerKnowledge::generate_approval_key(approval_message);

					if !peer_knowledge.contains(&approval_knowledge.0, approval_knowledge.1) {
						peer_knowledge.sent.insert(approval_knowledge.0, approval_knowledge.1);
						peer_approvals
							.entry(*peer)
							.or_insert_with(Vec::new)
							.push(approval_message.clone());
					}
				}
			}
		}
	}

	// Send messages in accumulated packets, assignments preceding approvals.
	for (peer, assignments_packet) in peer_assignments {
		if let Some(peer_view) = peer_views.get(&peer) {
			send_assignments_batched(
				network_sender,
				assignments_packet,
				&vec![(peer, peer_view.version)],
			)
			.await;
		} else {
			// This should never happen.
			gum::warn!(target: LOG_TARGET, ?peer, "Unknown protocol version for peer",);
		}
	}

	for (peer, approvals_packet) in peer_approvals {
		if let Some(peer_view) = peer_views.get(&peer) {
			send_approvals_batched(
				network_sender,
				approvals_packet,
				&vec![(peer, peer_view.version)],
			)
			.await;
		} else {
			// This should never happen.
			gum::warn!(target: LOG_TARGET, ?peer, "Unknown protocol version for peer",);
		}
	}
}
2570
/// Modify the reputation of a peer based on its behavior.
///
/// The change is handed to the `ReputationAggregator`, which may batch it or
/// forward it immediately via `sender` — accumulated changes are flushed
/// periodically by the subsystem main loop. NOTE(review): exact batching policy
/// lives in `ReputationAggregator::modify`; confirm there.
async fn modify_reputation(
	reputation: &mut ReputationAggregator,
	sender: &mut impl overseer::SubsystemSender<NetworkBridgeTxMessage>,
	peer_id: PeerId,
	rep: Rep,
) {
	// Trace every change to ease debugging of reputation-related disconnects.
	gum::trace!(
		target: LOG_TARGET,
		reputation = ?rep,
		?peer_id,
		"Reputation change for peer",
	);
	reputation.modify(sender, peer_id, rep).await;
}
2586
#[overseer::contextbounds(ApprovalDistribution, prefix = self::overseer)]
impl ApprovalDistribution {
	/// Create a new instance of the [`ApprovalDistribution`] subsystem.
	pub fn new(
		metrics: Metrics,
		slot_duration_millis: u64,
		assignment_criteria: Arc<dyn AssignmentCriteria + Send + Sync>,
	) -> Self {
		Self::new_with_clock(
			metrics,
			slot_duration_millis,
			Arc::new(SystemClock),
			assignment_criteria,
		)
	}

	/// Create a new instance of the [`ApprovalDistribution`] subsystem, with a custom clock.
	pub fn new_with_clock(
		metrics: Metrics,
		slot_duration_millis: u64,
		clock: Arc<dyn Clock + Send + Sync>,
		assignment_criteria: Arc<dyn AssignmentCriteria + Send + Sync>,
	) -> Self {
		Self { metrics, slot_duration_millis, clock, assignment_criteria }
	}

	// Subsystem entry point: set up default state, the RNG and the session-info
	// cache, then run the main loop until the overseer tells us to stop.
	async fn run<Context>(self, ctx: Context) {
		let mut state =
			State { slot_duration_millis: self.slot_duration_millis, ..Default::default() };
		// According to the docs of `rand`, this is a ChaCha12 RNG in practice
		// and will always be chosen for strong performance and security properties.
		let mut rng = rand::rngs::StdRng::from_entropy();
		let mut session_info_provider = RuntimeInfo::new_with_config(RuntimeInfoConfig {
			keystore: None,
			session_cache_lru_size: DISPUTE_WINDOW.get(),
		});

		self.run_inner(
			ctx,
			&mut state,
			REPUTATION_CHANGE_INTERVAL,
			&mut rng,
			&mut session_info_provider,
		)
		.await
	}

	/// Main message-processing loop: handles overseer messages and periodically
	/// flushes accumulated reputation changes. Split out of `run` so tests can
	/// inject state, the reputation interval, RNG and session-info provider.
	async fn run_inner<Context>(
		self,
		mut ctx: Context,
		state: &mut State,
		reputation_interval: Duration,
		rng: &mut (impl CryptoRng + Rng),
		session_info_provider: &mut RuntimeInfo,
	) {
		let new_reputation_delay = || futures_timer::Delay::new(reputation_interval).fuse();
		let mut reputation_delay = new_reputation_delay();
		let mut approval_voting_sender = ctx.sender().clone();
		let mut network_sender = ctx.sender().clone();
		let mut runtime_api_sender = ctx.sender().clone();

		loop {
			select! {
				// Periodically flush the batched reputation changes.
				_ = reputation_delay => {
					state.reputation.send(ctx.sender()).await;
					reputation_delay = new_reputation_delay();
				},
				message = ctx.recv().fuse() => {
					let message = match message {
						Ok(message) => message,
						Err(e) => {
							gum::debug!(target: LOG_TARGET, err = ?e, "Failed to receive a message from Overseer, exiting");
							return
						},
					};

					if self.handle_from_orchestra(message, &mut approval_voting_sender, &mut network_sender, &mut runtime_api_sender, state, rng, session_info_provider).await {
						return;
					}

				},
			}
		}
	}

	/// Handles a from orchestra message received by approval distribution subsystem.
	///
	/// Returns `true` if the subsystem should be stopped.
	pub async fn handle_from_orchestra<
		N: overseer::SubsystemSender<NetworkBridgeTxMessage>,
		A: overseer::SubsystemSender<ApprovalVotingMessage>,
		RA: overseer::SubsystemSender<RuntimeApiMessage>,
	>(
		&self,
		message: FromOrchestra<ApprovalDistributionMessage>,
		approval_voting_sender: &mut A,
		network_sender: &mut N,
		runtime_api_sender: &mut RA,
		state: &mut State,
		rng: &mut (impl CryptoRng + Rng),
		session_info_provider: &mut RuntimeInfo,
	) -> bool {
		match message {
			FromOrchestra::Communication { msg } => {
				Self::handle_incoming(
					approval_voting_sender,
					network_sender,
					runtime_api_sender,
					state,
					msg,
					&self.metrics,
					rng,
					self.assignment_criteria.as_ref(),
					self.clock.as_ref(),
					session_info_provider,
				)
				.await
			},
			FromOrchestra::Signal(OverseerSignal::ActiveLeaves(_update)) => {
				gum::trace!(target: LOG_TARGET, "active leaves signal (ignored)");
				// the relay chain blocks relevant to the approval subsystems
				// are those that are available, but not finalized yet
				// activated and deactivated heads hence are irrelevant to this subsystem, other
				// than for tracing purposes.
			},
			FromOrchestra::Signal(OverseerSignal::BlockFinalized(_hash, number)) => {
				gum::trace!(target: LOG_TARGET, number = %number, "finalized signal");
				state.handle_block_finalized(network_sender, &self.metrics, number).await;
			},
			FromOrchestra::Signal(OverseerSignal::Conclude) => return true,
		}
		false
	}

	// Dispatch a single `ApprovalDistributionMessage` to the matching handler on
	// `state`.
	async fn handle_incoming<
		N: overseer::SubsystemSender<NetworkBridgeTxMessage>,
		A: overseer::SubsystemSender<ApprovalVotingMessage>,
		RA: overseer::SubsystemSender<RuntimeApiMessage>,
	>(
		approval_voting_sender: &mut A,
		network_sender: &mut N,
		runtime_api_sender: &mut RA,
		state: &mut State,
		msg: ApprovalDistributionMessage,
		metrics: &Metrics,
		rng: &mut (impl CryptoRng + Rng),
		assignment_criteria: &(impl AssignmentCriteria + ?Sized),
		clock: &(impl Clock + ?Sized),
		session_info_provider: &mut RuntimeInfo,
	) {
		match msg {
			ApprovalDistributionMessage::NetworkBridgeUpdate(event) => {
				state
					.handle_network_msg(
						approval_voting_sender,
						network_sender,
						runtime_api_sender,
						metrics,
						event,
						rng,
						assignment_criteria,
						clock,
						session_info_provider,
					)
					.await;
			},
			ApprovalDistributionMessage::NewBlocks(metas) => {
				state
					.handle_new_blocks(
						approval_voting_sender,
						network_sender,
						runtime_api_sender,
						metrics,
						metas,
						rng,
						assignment_criteria,
						clock,
						session_info_provider,
					)
					.await;
			},
			ApprovalDistributionMessage::DistributeAssignment(cert, candidate_indices) => {
				gum::debug!(
					target: LOG_TARGET,
					?candidate_indices,
					block_hash = ?cert.block_hash,
					assignment_kind = ?cert.cert.kind,
					"Distributing our assignment on candidates",
				);

				state
					.import_and_circulate_assignment(
						approval_voting_sender,
						network_sender,
						runtime_api_sender,
						&metrics,
						MessageSource::Local,
						cert,
						candidate_indices,
						rng,
						assignment_criteria,
						clock,
						session_info_provider,
					)
					.await;
			},
			ApprovalDistributionMessage::DistributeApproval(vote) => {
				gum::debug!(
					target: LOG_TARGET,
					"Distributing our approval vote on candidate (block={}, index={:?})",
					vote.block_hash,
					vote.candidate_indices,
				);

				state
					.import_and_circulate_approval(
						approval_voting_sender,
						network_sender,
						runtime_api_sender,
						metrics,
						MessageSource::Local,
						vote,
						session_info_provider,
					)
					.await;
			},
			ApprovalDistributionMessage::GetApprovalSignatures(indices, tx) => {
				let sigs = state.get_approval_signatures(indices);
				if let Err(_) = tx.send(sigs) {
					gum::debug!(
						target: LOG_TARGET,
						"Sending back approval signatures failed, oneshot got closed"
					);
				}
			},
			ApprovalDistributionMessage::ApprovalCheckingLagUpdate(lag) => {
				gum::debug!(target: LOG_TARGET, lag, "Received `ApprovalCheckingLagUpdate`");
				state.approval_checking_lag = lag;
			},
		}
	}
}
2830
2831#[overseer::subsystem(ApprovalDistribution, error=SubsystemError, prefix=self::overseer)]
2832impl<Context> ApprovalDistribution {
2833	fn start(self, ctx: Context) -> SpawnedSubsystem {
2834		let future = self.run(ctx).map(|_| Ok(())).boxed();
2835
2836		SpawnedSubsystem { name: "approval-distribution-subsystem", future }
2837	}
2838}
2839
/// Ensures the batch size is always at least 1 element.
const fn ensure_size_not_zero(size: usize) -> usize {
	// A zero batch size would make the batching loops below unable to make
	// progress; fail at const-evaluation time instead. `assert!` with a
	// literal message is allowed in `const fn`.
	assert!(size != 0, "Batch size must be at least 1 (MAX_NOTIFICATION_SIZE constant is too low)");
	size
}
2848
/// The maximum amount of assignments per batch is 33% of maximum allowed by protocol.
/// This is an arbitrary value. Bumping this up increases the maximum amount of approvals or
/// assignments we send in a single message to peers. Exceeding `MAX_NOTIFICATION_SIZE` will violate
/// the protocol configuration.
// Evaluated at compile time; `ensure_size_not_zero` turns a configuration that cannot
// fit even a single entry into a const-eval panic rather than a silent zero.
// NOTE(review): `size_of` is the in-memory size of one entry, not the wire-encoded
// size — presumably a conservative stand-in; confirm it upper-bounds the encoding.
pub const MAX_ASSIGNMENT_BATCH_SIZE: usize = ensure_size_not_zero(
	MAX_NOTIFICATION_SIZE as usize /
		std::mem::size_of::<(IndirectAssignmentCertV2, CandidateIndex)>() /
		3,
);
2858
/// The maximum amount of approvals per batch is 33% of maximum allowed by protocol.
// Same construction as `MAX_ASSIGNMENT_BATCH_SIZE`: max notification size divided by the
// in-memory size of one vote, divided by 3; guarded against zero at compile time.
// NOTE(review): `size_of` measures the in-memory struct, not the encoded vote — verify
// it is a safe proxy for the wire size.
pub const MAX_APPROVAL_BATCH_SIZE: usize = ensure_size_not_zero(
	MAX_NOTIFICATION_SIZE as usize / std::mem::size_of::<IndirectSignedApprovalVoteV2>() / 3,
);
2863
2864// Low level helper for sending assignments.
2865async fn send_assignments_batched_inner(
2866	sender: &mut impl overseer::SubsystemSender<NetworkBridgeTxMessage>,
2867	batch: impl IntoIterator<Item = (IndirectAssignmentCertV2, CandidateBitfield)>,
2868	peers: Vec<PeerId>,
2869	_peer_version: ValidationVersion,
2870) {
2871	sender
2872		.send_message(NetworkBridgeTxMessage::SendValidationMessage(
2873			peers,
2874			ValidationProtocols::V3(protocol_v3::ValidationProtocol::ApprovalDistribution(
2875				protocol_v3::ApprovalDistributionMessage::Assignments(batch.into_iter().collect()),
2876			)),
2877		))
2878		.await;
2879}
2880
2881/// Send assignments while honoring the `max_notification_size` of the protocol.
2882///
2883/// Splitting the messages into multiple notifications allows more granular processing at the
2884/// destination, such that the subsystem doesn't get stuck for long processing a batch
2885/// of assignments and can `select!` other tasks.
2886pub(crate) async fn send_assignments_batched(
2887	network_sender: &mut impl overseer::SubsystemSender<NetworkBridgeTxMessage>,
2888	v2_assignments: impl IntoIterator<Item = (IndirectAssignmentCertV2, CandidateBitfield)> + Clone,
2889	peers: &[(PeerId, ProtocolVersion)],
2890) {
2891	let v3_peers = filter_by_peer_version(peers, ValidationVersion::V3.into());
2892
2893	if !v3_peers.is_empty() {
2894		let mut v3 = v2_assignments.into_iter().peekable();
2895
2896		while v3.peek().is_some() {
2897			let batch = v3.by_ref().take(MAX_ASSIGNMENT_BATCH_SIZE).collect::<Vec<_>>();
2898			send_assignments_batched_inner(
2899				network_sender,
2900				batch,
2901				v3_peers.clone(),
2902				ValidationVersion::V3,
2903			)
2904			.await;
2905		}
2906	}
2907}
2908
2909/// Send approvals while honoring the `max_notification_size` of the protocol and peer version.
2910pub(crate) async fn send_approvals_batched(
2911	sender: &mut impl overseer::SubsystemSender<NetworkBridgeTxMessage>,
2912	approvals: impl IntoIterator<Item = IndirectSignedApprovalVoteV2> + Clone,
2913	peers: &[(PeerId, ProtocolVersion)],
2914) {
2915	let v3_peers = filter_by_peer_version(peers, ValidationVersion::V3.into());
2916
2917	if !v3_peers.is_empty() {
2918		let mut batches = approvals.into_iter().peekable();
2919
2920		while batches.peek().is_some() {
2921			let batch: Vec<_> = batches.by_ref().take(MAX_APPROVAL_BATCH_SIZE).collect();
2922
2923			sender
2924				.send_message(NetworkBridgeTxMessage::SendValidationMessage(
2925					v3_peers.clone(),
2926					ValidationProtocols::V3(protocol_v3::ValidationProtocol::ApprovalDistribution(
2927						protocol_v3::ApprovalDistributionMessage::Approvals(batch),
2928					)),
2929				))
2930				.await;
2931		}
2932	}
2933}