#![warn(rustdoc::broken_intra_doc_links, rust_2018_idioms, clippy::all, missing_docs)]
use std::{
backtrace::Backtrace, collections::HashMap, error::Error, panic::{self, AssertUnwindSafe}, sync::Arc, task::{Context, Poll}
};
use axum::{body::Body, extract::MatchedPath, response::Response, RequestPartsExt};
use futures::{future::BoxFuture, FutureExt};
use http::StatusCode;
use http_body_util::BodyExt;
use hyper::Request;
use opentelemetry::KeyValue;
use opentelemetry_sdk::{runtime::{RuntimeChannel, Tokio}, trace::Config};
use opentelemetry_application_insights::HttpClient;
use serde::{de::DeserializeOwned, Serialize};
use tower::{Layer, Service};
use tracing::{Instrument, Span, Level};
use tracing_subscriber::{filter::LevelFilter, prelude::__tracing_subscriber_SubscriberExt, Registry};
/// Re-exports of the third-party crates used by this file, so callers can
/// reach them through this crate's path.
pub mod exports {
pub use opentelemetry;
pub use opentelemetry_application_insights;
// `reqwest` is only re-exported when the `reqwest-client` feature is enabled.
#[cfg(feature = "reqwest-client")]
pub use reqwest;
pub use serde;
pub use tracing;
pub use tracing_opentelemetry;
pub use tracing_subscriber;
}
/// Describes an error payload whose details can be reported as an exception.
///
/// The request middleware in this file deserializes the body of a
/// non-successful response into a type implementing this trait and copies the
/// returned values into the `exception.message` and `exception.stacktrace`
/// fields of the emitted `exception` event.
pub trait AppInsightsError {
/// Returns the error message to report, or `None` when there is none.
fn message(&self) -> Option<String>;
/// Returns the backtrace to report as text, or `None` when there is none.
fn backtrace(&self) -> Option<String>;
}
/// Implementation for the unit type, which carries no error details.
///
/// `()` is the default error type parameter of `AppInsights`.
impl AppInsightsError for () {
/// Always `None`: the unit type has no message.
fn message(&self) -> Option<String> {
None
}
/// Always `None`: the unit type has no backtrace.
fn backtrace(&self) -> Option<String> {
None
}
}
/// Builder state marker: the initial state, in which only
/// `with_connection_string` is available.
pub struct Base;
/// Builder state marker: a connection string has been supplied; the trace
/// configuration (`with_service_config` / `with_trace_config`) comes next.
pub struct WithConnectionString;
/// Builder state marker: the builder is fully configured; the optional setters
/// and `build_and_set_global_default` are available.
pub struct Ready;
// Optional shared callback that turns a panic description into an HTTP status
// code (as `u16`) and a value of type `E`.
type OptionalPanicMapper<E> = Option<Arc<dyn Fn(String) -> (u16, E) + Send + Sync + 'static>>;
// Optional shared callback that derives extra string key/value pairs from the
// request head (`http::request::Parts`).
type OptionalFieldMapper = Option<Arc<dyn Fn(&http::request::Parts) -> HashMap<String, String> + Send + Sync + 'static>>;
// Optional shared predicate that decides whether a response status code is
// treated as a success.
type OptionalSuccessFilter = Option<Arc<dyn Fn(StatusCode) -> bool + Send + Sync + 'static>>;
/// Result of `AppInsights::build_and_set_global_default`.
///
/// Holds the settings the request middleware needs; call `layer` to turn it
/// into an `AppInsightsLayer`. `P` is the value type produced by the panic
/// mapper and `E` is the error type that failed response bodies are
/// deserialized into.
pub struct AppInsightsComplete<P, E> {
// When true, the resulting middleware forwards requests without instrumenting them.
is_noop: bool,
// Optional callback producing extra span fields from the request head.
field_mapper: OptionalFieldMapper,
// Optional callback mapping a caught panic to a status code and body value.
panic_mapper: OptionalPanicMapper<P>,
// Optional predicate overriding which status codes count as success.
success_filter: OptionalSuccessFilter,
// Carries the error type `E` without storing a value of it.
_phantom: std::marker::PhantomData<E>,
}
/// Type-state builder for the Application Insights tracing setup
/// (variant compiled when the `reqwest-client` feature is enabled).
///
/// Type parameters: `S` is the builder state marker, `C` the HTTP client,
/// `R` the batch runtime, `U` the base subscriber, `P` the value type returned
/// by the panic mapper and `E` the error type used by the middleware.
#[cfg(feature = "reqwest-client")]
pub struct AppInsights<S = Base, C = reqwest::Client, R = Tokio, U = Registry, P = (), E = ()> {
// Connection string for the exporter pipeline; `None` means no pipeline is created on build.
connection_string: Option<String>,
// Trace configuration handed to the exporter pipeline.
config: Config,
// HTTP client handed to the exporter pipeline.
client: C,
// Forwarded to the pipeline's `with_live_metrics`.
enable_live_metrics: bool,
// Forwarded to the pipeline's `with_sample_rate`.
sample_rate: f64,
// Runtime passed to the pipeline's `install_batch`.
batch_runtime: R,
// Level filter layered onto the subscriber on build.
minimum_level: LevelFilter,
// Base subscriber; when `None`, `tracing_subscriber::registry()` is used on build.
subscriber: Option<U>,
// Whether build installs a panic hook that emits an `exception` event.
should_catch_panic: bool,
// When true, build skips all setup and the middleware forwards requests untouched.
is_noop: bool,
// Optional callback producing extra span fields from the request head.
field_mapper: OptionalFieldMapper,
// Optional callback mapping a caught panic to a status code and body value.
panic_mapper: OptionalPanicMapper<P>,
// Optional predicate overriding which status codes count as success.
success_filter: OptionalSuccessFilter,
// Carries the builder state `S`.
_phantom1: std::marker::PhantomData<S>,
// Carries the error type `E`.
_phantom2: std::marker::PhantomData<E>,
}
#[cfg(feature = "reqwest-client")]
impl Default for AppInsights<Base> {
    /// Creates a builder in the `Base` state: no connection string, a default
    /// trace `Config`, a fresh `reqwest::Client`, the `Tokio` batch runtime,
    /// sample rate `1.0`, minimum level `INFO`, and every optional feature
    /// (live metrics, panic hook, no-op mode, mappers, filter) switched off.
    fn default() -> Self {
        AppInsights {
            // Exporter pipeline settings.
            connection_string: None,
            config: Config::default(),
            client: reqwest::Client::new(),
            batch_runtime: Tokio,
            sample_rate: 1.0,
            enable_live_metrics: false,
            // Subscriber settings.
            subscriber: None,
            minimum_level: LevelFilter::INFO,
            // Behaviour switches.
            is_noop: false,
            should_catch_panic: false,
            // Optional middleware callbacks.
            success_filter: None,
            panic_mapper: None,
            field_mapper: None,
            // Type-level markers.
            _phantom2: std::marker::PhantomData,
            _phantom1: std::marker::PhantomData,
        }
    }
}
/// Type-state builder for the Application Insights tracing setup
/// (variant compiled when the `reqwest-client` feature is disabled; the
/// default HTTP client is then `NoopClient`).
///
/// Type parameters: `S` is the builder state marker, `C` the HTTP client,
/// `R` the batch runtime, `U` the base subscriber, `P` the value type returned
/// by the panic mapper and `E` the error type used by the middleware.
#[cfg(not(feature = "reqwest-client"))]
pub struct AppInsights<S = Base, C = NoopClient, R = Tokio, U = Registry, P = (), E = ()> {
// Connection string for the exporter pipeline; `None` means no pipeline is created on build.
connection_string: Option<String>,
// Trace configuration handed to the exporter pipeline.
config: Config,
// HTTP client handed to the exporter pipeline.
client: C,
// Forwarded to the pipeline's `with_live_metrics`.
enable_live_metrics: bool,
// Forwarded to the pipeline's `with_sample_rate`.
sample_rate: f64,
// Runtime passed to the pipeline's `install_batch`.
batch_runtime: R,
// Level filter layered onto the subscriber on build.
minimum_level: LevelFilter,
// Base subscriber; when `None`, `tracing_subscriber::registry()` is used on build.
subscriber: Option<U>,
// Whether build installs a panic hook that emits an `exception` event.
should_catch_panic: bool,
// When true, build skips all setup and the middleware forwards requests untouched.
is_noop: bool,
// Optional callback producing extra span fields from the request head.
field_mapper: OptionalFieldMapper,
// Optional callback mapping a caught panic to a status code and body value.
panic_mapper: OptionalPanicMapper<P>,
// Optional predicate overriding which status codes count as success.
success_filter: OptionalSuccessFilter,
// Carries the builder state `S`.
_phantom1: std::marker::PhantomData<S>,
// Carries the error type `E`.
_phantom2: std::marker::PhantomData<E>,
}
#[cfg(not(feature = "reqwest-client"))]
impl Default for AppInsights<Base> {
fn default() -> Self {
Self {
connection_string: None,
config: Config::default(),
client: NoopClient,
enable_live_metrics: false,
sample_rate: 1.0,
batch_runtime: Tokio,
minimum_level: LevelFilter::INFO,
subscriber: None,
should_catch_panic: false,
is_noop: false,
field_mapper: None,
panic_mapper: None,
success_filter: None,
_phantom1: std::marker::PhantomData,
_phantom2: std::marker::PhantomData,
}
}
}
impl<C, R, U, P, E> AppInsights<Base, C, R, U, P, E> {
    /// Stores the connection string and moves the builder to the
    /// `WithConnectionString` state.
    ///
    /// Anything convertible into `Option<String>` is accepted. With `None`,
    /// building later installs the subscriber without an exporter pipeline.
    /// All other settings are carried over unchanged.
    pub fn with_connection_string(self, connection_string: impl Into<Option<String>>) -> AppInsights<WithConnectionString, C, R, U, P, E> {
        // Take the builder apart; the previous connection string and the old
        // state markers are dropped with the remainder of `self`.
        let AppInsights {
            config,
            client,
            enable_live_metrics,
            sample_rate,
            batch_runtime,
            minimum_level,
            subscriber,
            should_catch_panic,
            is_noop,
            field_mapper,
            panic_mapper,
            success_filter,
            ..
        } = self;

        AppInsights {
            connection_string: connection_string.into(),
            config,
            client,
            enable_live_metrics,
            sample_rate,
            batch_runtime,
            minimum_level,
            subscriber,
            should_catch_panic,
            is_noop,
            field_mapper,
            panic_mapper,
            success_filter,
            _phantom1: std::marker::PhantomData,
            _phantom2: std::marker::PhantomData,
        }
    }
}
impl<C, R, U, P, E> AppInsights<WithConnectionString, C, R, U, P, E> {
    /// Builds a trace configuration from service identity values and moves the
    /// builder to the `Ready` state.
    ///
    /// The previously stored `Config` is replaced by `Config::default()` with a
    /// resource carrying three attributes: `service.namespace` (`namespace`),
    /// `service.name` (`name`) and `service.instance.id` (`servername`).
    /// All other settings, including the error type `E`, are carried over.
    pub fn with_service_config(self, namespace: impl AsRef<str>, name: impl AsRef<str>, servername: impl AsRef<str>) -> AppInsights<Ready, C, R, U, P, E> {
        let config = Config::default().with_resource(opentelemetry_sdk::Resource::new(vec![
            KeyValue::new("service.namespace", namespace.as_ref().to_owned()),
            KeyValue::new("service.name", name.as_ref().to_owned()),
            KeyValue::new("service.instance.id", servername.as_ref().to_owned()),
        ]));
        AppInsights {
            connection_string: self.connection_string,
            config,
            client: self.client,
            enable_live_metrics: self.enable_live_metrics,
            sample_rate: self.sample_rate,
            batch_runtime: self.batch_runtime,
            minimum_level: self.minimum_level,
            subscriber: self.subscriber,
            should_catch_panic: self.should_catch_panic,
            is_noop: self.is_noop,
            field_mapper: self.field_mapper,
            panic_mapper: self.panic_mapper,
            success_filter: self.success_filter,
            _phantom1: std::marker::PhantomData,
            _phantom2: std::marker::PhantomData,
        }
    }

    /// Uses the given trace `config` as-is and moves the builder to the
    /// `Ready` state.
    ///
    /// The previously stored `Config` is replaced. All other settings,
    /// including the error type `E`, are carried over.
    pub fn with_trace_config(self, config: Config) -> AppInsights<Ready, C, R, U, P, E> {
        AppInsights {
            connection_string: self.connection_string,
            config,
            client: self.client,
            enable_live_metrics: self.enable_live_metrics,
            sample_rate: self.sample_rate,
            batch_runtime: self.batch_runtime,
            minimum_level: self.minimum_level,
            subscriber: self.subscriber,
            should_catch_panic: self.should_catch_panic,
            is_noop: self.is_noop,
            field_mapper: self.field_mapper,
            panic_mapper: self.panic_mapper,
            success_filter: self.success_filter,
            _phantom1: std::marker::PhantomData,
            _phantom2: std::marker::PhantomData,
        }
    }
}
impl<C, R, U, P, E> AppInsights<Ready, C, R, U, P, E> {
pub fn with_client(self, client: C) -> AppInsights<Ready, C, R, U, P, E> {
AppInsights {
connection_string: self.connection_string,
config: self.config,
client,
enable_live_metrics: self.enable_live_metrics,
sample_rate: self.sample_rate,
batch_runtime: self.batch_runtime,
minimum_level: self.minimum_level,
subscriber: self.subscriber,
should_catch_panic: self.should_catch_panic,
is_noop: self.is_noop,
field_mapper: self.field_mapper,
panic_mapper: self.panic_mapper,
success_filter: self.success_filter,
_phantom1: std::marker::PhantomData,
_phantom2: std::marker::PhantomData,
}
}
pub fn with_live_metrics(self, should_collect_live_metrics: bool) -> AppInsights<Ready, C, R, U, P, E> {
AppInsights {
connection_string: self.connection_string,
config: self.config,
client: self.client,
enable_live_metrics: should_collect_live_metrics,
sample_rate: self.sample_rate,
batch_runtime: self.batch_runtime,
minimum_level: self.minimum_level,
subscriber: self.subscriber,
should_catch_panic: self.should_catch_panic,
is_noop: self.is_noop,
field_mapper: self.field_mapper,
panic_mapper: self.panic_mapper,
success_filter: self.success_filter,
_phantom1: std::marker::PhantomData,
_phantom2: std::marker::PhantomData,
}
}
pub fn with_sample_rate(self, sample_rate: f64) -> AppInsights<Ready, C, R, U, P, E> {
AppInsights {
connection_string: self.connection_string,
config: self.config,
client: self.client,
enable_live_metrics: self.enable_live_metrics,
sample_rate,
batch_runtime: self.batch_runtime,
minimum_level: self.minimum_level,
subscriber: self.subscriber,
should_catch_panic: self.should_catch_panic,
is_noop: self.is_noop,
field_mapper: self.field_mapper,
panic_mapper: self.panic_mapper,
success_filter: self.success_filter,
_phantom1: std::marker::PhantomData,
_phantom2: std::marker::PhantomData,
}
}
pub fn with_minimum_level(self, minimum_level: LevelFilter) -> AppInsights<Ready, C, R, U, P, E> {
AppInsights {
connection_string: self.connection_string,
config: self.config,
client: self.client,
enable_live_metrics: self.enable_live_metrics,
sample_rate: self.sample_rate,
batch_runtime: self.batch_runtime,
minimum_level,
subscriber: self.subscriber,
should_catch_panic: self.should_catch_panic,
is_noop: self.is_noop,
field_mapper: self.field_mapper,
panic_mapper: self.panic_mapper,
success_filter: self.success_filter,
_phantom1: std::marker::PhantomData,
_phantom2: std::marker::PhantomData,
}
}
pub fn with_subscriber<T>(self, subscriber: T) -> AppInsights<Ready, C, R, T, P, E> {
AppInsights {
connection_string: self.connection_string,
config: self.config,
client: self.client,
enable_live_metrics: self.enable_live_metrics,
sample_rate: self.sample_rate,
batch_runtime: self.batch_runtime,
minimum_level: self.minimum_level,
subscriber: Some(subscriber),
should_catch_panic: self.should_catch_panic,
is_noop: self.is_noop,
field_mapper: self.field_mapper,
panic_mapper: self.panic_mapper,
success_filter: self.success_filter,
_phantom1: std::marker::PhantomData,
_phantom2: std::marker::PhantomData,
}
}
pub fn with_runtime<T>(self, runtime: T) -> AppInsights<Ready, C, T, U, P, E>
where
T: RuntimeChannel,
{
AppInsights {
connection_string: self.connection_string,
config: self.config,
client: self.client,
enable_live_metrics: self.enable_live_metrics,
sample_rate: self.sample_rate,
batch_runtime: runtime,
minimum_level: self.minimum_level,
subscriber: self.subscriber,
should_catch_panic: self.should_catch_panic,
is_noop: self.is_noop,
field_mapper: self.field_mapper,
panic_mapper: self.panic_mapper,
success_filter: self.success_filter,
_phantom1: std::marker::PhantomData,
_phantom2: std::marker::PhantomData,
}
}
pub fn with_catch_panic(self, should_catch_panic: bool) -> AppInsights<Ready, C, R, U, P, E> {
AppInsights {
connection_string: self.connection_string,
config: self.config,
client: self.client,
enable_live_metrics: self.enable_live_metrics,
sample_rate: self.sample_rate,
batch_runtime: self.batch_runtime,
minimum_level: self.minimum_level,
subscriber: self.subscriber,
should_catch_panic,
is_noop: self.is_noop,
field_mapper: self.field_mapper,
panic_mapper: self.panic_mapper,
success_filter: self.success_filter,
_phantom1: std::marker::PhantomData,
_phantom2: std::marker::PhantomData,
}
}
pub fn with_noop(self, should_noop: bool) -> AppInsights<Ready, C, R, U, P, E> {
AppInsights {
connection_string: self.connection_string,
config: self.config,
client: self.client,
enable_live_metrics: self.enable_live_metrics,
sample_rate: self.sample_rate,
batch_runtime: self.batch_runtime,
minimum_level: self.minimum_level,
subscriber: self.subscriber,
should_catch_panic: self.should_catch_panic,
is_noop: should_noop,
field_mapper: self.field_mapper,
panic_mapper: self.panic_mapper,
success_filter: self.success_filter,
_phantom1: std::marker::PhantomData,
_phantom2: std::marker::PhantomData,
}
}
pub fn with_field_mapper<F>(self, field_mapper: F) -> AppInsights<Ready, C, R, U, P, E>
where
F: Fn(&http::request::Parts) -> HashMap<String, String> + Send + Sync + 'static,
{
AppInsights {
connection_string: self.connection_string,
config: self.config,
client: self.client,
enable_live_metrics: self.enable_live_metrics,
sample_rate: self.sample_rate,
batch_runtime: self.batch_runtime,
minimum_level: self.minimum_level,
subscriber: self.subscriber,
should_catch_panic: self.should_catch_panic,
is_noop: self.is_noop,
field_mapper: Some(Arc::new(field_mapper)),
panic_mapper: self.panic_mapper,
success_filter: self.success_filter,
_phantom1: std::marker::PhantomData,
_phantom2: std::marker::PhantomData,
}
}
pub fn with_panic_mapper<F, T>(self, panic_mapper: F) -> AppInsights<Ready, C, R, U, T, E>
where
F: Fn(String) -> (u16, T) + Send + Sync + 'static,
{
AppInsights {
connection_string: self.connection_string,
config: self.config,
client: self.client,
enable_live_metrics: self.enable_live_metrics,
sample_rate: self.sample_rate,
batch_runtime: self.batch_runtime,
minimum_level: self.minimum_level,
subscriber: self.subscriber,
should_catch_panic: self.should_catch_panic,
is_noop: self.is_noop,
field_mapper: self.field_mapper,
panic_mapper: Some(Arc::new(panic_mapper)),
success_filter: self.success_filter,
_phantom1: std::marker::PhantomData,
_phantom2: std::marker::PhantomData,
}
}
pub fn with_success_filter<F>(self, success_filter: F) -> AppInsights<Ready, C, R, U, P, E>
where
F: Fn(StatusCode) -> bool + Send + Sync + 'static,
{
AppInsights {
connection_string: self.connection_string,
config: self.config,
client: self.client,
enable_live_metrics: self.enable_live_metrics,
sample_rate: self.sample_rate,
batch_runtime: self.batch_runtime,
minimum_level: self.minimum_level,
subscriber: self.subscriber,
should_catch_panic: self.should_catch_panic,
is_noop: self.is_noop,
field_mapper: self.field_mapper,
panic_mapper: self.panic_mapper,
success_filter: Some(Arc::new(success_filter)),
_phantom1: std::marker::PhantomData,
_phantom2: std::marker::PhantomData,
}
}
pub fn with_error_type<T>(self) -> AppInsights<Ready, C, R, U, P, T> {
AppInsights {
connection_string: self.connection_string,
config: self.config,
client: self.client,
enable_live_metrics: self.enable_live_metrics,
sample_rate: self.sample_rate,
batch_runtime: self.batch_runtime,
minimum_level: self.minimum_level,
subscriber: self.subscriber,
should_catch_panic: self.should_catch_panic,
is_noop: self.is_noop,
field_mapper: self.field_mapper,
panic_mapper: self.panic_mapper,
success_filter: self.success_filter,
_phantom1: std::marker::PhantomData,
_phantom2: std::marker::PhantomData,
}
}
pub fn build_and_set_global_default(self) -> Result<AppInsightsComplete<P, E>, Box<dyn Error + Send + Sync + 'static>>
where
C: HttpClient + 'static,
R: RuntimeChannel,
U: tracing_subscriber::layer::SubscriberExt + for<'span> tracing_subscriber::registry::LookupSpan<'span> + Send + Sync + 'static
{
if self.is_noop {
return Ok(AppInsightsComplete {
is_noop: true,
field_mapper: None,
panic_mapper: None,
success_filter: None,
_phantom: std::marker::PhantomData,
});
}
match self.subscriber {
Some(subscriber) => {
if let Some(connection_string) = self.connection_string {
let tracer = opentelemetry_application_insights::new_pipeline_from_connection_string(connection_string)?
.with_client(self.client)
.with_live_metrics(self.enable_live_metrics)
.with_trace_config(self.config)
.with_sample_rate(self.sample_rate)
.install_batch(self.batch_runtime);
let telemetry = tracing_opentelemetry::layer().with_tracer(tracer);
let subscriber = subscriber.with(telemetry).with(self.minimum_level);
tracing::subscriber::set_global_default(subscriber)?;
} else {
tracing::subscriber::set_global_default(subscriber.with(self.minimum_level))?;
}
},
None => {
if let Some(connection_string) = self.connection_string {
let tracer = opentelemetry_application_insights::new_pipeline_from_connection_string(connection_string)?
.with_client(self.client)
.with_live_metrics(self.enable_live_metrics)
.with_trace_config(self.config)
.with_sample_rate(self.sample_rate)
.install_batch(self.batch_runtime);
let telemetry = tracing_opentelemetry::layer().with_tracer(tracer);
let subscriber = tracing_subscriber::registry().with(telemetry).with(self.minimum_level);
tracing::subscriber::set_global_default(subscriber)?;
} else {
tracing::subscriber::set_global_default(tracing_subscriber::registry().with(self.minimum_level))?;
}
},
}
if self.should_catch_panic {
let default_panic = panic::take_hook();
panic::set_hook(Box::new(move |p| {
let payload_string = format!("{:?}", p.payload().downcast_ref::<&str>());
let backtrace = Backtrace::force_capture().to_string();
tracing::event!(
name: "exception",
Level::ERROR,
ai.customEvent.name = "exception",
"exception.type" = "PANIC",
exception.message = payload_string,
exception.stacktrace = backtrace,
"exception"
);
default_panic(p);
}));
}
Ok(AppInsightsComplete {
is_noop: false,
field_mapper: self.field_mapper,
panic_mapper: self.panic_mapper,
success_filter: self.success_filter,
_phantom: std::marker::PhantomData,
})
}
}
impl<P, E> AppInsightsComplete<P, E> {
pub fn layer(self) -> AppInsightsLayer<P, E> {
AppInsightsLayer {
is_noop: self.is_noop,
field_mapper: self.field_mapper,
panic_mapper: self.panic_mapper,
success_filter: self.success_filter,
_phantom: std::marker::PhantomData,
}
}
}
/// Tower layer that wraps a service in `AppInsightsMiddleware`.
///
/// `P` is the value type produced by the panic mapper and `E` is the error
/// type that failed response bodies are deserialized into.
pub struct AppInsightsLayer<P, E> {
    // When true, the middleware forwards requests without instrumenting them.
    is_noop: bool,
    // Optional callback producing extra span fields from the request head.
    field_mapper: OptionalFieldMapper,
    // Optional callback mapping a caught panic to a status code and body value.
    panic_mapper: OptionalPanicMapper<P>,
    // Optional predicate overriding which status codes count as success.
    success_filter: OptionalSuccessFilter,
    // Carries the error type `E` without storing a value of it.
    _phantom: std::marker::PhantomData<E>,
}

// Implemented by hand: `#[derive(Clone)]` would add `P: Clone` and `E: Clone`
// bounds, although only `Arc`s and a `PhantomData` are stored and those can be
// cloned for any `P` and `E`.
impl<P, E> Clone for AppInsightsLayer<P, E> {
    fn clone(&self) -> Self {
        Self {
            is_noop: self.is_noop,
            field_mapper: self.field_mapper.clone(),
            panic_mapper: self.panic_mapper.clone(),
            success_filter: self.success_filter.clone(),
            _phantom: std::marker::PhantomData,
        }
    }
}
impl<S, P, E> Layer<S> for AppInsightsLayer<P, E> {
    type Service = AppInsightsMiddleware<S, P, E>;

    /// Wraps `inner` in the middleware, sharing this layer's callbacks with it.
    fn layer(&self, inner: S) -> Self::Service {
        // Each clone only bumps the reference count of the shared callback.
        let field_mapper = self.field_mapper.clone();
        let panic_mapper = self.panic_mapper.clone();
        let success_filter = self.success_filter.clone();

        AppInsightsMiddleware {
            inner,
            is_noop: self.is_noop,
            field_mapper,
            panic_mapper,
            success_filter,
            _phantom: self._phantom,
        }
    }
}
/// Tower service that instruments requests handled by the wrapped service `S`.
///
/// `P` is the value type produced by the panic mapper and `E` is the error
/// type that failed response bodies are deserialized into.
pub struct AppInsightsMiddleware<S, P, E> {
    // The wrapped service.
    inner: S,
    // When true, requests are forwarded without instrumenting them.
    is_noop: bool,
    // Optional callback producing extra span fields from the request head.
    field_mapper: OptionalFieldMapper,
    // Optional callback mapping a caught panic to a status code and body value.
    panic_mapper: OptionalPanicMapper<P>,
    // Optional predicate overriding which status codes count as success.
    success_filter: OptionalSuccessFilter,
    // Carries the error type `E` without storing a value of it.
    _phantom: std::marker::PhantomData<E>,
}

// Implemented by hand: `#[derive(Clone)]` would also require `P: Clone` and
// `E: Clone`, although only the wrapped service actually needs to be cloned;
// the callbacks live behind `Arc`s.
impl<S, P, E> Clone for AppInsightsMiddleware<S, P, E>
where
    S: Clone,
{
    fn clone(&self) -> Self {
        Self {
            inner: self.inner.clone(),
            is_noop: self.is_noop,
            field_mapper: self.field_mapper.clone(),
            panic_mapper: self.panic_mapper.clone(),
            success_filter: self.success_filter.clone(),
            _phantom: std::marker::PhantomData,
        }
    }
}
impl<S, P, E> Service<Request<Body>> for AppInsightsMiddleware<S, P, E>
where
S: Service<Request<Body>, Response = Response> + Send + 'static,
S::Future: Send + 'static,
S::Error: Send + 'static,
P: Serialize + Send + 'static,
E: AppInsightsError + Serialize + DeserializeOwned + Default + Send + 'static,
{
type Error = S::Error;
type Future = BoxFuture<'static, Result<Self::Response, Self::Error>>;
type Response = S::Response;
fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
self.inner.poll_ready(cx)
}
fn call(&mut self, request: Request<Body>) -> Self::Future {
if self.is_noop {
return Box::pin(self.inner.call(request));
}
let method = request.method().to_string();
let uri = request.uri().to_string();
let client_ip = request.headers().get("x-forwarded-for").and_then(|v| v.to_str().ok()).unwrap_or("unknown").to_string();
let client_ip = client_ip.split(',').next().unwrap_or("unknown");
let (mut parts, body) = request.into_parts();
let route = futures::executor::block_on(parts.extract::<MatchedPath>())
.map(|m| m.as_str().to_owned())
.unwrap_or_else(|_| "unknown".to_owned());
let extra_fields = self.field_mapper.as_ref().map(|f| f(&parts)).unwrap_or_default();
let request = Request::from_parts(parts, body);
let span = tracing::info_span!(
"request",
otel.kind = "server",
http.request.method = method.as_str(),
url.full = uri.as_str(),
client.address = client_ip,
http.route = route.as_str(),
http.response.status_code = tracing::field::Empty,
otel.status_code = tracing::field::Empty,
otel.status_message = tracing::field::Empty,
extra_fields = serde_json::to_string_pretty(&extra_fields).unwrap()
);
let panic_mapper = self.panic_mapper.clone();
let success_filter = self.success_filter.clone();
let future = self.inner.call(request);
Box::pin(
async move {
let response = AssertUnwindSafe(future).catch_unwind().instrument(Span::current()).await;
let response = match response {
Ok(response) => response,
Err(e) => {
let payload_string = format!("{:?}", e.downcast_ref::<&str>());
let (status, error_string) = if let Some(panic_mapper) = panic_mapper.as_ref() {
let (status, error) = panic_mapper(payload_string.clone());
(status, serde_json::to_string(&error).unwrap())
} else {
(
500,
format!(
r#"{{
"status": 500,
"message": "A panic occurred: {}.",
}}"#,
payload_string
)
.to_string(),
)
};
Ok(Response::builder()
.status(status)
.header("content-type", "application/json")
.body(Body::from(error_string))
.unwrap())
}
}?;
let status = response.status();
let is_success = success_filter.as_ref().map(|f| f(status)).unwrap_or_else(|| status.is_success() || status.is_redirection() || status.is_informational());
let (response, otel_status, otel_status_message) = if is_success {
(response, "OK", format!(r#"{{ "status": {} }}"#, status.as_u16()))
} else {
let (parts, body) = response.into_parts();
let body_bytes = body.collect().await.unwrap_or_default().to_bytes();
let error: E = serde_json::from_slice(&body_bytes).unwrap_or_default();
let error_string = serde_json::to_string_pretty(&error).unwrap();
tracing::event!(
name: "exception",
Level::ERROR,
ai.customEvent.name = "exception",
"exception.type" = format!("HTTP {}", status.as_u16()),
exception.message = error.message().unwrap_or_default(),
exception.stacktrace = error.backtrace().unwrap_or_default(),
"exception"
);
let body = Body::from(body_bytes);
let response = Response::from_parts(parts, body);
(response, "ERROR", error_string)
};
let span = Span::current().entered();
span.record("http.response.status_code", status.as_u16());
span.record("otel.status_code", otel_status);
if otel_status != "OK" {
span.record("otel.status_message", otel_status_message);
}
Ok(response)
}
.instrument(span),
)
}
}
/// HTTP client that performs no network I/O.
///
/// It is the default client type of `AppInsights` when the `reqwest-client`
/// feature is disabled.
#[cfg(not(feature = "reqwest-client"))]
#[derive(Debug)]
pub struct NoopClient;
#[cfg(not(feature = "reqwest-client"))]
#[async_trait::async_trait]
impl HttpClient for NoopClient {
/// Ignores the request and immediately returns a default response with an empty body.
async fn send(&self, _request: Request<Vec<u8>>) -> Result<Response<axum::body::Bytes>, Box<dyn Error + Sync + Send>> {
Ok(Response::new(axum::body::Bytes::new()))
}
}
// Tests driving an axum `Router` wrapped in the layer and asserting on the
// tracing callbacks captured by a channel-backed subscriber layer.
#[cfg(test)]
mod tests {
use std::sync::mpsc::Sender;
use axum::{Router, routing::get, response::IntoResponse};
use http::StatusCode;
use serde::Deserialize;
use tracing::{Subscriber, span};
use tracing_subscriber::Layer;
use super::*;
// Error body used by the failing routes; it round-trips through JSON.
#[derive(Clone, Default, Serialize, Deserialize)]
struct WebError {
status: u16,
message: String,
}
impl AppInsightsError for WebError {
fn message(&self) -> Option<String> {
Some(self.message.clone())
}
fn backtrace(&self) -> Option<String> {
None
}
}
impl IntoResponse for WebError {
// Responds with the stored status code and the JSON-serialized error as body.
fn into_response(self) -> Response {
let code = StatusCode::from_u16(self.status).unwrap();
let body = serde_json::to_string(&self).unwrap();
(code, body).into_response()
}
}
// Subscriber layer that forwards a short description of every callback it
// receives to a channel, so the tests can assert on their order.
struct TestSubscriberLayer {
sender: Sender<String>,
}
impl<S> Layer<S> for TestSubscriberLayer
where
S: Subscriber
{
// Sends "new|<span name>".
fn on_new_span(&self, attrs: &span::Attributes<'_>, _id: &span::Id, _ctx: tracing_subscriber::layer::Context<'_, S>) {
self.sender.send(format!("new|{}", attrs.metadata().name())).unwrap();
}
// Sends "event|<event name>".
fn on_event(&self, event: &tracing::Event<'_>, _ctx: tracing_subscriber::layer::Context<'_, S>) {
self.sender.send(format!("event|{}", event.metadata().name())).unwrap();
}
// Sends "record|<debug output of the recorded values>".
fn on_record(&self, _id: &span::Id, values: &span::Record<'_>, _ctx: tracing_subscriber::layer::Context<'_, S>) {
self.sender.send(format!("record|{:?}", values)).unwrap();
}
// Sends "close".
fn on_close(&self, _id: span::Id, _ctx: tracing_subscriber::layer::Context<'_, S>) {
self.sender.send("close".to_string()).unwrap();
}
}
// End-to-end check of the middleware: three routes treated as success, one
// error response and one panicking handler.
#[tokio::test]
async fn test_integration() {
let (sender, receiver) = std::sync::mpsc::channel();
let subscriber = tracing_subscriber::registry().with(TestSubscriberLayer {
sender: sender.clone(),
});
// No connection string is given, so no exporter pipeline is created.
let i = AppInsights::default()
.with_connection_string(None)
.with_service_config("namespace", "name", "servername")
.with_client(reqwest::Client::new())
.with_sample_rate(1.0)
.with_minimum_level(LevelFilter::INFO)
.with_runtime(Tokio)
.with_catch_panic(true)
.with_subscriber(subscriber)
.with_field_mapper(|_| {
let mut map = HashMap::new();
map.insert("extra_field".to_owned(), "extra_value".to_owned());
map
})
.with_panic_mapper(|panic| {
(500, WebError { status: 500, message: panic })
})
.with_success_filter(|status| {
status.is_success() || status.is_redirection() || status.is_informational() || status == StatusCode::NOT_FOUND
})
.with_error_type::<WebError>()
.build_and_set_global_default()
.unwrap();
let layer = i.layer();
let mut app: Router<()> = Router::new()
.route("/succeed1", get(|| async { Response::new(Body::empty()) }))
.route("/succeed2", get(|| async { (StatusCode::NOT_MODIFIED, "") }))
.route("/succeed3", get(|| async { (StatusCode::NOT_FOUND, "") }))
.route("/fail1", get(|| async { WebError { status: 429, message: "foo".to_string() } }))
.route("/fail2", get(|| async { panic!("panic") }))
.layer(layer);
// 200: span opened, status recorded as OK, span closed.
let request = Request::builder().uri("/succeed1").body(Body::empty()).unwrap();
let response = <axum::Router as tower::ServiceExt<Request<Body>>>::ready(&mut app).await.unwrap().call(request).await.unwrap();
assert_eq!(response.status(), 200);
assert_eq!("new|request", receiver.recv().unwrap());
assert!(receiver.recv().unwrap().starts_with("record|Record { values: ValueSet { http.response.status_code: 200"));
assert!(receiver.recv().unwrap().starts_with("record|Record { values: ValueSet { otel.status_code: \"OK\""));
assert_eq!("close", receiver.recv().unwrap());
// 304: redirection statuses count as success.
let request = Request::builder().uri("/succeed2").body(Body::empty()).unwrap();
let response = <axum::Router as tower::ServiceExt<Request<Body>>>::ready(&mut app).await.unwrap().call(request).await.unwrap();
assert_eq!(response.status(), 304);
assert_eq!("new|request", receiver.recv().unwrap());
assert!(receiver.recv().unwrap().starts_with("record|Record { values: ValueSet { http.response.status_code: 304"));
assert!(receiver.recv().unwrap().starts_with("record|Record { values: ValueSet { otel.status_code: \"OK\""));
assert_eq!("close", receiver.recv().unwrap());
// 404: counted as success because of the custom success filter above.
let request = Request::builder().uri("/succeed3").body(Body::empty()).unwrap();
let response = <axum::Router as tower::ServiceExt<Request<Body>>>::ready(&mut app).await.unwrap().call(request).await.unwrap();
assert_eq!(response.status(), 404);
assert_eq!("new|request", receiver.recv().unwrap());
assert!(receiver.recv().unwrap().starts_with("record|Record { values: ValueSet { http.response.status_code: 404"));
assert!(receiver.recv().unwrap().starts_with("record|Record { values: ValueSet { otel.status_code: \"OK\""));
assert_eq!("close", receiver.recv().unwrap());
// 429: an exception event is emitted and the error body becomes the status message.
let request = Request::builder().uri("/fail1").body(Body::empty()).unwrap();
let response = <axum::Router as tower::ServiceExt<Request<Body>>>::ready(&mut app).await.unwrap().call(request).await.unwrap();
assert_eq!(response.status(), 429);
assert_eq!("new|request", receiver.recv().unwrap());
assert!(receiver.recv().unwrap().starts_with("event|exception"));
assert!(receiver.recv().unwrap().starts_with("record|Record { values: ValueSet { http.response.status_code: 429"));
assert!(receiver.recv().unwrap().starts_with("record|Record { values: ValueSet { otel.status_code: \"ERROR\""));
assert!(receiver.recv().unwrap().starts_with("record|Record { values: ValueSet { otel.status_message: \"{\\n \\\"status\\\": 429,\\n \\\"message\\\": \\\"foo\\\"\\n}\""));
assert_eq!("close", receiver.recv().unwrap());
// Panic: two exception events are expected — presumably one from the panic
// hook and one from the middleware's error reporting (inferred from the code
// above, not asserted individually here).
let request = Request::builder().uri("/fail2").body(Body::empty()).unwrap();
let response = <axum::Router as tower::ServiceExt<Request<Body>>>::ready(&mut app).await.unwrap().call(request).await.unwrap();
assert_eq!(response.status(), 500);
assert_eq!("new|request", receiver.recv().unwrap());
assert!(receiver.recv().unwrap().starts_with("event|exception"));
assert!(receiver.recv().unwrap().starts_with("event|exception"));
assert!(receiver.recv().unwrap().starts_with("record|Record { values: ValueSet { http.response.status_code: 500"));
assert!(receiver.recv().unwrap().starts_with("record|Record { values: ValueSet { otel.status_code: \"ERROR\""));
assert!(receiver.recv().unwrap().starts_with("record|Record { values: ValueSet { otel.status_message: \"{\\n \\\"status\\\": 500,\\n \\\"message\\\": \\\"Some(\\\\\\\"panic\\\\\\\")\\\"\\n}\""));
assert_eq!("close", receiver.recv().unwrap());
}
// In no-op mode the request is still served but no tracing callback reaches
// the test subscriber layer.
#[tokio::test]
async fn test_noop() {
let (sender, receiver) = std::sync::mpsc::channel();
let subscriber = tracing_subscriber::registry().with(TestSubscriberLayer {
sender: sender.clone(),
});
let i = AppInsights::default()
.with_connection_string(None)
.with_service_config("namespace", "name", "servername")
.with_subscriber(subscriber)
.with_noop(true)
.build_and_set_global_default()
.unwrap();
let layer = i.layer();
let mut app: Router<()> = Router::new()
.route("/succeed1", get(|| async { Response::new(Body::empty()) }))
.layer(layer);
let request = Request::builder().uri("/succeed1").body(Body::empty()).unwrap();
let response = <axum::Router as tower::ServiceExt<Request<Body>>>::ready(&mut app).await.unwrap().call(request).await.unwrap();
assert_eq!(response.status(), 200);
assert!(receiver.try_recv().is_err());
}
}