1use std::collections::HashMap;
2use std::collections::hash_map::Entry;
3use std::time::Duration;
4
5use hotmint_types::crypto::{AggregateSignature, Signature};
6use hotmint_types::epoch::EpochNumber;
7use hotmint_types::{
8 ConsensusMessage, QuorumCertificate, TimeoutCertificate, ValidatorId, ValidatorSet, ViewNumber,
9};
10use tokio::time::{Instant, Sleep};
11use tracing::debug;
12
13const BASE_TIMEOUT_MS: u64 = 2000;
14const MAX_TIMEOUT_MS: u64 = 30000;
15const BACKOFF_MULTIPLIER: f64 = 1.5;
16
17#[derive(Debug, Clone)]
19pub struct PacemakerConfig {
20 pub base_timeout_ms: u64,
21 pub max_timeout_ms: u64,
22 pub backoff_multiplier: f64,
23}
24
25impl Default for PacemakerConfig {
26 fn default() -> Self {
27 Self {
28 base_timeout_ms: BASE_TIMEOUT_MS,
29 max_timeout_ms: MAX_TIMEOUT_MS,
30 backoff_multiplier: BACKOFF_MULTIPLIER,
31 }
32 }
33}
34
35pub struct Pacemaker {
37 config: PacemakerConfig,
38 pub view_deadline: Instant,
39 base_timeout: Duration,
40 current_timeout: Duration,
41 consecutive_timeouts: u32,
43 wishes: HashMap<ViewNumber, Vec<(ValidatorId, Option<QuorumCertificate>, Signature)>>,
45 relayed_tcs: HashMap<ViewNumber, bool>,
47}
48
49impl Default for Pacemaker {
50 fn default() -> Self {
51 Self::new()
52 }
53}
54
55impl Pacemaker {
56 pub fn new() -> Self {
57 Self::with_config(PacemakerConfig::default())
58 }
59
60 pub fn with_config(config: PacemakerConfig) -> Self {
61 let base_timeout = Duration::from_millis(config.base_timeout_ms);
62 Self {
63 config,
64 view_deadline: Instant::now() + base_timeout,
65 base_timeout,
66 current_timeout: base_timeout,
67 consecutive_timeouts: 0,
68 wishes: HashMap::new(),
69 relayed_tcs: HashMap::new(),
70 }
71 }
72
73 pub fn reset_timer(&mut self) {
75 self.view_deadline = Instant::now() + self.current_timeout;
76 }
77
78 pub fn reset_on_progress(&mut self) {
80 self.consecutive_timeouts = 0;
81 self.current_timeout = self.base_timeout;
82 self.view_deadline = Instant::now() + self.current_timeout;
83 }
84
85 pub fn on_timeout(&mut self) {
87 self.consecutive_timeouts += 1;
88 let multiplier = self
89 .config
90 .backoff_multiplier
91 .powi(self.consecutive_timeouts as i32);
92 let new_ms = (self.base_timeout.as_millis() as f64 * multiplier) as u64;
93 self.current_timeout = Duration::from_millis(new_ms.min(self.config.max_timeout_ms));
94 debug!(
95 consecutive = self.consecutive_timeouts,
96 timeout_ms = self.current_timeout.as_millis() as u64,
97 "pacemaker timeout backoff"
98 );
99 self.view_deadline = Instant::now() + self.current_timeout;
100 }
101
102 pub fn sleep_until_deadline(&self) -> Sleep {
103 tokio::time::sleep_until(self.view_deadline)
104 }
105
106 pub fn current_timeout(&self) -> Duration {
107 self.current_timeout
108 }
109
110 pub fn consecutive_timeouts(&self) -> u32 {
111 self.consecutive_timeouts
112 }
113
114 pub fn build_wish(
116 &self,
117 chain_id_hash: &[u8; 32],
118 epoch: EpochNumber,
119 current_view: ViewNumber,
120 validator_id: ValidatorId,
121 highest_qc: Option<QuorumCertificate>,
122 signer: &dyn hotmint_types::Signer,
123 ) -> ConsensusMessage {
124 let target_view = current_view.next();
125 let msg_bytes = wish_signing_bytes(chain_id_hash, epoch, target_view, highest_qc.as_ref());
126 let signature = signer.sign(&msg_bytes);
127 ConsensusMessage::Wish {
128 target_view,
129 validator: validator_id,
130 highest_qc,
131 signature,
132 }
133 }
134
135 pub fn add_wish(
137 &mut self,
138 vs: &ValidatorSet,
139 target_view: ViewNumber,
140 validator: ValidatorId,
141 highest_qc: Option<QuorumCertificate>,
142 signature: Signature,
143 ) -> Option<TimeoutCertificate> {
144 let wishes = self.wishes.entry(target_view).or_default();
145
146 if wishes.iter().any(|(id, _, _)| *id == validator) {
148 return None;
149 }
150
151 wishes.push((validator, highest_qc, signature));
152
153 let mut power = 0u64;
155 for (vid, _, _) in wishes.iter() {
156 power += vs.power_of(*vid);
157 }
158
159 if power >= vs.quorum_threshold() {
160 let mut agg = AggregateSignature::new(vs.validator_count());
161 let mut highest_qcs = vec![None; vs.validator_count()];
162
163 for (vid, hqc, sig) in wishes.iter() {
164 if let Some(idx) = vs.index_of(*vid) {
165 let _ = agg.add(idx, sig.clone());
166 highest_qcs[idx] = hqc.clone();
167 }
168 }
169
170 Some(TimeoutCertificate {
171 view: ViewNumber(target_view.as_u64().saturating_sub(1)),
172 aggregate_signature: agg,
173 highest_qcs,
174 })
175 } else {
176 None
177 }
178 }
179
180 pub fn should_relay_tc(&mut self, tc: &TimeoutCertificate) -> bool {
182 match self.relayed_tcs.entry(tc.view) {
183 Entry::Vacant(e) => {
184 e.insert(true);
185 true
186 }
187 Entry::Occupied(_) => false,
188 }
189 }
190
191 pub fn clear_view(&mut self, view: ViewNumber) {
192 self.wishes.remove(&view);
193 }
195
196 pub fn prune_before(&mut self, min_view: ViewNumber) {
198 self.wishes.retain(|v, _| *v >= min_view);
199 self.relayed_tcs.retain(|v, _| *v >= min_view);
200 }
201}
202
203pub(crate) fn wish_signing_bytes(
204 chain_id_hash: &[u8; 32],
205 epoch: EpochNumber,
206 target_view: ViewNumber,
207 highest_qc: Option<&QuorumCertificate>,
208) -> Vec<u8> {
209 let tag = b"HOTMINT_WISH_V1\0";
210 let mut buf = Vec::with_capacity(tag.len() + 32 + 8 + 8 + 42);
211 buf.extend_from_slice(tag);
212 buf.extend_from_slice(chain_id_hash);
213 buf.extend_from_slice(&epoch.as_u64().to_le_bytes());
214 buf.extend_from_slice(&target_view.as_u64().to_le_bytes());
215 match highest_qc {
218 None => buf.push(0x00),
219 Some(qc) => {
220 buf.push(0x01);
221 buf.extend_from_slice(&qc.view.as_u64().to_le_bytes());
222 buf.extend_from_slice(&qc.block_hash.0);
223 }
224 }
225 buf
226}
227
228#[cfg(test)]
229mod tests {
230 use super::*;
231
232 #[test]
233 fn test_exponential_backoff() {
234 let mut pm = Pacemaker::new();
235 assert_eq!(pm.consecutive_timeouts(), 0);
236 assert_eq!(pm.current_timeout().as_millis(), 2000);
237
238 pm.on_timeout();
239 assert_eq!(pm.consecutive_timeouts(), 1);
240 assert_eq!(pm.current_timeout().as_millis(), 3000); pm.on_timeout();
243 assert_eq!(pm.consecutive_timeouts(), 2);
244 assert_eq!(pm.current_timeout().as_millis(), 4500); pm.on_timeout();
247 assert_eq!(pm.consecutive_timeouts(), 3);
248 assert_eq!(pm.current_timeout().as_millis(), 6750); }
250
251 #[test]
252 fn test_backoff_caps_at_max() {
253 let mut pm = Pacemaker::new();
254 for _ in 0..20 {
255 pm.on_timeout();
256 }
257 assert!(pm.current_timeout().as_millis() <= MAX_TIMEOUT_MS as u128);
258 }
259
260 #[test]
261 fn test_reset_on_progress() {
262 let mut pm = Pacemaker::new();
263 pm.on_timeout();
264 pm.on_timeout();
265 assert!(pm.current_timeout().as_millis() > 2000);
266
267 pm.reset_on_progress();
268 assert_eq!(pm.consecutive_timeouts(), 0);
269 assert_eq!(pm.current_timeout().as_millis(), 2000);
270 }
271
272 #[test]
273 fn test_tc_relay_dedup() {
274 let mut pm = Pacemaker::new();
275 let tc = TimeoutCertificate {
276 view: ViewNumber(5),
277 aggregate_signature: AggregateSignature::new(4),
278 highest_qcs: vec![],
279 };
280 assert!(pm.should_relay_tc(&tc));
281 assert!(!pm.should_relay_tc(&tc)); }
283}