1#![warn(missing_docs)]
69#![allow(clippy::module_name_repetitions)]
70#![allow(clippy::must_use_candidate)]
71
72pub mod allocation;
73pub mod cache;
74pub mod coordinator;
75pub mod gpu;
76pub mod memory;
77pub mod priority;
78pub mod quality;
79
80#[cfg(test)]
81mod tests;
82
83use std::sync::Arc;
84use std::time::{Duration, Instant};
85
86use parking_lot::RwLock;
87use thiserror::Error;
88
89pub use allocation::{Allocation, AllocationRequest, AllocationResult};
90pub use cache::{CacheConfig, CacheStats, CacheTier, FragmentCache};
91pub use coordinator::{Coordinator, CoordinatorConfig};
92pub use gpu::{DetectionMethod, GpuDetectionResult, GpuDetector, GpuInfo, GpuVendor};
93pub use memory::{GpuMemoryTracker, MemoryPressure, MemoryStats};
94pub use priority::{Priority, WorkloadType};
95pub use quality::{QualityAllocation, QualityBudget, QualityPolicy};
96
97#[derive(Debug, Error)]
101pub enum ArbiterError {
102 #[error("Insufficient GPU memory: requested {requested}MB, available {available}MB")]
104 InsufficientMemory {
105 requested: u64,
107 available: u64,
109 },
110
111 #[error("Cannot achieve minimum quality {minimum}: max available {available}")]
113 InsufficientQuality {
114 minimum: f32,
116 available: f32,
118 },
119
120 #[error("Workload preempted by higher priority task")]
122 Preempted,
123
124 #[error("Timeout waiting for resources after {0:?}")]
126 Timeout(Duration),
127
128 #[error("Arbiter is shutting down")]
130 ShuttingDown,
131
132 #[error("Internal error: {0}")]
134 Internal(String),
135}
136
137pub type Result<T> = std::result::Result<T, ArbiterError>;
139
/// Budgets and quality policy used to build an [`Arbiter`].
#[derive(Debug, Clone)]
pub struct ArbiterConfig {
    /// GPU memory, in bytes, that the arbiter may hand out to workloads.
    pub vram_budget: u64,
    /// Host RAM, in bytes; used as the RAM capacity of the fragment cache.
    pub ram_budget: u64,
    /// Quality floor for LLM inference, on the scale where 1.0 is full quality.
    pub llm_min_quality: f32,
    /// Quality floor for image/video generation, on the same scale.
    pub diffusion_min_quality: f32,
    /// Whether the quality coordinator may adapt quality to load.
    pub adaptive_quality: bool,
    /// Memory-pressure threshold.
    /// NOTE(review): not read by `Arbiter` in this file — confirm where it is consumed.
    pub pressure_threshold: f32,
    /// Maximum time to wait for an allocation.
    /// NOTE(review): `Arbiter::request_allocation` never waits, so this is currently unused here.
    pub allocation_timeout: Duration,
}
160
161impl Default for ArbiterConfig {
162 fn default() -> Self {
163 Self {
164 vram_budget: 20 * 1024 * 1024 * 1024, ram_budget: 64 * 1024 * 1024 * 1024, llm_min_quality: 0.4,
167 diffusion_min_quality: 0.3,
168 adaptive_quality: true,
169 pressure_threshold: 0.85,
170 allocation_timeout: Duration::from_secs(30),
171 }
172 }
173}
174
175impl ArbiterConfig {
176 pub fn auto_detect() -> Self {
181 let detector = gpu::GpuDetector::new();
182 let default_vram = 8 * 1024 * 1024 * 1024; let result = detector.detect_or_default(default_vram);
185
186 let usable_vram = (result.total_vram_bytes as f64 * 0.9) as u64;
188
189 let ram_budget = result.total_vram_bytes * 4;
191
192 Self {
193 vram_budget: usable_vram,
194 ram_budget,
195 ..Default::default()
196 }
197 }
198
199 pub fn for_vram_gb(vram_gb: u64) -> Self {
201 Self {
202 vram_budget: vram_gb * 1024 * 1024 * 1024,
203 ..Default::default()
204 }
205 }
206
207 pub fn from_detection(result: &gpu::GpuDetectionResult) -> Self {
209 let usable_vram = (result.total_vram_bytes as f64 * 0.9) as u64;
210 let ram_budget = result.total_vram_bytes * 4;
211
212 Self {
213 vram_budget: usable_vram,
214 ram_budget,
215 ..Default::default()
216 }
217 }
218}
219
/// Live bookkeeping of an [`Arbiter`]; `Arbiter::state` returns a copy of it.
#[derive(Debug, Clone)]
pub struct ArbiterState {
    /// Number of outstanding `LlmInference` allocations.
    pub active_llm_workloads: usize,
    /// Number of outstanding `ImageGeneration` / `VideoGeneration` allocations.
    pub active_diffusion_workloads: usize,
    /// Quality level currently recorded for LLM workloads (1.0 = full quality).
    pub llm_quality: f32,
    /// Quality level currently recorded for diffusion workloads (1.0 = full quality).
    pub diffusion_quality: f32,
    /// Memory pressure reported by the memory tracker at the last update.
    pub memory_pressure: f32,
    /// Bytes currently allocated through the arbiter.
    pub vram_used: u64,
    /// VRAM budget minus `vram_used`, saturating at zero.
    pub vram_available: u64,
}
240
/// Cumulative counters kept by an [`Arbiter`]; `Arbiter::stats` returns a copy of them.
#[derive(Debug, Clone, Default)]
pub struct ArbiterStats {
    /// Every `request_allocation` call, successful or not.
    pub total_allocations: u64,
    /// Requests that produced an allocation.
    pub successful_allocations: u64,
    /// Requests rejected for lack of memory.
    pub failed_allocations: u64,
    /// Number of quality reductions.
    /// NOTE(review): never incremented in this file.
    pub quality_reductions: u64,
    /// Number of preemptions.
    /// NOTE(review): never incremented in this file.
    pub preemptions: u64,
    /// Average memory pressure.
    /// NOTE(review): never updated in this file.
    pub avg_memory_pressure: f64,
    /// Average LLM quality.
    /// NOTE(review): never updated in this file.
    pub avg_llm_quality: f64,
    /// Average diffusion quality.
    /// NOTE(review): never updated in this file.
    pub avg_diffusion_quality: f64,
}
263
264pub struct Arbiter {
268 config: ArbiterConfig,
269 memory_tracker: Arc<GpuMemoryTracker>,
270 fragment_cache: Arc<FragmentCache>,
271 coordinator: Arc<Coordinator>,
272 state: RwLock<ArbiterState>,
273 stats: RwLock<ArbiterStats>,
274 started_at: Instant,
275}
276
277impl Arbiter {
278 pub fn new(config: ArbiterConfig) -> Result<Self> {
280 let memory_tracker = Arc::new(GpuMemoryTracker::new(config.vram_budget));
281 let fragment_cache = Arc::new(FragmentCache::new(CacheConfig {
282 vram_capacity: config.vram_budget / 2, ram_capacity: config.ram_budget,
284 }));
285 let coordinator = Arc::new(Coordinator::new(CoordinatorConfig {
286 llm_min_quality: config.llm_min_quality,
287 diffusion_min_quality: config.diffusion_min_quality,
288 adaptive: config.adaptive_quality,
289 policy: quality::QualityPolicy::Adaptive,
290 }));
291
292 let state = ArbiterState {
293 active_llm_workloads: 0,
294 active_diffusion_workloads: 0,
295 llm_quality: 1.0,
296 diffusion_quality: 1.0,
297 memory_pressure: 0.0,
298 vram_used: 0,
299 vram_available: config.vram_budget,
300 };
301
302 Ok(Self {
303 config,
304 memory_tracker,
305 fragment_cache,
306 coordinator,
307 state: RwLock::new(state),
308 stats: RwLock::new(ArbiterStats::default()),
309 started_at: Instant::now(),
310 })
311 }
312
313 pub fn config(&self) -> &ArbiterConfig {
315 &self.config
316 }
317
318 pub fn state(&self) -> ArbiterState {
320 self.state.read().clone()
321 }
322
323 pub fn stats(&self) -> ArbiterStats {
325 self.stats.read().clone()
326 }
327
328 pub fn memory_tracker(&self) -> &Arc<GpuMemoryTracker> {
330 &self.memory_tracker
331 }
332
333 pub fn fragment_cache(&self) -> &Arc<FragmentCache> {
335 &self.fragment_cache
336 }
337
338 pub fn coordinator(&self) -> &Arc<Coordinator> {
340 &self.coordinator
341 }
342
343 pub fn uptime(&self) -> Duration {
345 self.started_at.elapsed()
346 }
347
348 pub fn request_allocation(
350 &self,
351 workload_type: WorkloadType,
352 priority: Priority,
353 memory_required: u64,
354 ) -> Result<Allocation> {
355 let mut stats = self.stats.write();
356 stats.total_allocations += 1;
357
358 let available = self.memory_tracker.available();
360 if memory_required > available {
361 stats.failed_allocations += 1;
362 return Err(ArbiterError::InsufficientMemory {
363 requested: memory_required / (1024 * 1024),
364 available: available / (1024 * 1024),
365 });
366 }
367
368 self.memory_tracker.allocate(memory_required);
370
371 let pressure = self.memory_tracker.pressure();
373 let quality = self
374 .coordinator
375 .calculate_quality(workload_type, priority, pressure);
376
377 let mut state = self.state.write();
379 match workload_type {
380 WorkloadType::LlmInference => {
381 state.active_llm_workloads += 1;
382 state.llm_quality = quality;
383 },
384 WorkloadType::ImageGeneration | WorkloadType::VideoGeneration => {
385 state.active_diffusion_workloads += 1;
386 state.diffusion_quality = quality;
387 },
388 }
389 state.vram_used += memory_required;
390 state.vram_available = self.config.vram_budget.saturating_sub(state.vram_used);
391 state.memory_pressure = pressure;
392
393 stats.successful_allocations += 1;
394
395 Ok(Allocation {
396 id: format!("{:?}-{}", workload_type, stats.total_allocations),
397 workload_type,
398 priority,
399 memory_allocated: memory_required,
400 quality_target: quality,
401 created_at: Instant::now(),
402 })
403 }
404
405 pub fn release_allocation(&self, allocation: &Allocation) {
407 self.memory_tracker.deallocate(allocation.memory_allocated);
408
409 let mut state = self.state.write();
410 match allocation.workload_type {
411 WorkloadType::LlmInference => {
412 state.active_llm_workloads = state.active_llm_workloads.saturating_sub(1);
413 },
414 WorkloadType::ImageGeneration | WorkloadType::VideoGeneration => {
415 state.active_diffusion_workloads =
416 state.active_diffusion_workloads.saturating_sub(1);
417 },
418 }
419 state.vram_used = state.vram_used.saturating_sub(allocation.memory_allocated);
420 state.vram_available = self.config.vram_budget.saturating_sub(state.vram_used);
421 state.memory_pressure = self.memory_tracker.pressure();
422
423 if state.active_llm_workloads == 0 {
425 state.diffusion_quality = 1.0;
426 }
427 if state.active_diffusion_workloads == 0 {
428 state.llm_quality = 1.0;
429 }
430 }
431
432 pub fn recommended_quality(&self, workload_type: WorkloadType) -> f32 {
434 let state = self.state.read();
435 match workload_type {
436 WorkloadType::LlmInference => state.llm_quality,
437 WorkloadType::ImageGeneration | WorkloadType::VideoGeneration => {
438 state.diffusion_quality
439 },
440 }
441 }
442}
443
#[cfg(test)]
mod integration_tests {
    use super::*;

    /// One gibibyte, the unit `ArbiterConfig::for_vram_gb` works in.
    const GIB: u64 = 1024 * 1024 * 1024;

    #[test]
    fn test_arbiter_creation() {
        let arbiter =
            Arbiter::new(ArbiterConfig::for_vram_gb(24)).expect("Failed to create arbiter");
        assert_eq!(arbiter.config().vram_budget, 24 * GIB);

        // A fresh arbiter has nothing allocated and both classes at full quality.
        let state = arbiter.state();
        assert_eq!(state.active_llm_workloads, 0);
        assert_eq!(state.active_diffusion_workloads, 0);
        assert_eq!(state.vram_used, 0);
        assert_eq!(state.vram_available, 24 * GIB);
        assert_eq!(state.llm_quality, 1.0);
        assert_eq!(state.diffusion_quality, 1.0);
    }

    #[test]
    fn test_allocation_request() {
        let arbiter = Arbiter::new(ArbiterConfig::for_vram_gb(24)).expect("Failed");

        let allocation = arbiter
            .request_allocation(WorkloadType::LlmInference, Priority::High, GIB)
            .expect("Failed to allocate");

        assert_eq!(allocation.workload_type, WorkloadType::LlmInference);
        assert_eq!(allocation.memory_allocated, GIB);
        assert!(allocation.quality_target > 0.0);

        let state = arbiter.state();
        assert_eq!(state.active_llm_workloads, 1);
        assert_eq!(state.vram_used, GIB);
        assert_eq!(state.vram_available, 23 * GIB);

        let stats = arbiter.stats();
        assert_eq!(stats.total_allocations, 1);
        assert_eq!(stats.successful_allocations, 1);
        assert_eq!(stats.failed_allocations, 0);
    }

    #[test]
    fn test_allocation_release() {
        let arbiter = Arbiter::new(ArbiterConfig::for_vram_gb(24)).expect("Failed");

        let allocation = arbiter
            .request_allocation(WorkloadType::ImageGeneration, Priority::Normal, 2 * GIB)
            .expect("Failed to allocate");

        let state_before = arbiter.state();
        assert_eq!(state_before.active_diffusion_workloads, 1);
        assert_eq!(state_before.vram_used, 2 * GIB);

        arbiter.release_allocation(&allocation);

        // Releasing the only workload restores the empty, full-quality state.
        let state_after = arbiter.state();
        assert_eq!(state_after.active_diffusion_workloads, 0);
        assert_eq!(state_after.vram_used, 0);
        assert_eq!(state_after.vram_available, 24 * GIB);
        assert_eq!(state_after.llm_quality, 1.0);
        assert_eq!(state_after.diffusion_quality, 1.0);
    }

    #[test]
    fn test_insufficient_memory() {
        let arbiter = Arbiter::new(ArbiterConfig::for_vram_gb(8)).expect("Failed");

        let err = arbiter
            .request_allocation(WorkloadType::LlmInference, Priority::High, 100 * GIB)
            .expect_err("a 100 GiB request must not fit in an 8 GiB budget");

        match err {
            // The error reports sizes in whole MiB.
            ArbiterError::InsufficientMemory { requested, .. } => {
                assert_eq!(requested, 100 * 1024);
            },
            other => panic!("expected InsufficientMemory, got {:?}", other),
        }

        // A rejected request is counted but reserves nothing.
        let stats = arbiter.stats();
        assert_eq!(stats.total_allocations, 1);
        assert_eq!(stats.failed_allocations, 1);
        assert_eq!(stats.successful_allocations, 0);
        assert_eq!(arbiter.state().vram_used, 0);
    }

    #[test]
    fn test_quality_balancing() {
        let arbiter = Arbiter::new(ArbiterConfig::for_vram_gb(24)).expect("Failed");

        // A high-priority LLM competing with a normal-priority diffusion job should not end up
        // with the lower quality.
        let llm = arbiter
            .request_allocation(WorkloadType::LlmInference, Priority::High, 10 * GIB)
            .expect("Failed");
        let _diff = arbiter
            .request_allocation(WorkloadType::ImageGeneration, Priority::Normal, 8 * GIB)
            .expect("Failed");

        let state = arbiter.state();
        assert_eq!(state.active_llm_workloads, 1);
        assert_eq!(state.active_diffusion_workloads, 1);
        assert!(state.llm_quality >= state.diffusion_quality);

        // Once the LLM workload is gone, diffusion no longer competes and returns to full quality.
        arbiter.release_allocation(&llm);
        let state = arbiter.state();
        assert_eq!(state.active_llm_workloads, 0);
        assert_eq!(state.active_diffusion_workloads, 1);
        assert_eq!(state.diffusion_quality, 1.0);
    }
}
543}