use std::collections::HashMap;
use std::sync::{Arc, Mutex};
/// Owned list of `(label name, label value)` pairs attached to a metric.
pub type Labels = Vec<(String, String)>;

/// Identity of a custom metric: its name together with its labels.
///
/// Equality and hashing are derived over both fields, so the order of
/// `labels` matters. Keys built through [`MetricKey::new`] always store their
/// labels sorted, which makes them independent of the order in which the
/// caller listed the labels.
#[derive(Clone, Hash, Eq, PartialEq, Debug)]
pub struct MetricKey {
    /// Name of the metric.
    pub name: String,
    /// Label pairs; sorted when the key is created with [`MetricKey::new`].
    pub labels: Labels,
}

impl MetricKey {
    /// Builds a key from a metric name and label pairs given in any order.
    ///
    /// The name and labels are copied into owned strings and the labels are
    /// sorted on the whole `(name, value)` pair. Sorting on the label name
    /// alone would leave labels that share a name in caller order, so two
    /// keys describing the same label set could compare (and hash) unequal.
    pub fn new(name: &str, labels: &[(&str, &str)]) -> Self {
        let mut sorted_labels: Labels = labels
            .iter()
            .map(|&(k, v)| (k.to_owned(), v.to_owned()))
            .collect();
        // The tuple ordering is total, so an unstable sort yields the same
        // result as a stable one without the extra allocation.
        sorted_labels.sort_unstable();
        Self {
            name: name.to_owned(),
            labels: sorted_labels,
        }
    }
}
/// Handle to the metrics recorded for a transaction.
///
/// The report is stored behind an `Arc<Mutex<_>>`, so cloning the handle
/// yields another reference to the same underlying [`MetricsReport`] rather
/// than an independent copy.
#[derive(Clone, Debug, Default)]
pub struct TransactionMetrics {
    /// Shared, mutex-protected report.
    pub metrics: Arc<Mutex<MetricsReport>>,
}
/// Transaction-level bookkeeping: retry count and recorded versions.
///
/// The default value has zero retries and no versions set.
#[derive(Clone, Debug, Default)]
pub struct TransactionInfo {
    /// Number of retries recorded so far.
    pub retries: u64,
    /// Read version, `None` until one has been recorded.
    pub read_version: Option<i64>,
    /// Commit version, `None` until one has been recorded.
    pub commit_version: Option<i64>,
}
/// Everything recorded for a transaction: counters, timings, custom metrics
/// and transaction-level information.
#[derive(Clone, Debug, Default)]
pub struct MetricsReport {
    /// Counters that `TransactionMetrics::reset_current` sets back to their
    /// defaults.
    pub current: CounterMetrics,
    /// Counters that `TransactionMetrics::reset_current` leaves untouched.
    pub total: CounterMetrics,
    /// Timing measurements.
    pub time: TimingMetrics,
    /// Caller-defined metrics, keyed by metric name and labels.
    pub custom_metrics: HashMap<MetricKey, u64>,
    /// Retry count and read/commit versions.
    pub transaction: TransactionInfo,
}

impl MetricsReport {
    /// Records `version` as the read version, replacing any previous value.
    pub fn set_read_version(&mut self, version: i64) {
        let slot = &mut self.transaction.read_version;
        *slot = Some(version);
    }

    /// Records `version` as the commit version, replacing any previous value.
    pub fn set_commit_version(&mut self, version: i64) {
        let slot = &mut self.transaction.commit_version;
        *slot = Some(version);
    }

    /// Adds one to the retry counter.
    pub fn increment_retries(&mut self) {
        let info = &mut self.transaction;
        info.retries += 1;
    }
}
/// Durations, in milliseconds, measured around a transaction.
#[derive(Clone, Debug, Default)]
pub struct TimingMetrics {
    /// Most recently recorded commit duration.
    pub commit_execution_ms: u64,
    /// One entry per recorded error-handling duration, in recording order.
    pub on_error_execution_ms: Vec<u64>,
    /// Most recently recorded overall execution duration.
    pub total_execution_ms: u64,
}

impl TimingMetrics {
    /// Stores the commit duration, overwriting the previous value.
    pub fn record_commit_time(&mut self, duration_ms: u64) {
        self.commit_execution_ms = duration_ms;
    }

    /// Appends one error-handling duration to the list.
    pub fn add_error_time(&mut self, duration_ms: u64) {
        let durations = &mut self.on_error_execution_ms;
        durations.push(duration_ms);
    }

    /// Stores the overall execution duration, overwriting the previous value.
    pub fn set_execution_time(&mut self, duration_ms: u64) {
        self.total_execution_ms = duration_ms;
    }

    /// Returns the sum of all recorded error-handling durations
    /// (zero when none were recorded).
    pub fn get_total_error_time(&self) -> u64 {
        let durations = &self.on_error_execution_ms;
        durations.iter().copied().sum()
    }
}
/// Database command reported to the metrics counters.
///
/// The payloads carry the sizes that `CounterMetrics::increment` adds to its
/// running totals.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum FdbCommand {
    /// An atomic operation; counted in `call_atomic_op`.
    Atomic,
    /// A clear; counted in `call_clear`.
    Clear,
    /// A range clear; counted in `call_clear_range`.
    ClearRange,
    /// A get, as `(bytes read, key-values fetched)`.
    Get(u64, u64),
    /// A range get, as `(bytes read, key-values fetched)`.
    GetRange(u64, u64),
    /// A set, as `(bytes written)`.
    Set(u64),
}
/// Amount added to a call counter each time one command is recorded.
const INCREMENT: u64 = 1;
impl TransactionMetrics {
    /// Creates a handle over an all-default report.
    pub fn new() -> Self {
        Self {
            metrics: Arc::new(Mutex::new(MetricsReport::default())),
        }
    }

    /// Locks the shared report.
    ///
    /// A poisoned mutex is recovered instead of propagated: if another thread
    /// panicked while holding the lock, the report is used in whatever state
    /// that thread left it, so recording metrics never panics because of it.
    fn lock_report(&self) -> ::std::sync::MutexGuard<'_, MetricsReport> {
        self.metrics
            .lock()
            .unwrap_or_else(|poisoned| poisoned.into_inner())
    }

    /// Records the read version.
    pub fn set_read_version(&self, version: i64) {
        self.lock_report().set_read_version(version);
    }

    /// Records the commit version.
    pub fn set_commit_version(&self, version: i64) {
        self.lock_report().set_commit_version(version);
    }

    /// Counts one retry, resets the `current` counters to their defaults and
    /// removes every custom metric. The `total` counters and the timings are
    /// left untouched.
    ///
    /// Because this already counts a retry, also calling
    /// [`increment_retries`](Self::increment_retries) for the same retry
    /// would count it twice.
    pub fn reset_current(&self) {
        let mut report = self.lock_report();
        report.increment_retries();
        report.current = CounterMetrics::default();
        report.custom_metrics.clear();
    }

    /// Adds one to the retry counter without resetting anything.
    pub fn increment_retries(&self) {
        self.lock_report().increment_retries();
    }

    /// Records one command in both the `current` and the `total` counters.
    pub fn report_metrics(&self, fdb_command: FdbCommand) {
        let mut report = self.lock_report();
        report.current.increment(&fdb_command);
        report.total.increment(&fdb_command);
    }

    /// Returns the number of retries recorded so far.
    pub fn get_retries(&self) -> u64 {
        self.lock_report().transaction.retries
    }

    /// Returns a copy of the whole report as it is right now.
    pub fn get_metrics_data(&self) -> MetricsReport {
        self.lock_report().clone()
    }

    /// Returns a copy of the transaction-level information.
    pub fn get_transaction_info(&self) -> TransactionInfo {
        self.lock_report().transaction.clone()
    }

    /// Sets the custom metric identified by `name` and `labels` to `value`,
    /// replacing any previous value. Label order does not matter.
    pub fn set_custom(&self, name: &str, value: u64, labels: &[(&str, &str)]) {
        // Build the key (allocations + sort) before taking the lock so the
        // critical section only covers the map update.
        let key = MetricKey::new(name, labels);
        self.lock_report().custom_metrics.insert(key, value);
    }

    /// Adds `amount` to the custom metric identified by `name` and `labels`,
    /// creating it at zero first if it does not exist. Label order does not
    /// matter.
    ///
    /// The addition saturates at `u64::MAX` rather than panicking (debug
    /// builds) or wrapping around (release builds) on overflow.
    pub fn increment_custom(&self, name: &str, amount: u64, labels: &[(&str, &str)]) {
        let key = MetricKey::new(name, labels);
        let mut report = self.lock_report();
        let counter = report.custom_metrics.entry(key).or_insert(0);
        *counter = counter.saturating_add(amount);
    }

    /// Stores the commit duration, overwriting the previous value.
    pub fn record_commit_time(&self, duration_ms: u64) {
        self.lock_report().time.record_commit_time(duration_ms);
    }

    /// Appends one error-handling duration.
    pub fn add_error_time(&self, duration_ms: u64) {
        self.lock_report().time.add_error_time(duration_ms);
    }

    /// Stores the overall execution duration, overwriting the previous value.
    pub fn set_execution_time(&self, duration_ms: u64) {
        self.lock_report().time.set_execution_time(duration_ms);
    }

    /// Returns the sum of all recorded error-handling durations.
    pub fn get_total_error_time(&self) -> u64 {
        self.lock_report().time.get_total_error_time()
    }
}
/// Running totals of the commands recorded through
/// [`CounterMetrics::increment`]. All counters start at zero.
#[derive(Clone, Debug, Default)]
pub struct CounterMetrics {
    /// Number of `Atomic` commands recorded.
    pub call_atomic_op: u64,
    /// Number of `Clear` commands recorded.
    pub call_clear: u64,
    /// Number of `ClearRange` commands recorded.
    pub call_clear_range: u64,
    /// Number of `Get` commands recorded.
    pub call_get: u64,
    /// Key-value pairs fetched, summed over `Get` and `GetRange` commands.
    pub keys_values_fetched: u64,
    /// Bytes read, summed over `Get` and `GetRange` commands.
    pub bytes_read: u64,
    /// Number of `Set` commands recorded.
    pub call_set: u64,
    /// Bytes written, summed over `Set` commands.
    pub bytes_written: u64,
}

impl CounterMetrics {
    /// Updates the counters that correspond to `fdb_command`.
    pub fn increment(&mut self, fdb_command: &FdbCommand) {
        match *fdb_command {
            FdbCommand::Atomic => self.call_atomic_op += INCREMENT,
            FdbCommand::Clear => self.call_clear += INCREMENT,
            FdbCommand::ClearRange => self.call_clear_range += INCREMENT,
            FdbCommand::Get(bytes, fetched) => {
                self.record_read(bytes, fetched);
                self.call_get += INCREMENT;
            }
            // NOTE(review): a range read updates the volume counters only; no
            // call counter is bumped for it. Confirm this is intentional.
            FdbCommand::GetRange(bytes, fetched) => self.record_read(bytes, fetched),
            FdbCommand::Set(bytes) => {
                self.bytes_written += bytes;
                self.call_set += INCREMENT;
            }
        }
    }

    /// Adds the volume of one read to the fetched-pairs and bytes-read totals.
    fn record_read(&mut self, bytes: u64, fetched: u64) {
        self.keys_values_fetched += fetched;
        self.bytes_read += bytes;
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use std::collections::HashSet;

    /// Keys compare equal regardless of label order, and differ as soon as a
    /// label value, a label name or the metric name differs.
    #[test]
    fn test_metric_key_equality() {
        let base = MetricKey::new("counter", &[("region", "us-west"), ("service", "api")]);

        let reordered = MetricKey::new("counter", &[("service", "api"), ("region", "us-west")]);
        assert_eq!(base, reordered);

        let distinct = [
            // Different label value.
            MetricKey::new("counter", &[("region", "us-east"), ("service", "api")]),
            // Different label name.
            MetricKey::new("counter", &[("zone", "us-west"), ("service", "api")]),
            // Different metric name.
            MetricKey::new("timer", &[("region", "us-west"), ("service", "api")]),
        ];
        for other in &distinct {
            assert_ne!(&base, other);
        }
    }

    /// Keys can be used for map lookups, again independent of label order.
    #[test]
    fn test_metric_key_in_hashmap() {
        let entries = [
            ("counter", [("region", "us-west"), ("service", "api")], 100),
            ("counter", [("region", "us-east"), ("service", "api")], 200),
            ("timer", [("region", "us-west"), ("service", "api")], 300),
        ];
        let mut metrics = HashMap::new();
        for (name, labels, value) in &entries {
            metrics.insert(MetricKey::new(name, labels), *value);
        }

        let lookups = [
            ("counter", [("region", "us-west"), ("service", "api")], 100),
            ("counter", [("service", "api"), ("region", "us-west")], 100),
            ("counter", [("region", "us-east"), ("service", "api")], 200),
            ("timer", [("region", "us-west"), ("service", "api")], 300),
        ];
        for (name, labels, expected) in &lookups {
            assert_eq!(metrics.get(&MetricKey::new(name, labels)), Some(expected));
        }
    }

    /// All six orderings of three labels collapse to a single key.
    #[test]
    fn test_metric_key_label_order_independence() {
        let permutations = [
            [("a", "1"), ("b", "2"), ("c", "3")],
            [("a", "1"), ("c", "3"), ("b", "2")],
            [("b", "2"), ("a", "1"), ("c", "3")],
            [("b", "2"), ("c", "3"), ("a", "1")],
            [("c", "3"), ("a", "1"), ("b", "2")],
            [("c", "3"), ("b", "2"), ("a", "1")],
        ];
        let unique_keys: HashSet<MetricKey> = permutations
            .iter()
            .map(|labels| MetricKey::new("counter", labels))
            .collect();
        assert_eq!(unique_keys.len(), 1);
    }

    /// `set_custom` overwrites, `increment_custom` accumulates, and both
    /// address the same metric whatever the label order.
    #[test]
    fn test_custom_metrics_operations() {
        let metrics = TransactionMetrics::new();
        let labels = [("endpoint", "users"), ("method", "GET")];
        let key = MetricKey::new("api_calls", &labels);
        // Reads the metric's current value from a fresh snapshot.
        let current = || metrics.get_metrics_data().custom_metrics.get(&key).copied();

        metrics.set_custom("api_calls", 100, &labels);
        assert_eq!(current(), Some(100));

        metrics.increment_custom("api_calls", 50, &labels);
        assert_eq!(current(), Some(150));

        metrics.set_custom("api_calls", 200, &labels);
        assert_eq!(current(), Some(200));

        metrics.increment_custom("api_calls", 75, &[("method", "GET"), ("endpoint", "users")]);
        assert_eq!(current(), Some(275));
    }
}