#![warn(missing_docs)]
#![allow(clippy::module_name_repetitions)]
#![allow(clippy::must_use_candidate)]
pub mod allocation;
pub mod cache;
pub mod coordinator;
pub mod gpu;
pub mod memory;
pub mod priority;
pub mod quality;
#[cfg(test)]
mod tests;
use std::sync::Arc;
use std::time::{Duration, Instant};
use parking_lot::RwLock;
use thiserror::Error;
pub use allocation::{Allocation, AllocationRequest, AllocationResult};
pub use cache::{CacheConfig, CacheStats, CacheTier, FragmentCache};
pub use coordinator::{Coordinator, CoordinatorConfig};
pub use gpu::{DetectionMethod, GpuDetectionResult, GpuDetector, GpuInfo, GpuVendor};
pub use memory::{GpuMemoryTracker, MemoryPressure, MemoryStats};
pub use priority::{Priority, WorkloadType};
pub use quality::{QualityAllocation, QualityBudget, QualityPolicy};
/// Errors returned by the [`Arbiter`] allocation API.
#[derive(Debug, Error)]
pub enum ArbiterError {
    /// Not enough GPU memory was available to satisfy an allocation request.
    ///
    /// Both amounts are expressed in mebibytes (bytes divided by 1024 × 1024).
    #[error("Insufficient GPU memory: requested {requested}MB, available {available}MB")]
    InsufficientMemory {
        /// Amount of memory that was requested, in MiB.
        requested: u64,
        /// Amount of memory reported as available when the request was made, in MiB.
        available: u64,
    },
    /// The required minimum quality level cannot be reached.
    #[error("Cannot achieve minimum quality {minimum}: max available {available}")]
    InsufficientQuality {
        /// Minimum quality that was required.
        minimum: f32,
        /// Highest quality that could be offered.
        available: f32,
    },
    /// The workload was preempted by a task with higher priority.
    #[error("Workload preempted by higher priority task")]
    Preempted,
    /// Waiting for resources timed out after the contained duration.
    #[error("Timeout waiting for resources after {0:?}")]
    Timeout(Duration),
    /// The arbiter is shutting down.
    #[error("Arbiter is shutting down")]
    ShuttingDown,
    /// An unexpected internal failure, described by the contained message.
    #[error("Internal error: {0}")]
    Internal(String),
}
/// Convenience alias for results whose error type is [`ArbiterError`].
pub type Result<T> = std::result::Result<T, ArbiterError>;
/// Configuration for an [`Arbiter`].
///
/// Memory budgets are expressed in bytes.
#[derive(Debug, Clone)]
pub struct ArbiterConfig {
    /// VRAM, in bytes, that the arbiter may hand out to workloads.
    pub vram_budget: u64,
    /// Host RAM budget, in bytes (used as the fragment cache's RAM capacity).
    pub ram_budget: u64,
    /// Minimum quality level for LLM inference workloads, forwarded to the coordinator.
    pub llm_min_quality: f32,
    /// Minimum quality level for image/video generation workloads, forwarded to the
    /// coordinator.
    pub diffusion_min_quality: f32,
    /// Forwarded to the coordinator's `adaptive` flag.
    pub adaptive_quality: bool,
    /// Memory-pressure threshold, presumably a fraction in `0.0..=1.0`.
    ///
    /// NOTE(review): not read by the arbiter code in this module — confirm its consumer.
    pub pressure_threshold: f32,
    /// How long an allocation may wait for resources.
    ///
    /// NOTE(review): not read by the arbiter code in this module — confirm its consumer.
    pub allocation_timeout: Duration,
}

impl Default for ArbiterConfig {
    /// Defaults:
    /// - 20 GiB of VRAM and 64 GiB of RAM;
    /// - minimum quality 0.4 for LLMs and 0.3 for diffusion;
    /// - adaptive quality enabled;
    /// - pressure threshold 0.85;
    /// - 30-second allocation timeout.
    fn default() -> Self {
        const GIB: u64 = 1024 * 1024 * 1024;
        Self {
            vram_budget: 20 * GIB,
            ram_budget: 64 * GIB,
            llm_min_quality: 0.4,
            diffusion_min_quality: 0.3,
            adaptive_quality: true,
            pressure_threshold: 0.85,
            allocation_timeout: Duration::from_secs(30),
        }
    }
}
impl ArbiterConfig {
    /// Share of the detected VRAM given to the arbiter as its budget.
    const USABLE_VRAM_FRACTION: f64 = 0.9;
    /// The RAM budget is this multiple of the detected VRAM.
    const RAM_PER_VRAM_BYTE: u64 = 4;

    /// Builds a configuration from the GPU detected on this machine.
    ///
    /// Detection goes through `GpuDetector::detect_or_default`, with 8 GiB
    /// (8 × 1024³ bytes) passed as the default. Budgets are then derived exactly as in
    /// [`ArbiterConfig::from_detection`].
    pub fn auto_detect() -> Self {
        let detector = gpu::GpuDetector::new();
        // Left untyped so it takes whatever integer type `detect_or_default` expects.
        let default_vram = 8 * 1024 * 1024 * 1024;
        let result = detector.detect_or_default(default_vram);
        Self::from_total_vram_bytes(result.total_vram_bytes)
    }

    /// Builds a configuration with a VRAM budget of `vram_gb` GiB (1 GiB = 1024³ bytes).
    ///
    /// All other fields take their [`Default`] values. Budgets too large to represent in
    /// bytes saturate at `u64::MAX` instead of overflowing.
    pub fn for_vram_gb(vram_gb: u64) -> Self {
        Self {
            vram_budget: vram_gb.saturating_mul(1024 * 1024 * 1024),
            ..Default::default()
        }
    }

    /// Builds a configuration from an existing GPU detection result.
    ///
    /// The VRAM budget is 90% of the detected VRAM. The RAM budget is four times the
    /// detected VRAM, saturating at `u64::MAX`. Every other field takes its default.
    pub fn from_detection(result: &gpu::GpuDetectionResult) -> Self {
        Self::from_total_vram_bytes(result.total_vram_bytes)
    }

    /// Shared budget derivation for `auto_detect` and `from_detection`.
    fn from_total_vram_bytes(total_vram_bytes: u64) -> Self {
        // Float-to-integer `as` casts saturate in Rust, so this cannot wrap.
        let usable_vram = (total_vram_bytes as f64 * Self::USABLE_VRAM_FRACTION) as u64;
        let ram_budget = total_vram_bytes.saturating_mul(Self::RAM_PER_VRAM_BYTE);
        Self {
            vram_budget: usable_vram,
            ram_budget,
            ..Default::default()
        }
    }
}
/// Point-in-time snapshot of the arbiter's workload and memory bookkeeping.
///
/// Obtained from [`Arbiter::state`].
#[derive(Debug, Clone, PartialEq)]
pub struct ArbiterState {
    /// Number of LLM inference workloads currently holding an allocation.
    pub active_llm_workloads: usize,
    /// Number of image/video generation workloads currently holding an allocation.
    pub active_diffusion_workloads: usize,
    /// Current quality recommendation for LLM workloads (starts at `1.0`).
    pub llm_quality: f32,
    /// Current quality recommendation for image/video generation workloads (starts at `1.0`).
    pub diffusion_quality: f32,
    /// Memory pressure most recently reported by the GPU memory tracker.
    pub memory_pressure: f32,
    /// VRAM, in bytes, currently allocated through the arbiter.
    pub vram_used: u64,
    /// VRAM budget minus `vram_used`, in bytes (saturating at zero).
    pub vram_available: u64,
}
/// Cumulative allocation statistics, obtained from [`Arbiter::stats`].
#[derive(Debug, Clone, Default, PartialEq)]
pub struct ArbiterStats {
    /// Number of allocation requests received, whether they succeeded or not.
    pub total_allocations: u64,
    /// Number of allocation requests that were granted.
    pub successful_allocations: u64,
    /// Number of allocation requests that were rejected.
    pub failed_allocations: u64,
    /// Number of quality reductions.
    ///
    /// NOTE(review): not updated by `Arbiter`'s methods in this module — confirm whether
    /// anything populates it.
    pub quality_reductions: u64,
    /// Number of preemptions.
    ///
    /// NOTE(review): not updated by `Arbiter`'s methods in this module.
    pub preemptions: u64,
    /// Average memory pressure.
    ///
    /// NOTE(review): not updated by `Arbiter`'s methods in this module.
    pub avg_memory_pressure: f64,
    /// Average LLM quality.
    ///
    /// NOTE(review): not updated by `Arbiter`'s methods in this module.
    pub avg_llm_quality: f64,
    /// Average diffusion quality.
    ///
    /// NOTE(review): not updated by `Arbiter`'s methods in this module.
    pub avg_diffusion_quality: f64,
}
/// Hands out GPU memory allocations to LLM and diffusion workloads.
///
/// It also tracks the quality level each workload type is recommended to run at.
/// Mutable bookkeeping is kept behind `RwLock`s, so every operation takes `&self`.
pub struct Arbiter {
    /// Configuration the arbiter was created with.
    config: ArbiterConfig,
    /// Tracks allocated VRAM and reports availability and pressure.
    memory_tracker: Arc<GpuMemoryTracker>,
    /// Fragment cache sized from the configured budgets.
    fragment_cache: Arc<FragmentCache>,
    /// Computes per-workload quality targets.
    coordinator: Arc<Coordinator>,
    /// Snapshot returned by `state()`.
    state: RwLock<ArbiterState>,
    /// Counters returned by `stats()`.
    stats: RwLock<ArbiterStats>,
    /// Creation time, used by `uptime()`.
    started_at: Instant,
}
impl Arbiter {
    /// Creates an arbiter from `config`.
    ///
    /// - The GPU memory tracker is sized to `config.vram_budget`.
    /// - The fragment cache gets half of the VRAM budget plus the whole RAM budget.
    /// - The coordinator receives the configured minimum qualities, the `adaptive_quality`
    ///   flag and the adaptive quality policy.
    ///
    /// Both workload qualities start at `1.0` and no VRAM is in use.
    ///
    /// # Errors
    ///
    /// This constructor currently never returns an error.
    pub fn new(config: ArbiterConfig) -> Result<Self> {
        let memory_tracker = Arc::new(GpuMemoryTracker::new(config.vram_budget));
        let fragment_cache = Arc::new(FragmentCache::new(CacheConfig {
            vram_capacity: config.vram_budget / 2,
            ram_capacity: config.ram_budget,
        }));
        let coordinator = Arc::new(Coordinator::new(CoordinatorConfig {
            llm_min_quality: config.llm_min_quality,
            diffusion_min_quality: config.diffusion_min_quality,
            adaptive: config.adaptive_quality,
            policy: quality::QualityPolicy::Adaptive,
        }));
        let state = ArbiterState {
            active_llm_workloads: 0,
            active_diffusion_workloads: 0,
            llm_quality: 1.0,
            diffusion_quality: 1.0,
            memory_pressure: 0.0,
            vram_used: 0,
            vram_available: config.vram_budget,
        };
        Ok(Self {
            config,
            memory_tracker,
            fragment_cache,
            coordinator,
            state: RwLock::new(state),
            stats: RwLock::new(ArbiterStats::default()),
            started_at: Instant::now(),
        })
    }

    /// Returns the configuration this arbiter was created with.
    pub fn config(&self) -> &ArbiterConfig {
        &self.config
    }

    /// Returns a snapshot of the current workload and memory state.
    pub fn state(&self) -> ArbiterState {
        self.state.read().clone()
    }

    /// Returns a snapshot of the allocation statistics.
    pub fn stats(&self) -> ArbiterStats {
        self.stats.read().clone()
    }

    /// Returns the shared GPU memory tracker.
    pub fn memory_tracker(&self) -> &Arc<GpuMemoryTracker> {
        &self.memory_tracker
    }

    /// Returns the shared fragment cache.
    pub fn fragment_cache(&self) -> &Arc<FragmentCache> {
        &self.fragment_cache
    }

    /// Returns the shared quality coordinator.
    pub fn coordinator(&self) -> &Arc<Coordinator> {
        &self.coordinator
    }

    /// Returns how much time has passed since this arbiter was created.
    pub fn uptime(&self) -> Duration {
        self.started_at.elapsed()
    }

    /// Reserves `memory_required` bytes of VRAM for a workload and returns the allocation.
    ///
    /// On success:
    /// - the memory is charged to the memory tracker;
    /// - the coordinator recomputes the quality target for the workload's type from the
    ///   resulting memory pressure;
    /// - the state snapshot and the statistics are updated.
    ///
    /// # Errors
    ///
    /// Returns [`ArbiterError::InsufficientMemory`] when `memory_required` exceeds the
    /// memory the tracker reports as available. The error reports both amounts in MiB.
    /// The request is rounded up and the availability rounded down, so a rejected request
    /// never appears to fit.
    pub fn request_allocation(
        &self,
        workload_type: WorkloadType,
        priority: Priority,
        memory_required: u64,
    ) -> Result<Allocation> {
        const MIB: u64 = 1024 * 1024;

        // The stats write lock is held for the entire call. That serializes concurrent
        // `request_allocation` calls, so the availability check and the allocation below
        // are not interleaved with another request.
        let mut stats = self.stats.write();
        stats.total_allocations += 1;

        let available = self.memory_tracker.available();
        if memory_required > available {
            stats.failed_allocations += 1;
            // Round up so a sub-MiB shortfall is not reported as "requested 0MB, available 0MB".
            let requested_mib = memory_required / MIB + u64::from(memory_required % MIB != 0);
            return Err(ArbiterError::InsufficientMemory {
                requested: requested_mib,
                available: available / MIB,
            });
        }

        self.memory_tracker.allocate(memory_required);
        let pressure = self.memory_tracker.pressure();
        let quality = self
            .coordinator
            .calculate_quality(workload_type, priority, pressure);

        let mut state = self.state.write();
        match workload_type {
            WorkloadType::LlmInference => {
                state.active_llm_workloads += 1;
                state.llm_quality = quality;
            },
            WorkloadType::ImageGeneration | WorkloadType::VideoGeneration => {
                state.active_diffusion_workloads += 1;
                state.diffusion_quality = quality;
            },
        }
        state.vram_used += memory_required;
        state.vram_available = self.config.vram_budget.saturating_sub(state.vram_used);
        state.memory_pressure = pressure;

        stats.successful_allocations += 1;
        Ok(Allocation {
            // `total_allocations` was incremented under the held lock, so ids are unique.
            id: format!("{:?}-{}", workload_type, stats.total_allocations),
            workload_type,
            priority,
            memory_allocated: memory_required,
            quality_target: quality,
            created_at: Instant::now(),
        })
    }

    /// Returns an allocation's memory to the tracker and updates the state snapshot.
    ///
    /// Workload counters and `vram_used` saturate at zero. Once no LLM workloads remain,
    /// the diffusion quality is reset to `1.0`. Once no diffusion workloads remain, the LLM
    /// quality is reset to `1.0`. This method does not detect repeated releases of the
    /// same allocation.
    pub fn release_allocation(&self, allocation: &Allocation) {
        self.memory_tracker.deallocate(allocation.memory_allocated);
        let mut state = self.state.write();
        match allocation.workload_type {
            WorkloadType::LlmInference => {
                state.active_llm_workloads = state.active_llm_workloads.saturating_sub(1);
            },
            WorkloadType::ImageGeneration | WorkloadType::VideoGeneration => {
                state.active_diffusion_workloads =
                    state.active_diffusion_workloads.saturating_sub(1);
            },
        }
        state.vram_used = state.vram_used.saturating_sub(allocation.memory_allocated);
        state.vram_available = self.config.vram_budget.saturating_sub(state.vram_used);
        state.memory_pressure = self.memory_tracker.pressure();
        // With one workload type idle, the other no longer competes for resources.
        if state.active_llm_workloads == 0 {
            state.diffusion_quality = 1.0;
        }
        if state.active_diffusion_workloads == 0 {
            state.llm_quality = 1.0;
        }
    }

    /// Returns the current quality recommendation for `workload_type`.
    ///
    /// Image and video generation share the diffusion quality.
    pub fn recommended_quality(&self, workload_type: WorkloadType) -> f32 {
        let state = self.state.read();
        match workload_type {
            WorkloadType::LlmInference => state.llm_quality,
            WorkloadType::ImageGeneration | WorkloadType::VideoGeneration => {
                state.diffusion_quality
            },
        }
    }
}
#[cfg(test)]
mod integration_tests {
    use super::*;

    /// One gibibyte in bytes.
    const GIB: u64 = 1024 * 1024 * 1024;

    /// A fresh arbiter reports the configured budget and no usage.
    #[test]
    fn test_arbiter_creation() {
        let arbiter =
            Arbiter::new(ArbiterConfig::for_vram_gb(24)).expect("Failed to create arbiter");
        assert_eq!(arbiter.config().vram_budget, 24 * GIB);
        let state = arbiter.state();
        assert_eq!(state.active_llm_workloads, 0);
        assert_eq!(state.active_diffusion_workloads, 0);
        assert_eq!(state.vram_used, 0);
        assert_eq!(state.vram_available, 24 * GIB);
    }

    /// A successful allocation is reflected in the allocation, the state and the stats.
    #[test]
    fn test_allocation_request() {
        let arbiter = Arbiter::new(ArbiterConfig::for_vram_gb(24)).expect("Failed");
        let allocation = arbiter
            .request_allocation(WorkloadType::LlmInference, Priority::High, GIB)
            .expect("Failed to allocate");
        assert_eq!(allocation.workload_type, WorkloadType::LlmInference);
        assert_eq!(allocation.memory_allocated, GIB);
        assert!(allocation.quality_target > 0.0);

        let state = arbiter.state();
        assert_eq!(state.active_llm_workloads, 1);
        assert_eq!(state.vram_used, GIB);
        assert_eq!(state.vram_available, 23 * GIB);

        let stats = arbiter.stats();
        assert_eq!(stats.total_allocations, 1);
        assert_eq!(stats.successful_allocations, 1);
        assert_eq!(stats.failed_allocations, 0);
    }

    /// Releasing an allocation returns its memory and decrements the workload count.
    #[test]
    fn test_allocation_release() {
        let arbiter = Arbiter::new(ArbiterConfig::for_vram_gb(24)).expect("Failed");
        let allocation = arbiter
            .request_allocation(WorkloadType::ImageGeneration, Priority::Normal, 2 * GIB)
            .expect("Failed to allocate");

        let state_before = arbiter.state();
        assert_eq!(state_before.active_diffusion_workloads, 1);
        assert_eq!(state_before.vram_used, 2 * GIB);

        arbiter.release_allocation(&allocation);

        let state_after = arbiter.state();
        assert_eq!(state_after.active_diffusion_workloads, 0);
        assert_eq!(state_after.vram_used, 0);
        assert_eq!(state_after.vram_available, 24 * GIB);
    }

    /// Oversized requests fail, are counted as failures, and reserve nothing.
    #[test]
    fn test_insufficient_memory() {
        let arbiter = Arbiter::new(ArbiterConfig::for_vram_gb(8)).expect("Failed");
        let result =
            arbiter.request_allocation(WorkloadType::LlmInference, Priority::High, 100 * GIB);
        assert!(matches!(
            result,
            Err(ArbiterError::InsufficientMemory { .. })
        ));

        let stats = arbiter.stats();
        assert_eq!(stats.total_allocations, 1);
        assert_eq!(stats.failed_allocations, 1);
        assert_eq!(stats.successful_allocations, 0);

        let state = arbiter.state();
        assert_eq!(state.active_llm_workloads, 0);
        assert_eq!(state.vram_used, 0);
    }

    /// With both workload types active, the high-priority LLM keeps at least diffusion's quality.
    #[test]
    fn test_quality_balancing() {
        let arbiter = Arbiter::new(ArbiterConfig::for_vram_gb(24)).expect("Failed");
        let _llm = arbiter
            .request_allocation(WorkloadType::LlmInference, Priority::High, 10 * GIB)
            .expect("Failed");
        let _diff = arbiter
            .request_allocation(WorkloadType::ImageGeneration, Priority::Normal, 8 * GIB)
            .expect("Failed");
        let state = arbiter.state();
        assert_eq!(state.active_llm_workloads, 1);
        assert_eq!(state.active_diffusion_workloads, 1);
        assert!(state.llm_quality >= state.diffusion_quality);
    }

    /// Once every workload is released, both recommendations return to full quality.
    #[test]
    fn test_release_restores_full_quality_when_idle() {
        let arbiter = Arbiter::new(ArbiterConfig::for_vram_gb(24)).expect("Failed");
        assert_eq!(arbiter.recommended_quality(WorkloadType::LlmInference), 1.0);
        assert_eq!(arbiter.recommended_quality(WorkloadType::ImageGeneration), 1.0);

        let llm = arbiter
            .request_allocation(WorkloadType::LlmInference, Priority::High, 10 * GIB)
            .expect("Failed");
        arbiter.release_allocation(&llm);

        assert_eq!(arbiter.recommended_quality(WorkloadType::LlmInference), 1.0);
        assert_eq!(arbiter.recommended_quality(WorkloadType::ImageGeneration), 1.0);
        assert_eq!(arbiter.recommended_quality(WorkloadType::VideoGeneration), 1.0);
    }
}