use crate::{Buf, Error, IoBufs, IoBufsMut};
use prometheus_client::{
metrics::{counter::Counter, gauge::Gauge},
registry::Registry,
};
use std::{
ops::{Deref, RangeInclusive},
sync::Arc,
};
/// Prometheus metrics recorded by the metered storage wrapper.
///
/// Each field is registered on a `Registry` by `Metrics::new`.
pub struct Metrics {
/// Number of open blobs.
pub open_blobs: Gauge,
/// Total number of disk reads.
pub storage_reads: Counter,
/// Total amount of data read from disk.
pub storage_read_bytes: Counter,
/// Total number of disk writes.
pub storage_writes: Counter,
/// Total amount of data written to disk.
pub storage_write_bytes: Counter,
}
impl Metrics {
fn new(registry: &mut Registry) -> Self {
let metrics = Self {
open_blobs: Gauge::default(),
storage_reads: Counter::default(),
storage_read_bytes: Counter::default(),
storage_writes: Counter::default(),
storage_write_bytes: Counter::default(),
};
registry.register(
"open_blobs",
"Number of open blobs",
metrics.open_blobs.clone(),
);
registry.register(
"storage_reads",
"Total number of disk reads",
metrics.storage_reads.clone(),
);
registry.register(
"storage_read_bytes",
"Total amount of data read from disk",
metrics.storage_read_bytes.clone(),
);
registry.register(
"storage_writes",
"Total number of disk writes",
metrics.storage_writes.clone(),
);
registry.register(
"storage_write_bytes",
"Total amount of data written to disk",
metrics.storage_write_bytes.clone(),
);
metrics
}
}
/// Storage wrapper that forwards every operation to the wrapped storage `S` and
/// records metrics for the blobs opened through it.
#[derive(Clone)]
pub struct Storage<S> {
/// The wrapped storage that operations are forwarded to.
inner: S,
/// Metrics shared (through the `Arc`) with every blob opened via this wrapper.
metrics: Arc<Metrics>,
}
impl<S> Storage<S> {
pub fn new(inner: S, registry: &mut Registry) -> Self {
Self {
inner,
metrics: Metrics::new(registry).into(),
}
}
pub const fn inner(&self) -> &S {
&self.inner
}
}
impl<S: crate::Storage> crate::Storage for Storage<S> {
    type Blob = Blob<S::Blob>;

    /// Opens a blob through the wrapped storage and wraps it so that its reads
    /// and writes are metered.
    ///
    /// The `open_blobs` gauge is incremented only once the wrapped storage has
    /// returned a blob. It is decremented again when the last clone of the
    /// returned blob is dropped (see `MetricsHandle`'s `Drop` implementation).
    ///
    /// Returns the metered blob together with the length and version reported by
    /// the wrapped storage.
    ///
    /// # Errors
    ///
    /// Propagates any error returned by the wrapped storage's `open_versioned`;
    /// in that case no metric is modified.
    async fn open_versioned(
        &self,
        partition: &str,
        name: &[u8],
        versions: RangeInclusive<u16>,
    ) -> Result<(Self::Blob, u64, u16), Error> {
        let (inner, len, blob_version) =
            self.inner.open_versioned(partition, name, versions).await?;

        // Count the blob only after it has actually been opened. Incrementing
        // before the await would leave the gauge permanently too high whenever
        // the open fails or this future is dropped while pending, because no
        // `MetricsHandle` would exist to decrement it.
        self.metrics.open_blobs.inc();
        let metrics = Arc::new(MetricsHandle(Arc::clone(&self.metrics)));

        Ok((Blob { inner, metrics }, len, blob_version))
    }

    /// Forwards the removal to the wrapped storage; no metric is recorded.
    async fn remove(&self, partition: &str, name: Option<&[u8]>) -> Result<(), Error> {
        self.inner.remove(partition, name).await
    }

    /// Forwards the scan to the wrapped storage; no metric is recorded.
    async fn scan(&self, partition: &str) -> Result<Vec<Vec<u8>>, Error> {
        self.inner.scan(partition).await
    }
}
/// Blob wrapper that forwards operations to the wrapped blob `B` and records
/// read and write metrics.
#[derive(Clone)]
pub struct Blob<B> {
/// The wrapped blob that operations are forwarded to.
inner: B,
/// Handle to the shared metrics. Clones of this blob share the same handle, so
/// the handle's `Drop` (which decrements `open_blobs`) runs once, when the last
/// clone is dropped.
metrics: Arc<MetricsHandle>,
}
/// Wrapper around the shared [`Metrics`] that dereferences to them and whose
/// `Drop` implementation decrements the `open_blobs` gauge.
struct MetricsHandle(Arc<Metrics>);
impl Deref for MetricsHandle {
type Target = Metrics;
fn deref(&self) -> &Self::Target {
&self.0
}
}
impl Drop for MetricsHandle {
fn drop(&mut self) {
self.0.open_blobs.dec();
}
}
impl<B: crate::Blob> crate::Blob for Blob<B> {
    /// Forwards the read (`offset`, `len`) to the wrapped blob.
    ///
    /// On success, `storage_reads` is incremented by one and `storage_read_bytes`
    /// by `len`. A failed read is propagated and records nothing.
    async fn read_at(&self, offset: u64, len: usize) -> Result<IoBufsMut, Error> {
        let filled = self.inner.read_at(offset, len).await?;

        let metrics = &self.metrics;
        metrics.storage_reads.inc();
        metrics.storage_read_bytes.inc_by(len as u64);

        Ok(filled)
    }

    /// Forwards the read (`offset`, `len`, caller-supplied `bufs`) to the wrapped
    /// blob.
    ///
    /// On success, `storage_reads` is incremented by one and `storage_read_bytes`
    /// by `len`. A failed read is propagated and records nothing.
    async fn read_at_buf(
        &self,
        offset: u64,
        len: usize,
        bufs: impl Into<IoBufsMut> + Send,
    ) -> Result<IoBufsMut, Error> {
        let filled = self.inner.read_at_buf(offset, len, bufs).await?;

        let metrics = &self.metrics;
        metrics.storage_reads.inc();
        metrics.storage_read_bytes.inc_by(len as u64);

        Ok(filled)
    }

    /// Forwards the write of `bufs` at `offset` to the wrapped blob.
    ///
    /// On success, `storage_writes` is incremented by one and
    /// `storage_write_bytes` by the number of bytes remaining in `bufs` before
    /// the write. A failed write is propagated and records nothing.
    async fn write_at(&self, offset: u64, bufs: impl Into<IoBufs> + Send) -> Result<(), Error> {
        let payload = bufs.into();
        // Measure the payload up front: it is moved into the inner write below.
        let payload_len = payload.remaining();
        self.inner.write_at(offset, payload).await?;

        let metrics = &self.metrics;
        metrics.storage_writes.inc();
        metrics.storage_write_bytes.inc_by(payload_len as u64);

        Ok(())
    }

    /// Forwards the resize to the wrapped blob; no metric is recorded.
    async fn resize(&self, len: u64) -> Result<(), Error> {
        self.inner.resize(len).await
    }

    /// Forwards the sync to the wrapped blob; no metric is recorded.
    async fn sync(&self) -> Result<(), Error> {
        self.inner.sync().await
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::{
        storage::{memory::Storage as MemoryStorage, tests::run_storage_tests},
        Blob, BufferPool, BufferPoolConfig, Storage as _,
    };
    use prometheus_client::registry::Registry;

    /// Builds a buffer pool for the in-memory storage, registered on a registry
    /// that is discarded when this function returns.
    fn test_pool() -> BufferPool {
        let config = BufferPoolConfig::for_storage();
        let mut scratch = Registry::default();
        BufferPool::new(config, &mut scratch)
    }

    /// The metered wrapper must pass the shared storage test suite.
    #[tokio::test]
    async fn test_metered_storage() {
        let mut registry = Registry::default();
        let storage = Storage::new(MemoryStorage::new(test_pool()), &mut registry);
        run_storage_tests(storage).await;
    }

    /// Open, write, read and drop each update the corresponding metric.
    #[tokio::test]
    async fn test_metered_blob_metrics() {
        let mut registry = Registry::default();
        let storage = Storage::new(MemoryStorage::new(test_pool()), &mut registry);
        let metrics = &storage.metrics;

        // Opening a blob raises the gauge.
        let (blob, _) = storage.open("partition", b"test_blob").await.unwrap();
        assert_eq!(
            metrics.open_blobs.get(),
            1,
            "open_blobs metric was not incremented after opening a blob"
        );

        // One write of 11 bytes.
        blob.write_at(0, b"hello world").await.unwrap();
        assert_eq!(
            metrics.storage_writes.get(),
            1,
            "storage_writes metric was not incremented after write"
        );
        assert_eq!(
            metrics.storage_write_bytes.get(),
            11,
            "storage_write_bytes metric was not updated correctly after write"
        );

        // One read of the same 11 bytes.
        let read = blob.read_at(0, 11).await.unwrap();
        assert_eq!(read.coalesce(), b"hello world");
        assert_eq!(
            metrics.storage_reads.get(),
            1,
            "storage_reads metric was not incremented after read"
        );
        assert_eq!(
            metrics.storage_read_bytes.get(),
            11,
            "storage_read_bytes metric was not updated correctly after read"
        );

        // Dropping the only handle lowers the gauge again.
        blob.sync().await.unwrap();
        drop(blob);
        assert_eq!(
            metrics.open_blobs.get(),
            0,
            "open_blobs metric was not decremented after dropping the blob"
        );
    }

    /// The gauge tracks several independently opened blobs.
    #[tokio::test]
    async fn test_metered_blob_multiple_blobs() {
        let mut registry = Registry::default();
        let storage = Storage::new(MemoryStorage::new(test_pool()), &mut registry);
        let metrics = &storage.metrics;

        let (first, _) = storage.open("partition", b"blob1").await.unwrap();
        let (second, _) = storage.open("partition", b"blob2").await.unwrap();
        assert_eq!(
            metrics.open_blobs.get(),
            2,
            "open_blobs metric was not updated correctly after opening multiple blobs"
        );

        first.sync().await.unwrap();
        drop(first);
        assert_eq!(
            metrics.open_blobs.get(),
            1,
            "open_blobs metric was not decremented correctly after dropping one blob"
        );

        second.sync().await.unwrap();
        drop(second);
        assert_eq!(
            metrics.open_blobs.get(),
            0,
            "open_blobs metric was not decremented to zero after dropping all blobs"
        );
    }

    /// Clones of one blob share a single metrics handle: cloning and dropping
    /// clones leaves the gauge alone until the last reference goes away.
    #[tokio::test]
    async fn test_cloned_blobs_share_metrics() {
        let mut registry = Registry::default();
        let storage = Storage::new(MemoryStorage::new(test_pool()), &mut registry);
        let metrics = &storage.metrics;

        let (blob, _) = storage.open("partition", b"test_blob").await.unwrap();
        let gauge_after_open = metrics.open_blobs.get();
        assert_eq!(
            gauge_after_open, 1,
            "open_blobs metric was not incremented after opening a blob"
        );

        let first_clone = blob.clone();
        let second_clone = blob.clone();
        let gauge_after_clone = metrics.open_blobs.get();
        assert_eq!(
            gauge_after_clone, 1,
            "open_blobs metric should not change when blobs are cloned"
        );

        // Two writes and two reads spread across the original and its clones.
        blob.write_at(0, b"hello").await.unwrap();
        first_clone.write_at(5, b"world").await.unwrap();
        let _ = first_clone.read_at(0, 10).await.unwrap();
        let _ = second_clone.read_at(0, 10).await.unwrap();
        let total_writes = metrics.storage_writes.get();
        assert_eq!(
            total_writes, 2,
            "Operations on cloned blobs should update shared metrics"
        );
        let total_reads = metrics.storage_reads.get();
        assert_eq!(
            total_reads, 2,
            "Operations on cloned blobs should update shared metrics"
        );

        drop(first_clone);
        let gauge_after_first_drop = metrics.open_blobs.get();
        assert_eq!(
            gauge_after_first_drop, 1,
            "open_blobs metric should not change when individual clones are dropped"
        );

        drop(second_clone);
        let gauge_after_second_drop = metrics.open_blobs.get();
        assert_eq!(
            gauge_after_second_drop, 1,
            "open_blobs metric should not change when individual clones are dropped"
        );

        drop(blob);
        let gauge_after_last_drop = metrics.open_blobs.get();
        assert_eq!(
            gauge_after_last_drop, 0,
            "open_blobs metric should be decremented only when the last blob reference is dropped"
        );
    }
}