apollo_router/layers/unconstrained_buffer.rs
1//! A wrapper around [`Buffer`] that runs [`poll_ready`] inside
2//! [`unconstrained`], preventing the cooperative budget from causing
3//! a [`Poll::Pending`] yield when the inner semaphore still has capacity.
4//!
5//! Without this, a [`Buffer`] that sits behind a [`LoadShed`]
6//! layer can be falsely shed: the Tokio coop budget reaches zero, [`poll_proceed`]
7//! returns [`Pending`], and [`LoadShed`] interprets that as the service not being
8//! ready, immediately returning an [`Overloaded`] error.
9//!
10//! By polling the inner [`Buffer`] in an unconstrained context, the coop budget
11//! check is bypassed and readiness is determined solely by the actual semaphore
12//! permit availability.
13//!
14//! ## Cases where this matters
15//!
16//! This only matters when a [`Buffer`] is behind a [`LoadShed`], and the problem
17//! is amplified when there's another [`Buffer`] in front of that [`LoadShed`],
18//! building a structure like: `Buffer(LoadShed(Buffer(service)))`.
19//!
20//! ### Amplification
21//!
22//! The amplification happens because the `Worker` loop of the outer `Buffer` picks up a message and
23//! then calls [`LoadShed::poll_ready`], which always returns [`Ready`] and never
24//! attempts to yield to the scheduler.
25//!
26//! During the [`LoadShed::poll_ready`] call, [`Buffer::poll_ready`] is called on the inner
27//! `Buffer`, which is where the semaphore permit availability is checked.
28//! However, before checking the semaphore, it will call [`poll_proceed`] to check coop budget
29//! availability. If the coop budget is exhausted, [`poll_proceed`] will return [`Pending`],
30//! which will "bubble up" to the [`LoadShed`] layer. This layer stores readiness as `false`
31//! but still returns [`Ready`] to the outer `Buffer` `Worker`.
32//!
//! This means that the `Worker` keeps looping and consuming coop budget until it hits the
//! coop budget check inside `poll_next_msg`, which returns [`Pending`]. Since the `Worker` is
//! the top-level future of its task, nothing absorbs this [`Pending`], so the `Worker` finally
//! yields to the scheduler.
37//!
//! This will likely happen right after [`LoadShed`] observes [`Buffer::poll_ready`]
//! returning [`Pending`], because further calls to [`poll_proceed`] keep returning
//! [`Pending`] until the scheduler resets the coop budget.
41//!
//! On a single-threaded runtime, or under contention, this is the moment when all of the
//! accumulated [`Overloaded`] errors start to surface one after another, in "waves".
44//!
45//! [`Pending`]: Poll::Pending
46//! [`Ready`]: Poll::Ready
47//! [`unconstrained`]: tokio::task::unconstrained
48//! [`poll_ready`]: Service::poll_ready
49//! [`Buffer::poll_ready`]: Service::poll_ready
50//! [`LoadShed::poll_ready`]: Service::poll_ready
51//! [`poll_proceed`]: tokio::task::coop::poll_proceed
52//! [`LoadShed`]: tower::load_shed::LoadShed
53//! [`Overloaded`]: tower::load_shed::error::Overloaded
54use std::fmt;
55use std::future::Future;
56use std::marker::PhantomData;
57use std::task::Context;
58use std::task::Poll;
59
60use tower::BoxError;
61use tower::Layer;
62use tower::buffer::Buffer;
63use tower::buffer::future::ResponseFuture;
64use tower_service::Service;
65
66/// Adds a [coop unconstrained](tokio::task::unconstrained) [`Buffer`] layer to a service.
67///
68/// See the module documentation for more details.
69#[derive(Clone, Copy)]
70pub struct UnconstrainedBufferLayer<Request> {
71 bound: usize,
72 _p: PhantomData<fn(Request)>,
73}
74
75impl<Request> UnconstrainedBufferLayer<Request> {
76 /// Creates a new [`UnconstrainedBufferLayer`] with the provided `bound`.
77 ///
78 /// `bound` gives the maximal number of requests that can be queued for the service before
79 /// backpressure is applied to callers.
80 ///
81 /// See [`Buffer::new`] for guidance on choosing a `bound`.
82 pub const fn new(bound: usize) -> Self {
83 UnconstrainedBufferLayer {
84 bound,
85 _p: PhantomData,
86 }
87 }
88}
89
90impl<S, Request> Layer<S> for UnconstrainedBufferLayer<Request>
91where
92 S: Service<Request> + Send + 'static,
93 S::Future: Send,
94 S::Error: Into<BoxError> + Send + Sync,
95 Request: Send + 'static,
96{
97 type Service = UnconstrainedBuffer<Request, S::Future>;
98
99 fn layer(&self, service: S) -> Self::Service {
100 UnconstrainedBuffer::new(service, self.bound)
101 }
102}
103
104impl<Request> fmt::Debug for UnconstrainedBufferLayer<Request> {
105 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
106 f.debug_struct("UnconstrainedBufferLayer")
107 .field("bound", &self.bound)
108 .finish()
109 }
110}
111
112/// A wrapper around [`Buffer`] that runs [`poll_ready`] inside
113/// [`unconstrained`], preventing the cooperative budget from causing
114/// a [`Pending`] yield when the inner semaphore still has capacity.
115///
116/// See the module documentation for more details.
117///
118/// [`Pending`]: Poll::Pending
119/// [`unconstrained`]: tokio::task::unconstrained
120/// [`poll_ready`]: Service::poll_ready
121#[derive(Debug)]
122pub struct UnconstrainedBuffer<Req, F> {
123 /// The inner [`Buffer`] layer, which wraps the actual service and is responsible for
124 /// buffering requests.
125 inner: Buffer<Req, F>,
126}
127
128impl<Req, F> UnconstrainedBuffer<Req, F>
129where
130 F: 'static,
131{
132 /// Creates a new `UnconstrainedBuffer` with the specified service and buffer capacity.
133 pub fn new<S>(service: S, bound: usize) -> Self
134 where
135 S: Service<Req, Future = F> + Send + 'static,
136 F: Send,
137 S::Error: Into<BoxError> + Send + Sync,
138 Req: Send + 'static,
139 {
140 let inner = Buffer::new(service, bound);
141
142 Self { inner }
143 }
144}
145
146impl<Req, Rsp, F, E> Service<Req> for UnconstrainedBuffer<Req, F>
147where
148 F: Future<Output = Result<Rsp, E>> + Send + 'static,
149 E: Into<BoxError>,
150 Req: Send + 'static,
151{
152 type Response = Rsp;
153 type Error = BoxError;
154 type Future = ResponseFuture<F>;
155
156 fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
157 std::pin::pin!(tokio::task::unconstrained(std::future::poll_fn(|cx| {
158 self.inner.poll_ready(cx)
159 })))
160 .as_mut()
161 .poll(cx)
162 }
163
164 fn call(&mut self, request: Req) -> Self::Future {
165 self.inner.call(request)
166 }
167}
168
169impl<Req, F> Clone for UnconstrainedBuffer<Req, F>
170where
171 Req: Send + 'static,
172 F: Send + 'static,
173{
174 fn clone(&self) -> Self {
175 Self {
176 inner: self.inner.clone(),
177 }
178 }
179}
180
181#[cfg(test)]
182mod tests {
183 use std::future::poll_fn;
184 use std::task::Poll;
185
186 use tokio::task::JoinSet;
187 use tokio::task::coop::has_budget_remaining;
188 use tokio::task::coop::poll_proceed;
189 use tower::BoxError;
190 use tower::Service;
191 use tower::load_shed::LoadShed;
192
193 use super::*;
194
195 /// Consumes all available computational budget in the given context until a pending state is reached.
196 ///
197 /// This function repeatedly polls the [`poll_proceed`] function within the provided context (`cx`)
198 /// to exhaust the computational budget available and returns the total number of units consumed
199 /// before reaching a pending state.
200 ///
201 /// # Notes
202 /// - This function will loop indefinitely if [`poll_proceed`] never returns [`Poll::Pending`],
203 /// which is the case for tasks being executed in a [`tokio::task::unconstrained`] context.
204 fn consume_all_budget(cx: &mut Context) -> usize {
205 let mut consumed = 0;
206 loop {
207 let restore = poll_proceed(cx);
208 match restore {
209 Poll::Ready(r) => {
210 consumed += 1;
211 r.made_progress();
212 continue;
213 }
214 Poll::Pending => return consumed,
215 }
216 }
217 }
218
219 /// Deterministic test for cooperative budget exhaustion.
220 ///
221 /// Ensures that [`Buffer::poll_ready`] never returns [`Poll::Pending`] when the budget
222 /// is exhausted. This should only happen when there are no permits available.
223 #[tokio::test]
224 async fn coop_budget_exhaustion_should_not_cause_buffer_poll_ready_to_return_pending() {
225 // Service chain: Buffer(1000) -> inner service
226 let inner = tower::service_fn(|_: ()| async { Ok::<_, BoxError>("ok") });
227 let mut inner_buffered = UnconstrainedBuffer::new(inner, 1000);
228
229 // Tries to reset the budget by yielding to the scheduler.
230 tokio::task::yield_now().await;
231
232 // Sanity check: with a fresh budget, `Buffer::poll_ready` should always succeed.
233 poll_fn(|cx| {
234 assert!(has_budget_remaining(), "Budget should not be exhausted");
235
236 assert!(
237 matches!(inner_buffered.poll_ready(cx), Poll::Ready(Ok(()))),
238 "Buffer::poll_ready should return Ready"
239 );
240
241 // call() acquires a permit from the inner Buffer because poll_ready succeeded.
242 let fut = inner_buffered.call(());
243 let mut fut = std::pin::pin!(fut);
244
245 // Ready(Ok(_)) or Pending (waiting for Buffer worker), never an error.
246 assert!(
247 matches!(fut.as_mut().poll(cx), Poll::Ready(Ok(_)) | Poll::Pending),
248 "Buffer::call should succeed"
249 );
250 Poll::Ready(())
251 })
252 .await;
253
254 // Tries to reset the budget by yielding to the scheduler.
255 tokio::task::yield_now().await;
256
257 // Test: buffer should not return Pending even when the coop budget is exhausted,
258 // because the inner Buffer still has capacity.
259 poll_fn(|cx| {
260 // Drain all coop budget units via `consume_all_budget` loop.
261 let budget_consumed = consume_all_budget(cx);
262
263 assert_ne!(
264 budget_consumed,
265 0,
266 "Expected non-zero budget units consumed"
267 );
268
269 assert!(
270 !has_budget_remaining(),
271 "Expected budget to be exhausted after consuming all units, but poll_proceed is still Ready"
272 );
273
274 // Budget is now 0. The inner Buffer still has 999 permits available.
275 // With a constrained budget, `poll_proceed` is called and returns `Pending`
276 // before `Semaphore::poll_acquire` is even called.
277 // With an unconstrained budget, `poll_proceed` always returns `Ready`,
278 // and `Semaphore::poll_acquire` is called normally.
279 assert!(
280 matches!(inner_buffered.poll_ready(cx), Poll::Ready(Ok(()))),
281 "Buffer::poll_ready should return Ready even with exhausted budget"
282 );
283 let fut = inner_buffered.call(());
284 let mut fut = std::pin::pin!(fut);
285
286 // Ready(Ok(_)) or Pending (waiting for Buffer worker), never an error.
287 assert!(
288 matches!(fut.as_mut().poll(cx), Poll::Ready(Ok(_)) | Poll::Pending),
289 "Buffer::call should succeed"
290 );
291 Poll::Ready(())
292 })
293 .await;
294 }
295
296 /// Deterministic test for cooperative budget exhaustion.
297 ///
298 /// This ensures that when the budget is exhausted, it does not cause premature shedding
299 /// in the [`LoadShed`] layer when the inner [`Buffer`] still has capacity but tries
300 /// to yield to the scheduler.
301 #[tokio::test]
302 async fn coop_budget_exhaustion_should_not_cause_false_shedding() {
303 // Service chain: LoadShed -> Buffer(1000) -> instant_service
304 let inner = tower::service_fn(|_: ()| async { Ok::<_, BoxError>("ok") });
305 let inner_buffered = UnconstrainedBuffer::new(inner, 1000);
306 let mut load_shed = LoadShed::new(inner_buffered);
307
308 // Tries to reset the budget by yielding to the scheduler.
309 tokio::task::yield_now().await;
310
311 // Sanity check: with a fresh budget, LoadShed should not shed and Buffer should succeed.
312 poll_fn(|cx| {
313 assert!(has_budget_remaining(), "budget should not be exhausted");
314
315 // Budget is fresh (128). poll_ready -> Acquire succeeds -> is_ready = true
316 // `LoadShed::poll_ready` always returns `Poll::Ready`.
317 assert!(
318 matches!(load_shed.poll_ready(cx), Poll::Ready(Ok(()))),
319 "LoadShed::poll_ready should return Ready"
320 );
321
322 // call() forwards to the inner Buffer because is_ready = true.
323 let fut = load_shed.call(());
324 let mut fut = std::pin::pin!(fut);
325
326 // Ensures that load shedding didn't occur.
327 assert!(
328 !matches!(fut.as_mut().poll(cx), Poll::Ready(Err(_))),
329 "requests should not be shed with fresh budget"
330 );
331 Poll::Ready(())
332 })
333 .await;
334
335 // Tries to reset the budget by yielding to the scheduler.
336 tokio::task::yield_now().await;
337
338 // Test: the load should not be shed when the buffer has capacity despite the budget being
339 // exhausted.
340 poll_fn(|cx| {
341 // Drain all coop budget units via `consume_all_budget` loop.
342 let budget_consumed = consume_all_budget(cx);
343 assert_ne!(
344 budget_consumed,
345 0,
346 "Expected non-zero budget units consumed"
347 );
348
349 assert!(
350 !has_budget_remaining(),
351 "Expected budget to be exhausted after consuming all units, but poll_proceed is still Ready"
352 );
353
354 // `LoadShed::poll_ready` always returns `Poll::Ready`.
355 assert!(
356 matches!(load_shed.poll_ready(cx), Poll::Ready(Ok(()))),
357 "LoadShed::poll_ready should return Ready"
358 );
359
360 let fut = load_shed.call(());
361 let mut fut = std::pin::pin!(fut);
362
363 // Overloaded resolves immediately in one poll.
364 let shed = match fut.as_mut().poll(cx) {
365 Poll::Ready(Err(e)) => e
366 .downcast_ref::<tower::load_shed::error::Overloaded>()
367 .is_some(),
368 _ => false,
369 };
370
371 assert!(
372 !shed,
373 "Load should not be shed (Overloaded) when there's enough Buffer permits"
374 );
375
376 Poll::Ready(())
377 })
378 .await;
379 }
380
381 /// Confirms that genuine buffer exhaustion still causes [`LoadShed`] to shed requests.
382 ///
383 /// [`UnconstrainedBuffer`] bypasses the coop budget check but must still propagate genuine
384 /// [`Poll::Pending`] from a full semaphore so that real backpressure is preserved.
385 #[tokio::test]
386 async fn full_buffer_should_still_cause_load_shedding() {
387 use std::sync::Arc;
388
389 use tokio::sync::Semaphore;
390
391 // A gate that holds the inner service blocked until we release it.
392 let gate = Arc::new(Semaphore::new(0));
393 let gate_clone = gate.clone();
394
395 let inner = tower::service_fn(move |_: ()| {
396 let gate = gate_clone.clone();
397 async move {
398 // Block until explicitly released.
399 let _permit = gate.acquire().await.unwrap();
400 Ok::<_, BoxError>("ok")
401 }
402 });
403
404 // Capacity 1: the worker holds 1 in-flight; 1 more can queue. A third makes the buffer full.
405 let inner_buffered = UnconstrainedBuffer::new(inner, 1);
406 let mut load_shed = LoadShed::new(inner_buffered);
407
408 // Request 1: accepted, worker picks it up and blocks at the gate.
409 // Buffer::call() enqueues synchronously; dropping the ResponseFuture only discards
410 // the response receiver — the request is already in the channel.
411 poll_fn(|cx| load_shed.poll_ready(cx)).await.unwrap();
412 drop(load_shed.call(()));
413
414 // Yield so the worker task runs and drains request 1 from the channel.
415 tokio::task::yield_now().await;
416
417 // Request 2: fills the channel while the worker is blocked on request 1.
418 // Same as above — drop only the response receiver, not the enqueued request.
419 poll_fn(|cx| load_shed.poll_ready(cx)).await.unwrap();
420 drop(load_shed.call(()));
421
422 // Request 3: the channel is now full. Buffer::poll_ready returns genuine Pending
423 // (not coop-induced), LoadShed must shed this request.
424 poll_fn(|cx| {
425 // LoadShed::poll_ready always returns Ready — it absorbs the inner Pending.
426 assert!(matches!(load_shed.poll_ready(cx), Poll::Ready(Ok(()))));
427
428 let fut = load_shed.call(());
429 let mut fut = std::pin::pin!(fut);
430
431 // Overloaded resolves immediately in one poll.
432 let is_overloaded = match fut.as_mut().poll(cx) {
433 Poll::Ready(Err(e)) => e
434 .downcast_ref::<tower::load_shed::error::Overloaded>()
435 .is_some(),
436 _ => false,
437 };
438
439 assert!(
440 is_overloaded,
441 "Expected Overloaded when buffer is genuinely full; \
442 UnconstrainedBuffer must not suppress real backpressure"
443 );
444
445 Poll::Ready(())
446 })
447 .await;
448
449 // Release the gate so the worker can drain and the runtime can shut down cleanly.
450 gate.add_permits(2);
451 }
452
453 /// Load-based test: ensure that shedding never happens under load with the
454 /// real Buffer Worker loop.
455 ///
456 /// What happens under burst traffic:
457 /// 1. Inner buffer fills up -> genuine [`Pending`] -> [`LoadShed`] sheds (correct).
458 /// 2. Worker loops at wire speed ([`LoadShed`] always returns [`Ready`] -> never yields).
459 /// 3. Each recv consumes 1 coop budget; after ~128 iterations, `budget = 0`.
460 /// 4. Even when the inner buffer drains and has capacity, when `Acquire` checks
461 /// [`poll_proceed`]:
462 /// - With [constrained buffer], [`poll_proceed`] returns [`Pending`] because `budget = 0`
463 /// and the load is shed.
464 /// - With [unconstrained buffer], [`poll_proceed`] returns [`Ready`] and the semaphore
465 /// is checked normally.
466 ///
467 /// [`Ready`]: Poll::Ready
468 /// [`Pending`]: Poll::Pending
469 /// [constrained buffer]: Buffer
470 /// [unconstrained buffer]: UnconstrainedBuffer
471 #[tokio::test(flavor = "multi_thread", worker_threads = 4)]
472 async fn should_not_shed_under_load() {
473 // How many times we iterate on the test
474 let iterations: usize = 500;
475 // Total number of requests per iteration
476 let total_requests: usize = 100;
477 // Buffer capacity.
478 // Way higher than what we need, which is the total number of concurrent requests.
479 let buffer_capacity = 200;
480
481 // This can only be reliably checked with at least one layer of `LoadShed` between
482 // two `Buffer` layers.
483 // That's because the `outer_buffer` `Worker` will continuously call `LoadShed::poll_ready`,
484 // which will never return `Poll::Pending`, therefore, never yield to the scheduler within
485 // this loop.
486 // This causes the `Worker` to consume all coop budget units and eventually yield from two
487 // main flows:
488 // 1. `poll_next_msg` call when fetching the next message in the queue.
489 // 2. `poll_proceed` within `Acquire` future in an `inner_buffer` `poll_ready` call.
490 // The second flow is the one that is behind a `LoadShed` layer and will cause
491 // an `Overloaded` error upon an attempt of awaiting on a `Service::call` future.
492 let service = tower::service_fn(move |_: ()| async move { Ok::<_, BoxError>("ok") });
493 let inner_buffer = UnconstrainedBuffer::new(service, buffer_capacity);
494 let load_shed = LoadShed::new(inner_buffer);
495 let outer_buffer = UnconstrainedBuffer::new(load_shed, buffer_capacity);
496
497 let mut shed = 0usize;
498 let mut other_err = 0usize;
499 let mut tasks = JoinSet::new();
500
501 for _ in 0..iterations {
502 // send all requests
503 for _ in 0..total_requests {
504 let svc = outer_buffer.clone();
505 tasks.spawn(async move {
506 // Each spawned task calls ready().await then call()
507 let mut svc = svc;
508 let svc = tower::ServiceExt::ready(&mut svc).await;
509 match svc {
510 Ok(svc) => svc.call(()).await,
511 Err(e) => Err(e),
512 }
513 });
514 }
515
516 // wait all spawned tasks to resolve
517 while let Some(handle) = tasks.join_next().await {
518 if let Err(e) = handle.expect("task panicked") {
519 if e.downcast_ref::<tower::load_shed::error::Overloaded>()
520 .is_some()
521 {
522 shed += 1;
523 } else {
524 other_err += 1;
525 }
526 }
527 }
528 }
529
530 assert_eq!(shed, 0, "Expected all requests to succeed without shedding");
531 assert_eq!(
532 other_err, 0,
533 "Expected all requests to succeed without errors"
534 );
535 }
536}