burger/
buffer.rs

1//! The [`ServiceExt::buffer`](crate::ServiceExt::buffer) combinator returns [`Buffer`], whose
2//! [`Service::acquire`] immediately resolves until the buffer is at maximum capacity, at which point
3//! it defers to the inner service's [`Service::acquire`]. The buffer is drained when the inner
4//! service's permit becomes available.
5//!
6//! # Example
7//!
8//! ```rust
9//! use burger::*;
10//! # use tokio::{join, time::sleep};
11//! # use std::time::Duration;
12//!
13//! # #[tokio::main]
14//! # async fn main() {
15//! let svc = service_fn(|x| async move {
16//!     sleep(Duration::from_secs(1)).await;
17//!     x + 1
18//! })
19//! .concurrency_limit(1)
20//! .buffer(2)
21//! .load_shed();
22//! let (a, b, c, d) = join! {
23//!     svc.oneshot(9),
24//!     svc.oneshot(2),
25//!     svc.oneshot(1),
26//!     svc.oneshot(5)
27//! };
28//! assert_eq!(a, Ok(10));
29//! assert_eq!(b, Ok(3));
30//! assert_eq!(c, Ok(2));
31//! assert_eq!(d, Err(5));
32//! # }
33//! ```
34//!
35//! # Load
36//!
37//! The [`Load::load`] on [`Buffer`] defers to the inner service.
38
39use std::fmt;
40
41use futures_util::FutureExt;
42use tokio::sync::{Semaphore, SemaphorePermit};
43
44use crate::{load::Load, Middleware, Service};
45
46/// A wrapper [`Service`] for the [`ServiceExt::buffer`](crate::ServiceExt::buffer) combinator.
47///
48/// See the [module](crate::buffer) for more information.
49#[derive(Debug)]
50pub struct Buffer<S> {
51    inner: S,
52    semaphore: Semaphore,
53}
54
55impl<S> Buffer<S> {
56    pub(crate) fn new(inner: S, capacity: usize) -> Self {
57        Self {
58            inner,
59            semaphore: Semaphore::new(capacity),
60        }
61    }
62}
63
64/// The [`Service::Permit`] type for [`Buffer`].
65pub struct BufferPermit<'a, S, Request>
66where
67    S: Service<Request>,
68{
69    inner: BufferPermitInner<'a, S, Request>,
70}
71
72impl<'a, S, Request> fmt::Debug for BufferPermit<'a, S, Request>
73where
74    S: Service<Request>,
75    BufferPermitInner<'a, S, Request>: fmt::Debug,
76{
77    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
78        f.debug_struct("BufferPermit")
79            .field("inner", &self.inner)
80            .finish()
81    }
82}
83
84enum BufferPermitInner<'a, S, Request>
85where
86    S: Service<Request>,
87{
88    Eager(S::Permit<'a>),
89    Buffered(&'a S, SemaphorePermit<'a>),
90}
91
92impl<'a, S, Request> fmt::Debug for BufferPermitInner<'a, S, Request>
93where
94    S: Service<Request> + fmt::Debug,
95    S::Permit<'a>: fmt::Debug,
96{
97    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
98        match self {
99            Self::Eager(arg0) => f.debug_tuple("Eager").field(arg0).finish(),
100            Self::Buffered(arg0, arg1) => {
101                f.debug_tuple("Buffered").field(arg0).field(arg1).finish()
102            }
103        }
104    }
105}
106
107impl<Request, S> Service<Request> for Buffer<S>
108where
109    S: Service<Request>,
110{
111    type Response = S::Response;
112    type Permit<'a> = BufferPermit<'a, S, Request>
113    where
114        S: 'a;
115
116    async fn acquire(&self) -> Self::Permit<'_> {
117        BufferPermit {
118            inner: match self.inner.acquire().now_or_never() {
119                Some(some) => BufferPermitInner::Eager(some),
120                None => BufferPermitInner::Buffered(
121                    &self.inner,
122                    self.semaphore.acquire().await.expect("not closed"),
123                ),
124            },
125        }
126    }
127
128    async fn call(permit: Self::Permit<'_>, request: Request) -> Self::Response {
129        let permit = match permit.inner {
130            BufferPermitInner::Eager(permit) => permit,
131            BufferPermitInner::Buffered(service, _permit) => {
132                let permit = service.acquire().await;
133                drop(_permit);
134                permit
135            }
136        };
137        S::call(permit, request).await
138    }
139}
140
141impl<S> Load for Buffer<S>
142where
143    S: Load,
144{
145    type Metric = S::Metric;
146
147    fn load(&self) -> Self::Metric {
148        self.inner.load()
149    }
150}
151
152impl<S, T> Middleware<S> for Buffer<T>
153where
154    T: Middleware<S>,
155{
156    type Service = Buffer<T::Service>;
157
158    fn apply(self, svc: S) -> Self::Service {
159        let Self { inner, semaphore } = self;
160        Buffer {
161            inner: inner.apply(svc),
162            semaphore,
163        }
164    }
165}