Skip to main content

hotmint_consensus/
pacemaker.rs

1use std::collections::HashMap;
2use std::collections::hash_map::Entry;
3use std::time::Duration;
4
5use hotmint_types::crypto::{AggregateSignature, Signature};
6use hotmint_types::epoch::EpochNumber;
7use hotmint_types::{
8    ConsensusMessage, QuorumCertificate, TimeoutCertificate, ValidatorId, ValidatorSet, ViewNumber,
9};
10use tokio::time::{Instant, Sleep};
11use tracing::debug;
12
/// Default timeout, in milliseconds, for a view that follows progress.
const BASE_TIMEOUT_MS: u64 = 2_000;
/// Default upper bound, in milliseconds, for the backed-off view timeout.
const MAX_TIMEOUT_MS: u64 = 30_000;
/// Default backoff factor, raised to the number of consecutive timeouts.
const BACKOFF_MULTIPLIER: f64 = 1.5;
16
/// Tunable parameters for the pacemaker's view timeouts.
///
/// Durations are expressed in milliseconds.
#[derive(Clone, Debug)]
pub struct PacemakerConfig {
    /// Timeout for a fresh view; restored whenever progress is made.
    pub base_timeout_ms: u64,
    /// Upper bound on the timeout as it grows through backoff.
    pub max_timeout_ms: u64,
    /// Growth factor, raised to the number of consecutive timeouts.
    pub backoff_multiplier: f64,
}
24
25impl Default for PacemakerConfig {
26    fn default() -> Self {
27        Self {
28            base_timeout_ms: BASE_TIMEOUT_MS,
29            max_timeout_ms: MAX_TIMEOUT_MS,
30            backoff_multiplier: BACKOFF_MULTIPLIER,
31        }
32    }
33}
34
35/// Full pacemaker with exponential backoff and TC relay
36pub struct Pacemaker {
37    config: PacemakerConfig,
38    pub view_deadline: Instant,
39    base_timeout: Duration,
40    current_timeout: Duration,
41    /// Number of consecutive timeouts without progress
42    consecutive_timeouts: u32,
43    /// Collected wishes: target_view -> (validator_id, highest_qc, signature)
44    wishes: HashMap<ViewNumber, Vec<(ValidatorId, Option<QuorumCertificate>, Signature)>>,
45    /// TCs we have already relayed (avoid re-broadcasting)
46    relayed_tcs: HashMap<ViewNumber, bool>,
47}
48
49impl Default for Pacemaker {
50    fn default() -> Self {
51        Self::new()
52    }
53}
54
55impl Pacemaker {
56    pub fn new() -> Self {
57        Self::with_config(PacemakerConfig::default())
58    }
59
60    pub fn with_config(config: PacemakerConfig) -> Self {
61        let base_timeout = Duration::from_millis(config.base_timeout_ms);
62        Self {
63            config,
64            view_deadline: Instant::now() + base_timeout,
65            base_timeout,
66            current_timeout: base_timeout,
67            consecutive_timeouts: 0,
68            wishes: HashMap::new(),
69            relayed_tcs: HashMap::new(),
70        }
71    }
72
73    /// Reset timer with current backoff-adjusted timeout
74    pub fn reset_timer(&mut self) {
75        self.view_deadline = Instant::now() + self.current_timeout;
76    }
77
78    /// Reset timer after successful view completion (resets backoff)
79    pub fn reset_on_progress(&mut self) {
80        self.consecutive_timeouts = 0;
81        self.current_timeout = self.base_timeout;
82        self.view_deadline = Instant::now() + self.current_timeout;
83    }
84
85    /// Increase timeout with exponential backoff after a timeout event
86    pub fn on_timeout(&mut self) {
87        self.consecutive_timeouts += 1;
88        let multiplier = self
89            .config
90            .backoff_multiplier
91            .powi(self.consecutive_timeouts as i32);
92        let new_ms = (self.base_timeout.as_millis() as f64 * multiplier) as u64;
93        self.current_timeout = Duration::from_millis(new_ms.min(self.config.max_timeout_ms));
94        debug!(
95            consecutive = self.consecutive_timeouts,
96            timeout_ms = self.current_timeout.as_millis() as u64,
97            "pacemaker timeout backoff"
98        );
99        self.view_deadline = Instant::now() + self.current_timeout;
100    }
101
102    pub fn sleep_until_deadline(&self) -> Sleep {
103        tokio::time::sleep_until(self.view_deadline)
104    }
105
106    pub fn current_timeout(&self) -> Duration {
107        self.current_timeout
108    }
109
110    pub fn consecutive_timeouts(&self) -> u32 {
111        self.consecutive_timeouts
112    }
113
114    /// Build the Wish message for timeout
115    pub fn build_wish(
116        &self,
117        chain_id_hash: &[u8; 32],
118        epoch: EpochNumber,
119        current_view: ViewNumber,
120        validator_id: ValidatorId,
121        highest_qc: Option<QuorumCertificate>,
122        signer: &dyn hotmint_types::Signer,
123    ) -> ConsensusMessage {
124        let target_view = current_view.next();
125        let msg_bytes = wish_signing_bytes(chain_id_hash, epoch, target_view, highest_qc.as_ref());
126        let signature = signer.sign(&msg_bytes);
127        ConsensusMessage::Wish {
128            target_view,
129            validator: validator_id,
130            highest_qc,
131            signature,
132        }
133    }
134
135    /// Add a wish and check if we have 2f+1 to form a TC
136    pub fn add_wish(
137        &mut self,
138        vs: &ValidatorSet,
139        target_view: ViewNumber,
140        validator: ValidatorId,
141        highest_qc: Option<QuorumCertificate>,
142        signature: Signature,
143    ) -> Option<TimeoutCertificate> {
144        let wishes = self.wishes.entry(target_view).or_default();
145
146        // Dedup
147        if wishes.iter().any(|(id, _, _)| *id == validator) {
148            return None;
149        }
150
151        wishes.push((validator, highest_qc, signature));
152
153        // Check quorum
154        let mut power = 0u64;
155        for (vid, _, _) in wishes.iter() {
156            power += vs.power_of(*vid);
157        }
158
159        if power >= vs.quorum_threshold() {
160            let mut agg = AggregateSignature::new(vs.validator_count());
161            let mut highest_qcs = vec![None; vs.validator_count()];
162
163            for (vid, hqc, sig) in wishes.iter() {
164                if let Some(idx) = vs.index_of(*vid) {
165                    let _ = agg.add(idx, sig.clone());
166                    highest_qcs[idx] = hqc.clone();
167                }
168            }
169
170            Some(TimeoutCertificate {
171                view: ViewNumber(target_view.as_u64().saturating_sub(1)),
172                aggregate_signature: agg,
173                highest_qcs,
174            })
175        } else {
176            None
177        }
178    }
179
180    /// Check if we should relay a received TC (returns true if not yet relayed)
181    pub fn should_relay_tc(&mut self, tc: &TimeoutCertificate) -> bool {
182        match self.relayed_tcs.entry(tc.view) {
183            Entry::Vacant(e) => {
184                e.insert(true);
185                true
186            }
187            Entry::Occupied(_) => false,
188        }
189    }
190
191    pub fn clear_view(&mut self, view: ViewNumber) {
192        self.wishes.remove(&view);
193        // Keep relayed_tcs — pruned lazily
194    }
195
196    /// Prune old relay tracking data
197    pub fn prune_before(&mut self, min_view: ViewNumber) {
198        self.wishes.retain(|v, _| *v >= min_view);
199        self.relayed_tcs.retain(|v, _| *v >= min_view);
200    }
201}
202
203pub(crate) fn wish_signing_bytes(
204    chain_id_hash: &[u8; 32],
205    epoch: EpochNumber,
206    target_view: ViewNumber,
207    highest_qc: Option<&QuorumCertificate>,
208) -> Vec<u8> {
209    let tag = b"HOTMINT_WISH_V1\0";
210    let mut buf = Vec::with_capacity(tag.len() + 32 + 8 + 8 + 42);
211    buf.extend_from_slice(tag);
212    buf.extend_from_slice(chain_id_hash);
213    buf.extend_from_slice(&epoch.as_u64().to_le_bytes());
214    buf.extend_from_slice(&target_view.as_u64().to_le_bytes());
215    // Bind the highest_qc to prevent replay with a different QC.
216    // Canonical encoding: 0x00 = None, 0x01 + view_le + hash = Some.
217    match highest_qc {
218        None => buf.push(0x00),
219        Some(qc) => {
220            buf.push(0x01);
221            buf.extend_from_slice(&qc.view.as_u64().to_le_bytes());
222            buf.extend_from_slice(&qc.block_hash.0);
223        }
224    }
225    buf
226}
227
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds an otherwise-empty TC for `view`; relay tracking only reads the
    /// view.
    fn tc_at(view: u64) -> TimeoutCertificate {
        TimeoutCertificate {
            view: ViewNumber(view),
            aggregate_signature: AggregateSignature::new(4),
            highest_qcs: vec![],
        }
    }

    #[test]
    fn test_exponential_backoff() {
        let mut pm = Pacemaker::new();
        assert_eq!(pm.consecutive_timeouts(), 0);
        assert_eq!(pm.current_timeout(), Duration::from_millis(BASE_TIMEOUT_MS));

        // Expected values assume the default 2000 ms base and 1.5x multiplier.
        pm.on_timeout();
        assert_eq!(pm.consecutive_timeouts(), 1);
        assert_eq!(pm.current_timeout().as_millis(), 3000); // 2000 * 1.5

        pm.on_timeout();
        assert_eq!(pm.consecutive_timeouts(), 2);
        assert_eq!(pm.current_timeout().as_millis(), 4500); // 2000 * 1.5^2

        pm.on_timeout();
        assert_eq!(pm.consecutive_timeouts(), 3);
        assert_eq!(pm.current_timeout().as_millis(), 6750); // 2000 * 1.5^3
    }

    #[test]
    fn test_backoff_caps_at_max() {
        let mut pm = Pacemaker::new();
        for _ in 0..20 {
            pm.on_timeout();
        }
        assert_eq!(pm.consecutive_timeouts(), 20);
        // 2000 * 1.5^20 is far above the cap, so the cap must be hit exactly;
        // a `<=` check would also pass if backoff never grew at all.
        assert_eq!(pm.current_timeout(), Duration::from_millis(MAX_TIMEOUT_MS));
    }

    #[test]
    fn test_reset_on_progress() {
        let mut pm = Pacemaker::new();
        pm.on_timeout();
        pm.on_timeout();
        assert!(pm.current_timeout() > Duration::from_millis(BASE_TIMEOUT_MS));

        pm.reset_on_progress();
        assert_eq!(pm.consecutive_timeouts(), 0);
        assert_eq!(pm.current_timeout(), Duration::from_millis(BASE_TIMEOUT_MS));
    }

    #[test]
    fn test_reset_timer_keeps_backoff() {
        let mut pm = Pacemaker::new();
        pm.on_timeout();
        let backed_off = pm.current_timeout();

        pm.reset_timer();
        assert_eq!(pm.consecutive_timeouts(), 1);
        assert_eq!(pm.current_timeout(), backed_off);
    }

    #[test]
    fn test_tc_relay_dedup() {
        let mut pm = Pacemaker::new();
        let tc = tc_at(5);
        assert!(pm.should_relay_tc(&tc));
        assert!(!pm.should_relay_tc(&tc)); // second time: already relayed
    }

    #[test]
    fn test_prune_before_forgets_old_relays() {
        let mut pm = Pacemaker::new();
        assert!(pm.should_relay_tc(&tc_at(3)));
        assert!(pm.should_relay_tc(&tc_at(7)));

        pm.prune_before(ViewNumber(5));
        assert!(pm.should_relay_tc(&tc_at(3))); // marker pruned: relay again
        assert!(!pm.should_relay_tc(&tc_at(7))); // marker retained
    }

    #[test]
    fn test_clear_view_keeps_relay_markers() {
        let mut pm = Pacemaker::new();
        assert!(pm.should_relay_tc(&tc_at(5)));
        pm.clear_view(ViewNumber(5));
        assert!(!pm.should_relay_tc(&tc_at(5)));
    }
}