Skip to main content

punch_kernel/
scheduler.rs

1//! Per-fighter quota tracking and rate limiting.
2//!
3//! The [`Scheduler`] enforces tokens-per-hour and messages-per-hour limits on a
4//! per-fighter basis. Quota windows are sliding and old entries are periodically
5//! cleaned up.
6
7use std::collections::VecDeque;
8use std::sync::Mutex;
9
10use chrono::{DateTime, Duration, Utc};
11use dashmap::DashMap;
12use tracing::{debug, info, instrument};
13
14use punch_types::FighterId;
15
16/// A single recorded usage event within a quota window.
17#[derive(Debug, Clone)]
18struct UsageRecord {
19    /// When this usage was recorded.
20    timestamp: DateTime<Utc>,
21    /// Number of tokens consumed (0 for message-only records).
22    tokens: u64,
23}
24
25/// Per-fighter usage tracking.
26#[derive(Debug)]
27struct FighterQuota {
28    /// Rolling window of usage records.
29    records: VecDeque<UsageRecord>,
30}
31
32impl FighterQuota {
33    fn new() -> Self {
34        Self {
35            records: VecDeque::new(),
36        }
37    }
38
39    /// Remove records older than `window`.
40    fn evict_before(&mut self, cutoff: DateTime<Utc>) {
41        while let Some(front) = self.records.front() {
42            if front.timestamp < cutoff {
43                self.records.pop_front();
44            } else {
45                break;
46            }
47        }
48    }
49
50    /// Sum of tokens in the current window.
51    fn tokens_in_window(&self, cutoff: DateTime<Utc>) -> u64 {
52        self.records
53            .iter()
54            .filter(|r| r.timestamp >= cutoff)
55            .map(|r| r.tokens)
56            .sum()
57    }
58
59    /// Number of messages (records) in the current window.
60    fn messages_in_window(&self, cutoff: DateTime<Utc>) -> usize {
61        self.records
62            .iter()
63            .filter(|r| r.timestamp >= cutoff)
64            .count()
65    }
66}
67
68/// Quota configuration.
69#[derive(Debug, Clone)]
70pub struct QuotaConfig {
71    /// Maximum tokens a single fighter may consume per hour.
72    pub tokens_per_hour: u64,
73    /// Maximum messages a single fighter may send per hour.
74    pub messages_per_hour: u64,
75    /// Length of the sliding window.
76    pub window: Duration,
77}
78
79impl Default for QuotaConfig {
80    fn default() -> Self {
81        Self {
82            tokens_per_hour: 1_000_000,
83            messages_per_hour: 500,
84            window: Duration::hours(1),
85        }
86    }
87}
88
89/// Agent scheduler that enforces per-fighter quotas.
90///
91/// Thread-safe: all internal state is behind `DashMap` entries with inner
92/// `Mutex` guards so the `Scheduler` can be shared via `Arc`.
93pub struct Scheduler {
94    quotas: DashMap<FighterId, Mutex<FighterQuota>>,
95    config: QuotaConfig,
96}
97
98impl Scheduler {
99    /// Create a new scheduler with the given quota configuration.
100    pub fn new(config: QuotaConfig) -> Self {
101        Self {
102            quotas: DashMap::new(),
103            config,
104        }
105    }
106
107    /// Check whether `fighter_id` is within its quota limits.
108    ///
109    /// Returns `true` if the fighter may proceed, `false` if it has been
110    /// rate-limited.
111    #[instrument(skip(self), fields(%fighter_id))]
112    pub fn check_quota(&self, fighter_id: &FighterId) -> bool {
113        let now = Utc::now();
114        let cutoff = now - self.config.window;
115
116        let entry = self
117            .quotas
118            .entry(*fighter_id)
119            .or_insert_with(|| Mutex::new(FighterQuota::new()));
120        let quota = entry.value().lock().expect("quota lock poisoned");
121
122        let tokens = quota.tokens_in_window(cutoff);
123        let messages = quota.messages_in_window(cutoff);
124
125        let within_limits = tokens < self.config.tokens_per_hour
126            && messages < self.config.messages_per_hour as usize;
127
128        if !within_limits {
129            debug!(
130                tokens,
131                messages,
132                tokens_limit = self.config.tokens_per_hour,
133                messages_limit = self.config.messages_per_hour,
134                "fighter quota exceeded"
135            );
136        }
137
138        within_limits
139    }
140
141    /// Record token usage for a fighter.
142    #[instrument(skip(self), fields(%fighter_id, tokens))]
143    pub fn record_usage(&self, fighter_id: &FighterId, tokens: u64) {
144        let entry = self
145            .quotas
146            .entry(*fighter_id)
147            .or_insert_with(|| Mutex::new(FighterQuota::new()));
148        let mut quota = entry.value().lock().expect("quota lock poisoned");
149
150        quota.records.push_back(UsageRecord {
151            timestamp: Utc::now(),
152            tokens,
153        });
154
155        debug!("usage recorded");
156    }
157
158    /// Evict stale records outside the sliding window for all tracked fighters.
159    ///
160    /// Call this periodically (e.g. every few minutes) to keep memory bounded.
161    #[instrument(skip(self))]
162    pub fn cleanup(&self) {
163        let cutoff = Utc::now() - self.config.window;
164        let mut cleaned = 0usize;
165
166        self.quotas.iter().for_each(|entry| {
167            let mut quota = entry.value().lock().expect("quota lock poisoned");
168            let before = quota.records.len();
169            quota.evict_before(cutoff);
170            cleaned += before - quota.records.len();
171        });
172
173        // Remove fighters that have no records left.
174        self.quotas.retain(|_, v| {
175            let quota = v.get_mut().expect("quota lock poisoned");
176            !quota.records.is_empty()
177        });
178
179        info!(evicted_records = cleaned, "scheduler cleanup complete");
180    }
181
182    /// Remove all quota state for a given fighter (e.g. when the fighter is killed).
183    pub fn remove_fighter(&self, fighter_id: &FighterId) {
184        self.quotas.remove(fighter_id);
185    }
186}
187
188// ---------------------------------------------------------------------------
189// Tests
190// ---------------------------------------------------------------------------
191
#[cfg(test)]
mod tests {
    use super::*;

    /// Small limits so tests can reach them with a handful of calls.
    fn test_config() -> QuotaConfig {
        QuotaConfig {
            tokens_per_hour: 1000,
            messages_per_hour: 5,
            window: Duration::hours(1),
        }
    }

    /// Insert a usage record that is already `age` old.
    ///
    /// Writes to the map directly instead of going through `record_usage`, so
    /// window expiry can be tested deterministically without sleeping or
    /// relying on clock resolution. Only use on fighters with no newer
    /// records, so the deque stays in chronological order.
    fn record_backdated(scheduler: &Scheduler, id: &FighterId, tokens: u64, age: Duration) {
        let entry = scheduler
            .quotas
            .entry(*id)
            .or_insert_with(|| Mutex::new(FighterQuota::new()));
        let mut quota = entry.value().lock().expect("quota lock poisoned");
        quota.records.push_back(UsageRecord {
            timestamp: Utc::now() - age,
            tokens,
        });
    }

    #[test]
    fn fresh_fighter_within_quota() {
        let scheduler = Scheduler::new(test_config());
        let id = FighterId::new();
        assert!(scheduler.check_quota(&id));
    }

    #[test]
    fn token_quota_exceeded() {
        let scheduler = Scheduler::new(test_config());
        let id = FighterId::new();

        // Record usage just at the limit.
        scheduler.record_usage(&id, 1000);
        assert!(!scheduler.check_quota(&id));
    }

    #[test]
    fn message_quota_exceeded() {
        let scheduler = Scheduler::new(test_config());
        let id = FighterId::new();

        for _ in 0..5 {
            scheduler.record_usage(&id, 0);
        }

        assert!(!scheduler.check_quota(&id));
    }

    #[test]
    fn stale_usage_does_not_count_toward_quota() {
        let scheduler = Scheduler::new(test_config());
        let id = FighterId::new();

        // Far over the token limit, but recorded outside the one-hour window.
        record_backdated(&scheduler, &id, 5000, Duration::hours(2));

        // No cleanup has run: the window filter alone must ignore the record.
        assert!(scheduler.check_quota(&id));
    }

    #[test]
    fn cleanup_removes_stale_entries() {
        let scheduler = Scheduler::new(test_config());
        let id = FighterId::new();

        record_backdated(&scheduler, &id, 500, Duration::hours(2));
        assert!(scheduler.quotas.contains_key(&id));

        scheduler.cleanup();

        // The only record was stale, so the fighter entry must be gone.
        assert!(!scheduler.quotas.contains_key(&id));
        assert!(scheduler.check_quota(&id));
    }

    #[test]
    fn cleanup_keeps_recent_entries() {
        let scheduler = Scheduler::new(test_config());
        let id = FighterId::new();

        scheduler.record_usage(&id, 1000);
        scheduler.cleanup();

        // Usage inside the window survives cleanup and still counts.
        assert!(scheduler.quotas.contains_key(&id));
        assert!(!scheduler.check_quota(&id));
    }

    #[test]
    fn remove_fighter_clears_quota() {
        let scheduler = Scheduler::new(test_config());
        let id = FighterId::new();

        // At the limit, so leftover state would make the final check fail.
        scheduler.record_usage(&id, 1000);
        scheduler.remove_fighter(&id);

        assert!(!scheduler.quotas.contains_key(&id));
        // After removal the fighter starts fresh.
        assert!(scheduler.check_quota(&id));
    }

    #[test]
    fn independent_fighters_have_separate_quotas() {
        let scheduler = Scheduler::new(test_config());
        let a = FighterId::new();
        let b = FighterId::new();

        scheduler.record_usage(&a, 1000);

        assert!(!scheduler.check_quota(&a));
        assert!(scheduler.check_quota(&b));
    }

    #[test]
    fn default_quota_config() {
        let config = QuotaConfig::default();
        assert_eq!(config.tokens_per_hour, 1_000_000);
        assert_eq!(config.messages_per_hour, 500);
        assert_eq!(config.window, Duration::hours(1));
    }

    #[test]
    fn check_quota_on_unknown_fighter_is_repeatable() {
        let scheduler = Scheduler::new(test_config());
        let id = FighterId::new();
        // A fighter with no recorded usage is within quota.
        assert!(scheduler.check_quota(&id));
        // Checking must not consume quota, so a second check still passes.
        assert!(scheduler.check_quota(&id));
    }

    #[test]
    fn token_quota_just_under_limit_passes() {
        let scheduler = Scheduler::new(test_config());
        let id = FighterId::new();

        scheduler.record_usage(&id, 999);
        assert!(scheduler.check_quota(&id));
    }

    #[test]
    fn message_quota_just_under_limit_passes() {
        let scheduler = Scheduler::new(test_config());
        let id = FighterId::new();

        for _ in 0..4 {
            scheduler.record_usage(&id, 0);
        }

        assert!(scheduler.check_quota(&id));
    }

    #[test]
    fn both_quotas_can_exceed_independently() {
        // Exceed only tokens.
        let scheduler = Scheduler::new(test_config());
        let id_tok = FighterId::new();
        scheduler.record_usage(&id_tok, 1001);
        assert!(!scheduler.check_quota(&id_tok));

        // Exceed only messages (with zero tokens each).
        let scheduler2 = Scheduler::new(test_config());
        let id_msg = FighterId::new();
        for _ in 0..5 {
            scheduler2.record_usage(&id_msg, 0);
        }
        assert!(!scheduler2.check_quota(&id_msg));
    }

    #[test]
    fn remove_fighter_allows_reuse_of_quota() {
        let scheduler = Scheduler::new(test_config());
        let id = FighterId::new();

        scheduler.record_usage(&id, 1000);
        assert!(!scheduler.check_quota(&id));

        scheduler.remove_fighter(&id);
        assert!(scheduler.check_quota(&id));

        // Can record usage again.
        scheduler.record_usage(&id, 500);
        assert!(scheduler.check_quota(&id));
    }

    #[test]
    fn cleanup_with_no_entries_does_not_panic() {
        let scheduler = Scheduler::new(test_config());
        scheduler.cleanup();
        assert!(scheduler.quotas.is_empty());
    }

    #[test]
    fn multiple_fighters_cleanup() {
        let scheduler = Scheduler::new(test_config());

        let ids: Vec<FighterId> = (0..5).map(|_| FighterId::new()).collect();
        for id in &ids {
            record_backdated(&scheduler, id, 100, Duration::hours(2));
        }
        assert_eq!(scheduler.quotas.len(), ids.len());

        scheduler.cleanup();

        // Every record was outside the window, so every fighter is dropped.
        assert!(scheduler.quotas.is_empty());
        for id in &ids {
            assert!(scheduler.check_quota(id));
        }
    }

    #[test]
    fn concurrent_quota_checks_are_safe() {
        use std::sync::Arc;
        use std::thread;

        // Use generous limits so concurrent access doesn't exceed them.
        let config = QuotaConfig {
            tokens_per_hour: 1_000_000,
            messages_per_hour: 1_000,
            window: Duration::hours(1),
        };
        let scheduler = Arc::new(Scheduler::new(config));
        let id = FighterId::new();

        let mut handles = Vec::new();
        for _ in 0..10 {
            let sched = Arc::clone(&scheduler);
            let fid = id;
            handles.push(thread::spawn(move || {
                sched.record_usage(&fid, 10);
                sched.check_quota(&fid);
            }));
        }

        for h in handles {
            h.join().unwrap();
        }

        // No record may be lost under contention. The map guard is scoped so
        // it is released before the scheduler is used again.
        let recorded = {
            let entry = scheduler
                .quotas
                .get(&id)
                .expect("fighter should be tracked after recording usage");
            let quota = entry.value().lock().expect("quota lock poisoned");
            quota.records.len()
        };
        assert_eq!(recorded, 10);

        // Should still be within quota (10 records of 10 tokens = 100 tokens, 10 messages).
        assert!(scheduler.check_quota(&id));
    }

    #[test]
    fn remove_nonexistent_fighter_does_not_panic() {
        let scheduler = Scheduler::new(test_config());
        let id = FighterId::new();
        scheduler.remove_fighter(&id); // Should not panic.
    }

    #[test]
    fn accumulating_usage_reaches_limit() {
        let scheduler = Scheduler::new(test_config());
        let id = FighterId::new();

        // Record 200 tokens 5 times = 1000, which should hit the limit.
        for _ in 0..5 {
            scheduler.record_usage(&id, 200);
        }
        assert!(!scheduler.check_quota(&id));
    }
}