Skip to main content

apollo_router/layers/
unconstrained_buffer.rs

1//! A wrapper around [`Buffer`] that runs [`poll_ready`] inside
2//! [`unconstrained`], preventing the cooperative budget from causing
3//! a [`Poll::Pending`] yield when the inner semaphore still has capacity.
4//!
5//! Without this, a [`Buffer`] that sits behind a [`LoadShed`]
6//! layer can be falsely shed: the Tokio coop budget reaches zero, [`poll_proceed`]
7//! returns [`Pending`], and [`LoadShed`] interprets that as the service not being
8//! ready, immediately returning an [`Overloaded`] error.
9//!
10//! By polling the inner [`Buffer`] in an unconstrained context, the coop budget
11//! check is bypassed and readiness is determined solely by the actual semaphore
12//! permit availability.
13//!
14//! ## Cases where this matters
15//!
16//! This only matters when a [`Buffer`] is behind a [`LoadShed`], and the problem
17//! is amplified when there's another [`Buffer`] in front of that [`LoadShed`],
18//! building a structure like: `Buffer(LoadShed(Buffer(service)))`.
19//!
20//! ### Amplification
21//!
22//! The amplification happens because the `Worker` loop of the outer `Buffer` picks up a message and
23//! then calls [`LoadShed::poll_ready`], which always returns [`Ready`] and never
24//! attempts to yield to the scheduler.
25//!
26//! During the [`LoadShed::poll_ready`] call, [`Buffer::poll_ready`] is called on the inner
27//! `Buffer`, which is where the semaphore permit availability is checked.
28//! However, before checking the semaphore, it will call [`poll_proceed`] to check coop budget
29//! availability. If the coop budget is exhausted, [`poll_proceed`] will return [`Pending`],
30//! which will "bubble up" to the [`LoadShed`] layer. This layer stores readiness as `false`
31//! but still returns [`Ready`] to the outer `Buffer` `Worker`.
32//!
//! This means that the `Worker` keeps looping and consuming coop budget until it hits the
//! coop budget check inside `poll_next_msg`, which returns [`Pending`]. Since the `Worker` is
//! the top-level future of its task, nothing absorbs that [`Pending`], so the `Worker` finally
//! yields to the scheduler.
37//!
38//! This will likely happen right after [`LoadShed`] observes a [`Buffer::poll_ready`]
39//! return [`Pending`] because further calls to [`poll_proceed`] will keep returning [`Pending`]
40//! until the scheduler resets the coop budget.
41//!
//! On a single-threaded runtime, or in a contended scenario, this is the moment where all the
//! accumulated [`Overloaded`] errors start to show up one after another in "waves".
44//!
45//! [`Pending`]: Poll::Pending
46//! [`Ready`]: Poll::Ready
47//! [`unconstrained`]: tokio::task::unconstrained
48//! [`poll_ready`]: Service::poll_ready
49//! [`Buffer::poll_ready`]: Service::poll_ready
50//! [`LoadShed::poll_ready`]: Service::poll_ready
51//! [`poll_proceed`]: tokio::task::coop::poll_proceed
52//! [`LoadShed`]: tower::load_shed::LoadShed
53//! [`Overloaded`]: tower::load_shed::error::Overloaded
54use std::fmt;
55use std::future::Future;
56use std::marker::PhantomData;
57use std::task::Context;
58use std::task::Poll;
59
60use tower::BoxError;
61use tower::Layer;
62use tower::buffer::Buffer;
63use tower::buffer::future::ResponseFuture;
64use tower_service::Service;
65
/// Adds a [coop unconstrained](tokio::task::unconstrained) [`Buffer`] layer to a service.
///
/// The layer is `Clone` and `Copy` for every `Request` type, including request types that are
/// themselves neither `Clone` nor `Copy`.
///
/// See the module documentation for more details.
pub struct UnconstrainedBufferLayer<Request> {
    /// Maximum number of requests that can be queued in each [`Buffer`] this layer creates.
    bound: usize,
    /// Ties the layer to `Request` without storing one. `fn(Request)` is used so the marker does
    /// not claim ownership of a `Request` value.
    _p: PhantomData<fn(Request)>,
}

// `Clone` and `Copy` are implemented by hand rather than derived: `#[derive]` would add
// `Request: Clone` / `Request: Copy` bounds even though `Request` only appears inside
// `PhantomData`, which would make the layer uncloneable for non-`Clone` request types.
impl<Request> Clone for UnconstrainedBufferLayer<Request> {
    fn clone(&self) -> Self {
        *self
    }
}

impl<Request> Copy for UnconstrainedBufferLayer<Request> {}
74
75impl<Request> UnconstrainedBufferLayer<Request> {
76    /// Creates a new [`UnconstrainedBufferLayer`] with the provided `bound`.
77    ///
78    /// `bound` gives the maximal number of requests that can be queued for the service before
79    /// backpressure is applied to callers.
80    ///
81    /// See [`Buffer::new`] for guidance on choosing a `bound`.
82    pub const fn new(bound: usize) -> Self {
83        UnconstrainedBufferLayer {
84            bound,
85            _p: PhantomData,
86        }
87    }
88}
89
90impl<S, Request> Layer<S> for UnconstrainedBufferLayer<Request>
91where
92    S: Service<Request> + Send + 'static,
93    S::Future: Send,
94    S::Error: Into<BoxError> + Send + Sync,
95    Request: Send + 'static,
96{
97    type Service = UnconstrainedBuffer<Request, S::Future>;
98
99    fn layer(&self, service: S) -> Self::Service {
100        UnconstrainedBuffer::new(service, self.bound)
101    }
102}
103
104impl<Request> fmt::Debug for UnconstrainedBufferLayer<Request> {
105    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
106        f.debug_struct("UnconstrainedBufferLayer")
107            .field("bound", &self.bound)
108            .finish()
109    }
110}
111
112/// A wrapper around [`Buffer`] that runs [`poll_ready`] inside
113/// [`unconstrained`], preventing the cooperative budget from causing
114/// a [`Pending`] yield when the inner semaphore still has capacity.
115///
116/// See the module documentation for more details.
117///
118/// [`Pending`]: Poll::Pending
119/// [`unconstrained`]: tokio::task::unconstrained
120/// [`poll_ready`]: Service::poll_ready
121#[derive(Debug)]
122pub struct UnconstrainedBuffer<Req, F> {
123    /// The inner [`Buffer`] layer, which wraps the actual service and is responsible for
124    /// buffering requests.
125    inner: Buffer<Req, F>,
126}
127
128impl<Req, F> UnconstrainedBuffer<Req, F>
129where
130    F: 'static,
131{
132    /// Creates a new `UnconstrainedBuffer` with the specified service and buffer capacity.
133    pub fn new<S>(service: S, bound: usize) -> Self
134    where
135        S: Service<Req, Future = F> + Send + 'static,
136        F: Send,
137        S::Error: Into<BoxError> + Send + Sync,
138        Req: Send + 'static,
139    {
140        let inner = Buffer::new(service, bound);
141
142        Self { inner }
143    }
144}
145
146impl<Req, Rsp, F, E> Service<Req> for UnconstrainedBuffer<Req, F>
147where
148    F: Future<Output = Result<Rsp, E>> + Send + 'static,
149    E: Into<BoxError>,
150    Req: Send + 'static,
151{
152    type Response = Rsp;
153    type Error = BoxError;
154    type Future = ResponseFuture<F>;
155
156    fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
157        std::pin::pin!(tokio::task::unconstrained(std::future::poll_fn(|cx| {
158            self.inner.poll_ready(cx)
159        })))
160        .as_mut()
161        .poll(cx)
162    }
163
164    fn call(&mut self, request: Req) -> Self::Future {
165        self.inner.call(request)
166    }
167}
168
169impl<Req, F> Clone for UnconstrainedBuffer<Req, F>
170where
171    Req: Send + 'static,
172    F: Send + 'static,
173{
174    fn clone(&self) -> Self {
175        Self {
176            inner: self.inner.clone(),
177        }
178    }
179}
180
#[cfg(test)]
mod tests {
    use std::future::poll_fn;
    use std::task::Poll;

    use tokio::task::JoinSet;
    use tokio::task::coop::has_budget_remaining;
    use tokio::task::coop::poll_proceed;
    use tower::BoxError;
    use tower::Service;
    use tower::load_shed::LoadShed;

    use super::*;

    /// Consumes all available cooperative budget in the given context until a pending state is
    /// reached.
    ///
    /// This function repeatedly calls [`poll_proceed`] with the provided context (`cx`)
    /// to exhaust the available budget, and returns the total number of units consumed
    /// before reaching a pending state.
    ///
    /// # Notes
    /// - This function will loop indefinitely if [`poll_proceed`] never returns [`Poll::Pending`],
    ///   which is the case for tasks being executed in a [`tokio::task::unconstrained`] context.
    fn consume_all_budget(cx: &mut Context) -> usize {
        let mut consumed = 0;
        loop {
            let restore = poll_proceed(cx);
            match restore {
                Poll::Ready(r) => {
                    consumed += 1;
                    r.made_progress();
                    continue;
                }
                Poll::Pending => return consumed,
            }
        }
    }

    /// Deterministic test for cooperative budget exhaustion.
    ///
    /// Ensures that [`UnconstrainedBuffer`]'s `poll_ready` never returns [`Poll::Pending`] just
    /// because the budget is exhausted. That should only happen when there are no permits
    /// available.
    #[tokio::test]
    async fn coop_budget_exhaustion_should_not_cause_buffer_poll_ready_to_return_pending() {
        // Service chain: UnconstrainedBuffer(1000) -> inner service
        let inner = tower::service_fn(|_: ()| async { Ok::<_, BoxError>("ok") });
        let mut inner_buffered = UnconstrainedBuffer::new(inner, 1000);

        // Tries to reset the budget by yielding to the scheduler.
        tokio::task::yield_now().await;

        // Sanity check: with a fresh budget, `Buffer::poll_ready` should always succeed.
        poll_fn(|cx| {
            assert!(has_budget_remaining(), "Budget should not be exhausted");

            assert!(
                matches!(inner_buffered.poll_ready(cx), Poll::Ready(Ok(()))),
                "Buffer::poll_ready should return Ready"
            );

            // call() acquires a permit from the inner Buffer because poll_ready succeeded.
            let fut = inner_buffered.call(());
            let mut fut = std::pin::pin!(fut);

            // Ready(Ok(_)) or Pending (waiting for Buffer worker), never an error.
            assert!(
                matches!(fut.as_mut().poll(cx), Poll::Ready(Ok(_)) | Poll::Pending),
                "Buffer::call should succeed"
            );
            Poll::Ready(())
        })
        .await;

        // Tries to reset the budget by yielding to the scheduler.
        tokio::task::yield_now().await;

        // Test: buffer should not return Pending even when the coop budget is exhausted,
        // because the inner Buffer still has capacity.
        poll_fn(|cx| {
            // Drain all coop budget units via `consume_all_budget` loop.
            let budget_consumed = consume_all_budget(cx);

            assert_ne!(
                budget_consumed,
                0,
                "Expected non-zero budget units consumed"
            );

            assert!(
                !has_budget_remaining(),
                "Expected budget to be exhausted after consuming all units, but poll_proceed is still Ready"
            );

            // Budget is now 0. The inner Buffer still has 999 permits available.
            // With a constrained budget, `poll_proceed` is called and returns `Pending`
            // before `Semaphore::poll_acquire` is even called.
            // With an unconstrained budget, `poll_proceed` always returns `Ready`,
            // and `Semaphore::poll_acquire` is called normally.
            assert!(
                matches!(inner_buffered.poll_ready(cx), Poll::Ready(Ok(()))),
                "Buffer::poll_ready should return Ready even with exhausted budget"
            );
            let fut = inner_buffered.call(());
            let mut fut = std::pin::pin!(fut);

            // Ready(Ok(_)) or Pending (waiting for Buffer worker), never an error.
            assert!(
                matches!(fut.as_mut().poll(cx), Poll::Ready(Ok(_)) | Poll::Pending),
                "Buffer::call should succeed"
            );
            Poll::Ready(())
        })
        .await;
    }

    /// Deterministic test for cooperative budget exhaustion.
    ///
    /// This ensures that an exhausted budget does not cause premature shedding in the
    /// [`LoadShed`] layer when the inner [`Buffer`] still has capacity but tries to yield
    /// to the scheduler.
    #[tokio::test]
    async fn coop_budget_exhaustion_should_not_cause_false_shedding() {
        // Service chain: LoadShed -> UnconstrainedBuffer(1000) -> instant_service
        let inner = tower::service_fn(|_: ()| async { Ok::<_, BoxError>("ok") });
        let inner_buffered = UnconstrainedBuffer::new(inner, 1000);
        let mut load_shed = LoadShed::new(inner_buffered);

        // Tries to reset the budget by yielding to the scheduler.
        tokio::task::yield_now().await;

        // Sanity check: with a fresh budget, LoadShed should not shed and Buffer should succeed.
        poll_fn(|cx| {
            assert!(has_budget_remaining(), "budget should not be exhausted");

            // Budget is fresh (128). poll_ready -> Acquire succeeds -> is_ready = true
            // `LoadShed::poll_ready` always returns `Poll::Ready`.
            assert!(
                matches!(load_shed.poll_ready(cx), Poll::Ready(Ok(()))),
                "LoadShed::poll_ready should return Ready"
            );

            // call() forwards to the inner Buffer because is_ready = true.
            let fut = load_shed.call(());
            let mut fut = std::pin::pin!(fut);

            // Ensures that load shedding didn't occur.
            assert!(
                !matches!(fut.as_mut().poll(cx), Poll::Ready(Err(_))),
                "requests should not be shed with fresh budget"
            );
            Poll::Ready(())
        })
        .await;

        // Tries to reset the budget by yielding to the scheduler.
        tokio::task::yield_now().await;

        // Test: the load should not be shed when the buffer has capacity despite the budget being
        // exhausted.
        poll_fn(|cx| {
            // Drain all coop budget units via `consume_all_budget` loop.
            let budget_consumed = consume_all_budget(cx);
            assert_ne!(
                budget_consumed,
                0,
                "Expected non-zero budget units consumed"
            );

            assert!(
                !has_budget_remaining(),
                "Expected budget to be exhausted after consuming all units, but poll_proceed is still Ready"
            );

            // `LoadShed::poll_ready` always returns `Poll::Ready`.
            assert!(
                matches!(load_shed.poll_ready(cx), Poll::Ready(Ok(()))),
                "LoadShed::poll_ready should return Ready"
            );

            let fut = load_shed.call(());
            let mut fut = std::pin::pin!(fut);

            // Overloaded resolves immediately in one poll.
            let shed = match fut.as_mut().poll(cx) {
                Poll::Ready(Err(e)) => e
                    .downcast_ref::<tower::load_shed::error::Overloaded>()
                    .is_some(),
                _ => false,
            };

            assert!(
                !shed,
                "Load should not be shed (Overloaded) when there's enough Buffer permits"
            );

            Poll::Ready(())
        })
        .await;
    }

    /// Confirms that genuine buffer exhaustion still causes [`LoadShed`] to shed requests.
    ///
    /// [`UnconstrainedBuffer`] bypasses the coop budget check but must still propagate genuine
    /// [`Poll::Pending`] from a full semaphore so that real backpressure is preserved.
    #[tokio::test]
    async fn full_buffer_should_still_cause_load_shedding() {
        // The inner service's behaviour does not matter here: every `ResponseFuture` below is
        // dropped or resolves to `Overloaded`, so the service's own future is never awaited.
        // Capacity is exhausted purely by a request waiting in the buffer's channel.
        let inner = tower::service_fn(|_: ()| async { Ok::<_, BoxError>("ok") });

        // Capacity 1: a single request waiting in the channel makes the buffer full.
        let inner_buffered = UnconstrainedBuffer::new(inner, 1);
        let mut load_shed = LoadShed::new(inner_buffered);

        // Request 1: accepted and enqueued.
        // Buffer::call() enqueues synchronously; dropping the ResponseFuture only discards
        // the response receiver — the request is already in the channel.
        poll_fn(|cx| load_shed.poll_ready(cx)).await.unwrap();
        drop(load_shed.call(()));

        // Yield so the worker task runs and drains request 1 from the channel.
        tokio::task::yield_now().await;

        // Request 2: fills the channel. `#[tokio::test]` uses a current-thread runtime and there
        // is no further yield before request 3, so the worker cannot dequeue request 2 in time.
        // Same as above — drop only the response receiver, not the enqueued request.
        poll_fn(|cx| load_shed.poll_ready(cx)).await.unwrap();
        drop(load_shed.call(()));

        // Request 3: the channel is now full. Buffer::poll_ready returns genuine Pending
        // (not coop-induced), LoadShed must shed this request.
        poll_fn(|cx| {
            // LoadShed::poll_ready always returns Ready — it absorbs the inner Pending.
            assert!(matches!(load_shed.poll_ready(cx), Poll::Ready(Ok(()))));

            let fut = load_shed.call(());
            let mut fut = std::pin::pin!(fut);

            // Overloaded resolves immediately in one poll.
            let is_overloaded = match fut.as_mut().poll(cx) {
                Poll::Ready(Err(e)) => e
                    .downcast_ref::<tower::load_shed::error::Overloaded>()
                    .is_some(),
                _ => false,
            };

            assert!(
                is_overloaded,
                "Expected Overloaded when buffer is genuinely full; \
                 UnconstrainedBuffer must not suppress real backpressure"
            );

            Poll::Ready(())
        })
        .await;
    }

    /// Load-based test: ensure that shedding never happens under load with the
    /// real Buffer Worker loop.
    ///
    /// What happens under burst traffic:
    ///   1. Inner buffer fills up -> genuine [`Pending`] -> [`LoadShed`] sheds (correct).
    ///   2. Worker loops at wire speed ([`LoadShed`] always returns [`Ready`] -> never yields).
    ///   3. Each recv consumes 1 coop budget; after ~128 iterations, `budget = 0`.
    ///   4. Even when the inner buffer drains and has capacity, when `Acquire` checks
    ///      [`poll_proceed`]:
    ///      - With [constrained buffer], [`poll_proceed`] returns [`Pending`] because `budget = 0`
    ///        and the load is shed.
    ///      - With [unconstrained buffer], [`poll_proceed`] returns [`Ready`] and the semaphore
    ///        is checked normally.
    ///
    /// [`Ready`]: Poll::Ready
    /// [`Pending`]: Poll::Pending
    /// [constrained buffer]: Buffer
    /// [unconstrained buffer]: UnconstrainedBuffer
    #[tokio::test(flavor = "multi_thread", worker_threads = 4)]
    async fn should_not_shed_under_load() {
        // How many times we iterate on the test
        let iterations: usize = 500;
        // Total number of requests per iteration
        let total_requests: usize = 100;
        // Buffer capacity.
        // Way higher than what we need, which is the total number of concurrent requests.
        let buffer_capacity = 200;

        // This can only be reliably checked with at least one layer of `LoadShed` between
        // two `Buffer` layers.
        // That's because the `outer_buffer` `Worker` will continuously call `LoadShed::poll_ready`,
        // which will never return `Poll::Pending`, therefore, never yield to the scheduler within
        // this loop.
        // This causes the `Worker` to consume all coop budget units and eventually yield from two
        // main flows:
        // 1. `poll_next_msg` call when fetching the next message in the queue.
        // 2. `poll_proceed` within `Acquire` future in an `inner_buffer` `poll_ready` call.
        // The second flow is the one that is behind a `LoadShed` layer and will cause
        // an `Overloaded` error upon an attempt of awaiting on a `Service::call` future.
        let service = tower::service_fn(move |_: ()| async move { Ok::<_, BoxError>("ok") });
        let inner_buffer = UnconstrainedBuffer::new(service, buffer_capacity);
        let load_shed = LoadShed::new(inner_buffer);
        let outer_buffer = UnconstrainedBuffer::new(load_shed, buffer_capacity);

        let mut shed = 0usize;
        let mut other_err = 0usize;
        let mut tasks = JoinSet::new();

        for _ in 0..iterations {
            // send all requests
            for _ in 0..total_requests {
                let svc = outer_buffer.clone();
                tasks.spawn(async move {
                    // Each spawned task calls ready().await then call()
                    let mut svc = svc;
                    let svc = tower::ServiceExt::ready(&mut svc).await;
                    match svc {
                        Ok(svc) => svc.call(()).await,
                        Err(e) => Err(e),
                    }
                });
            }

            // wait all spawned tasks to resolve
            while let Some(handle) = tasks.join_next().await {
                if let Err(e) = handle.expect("task panicked") {
                    if e.downcast_ref::<tower::load_shed::error::Overloaded>().is_some() {
                        shed += 1;
                    } else {
                        other_err += 1;
                    }
                }
            }
        }

        assert_eq!(shed, 0, "Expected all requests to succeed without shedding");
        assert_eq!(
            other_err, 0,
            "Expected all requests to succeed without errors"
        );
    }
}