use {
super::prefixer::PrefixingExporter,
opentelemetry::{
KeyValue,
global,
},
opentelemetry_otlp::{
Protocol,
WithExportConfig,
WithHttpConfig,
WithTonicConfig,
},
opentelemetry_sdk::{
Resource,
propagation::TraceContextPropagator,
},
opentelemetry_stdout::SpanExporter as StdoutExporter,
};
pub struct Telemetry {
pub(crate) tracer_provider: opentelemetry_sdk::trace::SdkTracerProvider,
}
impl Drop for Telemetry {
fn drop(&mut self) {
otel::event!("Starting telemetry flush and shutdown");
if let Err(e) = self.tracer_provider.force_flush() {
otel::error!(
"tracer_provider_flush_failed",
format!("Failed to flush TracerProvider: {:?}", e)
);
} else {
otel::event!("TracerProvider flushed successfully");
}
if let Err(e) = self.tracer_provider.shutdown() {
otel::error!(
"tracer_provider_shutdown_failed",
format!("Failed to shutdown TracerProvider: {:?}", e)
);
} else {
otel::event!("TracerProvider shutdown successfully");
}
otel::event!("Telemetry shutdown complete");
}
}
fn create_grpc_exporter(
endpoint: &str,
headers: std::collections::HashMap<String, String>,
) -> anyhow::Result<opentelemetry_otlp::SpanExporter> {
let endpoint = endpoint.to_string();
let (tx, rx) = std::sync::mpsc::sync_channel::<Result<opentelemetry_otlp::SpanExporter, String>>(1);
std::thread::Builder::new()
.name("otel-grpc-runtime".to_string())
.spawn(move || {
let rt = match tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()
{
| Ok(rt) => rt,
| Err(e) => {
let _ = tx.send(Err(format!("Failed to create Tokio runtime for gRPC: {e}")));
return;
}
};
let exporter_result = rt.block_on(async {
let mut builder = opentelemetry_otlp::SpanExporter::builder()
.with_tonic()
.with_endpoint(&endpoint)
.with_timeout(std::time::Duration::from_secs(4));
if !headers.is_empty() {
let mut metadata = tonic::metadata::MetadataMap::new();
for (k, v) in headers.into_iter() {
if let (Ok(key), Ok(value)) = (
k.parse::<tonic::metadata::MetadataKey<_>>(),
v.parse::<tonic::metadata::MetadataValue<_>>(),
) {
metadata.insert(key, value);
}
}
builder = builder.with_metadata(metadata);
}
builder
.build()
.map_err(|e| format!("Failed to create gRPC OTLP exporter: {e}"))
});
let is_ok = exporter_result.is_ok();
let _ = tx.send(exporter_result);
if is_ok {
rt.block_on(std::future::pending::<()>());
}
})
.map_err(|e| anyhow::anyhow!("Failed to spawn gRPC runtime thread: {e}"))?;
rx.recv()
.map_err(|e| anyhow::anyhow!("Failed to receive gRPC exporter: {e}"))?
.map_err(|e| anyhow::anyhow!("{e}"))
}
pub fn setup_telemetry() -> anyhow::Result<Telemetry> {
let resource = Resource::builder()
.with_service_name(env!("CARGO_PKG_NAME"))
.with_attributes([
KeyValue::new("service.namespace", "laburnum"),
KeyValue::new("process.runtime.name", "rustc"),
KeyValue::new(
"process.runtime.version",
env!("CARGO_PKG_RUST_VERSION"),
),
KeyValue::new("service.version", env!("CARGO_PKG_VERSION")),
])
.with_schema_url(
std::iter::empty::<KeyValue>(),
format!(
"https://opentelemetry.io/schemas/{}",
otel::OTEL_SPEC_VERSION
),
)
.build();
let protocol = std::env::var("OTEL_EXPORTER_OTLP_PROTOCOL")
.unwrap_or_else(|_| "http/json".to_string());
let local_endpoint_default = if protocol == "grpc" {
"http://localhost:4317"
} else {
"http://localhost:4318"
};
let local_endpoint = std::env::var("OTEL_EXPORTER_OTLP_ENDPOINT")
.unwrap_or_else(|_| local_endpoint_default.to_string());
let mut builder = opentelemetry_sdk::trace::SdkTracerProvider::builder()
.with_sampler(opentelemetry_sdk::trace::Sampler::AlwaysOn);
let local_exporter: PrefixingExporter<opentelemetry_otlp::SpanExporter> =
match protocol.as_str() {
| "grpc" => {
PrefixingExporter::new(
"laburnum",
create_grpc_exporter(
&local_endpoint,
std::collections::HashMap::new(),
)?,
)
},
| "http/protobuf" => {
PrefixingExporter::new(
"laburnum",
opentelemetry_otlp::SpanExporter::builder()
.with_http()
.with_http_client(reqwest::blocking::Client::new())
.with_protocol(Protocol::HttpBinary)
.with_endpoint(format!("{}/v1/traces", local_endpoint))
.with_timeout(std::time::Duration::from_secs(4))
.build()
.map_err(|e| anyhow::anyhow!("Failed to create HTTP/Protobuf OTLP exporter: {e}"))?,
)
},
| _ => {
PrefixingExporter::new(
"laburnum",
opentelemetry_otlp::SpanExporter::builder()
.with_http()
.with_http_client(reqwest::blocking::Client::new())
.with_protocol(Protocol::HttpJson)
.with_endpoint(format!("{}/v1/traces", local_endpoint))
.with_timeout(std::time::Duration::from_secs(4))
.build()
.map_err(|e| anyhow::anyhow!("Failed to create HTTP/JSON OTLP exporter: {e}"))?,
)
},
};
builder = builder.with_span_processor(
opentelemetry_sdk::trace::BatchSpanProcessor::builder(local_exporter)
.with_batch_config(
opentelemetry_sdk::trace::BatchConfigBuilder::default()
.with_max_queue_size(2048)
.with_max_export_batch_size(512)
.build(),
)
.build(),
);
if std::env::var("RUST_LOG").is_ok() {
let stdout_exporter = StdoutExporter::default();
builder = builder.with_span_processor(
opentelemetry_sdk::trace::SimpleSpanProcessor::new(stdout_exporter),
);
}
let tracer_provider = builder.with_resource(resource).build();
global::set_tracer_provider(tracer_provider.clone());
global::set_text_map_propagator(TraceContextPropagator::new());
Ok(Telemetry { tracer_provider })
}