throttlecrab_server/actor.rs
1//! Actor-based rate limiter for shared state management
2//!
3//! This module implements an actor pattern to ensure thread-safe access to the
4//! rate limiter state. All transports communicate with a single actor instance,
5//! guaranteeing consistent rate limiting across protocols.
6//!
7//! # Architecture
8//!
9//! The actor pattern provides:
10//! - **Thread Safety**: Single-threaded access to mutable state
11//! - **Async Communication**: Non-blocking message passing via channels
12//! - **Protocol Independence**: All transports use the same interface
13//!
14//! # Example
15//!
16//! ```ignore
17//! // Spawn an actor with an adaptive store
18//! let limiter = RateLimiterActor::spawn_adaptive(10000, AdaptiveStore::new());
19//!
20//! // Use the handle from any transport
21//! let response = limiter.throttle(request).await?;
22//! ```
23
24use crate::types::{ThrottleRequest, ThrottleResponse};
25use anyhow::Result;
26use throttlecrab::{AdaptiveStore, CellError, PeriodicStore, ProbabilisticStore, RateLimiter};
27use tokio::sync::{mpsc, oneshot};
28
29/// Message types for the rate limiter actor
30///
31/// Currently supports throttle requests, but can be extended with
32/// additional message types like statistics queries or cache clearing.
33pub enum RateLimiterMessage {
34 /// Check rate limit for a key
35 Throttle {
36 /// The rate limit request
37 request: ThrottleRequest,
38 /// Channel to send the response back
39 response_tx: oneshot::Sender<Result<ThrottleResponse>>,
40 },
41 // Future: Stats, Clear, Shutdown, etc.
42}
43
44/// Handle to communicate with the rate limiter actor
45///
46/// This handle can be cloned and shared across multiple tasks/threads.
47/// All operations are async and non-blocking.
48#[derive(Clone)]
49pub struct RateLimiterHandle {
50 tx: mpsc::Sender<RateLimiterMessage>,
51}
52
53impl RateLimiterHandle {
54 /// Check rate limit for a key
55 ///
56 /// Sends a throttle request to the actor and waits for the response.
57 /// This method is cancel-safe and can be used in select! expressions.
58 ///
59 /// # Errors
60 ///
61 /// Returns an error if:
62 /// - The actor has shut down
63 /// - The response channel was dropped
64 pub async fn throttle(&self, request: ThrottleRequest) -> Result<ThrottleResponse> {
65 let (response_tx, response_rx) = oneshot::channel();
66
67 self.tx
68 .send(RateLimiterMessage::Throttle {
69 request,
70 response_tx,
71 })
72 .await
73 .map_err(|_| anyhow::anyhow!("Rate limiter actor has shut down"))?;
74
75 response_rx
76 .await
77 .map_err(|_| anyhow::anyhow!("Rate limiter actor dropped response channel"))?
78 }
79}
80
81/// The rate limiter actor factory
82///
83/// Provides static methods to spawn rate limiter actors with different store types.
84/// Each actor runs in its own Tokio task and processes messages sequentially.
85pub struct RateLimiterActor;
86
87impl RateLimiterActor {
88 /// Spawn a new rate limiter actor with a periodic store
89 ///
90 /// # Parameters
91 ///
92 /// - `buffer_size`: Channel buffer size for backpressure control
93 /// - `store`: The periodic store instance to use
94 ///
95 /// # Returns
96 ///
97 /// A [`RateLimiterHandle`] for communicating with the actor
98 pub fn spawn_periodic(buffer_size: usize, store: PeriodicStore) -> RateLimiterHandle {
99 let (tx, rx) = mpsc::channel(buffer_size);
100
101 tokio::spawn(async move {
102 let store_type = StoreType::Periodic(RateLimiter::new(store));
103 run_actor(rx, store_type).await;
104 });
105
106 RateLimiterHandle { tx }
107 }
108
109 /// Spawn a new rate limiter actor with a probabilistic store
110 ///
111 /// # Parameters
112 ///
113 /// - `buffer_size`: Channel buffer size for backpressure control
114 /// - `store`: The probabilistic store instance to use
115 ///
116 /// # Returns
117 ///
118 /// A [`RateLimiterHandle`] for communicating with the actor
119 pub fn spawn_probabilistic(buffer_size: usize, store: ProbabilisticStore) -> RateLimiterHandle {
120 let (tx, rx) = mpsc::channel(buffer_size);
121
122 tokio::spawn(async move {
123 let store_type = StoreType::Probabilistic(RateLimiter::new(store));
124 run_actor(rx, store_type).await;
125 });
126
127 RateLimiterHandle { tx }
128 }
129
130 /// Spawn a new rate limiter actor with an adaptive store
131 ///
132 /// # Parameters
133 ///
134 /// - `buffer_size`: Channel buffer size for backpressure control
135 /// - `store`: The adaptive store instance to use
136 ///
137 /// # Returns
138 ///
139 /// A [`RateLimiterHandle`] for communicating with the actor
140 pub fn spawn_adaptive(buffer_size: usize, store: AdaptiveStore) -> RateLimiterHandle {
141 let (tx, rx) = mpsc::channel(buffer_size);
142
143 tokio::spawn(async move {
144 let store_type = StoreType::Adaptive(RateLimiter::new(store));
145 run_actor(rx, store_type).await;
146 });
147
148 RateLimiterHandle { tx }
149 }
150}
151
152/// Internal enum to handle different store types
153enum StoreType {
154 Periodic(RateLimiter<PeriodicStore>),
155 Probabilistic(RateLimiter<ProbabilisticStore>),
156 Adaptive(RateLimiter<AdaptiveStore>),
157}
158
159impl StoreType {
160 fn rate_limit(
161 &mut self,
162 key: &str,
163 max_burst: i64,
164 count_per_period: i64,
165 period: i64,
166 quantity: i64,
167 timestamp: std::time::SystemTime,
168 ) -> Result<(bool, throttlecrab::RateLimitResult), CellError> {
169 match self {
170 StoreType::Periodic(limiter) => limiter.rate_limit(
171 key,
172 max_burst,
173 count_per_period,
174 period,
175 quantity,
176 timestamp,
177 ),
178 StoreType::Probabilistic(limiter) => limiter.rate_limit(
179 key,
180 max_burst,
181 count_per_period,
182 period,
183 quantity,
184 timestamp,
185 ),
186 StoreType::Adaptive(limiter) => limiter.rate_limit(
187 key,
188 max_burst,
189 count_per_period,
190 period,
191 quantity,
192 timestamp,
193 ),
194 }
195 }
196}
197
198async fn run_actor(mut rx: mpsc::Receiver<RateLimiterMessage>, mut store_type: StoreType) {
199 while let Some(msg) = rx.recv().await {
200 match msg {
201 RateLimiterMessage::Throttle {
202 request,
203 response_tx,
204 } => {
205 let response = handle_throttle(&mut store_type, request);
206 // Ignore send errors - receiver may have timed out
207 let _ = response_tx.send(response);
208 }
209 }
210 }
211
212 tracing::info!("Rate limiter actor shutting down");
213}
214
215fn handle_throttle(
216 store_type: &mut StoreType,
217 request: ThrottleRequest,
218) -> Result<ThrottleResponse> {
219 // Check the rate limit
220 let (allowed, result) = store_type
221 .rate_limit(
222 &request.key,
223 request.max_burst,
224 request.count_per_period,
225 request.period,
226 request.quantity,
227 request.timestamp,
228 )
229 .map_err(|e| anyhow::anyhow!("Rate limit check failed: {}", e))?;
230
231 Ok(ThrottleResponse::from((allowed, result)))
232}