Skip to main content

solana_leader/
engine.rs

1use crate::leader_buffer::{LeaderBuffer, SLOTS_PER_LEADER};
2use crate::leader_entry::LeaderEntry;
3use crate::schedule::ScheduleSnapshot;
4use std::sync::Arc;
5
/// Outcome reported by [`LeaderEngine`] after it processes a slot update or a
/// schedule replacement.
///
/// The enum is `#[non_exhaustive]`; downstream `match` expressions need a
/// wildcard arm.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
#[non_exhaustive]
pub enum EngineEvent {
    /// Nothing was done: the slot was `0`, the slot equalled the last
    /// processed slot, or a schedule was replaced before any slot was seen.
    NoChange,
    /// The engine moved to `slot`.
    SlotAdvanced { slot: u64 },
    /// The engine moved to `slot`, but the active schedule could not supply
    /// every lookahead entry. Raised once; later incomplete updates report
    /// [`EngineEvent::SlotAdvanced`] until the lookahead is complete again or
    /// the schedule is replaced.
    ScheduleRefreshSuggested { slot: u64 },
    /// The active schedule does not cover `slot`; the buffer was cleared and a
    /// new schedule is required.
    NeedScheduleRefresh { slot: u64 },
}
14
15pub struct LeaderEngine {
16    buffer: Arc<LeaderBuffer>,
17    active_schedule: ScheduleSnapshot,
18    scratch: Box<[LeaderEntry]>,
19    last_slot: u64,
20    last_leader_index: u64,
21    refresh_hint_active: bool,
22}
23
24impl LeaderEngine {
25    #[inline]
26    pub fn new(leaders_ahead: usize, initial_schedule: ScheduleSnapshot) -> Self {
27        Self::from_buffer(Arc::new(LeaderBuffer::new(leaders_ahead)), initial_schedule)
28    }
29
30    #[inline]
31    pub fn from_buffer(buffer: Arc<LeaderBuffer>, initial_schedule: ScheduleSnapshot) -> Self {
32        Self {
33            scratch: vec![LeaderEntry::EMPTY; buffer.len()].into_boxed_slice(),
34            buffer,
35            active_schedule: initial_schedule,
36            last_slot: 0,
37            last_leader_index: 0,
38            refresh_hint_active: false,
39        }
40    }
41
42    #[inline]
43    pub fn buffer(&self) -> Arc<LeaderBuffer> {
44        Arc::clone(&self.buffer)
45    }
46
47    #[inline]
48    pub fn active_schedule(&self) -> &ScheduleSnapshot {
49        &self.active_schedule
50    }
51
52    #[inline]
53    pub fn last_slot(&self) -> u64 {
54        self.last_slot
55    }
56
57    pub fn seed(&mut self, current_slot: u64) -> EngineEvent {
58        if current_slot == 0 {
59            return EngineEvent::NoChange;
60        }
61
62        self.last_slot = current_slot;
63        self.last_leader_index = current_slot / SLOTS_PER_LEADER;
64        self.refill_for_slot(current_slot)
65    }
66
67    pub fn on_slot(&mut self, current_slot: u64) -> EngineEvent {
68        if current_slot == 0 || current_slot == self.last_slot {
69            return EngineEvent::NoChange;
70        }
71
72        let current_leader_index = current_slot / SLOTS_PER_LEADER;
73        let event = if current_leader_index == self.last_leader_index {
74            self.buffer.set_current_slot(current_slot);
75            EngineEvent::SlotAdvanced { slot: current_slot }
76        } else if current_slot < self.last_slot || current_leader_index < self.last_leader_index {
77            self.refill_for_slot(current_slot)
78        } else if !self.active_schedule.covers_slot(current_slot) {
79            self.refresh_hint_active = false;
80            self.buffer.clear(current_slot);
81            EngineEvent::NeedScheduleRefresh { slot: current_slot }
82        } else {
83            let leaders_passed = (current_leader_index - self.last_leader_index) as usize;
84            if leaders_passed >= self.buffer.len() {
85                self.refill_for_slot(current_slot)
86            } else {
87                let lookahead_slot = (current_leader_index
88                    + (self.buffer.len() - leaders_passed) as u64)
89                    * SLOTS_PER_LEADER;
90                let written = self
91                    .active_schedule
92                    .get_next_leaders_into(lookahead_slot, &mut self.scratch[..leaders_passed]);
93                self.buffer
94                    .shift_multiple(leaders_passed, &self.scratch[..written], current_slot);
95                self.slot_advanced_event(current_slot, written == leaders_passed)
96            }
97        };
98
99        self.last_slot = current_slot;
100        self.last_leader_index = current_leader_index;
101        event
102    }
103
104    pub fn replace_schedule(&mut self, schedule: ScheduleSnapshot) -> EngineEvent {
105        self.active_schedule = schedule;
106        self.refresh_hint_active = false;
107        if self.last_slot == 0 {
108            return EngineEvent::NoChange;
109        }
110
111        self.refill_for_slot(self.last_slot)
112    }
113
114    fn refill_for_slot(&mut self, slot: u64) -> EngineEvent {
115        if !self.active_schedule.covers_slot(slot) {
116            self.refresh_hint_active = false;
117            self.buffer.clear(slot);
118            return EngineEvent::NeedScheduleRefresh { slot };
119        }
120
121        let written = self
122            .active_schedule
123            .get_next_leaders_into(slot, &mut self.scratch);
124        self.buffer.update(slot, &self.scratch[..written]);
125        self.slot_advanced_event(slot, written == self.buffer.len())
126    }
127
128    #[inline]
129    fn slot_advanced_event(&mut self, slot: u64, lookahead_complete: bool) -> EngineEvent {
130        if lookahead_complete {
131            self.refresh_hint_active = false;
132            return EngineEvent::SlotAdvanced { slot };
133        }
134
135        if self.refresh_hint_active {
136            return EngineEvent::SlotAdvanced { slot };
137        }
138
139        self.refresh_hint_active = true;
140        EngineEvent::ScheduleRefreshSuggested { slot }
141    }
142}
143
#[cfg(test)]
mod tests {
    use super::*;
    use crate::leader_entry::LeaderPubkey;

    /// Asserts that the buffer entry at `$index` has a pubkey whose first
    /// byte is `$id` (the id the entry was built from).
    macro_rules! assert_leader {
        ($engine:expr, $index:expr, $id:expr) => {
            assert_eq!($engine.buffer.read($index).pubkey.to_bytes()[0], $id)
        };
    }

    /// Asserts that the buffer entry at `$index` is not valid.
    macro_rules! assert_empty {
        ($engine:expr, $index:expr) => {
            assert!(!$engine.buffer.read($index).is_valid())
        };
    }

    /// Builds a loopback entry whose pubkey bytes and last address octet are
    /// all `id`.
    fn make_entry(id: u8) -> LeaderEntry {
        let address = [127, 0, 0, id];
        LeaderEntry::new_ipv4(LeaderPubkey::new([id; 32]), address, 8000, address, 8001)
    }

    /// Builds a schedule starting at `epoch_start_slot` with one entry per id.
    fn make_schedule(epoch_start_slot: u64, ids: &[u8]) -> ScheduleSnapshot {
        let leaders: Vec<LeaderEntry> = ids.iter().copied().map(make_entry).collect();
        ScheduleSnapshot::new(1, epoch_start_slot, leaders.into_boxed_slice())
    }

    /// Builds an unseeded engine over a schedule that starts at slot 100.
    fn engine_at_100(leaders_ahead: usize, ids: &[u8]) -> LeaderEngine {
        LeaderEngine::new(leaders_ahead, make_schedule(100, ids))
    }

    #[test]
    fn seed_populates_buffer() {
        let mut engine = engine_at_100(2, &[1, 2, 3, 4]);

        assert_eq!(engine.seed(100), EngineEvent::SlotAdvanced { slot: 100 });
        assert_leader!(engine, 0, 1);
        assert_leader!(engine, 1, 2);
        assert_leader!(engine, 2, 3);
    }

    #[test]
    fn same_slot_is_no_change() {
        let mut engine = engine_at_100(2, &[1, 2, 3]);
        engine.seed(100);

        assert_eq!(engine.on_slot(100), EngineEvent::NoChange);
    }

    #[test]
    fn leader_boundary_shift_updates_tail() {
        let mut engine = engine_at_100(2, &[1, 2, 3, 4, 5]);
        engine.seed(100);

        assert_eq!(engine.on_slot(104), EngineEvent::SlotAdvanced { slot: 104 });
        assert_leader!(engine, 0, 2);
        assert_leader!(engine, 1, 3);
        assert_leader!(engine, 2, 4);
    }

    #[test]
    fn large_jump_refills_from_current_slot() {
        let mut engine = engine_at_100(2, &[1, 2, 3, 4, 5, 6]);
        engine.seed(100);

        let event = engine.on_slot(116);

        assert_eq!(event, EngineEvent::ScheduleRefreshSuggested { slot: 116 });
        assert_leader!(engine, 0, 5);
        assert_leader!(engine, 1, 6);
        assert_empty!(engine, 2);
    }

    #[test]
    fn uncovered_slot_requests_refresh() {
        let mut engine = engine_at_100(1, &[1, 2]);
        engine.seed(100);

        let event = engine.on_slot(108);

        assert_eq!(event, EngineEvent::NeedScheduleRefresh { slot: 108 });
        assert_empty!(engine, 0);
        assert_eq!(engine.buffer.current_slot(), 108);
    }

    #[test]
    fn replace_schedule_recovers_empty_buffer() {
        let mut engine = engine_at_100(1, &[1, 2]);
        engine.seed(100);
        engine.on_slot(108);

        let event = engine.replace_schedule(make_schedule(108, &[9, 10, 11]));

        assert_eq!(event, EngineEvent::SlotAdvanced { slot: 108 });
        assert_leader!(engine, 0, 9);
        assert_leader!(engine, 1, 10);
    }

    #[test]
    fn seed_near_epoch_end_suggests_refresh_once() {
        let mut engine = engine_at_100(2, &[1, 2]);

        let seeded = engine.seed(100);
        assert_eq!(seeded, EngineEvent::ScheduleRefreshSuggested { slot: 100 });
        assert_empty!(engine, 2);

        let replaced = engine.replace_schedule(make_schedule(100, &[1, 2, 3]));
        assert_eq!(replaced, EngineEvent::SlotAdvanced { slot: 100 });
    }

    #[test]
    fn replace_with_still_partial_schedule_resuggests_refresh() {
        let schedule = make_schedule(100, &[1, 2]);
        let mut engine = LeaderEngine::new(2, schedule.clone());

        let seeded = engine.seed(100);
        assert_eq!(seeded, EngineEvent::ScheduleRefreshSuggested { slot: 100 });

        let replaced = engine.replace_schedule(schedule);
        assert_eq!(replaced, EngineEvent::ScheduleRefreshSuggested { slot: 100 });
    }

    #[test]
    fn leader_boundary_shift_suggests_refresh_when_tail_falls_off_schedule() {
        let mut engine = engine_at_100(2, &[1, 2, 3]);

        assert_eq!(engine.seed(100), EngineEvent::SlotAdvanced { slot: 100 });

        let shifted = engine.on_slot(104);
        assert_eq!(shifted, EngineEvent::ScheduleRefreshSuggested { slot: 104 });
        assert_empty!(engine, 2);

        // Same leader window: the hint is not repeated.
        assert_eq!(engine.on_slot(105), EngineEvent::SlotAdvanced { slot: 105 });
    }

    #[test]
    fn slot_rollback_refills_from_current_slot() {
        let mut engine = engine_at_100(2, &[1, 2, 3, 4, 5]);
        engine.seed(108);

        assert_eq!(engine.on_slot(104), EngineEvent::SlotAdvanced { slot: 104 });
        assert_leader!(engine, 0, 2);
        assert_leader!(engine, 1, 3);
        assert_leader!(engine, 2, 4);
    }
}