punch_kernel/
scheduler.rs1use std::collections::VecDeque;
8use std::sync::Mutex;
9
10use chrono::{DateTime, Duration, Utc};
11use dashmap::DashMap;
12use tracing::{debug, info, instrument};
13
14use punch_types::FighterId;
15
16#[derive(Debug, Clone)]
18struct UsageRecord {
19 timestamp: DateTime<Utc>,
21 tokens: u64,
23}
24
25#[derive(Debug)]
27struct FighterQuota {
28 records: VecDeque<UsageRecord>,
30}
31
32impl FighterQuota {
33 fn new() -> Self {
34 Self {
35 records: VecDeque::new(),
36 }
37 }
38
39 fn evict_before(&mut self, cutoff: DateTime<Utc>) {
41 while let Some(front) = self.records.front() {
42 if front.timestamp < cutoff {
43 self.records.pop_front();
44 } else {
45 break;
46 }
47 }
48 }
49
50 fn tokens_in_window(&self, cutoff: DateTime<Utc>) -> u64 {
52 self.records
53 .iter()
54 .filter(|r| r.timestamp >= cutoff)
55 .map(|r| r.tokens)
56 .sum()
57 }
58
59 fn messages_in_window(&self, cutoff: DateTime<Utc>) -> usize {
61 self.records
62 .iter()
63 .filter(|r| r.timestamp >= cutoff)
64 .count()
65 }
66}
67
68#[derive(Debug, Clone)]
70pub struct QuotaConfig {
71 pub tokens_per_hour: u64,
73 pub messages_per_hour: u64,
75 pub window: Duration,
77}
78
79impl Default for QuotaConfig {
80 fn default() -> Self {
81 Self {
82 tokens_per_hour: 1_000_000,
83 messages_per_hour: 500,
84 window: Duration::hours(1),
85 }
86 }
87}
88
89pub struct Scheduler {
94 quotas: DashMap<FighterId, Mutex<FighterQuota>>,
95 config: QuotaConfig,
96}
97
98impl Scheduler {
99 pub fn new(config: QuotaConfig) -> Self {
101 Self {
102 quotas: DashMap::new(),
103 config,
104 }
105 }
106
107 #[instrument(skip(self), fields(%fighter_id))]
112 pub fn check_quota(&self, fighter_id: &FighterId) -> bool {
113 let now = Utc::now();
114 let cutoff = now - self.config.window;
115
116 let entry = self
117 .quotas
118 .entry(*fighter_id)
119 .or_insert_with(|| Mutex::new(FighterQuota::new()));
120 let quota = entry.value().lock().expect("quota lock poisoned");
121
122 let tokens = quota.tokens_in_window(cutoff);
123 let messages = quota.messages_in_window(cutoff);
124
125 let within_limits = tokens < self.config.tokens_per_hour
126 && messages < self.config.messages_per_hour as usize;
127
128 if !within_limits {
129 debug!(
130 tokens,
131 messages,
132 tokens_limit = self.config.tokens_per_hour,
133 messages_limit = self.config.messages_per_hour,
134 "fighter quota exceeded"
135 );
136 }
137
138 within_limits
139 }
140
141 #[instrument(skip(self), fields(%fighter_id, tokens))]
143 pub fn record_usage(&self, fighter_id: &FighterId, tokens: u64) {
144 let entry = self
145 .quotas
146 .entry(*fighter_id)
147 .or_insert_with(|| Mutex::new(FighterQuota::new()));
148 let mut quota = entry.value().lock().expect("quota lock poisoned");
149
150 quota.records.push_back(UsageRecord {
151 timestamp: Utc::now(),
152 tokens,
153 });
154
155 debug!("usage recorded");
156 }
157
158 #[instrument(skip(self))]
162 pub fn cleanup(&self) {
163 let cutoff = Utc::now() - self.config.window;
164 let mut cleaned = 0usize;
165
166 self.quotas.iter().for_each(|entry| {
167 let mut quota = entry.value().lock().expect("quota lock poisoned");
168 let before = quota.records.len();
169 quota.evict_before(cutoff);
170 cleaned += before - quota.records.len();
171 });
172
173 self.quotas.retain(|_, v| {
175 let quota = v.get_mut().expect("quota lock poisoned");
176 !quota.records.is_empty()
177 });
178
179 info!(evicted_records = cleaned, "scheduler cleanup complete");
180 }
181
182 pub fn remove_fighter(&self, fighter_id: &FighterId) {
184 self.quotas.remove(fighter_id);
185 }
186}
187
#[cfg(test)]
mod tests {
    use super::*;

    /// Small limits (1000 tokens, 5 messages, one-hour window) so tests can
    /// reach them with a handful of calls.
    fn test_config() -> QuotaConfig {
        QuotaConfig {
            tokens_per_hour: 1000,
            messages_per_hour: 5,
            window: Duration::hours(1),
        }
    }

    /// Queues a record with an explicit timestamp, bypassing `record_usage`,
    /// so tests can create usage that is already outside the window.
    fn record_at(scheduler: &Scheduler, id: FighterId, timestamp: DateTime<Utc>, tokens: u64) {
        let entry = scheduler
            .quotas
            .entry(id)
            .or_insert_with(|| Mutex::new(FighterQuota::new()));
        entry
            .value()
            .lock()
            .expect("quota lock poisoned")
            .records
            .push_back(UsageRecord { timestamp, tokens });
    }

    #[test]
    fn fresh_fighter_within_quota() {
        let scheduler = Scheduler::new(test_config());
        let id = FighterId::new();
        assert!(scheduler.check_quota(&id));
    }

    #[test]
    fn token_quota_exceeded() {
        let scheduler = Scheduler::new(test_config());
        let id = FighterId::new();

        scheduler.record_usage(&id, 1000);
        assert!(!scheduler.check_quota(&id));
    }

    #[test]
    fn message_quota_exceeded() {
        let scheduler = Scheduler::new(test_config());
        let id = FighterId::new();

        for _ in 0..5 {
            scheduler.record_usage(&id, 0);
        }

        assert!(!scheduler.check_quota(&id));
    }

    #[test]
    fn cleanup_removes_stale_entries() {
        let scheduler = Scheduler::new(test_config());
        let id = FighterId::new();

        // Two hours old: well outside the one-hour window.
        record_at(&scheduler, id, Utc::now() - Duration::hours(2), 1000);
        scheduler.cleanup();

        // The stale record is evicted and the now-empty fighter entry dropped.
        assert!(scheduler.quotas.is_empty());
        assert!(scheduler.check_quota(&id));
    }

    #[test]
    fn cleanup_keeps_records_inside_window() {
        let scheduler = Scheduler::new(test_config());
        let id = FighterId::new();

        scheduler.record_usage(&id, 1000);
        scheduler.cleanup();

        // Fresh usage must survive cleanup and still count against the quota.
        assert!(!scheduler.check_quota(&id));
    }

    #[test]
    fn expired_usage_is_ignored_without_cleanup() {
        let scheduler = Scheduler::new(test_config());
        let id = FighterId::new();

        record_at(&scheduler, id, Utc::now() - Duration::hours(2), 1000);

        assert!(scheduler.check_quota(&id));
    }

    #[test]
    fn remove_fighter_clears_quota() {
        let scheduler = Scheduler::new(test_config());
        let id = FighterId::new();

        scheduler.record_usage(&id, 999);
        scheduler.remove_fighter(&id);

        assert!(scheduler.check_quota(&id));
    }

    #[test]
    fn independent_fighters_have_separate_quotas() {
        let scheduler = Scheduler::new(test_config());
        let a = FighterId::new();
        let b = FighterId::new();

        scheduler.record_usage(&a, 1000);

        assert!(!scheduler.check_quota(&a));
        assert!(scheduler.check_quota(&b));
    }

    #[test]
    fn default_quota_config() {
        let config = QuotaConfig::default();
        assert_eq!(config.tokens_per_hour, 1_000_000);
        assert_eq!(config.messages_per_hour, 500);
    }

    #[test]
    fn check_quota_creates_entry_if_missing() {
        let scheduler = Scheduler::new(test_config());
        let id = FighterId::new();
        assert!(scheduler.check_quota(&id));
        assert!(scheduler.check_quota(&id));
    }

    #[test]
    fn token_quota_just_under_limit_passes() {
        let scheduler = Scheduler::new(test_config());
        let id = FighterId::new();

        scheduler.record_usage(&id, 999);
        assert!(scheduler.check_quota(&id));
    }

    #[test]
    fn message_quota_just_under_limit_passes() {
        let scheduler = Scheduler::new(test_config());
        let id = FighterId::new();

        for _ in 0..4 {
            scheduler.record_usage(&id, 0);
        }

        assert!(scheduler.check_quota(&id));
    }

    #[test]
    fn both_quotas_can_exceed_independently() {
        let scheduler = Scheduler::new(test_config());
        let id_tok = FighterId::new();
        scheduler.record_usage(&id_tok, 1001);
        assert!(!scheduler.check_quota(&id_tok));

        let scheduler2 = Scheduler::new(test_config());
        let id_msg = FighterId::new();
        for _ in 0..5 {
            scheduler2.record_usage(&id_msg, 0);
        }
        assert!(!scheduler2.check_quota(&id_msg));
    }

    #[test]
    fn remove_fighter_allows_reuse_of_quota() {
        let scheduler = Scheduler::new(test_config());
        let id = FighterId::new();

        scheduler.record_usage(&id, 1000);
        assert!(!scheduler.check_quota(&id));

        scheduler.remove_fighter(&id);
        assert!(scheduler.check_quota(&id));

        scheduler.record_usage(&id, 500);
        assert!(scheduler.check_quota(&id));
    }

    #[test]
    fn cleanup_with_no_entries_does_not_panic() {
        let scheduler = Scheduler::new(test_config());
        scheduler.cleanup();
    }

    #[test]
    fn multiple_fighters_cleanup() {
        let scheduler = Scheduler::new(test_config());
        let stale = Utc::now() - Duration::hours(2);

        let ids: Vec<FighterId> = (0..5).map(|_| FighterId::new()).collect();
        for id in &ids {
            record_at(&scheduler, *id, stale, 100);
        }

        scheduler.cleanup();

        // Every fighter only had stale usage, so every entry must be gone.
        assert!(scheduler.quotas.is_empty());
        for id in &ids {
            assert!(scheduler.check_quota(id));
        }
    }

    #[test]
    fn concurrent_quota_checks_are_safe() {
        use std::sync::Arc;
        use std::thread;

        let config = QuotaConfig {
            tokens_per_hour: 1_000_000,
            messages_per_hour: 1_000,
            window: Duration::hours(1),
        };
        let scheduler = Arc::new(Scheduler::new(config));
        let id = FighterId::new();

        let mut handles = Vec::new();
        for _ in 0..10 {
            let sched = Arc::clone(&scheduler);
            let fid = id;
            handles.push(thread::spawn(move || {
                sched.record_usage(&fid, 10);
                sched.check_quota(&fid);
            }));
        }

        for h in handles {
            h.join().unwrap();
        }

        assert!(scheduler.check_quota(&id));
    }

    #[test]
    fn remove_nonexistent_fighter_does_not_panic() {
        let scheduler = Scheduler::new(test_config());
        let id = FighterId::new();
        scheduler.remove_fighter(&id);
    }

    #[test]
    fn accumulating_usage_reaches_limit() {
        let scheduler = Scheduler::new(test_config());
        let id = FighterId::new();

        for _ in 0..5 {
            scheduler.record_usage(&id, 200);
        }
        assert!(!scheduler.check_quota(&id));
    }
}