use commonware_runtime::{
telemetry::metrics::{
histogram::{duration_histogram, ScopedTimer, Timed},
Counter, Gauge, GaugeExt as _, MetricsExt as _,
},
Clock, Metrics as RuntimeMetrics,
};
use std::{ops::Deref, sync::Arc};
/// Counters describing how often fixed-size items could be served synchronously.
pub(super) struct CacheMetrics {
    /// Registered as `cache_hits`: number of fixed items read synchronously.
    hits: Counter,
    /// Registered as `cache_misses`: number of fixed items not satisfied synchronously,
    /// including pruned or out-of-range `try_read_sync` probes that returned `None`.
    misses: Counter,
}
/// Metrics specific to commit operations.
pub(super) struct CommitMetrics {
    /// Registered as `commit_calls`: number of commit calls.
    calls: Counter,
    /// Wraps the `commit_duration` histogram: duration of commit calls.
    duration: Timed,
}
/// Metrics embedded in both [`FixedMetrics`] and [`VariableMetrics`].
///
/// Duration histograms are private and are exposed only through the `*_timer` methods, which
/// pair them with the stored clock.
pub(super) struct CommonMetrics<E: Clock> {
    /// Clock handed to every scoped timer created from the duration histograms.
    clock: Arc<E>,
    /// Logical end position of the journal.
    pub size: Gauge,
    /// Oldest readable item position.
    pub pruning_boundary: Gauge,
    /// Number of readable items retained.
    pub retained: Gauge,
    /// Items in the section containing the newest retained item.
    pub tail_items: Gauge,
    /// Number of single-item append calls.
    pub append_calls: Counter,
    /// Duration of single-item append calls.
    append_duration: Timed,
    /// Number of append-many calls.
    pub append_many_calls: Counter,
    /// Duration of append-many calls.
    append_many_duration: Timed,
    /// Number of single-item read calls.
    pub read_calls: Counter,
    /// Duration of single-item read calls.
    read_duration: Timed,
    /// Number of non-empty batch read calls.
    pub read_many_calls: Counter,
    /// Duration of non-empty batch read calls.
    read_many_duration: Timed,
    /// Number of `try_read_sync` calls that returned `Some`.
    pub try_read_sync_hits: Counter,
    /// Number of items returned by `read`, `read_many`, and `try_read_sync`.
    pub items_read: Counter,
    /// Number of sync calls.
    pub sync_calls: Counter,
    /// Duration of full sync calls.
    sync_duration: Timed,
}
impl<E: RuntimeMetrics + Clock> CommonMetrics<E> {
    /// Registers every shared journal metric on `context` and keeps `context` as the clock
    /// used by the scoped timers.
    ///
    /// Metrics are registered in the order the fields are written below.
    fn new(context: Arc<E>) -> Self {
        // One plain borrow of the context serves every registration; it is no longer used by
        // the time `context` is moved into `clock` at the end of the literal.
        let registry: &E = &context;
        Self {
            size: registry.gauge("size", "Logical end position of the journal"),
            pruning_boundary: registry
                .gauge("pruning_boundary", "Oldest readable item position"),
            retained: registry.gauge("retained", "Number of readable items retained"),
            tail_items: registry.gauge(
                "tail_items",
                "Items in the section containing the newest retained item",
            ),
            append_calls: registry
                .counter("append_calls", "Number of single-item append calls"),
            append_duration: Timed::new(duration_histogram(
                registry,
                "append_duration",
                "Duration of single-item append calls",
            )),
            append_many_calls: registry
                .counter("append_many_calls", "Number of append-many calls"),
            append_many_duration: Timed::new(duration_histogram(
                registry,
                "append_many_duration",
                "Duration of append-many calls",
            )),
            read_calls: registry.counter("read_calls", "Number of single-item read calls"),
            read_duration: Timed::new(duration_histogram(
                registry,
                "read_duration",
                "Duration of single-item read calls",
            )),
            read_many_calls: registry
                .counter("read_many_calls", "Number of non-empty batch read calls"),
            read_many_duration: Timed::new(duration_histogram(
                registry,
                "read_many_duration",
                "Duration of non-empty batch read calls",
            )),
            try_read_sync_hits: registry.counter(
                "try_read_sync_hits",
                "Number of try_read_sync calls that returned Some",
            ),
            items_read: registry.counter(
                "items_read",
                "Number of items returned by read, read_many, and try_read_sync",
            ),
            sync_calls: registry.counter("sync_calls", "Number of sync calls"),
            sync_duration: Timed::new(duration_histogram(
                registry,
                "sync_duration",
                "Duration of full sync calls",
            )),
            // Moved last so the registrations above can still borrow the context.
            clock: context,
        }
    }
}
impl<E: Clock> CommonMetrics<E> {
    /// Starts a scoped timer over the `append_duration` histogram using the stored clock.
    pub(super) fn append_timer(&self) -> ScopedTimer<E> {
        self.append_duration.scoped(&self.clock)
    }

    /// Starts a scoped timer over the `append_many_duration` histogram using the stored clock.
    pub(super) fn append_many_timer(&self) -> ScopedTimer<E> {
        self.append_many_duration.scoped(&self.clock)
    }

    /// Starts a scoped timer over the `read_duration` histogram using the stored clock.
    pub(super) fn read_timer(&self) -> ScopedTimer<E> {
        self.read_duration.scoped(&self.clock)
    }

    /// Starts a scoped timer over the `read_many_duration` histogram using the stored clock.
    pub(super) fn read_many_timer(&self) -> ScopedTimer<E> {
        self.read_many_duration.scoped(&self.clock)
    }

    /// Starts a scoped timer over the `sync_duration` histogram using the stored clock.
    pub(super) fn sync_timer(&self) -> ScopedTimer<E> {
        self.sync_duration.scoped(&self.clock)
    }

    /// Refreshes the position gauges from the journal's current bounds.
    ///
    /// * `size` - logical end position of the journal.
    /// * `pruning_boundary` - oldest readable item position.
    /// * `items_per_section` - number of item positions covered by one section.
    ///
    /// `retained` is `size - pruning_boundary`, saturating at zero. `tail_items` counts the
    /// retained items that fall in the section holding position `size - 1`, and is zero when
    /// nothing is retained.
    ///
    /// This method never panics: gauge update failures are ignored, an inverted range
    /// (`size < pruning_boundary`) is reported as empty, and `items_per_section == 0` is
    /// treated as a single unbounded section, so `tail_items` equals `retained`.
    pub(super) fn update(&self, size: u64, pruning_boundary: u64, items_per_section: u64) {
        let _ = self.size.try_set(size);
        let _ = self.pruning_boundary.try_set(pruning_boundary);

        let retained = size.saturating_sub(pruning_boundary);
        let _ = self.retained.try_set(retained);

        let tail_items = if retained == 0 {
            0
        } else {
            // `retained > 0` implies `size > pruning_boundary`, hence `size >= 1`.
            // `checked_div` guards against a zero section length, and the product cannot
            // overflow because it never exceeds `size - 1`.
            let tail_section_start = (size - 1)
                .checked_div(items_per_section)
                .map_or(0, |section| section * items_per_section);
            // Both operands of `max` are strictly below `size`, so this cannot underflow.
            size - pruning_boundary.max(tail_section_start)
        };
        let _ = self.tail_items.try_set(tail_items);
    }
}
/// Metrics bundle combining the shared journal metrics with cache hit/miss counters.
///
/// Dereferences to [`CommonMetrics`], so the shared gauges, counters, and timers can be used
/// directly on this type.
pub(super) struct FixedMetrics<E: Clock> {
    /// Shared journal metrics (the `Deref` target).
    common: CommonMetrics<E>,
    /// Synchronous-read hit/miss counters.
    cache: CacheMetrics,
}
impl<E: RuntimeMetrics + Clock> FixedMetrics<E> {
    /// Registers the cache counters, then the shared journal metrics, on `context`.
    ///
    /// `context` is wrapped in an `Arc` and handed to [`CommonMetrics`], which keeps it as
    /// the clock for its timers.
    pub(super) fn new(context: E) -> Self {
        let shared = Arc::new(context);
        let registry: &E = &shared;

        // Cache counters are registered before the shared metrics.
        let cache = CacheMetrics {
            hits: registry.counter("cache_hits", "Number of fixed items read synchronously"),
            misses: registry.counter(
                "cache_misses",
                "Number of fixed items not satisfied synchronously, including pruned or out-of-range \
                 try_read_sync probes that returned None",
            ),
        };

        Self {
            common: CommonMetrics::new(shared),
            cache,
        }
    }
}
impl<E: Clock> FixedMetrics<E> {
    /// Adds `hits` to the `cache_hits` counter.
    pub(super) fn record_cache_hits(&self, hits: u64) {
        let counter = &self.cache.hits;
        counter.inc_by(hits);
    }

    /// Adds `misses` to the `cache_misses` counter.
    pub(super) fn record_cache_misses(&self, misses: u64) {
        let counter = &self.cache.misses;
        counter.inc_by(misses);
    }
}
/// Exposes the shared [`CommonMetrics`] directly on a [`FixedMetrics`] value.
impl<E: Clock> Deref for FixedMetrics<E> {
    type Target = CommonMetrics<E>;

    /// Borrows the embedded shared metrics.
    fn deref(&self) -> &Self::Target {
        let Self { common, .. } = self;
        common
    }
}
/// Metrics bundle combining the shared journal metrics with commit metrics.
///
/// Dereferences to [`CommonMetrics`], so the shared gauges, counters, and timers can be used
/// directly on this type.
pub(super) struct VariableMetrics<E: Clock> {
    /// Shared journal metrics (the `Deref` target).
    common: CommonMetrics<E>,
    /// Commit call counter and duration histogram.
    commit: CommitMetrics,
}
impl<E: RuntimeMetrics + Clock> VariableMetrics<E> {
    /// Registers the commit metrics, then the shared journal metrics, on `context`.
    ///
    /// `context` is wrapped in an `Arc` and handed to [`CommonMetrics`], which keeps it as
    /// the clock for its timers.
    pub(super) fn new(context: E) -> Self {
        let shared = Arc::new(context);
        let registry: &E = &shared;

        // Commit metrics are registered before the shared metrics.
        let commit_calls = registry.counter("commit_calls", "Number of commit calls");
        let commit_duration =
            duration_histogram(registry, "commit_duration", "Duration of commit calls");

        let common = CommonMetrics::new(shared);
        let commit = CommitMetrics {
            calls: commit_calls,
            duration: Timed::new(commit_duration),
        };
        Self { common, commit }
    }
}
impl<E: Clock> VariableMetrics<E> {
    /// Starts a scoped timer over the `commit_duration` histogram, using the clock held by
    /// the shared metrics.
    pub(super) fn commit_timer(&self) -> ScopedTimer<E> {
        let clock = &self.common.clock;
        self.commit.duration.scoped(clock)
    }

    /// Increments the `commit_calls` counter by one.
    pub(super) fn record_commit(&self) {
        let counter = &self.commit.calls;
        counter.inc();
    }
}
/// Exposes the shared [`CommonMetrics`] directly on a [`VariableMetrics`] value.
impl<E: Clock> Deref for VariableMetrics<E> {
    type Target = CommonMetrics<E>;

    /// Borrows the embedded shared metrics.
    fn deref(&self) -> &Self::Target {
        let Self { common, .. } = self;
        common
    }
}