use std::{
collections::{HashMap, HashSet},
time::Instant,
};
use crate::{
config::ValorRuntimeTuning, service::ValorServiceId, types::ValorID,
worker::ValorWorkerCapacity,
};
/// Missed-heartbeat threshold used until `ValorWorkerRegistry::apply_tuning`
/// supplies a different value.
const DEFAULT_MISSED_BEFORE_UNREACHABLE: u32 = 2;

/// Missed-heartbeat-interval count at which the stale-worker sweep marks a
/// worker `Unreachable`.
///
/// This is a process-wide static: `ValorWorkerRegistry::apply_tuning` stores
/// into it, so the value is shared by every registry in the process.
static MISSED_BEFORE_UNREACHABLE: std::sync::atomic::AtomicU32 =
    std::sync::atomic::AtomicU32::new(DEFAULT_MISSED_BEFORE_UNREACHABLE);
/// Lifecycle state of a worker tracked by the worker registry.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum ValorWorkerState {
    /// Registered (or re-joined) but not yet confirmed live. This is the
    /// initial state of every freshly constructed worker record.
    Joining,
    /// Confirmed live. The registry sets it in response to heartbeats,
    /// capacity reports, or service reports (see the registry methods for
    /// the exact transitions).
    Ready,
    /// Flagged by the registry's stale-worker sweep: idle for longer than the
    /// worker's unreachable timeout, or at or over the missed-heartbeat
    /// threshold.
    Unreachable,
}
/// Everything the registry knows about a single worker.
#[derive(Clone, Debug)]
pub struct ValorWorkerRecord {
    /// Most recently reported capacity. It stays at
    /// `ValorWorkerCapacity::default()` until a report arrives.
    pub capacity: ValorWorkerCapacity,
    /// Registry-local time of the latest join, heartbeat, capacity update, or
    /// service update. The stale-worker sweep measures idleness from here.
    pub last_seen: Instant,
    /// Services the worker last advertised. Cleared when the worker re-joins.
    pub services: Vec<ValorServiceId>,
    /// Current lifecycle state.
    pub state: ValorWorkerState,
    /// Sender-supplied timestamp of the latest heartbeat. The registry writes
    /// the same value into `last_ts_mono_ms`.
    pub last_heartbeat_ms: Option<u64>,
    /// Sequence number carried by the latest heartbeat.
    pub last_seq_no: Option<u64>,
    /// Sender-supplied monotonic timestamp of the latest heartbeat.
    pub last_ts_mono_ms: Option<u64>,
    /// Running total of heartbeat sequence numbers skipped between consecutive
    /// heartbeats. The sweep compares it with the missed-heartbeat threshold.
    pub missed_heartbeat_intervals: u32,
    /// Idle time, in whole seconds, beyond which the sweep marks the worker
    /// `Unreachable`.
    pub unreachable_after_secs: u64,
    /// Idle time, in whole seconds, beyond which the sweep removes the record.
    pub eviction_after_secs: u64,
}
impl ValorWorkerRecord {
    /// Creates a `Joining` record that was first seen at `now`.
    ///
    /// The new record has:
    /// - default capacity;
    /// - no advertised services and no heartbeat data;
    /// - both timeouts at 0.
    ///
    /// `ValorWorkerRegistry` overwrites the timeouts right after construction.
    pub fn new(now: Instant) -> Self {
        let capacity = ValorWorkerCapacity::default();
        let state = ValorWorkerState::Joining;
        let last_seen = now;
        Self {
            capacity,
            last_seen,
            services: Vec::default(),
            state,
            last_heartbeat_ms: None,
            last_seq_no: None,
            last_ts_mono_ms: None,
            missed_heartbeat_intervals: 0,
            unreachable_after_secs: 0,
            eviction_after_secs: 0,
        }
    }
}
/// Registry of known workers, keyed by worker id.
///
/// Note: the derived `Default` leaves both default timeouts at 0. Records
/// created by such a registry are evicted by the stale-worker sweep once a
/// full second has passed since they were last seen. Use `new` or
/// `apply_tuning` to set real values.
#[derive(Default, Debug)]
pub struct ValorWorkerRegistry {
    /// Tracked workers and their current records.
    workers: HashMap<ValorID, ValorWorkerRecord>,
    /// Unreachable timeout (seconds) stamped onto newly created or re-joined
    /// records.
    default_unreachable_after_secs: u64,
    /// Eviction timeout (seconds) stamped onto newly created or re-joined
    /// records.
    default_eviction_after_secs: u64,
}
impl ValorWorkerRegistry {
pub fn new(unreachable_after_secs: u64, eviction_after_secs: u64) -> Self {
Self {
workers: HashMap::new(),
default_unreachable_after_secs: unreachable_after_secs,
default_eviction_after_secs: eviction_after_secs,
}
}
pub fn apply_tuning(&mut self, tuning: &ValorRuntimeTuning) {
MISSED_BEFORE_UNREACHABLE.store(
tuning.missed_before_unreachable,
std::sync::atomic::Ordering::Relaxed,
);
self.default_unreachable_after_secs = tuning.unreachable_after_secs;
self.default_eviction_after_secs = tuning.eviction_after_secs;
}
pub fn on_join(&mut self, id: ValorID) {
let now = Instant::now();
self.workers
.entry(id)
.and_modify(|rec| {
rec.last_seen = now;
rec.state = ValorWorkerState::Joining;
rec.unreachable_after_secs = self.default_unreachable_after_secs;
rec.eviction_after_secs = self.default_eviction_after_secs;
rec.services.clear();
})
.or_insert_with(|| {
let mut rec = ValorWorkerRecord::new(now);
rec.unreachable_after_secs = self.default_unreachable_after_secs;
rec.eviction_after_secs = self.default_eviction_after_secs;
rec
});
}
pub fn on_leave(&mut self, id: &ValorID) {
self.workers.remove(id);
}
pub fn on_heartbeat(&mut self, id: &ValorID, ts_mono_ms: u64, seq_no: u64) -> (bool, u64) {
let now = Instant::now();
let entry = self.workers.entry(id.clone()).or_insert_with(|| {
let mut rec = ValorWorkerRecord::new(now);
rec.unreachable_after_secs = self.default_unreachable_after_secs;
rec.eviction_after_secs = self.default_eviction_after_secs;
rec
});
let prev_state = entry.state;
let gap = if let Some(prev_seq) = entry.last_seq_no {
if seq_no > prev_seq {
seq_no.saturating_sub(prev_seq).saturating_sub(1)
} else {
0
}
} else {
0
};
entry.missed_heartbeat_intervals = entry
.missed_heartbeat_intervals
.saturating_add(u32::try_from(gap).unwrap_or(u32::MAX));
entry.last_seq_no = Some(seq_no);
entry.last_ts_mono_ms = Some(ts_mono_ms);
entry.last_seen = now;
let became_ready_now = prev_state != ValorWorkerState::Ready;
entry.state = ValorWorkerState::Ready;
entry.last_heartbeat_ms = Some(ts_mono_ms);
(became_ready_now, gap)
}
pub fn update_capacity(
&mut self,
id: &ValorID,
capacity: ValorWorkerCapacity,
) -> Option<ValorWorkerCapacity> {
let now = Instant::now();
let entry = self
.workers
.entry(id.clone())
.and_modify(|rec| {
rec.last_seen = now;
rec.state = ValorWorkerState::Ready;
})
.or_insert_with(|| {
let mut rec = ValorWorkerRecord::new(now);
rec.unreachable_after_secs = self.default_unreachable_after_secs;
rec.eviction_after_secs = self.default_eviction_after_secs;
rec
});
let prev = entry.capacity;
entry.capacity = capacity;
Some(prev)
}
pub fn update_services(&mut self, id: &ValorID, services: Vec<ValorServiceId>) {
let now = Instant::now();
self.workers
.entry(id.clone())
.and_modify(|rec| {
rec.last_seen = now;
rec.services = services.clone();
if rec.state == ValorWorkerState::Joining {
rec.state = ValorWorkerState::Ready;
}
})
.or_insert_with(|| ValorWorkerRecord {
capacity: ValorWorkerCapacity::default(),
last_seen: now,
services,
state: ValorWorkerState::Ready,
last_heartbeat_ms: None,
last_seq_no: None,
last_ts_mono_ms: None,
missed_heartbeat_intervals: 0,
unreachable_after_secs: self.default_unreachable_after_secs,
eviction_after_secs: self.default_eviction_after_secs,
});
}
pub fn cleanup_stale_detailed(&mut self) -> (usize, usize, Vec<ValorID>) {
let now = Instant::now();
let before = self.workers.len();
let to_remove: Vec<ValorID> = self
.workers
.iter()
.filter_map(|(id, rec)| {
let elapsed = now.duration_since(rec.last_seen).as_secs();
if elapsed > rec.eviction_after_secs {
Some(id.clone())
} else {
None
}
})
.collect();
for id in &to_remove {
self.workers.remove(id);
}
let threshold = MISSED_BEFORE_UNREACHABLE.load(std::sync::atomic::Ordering::Relaxed);
for rec in self.workers.values_mut() {
let elapsed = now.duration_since(rec.last_seen).as_secs();
if elapsed > rec.unreachable_after_secs || rec.missed_heartbeat_intervals >= threshold {
rec.state = ValorWorkerState::Unreachable;
}
}
let after = self.workers.len();
(before, after, to_remove)
}
pub fn len(&self) -> usize {
self.workers.len()
}
pub fn get_capacity(&self, id: &ValorID) -> Option<ValorWorkerCapacity> {
self.workers.get(id).map(|r| r.capacity)
}
pub fn get_state(&self, id: &ValorID) -> Option<ValorWorkerState> {
self.workers.get(id).map(|r| r.state)
}
pub fn get_missed_heartbeat_intervals(&self, id: &ValorID) -> Option<u32> {
self.workers.get(id).map(|r| r.missed_heartbeat_intervals)
}
pub fn state_counts(&self) -> (usize, usize, usize, usize) {
let mut joining = 0usize;
let mut ready = 0usize;
let mut unreachable = 0usize;
for rec in self.workers.values() {
match rec.state {
ValorWorkerState::Joining => joining += 1,
ValorWorkerState::Ready => ready += 1,
ValorWorkerState::Unreachable => unreachable += 1,
}
}
let total = self.workers.len();
(joining, ready, unreachable, total)
}
pub fn iter(&self) -> impl Iterator<Item = (&ValorID, &ValorWorkerRecord)> {
self.workers.iter()
}
pub fn union_services(&self) -> HashSet<ValorServiceId> {
let mut set: HashSet<ValorServiceId> = HashSet::new();
for rec in self.workers.values() {
if rec.state == ValorWorkerState::Ready {
for id in &rec.services {
set.insert(id.clone());
}
}
}
set
}
}