use crate::rpc::{FetcherError, ScheduleFetcher};
use crate::slot::{SlotCursor, SlotSourceConfig, SlotSubscriber, SlotSubscriberError};
use crate::{DEFAULT_LEADERS_AHEAD, EngineEvent, LeaderBuffer, LeaderEngine, ScheduleSnapshot};
use cpu::{cpu_pause, set_current_thread_affinity};
use std::fmt;
use std::ptr;
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, AtomicPtr, Ordering};
use std::thread;
use std::time::{Duration, Instant};
/// Configuration for a [`ManagedTracker`].
///
/// Build one with [`ManagedTrackerConfig::localnet`] or [`ManagedTrackerConfig::custom`] and
/// adjust it with the `with_*` builder methods.
#[derive(Debug, Clone)]
pub struct ManagedTrackerConfig {
    /// RPC endpoint handed to the schedule fetcher.
    pub rpc_url: String,
    /// Where slot updates come from; handed to the slot subscriber.
    pub slot_source: SlotSourceConfig,
    /// How long the refresh thread waits between schedule fetches when no refresh is requested.
    pub schedule_refresh_interval: Duration,
    /// Time allowed during `start` for the slot source to connect and, separately, for the
    /// first slot to arrive. Also passed to the slot subscriber.
    pub connect_timeout: Duration,
    /// Passed to the slot subscriber; presumably the delay between reconnect attempts — confirm.
    pub reconnect_backoff: Duration,
    /// Passed to the schedule fetcher; presumably a per-request RPC timeout — confirm.
    pub rpc_timeout: Duration,
    /// Size argument for the shared `LeaderBuffer`; presumably how many upcoming leaders are
    /// kept — confirm against `LeaderBuffer::new`.
    pub leaders_ahead: usize,
    /// CPU core to pin the tracker thread to, if any (pinning failures are ignored).
    pub tracker_cpu_core: Option<usize>,
    /// CPU core to pin the schedule-refresh thread to, if any (pinning failures are ignored).
    pub refresh_cpu_core: Option<usize>,
    /// Passed to the slot subscriber; presumably the core for its worker thread — confirm.
    pub slot_source_cpu_core: Option<usize>,
}
impl ManagedTrackerConfig {
    /// Configuration for a local validator.
    ///
    /// RPC is at `127.0.0.1:8899` and the slot WebSocket at `127.0.0.1:8900`. The schedule is
    /// refreshed every 10 minutes, no threads are pinned, and the remaining values are the
    /// defaults used by [`ManagedTrackerConfig::custom`].
    pub fn localnet() -> Self {
        let slot_source = SlotSourceConfig::WebSocket {
            url: String::from("ws://127.0.0.1:8900"),
        };
        Self {
            rpc_url: String::from("http://127.0.0.1:8899"),
            slot_source,
            schedule_refresh_interval: Duration::from_secs(10 * 60),
            connect_timeout: Duration::from_secs(10),
            reconnect_backoff: Duration::from_millis(250),
            rpc_timeout: Duration::from_secs(5),
            leaders_ahead: DEFAULT_LEADERS_AHEAD,
            tracker_cpu_core: None,
            refresh_cpu_core: None,
            slot_source_cpu_core: None,
        }
    }

    /// Like [`ManagedTrackerConfig::localnet`], but with the given RPC endpoint and slot source.
    pub fn custom(rpc_url: impl Into<String>, slot_source: SlotSourceConfig) -> Self {
        Self::localnet()
            .with_rpc_url(rpc_url)
            .with_slot_source(slot_source)
    }

    /// Replaces the RPC endpoint.
    #[inline]
    pub fn with_rpc_url(self, rpc_url: impl Into<String>) -> Self {
        Self {
            rpc_url: rpc_url.into(),
            ..self
        }
    }

    /// Replaces the slot source.
    #[inline]
    pub fn with_slot_source(self, slot_source: SlotSourceConfig) -> Self {
        Self {
            slot_source,
            ..self
        }
    }

    /// Sets the `leaders_ahead` value used to size the leader buffer.
    #[inline]
    pub fn with_leaders_ahead(self, leaders_ahead: usize) -> Self {
        Self {
            leaders_ahead,
            ..self
        }
    }

    /// Sets the periodic schedule-refresh interval.
    #[inline]
    pub fn with_refresh_interval(self, interval: Duration) -> Self {
        Self {
            schedule_refresh_interval: interval,
            ..self
        }
    }

    /// Pins the tracker thread to `core`.
    #[inline]
    pub fn with_tracker_cpu_core(self, core: usize) -> Self {
        Self {
            tracker_cpu_core: Some(core),
            ..self
        }
    }

    /// Pins the schedule-refresh thread to `core`.
    #[inline]
    pub fn with_refresh_cpu_core(self, core: usize) -> Self {
        Self {
            refresh_cpu_core: Some(core),
            ..self
        }
    }

    /// Sets the core passed to the slot subscriber.
    #[inline]
    pub fn with_slot_source_cpu_core(self, core: usize) -> Self {
        Self {
            slot_source_cpu_core: Some(core),
            ..self
        }
    }

    /// Sets the slot-source connect / first-slot timeout.
    #[inline]
    pub fn with_connect_timeout(self, timeout: Duration) -> Self {
        Self {
            connect_timeout: timeout,
            ..self
        }
    }

    /// Sets the back-off passed to the slot subscriber.
    #[inline]
    pub fn with_reconnect_backoff(self, backoff: Duration) -> Self {
        Self {
            reconnect_backoff: backoff,
            ..self
        }
    }

    /// Sets the timeout passed to the schedule fetcher.
    #[inline]
    pub fn with_rpc_timeout(self, timeout: Duration) -> Self {
        Self {
            rpc_timeout: timeout,
            ..self
        }
    }
}
impl Default for ManagedTrackerConfig {
fn default() -> Self {
Self::localnet()
}
}
/// Short alias for [`ManagedTrackerConfig`].
pub type Config = ManagedTrackerConfig;
/// Tracks upcoming slot leaders using a slot subscriber and two background threads.
///
/// [`ManagedTracker::start`] connects the slot source and spawns a schedule-refresh thread
/// (`leader-refresh`) and a busy-polling tracker thread (`leader-tracker`). The
/// [`LeaderBuffer`] returned by [`ManagedTracker::buffer`] is handed to the engine running on
/// the tracker thread. Dropping the tracker stops it.
pub struct ManagedTracker {
    config: ManagedTrackerConfig,
    // Shared with the engine on the tracker thread; cleared (never replaced) between runs.
    buffer: Arc<LeaderBuffer>,
    // True while started; both worker threads exit their loops once it is cleared.
    running: Arc<AtomicBool>,
    // Hands freshly fetched schedules from the refresh thread to the tracker thread.
    pending_schedule: Arc<ScheduleMailbox>,
    // Raised to ask the refresh thread for an immediate schedule fetch.
    refresh_requested: Arc<AtomicBool>,
    // Slot source; `Some` only between a (partially) successful `start` and the next shutdown.
    subscriber: Option<SlotSubscriber>,
    // Handle used to unpark the refresh thread (on refresh requests and on shutdown).
    refresh_waker: Option<thread::Thread>,
    tracker_thread: Option<thread::JoinHandle<()>>,
    refresh_thread: Option<thread::JoinHandle<()>>,
}
/// Alternate name for [`ManagedTracker`].
pub type LeaderTracker = ManagedTracker;
impl ManagedTracker {
    /// Creates a stopped tracker for `config`.
    ///
    /// The leader buffer is allocated here (with `config.leaders_ahead`). The same buffer is
    /// reused — cleared, not replaced — across `start`/`stop` cycles.
    pub fn new(config: ManagedTrackerConfig) -> Self {
        Self {
            buffer: Arc::new(LeaderBuffer::new(config.leaders_ahead)),
            config,
            running: Arc::new(AtomicBool::new(false)),
            pending_schedule: Arc::new(ScheduleMailbox::new()),
            refresh_requested: Arc::new(AtomicBool::new(false)),
            subscriber: None,
            refresh_waker: None,
            tracker_thread: None,
            refresh_thread: None,
        }
    }

    /// Returns a new handle to the shared leader buffer.
    #[inline]
    pub fn buffer(&self) -> Arc<LeaderBuffer> {
        Arc::clone(&self.buffer)
    }

    /// Starts the tracker.
    ///
    /// This fetches the current schedule, starts the slot subscriber, waits for its connection
    /// and first slot, seeds the engine, and spawns the refresh and tracker threads.
    ///
    /// # Errors
    /// - [`TrackerError::AlreadyRunning`] if the tracker is already running.
    /// - [`TrackerError::ConnectionTimeout`] / [`TrackerError::NoSlotReceived`] if the slot
    ///   source does not connect, or does not deliver a slot, within `connect_timeout`.
    /// - [`TrackerError::ThreadSpawn`] if a worker thread cannot be spawned.
    /// - Fetcher / slot-source errors from the underlying components, converted via `From`.
    ///
    /// On any error everything started so far is shut down again and the tracker is left
    /// stopped, so `start` may be retried.
    pub fn start(&mut self) -> Result<(), TrackerError> {
        if self.running.swap(true, Ordering::AcqRel) {
            return Err(TrackerError::AlreadyRunning);
        }
        self.reset_runtime_state();
        let result = self.start_components();
        if result.is_err() {
            self.running.store(false, Ordering::Release);
            self.shutdown_started_components();
        }
        result
    }

    /// Performs the fallible part of [`ManagedTracker::start`]; the caller cleans up on error.
    ///
    /// Every component is stored in `self` as soon as it is running, so
    /// `shutdown_started_components` can tear it down whichever step fails afterwards.
    fn start_components(&mut self) -> Result<(), TrackerError> {
        let fetcher = Arc::new(ScheduleFetcher::new(
            &self.config.rpc_url,
            self.config.rpc_timeout,
        )?);
        let mut schedule = fetcher.fetch_current()?;
        let mut subscriber = SlotSubscriber::new(
            self.config.slot_source.clone(),
            self.config.slot_source_cpu_core,
            self.config.connect_timeout,
            self.config.reconnect_backoff,
        )?;
        subscriber.start()?;
        // Hand the running subscriber to `self` right away. Previously it stayed a local until
        // all checks passed, so a failing re-fetch below returned without stopping it.
        let subscriber = self.subscriber.insert(subscriber);
        if !subscriber.wait_for_connection(self.config.connect_timeout) {
            return Err(TrackerError::ConnectionTimeout);
        }
        let initial_slot = wait_for_first_slot(subscriber, self.config.connect_timeout)
            .ok_or(TrackerError::NoSlotReceived)?;
        let slot_cursor = subscriber.cursor();
        // The schedule fetched before connecting may not cover the first observed slot.
        if !schedule.covers_slot(initial_slot) {
            schedule = fetcher.fetch_current()?;
        }
        let mut engine = LeaderEngine::from_buffer(Arc::clone(&self.buffer), schedule);
        if matches!(
            engine.seed(initial_slot),
            EngineEvent::ScheduleRefreshSuggested { .. } | EngineEvent::NeedScheduleRefresh { .. }
        ) {
            // Seen by the refresh thread on its first iteration, before it ever parks.
            self.refresh_requested.store(true, Ordering::Release);
        }
        let refresh_thread = spawn_refresh_thread(
            fetcher,
            Arc::clone(&self.pending_schedule),
            Arc::clone(&self.refresh_requested),
            Arc::clone(&self.running),
            self.config.schedule_refresh_interval,
            self.config.refresh_cpu_core,
        )?;
        let refresh_waker = refresh_thread.thread().clone();
        self.refresh_waker = Some(refresh_waker.clone());
        self.refresh_thread = Some(refresh_thread);
        let tracker_thread = spawn_tracker_thread(
            engine,
            slot_cursor,
            Arc::clone(&self.pending_schedule),
            Arc::clone(&self.refresh_requested),
            Arc::clone(&self.running),
            Some(refresh_waker),
            self.config.tracker_cpu_core,
        )?;
        self.tracker_thread = Some(tracker_thread);
        Ok(())
    }

    /// Stops the tracker and joins its threads; a no-op if it is not running.
    ///
    /// This may block while the refresh thread finishes an in-flight schedule fetch.
    pub fn stop(&mut self) {
        if !self.running.swap(false, Ordering::AcqRel) {
            return;
        }
        self.shutdown_started_components();
    }

    /// Clears all per-run state: buffer contents, the pending refresh request, any
    /// undelivered schedule, and the component handles.
    ///
    /// Thread handles are dropped without joining, so this must only run when no worker
    /// threads are live.
    fn reset_runtime_state(&mut self) {
        self.buffer.clear(0);
        self.refresh_requested.store(false, Ordering::Release);
        let _ = self.pending_schedule.take();
        self.refresh_waker = None;
        self.subscriber = None;
        self.tracker_thread = None;
        self.refresh_thread = None;
    }

    /// Tears down whatever `start` managed to bring up; expects `running` to be cleared already.
    fn shutdown_started_components(&mut self) {
        // Wake the refresh thread so it observes `running == false` instead of sleeping out
        // its interval.
        if let Some(waker) = &self.refresh_waker {
            waker.unpark();
        }
        if let Some(subscriber) = self.subscriber.as_mut() {
            subscriber.stop();
        }
        // Best effort: a panicked worker has nothing left to clean up, so join errors are
        // ignored.
        if let Some(thread) = self.tracker_thread.take() {
            let _ = thread.join();
        }
        if let Some(thread) = self.refresh_thread.take() {
            let _ = thread.join();
        }
        self.reset_runtime_state();
    }
}
impl Drop for LeaderTracker {
fn drop(&mut self) {
self.stop();
}
}
/// Spawns the `leader-refresh` thread, which fetches the leader schedule and publishes it to
/// `pending_schedule` for the tracker thread.
///
/// A fetch happens when `refresh_requested` is raised (the flag is consumed), or when
/// `interval` has elapsed since the last successful fetch. While idle the thread parks, so
/// [`request_refresh`] or shutdown can wake it early. A failed fetch is remembered and retried
/// after `FETCH_RETRY_BACKOFF`. The loop exits once `running` is cleared.
///
/// # Errors
/// [`TrackerError::ThreadSpawn`] if the OS thread cannot be created.
fn spawn_refresh_thread(
    fetcher: Arc<ScheduleFetcher>,
    pending_schedule: Arc<ScheduleMailbox>,
    refresh_requested: Arc<AtomicBool>,
    running: Arc<AtomicBool>,
    interval: Duration,
    cpu_core: Option<usize>,
) -> Result<thread::JoinHandle<()>, TrackerError> {
    // Minimum spacing between consecutive failed fetch attempts.
    const FETCH_RETRY_BACKOFF: Duration = Duration::from_secs(1);

    thread::Builder::new()
        .name("leader-refresh".to_string())
        .spawn(move || {
            // Best effort: pinning failures are ignored and the thread runs unpinned.
            if let Some(core) = cpu_core {
                let _ = set_current_thread_affinity([core]);
            }
            let mut last_refresh = Instant::now();
            // Set after a failed fetch: the refresh is still owed and is retried after the
            // back-off, even though the request flag has already been consumed.
            let mut retry_pending = false;
            while running.load(Ordering::Acquire) {
                if !retry_pending {
                    let wait_time = interval.saturating_sub(last_refresh.elapsed());
                    if !refresh_requested.load(Ordering::Acquire) && wait_time > Duration::ZERO {
                        thread::park_timeout(wait_time);
                    }
                }
                if !running.load(Ordering::Acquire) {
                    break;
                }
                let requested = refresh_requested.swap(false, Ordering::AcqRel);
                if !retry_pending && !requested && last_refresh.elapsed() < interval {
                    // Spurious or early wake-up with nothing due.
                    continue;
                }
                match fetcher.fetch_current() {
                    Ok(schedule) => {
                        pending_schedule.store(schedule);
                        last_refresh = Instant::now();
                        retry_pending = false;
                    }
                    Err(_) => {
                        // Without this back-off, an elapsed interval (wait_time == 0) or a
                        // continuously re-raised request made failures retry in a tight loop
                        // against the RPC endpoint. Sleep instead of parking, so the tracker's
                        // frequent unparks cannot turn the back-off into a wake/park spin.
                        retry_pending = true;
                        sleep_while_running(&running, FETCH_RETRY_BACKOFF);
                    }
                }
            }
        })
        .map_err(TrackerError::ThreadSpawn)
}

/// Sleeps for up to `duration`, in short slices, returning early once `running` is cleared.
///
/// This keeps shutdown latency bounded by one slice.
fn sleep_while_running(running: &AtomicBool, duration: Duration) {
    const SLICE: Duration = Duration::from_millis(50);
    let started = Instant::now();
    while running.load(Ordering::Acquire) {
        let remaining = duration.saturating_sub(started.elapsed());
        if remaining.is_zero() {
            break;
        }
        thread::sleep(remaining.min(SLICE));
    }
}
/// Spawns the `leader-tracker` thread, a busy-polling loop.
///
/// Each iteration applies any schedule published by the refresh thread, then feeds the
/// current slot from `slot_cursor` to `engine`. When the engine reports
/// `ScheduleRefreshSuggested` or `NeedScheduleRefresh`, it asks the refresh thread for a new
/// schedule. The loop runs until `running` is cleared.
///
/// # Errors
/// [`TrackerError::ThreadSpawn`] if the OS thread cannot be created.
fn spawn_tracker_thread(
    mut engine: LeaderEngine,
    slot_cursor: SlotCursor,
    pending_schedule: Arc<ScheduleMailbox>,
    refresh_requested: Arc<AtomicBool>,
    running: Arc<AtomicBool>,
    refresh_waker: Option<thread::Thread>,
    cpu_core: Option<usize>,
) -> Result<thread::JoinHandle<()>, TrackerError> {
    let worker = move || {
        // Best effort: pinning failures are ignored and the loop runs unpinned.
        if let Some(core) = cpu_core {
            let _ = set_current_thread_affinity([core]);
        }
        let waker = refresh_waker.as_ref();
        loop {
            if !running.load(Ordering::Acquire) {
                break;
            }
            // Swap in a freshly fetched schedule (if any) before looking at the slot.
            let schedule_wants_refresh = match pending_schedule.take() {
                Some(schedule) => matches!(
                    engine.replace_schedule(schedule),
                    EngineEvent::ScheduleRefreshSuggested { .. }
                        | EngineEvent::NeedScheduleRefresh { .. }
                ),
                None => false,
            };
            if schedule_wants_refresh {
                request_refresh(&refresh_requested, waker);
            }
            // Exhaustive on purpose: a new engine event should force a decision here.
            let slot_wants_refresh = match engine.on_slot(slot_cursor.load_slot()) {
                EngineEvent::ScheduleRefreshSuggested { .. }
                | EngineEvent::NeedScheduleRefresh { .. } => true,
                EngineEvent::NoChange | EngineEvent::SlotAdvanced { .. } => false,
            };
            if slot_wants_refresh {
                request_refresh(&refresh_requested, waker);
            }
            cpu_pause();
        }
    };
    thread::Builder::new()
        .name(String::from("leader-tracker"))
        .spawn(worker)
        .map_err(TrackerError::ThreadSpawn)
}
/// Raises the refresh flag, then wakes the refresh thread if a handle to it is known.
///
/// The flag is set before the unpark, so the woken thread can observe the request.
fn request_refresh(flag: &AtomicBool, refresh_waker: Option<&thread::Thread>) {
    flag.store(true, Ordering::Release);
    let Some(waker) = refresh_waker else {
        return;
    };
    waker.unpark();
}
/// Spins until `subscriber` reports a non-zero slot, or until `timeout` elapses.
///
/// Returns the first non-zero slot observed, or `None` on timeout. A slot value of `0` is
/// treated as "nothing received yet". The slot is checked at least once, so a zero `timeout`
/// still picks up an already-available slot. A `timeout` too large to form a deadline (e.g.
/// `Duration::MAX`) means "no deadline" instead of panicking on `Instant` overflow.
fn wait_for_first_slot(subscriber: &SlotSubscriber, timeout: Duration) -> Option<u64> {
    // `Instant + Duration` panics on overflow; `None` here means "wait without a deadline".
    let deadline = Instant::now().checked_add(timeout);
    loop {
        let slot = subscriber.load_slot();
        if slot > 0 {
            return Some(slot);
        }
        if deadline.is_some_and(|limit| Instant::now() >= limit) {
            return None;
        }
        cpu_pause();
    }
}
/// Single-slot, latest-wins handoff of a [`ScheduleSnapshot`] between threads.
///
/// `slot` is either null (empty) or a pointer obtained from `Box::into_raw` that the mailbox
/// owns. Every access uses an atomic `swap`, so each boxed snapshot has exactly one owner.
struct ScheduleMailbox {
    slot: AtomicPtr<ScheduleSnapshot>,
}
impl ScheduleMailbox {
    /// Creates an empty mailbox.
    fn new() -> Self {
        let slot = AtomicPtr::new(ptr::null_mut());
        Self { slot }
    }

    /// Publishes `schedule`, dropping any earlier snapshot that was never taken.
    fn store(&self, schedule: ScheduleSnapshot) {
        let fresh = Box::into_raw(Box::new(schedule));
        let stale = self.slot.swap(fresh, Ordering::AcqRel);
        if stale.is_null() {
            return;
        }
        // SAFETY: every non-null pointer placed in `slot` comes from `Box::into_raw` above, and
        // the atomic `swap` hands it to exactly one caller, so we uniquely own it here.
        let reclaimed = unsafe { Box::from_raw(stale) };
        drop(reclaimed);
    }

    /// Removes and returns the pending snapshot, leaving the mailbox empty.
    fn take(&self) -> Option<ScheduleSnapshot> {
        let raw = self.slot.swap(ptr::null_mut(), Ordering::AcqRel);
        if raw.is_null() {
            None
        } else {
            // SAFETY: non-null pointers in `slot` come from `Box::into_raw` in `store`, and the
            // atomic `swap` transfers sole ownership of this one to us.
            let owned = unsafe { Box::from_raw(raw) };
            Some(*owned)
        }
    }
}
impl Drop for ScheduleMailbox {
fn drop(&mut self) {
let ptr = self.slot.load(Ordering::Acquire);
if !ptr.is_null() {
unsafe {
drop(Box::from_raw(ptr));
}
}
}
}
/// Errors returned by [`ManagedTracker::start`].
#[derive(Debug)]
#[non_exhaustive]
pub enum TrackerError {
    /// `start` was called while the tracker was already running.
    AlreadyRunning,
    /// Error from the schedule fetcher (converted via `From<FetcherError>`).
    Fetcher(FetcherError),
    /// Error from the slot subscriber (converted via `From<SlotSubscriberError>`).
    SlotSource(SlotSubscriberError),
    /// Spawning the refresh or tracker thread failed.
    ThreadSpawn(std::io::Error),
    /// The slot source did not report a connection within `connect_timeout`.
    ConnectionTimeout,
    /// After connecting, no non-zero slot arrived within `connect_timeout`.
    NoSlotReceived,
}
impl fmt::Display for TrackerError {
    /// Wrapped errors are rendered with their own `Display`; the rest use fixed messages.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let message = match self {
            Self::Fetcher(inner) => return write!(f, "{}", inner),
            Self::SlotSource(inner) => return write!(f, "{}", inner),
            Self::ThreadSpawn(inner) => return write!(f, "{}", inner),
            Self::AlreadyRunning => "leader tracker is already running",
            Self::ConnectionTimeout => "slot source connection timed out",
            Self::NoSlotReceived => "no slot received before timeout",
        };
        f.write_str(message)
    }
}
/// Keeps the default `source()` (`None`): wrapped errors are rendered through `Display`
/// rather than exposed as a source.
impl std::error::Error for TrackerError {}
impl From<FetcherError> for TrackerError {
fn from(error: FetcherError) -> Self {
Self::Fetcher(error)
}
}
impl From<SlotSubscriberError> for TrackerError {
fn from(error: SlotSubscriberError) -> Self {
Self::SlotSource(error)
}
}