apollo-router 2.15.0

A configurable, high-performance routing runtime for Apollo Federation 🚀
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
//! Validates that Apollo OTLP HTTP telemetry respects standard HTTP proxy environment variables.
//!
//! `HTTP_PROXY` and `HTTPS_PROXY` are read by `hyper-util`'s proxy matcher
//! unconditionally at `reqwest::Client` creation time — which happens when the
//! router initialises its OTLP exporter on start-up.  This behaviour was
//! introduced in apollographql/router#9055 and this test serves as a regression
//! guard for the traces HTTP export path.
//!
//! The test:
//!
//! 1. Starts a mock OTLP backend that records received trace reports.
//! 2. Starts a simple in-process HTTP forward proxy that records intercepted
//!    request URIs and forwards them to the backend.
//! 3. Sets `HTTP_PROXY` to point at the in-process proxy *before* building the
//!    router, so the reqwest client picks it up at initialisation time.
//! 4. Sends a GraphQL query through the router and waits for the OTLP batch to
//!    flush.
//! 5. Asserts that the proxy intercepted a `/v1/traces` request *and* that the
//!    backend decoded a valid `ExportTraceServiceRequest` with resource spans.
//!
//! # Why there is no metrics proxy test
//!
//! The Apollo OTLP metrics pipeline uses a `PeriodicReader` that only calls
//! `export()` when `scope_metrics` is non-empty.  The `ApolloRealtime` meter
//! provider only accepts instruments matching
//! `apollo.router.operations.(error|fetch.duration)`, which are recorded by the
//! real HTTP subgraph fetch layer — bypassed by `TestHarness` mock subgraphs.
//! An equivalent metrics proxy test requires the `IntegrationTest` framework
//! (which starts a real router process) rather than `TestHarness`.  The same
//! `reqwest::Client` proxy mechanism that this test validates for traces applies
//! identically to the metrics exporter.

use std::sync::Arc;
use std::time::Duration;

use apollo_router::TestHarness;
use apollo_router::services::router;
use apollo_router::services::router::BoxCloneService;
use apollo_router::services::supergraph;
use axum::Router;
use axum::extract::State;
use axum::routing::post;
use bytes::Bytes;
use http_body_util::BodyExt as _;
use once_cell::sync::Lazy;
use opentelemetry_proto::tonic::collector::trace::v1::ExportTraceServiceRequest;
use prost::Message;
use tokio::sync::Mutex;
use tokio::task::JoinHandle;
use tower::Service;
use tower::ServiceExt;
use tower_http::decompression::RequestDecompressionLayer;

mod tracing_common;

// `tracing_common` pulls in the protobuf-generated `reports.rs` via
// `tonic::include_proto!("reports")`. The generated types carry serde attributes
// that name `crate::plugins::telemetry::apollo_exporter::serialize_timestamp`,
// so this test crate has to supply a function at exactly that path.
pub(crate) mod plugins {
    pub(crate) mod telemetry {
        pub(crate) mod apollo_exporter {
            /// Serde `serialize_with` hook for optional protobuf timestamps.
            ///
            /// A present timestamp is written as a two-field `Timestamp` struct
            /// (`seconds`, `nanos`); an absent one is written as `none`.
            ///
            /// # Errors
            ///
            /// Propagates any error produced by `serializer`.
            pub(crate) fn serialize_timestamp<S>(
                timestamp: &Option<prost_types::Timestamp>,
                serializer: S,
            ) -> Result<S::Ok, S::Error>
            where
                S: serde::Serializer,
            {
                use serde::ser::SerializeStruct as _;

                let Some(ts) = timestamp else {
                    return serializer.serialize_none();
                };

                let mut fields = serializer.serialize_struct("Timestamp", 2)?;
                fields.serialize_field("seconds", &ts.seconds)?;
                fields.serialize_field("nanos", &ts.nanos)?;
                fields.end()
            }
        }
    }
}

/// Process-wide tokio runtime, created on first use, on which `setup` spawns the
/// mock OTLP backend and the forward-proxy server tasks.
static ROUTER_SERVICE_RUNTIME: Lazy<Arc<tokio::runtime::Runtime>> = Lazy::new(|| {
    let runtime =
        tokio::runtime::Runtime::new().expect("must be able to create tokio runtime");
    Arc::new(runtime)
});
// All tests in this file must run serially across this binary AND across the
// sibling `apollo_reports` / `apollo_otel_traces` binaries, because each
// installs a process-wide tracing subscriber and mutates process-wide
// environment variables (e.g. HTTP_PROXY) that the other binaries read.
// Serialization is enforced by the `serial-apollo-telemetry-integration`
// nextest test-group in `.config/nextest.toml` (an in-source mutex cannot do
// this because each `tests/*.rs` is a separate binary).
// DO NOT run this test with bare `cargo test` -- only `cargo nextest` honours
// the group; bare `cargo test` will race the global state.

// ---------------------------------------------------------------------------
// Mock OTLP backend
// ---------------------------------------------------------------------------

/// Shared state for the mock OTLP backend.
#[derive(Clone)]
struct BackendState {
    /// Every `ExportTraceServiceRequest` the backend decoded successfully, in the
    /// order the handler pushed them.
    reports: Arc<Mutex<Vec<ExportTraceServiceRequest>>>,
}

/// Decodes a protobuf `ExportTraceServiceRequest` and appends it to the shared
/// report list.
///
/// `RequestDecompressionLayer` on the `/v1/traces` route handles gzip
/// decompression before this handler is called. A payload that fails to decode
/// is logged to stderr rather than silently dropped. Without the log, a broken
/// payload would surface only as the test's generic "no trace data arrived"
/// failure. The handler still answers with a JSON `null` body either way, so the
/// exporter's behaviour is unchanged.
async fn backend_traces_handler(State(state): State<BackendState>, bytes: Bytes) -> axum::Json<()> {
    match ExportTraceServiceRequest::decode(&*bytes) {
        Ok(report) => state.reports.lock().await.push(report),
        Err(err) => eprintln!(
            "[backend] failed to decode ExportTraceServiceRequest ({} bytes): {err}",
            bytes.len()
        ),
    }
    axum::Json(())
}

// ---------------------------------------------------------------------------
// In-process HTTP forward proxy
// ---------------------------------------------------------------------------

/// Shared state for the in-process HTTP forward proxy.
#[derive(Clone)]
struct ProxyState {
    /// URIs of requests the proxy has intercepted (absolute form, e.g.
    /// `http://127.0.0.1:PORT/v1/traces`).
    intercepted_uris: Arc<Mutex<Vec<String>>>,
    /// reqwest client configured with `.no_proxy()` so the forwarded request
    /// does NOT loop back through the proxy.
    client: reqwest::Client,
}

/// Returns `true` for hop-by-hop headers.
///
/// These describe only the client→proxy connection, so a proxy must not relay
/// them upstream (RFC 2616 §13.5.1, RFC 9110 §7.6.1). The legacy
/// `Proxy-Connection` and `Keep-Alive` headers are included.
fn is_hop_by_hop_header(name: &http::header::HeaderName) -> bool {
    matches!(
        name.as_str(),
        "connection"
            | "proxy-connection"
            | "keep-alive"
            | "proxy-authenticate"
            | "proxy-authorization"
            | "te"
            | "trailer"
            | "transfer-encoding"
            | "upgrade"
    )
}

/// Records the target URI of each request, then forwards the request upstream.
///
/// When reqwest sends a request through an HTTP proxy it uses the *absolute-form*
/// URI in the request line:
///
/// ```text
/// POST http://127.0.0.1:PORT/v1/traces HTTP/1.1
/// Host: 127.0.0.1:PORT
/// Content-Type: application/x-protobuf
/// Content-Encoding: gzip
/// x-api-key: test
/// …
/// ```
///
/// Hyper (which axum builds on) preserves the absolute URI, so the request URI
/// contains the full target URL. When the URI is relative (e.g. a direct
/// connection rather than a proxy connection), the target is rebuilt from the
/// `Host` header plus the path *and query*.
///
/// The request is forwarded with its original method, body bytes (still
/// content-encoded), and headers, except `Host` and hop-by-hop headers. Only the
/// upstream status code is relayed back; upstream response headers and body are
/// discarded. An unreadable request body yields `400 Bad Request`, and a failed
/// upstream send yields `502 Bad Gateway`.
async fn proxy_forward_handler(
    State(state): State<ProxyState>,
    req: axum::extract::Request,
) -> impl axum::response::IntoResponse {
    // Split once so method/headers can be moved instead of cloned.
    let (parts, body) = req.into_parts();

    let target_url = if parts.uri.scheme().is_some() {
        parts.uri.to_string()
    } else {
        let host = parts
            .headers
            .get(http::header::HOST)
            .and_then(|v| v.to_str().ok())
            .unwrap_or("");
        // `path_and_query` keeps any query string; `path()` alone would drop it.
        let path_and_query = parts
            .uri
            .path_and_query()
            .map(|pq| pq.as_str())
            .unwrap_or("/");
        format!("http://{host}{path_and_query}")
    };

    state.intercepted_uris.lock().await.push(target_url.clone());

    let body_bytes = match body.collect().await {
        Ok(collected) => collected.to_bytes(),
        Err(err) => {
            // Forwarding an empty body here would hide the failure behind a
            // confusing upstream decode error.
            eprintln!("[proxy] failed to read request body: {err}");
            return (http::StatusCode::BAD_REQUEST, axum::body::Body::empty());
        }
    };

    let mut req_builder = state.client.request(parts.method, &target_url);
    for (name, value) in &parts.headers {
        // reqwest derives `Host` from the target URL and sets its own framing,
        // so `Host` and connection-scoped headers are not copied across.
        if name != http::header::HOST && !is_hop_by_hop_header(name) {
            req_builder = req_builder.header(name, value);
        }
    }

    match req_builder.body(body_bytes).send().await {
        Ok(resp) => (resp.status(), axum::body::Body::empty()),
        Err(err) => {
            eprintln!("[proxy] forward error: {err}");
            (http::StatusCode::BAD_GATEWAY, axum::body::Body::empty())
        }
    }
}

// ---------------------------------------------------------------------------
// Test setup
// ---------------------------------------------------------------------------

async fn setup(
    reports: Arc<Mutex<Vec<ExportTraceServiceRequest>>>,
    intercepted_uris: Arc<Mutex<Vec<String>>>,
) -> (JoinHandle<()>, JoinHandle<()>, BoxCloneService) {
    // 1. Start the mock OTLP backend.
    //    /v1/traces: OTLP HTTP path — decompresses gzip, decodes protobuf.
    //    /         : legacy Apollo reporter path — intentionally ignored here.
    let backend_listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
    let backend_addr = backend_listener.local_addr().unwrap();
    let backend_state = BackendState {
        reports: reports.clone(),
    };
    let backend_app = Router::new()
        .route("/", post(|| async { axum::Json(()) }))
        .merge(
            Router::new()
                .route("/v1/traces", post(backend_traces_handler))
                .layer(RequestDecompressionLayer::new())
                .with_state(backend_state),
        );
    let backend_task = ROUTER_SERVICE_RUNTIME.spawn(async move {
        axum::serve(backend_listener, backend_app)
            .await
            .expect("backend server failed")
    });

    // 2. Start the HTTP forward proxy.
    let proxy_listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
    let proxy_addr = proxy_listener.local_addr().unwrap();
    let no_proxy_client = reqwest::Client::builder()
        .no_proxy()
        .build()
        .expect("failed to build no-proxy reqwest client");
    let proxy_state = ProxyState {
        intercepted_uris: intercepted_uris.clone(),
        client: no_proxy_client,
    };
    let proxy_app = Router::new()
        .fallback(proxy_forward_handler)
        .with_state(proxy_state);
    let proxy_task = ROUTER_SERVICE_RUNTIME.spawn(async move {
        axum::serve(proxy_listener, proxy_app)
            .await
            .expect("proxy server failed")
    });

    // 3. Configure the router to send OTLP traces via HTTP.
    *apollo_router::_private::APOLLO_KEY.lock() = Some("test".to_string());
    *apollo_router::_private::APOLLO_GRAPH_REF.lock() = Some("test".to_string());

    let mut config: serde_json::Value =
        serde_yaml::from_str(include_str!("fixtures/reports/apollo_reports.router.yaml"))
            .expect("apollo_reports.router.yaml was invalid");

    config = jsonpath_lib::replace_with(config, "$.telemetry.apollo.endpoint", &mut |_| {
        Some(serde_json::Value::String(format!("http://{backend_addr}")))
    })
    .unwrap();
    config = jsonpath_lib::replace_with(
        config,
        "$.telemetry.apollo.experimental_otlp_endpoint",
        &mut |_| Some(serde_json::Value::String(format!("http://{backend_addr}"))),
    )
    .unwrap();
    config = jsonpath_lib::replace_with(
        config,
        "$.telemetry.apollo.otlp_tracing_sampler",
        &mut |_| Some(serde_json::Value::String("always_on".to_string())),
    )
    .unwrap();
    config = jsonpath_lib::replace_with(
        config,
        "$.telemetry.apollo.experimental_otlp_tracing_protocol",
        &mut |_| Some(serde_json::Value::String("http".to_string())),
    )
    .unwrap();

    // 4. Route OTLP HTTP traffic through the in-process proxy.
    //
    //    hyper-util reads HTTP_PROXY at client creation time (when the router
    //    initialises its OTLP exporters).  The env var must therefore be set
    //    before build_router() is called.
    //
    //    Safety: the TEST mutex guarantees at most one test in this file runs
    //    at a time, so no concurrent code reads or writes these variables.
    //
    //    We establish a clean proxy environment before setting our own values.
    //    Two classes of interference are possible:
    //
    //    * A stale HTTP_PROXY / HTTPS_PROXY would cause traffic to hit a
    //      different proxy instead of ours.
    //    * A NO_PROXY entry matching 127.0.0.1 would bypass our proxy entirely,
    //      causing intercepted_uris to remain empty.
    //
    //    The second case is the CI-relevant one: CircleCI injects
    //    `NO_PROXY=127.0.0.1,localhost,circleci-internal-outer-build-agent`
    //    into every Linux container.  Because our backend and proxy both bind
    //    to 127.0.0.1, that NO_PROXY entry silently bypasses the proxy on Linux
    //    CI while macOS / Windows runners pass (they have no such injection).
    //    Clearing all six variables before calling build_router() removes the
    //    interference regardless of what the host or CI environment has set.
    #[allow(unused_unsafe)]
    unsafe {
        std::env::remove_var("HTTP_PROXY");
        std::env::remove_var("http_proxy");
        std::env::remove_var("HTTPS_PROXY");
        std::env::remove_var("https_proxy");
        std::env::remove_var("NO_PROXY");
        std::env::remove_var("no_proxy");
        std::env::set_var("HTTP_PROXY", format!("http://{proxy_addr}"));
    }

    let router_service = TestHarness::builder()
        .try_log_level("INFO")
        .configuration_json(config)
        .expect("test harness had config errors")
        .schema(include_str!("fixtures/supergraph.graphql"))
        .subgraph_hook(|subgraph, _service| tracing_common::subgraph_mocks(subgraph))
        .build_router()
        .await
        .expect("could not create router test harness");

    (backend_task, proxy_task, router_service)
}

// ---------------------------------------------------------------------------
// Test
// ---------------------------------------------------------------------------

/// Verifies that OTLP HTTP traces flow through an HTTP proxy without data loss.
///
/// Asserts:
/// - The proxy intercepted at least one request to `/v1/traces`.
/// - The backend decoded a valid `ExportTraceServiceRequest` with resource spans.
#[tokio::test(flavor = "multi_thread")]
async fn test_otlp_http_traces_through_proxy() {
    let reports: Arc<Mutex<Vec<ExportTraceServiceRequest>>> = Arc::new(Mutex::new(vec![]));
    let intercepted_uris: Arc<Mutex<Vec<String>>> = Arc::new(Mutex::new(vec![]));

    let (backend_task, proxy_task, mut service) =
        setup(reports.clone(), intercepted_uris.clone()).await;

    // Send a simple query that produces a traceable supergraph span.
    let request = supergraph::Request::fake_builder()
        .query("query { topProducts { name reviews { author { name } } } }")
        .build()
        .unwrap();
    let req: router::Request = request.try_into().expect("could not convert request");

    let response = service
        .ready()
        .await
        .expect("router was never ready")
        .call(req)
        .await
        .expect("router call failed");

    // Drain the response body so the router can proceed with span export.
    let _ = response.response.into_body().collect().await;

    // Poll until the OTLP batch has been flushed through the proxy to the
    // backend. The OTLP integration tests use a 10 s deadline-bounded poll
    // (see `verifier.rs::validate_metrics`); apply the same window here
    // because the proxy adds an extra hop and the OTLP batch flush itself
    // can be delayed under load. The mutex guard in the `while` condition is
    // dropped before each sleep.
    let trace_received = tokio::time::timeout(Duration::from_secs(10), async {
        while reports.lock().await.is_empty() {
            tokio::time::sleep(Duration::from_millis(50)).await;
        }
    })
    .await
    .is_ok();

    // Clean up env and background tasks before asserting, so any failure message
    // does not leave environment pollution or zombie tasks behind.
    //
    // `#[allow(unused_unsafe)]`: `remove_var` is `unsafe` only from the 2024
    // edition on; the allow keeps this compiling warning-free under either
    // edition.
    #[allow(unused_unsafe)]
    // SAFETY: mutating the environment is unsound only if another thread
    // accesses it concurrently. Cross-test serialization relies on the
    // `serial-apollo-telemetry-integration` nextest test-group, not on an
    // in-source mutex. NOTE(review): the router built in `setup` is still alive
    // here; this assumes none of its background tasks read env vars.
    unsafe {
        for var in [
            "HTTP_PROXY",
            "http_proxy",
            "HTTPS_PROXY",
            "https_proxy",
            "NO_PROXY",
            "no_proxy",
        ] {
            std::env::remove_var(var);
        }
    }
    backend_task.abort();
    proxy_task.abort();

    // --- Assertion 1: proxy intercepted OTLP trace traffic ----------------
    let uris = intercepted_uris.lock().await;
    assert!(
        uris.iter().any(|u| u.contains("/v1/traces")),
        "Expected proxy to intercept a /v1/traces request; intercepted URIs: {uris:?}"
    );

    // --- Assertion 2: backend received valid OTLP trace data --------------
    assert!(
        trace_received,
        "Backend should have received OTLP trace data through the proxy, but none arrived"
    );
    let backend_reports = reports.lock().await;
    let first_report = backend_reports
        .first()
        .expect("expected at least one trace report");
    assert!(
        !first_report.resource_spans.is_empty(),
        "Received trace report contains no resource spans"
    );

    println!("[proxy] intercepted {} request(s): {:?}", uris.len(), *uris);
    println!(
        "[backend] received {} trace report(s); first report has {} resource span(s)",
        backend_reports.len(),
        first_report.resource_spans.len()
    );
}