1#![warn(missing_docs)]
25
26use self::metrics::Metrics;
27use futures::{select, FutureExt as _};
28use itertools::Itertools;
29use net_protocol::peer_set::{ProtocolVersion, ValidationVersion};
30use polkadot_node_network_protocol::{
31 self as net_protocol, filter_by_peer_version,
32 grid_topology::{RandomRouting, RequiredRouting, SessionGridTopologies, SessionGridTopology},
33 peer_set::MAX_NOTIFICATION_SIZE,
34 v1 as protocol_v1, v2 as protocol_v2, v3 as protocol_v3, PeerId,
35 UnifiedReputationChange as Rep, Versioned, View,
36};
37use polkadot_node_primitives::{
38 approval::{
39 criteria::{AssignmentCriteria, InvalidAssignment},
40 time::{Clock, ClockExt, SystemClock, TICK_TOO_FAR_IN_FUTURE},
41 v1::{
42 AssignmentCertKind, BlockApprovalMeta, DelayTranche, IndirectAssignmentCert,
43 IndirectSignedApprovalVote, RelayVRFStory,
44 },
45 v2::{
46 AsBitIndex, AssignmentCertKindV2, CandidateBitfield, IndirectAssignmentCertV2,
47 IndirectSignedApprovalVoteV2,
48 },
49 },
50 DISPUTE_WINDOW,
51};
52use polkadot_node_subsystem::{
53 messages::{
54 ApprovalDistributionMessage, ApprovalVotingMessage, CheckedIndirectAssignment,
55 CheckedIndirectSignedApprovalVote, NetworkBridgeEvent, NetworkBridgeTxMessage,
56 RuntimeApiMessage,
57 },
58 overseer, FromOrchestra, OverseerSignal, SpawnedSubsystem, SubsystemError,
59};
60use polkadot_node_subsystem_util::{
61 reputation::{ReputationAggregator, REPUTATION_CHANGE_INTERVAL},
62 runtime::{Config as RuntimeInfoConfig, ExtendedSessionInfo, RuntimeInfo},
63};
64use polkadot_primitives::{
65 BlockNumber, CandidateHash, CandidateIndex, CoreIndex, DisputeStatement, GroupIndex, Hash,
66 SessionIndex, Slot, ValidDisputeStatementKind, ValidatorIndex, ValidatorSignature,
67};
68use rand::{CryptoRng, Rng, SeedableRng};
69use std::{
70 collections::{hash_map, BTreeMap, HashMap, HashSet, VecDeque},
71 sync::Arc,
72 time::Duration,
73};
74
75pub mod metrics;
77
78#[cfg(test)]
79mod tests;
80
81const LOG_TARGET: &str = "parachain::approval-distribution";
82
83const COST_UNEXPECTED_MESSAGE: Rep =
84 Rep::CostMinor("Peer sent an out-of-view assignment or approval");
85const COST_DUPLICATE_MESSAGE: Rep = Rep::CostMinorRepeated("Peer sent identical messages");
86const COST_ASSIGNMENT_TOO_FAR_IN_THE_FUTURE: Rep =
87 Rep::CostMinor("The vote was valid but too far in the future");
88const COST_INVALID_MESSAGE: Rep = Rep::CostMajor("The vote was bad");
89const COST_OVERSIZED_BITFIELD: Rep = Rep::CostMajor("Oversized certificate or candidate bitfield");
90
91const BENEFIT_VALID_MESSAGE: Rep = Rep::BenefitMinor("Peer sent a valid message");
92const BENEFIT_VALID_MESSAGE_FIRST: Rep =
93 Rep::BenefitMinorFirst("Valid message with new information");
94
95const MAX_BITFIELD_SIZE: usize = 500;
97
98pub struct ApprovalDistribution {
100 metrics: Metrics,
101 slot_duration_millis: u64,
102 clock: Arc<dyn Clock + Send + Sync>,
103 assignment_criteria: Arc<dyn AssignmentCriteria + Send + Sync>,
104}
105
106#[derive(Default)]
109struct RecentlyOutdated {
110 buf: VecDeque<Hash>,
111}
112
113impl RecentlyOutdated {
114 fn note_outdated(&mut self, hash: Hash) {
115 const MAX_BUF_LEN: usize = 20;
116
117 self.buf.push_back(hash);
118
119 while self.buf.len() > MAX_BUF_LEN {
120 let _ = self.buf.pop_front();
121 }
122 }
123
124 fn is_recent_outdated(&self, hash: &Hash) -> bool {
125 self.buf.contains(hash)
126 }
127}
128
129struct ApprovalRouting {
131 required_routing: RequiredRouting,
132 local: bool,
133 random_routing: RandomRouting,
134 peers_randomly_routed: Vec<PeerId>,
135}
136
137impl ApprovalRouting {
138 fn mark_randomly_sent(&mut self, peer: PeerId) {
139 self.random_routing.inc_sent();
140 self.peers_randomly_routed.push(peer);
141 }
142}
143
144struct ApprovalEntry {
147 assignment: IndirectAssignmentCertV2,
149 assignment_claimed_candidates: CandidateBitfield,
151 approvals: HashMap<CandidateBitfield, IndirectSignedApprovalVoteV2>,
153 validator_index: ValidatorIndex,
155 routing_info: ApprovalRouting,
157}
158
159#[derive(Debug)]
160enum ApprovalEntryError {
161 InvalidValidatorIndex,
162 CandidateIndexOutOfBounds,
163 InvalidCandidateIndex,
164 DuplicateApproval,
165 UnknownAssignment,
166}
167
168impl ApprovalEntry {
169 pub fn new(
170 assignment: IndirectAssignmentCertV2,
171 candidates: CandidateBitfield,
172 routing_info: ApprovalRouting,
173 ) -> ApprovalEntry {
174 Self {
175 validator_index: assignment.validator,
176 assignment,
177 approvals: HashMap::new(),
178 assignment_claimed_candidates: candidates,
179 routing_info,
180 }
181 }
182
183 pub fn create_assignment_knowledge(&self, block_hash: Hash) -> (MessageSubject, MessageKind) {
185 (
186 MessageSubject(
187 block_hash,
188 self.assignment_claimed_candidates.clone(),
189 self.validator_index,
190 ),
191 MessageKind::Assignment,
192 )
193 }
194
195 pub fn routing_info_mut(&mut self) -> &mut ApprovalRouting {
197 &mut self.routing_info
198 }
199
200 pub fn routing_info(&self) -> &ApprovalRouting {
202 &self.routing_info
203 }
204
205 pub fn update_required_routing(&mut self, required_routing: RequiredRouting) {
207 self.routing_info.required_routing = required_routing;
208 }
209
210 pub fn includes_approval_candidates(&self, approval: &IndirectSignedApprovalVoteV2) -> bool {
212 for candidate_index in approval.candidate_indices.iter_ones() {
213 if self.assignment_claimed_candidates.bit_at((candidate_index).as_bit_index()) {
214 return true
215 }
216 }
217 return false
218 }
219
220 pub fn note_approval(
223 &mut self,
224 approval: IndirectSignedApprovalVoteV2,
225 ) -> Result<(), ApprovalEntryError> {
226 if self.validator_index != approval.validator {
231 return Err(ApprovalEntryError::InvalidValidatorIndex)
232 }
233
234 if !self.includes_approval_candidates(&approval) {
236 return Err(ApprovalEntryError::InvalidCandidateIndex)
237 }
238
239 if self.approvals.contains_key(&approval.candidate_indices) {
240 return Err(ApprovalEntryError::DuplicateApproval)
241 }
242
243 self.approvals.insert(approval.candidate_indices.clone(), approval.clone());
244 Ok(())
245 }
246
247 pub fn assignment(&self) -> (IndirectAssignmentCertV2, CandidateBitfield) {
249 (self.assignment.clone(), self.assignment_claimed_candidates.clone())
250 }
251
252 pub fn approvals(&self) -> Vec<IndirectSignedApprovalVoteV2> {
254 self.approvals.values().cloned().collect::<Vec<_>>()
255 }
256
257 pub fn validator_index(&self) -> ValidatorIndex {
259 self.validator_index
260 }
261}
262
263struct PeerEntry {
265 pub view: View,
266 pub version: ProtocolVersion,
267}
268
269#[derive(Clone)]
291struct AggressionConfig {
292 l1_threshold: Option<BlockNumber>,
294 l2_threshold: Option<BlockNumber>,
297 resend_unfinalized_period: Option<BlockNumber>,
300}
301
302impl AggressionConfig {
303 fn should_trigger_aggression(&self, age: BlockNumber) -> bool {
305 if let Some(t) = self.l1_threshold {
306 age >= t
307 } else if let Some(t) = self.resend_unfinalized_period {
308 age > 0 && age % t == 0
309 } else {
310 false
311 }
312 }
313}
314
315impl Default for AggressionConfig {
316 fn default() -> Self {
317 AggressionConfig {
318 l1_threshold: Some(16),
319 l2_threshold: Some(64),
320 resend_unfinalized_period: Some(8),
321 }
322 }
323}
324
325#[derive(PartialEq)]
326enum Resend {
327 Yes,
328 No,
329}
330
331#[derive(Default)]
336pub struct State {
337 blocks_by_number: BTreeMap<BlockNumber, Vec<Hash>>,
339 blocks: HashMap<Hash, BlockEntry>,
340
341 pending_known: HashMap<Hash, Vec<(PeerId, PendingMessage)>>,
349
350 peer_views: HashMap<PeerId, PeerEntry>,
352
353 topologies: SessionGridTopologies,
355
356 recent_outdated_blocks: RecentlyOutdated,
358
359 aggression_config: AggressionConfig,
361
362 approval_checking_lag: BlockNumber,
364
365 reputation: ReputationAggregator,
367
368 slot_duration_millis: u64,
370}
371
372#[derive(Debug, Clone, Copy, PartialEq, Eq)]
373enum MessageKind {
374 Assignment,
375 Approval,
376}
377
378#[derive(Debug, Clone, Hash, PartialEq, Eq)]
382struct MessageSubject(Hash, pub CandidateBitfield, ValidatorIndex);
383
384#[derive(Debug, Clone, Default)]
385struct Knowledge {
386 known_messages: HashMap<MessageSubject, MessageKind>,
390}
391
392impl Knowledge {
393 fn contains(&self, message: &MessageSubject, kind: MessageKind) -> bool {
394 match (kind, self.known_messages.get(message)) {
395 (_, None) => false,
396 (MessageKind::Assignment, Some(_)) => true,
397 (MessageKind::Approval, Some(MessageKind::Assignment)) => false,
398 (MessageKind::Approval, Some(MessageKind::Approval)) => true,
399 }
400 }
401
402 fn insert(&mut self, message: MessageSubject, kind: MessageKind) -> bool {
403 let mut success = match self.known_messages.entry(message.clone()) {
404 hash_map::Entry::Vacant(vacant) => {
405 vacant.insert(kind);
406 true
409 },
410 hash_map::Entry::Occupied(mut occupied) => match (*occupied.get(), kind) {
411 (MessageKind::Assignment, MessageKind::Assignment) => false,
412 (MessageKind::Approval, MessageKind::Approval) => false,
413 (MessageKind::Approval, MessageKind::Assignment) => false,
414 (MessageKind::Assignment, MessageKind::Approval) => {
415 *occupied.get_mut() = MessageKind::Approval;
416 true
417 },
418 },
419 };
420
421 if kind == MessageKind::Assignment && success && message.1.count_ones() > 1 {
425 for candidate_index in message.1.iter_ones() {
426 success = success &&
427 self.insert(
428 MessageSubject(
429 message.0,
430 vec![candidate_index as u32].try_into().expect("Non-empty vec; qed"),
431 message.2,
432 ),
433 kind,
434 );
435 }
436 }
437 success
438 }
439}
440
441#[derive(Debug, Clone, Default)]
443struct PeerKnowledge {
444 sent: Knowledge,
446 received: Knowledge,
448}
449
450impl PeerKnowledge {
451 fn contains(&self, message: &MessageSubject, kind: MessageKind) -> bool {
452 self.sent.contains(message, kind) || self.received.contains(message, kind)
453 }
454
455 fn generate_assignments_keys(
458 approval: &IndirectSignedApprovalVoteV2,
459 ) -> Vec<(MessageSubject, MessageKind)> {
460 approval
461 .candidate_indices
462 .iter_ones()
463 .map(|candidate_index| {
464 (
465 MessageSubject(
466 approval.block_hash,
467 (candidate_index as CandidateIndex).into(),
468 approval.validator,
469 ),
470 MessageKind::Assignment,
471 )
472 })
473 .collect_vec()
474 }
475
476 fn generate_approval_key(
478 approval: &IndirectSignedApprovalVoteV2,
479 ) -> (MessageSubject, MessageKind) {
480 (
481 MessageSubject(
482 approval.block_hash,
483 approval.candidate_indices.clone(),
484 approval.validator,
485 ),
486 MessageKind::Approval,
487 )
488 }
489}
490
491struct BlockEntry {
493 known_by: HashMap<PeerId, PeerKnowledge>,
496 number: BlockNumber,
498 parent_hash: Hash,
500 knowledge: Knowledge,
502 candidates: Vec<CandidateEntry>,
504 candidates_metadata: Vec<(CandidateHash, CoreIndex, GroupIndex)>,
506 session: SessionIndex,
508 approval_entries: HashMap<(ValidatorIndex, CandidateBitfield), ApprovalEntry>,
511 vrf_story: RelayVRFStory,
513 slot: Slot,
515 last_resent_at_block_number: Option<u32>,
517}
518
519impl BlockEntry {
520 pub fn known_by(&self) -> Vec<PeerId> {
522 self.known_by.keys().cloned().collect::<Vec<_>>()
523 }
524
525 pub fn insert_approval_entry(&mut self, entry: ApprovalEntry) -> &mut ApprovalEntry {
526 for claimed_candidate_index in entry.assignment_claimed_candidates.iter_ones() {
530 match self.candidates.get_mut(claimed_candidate_index) {
531 Some(candidate_entry) => {
532 candidate_entry
533 .assignments
534 .entry(entry.validator_index())
535 .or_insert(entry.assignment_claimed_candidates.clone());
536 },
537 None => {
538 gum::warn!(
541 target: LOG_TARGET,
542 hash = ?entry.assignment.block_hash,
543 ?claimed_candidate_index,
544 "Missing candidate entry on `import_and_circulate_assignment`",
545 );
546 },
547 };
548 }
549
550 self.approval_entries
551 .entry((entry.validator_index, entry.assignment_claimed_candidates.clone()))
552 .or_insert(entry)
553 }
554
555 pub fn contains_candidates(&self, candidate_indices: &CandidateBitfield) -> bool {
557 candidate_indices
558 .iter_ones()
559 .all(|candidate_index| self.candidates.get(candidate_index as usize).is_some())
560 }
561
562 pub fn note_approval(
568 &mut self,
569 approval: IndirectSignedApprovalVoteV2,
570 ) -> Result<(RequiredRouting, HashSet<PeerId>), ApprovalEntryError> {
571 let mut required_routing: Option<RequiredRouting> = None;
572 let mut peers_randomly_routed_to = HashSet::new();
573
574 if self.candidates.len() < approval.candidate_indices.len() as usize {
575 return Err(ApprovalEntryError::CandidateIndexOutOfBounds)
576 }
577
578 let covered_assignments_bitfields: HashSet<CandidateBitfield> = approval
580 .candidate_indices
581 .iter_ones()
582 .filter_map(|candidate_index| {
583 self.candidates.get_mut(candidate_index).map_or(None, |candidate_entry| {
584 candidate_entry.assignments.get(&approval.validator).cloned()
585 })
586 })
587 .collect();
588
589 for assignment_bitfield in covered_assignments_bitfields {
591 if let Some(approval_entry) =
592 self.approval_entries.get_mut(&(approval.validator, assignment_bitfield))
593 {
594 approval_entry.note_approval(approval.clone())?;
595 peers_randomly_routed_to
596 .extend(approval_entry.routing_info().peers_randomly_routed.iter());
597
598 if let Some(current_required_routing) = required_routing {
599 required_routing = Some(
600 current_required_routing
601 .combine(approval_entry.routing_info().required_routing),
602 );
603 } else {
604 required_routing = Some(approval_entry.routing_info().required_routing)
605 }
606 }
607 }
608
609 if let Some(required_routing) = required_routing {
610 Ok((required_routing, peers_randomly_routed_to))
611 } else {
612 Err(ApprovalEntryError::UnknownAssignment)
613 }
614 }
615
616 pub fn approval_votes(
618 &self,
619 candidate_index: CandidateIndex,
620 ) -> Vec<IndirectSignedApprovalVoteV2> {
621 let result: Option<
622 HashMap<(ValidatorIndex, CandidateBitfield), IndirectSignedApprovalVoteV2>,
623 > = self.candidates.get(candidate_index as usize).map(|candidate_entry| {
624 candidate_entry
625 .assignments
626 .iter()
627 .filter_map(|(validator, assignment_bitfield)| {
628 self.approval_entries.get(&(*validator, assignment_bitfield.clone()))
629 })
630 .flat_map(|approval_entry| {
631 approval_entry
632 .approvals
633 .clone()
634 .into_iter()
635 .filter(|(approved_candidates, _)| {
636 approved_candidates.bit_at(candidate_index.as_bit_index())
637 })
638 .map(|(approved_candidates, vote)| {
639 ((approval_entry.validator_index, approved_candidates), vote)
640 })
641 })
642 .collect()
643 });
644
645 result.map(|result| result.into_values().collect_vec()).unwrap_or_default()
646 }
647}
648
649#[derive(Debug, Default)]
653struct CandidateEntry {
654 assignments: HashMap<ValidatorIndex, CandidateBitfield>,
657}
658
659#[derive(Debug, Clone, PartialEq)]
660enum MessageSource {
661 Peer(PeerId),
662 Local,
663}
664
665#[derive(Debug)]
667enum InvalidAssignmentError {
668 #[allow(dead_code)]
670 CryptoCheckFailed(InvalidAssignment),
671 NoClaimedCandidates,
673 #[allow(dead_code)]
675 ClaimedInvalidCandidateIndex {
676 claimed_index: usize,
677 max_index: usize,
678 },
679 OversizedClaimedBitfield,
681 #[allow(dead_code)]
683 SessionInfoNotFound(polkadot_node_subsystem_util::runtime::Error),
684}
685
686#[derive(Debug)]
688enum InvalidVoteError {
689 CandidateIndexOutOfBounds,
691 ValidatorIndexOutOfBounds,
693 InvalidSignature,
695 #[allow(dead_code)]
697 SessionInfoNotFound(polkadot_node_subsystem_util::runtime::Error),
698}
699
700impl MessageSource {
701 fn peer_id(&self) -> Option<PeerId> {
702 match self {
703 Self::Peer(id) => Some(*id),
704 Self::Local => None,
705 }
706 }
707}
708
709enum PendingMessage {
710 Assignment(IndirectAssignmentCertV2, CandidateBitfield),
711 Approval(IndirectSignedApprovalVoteV2),
712}
713
714#[overseer::contextbounds(ApprovalDistribution, prefix = self::overseer)]
715impl State {
716 pub fn with_config(slot_duration_millis: u64) -> Self {
718 Self { slot_duration_millis, ..Default::default() }
719 }
720
721 async fn handle_network_msg<
722 N: overseer::SubsystemSender<NetworkBridgeTxMessage>,
723 A: overseer::SubsystemSender<ApprovalVotingMessage>,
724 RA: overseer::SubsystemSender<RuntimeApiMessage>,
725 >(
726 &mut self,
727 approval_voting_sender: &mut A,
728 network_sender: &mut N,
729 runtime_api_sender: &mut RA,
730 metrics: &Metrics,
731 event: NetworkBridgeEvent<net_protocol::ApprovalDistributionMessage>,
732 rng: &mut (impl CryptoRng + Rng),
733 assignment_criteria: &(impl AssignmentCriteria + ?Sized),
734 clock: &(impl Clock + ?Sized),
735 session_info_provider: &mut RuntimeInfo,
736 ) {
737 match event {
738 NetworkBridgeEvent::PeerConnected(peer_id, role, version, authority_ids) => {
739 gum::trace!(target: LOG_TARGET, ?peer_id, ?role, ?authority_ids, "Peer connected");
740 if let Some(authority_ids) = authority_ids {
741 self.topologies.update_authority_ids(peer_id, &authority_ids);
742 }
743 self.peer_views
745 .entry(peer_id)
746 .or_insert(PeerEntry { view: Default::default(), version });
747 },
748 NetworkBridgeEvent::PeerDisconnected(peer_id) => {
749 gum::trace!(target: LOG_TARGET, ?peer_id, "Peer disconnected");
750 self.peer_views.remove(&peer_id);
751 self.blocks.iter_mut().for_each(|(_hash, entry)| {
752 entry.known_by.remove(&peer_id);
753 })
754 },
755 NetworkBridgeEvent::NewGossipTopology(topology) => {
756 self.handle_new_session_topology(
757 network_sender,
758 topology.session,
759 topology.topology,
760 topology.local_index,
761 )
762 .await;
763 },
764 NetworkBridgeEvent::PeerViewChange(peer_id, view) => {
765 self.handle_peer_view_change(network_sender, metrics, peer_id, view, rng).await;
766 },
767 NetworkBridgeEvent::OurViewChange(view) => {
768 gum::trace!(target: LOG_TARGET, ?view, "Own view change");
769 for head in view.iter() {
770 if !self.blocks.contains_key(head) {
771 self.pending_known.entry(*head).or_default();
772 }
773 }
774
775 self.pending_known.retain(|h, _| {
776 let live = view.contains(h);
777 if !live {
778 gum::trace!(
779 target: LOG_TARGET,
780 block_hash = ?h,
781 "Cleaning up stale pending messages",
782 );
783 }
784 live
785 });
786 },
787 NetworkBridgeEvent::PeerMessage(peer_id, message) => {
788 self.process_incoming_peer_message(
789 approval_voting_sender,
790 network_sender,
791 runtime_api_sender,
792 metrics,
793 peer_id,
794 message,
795 rng,
796 assignment_criteria,
797 clock,
798 session_info_provider,
799 )
800 .await;
801 },
802 NetworkBridgeEvent::UpdatedAuthorityIds(peer_id, authority_ids) => {
803 gum::debug!(target: LOG_TARGET, ?peer_id, ?authority_ids, "Update Authority Ids");
804 if self.topologies.update_authority_ids(peer_id, &authority_ids) {
807 if let Some(PeerEntry { view, version }) = self.peer_views.get(&peer_id) {
808 let intersection = self
809 .blocks_by_number
810 .iter()
811 .filter(|(block_number, _)| *block_number > &view.finalized_number)
812 .flat_map(|(_, hashes)| {
813 hashes.iter().filter(|hash| {
814 self.blocks
815 .get(&hash)
816 .map(|block| block.known_by.get(&peer_id).is_some())
817 .unwrap_or_default()
818 })
819 });
820 let view_intersection =
821 View::new(intersection.cloned(), view.finalized_number);
822 Self::unify_with_peer(
823 network_sender,
824 metrics,
825 &mut self.blocks,
826 &self.topologies,
827 self.peer_views.len(),
828 peer_id,
829 *version,
830 view_intersection,
831 rng,
832 true,
833 )
834 .await;
835 }
836 }
837 },
838 }
839 }
840
841 async fn handle_new_blocks<
842 N: overseer::SubsystemSender<NetworkBridgeTxMessage>,
843 A: overseer::SubsystemSender<ApprovalVotingMessage>,
844 RA: overseer::SubsystemSender<RuntimeApiMessage>,
845 >(
846 &mut self,
847 approval_voting_sender: &mut A,
848 network_sender: &mut N,
849 runtime_api_sender: &mut RA,
850 metrics: &Metrics,
851 metas: Vec<BlockApprovalMeta>,
852 rng: &mut (impl CryptoRng + Rng),
853 assignment_criteria: &(impl AssignmentCriteria + ?Sized),
854 clock: &(impl Clock + ?Sized),
855 session_info_provider: &mut RuntimeInfo,
856 ) {
857 let mut new_hashes = HashSet::new();
858
859 gum::debug!(
860 target: LOG_TARGET,
861 "Got new blocks {:?}",
862 metas.iter().map(|m| (m.hash, m.number)).collect::<Vec<_>>(),
863 );
864
865 for meta in metas {
866 match self.blocks.entry(meta.hash) {
867 hash_map::Entry::Vacant(entry) => {
868 let candidates_count = meta.candidates.len();
869 let mut candidates = Vec::with_capacity(candidates_count);
870 candidates.resize_with(candidates_count, Default::default);
871
872 entry.insert(BlockEntry {
873 known_by: HashMap::new(),
874 number: meta.number,
875 parent_hash: meta.parent_hash,
876 knowledge: Knowledge::default(),
877 candidates,
878 session: meta.session,
879 approval_entries: HashMap::new(),
880 candidates_metadata: meta.candidates,
881 vrf_story: meta.vrf_story,
882 slot: meta.slot,
883 last_resent_at_block_number: None,
884 });
885
886 self.topologies.inc_session_refs(meta.session);
887
888 new_hashes.insert(meta.hash);
889
890 self.blocks_by_number.entry(meta.number).or_default().push(meta.hash);
893 },
894 _ => continue,
895 }
896 }
897
898 {
899 for (peer_id, PeerEntry { view, version }) in self.peer_views.iter() {
900 let intersection = view.iter().filter(|h| new_hashes.contains(h));
901 let view_intersection = View::new(intersection.cloned(), view.finalized_number);
902 Self::unify_with_peer(
903 network_sender,
904 metrics,
905 &mut self.blocks,
906 &self.topologies,
907 self.peer_views.len(),
908 *peer_id,
909 *version,
910 view_intersection,
911 rng,
912 false,
913 )
914 .await;
915 }
916
917 let pending_now_known = self
918 .pending_known
919 .keys()
920 .filter(|k| self.blocks.contains_key(k))
921 .copied()
922 .collect::<Vec<_>>();
923
924 let to_import = pending_now_known
925 .into_iter()
926 .inspect(|h| {
927 gum::trace!(
928 target: LOG_TARGET,
929 block_hash = ?h,
930 "Extracting pending messages for new block"
931 )
932 })
933 .filter_map(|k| self.pending_known.remove(&k))
934 .flatten()
935 .collect::<Vec<_>>();
936
937 if !to_import.is_empty() {
938 gum::debug!(
939 target: LOG_TARGET,
940 num = to_import.len(),
941 "Processing pending assignment/approvals",
942 );
943
944 let _timer = metrics.time_import_pending_now_known();
945
946 for (peer_id, message) in to_import {
947 match message {
948 PendingMessage::Assignment(assignment, claimed_indices) => {
949 self.import_and_circulate_assignment(
950 approval_voting_sender,
951 network_sender,
952 runtime_api_sender,
953 metrics,
954 MessageSource::Peer(peer_id),
955 assignment,
956 claimed_indices,
957 rng,
958 assignment_criteria,
959 clock,
960 session_info_provider,
961 )
962 .await;
963 },
964 PendingMessage::Approval(approval_vote) => {
965 self.import_and_circulate_approval(
966 approval_voting_sender,
967 network_sender,
968 runtime_api_sender,
969 metrics,
970 MessageSource::Peer(peer_id),
971 approval_vote,
972 session_info_provider,
973 )
974 .await;
975 },
976 }
977 }
978 }
979 }
980
981 self.enable_aggression(network_sender, Resend::Yes, metrics).await;
982 }
983
984 async fn handle_new_session_topology<N: overseer::SubsystemSender<NetworkBridgeTxMessage>>(
985 &mut self,
986 network_sender: &mut N,
987 session: SessionIndex,
988 topology: SessionGridTopology,
989 local_index: Option<ValidatorIndex>,
990 ) {
991 if local_index.is_none() {
992 return
994 }
995
996 self.topologies.insert_topology(session, topology, local_index);
997 let topology = self.topologies.get_topology(session).expect("just inserted above; qed");
998
999 adjust_required_routing_and_propagate(
1000 network_sender,
1001 &mut self.blocks,
1002 &self.topologies,
1003 |block_entry| block_entry.session == session,
1004 |required_routing, local, validator_index| {
1005 if required_routing == &RequiredRouting::PendingTopology {
1006 topology
1007 .local_grid_neighbors()
1008 .required_routing_by_index(*validator_index, local)
1009 } else {
1010 *required_routing
1011 }
1012 },
1013 &self.peer_views,
1014 )
1015 .await;
1016 }
1017
1018 async fn process_incoming_assignments<A, N, R, RA>(
1019 &mut self,
1020 approval_voting_sender: &mut A,
1021 network_sender: &mut N,
1022 runtime_api_sender: &mut RA,
1023 metrics: &Metrics,
1024 peer_id: PeerId,
1025 assignments: Vec<(IndirectAssignmentCertV2, CandidateBitfield)>,
1026 rng: &mut R,
1027 assignment_criteria: &(impl AssignmentCriteria + ?Sized),
1028 clock: &(impl Clock + ?Sized),
1029 session_info_provider: &mut RuntimeInfo,
1030 ) where
1031 A: overseer::SubsystemSender<ApprovalVotingMessage>,
1032 N: overseer::SubsystemSender<NetworkBridgeTxMessage>,
1033 RA: overseer::SubsystemSender<RuntimeApiMessage>,
1034 R: CryptoRng + Rng,
1035 {
1036 for (assignment, claimed_indices) in assignments {
1037 if let Some(pending) = self.pending_known.get_mut(&assignment.block_hash) {
1038 let block_hash = &assignment.block_hash;
1039 let validator_index = assignment.validator;
1040
1041 gum::trace!(
1042 target: LOG_TARGET,
1043 %peer_id,
1044 ?block_hash,
1045 ?claimed_indices,
1046 ?validator_index,
1047 "Pending assignment",
1048 );
1049
1050 pending.push((peer_id, PendingMessage::Assignment(assignment, claimed_indices)));
1051
1052 continue
1053 }
1054
1055 self.import_and_circulate_assignment(
1056 approval_voting_sender,
1057 network_sender,
1058 runtime_api_sender,
1059 metrics,
1060 MessageSource::Peer(peer_id),
1061 assignment,
1062 claimed_indices,
1063 rng,
1064 assignment_criteria,
1065 clock,
1066 session_info_provider,
1067 )
1068 .await;
1069 }
1070 }
1071
1072 async fn process_incoming_approvals<
1074 N: overseer::SubsystemSender<NetworkBridgeTxMessage>,
1075 A: overseer::SubsystemSender<ApprovalVotingMessage>,
1076 RA: overseer::SubsystemSender<RuntimeApiMessage>,
1077 >(
1078 &mut self,
1079 approval_voting_sender: &mut A,
1080 network_sender: &mut N,
1081 runtime_api_sender: &mut RA,
1082 metrics: &Metrics,
1083 peer_id: PeerId,
1084 approvals: Vec<IndirectSignedApprovalVoteV2>,
1085 session_info_provider: &mut RuntimeInfo,
1086 ) {
1087 gum::trace!(
1088 target: LOG_TARGET,
1089 peer_id = %peer_id,
1090 num = approvals.len(),
1091 "Processing approvals from a peer",
1092 );
1093 for approval_vote in approvals.into_iter() {
1094 if let Some(pending) = self.pending_known.get_mut(&approval_vote.block_hash) {
1095 let block_hash = approval_vote.block_hash;
1096 let validator_index = approval_vote.validator;
1097
1098 gum::trace!(
1099 target: LOG_TARGET,
1100 %peer_id,
1101 ?block_hash,
1102 ?validator_index,
1103 "Pending assignment candidates {:?}",
1104 approval_vote.candidate_indices,
1105 );
1106
1107 pending.push((peer_id, PendingMessage::Approval(approval_vote)));
1108
1109 continue
1110 }
1111
1112 self.import_and_circulate_approval(
1113 approval_voting_sender,
1114 network_sender,
1115 runtime_api_sender,
1116 metrics,
1117 MessageSource::Peer(peer_id),
1118 approval_vote,
1119 session_info_provider,
1120 )
1121 .await;
1122 }
1123 }
1124
1125 async fn process_incoming_peer_message<A, N, RA, R>(
1126 &mut self,
1127 approval_voting_sender: &mut A,
1128 network_sender: &mut N,
1129 runtime_api_sender: &mut RA,
1130 metrics: &Metrics,
1131 peer_id: PeerId,
1132 msg: Versioned<
1133 protocol_v1::ApprovalDistributionMessage,
1134 protocol_v2::ApprovalDistributionMessage,
1135 protocol_v3::ApprovalDistributionMessage,
1136 >,
1137 rng: &mut R,
1138 assignment_criteria: &(impl AssignmentCriteria + ?Sized),
1139 clock: &(impl Clock + ?Sized),
1140 session_info_provider: &mut RuntimeInfo,
1141 ) where
1142 A: overseer::SubsystemSender<ApprovalVotingMessage>,
1143 N: overseer::SubsystemSender<NetworkBridgeTxMessage>,
1144 RA: overseer::SubsystemSender<RuntimeApiMessage>,
1145 R: CryptoRng + Rng,
1146 {
1147 match msg {
1148 Versioned::V3(protocol_v3::ApprovalDistributionMessage::Assignments(assignments)) => {
1149 gum::trace!(
1150 target: LOG_TARGET,
1151 peer_id = %peer_id,
1152 num = assignments.len(),
1153 "Processing assignments from a peer",
1154 );
1155 let sanitized_assignments =
1156 self.sanitize_v2_assignments(peer_id, network_sender, assignments).await;
1157
1158 self.process_incoming_assignments(
1159 approval_voting_sender,
1160 network_sender,
1161 runtime_api_sender,
1162 metrics,
1163 peer_id,
1164 sanitized_assignments,
1165 rng,
1166 assignment_criteria,
1167 clock,
1168 session_info_provider,
1169 )
1170 .await;
1171 },
1172 Versioned::V1(protocol_v1::ApprovalDistributionMessage::Assignments(assignments)) |
1173 Versioned::V2(protocol_v2::ApprovalDistributionMessage::Assignments(assignments)) => {
1174 gum::trace!(
1175 target: LOG_TARGET,
1176 peer_id = %peer_id,
1177 num = assignments.len(),
1178 "Processing assignments from a peer",
1179 );
1180
1181 let sanitized_assignments =
1182 self.sanitize_v1_assignments(peer_id, network_sender, assignments).await;
1183
1184 self.process_incoming_assignments(
1185 approval_voting_sender,
1186 network_sender,
1187 runtime_api_sender,
1188 metrics,
1189 peer_id,
1190 sanitized_assignments,
1191 rng,
1192 assignment_criteria,
1193 clock,
1194 session_info_provider,
1195 )
1196 .await;
1197 },
1198 Versioned::V3(protocol_v3::ApprovalDistributionMessage::Approvals(approvals)) => {
1199 let sanitized_approvals =
1200 self.sanitize_v2_approvals(peer_id, network_sender, approvals).await;
1201 self.process_incoming_approvals(
1202 approval_voting_sender,
1203 network_sender,
1204 runtime_api_sender,
1205 metrics,
1206 peer_id,
1207 sanitized_approvals,
1208 session_info_provider,
1209 )
1210 .await;
1211 },
1212 Versioned::V1(protocol_v1::ApprovalDistributionMessage::Approvals(approvals)) |
1213 Versioned::V2(protocol_v2::ApprovalDistributionMessage::Approvals(approvals)) => {
1214 let sanitized_approvals =
1215 self.sanitize_v1_approvals(peer_id, network_sender, approvals).await;
1216 self.process_incoming_approvals(
1217 approval_voting_sender,
1218 network_sender,
1219 runtime_api_sender,
1220 metrics,
1221 peer_id,
1222 sanitized_approvals,
1223 session_info_provider,
1224 )
1225 .await;
1226 },
1227 }
1228 }
1229
1230 async fn handle_peer_view_change<N: overseer::SubsystemSender<NetworkBridgeTxMessage>, R>(
1233 &mut self,
1234 network_sender: &mut N,
1235 metrics: &Metrics,
1236 peer_id: PeerId,
1237 view: View,
1238 rng: &mut R,
1239 ) where
1240 R: CryptoRng + Rng,
1241 {
1242 gum::trace!(target: LOG_TARGET, ?view, "Peer view change");
1243 let finalized_number = view.finalized_number;
1244
1245 let (old_view, protocol_version) =
1246 if let Some(peer_entry) = self.peer_views.get_mut(&peer_id) {
1247 (Some(std::mem::replace(&mut peer_entry.view, view.clone())), peer_entry.version)
1248 } else {
1249 gum::warn!(
1251 target: LOG_TARGET,
1252 ?peer_id,
1253 ?view,
1254 "Peer view change for missing `peer_entry`"
1255 );
1256
1257 (None, ValidationVersion::V1.into())
1258 };
1259
1260 let old_finalized_number = old_view.map(|v| v.finalized_number).unwrap_or(0);
1261
1262 let blocks = &mut self.blocks;
1264 let range = old_finalized_number..=finalized_number;
1269 if !range.is_empty() && !blocks.is_empty() {
1270 self.blocks_by_number
1271 .range(range)
1272 .flat_map(|(_number, hashes)| hashes)
1273 .for_each(|hash| {
1274 if let Some(entry) = blocks.get_mut(hash) {
1275 entry.known_by.remove(&peer_id);
1276 }
1277 });
1278 }
1279
1280 Self::unify_with_peer(
1281 network_sender,
1282 metrics,
1283 &mut self.blocks,
1284 &self.topologies,
1285 self.peer_views.len(),
1286 peer_id,
1287 protocol_version,
1288 view,
1289 rng,
1290 false,
1291 )
1292 .await;
1293 }
1294
1295 async fn handle_block_finalized<N: overseer::SubsystemSender<NetworkBridgeTxMessage>>(
1296 &mut self,
1297 network_sender: &mut N,
1298 metrics: &Metrics,
1299 finalized_number: BlockNumber,
1300 ) {
1301 let split_point = finalized_number.saturating_add(1);
1305 let mut old_blocks = self.blocks_by_number.split_off(&split_point);
1306
1307 std::mem::swap(&mut self.blocks_by_number, &mut old_blocks);
1309
1310 old_blocks.values().flatten().for_each(|relay_block| {
1312 self.recent_outdated_blocks.note_outdated(*relay_block);
1313 if let Some(block_entry) = self.blocks.remove(relay_block) {
1314 self.topologies.dec_session_refs(block_entry.session);
1315 }
1316 });
1317
1318 self.enable_aggression(network_sender, Resend::No, metrics).await;
1321 }
1322
1323 fn accept_duplicates_from_validators(
1328 blocks_by_number: &BTreeMap<BlockNumber, Vec<Hash>>,
1329 topologies: &SessionGridTopologies,
1330 aggression_config: &AggressionConfig,
1331 entry: &BlockEntry,
1332 peer: PeerId,
1333 ) -> bool {
1334 let topology = topologies.get_topology(entry.session);
1335 let min_age = blocks_by_number.iter().next().map(|(num, _)| num);
1336 let max_age = blocks_by_number.iter().rev().next().map(|(num, _)| num);
1337
1338 let (min_age, max_age) = match (min_age, max_age) {
1340 (Some(min), Some(max)) => (*min, *max),
1341 _ => return false,
1342 };
1343
1344 let age = max_age.saturating_sub(min_age);
1345
1346 aggression_config.should_trigger_aggression(age) &&
1347 topology.map(|topology| topology.is_validator(&peer)).unwrap_or(false)
1348 }
1349
1350 async fn import_and_circulate_assignment<A, N, RA, R>(
1351 &mut self,
1352 approval_voting_sender: &mut A,
1353 network_sender: &mut N,
1354 runtime_api_sender: &mut RA,
1355 metrics: &Metrics,
1356 source: MessageSource,
1357 assignment: IndirectAssignmentCertV2,
1358 claimed_candidate_indices: CandidateBitfield,
1359 rng: &mut R,
1360 assignment_criteria: &(impl AssignmentCriteria + ?Sized),
1361 clock: &(impl Clock + ?Sized),
1362 session_info_provider: &mut RuntimeInfo,
1363 ) where
1364 A: overseer::SubsystemSender<ApprovalVotingMessage>,
1365 N: overseer::SubsystemSender<NetworkBridgeTxMessage>,
1366 RA: overseer::SubsystemSender<RuntimeApiMessage>,
1367 R: CryptoRng + Rng,
1368 {
1369 let block_hash = assignment.block_hash;
1370 let validator_index = assignment.validator;
1371
1372 let entry = match self.blocks.get_mut(&block_hash) {
1373 Some(entry) => entry,
1374 None => {
1375 if let Some(peer_id) = source.peer_id() {
1376 gum::trace!(
1377 target: LOG_TARGET,
1378 ?peer_id,
1379 hash = ?block_hash,
1380 ?validator_index,
1381 "Unexpected assignment",
1382 );
1383 if !self.recent_outdated_blocks.is_recent_outdated(&block_hash) {
1384 modify_reputation(
1385 &mut self.reputation,
1386 network_sender,
1387 peer_id,
1388 COST_UNEXPECTED_MESSAGE,
1389 )
1390 .await;
1391 gum::debug!(target: LOG_TARGET, "Received assignment for invalid block");
1392 metrics.on_assignment_recent_outdated();
1393 }
1394 }
1395 metrics.on_assignment_invalid_block();
1396 return
1397 },
1398 };
1399
1400 let (message_subject, message_kind) = (
1402 MessageSubject(block_hash, claimed_candidate_indices.clone(), validator_index),
1403 MessageKind::Assignment,
1404 );
1405
1406 if let Some(peer_id) = source.peer_id() {
1407 match entry.known_by.entry(peer_id) {
1409 hash_map::Entry::Occupied(mut peer_knowledge) => {
1410 let peer_knowledge = peer_knowledge.get_mut();
1411 if peer_knowledge.contains(&message_subject, message_kind) {
1412 if !peer_knowledge.received.insert(message_subject.clone(), message_kind) {
1414 if !Self::accept_duplicates_from_validators(
1415 &self.blocks_by_number,
1416 &self.topologies,
1417 &self.aggression_config,
1418 entry,
1419 peer_id,
1420 ) {
1421 gum::debug!(
1422 target: LOG_TARGET,
1423 ?peer_id,
1424 ?message_subject,
1425 "Duplicate assignment",
1426 );
1427
1428 modify_reputation(
1429 &mut self.reputation,
1430 network_sender,
1431 peer_id,
1432 COST_DUPLICATE_MESSAGE,
1433 )
1434 .await;
1435 }
1436
1437 metrics.on_assignment_duplicate();
1438 } else {
1439 gum::trace!(
1440 target: LOG_TARGET,
1441 ?peer_id,
1442 hash = ?block_hash,
1443 ?validator_index,
1444 ?message_subject,
1445 "We sent the message to the peer while peer was sending it to us. Known race condition.",
1446 );
1447 }
1448 return
1449 }
1450 },
1451 hash_map::Entry::Vacant(_) => {
1452 gum::debug!(
1453 target: LOG_TARGET,
1454 ?peer_id,
1455 ?message_subject,
1456 "Assignment from a peer is out of view",
1457 );
1458 modify_reputation(
1459 &mut self.reputation,
1460 network_sender,
1461 peer_id,
1462 COST_UNEXPECTED_MESSAGE,
1463 )
1464 .await;
1465 metrics.on_assignment_out_of_view();
1466 },
1467 }
1468
1469 if entry.knowledge.contains(&message_subject, message_kind) {
1471 modify_reputation(
1472 &mut self.reputation,
1473 network_sender,
1474 peer_id,
1475 BENEFIT_VALID_MESSAGE,
1476 )
1477 .await;
1478 if let Some(peer_knowledge) = entry.known_by.get_mut(&peer_id) {
1479 gum::trace!(target: LOG_TARGET, ?peer_id, ?message_subject, "Known assignment");
1480 peer_knowledge.received.insert(message_subject, message_kind);
1481 }
1482 metrics.on_assignment_good_known();
1483 return
1484 }
1485
1486 let result = Self::check_assignment_valid(
1487 assignment_criteria,
1488 &entry,
1489 &assignment,
1490 &claimed_candidate_indices,
1491 session_info_provider,
1492 runtime_api_sender,
1493 )
1494 .await;
1495
1496 match result {
1497 Ok(checked_assignment) => {
1498 let current_tranche = clock.tranche_now(self.slot_duration_millis, entry.slot);
1499 let too_far_in_future =
1500 current_tranche + TICK_TOO_FAR_IN_FUTURE as DelayTranche;
1501
1502 if checked_assignment.tranche() >= too_far_in_future {
1503 gum::debug!(
1504 target: LOG_TARGET,
1505 hash = ?block_hash,
1506 ?peer_id,
1507 "Got an assignment too far in the future",
1508 );
1509 modify_reputation(
1510 &mut self.reputation,
1511 network_sender,
1512 peer_id,
1513 COST_ASSIGNMENT_TOO_FAR_IN_THE_FUTURE,
1514 )
1515 .await;
1516 metrics.on_assignment_far();
1517
1518 return
1519 }
1520
1521 approval_voting_sender
1522 .send_message(ApprovalVotingMessage::ImportAssignment(
1523 checked_assignment,
1524 None,
1525 ))
1526 .await;
1527 modify_reputation(
1528 &mut self.reputation,
1529 network_sender,
1530 peer_id,
1531 BENEFIT_VALID_MESSAGE_FIRST,
1532 )
1533 .await;
1534 entry.knowledge.insert(message_subject.clone(), message_kind);
1535 if let Some(peer_knowledge) = entry.known_by.get_mut(&peer_id) {
1536 peer_knowledge.received.insert(message_subject.clone(), message_kind);
1537 }
1538 },
1539 Err(error) => {
1540 gum::info!(
1541 target: LOG_TARGET,
1542 hash = ?block_hash,
1543 ?peer_id,
1544 ?error,
1545 "Got a bad assignment from peer",
1546 );
1547 modify_reputation(
1548 &mut self.reputation,
1549 network_sender,
1550 peer_id,
1551 COST_INVALID_MESSAGE,
1552 )
1553 .await;
1554 metrics.on_assignment_bad();
1555 return
1556 },
1557 }
1558 } else {
1559 if !entry.knowledge.insert(message_subject.clone(), message_kind) {
1560 gum::warn!(
1562 target: LOG_TARGET,
1563 ?message_subject,
1564 "Importing locally an already known assignment",
1565 );
1566 return
1567 } else {
1568 gum::debug!(
1569 target: LOG_TARGET,
1570 ?message_subject,
1571 "Importing locally a new assignment",
1572 );
1573 }
1574 }
1575
1576 metrics.on_assignment_imported(&assignment.cert.kind);
1579
1580 let topology = self.topologies.get_topology(entry.session);
1581 let local = source == MessageSource::Local;
1582
1583 let required_routing = topology.map_or(RequiredRouting::PendingTopology, |t| {
1584 t.local_grid_neighbors().required_routing_by_index(validator_index, local)
1585 });
1586 let mut peers = HashSet::new();
1588
1589 let peers_to_route_to = topology
1590 .as_ref()
1591 .map(|t| t.peers_to_route(required_routing))
1592 .unwrap_or_default();
1593
1594 for peer in peers_to_route_to {
1595 if !entry.known_by.contains_key(&peer) {
1596 continue
1597 }
1598
1599 peers.insert(peer);
1600 }
1601
1602 let peers_to_filter = entry.known_by();
1604
1605 let approval_entry = entry.insert_approval_entry(ApprovalEntry::new(
1606 assignment.clone(),
1607 claimed_candidate_indices.clone(),
1608 ApprovalRouting {
1609 required_routing,
1610 local,
1611 random_routing: Default::default(),
1612 peers_randomly_routed: Default::default(),
1613 },
1614 ));
1615
1616 let assignments = vec![(assignment, claimed_candidate_indices.clone())];
1623 let n_peers_total = self.peer_views.len();
1624 let source_peer = source.peer_id();
1625
1626 for peer in peers_to_filter.into_iter() {
1628 if Some(peer) == source_peer {
1629 continue
1630 }
1631
1632 if peers.contains(&peer) {
1633 continue
1634 }
1635
1636 if !topology.map(|topology| topology.is_validator(&peer)).unwrap_or(false) {
1637 continue
1638 }
1639
1640 let route_random =
1644 approval_entry.routing_info().random_routing.sample(n_peers_total, rng);
1645
1646 if route_random {
1647 approval_entry.routing_info_mut().mark_randomly_sent(peer);
1648 peers.insert(peer);
1649 }
1650
1651 if approval_entry.routing_info().random_routing.is_complete() {
1652 break
1653 }
1654 }
1655
1656 for peer in peers.iter() {
1658 if let Some(peer_knowledge) = entry.known_by.get_mut(peer) {
1660 peer_knowledge.sent.insert(message_subject.clone(), message_kind);
1661 }
1662 }
1663
1664 if !peers.is_empty() {
1665 gum::trace!(
1666 target: LOG_TARGET,
1667 ?block_hash,
1668 ?claimed_candidate_indices,
1669 local = source.peer_id().is_none(),
1670 num_peers = peers.len(),
1671 "Sending an assignment to peers",
1672 );
1673
1674 let peers = peers
1675 .iter()
1676 .filter_map(|peer_id| {
1677 self.peer_views.get(peer_id).map(|peer_entry| (*peer_id, peer_entry.version))
1678 })
1679 .collect::<Vec<_>>();
1680
1681 send_assignments_batched(network_sender, assignments, &peers).await;
1682 }
1683 }
1684
1685 async fn check_assignment_valid<RA: overseer::SubsystemSender<RuntimeApiMessage>>(
1686 assignment_criteria: &(impl AssignmentCriteria + ?Sized),
1687 entry: &BlockEntry,
1688 assignment: &IndirectAssignmentCertV2,
1689 claimed_candidate_indices: &CandidateBitfield,
1690 runtime_info: &mut RuntimeInfo,
1691 runtime_api_sender: &mut RA,
1692 ) -> Result<CheckedIndirectAssignment, InvalidAssignmentError> {
1693 let ExtendedSessionInfo { ref session_info, .. } = runtime_info
1694 .get_session_info_by_index(runtime_api_sender, assignment.block_hash, entry.session)
1695 .await
1696 .map_err(|err| InvalidAssignmentError::SessionInfoNotFound(err))?;
1697
1698 if claimed_candidate_indices.len() > session_info.n_cores as usize {
1699 return Err(InvalidAssignmentError::OversizedClaimedBitfield)
1700 }
1701
1702 let claimed_cores: Vec<CoreIndex> = claimed_candidate_indices
1703 .iter_ones()
1704 .map(|candidate_index| {
1705 entry.candidates_metadata.get(candidate_index).map(|(_, core, _)| *core).ok_or(
1706 InvalidAssignmentError::ClaimedInvalidCandidateIndex {
1707 claimed_index: candidate_index,
1708 max_index: entry.candidates_metadata.len(),
1709 },
1710 )
1711 })
1712 .collect::<Result<Vec<_>, InvalidAssignmentError>>()?;
1713
1714 let Ok(claimed_cores) = claimed_cores.try_into() else {
1715 return Err(InvalidAssignmentError::NoClaimedCandidates)
1716 };
1717
1718 let backing_groups = claimed_candidate_indices
1719 .iter_ones()
1720 .flat_map(|candidate_index| {
1721 entry.candidates_metadata.get(candidate_index).map(|(_, _, group)| *group)
1722 })
1723 .collect::<Vec<_>>();
1724
1725 assignment_criteria
1726 .check_assignment_cert(
1727 claimed_cores,
1728 assignment.validator,
1729 &polkadot_node_primitives::approval::criteria::Config::from(session_info),
1730 entry.vrf_story.clone(),
1731 &assignment.cert,
1732 backing_groups,
1733 )
1734 .map_err(|err| InvalidAssignmentError::CryptoCheckFailed(err))
1735 .map(|tranche| {
1736 CheckedIndirectAssignment::from_checked(
1737 assignment.clone(),
1738 claimed_candidate_indices.clone(),
1739 tranche,
1740 )
1741 })
1742 }
1743 async fn check_approval_can_be_processed<
1746 N: overseer::SubsystemSender<NetworkBridgeTxMessage>,
1747 >(
1748 network_sender: &mut N,
1749 assignments_knowledge_key: &Vec<(MessageSubject, MessageKind)>,
1750 approval_knowledge_key: &(MessageSubject, MessageKind),
1751 entry: &mut BlockEntry,
1752 blocks_by_number: &BTreeMap<BlockNumber, Vec<Hash>>,
1753 topologies: &SessionGridTopologies,
1754 aggression_config: &AggressionConfig,
1755 reputation: &mut ReputationAggregator,
1756 peer_id: PeerId,
1757 metrics: &Metrics,
1758 ) -> bool {
1759 for message_subject in assignments_knowledge_key {
1760 if !entry.knowledge.contains(&message_subject.0, message_subject.1) {
1761 gum::trace!(
1762 target: LOG_TARGET,
1763 ?peer_id,
1764 ?message_subject,
1765 "Unknown approval assignment",
1766 );
1767 modify_reputation(reputation, network_sender, peer_id, COST_UNEXPECTED_MESSAGE)
1768 .await;
1769 metrics.on_approval_unknown_assignment();
1770 return false
1771 }
1772 }
1773
1774 match entry.known_by.entry(peer_id) {
1776 hash_map::Entry::Occupied(mut knowledge) => {
1777 let peer_knowledge = knowledge.get_mut();
1778 if peer_knowledge.contains(&approval_knowledge_key.0, approval_knowledge_key.1) {
1779 if !peer_knowledge
1780 .received
1781 .insert(approval_knowledge_key.0.clone(), approval_knowledge_key.1)
1782 {
1783 if !Self::accept_duplicates_from_validators(
1784 blocks_by_number,
1785 topologies,
1786 aggression_config,
1787 entry,
1788 peer_id,
1789 ) {
1790 gum::trace!(
1791 target: LOG_TARGET,
1792 ?peer_id,
1793 ?approval_knowledge_key,
1794 "Duplicate approval",
1795 );
1796 modify_reputation(
1797 reputation,
1798 network_sender,
1799 peer_id,
1800 COST_DUPLICATE_MESSAGE,
1801 )
1802 .await;
1803 }
1804 metrics.on_approval_duplicate();
1805 }
1806 return false
1807 }
1808 },
1809 hash_map::Entry::Vacant(_) => {
1810 gum::debug!(
1811 target: LOG_TARGET,
1812 ?peer_id,
1813 ?approval_knowledge_key,
1814 "Approval from a peer is out of view",
1815 );
1816 modify_reputation(reputation, network_sender, peer_id, COST_UNEXPECTED_MESSAGE)
1817 .await;
1818 metrics.on_approval_out_of_view();
1819 },
1820 }
1821
1822 if entry.knowledge.contains(&approval_knowledge_key.0, approval_knowledge_key.1) {
1823 if let Some(peer_knowledge) = entry.known_by.get_mut(&peer_id) {
1824 peer_knowledge
1825 .received
1826 .insert(approval_knowledge_key.0.clone(), approval_knowledge_key.1);
1827 }
1828
1829 gum::trace!(target: LOG_TARGET, ?peer_id, ?approval_knowledge_key, "Known approval");
1831 metrics.on_approval_good_known();
1832 modify_reputation(reputation, network_sender, peer_id, BENEFIT_VALID_MESSAGE).await;
1833 false
1834 } else {
1835 true
1836 }
1837 }
1838
1839 async fn import_and_circulate_approval<
1840 N: overseer::SubsystemSender<NetworkBridgeTxMessage>,
1841 A: overseer::SubsystemSender<ApprovalVotingMessage>,
1842 RA: overseer::SubsystemSender<RuntimeApiMessage>,
1843 >(
1844 &mut self,
1845 approval_voting_sender: &mut A,
1846 network_sender: &mut N,
1847 runtime_api_sender: &mut RA,
1848 metrics: &Metrics,
1849 source: MessageSource,
1850 vote: IndirectSignedApprovalVoteV2,
1851 session_info_provider: &mut RuntimeInfo,
1852 ) {
1853 let block_hash = vote.block_hash;
1854 let validator_index = vote.validator;
1855 let candidate_indices = &vote.candidate_indices;
1856 let entry = match self.blocks.get_mut(&block_hash) {
1857 Some(entry) if entry.contains_candidates(&vote.candidate_indices) => entry,
1858 _ => {
1859 if let Some(peer_id) = source.peer_id() {
1860 if !self.recent_outdated_blocks.is_recent_outdated(&block_hash) {
1861 gum::debug!(
1862 target: LOG_TARGET,
1863 ?peer_id,
1864 ?block_hash,
1865 ?validator_index,
1866 ?candidate_indices,
1867 "Approval from a peer is out of view",
1868 );
1869 modify_reputation(
1870 &mut self.reputation,
1871 network_sender,
1872 peer_id,
1873 COST_UNEXPECTED_MESSAGE,
1874 )
1875 .await;
1876 metrics.on_approval_invalid_block();
1877 } else {
1878 metrics.on_approval_recent_outdated();
1879 }
1880 }
1881 return
1882 },
1883 };
1884
1885 let assignments_knowledge_keys = PeerKnowledge::generate_assignments_keys(&vote);
1887 let approval_knwowledge_key = PeerKnowledge::generate_approval_key(&vote);
1888
1889 if let Some(peer_id) = source.peer_id() {
1890 if !Self::check_approval_can_be_processed(
1891 network_sender,
1892 &assignments_knowledge_keys,
1893 &approval_knwowledge_key,
1894 entry,
1895 &self.blocks_by_number,
1896 &self.topologies,
1897 &self.aggression_config,
1898 &mut self.reputation,
1899 peer_id,
1900 metrics,
1901 )
1902 .await
1903 {
1904 return
1905 }
1906
1907 let result =
1908 Self::check_vote_valid(&vote, &entry, session_info_provider, runtime_api_sender)
1909 .await;
1910
1911 match result {
1912 Ok(vote) => {
1913 approval_voting_sender
1914 .send_message(ApprovalVotingMessage::ImportApproval(vote, None))
1915 .await;
1916
1917 modify_reputation(
1918 &mut self.reputation,
1919 network_sender,
1920 peer_id,
1921 BENEFIT_VALID_MESSAGE_FIRST,
1922 )
1923 .await;
1924
1925 entry
1926 .knowledge
1927 .insert(approval_knwowledge_key.0.clone(), approval_knwowledge_key.1);
1928 if let Some(peer_knowledge) = entry.known_by.get_mut(&peer_id) {
1929 peer_knowledge
1930 .received
1931 .insert(approval_knwowledge_key.0.clone(), approval_knwowledge_key.1);
1932 }
1933 },
1934 Err(err) => {
1935 modify_reputation(
1936 &mut self.reputation,
1937 network_sender,
1938 peer_id,
1939 COST_INVALID_MESSAGE,
1940 )
1941 .await;
1942
1943 gum::info!(
1944 target: LOG_TARGET,
1945 ?peer_id,
1946 ?err,
1947 "Got a bad approval from peer",
1948 );
1949 metrics.on_approval_bad();
1950 return
1951 },
1952 }
1953 } else {
1954 if !entry
1955 .knowledge
1956 .insert(approval_knwowledge_key.0.clone(), approval_knwowledge_key.1)
1957 {
1958 gum::warn!(
1960 target: LOG_TARGET,
1961 "Importing locally an already known approval",
1962 );
1963 return
1964 } else {
1965 gum::debug!(
1966 target: LOG_TARGET,
1967 "Importing locally a new approval",
1968 );
1969 }
1970 }
1971
1972 let (required_routing, peers_randomly_routed_to) = match entry.note_approval(vote.clone()) {
1973 Ok(required_routing) => required_routing,
1974 Err(err) => {
1975 gum::warn!(
1976 target: LOG_TARGET,
1977 hash = ?block_hash,
1978 validator_index = ?vote.validator,
1979 candidate_bitfield = ?vote.candidate_indices,
1980 ?err,
1981 "Possible bug: Vote import failed",
1982 );
1983 metrics.on_approval_bug();
1984 return
1985 },
1986 };
1987
1988 metrics.on_approval_imported();
1991
1992 let topology = self.topologies.get_topology(entry.session);
1995 let source_peer = source.peer_id();
1996
1997 let peer_filter = move |peer| {
1998 if Some(peer) == source_peer.as_ref() {
1999 return false
2000 }
2001
2002 let in_topology = topology
2012 .map_or(false, |t| t.local_grid_neighbors().route_to_peer(required_routing, peer));
2013 in_topology || peers_randomly_routed_to.contains(peer)
2014 };
2015
2016 let peers = entry
2017 .known_by
2018 .iter()
2019 .filter(|(p, _)| peer_filter(p))
2020 .filter_map(|(p, _)| self.peer_views.get(p).map(|entry| (*p, entry.version)))
2021 .collect::<Vec<_>>();
2022
2023 for peer in peers.iter() {
2025 if let Some(entry) = entry.known_by.get_mut(&peer.0) {
2027 entry.sent.insert(approval_knwowledge_key.0.clone(), approval_knwowledge_key.1);
2028 }
2029 }
2030
2031 if !peers.is_empty() {
2032 let approvals = vec![vote];
2033 gum::trace!(
2034 target: LOG_TARGET,
2035 ?block_hash,
2036 local = source.peer_id().is_none(),
2037 num_peers = peers.len(),
2038 "Sending an approval to peers",
2039 );
2040 send_approvals_batched(network_sender, approvals, &peers).await;
2041 }
2042 }
2043
2044 async fn check_vote_valid<RA: overseer::SubsystemSender<RuntimeApiMessage>>(
2046 vote: &IndirectSignedApprovalVoteV2,
2047 entry: &BlockEntry,
2048 runtime_info: &mut RuntimeInfo,
2049 runtime_api_sender: &mut RA,
2050 ) -> Result<CheckedIndirectSignedApprovalVote, InvalidVoteError> {
2051 if vote.candidate_indices.len() > entry.candidates_metadata.len() {
2052 return Err(InvalidVoteError::CandidateIndexOutOfBounds)
2053 }
2054
2055 let candidate_hashes = vote
2056 .candidate_indices
2057 .iter_ones()
2058 .flat_map(|candidate_index| {
2059 entry
2060 .candidates_metadata
2061 .get(candidate_index)
2062 .map(|(candidate_hash, _, _)| *candidate_hash)
2063 })
2064 .collect::<Vec<_>>();
2065
2066 let ExtendedSessionInfo { ref session_info, .. } = runtime_info
2067 .get_session_info_by_index(runtime_api_sender, vote.block_hash, entry.session)
2068 .await
2069 .map_err(|err| InvalidVoteError::SessionInfoNotFound(err))?;
2070
2071 let pubkey = session_info
2072 .validators
2073 .get(vote.validator)
2074 .ok_or(InvalidVoteError::ValidatorIndexOutOfBounds)?;
2075 DisputeStatement::Valid(ValidDisputeStatementKind::ApprovalCheckingMultipleCandidates(
2076 candidate_hashes.clone(),
2077 ))
2078 .check_signature(
2079 &pubkey,
2080 *candidate_hashes.first().unwrap(),
2081 entry.session,
2082 &vote.signature,
2083 )
2084 .map_err(|_| InvalidVoteError::InvalidSignature)
2085 .map(|_| CheckedIndirectSignedApprovalVote::from_checked(vote.clone()))
2086 }
2087
2088 fn get_approval_signatures(
2090 &mut self,
2091 indices: HashSet<(Hash, CandidateIndex)>,
2092 ) -> HashMap<ValidatorIndex, (Hash, Vec<CandidateIndex>, ValidatorSignature)> {
2093 let mut all_sigs = HashMap::new();
2094 for (hash, index) in indices {
2095 let block_entry = match self.blocks.get(&hash) {
2096 None => {
2097 gum::debug!(
2098 target: LOG_TARGET,
2099 ?hash,
2100 "`get_approval_signatures`: could not find block entry for given hash!"
2101 );
2102 continue
2103 },
2104 Some(e) => e,
2105 };
2106
2107 let sigs = block_entry.approval_votes(index).into_iter().map(|approval| {
2108 (
2109 approval.validator,
2110 (
2111 hash,
2112 approval
2113 .candidate_indices
2114 .iter_ones()
2115 .map(|val| val as CandidateIndex)
2116 .collect_vec(),
2117 approval.signature,
2118 ),
2119 )
2120 });
2121 all_sigs.extend(sigs);
2122 }
2123 all_sigs
2124 }
2125
2126 async fn unify_with_peer(
2127 sender: &mut impl overseer::SubsystemSender<NetworkBridgeTxMessage>,
2128 metrics: &Metrics,
2129 entries: &mut HashMap<Hash, BlockEntry>,
2130 topologies: &SessionGridTopologies,
2131 total_peers: usize,
2132 peer_id: PeerId,
2133 protocol_version: ProtocolVersion,
2134 view: View,
2135 rng: &mut (impl CryptoRng + Rng),
2136 retry_known_blocks: bool,
2137 ) {
2138 metrics.on_unify_with_peer();
2139 let _timer = metrics.time_unify_with_peer();
2140
2141 let mut assignments_to_send = Vec::new();
2142 let mut approvals_to_send = Vec::new();
2143
2144 let view_finalized_number = view.finalized_number;
2145 for head in view.into_iter() {
2146 let mut block = head;
2147
2148 loop {
2150 let entry = match entries.get_mut(&block) {
2151 Some(entry) if entry.number > view_finalized_number => entry,
2152 _ => break,
2153 };
2154
2155 if entry.known_by.contains_key(&peer_id) && !retry_known_blocks {
2161 break
2162 }
2163
2164 let peer_knowledge = entry.known_by.entry(peer_id).or_default();
2165 let topology = topologies.get_topology(entry.session);
2166
2167 for approval_entry in entry.approval_entries.values_mut() {
2170 {
2173 let required_routing = approval_entry.routing_info().required_routing;
2174 let routing_info = &mut approval_entry.routing_info_mut();
2175 let rng = &mut *rng;
2176 let mut peer_filter = move |peer_id| {
2177 let in_topology = topology.as_ref().map_or(false, |t| {
2178 t.local_grid_neighbors().route_to_peer(required_routing, peer_id)
2179 });
2180 in_topology || {
2181 if !topology
2182 .map(|topology| topology.is_validator(peer_id))
2183 .unwrap_or(false)
2184 {
2185 return false
2186 }
2187
2188 let route_random =
2189 routing_info.random_routing.sample(total_peers, rng);
2190 if route_random {
2191 routing_info.mark_randomly_sent(*peer_id);
2192 }
2193
2194 route_random
2195 }
2196 };
2197
2198 if !peer_filter(&peer_id) {
2199 continue
2200 }
2201 }
2202
2203 let assignment_message = approval_entry.assignment();
2204 let approval_messages = approval_entry.approvals();
2205 let (assignment_knowledge, message_kind) =
2206 approval_entry.create_assignment_knowledge(block);
2207
2208 if !peer_knowledge.contains(&assignment_knowledge, message_kind) {
2211 peer_knowledge.sent.insert(assignment_knowledge, message_kind);
2212 assignments_to_send.push(assignment_message);
2213 }
2214
2215 for approval_message in approval_messages {
2217 let approval_knowledge =
2218 PeerKnowledge::generate_approval_key(&approval_message);
2219
2220 if !peer_knowledge.contains(&approval_knowledge.0, approval_knowledge.1) {
2221 approvals_to_send.push(approval_message);
2222 peer_knowledge.sent.insert(approval_knowledge.0, approval_knowledge.1);
2223 }
2224 }
2225 }
2226
2227 block = entry.parent_hash;
2228 }
2229 }
2230
2231 if !assignments_to_send.is_empty() {
2232 gum::trace!(
2233 target: LOG_TARGET,
2234 ?peer_id,
2235 ?protocol_version,
2236 num = assignments_to_send.len(),
2237 "Sending assignments to unified peer",
2238 );
2239
2240 send_assignments_batched(
2241 sender,
2242 assignments_to_send,
2243 &vec![(peer_id, protocol_version)],
2244 )
2245 .await;
2246 }
2247
2248 if !approvals_to_send.is_empty() {
2249 gum::trace!(
2250 target: LOG_TARGET,
2251 ?peer_id,
2252 ?protocol_version,
2253 num = approvals_to_send.len(),
2254 "Sending approvals to unified peer",
2255 );
2256
2257 send_approvals_batched(sender, approvals_to_send, &vec![(peer_id, protocol_version)])
2258 .await;
2259 }
2260 }
2261
2262 async fn enable_aggression<N: overseer::SubsystemSender<NetworkBridgeTxMessage>>(
2273 &mut self,
2274 network_sender: &mut N,
2275 resend: Resend,
2276 metrics: &Metrics,
2277 ) {
2278 let config = self.aggression_config.clone();
2279 let min_age = self.blocks_by_number.iter().next().map(|(num, _)| num);
2280 let max_age = self.blocks_by_number.iter().rev().next().map(|(num, _)| num);
2281
2282 let (min_age, max_age) = match (min_age, max_age) {
2284 (Some(min), Some(max)) => (*min, *max),
2285 _ => return, };
2287
2288 let age = max_age.saturating_sub(min_age);
2289
2290 if !self.aggression_config.should_trigger_aggression(age) {
2292 gum::trace!(
2293 target: LOG_TARGET,
2294 approval_checking_lag = self.approval_checking_lag,
2295 age,
2296 "Aggression not enabled",
2297 );
2298 return
2299 }
2300 gum::debug!(target: LOG_TARGET, min_age, max_age, "Aggression enabled",);
2301
2302 adjust_required_routing_and_propagate(
2303 network_sender,
2304 &mut self.blocks,
2305 &self.topologies,
2306 |block_entry| {
2307 let block_age = max_age - block_entry.number;
2308 let diff_from_min_age = block_entry.number - min_age;
2312
2313 let blocks_since_last_sent = block_entry
2318 .last_resent_at_block_number
2319 .map(|last_resent_at_block_number| max_age - last_resent_at_block_number);
2320
2321 let can_resend_at_this_age = blocks_since_last_sent
2322 .zip(config.resend_unfinalized_period)
2323 .map(|(blocks_since_last_sent, unfinalized_period)| {
2324 blocks_since_last_sent >= unfinalized_period * 2
2325 })
2326 .unwrap_or(true);
2327
2328 if resend == Resend::Yes &&
2329 config.resend_unfinalized_period.as_ref().map_or(false, |p| {
2330 block_age > 0 &&
2331 block_age % p == 0 && diff_from_min_age == 0 &&
2332 can_resend_at_this_age
2333 }) {
2334 for (_, knowledge) in block_entry.known_by.iter_mut() {
2336 knowledge.sent = Knowledge::default();
2337 }
2338 block_entry.last_resent_at_block_number = Some(max_age);
2339 gum::debug!(
2340 target: LOG_TARGET,
2341 block_number = ?block_entry.number,
2342 ?max_age,
2343 "Aggression enabled with resend for block",
2344 );
2345 true
2346 } else {
2347 false
2348 }
2349 },
2350 |required_routing, _, _| *required_routing,
2351 &self.peer_views,
2352 )
2353 .await;
2354
2355 adjust_required_routing_and_propagate(
2356 network_sender,
2357 &mut self.blocks,
2358 &self.topologies,
2359 |block_entry| {
2360 block_entry.number == min_age
2366 },
2367 |required_routing, local, _| {
2368 if *required_routing == RequiredRouting::PendingTopology {
2370 gum::debug!(
2371 target: LOG_TARGET,
2372 lag = ?self.approval_checking_lag,
2373 "Encountered old block pending gossip topology",
2374 );
2375 return *required_routing
2376 }
2377
2378 let mut new_required_routing = *required_routing;
2379
2380 if config.l1_threshold.as_ref().map_or(false, |t| &age >= t) {
2381 if local && new_required_routing != RequiredRouting::All {
2383 metrics.on_aggression_l1();
2384 new_required_routing = RequiredRouting::All;
2385 }
2386 }
2387
2388 if config.l2_threshold.as_ref().map_or(false, |t| &age >= t) {
2389 if !local && new_required_routing != RequiredRouting::GridXY {
2391 metrics.on_aggression_l2();
2392 new_required_routing = RequiredRouting::GridXY;
2393 }
2394 }
2395 new_required_routing
2396 },
2397 &self.peer_views,
2398 )
2399 .await;
2400 }
2401
2402 async fn sanitize_v1_assignments(
2405 &mut self,
2406 peer_id: PeerId,
2407 sender: &mut impl overseer::SubsystemSender<NetworkBridgeTxMessage>,
2408 assignments: Vec<(IndirectAssignmentCert, CandidateIndex)>,
2409 ) -> Vec<(IndirectAssignmentCertV2, CandidateBitfield)> {
2410 let mut sanitized_assignments = Vec::new();
2411 for (cert, candidate_index) in assignments.into_iter() {
2412 let cert_bitfield_bits = match cert.cert.kind {
2413 AssignmentCertKind::RelayVRFDelay { core_index } => core_index.0 as usize + 1,
2414 AssignmentCertKind::RelayVRFModulo { .. } => candidate_index as usize + 1,
2418 };
2419
2420 let candidate_bitfield_bits = candidate_index as usize + 1;
2421
2422 if cert_bitfield_bits > MAX_BITFIELD_SIZE || candidate_bitfield_bits > MAX_BITFIELD_SIZE
2424 {
2425 modify_reputation(&mut self.reputation, sender, peer_id, COST_OVERSIZED_BITFIELD)
2427 .await;
2428 gum::debug!(target: LOG_TARGET, block_hash = ?cert.block_hash, ?candidate_index, validator_index = ?cert.validator, kind = ?cert.cert.kind, "Bad assignment v1, invalid candidate index");
2429 } else {
2430 sanitized_assignments.push((cert.into(), candidate_index.into()))
2431 }
2432 }
2433
2434 sanitized_assignments
2435 }
2436
2437 async fn sanitize_v2_assignments(
2440 &mut self,
2441 peer_id: PeerId,
2442 sender: &mut impl overseer::SubsystemSender<NetworkBridgeTxMessage>,
2443 assignments: Vec<(IndirectAssignmentCertV2, CandidateBitfield)>,
2444 ) -> Vec<(IndirectAssignmentCertV2, CandidateBitfield)> {
2445 let mut sanitized_assignments = Vec::new();
2446 for (cert, candidate_bitfield) in assignments.into_iter() {
2447 let cert_bitfield_bits = match &cert.cert.kind {
2448 AssignmentCertKindV2::RelayVRFDelay { core_index } => core_index.0 as usize + 1,
2449 AssignmentCertKindV2::RelayVRFModulo { .. } => candidate_bitfield.len(),
2453 AssignmentCertKindV2::RelayVRFModuloCompact { core_bitfield } =>
2454 core_bitfield.len(),
2455 };
2456
2457 let candidate_bitfield_bits = candidate_bitfield.len();
2458
2459 let msb = candidate_bitfield_bits - 1;
2461
2462 if cert_bitfield_bits > MAX_BITFIELD_SIZE
2464 || candidate_bitfield_bits > MAX_BITFIELD_SIZE
2465 || !candidate_bitfield.bit_at(msb.as_bit_index())
2467 {
2468 modify_reputation(&mut self.reputation, sender, peer_id, COST_OVERSIZED_BITFIELD)
2470 .await;
2471 for candidate_index in candidate_bitfield.iter_ones() {
2472 gum::debug!(target: LOG_TARGET, block_hash = ?cert.block_hash, ?candidate_index, validator_index = ?cert.validator, "Bad assignment v2, oversized bitfield");
2473 }
2474 } else {
2475 sanitized_assignments.push((cert, candidate_bitfield))
2476 }
2477 }
2478
2479 sanitized_assignments
2480 }
2481
2482 async fn sanitize_v1_approvals(
2484 &mut self,
2485 peer_id: PeerId,
2486 sender: &mut impl overseer::SubsystemSender<NetworkBridgeTxMessage>,
2487 approval: Vec<IndirectSignedApprovalVote>,
2488 ) -> Vec<IndirectSignedApprovalVoteV2> {
2489 let mut sanitized_approvals = Vec::new();
2490 for approval in approval.into_iter() {
2491 if approval.candidate_index as usize > MAX_BITFIELD_SIZE {
2492 modify_reputation(&mut self.reputation, sender, peer_id, COST_OVERSIZED_BITFIELD)
2494 .await;
2495 gum::debug!(
2496 target: LOG_TARGET,
2497 block_hash = ?approval.block_hash,
2498 candidate_index = ?approval.candidate_index,
2499 "Bad approval v1, invalid candidate index"
2500 );
2501 } else {
2502 sanitized_approvals.push(approval.into())
2503 }
2504 }
2505
2506 sanitized_approvals
2507 }
2508
2509 async fn sanitize_v2_approvals(
2511 &mut self,
2512 peer_id: PeerId,
2513 sender: &mut impl overseer::SubsystemSender<NetworkBridgeTxMessage>,
2514 approval: Vec<IndirectSignedApprovalVoteV2>,
2515 ) -> Vec<IndirectSignedApprovalVoteV2> {
2516 let mut sanitized_approvals = Vec::new();
2517 for approval in approval.into_iter() {
2518 if approval.candidate_indices.len() as usize > MAX_BITFIELD_SIZE {
2519 modify_reputation(&mut self.reputation, sender, peer_id, COST_OVERSIZED_BITFIELD)
2521 .await;
2522 gum::debug!(
2523 target: LOG_TARGET,
2524 block_hash = ?approval.block_hash,
2525 candidate_indices_len = ?approval.candidate_indices.len(),
2526 "Bad approval v2, invalid candidate indices size"
2527 );
2528 } else {
2529 sanitized_approvals.push(approval)
2530 }
2531 }
2532
2533 sanitized_approvals
2534 }
2535}
2536
2537#[overseer::contextbounds(ApprovalDistribution, prefix = self::overseer)]
2550async fn adjust_required_routing_and_propagate<
2551 N: overseer::SubsystemSender<NetworkBridgeTxMessage>,
2552 BlockFilter,
2553 RoutingModifier,
2554>(
2555 network_sender: &mut N,
2556 blocks: &mut HashMap<Hash, BlockEntry>,
2557 topologies: &SessionGridTopologies,
2558 block_filter: BlockFilter,
2559 routing_modifier: RoutingModifier,
2560 peer_views: &HashMap<PeerId, PeerEntry>,
2561) where
2562 BlockFilter: Fn(&mut BlockEntry) -> bool,
2563 RoutingModifier: Fn(&RequiredRouting, bool, &ValidatorIndex) -> RequiredRouting,
2564{
2565 let mut peer_assignments = HashMap::new();
2566 let mut peer_approvals = HashMap::new();
2567
2568 for (block_hash, block_entry) in blocks {
2571 if !block_filter(block_entry) {
2572 continue
2573 }
2574
2575 let topology = match topologies.get_topology(block_entry.session) {
2576 Some(t) => t,
2577 None => continue,
2578 };
2579
2580 for approval_entry in block_entry.approval_entries.values_mut() {
2583 let new_required_routing = routing_modifier(
2584 &approval_entry.routing_info().required_routing,
2585 approval_entry.routing_info().local,
2586 &approval_entry.validator_index(),
2587 );
2588
2589 approval_entry.update_required_routing(new_required_routing);
2590
2591 if approval_entry.routing_info().required_routing.is_empty() {
2592 continue
2593 }
2594
2595 let assignment_message = approval_entry.assignment();
2596 let approval_messages = approval_entry.approvals();
2597 let (assignment_knowledge, message_kind) =
2598 approval_entry.create_assignment_knowledge(*block_hash);
2599
2600 for (peer, peer_knowledge) in &mut block_entry.known_by {
2601 if !topology
2602 .local_grid_neighbors()
2603 .route_to_peer(approval_entry.routing_info().required_routing, peer)
2604 {
2605 continue
2606 }
2607
2608 if !peer_knowledge.contains(&assignment_knowledge, message_kind) {
2610 peer_knowledge.sent.insert(assignment_knowledge.clone(), message_kind);
2611 peer_assignments
2612 .entry(*peer)
2613 .or_insert_with(Vec::new)
2614 .push(assignment_message.clone());
2615 }
2616
2617 for approval_message in &approval_messages {
2619 let approval_knowledge = PeerKnowledge::generate_approval_key(approval_message);
2620
2621 if !peer_knowledge.contains(&approval_knowledge.0, approval_knowledge.1) {
2622 peer_knowledge.sent.insert(approval_knowledge.0, approval_knowledge.1);
2623 peer_approvals
2624 .entry(*peer)
2625 .or_insert_with(Vec::new)
2626 .push(approval_message.clone());
2627 }
2628 }
2629 }
2630 }
2631 }
2632
2633 for (peer, assignments_packet) in peer_assignments {
2635 if let Some(peer_view) = peer_views.get(&peer) {
2636 send_assignments_batched(
2637 network_sender,
2638 assignments_packet,
2639 &vec![(peer, peer_view.version)],
2640 )
2641 .await;
2642 } else {
2643 gum::warn!(target: LOG_TARGET, ?peer, "Unknown protocol version for peer",);
2645 }
2646 }
2647
2648 for (peer, approvals_packet) in peer_approvals {
2649 if let Some(peer_view) = peer_views.get(&peer) {
2650 send_approvals_batched(
2651 network_sender,
2652 approvals_packet,
2653 &vec![(peer, peer_view.version)],
2654 )
2655 .await;
2656 } else {
2657 gum::warn!(target: LOG_TARGET, ?peer, "Unknown protocol version for peer",);
2659 }
2660 }
2661}
2662
2663async fn modify_reputation(
2665 reputation: &mut ReputationAggregator,
2666 sender: &mut impl overseer::SubsystemSender<NetworkBridgeTxMessage>,
2667 peer_id: PeerId,
2668 rep: Rep,
2669) {
2670 gum::trace!(
2671 target: LOG_TARGET,
2672 reputation = ?rep,
2673 ?peer_id,
2674 "Reputation change for peer",
2675 );
2676 reputation.modify(sender, peer_id, rep).await;
2677}
2678
2679#[overseer::contextbounds(ApprovalDistribution, prefix = self::overseer)]
2680impl ApprovalDistribution {
2681 pub fn new(
2683 metrics: Metrics,
2684 slot_duration_millis: u64,
2685 assignment_criteria: Arc<dyn AssignmentCriteria + Send + Sync>,
2686 ) -> Self {
2687 Self::new_with_clock(
2688 metrics,
2689 slot_duration_millis,
2690 Arc::new(SystemClock),
2691 assignment_criteria,
2692 )
2693 }
2694
2695 pub fn new_with_clock(
2697 metrics: Metrics,
2698 slot_duration_millis: u64,
2699 clock: Arc<dyn Clock + Send + Sync>,
2700 assignment_criteria: Arc<dyn AssignmentCriteria + Send + Sync>,
2701 ) -> Self {
2702 Self { metrics, slot_duration_millis, clock, assignment_criteria }
2703 }
2704
2705 async fn run<Context>(self, ctx: Context) {
2706 let mut state =
2707 State { slot_duration_millis: self.slot_duration_millis, ..Default::default() };
2708 let mut rng = rand::rngs::StdRng::from_entropy();
2711 let mut session_info_provider = RuntimeInfo::new_with_config(RuntimeInfoConfig {
2712 keystore: None,
2713 session_cache_lru_size: DISPUTE_WINDOW.get(),
2714 });
2715
2716 self.run_inner(
2717 ctx,
2718 &mut state,
2719 REPUTATION_CHANGE_INTERVAL,
2720 &mut rng,
2721 &mut session_info_provider,
2722 )
2723 .await
2724 }
2725
2726 async fn run_inner<Context>(
2728 self,
2729 mut ctx: Context,
2730 state: &mut State,
2731 reputation_interval: Duration,
2732 rng: &mut (impl CryptoRng + Rng),
2733 session_info_provider: &mut RuntimeInfo,
2734 ) {
2735 let new_reputation_delay = || futures_timer::Delay::new(reputation_interval).fuse();
2736 let mut reputation_delay = new_reputation_delay();
2737 let mut approval_voting_sender = ctx.sender().clone();
2738 let mut network_sender = ctx.sender().clone();
2739 let mut runtime_api_sender = ctx.sender().clone();
2740
2741 loop {
2742 select! {
2743 _ = reputation_delay => {
2744 state.reputation.send(ctx.sender()).await;
2745 reputation_delay = new_reputation_delay();
2746 },
2747 message = ctx.recv().fuse() => {
2748 let message = match message {
2749 Ok(message) => message,
2750 Err(e) => {
2751 gum::debug!(target: LOG_TARGET, err = ?e, "Failed to receive a message from Overseer, exiting");
2752 return
2753 },
2754 };
2755
2756 if self.handle_from_orchestra(message, &mut approval_voting_sender, &mut network_sender, &mut runtime_api_sender, state, rng, session_info_provider).await {
2757 return;
2758 }
2759
2760 },
2761 }
2762 }
2763 }
2764
2765 pub async fn handle_from_orchestra<
2769 N: overseer::SubsystemSender<NetworkBridgeTxMessage>,
2770 A: overseer::SubsystemSender<ApprovalVotingMessage>,
2771 RA: overseer::SubsystemSender<RuntimeApiMessage>,
2772 >(
2773 &self,
2774 message: FromOrchestra<ApprovalDistributionMessage>,
2775 approval_voting_sender: &mut A,
2776 network_sender: &mut N,
2777 runtime_api_sender: &mut RA,
2778 state: &mut State,
2779 rng: &mut (impl CryptoRng + Rng),
2780 session_info_provider: &mut RuntimeInfo,
2781 ) -> bool {
2782 match message {
2783 FromOrchestra::Communication { msg } =>
2784 Self::handle_incoming(
2785 approval_voting_sender,
2786 network_sender,
2787 runtime_api_sender,
2788 state,
2789 msg,
2790 &self.metrics,
2791 rng,
2792 self.assignment_criteria.as_ref(),
2793 self.clock.as_ref(),
2794 session_info_provider,
2795 )
2796 .await,
2797 FromOrchestra::Signal(OverseerSignal::ActiveLeaves(_update)) => {
2798 gum::trace!(target: LOG_TARGET, "active leaves signal (ignored)");
2799 },
2804 FromOrchestra::Signal(OverseerSignal::BlockFinalized(_hash, number)) => {
2805 gum::trace!(target: LOG_TARGET, number = %number, "finalized signal");
2806 state.handle_block_finalized(network_sender, &self.metrics, number).await;
2807 },
2808 FromOrchestra::Signal(OverseerSignal::Conclude) => return true,
2809 }
2810 false
2811 }
2812
2813 async fn handle_incoming<
2814 N: overseer::SubsystemSender<NetworkBridgeTxMessage>,
2815 A: overseer::SubsystemSender<ApprovalVotingMessage>,
2816 RA: overseer::SubsystemSender<RuntimeApiMessage>,
2817 >(
2818 approval_voting_sender: &mut A,
2819 network_sender: &mut N,
2820 runtime_api_sender: &mut RA,
2821 state: &mut State,
2822 msg: ApprovalDistributionMessage,
2823 metrics: &Metrics,
2824 rng: &mut (impl CryptoRng + Rng),
2825 assignment_criteria: &(impl AssignmentCriteria + ?Sized),
2826 clock: &(impl Clock + ?Sized),
2827 session_info_provider: &mut RuntimeInfo,
2828 ) {
2829 match msg {
2830 ApprovalDistributionMessage::NetworkBridgeUpdate(event) => {
2831 state
2832 .handle_network_msg(
2833 approval_voting_sender,
2834 network_sender,
2835 runtime_api_sender,
2836 metrics,
2837 event,
2838 rng,
2839 assignment_criteria,
2840 clock,
2841 session_info_provider,
2842 )
2843 .await;
2844 },
2845 ApprovalDistributionMessage::NewBlocks(metas) => {
2846 state
2847 .handle_new_blocks(
2848 approval_voting_sender,
2849 network_sender,
2850 runtime_api_sender,
2851 metrics,
2852 metas,
2853 rng,
2854 assignment_criteria,
2855 clock,
2856 session_info_provider,
2857 )
2858 .await;
2859 },
2860 ApprovalDistributionMessage::DistributeAssignment(cert, candidate_indices) => {
2861 gum::debug!(
2862 target: LOG_TARGET,
2863 ?candidate_indices,
2864 block_hash = ?cert.block_hash,
2865 assignment_kind = ?cert.cert.kind,
2866 "Distributing our assignment on candidates",
2867 );
2868
2869 state
2870 .import_and_circulate_assignment(
2871 approval_voting_sender,
2872 network_sender,
2873 runtime_api_sender,
2874 &metrics,
2875 MessageSource::Local,
2876 cert,
2877 candidate_indices,
2878 rng,
2879 assignment_criteria,
2880 clock,
2881 session_info_provider,
2882 )
2883 .await;
2884 },
2885 ApprovalDistributionMessage::DistributeApproval(vote) => {
2886 gum::debug!(
2887 target: LOG_TARGET,
2888 "Distributing our approval vote on candidate (block={}, index={:?})",
2889 vote.block_hash,
2890 vote.candidate_indices,
2891 );
2892
2893 state
2894 .import_and_circulate_approval(
2895 approval_voting_sender,
2896 network_sender,
2897 runtime_api_sender,
2898 metrics,
2899 MessageSource::Local,
2900 vote,
2901 session_info_provider,
2902 )
2903 .await;
2904 },
2905 ApprovalDistributionMessage::GetApprovalSignatures(indices, tx) => {
2906 let sigs = state.get_approval_signatures(indices);
2907 if let Err(_) = tx.send(sigs) {
2908 gum::debug!(
2909 target: LOG_TARGET,
2910 "Sending back approval signatures failed, oneshot got closed"
2911 );
2912 }
2913 },
2914 ApprovalDistributionMessage::ApprovalCheckingLagUpdate(lag) => {
2915 gum::debug!(target: LOG_TARGET, lag, "Received `ApprovalCheckingLagUpdate`");
2916 state.approval_checking_lag = lag;
2917 },
2918 }
2919 }
2920}
2921
2922#[overseer::subsystem(ApprovalDistribution, error=SubsystemError, prefix=self::overseer)]
2923impl<Context> ApprovalDistribution {
2924 fn start(self, ctx: Context) -> SpawnedSubsystem {
2925 let future = self.run(ctx).map(|_| Ok(())).boxed();
2926
2927 SpawnedSubsystem { name: "approval-distribution-subsystem", future }
2928 }
2929}
2930
2931const fn ensure_size_not_zero(size: usize) -> usize {
2933 if 0 == size {
2934 panic!("Batch size must be at least 1 (MAX_NOTIFICATION_SIZE constant is too low)",);
2935 }
2936
2937 size
2938}
2939
2940pub const MAX_ASSIGNMENT_BATCH_SIZE: usize = ensure_size_not_zero(
2945 MAX_NOTIFICATION_SIZE as usize /
2946 std::mem::size_of::<(IndirectAssignmentCertV2, CandidateIndex)>() /
2947 3,
2948);
2949
2950pub const MAX_APPROVAL_BATCH_SIZE: usize = ensure_size_not_zero(
2952 MAX_NOTIFICATION_SIZE as usize / std::mem::size_of::<IndirectSignedApprovalVoteV2>() / 3,
2953);
2954
2955async fn send_assignments_batched_inner(
2957 sender: &mut impl overseer::SubsystemSender<NetworkBridgeTxMessage>,
2958 batch: impl IntoIterator<Item = (IndirectAssignmentCertV2, CandidateBitfield)>,
2959 peers: Vec<PeerId>,
2960 peer_version: ValidationVersion,
2961) {
2962 if peer_version == ValidationVersion::V3 {
2963 sender
2964 .send_message(NetworkBridgeTxMessage::SendValidationMessage(
2965 peers,
2966 Versioned::V3(protocol_v3::ValidationProtocol::ApprovalDistribution(
2967 protocol_v3::ApprovalDistributionMessage::Assignments(
2968 batch.into_iter().collect(),
2969 ),
2970 )),
2971 ))
2972 .await;
2973 } else {
2974 let batch = batch
2977 .into_iter()
2978 .filter_map(|(cert, candidates)| {
2979 cert.try_into().ok().map(|cert| {
2980 (
2981 cert,
2982 candidates
2984 .first_one()
2985 .map(|index| index as CandidateIndex)
2986 .expect("Assignment was checked for not being empty; qed"),
2987 )
2988 })
2989 })
2990 .collect();
2991 let message = if peer_version == ValidationVersion::V1 {
2992 Versioned::V1(protocol_v1::ValidationProtocol::ApprovalDistribution(
2993 protocol_v1::ApprovalDistributionMessage::Assignments(batch),
2994 ))
2995 } else {
2996 Versioned::V2(protocol_v2::ValidationProtocol::ApprovalDistribution(
2997 protocol_v2::ApprovalDistributionMessage::Assignments(batch),
2998 ))
2999 };
3000 sender
3001 .send_message(NetworkBridgeTxMessage::SendValidationMessage(peers, message))
3002 .await;
3003 }
3004}
3005
3006pub(crate) async fn send_assignments_batched(
3012 network_sender: &mut impl overseer::SubsystemSender<NetworkBridgeTxMessage>,
3013 v2_assignments: impl IntoIterator<Item = (IndirectAssignmentCertV2, CandidateBitfield)> + Clone,
3014 peers: &[(PeerId, ProtocolVersion)],
3015) {
3016 let v1_peers = filter_by_peer_version(peers, ValidationVersion::V1.into());
3017 let v2_peers = filter_by_peer_version(peers, ValidationVersion::V2.into());
3018 let v3_peers = filter_by_peer_version(peers, ValidationVersion::V3.into());
3019
3020 if !v1_peers.is_empty() || !v2_peers.is_empty() {
3023 let v1_assignments = v2_assignments
3026 .clone()
3027 .into_iter()
3028 .filter(|(_, candidates)| candidates.count_ones() == 1);
3029
3030 let mut v1_batches = v1_assignments.peekable();
3031
3032 while v1_batches.peek().is_some() {
3033 let batch: Vec<_> = v1_batches.by_ref().take(MAX_ASSIGNMENT_BATCH_SIZE).collect();
3034 if !v1_peers.is_empty() {
3035 send_assignments_batched_inner(
3036 network_sender,
3037 batch.clone(),
3038 v1_peers.clone(),
3039 ValidationVersion::V1,
3040 )
3041 .await;
3042 }
3043
3044 if !v2_peers.is_empty() {
3045 send_assignments_batched_inner(
3046 network_sender,
3047 batch,
3048 v2_peers.clone(),
3049 ValidationVersion::V2,
3050 )
3051 .await;
3052 }
3053 }
3054 }
3055
3056 if !v3_peers.is_empty() {
3057 let mut v3 = v2_assignments.into_iter().peekable();
3058
3059 while v3.peek().is_some() {
3060 let batch = v3.by_ref().take(MAX_ASSIGNMENT_BATCH_SIZE).collect::<Vec<_>>();
3061 send_assignments_batched_inner(
3062 network_sender,
3063 batch,
3064 v3_peers.clone(),
3065 ValidationVersion::V3,
3066 )
3067 .await;
3068 }
3069 }
3070}
3071
3072pub(crate) async fn send_approvals_batched(
3074 sender: &mut impl overseer::SubsystemSender<NetworkBridgeTxMessage>,
3075 approvals: impl IntoIterator<Item = IndirectSignedApprovalVoteV2> + Clone,
3076 peers: &[(PeerId, ProtocolVersion)],
3077) {
3078 let v1_peers = filter_by_peer_version(peers, ValidationVersion::V1.into());
3079 let v2_peers = filter_by_peer_version(peers, ValidationVersion::V2.into());
3080 let v3_peers = filter_by_peer_version(peers, ValidationVersion::V3.into());
3081
3082 if !v1_peers.is_empty() || !v2_peers.is_empty() {
3083 let mut batches = approvals
3084 .clone()
3085 .into_iter()
3086 .filter(|approval| approval.candidate_indices.count_ones() == 1)
3087 .filter_map(|val| val.try_into().ok())
3088 .peekable();
3089
3090 while batches.peek().is_some() {
3091 let batch: Vec<_> = batches.by_ref().take(MAX_APPROVAL_BATCH_SIZE).collect();
3092
3093 if !v1_peers.is_empty() {
3094 sender
3095 .send_message(NetworkBridgeTxMessage::SendValidationMessage(
3096 v1_peers.clone(),
3097 Versioned::V1(protocol_v1::ValidationProtocol::ApprovalDistribution(
3098 protocol_v1::ApprovalDistributionMessage::Approvals(batch.clone()),
3099 )),
3100 ))
3101 .await;
3102 }
3103
3104 if !v2_peers.is_empty() {
3105 sender
3106 .send_message(NetworkBridgeTxMessage::SendValidationMessage(
3107 v2_peers.clone(),
3108 Versioned::V2(protocol_v2::ValidationProtocol::ApprovalDistribution(
3109 protocol_v2::ApprovalDistributionMessage::Approvals(batch),
3110 )),
3111 ))
3112 .await;
3113 }
3114 }
3115 }
3116
3117 if !v3_peers.is_empty() {
3118 let mut batches = approvals.into_iter().peekable();
3119
3120 while batches.peek().is_some() {
3121 let batch: Vec<_> = batches.by_ref().take(MAX_APPROVAL_BATCH_SIZE).collect();
3122
3123 sender
3124 .send_message(NetworkBridgeTxMessage::SendValidationMessage(
3125 v3_peers.clone(),
3126 Versioned::V3(protocol_v3::ValidationProtocol::ApprovalDistribution(
3127 protocol_v3::ApprovalDistributionMessage::Approvals(batch),
3128 )),
3129 ))
3130 .await;
3131 }
3132 }
3133}