Skip to main content

infernum_arbiter/
lib.rs

1//! # Arbiter - Unified GPU Coordination
2//!
3//! *"The judge allocates resources justly"*
4//!
5//! Arbiter coordinates GPU resources between Infernum (LLM inference)
6//! and Dantalion (diffusion/image generation), enabling simultaneous
7//! multimodal workloads on a single GPU.
8//!
9//! ## Core Principles
10//!
11//! 1. **Quality-Aware Scheduling**: Both systems can run at reduced quality
12//!    when sharing GPU, with quality improving as resources become available.
13//!
14//! 2. **Priority-Based Arbitration**: User-facing workloads get priority,
15//!    background improvement yields when needed.
16//!
17//! 3. **Unified Fragment Cache**: HoloTensor fragments are cached across
18//!    both systems, avoiding redundant loading.
19//!
20//! ## Architecture
21//!
22//! ```text
23//! ┌─────────────────────────────────────────────────────────────────┐
24//! │                         ARBITER                                 │
25//! │  Monitors GPU memory, coordinates quality targets, routes work  │
26//! └──────────────────────┬──────────────────────────────────────────┘
27//!                        │
28//!          ┌─────────────┴─────────────┐
29//!          │                           │
30//!          ▼                           ▼
31//! ┌─────────────────────┐     ┌─────────────────────┐
32//! │     INFERNUM        │     │     DANTALION       │
33//! │  (LLM Inference)    │     │  (Diffusion)        │
34//! │                     │     │                     │
35//! │  Quality: 40-100%   │     │  Quality: 30-100%   │
36//! │  via HoloTensor     │     │  via ProgressiveLoad│
37//! └─────────────────────┘     └─────────────────────┘
38//!          │                           │
39//!          └─────────────┬─────────────┘
40//!                        │
41//!                        ▼
42//!          ┌─────────────────────────────┐
43//!          │    UNIFIED FRAGMENT CACHE   │
44//!          │  VRAM ← RAM ← NVMe ← CDN    │
45//!          └─────────────────────────────┘
46//! ```
47//!
48//! ## Example
49//!
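//! ```ignore
//! use arbiter::{Arbiter, ArbiterConfig, WorkloadType, Priority};
//!
//! let arbiter = Arbiter::new(ArbiterConfig::auto_detect())?;
//!
//! // Request 8 GiB for LLM inference at high priority
//! let llm_allocation = arbiter.request_allocation(
//!     WorkloadType::LlmInference,
//!     Priority::High,
//!     8 * 1024 * 1024 * 1024,
//! )?;
//!
//! // A second workload shares the GPU; each allocation's quality target
//! // depends on the resulting memory pressure
//! // (e.g. LLM gets 70% quality, Dantalion drops to 40%)
//! let diffusion_allocation = arbiter.request_allocation(
//!     WorkloadType::ImageGeneration,
//!     Priority::Normal,
//!     4 * 1024 * 1024 * 1024,
//! )?;
//!
//! // Hand the memory back once the workloads finish
//! arbiter.release_allocation(&diffusion_allocation);
//! arbiter.release_allocation(&llm_allocation);
//! ```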
67
68#![warn(missing_docs)]
69#![allow(clippy::module_name_repetitions)]
70#![allow(clippy::must_use_candidate)]
71
72pub mod allocation;
73pub mod cache;
74pub mod coordinator;
75pub mod gpu;
76pub mod memory;
77pub mod priority;
78pub mod quality;
79
80#[cfg(test)]
81mod tests;
82
83use std::sync::Arc;
84use std::time::{Duration, Instant};
85
86use parking_lot::RwLock;
87use thiserror::Error;
88
89pub use allocation::{Allocation, AllocationRequest, AllocationResult};
90pub use cache::{CacheConfig, CacheStats, CacheTier, FragmentCache};
91pub use coordinator::{Coordinator, CoordinatorConfig};
92pub use gpu::{DetectionMethod, GpuDetectionResult, GpuDetector, GpuInfo, GpuVendor};
93pub use memory::{GpuMemoryTracker, MemoryPressure, MemoryStats};
94pub use priority::{Priority, WorkloadType};
95pub use quality::{QualityAllocation, QualityBudget, QualityPolicy};
96
97// ==================== Error Types ====================
98
99/// Errors from Arbiter operations.
100#[derive(Debug, Error)]
101pub enum ArbiterError {
102    /// Insufficient GPU memory for allocation.
103    #[error("Insufficient GPU memory: requested {requested}MB, available {available}MB")]
104    InsufficientMemory {
105        /// Requested memory in MB.
106        requested: u64,
107        /// Available memory in MB.
108        available: u64,
109    },
110
111    /// Quality target cannot be achieved.
112    #[error("Cannot achieve minimum quality {minimum}: max available {available}")]
113    InsufficientQuality {
114        /// Minimum quality required.
115        minimum: f32,
116        /// Maximum quality available.
117        available: f32,
118    },
119
120    /// Workload was preempted.
121    #[error("Workload preempted by higher priority task")]
122    Preempted,
123
124    /// Timeout waiting for resources.
125    #[error("Timeout waiting for resources after {0:?}")]
126    Timeout(Duration),
127
128    /// Arbiter is shutting down.
129    #[error("Arbiter is shutting down")]
130    ShuttingDown,
131
132    /// Internal error.
133    #[error("Internal error: {0}")]
134    Internal(String),
135}
136
137/// Result type for Arbiter operations.
138pub type Result<T> = std::result::Result<T, ArbiterError>;
139
140// ==================== Arbiter Configuration ====================
141
/// Tunable settings consumed by [`Arbiter::new`].
///
/// Start from [`ArbiterConfig::default`] or one of the constructor helpers and
/// override individual fields as needed.
#[derive(Debug, Clone)]
pub struct ArbiterConfig {
    /// VRAM the Arbiter may hand out, in bytes.
    pub vram_budget: u64,
    /// RAM set aside for fragment caching, in bytes.
    pub ram_budget: u64,
    /// Lowest quality accepted for LLM inference (forwarded to the coordinator).
    pub llm_min_quality: f32,
    /// Lowest quality accepted for diffusion (forwarded to the coordinator).
    pub diffusion_min_quality: f32,
    /// Whether adaptive quality balancing is enabled (forwarded to the coordinator).
    pub adaptive_quality: bool,
    /// Memory-pressure level at which quality reduction should start.
    pub pressure_threshold: f32,
    /// How long an allocation request may wait before timing out.
    pub allocation_timeout: Duration,
}
160
161impl Default for ArbiterConfig {
162    fn default() -> Self {
163        Self {
164            vram_budget: 20 * 1024 * 1024 * 1024, // 20GB
165            ram_budget: 64 * 1024 * 1024 * 1024,  // 64GB
166            llm_min_quality: 0.4,
167            diffusion_min_quality: 0.3,
168            adaptive_quality: true,
169            pressure_threshold: 0.85,
170            allocation_timeout: Duration::from_secs(30),
171        }
172    }
173}
174
175impl ArbiterConfig {
176    /// Auto-detects GPU memory and creates appropriate config.
177    ///
178    /// Uses `GpuDetector` to query nvidia-smi, rocm-smi, or system info.
179    /// Falls back to 8GB default if detection fails.
180    pub fn auto_detect() -> Self {
181        let detector = gpu::GpuDetector::new();
182        let default_vram = 8 * 1024 * 1024 * 1024; // 8GB fallback
183
184        let result = detector.detect_or_default(default_vram);
185
186        // Use detected VRAM, reserving 10% for system overhead
187        let usable_vram = (result.total_vram_bytes as f64 * 0.9) as u64;
188
189        // Estimate RAM as 4x VRAM for fragment caching
190        let ram_budget = result.total_vram_bytes * 4;
191
192        Self {
193            vram_budget: usable_vram,
194            ram_budget,
195            ..Default::default()
196        }
197    }
198
199    /// Creates config for a specific VRAM size in GB.
200    pub fn for_vram_gb(vram_gb: u64) -> Self {
201        Self {
202            vram_budget: vram_gb * 1024 * 1024 * 1024,
203            ..Default::default()
204        }
205    }
206
207    /// Creates config from detected GPU info.
208    pub fn from_detection(result: &gpu::GpuDetectionResult) -> Self {
209        let usable_vram = (result.total_vram_bytes as f64 * 0.9) as u64;
210        let ram_budget = result.total_vram_bytes * 4;
211
212        Self {
213            vram_budget: usable_vram,
214            ram_budget,
215            ..Default::default()
216        }
217    }
218}
219
220// ==================== Arbiter State ====================
221
/// Point-in-time snapshot of what the Arbiter is managing.
///
/// [`Arbiter::state`] returns a clone, so a snapshot does not change after it
/// has been taken.
#[derive(Debug, Clone)]
pub struct ArbiterState {
    /// Number of LLM workloads currently holding an allocation.
    pub active_llm_workloads: usize,
    /// Number of diffusion (image or video) workloads currently holding an allocation.
    pub active_diffusion_workloads: usize,
    /// Quality target most recently assigned to LLM workloads (starts at 1.0).
    pub llm_quality: f32,
    /// Quality target most recently assigned to diffusion workloads (starts at 1.0).
    pub diffusion_quality: f32,
    /// GPU memory pressure as last read from the memory tracker (0.0 - 1.0).
    pub memory_pressure: f32,
    /// Bytes of VRAM handed out through the Arbiter.
    pub vram_used: u64,
    /// Bytes of the VRAM budget not yet handed out.
    pub vram_available: u64,
}
240
241// ==================== Arbiter Statistics ====================
242
/// Running counters and averages describing Arbiter activity.
///
/// Every field starts at zero via `Default`. Within this file only the three
/// allocation counters are updated (by `Arbiter::request_allocation`).
// NOTE(review): nothing in this file writes the remaining fields; confirm
// whether another module maintains them.
#[derive(Debug, Clone, Default)]
pub struct ArbiterStats {
    /// Allocation requests received, whether or not they succeeded.
    pub total_allocations: u64,
    /// Requests that were granted.
    pub successful_allocations: u64,
    /// Requests that were rejected.
    pub failed_allocations: u64,
    /// Number of times quality was lowered because of memory pressure.
    pub quality_reductions: u64,
    /// Number of workloads that were preempted.
    pub preemptions: u64,
    /// Mean memory pressure observed.
    pub avg_memory_pressure: f64,
    /// Mean quality achieved by LLM workloads.
    pub avg_llm_quality: f64,
    /// Mean quality achieved by diffusion workloads.
    pub avg_diffusion_quality: f64,
}
263
264// ==================== The Arbiter ====================
265
266/// The main GPU arbiter coordinating Infernum and Dantalion.
267pub struct Arbiter {
268    config: ArbiterConfig,
269    memory_tracker: Arc<GpuMemoryTracker>,
270    fragment_cache: Arc<FragmentCache>,
271    coordinator: Arc<Coordinator>,
272    state: RwLock<ArbiterState>,
273    stats: RwLock<ArbiterStats>,
274    started_at: Instant,
275}
276
277impl Arbiter {
278    /// Creates a new Arbiter with the given configuration.
279    pub fn new(config: ArbiterConfig) -> Result<Self> {
280        let memory_tracker = Arc::new(GpuMemoryTracker::new(config.vram_budget));
281        let fragment_cache = Arc::new(FragmentCache::new(CacheConfig {
282            vram_capacity: config.vram_budget / 2, // Reserve half for weights
283            ram_capacity: config.ram_budget,
284        }));
285        let coordinator = Arc::new(Coordinator::new(CoordinatorConfig {
286            llm_min_quality: config.llm_min_quality,
287            diffusion_min_quality: config.diffusion_min_quality,
288            adaptive: config.adaptive_quality,
289            policy: quality::QualityPolicy::Adaptive,
290        }));
291
292        let state = ArbiterState {
293            active_llm_workloads: 0,
294            active_diffusion_workloads: 0,
295            llm_quality: 1.0,
296            diffusion_quality: 1.0,
297            memory_pressure: 0.0,
298            vram_used: 0,
299            vram_available: config.vram_budget,
300        };
301
302        Ok(Self {
303            config,
304            memory_tracker,
305            fragment_cache,
306            coordinator,
307            state: RwLock::new(state),
308            stats: RwLock::new(ArbiterStats::default()),
309            started_at: Instant::now(),
310        })
311    }
312
313    /// Returns the configuration.
314    pub fn config(&self) -> &ArbiterConfig {
315        &self.config
316    }
317
318    /// Returns current state.
319    pub fn state(&self) -> ArbiterState {
320        self.state.read().clone()
321    }
322
323    /// Returns statistics.
324    pub fn stats(&self) -> ArbiterStats {
325        self.stats.read().clone()
326    }
327
328    /// Returns the memory tracker.
329    pub fn memory_tracker(&self) -> &Arc<GpuMemoryTracker> {
330        &self.memory_tracker
331    }
332
333    /// Returns the fragment cache.
334    pub fn fragment_cache(&self) -> &Arc<FragmentCache> {
335        &self.fragment_cache
336    }
337
338    /// Returns the coordinator.
339    pub fn coordinator(&self) -> &Arc<Coordinator> {
340        &self.coordinator
341    }
342
343    /// Returns uptime.
344    pub fn uptime(&self) -> Duration {
345        self.started_at.elapsed()
346    }
347
348    /// Requests a GPU allocation for a workload.
349    pub fn request_allocation(
350        &self,
351        workload_type: WorkloadType,
352        priority: Priority,
353        memory_required: u64,
354    ) -> Result<Allocation> {
355        let mut stats = self.stats.write();
356        stats.total_allocations += 1;
357
358        // Check memory availability
359        let available = self.memory_tracker.available();
360        if memory_required > available {
361            stats.failed_allocations += 1;
362            return Err(ArbiterError::InsufficientMemory {
363                requested: memory_required / (1024 * 1024),
364                available: available / (1024 * 1024),
365            });
366        }
367
368        // Track allocation in memory tracker first
369        self.memory_tracker.allocate(memory_required);
370
371        // Calculate quality allocation based on current pressure (after allocation)
372        let pressure = self.memory_tracker.pressure();
373        let quality = self
374            .coordinator
375            .calculate_quality(workload_type, priority, pressure);
376
377        // Update state
378        let mut state = self.state.write();
379        match workload_type {
380            WorkloadType::LlmInference => {
381                state.active_llm_workloads += 1;
382                state.llm_quality = quality;
383            },
384            WorkloadType::ImageGeneration | WorkloadType::VideoGeneration => {
385                state.active_diffusion_workloads += 1;
386                state.diffusion_quality = quality;
387            },
388        }
389        state.vram_used += memory_required;
390        state.vram_available = self.config.vram_budget.saturating_sub(state.vram_used);
391        state.memory_pressure = pressure;
392
393        stats.successful_allocations += 1;
394
395        Ok(Allocation {
396            id: format!("{:?}-{}", workload_type, stats.total_allocations),
397            workload_type,
398            priority,
399            memory_allocated: memory_required,
400            quality_target: quality,
401            created_at: Instant::now(),
402        })
403    }
404
405    /// Releases an allocation.
406    pub fn release_allocation(&self, allocation: &Allocation) {
407        self.memory_tracker.deallocate(allocation.memory_allocated);
408
409        let mut state = self.state.write();
410        match allocation.workload_type {
411            WorkloadType::LlmInference => {
412                state.active_llm_workloads = state.active_llm_workloads.saturating_sub(1);
413            },
414            WorkloadType::ImageGeneration | WorkloadType::VideoGeneration => {
415                state.active_diffusion_workloads =
416                    state.active_diffusion_workloads.saturating_sub(1);
417            },
418        }
419        state.vram_used = state.vram_used.saturating_sub(allocation.memory_allocated);
420        state.vram_available = self.config.vram_budget.saturating_sub(state.vram_used);
421        state.memory_pressure = self.memory_tracker.pressure();
422
423        // Rebalance quality if workloads reduced
424        if state.active_llm_workloads == 0 {
425            state.diffusion_quality = 1.0;
426        }
427        if state.active_diffusion_workloads == 0 {
428            state.llm_quality = 1.0;
429        }
430    }
431
432    /// Gets recommended quality for a workload type.
433    pub fn recommended_quality(&self, workload_type: WorkloadType) -> f32 {
434        let state = self.state.read();
435        match workload_type {
436            WorkloadType::LlmInference => state.llm_quality,
437            WorkloadType::ImageGeneration | WorkloadType::VideoGeneration => {
438                state.diffusion_quality
439            },
440        }
441    }
442}
443
444// ==================== Integration Tests ====================
445
#[cfg(test)]
mod integration_tests {
    use super::*;

    /// Bytes in one binary gigabyte; every size in these tests is a multiple of it.
    const GIB: u64 = 1024 * 1024 * 1024;

    /// Builds an arbiter whose VRAM budget is `vram_gb` gigabytes.
    fn arbiter_with_vram_gb(vram_gb: u64) -> Arbiter {
        Arbiter::new(ArbiterConfig::for_vram_gb(vram_gb)).expect("Failed to create arbiter")
    }

    /// A fresh arbiter keeps its budget and starts with nothing allocated.
    #[test]
    fn test_arbiter_creation() {
        let arbiter = arbiter_with_vram_gb(24);
        assert_eq!(arbiter.config().vram_budget, 24 * GIB);

        let state = arbiter.state();
        assert_eq!(state.active_llm_workloads, 0);
        assert_eq!(state.active_diffusion_workloads, 0);
        assert_eq!(state.vram_used, 0);
        assert_eq!(state.vram_available, 24 * GIB);
    }

    /// A request that fits is granted and reflected in state and stats.
    #[test]
    fn test_allocation_request() {
        let arbiter = arbiter_with_vram_gb(24);

        let allocation = arbiter
            .request_allocation(WorkloadType::LlmInference, Priority::High, GIB)
            .expect("Failed to allocate");

        assert_eq!(allocation.workload_type, WorkloadType::LlmInference);
        assert_eq!(allocation.memory_allocated, GIB);
        assert!(allocation.quality_target > 0.0);

        let state = arbiter.state();
        assert_eq!(state.active_llm_workloads, 1);
        assert_eq!(state.vram_used, GIB);
        assert_eq!(state.vram_available, 23 * GIB);

        let stats = arbiter.stats();
        assert_eq!(stats.total_allocations, 1);
        assert_eq!(stats.successful_allocations, 1);
        assert_eq!(stats.failed_allocations, 0);
    }

    /// Releasing an allocation restores the workload count and VRAM accounting.
    #[test]
    fn test_allocation_release() {
        let arbiter = arbiter_with_vram_gb(24);

        let allocation = arbiter
            .request_allocation(WorkloadType::ImageGeneration, Priority::Normal, 2 * GIB)
            .expect("Failed to allocate");

        let state_before = arbiter.state();
        assert_eq!(state_before.active_diffusion_workloads, 1);
        assert_eq!(state_before.vram_used, 2 * GIB);

        arbiter.release_allocation(&allocation);

        let state_after = arbiter.state();
        assert_eq!(state_after.active_diffusion_workloads, 0);
        assert_eq!(state_after.vram_used, 0);
        assert_eq!(state_after.vram_available, 24 * GIB);
    }

    /// A request larger than the budget is rejected and counted as a failure.
    #[test]
    fn test_insufficient_memory() {
        let arbiter = arbiter_with_vram_gb(8);

        // 100GB is far more than the 8GB budget.
        let result =
            arbiter.request_allocation(WorkloadType::LlmInference, Priority::High, 100 * GIB);

        // The error reports the request in MB: 100GB == 100 * 1024 MB.
        assert!(matches!(
            result,
            Err(ArbiterError::InsufficientMemory { requested, .. }) if requested == 100 * 1024
        ));

        let stats = arbiter.stats();
        assert_eq!(stats.total_allocations, 1);
        assert_eq!(stats.successful_allocations, 0);
        assert_eq!(stats.failed_allocations, 1);

        // A rejected request must not change the VRAM accounting.
        assert_eq!(arbiter.state().vram_used, 0);
    }

    /// Two workloads sharing the GPU are both tracked, and the high-priority
    /// LLM does not end up with a lower quality than the diffusion workload.
    #[test]
    fn test_quality_balancing() {
        let arbiter = arbiter_with_vram_gb(24);

        // High-priority LLM workload goes first.
        let _llm = arbiter
            .request_allocation(WorkloadType::LlmInference, Priority::High, 10 * GIB)
            .expect("Failed");

        // A normal-priority diffusion workload then shares the GPU.
        let _diff = arbiter
            .request_allocation(WorkloadType::ImageGeneration, Priority::Normal, 8 * GIB)
            .expect("Failed");

        let state = arbiter.state();
        // Both should be running
        assert_eq!(state.active_llm_workloads, 1);
        assert_eq!(state.active_diffusion_workloads, 1);
        assert_eq!(state.vram_used, 18 * GIB);
        // High priority LLM should have better quality
        assert!(state.llm_quality >= state.diffusion_quality);
    }
}