Skip to main content

sochdb_storage/
admission_control.rs

// SPDX-License-Identifier: AGPL-3.0-or-later
// SochDB - LLM-Optimized Embedded Database
// Copyright (C) 2026 Sushanth Reddy Vanagala (https://github.com/sushanthpy)
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <https://www.gnu.org/licenses/>.
17
//! # Admission Control with Explicit Cost Model
//!
//! Implements admission control to prevent unbounded queueing and ensure
//! stable p99 latency under load spikes. Uses multi-dimensional resource budgets
//! and fair queuing per tenant.
//!
//! ## Design Principles
//!
//! 1. **Cost Estimation Before Execution**: Estimate CPU, I/O, and memory costs
//!    before accepting a query.
//!
//! 2. **Multi-Dimensional Tokens**: Separate budgets for:
//!    - CPU tokens (compute-bound work)
//!    - Random IOPS tokens (point reads)
//!    - Sequential bandwidth tokens (scans)
//!    - Memory tokens (buffer pool pressure)
//!
//! 3. **Fair Queuing**: WFQ (Weighted Fair Queuing) or DRR (Deficit Round Robin)
//!    to prevent tenant starvation.
//!
//! 4. **Partial Results**: Opt-in only, with explicit recall degradation warning.
//!
//! ## Queueing Theory
//!
//! Stability requires: offered_load < service_capacity in each dimension.
//! Token buckets provide: rate limiting with burst handling.
//! WFQ provides: O(1) amortized scheduling with fairness guarantees.
45
46use std::collections::HashMap;
47use std::sync::Arc;
48use std::sync::atomic::{AtomicI64, AtomicU64, Ordering};
49
50use parking_lot::RwLock;
51
/// One axis along which admission control meters resource usage.
///
/// Each variant is used as a key for a token bucket, both in the global
/// limits and in every tenant's quota.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum ResourceDimension {
    /// Compute budget, counted in microseconds of CPU time.
    Cpu,
    /// Random I/O operations such as point reads.
    RandomIops,
    /// Sequential I/O bandwidth (scans, WAL writes).
    SequentialBandwidth,
    /// Memory allocation, counted in bytes.
    Memory,
    /// Concurrent connections.
    Connections,
}

impl ResourceDimension {
    /// Returns the fixed lowercase label identifying this dimension.
    pub fn name(&self) -> &'static str {
        use ResourceDimension::{Connections, Cpu, Memory, RandomIops, SequentialBandwidth};

        match *self {
            Cpu => "cpu",
            RandomIops => "random_iops",
            SequentialBandwidth => "seq_bandwidth",
            Memory => "memory",
            Connections => "connections",
        }
    }
}
78
/// Estimated resource cost of a single operation, produced before the
/// operation is admitted.
#[derive(Debug, Clone, Default)]
pub struct OperationCost {
    /// Estimated CPU time (microseconds)
    pub cpu_us: u64,
    /// Estimated random I/O operations
    pub random_iops: u64,
    /// Estimated sequential read bytes
    pub sequential_bytes: u64,
    /// Estimated memory usage (bytes)
    pub memory_bytes: u64,
    /// Priority (higher = more important)
    pub priority: u32,
}

impl OperationCost {
    /// Create a zero-cost estimate (for very cheap operations).
    ///
    /// Every field, including `priority`, is zero.
    pub fn zero() -> Self {
        Self::default()
    }

    /// Create a point-read cost estimate: one random I/O, one 4 KiB page of
    /// memory and 10us of CPU.
    pub fn point_read() -> Self {
        Self {
            cpu_us: 10,
            random_iops: 1,
            sequential_bytes: 0,
            memory_bytes: 4096,
            priority: 100,
        }
    }

    /// Create a scan cost estimate for `rows` rows of `row_bytes` bytes each.
    ///
    /// All products saturate at `u64::MAX` instead of overflowing, so an
    /// absurdly large estimate yields an unaffordable cost rather than a
    /// panic (debug builds) or a small wrapped value (release builds).
    pub fn scan(rows: usize, row_bytes: usize) -> Self {
        // usize -> u64 is lossless on every supported target width.
        let rows = rows as u64;
        let total_bytes = rows.saturating_mul(row_bytes as u64);
        Self {
            cpu_us: rows.saturating_mul(5), // ~5us per row
            random_iops: 0,
            sequential_bytes: total_bytes,
            memory_bytes: total_bytes.min(64 * 1024 * 1024), // Cap at 64MB
            priority: 50,
        }
    }

    /// Create a vector search cost estimate.
    ///
    /// The estimate charges `ef_search * candidates` distance computations
    /// over `dimension`-wide vectors. Products saturate at `u64::MAX`
    /// instead of overflowing.
    pub fn vector_search(dimension: usize, ef_search: usize, candidates: usize) -> Self {
        // usize -> u64 is lossless on every supported target width.
        let dimension = dimension as u64;
        let ef_search = ef_search as u64;
        let distance_calcs = ef_search.saturating_mul(candidates as u64);
        Self {
            cpu_us: distance_calcs.saturating_mul(dimension) / 100, // ~100 dims per microsecond
            random_iops: (ef_search / 10).max(1),                   // Random node access
            sequential_bytes: 0,
            // Candidate vectors, 4 bytes per component
            memory_bytes: ef_search.saturating_mul(dimension).saturating_mul(4),
            priority: 75,
        }
    }

    /// Create a write cost estimate for a payload of `bytes` bytes.
    ///
    /// CPU is charged at one microsecond per 100 bytes with a 10us floor.
    pub fn write(bytes: usize) -> Self {
        Self {
            cpu_us: (bytes / 100).max(10) as u64,
            random_iops: 0,
            sequential_bytes: bytes as u64, // WAL write
            memory_bytes: bytes as u64,
            priority: 100,
        }
    }

    /// Total weighted cost for simple comparisons.
    ///
    /// Weights: 1 per CPU microsecond, 100 per random I/O, 1 per KiB of
    /// sequential bytes, 1 per 4 KiB of memory. The sum saturates at
    /// `u64::MAX`.
    pub fn total_weighted_cost(&self) -> u64 {
        self.cpu_us
            .saturating_add(self.random_iops.saturating_mul(100))
            .saturating_add(self.sequential_bytes / 1024)
            .saturating_add(self.memory_bytes / 4096)
    }
}
154
/// Token bucket for rate limiting.
///
/// The bucket starts full, is debited by [`TokenBucket::try_acquire`] and is
/// credited lazily (on every acquire/availability query) according to the
/// time elapsed since the last credit. All token updates are single atomic
/// read-modify-write operations, so concurrent callers cannot lose each
/// other's debits or credits.
pub struct TokenBucket {
    /// Current tokens available
    tokens: AtomicI64,
    /// Maximum tokens (bucket capacity)
    capacity: i64,
    /// Token refill rate per second
    refill_rate: i64,
    /// Monotonic reference point that `last_refill` is measured from
    epoch: std::time::Instant,
    /// Time already converted into tokens, in milliseconds since `epoch`
    last_refill: AtomicU64,
}

impl TokenBucket {
    /// Create a new, full token bucket.
    ///
    /// * `capacity` - maximum number of tokens the bucket can hold.
    /// * `refill_rate` - tokens credited per second; a rate of zero or less
    ///   disables refilling.
    pub fn new(capacity: i64, refill_rate: i64) -> Self {
        Self {
            tokens: AtomicI64::new(capacity),
            capacity,
            refill_rate,
            // A monotonic clock keeps refills immune to wall-clock steps.
            epoch: std::time::Instant::now(),
            last_refill: AtomicU64::new(0),
        }
    }

    /// Try to acquire `tokens` tokens; returns `true` if they were debited.
    ///
    /// A negative amount is rejected: debiting it would add tokens and could
    /// push the bucket above its capacity.
    pub fn try_acquire(&self, tokens: i64) -> bool {
        if tokens < 0 {
            return false;
        }
        self.refill();

        // Check and debit in one atomic step so two callers can never both
        // spend the same tokens.
        self.tokens
            .fetch_update(Ordering::AcqRel, Ordering::Acquire, |current| {
                if current >= tokens {
                    Some(current - tokens)
                } else {
                    None
                }
            })
            .is_ok()
    }

    /// Credit tokens for the time elapsed since the last credit.
    fn refill(&self) {
        if self.refill_rate <= 0 {
            return;
        }

        let now = u64::try_from(self.epoch.elapsed().as_millis()).unwrap_or(u64::MAX);
        let last = self.last_refill.load(Ordering::Acquire);
        let elapsed_ms = now.saturating_sub(last);

        // i128 keeps `rate * elapsed` exact for every i64 rate / u64 elapsed.
        let rate = i128::from(self.refill_rate);
        let earned = rate * i128::from(elapsed_ms) / 1000;
        if earned <= 0 {
            return;
        }

        // Advance the timestamp only by the time that paid for whole tokens
        // (rounded up, which never exceeds `elapsed_ms`), so the fractional
        // remainder is carried into the next refill instead of being dropped.
        let paid_ms = (earned * 1000 + rate - 1) / rate;
        let credited_until = last.saturating_add(u64::try_from(paid_ms).unwrap_or(elapsed_ms));

        // Only the thread that wins the timestamp update credits this interval.
        if self
            .last_refill
            .compare_exchange(last, credited_until, Ordering::AcqRel, Ordering::Relaxed)
            .is_ok()
        {
            self.credit(i64::try_from(earned).unwrap_or(i64::MAX));
        }
    }

    /// Atomically add `amount` tokens, clamping the result to `capacity`.
    fn credit(&self, amount: i64) {
        // The closure always returns `Some`, so the update cannot fail.
        let _ = self
            .tokens
            .fetch_update(Ordering::AcqRel, Ordering::Acquire, |current| {
                Some(current.saturating_add(amount).min(self.capacity))
            });
    }

    /// Return tokens (for cancelled operations).
    ///
    /// The balance is clamped to the bucket capacity and the addition
    /// saturates instead of overflowing.
    pub fn release(&self, tokens: i64) {
        self.credit(tokens);
    }

    /// Current available tokens, after applying any pending refill.
    pub fn available(&self) -> i64 {
        self.refill();
        self.tokens.load(Ordering::Acquire)
    }

    /// Utilization ratio (1.0 = empty, 0.0 = full).
    ///
    /// A zero-capacity bucket can never hold a token and reports `1.0`
    /// rather than the NaN that `0 / 0` would produce.
    pub fn utilization(&self) -> f64 {
        if self.capacity == 0 {
            return 1.0;
        }
        1.0 - (self.available() as f64 / self.capacity as f64)
    }
}
251
252/// Per-tenant quota and state
253pub struct TenantQuota {
254    /// Tenant identifier
255    pub tenant_id: String,
256    /// Weight for fair queuing (higher = more share)
257    pub weight: u32,
258    /// Token buckets per resource dimension
259    buckets: HashMap<ResourceDimension, TokenBucket>,
260    /// Deficit counter for DRR
261    deficit: AtomicI64,
262    /// Queue of pending requests
263    pending_count: AtomicU64,
264    /// Total requests processed
265    total_requests: AtomicU64,
266    /// Total requests rejected
267    rejected_requests: AtomicU64,
268}
269
270impl TenantQuota {
271    /// Create a new tenant quota with default limits
272    pub fn new(tenant_id: String, weight: u32) -> Self {
273        let mut buckets = HashMap::new();
274
275        // Default limits per tenant
276        buckets.insert(
277            ResourceDimension::Cpu,
278            TokenBucket::new(10_000_000, 1_000_000), // 10s burst, 1s/s refill
279        );
280        buckets.insert(
281            ResourceDimension::RandomIops,
282            TokenBucket::new(10_000, 1_000), // 10K burst, 1K/s
283        );
284        buckets.insert(
285            ResourceDimension::SequentialBandwidth,
286            TokenBucket::new(1_000_000_000, 100_000_000), // 1GB burst, 100MB/s
287        );
288        buckets.insert(
289            ResourceDimension::Memory,
290            TokenBucket::new(1_000_000_000, 500_000_000), // 1GB burst, 500MB/s
291        );
292        buckets.insert(
293            ResourceDimension::Connections,
294            TokenBucket::new(100, 10), // 100 burst, 10/s
295        );
296
297        Self {
298            tenant_id,
299            weight,
300            buckets,
301            deficit: AtomicI64::new(0),
302            pending_count: AtomicU64::new(0),
303            total_requests: AtomicU64::new(0),
304            rejected_requests: AtomicU64::new(0),
305        }
306    }
307
308    /// Try to acquire resources for an operation
309    pub fn try_acquire(&self, cost: &OperationCost) -> bool {
310        // Check all dimensions
311        let cpu_ok = self
312            .buckets
313            .get(&ResourceDimension::Cpu)
314            .map(|b| b.available() >= cost.cpu_us as i64)
315            .unwrap_or(true);
316
317        let iops_ok = self
318            .buckets
319            .get(&ResourceDimension::RandomIops)
320            .map(|b| b.available() >= cost.random_iops as i64)
321            .unwrap_or(true);
322
323        let bandwidth_ok = self
324            .buckets
325            .get(&ResourceDimension::SequentialBandwidth)
326            .map(|b| b.available() >= cost.sequential_bytes as i64)
327            .unwrap_or(true);
328
329        let memory_ok = self
330            .buckets
331            .get(&ResourceDimension::Memory)
332            .map(|b| b.available() >= cost.memory_bytes as i64)
333            .unwrap_or(true);
334
335        if cpu_ok && iops_ok && bandwidth_ok && memory_ok {
336            // Acquire all resources atomically (best effort)
337            if let Some(b) = self.buckets.get(&ResourceDimension::Cpu) {
338                b.try_acquire(cost.cpu_us as i64);
339            }
340            if let Some(b) = self.buckets.get(&ResourceDimension::RandomIops) {
341                b.try_acquire(cost.random_iops as i64);
342            }
343            if let Some(b) = self.buckets.get(&ResourceDimension::SequentialBandwidth) {
344                b.try_acquire(cost.sequential_bytes as i64);
345            }
346            if let Some(b) = self.buckets.get(&ResourceDimension::Memory) {
347                b.try_acquire(cost.memory_bytes as i64);
348            }
349            self.total_requests.fetch_add(1, Ordering::Relaxed);
350            true
351        } else {
352            self.rejected_requests.fetch_add(1, Ordering::Relaxed);
353            false
354        }
355    }
356
357    /// Release resources after operation completes
358    pub fn release(&self, cost: &OperationCost) {
359        if let Some(b) = self.buckets.get(&ResourceDimension::Memory) {
360            b.release(cost.memory_bytes as i64);
361        }
362    }
363
364    /// Get utilization across all dimensions
365    pub fn utilization(&self) -> HashMap<ResourceDimension, f64> {
366        self.buckets
367            .iter()
368            .map(|(dim, bucket)| (*dim, bucket.utilization()))
369            .collect()
370    }
371}
372
373/// Admission decision
374#[derive(Debug, Clone)]
375pub enum AdmissionDecision {
376    /// Request admitted with cost
377    Admit { cost: OperationCost },
378    /// Request rejected due to overload
379    Reject {
380        reason: String,
381        retry_after_ms: Option<u64>,
382    },
383    /// Request can proceed with partial results
384    PartialAdmit {
385        cost: OperationCost,
386        max_results: usize,
387        recall_warning: String,
388    },
389}
390
391/// Admission control configuration
392#[derive(Debug, Clone)]
393pub struct AdmissionConfig {
394    /// Global token bucket capacities
395    pub global_limits: HashMap<ResourceDimension, (i64, i64)>, // (capacity, refill_rate)
396    /// Default tenant weight
397    pub default_tenant_weight: u32,
398    /// Enable partial results
399    pub allow_partial_results: bool,
400    /// Queue depth warning threshold
401    pub queue_depth_warning: usize,
402    /// Queue depth rejection threshold
403    pub queue_depth_rejection: usize,
404}
405
406impl Default for AdmissionConfig {
407    fn default() -> Self {
408        let mut global_limits = HashMap::new();
409        global_limits.insert(ResourceDimension::Cpu, (100_000_000, 10_000_000));
410        global_limits.insert(ResourceDimension::RandomIops, (100_000, 10_000));
411        global_limits.insert(
412            ResourceDimension::SequentialBandwidth,
413            (10_000_000_000, 1_000_000_000),
414        );
415        global_limits.insert(ResourceDimension::Memory, (10_000_000_000, 2_000_000_000));
416        global_limits.insert(ResourceDimension::Connections, (1000, 100));
417
418        Self {
419            global_limits,
420            default_tenant_weight: 100,
421            allow_partial_results: false,
422            queue_depth_warning: 100,
423            queue_depth_rejection: 1000,
424        }
425    }
426}
427
428/// Admission controller
429pub struct AdmissionController {
430    config: AdmissionConfig,
431    /// Global token buckets
432    global_buckets: HashMap<ResourceDimension, TokenBucket>,
433    /// Per-tenant quotas
434    tenants: RwLock<HashMap<String, Arc<TenantQuota>>>,
435    /// Current queue depth
436    queue_depth: AtomicU64,
437    /// Metrics
438    metrics: AdmissionMetrics,
439}
440
/// Admission control metrics.
///
/// All counters start at zero and are plain atomics, so they can be updated
/// through a shared reference.
#[derive(Debug, Default)]
pub struct AdmissionMetrics {
    /// Requests evaluated, whatever the outcome
    pub total_requests: AtomicU64,
    /// Requests admitted in full
    pub admitted_requests: AtomicU64,
    /// Requests rejected
    pub rejected_requests: AtomicU64,
    /// Requests admitted with partial results
    pub partial_requests: AtomicU64,
    /// Average queue depth (not updated anywhere in this file)
    pub avg_queue_depth: AtomicU64,
}
450
451impl AdmissionController {
452    /// Create a new admission controller
453    pub fn new(config: AdmissionConfig) -> Self {
454        let mut global_buckets = HashMap::new();
455        for (dim, (capacity, rate)) in &config.global_limits {
456            global_buckets.insert(*dim, TokenBucket::new(*capacity, *rate));
457        }
458
459        Self {
460            config,
461            global_buckets,
462            tenants: RwLock::new(HashMap::new()),
463            queue_depth: AtomicU64::new(0),
464            metrics: AdmissionMetrics::default(),
465        }
466    }
467
468    /// Register a tenant
469    pub fn register_tenant(&self, tenant_id: &str, weight: u32) {
470        let mut tenants = self.tenants.write();
471        if !tenants.contains_key(tenant_id) {
472            tenants.insert(
473                tenant_id.to_string(),
474                Arc::new(TenantQuota::new(tenant_id.to_string(), weight)),
475            );
476        }
477    }
478
479    /// Get or create tenant quota
480    fn get_tenant(&self, tenant_id: &str) -> Arc<TenantQuota> {
481        {
482            let tenants = self.tenants.read();
483            if let Some(tenant) = tenants.get(tenant_id) {
484                return tenant.clone();
485            }
486        }
487
488        // Create new tenant
489        let mut tenants = self.tenants.write();
490        tenants
491            .entry(tenant_id.to_string())
492            .or_insert_with(|| {
493                Arc::new(TenantQuota::new(
494                    tenant_id.to_string(),
495                    self.config.default_tenant_weight,
496                ))
497            })
498            .clone()
499    }
500
501    /// Evaluate admission for a request
502    pub fn evaluate(
503        &self,
504        tenant_id: &str,
505        cost: OperationCost,
506        allow_partial: bool,
507    ) -> AdmissionDecision {
508        self.metrics.total_requests.fetch_add(1, Ordering::Relaxed);
509
510        // Check queue depth
511        let depth = self.queue_depth.load(Ordering::Relaxed);
512        if depth >= self.config.queue_depth_rejection as u64 {
513            self.metrics
514                .rejected_requests
515                .fetch_add(1, Ordering::Relaxed);
516            return AdmissionDecision::Reject {
517                reason: format!("Queue depth {} exceeds limit", depth),
518                retry_after_ms: Some(100),
519            };
520        }
521
522        // Check global limits
523        let global_ok = self.check_global_limits(&cost);
524        if !global_ok {
525            self.metrics
526                .rejected_requests
527                .fetch_add(1, Ordering::Relaxed);
528            return AdmissionDecision::Reject {
529                reason: "Global resource limits exceeded".to_string(),
530                retry_after_ms: Some(50),
531            };
532        }
533
534        // Check tenant limits
535        let tenant = self.get_tenant(tenant_id);
536        if tenant.try_acquire(&cost) {
537            self.queue_depth.fetch_add(1, Ordering::Relaxed);
538            self.metrics
539                .admitted_requests
540                .fetch_add(1, Ordering::Relaxed);
541            AdmissionDecision::Admit { cost }
542        } else if allow_partial && self.config.allow_partial_results {
543            // Try with reduced cost
544            let reduced_cost = OperationCost {
545                cpu_us: cost.cpu_us / 4,
546                random_iops: cost.random_iops / 4,
547                sequential_bytes: cost.sequential_bytes / 4,
548                memory_bytes: cost.memory_bytes / 4,
549                priority: cost.priority,
550            };
551            if tenant.try_acquire(&reduced_cost) {
552                self.queue_depth.fetch_add(1, Ordering::Relaxed);
553                self.metrics
554                    .partial_requests
555                    .fetch_add(1, Ordering::Relaxed);
556                AdmissionDecision::PartialAdmit {
557                    cost: reduced_cost,
558                    max_results: 25, // 25% of normal
559                    recall_warning: "Results limited due to load - recall may be degraded"
560                        .to_string(),
561                }
562            } else {
563                self.metrics
564                    .rejected_requests
565                    .fetch_add(1, Ordering::Relaxed);
566                AdmissionDecision::Reject {
567                    reason: format!("Tenant {} quota exceeded", tenant_id),
568                    retry_after_ms: Some(100),
569                }
570            }
571        } else {
572            self.metrics
573                .rejected_requests
574                .fetch_add(1, Ordering::Relaxed);
575            AdmissionDecision::Reject {
576                reason: format!("Tenant {} quota exceeded", tenant_id),
577                retry_after_ms: Some(100),
578            }
579        }
580    }
581
582    /// Check global resource limits
583    fn check_global_limits(&self, cost: &OperationCost) -> bool {
584        let cpu_ok = self
585            .global_buckets
586            .get(&ResourceDimension::Cpu)
587            .map(|b| b.available() >= cost.cpu_us as i64)
588            .unwrap_or(true);
589
590        let iops_ok = self
591            .global_buckets
592            .get(&ResourceDimension::RandomIops)
593            .map(|b| b.available() >= cost.random_iops as i64)
594            .unwrap_or(true);
595
596        cpu_ok && iops_ok
597    }
598
599    /// Complete a request (release resources)
600    pub fn complete(&self, tenant_id: &str, cost: &OperationCost) {
601        self.queue_depth.fetch_sub(1, Ordering::Relaxed);
602        let tenant = self.get_tenant(tenant_id);
603        tenant.release(cost);
604    }
605
606    /// Get current system load
607    pub fn system_load(&self) -> SystemLoad {
608        let mut utilization = HashMap::new();
609        for (dim, bucket) in &self.global_buckets {
610            utilization.insert(*dim, bucket.utilization());
611        }
612
613        SystemLoad {
614            queue_depth: self.queue_depth.load(Ordering::Relaxed),
615            utilization,
616            total_requests: self.metrics.total_requests.load(Ordering::Relaxed),
617            admitted_requests: self.metrics.admitted_requests.load(Ordering::Relaxed),
618            rejected_requests: self.metrics.rejected_requests.load(Ordering::Relaxed),
619        }
620    }
621}
622
623/// Current system load
624#[derive(Debug)]
625pub struct SystemLoad {
626    pub queue_depth: u64,
627    pub utilization: HashMap<ResourceDimension, f64>,
628    pub total_requests: u64,
629    pub admitted_requests: u64,
630    pub rejected_requests: u64,
631}
632
633impl SystemLoad {
634    /// Is the system overloaded?
635    pub fn is_overloaded(&self) -> bool {
636        self.utilization.values().any(|&u| u > 0.9)
637    }
638
639    /// Admission rate
640    pub fn admission_rate(&self) -> f64 {
641        if self.total_requests == 0 {
642            1.0
643        } else {
644            self.admitted_requests as f64 / self.total_requests as f64
645        }
646    }
647}
648
#[cfg(test)]
mod tests {
    use super::*;

    /// Basic acquire/release accounting on a single bucket.
    #[test]
    fn test_token_bucket_basic() {
        // A refill rate of 0 keeps the exact-count assertions below
        // independent of how much time passes between the calls.
        let bucket = TokenBucket::new(100, 0);
        assert_eq!(bucket.available(), 100);

        assert!(bucket.try_acquire(50));
        assert_eq!(bucket.available(), 50);

        assert!(!bucket.try_acquire(60));
        assert_eq!(bucket.available(), 50);

        bucket.release(25);
        assert_eq!(bucket.available(), 75);
    }

    /// The canned cost estimators charge the expected dimensions.
    #[test]
    fn test_operation_cost_estimation() {
        let point_read = OperationCost::point_read();
        assert!(point_read.random_iops > 0);
        assert_eq!(point_read.sequential_bytes, 0);

        let scan = OperationCost::scan(1000, 100);
        assert_eq!(scan.sequential_bytes, 100_000);
        assert_eq!(scan.random_iops, 0);

        let vector = OperationCost::vector_search(128, 64, 100);
        assert!(vector.cpu_us > 0);
    }

    /// A cheap request from a registered tenant is admitted, and completing
    /// it brings the queue depth back to zero.
    #[test]
    fn test_admission_controller_basic() {
        let controller = AdmissionController::new(AdmissionConfig::default());
        controller.register_tenant("test", 100);

        let cost = OperationCost::point_read();
        let decision = controller.evaluate("test", cost.clone(), false);

        assert!(matches!(decision, AdmissionDecision::Admit { .. }));
        assert_eq!(controller.system_load().queue_depth, 1);

        controller.complete("test", &cost);
        assert_eq!(controller.system_load().queue_depth, 0);
    }

    /// A request larger than the tenant's CPU burst is refused.
    #[test]
    fn test_tenant_quota_exhaustion() {
        let quota = TenantQuota::new("test".to_string(), 100);

        // Exhaust CPU tokens
        let huge_cost = OperationCost {
            cpu_us: 100_000_000, // 100 seconds
            random_iops: 0,
            sequential_bytes: 0,
            memory_bytes: 0,
            priority: 100,
        };

        assert!(!quota.try_acquire(&huge_cost));
    }
}