use std::collections::hash_map;
use std::sync::Arc;
use std::sync::atomic::{AtomicU32, Ordering};
use tycho_types::models::ShardStateUnsplit;
use tycho_util::FastHashMap;
/// Name of the gauge that is set to the tracker's current minimum `mc_seqno`
/// (or to `0` when no seqno is tracked) after inserts and removals that take
/// the write lock.
const METRIC_MIN_REF_MC_SEQNO: &str = "tycho_min_ref_mc_seqno";
/// Tracks the smallest `mc_seqno` that is currently referenced by live
/// [`RefMcStateHandle`]s.
///
/// The tracker is a thin wrapper around an `Arc`, so clones share the same
/// underlying counters.
///
/// The `#[repr(transparent)]` layout is relied upon by
/// `MinRefMcStateTracker::wrap`, which reinterprets a `&Arc<Inner>` as `&Self`.
#[derive(Clone, Default)]
#[repr(transparent)]
pub struct MinRefMcStateTracker {
// Shared counters; every handle created by this tracker keeps a clone of this `Arc`.
inner: Arc<Inner>,
}
impl MinRefMcStateTracker {
    /// Creates a tracker that has no registered seqnos.
    #[inline]
    pub fn new() -> Self {
        Self::default()
    }

    /// Returns the smallest tracked `mc_seqno`, or `None` when nothing is tracked.
    pub fn seqno(&self) -> Option<u32> {
        let ids = self.inner.counters.read();
        ids.min_seqno
    }

    /// Creates a handle for `state`.
    ///
    /// The handle is tracked under `state.min_ref_mc_seqno`, unless
    /// `state.seqno` is `0` or `state.min_ref_mc_seqno` is `u32::MAX`, in which
    /// case an untracked handle is returned instead.
    pub fn insert(&self, state: &ShardStateUnsplit) -> RefMcStateHandle {
        let is_tracked = state.seqno != 0 && state.min_ref_mc_seqno != u32::MAX;
        if is_tracked {
            self.insert_seqno(state.min_ref_mc_seqno)
        } else {
            self.insert_untracked()
        }
    }

    /// Registers one reference to `mc_seqno` and returns the handle that owns it.
    pub fn insert_seqno(&self, mc_seqno: u32) -> RefMcStateHandle {
        Inner::insert(&self.inner, mc_seqno)
    }

    /// Returns a handle that is bound to this tracker but does not register
    /// any seqno (its `mc_seqno` is `None`).
    pub fn insert_untracked(&self) -> RefMcStateHandle {
        let detached = HandleInner {
            min_ref_mc_state: Arc::clone(&self.inner),
            mc_seqno: None,
        };
        RefMcStateHandle(Arc::new(detached))
    }

    /// Reinterprets a reference to the shared state as a reference to the tracker.
    #[inline]
    fn wrap(inner: &Arc<Inner>) -> &Self {
        let raw: *const Arc<Inner> = inner;
        // SAFETY: `Self` is `#[repr(transparent)]` over its only field, an
        // `Arc<Inner>`, so both types have identical layout. The pointer comes
        // from a valid shared reference and the result is bound to its lifetime.
        unsafe { &*raw.cast::<Self>() }
    }
}
/// Handle produced by [`MinRefMcStateTracker`].
///
/// Clones share one `HandleInner` through the `Arc`; when the last clone is
/// dropped, `HandleInner`'s `Drop` impl releases the tracked seqno (if any)
/// from the tracker.
#[derive(Clone)]
#[repr(transparent)]
pub struct RefMcStateHandle(Arc<HandleInner>);
impl RefMcStateHandle {
    /// Picks one of the two handles based on their tracked seqnos.
    ///
    /// * If `other` is untracked, `self` is returned.
    /// * If only `self` is untracked, `other` is returned.
    /// * If both are tracked, the one with the strictly smaller seqno wins;
    ///   ties resolve to `self`.
    pub fn min_safe<'a>(&'a self, other: &'a Self) -> &'a Self {
        match other.0.mc_seqno {
            None => self,
            Some(other_seqno) => match self.0.mc_seqno {
                None => other,
                Some(this_seqno) if other_seqno < this_seqno => other,
                Some(_) => self,
            },
        }
    }

    /// Returns the tracker this handle was created by.
    pub fn tracker(&self) -> &MinRefMcStateTracker {
        let shared = &self.0.min_ref_mc_state;
        MinRefMcStateTracker::wrap(shared)
    }
}
impl std::fmt::Debug for RefMcStateHandle {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("RefMcStateHandle")
.field("mc_seqno", &self.0.mc_seqno)
.finish()
}
}
/// State shared between a tracker and all handles it has produced.
#[derive(Default)]
struct Inner {
// Reference counters and the cached minimum, guarded by a read-write lock.
// Counters of already known seqnos are adjusted under the read lock;
// adding or removing map entries takes the write lock.
counters: parking_lot::RwLock<StateIds>,
}
impl Inner {
    /// Registers one more reference to `mc_seqno` and returns the handle that
    /// owns this reference.
    ///
    /// A seqno that is already present only has its atomic counter bumped
    /// under the shared read lock. Otherwise the write lock is taken, the
    /// entry is created (or bumped, if it showed up in the meantime) and the
    /// gauge is refreshed.
    fn insert(self: &Arc<Self>, mc_seqno: u32) -> RefMcStateHandle {
        if !self.bump_existing(mc_seqno) {
            let min_seqno = self.register(mc_seqno);
            Self::report_min(min_seqno);
        }
        self.make_handle(mc_seqno)
    }

    /// Releases one reference to `mc_seqno`.
    ///
    /// Does nothing if the seqno is unknown. If the released reference was not
    /// the last one, only the counter is decremented. Otherwise the write lock
    /// is taken, the entry is removed if its counter is still zero, the cached
    /// minimum is recomputed when it pointed at the removed entry, and the
    /// gauge is refreshed.
    fn remove(&self, mc_seqno: u32) {
        let released_last = {
            let ids = self.counters.read();
            match ids.refs.get(&mc_seqno) {
                // `fetch_sub` yields the value before the decrement.
                Some(refs) => refs.fetch_sub(1, Ordering::AcqRel) <= 1,
                None => false,
            }
        };
        if !released_last {
            return;
        }

        let mut ids = self.counters.write();
        if let hash_map::Entry::Occupied(slot) = ids.refs.entry(mc_seqno) {
            // The counter may have been bumped again while no lock was held,
            // so the entry is dropped only if it is still unreferenced.
            if slot.get().load(Ordering::Acquire) == 0 {
                slot.remove();
                if ids.min_seqno == Some(mc_seqno) {
                    ids.min_seqno = ids.refs.keys().min().copied();
                }
            }
        }
        let min_seqno = ids.min_seqno;
        drop(ids);

        Self::report_min(min_seqno);
    }

    /// Bumps the counter of an already known seqno under the read lock.
    ///
    /// Returns `false` (and changes nothing) when the seqno has no entry yet.
    fn bump_existing(&self, mc_seqno: u32) -> bool {
        let ids = self.counters.read();
        match ids.refs.get(&mc_seqno) {
            Some(refs) => {
                refs.fetch_add(1, Ordering::Release);
                true
            }
            None => false,
        }
    }

    /// Adds a reference to `mc_seqno` under the write lock, creating its entry
    /// if needed, and returns the resulting minimum tracked seqno.
    fn register(&self, mc_seqno: u32) -> Option<u32> {
        let mut ids = self.counters.write();
        match ids.refs.entry(mc_seqno) {
            hash_map::Entry::Occupied(slot) => {
                slot.get().fetch_add(1, Ordering::Release);
            }
            hash_map::Entry::Vacant(slot) => {
                slot.insert(AtomicU32::new(1));
                // A new key lowers the minimum if there was none or if it is smaller.
                let lowers_min = match ids.min_seqno {
                    Some(current) => mc_seqno < current,
                    None => true,
                };
                if lowers_min {
                    ids.min_seqno = Some(mc_seqno);
                }
            }
        }
        ids.min_seqno
    }

    /// Builds a tracked handle bound to this state.
    fn make_handle(self: &Arc<Self>, mc_seqno: u32) -> RefMcStateHandle {
        let tracked = HandleInner {
            min_ref_mc_state: Arc::clone(self),
            mc_seqno: Some(mc_seqno),
        };
        RefMcStateHandle(Arc::new(tracked))
    }

    /// Publishes `min_seqno` to the gauge; `None` is reported as `0`.
    fn report_min(min_seqno: Option<u32>) {
        metrics::gauge!(METRIC_MIN_REF_MC_SEQNO).set(min_seqno.unwrap_or_default());
    }
}
/// Payload shared by all clones of a `RefMcStateHandle`.
struct HandleInner {
// Shared tracker state this handle belongs to.
min_ref_mc_state: Arc<Inner>,
// Seqno registered for this handle; `None` for untracked handles.
mc_seqno: Option<u32>,
}
impl Drop for HandleInner {
    /// Releases the seqno registered for this handle; untracked handles
    /// (`mc_seqno == None`) have nothing to release.
    fn drop(&mut self) {
        match self.mc_seqno {
            Some(mc_seqno) => self.min_ref_mc_state.remove(mc_seqno),
            None => {}
        }
    }
}
/// Data guarded by the lock in `Inner`.
#[derive(Default)]
struct StateIds {
// Smallest key present in `refs`; `None` while the map is empty.
min_seqno: Option<u32>,
// Number of live handles per seqno. The counters are atomic so that they
// can be adjusted while only the read lock is held.
refs: FastHashMap<u32, AtomicU32>,
}
#[cfg(test)]
mod tests {
    use super::*;

    /// The reported minimum follows handle lifetimes: it appears with the
    /// first handle, survives while any handle for that seqno is alive, moves
    /// to the next seqno afterwards and disappears with the last handle.
    #[test]
    fn min_ref_mc_state() {
        let tracker = MinRefMcStateTracker::new();

        // A single handle pins its seqno only for as long as it lives.
        {
            let _only = tracker.insert_seqno(10);
            assert_eq!(tracker.seqno(), Some(10));
        }
        assert_eq!(tracker.seqno(), None);

        // Several handles, two of which share the same seqno.
        let first_ten = tracker.insert_seqno(10);
        assert_eq!(tracker.seqno(), Some(10));

        let fifteen = tracker.insert_seqno(15);
        assert_eq!(tracker.seqno(), Some(10));

        let second_ten = tracker.insert_seqno(10);
        assert_eq!(tracker.seqno(), Some(10));

        // One reference to 10 is still alive.
        drop(second_ten);
        assert_eq!(tracker.seqno(), Some(10));

        // The last reference to 10 is gone, so 15 becomes the minimum.
        drop(first_ten);
        assert_eq!(tracker.seqno(), Some(15));

        drop(fifteen);
        assert_eq!(tracker.seqno(), None);
    }
}