1use crate::leader_buffer::{LeaderBuffer, SLOTS_PER_LEADER};
2use crate::leader_entry::LeaderEntry;
3use crate::schedule::ScheduleSnapshot;
4use std::sync::Arc;
5
/// Outcome reported by [`LeaderEngine`] after it processes a slot or a schedule change.
#[non_exhaustive]
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum EngineEvent {
    /// Nothing was updated: the slot was `0`, it equalled the last processed slot,
    /// or a schedule was replaced before any slot had been recorded.
    NoChange,
    /// The engine moved to `slot`; no refresh hint is being raised by this call.
    SlotAdvanced { slot: u64 },
    /// The engine moved to `slot`, but the active schedule could not fill the whole
    /// lookahead. Raised once per shortfall; later updates report `SlotAdvanced`
    /// until the lookahead is complete again or the schedule is replaced.
    ScheduleRefreshSuggested { slot: u64 },
    /// The active schedule does not cover `slot`; the buffer was cleared and a new
    /// schedule is required before leaders can be served.
    NeedScheduleRefresh { slot: u64 },
}
14
15pub struct LeaderEngine {
16 buffer: Arc<LeaderBuffer>,
17 active_schedule: ScheduleSnapshot,
18 scratch: Box<[LeaderEntry]>,
19 last_slot: u64,
20 last_leader_index: u64,
21 refresh_hint_active: bool,
22}
23
24impl LeaderEngine {
25 #[inline]
26 pub fn new(leaders_ahead: usize, initial_schedule: ScheduleSnapshot) -> Self {
27 Self::from_buffer(Arc::new(LeaderBuffer::new(leaders_ahead)), initial_schedule)
28 }
29
30 #[inline]
31 pub fn from_buffer(buffer: Arc<LeaderBuffer>, initial_schedule: ScheduleSnapshot) -> Self {
32 Self {
33 scratch: vec![LeaderEntry::EMPTY; buffer.len()].into_boxed_slice(),
34 buffer,
35 active_schedule: initial_schedule,
36 last_slot: 0,
37 last_leader_index: 0,
38 refresh_hint_active: false,
39 }
40 }
41
42 #[inline]
43 pub fn buffer(&self) -> Arc<LeaderBuffer> {
44 Arc::clone(&self.buffer)
45 }
46
47 #[inline]
48 pub fn active_schedule(&self) -> &ScheduleSnapshot {
49 &self.active_schedule
50 }
51
52 #[inline]
53 pub fn last_slot(&self) -> u64 {
54 self.last_slot
55 }
56
57 pub fn seed(&mut self, current_slot: u64) -> EngineEvent {
58 if current_slot == 0 {
59 return EngineEvent::NoChange;
60 }
61
62 self.last_slot = current_slot;
63 self.last_leader_index = current_slot / SLOTS_PER_LEADER;
64 self.refill_for_slot(current_slot)
65 }
66
67 pub fn on_slot(&mut self, current_slot: u64) -> EngineEvent {
68 if current_slot == 0 || current_slot == self.last_slot {
69 return EngineEvent::NoChange;
70 }
71
72 let current_leader_index = current_slot / SLOTS_PER_LEADER;
73 let event = if current_leader_index == self.last_leader_index {
74 self.buffer.set_current_slot(current_slot);
75 EngineEvent::SlotAdvanced { slot: current_slot }
76 } else if current_slot < self.last_slot || current_leader_index < self.last_leader_index {
77 self.refill_for_slot(current_slot)
78 } else if !self.active_schedule.covers_slot(current_slot) {
79 self.refresh_hint_active = false;
80 self.buffer.clear(current_slot);
81 EngineEvent::NeedScheduleRefresh { slot: current_slot }
82 } else {
83 let leaders_passed = (current_leader_index - self.last_leader_index) as usize;
84 if leaders_passed >= self.buffer.len() {
85 self.refill_for_slot(current_slot)
86 } else {
87 let lookahead_slot = (current_leader_index
88 + (self.buffer.len() - leaders_passed) as u64)
89 * SLOTS_PER_LEADER;
90 let written = self
91 .active_schedule
92 .get_next_leaders_into(lookahead_slot, &mut self.scratch[..leaders_passed]);
93 self.buffer
94 .shift_multiple(leaders_passed, &self.scratch[..written], current_slot);
95 self.slot_advanced_event(current_slot, written == leaders_passed)
96 }
97 };
98
99 self.last_slot = current_slot;
100 self.last_leader_index = current_leader_index;
101 event
102 }
103
104 pub fn replace_schedule(&mut self, schedule: ScheduleSnapshot) -> EngineEvent {
105 self.active_schedule = schedule;
106 self.refresh_hint_active = false;
107 if self.last_slot == 0 {
108 return EngineEvent::NoChange;
109 }
110
111 self.refill_for_slot(self.last_slot)
112 }
113
114 fn refill_for_slot(&mut self, slot: u64) -> EngineEvent {
115 if !self.active_schedule.covers_slot(slot) {
116 self.refresh_hint_active = false;
117 self.buffer.clear(slot);
118 return EngineEvent::NeedScheduleRefresh { slot };
119 }
120
121 let written = self
122 .active_schedule
123 .get_next_leaders_into(slot, &mut self.scratch);
124 self.buffer.update(slot, &self.scratch[..written]);
125 self.slot_advanced_event(slot, written == self.buffer.len())
126 }
127
128 #[inline]
129 fn slot_advanced_event(&mut self, slot: u64, lookahead_complete: bool) -> EngineEvent {
130 if lookahead_complete {
131 self.refresh_hint_active = false;
132 return EngineEvent::SlotAdvanced { slot };
133 }
134
135 if self.refresh_hint_active {
136 return EngineEvent::SlotAdvanced { slot };
137 }
138
139 self.refresh_hint_active = true;
140 EngineEvent::ScheduleRefreshSuggested { slot }
141 }
142}
143
#[cfg(test)]
mod tests {
    use super::*;
    use crate::leader_entry::LeaderPubkey;

    /// Expands to the first pubkey byte of the buffer entry at `$index`.
    macro_rules! leader_id {
        ($engine:expr, $index:expr) => {
            $engine.buffer.read($index).pubkey.to_bytes()[0]
        };
    }

    /// Builds a loopback IPv4 entry whose pubkey bytes and last address octet are `id`.
    fn make_entry(id: u8) -> LeaderEntry {
        let pubkey = LeaderPubkey::new([id; 32]);
        LeaderEntry::new_ipv4(pubkey, [127, 0, 0, id], 8000, [127, 0, 0, id], 8001)
    }

    /// Builds an epoch-1 schedule starting at `epoch_start_slot`, one entry per id.
    fn make_schedule(epoch_start_slot: u64, ids: &[u8]) -> ScheduleSnapshot {
        let entries: Vec<LeaderEntry> = ids.iter().copied().map(make_entry).collect();
        ScheduleSnapshot::new(1, epoch_start_slot, entries.into_boxed_slice())
    }

    /// Seeding fills the buffer from the schedule starting at the seeded slot.
    #[test]
    fn seed_populates_buffer() {
        let mut engine = LeaderEngine::new(2, make_schedule(100, &[1, 2, 3, 4]));

        let event = engine.seed(100);

        assert_eq!(event, EngineEvent::SlotAdvanced { slot: 100 });
        assert_eq!(leader_id!(engine, 0), 1);
        assert_eq!(leader_id!(engine, 1), 2);
        assert_eq!(leader_id!(engine, 2), 3);
    }

    /// Re-reporting the last slot is a no-op.
    #[test]
    fn same_slot_is_no_change() {
        let mut engine = LeaderEngine::new(2, make_schedule(100, &[1, 2, 3]));
        engine.seed(100);

        let event = engine.on_slot(100);

        assert_eq!(event, EngineEvent::NoChange);
    }

    /// Crossing one leader boundary shifts the buffer and appends the next leader.
    #[test]
    fn leader_boundary_shift_updates_tail() {
        let mut engine = LeaderEngine::new(2, make_schedule(100, &[1, 2, 3, 4, 5]));
        engine.seed(100);

        let event = engine.on_slot(104);

        assert_eq!(event, EngineEvent::SlotAdvanced { slot: 104 });
        assert_eq!(leader_id!(engine, 0), 2);
        assert_eq!(leader_id!(engine, 1), 3);
        assert_eq!(leader_id!(engine, 2), 4);
    }

    /// A jump larger than the buffer refills from the new slot; the schedule runs
    /// out before the buffer is full, so a refresh is suggested.
    #[test]
    fn large_jump_refills_from_current_slot() {
        let mut engine = LeaderEngine::new(2, make_schedule(100, &[1, 2, 3, 4, 5, 6]));
        engine.seed(100);

        let event = engine.on_slot(116);

        assert_eq!(event, EngineEvent::ScheduleRefreshSuggested { slot: 116 });
        assert_eq!(leader_id!(engine, 0), 5);
        assert_eq!(leader_id!(engine, 1), 6);
        assert!(!engine.buffer.read(2).is_valid());
    }

    /// A slot outside the schedule clears the buffer and demands a refresh.
    #[test]
    fn uncovered_slot_requests_refresh() {
        let mut engine = LeaderEngine::new(1, make_schedule(100, &[1, 2]));
        engine.seed(100);

        let event = engine.on_slot(108);

        assert_eq!(event, EngineEvent::NeedScheduleRefresh { slot: 108 });
        assert!(!engine.buffer.read(0).is_valid());
        assert_eq!(engine.buffer.current_slot(), 108);
    }

    /// Installing a covering schedule after a clear repopulates the buffer.
    #[test]
    fn replace_schedule_recovers_empty_buffer() {
        let mut engine = LeaderEngine::new(1, make_schedule(100, &[1, 2]));
        engine.seed(100);
        engine.on_slot(108);

        let event = engine.replace_schedule(make_schedule(108, &[9, 10, 11]));

        assert_eq!(event, EngineEvent::SlotAdvanced { slot: 108 });
        assert_eq!(leader_id!(engine, 0), 9);
        assert_eq!(leader_id!(engine, 1), 10);
    }

    /// Seeding with a schedule too short for the lookahead suggests a refresh; a
    /// longer replacement then reports a plain advance.
    #[test]
    fn seed_near_epoch_end_suggests_refresh_once() {
        let mut engine = LeaderEngine::new(2, make_schedule(100, &[1, 2]));

        let seeded = engine.seed(100);
        assert_eq!(seeded, EngineEvent::ScheduleRefreshSuggested { slot: 100 });
        assert!(!engine.buffer.read(2).is_valid());

        let replaced = engine.replace_schedule(make_schedule(100, &[1, 2, 3]));
        assert_eq!(replaced, EngineEvent::SlotAdvanced { slot: 100 });
    }

    /// Replacing the schedule resets the hint, so a still-short schedule suggests again.
    #[test]
    fn replace_with_still_partial_schedule_resuggests_refresh() {
        let schedule = make_schedule(100, &[1, 2]);
        let mut engine = LeaderEngine::new(2, schedule.clone());

        let seeded = engine.seed(100);
        assert_eq!(seeded, EngineEvent::ScheduleRefreshSuggested { slot: 100 });

        let replaced = engine.replace_schedule(schedule);
        assert_eq!(replaced, EngineEvent::ScheduleRefreshSuggested { slot: 100 });
    }

    /// A shift whose new tail lies beyond the schedule suggests a refresh once; the
    /// following slot in the same leader window is a plain advance.
    #[test]
    fn leader_boundary_shift_suggests_refresh_when_tail_falls_off_schedule() {
        let mut engine = LeaderEngine::new(2, make_schedule(100, &[1, 2, 3]));

        let seeded = engine.seed(100);
        assert_eq!(seeded, EngineEvent::SlotAdvanced { slot: 100 });

        let shifted = engine.on_slot(104);
        assert_eq!(shifted, EngineEvent::ScheduleRefreshSuggested { slot: 104 });
        assert!(!engine.buffer.read(2).is_valid());

        let next = engine.on_slot(105);
        assert_eq!(next, EngineEvent::SlotAdvanced { slot: 105 });
    }

    /// Moving back to an earlier leader window rebuilds the buffer from that slot.
    #[test]
    fn slot_rollback_refills_from_current_slot() {
        let mut engine = LeaderEngine::new(2, make_schedule(100, &[1, 2, 3, 4, 5]));
        engine.seed(108);

        let event = engine.on_slot(104);

        assert_eq!(event, EngineEvent::SlotAdvanced { slot: 104 });
        assert_eq!(leader_id!(engine, 0), 2);
        assert_eq!(leader_id!(engine, 1), 3);
        assert_eq!(leader_id!(engine, 2), 4);
    }
}