Skip to main content

sochdb_storage/
admission_control.rs

1// SPDX-License-Identifier: AGPL-3.0-or-later
2// SochDB - LLM-Optimized Embedded Database
3// Copyright (C) 2026 Sushanth Reddy Vanagala (https://github.com/sushanthpy)
4//
5// This program is free software: you can redistribute it and/or modify
6// it under the terms of the GNU Affero General Public License as published by
7// the Free Software Foundation, either version 3 of the License, or
8// (at your option) any later version.
9//
10// This program is distributed in the hope that it will be useful,
11// but WITHOUT ANY WARRANTY; without even the implied warranty of
12// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13// GNU Affero General Public License for more details.
14//
15// You should have received a copy of the GNU Affero General Public License
16// along with this program. If not, see <https://www.gnu.org/licenses/>.
17
18//! # Admission Control with Explicit Cost Model
19//!
20//! Implements admission control to prevent unbounded queueing and ensure
21//! stable p99 latency under load spikes. Uses multi-dimensional resource budgets
22//! and fair queuing per tenant.
23//!
24//! ## Design Principles
25//!
26//! 1. **Cost Estimation Before Execution**: Estimate CPU, I/O, and memory costs
27//!    before accepting a query.
28//!
29//! 2. **Multi-Dimensional Tokens**: Separate budgets for:
30//!    - CPU tokens (compute-bound work)
31//!    - Random IOPS tokens (point reads)
32//!    - Sequential bandwidth tokens (scans)
33//!    - Memory tokens (buffer pool pressure)
34//!
35//! 3. **Fair Queuing**: WFQ (Weighted Fair Queuing) or DRR (Deficit Round Robin)
36//!    to prevent tenant starvation.
37//!
38//! 4. **Partial Results**: Opt-in only, with explicit recall degradation warning.
39//!
40//! ## Queueing Theory
41//!
42//! Stability requires: offered_load < service_capacity in each dimension.
43//! Token buckets provide: rate limiting with burst handling.
//! DRR provides: O(1) per-request scheduling with fairness guarantees (WFQ needs O(log n) per request to order requests by virtual finish time).
45
46use std::collections::HashMap;
47use std::sync::atomic::{AtomicI64, AtomicU64, Ordering};
48use std::sync::Arc;
49use std::time::{Duration, Instant};
50
51use parking_lot::{Mutex, RwLock};
52
/// A resource axis along which admission-control budgets are tracked.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum ResourceDimension {
    /// CPU time, counted in microseconds.
    Cpu,
    /// Random (point-read) I/O operations.
    RandomIops,
    /// Sequential I/O volume, counted in bytes.
    SequentialBandwidth,
    /// Memory, counted in bytes.
    Memory,
    /// Number of concurrent connections.
    Connections,
}

impl ResourceDimension {
    /// Stable lowercase identifier for this dimension.
    pub fn name(&self) -> &'static str {
        match *self {
            Self::Cpu => "cpu",
            Self::RandomIops => "random_iops",
            Self::SequentialBandwidth => "seq_bandwidth",
            Self::Memory => "memory",
            Self::Connections => "connections",
        }
    }
}
79
/// Estimated cost of an operation, one figure per charged resource dimension.
///
/// The constructors are coarse heuristics. Every per-dimension estimate they
/// produce is clamped to [`OperationCost::MAX_ESTIMATE`], so oversized inputs
/// saturate instead of overflowing. Overflow would panic in debug builds and
/// silently wrap to a tiny, easily-admitted cost in release builds.
#[derive(Debug, Clone, Default)]
pub struct OperationCost {
    /// Estimated CPU time (microseconds)
    pub cpu_us: u64,
    /// Estimated random I/O operations
    pub random_iops: u64,
    /// Estimated sequential I/O bytes
    pub sequential_bytes: u64,
    /// Estimated memory usage (bytes)
    pub memory_bytes: u64,
    /// Priority (higher = more important)
    pub priority: u32,
}

impl OperationCost {
    /// Upper bound for any single estimate produced by the constructors.
    ///
    /// Token buckets count in `i64`, so larger values could not be charged
    /// faithfully (an `as i64` cast would turn them negative).
    pub const MAX_ESTIMATE: u64 = i64::MAX as u64;

    /// Convert a (possibly saturated) `usize` quantity into a clamped estimate.
    fn estimate(value: usize) -> u64 {
        (value as u64).min(Self::MAX_ESTIMATE)
    }

    /// Create a zero-cost estimate (for very cheap operations)
    pub fn zero() -> Self {
        Self::default()
    }

    /// Create a point-read cost estimate: 10us CPU, one random I/O, one 4 KiB
    /// page of memory.
    pub fn point_read() -> Self {
        Self {
            cpu_us: 10,
            random_iops: 1,
            sequential_bytes: 0,
            memory_bytes: 4096,
            priority: 100,
        }
    }

    /// Create a scan cost estimate for `rows` rows of `row_bytes` bytes each.
    ///
    /// CPU is ~5us per row. The whole volume is charged as sequential I/O, and
    /// memory is the same volume capped at 64 MiB.
    pub fn scan(rows: usize, row_bytes: usize) -> Self {
        const SCAN_MEMORY_CAP: usize = 64 * 1024 * 1024;
        let bytes = rows.saturating_mul(row_bytes);
        Self {
            cpu_us: Self::estimate(rows.saturating_mul(5)),
            random_iops: 0,
            sequential_bytes: Self::estimate(bytes),
            memory_bytes: Self::estimate(bytes.min(SCAN_MEMORY_CAP)),
            priority: 50,
        }
    }

    /// Create a vector search cost estimate.
    ///
    /// Models `ef_search * candidates` distance computations over `dimension`
    /// components at ~100 components per microsecond, plus `ef_search / 10`
    /// random node accesses (at least one). Memory covers `ef_search` candidate
    /// vectors at 4 bytes per component (presumably `f32` — confirm against the
    /// index layout).
    pub fn vector_search(dimension: usize, ef_search: usize, candidates: usize) -> Self {
        let distance_calcs = ef_search.saturating_mul(candidates);
        Self {
            cpu_us: Self::estimate(distance_calcs.saturating_mul(dimension) / 100),
            random_iops: Self::estimate((ef_search / 10).max(1)),
            sequential_bytes: 0,
            memory_bytes: Self::estimate(ef_search.saturating_mul(dimension).saturating_mul(4)),
            priority: 75,
        }
    }

    /// Create a write cost estimate for `bytes` bytes.
    ///
    /// CPU is 1us per 100 bytes with a 10us floor. The payload is charged both as
    /// sequential I/O (WAL write) and as memory.
    pub fn write(bytes: usize) -> Self {
        let volume = Self::estimate(bytes);
        Self {
            cpu_us: Self::estimate((bytes / 100).max(10)),
            random_iops: 0,
            sequential_bytes: volume, // WAL write
            memory_bytes: volume,
            priority: 100,
        }
    }

    /// Total weighted cost for simple comparisons.
    ///
    /// Weights: 1 per CPU microsecond, 100 per random I/O, 1 per KiB sequential,
    /// and 1 per 4 KiB of memory. Saturates at `u64::MAX` instead of overflowing.
    pub fn total_weighted_cost(&self) -> u64 {
        self.cpu_us
            .saturating_add(self.random_iops.saturating_mul(100))
            .saturating_add(self.sequential_bytes / 1024)
            .saturating_add(self.memory_bytes / 4096)
    }
}
152
/// Token bucket for rate limiting.
///
/// The bucket starts full at `capacity` tokens. It is refilled lazily (whenever
/// it is accessed) at `refill_rate` tokens per second, never exceeding
/// `capacity`. Elapsed time is measured with the monotonic [`Instant`] clock, so
/// wall-clock adjustments cannot stall or over-fill the bucket. Every token
/// update is an atomic read-modify-write, so concurrent callers never lose or
/// duplicate tokens.
pub struct TokenBucket {
    /// Current tokens available
    tokens: AtomicI64,
    /// Maximum tokens (bucket capacity)
    capacity: i64,
    /// Token refill rate per second
    refill_rate: i64,
    /// Monotonic reference point for `last_refill_us`.
    epoch: Instant,
    /// Time (microseconds since `epoch`) up to which refill has been credited.
    last_refill_us: AtomicU64,
}

impl TokenBucket {
    /// Create a new token bucket that starts full.
    pub fn new(capacity: i64, refill_rate: i64) -> Self {
        Self {
            tokens: AtomicI64::new(capacity),
            capacity,
            refill_rate,
            epoch: Instant::now(),
            last_refill_us: AtomicU64::new(0),
        }
    }

    /// Microseconds elapsed since `epoch`, saturating at `u64::MAX`.
    fn now_us(&self) -> u64 {
        let micros = self.epoch.elapsed().as_micros();
        micros.min(u128::from(u64::MAX)) as u64
    }

    /// Try to acquire `tokens` tokens. Returns `true` on success.
    ///
    /// On failure the bucket is left unchanged. A request for zero or a negative
    /// number of tokens succeeds without touching the bucket, because a negative
    /// request must never mint tokens.
    pub fn try_acquire(&self, tokens: i64) -> bool {
        self.refill();
        if tokens <= 0 {
            return true;
        }
        self.tokens
            .fetch_update(Ordering::AcqRel, Ordering::Acquire, |current| {
                if current >= tokens {
                    Some(current - tokens)
                } else {
                    None
                }
            })
            .is_ok()
    }

    /// Credit tokens for the time elapsed since the last credited instant.
    fn refill(&self) {
        const MICROS_PER_SEC: i128 = 1_000_000;

        if self.refill_rate <= 0 {
            return;
        }
        let now = self.now_us();
        let last = self.last_refill_us.load(Ordering::Acquire);
        let elapsed_us = now.saturating_sub(last);
        if elapsed_us == 0 {
            return;
        }

        // Widen to i128: `rate * elapsed` can exceed i64 after a long idle period.
        let rate = i128::from(self.refill_rate);
        let earned = rate * i128::from(elapsed_us) / MICROS_PER_SEC;
        if earned <= 0 {
            return;
        }

        // Advance the refill clock only by the time that produced `earned` whole
        // tokens. Rounding up keeps it at or before `now`. The fractional
        // remainder carries into the next refill instead of being discarded.
        let spent_us = (earned * MICROS_PER_SEC + rate - 1) / rate;
        let next = last + spent_us as u64;
        if self
            .last_refill_us
            .compare_exchange(last, next, Ordering::AcqRel, Ordering::Acquire)
            .is_err()
        {
            // Another thread credited this interval concurrently.
            return;
        }

        let earned = earned.min(i128::from(i64::MAX)) as i64;
        let capacity = self.capacity;
        // Atomic add-and-cap: a plain load + store here would overwrite debits
        // made concurrently by `try_acquire`. The closure always returns `Some`.
        let _ = self
            .tokens
            .fetch_update(Ordering::AcqRel, Ordering::Acquire, |current| {
                Some(current.saturating_add(earned).min(capacity))
            });
    }

    /// Return tokens (for cancelled or completed operations), capped at capacity.
    ///
    /// Non-positive amounts are ignored.
    pub fn release(&self, tokens: i64) {
        if tokens <= 0 {
            return;
        }
        let capacity = self.capacity;
        // The closure always returns `Some`, so `fetch_update` cannot fail.
        let _ = self
            .tokens
            .fetch_update(Ordering::AcqRel, Ordering::Acquire, |current| {
                Some(current.saturating_add(tokens).min(capacity))
            });
    }

    /// Current available tokens (after crediting any pending refill).
    pub fn available(&self) -> i64 {
        self.refill();
        self.tokens.load(Ordering::Acquire)
    }

    /// Utilization ratio (1.0 = empty, 0.0 = full).
    ///
    /// A bucket with non-positive capacity can never hold tokens, so it reports
    /// 1.0 rather than the NaN a division by zero would produce.
    pub fn utilization(&self) -> f64 {
        if self.capacity <= 0 {
            return 1.0;
        }
        1.0 - (self.available() as f64 / self.capacity as f64)
    }
}
249
250/// Per-tenant quota and state
251pub struct TenantQuota {
252    /// Tenant identifier
253    pub tenant_id: String,
254    /// Weight for fair queuing (higher = more share)
255    pub weight: u32,
256    /// Token buckets per resource dimension
257    buckets: HashMap<ResourceDimension, TokenBucket>,
258    /// Deficit counter for DRR
259    deficit: AtomicI64,
260    /// Queue of pending requests
261    pending_count: AtomicU64,
262    /// Total requests processed
263    total_requests: AtomicU64,
264    /// Total requests rejected
265    rejected_requests: AtomicU64,
266}
267
268impl TenantQuota {
269    /// Create a new tenant quota with default limits
270    pub fn new(tenant_id: String, weight: u32) -> Self {
271        let mut buckets = HashMap::new();
272
273        // Default limits per tenant
274        buckets.insert(
275            ResourceDimension::Cpu,
276            TokenBucket::new(10_000_000, 1_000_000), // 10s burst, 1s/s refill
277        );
278        buckets.insert(
279            ResourceDimension::RandomIops,
280            TokenBucket::new(10_000, 1_000), // 10K burst, 1K/s
281        );
282        buckets.insert(
283            ResourceDimension::SequentialBandwidth,
284            TokenBucket::new(1_000_000_000, 100_000_000), // 1GB burst, 100MB/s
285        );
286        buckets.insert(
287            ResourceDimension::Memory,
288            TokenBucket::new(1_000_000_000, 500_000_000), // 1GB burst, 500MB/s
289        );
290        buckets.insert(
291            ResourceDimension::Connections,
292            TokenBucket::new(100, 10), // 100 burst, 10/s
293        );
294
295        Self {
296            tenant_id,
297            weight,
298            buckets,
299            deficit: AtomicI64::new(0),
300            pending_count: AtomicU64::new(0),
301            total_requests: AtomicU64::new(0),
302            rejected_requests: AtomicU64::new(0),
303        }
304    }
305
306    /// Try to acquire resources for an operation
307    pub fn try_acquire(&self, cost: &OperationCost) -> bool {
308        // Check all dimensions
309        let cpu_ok = self
310            .buckets
311            .get(&ResourceDimension::Cpu)
312            .map(|b| b.available() >= cost.cpu_us as i64)
313            .unwrap_or(true);
314
315        let iops_ok = self
316            .buckets
317            .get(&ResourceDimension::RandomIops)
318            .map(|b| b.available() >= cost.random_iops as i64)
319            .unwrap_or(true);
320
321        let bandwidth_ok = self
322            .buckets
323            .get(&ResourceDimension::SequentialBandwidth)
324            .map(|b| b.available() >= cost.sequential_bytes as i64)
325            .unwrap_or(true);
326
327        let memory_ok = self
328            .buckets
329            .get(&ResourceDimension::Memory)
330            .map(|b| b.available() >= cost.memory_bytes as i64)
331            .unwrap_or(true);
332
333        if cpu_ok && iops_ok && bandwidth_ok && memory_ok {
334            // Acquire all resources atomically (best effort)
335            if let Some(b) = self.buckets.get(&ResourceDimension::Cpu) {
336                b.try_acquire(cost.cpu_us as i64);
337            }
338            if let Some(b) = self.buckets.get(&ResourceDimension::RandomIops) {
339                b.try_acquire(cost.random_iops as i64);
340            }
341            if let Some(b) = self.buckets.get(&ResourceDimension::SequentialBandwidth) {
342                b.try_acquire(cost.sequential_bytes as i64);
343            }
344            if let Some(b) = self.buckets.get(&ResourceDimension::Memory) {
345                b.try_acquire(cost.memory_bytes as i64);
346            }
347            self.total_requests.fetch_add(1, Ordering::Relaxed);
348            true
349        } else {
350            self.rejected_requests.fetch_add(1, Ordering::Relaxed);
351            false
352        }
353    }
354
355    /// Release resources after operation completes
356    pub fn release(&self, cost: &OperationCost) {
357        if let Some(b) = self.buckets.get(&ResourceDimension::Memory) {
358            b.release(cost.memory_bytes as i64);
359        }
360    }
361
362    /// Get utilization across all dimensions
363    pub fn utilization(&self) -> HashMap<ResourceDimension, f64> {
364        self.buckets
365            .iter()
366            .map(|(dim, bucket)| (*dim, bucket.utilization()))
367            .collect()
368    }
369}
370
371/// Admission decision
372#[derive(Debug, Clone)]
373pub enum AdmissionDecision {
374    /// Request admitted with cost
375    Admit { cost: OperationCost },
376    /// Request rejected due to overload
377    Reject {
378        reason: String,
379        retry_after_ms: Option<u64>,
380    },
381    /// Request can proceed with partial results
382    PartialAdmit {
383        cost: OperationCost,
384        max_results: usize,
385        recall_warning: String,
386    },
387}
388
389/// Admission control configuration
390#[derive(Debug, Clone)]
391pub struct AdmissionConfig {
392    /// Global token bucket capacities
393    pub global_limits: HashMap<ResourceDimension, (i64, i64)>, // (capacity, refill_rate)
394    /// Default tenant weight
395    pub default_tenant_weight: u32,
396    /// Enable partial results
397    pub allow_partial_results: bool,
398    /// Queue depth warning threshold
399    pub queue_depth_warning: usize,
400    /// Queue depth rejection threshold
401    pub queue_depth_rejection: usize,
402}
403
404impl Default for AdmissionConfig {
405    fn default() -> Self {
406        let mut global_limits = HashMap::new();
407        global_limits.insert(ResourceDimension::Cpu, (100_000_000, 10_000_000));
408        global_limits.insert(ResourceDimension::RandomIops, (100_000, 10_000));
409        global_limits.insert(ResourceDimension::SequentialBandwidth, (10_000_000_000, 1_000_000_000));
410        global_limits.insert(ResourceDimension::Memory, (10_000_000_000, 2_000_000_000));
411        global_limits.insert(ResourceDimension::Connections, (1000, 100));
412
413        Self {
414            global_limits,
415            default_tenant_weight: 100,
416            allow_partial_results: false,
417            queue_depth_warning: 100,
418            queue_depth_rejection: 1000,
419        }
420    }
421}
422
423/// Admission controller
424pub struct AdmissionController {
425    config: AdmissionConfig,
426    /// Global token buckets
427    global_buckets: HashMap<ResourceDimension, TokenBucket>,
428    /// Per-tenant quotas
429    tenants: RwLock<HashMap<String, Arc<TenantQuota>>>,
430    /// Current queue depth
431    queue_depth: AtomicU64,
432    /// Metrics
433    metrics: AdmissionMetrics,
434}
435
/// Request counters maintained by the admission controller (all start at zero).
#[derive(Default)]
pub struct AdmissionMetrics {
    /// Every request that was evaluated.
    pub total_requests: AtomicU64,
    /// Requests admitted at their full cost.
    pub admitted_requests: AtomicU64,
    /// Requests rejected for any reason.
    pub rejected_requests: AtomicU64,
    /// Requests admitted at a reduced cost (partial results).
    pub partial_requests: AtomicU64,
    /// Average queue depth. NOTE(review): never updated in this file.
    pub avg_queue_depth: AtomicU64,
}
445
446impl AdmissionController {
447    /// Create a new admission controller
448    pub fn new(config: AdmissionConfig) -> Self {
449        let mut global_buckets = HashMap::new();
450        for (dim, (capacity, rate)) in &config.global_limits {
451            global_buckets.insert(*dim, TokenBucket::new(*capacity, *rate));
452        }
453
454        Self {
455            config,
456            global_buckets,
457            tenants: RwLock::new(HashMap::new()),
458            queue_depth: AtomicU64::new(0),
459            metrics: AdmissionMetrics::default(),
460        }
461    }
462
463    /// Register a tenant
464    pub fn register_tenant(&self, tenant_id: &str, weight: u32) {
465        let mut tenants = self.tenants.write();
466        if !tenants.contains_key(tenant_id) {
467            tenants.insert(
468                tenant_id.to_string(),
469                Arc::new(TenantQuota::new(tenant_id.to_string(), weight)),
470            );
471        }
472    }
473
474    /// Get or create tenant quota
475    fn get_tenant(&self, tenant_id: &str) -> Arc<TenantQuota> {
476        {
477            let tenants = self.tenants.read();
478            if let Some(tenant) = tenants.get(tenant_id) {
479                return tenant.clone();
480            }
481        }
482
483        // Create new tenant
484        let mut tenants = self.tenants.write();
485        tenants
486            .entry(tenant_id.to_string())
487            .or_insert_with(|| {
488                Arc::new(TenantQuota::new(
489                    tenant_id.to_string(),
490                    self.config.default_tenant_weight,
491                ))
492            })
493            .clone()
494    }
495
496    /// Evaluate admission for a request
497    pub fn evaluate(
498        &self,
499        tenant_id: &str,
500        cost: OperationCost,
501        allow_partial: bool,
502    ) -> AdmissionDecision {
503        self.metrics.total_requests.fetch_add(1, Ordering::Relaxed);
504
505        // Check queue depth
506        let depth = self.queue_depth.load(Ordering::Relaxed);
507        if depth >= self.config.queue_depth_rejection as u64 {
508            self.metrics.rejected_requests.fetch_add(1, Ordering::Relaxed);
509            return AdmissionDecision::Reject {
510                reason: format!("Queue depth {} exceeds limit", depth),
511                retry_after_ms: Some(100),
512            };
513        }
514
515        // Check global limits
516        let global_ok = self.check_global_limits(&cost);
517        if !global_ok {
518            self.metrics.rejected_requests.fetch_add(1, Ordering::Relaxed);
519            return AdmissionDecision::Reject {
520                reason: "Global resource limits exceeded".to_string(),
521                retry_after_ms: Some(50),
522            };
523        }
524
525        // Check tenant limits
526        let tenant = self.get_tenant(tenant_id);
527        if tenant.try_acquire(&cost) {
528            self.queue_depth.fetch_add(1, Ordering::Relaxed);
529            self.metrics.admitted_requests.fetch_add(1, Ordering::Relaxed);
530            AdmissionDecision::Admit { cost }
531        } else if allow_partial && self.config.allow_partial_results {
532            // Try with reduced cost
533            let reduced_cost = OperationCost {
534                cpu_us: cost.cpu_us / 4,
535                random_iops: cost.random_iops / 4,
536                sequential_bytes: cost.sequential_bytes / 4,
537                memory_bytes: cost.memory_bytes / 4,
538                priority: cost.priority,
539            };
540            if tenant.try_acquire(&reduced_cost) {
541                self.queue_depth.fetch_add(1, Ordering::Relaxed);
542                self.metrics.partial_requests.fetch_add(1, Ordering::Relaxed);
543                AdmissionDecision::PartialAdmit {
544                    cost: reduced_cost,
545                    max_results: 25, // 25% of normal
546                    recall_warning: "Results limited due to load - recall may be degraded".to_string(),
547                }
548            } else {
549                self.metrics.rejected_requests.fetch_add(1, Ordering::Relaxed);
550                AdmissionDecision::Reject {
551                    reason: format!("Tenant {} quota exceeded", tenant_id),
552                    retry_after_ms: Some(100),
553                }
554            }
555        } else {
556            self.metrics.rejected_requests.fetch_add(1, Ordering::Relaxed);
557            AdmissionDecision::Reject {
558                reason: format!("Tenant {} quota exceeded", tenant_id),
559                retry_after_ms: Some(100),
560            }
561        }
562    }
563
564    /// Check global resource limits
565    fn check_global_limits(&self, cost: &OperationCost) -> bool {
566        let cpu_ok = self
567            .global_buckets
568            .get(&ResourceDimension::Cpu)
569            .map(|b| b.available() >= cost.cpu_us as i64)
570            .unwrap_or(true);
571
572        let iops_ok = self
573            .global_buckets
574            .get(&ResourceDimension::RandomIops)
575            .map(|b| b.available() >= cost.random_iops as i64)
576            .unwrap_or(true);
577
578        cpu_ok && iops_ok
579    }
580
581    /// Complete a request (release resources)
582    pub fn complete(&self, tenant_id: &str, cost: &OperationCost) {
583        self.queue_depth.fetch_sub(1, Ordering::Relaxed);
584        let tenant = self.get_tenant(tenant_id);
585        tenant.release(cost);
586    }
587
588    /// Get current system load
589    pub fn system_load(&self) -> SystemLoad {
590        let mut utilization = HashMap::new();
591        for (dim, bucket) in &self.global_buckets {
592            utilization.insert(*dim, bucket.utilization());
593        }
594
595        SystemLoad {
596            queue_depth: self.queue_depth.load(Ordering::Relaxed),
597            utilization,
598            total_requests: self.metrics.total_requests.load(Ordering::Relaxed),
599            admitted_requests: self.metrics.admitted_requests.load(Ordering::Relaxed),
600            rejected_requests: self.metrics.rejected_requests.load(Ordering::Relaxed),
601        }
602    }
603}
604
605/// Current system load
606#[derive(Debug)]
607pub struct SystemLoad {
608    pub queue_depth: u64,
609    pub utilization: HashMap<ResourceDimension, f64>,
610    pub total_requests: u64,
611    pub admitted_requests: u64,
612    pub rejected_requests: u64,
613}
614
615impl SystemLoad {
616    /// Is the system overloaded?
617    pub fn is_overloaded(&self) -> bool {
618        self.utilization.values().any(|&u| u > 0.9)
619    }
620
621    /// Admission rate
622    pub fn admission_rate(&self) -> f64 {
623        if self.total_requests == 0 {
624            1.0
625        } else {
626            self.admitted_requests as f64 / self.total_requests as f64
627        }
628    }
629}
630
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_token_bucket_basic() {
        let bucket = TokenBucket::new(100, 10);
        assert_eq!(bucket.available(), 100, "bucket starts full");

        assert!(bucket.try_acquire(50));
        assert_eq!(bucket.available(), 50);

        // Asking for more than remains must fail and leave the bucket untouched.
        assert!(!bucket.try_acquire(60));
        assert_eq!(bucket.available(), 50);

        bucket.release(25);
        assert_eq!(bucket.available(), 75);
    }

    #[test]
    fn test_operation_cost_estimation() {
        let point = OperationCost::point_read();
        assert!(point.random_iops > 0);
        assert_eq!(point.sequential_bytes, 0);

        let scan = OperationCost::scan(1000, 100);
        assert_eq!((scan.sequential_bytes, scan.random_iops), (100_000, 0));

        assert!(OperationCost::vector_search(128, 64, 100).cpu_us > 0);
    }

    #[test]
    fn test_admission_controller_basic() {
        let controller = AdmissionController::new(AdmissionConfig::default());
        controller.register_tenant("test", 100);

        let cost = OperationCost::point_read();
        match controller.evaluate("test", cost.clone(), false) {
            AdmissionDecision::Admit { .. } => {}
            other => panic!("expected Admit, got {:?}", other),
        }

        controller.complete("test", &cost);
    }

    #[test]
    fn test_tenant_quota_exhaustion() {
        let quota = TenantQuota::new("test".to_string(), 100);

        // 100 s of CPU exceeds the default per-tenant CPU burst.
        let huge_cost = OperationCost {
            cpu_us: 100_000_000,
            priority: 100,
            ..OperationCost::default()
        };

        assert!(!quota.try_acquire(&huge_cost));
    }
}