use crate::config::FleetConfig;
use crate::publisher::TelemetryPayload;
/// Errors surfaced by the OTLP metric bridge.
///
/// Both variants carry the stringified message of the underlying failure.
#[derive(Debug)]
pub enum OtlpError {
    /// The bridge or its exporter could not be constructed.
    Build(String),
    /// Telemetry could not be serialized or flushed.
    Export(String),
}

impl std::fmt::Display for OtlpError {
    /// Writes a fixed, variant-specific prefix followed by the wrapped message.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            Self::Build(msg) => {
                f.write_fmt(format_args!("failed to build OTLP exporter: {msg}"))
            }
            Self::Export(msg) => {
                f.write_fmt(format_args!("failed to export telemetry: {msg}"))
            }
        }
    }
}

// No `source()` override: the underlying error is kept only as text.
impl std::error::Error for OtlpError {}
/// Derives the OTLP/HTTP metrics endpoint from a configured base URL.
///
/// Surrounding whitespace and trailing slashes are removed first. If the
/// remainder ends in one of the OTLP/HTTP signal paths (`/v1/metrics`,
/// `/v1/traces`, `/v1/logs`) that suffix is replaced, otherwise
/// `/v1/metrics` is appended. An empty or whitespace-only base yields the
/// bare path `/v1/metrics`.
fn metrics_endpoint_for(base: &str) -> String {
    const METRICS_PATH: &str = "/v1/metrics";
    // Checked in order; the metrics path comes first so an already-correct
    // URL is returned unchanged.
    const SIGNAL_PATHS: [&str; 3] = [METRICS_PATH, "/v1/traces", "/v1/logs"];

    let trimmed = base.trim().trim_end_matches('/');
    // At most one signal suffix is stripped; whatever precedes it is kept verbatim.
    let root = SIGNAL_PATHS
        .iter()
        .find_map(|path| trimmed.strip_suffix(path))
        .unwrap_or(trimmed);
    format!("{root}{METRICS_PATH}")
}
#[cfg(not(target_arch = "wasm32"))]
mod native {
use super::*;
use crate::machine_info::{GpuInfo, MachineInfo};
use opentelemetry::KeyValue;
use opentelemetry::metrics::{AsyncInstrument, Gauge, Meter, MeterProvider, ObservableGauge};
use opentelemetry_otlp::{HasExportConfig, MetricExporter, Protocol};
use opentelemetry_sdk::Resource;
use opentelemetry_sdk::metrics::{PeriodicReader, SdkMeterProvider, Temporality};
use std::collections::HashMap;
use std::sync::{Arc, Mutex};
use std::time::Duration;
use tracing::warn;
/// Native (non-wasm32) bridge that records fleet telemetry through an
/// OpenTelemetry meter and flushes it via the meter provider.
pub struct OtlpMetricBridge {
/// Meter provider; `export` calls `force_flush` on it after recording.
provider: SdkMeterProvider,
/// Meter obtained from `provider`, used to create gauges on demand.
meter: Meter,
/// Gauges created so far, keyed by instrument name.
gauges: Mutex<HashMap<String, Gauge<f64>>>,
/// Handle of the `fleet.event` observable gauge; never read after
/// construction.
/// NOTE(review): presumably stored to keep the instrument alive — confirm
/// against the SDK.
_event_observable: ObservableGauge<f64>,
/// Event measurements queued by `export` and drained by the `fleet.event`
/// callback.
event_queue: Arc<Mutex<Vec<RecordedMeasurement>>>,
/// Handle of the `fleet.panic` observable gauge; never read after
/// construction.
/// NOTE(review): presumably stored to keep the instrument alive — confirm
/// against the SDK.
_panic_observable: ObservableGauge<f64>,
/// Panic measurements queued by `export` and drained by the `fleet.panic`
/// callback.
panic_queue: Arc<Mutex<Vec<RecordedMeasurement>>>,
}
/// One queued observation waiting to be reported by an observable-gauge
/// callback.
struct RecordedMeasurement {
/// Value passed to the observer; `queue_measurement` always stores `1.0`.
value: f64,
/// Attributes attached to the observation.
attributes: Vec<KeyValue>,
}
impl OtlpMetricBridge {
pub fn new(config: &FleetConfig) -> Result<Self, OtlpError> {
let protocol = if config.otlp_config.use_http_json {
Protocol::HttpJson
} else {
Protocol::HttpBinary
};
let metrics_endpoint = metrics_endpoint_for(&config.aggregation_url);
let mut metrics_builder = MetricExporter::builder().with_http();
metrics_builder = metrics_builder.with_temporality(Temporality::Cumulative);
metrics_builder.export_config().endpoint = Some(metrics_endpoint.clone());
metrics_builder.export_config().protocol = protocol;
let metric_exporter = metrics_builder
.build()
.map_err(|err| OtlpError::Build(err.to_string()))?;
let export_interval = Duration::from_secs(config.publish_interval_secs.max(1));
let reader = PeriodicReader::builder(metric_exporter)
.with_interval(export_interval)
.build();
let resource = base_resource(config);
let meter_resource = resource.clone();
let provider = SdkMeterProvider::builder()
.with_resource(meter_resource)
.with_reader(reader.clone())
.build();
let meter = provider.meter("bevy_fleet");
let event_queue: Arc<Mutex<Vec<RecordedMeasurement>>> =
Arc::new(Mutex::new(Vec::new()));
let event_queue_for_callback = event_queue.clone();
let event_observable = meter
.f64_observable_gauge("fleet.event")
.with_callback(move |observer: &dyn AsyncInstrument<f64>| {
if let Ok(mut queue) = event_queue_for_callback.lock() {
for measurement in queue.drain(..) {
observer.observe(measurement.value, &measurement.attributes);
}
}
})
.build();
let panic_queue: Arc<Mutex<Vec<RecordedMeasurement>>> =
Arc::new(Mutex::new(Vec::new()));
let panic_queue_for_callback = panic_queue.clone();
let panic_observable = meter
.f64_observable_gauge("fleet.panic")
.with_callback(move |observer: &dyn AsyncInstrument<f64>| {
if let Ok(mut queue) = panic_queue_for_callback.lock() {
for measurement in queue.drain(..) {
observer.observe(measurement.value, &measurement.attributes);
}
}
})
.build();
Ok(Self {
provider,
meter,
gauges: Mutex::new(HashMap::new()),
_event_observable: event_observable,
event_queue,
_panic_observable: panic_observable,
panic_queue,
})
}
/// Appends one measurement with value `1.0` and the given `attributes` to
/// `queue`.
///
/// If the queue's mutex cannot be locked the measurement is dropped and a
/// warning is logged.
fn queue_measurement(
    queue: &Arc<Mutex<Vec<RecordedMeasurement>>>,
    attributes: Vec<KeyValue>,
) {
    match queue.lock() {
        Ok(mut pending) => pending.push(RecordedMeasurement {
            value: 1.0,
            attributes,
        }),
        Err(_) => warn!("Fleet OTLP bridge failed to lock measurement queue"),
    }
}
/// Records every signal contained in `payload` and flushes the provider.
///
/// * Each entry of `payload.metrics` is recorded on a gauge named after the
///   metric, attributed with the metric's tags.
/// * Each diagnostic whose value parses as `f64` is recorded on a gauge named
///   `diagnostic.<name>` without attributes; other diagnostics are skipped.
/// * The `fleet.session_info` gauge records the published-payload count,
///   attributed with the app/session identifiers, the session statistics and,
///   when present, the machine information.
/// * Every event and panic is pushed onto its measurement queue with the
///   value `1.0`.
///
/// # Errors
///
/// Returns [`OtlpError::Export`] when flushing the meter provider fails.
pub fn export(&self, payload: &TelemetryPayload) -> Result<(), OtlpError> {
    let base_attrs = build_base_attributes(payload);

    {
        // The map only caches gauge handles, so a lock poisoned by a panic on
        // another thread is recovered instead of propagating the panic. The
        // guard is scoped so it is released before queuing and flushing.
        let mut gauges = self
            .gauges
            .lock()
            .unwrap_or_else(std::sync::PoisonError::into_inner);

        for metric in &payload.metrics {
            let gauge = self.get_or_create_gauge(&mut gauges, &metric.name);
            // Tag keys come from a map and are therefore already unique.
            gauge.record(metric.value, &attributes_from_tags(&metric.tags));
        }

        for diagnostic in &payload.diagnostics {
            // Only numeric diagnostics can be represented as gauge values.
            if let Ok(value) = diagnostic.value.parse::<f64>() {
                let name = format!("diagnostic.{}", diagnostic.name);
                let gauge = self.get_or_create_gauge(&mut gauges, &name);
                gauge.record(value, &[]);
            }
        }

        let session_gauge = self.get_or_create_gauge(&mut gauges, "fleet.session_info");
        let mut session_info_attrs = base_attrs.clone();
        extend_unique_attributes(&mut session_info_attrs, &build_session_attributes(payload));
        if let Some(info) = payload.machine_info.as_ref() {
            extend_unique_attributes(&mut session_info_attrs, &machine_info_attributes(info));
        }
        session_gauge.record(
            payload.session_stats.payloads_published as f64,
            &session_info_attrs,
        );
    }

    for event in &payload.events {
        let mut attrs = base_attrs.clone();
        attrs.push(KeyValue::new("fleet.event_type", event.event_type.clone()));
        attrs.push(KeyValue::new(
            "fleet.event.timestamp",
            event.timestamp as i64,
        ));
        for (key, value) in &event.data {
            attrs.push(KeyValue::new(
                format!("fleet.event.data.{key}"),
                value.clone(),
            ));
        }
        Self::queue_measurement(&self.event_queue, attrs);
    }

    for panic in &payload.panics {
        let mut attrs = base_attrs.clone();
        attrs.push(KeyValue::new("fleet.panic.message", panic.message.clone()));
        attrs.push(KeyValue::new(
            "fleet.panic.timestamp",
            panic.timestamp as i64,
        ));
        attrs.push(KeyValue::new(
            "fleet.panic.session_id",
            panic.session_id.clone(),
        ));
        if let Some(location) = &panic.location {
            attrs.push(KeyValue::new("fleet.panic.location", location.clone()));
        }
        Self::queue_measurement(&self.panic_queue, attrs);
    }

    self.provider
        .force_flush()
        .map_err(|err| OtlpError::Export(err.to_string()))
}
/// Returns the gauge registered under `name`, creating and caching it in
/// `gauges` on first use.
///
/// Newly created gauges carry the description `bevy_fleet metric`.
fn get_or_create_gauge(
    &self,
    gauges: &mut HashMap<String, Gauge<f64>>,
    name: &str,
) -> Gauge<f64> {
    match gauges.get(name) {
        Some(cached) => cached.clone(),
        None => {
            let created = self
                .meter
                .f64_gauge(name.to_string())
                .with_description("bevy_fleet metric")
                .build();
            gauges.insert(name.to_string(), created.clone());
            created
        }
    }
}
}
/// Merges `additional` into `target`, keyed by attribute key.
///
/// An attribute whose key already occurs in `target` overwrites the first
/// matching entry in place; any other attribute is appended.
fn extend_unique_attributes(target: &mut Vec<KeyValue>, additional: &[KeyValue]) {
    for incoming in additional {
        match target
            .iter()
            .position(|present| present.key == incoming.key)
        {
            Some(slot) => target[slot] = incoming.clone(),
            None => target.push(incoming.clone()),
        }
    }
}
/// Converts a tag map into attributes, one per entry, in the map's iteration
/// order.
fn attributes_from_tags(tags: &HashMap<String, String>) -> Vec<KeyValue> {
    let mut attrs = Vec::with_capacity(tags.len());
    for (name, text) in tags {
        attrs.push(KeyValue::new(name.clone(), text.clone()));
    }
    attrs
}
/// Builds the resource attached to every exported metric.
///
/// Starts from an empty resource, sets the service name to `config.app_name`,
/// adds `service.version` and `fleet.app_id`, then applies each entry of
/// `config.otlp_config.resource_attributes` in iteration order.
fn base_resource(config: &FleetConfig) -> Resource {
    let seeded = Resource::builder_empty()
        .with_service_name(config.app_name.clone())
        .with_attribute(KeyValue::new("service.version", config.app_version.clone()))
        .with_attribute(KeyValue::new("fleet.app_id", config.app_id.clone()));

    config
        .otlp_config
        .resource_attributes
        .iter()
        .fold(seeded, |builder, (key, value)| {
            builder.with_attribute(KeyValue::new(key.clone(), value.clone()))
        })
        .build()
}
/// Returns the two identifying attributes of a payload: `fleet.app_id`
/// followed by `fleet.session_id`.
fn build_base_attributes(payload: &TelemetryPayload) -> Vec<KeyValue> {
    let app_id = KeyValue::new("fleet.app_id", payload.app_id.clone());
    let session_id = KeyValue::new("fleet.session_id", payload.session_id.clone());
    Vec::from([app_id, session_id])
}
/// Returns the session start time and the session counters of `payload` as
/// integer attributes.
///
/// Every value is converted with an `as i64` cast.
fn build_session_attributes(payload: &TelemetryPayload) -> Vec<KeyValue> {
    let stats = &payload.session_stats;
    let entries = [
        ("fleet.session_start_time", payload.session_start_time as i64),
        (
            "fleet.session.payloads_published",
            stats.payloads_published as i64,
        ),
        (
            "fleet.session.metrics_collected",
            stats.metrics_collected as i64,
        ),
        ("fleet.session.events_tracked", stats.events_tracked as i64),
        (
            "fleet.session.diagnostics_recorded",
            stats.diagnostics_recorded as i64,
        ),
        ("fleet.session.panics_captured", stats.panics_captured as i64),
        ("fleet.session.uptime_seconds", stats.uptime_seconds as i64),
    ];
    Vec::from(entries.map(|(name, value)| KeyValue::new(name, value)))
}
/// Describes the host machine as attributes.
///
/// Host name, OS type, OS version, CPU count and total memory are always
/// emitted. `os.kernel_version` is added only when non-empty, and the GPU
/// attributes only when GPU information is present.
fn machine_info_attributes(info: &MachineInfo) -> Vec<KeyValue> {
    let mut attrs = Vec::from([
        KeyValue::new("host.name", info.hostname.clone()),
        KeyValue::new("os.type", info.os.clone()),
        KeyValue::new("os.version", info.os_version.clone()),
        KeyValue::new("host.cpu.count", info.cpu_count as i64),
        KeyValue::new("host.memory.total", info.total_memory_bytes as i64),
    ]);

    let kernel = (!info.kernel_version.is_empty())
        .then(|| KeyValue::new("os.kernel_version", info.kernel_version.clone()));
    attrs.extend(kernel);

    attrs.extend(
        info.gpu_info
            .as_ref()
            .into_iter()
            .flat_map(|gpu| gpu_info_attributes(gpu)),
    );
    attrs
}
/// Describes a GPU as attributes.
///
/// `gpu.name` is always emitted. The vendor and device ids are emitted as
/// strings when non-zero, and each remaining text field when non-empty.
fn gpu_info_attributes(gpu: &GpuInfo) -> Vec<KeyValue> {
    /// Pushes `key = text` unless `text` is empty.
    fn push_if_present(attrs: &mut Vec<KeyValue>, key: &'static str, text: &str) {
        if !text.is_empty() {
            attrs.push(KeyValue::new(key, text.to_owned()));
        }
    }

    let mut attrs = Vec::from([KeyValue::new("gpu.name", gpu.name.clone())]);

    attrs.extend((gpu.vendor != 0).then(|| KeyValue::new("gpu.vendor", gpu.vendor.to_string())));
    attrs.extend((gpu.device != 0).then(|| KeyValue::new("gpu.device", gpu.device.to_string())));

    push_if_present(&mut attrs, "gpu.device_type", &gpu.device_type);
    push_if_present(&mut attrs, "gpu.backend", &gpu.backend);
    push_if_present(&mut attrs, "gpu.driver", &gpu.driver);
    push_if_present(&mut attrs, "gpu.driver_info", &gpu.driver_info);
    attrs
}
}
#[cfg(not(target_arch = "wasm32"))]
pub use native::OtlpMetricBridge;
#[cfg(target_arch = "wasm32")]
mod wasm {
use super::*;
use serde_json::to_string;
use wasm_bindgen::JsValue;
use wasm_bindgen_futures::{JsFuture, spawn_local};
use web_sys::{Request, RequestInit, window};
/// wasm32 bridge that POSTs telemetry payloads as JSON from the browser.
pub struct OtlpMetricBridge {
/// Metrics endpoint derived from the configured `aggregation_url`.
url: String,
}
impl OtlpMetricBridge {
    /// Creates a bridge that posts to the metrics endpoint derived from
    /// `config.aggregation_url`.
    ///
    /// # Errors
    ///
    /// Returns [`OtlpError::Build`] when the URL is empty or whitespace-only.
    pub fn new(config: &FleetConfig) -> Result<Self, OtlpError> {
        let has_url = !config.aggregation_url.trim().is_empty();
        if has_url {
            Ok(Self {
                url: metrics_endpoint_for(&config.aggregation_url),
            })
        } else {
            Err(OtlpError::Build(
                "aggregation_url must not be empty".to_string(),
            ))
        }
    }

    /// Serializes `payload` to JSON and schedules a POST to the bridge's URL.
    ///
    /// The request runs in a task started with `spawn_local`. A failure of
    /// that request is written to the browser console and is not reflected in
    /// the return value.
    ///
    /// # Errors
    ///
    /// Returns [`OtlpError::Export`] when `payload` cannot be serialized.
    pub fn export(&self, payload: &TelemetryPayload) -> Result<(), OtlpError> {
        let body = match to_string(payload) {
            Ok(json) => json,
            Err(err) => return Err(OtlpError::Export(err.to_string())),
        };
        let url = self.url.clone();
        let request = async move {
            if let Err(err) = send_json(url, body).await {
                web_sys::console::error_1(&JsValue::from_str(&err));
            }
        };
        spawn_local(request);
        Ok(())
    }
}
async fn send_json(url: String, body: String) -> Result<(), String> {
let window = window().ok_or_else(|| "missing Window".to_string())?;
let init = RequestInit::new();
init.set_method("POST");
init.set_body(&JsValue::from_str(&body));
let request = Request::new_with_str_and_init(&url, &init).map_err(|err| {
err.as_string()
.unwrap_or_else(|| "failed to build request".into())
})?;
request
.headers()
.set("Content-Type", "application/json")
.map_err(|err| {
err.as_string()
.unwrap_or_else(|| "failed to set header".into())
})?;
let future = window.fetch_with_request(&request);
let _ = JsFuture::from(future)
.await
.map_err(|err| err.as_string().unwrap_or_else(|| "fetch error".into()))?;
Ok(())
}
}
#[cfg(target_arch = "wasm32")]
pub use wasm::OtlpMetricBridge;
#[cfg(test)]
mod tests {
    use super::*;

    /// Endpoint every case below is expected to resolve to.
    const EXPECTED: &str = "http://localhost:4318/v1/metrics";

    /// A bare collector origin gets the metrics path appended.
    #[test]
    fn metrics_endpoint_defaults_to_metrics_path() {
        let resolved = metrics_endpoint_for("http://localhost:4318");
        assert_eq!(resolved, EXPECTED);
    }

    /// A traces endpoint is rewritten to the metrics endpoint.
    #[test]
    fn metrics_endpoint_replaces_traces_suffix() {
        let resolved = metrics_endpoint_for("http://localhost:4318/v1/traces");
        assert_eq!(resolved, EXPECTED);
    }

    /// An endpoint that already targets metrics is returned unchanged.
    #[test]
    fn metrics_endpoint_handles_existing_metrics_path() {
        let resolved = metrics_endpoint_for("http://localhost:4318/v1/metrics");
        assert_eq!(resolved, EXPECTED);
    }
}