hotmint_consensus/
pacemaker.rs1use std::collections::HashMap;
2use std::time::Duration;
3
4use hotmint_types::crypto::{AggregateSignature, Signature};
5use hotmint_types::{
6 ConsensusMessage, QuorumCertificate, TimeoutCertificate, ValidatorId, ValidatorSet, ViewNumber,
7};
8use tokio::time::{Instant, Sleep};
9use tracing::debug;
10
/// Default initial view timeout, in milliseconds (used by `PacemakerConfig::default`).
const BASE_TIMEOUT_MS: u64 = 2_000;
/// Default upper bound for the backed-off view timeout, in milliseconds.
const MAX_TIMEOUT_MS: u64 = 30_000;
/// Default factor by which the timeout grows per consecutive timeout.
const BACKOFF_MULTIPLIER: f64 = 1.5;
14
/// Tunable parameters for the pacemaker's view timeout and its backoff.
#[derive(Clone, Debug)]
pub struct PacemakerConfig {
    /// Timeout applied to a view when there have been no consecutive timeouts,
    /// in milliseconds.
    pub base_timeout_ms: u64,
    /// Cap applied to the backed-off timeout, in milliseconds.
    pub max_timeout_ms: u64,
    /// Base of the exponential backoff: after `n` consecutive timeouts the
    /// timeout is `base_timeout_ms * backoff_multiplier^n`, capped at
    /// `max_timeout_ms`.
    pub backoff_multiplier: f64,
}
22
23impl Default for PacemakerConfig {
24 fn default() -> Self {
25 Self {
26 base_timeout_ms: BASE_TIMEOUT_MS,
27 max_timeout_ms: MAX_TIMEOUT_MS,
28 backoff_multiplier: BACKOFF_MULTIPLIER,
29 }
30 }
31}
32
33pub struct Pacemaker {
35 config: PacemakerConfig,
36 pub view_deadline: Instant,
37 base_timeout: Duration,
38 current_timeout: Duration,
39 consecutive_timeouts: u32,
41 wishes: HashMap<ViewNumber, Vec<(ValidatorId, Option<QuorumCertificate>, Signature)>>,
43 relayed_tcs: HashMap<ViewNumber, bool>,
45}
46
47impl Default for Pacemaker {
48 fn default() -> Self {
49 Self::new()
50 }
51}
52
53impl Pacemaker {
54 pub fn new() -> Self {
55 Self::with_config(PacemakerConfig::default())
56 }
57
58 pub fn with_config(config: PacemakerConfig) -> Self {
59 let base_timeout = Duration::from_millis(config.base_timeout_ms);
60 Self {
61 config,
62 view_deadline: Instant::now() + base_timeout,
63 base_timeout,
64 current_timeout: base_timeout,
65 consecutive_timeouts: 0,
66 wishes: HashMap::new(),
67 relayed_tcs: HashMap::new(),
68 }
69 }
70
71 pub fn reset_timer(&mut self) {
73 self.view_deadline = Instant::now() + self.current_timeout;
74 }
75
76 pub fn reset_on_progress(&mut self) {
78 self.consecutive_timeouts = 0;
79 self.current_timeout = self.base_timeout;
80 self.view_deadline = Instant::now() + self.current_timeout;
81 }
82
83 pub fn on_timeout(&mut self) {
85 self.consecutive_timeouts += 1;
86 let multiplier = self
87 .config
88 .backoff_multiplier
89 .powi(self.consecutive_timeouts as i32);
90 let new_ms = (self.base_timeout.as_millis() as f64 * multiplier) as u64;
91 self.current_timeout = Duration::from_millis(new_ms.min(self.config.max_timeout_ms));
92 debug!(
93 consecutive = self.consecutive_timeouts,
94 timeout_ms = self.current_timeout.as_millis() as u64,
95 "pacemaker timeout backoff"
96 );
97 self.view_deadline = Instant::now() + self.current_timeout;
98 }
99
100 pub fn sleep_until_deadline(&self) -> Sleep {
101 tokio::time::sleep_until(self.view_deadline)
102 }
103
104 pub fn current_timeout(&self) -> Duration {
105 self.current_timeout
106 }
107
108 pub fn consecutive_timeouts(&self) -> u32 {
109 self.consecutive_timeouts
110 }
111
112 pub fn build_wish(
114 &self,
115 current_view: ViewNumber,
116 validator_id: ValidatorId,
117 highest_qc: Option<QuorumCertificate>,
118 signer: &dyn hotmint_types::Signer,
119 ) -> ConsensusMessage {
120 let target_view = current_view.next();
121 let msg_bytes = wish_signing_bytes(target_view);
122 let signature = signer.sign(&msg_bytes);
123 ConsensusMessage::Wish {
124 target_view,
125 validator: validator_id,
126 highest_qc,
127 signature,
128 }
129 }
130
131 pub fn add_wish(
133 &mut self,
134 vs: &ValidatorSet,
135 target_view: ViewNumber,
136 validator: ValidatorId,
137 highest_qc: Option<QuorumCertificate>,
138 signature: Signature,
139 ) -> Option<TimeoutCertificate> {
140 let wishes = self.wishes.entry(target_view).or_default();
141
142 if wishes.iter().any(|(id, _, _)| *id == validator) {
144 return None;
145 }
146
147 wishes.push((validator, highest_qc, signature));
148
149 let mut power = 0u64;
151 for (vid, _, _) in wishes.iter() {
152 power += vs.power_of(*vid);
153 }
154
155 if power >= vs.quorum_threshold() {
156 let mut agg = AggregateSignature::new(vs.validator_count());
157 let mut highest_qcs = vec![None; vs.validator_count()];
158
159 for (vid, hqc, sig) in wishes.iter() {
160 if let Some(idx) = vs.index_of(*vid) {
161 let _ = agg.add(idx, sig.clone());
162 highest_qcs[idx] = hqc.clone();
163 }
164 }
165
166 Some(TimeoutCertificate {
167 view: ViewNumber(target_view.as_u64().saturating_sub(1)),
168 aggregate_signature: agg,
169 highest_qcs,
170 })
171 } else {
172 None
173 }
174 }
175
176 pub fn should_relay_tc(&mut self, tc: &TimeoutCertificate) -> bool {
178 use std::collections::hash_map::Entry;
179 match self.relayed_tcs.entry(tc.view) {
180 Entry::Vacant(e) => {
181 e.insert(true);
182 true
183 }
184 Entry::Occupied(_) => false,
185 }
186 }
187
188 pub fn clear_view(&mut self, view: ViewNumber) {
189 self.wishes.remove(&view);
190 }
192
193 pub fn prune_before(&mut self, min_view: ViewNumber) {
195 self.wishes.retain(|v, _| *v >= min_view);
196 self.relayed_tcs.retain(|v, _| *v >= min_view);
197 }
198}
199
200pub(crate) fn wish_signing_bytes(target_view: ViewNumber) -> Vec<u8> {
201 let mut buf = Vec::with_capacity(9);
202 buf.push(b'W');
203 buf.extend_from_slice(&target_view.as_u64().to_le_bytes());
204 buf
205}
206
#[cfg(test)]
mod tests {
    use super::*;

    /// Each consecutive timeout multiplies the base timeout by another 1.5.
    #[test]
    fn test_exponential_backoff() {
        let mut pm = Pacemaker::new();
        assert_eq!(pm.consecutive_timeouts(), 0);
        assert_eq!(pm.current_timeout().as_millis(), 2000);

        // 2000 * 1.5
        pm.on_timeout();
        assert_eq!(pm.consecutive_timeouts(), 1);
        assert_eq!(pm.current_timeout().as_millis(), 3000);

        // 2000 * 1.5^2
        pm.on_timeout();
        assert_eq!(pm.consecutive_timeouts(), 2);
        assert_eq!(pm.current_timeout().as_millis(), 4500);

        // 2000 * 1.5^3
        pm.on_timeout();
        assert_eq!(pm.consecutive_timeouts(), 3);
        assert_eq!(pm.current_timeout().as_millis(), 6750);
    }

    /// After enough timeouts the backoff sits exactly on the configured cap.
    #[test]
    fn test_backoff_caps_at_max() {
        let mut pm = Pacemaker::new();
        for _ in 0..20 {
            pm.on_timeout();
        }
        assert_eq!(pm.consecutive_timeouts(), 20);
        // 2000 * 1.5^20 is far above the cap, so the timeout must equal the
        // cap. A `<=` check would also pass if backoff never grew at all.
        assert_eq!(pm.current_timeout().as_millis(), MAX_TIMEOUT_MS as u128);
    }

    /// Progress resets both the counter and the timeout to their base values.
    #[test]
    fn test_reset_on_progress() {
        let mut pm = Pacemaker::new();
        pm.on_timeout();
        pm.on_timeout();
        assert!(pm.current_timeout().as_millis() > 2000);

        pm.reset_on_progress();
        assert_eq!(pm.consecutive_timeouts(), 0);
        assert_eq!(pm.current_timeout().as_millis(), 2000);
    }

    /// A timeout certificate for a given view is approved for relay only once.
    #[test]
    fn test_tc_relay_dedup() {
        let mut pm = Pacemaker::new();
        let tc = TimeoutCertificate {
            view: ViewNumber(5),
            aggregate_signature: AggregateSignature::new(4),
            highest_qcs: vec![],
        };
        assert!(pm.should_relay_tc(&tc));
        // Second call for the same view must be suppressed.
        assert!(!pm.should_relay_tc(&tc));
    }
}