1#![warn(missing_docs)]
25
26use self::metrics::Metrics;
27use futures::{select, FutureExt as _};
28use itertools::Itertools;
29use net_protocol::peer_set::{ProtocolVersion, ValidationVersion};
30use polkadot_node_network_protocol::{
31 self as net_protocol, filter_by_peer_version,
32 grid_topology::{RandomRouting, RequiredRouting, SessionGridTopologies, SessionGridTopology},
33 peer_set::MAX_NOTIFICATION_SIZE,
34 v3 as protocol_v3, PeerId, UnifiedReputationChange as Rep, ValidationProtocols, View,
35};
36use polkadot_node_primitives::{
37 approval::{
38 criteria::{AssignmentCriteria, InvalidAssignment},
39 time::{Clock, ClockExt, SystemClock, TICK_TOO_FAR_IN_FUTURE},
40 v1::{BlockApprovalMeta, DelayTranche, RelayVRFStory},
41 v2::{
42 AsBitIndex, AssignmentCertKindV2, CandidateBitfield, IndirectAssignmentCertV2,
43 IndirectSignedApprovalVoteV2,
44 },
45 },
46 DISPUTE_WINDOW,
47};
48use polkadot_node_subsystem::{
49 messages::{
50 ApprovalDistributionMessage, ApprovalVotingMessage, CheckedIndirectAssignment,
51 CheckedIndirectSignedApprovalVote, NetworkBridgeEvent, NetworkBridgeTxMessage,
52 RuntimeApiMessage,
53 },
54 overseer, FromOrchestra, OverseerSignal, SpawnedSubsystem, SubsystemError,
55};
56use polkadot_node_subsystem_util::{
57 reputation::{ReputationAggregator, REPUTATION_CHANGE_INTERVAL},
58 runtime::{Config as RuntimeInfoConfig, ExtendedSessionInfo, RuntimeInfo},
59};
60use polkadot_primitives::{
61 BlockNumber, CandidateHash, CandidateIndex, CoreIndex, DisputeStatement, GroupIndex, Hash,
62 SessionIndex, Slot, ValidDisputeStatementKind, ValidatorIndex, ValidatorSignature,
63};
64use rand::{CryptoRng, Rng, SeedableRng};
65use std::{
66 collections::{hash_map, BTreeMap, HashMap, HashSet, VecDeque},
67 sync::Arc,
68 time::Duration,
69};
70
71pub mod metrics;
73
74#[cfg(test)]
75mod tests;
76
/// Log target passed as `target:` to the `gum` log statements of this subsystem.
const LOG_TARGET: &str = "parachain::approval-distribution";

// Reputation penalties. The string carried by each value is the human-readable reason.
const COST_UNEXPECTED_MESSAGE: Rep =
    Rep::CostMinor("Peer sent an out-of-view assignment or approval");
const COST_DUPLICATE_MESSAGE: Rep = Rep::CostMinorRepeated("Peer sent identical messages");
const COST_ASSIGNMENT_TOO_FAR_IN_THE_FUTURE: Rep =
    Rep::CostMinor("The vote was valid but too far in the future");
const COST_INVALID_MESSAGE: Rep = Rep::CostMajor("The vote was bad");
const COST_OVERSIZED_BITFIELD: Rep = Rep::CostMajor("Oversized certificate or candidate bitfield");

// Reputation rewards.
const BENEFIT_VALID_MESSAGE: Rep = Rep::BenefitMinor("Peer sent a valid message");
const BENEFIT_VALID_MESSAGE_FIRST: Rep =
    Rep::BenefitMinorFirst("Valid message with new information");

// NOTE(review): presumably the largest candidate/certificate bitfield accepted from a peer
// (cf. `COST_OVERSIZED_BITFIELD`); the enforcing code is not visible in this part of the file.
const MAX_BITFIELD_SIZE: usize = 500;
93
/// The approval distribution subsystem.
///
/// Holds the long-lived dependencies of the subsystem; the mutable protocol state lives in
/// [`State`].
pub struct ApprovalDistribution {
    /// Metrics handle (see the [`metrics`] module).
    metrics: Metrics,
    /// Slot duration in milliseconds.
    slot_duration_millis: u64,
    /// Shared clock, usable from multiple threads.
    clock: Arc<dyn Clock + Send + Sync>,
    /// Shared implementation of the assignment criteria, usable from multiple threads.
    assignment_criteria: Arc<dyn AssignmentCriteria + Send + Sync>,
}
101
/// Bounded FIFO of block hashes that were recently noted as outdated.
#[derive(Default)]
struct RecentlyOutdated {
    /// Hashes in insertion order: new ones are pushed to the back, old ones popped from the
    /// front.
    buf: VecDeque<Hash>,
}
108
109impl RecentlyOutdated {
110 fn note_outdated(&mut self, hash: Hash) {
111 const MAX_BUF_LEN: usize = 20;
112
113 self.buf.push_back(hash);
114
115 while self.buf.len() > MAX_BUF_LEN {
116 let _ = self.buf.pop_front();
117 }
118 }
119
120 fn is_recent_outdated(&self, hash: &Hash) -> bool {
121 self.buf.contains(hash)
122 }
123}
124
/// Routing bookkeeping attached to a single [`ApprovalEntry`].
struct ApprovalRouting {
    /// The routing currently required for the message.
    required_routing: RequiredRouting,
    // NOTE(review): presumably `true` when the assignment originated from this node — confirm
    // against the code that constructs `ApprovalRouting`.
    local: bool,
    /// Random-routing state; its sent counter is bumped by `mark_randomly_sent`.
    random_routing: RandomRouting,
    /// Peers recorded by `mark_randomly_sent`.
    peers_randomly_routed: Vec<PeerId>,
}
132
133impl ApprovalRouting {
134 fn mark_randomly_sent(&mut self, peer: PeerId) {
135 self.random_routing.inc_sent();
136 self.peers_randomly_routed.push(peer);
137 }
138}
139
/// An assignment of one validator together with the approval votes recorded against it.
struct ApprovalEntry {
    /// The assignment certificate.
    assignment: IndirectAssignmentCertV2,
    /// The candidates claimed by the assignment.
    assignment_claimed_candidates: CandidateBitfield,
    /// Approval votes noted so far, keyed by the candidate bitfield each vote covers.
    approvals: HashMap<CandidateBitfield, IndirectSignedApprovalVoteV2>,
    /// The validator that issued the assignment (copied from `assignment.validator`).
    validator_index: ValidatorIndex,
    /// Routing information for this entry.
    routing_info: ApprovalRouting,
}
154
/// Reasons why noting an approval vote can fail.
#[derive(Debug)]
enum ApprovalEntryError {
    /// The vote was issued by a different validator than the assignment it was matched with.
    InvalidValidatorIndex,
    /// The vote's candidate bitfield is longer than the block's candidate list.
    CandidateIndexOutOfBounds,
    /// The vote covers none of the candidates claimed by the assignment.
    InvalidCandidateIndex,
    /// A vote with the same candidate bitfield has already been noted.
    DuplicateApproval,
    /// No assignment entry matching the vote was found.
    UnknownAssignment,
}
163
164impl ApprovalEntry {
165 pub fn new(
166 assignment: IndirectAssignmentCertV2,
167 candidates: CandidateBitfield,
168 routing_info: ApprovalRouting,
169 ) -> ApprovalEntry {
170 Self {
171 validator_index: assignment.validator,
172 assignment,
173 approvals: HashMap::new(),
174 assignment_claimed_candidates: candidates,
175 routing_info,
176 }
177 }
178
179 pub fn create_assignment_knowledge(&self, block_hash: Hash) -> (MessageSubject, MessageKind) {
181 (
182 MessageSubject(
183 block_hash,
184 self.assignment_claimed_candidates.clone(),
185 self.validator_index,
186 ),
187 MessageKind::Assignment,
188 )
189 }
190
191 pub fn routing_info_mut(&mut self) -> &mut ApprovalRouting {
193 &mut self.routing_info
194 }
195
196 pub fn routing_info(&self) -> &ApprovalRouting {
198 &self.routing_info
199 }
200
201 pub fn update_required_routing(&mut self, required_routing: RequiredRouting) {
203 self.routing_info.required_routing = required_routing;
204 }
205
206 pub fn includes_approval_candidates(&self, approval: &IndirectSignedApprovalVoteV2) -> bool {
208 for candidate_index in approval.candidate_indices.iter_ones() {
209 if self.assignment_claimed_candidates.bit_at((candidate_index).as_bit_index()) {
210 return true;
211 }
212 }
213 return false;
214 }
215
216 pub fn note_approval(
219 &mut self,
220 approval: IndirectSignedApprovalVoteV2,
221 ) -> Result<(), ApprovalEntryError> {
222 if self.validator_index != approval.validator {
227 return Err(ApprovalEntryError::InvalidValidatorIndex);
228 }
229
230 if !self.includes_approval_candidates(&approval) {
232 return Err(ApprovalEntryError::InvalidCandidateIndex);
233 }
234
235 if self.approvals.contains_key(&approval.candidate_indices) {
236 return Err(ApprovalEntryError::DuplicateApproval);
237 }
238
239 self.approvals.insert(approval.candidate_indices.clone(), approval.clone());
240 Ok(())
241 }
242
243 pub fn assignment(&self) -> (IndirectAssignmentCertV2, CandidateBitfield) {
245 (self.assignment.clone(), self.assignment_claimed_candidates.clone())
246 }
247
248 pub fn approvals(&self) -> Vec<IndirectSignedApprovalVoteV2> {
250 self.approvals.values().cloned().collect::<Vec<_>>()
251 }
252
253 pub fn validator_index(&self) -> ValidatorIndex {
255 self.validator_index
256 }
257}
258
/// What is tracked about a connected peer.
struct PeerEntry {
    /// The latest view reported by the peer.
    pub view: View,
    /// The protocol version reported when the peer connected.
    pub version: ProtocolVersion,
}
264
/// Thresholds, in blocks, that control aggressive re-distribution.
#[derive(Clone)]
struct AggressionConfig {
    /// Age at or above which aggression is triggered. When set, it is the only value
    /// consulted by `should_trigger_aggression`.
    l1_threshold: Option<BlockNumber>,
    // NOTE(review): presumably the age at which a second, stronger aggression level starts;
    // its use is not visible in this part of the file.
    l2_threshold: Option<BlockNumber>,
    /// Used only when `l1_threshold` is unset: aggression is triggered at every non-zero age
    /// that is a multiple of this period.
    resend_unfinalized_period: Option<BlockNumber>,
}
297
298impl AggressionConfig {
299 fn should_trigger_aggression(&self, age: BlockNumber) -> bool {
301 if let Some(t) = self.l1_threshold {
302 age >= t
303 } else if let Some(t) = self.resend_unfinalized_period {
304 age > 0 && age.is_multiple_of(t)
305 } else {
306 false
307 }
308 }
309}
310
311impl Default for AggressionConfig {
312 fn default() -> Self {
313 AggressionConfig {
314 l1_threshold: Some(16),
315 l2_threshold: Some(64),
316 resend_unfinalized_period: Some(8),
317 }
318 }
319}
320
/// Flag passed to `enable_aggression`: `Yes` from new-block handling, `No` from finality
/// handling.
// NOTE(review): presumably controls whether already-sent messages are sent again — confirm
// against `enable_aggression`, which is outside this part of the file.
#[derive(PartialEq)]
enum Resend {
    Yes,
    No,
}
326
/// The mutable state of the approval distribution subsystem.
#[derive(Default)]
pub struct State {
    /// Tracked block hashes grouped by block number. Filled when new blocks arrive and pruned
    /// up to and including the finalized number on finality.
    blocks_by_number: BTreeMap<BlockNumber, Vec<Hash>>,
    /// Per-block state, keyed by block hash.
    blocks: HashMap<Hash, BlockEntry>,

    /// Messages from peers for heads in our view whose block has not been imported yet. They
    /// are imported once the block arrives and dropped when the head leaves our view.
    pending_known: HashMap<Hash, Vec<(PeerId, PendingMessage)>>,

    /// View and protocol version of every connected peer.
    peer_views: HashMap<PeerId, PeerEntry>,

    /// Grid topologies per session, reference-counted by the tracked blocks of that session.
    topologies: SessionGridTopologies,

    /// Blocks recently pruned on finality; messages for them do not cost the sender
    /// reputation.
    recent_outdated_blocks: RecentlyOutdated,

    /// Configuration of the aggression thresholds.
    aggression_config: AggressionConfig,

    // NOTE(review): presumably the current approval-checking lag in blocks; it is not read or
    // written in this part of the file.
    approval_checking_lag: BlockNumber,

    /// Aggregator through which reputation changes are applied.
    reputation: ReputationAggregator,

    /// Slot duration in milliseconds, set by `with_config`.
    slot_duration_millis: u64,
}
367
/// The two kinds of message tracked per [`MessageSubject`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum MessageKind {
    /// An assignment.
    Assignment,
    /// An approval vote. Knowing the approval implies knowing the assignment (see
    /// `Knowledge::contains`).
    Approval,
}
373
/// Identifies what a message is about: the block hash, the candidates concerned (as a
/// bitfield) and the validator that issued it.
#[derive(Debug, Clone, Hash, PartialEq, Eq)]
struct MessageSubject(Hash, pub CandidateBitfield, ValidatorIndex);
379
/// A set of known messages.
#[derive(Debug, Clone, Default)]
struct Knowledge {
    /// For each subject, the most advanced kind known: `Assignment` is upgraded to `Approval`
    /// once the approval is inserted.
    known_messages: HashMap<MessageSubject, MessageKind>,
}
387
388impl Knowledge {
389 fn contains(&self, message: &MessageSubject, kind: MessageKind) -> bool {
390 match (kind, self.known_messages.get(message)) {
391 (_, None) => false,
392 (MessageKind::Assignment, Some(_)) => true,
393 (MessageKind::Approval, Some(MessageKind::Assignment)) => false,
394 (MessageKind::Approval, Some(MessageKind::Approval)) => true,
395 }
396 }
397
398 fn insert(&mut self, message: MessageSubject, kind: MessageKind) -> bool {
399 let mut success = match self.known_messages.entry(message.clone()) {
400 hash_map::Entry::Vacant(vacant) => {
401 vacant.insert(kind);
402 true
405 },
406 hash_map::Entry::Occupied(mut occupied) => match (*occupied.get(), kind) {
407 (MessageKind::Assignment, MessageKind::Assignment) => false,
408 (MessageKind::Approval, MessageKind::Approval) => false,
409 (MessageKind::Approval, MessageKind::Assignment) => false,
410 (MessageKind::Assignment, MessageKind::Approval) => {
411 *occupied.get_mut() = MessageKind::Approval;
412 true
413 },
414 },
415 };
416
417 if kind == MessageKind::Assignment && success && message.1.count_ones() > 1 {
421 for candidate_index in message.1.iter_ones() {
422 success = success &&
423 self.insert(
424 MessageSubject(
425 message.0,
426 vec![candidate_index as u32].try_into().expect("Non-empty vec; qed"),
427 message.2,
428 ),
429 kind,
430 );
431 }
432 }
433 success
434 }
435}
436
/// What is known about a peer's knowledge of a block's messages.
#[derive(Debug, Clone, Default)]
struct PeerKnowledge {
    /// Messages we have sent to the peer.
    sent: Knowledge,
    /// Messages we have received from the peer.
    received: Knowledge,
}
445
446impl PeerKnowledge {
447 fn contains(&self, message: &MessageSubject, kind: MessageKind) -> bool {
448 self.sent.contains(message, kind) || self.received.contains(message, kind)
449 }
450
451 fn generate_assignments_keys(
454 approval: &IndirectSignedApprovalVoteV2,
455 ) -> Vec<(MessageSubject, MessageKind)> {
456 approval
457 .candidate_indices
458 .iter_ones()
459 .map(|candidate_index| {
460 (
461 MessageSubject(
462 approval.block_hash,
463 (candidate_index as CandidateIndex).into(),
464 approval.validator,
465 ),
466 MessageKind::Assignment,
467 )
468 })
469 .collect_vec()
470 }
471
472 fn generate_approval_key(
474 approval: &IndirectSignedApprovalVoteV2,
475 ) -> (MessageSubject, MessageKind) {
476 (
477 MessageSubject(
478 approval.block_hash,
479 approval.candidate_indices.clone(),
480 approval.validator,
481 ),
482 MessageKind::Approval,
483 )
484 }
485}
486
/// Everything tracked about a single relay-chain block.
struct BlockEntry {
    /// Peers known to be aware of this block, with what each of them knows.
    known_by: HashMap<PeerId, PeerKnowledge>,
    /// The block number.
    number: BlockNumber,
    /// The hash of the parent block.
    parent_hash: Hash,
    /// Our own knowledge of messages for this block.
    knowledge: Knowledge,
    /// Per-candidate state, indexed by candidate index; one entry per included candidate.
    candidates: Vec<CandidateEntry>,
    /// Hash, core and group of every included candidate, taken from the block's approval
    /// metadata.
    candidates_metadata: Vec<(CandidateHash, CoreIndex, GroupIndex)>,
    /// The session the block belongs to.
    session: SessionIndex,
    /// Assignments and their approvals, keyed by validator and claimed candidates.
    approval_entries: HashMap<(ValidatorIndex, CandidateBitfield), ApprovalEntry>,
    /// The relay VRF story of the block.
    vrf_story: RelayVRFStory,
    /// The slot of the block.
    slot: Slot,
    // NOTE(review): presumably the block number at which this entry's messages were last
    // re-sent by the aggression logic; it is only initialised to `None` in this chunk.
    last_resent_at_block_number: Option<u32>,
}
514
515impl BlockEntry {
516 pub fn known_by(&self) -> Vec<PeerId> {
518 self.known_by.keys().cloned().collect::<Vec<_>>()
519 }
520
521 pub fn insert_approval_entry(&mut self, entry: ApprovalEntry) -> &mut ApprovalEntry {
522 for claimed_candidate_index in entry.assignment_claimed_candidates.iter_ones() {
526 match self.candidates.get_mut(claimed_candidate_index) {
527 Some(candidate_entry) => {
528 candidate_entry
529 .assignments
530 .entry(entry.validator_index())
531 .or_insert(entry.assignment_claimed_candidates.clone());
532 },
533 None => {
534 gum::warn!(
537 target: LOG_TARGET,
538 hash = ?entry.assignment.block_hash,
539 ?claimed_candidate_index,
540 "Missing candidate entry on `import_and_circulate_assignment`",
541 );
542 },
543 };
544 }
545
546 self.approval_entries
547 .entry((entry.validator_index, entry.assignment_claimed_candidates.clone()))
548 .or_insert(entry)
549 }
550
551 pub fn contains_candidates(&self, candidate_indices: &CandidateBitfield) -> bool {
553 candidate_indices
554 .iter_ones()
555 .all(|candidate_index| self.candidates.get(candidate_index as usize).is_some())
556 }
557
558 pub fn note_approval(
564 &mut self,
565 approval: IndirectSignedApprovalVoteV2,
566 ) -> Result<(RequiredRouting, HashSet<PeerId>), ApprovalEntryError> {
567 let mut required_routing: Option<RequiredRouting> = None;
568 let mut peers_randomly_routed_to = HashSet::new();
569
570 if self.candidates.len() < approval.candidate_indices.len() as usize {
571 return Err(ApprovalEntryError::CandidateIndexOutOfBounds);
572 }
573
574 let covered_assignments_bitfields: HashSet<CandidateBitfield> = approval
576 .candidate_indices
577 .iter_ones()
578 .filter_map(|candidate_index| {
579 self.candidates.get_mut(candidate_index).map_or(None, |candidate_entry| {
580 candidate_entry.assignments.get(&approval.validator).cloned()
581 })
582 })
583 .collect();
584
585 for assignment_bitfield in covered_assignments_bitfields {
587 if let Some(approval_entry) =
588 self.approval_entries.get_mut(&(approval.validator, assignment_bitfield))
589 {
590 approval_entry.note_approval(approval.clone())?;
591 peers_randomly_routed_to
592 .extend(approval_entry.routing_info().peers_randomly_routed.iter());
593
594 if let Some(current_required_routing) = required_routing {
595 required_routing = Some(
596 current_required_routing
597 .combine(approval_entry.routing_info().required_routing),
598 );
599 } else {
600 required_routing = Some(approval_entry.routing_info().required_routing)
601 }
602 }
603 }
604
605 if let Some(required_routing) = required_routing {
606 Ok((required_routing, peers_randomly_routed_to))
607 } else {
608 Err(ApprovalEntryError::UnknownAssignment)
609 }
610 }
611
612 pub fn approval_votes(
614 &self,
615 candidate_index: CandidateIndex,
616 ) -> Vec<IndirectSignedApprovalVoteV2> {
617 let result: Option<
618 HashMap<(ValidatorIndex, CandidateBitfield), IndirectSignedApprovalVoteV2>,
619 > = self.candidates.get(candidate_index as usize).map(|candidate_entry| {
620 candidate_entry
621 .assignments
622 .iter()
623 .filter_map(|(validator, assignment_bitfield)| {
624 self.approval_entries.get(&(*validator, assignment_bitfield.clone()))
625 })
626 .flat_map(|approval_entry| {
627 approval_entry
628 .approvals
629 .clone()
630 .into_iter()
631 .filter(|(approved_candidates, _)| {
632 approved_candidates.bit_at(candidate_index.as_bit_index())
633 })
634 .map(|(approved_candidates, vote)| {
635 ((approval_entry.validator_index, approved_candidates), vote)
636 })
637 })
638 .collect()
639 });
640
641 result.map(|result| result.into_values().collect_vec()).unwrap_or_default()
642 }
643}
644
/// Per-candidate index into the approval entries of a block.
#[derive(Debug, Default)]
struct CandidateEntry {
    /// For each validator assigned to this candidate, the claimed-candidates bitfield of the
    /// assignment. Together with the validator index it keys `BlockEntry::approval_entries`.
    assignments: HashMap<ValidatorIndex, CandidateBitfield>,
}
654
/// Where a message came from.
#[derive(Debug, Clone, PartialEq)]
enum MessageSource {
    /// Received from the given peer.
    Peer(PeerId),
    /// Not received from any peer (`peer_id` returns `None`).
    Local,
}
660
/// Reasons an assignment can be rejected as invalid.
#[derive(Debug)]
enum InvalidAssignmentError {
    // NOTE(review): `dead_code` is allowed presumably because the payload is only surfaced
    // through `Debug` output — confirm.
    #[allow(dead_code)]
    /// The assignment criteria check rejected the certificate.
    CryptoCheckFailed(InvalidAssignment),
    /// The assignment claims no candidate.
    NoClaimedCandidates,
    // NOTE(review): see the `dead_code` note on `CryptoCheckFailed`.
    #[allow(dead_code)]
    /// A claimed candidate index is not valid; carries the claimed index and the maximum.
    ClaimedInvalidCandidateIndex {
        claimed_index: usize,
        max_index: usize,
    },
    /// The claimed-candidates bitfield is too large.
    OversizedClaimedBitfield,
    // NOTE(review): see the `dead_code` note on `CryptoCheckFailed`.
    #[allow(dead_code)]
    /// The session info could not be obtained.
    SessionInfoNotFound(polkadot_node_subsystem_util::runtime::Error),
}
681
/// Reasons an approval vote can be rejected as invalid.
#[derive(Debug)]
enum InvalidVoteError {
    /// A candidate index in the vote is out of bounds.
    CandidateIndexOutOfBounds,
    /// No candidate hash was found for an approved candidate.
    CandidateHashNotFound,
    /// The validator index in the vote is out of bounds.
    ValidatorIndexOutOfBounds,
    /// The signature of the vote is invalid.
    InvalidSignature,
    // NOTE(review): `dead_code` is allowed presumably because the payload is only surfaced
    // through `Debug` output — confirm.
    #[allow(dead_code)]
    /// The session info could not be obtained.
    SessionInfoNotFound(polkadot_node_subsystem_util::runtime::Error),
}
697
698impl MessageSource {
699 fn peer_id(&self) -> Option<PeerId> {
700 match self {
701 Self::Peer(id) => Some(*id),
702 Self::Local => None,
703 }
704 }
705}
706
/// A message from a peer held back until the block it refers to has been imported.
enum PendingMessage {
    /// An assignment certificate with the candidates it claims.
    Assignment(IndirectAssignmentCertV2, CandidateBitfield),
    /// An approval vote.
    Approval(IndirectSignedApprovalVoteV2),
}
711
712#[overseer::contextbounds(ApprovalDistribution, prefix = self::overseer)]
713impl State {
714 pub fn with_config(slot_duration_millis: u64) -> Self {
716 Self { slot_duration_millis, ..Default::default() }
717 }
718
719 async fn handle_network_msg<
720 N: overseer::SubsystemSender<NetworkBridgeTxMessage>,
721 A: overseer::SubsystemSender<ApprovalVotingMessage>,
722 RA: overseer::SubsystemSender<RuntimeApiMessage>,
723 >(
724 &mut self,
725 approval_voting_sender: &mut A,
726 network_sender: &mut N,
727 runtime_api_sender: &mut RA,
728 metrics: &Metrics,
729 event: NetworkBridgeEvent<net_protocol::ApprovalDistributionMessage>,
730 rng: &mut (impl CryptoRng + Rng),
731 assignment_criteria: &(impl AssignmentCriteria + ?Sized),
732 clock: &(impl Clock + ?Sized),
733 session_info_provider: &mut RuntimeInfo,
734 ) {
735 match event {
736 NetworkBridgeEvent::PeerConnected(peer_id, role, version, authority_ids) => {
737 gum::trace!(target: LOG_TARGET, ?peer_id, ?role, ?authority_ids, "Peer connected");
738 if let Some(authority_ids) = authority_ids {
739 self.topologies.update_authority_ids(peer_id, &authority_ids);
740 }
741 self.peer_views
743 .entry(peer_id)
744 .or_insert(PeerEntry { view: Default::default(), version });
745 },
746 NetworkBridgeEvent::PeerDisconnected(peer_id) => {
747 gum::trace!(target: LOG_TARGET, ?peer_id, "Peer disconnected");
748 self.peer_views.remove(&peer_id);
749 self.blocks.iter_mut().for_each(|(_hash, entry)| {
750 entry.known_by.remove(&peer_id);
751 })
752 },
753 NetworkBridgeEvent::NewGossipTopology(topology) => {
754 self.handle_new_session_topology(
755 network_sender,
756 topology.session,
757 topology.topology,
758 topology.local_index,
759 )
760 .await;
761 },
762 NetworkBridgeEvent::PeerViewChange(peer_id, view) => {
763 self.handle_peer_view_change(network_sender, metrics, peer_id, view, rng).await;
764 },
765 NetworkBridgeEvent::OurViewChange(view) => {
766 gum::trace!(target: LOG_TARGET, ?view, "Own view change");
767 for head in view.iter() {
768 if !self.blocks.contains_key(head) {
769 self.pending_known.entry(*head).or_default();
770 }
771 }
772
773 self.pending_known.retain(|h, _| {
774 let live = view.contains(h);
775 if !live {
776 gum::trace!(
777 target: LOG_TARGET,
778 block_hash = ?h,
779 "Cleaning up stale pending messages",
780 );
781 }
782 live
783 });
784 },
785 NetworkBridgeEvent::PeerMessage(peer_id, message) => {
786 self.process_incoming_peer_message(
787 approval_voting_sender,
788 network_sender,
789 runtime_api_sender,
790 metrics,
791 peer_id,
792 message,
793 rng,
794 assignment_criteria,
795 clock,
796 session_info_provider,
797 )
798 .await;
799 },
800 NetworkBridgeEvent::UpdatedAuthorityIds(peer_id, authority_ids) => {
801 gum::debug!(target: LOG_TARGET, ?peer_id, ?authority_ids, "Update Authority Ids");
802 if self.topologies.update_authority_ids(peer_id, &authority_ids) {
805 if let Some(PeerEntry { view, version }) = self.peer_views.get(&peer_id) {
806 let intersection = self
807 .blocks_by_number
808 .iter()
809 .filter(|(block_number, _)| *block_number > &view.finalized_number)
810 .flat_map(|(_, hashes)| {
811 hashes.iter().filter(|hash| {
812 self.blocks
813 .get(&hash)
814 .map(|block| block.known_by.get(&peer_id).is_some())
815 .unwrap_or_default()
816 })
817 });
818 let view_intersection =
819 View::new(intersection.cloned(), view.finalized_number);
820 Self::unify_with_peer(
821 network_sender,
822 metrics,
823 &mut self.blocks,
824 &self.topologies,
825 self.peer_views.len(),
826 peer_id,
827 *version,
828 view_intersection,
829 rng,
830 true,
831 )
832 .await;
833 }
834 }
835 },
836 }
837 }
838
839 async fn handle_new_blocks<
840 N: overseer::SubsystemSender<NetworkBridgeTxMessage>,
841 A: overseer::SubsystemSender<ApprovalVotingMessage>,
842 RA: overseer::SubsystemSender<RuntimeApiMessage>,
843 >(
844 &mut self,
845 approval_voting_sender: &mut A,
846 network_sender: &mut N,
847 runtime_api_sender: &mut RA,
848 metrics: &Metrics,
849 metas: Vec<BlockApprovalMeta>,
850 rng: &mut (impl CryptoRng + Rng),
851 assignment_criteria: &(impl AssignmentCriteria + ?Sized),
852 clock: &(impl Clock + ?Sized),
853 session_info_provider: &mut RuntimeInfo,
854 ) {
855 let mut new_hashes = HashSet::new();
856
857 gum::debug!(
858 target: LOG_TARGET,
859 "Got new blocks {:?}",
860 metas.iter().map(|m| (m.hash, m.number)).collect::<Vec<_>>(),
861 );
862
863 for meta in metas {
864 match self.blocks.entry(meta.hash) {
865 hash_map::Entry::Vacant(entry) => {
866 let candidates_count = meta.candidates.len();
867 let mut candidates = Vec::with_capacity(candidates_count);
868 candidates.resize_with(candidates_count, Default::default);
869
870 entry.insert(BlockEntry {
871 known_by: HashMap::new(),
872 number: meta.number,
873 parent_hash: meta.parent_hash,
874 knowledge: Knowledge::default(),
875 candidates,
876 session: meta.session,
877 approval_entries: HashMap::new(),
878 candidates_metadata: meta.candidates,
879 vrf_story: meta.vrf_story,
880 slot: meta.slot,
881 last_resent_at_block_number: None,
882 });
883
884 self.topologies.inc_session_refs(meta.session);
885
886 new_hashes.insert(meta.hash);
887
888 self.blocks_by_number.entry(meta.number).or_default().push(meta.hash);
891 },
892 _ => continue,
893 }
894 }
895
896 {
897 for (peer_id, PeerEntry { view, version }) in self.peer_views.iter() {
898 let intersection = view.iter().filter(|h| new_hashes.contains(h));
899 let view_intersection = View::new(intersection.cloned(), view.finalized_number);
900 Self::unify_with_peer(
901 network_sender,
902 metrics,
903 &mut self.blocks,
904 &self.topologies,
905 self.peer_views.len(),
906 *peer_id,
907 *version,
908 view_intersection,
909 rng,
910 false,
911 )
912 .await;
913 }
914
915 let pending_now_known = self
916 .pending_known
917 .keys()
918 .filter(|k| self.blocks.contains_key(k))
919 .copied()
920 .collect::<Vec<_>>();
921
922 let to_import = pending_now_known
923 .into_iter()
924 .inspect(|h| {
925 gum::trace!(
926 target: LOG_TARGET,
927 block_hash = ?h,
928 "Extracting pending messages for new block"
929 )
930 })
931 .filter_map(|k| self.pending_known.remove(&k))
932 .flatten()
933 .collect::<Vec<_>>();
934
935 if !to_import.is_empty() {
936 gum::debug!(
937 target: LOG_TARGET,
938 num = to_import.len(),
939 "Processing pending assignment/approvals",
940 );
941
942 let _timer = metrics.time_import_pending_now_known();
943
944 for (peer_id, message) in to_import {
945 match message {
946 PendingMessage::Assignment(assignment, claimed_indices) => {
947 self.import_and_circulate_assignment(
948 approval_voting_sender,
949 network_sender,
950 runtime_api_sender,
951 metrics,
952 MessageSource::Peer(peer_id),
953 assignment,
954 claimed_indices,
955 rng,
956 assignment_criteria,
957 clock,
958 session_info_provider,
959 )
960 .await;
961 },
962 PendingMessage::Approval(approval_vote) => {
963 self.import_and_circulate_approval(
964 approval_voting_sender,
965 network_sender,
966 runtime_api_sender,
967 metrics,
968 MessageSource::Peer(peer_id),
969 approval_vote,
970 session_info_provider,
971 )
972 .await;
973 },
974 }
975 }
976 }
977 }
978
979 self.enable_aggression(network_sender, Resend::Yes, metrics).await;
980 }
981
982 async fn handle_new_session_topology<N: overseer::SubsystemSender<NetworkBridgeTxMessage>>(
983 &mut self,
984 network_sender: &mut N,
985 session: SessionIndex,
986 topology: SessionGridTopology,
987 local_index: Option<ValidatorIndex>,
988 ) {
989 if local_index.is_none() {
990 return;
992 }
993
994 self.topologies.insert_topology(session, topology, local_index);
995 let topology = self.topologies.get_topology(session).expect("just inserted above; qed");
996
997 adjust_required_routing_and_propagate(
998 network_sender,
999 &mut self.blocks,
1000 &self.topologies,
1001 |block_entry| block_entry.session == session,
1002 |required_routing, local, validator_index| {
1003 if required_routing == &RequiredRouting::PendingTopology {
1004 topology
1005 .local_grid_neighbors()
1006 .required_routing_by_index(*validator_index, local)
1007 } else {
1008 *required_routing
1009 }
1010 },
1011 &self.peer_views,
1012 )
1013 .await;
1014 }
1015
1016 async fn process_incoming_assignments<A, N, R, RA>(
1017 &mut self,
1018 approval_voting_sender: &mut A,
1019 network_sender: &mut N,
1020 runtime_api_sender: &mut RA,
1021 metrics: &Metrics,
1022 peer_id: PeerId,
1023 assignments: Vec<(IndirectAssignmentCertV2, CandidateBitfield)>,
1024 rng: &mut R,
1025 assignment_criteria: &(impl AssignmentCriteria + ?Sized),
1026 clock: &(impl Clock + ?Sized),
1027 session_info_provider: &mut RuntimeInfo,
1028 ) where
1029 A: overseer::SubsystemSender<ApprovalVotingMessage>,
1030 N: overseer::SubsystemSender<NetworkBridgeTxMessage>,
1031 RA: overseer::SubsystemSender<RuntimeApiMessage>,
1032 R: CryptoRng + Rng,
1033 {
1034 for (assignment, claimed_indices) in assignments {
1035 if let Some(pending) = self.pending_known.get_mut(&assignment.block_hash) {
1036 let block_hash = &assignment.block_hash;
1037 let validator_index = assignment.validator;
1038
1039 gum::trace!(
1040 target: LOG_TARGET,
1041 %peer_id,
1042 ?block_hash,
1043 ?claimed_indices,
1044 ?validator_index,
1045 "Pending assignment",
1046 );
1047
1048 pending.push((peer_id, PendingMessage::Assignment(assignment, claimed_indices)));
1049
1050 continue;
1051 }
1052
1053 self.import_and_circulate_assignment(
1054 approval_voting_sender,
1055 network_sender,
1056 runtime_api_sender,
1057 metrics,
1058 MessageSource::Peer(peer_id),
1059 assignment,
1060 claimed_indices,
1061 rng,
1062 assignment_criteria,
1063 clock,
1064 session_info_provider,
1065 )
1066 .await;
1067 }
1068 }
1069
1070 async fn process_incoming_approvals<
1072 N: overseer::SubsystemSender<NetworkBridgeTxMessage>,
1073 A: overseer::SubsystemSender<ApprovalVotingMessage>,
1074 RA: overseer::SubsystemSender<RuntimeApiMessage>,
1075 >(
1076 &mut self,
1077 approval_voting_sender: &mut A,
1078 network_sender: &mut N,
1079 runtime_api_sender: &mut RA,
1080 metrics: &Metrics,
1081 peer_id: PeerId,
1082 approvals: Vec<IndirectSignedApprovalVoteV2>,
1083 session_info_provider: &mut RuntimeInfo,
1084 ) {
1085 gum::trace!(
1086 target: LOG_TARGET,
1087 peer_id = %peer_id,
1088 num = approvals.len(),
1089 "Processing approvals from a peer",
1090 );
1091 for approval_vote in approvals.into_iter() {
1092 if let Some(pending) = self.pending_known.get_mut(&approval_vote.block_hash) {
1093 let block_hash = approval_vote.block_hash;
1094 let validator_index = approval_vote.validator;
1095
1096 gum::trace!(
1097 target: LOG_TARGET,
1098 %peer_id,
1099 ?block_hash,
1100 ?validator_index,
1101 "Pending assignment candidates {:?}",
1102 approval_vote.candidate_indices,
1103 );
1104
1105 pending.push((peer_id, PendingMessage::Approval(approval_vote)));
1106
1107 continue;
1108 }
1109
1110 self.import_and_circulate_approval(
1111 approval_voting_sender,
1112 network_sender,
1113 runtime_api_sender,
1114 metrics,
1115 MessageSource::Peer(peer_id),
1116 approval_vote,
1117 session_info_provider,
1118 )
1119 .await;
1120 }
1121 }
1122
1123 async fn process_incoming_peer_message<A, N, RA, R>(
1124 &mut self,
1125 approval_voting_sender: &mut A,
1126 network_sender: &mut N,
1127 runtime_api_sender: &mut RA,
1128 metrics: &Metrics,
1129 peer_id: PeerId,
1130 msg: ValidationProtocols<protocol_v3::ApprovalDistributionMessage>,
1131 rng: &mut R,
1132 assignment_criteria: &(impl AssignmentCriteria + ?Sized),
1133 clock: &(impl Clock + ?Sized),
1134 session_info_provider: &mut RuntimeInfo,
1135 ) where
1136 A: overseer::SubsystemSender<ApprovalVotingMessage>,
1137 N: overseer::SubsystemSender<NetworkBridgeTxMessage>,
1138 RA: overseer::SubsystemSender<RuntimeApiMessage>,
1139 R: CryptoRng + Rng,
1140 {
1141 match msg {
1142 ValidationProtocols::V3(protocol_v3::ApprovalDistributionMessage::Assignments(
1143 assignments,
1144 )) => {
1145 gum::trace!(
1146 target: LOG_TARGET,
1147 peer_id = %peer_id,
1148 num = assignments.len(),
1149 "Processing assignments from a peer",
1150 );
1151 let sanitized_assignments =
1152 self.sanitize_v2_assignments(peer_id, network_sender, assignments).await;
1153
1154 self.process_incoming_assignments(
1155 approval_voting_sender,
1156 network_sender,
1157 runtime_api_sender,
1158 metrics,
1159 peer_id,
1160 sanitized_assignments,
1161 rng,
1162 assignment_criteria,
1163 clock,
1164 session_info_provider,
1165 )
1166 .await;
1167 },
1168 ValidationProtocols::V3(protocol_v3::ApprovalDistributionMessage::Approvals(
1169 approvals,
1170 )) => {
1171 let sanitized_approvals =
1172 self.sanitize_v2_approvals(peer_id, network_sender, approvals).await;
1173 self.process_incoming_approvals(
1174 approval_voting_sender,
1175 network_sender,
1176 runtime_api_sender,
1177 metrics,
1178 peer_id,
1179 sanitized_approvals,
1180 session_info_provider,
1181 )
1182 .await;
1183 },
1184 }
1185 }
1186
1187 async fn handle_peer_view_change<N: overseer::SubsystemSender<NetworkBridgeTxMessage>, R>(
1190 &mut self,
1191 network_sender: &mut N,
1192 metrics: &Metrics,
1193 peer_id: PeerId,
1194 view: View,
1195 rng: &mut R,
1196 ) where
1197 R: CryptoRng + Rng,
1198 {
1199 gum::trace!(target: LOG_TARGET, ?view, "Peer view change");
1200 let finalized_number = view.finalized_number;
1201
1202 let (old_view, protocol_version) =
1203 if let Some(peer_entry) = self.peer_views.get_mut(&peer_id) {
1204 (Some(std::mem::replace(&mut peer_entry.view, view.clone())), peer_entry.version)
1205 } else {
1206 gum::warn!(
1208 target: LOG_TARGET,
1209 ?peer_id,
1210 ?view,
1211 "Peer view change for missing `peer_entry`"
1212 );
1213
1214 (None, ValidationVersion::V3.into())
1215 };
1216
1217 let old_finalized_number = old_view.map(|v| v.finalized_number).unwrap_or(0);
1218
1219 let blocks = &mut self.blocks;
1221 let range = old_finalized_number..=finalized_number;
1226 if !range.is_empty() && !blocks.is_empty() {
1227 self.blocks_by_number
1228 .range(range)
1229 .flat_map(|(_number, hashes)| hashes)
1230 .for_each(|hash| {
1231 if let Some(entry) = blocks.get_mut(hash) {
1232 entry.known_by.remove(&peer_id);
1233 }
1234 });
1235 }
1236
1237 Self::unify_with_peer(
1238 network_sender,
1239 metrics,
1240 &mut self.blocks,
1241 &self.topologies,
1242 self.peer_views.len(),
1243 peer_id,
1244 protocol_version,
1245 view,
1246 rng,
1247 false,
1248 )
1249 .await;
1250 }
1251
1252 async fn handle_block_finalized<N: overseer::SubsystemSender<NetworkBridgeTxMessage>>(
1253 &mut self,
1254 network_sender: &mut N,
1255 metrics: &Metrics,
1256 finalized_number: BlockNumber,
1257 ) {
1258 let split_point = finalized_number.saturating_add(1);
1262 let mut old_blocks = self.blocks_by_number.split_off(&split_point);
1263
1264 std::mem::swap(&mut self.blocks_by_number, &mut old_blocks);
1266
1267 old_blocks.values().flatten().for_each(|relay_block| {
1269 self.recent_outdated_blocks.note_outdated(*relay_block);
1270 if let Some(block_entry) = self.blocks.remove(relay_block) {
1271 self.topologies.dec_session_refs(block_entry.session);
1272 }
1273 });
1274
1275 self.enable_aggression(network_sender, Resend::No, metrics).await;
1278 }
1279
1280 fn accept_duplicates_from_validators(
1285 blocks_by_number: &BTreeMap<BlockNumber, Vec<Hash>>,
1286 topologies: &SessionGridTopologies,
1287 aggression_config: &AggressionConfig,
1288 entry: &BlockEntry,
1289 peer: PeerId,
1290 ) -> bool {
1291 let topology = topologies.get_topology(entry.session);
1292 let min_age = blocks_by_number.iter().next().map(|(num, _)| num);
1293 let max_age = blocks_by_number.iter().rev().next().map(|(num, _)| num);
1294
1295 let (min_age, max_age) = match (min_age, max_age) {
1297 (Some(min), Some(max)) => (*min, *max),
1298 _ => return false,
1299 };
1300
1301 let age = max_age.saturating_sub(min_age);
1302
1303 aggression_config.should_trigger_aggression(age) &&
1304 topology.map(|topology| topology.is_validator(&peer)).unwrap_or(false)
1305 }
1306
/// Imports an assignment, received from a peer or produced locally, and circulates it.
///
/// Flow, as implemented below:
/// 1. Look up the block entry for `assignment.block_hash`; return early if it is unknown.
/// 2. Peer-sourced message: handle duplicates, peers for which the block is out of view and
///    assignments we already know, then validate with `check_assignment_valid` and forward
///    the checked assignment to approval-voting.
/// 3. Local message: record it in our own knowledge; return early if it was already there.
/// 4. Build the destination set (peers required by the grid topology plus randomly sampled
///    validators), record the message as sent to them and dispatch it in batches.
///
/// Reputation changes are applied to the source peer along the way. Nothing is returned;
/// every early exit is a plain `return`.
async fn import_and_circulate_assignment<A, N, RA, R>(
    &mut self,
    approval_voting_sender: &mut A,
    network_sender: &mut N,
    runtime_api_sender: &mut RA,
    metrics: &Metrics,
    source: MessageSource,
    assignment: IndirectAssignmentCertV2,
    claimed_candidate_indices: CandidateBitfield,
    rng: &mut R,
    assignment_criteria: &(impl AssignmentCriteria + ?Sized),
    clock: &(impl Clock + ?Sized),
    session_info_provider: &mut RuntimeInfo,
) where
    A: overseer::SubsystemSender<ApprovalVotingMessage>,
    N: overseer::SubsystemSender<NetworkBridgeTxMessage>,
    RA: overseer::SubsystemSender<RuntimeApiMessage>,
    R: CryptoRng + Rng,
{
    let block_hash = assignment.block_hash;
    let validator_index = assignment.validator;

    let entry = match self.blocks.get_mut(&block_hash) {
        Some(entry) => entry,
        None => {
            // Unknown block. Only peer-sourced messages are logged and possibly punished.
            if let Some(peer_id) = source.peer_id() {
                gum::trace!(
                    target: LOG_TARGET,
                    ?peer_id,
                    hash = ?block_hash,
                    ?validator_index,
                    "Unexpected assignment",
                );
                // Blocks that were pruned recently do not cost the peer reputation.
                if !self.recent_outdated_blocks.is_recent_outdated(&block_hash) {
                    modify_reputation(
                        &mut self.reputation,
                        network_sender,
                        peer_id,
                        COST_UNEXPECTED_MESSAGE,
                    )
                    .await;
                    gum::debug!(target: LOG_TARGET, "Received assignment for invalid block");
                    // NOTE(review): the `recent_outdated` metric is bumped in the branch where
                    // the block is *not* recently outdated - confirm this is intended.
                    metrics.on_assignment_recent_outdated();
                }
            }
            metrics.on_assignment_invalid_block();
            return;
        },
    };

    // Key under which this assignment is tracked in our own and in per-peer knowledge.
    let (message_subject, message_kind) = (
        MessageSubject(block_hash, claimed_candidate_indices.clone(), validator_index),
        MessageKind::Assignment,
    );

    if let Some(peer_id) = source.peer_id() {
        // Check what we already know about this peer with respect to this assignment.
        match entry.known_by.entry(peer_id) {
            hash_map::Entry::Occupied(mut peer_knowledge) => {
                let peer_knowledge = peer_knowledge.get_mut();
                if peer_knowledge.contains(&message_subject, message_kind) {
                    // The insert result separates a repeated receive from the first receive
                    // of something we had already sent to this peer.
                    if !peer_knowledge.received.insert(message_subject.clone(), message_kind) {
                        // Duplicates are only punished when they are not tolerated, see
                        // `accept_duplicates_from_validators`.
                        if !Self::accept_duplicates_from_validators(
                            &self.blocks_by_number,
                            &self.topologies,
                            &self.aggression_config,
                            entry,
                            peer_id,
                        ) {
                            gum::debug!(
                                target: LOG_TARGET,
                                ?peer_id,
                                ?message_subject,
                                "Duplicate assignment",
                            );

                            modify_reputation(
                                &mut self.reputation,
                                network_sender,
                                peer_id,
                                COST_DUPLICATE_MESSAGE,
                            )
                            .await;
                        }

                        metrics.on_assignment_duplicate();
                    } else {
                        gum::trace!(
                            target: LOG_TARGET,
                            ?peer_id,
                            hash = ?block_hash,
                            ?validator_index,
                            ?message_subject,
                            "We sent the message to the peer while peer was sending it to us. Known race condition.",
                        );
                    }
                    return;
                }
            },
            hash_map::Entry::Vacant(_) => {
                // The peer is not recorded as knowing this block. It is charged, but
                // processing deliberately continues below (there is no `return` here).
                gum::debug!(
                    target: LOG_TARGET,
                    ?peer_id,
                    ?message_subject,
                    "Assignment from a peer is out of view",
                );
                modify_reputation(
                    &mut self.reputation,
                    network_sender,
                    peer_id,
                    COST_UNEXPECTED_MESSAGE,
                )
                .await;
                metrics.on_assignment_out_of_view();
            },
        }

        // We already hold this assignment ourselves: reward the peer, note that it has it
        // and stop, there is nothing new to import or circulate.
        if entry.knowledge.contains(&message_subject, message_kind) {
            modify_reputation(
                &mut self.reputation,
                network_sender,
                peer_id,
                BENEFIT_VALID_MESSAGE,
            )
            .await;
            if let Some(peer_knowledge) = entry.known_by.get_mut(&peer_id) {
                gum::trace!(target: LOG_TARGET, ?peer_id, ?message_subject, "Known assignment");
                peer_knowledge.received.insert(message_subject, message_kind);
            }
            metrics.on_assignment_good_known();
            return;
        }

        // First time we see this assignment: validate it before importing.
        let result = Self::check_assignment_valid(
            assignment_criteria,
            &entry,
            &assignment,
            &claimed_candidate_indices,
            session_info_provider,
            runtime_api_sender,
        )
        .await;

        match result {
            Ok(checked_assignment) => {
                // Reject assignments whose tranche is at or beyond the current tranche plus
                // `TICK_TOO_FAR_IN_FUTURE`.
                let current_tranche = clock.tranche_now(self.slot_duration_millis, entry.slot);
                let too_far_in_future =
                    current_tranche + TICK_TOO_FAR_IN_FUTURE as DelayTranche;

                if checked_assignment.tranche() >= too_far_in_future {
                    gum::debug!(
                        target: LOG_TARGET,
                        hash = ?block_hash,
                        ?peer_id,
                        "Got an assignment too far in the future",
                    );
                    modify_reputation(
                        &mut self.reputation,
                        network_sender,
                        peer_id,
                        COST_ASSIGNMENT_TOO_FAR_IN_THE_FUTURE,
                    )
                    .await;
                    metrics.on_assignment_far();

                    return;
                }

                // Hand the checked assignment to approval-voting, reward the peer and record
                // the assignment in both our own and the peer's knowledge.
                approval_voting_sender
                    .send_message(ApprovalVotingMessage::ImportAssignment(
                        checked_assignment,
                        None,
                    ))
                    .await;
                modify_reputation(
                    &mut self.reputation,
                    network_sender,
                    peer_id,
                    BENEFIT_VALID_MESSAGE_FIRST,
                )
                .await;
                entry.knowledge.insert(message_subject.clone(), message_kind);
                if let Some(peer_knowledge) = entry.known_by.get_mut(&peer_id) {
                    peer_knowledge.received.insert(message_subject.clone(), message_kind);
                }
            },
            Err(error) => {
                gum::info!(
                    target: LOG_TARGET,
                    hash = ?block_hash,
                    ?peer_id,
                    ?error,
                    "Got a bad assignment from peer",
                );
                modify_reputation(
                    &mut self.reputation,
                    network_sender,
                    peer_id,
                    COST_INVALID_MESSAGE,
                )
                .await;
                metrics.on_assignment_bad();
                return;
            },
        }
    } else {
        // Local assignment: no validation or reputation handling, just record it.
        if !entry.knowledge.insert(message_subject.clone(), message_kind) {
            // Already known locally, so it is not circulated again.
            gum::warn!(
                target: LOG_TARGET,
                ?message_subject,
                "Importing locally an already known assignment",
            );
            return;
        } else {
            gum::debug!(
                target: LOG_TARGET,
                ?message_subject,
                "Importing locally a new assignment",
            );
        }
    }

    metrics.on_assignment_imported(&assignment.cert.kind);

    let topology = self.topologies.get_topology(entry.session);
    let local = source == MessageSource::Local;

    // Without a topology for the session the routing stays `PendingTopology`.
    let required_routing = topology.map_or(RequiredRouting::PendingTopology, |t| {
        t.local_grid_neighbors().required_routing_by_index(validator_index, local)
    });
    // Destination peers for this assignment.
    let mut peers = HashSet::new();

    let peers_to_route_to = topology
        .as_ref()
        .map(|t| t.peers_to_route(required_routing))
        .unwrap_or_default();

    // Keep only the required-routing peers that are recorded as knowing this block.
    for peer in peers_to_route_to {
        if !entry.known_by.contains_key(&peer) {
            continue;
        }

        peers.insert(peer);
    }

    // Taken before `insert_approval_entry` below; candidates for random routing.
    let peers_to_filter = entry.known_by();

    let approval_entry = entry.insert_approval_entry(ApprovalEntry::new(
        assignment.clone(),
        claimed_candidate_indices.clone(),
        ApprovalRouting {
            required_routing,
            local,
            random_routing: Default::default(),
            peers_randomly_routed: Default::default(),
        },
    ));

    let assignments = vec![(assignment, claimed_candidate_indices.clone())];
    let n_peers_total = self.peer_views.len();
    let source_peer = source.peer_id();

    // Random routing: sample additional validators among the peers that know the block.
    for peer in peers_to_filter.into_iter() {
        // Never send the message back to where it came from.
        if Some(peer) == source_peer {
            continue;
        }

        // Already selected through required routing.
        if peers.contains(&peer) {
            continue;
        }

        // Only validators of the session topology are eligible for random routing.
        if !topology.map(|topology| topology.is_validator(&peer)).unwrap_or(false) {
            continue;
        }

        let route_random =
            approval_entry.routing_info().random_routing.sample(n_peers_total, rng);

        if route_random {
            approval_entry.routing_info_mut().mark_randomly_sent(peer);
            peers.insert(peer);
        }

        // Stop sampling once random routing reports itself complete.
        if approval_entry.routing_info().random_routing.is_complete() {
            break;
        }
    }

    // Record the assignment as sent for every selected peer.
    for peer in peers.iter() {
        if let Some(peer_knowledge) = entry.known_by.get_mut(peer) {
            peer_knowledge.sent.insert(message_subject.clone(), message_kind);
        }
    }

    if !peers.is_empty() {
        gum::trace!(
            target: LOG_TARGET,
            ?block_hash,
            ?claimed_candidate_indices,
            local = source.peer_id().is_none(),
            num_peers = peers.len(),
            "Sending an assignment to peers",
        );

        // Attach each peer's protocol version; peers without an entry in `peer_views` are
        // dropped from the destination list here.
        let peers = peers
            .iter()
            .filter_map(|peer_id| {
                self.peer_views.get(peer_id).map(|peer_entry| (*peer_id, peer_entry.version))
            })
            .collect::<Vec<_>>();

        send_assignments_batched(network_sender, assignments, &peers).await;
    }
}
1641
1642 async fn check_assignment_valid<RA: overseer::SubsystemSender<RuntimeApiMessage>>(
1643 assignment_criteria: &(impl AssignmentCriteria + ?Sized),
1644 entry: &BlockEntry,
1645 assignment: &IndirectAssignmentCertV2,
1646 claimed_candidate_indices: &CandidateBitfield,
1647 runtime_info: &mut RuntimeInfo,
1648 runtime_api_sender: &mut RA,
1649 ) -> Result<CheckedIndirectAssignment, InvalidAssignmentError> {
1650 let ExtendedSessionInfo { ref session_info, .. } = runtime_info
1651 .get_session_info_by_index(runtime_api_sender, assignment.block_hash, entry.session)
1652 .await
1653 .map_err(|err| InvalidAssignmentError::SessionInfoNotFound(err))?;
1654
1655 if claimed_candidate_indices.len() > session_info.n_cores as usize {
1656 return Err(InvalidAssignmentError::OversizedClaimedBitfield);
1657 }
1658
1659 let claimed_cores: Vec<CoreIndex> = claimed_candidate_indices
1660 .iter_ones()
1661 .map(|candidate_index| {
1662 entry.candidates_metadata.get(candidate_index).map(|(_, core, _)| *core).ok_or(
1663 InvalidAssignmentError::ClaimedInvalidCandidateIndex {
1664 claimed_index: candidate_index,
1665 max_index: entry.candidates_metadata.len(),
1666 },
1667 )
1668 })
1669 .collect::<Result<Vec<_>, InvalidAssignmentError>>()?;
1670
1671 let Ok(claimed_cores) = claimed_cores.try_into() else {
1672 return Err(InvalidAssignmentError::NoClaimedCandidates);
1673 };
1674
1675 let backing_groups = claimed_candidate_indices
1676 .iter_ones()
1677 .flat_map(|candidate_index| {
1678 entry.candidates_metadata.get(candidate_index).map(|(_, _, group)| *group)
1679 })
1680 .collect::<Vec<_>>();
1681
1682 assignment_criteria
1683 .check_assignment_cert(
1684 claimed_cores,
1685 assignment.validator,
1686 &polkadot_node_primitives::approval::criteria::Config::from(session_info),
1687 entry.vrf_story.clone(),
1688 &assignment.cert,
1689 backing_groups,
1690 )
1691 .map_err(|err| InvalidAssignmentError::CryptoCheckFailed(err))
1692 .map(|tranche| {
1693 CheckedIndirectAssignment::from_checked(
1694 assignment.clone(),
1695 claimed_candidate_indices.clone(),
1696 tranche,
1697 )
1698 })
1699 }
1700 async fn check_approval_can_be_processed<
1703 N: overseer::SubsystemSender<NetworkBridgeTxMessage>,
1704 >(
1705 network_sender: &mut N,
1706 assignments_knowledge_key: &Vec<(MessageSubject, MessageKind)>,
1707 approval_knowledge_key: &(MessageSubject, MessageKind),
1708 entry: &mut BlockEntry,
1709 blocks_by_number: &BTreeMap<BlockNumber, Vec<Hash>>,
1710 topologies: &SessionGridTopologies,
1711 aggression_config: &AggressionConfig,
1712 reputation: &mut ReputationAggregator,
1713 peer_id: PeerId,
1714 metrics: &Metrics,
1715 ) -> bool {
1716 for message_subject in assignments_knowledge_key {
1717 if !entry.knowledge.contains(&message_subject.0, message_subject.1) {
1718 gum::trace!(
1719 target: LOG_TARGET,
1720 ?peer_id,
1721 ?message_subject,
1722 "Unknown approval assignment",
1723 );
1724 modify_reputation(reputation, network_sender, peer_id, COST_UNEXPECTED_MESSAGE)
1725 .await;
1726 metrics.on_approval_unknown_assignment();
1727 return false;
1728 }
1729 }
1730
1731 match entry.known_by.entry(peer_id) {
1733 hash_map::Entry::Occupied(mut knowledge) => {
1734 let peer_knowledge = knowledge.get_mut();
1735 if peer_knowledge.contains(&approval_knowledge_key.0, approval_knowledge_key.1) {
1736 if !peer_knowledge
1737 .received
1738 .insert(approval_knowledge_key.0.clone(), approval_knowledge_key.1)
1739 {
1740 if !Self::accept_duplicates_from_validators(
1741 blocks_by_number,
1742 topologies,
1743 aggression_config,
1744 entry,
1745 peer_id,
1746 ) {
1747 gum::trace!(
1748 target: LOG_TARGET,
1749 ?peer_id,
1750 ?approval_knowledge_key,
1751 "Duplicate approval",
1752 );
1753 modify_reputation(
1754 reputation,
1755 network_sender,
1756 peer_id,
1757 COST_DUPLICATE_MESSAGE,
1758 )
1759 .await;
1760 }
1761 metrics.on_approval_duplicate();
1762 }
1763 return false;
1764 }
1765 },
1766 hash_map::Entry::Vacant(_) => {
1767 gum::debug!(
1768 target: LOG_TARGET,
1769 ?peer_id,
1770 ?approval_knowledge_key,
1771 "Approval from a peer is out of view",
1772 );
1773 modify_reputation(reputation, network_sender, peer_id, COST_UNEXPECTED_MESSAGE)
1774 .await;
1775 metrics.on_approval_out_of_view();
1776 },
1777 }
1778
1779 if entry.knowledge.contains(&approval_knowledge_key.0, approval_knowledge_key.1) {
1780 if let Some(peer_knowledge) = entry.known_by.get_mut(&peer_id) {
1781 peer_knowledge
1782 .received
1783 .insert(approval_knowledge_key.0.clone(), approval_knowledge_key.1);
1784 }
1785
1786 gum::trace!(target: LOG_TARGET, ?peer_id, ?approval_knowledge_key, "Known approval");
1788 metrics.on_approval_good_known();
1789 modify_reputation(reputation, network_sender, peer_id, BENEFIT_VALID_MESSAGE).await;
1790 false
1791 } else {
1792 true
1793 }
1794 }
1795
/// Imports an approval vote, received from a peer or produced locally, and circulates it.
///
/// Flow, as implemented below:
/// 1. Look up the block entry for `vote.block_hash` and require it to contain the voted
///    candidates; otherwise return (charging a peer source unless the block was recently
///    outdated).
/// 2. Peer-sourced vote: run `check_approval_can_be_processed`, validate with
///    `check_vote_valid`, forward the checked vote to approval-voting and update knowledge.
/// 3. Local vote: record it in our own knowledge; return early if it was already there.
/// 4. Note the approval on the block entry, then send it to every peer that knows the block
///    and is either selected by the required routing or was randomly routed the assignment,
///    excluding the source peer.
async fn import_and_circulate_approval<
    N: overseer::SubsystemSender<NetworkBridgeTxMessage>,
    A: overseer::SubsystemSender<ApprovalVotingMessage>,
    RA: overseer::SubsystemSender<RuntimeApiMessage>,
>(
    &mut self,
    approval_voting_sender: &mut A,
    network_sender: &mut N,
    runtime_api_sender: &mut RA,
    metrics: &Metrics,
    source: MessageSource,
    vote: IndirectSignedApprovalVoteV2,
    session_info_provider: &mut RuntimeInfo,
) {
    let block_hash = vote.block_hash;
    let validator_index = vote.validator;
    let candidate_indices = &vote.candidate_indices;
    let entry = match self.blocks.get_mut(&block_hash) {
        Some(entry) if entry.contains_candidates(&vote.candidate_indices) => entry,
        _ => {
            // Unknown block, or a block that does not contain the voted candidates.
            if let Some(peer_id) = source.peer_id() {
                if !self.recent_outdated_blocks.is_recent_outdated(&block_hash) {
                    gum::debug!(
                        target: LOG_TARGET,
                        ?peer_id,
                        ?block_hash,
                        ?validator_index,
                        ?candidate_indices,
                        "Approval from a peer is out of view",
                    );
                    modify_reputation(
                        &mut self.reputation,
                        network_sender,
                        peer_id,
                        COST_UNEXPECTED_MESSAGE,
                    )
                    .await;
                    metrics.on_approval_invalid_block();
                } else {
                    // Recently pruned block: count it, but do not punish the peer.
                    metrics.on_approval_recent_outdated();
                }
            }
            return;
        },
    };

    // Knowledge keys for the assignments this vote depends on and for the vote itself.
    let assignments_knowledge_keys = PeerKnowledge::generate_assignments_keys(&vote);
    let approval_knwowledge_key = PeerKnowledge::generate_approval_key(&vote);

    if let Some(peer_id) = source.peer_id() {
        // Knowledge-based filtering; reputation changes happen inside the helper.
        if !Self::check_approval_can_be_processed(
            network_sender,
            &assignments_knowledge_keys,
            &approval_knwowledge_key,
            entry,
            &self.blocks_by_number,
            &self.topologies,
            &self.aggression_config,
            &mut self.reputation,
            peer_id,
            metrics,
        )
        .await
        {
            return;
        }

        let result =
            Self::check_vote_valid(&vote, &entry, session_info_provider, runtime_api_sender)
                .await;

        match result {
            Ok(vote) => {
                // Valid and new: import it, reward the peer and record the knowledge.
                approval_voting_sender
                    .send_message(ApprovalVotingMessage::ImportApproval(vote, None))
                    .await;

                modify_reputation(
                    &mut self.reputation,
                    network_sender,
                    peer_id,
                    BENEFIT_VALID_MESSAGE_FIRST,
                )
                .await;

                entry
                    .knowledge
                    .insert(approval_knwowledge_key.0.clone(), approval_knwowledge_key.1);
                if let Some(peer_knowledge) = entry.known_by.get_mut(&peer_id) {
                    peer_knowledge
                        .received
                        .insert(approval_knwowledge_key.0.clone(), approval_knwowledge_key.1);
                }
            },
            Err(err) => {
                modify_reputation(
                    &mut self.reputation,
                    network_sender,
                    peer_id,
                    COST_INVALID_MESSAGE,
                )
                .await;

                gum::info!(
                    target: LOG_TARGET,
                    ?peer_id,
                    ?err,
                    "Got a bad approval from peer",
                );
                metrics.on_approval_bad();
                return;
            },
        }
    } else {
        // Local vote: no validation, just record it; a known vote is not circulated again.
        if !entry
            .knowledge
            .insert(approval_knwowledge_key.0.clone(), approval_knwowledge_key.1)
        {
            gum::warn!(
                target: LOG_TARGET,
                "Importing locally an already known approval",
            );
            return;
        } else {
            gum::debug!(
                target: LOG_TARGET,
                "Importing locally a new approval",
            );
        }
    }

    // `note_approval` yields the routing to use and the peers the assignment was randomly
    // routed to; the failure branch is logged as a possible bug.
    let (required_routing, peers_randomly_routed_to) = match entry.note_approval(vote.clone()) {
        Ok(required_routing) => required_routing,
        Err(err) => {
            gum::warn!(
                target: LOG_TARGET,
                hash = ?block_hash,
                validator_index = ?vote.validator,
                candidate_bitfield = ?vote.candidate_indices,
                ?err,
                "Possible bug: Vote import failed",
            );
            metrics.on_approval_bug();
            return;
        },
    };

    metrics.on_approval_imported();

    let topology = self.topologies.get_topology(entry.session);
    let source_peer = source.peer_id();

    // A peer receives the approval if it is not the source and is either selected by the
    // required routing of the topology or was randomly routed the assignment.
    let peer_filter = move |peer| {
        if Some(peer) == source_peer.as_ref() {
            return false;
        }

        let in_topology = topology
            .map_or(false, |t| t.local_grid_neighbors().route_to_peer(required_routing, peer));
        in_topology || peers_randomly_routed_to.contains(peer)
    };

    // Candidates are the peers that know the block; peers without an entry in `peer_views`
    // are dropped because their protocol version is needed for sending.
    let peers = entry
        .known_by
        .iter()
        .filter(|(p, _)| peer_filter(p))
        .filter_map(|(p, _)| self.peer_views.get(p).map(|entry| (*p, entry.version)))
        .collect::<Vec<_>>();

    // Record the approval as sent for every selected peer.
    for peer in peers.iter() {
        if let Some(entry) = entry.known_by.get_mut(&peer.0) {
            entry.sent.insert(approval_knwowledge_key.0.clone(), approval_knwowledge_key.1);
        }
    }

    if !peers.is_empty() {
        let approvals = vec![vote];
        gum::trace!(
            target: LOG_TARGET,
            ?block_hash,
            local = source.peer_id().is_none(),
            num_peers = peers.len(),
            "Sending an approval to peers",
        );
        send_approvals_batched(network_sender, approvals, &peers).await;
    }
}
2000
2001 async fn check_vote_valid<RA: overseer::SubsystemSender<RuntimeApiMessage>>(
2003 vote: &IndirectSignedApprovalVoteV2,
2004 entry: &BlockEntry,
2005 runtime_info: &mut RuntimeInfo,
2006 runtime_api_sender: &mut RA,
2007 ) -> Result<CheckedIndirectSignedApprovalVote, InvalidVoteError> {
2008 if vote.candidate_indices.len() > entry.candidates_metadata.len() {
2009 return Err(InvalidVoteError::CandidateIndexOutOfBounds);
2010 }
2011
2012 let candidate_hashes = vote
2013 .candidate_indices
2014 .iter_ones()
2015 .flat_map(|candidate_index| {
2016 entry
2017 .candidates_metadata
2018 .get(candidate_index)
2019 .map(|(candidate_hash, _, _)| *candidate_hash)
2020 })
2021 .collect::<Vec<_>>();
2022
2023 let ExtendedSessionInfo { ref session_info, .. } = runtime_info
2024 .get_session_info_by_index(runtime_api_sender, vote.block_hash, entry.session)
2025 .await
2026 .map_err(|err| InvalidVoteError::SessionInfoNotFound(err))?;
2027
2028 let pubkey = session_info
2029 .validators
2030 .get(vote.validator)
2031 .ok_or(InvalidVoteError::ValidatorIndexOutOfBounds)?;
2032 DisputeStatement::Valid(ValidDisputeStatementKind::ApprovalCheckingMultipleCandidates(
2033 candidate_hashes.clone(),
2034 ))
2035 .check_signature(
2036 &pubkey,
2037 *candidate_hashes.first().ok_or(InvalidVoteError::CandidateHashNotFound)?,
2038 entry.session,
2039 &vote.signature,
2040 )
2041 .map_err(|_| InvalidVoteError::InvalidSignature)
2042 .map(|_| CheckedIndirectSignedApprovalVote::from_checked(vote.clone()))
2043 }
2044
2045 fn get_approval_signatures(
2047 &mut self,
2048 indices: HashSet<(Hash, CandidateIndex)>,
2049 ) -> HashMap<ValidatorIndex, (Hash, Vec<CandidateIndex>, ValidatorSignature)> {
2050 let mut all_sigs = HashMap::new();
2051 for (hash, index) in indices {
2052 let block_entry = match self.blocks.get(&hash) {
2053 None => {
2054 gum::debug!(
2055 target: LOG_TARGET,
2056 ?hash,
2057 "`get_approval_signatures`: could not find block entry for given hash!"
2058 );
2059 continue;
2060 },
2061 Some(e) => e,
2062 };
2063
2064 let sigs = block_entry.approval_votes(index).into_iter().map(|approval| {
2065 (
2066 approval.validator,
2067 (
2068 hash,
2069 approval
2070 .candidate_indices
2071 .iter_ones()
2072 .map(|val| val as CandidateIndex)
2073 .collect_vec(),
2074 approval.signature,
2075 ),
2076 )
2077 });
2078 all_sigs.extend(sigs);
2079 }
2080 all_sigs
2081 }
2082
/// Sends `peer_id` the assignments and approvals it is missing for the blocks in its `view`.
///
/// For every head in `view` the chain is walked backwards through `parent_hash` for as long
/// as a block entry exists in `entries` and its number is above the view's finalized number.
/// For each visited block the peer is registered in `known_by`, and every approval entry that
/// passes the routing filter contributes its assignment and approvals, unless the peer's
/// knowledge already contains them. Everything collected is sent in two batched calls at the
/// end, assignments first.
///
/// With `retry_known_blocks == false` the walk for a head stops at the first block the peer
/// is already registered for; with `true` such blocks are processed again.
async fn unify_with_peer(
    sender: &mut impl overseer::SubsystemSender<NetworkBridgeTxMessage>,
    metrics: &Metrics,
    entries: &mut HashMap<Hash, BlockEntry>,
    topologies: &SessionGridTopologies,
    total_peers: usize,
    peer_id: PeerId,
    protocol_version: ProtocolVersion,
    view: View,
    rng: &mut (impl CryptoRng + Rng),
    retry_known_blocks: bool,
) {
    metrics.on_unify_with_peer();
    // Kept alive until the end of the function; presumably records the elapsed time when
    // dropped - NOTE(review): confirm against `Metrics::time_unify_with_peer`.
    let _timer = metrics.time_unify_with_peer();

    let mut assignments_to_send = Vec::new();
    let mut approvals_to_send = Vec::new();

    // Read before `view` is consumed by `into_iter` below.
    let view_finalized_number = view.finalized_number;
    for head in view.into_iter() {
        let mut block = head;

        // Walk back towards the peer's finalized block.
        loop {
            let entry = match entries.get_mut(&block) {
                Some(entry) if entry.number > view_finalized_number => entry,
                _ => break,
            };

            // Peer already registered for this block: stop unless a retry was requested.
            if entry.known_by.contains_key(&peer_id) && !retry_known_blocks {
                break;
            }

            // Registers the peer for this block if it was not registered yet.
            let peer_knowledge = entry.known_by.entry(peer_id).or_default();
            let topology = topologies.get_topology(entry.session);

            for approval_entry in entry.approval_entries.values_mut() {
                // Routing decision, scoped so the mutable borrow of the routing info ends
                // before the approval entry is read again below.
                {
                    let required_routing = approval_entry.routing_info().required_routing;
                    let routing_info = &mut approval_entry.routing_info_mut();
                    let rng = &mut *rng;
                    // Accept the peer if the required routing selects it; otherwise, if it
                    // is a validator of the session topology, fall back to random sampling
                    // and remember a positive sample on the routing info.
                    let mut peer_filter = move |peer_id| {
                        let in_topology = topology.as_ref().map_or(false, |t| {
                            t.local_grid_neighbors().route_to_peer(required_routing, peer_id)
                        });
                        in_topology || {
                            if !topology
                                .map(|topology| topology.is_validator(peer_id))
                                .unwrap_or(false)
                            {
                                return false;
                            }

                            let route_random =
                                routing_info.random_routing.sample(total_peers, rng);
                            if route_random {
                                routing_info.mark_randomly_sent(*peer_id);
                            }

                            route_random
                        }
                    };

                    if !peer_filter(&peer_id) {
                        continue;
                    }
                }

                let assignment_message = approval_entry.assignment();
                let approval_messages = approval_entry.approvals();
                let (assignment_knowledge, message_kind) =
                    approval_entry.create_assignment_knowledge(block);

                // Queue the assignment only if the peer's knowledge does not contain it yet.
                if !peer_knowledge.contains(&assignment_knowledge, message_kind) {
                    peer_knowledge.sent.insert(assignment_knowledge, message_kind);
                    assignments_to_send.push(assignment_message);
                }

                // Same rule for every approval attached to this assignment.
                for approval_message in approval_messages {
                    let approval_knowledge =
                        PeerKnowledge::generate_approval_key(&approval_message);

                    if !peer_knowledge.contains(&approval_knowledge.0, approval_knowledge.1) {
                        approvals_to_send.push(approval_message);
                        peer_knowledge.sent.insert(approval_knowledge.0, approval_knowledge.1);
                    }
                }
            }

            block = entry.parent_hash;
        }
    }

    if !assignments_to_send.is_empty() {
        gum::trace!(
            target: LOG_TARGET,
            ?peer_id,
            ?protocol_version,
            num = assignments_to_send.len(),
            "Sending assignments to unified peer",
        );

        send_assignments_batched(
            sender,
            assignments_to_send,
            &vec![(peer_id, protocol_version)],
        )
        .await;
    }

    if !approvals_to_send.is_empty() {
        gum::trace!(
            target: LOG_TARGET,
            ?peer_id,
            ?protocol_version,
            num = approvals_to_send.len(),
            "Sending approvals to unified peer",
        );

        send_approvals_batched(sender, approvals_to_send, &vec![(peer_id, protocol_version)])
            .await;
    }
}
2218
/// Re-evaluates aggression and, if it is triggered, re-propagates messages.
///
/// The "age" used throughout is the distance between the lowest and the highest block number
/// in `blocks_by_number`. Nothing happens if no blocks are tracked or if
/// `aggression_config.should_trigger_aggression(age)` is `false`.
///
/// Two passes over `adjust_required_routing_and_propagate` follow:
/// 1. A resend pass (only with `resend == Resend::Yes`) that clears the `sent` knowledge of
///    every peer for the selected blocks and leaves the required routing unchanged.
/// 2. A routing pass over the oldest block(s) that widens the required routing according to
///    the `l1_threshold` / `l2_threshold` settings.
async fn enable_aggression<N: overseer::SubsystemSender<NetworkBridgeTxMessage>>(
    &mut self,
    network_sender: &mut N,
    resend: Resend,
    metrics: &Metrics,
) {
    let config = self.aggression_config.clone();
    // The map is ordered by block number: first key is the lowest, last key the highest.
    let min_age = self.blocks_by_number.iter().next().map(|(num, _)| num);
    let max_age = self.blocks_by_number.iter().rev().next().map(|(num, _)| num);

    // Nothing to do without at least one tracked block.
    let (min_age, max_age) = match (min_age, max_age) {
        (Some(min), Some(max)) => (*min, *max),
        _ => return,
    };

    let age = max_age.saturating_sub(min_age);

    if !self.aggression_config.should_trigger_aggression(age) {
        gum::trace!(
            target: LOG_TARGET,
            approval_checking_lag = self.approval_checking_lag,
            age,
            "Aggression not enabled",
        );
        return;
    }
    gum::debug!(target: LOG_TARGET, min_age, max_age, "Aggression enabled",);

    // Pass 1: resend. The routing modifier is the identity, so only the block filter matters.
    adjust_required_routing_and_propagate(
        network_sender,
        &mut self.blocks,
        &self.topologies,
        |block_entry| {
            let block_age = max_age - block_entry.number;
            // Zero only for blocks at the lowest tracked number.
            let diff_from_min_age = block_entry.number - min_age;

            // Distance, in blocks, since this block was last resent (if it ever was).
            let blocks_since_last_sent = block_entry
                .last_resent_at_block_number
                .map(|last_resent_at_block_number| max_age - last_resent_at_block_number);

            // Back-off: after a resend, wait at least two resend periods before the next
            // one. If either value is absent there is no restriction.
            let can_resend_at_this_age = blocks_since_last_sent
                .zip(config.resend_unfinalized_period)
                .map(|(blocks_since_last_sent, unfinalized_period)| {
                    blocks_since_last_sent >= unfinalized_period * 2
                })
                .unwrap_or(true);

            // Resend only when requested, for the oldest block(s), on a non-zero multiple of
            // the configured period and outside the back-off window.
            if resend == Resend::Yes &&
                config.resend_unfinalized_period.as_ref().map_or(false, |p| {
                    block_age > 0 &&
                        block_age % p == 0 && diff_from_min_age == 0 &&
                        can_resend_at_this_age
                }) {
                // Forget what was sent to every peer for this block.
                // NOTE(review): presumably this makes the propagation below treat the
                // messages as not yet sent - confirm against `PeerKnowledge::contains`.
                for (_, knowledge) in block_entry.known_by.iter_mut() {
                    knowledge.sent = Knowledge::default();
                }
                block_entry.last_resent_at_block_number = Some(max_age);
                gum::debug!(
                    target: LOG_TARGET,
                    block_number = ?block_entry.number,
                    ?max_age,
                    "Aggression enabled with resend for block",
                );
                true
            } else {
                false
            }
        },
        |required_routing, _, _| *required_routing,
        &self.peer_views,
    )
    .await;

    // Pass 2: widen the required routing, but only for the oldest block(s).
    adjust_required_routing_and_propagate(
        network_sender,
        &mut self.blocks,
        &self.topologies,
        |block_entry| {
            block_entry.number == min_age
        },
        |required_routing, local, _| {
            // Entries still waiting for a topology are left untouched.
            if *required_routing == RequiredRouting::PendingTopology {
                gum::debug!(
                    target: LOG_TARGET,
                    lag = ?self.approval_checking_lag,
                    "Encountered old block pending gossip topology",
                );
                return *required_routing;
            }

            let mut new_required_routing = *required_routing;

            // L1: messages that originated locally are routed to `All`.
            if config.l1_threshold.as_ref().map_or(false, |t| &age >= t) {
                if local && new_required_routing != RequiredRouting::All {
                    metrics.on_aggression_l1();
                    new_required_routing = RequiredRouting::All;
                }
            }

            // L2: messages that did not originate locally are routed to `GridXY`.
            if config.l2_threshold.as_ref().map_or(false, |t| &age >= t) {
                if !local && new_required_routing != RequiredRouting::GridXY {
                    metrics.on_aggression_l2();
                    new_required_routing = RequiredRouting::GridXY;
                }
            }
            new_required_routing
        },
        &self.peer_views,
    )
    .await;
}
2358
2359 async fn sanitize_v2_assignments(
2362 &mut self,
2363 peer_id: PeerId,
2364 sender: &mut impl overseer::SubsystemSender<NetworkBridgeTxMessage>,
2365 assignments: Vec<(IndirectAssignmentCertV2, CandidateBitfield)>,
2366 ) -> Vec<(IndirectAssignmentCertV2, CandidateBitfield)> {
2367 let mut sanitized_assignments = Vec::new();
2368 for (cert, candidate_bitfield) in assignments.into_iter() {
2369 let cert_bitfield_bits = match &cert.cert.kind {
2370 AssignmentCertKindV2::RelayVRFDelay { core_index } => core_index.0 as usize + 1,
2371 AssignmentCertKindV2::RelayVRFModulo { .. } => candidate_bitfield.len(),
2375 AssignmentCertKindV2::RelayVRFModuloCompact { core_bitfield } => {
2376 core_bitfield.len()
2377 },
2378 };
2379
2380 let candidate_bitfield_bits = candidate_bitfield.len();
2381
2382 let msb = candidate_bitfield_bits - 1;
2384
2385 if cert_bitfield_bits > MAX_BITFIELD_SIZE
2387 || candidate_bitfield_bits > MAX_BITFIELD_SIZE
2388 || !candidate_bitfield.bit_at(msb.as_bit_index())
2390 {
2391 modify_reputation(&mut self.reputation, sender, peer_id, COST_OVERSIZED_BITFIELD)
2393 .await;
2394 for candidate_index in candidate_bitfield.iter_ones() {
2395 gum::debug!(target: LOG_TARGET, block_hash = ?cert.block_hash, ?candidate_index, validator_index = ?cert.validator, "Bad assignment v2, oversized bitfield");
2396 }
2397 } else {
2398 sanitized_assignments.push((cert, candidate_bitfield))
2399 }
2400 }
2401
2402 sanitized_assignments
2403 }
2404
2405 async fn sanitize_v2_approvals(
2407 &mut self,
2408 peer_id: PeerId,
2409 sender: &mut impl overseer::SubsystemSender<NetworkBridgeTxMessage>,
2410 approval: Vec<IndirectSignedApprovalVoteV2>,
2411 ) -> Vec<IndirectSignedApprovalVoteV2> {
2412 let mut sanitized_approvals = Vec::new();
2413 for approval in approval.into_iter() {
2414 let has_no_approved_candidates = approval.candidate_indices.first_one().is_none();
2415 if approval.candidate_indices.len() as usize > MAX_BITFIELD_SIZE ||
2416 has_no_approved_candidates
2417 {
2418 modify_reputation(
2420 &mut self.reputation,
2421 sender,
2422 peer_id,
2423 if has_no_approved_candidates {
2424 COST_INVALID_MESSAGE
2425 } else {
2426 COST_OVERSIZED_BITFIELD
2427 },
2428 )
2429 .await;
2430 gum::debug!(
2431 target: LOG_TARGET,
2432 block_hash = ?approval.block_hash,
2433 candidate_indices_len = ?approval.candidate_indices.len(),
2434 "Bad approval v2, invalid candidate indices size"
2435 );
2436 } else {
2437 sanitized_approvals.push(approval)
2438 }
2439 }
2440
2441 sanitized_approvals
2442 }
2443}
2444
/// Updates the required routing of approval entries and sends out whatever the new routing
/// makes necessary.
///
/// For every block accepted by `block_filter` that has a known session topology, each
/// approval entry gets the routing returned by `routing_modifier` (called with the current
/// required routing, the entry's `local` flag and its validator index). Then, for every peer
/// that knows the block and is selected by the updated routing, the assignment and its
/// approvals are queued unless the peer's knowledge already contains them, and are recorded
/// as sent.
///
/// The queued messages are sent per peer at the end, all assignments before all approvals.
/// Peers without an entry in `peer_views` are skipped with a warning.
///
/// `block_filter` receives the block entry mutably and may therefore modify it.
#[overseer::contextbounds(ApprovalDistribution, prefix = self::overseer)]
async fn adjust_required_routing_and_propagate<
    N: overseer::SubsystemSender<NetworkBridgeTxMessage>,
    BlockFilter,
    RoutingModifier,
>(
    network_sender: &mut N,
    blocks: &mut HashMap<Hash, BlockEntry>,
    topologies: &SessionGridTopologies,
    block_filter: BlockFilter,
    routing_modifier: RoutingModifier,
    peer_views: &HashMap<PeerId, PeerEntry>,
) where
    BlockFilter: Fn(&mut BlockEntry) -> bool,
    RoutingModifier: Fn(&RequiredRouting, bool, &ValidatorIndex) -> RequiredRouting,
{
    // Messages queued per destination peer.
    let mut peer_assignments = HashMap::new();
    let mut peer_approvals = HashMap::new();

    for (block_hash, block_entry) in blocks {
        if !block_filter(block_entry) {
            continue;
        }

        // Blocks whose session has no topology are skipped entirely.
        let topology = match topologies.get_topology(block_entry.session) {
            Some(t) => t,
            None => continue,
        };

        for approval_entry in block_entry.approval_entries.values_mut() {
            let new_required_routing = routing_modifier(
                &approval_entry.routing_info().required_routing,
                approval_entry.routing_info().local,
                &approval_entry.validator_index(),
            );

            approval_entry.update_required_routing(new_required_routing);

            // Nothing to send for an entry whose routing is empty.
            if approval_entry.routing_info().required_routing.is_empty() {
                continue;
            }

            let assignment_message = approval_entry.assignment();
            let approval_messages = approval_entry.approvals();
            let (assignment_knowledge, message_kind) =
                approval_entry.create_assignment_knowledge(*block_hash);

            for (peer, peer_knowledge) in &mut block_entry.known_by {
                // Only peers selected by the (possibly updated) required routing.
                if !topology
                    .local_grid_neighbors()
                    .route_to_peer(approval_entry.routing_info().required_routing, peer)
                {
                    continue;
                }

                // Queue the assignment only if the peer's knowledge does not contain it yet.
                if !peer_knowledge.contains(&assignment_knowledge, message_kind) {
                    peer_knowledge.sent.insert(assignment_knowledge.clone(), message_kind);
                    peer_assignments
                        .entry(*peer)
                        .or_insert_with(Vec::new)
                        .push(assignment_message.clone());
                }

                // Same rule for every approval attached to this assignment.
                for approval_message in &approval_messages {
                    let approval_knowledge = PeerKnowledge::generate_approval_key(approval_message);

                    if !peer_knowledge.contains(&approval_knowledge.0, approval_knowledge.1) {
                        peer_knowledge.sent.insert(approval_knowledge.0, approval_knowledge.1);
                        peer_approvals
                            .entry(*peer)
                            .or_insert_with(Vec::new)
                            .push(approval_message.clone());
                    }
                }
            }
        }
    }

    // Assignments go out first, so that they precede the approvals referring to them.
    for (peer, assignments_packet) in peer_assignments {
        if let Some(peer_view) = peer_views.get(&peer) {
            send_assignments_batched(
                network_sender,
                assignments_packet,
                &vec![(peer, peer_view.version)],
            )
            .await;
        } else {
            // The protocol version is needed for sending; without it the packet is dropped.
            gum::warn!(target: LOG_TARGET, ?peer, "Unknown protocol version for peer",);
        }
    }

    for (peer, approvals_packet) in peer_approvals {
        if let Some(peer_view) = peer_views.get(&peer) {
            send_approvals_batched(
                network_sender,
                approvals_packet,
                &vec![(peer, peer_view.version)],
            )
            .await;
        } else {
            gum::warn!(target: LOG_TARGET, ?peer, "Unknown protocol version for peer",);
        }
    }
}
2570
2571async fn modify_reputation(
2573 reputation: &mut ReputationAggregator,
2574 sender: &mut impl overseer::SubsystemSender<NetworkBridgeTxMessage>,
2575 peer_id: PeerId,
2576 rep: Rep,
2577) {
2578 gum::trace!(
2579 target: LOG_TARGET,
2580 reputation = ?rep,
2581 ?peer_id,
2582 "Reputation change for peer",
2583 );
2584 reputation.modify(sender, peer_id, rep).await;
2585}
2586
2587#[overseer::contextbounds(ApprovalDistribution, prefix = self::overseer)]
2588impl ApprovalDistribution {
2589 pub fn new(
2591 metrics: Metrics,
2592 slot_duration_millis: u64,
2593 assignment_criteria: Arc<dyn AssignmentCriteria + Send + Sync>,
2594 ) -> Self {
2595 Self::new_with_clock(
2596 metrics,
2597 slot_duration_millis,
2598 Arc::new(SystemClock),
2599 assignment_criteria,
2600 )
2601 }
2602
2603 pub fn new_with_clock(
2605 metrics: Metrics,
2606 slot_duration_millis: u64,
2607 clock: Arc<dyn Clock + Send + Sync>,
2608 assignment_criteria: Arc<dyn AssignmentCriteria + Send + Sync>,
2609 ) -> Self {
2610 Self { metrics, slot_duration_millis, clock, assignment_criteria }
2611 }
2612
2613 async fn run<Context>(self, ctx: Context) {
2614 let mut state =
2615 State { slot_duration_millis: self.slot_duration_millis, ..Default::default() };
2616 let mut rng = rand::rngs::StdRng::from_entropy();
2619 let mut session_info_provider = RuntimeInfo::new_with_config(RuntimeInfoConfig {
2620 keystore: None,
2621 session_cache_lru_size: DISPUTE_WINDOW.get(),
2622 });
2623
2624 self.run_inner(
2625 ctx,
2626 &mut state,
2627 REPUTATION_CHANGE_INTERVAL,
2628 &mut rng,
2629 &mut session_info_provider,
2630 )
2631 .await
2632 }
2633
2634 async fn run_inner<Context>(
2636 self,
2637 mut ctx: Context,
2638 state: &mut State,
2639 reputation_interval: Duration,
2640 rng: &mut (impl CryptoRng + Rng),
2641 session_info_provider: &mut RuntimeInfo,
2642 ) {
2643 let new_reputation_delay = || futures_timer::Delay::new(reputation_interval).fuse();
2644 let mut reputation_delay = new_reputation_delay();
2645 let mut approval_voting_sender = ctx.sender().clone();
2646 let mut network_sender = ctx.sender().clone();
2647 let mut runtime_api_sender = ctx.sender().clone();
2648
2649 loop {
2650 select! {
2651 _ = reputation_delay => {
2652 state.reputation.send(ctx.sender()).await;
2653 reputation_delay = new_reputation_delay();
2654 },
2655 message = ctx.recv().fuse() => {
2656 let message = match message {
2657 Ok(message) => message,
2658 Err(e) => {
2659 gum::debug!(target: LOG_TARGET, err = ?e, "Failed to receive a message from Overseer, exiting");
2660 return
2661 },
2662 };
2663
2664 if self.handle_from_orchestra(message, &mut approval_voting_sender, &mut network_sender, &mut runtime_api_sender, state, rng, session_info_provider).await {
2665 return;
2666 }
2667
2668 },
2669 }
2670 }
2671 }
2672
2673 pub async fn handle_from_orchestra<
2677 N: overseer::SubsystemSender<NetworkBridgeTxMessage>,
2678 A: overseer::SubsystemSender<ApprovalVotingMessage>,
2679 RA: overseer::SubsystemSender<RuntimeApiMessage>,
2680 >(
2681 &self,
2682 message: FromOrchestra<ApprovalDistributionMessage>,
2683 approval_voting_sender: &mut A,
2684 network_sender: &mut N,
2685 runtime_api_sender: &mut RA,
2686 state: &mut State,
2687 rng: &mut (impl CryptoRng + Rng),
2688 session_info_provider: &mut RuntimeInfo,
2689 ) -> bool {
2690 match message {
2691 FromOrchestra::Communication { msg } => {
2692 Self::handle_incoming(
2693 approval_voting_sender,
2694 network_sender,
2695 runtime_api_sender,
2696 state,
2697 msg,
2698 &self.metrics,
2699 rng,
2700 self.assignment_criteria.as_ref(),
2701 self.clock.as_ref(),
2702 session_info_provider,
2703 )
2704 .await
2705 },
2706 FromOrchestra::Signal(OverseerSignal::ActiveLeaves(_update)) => {
2707 gum::trace!(target: LOG_TARGET, "active leaves signal (ignored)");
2708 },
2713 FromOrchestra::Signal(OverseerSignal::BlockFinalized(_hash, number)) => {
2714 gum::trace!(target: LOG_TARGET, number = %number, "finalized signal");
2715 state.handle_block_finalized(network_sender, &self.metrics, number).await;
2716 },
2717 FromOrchestra::Signal(OverseerSignal::Conclude) => return true,
2718 }
2719 false
2720 }
2721
2722 async fn handle_incoming<
2723 N: overseer::SubsystemSender<NetworkBridgeTxMessage>,
2724 A: overseer::SubsystemSender<ApprovalVotingMessage>,
2725 RA: overseer::SubsystemSender<RuntimeApiMessage>,
2726 >(
2727 approval_voting_sender: &mut A,
2728 network_sender: &mut N,
2729 runtime_api_sender: &mut RA,
2730 state: &mut State,
2731 msg: ApprovalDistributionMessage,
2732 metrics: &Metrics,
2733 rng: &mut (impl CryptoRng + Rng),
2734 assignment_criteria: &(impl AssignmentCriteria + ?Sized),
2735 clock: &(impl Clock + ?Sized),
2736 session_info_provider: &mut RuntimeInfo,
2737 ) {
2738 match msg {
2739 ApprovalDistributionMessage::NetworkBridgeUpdate(event) => {
2740 state
2741 .handle_network_msg(
2742 approval_voting_sender,
2743 network_sender,
2744 runtime_api_sender,
2745 metrics,
2746 event,
2747 rng,
2748 assignment_criteria,
2749 clock,
2750 session_info_provider,
2751 )
2752 .await;
2753 },
2754 ApprovalDistributionMessage::NewBlocks(metas) => {
2755 state
2756 .handle_new_blocks(
2757 approval_voting_sender,
2758 network_sender,
2759 runtime_api_sender,
2760 metrics,
2761 metas,
2762 rng,
2763 assignment_criteria,
2764 clock,
2765 session_info_provider,
2766 )
2767 .await;
2768 },
2769 ApprovalDistributionMessage::DistributeAssignment(cert, candidate_indices) => {
2770 gum::debug!(
2771 target: LOG_TARGET,
2772 ?candidate_indices,
2773 block_hash = ?cert.block_hash,
2774 assignment_kind = ?cert.cert.kind,
2775 "Distributing our assignment on candidates",
2776 );
2777
2778 state
2779 .import_and_circulate_assignment(
2780 approval_voting_sender,
2781 network_sender,
2782 runtime_api_sender,
2783 &metrics,
2784 MessageSource::Local,
2785 cert,
2786 candidate_indices,
2787 rng,
2788 assignment_criteria,
2789 clock,
2790 session_info_provider,
2791 )
2792 .await;
2793 },
2794 ApprovalDistributionMessage::DistributeApproval(vote) => {
2795 gum::debug!(
2796 target: LOG_TARGET,
2797 "Distributing our approval vote on candidate (block={}, index={:?})",
2798 vote.block_hash,
2799 vote.candidate_indices,
2800 );
2801
2802 state
2803 .import_and_circulate_approval(
2804 approval_voting_sender,
2805 network_sender,
2806 runtime_api_sender,
2807 metrics,
2808 MessageSource::Local,
2809 vote,
2810 session_info_provider,
2811 )
2812 .await;
2813 },
2814 ApprovalDistributionMessage::GetApprovalSignatures(indices, tx) => {
2815 let sigs = state.get_approval_signatures(indices);
2816 if let Err(_) = tx.send(sigs) {
2817 gum::debug!(
2818 target: LOG_TARGET,
2819 "Sending back approval signatures failed, oneshot got closed"
2820 );
2821 }
2822 },
2823 ApprovalDistributionMessage::ApprovalCheckingLagUpdate(lag) => {
2824 gum::debug!(target: LOG_TARGET, lag, "Received `ApprovalCheckingLagUpdate`");
2825 state.approval_checking_lag = lag;
2826 },
2827 }
2828 }
2829}
2830
2831#[overseer::subsystem(ApprovalDistribution, error=SubsystemError, prefix=self::overseer)]
2832impl<Context> ApprovalDistribution {
2833 fn start(self, ctx: Context) -> SpawnedSubsystem {
2834 let future = self.run(ctx).map(|_| Ok(())).boxed();
2835
2836 SpawnedSubsystem { name: "approval-distribution-subsystem", future }
2837 }
2838}
2839
/// Returns `size` unchanged, rejecting a value of zero.
///
/// Intended for `const` initializers: there, the panic below surfaces as a compile-time
/// error instead of letting a zero batch size through.
///
/// # Panics
///
/// Panics if `size` is `0`.
const fn ensure_size_not_zero(size: usize) -> usize {
    match size {
        0 => {
            panic!("Batch size must be at least 1 (MAX_NOTIFICATION_SIZE constant is too low)",)
        },
        non_zero => non_zero,
    }
}
2848
/// Maximum number of assignments placed into a single network message by
/// `send_assignments_batched`.
///
/// Computed as `MAX_NOTIFICATION_SIZE` divided by the in-memory size (`std::mem::size_of`) of
/// one `(IndirectAssignmentCertV2, CandidateIndex)` tuple, divided again by 3, so a batch
/// takes roughly a third of what would fit by that estimate. `ensure_size_not_zero` turns a
/// zero result into a compile-time failure.
///
/// NOTE(review): the estimate is sized on a `CandidateIndex` tuple while the batches actually
/// carry `(IndirectAssignmentCertV2, CandidateBitfield)` items, and `size_of` is not the
/// encoded size. Confirm the safety margin is still adequate.
pub const MAX_ASSIGNMENT_BATCH_SIZE: usize = ensure_size_not_zero(
    MAX_NOTIFICATION_SIZE as usize /
        std::mem::size_of::<(IndirectAssignmentCertV2, CandidateIndex)>() /
        3,
);
2858
/// Maximum number of approvals placed into a single network message by
/// `send_approvals_batched`.
///
/// Computed as `MAX_NOTIFICATION_SIZE` divided by the in-memory size (`std::mem::size_of`) of
/// one `IndirectSignedApprovalVoteV2`, divided again by 3, so a batch takes roughly a third
/// of what would fit by that estimate. `ensure_size_not_zero` turns a zero result into a
/// compile-time failure.
pub const MAX_APPROVAL_BATCH_SIZE: usize = ensure_size_not_zero(
    MAX_NOTIFICATION_SIZE as usize / std::mem::size_of::<IndirectSignedApprovalVoteV2>() / 3,
);
2863
2864async fn send_assignments_batched_inner(
2866 sender: &mut impl overseer::SubsystemSender<NetworkBridgeTxMessage>,
2867 batch: impl IntoIterator<Item = (IndirectAssignmentCertV2, CandidateBitfield)>,
2868 peers: Vec<PeerId>,
2869 _peer_version: ValidationVersion,
2870) {
2871 sender
2872 .send_message(NetworkBridgeTxMessage::SendValidationMessage(
2873 peers,
2874 ValidationProtocols::V3(protocol_v3::ValidationProtocol::ApprovalDistribution(
2875 protocol_v3::ApprovalDistributionMessage::Assignments(batch.into_iter().collect()),
2876 )),
2877 ))
2878 .await;
2879}
2880
2881pub(crate) async fn send_assignments_batched(
2887 network_sender: &mut impl overseer::SubsystemSender<NetworkBridgeTxMessage>,
2888 v2_assignments: impl IntoIterator<Item = (IndirectAssignmentCertV2, CandidateBitfield)> + Clone,
2889 peers: &[(PeerId, ProtocolVersion)],
2890) {
2891 let v3_peers = filter_by_peer_version(peers, ValidationVersion::V3.into());
2892
2893 if !v3_peers.is_empty() {
2894 let mut v3 = v2_assignments.into_iter().peekable();
2895
2896 while v3.peek().is_some() {
2897 let batch = v3.by_ref().take(MAX_ASSIGNMENT_BATCH_SIZE).collect::<Vec<_>>();
2898 send_assignments_batched_inner(
2899 network_sender,
2900 batch,
2901 v3_peers.clone(),
2902 ValidationVersion::V3,
2903 )
2904 .await;
2905 }
2906 }
2907}
2908
2909pub(crate) async fn send_approvals_batched(
2911 sender: &mut impl overseer::SubsystemSender<NetworkBridgeTxMessage>,
2912 approvals: impl IntoIterator<Item = IndirectSignedApprovalVoteV2> + Clone,
2913 peers: &[(PeerId, ProtocolVersion)],
2914) {
2915 let v3_peers = filter_by_peer_version(peers, ValidationVersion::V3.into());
2916
2917 if !v3_peers.is_empty() {
2918 let mut batches = approvals.into_iter().peekable();
2919
2920 while batches.peek().is_some() {
2921 let batch: Vec<_> = batches.by_ref().take(MAX_APPROVAL_BATCH_SIZE).collect();
2922
2923 sender
2924 .send_message(NetworkBridgeTxMessage::SendValidationMessage(
2925 v3_peers.clone(),
2926 ValidationProtocols::V3(protocol_v3::ValidationProtocol::ApprovalDistribution(
2927 protocol_v3::ApprovalDistributionMessage::Approvals(batch),
2928 )),
2929 ))
2930 .await;
2931 }
2932 }
2933}