#[cfg(feature = "eviction")]
use super::current_cycle;
use super::{DynamicLabelSet, thread_id};
use crossbeam_utils::CachePadded;
use parking_lot::RwLock;
use std::cell::RefCell;
use std::collections::HashMap;
use std::hash::{Hash, Hasher};
#[cfg(feature = "eviction")]
use std::sync::atomic::AtomicU32;
use std::sync::atomic::{AtomicBool, AtomicI64, AtomicU64, AtomicUsize, Ordering};
use std::sync::{Arc, Weak};
/// Source of per-gauge ids: `DynamicGaugeI64::with_max_series` takes the next
/// value with `fetch_add`, so the first gauge created gets id 1.
static GAUGE_I64_IDS: AtomicUsize = AtomicUsize::new(1);
/// Series cap used by `DynamicGaugeI64::new`.
const DEFAULT_MAX_SERIES: usize = 2000;
/// Label key of the shared bucket that `lookup_or_create` routes previously
/// unseen label sets to once the series cap has been reached.
const OVERFLOW_LABEL_KEY: &str = "__ft_overflow";
/// Label value paired with `OVERFLOW_LABEL_KEY`.
const OVERFLOW_LABEL_VALUE: &str = "true";
/// One index shard: a cache-padded, lock-protected map from a label set to the
/// shared storage of its series.
type GaugeI64IndexShard = CachePadded<RwLock<HashMap<DynamicLabelSet, Arc<GaugeI64Series>>>>;
/// Shared storage behind one labeled gauge series.
///
/// The value is split across one cache-padded cell per shard; the logical
/// gauge value is the wrapping sum of all cells (see [`GaugeI64Series::sum`]).
struct GaugeI64Series {
    /// Per-shard partial values, indexed by the caller-supplied shard index.
    cells: Vec<CachePadded<AtomicI64>>,
    /// Raised by `mark_evicted` (only compiled with the `eviction` feature)
    /// and read through `is_evicted`.
    evicted: AtomicBool,
    /// Cycle recorded at construction and refreshed by `touch`.
    #[cfg(feature = "eviction")]
    last_accessed_cycle: AtomicU32,
}

impl GaugeI64Series {
    /// Builds `shard_count` cells, each initialised to zero.
    fn zeroed_cells(shard_count: usize) -> Vec<CachePadded<AtomicI64>> {
        (0..shard_count)
            .map(|_| CachePadded::new(AtomicI64::new(0)))
            .collect()
    }

    /// Creates a zeroed, non-evicted series whose last-access cycle is `cycle`.
    #[cfg(feature = "eviction")]
    fn new(shard_count: usize, cycle: u32) -> Self {
        Self {
            cells: Self::zeroed_cells(shard_count),
            evicted: AtomicBool::new(false),
            last_accessed_cycle: AtomicU32::new(cycle),
        }
    }

    /// Creates a zeroed, non-evicted series with `shard_count` cells.
    #[cfg(not(feature = "eviction"))]
    fn new(shard_count: usize) -> Self {
        Self {
            cells: Self::zeroed_cells(shard_count),
            evicted: AtomicBool::new(false),
        }
    }

    /// Adds `value` to the cell at `shard_idx` (wrapping on overflow, as
    /// `fetch_add` does).
    ///
    /// # Panics
    /// Panics if `shard_idx` is not a valid cell index.
    #[inline]
    fn add_at(&self, shard_idx: usize, value: i64) {
        self.cells[shard_idx].fetch_add(value, Ordering::Relaxed);
    }

    /// Makes the series read as `value`: the cell at `shard_idx` receives
    /// `value` and every other cell is reset to zero.
    ///
    /// Cells are written one after another, so the update as a whole is not a
    /// single atomic step. If `shard_idx` is out of range every cell is simply
    /// zeroed.
    #[inline]
    fn set_at(&self, shard_idx: usize, value: i64) {
        for (i, cell) in self.cells.iter().enumerate() {
            let next = if i == shard_idx { value } else { 0 };
            cell.store(next, Ordering::Relaxed);
        }
    }

    /// Records `cycle` as the most recent access.
    #[cfg(feature = "eviction")]
    #[inline]
    fn touch(&self, cycle: u32) {
        self.last_accessed_cycle.store(cycle, Ordering::Relaxed);
    }

    /// Returns the gauge value: the sum of all cells.
    ///
    /// The reduction uses wrapping addition on purpose. Cells are updated with
    /// `fetch_add`, which wraps, so an individual cell may have wrapped even
    /// though the logical total is representable; adding the cells with
    /// wrapping arithmetic yields that total, whereas a checked sum would
    /// panic in builds with overflow checks enabled.
    #[inline]
    fn sum(&self) -> i64 {
        self.cells.iter().fold(0i64, |total, cell| {
            total.wrapping_add(cell.load(Ordering::Relaxed))
        })
    }

    /// Returns whether the evicted flag has been raised.
    #[inline]
    fn is_evicted(&self) -> bool {
        self.evicted.load(Ordering::Relaxed)
    }

    /// Raises the evicted flag.
    #[cfg(feature = "eviction")]
    fn mark_evicted(&self) {
        self.evicted.store(true, Ordering::Relaxed);
    }
}
/// Cloneable handle to a single series of a `DynamicGaugeI64`.
///
/// The handle owns a strong reference to the series storage, so updates made
/// through it skip the label lookup entirely.
#[derive(Clone)]
pub struct DynamicGaugeI64Series {
    /// Shared storage of the series this handle points at.
    series: Arc<GaugeI64Series>,
    /// Mask applied to `thread_id()` to select the cell to update.
    shard_mask: usize,
}

impl DynamicGaugeI64Series {
    /// Cell index used by the calling thread.
    #[inline]
    fn local_shard(&self) -> usize {
        thread_id() & self.shard_mask
    }

    /// Increases the gauge by one.
    #[inline]
    pub fn inc(&self) {
        self.add(1)
    }

    /// Decreases the gauge by one.
    #[inline]
    pub fn dec(&self) {
        self.add(-1)
    }

    /// Adds `value` (which may be negative) to the calling thread's cell.
    #[inline]
    pub fn add(&self, value: i64) {
        let cell = self.local_shard();
        self.series.add_at(cell, value);
    }

    /// Sets the gauge to `value`.
    #[inline]
    pub fn set(&self, value: i64) {
        let cell = self.local_shard();
        self.series.set_at(cell, value);
    }

    /// Returns the current value, summed over all cells.
    #[inline]
    pub fn get(&self) -> i64 {
        let backing = &self.series;
        backing.sum()
    }

    /// Returns whether the underlying series has been marked as evicted.
    #[inline]
    pub fn is_evicted(&self) -> bool {
        let backing = &self.series;
        backing.is_evicted()
    }
}
/// Payload of the thread-local series cache: remembers which series a given
/// gauge resolved for a given label list.
struct SeriesCacheEntry {
// Id of the gauge that stored this entry; `cached_series` rejects entries
// whose id differs from the querying gauge.
gauge_id: usize,
// Owned copy of the label pairs in the order the caller passed them;
// `cached_series` compares them position by position.
ordered_labels: Vec<(String, String)>,
// Weak reference, so the cache entry alone does not raise the strong count
// that `evict_stale` inspects.
series: Weak<GaugeI64Series>,
}
// Per-thread, single-slot cache of the most recently resolved series.
// The slot starts empty and is overwritten by every `update_cache` call,
// whichever gauge makes it; entries carry the gauge id so `cached_series`
// can tell them apart.
thread_local! {
static SERIES_CACHE: RefCell<Option<SeriesCacheEntry>> = const { RefCell::new(None) };
}
/// A family of `i64` gauges keyed by a label set supplied at call time.
///
/// Series are created on first use and stored in a sharded index; each series
/// keeps its value in per-shard cells that are summed on read.
pub struct DynamicGaugeI64 {
// Unique id taken from `GAUGE_I64_IDS`; tags entries in the thread-local cache.
id: usize,
// Number of shards after rounding up to a power of two; used both for the
// index shards and for the cells of every series.
shard_count: usize,
// Series cap checked in `lookup_or_create`; 0 disables the check.
max_series: usize,
// `shard_count - 1`, applied as a bit mask to hashes and thread ids.
shard_mask: usize,
// Label-set index, split into `shard_count` independently locked maps.
index_shards: Vec<GaugeI64IndexShard>,
// Incremented on every insert into the index and decremented on eviction.
series_count: AtomicUsize,
// Number of times `lookup_or_create` diverted a request to the overflow bucket.
overflow_count: AtomicU64,
}
impl DynamicGaugeI64 {
/// Creates a gauge family with the default series cap (`DEFAULT_MAX_SERIES`).
///
/// `shard_count` is passed through to `with_max_series`, which rounds it up
/// to a power of two.
pub fn new(shard_count: usize) -> Self {
    let max_series = DEFAULT_MAX_SERIES;
    Self::with_max_series(shard_count, max_series)
}
/// Creates a gauge family with an explicit series cap.
///
/// `shard_count` is rounded up to the next power of two so that a bit mask can
/// be used to pick a shard. `max_series` is the number of series after which
/// new label sets are routed to the overflow bucket; `0` disables the cap.
pub fn with_max_series(shard_count: usize, max_series: usize) -> Self {
    let shards = shard_count.next_power_of_two();
    // Every gauge draws a distinct id; the thread-local cache keys on it.
    let id = GAUGE_I64_IDS.fetch_add(1, Ordering::Relaxed);
    let index_shards = std::iter::repeat_with(|| CachePadded::new(RwLock::new(HashMap::new())))
        .take(shards)
        .collect();
    Self {
        id,
        shard_count: shards,
        max_series,
        shard_mask: shards - 1,
        index_shards,
        series_count: AtomicUsize::new(0),
        overflow_count: AtomicU64::new(0),
    }
}
/// Resolves `labels` to a reusable series handle.
///
/// The thread-local cache is consulted first; on a miss the series is looked
/// up (or created) in the index and the cache is refreshed with it.
pub fn series(&self, labels: &[(&str, &str)]) -> DynamicGaugeI64Series {
    let series = match self.cached_series(labels) {
        Some(hit) => hit,
        None => {
            let resolved = self.lookup_or_create(labels);
            self.update_cache(labels, Arc::clone(&resolved));
            resolved
        }
    };
    DynamicGaugeI64Series {
        series,
        shard_mask: self.shard_mask,
    }
}
/// Increases the series identified by `labels` by one.
#[inline]
pub fn inc(&self, labels: &[(&str, &str)]) {
    const STEP: i64 = 1;
    self.add(labels, STEP);
}
/// Decreases the series identified by `labels` by one.
#[inline]
pub fn dec(&self, labels: &[(&str, &str)]) {
    const STEP: i64 = -1;
    self.add(labels, STEP);
}
/// Adds `value` (which may be negative) to the series identified by `labels`.
///
/// Uses the thread-local cache when it holds this gauge and label list;
/// otherwise resolves the series through the index and refreshes the cache.
#[inline]
pub fn add(&self, labels: &[(&str, &str)], value: i64) {
    let series = self.cached_series(labels).unwrap_or_else(|| {
        let resolved = self.lookup_or_create(labels);
        self.update_cache(labels, Arc::clone(&resolved));
        resolved
    });
    // The calling thread's id picks the cell to update.
    let cell = thread_id() & self.shard_mask;
    series.add_at(cell, value);
}
/// Sets the series identified by `labels` to `value`.
///
/// Uses the thread-local cache when it holds this gauge and label list;
/// otherwise resolves the series through the index and refreshes the cache.
#[inline]
pub fn set(&self, labels: &[(&str, &str)], value: i64) {
    let series = self.cached_series(labels).unwrap_or_else(|| {
        let resolved = self.lookup_or_create(labels);
        self.update_cache(labels, Arc::clone(&resolved));
        resolved
    });
    // The calling thread's cell receives `value`; `set_at` clears the others.
    let cell = thread_id() & self.shard_mask;
    series.set_at(cell, value);
}
/// Returns the current value of the series stored under `labels`, or `0` when
/// the index holds no such series.
///
/// This is a pure index read: it neither creates a series nor goes through
/// the thread-local cache.
pub fn get(&self, labels: &[(&str, &str)]) -> i64 {
    let key = DynamicLabelSet::from_pairs(labels);
    let shard = &self.index_shards[self.index_shard_for(&key)];
    let map = shard.read();
    match map.get(&key) {
        Some(series) => series.sum(),
        None => 0,
    }
}
/// Returns the sum of the current values of every series in the index.
///
/// Values are accumulated directly while each shard's read lock is held, so
/// no label sets are cloned and no intermediate collection is allocated.
/// Shards are visited one at a time, so the result is not a single
/// point-in-time view across shards.
pub fn sum_all(&self) -> i64 {
    let mut total: i64 = 0;
    for shard in &self.index_shards {
        let map = shard.read();
        for series in map.values() {
            total += series.sum();
        }
    }
    total
}
/// Collects a `(label set, value)` pair for every series in the index.
///
/// Shards are read-locked one at a time, so the result is not a single
/// point-in-time view across shards.
pub fn snapshot(&self) -> Vec<(DynamicLabelSet, i64)> {
    let mut entries = Vec::new();
    for shard in self.index_shards.iter() {
        let map = shard.read();
        entries.extend(
            map.iter()
                .map(|(labels, series)| (labels.clone(), series.sum())),
        );
    }
    entries
}
/// Returns the number of series currently stored in the index, obtained by
/// adding up the sizes of all index shards.
pub fn cardinality(&self) -> usize {
    let mut total = 0;
    for shard in &self.index_shards {
        total += shard.read().len();
    }
    total
}
/// Returns how many times a request was diverted to the overflow bucket
/// because the series cap had been reached.
pub fn overflow_count(&self) -> u64 {
    let counter = &self.overflow_count;
    counter.load(Ordering::Relaxed)
}
/// Calls `f` with the label pairs and current value of every series.
///
/// `f` runs while the read lock of the shard being visited is held.
pub(crate) fn visit_series(&self, mut f: impl FnMut(&[(String, String)], i64)) {
    self.index_shards.iter().for_each(|shard| {
        let map = shard.read();
        map.iter()
            .for_each(|(labels, series)| f(labels.pairs(), series.sum()));
    });
}
/// Removes series that have not been accessed for more than `max_staleness`
/// cycles and returns how many were removed.
///
/// A series is kept when anything besides the index holds a strong reference
/// to it (for example a `DynamicGaugeI64Series` handle). Removed series are
/// flagged as evicted before they leave the index, which lets the
/// thread-local cache detect and discard them.
#[cfg(feature = "eviction")]
pub fn evict_stale(&self, max_staleness: u32) -> usize {
    let now = current_cycle();
    let mut evicted_total = 0;
    for shard in self.index_shards.iter() {
        shard.write().retain(|_, series| {
            // The index itself accounts for one strong reference; more than
            // that means the series is still held elsewhere.
            if Arc::strong_count(series) > 1 {
                return true;
            }
            let last_seen = series.last_accessed_cycle.load(Ordering::Relaxed);
            let idle_for = now.saturating_sub(last_seen);
            if idle_for <= max_staleness {
                return true;
            }
            series.mark_evicted();
            evicted_total += 1;
            self.series_count.fetch_sub(1, Ordering::Relaxed);
            false
        });
    }
    evicted_total
}
/// Returns the series for `labels`, creating it when it does not exist yet.
///
/// Resolution order:
/// 1. Read-locked lookup of the requested label set.
/// 2. On a miss, if a cap is configured (`max_series > 0`) and the series
///    count has reached it, the request is diverted to the overflow bucket
///    (`OVERFLOW_LABEL_KEY = OVERFLOW_LABEL_VALUE`) and `overflow_count` is
///    incremented.
/// 3. The chosen key is inserted under the shard's write lock, after
///    re-checking that no other thread inserted it in the meantime.
///
/// With the `eviction` feature, every returned pre-existing series has its
/// last-access cycle refreshed.
fn lookup_or_create(&self, labels: &[(&str, &str)]) -> Arc<GaugeI64Series> {
    let requested_key = DynamicLabelSet::from_pairs(labels);
    let requested_shard = self.index_shard_for(&requested_key);
    #[cfg(feature = "eviction")]
    let cycle = current_cycle();

    // Fast path: the series already exists.
    if let Some(series) = self.index_shards[requested_shard]
        .read()
        .get(&requested_key)
    {
        #[cfg(feature = "eviction")]
        series.touch(cycle);
        return Arc::clone(series);
    }

    // NOTE(review): this check and the insert below are not one atomic step,
    // so concurrent creators can push the count past `max_series`.
    let at_capacity =
        self.max_series > 0 && self.series_count.load(Ordering::Relaxed) >= self.max_series;

    let (key, shard) = if at_capacity {
        self.overflow_count.fetch_add(1, Ordering::Relaxed);
        let overflow_key =
            DynamicLabelSet::from_pairs(&[(OVERFLOW_LABEL_KEY, OVERFLOW_LABEL_VALUE)]);
        let overflow_shard = self.index_shard_for(&overflow_key);
        // The overflow bucket usually exists already; try the read lock first.
        if let Some(series) = self.index_shards[overflow_shard].read().get(&overflow_key) {
            #[cfg(feature = "eviction")]
            series.touch(cycle);
            return Arc::clone(series);
        }
        (overflow_key, overflow_shard)
    } else {
        // The requested key was probed under the read lock just above, so go
        // straight to the write lock; the re-check below covers a racing insert.
        (requested_key, requested_shard)
    };

    let mut guard = self.index_shards[shard].write();
    // Another thread may have inserted the key between the two locks.
    if let Some(series) = guard.get(&key) {
        #[cfg(feature = "eviction")]
        series.touch(cycle);
        return Arc::clone(series);
    }

    #[cfg(feature = "eviction")]
    let series = Arc::new(GaugeI64Series::new(self.shard_count, cycle));
    #[cfg(not(feature = "eviction"))]
    let series = Arc::new(GaugeI64Series::new(self.shard_count));
    guard.insert(key, Arc::clone(&series));
    self.series_count.fetch_add(1, Ordering::Relaxed);
    series
}
/// Maps a label set to the index shard that stores it: the key's hash,
/// reduced with `shard_mask`.
fn index_shard_for(&self, key: &DynamicLabelSet) -> usize {
    let mut state = std::collections::hash_map::DefaultHasher::new();
    key.hash(&mut state);
    let digest = state.finish();
    (digest as usize) & self.shard_mask
}
/// Returns the series held in the thread-local cache if it can serve this
/// request.
///
/// A hit requires that the entry was stored by this gauge, that its labels
/// equal `labels` pair by pair in the same order, that the series is still
/// alive, and that it has not been flagged as evicted. With the `eviction`
/// feature a hit also refreshes the series' last-access cycle.
fn cached_series(&self, labels: &[(&str, &str)]) -> Option<Arc<GaugeI64Series>> {
    SERIES_CACHE.with(|slot| {
        let cached = slot.borrow();
        let entry = cached.as_ref().filter(|e| e.gauge_id == self.id)?;

        let same_labels = entry.ordered_labels.len() == labels.len()
            && entry
                .ordered_labels
                .iter()
                .zip(labels)
                .all(|((ek, ev), (k, v))| ek == k && ev == v);
        if !same_labels {
            return None;
        }

        // The cache only holds a weak reference; the series may be gone or
        // may have been removed from the index by eviction.
        let series = entry.series.upgrade().filter(|s| !s.is_evicted())?;
        #[cfg(feature = "eviction")]
        series.touch(current_cycle());
        Some(series)
    })
}
/// Replaces the thread-local cache entry with `labels` → `series` for this
/// gauge.
///
/// The labels are copied into owned strings in caller order, and only a weak
/// reference to the series is retained.
fn update_cache(&self, labels: &[(&str, &str)], series: Arc<GaugeI64Series>) {
    SERIES_CACHE.with(|slot| {
        let mut ordered_labels = Vec::with_capacity(labels.len());
        for &(k, v) in labels {
            ordered_labels.push((k.to_string(), v.to_string()));
        }
        let entry = SeriesCacheEntry {
            gauge_id: self.id,
            ordered_labels,
            series: Arc::downgrade(&series),
        };
        *slot.borrow_mut() = Some(entry);
    });
}
}
#[cfg(test)]
mod tests {
    #[cfg(feature = "eviction")]
    use super::super::advance_cycle;
    use super::*;

    /// `inc`, `add` and `dec` on one label set accumulate into what `get` reports.
    #[test]
    fn test_basic_operations() {
        let ep1 = [("endpoint_id", "ep1")];
        let gauge = DynamicGaugeI64::new(4);

        gauge.inc(&ep1);
        gauge.add(&ep1, 2);
        assert_eq!(gauge.get(&ep1), 3);

        gauge.dec(&ep1);
        assert_eq!(gauge.get(&ep1), 2);

        gauge.add(&ep1, -2);
        assert_eq!(gauge.get(&ep1), 0);
    }

    /// Updates made through a series handle are visible via the handle and the gauge.
    #[test]
    fn test_series_handle() {
        let ep1 = [("endpoint_id", "ep1")];
        let gauge = DynamicGaugeI64::new(4);
        let handle = gauge.series(&ep1);

        handle.inc();
        handle.inc();
        handle.dec();

        assert_eq!(handle.get(), 1);
        assert_eq!(gauge.get(&ep1), 1);
    }

    /// `snapshot` returns one entry per series, carrying the series values.
    #[test]
    fn test_snapshot() {
        let gauge = DynamicGaugeI64::new(4);
        for (endpoint, amount) in [("ep1", 10), ("ep2", 20)] {
            gauge.add(&[("endpoint_id", endpoint)], amount);
        }

        let snap = gauge.snapshot();
        assert_eq!(snap.len(), 2);
        let total = snap.iter().map(|(_, value)| *value).sum::<i64>();
        assert_eq!(total, 30);
    }

    /// A series untouched for more than `max_staleness` cycles is removed.
    #[cfg(feature = "eviction")]
    #[test]
    fn test_evict_stale() {
        let gauge = DynamicGaugeI64::new(4);
        let stale_labels = &[("endpoint_id", "evict_i64")];
        gauge.add(stale_labels, 5);
        assert_eq!(gauge.cardinality(), 1);

        advance_cycle();
        advance_cycle();
        // A second, freshly accessed series that must survive the sweep.
        gauge.add(&[("flush", "cache")], 1);

        let removed = gauge.evict_stale(1);
        assert_eq!(removed, 1);
        assert_eq!(gauge.cardinality(), 1);
        assert_eq!(gauge.get(stale_labels), 0);
    }

    /// A live series handle keeps its series in the index across a sweep.
    #[cfg(feature = "eviction")]
    #[test]
    fn test_series_handle_protects_from_eviction() {
        let gauge = DynamicGaugeI64::new(4);
        let held_labels = &[("endpoint_id", "tombstone_i64")];
        let handle = gauge.series(held_labels);
        handle.add(5);
        assert!(!handle.is_evicted());

        advance_cycle();
        advance_cycle();

        let removed = gauge.evict_stale(1);
        assert_eq!(removed, 0);
        assert!(!handle.is_evicted());
        assert_eq!(gauge.get(held_labels), 5);
    }

    /// Once the cap is reached, a new label set lands in the overflow bucket.
    #[test]
    fn test_overflow_bucket_routes_new_series_at_capacity() {
        let gauge = DynamicGaugeI64::with_max_series(4, 1);
        gauge.add(&[("endpoint_id", "1")], 1);
        gauge.add(&[("endpoint_id", "2")], 2);

        // One regular series plus the overflow bucket.
        assert_eq!(gauge.cardinality(), 2);
        let overflow = [(OVERFLOW_LABEL_KEY, OVERFLOW_LABEL_VALUE)];
        assert_eq!(gauge.get(&overflow), 2);
    }
}