use crate::Error;
use commonware_utils::StableBuf;
use prometheus_client::{
metrics::{counter::Counter, gauge::Gauge},
registry::Registry,
};
use std::{ops::Deref, sync::Arc};
pub struct Metrics {
pub open_blobs: Gauge,
pub storage_reads: Counter,
pub storage_read_bytes: Counter,
pub storage_writes: Counter,
pub storage_write_bytes: Counter,
}
impl Metrics {
fn new(registry: &mut Registry) -> Self {
let metrics = Self {
open_blobs: Gauge::default(),
storage_reads: Counter::default(),
storage_read_bytes: Counter::default(),
storage_writes: Counter::default(),
storage_write_bytes: Counter::default(),
};
registry.register(
"open_blobs",
"Number of open blobs",
metrics.open_blobs.clone(),
);
registry.register(
"storage_reads",
"Total number of disk reads",
metrics.storage_reads.clone(),
);
registry.register(
"storage_read_bytes",
"Total amount of data read from disk",
metrics.storage_read_bytes.clone(),
);
registry.register(
"storage_writes",
"Total number of disk writes",
metrics.storage_writes.clone(),
);
registry.register(
"storage_write_bytes",
"Total amount of data written to disk",
metrics.storage_write_bytes.clone(),
);
metrics
}
}
#[derive(Clone)]
pub struct Storage<S> {
inner: S,
metrics: Arc<Metrics>,
}
impl<S> Storage<S> {
pub fn new(inner: S, registry: &mut Registry) -> Self {
Self {
inner,
metrics: Metrics::new(registry).into(),
}
}
}
impl<S: crate::Storage> crate::Storage for Storage<S> {
type Blob = Blob<S::Blob>;
async fn open(&self, partition: &str, name: &[u8]) -> Result<(Self::Blob, u64), Error> {
self.metrics.open_blobs.inc();
let (inner, len) = self.inner.open(partition, name).await?;
Ok((
Blob {
inner,
metrics: Arc::new(MetricsHandle(self.metrics.clone())),
},
len,
))
}
async fn remove(&self, partition: &str, name: Option<&[u8]>) -> Result<(), Error> {
self.inner.remove(partition, name).await
}
async fn scan(&self, partition: &str) -> Result<Vec<Vec<u8>>, Error> {
self.inner.scan(partition).await
}
}
#[derive(Clone)]
pub struct Blob<B> {
inner: B,
metrics: Arc<MetricsHandle>,
}
struct MetricsHandle(Arc<Metrics>);
impl Deref for MetricsHandle {
type Target = Metrics;
fn deref(&self) -> &Self::Target {
&self.0
}
}
impl Drop for MetricsHandle {
fn drop(&mut self) {
self.0.open_blobs.dec();
}
}
impl<B: crate::Blob> crate::Blob for Blob<B> {
async fn read_at(
&self,
buf: impl Into<StableBuf> + Send,
offset: u64,
) -> Result<StableBuf, Error> {
let read = self.inner.read_at(buf, offset).await?;
self.metrics.storage_reads.inc();
self.metrics.storage_read_bytes.inc_by(read.len() as u64);
Ok(read)
}
async fn write_at(&self, buf: impl Into<StableBuf> + Send, offset: u64) -> Result<(), Error> {
let buf = buf.into();
let buf_len = buf.len();
self.inner.write_at(buf, offset).await?;
self.metrics.storage_writes.inc();
self.metrics.storage_write_bytes.inc_by(buf_len as u64);
Ok(())
}
async fn resize(&self, len: u64) -> Result<(), Error> {
self.inner.resize(len).await
}
async fn sync(&self) -> Result<(), Error> {
self.inner.sync().await
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::{
storage::{memory::Storage as MemoryStorage, tests::run_storage_tests},
Blob, Storage as _,
};
use prometheus_client::registry::Registry;
#[tokio::test]
async fn test_metered_storage() {
let mut registry = Registry::default();
let inner = MemoryStorage::default();
let storage = Storage::new(inner, &mut registry);
run_storage_tests(storage).await;
}
#[tokio::test]
async fn test_metered_blob_metrics() {
let mut registry = Registry::default();
let inner = MemoryStorage::default();
let storage = Storage::new(inner, &mut registry);
let (blob, _) = storage.open("partition", b"test_blob").await.unwrap();
let open_blobs = storage.metrics.open_blobs.get();
assert_eq!(
open_blobs, 1,
"open_blobs metric was not incremented after opening a blob"
);
blob.write_at(b"hello world".to_vec(), 0).await.unwrap();
let writes = storage.metrics.storage_writes.get();
let write_bytes = storage.metrics.storage_write_bytes.get();
assert_eq!(
writes, 1,
"storage_writes metric was not incremented after write"
);
assert_eq!(
write_bytes, 11,
"storage_write_bytes metric was not updated correctly after write"
);
let read = blob.read_at(vec![0; 11], 0).await.unwrap();
assert_eq!(read.as_ref(), b"hello world");
let reads = storage.metrics.storage_reads.get();
let read_bytes = storage.metrics.storage_read_bytes.get();
assert_eq!(
reads, 1,
"storage_reads metric was not incremented after read"
);
assert_eq!(
read_bytes, 11,
"storage_read_bytes metric was not updated correctly after read"
);
blob.sync().await.unwrap();
drop(blob);
let open_blobs_after_drop = storage.metrics.open_blobs.get();
assert_eq!(
open_blobs_after_drop, 0,
"open_blobs metric was not decremented after dropping the blob"
);
}
#[tokio::test]
async fn test_metered_blob_multiple_blobs() {
let mut registry = Registry::default();
let inner = MemoryStorage::default();
let storage = Storage::new(inner, &mut registry);
let (blob1, _) = storage.open("partition", b"blob1").await.unwrap();
let (blob2, _) = storage.open("partition", b"blob2").await.unwrap();
let open_blobs = storage.metrics.open_blobs.get();
assert_eq!(
open_blobs, 2,
"open_blobs metric was not updated correctly after opening multiple blobs"
);
blob1.sync().await.unwrap();
drop(blob1);
let open_blobs_after_close_one = storage.metrics.open_blobs.get();
assert_eq!(
open_blobs_after_close_one, 1,
"open_blobs metric was not decremented correctly after dropping one blob"
);
blob2.sync().await.unwrap();
drop(blob2);
let open_blobs_after_drop_all = storage.metrics.open_blobs.get();
assert_eq!(
open_blobs_after_drop_all, 0,
"open_blobs metric was not decremented to zero after dropping all blobs"
);
}
#[tokio::test]
async fn test_cloned_blobs_share_metrics() {
let mut registry = Registry::default();
let inner = MemoryStorage::default();
let storage = Storage::new(inner, &mut registry);
let (blob, _) = storage.open("partition", b"test_blob").await.unwrap();
assert_eq!(
storage.metrics.open_blobs.get(),
1,
"open_blobs metric was not incremented after opening a blob"
);
let clone1 = blob.clone();
let clone2 = blob.clone();
assert_eq!(
storage.metrics.open_blobs.get(),
1,
"open_blobs metric should not change when blobs are cloned"
);
blob.write_at(b"hello".to_vec(), 0).await.unwrap();
clone1.write_at(b"world".to_vec(), 5).await.unwrap();
let _ = clone1.read_at(vec![0; 10], 0).await.unwrap();
let _ = clone2.read_at(vec![0; 10], 0).await.unwrap();
assert_eq!(
storage.metrics.storage_writes.get(),
2,
"Operations on cloned blobs should update shared metrics"
);
assert_eq!(
storage.metrics.storage_reads.get(),
2,
"Operations on cloned blobs should update shared metrics"
);
drop(clone1);
assert_eq!(
storage.metrics.open_blobs.get(),
1,
"open_blobs metric should not change when individual clones are dropped"
);
drop(clone2);
assert_eq!(
storage.metrics.open_blobs.get(),
1,
"open_blobs metric should not change when individual clones are dropped"
);
drop(blob);
assert_eq!(
storage.metrics.open_blobs.get(),
0,
"open_blobs metric should be decremented only when the last blob reference is dropped"
);
}
}