use std::env;
use actix_web::{App, Error, get, Handler, HttpRequest, HttpServer, Responder, web};
use actix_web::dev::{ServiceFactory, ServiceRequest, ServiceResponse};
use actix_web_opentelemetry::{PrometheusMetricsHandler, RequestMetrics, RequestTracing};
pub use opentelemetry;
pub use opentelemetry::global;
use opentelemetry_sdk::metrics::MeterProvider;
use opentelemetry_sdk::metrics::reader::DefaultAggregationSelector;
pub use tracing;
use tracing::{info, subscriber};
use tracing_subscriber::{EnvFilter, Layer, Registry};
use tracing_subscriber::fmt::{self, time::OffsetTime};
use tracing_subscriber::layer::SubscriberExt;
/// Optional tuning knobs accepted by [`init_logger`].
///
/// `Default` yields no explicit sample rate and host-name detection disabled.
#[derive(Debug, Clone, PartialEq, Default)]
pub struct LoggerConfig {
    /// Trace sampling ratio. When `Some`, the `honeycomb-log` tracer setup installs a
    /// trace-id-ratio sampler with this value; when `None`, every trace is sampled.
    pub sample_rate: Option<f64>,
    /// Whether the host name should be attached to exported telemetry.
    ///
    /// NOTE(review): nothing in this file reads this flag; host detection is currently
    /// controlled only by the `detect-host` cargo feature. Confirm the intended behavior.
    pub detect_hostname: bool,
}
/// Pairs the Prometheus scrape handler with the OpenTelemetry meter provider.
///
/// Both are built in `Prometheus::new` from the same `prometheus::Registry`.
#[derive(Debug, Clone)]
pub struct Prometheus {
/// Handler created from the registry; `metrics_handler()` hands out clones of it.
metrics_handler: PrometheusMetricsHandler,
/// Meter provider whose reader is the Prometheus exporter built on that registry.
provider: MeterProvider,
}
impl Prometheus {
    /// Builds a fresh Prometheus registry, an OpenTelemetry exporter writing into it,
    /// a meter provider that uses the exporter as its reader, and the handler that
    /// serves the registry's contents.
    ///
    /// # Panics
    ///
    /// Panics if the Prometheus exporter cannot be built.
    pub fn new() -> Self {
        let registry = prometheus::Registry::new();
        // The exporter gets a clone of the registry so the handler below can read
        // the very same metrics the exporter writes.
        let exporter = opentelemetry_prometheus::exporter()
            .with_registry(registry.clone())
            .with_aggregation_selector(DefaultAggregationSelector::new())
            .build()
            .expect("failed to build the OpenTelemetry Prometheus exporter");
        info!("exporter is {:?}", &exporter);
        let provider = MeterProvider::builder().with_reader(exporter).build();
        Self {
            metrics_handler: PrometheusMetricsHandler::new(registry),
            provider,
        }
    }

    /// Returns a clone of the handler that serves the collected metrics.
    pub fn metrics_handler(&self) -> PrometheusMetricsHandler {
        self.metrics_handler.clone()
    }
}

impl Default for Prometheus {
    /// Equivalent to [`Prometheus::new`], including its panic behavior.
    fn default() -> Self {
        Self::new()
    }
}
/// Actix service for `GET /metrics`.
///
/// Logs `metrics called`, then forwards the request to the `PrometheusMetricsHandler`
/// held by the shared `Prometheus` application data and returns its response.
///
/// NOTE(review): `add_metrics2actix` routes `/metrics` straight to a handler and does
/// not register this service; whether it is mounted elsewhere is not visible here.
#[get("/metrics")]
async fn metrics(prometheus_data: web::Data<Prometheus>, request: HttpRequest) -> impl Responder {
    info!("metrics called");
    let handler = prometheus_data.metrics_handler();
    handler.call(request).await
}
pub fn add_metrics2actix<T>(app: App<T>, metrics_handler: PrometheusMetricsHandler) -> App<impl ServiceFactory<ServiceRequest, Config=(), Response=ServiceResponse, Error=Error, InitError=()>>
where
T: ServiceFactory<
ServiceRequest,
Config=(),
Response=ServiceResponse,
Error=Error,
InitError=(),
> + 'static {
app
.wrap(RequestTracing::new())
.wrap(RequestMetrics::default())
.route("/metrics", web::get().to(metrics_handler))
}
/// Starts a detached HTTP server that exposes Prometheus metrics.
///
/// The listen address is `PROMETHEUS_ADDRESS:PROMETHEUS_PORT`, read from the
/// environment and defaulting to `127.0.0.1:8080`. The metric exporter is initialised
/// through `init_metric_exporter`, and the resulting handler is served by an app built
/// with `add_metrics2actix`. The server future is handed to `tokio::spawn` and its join
/// handle is dropped, so the server keeps running in the background.
///
/// # Panics
///
/// * If the address cannot be bound.
/// * If called outside a Tokio runtime (`tokio::spawn` requires one).
///
/// If the server later terminates with an error, the spawned task panics; that panic
/// stays inside the task because nobody awaits its handle.
pub fn start_prometheus_metric_server() {
    let port = env::var("PROMETHEUS_PORT").unwrap_or_else(|_| "8080".to_string());
    let host = env::var("PROMETHEUS_ADDRESS").unwrap_or_else(|_| "127.0.0.1".to_string());
    let address = format!("{}:{}", host, port);
    let metrics_handler: PrometheusMetricsHandler = init_metric_exporter();

    let server = HttpServer::new(move || add_metrics2actix(App::new(), metrics_handler.clone()))
        .bind(&address)
        // Name the offending address: a bare unwrap hides which endpoint failed.
        .unwrap_or_else(|e| panic!("failed to bind prometheus server to {}: {}", address, e))
        .run();

    tokio::spawn(async move {
        if let Err(e) = server.await {
            panic!("failed to start prometheus server e={}", e);
        }
    });
}
pub fn init_metric_exporter() -> PrometheusMetricsHandler {
let prometheus = Prometheus::new();
info!("prometheus exporter inited");
global::set_meter_provider(prometheus.provider);
prometheus.metrics_handler.clone()
}
/// Installs the process-wide `tracing` subscriber.
///
/// The subscriber is a compact formatter layer with thread names and RFC 3339
/// timestamps at a fixed UTC+08:00 offset. It is filtered by the default environment
/// filter plus an `<application_name>=info` directive. When the crate is built with the
/// `honeycomb-log` feature, the subscriber is additionally extended by `update_tracer`,
/// which receives `log_config`; without that feature `log_config` is ignored.
///
/// The effective filter and the application name are printed to stdout.
///
/// # Panics
///
/// * If `<application_name>=info` is not a valid filter directive.
/// * If a global default subscriber has already been set.
/// * With `honeycomb-log` enabled, whenever `update_tracer` panics (for example when
///   the `OTLP_KEY` environment variable is missing).
pub fn init_logger(application_name: &'static str, log_config: Option<LoggerConfig>) {
    let filter = EnvFilter::from_default_env().add_directive(
        format!("{}=info", application_name)
            .parse()
            .expect("`<application_name>=info` must be a valid tracing filter directive"),
    );
    println!("filter={}", &filter);
    println!("application name={}", application_name);

    // Timestamps use a fixed +08:00 offset rather than the local time zone.
    let utc_offset = time::UtcOffset::from_hms(8, 0, 0).expect("+08:00 is a valid UTC offset");
    let timer = OffsetTime::new(utc_offset, time::format_description::well_known::Rfc3339);
    let filtered = fmt::layer()
        .with_timer(timer)
        .compact()
        .with_thread_names(true)
        .with_filter(filter);
    let subscriber = Registry::default().with(filtered);

    #[cfg(feature = "honeycomb-log")]
    let subscriber = update_tracer(application_name, subscriber, &log_config);
    // Without the feature the config has no consumer; mark it as used on purpose.
    #[cfg(not(feature = "honeycomb-log"))]
    let _ = log_config;

    subscriber::set_global_default(subscriber).expect("setting global default failed");
}
/// Builds the resource attributes that describe the current host.
///
/// Returns a single `host.name` attribute when the host name can be queried. A name
/// that is not valid Unicode is reported as `unknown`. Returns an empty vector when
/// the lookup itself fails.
#[cfg(feature = "detect-host")]
fn gen_host_info() -> Vec<opentelemetry::KeyValue> {
    hostname::get()
        .map(|raw_name| {
            let host = raw_name
                .into_string()
                .unwrap_or_else(|_| String::from("unknown"));
            vec![opentelemetry::KeyValue::new("host.name".to_string(), host)]
        })
        .unwrap_or_default()
}
/// Extends `subscriber` with an OpenTelemetry layer that exports spans over OTLP, plus
/// a target filter (`h2` at WARN, everything else at INFO).
///
/// * `application_name` becomes the `service.name` resource attribute.
/// * `log_config` selects the sampler: a configured `sample_rate` yields a
///   trace-id-ratio sampler, anything else samples every trace.
///
/// With the `detect-host` feature the attributes from `gen_host_info` are added to the
/// resource. NOTE(review): `LoggerConfig::detect_hostname` is not consulted here.
///
/// # Panics
///
/// Panics when the `OTLP_KEY` environment variable cannot be read, or when the batch
/// pipeline cannot be installed.
#[cfg(feature = "honeycomb-log")]
fn update_tracer<T: tracing::Subscriber + for<'span> tracing_subscriber::registry::LookupSpan<'span>>(
    application_name: &str,
    subscriber: T,
    log_config: &Option<LoggerConfig>,
) -> tracing_subscriber::layer::Layered<
    tracing_subscriber::filter::Targets,
    tracing_subscriber::layer::Layered<tracing_opentelemetry::OpenTelemetryLayer<T, opentelemetry::sdk::trace::Tracer>, T, T>,
    tracing_subscriber::layer::Layered<tracing_opentelemetry::OpenTelemetryLayer<T, opentelemetry::sdk::trace::Tracer>, T, T>
> {
    println!("start to init honeycomb-log");
    let api_key = env::var("OTLP_KEY").unwrap_or_else(|_| panic!("api key not found"));

    // Pick the sampler up front instead of mutating the config in two branches.
    let sampler = match log_config {
        Some(LoggerConfig { sample_rate: Some(rate), .. }) => {
            opentelemetry::sdk::trace::Sampler::TraceIdRatioBased(*rate)
        }
        _ => opentelemetry::sdk::trace::Sampler::AlwaysOn,
    };
    let trace_config = opentelemetry::sdk::trace::config().with_sampler(sampler);

    let mut attributes = vec![
        opentelemetry::KeyValue::new("service.name".to_string(), application_name.to_string()),
    ];
    #[cfg(feature = "detect-host")]
    attributes.extend(gen_host_info());
    let trace_config = trace_config.with_resource(opentelemetry::sdk::Resource::new(attributes));

    let otel_tracer = opentelemetry_otlp::new_pipeline()
        .tracing()
        .with_exporter(build_exporter(&api_key))
        .with_trace_config(trace_config)
        .install_batch(opentelemetry_sdk::runtime::Tokio)
        .unwrap();
    let otel_layer = tracing_opentelemetry::layer().with_tracer(otel_tracer);

    let telemetry_filter = tracing_subscriber::filter::Targets::new()
        .with_target("h2", tracing::Level::WARN)
        .with_default(tracing::Level::INFO);

    subscriber.with(otel_layer).with(telemetry_filter)
}
/// Builds the tonic-based OTLP exporter configuration for Honeycomb.
///
/// The exporter targets `https://api.honeycomb.io:443` with a 3 second timeout and
/// sends `api_key` as the `x-honeycomb-team` header and `rust` as the
/// `x-honeycomb-dataset` header.
///
/// # Panics
///
/// Panics if `api_key` is not a valid ASCII metadata value. The panic message
/// deliberately does not echo the key.
#[cfg(feature = "honeycomb-log")]
fn build_exporter(api_key: &str) -> opentelemetry_otlp::TonicExporterBuilder {
    use opentelemetry_otlp::WithExportConfig;

    let mut map = tonic::metadata::MetadataMap::with_capacity(8);
    map.insert(
        "x-honeycomb-team",
        api_key
            .parse()
            .expect("Honeycomb API key is not a valid gRPC metadata value"),
    );
    map.insert(
        "x-honeycomb-dataset",
        "rust"
            .parse()
            .expect("static dataset name is always valid metadata"),
    );

    opentelemetry_otlp::new_exporter()
        .tonic()
        .with_endpoint("https://api.honeycomb.io:443")
        .with_metadata(map)
        .with_timeout(std::time::Duration::from_secs(3))
}
// Smoke tests for the logging setup: they only check that initialisation and
// instrumented calls run without panicking; no output is asserted.
#[cfg(test)]
mod test {
use std::time::Duration;
use opentelemetry::{global, KeyValue};
use tracing::info;
use tracing::instrument;
use crate::init_logger;
// Instrumented helper: emits one `test` event, then calls `bar` so that a nested
// instrumented call is produced.
#[instrument]
fn foo() {
info!("test");
bar();
}
// Instrumented helper called from `foo`; emits one `test2` event.
#[instrument]
fn bar() {
info!("test2");
}
// Initialises the global logger with no extra config and runs `foo` 100 times.
#[tokio::test]
async fn test_logger() {
init_logger("test_logger", None);
for _ in 0..100 {
foo();
}
// Keep the runtime alive for 4 s before finishing — presumably so a batch span
// exporter (honeycomb-log feature) has time to flush; TODO confirm.
tokio::time::sleep(Duration::from_secs(4)).await;
}
// Adds 10.0 to the `testCounterF64` counter of meter `aaa`, tagged `rate=standard`.
// NOTE(review): no test in this module calls this helper.
#[instrument]
fn add_counter() {
let counter = global::meter("aaa").f64_counter("testCounterF64").init();
counter.add(10f64, &[KeyValue::new("rate", "standard")]);
}
}