Skip to main content

hotmint_consensus/
pacemaker.rs

1use std::collections::HashMap;
2use std::collections::hash_map::Entry;
3use std::time::Duration;
4
5use hotmint_types::crypto::{AggregateSignature, Signature};
6use hotmint_types::{
7    ConsensusMessage, QuorumCertificate, TimeoutCertificate, ValidatorId, ValidatorSet, ViewNumber,
8};
9use tokio::time::{Instant, Sleep};
10use tracing::debug;
11
/// Default view timeout, in milliseconds, before any backoff is applied.
const BASE_TIMEOUT_MS: u64 = 2_000;
/// Default upper bound, in milliseconds, for the backoff-adjusted view timeout.
const MAX_TIMEOUT_MS: u64 = 30_000;
/// Default factor by which the timeout grows per consecutive timeout.
const BACKOFF_MULTIPLIER: f64 = 1.5;
15
/// Tunable timing parameters for the pacemaker.
///
/// All durations are expressed in milliseconds.
#[derive(Clone, Debug)]
pub struct PacemakerConfig {
    /// Timeout used for a view when there have been no consecutive timeouts.
    pub base_timeout_ms: u64,
    /// Ceiling applied to the timeout after backoff has been computed.
    pub max_timeout_ms: u64,
    /// Base of the exponential backoff: the base timeout is scaled by this
    /// value raised to the number of consecutive timeouts.
    pub backoff_multiplier: f64,
}
23
24impl Default for PacemakerConfig {
25    fn default() -> Self {
26        Self {
27            base_timeout_ms: BASE_TIMEOUT_MS,
28            max_timeout_ms: MAX_TIMEOUT_MS,
29            backoff_multiplier: BACKOFF_MULTIPLIER,
30        }
31    }
32}
33
34/// Full pacemaker with exponential backoff and TC relay
35pub struct Pacemaker {
36    config: PacemakerConfig,
37    pub view_deadline: Instant,
38    base_timeout: Duration,
39    current_timeout: Duration,
40    /// Number of consecutive timeouts without progress
41    consecutive_timeouts: u32,
42    /// Collected wishes: target_view -> (validator_id, highest_qc, signature)
43    wishes: HashMap<ViewNumber, Vec<(ValidatorId, Option<QuorumCertificate>, Signature)>>,
44    /// TCs we have already relayed (avoid re-broadcasting)
45    relayed_tcs: HashMap<ViewNumber, bool>,
46}
47
48impl Default for Pacemaker {
49    fn default() -> Self {
50        Self::new()
51    }
52}
53
54impl Pacemaker {
55    pub fn new() -> Self {
56        Self::with_config(PacemakerConfig::default())
57    }
58
59    pub fn with_config(config: PacemakerConfig) -> Self {
60        let base_timeout = Duration::from_millis(config.base_timeout_ms);
61        Self {
62            config,
63            view_deadline: Instant::now() + base_timeout,
64            base_timeout,
65            current_timeout: base_timeout,
66            consecutive_timeouts: 0,
67            wishes: HashMap::new(),
68            relayed_tcs: HashMap::new(),
69        }
70    }
71
72    /// Reset timer with current backoff-adjusted timeout
73    pub fn reset_timer(&mut self) {
74        self.view_deadline = Instant::now() + self.current_timeout;
75    }
76
77    /// Reset timer after successful view completion (resets backoff)
78    pub fn reset_on_progress(&mut self) {
79        self.consecutive_timeouts = 0;
80        self.current_timeout = self.base_timeout;
81        self.view_deadline = Instant::now() + self.current_timeout;
82    }
83
84    /// Increase timeout with exponential backoff after a timeout event
85    pub fn on_timeout(&mut self) {
86        self.consecutive_timeouts += 1;
87        let multiplier = self
88            .config
89            .backoff_multiplier
90            .powi(self.consecutive_timeouts as i32);
91        let new_ms = (self.base_timeout.as_millis() as f64 * multiplier) as u64;
92        self.current_timeout = Duration::from_millis(new_ms.min(self.config.max_timeout_ms));
93        debug!(
94            consecutive = self.consecutive_timeouts,
95            timeout_ms = self.current_timeout.as_millis() as u64,
96            "pacemaker timeout backoff"
97        );
98        self.view_deadline = Instant::now() + self.current_timeout;
99    }
100
101    pub fn sleep_until_deadline(&self) -> Sleep {
102        tokio::time::sleep_until(self.view_deadline)
103    }
104
105    pub fn current_timeout(&self) -> Duration {
106        self.current_timeout
107    }
108
109    pub fn consecutive_timeouts(&self) -> u32 {
110        self.consecutive_timeouts
111    }
112
113    /// Build the Wish message for timeout
114    pub fn build_wish(
115        &self,
116        chain_id_hash: &[u8; 32],
117        current_view: ViewNumber,
118        validator_id: ValidatorId,
119        highest_qc: Option<QuorumCertificate>,
120        signer: &dyn hotmint_types::Signer,
121    ) -> ConsensusMessage {
122        let target_view = current_view.next();
123        let msg_bytes = wish_signing_bytes(chain_id_hash, target_view, highest_qc.as_ref());
124        let signature = signer.sign(&msg_bytes);
125        ConsensusMessage::Wish {
126            target_view,
127            validator: validator_id,
128            highest_qc,
129            signature,
130        }
131    }
132
133    /// Add a wish and check if we have 2f+1 to form a TC
134    pub fn add_wish(
135        &mut self,
136        vs: &ValidatorSet,
137        target_view: ViewNumber,
138        validator: ValidatorId,
139        highest_qc: Option<QuorumCertificate>,
140        signature: Signature,
141    ) -> Option<TimeoutCertificate> {
142        let wishes = self.wishes.entry(target_view).or_default();
143
144        // Dedup
145        if wishes.iter().any(|(id, _, _)| *id == validator) {
146            return None;
147        }
148
149        wishes.push((validator, highest_qc, signature));
150
151        // Check quorum
152        let mut power = 0u64;
153        for (vid, _, _) in wishes.iter() {
154            power += vs.power_of(*vid);
155        }
156
157        if power >= vs.quorum_threshold() {
158            let mut agg = AggregateSignature::new(vs.validator_count());
159            let mut highest_qcs = vec![None; vs.validator_count()];
160
161            for (vid, hqc, sig) in wishes.iter() {
162                if let Some(idx) = vs.index_of(*vid) {
163                    let _ = agg.add(idx, sig.clone());
164                    highest_qcs[idx] = hqc.clone();
165                }
166            }
167
168            Some(TimeoutCertificate {
169                view: ViewNumber(target_view.as_u64().saturating_sub(1)),
170                aggregate_signature: agg,
171                highest_qcs,
172            })
173        } else {
174            None
175        }
176    }
177
178    /// Check if we should relay a received TC (returns true if not yet relayed)
179    pub fn should_relay_tc(&mut self, tc: &TimeoutCertificate) -> bool {
180        match self.relayed_tcs.entry(tc.view) {
181            Entry::Vacant(e) => {
182                e.insert(true);
183                true
184            }
185            Entry::Occupied(_) => false,
186        }
187    }
188
189    pub fn clear_view(&mut self, view: ViewNumber) {
190        self.wishes.remove(&view);
191        // Keep relayed_tcs — pruned lazily
192    }
193
194    /// Prune old relay tracking data
195    pub fn prune_before(&mut self, min_view: ViewNumber) {
196        self.wishes.retain(|v, _| *v >= min_view);
197        self.relayed_tcs.retain(|v, _| *v >= min_view);
198    }
199}
200
201pub(crate) fn wish_signing_bytes(
202    chain_id_hash: &[u8; 32],
203    target_view: ViewNumber,
204    highest_qc: Option<&QuorumCertificate>,
205) -> Vec<u8> {
206    let tag = b"HOTMINT_WISH_V1\0";
207    let mut buf = Vec::with_capacity(tag.len() + 32 + 8 + 42);
208    buf.extend_from_slice(tag);
209    buf.extend_from_slice(chain_id_hash);
210    buf.extend_from_slice(&target_view.as_u64().to_le_bytes());
211    // Bind the highest_qc to prevent replay with a different QC.
212    // Canonical encoding: 0x00 = None, 0x01 + view_le + hash = Some.
213    match highest_qc {
214        None => buf.push(0x00),
215        Some(qc) => {
216            buf.push(0x01);
217            buf.extend_from_slice(&qc.view.as_u64().to_le_bytes());
218            buf.extend_from_slice(&qc.block_hash.0);
219        }
220    }
221    buf
222}
223
#[cfg(test)]
mod tests {
    use super::*;

    /// Each consecutive timeout scales the 2000 ms base by another factor of 1.5.
    #[test]
    fn test_exponential_backoff() {
        let mut pm = Pacemaker::new();
        assert_eq!(pm.consecutive_timeouts(), 0);
        assert_eq!(pm.current_timeout().as_millis(), 2000);

        // (consecutive timeouts, expected timeout in ms) = 2000 * 1.5^k
        let steps: [(u32, u128); 3] = [(1, 3000), (2, 4500), (3, 6750)];
        for &(count, expected_ms) in steps.iter() {
            pm.on_timeout();
            assert_eq!(pm.consecutive_timeouts(), count);
            assert_eq!(pm.current_timeout().as_millis(), expected_ms);
        }
    }

    /// After enough timeouts the backoff must sit exactly at the configured cap.
    #[test]
    fn test_backoff_caps_at_max() {
        let mut pm = Pacemaker::new();
        for _ in 0..20 {
            pm.on_timeout();
        }
        // 2000 * 1.5^20 is far above the cap, so the timeout must equal it;
        // a `<=` check would also pass if the backoff never grew at all.
        assert_eq!(pm.current_timeout(), Duration::from_millis(MAX_TIMEOUT_MS));
        assert_eq!(pm.consecutive_timeouts(), 20);
    }

    /// Progress clears both the counter and the accumulated backoff.
    #[test]
    fn test_reset_on_progress() {
        let mut pm = Pacemaker::new();
        pm.on_timeout();
        pm.on_timeout();
        assert!(pm.current_timeout().as_millis() > 2000);

        pm.reset_on_progress();
        assert_eq!(pm.consecutive_timeouts(), 0);
        assert_eq!(pm.current_timeout().as_millis(), 2000);
    }

    /// A timeout certificate for a given view is relayed only once.
    #[test]
    fn test_tc_relay_dedup() {
        let mut pm = Pacemaker::new();
        let tc = TimeoutCertificate {
            view: ViewNumber(5),
            aggregate_signature: AggregateSignature::new(4),
            highest_qcs: vec![],
        };
        assert!(pm.should_relay_tc(&tc));
        assert!(!pm.should_relay_tc(&tc)); // second time: already relayed
    }

    /// Without a QC the signed bytes are tag + chain hash + view + a 0x00 marker.
    #[test]
    fn test_wish_signing_bytes_without_qc() {
        let chain = [7u8; 32];
        let bytes = wish_signing_bytes(&chain, ViewNumber(9), None);

        assert_eq!(bytes.len(), 16 + 32 + 8 + 1);
        assert_eq!(&bytes[..16], &b"HOTMINT_WISH_V1\0"[..]);
        assert_eq!(&bytes[16..48], &chain[..]);
        assert_eq!(&bytes[48..56], &9u64.to_le_bytes()[..]);
        assert_eq!(bytes[56], 0x00);

        // A different target view must change the signed bytes.
        assert_ne!(bytes, wish_signing_bytes(&chain, ViewNumber(10), None));
    }
}