throttlecrab_server/
actor.rs

1use crate::types::{ThrottleRequest, ThrottleResponse};
2use anyhow::Result;
3use throttlecrab::{AdaptiveStore, CellError, PeriodicStore, ProbabilisticStore, RateLimiter};
4use tokio::sync::{mpsc, oneshot};
5
6/// Message types for the rate limiter actor
7pub enum RateLimiterMessage {
8    Throttle {
9        request: ThrottleRequest,
10        response_tx: oneshot::Sender<Result<ThrottleResponse>>,
11    },
12    // Future: Stats, Clear, Shutdown, etc.
13}
14
15/// Handle to communicate with the rate limiter actor
16#[derive(Clone)]
17pub struct RateLimiterHandle {
18    tx: mpsc::Sender<RateLimiterMessage>,
19}
20
21impl RateLimiterHandle {
22    /// Check rate limit for a key
23    pub async fn throttle(&self, request: ThrottleRequest) -> Result<ThrottleResponse> {
24        let (response_tx, response_rx) = oneshot::channel();
25
26        self.tx
27            .send(RateLimiterMessage::Throttle {
28                request,
29                response_tx,
30            })
31            .await
32            .map_err(|_| anyhow::anyhow!("Rate limiter actor has shut down"))?;
33
34        response_rx
35            .await
36            .map_err(|_| anyhow::anyhow!("Rate limiter actor dropped response channel"))?
37    }
38}
39
40/// The rate limiter actor
41pub struct RateLimiterActor;
42
43impl RateLimiterActor {
44    /// Spawn a new rate limiter actor with a periodic store
45    pub fn spawn_periodic(buffer_size: usize, store: PeriodicStore) -> RateLimiterHandle {
46        let (tx, rx) = mpsc::channel(buffer_size);
47
48        tokio::spawn(async move {
49            let store_type = StoreType::Periodic(RateLimiter::new(store));
50            run_actor(rx, store_type).await;
51        });
52
53        RateLimiterHandle { tx }
54    }
55
56    /// Spawn a new rate limiter actor with a probabilistic store
57    pub fn spawn_probabilistic(buffer_size: usize, store: ProbabilisticStore) -> RateLimiterHandle {
58        let (tx, rx) = mpsc::channel(buffer_size);
59
60        tokio::spawn(async move {
61            let store_type = StoreType::Probabilistic(RateLimiter::new(store));
62            run_actor(rx, store_type).await;
63        });
64
65        RateLimiterHandle { tx }
66    }
67
68    /// Spawn a new rate limiter actor with an adaptive store
69    pub fn spawn_adaptive(buffer_size: usize, store: AdaptiveStore) -> RateLimiterHandle {
70        let (tx, rx) = mpsc::channel(buffer_size);
71
72        tokio::spawn(async move {
73            let store_type = StoreType::Adaptive(RateLimiter::new(store));
74            run_actor(rx, store_type).await;
75        });
76
77        RateLimiterHandle { tx }
78    }
79}
80
81/// Internal enum to handle different store types
82enum StoreType {
83    Periodic(RateLimiter<PeriodicStore>),
84    Probabilistic(RateLimiter<ProbabilisticStore>),
85    Adaptive(RateLimiter<AdaptiveStore>),
86}
87
88impl StoreType {
89    fn rate_limit(
90        &mut self,
91        key: &str,
92        max_burst: i64,
93        count_per_period: i64,
94        period: i64,
95        quantity: i64,
96        timestamp: std::time::SystemTime,
97    ) -> Result<(bool, throttlecrab::RateLimitResult), CellError> {
98        match self {
99            StoreType::Periodic(limiter) => limiter.rate_limit(
100                key,
101                max_burst,
102                count_per_period,
103                period,
104                quantity,
105                timestamp,
106            ),
107            StoreType::Probabilistic(limiter) => limiter.rate_limit(
108                key,
109                max_burst,
110                count_per_period,
111                period,
112                quantity,
113                timestamp,
114            ),
115            StoreType::Adaptive(limiter) => limiter.rate_limit(
116                key,
117                max_burst,
118                count_per_period,
119                period,
120                quantity,
121                timestamp,
122            ),
123        }
124    }
125}
126
127async fn run_actor(mut rx: mpsc::Receiver<RateLimiterMessage>, mut store_type: StoreType) {
128    while let Some(msg) = rx.recv().await {
129        match msg {
130            RateLimiterMessage::Throttle {
131                request,
132                response_tx,
133            } => {
134                let response = handle_throttle(&mut store_type, request);
135                // Ignore send errors - receiver may have timed out
136                let _ = response_tx.send(response);
137            }
138        }
139    }
140
141    tracing::info!("Rate limiter actor shutting down");
142}
143
144fn handle_throttle(
145    store_type: &mut StoreType,
146    request: ThrottleRequest,
147) -> Result<ThrottleResponse> {
148    // Check the rate limit
149    let (allowed, result) = store_type
150        .rate_limit(
151            &request.key,
152            request.max_burst,
153            request.count_per_period,
154            request.period,
155            request.quantity,
156            request.timestamp,
157        )
158        .map_err(|e| anyhow::anyhow!("Rate limit check failed: {}", e))?;
159
160    Ok(ThrottleResponse::from((allowed, result)))
161}