1use std::fmt;
40
41use futures_util::FutureExt;
42use tokio::sync::{Semaphore, SemaphorePermit};
43
44use crate::{load::Load, Middleware, Service};
45
46#[derive(Debug)]
50pub struct Buffer<S> {
51 inner: S,
52 semaphore: Semaphore,
53}
54
55impl<S> Buffer<S> {
56 pub(crate) fn new(inner: S, capacity: usize) -> Self {
57 Self {
58 inner,
59 semaphore: Semaphore::new(capacity),
60 }
61 }
62}
63
64pub struct BufferPermit<'a, S, Request>
66where
67 S: Service<Request>,
68{
69 inner: BufferPermitInner<'a, S, Request>,
70}
71
72impl<'a, S, Request> fmt::Debug for BufferPermit<'a, S, Request>
73where
74 S: Service<Request>,
75 BufferPermitInner<'a, S, Request>: fmt::Debug,
76{
77 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
78 f.debug_struct("BufferPermit")
79 .field("inner", &self.inner)
80 .finish()
81 }
82}
83
84enum BufferPermitInner<'a, S, Request>
85where
86 S: Service<Request>,
87{
88 Eager(S::Permit<'a>),
89 Buffered(&'a S, SemaphorePermit<'a>),
90}
91
92impl<'a, S, Request> fmt::Debug for BufferPermitInner<'a, S, Request>
93where
94 S: Service<Request> + fmt::Debug,
95 S::Permit<'a>: fmt::Debug,
96{
97 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
98 match self {
99 Self::Eager(arg0) => f.debug_tuple("Eager").field(arg0).finish(),
100 Self::Buffered(arg0, arg1) => {
101 f.debug_tuple("Buffered").field(arg0).field(arg1).finish()
102 }
103 }
104 }
105}
106
107impl<Request, S> Service<Request> for Buffer<S>
108where
109 S: Service<Request>,
110{
111 type Response = S::Response;
112 type Permit<'a> = BufferPermit<'a, S, Request>
113 where
114 S: 'a;
115
116 async fn acquire(&self) -> Self::Permit<'_> {
117 BufferPermit {
118 inner: match self.inner.acquire().now_or_never() {
119 Some(some) => BufferPermitInner::Eager(some),
120 None => BufferPermitInner::Buffered(
121 &self.inner,
122 self.semaphore.acquire().await.expect("not closed"),
123 ),
124 },
125 }
126 }
127
128 async fn call(permit: Self::Permit<'_>, request: Request) -> Self::Response {
129 let permit = match permit.inner {
130 BufferPermitInner::Eager(permit) => permit,
131 BufferPermitInner::Buffered(service, _permit) => {
132 let permit = service.acquire().await;
133 drop(_permit);
134 permit
135 }
136 };
137 S::call(permit, request).await
138 }
139}
140
141impl<S> Load for Buffer<S>
142where
143 S: Load,
144{
145 type Metric = S::Metric;
146
147 fn load(&self) -> Self::Metric {
148 self.inner.load()
149 }
150}
151
152impl<S, T> Middleware<S> for Buffer<T>
153where
154 T: Middleware<S>,
155{
156 type Service = Buffer<T::Service>;
157
158 fn apply(self, svc: S) -> Self::Service {
159 let Self { inner, semaphore } = self;
160 Buffer {
161 inner: inner.apply(svc),
162 semaphore,
163 }
164 }
165}