mod exporter;
pub use exporter::ExportMessage;
pub use exporter::Exporter as TokioExporter;
pub use exporter::ExporterConfig;
pub use exporter::StartError;
pub struct TelemetryGuard {
exporter: Option<exporter::Exporter>,
task_handles: Vec<tokio::task::JoinHandle<()>>,
metrics_flush: Option<MetricsFlushState>,
}
struct MetricsFlushState {
config: MetricsExportConfig,
sink: Arc<dyn TelemetrySink>,
}
impl From<exporter::Exporter> for TelemetryGuard {
fn from(exporter: exporter::Exporter) -> Self {
Self {
exporter: Some(exporter),
task_handles: Vec::new(),
metrics_flush: None,
}
}
}
impl TelemetryGuard {
pub async fn shutdown(mut self) {
self.abort_tasks();
if let Some(mf) = self.metrics_flush.take() {
if let Some(ref exporter) = self.exporter {
exporter.flush().await;
if let Some(data) = rolly::collect_and_encode_metrics(&mf.config) {
mf.sink.send_metrics(data);
}
}
}
if let Some(exporter) = self.exporter.take() {
exporter.flush().await;
exporter.shutdown().await;
}
}
fn abort_tasks(&mut self) {
for handle in self.task_handles.drain(..) {
handle.abort();
}
}
fn final_metrics_flush(&self) {
if let Some(ref mf) = self.metrics_flush {
if let Some(data) = rolly::collect_and_encode_metrics(&mf.config) {
mf.sink.send_metrics(data);
}
}
}
}
impl Drop for TelemetryGuard {
fn drop(&mut self) {
self.abort_tasks();
self.final_metrics_flush();
if let Some(ref exporter) = self.exporter {
let flush_shutdown = async {
exporter.flush().await;
exporter.shutdown().await;
};
if let Ok(handle) = tokio::runtime::Handle::try_current() {
if handle.runtime_flavor() == tokio::runtime::RuntimeFlavor::MultiThread {
tokio::task::block_in_place(|| {
handle.block_on(flush_shutdown);
});
} else {
let exporter = exporter.clone();
handle.spawn(async move {
exporter.flush().await;
exporter.shutdown().await;
});
}
} else {
let rt = tokio::runtime::Builder::new_current_thread()
.enable_all()
.build();
if let Ok(rt) = rt {
rt.block_on(flush_shutdown);
}
}
}
}
}
pub use rolly::*;
#[cfg(feature = "_bench")]
#[doc(hidden)]
pub mod bench {
pub use crate::exporter::{ExportMessage, Exporter, ExporterConfig};
pub use rolly::bench::*;
}
use std::sync::Arc;
use std::time::Duration;
#[derive(Debug)]
#[non_exhaustive]
pub enum InitError {
SubscriberAlreadySet(tracing_subscriber::util::TryInitError),
Exporter(StartError),
}
impl std::fmt::Display for InitError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Self::SubscriberAlreadySet(e) => write!(f, "global subscriber already set: {}", e),
Self::Exporter(e) => write!(f, "failed to start exporter: {}", e),
}
}
}
impl std::error::Error for InitError {
fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
match self {
Self::SubscriberAlreadySet(e) => Some(e),
Self::Exporter(e) => Some(e),
}
}
}
impl From<tracing_subscriber::util::TryInitError> for InitError {
fn from(e: tracing_subscriber::util::TryInitError) -> Self {
Self::SubscriberAlreadySet(e)
}
}
impl From<StartError> for InitError {
fn from(e: StartError) -> Self {
Self::Exporter(e)
}
}
pub fn init_global_once(config: TelemetryConfig) -> TelemetryGuard {
match try_init_global(config) {
Ok(guard) => guard,
Err(InitError::SubscriberAlreadySet(_)) => {
tracing::warn!(
"rolly: global tracing subscriber already set, \
skipping telemetry initialization"
);
TelemetryGuard {
exporter: None,
task_handles: Vec::new(),
metrics_flush: None,
}
}
Err(e) => panic!("failed to initialize telemetry: {}", e),
}
}
pub fn try_init_global(config: TelemetryConfig) -> Result<TelemetryGuard, InitError> {
use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt};
let export_traces = config.otlp_traces_endpoint.is_some();
let export_logs = config.otlp_logs_endpoint.is_some();
let export_metrics = config.otlp_metrics_endpoint.is_some();
if let Some(interval) = config.use_metrics_interval {
if interval.is_zero() {
return Err(InitError::Exporter(StartError::InvalidConfig(
"use_metrics_interval must be > 0",
)));
}
}
if let Some(interval) = config.metrics_flush_interval {
if interval.is_zero() {
return Err(InitError::Exporter(StartError::InvalidConfig(
"metrics_flush_interval must be > 0",
)));
}
}
let metrics_url = config
.otlp_metrics_endpoint
.as_deref()
.map(|ep| format!("{}/v1/metrics", ep));
let exporter = if export_traces || export_logs || export_metrics {
let traces_url = config
.otlp_traces_endpoint
.as_deref()
.map(|ep| format!("{}/v1/traces", ep));
let logs_url = config
.otlp_logs_endpoint
.as_deref()
.map(|ep| format!("{}/v1/logs", ep));
Some(exporter::Exporter::start(ExporterConfig {
traces_url,
logs_url,
metrics_url,
backpressure_strategy: config.backpressure_strategy,
..ExporterConfig::default()
})?)
} else {
None
};
let sink: Arc<dyn TelemetrySink> = match &exporter {
Some(exp) => Arc::new(exp.clone()),
None => Arc::new(NullSink),
};
let layer_config = LayerConfig {
log_to_stderr: config.log_to_stderr,
export_traces,
export_logs,
service_name: config.service_name.clone(),
service_version: config.service_version.clone(),
environment: config.environment.clone(),
resource_attributes: config.resource_attributes.clone(),
sampling_rate: config.sampling_rate.unwrap_or(1.0),
..LayerConfig::default()
};
let layer = rolly::build_layer(&layer_config, sink.clone());
tracing_subscriber::registry().with(layer).try_init()?;
tracing::info!(
service.name = layer_config.service_name.as_str(),
service.version = layer_config.service_version.as_str(),
environment = layer_config.environment.as_str(),
"telemetry initialized"
);
let mut task_handles = Vec::new();
#[cfg(target_os = "linux")]
if let Some(interval) = config.use_metrics_interval {
let handle = tokio::spawn(async move {
let mut state = UseMetricsState::default();
let mut ticker = tokio::time::interval(interval);
ticker.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
ticker.tick().await;
loop {
ticker.tick().await;
rolly::collect_use_metrics(&mut state);
}
});
task_handles.push(handle);
}
let metrics_flush = if export_metrics {
let flush_interval = config
.metrics_flush_interval
.unwrap_or(Duration::from_secs(10));
let metrics_config = MetricsExportConfig {
service_name: config.service_name,
service_version: config.service_version,
environment: config.environment,
resource_attributes: config.resource_attributes,
scope_name: "rolly".to_string(),
scope_version: env!("CARGO_PKG_VERSION").to_string(),
start_time: std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap_or_default()
.as_nanos() as u64,
};
let guard_state = MetricsFlushState {
config: metrics_config.clone(),
sink: sink.clone(),
};
let handle = spawn_metrics_loop(metrics_config, sink, flush_interval);
task_handles.push(handle);
Some(guard_state)
} else {
None
};
Ok(TelemetryGuard {
exporter,
task_handles,
metrics_flush,
})
}
pub fn spawn_metrics_loop(
config: MetricsExportConfig,
sink: Arc<dyn TelemetrySink>,
interval: Duration,
) -> tokio::task::JoinHandle<()> {
tokio::spawn(async move {
let mut ticker = tokio::time::interval(interval);
ticker.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
ticker.tick().await;
loop {
ticker.tick().await;
if let Some(data) = rolly::collect_and_encode_metrics(&config) {
sink.send_metrics(data);
}
}
})
}