Skip to main content

chio_guards/
velocity.rs

//! Velocity guard -- synchronous token bucket rate limiting per grant.
//!
//! Prevents runaway tool usage by throttling agent invocations (and,
//! optionally, planned spend) per `(capability_id, grant_index)` pair using a
//! token bucket algorithm. The guard uses `std::sync::Mutex` (synchronous, no
//! async) and fits into the existing `Guard` pipeline.
//!
//! Bucket levels and refills use integer milli-tokens (u64, one logical token
//! is 1_000 milli-tokens) to avoid accumulated floating-point drift; floating
//! point is only used to derive a bucket's burst capacity from `burst_factor`.
11
12use std::collections::HashMap;
13use std::sync::Mutex;
14use std::time::Instant;
15
16use chio_kernel::{Guard, GuardContext, KernelError, Verdict};
17
18// ---------------------------------------------------------------------------
19// TokenBucket (private)
20// ---------------------------------------------------------------------------
21
/// Token bucket using integer milli-token arithmetic to avoid floating-point
/// drift. One logical token == 1_000 milli-tokens.
///
/// The refill rate is kept as the exact ratio
/// `refill_per_window_mt / window_ms` (milli-tokens per millisecond). The
/// integer-division remainder is carried from one refill to the next. Slow
/// rates such as 5 tokens per 60 s therefore refill at exactly that rate,
/// instead of being rounded up to one milli-token per millisecond (which
/// would be one token per second).
struct TokenBucket {
    /// Maximum bucket level (burst ceiling) in milli-tokens.
    capacity_mt: u64,
    /// Current bucket level in milli-tokens; never exceeds `capacity_mt`.
    tokens_mt: u64,
    /// Milli-tokens credited per full window.
    refill_per_window_mt: u64,
    /// Window length in milliseconds; always at least 1.
    window_ms: u64,
    /// Fractional credit carried between refills, in units of
    /// `1 / window_ms` milli-tokens; always less than `window_ms`.
    refill_remainder: u64,
    /// Instant up to which elapsed time has already been credited.
    last_refill: Instant,
}

/// Milli-tokens per logical token.
const MT_PER_TOKEN: u64 = 1_000;

impl TokenBucket {
    /// Create a bucket that starts full.
    ///
    /// * `capacity_tokens` -- maximum logical tokens (burst ceiling).
    /// * `max_per_window`  -- logical tokens credited per window. A value of
    ///   0 means the bucket never refills.
    /// * `window_secs`     -- window duration in seconds; 0 is treated as a
    ///   1 ms window so the refill division never divides by zero.
    fn new(capacity_tokens: u64, max_per_window: u64, window_secs: u64) -> Self {
        let capacity_mt = capacity_tokens.saturating_mul(MT_PER_TOKEN);
        Self {
            capacity_mt,
            tokens_mt: capacity_mt,
            refill_per_window_mt: max_per_window.saturating_mul(MT_PER_TOKEN),
            window_ms: window_secs.saturating_mul(1_000).max(1),
            refill_remainder: 0,
            last_refill: Instant::now(),
        }
    }

    /// Attempt to consume `amount_tokens` logical tokens. Returns true on
    /// success (tokens were available), false if the bucket is too empty.
    /// Nothing is consumed on failure.
    fn try_consume(&mut self, amount_tokens: u64) -> bool {
        self.refill();
        let cost_mt = amount_tokens.saturating_mul(MT_PER_TOKEN);
        if self.tokens_mt < cost_mt {
            return false;
        }
        self.tokens_mt -= cost_mt;
        true
    }

    /// Credit the bucket for the whole milliseconds elapsed since the last
    /// refill.
    ///
    /// `last_refill` is advanced by exactly the milliseconds credited, so the
    /// sub-millisecond part of the elapsed time is kept for the next call
    /// instead of being dropped. Once the bucket is full, extra credit is
    /// discarded and accrual restarts from the current instant.
    fn refill(&mut self) {
        let now = Instant::now();
        let elapsed = now.saturating_duration_since(self.last_refill);
        let elapsed_ms = u64::try_from(elapsed.as_millis()).unwrap_or(u64::MAX);
        if elapsed_ms == 0 {
            return;
        }

        // u128 so `elapsed_ms * refill_per_window_mt` cannot overflow.
        let window_ms = u128::from(self.window_ms);
        let credit = u128::from(elapsed_ms)
            .saturating_mul(u128::from(self.refill_per_window_mt))
            .saturating_add(u128::from(self.refill_remainder));
        let added_mt = credit / window_ms;
        let level = u128::from(self.tokens_mt)
            .saturating_add(added_mt)
            .min(u128::from(self.capacity_mt));
        // `level <= capacity_mt`, so the conversion cannot fail.
        self.tokens_mt = u64::try_from(level).unwrap_or(self.capacity_mt);

        if self.tokens_mt == self.capacity_mt {
            // A full bucket cannot bank extra credit.
            self.refill_remainder = 0;
            self.last_refill = now;
        } else {
            // The remainder is strictly less than `window_ms`, which is a u64.
            self.refill_remainder = u64::try_from(credit % window_ms).unwrap_or(0);
            self.last_refill = self
                .last_refill
                .checked_add(std::time::Duration::from_millis(elapsed_ms))
                .unwrap_or(now);
        }
    }
}
89
90// ---------------------------------------------------------------------------
91// VelocityConfig
92// ---------------------------------------------------------------------------
93
/// Limits enforced by a `VelocityGuard`.
///
/// Each limit is optional: `None` switches that dimension off entirely.
#[derive(Debug, Clone)]
pub struct VelocityConfig {
    /// Upper bound on invocations per window; `None` means no limit.
    pub max_invocations_per_window: Option<u32>,
    /// Upper bound on spend (monetary units) per window; `None` means no limit.
    pub max_spend_per_window: Option<u64>,
    /// Length of the rate-limiting window, in seconds.
    pub window_secs: u64,
    /// Multiplier applied to each per-window limit to obtain the burst
    /// capacity (1.0 = no burst above the steady rate).
    pub burst_factor: f64,
}
106
107impl Default for VelocityConfig {
108    fn default() -> Self {
109        Self {
110            max_invocations_per_window: None,
111            max_spend_per_window: None,
112            window_secs: 60,
113            burst_factor: 1.0,
114        }
115    }
116}
117
118// ---------------------------------------------------------------------------
119// VelocityGuard
120// ---------------------------------------------------------------------------
121
122/// Guard that rate-limits agent invocations using synchronous token buckets.
123///
124/// Buckets are keyed by `(capability_id, grant_index)` so different grants
125/// within the same capability can have independent rate limits.
126pub struct VelocityGuard {
127    invocation_buckets: Mutex<HashMap<(String, usize), TokenBucket>>,
128    spend_buckets: Mutex<HashMap<(String, usize), TokenBucket>>,
129    config: VelocityConfig,
130}
131
132impl VelocityGuard {
133    /// Create a new `VelocityGuard` with the given configuration.
134    pub fn new(config: VelocityConfig) -> Self {
135        Self {
136            invocation_buckets: Mutex::new(HashMap::new()),
137            spend_buckets: Mutex::new(HashMap::new()),
138            config,
139        }
140    }
141}
142
143impl Guard for VelocityGuard {
144    fn name(&self) -> &str {
145        "velocity"
146    }
147
148    fn evaluate(&self, ctx: &GuardContext) -> Result<Verdict, KernelError> {
149        let grant_index = ctx.matched_grant_index.unwrap_or(0);
150        let key = (ctx.request.capability.id.clone(), grant_index);
151
152        let window_secs = self.config.window_secs.max(1);
153
154        // Check invocation rate limit.
155        if let Some(max_inv) = self.config.max_invocations_per_window {
156            // Burst capacity: max_inv * burst_factor, rounded to nearest integer.
157            let capacity = ((max_inv as f64 * self.config.burst_factor).round() as u64).max(1);
158
159            let mut buckets = self.invocation_buckets.lock().map_err(|_| {
160                KernelError::Internal("velocity guard invocation lock poisoned".to_string())
161            })?;
162            let bucket = buckets
163                .entry(key.clone())
164                .or_insert_with(|| TokenBucket::new(capacity, max_inv as u64, window_secs));
165            if !bucket.try_consume(1) {
166                return Ok(Verdict::Deny);
167            }
168        }
169
170        // Check spend rate limit.
171        if let Some(max_spend) = self.config.max_spend_per_window {
172            let capacity = ((max_spend as f64 * self.config.burst_factor).round() as u64).max(1);
173            let spend_units = planned_spend_units(ctx)?;
174
175            let mut buckets = self.spend_buckets.lock().map_err(|_| {
176                KernelError::Internal("velocity guard spend lock poisoned".to_string())
177            })?;
178            let bucket = buckets
179                .entry(key)
180                .or_insert_with(|| TokenBucket::new(capacity, max_spend, window_secs));
181            if !bucket.try_consume(spend_units) {
182                return Ok(Verdict::Deny);
183            }
184        }
185
186        Ok(Verdict::Allow)
187    }
188}
189
190fn planned_spend_units(ctx: &GuardContext) -> Result<u64, KernelError> {
191    let grant_index = ctx.matched_grant_index.ok_or_else(|| {
192        KernelError::Internal(
193            "velocity guard spend limiting requires matched_grant_index".to_string(),
194        )
195    })?;
196    let grant = ctx.scope.grants.get(grant_index).ok_or_else(|| {
197        KernelError::Internal(format!(
198            "velocity guard could not resolve grant index {grant_index}"
199        ))
200    })?;
201    grant
202        .max_cost_per_invocation
203        .as_ref()
204        .map(|amount| amount.units)
205        .ok_or_else(|| {
206            KernelError::Internal(
207                "velocity guard spend limiting requires max_cost_per_invocation on the matched grant"
208                    .to_string(),
209            )
210        })
211}
212
213// ---------------------------------------------------------------------------
214// Tests
215// ---------------------------------------------------------------------------
216
#[cfg(test)]
mod tests {
    use std::thread;
    use std::time::Duration;

    use chio_core::capability::{
        CapabilityToken, CapabilityTokenBody, ChioScope, MonetaryAmount, Operation, ToolGrant,
    };
    use chio_core::crypto::Keypair;

    use super::*;

    /// Builds a minimal `read_file` tool call carrying `request_id`.
    fn make_request_with_id(
        request_id: &str,
        cap: &CapabilityToken,
        agent_id: &str,
        server_id: &str,
    ) -> chio_kernel::ToolCallRequest {
        chio_kernel::ToolCallRequest {
            request_id: request_id.to_string(),
            capability: cap.clone(),
            tool_name: "read_file".to_string(),
            server_id: server_id.to_string(),
            agent_id: agent_id.to_string(),
            arguments: serde_json::json!({}),
            dpop_proof: None,
            governed_intent: None,
            approval_token: None,
            model_metadata: None,
            federated_origin_kernel_id: None,
        }
    }

    /// `make_request_with_id` using the default `req-test` id.
    fn make_request(
        cap: &CapabilityToken,
        agent_id: &str,
        server_id: &str,
    ) -> chio_kernel::ToolCallRequest {
        make_request_with_id("req-test", cap, agent_id, server_id)
    }

    /// Signs a capability whose issuer and subject are both `kp`, with an empty
    /// scope and an `expires_at` of `u64::MAX`.
    fn signed_cap(kp: &Keypair, cap_id: &str) -> CapabilityToken {
        let body = CapabilityTokenBody {
            id: cap_id.to_string(),
            issuer: kp.public_key(),
            subject: kp.public_key(),
            scope: ChioScope::default(),
            issued_at: 0,
            expires_at: u64::MAX,
            delegation_chain: vec![],
        };
        CapabilityToken::sign(body, kp).expect("sign cap")
    }

    /// A single `srv`/`read_file` invoke grant with the given per-call cost cap.
    fn read_file_grant(max_cost_per_invocation: Option<MonetaryAmount>) -> ToolGrant {
        ToolGrant {
            server_id: "srv".to_string(),
            tool_name: "read_file".to_string(),
            operations: vec![Operation::Invoke],
            constraints: vec![],
            max_invocations: None,
            max_cost_per_invocation,
            max_total_cost: None,
            dpop_required: None,
        }
    }

    /// Scope whose only grant costs `max_cost_per_invocation` USD units per call.
    fn spend_scope(max_cost_per_invocation: u64) -> ChioScope {
        let cost = MonetaryAmount {
            units: max_cost_per_invocation,
            currency: "USD".to_string(),
        };
        ChioScope {
            grants: vec![read_file_grant(Some(cost))],
            ..ChioScope::default()
        }
    }

    fn guard_ctx<'a>(
        request: &'a chio_kernel::ToolCallRequest,
        scope: &'a ChioScope,
        agent_id: &'a String,
        server_id: &'a String,
        grant_index: Option<usize>,
    ) -> chio_kernel::GuardContext<'a> {
        chio_kernel::GuardContext {
            request,
            scope,
            agent_id,
            server_id,
            session_filesystem_roots: None,
            matched_grant_index: grant_index,
        }
    }

    /// Guard limited only by invocation count.
    fn invocation_guard(limit: u32, window_secs: u64) -> VelocityGuard {
        VelocityGuard::new(VelocityConfig {
            max_invocations_per_window: Some(limit),
            window_secs,
            ..VelocityConfig::default()
        })
    }

    /// Guard limited only by spend, over the default 60-second window.
    fn spend_guard(limit: u64) -> VelocityGuard {
        VelocityGuard::new(VelocityConfig {
            max_spend_per_window: Some(limit),
            ..VelocityConfig::default()
        })
    }

    /// Owned request parts that a `GuardContext` borrows.
    struct Fixture {
        request: chio_kernel::ToolCallRequest,
        agent: String,
        server: String,
    }

    impl Fixture {
        /// Fresh keypair, capability `cap_id`, and a default request against `srv`.
        fn new(cap_id: &str) -> Self {
            let kp = Keypair::generate();
            let cap = signed_cap(&kp, cap_id);
            let agent = kp.public_key().to_hex();
            let server = "srv".to_string();
            let request = make_request(&cap, &agent, &server);
            Self {
                request,
                agent,
                server,
            }
        }

        fn ctx<'a>(
            &'a self,
            scope: &'a ChioScope,
            grant_index: Option<usize>,
        ) -> chio_kernel::GuardContext<'a> {
            guard_ctx(&self.request, scope, &self.agent, &self.server, grant_index)
        }
    }

    #[test]
    fn guard_name_is_velocity() {
        assert_eq!(VelocityGuard::new(VelocityConfig::default()).name(), "velocity");
    }

    #[test]
    fn velocity_config_defaults_unlimited() {
        let VelocityConfig {
            max_invocations_per_window,
            max_spend_per_window,
            window_secs,
            burst_factor,
        } = VelocityConfig::default();
        assert!(max_invocations_per_window.is_none());
        assert!(max_spend_per_window.is_none());
        assert_eq!(window_secs, 60);
        assert_eq!(burst_factor, 1.0);
    }

    #[test]
    fn unlimited_config_always_allows() {
        let guard = VelocityGuard::new(VelocityConfig::default());
        let fx = Fixture::new("cap-unlimited");
        let scope = ChioScope::default();

        for _ in 0..100 {
            let verdict = guard
                .evaluate(&fx.ctx(&scope, None))
                .expect("should not error");
            assert_eq!(verdict, Verdict::Allow);
        }
    }

    #[test]
    fn allows_requests_up_to_limit() {
        let guard = invocation_guard(5, 60);
        let fx = Fixture::new("cap-limited");
        let scope = ChioScope::default();

        for i in 0..5 {
            let verdict = guard
                .evaluate(&fx.ctx(&scope, None))
                .expect("evaluate should not error");
            assert_eq!(
                verdict,
                Verdict::Allow,
                "request {i} should be allowed (limit=5)"
            );
        }
    }

    #[test]
    fn denies_request_exceeding_limit() {
        let guard = invocation_guard(5, 60);
        let fx = Fixture::new("cap-exceed");
        let scope = ChioScope::default();

        // Exhaust the 5 allowed tokens.
        (0..5).for_each(|_| {
            guard
                .evaluate(&fx.ctx(&scope, None))
                .expect("should not error");
        });

        // 6th request must be denied.
        let verdict = guard
            .evaluate(&fx.ctx(&scope, None))
            .expect("should not error");
        assert_eq!(verdict, Verdict::Deny, "6th request should be denied");
    }

    #[test]
    fn tokens_refill_after_window() {
        // 1-second window with limit=2. After 1.1 seconds the bucket should
        // have refilled enough to allow at least one more request.
        let guard = invocation_guard(2, 1);
        let fx = Fixture::new("cap-refill");
        let scope = ChioScope::default();

        for _ in 0..2 {
            guard
                .evaluate(&fx.ctx(&scope, None))
                .expect("should not error");
        }

        let before = guard
            .evaluate(&fx.ctx(&scope, None))
            .expect("should not error");
        assert_eq!(before, Verdict::Deny, "should deny before refill");

        thread::sleep(Duration::from_millis(1100));

        let after = guard
            .evaluate(&fx.ctx(&scope, None))
            .expect("should not error");
        assert_eq!(after, Verdict::Allow, "should allow after refill");
    }

    #[test]
    fn separate_buckets_for_different_grant_indices() {
        let guard = invocation_guard(1, 60);
        let fx = Fixture::new("cap-multi-grant");
        let scope = ChioScope::default();
        let verdict_for = |grant: usize| {
            guard
                .evaluate(&fx.ctx(&scope, Some(grant)))
                .expect("should not error")
        };

        assert_eq!(verdict_for(0), Verdict::Allow, "grant 0 first request");
        assert_eq!(verdict_for(0), Verdict::Deny, "grant 0 second request denied");
        // grant_index 1 should have a fresh bucket.
        assert_eq!(
            verdict_for(1),
            Verdict::Allow,
            "grant 1 first request should allow"
        );
    }

    #[test]
    fn separate_buckets_for_different_capability_ids() {
        let guard = invocation_guard(1, 60);
        let kp = Keypair::generate();
        let scope = ChioScope::default();
        let agent = kp.public_key().to_hex();
        let server = "srv".to_string();

        let request_a = make_request_with_id("req-a", &signed_cap(&kp, "cap-a"), &agent, &server);
        let request_b = make_request_with_id("req-b", &signed_cap(&kp, "cap-b"), &agent, &server);
        let verdict = |request: &chio_kernel::ToolCallRequest| {
            guard
                .evaluate(&guard_ctx(request, &scope, &agent, &server, None))
                .expect("should not error")
        };

        // Exhaust cap-a.
        let _ = verdict(&request_a);
        assert_eq!(verdict(&request_a), Verdict::Deny, "cap-a second request denied");

        // cap-b should be unaffected.
        assert_eq!(
            verdict(&request_b),
            Verdict::Allow,
            "cap-b first request should allow"
        );
    }

    #[test]
    fn returns_verdict_deny_not_err_when_rate_limited() {
        let guard = invocation_guard(1, 60);
        let fx = Fixture::new("cap-deny-type");
        let scope = ChioScope::default();

        // Exhaust.
        guard
            .evaluate(&fx.ctx(&scope, None))
            .expect("should not error");

        // The result must be Ok(Verdict::Deny), not Err.
        let outcome = guard.evaluate(&fx.ctx(&scope, None));
        assert!(outcome.is_ok(), "rate limit must return Ok, not Err");
        assert_eq!(outcome.expect("ok"), Verdict::Deny, "must be Verdict::Deny");
    }

    #[test]
    fn spend_velocity_allows_up_to_limit() {
        let guard = spend_guard(300);
        let fx = Fixture::new("cap-spend");
        let scope = spend_scope(100);

        for i in 0..3 {
            let verdict = guard
                .evaluate(&fx.ctx(&scope, Some(0)))
                .expect("should not error");
            assert_eq!(verdict, Verdict::Allow, "spend request {i} should be allowed");
        }

        let verdict = guard
            .evaluate(&fx.ctx(&scope, Some(0)))
            .expect("should not error");
        assert_eq!(verdict, Verdict::Deny, "4th spend request should be denied");
    }

    #[test]
    fn spend_velocity_consumes_planned_cost_units() {
        let guard = spend_guard(250);
        let fx = Fixture::new("cap-spend-costed");
        let scope = spend_scope(125);

        let expectations = [
            ("first spend request", Verdict::Allow),
            ("second spend request", Verdict::Allow),
            ("third spend request", Verdict::Deny),
        ];
        for (label, expected) in expectations {
            let outcome = guard.evaluate(&fx.ctx(&scope, Some(0)));
            assert_eq!(outcome.expect(label), expected);
        }
    }

    #[test]
    fn spend_velocity_requires_cost_metadata_on_matched_grant() {
        let guard = spend_guard(10);
        let fx = Fixture::new("cap-spend-missing-cost");
        let scope = ChioScope {
            grants: vec![read_file_grant(None)],
            ..ChioScope::default()
        };

        let error = guard
            .evaluate(&fx.ctx(&scope, Some(0)))
            .expect_err("missing cost metadata should fail closed");
        assert!(
            error.to_string().contains("max_cost_per_invocation"),
            "unexpected error: {error}"
        );
    }
}