throttlecrab_server/
actor.rs1use crate::types::{ThrottleRequest, ThrottleResponse};
2use anyhow::Result;
3use throttlecrab::{AdaptiveStore, CellError, PeriodicStore, ProbabilisticStore, RateLimiter};
4use tokio::sync::{mpsc, oneshot};
5
6pub enum RateLimiterMessage {
8 Throttle {
9 request: ThrottleRequest,
10 response_tx: oneshot::Sender<Result<ThrottleResponse>>,
11 },
12 }
14
15#[derive(Clone)]
17pub struct RateLimiterHandle {
18 tx: mpsc::Sender<RateLimiterMessage>,
19}
20
21impl RateLimiterHandle {
22 pub async fn throttle(&self, request: ThrottleRequest) -> Result<ThrottleResponse> {
24 let (response_tx, response_rx) = oneshot::channel();
25
26 self.tx
27 .send(RateLimiterMessage::Throttle {
28 request,
29 response_tx,
30 })
31 .await
32 .map_err(|_| anyhow::anyhow!("Rate limiter actor has shut down"))?;
33
34 response_rx
35 .await
36 .map_err(|_| anyhow::anyhow!("Rate limiter actor dropped response channel"))?
37 }
38}
39
40pub struct RateLimiterActor;
42
43impl RateLimiterActor {
44 pub fn spawn_periodic(buffer_size: usize, store: PeriodicStore) -> RateLimiterHandle {
46 let (tx, rx) = mpsc::channel(buffer_size);
47
48 tokio::spawn(async move {
49 let store_type = StoreType::Periodic(RateLimiter::new(store));
50 run_actor(rx, store_type).await;
51 });
52
53 RateLimiterHandle { tx }
54 }
55
56 pub fn spawn_probabilistic(buffer_size: usize, store: ProbabilisticStore) -> RateLimiterHandle {
58 let (tx, rx) = mpsc::channel(buffer_size);
59
60 tokio::spawn(async move {
61 let store_type = StoreType::Probabilistic(RateLimiter::new(store));
62 run_actor(rx, store_type).await;
63 });
64
65 RateLimiterHandle { tx }
66 }
67
68 pub fn spawn_adaptive(buffer_size: usize, store: AdaptiveStore) -> RateLimiterHandle {
70 let (tx, rx) = mpsc::channel(buffer_size);
71
72 tokio::spawn(async move {
73 let store_type = StoreType::Adaptive(RateLimiter::new(store));
74 run_actor(rx, store_type).await;
75 });
76
77 RateLimiterHandle { tx }
78 }
79}
80
81enum StoreType {
83 Periodic(RateLimiter<PeriodicStore>),
84 Probabilistic(RateLimiter<ProbabilisticStore>),
85 Adaptive(RateLimiter<AdaptiveStore>),
86}
87
88impl StoreType {
89 fn rate_limit(
90 &mut self,
91 key: &str,
92 max_burst: i64,
93 count_per_period: i64,
94 period: i64,
95 quantity: i64,
96 timestamp: std::time::SystemTime,
97 ) -> Result<(bool, throttlecrab::RateLimitResult), CellError> {
98 match self {
99 StoreType::Periodic(limiter) => limiter.rate_limit(
100 key,
101 max_burst,
102 count_per_period,
103 period,
104 quantity,
105 timestamp,
106 ),
107 StoreType::Probabilistic(limiter) => limiter.rate_limit(
108 key,
109 max_burst,
110 count_per_period,
111 period,
112 quantity,
113 timestamp,
114 ),
115 StoreType::Adaptive(limiter) => limiter.rate_limit(
116 key,
117 max_burst,
118 count_per_period,
119 period,
120 quantity,
121 timestamp,
122 ),
123 }
124 }
125}
126
127async fn run_actor(mut rx: mpsc::Receiver<RateLimiterMessage>, mut store_type: StoreType) {
128 while let Some(msg) = rx.recv().await {
129 match msg {
130 RateLimiterMessage::Throttle {
131 request,
132 response_tx,
133 } => {
134 let response = handle_throttle(&mut store_type, request);
135 let _ = response_tx.send(response);
137 }
138 }
139 }
140
141 tracing::info!("Rate limiter actor shutting down");
142}
143
144fn handle_throttle(
145 store_type: &mut StoreType,
146 request: ThrottleRequest,
147) -> Result<ThrottleResponse> {
148 let (allowed, result) = store_type
150 .rate_limit(
151 &request.key,
152 request.max_burst,
153 request.count_per_period,
154 request.period,
155 request.quantity,
156 request.timestamp,
157 )
158 .map_err(|e| anyhow::anyhow!("Rate limit check failed: {}", e))?;
159
160 Ok(ThrottleResponse::from((allowed, result)))
161}