1use std::collections::HashMap;
2use std::collections::hash_map::Entry;
3use std::time::Duration;
4
5use hotmint_types::crypto::{AggregateSignature, Signature};
6use hotmint_types::{
7 ConsensusMessage, QuorumCertificate, TimeoutCertificate, ValidatorId, ValidatorSet, ViewNumber,
8};
9use tokio::time::{Instant, Sleep};
10use tracing::debug;
11
/// Default base view timeout, in milliseconds (used by `PacemakerConfig::default`).
const BASE_TIMEOUT_MS: u64 = 2_000;
/// Default upper bound for the backed-off view timeout, in milliseconds.
const MAX_TIMEOUT_MS: u64 = 30_000;
/// Default factor by which the timeout grows per consecutive timeout.
const BACKOFF_MULTIPLIER: f64 = 1.5;
15
/// Tunable parameters for the pacemaker's view timeout and backoff.
#[derive(Debug, Clone)]
pub struct PacemakerConfig {
    /// Timeout used for a fresh view, in milliseconds.
    pub base_timeout_ms: u64,
    /// Cap applied to the backed-off timeout, in milliseconds.
    pub max_timeout_ms: u64,
    /// Base of the exponential backoff: the base timeout is multiplied by
    /// this value raised to the number of consecutive timeouts.
    pub backoff_multiplier: f64,
}
23
24impl Default for PacemakerConfig {
25 fn default() -> Self {
26 Self {
27 base_timeout_ms: BASE_TIMEOUT_MS,
28 max_timeout_ms: MAX_TIMEOUT_MS,
29 backoff_multiplier: BACKOFF_MULTIPLIER,
30 }
31 }
32}
33
34pub struct Pacemaker {
36 config: PacemakerConfig,
37 pub view_deadline: Instant,
38 base_timeout: Duration,
39 current_timeout: Duration,
40 consecutive_timeouts: u32,
42 wishes: HashMap<ViewNumber, Vec<(ValidatorId, Option<QuorumCertificate>, Signature)>>,
44 relayed_tcs: HashMap<ViewNumber, bool>,
46}
47
48impl Default for Pacemaker {
49 fn default() -> Self {
50 Self::new()
51 }
52}
53
54impl Pacemaker {
55 pub fn new() -> Self {
56 Self::with_config(PacemakerConfig::default())
57 }
58
59 pub fn with_config(config: PacemakerConfig) -> Self {
60 let base_timeout = Duration::from_millis(config.base_timeout_ms);
61 Self {
62 config,
63 view_deadline: Instant::now() + base_timeout,
64 base_timeout,
65 current_timeout: base_timeout,
66 consecutive_timeouts: 0,
67 wishes: HashMap::new(),
68 relayed_tcs: HashMap::new(),
69 }
70 }
71
72 pub fn reset_timer(&mut self) {
74 self.view_deadline = Instant::now() + self.current_timeout;
75 }
76
77 pub fn reset_on_progress(&mut self) {
79 self.consecutive_timeouts = 0;
80 self.current_timeout = self.base_timeout;
81 self.view_deadline = Instant::now() + self.current_timeout;
82 }
83
84 pub fn on_timeout(&mut self) {
86 self.consecutive_timeouts += 1;
87 let multiplier = self
88 .config
89 .backoff_multiplier
90 .powi(self.consecutive_timeouts as i32);
91 let new_ms = (self.base_timeout.as_millis() as f64 * multiplier) as u64;
92 self.current_timeout = Duration::from_millis(new_ms.min(self.config.max_timeout_ms));
93 debug!(
94 consecutive = self.consecutive_timeouts,
95 timeout_ms = self.current_timeout.as_millis() as u64,
96 "pacemaker timeout backoff"
97 );
98 self.view_deadline = Instant::now() + self.current_timeout;
99 }
100
101 pub fn sleep_until_deadline(&self) -> Sleep {
102 tokio::time::sleep_until(self.view_deadline)
103 }
104
105 pub fn current_timeout(&self) -> Duration {
106 self.current_timeout
107 }
108
109 pub fn consecutive_timeouts(&self) -> u32 {
110 self.consecutive_timeouts
111 }
112
113 pub fn build_wish(
115 &self,
116 chain_id_hash: &[u8; 32],
117 current_view: ViewNumber,
118 validator_id: ValidatorId,
119 highest_qc: Option<QuorumCertificate>,
120 signer: &dyn hotmint_types::Signer,
121 ) -> ConsensusMessage {
122 let target_view = current_view.next();
123 let msg_bytes = wish_signing_bytes(chain_id_hash, target_view, highest_qc.as_ref());
124 let signature = signer.sign(&msg_bytes);
125 ConsensusMessage::Wish {
126 target_view,
127 validator: validator_id,
128 highest_qc,
129 signature,
130 }
131 }
132
133 pub fn add_wish(
135 &mut self,
136 vs: &ValidatorSet,
137 target_view: ViewNumber,
138 validator: ValidatorId,
139 highest_qc: Option<QuorumCertificate>,
140 signature: Signature,
141 ) -> Option<TimeoutCertificate> {
142 let wishes = self.wishes.entry(target_view).or_default();
143
144 if wishes.iter().any(|(id, _, _)| *id == validator) {
146 return None;
147 }
148
149 wishes.push((validator, highest_qc, signature));
150
151 let mut power = 0u64;
153 for (vid, _, _) in wishes.iter() {
154 power += vs.power_of(*vid);
155 }
156
157 if power >= vs.quorum_threshold() {
158 let mut agg = AggregateSignature::new(vs.validator_count());
159 let mut highest_qcs = vec![None; vs.validator_count()];
160
161 for (vid, hqc, sig) in wishes.iter() {
162 if let Some(idx) = vs.index_of(*vid) {
163 let _ = agg.add(idx, sig.clone());
164 highest_qcs[idx] = hqc.clone();
165 }
166 }
167
168 Some(TimeoutCertificate {
169 view: ViewNumber(target_view.as_u64().saturating_sub(1)),
170 aggregate_signature: agg,
171 highest_qcs,
172 })
173 } else {
174 None
175 }
176 }
177
178 pub fn should_relay_tc(&mut self, tc: &TimeoutCertificate) -> bool {
180 match self.relayed_tcs.entry(tc.view) {
181 Entry::Vacant(e) => {
182 e.insert(true);
183 true
184 }
185 Entry::Occupied(_) => false,
186 }
187 }
188
189 pub fn clear_view(&mut self, view: ViewNumber) {
190 self.wishes.remove(&view);
191 }
193
194 pub fn prune_before(&mut self, min_view: ViewNumber) {
196 self.wishes.retain(|v, _| *v >= min_view);
197 self.relayed_tcs.retain(|v, _| *v >= min_view);
198 }
199}
200
201pub(crate) fn wish_signing_bytes(
202 chain_id_hash: &[u8; 32],
203 target_view: ViewNumber,
204 highest_qc: Option<&QuorumCertificate>,
205) -> Vec<u8> {
206 let tag = b"HOTMINT_WISH_V1\0";
207 let mut buf = Vec::with_capacity(tag.len() + 32 + 8 + 42);
208 buf.extend_from_slice(tag);
209 buf.extend_from_slice(chain_id_hash);
210 buf.extend_from_slice(&target_view.as_u64().to_le_bytes());
211 match highest_qc {
214 None => buf.push(0x00),
215 Some(qc) => {
216 buf.push(0x01);
217 buf.extend_from_slice(&qc.view.as_u64().to_le_bytes());
218 buf.extend_from_slice(&qc.block_hash.0);
219 }
220 }
221 buf
222}
223
#[cfg(test)]
mod tests {
    use super::*;

    /// With the default configuration each timeout multiplies the 2000 ms
    /// base by another factor of 1.5.
    #[test]
    fn test_exponential_backoff() {
        let mut pm = Pacemaker::new();
        assert_eq!(pm.consecutive_timeouts(), 0);
        assert_eq!(pm.current_timeout().as_millis(), 2000);

        // (timeouts so far, expected timeout in ms)
        for (count, expected_ms) in [(1u32, 3000u128), (2, 4500), (3, 6750)] {
            pm.on_timeout();
            assert_eq!(pm.consecutive_timeouts(), count);
            assert_eq!(pm.current_timeout().as_millis(), expected_ms);
        }
    }

    /// Repeated timeouts never push the timeout past the configured maximum.
    #[test]
    fn test_backoff_caps_at_max() {
        let mut pm = Pacemaker::new();
        (0..20).for_each(|_| pm.on_timeout());
        assert!(pm.current_timeout().as_millis() <= MAX_TIMEOUT_MS as u128);
    }

    /// Progress resets both the counter and the timeout to their initial values.
    #[test]
    fn test_reset_on_progress() {
        let mut pm = Pacemaker::new();
        pm.on_timeout();
        pm.on_timeout();
        assert!(pm.current_timeout().as_millis() > 2000);

        pm.reset_on_progress();
        assert_eq!(pm.consecutive_timeouts(), 0);
        assert_eq!(pm.current_timeout().as_millis(), 2000);
    }

    /// A timeout certificate for a given view is relayed only once.
    #[test]
    fn test_tc_relay_dedup() {
        let mut pm = Pacemaker::new();
        let tc = TimeoutCertificate {
            view: ViewNumber(5),
            aggregate_signature: AggregateSignature::new(4),
            highest_qcs: vec![],
        };

        let first = pm.should_relay_tc(&tc);
        assert!(first);

        let second = pm.should_relay_tc(&tc);
        assert!(!second);
    }
}