use std::sync::Arc;
use std::time::Duration;
use tokio::sync::watch;
use tracing::{debug, warn};
use super::proto::{
self, ExportMetricsServiceRequest, ExportTraceServiceRequest, KeyValue, Metric,
NumberDataPoint, Resource, ResourceMetrics, ResourceSpans, ScopeMetrics, ScopeSpans, Span,
SpanKind, SpanStatus, StatusCode as OtelStatusCode,
};
use crate::control::metrics::SystemMetrics;
/// Settings that control whether and where telemetry is exported.
#[derive(Debug, Clone)]
pub struct ExporterConfig {
    /// Base URL of the collector. An empty string disables exporting.
    pub endpoint: String,
    /// Time between two consecutive metrics exports.
    pub metrics_interval: Duration,
    /// Whether trace export is requested.
    pub export_traces: bool,
    /// Whether metrics export is requested.
    pub export_metrics: bool,
}

impl Default for ExporterConfig {
    /// Returns a configuration with exporting switched off: no endpoint,
    /// both export flags cleared, and a 15 second metrics interval.
    fn default() -> Self {
        ExporterConfig {
            export_metrics: false,
            export_traces: false,
            metrics_interval: Duration::from_secs(15),
            endpoint: String::new(),
        }
    }
}

impl ExporterConfig {
    /// Reports whether any export is configured.
    ///
    /// True only when an endpoint is set and at least one of
    /// `export_traces` / `export_metrics` is on.
    pub fn is_enabled(&self) -> bool {
        if self.endpoint.is_empty() {
            return false;
        }
        self.export_traces || self.export_metrics
    }
}
pub fn spawn_metrics_exporter(
config: ExporterConfig,
metrics: Arc<SystemMetrics>,
node_id: u64,
) -> watch::Sender<bool> {
let (shutdown_tx, mut shutdown_rx) = watch::channel(false);
if !config.is_enabled() || !config.export_metrics {
return shutdown_tx;
}
let endpoint = format!("{}/v1/metrics", config.endpoint.trim_end_matches('/'));
let interval = config.metrics_interval;
tokio::spawn(async move {
let client = reqwest::Client::new();
let mut ticker = tokio::time::interval(interval);
loop {
tokio::select! {
_ = ticker.tick() => {}
_ = shutdown_rx.changed() => break,
}
let request = build_metrics_request(&metrics, node_id);
let mut buf = Vec::new();
if prost::Message::encode(&request, &mut buf).is_err() {
continue;
}
match client
.post(&endpoint)
.header("content-type", "application/x-protobuf")
.body(buf)
.send()
.await
{
Ok(resp) if resp.status().is_success() => {
debug!("OTLP metrics export: OK");
}
Ok(resp) => {
warn!("OTLP metrics export: status {}", resp.status());
}
Err(e) => {
warn!("OTLP metrics export: {e}");
}
}
}
});
shutdown_tx
}
/// Snapshots the counters in `metrics` into one OTLP metrics export request.
///
/// Every data point carries the same timestamp (current wall-clock time in
/// nanoseconds since the Unix epoch, or 0 if the clock reads before the
/// epoch). The resource is tagged with `service.name = "nodedb"` and
/// `service.instance.id = <node_id>`. Gauges are emitted first, followed by
/// the monotonic sums.
fn build_metrics_request(metrics: &SystemMetrics, node_id: u64) -> ExportMetricsServiceRequest {
    use std::sync::atomic::Ordering::Relaxed;

    let timestamp_ns = std::time::SystemTime::now()
        .duration_since(std::time::UNIX_EPOCH)
        .unwrap_or_default()
        .as_nanos() as u64;

    // Small adapters so each entry below only states its name and value.
    let gauge = |name: &str, value: f64| gauge_metric(name, value, timestamp_ns);
    let counter = |name: &str, value: f64| sum_metric(name, value, timestamp_ns);

    let exported = vec![
        gauge(
            "nodedb_active_connections",
            metrics.active_connections.load(Relaxed) as f64,
        ),
        gauge(
            "nodedb_wal_fsync_latency_us",
            metrics.wal_fsync_latency_us.load(Relaxed) as f64,
        ),
        gauge(
            "nodedb_bridge_utilization",
            metrics.bridge_utilization.load(Relaxed) as f64,
        ),
        gauge(
            "nodedb_compaction_debt",
            metrics.compaction_debt.load(Relaxed) as f64,
        ),
        gauge(
            "nodedb_kv_memory_bytes",
            metrics.kv_memory_bytes.load(Relaxed) as f64,
        ),
        counter(
            "nodedb_queries_total",
            metrics.queries_total.load(Relaxed) as f64,
        ),
        counter(
            "nodedb_query_errors_total",
            metrics.query_errors.load(Relaxed) as f64,
        ),
        counter(
            "nodedb_vector_searches_total",
            metrics.vector_searches.load(Relaxed) as f64,
        ),
        counter(
            "nodedb_graph_traversals_total",
            metrics.graph_traversals.load(Relaxed) as f64,
        ),
    ];

    let instance_id = node_id.to_string();
    let resource = Resource {
        attributes: vec![
            kv("service.name", "nodedb"),
            kv("service.instance.id", &instance_id),
        ],
    };

    let scope = proto::InstrumentationScope {
        name: "nodedb".into(),
        version: env!("CARGO_PKG_VERSION").into(),
    };

    let scoped = ScopeMetrics {
        scope: Some(scope),
        metrics: exported,
    };

    ExportMetricsServiceRequest {
        resource_metrics: vec![ResourceMetrics {
            resource: Some(resource),
            scope_metrics: vec![scoped],
        }],
    }
}
/// Everything needed to export a single finished span via [`export_span`].
///
/// All fields are plain values or borrowed strings, so the struct is cheap to
/// copy and can be printed for diagnostics.
#[derive(Debug, Clone, Copy)]
pub struct SpanExport<'a> {
    /// Collector base URL; `/v1/traces` is appended when exporting. An empty
    /// string turns the export into a no-op.
    pub endpoint: &'a str,
    /// Trace identifier; exported as the low 8 bytes (big-endian) of the
    /// 16-byte OTLP trace id, with the high 8 bytes zero.
    pub trace_id: u64,
    /// Name recorded on the span.
    pub span_name: &'a str,
    /// Span start, sent as `start_time_unix_nano`.
    pub start_ns: u64,
    /// Span end, sent as `end_time_unix_nano`.
    pub end_ns: u64,
    /// Attached to the span as the `nodedb.tenant_id` attribute.
    pub tenant_id: u32,
    /// Attached to the span as the `nodedb.vshard_id` attribute.
    pub vshard_id: u16,
    /// `true` maps to status code `Ok`, `false` to `Error`.
    pub status_ok: bool,
}
pub async fn export_span(params: &SpanExport<'_>) {
let SpanExport {
endpoint,
trace_id,
span_name,
start_ns,
end_ns,
tenant_id,
vshard_id,
status_ok,
} = params;
if endpoint.is_empty() {
return;
}
let trace_bytes = trace_id.to_be_bytes().to_vec();
let span_id_bytes = rand_span_id();
let span = Span {
trace_id: [vec![0u8; 8], trace_bytes].concat(),
span_id: span_id_bytes.to_vec(),
name: (*span_name).into(),
kind: SpanKind::Server as i32,
start_time_unix_nano: *start_ns,
end_time_unix_nano: *end_ns,
attributes: vec![
kv("nodedb.tenant_id", &tenant_id.to_string()),
kv("nodedb.vshard_id", &vshard_id.to_string()),
],
status: Some(SpanStatus {
message: String::new(),
code: if *status_ok {
OtelStatusCode::Ok as i32
} else {
OtelStatusCode::Error as i32
},
}),
};
let request = ExportTraceServiceRequest {
resource_spans: vec![ResourceSpans {
resource: Some(Resource {
attributes: vec![kv("service.name", "nodedb")],
}),
scope_spans: vec![ScopeSpans {
scope: Some(proto::InstrumentationScope {
name: "nodedb".into(),
version: env!("CARGO_PKG_VERSION").into(),
}),
spans: vec![span],
}],
}],
};
let mut buf = Vec::new();
if prost::Message::encode(&request, &mut buf).is_err() {
return;
}
let url = format!("{}/v1/traces", endpoint.trim_end_matches('/'));
let _ = reqwest::Client::new()
.post(&url)
.header("content-type", "application/x-protobuf")
.body(buf)
.send()
.await;
}
/// Builds an OTLP attribute whose value is the string `value`.
fn kv(key: &str, value: &str) -> KeyValue {
    let string_value = proto::any_value::Value::StringValue(value.into());
    let wrapped = proto::AnyValue {
        value: Some(string_value),
    };
    KeyValue {
        key: key.into(),
        value: Some(wrapped),
    }
}
/// Builds a gauge metric called `name` holding a single double-valued data
/// point stamped with `time_ns`. Description, unit and point attributes are
/// left empty.
fn gauge_metric(name: &str, value: f64, time_ns: u64) -> Metric {
    let point = NumberDataPoint {
        attributes: Vec::new(),
        time_unix_nano: time_ns,
        value: Some(proto::number_data_point::Value::AsDouble(value)),
    };
    let gauge = proto::Gauge {
        data_points: vec![point],
    };
    Metric {
        name: name.into(),
        description: String::new(),
        unit: String::new(),
        data: Some(proto::metric::Data::Gauge(gauge)),
    }
}
/// Builds a monotonic sum metric called `name` holding a single
/// double-valued data point stamped with `time_ns`. Description, unit and
/// point attributes are left empty.
fn sum_metric(name: &str, value: f64, time_ns: u64) -> Metric {
    let point = NumberDataPoint {
        attributes: Vec::new(),
        time_unix_nano: time_ns,
        value: Some(proto::number_data_point::Value::AsDouble(value)),
    };
    let sum = proto::Sum {
        data_points: vec![point],
        is_monotonic: true,
    };
    Metric {
        name: name.into(),
        description: String::new(),
        unit: String::new(),
        data: Some(proto::metric::Data::Sum(sum)),
    }
}
/// Generates an 8-byte span id that is unpredictable and never all zero.
///
/// The id is a randomly keyed hash of the current wall-clock time and a
/// process-wide sequence number. Using the timestamp alone would hand out
/// identical ids to spans created within the same clock tick, and an
/// all-zero id when the clock reads before the Unix epoch.
fn rand_span_id() -> [u8; 8] {
    use std::collections::hash_map::RandomState;
    use std::hash::{BuildHasher, Hasher};
    use std::sync::atomic::{AtomicU64, Ordering};

    // Distinguishes ids requested within the same clock tick.
    static SEQUENCE: AtomicU64 = AtomicU64::new(0);

    let ts = std::time::SystemTime::now()
        .duration_since(std::time::UNIX_EPOCH)
        .unwrap_or_default()
        .as_nanos() as u64;
    let seq = SEQUENCE.fetch_add(1, Ordering::Relaxed);

    // `RandomState` is seeded from OS randomness, so the hash acts as a
    // cheap random source without pulling in an extra dependency.
    let mut hasher = RandomState::new().build_hasher();
    hasher.write_u64(ts);
    hasher.write_u64(seq);

    // An all-zero span id is treated as invalid by OTLP consumers.
    let id = match hasher.finish() {
        0 => 1,
        nonzero => nonzero,
    };
    id.to_le_bytes()
}