arc_malachitebft_sync/
state.rs1use std::cmp::max;
2use std::collections::{BTreeMap, HashMap};
3use std::ops::RangeInclusive;
4
5use malachitebft_core_types::{Context, Height};
6use malachitebft_peer::PeerId;
7
8use crate::scoring::{ema, PeerScorer, Strategy};
9use crate::{Config, OutboundRequestId, Status};
10
/// Bookkeeping state for the sync component, generic over the consensus
/// context `Ctx`.
///
/// It tracks the known peers and their last reported status, the requests
/// that are currently in flight, and a handful of height markers.
pub struct State<Ctx>
where
    Ctx: Context,
{
    /// Random number generator; handed to the peer scorer whenever a peer is
    /// selected.
    pub rng: Box<dyn rand::RngCore + Send>,

    /// Sync configuration. This file reads `scoring_strategy` and
    /// `parallel_requests` from it.
    pub config: Config,

    /// Initialised to `false` by `State::new`; nothing in the `impl` below
    /// changes it.
    // NOTE(review): presumably flipped by the caller once syncing has started — confirm.
    pub started: bool,

    /// Initialised to `Ctx::Height::ZERO`; not read by the `impl` below.
    // NOTE(review): presumably the height consensus is currently working on — confirm.
    pub consensus_height: Ctx::Height,

    /// Height used as the cut-off by `trim_validated_heights` (ranges are
    /// trimmed to start right after it) and by `prune_pending_requests`
    /// (requests ending at or below it are dropped).
    pub tip_height: Ctx::Height,

    /// Initialised to `Ctx::Height::ZERO`; not read by the `impl` below.
    // NOTE(review): presumably the next height to request from peers — confirm.
    pub sync_height: Ctx::Height,

    /// In-flight requests, keyed by request id. Each entry records the height
    /// range that was requested and the peer it was sent to.
    pub pending_requests: BTreeMap<OutboundRequestId, (RangeInclusive<Ctx::Height>, PeerId)>,

    /// Most recent status received from each peer, keyed by peer id.
    pub peers: BTreeMap<PeerId, Status<Ctx>>,

    /// Scorer used to pick one peer among the eligible candidates.
    pub peer_scorer: PeerScorer,
}
42
43impl<Ctx> State<Ctx>
44where
45 Ctx: Context,
46{
47 pub fn new(
48 rng: Box<dyn rand::RngCore + Send>,
50 config: Config,
52 ) -> Self {
53 let peer_scorer = match config.scoring_strategy {
54 Strategy::Ema => PeerScorer::new(ema::ExponentialMovingAverage::default()),
55 };
56
57 Self {
58 rng,
59 config,
60 started: false,
61 consensus_height: Ctx::Height::ZERO,
62 tip_height: Ctx::Height::ZERO,
63 sync_height: Ctx::Height::ZERO,
64 pending_requests: BTreeMap::new(),
65 peers: BTreeMap::new(),
66 peer_scorer,
67 }
68 }
69
70 pub fn max_parallel_requests(&self) -> usize {
73 max(1, self.config.parallel_requests)
74 }
75
76 pub fn update_status(&mut self, status: Status<Ctx>) {
77 self.peers.insert(status.peer_id, status);
78 }
79
80 pub fn update_request(
81 &mut self,
82 request_id: OutboundRequestId,
83 peer_id: PeerId,
84 range: RangeInclusive<Ctx::Height>,
85 ) {
86 self.pending_requests.insert(request_id, (range, peer_id));
87 }
88
89 pub fn filter_peers_by_range(
95 peers: &BTreeMap<PeerId, Status<Ctx>>,
96 range: &RangeInclusive<Ctx::Height>,
97 except: Option<PeerId>,
98 ) -> HashMap<PeerId, RangeInclusive<Ctx::Height>> {
99 let peers_with_whole_range = peers
101 .iter()
102 .filter(|(peer, status)| {
103 status.history_min_height <= *range.start()
104 && *range.start() <= *range.end()
105 && *range.end() <= status.tip_height
106 && except.is_none_or(|p| p != **peer)
107 })
108 .map(|(peer, _)| (*peer, range.clone()))
109 .collect::<HashMap<_, _>>();
110
111 if !peers_with_whole_range.is_empty() {
113 peers_with_whole_range
114 } else {
115 peers
117 .iter()
118 .filter(|(peer, status)| {
119 status.history_min_height <= *range.start()
120 && except.is_none_or(|p| p != **peer)
121 })
122 .map(|(peer, status)| (*peer, *range.start()..=status.tip_height))
123 .filter(|(_, range)| !range.is_empty())
124 .collect::<HashMap<_, _>>()
125 }
126 }
127
128 pub fn random_peer_with_except(
130 &mut self,
131 range: &RangeInclusive<Ctx::Height>,
132 except: Option<PeerId>,
133 ) -> Option<(PeerId, RangeInclusive<Ctx::Height>)> {
134 let peers_range = Self::filter_peers_by_range(&self.peers, range, except);
136
137 let peer_ids = peers_range.keys().cloned().collect::<Vec<_>>();
139 self.peer_scorer
140 .select_peer(&peer_ids, &mut self.rng)
141 .map(|peer_id| (peer_id, peers_range.get(&peer_id).unwrap().clone()))
142 }
143
144 pub fn random_peer_with(
146 &mut self,
147 range: &RangeInclusive<Ctx::Height>,
148 ) -> Option<(PeerId, RangeInclusive<Ctx::Height>)>
149 where
150 Ctx: Context,
151 {
152 self.random_peer_with_except(range, None)
153 }
154
155 pub fn get_request_id_by(&self, height: Ctx::Height) -> Option<(OutboundRequestId, PeerId)> {
159 self.pending_requests
160 .iter()
161 .find(|(_, (range, _))| range.contains(&height))
162 .map(|(request_id, (_, stored_peer_id))| (request_id.clone(), *stored_peer_id))
163 }
164
165 pub fn trim_validated_heights(
168 &mut self,
169 range: &RangeInclusive<Ctx::Height>,
170 ) -> RangeInclusive<Ctx::Height> {
171 let start = max(self.tip_height.increment(), *range.start());
172 start..=*range.end()
173 }
174
175 pub fn prune_pending_requests(&mut self) {
177 self.pending_requests
178 .retain(|_, (range, _)| range.end() > &self.tip_height);
179 }
180}