use pdk_core::log::debug;
use pdk_core::policy_context::api::Tier;
use serde::{Deserialize, Serialize};
use std::cmp::max;
use super::super::distribution_formula::DistributionFormula;
use super::{LimitStats, QuotaInfo, RequestAllowed};
#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct ClusterBucket {
limits: Vec<ClusterLimit>,
}
impl ClusterBucket {
pub fn new(start_time: u128, tiers: &[Tier]) -> Self {
Self {
limits: tiers
.iter()
.map(|tier| ClusterLimit::from(start_time, tier))
.collect(),
}
}
pub fn request_allowed(&mut self, now: u128, amount: usize) -> RequestAllowed {
self.set_time(now);
if self.limits.iter().all(|l| l.remaining(amount)) {
self.limits.iter_mut().for_each(|l| l.allow_request(amount));
RequestAllowed::Allowed
} else if self.limits.iter().all(|l| l.cluster_remaining(amount)) {
RequestAllowed::OutOfLocalQuota
} else {
RequestAllowed::OutOfQuota
}
}
pub fn status(&self) -> Option<LimitStats> {
self.limits
.iter()
.map(ClusterLimit::stats)
.reduce(LimitStats::most_restrictive)
}
pub fn get_quota(
&mut self,
now: u128,
bucket: &ClusterBucket,
formula: &DistributionFormula,
amount: usize,
) -> Vec<QuotaInfo> {
self.set_time(now);
let mut quota_info = Vec::new();
for (cluster, local) in self.limits.iter_mut().zip(bucket.limits.iter()) {
if local.available >= amount as u64 && local.reset == cluster.reset {
quota_info.push(QuotaInfo::new(0, cluster.cluster_remaining, cluster.reset))
} else {
quota_info.push(cluster.get_quota_portion(max(
formula.portion(cluster),
amount as u64 - local.available,
)));
}
}
quota_info
}
pub fn update_quota(&mut self, quota: &[QuotaInfo]) {
for (local, quota) in self.limits.iter_mut().zip(quota.iter()) {
local.update_quota(quota)
}
}
fn set_time(&mut self, now: u128) {
self.limits.iter_mut().for_each(|limit| limit.set_time(now));
}
}
#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct ClusterLimit {
pub available: u64,
pub cluster_remaining: u64,
pub max_requests: u64,
pub period: u128,
pub reset: u128,
}
impl ClusterLimit {
fn from(start_time: u128, tier: &Tier) -> Self {
Self {
available: 0,
cluster_remaining: tier.requests,
max_requests: tier.requests,
period: tier.period_in_millis as u128,
reset: start_time + tier.period_in_millis as u128,
}
}
fn stats(&self) -> LimitStats {
LimitStats::new(
self.available + self.cluster_remaining,
self.max_requests,
self.reset,
)
}
fn set_time(&mut self, now: u128) {
while self.reset <= now {
self.reset += self.period;
self.available = 0;
self.cluster_remaining = self.max_requests;
}
}
fn update_quota(&mut self, quota: &QuotaInfo) {
if quota.reset != self.reset {
self.available = quota.given;
self.reset = quota.reset;
self.cluster_remaining = quota.remaining;
} else {
self.available += quota.given;
self.cluster_remaining = quota.remaining;
}
}
fn remaining(&self, amount: usize) -> bool {
self.available >= amount as u64
}
fn cluster_remaining(&self, amount: usize) -> bool {
self.available + self.cluster_remaining >= amount as u64
}
fn allow_request(&mut self, amount: usize) {
self.available -= amount as u64;
}
fn get_quota_portion(&mut self, portion: u64) -> QuotaInfo {
let mut given: u64 = portion;
if given > self.cluster_remaining {
given = self.cluster_remaining;
}
if given < 1 && self.cluster_remaining >= 1 {
given = 1;
}
self.cluster_remaining -= given;
debug!(
"Portion requested: {}, given: {}, remaining: {}",
portion, given, self.cluster_remaining
);
QuotaInfo::new(given, self.cluster_remaining, self.reset)
}
}