use dragonfly_client_util::ratelimiter::bbr::BBR;
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll};
use tonic::body::Body;
use tonic::codegen::http::{Request, Response};
use tonic::Status;
use tower::{Layer, Service};
#[derive(Clone)]
pub struct BBRLayer {
bbr: Arc<BBR>,
}
impl BBRLayer {
pub fn new(bbr: Arc<BBR>) -> Self {
Self { bbr }
}
}
impl<S> Layer<S> for BBRLayer {
type Service = BBRService<S>;
fn layer(&self, inner: S) -> Self::Service {
BBRService {
inner,
bbr: self.bbr.clone(),
}
}
}
#[derive(Clone)]
pub struct BBRService<S> {
inner: S,
bbr: Arc<BBR>,
}
impl<S> Service<Request<Body>> for BBRService<S>
where
S: Service<Request<Body>, Response = Response<Body>> + Clone + Send + 'static,
S::Future: Send + 'static,
S::Error: Send + 'static,
{
type Response = S::Response;
type Error = S::Error;
type Future = Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send>>;
fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
self.inner.poll_ready(cx)
}
fn call(&mut self, req: Request<Body>) -> Self::Future {
let bbr = self.bbr.clone();
let inner_clone = self.inner.clone();
let mut inner = std::mem::replace(&mut self.inner, inner_clone);
Box::pin(async move {
let _guard = match bbr.acquire().await {
Some(guard) => guard,
None => {
return Ok(Status::resource_exhausted(
"server is overloaded: CPU/memory thresholds are exceeded, please retry later",
)
.into_http());
}
};
inner.call(req).await
})
}
}
#[cfg(test)]
mod tests {
use super::*;
use dragonfly_client_util::ratelimiter::bbr::BBRConfig;
#[tokio::test]
async fn test_bbr_layer_creation() {
let bbr = Arc::new(BBR::new(BBRConfig::default()).await);
let layer = BBRLayer::new(bbr);
let _ = layer.clone();
}
}