// solana-leader 0.4.0
//
// Solana leader library.
// Documentation
use crate::leader_buffer::{LeaderBuffer, SLOTS_PER_LEADER};
use crate::leader_entry::LeaderEntry;
use crate::schedule::ScheduleSnapshot;
use std::sync::Arc;

/// Outcome reported by [`LeaderEngine`] after it processes a slot or a schedule swap.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[non_exhaustive]
pub enum EngineEvent {
    /// Nothing was done: the slot was `0`, repeated the last processed slot, or a
    /// schedule was replaced before any slot had been processed.
    NoChange,
    /// The engine moved to `slot`.
    SlotAdvanced { slot: u64 },
    /// The engine moved to `slot`, but the active schedule supplied fewer leaders than
    /// were requested for the lookahead. Reported once; further short reads yield
    /// `SlotAdvanced` until the lookahead is complete again or the schedule is replaced.
    ScheduleRefreshSuggested { slot: u64 },
    /// The active schedule does not cover `slot`; the leader buffer was cleared.
    NeedScheduleRefresh { slot: u64 },
}

/// Drives a shared [`LeaderBuffer`] from slot notifications and a [`ScheduleSnapshot`].
pub struct LeaderEngine {
    /// Shared buffer the engine writes leaders into; handles are given out by `buffer()`.
    buffer: Arc<LeaderBuffer>,
    /// Schedule currently used to look up leaders.
    active_schedule: ScheduleSnapshot,
    /// Staging area for schedule lookups, allocated with `buffer.len()` entries.
    scratch: Box<[LeaderEntry]>,
    /// Most recent slot accepted by `seed`/`on_slot`; `0` until the first one arrives.
    last_slot: u64,
    /// `last_slot / SLOTS_PER_LEADER`, i.e. the leader window of `last_slot`.
    last_leader_index: u64,
    /// Set once `ScheduleRefreshSuggested` has been emitted so it is not repeated.
    refresh_hint_active: bool,
}

impl LeaderEngine {
    /// Creates an engine with a freshly allocated [`LeaderBuffer`] built from
    /// `leaders_ahead`, using `initial_schedule` as the active schedule.
    #[inline]
    pub fn new(leaders_ahead: usize, initial_schedule: ScheduleSnapshot) -> Self {
        Self::from_buffer(Arc::new(LeaderBuffer::new(leaders_ahead)), initial_schedule)
    }

    /// Creates an engine that publishes into an existing shared `buffer`.
    ///
    /// The scratch area is allocated with `buffer.len()` empty entries and the engine
    /// starts without a processed slot (`last_slot == 0`).
    #[inline]
    pub fn from_buffer(buffer: Arc<LeaderBuffer>, initial_schedule: ScheduleSnapshot) -> Self {
        Self {
            scratch: vec![LeaderEntry::EMPTY; buffer.len()].into_boxed_slice(),
            buffer,
            active_schedule: initial_schedule,
            last_slot: 0,
            last_leader_index: 0,
            refresh_hint_active: false,
        }
    }

    /// Returns another handle to the shared leader buffer.
    #[inline]
    pub fn buffer(&self) -> Arc<LeaderBuffer> {
        Arc::clone(&self.buffer)
    }

    /// Returns the schedule currently used for leader lookups.
    #[inline]
    pub fn active_schedule(&self) -> &ScheduleSnapshot {
        &self.active_schedule
    }

    /// Returns the most recent slot accepted by `seed` or `on_slot` (`0` if none yet).
    #[inline]
    pub fn last_slot(&self) -> u64 {
        self.last_slot
    }

    /// Positions the engine at `current_slot` and rebuilds the whole buffer for it.
    ///
    /// Slot `0` is treated as "no slot yet" and yields [`EngineEvent::NoChange`]
    /// without touching any state.
    pub fn seed(&mut self, current_slot: u64) -> EngineEvent {
        if current_slot == 0 {
            return EngineEvent::NoChange;
        }

        self.last_slot = current_slot;
        self.last_leader_index = current_slot / SLOTS_PER_LEADER;
        self.refill_for_slot(current_slot)
    }

    /// Processes a slot notification and updates the buffer accordingly.
    ///
    /// * Slot `0` or a repeat of the last slot: [`EngineEvent::NoChange`].
    /// * First slot ever seen: full refill, exactly like [`LeaderEngine::seed`].
    /// * Same leader window as the last slot: only the buffer's current slot is updated.
    /// * Slot moved backwards into another window: full refill.
    /// * Slot not covered by the active schedule: buffer cleared,
    ///   [`EngineEvent::NeedScheduleRefresh`].
    /// * Otherwise the buffer is shifted by the number of leader windows passed and the
    ///   tail is topped up from the schedule. A full refill is used instead when the
    ///   jump is at least `buffer.len()` windows, or when the previous slot was not
    ///   covered by the active schedule (the buffer was cleared, so there is nothing
    ///   valid to shift).
    pub fn on_slot(&mut self, current_slot: u64) -> EngineEvent {
        if current_slot == 0 || current_slot == self.last_slot {
            return EngineEvent::NoChange;
        }

        let current_leader_index = current_slot / SLOTS_PER_LEADER;
        let event = if self.last_slot == 0 {
            // No slot has been processed yet, so the buffer holds nothing that could be
            // kept or shifted; build it from scratch.
            self.refill_for_slot(current_slot)
        } else if current_leader_index == self.last_leader_index {
            self.buffer.set_current_slot(current_slot);
            EngineEvent::SlotAdvanced { slot: current_slot }
        } else if current_slot < self.last_slot {
            // Rollback into an earlier leader window (the same-window case was handled
            // above, so a smaller slot implies a smaller leader index).
            self.refill_for_slot(current_slot)
        } else if !self.active_schedule.covers_slot(current_slot) {
            self.refresh_hint_active = false;
            self.buffer.clear(current_slot);
            EngineEvent::NeedScheduleRefresh { slot: current_slot }
        } else {
            // Compare in u64 so a huge jump cannot be truncated by a narrowing cast on
            // targets where usize is smaller than 64 bits.
            let leaders_passed = current_leader_index - self.last_leader_index;
            let capacity = self.buffer.len();
            // Every path that handled an uncovered `last_slot` cleared the buffer.
            // NOTE(review): assumes `shift_multiple` only drops leading entries and
            // appends the supplied ones, so shifting a cleared buffer would leave its
            // head invalid — confirm against `LeaderBuffer`.
            let buffer_was_populated = self.active_schedule.covers_slot(self.last_slot);
            if !buffer_was_populated || leaders_passed >= capacity as u64 {
                self.refill_for_slot(current_slot)
            } else {
                // Lossless: leaders_passed < capacity, which is a usize.
                let leaders_passed = leaders_passed as usize;
                // First leader window not yet present in the buffer after the shift.
                let lookahead_slot = (current_leader_index
                    + (capacity - leaders_passed) as u64)
                    * SLOTS_PER_LEADER;
                let written = self
                    .active_schedule
                    .get_next_leaders_into(lookahead_slot, &mut self.scratch[..leaders_passed]);
                self.buffer
                    .shift_multiple(leaders_passed, &self.scratch[..written], current_slot);
                self.slot_advanced_event(current_slot, written == leaders_passed)
            }
        };

        self.last_slot = current_slot;
        self.last_leader_index = current_leader_index;
        event
    }

    /// Swaps in a new active schedule and, if a slot has already been processed,
    /// rebuilds the buffer for that slot.
    ///
    /// The refresh hint is reset, so a schedule that is still too short is reported
    /// with [`EngineEvent::ScheduleRefreshSuggested`] again. Returns
    /// [`EngineEvent::NoChange`] when no slot has been processed yet.
    pub fn replace_schedule(&mut self, schedule: ScheduleSnapshot) -> EngineEvent {
        self.active_schedule = schedule;
        self.refresh_hint_active = false;
        if self.last_slot == 0 {
            return EngineEvent::NoChange;
        }

        self.refill_for_slot(self.last_slot)
    }

    /// Rebuilds the whole buffer for `slot` from the active schedule.
    ///
    /// Clears the buffer and returns [`EngineEvent::NeedScheduleRefresh`] when the
    /// schedule does not cover `slot`.
    fn refill_for_slot(&mut self, slot: u64) -> EngineEvent {
        if !self.active_schedule.covers_slot(slot) {
            self.refresh_hint_active = false;
            self.buffer.clear(slot);
            return EngineEvent::NeedScheduleRefresh { slot };
        }

        let written = self
            .active_schedule
            .get_next_leaders_into(slot, &mut self.scratch);
        self.buffer.update(slot, &self.scratch[..written]);
        self.slot_advanced_event(slot, written == self.buffer.len())
    }

    /// Picks the event for a successful advance to `slot`.
    ///
    /// A complete lookahead resets the refresh hint. An incomplete one yields
    /// [`EngineEvent::ScheduleRefreshSuggested`] the first time and
    /// [`EngineEvent::SlotAdvanced`] while the hint is already active.
    #[inline]
    fn slot_advanced_event(&mut self, slot: u64, lookahead_complete: bool) -> EngineEvent {
        if lookahead_complete {
            self.refresh_hint_active = false;
            return EngineEvent::SlotAdvanced { slot };
        }

        if self.refresh_hint_active {
            return EngineEvent::SlotAdvanced { slot };
        }

        self.refresh_hint_active = true;
        EngineEvent::ScheduleRefreshSuggested { slot }
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::leader_entry::LeaderPubkey;

    /// First pubkey byte of the buffer entry at `$index`; the fixtures below use that
    /// byte as the leader's id.
    macro_rules! leader_id {
        ($engine:expr, $index:expr) => {
            $engine.buffer.read($index).pubkey.to_bytes()[0]
        };
    }

    /// Builds a loopback leader entry whose pubkey bytes and last address octet are `id`.
    fn make_entry(id: u8) -> LeaderEntry {
        let loopback = [127, 0, 0, id];
        LeaderEntry::new_ipv4(LeaderPubkey::new([id; 32]), loopback, 8000, loopback, 8001)
    }

    /// Builds a schedule starting at `epoch_start_slot` with one leader per id.
    fn make_schedule(epoch_start_slot: u64, ids: &[u8]) -> ScheduleSnapshot {
        let leaders: Box<[LeaderEntry]> = ids.iter().copied().map(make_entry).collect();
        ScheduleSnapshot::new(1, epoch_start_slot, leaders)
    }

    /// Builds an unseeded engine over a schedule made from `ids`.
    fn engine_with(leaders_ahead: usize, epoch_start_slot: u64, ids: &[u8]) -> LeaderEngine {
        LeaderEngine::new(leaders_ahead, make_schedule(epoch_start_slot, ids))
    }

    #[test]
    fn seed_populates_buffer() {
        let mut engine = engine_with(2, 100, &[1, 2, 3, 4]);

        let event = engine.seed(100);

        assert_eq!(event, EngineEvent::SlotAdvanced { slot: 100 });
        assert_eq!(leader_id!(engine, 0), 1);
        assert_eq!(leader_id!(engine, 1), 2);
        assert_eq!(leader_id!(engine, 2), 3);
    }

    #[test]
    fn same_slot_is_no_change() {
        let mut engine = engine_with(2, 100, &[1, 2, 3]);
        engine.seed(100);

        let event = engine.on_slot(100);

        assert_eq!(event, EngineEvent::NoChange);
    }

    #[test]
    fn leader_boundary_shift_updates_tail() {
        let mut engine = engine_with(2, 100, &[1, 2, 3, 4, 5]);
        engine.seed(100);

        let event = engine.on_slot(104);

        assert_eq!(event, EngineEvent::SlotAdvanced { slot: 104 });
        assert_eq!(leader_id!(engine, 0), 2);
        assert_eq!(leader_id!(engine, 1), 3);
        assert_eq!(leader_id!(engine, 2), 4);
    }

    #[test]
    fn large_jump_refills_from_current_slot() {
        let mut engine = engine_with(2, 100, &[1, 2, 3, 4, 5, 6]);
        engine.seed(100);

        let event = engine.on_slot(116);

        assert_eq!(event, EngineEvent::ScheduleRefreshSuggested { slot: 116 });
        assert_eq!(leader_id!(engine, 0), 5);
        assert_eq!(leader_id!(engine, 1), 6);
        assert!(!engine.buffer.read(2).is_valid());
    }

    #[test]
    fn uncovered_slot_requests_refresh() {
        let mut engine = engine_with(1, 100, &[1, 2]);
        engine.seed(100);

        let event = engine.on_slot(108);

        assert_eq!(event, EngineEvent::NeedScheduleRefresh { slot: 108 });
        assert!(!engine.buffer.read(0).is_valid());
        assert_eq!(engine.buffer.current_slot(), 108);
    }

    #[test]
    fn replace_schedule_recovers_empty_buffer() {
        let mut engine = engine_with(1, 100, &[1, 2]);
        engine.seed(100);
        engine.on_slot(108);

        let event = engine.replace_schedule(make_schedule(108, &[9, 10, 11]));

        assert_eq!(event, EngineEvent::SlotAdvanced { slot: 108 });
        assert_eq!(leader_id!(engine, 0), 9);
        assert_eq!(leader_id!(engine, 1), 10);
    }

    #[test]
    fn seed_near_epoch_end_suggests_refresh_once() {
        let mut engine = engine_with(2, 100, &[1, 2]);

        let seeded = engine.seed(100);
        assert_eq!(seeded, EngineEvent::ScheduleRefreshSuggested { slot: 100 });
        assert!(!engine.buffer.read(2).is_valid());

        let replaced = engine.replace_schedule(make_schedule(100, &[1, 2, 3]));
        assert_eq!(replaced, EngineEvent::SlotAdvanced { slot: 100 });
    }

    #[test]
    fn replace_with_still_partial_schedule_resuggests_refresh() {
        let schedule = make_schedule(100, &[1, 2]);
        let mut engine = LeaderEngine::new(2, schedule.clone());

        let seeded = engine.seed(100);
        assert_eq!(seeded, EngineEvent::ScheduleRefreshSuggested { slot: 100 });

        let replaced = engine.replace_schedule(schedule);
        assert_eq!(replaced, EngineEvent::ScheduleRefreshSuggested { slot: 100 });
    }

    #[test]
    fn leader_boundary_shift_suggests_refresh_when_tail_falls_off_schedule() {
        let mut engine = engine_with(2, 100, &[1, 2, 3]);

        let seeded = engine.seed(100);
        assert_eq!(seeded, EngineEvent::SlotAdvanced { slot: 100 });

        let crossed = engine.on_slot(104);
        assert_eq!(crossed, EngineEvent::ScheduleRefreshSuggested { slot: 104 });
        assert!(!engine.buffer.read(2).is_valid());

        let within_window = engine.on_slot(105);
        assert_eq!(within_window, EngineEvent::SlotAdvanced { slot: 105 });
    }

    #[test]
    fn slot_rollback_refills_from_current_slot() {
        let mut engine = engine_with(2, 100, &[1, 2, 3, 4, 5]);
        engine.seed(108);

        let event = engine.on_slot(104);

        assert_eq!(event, EngineEvent::SlotAdvanced { slot: 104 });
        assert_eq!(leader_id!(engine, 0), 2);
        assert_eq!(leader_id!(engine, 1), 3);
        assert_eq!(leader_id!(engine, 2), 4);
    }
}