use foundationdb::{metrics::TransactionMetrics, *};
mod common;
use std::borrow::Cow;
use std::sync::{Arc, Mutex};
/// Single test entry point that boots the FoundationDB client once and then
/// runs every metrics scenario in this file sequentially, in a fixed order.
///
/// Each scenario is an `async fn` returning a `Result`; it is driven to
/// completion on the current thread and any `Err` fails the test with the
/// message `"failed to run"`.
#[test]
fn test_metrics() {
    /// Blocks on one scenario and panics if it returned an error.
    ///
    /// `#[track_caller]` keeps the reported panic location at the call site
    /// below, so a failure still points at the scenario that produced it.
    #[track_caller]
    fn run_scenario<S, E>(scenario: S)
    where
        S: std::future::Future<Output = Result<(), E>>,
        E: std::fmt::Debug,
    {
        futures::executor::block_on(scenario).expect("failed to run");
    }

    // SAFETY: NOTE(review) — the exact contract of `foundationdb::boot` is not
    // visible from this file; presumably the returned guard must be dropped
    // before the process exits. Binding it to `_guard` (rather than `_`) keeps
    // it alive until the end of this function, i.e. for every scenario below.
    let _guard = unsafe { foundationdb::boot() };

    run_scenario(instrumented_run_success());
    run_scenario(instrumented_run_with_n_retries());
    run_scenario(test_counter_metrics());
    run_scenario(test_transaction_info());
    run_scenario(test_time_metrics());
    run_scenario(test_custom_metrics());
    run_scenario(test_transaction_custom_metrics());
}
/// Runs a single-write transaction through `Database::instrumented_run` and
/// checks the metrics reported for a run that succeeds on its first attempt.
///
/// Asserts that:
/// * the closure's return value is passed through unchanged,
/// * `total.call_set` is 1 and `total.bytes_written` equals key + value length,
/// * `transaction.retries` is 0.
///
/// # Errors
/// Returns the `FdbError` produced while obtaining the test database.
///
/// # Panics
/// Panics (with the underlying error in the message) if the instrumented run
/// itself fails, or if any assertion does not hold.
async fn instrumented_run_success() -> FdbResult<()> {
    const KEY: &[u8] = b"test_metrics_success";
    const VALUE: &[u8] = b"value";
    const SUCCESS: u64 = 42;

    let db = common::database().await?;

    let (result, metrics) = match db
        .instrumented_run(|txn, _| async move {
            txn.set(KEY, VALUE);
            Ok(SUCCESS)
        })
        .await
    {
        Ok(outcome) => outcome,
        // Report the error (and the metrics that accompany it) so a failure
        // is diagnosable; same wording as the retry scenario in this file.
        Err(err) => panic!("Test failed: {:?}", err),
    };

    assert_eq!(result, SUCCESS);

    let total = metrics.total;
    assert_eq!(total.call_set, 1);
    assert_eq!(total.bytes_written, (KEY.len() + VALUE.len()) as u64);

    let transaction_info = metrics.transaction;
    assert_eq!(transaction_info.retries, 0);

    Ok(())
}
/// Forces `instrumented_run` to retry a fixed number of times and checks how
/// the metrics report splits counters between all attempts and the last one.
///
/// The closure writes one key on every attempt and returns an error built from
/// FDB code 1020 for the first `EXPECTED_RETRIES` attempts; the test expects
/// the run loop to retry on that error and succeed on the following attempt.
///
/// Asserts that:
/// * `total` counters cover every attempt (`EXPECTED_RETRIES + 1` sets),
/// * `current` counters cover only the final attempt (one set),
/// * `transaction.retries` equals `EXPECTED_RETRIES`,
/// * the closure was invoked exactly `EXPECTED_RETRIES + 1` times.
///
/// # Panics
/// Panics with `"Test failed: …"` if the instrumented run returns an error.
async fn instrumented_run_with_n_retries() -> FdbResult<()> {
    const KEY: &[u8] = b"test_metrics_retry";
    const VALUE: &[u8] = b"value";
    const SUCCESS: u64 = 42;
    const EXPECTED_RETRIES: u64 = 3;

    let db = common::database().await?;

    // Shared across attempts so the closure knows which attempt it is on.
    let invocations = Arc::new(Mutex::new(0u64));

    let (outcome, report) = db
        .instrumented_run(|txn, _| {
            let invocations = invocations.clone();
            async move {
                txn.set(KEY, VALUE);

                let mut seen = invocations.lock().unwrap();
                *seen += 1;
                if *seen > EXPECTED_RETRIES {
                    return Ok(SUCCESS);
                }
                Err(FdbBindingError::from(FdbError::from_code(1020)))
            }
        })
        .await
        .unwrap_or_else(|err| panic!("Test failed: {:?}", err));

    assert_eq!(outcome, SUCCESS);

    let bytes_per_attempt = (KEY.len() + VALUE.len()) as u64;
    let expected_attempts = EXPECTED_RETRIES + 1;

    // Totals accumulate over every attempt, including the failed ones.
    assert_eq!(report.total.call_set, expected_attempts);
    assert_eq!(
        report.total.bytes_written,
        expected_attempts * bytes_per_attempt
    );

    // `current` only reflects the attempt that finally succeeded.
    assert_eq!(report.current.call_set, 1);
    assert_eq!(report.current.bytes_written, bytes_per_attempt);

    assert_eq!(report.transaction.retries, EXPECTED_RETRIES);

    let observed_attempts = *invocations.lock().unwrap();
    assert_eq!(observed_attempts, expected_attempts);

    Ok(())
}
/// Performs one of each counted operation inside a single instrumented
/// transaction and checks every per-operation counter in the report.
///
/// Inside the transaction the closure:
/// 1. sets `SET_OPS` keys (`…_key0`, `…_key1`, `…_key2`),
/// 2. reads `…_key1` with a point get,
/// 3. reads the range `[…_key, …_key4)`,
/// 4. clears `…_key2`, clears the range `[…_key1, …_key3)`,
/// 5. applies one atomic `Add` on `…_atomic`.
///
/// It returns the number of key-values it fetched (1 for the point get plus
/// the range length) and the bytes it read, which are then compared with the
/// report. Since the run is expected to succeed without retries, `total` and
/// `current` counters are asserted to be equal.
///
/// # Errors
/// Returns the inner `FdbError` if the run fails with
/// `FdbBindingError::NonRetryableFdbError`, or the error from opening the
/// database.
///
/// # Panics
/// Panics on any other `FdbBindingError` variant and on assertion failure.
async fn test_counter_metrics() -> FdbResult<()> {
    const PREFIX: &[u8] = b"test_counter_metrics_";
    const SET_OPS: usize = 3;

    /// Returns the bytes of `<PREFIX>_<suffix>`.
    fn prefixed(suffix: &str) -> Vec<u8> {
        format!("{}_{}", std::str::from_utf8(PREFIX).unwrap(), suffix).into_bytes()
    }

    let db = common::database().await?;

    // Expected write volume: the same keys and values the closure sets below.
    let bytes_written: u64 = (0..SET_OPS)
        .map(|i| {
            let key = prefixed(&format!("key{}", i));
            let value = format!("value{}", i);
            (key.len() + value.len()) as u64
        })
        .sum();

    let run = db
        .instrumented_run(|txn, _| async move {
            for i in 0..SET_OPS {
                let key = prefixed(&format!("key{}", i));
                let value = format!("value{}", i);
                txn.set(&key, value.as_bytes());
            }

            let mut read_bytes: u64 = 0;

            // Point read.
            let point_key = prefixed("key1");
            if let Some(found) = txn.get(&point_key, false).await? {
                read_bytes += (point_key.len() + found.len()) as u64;
            }

            // Range read over the `_key*` keys.
            let scan = RangeOption {
                begin: KeySelector::first_greater_or_equal(Cow::from(prefixed("key"))),
                end: KeySelector::first_greater_or_equal(Cow::from(prefixed("key4"))),
                limit: Some(100),
                ..Default::default()
            };
            let scanned = txn.get_range(&scan, 1, false).await?;
            read_bytes += scanned
                .iter()
                .map(|kv| (kv.key().len() + kv.value().len()) as u64)
                .sum::<u64>();

            // One key-value for the point get plus everything in the range.
            let fetched = 1 + scanned.len();

            let single_clear = prefixed("key2");
            txn.clear(&single_clear);

            let clear_from = prefixed("key1");
            let clear_to = prefixed("key3");
            txn.clear_range(&clear_from[..], &clear_to[..]);

            let atomic_key = prefixed("atomic");
            txn.atomic_op(
                &atomic_key,
                &[1, 0, 0, 0, 0, 0, 0, 0],
                options::MutationType::Add,
            );

            Ok((fetched, read_bytes))
        })
        .await;

    let ((fetched_count, bytes_read), report) = match run {
        Ok(success) => success,
        Err((FdbBindingError::NonRetryableFdbError(fdb_err), _)) => return Err(fdb_err),
        Err((err, _)) => panic!("Test failed with unexpected error type: {:?}", err),
    };

    let current = &report.current;
    let total = &report.total;

    // Counters for the (only) attempt.
    assert_eq!(
        current.call_set,
        SET_OPS as u64,
        "Should have {} SET operations",
        SET_OPS
    );
    assert_eq!(current.call_get, 1, "Should have 1 GET operation");
    assert_eq!(
        current.keys_values_fetched,
        fetched_count as u64,
        "Should have fetched {} key-values",
        fetched_count
    );
    assert_eq!(
        current.bytes_written, bytes_written,
        "Should have written {} bytes",
        bytes_written
    );
    assert_eq!(
        current.bytes_read, bytes_read,
        "Should have read {} bytes",
        bytes_read
    );
    assert_eq!(current.call_clear, 1, "Should have 1 CLEAR operation");
    assert_eq!(
        current.call_clear_range, 1,
        "Should have 1 CLEAR_RANGE operation"
    );
    assert_eq!(current.call_atomic_op, 1, "Should have 1 ATOMIC operation");

    // With a single attempt the totals must mirror the current counters.
    assert_eq!(total.call_set, current.call_set);
    assert_eq!(total.call_get, current.call_get);
    assert_eq!(total.keys_values_fetched, current.keys_values_fetched);
    assert_eq!(total.bytes_read, current.bytes_read);
    assert_eq!(total.call_clear, current.call_clear);
    assert_eq!(total.call_clear_range, current.call_clear_range);
    assert_eq!(total.call_atomic_op, current.call_atomic_op);
    assert_eq!(total.bytes_written, current.bytes_written);

    assert_eq!(report.transaction.retries, 0, "Should have no retries");
    assert!(
        report.transaction.commit_version.is_some(),
        "Should have a commit version"
    );

    Ok(())
}
/// Checks the transaction-level information exposed by the metrics API:
/// read version, commit version and retry count.
///
/// Three independent scenarios are run against the same database handle:
/// 1. a manually created instrumented transaction, where the read version
///    stored on the `TransactionMetrics` is read back via
///    `get_transaction_info`;
/// 2. an `instrumented_run` that writes one key, after which a commit version
///    must be present in the report;
/// 3. an `instrumented_run` whose closure fails with FDB code 1020 on the
///    first `EXPECTED_RETRIES` attempts, after which the report must show
///    exactly that many retries.
///
/// # Errors
/// Returns any `FdbError` from opening the database, fetching the read
/// version, or committing the manual transaction.
///
/// # Panics
/// Panics if a transaction cannot be created, if an instrumented run fails
/// (the underlying error is included in the message), or on assertion failure.
async fn test_transaction_info() -> FdbResult<()> {
    let db = common::database().await?;

    // Scenario 1: read version recorded on a manually instrumented transaction.
    {
        let metrics = TransactionMetrics::new();
        let txn = db
            .create_instrumented_trx(metrics.clone())
            .expect("Could not create transaction");

        let read_version = txn.get_read_version().await?;
        metrics.set_read_version(read_version);

        let transaction_info = metrics.get_transaction_info();
        assert_eq!(transaction_info.read_version, Some(read_version));

        txn.commit().await?;
    }

    // Scenario 2: a committed write must report a commit version.
    {
        let (_result, metrics_data) = db
            .instrumented_run(|txn, _| async move {
                txn.set(b"test_commit_version", b"value");
                Ok(())
            })
            .await
            .expect("Transaction failed");

        let transaction_info = metrics_data.transaction;
        assert!(transaction_info.commit_version.is_some());
    }

    // Scenario 3: the retry count in the report matches the forced failures.
    {
        const EXPECTED_RETRIES: u64 = 2;
        let attempt_counter = Arc::new(Mutex::new(0));

        let result = db
            .instrumented_run(|_txn, _| {
                let counter = attempt_counter.clone();
                async move {
                    let mut attempts = counter.lock().unwrap();
                    *attempts += 1;
                    if *attempts <= EXPECTED_RETRIES {
                        let fdb_error = FdbError::from_code(1020);
                        Err(FdbBindingError::from(fdb_error))
                    } else {
                        Ok(())
                    }
                }
            })
            .await;

        match result {
            Ok((_, metrics_data)) => {
                let transaction_info = metrics_data.transaction;
                assert_eq!(transaction_info.retries, EXPECTED_RETRIES);
            }
            // Keep the underlying error in the message so a failure here is
            // diagnosable without re-running under a debugger.
            Err((err, _)) => panic!(
                "Transaction should have succeeded after retries: {:?}",
                err
            ),
        }
    }

    Ok(())
}
/// Checks the timing section of the metrics report for a run without retries
/// and for a run that retries once.
///
/// * First run: ten sets and one get, committed on the first attempt. The
///   summed error-handling time must be zero and the commit time non-zero.
/// * Second run: the closure fails with FDB code 1020 on its first attempt
///   only; after the run succeeds, `on_error_execution_ms` must be non-empty.
///
/// # Errors
/// Returns the `FdbError` produced while obtaining the test database.
///
/// # Panics
/// Panics if either instrumented run fails (the retry run includes the
/// underlying error in the message) or if an assertion does not hold.
async fn test_time_metrics() -> FdbResult<()> {
    let db = common::database().await?;

    // Run 1: no retries expected.
    let (_result, metrics_data) = db
        .instrumented_run(|txn, _| async move {
            for i in 0..10 {
                let key = format!("test_time_metrics_{}", i).into_bytes();
                txn.set(&key, b"value");
            }
            let _ = txn.get(b"test_time_metrics_0", false).await?;
            Ok(())
        })
        .await
        .expect("Transaction failed");

    let time_metrics = metrics_data.time;
    // The message now names the metric actually being checked (error-handling
    // time), not total execution time.
    assert_eq!(
        time_metrics.get_total_error_time(),
        0,
        "Error handling time should not be recorded for a transaction without retries"
    );
    // NOTE(review): assumes a commit always takes long enough to register as
    // a non-zero `commit_execution_ms` — confirm this cannot flake on a very
    // fast local cluster.
    assert!(
        time_metrics.commit_execution_ms > 0,
        "Commit execution time should be recorded"
    );

    // Run 2: fail exactly once so the error-handling path is timed.
    let attempt_counter = Arc::new(Mutex::new(0));
    let result = db
        .instrumented_run(|_txn, _| {
            let counter = attempt_counter.clone();
            async move {
                let mut attempts = counter.lock().unwrap();
                *attempts += 1;
                if *attempts == 1 {
                    let fdb_error = FdbError::from_code(1020);
                    Err(FdbBindingError::from(fdb_error))
                } else {
                    Ok(())
                }
            }
        })
        .await;

    match result {
        Ok((_, metrics_data)) => {
            let time_metrics = metrics_data.time;
            assert!(
                !time_metrics.on_error_execution_ms.is_empty(),
                "Error handling time should be recorded"
            );
        }
        // Include the underlying error instead of silently discarding it.
        Err((err, _)) => panic!(
            "Transaction should have succeeded after retry: {:?}",
            err
        ),
    }

    Ok(())
}
/// Records custom metrics directly on a `TransactionMetrics` handle and checks
/// that they can be read back from its metrics snapshot.
///
/// Covers three cases:
/// * `set_custom` with two labels,
/// * `set_custom` with one label,
/// * two `increment_custom` calls on the same name and labels, which the test
///   expects to add up (5 + 10 = 15).
///
/// The instrumented transaction created from the handle performs no reads or
/// writes; it is committed at the end, after all assertions.
///
/// # Errors
/// Returns any `FdbError` from opening the database or committing.
///
/// # Panics
/// Panics if the transaction cannot be created or an assertion fails.
async fn test_custom_metrics() -> FdbResult<()> {
    let db = common::database().await?;

    let recorder = TransactionMetrics::new();
    let txn = db
        .create_instrumented_trx(recorder.clone())
        .expect("Could not create transaction");

    // Record the metrics under test.
    recorder.set_custom(
        "custom_counter",
        123,
        &[("tenant", "test"), ("region", "us-west")],
    );
    recorder.set_custom("custom_timer", 456, &[("operation", "write")]);
    recorder.increment_custom("incremented_counter", 5, &[("service", "api")]);
    recorder.increment_custom("incremented_counter", 10, &[("service", "api")]);

    // Snapshot and look each one up by (name, labels).
    let snapshot = recorder.get_metrics_data();
    let recorded = snapshot.custom_metrics;

    let counter_key = foundationdb::metrics::MetricKey::new(
        "custom_counter",
        &[("tenant", "test"), ("region", "us-west")],
    );
    assert_eq!(recorded.get(&counter_key).copied(), Some(123));

    let timer_key =
        foundationdb::metrics::MetricKey::new("custom_timer", &[("operation", "write")]);
    assert_eq!(recorded.get(&timer_key).copied(), Some(456));

    let incremented_key =
        foundationdb::metrics::MetricKey::new("incremented_counter", &[("service", "api")]);
    assert_eq!(recorded.get(&incremented_key).copied(), Some(15));

    txn.commit().await?;

    Ok(())
}
/// Records custom metrics from inside an `instrumented_run` closure (via the
/// transaction handle) and checks that they appear in the returned report.
///
/// The closure sets two metrics, increments a third twice (10 + 15, expected
/// to read back as 25), and performs one get so the transaction does some
/// real work.
///
/// # Errors
/// Returns the `FdbBindingError` from the instrumented run if it fails, or the
/// converted error from opening the database.
///
/// # Panics
/// Panics if any recorded metric does not have the expected value.
async fn test_transaction_custom_metrics() -> Result<(), FdbBindingError> {
    let db = common::database().await?;

    // On failure only the error is propagated; the accompanying metrics are
    // dropped.
    let (_, report) = db
        .instrumented_run(|txn, _| async move {
            txn.set_custom_metric("txn_counter", 100, &[("operation", "read")])?;
            txn.set_custom_metric("txn_timer", 200, &[("component", "storage")])?;
            txn.increment_custom_metric("txn_incremented", 10, &[("type", "query")])?;
            txn.increment_custom_metric("txn_incremented", 15, &[("type", "query")])?;
            let _value = txn.get(b"test_key", false).await?;
            Ok(())
        })
        .await
        .map_err(|(err, _)| err)?;

    let recorded = report.custom_metrics;

    let counter_key =
        foundationdb::metrics::MetricKey::new("txn_counter", &[("operation", "read")]);
    assert_eq!(recorded.get(&counter_key).copied(), Some(100));

    let timer_key =
        foundationdb::metrics::MetricKey::new("txn_timer", &[("component", "storage")]);
    assert_eq!(recorded.get(&timer_key).copied(), Some(200));

    let incremented_key =
        foundationdb::metrics::MetricKey::new("txn_incremented", &[("type", "query")]);
    assert_eq!(recorded.get(&incremented_key).copied(), Some(25));

    Ok(())
}