use std::sync::Arc;
use metrics::GaugeValue;
use seqlock::SeqLock;
use super::MetricKind;
use crate::protocol::GaugeEpoch;
pub(crate) struct Gauge {
origin: Arc<GaugeOrigin>,
epoch: GaugeEpoch,
delta: f64,
}
impl MetricKind for Gauge {
type Output = (f64, GaugeEpoch);
type Shared = Arc<GaugeOrigin>;
type Value = GaugeValue;
fn new(origin: Self::Shared) -> Self {
Self {
origin: origin.clone(),
epoch: 0,
delta: 0.0,
}
}
fn update(&mut self, value: Self::Value) {
let delta = match value {
GaugeValue::Absolute(value) => {
self.epoch = self.origin.set(value);
self.delta = 0.0;
return;
}
GaugeValue::Increment(delta) => delta,
GaugeValue::Decrement(delta) => -delta,
};
let current_epoch = self.origin.get().1;
if self.epoch == current_epoch {
self.delta += delta;
} else {
self.epoch = current_epoch;
self.delta = delta;
}
}
fn merge(self, (out_value, out_epoch): &mut Self::Output) -> usize {
let (last_absolute, current_epoch) = self.origin.get();
debug_assert!(current_epoch >= *out_epoch);
debug_assert!(current_epoch >= self.epoch);
if current_epoch > *out_epoch {
*out_value = last_absolute;
*out_epoch = current_epoch;
}
if current_epoch == self.epoch {
*out_value += self.delta;
}
std::mem::size_of::<GaugeOrigin>()
}
}
#[derive(Default)]
pub(crate) struct GaugeOrigin(SeqLock<(f64, GaugeEpoch)>);
impl GaugeOrigin {
fn get(&self) -> (f64, GaugeEpoch) {
self.0.read()
}
fn set(&self, value: f64) -> GaugeEpoch {
let mut pair = self.0.lock_write();
let new_epoch = pair.1 + 1;
*pair = (value, new_epoch);
new_epoch
}
}
#[cfg(test)]
mod tests {
use std::{collections::VecDeque, ops::Range};
use proptest::prelude::*;
use super::*;
const ACTIONS: Range<usize> = 1..1000;
const SHARDS: usize = 3;
#[derive(Debug, Clone)]
enum Action {
Update(Update),
Merge(usize),
}
#[derive(Debug, Clone)]
struct Update {
shard: usize, value: GaugeValue,
}
fn action_strategy() -> impl Strategy<Value = Action> {
prop_oneof![
1 => (1..=SHARDS).prop_map(Action::Merge),
10 => update_strategy().prop_map(Action::Update),
]
}
prop_compose! {
fn update_strategy()(shard in 0..SHARDS, value in gauge_value_strategy()) -> Update {
Update { shard, value }
}
}
fn gauge_value_strategy() -> impl Strategy<Value = GaugeValue> {
prop_oneof![
1 => (0..10).prop_map(|v| GaugeValue::Absolute(v as f64)),
5 => (1..10).prop_map(|v| GaugeValue::Increment(v as f64)),
5 => (1..10).prop_map(|v| GaugeValue::Decrement(v as f64)),
]
}
proptest! {
#[test]
fn linearizability(actions in prop::collection::vec(action_strategy(), ACTIONS)) {
let origin = Arc::new(GaugeOrigin::default());
let mut shards = (0..SHARDS).map(|_| Gauge::new(origin.clone())).collect::<VecDeque<_>>();
let mut expected = 0.0;
let mut actual = (0.0, 0);
for action in actions {
match action {
Action::Update(update) => {
expected = update.value.update_value(expected);
shards[update.shard].update(update.value);
}
Action::Merge(limit) => {
for _ in 0..limit {
let shard = shards.pop_front().unwrap();
shard.merge(&mut actual);
shards.push_back(Gauge::new(origin.clone()));
assert_eq!(shards.len(), SHARDS);
}
if limit == SHARDS {
prop_assert_eq!(actual.0, expected);
}
}
}
}
for shard in shards {
shard.merge(&mut actual);
}
prop_assert_eq!(actual.0, expected);
}
}
}