ferrum_interfaces/scheduler.rs

//! Unified scheduler interface with resource awareness and SLA support
//!
//! This module provides the unified scheduler interface that replaces the
//! conflicting scheduler definitions in the original codebase.

use async_trait::async_trait;
use ferrum_types::{
    BatchId, InferenceRequest, InferenceResponse, Priority, RequestId, RequestState, Result,
    SchedulerConfig as TypesSchedulerConfig, SchedulerStats,
};
use serde::{Deserialize, Serialize};
use std::{collections::HashMap, time::Duration};

/// Main scheduler trait for request management and batching
#[async_trait]
pub trait Scheduler: Send + Sync {
    /// Submit a new inference request
    async fn submit(&self, request: InferenceRequest) -> Result<RequestId>;

    /// Get the next batch of requests to execute
    async fn next_batch(&self, hint: BatchHint) -> Option<BatchPlan>;

    /// Mark a request as completed
    async fn complete(&self, request_id: RequestId, response: &InferenceResponse) -> Result<()>;

    /// Cancel a pending request
    async fn cancel(&self, request_id: RequestId) -> Result<bool>;

    /// Update request priority
    async fn update_priority(&self, request_id: RequestId, priority: Priority) -> Result<()>;

    /// Get scheduler metrics
    fn metrics(&self) -> SchedulerMetrics;

    /// Get scheduler configuration
    fn config(&self) -> &TypesSchedulerConfig;

    /// Get the current request state if the request is tracked by the scheduler
    fn request_state(&self, request_id: &RequestId) -> Option<RequestState> {
        let _ = request_id;
        None
    }

    /// Preempt a running request (if supported)
    async fn preempt(&self, _request_id: RequestId) -> Result<PreemptionResult> {
        // Default implementation: preemption not supported
        Err(ferrum_types::FerrumError::unsupported(
            "Preemption not supported",
        ))
    }

    /// Resume a preempted request
    async fn resume(&self, _request_id: RequestId) -> Result<()> {
        // Default implementation: resumption not supported
        Err(ferrum_types::FerrumError::unsupported(
            "Resumption not supported",
        ))
    }
}
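
/// Illustrative sketch (an assumption, not part of the interface): a serving
/// loop that drains ready batches from a [`Scheduler`], hands each plan to a
/// caller-supplied `execute` step, and reports completions back. The function
/// name, the callback shape, and the batch-size constant are hypothetical.
pub async fn drain_scheduler<S, F, Fut>(scheduler: &S, mut execute: F) -> Result<()>
where
    S: Scheduler + ?Sized,
    F: FnMut(&BatchPlan) -> Fut,
    Fut: std::future::Future<Output = Vec<(RequestId, InferenceResponse)>>,
{
    // `next_batch` returning `None` means nothing is ready to run right now.
    while let Some(plan) = scheduler.next_batch(BatchHint::simple(8)).await {
        for (request_id, response) in execute(&plan).await {
            scheduler.complete(request_id, &response).await?;
        }
    }
    Ok(())
}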

/// Batch hint for scheduler optimization
#[derive(Debug, Clone)]
pub struct BatchHint {
    /// Maximum batch size
    pub max_batch_size: usize,
    /// Maximum total tokens in batch
    pub max_tokens: usize,
    /// Target latency for batch formation
    pub target_latency_ms: Option<u64>,
    /// Available memory for batch
    pub available_memory: Option<u64>,
    /// Resource constraints
    pub resource_constraints: ResourceConstraints,
}

impl BatchHint {
    /// Create simple batch hint with size limit
    pub fn simple(max_batch_size: usize) -> Self {
        Self {
            max_batch_size,
            max_tokens: max_batch_size * 2048, // Default reasonable token limit
            target_latency_ms: None,
            available_memory: None,
            resource_constraints: ResourceConstraints::default(),
        }
    }
}
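
/// Hypothetical convenience (an assumption, not part of the original API):
/// derive a latency-bounded hint from the simple constructor via struct
/// update syntax, keeping the default token and resource limits.
pub fn latency_bounded_hint(max_batch_size: usize, target_latency_ms: u64) -> BatchHint {
    BatchHint {
        target_latency_ms: Some(target_latency_ms),
        ..BatchHint::simple(max_batch_size)
    }
}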

/// Resource constraints for scheduling
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct ResourceConstraints {
    /// Maximum GPU memory usage
    pub max_gpu_memory: Option<u64>,
    /// Maximum CPU memory usage
    pub max_cpu_memory: Option<u64>,
    /// Maximum compute units
    pub max_compute_units: Option<usize>,
    /// Required device types
    pub required_devices: Vec<ferrum_types::Device>,
}

/// Batch execution plan
#[derive(Debug, Clone)]
pub struct BatchPlan {
    /// Unique batch identifier
    pub batch_id: BatchId,
    /// Requests included in this batch
    pub requests: Vec<ScheduledRequest>,
    /// Maximum sequence length in batch
    pub max_sequence_length: usize,
    /// Estimated execution time
    pub estimated_time_ms: Option<u64>,
    /// Resource requirements
    pub resource_requirements: BatchResourceRequirements,
    /// Batch creation timestamp
    pub created_at: chrono::DateTime<chrono::Utc>,
}

impl BatchPlan {
    /// Get the total token budget of the batch (sum of per-request `max_tokens`)
    pub fn total_tokens(&self) -> usize {
        self.requests
            .iter()
            .map(|req| req.request.sampling_params.max_tokens)
            .sum()
    }

    /// Get batch size
    pub fn size(&self) -> usize {
        self.requests.len()
    }

    /// Check if batch is empty
    pub fn is_empty(&self) -> bool {
        self.requests.is_empty()
    }

    /// Get the highest priority in the batch
    pub fn max_priority(&self) -> Priority {
        self.requests
            .iter()
            .map(|req| req.request.priority)
            .max()
            .unwrap_or(Priority::Low)
    }
}

/// Scheduled request with additional metadata
#[derive(Debug, Clone)]
pub struct ScheduledRequest {
    /// Original inference request
    pub request: InferenceRequest,
    /// Current scheduling state
    pub state: RequestState,
    /// Queue position when waiting
    pub queue_position: Option<usize>,
    /// Estimated wait time
    pub estimated_wait_time: Option<Duration>,
    /// Number of tokens processed so far
    pub tokens_processed: usize,
    /// Allocated resources
    pub allocated_resources: AllocatedResources,
    /// Request submission time
    pub submitted_at: chrono::DateTime<chrono::Utc>,
    /// Request start time (when moved from waiting to running)
    pub started_at: Option<chrono::DateTime<chrono::Utc>>,
}

impl ScheduledRequest {
    /// Create a new scheduled request in the waiting state
    pub fn new(request: InferenceRequest) -> Self {
        Self {
            request,
            state: RequestState::Waiting,
            queue_position: None,
            estimated_wait_time: None,
            tokens_processed: 0,
            allocated_resources: AllocatedResources::default(),
            submitted_at: chrono::Utc::now(),
            started_at: None,
        }
    }

    /// Get request age since submission
    pub fn age(&self) -> Duration {
        (chrono::Utc::now() - self.submitted_at)
            .to_std()
            .unwrap_or_default()
    }

    /// Get processing time (if started)
    pub fn processing_time(&self) -> Option<Duration> {
        self.started_at
            .map(|start| (chrono::Utc::now() - start).to_std().unwrap_or_default())
    }
}
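
/// Sketch (an assumption, not part of the original API): flag requests whose
/// queue age has exceeded a deadline, e.g. as input to a priority-escalation
/// pass over the waiting queue.
pub fn is_overdue(req: &ScheduledRequest, deadline: Duration) -> bool {
    // `matches!` avoids assuming `RequestState` implements `PartialEq`.
    matches!(req.state, RequestState::Waiting) && req.age() > deadline
}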

/// Allocated resources for a request
#[derive(Debug, Clone, Default)]
pub struct AllocatedResources {
    /// KV cache blocks allocated
    pub kv_cache_blocks: Vec<ferrum_types::BlockId>,
    /// GPU memory allocated (bytes)
    pub gpu_memory: u64,
    /// CPU memory allocated (bytes)
    pub cpu_memory: u64,
    /// Compute units reserved
    pub compute_units: usize,
}

/// Resource requirements for batch execution
#[derive(Debug, Clone)]
pub struct BatchResourceRequirements {
    /// Required GPU memory
    pub gpu_memory: u64,
    /// Required CPU memory
    pub cpu_memory: u64,
    /// Required KV cache blocks
    pub kv_cache_blocks: usize,
    /// Required compute units
    pub compute_units: usize,
}

/// Preemption result
#[derive(Debug, Clone)]
pub struct PreemptionResult {
    /// Whether preemption was successful
    pub success: bool,
    /// Saved state for resumption (if any)
    pub saved_state: Option<PreemptionState>,
    /// Resources freed by preemption
    pub freed_resources: AllocatedResources,
}

/// State saved during preemption
#[derive(Debug, Clone)]
pub struct PreemptionState {
    /// KV cache checkpoint
    pub kv_cache_checkpoint: Vec<u8>,
    /// Number of tokens processed
    pub tokens_processed: usize,
    /// Generation state
    pub generation_state: HashMap<String, serde_json::Value>,
}
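
/// Sketch (an assumption): preempt a request and immediately try to resume
/// it, treating the "not supported" default-implementation error as a no-op.
/// Assumes `RequestId: Clone`, which this module does not guarantee.
pub async fn try_preempt_and_resume<S: Scheduler + ?Sized>(
    scheduler: &S,
    request_id: RequestId,
) -> Result<bool> {
    match scheduler.preempt(request_id.clone()).await {
        Ok(result) if result.success => {
            scheduler.resume(request_id).await?;
            Ok(true)
        }
        // Preemption either failed or is unsupported by this scheduler.
        _ => Ok(false),
    }
}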

/// Scheduler configuration
pub type SchedulerConfig = TypesSchedulerConfig;

/// Scheduling policies
#[derive(Debug, Clone, Copy, Serialize, Deserialize)]
pub enum SchedulingPolicy {
    /// First-Come-First-Served
    FCFS,
    /// Priority-based scheduling
    Priority,
    /// Fair-share scheduling
    FairShare,
    /// Shortest-Job-First
    SJF,
    /// Resource-aware scheduling
    ResourceAware,
    /// SLA-driven scheduling
    SlaAware,
}
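
/// Illustrative ordering (an assumption, not part of the original API): how a
/// queue might rank two waiting requests under the FCFS and Priority
/// policies. The remaining policies need state beyond the two requests
/// (shares, resource forecasts, SLAs) and fall back to FCFS here.
pub fn compare_for_policy(
    policy: SchedulingPolicy,
    a: &ScheduledRequest,
    b: &ScheduledRequest,
) -> std::cmp::Ordering {
    match policy {
        // Higher priority first; ties broken by earlier submission.
        SchedulingPolicy::Priority => b
            .request
            .priority
            .cmp(&a.request.priority)
            .then_with(|| a.submitted_at.cmp(&b.submitted_at)),
        // FCFS and everything we cannot evaluate locally: earlier first.
        _ => a.submitted_at.cmp(&b.submitted_at),
    }
}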

/// Fair share configuration
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct FairShareConfig {
    /// Share weights per client
    pub client_shares: HashMap<String, f32>,
    /// Default share for unspecified clients
    pub default_share: f32,
    /// Share enforcement strictness (0.0 - 1.0)
    pub enforcement_strictness: f32,
}

/// SLA configuration
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SlaConfig {
    /// Enable SLA enforcement
    pub enabled: bool,
    /// Default SLA for requests without a specific SLA
    pub default_sla: SlaRequirements,
    /// Per-client SLA overrides
    pub client_slas: HashMap<String, SlaRequirements>,
}

/// SLA requirements
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SlaRequirements {
    /// Maximum latency (P95)
    pub max_latency_p95_ms: u64,
    /// Maximum latency (P99)
    pub max_latency_p99_ms: u64,
    /// Minimum throughput
    pub min_throughput_rps: f32,
    /// Availability requirement
    pub availability_percent: f32,
}

/// Resource limits
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct ResourceLimits {
    /// Maximum total GPU memory
    pub max_gpu_memory: Option<u64>,
    /// Maximum total CPU memory
    pub max_cpu_memory: Option<u64>,
    /// Maximum KV cache blocks
    pub max_kv_cache_blocks: Option<usize>,
    /// Per-client resource limits
    pub per_client_limits: HashMap<String, ClientResourceLimits>,
}

/// Per-client resource limits
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ClientResourceLimits {
    /// Max concurrent requests per client
    pub max_concurrent_requests: usize,
    /// Max GPU memory per client
    pub max_gpu_memory: Option<u64>,
    /// Max requests per minute
    pub max_requests_per_minute: Option<u32>,
}

/// Scheduler metrics
pub type SchedulerMetrics = SchedulerStats;

/// Advanced scheduler capabilities
#[async_trait]
pub trait AdvancedScheduler: Scheduler {
    /// Enable resource-aware scheduling
    async fn enable_resource_awareness(&mut self, config: ResourceAwarenessConfig) -> Result<()>;

    /// Set custom admission policy
    async fn set_admission_policy(&mut self, policy: Box<dyn AdmissionPolicy>) -> Result<()>;

    /// Configure dynamic batching
    async fn configure_dynamic_batching(&mut self, config: DynamicBatchingConfig) -> Result<()>;

    /// Get detailed queue analysis
    fn queue_analysis(&self) -> QueueAnalysis;

    /// Simulate scheduling for capacity planning
    async fn simulate_load(
        &self,
        workload: &SimulatedWorkload,
    ) -> Result<SchedulingSimulationResult>;
}

/// Resource awareness configuration
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ResourceAwarenessConfig {
    /// Enable memory-aware scheduling
    pub enable_memory_awareness: bool,
    /// Enable compute-aware scheduling
    pub enable_compute_awareness: bool,
    /// Resource prediction horizon
    pub prediction_horizon_ms: u64,
    /// Resource safety margin (0.0 - 1.0)
    pub safety_margin: f32,
}

/// Admission policy for request acceptance
pub trait AdmissionPolicy: Send + Sync {
    /// Decide whether to admit a request
    fn should_admit(
        &self,
        request: &InferenceRequest,
        current_metrics: &SchedulerMetrics,
    ) -> AdmissionDecision;

    /// Get policy name
    fn name(&self) -> &str;
}
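
/// Example admission policy (an illustrative sketch, not part of the original
/// interface set): reject low-priority requests whose requested token budget
/// exceeds a fixed cap. It only reads request fields already used elsewhere
/// in this module; the policy name and cap field are assumptions.
pub struct TokenBudgetPolicy {
    /// Largest `max_tokens` a low-priority request may ask for
    pub low_priority_token_cap: usize,
}

impl AdmissionPolicy for TokenBudgetPolicy {
    fn should_admit(
        &self,
        request: &InferenceRequest,
        _current_metrics: &SchedulerMetrics,
    ) -> AdmissionDecision {
        let requested = request.sampling_params.max_tokens;
        if request.priority == Priority::Low && requested > self.low_priority_token_cap {
            AdmissionDecision::Reject(format!(
                "requested {requested} tokens, above the low-priority cap of {}",
                self.low_priority_token_cap
            ))
        } else {
            AdmissionDecision::Accept
        }
    }

    fn name(&self) -> &str {
        "token-budget"
    }
}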

/// Admission decision
#[derive(Debug, Clone)]
pub enum AdmissionDecision {
    /// Accept the request
    Accept,
    /// Reject the request with reason
    Reject(String),
    /// Accept but suggest delay
    AcceptWithDelay(Duration),
}

/// Dynamic batching configuration
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DynamicBatchingConfig {
    /// Minimum batch size
    pub min_batch_size: usize,
    /// Maximum batch size
    pub max_batch_size: usize,
    /// Batch formation timeout
    pub batch_timeout_ms: u64,
    /// Enable adaptive batch sizing
    pub enable_adaptive_sizing: bool,
    /// Target batch utilization
    pub target_utilization: f32,
}
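
/// Sketch of an adaptive-sizing step (an assumption about how
/// `enable_adaptive_sizing` might be acted on): widen the batch bound when
/// observed utilization runs above target, narrow it when below, always
/// clamped to the configured range (assumes `min_batch_size <= max_batch_size`).
pub fn adapt_batch_size(
    config: &DynamicBatchingConfig,
    current: usize,
    observed_utilization: f32,
) -> usize {
    let next = if !config.enable_adaptive_sizing {
        current
    } else if observed_utilization > config.target_utilization {
        current + 1
    } else {
        current.saturating_sub(1)
    };
    next.clamp(config.min_batch_size, config.max_batch_size)
}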

/// Queue analysis results
#[derive(Debug, Clone)]
pub struct QueueAnalysis {
    /// Queue depth over time
    pub queue_depth_history: Vec<(chrono::DateTime<chrono::Utc>, usize)>,
    /// Wait time distribution
    pub wait_time_distribution: WaitTimeDistribution,
    /// Request pattern analysis
    pub request_patterns: RequestPatternAnalysis,
    /// Bottleneck identification
    pub bottlenecks: Vec<BottleneckAnalysis>,
}

/// Wait time distribution
#[derive(Debug, Clone)]
pub struct WaitTimeDistribution {
    /// P50 wait time
    pub p50_ms: f64,
    /// P95 wait time
    pub p95_ms: f64,
    /// P99 wait time
    pub p99_ms: f64,
    /// Maximum wait time
    pub max_ms: f64,
    /// Average wait time
    pub mean_ms: f64,
}
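
/// Sketch (an assumption): check an observed wait-time distribution against
/// an SLA's latency targets; throughput and availability need data that this
/// distribution does not carry.
pub fn latency_sla_met(sla: &SlaRequirements, observed: &WaitTimeDistribution) -> bool {
    observed.p95_ms <= sla.max_latency_p95_ms as f64
        && observed.p99_ms <= sla.max_latency_p99_ms as f64
}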

/// Request pattern analysis
#[derive(Debug, Clone)]
pub struct RequestPatternAnalysis {
    /// Peak request times
    pub peak_times: Vec<chrono::DateTime<chrono::Utc>>,
    /// Request rate trend
    pub rate_trend: RateTrend,
    /// Seasonality patterns
    pub seasonality: SeasonalityPattern,
}

/// Request rate trend
#[derive(Debug, Clone, Copy)]
pub enum RateTrend {
    Increasing,
    Decreasing,
    Stable,
    Volatile,
}

/// Seasonality patterns
#[derive(Debug, Clone)]
pub struct SeasonalityPattern {
    /// Hourly patterns
    pub hourly_pattern: Vec<f32>,
    /// Daily patterns
    pub daily_pattern: Vec<f32>,
    /// Weekly patterns
    pub weekly_pattern: Vec<f32>,
}

/// Bottleneck analysis
#[derive(Debug, Clone)]
pub struct BottleneckAnalysis {
    /// Bottleneck type
    pub bottleneck_type: BottleneckType,
    /// Severity (0.0 - 1.0)
    pub severity: f32,
    /// Description
    pub description: String,
    /// Suggested mitigation
    pub mitigation: String,
}

/// Types of bottlenecks
#[derive(Debug, Clone, Copy)]
pub enum BottleneckType {
    /// Memory bottleneck
    Memory,
    /// Compute bottleneck
    Compute,
    /// I/O bottleneck
    IO,
    /// Scheduling bottleneck
    Scheduling,
    /// Network bottleneck
    Network,
}

/// Simulated workload for capacity planning
#[derive(Debug, Clone)]
pub struct SimulatedWorkload {
    /// Request arrival pattern
    pub arrival_pattern: ArrivalPattern,
    /// Request size distribution
    pub size_distribution: SizeDistribution,
    /// Simulation duration
    pub duration_seconds: u64,
}

/// Request arrival patterns
#[derive(Debug, Clone)]
pub enum ArrivalPattern {
    /// Constant rate
    Constant { rate_rps: f32 },
    /// Poisson process
    Poisson { lambda: f32 },
    /// Bursty pattern
    Bursty {
        burst_rate: f32,
        quiet_rate: f32,
        burst_duration_s: f32,
    },
    /// Seasonal pattern; peaks are `(time, multiplier)` pairs
    Seasonal {
        base_rate: f32,
        peaks: Vec<(f32, f32)>,
    },
}
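
/// Sketch (an assumption): the mean arrival rate implied by each pattern, as
/// a sanity check before running a simulation. The bursty case assumes burst
/// and quiet phases of equal length, and the seasonal case ignores peaks;
/// both are illustrative simplifications.
pub fn mean_rate_rps(pattern: &ArrivalPattern) -> f32 {
    match pattern {
        ArrivalPattern::Constant { rate_rps } => *rate_rps,
        // A Poisson process with intensity lambda has mean rate lambda.
        ArrivalPattern::Poisson { lambda } => *lambda,
        ArrivalPattern::Bursty {
            burst_rate,
            quiet_rate,
            ..
        } => (burst_rate + quiet_rate) / 2.0,
        ArrivalPattern::Seasonal { base_rate, .. } => *base_rate,
    }
}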

/// Request size distribution
#[derive(Debug, Clone)]
pub enum SizeDistribution {
    /// Fixed size
    Fixed { tokens: usize },
    /// Uniform distribution
    Uniform {
        min_tokens: usize,
        max_tokens: usize,
    },
    /// Normal distribution
    Normal { mean: f32, std_dev: f32 },
    /// Log-normal distribution
    LogNormal { mu: f32, sigma: f32 },
}
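
/// Sketch (an assumption): expected request size in tokens under each
/// distribution, e.g. for pre-sizing simulated batches. Uses the standard
/// log-normal mean, exp(mu + sigma^2 / 2).
pub fn expected_tokens(dist: &SizeDistribution) -> f32 {
    match dist {
        SizeDistribution::Fixed { tokens } => *tokens as f32,
        SizeDistribution::Uniform {
            min_tokens,
            max_tokens,
        } => (min_tokens + max_tokens) as f32 / 2.0,
        SizeDistribution::Normal { mean, .. } => *mean,
        SizeDistribution::LogNormal { mu, sigma } => (mu + sigma * sigma / 2.0).exp(),
    }
}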

/// Scheduling simulation results
#[derive(Debug, Clone)]
pub struct SchedulingSimulationResult {
    /// Total requests processed
    pub total_requests: u64,
    /// Successful requests
    pub successful_requests: u64,
    /// Failed/rejected requests
    pub failed_requests: u64,
    /// Average latency
    pub avg_latency_ms: f64,
    /// P95 latency
    pub p95_latency_ms: f64,
    /// P99 latency
    pub p99_latency_ms: f64,
    /// Throughput achieved
    pub throughput_rps: f32,
    /// Resource utilization (optional placeholder)
    pub resource_utilization: Option<ResourceStats>,
    /// Predicted bottlenecks
    pub bottlenecks: Vec<BottleneckAnalysis>,
}

/// Coarse resource utilization statistics
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
pub struct ResourceStats {
    /// GPU memory in use (bytes)
    pub gpu_memory_bytes: Option<u64>,
    /// CPU memory in use (bytes)
    pub cpu_memory_bytes: Option<u64>,
    /// Fraction of compute capacity in use (0.0 - 1.0)
    pub compute_utilization: Option<f32>,
}
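
/// Sketch (an assumption): success ratio of a simulation run, guarding the
/// empty-workload case.
pub fn success_rate(result: &SchedulingSimulationResult) -> f64 {
    if result.total_requests == 0 {
        return 1.0;
    }
    result.successful_requests as f64 / result.total_requests as f64
}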