pdk-rate-limit-lib 1.7.0

PDK Rate Limit Library
Documentation
// Copyright (c) 2026, Salesforce, Inc.,
// All rights reserved.
// For full license text, see the LICENSE.txt file

use data_storage_lib::ll::distributed::{
    DistributedStorageClient, DistributedStorageClientExtractionError,
};
use data_storage_lib::ll::local::SharedData;
use lock_lib::LockBuilder;
use std::collections::hash_map::DefaultHasher;
use std::convert::Infallible;
use std::hash::{Hash, Hasher};
use std::rc::Rc;
use thiserror::Error;

use crate::bucket::BucketFactory;
use crate::distribution_formula::DistributionFormula;
use crate::key_manager::KeyManagerFactory;
use crate::RateLimitInstance;
use pdk_core::classy::extract::context::ConfigureContext;
use pdk_core::classy::extract::{Extract, FromContext};
use pdk_core::classy::timer::Timer;
use pdk_core::classy::Clock;
use pdk_core::log::debug;
use pdk_core::policy_context::api::{Metadata, Tier};

const RL_VERSION: &str = "v1";

/// Builder pattern implementation for creating rate limit instances.
///
/// This struct provides a convenient way to configure and create rate limit instances
/// with various options such as clustering, custom buckets, and distributed storage.
/// It extracts necessary dependencies from the configuration context and maintains
/// shared references to components like clocks, storage, and locks.
pub struct RateLimitBuilder {
    prefix: String,
    clock: Rc<dyn Clock>,
    shared_data: Rc<SharedData>,
    lock: Rc<LockBuilder>,
    buckets: Vec<(String, Vec<Tier>)>,
    distributed_storage:
        Result<Rc<DistributedStorageClient>, Rc<DistributedStorageClientExtractionError>>,
}

impl FromContext<ConfigureContext> for RateLimitBuilder {
    type Error = Infallible;

    fn from_context(context: &ConfigureContext) -> Result<Self, Self::Error> {
        let shared_data: SharedData = context.extract()?;
        let clock: Rc<dyn Clock> = context.extract()?;
        let metadata: Metadata = context.extract()?;
        let lock: LockBuilder = context.extract()?;
        let distributed_storage: Result<
            DistributedStorageClient,
            DistributedStorageClientExtractionError,
        > = context.extract();

        let buckets = metadata
            .api_metadata
            .slas
            .unwrap_or_default()
            .into_iter()
            .map(|sla| (sla.id, sla.tiers))
            .collect();

        Ok(RateLimitBuilder {
            prefix: format!(
                "rl-{RL_VERSION}-{}-{}",
                metadata.policy_metadata.policy_name, metadata.policy_metadata.policy_namespace
            ),
            clock,
            shared_data: Rc::new(shared_data),
            lock: Rc::new(lock),
            buckets,
            distributed_storage: distributed_storage.map(Rc::new).map_err(Rc::new),
        })
    }
}

impl RateLimitBuilder {
    /// Creates a new builder instance for configuring a rate limiter.
    ///
    /// # Arguments
    ///
    /// * `builder_id` - A string identifier for the rate limit instance.
    ///
    /// # Returns
    ///
    /// A new [`RateLimitBuilderInstance`] configured in local mode by default.
    /// It allows for further configuration before building the final RateLimitInstance.
    /// The returned instance inherits all the configuration and dependencies from this builder.
    ///
    /// Additional methods provided:
    /// - [`buckets`][RateLimitBuilderInstance::buckets]: Allows custom quotas configuration as per-policy, per-SLA, or per-user grouping.
    /// - [`clustered`][RateLimitBuilderInstance::clustered]: Enables distributed, cross-node rate limiting by invoking `.clustered(timer)` before `.build()`.
    ///   This requires a distributed storage client and configures the limiter for quota sharing across a cluster.
    /// - [`shared`][RateLimitBuilderInstance::shared]: Enables sharing the rate limit across different policy instances
    /// - [`build`][RateLimitBuilderInstance::build]: Creates a new [`RateLimitInstance`].
    ///
    /// # Example
    ///
    /// ```rust
    /// // Local mode (single node, in-memory):
    /// let local_limiter = builder
    ///     .new("unique-id".to_string())
    ///     .build()?;
    ///
    /// // Cluster mode (multi-node, distributed quota):
    /// let custom_buckets = vec![
    ///     (
    ///         "default".to_string(),
    ///         vec![
    ///             Tier {
    ///                 period_in_millis: 60000,
    ///                 requests: 120,
    ///             },
    ///         ],
    ///     ),
    /// ];
    ///
    /// let timer = clock.period(Duration::from_secs(5));
    ///
    /// let cluster_limiter = builder
    ///     .new("unique-id".to_string())
    ///     .buckets(custom_buckets)
    ///     .clustered(timer)
    ///     .build()?;
    ///
    /// // Shared mode (across policy instances):
    /// let shared_limiter = builder
    ///     .new("unique-id".to_string())
    ///     .shared()
    ///     .build()?;
    /// ```
    #[allow(clippy::new_ret_no_self)]
    pub fn new(&self, builder_id: String) -> RateLimitBuilderInstance {
        RateLimitBuilderInstance {
            prefix: self.prefix.clone(),
            clock: Rc::clone(&self.clock),
            shared_data: Rc::clone(&self.shared_data),
            buckets: self.buckets.clone(),
            distributed_storage: self.distributed_storage.clone(),
            timer: None,
            lock: Rc::clone(&self.lock),
            builder_id,
        }
    }
}

/// Errors that can occur during rate limit instance building.
///
/// This enum represents the various error conditions that can occur when
/// building a rate limit instance, particularly related to distributed
/// storage configuration issues.
#[non_exhaustive]
#[derive(Error, Debug)]
pub enum RateLimitBuildError {
    /// This error occurs when the rate limiter is configured to operate in
    /// cluster mode (via the `clustered()` method) but the distributed storage
    /// client extraction failed during the builder initialization.
    #[error("Rate limit instance was configured to work on cluster mode but storage client couldn't be extracted from context. {0}")]
    MissingStorageClient(Rc<DistributedStorageClientExtractionError>),
}

/// A configurable instance of a rate limit builder.
///
/// This struct represents a particular instance of a rate limit builder that can be
/// further configured before building the final `RateLimitInstance`.
pub struct RateLimitBuilderInstance {
    prefix: String,
    clock: Rc<dyn Clock>,
    shared_data: Rc<SharedData>,
    distributed_storage:
        Result<Rc<DistributedStorageClient>, Rc<DistributedStorageClientExtractionError>>,
    timer: Option<Rc<Timer>>,
    buckets: Vec<(String, Vec<Tier>)>,
    lock: Rc<LockBuilder>,
    builder_id: String,
}

impl RateLimitBuilderInstance {
    /// Configures custom rate limiting buckets for this instance.
    ///
    /// This method allows overriding the default buckets extracted from the metadata
    /// with custom bucket configurations. Each bucket is defined by a string identifier
    /// and a vector of tiers that specify the rate limiting rules.
    ///
    /// # Arguments
    ///
    /// * `buckets` - A vector of tuples where each tuple contains:
    ///   - A string identifier for the bucket group
    ///   - A vector of `Tier` objects defining the rate limiting rules
    ///
    /// # Returns
    ///
    /// Returns `self` for method chaining.
    pub fn buckets(mut self, buckets: Vec<(String, Vec<Tier>)>) -> Self {
        self.buckets = buckets;
        self
    }

    /// Enables clustered mode for distributed rate limiting.
    ///
    /// When clustered mode is enabled, the rate limiter will coordinate across
    /// multiple instances using distributed storage. This requires a timer for
    /// synchronization and a distributed storage client to be available.
    ///
    /// # Arguments
    ///
    /// * `timer` - A shared reference to a Timer used for synchronization
    ///   in the distributed environment
    ///
    /// # Returns
    ///
    /// Returns `self` for method chaining.
    pub fn clustered(mut self, timer: Rc<Timer>) -> Self {
        self.timer = Some(timer);
        self
    }

    /// Allows sharing the rate limit across different policy instances
    ///
    /// When shared mode is enabled:
    /// - The prefix is fixed to ensure all instances share the same store
    /// - The user must explicitly choose between local or clustered mode
    /// - For clustered mode, call .clustered(timer) after .shared()
    pub fn shared(mut self) -> Self {
        self.prefix = "shared-rl".to_string();
        self
    }

    fn hash_buckets(&self) -> String {
        let mut hasher = DefaultHasher::new();
        for (group, tier) in self.buckets.iter() {
            group.hash(&mut hasher);
            for t in tier.iter() {
                t.requests.hash(&mut hasher);
                t.period_in_millis.hash(&mut hasher);
            }
        }
        hasher.finish().to_string()
    }

    /// Builds the final RateLimitInstance with the configured options.
    ///
    /// This method constructs and returns a fully configured `RateLimitInstance`
    /// based on all the settings and dependencies accumulated in this builder.
    /// It performs validation to ensure that clustered mode has the necessary
    /// distributed storage client available.
    ///
    /// The method creates:
    /// - A unique storage key based on the prefix and bucket hash
    /// - A key manager for handling rate limit keys
    /// - A bucket factory for managing rate limit buckets
    /// - A distribution formula for load balancing (if clustered)
    ///
    /// # Returns
    ///
    /// * `Ok(RateLimitInstance)` - A fully configured rate limiter instance
    /// * `Err(RateLimitBuildError)` - If any error occurs during the build process
    pub fn build(self) -> Result<RateLimitInstance, RateLimitBuildError> {
        let store = format!(
            "{}-{}-{}",
            self.prefix,
            self.builder_id,
            self.hash_buckets()
        );
        debug!("Building RateLimitInstance with store `{store}`");
        let key_manager = KeyManagerFactory::new(&store, self.timer.is_some());
        let bucket = BucketFactory::new(self.timer.is_some(), self.buckets);

        let cluster = if self.timer.is_some() {
            match self.distributed_storage {
                Err(e) => return Err(RateLimitBuildError::MissingStorageClient(e)),
                Ok(storage) => Some(storage),
            }
        } else {
            None
        };

        let formula = DistributionFormula::RemainingPercentage(0.1f32);

        debug!("Building RateLimitInstance with store `{store}` and limits `{bucket:?}`");
        Ok(RateLimitInstance::new(
            store,
            key_manager,
            bucket,
            formula,
            self.clock,
            self.shared_data,
            cluster,
            self.timer,
            self.lock,
        ))
    }
}