Skip to main content

hotmint_consensus/
pacemaker.rs

1use std::collections::HashMap;
2use std::time::Duration;
3
4use hotmint_types::crypto::{AggregateSignature, Signature};
5use hotmint_types::{
6    ConsensusMessage, QuorumCertificate, TimeoutCertificate, ValidatorId, ValidatorSet, ViewNumber,
7};
8use tokio::time::{Instant, Sleep};
9use tracing::debug;
10
/// Default view timeout before any backoff is applied, in milliseconds.
const BASE_TIMEOUT_MS: u64 = 2_000;
/// Default upper bound for the backoff-adjusted view timeout, in milliseconds.
const MAX_TIMEOUT_MS: u64 = 30_000;
/// Default growth factor applied once per consecutive timeout.
const BACKOFF_MULTIPLIER: f64 = 1.5;
14
/// Tunable parameters that drive the pacemaker's timeout and backoff.
#[derive(Clone, Debug)]
pub struct PacemakerConfig {
    /// View timeout used when there are no consecutive timeouts, in milliseconds.
    pub base_timeout_ms: u64,
    /// Cap applied to the backoff-adjusted timeout, in milliseconds.
    pub max_timeout_ms: u64,
    /// Factor raised to the number of consecutive timeouts to scale the base timeout.
    pub backoff_multiplier: f64,
}
22
23impl Default for PacemakerConfig {
24    fn default() -> Self {
25        Self {
26            base_timeout_ms: BASE_TIMEOUT_MS,
27            max_timeout_ms: MAX_TIMEOUT_MS,
28            backoff_multiplier: BACKOFF_MULTIPLIER,
29        }
30    }
31}
32
33/// Full pacemaker with exponential backoff and TC relay
34pub struct Pacemaker {
35    config: PacemakerConfig,
36    pub view_deadline: Instant,
37    base_timeout: Duration,
38    current_timeout: Duration,
39    /// Number of consecutive timeouts without progress
40    consecutive_timeouts: u32,
41    /// Collected wishes: target_view -> (validator_id, highest_qc, signature)
42    wishes: HashMap<ViewNumber, Vec<(ValidatorId, Option<QuorumCertificate>, Signature)>>,
43    /// TCs we have already relayed (avoid re-broadcasting)
44    relayed_tcs: HashMap<ViewNumber, bool>,
45}
46
47impl Default for Pacemaker {
48    fn default() -> Self {
49        Self::new()
50    }
51}
52
53impl Pacemaker {
54    pub fn new() -> Self {
55        Self::with_config(PacemakerConfig::default())
56    }
57
58    pub fn with_config(config: PacemakerConfig) -> Self {
59        let base_timeout = Duration::from_millis(config.base_timeout_ms);
60        Self {
61            config,
62            view_deadline: Instant::now() + base_timeout,
63            base_timeout,
64            current_timeout: base_timeout,
65            consecutive_timeouts: 0,
66            wishes: HashMap::new(),
67            relayed_tcs: HashMap::new(),
68        }
69    }
70
71    /// Reset timer with current backoff-adjusted timeout
72    pub fn reset_timer(&mut self) {
73        self.view_deadline = Instant::now() + self.current_timeout;
74    }
75
76    /// Reset timer after successful view completion (resets backoff)
77    pub fn reset_on_progress(&mut self) {
78        self.consecutive_timeouts = 0;
79        self.current_timeout = self.base_timeout;
80        self.view_deadline = Instant::now() + self.current_timeout;
81    }
82
83    /// Increase timeout with exponential backoff after a timeout event
84    pub fn on_timeout(&mut self) {
85        self.consecutive_timeouts += 1;
86        let multiplier = self
87            .config
88            .backoff_multiplier
89            .powi(self.consecutive_timeouts as i32);
90        let new_ms = (self.base_timeout.as_millis() as f64 * multiplier) as u64;
91        self.current_timeout = Duration::from_millis(new_ms.min(self.config.max_timeout_ms));
92        debug!(
93            consecutive = self.consecutive_timeouts,
94            timeout_ms = self.current_timeout.as_millis() as u64,
95            "pacemaker timeout backoff"
96        );
97        self.view_deadline = Instant::now() + self.current_timeout;
98    }
99
100    pub fn sleep_until_deadline(&self) -> Sleep {
101        tokio::time::sleep_until(self.view_deadline)
102    }
103
104    pub fn current_timeout(&self) -> Duration {
105        self.current_timeout
106    }
107
108    pub fn consecutive_timeouts(&self) -> u32 {
109        self.consecutive_timeouts
110    }
111
112    /// Build the Wish message for timeout
113    pub fn build_wish(
114        &self,
115        current_view: ViewNumber,
116        validator_id: ValidatorId,
117        highest_qc: Option<QuorumCertificate>,
118        signer: &dyn hotmint_types::Signer,
119    ) -> ConsensusMessage {
120        let target_view = current_view.next();
121        let msg_bytes = wish_signing_bytes(target_view);
122        let signature = signer.sign(&msg_bytes);
123        ConsensusMessage::Wish {
124            target_view,
125            validator: validator_id,
126            highest_qc,
127            signature,
128        }
129    }
130
131    /// Add a wish and check if we have 2f+1 to form a TC
132    pub fn add_wish(
133        &mut self,
134        vs: &ValidatorSet,
135        target_view: ViewNumber,
136        validator: ValidatorId,
137        highest_qc: Option<QuorumCertificate>,
138        signature: Signature,
139    ) -> Option<TimeoutCertificate> {
140        let wishes = self.wishes.entry(target_view).or_default();
141
142        // Dedup
143        if wishes.iter().any(|(id, _, _)| *id == validator) {
144            return None;
145        }
146
147        wishes.push((validator, highest_qc, signature));
148
149        // Check quorum
150        let mut power = 0u64;
151        for (vid, _, _) in wishes.iter() {
152            power += vs.power_of(*vid);
153        }
154
155        if power >= vs.quorum_threshold() {
156            let mut agg = AggregateSignature::new(vs.validator_count());
157            let mut highest_qcs = vec![None; vs.validator_count()];
158
159            for (vid, hqc, sig) in wishes.iter() {
160                if let Some(idx) = vs.index_of(*vid) {
161                    let _ = agg.add(idx, sig.clone());
162                    highest_qcs[idx] = hqc.clone();
163                }
164            }
165
166            Some(TimeoutCertificate {
167                view: ViewNumber(target_view.as_u64().saturating_sub(1)),
168                aggregate_signature: agg,
169                highest_qcs,
170            })
171        } else {
172            None
173        }
174    }
175
176    /// Check if we should relay a received TC (returns true if not yet relayed)
177    pub fn should_relay_tc(&mut self, tc: &TimeoutCertificate) -> bool {
178        use std::collections::hash_map::Entry;
179        match self.relayed_tcs.entry(tc.view) {
180            Entry::Vacant(e) => {
181                e.insert(true);
182                true
183            }
184            Entry::Occupied(_) => false,
185        }
186    }
187
188    pub fn clear_view(&mut self, view: ViewNumber) {
189        self.wishes.remove(&view);
190        // Keep relayed_tcs — pruned lazily
191    }
192
193    /// Prune old relay tracking data
194    pub fn prune_before(&mut self, min_view: ViewNumber) {
195        self.wishes.retain(|v, _| *v >= min_view);
196        self.relayed_tcs.retain(|v, _| *v >= min_view);
197    }
198}
199
200pub(crate) fn wish_signing_bytes(target_view: ViewNumber) -> Vec<u8> {
201    let mut buf = Vec::with_capacity(9);
202    buf.push(b'W');
203    buf.extend_from_slice(&target_view.as_u64().to_le_bytes());
204    buf
205}
206
#[cfg(test)]
mod tests {
    use super::*;

    /// Each timeout scales the base timeout by another factor of 1.5.
    #[test]
    fn test_exponential_backoff() {
        let mut pm = Pacemaker::new();
        assert_eq!(pm.consecutive_timeouts(), 0);
        assert_eq!(pm.current_timeout().as_millis(), 2000);

        pm.on_timeout();
        assert_eq!(pm.consecutive_timeouts(), 1);
        assert_eq!(pm.current_timeout().as_millis(), 3000); // 2000 * 1.5

        pm.on_timeout();
        assert_eq!(pm.consecutive_timeouts(), 2);
        assert_eq!(pm.current_timeout().as_millis(), 4500); // 2000 * 1.5^2

        pm.on_timeout();
        assert_eq!(pm.consecutive_timeouts(), 3);
        assert_eq!(pm.current_timeout().as_millis(), 6750); // 2000 * 1.5^3
    }

    /// After enough timeouts the backoff sits exactly at the configured cap.
    #[test]
    fn test_backoff_caps_at_max() {
        let mut pm = Pacemaker::new();
        for _ in 0..20 {
            pm.on_timeout();
        }
        // 2000 * 1.5^20 is far above the cap, so the timeout must equal it;
        // a mere `<=` would also pass if the backoff never grew at all.
        assert_eq!(pm.current_timeout().as_millis(), u128::from(MAX_TIMEOUT_MS));
        assert_eq!(pm.consecutive_timeouts(), 20);
    }

    /// Progress clears the backoff and restores the base timeout.
    #[test]
    fn test_reset_on_progress() {
        let mut pm = Pacemaker::new();
        pm.on_timeout();
        pm.on_timeout();
        assert!(pm.current_timeout().as_millis() > 2000);

        pm.reset_on_progress();
        assert_eq!(pm.consecutive_timeouts(), 0);
        assert_eq!(pm.current_timeout().as_millis(), 2000);
    }

    /// A TC is relayed once per view; other views are unaffected.
    #[test]
    fn test_tc_relay_dedup() {
        let mut pm = Pacemaker::new();
        let tc = TimeoutCertificate {
            view: ViewNumber(5),
            aggregate_signature: AggregateSignature::new(4),
            highest_qcs: vec![],
        };
        assert!(pm.should_relay_tc(&tc));
        assert!(!pm.should_relay_tc(&tc)); // second time: already relayed

        let other = TimeoutCertificate {
            view: ViewNumber(6),
            aggregate_signature: AggregateSignature::new(4),
            highest_qcs: vec![],
        };
        assert!(pm.should_relay_tc(&other)); // a different view is tracked separately
    }
}
262}