use data_storage_lib::ll::distributed::{
DistributedStorageClient, DistributedStorageClientExtractionError,
};
use data_storage_lib::ll::local::SharedData;
use lock_lib::LockBuilder;
use std::collections::hash_map::DefaultHasher;
use std::convert::Infallible;
use std::hash::{Hash, Hasher};
use std::rc::Rc;
use thiserror::Error;
use crate::bucket::BucketFactory;
use crate::distribution_formula::DistributionFormula;
use crate::key_manager::KeyManagerFactory;
use crate::RateLimitInstance;
use pdk_core::classy::extract::context::ConfigureContext;
use pdk_core::classy::extract::{Extract, FromContext};
use pdk_core::classy::timer::Timer;
use pdk_core::classy::Clock;
use pdk_core::log::debug;
use pdk_core::policy_context::api::{Metadata, Tier};
const RL_VERSION: &str = "v1";
pub struct RateLimitBuilder {
prefix: String,
clock: Rc<dyn Clock>,
shared_data: Rc<SharedData>,
lock: Rc<LockBuilder>,
buckets: Vec<(String, Vec<Tier>)>,
distributed_storage:
Result<Rc<DistributedStorageClient>, Rc<DistributedStorageClientExtractionError>>,
}
impl FromContext<ConfigureContext> for RateLimitBuilder {
type Error = Infallible;
fn from_context(context: &ConfigureContext) -> Result<Self, Self::Error> {
let shared_data: SharedData = context.extract()?;
let clock: Rc<dyn Clock> = context.extract()?;
let metadata: Metadata = context.extract()?;
let lock: LockBuilder = context.extract()?;
let distributed_storage: Result<
DistributedStorageClient,
DistributedStorageClientExtractionError,
> = context.extract();
let buckets = metadata
.api_metadata
.slas
.unwrap_or_default()
.into_iter()
.map(|sla| (sla.id, sla.tiers))
.collect();
Ok(RateLimitBuilder {
prefix: format!(
"rl-{RL_VERSION}-{}-{}",
metadata.policy_metadata.policy_name, metadata.policy_metadata.policy_namespace
),
clock,
shared_data: Rc::new(shared_data),
lock: Rc::new(lock),
buckets,
distributed_storage: distributed_storage.map(Rc::new).map_err(Rc::new),
})
}
}
impl RateLimitBuilder {
#[allow(clippy::new_ret_no_self)]
pub fn new(&self, builder_id: String) -> RateLimitBuilderInstance {
RateLimitBuilderInstance {
prefix: self.prefix.clone(),
clock: Rc::clone(&self.clock),
shared_data: Rc::clone(&self.shared_data),
buckets: self.buckets.clone(),
distributed_storage: self.distributed_storage.clone(),
timer: None,
lock: Rc::clone(&self.lock),
builder_id,
}
}
}
#[non_exhaustive]
#[derive(Error, Debug)]
pub enum RateLimitBuildError {
#[error("Rate limit instance was configured to work on cluster mode but storage client couldn't be extracted from context. {0}")]
MissingStorageClient(Rc<DistributedStorageClientExtractionError>),
}
pub struct RateLimitBuilderInstance {
prefix: String,
clock: Rc<dyn Clock>,
shared_data: Rc<SharedData>,
distributed_storage:
Result<Rc<DistributedStorageClient>, Rc<DistributedStorageClientExtractionError>>,
timer: Option<Rc<Timer>>,
buckets: Vec<(String, Vec<Tier>)>,
lock: Rc<LockBuilder>,
builder_id: String,
}
impl RateLimitBuilderInstance {
pub fn buckets(mut self, buckets: Vec<(String, Vec<Tier>)>) -> Self {
self.buckets = buckets;
self
}
pub fn clustered(mut self, timer: Rc<Timer>) -> Self {
self.timer = Some(timer);
self
}
pub fn shared(mut self) -> Self {
self.prefix = "shared-rl".to_string();
self
}
fn hash_buckets(&self) -> String {
let mut hasher = DefaultHasher::new();
for (group, tier) in self.buckets.iter() {
group.hash(&mut hasher);
for t in tier.iter() {
t.requests.hash(&mut hasher);
t.period_in_millis.hash(&mut hasher);
}
}
hasher.finish().to_string()
}
pub fn build(self) -> Result<RateLimitInstance, RateLimitBuildError> {
let store = format!(
"{}-{}-{}",
self.prefix,
self.builder_id,
self.hash_buckets()
);
debug!("Building RateLimitInstance with store `{store}`");
let key_manager = KeyManagerFactory::new(&store, self.timer.is_some());
let bucket = BucketFactory::new(self.timer.is_some(), self.buckets);
let cluster = if self.timer.is_some() {
match self.distributed_storage {
Err(e) => return Err(RateLimitBuildError::MissingStorageClient(e)),
Ok(storage) => Some(storage),
}
} else {
None
};
let formula = DistributionFormula::RemainingPercentage(0.1f32);
debug!("Building RateLimitInstance with store `{store}` and limits `{bucket:?}`");
Ok(RateLimitInstance::new(
store,
key_manager,
bucket,
formula,
self.clock,
self.shared_data,
cluster,
self.timer,
self.lock,
))
}
}