pub mod constants;
pub mod metrics;
pub(crate) mod otlp_layer;
pub(crate) mod otlp_log;
pub(crate) mod otlp_metrics;
pub(crate) mod otlp_trace;
pub(crate) mod proto;
pub mod trace_id;
pub(crate) mod use_metrics;
pub use metrics::{counter, gauge, histogram, Counter, Gauge, Histogram};
pub use use_metrics::UseMetricsState;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
use std::time::Duration;
use otlp_layer::OtlpLayer;
use tracing_subscriber::{fmt, EnvFilter, Layer};
static DROPPED_TOTAL: AtomicU64 = AtomicU64::new(0);
pub fn increment_dropped_total() {
DROPPED_TOTAL.fetch_add(1, Ordering::Relaxed);
}
pub fn telemetry_dropped_total() -> u64 {
DROPPED_TOTAL.load(Ordering::Relaxed)
}
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
#[non_exhaustive]
pub enum BackpressureStrategy {
#[default]
Drop,
}
pub trait TelemetrySink: Send + Sync + 'static {
fn send_traces(&self, data: Vec<u8>);
fn send_logs(&self, data: Vec<u8>);
fn send_metrics(&self, data: Vec<u8>);
}
#[derive(Debug, Default, Clone, Copy)]
pub struct NullSink;
impl TelemetrySink for NullSink {
fn send_traces(&self, _data: Vec<u8>) {}
fn send_logs(&self, _data: Vec<u8>) {}
fn send_metrics(&self, _data: Vec<u8>) {}
}
#[derive(Debug, Clone)]
pub struct TelemetryConfig {
pub service_name: String,
pub service_version: String,
pub environment: String,
pub otlp_traces_endpoint: Option<String>,
pub otlp_logs_endpoint: Option<String>,
pub otlp_metrics_endpoint: Option<String>,
pub log_to_stderr: bool,
pub use_metrics_interval: Option<Duration>,
pub metrics_flush_interval: Option<Duration>,
pub sampling_rate: Option<f64>,
pub backpressure_strategy: BackpressureStrategy,
pub resource_attributes: Vec<(String, String)>,
}
impl Default for TelemetryConfig {
fn default() -> Self {
Self {
service_name: constants::defaults::SERVICE_NAME.to_string(),
service_version: constants::defaults::SERVICE_VERSION.to_string(),
environment: constants::defaults::ENVIRONMENT.to_string(),
otlp_traces_endpoint: None,
otlp_logs_endpoint: None,
otlp_metrics_endpoint: None,
log_to_stderr: true,
use_metrics_interval: None,
metrics_flush_interval: None,
sampling_rate: None,
backpressure_strategy: BackpressureStrategy::Drop,
resource_attributes: Vec::new(),
}
}
}
#[derive(Debug, Clone)]
pub struct LayerConfig {
pub log_to_stderr: bool,
pub export_traces: bool,
pub export_logs: bool,
pub service_name: String,
pub service_version: String,
pub environment: String,
pub resource_attributes: Vec<(String, String)>,
pub sampling_rate: f64,
pub scope_name: String,
pub scope_version: String,
}
impl Default for LayerConfig {
fn default() -> Self {
Self {
log_to_stderr: false,
export_traces: true,
export_logs: true,
service_name: constants::defaults::SERVICE_NAME.to_string(),
service_version: constants::defaults::SERVICE_VERSION.to_string(),
environment: constants::defaults::ENVIRONMENT.to_string(),
resource_attributes: Vec::new(),
sampling_rate: 1.0,
scope_name: constants::scope::DEFAULT_NAME.to_string(),
scope_version: constants::scope::DEFAULT_VERSION.to_string(),
}
}
}
pub fn build_layer(
config: &LayerConfig,
sink: Arc<dyn TelemetrySink>,
) -> impl tracing_subscriber::Layer<tracing_subscriber::Registry> {
let filtered_fmt = if config.log_to_stderr {
let env_filter =
EnvFilter::try_from_default_env().unwrap_or_else(|_| EnvFilter::new("info"));
let fmt_layer = fmt::layer()
.json()
.with_target(true)
.with_current_span(true)
.with_span_list(false)
.with_writer(std::io::stderr)
.with_filter(env_filter);
Some(fmt_layer)
} else {
None
};
let sampling_rate = config.sampling_rate.clamp(0.0, 1.0);
let otlp_layer = Some(OtlpLayer::new(otlp_layer::OtlpLayerConfig {
sink,
service_name: &config.service_name,
service_version: &config.service_version,
environment: &config.environment,
resource_attributes: &config.resource_attributes,
export_traces: config.export_traces,
export_logs: config.export_logs,
sampling_rate,
scope_name: &config.scope_name,
scope_version: &config.scope_version,
}));
Layer::and_then(filtered_fmt, otlp_layer)
}
#[derive(Debug, Clone)]
pub struct MetricsExportConfig {
pub service_name: String,
pub service_version: String,
pub environment: String,
pub resource_attributes: Vec<(String, String)>,
pub scope_name: String,
pub scope_version: String,
pub start_time: u64,
}
pub fn collect_and_encode_metrics(config: &MetricsExportConfig) -> Option<Vec<u8>> {
use otlp_trace::{AnyValue, KeyValue};
let snapshots = metrics::global_registry().collect();
if snapshots.is_empty() {
return None;
}
let mut resource_attrs = vec![
KeyValue {
key: "service.name".to_string(),
value: AnyValue::String(config.service_name.clone()),
},
KeyValue {
key: "service.version".to_string(),
value: AnyValue::String(config.service_version.clone()),
},
KeyValue {
key: "deployment.environment".to_string(),
value: AnyValue::String(config.environment.clone()),
},
];
for (k, v) in &config.resource_attributes {
resource_attrs.push(KeyValue {
key: k.clone(),
value: AnyValue::String(v.clone()),
});
}
let now = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap_or_default()
.as_nanos() as u64;
Some(otlp_metrics::encode_export_metrics_request(
&resource_attrs,
&config.scope_name,
&config.scope_version,
&snapshots,
config.start_time,
now,
))
}
pub fn collect_use_metrics(state: &mut UseMetricsState) {
use_metrics::poll_once(state);
}
#[cfg(feature = "_bench")]
#[doc(hidden)]
pub mod bench {
pub use crate::metrics::{
counter, gauge, global_registry, histogram, Attrs, Counter, CounterDataPoint, Exemplar,
ExemplarValue, Gauge, GaugeDataPoint, Histogram, HistogramDataPoint, MetricSnapshot,
MetricsRegistry,
};
pub fn should_sample(trace_id: [u8; 16], sampling_rate: f64) -> bool {
crate::otlp_layer::should_sample(trace_id, sampling_rate)
}
pub use crate::otlp_layer::{OtlpLayer, OtlpLayerConfig};
pub use crate::otlp_log::{encode_export_logs_request, LogData, SeverityNumber};
pub use crate::otlp_metrics::encode_export_metrics_request;
pub use crate::otlp_trace::{
encode_export_trace_request, encode_key_value, encode_resource, AnyValue, KeyValue,
SpanData, SpanKind, SpanStatus, StatusCode,
};
pub use crate::proto::{encode_message_field, encode_message_field_in_place};
pub use crate::trace_id::{generate_span_id, generate_trace_id, hex_encode};
pub use crate::BackpressureStrategy;
#[allow(clippy::result_unit_err)]
pub fn hex_to_bytes_16(s: &str) -> Result<[u8; 16], ()> {
crate::otlp_layer::hex_to_bytes_16(s)
}
pub fn encode_varint_field(buf: &mut Vec<u8>, field: u32, val: u64) {
crate::proto::encode_varint_field(buf, field, val);
}
pub fn encode_string_field(buf: &mut Vec<u8>, field: u32, s: &str) {
crate::proto::encode_string_field(buf, field, s);
}
pub fn encode_bytes_field(buf: &mut Vec<u8>, field: u32, data: &[u8]) {
crate::proto::encode_bytes_field(buf, field, data);
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn config_with_none_endpoint_does_not_panic() {
let _config = TelemetryConfig {
service_name: "test-service".into(),
service_version: "0.0.1".into(),
log_to_stderr: false,
..Default::default()
};
}
#[test]
fn telemetry_dropped_total_is_callable() {
let _count = telemetry_dropped_total();
}
#[test]
fn null_sink_accepts_data() {
let sink: Box<dyn TelemetrySink> = Box::new(NullSink);
sink.send_traces(vec![1, 2, 3]);
sink.send_logs(vec![4, 5, 6]);
sink.send_metrics(vec![7, 8, 9]);
}
}