1use std::sync::atomic::{AtomicU32, AtomicU64, AtomicUsize, Ordering};
7use std::time::Instant;
8
/// Numeric identifier used to key per-client state.
pub type ClientId = u64;

/// Request counters for a single client.
///
/// Every counter is an atomic, so all methods take `&self` and the value can
/// be shared (for example behind an `Arc`) without additional locking.
#[derive(Debug)]
pub struct ClientResources {
    /// Identifier of the client these counters belong to.
    client_id: ClientId,
    /// Number of calls to [`record_request`](Self::record_request) since creation.
    total_requests: AtomicU64,
    /// Requests recorded since the last [`reset_window`](Self::reset_window).
    requests_in_window: AtomicU32,
    /// Requests recorded but not yet matched by a response.
    pending_requests: AtomicUsize,
    /// Number of calls to [`record_rejection`](Self::record_rejection).
    rejected_requests: AtomicU64,
    /// Time of the most recent request, in whole seconds after `created_at`.
    last_request: AtomicU64,
    /// Caller-defined priority; this type stores it but never interprets it.
    priority: AtomicU32,
    /// Monotonic reference point that `last_request` is measured from.
    created_at: Instant,
}

impl ClientResources {
    /// Creates a tracker for `client_id` with every counter at zero.
    pub fn new(client_id: ClientId) -> Self {
        Self {
            client_id,
            total_requests: AtomicU64::new(0),
            requests_in_window: AtomicU32::new(0),
            pending_requests: AtomicUsize::new(0),
            rejected_requests: AtomicU64::new(0),
            // Zero seconds after `created_at`, i.e. the moment of creation.
            last_request: AtomicU64::new(0),
            priority: AtomicU32::new(0),
            created_at: Instant::now(),
        }
    }

    /// Records one request: bumps the total, window and pending counters and
    /// refreshes the last-request timestamp.
    pub fn record_request(&self) {
        self.total_requests.fetch_add(1, Ordering::Relaxed);
        self.requests_in_window.fetch_add(1, Ordering::Relaxed);
        self.pending_requests.fetch_add(1, Ordering::AcqRel);

        // Measure against the fixed creation instant. Comparing two
        // back-to-back `Instant::now()` readings would always yield zero.
        let seconds_since_creation = self.created_at.elapsed().as_secs();
        self.last_request
            .store(seconds_since_creation, Ordering::Relaxed);
    }

    /// Records that one pending request has completed.
    ///
    /// The pending counter saturates at zero: a response without a matching
    /// request is ignored instead of wrapping the counter to `usize::MAX`.
    pub fn record_response(&self) {
        // `checked_sub` yields `None` at zero, which makes `fetch_update`
        // leave the value untouched and return `Err`; that case is a no-op.
        let _ = self.pending_requests.fetch_update(
            Ordering::AcqRel,
            Ordering::Acquire,
            |pending| pending.checked_sub(1),
        );
    }

    /// Records one rejected request.
    pub fn record_rejection(&self) {
        self.rejected_requests.fetch_add(1, Ordering::Relaxed);
    }

    /// Starts a new rate window by zeroing the in-window request counter.
    pub fn reset_window(&self) {
        self.requests_in_window.store(0, Ordering::Relaxed);
    }

    /// Returns the identifier this tracker was created with.
    pub fn client_id(&self) -> ClientId {
        self.client_id
    }

    /// Returns the number of requests recorded since creation.
    pub fn total_requests(&self) -> u64 {
        self.total_requests.load(Ordering::Acquire)
    }

    /// Returns the number of requests recorded in the current window.
    pub fn requests_in_window(&self) -> u32 {
        self.requests_in_window.load(Ordering::Acquire)
    }

    /// Returns the number of requests still awaiting a response.
    pub fn pending_requests(&self) -> usize {
        self.pending_requests.load(Ordering::Acquire)
    }

    /// Returns the number of rejections recorded since creation.
    pub fn rejected_requests(&self) -> u64 {
        self.rejected_requests.load(Ordering::Acquire)
    }

    /// Returns the current priority value.
    pub fn priority(&self) -> u32 {
        self.priority.load(Ordering::Acquire)
    }

    /// Replaces the priority value.
    pub fn set_priority(&self, priority: u32) {
        self.priority.store(priority, Ordering::Release);
    }

    /// Returns a load estimate in `0.0..=1.0`.
    ///
    /// Pending requests contribute 60% of the score and saturate at 100;
    /// in-window requests contribute 40% and saturate at 1000.
    pub fn load_factor(&self) -> f32 {
        let pending = self.pending_requests.load(Ordering::Acquire) as f32;
        let window = self.requests_in_window.load(Ordering::Acquire) as f32;

        let pending_share = (pending / 100.0).min(1.0);
        let window_share = (window / 1000.0).min(1.0);
        pending_share * 0.6 + window_share * 0.4
    }
}
117
118#[derive(Debug, Clone)]
120pub struct ClientQuota {
121 pub max_requests_per_sec: u32,
123 pub max_pending: usize,
125 pub max_rejection_rate: f32,
127}
128
129impl Default for ClientQuota {
130 fn default() -> Self {
131 Self {
132 max_requests_per_sec: 500,
133 max_pending: 50,
134 max_rejection_rate: 0.1, }
136 }
137}
138
139impl ClientQuota {
140 pub fn new(max_requests_per_sec: u32, max_pending: usize) -> Self {
142 Self {
143 max_requests_per_sec,
144 max_pending,
145 max_rejection_rate: 0.1,
146 }
147 }
148
149 pub fn is_exceeded(&self, client: &ClientResources) -> bool {
151 if client.pending_requests() >= self.max_pending {
153 return true;
154 }
155
156 if client.requests_in_window() >= self.max_requests_per_sec {
158 return true;
159 }
160
161 let total = client.total_requests();
163 if total > 0 {
164 let rejected = client.rejected_requests();
165 let rejection_rate = rejected as f32 / total as f32;
166 if rejection_rate > self.max_rejection_rate {
167 return true;
168 }
169 }
170
171 false
172 }
173}
174
175#[derive(Debug)]
177pub struct ClientRegistry {
178 clients: dashmap::DashMap<ClientId, Arc<ClientResources>>,
180 default_quota: ClientQuota,
182 client_quotas: dashmap::DashMap<ClientId, ClientQuota>,
184}
185
186impl ClientRegistry {
187 pub fn new() -> Self {
189 Self {
190 clients: dashmap::DashMap::new(),
191 default_quota: ClientQuota::default(),
192 client_quotas: dashmap::DashMap::new(),
193 }
194 }
195
196 pub fn with_default_quota(quota: ClientQuota) -> Self {
198 Self {
199 clients: dashmap::DashMap::new(),
200 default_quota: quota,
201 client_quotas: dashmap::DashMap::new(),
202 }
203 }
204
205 pub fn get_or_create(&self, client_id: ClientId) -> Arc<ClientResources> {
207 self.clients
208 .entry(client_id)
209 .or_insert_with(|| Arc::new(ClientResources::new(client_id)))
210 .clone()
211 }
212
213 pub fn register_client(&self, client_id: ClientId, quota: ClientQuota) {
215 self.get_or_create(client_id);
216 self.client_quotas.insert(client_id, quota);
217 }
218
219 pub fn get_quota(&self, client_id: ClientId) -> ClientQuota {
221 self.client_quotas
222 .get(&client_id)
223 .map(|q| q.clone())
224 .unwrap_or_else(|| self.default_quota.clone())
225 }
226
227 pub fn can_admit(&self, client_id: ClientId) -> bool {
229 let client = self.get_or_create(client_id);
230 let quota = self.get_quota(client_id);
231 !quota.is_exceeded(&client)
232 }
233
234 pub fn record_request(&self, client_id: ClientId) -> bool {
236 if !self.can_admit(client_id) {
237 let client = self.get_or_create(client_id);
238 client.record_rejection();
239 return false;
240 }
241
242 let client = self.get_or_create(client_id);
243 client.record_request();
244 true
245 }
246
247 pub fn record_response(&self, client_id: ClientId) {
249 let client = self.get_or_create(client_id);
250 client.record_response();
251 }
252
253 pub fn client_stats(&self, client_id: ClientId) -> Option<ClientStats> {
255 self.clients.get(&client_id).map(|c| ClientStats {
256 client_id: c.client_id(),
257 total_requests: c.total_requests(),
258 pending_requests: c.pending_requests(),
259 rejected_requests: c.rejected_requests(),
260 load_factor: c.load_factor(),
261 priority: c.priority(),
262 })
263 }
264
265 pub fn all_client_ids(&self) -> Vec<ClientId> {
267 self.clients.iter().map(|e| *e.key()).collect()
268 }
269
270 pub fn client_count(&self) -> usize {
272 self.clients.len()
273 }
274
275 pub fn reset_all_windows(&self) {
277 for entry in self.clients.iter() {
278 entry.value().reset_window();
279 }
280 }
281
282 pub fn aggregate_stats(&self) -> AggregateClientStats {
284 let mut total_requests = 0;
285 let mut total_pending = 0;
286 let mut total_rejected = 0;
287 let mut max_load: f32 = 0.0;
288
289 for entry in self.clients.iter() {
290 let client = entry.value();
291 total_requests += client.total_requests();
292 total_pending += client.pending_requests();
293 total_rejected += client.rejected_requests();
294 max_load = max_load.max(client.load_factor());
295 }
296
297 AggregateClientStats {
298 client_count: self.clients.len(),
299 total_requests,
300 total_pending,
301 total_rejected,
302 max_load_factor: max_load,
303 }
304 }
305}
306
307impl Default for ClientRegistry {
308 fn default() -> Self {
309 Self::new()
310 }
311}
312
313#[derive(Debug, Clone)]
315pub struct ClientStats {
316 pub client_id: ClientId,
317 pub total_requests: u64,
318 pub pending_requests: usize,
319 pub rejected_requests: u64,
320 pub load_factor: f32,
321 pub priority: u32,
322}
323
/// Totals across every client in a registry, as produced by
/// `ClientRegistry::aggregate_stats`.
///
/// Derives `Default` (all zeros, matching an empty registry) and `PartialEq`
/// so results can be compared directly.
#[derive(Debug, Clone, Default, PartialEq)]
pub struct AggregateClientStats {
    /// Number of clients in the registry.
    pub client_count: usize,
    /// Sum of `total_requests` over all clients.
    pub total_requests: u64,
    /// Sum of `pending_requests` over all clients.
    pub total_pending: usize,
    /// Sum of `rejected_requests` over all clients.
    pub total_rejected: u64,
    /// Highest per-client load factor, or `0.0` when there are no clients.
    pub max_load_factor: f32,
}
333
334use std::sync::Arc;
335
#[cfg(test)]
mod tests {
    use super::*;

    /// Counters start at zero, grow with requests and drain with responses.
    #[test]
    fn test_client_resources() {
        let tracker = ClientResources::new(1);
        assert_eq!(tracker.total_requests(), 0);
        assert_eq!(tracker.pending_requests(), 0);

        for _ in 0..2 {
            tracker.record_request();
        }
        assert_eq!(tracker.total_requests(), 2);
        assert_eq!(tracker.pending_requests(), 2);
        assert_eq!(tracker.requests_in_window(), 2);

        for _ in 0..2 {
            tracker.record_response();
        }
        assert_eq!(tracker.pending_requests(), 0);
    }

    /// A quota trips once the pending count reaches `max_pending`.
    #[test]
    fn test_client_quota() {
        let limits = ClientQuota::new(10, 5);
        let tracker = ClientResources::new(1);

        // Nothing recorded yet, so no limit is reached.
        assert!(!limits.is_exceeded(&tracker));

        // Five unanswered requests hit the pending limit of five.
        (0..5).for_each(|_| tracker.record_request());

        assert!(limits.is_exceeded(&tracker));
    }

    /// The registry creates clients on demand and exposes their counters.
    #[test]
    fn test_client_registry() {
        let registry = ClientRegistry::new();

        let handle = registry.get_or_create(1);
        assert_eq!(handle.client_id(), 1);

        let first_admitted = registry.record_request(1);
        assert!(first_admitted);
        let second_admitted = registry.record_request(1);
        assert!(second_admitted);

        let snapshot = registry.client_stats(1).unwrap();
        assert_eq!(snapshot.total_requests, 2);
        assert_eq!(snapshot.pending_requests, 2);
    }

    /// The request after the window limit is rejected and counted as such.
    #[test]
    fn test_client_rejection() {
        // Two requests per window, ten pending.
        let tight = ClientQuota::new(2, 10);
        let registry = ClientRegistry::with_default_quota(tight);

        assert!(registry.record_request(1));
        assert!(registry.record_request(1));

        // The window already holds two requests, so the third is refused.
        assert!(!registry.record_request(1));

        let snapshot = registry.client_stats(1).unwrap();
        assert_eq!(snapshot.rejected_requests, 1);
    }

    /// Load factor is zero when idle and rises as requests accumulate.
    #[test]
    fn test_load_factor() {
        let tracker = ClientResources::new(1);

        assert_eq!(tracker.load_factor(), 0.0);

        (0..50).for_each(|_| tracker.record_request());

        let partial = tracker.load_factor();
        assert!(partial > 0.0);
        assert!(partial < 1.0);

        (0..100).for_each(|_| tracker.record_request());

        let heavy = tracker.load_factor();
        assert!(heavy > 0.5);
    }
}