use std::sync::Arc;
use std::task::Poll;
use bytes::BufMut;
use bytes::Bytes;
use bytes::BytesMut;
use futures::FutureExt as _;
use futures::future::BoxFuture;
use futures::future::join_all;
use http::Method;
use http::StatusCode;
use http::header;
use http_body::Body as _;
use tower::BoxError;
use tower::Service;
use tower::ServiceExt as _;
use tracing::Instrument as _;
// FIXME(@goto-bus-stop): Ideally the batching layer shouldn't have to care about this
use crate::Context;
use crate::batching::Batch;
use crate::batching::BatchQuery;
use crate::configuration::Batching;
use crate::configuration::BatchingMode;
use crate::graphql;
use crate::services::router;
use crate::services::router::ClientRequestAccepts;
use crate::services::router::Request as RouterRequest;
use crate::services::router::Response as RouterResponse;
// FIXME(@goto-bus-stop): This is a copy of router::service::TranslateError. Probably should have
// its own error type.
#[derive(Clone)]
struct TranslateError {
status: StatusCode,
extension_code: String,
extension_details: String,
}
/// When the batching layer receives a batch query (a POST request with a JSON array in the body),
/// it splits the batch into multiple requests that flow separately through the rest of the
/// pipeline, and reassembles the responses into a single JSON array response.
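///
/// For example (illustrative payloads only), a client batch like
///
/// ```json
/// [{"query": "{ me { id } }"}, {"query": "{ topProducts { name } }"}]
/// ```
///
/// becomes two separate router requests, and their responses are reassembled into
///
/// ```json
/// [{"data": {"me": ...}}, {"data": {"topProducts": ...}}]
/// ```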
pub(super) struct BatchingLayer {
config: Batching,
}
impl BatchingLayer {
pub(super) fn new(config: Batching) -> Self {
Self { config }
}
}
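// A minimal sketch of how this layer is stacked onto an inner router service (hypothetical
// wiring; the real pipeline is assembled elsewhere in the router):
//
//     let service = tower::ServiceBuilder::new()
//         .layer(BatchingLayer::new(config))
//         .service(inner_router_service);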
impl<S> tower::Layer<S> for BatchingLayer {
type Service = BatchingService<S>;
fn layer(&self, inner: S) -> Self::Service {
BatchingService {
inner,
config: self.config.clone(),
}
}
}
#[derive(Clone)]
pub(super) struct BatchingService<S> {
inner: S,
config: Batching,
}
impl<S> Service<RouterRequest> for BatchingService<S>
where
S: Service<RouterRequest, Response = RouterResponse, Error = BoxError> + Clone + Send + 'static,
S::Future: Send + 'static,
{
type Response = RouterResponse;
type Error = BoxError;
type Future = BoxFuture<'static, Result<Self::Response, Self::Error>>;
fn poll_ready(
&mut self,
cx: &mut std::task::Context<'_>,
) -> Poll<std::result::Result<(), Self::Error>> {
self.inner.poll_ready(cx)
}
fn call(&mut self, req: RouterRequest) -> Self::Future {
// Batching is not supported for GET requests
if req.router_request.method() == Method::GET {
return self.inner.call(req).boxed();
}
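// Standard tower pattern: `self` must stay usable, so leave a fresh (not-yet-readied)
// clone in `self` and move the service that was readied in `poll_ready` into the future.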
let service = self.clone();
let mut service = std::mem::replace(self, service);
Box::pin(async move {
let context = req.context;
let (parts, body) = req.router_request.into_parts();
// In the future, we should have a `HttpToBytesLayer`, but until then, we need to read
// the body here first and wrap it back up after.
// We only want to record the "receive_body" span if we haven't received the body yet,
// to prevent having multiple of those spans.
let is_fixed_size = body.size_hint().exact().is_some();
let bytes = if is_fixed_size {
router::body::into_bytes(body).await?
} else {
router::body::into_bytes(body)
.instrument(tracing::debug_span!("receive_body"))
.await?
};
let batch = match service.parse_batch_request(&bytes) {
Ok(None) => {
// Not a batch request. Reassemble the request and pass it on.
let body = router::body::from_bytes(bytes);
return service
.inner
.call(RouterRequest {
context,
router_request: http::Request::from_parts(parts, body),
})
.await;
}
Ok(Some(batch)) => batch,
Err(err) => {
return router::Response::error_builder()
.error(
graphql::Error::builder()
.message(String::from("Invalid GraphQL request"))
.extension_code(err.extension_code)
.extension("details", err.extension_details)
.build(),
)
.status_code(err.status)
.header(header::CONTENT_TYPE, mime::APPLICATION_JSON.essence_str())
.context(context)
.build();
}
};
let requests = match service.build_batch_requests(&context, parts, batch) {
Ok(results) => results,
Err(err) => {
return router::Response::error_builder()
.error(
graphql::Error::builder()
.message(String::from("Invalid GraphQL request"))
.extension_code(err.extension_code)
.extension("details", err.extension_details)
.build(),
)
.status_code(err.status)
.header(header::CONTENT_TYPE, mime::APPLICATION_JSON.essence_str())
.context(context)
.build();
}
};
// We need to handle the case where one request in a batch fails, in which case its
// sibling batch entries must be cancelled. Requests can be cancelled at any point in the
// router pipeline, but all failures bubble back up through here, so we can catch them
// without having to specially handle batch queries in other portions of the codebase.
let futures = requests.into_iter().map(|request| {
let service = service.clone();
async move {
// We clone the context here, because if the request results in an Err, the
// response context will no longer exist.
let context = request.context.clone();
let result = service.call_inner_service(request).await;
// Regardless of the result, we need to make sure that we cancel any potential batch queries. This is because
// custom rust plugins, rhai scripts, and coprocessors can cancel requests at any time and return either a
// GraphQL error wrapped in an `Ok`, or a `BoxError` wrapped in an `Err`.
let batch_query_opt = context
.extensions()
.with_lock(|lock| lock.remove::<BatchQuery>());
if let Some(batch_query) = batch_query_opt {
// Only signal cancellation if the batch_query is not already finished
if !batch_query.finished() {
tracing::debug!("cancelling batch query in supergraph response");
batch_query
.signal_cancelled("request terminated by user".to_string())
.await?;
}
}
result
}
});
// Use join_all to preserve the ordering of the concurrent operations. Collecting into a
// `Result` below propagates the first error in the batch.
// Note: We use `join_all` here since it awaits all futures before returning, which lets us
// run the cancellation logic above without fear of the other futures being dropped early.
let results: Vec<router::Response> = join_all(futures)
.await
.into_iter()
.collect::<Result<Vec<router::Response>, BoxError>>()?;
// If we detected we are processing a batch, return an array of results even if there is only
// one result
let mut results_it = results.into_iter();
let first = results_it
.next()
.expect("we should have at least one response");
let (parts, body) = first.response.into_parts();
let context = first.context;
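// Reassemble the individual response bodies into a single JSON array,
// `[<first>,<second>,...]`, reusing the HTTP parts (status, headers) of the first response.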
let mut bytes = BytesMut::new();
bytes.put_u8(b'[');
bytes.extend_from_slice(&router::body::into_bytes(body).await?);
for result in results_it {
bytes.put_u8(b',');
bytes.extend_from_slice(
&router::body::into_bytes(result.response.into_body()).await?,
);
}
bytes.put_u8(b']');
Ok(RouterResponse {
response: http::Response::from_parts(
parts,
router::body::from_bytes(bytes.freeze()),
),
context,
})
})
}
}
impl<S> BatchingService<S>
where
S: Service<RouterRequest, Response = RouterResponse, Error = BoxError> + Clone,
{
fn parse_batch_request(
&self,
bytes: &Bytes,
) -> Result<Option<Vec<graphql::Request>>, TranslateError> {
// REVIEW NOTE(@goto-bus-stop): Previously, batching first attempted to parse a single
// request, and only attempted to parse a batch if that failed. With batching as a
// separate layer, we can't do that anymore (as parsing is the responsibility of a
// downstream service), so to avoid re-parsing in the unbatched case we need an up-front
// check of whether the body is likely to be a batch.
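// For example, a body of ` [ {"query": "..."} ]` is detected as a batch candidate, while
// `{"query": "..."}` falls through to the single-request path handled downstream.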
let first_non_ws_character = bytes.iter().find(|byte| !byte.is_ascii_whitespace());
if first_non_ws_character != Some(&b'[') {
// Not a batch request
return Ok(None);
}
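// Batching must be opted into via router configuration, along these lines (a sketch;
// see `Batching` in `crate::configuration` for the authoritative shape):
//
//     batching:
//       enabled: true
//       mode: batch_http_link
//       maximum_size: 10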
if self.config.enabled && matches!(self.config.mode, BatchingMode::BatchHttpLink) {
let result = graphql::Request::batch_from_bytes(bytes).map_err(|e| TranslateError {
status: StatusCode::BAD_REQUEST,
extension_code: "INVALID_GRAPHQL_REQUEST".to_string(),
extension_details: format!("failed to deserialize the request body into JSON: {e}"),
})?;
if result.is_empty() {
return Err(TranslateError {
status: StatusCode::BAD_REQUEST,
extension_code: "INVALID_GRAPHQL_REQUEST".to_string(),
extension_details:
"failed to decode a valid GraphQL request from the request body: empty array"
.to_string(),
});
}
if self.config.exceeds_batch_size(&result) {
return Err(TranslateError {
status: StatusCode::UNPROCESSABLE_ENTITY,
extension_code: "BATCH_LIMIT_EXCEEDED".to_string(),
extension_details: format!(
"Batch limits exceeded: you provided a batch with {} entries, but the configured maximum router batch size is {}",
result.len(),
self.config.maximum_size.unwrap_or_default()
),
});
}
Ok(Some(result))
} else {
let extension_details = if self.config.enabled
&& !matches!(self.config.mode, BatchingMode::BatchHttpLink)
{
format!("batching not supported for mode `{}`", self.config.mode)
} else {
"batching not enabled".to_string()
};
Err(TranslateError {
status: StatusCode::BAD_REQUEST,
extension_code: "BATCHING_NOT_ENABLED".to_string(),
extension_details,
})
}
}
/// Turns a parsed batch of queries into multiple separate requests that can flow through the
/// rest of the pipeline.
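///
/// The first request in the returned `Vec` reuses the original `http::request::Parts` and
/// `Context`; every subsequent request gets cloned parts and a fresh `Context` extended
/// from the original.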
fn build_batch_requests(
&self,
context: &Context,
parts: http::request::Parts,
batch: Vec<graphql::Request>,
) -> Result<Vec<RouterRequest>, TranslateError> {
let mut results = Vec::with_capacity(batch.len());
let batch_size = batch.len();
// Modify our Context extensions: since we are processing a batch at this point, insert
// our batching configuration. If subgraph batching configuration exists and is enabled
// for any of our subgraphs, we also create our shared batch details below.
context
.extensions()
.with_lock(|lock| lock.insert(self.config.clone()));
let shared_batch_details = self
.config
.subgraph
.as_ref()
.map(|subgraph_batching_config| {
subgraph_batching_config.all.enabled
|| subgraph_batching_config
.subgraphs
.values()
.any(|v| v.enabled)
})
// `bool::then` (rather than the eagerly-evaluated `then_some`) keeps `spawn_handler`
// lazy, so no handler is spawned when subgraph batching is configured but disabled everywhere.
.and_then(|enabled| enabled.then(|| Arc::new(Batch::spawn_handler(batch_size))));
let mut requests_it = batch.into_iter();
let first = requests_it
.next()
.expect("we should have at least one request");
// Building up the batch of router requests is tricky.
// Firstly note that any http extensions are only propagated for the first request sent
// through the pipeline. This is because there is simply no way to clone http
// extensions.
//
// Secondly, we can't clone extensions, but we need to propagate at least
// ClientRequestAccepts to ensure correct processing of the response. We do that manually,
// but the concern is that there may be other extensions that wish to propagate into
// each request or we may add them in future and not know about it here...
//
// (Technically we could clone extensions, since it is held under an `Arc`, but that
// would mean all the requests in a batch shared the same set of extensions and review
// comments expressed the sentiment that this may be a bad thing...)
//
// Note: If we enter this loop, then we must be processing a batch.
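// Indexes here are offset by one: the first request in the batch is handled after this
// loop and takes index 0.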
for (index, graphql_request) in requests_it.enumerate() {
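// Serializing a request we just deserialized should be infallible, hence the `unwrap`.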
let body = router::body::from_bytes(serde_json::to_vec(&graphql_request).unwrap());
let new = http::Request::from_parts(parts.clone(), body);
// XXX We lose any private context entries here; is that ok?
let new_context = Context::new();
new_context.extend(context);
let client_request_accepts_opt = context
.extensions()
.with_lock(|lock| lock.get::<ClientRequestAccepts>().cloned());
// We are only going to insert a BatchQuery if Subgraph processing is enabled
let b_for_index_opt = if let Some(shared_batch_details) = &shared_batch_details {
Some(
Batch::query_for_index(shared_batch_details.clone(), index + 1).map_err(
|err| TranslateError {
status: StatusCode::INTERNAL_SERVER_ERROR,
extension_code: "BATCHING_ERROR".to_string(),
extension_details: format!("failed to create batch entry: {err}"),
},
)?,
)
} else {
None
};
new_context.extensions().with_lock(|lock| {
if let Some(client_request_accepts) = client_request_accepts_opt {
lock.insert(client_request_accepts);
}
lock.insert(self.config.clone());
// We are only going to insert a BatchQuery if Subgraph processing is enabled
if let Some(b_for_index) = b_for_index_opt {
lock.insert(b_for_index);
}
});
results.push(RouterRequest {
router_request: new,
context: new_context,
});
}
if let Some(shared_batch_details) = shared_batch_details {
let b_for_index =
Batch::query_for_index(shared_batch_details, 0).map_err(|err| TranslateError {
status: StatusCode::INTERNAL_SERVER_ERROR,
extension_code: "BATCHING_ERROR".to_string(),
extension_details: format!("failed to create batch entry: {err}"),
})?;
context
.extensions()
.with_lock(|lock| lock.insert(b_for_index));
}
let body = router::body::from_bytes(serde_json::to_vec(&first).unwrap());
results.insert(
0,
RouterRequest {
router_request: http::Request::from_parts(parts, body),
context: context.clone(),
},
);
Ok(results)
}
async fn call_inner_service(self, request: RouterRequest) -> Result<RouterResponse, BoxError> {
// self.inner here is a clone of the service that was readied
// in `poll_ready`. Clones are unready by default, so this
// self.inner is actually not ready, which is why we need to
// oneshot it here. That technically breaks backpressure, but because we are
// still readying the inner service before calling into the batching
// service, backpressure is actually still exerted at that point--there's
// just potential for some requests to slip through the cracks and end up
// queueing up at this .oneshot() call.
//
// Not ideal, but an improvement on the situation in Router 1.x.
self.inner.oneshot(request).await
}
}