use std::sync::Arc;
use std::time::Duration;
use apollo_router::TestHarness;
use apollo_router::services::router;
use apollo_router::services::router::BoxCloneService;
use apollo_router::services::supergraph;
use axum::Router;
use axum::extract::State;
use axum::routing::post;
use bytes::Bytes;
use http_body_util::BodyExt as _;
use once_cell::sync::Lazy;
use opentelemetry_proto::tonic::collector::trace::v1::ExportTraceServiceRequest;
use prost::Message;
use tokio::sync::Mutex;
use tokio::task::JoinHandle;
use tower::Service;
use tower::ServiceExt;
use tower_http::decompression::RequestDecompressionLayer;
mod tracing_common;
// NOTE(review): this nesting presumably reproduces a module path that other code compiled
// into this test crate refers to (`crate::plugins::telemetry::apollo_exporter`) — confirm
// before renaming or flattening it.
pub(crate) mod plugins {
    pub(crate) mod telemetry {
        pub(crate) mod apollo_exporter {
            /// Serde serialization helper for an optional protobuf timestamp.
            ///
            /// A present timestamp is written as a two-field struct named `Timestamp`
            /// (`seconds`, then `nanos`); an absent one is written through
            /// `serialize_none`.
            ///
            /// # Errors
            ///
            /// Propagates any error reported by the underlying serializer.
            pub(crate) fn serialize_timestamp<S>(
                timestamp: &Option<prost_types::Timestamp>,
                serializer: S,
            ) -> Result<S::Ok, S::Error>
            where
                S: serde::Serializer,
            {
                use serde::ser::SerializeStruct as _;

                if let Some(stamp) = timestamp {
                    let mut state = serializer.serialize_struct("Timestamp", 2)?;
                    state.serialize_field("seconds", &stamp.seconds)?;
                    state.serialize_field("nanos", &stamp.nanos)?;
                    state.end()
                } else {
                    serializer.serialize_none()
                }
            }
        }
    }
}
/// Lazily created Tokio runtime shared by this file.
///
/// `setup` spawns the mock backend and the proxy server onto it, so those tasks do not run
/// on the runtime created by `#[tokio::test]`.
static ROUTER_SERVICE_RUNTIME: Lazy<Arc<tokio::runtime::Runtime>> = Lazy::new(|| {
    let runtime = tokio::runtime::Runtime::new().expect("must be able to create tokio runtime");
    Arc::new(runtime)
});
/// Axum state for the mock trace backend.
#[derive(Clone)]
struct BackendState {
// Trace export requests that `backend_traces_handler` decoded successfully; shared with
// the test so it can inspect what reached the backend.
reports: Arc<Mutex<Vec<ExportTraceServiceRequest>>>,
}
/// Axum handler that records OTLP trace export requests received by the mock backend.
///
/// The request body is decoded as a protobuf `ExportTraceServiceRequest`; on success the
/// decoded message is appended to `state.reports`. The handler always answers with an empty
/// JSON body, whether or not decoding succeeded.
///
/// A payload that cannot be decoded is not stored, but the failure is written to stderr so
/// that a test timing out with "no trace received" can be told apart from one where data
/// arrived in an unexpected format.
async fn backend_traces_handler(State(state): State<BackendState>, bytes: Bytes) -> axum::Json<()> {
    match ExportTraceServiceRequest::decode(&*bytes) {
        Ok(report) => {
            state.reports.lock().await.push(report);
        }
        Err(err) => {
            eprintln!(
                "[backend] failed to decode OTLP trace payload ({} bytes): {err}",
                bytes.len()
            );
        }
    }
    axum::Json(())
}
/// Axum state for the forwarding proxy.
#[derive(Clone)]
struct ProxyState {
// Target URL of every request the proxy handled, recorded before forwarding.
intercepted_uris: Arc<Mutex<Vec<String>>>,
// HTTP client used to forward requests to their target.
client: reqwest::Client,
}
/// Fallback handler implementing a minimal forwarding HTTP proxy.
///
/// Steps:
/// 1. Work out the target URL: an absolute-form request URI (one that carries a scheme) is
///    used as-is; otherwise the URL is rebuilt from the `Host` header plus the request's
///    path *and query string*.
/// 2. Record the target URL in `state.intercepted_uris`.
/// 3. Re-issue the request with `state.client`, copying method, body and every header
///    except `Host`.
/// 4. Relay only the upstream status code; upstream response headers and body are dropped.
///
/// Failure handling:
/// - If the incoming body cannot be read, nothing is forwarded and `400 Bad Request` is
///   returned (forwarding an empty body instead would look like a valid, empty payload to
///   the upstream).
/// - If the upstream request fails, `502 Bad Gateway` is returned.
async fn proxy_forward_handler(
    State(state): State<ProxyState>,
    req: axum::extract::Request,
) -> impl axum::response::IntoResponse {
    let target_url = if req.uri().scheme().is_some() {
        req.uri().to_string()
    } else {
        // A missing or non-UTF-8 Host header yields an empty host; the resulting URL is
        // rejected by the client below and surfaces as 502.
        let host = req
            .headers()
            .get(http::header::HOST)
            .and_then(|v| v.to_str().ok())
            .unwrap_or("");
        // Keep the query string: `path()` alone would silently drop it.
        let path_and_query = req
            .uri()
            .path_and_query()
            .map_or_else(|| req.uri().path(), |pq| pq.as_str());
        format!("http://{host}{path_and_query}")
    };
    state.intercepted_uris.lock().await.push(target_url.clone());

    let (parts, body) = req.into_parts();
    let body_bytes = match body.collect().await {
        Ok(collected) => collected.to_bytes(),
        Err(err) => {
            eprintln!("[proxy] failed to read request body: {err}");
            return (http::StatusCode::BAD_REQUEST, axum::body::Body::empty());
        }
    };

    let mut req_builder = state.client.request(parts.method, &target_url);
    for (name, value) in &parts.headers {
        // Host is derived by the client from the target URL.
        if name != http::header::HOST {
            req_builder = req_builder.header(name, value);
        }
    }

    match req_builder.body(body_bytes).send().await {
        Ok(resp) => (resp.status(), axum::body::Body::empty()),
        Err(err) => {
            eprintln!("[proxy] forward error: {err}");
            (http::StatusCode::BAD_GATEWAY, axum::body::Body::empty())
        }
    }
}
async fn setup(
reports: Arc<Mutex<Vec<ExportTraceServiceRequest>>>,
intercepted_uris: Arc<Mutex<Vec<String>>>,
) -> (JoinHandle<()>, JoinHandle<()>, BoxCloneService) {
let backend_listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
let backend_addr = backend_listener.local_addr().unwrap();
let backend_state = BackendState {
reports: reports.clone(),
};
let backend_app = Router::new()
.route("/", post(|| async { axum::Json(()) }))
.merge(
Router::new()
.route("/v1/traces", post(backend_traces_handler))
.layer(RequestDecompressionLayer::new())
.with_state(backend_state),
);
let backend_task = ROUTER_SERVICE_RUNTIME.spawn(async move {
axum::serve(backend_listener, backend_app)
.await
.expect("backend server failed")
});
let proxy_listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
let proxy_addr = proxy_listener.local_addr().unwrap();
let no_proxy_client = reqwest::Client::builder()
.no_proxy()
.build()
.expect("failed to build no-proxy reqwest client");
let proxy_state = ProxyState {
intercepted_uris: intercepted_uris.clone(),
client: no_proxy_client,
};
let proxy_app = Router::new()
.fallback(proxy_forward_handler)
.with_state(proxy_state);
let proxy_task = ROUTER_SERVICE_RUNTIME.spawn(async move {
axum::serve(proxy_listener, proxy_app)
.await
.expect("proxy server failed")
});
*apollo_router::_private::APOLLO_KEY.lock() = Some("test".to_string());
*apollo_router::_private::APOLLO_GRAPH_REF.lock() = Some("test".to_string());
let mut config: serde_json::Value =
serde_yaml::from_str(include_str!("fixtures/reports/apollo_reports.router.yaml"))
.expect("apollo_reports.router.yaml was invalid");
config = jsonpath_lib::replace_with(config, "$.telemetry.apollo.endpoint", &mut |_| {
Some(serde_json::Value::String(format!("http://{backend_addr}")))
})
.unwrap();
config = jsonpath_lib::replace_with(
config,
"$.telemetry.apollo.experimental_otlp_endpoint",
&mut |_| Some(serde_json::Value::String(format!("http://{backend_addr}"))),
)
.unwrap();
config = jsonpath_lib::replace_with(
config,
"$.telemetry.apollo.otlp_tracing_sampler",
&mut |_| Some(serde_json::Value::String("always_on".to_string())),
)
.unwrap();
config = jsonpath_lib::replace_with(
config,
"$.telemetry.apollo.experimental_otlp_tracing_protocol",
&mut |_| Some(serde_json::Value::String("http".to_string())),
)
.unwrap();
#[allow(unused_unsafe)]
unsafe {
std::env::remove_var("HTTP_PROXY");
std::env::remove_var("http_proxy");
std::env::remove_var("HTTPS_PROXY");
std::env::remove_var("https_proxy");
std::env::remove_var("NO_PROXY");
std::env::remove_var("no_proxy");
std::env::set_var("HTTP_PROXY", format!("http://{proxy_addr}"));
}
let router_service = TestHarness::builder()
.try_log_level("INFO")
.configuration_json(config)
.expect("test harness had config errors")
.schema(include_str!("fixtures/supergraph.graphql"))
.subgraph_hook(|subgraph, _service| tracing_common::subgraph_mocks(subgraph))
.build_router()
.await
.expect("could not create router test harness");
(backend_task, proxy_task, router_service)
}
/// End-to-end check that OTLP/HTTP trace exports are sent through the proxy configured via
/// `HTTP_PROXY` and arrive at the backend.
///
/// Flow: start backend, proxy and router via `setup`; run one GraphQL query; poll for up to
/// ten seconds until the backend has recorded a report; clean up the environment and server
/// tasks; then assert that the proxy saw a `/v1/traces` request and that the first recorded
/// report contains resource spans.
#[tokio::test(flavor = "multi_thread")]
async fn test_otlp_http_traces_through_proxy() {
    let reports: Arc<Mutex<Vec<ExportTraceServiceRequest>>> = Arc::new(Mutex::new(Vec::new()));
    let intercepted_uris: Arc<Mutex<Vec<String>>> = Arc::new(Mutex::new(Vec::new()));
    let (backend_task, proxy_task, mut service) =
        setup(Arc::clone(&reports), Arc::clone(&intercepted_uris)).await;

    // Drive a single query through the router and drain the response body.
    let request = supergraph::Request::fake_builder()
        .query("query { topProducts { name reviews { author { name } } } }")
        .build()
        .unwrap();
    let req: router::Request = request.try_into().expect("could not convert request");
    let response = service
        .ready()
        .await
        .expect("router was never ready")
        .call(req)
        .await
        .expect("router call failed");
    let _ = response.response.into_body().collect().await;

    // Poll every 50 ms, for at most 10 s, until the backend has stored a report.
    let polling_started = std::time::Instant::now();
    let polling_budget = Duration::from_secs(10);
    let trace_received = loop {
        if polling_started.elapsed() >= polling_budget {
            break false;
        }
        if !reports.lock().await.is_empty() {
            break true;
        }
        tokio::time::sleep(Duration::from_millis(50)).await;
    };

    // Clean up before asserting so a failed assertion does not leave the proxy settings or
    // the server tasks behind.
    #[allow(unused_unsafe)]
    unsafe {
        for name in [
            "HTTP_PROXY",
            "http_proxy",
            "HTTPS_PROXY",
            "https_proxy",
            "NO_PROXY",
            "no_proxy",
        ] {
            std::env::remove_var(name);
        }
    }
    backend_task.abort();
    proxy_task.abort();

    let uris = intercepted_uris.lock().await;
    let saw_traces_request = uris.iter().any(|uri| uri.contains("/v1/traces"));
    assert!(
        saw_traces_request,
        "Expected proxy to intercept a /v1/traces request; intercepted URIs: {uris:?}"
    );
    assert!(
        trace_received,
        "Backend should have received OTLP trace data through the proxy, but none arrived"
    );

    let backend_reports = reports.lock().await;
    let first_report = backend_reports
        .first()
        .expect("expected at least one trace report");
    assert!(
        !first_report.resource_spans.is_empty(),
        "Received trace report contains no resource spans"
    );

    println!("[proxy] intercepted {} request(s): {:?}", uris.len(), *uris);
    println!(
        "[backend] received {} trace report(s); first report has {} resource span(s)",
        backend_reports.len(),
        first_report.resource_spans.len()
    );
}