Skip to main content

pdk_rate_limit_lib/
builder.rs

1// Copyright (c) 2026, Salesforce, Inc.,
2// All rights reserved.
3// For full license text, see the LICENSE.txt file
4
5use data_storage_lib::ll::distributed::{
6    DistributedStorageClient, DistributedStorageClientExtractionError,
7};
8use data_storage_lib::ll::local::SharedData;
9use lock_lib::LockBuilder;
10use std::collections::hash_map::DefaultHasher;
11use std::convert::Infallible;
12use std::hash::{Hash, Hasher};
13use std::rc::Rc;
14use thiserror::Error;
15
16use crate::bucket::BucketFactory;
17use crate::distribution_formula::DistributionFormula;
18use crate::key_manager::KeyManagerFactory;
19use crate::RateLimitInstance;
20use pdk_core::classy::extract::context::ConfigureContext;
21use pdk_core::classy::extract::{Extract, FromContext};
22use pdk_core::classy::timer::Timer;
23use pdk_core::classy::Clock;
24use pdk_core::log::debug;
25use pdk_core::policy_context::api::{Metadata, Tier};
26
// Version tag embedded in the store-name prefix (`rl-<RL_VERSION>-<policy name>-<policy namespace>`).
// NOTE(review): presumably bumped when the stored data layout changes — confirm.
const RL_VERSION: &str = "v1";
28
29/// Builder pattern implementation for creating rate limit instances.
30///
31/// This struct provides a convenient way to configure and create rate limit instances
32/// with various options such as clustering, custom buckets, and distributed storage.
33/// It extracts necessary dependencies from the configuration context and maintains
34/// shared references to components like clocks, storage, and locks.
35pub struct RateLimitBuilder {
36    prefix: String,
37    clock: Rc<dyn Clock>,
38    shared_data: Rc<SharedData>,
39    lock: Rc<LockBuilder>,
40    buckets: Vec<(String, Vec<Tier>)>,
41    distributed_storage:
42        Result<Rc<DistributedStorageClient>, Rc<DistributedStorageClientExtractionError>>,
43}
44
45impl FromContext<ConfigureContext> for RateLimitBuilder {
46    type Error = Infallible;
47
48    fn from_context(context: &ConfigureContext) -> Result<Self, Self::Error> {
49        let shared_data: SharedData = context.extract()?;
50        let clock: Rc<dyn Clock> = context.extract()?;
51        let metadata: Metadata = context.extract()?;
52        let lock: LockBuilder = context.extract()?;
53        let distributed_storage: Result<
54            DistributedStorageClient,
55            DistributedStorageClientExtractionError,
56        > = context.extract();
57
58        let buckets = metadata
59            .api_metadata
60            .slas
61            .unwrap_or_default()
62            .into_iter()
63            .map(|sla| (sla.id, sla.tiers))
64            .collect();
65
66        Ok(RateLimitBuilder {
67            prefix: format!(
68                "rl-{RL_VERSION}-{}-{}",
69                metadata.policy_metadata.policy_name, metadata.policy_metadata.policy_namespace
70            ),
71            clock,
72            shared_data: Rc::new(shared_data),
73            lock: Rc::new(lock),
74            buckets,
75            distributed_storage: distributed_storage.map(Rc::new).map_err(Rc::new),
76        })
77    }
78}
79
80impl RateLimitBuilder {
81    /// Creates a new builder instance for configuring a rate limiter.
82    ///
83    /// # Arguments
84    ///
85    /// * `builder_id` - A string identifier for the rate limit instance.
86    ///
87    /// # Returns
88    ///
89    /// A new [`RateLimitBuilderInstance`] configured in local mode by default.
90    /// It allows for further configuration before building the final RateLimitInstance.
91    /// The returned instance inherits all the configuration and dependencies from this builder.
92    ///
93    /// Additional methods provided:
94    /// - [`buckets`][RateLimitBuilderInstance::buckets]: Allows custom quotas configuration as per-policy, per-SLA, or per-user grouping.
95    /// - [`clustered`][RateLimitBuilderInstance::clustered]: Enables distributed, cross-node rate limiting by invoking `.clustered(timer)` before `.build()`.
96    ///   This requires a distributed storage client and configures the limiter for quota sharing across a cluster.
97    /// - [`shared`][RateLimitBuilderInstance::shared]: Enables sharing the rate limit across different policy instances
98    /// - [`build`][RateLimitBuilderInstance::build]: Creates a new [`RateLimitInstance`].
99    ///
100    /// # Example
101    ///
102    /// ```rust
103    /// // Local mode (single node, in-memory):
104    /// let local_limiter = builder
105    ///     .new("unique-id".to_string())
106    ///     .build()?;
107    ///
108    /// // Cluster mode (multi-node, distributed quota):
109    /// let custom_buckets = vec![
110    ///     (
111    ///         "default".to_string(),
112    ///         vec![
113    ///             Tier {
114    ///                 period_in_millis: 60000,
115    ///                 requests: 120,
116    ///             },
117    ///         ],
118    ///     ),
119    /// ];
120    ///
121    /// let timer = clock.period(Duration::from_secs(5));
122    ///
123    /// let cluster_limiter = builder
124    ///     .new("unique-id".to_string())
125    ///     .buckets(custom_buckets)
126    ///     .clustered(timer)
127    ///     .build()?;
128    ///
129    /// // Shared mode (across policy instances):
130    /// let shared_limiter = builder
131    ///     .new("unique-id".to_string())
132    ///     .shared()
133    ///     .build()?;
134    /// ```
135    #[allow(clippy::new_ret_no_self)]
136    pub fn new(&self, builder_id: String) -> RateLimitBuilderInstance {
137        RateLimitBuilderInstance {
138            prefix: self.prefix.clone(),
139            clock: Rc::clone(&self.clock),
140            shared_data: Rc::clone(&self.shared_data),
141            buckets: self.buckets.clone(),
142            distributed_storage: self.distributed_storage.clone(),
143            timer: None,
144            lock: Rc::clone(&self.lock),
145            builder_id,
146        }
147    }
148}
149
150/// Errors that can occur during rate limit instance building.
151///
152/// This enum represents the various error conditions that can occur when
153/// building a rate limit instance, particularly related to distributed
154/// storage configuration issues.
155#[non_exhaustive]
156#[derive(Error, Debug)]
157pub enum RateLimitBuildError {
158    /// This error occurs when the rate limiter is configured to operate in
159    /// cluster mode (via the `clustered()` method) but the distributed storage
160    /// client extraction failed during the builder initialization.
161    #[error("Rate limit instance was configured to work on cluster mode but storage client couldn't be extracted from context. {0}")]
162    MissingStorageClient(Rc<DistributedStorageClientExtractionError>),
163}
164
165/// A configurable instance of a rate limit builder.
166///
167/// This struct represents a particular instance of a rate limit builder that can be
168/// further configured before building the final `RateLimitInstance`.
169pub struct RateLimitBuilderInstance {
170    prefix: String,
171    clock: Rc<dyn Clock>,
172    shared_data: Rc<SharedData>,
173    distributed_storage:
174        Result<Rc<DistributedStorageClient>, Rc<DistributedStorageClientExtractionError>>,
175    timer: Option<Rc<Timer>>,
176    buckets: Vec<(String, Vec<Tier>)>,
177    lock: Rc<LockBuilder>,
178    builder_id: String,
179}
180
181impl RateLimitBuilderInstance {
182    /// Configures custom rate limiting buckets for this instance.
183    ///
184    /// This method allows overriding the default buckets extracted from the metadata
185    /// with custom bucket configurations. Each bucket is defined by a string identifier
186    /// and a vector of tiers that specify the rate limiting rules.
187    ///
188    /// # Arguments
189    ///
190    /// * `buckets` - A vector of tuples where each tuple contains:
191    ///   - A string identifier for the bucket group
192    ///   - A vector of `Tier` objects defining the rate limiting rules
193    ///
194    /// # Returns
195    ///
196    /// Returns `self` for method chaining.
197    pub fn buckets(mut self, buckets: Vec<(String, Vec<Tier>)>) -> Self {
198        self.buckets = buckets;
199        self
200    }
201
202    /// Enables clustered mode for distributed rate limiting.
203    ///
204    /// When clustered mode is enabled, the rate limiter will coordinate across
205    /// multiple instances using distributed storage. This requires a timer for
206    /// synchronization and a distributed storage client to be available.
207    ///
208    /// # Arguments
209    ///
210    /// * `timer` - A shared reference to a Timer used for synchronization
211    ///   in the distributed environment
212    ///
213    /// # Returns
214    ///
215    /// Returns `self` for method chaining.
216    pub fn clustered(mut self, timer: Rc<Timer>) -> Self {
217        self.timer = Some(timer);
218        self
219    }
220
221    /// Allows sharing the rate limit across different policy instances
222    ///
223    /// When shared mode is enabled:
224    /// - The prefix is fixed to ensure all instances share the same store
225    /// - The user must explicitly choose between local or clustered mode
226    /// - For clustered mode, call .clustered(timer) after .shared()
227    pub fn shared(mut self) -> Self {
228        self.prefix = "shared-rl".to_string();
229        self
230    }
231
232    fn hash_buckets(&self) -> String {
233        let mut hasher = DefaultHasher::new();
234        for (group, tier) in self.buckets.iter() {
235            group.hash(&mut hasher);
236            for t in tier.iter() {
237                t.requests.hash(&mut hasher);
238                t.period_in_millis.hash(&mut hasher);
239            }
240        }
241        hasher.finish().to_string()
242    }
243
244    /// Builds the final RateLimitInstance with the configured options.
245    ///
246    /// This method constructs and returns a fully configured `RateLimitInstance`
247    /// based on all the settings and dependencies accumulated in this builder.
248    /// It performs validation to ensure that clustered mode has the necessary
249    /// distributed storage client available.
250    ///
251    /// The method creates:
252    /// - A unique storage key based on the prefix and bucket hash
253    /// - A key manager for handling rate limit keys
254    /// - A bucket factory for managing rate limit buckets
255    /// - A distribution formula for load balancing (if clustered)
256    ///
257    /// # Returns
258    ///
259    /// * `Ok(RateLimitInstance)` - A fully configured rate limiter instance
260    /// * `Err(RateLimitBuildError)` - If any error occurs during the build process
261    pub fn build(self) -> Result<RateLimitInstance, RateLimitBuildError> {
262        let store = format!(
263            "{}-{}-{}",
264            self.prefix,
265            self.builder_id,
266            self.hash_buckets()
267        );
268        debug!("Building RateLimitInstance with store `{store}`");
269        let key_manager = KeyManagerFactory::new(&store, self.timer.is_some());
270        let bucket = BucketFactory::new(self.timer.is_some(), self.buckets);
271
272        let cluster = if self.timer.is_some() {
273            match self.distributed_storage {
274                Err(e) => return Err(RateLimitBuildError::MissingStorageClient(e)),
275                Ok(storage) => Some(storage),
276            }
277        } else {
278            None
279        };
280
281        let formula = DistributionFormula::RemainingPercentage(0.1f32);
282
283        debug!("Building RateLimitInstance with store `{store}` and limits `{bucket:?}`");
284        Ok(RateLimitInstance::new(
285            store,
286            key_manager,
287            bucket,
288            formula,
289            self.clock,
290            self.shared_data,
291            cluster,
292            self.timer,
293            self.lock,
294        ))
295    }
296}