throttlecrab_server/actor.rs
1//! Actor-based rate limiter for shared state management
2//!
3//! This module implements an actor pattern to ensure thread-safe access to the
4//! rate limiter state. All transports communicate with a single actor instance,
5//! guaranteeing consistent rate limiting across protocols.
6//!
7//! # Architecture
8//!
9//! The actor pattern provides:
10//! - **Thread Safety**: Single-threaded access to mutable state
11//! - **Async Communication**: Non-blocking message passing via channels
12//! - **Protocol Independence**: All transports use the same interface
13//!
14//! # Example
15//!
16//! ```ignore
//! // Spawn an actor with an adaptive store; `metrics` is an `Arc<Metrics>`
//! let limiter = RateLimiterActor::spawn_adaptive(10000, AdaptiveStore::new(), metrics);
19//!
20//! // Use the handle from any transport
21//! let response = limiter.throttle(request).await?;
22//! ```
23
24use crate::metrics::Metrics;
25use crate::types::{ThrottleRequest, ThrottleResponse};
26use anyhow::Result;
27use std::sync::Arc;
28use throttlecrab::{AdaptiveStore, CellError, PeriodicStore, ProbabilisticStore, RateLimiter};
29use tokio::sync::{mpsc, oneshot};
30
31/// Message types for the rate limiter actor
32///
33/// Currently supports throttle requests, but can be extended with
34/// additional message types like statistics queries or cache clearing.
35pub enum RateLimiterMessage {
36 /// Check rate limit for a key
37 Throttle {
38 /// The rate limit request
39 request: ThrottleRequest,
40 /// Channel to send the response back
41 response_tx: oneshot::Sender<Result<ThrottleResponse>>,
42 },
43 // Future: Stats, Clear, Shutdown, etc.
44}
45
46/// Handle to communicate with the rate limiter actor
47///
48/// This handle can be cloned and shared across multiple tasks/threads.
49/// All operations are async and non-blocking.
50#[derive(Clone)]
51pub struct RateLimiterHandle {
52 tx: mpsc::Sender<RateLimiterMessage>,
53 #[allow(dead_code)] // Will be used for future metrics queries
54 pub metrics: Arc<Metrics>,
55}
56
57impl RateLimiterHandle {
58 /// Check rate limit for a key
59 ///
60 /// Sends a throttle request to the actor and waits for the response.
61 /// This method is cancel-safe and can be used in select! expressions.
62 ///
63 /// # Errors
64 ///
65 /// Returns an error if:
66 /// - The actor has shut down
67 /// - The response channel was dropped
68 pub async fn throttle(&self, request: ThrottleRequest) -> Result<ThrottleResponse> {
69 let (response_tx, response_rx) = oneshot::channel();
70
71 self.tx
72 .send(RateLimiterMessage::Throttle {
73 request,
74 response_tx,
75 })
76 .await
77 .map_err(|_| anyhow::anyhow!("Rate limiter actor has shut down"))?;
78
79 response_rx
80 .await
81 .map_err(|_| anyhow::anyhow!("Rate limiter actor dropped response channel"))?
82 }
83}
84
85/// The rate limiter actor factory
86///
87/// Provides static methods to spawn rate limiter actors with different store types.
88/// Each actor runs in its own Tokio task and processes messages sequentially.
89pub struct RateLimiterActor;
90
91impl RateLimiterActor {
92 /// Spawn a new rate limiter actor with a periodic store
93 ///
94 /// # Parameters
95 ///
96 /// - `buffer_size`: Channel buffer size for backpressure control
97 /// - `store`: The periodic store instance to use
98 ///
99 /// # Returns
100 ///
101 /// A [`RateLimiterHandle`] for communicating with the actor
102 pub fn spawn_periodic(
103 buffer_size: usize,
104 store: PeriodicStore,
105 metrics: Arc<Metrics>,
106 ) -> RateLimiterHandle {
107 let (tx, rx) = mpsc::channel(buffer_size);
108 let metrics_clone = Arc::clone(&metrics);
109
110 tokio::spawn(async move {
111 let store_type = StoreType::Periodic(RateLimiter::new(store));
112 run_actor(rx, store_type, metrics_clone).await;
113 });
114
115 RateLimiterHandle { tx, metrics }
116 }
117
118 /// Spawn a new rate limiter actor with a probabilistic store
119 ///
120 /// # Parameters
121 ///
122 /// - `buffer_size`: Channel buffer size for backpressure control
123 /// - `store`: The probabilistic store instance to use
124 ///
125 /// # Returns
126 ///
127 /// A [`RateLimiterHandle`] for communicating with the actor
128 pub fn spawn_probabilistic(
129 buffer_size: usize,
130 store: ProbabilisticStore,
131 metrics: Arc<Metrics>,
132 ) -> RateLimiterHandle {
133 let (tx, rx) = mpsc::channel(buffer_size);
134 let metrics_clone = Arc::clone(&metrics);
135
136 tokio::spawn(async move {
137 let store_type = StoreType::Probabilistic(RateLimiter::new(store));
138 run_actor(rx, store_type, metrics_clone).await;
139 });
140
141 RateLimiterHandle { tx, metrics }
142 }
143
144 /// Spawn a new rate limiter actor with an adaptive store
145 ///
146 /// # Parameters
147 ///
148 /// - `buffer_size`: Channel buffer size for backpressure control
149 /// - `store`: The adaptive store instance to use
150 ///
151 /// # Returns
152 ///
153 /// A [`RateLimiterHandle`] for communicating with the actor
154 pub fn spawn_adaptive(
155 buffer_size: usize,
156 store: AdaptiveStore,
157 metrics: Arc<Metrics>,
158 ) -> RateLimiterHandle {
159 let (tx, rx) = mpsc::channel(buffer_size);
160 let metrics_clone = Arc::clone(&metrics);
161
162 tokio::spawn(async move {
163 let store_type = StoreType::Adaptive(RateLimiter::new(store));
164 run_actor(rx, store_type, metrics_clone).await;
165 });
166
167 RateLimiterHandle { tx, metrics }
168 }
169}
170
171/// Internal enum to handle different store types
172enum StoreType {
173 Periodic(RateLimiter<PeriodicStore>),
174 Probabilistic(RateLimiter<ProbabilisticStore>),
175 Adaptive(RateLimiter<AdaptiveStore>),
176}
177
178impl StoreType {
179 fn rate_limit(
180 &mut self,
181 key: &str,
182 max_burst: i64,
183 count_per_period: i64,
184 period: i64,
185 quantity: i64,
186 timestamp: std::time::SystemTime,
187 ) -> Result<(bool, throttlecrab::RateLimitResult), CellError> {
188 match self {
189 StoreType::Periodic(limiter) => limiter.rate_limit(
190 key,
191 max_burst,
192 count_per_period,
193 period,
194 quantity,
195 timestamp,
196 ),
197 StoreType::Probabilistic(limiter) => limiter.rate_limit(
198 key,
199 max_burst,
200 count_per_period,
201 period,
202 quantity,
203 timestamp,
204 ),
205 StoreType::Adaptive(limiter) => limiter.rate_limit(
206 key,
207 max_burst,
208 count_per_period,
209 period,
210 quantity,
211 timestamp,
212 ),
213 }
214 }
215}
216
217async fn run_actor(
218 mut rx: mpsc::Receiver<RateLimiterMessage>,
219 mut store_type: StoreType,
220 _metrics: Arc<Metrics>,
221) {
222 while let Some(msg) = rx.recv().await {
223 match msg {
224 RateLimiterMessage::Throttle {
225 request,
226 response_tx,
227 } => {
228 let response = handle_throttle(&mut store_type, request);
229 // Ignore send errors - receiver may have timed out
230 let _ = response_tx.send(response);
231 }
232 }
233 }
234
235 tracing::info!("Rate limiter actor shutting down");
236}
237
238fn handle_throttle(
239 store_type: &mut StoreType,
240 request: ThrottleRequest,
241) -> Result<ThrottleResponse> {
242 // Check the rate limit
243 let (allowed, result) = store_type
244 .rate_limit(
245 &request.key,
246 request.max_burst,
247 request.count_per_period,
248 request.period,
249 request.quantity,
250 request.timestamp,
251 )
252 .map_err(|e| anyhow::anyhow!("Rate limit check failed: {}", e))?;
253
254 Ok(ThrottleResponse::from((allowed, result)))
255}