use std::sync::Arc;
use crate::storage::{MergeOptions, PutOptions};
use crate::{
BytesRange, Record, StorageError, StorageIterator, StorageRead, StorageResult, Ttl,
storage::{
MergeOperator, MergeRecordOp, PutRecordOp, RecordOp, Storage, StorageSnapshot,
WriteOptions, WriteResult,
},
};
use async_trait::async_trait;
use bytes::Bytes;
use slatedb::config::ScanOptions;
use slatedb::{
Db, DbIterator, DbReader, DbSnapshot, MergeOperator as SlateDbMergeOperator,
MergeOperatorError, WriteBatch, config::WriteOptions as SlateDbWriteOptions,
};
use tokio::sync::watch;
#[cfg(feature = "metrics")]
#[derive(Debug)]
/// Newtype adapter exposing a SlateDB [`slatedb::stats::ReadableStat`] as a
/// `prometheus_client` metric so it can be registered in a Prometheus registry.
struct ReadableStatGauge(std::sync::Arc<dyn slatedb::stats::ReadableStat>);
#[cfg(feature = "metrics")]
impl prometheus_client::encoding::EncodeMetric for ReadableStatGauge {
    /// Encodes the stat's current value as a single sample.
    // NOTE(review): `encode_gauge` is used even when `metric_type` reports
    // Counter — confirm prometheus_client accepts gauge-style encoding for
    // counter-typed metrics in this registry setup.
    fn encode(
        &self,
        mut encoder: prometheus_client::encoding::MetricEncoder,
    ) -> Result<(), std::fmt::Error> {
        encoder.encode_gauge(&self.0.get())
    }
    /// Maps SlateDB's metric type onto the Prometheus equivalent.
    fn metric_type(&self) -> prometheus_client::metrics::MetricType {
        match self.0.metric_type() {
            slatedb::stats::MetricType::Counter => prometheus_client::metrics::MetricType::Counter,
            slatedb::stats::MetricType::Gauge => prometheus_client::metrics::MetricType::Gauge,
        }
    }
}
/// Bridges the crate-level [`MergeOperator`] trait onto SlateDB's own
/// merge-operator interface so one implementation serves both.
pub struct SlateDbMergeOperatorAdapter {
    operator: Arc<dyn MergeOperator>,
}
impl SlateDbMergeOperatorAdapter {
    /// Wraps a crate-level merge operator for installation into SlateDB.
    fn new(operator: Arc<dyn MergeOperator>) -> Self {
        Self { operator }
    }
}
impl SlateDbMergeOperator for SlateDbMergeOperatorAdapter {
    /// Merges a single operand by delegating to the batch entry point with a
    /// one-element operand slice.
    fn merge(
        &self,
        key: &Bytes,
        existing_value: Option<Bytes>,
        value: Bytes,
    ) -> Result<Bytes, MergeOperatorError> {
        let operands = [value];
        Ok(self.operator.merge_batch(key, existing_value, &operands))
    }

    /// Merges a batch of operands onto an optional existing value.
    ///
    /// Rejects the degenerate case where there is neither an existing value
    /// nor any operand, since there is nothing to merge.
    fn merge_batch(
        &self,
        key: &Bytes,
        existing_value: Option<Bytes>,
        operands: &[Bytes],
    ) -> Result<Bytes, MergeOperatorError> {
        let nothing_to_merge = operands.is_empty() && existing_value.is_none();
        if nothing_to_merge {
            Err(MergeOperatorError::EmptyBatch)
        } else {
            Ok(self.operator.merge_batch(key, existing_value, operands))
        }
    }
}
/// Scan options shared by every iterator-producing read path in this module.
fn default_scan_options() -> ScanOptions {
    ScanOptions {
        // Never surface uncommitted data from scans.
        dirty: false,
        durability_filter: Default::default(),
        // Prefetch 1 MiB ahead and keep fetched blocks in the block cache.
        read_ahead_bytes: 1024 * 1024,
        cache_blocks: true,
        max_fetch_tasks: 4,
    }
}
/// [`Storage`] implementation backed by an embedded SlateDB database.
pub struct SlateDbStorage {
    pub(super) db: Arc<Db>,
    // Broadcasts the latest durable sequence number to subscribers
    // (see `subscribe_durable`).
    durable_tx: watch::Sender<u64>,
    // Handle used by `close` to stop the background task that forwards
    // SlateDB durability updates into `durable_tx`.
    durable_bridge_abort: tokio::task::AbortHandle,
}
impl SlateDbStorage {
    /// Wraps an open SlateDB handle and spawns a background task that
    /// bridges SlateDB's durability watermark into a locally owned `watch`
    /// channel (exposed through `subscribe_durable`).
    pub fn new(db: Arc<Db>) -> Self {
        let slate_rx = db.subscribe();
        let (durable_tx, _) = watch::channel(slate_rx.borrow().durable_seq);
        let task = tokio::spawn({
            let tx = durable_tx.clone();
            async move {
                let mut slate_rx = slate_rx;
                while slate_rx.changed().await.is_ok() {
                    let durable_seq = slate_rx.borrow_and_update().durable_seq;
                    // Use `send_replace` instead of `send`: `send` errors
                    // when no receiver currently exists, which previously
                    // terminated this bridge permanently if a durability
                    // update arrived before the first `subscribe_durable`
                    // call. `send_replace` always stores the latest value,
                    // so late subscribers still observe fresh watermarks.
                    tx.send_replace(durable_seq);
                }
                // The loop ends when SlateDB drops its side of the channel;
                // `close` also aborts this task explicitly.
            }
        });
        Self {
            db,
            durable_tx,
            durable_bridge_abort: task.abort_handle(),
        }
    }

    /// Wraps a crate-level merge operator so it can be installed on a
    /// SlateDB builder (see [`SlateDbMergeOperatorAdapter`]).
    pub fn merge_operator_adapter(operator: Arc<dyn MergeOperator>) -> SlateDbMergeOperatorAdapter {
        SlateDbMergeOperatorAdapter::new(operator)
    }
}
/// Read path for [`SlateDbStorage`]: point lookups and range scans are
/// served directly from the live database handle.
#[async_trait]
impl StorageRead for SlateDbStorage {
    #[tracing::instrument(level = "trace", skip_all)]
    async fn get(&self, key: Bytes) -> StorageResult<Option<Record>> {
        let found = self
            .db
            .get(&key)
            .await
            .map_err(StorageError::from_storage)?;
        Ok(found.map(|value| Record::new(key, value)))
    }

    #[tracing::instrument(level = "trace", skip_all)]
    async fn scan_iter(
        &self,
        range: BytesRange,
    ) -> StorageResult<Box<dyn StorageIterator + Send + 'static>> {
        let options = default_scan_options();
        let iter = self
            .db
            .scan_with_options(range, &options)
            .await
            .map_err(StorageError::from_storage)?;
        Ok(Box::new(SlateDbIterator { iter }))
    }
}
/// [`StorageIterator`] wrapper over a SlateDB scan iterator.
pub(super) struct SlateDbIterator {
    iter: DbIterator,
}
/// Adapts SlateDB iterator entries into crate-level [`Record`]s.
#[async_trait]
impl StorageIterator for SlateDbIterator {
    #[tracing::instrument(level = "trace", skip_all)]
    async fn next(&mut self) -> StorageResult<Option<Record>> {
        let entry = self.iter.next().await.map_err(StorageError::from_storage)?;
        Ok(entry.map(|e| Record::new(e.key, e.value)))
    }
}
/// Point-in-time read view over a SlateDB snapshot.
pub struct SlateDbStorageSnapshot {
    snapshot: Arc<DbSnapshot>,
}
/// Read path for [`SlateDbStorageSnapshot`]; mirrors the live storage
/// reads but is served from the captured snapshot.
#[async_trait]
impl StorageRead for SlateDbStorageSnapshot {
    #[tracing::instrument(level = "trace", skip_all)]
    async fn get(&self, key: Bytes) -> StorageResult<Option<Record>> {
        let found = self
            .snapshot
            .get(&key)
            .await
            .map_err(StorageError::from_storage)?;
        Ok(found.map(|value| Record::new(key, value)))
    }

    #[tracing::instrument(level = "trace", skip_all)]
    async fn scan_iter(
        &self,
        range: BytesRange,
    ) -> StorageResult<Box<dyn StorageIterator + Send + 'static>> {
        let scan_options = default_scan_options();
        let iter = self
            .snapshot
            .scan_with_options(range, &scan_options)
            .await
            .map_err(StorageError::from_storage)?;
        Ok(Box::new(SlateDbIterator { iter }))
    }
}
// `StorageSnapshot` adds nothing beyond `StorageRead` here, so this is a
// marker implementation.
#[async_trait]
impl StorageSnapshot for SlateDbStorageSnapshot {}
#[async_trait]
impl Storage for SlateDbStorage {
async fn apply_with_options(
&self,
records: Vec<RecordOp>,
options: WriteOptions,
) -> StorageResult<WriteResult> {
let mut batch = WriteBatch::new();
for op in records {
match op {
RecordOp::Put(op) => {
batch.put_with_options(op.record.key, op.record.value, &op.options.into())
}
RecordOp::Merge(op) => {
batch.merge_with_options(op.record.key, op.record.value, &op.options.into())
}
RecordOp::Delete(key) => batch.delete(key),
}
}
let slate_options = SlateDbWriteOptions {
await_durable: options.await_durable,
};
let write_handle = self
.db
.write_with_options(batch, &slate_options)
.await
.map_err(StorageError::from_storage)?;
Ok(WriteResult {
seqnum: write_handle.seqnum(),
})
}
async fn put_with_options(
&self,
records: Vec<PutRecordOp>,
options: WriteOptions,
) -> StorageResult<WriteResult> {
let mut batch = WriteBatch::new();
for op in records {
batch.put_with_options(op.record.key, op.record.value, &op.options.into());
}
let slate_options = SlateDbWriteOptions {
await_durable: options.await_durable,
};
let write_handle = self
.db
.write_with_options(batch, &slate_options)
.await
.map_err(StorageError::from_storage)?;
Ok(WriteResult {
seqnum: write_handle.seqnum(),
})
}
async fn merge_with_options(
&self,
records: Vec<MergeRecordOp>,
options: WriteOptions,
) -> StorageResult<WriteResult> {
let mut batch = WriteBatch::new();
for op in records {
batch.merge_with_options(op.record.key, op.record.value, &op.options.into());
}
let slate_options = SlateDbWriteOptions {
await_durable: options.await_durable,
};
let write_handle = self
.db
.write_with_options(batch, &slate_options)
.await
.map_err(|e| {
let error_msg = e.to_string();
if error_msg.contains("merge operator") || error_msg.contains("not configured") {
StorageError::Storage(
"Merge operator not configured for this database".to_string(),
)
} else {
StorageError::from_storage(e)
}
})?;
Ok(WriteResult {
seqnum: write_handle.seqnum(),
})
}
fn subscribe_durable(&self) -> watch::Receiver<u64> {
self.durable_tx.subscribe()
}
async fn snapshot(&self) -> StorageResult<Arc<dyn StorageSnapshot>> {
let snapshot = self
.db
.snapshot()
.await
.map_err(StorageError::from_storage)?;
Ok(Arc::new(SlateDbStorageSnapshot { snapshot }))
}
async fn flush(&self) -> StorageResult<()> {
self.db.flush().await.map_err(StorageError::from_storage)?;
Ok(())
}
async fn close(&self) -> StorageResult<()> {
self.durable_bridge_abort.abort();
self.db.close().await.map_err(StorageError::from_storage)?;
Ok(())
}
#[cfg(feature = "metrics")]
fn register_metrics(&self, registry: &mut prometheus_client::registry::Registry) {
let stat_registry = self.db.metrics();
let mut seen = std::collections::HashSet::new();
for name in stat_registry.names() {
if let Some(stat) = stat_registry.lookup(name) {
let sanitized: String = name
.chars()
.map(|c| {
if c.is_ascii_alphanumeric() || c == '_' {
c
} else {
'_'
}
})
.collect();
let prom_name = format!("slatedb_{sanitized}");
if !seen.insert(prom_name.clone()) {
tracing::warn!(
"Duplicate metric name after sanitization: {prom_name:?} (from {name:?}, skipped)"
);
continue;
}
registry.register(
&prom_name,
format!("SlateDB {name}"),
ReadableStatGauge(stat),
);
}
}
}
}
/// Maps the crate-level TTL policy onto SlateDB's equivalent enum.
impl From<Ttl> for slatedb::config::Ttl {
    fn from(value: Ttl) -> Self {
        match value {
            Ttl::Default => slatedb::config::Ttl::Default,
            Ttl::NoExpiry => slatedb::config::Ttl::NoExpiry,
            Ttl::ExpireAfter(ts) => slatedb::config::Ttl::ExpireAfter(ts),
        }
    }
}
/// Converts crate-level put options into SlateDB put options (TTL only).
impl From<PutOptions> for slatedb::config::PutOptions {
    fn from(value: PutOptions) -> Self {
        Self {
            ttl: value.ttl.into(),
        }
    }
}
/// Converts crate-level merge options into SlateDB merge options (TTL only).
impl From<MergeOptions> for slatedb::config::MergeOptions {
    fn from(value: MergeOptions) -> Self {
        Self {
            ttl: value.ttl.into(),
        }
    }
}
/// Read-only [`StorageRead`] implementation backed by a SlateDB `DbReader`,
/// allowing reads without taking writer ownership of the database.
pub struct SlateDbStorageReader {
    reader: Arc<DbReader>,
}
impl SlateDbStorageReader {
    /// Wraps an already-opened SlateDB reader handle.
    pub fn new(reader: Arc<DbReader>) -> Self {
        Self { reader }
    }
}
/// Read path for [`SlateDbStorageReader`]; identical in shape to the live
/// storage reads but served through the read-only reader handle.
#[async_trait]
impl StorageRead for SlateDbStorageReader {
    #[tracing::instrument(level = "trace", skip_all)]
    async fn get(&self, key: Bytes) -> StorageResult<Option<Record>> {
        let found = self
            .reader
            .get(&key)
            .await
            .map_err(StorageError::from_storage)?;
        Ok(found.map(|value| Record::new(key, value)))
    }

    #[tracing::instrument(level = "trace", skip_all)]
    async fn scan_iter(
        &self,
        range: BytesRange,
    ) -> StorageResult<Box<dyn StorageIterator + Send + 'static>> {
        let scan_options = default_scan_options();
        let iter = self
            .reader
            .scan_with_options(range, &scan_options)
            .await
            .map_err(StorageError::from_storage)?;
        Ok(Box::new(SlateDbIterator { iter }))
    }
}
// Integration-style tests: each spins up a real SlateDB instance over an
// in-memory object store and drives it through the adapter types above.
#[cfg(test)]
mod tests {
    use super::*;
    use crate::BytesRange;
    use slatedb::DbBuilder;
    use slatedb::config::{DbReaderOptions, Settings};
    use slatedb::object_store::memory::InMemory;
    use slatedb_common::clock::MockSystemClock;

    // Records written and flushed by the storage must be visible to a
    // separately opened `DbReader` over the same object store.
    #[tokio::test]
    async fn should_read_data_written_by_storage_via_reader() {
        let object_store = Arc::new(InMemory::new());
        let path = "/test/db";
        let db = DbBuilder::new(path, object_store.clone())
            .build()
            .await
            .unwrap();
        let storage = SlateDbStorage::new(Arc::new(db));
        storage
            .put(vec![
                Record::new(Bytes::from("key1"), Bytes::from("value1")).into(),
                Record::new(Bytes::from("key2"), Bytes::from("value2")).into(),
            ])
            .await
            .unwrap();
        storage.flush().await.unwrap();
        let reader = DbReader::open(path, object_store, None, Default::default())
            .await
            .unwrap();
        let storage_reader = SlateDbStorageReader::new(Arc::new(reader));
        let record = storage_reader.get(Bytes::from("key1")).await.unwrap();
        assert!(record.is_some());
        assert_eq!(record.unwrap().value, Bytes::from("value1"));
        let record = storage_reader.get(Bytes::from("key2")).await.unwrap();
        assert!(record.is_some());
        assert_eq!(record.unwrap().value, Bytes::from("value2"));
        let record = storage_reader.get(Bytes::from("key3")).await.unwrap();
        assert!(record.is_none());
        storage.close().await.unwrap();
    }

    // A full unbounded scan through the reader must return all records in
    // key order.
    #[tokio::test]
    async fn should_scan_data_written_by_storage_via_reader() {
        let object_store = Arc::new(InMemory::new());
        let path = "/test/db";
        let db = DbBuilder::new(path, object_store.clone())
            .build()
            .await
            .unwrap();
        let storage = SlateDbStorage::new(Arc::new(db));
        storage
            .put(vec![
                Record::new(Bytes::from("a"), Bytes::from("1")).into(),
                Record::new(Bytes::from("b"), Bytes::from("2")).into(),
                Record::new(Bytes::from("c"), Bytes::from("3")).into(),
            ])
            .await
            .unwrap();
        storage.flush().await.unwrap();
        let reader = DbReader::open(path, object_store, None, Default::default())
            .await
            .unwrap();
        let storage_reader = SlateDbStorageReader::new(Arc::new(reader));
        let mut iter = storage_reader
            .scan_iter(BytesRange::unbounded())
            .await
            .unwrap();
        let mut results = Vec::new();
        while let Some(record) = iter.next().await.unwrap() {
            results.push((record.key, record.value));
        }
        assert_eq!(results.len(), 3);
        assert_eq!(results[0], (Bytes::from("a"), Bytes::from("1")));
        assert_eq!(results[1], (Bytes::from("b"), Bytes::from("2")));
        assert_eq!(results[2], (Bytes::from("c"), Bytes::from("3")));
        storage.close().await.unwrap();
    }

    // The writer must remain usable (no fencing error) after a read-only
    // `DbReader` has been opened against the same database.
    #[tokio::test]
    async fn should_coexist_writer_and_reader_without_fencing_error() {
        let object_store = Arc::new(InMemory::new());
        let path = "/test/db";
        let db = DbBuilder::new(path, object_store.clone())
            .build()
            .await
            .unwrap();
        let storage = SlateDbStorage::new(Arc::new(db));
        storage
            .put(vec![
                Record::new(Bytes::from("key1"), Bytes::from("value1")).into(),
            ])
            .await
            .unwrap();
        storage.flush().await.unwrap();
        let reader = DbReader::open(path, object_store, None, Default::default())
            .await
            .unwrap();
        let storage_reader = SlateDbStorageReader::new(Arc::new(reader));
        let record = storage_reader.get(Bytes::from("key1")).await.unwrap();
        assert!(record.is_some());
        assert_eq!(record.unwrap().value, Bytes::from("value1"));
        storage
            .put(vec![
                Record::new(Bytes::from("key2"), Bytes::from("value2")).into(),
            ])
            .await
            .unwrap();
        storage.flush().await.unwrap();
        storage.close().await.unwrap();
    }

    // TTL expiry for puts: explicit expiry, database default (30s), and
    // no-expiry records must each expire (or not) at the right clock time.
    // NOTE(review): assumes `MockSystemClock::set` takes milliseconds,
    // matching the `default_ttl: Some(30_000)` setting — confirm.
    #[tokio::test]
    async fn should_expire_records_based_on_ttl() {
        let object_store = Arc::new(InMemory::new());
        let path = "/test/ttl_db";
        let clock = Arc::new(MockSystemClock::new());
        let db = DbBuilder::new(path, object_store)
            .with_settings(Settings {
                default_ttl: Some(30_000),
                ..Default::default()
            })
            .with_system_clock(clock.clone())
            .build()
            .await
            .unwrap();
        let storage = SlateDbStorage::new(Arc::new(db));
        storage
            .put(vec![
                PutRecordOp::new_with_options(
                    Record::new(Bytes::from("key1"), Bytes::from("value1")),
                    PutOptions {
                        ttl: Ttl::ExpireAfter(20_000),
                    },
                ),
                PutRecordOp::new_with_options(
                    Record::new(Bytes::from("key2"), Bytes::from("value2")),
                    PutOptions { ttl: Ttl::Default },
                ),
                PutRecordOp::new_with_options(
                    Record::new(Bytes::from("key3"), Bytes::from("value3")),
                    PutOptions { ttl: Ttl::NoExpiry },
                ),
            ])
            .await
            .unwrap();
        // All records visible before any expiry threshold.
        assert!(storage.get(Bytes::from("key1")).await.unwrap().is_some());
        assert!(storage.get(Bytes::from("key2")).await.unwrap().is_some());
        assert!(storage.get(Bytes::from("key3")).await.unwrap().is_some());
        // Past the explicit 20s TTL, before the 30s default TTL.
        clock.set(25_000);
        assert!(storage.get(Bytes::from("key1")).await.unwrap().is_none());
        assert!(storage.get(Bytes::from("key2")).await.unwrap().is_some());
        assert!(storage.get(Bytes::from("key3")).await.unwrap().is_some());
        // Past the default TTL; only the NoExpiry record survives.
        clock.set(35_000);
        assert!(storage.get(Bytes::from("key1")).await.unwrap().is_none());
        assert!(storage.get(Bytes::from("key2")).await.unwrap().is_none());
        let record = storage.get(Bytes::from("key3")).await.unwrap();
        assert!(record.is_some());
        assert_eq!(record.unwrap().value, Bytes::from("value3"));
        storage.close().await.unwrap();
    }

    // Simple test merge operator: concatenates all operands onto the
    // existing value in order.
    struct ConcatMergeOperator;
    impl MergeOperator for ConcatMergeOperator {
        fn merge_batch(
            &self,
            _key: &Bytes,
            existing_value: Option<Bytes>,
            operands: &[Bytes],
        ) -> Bytes {
            let mut result = existing_value.unwrap_or_default().to_vec();
            for operand in operands {
                result.extend_from_slice(operand);
            }
            Bytes::from(result)
        }
    }

    // Same TTL matrix as the put test, but driven through merge operations.
    #[tokio::test]
    async fn should_expire_merge_records_based_on_ttl() {
        let object_store = Arc::new(InMemory::new());
        let path = "/test/merge_ttl_db";
        let clock = Arc::new(MockSystemClock::new());
        let merge_op: Arc<dyn MergeOperator> = Arc::new(ConcatMergeOperator);
        let slate_merge_op = SlateDbStorage::merge_operator_adapter(merge_op);
        let db = DbBuilder::new(path, object_store)
            .with_settings(Settings {
                default_ttl: Some(30_000),
                ..Default::default()
            })
            .with_system_clock(clock.clone())
            .with_merge_operator(Arc::new(slate_merge_op))
            .build()
            .await
            .unwrap();
        let storage = SlateDbStorage::new(Arc::new(db));
        storage
            .merge(vec![
                MergeRecordOp::new_with_ttl(
                    Record::new(Bytes::from("key1"), Bytes::from("v1")),
                    MergeOptions {
                        ttl: Ttl::ExpireAfter(20_000),
                    },
                ),
                MergeRecordOp::new_with_ttl(
                    Record::new(Bytes::from("key2"), Bytes::from("v2")),
                    MergeOptions { ttl: Ttl::Default },
                ),
                MergeRecordOp::new_with_ttl(
                    Record::new(Bytes::from("key3"), Bytes::from("v3")),
                    MergeOptions { ttl: Ttl::NoExpiry },
                ),
            ])
            .await
            .unwrap();
        assert_eq!(
            storage
                .get(Bytes::from("key1"))
                .await
                .unwrap()
                .unwrap()
                .value,
            Bytes::from("v1")
        );
        assert_eq!(
            storage
                .get(Bytes::from("key2"))
                .await
                .unwrap()
                .unwrap()
                .value,
            Bytes::from("v2")
        );
        assert_eq!(
            storage
                .get(Bytes::from("key3"))
                .await
                .unwrap()
                .unwrap()
                .value,
            Bytes::from("v3")
        );
        // Past the explicit 20s TTL only.
        clock.set(25_000);
        assert!(storage.get(Bytes::from("key1")).await.unwrap().is_none());
        assert!(storage.get(Bytes::from("key2")).await.unwrap().is_some());
        assert!(storage.get(Bytes::from("key3")).await.unwrap().is_some());
        // Past the default 30s TTL as well.
        clock.set(35_000);
        assert!(storage.get(Bytes::from("key1")).await.unwrap().is_none());
        assert!(storage.get(Bytes::from("key2")).await.unwrap().is_none());
        let record = storage.get(Bytes::from("key3")).await.unwrap();
        assert!(record.is_some());
        assert_eq!(record.unwrap().value, Bytes::from("v3"));
        storage.close().await.unwrap();
    }

    // Helper: opens a fresh reader and reports whether `key` is visible.
    async fn reader_can_see(path: &str, object_store: Arc<InMemory>, key: &str) -> bool {
        reader_can_see_with_merge_op(path, object_store, key, None).await
    }

    // Like `reader_can_see`, but configures the reader with a merge
    // operator so merged keys can be resolved.
    async fn reader_can_see_with_merge_op(
        path: &str,
        object_store: Arc<InMemory>,
        key: &str,
        merge_op: Option<Arc<dyn SlateDbMergeOperator + Send + Sync>>,
    ) -> bool {
        let options = DbReaderOptions {
            merge_operator: merge_op,
            ..Default::default()
        };
        let reader = DbReader::open(path, object_store, None, options)
            .await
            .unwrap();
        let storage_reader = SlateDbStorageReader::new(Arc::new(reader));
        storage_reader
            .get(Bytes::from(key.to_owned()))
            .await
            .unwrap()
            .is_some()
    }

    // `put` without explicit options must not await durability: data is
    // only visible to a reader after an explicit flush.
    #[tokio::test]
    async fn put_defaults_to_not_await_durable() {
        let object_store = Arc::new(InMemory::new());
        let path = "/test/put_default_durability";
        let db = DbBuilder::new(path, object_store.clone())
            .build()
            .await
            .unwrap();
        let storage = SlateDbStorage::new(Arc::new(db));
        storage
            .put(vec![
                Record::new(Bytes::from("k1"), Bytes::from("v1")).into(),
            ])
            .await
            .unwrap();
        assert!(!reader_can_see(path, object_store.clone(), "k1").await);
        storage.flush().await.unwrap();
        assert!(reader_can_see(path, object_store.clone(), "k1").await);
        storage.close().await.unwrap();
    }

    // With `await_durable: true`, a put is durable (and reader-visible)
    // as soon as the call returns — no flush needed.
    #[tokio::test]
    async fn put_with_await_durable_true_is_visible_to_reader() {
        let object_store = Arc::new(InMemory::new());
        let path = "/test/put_durable";
        let db = DbBuilder::new(path, object_store.clone())
            .build()
            .await
            .unwrap();
        let storage = SlateDbStorage::new(Arc::new(db));
        storage
            .put_with_options(
                vec![Record::new(Bytes::from("k1"), Bytes::from("v1")).into()],
                WriteOptions {
                    await_durable: true,
                },
            )
            .await
            .unwrap();
        assert!(reader_can_see(path, object_store.clone(), "k1").await);
        storage.close().await.unwrap();
    }

    // Same default-durability behavior, exercised through `apply`.
    #[tokio::test]
    async fn apply_defaults_to_not_await_durable() {
        let object_store = Arc::new(InMemory::new());
        let path = "/test/apply_default_durability";
        let db = DbBuilder::new(path, object_store.clone())
            .build()
            .await
            .unwrap();
        let storage = SlateDbStorage::new(Arc::new(db));
        storage
            .apply(vec![RecordOp::Put(
                Record::new(Bytes::from("k1"), Bytes::from("v1")).into(),
            )])
            .await
            .unwrap();
        assert!(!reader_can_see(path, object_store.clone(), "k1").await);
        storage.flush().await.unwrap();
        assert!(reader_can_see(path, object_store.clone(), "k1").await);
        storage.close().await.unwrap();
    }

    // Same awaited-durability behavior, exercised through `apply`.
    #[tokio::test]
    async fn apply_with_await_durable_true_is_visible_to_reader() {
        let object_store = Arc::new(InMemory::new());
        let path = "/test/apply_durable";
        let db = DbBuilder::new(path, object_store.clone())
            .build()
            .await
            .unwrap();
        let storage = SlateDbStorage::new(Arc::new(db));
        storage
            .apply_with_options(
                vec![RecordOp::Put(
                    Record::new(Bytes::from("k1"), Bytes::from("v1")).into(),
                )],
                WriteOptions {
                    await_durable: true,
                },
            )
            .await
            .unwrap();
        assert!(reader_can_see(path, object_store.clone(), "k1").await);
        storage.close().await.unwrap();
    }

    // Same default-durability behavior for merges; the reader needs its own
    // merge operator to resolve merged values.
    #[tokio::test]
    async fn merge_defaults_to_not_await_durable() {
        let object_store = Arc::new(InMemory::new());
        let path = "/test/merge_default_durability";
        let merge_op: Arc<dyn MergeOperator> = Arc::new(ConcatMergeOperator);
        let slate_merge_op = Arc::new(SlateDbStorage::merge_operator_adapter(merge_op.clone()));
        let db = DbBuilder::new(path, object_store.clone())
            .with_merge_operator(slate_merge_op.clone())
            .build()
            .await
            .unwrap();
        let storage = SlateDbStorage::new(Arc::new(db));
        storage
            .merge(vec![
                Record::new(Bytes::from("k1"), Bytes::from("v1")).into(),
            ])
            .await
            .unwrap();
        let reader_merge_op: Arc<dyn SlateDbMergeOperator + Send + Sync> =
            Arc::new(SlateDbStorage::merge_operator_adapter(merge_op.clone()));
        assert!(
            !reader_can_see_with_merge_op(
                path,
                object_store.clone(),
                "k1",
                Some(reader_merge_op.clone()),
            )
            .await
        );
        storage.flush().await.unwrap();
        assert!(
            reader_can_see_with_merge_op(path, object_store.clone(), "k1", Some(reader_merge_op),)
                .await
        );
        storage.close().await.unwrap();
    }

    // Same awaited-durability behavior for merges.
    #[tokio::test]
    async fn merge_with_await_durable_true_is_visible_to_reader() {
        let object_store = Arc::new(InMemory::new());
        let path = "/test/merge_durable";
        let merge_op: Arc<dyn MergeOperator> = Arc::new(ConcatMergeOperator);
        let slate_merge_op = Arc::new(SlateDbStorage::merge_operator_adapter(merge_op.clone()));
        let db = DbBuilder::new(path, object_store.clone())
            .with_merge_operator(slate_merge_op.clone())
            .build()
            .await
            .unwrap();
        let storage = SlateDbStorage::new(Arc::new(db));
        storage
            .merge_with_options(
                vec![Record::new(Bytes::from("k1"), Bytes::from("v1")).into()],
                WriteOptions {
                    await_durable: true,
                },
            )
            .await
            .unwrap();
        let reader_merge_op: Arc<dyn SlateDbMergeOperator + Send + Sync> =
            Arc::new(SlateDbStorage::merge_operator_adapter(merge_op));
        assert!(
            reader_can_see_with_merge_op(path, object_store.clone(), "k1", Some(reader_merge_op),)
                .await
        );
        storage.close().await.unwrap();
    }
}
// Unit tests for the metric-type mapping in `ReadableStatGauge`; compiled
// only when the `metrics` feature is enabled.
#[cfg(all(test, feature = "metrics"))]
mod metrics_tests {
    use super::ReadableStatGauge;
    use prometheus_client::encoding::EncodeMetric;
    use slatedb::stats::{MetricType as SlateMetricType, ReadableStat};
    use std::sync::Arc;

    // Minimal `ReadableStat` stub with a configurable metric type and a
    // fixed value of 0.
    #[derive(Debug)]
    struct MockStat {
        metric_type: SlateMetricType,
    }
    impl ReadableStat for MockStat {
        fn get(&self) -> i64 {
            0
        }
        fn metric_type(&self) -> SlateMetricType {
            self.metric_type
        }
    }

    #[test]
    fn should_return_counter_when_slate_metric_type_is_counter() {
        let stat = Arc::new(MockStat {
            metric_type: SlateMetricType::Counter,
        });
        let gauge = ReadableStatGauge(stat);
        let result = gauge.metric_type();
        assert!(matches!(
            result,
            prometheus_client::metrics::MetricType::Counter,
        ));
    }

    #[test]
    fn should_return_gauge_when_slate_metric_type_is_gauge() {
        let stat = Arc::new(MockStat {
            metric_type: SlateMetricType::Gauge,
        });
        let gauge = ReadableStatGauge(stat);
        let result = gauge.metric_type();
        assert!(matches!(
            result,
            prometheus_client::metrics::MetricType::Gauge,
        ));
    }
}