throttlecrab_server/
actor.rs

1//! Actor-based rate limiter for shared state management
2//!
3//! This module implements an actor pattern to ensure thread-safe access to the
4//! rate limiter state. All transports communicate with a single actor instance,
5//! guaranteeing consistent rate limiting across protocols.
6//!
7//! # Architecture
8//!
9//! The actor pattern provides:
10//! - **Thread Safety**: Single-threaded access to mutable state
11//! - **Async Communication**: Non-blocking message passing via channels
12//! - **Protocol Independence**: All transports use the same interface
13//!
14//! # Example
15//!
16//! ```ignore
//! // Spawn an actor with an adaptive store; `metrics` is an `Arc<Metrics>`
//! let limiter = RateLimiterActor::spawn_adaptive(10000, AdaptiveStore::new(), metrics);
19//!
20//! // Use the handle from any transport
21//! let response = limiter.throttle(request).await?;
22//! ```
23
24use crate::metrics::Metrics;
25use crate::types::{ThrottleRequest, ThrottleResponse};
26use anyhow::Result;
27use std::sync::Arc;
28use throttlecrab::{AdaptiveStore, CellError, PeriodicStore, ProbabilisticStore, RateLimiter};
29use tokio::sync::{mpsc, oneshot};
30
31/// Message types for the rate limiter actor
32///
33/// Currently supports throttle requests, but can be extended with
34/// additional message types like statistics queries or cache clearing.
35pub enum RateLimiterMessage {
36    /// Check rate limit for a key
37    Throttle {
38        /// The rate limit request
39        request: ThrottleRequest,
40        /// Channel to send the response back
41        response_tx: oneshot::Sender<Result<ThrottleResponse>>,
42    },
43    // Future: Stats, Clear, Shutdown, etc.
44}
45
46/// Handle to communicate with the rate limiter actor
47///
48/// This handle can be cloned and shared across multiple tasks/threads.
49/// All operations are async and non-blocking.
50#[derive(Clone)]
51pub struct RateLimiterHandle {
52    tx: mpsc::Sender<RateLimiterMessage>,
53    #[allow(dead_code)] // Will be used for future metrics queries
54    pub metrics: Arc<Metrics>,
55}
56
57impl RateLimiterHandle {
58    /// Check rate limit for a key
59    ///
60    /// Sends a throttle request to the actor and waits for the response.
61    /// This method is cancel-safe and can be used in select! expressions.
62    ///
63    /// # Errors
64    ///
65    /// Returns an error if:
66    /// - The actor has shut down
67    /// - The response channel was dropped
68    pub async fn throttle(&self, request: ThrottleRequest) -> Result<ThrottleResponse> {
69        let (response_tx, response_rx) = oneshot::channel();
70
71        self.tx
72            .send(RateLimiterMessage::Throttle {
73                request,
74                response_tx,
75            })
76            .await
77            .map_err(|_| anyhow::anyhow!("Rate limiter actor has shut down"))?;
78
79        response_rx
80            .await
81            .map_err(|_| anyhow::anyhow!("Rate limiter actor dropped response channel"))?
82    }
83}
84
/// Factory for rate limiter actors.
///
/// This is a field-less marker type: its associated functions spawn an actor
/// task for a given store type and return the handle used to reach it. Each
/// spawned actor handles its messages one at a time, in arrival order.
pub struct RateLimiterActor;
90
91impl RateLimiterActor {
92    /// Spawn a new rate limiter actor with a periodic store
93    ///
94    /// # Parameters
95    ///
96    /// - `buffer_size`: Channel buffer size for backpressure control
97    /// - `store`: The periodic store instance to use
98    ///
99    /// # Returns
100    ///
101    /// A [`RateLimiterHandle`] for communicating with the actor
102    pub fn spawn_periodic(
103        buffer_size: usize,
104        store: PeriodicStore,
105        metrics: Arc<Metrics>,
106    ) -> RateLimiterHandle {
107        let (tx, rx) = mpsc::channel(buffer_size);
108        let metrics_clone = Arc::clone(&metrics);
109
110        tokio::spawn(async move {
111            let store_type = StoreType::Periodic(RateLimiter::new(store));
112            run_actor(rx, store_type, metrics_clone).await;
113        });
114
115        RateLimiterHandle { tx, metrics }
116    }
117
118    /// Spawn a new rate limiter actor with a probabilistic store
119    ///
120    /// # Parameters
121    ///
122    /// - `buffer_size`: Channel buffer size for backpressure control
123    /// - `store`: The probabilistic store instance to use
124    ///
125    /// # Returns
126    ///
127    /// A [`RateLimiterHandle`] for communicating with the actor
128    pub fn spawn_probabilistic(
129        buffer_size: usize,
130        store: ProbabilisticStore,
131        metrics: Arc<Metrics>,
132    ) -> RateLimiterHandle {
133        let (tx, rx) = mpsc::channel(buffer_size);
134        let metrics_clone = Arc::clone(&metrics);
135
136        tokio::spawn(async move {
137            let store_type = StoreType::Probabilistic(RateLimiter::new(store));
138            run_actor(rx, store_type, metrics_clone).await;
139        });
140
141        RateLimiterHandle { tx, metrics }
142    }
143
144    /// Spawn a new rate limiter actor with an adaptive store
145    ///
146    /// # Parameters
147    ///
148    /// - `buffer_size`: Channel buffer size for backpressure control
149    /// - `store`: The adaptive store instance to use
150    ///
151    /// # Returns
152    ///
153    /// A [`RateLimiterHandle`] for communicating with the actor
154    pub fn spawn_adaptive(
155        buffer_size: usize,
156        store: AdaptiveStore,
157        metrics: Arc<Metrics>,
158    ) -> RateLimiterHandle {
159        let (tx, rx) = mpsc::channel(buffer_size);
160        let metrics_clone = Arc::clone(&metrics);
161
162        tokio::spawn(async move {
163            let store_type = StoreType::Adaptive(RateLimiter::new(store));
164            run_actor(rx, store_type, metrics_clone).await;
165        });
166
167        RateLimiterHandle { tx, metrics }
168    }
169}
170
171/// Internal enum to handle different store types
172enum StoreType {
173    Periodic(RateLimiter<PeriodicStore>),
174    Probabilistic(RateLimiter<ProbabilisticStore>),
175    Adaptive(RateLimiter<AdaptiveStore>),
176}
177
178impl StoreType {
179    fn rate_limit(
180        &mut self,
181        key: &str,
182        max_burst: i64,
183        count_per_period: i64,
184        period: i64,
185        quantity: i64,
186        timestamp: std::time::SystemTime,
187    ) -> Result<(bool, throttlecrab::RateLimitResult), CellError> {
188        match self {
189            StoreType::Periodic(limiter) => limiter.rate_limit(
190                key,
191                max_burst,
192                count_per_period,
193                period,
194                quantity,
195                timestamp,
196            ),
197            StoreType::Probabilistic(limiter) => limiter.rate_limit(
198                key,
199                max_burst,
200                count_per_period,
201                period,
202                quantity,
203                timestamp,
204            ),
205            StoreType::Adaptive(limiter) => limiter.rate_limit(
206                key,
207                max_burst,
208                count_per_period,
209                period,
210                quantity,
211                timestamp,
212            ),
213        }
214    }
215}
216
217async fn run_actor(
218    mut rx: mpsc::Receiver<RateLimiterMessage>,
219    mut store_type: StoreType,
220    _metrics: Arc<Metrics>,
221) {
222    while let Some(msg) = rx.recv().await {
223        match msg {
224            RateLimiterMessage::Throttle {
225                request,
226                response_tx,
227            } => {
228                let response = handle_throttle(&mut store_type, request);
229                // Ignore send errors - receiver may have timed out
230                let _ = response_tx.send(response);
231            }
232        }
233    }
234
235    tracing::info!("Rate limiter actor shutting down");
236}
237
238fn handle_throttle(
239    store_type: &mut StoreType,
240    request: ThrottleRequest,
241) -> Result<ThrottleResponse> {
242    // Check the rate limit
243    let (allowed, result) = store_type
244        .rate_limit(
245            &request.key,
246            request.max_burst,
247            request.count_per_period,
248            request.period,
249            request.quantity,
250            request.timestamp,
251        )
252        .map_err(|e| anyhow::anyhow!("Rate limit check failed: {}", e))?;
253
254    Ok(ThrottleResponse::from((allowed, result)))
255}