solana-leader 0.4.0

solana leader library
Documentation
use crate::base58::{Base58Error, decode_32, encode_fixed};
use crate::leader_buffer::SLOTS_PER_LEADER;
use std::fmt;
use std::net::{Ipv4Addr, SocketAddr, SocketAddrV4};

/// Cache-line size in bytes that this module designs its records around.
///
/// `LeaderEntry` is checked at compile time to have exactly this size and
/// alignment.
pub const CACHE_LINE_SIZE: usize = 64;

/// fixed-size validator identity
///
/// Newtype over a raw 32-byte key. `#[repr(transparent)]` gives it the same
/// layout as `[u8; 32]`. Ordering, equality and hashing are derived, so they
/// operate directly on the underlying bytes, and `Default` yields the
/// all-zero key.
#[repr(transparent)]
#[derive(Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash, Default)]
pub struct LeaderPubkey([u8; 32]);

impl LeaderPubkey {
    pub const ZERO: Self = Self([0; 32]);

    #[inline]
    pub const fn new(bytes: [u8; 32]) -> Self {
        Self(bytes)
    }

    #[inline]
    pub fn from_base58(value: &str) -> Result<Self, PubkeyError> {
        decode_32(value).map(Self).map_err(PubkeyError::Base58)
    }

    #[inline]
    pub const fn to_bytes(self) -> [u8; 32] {
        self.0
    }

    #[inline]
    pub const fn as_bytes(&self) -> &[u8; 32] {
        &self.0
    }

    #[inline]
    pub fn to_base58(self) -> String {
        encode_fixed(&self.0)
    }
}

/// Displays the key in its base58 text form.
impl fmt::Display for LeaderPubkey {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // Written via `write_str`, so width/padding flags are not applied.
        let encoded = encode_fixed(&self.0);
        f.write_str(encoded.as_str())
    }
}

/// Debug output is identical to the `Display` (base58) form rather than a
/// dump of the raw byte array.
impl fmt::Debug for LeaderPubkey {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        <Self as fmt::Display>::fmt(self, f)
    }
}

/// Error returned when a [`LeaderPubkey`] cannot be constructed.
///
/// Marked `#[non_exhaustive]`, so downstream crates must include a wildcard
/// arm when matching on it.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[non_exhaustive]
pub enum PubkeyError {
    /// The base58 decoder rejected the input; wraps the decoder's error.
    Base58(Base58Error),
}

/// Renders the error as `invalid pubkey: <underlying decoder error>`.
impl fmt::Display for PubkeyError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // Single-variant enum: the pattern is irrefutable inside this crate and
        // stops compiling (like an exhaustive `match`) once a variant is added.
        let Self::Base58(error) = self;
        write!(f, "invalid pubkey: {error}")
    }
}

// Uses the default `Error` methods only, so `source()` returns `None`; the
// wrapped decoder error is reported through the `Display` message instead.
impl std::error::Error for PubkeyError {}

/// Error returned when a [`LeaderEntry`] cannot be built from socket addresses.
///
/// Marked `#[non_exhaustive]`, so downstream crates must include a wildcard
/// arm when matching on it.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[non_exhaustive]
pub enum LeaderEntryError {
    /// An address was IPv6 and not an IPv4-mapped IPv6 address.
    UnsupportedAddressFamily,
}

/// Renders a fixed, human-readable description for each variant.
impl fmt::Display for LeaderEntryError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let message = match self {
            Self::UnsupportedAddressFamily => {
                "leader entry only supports ipv4 or ipv4-mapped ipv6 addresses"
            }
        };
        f.write_str(message)
    }
}

// Uses the default `Error` methods only; there is no underlying source error.
impl std::error::Error for LeaderEntryError {}

/// cache-line aligned leader record.
///
/// `#[repr(C, align(64))]` fixes the field order and forces 64-byte
/// alignment; a compile-time assertion in this file checks that the size and
/// alignment both equal `CACHE_LINE_SIZE`. Addresses are stored as raw IPv4
/// octets plus a port rather than as `SocketAddr` values.
#[repr(C, align(64))]
#[derive(Clone, Copy)]
pub struct LeaderEntry {
    /// Identity of the leader this record describes.
    pub pubkey: LeaderPubkey,
    // IPv4 octets of the TPU QUIC address.
    tpu_quic_ip: [u8; 4],
    // Port of the TPU QUIC address.
    tpu_quic_port: u16,
    // IPv4 octets of the TPU QUIC forward address.
    tpu_quic_fwd_ip: [u8; 4],
    // Port of the TPU QUIC forward address.
    tpu_quic_fwd_port: u16,
    // Slot that `end_slot()` is computed from; constructors initialise it to 0
    // and `set_start_slot` overwrites it.
    start_slot: u64,
}

// Compile-time layout guard: the build fails if `LeaderEntry` stops being
// exactly one `CACHE_LINE_SIZE` in both size and alignment.
const _: () = {
    use std::mem::{align_of, size_of};

    assert!(size_of::<LeaderEntry>() == CACHE_LINE_SIZE);
    assert!(align_of::<LeaderEntry>() == CACHE_LINE_SIZE);
};

impl LeaderEntry {
    /// Entry with a zero pubkey, zeroed addresses and ports, and start slot 0.
    ///
    /// [`is_valid`](Self::is_valid) returns `false` for this value.
    pub const EMPTY: Self = Self {
        pubkey: LeaderPubkey::ZERO,
        tpu_quic_ip: [0; 4],
        tpu_quic_port: 0,
        tpu_quic_fwd_ip: [0; 4],
        tpu_quic_fwd_port: 0,
        start_slot: 0,
    };

    /// Builds an entry from socket addresses, with `start_slot` set to 0.
    ///
    /// IPv4 addresses are stored as-is; IPv4-mapped IPv6 addresses are
    /// converted to their embedded IPv4 address.
    ///
    /// # Errors
    ///
    /// Returns [`LeaderEntryError::UnsupportedAddressFamily`] when either
    /// address is an IPv6 address that is not IPv4-mapped.
    #[inline]
    pub fn new(
        pubkey: LeaderPubkey,
        tpu_quic: SocketAddr,
        tpu_quic_fwd: SocketAddr,
    ) -> Result<Self, LeaderEntryError> {
        let (quic_ip, quic_port) =
            extract_ipv4(tpu_quic).ok_or(LeaderEntryError::UnsupportedAddressFamily)?;
        let (fwd_ip, fwd_port) =
            extract_ipv4(tpu_quic_fwd).ok_or(LeaderEntryError::UnsupportedAddressFamily)?;

        Ok(Self {
            pubkey,
            tpu_quic_ip: quic_ip,
            tpu_quic_port: quic_port,
            tpu_quic_fwd_ip: fwd_ip,
            tpu_quic_fwd_port: fwd_port,
            start_slot: 0,
        })
    }

    /// Builds an entry directly from IPv4 octets and ports, with `start_slot`
    /// set to 0. No validation is performed; use
    /// [`is_valid`](Self::is_valid) to check the result.
    #[inline]
    pub fn new_ipv4(
        pubkey: LeaderPubkey,
        tpu_quic_ip: [u8; 4],
        tpu_quic_port: u16,
        tpu_quic_fwd_ip: [u8; 4],
        tpu_quic_fwd_port: u16,
    ) -> Self {
        Self {
            pubkey,
            tpu_quic_ip,
            tpu_quic_port,
            tpu_quic_fwd_ip,
            tpu_quic_fwd_port,
            start_slot: 0,
        }
    }

    /// Returns the TPU QUIC address as an IPv4 `SocketAddr`.
    #[inline]
    pub fn tpu_quic(&self) -> SocketAddr {
        SocketAddr::V4(SocketAddrV4::new(
            Ipv4Addr::from(self.tpu_quic_ip),
            self.tpu_quic_port,
        ))
    }

    /// Returns the TPU QUIC forward address as an IPv4 `SocketAddr`.
    #[inline]
    pub fn tpu_quic_fwd(&self) -> SocketAddr {
        SocketAddr::V4(SocketAddrV4::new(
            Ipv4Addr::from(self.tpu_quic_fwd_ip),
            self.tpu_quic_fwd_port,
        ))
    }

    /// Returns the TPU QUIC address as raw `(octets, port)`.
    #[inline]
    pub fn tpu_quic_parts(&self) -> ([u8; 4], u16) {
        (self.tpu_quic_ip, self.tpu_quic_port)
    }

    /// Returns the TPU QUIC forward address as raw `(octets, port)`.
    #[inline]
    pub fn tpu_quic_fwd_parts(&self) -> ([u8; 4], u16) {
        (self.tpu_quic_fwd_ip, self.tpu_quic_fwd_port)
    }

    /// Returns `true` when both addresses have a non-zero IP and a non-zero
    /// port. The pubkey and start slot are not inspected.
    #[inline]
    pub fn is_valid(&self) -> bool {
        self.tpu_quic_ip != [0; 4]
            && self.tpu_quic_port != 0
            && self.tpu_quic_fwd_ip != [0; 4]
            && self.tpu_quic_fwd_port != 0
    }

    /// Returns the stored start slot.
    #[inline]
    pub const fn start_slot(&self) -> u64 {
        self.start_slot
    }

    /// Returns the last slot of the window that begins at the start slot,
    /// i.e. `start_slot + SLOTS_PER_LEADER - 1`.
    ///
    /// The addition saturates at `u64::MAX`: `set_start_slot` accepts any
    /// `u64`, and a plain `+` would panic in debug builds and wrap to a value
    /// below `start_slot` in release builds.
    #[inline]
    pub const fn end_slot(&self) -> u64 {
        self.start_slot.saturating_add(SLOTS_PER_LEADER - 1)
    }

    /// Overwrites the stored start slot.
    #[inline]
    pub fn set_start_slot(&mut self, slot: u64) {
        self.start_slot = slot;
    }
}

impl Default for LeaderEntry {
    fn default() -> Self {
        Self::EMPTY
    }
}

/// Debug output shows the addresses as `SocketAddr` values instead of the raw
/// octet/port fields.
impl fmt::Debug for LeaderEntry {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let quic = self.tpu_quic();
        let quic_fwd = self.tpu_quic_fwd();

        let mut out = f.debug_struct("LeaderEntry");
        out.field("pubkey", &self.pubkey);
        out.field("tpu_quic", &quic);
        out.field("tpu_quic_fwd", &quic_fwd);
        out.field("start_slot", &self.start_slot);
        out.finish()
    }
}

/// Reduces a socket address to raw IPv4 octets and a port.
///
/// IPv4 addresses pass through unchanged. IPv6 addresses are accepted only
/// when they are IPv4-mapped (`::ffff:a.b.c.d`); any other IPv6 address
/// yields `None`.
#[inline]
fn extract_ipv4(addr: SocketAddr) -> Option<([u8; 4], u16)> {
    let (ip, port) = match addr {
        SocketAddr::V4(sock) => (*sock.ip(), sock.port()),
        // `?` bails out with `None` for IPv6 addresses that are not IPv4-mapped.
        SocketAddr::V6(sock) => (sock.ip().to_ipv4_mapped()?, sock.port()),
    };
    Some((ip.octets(), port))
}

#[cfg(test)]
mod tests {
    use super::*;

    // Runtime mirror of the compile-time layout assertion.
    #[test]
    fn size_and_alignment() {
        assert_eq!(std::mem::size_of::<LeaderEntry>(), 64);
        assert_eq!(std::mem::align_of::<LeaderEntry>(), 64);
    }

    #[test]
    fn pubkey_round_trip_base58() {
        let bytes = [7u8; 32];
        let pubkey = LeaderPubkey::new(bytes);
        let encoded = pubkey.to_base58();
        let decoded = LeaderPubkey::from_base58(&encoded).unwrap();
        assert_eq!(decoded.to_bytes(), bytes);
    }

    #[test]
    fn entry_creation() {
        let pubkey = LeaderPubkey::new([9; 32]);
        let tpu_quic: SocketAddr = "127.0.0.1:8000".parse().unwrap();
        let tpu_quic_fwd: SocketAddr = "127.0.0.1:8001".parse().unwrap();

        let entry = LeaderEntry::new(pubkey, tpu_quic, tpu_quic_fwd).unwrap();
        assert_eq!(entry.pubkey, pubkey);
        assert_eq!(entry.tpu_quic(), tpu_quic);
        assert_eq!(entry.tpu_quic_fwd(), tpu_quic_fwd);
        assert!(entry.is_valid());
    }

    #[test]
    fn empty_entry_is_invalid() {
        assert!(!LeaderEntry::EMPTY.is_valid());
    }

    #[test]
    fn ipv6_only_entry_is_rejected() {
        let pubkey = LeaderPubkey::new([9; 32]);
        let tpu_quic: SocketAddr = "[2001:db8::1]:8000".parse().unwrap();
        let tpu_quic_fwd: SocketAddr = "[2001:db8::2]:8001".parse().unwrap();

        assert!(matches!(
            LeaderEntry::new(pubkey, tpu_quic, tpu_quic_fwd),
            Err(LeaderEntryError::UnsupportedAddressFamily)
        ));
    }

    // IPv4-mapped IPv6 addresses are the one IPv6 form the entry accepts; they
    // must be stored as their embedded IPv4 address.
    #[test]
    fn ipv4_mapped_ipv6_entry_is_accepted() {
        let pubkey = LeaderPubkey::new([9; 32]);
        let tpu_quic: SocketAddr = "[::ffff:10.0.0.1]:8000".parse().unwrap();
        let tpu_quic_fwd: SocketAddr = "[::ffff:10.0.0.2]:8001".parse().unwrap();

        let entry = LeaderEntry::new(pubkey, tpu_quic, tpu_quic_fwd).unwrap();
        assert_eq!(entry.tpu_quic_parts(), ([10, 0, 0, 1], 8000));
        assert_eq!(entry.tpu_quic_fwd_parts(), ([10, 0, 0, 2], 8001));
        assert!(entry.is_valid());
    }

    // A single unsupported address is enough to reject the whole entry.
    #[test]
    fn mixed_address_families_are_rejected() {
        let pubkey = LeaderPubkey::new([9; 32]);
        let v4: SocketAddr = "127.0.0.1:8000".parse().unwrap();
        let v6: SocketAddr = "[2001:db8::2]:8001".parse().unwrap();

        assert!(matches!(
            LeaderEntry::new(pubkey, v4, v6),
            Err(LeaderEntryError::UnsupportedAddressFamily)
        ));
        assert!(matches!(
            LeaderEntry::new(pubkey, v6, v4),
            Err(LeaderEntryError::UnsupportedAddressFamily)
        ));
    }

    #[test]
    fn new_ipv4_matches_socket_addr_constructor() {
        let pubkey = LeaderPubkey::new([3; 32]);
        let entry = LeaderEntry::new_ipv4(pubkey, [127, 0, 0, 1], 8000, [127, 0, 0, 1], 8001);

        assert_eq!(entry.tpu_quic(), "127.0.0.1:8000".parse::<SocketAddr>().unwrap());
        assert_eq!(
            entry.tpu_quic_fwd(),
            "127.0.0.1:8001".parse::<SocketAddr>().unwrap()
        );
        assert!(entry.is_valid());
    }

    #[test]
    fn start_slot_round_trip() {
        let mut entry = LeaderEntry::EMPTY;
        assert_eq!(entry.start_slot(), 0);

        entry.set_start_slot(1_000);
        assert_eq!(entry.start_slot(), 1_000);
        assert_eq!(entry.end_slot(), 1_000 + (SLOTS_PER_LEADER - 1));
    }
}