use crate::service::node::limiter::ciad::{CiadConcurrencyLimiter, EndpointLevel, HostLevel};
use crate::util::weak_reducing_gauge::Reduce;
use conjure_error::Error;
use futures::future::{self, MaybeDone};
use futures::ready;
use http::{Method, Response};
use parking_lot::Mutex;
use pin_project::pin_project;
use std::collections::HashMap;
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll};
mod ciad;
mod deficit_semaphore;
/// Hash-map key identifying one endpoint: an HTTP method paired with a static pattern string.
#[derive(PartialEq, Eq, Hash)]
struct Endpoint {
/// HTTP method of the endpoint.
method: Method,
/// Static pattern string for the endpoint (presumably its path template; confirm against callers).
pattern: &'static str,
}
/// Concurrency limiter combining one host-level limiter with a set of endpoint-level limiters.
pub struct Limiter {
/// Limiter shared by every request made through this `Limiter`.
host: Arc<CiadConcurrencyLimiter<HostLevel>>,
/// Endpoint-level limiters keyed by method and pattern; entries are inserted on first use by `acquire`.
endpoints: Mutex<HashMap<Endpoint, Arc<CiadConcurrencyLimiter<EndpointLevel>>>>,
}
impl Limiter {
pub fn new() -> Self {
Limiter {
host: CiadConcurrencyLimiter::new(),
endpoints: Mutex::new(HashMap::new()),
}
}
pub fn host_limiter(&self) -> &Arc<CiadConcurrencyLimiter<HostLevel>> {
&self.host
}
pub fn acquire(&self, method: &Method, pattern: &'static str) -> Acquire {
Acquire {
endpoint: future::maybe_done(
self.endpoints
.lock()
.entry(Endpoint {
method: method.clone(),
pattern,
})
.or_insert_with(CiadConcurrencyLimiter::new)
.clone()
.acquire(),
),
host: self.host.clone().acquire(),
}
}
}
/// Future created by `Limiter::acquire`; its output is a `Permit`.
#[pin_project]
pub struct Acquire {
/// Endpoint-level acquisition, wrapped in `MaybeDone` so its output can be kept until the
/// host-level acquisition has also completed.
#[pin]
endpoint: MaybeDone<ciad::Acquire<EndpointLevel>>,
/// Host-level acquisition.
#[pin]
host: ciad::Acquire<HostLevel>,
}
impl Future for Acquire {
    type Output = Permit;

    /// Drives the endpoint-level acquisition to completion first, and only then polls the
    /// host-level acquisition; yields a `Permit` once both have produced their permits.
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let mut fields = self.project();

        // `MaybeDone` stores the endpoint permit internally, so nothing is returned here.
        ready!(fields.endpoint.as_mut().poll(cx));

        match fields.host.poll(cx) {
            Poll::Pending => Poll::Pending,
            Poll::Ready(host) => {
                // The endpoint future completed above, so its stored output is available.
                let endpoint = fields.endpoint.take_output().unwrap();
                Poll::Ready(Permit { endpoint, host })
            }
        }
    }
}
/// Pair of permits produced by the `Acquire` future: one endpoint-level, one host-level.
pub struct Permit {
/// Permit from the endpoint-level limiter.
endpoint: ciad::Permit<EndpointLevel>,
/// Permit from the host-level limiter.
host: ciad::Permit<HostLevel>,
}
impl Permit {
pub fn on_response<B>(&mut self, response: &Result<Response<B>, Error>) {
self.endpoint.on_response(response);
self.host.on_response(response);
}
}
/// Reducer that folds the `limit()` of host-level limiters into their minimum.
pub struct LimitReducer;

impl Reduce for LimitReducer {
    type Input = CiadConcurrencyLimiter<HostLevel>;
    type Value = f64;

    /// Default value of the reduction.
    // NOTE(review): if the gauge ever seeds the fold with this value, the minimum could never
    // exceed 0.0 - confirm how `Reduce::default` is used by the gauge.
    fn default(&self) -> f64 {
        0.0
    }

    /// Reads the current limit of one limiter.
    fn map(&self, limiter: &CiadConcurrencyLimiter<HostLevel>) -> f64 {
        limiter.limit()
    }

    /// Keeps the smaller of the accumulated limit and `candidate`.
    fn reduce(&self, lowest: &mut f64, candidate: f64) {
        *lowest = (*lowest).min(candidate);
    }
}
/// Reducer that sums the `in_flight()` counts of host-level limiters.
pub struct InFlightReducer;

impl Reduce for InFlightReducer {
    type Input = CiadConcurrencyLimiter<HostLevel>;
    type Value = usize;

    /// Default value of the reduction: zero.
    fn default(&self) -> usize {
        0
    }

    /// Reads the current in-flight count of one limiter.
    fn map(&self, limiter: &CiadConcurrencyLimiter<HostLevel>) -> usize {
        limiter.in_flight()
    }

    /// Adds `count` to the running total.
    fn reduce(&self, total: &mut usize, count: usize) {
        *total += count;
    }
}