hotmint_consensus/
pacemaker.rs1use std::collections::HashMap;
2use std::time::Duration;
3
4use hotmint_types::crypto::{AggregateSignature, Signature};
5use hotmint_types::{
6 ConsensusMessage, QuorumCertificate, TimeoutCertificate, ValidatorId, ValidatorSet, ViewNumber,
7};
8use tokio::time::{Instant, Sleep};
9use tracing::debug;
10
/// Timeout, in milliseconds, used for a view when no timeouts have occurred.
const BASE_TIMEOUT_MS: u64 = 2_000;
/// Upper bound, in milliseconds, that the backed-off timeout is clamped to.
const MAX_TIMEOUT_MS: u64 = 30_000;
/// Factor the timeout grows by for each consecutive timeout.
const BACKOFF_MULTIPLIER: f64 = 1.5;
14
15pub struct Pacemaker {
17 pub view_deadline: Instant,
18 base_timeout: Duration,
19 current_timeout: Duration,
20 consecutive_timeouts: u32,
22 wishes: HashMap<ViewNumber, Vec<(ValidatorId, Option<QuorumCertificate>, Signature)>>,
24 relayed_tcs: HashMap<ViewNumber, bool>,
26}
27
28impl Default for Pacemaker {
29 fn default() -> Self {
30 Self::new()
31 }
32}
33
34impl Pacemaker {
35 pub fn new() -> Self {
36 let base_timeout = Duration::from_millis(BASE_TIMEOUT_MS);
37 Self {
38 view_deadline: Instant::now() + base_timeout,
39 base_timeout,
40 current_timeout: base_timeout,
41 consecutive_timeouts: 0,
42 wishes: HashMap::new(),
43 relayed_tcs: HashMap::new(),
44 }
45 }
46
47 pub fn reset_timer(&mut self) {
49 self.view_deadline = Instant::now() + self.current_timeout;
50 }
51
52 pub fn reset_on_progress(&mut self) {
54 self.consecutive_timeouts = 0;
55 self.current_timeout = self.base_timeout;
56 self.view_deadline = Instant::now() + self.current_timeout;
57 }
58
59 pub fn on_timeout(&mut self) {
61 self.consecutive_timeouts += 1;
62 let multiplier = BACKOFF_MULTIPLIER.powi(self.consecutive_timeouts as i32);
63 let new_ms = (self.base_timeout.as_millis() as f64 * multiplier) as u64;
64 self.current_timeout = Duration::from_millis(new_ms.min(MAX_TIMEOUT_MS));
65 debug!(
66 consecutive = self.consecutive_timeouts,
67 timeout_ms = self.current_timeout.as_millis() as u64,
68 "pacemaker timeout backoff"
69 );
70 self.view_deadline = Instant::now() + self.current_timeout;
71 }
72
73 pub fn sleep_until_deadline(&self) -> Sleep {
74 tokio::time::sleep_until(self.view_deadline)
75 }
76
77 pub fn current_timeout(&self) -> Duration {
78 self.current_timeout
79 }
80
81 pub fn consecutive_timeouts(&self) -> u32 {
82 self.consecutive_timeouts
83 }
84
85 pub fn build_wish(
87 &self,
88 current_view: ViewNumber,
89 validator_id: ValidatorId,
90 highest_qc: Option<QuorumCertificate>,
91 signer: &dyn hotmint_types::Signer,
92 ) -> ConsensusMessage {
93 let target_view = current_view.next();
94 let msg_bytes = wish_signing_bytes(target_view);
95 let signature = signer.sign(&msg_bytes);
96 ConsensusMessage::Wish {
97 target_view,
98 validator: validator_id,
99 highest_qc,
100 signature,
101 }
102 }
103
104 pub fn add_wish(
106 &mut self,
107 vs: &ValidatorSet,
108 target_view: ViewNumber,
109 validator: ValidatorId,
110 highest_qc: Option<QuorumCertificate>,
111 signature: Signature,
112 ) -> Option<TimeoutCertificate> {
113 let wishes = self.wishes.entry(target_view).or_default();
114
115 if wishes.iter().any(|(id, _, _)| *id == validator) {
117 return None;
118 }
119
120 wishes.push((validator, highest_qc, signature));
121
122 let mut power = 0u64;
124 for (vid, _, _) in wishes.iter() {
125 power += vs.power_of(*vid);
126 }
127
128 if power >= vs.quorum_threshold() {
129 let mut agg = AggregateSignature::new(vs.validator_count());
130 let mut highest_qcs = vec![None; vs.validator_count()];
131
132 for (vid, hqc, sig) in wishes.iter() {
133 if let Some(idx) = vs.index_of(*vid) {
134 let _ = agg.add(idx, sig.clone());
135 highest_qcs[idx] = hqc.clone();
136 }
137 }
138
139 Some(TimeoutCertificate {
140 view: ViewNumber(target_view.as_u64().saturating_sub(1)),
141 aggregate_signature: agg,
142 highest_qcs,
143 })
144 } else {
145 None
146 }
147 }
148
149 pub fn should_relay_tc(&mut self, tc: &TimeoutCertificate) -> bool {
151 use std::collections::hash_map::Entry;
152 match self.relayed_tcs.entry(tc.view) {
153 Entry::Vacant(e) => {
154 e.insert(true);
155 true
156 }
157 Entry::Occupied(_) => false,
158 }
159 }
160
161 pub fn clear_view(&mut self, view: ViewNumber) {
162 self.wishes.remove(&view);
163 }
165
166 pub fn prune_before(&mut self, min_view: ViewNumber) {
168 self.wishes.retain(|v, _| *v >= min_view);
169 self.relayed_tcs.retain(|v, _| *v >= min_view);
170 }
171}
172
173fn wish_signing_bytes(target_view: ViewNumber) -> Vec<u8> {
174 let mut buf = Vec::with_capacity(9);
175 buf.push(b'W');
176 buf.extend_from_slice(&target_view.as_u64().to_le_bytes());
177 buf
178}
179
#[cfg(test)]
mod tests {
    use super::*;

    /// Each consecutive timeout multiplies the 2000 ms base timeout by another factor of 1.5.
    #[test]
    fn test_exponential_backoff() {
        let mut pm = Pacemaker::new();
        assert_eq!(pm.consecutive_timeouts(), 0);
        assert_eq!(pm.current_timeout().as_millis(), 2000);

        // (timeouts so far, expected timeout in ms): 2000 * 1.5^n
        for (count, expected_ms) in [(1u32, 3000u128), (2, 4500), (3, 6750)] {
            pm.on_timeout();
            assert_eq!(pm.consecutive_timeouts(), count);
            assert_eq!(pm.current_timeout().as_millis(), expected_ms);
        }
    }

    /// After enough timeouts the backoff sits exactly at the cap.
    #[test]
    fn test_backoff_caps_at_max() {
        let mut pm = Pacemaker::new();
        for _ in 0..20 {
            pm.on_timeout();
        }
        // 2000 ms * 1.5^20 is far beyond the cap, so the timeout must equal it; a plain `<=`
        // check would also pass if the backoff never grew at all.
        assert_eq!(pm.current_timeout().as_millis(), MAX_TIMEOUT_MS as u128);
    }

    /// Progress wipes the backoff state.
    #[test]
    fn test_reset_on_progress() {
        let mut pm = Pacemaker::new();
        pm.on_timeout();
        pm.on_timeout();
        assert!(pm.current_timeout().as_millis() > 2000);

        pm.reset_on_progress();
        assert_eq!(pm.consecutive_timeouts(), 0);
        assert_eq!(pm.current_timeout().as_millis(), 2000);
    }

    /// A timeout certificate for a given view is approved for relay only once.
    #[test]
    fn test_tc_relay_dedup() {
        let mut pm = Pacemaker::new();
        let tc = TimeoutCertificate {
            view: ViewNumber(5),
            aggregate_signature: AggregateSignature::new(4),
            highest_qcs: vec![],
        };
        // First sighting is relayed, the repeat is suppressed.
        assert!(pm.should_relay_tc(&tc));
        assert!(!pm.should_relay_tc(&tc));
    }
}