Skip to main content

sochdb_vector/
cost_model.rs

1// SPDX-License-Identifier: AGPL-3.0-or-later
2// SochDB - LLM-Optimized Embedded Database
3// Copyright (C) 2026 Sushanth Reddy Vanagala (https://github.com/sushanthpy)
4//
5// This program is free software: you can redistribute it and/or modify
6// it under the terms of the GNU Affero General Public License as published by
7// the Free Software Foundation, either version 3 of the License, or
8// (at your option) any later version.
9//
10// This program is distributed in the hope that it will be useful,
11// but WITHOUT ANY WARRANTY; without even the implied warranty of
12// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13// GNU Affero General Public License for more details.
14//
15// You should have received a copy of the GNU Affero General Public License
16// along with this program. If not, see <https://www.gnu.org/licenses/>.
17
18//! # Cost Model and Per-Query Budgets (Task 1)
19//!
20//! This module provides an explicit cost model with enforceable per-query budgets
21//! to stabilize p99 latency under load while preserving recall targets.
22//!
23//! ## Architecture
24//!
25//! The cost model is "bytes-moved first" and enforces runtime limits on:
26//! - RAM bytes scanned for candidate generation
27//! - SSD random reads allowed in hot path (ideally 0)
28//! - SSD sequential bytes allowed for rerank batching
29//! - CPU cycles spent in routing/scan
30//!
31//! ## Math/Algorithm
32//!
33//! Constrained optimization: minimize E[bytes scanned] subject to:
34//! - P(recall@k ≥ ρ) ≥ 1−δ
35//! - p99 ≤ T
36//!
37//! Convert latency SLA into budgets:
38//! - Bytes ≤ BW_eff · T
39//! - RandomIO ≤ ⌊T / L_io⌋
40//!
41//! ## Usage
42//!
43//! ```rust,ignore
44//! use sochdb_vector::cost_model::{QueryBudget, CostTracker, AdmissionController};
45//!
46//! // Define budget for query class
47//! let budget = QueryBudget::new("high_recall")
48//!     .ram_bytes(16 * 1024 * 1024)  // 16 MB RAM scan
49//!     .ssd_random_reads(0)           // No random reads in hot path
50//!     .ssd_sequential_bytes(4 * 1024 * 1024)  // 4 MB sequential for rerank
51//!     .cpu_cycles(1_000_000_000);    // ~1B cycles
52//!
53//! // Track costs during query execution
//! let tracker = CostTracker::new(budget); // all tracker methods take `&self`
55//! tracker.add_ram_bytes(1024);
56//! if tracker.is_exhausted() {
57//!     // Return best-known results under budget
58//! }
59//! ```
60
61use std::sync::Arc;
62use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
63use std::time::{Duration, Instant};
64
65// ============================================================================
66// Query Budget Definition
67// ============================================================================
68
69/// Per-query budget limits derived from SLA targets
70#[derive(Debug, Clone)]
71pub struct QueryBudget {
72    /// Query class identifier (e.g., "low_latency", "high_recall", "balanced")
73    pub query_class: String,
74
75    /// Maximum RAM bytes scanned for candidate generation
76    /// This bounds memory bandwidth consumption
77    pub ram_bytes_limit: u64,
78
79    /// Maximum SSD random reads in hot path (ideally 0)
80    /// Random IO is the main source of p99 variance
81    pub ssd_random_reads_limit: u32,
82
83    /// Maximum SSD sequential bytes for rerank batching
84    /// Sequential IO is more predictable than random
85    pub ssd_sequential_bytes_limit: u64,
86
87    /// Maximum CPU cycles for routing and scan
88    /// Converted from latency target: cycles = latency_ns * freq_ghz
89    pub cpu_cycles_limit: u64,
90
91    /// Target latency SLA (e.g., p99 ≤ 10ms)
92    pub latency_target: Duration,
93
94    /// Target recall floor (e.g., recall@10 ≥ 0.95)
95    pub recall_target: f32,
96
97    /// Probability of meeting recall target (1 - δ)
98    pub recall_confidence: f32,
99}
100
101impl QueryBudget {
102    /// Create a new budget for a query class
103    pub fn new(query_class: &str) -> Self {
104        Self {
105            query_class: query_class.to_string(),
106            ram_bytes_limit: u64::MAX,
107            ssd_random_reads_limit: u32::MAX,
108            ssd_sequential_bytes_limit: u64::MAX,
109            cpu_cycles_limit: u64::MAX,
110            latency_target: Duration::from_millis(100),
111            recall_target: 0.95,
112            recall_confidence: 0.99,
113        }
114    }
115
116    /// Set RAM bytes limit
117    pub fn ram_bytes(mut self, limit: u64) -> Self {
118        self.ram_bytes_limit = limit;
119        self
120    }
121
122    /// Set SSD random reads limit
123    pub fn ssd_random_reads(mut self, limit: u32) -> Self {
124        self.ssd_random_reads_limit = limit;
125        self
126    }
127
128    /// Set SSD sequential bytes limit
129    pub fn ssd_sequential_bytes(mut self, limit: u64) -> Self {
130        self.ssd_sequential_bytes_limit = limit;
131        self
132    }
133
134    /// Set CPU cycles limit
135    pub fn cpu_cycles(mut self, limit: u64) -> Self {
136        self.cpu_cycles_limit = limit;
137        self
138    }
139
140    /// Set latency target
141    pub fn latency_target(mut self, target: Duration) -> Self {
142        self.latency_target = target;
143        self
144    }
145
146    /// Set recall target
147    pub fn recall_target(mut self, target: f32, confidence: f32) -> Self {
148        self.recall_target = target;
149        self.recall_confidence = confidence;
150        self
151    }
152
153    /// Create budget from SLA parameters
154    ///
155    /// Converts latency SLA into resource budgets:
156    /// - Bytes ≤ BW_eff · T
157    /// - RandomIO ≤ ⌊T / L_io⌋
158    pub fn from_sla(
159        query_class: &str,
160        latency_target: Duration,
161        recall_target: f32,
162        hardware: &HardwareProfile,
163    ) -> Self {
164        let t_ns = latency_target.as_nanos() as u64;
165
166        // Bytes ≤ BW_eff · T
167        // Assume ~50% of latency budget for memory operations
168        let ram_bytes = (hardware.ram_bandwidth_gbps as u64 * t_ns / 2) / 1_000_000_000;
169
170        // RandomIO ≤ ⌊T / L_io⌋
171        // Each random IO takes ~100μs on SSD
172        let ssd_random = (t_ns / hardware.ssd_random_latency_ns) as u32;
173
174        // Sequential bytes based on SSD bandwidth
175        let ssd_seq = (hardware.ssd_seq_bandwidth_mbps as u64 * t_ns) / 1_000_000_000;
176
177        // CPU cycles = latency_ns * freq_ghz
178        let cpu_cycles = t_ns * hardware.cpu_freq_ghz as u64;
179
180        Self {
181            query_class: query_class.to_string(),
182            ram_bytes_limit: ram_bytes,
183            ssd_random_reads_limit: ssd_random,
184            ssd_sequential_bytes_limit: ssd_seq,
185            cpu_cycles_limit: cpu_cycles,
186            latency_target,
187            recall_target,
188            recall_confidence: 0.99,
189        }
190    }
191
192    /// Predefined budget for low-latency queries (p99 ≤ 5ms)
193    pub fn low_latency() -> Self {
194        Self::new("low_latency")
195            .ram_bytes(4 * 1024 * 1024) // 4 MB
196            .ssd_random_reads(0) // No random IO
197            .ssd_sequential_bytes(0) // No SSD access
198            .cpu_cycles(500_000_000) // ~0.5B cycles
199            .latency_target(Duration::from_millis(5))
200            .recall_target(0.80, 0.95)
201    }
202
203    /// Predefined budget for balanced queries (p99 ≤ 20ms)
204    pub fn balanced() -> Self {
205        Self::new("balanced")
206            .ram_bytes(16 * 1024 * 1024) // 16 MB
207            .ssd_random_reads(0) // No random IO
208            .ssd_sequential_bytes(2 * 1024 * 1024) // 2 MB sequential
209            .cpu_cycles(2_000_000_000) // ~2B cycles
210            .latency_target(Duration::from_millis(20))
211            .recall_target(0.90, 0.99)
212    }
213
214    /// Predefined budget for high-recall queries (p99 ≤ 100ms)
215    pub fn high_recall() -> Self {
216        Self::new("high_recall")
217            .ram_bytes(64 * 1024 * 1024) // 64 MB
218            .ssd_random_reads(16) // Limited random IO
219            .ssd_sequential_bytes(8 * 1024 * 1024) // 8 MB sequential
220            .cpu_cycles(10_000_000_000) // ~10B cycles
221            .latency_target(Duration::from_millis(100))
222            .recall_target(0.99, 0.999)
223    }
224}
225
226// ============================================================================
227// Hardware Profile
228// ============================================================================
229
/// Description of the machine a budget is being derived for.
///
/// The figures here feed the SLA-to-budget conversion.
#[derive(Debug, Clone)]
pub struct HardwareProfile {
    /// Memory bandwidth, in GB/s
    pub ram_bandwidth_gbps: f32,

    /// Latency of a single random SSD read, in nanoseconds
    pub ssd_random_latency_ns: u64,

    /// Sequential SSD read bandwidth, in MB/s
    pub ssd_seq_bandwidth_mbps: u32,

    /// CPU clock rate, in GHz
    pub cpu_freq_ghz: f32,

    /// Size of the last-level cache (LLC), in bytes
    pub llc_size_bytes: usize,
}

impl Default for HardwareProfile {
    /// The default profile is the standard-server profile.
    fn default() -> Self {
        Self::standard_server()
    }
}

impl HardwareProfile {
    /// Bytes per mebibyte, used to spell out cache sizes.
    const MIB: usize = 1024 * 1024;

    /// Profile for high-end server (AWS c6i.8xlarge equivalent)
    pub fn high_end_server() -> Self {
        let llc_size_bytes = 48 * Self::MIB;
        Self {
            llc_size_bytes,
            cpu_freq_ghz: 3.8,
            ssd_seq_bandwidth_mbps: 5000,
            ssd_random_latency_ns: 50_000,
            ram_bandwidth_gbps: 100.0,
        }
    }

    /// Profile for standard server (AWS c6i.2xlarge equivalent)
    pub fn standard_server() -> Self {
        let llc_size_bytes = 32 * Self::MIB; // 32 MB LLC
        Self {
            llc_size_bytes,
            cpu_freq_ghz: 3.5,              // Typical server CPU
            ssd_seq_bandwidth_mbps: 3000,   // 3 GB/s NVMe
            ssd_random_latency_ns: 100_000, // 100μs for NVMe
            ram_bandwidth_gbps: 50.0,       // Typical DDR4
        }
    }

    /// Profile for embedded/edge deployment
    pub fn embedded() -> Self {
        let llc_size_bytes = 8 * Self::MIB;
        Self {
            llc_size_bytes,
            cpu_freq_ghz: 2.0,
            ssd_seq_bandwidth_mbps: 500,
            ssd_random_latency_ns: 200_000,
            ram_bandwidth_gbps: 25.0,
        }
    }
}
289
290// ============================================================================
291// Cost Tracker
292// ============================================================================
293
294/// Tracks resource consumption during query execution
295#[derive(Debug)]
296pub struct CostTracker {
297    /// Budget being enforced
298    budget: QueryBudget,
299
300    /// RAM bytes consumed
301    ram_bytes: AtomicU64,
302
303    /// SSD random reads performed
304    ssd_random_reads: AtomicU64,
305
306    /// SSD sequential bytes read
307    ssd_sequential_bytes: AtomicU64,
308
309    /// Estimated CPU cycles consumed
310    cpu_cycles: AtomicU64,
311
312    /// Query start time
313    start_time: Instant,
314
315    /// Whether budget is exhausted
316    exhausted: std::sync::atomic::AtomicBool,
317
318    /// Reason for exhaustion (if any)
319    exhaustion_reason: parking_lot::Mutex<Option<BudgetExhaustionReason>>,
320}
321
322/// Reason why budget was exhausted
323#[derive(Debug, Clone, Copy, PartialEq, Eq)]
324pub enum BudgetExhaustionReason {
325    RamBytesExceeded,
326    SsdRandomReadsExceeded,
327    SsdSequentialBytesExceeded,
328    CpuCyclesExceeded,
329    LatencyTargetExceeded,
330}
331
332impl CostTracker {
333    /// Create a new cost tracker with the given budget
334    pub fn new(budget: QueryBudget) -> Self {
335        Self {
336            budget,
337            ram_bytes: AtomicU64::new(0),
338            ssd_random_reads: AtomicU64::new(0),
339            ssd_sequential_bytes: AtomicU64::new(0),
340            cpu_cycles: AtomicU64::new(0),
341            start_time: Instant::now(),
342            exhausted: std::sync::atomic::AtomicBool::new(false),
343            exhaustion_reason: parking_lot::Mutex::new(None),
344        }
345    }
346
347    /// Add RAM bytes consumed
348    #[inline]
349    pub fn add_ram_bytes(&self, bytes: u64) -> bool {
350        let new_total = self.ram_bytes.fetch_add(bytes, Ordering::Relaxed) + bytes;
351        if new_total > self.budget.ram_bytes_limit {
352            self.mark_exhausted(BudgetExhaustionReason::RamBytesExceeded);
353            false
354        } else {
355            true
356        }
357    }
358
359    /// Add SSD random read
360    #[inline]
361    pub fn add_ssd_random_read(&self) -> bool {
362        let new_total = self.ssd_random_reads.fetch_add(1, Ordering::Relaxed) + 1;
363        if new_total > self.budget.ssd_random_reads_limit as u64 {
364            self.mark_exhausted(BudgetExhaustionReason::SsdRandomReadsExceeded);
365            false
366        } else {
367            true
368        }
369    }
370
371    /// Add SSD sequential bytes
372    #[inline]
373    pub fn add_ssd_sequential_bytes(&self, bytes: u64) -> bool {
374        let new_total = self
375            .ssd_sequential_bytes
376            .fetch_add(bytes, Ordering::Relaxed)
377            + bytes;
378        if new_total > self.budget.ssd_sequential_bytes_limit {
379            self.mark_exhausted(BudgetExhaustionReason::SsdSequentialBytesExceeded);
380            false
381        } else {
382            true
383        }
384    }
385
386    /// Add CPU cycles
387    #[inline]
388    pub fn add_cpu_cycles(&self, cycles: u64) -> bool {
389        let new_total = self.cpu_cycles.fetch_add(cycles, Ordering::Relaxed) + cycles;
390        if new_total > self.budget.cpu_cycles_limit {
391            self.mark_exhausted(BudgetExhaustionReason::CpuCyclesExceeded);
392            false
393        } else {
394            true
395        }
396    }
397
398    /// Check if latency budget is exceeded
399    #[inline]
400    pub fn check_latency(&self) -> bool {
401        if self.start_time.elapsed() > self.budget.latency_target {
402            self.mark_exhausted(BudgetExhaustionReason::LatencyTargetExceeded);
403            false
404        } else {
405            true
406        }
407    }
408
409    /// Mark budget as exhausted
410    fn mark_exhausted(&self, reason: BudgetExhaustionReason) {
411        self.exhausted.store(true, Ordering::Release);
412        let mut guard = self.exhaustion_reason.lock();
413        if guard.is_none() {
414            *guard = Some(reason);
415        }
416    }
417
418    /// Check if budget is exhausted
419    #[inline]
420    pub fn is_exhausted(&self) -> bool {
421        // Also check latency on every call
422        if self.start_time.elapsed() > self.budget.latency_target {
423            self.mark_exhausted(BudgetExhaustionReason::LatencyTargetExceeded);
424        }
425        self.exhausted.load(Ordering::Acquire)
426    }
427
428    /// Get exhaustion reason if budget is exhausted
429    pub fn exhaustion_reason(&self) -> Option<BudgetExhaustionReason> {
430        *self.exhaustion_reason.lock()
431    }
432
433    /// Get remaining RAM bytes budget
434    pub fn remaining_ram_bytes(&self) -> u64 {
435        self.budget
436            .ram_bytes_limit
437            .saturating_sub(self.ram_bytes.load(Ordering::Relaxed))
438    }
439
440    /// Get remaining SSD random reads budget
441    pub fn remaining_ssd_random_reads(&self) -> u32 {
442        self.budget
443            .ssd_random_reads_limit
444            .saturating_sub(self.ssd_random_reads.load(Ordering::Relaxed) as u32)
445    }
446
447    /// Get remaining time budget
448    pub fn remaining_time(&self) -> Duration {
449        self.budget
450            .latency_target
451            .saturating_sub(self.start_time.elapsed())
452    }
453
454    /// Get utilization ratios for all resources
455    pub fn utilization(&self) -> CostUtilization {
456        CostUtilization {
457            ram_bytes_ratio: self.ram_bytes.load(Ordering::Relaxed) as f64
458                / self.budget.ram_bytes_limit.max(1) as f64,
459            ssd_random_reads_ratio: self.ssd_random_reads.load(Ordering::Relaxed) as f64
460                / self.budget.ssd_random_reads_limit.max(1) as f64,
461            ssd_sequential_bytes_ratio: self.ssd_sequential_bytes.load(Ordering::Relaxed) as f64
462                / self.budget.ssd_sequential_bytes_limit.max(1) as f64,
463            cpu_cycles_ratio: self.cpu_cycles.load(Ordering::Relaxed) as f64
464                / self.budget.cpu_cycles_limit.max(1) as f64,
465            latency_ratio: self.start_time.elapsed().as_nanos() as f64
466                / self.budget.latency_target.as_nanos().max(1) as f64,
467        }
468    }
469
470    /// Generate a summary for telemetry
471    pub fn summary(&self) -> CostSummary {
472        CostSummary {
473            query_class: self.budget.query_class.clone(),
474            ram_bytes_used: self.ram_bytes.load(Ordering::Relaxed),
475            ram_bytes_limit: self.budget.ram_bytes_limit,
476            ssd_random_reads_used: self.ssd_random_reads.load(Ordering::Relaxed) as u32,
477            ssd_random_reads_limit: self.budget.ssd_random_reads_limit,
478            ssd_sequential_bytes_used: self.ssd_sequential_bytes.load(Ordering::Relaxed),
479            ssd_sequential_bytes_limit: self.budget.ssd_sequential_bytes_limit,
480            cpu_cycles_used: self.cpu_cycles.load(Ordering::Relaxed),
481            cpu_cycles_limit: self.budget.cpu_cycles_limit,
482            elapsed: self.start_time.elapsed(),
483            latency_target: self.budget.latency_target,
484            exhausted: self.is_exhausted(),
485            exhaustion_reason: self.exhaustion_reason(),
486        }
487    }
488}
489
490/// Resource utilization ratios
491#[derive(Debug, Clone)]
492pub struct CostUtilization {
493    pub ram_bytes_ratio: f64,
494    pub ssd_random_reads_ratio: f64,
495    pub ssd_sequential_bytes_ratio: f64,
496    pub cpu_cycles_ratio: f64,
497    pub latency_ratio: f64,
498}
499
500/// Summary of cost consumption
501#[derive(Debug, Clone)]
502pub struct CostSummary {
503    pub query_class: String,
504    pub ram_bytes_used: u64,
505    pub ram_bytes_limit: u64,
506    pub ssd_random_reads_used: u32,
507    pub ssd_random_reads_limit: u32,
508    pub ssd_sequential_bytes_used: u64,
509    pub ssd_sequential_bytes_limit: u64,
510    pub cpu_cycles_used: u64,
511    pub cpu_cycles_limit: u64,
512    pub elapsed: Duration,
513    pub latency_target: Duration,
514    pub exhausted: bool,
515    pub exhaustion_reason: Option<BudgetExhaustionReason>,
516}
517
518// ============================================================================
519// Admission Controller
520// ============================================================================
521
522/// Admission controller for backpressure under concurrency
523///
524/// Enforces system-wide limits to prevent individual query budgets from
525/// being violated due to resource contention.
526pub struct AdmissionController {
527    /// Maximum concurrent queries per query class
528    max_concurrent_per_class: parking_lot::RwLock<std::collections::HashMap<String, usize>>,
529
530    /// Current active queries per class
531    active_per_class: parking_lot::RwLock<std::collections::HashMap<String, AtomicUsize>>,
532
533    /// Global memory pressure (bytes currently in-flight)
534    global_memory_pressure: AtomicU64,
535
536    /// Maximum global memory pressure before backpressure
537    max_global_memory: u64,
538
539    /// Backpressure wait time
540    backpressure_wait: Duration,
541}
542
543/// Handle returned when a query is admitted
544pub struct AdmissionTicket {
545    query_class: String,
546    estimated_memory: u64,
547    controller: Arc<AdmissionController>,
548}
549
550impl Drop for AdmissionTicket {
551    fn drop(&mut self) {
552        self.controller
553            .release(&self.query_class, self.estimated_memory);
554    }
555}
556
557impl AdmissionController {
558    /// Create a new admission controller
559    pub fn new(max_global_memory: u64) -> Arc<Self> {
560        Arc::new(Self {
561            max_concurrent_per_class: parking_lot::RwLock::new(std::collections::HashMap::new()),
562            active_per_class: parking_lot::RwLock::new(std::collections::HashMap::new()),
563            global_memory_pressure: AtomicU64::new(0),
564            max_global_memory,
565            backpressure_wait: Duration::from_millis(10),
566        })
567    }
568
569    /// Set maximum concurrent queries for a class
570    pub fn set_class_limit(self: &Arc<Self>, query_class: &str, max_concurrent: usize) {
571        self.max_concurrent_per_class
572            .write()
573            .insert(query_class.to_string(), max_concurrent);
574    }
575
576    /// Try to admit a query
577    ///
578    /// Returns None if admission is denied (should backpressure)
579    pub fn try_admit(self: &Arc<Self>, budget: &QueryBudget) -> Option<AdmissionTicket> {
580        // Check class limit
581        let class_limits = self.max_concurrent_per_class.read();
582        if let Some(&limit) = class_limits.get(&budget.query_class) {
583            let mut active = self.active_per_class.write();
584            let counter = active
585                .entry(budget.query_class.clone())
586                .or_insert_with(|| AtomicUsize::new(0));
587
588            let current = counter.load(Ordering::Acquire);
589            if current >= limit {
590                return None;
591            }
592            counter.fetch_add(1, Ordering::AcqRel);
593        }
594        drop(class_limits);
595
596        // Check global memory
597        let estimated_memory = budget.ram_bytes_limit / 2; // Conservative estimate
598        let current = self
599            .global_memory_pressure
600            .fetch_add(estimated_memory, Ordering::AcqRel);
601
602        if current + estimated_memory > self.max_global_memory {
603            // Roll back
604            self.global_memory_pressure
605                .fetch_sub(estimated_memory, Ordering::AcqRel);
606            self.release_class_counter(&budget.query_class);
607            return None;
608        }
609
610        Some(AdmissionTicket {
611            query_class: budget.query_class.clone(),
612            estimated_memory,
613            controller: Arc::clone(self),
614        })
615    }
616
617    /// Admit a query, waiting with backpressure if necessary
618    pub fn admit_with_backpressure(
619        self: &Arc<Self>,
620        budget: &QueryBudget,
621        max_wait: Duration,
622    ) -> Option<AdmissionTicket> {
623        let deadline = Instant::now() + max_wait;
624
625        loop {
626            if let Some(ticket) = self.try_admit(budget) {
627                return Some(ticket);
628            }
629
630            if Instant::now() >= deadline {
631                return None;
632            }
633
634            std::thread::sleep(self.backpressure_wait);
635        }
636    }
637
638    /// Release resources when query completes
639    fn release(&self, query_class: &str, estimated_memory: u64) {
640        self.global_memory_pressure
641            .fetch_sub(estimated_memory, Ordering::AcqRel);
642        self.release_class_counter(query_class);
643    }
644
645    fn release_class_counter(&self, query_class: &str) {
646        let active = self.active_per_class.read();
647        if let Some(counter) = active.get(query_class) {
648            counter.fetch_sub(1, Ordering::AcqRel);
649        }
650    }
651
652    /// Get current system pressure metrics
653    pub fn metrics(&self) -> AdmissionMetrics {
654        let active = self.active_per_class.read();
655        let active_per_class: std::collections::HashMap<String, usize> = active
656            .iter()
657            .map(|(k, v)| (k.clone(), v.load(Ordering::Relaxed)))
658            .collect();
659
660        AdmissionMetrics {
661            global_memory_pressure: self.global_memory_pressure.load(Ordering::Relaxed),
662            max_global_memory: self.max_global_memory,
663            memory_utilization: self.global_memory_pressure.load(Ordering::Relaxed) as f64
664                / self.max_global_memory as f64,
665            active_per_class,
666        }
667    }
668}
669
670/// Metrics from admission controller
671#[derive(Debug, Clone)]
672pub struct AdmissionMetrics {
673    pub global_memory_pressure: u64,
674    pub max_global_memory: u64,
675    pub memory_utilization: f64,
676    pub active_per_class: std::collections::HashMap<String, usize>,
677}
678
679// ============================================================================
680// Query Class Registry
681// ============================================================================
682
683/// Registry of query classes with their budgets
684pub struct QueryClassRegistry {
685    classes: parking_lot::RwLock<std::collections::HashMap<String, QueryBudget>>,
686    hardware: HardwareProfile,
687}
688
689impl QueryClassRegistry {
690    /// Create a new registry with default classes
691    pub fn new(hardware: HardwareProfile) -> Self {
692        let mut classes = std::collections::HashMap::new();
693        classes.insert("low_latency".to_string(), QueryBudget::low_latency());
694        classes.insert("balanced".to_string(), QueryBudget::balanced());
695        classes.insert("high_recall".to_string(), QueryBudget::high_recall());
696
697        Self {
698            classes: parking_lot::RwLock::new(classes),
699            hardware,
700        }
701    }
702
703    /// Register a custom query class
704    pub fn register(&self, budget: QueryBudget) {
705        self.classes
706            .write()
707            .insert(budget.query_class.clone(), budget);
708    }
709
710    /// Get budget for a query class
711    pub fn get(&self, query_class: &str) -> Option<QueryBudget> {
712        self.classes.read().get(query_class).cloned()
713    }
714
715    /// Create a custom budget from SLA parameters
716    pub fn create_from_sla(
717        &self,
718        query_class: &str,
719        latency_target: Duration,
720        recall_target: f32,
721    ) -> QueryBudget {
722        QueryBudget::from_sla(query_class, latency_target, recall_target, &self.hardware)
723    }
724}
725
726impl Default for QueryClassRegistry {
727    fn default() -> Self {
728        Self::new(HardwareProfile::default())
729    }
730}
731
#[cfg(test)]
mod tests {
    use super::*;

    /// Builder setters must land in the matching public fields.
    #[test]
    fn test_budget_creation() {
        let one_mib = 1024 * 1024;
        let budget = QueryBudget::new("test")
            .latency_target(Duration::from_millis(50))
            .ssd_random_reads(10)
            .ram_bytes(one_mib);

        assert_eq!(budget.ssd_random_reads_limit, 10);
        assert_eq!(budget.ram_bytes_limit, one_mib);
        assert_eq!(budget.query_class, "test");
    }

    /// Charges below the RAM limit succeed; the charge that crosses it fails
    /// and is recorded as the exhaustion reason.
    #[test]
    fn test_cost_tracker() {
        let tracker = CostTracker::new(QueryBudget::new("test").ssd_random_reads(2).ram_bytes(1000));

        // 500 and then 900 bytes in total: both within the 1000-byte limit.
        for within_budget in [500u64, 400].iter().copied() {
            assert!(tracker.add_ram_bytes(within_budget));
            assert!(!tracker.is_exhausted());
        }

        // 1100 bytes in total: over the limit.
        assert!(!tracker.add_ram_bytes(200));
        assert!(tracker.is_exhausted());
        assert_eq!(
            Some(BudgetExhaustionReason::RamBytesExceeded),
            tracker.exhaustion_reason()
        );
    }

    /// The per-class concurrency limit caps admissions, and dropping a ticket
    /// frees its slot.
    #[test]
    fn test_admission_controller() {
        // A low_latency query reserves half of its 4 MiB RAM limit (2 MiB) at
        // admission. The 64 MiB global budget keeps the memory gate out of the
        // way so that only the class limit of 2 decides the outcome here.
        let global_budget = 64 * 1024 * 1024;
        let controller = AdmissionController::new(global_budget);
        controller.set_class_limit("low_latency", 2);

        let budget = QueryBudget::low_latency();

        // The first two admissions fill the class.
        let first = controller.try_admit(&budget);
        assert!(first.is_some());
        let second = controller.try_admit(&budget);
        assert!(second.is_some());

        // The class is full, so a third admission is refused.
        let refused = controller.try_admit(&budget);
        assert!(refused.is_none());

        // Releasing one ticket makes room again.
        drop(first);
        let readmitted = controller.try_admit(&budget);
        assert!(readmitted.is_some());
    }

    /// A budget derived from an SLA must have non-zero RAM and CPU limits.
    #[test]
    fn test_budget_from_sla() {
        let latency = Duration::from_millis(50);
        let budget = QueryBudget::from_sla("custom", latency, 0.95, &HardwareProfile::default());

        assert!(budget.cpu_cycles_limit > 0);
        assert!(budget.ram_bytes_limit > 0);
    }
}