mod propagation;
mod record_result;
mod traced;
pub use propagation::{
HttpClientPropagator, HttpClientPropagatorLayer, HttpServerPropagator,
HttpServerPropagatorLayer,
};
pub use record_result::{RecordResultFuture, RecordResultLayer, RecordResultService};
pub use traced::{MakeSpan, TracedFuture, TracedFutureExt, TracedLayer, TracedService};
use opentelemetry::InstrumentationScope;
use tower::ServiceBuilder;
/// Support module for sealing `ServiceBuilderExt`.
///
/// The module itself is not `pub`, so `Sealed` can only be named from inside
/// this module tree even though the trait is declared `pub`.
mod private {
    /// Marker supertrait required by `ServiceBuilderExt`.
    pub trait Sealed {}

    // Implemented for every `tower::ServiceBuilder`, whatever its layer stack.
    impl<Layers> Sealed for tower::ServiceBuilder<Layers> {}
}
/// Extension methods on [`tower::ServiceBuilder`] for the layers re-exported
/// by this module.
///
/// Every method takes the builder by value and returns a builder whose layer
/// type is a `Stack` of the new layer and the previous stack `L`. The trait
/// has `private::Sealed` as a supertrait; in this file it is implemented for
/// `ServiceBuilder<L>`.
pub trait ServiceBuilderExt<L>: private::Sealed {
    /// Stacks an [`HttpServerPropagatorLayer`] onto the builder.
    fn http_server_propagation(
        self,
    ) -> ServiceBuilder<tower::layer::util::Stack<HttpServerPropagatorLayer, L>>;

    /// Stacks an [`HttpClientPropagatorLayer`] onto the builder.
    fn http_client_propagation(
        self,
    ) -> ServiceBuilder<tower::layer::util::Stack<HttpClientPropagatorLayer, L>>;

    /// Stacks a [`TracedLayer`] onto the builder.
    ///
    /// * `scope` - instrumentation scope handed to the layer; must be `'static`.
    /// * `make_span` - value handed to the layer; only `Clone` is required
    ///   here. The tests in this file pass closures that map a request
    ///   reference to a `SpanBuilder`.
    fn traced<F: Clone>(
        self,
        scope: &'static InstrumentationScope,
        make_span: F,
    ) -> ServiceBuilder<tower::layer::util::Stack<TracedLayer<F>, L>>;

    /// Stacks a [`RecordResultLayer`] onto the builder.
    fn record_result(self) -> ServiceBuilder<tower::layer::util::Stack<RecordResultLayer, L>>;
}
/// Each method constructs the matching layer and pushes it with
/// `ServiceBuilder::layer`.
impl<L> ServiceBuilderExt<L> for ServiceBuilder<L> {
    /// Pushes a freshly constructed `HttpServerPropagatorLayer`.
    fn http_server_propagation(
        self,
    ) -> ServiceBuilder<tower::layer::util::Stack<HttpServerPropagatorLayer, L>> {
        let server_propagator = HttpServerPropagatorLayer::new();
        self.layer(server_propagator)
    }

    /// Pushes a freshly constructed `HttpClientPropagatorLayer`.
    fn http_client_propagation(
        self,
    ) -> ServiceBuilder<tower::layer::util::Stack<HttpClientPropagatorLayer, L>> {
        let client_propagator = HttpClientPropagatorLayer::new();
        self.layer(client_propagator)
    }

    /// Pushes a `TracedLayer` built from `scope` and `make_span`.
    fn traced<F: Clone>(
        self,
        scope: &'static InstrumentationScope,
        make_span: F,
    ) -> ServiceBuilder<tower::layer::util::Stack<TracedLayer<F>, L>> {
        let tracing_layer = TracedLayer::new(scope, make_span);
        self.layer(tracing_layer)
    }

    /// Pushes a freshly constructed `RecordResultLayer`.
    fn record_result(self) -> ServiceBuilder<tower::layer::util::Stack<RecordResultLayer, L>> {
        let result_recorder = RecordResultLayer::new();
        self.layer(result_recorder)
    }
}
#[cfg(test)]
mod tests {
use super::*;
use apollo_opentelemetry_test::{TelemetryContext, assert_spans_snapshot};
use http::{Request, Response};
use opentelemetry::KeyValue;
use opentelemetry::trace::SpanBuilder;
/// Returns the process-wide instrumentation scope named `"test"` shared by the
/// tests in this module. The scope is built on first use and reused afterwards.
fn test_scope() -> &'static InstrumentationScope {
    static SCOPE: std::sync::OnceLock<InstrumentationScope> = std::sync::OnceLock::new();
    SCOPE.get_or_init(|| InstrumentationScope::builder("test").build())
}
/// A single `traced` layer around a mock service: the request passes through
/// unchanged and one span named `echo` with an `input` attribute is recorded.
#[tokio::test]
async fn test_service_builder_ext() {
    let telemetry = TelemetryContext::new();
    let (mut svc, mut mock) = tower_test::mock::spawn_with(|inner| {
        ServiceBuilder::new()
            .traced(test_scope(), |req: &String| {
                // Copy the request text into the span's `input` attribute.
                let attributes = [KeyValue::new("input", req.clone())];
                SpanBuilder::from_name("echo").with_attributes(attributes)
            })
            .service(inner)
    });

    assert!(svc.poll_ready().is_ready());
    let pending = svc.call("hello".to_string());

    // Play the inner service: check what arrived, then reply.
    let (seen, responder) = mock.next_request().await.unwrap();
    assert_eq!(seen, "hello");
    responder.send_response("hello".to_string());

    let reply = pending.await.unwrap();
    assert_eq!(reply, "hello");

    // Inline snapshot text is kept verbatim.
    assert_spans_snapshot!(telemetry, @r#"
- name: echo
span_kind: Internal
is_sampled: true
attributes:
input: hello
"#);
}
/// `traced` followed by `record_result`: after a successful response the
/// snapshot of the `process` span also carries `status: Ok`.
#[tokio::test]
async fn test_service_builder_ext_with_record_result() {
    let telemetry = TelemetryContext::new();
    let (mut svc, mut mock) = tower_test::mock::spawn_with(|inner| {
        ServiceBuilder::new()
            .traced(test_scope(), |req: &String| {
                // Copy the request text into the span's `input` attribute.
                let attributes = [KeyValue::new("input", req.clone())];
                SpanBuilder::from_name("process").with_attributes(attributes)
            })
            .record_result()
            .service(inner)
    });

    assert!(svc.poll_ready().is_ready());
    let pending = svc.call("test".to_string());

    // Play the inner service: check what arrived, then reply successfully.
    let (seen, responder) = mock.next_request().await.unwrap();
    assert_eq!(seen, "test");
    responder.send_response("ok".to_string());

    let reply = pending.await.unwrap();
    assert_eq!(reply, "ok");

    // Inline snapshot text is kept verbatim.
    assert_spans_snapshot!(telemetry, @r#"
- name: process
span_kind: Internal
is_sampled: true
attributes:
input: test
status: Ok
"#);
}
/// Two `traced` + `record_result` pairs stacked on one builder: the snapshot
/// expects an `inner` span with a parent and an `outer` span, both `status: Ok`.
#[tokio::test]
async fn test_nested_spans_with_record_result() {
    let telemetry = TelemetryContext::new();
    let (mut svc, mut mock) = tower_test::mock::spawn_with(|inner| {
        ServiceBuilder::new()
            // First pair: span "outer", tagged with the request text.
            .traced(test_scope(), |req: &String| {
                let attributes = [KeyValue::new("request", req.clone())];
                SpanBuilder::from_name("outer").with_attributes(attributes)
            })
            .record_result()
            // Second pair: span "inner", no attributes.
            .traced(test_scope(), |_req: &String| SpanBuilder::from_name("inner"))
            .record_result()
            .service(inner)
    });

    assert!(svc.poll_ready().is_ready());
    let pending = svc.call("test".to_string());

    // Play the inner service: check what arrived, then reply successfully.
    let (seen, responder) = mock.next_request().await.unwrap();
    assert_eq!(seen, "test");
    responder.send_response("ok".to_string());

    let reply = pending.await.unwrap();
    assert_eq!(reply, "ok");

    // Inline snapshot text is kept verbatim.
    assert_spans_snapshot!(telemetry, @r#"
- name: inner
span_kind: Internal
has_parent: true
is_sampled: true
status: Ok
- name: outer
span_kind: Internal
is_sampled: true
attributes:
request: test
status: Ok
"#);
}
/// `http_server_propagation` ahead of `traced`: a request carrying a
/// `traceparent` header yields a `Server` span that the snapshot expects to
/// have a parent.
#[tokio::test]
async fn test_http_server_propagation_via_service_builder_ext() {
    let telemetry = TelemetryContext::new();
    let (mut svc, mut mock) = tower_test::mock::spawn_with(|inner| {
        ServiceBuilder::new()
            .http_server_propagation()
            .traced(test_scope(), |_req: &Request<()>| {
                let builder = SpanBuilder::from_name("server-handler");
                builder.with_kind(opentelemetry::trace::SpanKind::Server)
            })
            .service(inner)
    });

    // Incoming request that carries a trace context header.
    let incoming_traceparent = "00-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-01";
    let incoming = Request::builder()
        .header("traceparent", incoming_traceparent)
        .body(())
        .unwrap();

    assert!(svc.poll_ready().is_ready());
    let pending = svc.call(incoming);

    // Play the inner service; the forwarded request itself is not inspected.
    let (_forwarded, responder) = mock.next_request().await.unwrap();
    responder.send_response(Response::new("ok"));
    pending.await.unwrap();

    // Inline snapshot text is kept verbatim.
    assert_spans_snapshot!(telemetry, @r#"
- name: server-handler
span_kind: Server
has_parent: true
is_sampled: true
"#);
}
/// `http_client_propagation` on its own: with a context holding a started span
/// attached, the request seen by the mock must carry a `traceparent` header.
#[tokio::test]
async fn test_http_client_propagation_via_service_builder_ext() {
    use opentelemetry::trace::{TraceContextExt, Tracer};

    // Held for the whole test. NOTE(review): presumably this sets up the test
    // telemetry pipeline that the global tracer below relies on — confirm.
    let _telemetry = TelemetryContext::new();
    let (mut svc, mut mock) = tower_test::mock::spawn_with(|inner| {
        ServiceBuilder::new()
            .http_client_propagation()
            .service(inner)
    });

    // Start a span and make its context current; the guard stays alive until
    // the end of the function.
    let parent_span = opentelemetry::global::tracer("test").start("parent-span");
    let parent_cx = opentelemetry::Context::current_with_span(parent_span);
    let _attached = parent_cx.attach();

    assert!(svc.poll_ready().is_ready());
    let outgoing = Request::builder().body(()).unwrap();
    let pending = svc.call(outgoing);
    let call_task = tokio::spawn(pending);

    // Inspect the request as the inner service received it.
    let (forwarded, responder) = mock.next_request().await.unwrap();
    assert!(
        forwarded.headers().contains_key("traceparent"),
        "traceparent should be injected via http_client_propagation"
    );
    responder.send_response(Response::new("ok"));

    // First unwrap: the spawned task joined; second: the call succeeded.
    let call_outcome = call_task.await.unwrap();
    call_outcome.unwrap();
}
}