use crate::leader_buffer::{LeaderBuffer, SLOTS_PER_LEADER};
use crate::leader_entry::LeaderEntry;
use crate::schedule::ScheduleSnapshot;
use std::sync::Arc;
/// Outcome reported by [`LeaderEngine`] after it handles a slot update or a
/// schedule replacement.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[non_exhaustive]
pub enum EngineEvent {
/// Nothing was done: the reported slot was `0` or equal to the last
/// processed slot, or a schedule was replaced before any slot was recorded.
NoChange,
/// The engine accepted `slot` and no schedule refresh is being requested
/// by this event.
SlotAdvanced { slot: u64 },
/// The active schedule covers `slot`, but it could not supply every entry
/// requested for the lookahead. Emitted once per incomplete stretch; later
/// incomplete updates report `SlotAdvanced` until the lookahead is complete
/// again or the hint is reset.
ScheduleRefreshSuggested { slot: u64 },
/// The active schedule does not cover `slot`; the leader buffer was
/// cleared and a schedule that covers the slot is required.
NeedScheduleRefresh { slot: u64 },
}
/// Keeps a shared [`LeaderBuffer`] in step with the current slot by looking up
/// leaders in the active [`ScheduleSnapshot`].
pub struct LeaderEngine {
/// Shared buffer the engine writes leader entries into; handed out to
/// readers through `buffer()`.
buffer: Arc<LeaderBuffer>,
/// Schedule snapshot currently used for coverage checks and leader lookups.
active_schedule: ScheduleSnapshot,
/// Staging slice allocated once at construction with `buffer.len()` entries;
/// schedule lookups write here before the result is copied into `buffer`.
scratch: Box<[LeaderEntry]>,
/// Last slot recorded by `seed` or `on_slot`; `0` means no slot was recorded yet.
last_slot: u64,
/// `last_slot / SLOTS_PER_LEADER`, cached to detect leader-window changes.
last_leader_index: u64,
/// `true` after `ScheduleRefreshSuggested` was emitted and until the
/// lookahead is complete again, the buffer is cleared, or the schedule is
/// replaced; suppresses repeated suggestions.
refresh_hint_active: bool,
}
impl LeaderEngine {
    /// Builds an engine around a freshly allocated [`LeaderBuffer`] created
    /// with `leaders_ahead`, using `initial_schedule` for leader lookups.
    #[inline]
    pub fn new(leaders_ahead: usize, initial_schedule: ScheduleSnapshot) -> Self {
        Self::from_buffer(Arc::new(LeaderBuffer::new(leaders_ahead)), initial_schedule)
    }

    /// Builds an engine that writes into an existing shared `buffer`.
    ///
    /// The scratch slice is allocated here, once, with `buffer.len()` entries.
    /// The engine starts unseeded (`last_slot == 0`).
    #[inline]
    pub fn from_buffer(buffer: Arc<LeaderBuffer>, initial_schedule: ScheduleSnapshot) -> Self {
        let scratch = vec![LeaderEntry::EMPTY; buffer.len()].into_boxed_slice();
        Self {
            buffer,
            active_schedule: initial_schedule,
            scratch,
            last_slot: 0,
            last_leader_index: 0,
            refresh_hint_active: false,
        }
    }

    /// Returns a new handle to the shared leader buffer.
    #[inline]
    pub fn buffer(&self) -> Arc<LeaderBuffer> {
        Arc::clone(&self.buffer)
    }

    /// Returns the schedule snapshot currently used for lookups.
    #[inline]
    pub fn active_schedule(&self) -> &ScheduleSnapshot {
        &self.active_schedule
    }

    /// Returns the last slot recorded by [`seed`](Self::seed) or
    /// [`on_slot`](Self::on_slot); `0` means none was recorded yet.
    #[inline]
    pub fn last_slot(&self) -> u64 {
        self.last_slot
    }

    /// Records `current_slot` as the engine position and rebuilds the whole
    /// buffer for it.
    ///
    /// Slot `0` is the "nothing seen yet" sentinel and is ignored with
    /// [`EngineEvent::NoChange`]. Otherwise the result is whatever the full
    /// rebuild reports (see `refill_for_slot`).
    pub fn seed(&mut self, current_slot: u64) -> EngineEvent {
        if current_slot == 0 {
            return EngineEvent::NoChange;
        }
        self.last_slot = current_slot;
        self.last_leader_index = current_slot / SLOTS_PER_LEADER;
        self.refill_for_slot(current_slot)
    }

    /// Processes a newly observed slot.
    ///
    /// * Slot `0` or a repeat of the last slot yields [`EngineEvent::NoChange`].
    /// * The first non-zero slot seen by an unseeded engine is handled exactly
    ///   like [`seed`](Self::seed), so the buffer is never left unpopulated.
    /// * A slot inside the current leader window only updates the buffer's
    ///   current slot.
    /// * A forward move of fewer leader windows than the buffer holds shifts
    ///   the buffer and appends the newly visible leaders, provided the slot is
    ///   covered and the buffer currently holds a valid head entry.
    /// * Everything else (rollback, uncovered slot, large jump, empty buffer)
    ///   rebuilds the buffer from scratch.
    pub fn on_slot(&mut self, current_slot: u64) -> EngineEvent {
        if current_slot == 0 || current_slot == self.last_slot {
            return EngineEvent::NoChange;
        }
        if self.last_slot == 0 {
            // Never seeded: there is nothing to shift, so populate from scratch.
            return self.seed(current_slot);
        }

        let current_leader_index = current_slot / SLOTS_PER_LEADER;
        let event = if current_leader_index == self.last_leader_index {
            self.buffer.set_current_slot(current_slot);
            EngineEvent::SlotAdvanced { slot: current_slot }
        } else {
            match self.shiftable_leaders(current_slot, current_leader_index) {
                Some(leaders_passed) => {
                    self.shift_forward(current_slot, current_leader_index, leaders_passed)
                }
                None => self.refill_for_slot(current_slot),
            }
        };

        self.last_slot = current_slot;
        self.last_leader_index = current_leader_index;
        event
    }

    /// Installs a new schedule and, if a slot has already been recorded,
    /// rebuilds the buffer for that slot from the new schedule.
    ///
    /// Resets the refresh hint so an incomplete lookahead under the new
    /// schedule is reported again.
    pub fn replace_schedule(&mut self, schedule: ScheduleSnapshot) -> EngineEvent {
        self.active_schedule = schedule;
        self.refresh_hint_active = false;
        if self.last_slot == 0 {
            return EngineEvent::NoChange;
        }
        self.refill_for_slot(self.last_slot)
    }

    /// Decides whether the move to `current_leader_index` can be served by an
    /// incremental shift; returns the number of leader windows to shift by, or
    /// `None` when the buffer must be rebuilt.
    fn shiftable_leaders(&self, current_slot: u64, current_leader_index: u64) -> Option<usize> {
        // A rollback makes the subtraction underflow and therefore yields `None`.
        let leaders_passed = current_leader_index.checked_sub(self.last_leader_index)?;
        // Compare as u64 *before* narrowing so a huge jump cannot be truncated
        // into a small shift on targets where `usize` is narrower than 64 bits.
        if leaders_passed >= self.buffer.len() as u64 {
            return None;
        }
        if !self.active_schedule.covers_slot(current_slot) {
            return None;
        }
        // A cleared buffer has no entries worth keeping; shifting it would
        // leave invalid entries at the head with only the tail populated.
        // The length check above guarantees index 0 exists.
        if !self.buffer.read(0).is_valid() {
            return None;
        }
        Some(leaders_passed as usize)
    }

    /// Shifts the buffer by `leaders_passed` leader windows and appends the
    /// leaders that became visible at the tail.
    fn shift_forward(
        &mut self,
        current_slot: u64,
        current_leader_index: u64,
        leaders_passed: usize,
    ) -> EngineEvent {
        // Entries that survive the shift; the first missing leader window
        // starts right after them.
        let retained = self.buffer.len() - leaders_passed;
        let lookahead_slot = (current_leader_index + retained as u64) * SLOTS_PER_LEADER;
        let written = self
            .active_schedule
            .get_next_leaders_into(lookahead_slot, &mut self.scratch[..leaders_passed]);
        self.buffer
            .shift_multiple(leaders_passed, &self.scratch[..written], current_slot);
        self.slot_advanced_event(current_slot, written == leaders_passed)
    }

    /// Rebuilds the whole buffer for `slot`.
    ///
    /// If the active schedule does not cover `slot`, the buffer is cleared and
    /// [`EngineEvent::NeedScheduleRefresh`] is returned. Otherwise the buffer
    /// is overwritten with as many leaders as the schedule supplied.
    fn refill_for_slot(&mut self, slot: u64) -> EngineEvent {
        if !self.active_schedule.covers_slot(slot) {
            self.refresh_hint_active = false;
            self.buffer.clear(slot);
            return EngineEvent::NeedScheduleRefresh { slot };
        }
        let written = self
            .active_schedule
            .get_next_leaders_into(slot, &mut self.scratch);
        self.buffer.update(slot, &self.scratch[..written]);
        self.slot_advanced_event(slot, written == self.buffer.len())
    }

    /// Picks the event for a successful update and maintains the refresh hint.
    ///
    /// [`EngineEvent::ScheduleRefreshSuggested`] is returned only on the first
    /// incomplete lookahead; the hint stays set until a complete lookahead
    /// clears it, so repeated incomplete updates report `SlotAdvanced`.
    #[inline]
    fn slot_advanced_event(&mut self, slot: u64, lookahead_complete: bool) -> EngineEvent {
        let first_incomplete = !lookahead_complete && !self.refresh_hint_active;
        self.refresh_hint_active = !lookahead_complete;
        if first_incomplete {
            EngineEvent::ScheduleRefreshSuggested { slot }
        } else {
            EngineEvent::SlotAdvanced { slot }
        }
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::leader_entry::LeaderPubkey;

    /// Asserts that the buffer entry at `$index` carries the test leader `$id`
    /// (test pubkeys are 32 copies of the id byte).
    macro_rules! assert_leader_id {
        ($engine:expr, $index:expr, $id:expr) => {
            assert_eq!($engine.buffer.read($index).pubkey.to_bytes()[0], $id)
        };
    }

    /// Builds a loopback leader entry whose pubkey and last address octet are `id`.
    fn make_entry(id: u8) -> LeaderEntry {
        let address = [127, 0, 0, id];
        LeaderEntry::new_ipv4(LeaderPubkey::new([id; 32]), address, 8000, address, 8001)
    }

    /// Builds a schedule starting at `epoch_start_slot` with one leader per id.
    fn make_schedule(epoch_start_slot: u64, ids: &[u8]) -> ScheduleSnapshot {
        let leaders: Vec<LeaderEntry> = ids.iter().copied().map(make_entry).collect();
        ScheduleSnapshot::new(1, epoch_start_slot, leaders.into_boxed_slice())
    }

    #[test]
    fn seed_populates_buffer() {
        let mut engine = LeaderEngine::new(2, make_schedule(100, &[1, 2, 3, 4]));

        assert_eq!(engine.seed(100), EngineEvent::SlotAdvanced { slot: 100 });

        assert_leader_id!(engine, 0, 1);
        assert_leader_id!(engine, 1, 2);
        assert_leader_id!(engine, 2, 3);
    }

    #[test]
    fn same_slot_is_no_change() {
        let mut engine = LeaderEngine::new(2, make_schedule(100, &[1, 2, 3]));
        engine.seed(100);

        assert_eq!(engine.on_slot(100), EngineEvent::NoChange);
    }

    #[test]
    fn leader_boundary_shift_updates_tail() {
        let mut engine = LeaderEngine::new(2, make_schedule(100, &[1, 2, 3, 4, 5]));
        engine.seed(100);

        assert_eq!(engine.on_slot(104), EngineEvent::SlotAdvanced { slot: 104 });

        assert_leader_id!(engine, 0, 2);
        assert_leader_id!(engine, 1, 3);
        assert_leader_id!(engine, 2, 4);
    }

    #[test]
    fn large_jump_refills_from_current_slot() {
        let mut engine = LeaderEngine::new(2, make_schedule(100, &[1, 2, 3, 4, 5, 6]));
        engine.seed(100);

        let event = engine.on_slot(116);

        assert_eq!(event, EngineEvent::ScheduleRefreshSuggested { slot: 116 });
        assert_leader_id!(engine, 0, 5);
        assert_leader_id!(engine, 1, 6);
        assert!(!engine.buffer.read(2).is_valid());
    }

    #[test]
    fn uncovered_slot_requests_refresh() {
        let mut engine = LeaderEngine::new(1, make_schedule(100, &[1, 2]));
        engine.seed(100);

        let event = engine.on_slot(108);

        assert_eq!(event, EngineEvent::NeedScheduleRefresh { slot: 108 });
        assert!(!engine.buffer.read(0).is_valid());
        assert_eq!(engine.buffer.current_slot(), 108);
    }

    #[test]
    fn replace_schedule_recovers_empty_buffer() {
        let mut engine = LeaderEngine::new(1, make_schedule(100, &[1, 2]));
        engine.seed(100);
        engine.on_slot(108);

        let event = engine.replace_schedule(make_schedule(108, &[9, 10, 11]));

        assert_eq!(event, EngineEvent::SlotAdvanced { slot: 108 });
        assert_leader_id!(engine, 0, 9);
        assert_leader_id!(engine, 1, 10);
    }

    #[test]
    fn seed_near_epoch_end_suggests_refresh_once() {
        let mut engine = LeaderEngine::new(2, make_schedule(100, &[1, 2]));

        let seeded = engine.seed(100);
        assert_eq!(seeded, EngineEvent::ScheduleRefreshSuggested { slot: 100 });
        assert!(!engine.buffer.read(2).is_valid());

        let replaced = engine.replace_schedule(make_schedule(100, &[1, 2, 3]));
        assert_eq!(replaced, EngineEvent::SlotAdvanced { slot: 100 });
    }

    #[test]
    fn replace_with_still_partial_schedule_resuggests_refresh() {
        let schedule = make_schedule(100, &[1, 2]);
        let mut engine = LeaderEngine::new(2, schedule.clone());

        let seeded = engine.seed(100);
        assert_eq!(seeded, EngineEvent::ScheduleRefreshSuggested { slot: 100 });

        let replaced = engine.replace_schedule(schedule);
        assert_eq!(replaced, EngineEvent::ScheduleRefreshSuggested { slot: 100 });
    }

    #[test]
    fn leader_boundary_shift_suggests_refresh_when_tail_falls_off_schedule() {
        let mut engine = LeaderEngine::new(2, make_schedule(100, &[1, 2, 3]));
        assert_eq!(engine.seed(100), EngineEvent::SlotAdvanced { slot: 100 });

        let shifted = engine.on_slot(104);
        assert_eq!(shifted, EngineEvent::ScheduleRefreshSuggested { slot: 104 });
        assert!(!engine.buffer.read(2).is_valid());

        // The hint was already raised, so the next update stays quiet.
        assert_eq!(engine.on_slot(105), EngineEvent::SlotAdvanced { slot: 105 });
    }

    #[test]
    fn slot_rollback_refills_from_current_slot() {
        let mut engine = LeaderEngine::new(2, make_schedule(100, &[1, 2, 3, 4, 5]));
        engine.seed(108);

        assert_eq!(engine.on_slot(104), EngineEvent::SlotAdvanced { slot: 104 });

        assert_leader_id!(engine, 0, 2);
        assert_leader_id!(engine, 1, 3);
        assert_leader_id!(engine, 2, 4);
    }
}