Skip to main content

hotmint_consensus/
pacemaker.rs

1use std::collections::HashMap;
2use std::time::Duration;
3
4use hotmint_types::crypto::{AggregateSignature, Signature};
5use hotmint_types::{
6    ConsensusMessage, QuorumCertificate, TimeoutCertificate, ValidatorId, ValidatorSet, ViewNumber,
7};
8use tokio::time::{Instant, Sleep};
9use tracing::debug;
10
/// Initial view timeout in milliseconds; restored whenever progress is made.
const BASE_TIMEOUT_MS: u64 = 2_000;
/// Upper bound, in milliseconds, for the backed-off view timeout.
const MAX_TIMEOUT_MS: u64 = 30_000;
/// Factor by which the timeout grows per consecutive timeout.
const BACKOFF_MULTIPLIER: f64 = 1.5;
14
15/// Full pacemaker with exponential backoff and TC relay
16pub struct Pacemaker {
17    pub view_deadline: Instant,
18    base_timeout: Duration,
19    current_timeout: Duration,
20    /// Number of consecutive timeouts without progress
21    consecutive_timeouts: u32,
22    /// Collected wishes: target_view -> (validator_id, highest_qc, signature)
23    wishes: HashMap<ViewNumber, Vec<(ValidatorId, Option<QuorumCertificate>, Signature)>>,
24    /// TCs we have already relayed (avoid re-broadcasting)
25    relayed_tcs: HashMap<ViewNumber, bool>,
26}
27
28impl Default for Pacemaker {
29    fn default() -> Self {
30        Self::new()
31    }
32}
33
34impl Pacemaker {
35    pub fn new() -> Self {
36        let base_timeout = Duration::from_millis(BASE_TIMEOUT_MS);
37        Self {
38            view_deadline: Instant::now() + base_timeout,
39            base_timeout,
40            current_timeout: base_timeout,
41            consecutive_timeouts: 0,
42            wishes: HashMap::new(),
43            relayed_tcs: HashMap::new(),
44        }
45    }
46
47    /// Reset timer with current backoff-adjusted timeout
48    pub fn reset_timer(&mut self) {
49        self.view_deadline = Instant::now() + self.current_timeout;
50    }
51
52    /// Reset timer after successful view completion (resets backoff)
53    pub fn reset_on_progress(&mut self) {
54        self.consecutive_timeouts = 0;
55        self.current_timeout = self.base_timeout;
56        self.view_deadline = Instant::now() + self.current_timeout;
57    }
58
59    /// Increase timeout with exponential backoff after a timeout event
60    pub fn on_timeout(&mut self) {
61        self.consecutive_timeouts += 1;
62        let multiplier = BACKOFF_MULTIPLIER.powi(self.consecutive_timeouts as i32);
63        let new_ms = (self.base_timeout.as_millis() as f64 * multiplier) as u64;
64        self.current_timeout = Duration::from_millis(new_ms.min(MAX_TIMEOUT_MS));
65        debug!(
66            consecutive = self.consecutive_timeouts,
67            timeout_ms = self.current_timeout.as_millis() as u64,
68            "pacemaker timeout backoff"
69        );
70        self.view_deadline = Instant::now() + self.current_timeout;
71    }
72
73    pub fn sleep_until_deadline(&self) -> Sleep {
74        tokio::time::sleep_until(self.view_deadline)
75    }
76
77    pub fn current_timeout(&self) -> Duration {
78        self.current_timeout
79    }
80
81    pub fn consecutive_timeouts(&self) -> u32 {
82        self.consecutive_timeouts
83    }
84
85    /// Build the Wish message for timeout
86    pub fn build_wish(
87        &self,
88        current_view: ViewNumber,
89        validator_id: ValidatorId,
90        highest_qc: Option<QuorumCertificate>,
91        signer: &dyn hotmint_types::Signer,
92    ) -> ConsensusMessage {
93        let target_view = current_view.next();
94        let msg_bytes = wish_signing_bytes(target_view);
95        let signature = signer.sign(&msg_bytes);
96        ConsensusMessage::Wish {
97            target_view,
98            validator: validator_id,
99            highest_qc,
100            signature,
101        }
102    }
103
104    /// Add a wish and check if we have 2f+1 to form a TC
105    pub fn add_wish(
106        &mut self,
107        vs: &ValidatorSet,
108        target_view: ViewNumber,
109        validator: ValidatorId,
110        highest_qc: Option<QuorumCertificate>,
111        signature: Signature,
112    ) -> Option<TimeoutCertificate> {
113        let wishes = self.wishes.entry(target_view).or_default();
114
115        // Dedup
116        if wishes.iter().any(|(id, _, _)| *id == validator) {
117            return None;
118        }
119
120        wishes.push((validator, highest_qc, signature));
121
122        // Check quorum
123        let mut power = 0u64;
124        for (vid, _, _) in wishes.iter() {
125            power += vs.power_of(*vid);
126        }
127
128        if power >= vs.quorum_threshold() {
129            let mut agg = AggregateSignature::new(vs.validator_count());
130            let mut highest_qcs = vec![None; vs.validator_count()];
131
132            for (vid, hqc, sig) in wishes.iter() {
133                if let Some(idx) = vs.index_of(*vid) {
134                    let _ = agg.add(idx, sig.clone());
135                    highest_qcs[idx] = hqc.clone();
136                }
137            }
138
139            Some(TimeoutCertificate {
140                view: ViewNumber(target_view.as_u64().saturating_sub(1)),
141                aggregate_signature: agg,
142                highest_qcs,
143            })
144        } else {
145            None
146        }
147    }
148
149    /// Check if we should relay a received TC (returns true if not yet relayed)
150    pub fn should_relay_tc(&mut self, tc: &TimeoutCertificate) -> bool {
151        use std::collections::hash_map::Entry;
152        match self.relayed_tcs.entry(tc.view) {
153            Entry::Vacant(e) => {
154                e.insert(true);
155                true
156            }
157            Entry::Occupied(_) => false,
158        }
159    }
160
161    pub fn clear_view(&mut self, view: ViewNumber) {
162        self.wishes.remove(&view);
163        // Keep relayed_tcs — pruned lazily
164    }
165
166    /// Prune old relay tracking data
167    pub fn prune_before(&mut self, min_view: ViewNumber) {
168        self.wishes.retain(|v, _| *v >= min_view);
169        self.relayed_tcs.retain(|v, _| *v >= min_view);
170    }
171}
172
173fn wish_signing_bytes(target_view: ViewNumber) -> Vec<u8> {
174    let mut buf = Vec::with_capacity(9);
175    buf.push(b'W');
176    buf.extend_from_slice(&target_view.as_u64().to_le_bytes());
177    buf
178}
179
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a TC for `view` with a dummy 4-slot aggregate signature.
    fn tc_for(view: u64) -> TimeoutCertificate {
        TimeoutCertificate {
            view: ViewNumber(view),
            aggregate_signature: AggregateSignature::new(4),
            highest_qcs: vec![],
        }
    }

    #[test]
    fn test_exponential_backoff() {
        let mut pm = Pacemaker::new();
        assert_eq!(pm.consecutive_timeouts(), 0);
        assert_eq!(pm.current_timeout().as_millis(), 2000);

        pm.on_timeout();
        assert_eq!(pm.consecutive_timeouts(), 1);
        assert_eq!(pm.current_timeout().as_millis(), 3000); // 2000 * 1.5

        pm.on_timeout();
        assert_eq!(pm.consecutive_timeouts(), 2);
        assert_eq!(pm.current_timeout().as_millis(), 4500); // 2000 * 1.5^2

        pm.on_timeout();
        assert_eq!(pm.consecutive_timeouts(), 3);
        assert_eq!(pm.current_timeout().as_millis(), 6750); // 2000 * 1.5^3
    }

    #[test]
    fn test_backoff_caps_at_max() {
        let mut pm = Pacemaker::new();
        for _ in 0..20 {
            pm.on_timeout();
        }
        // 2000 * 1.5^20 is far above the cap, so the cap must apply exactly.
        // A bare `<=` would also accept a timeout that collapsed to zero.
        assert_eq!(pm.current_timeout().as_millis(), u128::from(MAX_TIMEOUT_MS));
        assert_eq!(pm.consecutive_timeouts(), 20);
    }

    #[test]
    fn test_reset_on_progress() {
        let mut pm = Pacemaker::new();
        pm.on_timeout();
        pm.on_timeout();
        assert!(pm.current_timeout().as_millis() > 2000);

        pm.reset_on_progress();
        assert_eq!(pm.consecutive_timeouts(), 0);
        assert_eq!(pm.current_timeout().as_millis(), 2000);
    }

    #[test]
    fn test_reset_timer_keeps_backoff() {
        let mut pm = Pacemaker::new();
        pm.on_timeout();
        let backed_off = pm.current_timeout();

        pm.reset_timer();
        assert_eq!(pm.current_timeout(), backed_off);
        assert_eq!(pm.consecutive_timeouts(), 1);
    }

    #[test]
    fn test_tc_relay_dedup() {
        let mut pm = Pacemaker::new();
        let tc = tc_for(5);
        assert!(pm.should_relay_tc(&tc));
        assert!(!pm.should_relay_tc(&tc)); // second time: already relayed

        // Each view is tracked independently.
        assert!(pm.should_relay_tc(&tc_for(6)));
    }

    #[test]
    fn test_prune_forgets_old_relays() {
        let mut pm = Pacemaker::new();
        let tc = tc_for(5);
        assert!(pm.should_relay_tc(&tc));

        pm.prune_before(ViewNumber(6));
        // View 5 was pruned, so it counts as unseen again.
        assert!(pm.should_relay_tc(&tc));
    }

    #[test]
    fn test_wish_signing_bytes_layout() {
        let bytes = wish_signing_bytes(ViewNumber(0x0102));
        assert_eq!(bytes, vec![b'W', 0x02, 0x01, 0, 0, 0, 0, 0, 0]);
    }
}