// pdk-rate-limit-lib 1.7.0
//
// PDK Rate Limit Library
// Documentation
// Copyright (c) 2026, Salesforce, Inc.,
// All rights reserved.
// For full license text, see the LICENSE.txt file

use pdk_core::log::debug;
use pdk_core::policy_context::api::Tier;
use serde::{Deserialize, Serialize};
use std::cmp::max;

use super::super::distribution_formula::DistributionFormula;
use super::{LimitStats, QuotaInfo, RequestAllowed};

/// Cluster bucket for the rate limiting.
///
/// Holds one [`ClusterLimit`] per configured tier. A request is only allowed
/// when every limit in the bucket can serve it (see
/// [`ClusterBucket::request_allowed`]).
#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct ClusterBucket {
    // One entry per tier, in the same order as the tiers passed to `new`.
    limits: Vec<ClusterLimit>,
}

impl ClusterBucket {
    /// Create a new cluster bucket with the specified start time and tiers.
    pub fn new(start_time: u128, tiers: &[Tier]) -> Self {
        Self {
            limits: tiers
                .iter()
                .map(|tier| ClusterLimit::from(start_time, tier))
                .collect(),
        }
    }

    /// To use in case of the bucket being used in cluster mode
    pub fn request_allowed(&mut self, now: u128, amount: usize) -> RequestAllowed {
        self.set_time(now);

        if self.limits.iter().all(|l| l.remaining(amount)) {
            self.limits.iter_mut().for_each(|l| l.allow_request(amount));
            RequestAllowed::Allowed
        } else if self.limits.iter().all(|l| l.cluster_remaining(amount)) {
            RequestAllowed::OutOfLocalQuota
        } else {
            RequestAllowed::OutOfQuota
        }
    }

    /// Get the status of the cluster bucket.
    pub fn status(&self) -> Option<LimitStats> {
        self.limits
            .iter()
            .map(ClusterLimit::stats)
            .reduce(LimitStats::most_restrictive)
    }

    /// To use in the primary bucket to get quota.
    pub fn get_quota(
        &mut self,
        now: u128,
        bucket: &ClusterBucket,
        formula: &DistributionFormula,
        amount: usize,
    ) -> Vec<QuotaInfo> {
        self.set_time(now);

        let mut quota_info = Vec::new();
        for (cluster, local) in self.limits.iter_mut().zip(bucket.limits.iter()) {
            if local.available >= amount as u64 && local.reset == cluster.reset {
                quota_info.push(QuotaInfo::new(0, cluster.cluster_remaining, cluster.reset))
            } else {
                quota_info.push(cluster.get_quota_portion(max(
                    formula.portion(cluster),
                    amount as u64 - local.available,
                )));
            }
        }
        quota_info
    }

    /// To use in the secondary bucket to get quota.
    pub fn update_quota(&mut self, quota: &[QuotaInfo]) {
        //TODO: this will misbehave if the sizes of the vecs are different
        for (local, quota) in self.limits.iter_mut().zip(quota.iter()) {
            local.update_quota(quota)
        }
    }

    fn set_time(&mut self, now: u128) {
        self.limits.iter_mut().for_each(|limit| limit.set_time(now));
    }
}

/// Cluster limit for the rate limiting.
///
/// Tracks the quota of a single tier for the current time window: how much
/// is held locally, how much is still unassigned in the cluster, and when the
/// window ends.
#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct ClusterLimit {
    /// Amount of requests available in the current period.
    ///
    /// This is the quota requests are served from; it starts at zero and is
    /// set back to zero whenever the window rolls over.
    pub available: u64,
    /// Amount of requests remaining in the cluster.
    ///
    /// Restored to `max_requests` whenever the window rolls over.
    pub cluster_remaining: u64,
    /// Maximum requests allowed in a single period.
    pub max_requests: u64,
    /// Length of the time window in milliseconds.
    pub period: u128,
    /// Timestamp when the rate limit will reset.
    ///
    /// Advanced by `period` each time the window rolls over.
    pub reset: u128,
}

impl ClusterLimit {
    /// Builds the limit for `tier`, with its first window ending one tier
    /// period after `start_time`.
    ///
    /// No quota is available locally at first; the whole tier allowance is
    /// still remaining in the cluster.
    fn from(start_time: u128, tier: &Tier) -> Self {
        let period = tier.period_in_millis as u128;
        Self {
            available: 0,
            cluster_remaining: tier.requests,
            max_requests: tier.requests,
            period,
            reset: start_time + period,
        }
    }

    /// Reports the total quota still usable (local plus cluster), the
    /// per-period maximum and the reset timestamp.
    fn stats(&self) -> LimitStats {
        LimitStats::new(
            self.available + self.cluster_remaining,
            self.max_requests,
            self.reset,
        )
    }

    /// Rolls the window forward so that `reset` lies strictly after `now`.
    ///
    /// Does nothing while the current window is still open. Once the window
    /// has ended, the quota is refreshed (`available` to zero,
    /// `cluster_remaining` to `max_requests`) and `reset` is advanced by as
    /// many whole periods as needed.
    fn set_time(&mut self, now: u128) {
        if self.reset > now {
            return;
        }

        // Jump over every elapsed window at once instead of stepping one
        // period per iteration. A zero-length period can never move `reset`
        // past `now`, so it is left untouched rather than looping forever.
        if self.period > 0 {
            let elapsed_windows = (now - self.reset) / self.period + 1;
            self.reset += elapsed_windows * self.period;
        }

        self.available = 0;
        self.cluster_remaining = self.max_requests;
    }

    /// Applies a quota grant to this limit.
    ///
    /// A grant for the same window (equal `reset`) is added to the quota
    /// already held. A grant for a different window replaces it and adopts
    /// the grant's reset timestamp. The cluster remainder is always taken
    /// from the grant.
    fn update_quota(&mut self, quota: &QuotaInfo) {
        if quota.reset == self.reset {
            self.available += quota.given;
        } else {
            self.available = quota.given;
            self.reset = quota.reset;
        }
        self.cluster_remaining = quota.remaining;
    }

    /// Whether the locally held quota covers `amount`.
    fn remaining(&self, amount: usize) -> bool {
        self.available >= amount as u64
    }

    /// Whether local quota plus the quota remaining in the cluster covers
    /// `amount`.
    fn cluster_remaining(&self, amount: usize) -> bool {
        self.available + self.cluster_remaining >= amount as u64
    }

    /// Consumes `amount` from the locally held quota.
    ///
    /// Callers must have checked [`ClusterLimit::remaining`] first; the
    /// subtraction is not guarded against underflow.
    fn allow_request(&mut self, amount: usize) {
        self.available -= amount as u64;
    }

    /// Takes up to `portion` out of the cluster remainder and returns it as
    /// a grant.
    ///
    /// The grant is capped by what is left, and is at least one whenever
    /// anything is left, so a zero portion still makes progress.
    fn get_quota_portion(&mut self, portion: u64) -> QuotaInfo {
        let mut given = portion.min(self.cluster_remaining);
        if given == 0 && self.cluster_remaining > 0 {
            given = 1;
        }

        self.cluster_remaining -= given;
        debug!(
            "Portion requested: {}, given: {}, remaining: {}",
            portion, given, self.cluster_remaining
        );
        QuotaInfo::new(given, self.cluster_remaining, self.reset)
    }
}