pdk_rate_limit_lib/builder.rs
1// Copyright (c) 2026, Salesforce, Inc.,
2// All rights reserved.
3// For full license text, see the LICENSE.txt file
4
5use data_storage_lib::ll::distributed::{
6 DistributedStorageClient, DistributedStorageClientExtractionError,
7};
8use data_storage_lib::ll::local::SharedData;
9use lock_lib::LockBuilder;
10use std::collections::hash_map::DefaultHasher;
11use std::convert::Infallible;
12use std::hash::{Hash, Hasher};
13use std::rc::Rc;
14use thiserror::Error;
15
16use crate::bucket::BucketFactory;
17use crate::distribution_formula::DistributionFormula;
18use crate::key_manager::KeyManagerFactory;
19use crate::RateLimitInstance;
20use pdk_core::classy::extract::context::ConfigureContext;
21use pdk_core::classy::extract::{Extract, FromContext};
22use pdk_core::classy::timer::Timer;
23use pdk_core::classy::Clock;
24use pdk_core::log::debug;
25use pdk_core::policy_context::api::{Metadata, Tier};
26
/// Version tag embedded in the store prefix that `RateLimitBuilder::from_context`
/// assembles (`rl-{RL_VERSION}-{policy_name}-{policy_namespace}`).
// NOTE(review): presumably bumped when the stored layout changes so that old
// and new entries do not share a store — confirm with the storage owners.
const RL_VERSION: &str = "v1";
28
/// Builder pattern implementation for creating rate limit instances.
///
/// A `RateLimitBuilder` is obtained from a [`ConfigureContext`] through its
/// [`FromContext`] implementation, which gathers the clock, shared data, lock
/// builder, policy metadata and, when available, the distributed storage client.
/// Every call to [`RateLimitBuilder::new`] hands clones of those dependencies to
/// a fresh [`RateLimitBuilderInstance`], on which custom buckets, clustered mode
/// or shared mode can be selected before building the final rate limiter.
pub struct RateLimitBuilder {
    /// Store prefix, built as `rl-{RL_VERSION}-{policy_name}-{policy_namespace}`.
    prefix: String,
    /// Clock extracted from the configuration context.
    clock: Rc<dyn Clock>,
    /// Local shared data extracted from the configuration context.
    shared_data: Rc<SharedData>,
    /// Lock builder extracted from the configuration context.
    lock: Rc<LockBuilder>,
    /// Default buckets as `(SLA id, tiers)` pairs taken from the API metadata;
    /// empty when the metadata carries no SLAs.
    buckets: Vec<(String, Vec<Tier>)>,
    /// Outcome of extracting the distributed storage client. A failure is kept
    /// here instead of being propagated, so it can be reported later if an
    /// instance is built in clustered mode.
    distributed_storage:
        Result<Rc<DistributedStorageClient>, Rc<DistributedStorageClientExtractionError>>,
}
44
45impl FromContext<ConfigureContext> for RateLimitBuilder {
46 type Error = Infallible;
47
48 fn from_context(context: &ConfigureContext) -> Result<Self, Self::Error> {
49 let shared_data: SharedData = context.extract()?;
50 let clock: Rc<dyn Clock> = context.extract()?;
51 let metadata: Metadata = context.extract()?;
52 let lock: LockBuilder = context.extract()?;
53 let distributed_storage: Result<
54 DistributedStorageClient,
55 DistributedStorageClientExtractionError,
56 > = context.extract();
57
58 let buckets = metadata
59 .api_metadata
60 .slas
61 .unwrap_or_default()
62 .into_iter()
63 .map(|sla| (sla.id, sla.tiers))
64 .collect();
65
66 Ok(RateLimitBuilder {
67 prefix: format!(
68 "rl-{RL_VERSION}-{}-{}",
69 metadata.policy_metadata.policy_name, metadata.policy_metadata.policy_namespace
70 ),
71 clock,
72 shared_data: Rc::new(shared_data),
73 lock: Rc::new(lock),
74 buckets,
75 distributed_storage: distributed_storage.map(Rc::new).map_err(Rc::new),
76 })
77 }
78}
79
80impl RateLimitBuilder {
81 /// Creates a new builder instance for configuring a rate limiter.
82 ///
83 /// # Arguments
84 ///
85 /// * `builder_id` - A string identifier for the rate limit instance.
86 ///
87 /// # Returns
88 ///
89 /// A new [`RateLimitBuilderInstance`] configured in local mode by default.
90 /// It allows for further configuration before building the final RateLimitInstance.
91 /// The returned instance inherits all the configuration and dependencies from this builder.
92 ///
93 /// Additional methods provided:
94 /// - [`buckets`][RateLimitBuilderInstance::buckets]: Allows custom quotas configuration as per-policy, per-SLA, or per-user grouping.
95 /// - [`clustered`][RateLimitBuilderInstance::clustered]: Enables distributed, cross-node rate limiting by invoking `.clustered(timer)` before `.build()`.
96 /// This requires a distributed storage client and configures the limiter for quota sharing across a cluster.
97 /// - [`shared`][RateLimitBuilderInstance::shared]: Enables sharing the rate limit across different policy instances
98 /// - [`build`][RateLimitBuilderInstance::build]: Creates a new [`RateLimitInstance`].
99 ///
100 /// # Example
101 ///
102 /// ```rust
103 /// // Local mode (single node, in-memory):
104 /// let local_limiter = builder
105 /// .new("unique-id".to_string())
106 /// .build()?;
107 ///
108 /// // Cluster mode (multi-node, distributed quota):
109 /// let custom_buckets = vec![
110 /// (
111 /// "default".to_string(),
112 /// vec![
113 /// Tier {
114 /// period_in_millis: 60000,
115 /// requests: 120,
116 /// },
117 /// ],
118 /// ),
119 /// ];
120 ///
121 /// let timer = clock.period(Duration::from_secs(5));
122 ///
123 /// let cluster_limiter = builder
124 /// .new("unique-id".to_string())
125 /// .buckets(custom_buckets)
126 /// .clustered(timer)
127 /// .build()?;
128 ///
129 /// // Shared mode (across policy instances):
130 /// let shared_limiter = builder
131 /// .new("unique-id".to_string())
132 /// .shared()
133 /// .build()?;
134 /// ```
135 #[allow(clippy::new_ret_no_self)]
136 pub fn new(&self, builder_id: String) -> RateLimitBuilderInstance {
137 RateLimitBuilderInstance {
138 prefix: self.prefix.clone(),
139 clock: Rc::clone(&self.clock),
140 shared_data: Rc::clone(&self.shared_data),
141 buckets: self.buckets.clone(),
142 distributed_storage: self.distributed_storage.clone(),
143 timer: None,
144 lock: Rc::clone(&self.lock),
145 builder_id,
146 }
147 }
148}
149
/// Errors that can occur during rate limit instance building.
///
/// Returned by `RateLimitBuilderInstance::build`. The enum is marked
/// `#[non_exhaustive]`, so code matching on it from another crate needs a
/// wildcard arm.
#[non_exhaustive]
#[derive(Error, Debug)]
pub enum RateLimitBuildError {
    /// The rate limiter was configured to operate in cluster mode (via the
    /// `clustered()` method) but the distributed storage client could not be
    /// extracted when the builder was created from the context.
    ///
    /// The payload is the extraction error captured at that point; it is
    /// appended to the error message.
    #[error("Rate limit instance was configured to work on cluster mode but storage client couldn't be extracted from context. {0}")]
    MissingStorageClient(Rc<DistributedStorageClientExtractionError>),
}
164
/// A configurable instance of a rate limit builder.
///
/// Created by `RateLimitBuilder::new`, this struct holds the settings for one
/// rate limiter. It can be adjusted through its `buckets`, `clustered` and
/// `shared` methods before `build` turns it into the final `RateLimitInstance`.
pub struct RateLimitBuilderInstance {
    /// First component of the store name; copied from the parent builder and
    /// replaced with `shared-rl` when `shared` is called.
    prefix: String,
    /// Clock handed to the built instance.
    clock: Rc<dyn Clock>,
    /// Local shared data handed to the built instance.
    shared_data: Rc<SharedData>,
    /// Outcome of the distributed storage client extraction performed when the
    /// parent builder was created; only inspected by `build` in clustered mode.
    distributed_storage:
        Result<Rc<DistributedStorageClient>, Rc<DistributedStorageClientExtractionError>>,
    /// `Some` once `clustered` has been called; its presence is what `build`
    /// treats as clustered mode.
    timer: Option<Rc<Timer>>,
    /// `(group id, tiers)` pairs; the parent builder's defaults unless replaced
    /// through `buckets`.
    buckets: Vec<(String, Vec<Tier>)>,
    /// Lock builder handed to the built instance.
    lock: Rc<LockBuilder>,
    /// Caller-supplied identifier; second component of the store name.
    builder_id: String,
}
180
181impl RateLimitBuilderInstance {
182 /// Configures custom rate limiting buckets for this instance.
183 ///
184 /// This method allows overriding the default buckets extracted from the metadata
185 /// with custom bucket configurations. Each bucket is defined by a string identifier
186 /// and a vector of tiers that specify the rate limiting rules.
187 ///
188 /// # Arguments
189 ///
190 /// * `buckets` - A vector of tuples where each tuple contains:
191 /// - A string identifier for the bucket group
192 /// - A vector of `Tier` objects defining the rate limiting rules
193 ///
194 /// # Returns
195 ///
196 /// Returns `self` for method chaining.
197 pub fn buckets(mut self, buckets: Vec<(String, Vec<Tier>)>) -> Self {
198 self.buckets = buckets;
199 self
200 }
201
202 /// Enables clustered mode for distributed rate limiting.
203 ///
204 /// When clustered mode is enabled, the rate limiter will coordinate across
205 /// multiple instances using distributed storage. This requires a timer for
206 /// synchronization and a distributed storage client to be available.
207 ///
208 /// # Arguments
209 ///
210 /// * `timer` - A shared reference to a Timer used for synchronization
211 /// in the distributed environment
212 ///
213 /// # Returns
214 ///
215 /// Returns `self` for method chaining.
216 pub fn clustered(mut self, timer: Rc<Timer>) -> Self {
217 self.timer = Some(timer);
218 self
219 }
220
221 /// Allows sharing the rate limit across different policy instances
222 ///
223 /// When shared mode is enabled:
224 /// - The prefix is fixed to ensure all instances share the same store
225 /// - The user must explicitly choose between local or clustered mode
226 /// - For clustered mode, call .clustered(timer) after .shared()
227 pub fn shared(mut self) -> Self {
228 self.prefix = "shared-rl".to_string();
229 self
230 }
231
232 fn hash_buckets(&self) -> String {
233 let mut hasher = DefaultHasher::new();
234 for (group, tier) in self.buckets.iter() {
235 group.hash(&mut hasher);
236 for t in tier.iter() {
237 t.requests.hash(&mut hasher);
238 t.period_in_millis.hash(&mut hasher);
239 }
240 }
241 hasher.finish().to_string()
242 }
243
244 /// Builds the final RateLimitInstance with the configured options.
245 ///
246 /// This method constructs and returns a fully configured `RateLimitInstance`
247 /// based on all the settings and dependencies accumulated in this builder.
248 /// It performs validation to ensure that clustered mode has the necessary
249 /// distributed storage client available.
250 ///
251 /// The method creates:
252 /// - A unique storage key based on the prefix and bucket hash
253 /// - A key manager for handling rate limit keys
254 /// - A bucket factory for managing rate limit buckets
255 /// - A distribution formula for load balancing (if clustered)
256 ///
257 /// # Returns
258 ///
259 /// * `Ok(RateLimitInstance)` - A fully configured rate limiter instance
260 /// * `Err(RateLimitBuildError)` - If any error occurs during the build process
261 pub fn build(self) -> Result<RateLimitInstance, RateLimitBuildError> {
262 let store = format!(
263 "{}-{}-{}",
264 self.prefix,
265 self.builder_id,
266 self.hash_buckets()
267 );
268 debug!("Building RateLimitInstance with store `{store}`");
269 let key_manager = KeyManagerFactory::new(&store, self.timer.is_some());
270 let bucket = BucketFactory::new(self.timer.is_some(), self.buckets);
271
272 let cluster = if self.timer.is_some() {
273 match self.distributed_storage {
274 Err(e) => return Err(RateLimitBuildError::MissingStorageClient(e)),
275 Ok(storage) => Some(storage),
276 }
277 } else {
278 None
279 };
280
281 let formula = DistributionFormula::RemainingPercentage(0.1f32);
282
283 debug!("Building RateLimitInstance with store `{store}` and limits `{bucket:?}`");
284 Ok(RateLimitInstance::new(
285 store,
286 key_manager,
287 bucket,
288 formula,
289 self.clock,
290 self.shared_data,
291 cluster,
292 self.timer,
293 self.lock,
294 ))
295 }
296}