Skip to main content

arc_malachitebft_sync/
state.rs

1use std::cmp::max;
2use std::collections::{BTreeMap, HashMap};
3use std::ops::RangeInclusive;
4
5use malachitebft_core_types::{Context, Height};
6use malachitebft_peer::PeerId;
7
8use crate::scoring::{ema, PeerScorer, Strategy};
9use crate::{Config, OutboundRequestId, Status};
10
11pub struct State<Ctx>
12where
13    Ctx: Context,
14{
15    pub rng: Box<dyn rand::RngCore + Send>,
16
17    /// Configuration for the sync state and behaviour.
18    pub config: Config,
19
20    /// Consensus has started
21    pub started: bool,
22
23    /// The height that consensus is at, but has not decided yet.
24    pub consensus_height: Ctx::Height,
25
26    /// Height of last decided value
27    pub tip_height: Ctx::Height,
28
29    /// Next height to send a sync request.
30    /// Invariant: `sync_height > tip_height`
31    pub sync_height: Ctx::Height,
32
33    /// The requested range of heights.
34    pub pending_requests: BTreeMap<OutboundRequestId, (RangeInclusive<Ctx::Height>, PeerId)>,
35
36    /// The set of peers we are connected to in order to get values, certificates and votes.
37    pub peers: BTreeMap<PeerId, Status<Ctx>>,
38
39    /// Peer scorer for scoring peers based on their performance.
40    pub peer_scorer: PeerScorer,
41}
42
43impl<Ctx> State<Ctx>
44where
45    Ctx: Context,
46{
47    pub fn new(
48        // Random number generator for selecting peers
49        rng: Box<dyn rand::RngCore + Send>,
50        // Sync configuration
51        config: Config,
52    ) -> Self {
53        let peer_scorer = match config.scoring_strategy {
54            Strategy::Ema => PeerScorer::new(ema::ExponentialMovingAverage::default()),
55        };
56
57        Self {
58            rng,
59            config,
60            started: false,
61            consensus_height: Ctx::Height::ZERO,
62            tip_height: Ctx::Height::ZERO,
63            sync_height: Ctx::Height::ZERO,
64            pending_requests: BTreeMap::new(),
65            peers: BTreeMap::new(),
66            peer_scorer,
67        }
68    }
69
70    /// The maximum number of parallel requests that can be made to peers.
71    /// If the configuration is set to 0, it defaults to 1.
72    pub fn max_parallel_requests(&self) -> usize {
73        max(1, self.config.parallel_requests)
74    }
75
76    pub fn update_status(&mut self, status: Status<Ctx>) {
77        self.peers.insert(status.peer_id, status);
78    }
79
80    pub fn update_request(
81        &mut self,
82        request_id: OutboundRequestId,
83        peer_id: PeerId,
84        range: RangeInclusive<Ctx::Height>,
85    ) {
86        self.pending_requests.insert(request_id, (range, peer_id));
87    }
88
89    /// Filter peers to only include those that can provide the given range of values, or at least a prefix of the range.
90    ///
91    /// If there is no peer with all requested values, select a peer that has a tip at or above the start of the range.
92    /// Prefer peers that support batching (v2 sync protocol).
93    /// Return the peer ID and the range of heights that the peer can provide.
94    pub fn filter_peers_by_range(
95        peers: &BTreeMap<PeerId, Status<Ctx>>,
96        range: &RangeInclusive<Ctx::Height>,
97        except: Option<PeerId>,
98    ) -> HashMap<PeerId, RangeInclusive<Ctx::Height>> {
99        // Peers that can provide the whole range of values.
100        let peers_with_whole_range = peers
101            .iter()
102            .filter(|(peer, status)| {
103                status.history_min_height <= *range.start()
104                    && *range.start() <= *range.end()
105                    && *range.end() <= status.tip_height
106                    && except.is_none_or(|p| p != **peer)
107            })
108            .map(|(peer, _)| (*peer, range.clone()))
109            .collect::<HashMap<_, _>>();
110
111        // Prefer peers that have the whole range of values in their history.
112        if !peers_with_whole_range.is_empty() {
113            peers_with_whole_range
114        } else {
115            // Otherwise, just get the peers that can provide a prefix of the range.
116            peers
117                .iter()
118                .filter(|(peer, status)| {
119                    status.history_min_height <= *range.start()
120                        && except.is_none_or(|p| p != **peer)
121                })
122                .map(|(peer, status)| (*peer, *range.start()..=status.tip_height))
123                .filter(|(_, range)| !range.is_empty())
124                .collect::<HashMap<_, _>>()
125        }
126    }
127
128    /// Select at random a peer that can provide the given range of values, while excluding the given peer if provided.
129    pub fn random_peer_with_except(
130        &mut self,
131        range: &RangeInclusive<Ctx::Height>,
132        except: Option<PeerId>,
133    ) -> Option<(PeerId, RangeInclusive<Ctx::Height>)> {
134        // Filtered peers together with the range of heights they can provide.
135        let peers_range = Self::filter_peers_by_range(&self.peers, range, except);
136
137        // Select a peer at random.
138        let peer_ids = peers_range.keys().cloned().collect::<Vec<_>>();
139        self.peer_scorer
140            .select_peer(&peer_ids, &mut self.rng)
141            .map(|peer_id| (peer_id, peers_range.get(&peer_id).unwrap().clone()))
142    }
143
144    /// Same as [`Self::random_peer_with_except`] but without excluding any peer.
145    pub fn random_peer_with(
146        &mut self,
147        range: &RangeInclusive<Ctx::Height>,
148    ) -> Option<(PeerId, RangeInclusive<Ctx::Height>)>
149    where
150        Ctx: Context,
151    {
152        self.random_peer_with_except(range, None)
153    }
154
155    /// Get the request that contains the given height.
156    ///
157    /// Assumes a height cannot be in multiple pending requests.
158    pub fn get_request_id_by(&self, height: Ctx::Height) -> Option<(OutboundRequestId, PeerId)> {
159        self.pending_requests
160            .iter()
161            .find(|(_, (range, _))| range.contains(&height))
162            .map(|(request_id, (_, stored_peer_id))| (request_id.clone(), *stored_peer_id))
163    }
164
165    /// Return a new range of heights, trimming from the beginning any height
166    /// that is validated by consensus.
167    pub fn trim_validated_heights(
168        &mut self,
169        range: &RangeInclusive<Ctx::Height>,
170    ) -> RangeInclusive<Ctx::Height> {
171        let start = max(self.tip_height.increment(), *range.start());
172        start..=*range.end()
173    }
174
175    /// Remove pending requests that are for heights that have already been validated by consensus.
176    pub fn prune_pending_requests(&mut self) {
177        self.pending_requests
178            .retain(|_, (range, _)| range.end() > &self.tip_height);
179    }
180}