use crate::leader_entry::{LeaderEntry, LeaderPubkey};
use cpu::{CachePadded, cpu_pause, fence_acquire};
use std::net::SocketAddr;
use std::sync::atomic::{AtomicU64, Ordering, fence};
/// Number of upcoming leaders (after the current one) tracked by `LeaderBuffer::default()`.
pub const DEFAULT_LEADERS_AHEAD: usize = 9;
/// Slot spacing between consecutive `LeaderBuffer` entries; slots are aligned down to a
/// multiple of this before start slots are assigned.
pub const SLOTS_PER_LEADER: u64 = 4;
/// Fixed-size schedule of the current leader followed by upcoming leaders, readable
/// without locks.
///
/// Readers take snapshots of entries and validate them against `version` (a sequence
/// counter), so writers never wait for readers. The counter itself does not serialize
/// writers — NOTE(review): the protocol appears to assume one writer at a time; confirm
/// callers guarantee this.
#[repr(C)]
pub struct LeaderBuffer {
/// Sequence counter: odd while a write is in progress, advanced by two per protected write.
version: CachePadded<AtomicU64>,
/// Number of entries; `new` sets it to the length of `entries`.
len: usize,
/// Packed entries. Write methods (other than a zero-count shift) give entry `i` the start
/// slot `align_slot(new_slot) + i * SLOTS_PER_LEADER`.
entries: Box<[AtomicLeaderEntry]>,
/// Slot most recently passed to a write method or `set_current_slot`; readers of this
/// field are not validated by `version`.
current_slot: CachePadded<AtomicU64>,
}
/// A `LeaderEntry` stored as eight `AtomicU64` words (layout defined by `pack_entry` /
/// `unpack_entry`) so it can be shared without locks.
///
/// 8 words x 8 bytes with `align(64)` gives each entry its own 64-byte slot. Each word is
/// accessed atomically, but the entry as a whole is not; torn reads are detected by the
/// owning `LeaderBuffer`'s version check.
#[repr(C, align(64))]
struct AtomicLeaderEntry {
words: [AtomicU64; 8],
}
impl AtomicLeaderEntry {
    /// Builds an entry whose words already hold the packed form of `entry`.
    fn new(entry: LeaderEntry) -> Self {
        let packed = pack_entry(entry);
        Self {
            words: packed.map(AtomicU64::new),
        }
    }

    /// Loads all eight words with `Relaxed` ordering (word 0 first) and decodes them.
    ///
    /// The words are loaded individually, so the result may mix two different writes; the
    /// caller's version check is responsible for detecting that.
    #[inline]
    fn load(&self) -> LeaderEntry {
        let snapshot: [u64; 8] =
            std::array::from_fn(|word_index| self.words[word_index].load(Ordering::Relaxed));
        unpack_entry(snapshot)
    }

    /// Packs `entry` and stores each word with `Relaxed` ordering (word 0 first).
    #[inline]
    fn store(&self, entry: LeaderEntry) {
        let packed = pack_entry(entry);
        for (value, cell) in packed.iter().zip(self.words.iter()) {
            cell.store(*value, Ordering::Relaxed);
        }
    }
}
impl LeaderBuffer {
    /// Creates a buffer for the current leader plus `leaders_ahead` upcoming leaders.
    ///
    /// The buffer holds `leaders_ahead + 1` entries (saturating at `usize::MAX`), so it is
    /// never empty. Every entry starts as [`LeaderEntry::EMPTY`], and both the version
    /// counter and the current slot start at zero.
    #[inline]
    pub fn new(leaders_ahead: usize) -> Self {
        // `saturating_add(1)` is always >= 1, so no separate lower bound is needed.
        let len = leaders_ahead.saturating_add(1);
        let entries = (0..len)
            .map(|_| AtomicLeaderEntry::new(LeaderEntry::EMPTY))
            .collect::<Vec<_>>()
            .into_boxed_slice();
        Self {
            version: CachePadded::new(AtomicU64::new(0)),
            len,
            entries,
            current_slot: CachePadded::new(AtomicU64::new(0)),
        }
    }

    /// Returns the number of entries (the current leader plus the leaders ahead).
    #[inline]
    pub fn len(&self) -> usize {
        self.len
    }

    /// Returns `true` if the buffer has no entries.
    ///
    /// Always `false` for buffers built by [`new`](Self::new), which allocates at least one
    /// entry; provided to pair with [`len`](Self::len).
    #[inline]
    pub fn is_empty(&self) -> bool {
        self.len == 0
    }

    /// Returns how many upcoming leaders are tracked after the current one (`len() - 1`).
    #[inline]
    pub fn leaders_ahead(&self) -> usize {
        self.len.saturating_sub(1)
    }

    /// Opens a write section by making the version counter odd.
    ///
    /// The release fence orders the increment before the entry stores that follow: a reader
    /// that observes any of those stores and then issues an acquire fence also observes the
    /// changed version and discards its snapshot.
    ///
    /// Writers must be serialized externally: two overlapping writers would bring the
    /// counter back to an even value while entries are still changing.
    #[inline]
    fn begin_write(&self) {
        self.version.fetch_add(1, Ordering::Relaxed);
        fence(Ordering::Release);
    }

    /// Closes a write section by making the version counter even again.
    ///
    /// `Release` makes the entry stores visible to readers that load this new version with
    /// `Acquire`.
    #[inline]
    fn end_write(&self) {
        self.version.fetch_add(1, Ordering::Release);
    }

    /// Returns the start slot of the entry at `index` when entry 0 starts at `base_slot`.
    #[inline]
    fn start_slot_for(base_slot: u64, index: usize) -> u64 {
        base_slot + (index as u64) * SLOTS_PER_LEADER
    }

    /// Stores `value` at `index` after rewriting its start slot relative to `base_slot`.
    ///
    /// Only call this between `begin_write` and `end_write`.
    #[inline]
    fn store_at(&self, index: usize, mut value: LeaderEntry, base_slot: u64) {
        value.set_start_slot(Self::start_slot_for(base_slot, index));
        self.entries[index].store(value);
    }

    /// Makes one lock-free attempt to snapshot the entry at `position`.
    ///
    /// Returns `None` in either of these cases:
    /// - `position` is out of range.
    /// - A write overlapped the read: the version was odd, or it changed before the read
    ///   finished.
    ///
    /// It never retries; use [`read`](Self::read) to spin until a consistent value is seen.
    #[inline]
    pub fn try_read(&self, position: usize) -> Option<LeaderEntry> {
        if position >= self.len {
            return None;
        }
        let version_before = self.version.load(Ordering::Acquire);
        if version_before & 1 != 0 {
            return None;
        }
        let entry = self.entries[position].load();
        // Pairs with the release fence in `begin_write`: if any word read above came from a
        // newer write, the version load below is guaranteed to see that write's increment.
        fence_acquire();
        let version_after = self.version.load(Ordering::Acquire);
        if version_before != version_after {
            return None;
        }
        Some(entry)
    }

    /// Reads the entry at `position`, spinning until a consistent snapshot is obtained.
    ///
    /// # Panics
    ///
    /// Panics if `position >= self.len()`.
    #[inline]
    pub fn read(&self, position: usize) -> LeaderEntry {
        // `try_read` returns `None` for every out-of-range position, so without this check
        // the retry loop below would spin forever.
        assert!(
            position < self.len,
            "LeaderBuffer::read: position {position} out of range (len {})",
            self.len
        );
        loop {
            if let Some(entry) = self.try_read(position) {
                return entry;
            }
            cpu_pause();
        }
    }

    /// Makes one attempt to copy a consistent snapshot of the first
    /// `min(output.len(), len())` entries into `output`.
    ///
    /// Returns the number of entries copied. Returns `None` if a write overlapped the copy;
    /// `output` may then have been partially overwritten.
    #[inline]
    pub fn try_copy_into(&self, output: &mut [LeaderEntry]) -> Option<usize> {
        let version_before = self.version.load(Ordering::Acquire);
        if version_before & 1 != 0 {
            return None;
        }
        let copy_len = output.len().min(self.len);
        for (dst, slot) in output.iter_mut().zip(self.entries.iter()) {
            *dst = slot.load();
        }
        fence_acquire();
        let version_after = self.version.load(Ordering::Acquire);
        if version_before != version_after {
            return None;
        }
        Some(copy_len)
    }

    /// Copies a consistent snapshot into `output`, spinning until one is obtained.
    ///
    /// Returns `min(output.len(), len())`.
    #[inline]
    pub fn copy_into(&self, output: &mut [LeaderEntry]) -> usize {
        loop {
            if let Some(count) = self.try_copy_into(output) {
                return count;
            }
            cpu_pause();
        }
    }

    /// Returns the slot most recently recorded by a write method or `set_current_slot`.
    #[inline]
    pub fn current_slot(&self) -> u64 {
        self.current_slot.load(Ordering::Acquire)
    }

    /// Records `slot` as the current slot without touching entries or the version counter.
    #[inline]
    pub fn set_current_slot(&self, slot: u64) {
        self.current_slot.store(slot, Ordering::Release);
    }

    /// Resets every entry to [`LeaderEntry::EMPTY`] and records `new_slot`.
    ///
    /// This happens in one protected write. Entry `i` gets the start slot
    /// `align_slot(new_slot) + i * SLOTS_PER_LEADER`.
    pub fn clear(&self, new_slot: u64) {
        let base_slot = align_slot(new_slot);
        self.begin_write();
        for index in 0..self.len {
            self.store_at(index, LeaderEntry::EMPTY, base_slot);
        }
        self.current_slot.store(new_slot, Ordering::Release);
        self.end_write();
    }

    /// Replaces all entries with `new_entries` and records `new_slot`, in one protected write.
    ///
    /// Entries beyond `len()` are ignored, and missing positions are filled with
    /// [`LeaderEntry::EMPTY`]. Each stored entry's start slot is rewritten to
    /// `align_slot(new_slot) + i * SLOTS_PER_LEADER`.
    pub fn update(&self, new_slot: u64, new_entries: &[LeaderEntry]) {
        let base_slot = align_slot(new_slot);
        self.begin_write();
        let replacements = new_entries
            .iter()
            .copied()
            .chain(std::iter::repeat(LeaderEntry::EMPTY))
            .take(self.len);
        for (index, value) in replacements.enumerate() {
            self.store_at(index, value, base_slot);
        }
        self.current_slot.store(new_slot, Ordering::Release);
        self.end_write();
    }

    /// Advances the schedule by `count` leaders and records `new_slot`.
    ///
    /// - `count == 0`: only the current slot is updated; entries and the version are untouched.
    /// - `count >= len()`: delegates to [`update`](Self::update), which places
    ///   `new_leaders` starting at position 0. NOTE(review): if callers pass `count` leaders
    ///   when `count > len()`, the leading ones would be expected to fall off instead;
    ///   confirm the intended contract.
    /// - Otherwise: entries `count..` move to the front, and the freed tail is filled from
    ///   `new_leaders` (extras ignored, then [`LeaderEntry::EMPTY`]). All start slots are
    ///   rewritten relative to `align_slot(new_slot)`.
    pub fn shift_multiple(&self, count: usize, new_leaders: &[LeaderEntry], new_slot: u64) {
        if count == 0 {
            self.set_current_slot(new_slot);
            return;
        }
        if count >= self.len {
            self.update(new_slot, new_leaders);
            return;
        }
        let base_slot = align_slot(new_slot);
        let start = self.len - count;
        self.begin_write();
        // Ascending order matters: position `index` reads `index + count`, which has not
        // been overwritten yet.
        for index in 0..start {
            let kept = self.entries[index + count].load();
            self.store_at(index, kept, base_slot);
        }
        let appended = new_leaders
            .iter()
            .copied()
            .chain(std::iter::repeat(LeaderEntry::EMPTY))
            .take(count);
        for (offset, value) in appended.enumerate() {
            self.store_at(start + offset, value, base_slot);
        }
        self.current_slot.store(new_slot, Ordering::Release);
        self.end_write();
    }
}
impl Default for LeaderBuffer {
fn default() -> Self {
Self::new(DEFAULT_LEADERS_AHEAD)
}
}
/// Positional lookup of leaders; implementors must be `Send + Sync` so lookups can be
/// shared across threads.
pub trait LeaderLookup: Send + Sync {
/// Returns the leader's pair of socket addresses at `index`, or `None` if unavailable.
/// (`LeaderBuffer` returns `(tpu_quic, tpu_quic_fwd)`.)
fn get_leader(&self, index: usize) -> Option<(SocketAddr, SocketAddr)>;
/// Returns whether a valid leader is available at `index`.
fn is_valid(&self, index: usize) -> bool;
/// Returns a `(start, end)` slot pair for the leader at `index`, or `None` if unavailable.
/// (`LeaderBuffer` returns `(start_slot, end_slot)` of the entry.)
fn get_leader_slots(&self, index: usize) -> Option<(u64, u64)>;
}
/// Lets an `Arc`-shared lookup be used wherever a `LeaderLookup` is expected.
///
/// Every method forwards to the inner `T`; `T::method(self, ..)` relies on deref coercion
/// from `&Arc<T>` to `&T`.
impl<T: LeaderLookup> LeaderLookup for std::sync::Arc<T> {
    #[inline]
    fn get_leader(&self, index: usize) -> Option<(SocketAddr, SocketAddr)> {
        T::get_leader(self, index)
    }

    #[inline]
    fn is_valid(&self, index: usize) -> bool {
        T::is_valid(self, index)
    }

    #[inline]
    fn get_leader_slots(&self, index: usize) -> Option<(u64, u64)> {
        T::get_leader_slots(self, index)
    }
}
/// Lookup backed by single, non-retrying snapshots (`try_read`).
///
/// Because `try_read` gives up when a concurrent write is detected, each method may report
/// "no leader" transiently while the buffer is being updated.
impl LeaderLookup for LeaderBuffer {
    /// Returns `(tpu_quic, tpu_quic_fwd)` of the entry at `index`.
    ///
    /// Returns `None` if the index is out of range, the entry is not valid, or a write
    /// overlapped the read.
    #[inline]
    fn get_leader(&self, index: usize) -> Option<(SocketAddr, SocketAddr)> {
        self.try_read(index)
            .filter(|candidate| candidate.is_valid())
            .map(|leader| (leader.tpu_quic(), leader.tpu_quic_fwd()))
    }

    /// Returns `true` only when a consistent snapshot was taken and the entry is valid.
    #[inline]
    fn is_valid(&self, index: usize) -> bool {
        matches!(self.try_read(index), Some(snapshot) if snapshot.is_valid())
    }

    /// Returns the entry's `(start_slot, end_slot)`; validity is not checked.
    #[inline]
    fn get_leader_slots(&self, index: usize) -> Option<(u64, u64)> {
        self.try_read(index)
            .map(|snapshot| (snapshot.start_slot(), snapshot.end_slot()))
    }
}
#[inline]
fn align_slot(slot: u64) -> u64 {
(slot / SLOTS_PER_LEADER) * SLOTS_PER_LEADER
}
/// Encodes `entry` into eight words.
///
/// Layout:
/// - words 0..4: the 32 pubkey bytes, little-endian, 8 per word;
/// - word 4: TPU QUIC address;
/// - word 5: TPU QUIC forward address;
/// - word 6: start slot;
/// - word 7: unused (zero).
#[inline]
fn pack_entry(entry: LeaderEntry) -> [u64; 8] {
    let key_bytes = entry.pubkey.to_bytes();
    let (quic_ip, quic_port) = entry.tpu_quic_parts();
    let (fwd_ip, fwd_port) = entry.tpu_quic_fwd_parts();
    let mut packed = [0u64; 8];
    for (chunk, word) in packed[..4].iter_mut().enumerate() {
        *word = pack_pubkey_word(&key_bytes, chunk * 8);
    }
    packed[4] = pack_ip_port(quic_ip, quic_port);
    packed[5] = pack_ip_port(fwd_ip, fwd_port);
    packed[6] = entry.start_slot();
    packed
}
/// Decodes eight words produced by `pack_entry` back into an IPv4 `LeaderEntry`.
///
/// Word 7 is ignored.
#[inline]
fn unpack_entry(words: [u64; 8]) -> LeaderEntry {
    let mut key_bytes = [0u8; 32];
    for (chunk, &word) in words[..4].iter().enumerate() {
        unpack_pubkey_word(word, &mut key_bytes, chunk * 8);
    }
    let (quic_ip, quic_port) = unpack_ip_port(words[4]);
    let (fwd_ip, fwd_port) = unpack_ip_port(words[5]);
    let mut decoded = LeaderEntry::new_ipv4(
        LeaderPubkey::new(key_bytes),
        quic_ip,
        quic_port,
        fwd_ip,
        fwd_port,
    );
    decoded.set_start_slot(words[6]);
    decoded
}
/// Reads `pubkey[offset..offset + 8]` as a little-endian `u64`.
///
/// Panics if the range falls outside the 32-byte key.
#[inline]
fn pack_pubkey_word(pubkey: &[u8; 32], offset: usize) -> u64 {
    let window = &pubkey[offset..offset + 8];
    let chunk: [u8; 8] = std::array::from_fn(|i| window[i]);
    u64::from_le_bytes(chunk)
}
/// Writes `word` as 8 little-endian bytes into `pubkey[offset..offset + 8]`.
///
/// Panics if the range falls outside the 32-byte key.
#[inline]
fn unpack_pubkey_word(word: u64, pubkey: &mut [u8; 32], offset: usize) {
    let target = &mut pubkey[offset..offset + 8];
    for (dst, byte) in target.iter_mut().zip(word.to_le_bytes()) {
        *dst = byte;
    }
}
/// Packs an IPv4 address and port into one word.
///
/// Layout (little-endian byte order): bytes 0..4 hold the address octets, bytes 4..6 hold
/// the port, and bytes 6..8 are zero.
#[inline]
fn pack_ip_port(ip: [u8; 4], port: u16) -> u64 {
    let address_bits = u64::from(u32::from_le_bytes(ip));
    let port_bits = u64::from(port) << 32;
    address_bits | port_bits
}
/// Inverse of `pack_ip_port`: extracts the address octets and port.
///
/// Bytes 6..8 are ignored.
#[inline]
fn unpack_ip_port(word: u64) -> ([u8; 4], u16) {
    // Truncating casts deliberately select the low 32 bits (address) and bits 32..48 (port).
    let octets = (word as u32).to_le_bytes();
    let port = (word >> 32) as u16;
    (octets, port)
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a distinguishable IPv4 entry whose pubkey bytes and last octet are `id`.
    fn make_entry(id: u8) -> LeaderEntry {
        LeaderEntry::new_ipv4(
            LeaderPubkey::new([id; 32]),
            [127, 0, 0, id],
            8000,
            [127, 0, 0, id],
            8001,
        )
    }

    #[test]
    fn buffer_respects_runtime_len() {
        let buffer = LeaderBuffer::new(3);
        assert_eq!(buffer.len(), 4);
        assert_eq!(buffer.leaders_ahead(), 3);
    }

    #[test]
    fn zero_leaders_ahead_still_holds_current_leader() {
        let buffer = LeaderBuffer::new(0);
        assert_eq!(buffer.len(), 1);
        assert_eq!(buffer.leaders_ahead(), 0);
        assert!(!buffer.is_empty());
    }

    #[test]
    fn try_read_out_of_range_is_none() {
        let buffer = LeaderBuffer::new(2);
        assert!(buffer.try_read(3).is_none());
        assert!(buffer.try_read(usize::MAX).is_none());
    }

    #[test]
    fn packed_entry_round_trips() {
        let mut entry = make_entry(9);
        entry.set_start_slot(104);
        let packed = pack_entry(entry);
        let unpacked = unpack_entry(packed);
        assert_eq!(unpacked.pubkey, entry.pubkey);
        assert_eq!(unpacked.tpu_quic(), entry.tpu_quic());
        assert_eq!(unpacked.tpu_quic_fwd(), entry.tpu_quic_fwd());
        assert_eq!(unpacked.start_slot(), entry.start_slot());
    }

    #[test]
    fn update_and_copy() {
        let buffer = LeaderBuffer::new(2);
        let entries = [make_entry(1), make_entry(2), make_entry(3)];
        buffer.update(100, &entries);
        let mut output = [LeaderEntry::EMPTY; 3];
        let copied = buffer.copy_into(&mut output);
        assert_eq!(copied, 3);
        assert_eq!(output[0].pubkey.to_bytes()[0], 1);
        assert_eq!(output[1].pubkey.to_bytes()[0], 2);
        assert_eq!(output[2].pubkey.to_bytes()[0], 3);
        assert_eq!(output[0].start_slot(), 100);
        assert_eq!(output[1].start_slot(), 104);
        assert_eq!(output[2].start_slot(), 108);
    }

    #[test]
    fn update_aligns_unaligned_slot() {
        let buffer = LeaderBuffer::new(1);
        buffer.update(103, &[make_entry(1)]);
        assert_eq!(buffer.current_slot(), 103);
        assert_eq!(buffer.read(0).start_slot(), 100);
        assert_eq!(buffer.read(1).start_slot(), 104);
        assert!(!buffer.read(1).is_valid());
    }

    #[test]
    fn copy_into_respects_output_length() {
        let buffer = LeaderBuffer::new(3);
        buffer.update(100, &[make_entry(1), make_entry(2)]);

        let mut short = [LeaderEntry::EMPTY; 2];
        assert_eq!(buffer.copy_into(&mut short), 2);
        assert_eq!(short[0].pubkey.to_bytes()[0], 1);
        assert_eq!(short[1].start_slot(), 104);

        let mut wide = [LeaderEntry::EMPTY; 6];
        assert_eq!(buffer.copy_into(&mut wide), 4);
        assert_eq!(wide[3].start_slot(), 112);
    }

    #[test]
    fn shift_multiple_updates_slots() {
        let buffer = LeaderBuffer::new(4);
        let entries = [
            make_entry(1),
            make_entry(2),
            make_entry(3),
            make_entry(4),
            make_entry(5),
        ];
        buffer.update(100, &entries);
        let next = [make_entry(6), make_entry(7)];
        buffer.shift_multiple(2, &next, 108);
        assert_eq!(buffer.read(0).pubkey.to_bytes()[0], 3);
        assert_eq!(buffer.read(3).pubkey.to_bytes()[0], 6);
        assert_eq!(buffer.read(4).pubkey.to_bytes()[0], 7);
        assert_eq!(buffer.read(0).start_slot(), 108);
        assert_eq!(buffer.read(4).start_slot(), 124);
    }

    #[test]
    fn shift_by_zero_only_moves_current_slot() {
        let buffer = LeaderBuffer::new(1);
        buffer.update(100, &[make_entry(1), make_entry(2)]);
        buffer.shift_multiple(0, &[make_entry(9)], 102);
        assert_eq!(buffer.current_slot(), 102);
        assert_eq!(buffer.read(0).pubkey.to_bytes()[0], 1);
        assert_eq!(buffer.read(0).start_slot(), 100);
    }

    #[test]
    fn shift_past_len_replaces_everything() {
        let buffer = LeaderBuffer::new(1);
        buffer.update(100, &[make_entry(1), make_entry(2)]);
        buffer.shift_multiple(5, &[make_entry(7)], 120);
        assert_eq!(buffer.current_slot(), 120);
        assert_eq!(buffer.read(0).pubkey.to_bytes()[0], 7);
        assert_eq!(buffer.read(0).start_slot(), 120);
        assert!(!buffer.read(1).is_valid());
        assert_eq!(buffer.read(1).start_slot(), 124);
    }

    #[test]
    fn clear_sets_empty_entries() {
        let buffer = LeaderBuffer::new(1);
        buffer.clear(200);
        assert!(!buffer.read(0).is_valid());
        assert_eq!(buffer.read(0).start_slot(), 200);
        assert_eq!(buffer.read(1).start_slot(), 204);
    }
}