pub mod metrics;
#[cfg(feature = "metrics-rs")]
pub mod metrics_rs_adapter;
use arc_swap::ArcSwap;
use std::sync::{Arc, LazyLock};
pub use metrics::{
CriticalEvent, ErrorCategory, METRIC_CRITICAL_EVENTS, METRIC_ERRORS, METRIC_LABEL_CATEGORY,
METRIC_LABEL_DURABILITY_MODE, METRIC_LABEL_EVENT, METRIC_LABEL_STATUS,
METRIC_TRANSACTION_COMMIT_DURATION, METRIC_TRANSACTION_COMMITS, METRIC_TRANSACTION_OPERATIONS,
METRIC_WRITE_CONFLICTS, METRICS, Metrics, MetricsRecorder, MetricsSnapshot, NoOpMetrics,
};
/// Identifier of this database system.
///
/// Used in this file as the value of the `db.system.name` field on every span
/// helper and as the fallback service name of [`TelemetryConfig`].
pub const DB_SYSTEM_NAME: &str = "aletheiadb";
/// Span name used by [`query_execute_span`].
pub const SPAN_QUERY_EXECUTE: &str = "aletheiadb.query.execute";
/// Span name used by [`hybrid_query_span`].
pub const SPAN_QUERY_HYBRID: &str = "aletheiadb.query.hybrid";
/// Span name used by [`vector_index_span`].
pub const SPAN_VECTOR_INDEX: &str = "aletheiadb.vector.index";
/// Span name used by [`vector_search_span`].
pub const SPAN_VECTOR_SEARCH: &str = "aletheiadb.vector.search";
/// Span name used by [`temporal_query_span`].
pub const SPAN_TEMPORAL_QUERY: &str = "aletheiadb.temporal.query";
/// Span name used by [`historical_storage_query_span`].
pub const SPAN_STORAGE_HISTORICAL_QUERY: &str = "aletheiadb.storage.historical.query";
/// Span name used by [`transaction_commit_span`].
pub const SPAN_TRANSACTION_COMMIT: &str = "aletheiadb.transaction.commit";
// Attribute keys. The span helpers in this file spell the same keys as string
// literals in their field lists, so the two must be kept in sync by hand.

/// Key of the span field carrying [`DB_SYSTEM_NAME`].
pub const ATTR_DB_SYSTEM_NAME: &str = "db.system.name";
/// Key of the span field carrying the caller-supplied operation name.
pub const ATTR_DB_OPERATION_NAME: &str = "db.operation.name";
/// Key of the span field carrying the query kind (e.g. `"hybrid"`, `"temporal"`).
pub const ATTR_QUERY_KIND: &str = "aletheiadb.query.kind";
/// Key of the span field carrying the vector property name.
pub const ATTR_VECTOR_PROPERTY: &str = "aletheiadb.vector.property";
/// Key of the span field carrying the durability mode of a commit.
pub const ATTR_DURABILITY_MODE: &str = "aletheiadb.durability.mode";
/// Key of the span field carrying the transaction id of a commit.
pub const ATTR_TRANSACTION_ID: &str = "aletheiadb.transaction.id";
/// Key for an error category attribute.
// NOTE(review): not attached to any span in this file — presumably used by callers; confirm.
pub const ATTR_ERROR_CATEGORY: &str = "aletheiadb.error.category";
/// Key for a status attribute.
// NOTE(review): not attached to any span in this file — presumably used by callers; confirm.
pub const ATTR_STATUS: &str = "aletheiadb.status";
/// Process-wide slot holding the recorder most recently passed to [`install`].
///
/// Initialised lazily on first access with a wrapper around [`NoOpMetrics`],
/// so forwarding is always possible even when nothing has been installed.
static METRICS_RECORDER: LazyLock<ArcSwap<SharedMetricsRecorder>> = LazyLock::new(|| {
    let noop = Arc::new(NoOpMetrics);
    ArcSwap::from_pointee(SharedMetricsRecorder::new(noop))
});
/// Sized wrapper around a type-erased [`MetricsRecorder`].
///
/// This is the pointee type stored in `METRICS_RECORDER`; every trait call is
/// forwarded unchanged to `inner`.
// NOTE(review): presumably exists because `ArcSwap` cannot hold an unsized
// `dyn` pointee directly — confirm against the arc-swap documentation.
struct SharedMetricsRecorder {
/// The recorder that actually receives each forwarded call.
inner: Arc<dyn MetricsRecorder>,
}
impl SharedMetricsRecorder {
fn new(inner: Arc<dyn MetricsRecorder>) -> Self {
Self { inner }
}
}
/// Pure pass-through: each method hands its arguments, unmodified, to the
/// wrapped recorder.
impl MetricsRecorder for SharedMetricsRecorder {
    fn record_error(&self, category: ErrorCategory) {
        let delegate: &dyn MetricsRecorder = &*self.inner;
        delegate.record_error(category);
    }

    fn record_write_conflict(&self) {
        let delegate: &dyn MetricsRecorder = &*self.inner;
        delegate.record_write_conflict();
    }

    fn record_critical_event(&self, event: CriticalEvent) {
        let delegate: &dyn MetricsRecorder = &*self.inner;
        delegate.record_critical_event(event);
    }

    fn record_transaction_commit(
        &self,
        duration_secs: f64,
        operations_count: u64,
        durability_mode: &str,
        status: &str,
    ) {
        let delegate: &dyn MetricsRecorder = &*self.inner;
        delegate.record_transaction_commit(duration_secs, operations_count, durability_mode, status);
    }
}
/// Telemetry settings accepted by [`install`].
///
/// Both fields are `Arc`s, so cloning a config only bumps reference counts.
/// Build one with [`TelemetryConfig::builder`] or [`TelemetryConfig::default`].
#[derive(Clone)]
pub struct TelemetryConfig {
/// Service name; falls back to [`DB_SYSTEM_NAME`] when not set explicitly.
// NOTE(review): `install` in this file does not read this field — confirm
// where (or whether) it is consumed.
pub service_name: Arc<str>,
/// Recorder that receives every metric once the config is installed.
pub metrics: Arc<dyn MetricsRecorder>,
}
impl std::fmt::Debug for TelemetryConfig {
    /// Formats the config as a struct with `service_name` and `metrics` fields.
    ///
    /// The recorder itself is not required to be `Debug`, so `metrics` is
    /// rendered as a type name. Because the value is a trait object,
    /// `type_name_of_val` typically reports the `dyn MetricsRecorder` type
    /// rather than the concrete recorder behind it.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let recorder_type = std::any::type_name_of_val(&*self.metrics);
        let mut out = f.debug_struct("TelemetryConfig");
        out.field("service_name", &self.service_name);
        out.field("metrics", &recorder_type);
        out.finish()
    }
}
impl Default for TelemetryConfig {
    /// Returns a config named [`DB_SYSTEM_NAME`] whose recorder is [`NoOpMetrics`].
    fn default() -> Self {
        let default_name = Arc::from(DB_SYSTEM_NAME);
        let noop_recorder = Arc::new(NoOpMetrics);
        Self {
            service_name: default_name,
            metrics: noop_recorder,
        }
    }
}
impl TelemetryConfig {
#[must_use]
pub fn builder() -> TelemetryConfigBuilder {
TelemetryConfigBuilder::default()
}
}
/// Builder for [`TelemetryConfig`].
///
/// Each field is `None` until its setter is called; `build` substitutes the
/// default for any field left unset.
#[derive(Default)]
pub struct TelemetryConfigBuilder {
/// Explicit service name, if one was provided.
service_name: Option<Arc<str>>,
/// Explicit metrics recorder, if one was provided.
metrics: Option<Arc<dyn MetricsRecorder>>,
}
impl TelemetryConfigBuilder {
    /// Overrides the service name; a later call replaces an earlier one.
    #[must_use]
    pub fn service_name(self, service_name: impl Into<Arc<str>>) -> Self {
        Self {
            service_name: Some(service_name.into()),
            ..self
        }
    }

    /// Overrides the metrics recorder; a later call replaces an earlier one.
    #[must_use]
    pub fn metrics(self, metrics: Arc<dyn MetricsRecorder>) -> Self {
        Self {
            metrics: Some(metrics),
            ..self
        }
    }

    /// Finishes the builder.
    ///
    /// Unset fields fall back to [`DB_SYSTEM_NAME`] for the service name and
    /// [`NoOpMetrics`] for the recorder; the fallbacks are only allocated when
    /// they are actually needed.
    #[must_use]
    pub fn build(self) -> TelemetryConfig {
        let Self {
            service_name: name_override,
            metrics: recorder_override,
        } = self;

        let service_name = match name_override {
            Some(name) => name,
            None => Arc::from(DB_SYSTEM_NAME),
        };
        let metrics: Arc<dyn MetricsRecorder> = match recorder_override {
            Some(recorder) => recorder,
            None => Arc::new(NoOpMetrics),
        };

        TelemetryConfig {
            service_name,
            metrics,
        }
    }
}
pub fn install(config: TelemetryConfig) {
METRICS_RECORDER.store(Arc::new(SharedMetricsRecorder::new(config.metrics)));
}
/// Returns a snapshot of the built-in [`METRICS`] instance.
///
/// Only the built-in instance is read; a recorder registered through
/// [`install`] is not consulted.
#[must_use]
pub fn metrics() -> MetricsSnapshot {
METRICS.snapshot()
}
/// Records an error of the given `category`.
///
/// The built-in [`METRICS`] instance is updated first, then the same event is
/// forwarded to the currently installed recorder.
pub fn record_error(category: ErrorCategory) {
    METRICS.record_error(category);
    with_installed_recorder(move |installed| {
        installed.record_error(category);
    });
}
/// Records a single write conflict.
///
/// The built-in [`METRICS`] instance is updated first, then the same event is
/// forwarded to the currently installed recorder.
pub fn record_write_conflict() {
    METRICS.record_write_conflict();
    with_installed_recorder(|installed| {
        installed.record_write_conflict();
    });
}
/// Records a critical `event`.
///
/// The built-in [`METRICS`] instance is updated first, then the same event is
/// forwarded to the currently installed recorder.
pub fn record_critical_event(event: CriticalEvent) {
    METRICS.record_critical_event(event);
    with_installed_recorder(move |installed| {
        installed.record_critical_event(event);
    });
}
/// Records the outcome of a transaction commit.
///
/// All four arguments are passed through unchanged, first to the built-in
/// [`METRICS`] instance and then to the currently installed recorder.
///
/// * `duration_secs` – commit duration, in seconds.
/// * `operations_count` – number of operations in the transaction.
/// * `durability_mode` – durability mode label.
/// * `status` – status label.
pub fn record_transaction_commit(
    duration_secs: f64,
    operations_count: u64,
    durability_mode: &str,
    status: &str,
) {
    METRICS.record_transaction_commit(duration_secs, operations_count, durability_mode, status);
    with_installed_recorder(move |installed| {
        installed.record_transaction_commit(duration_secs, operations_count, durability_mode, status);
    });
}
/// Runs `f` against the recorder currently held in `METRICS_RECORDER`.
///
/// The loaded guard is kept alive for the duration of the call, so `f` sees
/// one consistent recorder even if [`install`] swaps it concurrently.
fn with_installed_recorder(f: impl FnOnce(&dyn MetricsRecorder)) {
    let guard = METRICS_RECORDER.load();
    let active: &dyn MetricsRecorder = &**guard;
    f(active);
}
/// Creates the INFO-level [`SPAN_QUERY_EXECUTE`] span.
///
/// Fields: `db.system.name` (always [`DB_SYSTEM_NAME`]), `db.operation.name`
/// (`operation_name`) and `aletheiadb.query.kind` (`query_kind`). The field
/// keys are written as literals and mirror the `ATTR_*` constants.
#[must_use]
pub fn query_execute_span(operation_name: &str, query_kind: &str) -> tracing::Span {
    tracing::span!(
        tracing::Level::INFO,
        SPAN_QUERY_EXECUTE,
        "db.system.name" = DB_SYSTEM_NAME,
        "db.operation.name" = operation_name,
        "aletheiadb.query.kind" = query_kind,
    )
}
/// Creates the INFO-level [`SPAN_QUERY_HYBRID`] span.
///
/// Fields: `db.system.name` (always [`DB_SYSTEM_NAME`]), `db.operation.name`
/// (`operation_name`) and `aletheiadb.query.kind`, fixed to `"hybrid"`.
#[must_use]
pub fn hybrid_query_span(operation_name: &str) -> tracing::Span {
    tracing::span!(
        tracing::Level::INFO,
        SPAN_QUERY_HYBRID,
        "db.system.name" = DB_SYSTEM_NAME,
        "db.operation.name" = operation_name,
        "aletheiadb.query.kind" = "hybrid",
    )
}
/// Creates the INFO-level [`SPAN_VECTOR_INDEX`] span.
///
/// Fields: `db.system.name` (always [`DB_SYSTEM_NAME`]), `db.operation.name`
/// (`operation_name`) and `aletheiadb.vector.property` (`property_name`).
#[must_use]
pub fn vector_index_span(operation_name: &str, property_name: &str) -> tracing::Span {
    tracing::span!(
        tracing::Level::INFO,
        SPAN_VECTOR_INDEX,
        "db.system.name" = DB_SYSTEM_NAME,
        "db.operation.name" = operation_name,
        "aletheiadb.vector.property" = property_name,
    )
}
/// Creates the INFO-level [`SPAN_VECTOR_SEARCH`] span.
///
/// Fields: `db.system.name` (always [`DB_SYSTEM_NAME`]), `db.operation.name`
/// (`operation_name`) and `aletheiadb.vector.property` (`property_name`).
#[must_use]
pub fn vector_search_span(operation_name: &str, property_name: &str) -> tracing::Span {
    tracing::span!(
        tracing::Level::INFO,
        SPAN_VECTOR_SEARCH,
        "db.system.name" = DB_SYSTEM_NAME,
        "db.operation.name" = operation_name,
        "aletheiadb.vector.property" = property_name,
    )
}
/// Creates the INFO-level [`SPAN_TEMPORAL_QUERY`] span.
///
/// Fields: `db.system.name` (always [`DB_SYSTEM_NAME`]), `db.operation.name`
/// (`operation_name`) and `aletheiadb.query.kind`, fixed to `"temporal"`.
#[must_use]
pub fn temporal_query_span(operation_name: &str) -> tracing::Span {
    tracing::span!(
        tracing::Level::INFO,
        SPAN_TEMPORAL_QUERY,
        "db.system.name" = DB_SYSTEM_NAME,
        "db.operation.name" = operation_name,
        "aletheiadb.query.kind" = "temporal",
    )
}
/// Creates the INFO-level [`SPAN_STORAGE_HISTORICAL_QUERY`] span.
///
/// Fields: `db.system.name` (always [`DB_SYSTEM_NAME`]), `db.operation.name`
/// (`operation_name`) and `aletheiadb.query.kind`, fixed to `"temporal"`.
// NOTE(review): the query kind is the same `"temporal"` value that
// `temporal_query_span` uses; only the span name tells the two apart —
// confirm this is intended.
#[must_use]
pub fn historical_storage_query_span(operation_name: &str) -> tracing::Span {
    tracing::span!(
        tracing::Level::INFO,
        SPAN_STORAGE_HISTORICAL_QUERY,
        "db.system.name" = DB_SYSTEM_NAME,
        "db.operation.name" = operation_name,
        "aletheiadb.query.kind" = "temporal",
    )
}
/// Creates the INFO-level [`SPAN_TRANSACTION_COMMIT`] span.
///
/// Fields: `db.system.name` (always [`DB_SYSTEM_NAME`]), `db.operation.name`
/// (fixed to `"transaction.commit"`), `aletheiadb.transaction.id` (`tx_id`)
/// and `aletheiadb.durability.mode` (`durability_mode`).
#[must_use]
pub fn transaction_commit_span(tx_id: &str, durability_mode: &str) -> tracing::Span {
    tracing::span!(
        tracing::Level::INFO,
        SPAN_TRANSACTION_COMMIT,
        "db.system.name" = DB_SYSTEM_NAME,
        "db.operation.name" = "transaction.commit",
        "aletheiadb.transaction.id" = tx_id,
        "aletheiadb.durability.mode" = durability_mode,
    )
}
#[cfg(test)]
mod tests {
    use super::*;

    /// The default config carries the database name and a recorder that can be
    /// called without any setup.
    #[test]
    fn default_telemetry_is_noop() {
        let TelemetryConfig {
            service_name,
            metrics: recorder,
        } = TelemetryConfig::default();

        assert_eq!(&*service_name, DB_SYSTEM_NAME);
        recorder.record_error(ErrorCategory::Other);
    }

    /// Installing twice is allowed, and recording still works afterwards.
    /// Note that this test replaces the process-wide recorder.
    #[test]
    fn install_accepts_replacement_config() {
        install(TelemetryConfig::default());

        let replacement = TelemetryConfig::builder()
            .service_name("test-service")
            .build();
        install(replacement);

        record_error(ErrorCategory::Query);
    }
}