Skip to main content

memlink_runtime/
resource.rs

1//! Per-client resource tracking for the runtime.
2//!
3//! Tracks individual client behavior and load to enable
4//! fair resource allocation and client-specific backpressure.
5
6use std::sync::atomic::{AtomicU32, AtomicU64, AtomicUsize, Ordering};
7use std::time::Instant;
8
/// Unique client identifier.
pub type ClientId = u64;

/// Per-client resource tracking.
///
/// All counters are atomics, so a tracker can be updated through a shared
/// reference (`&self`) without any external lock.
#[derive(Debug)]
pub struct ClientResources {
    /// Unique client identifier.
    client_id: ClientId,
    /// Moment this tracker was created; the origin `last_request` is measured from.
    created_at: Instant,
    /// Total requests recorded through `record_request`.
    total_requests: AtomicU64,
    /// Requests recorded since the last `reset_window` call.
    requests_in_window: AtomicU32,
    /// Current pending (in-flight) requests.
    pending_requests: AtomicUsize,
    /// Requests rejected due to backpressure.
    rejected_requests: AtomicU64,
    /// Whole seconds between `created_at` and the most recent request
    /// (0 until the first request is recorded).
    last_request: AtomicU64,
    /// Client priority level (set by daemon).
    priority: AtomicU32,
}

impl ClientResources {
    /// Creates a new client resource tracker with every counter at zero.
    pub fn new(client_id: ClientId) -> Self {
        Self {
            client_id,
            created_at: Instant::now(),
            total_requests: AtomicU64::new(0),
            requests_in_window: AtomicU32::new(0),
            pending_requests: AtomicUsize::new(0),
            rejected_requests: AtomicU64::new(0),
            // No request has been seen yet, so the offset from `created_at` is zero.
            last_request: AtomicU64::new(0),
            priority: AtomicU32::new(0),
        }
    }

    /// Records a new request from this client.
    ///
    /// Bumps the lifetime total, the current-window count and the pending
    /// count, then stamps `last_request` with the seconds elapsed since the
    /// tracker was created.
    pub fn record_request(&self) {
        self.total_requests.fetch_add(1, Ordering::Relaxed);
        self.requests_in_window.fetch_add(1, Ordering::Relaxed);
        self.pending_requests.fetch_add(1, Ordering::AcqRel);

        // Measure against a fixed origin. Subtracting two back-to-back
        // `Instant::now()` readings always yields zero, which made the stored
        // timestamp meaningless.
        let elapsed_secs = self.created_at.elapsed().as_secs();
        self.last_request.store(elapsed_secs, Ordering::Relaxed);
    }

    /// Records a request completion.
    ///
    /// The pending count saturates at zero: a response with no matching
    /// request is ignored instead of wrapping the counter to `usize::MAX`.
    pub fn record_response(&self) {
        // `checked_sub` returns `None` at zero, which makes `fetch_update`
        // leave the value untouched; that "error" is the intended no-op.
        let _ = self
            .pending_requests
            .fetch_update(Ordering::AcqRel, Ordering::Acquire, |pending| {
                pending.checked_sub(1)
            });
    }

    /// Records a rejected request.
    pub fn record_rejection(&self) {
        self.rejected_requests.fetch_add(1, Ordering::Relaxed);
    }

    /// Resets the time window counter to zero.
    pub fn reset_window(&self) {
        self.requests_in_window.store(0, Ordering::Relaxed);
    }

    /// Returns the client ID.
    pub fn client_id(&self) -> ClientId {
        self.client_id
    }

    /// Returns total requests sent.
    pub fn total_requests(&self) -> u64 {
        self.total_requests.load(Ordering::Acquire)
    }

    /// Returns requests in current window.
    pub fn requests_in_window(&self) -> u32 {
        self.requests_in_window.load(Ordering::Acquire)
    }

    /// Returns pending request count.
    pub fn pending_requests(&self) -> usize {
        self.pending_requests.load(Ordering::Acquire)
    }

    /// Returns rejected request count.
    pub fn rejected_requests(&self) -> u64 {
        self.rejected_requests.load(Ordering::Acquire)
    }

    /// Returns the client's priority level.
    pub fn priority(&self) -> u32 {
        self.priority.load(Ordering::Acquire)
    }

    /// Sets the client's priority level.
    pub fn set_priority(&self, priority: u32) {
        self.priority.store(priority, Ordering::Release);
    }

    /// Returns the client's load factor (0.0-1.0).
    ///
    /// Higher values indicate the client is sending many requests. The
    /// pending component saturates at 100 in-flight requests and the window
    /// component at 1000 requests in the current window.
    pub fn load_factor(&self) -> f32 {
        let pending = self.pending_requests.load(Ordering::Acquire) as f32;
        let window = self.requests_in_window.load(Ordering::Acquire) as f32;

        // Weighted: 60% pending, 40% window rate
        (pending / 100.0).min(1.0) * 0.6 + (window / 1000.0).min(1.0) * 0.4
    }
}
117
118/// Client quota configuration.
119#[derive(Debug, Clone)]
120pub struct ClientQuota {
121    /// Maximum requests per second.
122    pub max_requests_per_sec: u32,
123    /// Maximum pending requests.
124    pub max_pending: usize,
125    /// Maximum rejection rate before throttling.
126    pub max_rejection_rate: f32,
127}
128
129impl Default for ClientQuota {
130    fn default() -> Self {
131        Self {
132            max_requests_per_sec: 500,
133            max_pending: 50,
134            max_rejection_rate: 0.1, // 10%
135        }
136    }
137}
138
139impl ClientQuota {
140    /// Creates a quota that limits a client to specific rates.
141    pub fn new(max_requests_per_sec: u32, max_pending: usize) -> Self {
142        Self {
143            max_requests_per_sec,
144            max_pending,
145            max_rejection_rate: 0.1,
146        }
147    }
148
149    /// Checks if a client has exceeded their quota.
150    pub fn is_exceeded(&self, client: &ClientResources) -> bool {
151        // Check pending limit
152        if client.pending_requests() >= self.max_pending {
153            return true;
154        }
155
156        // Check rate limit
157        if client.requests_in_window() >= self.max_requests_per_sec {
158            return true;
159        }
160
161        // Check rejection rate
162        let total = client.total_requests();
163        if total > 0 {
164            let rejected = client.rejected_requests();
165            let rejection_rate = rejected as f32 / total as f32;
166            if rejection_rate > self.max_rejection_rate {
167                return true;
168            }
169        }
170
171        false
172    }
173}
174
175/// Registry of all connected clients.
176#[derive(Debug)]
177pub struct ClientRegistry {
178    /// Registered clients.
179    clients: dashmap::DashMap<ClientId, Arc<ClientResources>>,
180    /// Default quota for clients.
181    default_quota: ClientQuota,
182    /// Per-client quotas (overrides default).
183    client_quotas: dashmap::DashMap<ClientId, ClientQuota>,
184}
185
186impl ClientRegistry {
187    /// Creates a new client registry.
188    pub fn new() -> Self {
189        Self {
190            clients: dashmap::DashMap::new(),
191            default_quota: ClientQuota::default(),
192            client_quotas: dashmap::DashMap::new(),
193        }
194    }
195
196    /// Creates a registry with custom default quota.
197    pub fn with_default_quota(quota: ClientQuota) -> Self {
198        Self {
199            clients: dashmap::DashMap::new(),
200            default_quota: quota,
201            client_quotas: dashmap::DashMap::new(),
202        }
203    }
204
205    /// Gets or creates a client resource tracker.
206    pub fn get_or_create(&self, client_id: ClientId) -> Arc<ClientResources> {
207        self.clients
208            .entry(client_id)
209            .or_insert_with(|| Arc::new(ClientResources::new(client_id)))
210            .clone()
211    }
212
213    /// Registers a client with a custom quota.
214    pub fn register_client(&self, client_id: ClientId, quota: ClientQuota) {
215        self.get_or_create(client_id);
216        self.client_quotas.insert(client_id, quota);
217    }
218
219    /// Gets the quota for a client.
220    pub fn get_quota(&self, client_id: ClientId) -> ClientQuota {
221        self.client_quotas
222            .get(&client_id)
223            .map(|q| q.clone())
224            .unwrap_or_else(|| self.default_quota.clone())
225    }
226
227    /// Checks if a client's request should be admitted.
228    pub fn can_admit(&self, client_id: ClientId) -> bool {
229        let client = self.get_or_create(client_id);
230        let quota = self.get_quota(client_id);
231        !quota.is_exceeded(&client)
232    }
233
234    /// Records a request from a client.
235    pub fn record_request(&self, client_id: ClientId) -> bool {
236        if !self.can_admit(client_id) {
237            let client = self.get_or_create(client_id);
238            client.record_rejection();
239            return false;
240        }
241
242        let client = self.get_or_create(client_id);
243        client.record_request();
244        true
245    }
246
247    /// Records a response to a client.
248    pub fn record_response(&self, client_id: ClientId) {
249        let client = self.get_or_create(client_id);
250        client.record_response();
251    }
252
253    /// Returns statistics for a client.
254    pub fn client_stats(&self, client_id: ClientId) -> Option<ClientStats> {
255        self.clients.get(&client_id).map(|c| ClientStats {
256            client_id: c.client_id(),
257            total_requests: c.total_requests(),
258            pending_requests: c.pending_requests(),
259            rejected_requests: c.rejected_requests(),
260            load_factor: c.load_factor(),
261            priority: c.priority(),
262        })
263    }
264
265    /// Returns all client IDs.
266    pub fn all_client_ids(&self) -> Vec<ClientId> {
267        self.clients.iter().map(|e| *e.key()).collect()
268    }
269
270    /// Returns the number of registered clients.
271    pub fn client_count(&self) -> usize {
272        self.clients.len()
273    }
274
275    /// Resets all client time windows.
276    pub fn reset_all_windows(&self) {
277        for entry in self.clients.iter() {
278            entry.value().reset_window();
279        }
280    }
281
282    /// Returns aggregate statistics across all clients.
283    pub fn aggregate_stats(&self) -> AggregateClientStats {
284        let mut total_requests = 0;
285        let mut total_pending = 0;
286        let mut total_rejected = 0;
287        let mut max_load: f32 = 0.0;
288
289        for entry in self.clients.iter() {
290            let client = entry.value();
291            total_requests += client.total_requests();
292            total_pending += client.pending_requests();
293            total_rejected += client.rejected_requests();
294            max_load = max_load.max(client.load_factor());
295        }
296
297        AggregateClientStats {
298            client_count: self.clients.len(),
299            total_requests,
300            total_pending,
301            total_rejected,
302            max_load_factor: max_load,
303        }
304    }
305}
306
307impl Default for ClientRegistry {
308    fn default() -> Self {
309        Self::new()
310    }
311}
312
313/// Statistics for a single client.
314#[derive(Debug, Clone)]
315pub struct ClientStats {
316    pub client_id: ClientId,
317    pub total_requests: u64,
318    pub pending_requests: usize,
319    pub rejected_requests: u64,
320    pub load_factor: f32,
321    pub priority: u32,
322}
323
/// Totals computed over every client in a registry.
#[derive(Clone, Debug)]
pub struct AggregateClientStats {
    /// Number of clients covered by the totals.
    pub client_count: usize,
    /// Sum of the clients' total request counts.
    pub total_requests: u64,
    /// Sum of the clients' pending request counts.
    pub total_pending: usize,
    /// Sum of the clients' rejected request counts.
    pub total_rejected: u64,
    /// Largest load factor seen among the clients.
    pub max_load_factor: f32,
}
333
334use std::sync::Arc;
335
#[cfg(test)]
mod tests {
    use super::*;

    /// Records `count` requests directly on `client`.
    fn send(client: &ClientResources, count: usize) {
        (0..count).for_each(|_| client.record_request());
    }

    /// Request and response bookkeeping on a bare tracker.
    #[test]
    fn test_client_resources() {
        let tracker = ClientResources::new(1);
        assert_eq!(tracker.total_requests(), 0);
        assert_eq!(tracker.pending_requests(), 0);

        send(&tracker, 2);
        assert_eq!(tracker.total_requests(), 2);
        assert_eq!(tracker.pending_requests(), 2);
        assert_eq!(tracker.requests_in_window(), 2);

        tracker.record_response();
        tracker.record_response();
        assert_eq!(tracker.pending_requests(), 0);
    }

    /// Reaching the pending limit flips the quota to exceeded.
    #[test]
    fn test_client_quota() {
        let limits = ClientQuota::new(10, 5);
        let tracker = ClientResources::new(1);

        // A fresh tracker is within every limit.
        assert!(!limits.is_exceeded(&tracker));

        // Five pending requests hit the pending limit of five.
        send(&tracker, 5);
        assert!(limits.is_exceeded(&tracker));
    }

    /// The registry creates trackers on demand and exposes their stats.
    #[test]
    fn test_client_registry() {
        let registry = ClientRegistry::new();

        let tracker = registry.get_or_create(1);
        assert_eq!(tracker.client_id(), 1);

        assert!(registry.record_request(1));
        assert!(registry.record_request(1));

        let snapshot = registry.client_stats(1).unwrap();
        assert_eq!(snapshot.total_requests, 2);
        assert_eq!(snapshot.pending_requests, 2);
    }

    /// Requests beyond the per-window rate are rejected and counted.
    #[test]
    fn test_client_rejection() {
        // Two requests per window, ten pending.
        let registry = ClientRegistry::with_default_quota(ClientQuota::new(2, 10));

        // The first two requests fit the rate limit.
        assert!(registry.record_request(1));
        assert!(registry.record_request(1));

        // The third one is over the rate limit.
        assert!(!registry.record_request(1));

        let snapshot = registry.client_stats(1).unwrap();
        assert_eq!(snapshot.rejected_requests, 1);
    }

    /// Load factor starts at zero and grows with recorded requests.
    #[test]
    fn test_load_factor() {
        let tracker = ClientResources::new(1);
        assert_eq!(tracker.load_factor(), 0.0);

        // Moderate load: strictly between the extremes.
        send(&tracker, 50);
        let moderate = tracker.load_factor();
        assert!(moderate > 0.0);
        assert!(moderate < 1.0);

        // Heavy load: the pending share saturates, the window share does not,
        // so the value is high without necessarily reaching 0.9.
        send(&tracker, 100);
        let heavy = tracker.load_factor();
        assert!(heavy > 0.5);
    }
}
437}