throttlecrab_server/
actor.rs

1//! Actor-based rate limiter for shared state management
2//!
3//! This module implements an actor pattern to ensure thread-safe access to the
4//! rate limiter state. All transports communicate with a single actor instance,
5//! guaranteeing consistent rate limiting across protocols.
6//!
7//! # Architecture
8//!
9//! The actor pattern provides:
10//! - **Thread Safety**: Single-threaded access to mutable state
11//! - **Async Communication**: Non-blocking message passing via channels
12//! - **Protocol Independence**: All transports use the same interface
13//!
14//! # Example
15//!
16//! ```ignore
17//! // Spawn an actor with an adaptive store
18//! let limiter = RateLimiterActor::spawn_adaptive(10000, AdaptiveStore::new());
19//!
20//! // Use the handle from any transport
21//! let response = limiter.throttle(request).await?;
22//! ```
23
24use crate::types::{ThrottleRequest, ThrottleResponse};
25use anyhow::Result;
26use throttlecrab::{AdaptiveStore, CellError, PeriodicStore, ProbabilisticStore, RateLimiter};
27use tokio::sync::{mpsc, oneshot};
28
29/// Message types for the rate limiter actor
30///
31/// Currently supports throttle requests, but can be extended with
32/// additional message types like statistics queries or cache clearing.
33pub enum RateLimiterMessage {
34    /// Check rate limit for a key
35    Throttle {
36        /// The rate limit request
37        request: ThrottleRequest,
38        /// Channel to send the response back
39        response_tx: oneshot::Sender<Result<ThrottleResponse>>,
40    },
41    // Future: Stats, Clear, Shutdown, etc.
42}
43
44/// Handle to communicate with the rate limiter actor
45///
46/// This handle can be cloned and shared across multiple tasks/threads.
47/// All operations are async and non-blocking.
48#[derive(Clone)]
49pub struct RateLimiterHandle {
50    tx: mpsc::Sender<RateLimiterMessage>,
51}
52
53impl RateLimiterHandle {
54    /// Check rate limit for a key
55    ///
56    /// Sends a throttle request to the actor and waits for the response.
57    /// This method is cancel-safe and can be used in select! expressions.
58    ///
59    /// # Errors
60    ///
61    /// Returns an error if:
62    /// - The actor has shut down
63    /// - The response channel was dropped
64    pub async fn throttle(&self, request: ThrottleRequest) -> Result<ThrottleResponse> {
65        let (response_tx, response_rx) = oneshot::channel();
66
67        self.tx
68            .send(RateLimiterMessage::Throttle {
69                request,
70                response_tx,
71            })
72            .await
73            .map_err(|_| anyhow::anyhow!("Rate limiter actor has shut down"))?;
74
75        response_rx
76            .await
77            .map_err(|_| anyhow::anyhow!("Rate limiter actor dropped response channel"))?
78    }
79}
80
81/// The rate limiter actor factory
82///
83/// Provides static methods to spawn rate limiter actors with different store types.
84/// Each actor runs in its own Tokio task and processes messages sequentially.
85pub struct RateLimiterActor;
86
87impl RateLimiterActor {
88    /// Spawn a new rate limiter actor with a periodic store
89    ///
90    /// # Parameters
91    ///
92    /// - `buffer_size`: Channel buffer size for backpressure control
93    /// - `store`: The periodic store instance to use
94    ///
95    /// # Returns
96    ///
97    /// A [`RateLimiterHandle`] for communicating with the actor
98    pub fn spawn_periodic(buffer_size: usize, store: PeriodicStore) -> RateLimiterHandle {
99        let (tx, rx) = mpsc::channel(buffer_size);
100
101        tokio::spawn(async move {
102            let store_type = StoreType::Periodic(RateLimiter::new(store));
103            run_actor(rx, store_type).await;
104        });
105
106        RateLimiterHandle { tx }
107    }
108
109    /// Spawn a new rate limiter actor with a probabilistic store
110    ///
111    /// # Parameters
112    ///
113    /// - `buffer_size`: Channel buffer size for backpressure control
114    /// - `store`: The probabilistic store instance to use
115    ///
116    /// # Returns
117    ///
118    /// A [`RateLimiterHandle`] for communicating with the actor
119    pub fn spawn_probabilistic(buffer_size: usize, store: ProbabilisticStore) -> RateLimiterHandle {
120        let (tx, rx) = mpsc::channel(buffer_size);
121
122        tokio::spawn(async move {
123            let store_type = StoreType::Probabilistic(RateLimiter::new(store));
124            run_actor(rx, store_type).await;
125        });
126
127        RateLimiterHandle { tx }
128    }
129
130    /// Spawn a new rate limiter actor with an adaptive store
131    ///
132    /// # Parameters
133    ///
134    /// - `buffer_size`: Channel buffer size for backpressure control
135    /// - `store`: The adaptive store instance to use
136    ///
137    /// # Returns
138    ///
139    /// A [`RateLimiterHandle`] for communicating with the actor
140    pub fn spawn_adaptive(buffer_size: usize, store: AdaptiveStore) -> RateLimiterHandle {
141        let (tx, rx) = mpsc::channel(buffer_size);
142
143        tokio::spawn(async move {
144            let store_type = StoreType::Adaptive(RateLimiter::new(store));
145            run_actor(rx, store_type).await;
146        });
147
148        RateLimiterHandle { tx }
149    }
150}
151
152/// Internal enum to handle different store types
153enum StoreType {
154    Periodic(RateLimiter<PeriodicStore>),
155    Probabilistic(RateLimiter<ProbabilisticStore>),
156    Adaptive(RateLimiter<AdaptiveStore>),
157}
158
159impl StoreType {
160    fn rate_limit(
161        &mut self,
162        key: &str,
163        max_burst: i64,
164        count_per_period: i64,
165        period: i64,
166        quantity: i64,
167        timestamp: std::time::SystemTime,
168    ) -> Result<(bool, throttlecrab::RateLimitResult), CellError> {
169        match self {
170            StoreType::Periodic(limiter) => limiter.rate_limit(
171                key,
172                max_burst,
173                count_per_period,
174                period,
175                quantity,
176                timestamp,
177            ),
178            StoreType::Probabilistic(limiter) => limiter.rate_limit(
179                key,
180                max_burst,
181                count_per_period,
182                period,
183                quantity,
184                timestamp,
185            ),
186            StoreType::Adaptive(limiter) => limiter.rate_limit(
187                key,
188                max_burst,
189                count_per_period,
190                period,
191                quantity,
192                timestamp,
193            ),
194        }
195    }
196}
197
198async fn run_actor(mut rx: mpsc::Receiver<RateLimiterMessage>, mut store_type: StoreType) {
199    while let Some(msg) = rx.recv().await {
200        match msg {
201            RateLimiterMessage::Throttle {
202                request,
203                response_tx,
204            } => {
205                let response = handle_throttle(&mut store_type, request);
206                // Ignore send errors - receiver may have timed out
207                let _ = response_tx.send(response);
208            }
209        }
210    }
211
212    tracing::info!("Rate limiter actor shutting down");
213}
214
215fn handle_throttle(
216    store_type: &mut StoreType,
217    request: ThrottleRequest,
218) -> Result<ThrottleResponse> {
219    // Check the rate limit
220    let (allowed, result) = store_type
221        .rate_limit(
222            &request.key,
223            request.max_burst,
224            request.count_per_period,
225            request.period,
226            request.quantity,
227            request.timestamp,
228        )
229        .map_err(|e| anyhow::anyhow!("Rate limit check failed: {}", e))?;
230
231    Ok(ThrottleResponse::from((allowed, result)))
232}